use std::{fs, sync::Arc, time::Duration};
use arrow_schema::{DataType, Field, Schema};
use fusio::{
DynFs, disk::LocalFs, executor::tokio::TokioExecutor, fs::FsCas, mem::fs::InMemoryFs,
path::Path,
};
use typed_arrow_dyn::{DynCell, DynRow};
use super::common::{wal_segment_paths, workspace_temp_dir};
use crate::{
db::{DB, DbInner},
inmem::policy::BatchesThreshold,
mode::DynModeConfig,
ondisk::sstable::{SsTableConfig, SsTableDescriptor, SsTableId},
schema::SchemaBuilder,
test::build_batch,
wal::{WalConfig as RuntimeWalConfig, WalExt, WalSyncPolicy, state::FsWalStateStore},
};
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn wal_gc_smoke_prunes_segments_after_flush() -> Result<(), Box<dyn std::error::Error>> {
let temp_root = workspace_temp_dir("wal-gc-smoke");
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("value", DataType::Int32, false),
]));
let mode_config = DynModeConfig::from_key_name(schema.clone(), "id")?;
let executor = Arc::new(TokioExecutor::default());
let namespace = temp_root
.file_name()
.and_then(|name| name.to_str())
.unwrap_or("wal-gc-smoke");
let mut db: DbInner<InMemoryFs, TokioExecutor> =
DB::<InMemoryFs, TokioExecutor>::builder(mode_config)
.in_memory(namespace.to_string())?
.disable_minor_compaction()
.open_with_executor(Arc::clone(&executor))
.await?
.into_inner();
let wal_local_fs = Arc::new(LocalFs {});
let wal_dyn_fs: Arc<dyn DynFs> = wal_local_fs.clone();
let wal_cas: Arc<dyn FsCas> = wal_local_fs.clone();
let wal_dir = temp_root.join("wal");
fs::create_dir_all(&wal_dir)?;
let wal_path = Path::from_filesystem_path(&wal_dir)?;
let wal_cfg = RuntimeWalConfig {
dir: wal_path,
segment_backend: wal_dyn_fs,
state_store: Some(Arc::new(FsWalStateStore::new(wal_cas))),
segment_max_bytes: 1,
flush_interval: Duration::from_millis(1),
sync: WalSyncPolicy::Disabled,
..RuntimeWalConfig::default()
};
db.enable_wal(wal_cfg.clone()).await?;
db.set_seal_policy(Arc::new(BatchesThreshold { batches: 1 }));
let sst_dir = temp_root.join("sst");
fs::create_dir_all(&sst_dir)?;
let sst_root = Path::from_filesystem_path(&sst_dir)?;
let sst_fs: Arc<dyn DynFs> = Arc::new(LocalFs {});
let sst_cfg = Arc::new(SsTableConfig::new(schema.clone(), sst_fs, sst_root));
for idx in 0..4 {
let rows = vec![DynRow(vec![
Some(DynCell::Str(format!("user-{idx}"))),
Some(DynCell::I32(idx)),
])];
let batch = build_batch(schema.clone(), rows)?;
db.ingest(batch).await?;
}
assert!(
db.num_immutable_segments() >= 1,
"expected first seal to produce immutables"
);
let descriptor_a = SsTableDescriptor::new(SsTableId::new(99), 0);
db.flush_immutables_with_descriptor(Arc::clone(&sst_cfg), descriptor_a)
.await?;
let floor_after_first = db
.manifest_wal_floor()
.await
.map(|ref_| ref_.seq())
.expect("manifest floor after first flush");
let first_prune_view = wal_segment_paths(&wal_dir);
assert!(
first_prune_view.len() >= 2,
"expected multiple WAL segments present after first flush"
);
for idx in 4..8 {
let rows = vec![DynRow(vec![
Some(DynCell::Str(format!("user-{idx}"))),
Some(DynCell::I32(idx)),
])];
let batch = build_batch(schema.clone(), rows)?;
db.ingest(batch).await?;
}
assert!(
db.num_immutable_segments() >= 1,
"expected second seal to produce immutables"
);
let descriptor_b = SsTableDescriptor::new(SsTableId::new(100), 0);
db.flush_immutables_with_descriptor(Arc::clone(&sst_cfg), descriptor_b)
.await?;
let floor_after_second = db
.manifest_wal_floor()
.await
.map(|ref_| ref_.seq())
.expect("manifest floor after second flush");
assert!(
floor_after_second > floor_after_first,
"floor should advance after second flush"
);
let after = wal_segment_paths(&wal_dir);
let removed: Vec<_> = first_prune_view
.iter()
.filter(|path| !after.contains(path))
.collect();
assert!(
!removed.is_empty(),
"second flush should prune at least one WAL segment"
);
if let Some(handle) = db.wal().cloned() {
let metrics = handle.metrics();
let guard = metrics.read().await;
assert!(
guard.wal_segments_pruned >= removed.len() as u64,
"pruned segment count should be reflected in metrics"
);
assert!(
guard.wal_floor_advancements >= 2,
"floor should advance at least once per flush"
);
assert_eq!(
guard.wal_prune_dry_runs, 0,
"regular pruning should not increment dry-run counters"
);
}
db.disable_wal().await?;
fs::remove_dir_all(&temp_root)?;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn wal_gc_dry_run_reports_only() -> Result<(), Box<dyn std::error::Error>> {
let temp_root = workspace_temp_dir("wal-gc-dry-run");
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("value", DataType::Int32, false),
]));
let mode_config = SchemaBuilder::from_schema(schema)
.primary_key("id")
.with_metadata()
.build()
.expect("key field");
let schema = Arc::clone(&mode_config.schema);
let executor = Arc::new(TokioExecutor::default());
let namespace = temp_root
.file_name()
.and_then(|name| name.to_str())
.unwrap_or("wal-gc-dry-run");
let mut db: DbInner<InMemoryFs, TokioExecutor> =
DB::<InMemoryFs, TokioExecutor>::builder(mode_config)
.in_memory(namespace.to_string())?
.disable_minor_compaction()
.open_with_executor(Arc::clone(&executor))
.await?
.into_inner();
let wal_local_fs = Arc::new(LocalFs {});
let wal_dyn_fs: Arc<dyn DynFs> = wal_local_fs.clone();
let wal_cas: Arc<dyn FsCas> = wal_local_fs.clone();
let wal_dir = temp_root.join("wal");
fs::create_dir_all(&wal_dir)?;
let wal_path = Path::from_filesystem_path(&wal_dir)?;
let wal_cfg = RuntimeWalConfig {
dir: wal_path,
segment_backend: wal_dyn_fs,
state_store: Some(Arc::new(FsWalStateStore::new(wal_cas))),
segment_max_bytes: 1,
prune_dry_run: true,
..RuntimeWalConfig::default()
};
db.enable_wal(wal_cfg.clone()).await?;
db.set_seal_policy(Arc::new(BatchesThreshold { batches: 1 }));
let sst_dir = temp_root.join("sst");
fs::create_dir_all(&sst_dir)?;
let sst_root = Path::from_filesystem_path(&sst_dir)?;
let sst_fs: Arc<dyn DynFs> = Arc::new(LocalFs {});
let sst_cfg = Arc::new(SsTableConfig::new(schema.clone(), sst_fs, sst_root));
for idx in 0..4 {
let rows = vec![vec![
Some(DynCell::Str(format!("dry-{idx}"))),
Some(DynCell::I32(idx)),
]];
let batch = build_batch(schema.clone(), rows)?;
db.ingest(batch).await?;
}
let descriptor_a = SsTableDescriptor::new(SsTableId::new(991), 0);
db.flush_immutables_with_descriptor(Arc::clone(&sst_cfg), descriptor_a)
.await?;
let before = wal_segment_paths(&wal_dir);
for idx in 4..8 {
let rows = vec![vec![
Some(DynCell::Str(format!("dry-{idx}"))),
Some(DynCell::I32(idx)),
]];
let batch = build_batch(schema.clone(), rows)?;
db.ingest(batch).await?;
}
let descriptor_b = SsTableDescriptor::new(SsTableId::new(992), 0);
db.flush_immutables_with_descriptor(Arc::clone(&sst_cfg), descriptor_b)
.await?;
let after = wal_segment_paths(&wal_dir);
assert!(
after.len() >= before.len(),
"dry-run pruning should not delete WAL segments"
);
if let Some(handle) = db.wal().cloned() {
let metrics = handle.metrics();
let guard = metrics.read().await;
assert_eq!(guard.wal_segments_pruned, 0);
assert!(
guard.wal_prune_dry_runs > 0,
"dry-run pruning should report would-be deletions"
);
}
db.disable_wal().await?;
fs::remove_dir_all(&temp_root)?;
Ok(())
}