use std::cell::RefCell;
use std::collections::BTreeMap;
use vortex_core::{DetRng, NodeId, VortexError, VortexStorage};
/// Knobs describing how the simulated disk behaves across a node crash.
#[derive(Clone, Debug)]
pub struct DiskModel {
    /// When `true`, `SimStorage::crash_and_recover` lets some un-fsynced writes survive and
    /// replays them in shuffled order; when `false` it performs a plain crash instead.
    pub reorder_on_crash: bool,
    /// Limit on pending writes.
    /// NOTE(review): not read anywhere in this file — confirm where it is consumed.
    pub max_pending: usize,
}

impl Default for DiskModel {
    /// Reordering enabled, pending limit of 64.
    fn default() -> Self {
        let reorder_on_crash = true;
        let max_pending = 64;
        DiskModel {
            reorder_on_crash,
            max_pending,
        }
    }
}
/// Fault-injection switches read by `SimStorage`.
///
/// The derived `Default` disables every fault: all flags `false`, probability `0.0`,
/// and zero slow-disk ticks.
#[derive(Debug, Clone, Default)]
pub struct StorageFaultConfig {
    /// Reject write-path operations with a simulated "disk full" error.
    pub disk_full: bool,
    /// Reject read operations with a simulated read error.
    pub read_error: bool,
    /// Reject write-path operations with a simulated write error.
    pub write_error: bool,
    /// Make `snapshot` fail with a simulated snapshot error.
    pub snapshot_failure: bool,
    /// Probability in `[0, 1]` that `get` flips one random bit of the value it returns.
    pub silent_corrupt_probability: f64,
    /// Extra latency for disk operations, in ticks.
    /// NOTE(review): not read anywhere in this file — confirm where it is consumed.
    pub slow_disk_ticks: u64,
}
/// A single logical mutation recorded in the write-ahead log.
#[derive(Debug, Clone)]
pub enum WalOp {
    /// Insert or overwrite `key` with `value`.
    Put { key: Vec<u8>, value: Vec<u8> },
    /// Remove `key` if present.
    Delete { key: Vec<u8> },
}

/// One WAL record: sequence number, mutation, checksum, and durability flag.
#[derive(Debug, Clone)]
pub struct WalEntry {
    /// Log sequence number of this record.
    pub lsn: u64,
    /// The mutation this record describes.
    pub op: WalOp,
    /// CRC-32 over the LSN and the framed operation; checked by [`WalEntry::verify`].
    pub crc: u32,
    /// Whether an fsync has covered this record (and so it survives a crash).
    pub fsynced: bool,
}

impl WalEntry {
    /// Record-type tag folded into the checksum for [`WalOp::Put`].
    const TAG_PUT: u8 = 0;
    /// Record-type tag folded into the checksum for [`WalOp::Delete`].
    const TAG_DELETE: u8 = 1;

    /// Feeds `bytes` into a running reflected CRC-32 (polynomial `0xEDB88320`, bitwise).
    fn crc32_update(mut crc: u32, bytes: &[u8]) -> u32 {
        for &b in bytes {
            crc ^= u32::from(b);
            for _ in 0..8 {
                crc = if crc & 1 != 0 {
                    (crc >> 1) ^ 0xEDB8_8320
                } else {
                    crc >> 1
                };
            }
        }
        crc
    }

    /// Computes the checksum stored in [`WalEntry::crc`].
    ///
    /// Besides the LSN and the payload bytes, the record type and the key length are folded
    /// in. Without them a `Put` would collide with a `Delete` of the concatenated bytes, and
    /// moving the key/value boundary (`"ab"`+`"c"` vs `"a"`+`"bc"`) would go undetected.
    fn compute_crc(lsn: u64, op: &WalOp) -> u32 {
        let crc = Self::crc32_update(0xFFFF_FFFF, &lsn.to_le_bytes());
        let crc = match op {
            WalOp::Put { key, value } => {
                let crc = Self::crc32_update(crc, &[Self::TAG_PUT]);
                // `usize` is at most 64 bits on current Rust targets, so this is lossless.
                let crc = Self::crc32_update(crc, &(key.len() as u64).to_le_bytes());
                let crc = Self::crc32_update(crc, key);
                Self::crc32_update(crc, value)
            }
            WalOp::Delete { key } => {
                let crc = Self::crc32_update(crc, &[Self::TAG_DELETE]);
                Self::crc32_update(crc, key)
            }
        };
        !crc
    }

