Trait Storage 

pub trait Storage: Send + Sync {
    // Required methods
    fn create_stream(
        &self,
        name: &str,
        config: StreamConfig,
    ) -> Result<CreateStreamResult>;
    fn append(
        &self,
        name: &str,
        data: Bytes,
        content_type: &str,
    ) -> Result<Offset>;
    fn batch_append(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        seq: Option<&str>,
    ) -> Result<Offset>;
    fn read(&self, name: &str, from_offset: &Offset) -> Result<ReadResult>;
    fn delete(&self, name: &str) -> Result<()>;
    fn head(&self, name: &str) -> Result<StreamMetadata>;
    fn close_stream(&self, name: &str) -> Result<()>;
    fn append_with_producer(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        producer: &ProducerHeaders,
        should_close: bool,
        seq: Option<&str>,
    ) -> Result<ProducerAppendResult>;
    fn create_stream_with_data(
        &self,
        name: &str,
        config: StreamConfig,
        messages: Vec<Bytes>,
        should_close: bool,
    ) -> Result<CreateWithDataResult>;
    fn exists(&self, name: &str) -> bool;
    fn subscribe(&self, name: &str) -> Option<Receiver<()>>;
    fn cleanup_expired_streams(&self) -> usize;
}

Persistence contract for Durable Streams server state.

Methods are intentionally synchronous. The server keeps async boundaries in the HTTP and notification layers so storage implementations can focus on atomicity, ordering, and recovery.

Implementations are expected to preserve these invariants:

  • per-stream offsets are monotonic
  • create, append, close, and delete are atomic at the stream level
  • duplicate producer appends are idempotent
  • reads observe a coherent snapshot
  • expired streams behave as if they no longer exist

Implementations must also be thread-safe (Send + Sync), because the axum server shares them across request handlers.
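As a rough illustration of the Send + Sync requirement and the monotonic-offset invariant, the sketch below shares one store across several writer threads. MiniStorage, InMemory, message_count, and concurrent_appends are hypothetical names invented for this example, offsets are modelled as plain message indices, and append creates the stream on demand, which the real trait does not do.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical, cut-down stand-in for the real trait (two methods only).
pub trait MiniStorage: Send + Sync {
    /// Appends one message and returns its offset (index within the stream).
    fn append(&self, name: &str, data: Vec<u8>) -> u64;
    /// Number of messages currently stored for `name`.
    fn message_count(&self, name: &str) -> usize;
}

/// All state sits behind one mutex, so `&self` methods can mutate safely.
#[derive(Default)]
pub struct InMemory {
    streams: Mutex<HashMap<String, Vec<Vec<u8>>>>,
}

impl MiniStorage for InMemory {
    fn append(&self, name: &str, data: Vec<u8>) -> u64 {
        let mut streams = self.streams.lock().unwrap();
        // Assumption: the stream is created on first append to keep the sketch short.
        let log = streams.entry(name.to_string()).or_default();
        log.push(data);
        (log.len() - 1) as u64
    }

    fn message_count(&self, name: &str) -> usize {
        let streams = self.streams.lock().unwrap();
        streams.get(name).map_or(0, Vec::len)
    }
}

/// Spawns `writers` threads that each append `per_writer` messages to the
/// "events" stream, then returns every offset handed out, sorted.
pub fn concurrent_appends(
    store: Arc<dyn MiniStorage>,
    writers: usize,
    per_writer: usize,
) -> Vec<u64> {
    let handles: Vec<_> = (0..writers)
        .map(|_| {
            let store = Arc::clone(&store);
            thread::spawn(move || {
                (0..per_writer)
                    .map(|_| store.append("events", vec![0u8]))
                    .collect::<Vec<u64>>()
            })
        })
        .collect();
    let mut offsets: Vec<u64> = handles
        .into_iter()
        .flat_map(|h| h.join().unwrap())
        .collect();
    offsets.sort_unstable();
    offsets
}
```

Because each append takes the lock before assigning an offset, no two writers can receive the same offset, and the sorted result has no gaps.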

Error conditions are documented inline rather than in separate sections to avoid repetitive documentation on internal trait methods.

Required Methods

fn create_stream( &self, name: &str, config: StreamConfig, ) -> Result<CreateStreamResult>

Create a stream entry with immutable configuration.

Returns whether the stream was newly created or already existed with matching configuration.

Returns Err(Error::ConfigMismatch) if the stream already exists with a different configuration.
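The create-or-match behaviour can be sketched as a single lookup under a lock. Everything here is an assumption made for illustration: the fields of StreamConfig, the Created variant, the Registry type, and the use of a plain HashMap. Only the three outcomes come from the description above.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical configuration; the real StreamConfig fields are not shown here.
#[derive(Clone, Debug, PartialEq)]
pub struct StreamConfig {
    pub content_type: String,
    pub ttl_seconds: Option<u64>,
}

/// Hypothetical result variants.
#[derive(Debug, PartialEq)]
pub enum CreateStreamResult {
    Created,
    AlreadyExists,
}

#[derive(Debug, PartialEq)]
pub enum Error {
    ConfigMismatch,
}

/// Hypothetical registry of stream configurations.
#[derive(Default)]
pub struct Registry {
    configs: Mutex<HashMap<String, StreamConfig>>,
}

impl Registry {
    pub fn create_stream(
        &self,
        name: &str,
        config: StreamConfig,
    ) -> Result<CreateStreamResult, Error> {
        let mut configs = self.configs.lock().unwrap();
        if let Some(existing) = configs.get(name) {
            // Configuration is immutable: a repeat create must match exactly.
            return if *existing == config {
                Ok(CreateStreamResult::AlreadyExists)
            } else {
                Err(Error::ConfigMismatch)
            };
        }
        configs.insert(name.to_string(), config);
        Ok(CreateStreamResult::Created)
    }
}
```

Holding the lock across the lookup and the insert is what makes the create atomic at the stream level in this sketch.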

fn append(&self, name: &str, data: Bytes, content_type: &str) -> Result<Offset>

Append one message to an existing stream.

Generates and returns the offset assigned to the appended message. Offsets must remain monotonically increasing within a stream.

Returns Err(Error::StreamClosed) if the stream is closed. Returns Err(Error::ContentTypeMismatch) if the content type doesn’t match the stream’s.

fn batch_append( &self, name: &str, messages: Vec<Bytes>, content_type: &str, seq: Option<&str>, ) -> Result<Offset>

Append a batch of messages as one atomic operation.

All messages are validated and committed together: either every message is appended or none is. Returns the next offset, that is, the offset that will be assigned to the first message appended after this batch.

If seq is Some, it is validated lexicographically against the stream’s last seq, and the stored value is updated on success.

Returns Err(Error::StreamClosed) if the stream is closed. Returns Err(Error::ContentTypeMismatch) if the content type doesn’t match. Returns Err(Error::SeqOrderingViolation) if seq <= the last seq. Returns Err(Error::MemoryLimitExceeded) if the batch would exceed memory limits.
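One way to get all-or-nothing behaviour is to run every check before touching any state. The sketch below assumes a hypothetical Stream struct with a byte budget, models the returned offset as the message count after the batch, and picks an arbitrary order for the checks; none of these details are confirmed by the trait itself.

```rust
#[derive(Debug, PartialEq)]
pub enum Error {
    StreamClosed,
    ContentTypeMismatch,
    SeqOrderingViolation,
    MemoryLimitExceeded,
}

/// Hypothetical in-memory stream state.
pub struct Stream {
    pub content_type: String,
    pub closed: bool,
    pub messages: Vec<Vec<u8>>,
    pub bytes_used: usize,
    pub byte_limit: usize,
    pub last_seq: Option<String>,
}

impl Stream {
    pub fn new(content_type: &str, byte_limit: usize) -> Self {
        Stream {
            content_type: content_type.to_string(),
            closed: false,
            messages: Vec::new(),
            bytes_used: 0,
            byte_limit,
            last_seq: None,
        }
    }

    /// Validates first, then mutates; a failed check leaves `self` untouched.
    /// Returns the next offset, modelled here as the message count after the batch.
    pub fn batch_append(
        &mut self,
        batch: Vec<Vec<u8>>,
        content_type: &str,
        seq: Option<&str>,
    ) -> Result<u64, Error> {
        if self.closed {
            return Err(Error::StreamClosed);
        }
        if self.content_type != content_type {
            return Err(Error::ContentTypeMismatch);
        }
        if let (Some(new), Some(last)) = (seq, self.last_seq.as_deref()) {
            // `str` comparison in Rust is lexicographic by bytes.
            if new <= last {
                return Err(Error::SeqOrderingViolation);
            }
        }
        let batch_bytes: usize = batch.iter().map(Vec::len).sum();
        if self.bytes_used + batch_bytes > self.byte_limit {
            return Err(Error::MemoryLimitExceeded);
        }
        // Commit phase: nothing below can fail.
        self.bytes_used += batch_bytes;
        self.messages.extend(batch);
        if let Some(new) = seq {
            self.last_seq = Some(new.to_string());
        }
        Ok(self.messages.len() as u64)
    }
}
```

Note that a rejected batch also leaves last_seq unchanged, so a caller can retry with the same seq once the cause of the failure is resolved.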

fn read(&self, name: &str, from_offset: &Offset) -> Result<ReadResult>

Read from a stream starting at from_offset.

Offset::start() reads from the beginning of the stream. Offset::now() positions the caller at the current tail and returns an empty catch-up result.

Returns Err(Error::NotFound) if the stream doesn’t exist. Returns Err(Error::InvalidOffset) if the offset is invalid.
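The difference between reading from the start and reading from the tail can be sketched over a plain slice. The Offset enum, the ReadResult fields, and the rule that an offset past the tail is invalid are all assumptions made for this example; the real Offset type is opaque here.

```rust
/// Hypothetical offset representation.
#[derive(Debug, PartialEq)]
pub enum Offset {
    Start,
    Now,
    At(usize),
}

#[derive(Debug, PartialEq)]
pub enum Error {
    InvalidOffset,
}

/// Hypothetical read result: the messages found plus where to resume.
#[derive(Debug)]
pub struct ReadResult {
    pub messages: Vec<Vec<u8>>,
    pub next_offset: usize,
}

/// Reads everything from `from` up to the current tail of `log`.
pub fn read(log: &[Vec<u8>], from: &Offset) -> Result<ReadResult, Error> {
    let start = match from {
        Offset::Start => 0,
        // Positioning at the tail yields an empty catch-up result.
        Offset::Now => log.len(),
        Offset::At(i) if *i <= log.len() => *i,
        // Assumption: an offset past the tail is rejected.
        Offset::At(_) => return Err(Error::InvalidOffset),
    };
    Ok(ReadResult {
        messages: log[start..].to_vec(),
        next_offset: log.len(),
    })
}
```

Copying the slice while the caller holds whatever lock guards the log is one simple way to give readers a coherent snapshot.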

fn delete(&self, name: &str) -> Result<()>

Delete a stream and all of its persisted data.

Returns Ok(()) on successful deletion. Returns Err(Error::NotFound) if the stream doesn’t exist.

fn head(&self, name: &str) -> Result<StreamMetadata>

Return stream metadata without reading message bodies.

Returns Err(Error::NotFound) if the stream doesn’t exist.

fn close_stream(&self, name: &str) -> Result<()>

Mark a stream closed so future appends are rejected.

Returns Ok(()) if the stream is already closed (the call is idempotent). Returns Err(Error::NotFound) if the stream doesn’t exist.

fn append_with_producer( &self, name: &str, messages: Vec<Bytes>, content_type: &str, producer: &ProducerHeaders, should_close: bool, seq: Option<&str>, ) -> Result<ProducerAppendResult>

Append with idempotent producer sequencing.

Validates the producer epoch and sequence, appends the data if accepted, and optionally closes the stream, all while holding a single lock.

Outcomes, with the HTTP status each one maps to:

  • ProducerAppendResult::Accepted for new data (200 OK)
  • ProducerAppendResult::Duplicate for an already-seen seq (204)
  • Err(EpochFenced) if epoch < current (403)
  • Err(SequenceGap) if seq > expected (409)
  • Err(InvalidProducerState) if the epoch is bumped with seq != 0 (400)
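The epoch and sequence rules above amount to a small state machine per producer. The following is a minimal sketch of that decision logic only, with no data append or close step. ProducerState, Decision, and check are hypothetical names, epochs and sequence numbers are assumed to be u64, and a producer seen for the first time is assumed to follow the same "must start at 0" rule as an epoch bump.

```rust
#[derive(Debug, PartialEq)]
pub enum Decision {
    Accepted,
    Duplicate,
}

#[derive(Debug, PartialEq)]
pub enum Error {
    EpochFenced,
    SequenceGap,
    InvalidProducerState,
}

/// Last accepted (epoch, seq) for one producer; `None` before its first append.
#[derive(Default)]
pub struct ProducerState {
    last: Option<(u64, u64)>,
}

impl ProducerState {
    /// Decides what to do with an append tagged (epoch, seq) and records it
    /// when accepted. Rejections and duplicates leave the state unchanged.
    pub fn check(&mut self, epoch: u64, seq: u64) -> Result<Decision, Error> {
        let last = self.last;
        match last {
            // An older epoch has been fenced off by a newer one.
            Some((current, _)) if epoch < current => Err(Error::EpochFenced),
            Some((current, last_seq)) if epoch == current => {
                if seq <= last_seq {
                    // Already seen: report success without appending again.
                    Ok(Decision::Duplicate)
                } else if seq > last_seq + 1 {
                    Err(Error::SequenceGap)
                } else {
                    self.last = Some((epoch, seq));
                    Ok(Decision::Accepted)
                }
            }
            // Epoch bump, or (by assumption) a producer seen for the first
            // time: numbering must restart at 0.
            _ => {
                if seq != 0 {
                    Err(Error::InvalidProducerState)
                } else {
                    self.last = Some((epoch, 0));
                    Ok(Decision::Accepted)
                }
            }
        }
    }
}
```

In a real implementation this check, the append, and the optional close would all run under the same lock, so that no other writer can interleave between the decision and the commit.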

fn create_stream_with_data( &self, name: &str, config: StreamConfig, messages: Vec<Bytes>, should_close: bool, ) -> Result<CreateWithDataResult>

Atomically create a stream, optionally seed it with data, and optionally close it.

Creates the stream, appends messages (if non-empty), and closes it (if should_close is set), all before the entry becomes visible to other operations. If commit_messages fails (e.g. because of the memory limit), the stream is never created.

For idempotent recreates (AlreadyExists), the body and close flag are ignored and the existing metadata is returned.
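A simple way to keep a half-built stream from ever becoming visible is to finish all validation before inserting the entry into the shared map. This sketch assumes a hypothetical Store with a per-stream byte limit and leaves out StreamConfig and the configuration-mismatch check entirely; Entry, CreateOutcome, create_with_data, and snapshot are names made up for the example.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

#[derive(Debug, PartialEq)]
pub enum CreateOutcome {
    Created,
    AlreadyExists,
}

#[derive(Debug, PartialEq)]
pub enum Error {
    MemoryLimitExceeded,
}

/// Hypothetical stream entry.
pub struct Entry {
    pub messages: Vec<Vec<u8>>,
    pub closed: bool,
}

/// Hypothetical store; `byte_limit` caps the seed data of one stream.
pub struct Store {
    streams: Mutex<HashMap<String, Entry>>,
    byte_limit: usize,
}

impl Store {
    pub fn new(byte_limit: usize) -> Self {
        Store { streams: Mutex::new(HashMap::new()), byte_limit }
    }

    pub fn create_with_data(
        &self,
        name: &str,
        messages: Vec<Vec<u8>>,
        should_close: bool,
    ) -> Result<CreateOutcome, Error> {
        let mut streams = self.streams.lock().unwrap();
        if streams.contains_key(name) {
            // Idempotent recreate: the body and close flag are ignored.
            return Ok(CreateOutcome::AlreadyExists);
        }
        let total: usize = messages.iter().map(Vec::len).sum();
        if total > self.byte_limit {
            // Nothing has been inserted yet, so the stream is never created.
            return Err(Error::MemoryLimitExceeded);
        }
        // The entry becomes visible only once it is fully built.
        streams.insert(name.to_string(), Entry { messages, closed: should_close });
        Ok(CreateOutcome::Created)
    }

    /// Returns (message count, closed flag) for inspection.
    pub fn snapshot(&self, name: &str) -> Option<(usize, bool)> {
        let streams = self.streams.lock().unwrap();
        streams.get(name).map(|e| (e.messages.len(), e.closed))
    }
}
```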

fn exists(&self, name: &str) -> bool

Check whether a stream exists.

fn subscribe(&self, name: &str) -> Option<Receiver<()>>

Subscribe to notifications for new data on a stream.

Returns a broadcast receiver that fires when data is appended or the stream is closed. Returns None if the stream does not exist or has expired.

The method itself is synchronous; the handler awaits the receiver.

fn cleanup_expired_streams(&self) -> usize

Proactively remove all expired streams, returning the count deleted.

By default, expired streams are only cleaned up lazily when accessed. This method sweeps all streams and deletes any that have expired, reclaiming their resources immediately.
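The sweep can be sketched as a single retain pass over the stream map. Entry, its expires_at field, and the explicit now parameter are assumptions made so the example is deterministic; a real implementation would run under its own lock and also release message storage.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical stream entry; only the expiry time matters for the sweep.
pub struct Entry {
    /// `None` means the stream never expires.
    pub expires_at: Option<Instant>,
}

impl Entry {
    /// Builds an entry that expires `ttl` after `created_at`, if a TTL is set.
    pub fn with_ttl(created_at: Instant, ttl: Option<Duration>) -> Self {
        Entry { expires_at: ttl.map(|d| created_at + d) }
    }
}

/// Removes every entry whose expiry time is at or before `now` and returns
/// how many were deleted.
pub fn cleanup_expired(streams: &mut HashMap<String, Entry>, now: Instant) -> usize {
    let before = streams.len();
    streams.retain(|_, entry| entry.expires_at.map_or(true, |t| t > now));
    before - streams.len()
}
```

Passing now in as a parameter, rather than calling Instant::now() inside the sweep, keeps every entry judged against the same instant and makes the function easy to test.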

Implementors