use std::fs;
use std::path::{Path, PathBuf};
use stoolap::Database;
use tempfile::{tempdir, TempDir};
// Little-endian u32 expected at the start of every WAL entry header
// (the bytes `57 41 4C 45`, i.e. ASCII "WALE", on disk).
const WAL_ENTRY_MAGIC: u32 = 0x454C4157;
// Minimum (fixed) size in bytes of a WAL entry header; headers that declare
// a smaller size are rejected by the scanner in this file.
const WAL_HEADER_SIZE: usize = 32;
// Number of checksum bytes that follow each entry's payload.
const CRC_SIZE: usize = 4;
// Bit in the header flags byte (header offset 5) that tests in this file
// treat as "payload is compressed".
const COMPRESSED_FLAG: u8 = 0x01;
// Bit in the header flags byte that tests in this file treat as
// "entry is a commit marker".
const COMMIT_MARKER_FLAG: u8 = 0x02;
// Not referenced in the visible part of this file (hence the leading
// underscore). NOTE(review): presumably the magic of `checkpoint.meta` —
// confirm against the storage engine.
const _CHECKPOINT_MAGIC: u32 = 0x43504F49;
/// Location and header fields of one WAL entry, as discovered by
/// `find_entry_boundaries` when scanning a raw WAL file image.
///
/// All offsets are byte positions relative to the start of the scanned buffer.
#[derive(Debug, Clone)]
struct WalEntryInfo {
// Position of the entry's magic bytes (start of the header).
offset: usize,
// Header size + payload size + trailing CRC bytes.
total_size: usize,
// u64 read little-endian from header bytes 8..16; not read by the visible tests.
_lsn: u64,
// Header byte 5; tested against COMPRESSED_FLAG / COMMIT_MARKER_FLAG.
flags: u8,
// Payload length, read little-endian from header bytes 24..28.
entry_size: usize,
// Position of the first payload byte (`offset` + declared header size).
data_offset: usize,
// Position of the first CRC byte (`data_offset` + `entry_size`).
crc_offset: usize,
}
/// A populated on-disk test database together with the handles needed to
/// reopen it after its files have been tampered with.
struct TestFixture {
// Owner of the temporary directory; never read, only kept alive.
// NOTE(review): presumably the directory is removed when this is dropped
// (tempfile's `TempDir` behavior) — confirm.
_dir: TempDir,
// `<temp dir>/test.db`, the path the WAL / lock helpers are given.
db_path: PathBuf,
// `file://<db_path>` connection string passed to `Database::open`.
dsn: String,
}
/// Lists the WAL segment files stored under `<db_path>/wal`, sorted by path.
///
/// A file counts as a segment when its name starts with `wal-` or `wal_` and
/// ends with `.log`. Returns an empty vector when the `wal` directory does
/// not exist. Panics if the directory exists but cannot be read; individual
/// unreadable directory entries are skipped.
fn find_wal_files(db_path: &Path) -> Vec<PathBuf> {
    let wal_dir = db_path.join("wal");
    if !wal_dir.exists() {
        return Vec::new();
    }

    let mut segments = Vec::new();
    for dir_entry in fs::read_dir(&wal_dir).unwrap() {
        let dir_entry = match dir_entry {
            Ok(found) => found,
            Err(_) => continue,
        };
        let raw_name = dir_entry.file_name();
        let name = raw_name.to_string_lossy();
        let has_wal_prefix = name.starts_with("wal-") || name.starts_with("wal_");
        if has_wal_prefix && name.ends_with(".log") {
            segments.push(dir_entry.path());
        }
    }
    segments.sort();
    segments
}
/// Returns the path `<db_path>/wal/checkpoint.meta` if that path exists on
/// disk, otherwise `None`.
fn find_checkpoint_file(db_path: &Path) -> Option<PathBuf> {
    let candidate = db_path.join("wal").join("checkpoint.meta");
    Some(candidate).filter(|path| path.exists())
}
/// Scans a raw WAL file image and returns every entry it can delimit.
///
/// The scan walks `data` byte by byte looking for `WAL_ENTRY_MAGIC` (read as a
/// little-endian u32). At a match it reads, from the header:
/// flags (byte 5), declared header size (u16 LE at bytes 6..8), LSN (u64 LE at
/// bytes 8..16) and payload size (u32 LE at bytes 24..28). A candidate whose
/// payload size exceeds 64 MiB or whose header size is below
/// `WAL_HEADER_SIZE` is treated as a false match and the scan resumes one
/// byte further on. The scan stops at the first candidate that would extend
/// past the end of `data`, or when fewer than `WAL_HEADER_SIZE` bytes remain.
///
/// No checksum is verified here; the result only describes where entries lie.
fn find_entry_boundaries(data: &[u8]) -> Vec<WalEntryInfo> {
    // Upper bound on a plausible payload size; anything larger is assumed to
    // be a coincidental magic match.
    const MAX_ENTRY_SIZE: usize = 64 * 1024 * 1024;

    let mut entries = Vec::new();
    let mut pos = 0;
    // The loop condition guarantees a complete fixed-size header is readable
    // at `pos`, so every header field access below is in bounds.
    while pos + WAL_HEADER_SIZE <= data.len() {
        let header = &data[pos..pos + WAL_HEADER_SIZE];

        let magic = u32::from_le_bytes(header[0..4].try_into().unwrap());
        if magic != WAL_ENTRY_MAGIC {
            pos += 1;
            continue;
        }

        let flags = header[5];
        let header_size = u16::from_le_bytes(header[6..8].try_into().unwrap()) as usize;
        let lsn = u64::from_le_bytes(header[8..16].try_into().unwrap());
        let entry_size = u32::from_le_bytes(header[24..28].try_into().unwrap()) as usize;

        if entry_size > MAX_ENTRY_SIZE || header_size < WAL_HEADER_SIZE {
            pos += 1;
            continue;
        }

        let total_size = header_size + entry_size + CRC_SIZE;
        if pos + total_size > data.len() {
            // Entry is cut off by the end of the buffer: nothing after it can
            // be delimited.
            break;
        }

        let data_offset = pos + header_size;
        entries.push(WalEntryInfo {
            offset: pos,
            total_size,
            _lsn: lsn,
            flags,
            entry_size,
            data_offset,
            crc_offset: data_offset + entry_size,
        });
        pos += total_size;
    }
    entries
}
/// Overwrites up to `len` bytes of `data` with zeros, starting at `offset`.
///
/// The range is clamped to the buffer: bytes past the end are ignored, and an
/// `offset` at or beyond `data.len()` is a no-op (matching how `flip_bit` and
/// `zero_page` treat out-of-range positions) instead of a slice-index panic.
fn zero_range(data: &mut [u8], offset: usize, len: usize) {
    let start = offset.min(data.len());
    // saturating_add: a huge `len` must clamp to the buffer end, not overflow.
    let end = offset.saturating_add(len).min(data.len());
    data[start..end].fill(0);
}
/// Inverts bit `bit_pos` (0 = least significant) of the byte at
/// `byte_offset`. Does nothing when `byte_offset` lies outside `data`.
fn flip_bit(data: &mut [u8], byte_offset: usize, bit_pos: u8) {
    if let Some(byte) = data.get_mut(byte_offset) {
        *byte ^= 1u8 << bit_pos;
    }
}
/// Zeroes the `page_num`-th 4096-byte page of `data`.
///
/// A final partial page is zeroed up to the end of the buffer; a page that
/// starts at or beyond the end of the buffer is left untouched.
fn zero_page(data: &mut [u8], page_num: usize) {
    const PAGE_BYTES: usize = 4096;
    let first = page_num * PAGE_BYTES;
    let past_last = (first + PAGE_BYTES).min(data.len());
    if first >= data.len() {
        return;
    }
    data[first..past_last].iter_mut().for_each(|byte| *byte = 0);
}
/// Best-effort removal of `<db_path>/db.lock`; any error (including the file
/// not existing) is deliberately ignored.
fn remove_lock_file(db_path: &Path) {
    let _ = fs::remove_file(db_path.join("db.lock"));
}
/// Shared implementation behind `setup_test_db` and
/// `setup_test_db_large_rows`.
///
/// Creates a fresh database at `<temp dir>/test.db`, creates table
/// `test_data`, and inserts `num_txns * rows_per_txn` rows with ids starting
/// at 1, one `execute` call per row. Each row's `value` is
/// `<value_prefix>txn<T>_row<R>` and its `seq` is `T`. The row count is
/// asserted, the database handle is dropped, and the lock file is removed so
/// the caller can tamper with the files and reopen the database.
fn build_populated_fixture(num_txns: usize, rows_per_txn: usize, value_prefix: &str) -> TestFixture {
    let dir = tempdir().unwrap();
    let db_path = dir.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());
    {
        // Scoped so the handle is dropped before the lock file is removed.
        let db = Database::open(&dsn).unwrap();
        db.execute(
            "CREATE TABLE test_data (id INTEGER PRIMARY KEY, value TEXT NOT NULL, seq INTEGER)",
            (),
        )
        .unwrap();
        let mut id = 1;
        for txn in 0..num_txns {
            for row in 0..rows_per_txn {
                db.execute(
                    &format!(
                        "INSERT INTO test_data (id, value, seq) VALUES ({}, '{}txn{}_row{}', {})",
                        id, value_prefix, txn, row, txn
                    ),
                    (),
                )
                .unwrap();
                id += 1;
            }
        }
        let count: i64 = db.query_one("SELECT COUNT(*) FROM test_data", ()).unwrap();
        assert_eq!(count, (num_txns * rows_per_txn) as i64);
    }
    remove_lock_file(&db_path);
    TestFixture {
        _dir: dir,
        db_path,
        dsn,
    }
}

/// Builds a test database whose rows carry short values of the form
/// `txn<T>_row<R>`.
///
/// Panics if any database operation fails or the final row count differs
/// from `num_txns * rows_per_txn`.
fn setup_test_db(num_txns: usize, rows_per_txn: usize) -> TestFixture {
    build_populated_fixture(num_txns, rows_per_txn, "")
}