    /// Builds a not-yet-fsynced record for `op` at `lsn`, with its checksum filled in.
    fn new(lsn: u64, op: WalOp) -> Self {
        let crc = Self::compute_crc(lsn, &op);
        Self {
            lsn,
            op,
            crc,
            fsynced: false,
        }
    }

    /// Returns `true` when the stored checksum matches the record's current LSN and op.
    pub fn verify(&self) -> bool {
        Self::compute_crc(self.lsn, &self.op) == self.crc
    }
}
/// In-memory write-ahead log that tracks which records an fsync has made durable.
pub struct SimWal {
    entries: Vec<WalEntry>,
    // LSN that the next `append` will hand out.
    next_lsn: u64,
    // Snapshot of `next_lsn` taken by the most recent `fsync` (exclusive bound).
    fsynced_up_to: u64,
}

impl SimWal {
    /// Creates an empty log whose first record will receive LSN 0.
    pub fn new() -> Self {
        SimWal {
            entries: Vec::new(),
            next_lsn: 0,
            fsynced_up_to: 0,
        }
    }

    /// Appends `op` as a not-yet-durable record and returns the LSN it was assigned.
    pub fn append(&mut self, op: WalOp) -> u64 {
        let following = self.next_lsn + 1;
        let assigned = std::mem::replace(&mut self.next_lsn, following);
        self.entries.push(WalEntry::new(assigned, op));
        assigned
    }

    /// Marks every record currently in the log as durable.
    pub fn fsync(&mut self) {
        self.entries
            .iter_mut()
            .for_each(|record| record.fsynced = true);
        self.fsynced_up_to = self.next_lsn;
    }

    /// Drops every record not yet covered by an fsync and rewinds the LSN counter to the
    /// value recorded at the last fsync.
    pub fn crash(&mut self) {
        self.entries.retain(|record| record.fsynced);
        self.next_lsn = self.fsynced_up_to;
    }

    /// Replays fsynced records whose checksum verifies, in log order, into a fresh map.
    pub fn recover(&self) -> BTreeMap<Vec<u8>, Vec<u8>> {
        self.entries
            .iter()
            .filter(|record| record.fsynced && record.verify())
            .fold(BTreeMap::new(), |mut state, record| {
                match &record.op {
                    WalOp::Put { key, value } => {
                        state.insert(key.clone(), value.clone());
                    }
                    WalOp::Delete { key } => {
                        state.remove(key);
                    }
                }
                state
            })
    }

    /// All records currently held, in log order.
    pub fn entries(&self) -> &[WalEntry] {
        self.entries.as_slice()
    }

    /// Number of records currently held.
    pub fn len(&self) -> usize {
        self.entries().len()
    }

    /// `true` when the log holds no records.
    pub fn is_empty(&self) -> bool {
        self.entries().is_empty()
    }

    /// Value of the LSN counter at the most recent fsync.
    pub fn fsynced_up_to(&self) -> u64 {
        self.fsynced_up_to
    }
}

