kimberlite_storage/memory.rs
1//! Pure in-memory implementation of [`StorageBackend`].
2//!
3//! Mirrors enough of the on-disk [`crate::Storage`] semantics for tests
4//! and ephemeral worker processes to run the full `Kimberlite` stack
5//! without touching disk. Explicitly NOT a fault-injection backend —
6//! see `kimberlite-sim::SimStorage` for that.
7//!
8//! # What is preserved
9//!
//! - **Hash-chain determinism**: `append_batch` computes per-record
//!   hashes exactly like [`crate::Storage`]. An identical sequence of
//!   appends (same payload bytes, offsets and `prev_hash` inputs)
//!   produces the same terminal `ChainHash`. The parity test in
//!   `crate::tests` locks this in.
14//! - **Offset monotonicity**: offsets advance by `events.len()` per
15//! batch, matching the on-disk impl.
//! - **Notional segment rotation**: after each batch, if the active
//!   segment's accumulated size has reached `max_segment_size`, a new
//!   logical segment is opened (so a segment may end up larger than the
//!   limit by up to one batch). The data isn't actually partitioned — the
//!   segment boundaries are just bookkeeping that lets `segment_count` and
//!   `completed_segments` return the same counts as the on-disk impl
//!   would on the equivalent workload.
22//! - **Record format**: events are stored as encoded [`Record`] bytes
23//! so that read paths produce the same visible payload sequence as
24//! the on-disk impl. A pure `Vec<Bytes>` would have been cheaper
25//! but would drift from the on-disk format, making parity tests
26//! fragile.
27//!
28//! # What is dropped
29//!
30//! - No fsync. `fsync` argument is silently ignored.
31//! - No durable state across process restarts — reopening
32//! `MemoryStorage::new()` always starts empty. `latest_chain_hash`
33//! on a stream you never appended to returns `Ok(None)`.
34//! - No mmap, no segment files, no manifest.json.
35//! - No compression. All records are stored with `CompressionKind::None`.
36//! (The on-disk impl only compresses when it actually reduces size,
37//! so opting out entirely is a semantics-preserving simplification.)
//! - No checkpoint records. `append_batch` never writes checkpoints, so
//!   where `Storage` uses the checkpoint-aware `read_records_verified`
//!   path, `read_from` here simply verifies the chain with a linear walk
//!   from the stream's first record on every call. That's fine —
//!   `MemoryStorage` is optimised for latency (no I/O), not for
//!   minimising verification cost.
42//!
43//! # Assertions
44//!
45//! Production assertions documented per `docs/internals/testing/assertions-inventory.md`:
46//! - `append_batch` asserts `!events.is_empty()` (matches `Storage`).
47//! - `append_batch` asserts offset monotonicity post-write.
48//!
49//! Each has a paired `#[should_panic]` test in the crate tests module.
50
51use std::collections::HashMap;
52
53use bytes::Bytes;
54use kimberlite_crypto::ChainHash;
55use kimberlite_types::{CompressionKind, Offset, RecordKind, StreamId};
56
57use crate::backend::StorageBackend;
58use crate::error::StorageError;
59use crate::record::Record;
60
/// Default notional segment size: 256 MiB (256 × 1024 × 1024 bytes).
///
/// Same value as the on-disk `Storage` default, but deliberately a
/// separate private constant rather than a re-export: if the on-disk
/// backend ever changes its default, the memory backend (and its tests)
/// keep their current behaviour until someone updates this on purpose.
const DEFAULT_MAX_SEGMENT_SIZE: u64 = 256 << 20;
67
/// Bookkeeping entry for one notional segment.
///
/// No data is actually split between segments: every record lives in the
/// stream's single shared buffer (`StreamState::bytes`). A segment only
/// remembers its logical offset range and its size in bytes, which is all
/// `segment_count` / `completed_segments` need to report numbers that
/// match the on-disk implementation.
#[derive(Debug, Clone, Copy)]
// `first_offset` / `next_offset` are written but never read in this
// module; they are kept so the layout mirrors the on-disk manifest.
#[allow(dead_code)]
struct SegmentMeta {
    /// 0-based segment number.
    segment_num: u32,
    /// Logical offset of the first record in this segment.
    first_offset: u64,
    /// Logical offset one past the last record in this segment.
    next_offset: u64,
    /// Total encoded size of this segment's records, in bytes.
    size_bytes: u64,
}
86
87/// Per-stream state: all records serialized into a flat buffer plus a
88/// segment manifest that tracks notional rotation boundaries.
89///
90/// The `Default` impl is intentionally absent — a brand-new stream
91/// needs a one-element manifest (segment 0), which `Default::default()`
92/// would produce an empty `Vec` for. Use [`StreamState::new`] instead.
93#[derive(Debug)]
94struct StreamState {
95 /// All record bytes for this stream, in append order.
96 ///
97 /// Shared across notional segments — the segment boundary is
98 /// purely logical. `SegmentMeta::size_bytes` sums to `bytes.len()`.
99 bytes: Vec<u8>,
100 /// Per-record starting byte offsets within `bytes`. Index == offset.
101 record_starts: Vec<u64>,
102 /// Notional segment manifest (ordered by `segment_num`).
103 segments: Vec<SegmentMeta>,
104 /// Active (writable) segment number.
105 active_segment: u32,
106}
107
108impl StreamState {
109 fn new() -> Self {
110 Self {
111 bytes: Vec::new(),
112 record_starts: Vec::new(),
113 segments: vec![SegmentMeta {
114 segment_num: 0,
115 first_offset: 0,
116 next_offset: 0,
117 size_bytes: 0,
118 }],
119 active_segment: 0,
120 }
121 }
122
123 fn active_mut(&mut self) -> &mut SegmentMeta {
124 let active = self.active_segment;
125 self.segments
126 .iter_mut()
127 .find(|s| s.segment_num == active)
128 .expect("active segment must exist in manifest")
129 }
130
131 /// Rotates to a new notional segment. Mirrors `SegmentManifest::rotate`
132 /// in the on-disk impl.
133 fn rotate(&mut self, first_offset: u64) {
134 let new_num = self.active_segment + 1;
135 self.segments.push(SegmentMeta {
136 segment_num: new_num,
137 first_offset,
138 next_offset: first_offset,
139 size_bytes: 0,
140 });
141 self.active_segment = new_num;
142 }
143}
144
145/// Pure in-memory storage backend.
146///
147/// See module docs for the detailed semantic contract. Safe to share
148/// across threads as `Send + Sync`; internal state is behind `&mut self`
149/// so callers wrap it in their own synchronisation (as
150/// `KimberliteInner` does via its outer `RwLock`).
151#[derive(Debug)]
152pub struct MemoryStorage {
153 streams: HashMap<StreamId, StreamState>,
154 max_segment_size: u64,
155}
156
157impl MemoryStorage {
158 /// Creates a new empty in-memory storage with default segment size.
159 pub fn new() -> Self {
160 Self {
161 streams: HashMap::new(),
162 max_segment_size: DEFAULT_MAX_SEGMENT_SIZE,
163 }
164 }
165
166 /// Creates a new empty in-memory storage with a custom notional
167 /// segment size. Intended for tests that want to exercise the
168 /// rotation bookkeeping without writing 256 MB of data.
169 pub fn with_max_segment_size(max_segment_size: u64) -> Self {
170 Self {
171 streams: HashMap::new(),
172 max_segment_size,
173 }
174 }
175
176 /// Returns the configured notional segment size in bytes.
177 pub fn max_segment_size(&self) -> u64 {
178 self.max_segment_size
179 }
180}
181
182impl Default for MemoryStorage {
183 fn default() -> Self {
184 Self::new()
185 }
186}
187
188impl StorageBackend for MemoryStorage {
189 fn append_batch(
190 &mut self,
191 stream_id: StreamId,
192 events: Vec<Bytes>,
193 expected_offset: Offset,
194 prev_hash: Option<ChainHash>,
195 _fsync: bool,
196 ) -> Result<(Offset, ChainHash), StorageError> {
197 // Precondition: batch must not be empty. Matches `Storage`.
198 assert!(!events.is_empty(), "cannot append empty batch");
199
200 // `or_insert_with` — StreamState deliberately does not impl
201 // Default (a default Vec-of-segments is empty, which would
202 // break the `active_mut()` invariant).
203 let stream = self
204 .streams
205 .entry(stream_id)
206 .or_insert_with(StreamState::new);
207
208 let mut current_offset = expected_offset;
209 let mut current_hash = prev_hash;
210
211 for event in events {
212 // Pure `Record` with no compression — hash is computed over
213 // the original payload, matching `Storage::append_batch`.
214 let hash_record = Record::new(current_offset, current_hash, event.clone());
215 let record_hash = hash_record.compute_hash();
216
217 let stored_record = Record::with_compression(
218 current_offset,
219 current_hash,
220 RecordKind::Data,
221 CompressionKind::None,
222 event,
223 );
224 let bytes = stored_record.to_bytes();
225
226 stream.record_starts.push(stream.bytes.len() as u64);
227 stream.bytes.extend_from_slice(&bytes);
228
229 {
230 let active = stream.active_mut();
231 active.next_offset = current_offset.as_u64() + 1;
232 active.size_bytes += bytes.len() as u64;
233 }
234
235 current_hash = Some(record_hash);
236 current_offset += Offset::from(1u64);
237 }
238
239 // Rotate if the active segment exceeded the notional limit.
240 // Matches the on-disk impl's post-batch rotation check.
241 let should_rotate = stream.active_mut().size_bytes >= self.max_segment_size;
242 if should_rotate {
243 stream.rotate(current_offset.as_u64());
244 }
245
246 // Postcondition: offset must only advance forward.
247 assert!(
248 current_offset.as_u64() >= expected_offset.as_u64(),
249 "offset must only advance forward after append_batch"
250 );
251
252 Ok((
253 current_offset,
254 current_hash.expect("batch was non-empty, hash must be set"),
255 ))
256 }
257
258 fn read_from(
259 &mut self,
260 stream_id: StreamId,
261 from_offset: Offset,
262 max_bytes: u64,
263 ) -> Result<Vec<Bytes>, StorageError> {
264 let Some(stream) = self.streams.get(&stream_id) else {
265 return Ok(Vec::new());
266 };
267
268 let mut results = Vec::new();
269 let mut bytes_read: u64 = 0;
270 let mut expected_prev_hash: Option<ChainHash> = None;
271
272 // Linear walk through the record buffer, verifying the full
273 // hash chain (genesis-to-from_offset). Equivalent to the
274 // on-disk `read_records_from_genesis` code path modulo
275 // checkpoint skipping, which `MemoryStorage` doesn't emit.
276 let buf = Bytes::copy_from_slice(&stream.bytes);
277 let mut pos = 0usize;
278 while pos < buf.len() && bytes_read < max_bytes {
279 let (record, consumed) = Record::from_bytes(&buf.slice(pos..))?;
280
281 if record.prev_hash() != expected_prev_hash {
282 return Err(StorageError::ChainVerificationFailed {
283 offset: record.offset(),
284 expected: expected_prev_hash,
285 actual: record.prev_hash(),
286 });
287 }
288
289 expected_prev_hash = Some(record.compute_hash());
290 pos += consumed;
291
292 // `MemoryStorage` never emits checkpoint records, but the
293 // skip is cheap and keeps us honest if the record
294 // serialiser ever changes.
295 if record.offset() < from_offset || record.is_checkpoint() {
296 continue;
297 }
298
299 bytes_read += record.payload().len() as u64;
300 results.push(record.payload().clone());
301 }
302
303 Ok(results)
304 }
305
306 fn latest_chain_hash(
307 &mut self,
308 stream_id: StreamId,
309 ) -> Result<Option<ChainHash>, StorageError> {
310 let Some(stream) = self.streams.get(&stream_id) else {
311 return Ok(None);
312 };
313
314 if stream.bytes.is_empty() {
315 return Ok(None);
316 }
317
318 let buf = Bytes::copy_from_slice(&stream.bytes);
319 let mut pos = 0usize;
320 let mut last_hash: Option<ChainHash> = None;
321 while pos < buf.len() {
322 let (record, consumed) = Record::from_bytes(&buf.slice(pos..))?;
323 last_hash = Some(record.compute_hash());
324 pos += consumed;
325 }
326 Ok(last_hash)
327 }
328
329 fn segment_count(&self, stream_id: StreamId) -> usize {
330 self.streams.get(&stream_id).map_or(0, |s| s.segments.len())
331 }
332
333 fn completed_segments(&self, stream_id: StreamId) -> Vec<u32> {
334 self.streams.get(&stream_id).map_or_else(Vec::new, |s| {
335 s.segments
336 .iter()
337 .filter(|seg| seg.segment_num != s.active_segment)
338 .map(|seg| seg.segment_num)
339 .collect()
340 })
341 }
342
343 fn flush_indexes(&mut self) -> Result<(), StorageError> {
344 // No-op. In-memory state is always "flushed" in the sense that
345 // a subsequent `read_from` will observe it. No index file to
346 // compact.
347 Ok(())
348 }
349
350 #[cfg(feature = "fuzz-reset")]
351 fn reset(&mut self) -> Result<(), StorageError> {
352 self.streams.clear();
353 Ok(())
354 }
355}