pub mod acid;
pub mod file;
pub(crate) mod fork;
pub mod memory;
pub(crate) mod shared;
use crate::protocol::error::Result;
use crate::protocol::offset::Offset;
use crate::protocol::producer::ProducerHeaders;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use tokio::sync::broadcast;
pub(crate) use shared::{
NOTIFY_CHANNEL_CAPACITY, ProducerCheck, ProducerState, check_producer, cleanup_stale_producers,
is_stream_expired, validate_content_type, validate_seq,
};
#[derive(Debug, Clone, Eq, serde::Serialize, serde::Deserialize)]
pub struct StreamConfig {
pub content_type: String,
pub ttl_seconds: Option<u64>,
pub expires_at: Option<DateTime<Utc>>,
pub created_closed: bool,
}
impl PartialEq for StreamConfig {
fn eq(&self, other: &Self) -> bool {
self.content_type == other.content_type
&& self.ttl_seconds == other.ttl_seconds
&& self.created_closed == other.created_closed
&& if self.ttl_seconds.is_some() {
true
} else {
self.expires_at == other.expires_at
}
}
}
impl StreamConfig {
#[must_use]
pub fn new(content_type: String) -> Self {
Self {
content_type,
ttl_seconds: None,
expires_at: None,
created_closed: false,
}
}
#[must_use]
pub fn with_ttl(mut self, ttl_seconds: u64) -> Self {
self.ttl_seconds = Some(ttl_seconds);
self
}
#[must_use]
pub fn with_expires_at(mut self, expires_at: DateTime<Utc>) -> Self {
self.expires_at = Some(expires_at);
self
}
#[must_use]
pub fn with_created_closed(mut self, created_closed: bool) -> Self {
self.created_closed = created_closed;
self
}
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct ForkInfo {
pub source_name: String,
#[serde(
serialize_with = "crate::protocol::offset::serialize_offset",
deserialize_with = "crate::protocol::offset::deserialize_offset"
)]
pub fork_offset: Offset,
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum StreamState {
#[default]
Active,
Tombstone,
}
#[derive(Debug, Clone)]
pub struct Message {
pub offset: Offset,
pub data: Bytes,
pub byte_len: u64,
}
impl Message {
#[must_use]
pub fn new(offset: Offset, data: Bytes) -> Self {
let byte_len = u64::try_from(data.len()).unwrap_or(u64::MAX);
Self {
offset,
data,
byte_len,
}
}
}
#[derive(Debug)]
pub struct ReadResult {
pub messages: Vec<Bytes>,
pub next_offset: Offset,
pub at_tail: bool,
pub closed: bool,
}
#[derive(Debug, Clone)]
pub struct StreamMetadata {
pub config: StreamConfig,
pub next_offset: Offset,
pub closed: bool,
pub total_bytes: u64,
pub message_count: u64,
pub created_at: DateTime<Utc>,
pub updated_at: Option<DateTime<Utc>>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CreateStreamResult {
Created,
AlreadyExists,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct ForkCreateSpec {
pub source_name: String,
pub fork_offset: Offset,
pub config: StreamConfig,
}
#[derive(Debug)]
pub struct CreateWithDataResult {
pub status: CreateStreamResult,
pub next_offset: Offset,
pub closed: bool,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProducerAppendResult {
Accepted {
epoch: u64,
seq: u64,
next_offset: Offset,
closed: bool,
},
Duplicate {
epoch: u64,
seq: u64,
next_offset: Offset,
closed: bool,
},
}
#[allow(clippy::missing_errors_doc)]
pub trait Storage: Send + Sync {
fn create_stream(&self, name: &str, config: StreamConfig) -> Result<CreateStreamResult>;
fn append(&self, name: &str, data: Bytes, content_type: &str) -> Result<Offset>;
fn batch_append(
&self,
name: &str,
messages: Vec<Bytes>,
content_type: &str,
seq: Option<&str>,
) -> Result<Offset>;
fn read(&self, name: &str, from_offset: &Offset) -> Result<ReadResult>;
fn delete(&self, name: &str) -> Result<()>;
fn head(&self, name: &str) -> Result<StreamMetadata>;
fn close_stream(&self, name: &str) -> Result<()>;
fn append_with_producer(
&self,
name: &str,
messages: Vec<Bytes>,
content_type: &str,
producer: &ProducerHeaders,
should_close: bool,
seq: Option<&str>,
) -> Result<ProducerAppendResult>;
fn create_stream_with_data(
&self,
name: &str,
config: StreamConfig,
messages: Vec<Bytes>,
should_close: bool,
) -> Result<CreateWithDataResult>;
fn exists(&self, name: &str) -> bool;
fn subscribe(&self, name: &str) -> Option<broadcast::Receiver<()>>;
fn cleanup_expired_streams(&self) -> usize;
fn list_streams(&self) -> Result<Vec<(String, StreamMetadata)>>;
fn create_fork(
&self,
name: &str,
source_name: &str,
fork_offset: Option<&Offset>,
config: StreamConfig,
) -> Result<CreateStreamResult>;
}