1use std::cell::RefCell;
11use std::collections::BTreeMap;
12use vortex_core::{DetRng, NodeId, VortexError, VortexStorage};
13
/// Tunable behaviour of the simulated disk underneath [`SimStorage`].
#[derive(Debug, Clone)]
pub struct DiskModel {
    /// When `true`, `SimStorage::crash_and_recover` lets some un-fsynced WAL
    /// entries survive the crash and replays them in shuffled order; when
    /// `false` it falls back to a plain `crash()`.
    pub reorder_on_crash: bool,
    /// Presumably an upper bound on buffered, not-yet-fsynced writes.
    // NOTE(review): nothing in this file reads `max_pending` — confirm where (or whether) it is enforced.
    pub max_pending: usize,
}

impl Default for DiskModel {
    /// Reordering on crash enabled, `max_pending` of 64.
    fn default() -> Self {
        let reorder_on_crash = true;
        let max_pending = 64;
        DiskModel {
            reorder_on_crash,
            max_pending,
        }
    }
}
31
/// Fault-injection switches consulted by [`SimStorage`].
///
/// `Default` is derived: every flag is `false`, the corruption probability is
/// `0.0` and `slow_disk_ticks` is `0`, i.e. a healthy disk.
#[derive(Debug, Clone, Default)]
pub struct StorageFaultConfig {
    /// Mutating operations and `flush` fail with "simulated disk full".
    pub disk_full: bool,
    /// `get`, `scan` and `snapshot` fail with "simulated read error".
    pub read_error: bool,
    /// `put` (and anything routed through it, such as `write_batch`) fails
    /// with "simulated write error".
    pub write_error: bool,
    /// `snapshot` fails with "simulated snapshot failure".
    pub snapshot_failure: bool,
    /// Chance, per `get` that finds a value, of flipping one random bit in the
    /// returned copy. The stored value itself is not modified.
    pub silent_corrupt_probability: f64,
    /// Presumably extra latency, in simulation ticks, per disk operation.
    // NOTE(review): nothing in this file reads `slow_disk_ticks` — confirm where it is applied.
    pub slow_disk_ticks: u64,
}
63
/// A single logical mutation recorded in the write-ahead log.
#[derive(Debug, Clone)]
pub enum WalOp {
    /// Store `value` under `key`, replacing any previous value.
    Put { key: Vec<u8>, value: Vec<u8> },
    /// Remove `key` if it is present.
    Delete { key: Vec<u8> },
}
70
71#[derive(Debug, Clone)]
73pub struct WalEntry {
74 pub lsn: u64,
76 pub op: WalOp,
78 pub crc: u32,
80 pub fsynced: bool,
82}
83
84impl WalEntry {
85 fn compute_crc(lsn: u64, op: &WalOp) -> u32 {
86 let mut crc: u32 = 0xFFFFFFFF;
88 let lsn_bytes = lsn.to_le_bytes();
89 for &b in &lsn_bytes {
90 crc ^= b as u32;
91 for _ in 0..8 {
92 crc = if crc & 1 != 0 {
93 (crc >> 1) ^ 0xEDB88320
94 } else {
95 crc >> 1
96 };
97 }
98 }
99 let data = match op {
100 WalOp::Put { key, value } => [key.as_slice(), value.as_slice()].concat(),
101 WalOp::Delete { key } => key.clone(),
102 };
103 for &b in &data {
104 crc ^= b as u32;
105 for _ in 0..8 {
106 crc = if crc & 1 != 0 {
107 (crc >> 1) ^ 0xEDB88320
108 } else {
109 crc >> 1
110 };
111 }
112 }
113 !crc
114 }
115
116 fn new(lsn: u64, op: WalOp) -> Self {
117 let crc = Self::compute_crc(lsn, &op);
118 Self {
119 lsn,
120 op,
121 crc,
122 fsynced: false,
123 }
124 }
125
126 pub fn verify(&self) -> bool {
128 Self::compute_crc(self.lsn, &self.op) == self.crc
129 }
130}
131
132pub struct SimWal {
134 entries: Vec<WalEntry>,
135 next_lsn: u64,
136 fsynced_up_to: u64,
137}
138
139impl SimWal {
140 pub fn new() -> Self {
141 Self {
142 entries: Vec::new(),
143 next_lsn: 0,
144 fsynced_up_to: 0,
145 }
146 }
147
148 pub fn append(&mut self, op: WalOp) -> u64 {
150 let lsn = self.next_lsn;
151 self.next_lsn += 1;
152 self.entries.push(WalEntry::new(lsn, op));
153 lsn
154 }
155
156 pub fn fsync(&mut self) {
158 for entry in &mut self.entries {
159 entry.fsynced = true;
160 }
161 self.fsynced_up_to = self.next_lsn;
162 }
163
164 pub fn crash(&mut self) {
166 self.entries.retain(|e| e.fsynced);
167 self.next_lsn = self.fsynced_up_to;
168 }
169
170 pub fn recover(&self) -> BTreeMap<Vec<u8>, Vec<u8>> {
172 let mut map = BTreeMap::new();
173 for entry in &self.entries {
174 if !entry.fsynced || !entry.verify() {
175 continue;
176 }
177 match &entry.op {
178 WalOp::Put { key, value } => {
179 map.insert(key.clone(), value.clone());
180 }
181 WalOp::Delete { key } => {
182 map.remove(key);
183 }
184 }
185 }
186 map
187 }
188
189 pub fn entries(&self) -> &[WalEntry] {
191 &self.entries
192 }
193
194 pub fn len(&self) -> usize {
196 self.entries.len()
197 }
198
199 pub fn is_empty(&self) -> bool {
201 self.entries.is_empty()
202 }
203
204 pub fn fsynced_up_to(&self) -> u64 {
206 self.fsynced_up_to
207 }
208}
209
210impl Default for SimWal {
211 fn default() -> Self {
212 Self::new()
213 }
214}
215
216pub struct SimStorage {
220 data: BTreeMap<Vec<u8>, Vec<u8>>,
222 wal: SimWal,
224 disk_model: DiskModel,
226 faults: StorageFaultConfig,
228 node_id: NodeId,
230 rng: RefCell<DetRng>,
232 reads: u64,
234 writes: u64,
235}
236
237impl SimStorage {
238 pub fn new(node_id: NodeId) -> Self {
240 Self {
241 data: BTreeMap::new(),
242 wal: SimWal::new(),
243 disk_model: DiskModel::default(),
244 faults: StorageFaultConfig::default(),
245 node_id,
246 rng: RefCell::new(DetRng::derive(node_id, "storage")),
247 reads: 0,
248 writes: 0,
249 }
250 }
251
252 pub fn with_disk_model(node_id: NodeId, disk_model: DiskModel) -> Self {
254 Self {
255 disk_model,
256 ..Self::new(node_id)
257 }
258 }
259
260 pub fn set_snapshot_failure(&mut self, fail: bool) {
262 self.faults.snapshot_failure = fail;
263 }
264
265 pub fn set_silent_corrupt_probability(&mut self, p: f64) {
267 self.faults.silent_corrupt_probability = p;
268 }
269
270 pub fn set_faults(&mut self, faults: StorageFaultConfig) {
272 self.faults = faults;
273 }
274
275 pub fn set_disk_full(&mut self, full: bool) {
277 self.faults.disk_full = full;
278 }
279
280 pub fn set_read_error(&mut self, err: bool) {
282 self.faults.read_error = err;
283 }
284
285 pub fn crash(&mut self) {
287 self.wal.crash();
288 self.data = self.wal.recover();
289 }
290
291 pub fn crash_and_recover(&mut self, rng: &mut DetRng) {
293 if self.disk_model.reorder_on_crash {
294 let mut survivors = Vec::new();
296 for entry in self.wal.entries() {
297 if entry.fsynced || rng.chance(0.3) {
298 survivors.push(entry.clone());
299 }
300 }
301 let fsynced_count = survivors.iter().filter(|e| e.fsynced).count();
303 let unfsynced: Vec<_> = survivors.drain(fsynced_count..).collect();
304 let mut unfsynced = unfsynced;
305 rng.shuffle(&mut unfsynced);
306 survivors.extend(unfsynced);
307
308 self.data.clear();
310 for entry in &survivors {
311 if entry.verify() {
312 match &entry.op {
313 WalOp::Put { key, value } => {
314 self.data.insert(key.clone(), value.clone());
315 }
316 WalOp::Delete { key } => {
317 self.data.remove(key);
318 }
319 }
320 }
321 }
322 } else {
323 self.crash();
324 }
325 }
326
327 pub fn wal(&self) -> &SimWal {
329 &self.wal
330 }
331
332 pub fn node_id(&self) -> NodeId {
334 self.node_id
335 }
336
337 pub fn total_reads(&self) -> u64 {
339 self.reads
340 }
341
342 pub fn total_writes(&self) -> u64 {
344 self.writes
345 }
346
347 pub fn faults(&self) -> &StorageFaultConfig {
349 &self.faults
350 }
351}
352
353impl VortexStorage for SimStorage {
354 fn get(&self, key: &[u8]) -> vortex_core::Result<Option<Vec<u8>>> {
355 if self.faults.read_error {
356 return Err(VortexError::Storage("simulated read error".into()));
357 }
358 match self.data.get(key).cloned() {
359 Some(mut value) => {
360 if self.faults.silent_corrupt_probability > 0.0 {
362 let mut rng = self.rng.borrow_mut();
363 if rng.next_f64() < self.faults.silent_corrupt_probability && !value.is_empty()
364 {
365 let idx = rng.next_u64_below(value.len() as u64) as usize;
366 value[idx] ^= 1u8 << (rng.next_u64_below(8) as u8);
367 }
368 }
369 Ok(Some(value))
370 }
371 None => Ok(None),
372 }
373 }
374
375 fn put(&mut self, key: &[u8], value: &[u8]) -> vortex_core::Result<()> {
376 if self.faults.disk_full {
377 return Err(VortexError::Storage("simulated disk full".into()));
378 }
379 if self.faults.write_error {
380 return Err(VortexError::Storage("simulated write error".into()));
381 }
382 self.writes += 1;
383 self.wal.append(WalOp::Put {
384 key: key.to_vec(),
385 value: value.to_vec(),
386 });
387 self.data.insert(key.to_vec(), value.to_vec());
388 Ok(())
389 }
390
391 fn delete(&mut self, key: &[u8]) -> vortex_core::Result<()> {
392 if self.faults.disk_full {
393 return Err(VortexError::Storage("simulated disk full".into()));
394 }
395 self.writes += 1;
396 self.wal.append(WalOp::Delete { key: key.to_vec() });
397 self.data.remove(key);
398 Ok(())
399 }
400
401 fn scan(&self, start: &[u8], end: &[u8]) -> vortex_core::Result<Vec<(Vec<u8>, Vec<u8>)>> {
402 if self.faults.read_error {
403 return Err(VortexError::Storage("simulated read error".into()));
404 }
405 Ok(self
406 .data
407 .range(start.to_vec()..end.to_vec())
408 .map(|(k, v)| (k.clone(), v.clone()))
409 .collect())
410 }
411
412 fn write_batch(&mut self, ops: Vec<vortex_core::StorageOp>) -> vortex_core::Result<()> {
413 if self.faults.disk_full {
414 return Err(VortexError::Storage("simulated disk full".into()));
415 }
416 for op in ops {
417 match op {
418 vortex_core::StorageOp::Put { key, value } => {
419 self.put(&key, &value)?;
420 }
421 vortex_core::StorageOp::Delete { key } => {
422 self.delete(&key)?;
423 }
424 }
425 }
426 Ok(())
427 }
428
429 fn flush(&mut self) -> vortex_core::Result<()> {
430 if self.faults.disk_full {
431 return Err(VortexError::Storage("simulated disk full".into()));
432 }
433 self.wal.fsync();
434 Ok(())
435 }
436
437 fn snapshot(&self) -> vortex_core::Result<Vec<(Vec<u8>, Vec<u8>)>> {
438 if self.faults.snapshot_failure {
439 return Err(VortexError::Storage("simulated snapshot failure".into()));
440 }
441 if self.faults.read_error {
442 return Err(VortexError::Storage("simulated read error".into()));
443 }
444 Ok(self
445 .data
446 .iter()
447 .map(|(k, v)| (k.clone(), v.clone()))
448 .collect())
449 }
450}
451
#[cfg(test)]
mod tests {
    use super::*;

