use std::{path::Path, time::Duration};
use bytes::Bytes;
use merutable::engine::{EngineConfig, MeruEngine};
use merutable::types::{
schema::{ColumnDef, ColumnType, TableSchema},
value::{FieldValue, Row},
};
use tempfile::TempDir;
/// Schema for the `wal_gc` test table.
///
/// Two columns: a non-nullable `Int64` `id`, which is the primary key (column 0),
/// and a nullable `ByteArray` `payload`.
fn schema() -> TableSchema {
    let id_col = ColumnDef {
        name: "id".into(),
        col_type: ColumnType::Int64,
        nullable: false,
        ..Default::default()
    };
    let payload_col = ColumnDef {
        name: "payload".into(),
        col_type: ColumnType::ByteArray,
        nullable: true,
        ..Default::default()
    };
    TableSchema {
        table_name: "wal_gc".into(),
        columns: vec![id_col, payload_col],
        // Index into `columns`: the key is `id`.
        primary_key: vec![0],
        ..Default::default()
    }
}
/// Builds the test row for key `i`.
///
/// The row is `[id = i, payload = "payload_{i:04}"]`, matching the column order of `schema()`.
fn make_row(i: i64) -> Row {
    let payload = Bytes::from(format!("payload_{i:04}"));
    let cells = vec![
        Some(FieldValue::Int64(i)),
        Some(FieldValue::Bytes(payload)),
    ];
    Row::new(cells)
}
/// Engine configuration rooted entirely inside `tmp`.
///
/// The catalog and the object-store prefix both point at the temp-dir root. The
/// WAL goes into the `wal/` subdirectory, which the tests below inspect. All other
/// settings use `EngineConfig::default()`.
fn test_config(tmp: &TempDir) -> EngineConfig {
    let root = tmp.path();
    let root_str = root.to_string_lossy().into_owned();
    EngineConfig {
        schema: schema(),
        catalog_uri: root_str.clone(),
        object_store_prefix: root_str,
        wal_dir: root.join("wal"),
        ..Default::default()
    }
}
/// Yields the entries directly under `dir` whose file extension is `wal`.
///
/// This is best-effort by design:
/// - a missing or unreadable directory yields nothing;
/// - individual entries that fail to read are skipped.
///
/// Callers therefore get `0` instead of a panic, e.g. while WAL files may be
/// deleted underneath the scan.
fn wal_entries(dir: &Path) -> impl Iterator<Item = std::fs::DirEntry> {
    std::fs::read_dir(dir)
        .into_iter() // Err (e.g. directory does not exist) -> empty iterator
        .flatten() // ReadDir -> io::Result<DirEntry>
        .flatten() // skip entries that could not be read
        .filter(|e| e.path().extension().and_then(|s| s.to_str()) == Some("wal"))
}

/// Number of `*.wal` files in `dir`; `0` if the directory is missing or unreadable.
fn count_wal_files(dir: &Path) -> usize {
    wal_entries(dir).count()
}

