#![cfg(not(target_arch = "wasm32"))]
#[cfg(feature = "fs_persist")]
use absurder_sql::storage::block_storage::{
BLOCK_SIZE, BlockStorage, CorruptionAction, RecoveryMode, RecoveryOptions,
};
#[cfg(feature = "fs_persist")]
use serial_test::serial;
#[cfg(feature = "fs_persist")]
use std::fs;
#[cfg(feature = "fs_persist")]
use std::path::PathBuf;
#[cfg(feature = "fs_persist")]
use tempfile::TempDir;
#[cfg(feature = "fs_persist")]
#[path = "common/mod.rs"]
mod common;
/// Verifies that recovery finalizes a pending metadata file when the block
/// data it refers to is present on disk.
///
/// The test writes and syncs block 1 twice (v1, then v2), then rearranges the
/// metadata files by hand: the v2 `metadata.json` is renamed to
/// `metadata.json.pending` and the saved v1 text is written back to
/// `metadata.json`. After reopening with `RecoveryMode::Full` it asserts that
/// the pending file is gone, `metadata.json` equals the v2 text, and block 1
/// reads back as the v2 payload.
#[tokio::test(flavor = "current_thread")]
#[serial]
#[cfg(feature = "fs_persist")]
async fn test_crash_finalize_pending_metadata_when_data_present() {
    // The on-disk paths used below are derived from this same temp directory.
    let tmp = TempDir::new().expect("tempdir");
    common::set_var("ABSURDERSQL_FS_BASE", tmp.path());
    let db = "test_crash_finalize_pending";

    let mut s = BlockStorage::new_with_capacity(db, 8)
        .await
        .expect("create storage");
    let bid = s.allocate_block().await.expect("alloc");
    assert_eq!(bid, 1);

    // The v1 payload is never inspected again, so it is moved into the write
    // instead of being cloned.
    s.write_block(bid, vec![1u8; BLOCK_SIZE])
        .await
        .expect("write v1");
    s.sync().await.expect("sync v1");

    let db_dir = PathBuf::from(tmp.path()).join(db);
    let blocks_dir = db_dir.join("blocks");
    let meta_path = db_dir.join("metadata.json");
    let meta_pending_path = db_dir.join("metadata.json.pending");

    // Snapshot the v1 metadata text so it can be put back later.
    let meta_v1 = fs::read_to_string(&meta_path).expect("read meta v1");

    // The v2 payload is compared against the post-recovery read, so keep a copy.
    let data_v2 = vec![2u8; BLOCK_SIZE];
    s.write_block(bid, data_v2.clone()).await.expect("write v2");
    s.sync().await.expect("sync v2");
    let meta_v2 = fs::read_to_string(&meta_path).expect("read meta v2");

    // Leave v2 metadata only as the pending file, with v1 as the live metadata.
    fs::rename(&meta_path, &meta_pending_path).expect("rename meta -> pending");
    fs::write(&meta_path, &meta_v1).expect("restore meta v1");
    drop(s);

    let opts = RecoveryOptions {
        mode: RecoveryMode::Full,
        on_corruption: CorruptionAction::Report,
    };
    let s2 = BlockStorage::new_with_recovery_options(db, opts)
        .await
        .expect("reopen with recovery");

    assert!(
        !meta_pending_path.exists(),
        "pending metadata should be removed (finalized)"
    );
    let meta_now = fs::read_to_string(&meta_path).expect("read meta after recovery");
    assert_eq!(meta_now, meta_v2, "metadata should be finalized to v2");

    let read_back = s2.read_block_sync(bid).expect("read block after recovery");
    assert_eq!(read_back, data_v2, "block contents should reflect v2");
    assert!(
        blocks_dir.join(format!("block_{}.bin", bid)).exists(),
        "block file for the finalized block should exist on disk"
    );
}
/// Verifies that recovery rolls back a pending metadata file when a block it
/// refers to has no data file on disk.
///
/// Block 1 is written and synced (v1 metadata snapshot taken), then block 2 is
/// allocated, written and synced. The test then deletes block 2's file, moves
/// the current `metadata.json` to `metadata.json.pending`, and restores the v1
/// text at `metadata.json`. After reopening with `RecoveryMode::Full` it
/// asserts that the pending file is gone, `metadata.json` still equals the v1
/// text, and no file for block 2 exists.
#[tokio::test(flavor = "current_thread")]
#[serial]
#[cfg(feature = "fs_persist")]
async fn test_crash_rollback_pending_metadata_when_data_missing() {
    let tmp = TempDir::new().expect("tempdir");
    common::set_var("ABSURDERSQL_FS_BASE", tmp.path());
    let db = "test_crash_rollback_pending";

    let mut s = BlockStorage::new_with_capacity(db, 8)
        .await
        .expect("create storage");
    let bid1 = s.allocate_block().await.expect("alloc1");
    assert_eq!(bid1, 1);

    // Neither payload is read back in this test, so both are moved into the
    // writes rather than cloned.
    s.write_block(bid1, vec![9u8; BLOCK_SIZE])
        .await
        .expect("write v1");
    s.sync().await.expect("sync v1");

    let db_dir = PathBuf::from(tmp.path()).join(db);
    let blocks_dir = db_dir.join("blocks");
    let meta_path = db_dir.join("metadata.json");
    let meta_pending_path = db_dir.join("metadata.json.pending");
    let meta_v1 = fs::read_to_string(&meta_path).expect("read meta v1");

    let bid2 = s.allocate_block().await.expect("alloc2");
    assert_eq!(bid2, 2);
    s.write_block(bid2, vec![7u8; BLOCK_SIZE])
        .await
        .expect("write v2 b2");
    s.sync().await.expect("sync v2");

    // Sanity check only: the metadata must be readable before it is moved aside.
    let _meta_v2 = fs::read_to_string(&meta_path).expect("read meta v2");

    let b2_path = blocks_dir.join(format!("block_{}.bin", bid2));
    assert!(b2_path.exists());
    fs::remove_file(&b2_path)
        .expect("remove new block file to simulate crash before data persisted");

    // Pending metadata now references block 2, whose file no longer exists.
    fs::rename(&meta_path, &meta_pending_path).expect("rename meta -> pending");
    fs::write(&meta_path, &meta_v1).expect("restore meta v1");
    drop(s);

    let opts = RecoveryOptions {
        mode: RecoveryMode::Full,
        on_corruption: CorruptionAction::Report,
    };
    let _s2 = BlockStorage::new_with_recovery_options(db, opts)
        .await
        .expect("reopen with recovery");

    assert!(
        !meta_pending_path.exists(),
        "pending metadata should be removed (rolled back)"
    );
    let meta_now = fs::read_to_string(&meta_path).expect("read meta after recovery");
    assert_eq!(
        meta_now, meta_v1,
        "metadata should remain at v1 after rollback"
    );
    assert!(!b2_path.exists(), "no stray file for missing block");
}
/// Verifies that recovery discards a pending metadata file whose contents are
/// not valid JSON.
///
/// Block 1 is written and synced twice (v1, then v2). The test then writes the
/// bytes `not-json` to `metadata.json.pending` and restores the v1 text at
/// `metadata.json`. After reopening with `RecoveryMode::Full` it asserts that
/// the pending file is gone and `metadata.json` still equals the v1 text.
#[tokio::test(flavor = "current_thread")]
#[serial]
#[cfg(feature = "fs_persist")]
async fn test_crash_rollback_on_malformed_pending_metadata() {
    let tmp = TempDir::new().expect("tempdir");
    common::set_var("ABSURDERSQL_FS_BASE", tmp.path());
    let db = "test_crash_malformed_pending";

    let mut s = BlockStorage::new_with_capacity(db, 8)
        .await
        .expect("create storage");
    let bid = s.allocate_block().await.expect("alloc");
    assert_eq!(bid, 1);

    // Payloads are never compared in this test, so they are moved, not cloned.
    s.write_block(bid, vec![1u8; BLOCK_SIZE])
        .await
        .expect("write v1");
    s.sync().await.expect("sync v1");

    let db_dir = PathBuf::from(tmp.path()).join(db);
    let meta_path = db_dir.join("metadata.json");
    let meta_pending_path = db_dir.join("metadata.json.pending");
    let meta_v1 = fs::read_to_string(&meta_path).expect("read meta v1");

    s.write_block(bid, vec![2u8; BLOCK_SIZE])
        .await
        .expect("write v2");
    s.sync().await.expect("sync v2");

    // Plant an unparseable pending file next to the restored v1 metadata.
    fs::write(&meta_pending_path, b"not-json").expect("write malformed pending");
    fs::write(&meta_path, &meta_v1).expect("restore meta v1");
    drop(s);

    let opts = RecoveryOptions {
        mode: RecoveryMode::Full,
        on_corruption: CorruptionAction::Report,
    };
    let _s2 = BlockStorage::new_with_recovery_options(db, opts)
        .await
        .expect("reopen with recovery");

    assert!(
        !meta_pending_path.exists(),
        "pending metadata should be removed on rollback for malformed file"
    );
    let meta_now = fs::read_to_string(&meta_path).expect("read meta after recovery");
    assert_eq!(
        meta_now, meta_v1,
        "metadata should remain at v1 after rollback of malformed pending"
    );
}
/// Verifies that recovery rolls back pending metadata when a block file it
/// refers to has the wrong size.
///
/// Block 1 is written and synced (v1 metadata snapshot taken), then block 2 is
/// allocated, written and synced. The test overwrites block 2's file with
/// `BLOCK_SIZE - 1` bytes, moves the current `metadata.json` to
/// `metadata.json.pending`, and restores the v1 text at `metadata.json`.
/// After reopening with `RecoveryMode::Full` it asserts that the pending file
/// is gone, `metadata.json` equals the v1 text, and block 2's file has been
/// removed.
#[tokio::test(flavor = "current_thread")]
#[serial]
#[cfg(feature = "fs_persist")]
async fn test_crash_rollback_on_invalid_block_size_in_pending() {
    let tmp = TempDir::new().expect("tempdir");
    common::set_var("ABSURDERSQL_FS_BASE", tmp.path());
    let db = "test_crash_invalid_size_pending";

    let mut s = BlockStorage::new_with_capacity(db, 8)
        .await
        .expect("create storage");
    let bid1 = s.allocate_block().await.expect("alloc1");
    assert_eq!(bid1, 1);

    // Payloads are not read back here, so they are moved into the writes.
    s.write_block(bid1, vec![9u8; BLOCK_SIZE])
        .await
        .expect("write v1");
    s.sync().await.expect("sync v1");

    let db_dir = PathBuf::from(tmp.path()).join(db);
    let blocks_dir = db_dir.join("blocks");
    let meta_path = db_dir.join("metadata.json");
    let meta_pending_path = db_dir.join("metadata.json.pending");
    let meta_v1 = fs::read_to_string(&meta_path).expect("read meta v1");

    let bid2 = s.allocate_block().await.expect("alloc2");
    assert_eq!(bid2, 2);
    s.write_block(bid2, vec![7u8; BLOCK_SIZE])
        .await
        .expect("write v2 b2");
    s.sync().await.expect("sync v2");

    // Replace block 2's file with one that is a single byte too short.
    let b2_path = blocks_dir.join(format!("block_{}.bin", bid2));
    assert!(b2_path.exists());
    fs::write(&b2_path, vec![0u8; BLOCK_SIZE - 1]).expect("truncate/corrupt block file size");

    fs::rename(&meta_path, &meta_pending_path).expect("rename meta -> pending");
    fs::write(&meta_path, &meta_v1).expect("restore meta v1");
    drop(s);

    let opts = RecoveryOptions {
        mode: RecoveryMode::Full,
        on_corruption: CorruptionAction::Report,
    };
    let _s2 = BlockStorage::new_with_recovery_options(db, opts)
        .await
        .expect("reopen with recovery");

    assert!(
        !meta_pending_path.exists(),
        "pending metadata should be removed (rolled back) due to invalid block size"
    );
    let meta_now = fs::read_to_string(&meta_path).expect("read meta after recovery");
    assert_eq!(
        meta_now, meta_v1,
        "metadata should remain at v1 after rollback"
    );
    assert!(
        !b2_path.exists(),
        "invalid-size block file should be removed during reconciliation"
    );
}
/// Exercises finalization of pending metadata that covers two rewritten blocks.
///
/// Blocks 1 and 2 are written and synced (v1), then both are overwritten and
/// synced again (v2). The v2 `metadata.json` is moved to
/// `metadata.json.pending` and the v1 text is written back in its place.
/// After a reopen with `RecoveryMode::Full` the test expects the pending file
/// to be gone, `metadata.json` to equal the v2 text, and both blocks to read
/// back with their v2 payloads.
#[tokio::test(flavor = "current_thread")]
#[serial]
#[cfg(feature = "fs_persist")]
async fn test_crash_finalize_pending_atomic_multi_block() {
    let scratch = TempDir::new().expect("tempdir");
    common::set_var("ABSURDERSQL_FS_BASE", scratch.path());
    let db_name = "test_crash_finalize_atomic_multi";

    // Locations of the metadata files inside the scratch directory.
    let db_root = PathBuf::from(scratch.path()).join(db_name);
    let live_meta = db_root.join("metadata.json");
    let pending_meta = db_root.join("metadata.json.pending");

    let mut store = BlockStorage::new_with_capacity(db_name, 8)
        .await
        .expect("create storage");
    let first = store.allocate_block().await.expect("alloc1");
    let second = store.allocate_block().await.expect("alloc2");
    assert_eq!((first, second), (1, 2));

    // Generation 1 of both blocks.
    store
        .write_block(first, vec![1u8; BLOCK_SIZE])
        .await
        .expect("write b1 v1");
    store
        .write_block(second, vec![2u8; BLOCK_SIZE])
        .await
        .expect("write b2 v1");
    store.sync().await.expect("sync v1");
    let snapshot_v1 = fs::read_to_string(&live_meta).expect("read meta v1");

    // Generation 2 of both blocks.
    store
        .write_block(first, vec![9u8; BLOCK_SIZE])
        .await
        .expect("write b1 v2");
    store
        .write_block(second, vec![8u8; BLOCK_SIZE])
        .await
        .expect("write b2 v2");
    store.sync().await.expect("sync v2");
    let snapshot_v2 = fs::read_to_string(&live_meta).expect("read meta v2");

    // Demote the v2 metadata to "pending" and reinstate the v1 text.
    fs::rename(&live_meta, &pending_meta).expect("rename meta -> pending");
    fs::write(&live_meta, &snapshot_v1).expect("restore meta v1");
    drop(store);

    let reopened = BlockStorage::new_with_recovery_options(
        db_name,
        RecoveryOptions {
            mode: RecoveryMode::Full,
            on_corruption: CorruptionAction::Report,
        },
    )
    .await
    .expect("reopen with recovery");

    assert!(
        !pending_meta.exists(),
        "pending metadata should be removed (finalized)"
    );
    let after_recovery = fs::read_to_string(&live_meta).expect("read meta after recovery");
    assert_eq!(
        after_recovery, snapshot_v2,
        "metadata should be finalized to v2 atomically"
    );

    let first_bytes = reopened.read_block_sync(first).expect("read b1");
    let second_bytes = reopened.read_block_sync(second).expect("read b2");
    assert_eq!(first_bytes, vec![9u8; BLOCK_SIZE]);
    assert_eq!(second_bytes, vec![8u8; BLOCK_SIZE]);
}
/// Verifies that finalizing pending metadata which no longer lists a block
/// also removes that block's file.
///
/// Blocks 1 and 2 are written and synced. The test then builds a pending
/// metadata document by parsing the v1 metadata and dropping every `entries`
/// element whose first array item equals block 2's id, writes it to
/// `metadata.json.pending`, and rewrites `metadata.json` with the v1 text.
/// After reopening with `RecoveryMode::Full` it asserts that the pending file
/// is gone and block 2's file no longer exists.
#[tokio::test(flavor = "current_thread")]
#[serial]
#[cfg(feature = "fs_persist")]
async fn test_crash_finalize_pending_deallocation_removes_stray_file() {
    let tmp = TempDir::new().expect("tempdir");
    common::set_var("ABSURDERSQL_FS_BASE", tmp.path());
    let db = "test_crash_finalize_pending_dealloc";

    let mut s = BlockStorage::new_with_capacity(db, 8)
        .await
        .expect("create storage");
    let b1 = s.allocate_block().await.expect("alloc1");
    let b2 = s.allocate_block().await.expect("alloc2");
    assert_eq!((b1, b2), (1, 2));
    s.write_block(b1, vec![1u8; BLOCK_SIZE])
        .await
        .expect("write b1 v1");
    s.write_block(b2, vec![2u8; BLOCK_SIZE])
        .await
        .expect("write b2 v1");
    s.sync().await.expect("sync v1");

    let db_dir = PathBuf::from(tmp.path()).join(db);
    let blocks_dir = db_dir.join("blocks");
    let meta_path = db_dir.join("metadata.json");
    let meta_pending_path = db_dir.join("metadata.json.pending");
    let meta_v1 = fs::read_to_string(&meta_path).expect("read meta v1");

    // Derive the pending document from v1 with block 2's entry removed.
    // Fail loudly if the expected `entries` array is absent: silently skipping
    // the edit would leave pending == v1 and the scenario would not be built.
    let mut val: serde_json::Value = serde_json::from_str(&meta_v1).expect("parse meta v1");
    let entries = val
        .get_mut("entries")
        .and_then(|v| v.as_array_mut())
        .expect("metadata v1 should contain an `entries` array");
    // Entries whose id cannot be read are kept, matching the permissive filter
    // this fixture has always used.
    entries.retain(|ent| {
        ent.as_array()
            .and_then(|arr| arr.first())
            .and_then(|v| v.as_u64())
            != Some(b2)
    });
    let meta_dealloc = serde_json::to_string(&val).expect("stringify meta_dealloc");

    fs::write(&meta_pending_path, meta_dealloc).expect("write pending dealloc");
    fs::write(&meta_path, &meta_v1).expect("restore meta v1");
    drop(s);

    let opts = RecoveryOptions {
        mode: RecoveryMode::Full,
        on_corruption: CorruptionAction::Report,
    };
    let _s2 = BlockStorage::new_with_recovery_options(db, opts)
        .await
        .expect("reopen with recovery");

    assert!(
        !meta_pending_path.exists(),
        "pending should be removed (finalized)"
    );
    let b2_path = blocks_dir.join(format!("block_{}.bin", b2));
    assert!(
        !b2_path.exists(),
        "stray block file for deallocated b2 should be removed"
    );
}
/// Verifies the rollback path when pending metadata keeps only a block whose
/// file turns out to have the wrong size.
///
/// Blocks 1 and 2 are written and synced. The pending metadata is derived from
/// the v1 document by keeping only `entries` elements whose first array item
/// equals block 1's id. Block 1's file is then overwritten with
/// `BLOCK_SIZE - 1` bytes and `metadata.json` is rewritten with the v1 text.
/// After reopening with `RecoveryMode::Full` the test asserts that the pending
/// file is gone, block 2 is still listed in `metadata.json`, block 1's file
/// has been removed, and block 1 is no longer listed.
#[tokio::test(flavor = "current_thread")]
#[serial]
#[cfg(feature = "fs_persist")]
async fn test_crash_rollback_pending_deallocation_on_invalid_remaining_file() {
    let tmp = TempDir::new().expect("tempdir");
    common::set_var("ABSURDERSQL_FS_BASE", tmp.path());
    let db = "test_crash_rollback_pending_dealloc";

    let mut s = BlockStorage::new_with_capacity(db, 8)
        .await
        .expect("create storage");
    let b1 = s.allocate_block().await.expect("alloc1");
    let b2 = s.allocate_block().await.expect("alloc2");
    assert_eq!((b1, b2), (1, 2));
    s.write_block(b1, vec![1u8; BLOCK_SIZE])
        .await
        .expect("write b1 v1");
    s.write_block(b2, vec![2u8; BLOCK_SIZE])
        .await
        .expect("write b2 v1");
    s.sync().await.expect("sync v1");

    let db_dir = PathBuf::from(tmp.path()).join(db);
    let blocks_dir = db_dir.join("blocks");
    let meta_path = db_dir.join("metadata.json");
    let meta_pending_path = db_dir.join("metadata.json.pending");
    let meta_v1 = fs::read_to_string(&meta_path).expect("read meta v1");

    // Derive the pending document from v1, keeping block 1's entry only.
    // Fail loudly if the expected `entries` array is absent: silently skipping
    // the edit would leave pending == v1 and the scenario would not be built.
    let mut val: serde_json::Value = serde_json::from_str(&meta_v1).expect("parse meta v1");
    let entries = val
        .get_mut("entries")
        .and_then(|v| v.as_array_mut())
        .expect("metadata v1 should contain an `entries` array");
    // Entries whose id cannot be read are dropped along with everything that
    // is not block 1.
    entries.retain(|ent| {
        ent.as_array()
            .and_then(|arr| arr.first())
            .and_then(|v| v.as_u64())
            == Some(b1)
    });
    let meta_dealloc = serde_json::to_string(&val).expect("stringify meta_dealloc");
    fs::write(&meta_pending_path, meta_dealloc).expect("write pending dealloc");

    // Make the one block the pending metadata still references invalid.
    let b1_path = blocks_dir.join(format!("block_{}.bin", b1));
    assert!(b1_path.exists());
    fs::write(&b1_path, vec![0u8; BLOCK_SIZE - 1]).expect("corrupt b1 size");

    fs::write(&meta_path, &meta_v1).expect("restore meta v1");
    drop(s);

    let opts = RecoveryOptions {
        mode: RecoveryMode::Full,
        on_corruption: CorruptionAction::Report,
    };
    let _s2 = BlockStorage::new_with_recovery_options(db, opts)
        .await
        .expect("reopen with recovery");

    assert!(
        !meta_pending_path.exists(),
        "pending should be removed (rolled back)"
    );

    // Collect the block ids listed in the recovered metadata; a missing or
    // non-array `entries` yields an empty list, which fails the checks below.
    let meta_now_s = fs::read_to_string(&meta_path).expect("read meta after recovery");
    let meta_now: serde_json::Value = serde_json::from_str(&meta_now_s).expect("parse meta after");
    let ids: Vec<u64> = meta_now
        .get("entries")
        .and_then(|v| v.as_array())
        .map(|arr| {
            arr.iter()
                .filter_map(|ent| {
                    ent.as_array()
                        .and_then(|a| a.first())
                        .and_then(|v| v.as_u64())
                })
                .collect()
        })
        .unwrap_or_default();

    assert!(
        ids.contains(&b2),
        "rollback should keep b2 present in metadata"
    );
    assert!(
        !b1_path.exists(),
        "invalid-sized b1 should be removed during reconciliation"
    );
    assert!(
        !ids.contains(&b1),
        "metadata should drop invalid b1 during reconciliation"
    );
}
/// Verifies that a deallocation tombstone recorded in `deallocated.json`
/// is still present after a reopen that processes a pending metadata file.
///
/// Blocks 1 and 2 are written and synced, block 2 is deallocated and synced,
/// then block 1 is rewritten and synced. The current `metadata.json` is moved
/// to `metadata.json.pending` and the same text is written back to
/// `metadata.json`. The test checks that block 2 appears under `tombstones`
/// in `deallocated.json` both before and after reopening with
/// `RecoveryMode::Full`, that the pending file is gone afterwards, and that
/// no file for block 2 exists.
#[tokio::test(flavor = "current_thread")]
#[serial]
#[cfg(feature = "fs_persist")]
async fn test_tombstone_persistence_across_finalize() {
    /// Extracts the integer ids from the `tombstones` array of a parsed
    /// `deallocated.json` document; yields an empty list when it is absent.
    fn tombstone_ids(doc: &serde_json::Value) -> Vec<u64> {
        doc.get("tombstones")
            .and_then(|v| v.as_array())
            .map(|arr| arr.iter().filter_map(|x| x.as_u64()).collect())
            .unwrap_or_default()
    }

    let tmp = TempDir::new().expect("tempdir");
    common::set_var("ABSURDERSQL_FS_BASE", tmp.path());
    let db = "test_tombstone_persistence_across_finalize";

    let mut s = BlockStorage::new_with_capacity(db, 8)
        .await
        .expect("create storage");
    let b1 = s.allocate_block().await.expect("alloc1");
    let b2 = s.allocate_block().await.expect("alloc2");
    assert_eq!((b1, b2), (1, 2));
    s.write_block(b1, vec![1u8; BLOCK_SIZE])
        .await
        .expect("write b1 v1");
    s.write_block(b2, vec![2u8; BLOCK_SIZE])
        .await
        .expect("write b2 v1");
    s.sync().await.expect("sync v1");

    s.deallocate_block(b2).await.expect("dealloc b2");
    s.sync().await.expect("sync after dealloc");

    s.write_block(b1, vec![9u8; BLOCK_SIZE])
        .await
        .expect("write b1 v2");
    s.sync().await.expect("sync v2");

    let db_dir = PathBuf::from(tmp.path()).join(db);
    let blocks_dir = db_dir.join("blocks");
    let meta_path = db_dir.join("metadata.json");
    let meta_pending_path = db_dir.join("metadata.json.pending");

    // Pending and live metadata end up with identical (v2) contents here.
    // The text is borrowed for the write; no copy is needed.
    let meta_v2 = fs::read_to_string(&meta_path).expect("read meta v2");
    fs::rename(&meta_path, &meta_pending_path).expect("rename to pending");
    fs::write(&meta_path, &meta_v2).expect("restore meta");

    let dealloc_path = db_dir.join("deallocated.json");
    let dealloc_s = fs::read_to_string(&dealloc_path).expect("read deallocated.json");
    let dealloc_v: serde_json::Value = serde_json::from_str(&dealloc_s).expect("parse dealloc v1");
    let tombs = tombstone_ids(&dealloc_v);
    assert!(
        tombs.contains(&b2),
        "tombstone for b2 should exist before restart"
    );
    drop(s);

    let opts = RecoveryOptions {
        mode: RecoveryMode::Full,
        on_corruption: CorruptionAction::Report,
    };
    let _s2 = BlockStorage::new_with_recovery_options(db, opts)
        .await
        .expect("reopen with recovery");

    assert!(
        !meta_pending_path.exists(),
        "pending should be removed (finalized)"
    );

    let dealloc_s2 = fs::read_to_string(&dealloc_path).expect("read deallocated.json after");
    let dealloc_v2: serde_json::Value =
        serde_json::from_str(&dealloc_s2).expect("parse dealloc after");
    let tombs2 = tombstone_ids(&dealloc_v2);
    assert!(
        tombs2.contains(&b2),
        "tombstone for b2 should persist across finalize"
    );

    let b2_path = blocks_dir.join(format!("block_{}.bin", b2));
    assert!(
        !b2_path.exists(),
        "deallocated block file should remain deleted"
    );
}