impl Default for SimWal {
    fn default() -> Self {
        SimWal::new()
    }
}
pub struct SimStorage {
data: BTreeMap<Vec<u8>, Vec<u8>>,
wal: SimWal,
disk_model: DiskModel,
faults: StorageFaultConfig,
node_id: NodeId,
rng: RefCell<DetRng>,
reads: u64,
writes: u64,
}
impl SimStorage {
pub fn new(node_id: NodeId) -> Self {
Self {
data: BTreeMap::new(),
wal: SimWal::new(),
disk_model: DiskModel::default(),
faults: StorageFaultConfig::default(),
node_id,
rng: RefCell::new(DetRng::derive(node_id, "storage")),
reads: 0,
writes: 0,
}
}
pub fn with_disk_model(node_id: NodeId, disk_model: DiskModel) -> Self {
Self {
disk_model,
..Self::new(node_id)
}
}
pub fn set_snapshot_failure(&mut self, fail: bool) {
self.faults.snapshot_failure = fail;
}
pub fn set_silent_corrupt_probability(&mut self, p: f64) {
self.faults.silent_corrupt_probability = p;
}
pub fn set_faults(&mut self, faults: StorageFaultConfig) {
self.faults = faults;
}
pub fn set_disk_full(&mut self, full: bool) {
self.faults.disk_full = full;
}
pub fn set_read_error(&mut self, err: bool) {
self.faults.read_error = err;
}
pub fn crash(&mut self) {
self.wal.crash();
self.data = self.wal.recover();
}
pub fn crash_and_recover(&mut self, rng: &mut DetRng) {
if self.disk_model.reorder_on_crash {
let mut survivors = Vec::new();
for entry in self.wal.entries() {
if entry.fsynced || rng.chance(0.3) {
survivors.push(entry.clone());
}
}
let fsynced_count = survivors.iter().filter(|e| e.fsynced).count();
let unfsynced: Vec<_> = survivors.drain(fsynced_count..).collect();
let mut unfsynced = unfsynced;
rng.shuffle(&mut unfsynced);
survivors.extend(unfsynced);
self.data.clear();
for entry in &survivors {
if entry.verify() {
match &entry.op {
WalOp::Put { key, value } => {
self.data.insert(key.clone(), value.clone());
}
WalOp::Delete { key } => {
self.data.remove(key);
}
}
}
}
} else {
self.crash();
}
}
pub fn wal(&self) -> &SimWal {
&self.wal
}
pub fn node_id(&self) -> NodeId {
self.node_id
}
pub fn total_reads(&self) -> u64 {
self.reads
}
pub fn total_writes(&self) -> u64 {
self.writes
}
pub fn faults(&self) -> &StorageFaultConfig {
&self.faults
}
}
impl VortexStorage for SimStorage {
fn get(&self, key: &[u8]) -> vortex_core::Result<Option<Vec<u8>>> {
if self.faults.read_error {
return Err(VortexError::Storage("simulated read error".into()));
}
match self.data.get(key).cloned() {
Some(mut value) => {
if self.faults.silent_corrupt_probability > 0.0 {
let mut rng = self.rng.borrow_mut();
if rng.next_f64() < self.faults.silent_corrupt_probability && !value.is_empty()
{
let idx = rng.next_u64_below(value.len() as u64) as usize;
value[idx] ^= 1u8 << (rng.next_u64_below(8) as u8);
}
}
Ok(Some(value))
}
None => Ok(None),
}
}
fn put(&mut self, key: &[u8], value: &[u8]) -> vortex_core::Result<()> {
if self.faults.disk_full {
return Err(VortexError::Storage("simulated disk full".into()));
}
if self.faults.write_error {
return Err(VortexError::Storage("simulated write error".into()));
}
self.writes += 1;
self.wal.append(WalOp::Put {
key: key.to_vec(),
value: value.to_vec(),
});
self.data.insert(key.to_vec(), value.to_vec());
Ok(())
}
fn delete(&mut self, key: &[u8]) -> vortex_core::Result<()> {
if self.faults.disk_full {
return Err(VortexError::Storage("simulated disk full".into()));
}
self.writes += 1;
self.wal.append(WalOp::Delete { key: key.to_vec() });
self.data.remove(key);
Ok(())
}
fn scan(&self, start: &[u8], end: &[u8]) -> vortex_core::Result<Vec<(Vec<u8>, Vec<u8>)>> {
if self.faults.read_error {
return Err(VortexError::Storage("simulated read error".into()));
}
Ok(self
.data
.range(start.to_vec()..end.to_vec())
.map(|(k, v)| (k.clone(), v.clone()))
.collect())
}
fn write_batch(&mut self, ops: Vec<vortex_core::StorageOp>) -> vortex_core::Result<()> {
if self.faults.disk_full {
return Err(VortexError::Storage("simulated disk full".into()));
}
for op in ops {
match op {
vortex_core::StorageOp::Put { key, value } => {
self.put(&key, &value)?;
}
vortex_core::StorageOp::Delete { key } => {
self.delete(&key)?;
}
}
}
Ok(())
}
fn flush(&mut self) -> vortex_core::Result<()> {
if self.faults.disk_full {
return Err(VortexError::Storage("simulated disk full".into()));
}
self.wal.fsync();
Ok(())
}
fn snapshot(&self) -> vortex_core::Result<Vec<(Vec<u8>, Vec<u8>)>> {
if self.faults.snapshot_failure {
return Err(VortexError::Storage("simulated snapshot failure".into()));
}
if self.faults.read_error {
return Err(VortexError::Storage("simulated read error".into()));
}
Ok(self
.data
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect())
}
}
#[cfg(test)]
mod tests {
    //! Unit tests for the simulated storage engine and its write-ahead log.
    use super::*;

    /// Fresh store for node 1, the node id every test here uses.
    fn node1() -> SimStorage {
        SimStorage::new(1)
    }