    /// A stored value can be read back; an unknown key reads as `None`.
    #[test]
    fn test_basic_get_put() {
        let mut storage = SimStorage::new(1);
        storage.put(b"key1", b"val1").unwrap();

        let hit = storage.get(b"key1").unwrap();
        assert_eq!(hit, Some(b"val1".to_vec()));
        let miss = storage.get(b"missing").unwrap();
        assert_eq!(miss, None);
    }

    /// A deleted key is no longer visible.
    #[test]
    fn test_delete() {
        let mut storage = SimStorage::new(1);
        storage.put(b"key1", b"val1").unwrap();
        storage.delete(b"key1").unwrap();

        let after = storage.get(b"key1").unwrap();
        assert_eq!(after, None);
    }

    /// `scan` includes the start key and excludes the end key.
    #[test]
    fn test_scan() {
        let mut storage = SimStorage::new(1);
        for (key, value) in [(b"a", b"1"), (b"b", b"2"), (b"c", b"3"), (b"d", b"4")] {
            storage.put(key, value).unwrap();
        }

        let hits = storage.scan(b"b", b"d").unwrap();
        assert_eq!(hits.len(), 2);
        assert_eq!(hits[0].0, b"b");
        assert_eq!(hits[1].0, b"c");
    }

    /// Writes are rejected while the disk is full.
    #[test]
    fn test_disk_full() {
        let mut storage = SimStorage::new(1);
        storage.set_disk_full(true);

        let outcome = storage.put(b"key", b"val");
        assert!(outcome.is_err());
    }

