Skip to main content

kimberlite_storage/
storage.rs

1//! Append-only event log storage with checkpoint support and segment rotation.
2//!
3//! The [`Storage`] struct manages segment files on disk, providing append and read
4//! operations for event streams. Each stream gets its own directory with
5//! numbered segment files that rotate when reaching the configured size limit.
6//!
7//! # File Layout
8//!
9//! ```text
10//! {data_dir}/
11//! └── {stream_id}/
12//!     ├── segment_000000.log      <- first segment (immutable after rotation)
13//!     ├── segment_000000.log.idx  <- offset index for segment 0
14//!     ├── segment_000001.log      <- second segment (active)
15//!     ├── segment_000001.log.idx  <- offset index for segment 1
16//!     └── manifest.json           <- segment manifest (offset ranges)
17//! ```
18//!
19//! # Segment Rotation
20//!
21//! When a segment exceeds `max_segment_size` bytes, a new segment is created.
22//! Completed segments are immutable and can be safely memory-mapped. The hash
23//! chain is continuous across segment boundaries.
24//!
25//! # Hash Chain Integrity
26//!
27//! Every record contains a cryptographic link (`prev_hash`) to the previous record,
28//! forming a tamper-evident chain. Reads verify this chain from genesis (or a
29//! checkpoint) to detect any corruption or tampering.
30//!
31//! # Checkpoints
32//!
33//! Checkpoints are periodic verification anchors stored as special records in the
34//! log. They enable efficient verified reads by reducing verification from O(n)
35//! to O(k) where k is the distance to the nearest checkpoint.
36
37use std::collections::HashMap;
38use std::fs::{self, OpenOptions};
39use std::io::Write;
40use std::path::PathBuf;
41
42use bytes::Bytes;
43use kimberlite_crypto::ChainHash;
44use kimberlite_types::{CheckpointPolicy, CompressionKind, Offset, RecordKind, StreamId};
45
46use crate::checkpoint::{
47    CheckpointIndex, deserialize_checkpoint_payload, serialize_checkpoint_payload,
48};
49use crate::codec::CodecRegistry;
50use crate::{OffsetIndex, Record, StorageError};
51
/// How many appended records may accumulate in an in-memory index before it
/// is written back to disk.
const INDEX_FLUSH_THRESHOLD: usize = 100;

/// Segment size in bytes at which the active segment is rotated by default
/// (256 MiB).
const DEFAULT_MAX_SEGMENT_SIZE: u64 = 256 << 20;

