Skip to main content

vortex_sim/
storage.rs

//! Simulated storage with WAL, crash semantics, and fault injection.
//!
//! `SimStorage` is a BTreeMap-backed key-value store with:
//! - Write-ahead log (WAL) with fsync semantics
//! - Torn write simulation (partial bytes on crash) — NOTE: not implemented in this file
//! - Disk write reordering (on crash, a random subset of unfsynced writes survives, in random order)
//! - Disk-full, read-error, write-error, snapshot-failure, and silent-corruption fault injection
//! - CRC-checked WAL entries for corruption detection
9
10use std::cell::RefCell;
11use std::collections::BTreeMap;
12use vortex_core::{DetRng, NodeId, VortexError, VortexStorage};
13
/// How the simulated disk behaves when its node crashes.
#[derive(Debug, Clone)]
pub struct DiskModel {
    /// When set, a crash lets some un-fsynced writes reach disk, in shuffled
    /// order, instead of dropping all of them.
    pub reorder_on_crash: bool,
    /// Maximum pending writes before auto-flush.
    /// NOTE(review): nothing in this file reads this field — confirm where it is enforced.
    pub max_pending: usize,
}

impl Default for DiskModel {
    /// Reordering enabled, with a pending-write limit of 64.
    fn default() -> Self {
        let max_pending = 64;
        DiskModel {
            max_pending,
            reorder_on_crash: true,
        }
    }
}
31
/// Runtime fault configuration for storage.
///
/// `Default` (derived) is the fault-free configuration: every flag `false`,
/// zero corruption probability, and zero simulated latency.
#[derive(Debug, Clone, Default)]
pub struct StorageFaultConfig {
    /// If true, all writes fail with "disk full".
    pub disk_full: bool,
    /// If true, all reads fail with "read error".
    pub read_error: bool,
    /// If true, all writes fail with "write error".
    pub write_error: bool,
    /// If true, snapshots fail.
    pub snapshot_failure: bool,
    /// Probability of silent data corruption on read (0.0–1.0).
    /// Corrupted data is returned without error.
    pub silent_corrupt_probability: f64,
    /// Simulated disk latency in ticks (0 = instant).
    /// When > 0, writes are meant to be buffered until `tick()` is called.
    /// NOTE(review): nothing in this file reads this field — confirm where it is honored.
    pub slow_disk_ticks: u64,
}
63
/// A WAL operation.
#[derive(Debug, Clone)]
pub enum WalOp {
    /// Insert or overwrite `key` with `value`.
    Put { key: Vec<u8>, value: Vec<u8> },
    /// Remove `key`; replaying it when the key is absent is a no-op.
    Delete { key: Vec<u8> },
}

/// A WAL entry with sequence number and CRC.
#[derive(Debug, Clone)]
pub struct WalEntry {
    /// Log sequence number.
    pub lsn: u64,
    /// The operation.
    pub op: WalOp,
    /// CRC-32 over the LSN, the operation kind, the length-prefixed key and,
    /// for puts, the value.
    pub crc: u32,
    /// Whether this entry has been fsynced.
    pub fsynced: bool,
}

impl WalEntry {
    /// Reflected CRC-32 polynomial (the IEEE/zlib variant).
    const CRC32_POLY: u32 = 0xEDB8_8320;
    /// Tag bytes mixed into the checksum so a put and a delete over the
    /// same bytes never share a CRC.
    const TAG_PUT: u8 = 0;
    const TAG_DELETE: u8 = 1;

    /// Feeds `bytes` into a running CRC-32 register.
    ///
    /// The caller seeds the register with `0xFFFF_FFFF` and inverts the final
    /// value, as in standard CRC-32.
    fn crc32_update(mut crc: u32, bytes: &[u8]) -> u32 {
        for &b in bytes {
            crc ^= u32::from(b);
            for _ in 0..8 {
                crc = if crc & 1 != 0 {
                    (crc >> 1) ^ Self::CRC32_POLY
                } else {
                    crc >> 1
                };
            }
        }
        crc
    }

