Skip to main content

kimberlite_storage/
memory.rs

1//! Pure in-memory implementation of [`StorageBackend`].
2//!
3//! Mirrors enough of the on-disk [`crate::Storage`] semantics for tests
4//! and ephemeral worker processes to run the full `Kimberlite` stack
5//! without touching disk. Explicitly NOT a fault-injection backend —
6//! see `kimberlite-sim::SimStorage` for that.
7//!
8//! # What is preserved
9//!
//! - **Hash-chain determinism**: `append_batch` computes per-record
//!   hashes exactly like [`crate::Storage`], so a byte-for-byte identical
//!   sequence of appends produces the same terminal `ChainHash`. The
//!   parity test in `crate::tests` locks this in.
14//! - **Offset monotonicity**: offsets advance by `events.len()` per
15//!   batch, matching the on-disk impl.
//! - **Notional segment rotation**: after each batch, if the active
//!   segment's accumulated size has reached `max_segment_size`, a new
//!   logical segment is opened. The data isn't actually partitioned — the
//!   segment boundaries are just bookkeeping that lets `segment_count` and
//!   `completed_segments` return the same counts as the on-disk impl
//!   would on the equivalent workload.
22//! - **Record format**: events are stored as encoded [`Record`] bytes
23//!   so that read paths produce the same visible payload sequence as
24//!   the on-disk impl. A pure `Vec<Bytes>` would have been cheaper
25//!   but would drift from the on-disk format, making parity tests
26//!   fragile.
27//!
28//! # What is dropped
29//!
30//! - No fsync. `fsync` argument is silently ignored.
31//! - No durable state across process restarts — reopening
32//!   `MemoryStorage::new()` always starts empty. `latest_chain_hash`
33//!   on a stream you never appended to returns `Ok(None)`.
34//! - No mmap, no segment files, no manifest.json.
35//! - No compression. All records are stored with `CompressionKind::None`.
36//!   (The on-disk impl only compresses when it actually reduces size,
37//!   so opting out entirely is a semantics-preserving simplification.)
38//! - No checkpoint records. `read_from` never synthesises checkpoints,
39//!   so the `read_records_verified` code path used by `Storage`
40//!   naturally collapses to a linear walk. That's fine — `MemoryStorage`
41//!   is optimised for latency, not for minimising verification cost.
42//!
43//! # Assertions
44//!
45//! Production assertions documented per `docs/internals/testing/assertions-inventory.md`:
46//! - `append_batch` asserts `!events.is_empty()` (matches `Storage`).
47//! - `append_batch` asserts offset monotonicity post-write.
48//!
49//! Each has a paired `#[should_panic]` test in the crate tests module.
50
51use std::collections::HashMap;
52
53use bytes::Bytes;
54use kimberlite_crypto::ChainHash;
55use kimberlite_types::{CompressionKind, Offset, RecordKind, StreamId};
56
57use crate::backend::StorageBackend;
58use crate::error::StorageError;
59use crate::record::Record;
60
/// Notional segment size used when none is configured: 256 MiB.
///
/// Deliberately a private copy of the `Storage` default rather than a
/// shared constant, so the on-disk backend can change its default without
/// silently altering this backend's rotation behaviour (and its tests).
const DEFAULT_MAX_SEGMENT_SIZE: u64 = 256 << 20;
67
/// Bookkeeping for a single notional segment.
///
/// Records are not actually partitioned: all of a stream's encoded records
/// live in the single `StreamState::bytes` buffer. A segment only stores
/// its number, the logical offsets it covers, and how many of that
/// buffer's bytes are attributed to it. That is enough for
/// `segment_count` / `completed_segments` to return on-disk-equivalent
/// numbers.
#[derive(Debug, Clone, Copy)]
// `first_offset` / `next_offset` are written but never read here; they are
// kept so the struct mirrors the on-disk manifest layout.
#[allow(dead_code)]
struct SegmentMeta {
    /// Segment number (0-based).
    segment_num: u32,
    /// First logical offset in this segment.
    first_offset: u64,
    /// One past the last logical offset in this segment.
    next_offset: u64,
    /// Number of encoded record bytes attributed to this segment.
    size_bytes: u64,
}
86
87/// Per-stream state: all records serialized into a flat buffer plus a
88/// segment manifest that tracks notional rotation boundaries.
89///
90/// The `Default` impl is intentionally absent — a brand-new stream
91/// needs a one-element manifest (segment 0), which `Default::default()`
92/// would produce an empty `Vec` for. Use [`StreamState::new`] instead.
93#[derive(Debug)]
94struct StreamState {
95    /// All record bytes for this stream, in append order.
96    ///
97    /// Shared across notional segments — the segment boundary is
98    /// purely logical. `SegmentMeta::size_bytes` sums to `bytes.len()`.
99    bytes: Vec<u8>,
100    /// Per-record starting byte offsets within `bytes`. Index == offset.
101    record_starts: Vec<u64>,
102    /// Notional segment manifest (ordered by `segment_num`).
103    segments: Vec<SegmentMeta>,
104    /// Active (writable) segment number.
105    active_segment: u32,
106}
107
108impl StreamState {
109    fn new() -> Self {
110        Self {
111            bytes: Vec::new(),
112            record_starts: Vec::new(),
113            segments: vec![SegmentMeta {
114                segment_num: 0,
115                first_offset: 0,
116                next_offset: 0,
117                size_bytes: 0,
118            }],
119            active_segment: 0,
120        }
121    }
122
123    fn active_mut(&mut self) -> &mut SegmentMeta {
124        let active = self.active_segment;
125        self.segments
126            .iter_mut()
127            .find(|s| s.segment_num == active)
128            .expect("active segment must exist in manifest")
129    }
130
131    /// Rotates to a new notional segment. Mirrors `SegmentManifest::rotate`
132    /// in the on-disk impl.
133    fn rotate(&mut self, first_offset: u64) {
134        let new_num = self.active_segment + 1;
135        self.segments.push(SegmentMeta {
136            segment_num: new_num,
137            first_offset,
138            next_offset: first_offset,
139            size_bytes: 0,
140        });
141        self.active_segment = new_num;
142    }
143}
144
145/// Pure in-memory storage backend.
146///
147/// See module docs for the detailed semantic contract. Safe to share
148/// across threads as `Send + Sync`; internal state is behind `&mut self`
149/// so callers wrap it in their own synchronisation (as
150/// `KimberliteInner` does via its outer `RwLock`).
151#[derive(Debug)]
152pub struct MemoryStorage {
153    streams: HashMap<StreamId, StreamState>,
154    max_segment_size: u64,
155}
156
157impl MemoryStorage {
158    /// Creates a new empty in-memory storage with default segment size.
159    pub fn new() -> Self {
160        Self {
161            streams: HashMap::new(),
162            max_segment_size: DEFAULT_MAX_SEGMENT_SIZE,
163        }
164    }
165
166    /// Creates a new empty in-memory storage with a custom notional
167    /// segment size. Intended for tests that want to exercise the
168    /// rotation bookkeeping without writing 256 MB of data.
169    pub fn with_max_segment_size(max_segment_size: u64) -> Self {
170        Self {
171            streams: HashMap::new(),
172            max_segment_size,
173        }
174    }
175
176    /// Returns the configured notional segment size in bytes.
177    pub fn max_segment_size(&self) -> u64 {
178        self.max_segment_size
179    }
180}
181
182impl Default for MemoryStorage {
183    fn default() -> Self {
184        Self::new()
185    }
186}
187
188impl StorageBackend for MemoryStorage {
189    fn append_batch(
190        &mut self,
191        stream_id: StreamId,
192        events: Vec<Bytes>,
193        expected_offset: Offset,
194        prev_hash: Option<ChainHash>,
195        _fsync: bool,
196    ) -> Result<(Offset, ChainHash), StorageError> {
197        // Precondition: batch must not be empty. Matches `Storage`.
198        assert!(!events.is_empty(), "cannot append empty batch");
199
200        // `or_insert_with` — StreamState deliberately does not impl
201        // Default (a default Vec-of-segments is empty, which would
202        // break the `active_mut()` invariant).
203        let stream = self
204            .streams
205            .entry(stream_id)
206            .or_insert_with(StreamState::new);
207
208        let mut current_offset = expected_offset;
209        let mut current_hash = prev_hash;
210
211        for event in events {
212            // Pure `Record` with no compression — hash is computed over
213            // the original payload, matching `Storage::append_batch`.
214            let hash_record = Record::new(current_offset, current_hash, event.clone());
215            let record_hash = hash_record.compute_hash();
216
217            let stored_record = Record::with_compression(
218                current_offset,
219                current_hash,
220                RecordKind::Data,
221                CompressionKind::None,
222                event,
223            );
224            let bytes = stored_record.to_bytes();
225
226            stream.record_starts.push(stream.bytes.len() as u64);
227            stream.bytes.extend_from_slice(&bytes);
228
229            {
230                let active = stream.active_mut();
231                active.next_offset = current_offset.as_u64() + 1;
232                active.size_bytes += bytes.len() as u64;
233            }
234
235            current_hash = Some(record_hash);
236            current_offset += Offset::from(1u64);
237        }
238
239        // Rotate if the active segment exceeded the notional limit.
240        // Matches the on-disk impl's post-batch rotation check.
241        let should_rotate = stream.active_mut().size_bytes >= self.max_segment_size;
242        if should_rotate {
243            stream.rotate(current_offset.as_u64());
244        }
245
246        // Postcondition: offset must only advance forward.
247        assert!(
248            current_offset.as_u64() >= expected_offset.as_u64(),
249            "offset must only advance forward after append_batch"
250        );
251
252        Ok((
253            current_offset,
254            current_hash.expect("batch was non-empty, hash must be set"),
255        ))
256    }
257
258    fn read_from(
259        &mut self,
260        stream_id: StreamId,
261        from_offset: Offset,
262        max_bytes: u64,
263    ) -> Result<Vec<Bytes>, StorageError> {
264        let Some(stream) = self.streams.get(&stream_id) else {
265            return Ok(Vec::new());
266        };
267
268        let mut results = Vec::new();
269        let mut bytes_read: u64 = 0;
270        let mut expected_prev_hash: Option<ChainHash> = None;
271
272        // Linear walk through the record buffer, verifying the full
273        // hash chain (genesis-to-from_offset). Equivalent to the
274        // on-disk `read_records_from_genesis` code path modulo
275        // checkpoint skipping, which `MemoryStorage` doesn't emit.
276        let buf = Bytes::copy_from_slice(&stream.bytes);
277        let mut pos = 0usize;
278        while pos < buf.len() && bytes_read < max_bytes {
279            let (record, consumed) = Record::from_bytes(&buf.slice(pos..))?;
280
281            if record.prev_hash() != expected_prev_hash {
282                return Err(StorageError::ChainVerificationFailed {
283                    offset: record.offset(),
284                    expected: expected_prev_hash,
285                    actual: record.prev_hash(),
286                });
287            }
288
289            expected_prev_hash = Some(record.compute_hash());
290            pos += consumed;
291
292            // `MemoryStorage` never emits checkpoint records, but the
293            // skip is cheap and keeps us honest if the record
294            // serialiser ever changes.
295            if record.offset() < from_offset || record.is_checkpoint() {
296                continue;
297            }
298
299            bytes_read += record.payload().len() as u64;
300            results.push(record.payload().clone());
301        }
302
303        Ok(results)
304    }
305
306    fn latest_chain_hash(
307        &mut self,
308        stream_id: StreamId,
309    ) -> Result<Option<ChainHash>, StorageError> {
310        let Some(stream) = self.streams.get(&stream_id) else {
311            return Ok(None);
312        };
313
314        if stream.bytes.is_empty() {
315            return Ok(None);
316        }
317
318        let buf = Bytes::copy_from_slice(&stream.bytes);
319        let mut pos = 0usize;
320        let mut last_hash: Option<ChainHash> = None;
321        while pos < buf.len() {
322            let (record, consumed) = Record::from_bytes(&buf.slice(pos..))?;
323            last_hash = Some(record.compute_hash());
324            pos += consumed;
325        }
326        Ok(last_hash)
327    }
328
329    fn segment_count(&self, stream_id: StreamId) -> usize {
330        self.streams.get(&stream_id).map_or(0, |s| s.segments.len())
331    }
332
333    fn completed_segments(&self, stream_id: StreamId) -> Vec<u32> {
334        self.streams.get(&stream_id).map_or_else(Vec::new, |s| {
335            s.segments
336                .iter()
337                .filter(|seg| seg.segment_num != s.active_segment)
338                .map(|seg| seg.segment_num)
339                .collect()
340        })
341    }
342
343    fn flush_indexes(&mut self) -> Result<(), StorageError> {
344        // No-op. In-memory state is always "flushed" in the sense that
345        // a subsequent `read_from` will observe it. No index file to
346        // compact.
347        Ok(())
348    }
349
350    #[cfg(feature = "fuzz-reset")]
351    fn reset(&mut self) -> Result<(), StorageError> {
352        self.streams.clear();
353        Ok(())
354    }
355}