/// Total size in bytes of the `*.wal` files in `dir`.
///
/// Returns `0` if the directory is missing or unreadable. Files whose metadata
/// cannot be read contribute `0`.
fn sum_wal_bytes(dir: &Path) -> u64 {
    wal_entries(dir)
        .map(|e| e.metadata().map(|m| m.len()).unwrap_or(0))
        .sum()
}
/// An explicit `flush()` must rotate the WAL and GC the old log.
///
/// Afterwards there is still exactly one WAL file, and the WAL directory holds
/// fewer bytes than before the flush.
#[tokio::test]
async fn explicit_flush_rotates_and_gcs_wal() {
    let tmp = tempfile::tempdir().unwrap();
    let wal_dir = tmp.path().join("wal");
    let engine = MeruEngine::open(test_config(&tmp)).await.unwrap();

    // Stage 1: fill the single active WAL with 50 rows.
    for id in 1..=50i64 {
        let key = vec![FieldValue::Int64(id)];
        engine.put(key, make_row(id)).await.unwrap();
    }
    let pre_files = count_wal_files(&wal_dir);
    assert_eq!(pre_files, 1, "pre-flush expected exactly one active WAL file");
    let pre_flush_bytes = sum_wal_bytes(&wal_dir);
    assert!(pre_flush_bytes > 0);

    // Stage 2: flush, then check that the directory was rotated rather than grown.
    engine.flush().await.unwrap();
    let post_files = count_wal_files(&wal_dir);
    assert_eq!(
        post_files, 1,
        "post-flush expected exactly one (fresh) WAL file, got {post_files}"
    );
    let post_bytes = sum_wal_bytes(&wal_dir);
    assert!(
        post_bytes < pre_flush_bytes,
        "post-flush WAL ({post_bytes}B) should be smaller than pre-flush ({pre_flush_bytes}B): \
         old log was not GC'd"
    );
}
/// Repeated put+flush rounds must not leave a trail of WAL files behind.
///
/// After six rounds, each ending in a flush, the WAL directory holds at most one file.
#[tokio::test]
async fn repeated_flushes_do_not_accumulate_wal_files() {
    const ROUNDS: i64 = 6;
    const ROWS_PER_ROUND: i64 = 10;

    let tmp = tempfile::tempdir().unwrap();
    let wal_dir = tmp.path().join("wal");
    let engine = MeruEngine::open(test_config(&tmp)).await.unwrap();

    for round in 0..ROUNDS {
        // Round r writes ids r*10+1 ..= r*10+10, so keys never repeat across rounds.
        let first = round * ROWS_PER_ROUND + 1;
        for id in first..first + ROWS_PER_ROUND {
            engine
                .put(vec![FieldValue::Int64(id)], make_row(id))
                .await
                .unwrap();
        }
        engine.flush().await.unwrap();
    }

    let n = count_wal_files(&wal_dir);
    assert!(
        n <= 1,
        "WAL dir should hold ≤1 file after repeated flush/GC cycles; got {n} files"
    );
}
/// The automatic (memtable-size-triggered) flush path must also GC old WAL files.
///
/// With a 4 KiB memtable, 500 puts exercise the auto-flush path. The test then
/// waits up to 10 s for the WAL directory to settle at ≤3 files. After a final
/// explicit flush it expects exactly one WAL file.
#[tokio::test]
async fn auto_flush_gcs_wal_files() {
    let tmp = tempfile::tempdir().unwrap();
    let mut config = test_config(&tmp);
    config.memtable_size_bytes = 4 * 1024;
    let engine = MeruEngine::open(config).await.unwrap();
    let wal_dir = tmp.path().join("wal");
    for i in 1..=500i64 {
        engine
            .put(vec![FieldValue::Int64(i)], make_row(i))
            .await
            .unwrap();
    }

    // Poll until the file count drops to ≤3 or the deadline passes. The loop
    // yields the count it last observed, so the assertion below checks exactly
    // the value that ended the wait instead of re-reading the directory.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
    let n = loop {
        let observed = count_wal_files(&wal_dir);
        if observed <= 3 || tokio::time::Instant::now() >= deadline {
            break observed;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    };
    assert!(
        n <= 3,
        "auto-flush path leaked WAL files: {n} files left in {wal_dir:?} \
         (expected ≤3 mid-flight, ideally 1 steady state)"
    );

    engine.flush().await.unwrap();
    let final_n = count_wal_files(&wal_dir);
    assert_eq!(
        final_n, 1,
        "post-explicit-flush steady state: exactly one WAL file, got {final_n}"
    );
}
/// Rows flushed before a restart must still be readable after reopening.
///
/// Reopening must also not pile up WAL files: at most 2 may remain, the file left
/// by the first engine plus, presumably, one opened by the second — confirm
/// against the engine's recovery policy.
#[tokio::test]
async fn recovery_does_not_replay_flushed_batches() {
    let tmp = tempfile::tempdir().unwrap();
    let wal_dir = tmp.path().join("wal");

    // First incarnation: write and flush 20 rows. The engine is dropped at the
    // end of this scope, before the second `open` below.
    {
        let engine = MeruEngine::open(test_config(&tmp)).await.unwrap();
        for i in 1..=20i64 {
            engine
                .put(vec![FieldValue::Int64(i)], make_row(i))
                .await
                .unwrap();
        }
        engine.flush().await.unwrap();
    }
    assert_eq!(
        count_wal_files(&wal_dir),
        1,
        "expected exactly one WAL file after flush before reopen"
    );

    // Second incarnation: every flushed row must be visible again.
    let engine = MeruEngine::open(test_config(&tmp)).await.unwrap();
    for i in 1..=20i64 {
        let row = engine
            .get(&[FieldValue::Int64(i)])
            .unwrap()
            .unwrap_or_else(|| panic!("row {i} missing after reopen"));
        assert_eq!(row.get(0), Some(&FieldValue::Int64(i)));
    }

    let post_reopen = count_wal_files(&wal_dir);
    assert!(
        post_reopen <= 2,
        "expected ≤2 WAL files after reopen, got {post_reopen}"
    );
}