pub struct Storage { /* private fields */ }
Append-only event log storage with checkpoint support and segment rotation.
Manages segment files on disk, providing append and read operations for
event streams. Each stream gets its own directory with numbered segment files.
Segments rotate when they exceed max_segment_size bytes.
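The rotation rule can be modelled in a few lines. This is a minimal sketch, not the crate's code: the SegmentWriter type, the <data_dir>/<stream_id>/<number>.log layout, and the choice to check the size after each append (so a segment may overshoot max_segment_size by up to one record) are all assumptions.

```rust
use std::path::{Path, PathBuf};

/// Hypothetical per-stream writer state; not part of the crate's API.
struct SegmentWriter {
    max_segment_size: u64,
    active_segment: u32,
    active_len: u64,
}

impl SegmentWriter {
    fn new(max_segment_size: u64) -> Self {
        SegmentWriter { max_segment_size, active_segment: 0, active_len: 0 }
    }

    /// Records an append of `record_len` bytes and returns the segment it
    /// landed in. Once the active segment exceeds the limit, later appends
    /// go to a fresh, higher-numbered segment.
    fn append(&mut self, record_len: u64) -> u32 {
        let written_to = self.active_segment;
        self.active_len += record_len;
        if self.active_len > self.max_segment_size {
            self.active_segment += 1;
            self.active_len = 0;
        }
        written_to
    }
}

/// Assumed layout: <data_dir>/<stream_id>/<zero-padded segment number>.log
fn segment_path(data_dir: &Path, stream_id: u64, segment: u32) -> PathBuf {
    data_dir.join(stream_id.to_string()).join(format!("{:08}.log", segment))
}
```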
§Invariants
- Records are append-only; existing data is never modified
- Each record links to the previous via prev_hash (hash chain)
- The offset index stays in sync with the log (updated atomically with appends)
- Checkpoints are created according to the configured policy
- Hash chain integrity is maintained across segment boundaries
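To make the hash-chain invariant concrete, here is a minimal in-memory sketch. ChainHash is modelled as a u64 from std's DefaultHasher only so the example runs without dependencies; SipHash is not a cryptographic hash, and the real ChainHash is presumably a cryptographic digest. The Record shape is likewise an assumption.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;

/// Stand-in for ChainHash: DefaultHasher (SipHash) is NOT cryptographic.
type ChainHash = u64;

/// Assumed record shape: a link to the predecessor plus the payload.
struct Record {
    prev_hash: Option<ChainHash>,
    payload: Vec<u8>,
}

/// Hashes a record's link together with its payload, so changing either
/// changes this hash and breaks the next record's link.
fn chain_hash(prev_hash: Option<ChainHash>, payload: &[u8]) -> ChainHash {
    let mut h = DefaultHasher::new();
    match prev_hash {
        None => h.write_u8(0), // genesis marker
        Some(p) => {
            h.write_u8(1);
            h.write_u64(p);
        }
    }
    h.write(payload);
    h.finish()
}

/// Builds a chain from payloads, starting at genesis (prev_hash = None).
fn build_chain(payloads: Vec<Vec<u8>>) -> Vec<Record> {
    let mut prev = None;
    payloads
        .into_iter()
        .map(|payload| {
            let record = Record { prev_hash: prev, payload };
            prev = Some(chain_hash(record.prev_hash, &record.payload));
            record
        })
        .collect()
}

/// Verifies from genesis. Returns the hash of the last record, or the
/// offset of the first record whose stored link does not match.
fn verify_chain(records: &[Record]) -> Result<Option<ChainHash>, usize> {
    let mut expected = None;
    for (offset, record) in records.iter().enumerate() {
        if record.prev_hash != expected {
            return Err(offset);
        }
        expected = Some(chain_hash(record.prev_hash, &record.payload));
    }
    Ok(expected)
}
```

Editing record 1 in place leaves record 1 itself looking valid; the break is detected at record 2, whose stored prev_hash no longer matches.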
Implementations
impl Storage
pub fn new(data_dir: impl Into<PathBuf>) -> Self
Creates a new storage instance with the given data directory.
The directory is created on the first write if it does not already exist. Uses the default checkpoint policy.
pub fn with_checkpoint_policy(
data_dir: impl Into<PathBuf>,
checkpoint_policy: CheckpointPolicy,
) -> Self
Creates a new storage instance with a custom checkpoint policy.
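The variants of CheckpointPolicy are not documented in this section, so the enum below is hypothetical: it assumes a policy that checkpoints every N records plus a way to disable checkpoints, and shows how such a policy might decide when a checkpoint is due.

```rust
/// Hypothetical policy; the real CheckpointPolicy's variants are not shown here.
#[derive(Clone, Copy, Debug, PartialEq)]
enum CheckpointPolicy {
    /// Checkpoint once this many records have been appended since the last one.
    EveryRecords(u64),
    /// Never checkpoint automatically.
    Disabled,
}

impl CheckpointPolicy {
    /// True when a checkpoint should be written after this append.
    fn is_due(&self, records_since_last: u64) -> bool {
        match *self {
            CheckpointPolicy::EveryRecords(n) => n > 0 && records_since_last >= n,
            CheckpointPolicy::Disabled => false,
        }
    }
}

/// Simulates `total` single-record appends and counts the checkpoints made.
fn count_checkpoints(policy: CheckpointPolicy, total: u64) -> u64 {
    let mut since_last = 0;
    let mut made = 0;
    for _ in 0..total {
        since_last += 1;
        if policy.is_due(since_last) {
            made += 1;
            since_last = 0;
        }
    }
    made
}
```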
pub fn with_max_segment_size(
data_dir: impl Into<PathBuf>,
checkpoint_policy: CheckpointPolicy,
max_segment_size: u64,
) -> Self
Creates a new storage instance with a custom maximum segment size.
pub fn with_compression(
data_dir: impl Into<PathBuf>,
checkpoint_policy: CheckpointPolicy,
compression: CompressionKind,
) -> Self
Creates a new storage instance with compression enabled.
pub fn default_compression(&self) -> CompressionKind
Returns the default compression kind.
pub fn checkpoint_policy(&self) -> &CheckpointPolicy
Returns the current checkpoint policy.
pub fn max_segment_size(&self) -> u64
Returns the maximum segment size in bytes.
pub fn index_path(&self, stream_id: StreamId) -> PathBuf
Returns the path to the index file for the stream's active segment.
pub fn rebuild_index(
&self,
stream_id: StreamId,
) -> Result<OffsetIndex, StorageError>
Rebuilds the offset index for the stream's active segment by scanning its log file.
This is the recovery path when the index file is missing or corrupted. Scans every record in the segment to reconstruct byte positions.
§Performance
O(n) where n is the number of records in the segment. With segment rotation, this is bounded to a single segment’s worth of data.
§Errors
Returns StorageError::CorruptedRecord if any record in the log is invalid.
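A rebuild is a single forward pass that records where each record starts. The sketch assumes a simple framing of a 4-byte little-endian length followed by the payload; the crate's actual record format (headers, hashes, compression flags) is not documented here, and ScanError only mirrors StorageError::CorruptedRecord.

```rust
/// Mirrors StorageError::CorruptedRecord for this sketch only.
#[derive(Debug, PartialEq)]
enum ScanError {
    CorruptedRecord { byte_pos: usize },
}

/// Assumed framing: [u32 little-endian payload length][payload bytes].
fn encode_record(out: &mut Vec<u8>, payload: &[u8]) {
    out.extend_from_slice(&(payload.len() as u32).to_le_bytes());
    out.extend_from_slice(payload);
}

/// Scans a whole segment and returns the byte position of every record.
/// O(n) in the number of records; fails on a truncated header or payload.
fn rebuild_index(segment: &[u8]) -> Result<Vec<u64>, ScanError> {
    let mut positions = Vec::new();
    let mut pos = 0usize;
    while pos < segment.len() {
        let header = match segment.get(pos..pos + 4) {
            Some(h) => h,
            None => return Err(ScanError::CorruptedRecord { byte_pos: pos }),
        };
        let len = u32::from_le_bytes([header[0], header[1], header[2], header[3]]) as usize;
        let end = pos + 4 + len;
        if end > segment.len() {
            return Err(ScanError::CorruptedRecord { byte_pos: pos });
        }
        positions.push(pos as u64);
        pos = end;
    }
    Ok(positions)
}
```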
pub fn load_or_rebuild_index(
&self,
stream_id: StreamId,
) -> Result<OffsetIndex, StorageError>
Loads the offset index from disk, or rebuilds it if missing/corrupted.
This is the primary way to obtain an index for the active segment.
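The load-or-rebuild decision is a fallback around the rebuild path. In this sketch the index file format (a flat array of little-endian u64 byte positions) and the validity check (length must be a multiple of 8) are assumptions; a real implementation would likely also need to detect an index that is stale relative to the log.

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Assumed index format: a flat array of little-endian u64 byte positions.
/// Returns None if the bytes cannot be a valid index.
fn decode_index(bytes: &[u8]) -> Option<Vec<u64>> {
    if bytes.len() % 8 != 0 {
        return None;
    }
    let positions: Vec<u64> = bytes
        .chunks_exact(8)
        .map(|chunk| {
            let mut word = [0u8; 8];
            word.copy_from_slice(chunk);
            u64::from_le_bytes(word)
        })
        .collect();
    Some(positions)
}

/// Loads the index file, falling back to `rebuild` (a full log scan in the
/// real storage) when the file is missing or fails to decode.
fn load_or_rebuild<F>(index_path: &Path, rebuild: F) -> io::Result<Vec<u64>>
where
    F: FnOnce() -> io::Result<Vec<u64>>,
{
    match fs::read(index_path) {
        Ok(bytes) => match decode_index(&bytes) {
            Some(index) => Ok(index),
            None => rebuild(), // corrupted index file
        },
        Err(e) if e.kind() == io::ErrorKind::NotFound => rebuild(), // missing index file
        Err(e) => Err(e), // other I/O errors are not fixed by rebuilding
    }
}
```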
pub fn append_batch(
&mut self,
stream_id: StreamId,
events: Vec<Bytes>,
expected_offset: Offset,
prev_hash: Option<ChainHash>,
fsync: bool,
) -> Result<(Offset, ChainHash), StorageError>
Appends a batch of events to a stream, building the hash chain.
Each event is written as a Record with a cryptographic link to the
previous record, forming a tamper-evident chain. The offset index is
updated atomically with the append to maintain O(1) lookup capability.
If the active segment exceeds max_segment_size, a new segment is
created (rotation). The hash chain remains continuous across segments.
§Arguments
- stream_id - The stream to append to
- events - The event payloads to append (must not be empty)
- expected_offset - The offset to start writing at
- prev_hash - Hash of the previous record (None for genesis)
- fsync - Whether to fsync after writing (recommended for durability)
§Returns
A tuple of:
- The next offset (for subsequent appends)
- The hash of the last record written (for chain continuity)
§Panics
Panics if events is empty. Empty batches are a caller bug.
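The contract above (offset check, hash threading, and the returned next-offset/last-hash pair) can be modelled in memory. MemStream, AppendError, and chain_hash are hypothetical; in particular, reporting an offset mismatch as an error rather than a panic is an assumption, and DefaultHasher stands in for a cryptographic hash.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;

type Offset = u64;
type ChainHash = u64; // stand-in; DefaultHasher is not cryptographic

#[derive(Debug, PartialEq)]
enum AppendError {
    /// Assumed behaviour: a stale expected_offset is rejected, not overwritten.
    UnexpectedOffset { expected: Offset, actual: Offset },
}

struct Record {
    prev_hash: Option<ChainHash>,
    payload: Vec<u8>,
}

fn chain_hash(prev_hash: Option<ChainHash>, payload: &[u8]) -> ChainHash {
    let mut h = DefaultHasher::new();
    match prev_hash {
        None => h.write_u8(0), // genesis marker
        Some(p) => {
            h.write_u8(1);
            h.write_u64(p);
        }
    }
    h.write(payload);
    h.finish()
}

/// Hypothetical in-memory stream; offsets are record indices starting at 0.
#[derive(Default)]
struct MemStream {
    records: Vec<Record>,
}

impl MemStream {
    fn append_batch(
        &mut self,
        events: Vec<Vec<u8>>,
        expected_offset: Offset,
        prev_hash: Option<ChainHash>,
    ) -> Result<(Offset, ChainHash), AppendError> {
        // Mirrors the documented panic: an empty batch is a caller bug.
        assert!(!events.is_empty(), "append_batch called with an empty batch");
        let actual = self.records.len() as Offset;
        if expected_offset != actual {
            return Err(AppendError::UnexpectedOffset { expected: expected_offset, actual });
        }
        let mut prev = prev_hash;
        for payload in events {
            let hash = chain_hash(prev, &payload);
            self.records.push(Record { prev_hash: prev, payload });
            prev = Some(hash);
        }
        let next_offset = self.records.len() as Offset;
        // The batch was non-empty, so `prev` holds the last record's hash.
        Ok((next_offset, prev.expect("batch was non-empty")))
    }
}
```

Feeding the returned pair into the next call is what keeps the chain continuous from one batch to the next.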
pub fn read_from(
&mut self,
stream_id: StreamId,
from_offset: Offset,
max_bytes: u64,
) -> Result<Vec<Bytes>, StorageError>
Reads events from a stream with checkpoint-optimized verification.
Uses the nearest preceding checkpoint as a verification anchor, reducing verification cost from O(n) to O(k), where n is the number of records from genesis to the requested offset and k is the distance from the nearest checkpoint. Falls back to genesis verification if no checkpoints exist.
Reads span across segment boundaries transparently.
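The checkpoint shortcut can be illustrated by counting how many records get hashed. This sketch assumes the checkpoint index is a sorted list of (offset, hash) pairs whose hash is trusted as the chain hash of the record at that offset; the crate's checkpoint records may carry more information. DefaultHasher again stands in for a cryptographic hash.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;

type Offset = u64;
type ChainHash = u64; // stand-in; DefaultHasher is not cryptographic

struct Record {
    prev_hash: Option<ChainHash>,
    payload: Vec<u8>,
}

fn chain_hash(prev_hash: Option<ChainHash>, payload: &[u8]) -> ChainHash {
    let mut h = DefaultHasher::new();
    match prev_hash {
        None => h.write_u8(0), // genesis marker
        Some(p) => {
            h.write_u8(1);
            h.write_u64(p);
        }
    }
    h.write(payload);
    h.finish()
}

/// Latest checkpoint at or before `offset`; `checkpoints` is sorted by offset.
fn nearest_checkpoint(checkpoints: &[(Offset, ChainHash)], offset: Offset) -> Option<(Offset, ChainHash)> {
    let idx = checkpoints.partition_point(|&(cp, _)| cp <= offset);
    idx.checked_sub(1).map(|i| checkpoints[i])
}

/// Verifies up to and including `target`, starting from the nearest checkpoint
/// if any, else from genesis. Returns how many records were hashed, or the
/// offset of the first mismatch.
fn verify_up_to(records: &[Record], checkpoints: &[(Offset, ChainHash)], target: Offset) -> Result<u64, Offset> {
    let (start, mut expected_prev, mut hashed) = match nearest_checkpoint(checkpoints, target) {
        Some((cp, cp_hash)) => {
            // The checkpoint hash is trusted; the anchor record must match it.
            let anchor = &records[cp as usize];
            if chain_hash(anchor.prev_hash, &anchor.payload) != cp_hash {
                return Err(cp);
            }
            (cp + 1, Some(cp_hash), 1)
        }
        None => (0, None, 0),
    };
    for offset in start..=target {
        let record = &records[offset as usize];
        if record.prev_hash != expected_prev {
            return Err(offset);
        }
        expected_prev = Some(chain_hash(record.prev_hash, &record.payload));
        hashed += 1;
    }
    Ok(hashed)
}
```

In this model the trade-off is visible: a modification before the anchor is invisible to the anchored check but is caught by a full pass from genesis, which is what read_from_genesis provides.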
pub fn read_from_genesis(
&mut self,
stream_id: StreamId,
from_offset: Offset,
max_bytes: u64,
) -> Result<Vec<Bytes>, StorageError>
Reads events from a stream with full genesis verification.
Verifies the hash chain from genesis (offset 0) across all segments.
For most use cases, prefer Self::read_from which uses checkpoint-optimized
verification for better performance.
pub fn read_records_from_genesis(
&mut self,
stream_id: StreamId,
from_offset: Offset,
max_bytes: u64,
) -> Result<Vec<Record>, StorageError>
Reads records from a stream with full genesis hash chain verification.
Verifies the hash chain from genesis up to and including the requested records, spanning all segments. This ensures tamper detection.
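A sketch of a genesis-verified read that spans segments: segments are walked oldest first as one sequence, so the expected hash carries over each boundary and offsets are counted globally from 0. The max_bytes rule used here (always return at least one record, then stop before exceeding the budget) and the Record type are assumptions.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;

type Offset = u64;
type ChainHash = u64; // stand-in; DefaultHasher is not cryptographic

struct Record {
    prev_hash: Option<ChainHash>,
    payload: Vec<u8>,
}

fn chain_hash(prev_hash: Option<ChainHash>, payload: &[u8]) -> ChainHash {
    let mut h = DefaultHasher::new();
    match prev_hash {
        None => h.write_u8(0), // genesis marker
        Some(p) => {
            h.write_u8(1);
            h.write_u64(p);
        }
    }
    h.write(payload);
    h.finish()
}

/// Reads payloads from `from_offset`, verifying every link from genesis up
/// to the last returned record. Returns the offset of a broken link on failure.
fn read_from_genesis(segments: &[Vec<Record>], from_offset: Offset, max_bytes: u64) -> Result<Vec<Vec<u8>>, Offset> {
    let mut expected_prev = None;
    let mut out: Vec<Vec<u8>> = Vec::new();
    let mut used = 0u64;
    // Flattening keeps a single running hash across segment boundaries.
    for (i, record) in segments.iter().flatten().enumerate() {
        let offset = i as Offset;
        if record.prev_hash != expected_prev {
            return Err(offset);
        }
        expected_prev = Some(chain_hash(record.prev_hash, &record.payload));
        if offset >= from_offset {
            let len = record.payload.len() as u64;
            // Assumed budget rule: at least one record, then respect max_bytes.
            if !out.is_empty() && used + len > max_bytes {
                break;
            }
            used += len;
            out.push(record.payload.clone());
        }
    }
    Ok(out)
}
```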
pub fn append_batch_pipelined(
&mut self,
stream_id: StreamId,
events: &[Bytes],
expected_offset: Offset,
prev_hash: Option<ChainHash>,
fsync: bool,
pipeline: &mut AppendPipeline,
) -> Result<(Offset, ChainHash), StorageError>
Appends a batch of events using the two-stage pipeline.
- Stage 1 (CPU): serializes the records, computes the hash chain, and compresses payloads into a pre-allocated buffer.
- Stage 2 (I/O): writes the buffer to disk.
This method is functionally equivalent to Self::append_batch but uses
the pipeline’s buffer management for better throughput when called repeatedly.
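Much of the benefit of the two stages comes from buffer reuse: stage 1 fills one buffer per batch and stage 2 issues a single write. The AppendPipeline below is a hypothetical stand-in, not the crate's type; its frame layout (length, hash, payload) is assumed, compression is omitted, and fsync would correspond to calling File::sync_all after the write when the sink is a real file.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;
use std::io::{self, Write};

type ChainHash = u64; // stand-in; DefaultHasher is not cryptographic

fn chain_hash(prev_hash: Option<ChainHash>, payload: &[u8]) -> ChainHash {
    let mut h = DefaultHasher::new();
    match prev_hash {
        None => h.write_u8(0), // genesis marker
        Some(p) => {
            h.write_u8(1);
            h.write_u64(p);
        }
    }
    h.write(payload);
    h.finish()
}

/// Hypothetical stand-in for the crate's AppendPipeline.
struct AppendPipeline {
    buf: Vec<u8>,
}

impl AppendPipeline {
    fn with_capacity(capacity: usize) -> Self {
        AppendPipeline { buf: Vec::with_capacity(capacity) }
    }

    fn append<W: Write>(&mut self, out: &mut W, events: &[Vec<u8>], prev_hash: Option<ChainHash>) -> io::Result<Option<ChainHash>> {
        // Stage 1 (CPU): frame every record into the reused buffer.
        // clear() drops the contents but keeps the allocation.
        self.buf.clear();
        let mut prev = prev_hash;
        for payload in events {
            let hash = chain_hash(prev, payload);
            self.buf.extend_from_slice(&(payload.len() as u32).to_le_bytes());
            self.buf.extend_from_slice(&hash.to_le_bytes());
            self.buf.extend_from_slice(payload);
            prev = Some(hash);
        }
        // Stage 2 (I/O): one write for the whole batch.
        out.write_all(&self.buf)?;
        Ok(prev)
    }
}
```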
pub fn rebuild_checkpoint_index(
&mut self,
stream_id: StreamId,
) -> Result<CheckpointIndex, StorageError>
Rebuilds the checkpoint index by scanning all segments for checkpoint records.
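Rebuilding the checkpoint index amounts to one scan that keeps only checkpoint records. In this hypothetical sketch the scanned records are already decoded into ScannedRecord values, and a BTreeMap keyed by offset then answers both "nearest checkpoint at or before an offset" and "last checkpoint". The real CheckpointIndex type may be structured differently.

```rust
use std::collections::BTreeMap;

type Offset = u64;
type ChainHash = u64;

/// Hypothetical decoded view of a record: only what the scan needs.
enum RecordKind {
    Event,
    Checkpoint,
}

struct ScannedRecord {
    offset: Offset,
    kind: RecordKind,
    hash: ChainHash,
}

/// One pass over all segments (oldest first), keeping checkpoint records only.
fn rebuild_checkpoint_index(segments: &[Vec<ScannedRecord>]) -> BTreeMap<Offset, ChainHash> {
    segments
        .iter()
        .flatten()
        .filter(|r| matches!(r.kind, RecordKind::Checkpoint))
        .map(|r| (r.offset, r.hash))
        .collect()
}

/// Nearest checkpoint at or before `offset`: the anchor for verified reads.
fn nearest_at_or_before(index: &BTreeMap<Offset, ChainHash>, offset: Offset) -> Option<(Offset, ChainHash)> {
    index.range(..=offset).next_back().map(|(&o, &h)| (o, h))
}

/// The most recent checkpoint offset, if any.
fn last_checkpoint(index: &BTreeMap<Offset, ChainHash>) -> Option<Offset> {
    index.keys().next_back().copied()
}
```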
pub fn create_checkpoint(
&mut self,
stream_id: StreamId,
current_offset: Offset,
prev_hash: Option<ChainHash>,
record_count: u64,
fsync: bool,
) -> Result<(Offset, ChainHash), StorageError>
Creates a checkpoint at the current position in the active segment.
pub fn read_records_verified(
&mut self,
stream_id: StreamId,
from_offset: Offset,
max_bytes: u64,
) -> Result<Vec<Record>, StorageError>
Reads records with checkpoint-optimized verification.
Instead of verifying from genesis, this method verifies from the nearest
checkpoint before from_offset. Reads span across segment boundaries.
pub fn last_checkpoint(
&mut self,
stream_id: StreamId,
) -> Result<Option<Offset>, StorageError>
Returns the offset of the last checkpoint for a stream, or None if the stream has no checkpoints.
pub fn latest_chain_hash(
&mut self,
stream_id: StreamId,
) -> Result<Option<ChainHash>, StorageError>
Returns the chain hash of the last appended record for a stream.
Used to recover the in-memory chain_heads map on process restart:
without this, the first post-restart append would write
prev_hash = None into a stream whose on-disk tail is non-null,
producing a permanent chain break that only surfaces on a later
verified read.
Returns Ok(None) for streams with no records (never written to,
or written then pruned). Returns Ok(Some(hash)) for streams
whose active segment contains at least one record.
Scans the active segment from its start (or the nearest checkpoint when one exists) — linear in segment size, bounded by the configured max segment size. Only called on startup or on first append to a stream after process restart, so the cost amortises to near zero.
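The restart-recovery pattern described above can be sketched as a lazily filled cache: the first append to a stream after restart asks storage for its tail hash, and later appends reuse the cached value. ChainHeads, TailScanner, and FakeDisk are hypothetical names; FakeDisk only counts scans so the example can show that each stream is scanned at most once.

```rust
use std::collections::HashMap;

type StreamId = u64;
type ChainHash = u64;

/// Hypothetical abstraction over Storage::latest_chain_hash.
trait TailScanner {
    fn latest_chain_hash(&mut self, stream: StreamId) -> Option<ChainHash>;
}

/// In-memory chain heads; empty after a process restart.
#[derive(Default)]
struct ChainHeads {
    heads: HashMap<StreamId, Option<ChainHash>>,
}

impl ChainHeads {
    /// prev_hash for the next append. The first call per stream scans storage
    /// instead of assuming genesis (None), which would break the chain.
    fn prev_hash_for_append<S: TailScanner>(&mut self, stream: StreamId, storage: &mut S) -> Option<ChainHash> {
        *self
            .heads
            .entry(stream)
            .or_insert_with(|| storage.latest_chain_hash(stream))
    }

    /// Records the last hash returned by a successful append.
    fn record_append(&mut self, stream: StreamId, last_hash: ChainHash) {
        self.heads.insert(stream, Some(last_hash));
    }
}

/// Test double: fixed tail hashes plus a counter of scans performed.
struct FakeDisk {
    tails: HashMap<StreamId, ChainHash>,
    scans: u32,
}

impl TailScanner for FakeDisk {
    fn latest_chain_hash(&mut self, stream: StreamId) -> Option<ChainHash> {
        self.scans += 1;
        self.tails.get(&stream).copied()
    }
}
```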
pub fn segment_count(&self, stream_id: StreamId) -> usize
Returns the number of segments for a stream.
pub fn completed_segments(&self, stream_id: StreamId) -> Vec<u32>
Returns the list of completed (immutable) segment numbers for a stream.
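One plausible way to derive a segment count and the completed-segment list is from the numbered segment files in a stream's directory. The file-name scheme below and the rule that every segment except the highest-numbered (active) one is complete are assumptions; the functions take names as strings so the example does not touch the filesystem.

```rust
/// Parses segment numbers from file names, assuming "<zero-padded number>.log";
/// anything else in the directory (index files, for example) is ignored.
fn segment_numbers<'a>(file_names: impl IntoIterator<Item = &'a str>) -> Vec<u32> {
    let mut numbers: Vec<u32> = file_names
        .into_iter()
        .filter_map(|name| name.strip_suffix(".log")?.parse::<u32>().ok())
        .collect();
    numbers.sort_unstable();
    numbers
}

/// Every segment except the highest-numbered (active) one is complete.
fn completed_segments(segments: &[u32]) -> Vec<u32> {
    match segments.split_last() {
        Some((_active, completed)) => completed.to_vec(),
        None => Vec::new(),
    }
}
```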
pub fn flush_indexes(&mut self) -> Result<(), StorageError>
Flushes all dirty indexes to disk.
Call this on shutdown or before operations that require index durability.
This is also called automatically from Drop to prevent stale indexes.
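Flushing from Drop follows a common Rust pattern, sketched here with a hypothetical Indexes type whose "disk" is a shared Vec so the effect is observable. Because drop cannot return an error, calling flush_indexes explicitly on shutdown is the only way for a caller to see a failure; the fallback in Drop can only log it.

```rust
use std::cell::RefCell;
use std::collections::BTreeSet;
use std::io;
use std::rc::Rc;

/// Hypothetical index cache; `disk` stands in for index files so the
/// effect of a flush is observable.
struct Indexes {
    dirty: BTreeSet<u64>,
    disk: Rc<RefCell<Vec<u64>>>,
}

impl Indexes {
    fn mark_dirty(&mut self, stream: u64) {
        self.dirty.insert(stream);
    }

    /// Persists every dirty index once; `take` leaves the set empty,
    /// so a second flush (e.g. from Drop) has nothing left to do.
    fn flush_indexes(&mut self) -> io::Result<()> {
        for stream in std::mem::take(&mut self.dirty) {
            self.disk.borrow_mut().push(stream);
        }
        Ok(())
    }
}

impl Drop for Indexes {
    fn drop(&mut self) {
        // drop() cannot return an error, so a failure here can only be logged.
        if let Err(e) = self.flush_indexes() {
            eprintln!("failed to flush indexes on drop: {}", e);
        }
    }
}
```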
Trait Implementations
impl StorageBackend for Storage
fn append_batch(
&mut self,
stream_id: StreamId,
events: Vec<Bytes>,
expected_offset: Offset,
prev_hash: Option<ChainHash>,
fsync: bool,
) -> Result<(Offset, ChainHash), StorageError>
fn read_from(
&mut self,
stream_id: StreamId,
from_offset: Offset,
max_bytes: u64,
) -> Result<Vec<Bytes>, StorageError>
fn latest_chain_hash(
&mut self,
stream_id: StreamId,
) -> Result<Option<ChainHash>, StorageError>
Returns the chain hash of the last appended record, or None if the stream has never been written to.