use std::sync::Arc;
use bytes::Bytes;
use merutable::engine::{EngineConfig, MeruEngine};
use merutable::types::{
schema::{ColumnDef, ColumnType, TableSchema},
value::{FieldValue, Row},
};
use tempfile::TempDir;
fn schema() -> TableSchema {
TableSchema {
table_name: "issue35".into(),
columns: vec![
ColumnDef {
name: "id".into(),
col_type: ColumnType::Int64,
nullable: false,
..Default::default()
},
ColumnDef {
name: "payload".into(),
col_type: ColumnType::ByteArray,
nullable: true,
..Default::default()
},
],
primary_key: vec![0],
..Default::default()
}
}
fn turbo_config(tmp: &TempDir) -> EngineConfig {
EngineConfig {
schema: schema(),
catalog_uri: tmp.path().to_string_lossy().to_string(),
object_store_prefix: tmp.path().to_string_lossy().to_string(),
wal_dir: tmp.path().join("wal"),
l0_compaction_trigger: 2,
l0_slowdown_trigger: 8,
l0_stop_trigger: 12,
memtable_size_bytes: 16 * 1024 * 1024,
row_cache_capacity: 1_000,
..Default::default()
}
}
fn row(id: i64, payload: &[u8]) -> Row {
Row::new(vec![
Some(FieldValue::Int64(id)),
Some(FieldValue::Bytes(Bytes::copy_from_slice(payload))),
])
}
fn pk(id: i64) -> Vec<FieldValue> {
vec![FieldValue::Int64(id)]
}
#[tokio::test]
async fn cache_populated_get_does_not_survive_delete() {
let tmp = tempfile::tempdir().unwrap();
let engine: Arc<MeruEngine> = MeruEngine::open(turbo_config(&tmp)).await.unwrap();
engine.put(pk(1), row(1, b"v1")).await.unwrap();
engine.flush().await.unwrap();
let hit = engine.get(&pk(1)).unwrap();
assert!(hit.is_some(), "get should hit L0 after flush");
engine.delete(pk(1)).await.unwrap();
assert!(engine.get(&pk(1)).unwrap().is_none(), "immediate get");
engine.flush().await.unwrap();
assert!(
engine.get(&pk(1)).unwrap().is_none(),
"delete must survive flush — cache invalidation or L0 tombstone"
);
}
#[tokio::test]
async fn delete_in_memtable_shadows_put_in_l0() {
let tmp = tempfile::tempdir().unwrap();
let engine: Arc<MeruEngine> = MeruEngine::open(turbo_config(&tmp)).await.unwrap();
engine.put(pk(2), row(2, b"v2")).await.unwrap();
engine.flush().await.unwrap();
engine.delete(pk(2)).await.unwrap();
assert!(engine.get(&pk(2)).unwrap().is_none());
}
#[tokio::test]
async fn delete_in_l0_shadows_put_in_l0() {
let tmp = tempfile::tempdir().unwrap();
let engine: Arc<MeruEngine> = MeruEngine::open(turbo_config(&tmp)).await.unwrap();
engine.put(pk(3), row(3, b"v3")).await.unwrap();
engine.flush().await.unwrap();
engine.delete(pk(3)).await.unwrap();
engine.flush().await.unwrap();
assert!(
engine.get(&pk(3)).unwrap().is_none(),
"L0 seq_max DESC traversal must hit the tombstone before the put"
);
}
#[tokio::test]
async fn delete_survives_l0_to_l1_compaction() {
let tmp = tempfile::tempdir().unwrap();
let engine: Arc<MeruEngine> = MeruEngine::open(turbo_config(&tmp)).await.unwrap();
engine.put(pk(4), row(4, b"v4")).await.unwrap();
engine.flush().await.unwrap();
engine.delete(pk(4)).await.unwrap();
engine.flush().await.unwrap();
engine.compact().await.unwrap();
assert!(
engine.get(&pk(4)).unwrap().is_none(),
"tombstone MUST survive L0→L1 compaction when not at bottom level"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[ignore]
async fn alternating_put_delete_has_no_ghosts() {
let tmp = tempfile::tempdir().unwrap();
let engine: Arc<MeruEngine> = MeruEngine::open(turbo_config(&tmp)).await.unwrap();
const POOL: i64 = 100;
const OPS: i64 = 1_000;
let mut expected_present: std::collections::HashMap<i64, bool> =
std::collections::HashMap::new();
for i in 0..OPS {
let key = i % POOL;
if i % 7 == 0 {
engine.delete(pk(key)).await.unwrap();
expected_present.insert(key, false);
} else {
engine
.put(pk(key), row(key, format!("v{i}").as_bytes()))
.await
.unwrap();
expected_present.insert(key, true);
}
if i > 0 && i % 50 == 0 {
engine.flush().await.unwrap();
}
if i > 0 && i % 10 == 0 {
for (k, &present) in expected_present.iter().take(5) {
let got = engine.get(&pk(*k)).unwrap();
assert_eq!(
got.is_some(),
present,
"op={i} key={k} expected present={present} got={got:?}"
);
}
}
}
engine.flush().await.unwrap();
engine.compact().await.unwrap();
for (k, &present) in &expected_present {
let got = engine.get(&pk(*k)).unwrap();
assert_eq!(
got.is_some(),
present,
"post-compact key={k} expected present={present} got={got:?}"
);
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn memtable_tombstone_beats_stale_cache() {
let tmp = tempfile::tempdir().unwrap();
let engine: Arc<MeruEngine> = MeruEngine::open(turbo_config(&tmp)).await.unwrap();
engine.put(pk(7), row(7, b"original")).await.unwrap();
engine.flush().await.unwrap();
let _ = engine.get(&pk(7)).unwrap();
engine.delete(pk(7)).await.unwrap();
let got = engine.get(&pk(7)).unwrap();
assert!(
got.is_none(),
"delete must be visible immediately; got {got:?}"
);
for i in 0..100i64 {
engine.put(pk(7), row(7, b"v")).await.unwrap();
assert!(engine.get(&pk(7)).unwrap().is_some(), "iter {i} after put");
engine.delete(pk(7)).await.unwrap();
assert!(engine.get(&pk(7)).unwrap().is_none(), "iter {i} after del");
}
}