use ironwal::sharded::ShardedWal;
use ironwal::{SyncMode, WalOptions};
use std::fs;
use tempfile::TempDir;
fn setup(shard_count: u16) -> (ShardedWal, TempDir) {
let dir = TempDir::new().unwrap();
let mut opts = WalOptions::default();
opts.root_path = dir.path().to_path_buf();
opts.sync_mode = SyncMode::Strict;
opts.max_segment_size = 2_000; let wal = ShardedWal::new(opts, shard_count).unwrap();
(wal, dir)
}
#[test]
fn test_prune_before_checkpoint() {
let (wal, _dir) = setup(4);
for i in 0..100 {
let data = vec![0xAB; 200];
wal.append(format!("key_{}", i).as_bytes(), &data).unwrap();
}
wal.create_checkpoint(b"midpoint").unwrap();
for i in 100..300 {
let data = vec![0xCD; 200];
wal.append(format!("key_{}", i).as_bytes(), &data).unwrap();
}
let stats = wal.prune_before_checkpoint(b"midpoint").unwrap();
println!(
"Pruned {} segments from {} shards",
stats.segments_deleted, stats.shards_pruned
);
assert!(
stats.segments_deleted > 0,
"Expected segments to be deleted"
);
let checkpoint = wal.load_checkpoint(b"midpoint").unwrap();
for shard_id in 0..4 {
let offset = checkpoint.offsets[shard_id as usize];
if offset > 0 {
let mut iter = wal.iter_shard(shard_id, offset).unwrap();
let _first = iter.next();
}
}
}
#[test]
fn test_prune_nonexistent_checkpoint() {
let (wal, _dir) = setup(4);
wal.append(b"key", b"data").unwrap();
let result = wal.prune_before_checkpoint(b"does_not_exist");
assert!(result.is_err());
assert!(matches!(
result.unwrap_err(),
ironwal::Error::CheckpointNotFound(_)
));
}
#[test]
fn test_prune_empty_checkpoint() {
let (wal, _dir) = setup(4);
wal.create_checkpoint(b"empty").unwrap();
let stats = wal.prune_before_checkpoint(b"empty").unwrap();
assert_eq!(stats.segments_deleted, 0);
}
#[test]
fn test_compact_checkpoints() {
let (wal, _dir) = setup(4);
for i in 0..10 {
wal
.append(format!("key_{}", i).as_bytes(), b"data")
.unwrap();
let checkpoint_id = format!("ckpt_{}", i);
wal.create_checkpoint(checkpoint_id.as_bytes()).unwrap();
std::thread::sleep(std::time::Duration::from_millis(10));
}
let stats = wal.compact_checkpoints(3).unwrap();
assert_eq!(stats.checkpoints_before, 10);
assert_eq!(stats.checkpoints_after, 3);
assert!(stats.bytes_reclaimed > 0);
for i in 0..7 {
let checkpoint_id = format!("ckpt_{}", i);
assert!(wal.load_checkpoint(checkpoint_id.as_bytes()).is_err());
}
for i in 7..10 {
let checkpoint_id = format!("ckpt_{}", i);
assert!(wal.load_checkpoint(checkpoint_id.as_bytes()).is_ok());
}
}
#[test]
fn test_compact_keep_more_than_exists() {
let (wal, _dir) = setup(4);
for i in 0..5 {
let checkpoint_id = format!("ckpt_{}", i);
wal.create_checkpoint(checkpoint_id.as_bytes()).unwrap();
std::thread::sleep(std::time::Duration::from_millis(10));
}
let stats = wal.compact_checkpoints(10).unwrap();
assert_eq!(stats.checkpoints_before, 5);
assert_eq!(stats.checkpoints_after, 5);
for i in 0..5 {
let checkpoint_id = format!("ckpt_{}", i);
assert!(wal.load_checkpoint(checkpoint_id.as_bytes()).is_ok());
}
}
#[test]
fn test_prune_then_write() {
let (wal, _dir) = setup(4);
for i in 0..100 {
let data = vec![i as u8; 100];
wal.append(format!("key_{}", i).as_bytes(), &data).unwrap();
}
wal.create_checkpoint(b"before_prune").unwrap();
wal.prune_before_checkpoint(b"before_prune").unwrap();
for i in 100..200 {
let data = vec![i as u8; 100];
wal.append(format!("key_{}", i).as_bytes(), &data).unwrap();
}
wal.create_checkpoint(b"after_prune").unwrap();
let checkpoint = wal.load_checkpoint(b"after_prune").unwrap();
let total: u64 = checkpoint.offsets.iter().sum();
assert_eq!(total, 200, "Should have 200 total entries");
}
#[test]
fn test_prune_restart_prune() {
let dir = TempDir::new().unwrap();
let root = dir.path().to_path_buf();
{
let mut opts = WalOptions::default();
opts.root_path = root.clone();
opts.sync_mode = SyncMode::Strict;
opts.max_segment_size = 10_000;
let wal = ShardedWal::new(opts, 4).unwrap();
for i in 0..300 {
let data = vec![0xAB; 100];
wal.append(format!("key_{}", i).as_bytes(), &data).unwrap();
}
wal.create_checkpoint(b"ckpt_1").unwrap();
for i in 300..600 {
let data = vec![0xCD; 100];
wal.append(format!("key_{}", i).as_bytes(), &data).unwrap();
}
wal.create_checkpoint(b"ckpt_2").unwrap();
}
{
let mut opts = WalOptions::default();
opts.root_path = root;
opts.sync_mode = SyncMode::Strict;
opts.max_segment_size = 10_000;
let wal = ShardedWal::new(opts, 4).unwrap();
let stats = wal.prune_before_checkpoint(b"ckpt_2").unwrap();
assert!(stats.segments_deleted > 0);
let checkpoint = wal.load_checkpoint(b"ckpt_2").unwrap();
for shard_id in 0..4 {
let offset = checkpoint.offsets[shard_id as usize];
let _iter = wal.iter_shard(shard_id, offset).unwrap();
}
}
}
#[test]
fn test_compact_after_restart() {
let dir = TempDir::new().unwrap();
let root = dir.path().to_path_buf();
{
let mut opts = WalOptions::default();
opts.root_path = root.clone();
let wal = ShardedWal::new(opts, 4).unwrap();
for i in 0..10 {
let checkpoint_id = format!("ckpt_{}", i);
wal.create_checkpoint(checkpoint_id.as_bytes()).unwrap();
std::thread::sleep(std::time::Duration::from_millis(10));
}
}
{
let mut opts = WalOptions::default();
opts.root_path = root;
let wal = ShardedWal::new(opts, 4).unwrap();
let stats = wal.compact_checkpoints(3).unwrap();
assert_eq!(stats.checkpoints_before, 10);
assert_eq!(stats.checkpoints_after, 3);
for i in 7..10 {
let checkpoint_id = format!("ckpt_{}", i);
assert!(wal.load_checkpoint(checkpoint_id.as_bytes()).is_ok());
}
}
}