use std::path::PathBuf;
use bytes::Bytes;
use merutable::engine::{EngineConfig, MeruEngine};
use merutable::types::{
schema::{ColumnDef, ColumnType, TableSchema},
value::{FieldValue, Row},
};
use tempfile::TempDir;
fn schema() -> TableSchema {
TableSchema {
table_name: "l1_overlap".into(),
columns: vec![
ColumnDef {
name: "id".into(),
col_type: ColumnType::Int64,
nullable: false,
..Default::default()
},
ColumnDef {
name: "payload".into(),
col_type: ColumnType::ByteArray,
nullable: true,
..Default::default()
},
],
primary_key: vec![0],
..Default::default()
}
}
/// Builds a row for key `i` whose payload is the UTF-8 bytes of `tag`
/// (e.g. "v1"/"v2" version markers used by the tests).
fn make_row(i: i64, tag: &str) -> Row {
    let id = Some(FieldValue::Int64(i));
    let payload = Some(FieldValue::Bytes(Bytes::from(tag.to_string())));
    Row::new(vec![id, payload])
}
/// Engine config rooted entirely inside `tmp`, with `l0_compaction_trigger`
/// set to 1 so a single flush is enough to make `compact()` produce L1 output.
fn test_config(tmp: &TempDir) -> EngineConfig {
    let root = tmp.path();
    // Catalog and object store share the same temp root.
    let root_str = root.to_string_lossy().into_owned();
    EngineConfig {
        schema: schema(),
        catalog_uri: root_str.clone(),
        object_store_prefix: root_str,
        wal_dir: root.join("wal"),
        l0_compaction_trigger: 1,
        ..Default::default()
    }
}
/// End-to-end check that two L1 files with overlapping key ranges are
/// merged correctly: both point lookups (`get`) and a full `scan` must
/// return the newest version of every key exactly once.
///
/// Layout produced by the test:
///   - first compaction:  one L1 file covering ids 10..=30, payload "v1"
///   - second compaction: a second L1 file covering ids 1..=20, payload "v2"
/// Ids 10..=20 therefore exist in both files; the newer "v2" file must win.
#[tokio::test]
async fn overlapping_l1_files_serve_correct_version_on_get() {
    let tmp = tempfile::tempdir().unwrap();
    let engine = MeruEngine::open(test_config(&tmp)).await.unwrap();
    // First generation: ids 10..=30 tagged "v1".
    for i in 10..=30i64 {
        engine
            .put(vec![FieldValue::Int64(i)], make_row(i, "v1"))
            .await
            .unwrap();
    }
    // l0_compaction_trigger is 1 (see test_config), so one flush followed by
    // compact is expected to move everything into a single L1 file.
    engine.flush().await.unwrap();
    engine.compact().await.unwrap();
    {
        // NOTE(review): assumes compacted files land under <tmp>/data/L1 —
        // confirm against the engine's on-disk layout.
        let l1 = list_parquet_files(&tmp.path().join("data").join("L1"));
        assert_eq!(
            l1.len(),
            1,
            "L1 should have one file after first compaction; got {l1:?}"
        );
    }
    // Second generation: ids 1..=20 tagged "v2"; ids 10..=20 overwrite "v1".
    for i in 1..=20i64 {
        engine
            .put(vec![FieldValue::Int64(i)], make_row(i, "v2"))
            .await
            .unwrap();
    }
    engine.flush().await.unwrap();
    engine.compact().await.unwrap();
    {
        // Two L1 files must now coexist with overlapping key ranges (10..=20).
        let l1 = list_parquet_files(&tmp.path().join("data").join("L1"));
        assert_eq!(
            l1.len(),
            2,
            "L1 should have two files after second compaction; got {l1:?}"
        );
    }
    // Point lookups: collect every mismatch instead of failing on the first,
    // so one run reports the full failure pattern across the key space.
    let mut failures: Vec<String> = Vec::new();
    for i in 1..=9i64 {
        check_get(&engine, i, "v2", &mut failures);
    }
    // Overlap region: both L1 files contain these ids; the newer "v2" must win.
    for i in 10..=20i64 {
        check_get(&engine, i, "v2", &mut failures);
    }
    // Keys only present in the older file must still be served as "v1".
    for i in 21..=30i64 {
        check_get(&engine, i, "v1", &mut failures);
    }
    assert!(
        failures.is_empty(),
        "{} point-lookup failures across overlapping L1 files:\n{}",
        failures.len(),
        failures.join("\n")
    );
    // Full scan: every id must appear exactly once with the newest payload.
    let scan = engine.scan(None, None).unwrap();
    let mut observed: std::collections::BTreeMap<i64, Vec<u8>> = std::collections::BTreeMap::new();
    for (_ikey, row) in scan {
        let id = match row.get(0) {
            Some(FieldValue::Int64(v)) => *v,
            other => panic!("non-int id in scan result: {other:?}"),
        };
        let payload = match row.get(1) {
            Some(FieldValue::Bytes(b)) => b.to_vec(),
            other => panic!("non-bytes payload in scan result: {other:?}"),
        };
        // A duplicate id would mean the merge across overlapping L1 files
        // leaked both versions of a key into the scan.
        assert!(
            observed.insert(id, payload).is_none(),
            "scan returned duplicate id {id}"
        );
    }
    for i in 1..=9i64 {
        assert_eq!(
            observed.get(&i).map(|v| v.as_slice()),
            Some(b"v2".as_slice()),
            "scan: id {i} expected v2"
        );
    }
    for i in 10..=20i64 {
        assert_eq!(
            observed.get(&i).map(|v| v.as_slice()),
            Some(b"v2".as_slice()),
            "scan: id {i} expected v2 (overlap region)"
        );
    }
    for i in 21..=30i64 {
        assert_eq!(
            observed.get(&i).map(|v| v.as_slice()),
            Some(b"v1".as_slice()),
            "scan: id {i} expected v1"
        );
    }
    // 30 distinct keys total (1..=30) — anything less means the scan dropped keys.
    assert_eq!(observed.len(), 30, "scan missed keys");
}
/// Returns the sorted paths of all `*.parquet` files directly inside `dir`.
/// A missing directory yields an empty list; any other I/O error panics
/// (acceptable in a test helper).
fn list_parquet_files(dir: &std::path::Path) -> Vec<PathBuf> {
    if !dir.exists() {
        return Vec::new();
    }
    let mut files: Vec<PathBuf> = std::fs::read_dir(dir)
        .unwrap()
        .map(|entry| entry.unwrap().path())
        .filter(|p| p.extension().and_then(|e| e.to_str()) == Some("parquet"))
        .collect();
    // Deterministic order so assertion messages are stable.
    files.sort();
    files
}
/// Point-looks-up `id` and appends a human-readable mismatch description to
/// `failures` unless column 1 is a byte payload equal to `expected`.
/// Accumulating (rather than asserting) lets the caller report every bad key
/// in one test run.
fn check_get(engine: &MeruEngine, id: i64, expected: &str, failures: &mut Vec<String>) {
    let Some(row) = engine.get(&[FieldValue::Int64(id)]).unwrap() else {
        failures.push(format!("id {id}: expected {expected:?}, got None"));
        return;
    };
    match row.get(1) {
        Some(FieldValue::Bytes(b)) => {
            if b.as_ref() != expected.as_bytes() {
                failures.push(format!(
                    "id {id}: expected {expected:?}, got {:?}",
                    String::from_utf8_lossy(b)
                ));
            }
        }
        other => failures.push(format!(
            "id {id}: expected {expected:?}, got non-bytes {other:?}"
        )),
    }
}