use merutable::engine::{EngineConfig, MeruEngine};
use merutable::types::{
schema::{ColumnDef, ColumnType, TableSchema},
value::{FieldValue, Row},
};
/// Builds the two-column schema shared by most tests in this file:
/// `id: Int64` (non-null primary key) and `val: ByteArray` (nullable).
fn test_schema() -> TableSchema {
    let id_col = ColumnDef {
        name: "id".into(),
        col_type: ColumnType::Int64,
        nullable: false,
        ..Default::default()
    };
    let val_col = ColumnDef {
        name: "val".into(),
        col_type: ColumnType::ByteArray,
        nullable: true,
        ..Default::default()
    };
    TableSchema {
        table_name: "imp_test".into(),
        columns: vec![id_col, val_col],
        primary_key: vec![0],
        ..Default::default()
    }
}
/// Baseline engine configuration rooted in `tmp`: a tiny (512 B) memtable so
/// flushes are easy to force, GC grace disabled, and L0 write throttling
/// effectively turned off via `u32::MAX` triggers.
fn test_config(tmp: &tempfile::TempDir) -> EngineConfig {
    let root = tmp.path().to_string_lossy().to_string();
    EngineConfig {
        schema: test_schema(),
        catalog_uri: root.clone(),
        object_store_prefix: root,
        wal_dir: tmp.path().join("wal"),
        memtable_size_bytes: 512,
        gc_grace_period_secs: 0,
        l0_slowdown_trigger: u32::MAX as usize,
        l0_stop_trigger: u32::MAX as usize,
        ..Default::default()
    }
}
#[tokio::test]
async fn imp03_row_cache_cleared_after_compaction() {
let tmp = tempfile::tempdir().unwrap();
let config = EngineConfig {
row_cache_capacity: 1000,
..test_config(&tmp)
};
let engine = MeruEngine::open(config).await.unwrap();
engine
.put(
vec![FieldValue::Int64(1)],
Row::new(vec![
Some(FieldValue::Int64(1)),
Some(FieldValue::Bytes(bytes::Bytes::from("v1"))),
]),
)
.await
.unwrap();
engine.flush().await.unwrap();
let row = engine.get(&[FieldValue::Int64(1)]).unwrap().unwrap();
assert_eq!(
row.get(1).cloned(),
Some(FieldValue::Bytes(bytes::Bytes::from("v1")))
);
engine
.put(
vec![FieldValue::Int64(1)],
Row::new(vec![
Some(FieldValue::Int64(1)),
Some(FieldValue::Bytes(bytes::Bytes::from("v2"))),
]),
)
.await
.unwrap();
engine.flush().await.unwrap();
engine.compact().await.unwrap();
let row = engine.get(&[FieldValue::Int64(1)]).unwrap().unwrap();
assert_eq!(
row.get(1).cloned(),
Some(FieldValue::Bytes(bytes::Bytes::from("v2"))),
"IMP-03: cache must be cleared after compaction — stale v1 was served"
);
}
#[tokio::test]
async fn imp02_visible_seq_matches_memtable() {
    let tmp = tempfile::tempdir().unwrap();
    let engine = MeruEngine::open(test_config(&tmp)).await.unwrap();
    let seq_before = engine.read_seq().0;
    // One put must bump the visible sequence number by exactly one.
    let payload = Row::new(vec![
        Some(FieldValue::Int64(42)),
        Some(FieldValue::Bytes(bytes::Bytes::from("hello"))),
    ]);
    engine
        .put(vec![FieldValue::Int64(42)], payload)
        .await
        .unwrap();
    let seq_after = engine.read_seq().0;
    assert_eq!(
        seq_after,
        seq_before + 1,
        "IMP-02: visible_seq must advance by 1 after a single put"
    );
    // The write must also be readable as soon as put() has returned.
    let row = engine.get(&[FieldValue::Int64(42)]).unwrap();
    assert!(
        row.is_some(),
        "IMP-02: data must be visible immediately after put returns"
    );
}
#[tokio::test]
async fn imp02_batch_advances_visible_seq() {
    let tmp = tempfile::tempdir().unwrap();
    let engine = MeruEngine::open(test_config(&tmp)).await.unwrap();
    let seq_before = engine.read_seq().0;
    // Stage five puts in one batch, then apply it atomically.
    let mut batch = merutable::engine::write_path::MutationBatch::new();
    for i in 1..=5i64 {
        let row = Row::new(vec![
            Some(FieldValue::Int64(i)),
            Some(FieldValue::Bytes(bytes::Bytes::from(format!("v{i}")))),
        ]);
        batch.put(vec![FieldValue::Int64(i)], row);
    }
    merutable::engine::write_path::apply_batch(&engine, batch)
        .await
        .unwrap();
    assert_eq!(
        engine.read_seq().0,
        seq_before + 5,
        "IMP-02: visible_seq must advance by batch size (5)"
    );
    // Every batched key must be visible once apply_batch returns.
    for i in 1..=5i64 {
        let row = engine.get(&[FieldValue::Int64(i)]).unwrap();
        assert!(row.is_some(), "key {i} must be readable after batch");
    }
}
#[tokio::test]
async fn imp15_refresh_rejects_missing_files() {
    let tmp = tempfile::tempdir().unwrap();
    let mut config = test_config(&tmp);
    config.memtable_size_bytes = 64 * 1024 * 1024;
    // Primary writes one row and persists it to L0, then goes away.
    let primary = MeruEngine::open(config.clone()).await.unwrap();
    primary
        .put(
            vec![FieldValue::Int64(1)],
            Row::new(vec![Some(FieldValue::Int64(1)), None]),
        )
        .await
        .unwrap();
    primary.flush().await.unwrap();
    drop(primary);
    // Open a read-only replica, then delete the parquet files out from
    // under it to simulate storage loss.
    config.read_only = true;
    let replica = MeruEngine::open(config).await.unwrap();
    let l0_dir = tmp.path().join("data").join("L0");
    if l0_dir.exists() {
        for entry in std::fs::read_dir(&l0_dir).unwrap() {
            let path = entry.unwrap().path();
            if path.extension().is_some_and(|e| e == "parquet") {
                std::fs::remove_file(path).unwrap();
            }
        }
    }
    // refresh() must surface the missing files as an error, not succeed.
    let result = replica.refresh().await;
    assert!(
        result.is_err(),
        "IMP-15: refresh must reject when referenced data files are missing"
    );
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn compactions_on_disjoint_levels_run_concurrently() {
    let tmp = tempfile::tempdir().unwrap();
    let mut config = test_config(&tmp);
    config.memtable_size_bytes = 64 * 1024 * 1024;
    config.gc_grace_period_secs = 0;
    let engine = MeruEngine::open(config).await.unwrap();
    // Six flushed batches of 20 keys each -> six L0 files.
    for batch in 0..6i64 {
        for i in 0..20i64 {
            let key = batch * 100 + i;
            let row = Row::new(vec![
                Some(FieldValue::Int64(key)),
                Some(FieldValue::Bytes(bytes::Bytes::from(format!("v{key}")))),
            ]);
            engine
                .put(vec![FieldValue::Int64(key)], row)
                .await
                .unwrap();
        }
        engine.flush().await.unwrap();
    }
    // Race two compactions on separate tasks; neither may deadlock.
    let e1 = engine.clone();
    let e2 = engine.clone();
    let h1 = tokio::spawn(async move { e1.compact().await });
    let h2 = tokio::spawn(async move { e2.compact().await });
    let timeout = std::time::Duration::from_secs(60);
    let (r1, r2) = tokio::join!(
        tokio::time::timeout(timeout, h1),
        tokio::time::timeout(timeout, h2),
    );
    assert!(
        r1.is_ok() && r2.is_ok(),
        "concurrent compactions must not deadlock"
    );
    // Unwrap the layers: timeout -> JoinHandle -> compact() Result.
    r1.unwrap().unwrap().unwrap();
    r2.unwrap().unwrap().unwrap();
    let l0 = engine
        .stats()
        .levels
        .iter()
        .find(|l| l.level == 0)
        .map(|l| l.file_count)
        .unwrap_or(0);
    assert!(
        l0 < 4,
        "after concurrent compactions L0 must be below trigger (got {l0})"
    );
}
#[tokio::test]
async fn level_reservations_released_after_compact() {
    let tmp = tempfile::tempdir().unwrap();
    let mut config = test_config(&tmp);
    config.memtable_size_bytes = 64 * 1024 * 1024;
    config.gc_grace_period_secs = 0;
    let engine = MeruEngine::open(config).await.unwrap();
    // Five flushed batches -> enough L0 files for compact() to do real work.
    for batch in 0..5i64 {
        for i in 0..10i64 {
            let key = batch * 100 + i;
            engine
                .put(
                    vec![FieldValue::Int64(key)],
                    Row::new(vec![Some(FieldValue::Int64(key)), None]),
                )
                .await
                .unwrap();
        }
        engine.flush().await.unwrap();
    }
    engine.compact().await.unwrap();
    // Reservation bookkeeping may land slightly after compact() returns,
    // so poll (up to 16 times) before asserting.
    let mut attempts = 0;
    while attempts < 16 {
        if engine.__compacting_levels_snapshot().await.is_empty() {
            break;
        }
        tokio::task::yield_now().await;
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        attempts += 1;
    }
    let busy = engine.__compacting_levels_snapshot().await;
    assert!(
        busy.is_empty(),
        "level reservations leaked after compact(): {busy:?}"
    );
    // A leaked reservation would block this second compaction.
    engine.compact().await.unwrap();
}
#[tokio::test]
async fn compact_loops_until_tree_healthy() {
let tmp = tempfile::tempdir().unwrap();
let mut config = test_config(&tmp);
config.memtable_size_bytes = 64 * 1024 * 1024; config.gc_grace_period_secs = 0;
let engine = MeruEngine::open(config).await.unwrap();
for batch in 0..6i64 {
for i in 0..10i64 {
let key = batch * 100 + i;
engine
.put(
vec![FieldValue::Int64(key)],
Row::new(vec![
Some(FieldValue::Int64(key)),
Some(FieldValue::Bytes(bytes::Bytes::from(format!("v{key}")))),
]),
)
.await
.unwrap();
}
engine.flush().await.unwrap();
}
let stats_before = engine.stats();
let l0_before = stats_before
.levels
.iter()
.find(|l| l.level == 0)
.map(|l| l.file_count)
.unwrap_or(0);
assert!(
l0_before >= 4,
"setup invariant: need ≥4 L0 files, got {l0_before}"
);
engine.compact().await.unwrap();
let stats_after = engine.stats();
let l0_after = stats_after
.levels
.iter()
.find(|l| l.level == 0)
.map(|l| l.file_count)
.unwrap_or(0);
assert!(
l0_after < 4,
"compact() should have drained L0 below trigger, got {l0_after} files"
);
}
#[tokio::test]
async fn close_flushes_and_rejects_writes() {
    let tmp = tempfile::tempdir().unwrap();
    let mut config = test_config(&tmp);
    config.memtable_size_bytes = 64 * 1024 * 1024;
    let engine = MeruEngine::open(config).await.unwrap();
    // With a huge memtable all 20 rows stay in memory until close() flushes.
    for i in 0..20i64 {
        let row = Row::new(vec![
            Some(FieldValue::Int64(i)),
            Some(FieldValue::Bytes(bytes::Bytes::from(format!("v{i}")))),
        ]);
        engine.put(vec![FieldValue::Int64(i)], row).await.unwrap();
    }
    engine.close().await.unwrap();
    // Reads must still work after close ...
    for i in 0..20i64 {
        let row = engine.get(&[FieldValue::Int64(i)]).unwrap();
        assert!(row.is_some(), "key {i} must be readable after close");
    }
    // ... but further writes must be rejected.
    let err = engine
        .put(
            vec![FieldValue::Int64(99)],
            Row::new(vec![Some(FieldValue::Int64(99)), None]),
        )
        .await;
    assert!(err.is_err(), "put must fail after close");
}
#[tokio::test]
async fn close_data_durable_across_reopen() {
    let tmp = tempfile::tempdir().unwrap();
    let config = test_config(&tmp);
    // First engine instance: write 30 rows and close cleanly.
    {
        let mut cfg = config.clone();
        cfg.memtable_size_bytes = 64 * 1024 * 1024;
        let engine = MeruEngine::open(cfg).await.unwrap();
        for i in 0..30i64 {
            let row = Row::new(vec![
                Some(FieldValue::Int64(i)),
                Some(FieldValue::Bytes(bytes::Bytes::from(format!("data_{i}")))),
            ]);
            engine.put(vec![FieldValue::Int64(i)], row).await.unwrap();
        }
        engine.close().await.unwrap();
    }
    // A fresh instance over the same directory must recover every row.
    let engine = MeruEngine::open(config).await.unwrap();
    for i in 0..30i64 {
        let row = engine.get(&[FieldValue::Int64(i)]).unwrap();
        assert!(row.is_some(), "key {i} must survive close + reopen");
    }
}
#[tokio::test]
async fn issue12_write_api_rejects_malformed_rows() {
use merutable::types::schema::{ColumnDef, ColumnType, TableSchema};
let tmp = tempfile::tempdir().unwrap();
let schema = TableSchema {
table_name: "shape".into(),
columns: vec![
ColumnDef {
name: "id".into(),
col_type: ColumnType::Int64,
nullable: false,
..Default::default()
},
ColumnDef {
name: "payload".into(),
col_type: ColumnType::ByteArray,
nullable: true,
..Default::default()
},
ColumnDef {
name: "fixed8".into(),
col_type: ColumnType::FixedLenByteArray(8),
nullable: false,
..Default::default()
},
],
primary_key: vec![0],
..Default::default()
};
let config = EngineConfig {
schema: schema.clone(),
catalog_uri: tmp.path().to_string_lossy().to_string(),
object_store_prefix: tmp.path().to_string_lossy().to_string(),
wal_dir: tmp.path().join("wal"),
memtable_size_bytes: 64 * 1024 * 1024,
gc_grace_period_secs: 0,
l0_slowdown_trigger: u32::MAX as usize,
l0_stop_trigger: u32::MAX as usize,
..Default::default()
};
let engine = MeruEngine::open(config).await.unwrap();
let ok = Row::new(vec![
Some(FieldValue::Int64(1)),
Some(FieldValue::Bytes(bytes::Bytes::from("abc"))),
Some(FieldValue::Bytes(bytes::Bytes::from_static(b"12345678"))),
]);
engine
.put(vec![FieldValue::Int64(1)], ok)
.await
.expect("valid row must succeed");
let short = Row::new(vec![Some(FieldValue::Int64(2))]);
let err = engine
.put(vec![FieldValue::Int64(2)], short)
.await
.unwrap_err();
assert!(
matches!(err, merutable::types::MeruError::SchemaMismatch(_)),
"arity-short must return SchemaMismatch, got: {err:?}"
);
let long = Row::new(vec![
Some(FieldValue::Int64(3)),
Some(FieldValue::Bytes(bytes::Bytes::from("x"))),
Some(FieldValue::Bytes(bytes::Bytes::from_static(b"12345678"))),
Some(FieldValue::Int32(999)),
]);
let err = engine
.put(vec![FieldValue::Int64(3)], long)
.await
.unwrap_err();
assert!(matches!(
err,
merutable::types::MeruError::SchemaMismatch(_)
));
let typ = Row::new(vec![
Some(FieldValue::Bytes(bytes::Bytes::from("not_an_int"))),
Some(FieldValue::Bytes(bytes::Bytes::from("x"))),
Some(FieldValue::Bytes(bytes::Bytes::from_static(b"12345678"))),
]);
let err = engine
.put(vec![FieldValue::Int64(4)], typ)
.await
.unwrap_err();
assert!(matches!(
err,
merutable::types::MeruError::SchemaMismatch(_)
));
let null_nn = Row::new(vec![
None,
Some(FieldValue::Bytes(bytes::Bytes::from("x"))),
Some(FieldValue::Bytes(bytes::Bytes::from_static(b"12345678"))),
]);
let err = engine
.put(vec![FieldValue::Int64(5)], null_nn)
.await
.unwrap_err();
assert!(matches!(
err,
merutable::types::MeruError::SchemaMismatch(_)
));
let bad_fixed = Row::new(vec![
Some(FieldValue::Int64(6)),
None,
Some(FieldValue::Bytes(bytes::Bytes::from_static(b"short"))), ]);
let err = engine
.put(vec![FieldValue::Int64(6)], bad_fixed)
.await
.unwrap_err();
assert!(matches!(
err,
merutable::types::MeruError::SchemaMismatch(_)
));
}
#[tokio::test]
async fn issue12_decode_corruption_propagates_through_read_path() {
    use merutable::engine::codec;
    // Deliberately truncated/garbage bytes must decode to a Corruption
    // error — never a panic, never a silently-fabricated row.
    let garbage = vec![0x01, 0xFF, 0xFF, 0xFF];
    let err = codec::decode_row(&garbage).unwrap_err();
    assert!(
        matches!(err, merutable::types::MeruError::Corruption(_)),
        "decode_row on corrupt bytes must return Corruption, got: {err:?}"
    );
}
#[tokio::test]
async fn issue11_close_sweeps_pending_deletions() {
    let tmp = tempfile::tempdir().unwrap();
    let mut config = test_config(&tmp);
    config.memtable_size_bytes = 64 * 1024 * 1024;
    config.gc_grace_period_secs = 0;
    let engine = MeruEngine::open(config).await.unwrap();
    // Five flushed batches produce L0 files; compacting rewrites them and
    // queues the originals as pending deletions.
    for batch in 0..5i64 {
        for i in 0..5i64 {
            let key = batch * 10 + i;
            engine
                .put(
                    vec![FieldValue::Int64(key)],
                    Row::new(vec![
                        Some(FieldValue::Int64(key)),
                        Some(FieldValue::Bytes(bytes::Bytes::from(format!("v{key}")))),
                    ]),
                )
                .await
                .unwrap();
        }
        engine.flush().await.unwrap();
    }
    engine.compact().await.unwrap();
    let l0_dir = tmp.path().join("data").join("L0");
    // (The original also counted L0 files here before close(), but the
    // result was never used — dead directory walk removed.)
    engine.close().await.unwrap();
    // After close() the pending-deletion queue must be fully swept: no
    // parquet files may remain in L0.
    let post_close_files = std::fs::read_dir(&l0_dir)
        .map(|it| {
            it.filter_map(|e| e.ok())
                .filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
                .count()
        })
        .unwrap_or(0);
    assert_eq!(
        post_close_files, 0,
        "Issue #11: close() must sweep pending deletions; {post_close_files} L0 files leaked",
    );
}
#[tokio::test]
async fn issue10_readonly_refresh_clears_row_cache() {
    let tmp = tempfile::tempdir().unwrap();
    let mut config_rw = test_config(&tmp);
    config_rw.memtable_size_bytes = 64 * 1024 * 1024;
    config_rw.row_cache_capacity = 1000;
    let mut config_ro = config_rw.clone();
    config_ro.read_only = true;
    // Primary writes generation "v1" and flushes it to disk.
    let primary = MeruEngine::open(config_rw.clone()).await.unwrap();
    for i in 0..20i64 {
        let row = Row::new(vec![
            Some(FieldValue::Int64(i)),
            Some(FieldValue::Bytes(bytes::Bytes::from(format!("v1_{i}")))),
        ]);
        primary.put(vec![FieldValue::Int64(i)], row).await.unwrap();
    }
    primary.flush().await.unwrap();
    // Replica reads a few keys, warming its row cache with v1 values.
    let replica = MeruEngine::open(config_ro.clone()).await.unwrap();
    for i in [3i64, 5, 7] {
        let row = replica.get(&[FieldValue::Int64(i)]).unwrap().unwrap();
        match row.get(1) {
            Some(FieldValue::Bytes(b)) => assert_eq!(
                b.as_ref(),
                format!("v1_{i}").as_bytes(),
                "expected v1 before refresh"
            ),
            _ => panic!("unexpected field type"),
        }
    }
    // Primary overwrites everything with "v2", deletes key 5, flushes.
    for i in 0..20i64 {
        let row = Row::new(vec![
            Some(FieldValue::Int64(i)),
            Some(FieldValue::Bytes(bytes::Bytes::from(format!("v2_{i}")))),
        ]);
        primary.put(vec![FieldValue::Int64(i)], row).await.unwrap();
    }
    primary.delete(vec![FieldValue::Int64(5)]).await.unwrap();
    primary.flush().await.unwrap();
    // refresh() must invalidate the replica's cache, not just load new files.
    replica.refresh().await.unwrap();
    let row = replica.get(&[FieldValue::Int64(3)]).unwrap().unwrap();
    match row.get(1) {
        Some(FieldValue::Bytes(b)) => assert_eq!(
            b.as_ref(),
            b"v2_3",
            "Issue #10: cache must be cleared on refresh; got stale value"
        ),
        _ => panic!("unexpected field type"),
    }
    let row = replica.get(&[FieldValue::Int64(5)]).unwrap();
    assert!(
        row.is_none(),
        "Issue #10: deleted key must return None after refresh; cache was serving stale data"
    );
}
#[tokio::test]
async fn issue6_readonly_refresh_picks_up_new_data() {
    let tmp = tempfile::tempdir().unwrap();
    let mut config_rw = test_config(&tmp);
    config_rw.memtable_size_bytes = 64 * 1024 * 1024;
    let mut config_ro = config_rw.clone();
    config_ro.read_only = true;
    let primary = MeruEngine::open(config_rw.clone()).await.unwrap();
    // First wave: keys 0..50, flushed before the replica opens.
    for i in 0..50i64 {
        let row = Row::new(vec![
            Some(FieldValue::Int64(i)),
            Some(FieldValue::Bytes(bytes::Bytes::from(format!("first_{i}")))),
        ]);
        primary.put(vec![FieldValue::Int64(i)], row).await.unwrap();
    }
    primary.flush().await.unwrap();
    let replica = MeruEngine::open(config_ro.clone()).await.unwrap();
    for i in 0..50i64 {
        let row = replica.get(&[FieldValue::Int64(i)]).unwrap();
        assert!(row.is_some(), "pre-refresh: replica missing key {i}");
    }
    // Second wave: keys 50..100, written after the replica opened.
    for i in 50..100i64 {
        let row = Row::new(vec![
            Some(FieldValue::Int64(i)),
            Some(FieldValue::Bytes(bytes::Bytes::from(format!("second_{i}")))),
        ]);
        primary.put(vec![FieldValue::Int64(i)], row).await.unwrap();
    }
    primary.flush().await.unwrap();
    replica.refresh().await.unwrap();
    // After refresh the replica must see both waves.
    let missing: Vec<i64> = (0..100i64)
        .filter(|i| replica.get(&[FieldValue::Int64(*i)]).unwrap().is_none())
        .collect();
    assert!(
        missing.is_empty(),
        "Issue #6: replica missing keys after refresh: {missing:?}",
    );
}
#[tokio::test]
async fn issue7_narrow_diagnostic() {
    use merutable::types::schema::{ColumnDef, ColumnType, TableSchema};
    let tmp = tempfile::tempdir().unwrap();
    // ByteArray primary key so the empty key and a NUL-byte key exercise
    // the key-encoding edge cases end to end.
    let schema = TableSchema {
        table_name: "ba".into(),
        columns: vec![
            ColumnDef {
                name: "k".into(),
                col_type: ColumnType::ByteArray,
                nullable: false,
                ..Default::default()
            },
            ColumnDef {
                name: "v".into(),
                col_type: ColumnType::Int64,
                nullable: false,
                ..Default::default()
            },
        ],
        primary_key: vec![0],
        ..Default::default()
    };
    // One config builder for both engine instances — the original duplicated
    // this literal verbatim; deduplicated to match the roundtrip test's
    // make_config pattern.
    let make_config = || EngineConfig {
        schema: schema.clone(),
        catalog_uri: tmp.path().to_string_lossy().to_string(),
        object_store_prefix: tmp.path().to_string_lossy().to_string(),
        wal_dir: tmp.path().join("wal"),
        memtable_size_bytes: 64 * 1024 * 1024,
        gc_grace_period_secs: 0,
        l0_slowdown_trigger: u32::MAX as usize,
        l0_stop_trigger: u32::MAX as usize,
        ..Default::default()
    };
    let engine = MeruEngine::open(make_config()).await.unwrap();
    let empty = bytes::Bytes::new();
    let null1 = bytes::Bytes::from_static(&[0u8]);
    engine
        .put(
            vec![FieldValue::Bytes(empty.clone())],
            Row::new(vec![
                Some(FieldValue::Bytes(empty.clone())),
                Some(FieldValue::Int64(100)),
            ]),
        )
        .await
        .unwrap();
    engine
        .put(
            vec![FieldValue::Bytes(null1.clone())],
            Row::new(vec![
                Some(FieldValue::Bytes(null1.clone())),
                Some(FieldValue::Int64(200)),
            ]),
        )
        .await
        .unwrap();
    // Probe both keys at every lifecycle stage; output is diagnostic only.
    let r = engine.get(&[FieldValue::Bytes(empty.clone())]).unwrap();
    println!("STAGE memtable empty: {r:?}");
    let r = engine.get(&[FieldValue::Bytes(null1.clone())]).unwrap();
    println!("STAGE memtable [0x00]: {r:?}");
    engine.flush().await.unwrap();
    let r = engine.get(&[FieldValue::Bytes(empty.clone())]).unwrap();
    println!("STAGE after flush empty: {r:?}");
    let r = engine.get(&[FieldValue::Bytes(null1.clone())]).unwrap();
    println!("STAGE after flush [0x00]: {r:?}");
    engine.compact().await.unwrap();
    let r = engine.get(&[FieldValue::Bytes(empty.clone())]).unwrap();
    println!("STAGE after compact empty: {r:?}");
    let r = engine.get(&[FieldValue::Bytes(null1.clone())]).unwrap();
    println!("STAGE after compact [0x00]: {r:?}");
    engine.close().await.unwrap();
    drop(engine);
    // Reopen over the same directory and probe once more.
    let engine = MeruEngine::open(make_config()).await.unwrap();
    let r = engine.get(&[FieldValue::Bytes(empty.clone())]).unwrap();
    println!("STAGE after reopen empty: {r:?}");
    let r = engine.get(&[FieldValue::Bytes(null1.clone())]).unwrap();
    println!("STAGE after reopen [0x00]: {r:?}");
    let scanned = engine.scan(None, None).unwrap();
    println!("SCAN results: {} rows", scanned.len());
    for (ik, row) in &scanned {
        println!(" ikey_bytes={:?} row={:?}", ik.as_bytes(), row);
    }
}
#[tokio::test]
async fn issue7_bytearray_edge_case_pks_roundtrip_through_parquet() {
use merutable::types::schema::{ColumnDef, ColumnType, TableSchema};
let tmp = tempfile::tempdir().unwrap();
let schema = TableSchema {
table_name: "ba".into(),
columns: vec![
ColumnDef {
name: "k".into(),
col_type: ColumnType::ByteArray,
nullable: false,
..Default::default()
},
ColumnDef {
name: "v".into(),
col_type: ColumnType::Int64,
nullable: false,
..Default::default()
},
],
primary_key: vec![0],
..Default::default()
};
let make_config = || EngineConfig {
schema: schema.clone(),
catalog_uri: tmp.path().to_string_lossy().to_string(),
object_store_prefix: tmp.path().to_string_lossy().to_string(),
wal_dir: tmp.path().join("wal"),
memtable_size_bytes: 64 * 1024 * 1024,
gc_grace_period_secs: 0,
l0_slowdown_trigger: u32::MAX as usize,
l0_stop_trigger: u32::MAX as usize,
..Default::default()
};
let cases: Vec<(bytes::Bytes, i64)> = vec![
(bytes::Bytes::new(), 100), (bytes::Bytes::from_static(&[0u8]), 200), (bytes::Bytes::from_static(&[0u8, 0u8]), 300), (bytes::Bytes::from_static(&[0u8, 0x01u8]), 400), (bytes::Bytes::from_static(&[0x01u8, 0u8]), 500), (bytes::Bytes::from_static(&[0xFFu8]), 600), (bytes::Bytes::from_static(&[0u8, 0xFFu8, 0u8]), 700), (bytes::Bytes::from("hello"), 800),
];
{
let engine = MeruEngine::open(make_config()).await.unwrap();
for (k, v) in &cases {
engine
.put(
vec![FieldValue::Bytes(k.clone())],
Row::new(vec![
Some(FieldValue::Bytes(k.clone())),
Some(FieldValue::Int64(*v)),
]),
)
.await
.unwrap();
}
engine.flush().await.unwrap();
engine.compact().await.unwrap();
engine.close().await.unwrap();
}
let engine = MeruEngine::open(make_config()).await.unwrap();
for (k, expected_v) in &cases {
let row = engine
.get(&[FieldValue::Bytes(k.clone())])
.unwrap()
.unwrap_or_else(|| panic!("Issue #7: key {k:?} missing after reopen"));
match row.get(1) {
Some(FieldValue::Int64(got)) => assert_eq!(
*got, *expected_v,
"Issue #7: key {k:?} returned wrong value (got {got}, want {expected_v})"
),
other => panic!("expected Int64 for key {k:?}, got {other:?}"),
}
}
let scanned = engine.scan(None, None).unwrap();
let scanned_keys: Vec<bytes::Bytes> = scanned
.iter()
.map(|(_ik, row)| match row.get(0) {
Some(FieldValue::Bytes(b)) => b.clone(),
_ => panic!("unexpected field type in scan result"),
})
.collect();
let mut expected_keys: Vec<bytes::Bytes> = cases.iter().map(|(k, _)| k.clone()).collect();
expected_keys.sort_by(|a, b| a.as_ref().cmp(b.as_ref()));
assert_eq!(
scanned_keys, expected_keys,
"Issue #7: scan must return keys in ascending order"
);
}
#[tokio::test]
async fn l0_stop_trigger_blocks_writes_until_drained() {
let tmp = tempfile::tempdir().unwrap();
let mut config = test_config(&tmp);
config.memtable_size_bytes = 64 * 1024 * 1024; config.l0_compaction_trigger = 100; config.l0_slowdown_trigger = 4;
config.l0_stop_trigger = 6;
config.flush_parallelism = 0;
config.compaction_parallelism = 0;
config.gc_grace_period_secs = 0;
let engine = MeruEngine::open(config).await.unwrap();
for batch in 0..6i64 {
engine
.put(
vec![FieldValue::Int64(batch)],
Row::new(vec![
Some(FieldValue::Int64(batch)),
Some(FieldValue::Bytes(bytes::Bytes::from(format!("v{batch}")))),
]),
)
.await
.unwrap();
engine.flush().await.unwrap();
}
let l0 = engine
.stats()
.levels
.iter()
.find(|l| l.level == 0)
.map(|l| l.file_count)
.unwrap_or(0);
assert_eq!(
l0, 6,
"setup invariant: L0 must equal stop trigger; got {l0}"
);
let stalled = tokio::time::timeout(
std::time::Duration::from_millis(300),
engine.put(
vec![FieldValue::Int64(9999)],
Row::new(vec![Some(FieldValue::Int64(9999)), None]),
),
)
.await;
assert!(
stalled.is_err(),
"Issue #5: put must be stalled when L0 is above stop trigger"
);
engine.compact().await.unwrap();
let l0_after = engine
.stats()
.levels
.iter()
.find(|l| l.level == 0)
.map(|l| l.file_count)
.unwrap_or(0);
assert!(
l0_after < 6,
"compact() should have drained L0 below stop trigger; got {l0_after}"
);
let result = tokio::time::timeout(
std::time::Duration::from_secs(3),
engine.put(
vec![FieldValue::Int64(10000)],
Row::new(vec![Some(FieldValue::Int64(10000)), None]),
),
)
.await
.expect("put must complete after compaction drained L0")
.expect("put must succeed");
assert!(result.0 > 0);
}
#[tokio::test]
async fn compaction_output_splits_at_size_threshold() {
    use merutable::types::schema::{ColumnDef, ColumnType, TableSchema};
    let tmp = tempfile::tempdir().unwrap();
    // Wide rows (512 KiB payloads) so the compaction output exceeds the
    // target file size and must be split across multiple L1 files.
    let schema = TableSchema {
        table_name: "big_payload".into(),
        columns: vec![
            ColumnDef {
                name: "id".into(),
                col_type: ColumnType::Int64,
                nullable: false,
                ..Default::default()
            },
            ColumnDef {
                name: "payload".into(),
                col_type: ColumnType::ByteArray,
                nullable: false,
                ..Default::default()
            },
        ],
        primary_key: vec![0],
        ..Default::default()
    };
    let config = EngineConfig {
        schema,
        catalog_uri: tmp.path().to_string_lossy().to_string(),
        object_store_prefix: tmp.path().to_string_lossy().to_string(),
        wal_dir: tmp.path().join("wal"),
        memtable_size_bytes: 64 * 1024 * 1024,
        gc_grace_period_secs: 0,
        ..Default::default()
    };
    let engine = MeruEngine::open(config).await.unwrap();
    let payload = vec![0xAAu8; 512 * 1024];
    let row_count = 1200i64;
    for i in 0..row_count {
        engine
            .put(
                vec![FieldValue::Int64(i)],
                Row::new(vec![
                    Some(FieldValue::Int64(i)),
                    Some(FieldValue::Bytes(bytes::Bytes::from(payload.clone()))),
                ]),
            )
            .await
            .unwrap();
        // Flush every 200 rows to build up several L0 files.
        if i % 200 == 199 {
            engine.flush().await.unwrap();
        }
    }
    engine.flush().await.unwrap();
    engine.compact().await.unwrap();
    let stats = engine.stats();
    let l1 = stats
        .levels
        .iter()
        .find(|l| l.level == 1)
        .expect("L1 must exist after L0→L1 compaction");
    assert!(
        l1.file_count >= 2,
        "Issue #3: compaction must split large outputs; L1 has {} file(s)",
        l1.file_count,
    );
    // Every output file must have a distinct path. (The original built
    // `(path, path)` tuples into a misnamed `ranges` vec just to run this
    // uniqueness check — a sorted, deduped path list says the same thing.)
    let mut paths: Vec<&[u8]> = l1.files.iter().map(|f| f.path.as_bytes()).collect();
    paths.sort();
    paths.dedup();
    assert_eq!(
        paths.len(),
        l1.file_count,
        "output files must have unique paths"
    );
    // Spot-check that no rows were lost across the split.
    for i in (0..row_count).step_by(100) {
        let row = engine.get(&[FieldValue::Int64(i)]).unwrap();
        assert!(
            row.is_some(),
            "Issue #3: row {i} missing after split compaction"
        );
    }
}
#[tokio::test]
async fn version_pin_prevents_gc_of_files_a_reader_might_need() {
    let tmp = tempfile::tempdir().unwrap();
    let mut config = test_config(&tmp);
    // Zero grace period: the only thing protecting old files is the pin.
    config.gc_grace_period_secs = 0;
    config.memtable_size_bytes = 64 * 1024 * 1024;
    let engine = MeruEngine::open(config).await.unwrap();
    // Five flushed batches -> five L0 parquet files for compact() to rewrite.
    for batch in 0..5i64 {
        for i in 0..10i64 {
            let key = batch * 100 + i;
            engine
                .put(
                    vec![FieldValue::Int64(key)],
                    Row::new(vec![
                        Some(FieldValue::Int64(key)),
                        Some(FieldValue::Bytes(bytes::Bytes::from(format!("v{key}")))),
                    ]),
                )
                .await
                .unwrap();
        }
        engine.flush().await.unwrap();
    }
    let l0_dir = tmp.path().join("data").join("L0");
    let files_before: Vec<_> = std::fs::read_dir(&l0_dir)
        .unwrap()
        .filter_map(|e| e.ok())
        .filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
        .map(|e| e.path())
        .collect();
    assert!(
        files_before.len() >= 4,
        "need ≥4 L0 files to guarantee compaction; got {}",
        files_before.len()
    );
    // Pin the current snapshot — simulating a long-running reader — then
    // compact, which would normally delete the rewritten L0 inputs.
    let (pin, pinned_version) = engine.pin_current_snapshot();
    let pinned_snapshot_id = pinned_version.snapshot_id;
    engine.compact().await.unwrap();
    let files_after_compact: Vec<_> = std::fs::read_dir(&l0_dir)
        .unwrap()
        .filter_map(|e| e.ok())
        .filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
        .map(|e| e.path())
        .collect();
    // While the pin is live, every pre-compaction file must still exist.
    // The first assert tolerates either "still listed" or "gone"; the second
    // then requires existence outright, so together they pin down both the
    // listing and the on-disk state.
    for f in &files_before {
        assert!(
            files_after_compact.iter().any(|p| p == f) || !f.exists(),
            "file {} disappeared while a pinned reader still holds its snapshot",
            f.display()
        );
        assert!(
            f.exists(),
            "BUG-0007..0013 regression: file {} was GCed while snapshot {pinned_snapshot_id} \
is pinned",
            f.display(),
        );
    }
    // Releasing the pin and sweeping must now delete every old file.
    drop(pin);
    engine.gc_pending_deletions().await;
    let files_after_unpin: Vec<_> = std::fs::read_dir(&l0_dir)
        .unwrap()
        .filter_map(|e| e.ok())
        .filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
        .map(|e| e.path())
        .collect();
    for f in &files_before {
        assert!(
            !f.exists(),
            "file {} should have been GCed after pin released; remaining files = {files_after_unpin:?}",
            f.display()
        );
    }
}
#[tokio::test]
async fn pin_refcount_and_min_snapshot() {
    let tmp = tempfile::tempdir().unwrap();
    let engine = MeruEngine::open(test_config(&tmp)).await.unwrap();
    // No pins yet -> no minimum pinned snapshot.
    assert!(engine.min_pinned_snapshot().is_none());
    // Two pins taken back-to-back see the same current snapshot.
    let (pin_a, ver_a) = engine.pin_current_snapshot();
    let (pin_b, ver_b) = engine.pin_current_snapshot();
    assert_eq!(ver_a.snapshot_id, ver_b.snapshot_id);
    assert_eq!(engine.min_pinned_snapshot(), Some(ver_a.snapshot_id));
    // Dropping one pin must not release the snapshot while the other lives.
    drop(pin_a);
    assert_eq!(engine.min_pinned_snapshot(), Some(ver_b.snapshot_id));
    drop(pin_b);
    assert!(engine.min_pinned_snapshot().is_none());
}
#[tokio::test]
async fn imp12_gc_grace_period() {
    let tmp = tempfile::tempdir().unwrap();
    let mut config = test_config(&tmp);
    // Non-zero grace period: compaction inputs must linger on disk.
    config.gc_grace_period_secs = 10;
    config.memtable_size_bytes = 512;
    let engine = MeruEngine::open(config).await.unwrap();
    // Two write waves, each flushed, to guarantee at least two L0 files.
    for i in 0..50i64 {
        let row = Row::new(vec![
            Some(FieldValue::Int64(i)),
            Some(FieldValue::Bytes(bytes::Bytes::from(format!("data_{i}")))),
        ]);
        engine.put(vec![FieldValue::Int64(i)], row).await.unwrap();
    }
    engine.flush().await.unwrap();
    for i in 50..100i64 {
        let row = Row::new(vec![
            Some(FieldValue::Int64(i)),
            Some(FieldValue::Bytes(bytes::Bytes::from(format!("data_{i}")))),
        ]);
        engine.put(vec![FieldValue::Int64(i)], row).await.unwrap();
    }
    engine.flush().await.unwrap();
    let l0_dir = tmp.path().join("data").join("L0");
    // Counts parquet files currently present in the given directory.
    let parquet_count = |dir: &std::path::Path| {
        std::fs::read_dir(dir)
            .unwrap()
            .filter_map(|e| e.ok())
            .filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
            .count()
    };
    let files_before = parquet_count(&l0_dir);
    assert!(
        files_before >= 2,
        "should have at least 2 L0 files before compaction"
    );
    engine.compact().await.unwrap();
    // Inside the grace window compaction may add files but must not remove.
    let files_after = parquet_count(&l0_dir);
    assert!(
        files_after >= files_before,
        "IMP-12: files should be preserved during grace period, \
        before={} after={}",
        files_before,
        files_after
    );
}