/// Builds a test database whose row values are prefixed with 200 `A`
/// characters and an underscore (`AAA…A_txn<T>_row<R>`).
///
/// NOTE(review): the long values are presumably meant to make WAL entries
/// large enough to be compressed — confirm against the engine's threshold.
fn setup_test_db_large_rows(num_txns: usize, rows_per_txn: usize) -> TestFixture {
    let large_prefix = format!("{}_", "A".repeat(200));
    build_populated_fixture(num_txns, rows_per_txn, &large_prefix)
}
/// Reopens the fixture's database and checks that recovery left a usable
/// state with at least `min_rows` rows in `table`.
///
/// If the count query succeeds, the count must be `>= min_rows`, a new row
/// (id = count + 10000) must be insertable, and the count must then have
/// grown by exactly one. If the count query fails, that is accepted only
/// when `min_rows` is 0. Panics if the database cannot be opened.
fn verify_recovery_at_least(fixture: &TestFixture, table: &str, min_rows: i64) {
    let db = Database::open(&fixture.dsn).unwrap();
    let count_sql = format!("SELECT COUNT(*) FROM {}", table);

    let outcome: Result<i64, _> = db.query_one(&count_sql, ());
    let count = match outcome {
        Ok(rows) => rows,
        Err(_) => {
            assert_eq!(
                min_rows, 0,
                "Table '{}' doesn't exist after recovery, but expected at least {} rows",
                table, min_rows
            );
            return;
        }
    };

    assert!(
        count >= min_rows,
        "Expected at least {} rows, got {} after recovery",
        min_rows,
        count
    );

    // Prove the recovered database is writable, not merely readable.
    let insert_sql = format!(
        "INSERT INTO {} (id, value, seq) VALUES ({}, 'post_recovery', 999)",
        table,
        count + 10000
    );
    db.execute(&insert_sql, ()).unwrap();

    let new_count: i64 = db.query_one(&count_sql, ()).unwrap();
    assert_eq!(new_count, count + 1);
}
/// Reopens the fixture's database and checks that `table` holds exactly
/// `exact_rows` rows, then inserts one more row (id = `exact_rows` + 10000)
/// to confirm the database is still writable.
///
/// Panics if the database cannot be opened, the count query fails, the count
/// differs, or the insert fails.
fn verify_recovery_exact(fixture: &TestFixture, table: &str, exact_rows: i64) {
    let db = Database::open(&fixture.dsn).unwrap();

    let count_sql = format!("SELECT COUNT(*) FROM {}", table);
    let count: i64 = db.query_one(&count_sql, ()).unwrap();
    assert_eq!(
        count, exact_rows,
        "Expected exactly {} rows, got {} after recovery",
        exact_rows, count
    );

    let insert_sql = format!(
        "INSERT INTO {} (id, value, seq) VALUES ({}, 'post_recovery', 999)",
        table,
        exact_rows + 10000
    );
    db.execute(&insert_sql, ()).unwrap();
}
/// Simulates a torn write: the newest WAL file is cut so that only the first
/// `cut_into_last` bytes of its final entry remain, then recovery is checked
/// with `verify_recovery_at_least(.., 0)`.
fn torn_write_test(cut_into_last: usize) {
    let fixture = setup_test_db(5, 3);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty(), "No WAL files found");
    let wal_path = wal_files.last().unwrap();

    let wal_bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&wal_bytes);
    assert!(
        entries.len() >= 2,
        "Need at least 2 entries, found {}",
        entries.len()
    );

    let final_entry = entries.last().unwrap();
    let keep = (final_entry.offset + cut_into_last).min(wal_bytes.len());
    fs::write(wal_path, &wal_bytes[..keep]).unwrap();

    remove_lock_file(&fixture.db_path);
    verify_recovery_at_least(&fixture, "test_data", 0);
}
/// Torn write that leaves a single byte of the final entry's header.
#[test]
fn test_torn_write_partial_header_1_byte() {
    let bytes_kept = 1;
    torn_write_test(bytes_kept);
}
/// Torn write that leaves the first 16 bytes of the final entry's header.
#[test]
fn test_torn_write_partial_header_16_bytes() {
    let bytes_kept = 16;
    torn_write_test(bytes_kept);
}
/// Torn write that leaves 31 bytes of the final entry's header.
#[test]
fn test_torn_write_partial_header_31_bytes() {
    let bytes_kept = 31;
    torn_write_test(bytes_kept);
}
/// Torn write that leaves exactly `WAL_HEADER_SIZE` bytes of the final entry.
#[test]
fn test_torn_write_after_header_no_data() {
    let bytes_kept = WAL_HEADER_SIZE;
    torn_write_test(bytes_kept);
}
/// Cuts the newest WAL file halfway through the final entry's payload and
/// checks recovery with a minimum of 0 rows.
#[test]
fn test_torn_write_partial_data() {
    let fixture = setup_test_db(5, 3);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();

    let wal_bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&wal_bytes);
    assert!(entries.len() >= 2);

    let final_entry = entries.last().unwrap();
    let cut = final_entry.data_offset + final_entry.entry_size / 2;
    let keep = cut.min(wal_bytes.len());
    fs::write(wal_path, &wal_bytes[..keep]).unwrap();

    remove_lock_file(&fixture.db_path);
    verify_recovery_at_least(&fixture, "test_data", 0);
}
/// Cuts the newest WAL file right where the final entry's CRC would start
/// (header and payload kept, no CRC bytes) and checks recovery.
#[test]
fn test_torn_write_after_data_no_crc() {
    let fixture = setup_test_db(5, 3);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();

    let wal_bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&wal_bytes);
    assert!(entries.len() >= 2);

    let cut = entries.last().unwrap().crc_offset;
    let keep = cut.min(wal_bytes.len());
    fs::write(wal_path, &wal_bytes[..keep]).unwrap();

    remove_lock_file(&fixture.db_path);
    verify_recovery_at_least(&fixture, "test_data", 0);
}
/// Cuts the newest WAL file after the first byte of the final entry's CRC
/// and checks recovery.
#[test]
fn test_torn_write_partial_crc_1_byte() {
    let fixture = setup_test_db(5, 3);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();

    let wal_bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&wal_bytes);
    assert!(entries.len() >= 2);

    let cut = entries.last().unwrap().crc_offset + 1;
    let keep = cut.min(wal_bytes.len());
    fs::write(wal_path, &wal_bytes[..keep]).unwrap();

    remove_lock_file(&fixture.db_path);
    verify_recovery_at_least(&fixture, "test_data", 0);
}
/// Cuts the newest WAL file after the first three bytes of the final entry's
/// CRC and checks recovery.
#[test]
fn test_torn_write_partial_crc_3_bytes() {
    let fixture = setup_test_db(5, 3);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();

    let wal_bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&wal_bytes);
    assert!(entries.len() >= 2);

    let cut = entries.last().unwrap().crc_offset + 3;
    let keep = cut.min(wal_bytes.len());
    fs::write(wal_path, &wal_bytes[..keep]).unwrap();

    remove_lock_file(&fixture.db_path);
    verify_recovery_at_least(&fixture, "test_data", 0);
}
/// Zeroes the first 4096-byte page of the newest WAL file (only when the
/// file is larger than one page), then requires that the database opens.
/// If the row count can still be queried, an insert must also succeed.
#[test]
fn test_sector_loss_first_page() {
    let fixture = setup_test_db(20, 5);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();

    let mut wal_bytes = fs::read(wal_path).unwrap();
    if wal_bytes.len() > 4096 {
        zero_page(&mut wal_bytes, 0);
        fs::write(wal_path, &wal_bytes).unwrap();
    }
    remove_lock_file(&fixture.db_path);

    let db = Database::open(&fixture.dsn).unwrap();
    let outcome: Result<i64, _> = db.query_one("SELECT COUNT(*) FROM test_data", ());
    if let Ok(count) = outcome {
        let insert_sql = format!(
            "INSERT INTO test_data (id, value, seq) VALUES ({}, 'post_recovery', 999)",
            count + 10000
        );
        db.execute(&insert_sql, ()).unwrap();
    }
}
/// Zeroes the middle 4096-byte page of the newest WAL file (only when it has
/// at least three whole pages) and checks recovery.
#[test]
fn test_sector_loss_middle_page() {
    let fixture = setup_test_db(20, 5);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();

    let mut wal_bytes = fs::read(wal_path).unwrap();
    let whole_pages = wal_bytes.len() / 4096;
    if whole_pages >= 3 {
        zero_page(&mut wal_bytes, whole_pages / 2);
        fs::write(wal_path, &wal_bytes).unwrap();
    }

    remove_lock_file(&fixture.db_path);
    verify_recovery_at_least(&fixture, "test_data", 0);
}
/// Zeroes the last whole 4096-byte page of the newest WAL file (only when it
/// has at least two whole pages) and checks recovery.
#[test]
fn test_sector_loss_last_page() {
    let fixture = setup_test_db(20, 5);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();

    let mut wal_bytes = fs::read(wal_path).unwrap();
    let whole_pages = wal_bytes.len() / 4096;
    if whole_pages >= 2 {
        zero_page(&mut wal_bytes, whole_pages - 1);
        fs::write(wal_path, &wal_bytes).unwrap();
    }

    remove_lock_file(&fixture.db_path);
    verify_recovery_at_least(&fixture, "test_data", 0);
}
/// Zeroes every odd-numbered whole 4096-byte page (1, 3, 5, …) of the newest
/// WAL file, rewrites the file, and checks recovery.
#[test]
fn test_sector_loss_alternating_pages() {
    let fixture = setup_test_db(20, 5);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();

    let mut wal_bytes = fs::read(wal_path).unwrap();
    let whole_pages = wal_bytes.len() / 4096;
    for odd_page in (0..whole_pages).filter(|page| page % 2 == 1) {
        zero_page(&mut wal_bytes, odd_page);
    }
    fs::write(wal_path, &wal_bytes).unwrap();

    remove_lock_file(&fixture.db_path);
    verify_recovery_at_least(&fixture, "test_data", 0);
}
/// Flips bit 0 of the first magic byte of the middle WAL entry (when at
/// least three entries are found) and checks recovery.
#[test]
fn test_bit_flip_magic_bytes() {
    let fixture = setup_test_db(5, 3);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();

    let mut wal_bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&wal_bytes);
    let entry_count = entries.len();
    if entry_count >= 3 {
        let victim = &entries[entry_count / 2];
        flip_bit(&mut wal_bytes, victim.offset, 0);
        fs::write(wal_path, &wal_bytes).unwrap();
    }

    remove_lock_file(&fixture.db_path);
    verify_recovery_at_least(&fixture, "test_data", 0);
}
/// Flips bit 3 of the first CRC byte of the middle WAL entry (when at least
/// three entries are found) and checks recovery.
#[test]
fn test_bit_flip_crc() {
    let fixture = setup_test_db(5, 3);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();

    let mut wal_bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&wal_bytes);
    let entry_count = entries.len();
    if entry_count >= 3 {
        let victim = &entries[entry_count / 2];
        flip_bit(&mut wal_bytes, victim.crc_offset, 3);
        fs::write(wal_path, &wal_bytes).unwrap();
    }

    remove_lock_file(&fixture.db_path);
    verify_recovery_at_least(&fixture, "test_data", 0);
}
/// Flips bit 5 of the byte in the middle of the middle WAL entry's payload
/// (when at least three entries are found) and checks recovery.
#[test]
fn test_bit_flip_data_portion() {
    let fixture = setup_test_db(5, 3);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();

    let mut wal_bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&wal_bytes);
    let entry_count = entries.len();
    if entry_count >= 3 {
        let victim = &entries[entry_count / 2];
        let payload_middle = victim.data_offset + victim.entry_size / 2;
        flip_bit(&mut wal_bytes, payload_middle, 5);
        fs::write(wal_path, &wal_bytes).unwrap();
    }

    remove_lock_file(&fixture.db_path);
    verify_recovery_at_least(&fixture, "test_data", 0);
}
/// Flips bit 7 of header byte 24 (the low byte of the payload-size field) in
/// the middle WAL entry (when at least three entries are found) and checks
/// recovery.
#[test]
fn test_bit_flip_entry_size() {
    let fixture = setup_test_db(5, 3);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();

    let mut wal_bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&wal_bytes);
    let entry_count = entries.len();
    if entry_count >= 3 {
        let size_field = entries[entry_count / 2].offset + 24;
        flip_bit(&mut wal_bytes, size_field, 7);
        fs::write(wal_path, &wal_bytes).unwrap();
    }

    remove_lock_file(&fixture.db_path);
    verify_recovery_at_least(&fixture, "test_data", 0);
}
/// Sets `COMPRESSED_FLAG` in the flags byte of the middle WAL entry (when at
/// least three entries are found) and checks recovery.
#[test]
fn test_bit_flip_flags_compressed() {
    let fixture = setup_test_db(5, 3);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();

    let mut wal_bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&wal_bytes);
    let entry_count = entries.len();
    if entry_count >= 3 {
        let flags_at = entries[entry_count / 2].offset + 5;
        wal_bytes[flags_at] |= COMPRESSED_FLAG;
        fs::write(wal_path, &wal_bytes).unwrap();
    }

    remove_lock_file(&fixture.db_path);
    verify_recovery_at_least(&fixture, "test_data", 0);
}
/// Zeroes the first four bytes of `checkpoint.meta` (if the file exists and
/// is at least four bytes long) and expects all 15 rows to be recovered.
#[test]
fn test_checkpoint_corrupt_magic() {
    let fixture = setup_test_db(5, 3);

    if let Some(checkpoint) = find_checkpoint_file(&fixture.db_path) {
        let mut meta = fs::read(&checkpoint).unwrap();
        if meta.len() >= 4 {
            zero_range(&mut meta, 0, 4);
            fs::write(&checkpoint, &meta).unwrap();
        }
    }

    remove_lock_file(&fixture.db_path);
    verify_recovery_exact(&fixture, "test_data", 15);
}
/// Flips bit 0 of the fourth-from-last byte of `checkpoint.meta` (if the
/// file exists and is at least four bytes long) and expects all 15 rows to
/// be recovered.
///
/// NOTE(review): assumes the checkpoint's CRC occupies the last four bytes —
/// confirm against the checkpoint format.
#[test]
fn test_checkpoint_corrupt_crc() {
    let fixture = setup_test_db(5, 3);

    if let Some(checkpoint) = find_checkpoint_file(&fixture.db_path) {
        let mut meta = fs::read(&checkpoint).unwrap();
        if meta.len() >= 4 {
            let trailer_start = meta.len() - 4;
            flip_bit(&mut meta, trailer_start, 0);
            fs::write(&checkpoint, &meta).unwrap();
        }
    }

    remove_lock_file(&fixture.db_path);
    verify_recovery_exact(&fixture, "test_data", 15);
}
/// Truncates `checkpoint.meta` to its first 10 bytes (if the file exists and
/// is longer than that) and expects all 15 rows to be recovered.
#[test]
fn test_checkpoint_truncated() {
    let fixture = setup_test_db(5, 3);

    if let Some(checkpoint) = find_checkpoint_file(&fixture.db_path) {
        let meta = fs::read(&checkpoint).unwrap();
        if meta.len() > 10 {
            let head = &meta[..10];
            fs::write(&checkpoint, head).unwrap();
        }
    }

    remove_lock_file(&fixture.db_path);
    verify_recovery_exact(&fixture, "test_data", 15);
}
/// Replaces `checkpoint.meta` (if present) with the same number of zero
/// bytes and expects all 15 rows to be recovered.
#[test]
fn test_checkpoint_zeroed() {
    let fixture = setup_test_db(5, 3);

    if let Some(checkpoint) = find_checkpoint_file(&fixture.db_path) {
        let original_len = fs::read(&checkpoint).unwrap().len();
        let blank = vec![0u8; original_len];
        fs::write(&checkpoint, &blank).unwrap();
    }

    remove_lock_file(&fixture.db_path);
    verify_recovery_exact(&fixture, "test_data", 15);
}
/// Deletes `checkpoint.meta` (if present) and expects all 15 rows to be
/// recovered.
#[test]
fn test_checkpoint_deleted() {
    let fixture = setup_test_db(5, 3);

    match find_checkpoint_file(&fixture.db_path) {
        Some(checkpoint) => fs::remove_file(&checkpoint).unwrap(),
        None => {}
    }

    remove_lock_file(&fixture.db_path);
    verify_recovery_exact(&fixture, "test_data", 15);
}
/// Returns the indices of entries whose flags have neither
/// `COMMIT_MARKER_FLAG` nor bit `0x04` set.
///
/// NOTE(review): the meaning of flag bit `0x04` is not defined in this file —
/// confirm against the WAL format.
fn find_dml_data_entries(entries: &[WalEntryInfo]) -> Vec<usize> {
    let excluded_bits = COMMIT_MARKER_FLAG | 0x04;
    let mut indices = Vec::new();
    for (index, entry) in entries.iter().enumerate() {
        if entry.flags & excluded_bits == 0 {
            indices.push(index);
        }
    }
    indices
}
/// Returns the indices of entries whose flags have `COMMIT_MARKER_FLAG` set.
fn find_commit_entries(entries: &[WalEntryInfo]) -> Vec<usize> {
    let mut commit_indices = Vec::new();
    for (index, entry) in entries.iter().enumerate() {
        if entry.flags & COMMIT_MARKER_FLAG != 0 {
            commit_indices.push(index);
        }
    }
    commit_indices
}
/// Zeroes the magic bytes of the first entry reported by
/// `find_dml_data_entries` (if any) and checks recovery.
#[test]
fn test_first_data_entry_corrupt() {
    let fixture = setup_test_db(5, 3);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();

    let mut wal_bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&wal_bytes);
    let dml_indices = find_dml_data_entries(&entries);
    if let Some(&first_dml) = dml_indices.first() {
        let victim = &entries[first_dml];
        zero_range(&mut wal_bytes, victim.offset, 4);
        fs::write(wal_path, &wal_bytes).unwrap();
    }

    remove_lock_file(&fixture.db_path);
    verify_recovery_at_least(&fixture, "test_data", 0);
}
/// Drops the final WAL entry entirely by truncating the newest WAL file at
/// that entry's start offset, then checks recovery.
#[test]
fn test_last_entry_corrupt() {
    let fixture = setup_test_db(5, 3);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();

    let wal_bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&wal_bytes);
    assert!(!entries.is_empty());

    let final_start = entries.last().unwrap().offset;
    fs::write(wal_path, &wal_bytes[..final_start]).unwrap();

    remove_lock_file(&fixture.db_path);
    verify_recovery_at_least(&fixture, "test_data", 0);
}
/// Zeroes the magic bytes of the middle WAL entry (when at least three
/// entries are found) and checks recovery.
#[test]
fn test_middle_entry_corrupt() {
    let fixture = setup_test_db(5, 3);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();

    let mut wal_bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&wal_bytes);
    let entry_count = entries.len();
    if entry_count >= 3 {
        let victim = &entries[entry_count / 2];
        zero_range(&mut wal_bytes, victim.offset, 4);
        fs::write(wal_path, &wal_bytes).unwrap();
    }

    remove_lock_file(&fixture.db_path);
    verify_recovery_at_least(&fixture, "test_data", 0);
}
/// Zeroes the whole of the last commit-marker entry (header, payload and
/// CRC), if one is found, and checks recovery.
#[test]
fn test_commit_marker_destroyed() {
    let fixture = setup_test_db(5, 3);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();

    let mut wal_bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&wal_bytes);
    let commit_indices = find_commit_entries(&entries);
    if let Some(&last_commit) = commit_indices.last() {
        let victim = &entries[last_commit];
        zero_range(&mut wal_bytes, victim.offset, victim.total_size);
        fs::write(wal_path, &wal_bytes).unwrap();
    }

    remove_lock_file(&fixture.db_path);
    verify_recovery_at_least(&fixture, "test_data", 0);
}
/// Zeroes the magic bytes of the last three entries reported by
/// `find_dml_data_entries` (when there are at least three), leaving commit
/// markers untouched, and checks recovery.
#[test]
fn test_data_entries_destroyed_commit_intact() {
    let fixture = setup_test_db(5, 3);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();

    let mut wal_bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&wal_bytes);
    let dml_indices = find_dml_data_entries(&entries);
    if dml_indices.len() >= 3 {
        let tail_start = dml_indices.len() - 3;
        for &entry_index in dml_indices.iter().skip(tail_start) {
            zero_range(&mut wal_bytes, entries[entry_index].offset, 4);
        }
        fs::write(wal_path, &wal_bytes).unwrap();
    }

    remove_lock_file(&fixture.db_path);
    verify_recovery_at_least(&fixture, "test_data", 0);
}
/// Renames the first WAL file to `<name>.log.bak` so only the backup
/// remains, then requires that reopening the database yields at least one
/// row and that a further insert succeeds.
#[test]
fn test_truncation_crash_bak_exists() {
    let fixture = setup_test_db(5, 3);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());

    let original_wal = &wal_files[0];
    let backup = original_wal.with_extension("log.bak");
    fs::rename(original_wal, &backup).unwrap();

    let still_listed = find_wal_files(&fixture.db_path).contains(original_wal);
    assert!(!still_listed, "Original WAL file should be gone");

    remove_lock_file(&fixture.db_path);
    let db = Database::open(&fixture.dsn).unwrap();
    let count: i64 = db.query_one("SELECT COUNT(*) FROM test_data", ()).unwrap();
    assert!(
        count > 0,
        "Should recover data from .bak file, got {} rows",
        count
    );

    let insert_sql = format!(
        "INSERT INTO test_data (id, value, seq) VALUES ({}, 'post_recovery', 999)",
        count + 10000
    );
    db.execute(&insert_sql, ()).unwrap();
}
/// Leaves both a stray `wal-temp-*.log` file and a `.log.bak` copy of the
/// first WAL file in the WAL directory, then requires that reopening yields
/// exactly 15 rows and that the temp file has been removed.
#[test]
fn test_truncation_crash_temp_and_bak() {
    let fixture = setup_test_db(5, 3);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());

    let temp_path = fixture
        .db_path
        .join("wal")
        .join("wal-temp-20250101120000.log");
    fs::write(&temp_path, b"temporary data").unwrap();

    let first_wal = &wal_files[0];
    fs::copy(first_wal, first_wal.with_extension("log.bak")).unwrap();

    remove_lock_file(&fixture.db_path);
    let db = Database::open(&fixture.dsn).unwrap();
    let count: i64 = db.query_one("SELECT COUNT(*) FROM test_data", ()).unwrap();
    assert_eq!(count, 15);
    assert!(!temp_path.exists(), "Temp file should have been cleaned up");
}
/// Leaves a stray `wal-temp-*.log` file in the WAL directory, then requires
/// exactly 15 recovered rows and that the temp file has been removed.
#[test]
fn test_truncation_crash_temp_only() {
    let fixture = setup_test_db(5, 3);

    let temp_path = fixture
        .db_path
        .join("wal")
        .join("wal-temp-20250101120000.log");
    fs::write(&temp_path, b"temporary data").unwrap();

    remove_lock_file(&fixture.db_path);
    verify_recovery_exact(&fixture, "test_data", 15);
    assert!(!temp_path.exists(), "Temp file should have been cleaned up");
}
/// Cuts the middle entry out of the newest WAL file (when at least five
/// entries are found), splicing the bytes before and after it together, and
/// checks recovery.
#[test]
fn test_lsn_gap_in_sequence() {
    let fixture = setup_test_db(5, 3);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();

    let wal_bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&wal_bytes);
    let entry_count = entries.len();
    if entry_count >= 5 {
        let removed = &entries[entry_count / 2];
        let removed_end = removed.offset + removed.total_size;

        let mut spliced = wal_bytes[..removed.offset].to_vec();
        spliced.extend_from_slice(&wal_bytes[removed_end..]);
        fs::write(wal_path, &spliced).unwrap();
    }

    remove_lock_file(&fixture.db_path);
    verify_recovery_at_least(&fixture, "test_data", 0);
}
/// Appends a byte-for-byte copy of the middle entry to the end of the newest
/// WAL file (when at least three entries are found), then requires that the
/// database opens and the row count can be queried.
#[test]
fn test_duplicate_entry_in_wal() {
    let fixture = setup_test_db(5, 3);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();

    let mut wal_bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&wal_bytes);
    let entry_count = entries.len();
    if entry_count >= 3 {
        let source = &entries[entry_count / 2];
        let duplicate = wal_bytes[source.offset..source.offset + source.total_size].to_vec();
        wal_bytes.extend_from_slice(&duplicate);
        fs::write(wal_path, &wal_bytes).unwrap();
    }

    remove_lock_file(&fixture.db_path);
    let db = Database::open(&fixture.dsn).unwrap();
    let count: i64 = db.query_one("SELECT COUNT(*) FROM test_data", ()).unwrap();
    assert!(count >= 0, "Recovery should produce a consistent state");
}
/// Removes `checkpoint.meta` (if present) and expects all 15 rows to be
/// recovered.
#[test]
fn test_recovery_without_checkpoint() {
    let fixture = setup_test_db(5, 3);

    match find_checkpoint_file(&fixture.db_path) {
        Some(checkpoint) => fs::remove_file(&checkpoint).unwrap(),
        None => {}
    }

    remove_lock_file(&fixture.db_path);
    verify_recovery_exact(&fixture, "test_data", 15);
}
/// Corrupts two payload bytes of a compressed entry and rewrites the entry's
/// CRC field with a CRC-32 of the corrupted payload, then checks recovery.
///
/// The target is the middle one of the entries whose flags carry
/// `COMPRESSED_FLAG`; nothing is modified when no such entry is found.
/// NOTE(review): the recomputed CRC covers only the payload bytes — confirm
/// that this matches the range the engine checksums.
#[test]
fn test_corrupt_compressed_payload() {
    let fixture = setup_test_db_large_rows(5, 3);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();

    let mut wal_bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&wal_bytes);
    let mut compressed_indices = Vec::new();
    for (index, entry) in entries.iter().enumerate() {
        if entry.flags & COMPRESSED_FLAG != 0 {
            compressed_indices.push(index);
        }
    }

    if !compressed_indices.is_empty() {
        let victim = &entries[compressed_indices[compressed_indices.len() / 2]];
        let payload_middle = victim.data_offset + victim.entry_size / 2;
        if payload_middle < wal_bytes.len() {
            wal_bytes[payload_middle] ^= 0xFF;
            wal_bytes[payload_middle.saturating_sub(1)] ^= 0xAA;

            let payload = &wal_bytes[victim.data_offset..victim.crc_offset];
            let fresh_crc = crc32fast::hash(payload);
            wal_bytes[victim.crc_offset..victim.crc_offset + 4]
                .copy_from_slice(&fresh_crc.to_le_bytes());
            fs::write(wal_path, &wal_bytes).unwrap();
        }
    }

    remove_lock_file(&fixture.db_path);
    verify_recovery_at_least(&fixture, "test_data", 0);
}
/// Sets `COMPRESSED_FLAG` on an entry that has neither the compressed nor
/// the commit-marker flag, then checks recovery.
///
/// The target is the middle such entry; nothing is modified unless at least
/// two candidates are found.
#[test]
fn test_force_compressed_flag_on_uncompressed() {
    let fixture = setup_test_db(5, 3);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();

    let mut wal_bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&wal_bytes);
    let mut plain_indices = Vec::new();
    for (index, entry) in entries.iter().enumerate() {
        let is_compressed = entry.flags & COMPRESSED_FLAG != 0;
        let is_commit = entry.flags & COMMIT_MARKER_FLAG != 0;
        if !is_compressed && !is_commit {
            plain_indices.push(index);
        }
    }

    if plain_indices.len() >= 2 {
        let victim = &entries[plain_indices[plain_indices.len() / 2]];
        wal_bytes[victim.offset + 5] |= COMPRESSED_FLAG;
        fs::write(wal_path, &wal_bytes).unwrap();
    }

    remove_lock_file(&fixture.db_path);
    verify_recovery_at_least(&fixture, "test_data", 0);
}
/// Clears `COMPRESSED_FLAG` on the middle one of the entries that carry it
/// (if any), then checks recovery.
#[test]
fn test_clear_compressed_flag_on_compressed() {
    let fixture = setup_test_db_large_rows(5, 3);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();

    let mut wal_bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&wal_bytes);
    let mut compressed_indices = Vec::new();
    for (index, entry) in entries.iter().enumerate() {
        if entry.flags & COMPRESSED_FLAG != 0 {
            compressed_indices.push(index);
        }
    }

    if !compressed_indices.is_empty() {
        let victim = &entries[compressed_indices[compressed_indices.len() / 2]];
        wal_bytes[victim.offset + 5] &= !COMPRESSED_FLAG;
        fs::write(wal_path, &wal_bytes).unwrap();
    }

    remove_lock_file(&fixture.db_path);
    verify_recovery_at_least(&fixture, "test_data", 0);
}
/// Combines two faults in the newest WAL file: a bit flip in the payload of
/// the middle entry (when at least five entries are found) and truncation
/// halfway through the final entry's header (when at least two entries are
/// found). The file is always rewritten; recovery is then checked.
#[test]
fn test_combined_torn_write_plus_bit_flip() {
    let fixture = setup_test_db(10, 3);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();

    let mut wal_bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&wal_bytes);
    let entry_count = entries.len();

    if entry_count >= 5 {
        let flipped_at = entries[entry_count / 2].data_offset + 5;
        flip_bit(&mut wal_bytes, flipped_at, 2);
    }
    if entry_count >= 2 {
        let final_entry = entries.last().unwrap();
        wal_bytes.truncate(final_entry.offset + WAL_HEADER_SIZE / 2);
    }
    fs::write(wal_path, &wal_bytes).unwrap();

    remove_lock_file(&fixture.db_path);
    verify_recovery_at_least(&fixture, "test_data", 0);
}
/// With a larger fixture (20 x 3 rows), zeroes the whole of the last
/// commit-marker entry (if one is found) and checks recovery.
#[test]
fn test_many_transactions_last_commit_corrupt() {
    let fixture = setup_test_db(20, 3);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();

    let mut wal_bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&wal_bytes);
    let commit_indices = find_commit_entries(&entries);
    if let Some(&last_commit) = commit_indices.last() {
        let victim = &entries[last_commit];
        zero_range(&mut wal_bytes, victim.offset, victim.total_size);
        fs::write(wal_path, &wal_bytes).unwrap();
    }

    remove_lock_file(&fixture.db_path);
    verify_recovery_at_least(&fixture, "test_data", 0);
}
/// Exercises recover -> write -> recover again.
///
/// 1. Creates a database (opened with `checkpoint_on_close=off`) holding 15
///    rows.
/// 2. Truncates the newest WAL file 10 bytes into its final entry (when at
///    least two entries are found).
/// 3. Reopens, records the recovered row count, inserts five more rows and
///    checks the count grew by five.
/// 4. Reopens once more and requires the count to be at least the count
///    recorded in step 3, and a final insert to succeed.
#[test]
fn test_recovery_then_new_data_then_recovery() {
    let dir = tempdir().unwrap();
    let db_path = dir.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());

    // Phase 1: populate.
    {
        let db = Database::open(&dsn).unwrap();
        db.execute(
            "CREATE TABLE test_data (id INTEGER PRIMARY KEY, value TEXT NOT NULL, seq INTEGER)",
            (),
        )
        .unwrap();
        for txn in 0..5 {
            for row in 0..3 {
                // Ids run 1..=15 in insertion order.
                let id = txn * 3 + row + 1;
                let insert_sql = format!(
                    "INSERT INTO test_data (id, value, seq) VALUES ({}, 'txn{}_row{}', {})",
                    id, txn, row, txn
                );
                db.execute(&insert_sql, ()).unwrap();
            }
        }
        let count: i64 = db.query_one("SELECT COUNT(*) FROM test_data", ()).unwrap();
        assert_eq!(count, 15);
    }
    remove_lock_file(&db_path);

    // Phase 2: tear the final WAL entry.
    let wal_files = find_wal_files(&db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();
    let wal_bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&wal_bytes);
    if entries.len() >= 2 {
        let keep = entries.last().unwrap().offset + 10;
        fs::write(wal_path, &wal_bytes[..keep]).unwrap();
    }
    remove_lock_file(&db_path);

    // Phase 3: first recovery, then add new rows.
    let initial_count: i64 = {
        let db = Database::open(&dsn).unwrap();
        let recovered: i64 = db.query_one("SELECT COUNT(*) FROM test_data", ()).unwrap();
        for i in 0..5 {
            let insert_sql = format!(
                "INSERT INTO test_data (id, value, seq) VALUES ({}, 'new_data_{}', 100)",
                10000 + i,
                i
            );
            db.execute(&insert_sql, ()).unwrap();
        }
        let after_insert: i64 = db.query_one("SELECT COUNT(*) FROM test_data", ()).unwrap();
        assert_eq!(after_insert, recovered + 5);
        recovered
    };
    remove_lock_file(&db_path);

    // Phase 4: second recovery.
    {
        let db = Database::open(&dsn).unwrap();
        let final_count: i64 = db.query_one("SELECT COUNT(*) FROM test_data", ()).unwrap();
        assert!(
            final_count >= initial_count,
            "Post-recovery count ({}) should be at least initial count ({})",
            final_count,
            initial_count
        );
        db.execute(
            "INSERT INTO test_data (id, value, seq) VALUES (99999, 'final_test', 200)",
            (),
        )
        .unwrap();
    }
}
/// Inserts 10 rows, issues `PRAGMA CHECKPOINT`, inserts 10 more, then
/// overwrites every WAL file with zeros of the same length.
///
/// The only hard requirement is that the database opens afterwards; the
/// count query may fail, and if it succeeds the count only has to be
/// non-negative.
#[test]
fn test_complete_wal_destruction_with_checkpoint() {
    let dir = tempdir().unwrap();
    let db_path = dir.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());

    {
        let db = Database::open(&dsn).unwrap();
        db.execute(
            "CREATE TABLE test_data (id INTEGER PRIMARY KEY, value TEXT NOT NULL, seq INTEGER)",
            (),
        )
        .unwrap();

        for i in 1..=10 {
            let insert_sql = format!(
                "INSERT INTO test_data (id, value, seq) VALUES ({}, 'checkpoint_data_{}', 1)",
                i, i
            );
            db.execute(&insert_sql, ()).unwrap();
        }

        db.execute("PRAGMA CHECKPOINT", ()).unwrap();

        for i in 11..=20 {
            let insert_sql = format!(
                "INSERT INTO test_data (id, value, seq) VALUES ({}, 'post_checkpoint_{}', 2)",
                i, i
            );
            db.execute(&insert_sql, ()).unwrap();
        }
    }
    remove_lock_file(&db_path);

    for wal_path in find_wal_files(&db_path).iter() {
        let original_len = fs::read(wal_path).unwrap().len();
        let blank = vec![0u8; original_len];
        fs::write(wal_path, &blank).unwrap();
    }

    let db = Database::open(&dsn).unwrap();
    let outcome: Result<i64, _> = db.query_one("SELECT COUNT(*) FROM test_data", ());
    // A failing query is tolerated here; only a successful one is checked.
    if let Ok(count) = outcome {
        assert!(count >= 0, "Should have consistent state after recovery");
    }
}
/// Replaces the newest WAL file with an empty file and requires only that
/// the database opens; the result of the count query is ignored.
#[test]
fn test_empty_wal_file() {
    let fixture = setup_test_db(5, 3);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());

    let wal_path = wal_files.last().unwrap();
    fs::write(wal_path, b"").unwrap();
    remove_lock_file(&fixture.db_path);

    let db = Database::open(&fixture.dsn).unwrap();
    let _outcome: Result<i64, _> = db.query_one("SELECT COUNT(*) FROM test_data", ());
}
/// Replaces the newest WAL file with 32 bytes of `0xDE` and requires only
/// that the database opens; the result of the count query is ignored.
#[test]
fn test_wal_file_with_only_header_garbage() {
    let fixture = setup_test_db(5, 3);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());

    let wal_path = wal_files.last().unwrap();
    let junk = [0xDEu8; 32];
    fs::write(wal_path, &junk).unwrap();
    remove_lock_file(&fixture.db_path);

    let db = Database::open(&fixture.dsn).unwrap();
    let _outcome: Result<i64, _> = db.query_one("SELECT COUNT(*) FROM test_data", ());
}
/// Replaces the final WAL entry with just the four magic bytes (when at
/// least two entries are found), so the file ends with a bare magic and no
/// header, then checks recovery.
#[test]
fn test_wal_magic_at_very_end_of_file() {
    let fixture = setup_test_db(5, 3);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();

    let mut wal_bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&wal_bytes);
    assert!(!entries.is_empty());

    let entry_count = entries.len();
    if entry_count >= 2 {
        let penultimate = &entries[entry_count - 2];
        wal_bytes.truncate(penultimate.offset + penultimate.total_size);
        wal_bytes.extend_from_slice(&WAL_ENTRY_MAGIC.to_le_bytes());
        fs::write(wal_path, &wal_bytes).unwrap();
    }

    remove_lock_file(&fixture.db_path);
    verify_recovery_at_least(&fixture, "test_data", 0);
}
/// Corrupts 100 bytes of the first WAL file and checks recovery.
///
/// With two or more WAL files the first 100 bytes are zeroed. With a single
/// WAL file, 100 bytes starting at its midpoint are zeroed, and only when the
/// file is larger than 4096 bytes.
#[test]
fn test_multiple_wal_files_one_corrupt() {
    let fixture = setup_test_db(20, 5);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());

    let first_wal = &wal_files[0];
    let mut wal_bytes = fs::read(first_wal).unwrap();
    if wal_files.len() >= 2 {
        zero_range(&mut wal_bytes, 0, 100);
        fs::write(first_wal, &wal_bytes).unwrap();
    } else if wal_bytes.len() > 4096 {
        let midpoint = wal_bytes.len() / 2;
        zero_range(&mut wal_bytes, midpoint, 100);
        fs::write(first_wal, &wal_bytes).unwrap();
    }

    remove_lock_file(&fixture.db_path);
    verify_recovery_at_least(&fixture, "test_data", 0);
}
/// Zeroes every commit-marker entry in the newest WAL file and rewrites it.
///
/// The only hard requirement is that the database opens afterwards; the
/// count query may fail, and if it succeeds the count only has to be
/// non-negative.
#[test]
fn test_all_commit_markers_destroyed() {
    let fixture = setup_test_db(5, 3);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();

    let mut wal_bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&wal_bytes);
    for commit_index in find_commit_entries(&entries) {
        let marker = &entries[commit_index];
        zero_range(&mut wal_bytes, marker.offset, marker.total_size);
    }
    fs::write(wal_path, &wal_bytes).unwrap();
    remove_lock_file(&fixture.db_path);

    let db = Database::open(&fixture.dsn).unwrap();
    let outcome: Result<i64, _> = db.query_one("SELECT COUNT(*) FROM test_data", ());
    // A failing query is tolerated here; only a successful one is checked.
    if let Ok(count) = outcome {
        assert!(count >= 0, "Count should be non-negative");
    }
}
/// Flips a bit in the CRC of the middle WAL entry (when at least three entries
/// exist), then opens the database twice in a row.
///
/// Both opens and both count queries must succeed, and the second open must
/// report at least as many rows as the first.
#[test]
fn test_repeated_recovery_idempotent() {
    let fixture = setup_test_db(5, 3);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();
    let mut bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&bytes);
    if entries.len() >= 3 {
        let victim = &entries[entries.len() / 2];
        flip_bit(&mut bytes, victim.crc_offset, 0);
        fs::write(wal_path, &bytes).unwrap();
    }
    remove_lock_file(&fixture.db_path);
    // First recovery: the handle is dropped at the end of the block.
    let first_count = {
        let db = Database::open(&fixture.dsn).unwrap();
        let rows: i64 = db.query_one("SELECT COUNT(*) FROM test_data", ()).unwrap();
        rows
    };
    remove_lock_file(&fixture.db_path);
    // Second recovery over whatever the first one left on disk.
    {
        let db = Database::open(&fixture.dsn).unwrap();
        let second_count: i64 = db.query_one("SELECT COUNT(*) FROM test_data", ()).unwrap();
        assert!(
            second_count >= first_count,
            "Second recovery ({}) should have at least as many rows as first ({})",
            second_count,
            first_count
        );
    }
}
/// Applies three different corruptions to one WAL file, provided it holds at
/// least eight entries:
///
/// * entry 2: one bit flipped in the CRC,
/// * entry 4: the first 4 bytes (the magic) zeroed,
/// * entry 6: `COMPRESSED_FLAG` OR-ed into the byte at header offset 5.
///
/// The file is rewritten in either case and recovery is checked with a minimum
/// row count of 0.
#[test]
fn test_concurrent_corruption_patterns() {
    let fixture = setup_test_db(10, 3);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();
    let mut bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&bytes);
    if entries.len() >= 8 {
        flip_bit(&mut bytes, entries[2].crc_offset, 4);
        zero_range(&mut bytes, entries[4].offset, 4);
        // Offset 5 is where the entry scanner in this file reads the flags byte.
        let flags_pos = entries[6].offset + 5;
        bytes[flags_pos] |= COMPRESSED_FLAG;
    }
    fs::write(wal_path, &bytes).unwrap();
    remove_lock_file(&fixture.db_path);
    verify_recovery_at_least(&fixture, "test_data", 0);
}
/// Appends 1024 bytes of deterministic filler (`(i * 7 + 13) % 256`) to the
/// last WAL file and expects recovery to yield exactly 15 rows, i.e. the
/// trailing bytes must not cost any data.
#[test]
fn test_wal_with_garbage_appended() {
    let fixture = setup_test_db(5, 3);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();
    let mut bytes = fs::read(wal_path).unwrap();
    bytes.extend((0..1024).map(|i| ((i * 7 + 13) % 256) as u8));
    fs::write(wal_path, &bytes).unwrap();
    remove_lock_file(&fixture.db_path);
    verify_recovery_exact(&fixture, "test_data", 15);
}
/// Overwrites the 4-byte size field (header bytes 24..28) of the middle WAL
/// entry with zero and checks recovery with a minimum row count of 0.
///
/// Skipped silently when fewer than three entries are found.
#[test]
fn test_wal_entry_size_zero() {
    let fixture = setup_test_db(5, 3);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();
    let mut bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&bytes);
    if entries.len() >= 3 {
        let size_field = entries[entries.len() / 2].offset + 24;
        bytes[size_field..size_field + 4].copy_from_slice(&0u32.to_le_bytes());
        fs::write(wal_path, &bytes).unwrap();
    }
    remove_lock_file(&fixture.db_path);
    verify_recovery_at_least(&fixture, "test_data", 0);
}
/// Overwrites the 4-byte size field (header bytes 24..28) of the middle WAL
/// entry with `0x1000_0000` (little-endian, 256 MiB) and checks recovery with
/// a minimum row count of 0.
///
/// Skipped silently when fewer than three entries are found.
#[test]
fn test_wal_entry_size_very_large() {
    let fixture = setup_test_db(5, 3);
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();
    let mut bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&bytes);
    if entries.len() >= 3 {
        let size_field = entries[entries.len() / 2].offset + 24;
        // Little-endian bytes are 00 00 00 10.
        bytes[size_field..size_field + 4].copy_from_slice(&0x1000_0000u32.to_le_bytes());
        fs::write(wal_path, &bytes).unwrap();
    }
    remove_lock_file(&fixture.db_path);
    verify_recovery_at_least(&fixture, "test_data", 0);
}
/// Creates a database in a fresh temporary directory with three tables:
/// `users` (10 rows), `orders` (20 rows) and `products` (15 rows), 45 rows in
/// total.
///
/// The database handle is dropped before returning and the lock file is
/// removed. The returned fixture owns the temporary directory.
fn setup_multi_table_db() -> TestFixture {
    let dir = tempdir().unwrap();
    let db_path = dir.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());
    {
        let db = Database::open(&dsn).unwrap();
        let schema = [
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL, email TEXT)",
            "CREATE TABLE orders (id INTEGER PRIMARY KEY, user_id INTEGER, amount FLOAT)",
            "CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT NOT NULL, price FLOAT)",
        ];
        for ddl in schema.iter().copied() {
            db.execute(ddl, ()).unwrap();
        }
        for user_id in 1..=10 {
            let sql = format!(
                "INSERT INTO users (id, name, email) VALUES ({}, 'user{}', 'user{}@test.com')",
                user_id, user_id, user_id
            );
            db.execute(&sql, ()).unwrap();
        }
        for order_id in 1..=20 {
            // Orders are spread over user ids 1..=10.
            let sql = format!(
                "INSERT INTO orders (id, user_id, amount) VALUES ({}, {}, {})",
                order_id,
                (order_id % 10) + 1,
                order_id as f64 * 9.99
            );
            db.execute(&sql, ()).unwrap();
        }
        for product_id in 1..=15 {
            let sql = format!(
                "INSERT INTO products (id, name, price) VALUES ({}, 'product{}', {})",
                product_id,
                product_id,
                product_id as f64 * 5.50
            );
            db.execute(&sql, ()).unwrap();
        }
    }
    remove_lock_file(&db_path);
    TestFixture {
        _dir: dir,
        db_path,
        dsn,
    }
}
/// Flips a CRC bit in the middle WAL entry of the multi-table fixture (when at
/// least five entries exist) and expects at least 40 rows to survive across
/// the three tables. After recovery, one insert into each table must succeed.
#[test]
fn test_multi_table_corrupt_middle_entry() {
    let fixture = setup_multi_table_db();
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();
    let mut bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&bytes);
    if entries.len() >= 5 {
        let victim = &entries[entries.len() / 2];
        flip_bit(&mut bytes, victim.crc_offset, 0);
        fs::write(wal_path, &bytes).unwrap();
    }
    remove_lock_file(&fixture.db_path);
    let db = Database::open(&fixture.dsn).unwrap();
    let users: i64 = db.query_one("SELECT COUNT(*) FROM users", ()).unwrap();
    let orders: i64 = db.query_one("SELECT COUNT(*) FROM orders", ()).unwrap();
    let products: i64 = db.query_one("SELECT COUNT(*) FROM products", ()).unwrap();
    let total = users + orders + products;
    assert!(
        total >= 40,
        "Expected at least 40 total rows across tables, got {} (users={}, orders={}, products={})",
        total,
        users,
        orders,
        products
    );
    // Every table must still accept writes after recovery.
    let post_recovery_inserts = [
        "INSERT INTO users (id, name, email) VALUES (999, 'new_user', 'new@test.com')",
        "INSERT INTO orders (id, user_id, amount) VALUES (999, 1, 100.0)",
        "INSERT INTO products (id, name, price) VALUES (999, 'new_product', 50.0)",
    ];
    for sql in post_recovery_inserts.iter().copied() {
        db.execute(sql, ()).unwrap();
    }
}
/// Zeroes the middle commit-marker entry of the multi-table fixture's WAL
/// (when at least three commit entries exist) and expects at least 30 rows to
/// remain across the three tables.
#[test]
fn test_multi_table_one_tables_commit_destroyed() {
    let fixture = setup_multi_table_db();
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();
    let mut bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&bytes);
    let commits = find_commit_entries(&entries);
    if commits.len() >= 3 {
        let victim = &entries[commits[commits.len() / 2]];
        zero_range(&mut bytes, victim.offset, victim.total_size);
        fs::write(wal_path, &bytes).unwrap();
    }
    remove_lock_file(&fixture.db_path);
    let db = Database::open(&fixture.dsn).unwrap();
    let users: i64 = db.query_one("SELECT COUNT(*) FROM users", ()).unwrap();
    let orders: i64 = db.query_one("SELECT COUNT(*) FROM orders", ()).unwrap();
    let products: i64 = db.query_one("SELECT COUNT(*) FROM products", ()).unwrap();
    let total = users + orders + products;
    assert!(
        total >= 30,
        "Expected at least 30 total rows, got {} (users={}, orders={}, products={})",
        total,
        users,
        orders,
        products
    );
}
/// Zeroes page 0 of the multi-table fixture's last WAL file (only if the file
/// is larger than 4096 bytes), reopens the database and counts how many of the
/// three tables can still be queried.
///
/// NOTE(review): the final assertion compares a counter that starts at 0 and
/// is only incremented against `>= 0`, so it cannot fail. In effect the test
/// verifies only that opening and querying do not panic.
#[test]
fn test_multi_table_first_page_zeroed() {
    let fixture = setup_multi_table_db();
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();
    let mut bytes = fs::read(wal_path).unwrap();
    if bytes.len() > 4096 {
        zero_page(&mut bytes, 0);
        fs::write(wal_path, &bytes).unwrap();
    }
    remove_lock_file(&fixture.db_path);
    let db = Database::open(&fixture.dsn).unwrap();
    let surviving_tables: i32 = ["users", "orders", "products"]
        .iter()
        .map(|table| {
            let outcome: Result<i64, _> =
                db.query_one(&format!("SELECT COUNT(*) FROM {}", table), ());
            i32::from(outcome.is_ok())
        })
        .sum();
    assert!(surviving_tables >= 0);
}
/// Drops the last WAL entry of the multi-table fixture (when at least two
/// entries exist) and runs a `users`/`orders` join after reopening.
///
/// The join is allowed to fail. If it succeeds, the number of rows that
/// decode successfully only has to stay within a loose upper bound of 100.
#[test]
fn test_multi_table_cross_table_join_after_recovery() {
    let fixture = setup_multi_table_db();
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();
    let bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&bytes);
    if entries.len() >= 2 {
        // Keep everything up to, but excluding, the final entry.
        let keep = entries[entries.len() - 1].offset;
        fs::write(wal_path, &bytes[..keep]).unwrap();
    }
    remove_lock_file(&fixture.db_path);
    let db = Database::open(&fixture.dsn).unwrap();
    let joined = db.query(
        "SELECT u.name, o.amount FROM users u JOIN orders o ON u.id = o.user_id ORDER BY o.id LIMIT 5",
        (),
    );
    if let Ok(rows) = joined {
        let row_count = rows.filter_map(|row| row.ok()).count();
        assert!(row_count <= 100, "Sanity check: not too many rows");
    }
}
/// Creates a database in a fresh temporary directory holding one table,
/// `indexed_data`, with indexes on `value` and `category`, and 50 rows.
///
/// Row `i` gets category `cat{i % 5}`, value `i * 1.5` and `active` set for
/// even `i`. The handle is dropped and the lock file removed before the
/// fixture is returned.
fn setup_indexed_db() -> TestFixture {
    let dir = tempdir().unwrap();
    let db_path = dir.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());
    {
        let db = Database::open(&dsn).unwrap();
        let schema = [
            "CREATE TABLE indexed_data (id INTEGER PRIMARY KEY, category TEXT NOT NULL, value FLOAT, active BOOLEAN)",
            "CREATE INDEX idx_value ON indexed_data(value)",
            "CREATE INDEX idx_category ON indexed_data(category)",
        ];
        for ddl in schema.iter().copied() {
            db.execute(ddl, ()).unwrap();
        }
        for id in 1..=50 {
            let category = format!("cat{}", id % 5);
            let is_active = id % 2 == 0;
            let sql = format!(
                "INSERT INTO indexed_data (id, category, value, active) VALUES ({}, '{}', {}, {})",
                id,
                category,
                id as f64 * 1.5,
                is_active
            );
            db.execute(&sql, ()).unwrap();
        }
    }
    remove_lock_file(&db_path);
    TestFixture {
        _dir: dir,
        db_path,
        dsn,
    }
}
/// Drops the last WAL entry of the indexed fixture (when at least two entries
/// exist) and verifies that queries filtering on both indexed columns still
/// execute after reopening. Only non-negative counts are required.
#[test]
fn test_index_recovery_after_torn_write() {
    let fixture = setup_indexed_db();
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();
    let bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&bytes);
    if entries.len() >= 2 {
        let keep = entries[entries.len() - 1].offset;
        fs::write(wal_path, &bytes[..keep]).unwrap();
    }
    remove_lock_file(&fixture.db_path);
    let db = Database::open(&fixture.dsn).unwrap();
    let by_value: i64 = db
        .query_one("SELECT COUNT(*) FROM indexed_data WHERE value > 50.0", ())
        .unwrap();
    assert!(by_value >= 0, "Index query should work");
    let by_category: i64 = db
        .query_one(
            "SELECT COUNT(*) FROM indexed_data WHERE category = 'cat1'",
            (),
        )
        .unwrap();
    assert!(by_category >= 0, "Hash index query should work");
}
/// Flips one bit 10 bytes into the payload of the middle WAL entry of the
/// indexed fixture (when at least five entries exist) and checks that a query
/// filtering on the indexed `value` column never reports more rows than an
/// unfiltered count.
#[test]
fn test_index_recovery_after_bit_corruption() {
    let fixture = setup_indexed_db();
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();
    let mut bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&bytes);
    if entries.len() >= 5 {
        let victim = &entries[entries.len() / 2];
        flip_bit(&mut bytes, victim.data_offset + 10, 3);
        fs::write(wal_path, &bytes).unwrap();
    }
    remove_lock_file(&fixture.db_path);
    let db = Database::open(&fixture.dsn).unwrap();
    let all_rows: i64 = db
        .query_one("SELECT COUNT(*) FROM indexed_data", ())
        .unwrap();
    let filtered_rows: i64 = db
        .query_one("SELECT COUNT(*) FROM indexed_data WHERE value > 0.0", ())
        .unwrap();
    assert!(
        filtered_rows <= all_rows,
        "Index query ({}) should never return more rows than total ({})",
        filtered_rows,
        all_rows
    );
}
/// Zeroes the 4-byte magic of the middle WAL entry of the indexed fixture
/// (when at least three entries exist) and checks that the table and its
/// category filter agree after recovery.
///
/// The unfiltered count must equal the count over all five known categories,
/// and inserting one more row must raise the unfiltered count by exactly one.
#[test]
fn test_index_consistency_after_corruption() {
    let fixture = setup_indexed_db();
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();
    let mut bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&bytes);
    if entries.len() >= 3 {
        let victim = &entries[entries.len() / 2];
        zero_range(&mut bytes, victim.offset, 4);
        fs::write(wal_path, &bytes).unwrap();
    }
    remove_lock_file(&fixture.db_path);
    let db = Database::open(&fixture.dsn).unwrap();
    let total: i64 = db
        .query_one("SELECT COUNT(*) FROM indexed_data", ())
        .unwrap();
    let cat_total: i64 = db
        .query_one(
            "SELECT COUNT(*) FROM indexed_data WHERE category IN ('cat0', 'cat1', 'cat2', 'cat3', 'cat4')",
            (),
        )
        .unwrap();
    assert_eq!(
        total, cat_total,
        "Index scan over all categories ({}) should match full scan ({})",
        cat_total, total
    );
    db.execute(
        "INSERT INTO indexed_data (id, category, value, active) VALUES (999, 'cat_new', 999.0, true)",
        (),
    )
    .unwrap();
    let total_after_insert: i64 = db
        .query_one("SELECT COUNT(*) FROM indexed_data", ())
        .unwrap();
    assert_eq!(total_after_insert, total + 1);
}
/// Corrupts the CRC of up to two WAL entries that are neither commit markers,
/// nor flagged with bit `0x04`, nor reported by `find_dml_data_entries`. The
/// first such entry is skipped; the next two are hit.
///
/// NOTE(review): these entries are presumably DDL records (the table and index
/// definitions) — confirm against the WAL format.
///
/// After recovery both an unfiltered count and a count filtering on `value`
/// must execute; only non-negative results are required.
#[test]
fn test_index_ddl_corrupt_but_data_intact() {
    let fixture = setup_indexed_db();
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();
    let mut bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&bytes);
    let dml_entries = find_dml_data_entries(&entries);
    if dml_entries.len() >= 2 && entries.len() > dml_entries[0] {
        let ddl_like = entries.iter().enumerate().filter_map(|(idx, entry)| {
            let unflagged = (entry.flags & (COMMIT_MARKER_FLAG | 0x04)) == 0;
            if unflagged && !dml_entries.contains(&idx) {
                Some(entry)
            } else {
                None
            }
        });
        for victim in ddl_like.skip(1).take(2) {
            flip_bit(&mut bytes, victim.crc_offset, 5);
        }
        fs::write(wal_path, &bytes).unwrap();
    }
    remove_lock_file(&fixture.db_path);
    let db = Database::open(&fixture.dsn).unwrap();
    let all_rows: i64 = db
        .query_one("SELECT COUNT(*) FROM indexed_data", ())
        .unwrap();
    assert!(all_rows >= 0, "Data should be accessible even without indexes");
    let filtered_rows: i64 = db
        .query_one("SELECT COUNT(*) FROM indexed_data WHERE value > 50.0", ())
        .unwrap();
    assert!(filtered_rows >= 0);
}
/// Builds an `orders` table with a two-column index on
/// `(customer_id, order_date)`, inserts 30 rows, flips one bit 5 bytes into
/// the payload of the middle WAL entry (when at least three entries exist),
/// and checks that a query on both indexed columns and a subsequent insert
/// both succeed after reopening.
#[test]
fn test_multi_column_index_recovery() {
    let dir = tempdir().unwrap();
    let db_path = dir.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());
    {
        let db = Database::open(&dsn).unwrap();
        db.execute(
            "CREATE TABLE orders (id INTEGER PRIMARY KEY, customer_id INTEGER, order_date TEXT, amount FLOAT)",
            (),
        )
        .unwrap();
        db.execute(
            "CREATE INDEX idx_cust_date ON orders(customer_id, order_date)",
            (),
        )
        .unwrap();
        for order_id in 1..=30 {
            let sql = format!(
                "INSERT INTO orders (id, customer_id, order_date, amount) VALUES ({}, {}, '2024-01-{:02}', {})",
                order_id,
                order_id % 5,
                (order_id % 28) + 1,
                order_id as f64 * 10.0
            );
            db.execute(&sql, ()).unwrap();
        }
    }
    remove_lock_file(&db_path);
    let wal_files = find_wal_files(&db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();
    let mut bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&bytes);
    if entries.len() >= 3 {
        let victim = &entries[entries.len() / 2];
        flip_bit(&mut bytes, victim.data_offset + 5, 1);
        fs::write(wal_path, &bytes).unwrap();
    }
    remove_lock_file(&db_path);
    // Bundle the pieces into a fixture so the temp dir lives until the end.
    let fixture = TestFixture {
        _dir: dir,
        db_path,
        dsn,
    };
    let db = Database::open(&fixture.dsn).unwrap();
    let matching: i64 = db
        .query_one(
            "SELECT COUNT(*) FROM orders WHERE customer_id = 1 AND order_date = '2024-01-01'",
            (),
        )
        .unwrap();
    assert!(matching >= 0);
    db.execute(
        "INSERT INTO orders (id, customer_id, order_date, amount) VALUES (999, 1, '2024-12-25', 500.0)",
        (),
    )
    .unwrap();
}
/// Creates a database in a fresh temporary directory with a `txn_data` table
/// filled by five explicit `BEGIN`/`COMMIT` transactions of four inserts each
/// (ids 1..=20, `batch` column 0..=4).
///
/// Asserts that all 20 rows are visible before dropping the handle, then
/// removes the lock file and returns the fixture.
fn setup_explicit_txn_db() -> TestFixture {
    let dir = tempdir().unwrap();
    let db_path = dir.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());
    {
        let db = Database::open(&dsn).unwrap();
        db.execute(
            "CREATE TABLE txn_data (id INTEGER PRIMARY KEY, value TEXT NOT NULL, batch INTEGER)",
            (),
        )
        .unwrap();
        for batch in 0..5 {
            db.execute("BEGIN", ()).unwrap();
            // Each batch owns a contiguous block of four ids.
            let first_id = batch * 4 + 1;
            for row in 0..4 {
                let sql = format!(
                    "INSERT INTO txn_data (id, value, batch) VALUES ({}, 'batch{}_row{}', {})",
                    first_id + row,
                    batch,
                    row,
                    batch
                );
                db.execute(&sql, ()).unwrap();
            }
            db.execute("COMMIT", ()).unwrap();
        }
        let inserted: i64 = db.query_one("SELECT COUNT(*) FROM txn_data", ()).unwrap();
        assert_eq!(inserted, 20);
    }
    remove_lock_file(&db_path);
    TestFixture {
        _dir: dir,
        db_path,
        dsn,
    }
}
/// Flips one bit 8 bytes into the payload of the middle DML entry of the
/// explicit-transaction fixture (when at least ten DML entries exist).
///
/// At least 15 of the 20 rows must survive, and an insert after recovery must
/// succeed.
#[test]
fn test_explicit_txn_one_insert_corrupt() {
    let fixture = setup_explicit_txn_db();
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();
    let mut bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&bytes);
    let dml_entries = find_dml_data_entries(&entries);
    if dml_entries.len() >= 10 {
        let victim = &entries[dml_entries[dml_entries.len() / 2]];
        flip_bit(&mut bytes, victim.data_offset + 8, 4);
        fs::write(wal_path, &bytes).unwrap();
    }
    remove_lock_file(&fixture.db_path);
    let db = Database::open(&fixture.dsn).unwrap();
    let surviving: i64 = db.query_one("SELECT COUNT(*) FROM txn_data", ()).unwrap();
    assert!(
        surviving >= 15,
        "Expected at least 15 rows (lost at most a few from corruption), got {}",
        surviving
    );
    db.execute(
        "INSERT INTO txn_data (id, value, batch) VALUES (999, 'post_recovery', 99)",
        (),
    )
    .unwrap();
}
/// Zeroes the third commit-marker entry (index 2) of the explicit-transaction
/// fixture's WAL (when at least five commit entries exist) and checks
/// transaction atomicity after recovery.
///
/// At least 12 rows must remain overall, and every batch 0..5 must hold either
/// 0 rows or all 4 — never a partial transaction.
#[test]
fn test_explicit_txn_commit_marker_corrupt() {
    let fixture = setup_explicit_txn_db();
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();
    let mut bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&bytes);
    let commits = find_commit_entries(&entries);
    if commits.len() >= 5 {
        let victim = &entries[commits[2]];
        zero_range(&mut bytes, victim.offset, victim.total_size);
        fs::write(wal_path, &bytes).unwrap();
    }
    remove_lock_file(&fixture.db_path);
    let db = Database::open(&fixture.dsn).unwrap();
    let surviving: i64 = db.query_one("SELECT COUNT(*) FROM txn_data", ()).unwrap();
    assert!(
        surviving >= 12,
        "Expected at least 12 rows (lost 1 txn of 4 rows + possible side effects), got {}",
        surviving
    );
    for batch in 0..5 {
        let sql = format!("SELECT COUNT(*) FROM txn_data WHERE batch = {}", batch);
        let rows_in_batch: i64 = db.query_one(&sql, ()).unwrap();
        assert!(
            rows_in_batch == 0 || rows_in_batch == 4,
            "Batch {} has {} rows — expected 0 (aborted) or 4 (committed)",
            batch,
            rows_in_batch
        );
    }
}
/// Zeroes the 4-byte magic of DML entries 8..12 of the explicit-transaction
/// fixture's WAL and expects at least 12 rows to remain. With four inserts
/// per transaction, those indices presumably cover the third batch.
///
/// The corruption is applied only when at least 12 DML entries and at least 3
/// commit entries are found.
#[test]
fn test_explicit_txn_all_inserts_in_one_txn_corrupt() {
    let fixture = setup_explicit_txn_db();
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();
    let mut bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&bytes);
    let dml_entries = find_dml_data_entries(&entries);
    let commits = find_commit_entries(&entries);
    if dml_entries.len() >= 12 && commits.len() >= 3 {
        for idx in dml_entries[8..12].iter().copied() {
            zero_range(&mut bytes, entries[idx].offset, 4);
        }
        fs::write(wal_path, &bytes).unwrap();
    }
    remove_lock_file(&fixture.db_path);
    let db = Database::open(&fixture.dsn).unwrap();
    let surviving: i64 = db.query_one("SELECT COUNT(*) FROM txn_data", ()).unwrap();
    assert!(
        surviving >= 12,
        "Expected at least 12 rows (4 batches intact minus possible losses), got {}",
        surviving
    );
}
/// Flips a CRC bit in two separate DML entries (indices 5 and 13) of the
/// explicit-transaction fixture's WAL, when at least 16 DML entries exist,
/// and expects at least 16 of the 20 rows to survive recovery.
#[test]
fn test_explicit_txn_interleaved_corruption() {
    let fixture = setup_explicit_txn_db();
    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let wal_path = wal_files.last().unwrap();
    let mut bytes = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&bytes);
    let dml_entries = find_dml_data_entries(&entries);
    if dml_entries.len() >= 16 {
        let first_crc = entries[dml_entries[5]].crc_offset;
        let second_crc = entries[dml_entries[13]].crc_offset;
        flip_bit(&mut bytes, first_crc, 2);
        flip_bit(&mut bytes, second_crc, 6);
        fs::write(wal_path, &bytes).unwrap();
    }
    remove_lock_file(&fixture.db_path);
    let db = Database::open(&fixture.dsn).unwrap();
    let surviving: i64 = db.query_one("SELECT COUNT(*) FROM txn_data", ()).unwrap();
    assert!(
        surviving >= 16,
        "Expected at least 16 rows (lost 2 individual rows), got {}",
        surviving
    );
}
/// Lists the volume files of `table` under `<db_path>/volumes/<table>`.
///
/// A file qualifies when its name starts with `vol_` and ends with `.vol`.
/// The returned paths are sorted. A missing directory yields an empty vector.
/// Directory entries that fail to read are skipped.
///
/// # Panics
///
/// Panics if the directory exists but cannot be opened for listing.
fn find_volume_files(db_path: &Path, table: &str) -> Vec<PathBuf> {
    let vol_dir = db_path.join("volumes").join(table);
    if !vol_dir.exists() {
        return Vec::new();
    }
    let mut volumes = Vec::new();
    for entry in fs::read_dir(&vol_dir).unwrap() {
        let entry = match entry {
            Ok(entry) => entry,
            Err(_) => continue,
        };
        let raw_name = entry.file_name();
        let name = raw_name.to_string_lossy();
        if name.starts_with("vol_") && name.ends_with(".vol") {
            volumes.push(entry.path());
        }
    }
    volumes.sort();
    volumes
}
/// Returns `<db_path>/volumes/<table>/manifest.bin` if that path exists,
/// otherwise `None`.
fn find_manifest_file(db_path: &Path, table: &str) -> Option<PathBuf> {
    let manifest = db_path.join("volumes").join(table).join("manifest.bin");
    Some(manifest).filter(|candidate| candidate.exists())
}
/// Creates a database (opened with `checkpoint_on_close=off`) whose
/// `vol_test` table holds 20 rows inserted before a `PRAGMA CHECKPOINT`
/// (`phase = 1`) and 20 rows inserted after it (`phase = 2`).
///
/// Returns the fixture together with the number of rows inserted before the
/// checkpoint (20). The handle is dropped and the lock file removed first.
fn setup_db_with_volumes() -> (TestFixture, i64) {
    let dir = tempdir().unwrap();
    let db_path = dir.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());
    {
        let db = Database::open(&dsn).unwrap();
        db.execute(
            "CREATE TABLE vol_test (id INTEGER PRIMARY KEY, value TEXT NOT NULL, phase INTEGER)",
            (),
        )
        .unwrap();
        // Phase 1: rows that exist when the checkpoint runs.
        for id in 1..=20 {
            let sql = format!(
                "INSERT INTO vol_test (id, value, phase) VALUES ({}, 'sealed_{}', 1)",
                id, id
            );
            db.execute(&sql, ()).unwrap();
        }
        db.execute("PRAGMA CHECKPOINT", ()).unwrap();
        // Phase 2: rows written after the checkpoint.
        for id in 21..=40 {
            let sql = format!(
                "INSERT INTO vol_test (id, value, phase) VALUES ({}, 'wal_only_{}', 2)",
                id, id
            );
            db.execute(&sql, ()).unwrap();
        }
    }
    remove_lock_file(&db_path);
    let pre_checkpoint_count: i64 = 20;
    let fixture = TestFixture {
        _dir: dir,
        db_path,
        dsn,
    };
    (fixture, pre_checkpoint_count)
}
/// Zeroes the first 4 bytes of every volume file of `vol_test` (files of 10
/// bytes or fewer are left alone) while leaving the WAL intact, then reopens
/// the database.
///
/// Opening must succeed. The count query may fail; if it succeeds the count
/// must be non-negative and a follow-up insert must succeed.
#[test]
fn test_volume_corrupt_with_valid_wal() {
    let (fixture, _) = setup_db_with_volumes();
    let vol_files = find_volume_files(&fixture.db_path, "vol_test");
    // Without volume files nothing would be corrupted and the test would pass
    // without exercising the scenario it is named after.
    assert!(
        !vol_files.is_empty(),
        "PRAGMA CHECKPOINT should have created volume files"
    );
    for vol_path in &vol_files {
        let mut data = fs::read(vol_path).unwrap();
        if data.len() > 10 {
            zero_range(&mut data, 0, 4);
            fs::write(vol_path, &data).unwrap();
        }
    }
    remove_lock_file(&fixture.db_path);
    let db = Database::open(&fixture.dsn).unwrap();
    let result: Result<i64, _> = db.query_one("SELECT COUNT(*) FROM vol_test", ());
    // A query error is tolerated; only a successful recovery is checked further.
    if let Ok(count) = result {
        assert!(
            count >= 0,
            "Should have non-negative row count, got {}",
            count
        );
        db.execute(
            "INSERT INTO vol_test (id, value, phase) VALUES (999, 'post_recovery', 3)",
            (),
        )
        .unwrap();
    }
}
/// Deletes every volume file of `vol_test` while leaving the WAL intact, then
/// reopens the database.
///
/// Opening must succeed. The count query may fail; if it succeeds the count
/// only has to be non-negative.
#[test]
fn test_volume_deleted_with_valid_wal() {
    let (fixture, _) = setup_db_with_volumes();
    let vol_files = find_volume_files(&fixture.db_path, "vol_test");
    assert!(
        !vol_files.is_empty(),
        "PRAGMA CHECKPOINT should have created volume files"
    );
    for vol_path in &vol_files {
        // A failed deletion would leave the scenario un-exercised, so fail
        // loudly instead of ignoring the error.
        fs::remove_file(vol_path).unwrap();
    }
    remove_lock_file(&fixture.db_path);
    let db = Database::open(&fixture.dsn).unwrap();
    let result: Result<i64, _> = db.query_one("SELECT COUNT(*) FROM vol_test", ());
    // A query error is tolerated here.
    if let Ok(count) = result {
        assert!(
            count >= 0,
            "Should have non-negative row count, got {}",
            count
        );
    }
}
/// Leaves the volume files intact but zeroes up to 200 bytes starting at
/// three quarters of the way into the last WAL file (only if the file is
/// larger than 200 bytes).
///
/// After reopening, at least the rows inserted before the checkpoint must be
/// present and a follow-up insert must succeed.
#[test]
fn test_volume_valid_wal_corrupt() {
    let (fixture, pre_count) = setup_db_with_volumes();
    let wal_files = find_wal_files(&fixture.db_path);
    if let Some(wal_path) = wal_files.last() {
        let mut bytes = fs::read(wal_path).unwrap();
        if bytes.len() > 200 {
            let corrupt_start = bytes.len() * 3 / 4;
            // Never run past the end of the file.
            let corrupt_len = std::cmp::min(200, bytes.len() - corrupt_start);
            zero_range(&mut bytes, corrupt_start, corrupt_len);
            fs::write(wal_path, &bytes).unwrap();
        }
    }
    remove_lock_file(&fixture.db_path);
    let db = Database::open(&fixture.dsn).unwrap();
    let surviving: i64 = db.query_one("SELECT COUNT(*) FROM vol_test", ()).unwrap();
    assert!(
        surviving >= pre_count,
        "Expected at least {} rows (volume data), got {}",
        pre_count,
        surviving
    );
    db.execute(
        "INSERT INTO vol_test (id, value, phase) VALUES (999, 'post_recovery', 3)",
        (),
    )
    .unwrap();
}
/// Zeroes the first 4 bytes of the `vol_test` manifest (if one exists and is
/// longer than 10 bytes) and reopens the database.
///
/// Opening must succeed. The count query may fail; if it succeeds the count
/// must be non-negative and a follow-up insert must succeed.
#[test]
fn test_manifest_corrupt_with_valid_volumes() {
    let (fixture, _) = setup_db_with_volumes();
    if let Some(manifest_path) = find_manifest_file(&fixture.db_path, "vol_test") {
        let mut bytes = fs::read(&manifest_path).unwrap();
        if bytes.len() > 10 {
            zero_range(&mut bytes, 0, 4);
            fs::write(&manifest_path, &bytes).unwrap();
        }
    }
    remove_lock_file(&fixture.db_path);
    let db = Database::open(&fixture.dsn).unwrap();
    let outcome: Result<i64, _> = db.query_one("SELECT COUNT(*) FROM vol_test", ());
    // A query error is tolerated; only a successful recovery is checked further.
    if let Ok(count) = outcome {
        assert!(count >= 0, "Should have non-negative row count");
        db.execute(
            "INSERT INTO vol_test (id, value, phase) VALUES (999, 'post_recovery', 3)",
            (),
        )
        .unwrap();
    }
}
/// Deletes the `vol_test` manifest (if one exists) and reopens the database.
///
/// Opening must succeed. The count query may fail; if it succeeds the count
/// only has to be non-negative.
#[test]
fn test_manifest_deleted() {
    let (fixture, _) = setup_db_with_volumes();
    if let Some(manifest_path) = find_manifest_file(&fixture.db_path, "vol_test") {
        fs::remove_file(&manifest_path).unwrap();
    }
    remove_lock_file(&fixture.db_path);
    let db = Database::open(&fixture.dsn).unwrap();
    let outcome: Result<i64, _> = db.query_one("SELECT COUNT(*) FROM vol_test", ());
    // A query error is tolerated here.
    if let Ok(count) = outcome {
        assert!(count >= 0, "Should have non-negative row count");
    }
}
/// Truncates every volume file of `vol_test` to a quarter of its length
/// (files of 100 bytes or fewer are left alone) and reopens the database.
///
/// Opening must succeed. The count query may fail; if it succeeds the count
/// only has to be non-negative.
#[test]
fn test_volume_truncated() {
    let (fixture, _) = setup_db_with_volumes();
    let vol_files = find_volume_files(&fixture.db_path, "vol_test");
    // Without volume files nothing would be truncated and the test would pass
    // without exercising the scenario it is named after.
    assert!(
        !vol_files.is_empty(),
        "PRAGMA CHECKPOINT should have created volume files"
    );
    for vol_path in &vol_files {
        let data = fs::read(vol_path).unwrap();
        if data.len() > 100 {
            fs::write(vol_path, &data[..data.len() / 4]).unwrap();
        }
    }
    remove_lock_file(&fixture.db_path);
    let db = Database::open(&fixture.dsn).unwrap();
    let result: Result<i64, _> = db.query_one("SELECT COUNT(*) FROM vol_test", ());
    // A query error is tolerated here.
    if let Ok(count) = result {
        assert!(count >= 0);
    }
}
/// Sums the on-disk sizes, in bytes, of all WAL files found for `db_path`.
/// Files whose metadata cannot be read contribute 0.
fn total_wal_size(db_path: &Path) -> u64 {
    let mut total_bytes = 0u64;
    for wal_file in find_wal_files(db_path) {
        if let Ok(meta) = fs::metadata(&wal_file) {
            total_bytes += meta.len();
        }
    }
    total_bytes
}
/// Inserts 20 rows, runs one `PRAGMA CHECKPOINT`, inserts 20 more, then
/// reopens the database without corrupting anything.
///
/// At least one volume file must exist for `safe_test`, and all 40 rows must
/// be visible after reopening.
#[test]
fn test_checkpoint_single_then_recover() {
    let dir = tempdir().unwrap();
    let db_path = dir.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());
    {
        let db = Database::open(&dsn).unwrap();
        db.execute(
            "CREATE TABLE safe_test (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
            (),
        )
        .unwrap();
        let insert_rows = |ids: std::ops::RangeInclusive<i32>| {
            for id in ids {
                let sql = format!(
                    "INSERT INTO safe_test (id, value) VALUES ({}, 'row_{}')",
                    id, id
                );
                db.execute(&sql, ()).unwrap();
            }
        };
        insert_rows(1..=20);
        db.execute("PRAGMA CHECKPOINT", ()).unwrap();
        insert_rows(21..=40);
    }
    remove_lock_file(&db_path);
    let vol_files = find_volume_files(&db_path, "safe_test");
    assert!(!vol_files.is_empty(), "Should have at least 1 volume file");
    let db = Database::open(&dsn).unwrap();
    let recovered: i64 = db.query_one("SELECT COUNT(*) FROM safe_test", ()).unwrap();
    assert_eq!(recovered, 40, "All 40 rows should be recovered (volumes + WAL)");
}
/// Inserts three phases of 10 rows with a `PRAGMA CHECKPOINT` after the first
/// and the second phase, then reopens the database and expects all 30 rows.
#[test]
fn test_checkpoint_two_cycles_recovery() {
    let dir = tempdir().unwrap();
    let db_path = dir.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());
    {
        let db = Database::open(&dsn).unwrap();
        db.execute(
            "CREATE TABLE safe_test2 (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
            (),
        )
        .unwrap();
        let insert_phase = |ids: std::ops::RangeInclusive<i32>, phase: u32| {
            for id in ids {
                let sql = format!(
                    "INSERT INTO safe_test2 (id, value) VALUES ({}, 'phase{}_{}')",
                    id, phase, id
                );
                db.execute(&sql, ()).unwrap();
            }
        };
        insert_phase(1..=10, 1);
        db.execute("PRAGMA CHECKPOINT", ()).unwrap();
        insert_phase(11..=20, 2);
        db.execute("PRAGMA CHECKPOINT", ()).unwrap();
        insert_phase(21..=30, 3);
    }
    remove_lock_file(&db_path);
    let db = Database::open(&dsn).unwrap();
    let recovered: i64 = db.query_one("SELECT COUNT(*) FROM safe_test2", ()).unwrap();
    assert_eq!(
        recovered, 30,
        "All 30 rows should be recovered from volumes + WAL"
    );
}
/// Inserts 10 rows, checkpoints, inserts 10 more, then zeroes the first 10
/// bytes of every `safe_test3` volume file longer than 10 bytes and reopens.
///
/// Opening must succeed. The count query may fail; if it succeeds the count
/// only has to be non-negative.
///
/// NOTE(review): despite the test name, the WAL itself is not modified here.
#[test]
fn test_checkpoint_volume_corrupt_wal_truncated() {
    let dir = tempdir().unwrap();
    let db_path = dir.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());
    {
        let db = Database::open(&dsn).unwrap();
        db.execute(
            "CREATE TABLE safe_test3 (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
            (),
        )
        .unwrap();
        let insert_rows = |ids: std::ops::RangeInclusive<i32>| {
            for id in ids {
                let sql = format!(
                    "INSERT INTO safe_test3 (id, value) VALUES ({}, 'data_{}')",
                    id, id
                );
                db.execute(&sql, ()).unwrap();
            }
        };
        insert_rows(1..=10);
        db.execute("PRAGMA CHECKPOINT", ()).unwrap();
        insert_rows(11..=20);
    }
    remove_lock_file(&db_path);
    for vol_path in &find_volume_files(&db_path, "safe_test3") {
        let mut bytes = fs::read(vol_path).unwrap();
        if bytes.len() > 10 {
            zero_range(&mut bytes, 0, 10);
            fs::write(vol_path, &bytes).unwrap();
        }
    }
    let db = Database::open(&dsn).unwrap();
    let outcome: Result<i64, _> = db.query_one("SELECT COUNT(*) FROM safe_test3", ());
    // A query error is tolerated here.
    if let Ok(count) = outcome {
        assert!(count >= 0, "Should have non-negative count, got {}", count);
    }
}
/// Inserts four phases of 10 rows with a `PRAGMA CHECKPOINT` after each of
/// the first three, then reopens the database and expects all 40 rows.
#[test]
fn test_checkpoint_three_cycles_recovery() {
    let dir = tempdir().unwrap();
    let db_path = dir.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());
    {
        let db = Database::open(&dsn).unwrap();
        db.execute(
            "CREATE TABLE safe_test4 (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
            (),
        )
        .unwrap();
        let insert_phase = |ids: std::ops::RangeInclusive<i32>, phase: u32| {
            for id in ids {
                let sql = format!(
                    "INSERT INTO safe_test4 (id, value) VALUES ({}, 'p{}_{}')",
                    id, phase, id
                );
                db.execute(&sql, ()).unwrap();
            }
        };
        insert_phase(1..=10, 1);
        db.execute("PRAGMA CHECKPOINT", ()).unwrap();
        insert_phase(11..=20, 2);
        db.execute("PRAGMA CHECKPOINT", ()).unwrap();
        insert_phase(21..=30, 3);
        db.execute("PRAGMA CHECKPOINT", ()).unwrap();
        insert_phase(31..=40, 4);
    }
    remove_lock_file(&db_path);
    let db = Database::open(&dsn).unwrap();
    let recovered: i64 = db.query_one("SELECT COUNT(*) FROM safe_test4", ()).unwrap();
    assert_eq!(
        recovered, 40,
        "All 40 rows should be recovered from volumes + WAL"
    );
}
/// Opens the database with `keep_snapshots=1`, inserts three phases of 10
/// rows with a checkpoint attempt after the first and second phase, then
/// reopens and expects all 30 rows.
///
/// The result of each `PRAGMA CHECKPOINT` is deliberately ignored in this
/// test.
#[test]
fn test_safe_truncation_keep_count_one() {
    let dir = tempdir().unwrap();
    let db_path = dir.path().join("test.db");
    let dsn = format!(
        "file://{}?keep_snapshots=1&checkpoint_on_close=off",
        db_path.display()
    );
    {
        let db = Database::open(&dsn).unwrap();
        db.execute(
            "CREATE TABLE keep1_test (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
            (),
        )
        .unwrap();
        let insert_phase = |ids: std::ops::RangeInclusive<i32>, phase: u32| {
            for id in ids {
                let sql = format!(
                    "INSERT INTO keep1_test (id, value) VALUES ({}, 'p{}_{}')",
                    id, phase, id
                );
                db.execute(&sql, ()).unwrap();
            }
        };
        insert_phase(1..=10, 1);
        let _ = db.execute("PRAGMA CHECKPOINT", ());
        insert_phase(11..=20, 2);
        let _ = db.execute("PRAGMA CHECKPOINT", ());
        insert_phase(21..=30, 3);
    }
    remove_lock_file(&db_path);
    let db = Database::open(&dsn).unwrap();
    let recovered: i64 = db.query_one("SELECT COUNT(*) FROM keep1_test", ()).unwrap();
    assert_eq!(
        recovered, 30,
        "All 30 rows should be recovered (volumes + WAL replay)"
    );
}
/// Inserts three phases of 5 rows with a checkpoint after the first and
/// second phase, drops a stray `vol_ffffffffffffffff.vol.tmp` file into the
/// table's volume directory (if that directory exists), and reopens.
///
/// All 15 rows must be visible. NOTE(review): only the row count is asserted;
/// the test does not check whether the stray file is actually removed.
#[test]
fn test_checkpoint_leftover_tmp_volume_cleaned_up() {
    let dir = tempdir().unwrap();
    let db_path = dir.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());
    {
        let db = Database::open(&dsn).unwrap();
        db.execute(
            "CREATE TABLE cleanup_test (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
            (),
        )
        .unwrap();
        let insert_rows = |ids: std::ops::RangeInclusive<i32>| {
            for id in ids {
                let sql = format!(
                    "INSERT INTO cleanup_test (id, value) VALUES ({}, 'v{}')",
                    id, id
                );
                db.execute(&sql, ()).unwrap();
            }
        };
        insert_rows(1..=5);
        db.execute("PRAGMA CHECKPOINT", ()).unwrap();
        insert_rows(6..=10);
        db.execute("PRAGMA CHECKPOINT", ()).unwrap();
        insert_rows(11..=15);
    }
    remove_lock_file(&db_path);
    let vol_dir = db_path.join("volumes").join("cleanup_test");
    if vol_dir.exists() {
        let stray = vol_dir.join("vol_ffffffffffffffff.vol.tmp");
        fs::write(stray, b"leftover garbage from crashed seal").unwrap();
    }
    let db = Database::open(&dsn).unwrap();
    let recovered: i64 = db
        .query_one("SELECT COUNT(*) FROM cleanup_test", ())
        .unwrap();
    assert_eq!(recovered, 15, "All 15 rows should be recovered");
}
/// Corrupting the volume files of one table must not affect a second table.
///
/// `multi_a` and `multi_b` each receive 30 rows in three interleaved batches with
/// `PRAGMA CHECKPOINT` after the first two. The first ten bytes of every
/// `multi_a` volume file longer than ten bytes are zeroed. After reopening,
/// `multi_b` must report 30 rows; `multi_a` may either fail to query or return
/// any non-negative count.
#[test]
fn test_checkpoint_multi_table_one_volume_corrupt() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());

    {
        let conn = Database::open(&dsn).unwrap();
        conn.execute(
            "CREATE TABLE multi_a (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
            (),
        )
        .unwrap();
        conn.execute(
            "CREATE TABLE multi_b (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
            (),
        )
        .unwrap();

        // Writes the same id into both tables, `multi_a` first.
        let insert_pair = |id: i32| {
            let into_a = format!("INSERT INTO multi_a (id, value) VALUES ({}, 'a{}')", id, id);
            conn.execute(&into_a, ()).unwrap();
            let into_b = format!("INSERT INTO multi_b (id, value) VALUES ({}, 'b{}')", id, id);
            conn.execute(&into_b, ()).unwrap();
        };

        for id in 1..=10 {
            insert_pair(id);
        }
        conn.execute("PRAGMA CHECKPOINT", ()).unwrap();
        for id in 11..=20 {
            insert_pair(id);
        }
        conn.execute("PRAGMA CHECKPOINT", ()).unwrap();
        for id in 21..=30 {
            insert_pair(id);
        }
    }
    remove_lock_file(&db_path);

    // Zero the leading bytes of every sufficiently large `multi_a` volume.
    let targets = find_volume_files(&db_path, "multi_a");
    for target in &targets {
        let mut bytes = fs::read(target).unwrap();
        if bytes.len() > 10 {
            zero_range(&mut bytes, 0, 10);
            fs::write(target, &bytes).unwrap();
        }
    }

    let reopened = Database::open(&dsn).unwrap();
    let rows_b: i64 = reopened
        .query_one("SELECT COUNT(*) FROM multi_b", ())
        .unwrap();
    assert_eq!(rows_b, 30, "Table B: all 30 rows from volumes + WAL");

    // An error on the damaged table is tolerated; only a successful count is checked.
    let outcome_a: Result<i64, _> = reopened.query_one("SELECT COUNT(*) FROM multi_a", ());
    if let Ok(count) = outcome_a {
        assert!(
            count >= 0,
            "Table A should have non-negative count, got {}",
            count
        );
    }
}
/// A table created between two checkpoints must survive a reopen alongside an
/// older table, and each checkpoint must shrink the total WAL size.
///
/// `early_table` is filled and checkpointed, then `late_table` is created and
/// both tables are extended to 20 rows before a second checkpoint. Checkpoint
/// errors are ignored; the WAL-size assertions are what detect a missing
/// truncation.
#[test]
fn test_safe_truncation_table_created_between_snapshots() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());

    {
        let conn = Database::open(&dsn).unwrap();
        let add_early = |id: i32| {
            let sql = format!(
                "INSERT INTO early_table (id, value) VALUES ({}, 'e{}')",
                id, id
            );
            conn.execute(&sql, ()).unwrap();
        };
        let add_late = |id: i32| {
            let sql = format!(
                "INSERT INTO late_table (id, value) VALUES ({}, 'l{}')",
                id, id
            );
            conn.execute(&sql, ()).unwrap();
        };

        conn.execute(
            "CREATE TABLE early_table (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
            (),
        )
        .unwrap();
        (1..=10).for_each(&add_early);

        // First checkpoint: only `early_table` exists.
        let wal_before_first = total_wal_size(&db_path);
        let _ = conn.execute("PRAGMA CHECKPOINT", ());
        let wal_after_first = total_wal_size(&db_path);
        assert!(
            wal_after_first < wal_before_first,
            "WAL should be truncated after first checkpoint: before={}, after={}",
            wal_before_first,
            wal_after_first
        );

        conn.execute(
            "CREATE TABLE late_table (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
            (),
        )
        .unwrap();
        (1..=10).for_each(&add_late);
        for id in 11..=20 {
            add_early(id);
            add_late(id);
        }

        // Second checkpoint: both tables have data.
        let wal_before_second = total_wal_size(&db_path);
        let _ = conn.execute("PRAGMA CHECKPOINT", ());
        let wal_after_second = total_wal_size(&db_path);
        assert!(
            wal_after_second < wal_before_second,
            "WAL should be truncated after second checkpoint: before={}, after={}",
            wal_before_second,
            wal_after_second
        );
    }
    remove_lock_file(&db_path);

    let reopened = Database::open(&dsn).unwrap();
    let early_rows: i64 = reopened
        .query_one("SELECT COUNT(*) FROM early_table", ())
        .unwrap();
    let late_rows: i64 = reopened
        .query_one("SELECT COUNT(*) FROM late_table", ())
        .unwrap();
    assert_eq!(early_rows, 20, "early_table should have all 20 rows");
    assert_eq!(late_rows, 20, "late_table should have all 20 rows");
}
/// Dropping a table between checkpoints must not stop the WAL from shrinking,
/// and the dropped table must stay gone after a reopen.
///
/// `keeper` ends with 30 rows (the last ten written after the final
/// checkpoint); `dropper` is dropped after the first checkpoint.
#[test]
fn test_safe_truncation_drop_table_no_block() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());

    {
        let conn = Database::open(&dsn).unwrap();
        conn.execute(
            "CREATE TABLE keeper (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
            (),
        )
        .unwrap();
        conn.execute(
            "CREATE TABLE dropper (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
            (),
        )
        .unwrap();

        let keep = |id: i32| {
            let sql = format!("INSERT INTO keeper (id, value) VALUES ({}, 'k{}')", id, id);
            conn.execute(&sql, ()).unwrap();
        };

        for id in 1..=10 {
            keep(id);
            let sql = format!("INSERT INTO dropper (id, value) VALUES ({}, 'd{}')", id, id);
            conn.execute(&sql, ()).unwrap();
        }
        let _ = conn.execute("PRAGMA CHECKPOINT", ());

        conn.execute("DROP TABLE dropper", ()).unwrap();
        (11..=20).for_each(&keep);

        // The checkpoint result is ignored; the size comparison is the real check.
        let wal_before = total_wal_size(&db_path);
        let _ = conn.execute("PRAGMA CHECKPOINT", ());
        let wal_after = total_wal_size(&db_path);
        assert!(
            wal_after < wal_before,
            "WAL should be truncated after drop + checkpoint: before={}, after={}",
            wal_before,
            wal_after
        );

        (21..=30).for_each(&keep);
    }
    remove_lock_file(&db_path);

    let reopened = Database::open(&dsn).unwrap();
    let kept_rows: i64 = reopened
        .query_one("SELECT COUNT(*) FROM keeper", ())
        .unwrap();
    assert_eq!(kept_rows, 30, "All 30 rows in keeper should be recovered");

    let dropped: std::result::Result<i64, _> =
        reopened.query_one("SELECT COUNT(*) FROM dropper", ());
    assert!(dropped.is_err(), "dropper table should not exist after drop");
}
/// With every volume file of `doomed` damaged, `healthy` must still recover all
/// 30 rows.
///
/// Both tables are filled in three interleaved batches with `PRAGMA CHECKPOINT`
/// after the first two. The first ten bytes of each `doomed` volume longer than
/// ten bytes are zeroed. A query on `doomed` may fail; if it succeeds, the count
/// must not exceed 30.
#[test]
fn test_checkpoint_all_volumes_corrupt_one_table() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());

    {
        let conn = Database::open(&dsn).unwrap();
        conn.execute(
            "CREATE TABLE healthy (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
            (),
        )
        .unwrap();
        conn.execute(
            "CREATE TABLE doomed (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
            (),
        )
        .unwrap();

        // One row into each table per id, `healthy` first.
        let insert_both = |id: i32| {
            let h = format!("INSERT INTO healthy (id, value) VALUES ({}, 'h{}')", id, id);
            conn.execute(&h, ()).unwrap();
            let d = format!("INSERT INTO doomed (id, value) VALUES ({}, 'd{}')", id, id);
            conn.execute(&d, ()).unwrap();
        };

        (1..=10).for_each(&insert_both);
        conn.execute("PRAGMA CHECKPOINT", ()).unwrap();
        (11..=20).for_each(&insert_both);
        conn.execute("PRAGMA CHECKPOINT", ()).unwrap();
        (21..=30).for_each(&insert_both);
    }
    remove_lock_file(&db_path);

    let doomed_vols = find_volume_files(&db_path, "doomed");
    for path in &doomed_vols {
        let mut contents = fs::read(path).unwrap();
        if contents.len() > 10 {
            zero_range(&mut contents, 0, 10);
            fs::write(path, &contents).unwrap();
        }
    }

    let reopened = Database::open(&dsn).unwrap();
    let healthy_rows: i64 = reopened
        .query_one("SELECT COUNT(*) FROM healthy", ())
        .unwrap();
    assert_eq!(
        healthy_rows, 30,
        "Healthy table should have all 30 rows from volumes + WAL"
    );

    let doomed_outcome: std::result::Result<i64, _> =
        reopened.query_one("SELECT COUNT(*) FROM doomed", ());
    match doomed_outcome {
        Ok(count) => assert!(
            count <= 30,
            "Doomed should have at most 30 rows (got {})",
            count
        ),
        // Failing to read the damaged table is acceptable.
        Err(_) => {}
    }
}
/// A table dropped and recreated under the same name must come back with only
/// the new generation's rows.
///
/// The first `reborn` holds ten `old*` rows and is checkpointed, then dropped.
/// The second `reborn` gets five `new*` rows, a checkpoint, and ten more rows.
/// After reopening there must be 15 rows and id 1 must read `new1`.
#[test]
fn test_checkpoint_drop_and_recreate_same_name() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());

    let conn = Database::open(&dsn).unwrap();

    // First generation.
    conn.execute(
        "CREATE TABLE reborn (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
        (),
    )
    .unwrap();
    for id in 1..=10 {
        let sql = format!("INSERT INTO reborn (id, value) VALUES ({}, 'old{}')", id, id);
        conn.execute(&sql, ()).unwrap();
    }
    conn.execute("PRAGMA CHECKPOINT", ()).unwrap();
    conn.execute("DROP TABLE reborn", ()).unwrap();

    // Second generation under the same name.
    conn.execute(
        "CREATE TABLE reborn (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
        (),
    )
    .unwrap();
    for id in 1..=5 {
        let sql = format!("INSERT INTO reborn (id, value) VALUES ({}, 'new{}')", id, id);
        conn.execute(&sql, ()).unwrap();
    }
    conn.execute("PRAGMA CHECKPOINT", ()).unwrap();
    for id in 6..=15 {
        let sql = format!("INSERT INTO reborn (id, value) VALUES ({}, 'new{}')", id, id);
        conn.execute(&sql, ()).unwrap();
    }
    drop(conn);
    remove_lock_file(&db_path);

    let reopened = Database::open(&dsn).unwrap();
    let row_count: i64 = reopened
        .query_one("SELECT COUNT(*) FROM reborn", ())
        .unwrap();
    assert_eq!(row_count, 15, "Recreated table should have 15 rows");

    let first_value: String = reopened
        .query_one("SELECT value FROM reborn WHERE id = 1", ())
        .unwrap();
    assert_eq!(first_value, "new1", "Data should be from the recreated table");
}
/// Runs three insert/checkpoint rounds with `keep_snapshots=0` in the DSN and
/// expects all 30 rows after a reopen.
///
/// The second checkpoint must shrink the total WAL size. Checkpoint errors are
/// ignored throughout.
#[test]
fn test_safe_truncation_keep_count_zero_no_cleanup() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!(
        "file://{}?keep_snapshots=0&checkpoint_on_close=off",
        db_path.display()
    );

    {
        let conn = Database::open(&dsn).unwrap();
        conn.execute(
            "CREATE TABLE keep_all (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
            (),
        )
        .unwrap();

        let insert = |id: i32| {
            let sql = format!("INSERT INTO keep_all (id, value) VALUES ({}, 'v{}')", id, id);
            conn.execute(&sql, ()).unwrap();
        };

        // Round 1.
        (1..=10).for_each(&insert);
        let _ = conn.execute("PRAGMA CHECKPOINT", ());

        // Round 2, with the WAL size measured around the checkpoint.
        (11..=20).for_each(&insert);
        let wal_before = total_wal_size(&db_path);
        let _ = conn.execute("PRAGMA CHECKPOINT", ());
        let wal_after = total_wal_size(&db_path);
        assert!(
            wal_after < wal_before,
            "WAL should be truncated after checkpoint: before={}, after={}",
            wal_before,
            wal_after
        );

        // Round 3.
        (21..=30).for_each(&insert);
        let _ = conn.execute("PRAGMA CHECKPOINT", ());
    }
    remove_lock_file(&db_path);

    let reopened = Database::open(&dsn).unwrap();
    let row_count: i64 = reopened
        .query_one("SELECT COUNT(*) FROM keep_all", ())
        .unwrap();
    assert_eq!(row_count, 30, "All 30 rows recovered from volumes");
}
/// UPDATEs and DELETEs made between and after checkpoints must all be visible
/// after a reopen.
///
/// Row arithmetic: 20 inserted, 5 deleted (ids 16-20), 5 inserted (21-25),
/// 1 deleted (21), 5 inserted (26-30), leaving 24 rows. Ids 1-5 are updated
/// twice: `value` before the second checkpoint, `status` after it.
#[test]
fn test_safe_truncation_with_updates_and_deletes() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());

    {
        let conn = Database::open(&dsn).unwrap();
        // Small wrapper so each mutation below reads as a single statement.
        let run = |sql: &str| {
            conn.execute(sql, ()).unwrap();
        };

        run("CREATE TABLE mut_test (id INTEGER PRIMARY KEY, value TEXT NOT NULL, status TEXT NOT NULL)");
        for id in 1..=20 {
            run(&format!(
                "INSERT INTO mut_test (id, value, status) VALUES ({}, 'original{}', 'active')",
                id, id
            ));
        }
        run("PRAGMA CHECKPOINT");

        // Mutations between the first and second checkpoint.
        for id in 1..=5 {
            run(&format!(
                "UPDATE mut_test SET value = 'updated{}', status = 'modified' WHERE id = {}",
                id, id
            ));
        }
        for id in 16..=20 {
            run(&format!("DELETE FROM mut_test WHERE id = {}", id));
        }
        for id in 21..=25 {
            run(&format!(
                "INSERT INTO mut_test (id, value, status) VALUES ({}, 'new{}', 'active')",
                id, id
            ));
        }
        run("PRAGMA CHECKPOINT");

        // Mutations after the last checkpoint.
        run("UPDATE mut_test SET status = 'final' WHERE id <= 5");
        run("DELETE FROM mut_test WHERE id = 21");
        for id in 26..=30 {
            run(&format!(
                "INSERT INTO mut_test (id, value, status) VALUES ({}, 'late{}', 'active')",
                id, id
            ));
        }
    }
    remove_lock_file(&db_path);

    let reopened = Database::open(&dsn).unwrap();

    let value_of_3: String = reopened
        .query_one("SELECT value FROM mut_test WHERE id = 3", ())
        .unwrap();
    assert_eq!(value_of_3, "updated3", "UPDATE should be preserved");

    let status_of_3: String = reopened
        .query_one("SELECT status FROM mut_test WHERE id = 3", ())
        .unwrap();
    assert_eq!(status_of_3, "final", "Second UPDATE should be preserved");

    let still_deleted: i64 = reopened
        .query_one(
            "SELECT COUNT(*) FROM mut_test WHERE id BETWEEN 16 AND 20",
            (),
        )
        .unwrap();
    assert_eq!(still_deleted, 0, "DELETEd rows should stay deleted");

    let row_21: i64 = reopened
        .query_one("SELECT COUNT(*) FROM mut_test WHERE id = 21", ())
        .unwrap();
    assert_eq!(row_21, 0, "Late DELETE should be preserved");

    let all_rows: i64 = reopened
        .query_one("SELECT COUNT(*) FROM mut_test", ())
        .unwrap();
    assert_eq!(all_rows, 24, "Total rows after all mutations");
}
/// Data must accumulate correctly across three open / insert / checkpoint /
/// close cycles.
///
/// Each cycle adds ten rows tagged with its cycle number and ends with
/// `PRAGMA CHECKPOINT`. Cycles two and three first assert the row count left by
/// the previous cycles. A fourth open checks the total of 30 and ten rows per
/// cycle number.
#[test]
fn test_safe_truncation_survives_restart_cycles() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());

    // Cycle 1: create the table and write the first ten rows.
    {
        let conn = Database::open(&dsn).unwrap();
        conn.execute(
            "CREATE TABLE cycle (id INTEGER PRIMARY KEY, cycle_num INTEGER NOT NULL, value TEXT NOT NULL)",
            (),
        )
        .unwrap();
        (1..=10).for_each(|id: i32| {
            let sql = format!(
                "INSERT INTO cycle (id, cycle_num, value) VALUES ({}, 1, 'c1_{}')",
                id, id
            );
            conn.execute(&sql, ()).unwrap();
        });
        conn.execute("PRAGMA CHECKPOINT", ()).unwrap();
    }
    remove_lock_file(&db_path);

    // Cycle 2.
    {
        let conn = Database::open(&dsn).unwrap();
        let seen: i64 = conn.query_one("SELECT COUNT(*) FROM cycle", ()).unwrap();
        assert_eq!(seen, 10, "Cycle 2 open: should have 10 rows");
        (11..=20).for_each(|id: i32| {
            let sql = format!(
                "INSERT INTO cycle (id, cycle_num, value) VALUES ({}, 2, 'c2_{}')",
                id, id
            );
            conn.execute(&sql, ()).unwrap();
        });
        conn.execute("PRAGMA CHECKPOINT", ()).unwrap();
    }
    remove_lock_file(&db_path);

    // Cycle 3.
    {
        let conn = Database::open(&dsn).unwrap();
        let seen: i64 = conn.query_one("SELECT COUNT(*) FROM cycle", ()).unwrap();
        assert_eq!(seen, 20, "Cycle 3 open: should have 20 rows");
        (21..=30).for_each(|id: i32| {
            let sql = format!(
                "INSERT INTO cycle (id, cycle_num, value) VALUES ({}, 3, 'c3_{}')",
                id, id
            );
            conn.execute(&sql, ()).unwrap();
        });
        conn.execute("PRAGMA CHECKPOINT", ()).unwrap();
    }
    remove_lock_file(&db_path);

    // Final open: verify the total and the per-cycle breakdown.
    {
        let conn = Database::open(&dsn).unwrap();
        let total: i64 = conn.query_one("SELECT COUNT(*) FROM cycle", ()).unwrap();
        assert_eq!(
            total, 30,
            "All 30 rows should be recovered from volumes across 3 checkpoint cycles"
        );
        for cycle_num in 1..=3 {
            let query = format!("SELECT COUNT(*) FROM cycle WHERE cycle_num = {}", cycle_num);
            let per_cycle: i64 = conn.query_one(&query, ()).unwrap();
            assert_eq!(per_cycle, 10, "Cycle {} should have 10 rows", cycle_num);
        }
    }
}
/// Opening must succeed after the first four bytes of the table manifest are
/// zeroed (the magic, going by the test name).
///
/// Fifteen rows are checkpointed and five more written afterwards. If
/// `find_manifest_file` returns a manifest of at least four bytes, its leading
/// four bytes are zeroed. The count query may fail; if it succeeds, the count
/// must be non-negative and a fresh insert must work.
#[test]
fn test_manifest_corrupt_magic_bytes() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());

    let conn = Database::open(&dsn).unwrap();
    conn.execute(
        "CREATE TABLE meta_test (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
        (),
    )
    .unwrap();
    for id in 1..=15 {
        let sql = format!(
            "INSERT INTO meta_test (id, value) VALUES ({}, 'row_{}')",
            id, id
        );
        conn.execute(&sql, ()).unwrap();
    }
    conn.execute("PRAGMA CHECKPOINT", ()).unwrap();
    // These rows are written after the checkpoint.
    for id in 16..=20 {
        let sql = format!(
            "INSERT INTO meta_test (id, value) VALUES ({}, 'wal_{}')",
            id, id
        );
        conn.execute(&sql, ()).unwrap();
    }
    drop(conn);
    remove_lock_file(&db_path);

    if let Some(manifest_path) = find_manifest_file(&db_path, "meta_test") {
        let mut bytes = fs::read(&manifest_path).unwrap();
        if bytes.len() >= 4 {
            zero_range(&mut bytes, 0, 4);
            fs::write(&manifest_path, &bytes).unwrap();
        }
    }

    let reopened = Database::open(&dsn).unwrap();
    let outcome: Result<i64, _> = reopened.query_one("SELECT COUNT(*) FROM meta_test", ());
    // A query error is tolerated; a successful query must leave the table writable.
    if let Ok(count) = outcome {
        assert!(count >= 0, "Should have non-negative count, got {}", count);
        reopened
            .execute(
                "INSERT INTO meta_test (id, value) VALUES (100, 'post_recovery')",
                (),
            )
            .unwrap();
    }
}
/// Opening must succeed after single bits are flipped at manifest byte offsets
/// 12 and 16.
///
/// NOTE(review): the test name suggests these offsets fall inside the checkpoint
/// LSN field; the manifest layout is not visible here, so confirm.
/// The count query may fail; a successful result must be non-negative.
#[test]
fn test_manifest_corrupt_checkpoint_lsn() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());

    let conn = Database::open(&dsn).unwrap();
    conn.execute(
        "CREATE TABLE meta_lsn (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
        (),
    )
    .unwrap();
    (1..=10).for_each(|id: i32| {
        let sql = format!(
            "INSERT INTO meta_lsn (id, value) VALUES ({}, 'row_{}')",
            id, id
        );
        conn.execute(&sql, ()).unwrap();
    });
    conn.execute("PRAGMA CHECKPOINT", ()).unwrap();
    drop(conn);
    remove_lock_file(&db_path);

    // Only manifests longer than 20 bytes are touched.
    if let Some(manifest_path) = find_manifest_file(&db_path, "meta_lsn") {
        let mut bytes = fs::read(&manifest_path).unwrap();
        if bytes.len() > 20 {
            flip_bit(&mut bytes, 12, 5);
            flip_bit(&mut bytes, 16, 3);
            fs::write(&manifest_path, &bytes).unwrap();
        }
    }

    let reopened = Database::open(&dsn).unwrap();
    let outcome: Result<i64, _> = reopened.query_one("SELECT COUNT(*) FROM meta_lsn", ());
    match outcome {
        Ok(count) => assert!(count >= 0, "Should have non-negative count"),
        Err(_) => {}
    }
}
/// Opening must succeed when the manifest is deleted while volume files remain.
///
/// Twelve rows are checkpointed, the presence of volume files is asserted, and
/// the manifest is removed if `find_manifest_file` returns one. The count query
/// may fail; a successful result must be non-negative.
#[test]
fn test_manifest_deleted_volumes_exist() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());

    let conn = Database::open(&dsn).unwrap();
    conn.execute(
        "CREATE TABLE meta_del (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
        (),
    )
    .unwrap();
    for id in 1..=12 {
        let sql = format!(
            "INSERT INTO meta_del (id, value) VALUES ({}, 'row_{}')",
            id, id
        );
        conn.execute(&sql, ()).unwrap();
    }
    conn.execute("PRAGMA CHECKPOINT", ()).unwrap();
    drop(conn);
    remove_lock_file(&db_path);

    let volumes = find_volume_files(&db_path, "meta_del");
    assert!(!volumes.is_empty(), "Should have volume files");

    if let Some(manifest_path) = find_manifest_file(&db_path, "meta_del") {
        fs::remove_file(&manifest_path).unwrap();
    }

    let reopened = Database::open(&dsn).unwrap();
    let outcome: Result<i64, _> = reopened.query_one("SELECT COUNT(*) FROM meta_del", ());
    match outcome {
        Ok(count) => assert!(count >= 0, "Should have non-negative count"),
        Err(_) => {}
    }
}
/// Opening must succeed after the manifest is cut to half its length.
///
/// Eight rows are checkpointed. A manifest longer than ten bytes is rewritten
/// with only its first half. The count query may fail; a successful result must
/// be non-negative.
#[test]
fn test_manifest_truncated() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());

    let conn = Database::open(&dsn).unwrap();
    conn.execute(
        "CREATE TABLE meta_trunc (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
        (),
    )
    .unwrap();
    (1..=8).for_each(|id: i32| {
        let sql = format!(
            "INSERT INTO meta_trunc (id, value) VALUES ({}, 'row_{}')",
            id, id
        );
        conn.execute(&sql, ()).unwrap();
    });
    conn.execute("PRAGMA CHECKPOINT", ()).unwrap();
    drop(conn);
    remove_lock_file(&db_path);

    if let Some(manifest_path) = find_manifest_file(&db_path, "meta_trunc") {
        let original = fs::read(&manifest_path).unwrap();
        if original.len() > 10 {
            let half = original.len() / 2;
            fs::write(&manifest_path, &original[..half]).unwrap();
        }
    }

    let reopened = Database::open(&dsn).unwrap();
    let outcome: Result<i64, _> = reopened.query_one("SELECT COUNT(*) FROM meta_trunc", ());
    match outcome {
        Ok(count) => assert!(count >= 0, "Should have non-negative count"),
        Err(_) => {}
    }
}
/// A junk `.vol.tmp` file present before a checkpoint must not disturb recovery
/// or show up in the volume listing.
///
/// Ten rows are inserted, the temp file is written into the table's volume
/// directory (created on a best-effort basis), and `PRAGMA CHECKPOINT` runs.
/// After reopening, all ten rows must be present and no path returned by
/// `find_volume_files` may end in `.tmp`.
#[test]
fn test_crash_leftover_tmp_volume_file() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());

    {
        let conn = Database::open(&dsn).unwrap();
        conn.execute(
            "CREATE TABLE tmp_test (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
            (),
        )
        .unwrap();
        for id in 1..=10 {
            let sql = format!(
                "INSERT INTO tmp_test (id, value) VALUES ({}, 'row_{}')",
                id, id
            );
            conn.execute(&sql, ()).unwrap();
        }

        // Plant the stray file while the database is still open.
        let vol_dir = db_path.join("volumes").join("tmp_test");
        let _ = fs::create_dir_all(&vol_dir);
        let stray = vol_dir.join("vol_ffffffffffffffff.vol.tmp");
        fs::write(stray, b"leftover garbage data from crashed seal write").unwrap();

        conn.execute("PRAGMA CHECKPOINT", ()).unwrap();
    }
    remove_lock_file(&db_path);

    let reopened = Database::open(&dsn).unwrap();
    let row_count: i64 = reopened
        .query_one("SELECT COUNT(*) FROM tmp_test", ())
        .unwrap();
    assert_eq!(row_count, 10, "All 10 rows should be present");

    let listed = find_volume_files(&db_path, "tmp_test");
    for path in &listed {
        let file_name = path.file_name().unwrap().to_string_lossy();
        let is_tmp = file_name.ends_with(".tmp");
        assert!(!is_tmp, "No .tmp files should be in volume listing");
    }
}
/// Opening must succeed when the manifest is missing after two checkpoints.
///
/// Two batches of ten rows are each followed by `PRAGMA CHECKPOINT`; the manifest
/// is then removed if `find_manifest_file` returns one. The count query may fail;
/// a successful result must be non-negative.
#[test]
fn test_crash_after_volume_write_before_manifest() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());

    let conn = Database::open(&dsn).unwrap();
    conn.execute(
        "CREATE TABLE crash_manifest (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
        (),
    )
    .unwrap();

    (1..=10).for_each(|id: i32| {
        let sql = format!(
            "INSERT INTO crash_manifest (id, value) VALUES ({}, 'batch1_{}')",
            id, id
        );
        conn.execute(&sql, ()).unwrap();
    });
    conn.execute("PRAGMA CHECKPOINT", ()).unwrap();

    (11..=20).for_each(|id: i32| {
        let sql = format!(
            "INSERT INTO crash_manifest (id, value) VALUES ({}, 'batch2_{}')",
            id, id
        );
        conn.execute(&sql, ()).unwrap();
    });
    conn.execute("PRAGMA CHECKPOINT", ()).unwrap();
    drop(conn);
    remove_lock_file(&db_path);

    if let Some(manifest_path) = find_manifest_file(&db_path, "crash_manifest") {
        fs::remove_file(&manifest_path).unwrap();
    }

    let reopened = Database::open(&dsn).unwrap();
    let outcome: Result<i64, _> = reopened.query_one("SELECT COUNT(*) FROM crash_manifest", ());
    // An error here is acceptable; only a successful count is checked.
    if let Ok(count) = outcome {
        assert!(count >= 0, "Should have non-negative count, got {}", count);
    }
}
/// Opening must succeed when the first volume file is truncated to 50 bytes.
///
/// Fifteen rows are checkpointed and ten more written afterwards. The first path
/// returned by `find_volume_files` is cut to its first 50 bytes when it is longer
/// than that. The count query may fail; a successful result must be non-negative.
#[test]
fn test_crash_partial_volume_file() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());

    let conn = Database::open(&dsn).unwrap();
    conn.execute(
        "CREATE TABLE partial_vol (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
        (),
    )
    .unwrap();
    for id in 1..=15 {
        let sql = format!(
            "INSERT INTO partial_vol (id, value) VALUES ({}, 'row_{}')",
            id, id
        );
        conn.execute(&sql, ()).unwrap();
    }
    conn.execute("PRAGMA CHECKPOINT", ()).unwrap();
    for id in 16..=25 {
        let sql = format!(
            "INSERT INTO partial_vol (id, value) VALUES ({}, 'post_{}')",
            id, id
        );
        conn.execute(&sql, ()).unwrap();
    }
    drop(conn);
    remove_lock_file(&db_path);

    // Truncate only the first listed volume, and only if it is long enough.
    let volumes = find_volume_files(&db_path, "partial_vol");
    if let Some(first_volume) = volumes.first() {
        let contents = fs::read(first_volume).unwrap();
        if contents.len() > 50 {
            fs::write(first_volume, &contents[..50]).unwrap();
        }
    }

    let reopened = Database::open(&dsn).unwrap();
    let outcome: Result<i64, _> = reopened.query_one("SELECT COUNT(*) FROM partial_vol", ());
    if let Ok(count) = outcome {
        assert!(count >= 0, "Should have non-negative count");
    }
}
/// Filtering on an indexed column must return correct counts after a
/// checkpoint and reopen.
///
/// Twenty rows alternate between the categories `odd` and `even`, an index on
/// `category` is created, and `PRAGMA CHECKPOINT` must produce volume files.
/// After reopening, the total is 20 and each category has ten rows.
#[test]
fn test_ddl_create_index_survives_checkpoint_recovery() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());

    {
        let conn = Database::open(&dsn).unwrap();
        conn.execute(
            "CREATE TABLE idx_test (id INTEGER PRIMARY KEY, category TEXT NOT NULL, amount FLOAT)",
            (),
        )
        .unwrap();

        for id in 1..=20 {
            let category = match id % 2 {
                0 => "even",
                _ => "odd",
            };
            let amount = id as f64 * 1.5;
            let sql = format!(
                "INSERT INTO idx_test (id, category, amount) VALUES ({}, '{}', {})",
                id, category, amount
            );
            conn.execute(&sql, ()).unwrap();
        }

        conn.execute("CREATE INDEX idx_cat ON idx_test(category)", ())
            .unwrap();
        conn.execute("PRAGMA CHECKPOINT", ()).unwrap();

        let volumes = find_volume_files(&db_path, "idx_test");
        assert!(
            !volumes.is_empty(),
            "Checkpoint should create volume files"
        );
    }
    remove_lock_file(&db_path);

    let reopened = Database::open(&dsn).unwrap();
    let total: i64 = reopened
        .query_one("SELECT COUNT(*) FROM idx_test", ())
        .unwrap();
    assert_eq!(total, 20, "All 20 rows should be recovered from volumes");

    let evens: i64 = reopened
        .query_one("SELECT COUNT(*) FROM idx_test WHERE category = 'even'", ())
        .unwrap();
    assert_eq!(evens, 10, "Should find 10 'even' rows via index");

    let odds: i64 = reopened
        .query_one("SELECT COUNT(*) FROM idx_test WHERE category = 'odd'", ())
        .unwrap();
    assert_eq!(odds, 10, "Should find 10 'odd' rows via index");
}
/// A sequence of CREATE TABLE, CREATE INDEX and DROP TABLE must be reflected
/// after a reopen.
///
/// The DSN carries no options and the test issues no explicit checkpoint.
/// After reopening, `ddl_t1` must hold five rows and answer a lookup on its
/// indexed `name` column, and `ddl_t2` must no longer be queryable.
#[test]
fn test_ddl_multiple_operations_recovery() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());

    let conn = Database::open(&dsn).unwrap();
    conn.execute(
        "CREATE TABLE ddl_t1 (id INTEGER PRIMARY KEY, name TEXT NOT NULL)",
        (),
    )
    .unwrap();
    for id in 1..=5 {
        let sql = format!("INSERT INTO ddl_t1 (id, name) VALUES ({}, 'name_{}')", id, id);
        conn.execute(&sql, ()).unwrap();
    }

    conn.execute(
        "CREATE TABLE ddl_t2 (id INTEGER PRIMARY KEY, data TEXT)",
        (),
    )
    .unwrap();
    for id in 1..=3 {
        let sql = format!("INSERT INTO ddl_t2 (id, data) VALUES ({}, 'data_{}')", id, id);
        conn.execute(&sql, ()).unwrap();
    }

    conn.execute("CREATE INDEX idx_name ON ddl_t1(name)", ())
        .unwrap();
    conn.execute("DROP TABLE ddl_t2", ()).unwrap();
    drop(conn);
    remove_lock_file(&db_path);

    let reopened = Database::open(&dsn).unwrap();
    let t1_rows: i64 = reopened
        .query_one("SELECT COUNT(*) FROM ddl_t1", ())
        .unwrap();
    assert_eq!(t1_rows, 5, "ddl_t1 should have 5 rows");

    let matches: i64 = reopened
        .query_one("SELECT COUNT(*) FROM ddl_t1 WHERE name = 'name_3'", ())
        .unwrap();
    assert_eq!(matches, 1, "Should find 1 row via index lookup");

    let t2_outcome: Result<i64, _> = reopened.query_one("SELECT COUNT(*) FROM ddl_t2", ());
    assert!(
        t2_outcome.is_err(),
        "ddl_t2 should not exist after DROP TABLE was replayed"
    );
}
/// A view defined before a checkpoint must be queryable after a reopen.
///
/// `view_base` holds ten rows with scores 10, 20, ..., 100. The view
/// `high_scores` keeps scores of at least 50 and upper-cases `name`, so it must
/// yield six rows and `USER_5` for id 5.
#[test]
fn test_view_survives_checkpoint_recovery() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());

    {
        let conn = Database::open(&dsn).unwrap();
        conn.execute(
            "CREATE TABLE view_base (id INTEGER PRIMARY KEY, name TEXT NOT NULL, score INTEGER)",
            (),
        )
        .unwrap();
        for id in 1..=10 {
            let score = id * 10;
            let sql = format!(
                "INSERT INTO view_base (id, name, score) VALUES ({}, 'user_{}', {})",
                id, id, score
            );
            conn.execute(&sql, ()).unwrap();
        }
        conn.execute(
            "CREATE VIEW high_scores AS SELECT id, UPPER(name) AS name, score FROM view_base WHERE score >= 50",
            (),
        )
        .unwrap();
        conn.execute("PRAGMA CHECKPOINT", ()).unwrap();

        let volumes = find_volume_files(&db_path, "view_base");
        assert!(
            !volumes.is_empty(),
            "Checkpoint should create volume files"
        );
    }
    remove_lock_file(&db_path);

    let reopened = Database::open(&dsn).unwrap();
    let rows_in_view: i64 = reopened
        .query_one("SELECT COUNT(*) FROM high_scores", ())
        .unwrap();
    assert_eq!(
        rows_in_view, 6,
        "high_scores view should return 6 rows (scores 50-100)"
    );

    let upper_name: String = reopened
        .query_one("SELECT name FROM high_scores WHERE id = 5", ())
        .unwrap();
    assert_eq!(upper_name, "USER_5", "UPPER() should be applied in the view");
}
/// After a view is dropped and recreated with a different definition, a reopen
/// must expose the second definition.
///
/// `vdr_base` has six rows, of which ids 1-3 are active. The recreated view
/// filters on `active = 1` and upper-cases `name`, so it must return three rows
/// and `USER_1` for id 1. No explicit checkpoint is issued.
#[test]
fn test_view_drop_and_recreate_durability() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());

    let conn = Database::open(&dsn).unwrap();
    conn.execute(
        "CREATE TABLE vdr_base (id INTEGER PRIMARY KEY, name TEXT NOT NULL, active INTEGER)",
        (),
    )
    .unwrap();
    for id in 1..=6 {
        // 1 for the first three ids, 0 for the rest.
        let active = i32::from(id <= 3);
        let sql = format!(
            "INSERT INTO vdr_base (id, name, active) VALUES ({}, 'user_{}', {})",
            id, id, active
        );
        conn.execute(&sql, ()).unwrap();
    }

    // First definition, dropped immediately and replaced.
    conn.execute("CREATE VIEW vdr_view AS SELECT * FROM vdr_base", ())
        .unwrap();
    conn.execute("DROP VIEW vdr_view", ()).unwrap();
    conn.execute(
        "CREATE VIEW vdr_view AS SELECT id, UPPER(name) AS name FROM vdr_base WHERE active = 1",
        (),
    )
    .unwrap();
    drop(conn);
    remove_lock_file(&db_path);

    let reopened = Database::open(&dsn).unwrap();
    let rows_in_view: i64 = reopened
        .query_one("SELECT COUNT(*) FROM vdr_view", ())
        .unwrap();
    assert_eq!(rows_in_view, 3, "View should return only 3 active users");

    let upper_name: String = reopened
        .query_one("SELECT name FROM vdr_view WHERE id = 1", ())
        .unwrap();
    assert_eq!(upper_name, "USER_1", "New view definition should apply UPPER()");
}
/// Rows inserted in a transaction that is never committed must not appear
/// after a checkpoint and reopen.
///
/// Ten rows are inserted directly. Five more go through a transaction that is
/// still open when `PRAGMA CHECKPOINT` runs and is then dropped without a
/// commit. Only the first ten rows may be present after reopening.
#[test]
fn test_uncommitted_data_not_in_checkpoint() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());

    {
        let conn = Database::open(&dsn).unwrap();
        conn.execute(
            "CREATE TABLE uncommit_test (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
            (),
        )
        .unwrap();
        for id in 1..=10 {
            let sql = format!(
                "INSERT INTO uncommit_test (id, value) VALUES ({}, 'committed_{}')",
                id, id
            );
            conn.execute(&sql, ()).unwrap();
        }

        let mut pending = conn.begin().unwrap();
        for id in 11..=15 {
            let sql = format!(
                "INSERT INTO uncommit_test (id, value) VALUES ({}, 'uncommitted_{}')",
                id, id
            );
            pending.execute(&sql, ()).unwrap();
        }

        // Checkpoint while the transaction is still open, then abandon it.
        conn.execute("PRAGMA CHECKPOINT", ()).unwrap();
        drop(pending);
    }
    remove_lock_file(&db_path);

    let reopened = Database::open(&dsn).unwrap();
    let row_count: i64 = reopened
        .query_one("SELECT COUNT(*) FROM uncommit_test", ())
        .unwrap();
    assert_eq!(
        row_count, 10,
        "Only 10 committed rows should survive, uncommitted are lost"
    );
}
/// Rows written after the last checkpoint must still be present after a reopen.
///
/// Ten rows precede `PRAGMA CHECKPOINT` and ten follow it; all 20 must be
/// counted after reopening.
#[test]
fn test_data_committed_after_checkpoint_survives() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());

    let conn = Database::open(&dsn).unwrap();
    conn.execute(
        "CREATE TABLE post_snap (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
        (),
    )
    .unwrap();

    (1..=10).for_each(|id: i32| {
        let sql = format!(
            "INSERT INTO post_snap (id, value) VALUES ({}, 'pre_{}')",
            id, id
        );
        conn.execute(&sql, ()).unwrap();
    });
    conn.execute("PRAGMA CHECKPOINT", ()).unwrap();
    (11..=20).for_each(|id: i32| {
        let sql = format!(
            "INSERT INTO post_snap (id, value) VALUES ({}, 'post_{}')",
            id, id
        );
        conn.execute(&sql, ()).unwrap();
    });
    drop(conn);
    remove_lock_file(&db_path);

    let reopened = Database::open(&dsn).unwrap();
    let row_count: i64 = reopened
        .query_one("SELECT COUNT(*) FROM post_snap", ())
        .unwrap();
    assert_eq!(row_count, 20, "All 20 rows should survive (volumes + WAL)");
}
/// A CHECK constraint must still reject out-of-range values after a
/// checkpoint and reopen.
///
/// The column `age` is limited to 0..=150. After reopening, inserts with ages
/// -1 and 200 must fail and an insert with age 50 must succeed.
#[test]
fn test_check_constraint_enforced_after_checkpoint_recovery() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());

    let conn = Database::open(&dsn).unwrap();
    conn.execute(
        "CREATE TABLE check_test (id INTEGER PRIMARY KEY, age INTEGER CHECK (age >= 0 AND age <= 150))",
        (),
    )
    .unwrap();
    for id in 1..=5 {
        let age = id * 20;
        let sql = format!("INSERT INTO check_test (id, age) VALUES ({}, {})", id, age);
        conn.execute(&sql, ()).unwrap();
    }
    conn.execute("PRAGMA CHECKPOINT", ()).unwrap();
    drop(conn);
    remove_lock_file(&db_path);

    let reopened = Database::open(&dsn).unwrap();
    let row_count: i64 = reopened
        .query_one("SELECT COUNT(*) FROM check_test", ())
        .unwrap();
    assert_eq!(row_count, 5, "All 5 rows should be recovered from volumes");

    // Below the lower bound.
    let below_range = reopened.execute("INSERT INTO check_test (id, age) VALUES (100, -1)", ());
    assert!(
        below_range.is_err(),
        "CHECK constraint should reject age = -1 after recovery"
    );

    // Above the upper bound.
    let above_range = reopened.execute("INSERT INTO check_test (id, age) VALUES (101, 200)", ());
    assert!(
        above_range.is_err(),
        "CHECK constraint should reject age = 200 after recovery"
    );

    // A value inside the range is still accepted.
    reopened
        .execute("INSERT INTO check_test (id, age) VALUES (102, 50)", ())
        .unwrap();
}
/// A NOT NULL constraint must still be enforced after a reopen.
///
/// Five rows are written with a plain DSN and no explicit checkpoint. After
/// reopening, an insert with a NULL `name` must fail and a valid insert must
/// succeed.
#[test]
fn test_not_null_constraint_after_recovery() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());

    let conn = Database::open(&dsn).unwrap();
    conn.execute(
        "CREATE TABLE notnull_test (id INTEGER PRIMARY KEY, name TEXT NOT NULL, score INTEGER)",
        (),
    )
    .unwrap();
    for id in 1..=5 {
        let score = id * 10;
        let sql = format!(
            "INSERT INTO notnull_test (id, name, score) VALUES ({}, 'user_{}', {})",
            id, id, score
        );
        conn.execute(&sql, ()).unwrap();
    }
    drop(conn);
    remove_lock_file(&db_path);

    let reopened = Database::open(&dsn).unwrap();
    let row_count: i64 = reopened
        .query_one("SELECT COUNT(*) FROM notnull_test", ())
        .unwrap();
    assert_eq!(row_count, 5, "All 5 rows should be recovered");

    let null_name = reopened.execute(
        "INSERT INTO notnull_test (id, name, score) VALUES (100, NULL, 50)",
        (),
    );
    assert!(
        null_name.is_err(),
        "NOT NULL constraint should reject NULL name after recovery"
    );

    reopened
        .execute(
            "INSERT INTO notnull_test (id, name, score) VALUES (101, 'valid', 60)",
            (),
        )
        .unwrap();
}
/// A UNIQUE index must still reject duplicates after a checkpoint and reopen.
///
/// Five rows with distinct emails are checkpointed. After reopening, inserting
/// an email that already exists must fail and a new email must be accepted.
#[test]
fn test_unique_index_enforced_after_checkpoint_recovery() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());

    let conn = Database::open(&dsn).unwrap();
    conn.execute(
        "CREATE TABLE unique_test (id INTEGER PRIMARY KEY, email TEXT NOT NULL)",
        (),
    )
    .unwrap();
    conn.execute("CREATE UNIQUE INDEX idx_email ON unique_test(email)", ())
        .unwrap();
    for id in 1..=5 {
        let sql = format!(
            "INSERT INTO unique_test (id, email) VALUES ({}, 'user{}@example.com')",
            id, id
        );
        conn.execute(&sql, ()).unwrap();
    }
    conn.execute("PRAGMA CHECKPOINT", ()).unwrap();
    drop(conn);
    remove_lock_file(&db_path);

    let reopened = Database::open(&dsn).unwrap();
    let row_count: i64 = reopened
        .query_one("SELECT COUNT(*) FROM unique_test", ())
        .unwrap();
    assert_eq!(row_count, 5, "All 5 rows should be recovered from volumes");

    let duplicate = reopened.execute(
        "INSERT INTO unique_test (id, email) VALUES (100, 'user1@example.com')",
        (),
    );
    assert!(
        duplicate.is_err(),
        "UNIQUE constraint should reject duplicate email after recovery"
    );

    reopened
        .execute(
            "INSERT INTO unique_test (id, email) VALUES (101, 'new@example.com')",
            (),
        )
        .unwrap();
}
/// Rows written by four concurrent threads must all be present after a reopen.
///
/// Each thread waits on a shared barrier and then inserts 25 rows with ids
/// `t * 25 + i + 1`, giving ids 1..=100 with no overlap. The count is checked
/// before closing and again after reopening, followed by one more insert.
#[test]
fn test_concurrent_writers_recovery() {
    use std::sync::{Arc, Barrier};
    use std::thread;

    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());

    {
        let conn = Database::open(&dsn).unwrap();
        conn.execute(
            "CREATE TABLE conc_test (id INTEGER PRIMARY KEY, thread_id INTEGER, value TEXT)",
            (),
        )
        .unwrap();

        // All four writers are spawned before any is joined.
        let start_gate = Arc::new(Barrier::new(4));
        let workers: Vec<_> = (0..4u32)
            .map(|t| {
                let worker_db = conn.clone();
                let gate = Arc::clone(&start_gate);
                thread::spawn(move || {
                    gate.wait();
                    for i in 0..25u32 {
                        let id = t * 25 + i + 1;
                        let sql = format!(
                            "INSERT INTO conc_test (id, thread_id, value) VALUES ({}, {}, 'row_{}')",
                            id, t, id
                        );
                        worker_db.execute(&sql, ()).unwrap();
                    }
                })
            })
            .collect();
        for worker in workers {
            worker.join().unwrap();
        }

        let live_count: i64 = conn
            .query_one("SELECT COUNT(*) FROM conc_test", ())
            .unwrap();
        assert_eq!(live_count, 100);
    }
    remove_lock_file(&db_path);

    let reopened = Database::open(&dsn).unwrap();
    let recovered: i64 = reopened
        .query_one("SELECT COUNT(*) FROM conc_test", ())
        .unwrap();
    assert_eq!(recovered, 100, "All 100 concurrent rows should be recovered");

    reopened
        .execute(
            "INSERT INTO conc_test (id, thread_id, value) VALUES (9999, 0, 'post_recovery')",
            (),
        )
        .unwrap();
}
/// Concurrent writers followed by bit flips in the newest WAL file.
///
/// Four threads insert 25 rows each into `conc_corrupt`. Once the handle is
/// dropped, the `flip_bit` helper is applied inside the payload of up to three
/// entries taken from the middle of the newest WAL file (only when more than
/// four entries were located). Reopening the database must succeed. If the
/// table can be queried, the row count must not exceed the 100 rows that were
/// written and the table must accept a new row.
#[test]
fn test_concurrent_writers_with_wal_corruption() {
    use std::sync::{Arc, Barrier};
    use std::thread;
    let dir = tempdir().unwrap();
    let db_path = dir.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());
    {
        let db = Database::open(&dsn).unwrap();
        db.execute(
            "CREATE TABLE conc_corrupt (id INTEGER PRIMARY KEY, value TEXT)",
            (),
        )
        .unwrap();
        let barrier = Arc::new(Barrier::new(4));
        let mut handles = Vec::new();
        for t in 0..4u32 {
            let db_clone = db.clone();
            let bar = Arc::clone(&barrier);
            handles.push(thread::spawn(move || {
                bar.wait();
                for i in 0..25u32 {
                    let id = t * 25 + i + 1;
                    db_clone
                        .execute(
                            &format!(
                                "INSERT INTO conc_corrupt (id, value) VALUES ({}, 'data_{}')",
                                id, id
                            ),
                            (),
                        )
                        .unwrap();
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }
    }
    remove_lock_file(&db_path);
    let wal_files = find_wal_files(&db_path);
    assert!(!wal_files.is_empty());
    let wal_path = &wal_files[wal_files.len() - 1];
    let mut data = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&data);
    if entries.len() > 4 {
        let mid = entries.len() / 2;
        // `take(3)` already stops early when fewer than three entries remain.
        for entry in entries.iter().skip(mid).take(3) {
            if entry.data_offset + 4 < data.len() {
                flip_bit(&mut data, entry.data_offset + 2, 3);
            }
        }
        fs::write(wal_path, &data).unwrap();
    }
    let db = Database::open(&dsn).unwrap();
    let result: Result<i64, _> = db.query_one("SELECT COUNT(*) FROM conc_corrupt", ());
    // A query error is deliberately tolerated here; only a successful query is
    // checked further.
    if let Ok(count) = result {
        // `count >= 0` alone can never fail for COUNT(*); also require that
        // recovery did not produce more rows than were ever inserted.
        assert!(
            (0..=100).contains(&count),
            "Recovered row count must be between 0 and 100, got {}",
            count
        );
        db.execute(
            "INSERT INTO conc_corrupt (id, value) VALUES (9999, 'post_recovery')",
            (),
        )
        .unwrap();
    }
}
/// Two rounds of four concurrent writers separated by `PRAGMA CHECKPOINT`.
///
/// Round one writes ids 1..=100 (phase 1), round two writes ids 101..=200
/// (phase 2). After close/reopen all 200 rows must be present and the table
/// must accept a further insert.
#[test]
fn test_concurrent_writers_checkpoint_recovery() {
    use std::sync::{Arc, Barrier};
    use std::thread;

    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());

    {
        let db = Database::open(&dsn).unwrap();
        db.execute(
            "CREATE TABLE conc_snap (id INTEGER PRIMARY KEY, phase INTEGER, value TEXT)",
            (),
        )
        .unwrap();

        // Round one: rows written before the checkpoint.
        let gate_one = Arc::new(Barrier::new(4));
        let round_one: Vec<_> = (0..4u32)
            .map(|t| {
                let handle = db.clone();
                let gate = Arc::clone(&gate_one);
                thread::spawn(move || {
                    gate.wait();
                    for i in 0..25u32 {
                        let id = t * 25 + i + 1;
                        let sql = format!(
                            "INSERT INTO conc_snap (id, phase, value) VALUES ({}, 1, 'p1_{}')",
                            id, id
                        );
                        handle.execute(&sql, ()).unwrap();
                    }
                })
            })
            .collect();
        for worker in round_one {
            worker.join().unwrap();
        }

        db.execute("PRAGMA CHECKPOINT", ()).unwrap();

        // Round two: rows written after the checkpoint, ids offset by 100.
        let gate_two = Arc::new(Barrier::new(4));
        let round_two: Vec<_> = (0..4u32)
            .map(|t| {
                let handle = db.clone();
                let gate = Arc::clone(&gate_two);
                thread::spawn(move || {
                    gate.wait();
                    for i in 0..25u32 {
                        let id = 100 + t * 25 + i + 1;
                        let sql = format!(
                            "INSERT INTO conc_snap (id, phase, value) VALUES ({}, 2, 'p2_{}')",
                            id, id
                        );
                        handle.execute(&sql, ()).unwrap();
                    }
                })
            })
            .collect();
        for worker in round_two {
            worker.join().unwrap();
        }

        let live: i64 = db.query_one("SELECT COUNT(*) FROM conc_snap", ()).unwrap();
        assert_eq!(live, 200);
    }
    remove_lock_file(&db_path);

    let reopened = Database::open(&dsn).unwrap();
    let recovered: i64 = reopened
        .query_one("SELECT COUNT(*) FROM conc_snap", ())
        .unwrap();
    assert_eq!(recovered, 200, "All 200 rows should survive (volumes + WAL)");
    reopened
        .execute(
            "INSERT INTO conc_snap (id, phase, value) VALUES (9999, 0, 'post_recovery')",
            (),
        )
        .unwrap();
}
/// One thread auto-commits ten rows while another inserts ten rows inside a
/// transaction that is dropped without `commit`. After close/reopen only the
/// ten auto-committed rows may be visible.
#[test]
fn test_concurrent_transactions_uncommitted_lost() {
    use std::sync::{Arc, Barrier};
    use std::thread;

    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());

    {
        let db = Database::open(&dsn).unwrap();
        db.execute(
            "CREATE TABLE uncommit_test (id INTEGER PRIMARY KEY, source TEXT)",
            (),
        )
        .unwrap();

        let gate = Arc::new(Barrier::new(2));

        // Writer A: ids 1..=10, one statement per insert on the plain handle.
        let committer = {
            let handle = db.clone();
            let gate = Arc::clone(&gate);
            thread::spawn(move || {
                gate.wait();
                (1..=10).for_each(|i| {
                    let sql = format!(
                        "INSERT INTO uncommit_test (id, source) VALUES ({}, 'committed')",
                        i
                    );
                    handle.execute(&sql, ()).unwrap();
                });
            })
        };

        // Writer B: ids 11..=20 inside a transaction that is never committed.
        let abandoner = {
            let handle = db.clone();
            let gate = Arc::clone(&gate);
            thread::spawn(move || {
                gate.wait();
                let mut tx = handle.begin().unwrap();
                (11..=20).for_each(|i| {
                    let sql = format!(
                        "INSERT INTO uncommit_test (id, source) VALUES ({}, 'uncommitted')",
                        i
                    );
                    tx.execute(&sql, ()).unwrap();
                });
                // Dropped on purpose without calling `commit`.
                drop(tx);
            })
        };

        committer.join().unwrap();
        abandoner.join().unwrap();
    }
    remove_lock_file(&db_path);

    let reopened = Database::open(&dsn).unwrap();
    let recovered: i64 = reopened
        .query_one("SELECT COUNT(*) FROM uncommit_test", ())
        .unwrap();
    assert_eq!(
        recovered, 10,
        "Only 10 committed rows should survive, got {}",
        recovered
    );
    reopened
        .execute(
            "INSERT INTO uncommit_test (id, source) VALUES (9999, 'post_recovery')",
            (),
        )
        .unwrap();
}
/// Renames the first WAL file to `*.log.bak` so that no `.log` WAL file is
/// left, then expects the reopened database to still expose all 20 rows and
/// accept a new insert.
#[test]
fn test_rotation_artifacts_bak_file_recovery() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());

    {
        let writer = Database::open(&dsn).unwrap();
        writer
            .execute(
                "CREATE TABLE bak_test (id INTEGER PRIMARY KEY, value TEXT NOT NULL, seq INTEGER)",
                (),
            )
            .unwrap();
        (1..=20).for_each(|n| {
            let sql = format!(
                "INSERT INTO bak_test (id, value, seq) VALUES ({}, 'row_{}', {})",
                n, n, n
            );
            writer.execute(&sql, ()).unwrap();
        });
    }
    remove_lock_file(&db_path);

    let wal_files = find_wal_files(&db_path);
    assert!(!wal_files.is_empty(), "No WAL files found");

    // `with_extension` swaps the trailing "log" for "log.bak".
    let original = wal_files.first().unwrap();
    let backup = original.with_extension("log.bak");
    fs::rename(original, &backup).unwrap();
    assert!(backup.exists());

    let still_there = find_wal_files(&db_path);
    assert!(still_there.is_empty(), "Should have no .log WAL files");

    let reopened = Database::open(&dsn).unwrap();
    let recovered: i64 = reopened
        .query_one("SELECT COUNT(*) FROM bak_test", ())
        .unwrap();
    assert_eq!(recovered, 20, "All 20 rows should be recovered from .bak file");
    reopened
        .execute(
            "INSERT INTO bak_test (id, value, seq) VALUES (999, 'post_recovery', 999)",
            (),
        )
        .unwrap();
}
/// Plants a junk `wal-temp-999.log` file in the WAL directory and expects the
/// next open to keep all 20 rows and to remove the planted file.
#[test]
fn test_rotation_artifacts_temp_file_cleanup() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());

    {
        let writer = Database::open(&dsn).unwrap();
        writer
            .execute(
                "CREATE TABLE temp_test (id INTEGER PRIMARY KEY, value TEXT NOT NULL, seq INTEGER)",
                (),
            )
            .unwrap();
        (1..=20).for_each(|n| {
            let sql = format!(
                "INSERT INTO temp_test (id, value, seq) VALUES ({}, 'row_{}', {})",
                n, n, n
            );
            writer.execute(&sql, ()).unwrap();
        });
    }
    remove_lock_file(&db_path);

    // Simulated leftover from an interrupted rotation.
    let leftover = db_path.join("wal").join("wal-temp-999.log");
    fs::write(&leftover, b"garbage data that should be cleaned up").unwrap();
    assert!(leftover.exists());

    let reopened = Database::open(&dsn).unwrap();
    let recovered: i64 = reopened
        .query_one("SELECT COUNT(*) FROM temp_test", ())
        .unwrap();
    assert_eq!(
        recovered, 20,
        "All 20 rows should survive with temp file present"
    );
    assert!(
        !leftover.exists(),
        "Temp file should be cleaned up during recovery"
    );
}
/// Adds a WAL-named file containing plain text next to the real WAL and
/// expects the reopened database to expose at least one row and to accept a
/// new insert.
#[test]
fn test_multiple_wal_files_newest_corrupt() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());

    {
        let writer = Database::open(&dsn).unwrap();
        writer
            .execute(
                "CREATE TABLE multi_wal (id INTEGER PRIMARY KEY, value TEXT NOT NULL, seq INTEGER)",
                (),
            )
            .unwrap();
        (1..=30).for_each(|n| {
            let sql = format!(
                "INSERT INTO multi_wal (id, value, seq) VALUES ({}, 'row_{}', {})",
                n, n, n
            );
            writer.execute(&sql, ()).unwrap();
        });
    }
    remove_lock_file(&db_path);

    // The file name follows the `wal_*.log` pattern but the content is not a WAL.
    let bogus = db_path
        .join("wal")
        .join("wal_00000001-99999999-lsn-99999.log");
    fs::write(&bogus, b"this is not a valid WAL file at all").unwrap();

    let reopened = Database::open(&dsn).unwrap();
    let recovered: i64 = reopened
        .query_one("SELECT COUNT(*) FROM multi_wal", ())
        .unwrap();
    assert!(
        recovered >= 1,
        "At least some rows should survive from valid WAL file, got {}",
        recovered
    );
    reopened
        .execute(
            "INSERT INTO multi_wal (id, value, seq) VALUES (9999, 'post_recovery', 0)",
            (),
        )
        .unwrap();
}
/// Overwrites bytes 4..24 of `checkpoint.meta` (when the file exists and is
/// longer than 8 bytes) with `0xFF`, then expects the reopened database to
/// expose at least one row and to accept a new insert.
#[test]
fn test_stale_checkpoint_references_missing_wal() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());

    {
        let writer = Database::open(&dsn).unwrap();
        writer
            .execute(
                "CREATE TABLE stale_cp (id INTEGER PRIMARY KEY, value TEXT NOT NULL, seq INTEGER)",
                (),
            )
            .unwrap();
        (1..=20).for_each(|n| {
            let sql = format!(
                "INSERT INTO stale_cp (id, value, seq) VALUES ({}, 'row_{}', {})",
                n, n, n
            );
            writer.execute(&sql, ()).unwrap();
        });
    }
    remove_lock_file(&db_path);

    if let Some(cp_path) = find_checkpoint_file(&db_path) {
        let mut bytes = fs::read(&cp_path).unwrap();
        if bytes.len() > 8 {
            // Leave the first four bytes intact and clobber at most the next 20.
            let end = bytes.len().min(24);
            bytes[4..end].fill(0xFF);
            fs::write(&cp_path, &bytes).unwrap();
        }
    }

    let reopened = Database::open(&dsn).unwrap();
    let recovered: i64 = reopened
        .query_one("SELECT COUNT(*) FROM stale_cp", ())
        .unwrap();
    assert!(
        recovered >= 1,
        "Data should be recovered despite corrupt checkpoint, got {}",
        recovered
    );
    reopened
        .execute(
            "INSERT INTO stale_cp (id, value, seq) VALUES (9999, 'post_recovery', 0)",
            (),
        )
        .unwrap();
}
/// Cuts the newest WAL file in the middle of its last detected entry and
/// hands the fixture to `verify_recovery_at_least` with a minimum of 0.
///
/// NOTE(review): `setup_test_db` and `verify_recovery_at_least` are defined
/// elsewhere in this file; their exact contract is not visible here.
#[test]
fn test_power_loss_truncated_last_entry() {
    let fixture = setup_test_db(20, 1);

    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let newest = wal_files.last().unwrap();

    let bytes = fs::read(newest).unwrap();
    let entries = find_entry_boundaries(&bytes);
    assert!(entries.len() >= 2, "Need at least 2 entries");

    // Keep everything up to the midpoint of the final entry.
    let tail = entries.last().unwrap();
    let keep = tail.offset + tail.total_size / 2;
    fs::write(newest, &bytes[..keep]).unwrap();

    remove_lock_file(&fixture.db_path);
    verify_recovery_at_least(&fixture, "test_data", 0);
}
/// Appends 200 bytes of `0xDE` to the newest WAL file and hands the fixture
/// to `verify_recovery_at_least` with a minimum of 15.
#[test]
fn test_power_loss_garbage_appended() {
    let fixture = setup_test_db(15, 1);

    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let newest = wal_files.last().unwrap();

    let mut bytes = fs::read(newest).unwrap();
    // Grow the buffer by 200 filler bytes that do not form a WAL entry.
    let padded_len = bytes.len() + 200;
    bytes.resize(padded_len, 0xDE);
    fs::write(newest, &bytes).unwrap();

    remove_lock_file(&fixture.db_path);
    verify_recovery_at_least(&fixture, "test_data", 15);
}
/// When the newest WAL file is larger than 4096 bytes, passes the index of
/// its last 4096-byte page to the `zero_page` helper, then hands the fixture
/// to `verify_recovery_at_least` with a minimum of 0.
#[test]
fn test_power_loss_last_page_zeroed() {
    const PAGE: usize = 4096;

    let fixture = setup_test_db(20, 1);

    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let newest = wal_files.last().unwrap();

    let mut bytes = fs::read(newest).unwrap();
    let size = bytes.len();
    if size > PAGE {
        // Index of the page that contains the final byte of the file.
        let final_page = (size - 1) / PAGE;
        zero_page(&mut bytes, final_page);
        fs::write(newest, &bytes).unwrap();
    }

    remove_lock_file(&fixture.db_path);
    verify_recovery_at_least(&fixture, "test_data", 0);
}
/// Commits four transactions of five rows each, then truncates the newest WAL
/// file in the middle of the last entry carrying `COMMIT_MARKER_FLAG` (if any
/// was found) and hands the result to `verify_recovery_at_least` with a
/// minimum of 0.
#[test]
fn test_power_loss_commit_marker_torn() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());

    {
        let db = Database::open(&dsn).unwrap();
        db.execute(
            "CREATE TABLE commit_torn (id INTEGER PRIMARY KEY, value TEXT NOT NULL, seq INTEGER)",
            (),
        )
        .unwrap();

        for txn in 0..4 {
            let mut tx = db.begin().unwrap();
            for row in 0..5 {
                let id = txn * 5 + row + 1;
                let sql = format!(
                    "INSERT INTO commit_torn (id, value, seq) VALUES ({}, 'txn{}_row{}', {})",
                    id, txn, row, txn
                );
                tx.execute(&sql, ()).unwrap();
            }
            tx.commit().unwrap();
        }

        let live: i64 = db
            .query_one("SELECT COUNT(*) FROM commit_torn", ())
            .unwrap();
        assert_eq!(live, 20);
    }
    remove_lock_file(&db_path);

    let wal_files = find_wal_files(&db_path);
    assert!(!wal_files.is_empty());
    let newest = wal_files.last().unwrap();
    let bytes = fs::read(newest).unwrap();
    let entries = find_entry_boundaries(&bytes);

    // Scan from the back for the final commit-marker entry.
    let final_commit = entries
        .iter()
        .rev()
        .find(|e| e.flags & COMMIT_MARKER_FLAG != 0);
    if let Some(marker) = final_commit {
        let keep = marker.offset + marker.total_size / 2;
        fs::write(newest, &bytes[..keep]).unwrap();
    }

    let fixture = TestFixture {
        _dir: tmp,
        db_path,
        dsn,
    };
    verify_recovery_at_least(&fixture, "commit_torn", 0);
}
/// Stores ten 63-character values and ten 65-character values, reopens the
/// database and checks that every value comes back byte-for-byte.
///
/// NOTE(review): going by the test name the two lengths presumably straddle a
/// 64-byte WAL compression threshold — confirm against the WAL configuration.
#[test]
fn test_compression_threshold_boundary() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());
    let small_value = "x".repeat(63);
    let large_value = "y".repeat(65);

    {
        let writer = Database::open(&dsn).unwrap();
        writer
            .execute(
                "CREATE TABLE threshold_test (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
                (),
            )
            .unwrap();

        // ids 1..=10 carry the short value, ids 11..=20 the long one.
        for id in 1..=20 {
            let payload = if id <= 10 { &small_value } else { &large_value };
            let sql = format!(
                "INSERT INTO threshold_test (id, value) VALUES ({}, '{}')",
                id, payload
            );
            writer.execute(&sql, ()).unwrap();
        }

        let live: i64 = writer
            .query_one("SELECT COUNT(*) FROM threshold_test", ())
            .unwrap();
        assert_eq!(live, 20);
    }
    remove_lock_file(&db_path);

    let reopened = Database::open(&dsn).unwrap();
    let recovered: i64 = reopened
        .query_one("SELECT COUNT(*) FROM threshold_test", ())
        .unwrap();
    assert_eq!(recovered, 20, "All 20 rows should be recovered");

    let small_sql = format!(
        "SELECT COUNT(*) FROM threshold_test WHERE value = '{}'",
        small_value
    );
    let small_count: i64 = reopened.query_one(&small_sql, ()).unwrap();
    assert_eq!(small_count, 10, "All 10 small rows should match exactly");

    let large_sql = format!(
        "SELECT COUNT(*) FROM threshold_test WHERE value = '{}'",
        large_value
    );
    let large_count: i64 = reopened.query_one(&large_sql, ()).unwrap();
    assert_eq!(large_count, 10, "All 10 large rows should match exactly");
}
/// Writes five rows of 100 000 characters each, then applies the `flip_bit`
/// helper inside the payload of one non-commit WAL entry larger than 100
/// bytes (the middle one of those found, if any).
///
/// Reopening the database must succeed. If the table can be queried, the row
/// count must not exceed the five rows that were written and the table must
/// accept a new row.
#[test]
fn test_large_row_recovery() {
    let dir = tempdir().unwrap();
    let db_path = dir.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());
    let large_text = "L".repeat(100_000);
    {
        let db = Database::open(&dsn).unwrap();
        db.execute(
            "CREATE TABLE large_row (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
            (),
        )
        .unwrap();
        for i in 1..=5 {
            db.execute(
                &format!(
                    "INSERT INTO large_row (id, value) VALUES ({}, '{}')",
                    i, large_text
                ),
                (),
            )
            .unwrap();
        }
        let count: i64 = db.query_one("SELECT COUNT(*) FROM large_row", ()).unwrap();
        assert_eq!(count, 5);
    }
    remove_lock_file(&db_path);
    let wal_files = find_wal_files(&db_path);
    assert!(!wal_files.is_empty());
    let wal_path = &wal_files[wal_files.len() - 1];
    let mut data = fs::read(wal_path).unwrap();
    let entries = find_entry_boundaries(&data);
    // Entries without the commit flag and with more than 100 payload bytes;
    // the size filter keeps `data_offset + 50` inside the chosen entry.
    let dml_entries: Vec<_> = entries
        .iter()
        .filter(|e| e.flags & COMMIT_MARKER_FLAG == 0 && e.entry_size > 100)
        .collect();
    if !dml_entries.is_empty() {
        let target = dml_entries[dml_entries.len() / 2];
        flip_bit(&mut data, target.data_offset + 50, 5);
        fs::write(wal_path, &data).unwrap();
    }
    let db = Database::open(&dsn).unwrap();
    let result: Result<i64, _> = db.query_one("SELECT COUNT(*) FROM large_row", ());
    // A query error is deliberately tolerated here; only a successful query is
    // checked further.
    if let Ok(count) = result {
        // `count >= 0` alone can never fail for COUNT(*); also require that
        // recovery did not produce more rows than were ever inserted.
        assert!(
            (0..=5).contains(&count),
            "Recovered row count must be between 0 and 5, got {}",
            count
        );
        db.execute(
            "INSERT INTO large_row (id, value) VALUES (9999, 'post_recovery')",
            (),
        )
        .unwrap();
    }
}
/// Writes 1000 small rows in ten committed transactions of 100 rows, passes
/// the CRC field of the middle WAL entry to the `zero_range` helper (when
/// more than ten entries were found) and expects at least 900 rows to be
/// visible after reopening.
#[test]
fn test_many_small_rows_recovery() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());

    {
        let db = Database::open(&dsn).unwrap();
        db.execute(
            "CREATE TABLE many_rows (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
            (),
        )
        .unwrap();

        for batch in 0..10 {
            let mut tx = db.begin().unwrap();
            for slot in 1..=100 {
                // ids run consecutively from 1 to 1000 across all batches.
                let id = batch * 100 + slot;
                let sql = format!(
                    "INSERT INTO many_rows (id, value) VALUES ({}, 'small_{}')",
                    id, id
                );
                tx.execute(&sql, ()).unwrap();
            }
            tx.commit().unwrap();
        }

        let live: i64 = db.query_one("SELECT COUNT(*) FROM many_rows", ()).unwrap();
        assert_eq!(live, 1000);
    }
    remove_lock_file(&db_path);

    let wal_files = find_wal_files(&db_path);
    assert!(!wal_files.is_empty());
    let newest = wal_files.last().unwrap();
    let mut bytes = fs::read(newest).unwrap();
    let entries = find_entry_boundaries(&bytes);
    if entries.len() > 10 {
        let victim = &entries[entries.len() / 2];
        zero_range(&mut bytes, victim.crc_offset, CRC_SIZE);
        fs::write(newest, &bytes).unwrap();
    }

    let reopened = Database::open(&dsn).unwrap();
    let recovered: i64 = reopened
        .query_one("SELECT COUNT(*) FROM many_rows", ())
        .unwrap();
    assert!(
        recovered >= 900,
        "At least 900 of 1000 rows should survive, got {}",
        recovered
    );
    reopened
        .execute(
            "INSERT INTO many_rows (id, value) VALUES (9999, 'post_recovery')",
            (),
        )
        .unwrap();
}
/// Mixes 5-, 200- and 50 000-character values around a `PRAGMA CHECKPOINT`
/// and checks total and per-category row counts after close/reopen.
///
/// Before the checkpoint: ids 1..=5 tiny, 6..=10 medium, 11..=13 large.
/// After the checkpoint: ids 14..=18 tiny, 19..=23 medium.
#[test]
fn test_mixed_size_rows_with_checkpoint() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());
    let tiny = "t".repeat(5);
    let medium = "m".repeat(200);
    let large = "L".repeat(50_000);

    {
        let db = Database::open(&dsn).unwrap();
        db.execute(
            "CREATE TABLE mixed_size (id INTEGER PRIMARY KEY, category TEXT, value TEXT NOT NULL)",
            (),
        )
        .unwrap();

        (1..=5).for_each(|id| {
            let sql = format!(
                "INSERT INTO mixed_size (id, category, value) VALUES ({}, 'tiny', '{}')",
                id, tiny
            );
            db.execute(&sql, ()).unwrap();
        });
        (6..=10).for_each(|id| {
            let sql = format!(
                "INSERT INTO mixed_size (id, category, value) VALUES ({}, 'medium', '{}')",
                id, medium
            );
            db.execute(&sql, ()).unwrap();
        });
        (11..=13).for_each(|id| {
            let sql = format!(
                "INSERT INTO mixed_size (id, category, value) VALUES ({}, 'large', '{}')",
                id, large
            );
            db.execute(&sql, ()).unwrap();
        });

        db.execute("PRAGMA CHECKPOINT", ()).unwrap();

        (14..=18).for_each(|id| {
            let sql = format!(
                "INSERT INTO mixed_size (id, category, value) VALUES ({}, 'tiny', '{}')",
                id, tiny
            );
            db.execute(&sql, ()).unwrap();
        });
        (19..=23).for_each(|id| {
            let sql = format!(
                "INSERT INTO mixed_size (id, category, value) VALUES ({}, 'medium', '{}')",
                id, medium
            );
            db.execute(&sql, ()).unwrap();
        });

        let live: i64 = db.query_one("SELECT COUNT(*) FROM mixed_size", ()).unwrap();
        assert_eq!(live, 23);
    }
    remove_lock_file(&db_path);

    let reopened = Database::open(&dsn).unwrap();
    let recovered: i64 = reopened
        .query_one("SELECT COUNT(*) FROM mixed_size", ())
        .unwrap();
    assert_eq!(recovered, 23, "All 23 mixed-size rows should be recovered");

    let tiny_count: i64 = reopened
        .query_one(
            "SELECT COUNT(*) FROM mixed_size WHERE category = 'tiny'",
            (),
        )
        .unwrap();
    assert_eq!(tiny_count, 10, "10 tiny rows expected");

    let medium_count: i64 = reopened
        .query_one(
            "SELECT COUNT(*) FROM mixed_size WHERE category = 'medium'",
            (),
        )
        .unwrap();
    assert_eq!(medium_count, 10, "10 medium rows expected");

    let large_count: i64 = reopened
        .query_one(
            "SELECT COUNT(*) FROM mixed_size WHERE category = 'large'",
            (),
        )
        .unwrap();
    assert_eq!(large_count, 3, "3 large rows expected");

    reopened
        .execute(
            "INSERT INTO mixed_size (id, category, value) VALUES (9999, 'post', 'recovery')",
            (),
        )
        .unwrap();
}
/// Inserts three batches of rows with a `PRAGMA CHECKPOINT` after the first
/// and the second batch, comparing the WAL directory after each checkpoint.
///
/// Between the two checkpoints the first WAL file name must change (when
/// both listings are non-empty) and the total WAL size must not grow. After
/// close/reopen all 50 rows must be present.
#[test]
fn test_wal_truncation_and_replay() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());

    {
        let db = Database::open(&dsn).unwrap();
        db.execute(
            "CREATE TABLE trunc_test (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
            (),
        )
        .unwrap();

        (1..=20).for_each(|id| {
            let sql = format!(
                "INSERT INTO trunc_test (id, value) VALUES ({}, 'phase1_{}')",
                id, id
            );
            db.execute(&sql, ()).unwrap();
        });
        // The checkpoint result is intentionally ignored, as in the other
        // truncation tests.
        let _ = db.execute("PRAGMA CHECKPOINT", ());
        let files_after_first = find_wal_files(&db_path);
        let size_after_first = total_wal_size(&db_path);

        (21..=40).for_each(|id| {
            let sql = format!(
                "INSERT INTO trunc_test (id, value) VALUES ({}, 'phase2_{}')",
                id, id
            );
            db.execute(&sql, ()).unwrap();
        });
        let _ = db.execute("PRAGMA CHECKPOINT", ());
        let files_after_second = find_wal_files(&db_path);
        let size_after_second = total_wal_size(&db_path);

        if let (Some(before), Some(after)) =
            (files_after_first.first(), files_after_second.first())
        {
            assert_ne!(
                before.file_name(),
                after.file_name(),
                "WAL filename should change after truncation"
            );
        }
        assert!(
            size_after_second <= size_after_first,
            "WAL should not grow after truncation: before={}, after={}",
            size_after_first,
            size_after_second
        );

        (41..=50).for_each(|id| {
            let sql = format!(
                "INSERT INTO trunc_test (id, value) VALUES ({}, 'phase3_{}')",
                id, id
            );
            db.execute(&sql, ()).unwrap();
        });

        let live: i64 = db.query_one("SELECT COUNT(*) FROM trunc_test", ()).unwrap();
        assert_eq!(live, 50);
    }
    remove_lock_file(&db_path);

    let reopened = Database::open(&dsn).unwrap();
    let recovered: i64 = reopened
        .query_one("SELECT COUNT(*) FROM trunc_test", ())
        .unwrap();
    assert_eq!(
        recovered, 50,
        "All 50 rows should be recovered (snapshot + truncated WAL)"
    );
    reopened
        .execute(
            "INSERT INTO trunc_test (id, value) VALUES (9999, 'post_recovery')",
            (),
        )
        .unwrap();
}
/// Writes 40 rows across two checkpoints plus ten more afterwards, then
/// passes up to 256 bytes starting at the midpoint of the newest WAL file to
/// the `zero_range` helper. After reopening, at least the 40 rows written
/// before the second checkpoint must be visible.
#[test]
fn test_wal_truncation_then_corrupt_new_wal() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());

    {
        let db = Database::open(&dsn).unwrap();
        db.execute(
            "CREATE TABLE trunc_corrupt (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
            (),
        )
        .unwrap();

        (1..=20).for_each(|id| {
            let sql = format!(
                "INSERT INTO trunc_corrupt (id, value) VALUES ({}, 'p1_{}')",
                id, id
            );
            db.execute(&sql, ()).unwrap();
        });
        let _ = db.execute("PRAGMA CHECKPOINT", ());

        (21..=40).for_each(|id| {
            let sql = format!(
                "INSERT INTO trunc_corrupt (id, value) VALUES ({}, 'p2_{}')",
                id, id
            );
            db.execute(&sql, ()).unwrap();
        });
        let _ = db.execute("PRAGMA CHECKPOINT", ());

        // These ten rows are written after the last checkpoint.
        (41..=50).for_each(|id| {
            let sql = format!(
                "INSERT INTO trunc_corrupt (id, value) VALUES ({}, 'p3_{}')",
                id, id
            );
            db.execute(&sql, ()).unwrap();
        });
    }
    remove_lock_file(&db_path);

    let wal_files = find_wal_files(&db_path);
    assert!(!wal_files.is_empty());
    let newest = wal_files.last().unwrap();
    let mut bytes = fs::read(newest).unwrap();
    if bytes.len() > 64 {
        let start = bytes.len() / 2;
        let span = (bytes.len() - start).min(256);
        zero_range(&mut bytes, start, span);
        fs::write(newest, &bytes).unwrap();
    }

    let reopened = Database::open(&dsn).unwrap();
    let recovered: i64 = reopened
        .query_one("SELECT COUNT(*) FROM trunc_corrupt", ())
        .unwrap();
    assert!(
        recovered >= 40,
        "Snapshot should provide at least 40 rows, got {}",
        recovered
    );
    reopened
        .execute(
            "INSERT INTO trunc_corrupt (id, value) VALUES (9999, 'post_recovery')",
            (),
        )
        .unwrap();
}
/// Overwrites the 4-byte little-endian size field (header offset 24) of the
/// middle WAL entry with 70 MiB, which is above the 64 MiB bound that
/// `find_entry_boundaries` applies, and hands the fixture to
/// `verify_recovery_at_least` with a minimum of 0.
#[test]
fn test_wal_entry_size_exceeds_sanity_limit() {
    let fixture = setup_test_db(10, 1);

    let wal_files = find_wal_files(&fixture.db_path);
    assert!(!wal_files.is_empty());
    let newest = wal_files.last().unwrap();

    let mut bytes = fs::read(newest).unwrap();
    let entries = find_entry_boundaries(&bytes);
    assert!(entries.len() >= 2, "Need at least 2 entries");

    let victim = &entries[entries.len() / 2];
    let oversized = (70u32 * 1024 * 1024).to_le_bytes();
    let field = victim.offset + 24;
    bytes[field..field + oversized.len()].copy_from_slice(&oversized);
    fs::write(newest, &bytes).unwrap();

    remove_lock_file(&fixture.db_path);
    verify_recovery_at_least(&fixture, "test_data", 0);
}
/// Writes 500 rows of 500 generated characters each, requires the WAL to
/// exceed 100 000 bytes, and checks after reopening that all rows are present
/// and that row 1 holds exactly the generated value.
///
/// NOTE(review): each batch opens a transaction `tx`, but the inserts are
/// issued through `db`, not `tx`, so `tx.commit()` commits a transaction that
/// received no statements. `test_many_small_rows_recovery` uses `tx.execute`
/// for the same loop shape — confirm which form is intended here.
#[test]
fn test_large_wal_full_replay() {
    const CHARS: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";

    // Deterministic 500-character value derived from the row id.
    let pattern = |row_id: i32| -> String {
        let modulus = CHARS.len() as i32;
        (0..500)
            .map(|j| CHARS[((row_id * 7 + j * 13 + 37) % modulus) as usize] as char)
            .collect()
    };

    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());

    {
        let db = Database::open(&dsn).unwrap();
        db.execute(
            "CREATE TABLE large_wal (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
            (),
        )
        .unwrap();

        for batch in 0..10 {
            let mut tx = db.begin().unwrap();
            for slot in 1..=50 {
                // ids run consecutively from 1 to 500 across all batches.
                let id: i32 = batch * 50 + slot;
                let value = pattern(id);
                let sql = format!(
                    "INSERT INTO large_wal (id, value) VALUES ({}, '{}')",
                    id, value
                );
                db.execute(&sql, ()).unwrap();
            }
            tx.commit().unwrap();
        }

        let live: i64 = db.query_one("SELECT COUNT(*) FROM large_wal", ()).unwrap();
        assert_eq!(live, 500);
    }
    remove_lock_file(&db_path);

    let wal_bytes = total_wal_size(&db_path);
    assert!(
        wal_bytes > 100_000,
        "WAL should be > 100KB with high-entropy data, got {} bytes",
        wal_bytes
    );

    let reopened = Database::open(&dsn).unwrap();
    let recovered: i64 = reopened
        .query_one("SELECT COUNT(*) FROM large_wal", ())
        .unwrap();
    assert_eq!(
        recovered, 500,
        "All 500 rows should be recovered from large WAL"
    );

    let row1_value: String = reopened
        .query_one("SELECT value FROM large_wal WHERE id = 1", ())
        .unwrap();
    let expected = pattern(1);
    assert_eq!(row1_value, expected, "Row 1 value should match exactly");

    reopened
        .execute(
            "INSERT INTO large_wal (id, value) VALUES (9999, 'post_recovery')",
            (),
        )
        .unwrap();
}
/// Opens the database with `wal_max_size=500`, inserts 50 rows, requires at
/// least two WAL files on disk and expects all 50 rows after reopening.
#[test]
fn test_wal_rotation_and_replay() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!(
        "file://{}?wal_max_size=500&checkpoint_on_close=off",
        db_path.display()
    );

    {
        let db = Database::open(&dsn).unwrap();
        db.execute(
            "CREATE TABLE rot_test (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
            (),
        )
        .unwrap();
        (1..=50).for_each(|id| {
            let sql = format!(
                "INSERT INTO rot_test (id, value) VALUES ({}, 'row_{}')",
                id, id
            );
            db.execute(&sql, ()).unwrap();
        });
        let live: i64 = db.query_one("SELECT COUNT(*) FROM rot_test", ()).unwrap();
        assert_eq!(live, 50);
    }

    let segments = find_wal_files(&db_path);
    assert!(
        segments.len() >= 2,
        "Expected multiple WAL files from rotation, got {}",
        segments.len()
    );
    remove_lock_file(&db_path);

    let reopened = Database::open(&dsn).unwrap();
    let recovered: i64 = reopened
        .query_one("SELECT COUNT(*) FROM rot_test", ())
        .unwrap();
    assert_eq!(
        recovered, 50,
        "All 50 rows should be recovered from multi-file WAL replay"
    );
    reopened
        .execute(
            "INSERT INTO rot_test (id, value) VALUES (9999, 'post_recovery')",
            (),
        )
        .unwrap();
}
/// With several WAL files present, passes up to 256 bytes starting at the
/// midpoint of the first file in sorted order to the `zero_range` helper.
/// Reopening must succeed; if the table can be queried, at least one row must
/// be visible. A query error is tolerated.
#[test]
fn test_wal_rotation_oldest_corrupt() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!(
        "file://{}?wal_max_size=500&checkpoint_on_close=off",
        db_path.display()
    );

    {
        let db = Database::open(&dsn).unwrap();
        db.execute(
            "CREATE TABLE rot_oldest (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
            (),
        )
        .unwrap();
        (1..=50).for_each(|id| {
            let sql = format!(
                "INSERT INTO rot_oldest (id, value) VALUES ({}, 'row_{}')",
                id, id
            );
            db.execute(&sql, ()).unwrap();
        });
    }

    let segments = find_wal_files(&db_path);
    assert!(
        segments.len() >= 2,
        "Need multiple WAL files, got {}",
        segments.len()
    );

    let oldest = segments.first().unwrap();
    let mut bytes = fs::read(oldest).unwrap();
    if bytes.len() > 32 {
        let start = bytes.len() / 2;
        let span = (bytes.len() - start).min(256);
        zero_range(&mut bytes, start, span);
        fs::write(oldest, &bytes).unwrap();
    }
    remove_lock_file(&db_path);

    let reopened = Database::open(&dsn).unwrap();
    let outcome: Result<i64, _> = reopened.query_one("SELECT COUNT(*) FROM rot_oldest", ());
    // Only a successful query is checked; an error is accepted as-is.
    if let Ok(count) = outcome {
        assert!(
            count > 0,
            "At least some rows should survive from newer WAL files"
        );
    }
}
/// With several WAL files present, replaces the last file in sorted order
/// with zero bytes of the same length. Reopening must succeed; if the table
/// can be queried, between 1 and 50 rows must be visible. A query error is
/// tolerated.
#[test]
fn test_wal_rotation_newest_corrupt() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!(
        "file://{}?wal_max_size=500&checkpoint_on_close=off",
        db_path.display()
    );

    {
        let db = Database::open(&dsn).unwrap();
        db.execute(
            "CREATE TABLE rot_newest (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
            (),
        )
        .unwrap();
        (1..=50).for_each(|id| {
            let sql = format!(
                "INSERT INTO rot_newest (id, value) VALUES ({}, 'row_{}')",
                id, id
            );
            db.execute(&sql, ()).unwrap();
        });
    }

    let segments = find_wal_files(&db_path);
    assert!(
        segments.len() >= 2,
        "Need multiple WAL files, got {}",
        segments.len()
    );

    // Same length, all zeroes.
    let newest = segments.last().unwrap();
    let size = fs::metadata(newest).unwrap().len() as usize;
    let blank = vec![0u8; size];
    fs::write(newest, blank).unwrap();
    remove_lock_file(&db_path);

    let reopened = Database::open(&dsn).unwrap();
    let outcome: Result<i64, _> = reopened.query_one("SELECT COUNT(*) FROM rot_newest", ());
    // Only a successful query is checked; an error is accepted as-is.
    if let Ok(count) = outcome {
        assert!(
            count > 0,
            "At least some rows should survive from older WAL files, got {}",
            count
        );
        assert!(count <= 50);
    }
}
/// Combines WAL rotation (`wal_max_size=500`) with three checkpoints.
///
/// After the first 50 rows at least two WAL files must exist. The second and
/// third checkpoints must each strictly reduce the value returned by
/// `total_wal_size`. After close/reopen all 90 rows must be present.
#[test]
fn test_wal_rotation_snapshot_cleans_old_files() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!(
        "file://{}?wal_max_size=500&checkpoint_on_close=off",
        db_path.display()
    );

    {
        let db = Database::open(&dsn).unwrap();
        db.execute(
            "CREATE TABLE rot_clean (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
            (),
        )
        .unwrap();

        (1..=50).for_each(|id| {
            let sql = format!(
                "INSERT INTO rot_clean (id, value) VALUES ({}, 'row_{}')",
                id, id
            );
            db.execute(&sql, ()).unwrap();
        });
        let rotated = find_wal_files(&db_path);
        assert!(
            rotated.len() >= 2,
            "Expected multiple WAL files from rotation, got {}",
            rotated.len()
        );
        // First checkpoint; its result is intentionally ignored.
        let _ = db.execute("PRAGMA CHECKPOINT", ());

        (51..=80).for_each(|id| {
            let sql = format!(
                "INSERT INTO rot_clean (id, value) VALUES ({}, 'row_{}')",
                id, id
            );
            db.execute(&sql, ()).unwrap();
        });
        let before_second = total_wal_size(&db_path);
        let _ = db.execute("PRAGMA CHECKPOINT", ());
        let after_second = total_wal_size(&db_path);
        assert!(
            after_second < before_second,
            "WAL size should decrease after 2nd checkpoint: before={}, after={}",
            before_second,
            after_second
        );

        (81..=90).for_each(|id| {
            let sql = format!(
                "INSERT INTO rot_clean (id, value) VALUES ({}, 'row_{}')",
                id, id
            );
            db.execute(&sql, ()).unwrap();
        });
        let before_third = total_wal_size(&db_path);
        let _ = db.execute("PRAGMA CHECKPOINT", ());
        let after_third = total_wal_size(&db_path);
        assert!(
            after_third < before_third,
            "WAL size should decrease after 3rd checkpoint: before={}, after={}",
            before_third,
            after_third
        );

        let live: i64 = db.query_one("SELECT COUNT(*) FROM rot_clean", ()).unwrap();
        assert_eq!(live, 90);
    }
    remove_lock_file(&db_path);

    let reopened = Database::open(&dsn).unwrap();
    let recovered: i64 = reopened
        .query_one("SELECT COUNT(*) FROM rot_clean", ())
        .unwrap();
    assert_eq!(
        recovered, 90,
        "All 90 rows should be recovered after rotation + truncation cleanup"
    );
    reopened
        .execute(
            "INSERT INTO rot_clean (id, value) VALUES (9999, 'post_recovery')",
            (),
        )
        .unwrap();
}
/// Writes 50 rows, checkpoints, writes 30 more, checkpoints again (both
/// checkpoints must succeed), then expects all 80 rows after close/reopen
/// with `wal_max_size=500`.
#[test]
fn test_wal_rotation_cleanup_preserves_boundary_entries() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!(
        "file://{}?wal_max_size=500&checkpoint_on_close=off",
        db_path.display()
    );
    // 50 phase-1 rows plus 30 phase-2 rows.
    let total_rows: i64 = 80;

    {
        let db = Database::open(&dsn).unwrap();
        db.execute(
            "CREATE TABLE rot_boundary (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
            (),
        )
        .unwrap();

        (1..=50).for_each(|id| {
            let sql = format!(
                "INSERT INTO rot_boundary (id, value) VALUES ({}, 'phase1_{}')",
                id, id
            );
            db.execute(&sql, ()).unwrap();
        });
        db.execute("PRAGMA CHECKPOINT", ()).unwrap();

        (51..=80).for_each(|id| {
            let sql = format!(
                "INSERT INTO rot_boundary (id, value) VALUES ({}, 'phase2_{}')",
                id, id
            );
            db.execute(&sql, ()).unwrap();
        });
        db.execute("PRAGMA CHECKPOINT", ()).unwrap();

        let live: i64 = db
            .query_one("SELECT COUNT(*) FROM rot_boundary", ())
            .unwrap();
        assert_eq!(live, total_rows);
    }
    remove_lock_file(&db_path);

    let reopened = Database::open(&dsn).unwrap();
    let recovered: i64 = reopened
        .query_one("SELECT COUNT(*) FROM rot_boundary", ())
        .unwrap();
    assert_eq!(
        recovered, total_rows,
        "All {} rows must survive volume recovery (got {}).",
        total_rows, recovered
    );
    reopened
        .execute(
            "INSERT INTO rot_boundary (id, value) VALUES (9999, 'post_recovery')",
            (),
        )
        .unwrap();
}
/// After rotation has produced several WAL files, deletes `checkpoint.meta`
/// (if present) and drops an empty WAL-named file into the WAL directory.
/// The reopened database must still expose all 50 rows and accept an insert.
#[test]
fn test_wal_rotation_crash_mid_rotate() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!(
        "file://{}?wal_max_size=500&checkpoint_on_close=off",
        db_path.display()
    );

    {
        let db = Database::open(&dsn).unwrap();
        db.execute(
            "CREATE TABLE rot_crash (id INTEGER PRIMARY KEY, value TEXT NOT NULL)",
            (),
        )
        .unwrap();
        (1..=50).for_each(|id| {
            let sql = format!(
                "INSERT INTO rot_crash (id, value) VALUES ({}, 'row_{}')",
                id, id
            );
            db.execute(&sql, ()).unwrap();
        });
        let live: i64 = db.query_one("SELECT COUNT(*) FROM rot_crash", ()).unwrap();
        assert_eq!(live, 50);
    }

    let segments = find_wal_files(&db_path);
    assert!(
        segments.len() >= 2,
        "Need multiple WAL files from rotation, got {}",
        segments.len()
    );

    let wal_dir = db_path.join("wal");

    // Remove the checkpoint metadata, if any was written.
    let meta = wal_dir.join("checkpoint.meta");
    if meta.exists() {
        fs::remove_file(&meta).unwrap();
    }

    // Zero-length file whose name matches the WAL naming pattern.
    let orphan = wal_dir.join("wal_99999999-20260101-120000-lsn-999999.log");
    fs::write(&orphan, b"").unwrap();
    remove_lock_file(&db_path);

    let reopened = Database::open(&dsn).unwrap();
    let recovered: i64 = reopened
        .query_one("SELECT COUNT(*) FROM rot_crash", ())
        .unwrap();
    assert_eq!(
        recovered, 50,
        "All 50 rows should be recovered when checkpoint.meta is missing \
         and WAL files must be discovered by scanning"
    );
    reopened
        .execute(
            "INSERT INTO rot_crash (id, value) VALUES (9999, 'post_crash')",
            (),
        )
        .unwrap();
}
/// Inserts 30 rows, updates ids 1..=10 and deletes ids 21..=25 with
/// `wal_max_size=500`, requires at least two WAL files, then reopens and
/// checks that the same totals, the updated value for id 5 and the absence of
/// the deleted ids are observed.
#[test]
fn test_wal_rotation_update_delete_replay() {
    const COUNT_ALL: &str = "SELECT COUNT(*) FROM rot_upd";
    const COUNT_MODIFIED: &str = "SELECT COUNT(*) FROM rot_upd WHERE status = 'modified'";
    const COUNT_ACTIVE: &str = "SELECT COUNT(*) FROM rot_upd WHERE status = 'active'";

    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!(
        "file://{}?wal_max_size=500&checkpoint_on_close=off",
        db_path.display()
    );

    {
        let db = Database::open(&dsn).unwrap();
        let exec = |sql: &str| {
            db.execute(sql, ()).unwrap();
        };
        let count = |sql: &str| -> i64 { db.query_one(sql, ()).unwrap() };

        exec("CREATE TABLE rot_upd (id INTEGER PRIMARY KEY, value TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'active')");

        for id in 1..=30 {
            exec(&format!(
                "INSERT INTO rot_upd (id, value) VALUES ({}, 'original_{}')",
                id, id
            ));
        }
        for id in 1..=10 {
            exec(&format!(
                "UPDATE rot_upd SET value = 'updated_{}', status = 'modified' WHERE id = {}",
                id, id
            ));
        }
        for id in 21..=25 {
            exec(&format!("DELETE FROM rot_upd WHERE id = {}", id));
        }

        // Sanity-check the live state before the handle is dropped.
        assert_eq!(count(COUNT_ALL), 25);
        assert_eq!(count(COUNT_MODIFIED), 10);
        assert_eq!(count(COUNT_ACTIVE), 15);
    }

    let wal_files = find_wal_files(&db_path);
    assert!(
        wal_files.len() >= 2,
        "Expected rotation to produce multiple WAL files, got {}",
        wal_files.len()
    );

    remove_lock_file(&db_path);
    let db = Database::open(&dsn).unwrap();
    let count = |sql: &str| -> i64 { db.query_one(sql, ()).unwrap() };

    let total = count(COUNT_ALL);
    assert_eq!(
        total, 25,
        "Expected 25 rows (30 inserted - 5 deleted) after replay, got {}",
        total
    );

    let modified = count(COUNT_MODIFIED);
    assert_eq!(
        modified, 10,
        "Expected 10 modified rows after replay, got {}",
        modified
    );

    let active = count(COUNT_ACTIVE);
    assert_eq!(
        active, 15,
        "Expected 15 active rows after replay, got {}",
        active
    );

    // Spot-check one updated row.
    let value_of_5: String = db
        .query_one("SELECT value FROM rot_upd WHERE id = 5", ())
        .unwrap();
    assert_eq!(value_of_5, "updated_5");

    let resurrected = count("SELECT COUNT(*) FROM rot_upd WHERE id BETWEEN 21 AND 25");
    assert_eq!(resurrected, 0, "Deleted rows should not reappear");

    db.execute(
        "INSERT INTO rot_upd (id, value) VALUES (9999, 'post_recovery')",
        (),
    )
    .unwrap();
}
/// Commits five explicit transactions of six inserts each with
/// `wal_max_size=200`, requires at least two WAL files, then reopens and
/// checks that all 30 rows are present and every batch has exactly six rows.
#[test]
fn test_wal_rotation_explicit_transaction() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}?wal_max_size=200", db_path.display());

    {
        let db = Database::open(&dsn).unwrap();
        let exec = |sql: &str| {
            db.execute(sql, ()).unwrap();
        };

        exec("CREATE TABLE rot_txn (id INTEGER PRIMARY KEY, value TEXT NOT NULL, batch INTEGER NOT NULL)");

        for batch in 0..5 {
            exec("BEGIN");
            for row in 0..6 {
                // Ids run 1..=30 across all batches.
                let id = batch * 6 + row + 1;
                exec(&format!(
                    "INSERT INTO rot_txn (id, value, batch) VALUES ({}, 'b{}_r{}', {})",
                    id, batch, row, batch
                ));
            }
            exec("COMMIT");
        }

        let live: i64 = db.query_one("SELECT COUNT(*) FROM rot_txn", ()).unwrap();
        assert_eq!(live, 30);
    }

    let wal_files = find_wal_files(&db_path);
    assert!(
        wal_files.len() >= 2,
        "Expected rotation to fire mid-transaction with wal_max_size=200, got {} files",
        wal_files.len()
    );

    remove_lock_file(&db_path);
    let db = Database::open(&dsn).unwrap();
    let count = |sql: &str| -> i64 { db.query_one(sql, ()).unwrap() };

    let recovered = count("SELECT COUNT(*) FROM rot_txn");
    assert_eq!(
        recovered, 30,
        "All 30 rows from 5 committed transactions should be recovered, got {}",
        recovered
    );

    // Each committed batch must be complete.
    for batch in 0..5 {
        let in_batch = count(&format!(
            "SELECT COUNT(*) FROM rot_txn WHERE batch = {}",
            batch
        ));
        assert_eq!(
            in_batch, 6,
            "Batch {} has {} rows — expected 6 (atomic commit across rotation boundary)",
            batch, in_batch
        );
    }

    db.execute(
        "INSERT INTO rot_txn (id, value, batch) VALUES (9999, 'post', 99)",
        (),
    )
    .unwrap();
}
/// Creates and then drops an index on a populated table, reopens the database,
/// and checks that the rows are intact, category lookups still return four
/// rows each, and an index with the same name can be created again.
#[test]
fn test_drop_index_durability() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());

    {
        let db = Database::open(&dsn).unwrap();
        let exec = |sql: &str| {
            db.execute(sql, ()).unwrap();
        };
        let count = |sql: &str| -> i64 { db.query_one(sql, ()).unwrap() };

        exec("CREATE TABLE idx_drop (id INTEGER PRIMARY KEY, category TEXT NOT NULL, value INTEGER)");
        for id in 1..=20 {
            // Five categories (cat_0..cat_4), four rows in each.
            exec(&format!(
                "INSERT INTO idx_drop (id, category, value) VALUES ({}, 'cat_{}', {})",
                id,
                id % 5,
                id * 10
            ));
        }

        exec("CREATE INDEX idx_cat ON idx_drop(category)");
        assert_eq!(
            count("SELECT COUNT(*) FROM idx_drop WHERE category = 'cat_0'"),
            4
        );

        exec("DROP INDEX idx_cat ON idx_drop");
        assert_eq!(
            count("SELECT COUNT(*) FROM idx_drop WHERE category = 'cat_1'"),
            4
        );
    }

    remove_lock_file(&db_path);
    let db = Database::open(&dsn).unwrap();
    let count = |sql: &str| -> i64 { db.query_one(sql, ()).unwrap() };

    let all_rows = count("SELECT COUNT(*) FROM idx_drop");
    assert_eq!(all_rows, 20, "All 20 rows should survive after recovery");

    let cat2 = count("SELECT COUNT(*) FROM idx_drop WHERE category = 'cat_2'");
    assert_eq!(cat2, 4);

    // Re-creating the index under the same name must succeed after reopen.
    db.execute("CREATE INDEX idx_cat ON idx_drop(category)", ())
        .unwrap();

    let cat3 = count("SELECT COUNT(*) FROM idx_drop WHERE category = 'cat_3'");
    assert_eq!(cat3, 4);
}
/// Fills a table with 50 rows, truncates it, reopens the database and expects
/// the table to be empty and to accept a new row.
#[test]
fn test_truncate_table_survives_close_reopen() {
    const COUNT_SQL: &str = "SELECT COUNT(*) FROM trunc_test";

    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());

    {
        let db = Database::open(&dsn).unwrap();
        let exec = |sql: &str| {
            db.execute(sql, ()).unwrap();
        };
        let count = |sql: &str| -> i64 { db.query_one(sql, ()).unwrap() };

        exec("CREATE TABLE trunc_test (id INTEGER PRIMARY KEY, value TEXT NOT NULL)");
        for id in 1..=50 {
            exec(&format!(
                "INSERT INTO trunc_test (id, value) VALUES ({}, 'row_{}')",
                id, id
            ));
        }
        assert_eq!(count(COUNT_SQL), 50);

        exec("TRUNCATE TABLE trunc_test");
        assert_eq!(count(COUNT_SQL), 0);
    }

    remove_lock_file(&db_path);
    let db = Database::open(&dsn).unwrap();
    let count = |sql: &str| -> i64 { db.query_one(sql, ()).unwrap() };

    let after_reopen = count(COUNT_SQL);
    assert_eq!(
        after_reopen, 0,
        "TRUNCATE should persist — table must be empty"
    );

    db.execute(
        "INSERT INTO trunc_test (id, value) VALUES (1, 'after_truncate')",
        (),
    )
    .unwrap();
    let after_insert = count(COUNT_SQL);
    assert_eq!(after_insert, 1);
}
/// Inserts 20 `old_*` rows, truncates, inserts 6 `new_*` rows, reopens, and
/// expects only the six post-truncate rows to be visible.
#[test]
fn test_truncate_table_then_insert_survives_close_reopen() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());

    {
        let db = Database::open(&dsn).unwrap();
        let exec = |sql: &str| {
            db.execute(sql, ()).unwrap();
        };

        exec("CREATE TABLE trunc_ins (id INTEGER PRIMARY KEY, value TEXT NOT NULL)");
        for id in 1..=20 {
            exec(&format!(
                "INSERT INTO trunc_ins (id, value) VALUES ({}, 'old_{}')",
                id, id
            ));
        }

        exec("TRUNCATE TABLE trunc_ins");

        for id in 100..=105 {
            exec(&format!(
                "INSERT INTO trunc_ins (id, value) VALUES ({}, 'new_{}')",
                id, id
            ));
        }

        let live: i64 = db.query_one("SELECT COUNT(*) FROM trunc_ins", ()).unwrap();
        assert_eq!(live, 6);
    }

    remove_lock_file(&db_path);
    let db = Database::open(&dsn).unwrap();
    let count = |sql: &str| -> i64 { db.query_one(sql, ()).unwrap() };

    let total = count("SELECT COUNT(*) FROM trunc_ins");
    assert_eq!(total, 6, "Only post-truncate rows should exist");

    let stale = count("SELECT COUNT(*) FROM trunc_ins WHERE value LIKE 'old_%'");
    assert_eq!(stale, 0, "Pre-truncate rows must not reappear");

    let fresh = count("SELECT COUNT(*) FROM trunc_ins WHERE value LIKE 'new_%'");
    assert_eq!(fresh, 6, "Post-truncate rows must all survive");
}
/// Checkpoints 30 rows, then truncates and inserts three new rows (DSN uses
/// `checkpoint_on_close=off`). After reopening, only the three post-truncate
/// rows may be visible.
#[test]
fn test_truncate_table_with_checkpoint_recovery() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());

    {
        let db = Database::open(&dsn).unwrap();
        let exec = |sql: &str| {
            db.execute(sql, ()).unwrap();
        };

        exec("CREATE TABLE trunc_snap (id INTEGER PRIMARY KEY, value TEXT NOT NULL)");
        for id in 1..=30 {
            exec(&format!(
                "INSERT INTO trunc_snap (id, value) VALUES ({}, 'snap_row_{}')",
                id, id
            ));
        }

        // Checkpoint first, so the TRUNCATE and later inserts come after it.
        exec("PRAGMA CHECKPOINT");
        exec("TRUNCATE TABLE trunc_snap");

        for id in 100..=102 {
            exec(&format!(
                "INSERT INTO trunc_snap (id, value) VALUES ({}, 'post_trunc_{}')",
                id, id
            ));
        }

        let live: i64 = db.query_one("SELECT COUNT(*) FROM trunc_snap", ()).unwrap();
        assert_eq!(live, 3);
    }

    remove_lock_file(&db_path);
    let db = Database::open(&dsn).unwrap();
    let count = |sql: &str| -> i64 { db.query_one(sql, ()).unwrap() };

    let total = count("SELECT COUNT(*) FROM trunc_snap");
    assert_eq!(
        total, 3,
        "Only post-truncate rows should exist after checkpoint + WAL replay"
    );

    let stale = count("SELECT COUNT(*) FROM trunc_snap WHERE value LIKE 'snap_row_%'");
    assert_eq!(stale, 0, "Volume data must not survive TRUNCATE in WAL");
}
/// Truncates an indexed table holding 40 rows, inserts one new row, reopens,
/// and checks that filtering on the indexed column finds only the new row.
#[test]
fn test_truncate_with_index_recovery() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());

    {
        let db = Database::open(&dsn).unwrap();
        let exec = |sql: &str| {
            db.execute(sql, ()).unwrap();
        };

        exec("CREATE TABLE trunc_idx (id INTEGER PRIMARY KEY, category TEXT NOT NULL, val INTEGER)");
        exec("CREATE INDEX idx_trunc_cat ON trunc_idx(category)");

        for id in 1..=40 {
            // Four categories: cat_0..cat_3.
            exec(&format!(
                "INSERT INTO trunc_idx (id, category, val) VALUES ({}, 'cat_{}', {})",
                id,
                id % 4,
                id * 10
            ));
        }

        exec("TRUNCATE TABLE trunc_idx");
        exec("INSERT INTO trunc_idx (id, category, val) VALUES (100, 'cat_0', 999)");
    }

    remove_lock_file(&db_path);
    let db = Database::open(&dsn).unwrap();
    let count = |sql: &str| -> i64 { db.query_one(sql, ()).unwrap() };

    let total = count("SELECT COUNT(*) FROM trunc_idx");
    assert_eq!(total, 1, "Only the post-truncate row should exist");

    let in_cat0 = count("SELECT COUNT(*) FROM trunc_idx WHERE category = 'cat_0'");
    assert_eq!(
        in_cat0, 1,
        "Index query should find only the post-truncate row"
    );

    let in_cat1 = count("SELECT COUNT(*) FROM trunc_idx WHERE category = 'cat_1'");
    assert_eq!(in_cat1, 0, "Pre-truncate index entries must be gone");
}
/// Adds a `score` column to a table with ten existing rows, inserts one row
/// that sets it, reopens, and checks row count, the stored score, NULLs for
/// the older rows, and that a further insert using the column works.
#[test]
fn test_alter_table_add_column_durability() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());

    {
        let db = Database::open(&dsn).unwrap();
        let exec = |sql: &str| {
            db.execute(sql, ()).unwrap();
        };

        exec("CREATE TABLE alter_add (id INTEGER PRIMARY KEY, name TEXT NOT NULL)");
        for id in 1..=10 {
            exec(&format!(
                "INSERT INTO alter_add (id, name) VALUES ({}, 'row_{}')",
                id, id
            ));
        }
        exec("ALTER TABLE alter_add ADD COLUMN score INTEGER");
        exec("INSERT INTO alter_add (id, name, score) VALUES (11, 'with_score', 100)");
    }

    remove_lock_file(&db_path);
    let db = Database::open(&dsn).unwrap();
    let number = |sql: &str| -> i64 { db.query_one(sql, ()).unwrap() };

    let rows = number("SELECT COUNT(*) FROM alter_add");
    assert_eq!(rows, 11, "All 11 rows should survive");

    let stored_score = number("SELECT score FROM alter_add WHERE id = 11");
    assert_eq!(stored_score, 100, "New column value must survive recovery");

    let without_score = number("SELECT COUNT(*) FROM alter_add WHERE score IS NULL");
    assert_eq!(
        without_score, 10,
        "Old rows should have NULL in the new column"
    );

    db.execute(
        "INSERT INTO alter_add (id, name, score) VALUES (12, 'post_recovery', 200)",
        (),
    )
    .unwrap();
}
/// Drops the `extra` column from a populated table, reopens, and checks that
/// the rows remain, a query referencing `extra` fails, and an insert using the
/// remaining columns succeeds.
#[test]
fn test_alter_table_drop_column_durability() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());

    {
        let db = Database::open(&dsn).unwrap();
        let exec = |sql: &str| {
            db.execute(sql, ()).unwrap();
        };

        exec("CREATE TABLE alter_drop (id INTEGER PRIMARY KEY, name TEXT NOT NULL, extra TEXT)");
        for id in 1..=10 {
            exec(&format!(
                "INSERT INTO alter_drop (id, name, extra) VALUES ({}, 'row_{}', 'extra_{}')",
                id, id, id
            ));
        }
        exec("ALTER TABLE alter_drop DROP COLUMN extra");
    }

    remove_lock_file(&db_path);
    let db = Database::open(&dsn).unwrap();

    let rows: i64 = db.query_one("SELECT COUNT(*) FROM alter_drop", ()).unwrap();
    assert_eq!(rows, 10, "All rows should survive");

    // Referencing the dropped column must be rejected.
    let dropped_lookup: Result<i64, _> = db.query_one(
        "SELECT COUNT(*) FROM alter_drop WHERE extra IS NOT NULL",
        (),
    );
    assert!(
        dropped_lookup.is_err(),
        "Column 'extra' should not exist after DROP COLUMN recovery"
    );

    db.execute(
        "INSERT INTO alter_drop (id, name) VALUES (11, 'post_recovery')",
        (),
    )
    .unwrap();
}
/// Renames column `old_name` to `new_name`, reopens, and checks that the rows
/// are reachable through the new name, the old name is rejected, and an insert
/// using the new name succeeds.
#[test]
fn test_alter_table_rename_column_durability() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());

    {
        let db = Database::open(&dsn).unwrap();
        let exec = |sql: &str| {
            db.execute(sql, ()).unwrap();
        };

        exec("CREATE TABLE alter_rename (id INTEGER PRIMARY KEY, old_name TEXT NOT NULL)");
        for id in 1..=5 {
            exec(&format!(
                "INSERT INTO alter_rename (id, old_name) VALUES ({}, 'val_{}')",
                id, id
            ));
        }
        exec("ALTER TABLE alter_rename RENAME COLUMN old_name TO new_name");
    }

    remove_lock_file(&db_path);
    let db = Database::open(&dsn).unwrap();

    let via_new_name: i64 = db
        .query_one(
            "SELECT COUNT(*) FROM alter_rename WHERE new_name IS NOT NULL",
            (),
        )
        .unwrap();
    assert_eq!(via_new_name, 5, "All rows accessible via renamed column");

    // The previous column name must no longer resolve.
    let via_old_name: Result<i64, _> = db.query_one(
        "SELECT COUNT(*) FROM alter_rename WHERE old_name IS NOT NULL",
        (),
    );
    assert!(
        via_old_name.is_err(),
        "Old column name 'old_name' should not exist after RENAME recovery"
    );

    db.execute(
        "INSERT INTO alter_rename (id, new_name) VALUES (6, 'post_recovery')",
        (),
    )
    .unwrap();
}
/// Renames table `old_tbl` to `new_tbl`, reopens, and checks that the rows are
/// reachable under the new name, the old name is rejected, and an insert into
/// the new name succeeds.
#[test]
fn test_alter_table_rename_table_durability() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());

    {
        let db = Database::open(&dsn).unwrap();
        let exec = |sql: &str| {
            db.execute(sql, ()).unwrap();
        };

        exec("CREATE TABLE old_tbl (id INTEGER PRIMARY KEY, data TEXT NOT NULL)");
        for id in 1..=10 {
            exec(&format!(
                "INSERT INTO old_tbl (id, data) VALUES ({}, 'row_{}')",
                id, id
            ));
        }
        exec("ALTER TABLE old_tbl RENAME TO new_tbl");
    }

    remove_lock_file(&db_path);
    let db = Database::open(&dsn).unwrap();

    let rows: i64 = db.query_one("SELECT COUNT(*) FROM new_tbl", ()).unwrap();
    assert_eq!(
        rows, 10,
        "All rows should be accessible via new table name"
    );

    // The previous table name must no longer resolve.
    let old_lookup: Result<i64, _> = db.query_one("SELECT COUNT(*) FROM old_tbl", ());
    assert!(
        old_lookup.is_err(),
        "Old table name 'old_tbl' should not exist after RENAME recovery"
    );

    db.execute(
        "INSERT INTO new_tbl (id, data) VALUES (11, 'post_recovery')",
        (),
    )
    .unwrap();
}
/// Checkpoints ten rows, then adds a `status` column and inserts one row that
/// sets it (DSN uses `checkpoint_on_close=off`). After reopening, all eleven
/// rows, the new value and ten NULL statuses are expected.
#[test]
fn test_alter_table_with_checkpoint_recovery() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());

    {
        let db = Database::open(&dsn).unwrap();
        let exec = |sql: &str| {
            db.execute(sql, ()).unwrap();
        };

        exec("CREATE TABLE alter_snap (id INTEGER PRIMARY KEY, name TEXT NOT NULL)");
        for id in 1..=10 {
            exec(&format!(
                "INSERT INTO alter_snap (id, name) VALUES ({}, 'row_{}')",
                id, id
            ));
        }

        // The schema change and the final insert happen after the checkpoint.
        exec("PRAGMA CHECKPOINT");
        exec("ALTER TABLE alter_snap ADD COLUMN status TEXT");
        exec("INSERT INTO alter_snap (id, name, status) VALUES (11, 'new_row', 'active')");
    }

    remove_lock_file(&db_path);
    let db = Database::open(&dsn).unwrap();
    let count = |sql: &str| -> i64 { db.query_one(sql, ()).unwrap() };

    let rows = count("SELECT COUNT(*) FROM alter_snap");
    assert_eq!(rows, 11, "All 11 rows should survive");

    let new_status: String = db
        .query_one("SELECT status FROM alter_snap WHERE id = 11", ())
        .unwrap();
    assert_eq!(new_status, "active", "New column value must survive");

    let without_status = count("SELECT COUNT(*) FROM alter_snap WHERE status IS NULL");
    assert_eq!(without_status, 10);
}
/// Applies DROP COLUMN, ADD COLUMN and RENAME COLUMN in sequence to one table,
/// inserts a row using the resulting schema, reopens, and checks that the
/// final schema and data are observed.
#[test]
fn test_alter_table_multiple_operations_durability() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());

    {
        let db = Database::open(&dsn).unwrap();
        let exec = |sql: &str| {
            db.execute(sql, ()).unwrap();
        };

        exec("CREATE TABLE alter_multi (id INTEGER PRIMARY KEY, a TEXT, b TEXT, c TEXT)");
        for id in 1..=5 {
            exec(&format!(
                "INSERT INTO alter_multi (id, a, b, c) VALUES ({}, 'a{}', 'b{}', 'c{}')",
                id, id, id, id
            ));
        }

        // Schema changes, applied in this exact order.
        let alterations = [
            "ALTER TABLE alter_multi DROP COLUMN c",
            "ALTER TABLE alter_multi ADD COLUMN d INTEGER",
            "ALTER TABLE alter_multi RENAME COLUMN b TO beta",
        ];
        for &ddl in &alterations {
            exec(ddl);
        }

        exec("INSERT INTO alter_multi (id, a, beta, d) VALUES (6, 'a6', 'beta6', 42)");
    }

    remove_lock_file(&db_path);
    let db = Database::open(&dsn).unwrap();
    let number = |sql: &str| -> i64 { db.query_one(sql, ()).unwrap() };

    let rows = number("SELECT COUNT(*) FROM alter_multi");
    assert_eq!(rows, 6, "All rows should survive");

    let dropped_lookup: Result<i64, _> =
        db.query_one("SELECT COUNT(*) FROM alter_multi WHERE c IS NOT NULL", ());
    assert!(dropped_lookup.is_err(), "Column 'c' should be dropped");

    let with_beta = number("SELECT COUNT(*) FROM alter_multi WHERE beta IS NOT NULL");
    assert_eq!(with_beta, 6, "Column 'beta' (renamed from 'b') must exist");

    let d_of_6 = number("SELECT d FROM alter_multi WHERE id = 6");
    assert_eq!(d_of_6, 42, "New column 'd' must have correct value");
}
/// Creates a table without inserting anything, reopens, and expects the table
/// to exist with zero rows and to accept a first insert.
#[test]
fn test_empty_table_survives_close_reopen() {
    const COUNT_SQL: &str = "SELECT COUNT(*) FROM empty_tbl";

    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());

    // First session: schema only. The handle is dropped before reopening.
    let db = Database::open(&dsn).unwrap();
    db.execute(
        "CREATE TABLE empty_tbl (id INTEGER PRIMARY KEY, value TEXT)",
        (),
    )
    .unwrap();
    drop(db);

    remove_lock_file(&db_path);
    let db = Database::open(&dsn).unwrap();

    let before: i64 = db.query_one(COUNT_SQL, ()).unwrap();
    assert_eq!(before, 0, "Empty table should exist with 0 rows");

    db.execute(
        "INSERT INTO empty_tbl (id, value) VALUES (1, 'first_row')",
        (),
    )
    .unwrap();

    let after: i64 = db.query_one(COUNT_SQL, ()).unwrap();
    assert_eq!(after, 1);
}
/// Creates three empty tables, reopens, and expects each to exist with zero
/// rows and to accept an insert.
#[test]
fn test_multiple_empty_tables_survive() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());

    {
        let db = Database::open(&dsn).unwrap();
        let schemas = [
            "CREATE TABLE empty_a (id INTEGER PRIMARY KEY)",
            "CREATE TABLE empty_b (id INTEGER PRIMARY KEY, name TEXT NOT NULL)",
            "CREATE TABLE empty_c (id INTEGER PRIMARY KEY, x INTEGER, y INTEGER)",
        ];
        for &ddl in &schemas {
            db.execute(ddl, ()).unwrap();
        }
    }

    remove_lock_file(&db_path);
    let db = Database::open(&dsn).unwrap();

    for table in &["empty_a", "empty_b", "empty_c"] {
        let query = format!("SELECT COUNT(*) FROM {}", table);
        let rows: i64 = db.query_one(&query, ()).unwrap();
        assert_eq!(rows, 0, "Table '{}' should exist and be empty", table);
    }

    // Every recovered table must accept a row.
    let first_rows = [
        "INSERT INTO empty_a (id) VALUES (1)",
        "INSERT INTO empty_b (id, name) VALUES (1, 'test')",
        "INSERT INTO empty_c (id, x, y) VALUES (1, 10, 20)",
    ];
    for &insert in &first_rows {
        db.execute(insert, ()).unwrap();
    }
}
/// Creates an empty table with an index, reopens, inserts one row and checks
/// that filtering on the indexed column finds it.
#[test]
fn test_empty_table_with_index_survives() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());

    // First session: table and index only, no rows.
    let db = Database::open(&dsn).unwrap();
    db.execute(
        "CREATE TABLE empty_idx (id INTEGER PRIMARY KEY, category TEXT NOT NULL)",
        (),
    )
    .unwrap();
    db.execute("CREATE INDEX idx_empty_cat ON empty_idx(category)", ())
        .unwrap();
    drop(db);

    remove_lock_file(&db_path);
    let db = Database::open(&dsn).unwrap();

    let rows: i64 = db.query_one("SELECT COUNT(*) FROM empty_idx", ()).unwrap();
    assert_eq!(rows, 0);

    db.execute("INSERT INTO empty_idx (id, category) VALUES (1, 'A')", ())
        .unwrap();

    let in_a: i64 = db
        .query_one("SELECT COUNT(*) FROM empty_idx WHERE category = 'A'", ())
        .unwrap();
    assert_eq!(
        in_a, 1,
        "Index should work after recovery of empty table"
    );
}
/// Inserts four rows with a mix of NULL and non-NULL values in text, integer
/// and boolean columns, reopens, and checks the NULL counts per column plus
/// two non-NULL values.
#[test]
fn test_null_values_survive_wal_recovery() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());

    {
        let db = Database::open(&dsn).unwrap();
        let statements = [
            "CREATE TABLE null_test (id INTEGER PRIMARY KEY, name TEXT, score INTEGER, active BOOLEAN)",
            "INSERT INTO null_test (id, name, score, active) VALUES (1, 'alice', 100, TRUE)",
            "INSERT INTO null_test (id, name, score, active) VALUES (2, NULL, NULL, NULL)",
            "INSERT INTO null_test (id, name, score, active) VALUES (3, 'charlie', NULL, TRUE)",
            "INSERT INTO null_test (id, name, score, active) VALUES (4, NULL, 50, NULL)",
        ];
        for &sql in &statements {
            db.execute(sql, ()).unwrap();
        }
    }

    remove_lock_file(&db_path);
    let db = Database::open(&dsn).unwrap();

    // (query, expected count, failure message), checked in order.
    let expectations: [(&str, i64, &str); 4] = [
        (
            "SELECT COUNT(*) FROM null_test",
            4,
            "All 4 rows must survive",
        ),
        (
            "SELECT COUNT(*) FROM null_test WHERE name IS NULL",
            2,
            "Rows 2 and 4 should have NULL name",
        ),
        (
            "SELECT COUNT(*) FROM null_test WHERE score IS NULL",
            2,
            "Rows 2 and 3 should have NULL score",
        ),
        (
            "SELECT COUNT(*) FROM null_test WHERE active IS NULL",
            2,
            "Rows 2 and 4 should have NULL active",
        ),
    ];
    for &(sql, expected, why) in &expectations {
        let got: i64 = db.query_one(sql, ()).unwrap();
        assert_eq!(got, expected, "{}", why);
    }

    // Non-NULL values must come back unchanged.
    let alice_score: i64 = db
        .query_one("SELECT score FROM null_test WHERE id = 1", ())
        .unwrap();
    assert_eq!(alice_score, 100);

    let charlie_active: bool = db
        .query_one("SELECT active FROM null_test WHERE id = 3", ())
        .unwrap();
    assert!(charlie_active);
}
/// Inserts three rows containing NULLs in text and float columns, checkpoints
/// (DSN uses `checkpoint_on_close=off`), reopens, and checks the NULL counts
/// and one non-NULL float.
#[test]
fn test_null_values_survive_checkpoint_recovery() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());

    {
        let db = Database::open(&dsn).unwrap();
        let statements = [
            "CREATE TABLE null_snap (id INTEGER PRIMARY KEY, val TEXT, num FLOAT)",
            "INSERT INTO null_snap (id, val, num) VALUES (1, 'hello', 3.14)",
            "INSERT INTO null_snap (id, val, num) VALUES (2, NULL, NULL)",
            "INSERT INTO null_snap (id, val, num) VALUES (3, NULL, 2.71)",
            "PRAGMA CHECKPOINT",
        ];
        for &sql in &statements {
            db.execute(sql, ()).unwrap();
        }
    }

    remove_lock_file(&db_path);
    let db = Database::open(&dsn).unwrap();
    let count = |sql: &str| -> i64 { db.query_one(sql, ()).unwrap() };

    let rows = count("SELECT COUNT(*) FROM null_snap");
    assert_eq!(rows, 3);

    let null_text = count("SELECT COUNT(*) FROM null_snap WHERE val IS NULL");
    assert_eq!(null_text, 2, "NULLs must survive volume serialization");

    let null_float = count("SELECT COUNT(*) FROM null_snap WHERE num IS NULL");
    assert_eq!(null_float, 1, "Row 2 should have NULL num");

    // Compare the float with a tolerance rather than exactly.
    let num_of_3: f64 = db
        .query_one("SELECT num FROM null_snap WHERE id = 3", ())
        .unwrap();
    assert!(
        (num_of_3 - 2.71).abs() < 0.001,
        "Non-NULL float must be preserved"
    );
}
/// Runs several open/write/damage/reopen rounds against one database (DSN uses
/// `checkpoint_on_close=off`):
///
/// 1. write ten rows, then cut the last WAL entry off the newest WAL file;
/// 2. reopen, require some rows, write eleven more, then flip one bit inside
///    the middle entry of the newest WAL file;
/// 3. reopen, require at least as many rows as round 2 saw, write six more;
/// 4. reopen without damage and require the six round-3 rows to be present.
///
/// Each damage step is skipped when the WAL has too few entries for it.
#[test]
fn test_multiple_crash_recovery_cycles() {
    const COUNT_SQL: &str = "SELECT COUNT(*) FROM cycle_test";

    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());

    // Round 1: create the table and write the first ten rows.
    {
        let db = Database::open(&dsn).unwrap();
        db.execute(
            "CREATE TABLE cycle_test (id INTEGER PRIMARY KEY, cycle INTEGER NOT NULL, value TEXT)",
            (),
        )
        .unwrap();
        for id in 1..=10 {
            let insert = format!(
                "INSERT INTO cycle_test (id, cycle, value) VALUES ({}, 1, 'cycle1_row_{}')",
                id, id
            );
            db.execute(&insert, ()).unwrap();
        }
    }
    remove_lock_file(&db_path);

    // Damage 1: drop the final entry from the newest WAL file.
    let wal_files = find_wal_files(&db_path);
    if let Some(newest) = wal_files.last() {
        let bytes = fs::read(newest).unwrap();
        let entries = find_entry_boundaries(&bytes);
        if entries.len() >= 2 {
            let cut_at = entries[entries.len() - 1].offset;
            fs::write(newest, &bytes[..cut_at]).unwrap();
        }
    }

    // Round 2: recover, then add eleven rows.
    let cycle1_count: i64 = {
        let db = Database::open(&dsn).unwrap();
        let seen: i64 = db.query_one(COUNT_SQL, ()).unwrap();
        assert!(seen > 0, "Should recover at least some cycle 1 data");
        for id in 100..=110 {
            let insert = format!(
                "INSERT INTO cycle_test (id, cycle, value) VALUES ({}, 2, 'cycle2_row_{}')",
                id, id
            );
            db.execute(&insert, ()).unwrap();
        }
        seen
    };
    remove_lock_file(&db_path);

    // Damage 2: flip one bit in the payload of the middle WAL entry.
    let wal_files = find_wal_files(&db_path);
    if let Some(newest) = wal_files.last() {
        let mut bytes = fs::read(newest).unwrap();
        let entries = find_entry_boundaries(&bytes);
        if entries.len() >= 3 {
            let target = entries[entries.len() / 2].data_offset + 5;
            if target < bytes.len() {
                flip_bit(&mut bytes, target, 3);
                fs::write(newest, &bytes).unwrap();
            }
        }
    }

    // Round 3: recover, then add six rows.
    let cycle2_count: i64 = {
        let db = Database::open(&dsn).unwrap();
        let seen: i64 = db.query_one(COUNT_SQL, ()).unwrap();
        assert!(
            seen >= cycle1_count,
            "Cycle 3 recovery ({}) should have at least cycle 1 data ({})",
            seen,
            cycle1_count
        );
        for id in 200..=205 {
            let insert = format!(
                "INSERT INTO cycle_test (id, cycle, value) VALUES ({}, 3, 'cycle3_row_{}')",
                id, id
            );
            db.execute(&insert, ()).unwrap();
        }
        seen
    };
    remove_lock_file(&db_path);

    // Round 4: reopen without damage and verify the round-3 rows.
    {
        let db = Database::open(&dsn).unwrap();
        let final_count: i64 = db.query_one(COUNT_SQL, ()).unwrap();
        assert!(
            final_count >= cycle2_count + 6,
            "Final count ({}) should include cycle 3 inserts (>= {})",
            final_count,
            cycle2_count + 6
        );

        let round3_rows: i64 = db
            .query_one("SELECT COUNT(*) FROM cycle_test WHERE cycle = 3", ())
            .unwrap();
        assert_eq!(
            round3_rows, 6,
            "All 6 cycle 3 rows must survive (no corruption applied)"
        );

        db.execute(
            "INSERT INTO cycle_test (id, cycle, value) VALUES (999, 4, 'final')",
            (),
        )
        .unwrap();
    }
}
/// Checkpoints 20 rows, then (only if the newest WAL file is larger than 4096
/// bytes) passes the index of its last 4096-byte page to `zero_page`. After
/// reopening, at least 20 rows must be present; eleven more are written and
/// must all be visible after one further reopen.
#[test]
fn test_crash_recovery_with_checkpoint_between_cycles() {
    const COUNT_SQL: &str = "SELECT COUNT(*) FROM snap_cycle";

    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());

    // Phase 1: 20 rows followed by an explicit checkpoint.
    {
        let db = Database::open(&dsn).unwrap();
        db.execute(
            "CREATE TABLE snap_cycle (id INTEGER PRIMARY KEY, phase INTEGER NOT NULL)",
            (),
        )
        .unwrap();
        for id in 1..=20 {
            let insert = format!("INSERT INTO snap_cycle (id, phase) VALUES ({}, 1)", id);
            db.execute(&insert, ()).unwrap();
        }
        db.execute("PRAGMA CHECKPOINT", ()).unwrap();
    }
    remove_lock_file(&db_path);

    // Damage the tail page of the newest WAL file when it spans more than one page.
    let wal_files = find_wal_files(&db_path);
    if let Some(newest) = wal_files.last() {
        let mut bytes = fs::read(newest).unwrap();
        let size = bytes.len();
        if size > 4096 {
            let tail_page = (size - 1) / 4096;
            zero_page(&mut bytes, tail_page);
            fs::write(newest, &bytes).unwrap();
        }
    }

    // Phase 2: recover, then add eleven rows.
    {
        let db = Database::open(&dsn).unwrap();
        let recovered: i64 = db.query_one(COUNT_SQL, ()).unwrap();
        assert!(
            recovered >= 20,
            "Volumes should guarantee at least 20 rows, got {}",
            recovered
        );
        for id in 100..=110 {
            let insert = format!("INSERT INTO snap_cycle (id, phase) VALUES ({}, 2)", id);
            db.execute(&insert, ()).unwrap();
        }
    }
    remove_lock_file(&db_path);

    // Final reopen with no damage applied.
    {
        let db = Database::open(&dsn).unwrap();
        let total: i64 = db.query_one(COUNT_SQL, ()).unwrap();
        assert!(
            total >= 31,
            "Should have at least 20 (volumes) + 11 (cycle 2) = 31 rows, got {}",
            total
        );

        let phase2_rows: i64 = db
            .query_one("SELECT COUNT(*) FROM snap_cycle WHERE phase = 2", ())
            .unwrap();
        assert_eq!(
            phase2_rows, 11,
            "All cycle 2 inserts must survive clean recovery"
        );
    }
}
/// Creates a table whose `status` and `priority` columns declare DEFAULTs,
/// reopens, inserts a row that omits both, and expects the declared defaults
/// to be filled in.
#[test]
fn test_default_constraint_after_recovery() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());

    {
        let db = Database::open(&dsn).unwrap();
        let statements = [
            "CREATE TABLE def_test (id INTEGER PRIMARY KEY, name TEXT NOT NULL, status TEXT DEFAULT 'pending', priority INTEGER DEFAULT 0)",
            "INSERT INTO def_test (id, name) VALUES (1, 'task_1')",
            "INSERT INTO def_test (id, name, status, priority) VALUES (2, 'task_2', 'active', 5)",
        ];
        for &sql in &statements {
            db.execute(sql, ()).unwrap();
        }
    }

    remove_lock_file(&db_path);
    let db = Database::open(&dsn).unwrap();

    let rows: i64 = db.query_one("SELECT COUNT(*) FROM def_test", ()).unwrap();
    assert_eq!(rows, 2);

    // Insert after reopen, omitting both defaulted columns.
    db.execute("INSERT INTO def_test (id, name) VALUES (3, 'task_3')", ())
        .unwrap();

    let status_of_3: String = db
        .query_one("SELECT status FROM def_test WHERE id = 3", ())
        .unwrap();
    assert_eq!(
        status_of_3, "pending",
        "DEFAULT value must be applied after recovery"
    );

    let priority_of_3: i64 = db
        .query_one("SELECT priority FROM def_test WHERE id = 3", ())
        .unwrap();
    assert_eq!(
        priority_of_3, 0,
        "DEFAULT integer value must be applied after recovery"
    );
}
/// Stores TRUE, FALSE and NULL in boolean columns, reopens, and checks the
/// count of rows matching each value.
#[test]
fn test_boolean_column_durability() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());

    {
        let db = Database::open(&dsn).unwrap();
        let statements = [
            "CREATE TABLE bool_test (id INTEGER PRIMARY KEY, flag BOOLEAN NOT NULL, optional_flag BOOLEAN)",
            "INSERT INTO bool_test (id, flag, optional_flag) VALUES (1, TRUE, FALSE)",
            "INSERT INTO bool_test (id, flag, optional_flag) VALUES (2, FALSE, TRUE)",
            "INSERT INTO bool_test (id, flag, optional_flag) VALUES (3, TRUE, NULL)",
        ];
        for &sql in &statements {
            db.execute(sql, ()).unwrap();
        }
    }

    remove_lock_file(&db_path);
    let db = Database::open(&dsn).unwrap();

    let rows: i64 = db.query_one("SELECT COUNT(*) FROM bool_test", ()).unwrap();
    assert_eq!(rows, 3);

    // (query, expected count, failure message), checked in order.
    let expectations: [(&str, i64, &str); 3] = [
        (
            "SELECT COUNT(*) FROM bool_test WHERE flag = TRUE",
            2,
            "TRUE values must survive recovery",
        ),
        (
            "SELECT COUNT(*) FROM bool_test WHERE flag = FALSE",
            1,
            "FALSE values must survive recovery",
        ),
        (
            "SELECT COUNT(*) FROM bool_test WHERE optional_flag IS NULL",
            1,
            "NULL boolean must survive recovery",
        ),
    ];
    for &(sql, expected, why) in &expectations {
        let got: i64 = db.query_one(sql, ()).unwrap();
        assert_eq!(got, expected, "{}", why);
    }
}
/// Stores three rows with TIMESTAMP values (one NULL), reopens, and checks a
/// range comparison, the NULL count, and `EXTRACT(YEAR ...)` on a stored value.
#[test]
fn test_timestamp_column_durability() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());

    {
        let db = Database::open(&dsn).unwrap();
        let statements = [
            "CREATE TABLE ts_test (id INTEGER PRIMARY KEY, created_at TIMESTAMP NOT NULL, updated_at TIMESTAMP)",
            "INSERT INTO ts_test (id, created_at, updated_at) VALUES (1, '2024-01-15 10:30:00', '2024-06-20 14:00:00')",
            "INSERT INTO ts_test (id, created_at, updated_at) VALUES (2, '2024-12-31 23:59:59', NULL)",
            "INSERT INTO ts_test (id, created_at, updated_at) VALUES (3, '2020-01-01 00:00:00', '2020-01-01 00:00:01')",
        ];
        for &sql in &statements {
            db.execute(sql, ()).unwrap();
        }
    }

    remove_lock_file(&db_path);
    let db = Database::open(&dsn).unwrap();
    let number = |sql: &str| -> i64 { db.query_one(sql, ()).unwrap() };

    let rows = number("SELECT COUNT(*) FROM ts_test");
    assert_eq!(rows, 3);

    // Rows 1 and 2 were created in 2024; row 3 in 2020.
    let since_2024 =
        number("SELECT COUNT(*) FROM ts_test WHERE created_at >= '2024-01-01 00:00:00'");
    assert_eq!(
        since_2024, 2,
        "Timestamp comparison must work after recovery"
    );

    let never_updated = number("SELECT COUNT(*) FROM ts_test WHERE updated_at IS NULL");
    assert_eq!(never_updated, 1, "NULL timestamp must survive recovery");

    let created_year = number("SELECT EXTRACT(YEAR FROM created_at) FROM ts_test WHERE id = 2");
    assert_eq!(
        created_year, 2024,
        "EXTRACT from recovered timestamp must work"
    );
}
/// Checkpoints ten rows, then issues two UPDATEs and one DELETE (DSN uses
/// `checkpoint_on_close=off`). After reopening, the updated values, the
/// deletion and an untouched row must all be observed.
#[test]
fn test_update_after_checkpoint_recovery() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}?checkpoint_on_close=off", db_path.display());

    {
        let db = Database::open(&dsn).unwrap();
        let exec = |sql: &str| {
            db.execute(sql, ()).unwrap();
        };

        exec("CREATE TABLE upd_snap (id INTEGER PRIMARY KEY, value TEXT NOT NULL, version INTEGER)");
        for id in 1..=10 {
            exec(&format!(
                "INSERT INTO upd_snap (id, value, version) VALUES ({}, 'original_{}', 1)",
                id, id
            ));
        }
        exec("PRAGMA CHECKPOINT");

        // Changes made after the checkpoint.
        exec("UPDATE upd_snap SET value = 'updated_5', version = 2 WHERE id = 5");
        exec("UPDATE upd_snap SET value = 'updated_10', version = 2 WHERE id = 10");
        exec("DELETE FROM upd_snap WHERE id = 3");
    }

    remove_lock_file(&db_path);
    let db = Database::open(&dsn).unwrap();
    let count = |sql: &str| -> i64 { db.query_one(sql, ()).unwrap() };
    let text = |sql: &str| -> String { db.query_one(sql, ()).unwrap() };

    let rows = count("SELECT COUNT(*) FROM upd_snap");
    assert_eq!(rows, 9, "10 inserted - 1 deleted = 9 rows");

    let value_5 = text("SELECT value FROM upd_snap WHERE id = 5");
    assert_eq!(value_5, "updated_5", "UPDATE must be replayed from WAL");

    let value_10 = text("SELECT value FROM upd_snap WHERE id = 10");
    assert_eq!(value_10, "updated_10");

    let row_3 = count("SELECT COUNT(*) FROM upd_snap WHERE id = 3");
    assert_eq!(row_3, 0, "DELETE must be replayed from WAL");

    // A row that was never touched after the checkpoint keeps its value.
    let value_1 = text("SELECT value FROM upd_snap WHERE id = 1");
    assert_eq!(value_1, "original_1");
}
/// Creates a UNIQUE index over `(a, b)`, reopens the database, and checks that
/// a duplicate `(a, b)` pair is rejected while a new pair is accepted.
#[test]
fn test_multi_column_unique_index_enforced_after_recovery() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());

    {
        let writer = Database::open(&dsn).unwrap();
        // Schema, index, then three rows whose (a, b) pairs are all distinct.
        let seed = [
            "CREATE TABLE mc_unique (id INTEGER PRIMARY KEY, a TEXT NOT NULL, b TEXT NOT NULL, data TEXT)",
            "CREATE UNIQUE INDEX idx_mc_ab ON mc_unique(a, b)",
            "INSERT INTO mc_unique (id, a, b, data) VALUES (1, 'x', 'y', 'first')",
            "INSERT INTO mc_unique (id, a, b, data) VALUES (2, 'x', 'z', 'second')",
            "INSERT INTO mc_unique (id, a, b, data) VALUES (3, 'w', 'y', 'third')",
        ];
        for sql in seed {
            writer.execute(sql, ()).unwrap();
        }
    }
    // NOTE(review): presumably clears a lock file left by the first session — confirm against the helper.
    remove_lock_file(&db_path);

    let db = Database::open(&dsn).unwrap();

    let rows: i64 = db.query_one("SELECT COUNT(*) FROM mc_unique", ()).unwrap();
    assert_eq!(rows, 3);

    // ('x', 'y') already exists as row 1, so this insert has to fail.
    let duplicate = db.execute(
        "INSERT INTO mc_unique (id, a, b, data) VALUES (4, 'x', 'y', 'duplicate')",
        (),
    );
    assert!(
        duplicate.is_err(),
        "Multi-column UNIQUE constraint must be enforced after recovery"
    );

    // ('x', 'w') is a new pair, so the same id can be used successfully here.
    db.execute(
        "INSERT INTO mc_unique (id, a, b, data) VALUES (4, 'x', 'w', 'allowed')",
        (),
    )
    .unwrap();
}
/// Redefines `score` from `INTEGER NOT NULL` to `INTEGER` via
/// `ALTER TABLE ... MODIFY COLUMN`, inserts a NULL score, reopens the database,
/// and checks that old rows, the NULL row, and a new NULL insert all work.
#[test]
fn test_alter_table_modify_column_durability() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());

    {
        let writer = Database::open(&dsn).unwrap();
        let run = |sql: &str| {
            writer.execute(sql, ()).unwrap();
        };

        run("CREATE TABLE modify_test (id INTEGER PRIMARY KEY, score INTEGER NOT NULL, label TEXT NOT NULL)");

        // Ten rows with score = id * 10.
        for id in 1..=10 {
            let insert = format!(
                "INSERT INTO modify_test (id, score, label) VALUES ({}, {}, 'item_{}')",
                id,
                id * 10,
                id
            );
            run(&insert);
        }

        // The new definition omits NOT NULL; the insert right after relies on that.
        run("ALTER TABLE modify_test MODIFY COLUMN score INTEGER");
        run("INSERT INTO modify_test (id, score, label) VALUES (11, NULL, 'null_score')");
    }
    // NOTE(review): presumably clears a lock file left by the first session — confirm against the helper.
    remove_lock_file(&db_path);

    let db = Database::open(&dsn).unwrap();
    let scalar = |sql: &str| -> i64 { db.query_one(sql, ()).unwrap() };
    let null_scores_sql = "SELECT COUNT(*) FROM modify_test WHERE score IS NULL";

    assert_eq!(
        scalar("SELECT COUNT(*) FROM modify_test"),
        11,
        "All 11 rows should survive after recovery"
    );
    assert_eq!(
        scalar("SELECT score FROM modify_test WHERE id = 5"),
        50,
        "Original score values must be preserved"
    );
    assert_eq!(
        scalar(null_scores_sql),
        1,
        "NULL score row must survive — MODIFY COLUMN nullable change must persist"
    );

    // The relaxed column definition must also apply to writes made after the reopen.
    db.execute(
        "INSERT INTO modify_test (id, score, label) VALUES (12, NULL, 'post_recovery_null')",
        (),
    )
    .unwrap();
    assert_eq!(
        scalar(null_scores_sql),
        2,
        "Post-recovery NULL insert must work after MODIFY COLUMN"
    );
}
/// Same idea as the plain MODIFY COLUMN durability test, but a `PRAGMA snapshot`
/// is taken before the two columns are altered, so the ALTERs and the NULL
/// insert happen after the snapshot. Checks the state after a reopen.
#[test]
fn test_alter_table_modify_column_with_snapshot() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());

    {
        let writer = Database::open(&dsn).unwrap();
        writer
            .execute(
                "CREATE TABLE modify_snap (id INTEGER PRIMARY KEY, value TEXT NOT NULL, flag BOOLEAN NOT NULL)",
                (),
            )
            .unwrap();

        // Twenty rows; even ids get TRUE, odd ids get FALSE.
        for id in 1..=20 {
            let flag = match id % 2 {
                0 => "TRUE",
                _ => "FALSE",
            };
            let insert = format!(
                "INSERT INTO modify_snap (id, value, flag) VALUES ({}, 'v_{}', {})",
                id, id, flag
            );
            writer.execute(&insert, ()).unwrap();
        }

        // Snapshot first, then the schema changes and the NULL row.
        let after_inserts = [
            "PRAGMA snapshot",
            "ALTER TABLE modify_snap MODIFY COLUMN value TEXT",
            "ALTER TABLE modify_snap MODIFY COLUMN flag BOOLEAN",
            "INSERT INTO modify_snap (id, value, flag) VALUES (21, NULL, NULL)",
        ];
        for sql in after_inserts {
            writer.execute(sql, ()).unwrap();
        }
    }
    // NOTE(review): presumably clears a lock file left by the first session — confirm against the helper.
    remove_lock_file(&db_path);

    let db = Database::open(&dsn).unwrap();
    let count = |sql: &str| -> i64 { db.query_one(sql, ()).unwrap() };

    assert_eq!(
        count("SELECT COUNT(*) FROM modify_snap"),
        21,
        "All 21 rows must survive snapshot + WAL replay"
    );
    assert_eq!(
        count("SELECT COUNT(*) FROM modify_snap WHERE value IS NULL"),
        1,
        "NULL value row must survive — MODIFY COLUMN must replay over snapshot"
    );
    assert_eq!(
        count("SELECT COUNT(*) FROM modify_snap WHERE flag IS NULL"),
        1,
        "NULL flag row must survive — MODIFY COLUMN must replay over snapshot"
    );

    // Each column must independently accept NULL after the reopen.
    let nullable_inserts = [
        "INSERT INTO modify_snap (id, value, flag) VALUES (22, NULL, TRUE)",
        "INSERT INTO modify_snap (id, value, flag) VALUES (23, 'hello', NULL)",
    ];
    for sql in nullable_inserts {
        db.execute(sql, ()).unwrap();
    }
}
/// Builds two `USING BITMAP` indexes over a 50-row table, reopens the database,
/// and checks that filtered counts on both indexed columns are correct before
/// and after one more insert.
#[test]
fn test_bitmap_index_durability() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());

    let active_sql = "SELECT COUNT(*) FROM bitmap_test WHERE active = TRUE";
    let cat_a_sql = "SELECT COUNT(*) FROM bitmap_test WHERE category = 'A'";

    {
        let writer = Database::open(&dsn).unwrap();
        writer
            .execute(
                "CREATE TABLE bitmap_test (id INTEGER PRIMARY KEY, active BOOLEAN NOT NULL, category TEXT NOT NULL)",
                (),
            )
            .unwrap();

        // active is TRUE for multiples of 3; category cycles through A..D on id % 4.
        for id in 1..=50 {
            let active = if id % 3 == 0 { "TRUE" } else { "FALSE" };
            let category = match id % 4 {
                0 => "A",
                1 => "B",
                2 => "C",
                _ => "D",
            };
            let insert = format!(
                "INSERT INTO bitmap_test (id, active, category) VALUES ({}, {}, '{}')",
                id, active, category
            );
            writer.execute(&insert, ()).unwrap();
        }

        // Indexes are created after the data is already in place.
        let index_ddl = [
            "CREATE INDEX idx_active_bitmap ON bitmap_test(active) USING BITMAP",
            "CREATE INDEX idx_cat_bitmap ON bitmap_test(category) USING BITMAP",
        ];
        for sql in index_ddl {
            writer.execute(sql, ()).unwrap();
        }

        // Sanity-check the counts within the same session before reopening.
        let active_before: i64 = writer.query_one(active_sql, ()).unwrap();
        assert_eq!(active_before, 16);
        let cat_a_before: i64 = writer.query_one(cat_a_sql, ()).unwrap();
        assert_eq!(cat_a_before, 12);
    }
    // NOTE(review): presumably clears a lock file left by the first session — confirm against the helper.
    remove_lock_file(&db_path);

    let db = Database::open(&dsn).unwrap();
    let count = |sql: &str| -> i64 { db.query_one(sql, ()).unwrap() };

    assert_eq!(
        count("SELECT COUNT(*) FROM bitmap_test"),
        50,
        "All 50 rows must survive recovery"
    );
    assert_eq!(
        count(active_sql),
        16,
        "Bitmap index on active must return correct results after recovery"
    );
    assert_eq!(
        count("SELECT COUNT(*) FROM bitmap_test WHERE active = FALSE"),
        34,
        "Bitmap index on active=FALSE must return correct results after recovery"
    );
    assert_eq!(
        count("SELECT COUNT(*) FROM bitmap_test WHERE category = 'B'"),
        13,
        "Bitmap index on category must return correct results after recovery"
    );

    // One more TRUE / 'A' row: both affected counts should go up by one.
    db.execute(
        "INSERT INTO bitmap_test (id, active, category) VALUES (51, TRUE, 'A')",
        (),
    )
    .unwrap();
    assert_eq!(
        count(active_sql),
        17,
        "Bitmap index must handle post-recovery inserts"
    );
    assert_eq!(
        count(cat_a_sql),
        13,
        "Bitmap index on category must handle post-recovery inserts"
    );
}
/// Takes a `PRAGMA snapshot` after 30 inserts, then creates a `USING BITMAP`
/// index and inserts ten more rows. After a reopen, checks the TRUE/FALSE
/// counts, then drops and recreates the index and checks the TRUE count again.
#[test]
fn test_bitmap_index_with_snapshot_recovery() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());

    let create_index_sql = "CREATE INDEX idx_status_bm ON bitmap_snap(status) USING BITMAP";
    let true_sql = "SELECT COUNT(*) FROM bitmap_snap WHERE status = TRUE";

    {
        let writer = Database::open(&dsn).unwrap();
        writer
            .execute(
                "CREATE TABLE bitmap_snap (id INTEGER PRIMARY KEY, status BOOLEAN NOT NULL)",
                (),
            )
            .unwrap();

        // Rows 1..=30: even ids are TRUE, odd ids are FALSE (15 of each).
        for id in 1..=30 {
            let status = if id % 2 == 0 { "TRUE" } else { "FALSE" };
            let insert = format!(
                "INSERT INTO bitmap_snap (id, status) VALUES ({}, {})",
                id, status
            );
            writer.execute(&insert, ()).unwrap();
        }

        writer.execute("PRAGMA snapshot", ()).unwrap();

        // Everything from here on happens after the snapshot.
        writer.execute(create_index_sql, ()).unwrap();
        for id in 31..=40 {
            let insert = format!("INSERT INTO bitmap_snap (id, status) VALUES ({}, TRUE)", id);
            writer.execute(&insert, ()).unwrap();
        }
    }
    // NOTE(review): presumably clears a lock file left by the first session — confirm against the helper.
    remove_lock_file(&db_path);

    let db = Database::open(&dsn).unwrap();
    let count = |sql: &str| -> i64 { db.query_one(sql, ()).unwrap() };

    assert_eq!(
        count("SELECT COUNT(*) FROM bitmap_snap"),
        40,
        "All 40 rows must survive snapshot + WAL replay"
    );
    // 15 TRUE rows from before the snapshot plus 10 added after it.
    assert_eq!(
        count(true_sql),
        25,
        "Bitmap index must return correct results after snapshot + WAL replay"
    );
    assert_eq!(
        count("SELECT COUNT(*) FROM bitmap_snap WHERE status = FALSE"),
        15,
        "Bitmap index must return correct FALSE count after recovery"
    );

    // Dropping and recreating the index under the same name must also work.
    db.execute("DROP INDEX idx_status_bm ON bitmap_snap", ())
        .unwrap();
    db.execute(create_index_sql, ()).unwrap();
    assert_eq!(
        count(true_sql),
        25,
        "Recreated bitmap index must work correctly"
    );
}
/// Creates parent/child tables linked by a `REFERENCES ... ON DELETE CASCADE`
/// column, takes two `PRAGMA snapshot`s, reopens, and checks that rows survive,
/// invalid references are rejected, and deleting a parent removes its children.
#[test]
fn test_foreign_key_with_snapshot_recovery() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let dsn = format!("file://{}", db_path.display());

    {
        let writer = Database::open(&dsn).unwrap();
        let run = |sql: &str| {
            writer.execute(sql, ()).unwrap();
        };

        run("CREATE TABLE fk_parent (id INTEGER PRIMARY KEY, name TEXT NOT NULL)");
        run("CREATE TABLE fk_child (id INTEGER PRIMARY KEY, parent_id INTEGER REFERENCES fk_parent(id) ON DELETE CASCADE, val TEXT)");

        for id in 1..=5 {
            run(&format!(
                "INSERT INTO fk_parent (id, name) VALUES ({}, 'p{}')",
                id, id
            ));
        }

        // Children are spread over parents 1..=5; ids 5 and 10 map to parent 1.
        for id in 1..=10 {
            let parent_id = (id % 5) + 1;
            run(&format!(
                "INSERT INTO fk_child (id, parent_id, val) VALUES ({}, {}, 'c{}')",
                id, parent_id, id
            ));
        }

        // Snapshot, add one more parent, snapshot again.
        run("PRAGMA snapshot");
        run("INSERT INTO fk_parent (id, name) VALUES (100, 'extra')");
        run("PRAGMA snapshot");
    }
    // NOTE(review): presumably clears a lock file left by the first session — confirm against the helper.
    remove_lock_file(&db_path);

    let db = Database::open(&dsn).unwrap();
    let count = |sql: &str| -> i64 { db.query_one(sql, ()).unwrap() };

    assert_eq!(
        count("SELECT COUNT(*) FROM fk_parent"),
        6,
        "All 6 parent rows must survive"
    );
    assert_eq!(
        count("SELECT COUNT(*) FROM fk_child"),
        10,
        "All 10 child rows must survive"
    );

    // Parent 999 does not exist, so the reference must be rejected.
    let dangling = db.execute(
        "INSERT INTO fk_child (id, parent_id, val) VALUES (99, 999, 'bad')",
        (),
    );
    assert!(
        dangling.is_err(),
        "FK constraint must be enforced after snapshot recovery"
    );

    // The outcome of this delete is discarded: an error falls back to the default value.
    db.execute("DELETE FROM fk_child WHERE parent_id = 100", ())
        .unwrap_or_default();

    // Removing parent 1 should take its children with it.
    db.execute("DELETE FROM fk_parent WHERE id = 1", ())
        .unwrap();
    assert_eq!(
        count("SELECT COUNT(*) FROM fk_child WHERE parent_id = 1"),
        0,
        "CASCADE DELETE must work after snapshot recovery"
    );

    // A child pointing at a parent that still exists is accepted.
    db.execute(
        "INSERT INTO fk_child (id, parent_id, val) VALUES (50, 2, 'valid')",
        (),
    )
    .unwrap();
}
/// Drops a table that another table references, then checks that the child
/// table accepts arbitrary `pid` values both right after the drop and after the
/// database has been reopened, and that the parent table stays gone.
#[test]
fn test_drop_parent_strips_child_fk_on_recovery() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("fk_drop_recovery.db");
    let dsn = format!("file://{}", db_path.display());

    // First session: create both tables, drop the parent, insert an orphan child.
    {
        let writer = Database::open(&dsn).unwrap();
        let run = |sql: &str| {
            writer.execute(sql, ()).unwrap();
        };

        run("CREATE TABLE parent_drop (id INTEGER PRIMARY KEY, name TEXT)");
        run("CREATE TABLE child_drop (
id INTEGER PRIMARY KEY,
pid INTEGER REFERENCES parent_drop(id),
val TEXT
)");
        run("INSERT INTO parent_drop VALUES (1, 'Alice')");
        run("INSERT INTO child_drop VALUES (1, NULL, 'x')");
        run("DROP TABLE parent_drop");
        // pid 999 never existed in the parent; this must succeed once the parent is gone.
        run("INSERT INTO child_drop VALUES (2, 999, 'y')");
    }

    // Second session: reopen without any intermediate cleanup step.
    let db = Database::open(&dsn).unwrap();
    let child_rows = || -> i64 {
        db.query_one("SELECT COUNT(*) FROM child_drop", ())
            .unwrap()
    };

    let parent_lookup = db.execute("SELECT * FROM parent_drop", ());
    assert!(
        parent_lookup.is_err(),
        "parent_drop should not exist after recovery"
    );

    assert_eq!(child_rows(), 2, "child_drop should have 2 rows");

    // Another unreferenced pid must still be accepted after the reopen.
    db.execute("INSERT INTO child_drop VALUES (3, 12345, 'z')", ())
        .unwrap();
    assert_eq!(child_rows(), 3);
}