    /// Computes the entry checksum.
    ///
    /// The key is length-prefixed and the operation kind is tagged. Without
    /// that framing, `Put{"ab","c"}`, `Put{"a","bc"}` and `Delete{"abc"}`
    /// would all hash the same bytes, and corruption that shifts the key/value
    /// boundary or flips the op kind would pass verification. The value needs
    /// no length prefix because it always runs to the end of the record.
    fn compute_crc(lsn: u64, op: &WalOp) -> u32 {
        let mut crc = Self::crc32_update(0xFFFF_FFFF, &lsn.to_le_bytes());
        let (tag, key, value): (u8, &[u8], &[u8]) = match op {
            WalOp::Put { key, value } => (Self::TAG_PUT, key, value),
            WalOp::Delete { key } => (Self::TAG_DELETE, key, &[]),
        };
        crc = Self::crc32_update(crc, &[tag]);
        // usize -> u64 is lossless on every supported target.
        crc = Self::crc32_update(crc, &(key.len() as u64).to_le_bytes());
        crc = Self::crc32_update(crc, key);
        crc = Self::crc32_update(crc, value);
        !crc
    }

    /// Builds a not-yet-fsynced entry with a freshly computed CRC.
    fn new(lsn: u64, op: WalOp) -> Self {
        let crc = Self::compute_crc(lsn, &op);
        Self {
            lsn,
            op,
            crc,
            fsynced: false,
        }
    }