    /// Reads are rejected while the read fault is active.
    #[test]
    fn test_read_error() {
        let mut storage = SimStorage::new(1);
        storage.put(b"key", b"val").unwrap();
        storage.set_read_error(true);

        let outcome = storage.get(b"key");
        assert!(outcome.is_err());
    }

    /// A plain crash keeps flushed writes and drops unflushed ones.
    #[test]
    fn test_crash_loses_unfsynced() {
        let mut storage = SimStorage::new(1);
        storage.put(b"fsynced", b"yes").unwrap();
        storage.flush().unwrap();
        storage.put(b"unfsynced", b"lost").unwrap();

        storage.crash();

        let kept = storage.get(b"fsynced").unwrap();
        assert_eq!(kept, Some(b"yes".to_vec()));
        let dropped = storage.get(b"unfsynced").unwrap();
        assert_eq!(dropped, None);
    }

    /// Flushed data always survives a reordering crash; pending writes may or
    /// may not survive, so nothing is asserted about them.
    #[test]
    fn test_crash_and_recover_with_reorder() {
        let mut storage = SimStorage::new(1);
        let mut crash_rng = DetRng::new(42);
        storage.put(b"durable", b"yes").unwrap();
        storage.flush().unwrap();
        for i in 0..10 {
            let pending_key = format!("pending-{i}");
            storage.put(pending_key.as_bytes(), b"maybe").unwrap();
        }

        storage.crash_and_recover(&mut crash_rng);

        let kept = storage.get(b"durable").unwrap();
        assert_eq!(kept, Some(b"yes".to_vec()));
    }

