pub trait Storage: Send + Sync {
    // Required methods
    fn create_stream(
        &self,
        name: &str,
        config: StreamConfig,
    ) -> Result<CreateStreamResult>;
    fn append(
        &self,
        name: &str,
        data: Bytes,
        content_type: &str,
    ) -> Result<Offset>;
    fn batch_append(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        seq: Option<&str>,
    ) -> Result<Offset>;
    fn read(&self, name: &str, from_offset: &Offset) -> Result<ReadResult>;
    fn delete(&self, name: &str) -> Result<()>;
    fn head(&self, name: &str) -> Result<StreamMetadata>;
    fn close_stream(&self, name: &str) -> Result<()>;
    fn append_with_producer(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        producer: &ProducerHeaders,
        should_close: bool,
        seq: Option<&str>,
    ) -> Result<ProducerAppendResult>;
    fn create_stream_with_data(
        &self,
        name: &str,
        config: StreamConfig,
        messages: Vec<Bytes>,
        should_close: bool,
    ) -> Result<CreateWithDataResult>;
    fn exists(&self, name: &str) -> bool;
    fn subscribe(&self, name: &str) -> Option<Receiver<()>>;
}
Storage trait for stream persistence
Methods are intentionally sync (not async) to keep the core logic simple. Async boundaries are at the handler layer and notification layer.
Implementations must be thread-safe (Send + Sync).
Error conditions are documented inline rather than in separate sections to avoid repetitive documentation on internal trait methods.
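As a rough illustration of why sync `&self` methods combined with `Send + Sync` are enough for concurrent use, the sketch below shares one storage instance across threads. `MiniStorage`, `InMemory`, and `concurrent_appends` are hypothetical stand-ins invented for this example, not part of the crate, and the `Mutex`-based layout is only an assumption about how an implementation might be built.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical, simplified stand-in for the real trait: sync methods on
// `&self`, with `Send + Sync` required so one instance can be shared.
pub trait MiniStorage: Send + Sync {
    fn append(&self, name: &str, data: Vec<u8>) -> u64;
    fn message_count(&self, name: &str) -> usize;
}

// Interior mutability behind a Mutex lets `&self` methods mutate state.
#[derive(Default)]
pub struct InMemory {
    streams: Mutex<HashMap<String, Vec<Vec<u8>>>>,
}

impl MiniStorage for InMemory {
    fn append(&self, name: &str, data: Vec<u8>) -> u64 {
        let mut streams = self.streams.lock().unwrap();
        let messages = streams.entry(name.to_string()).or_default();
        messages.push(data);
        messages.len() as u64
    }

    fn message_count(&self, name: &str) -> usize {
        let streams = self.streams.lock().unwrap();
        streams.get(name).map_or(0, |m| m.len())
    }
}

// Share one storage across several threads and count what was appended.
pub fn concurrent_appends(threads: usize, per_thread: usize) -> usize {
    let storage: Arc<dyn MiniStorage> = Arc::new(InMemory::default());
    let handles: Vec<_> = (0..threads)
        .map(|t| {
            let storage = Arc::clone(&storage);
            thread::spawn(move || {
                for i in 0..per_thread {
                    storage.append("events", vec![t as u8, i as u8]);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    storage.message_count("events")
}
```

An async handler could hold the same `Arc` and call these methods directly, which is one way to keep the async boundary outside the storage layer.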
Required Methods
fn create_stream(
    &self,
    name: &str,
    config: StreamConfig,
) -> Result<CreateStreamResult>
Create a new stream
Returns whether the stream was newly created or already existed.
Returns Err(Error::ConfigMismatch) if stream exists with different config.
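The create-or-confirm behaviour described above might look roughly like the following. Every type here is a simplified stand-in: the variant names `Created` and `AlreadyExists` and the single `content_type` field on `StreamConfig` are assumptions made for illustration, not the crate's actual definitions.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
pub enum Error {
    ConfigMismatch,
}

// Hypothetical variants; the real CreateStreamResult may differ.
#[derive(Debug, PartialEq)]
pub enum CreateStreamResult {
    Created,
    AlreadyExists,
}

// Hypothetical config with a single field, enough to show the comparison.
#[derive(Debug, Clone, PartialEq)]
pub struct StreamConfig {
    pub content_type: String,
}

#[derive(Default)]
pub struct Store {
    pub configs: HashMap<String, StreamConfig>,
}

impl Store {
    pub fn create_stream(
        &mut self,
        name: &str,
        config: StreamConfig,
    ) -> Result<CreateStreamResult, Error> {
        // An existing stream is fine only if its config is identical.
        if let Some(existing) = self.configs.get(name) {
            return if *existing == config {
                Ok(CreateStreamResult::AlreadyExists)
            } else {
                Err(Error::ConfigMismatch)
            };
        }
        self.configs.insert(name.to_string(), config);
        Ok(CreateStreamResult::Created)
    }
}
```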
fn append(&self, name: &str, data: Bytes, content_type: &str) -> Result<Offset>
Append data to a stream
Generates and returns the offset for this message. Offsets must be monotonically increasing.
Returns Err(Error::StreamClosed) if stream is closed.
Returns Err(Error::ContentTypeMismatch) if content type doesn’t match.
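A minimal sketch of the append contract, assuming offsets are a simple per-stream counter. The real `Offset` type may be encoded differently, and the order in which the two checks run is also an assumption; `Stream` is a hypothetical stand-in for whatever an implementation keeps per stream.

```rust
#[derive(Debug, PartialEq)]
pub enum Error {
    StreamClosed,
    ContentTypeMismatch,
}

// Assumed representation: a plain counter, so ordering is numeric.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Offset(pub u64);

pub struct Stream {
    pub content_type: String,
    pub closed: bool,
    pub messages: Vec<Vec<u8>>,
    pub next_offset: u64,
}

impl Stream {
    pub fn new(content_type: &str) -> Self {
        Stream {
            content_type: content_type.to_string(),
            closed: false,
            messages: Vec::new(),
            next_offset: 0,
        }
    }

    pub fn append(&mut self, data: Vec<u8>, content_type: &str) -> Result<Offset, Error> {
        // Reject before mutating anything, so a failed append leaves no trace.
        if self.closed {
            return Err(Error::StreamClosed);
        }
        if self.content_type != content_type {
            return Err(Error::ContentTypeMismatch);
        }
        // Each accepted message takes the next counter value, which keeps
        // offsets strictly increasing.
        let offset = Offset(self.next_offset);
        self.next_offset += 1;
        self.messages.push(data);
        Ok(offset)
    }
}
```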
fn batch_append(
    &self,
    name: &str,
    messages: Vec<Bytes>,
    content_type: &str,
    seq: Option<&str>,
) -> Result<Offset>
Append multiple messages atomically
All messages are validated and committed as a single atomic operation. Either all messages are appended successfully, or none are. Returns the next offset (the offset that will be assigned to the next message appended after this batch).
If seq is Some, validates lexicographic ordering against the stream’s last seq and updates it on success.
Returns Err(Error::StreamClosed) if stream is closed.
Returns Err(Error::ContentTypeMismatch) if content type doesn’t match.
Returns Err(Error::SeqOrderingViolation) if seq <= last seq.
Returns Err(Error::MemoryLimitExceeded) if batch would exceed limits.
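One way to get the all-or-nothing behaviour is to validate the whole batch first and mutate state only afterwards. The sketch below assumes a byte budget per stream (`max_bytes`) and uses the message count as the "next offset"; both are illustrative assumptions, and `Stream` is a hypothetical stand-in type.

```rust
#[derive(Debug, PartialEq)]
pub enum Error {
    SeqOrderingViolation,
    MemoryLimitExceeded,
}

pub struct Stream {
    pub messages: Vec<Vec<u8>>,
    pub bytes_used: usize,
    pub max_bytes: usize, // hypothetical per-stream budget
    pub last_seq: Option<String>,
}

impl Stream {
    pub fn new(max_bytes: usize) -> Self {
        Stream { messages: Vec::new(), bytes_used: 0, max_bytes, last_seq: None }
    }

    pub fn batch_append(&mut self, batch: Vec<Vec<u8>>, seq: Option<&str>) -> Result<u64, Error> {
        // Phase 1: validate everything without touching state.
        if let (Some(new), Some(last)) = (seq, self.last_seq.as_deref()) {
            // `&str` comparison is lexicographic (byte-wise).
            if new <= last {
                return Err(Error::SeqOrderingViolation);
            }
        }
        let batch_bytes: usize = batch.iter().map(|m| m.len()).sum();
        if self.bytes_used + batch_bytes > self.max_bytes {
            return Err(Error::MemoryLimitExceeded);
        }
        // Phase 2: commit. Nothing below can fail.
        self.bytes_used += batch_bytes;
        self.messages.extend(batch);
        if let Some(new) = seq {
            self.last_seq = Some(new.to_string());
        }
        // Next offset: where the following append would land.
        Ok(self.messages.len() as u64)
    }
}
```

Because the comparison is lexicographic, "10" sorts before "9"; callers that use numeric sequence values would need fixed-width, zero-padded strings such as "0009" and "0010".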
fn read(&self, name: &str, from_offset: &Offset) -> Result<ReadResult>
Read messages from a stream starting at offset
If from_offset is Offset::start(), reads from beginning.
If from_offset is Offset::now(), returns empty result at tail.
Returns Err(Error::NotFound) if stream doesn’t exist.
Returns Err(Error::InvalidOffset) if offset is invalid.
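The offset handling described above can be sketched as follows. The `Offset` enum here is a hypothetical stand-in for the real type (which exposes `Offset::start()` and `Offset::now()`), and treating an offset past the tail as invalid is an assumption; the source does not say what makes an offset invalid.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
pub enum Error {
    NotFound,
    InvalidOffset,
}

// Stand-in for the real Offset: beginning, tail, or an explicit position.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Offset {
    Start,
    Now,
    At(u64),
}

#[derive(Debug, PartialEq)]
pub struct ReadResult {
    pub messages: Vec<Vec<u8>>,
    pub next_offset: u64,
}

#[derive(Default)]
pub struct Store {
    pub streams: HashMap<String, Vec<Vec<u8>>>,
}

impl Store {
    pub fn read(&self, name: &str, from: &Offset) -> Result<ReadResult, Error> {
        let messages = self.streams.get(name).ok_or(Error::NotFound)?;
        let tail = messages.len() as u64;
        let start = match *from {
            Offset::Start => 0,
            // Reading from "now" yields nothing yet, positioned at the tail.
            Offset::Now => tail,
            Offset::At(n) if n <= tail => n,
            // Assumption: positions beyond the tail count as invalid.
            Offset::At(_) => return Err(Error::InvalidOffset),
        };
        Ok(ReadResult {
            messages: messages[start as usize..].to_vec(),
            next_offset: tail,
        })
    }
}
```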
fn delete(&self, name: &str) -> Result<()>
Delete a stream
Returns Ok(()) on successful deletion.
Returns Err(Error::NotFound) if stream doesn’t exist.
fn head(&self, name: &str) -> Result<StreamMetadata>
Get stream metadata
Returns Err(Error::NotFound) if stream doesn’t exist.
fn close_stream(&self, name: &str) -> Result<()>
Close a stream
Prevents further appends.
Returns Ok(()) if already closed (idempotent).
Returns Err(Error::NotFound) if stream doesn’t exist.
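Idempotent closing can be as small as setting a flag unconditionally. In this sketch `Store` tracks only a closed flag per stream, and `create` and `try_append` are hypothetical helpers added to make the effect of closing observable.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
pub enum Error {
    NotFound,
    StreamClosed,
}

// Stand-in store: stream name -> closed flag.
#[derive(Default)]
pub struct Store {
    pub closed: HashMap<String, bool>,
}

impl Store {
    // Hypothetical helper: register an open stream.
    pub fn create(&mut self, name: &str) {
        self.closed.entry(name.to_string()).or_insert(false);
    }

    pub fn close_stream(&mut self, name: &str) -> Result<(), Error> {
        let closed = self.closed.get_mut(name).ok_or(Error::NotFound)?;
        // Setting the flag again is harmless, which makes the call idempotent.
        *closed = true;
        Ok(())
    }

    // Hypothetical helper: would an append be accepted right now?
    // Returning NotFound for a missing stream is an assumption.
    pub fn try_append(&self, name: &str) -> Result<(), Error> {
        match self.closed.get(name).copied() {
            None => Err(Error::NotFound),
            Some(true) => Err(Error::StreamClosed),
            Some(false) => Ok(()),
        }
    }
}
```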
fn append_with_producer(
    &self,
    name: &str,
    messages: Vec<Bytes>,
    content_type: &str,
    producer: &ProducerHeaders,
    should_close: bool,
    seq: Option<&str>,
) -> Result<ProducerAppendResult>
Append messages with producer sequencing (atomic validation + append)
Validates producer epoch/sequence, appends data if accepted, and optionally closes the stream — all within a single lock hold.
Returns ProducerAppendResult::Accepted for new data (200 OK).
Returns ProducerAppendResult::Duplicate for already-seen seq (204).
Returns Err(EpochFenced) if epoch < current (403).
Returns Err(SequenceGap) if seq > expected (409).
Returns Err(InvalidProducerState) if epoch bump with seq != 0 (400).
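The epoch and sequence rules listed above can be read as a small state machine. The sketch below is one interpretation, assuming numeric epochs and sequence numbers and a producer whose state is already known; `ProducerState`, `Outcome`, and `check` are hypothetical names, and how the real implementation handles a producer it has never seen is not covered here.

```rust
#[derive(Debug, PartialEq)]
pub enum Error {
    EpochFenced,
    SequenceGap,
    InvalidProducerState,
}

#[derive(Debug, PartialEq)]
pub enum Outcome {
    Accepted,
    Duplicate,
}

// Per-producer bookkeeping: current epoch and the next sequence expected.
pub struct ProducerState {
    pub epoch: u64,
    pub next_seq: u64,
}

impl ProducerState {
    pub fn check(&mut self, epoch: u64, seq: u64) -> Result<Outcome, Error> {
        // An older epoch means a newer producer instance has taken over.
        if epoch < self.epoch {
            return Err(Error::EpochFenced);
        }
        // A new epoch must restart its sequence at zero.
        if epoch > self.epoch {
            if seq != 0 {
                return Err(Error::InvalidProducerState);
            }
            self.epoch = epoch;
            self.next_seq = 1;
            return Ok(Outcome::Accepted);
        }
        // Same epoch: compare against the expected sequence number.
        if seq < self.next_seq {
            return Ok(Outcome::Duplicate);
        }
        if seq > self.next_seq {
            return Err(Error::SequenceGap);
        }
        self.next_seq += 1;
        Ok(Outcome::Accepted)
    }
}
```

In a real implementation this check and the append itself would run under the same lock, so that no other writer can slip in between validation and commit.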
fn create_stream_with_data(
    &self,
    name: &str,
    config: StreamConfig,
    messages: Vec<Bytes>,
    should_close: bool,
) -> Result<CreateWithDataResult>
Atomically create a stream with optional initial data and close.
Creates the stream, appends messages (if non-empty), and closes it (if should_close), all before the entry becomes visible to other operations. If commit_messages fails (e.g. memory limit), the stream is never created.
For idempotent recreates (AlreadyExists), the body and close
flag are ignored and existing metadata is returned.
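A common way to keep a half-built stream invisible is to assemble it as a local value and insert it into the shared map only once every check has passed. The sketch below follows that idea with stand-in types; `Created`, `Store`, and the `max_bytes` limit are assumptions for illustration, and config comparison is left out.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
pub enum Error {
    MemoryLimitExceeded,
}

// Hypothetical stand-in for CreateWithDataResult.
#[derive(Debug, PartialEq)]
pub enum Created {
    New,
    AlreadyExists,
}

#[derive(Debug, Default)]
pub struct Stream {
    pub messages: Vec<Vec<u8>>,
    pub closed: bool,
}

pub struct Store {
    pub streams: HashMap<String, Stream>,
    pub max_bytes: usize, // hypothetical limit on initial data
}

impl Store {
    pub fn new(max_bytes: usize) -> Self {
        Store { streams: HashMap::new(), max_bytes }
    }

    pub fn create_with_data(
        &mut self,
        name: &str,
        messages: Vec<Vec<u8>>,
        should_close: bool,
    ) -> Result<Created, Error> {
        // Idempotent recreate: body and close flag are ignored.
        if self.streams.contains_key(name) {
            return Ok(Created::AlreadyExists);
        }
        // Validate before anything becomes visible.
        let total: usize = messages.iter().map(|m| m.len()).sum();
        if total > self.max_bytes {
            return Err(Error::MemoryLimitExceeded);
        }
        // Build the complete stream locally, then publish it in one step.
        let stream = Stream { messages, closed: should_close };
        self.streams.insert(name.to_string(), stream);
        Ok(Created::New)
    }
}
```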
fn subscribe(&self, name: &str) -> Option<Receiver<()>>
Subscribe to notifications for new data on a stream.
Returns a broadcast receiver that fires when data is appended or the stream is closed. Returns None if the stream does not exist or has expired.
The method itself is sync; the handler awaits on the receiver.
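To show the shape of the subscribe contract without pulling in an async runtime, the sketch below approximates a broadcast channel with one `std::sync::mpsc` channel per subscriber. This is a swapped-in technique, not the crate's actual channel type; `Notifier`, `register_stream`, and `notify` are hypothetical names.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

// Stand-in notifier: stream name -> senders for every live subscriber.
#[derive(Default)]
pub struct Notifier {
    subscribers: Mutex<HashMap<String, Vec<Sender<()>>>>,
}

impl Notifier {
    // Hypothetical helper: make a stream known to the notifier.
    pub fn register_stream(&self, name: &str) {
        let mut subs = self.subscribers.lock().unwrap();
        subs.entry(name.to_string()).or_default();
    }

    // Sync call: hands back a receiver, or None for an unknown stream.
    pub fn subscribe(&self, name: &str) -> Option<Receiver<()>> {
        let mut subs = self.subscribers.lock().unwrap();
        let senders = subs.get_mut(name)?;
        let (tx, rx) = channel();
        senders.push(tx);
        Some(rx)
    }

    // Called after an append or close: wake every subscriber and drop
    // senders whose receiver has gone away.
    pub fn notify(&self, name: &str) {
        let mut subs = self.subscribers.lock().unwrap();
        if let Some(senders) = subs.get_mut(name) {
            senders.retain(|tx| tx.send(()).is_ok());
        }
    }
}
```

The notification carries no payload; a woken subscriber would typically call `read` with its last known offset to fetch whatever arrived.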