//! Cross-level data-integrity stress tests for `MeruEngine`.
//!
//! The tests interleave inserts, overwrites, deletes, flushes, and
//! compactions, then verify point lookups, full scans, and bounded scans
//! against an in-memory model of the expected state.

use std::collections::{BTreeMap, BTreeSet};

use bytes::Bytes;
use merutable::engine::{EngineConfig, MeruEngine};
use merutable::types::{
    schema::{ColumnDef, ColumnType, TableSchema},
    value::{FieldValue, Row},
};
use tempfile::TempDir;
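
/// Three-column test schema: `id` (Int64, primary key), `name` (nullable
/// bytes), and `score` (nullable double).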
fn schema() -> TableSchema {
TableSchema {
table_name: "cross_level_stress".into(),
columns: vec![
ColumnDef {
name: "id".into(),
col_type: ColumnType::Int64,
nullable: false,
..Default::default()
},
ColumnDef {
name: "name".into(),
col_type: ColumnType::ByteArray,
nullable: true,
..Default::default()
},
ColumnDef {
name: "score".into(),
col_type: ColumnType::Double,
nullable: true,
..Default::default()
},
],
primary_key: vec![0],
..Default::default()
}
}
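
/// Builds a row matching the schema: id, a human-readable tag, and a score.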
fn make_row(id: i64, tag: &str, score: f64) -> Row {
Row::new(vec![
Some(FieldValue::Int64(id)),
Some(FieldValue::Bytes(Bytes::from(tag.to_string()))),
Some(FieldValue::Double(score)),
])
}
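
/// Engine config tuned for this test: a low L0 trigger and tiny level
/// targets so that frequent flushes force compaction across several levels.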
fn test_config(tmp: &TempDir) -> EngineConfig {
EngineConfig {
schema: schema(),
catalog_uri: tmp.path().to_string_lossy().to_string(),
object_store_prefix: tmp.path().to_string_lossy().to_string(),
wal_dir: tmp.path().join("wal"),
l0_compaction_trigger: 2,
level_target_bytes: vec![4 * 1024, 8 * 1024, 16 * 1024],
memtable_size_bytes: 64 * 1024 * 1024,
..Default::default()
}
}
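
/// Runs five insert/overwrite/delete phases with flushes and compactions in
/// between, then verifies point lookups, deleted keys, full scans, scan
/// values, and the on-disk level distribution.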
#[tokio::test]
async fn cross_level_data_integrity() {
let tmp = tempfile::tempdir().unwrap();
let engine = MeruEngine::open(test_config(&tmp)).await.unwrap();
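
    // In-memory model of the expected engine state.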
let mut expected: BTreeMap<i64, (String, f64)> = BTreeMap::new();
let mut deleted: BTreeSet<i64> = BTreeSet::new();
let batch_size = 100i64;
let num_phases = 5;
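
    // Each phase inserts a fresh batch of 100 keys, overwrites a few keys
    // from the previous batch, and deletes a few keys from the batch before
    // that, flushing after every step so the same key ends up with versions
    // in multiple files.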
for phase in 0..num_phases {
let base = phase * batch_size;
for i in 0..batch_size {
            let id = base + i + 1;
            let tag = format!("phase{phase}_row{id}");
let score = id as f64 * 1.1;
engine
.put(vec![FieldValue::Int64(id)], make_row(id, &tag, score))
.await
.unwrap();
deleted.remove(&id);
expected.insert(id, (tag, score));
}
engine.flush().await.unwrap();
eprintln!(
"phase {phase}: flushed batch [{}, {}]",
base + 1,
base + batch_size
);
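
        // Overwrite selected keys from the previous phase; the new versions
        // must shadow the older ones already flushed (and possibly compacted).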
if phase > 0 {
let overwrite_base = (phase - 1) * batch_size;
for offset in [10, 20, 30, 40, 50] {
let id = overwrite_base + offset;
if id < 1 {
continue;
}
let tag = format!("updated_p{phase}_id{id}");
let score = id as f64 * 99.9;
engine
.put(vec![FieldValue::Int64(id)], make_row(id, &tag, score))
.await
.unwrap();
deleted.remove(&id);
expected.insert(id, (tag, score));
}
engine.flush().await.unwrap();
eprintln!("phase {phase}: flushed overwrites");
}
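
        // Delete selected keys from two phases back; the tombstones must mask
        // values that may already sit in deeper levels.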
if phase >= 2 {
let delete_base = (phase - 2) * batch_size;
for offset in [5, 15, 25, 35, 45, 55, 65, 75, 85, 95] {
let id = delete_base + offset;
if id < 1 {
continue;
}
engine.delete(vec![FieldValue::Int64(id)]).await.unwrap();
deleted.insert(id);
}
engine.flush().await.unwrap();
eprintln!("phase {phase}: flushed deletes");
}
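
        // Run a few compaction rounds; rounds that find nothing to compact
        // are logged as skipped rather than failing the test.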
        for round in 0..4 {
            match engine.compact().await {
                Ok(()) => eprintln!("phase {phase}: compaction round {round} succeeded"),
                Err(e) => eprintln!("phase {phase}: compaction round {round} skipped: {e}"),
            }
        }
}
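
    // Extra compaction passes to push data as deep into the tree as possible
    // before verification.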
for round in 0..8 {
let _ = engine.compact().await;
eprintln!("final compaction round {round}");
}
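
    // Live state is everything inserted minus everything since deleted.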
let live_expected: BTreeMap<i64, (String, f64)> = expected
.iter()
.filter(|(id, _)| !deleted.contains(id))
.map(|(id, v)| (*id, v.clone()))
.collect();
eprintln!(
"expected {} live rows, {} deleted keys",
live_expected.len(),
deleted.len()
);
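
    // Check 1: every live key must be readable via point lookup with its
    // latest tag and score.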
let mut get_failures: Vec<String> = Vec::new();
for (id, (expected_tag, expected_score)) in &live_expected {
match engine.get(&[FieldValue::Int64(*id)]).unwrap() {
Some(row) => {
match row.get(1) {
Some(FieldValue::Bytes(b)) => {
let got = String::from_utf8_lossy(b);
if got.as_ref() != expected_tag.as_str() {
get_failures.push(format!(
"id {id}: name expected {expected_tag:?}, got {got:?}"
));
}
}
other => {
get_failures.push(format!(
"id {id}: name expected Bytes({expected_tag:?}), got {other:?}"
));
}
}
match row.get(2) {
Some(FieldValue::Double(s)) => {
if (*s - expected_score).abs() > 1e-9 {
get_failures
.push(format!("id {id}: score expected {expected_score}, got {s}"));
}
}
other => {
get_failures.push(format!(
"id {id}: score expected Double({expected_score}), got {other:?}"
));
}
}
}
None => {
get_failures.push(format!("id {id}: expected row, got None (data lost)"));
}
}
}
assert!(
get_failures.is_empty(),
"point lookup failures ({}):\n{}",
get_failures.len(),
get_failures.join("\n")
);
eprintln!(
"PASS: all {} point lookups returned correct values",
live_expected.len()
);
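
    // Check 2: deleted keys must stay absent from point lookups.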
let mut delete_failures: Vec<String> = Vec::new();
for id in &deleted {
if let Some(row) = engine.get(&[FieldValue::Int64(*id)]).unwrap() {
delete_failures.push(format!(
"id {id}: expected None (deleted), got {:?}",
row.get(1)
));
}
}
assert!(
delete_failures.is_empty(),
"deleted key resurrections ({}):\n{}",
delete_failures.len(),
delete_failures.join("\n")
);
eprintln!("PASS: all {} deleted keys correctly absent", deleted.len());
let scan_results = engine.scan(None, None).unwrap();
let mut scan_ids: BTreeMap<i64, usize> = BTreeMap::new();
let mut scan_failures: Vec<String> = Vec::new();
for (_ikey, row) in &scan_results {
let id = match row.get(0) {
Some(FieldValue::Int64(v)) => *v,
other => panic!("non-int id in scan result: {other:?}"),
};
*scan_ids.entry(id).or_insert(0) += 1;
}
for (id, count) in &scan_ids {
if *count > 1 {
scan_failures.push(format!(
"id {id}: appeared {count} times in scan (duplicate)"
));
}
}
for id in live_expected.keys() {
if !scan_ids.contains_key(id) {
scan_failures.push(format!("id {id}: missing from scan (live key lost)"));
}
}
for id in &deleted {
if scan_ids.contains_key(id) {
scan_failures.push(format!(
"id {id}: found in scan but should be deleted (tombstone leak)"
));
}
}
if scan_ids.len() != live_expected.len() {
scan_failures.push(format!(
"scan returned {} unique keys, expected {}",
scan_ids.len(),
live_expected.len()
));
}
assert!(
scan_failures.is_empty(),
"scan verification failures ({}):\n{}",
scan_failures.len(),
scan_failures.join("\n")
);
eprintln!(
"PASS: scan returned exactly {} rows, no duplicates, no ghosts",
scan_results.len()
);
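
    // Check 4: scanned rows must carry the latest values, not stale versions
    // from older levels.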
let mut value_failures: Vec<String> = Vec::new();
for (_ikey, row) in &scan_results {
let id = match row.get(0) {
Some(FieldValue::Int64(v)) => *v,
_ => continue,
};
if let Some((expected_tag, expected_score)) = live_expected.get(&id) {
match row.get(1) {
Some(FieldValue::Bytes(b)) => {
let got = String::from_utf8_lossy(b);
if got.as_ref() != expected_tag.as_str() {
value_failures.push(format!(
"scan id {id}: name expected {expected_tag:?}, got {got:?}"
));
}
}
other => {
value_failures
.push(format!("scan id {id}: name expected Bytes, got {other:?}"));
}
}
match row.get(2) {
Some(FieldValue::Double(s)) => {
if (*s - expected_score).abs() > 1e-9 {
value_failures.push(format!(
"scan id {id}: score expected {expected_score}, got {s}"
));
}
}
other => {
value_failures.push(format!(
"scan id {id}: score expected Double, got {other:?}"
));
}
}
}
}
assert!(
value_failures.is_empty(),
"scan value mismatches ({}):\n{}",
value_failures.len(),
value_failures.join("\n")
);
eprintln!("PASS: all scan values match expected");
let data_dir = tmp.path().join("data");
let mut level_file_counts: Vec<(u8, usize)> = Vec::new();
for level_num in 0..=4u8 {
let level_dir = data_dir.join(format!("L{level_num}"));
let count = count_parquet_files(&level_dir);
if count > 0 {
level_file_counts.push((level_num, count));
}
}
eprintln!("level file distribution:");
for (lvl, count) in &level_file_counts {
eprintln!(" L{lvl}: {count} files");
}
let levels_with_files = level_file_counts.len();
assert!(
levels_with_files >= 2,
"expected files at >= 2 levels to prove cross-level compaction, \
but only found files at {} level(s): {:?}",
levels_with_files,
level_file_counts
);
eprintln!("PASS: data spread across {levels_with_files} levels");
let max_level_with_files = level_file_counts.iter().map(|(l, _)| *l).max().unwrap_or(0);
eprintln!("deepest level with files: L{max_level_with_files}");
if max_level_with_files < 2 {
eprintln!(
"WARNING: data only reached L{max_level_with_files}, \
ideally should reach L2+ for cross-level validation"
);
}
eprintln!("cross_level_data_integrity: ALL CHECKS PASSED");
}
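
/// Writes two compacted batches, then overwrites and deletes slices of the
/// first, and checks that a bounded scan over keys 40..80 reflects the
/// newest state in order.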
#[tokio::test]
async fn cross_level_bounded_scan() {
let tmp = tempfile::tempdir().unwrap();
let engine = MeruEngine::open(test_config(&tmp)).await.unwrap();
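
    // Batch 1: keys 1..=100, flushed and compacted.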
for i in 1..=100i64 {
engine
.put(vec![FieldValue::Int64(i)], make_row(i, "batch1", i as f64))
.await
.unwrap();
}
engine.flush().await.unwrap();
engine.compact().await.unwrap();
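
    // Batch 2: keys 101..=200 in a second generation of files.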
for i in 101..=200i64 {
engine
.put(vec![FieldValue::Int64(i)], make_row(i, "batch2", i as f64))
.await
.unwrap();
}
engine.flush().await.unwrap();
engine.compact().await.unwrap();
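
    // Overwrite keys 50..=60 with a new tag and doubled scores.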
for i in 50..=60i64 {
engine
.put(
vec![FieldValue::Int64(i)],
make_row(i, "overwritten", i as f64 * 2.0),
)
.await
.unwrap();
}
engine.flush().await.unwrap();
engine.compact().await.unwrap();
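
    // Delete keys 70..=75; the bounded scan must skip them.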
for i in 70..=75i64 {
engine.delete(vec![FieldValue::Int64(i)]).await.unwrap();
}
engine.flush().await.unwrap();
engine.compact().await.unwrap();
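
    // Scan from key 40 to key 80; per `expected_ids` below, the upper bound
    // is treated as exclusive and deleted keys must be absent.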
let results = engine
.scan(
Some(&[FieldValue::Int64(40)]),
Some(&[FieldValue::Int64(80)]),
)
.unwrap();
let ids: Vec<i64> = results
.iter()
.map(|(_, r)| match r.get(0) {
Some(FieldValue::Int64(v)) => *v,
_ => panic!("non-int id"),
})
.collect();
let expected_ids: Vec<i64> = (40..80).filter(|i| !(70..=75).contains(i)).collect();
assert_eq!(
ids,
expected_ids,
"bounded scan mismatch: got {} rows, expected {}",
ids.len(),
expected_ids.len()
);
for (_, row) in &results {
let id = match row.get(0) {
Some(FieldValue::Int64(v)) => *v,
_ => continue,
};
if (50..=60).contains(&id) {
let name = match row.get(1) {
Some(FieldValue::Bytes(b)) => String::from_utf8_lossy(b).to_string(),
other => panic!("id {id}: expected Bytes, got {other:?}"),
};
assert_eq!(
name, "overwritten",
"id {id}: expected overwritten tag, got {name:?}"
);
}
}
eprintln!("cross_level_bounded_scan: PASSED");
}
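
/// Counts `.parquet` files directly inside `dir` (0 if the directory does
/// not exist).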
fn count_parquet_files(dir: &std::path::Path) -> usize {
if !dir.exists() {
return 0;
}
std::fs::read_dir(dir)
.unwrap()
.filter_map(|e| e.ok())
.filter(|e| e.path().extension().and_then(|s| s.to_str()) == Some("parquet"))
.count()
}