    /// Returns `true` if the stored CRC matches the entry's current contents.
    pub fn verify(&self) -> bool {
        Self::compute_crc(self.lsn, &self.op) == self.crc
    }
}
131
132/// Simulated Write-Ahead Log.
133pub struct SimWal {
134    entries: Vec<WalEntry>,
135    next_lsn: u64,
136    fsynced_up_to: u64,
137}
138
139impl SimWal {
140    pub fn new() -> Self {
141        Self {
142            entries: Vec::new(),
143            next_lsn: 0,
144            fsynced_up_to: 0,
145        }
146    }
147
148    /// Append an entry to the WAL (buffered, not yet durable).
149    pub fn append(&mut self, op: WalOp) -> u64 {
150        let lsn = self.next_lsn;
151        self.next_lsn += 1;
152        self.entries.push(WalEntry::new(lsn, op));
153        lsn
154    }
155
156    /// Fsync: mark all buffered entries as durable.
157    pub fn fsync(&mut self) {
158        for entry in &mut self.entries {
159            entry.fsynced = true;
160        }
161        self.fsynced_up_to = self.next_lsn;
162    }
163
164    /// Simulate a crash: only fsynced entries survive.
165    pub fn crash(&mut self) {
166        self.entries.retain(|e| e.fsynced);
167        self.next_lsn = self.fsynced_up_to;
168    }
169
170    /// Recover: replay fsynced entries into a BTreeMap.
171    pub fn recover(&self) -> BTreeMap<Vec<u8>, Vec<u8>> {
172        let mut map = BTreeMap::new();
173        for entry in &self.entries {
174            if !entry.fsynced || !entry.verify() {
175                continue;
176            }
177            match &entry.op {
178                WalOp::Put { key, value } => {
179                    map.insert(key.clone(), value.clone());
180                }
181                WalOp::Delete { key } => {
182                    map.remove(key);
183                }
184            }
185        }
186        map
187    }
188
189    /// Get all entries.
190    pub fn entries(&self) -> &[WalEntry] {
191        &self.entries
192    }
193
194    /// Number of entries.
195    pub fn len(&self) -> usize {
196        self.entries.len()
197    }
198
199    /// Is empty.
200    pub fn is_empty(&self) -> bool {
201        self.entries.is_empty()
202    }
203
204    /// LSN fsynced up to.
205    pub fn fsynced_up_to(&self) -> u64 {
206        self.fsynced_up_to
207    }
208}
209
210impl Default for SimWal {
211    fn default() -> Self {
212        Self::new()
213    }
214}
215
216/// Simulated persistent storage with WAL and fault injection.
217///
218/// BTreeMap-backed for deterministic iteration order.
219pub struct SimStorage {
220    /// The current in-memory state.
221    data: BTreeMap<Vec<u8>, Vec<u8>>,
222    /// Write-ahead log.
223    wal: SimWal,
224    /// Disk behavior model.
225    disk_model: DiskModel,
226    /// Runtime fault configuration.
227    faults: StorageFaultConfig,
228    /// Node this storage belongs to.
229    node_id: NodeId,
230    /// Deterministic RNG for fault injection (RefCell for interior mutability in &self methods).
231    rng: RefCell<DetRng>,
232    /// Stats.
233    reads: u64,
234    writes: u64,
235}
236
237impl SimStorage {
238    /// Create a new simulated storage for the given node.
239    pub fn new(node_id: NodeId) -> Self {
240        Self {
241            data: BTreeMap::new(),
242            wal: SimWal::new(),
243            disk_model: DiskModel::default(),
244            faults: StorageFaultConfig::default(),
245            node_id,
246            rng: RefCell::new(DetRng::derive(node_id, "storage")),
247            reads: 0,
248            writes: 0,
249        }
250    }
251
252    /// Create with a custom disk model.
253    pub fn with_disk_model(node_id: NodeId, disk_model: DiskModel) -> Self {
254        Self {
255            disk_model,
256            ..Self::new(node_id)
257        }
258    }
259
260    /// Set snapshot failure fault.
261    pub fn set_snapshot_failure(&mut self, fail: bool) {
262        self.faults.snapshot_failure = fail;
263    }
264
265    /// Set silent corruption probability.
266    pub fn set_silent_corrupt_probability(&mut self, p: f64) {
267        self.faults.silent_corrupt_probability = p;
268    }
269
270    /// Set fault configuration.
271    pub fn set_faults(&mut self, faults: StorageFaultConfig) {
272        self.faults = faults;
273    }
274
275    /// Enable disk full fault.
276    pub fn set_disk_full(&mut self, full: bool) {
277        self.faults.disk_full = full;
278    }
279
280    /// Enable read error fault.
281    pub fn set_read_error(&mut self, err: bool) {
282        self.faults.read_error = err;
283    }
284
285    /// Simulate a crash: lose unfsynced WAL entries, recover from fsynced.
286    pub fn crash(&mut self) {
287        self.wal.crash();
288        self.data = self.wal.recover();
289    }
290
291    /// Simulate a crash and restart with optional write reordering.
292    pub fn crash_and_recover(&mut self, rng: &mut DetRng) {
293        if self.disk_model.reorder_on_crash {
294            // Some unfsynced entries may or may not survive, in random order
295            let mut survivors = Vec::new();
296            for entry in self.wal.entries() {
297                if entry.fsynced || rng.chance(0.3) {
298                    survivors.push(entry.clone());
299                }
300            }
301            // Reorder non-fsynced survivors
302            let fsynced_count = survivors.iter().filter(|e| e.fsynced).count();
303            let unfsynced: Vec<_> = survivors.drain(fsynced_count..).collect();
304            let mut unfsynced = unfsynced;
305            rng.shuffle(&mut unfsynced);
306            survivors.extend(unfsynced);
307
308            // Rebuild data from survivors
309            self.data.clear();
310            for entry in &survivors {
311                if entry.verify() {
312                    match &entry.op {
313                        WalOp::Put { key, value } => {
314                            self.data.insert(key.clone(), value.clone());
315                        }
316                        WalOp::Delete { key } => {
317                            self.data.remove(key);
318                        }
319                    }
320                }
321            }
322        } else {
323            self.crash();
324        }
325    }
326
327    /// Get the WAL.
328    pub fn wal(&self) -> &SimWal {
329        &self.wal
330    }
331
332    /// Node this storage belongs to.
333    pub fn node_id(&self) -> NodeId {
334        self.node_id
335    }
336
337    /// Stats: total reads.
338    pub fn total_reads(&self) -> u64 {
339        self.reads
340    }
341
342    /// Stats: total writes.
343    pub fn total_writes(&self) -> u64 {
344        self.writes
345    }
346
347    /// Get fault configuration.
348    pub fn faults(&self) -> &StorageFaultConfig {
349        &self.faults
350    }
351}
352
353impl VortexStorage for SimStorage {
354    fn get(&self, key: &[u8]) -> vortex_core::Result<Option<Vec<u8>>> {
355        if self.faults.read_error {
356            return Err(VortexError::Storage("simulated read error".into()));
357        }
358        match self.data.get(key).cloned() {
359            Some(mut value) => {
360                // Silent corruption: flip a random byte without returning an error
361                if self.faults.silent_corrupt_probability > 0.0 {
362                    let mut rng = self.rng.borrow_mut();
363                    if rng.next_f64() < self.faults.silent_corrupt_probability && !value.is_empty()
364                    {
365                        let idx = rng.next_u64_below(value.len() as u64) as usize;
366                        value[idx] ^= 1u8 << (rng.next_u64_below(8) as u8);
367                    }
368                }
369                Ok(Some(value))
370            }
371            None => Ok(None),
372        }
373    }
374
375    fn put(&mut self, key: &[u8], value: &[u8]) -> vortex_core::Result<()> {
376        if self.faults.disk_full {
377            return Err(VortexError::Storage("simulated disk full".into()));
378        }
379        if self.faults.write_error {
380            return Err(VortexError::Storage("simulated write error".into()));
381        }
382        self.writes += 1;
383        self.wal.append(WalOp::Put {
384            key: key.to_vec(),
385            value: value.to_vec(),
386        });
387        self.data.insert(key.to_vec(), value.to_vec());
388        Ok(())
389    }
390
391    fn delete(&mut self, key: &[u8]) -> vortex_core::Result<()> {
392        if self.faults.disk_full {
393            return Err(VortexError::Storage("simulated disk full".into()));
394        }
395        self.writes += 1;
396        self.wal.append(WalOp::Delete { key: key.to_vec() });
397        self.data.remove(key);
398        Ok(())
399    }
400
401    fn scan(&self, start: &[u8], end: &[u8]) -> vortex_core::Result<Vec<(Vec<u8>, Vec<u8>)>> {
402        if self.faults.read_error {
403            return Err(VortexError::Storage("simulated read error".into()));
404        }
405        Ok(self
406            .data
407            .range(start.to_vec()..end.to_vec())
408            .map(|(k, v)| (k.clone(), v.clone()))
409            .collect())
410    }
411
412    fn write_batch(&mut self, ops: Vec<vortex_core::StorageOp>) -> vortex_core::Result<()> {
413        if self.faults.disk_full {
414            return Err(VortexError::Storage("simulated disk full".into()));
415        }
416        for op in ops {
417            match op {
418                vortex_core::StorageOp::Put { key, value } => {
419                    self.put(&key, &value)?;
420                }
421                vortex_core::StorageOp::Delete { key } => {
422                    self.delete(&key)?;
423                }
424            }
425        }
426        Ok(())
427    }
428
429    fn flush(&mut self) -> vortex_core::Result<()> {
430        if self.faults.disk_full {
431            return Err(VortexError::Storage("simulated disk full".into()));
432        }
433        self.wal.fsync();
434        Ok(())
435    }
436
437    fn snapshot(&self) -> vortex_core::Result<Vec<(Vec<u8>, Vec<u8>)>> {
438        if self.faults.snapshot_failure {
439            return Err(VortexError::Storage("simulated snapshot failure".into()));
440        }
441        if self.faults.read_error {
442            return Err(VortexError::Storage("simulated read error".into()));
443        }
444        Ok(self
445            .data
446            .iter()
447            .map(|(k, v)| (k.clone(), v.clone()))
448            .collect())
449    }
450}
451
#[cfg(test)]
mod tests {
    use super::*;

