pub mod acid;
pub mod file;
pub mod memory;
use crate::protocol::error::{Error, Result};
use crate::protocol::offset::Offset;
use crate::protocol::producer::ProducerHeaders;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use std::collections::HashMap;
use tokio::sync::broadcast;
/// Per-stream configuration captured at creation time.
///
/// `Eq` is derived but `PartialEq` is implemented manually (below) so that
/// `expires_at` is ignored whenever a relative TTL is set.
#[derive(Debug, Clone, Eq, serde::Serialize, serde::Deserialize)]
pub struct StreamConfig {
    /// Content type that appends to this stream are validated against
    /// (ASCII case-insensitive; see `validate_content_type`).
    pub content_type: String,
    /// Optional relative time-to-live, in seconds.
    pub ttl_seconds: Option<u64>,
    /// Optional absolute expiry instant (UTC); see `is_stream_expired`.
    pub expires_at: Option<DateTime<Utc>>,
    /// When true, the stream is created already closed to appends.
    pub created_closed: bool,
}
impl PartialEq for StreamConfig {
    /// Configs are equal when content type, TTL, and created-closed flag all
    /// match; `expires_at` participates only when no TTL is set, since a TTL
    /// makes the concrete expiry instant derived rather than authoritative.
    fn eq(&self, other: &Self) -> bool {
        if self.content_type != other.content_type
            || self.ttl_seconds != other.ttl_seconds
            || self.created_closed != other.created_closed
        {
            return false;
        }
        // With a TTL present the absolute expiry is ignored.
        self.ttl_seconds.is_some() || self.expires_at == other.expires_at
    }
}
impl StreamConfig {
    /// Builds a config for `content_type` with no TTL, no absolute expiry,
    /// and the stream created open.
    #[must_use]
    pub fn new(content_type: String) -> Self {
        Self {
            content_type,
            ttl_seconds: None,
            expires_at: None,
            created_closed: false,
        }
    }

    /// Builder: sets a relative time-to-live in seconds.
    #[must_use]
    pub fn with_ttl(self, ttl_seconds: u64) -> Self {
        Self {
            ttl_seconds: Some(ttl_seconds),
            ..self
        }
    }

    /// Builder: sets an absolute expiry instant (UTC).
    #[must_use]
    pub fn with_expires_at(self, expires_at: DateTime<Utc>) -> Self {
        Self {
            expires_at: Some(expires_at),
            ..self
        }
    }

