pub trait Storage: Send + Sync {
    // Required methods
    fn create_stream(
        &self,
        name: &str,
        config: StreamConfig,
    ) -> Result<CreateStreamResult>;
    fn append(
        &self,
        name: &str,
        data: Bytes,
        content_type: &str,
    ) -> Result<Offset>;
    fn batch_append(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        seq: Option<&str>,
    ) -> Result<Offset>;
    fn read(&self, name: &str, from_offset: &Offset) -> Result<ReadResult>;
    fn delete(&self, name: &str) -> Result<()>;
    fn head(&self, name: &str) -> Result<StreamMetadata>;
    fn close_stream(&self, name: &str) -> Result<()>;
    fn append_with_producer(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        producer: &ProducerHeaders,
        should_close: bool,
        seq: Option<&str>,
    ) -> Result<ProducerAppendResult>;
    fn create_stream_with_data(
        &self,
        name: &str,
        config: StreamConfig,
        messages: Vec<Bytes>,
        should_close: bool,
    ) -> Result<CreateWithDataResult>;
    fn exists(&self, name: &str) -> bool;
    fn subscribe(&self, name: &str) -> Option<Receiver<()>>;
    fn cleanup_expired_streams(&self) -> usize;
}
Persistence contract for Durable Streams server state.
Methods are intentionally synchronous. The server keeps async boundaries in the HTTP and notification layers so storage implementations can focus on atomicity, ordering, and recovery.
Implementations are expected to preserve these invariants:
- per-stream offsets are monotonic
- create, append, close, and delete are atomic at the stream level
- duplicate producer appends are idempotent
- reads observe a coherent snapshot
- expired streams behave as if they no longer exist
Implementations must also be thread-safe (Send + Sync), because the axum
server shares them across request handlers.
Error conditions are documented inline rather than in separate sections to avoid repetitive documentation on internal trait methods.
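The thread-safety and monotonic-offset invariants can be illustrated with a minimal sketch. It assumes a hypothetical one-method trait (`MiniStorage`) and an in-memory backend (`InMemory`) that are not part of this crate; offsets are modeled as plain `u64` indexes, which the real `Offset` type may not be.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical, simplified stand-in for the real trait: one method only.
trait MiniStorage: Send + Sync {
    /// Appends a message and returns the offset assigned to it.
    fn append(&self, name: &str, data: Vec<u8>) -> u64;
}

#[derive(Default)]
struct InMemory {
    // A single lock guards all streams, so each append is atomic.
    streams: Mutex<HashMap<String, Vec<Vec<u8>>>>,
}

impl MiniStorage for InMemory {
    fn append(&self, name: &str, data: Vec<u8>) -> u64 {
        let mut streams = self.streams.lock().unwrap();
        let log = streams.entry(name.to_string()).or_default();
        log.push(data);
        // The offset is the index of the message just pushed.
        (log.len() - 1) as u64
    }
}

/// Appends from `writers` threads and returns every assigned offset, sorted.
fn concurrent_offsets(writers: usize, per_writer: usize) -> Vec<u64> {
    let storage: Arc<dyn MiniStorage> = Arc::new(InMemory::default());
    let handles: Vec<_> = (0..writers)
        .map(|_| {
            let storage = Arc::clone(&storage);
            thread::spawn(move || {
                (0..per_writer)
                    .map(|_| storage.append("events", b"x".to_vec()))
                    .collect::<Vec<u64>>()
            })
        })
        .collect();
    let mut all: Vec<u64> = handles
        .into_iter()
        .flat_map(|h| h.join().unwrap())
        .collect();
    all.sort_unstable();
    all
}
```

Because the `Send + Sync` supertraits make `Arc<dyn MiniStorage>` shareable across threads, every writer sees the same lock, and no offset is handed out twice or skipped.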
Required Methods
fn create_stream(
&self,
name: &str,
config: StreamConfig,
) -> Result<CreateStreamResult>
Create a stream entry with immutable configuration.
Returns whether the stream was newly created or already existed with matching configuration.
Returns Err(Error::ConfigMismatch) if the stream already exists with a different configuration.
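The idempotent-create behavior can be sketched as follows. All types here are hypothetical stand-ins: the real `StreamConfig`, `CreateStreamResult`, and `Error` likely carry more fields and variants, and the real method takes `&self` rather than `&mut self`.

```rust
use std::collections::HashMap;

// Hypothetical, reduced versions of the crate's types.
#[derive(Clone, Debug, PartialEq)]
struct StreamConfig {
    content_type: String,
}

#[derive(Debug, PartialEq)]
enum CreateStreamResult {
    Created,
    AlreadyExists,
}

#[derive(Debug, PartialEq)]
enum Error {
    ConfigMismatch,
}

#[derive(Default)]
struct Registry {
    configs: HashMap<String, StreamConfig>,
}

impl Registry {
    fn create_stream(
        &mut self,
        name: &str,
        config: StreamConfig,
    ) -> Result<CreateStreamResult, Error> {
        match self.configs.get(name) {
            // Same name, same config: report the existing stream.
            Some(existing) if *existing == config => Ok(CreateStreamResult::AlreadyExists),
            // Same name, different config: the stored config is immutable.
            Some(_) => Err(Error::ConfigMismatch),
            None => {
                self.configs.insert(name.to_string(), config);
                Ok(CreateStreamResult::Created)
            }
        }
    }
}
```

Comparing the full configuration by value keeps a retried create harmless while still rejecting an attempt to redefine the stream.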
fn append(&self, name: &str, data: Bytes, content_type: &str) -> Result<Offset>
Append one message to an existing stream.
Generates and returns the offset assigned to the appended message. Offsets must remain monotonically increasing within a stream.
Returns Err(Error::StreamClosed) if the stream is closed.
Returns Err(Error::ContentTypeMismatch) if the content type doesn't match the stream's.
fn batch_append(
&self,
name: &str,
messages: Vec<Bytes>,
content_type: &str,
seq: Option<&str>,
) -> Result<Offset>
Append a batch of messages as one atomic operation.
All messages are validated and committed together: either every message is appended, or none is. Returns the next offset, that is, the offset that will be assigned to the first message appended after this batch.
If seq is Some, validates lexicographic ordering against the
stream’s last seq and updates it on success.
Returns Err(Error::StreamClosed) if the stream is closed.
Returns Err(Error::ContentTypeMismatch) if the content type doesn't match the stream's.
Returns Err(Error::SeqOrderingViolation) if seq <= the stream's last seq.
Returns Err(Error::MemoryLimitExceeded) if the batch would exceed memory limits.
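One way to get all-or-nothing behavior is to validate everything first and mutate state only afterwards. The sketch below assumes a hypothetical `Stream` struct with a per-stream byte limit, models offsets as message counts, and checks the error conditions in an order chosen for illustration; none of this is confirmed by the crate.

```rust
#[derive(Debug, PartialEq)]
enum Error {
    StreamClosed,
    ContentTypeMismatch,
    SeqOrderingViolation,
    MemoryLimitExceeded,
}

// Hypothetical in-memory stream state.
struct Stream {
    content_type: String,
    closed: bool,
    last_seq: Option<String>,
    messages: Vec<Vec<u8>>,
    bytes_used: usize,
    byte_limit: usize, // assumed per-stream limit
}

impl Stream {
    fn new(content_type: &str, byte_limit: usize) -> Self {
        Stream {
            content_type: content_type.to_string(),
            closed: false,
            last_seq: None,
            messages: Vec::new(),
            bytes_used: 0,
            byte_limit,
        }
    }

    /// Returns the next offset (here: the message count after the batch).
    fn batch_append(
        &mut self,
        batch: Vec<Vec<u8>>,
        content_type: &str,
        seq: Option<&str>,
    ) -> Result<u64, Error> {
        // Phase 1: validate without touching any state.
        if self.closed {
            return Err(Error::StreamClosed);
        }
        if content_type != self.content_type {
            return Err(Error::ContentTypeMismatch);
        }
        if let (Some(new), Some(last)) = (seq, self.last_seq.as_deref()) {
            // `str` comparison is lexicographic by bytes.
            if new <= last {
                return Err(Error::SeqOrderingViolation);
            }
        }
        let batch_bytes: usize = batch.iter().map(Vec::len).sum();
        if self.bytes_used + batch_bytes > self.byte_limit {
            return Err(Error::MemoryLimitExceeded);
        }
        // Phase 2: commit. Nothing below can fail.
        self.messages.extend(batch);
        self.bytes_used += batch_bytes;
        if let Some(new) = seq {
            self.last_seq = Some(new.to_string());
        }
        Ok(self.messages.len() as u64)
    }
}
```

Note that lexicographic ordering means `"10"` sorts before `"9"`, so callers in this sketch would need fixed-width seq values such as `"0009"` and `"0010"`.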
fn read(&self, name: &str, from_offset: &Offset) -> Result<ReadResult>
Read from a stream starting at from_offset.
Offset::start() reads from the beginning of the stream.
Offset::now() positions the caller at the current tail and returns
an empty catch-up result.
Returns Err(Error::NotFound) if the stream doesn't exist.
Returns Err(Error::InvalidOffset) if the offset is invalid.
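The difference between the two sentinel offsets can be sketched with a simplified model. Here `Offset` is a hypothetical three-variant enum and `ReadResult` a hypothetical two-field struct; the real types are opaque in this documentation. Treating an offset past the tail as invalid is also an assumption.

```rust
// Hypothetical offset model: the start sentinel, the tail sentinel,
// or a concrete position.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Offset {
    Start,
    Now,
    At(u64),
}

#[derive(Debug, PartialEq)]
enum Error {
    InvalidOffset,
}

#[derive(Debug, PartialEq)]
struct ReadResult {
    messages: Vec<Vec<u8>>,
    next_offset: u64,
}

/// Reads every message at or after `from` from an in-memory log.
fn read(log: &[Vec<u8>], from: &Offset) -> Result<ReadResult, Error> {
    let tail = log.len() as u64;
    let start = match *from {
        Offset::Start => 0,
        // "Now" starts at the tail, so the catch-up result is empty.
        Offset::Now => tail,
        Offset::At(n) if n <= tail => n,
        // Assumption: positions beyond the tail are rejected.
        Offset::At(_) => return Err(Error::InvalidOffset),
    };
    Ok(ReadResult {
        messages: log[start as usize..].to_vec(),
        next_offset: tail,
    })
}
```

A caller that reads with the tail sentinel gets no messages but does learn `next_offset`, which it can pass to a later read to pick up only new data.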
fn delete(&self, name: &str) -> Result<()>
Delete a stream and all of its persisted data.
Returns Ok(()) on successful deletion.
Returns Err(Error::NotFound) if the stream doesn't exist.
fn head(&self, name: &str) -> Result<StreamMetadata>
Return stream metadata without reading message bodies.
Returns Err(Error::NotFound) if the stream doesn't exist.
fn close_stream(&self, name: &str) -> Result<()>
Mark a stream closed so that future appends are rejected.
Returns Ok(()) if the stream is already closed (idempotent).
Returns Err(Error::NotFound) if the stream doesn't exist.
fn append_with_producer(
&self,
name: &str,
messages: Vec<Bytes>,
content_type: &str,
producer: &ProducerHeaders,
should_close: bool,
seq: Option<&str>,
) -> Result<ProducerAppendResult>
Append with idempotent producer sequencing.
Validates the producer epoch and sequence number, appends the data if accepted, and optionally closes the stream, all while holding a single lock.
Returns ProducerAppendResult::Accepted for new data (200 OK).
Returns ProducerAppendResult::Duplicate for already-seen seq (204).
Returns Err(EpochFenced) if epoch < current (403).
Returns Err(SequenceGap) if seq > expected (409).
Returns Err(InvalidProducerState) if epoch bump with seq != 0 (400).
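The outcomes listed above can be read as a small state machine over (epoch, seq) pairs. The sketch below is one interpretation, not the crate's implementation: `ProducerState` and `Outcome` are hypothetical names, epochs and sequence numbers are assumed to be `u64`, and a producer's first append (or first append in a new epoch) is assumed to use seq 0.

```rust
#[derive(Debug, PartialEq)]
enum Outcome {
    Accepted,
    Duplicate,
}

#[derive(Debug, PartialEq)]
enum Error {
    EpochFenced,
    SequenceGap,
    InvalidProducerState,
}

/// Last accepted (epoch, seq) for one producer; `None` before its first append.
#[derive(Default)]
struct ProducerState {
    last: Option<(u64, u64)>,
}

impl ProducerState {
    fn check(&mut self, epoch: u64, seq: u64) -> Result<Outcome, Error> {
        let expected = match self.last {
            // Assumption: a brand-new producer starts at seq 0.
            None => 0,
            // An older epoch has been superseded.
            Some((current, _)) if epoch < current => return Err(Error::EpochFenced),
            // An epoch bump must restart the sequence at 0.
            Some((current, _)) if epoch > current => {
                if seq != 0 {
                    return Err(Error::InvalidProducerState);
                }
                0
            }
            // Same epoch: the next seq follows the last accepted one.
            Some((_, last_seq)) => last_seq + 1,
        };
        if seq < expected {
            // Already seen: report success without appending again.
            return Ok(Outcome::Duplicate);
        }
        if seq > expected {
            return Err(Error::SequenceGap);
        }
        self.last = Some((epoch, seq));
        Ok(Outcome::Accepted)
    }
}
```

In a real backend this check, the append, and the optional close would all run under the same lock, so that a retry racing with the original request cannot append twice.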
fn create_stream_with_data(
&self,
name: &str,
config: StreamConfig,
messages: Vec<Bytes>,
should_close: bool,
) -> Result<CreateWithDataResult>
Atomically create a stream, optionally seed it with data, and optionally close it.
Creates the stream, appends messages (if non-empty), and closes
(if should_close) — all before the entry becomes visible to other
operations. If commit_messages fails (e.g. memory limit), the
stream is never created.
For idempotent recreates (AlreadyExists), the body and close
flag are ignored and existing metadata is returned.
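A common way to keep a half-built stream invisible is to assemble the entry off to the side and insert it into the shared map only once every step has succeeded. The following sketch assumes hypothetical `Store` and `Entry` types and a hypothetical shape for `CreateWithDataResult`; it omits `StreamConfig` (and therefore the config-mismatch check) for brevity.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum Error {
    MemoryLimitExceeded,
}

// Hypothetical result shape; the real type may carry full metadata.
#[derive(Debug, PartialEq)]
enum CreateWithDataResult {
    Created { next_offset: u64 },
    AlreadyExists { next_offset: u64 },
}

struct Entry {
    messages: Vec<Vec<u8>>,
    closed: bool,
}

struct Store {
    streams: HashMap<String, Entry>,
    byte_limit: usize, // assumed limit on a single stream's payload
}

impl Store {
    fn create_stream_with_data(
        &mut self,
        name: &str,
        messages: Vec<Vec<u8>>,
        should_close: bool,
    ) -> Result<CreateWithDataResult, Error> {
        if let Some(existing) = self.streams.get(name) {
            // Idempotent recreate: the body and close flag are ignored.
            return Ok(CreateWithDataResult::AlreadyExists {
                next_offset: existing.messages.len() as u64,
            });
        }
        // Build the entry locally; a failure here leaves the map untouched.
        let total: usize = messages.iter().map(Vec::len).sum();
        if total > self.byte_limit {
            return Err(Error::MemoryLimitExceeded);
        }
        let entry = Entry { messages, closed: should_close };
        let next_offset = entry.messages.len() as u64;
        // Publish the fully initialized entry in one step.
        self.streams.insert(name.to_string(), entry);
        Ok(CreateWithDataResult::Created { next_offset })
    }
}
```

Since the insert is the last step, other operations either see no stream at all or see one that already has its data and its closed flag.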
fn subscribe(&self, name: &str) -> Option<Receiver<()>>
Subscribe to notifications for new data on a stream.
Returns a broadcast receiver that fires when data is appended
or the stream is closed. Returns None if the stream does not
exist or has expired.
The method itself is sync; the handler awaits on the receiver.
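The subscribe-then-notify pattern can be sketched with the standard library alone. This is a stand-in, not the crate's mechanism: the text above describes a broadcast receiver that an async handler awaits, whereas this sketch fans out over one `std::sync::mpsc` channel per subscriber. `Notifier`, `create`, and `notify` are hypothetical names.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

// Hypothetical per-stream subscriber registry.
#[derive(Default)]
struct Notifier {
    subscribers: HashMap<String, Vec<Sender<()>>>,
}

impl Notifier {
    /// Registers a stream so that it can be subscribed to.
    fn create(&mut self, name: &str) {
        self.subscribers.entry(name.to_string()).or_default();
    }

    /// Returns `None` when the stream is unknown.
    fn subscribe(&mut self, name: &str) -> Option<Receiver<()>> {
        let senders = self.subscribers.get_mut(name)?;
        let (tx, rx) = channel();
        senders.push(tx);
        Some(rx)
    }

    /// Called after an append or a close; wakes every live subscriber.
    fn notify(&mut self, name: &str) {
        if let Some(senders) = self.subscribers.get_mut(name) {
            // `send` fails once the receiver is dropped, so prune those.
            senders.retain(|tx| tx.send(()).is_ok());
        }
    }
}
```

The notification carries no payload (`()`), so a woken subscriber is expected to call `read` again from its last known offset to fetch the new data.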
fn cleanup_expired_streams(&self) -> usize
Proactively remove all expired streams, returning the count deleted.
By default, expired streams are only cleaned up lazily when accessed. This method sweeps all streams and deletes any that have expired, reclaiming their resources immediately.
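The contrast between lazy cleanup and a proactive sweep can be sketched as follows. `Store` is a hypothetical type that tracks only an expiry deadline per stream, and both methods take an explicit `now` argument so the behavior is easy to test; the real trait methods take `&self` and no clock parameter.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Hypothetical store: stream name -> expiry deadline.
struct Store {
    expires_at: HashMap<String, Instant>,
}

impl Store {
    /// Lazy path: an expired stream is removed only when it is accessed.
    fn exists(&mut self, name: &str, now: Instant) -> bool {
        let expired = matches!(
            self.expires_at.get(name),
            Some(deadline) if *deadline <= now
        );
        if expired {
            self.expires_at.remove(name);
            return false;
        }
        self.expires_at.contains_key(name)
    }

    /// Proactive path: sweep every stream and return how many were removed.
    fn cleanup_expired_streams(&mut self, now: Instant) -> usize {
        let before = self.expires_at.len();
        self.expires_at.retain(|_, deadline| *deadline > now);
        before - self.expires_at.len()
    }
}

/// Builds a store with two short-lived streams and one long-lived stream.
fn sample_store(t0: Instant) -> Store {
    let mut expires_at = HashMap::new();
    expires_at.insert("a".to_string(), t0 + Duration::from_secs(1));
    expires_at.insert("b".to_string(), t0 + Duration::from_secs(1));
    expires_at.insert("c".to_string(), t0 + Duration::from_secs(60));
    Store { expires_at }
}
```

With lazy cleanup alone, a stream that is never accessed again would keep its resources indefinitely, which is the case a periodic sweep is meant to cover.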