use bytes::Bytes;
use merutable::engine::{EngineConfig, MeruEngine};
use merutable::types::{
schema::{ColumnDef, ColumnType, TableSchema},
value::{FieldValue, Row},
};
use tempfile::TempDir;
/// Two-column test schema: `id` (Int64, non-null, primary key at ordinal 0)
/// and `payload` (nullable ByteArray).
fn schema() -> TableSchema {
    let id_col = ColumnDef {
        name: "id".into(),
        col_type: ColumnType::Int64,
        nullable: false,
        ..Default::default()
    };
    let payload_col = ColumnDef {
        name: "payload".into(),
        col_type: ColumnType::ByteArray,
        nullable: true,
        ..Default::default()
    };
    TableSchema {
        table_name: "engine_forensic".into(),
        columns: vec![id_col, payload_col],
        primary_key: vec![0],
        ..Default::default()
    }
}
/// Builds a row whose id is `i` and whose payload is the deterministic
/// string `payload_{i:04}` (zero-padded so lexical and numeric order agree).
fn make_row(i: i64) -> Row {
    let payload = Bytes::from(format!("payload_{i:04}"));
    Row::new(vec![
        Some(FieldValue::Int64(i)),
        Some(FieldValue::Bytes(payload)),
    ])
}
/// Engine config rooted entirely inside `tmp`: catalog and object store share
/// the temp root, the WAL gets its own `wal/` subdirectory.
fn test_config(tmp: &TempDir) -> EngineConfig {
    let root = tmp.path().to_string_lossy().to_string();
    EngineConfig {
        schema: schema(),
        catalog_uri: root.clone(),
        object_store_prefix: root,
        wal_dir: tmp.path().join("wal"),
        ..Default::default()
    }
}
#[tokio::test]
async fn get_returns_flushed_row_from_l0_parquet() {
    let tmp = tempfile::tempdir().unwrap();
    let engine = MeruEngine::open(test_config(&tmp)).await.unwrap();

    // Write 20 rows, then push them out of the memtable into an L0 file.
    for i in 1..=20i64 {
        engine
            .put(vec![FieldValue::Int64(i)], make_row(i))
            .await
            .unwrap();
    }
    engine.flush().await.unwrap();

    // Every row must still be point-readable from the flushed parquet.
    for i in 1..=20i64 {
        let row = engine
            .get(&[FieldValue::Int64(i)])
            .unwrap()
            .unwrap_or_else(|| panic!("flushed row {i} must be readable via engine.get"));
        assert_eq!(row.get(0), Some(&FieldValue::Int64(i)), "id at {i}");
        let want = Bytes::from(format!("payload_{i:04}"));
        assert_eq!(row.get(1), Some(&FieldValue::Bytes(want)), "payload at {i}");
    }
}
#[tokio::test]
async fn scan_merges_memtable_and_flushed_l0() {
    let tmp = tempfile::tempdir().unwrap();
    let engine = MeruEngine::open(test_config(&tmp)).await.unwrap();

    // Ids 1..=10 land in a flushed L0 file ...
    for i in 1..=10i64 {
        engine
            .put(vec![FieldValue::Int64(i)], make_row(i))
            .await
            .unwrap();
    }
    engine.flush().await.unwrap();
    // ... while 11..=15 stay resident in the memtable.
    for i in 11..=15i64 {
        engine
            .put(vec![FieldValue::Int64(i)], make_row(i))
            .await
            .unwrap();
    }

    let results = engine.scan(None, None).unwrap();
    assert_eq!(
        results.len(),
        15,
        "scan should merge 10 flushed rows + 5 memtable rows; got {}",
        results.len()
    );

    // The merged stream must be strictly ascending by id, with each row
    // carrying the payload generated for that id.
    let mut seen: Vec<i64> = Vec::with_capacity(results.len());
    for (_ikey, row) in &results {
        let id = match row.get(0) {
            Some(FieldValue::Int64(v)) => *v,
            other => panic!("unexpected id field: {other:?}"),
        };
        if let Some(&prev) = seen.last() {
            assert!(
                prev < id,
                "scan result not sorted ascending: {prev} before {id}"
            );
        }
        seen.push(id);
        assert_eq!(
            row.get(1),
            Some(&FieldValue::Bytes(Bytes::from(format!("payload_{id:04}")))),
            "payload mismatch at id {id}"
        );
    }

    // Bounded scan: inclusive start, exclusive end — so 13 is not returned.
    let bounded = engine
        .scan(
            Some(&[FieldValue::Int64(5)]),
            Some(&[FieldValue::Int64(13)]),
        )
        .unwrap();
    let ids: Vec<i64> = bounded
        .iter()
        .map(|(_, r)| match r.get(0) {
            Some(FieldValue::Int64(v)) => *v,
            _ => panic!("non-int id"),
        })
        .collect();
    assert_eq!(ids, vec![5, 6, 7, 8, 9, 10, 11, 12]);
}
#[tokio::test]
async fn memtable_shadows_flushed_older_version() {
    let tmp = tempfile::tempdir().unwrap();
    let engine = MeruEngine::open(test_config(&tmp)).await.unwrap();

    // Builds key-7 rows that differ only in their version tag payload.
    let versioned = |tag: &str| {
        Row::new(vec![
            Some(FieldValue::Int64(7)),
            Some(FieldValue::Bytes(Bytes::from(tag.to_string()))),
        ])
    };

    // v1 is flushed to L0 ...
    engine
        .put(vec![FieldValue::Int64(7)], versioned("v1"))
        .await
        .unwrap();
    engine.flush().await.unwrap();
    // ... v2 stays in the memtable and must win on read.
    engine
        .put(vec![FieldValue::Int64(7)], versioned("v2"))
        .await
        .unwrap();

    let row = engine.get(&[FieldValue::Int64(7)]).unwrap().unwrap();
    assert_eq!(
        row.get(1),
        Some(&FieldValue::Bytes(Bytes::from("v2"))),
        "memtable v2 must shadow flushed v1"
    );

    // A full scan must dedupe across memtable + L0 to the single newest version.
    let results = engine.scan(None, None).unwrap();
    let sevens: Vec<_> = results
        .iter()
        .filter(|(_, r)| matches!(r.get(0), Some(FieldValue::Int64(7))))
        .collect();
    assert_eq!(sevens.len(), 1, "dedup failed across memtable+L0");
    assert_eq!(
        sevens[0].1.get(1),
        Some(&FieldValue::Bytes(Bytes::from("v2")))
    );
}
#[tokio::test]
async fn tombstone_in_memtable_hides_flushed_row() {
    let tmp = tempfile::tempdir().unwrap();
    let engine = MeruEngine::open(test_config(&tmp)).await.unwrap();

    // Put + flush, then delete while the tombstone is still memtable-resident.
    engine
        .put(vec![FieldValue::Int64(42)], make_row(42))
        .await
        .unwrap();
    engine.flush().await.unwrap();
    engine.delete(vec![FieldValue::Int64(42)]).await.unwrap();

    // Point read: the in-memory tombstone must override the on-disk row.
    let got = engine.get(&[FieldValue::Int64(42)]).unwrap();
    assert!(
        got.is_none(),
        "tombstone must mask the flushed L0 row; got {got:?}"
    );

    // Range read: the deleted key must not surface in a full scan either.
    let results = engine.scan(None, None).unwrap();
    let leaked = results
        .iter()
        .any(|(_, r)| matches!(r.get(0), Some(FieldValue::Int64(42))));
    assert!(!leaked, "scan leaked a tombstoned key");
}
#[tokio::test]
async fn compaction_rewrites_l0_to_l1_and_reads_survive() {
    let tmp = tempfile::tempdir().unwrap();
    let mut cfg = test_config(&tmp);
    // Low trigger so three flushed files are guaranteed to exceed it.
    cfg.l0_compaction_trigger = 2;
    let engine = MeruEngine::open(cfg).await.unwrap();

    // Three flushes of ten rows each -> three L0 files covering ids 1..=30.
    for batch in 0..3 {
        let base = batch * 10;
        for offset in 1..=10i64 {
            let id = base + offset;
            engine
                .put(vec![FieldValue::Int64(id)], make_row(id))
                .await
                .unwrap();
        }
        engine.flush().await.unwrap();
    }

    // Sanity: everything is readable before compaction runs.
    for id in 1..=30i64 {
        assert!(
            engine.get(&[FieldValue::Int64(id)]).unwrap().is_some(),
            "pre-compaction: id {id} missing from L0"
        );
    }

    engine.compact().await.unwrap();

    // Point reads must survive the L0 -> L1 rewrite with payloads intact.
    for id in 1..=30i64 {
        let row = engine
            .get(&[FieldValue::Int64(id)])
            .unwrap()
            .unwrap_or_else(|| panic!("post-compaction: id {id} missing; compaction ate data"));
        assert_eq!(row.get(0), Some(&FieldValue::Int64(id)));
        assert_eq!(
            row.get(1),
            Some(&FieldValue::Bytes(Bytes::from(format!("payload_{id:04}")))),
            "payload corrupted at id {id}"
        );
    }

    // Inspect the on-disk output: exactly one merged parquet file under L1.
    let l1_dir = tmp.path().join("data").join("L1");
    let mut parquet_files: Vec<std::path::PathBuf> = Vec::new();
    let mut entries = tokio::fs::read_dir(&l1_dir)
        .await
        .expect("L1 dir must exist after compaction");
    while let Some(entry) = entries.next_entry().await.unwrap() {
        let path = entry.path();
        if path.extension().and_then(|s| s.to_str()) == Some("parquet") {
            parquet_files.push(path);
        }
    }
    assert_eq!(
        parquet_files.len(),
        1,
        "expected exactly one L1 parquet file after compaction, got {parquet_files:?}"
    );

    // The L1 file must use the typed column layout, not the raw value blob.
    let file = std::fs::File::open(&parquet_files[0]).unwrap();
    let builder =
        parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    let arrow_schema = builder.schema().clone();
    let field_names: Vec<&str> = arrow_schema
        .fields()
        .iter()
        .map(|f| f.name().as_str())
        .collect();
    assert!(
        field_names.contains(&"id"),
        "L1 file missing typed 'id' column: {field_names:?}"
    );
    assert!(
        field_names.contains(&"payload"),
        "L1 file missing typed 'payload' column: {field_names:?}"
    );
    assert!(
        !field_names.contains(&"_merutable_value"),
        "L1 must not carry the postcard value blob"
    );

    // All 30 rows must have been rewritten into the merged file.
    let reader = builder.build().unwrap();
    let mut row_count = 0usize;
    for batch in reader {
        row_count += batch.unwrap().num_rows();
    }
    assert_eq!(row_count, 30, "L1 parquet file row count mismatch");

    // A bounded scan must still resolve id=1, now served from L1.
    let via_l1 = engine
        .scan(Some(&[FieldValue::Int64(1)]), Some(&[FieldValue::Int64(2)]))
        .unwrap();
    assert!(
        !via_l1.is_empty(),
        "scan should still find id=1 via L1"
    );
}
#[tokio::test]
async fn bug_j_flushed_tombstone_hides_flushed_put_across_l0_files() {
    let tmp = tempfile::tempdir().unwrap();
    let engine = MeruEngine::open(test_config(&tmp)).await.unwrap();

    // File A holds the Put for key 99; file B holds only its tombstone.
    engine
        .put(vec![FieldValue::Int64(99)], make_row(99))
        .await
        .unwrap();
    engine.flush().await.unwrap();
    engine.delete(vec![FieldValue::Int64(99)]).await.unwrap();
    engine.flush().await.unwrap();

    let got = engine.get(&[FieldValue::Int64(99)]).unwrap();
    assert!(
        got.is_none(),
        "Bug J: flushed tombstone in file B must mask flushed Put in file A; got {got:?}"
    );
    let results = engine.scan(None, None).unwrap();
    let leaked = results
        .iter()
        .any(|(_, r)| matches!(r.get(0), Some(FieldValue::Int64(99))));
    assert!(
        !leaked,
        "Bug J: scan leaked a tombstoned key across L0 files"
    );

    // Repeat at scale: puts for 200..=210 in one file, tombstones for the
    // even keys in a second file; only odd keys may survive.
    for i in 200..=210i64 {
        engine
            .put(vec![FieldValue::Int64(i)], make_row(i))
            .await
            .unwrap();
    }
    engine.flush().await.unwrap();
    for i in (200..=210i64).step_by(2) {
        engine.delete(vec![FieldValue::Int64(i)]).await.unwrap();
    }
    engine.flush().await.unwrap();

    let results = engine.scan(None, None).unwrap();
    let ids: Vec<i64> = results
        .iter()
        .filter_map(|(_, r)| match r.get(0) {
            Some(FieldValue::Int64(v)) if *v >= 200 && *v <= 210 => Some(*v),
            _ => None,
        })
        .collect();
    assert_eq!(
        ids,
        vec![201, 203, 205, 207, 209],
        "Bug J: even-numbered keys should be hidden by flushed tombstones"
    );
}