Struct Storage 

Source
pub struct Storage { /* private fields */ }
Append-only event log storage with checkpoint support and segment rotation.

Manages segment files on disk, providing append and read operations for event streams. Each stream gets its own directory with numbered segment files. Segments rotate when they exceed max_segment_size bytes.
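The layout and rotation rule described above can be sketched as follows. This is a minimal illustration, not the crate's implementation: the file naming scheme (`segment_NNNNNN.log`), the `ActiveSegment` type, and the use of a plain `u64` for the stream id are all assumptions made for the example.

```rust
use std::path::{Path, PathBuf};

/// Hypothetical layout: <data_dir>/<stream_id>/segment_<n>.log
/// (the real file names are not documented here).
fn segment_path(data_dir: &Path, stream_id: u64, segment: u32) -> PathBuf {
    data_dir
        .join(stream_id.to_string())
        .join(format!("segment_{:06}.log", segment))
}

/// Hypothetical bookkeeping for the active segment of one stream.
struct ActiveSegment {
    number: u32,
    bytes_written: u64,
}

impl ActiveSegment {
    /// Records a write of `len` bytes. If the segment has now grown past
    /// `max_segment_size`, switches to a fresh segment and returns true.
    fn record_write(&mut self, len: u64, max_segment_size: u64) -> bool {
        self.bytes_written += len;
        if self.bytes_written > max_segment_size {
            self.number += 1;
            self.bytes_written = 0;
            true
        } else {
            false
        }
    }
}
```

In this sketch the record that crosses the limit stays in the old segment and the next write lands in the new one; whether the real storage rotates before or after that write is not stated above.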

§Invariants

  • Records are append-only; existing data is never modified
  • Each record links to the previous via prev_hash (hash chain)
  • The offset index stays in sync with the log (updated atomically with appends)
  • Checkpoints are created according to the configured policy
  • Hash chain integrity is maintained across segment boundaries
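To make the hash chain invariant concrete, here is a small in-memory sketch. The `Record` layout and the `append` and `verify` helpers are hypothetical, and `DefaultHasher` is only a stand-in: it is not cryptographic, whereas a tamper-evident log would use a cryptographic digest.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Stand-in for the crate's `ChainHash` (assumption: a real log uses a
/// cryptographic digest, not a 64-bit hash).
type ChainHash = u64;

/// Hypothetical record layout: a payload plus the hash of the previous record.
#[derive(Debug, Clone)]
struct Record {
    offset: u64,
    prev_hash: Option<ChainHash>,
    payload: Vec<u8>,
}

impl Record {
    /// The hash covers the offset, the link to the previous record, and the payload.
    fn chain_hash(&self) -> ChainHash {
        let mut h = DefaultHasher::new();
        self.offset.hash(&mut h);
        self.prev_hash.hash(&mut h);
        self.payload.hash(&mut h);
        h.finish()
    }
}

/// Appends a payload, linking the new record to its predecessor.
/// The first record (genesis) has `prev_hash == None`.
fn append(log: &mut Vec<Record>, payload: &[u8]) {
    let prev_hash = log.last().map(Record::chain_hash);
    let offset = log.len() as u64;
    log.push(Record { offset, prev_hash, payload: payload.to_vec() });
}

/// Walks the log from genesis. On failure, returns the offset of the first
/// record whose `prev_hash` does not match its predecessor.
fn verify(log: &[Record]) -> Result<(), u64> {
    let mut expected: Option<ChainHash> = None;
    for rec in log {
        if rec.prev_hash != expected {
            return Err(rec.offset);
        }
        expected = Some(rec.chain_hash());
    }
    Ok(())
}
```

Modifying the payload of any record changes its hash, so the mismatch is detected at the following record. Tampering with the final record is only caught if its hash is also kept somewhere else, for example as a chain head or in a checkpoint.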

Implementations§

Source§

impl Storage

Source

pub fn new(data_dir: impl Into<PathBuf>) -> Self

Creates a new storage instance with the given data directory.

The directory is created on the first write if it does not already exist. Uses the default checkpoint policy.

Source

pub fn with_checkpoint_policy( data_dir: impl Into<PathBuf>, checkpoint_policy: CheckpointPolicy, ) -> Self

Creates a new storage instance with a custom checkpoint policy.

Source

pub fn with_max_segment_size( data_dir: impl Into<PathBuf>, checkpoint_policy: CheckpointPolicy, max_segment_size: u64, ) -> Self

Creates a new storage instance with a custom maximum segment size.

Source

pub fn with_compression( data_dir: impl Into<PathBuf>, checkpoint_policy: CheckpointPolicy, compression: CompressionKind, ) -> Self

Creates a new storage instance that uses the given compression kind.

Source

pub fn default_compression(&self) -> CompressionKind

Returns the default compression kind.

Source

pub fn checkpoint_policy(&self) -> &CheckpointPolicy

Returns the current checkpoint policy.

Source

pub fn data_dir(&self) -> &PathBuf

Returns the data directory path.

Source

pub fn max_segment_size(&self) -> u64

Returns the maximum segment size in bytes.

Source

pub fn index_path(&self, stream_id: StreamId) -> PathBuf

Returns the path to the index file for the active segment.

Source

pub fn rebuild_index( &self, stream_id: StreamId, ) -> Result<OffsetIndex, StorageError>

Rebuilds the offset index for the stream's active segment by scanning the log file.

This is the recovery path when the index file is missing or corrupted. Scans every record in the segment to reconstruct byte positions.

§Performance

O(n) where n is the number of records in the segment. With segment rotation, this is bounded to a single segment’s worth of data.

§Errors

Returns StorageError::CorruptedRecord if any record in the log is invalid.
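The recovery scan can be pictured with the sketch below. The on-disk framing used here (a 4-byte little-endian length prefix followed by the payload) and the `ScanError` type are assumptions for illustration; the actual record format is not described on this page.

```rust
/// Hypothetical framing: 4-byte little-endian length, then the payload.
fn encode(payload: &[u8]) -> Vec<u8> {
    let mut out = (payload.len() as u32).to_le_bytes().to_vec();
    out.extend_from_slice(payload);
    out
}

/// Stand-in for the corrupted-record error case.
#[derive(Debug, PartialEq)]
enum ScanError {
    CorruptedRecord { position: u64 },
}

/// Scans a segment once, front to back, and returns the byte position of
/// every record. Cost is linear in the number of records in the segment.
fn rebuild_index(segment: &[u8]) -> Result<Vec<u64>, ScanError> {
    let mut positions = Vec::new();
    let mut pos = 0usize;
    while pos < segment.len() {
        let header_end = pos + 4;
        if header_end > segment.len() {
            // Truncated length prefix.
            return Err(ScanError::CorruptedRecord { position: pos as u64 });
        }
        let len = u32::from_le_bytes([
            segment[pos],
            segment[pos + 1],
            segment[pos + 2],
            segment[pos + 3],
        ]) as usize;
        let end = header_end + len;
        if end > segment.len() {
            // Payload runs past the end of the file.
            return Err(ScanError::CorruptedRecord { position: pos as u64 });
        }
        positions.push(pos as u64);
        pos = end;
    }
    Ok(positions)
}
```

With the positions in hand, looking up record i is a single indexed read, which is what makes the rebuilt index worth the one-off scan.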

Source

pub fn load_or_rebuild_index( &self, stream_id: StreamId, ) -> Result<OffsetIndex, StorageError>

Loads the offset index from disk, or rebuilds it if missing/corrupted.

This is the primary way to obtain an index for the active segment.

Source

pub fn append_batch( &mut self, stream_id: StreamId, events: Vec<Bytes>, expected_offset: Offset, prev_hash: Option<ChainHash>, fsync: bool, ) -> Result<(Offset, ChainHash), StorageError>

Appends a batch of events to a stream, building the hash chain.

Each event is written as a Record with a cryptographic link to the previous record, forming a tamper-evident chain. The offset index is updated atomically with the append to maintain O(1) lookup capability.

If the active segment exceeds max_segment_size, a new segment is created (rotation). The hash chain remains continuous across segments.

§Arguments

  • stream_id - The stream to append to
  • events - The event payloads to append (must not be empty)
  • expected_offset - The offset to start writing at
  • prev_hash - Hash of the previous record (None for genesis)
  • fsync - Whether to fsync after writing (recommended for durability)

§Returns

A tuple of:

  • The next offset (for subsequent appends)
  • The hash of the last record written (for chain continuity)

§Panics

Panics if events is empty. Empty batches are a caller bug.
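The calling pattern, where each append feeds its returned offset and hash into the next call, can be sketched with an in-memory stand-in. `MemLog` and `AppendError` are hypothetical, offsets are treated as record counts, and rejecting a mismatched `expected_offset` is an assumption about behaviour that this page does not spell out. The hash is a non-cryptographic placeholder.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical error for a write that does not start at the log's tail.
#[derive(Debug, PartialEq)]
enum AppendError {
    UnexpectedOffset { expected: u64, actual: u64 },
}

/// In-memory stand-in: each entry is (prev_hash, payload).
#[derive(Default)]
struct MemLog {
    records: Vec<(Option<u64>, Vec<u8>)>,
}

impl MemLog {
    /// Returns (next offset, hash of the last record written).
    fn append_batch(
        &mut self,
        events: Vec<Vec<u8>>,
        expected_offset: u64,
        prev_hash: Option<u64>,
    ) -> Result<(u64, u64), AppendError> {
        assert!(!events.is_empty(), "empty batches are a caller bug");
        let actual = self.records.len() as u64;
        if expected_offset != actual {
            return Err(AppendError::UnexpectedOffset { expected: expected_offset, actual });
        }
        let mut prev = prev_hash;
        for payload in events {
            // Placeholder hash over the previous link and the payload.
            let mut h = DefaultHasher::new();
            prev.hash(&mut h);
            payload.hash(&mut h);
            self.records.push((prev, payload));
            prev = Some(h.finish());
        }
        Ok((self.records.len() as u64, prev.expect("batch is non-empty")))
    }
}
```

A caller starts with `expected_offset = 0` and `prev_hash = None`, then passes the returned pair into the following call: `let (next, head) = log.append_batch(batch, 0, None)?;` followed by `log.append_batch(more, next, Some(head))?`.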

Source

pub fn read_from( &mut self, stream_id: StreamId, from_offset: Offset, max_bytes: u64, ) -> Result<Vec<Bytes>, StorageError>

Reads events from a stream with checkpoint-optimized verification.

Uses the nearest checkpoint as a verification anchor, reducing verification cost from O(n) to O(k), where n is the number of records since genesis and k is the number of records since the nearest checkpoint. Falls back to genesis verification if no checkpoints exist.

Reads span across segment boundaries transparently.
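The following sketch illustrates why a checkpoint shortens verification. It is a simplification under stated assumptions: checkpoints are modelled as a separate map from offset to a trusted hash rather than as records inside the log, `Record`, `hash_of`, `build`, and `verify_from_checkpoint` are hypothetical names, and the hash is a non-cryptographic placeholder.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

struct Record {
    prev_hash: Option<u64>,
    payload: Vec<u8>,
}

/// Placeholder chain hash over the previous link and the payload.
fn hash_of(rec: &Record) -> u64 {
    let mut h = DefaultHasher::new();
    rec.prev_hash.hash(&mut h);
    rec.payload.hash(&mut h);
    h.finish()
}

/// Builds a correctly chained in-memory log.
fn build(payloads: &[Vec<u8>]) -> Vec<Record> {
    let mut log: Vec<Record> = Vec::new();
    for p in payloads {
        let prev_hash = log.last().map(hash_of);
        log.push(Record { prev_hash, payload: p.clone() });
    }
    log
}

/// Verifies records up to `end` (exclusive), starting from the nearest
/// checkpoint at or before `from_offset`, or from genesis if there is none.
/// Returns how many chain links were checked, or None on a broken link.
fn verify_from_checkpoint(
    log: &[Record],
    checkpoints: &BTreeMap<usize, u64>,
    from_offset: usize,
    end: usize,
) -> Option<usize> {
    let (start, mut expected) = match checkpoints.range(..=from_offset).next_back() {
        Some((&off, &trusted)) => {
            // The checkpointed record itself must match the trusted hash.
            if hash_of(log.get(off)?) != trusted {
                return None;
            }
            (off + 1, Some(trusted))
        }
        None => (0, None),
    };
    let mut checked = 0;
    for rec in log.get(start..end)? {
        if rec.prev_hash != expected {
            return None;
        }
        expected = Some(hash_of(rec));
        checked += 1;
    }
    Some(checked)
}
```

For a ten-record log with a checkpoint at offset 6, reading from offset 8 checks three links instead of ten.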

Source

pub fn read_from_genesis( &mut self, stream_id: StreamId, from_offset: Offset, max_bytes: u64, ) -> Result<Vec<Bytes>, StorageError>

Reads events from a stream with full genesis verification.

Verifies the hash chain from genesis (offset 0) across all segments. For most use cases, prefer Self::read_from which uses checkpoint-optimized verification for better performance.

Source

pub fn read_records_from_genesis( &mut self, stream_id: StreamId, from_offset: Offset, max_bytes: u64, ) -> Result<Vec<Record>, StorageError>

Reads records from a stream with full genesis hash chain verification.

Verifies the hash chain from genesis up to and including the requested records, spanning all segments. This ensures tamper detection.

Source

pub fn append_batch_pipelined( &mut self, stream_id: StreamId, events: &[Bytes], expected_offset: Offset, prev_hash: Option<ChainHash>, fsync: bool, pipeline: &mut AppendPipeline, ) -> Result<(Offset, ChainHash), StorageError>

Appends a batch of events using the two-stage pipeline.

  • Stage 1 (CPU): Serializes records, computes the hash chain, and compresses payloads into a pre-allocated buffer.
  • Stage 2 (I/O): Writes the buffer to disk.

This method is functionally equivalent to Self::append_batch but uses the pipeline’s buffer management for better throughput when called repeatedly.
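A rough picture of the two stages and the buffer reuse, assuming a simple length-prefixed framing. `BufferedPipeline` is a hypothetical stand-in and says nothing about the real `AppendPipeline` API; hashing and compression are left out to keep the sketch short.

```rust
use std::io::{self, Write};

/// Hypothetical pipeline that owns one reusable staging buffer.
#[derive(Default)]
struct BufferedPipeline {
    buf: Vec<u8>,
}

impl BufferedPipeline {
    /// Stage 1 (CPU): frame every event into the staging buffer.
    /// `clear` keeps the allocation, so repeated batches avoid reallocating.
    fn prepare(&mut self, events: &[&[u8]]) {
        self.buf.clear();
        for e in events {
            self.buf.extend_from_slice(&(e.len() as u32).to_le_bytes());
            self.buf.extend_from_slice(e);
        }
    }

    /// Stage 2 (I/O): write the prepared bytes in one call.
    /// Returns the number of bytes written.
    fn write_to<W: Write>(&self, out: &mut W) -> io::Result<usize> {
        out.write_all(&self.buf)?;
        Ok(self.buf.len())
    }
}
```

Keeping the CPU work and the write separate means the whole batch reaches the writer as one contiguous buffer, and the same allocation serves every later batch that fits in it.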

Source

pub fn rebuild_checkpoint_index( &mut self, stream_id: StreamId, ) -> Result<CheckpointIndex, StorageError>

Rebuilds the checkpoint index by scanning all segments for checkpoint records.

Source

pub fn create_checkpoint( &mut self, stream_id: StreamId, current_offset: Offset, prev_hash: Option<ChainHash>, record_count: u64, fsync: bool, ) -> Result<(Offset, ChainHash), StorageError>

Creates a checkpoint at the current position in the active segment.

Source

pub fn read_records_verified( &mut self, stream_id: StreamId, from_offset: Offset, max_bytes: u64, ) -> Result<Vec<Record>, StorageError>

Reads records with checkpoint-optimized verification.

Instead of verifying from genesis, this method verifies from the nearest checkpoint before from_offset. Reads span across segment boundaries.

Source

pub fn last_checkpoint( &mut self, stream_id: StreamId, ) -> Result<Option<Offset>, StorageError>

Returns the offset of the last checkpoint for a stream, if any.

Source

pub fn latest_chain_hash( &mut self, stream_id: StreamId, ) -> Result<Option<ChainHash>, StorageError>

Returns the chain hash of the last appended record for a stream.

Used to recover the in-memory chain_heads map on process restart: without this, the first post-restart append would write prev_hash = None into a stream whose on-disk tail is non-null, producing a permanent chain break that only surfaces on a later verified read.

Returns Ok(None) for streams with no records (never written to, or written then pruned). Returns Ok(Some(hash)) for streams whose active segment contains at least one record.

Scans the active segment from its start, or from the nearest checkpoint when one exists. The scan is linear in segment size and bounded by the configured maximum segment size. It is only called on startup or on the first append to a stream after a process restart, so the cost amortizes to near zero.
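The restart recovery described above amounts to a lazily filled cache of chain heads. In the sketch below, `ChainHeads` and `prev_hash_for` are hypothetical names, stream ids and hashes are plain `u64` values, and the `recover` closure stands in for a call such as `latest_chain_hash`.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory map from stream id to the hash of its last record.
#[derive(Default)]
struct ChainHeads {
    heads: HashMap<u64, u64>,
}

impl ChainHeads {
    /// Returns the `prev_hash` to use for the next append to `stream_id`.
    /// The on-disk scan (`recover`) runs only when the head is not cached,
    /// which in practice means the first append after a restart.
    fn prev_hash_for<F>(&mut self, stream_id: u64, recover: F) -> Option<u64>
    where
        F: FnOnce(u64) -> Option<u64>,
    {
        if let Some(h) = self.heads.get(&stream_id) {
            return Some(*h);
        }
        let recovered = recover(stream_id);
        if let Some(h) = recovered {
            self.heads.insert(stream_id, h);
        }
        recovered
    }
}
```

A stream with no records yields `None`, which is the correct `prev_hash` for a genesis append. In a real caller the cached head would also be updated after every successful append.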

Source

pub fn segment_count(&self, stream_id: StreamId) -> usize

Returns the number of segments (active plus completed) for a stream.

Source

pub fn completed_segments(&self, stream_id: StreamId) -> Vec<u32>

Returns the list of completed (immutable) segment numbers for a stream.

Source

pub fn flush_indexes(&mut self) -> Result<(), StorageError>

Flushes all dirty indexes to disk.

Call this on shutdown or before operations that require index durability. This is also called automatically from Drop to prevent stale indexes.
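The flush-on-drop behaviour can be pictured with a dirty flag, as in this sketch. `IndexCache` is hypothetical, and the shared `Rc<RefCell<Vec<u64>>>` merely plays the role of the index file on disk. Unlike the real method, this sketch cannot fail; a fallible flush called from `Drop` has no way to report its error, which is one reason to call it explicitly on shutdown.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical in-memory index with a dirty flag.
struct IndexCache {
    entries: Vec<u64>,
    dirty: bool,
    /// Stand-in for the index file on disk.
    disk: Rc<RefCell<Vec<u64>>>,
}

impl IndexCache {
    fn new(disk: Rc<RefCell<Vec<u64>>>) -> Self {
        IndexCache { entries: Vec::new(), dirty: false, disk }
    }

    /// Adds a record position and marks the index as needing a flush.
    fn record(&mut self, position: u64) {
        self.entries.push(position);
        self.dirty = true;
    }

    /// Persists the index only if it changed since the last flush.
    fn flush(&mut self) {
        if self.dirty {
            *self.disk.borrow_mut() = self.entries.clone();
            self.dirty = false;
        }
    }
}

impl Drop for IndexCache {
    /// Safety net: flush whatever is still dirty when the value goes away.
    fn drop(&mut self) {
        self.flush();
    }
}
```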

Trait Implementations§

Source§

impl Debug for Storage

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.
Source§

impl Drop for Storage

Source§

fn drop(&mut self)

Executes the destructor for this type.
Source§

fn pin_drop(self: Pin<&mut Self>)

🔬This is a nightly-only experimental API. (pin_ergonomics)
Executes the destructor for this type. Unlike Drop::drop, it requires self to be pinned.
Source§

impl StorageBackend for Storage

Source§

fn append_batch( &mut self, stream_id: StreamId, events: Vec<Bytes>, expected_offset: Offset, prev_hash: Option<ChainHash>, fsync: bool, ) -> Result<(Offset, ChainHash), StorageError>

Appends a batch of events to a stream and extends the hash chain.
Source§

fn read_from( &mut self, stream_id: StreamId, from_offset: Offset, max_bytes: u64, ) -> Result<Vec<Bytes>, StorageError>

Reads events from a stream with checkpoint-optimized chain verification.
Source§

fn latest_chain_hash( &mut self, stream_id: StreamId, ) -> Result<Option<ChainHash>, StorageError>

Returns the chain hash of the last appended record for the stream, or None if the stream has never been written to.
Source§

fn segment_count(&self, stream_id: StreamId) -> usize

Number of segments (active + completed) for a stream.
Source§

fn completed_segments(&self, stream_id: StreamId) -> Vec<u32>

Numbers of the completed (immutable) segments for a stream.
Source§

fn flush_indexes(&mut self) -> Result<(), StorageError>

Best-effort flush of any backend-internal buffers.

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.