    /// Every test here runs against a fresh store owned by node 1.
    fn fresh() -> SimStorage {
        SimStorage::new(1)
    }

    #[test]
    fn test_basic_get_put() {
        let mut s = fresh();
        s.put(b"key1", b"val1").unwrap();
        let hit = s.get(b"key1").unwrap();
        let miss = s.get(b"missing").unwrap();
        assert_eq!(hit, Some(b"val1".to_vec()));
        assert!(miss.is_none());
    }

    #[test]
    fn test_delete() {
        let mut s = fresh();
        s.put(b"key1", b"val1").unwrap();
        s.delete(b"key1").unwrap();
        assert!(s.get(b"key1").unwrap().is_none());
    }

    #[test]
    fn test_scan() {
        let mut s = fresh();
        for (k, v) in [(b"a", b"1"), (b"b", b"2"), (b"c", b"3"), (b"d", b"4")] {
            s.put(k, v).unwrap();
        }
        // The end bound is exclusive, so "d" must not appear.
        let keys: Vec<Vec<u8>> = s
            .scan(b"b", b"d")
            .unwrap()
            .into_iter()
            .map(|(k, _)| k)
            .collect();
        assert_eq!(keys, vec![b"b".to_vec(), b"c".to_vec()]);
    }

    #[test]
    fn test_disk_full() {
        let mut s = fresh();
        s.set_disk_full(true);
        let outcome = s.put(b"key", b"val");
        assert!(outcome.is_err());
    }