    #[test]
    fn test_basic_get_put() {
        let mut s = node1();
        s.put(b"key1", b"val1").unwrap();
        let hit = s.get(b"key1").unwrap();
        let miss = s.get(b"missing").unwrap();
        assert_eq!(hit, Some(b"val1".to_vec()));
        assert_eq!(miss, None);
    }

    #[test]
    fn test_delete() {
        let mut s = node1();
        s.put(b"key1", b"val1").unwrap();
        s.delete(b"key1").unwrap();
        let after = s.get(b"key1").unwrap();
        assert_eq!(after, None);
    }

    #[test]
    fn test_scan() {
        let mut s = node1();
        for (k, v) in [(b"a", b"1"), (b"b", b"2"), (b"c", b"3"), (b"d", b"4")] {
            s.put(k, v).unwrap();
        }
        let keys: Vec<Vec<u8>> = s
            .scan(b"b", b"d")
            .unwrap()
            .into_iter()
            .map(|(k, _)| k)
            .collect();
        assert_eq!(keys, vec![b"b".to_vec(), b"c".to_vec()]);
    }

    #[test]
    fn test_disk_full() {
        let mut s = node1();
        s.set_disk_full(true);
        let outcome = s.put(b"key", b"val");
        assert!(outcome.is_err());
    }

    #[test]
    fn test_read_error() {
        let mut s = node1();
        s.put(b"key", b"val").unwrap();
        s.set_read_error(true);
        let outcome = s.get(b"key");
        assert!(outcome.is_err());
    }

    #[test]
    fn test_crash_loses_unfsynced() {
        let mut s = node1();
        s.put(b"fsynced", b"yes").unwrap();
        s.flush().unwrap();
        s.put(b"unfsynced", b"lost").unwrap();
        s.crash();
        assert_eq!(s.get(b"fsynced").unwrap(), Some(b"yes".to_vec()));
        assert!(s.get(b"unfsynced").unwrap().is_none());
    }

    #[test]
    fn test_crash_and_recover_with_reorder() {
        let mut s = node1();
        let mut rng = DetRng::new(42);
        s.put(b"durable", b"yes").unwrap();
        s.flush().unwrap();
        (0..10).for_each(|i| {
            s.put(format!("pending-{i}").as_bytes(), b"maybe").unwrap();
        });
        s.crash_and_recover(&mut rng);
        assert_eq!(s.get(b"durable").unwrap(), Some(b"yes".to_vec()));
    }

    #[test]
    fn test_wal_crc_verification() {
        let good = WalEntry::new(
            0,
            WalOp::Put {
                key: b"k".to_vec(),
                value: b"v".to_vec(),
            },
        );
        assert!(good.verify());
        let tampered = WalEntry {
            crc: 0xDEADBEEF,
            ..good.clone()
        };
        assert!(!tampered.verify());
    }

    #[test]
    fn test_snapshot() {
        let mut s = node1();
        s.put(b"a", b"1").unwrap();
        s.put(b"b", b"2").unwrap();
        let pairs = s.snapshot().unwrap();
        assert_eq!(pairs.len(), 2);
    }

    #[test]
    fn test_snapshot_failure() {
        let mut s = node1();
        s.put(b"key", b"val").unwrap();
        s.set_snapshot_failure(true);
        let outcome = s.snapshot();
        assert!(outcome.is_err());
    }

    #[test]
    fn test_silent_corruption() {
        let mut s = node1();
        s.put(b"key", b"hello world value").unwrap();
        s.set_silent_corrupt_probability(1.0);
        let read_back = s.get(b"key").unwrap().unwrap();
        assert_ne!(read_back, b"hello world value");
    }

    #[test]
    fn test_silent_corruption_zero_probability() {
        let mut s = node1();
        s.put(b"key", b"hello").unwrap();
        s.set_silent_corrupt_probability(0.0);
        let read_back = s.get(b"key").unwrap().unwrap();
        assert_eq!(read_back, b"hello");
    }

    #[test]
    fn test_write_batch() {
        let mut s = node1();
        let batch = vec![
            vortex_core::StorageOp::Put {
                key: b"x".to_vec(),
                value: b"1".to_vec(),
            },
            vortex_core::StorageOp::Put {
                key: b"y".to_vec(),
                value: b"2".to_vec(),
            },
        ];
        s.write_batch(batch).unwrap();
        for (k, v) in [(b"x", b"1"), (b"y", b"2")] {
            assert_eq!(s.get(k).unwrap(), Some(v.to_vec()));
        }
    }
}