Trait Storage 

pub trait Storage: Send + Sync {
    // Required methods
    fn create_stream(
        &self,
        name: &str,
        config: StreamConfig,
    ) -> Result<CreateStreamResult>;
    fn append(
        &self,
        name: &str,
        data: Bytes,
        content_type: &str,
    ) -> Result<Offset>;
    fn batch_append(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        seq: Option<&str>,
    ) -> Result<Offset>;
    fn read(&self, name: &str, from_offset: &Offset) -> Result<ReadResult>;
    fn delete(&self, name: &str) -> Result<()>;
    fn head(&self, name: &str) -> Result<StreamMetadata>;
    fn close_stream(&self, name: &str) -> Result<()>;
    fn append_with_producer(
        &self,
        name: &str,
        messages: Vec<Bytes>,
        content_type: &str,
        producer: &ProducerHeaders,
        should_close: bool,
        seq: Option<&str>,
    ) -> Result<ProducerAppendResult>;
    fn create_stream_with_data(
        &self,
        name: &str,
        config: StreamConfig,
        messages: Vec<Bytes>,
        should_close: bool,
    ) -> Result<CreateWithDataResult>;
    fn exists(&self, name: &str) -> bool;
    fn subscribe(&self, name: &str) -> Option<Receiver<()>>;
}
Storage trait for stream persistence

Methods are intentionally sync (not async) to keep the core logic simple. Async boundaries are at the handler layer and notification layer.

Implementations must be thread-safe (Send + Sync).
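
Because every method takes `&self` and the trait requires Send + Sync, an implementation can be shared behind an `Arc` and called from several threads at once. The sketch below illustrates this with a deliberately reduced, hypothetical `MiniStorage` trait and an `InMemory` type; neither is part of this crate, and plain `u64` message counts stand in for `Offset`.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical, reduced stand-in for the real trait: one method, u64 offsets.
trait MiniStorage: Send + Sync {
    fn append(&self, name: &str, data: Vec<u8>) -> u64;
}

// A Mutex provides the interior mutability that `&self` methods need.
#[derive(Default)]
struct InMemory {
    streams: Mutex<HashMap<String, Vec<Vec<u8>>>>,
}

impl MiniStorage for InMemory {
    fn append(&self, name: &str, data: Vec<u8>) -> u64 {
        let mut streams = self.streams.lock().unwrap();
        let log = streams.entry(name.to_string()).or_default();
        log.push(data);
        // Assumption: the offset is the message count after this append.
        log.len() as u64
    }
}

// Spawns `writers` threads that each append once; returns the sorted offsets.
fn append_from_threads(writers: usize) -> Vec<u64> {
    let storage: Arc<dyn MiniStorage> = Arc::new(InMemory::default());
    let handles: Vec<_> = (0..writers)
        .map(|i| {
            let storage = Arc::clone(&storage);
            thread::spawn(move || storage.append("events", vec![i as u8]))
        })
        .collect();
    let mut offsets: Vec<u64> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    offsets.sort();
    offsets
}
```

Each thread receives a distinct offset because the append and the offset computation happen under the same lock.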

Error conditions are documented inline rather than in separate sections to avoid repetitive documentation on internal trait methods.

Required Methods

fn create_stream( &self, name: &str, config: StreamConfig, ) -> Result<CreateStreamResult>

Create a new stream

Returns whether the stream was newly created or already existed.

Returns Err(Error::ConfigMismatch) if the stream already exists with a different config.
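
The idempotent-create behaviour can be sketched as follows. All types here are hypothetical stand-ins: the one-field `StreamConfig`, the `Created` variant name, and the `InMemory` store are assumptions made for illustration, not the crate's definitions.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Hypothetical one-field config; the real StreamConfig is not shown on this page.
#[derive(Clone, Debug, PartialEq)]
struct StreamConfig {
    content_type: String,
}

#[derive(Debug, PartialEq)]
enum CreateStreamResult {
    Created, // assumed variant name
    AlreadyExists,
}

#[derive(Debug, PartialEq)]
enum Error {
    ConfigMismatch,
}

#[derive(Default)]
struct InMemory {
    configs: Mutex<HashMap<String, StreamConfig>>,
}

impl InMemory {
    fn create_stream(&self, name: &str, config: StreamConfig) -> Result<CreateStreamResult, Error> {
        let mut configs = self.configs.lock().unwrap();
        if let Some(existing) = configs.get(name) {
            // Re-creating with an identical config is treated as a no-op.
            return if *existing == config {
                Ok(CreateStreamResult::AlreadyExists)
            } else {
                Err(Error::ConfigMismatch)
            };
        }
        configs.insert(name.to_string(), config);
        Ok(CreateStreamResult::Created)
    }
}
```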

fn append(&self, name: &str, data: Bytes, content_type: &str) -> Result<Offset>

Append data to a stream

Generates and returns the offset for this message. Offsets must be monotonically increasing.

Returns Err(Error::StreamClosed) if stream is closed. Returns Err(Error::ContentTypeMismatch) if content type doesn’t match.

fn batch_append( &self, name: &str, messages: Vec<Bytes>, content_type: &str, seq: Option<&str>, ) -> Result<Offset>

Append multiple messages atomically

All messages are validated and committed as a single atomic operation. Either all messages are appended successfully, or none are. Returns the next offset (the offset that will be assigned to the next message appended after this batch).

If seq is Some, validates lexicographic ordering against the stream’s last seq and updates it on success.

Returns Err(Error::StreamClosed) if stream is closed.
Returns Err(Error::ContentTypeMismatch) if content type doesn’t match.
Returns Err(Error::SeqOrderingViolation) if seq is lexicographically less than or equal to the stream’s last seq.
Returns Err(Error::MemoryLimitExceeded) if batch would exceed limits.
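
One way to obtain all-or-nothing behaviour is to run every check before mutating any state. The sketch below does this for a single hypothetical `Stream` struct. The field names, the byte-based limit, the order of the checks, and the use of a `u64` message count as the returned next offset are all assumptions made for illustration.

```rust
#[derive(Debug, PartialEq)]
enum Error {
    StreamClosed,
    ContentTypeMismatch,
    SeqOrderingViolation,
    MemoryLimitExceeded,
}

// Hypothetical per-stream state.
struct Stream {
    content_type: String,
    closed: bool,
    messages: Vec<Vec<u8>>,
    bytes_used: usize,
    byte_limit: usize,
    last_seq: Option<String>,
}

impl Stream {
    fn batch_append(
        &mut self,
        batch: Vec<Vec<u8>>,
        content_type: &str,
        seq: Option<&str>,
    ) -> Result<u64, Error> {
        // Phase 1: validate. Nothing is mutated until every check has passed.
        if self.closed {
            return Err(Error::StreamClosed);
        }
        if content_type != self.content_type {
            return Err(Error::ContentTypeMismatch);
        }
        if let (Some(new), Some(last)) = (seq, self.last_seq.as_deref()) {
            // `str` comparison is lexicographic by bytes.
            if new <= last {
                return Err(Error::SeqOrderingViolation);
            }
        }
        let batch_bytes: usize = batch.iter().map(Vec::len).sum();
        if self.bytes_used + batch_bytes > self.byte_limit {
            return Err(Error::MemoryLimitExceeded);
        }
        // Phase 2: commit the whole batch and record the new seq.
        self.bytes_used += batch_bytes;
        self.messages.extend(batch);
        if let Some(new) = seq {
            self.last_seq = Some(new.to_string());
        }
        // Assumed offset model: the next offset is the message count.
        Ok(self.messages.len() as u64)
    }
}
```

Because seq values are compared as strings, callers that use numeric sequences would need fixed-width, zero-padded values ("0002" sorts before "0010", but "2" sorts after "10").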

fn read(&self, name: &str, from_offset: &Offset) -> Result<ReadResult>

Read messages from a stream starting at offset

If from_offset is Offset::start(), reads from the beginning of the stream. If from_offset is Offset::now(), returns an empty result positioned at the tail.

Returns Err(Error::NotFound) if stream doesn’t exist. Returns Err(Error::InvalidOffset) if offset is invalid.
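
The offset handling can be sketched over a single in-memory log. The `Offset` enum below is a hypothetical stand-in for Offset::start(), Offset::now(), and a concrete position; treating a position past the tail as invalid is an assumption, and the NotFound case is omitted because the sketch holds only one stream.

```rust
// Hypothetical stand-in for the crate's Offset type.
#[derive(Debug, PartialEq)]
enum Offset {
    Start,   // plays the role of Offset::start()
    At(u64), // a concrete position, modelled as a message index
    Now,     // plays the role of Offset::now()
}

#[derive(Debug, PartialEq)]
enum Error {
    InvalidOffset,
}

// Returns the messages from `from` up to the tail of `log`.
fn read<'a>(log: &'a [Vec<u8>], from: &Offset) -> Result<&'a [Vec<u8>], Error> {
    let index = match from {
        Offset::Start => 0,
        Offset::Now => log.len(),
        Offset::At(n) => *n as usize,
    };
    // An index equal to the length yields an empty slice (the tail);
    // anything beyond it is rejected. That rejection is an assumption.
    log.get(index..).ok_or(Error::InvalidOffset)
}
```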

fn delete(&self, name: &str) -> Result<()>

Delete a stream

Returns Ok(()) on successful deletion. Returns Err(Error::NotFound) if stream doesn’t exist.

fn head(&self, name: &str) -> Result<StreamMetadata>

Get stream metadata

Returns Err(Error::NotFound) if stream doesn’t exist.

fn close_stream(&self, name: &str) -> Result<()>

Close a stream

Prevents further appends. Returns Ok(()) if already closed (idempotent). Returns Err(Error::NotFound) if stream doesn’t exist.

fn append_with_producer( &self, name: &str, messages: Vec<Bytes>, content_type: &str, producer: &ProducerHeaders, should_close: bool, seq: Option<&str>, ) -> Result<ProducerAppendResult>

Append messages with producer sequencing (atomic validation + append)

Validates producer epoch/sequence, appends data if accepted, and optionally closes the stream — all within a single lock hold.

Returns ProducerAppendResult::Accepted for new data (200 OK).
Returns ProducerAppendResult::Duplicate for an already-seen producer seq (204 No Content).
Returns Err(EpochFenced) if epoch < current (403 Forbidden).
Returns Err(SequenceGap) if producer seq > expected (409 Conflict).
Returns Err(InvalidProducerState) if epoch bump with seq != 0 (400 Bad Request).
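
The epoch and sequence rules can be read as a small state machine. The sketch below models only the validation step for a single producer. `ProducerState`, `Decision`, and `Rejection` are hypothetical names, both numbers are modelled as `u64`, and the handling of a producer's very first append (accepted only at seq 0, otherwise a gap) is an assumption. The actual method also appends the data and optionally closes the stream within the same lock hold.

```rust
#[derive(Debug, PartialEq)]
enum Decision {
    Accepted,
    Duplicate,
}

#[derive(Debug, PartialEq)]
enum Rejection {
    EpochFenced,
    SequenceGap,
    InvalidProducerState,
}

// Last accepted (epoch, seq) for one producer; None before its first append.
#[derive(Default)]
struct ProducerState {
    last: Option<(u64, u64)>,
}

impl ProducerState {
    fn check(&mut self, epoch: u64, seq: u64) -> Result<Decision, Rejection> {
        let last = self.last;
        match last {
            // Assumption: a brand-new producer is expected to start at seq 0.
            None if seq == 0 => self.accept(epoch, seq),
            None => Err(Rejection::SequenceGap),
            // An older epoch has been fenced off by a newer one.
            Some((cur, _)) if epoch < cur => Err(Rejection::EpochFenced),
            // An epoch bump must restart the sequence at 0.
            Some((cur, _)) if epoch > cur => {
                if seq == 0 {
                    self.accept(epoch, seq)
                } else {
                    Err(Rejection::InvalidProducerState)
                }
            }
            // Same epoch: compare against the last accepted sequence.
            Some((_, last_seq)) => {
                if seq <= last_seq {
                    Ok(Decision::Duplicate)
                } else if seq == last_seq + 1 {
                    self.accept(epoch, seq)
                } else {
                    Err(Rejection::SequenceGap)
                }
            }
        }
    }

    fn accept(&mut self, epoch: u64, seq: u64) -> Result<Decision, Rejection> {
        self.last = Some((epoch, seq));
        Ok(Decision::Accepted)
    }
}
```

Running the check and the append under one lock matters: if they were separate steps, two retries of the same request could both pass validation before either recorded its sequence number.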

fn create_stream_with_data( &self, name: &str, config: StreamConfig, messages: Vec<Bytes>, should_close: bool, ) -> Result<CreateWithDataResult>

Atomically create a stream, optionally with initial data and an immediate close.

Creates the stream, appends messages (if non-empty), and closes (if should_close) — all before the entry becomes visible to other operations. If commit_messages fails (e.g. memory limit), the stream is never created.

For idempotent recreates (AlreadyExists), the body and close flag are ignored and existing metadata is returned.
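
A common way to get this visibility guarantee is to build the entry completely and only then insert it into the shared map. The sketch below assumes that approach. The `Entry` and `InMemory` types, the byte limit, and the fields carried by `CreateWithDataResult` are hypothetical, and the config argument is left out to keep the example short.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

#[derive(Debug, PartialEq)]
enum Error {
    MemoryLimitExceeded,
}

// Hypothetical result shape; the real type's fields are not shown on this page.
#[derive(Debug, PartialEq)]
enum CreateWithDataResult {
    Created { next_offset: u64, closed: bool },
    AlreadyExists { next_offset: u64, closed: bool },
}

struct Entry {
    messages: Vec<Vec<u8>>,
    closed: bool,
}

struct InMemory {
    byte_limit: usize,
    streams: Mutex<HashMap<String, Entry>>,
}

impl InMemory {
    fn create_stream_with_data(
        &self,
        name: &str,
        messages: Vec<Vec<u8>>,
        should_close: bool,
    ) -> Result<CreateWithDataResult, Error> {
        let mut streams = self.streams.lock().unwrap();
        // Idempotent recreate: ignore the body and close flag, report existing state.
        if let Some(existing) = streams.get(name) {
            return Ok(CreateWithDataResult::AlreadyExists {
                next_offset: existing.messages.len() as u64,
                closed: existing.closed,
            });
        }
        // Validate before anything becomes visible; on failure nothing is inserted.
        let total: usize = messages.iter().map(Vec::len).sum();
        if total > self.byte_limit {
            return Err(Error::MemoryLimitExceeded);
        }
        // Build the complete entry first, then publish it in one insert.
        let entry = Entry { messages, closed: should_close };
        let result = CreateWithDataResult::Created {
            next_offset: entry.messages.len() as u64,
            closed: entry.closed,
        };
        streams.insert(name.to_string(), entry);
        Ok(result)
    }
}
```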

fn exists(&self, name: &str) -> bool

Check if a stream exists

fn subscribe(&self, name: &str) -> Option<Receiver<()>>

Subscribe to notifications for new data on a stream.

Returns a broadcast receiver that fires when data is appended or the stream is closed. Returns None if the stream does not exist or has expired.

The method itself is sync; the handler awaits on the receiver.

Implementors