    #[test]
    fn test_read_error() {
        let mut s = fresh();
        s.put(b"key", b"val").unwrap();
        s.set_read_error(true);
        assert!(s.get(b"key").is_err());
    }

    #[test]
    fn test_crash_loses_unfsynced() {
        let mut s = fresh();
        s.put(b"fsynced", b"yes").unwrap();
        s.flush().unwrap();
        s.put(b"unfsynced", b"lost").unwrap();
        s.crash();
        assert_eq!(s.get(b"fsynced").unwrap().as_deref(), Some(&b"yes"[..]));
        assert_eq!(s.get(b"unfsynced").unwrap(), None);
    }

    #[test]
    fn test_crash_and_recover_with_reorder() {
        let mut s = fresh();
        let mut rng = DetRng::new(42);
        s.put(b"durable", b"yes").unwrap();
        s.flush().unwrap();
        (0..10).for_each(|i| {
            s.put(format!("pending-{i}").as_bytes(), b"maybe").unwrap();
        });
        s.crash_and_recover(&mut rng);
        // Anything fsynced before the crash must still be readable.
        assert_eq!(s.get(b"durable").unwrap(), Some(b"yes".to_vec()));
    }

    #[test]
    fn test_wal_crc_verification() {
        let op = WalOp::Put {
            key: b"k".to_vec(),
            value: b"v".to_vec(),
        };
        let good = WalEntry::new(0, op);
        assert!(good.verify());
        let tampered = WalEntry {
            crc: 0xDEADBEEF,
            ..good.clone()
        };
        assert!(!tampered.verify());
    }

    #[test]
    fn test_snapshot() {
        let mut s = fresh();
        for (k, v) in [(b"a", b"1"), (b"b", b"2")] {
            s.put(k, v).unwrap();
        }
        assert_eq!(s.snapshot().unwrap().len(), 2);
    }

    #[test]
    fn test_snapshot_failure() {
        let mut s = fresh();
        s.put(b"key", b"val").unwrap();
        s.set_snapshot_failure(true);
        let outcome = s.snapshot();
        assert!(outcome.is_err());
    }

    #[test]
    fn test_silent_corruption() {
        let original = b"hello world value";
        let mut s = fresh();
        s.put(b"key", original).unwrap();
        s.set_silent_corrupt_probability(1.0);
        // At probability 1.0 every read of a non-empty value comes back corrupted.
        let read_back = s.get(b"key").unwrap().unwrap();
        assert_ne!(read_back, original);
    }

    #[test]
    fn test_silent_corruption_zero_probability() {
        let mut s = fresh();
        s.put(b"key", b"hello").unwrap();
        s.set_silent_corrupt_probability(0.0);
        let read_back = s.get(b"key").unwrap().unwrap();
        assert_eq!(read_back, b"hello");
    }

    #[test]
    fn test_write_batch() {
        let mut s = fresh();
        let batch = vec![
            vortex_core::StorageOp::Put {
                key: b"x".to_vec(),
                value: b"1".to_vec(),
            },
            vortex_core::StorageOp::Put {
                key: b"y".to_vec(),
                value: b"2".to_vec(),
            },
        ];
        s.write_batch(batch).unwrap();
        for (k, v) in [(b"x", b"1"), (b"y", b"2")] {
            assert_eq!(s.get(k).unwrap(), Some(v.to_vec()));
        }
    }
}