use std::{path::Path, time::Duration};
use bytes::Bytes;
use merutable::engine::{EngineConfig, MeruEngine};
use merutable::types::{
schema::{ColumnDef, ColumnType, TableSchema},
value::{FieldValue, Row},
};
use tempfile::TempDir;
/// Builds the two-column test table: an `Int64` primary key (`id`) plus a
/// nullable byte-array `payload` column. Primary key is column index 0.
fn schema() -> TableSchema {
    let id_col = ColumnDef {
        name: "id".into(),
        col_type: ColumnType::Int64,
        nullable: false,
        ..Default::default()
    };
    let payload_col = ColumnDef {
        name: "payload".into(),
        col_type: ColumnType::ByteArray,
        nullable: true,
        ..Default::default()
    };
    TableSchema {
        table_name: "auto_flush".into(),
        columns: vec![id_col, payload_col],
        primary_key: vec![0],
        ..Default::default()
    }
}
/// Builds a row for key `i` carrying 512 bytes of `'x'` filler, so that a
/// handful of rows is enough to overflow a tiny memtable budget.
fn make_row(i: i64) -> Row {
    let fields = vec![
        Some(FieldValue::Int64(i)),
        Some(FieldValue::Bytes(Bytes::from(vec![b'x'; 512]))),
    ];
    Row::new(fields)
}
fn tiny_memtable_config(tmp: &TempDir) -> EngineConfig {
EngineConfig {
schema: schema(),
catalog_uri: tmp.path().to_string_lossy().to_string(),
object_store_prefix: tmp.path().to_string_lossy().to_string(),
wal_dir: tmp.path().join("wal"),
memtable_size_bytes: 4 * 1024,
l0_slowdown_trigger: u32::MAX as usize,
l0_stop_trigger: u32::MAX as usize,
..Default::default()
}
}
/// Counts the direct children of `dir` whose file extension is `parquet`.
///
/// A missing or unreadable directory yields 0 — before the first flush the
/// L0 directory may not exist yet, and that is not an error for the tests.
fn count_parquet_files(dir: &Path) -> usize {
    let Ok(entries) = std::fs::read_dir(dir) else {
        return 0;
    };
    entries
        .flatten()
        .filter(|entry| {
            entry
                .path()
                .extension()
                .map_or(false, |ext| ext == "parquet")
        })
        .count()
}
#[tokio::test]
async fn auto_flush_produces_l0_file_without_explicit_flush() {
    // Writing ~100 KiB of rows into a 4 KiB memtable must trigger background
    // flushes that materialize at least one Parquet file in L0, without the
    // test ever calling an explicit flush API.
    let tmp = tempfile::tempdir().unwrap();
    let engine = MeruEngine::open(tiny_memtable_config(&tmp)).await.unwrap();
    let l0_dir = tmp.path().join("data").join("L0");

    for i in 1..=200i64 {
        engine
            .put(vec![FieldValue::Int64(i)], make_row(i))
            .await
            .unwrap();
    }

    // Poll for the background flush with a bounded deadline; same wait-loop
    // shape as the sustained-write test so failures read consistently.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
    while tokio::time::Instant::now() < deadline {
        if count_parquet_files(&l0_dir) > 0 {
            break;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
    // Report only what we observed (no file appeared), not a guessed cause.
    assert!(
        count_parquet_files(&l0_dir) > 0,
        "auto-flush never produced an L0 Parquet file within 5s: dir={l0_dir:?}"
    );

    // Every row must remain readable after the memtable rotated to disk.
    for i in 1..=200i64 {
        let got = engine
            .get(&[FieldValue::Int64(i)])
            .unwrap()
            .unwrap_or_else(|| panic!("row {i} missing after auto-flush"));
        assert_eq!(got.get(0), Some(&FieldValue::Int64(i)));
        match got.get(1) {
            Some(FieldValue::Bytes(b)) => {
                assert_eq!(b.len(), 512, "payload length mismatch at id {i}");
            }
            other => panic!("id {i}: payload not bytes: {other:?}"),
        }
    }
}
#[tokio::test]
async fn auto_flush_produces_multiple_l0_files_under_sustained_write() {
    // A sustained burst of writes far beyond one memtable's capacity should
    // rotate the memtable repeatedly, leaving at least two L0 Parquet files.
    let tmp = tempfile::tempdir().unwrap();
    let engine = MeruEngine::open(tiny_memtable_config(&tmp)).await.unwrap();
    let l0_dir = tmp.path().join("data").join("L0");

    for id in 1..=1000i64 {
        engine
            .put(vec![FieldValue::Int64(id)], make_row(id))
            .await
            .unwrap();
    }

    // Give the background flusher up to 10s to materialize two files.
    let give_up_at = tokio::time::Instant::now() + Duration::from_secs(10);
    loop {
        let done = count_parquet_files(&l0_dir) >= 2;
        if done || tokio::time::Instant::now() >= give_up_at {
            break;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }

    let n_files = count_parquet_files(&l0_dir);
    assert!(
        n_files >= 2,
        "expected ≥2 L0 Parquet files from sustained auto-flush; got {n_files}. \
        This means the memtable never rotated under backpressure."
    );

    // All 1000 keys must still resolve after the rotations.
    for i in 1..=1000i64 {
        assert!(
            engine.get(&[FieldValue::Int64(i)]).unwrap().is_some(),
            "row {i} missing after sustained auto-flush"
        );
    }
}