    /// A fresh entry verifies; overwriting its checksum makes it fail.
    #[test]
    fn test_wal_crc_verification() {
        let op = WalOp::Put {
            key: b"k".to_vec(),
            value: b"v".to_vec(),
        };
        let good = WalEntry::new(0, op);
        assert!(good.verify());

        let bad = WalEntry {
            crc: 0xDEADBEEF,
            ..good.clone()
        };
        assert!(!bad.verify());
    }

    /// A snapshot contains every stored pair.
    #[test]
    fn test_snapshot() {
        let mut storage = SimStorage::new(1);
        storage.put(b"a", b"1").unwrap();
        storage.put(b"b", b"2").unwrap();

        let pairs = storage.snapshot().unwrap();
        assert_eq!(pairs.len(), 2);
    }

    /// Snapshots are rejected while the snapshot fault is active.
    #[test]
    fn test_snapshot_failure() {
        let mut storage = SimStorage::new(1);
        storage.put(b"key", b"val").unwrap();
        storage.set_snapshot_failure(true);

        let outcome = storage.snapshot();
        assert!(outcome.is_err());
    }

    /// With probability 1.0 every read of a non-empty value comes back with a
    /// flipped bit, so it must differ from what was stored.
    #[test]
    fn test_silent_corruption() {
        let mut storage = SimStorage::new(1);
        storage.put(b"key", b"hello world value").unwrap();
        storage.set_silent_corrupt_probability(1.0);

        let observed = storage.get(b"key").unwrap().unwrap();
        assert_ne!(observed, b"hello world value");
    }

    /// With probability 0.0 reads are never corrupted.
    #[test]
    fn test_silent_corruption_zero_probability() {
        let mut storage = SimStorage::new(1);
        storage.put(b"key", b"hello").unwrap();
        storage.set_silent_corrupt_probability(0.0);

        let observed = storage.get(b"key").unwrap().unwrap();
        assert_eq!(observed, b"hello");
    }

    /// Every operation in a batch is applied.
    #[test]
    fn test_write_batch() {
        let mut storage = SimStorage::new(1);
        let batch = vec![
            vortex_core::StorageOp::Put {
                key: b"x".to_vec(),
                value: b"1".to_vec(),
            },
            vortex_core::StorageOp::Put {
                key: b"y".to_vec(),
                value: b"2".to_vec(),
            },
        ];
        storage.write_batch(batch).unwrap();

        let x = storage.get(b"x").unwrap();
        assert_eq!(x, Some(b"1".to_vec()));
        let y = storage.get(b"y").unwrap();
        assert_eq!(y, Some(b"2".to_vec()));
    }
}