    /// Builder: sets whether the stream starts out closed.
    #[must_use]
    pub fn with_created_closed(self, created_closed: bool) -> Self {
        Self {
            created_closed,
            ..self
        }
    }
}
/// A single stored message: the payload plus its assigned offset.
#[derive(Debug, Clone)]
pub struct Message {
    /// Offset assigned to this message within its stream.
    pub offset: Offset,
    /// Raw payload bytes.
    pub data: Bytes,
    /// Cached payload length in bytes (computed in `Message::new`).
    pub byte_len: u64,
}
impl Message {
    /// Wraps `data` at `offset`, caching the payload length up front.
    ///
    /// A length that does not fit in `u64` (impossible on real targets,
    /// where `usize <= 64` bits) saturates to `u64::MAX`.
    #[must_use]
    pub fn new(offset: Offset, data: Bytes) -> Self {
        let byte_len: u64 = data.len().try_into().unwrap_or(u64::MAX);
        Self {
            offset,
            data,
            byte_len,
        }
    }
}
/// Outcome of a `Storage::read` call.
#[derive(Debug)]
pub struct ReadResult {
    /// Message payloads returned by the read.
    pub messages: Vec<Bytes>,
    /// Offset a caller should resume reading from.
    pub next_offset: Offset,
    /// True when the read reached the current end of the stream.
    pub at_tail: bool,
    /// True when the stream has been closed to further appends.
    pub closed: bool,
}
/// Snapshot of a stream's state, as returned by `Storage::head`.
#[derive(Debug, Clone)]
pub struct StreamMetadata {
    /// Configuration the stream was created with.
    pub config: StreamConfig,
    /// Offset the next appended message will receive.
    pub next_offset: Offset,
    /// True when the stream no longer accepts appends.
    pub closed: bool,
    /// Total payload bytes stored in the stream.
    pub total_bytes: u64,
    /// Number of messages stored in the stream.
    pub message_count: u64,
    /// Creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
}
/// Result of attempting to create a stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CreateStreamResult {
    /// The stream did not exist and was created.
    Created,
    /// A stream with that name already existed.
    AlreadyExists,
}
/// Result of `Storage::create_stream_with_data`.
#[derive(Debug)]
pub struct CreateWithDataResult {
    /// Whether the stream was newly created or already existed.
    pub status: CreateStreamResult,
    /// Offset the next appended message will receive.
    pub next_offset: Offset,
    /// True when the stream is closed.
    pub closed: bool,
}
/// Result of an idempotent append via `Storage::append_with_producer`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProducerAppendResult {
    /// The batch was accepted and applied.
    Accepted {
        epoch: u64,
        seq: u64,
        next_offset: Offset,
        closed: bool,
    },
    /// The (epoch, seq) pair was at or below the last accepted state:
    /// the batch is treated as a retry of already-applied work.
    Duplicate {
        epoch: u64,
        seq: u64,
        next_offset: Offset,
        closed: bool,
    },
}
/// Retention window for per-producer idempotency state: 7 days, in seconds.
/// Entries older than this are dropped by `cleanup_stale_producers`.
pub(crate) const PRODUCER_STATE_TTL_SECS: i64 = 7 * 24 * 60 * 60;
/// Capacity of each stream's broadcast notification channel
/// (see `Storage::subscribe`).
pub(crate) const NOTIFY_CHANNEL_CAPACITY: usize = 16;
/// Last-known state of one producer on one stream, used by
/// `check_producer` for epoch fencing and duplicate detection.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub(crate) struct ProducerState {
    /// Highest producer epoch seen so far.
    pub epoch: u64,
    /// Last sequence number accepted within `epoch`.
    pub last_seq: u64,
    /// When this state was last updated; entries older than
    /// `PRODUCER_STATE_TTL_SECS` are eligible for cleanup.
    pub updated_at: DateTime<Utc>,
}
/// Verdict produced by `check_producer`.
pub(crate) enum ProducerCheck {
    /// The append is new work and should be applied.
    Accept,
    /// The append is a retry of already-applied work; report the stored
    /// epoch and last accepted seq instead of writing again.
    Duplicate { epoch: u64, seq: u64 },
}
/// True when the stream's absolute expiry instant has passed.
/// A config without `expires_at` never expires by this check.
pub(crate) fn is_stream_expired(config: &StreamConfig) -> bool {
    match config.expires_at {
        Some(expires_at) => Utc::now() >= expires_at,
        None => false,
    }
}
pub(crate) fn validate_content_type(stream_ct: &str, request_ct: &str) -> Result<()> {
if !request_ct.eq_ignore_ascii_case(stream_ct) {
return Err(Error::ContentTypeMismatch {
expected: stream_ct.to_string(),
actual: request_ct.to_string(),
});
}
Ok(())
}
pub(crate) fn validate_seq(
last_seq: Option<&str>,
new_seq: Option<&str>,
) -> Result<Option<String>> {
if let Some(new) = new_seq {
if let Some(last) = last_seq
&& new <= last
{
return Err(Error::SeqOrderingViolation {
last: last.to_string(),
received: new.to_string(),
});
}
return Ok(Some(new.to_string()));
}
Ok(None)
}
/// Drops producer-state entries not updated within
/// `PRODUCER_STATE_TTL_SECS`, retaining everything newer in place.
pub(crate) fn cleanup_stale_producers(producers: &mut HashMap<String, ProducerState>) {
    let ttl = chrono::TimeDelta::try_seconds(PRODUCER_STATE_TTL_SECS)
        .expect("7 days fits in TimeDelta");
    let cutoff = Utc::now() - ttl;
    producers.retain(|_, state| state.updated_at > cutoff);
}
/// Validates a producer's (epoch, seq) pair against its stored state,
/// enforcing epoch fencing, duplicate detection, and gap-free sequencing.
///
/// The duplicate check deliberately runs *before* the closed-stream check,
/// so retries of already-applied batches still resolve idempotently even
/// after the stream has closed.
///
/// # Errors
/// `EpochFenced` for a stale epoch, `StreamClosed` for new work on a
/// closed stream, `InvalidProducerState` for a new epoch not starting at
/// seq 0, and `SequenceGap` for a skipped sequence number.
pub(crate) fn check_producer(
    existing: Option<&ProducerState>,
    producer: &ProducerHeaders,
    stream_closed: bool,
) -> Result<ProducerCheck> {
    if let Some(state) = existing {
        // An older epoch means this producer instance has been fenced out
        // by a newer one.
        if producer.epoch < state.epoch {
            return Err(Error::EpochFenced {
                current: state.epoch,
                received: producer.epoch,
            });
        }
        // Same epoch with seq at or below the last accepted: a retry.
        if producer.epoch == state.epoch && producer.seq <= state.last_seq {
            return Ok(ProducerCheck::Duplicate {
                epoch: state.epoch,
                seq: state.last_seq,
            });
        }
        // Only genuinely new work is rejected on a closed stream.
        if stream_closed {
            return Err(Error::StreamClosed);
        }
        if producer.epoch > state.epoch {
            // A fresh epoch must restart its sequence at 0.
            if producer.seq != 0 {
                return Err(Error::InvalidProducerState(
                    "new epoch must start at seq 0".to_string(),
                ));
            }
        } else if producer.seq > state.last_seq + 1 {
            // Same epoch: the next seq must be exactly last_seq + 1.
            return Err(Error::SequenceGap {
                expected: state.last_seq + 1,
                actual: producer.seq,
            });
        }
    } else {
        // First time this producer is seen on this stream.
        if stream_closed {
            return Err(Error::StreamClosed);
        }
        // An unknown producer must begin at seq 0.
        if producer.seq != 0 {
            return Err(Error::SequenceGap {
                expected: 0,
                actual: producer.seq,
            });
        }
    }
    Ok(ProducerCheck::Accept)
}
/// Backend-agnostic stream storage interface, implemented by the `memory`,
/// `file`, and `acid` submodules of this module.
#[allow(clippy::missing_errors_doc)]
pub trait Storage: Send + Sync {
    /// Creates a stream with `config`; reports whether it already existed.
    fn create_stream(&self, name: &str, config: StreamConfig) -> Result<CreateStreamResult>;
    /// Appends a single message, returning the resulting offset.
    fn append(&self, name: &str, data: Bytes, content_type: &str) -> Result<Offset>;
    /// Appends a batch of messages; `seq` is an optional ordering token
    /// (see `validate_seq`). Returns the resulting offset.
    fn batch_append(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        seq: Option<&str>,
    ) -> Result<ReadResult> // NOTE(review): placeholder line removed below
        ;
    /// Reads messages starting at `from_offset`.
    fn read(&self, name: &str, from_offset: &Offset) -> Result<ReadResult>;
    /// Deletes the stream and its stored data.
    fn delete(&self, name: &str) -> Result<()>;
    /// Returns the stream's metadata without reading message payloads.
    fn head(&self, name: &str) -> Result<StreamMetadata>;
    /// Closes the stream to further appends.
    fn close_stream(&self, name: &str) -> Result<()>;
    /// Idempotent batch append guarded by producer (epoch, seq) state
    /// (see `check_producer`); closes the stream when `should_close` is set.
    fn append_with_producer(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        producer: &ProducerHeaders,
        should_close: bool,
        seq: Option<&str>,
    ) -> Result<ProducerAppendResult>;
    /// Creates a stream and writes an initial batch in one call; the
    /// stream is left closed when `should_close` is set.
    fn create_stream_with_data(
        &self,
        name: &str,
        config: StreamConfig,
        messages: Vec<Bytes>,
        should_close: bool,
    ) -> Result<CreateWithDataResult>;
    /// True when a stream with `name` exists.
    fn exists(&self, name: &str) -> bool;
    /// Subscribes to the stream's notification channel; `None` presumably
    /// means the stream does not exist — confirm against the backends.
    fn subscribe(&self, name: &str) -> Option<broadcast::Receiver<()>>;
    /// Removes expired streams, returning how many were cleaned up.
    fn cleanup_expired_streams(&self) -> usize;
}