/// Name of the per-stream JSON file that describes the stream's segments.
const MANIFEST_FILENAME: &str = "manifest.json";
60
/// Builds the on-disk name of segment `segment_num`, zero-padded to at least
/// six digits (e.g. `segment_000042.log`).
fn segment_filename(segment_num: u32) -> String {
    format!("segment_{n:06}.log", n = segment_num)
}
65
/// Builds the on-disk name of the offset-index file that belongs to segment
/// `segment_num` (the segment file name with an `.idx` suffix).
fn segment_index_filename(segment_num: u32) -> String {
    format!("segment_{n:06}.log.idx", n = segment_num)
}
70
71/// Metadata for a single segment.
72#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
73struct SegmentMeta {
74    /// Segment number (0-based).
75    segment_num: u32,
76    /// First logical offset in this segment.
77    first_offset: u64,
78    /// One past the last logical offset in this segment (exclusive).
79    /// For the active segment this is the next offset to be written.
80    next_offset: u64,
81    /// Size of the segment file in bytes.
82    size_bytes: u64,
83}
84
85/// Per-stream segment manifest tracking all segments and their offset ranges.
86#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
87struct SegmentManifest {
88    /// Ordered list of segments (ascending by `segment_num`).
89    segments: Vec<SegmentMeta>,
90    /// The currently active (writable) segment number.
91    active_segment: u32,
92}
93
94impl SegmentManifest {
95    /// Creates a new manifest with a single empty segment.
96    fn new() -> Self {
97        Self {
98            segments: vec![SegmentMeta {
99                segment_num: 0,
100                first_offset: 0,
101                next_offset: 0,
102                size_bytes: 0,
103            }],
104            active_segment: 0,
105        }
106    }
107
108    /// Persists the manifest to disk.
109    fn save(&self, stream_dir: &std::path::Path) -> Result<(), StorageError> {
110        let path = stream_dir.join(MANIFEST_FILENAME);
111        let json = serde_json::to_string_pretty(self).map_err(std::io::Error::other)?;
112        fs::write(path, json)?;
113        Ok(())
114    }
115
116    /// Loads a manifest from disk.
117    fn load(stream_dir: &std::path::Path) -> Result<Self, StorageError> {
118        let path = stream_dir.join(MANIFEST_FILENAME);
119        let json = fs::read_to_string(path)?;
120        let manifest: Self = serde_json::from_str(&json).map_err(std::io::Error::other)?;
121        Ok(manifest)
122    }
123
124    /// Returns the active segment metadata mutably.
125    fn active_mut(&mut self) -> &mut SegmentMeta {
126        self.segments
127            .iter_mut()
128            .find(|s| s.segment_num == self.active_segment)
129            .expect("active segment must exist in manifest")
130    }
131
132    /// Finds which segment contains a given logical offset.
133    fn find_segment_for_offset(&self, offset: u64) -> Option<&SegmentMeta> {
134        // Binary search: segments are ordered by first_offset
135        match self
136            .segments
137            .binary_search_by_key(&offset, |s| s.first_offset)
138        {
139            Ok(idx) => Some(&self.segments[idx]),
140            Err(idx) => {
141                if idx == 0 {
142                    None
143                } else {
144                    let seg = &self.segments[idx - 1];
145                    if offset < seg.next_offset {
146                        Some(seg)
147                    } else {
148                        // Offset is in the active segment (beyond last completed segment)
149                        self.segments.last()
150                    }
151                }
152            }
153        }
154    }
155
156    /// Adds a new segment and returns its number.
157    fn rotate(&mut self, first_offset: u64) -> u32 {
158        let new_num = self.active_segment + 1;
159        self.segments.push(SegmentMeta {
160            segment_num: new_num,
161            first_offset,
162            next_offset: first_offset,
163            size_bytes: 0,
164        });
165        self.active_segment = new_num;
166        new_num
167    }
168}
169
170/// Append-only event log storage with checkpoint support and segment rotation.
171///
172/// Manages segment files on disk, providing append and read operations for
173/// event streams. Each stream gets its own directory with numbered segment files.
174/// Segments rotate when they exceed `max_segment_size` bytes.
175///
176/// # Invariants
177///
178/// - Records are append-only; existing data is never modified
179/// - Each record links to the previous via `prev_hash` (hash chain)
180/// - The offset index stays in sync with the log (updated atomically with appends)
181/// - Checkpoints are created according to the configured policy
182/// - Hash chain integrity is maintained across segment boundaries
183#[derive(Debug)]
184pub struct Storage {
185    /// Root directory for all stream data.
186    data_dir: PathBuf,
187
188    /// In-memory cache of offset indexes, keyed by (stream, `segment_num`).
189    /// Loaded lazily on first access, kept in sync during appends.
190    index_cache: HashMap<(StreamId, u32), OffsetIndex>,
191
192    /// In-memory cache of checkpoint indexes, keyed by stream.
193    /// Rebuilt on first access by scanning for checkpoint records.
194    checkpoint_cache: HashMap<StreamId, CheckpointIndex>,
195
196    /// Policy for automatic checkpoint creation.
197    checkpoint_policy: CheckpointPolicy,
198
199    /// Tracks how many records have been appended to each stream/segment's index
200    /// since the last index flush. Used to batch index writes for performance.
201    index_dirty_count: HashMap<(StreamId, u32), usize>,
202
203    /// Per-stream segment manifests.
204    manifests: HashMap<StreamId, SegmentManifest>,
205
206    /// Maximum segment size in bytes before rotation.
207    max_segment_size: u64,
208
209    /// Tracks the number of entries already flushed to the main index file
210    /// for each (stream, segment). Entries beyond this count are in the WAL.
211    index_flushed_count: HashMap<(StreamId, u32), usize>,
212
213    /// Cached data for completed (immutable) segments.
214    ///
215    /// Only rotated segments are cached. Active segments are read fresh from disk
216    /// since they may still be written to. This avoids repeated `fs::read()` calls
217    /// and heap allocations for immutable data.
218    segment_data_cache: HashMap<(StreamId, u32), Bytes>,
219
220    /// Default compression algorithm for new records.
221    default_compression: CompressionKind,
222
223    /// Codec registry for compress/decompress operations.
224    codec_registry: CodecRegistry,
225}
226
227impl Storage {
228    /// Creates a new storage instance with the given data directory.
229    ///
230    /// The directory will be created if it doesn't exist when the first
231    /// write occurs. Uses the default checkpoint policy.
232    pub fn new(data_dir: impl Into<PathBuf>) -> Self {
233        Self::with_checkpoint_policy(data_dir, CheckpointPolicy::default())
234    }
235
236    /// Creates a new storage instance with a custom checkpoint policy.
237    pub fn with_checkpoint_policy(
238        data_dir: impl Into<PathBuf>,
239        checkpoint_policy: CheckpointPolicy,
240    ) -> Self {
241        Self {
242            data_dir: data_dir.into(),
243            index_cache: HashMap::new(),
244            checkpoint_cache: HashMap::new(),
245            checkpoint_policy,
246            index_dirty_count: HashMap::new(),
247            manifests: HashMap::new(),
248            max_segment_size: DEFAULT_MAX_SEGMENT_SIZE,
249            index_flushed_count: HashMap::new(),
250            segment_data_cache: HashMap::new(),
251            default_compression: CompressionKind::None,
252            codec_registry: CodecRegistry::new(),
253        }
254    }
255
256    /// Creates a new storage instance with a custom maximum segment size.
257    pub fn with_max_segment_size(
258        data_dir: impl Into<PathBuf>,
259        checkpoint_policy: CheckpointPolicy,
260        max_segment_size: u64,
261    ) -> Self {
262        Self {
263            data_dir: data_dir.into(),
264            index_cache: HashMap::new(),
265            checkpoint_cache: HashMap::new(),
266            checkpoint_policy,
267            index_dirty_count: HashMap::new(),
268            manifests: HashMap::new(),
269            max_segment_size,
270            index_flushed_count: HashMap::new(),
271            segment_data_cache: HashMap::new(),
272            default_compression: CompressionKind::None,
273            codec_registry: CodecRegistry::new(),
274        }
275    }
276
277    /// Creates a new storage instance with compression enabled.
278    pub fn with_compression(
279        data_dir: impl Into<PathBuf>,
280        checkpoint_policy: CheckpointPolicy,
281        compression: CompressionKind,
282    ) -> Self {
283        Self {
284            data_dir: data_dir.into(),
285            index_cache: HashMap::new(),
286            checkpoint_cache: HashMap::new(),
287            checkpoint_policy,
288            index_dirty_count: HashMap::new(),
289            manifests: HashMap::new(),
290            max_segment_size: DEFAULT_MAX_SEGMENT_SIZE,
291            index_flushed_count: HashMap::new(),
292            segment_data_cache: HashMap::new(),
293            default_compression: compression,
294            codec_registry: CodecRegistry::new(),
295        }
296    }
297
298    /// Returns the default compression kind.
299    pub fn default_compression(&self) -> CompressionKind {
300        self.default_compression
301    }
302
303    /// Wipes all persisted state and clears every in-memory cache,
304    /// leaving `self` in a state observationally equivalent to
305    /// `Self::new(data_dir)` on a fresh directory.
306    ///
307    /// Intended **only** for libFuzzer persistent-mode targets that
308    /// keep a single `Kimberlite` alive across iterations and need a
309    /// cheap "reset to empty" between inputs. Deletes the contents of
310    /// `data_dir` on disk — never call this from application code.
311    ///
312    /// `Storage` does not cache open file handles, so the filesystem
313    /// reset is just `remove_dir_all` + `create_dir_all`; no close
314    /// dance is required.
315    #[cfg(feature = "fuzz-reset")]
316    pub fn reset(&mut self) -> Result<(), crate::error::StorageError> {
317        if self.data_dir.exists() {
318            std::fs::remove_dir_all(&self.data_dir)?;
319        }
320        std::fs::create_dir_all(&self.data_dir)?;
321        self.index_cache.clear();
322        self.checkpoint_cache.clear();
323        self.index_dirty_count.clear();
324        self.manifests.clear();
325        self.index_flushed_count.clear();
326        self.segment_data_cache.clear();
327        Ok(())
328    }
329
330    /// Returns the current checkpoint policy.
331    pub fn checkpoint_policy(&self) -> &CheckpointPolicy {
332        &self.checkpoint_policy
333    }
334
335    /// Returns the data directory path.
336    pub fn data_dir(&self) -> &PathBuf {
337        &self.data_dir
338    }
339
340    /// Returns the maximum segment size in bytes.
341    pub fn max_segment_size(&self) -> u64 {
342        self.max_segment_size
343    }
344
345    /// Returns the stream directory path.
346    fn stream_dir(&self, stream_id: StreamId) -> PathBuf {
347        self.data_dir.join(stream_id.to_string())
348    }
349
350    /// Returns the path to a specific segment file.
351    fn segment_path_for(&self, stream_id: StreamId, segment_num: u32) -> PathBuf {
352        self.stream_dir(stream_id)
353            .join(segment_filename(segment_num))
354    }
355
356    /// Returns the path to the index file for a specific segment.
357    fn index_path_for(&self, stream_id: StreamId, segment_num: u32) -> PathBuf {
358        self.stream_dir(stream_id)
359            .join(segment_index_filename(segment_num))
360    }
361
362    /// Returns the path to the index file for the active segment.
363    pub fn index_path(&self, stream_id: StreamId) -> PathBuf {
364        let segment_num = self
365            .manifests
366            .get(&stream_id)
367            .map_or(0, |m| m.active_segment);
368        self.index_path_for(stream_id, segment_num)
369    }
370
371    /// Gets or loads the manifest for a stream.
372    ///
373    /// If no manifest exists on disk, creates a fresh empty manifest.
374    fn get_or_load_manifest(
375        &mut self,
376        stream_id: StreamId,
377    ) -> Result<&mut SegmentManifest, StorageError> {
378        if !self.manifests.contains_key(&stream_id) {
379            let stream_dir = self.stream_dir(stream_id);
380            let manifest = if stream_dir.join(MANIFEST_FILENAME).exists() {
381                SegmentManifest::load(&stream_dir)?
382            } else {
383                SegmentManifest::new()
384            };
385            self.manifests.insert(stream_id, manifest);
386        }
387        Ok(self.manifests.get_mut(&stream_id).expect("just inserted"))
388    }
389
390    /// Rebuilds the offset index for a specific segment by scanning the log file.
391    ///
392    /// This is the recovery path when the index file is missing or corrupted.
393    /// Scans every record in the segment to reconstruct byte positions.
394    ///
395    /// # Performance
396    ///
397    /// O(n) where n is the number of records in the segment.
398    /// With segment rotation, this is bounded to a single segment's worth of data.
399    ///
400    /// # Errors
401    ///
402    /// Returns [`StorageError::CorruptedRecord`] if any record in the log is invalid.
403    pub fn rebuild_index(&self, stream_id: StreamId) -> Result<OffsetIndex, StorageError> {
404        let segment_num = self
405            .manifests
406            .get(&stream_id)
407            .map_or(0, |m| m.active_segment);
408        self.rebuild_index_for_segment(stream_id, segment_num)
409    }
410
411    /// Rebuilds the offset index for a specific segment.
412    ///
413    /// **AUDIT-2026-03 M-8 (Torn Write Recovery):**
414    /// When a torn write is detected (RECORD_START present but RECORD_END absent),
415    /// the log is truncated at the last complete record. This handles power-loss
416    /// scenarios where the final write was not fully committed to stable storage.
417    ///
418    /// Truncation is safe because:
419    /// 1. The incomplete record was never acknowledged to any client.
420    /// 2. The VSR protocol requires the leader to retransmit any un-committed
421    ///    operations during recovery.
422    fn rebuild_index_for_segment(
423        &self,
424        stream_id: StreamId,
425        segment_num: u32,
426    ) -> Result<OffsetIndex, StorageError> {
427        let segment_path = self.segment_path_for(stream_id, segment_num);
428
429        if !segment_path.exists() {
430            return Ok(OffsetIndex::new());
431        }
432
433        let data: Bytes = fs::read(&segment_path)?.into();
434        let mut index = OffsetIndex::new();
435        let mut pos = 0;
436
437        loop {
438            if pos >= data.len() {
439                break;
440            }
441
442            match Record::from_bytes(&data.slice(pos..)) {
443                Ok((_, consumed)) => {
444                    index.append(pos as u64);
445                    pos += consumed;
446                }
447                Err(StorageError::TornWrite { ref reason }) => {
448                    // **AUDIT-2026-03 M-8: Torn Write Recovery**
449                    //
450                    // The record at `pos` has RECORD_START but missing/corrupt
451                    // RECORD_END — it was incompletely written (power loss or crash).
452                    // Truncate the file at `pos` (the start of the torn record) so
453                    // only complete records remain.
454                    tracing::warn!(
455                        stream_id = %stream_id,
456                        segment_num = segment_num,
457                        torn_byte_offset = pos,
458                        complete_records = index.len(),
459                        reason = %reason,
460                        "torn write detected during recovery — truncating log at last complete record"
461                    );
462
463                    // Open the segment file and truncate at the torn record boundary.
464                    // This is safe: the incomplete record was never committed.
465                    let file = fs::OpenOptions::new().write(true).open(&segment_path)?;
466                    file.set_len(pos as u64)?;
467
468                    tracing::info!(
469                        stream_id = %stream_id,
470                        segment_num = segment_num,
471                        truncated_to_bytes = pos,
472                        "log truncated to last complete record"
473                    );
474                    break;
475                }
476                Err(StorageError::UnexpectedEof) => {
477                    // Partial data at end of file — treat as a torn write.
478                    // The file has a fragment that is smaller than a valid record.
479                    tracing::warn!(
480                        stream_id = %stream_id,
481                        segment_num = segment_num,
482                        partial_byte_offset = pos,
483                        complete_records = index.len(),
484                        "unexpected EOF during recovery — truncating log at last complete record"
485                    );
486
487                    let file = fs::OpenOptions::new().write(true).open(&segment_path)?;
488                    file.set_len(pos as u64)?;
489                    break;
490                }
491                Err(e) => {
492                    // Other errors (corrupted CRC, invalid kind) are not torn writes
493                    // and propagate normally — they indicate data corruption, not
494                    // an incomplete write.
495                    return Err(e);
496                }
497            }
498        }
499
500        let index_path = self.index_path_for(stream_id, segment_num);
501        index.save(&index_path)?;
502
503        Ok(index)
504    }
505
506    /// Loads the offset index for a specific segment from disk, or rebuilds it.
507    ///
508    /// Attempts to load the main index + replay WAL (fast path). Falls back
509    /// to a full log scan if the index is corrupted.
510    fn load_or_rebuild_index_for_segment(
511        &self,
512        stream_id: StreamId,
513        segment_num: u32,
514    ) -> Result<OffsetIndex, StorageError> {
515        let index_path = self.index_path_for(stream_id, segment_num);
516
517        // If the index file doesn't exist yet (first write to this segment),
518        // return an empty index without warning — this is expected behaviour.
519        if !index_path.exists() {
520            let wal_path = index_path.with_extension("idx.wal");
521            if !wal_path.exists() {
522                return self.rebuild_index_for_segment(stream_id, segment_num);
523            }
524        }
525
526        // Try load with WAL replay (fast path)
527        if let Ok(index) = OffsetIndex::load_with_wal(&index_path) {
528            return Ok(index);
529        }
530
531        // Fall back to plain load (no WAL)
532        if let Ok(index) = OffsetIndex::load(&index_path) {
533            return Ok(index);
534        }
535
536        // Index file exists but failed to load — genuine corruption.
537        tracing::warn!(
538            stream_id = %stream_id,
539            segment_num = segment_num,
540            "index corrupted, rebuilding from log"
541        );
542        self.rebuild_index_for_segment(stream_id, segment_num)
543    }
544
545    /// Loads the offset index from disk, or rebuilds it if missing/corrupted.
546    ///
547    /// This is the primary way to obtain an index for the active segment.
548    pub fn load_or_rebuild_index(&self, stream_id: StreamId) -> Result<OffsetIndex, StorageError> {
549        let segment_num = self
550            .manifests
551            .get(&stream_id)
552            .map_or(0, |m| m.active_segment);
553        self.load_or_rebuild_index_for_segment(stream_id, segment_num)
554    }
555
556    /// Ensures the index for a given (stream, segment) is in the cache.
557    fn ensure_index_cached(
558        &mut self,
559        stream_id: StreamId,
560        segment_num: u32,
561    ) -> Result<(), StorageError> {
562        let key = (stream_id, segment_num);
563        if !self.index_cache.contains_key(&key) {
564            let loaded = self.load_or_rebuild_index_for_segment(stream_id, segment_num)?;
565            let flushed = loaded.len(); // Everything loaded is considered "flushed"
566            self.index_cache.insert(key, loaded);
567            self.index_flushed_count.insert(key, flushed);
568        }
569        Ok(())
570    }
571
572    /// Reads segment data, using cached `Bytes` for completed segments and fresh `fs::read` for active.
573    ///
574    /// Completed (immutable) segments are memory-mapped for zero-copy access.
575    /// The active segment uses standard I/O since it may still be written to.
576    fn read_segment_data(
577        &mut self,
578        stream_id: StreamId,
579        segment_num: u32,
580    ) -> Result<Bytes, StorageError> {
581        let is_active = self
582            .manifests
583            .get(&stream_id)
584            .is_some_and(|m| m.active_segment == segment_num);
585
586        if is_active {
587            // Active segment: read fresh from disk (file may still be written to)
588            let path = self.segment_path_for(stream_id, segment_num);
589            Ok(fs::read(&path)?.into())
590        } else {
591            // Completed segment: return cached data or read and cache
592            let key = (stream_id, segment_num);
593            if let Some(cached) = self.segment_data_cache.get(&key) {
594                return Ok(cached.clone());
595            }
596
597            let path = self.segment_path_for(stream_id, segment_num);
598            let data: Bytes = fs::read(&path)?.into();
599            self.segment_data_cache.insert(key, data.clone());
600            Ok(data)
601        }
602    }
603
604    /// Returns a list of all segment numbers for a stream, in order.
605    fn segment_numbers(&self, stream_id: StreamId) -> Vec<u32> {
606        self.manifests.get(&stream_id).map_or_else(
607            || {
608                // No manifest yet, check if segment 0 exists
609                if self.segment_path_for(stream_id, 0).exists() {
610                    vec![0]
611                } else {
612                    vec![]
613                }
614            },
615            |m| m.segments.iter().map(|s| s.segment_num).collect(),
616        )
617    }
618
619    /// Appends a batch of events to a stream, building the hash chain.
620    ///
621    /// Each event is written as a [`Record`] with a cryptographic link to the
622    /// previous record, forming a tamper-evident chain. The offset index is
623    /// updated atomically with the append to maintain O(1) lookup capability.
624    ///
625    /// If the active segment exceeds `max_segment_size`, a new segment is
626    /// created (rotation). The hash chain remains continuous across segments.
627    ///
628    /// # Arguments
629    ///
630    /// * `stream_id` - The stream to append to
631    /// * `events` - The event payloads to append (must not be empty)
632    /// * `expected_offset` - The offset to start writing at
633    /// * `prev_hash` - Hash of the previous record (`None` for genesis)
634    /// * `fsync` - Whether to fsync after writing (recommended for durability)
635    ///
636    /// # Returns
637    ///
638    /// A tuple of:
639    /// - The next offset (for subsequent appends)
640    /// - The hash of the last record written (for chain continuity)
641    ///
642    /// # Panics
643    ///
644    /// Panics if `events` is empty. Empty batches are a caller bug.
645    pub fn append_batch(
646        &mut self,
647        stream_id: StreamId,
648        events: Vec<Bytes>,
649        expected_offset: Offset,
650        prev_hash: Option<ChainHash>,
651        fsync: bool,
652    ) -> Result<(Offset, ChainHash), StorageError> {
653        // Precondition: batch must not be empty
654        assert!(!events.is_empty(), "cannot append empty batch");
655
656        let event_count = events.len();
657
658        // Ensure stream directory exists
659        let stream_dir = self.stream_dir(stream_id);
660        fs::create_dir_all(&stream_dir)?;
661
662        // Load or create manifest
663        let manifest = self.get_or_load_manifest(stream_id)?;
664        let active_seg = manifest.active_segment;
665
666        // Open segment file for appending
667        let segment_path = self.segment_path_for(stream_id, active_seg);
668        let mut file = OpenOptions::new()
669            .create(true)
670            .append(true)
671            .open(&segment_path)?;
672
673        // Get current file size as starting byte position for new records
674        let mut byte_position: u64 = file.metadata()?.len();
675
676        // Pre-compute paths
677        let index_path = self.index_path_for(stream_id, active_seg);
678        let cache_key = (stream_id, active_seg);
679
680        // Load index into cache if not present
681        self.ensure_index_cached(stream_id, active_seg)?;
682
683        let index = self
684            .index_cache
685            .get_mut(&cache_key)
686            .expect("index exists: just ensured");
687
688        let mut current_offset = expected_offset;
689        let mut current_hash = prev_hash;
690
691        let compression = self.default_compression;
692
693        for event in events {
694            // Record byte position BEFORE writing (where this record starts)
695            index.append(byte_position);
696
697            // Compress the payload if compression is enabled
698            let (stored_payload, record_compression) = if compression == CompressionKind::None {
699                (event.clone(), CompressionKind::None)
700            } else {
701                let compressed = self.codec_registry.compress(compression, &event)?;
702                // Only use compression if it actually reduces size
703                if compressed.len() < event.len() {
704                    (Bytes::from(compressed), compression)
705                } else {
706                    (event.clone(), CompressionKind::None)
707                }
708            };
709
710            // Hash is computed over the ORIGINAL (uncompressed) payload
711            let hash_record = Record::new(current_offset, current_hash, event);
712            current_hash = Some(hash_record.compute_hash());
713
714            // But the on-disk record stores the compressed payload
715            let record = Record::with_compression(
716                current_offset,
717                hash_record.prev_hash(),
718                RecordKind::Data,
719                record_compression,
720                stored_payload,
721            );
722            let record_bytes = record.to_bytes();
723
724            // Update position AFTER computing record size
725            byte_position += record_bytes.len() as u64;
726
727            file.write_all(&record_bytes)?;
728
729            current_offset += Offset::from(1u64);
730        }
731
732        // Ensure durability if requested
733        if fsync {
734            file.sync_all()?;
735        }
736
737        // Update manifest with new segment size and offset
738        let manifest = self.manifests.get_mut(&stream_id).expect("manifest loaded");
739        let active_meta = manifest.active_mut();
740        active_meta.size_bytes = byte_position;
741        active_meta.next_offset = current_offset.as_u64();
742
743        // Track dirty count and use WAL for incremental index writes.
744        // WAL appends are O(1) per entry vs O(n) for full index rewrite.
745        // Compaction to full index happens when WAL reaches 1000 entries.
746        let cache_key_for_flush = (stream_id, active_seg);
747        let dirty = self
748            .index_dirty_count
749            .entry(cache_key_for_flush)
750            .or_insert(0);
751        *dirty += event_count;
752        if *dirty >= INDEX_FLUSH_THRESHOLD || fsync {
753            let index = self
754                .index_cache
755                .get(&cache_key_for_flush)
756                .expect("index exists: just used above");
757            let flushed = *self
758                .index_flushed_count
759                .get(&cache_key_for_flush)
760                .unwrap_or(&0);
761            // Use WAL for incremental writes; compact when exceeding MAX_WAL_BYTES (AUDIT-2026-03 M-7)
762            index.save_incremental(&index_path, flushed, crate::index::MAX_WAL_BYTES)?;
763            // After save_incremental, all entries up to current len are on disk (main + WAL)
764            self.index_flushed_count
765                .insert(cache_key_for_flush, index.len());
766            *dirty = 0;
767        }
768
769        // Check if segment rotation is needed
770        if byte_position >= self.max_segment_size {
771            self.rotate_segment(stream_id, current_offset)?;
772        }
773
774        // Persist manifest after writes
775        let stream_dir = self.stream_dir(stream_id);
776        let manifest = self.manifests.get(&stream_id).expect("manifest loaded");
777        manifest.save(&stream_dir)?;
778
779        // Postcondition: we wrote exactly event_count records
780        debug_assert_eq!(
781            current_offset.as_u64() - expected_offset.as_u64(),
782            event_count as u64,
783            "offset mismatch after batch write"
784        );
785
786        // Property: offset must only advance forward (append-only invariant)
787        kimberlite_properties::always!(
788            current_offset.as_u64() >= expected_offset.as_u64(),
789            "storage.offset_advances_forward",
790            "offset must only advance forward after append_batch"
791        );
792
793        // Property: hash chain prev_hash links are valid after append
794        kimberlite_properties::always!(
795            current_hash.is_some(),
796            "storage.hash_chain_valid_after_append",
797            "hash chain must produce a valid hash after non-empty batch append"
798        );
799
800        Ok((current_offset, current_hash.expect("batch was non-empty")))
801    }
802
803    /// Rotates the active segment for a stream.
804    ///
805    /// Flushes the current segment's index and creates a new empty segment.
806    fn rotate_segment(
807        &mut self,
808        stream_id: StreamId,
809        next_offset: Offset,
810    ) -> Result<(), StorageError> {
811        let old_seg = self
812            .manifests
813            .get(&stream_id)
814            .expect("manifest loaded")
815            .active_segment;
816
817        // Flush the old segment's index
818        let old_key = (stream_id, old_seg);
819        if let Some(index) = self.index_cache.get(&old_key) {
820            let index_path = self.index_path_for(stream_id, old_seg);
821            index.save(&index_path)?;
822        }
823        self.index_dirty_count.insert(old_key, 0);
824
825        // Rotate to new segment
826        let manifest = self.manifests.get_mut(&stream_id).expect("manifest loaded");
827        let new_seg = manifest.rotate(next_offset.as_u64());
828
829        tracing::info!(
830            stream_id = %stream_id,
831            old_segment = old_seg,
832            new_segment = new_seg,
833            "rotated segment"
834        );
835
836        Ok(())
837    }
838
839    /// Reads events from a stream with checkpoint-optimized verification.
840    ///
841    /// Uses the nearest checkpoint as a verification anchor, reducing verification
842    /// cost from O(n) to O(k) where k is the distance to the nearest checkpoint.
843    /// Falls back to genesis verification if no checkpoints exist.
844    ///
845    /// Reads span across segment boundaries transparently.
846    pub fn read_from(
847        &mut self,
848        stream_id: StreamId,
849        from_offset: Offset,
850        max_bytes: u64,
851    ) -> Result<Vec<Bytes>, StorageError> {
852        let records = self.read_records_verified(stream_id, from_offset, max_bytes)?;
853        Ok(records.into_iter().map(|r| r.payload().clone()).collect())
854    }
855
856    /// Reads events from a stream with full genesis verification.
857    ///
858    /// Verifies the hash chain from genesis (offset 0) across all segments.
859    /// For most use cases, prefer [`Self::read_from`] which uses checkpoint-optimized
860    /// verification for better performance.
861    pub fn read_from_genesis(
862        &mut self,
863        stream_id: StreamId,
864        from_offset: Offset,
865        max_bytes: u64,
866    ) -> Result<Vec<Bytes>, StorageError> {
867        let records = self.read_records_from_genesis(stream_id, from_offset, max_bytes)?;
868        Ok(records.into_iter().map(|r| r.payload().clone()).collect())
869    }
870
871    /// Reads records from a stream with full genesis hash chain verification.
872    ///
873    /// Verifies the hash chain from genesis up to and including the requested
874    /// records, spanning all segments. This ensures tamper detection.
875    pub fn read_records_from_genesis(
876        &mut self,
877        stream_id: StreamId,
878        from_offset: Offset,
879        max_bytes: u64,
880    ) -> Result<Vec<Record>, StorageError> {
881        let segment_nums = self.segment_numbers(stream_id);
882
883        let mut results = Vec::new();
884        let mut bytes_read: u64 = 0;
885        let mut expected_prev_hash: Option<ChainHash> = None;
886        let mut records_verified: u64 = 0;
887
888        for seg_num in segment_nums {
889            let seg_path = self.segment_path_for(stream_id, seg_num);
890            if !seg_path.exists() {
891                continue;
892            }
893
894            let data = self.read_segment_data(stream_id, seg_num)?;
895            let mut pos = 0;
896
897            while pos < data.len() && bytes_read < max_bytes {
898                let (record, consumed) = Record::from_bytes(&data.slice(pos..))?;
899
900                // Decompress payload if needed
901                let record = self.decompress_record(record)?;
902
903                // Verify hash chain integrity
904                if record.prev_hash() != expected_prev_hash {
905                    return Err(StorageError::ChainVerificationFailed {
906                        offset: record.offset(),
907                        expected: expected_prev_hash,
908                        actual: record.prev_hash(),
909                    });
910                }
911
912                // Property: hash chain prev_hash links verified valid on read
913                kimberlite_properties::always!(
914                    record.prev_hash() == expected_prev_hash,
915                    "storage.hash_chain_valid_on_genesis_read",
916                    "prev_hash must match expected hash during genesis-verified read"
917                );
918
919                expected_prev_hash = Some(record.compute_hash());
920                records_verified += 1;
921                pos += consumed;
922
923                // Only collect records at or after the requested offset
924                if record.offset() < from_offset {
925                    continue;
926                }
927
928                bytes_read += record.payload().len() as u64;
929                results.push(record);
930            }
931
932            if bytes_read >= max_bytes {
933                break;
934            }
935        }
936
937        // Postcondition: we verified all records we read
938        debug_assert!(
939            records_verified == 0 || expected_prev_hash.is_some(),
940            "verified records but no final hash"
941        );
942
943        // Property: simulation should exercise storage read-after-write
944        kimberlite_properties::sometimes!(
945            !results.is_empty(),
946            "storage.read_after_write_exercised",
947            "simulation should exercise reading non-empty results from storage"
948        );
949
950        Ok(results)
951    }
952
953    // ========================================================================
954    // Pipelined Append
955    // ========================================================================
956
957    /// Appends a batch of events using the two-stage pipeline.
958    ///
959    /// Stage 1 (CPU): Serializes records, computes hash chain, compresses payloads
960    /// into a pre-allocated buffer. Stage 2 (I/O): Writes the buffer to disk.
961    ///
962    /// This method is functionally equivalent to [`Self::append_batch`] but uses
963    /// the pipeline's buffer management for better throughput when called repeatedly.
964    pub fn append_batch_pipelined(
965        &mut self,
966        stream_id: StreamId,
967        events: &[Bytes],
968        expected_offset: Offset,
969        prev_hash: Option<ChainHash>,
970        fsync: bool,
971        pipeline: &mut crate::AppendPipeline,
972    ) -> Result<(Offset, ChainHash), StorageError> {
973        assert!(!events.is_empty(), "cannot append empty batch");
974
975        let event_count = events.len();
976
977        // Ensure stream directory exists
978        let stream_dir = self.stream_dir(stream_id);
979        fs::create_dir_all(&stream_dir)?;
980
981        // Load or create manifest
982        let manifest = self.get_or_load_manifest(stream_id)?;
983        let active_seg = manifest.active_segment;
984
985        // Open segment file for appending
986        let segment_path = self.segment_path_for(stream_id, active_seg);
987        let mut file = OpenOptions::new()
988            .create(true)
989            .append(true)
990            .open(&segment_path)?;
991
992        let base_byte_pos: u64 = file.metadata()?.len();
993
994        // Stage 1 (CPU): Prepare the batch
995        let batch = pipeline.prepare_batch(
996            events,
997            expected_offset,
998            prev_hash,
999            base_byte_pos,
1000            self.default_compression,
1001            &self.codec_registry,
1002        )?;
1003
1004        // Stage 2 (I/O): Write to disk
1005        file.write_all(&batch.data)?;
1006
1007        if fsync {
1008            file.sync_all()?;
1009        }
1010
1011        // Update index cache
1012        let index_path = self.index_path_for(stream_id, active_seg);
1013        let cache_key = (stream_id, active_seg);
1014        self.ensure_index_cached(stream_id, active_seg)?;
1015        let index = self
1016            .index_cache
1017            .get_mut(&cache_key)
1018            .expect("index exists: just ensured");
1019
1020        for &(_offset, byte_pos) in &batch.index_entries {
1021            index.append(byte_pos);
1022        }
1023
1024        let new_byte_pos = base_byte_pos + batch.bytes_written;
1025        let new_offset = expected_offset + Offset::from(event_count as u64);
1026
1027        // Update manifest
1028        let manifest = self.manifests.get_mut(&stream_id).expect("manifest loaded");
1029        let active_meta = manifest.active_mut();
1030        active_meta.size_bytes = new_byte_pos;
1031        active_meta.next_offset = new_offset.as_u64();
1032
1033        // Flush index
1034        let dirty = self.index_dirty_count.entry(cache_key).or_insert(0);
1035        *dirty += event_count;
1036        if *dirty >= INDEX_FLUSH_THRESHOLD || fsync {
1037            let index = self.index_cache.get(&cache_key).expect("index exists");
1038            let flushed = *self.index_flushed_count.get(&cache_key).unwrap_or(&0);
1039            index.save_incremental(&index_path, flushed, 1000)?;
1040            self.index_flushed_count.insert(cache_key, index.len());
1041            *dirty = 0;
1042        }
1043
1044        // Check segment rotation
1045        if new_byte_pos >= self.max_segment_size {
1046            self.rotate_segment(stream_id, new_offset)?;
1047        }
1048
1049        // Persist manifest
1050        let stream_dir = self.stream_dir(stream_id);
1051        let manifest = self.manifests.get(&stream_id).expect("manifest loaded");
1052        manifest.save(&stream_dir)?;
1053
1054        debug_assert_eq!(
1055            new_offset.as_u64() - expected_offset.as_u64(),
1056            event_count as u64,
1057            "offset mismatch after pipelined batch write"
1058        );
1059
1060        Ok((new_offset, batch.final_hash))
1061    }
1062
1063    // ========================================================================
1064    // Compression Support
1065    // ========================================================================
1066
1067    /// Decompresses a record's payload if it was stored with compression.
1068    ///
1069    /// Returns a new record with the decompressed payload and `CompressionKind::None`.
1070    /// Records with `CompressionKind::None` are returned unchanged.
1071    fn decompress_record(&self, record: Record) -> Result<Record, StorageError> {
1072        if record.compression() == CompressionKind::None {
1073            return Ok(record);
1074        }
1075
1076        let decompressed = self
1077            .codec_registry
1078            .decompress(record.compression(), record.payload())?;
1079
1080        Ok(Record::with_compression(
1081            record.offset(),
1082            record.prev_hash(),
1083            record.kind(),
1084            CompressionKind::None,
1085            Bytes::from(decompressed),
1086        ))
1087    }
1088
1089    // ========================================================================
1090    // Checkpoint Support
1091    // ========================================================================
1092
1093    /// Rebuilds the checkpoint index by scanning all segments for checkpoint records.
1094    pub fn rebuild_checkpoint_index(
1095        &mut self,
1096        stream_id: StreamId,
1097    ) -> Result<CheckpointIndex, StorageError> {
1098        let segment_nums = self.segment_numbers(stream_id);
1099        let mut checkpoint_index = CheckpointIndex::new();
1100
1101        for seg_num in segment_nums {
1102            let seg_path = self.segment_path_for(stream_id, seg_num);
1103            if !seg_path.exists() {
1104                continue;
1105            }
1106
1107            let data = self.read_segment_data(stream_id, seg_num)?;
1108            let mut pos = 0;
1109
1110            while pos < data.len() {
1111                let (record, consumed) = Record::from_bytes(&data.slice(pos..))?;
1112
1113                if record.is_checkpoint() {
1114                    checkpoint_index.add(record.offset());
1115                }
1116
1117                pos += consumed;
1118            }
1119        }
1120
1121        tracing::debug!(
1122            stream_id = %stream_id,
1123            checkpoint_count = checkpoint_index.len(),
1124            "rebuilt checkpoint index"
1125        );
1126
1127        Ok(checkpoint_index)
1128    }
1129
1130    /// Gets the checkpoint index for a stream, rebuilding if necessary.
1131    fn get_or_rebuild_checkpoint_index(
1132        &mut self,
1133        stream_id: StreamId,
1134    ) -> Result<&CheckpointIndex, StorageError> {
1135        if !self.checkpoint_cache.contains_key(&stream_id) {
1136            let index = self.rebuild_checkpoint_index(stream_id)?;
1137            self.checkpoint_cache.insert(stream_id, index);
1138        }
1139        Ok(self
1140            .checkpoint_cache
1141            .get(&stream_id)
1142            .expect("just inserted"))
1143    }
1144
1145    /// Creates a checkpoint at the current position in the active segment.
1146    pub fn create_checkpoint(
1147        &mut self,
1148        stream_id: StreamId,
1149        current_offset: Offset,
1150        prev_hash: Option<ChainHash>,
1151        record_count: u64,
1152        fsync: bool,
1153    ) -> Result<(Offset, ChainHash), StorageError> {
1154        let chain_hash = prev_hash.unwrap_or_else(|| ChainHash::from_bytes(&[0u8; 32]));
1155
1156        let payload = serialize_checkpoint_payload(&chain_hash, record_count);
1157
1158        let record = Record::with_kind(current_offset, prev_hash, RecordKind::Checkpoint, payload);
1159        let record_bytes = record.to_bytes();
1160        let record_hash = record.compute_hash();
1161
1162        // Ensure stream directory exists and manifest loaded
1163        let stream_dir = self.stream_dir(stream_id);
1164        fs::create_dir_all(&stream_dir)?;
1165        let manifest = self.get_or_load_manifest(stream_id)?;
1166        let active_seg = manifest.active_segment;
1167
1168        // Open active segment file for appending
1169        let segment_path = self.segment_path_for(stream_id, active_seg);
1170        let mut file = OpenOptions::new()
1171            .create(true)
1172            .append(true)
1173            .open(&segment_path)?;
1174
1175        let byte_position = file.metadata()?.len();
1176
1177        file.write_all(&record_bytes)?;
1178
1179        if fsync {
1180            file.sync_all()?;
1181        }
1182
1183        // Update offset index
1184        let cache_key = (stream_id, active_seg);
1185        let index_path = self.index_path_for(stream_id, active_seg);
1186        self.ensure_index_cached(stream_id, active_seg)?;
1187        let index = self.index_cache.get_mut(&cache_key).expect("just loaded");
1188        index.append(byte_position);
1189        // Checkpoints are safety boundaries — always flush full index + compact WAL
1190        index.save(&index_path)?;
1191        self.index_dirty_count.insert(cache_key, 0);
1192        self.index_flushed_count.insert(cache_key, index.len());
1193        // Remove WAL after compaction
1194        let wal_path = {
1195            let mut p = index_path.as_os_str().to_owned();
1196            p.push(".wal");
1197            std::path::PathBuf::from(p)
1198        };
1199        let _ = fs::remove_file(wal_path);
1200
1201        // Update manifest
1202        let new_size = byte_position + record_bytes.len() as u64;
1203        let manifest = self.manifests.get_mut(&stream_id).expect("manifest loaded");
1204        let active_meta = manifest.active_mut();
1205        active_meta.size_bytes = new_size;
1206        active_meta.next_offset = current_offset.as_u64() + 1;
1207        manifest.save(&stream_dir)?;
1208
1209        // Update checkpoint index
1210        if let Some(cp_index) = self.checkpoint_cache.get_mut(&stream_id) {
1211            cp_index.add(current_offset);
1212        }
1213
1214        tracing::info!(
1215            stream_id = %stream_id,
1216            offset = %current_offset,
1217            record_count = record_count,
1218            "created checkpoint"
1219        );
1220
1221        let next_offset = current_offset + Offset::from(1u64);
1222        Ok((next_offset, record_hash))
1223    }
1224
1225    /// Reads records with checkpoint-optimized verification.
1226    ///
1227    /// Instead of verifying from genesis, this method verifies from the nearest
1228    /// checkpoint before `from_offset`. Reads span across segment boundaries.
1229    pub fn read_records_verified(
1230        &mut self,
1231        stream_id: StreamId,
1232        from_offset: Offset,
1233        max_bytes: u64,
1234    ) -> Result<Vec<Record>, StorageError> {
1235        // Load manifest to know about segments
1236        let _ = self.get_or_load_manifest(stream_id);
1237        let segment_nums = self.segment_numbers(stream_id);
1238
1239        if segment_nums.is_empty() {
1240            return Ok(Vec::new());
1241        }
1242
1243        // Check if the first segment even exists
1244        let first_seg_path = self.segment_path_for(stream_id, segment_nums[0]);
1245        if !first_seg_path.exists() {
1246            return Ok(Vec::new());
1247        }
1248
1249        // Find nearest checkpoint
1250        let checkpoint_index = self.get_or_rebuild_checkpoint_index(stream_id)?;
1251        let verification_start = checkpoint_index.find_nearest(from_offset);
1252
1253        // Determine which segment to start verification from
1254        let (start_seg_num, start_pos, mut expected_prev_hash) = match verification_start {
1255            Some(cp_offset) => {
1256                // Find which segment contains this checkpoint
1257                let manifest = self.manifests.get(&stream_id);
1258                let seg_num = manifest
1259                    .and_then(|m| {
1260                        m.find_segment_for_offset(cp_offset.as_u64())
1261                            .map(|s| s.segment_num)
1262                    })
1263                    .unwrap_or(0);
1264
1265                // Load the index for that segment to get byte position
1266                self.ensure_index_cached(stream_id, seg_num)?;
1267                let offset_index = self
1268                    .index_cache
1269                    .get(&(stream_id, seg_num))
1270                    .expect("just ensured");
1271
1272                // The checkpoint offset within this segment
1273                let first_offset_in_seg = self
1274                    .manifests
1275                    .get(&stream_id)
1276                    .and_then(|m| {
1277                        m.find_segment_for_offset(cp_offset.as_u64())
1278                            .map(|s| s.first_offset)
1279                    })
1280                    .unwrap_or(0);
1281                let local_offset = Offset::new(cp_offset.as_u64() - first_offset_in_seg);
1282
1283                let byte_pos = offset_index
1284                    .lookup(local_offset)
1285                    .ok_or(StorageError::UnexpectedEof)?;
1286
1287                // Read checkpoint record to get its chain_hash
1288                let data = self.read_segment_data(stream_id, seg_num)?;
1289                let (cp_record, _) = Record::from_bytes(&data.slice(byte_pos as usize..))?;
1290                debug_assert!(cp_record.is_checkpoint());
1291
1292                let (chain_hash, _) =
1293                    deserialize_checkpoint_payload(cp_record.payload(), cp_offset)?;
1294
1295                (seg_num, byte_pos as usize, Some(chain_hash))
1296            }
1297            None => (segment_nums[0], 0, None),
1298        };
1299
1300        let mut results = Vec::new();
1301        let mut bytes_read: u64 = 0;
1302        let mut started = false;
1303
1304        for &seg_num in &segment_nums {
1305            if seg_num < start_seg_num {
1306                continue;
1307            }
1308
1309            let seg_path = self.segment_path_for(stream_id, seg_num);
1310            if !seg_path.exists() {
1311                continue;
1312            }
1313
1314            let data = self.read_segment_data(stream_id, seg_num)?;
1315            let mut pos = if seg_num == start_seg_num && !started {
1316                started = true;
1317                start_pos
1318            } else {
1319                0
1320            };
1321
1322            while pos < data.len() && bytes_read < max_bytes {
1323                let (record, consumed) = Record::from_bytes(&data.slice(pos..))?;
1324
1325                // Decompress payload if needed
1326                let record = self.decompress_record(record)?;
1327
1328                // Property: verified reads must never encounter chain breaks
1329                kimberlite_properties::never!(
1330                    record.prev_hash() != expected_prev_hash,
1331                    "storage.verified_read_chain_break",
1332                    "verified read must never encounter a hash chain break in non-corrupted storage"
1333                );
1334
1335                if record.prev_hash() != expected_prev_hash {
1336                    return Err(StorageError::ChainVerificationFailed {
1337                        offset: record.offset(),
1338                        expected: expected_prev_hash,
1339                        actual: record.prev_hash(),
1340                    });
1341                }
1342
1343                expected_prev_hash = Some(record.compute_hash());
1344                pos += consumed;
1345
1346                if record.offset() >= from_offset && !record.is_checkpoint() {
1347                    bytes_read += record.payload().len() as u64;
1348                    results.push(record);
1349                }
1350            }
1351
1352            if bytes_read >= max_bytes {
1353                break;
1354            }
1355        }
1356
1357        Ok(results)
1358    }
1359
1360    /// Returns the last checkpoint for a stream, if any.
1361    pub fn last_checkpoint(&mut self, stream_id: StreamId) -> Result<Option<Offset>, StorageError> {
1362        let index = self.get_or_rebuild_checkpoint_index(stream_id)?;
1363        Ok(index.last())
1364    }
1365
1366    /// Returns the chain hash of the last appended record for a stream.
1367    ///
1368    /// Used to recover the in-memory `chain_heads` map on process restart:
1369    /// without this, the first post-restart append would write
1370    /// `prev_hash = None` into a stream whose on-disk tail is non-null,
1371    /// producing a permanent chain break that only surfaces on a later
1372    /// verified read.
1373    ///
1374    /// Returns `Ok(None)` for streams with no records (never written to,
1375    /// or written then pruned). Returns `Ok(Some(hash))` for streams
1376    /// whose active segment contains at least one record.
1377    ///
1378    /// Scans the active segment from its start (or the nearest checkpoint
1379    /// when one exists) — linear in segment size, bounded by the configured
1380    /// max segment size. Only called on startup or on first append to a
1381    /// stream after process restart, so the cost amortises to near zero.
1382    pub fn latest_chain_hash(
1383        &mut self,
1384        stream_id: StreamId,
1385    ) -> Result<Option<ChainHash>, StorageError> {
1386        let segment_nums = self.segment_numbers(stream_id);
1387        if segment_nums.is_empty() {
1388            return Ok(None);
1389        }
1390        let last_seg_num = *segment_nums.last().expect("segment_nums non-empty");
1391        let seg_path = self.segment_path_for(stream_id, last_seg_num);
1392        if !seg_path.exists() {
1393            return Ok(None);
1394        }
1395
1396        let data = self.read_segment_data(stream_id, last_seg_num)?;
1397        if data.is_empty() {
1398            return Ok(None);
1399        }
1400
1401        let mut pos = 0usize;
1402        let mut last_hash: Option<ChainHash> = None;
1403        while pos < data.len() {
1404            let (record, consumed) = Record::from_bytes(&data.slice(pos..))?;
1405            let record = self.decompress_record(record)?;
1406            last_hash = Some(record.compute_hash());
1407            pos += consumed;
1408        }
1409
1410        Ok(last_hash)
1411    }
1412
1413    /// Returns information about all segments for a stream.
1414    pub fn segment_count(&self, stream_id: StreamId) -> usize {
1415        self.manifests
1416            .get(&stream_id)
1417            .map_or(0, |m| m.segments.len())
1418    }
1419
1420    /// Returns the list of completed (immutable) segment numbers for a stream.
1421    pub fn completed_segments(&self, stream_id: StreamId) -> Vec<u32> {
1422        self.manifests.get(&stream_id).map_or_else(Vec::new, |m| {
1423            m.segments
1424                .iter()
1425                .filter(|s| s.segment_num != m.active_segment)
1426                .map(|s| s.segment_num)
1427                .collect()
1428        })
1429    }
1430
1431    /// Flushes all dirty indexes to disk.
1432    ///
1433    /// Call this on shutdown or before operations that require index durability.
1434    /// This is also called automatically from `Drop` to prevent stale indexes.
1435    pub fn flush_indexes(&mut self) -> Result<(), StorageError> {
1436        let dirty_keys: Vec<(StreamId, u32)> = self
1437            .index_dirty_count
1438            .iter()
1439            .filter(|(_, count)| **count > 0)
1440            .map(|(&key, _)| key)
1441            .collect();
1442
1443        let mut first_error: Option<StorageError> = None;
1444
1445        for (stream_id, segment_num) in dirty_keys {
1446            if let Some(index) = self.index_cache.get(&(stream_id, segment_num)) {
1447                let index_path = self.index_path_for(stream_id, segment_num);
1448                // Full save compacts the WAL into the main index
1449                if let Err(e) = index.save(&index_path) {
1450                    tracing::error!(
1451                        stream_id = %stream_id,
1452                        segment_num = segment_num,
1453                        error = %e,
1454                        "failed to flush index on shutdown"
1455                    );
1456                    if first_error.is_none() {
1457                        first_error = Some(e);
1458                    }
1459                } else {
1460                    self.index_dirty_count.insert((stream_id, segment_num), 0);
1461                    self.index_flushed_count
1462                        .insert((stream_id, segment_num), index.len());
1463                    // Remove WAL file after successful compaction
1464                    let wal_path = {
1465                        let mut p = index_path.as_os_str().to_owned();
1466                        p.push(".wal");
1467                        std::path::PathBuf::from(p)
1468                    };
1469                    let _ = fs::remove_file(wal_path);
1470                }
1471            }
1472        }
1473
1474        match first_error {
1475            Some(e) => Err(e),
1476            None => Ok(()),
1477        }
1478    }
1479}
1480
1481impl Drop for Storage {
1482    fn drop(&mut self) {
1483        if let Err(e) = self.flush_indexes() {
1484            tracing::error!(error = %e, "failed to flush indexes during Storage drop");
1485        }
1486    }
1487}
1488
1489// ============================================================================
1490// StorageBackend trait impl
1491// ============================================================================
1492
1493impl crate::backend::StorageBackend for Storage {
1494    fn append_batch(
1495        &mut self,
1496        stream_id: StreamId,
1497        events: Vec<Bytes>,
1498        expected_offset: Offset,
1499        prev_hash: Option<ChainHash>,
1500        fsync: bool,
1501    ) -> Result<(Offset, ChainHash), StorageError> {
1502        Storage::append_batch(self, stream_id, events, expected_offset, prev_hash, fsync)
1503    }
1504
1505    fn read_from(
1506        &mut self,
1507        stream_id: StreamId,
1508        from_offset: Offset,
1509        max_bytes: u64,
1510    ) -> Result<Vec<Bytes>, StorageError> {
1511        Storage::read_from(self, stream_id, from_offset, max_bytes)
1512    }
1513
1514    fn latest_chain_hash(
1515        &mut self,
1516        stream_id: StreamId,
1517    ) -> Result<Option<ChainHash>, StorageError> {
1518        Storage::latest_chain_hash(self, stream_id)
1519    }
1520
1521    fn segment_count(&self, stream_id: StreamId) -> usize {
1522        Storage::segment_count(self, stream_id)
1523    }
1524
1525    fn completed_segments(&self, stream_id: StreamId) -> Vec<u32> {
1526        Storage::completed_segments(self, stream_id)
1527    }
1528
1529    fn flush_indexes(&mut self) -> Result<(), StorageError> {
1530        Storage::flush_indexes(self)
1531    }
1532
1533    #[cfg(feature = "fuzz-reset")]
1534    fn reset(&mut self) -> Result<(), StorageError> {
1535        Storage::reset(self)
1536    }
1537}