use std::{
collections::HashMap,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, Mutex,
},
time::Duration,
};
use bytes::Bytes;
use merutable::engine::{EngineConfig, MeruEngine};
use merutable::types::{
schema::{ColumnDef, ColumnType, TableSchema},
value::{FieldValue, Row},
};
use tempfile::TempDir;
/// Shared in-memory model of what the engine is expected to contain.
///
/// Keyed by the composite primary key `(id, key2)`. The value records the last
/// operation the test applied to that key: `true` after a put, `false` after a
/// delete. Wrapped in `Arc<Mutex<..>>` so the writer loop and the reader tasks
/// can share it.
type Shadow = Arc<Mutex<HashMap<(i64, Vec<u8>), bool>>>;
fn schema() -> TableSchema {
TableSchema {
table_name: "issue39".into(),
columns: vec![
ColumnDef {
name: "id".into(),
col_type: ColumnType::Int64,
nullable: false,
..Default::default()
},
ColumnDef {
name: "key2".into(),
col_type: ColumnType::ByteArray,
nullable: false,
..Default::default()
},
ColumnDef {
name: "payload".into(),
col_type: ColumnType::ByteArray,
nullable: true,
..Default::default()
},
],
primary_key: vec![0, 1],
..Default::default()
}
}
/// Builds the engine configuration for the stress test, rooted in `tmp`.
///
/// The temp directory path is used both as the catalog URI and as the object
/// store prefix; the WAL lives in its `wal` subdirectory. The remaining knobs
/// are fixed values (L0 triggers 2 / 8 / 12, a 16 MiB memtable, a 1,000-entry
/// row cache, compaction parallelism 4, flush parallelism 2); everything else
/// is left at `EngineConfig::default()`.
///
/// NOTE(review): the low L0 compaction trigger presumably exists to make
/// compaction run often during the test — confirm against the `EngineConfig`
/// documentation.
fn turbo_config(tmp: &TempDir) -> EngineConfig {
    let root = tmp.path();
    // Lossy conversion: non-UTF-8 path bytes would be replaced, not rejected.
    let root_uri = root.to_string_lossy().to_string();

    EngineConfig {
        schema: schema(),
        catalog_uri: root_uri.clone(),
        object_store_prefix: root_uri,
        wal_dir: root.join("wal"),
        l0_compaction_trigger: 2,
        l0_slowdown_trigger: 8,
        l0_stop_trigger: 12,
        memtable_size_bytes: 16 * 1024 * 1024,
        row_cache_capacity: 1_000,
        compaction_parallelism: 4,
        flush_parallelism: 2,
        ..Default::default()
    }
}
/// Builds a full row for the key `(id, key2)`.
///
/// The row has three non-null fields in schema order: the `id`, a copy of
/// `key2`, and a fixed payload of 256 `b'x'` bytes.
fn mk_row(id: i64, key2: &[u8]) -> Row {
    // Size of the constant filler payload, in bytes.
    const PAYLOAD_LEN: usize = 256;

    let id_field = FieldValue::Int64(id);
    let key_field = FieldValue::Bytes(Bytes::copy_from_slice(key2));
    let payload_field = FieldValue::Bytes(Bytes::from(vec![b'x'; PAYLOAD_LEN]));

    Row::new(vec![Some(id_field), Some(key_field), Some(payload_field)])
}
/// Builds the two-part primary key `(id, key2)` as a list of field values.
///
/// The order matches the key columns declared in the schema: the Int64 `id`
/// first, then a copy of the `key2` bytes.
fn pk(id: i64, key2: &[u8]) -> Vec<FieldValue> {
    let mut key = Vec::with_capacity(2);
    key.push(FieldValue::Int64(id));
    key.push(FieldValue::Bytes(Bytes::copy_from_slice(key2)));
    key
}
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
#[ignore]
async fn issue39_concurrent_readers_no_post_quiescence_ghosts() {
let tmp = tempfile::tempdir().unwrap();
let engine: Arc<MeruEngine> = MeruEngine::open(turbo_config(&tmp)).await.unwrap();
let shadow: Shadow = Arc::new(Mutex::new(HashMap::new()));
let stop = Arc::new(AtomicBool::new(false));
let concurrent_ghosts = Arc::new(AtomicU64::new(0));
let concurrent_reads = Arc::new(AtomicU64::new(0));
let mut reader_handles = Vec::new();
for _ in 0..4 {
let e = engine.clone();
let s = shadow.clone();
let stp = stop.clone();
let g = concurrent_ghosts.clone();
let r = concurrent_reads.clone();
reader_handles.push(tokio::spawn(async move {
while !stp.load(Ordering::Relaxed) {
let snap: Vec<((i64, Vec<u8>), bool)> = {
let sh = s.lock().unwrap();
sh.iter().map(|(k, v)| (k.clone(), *v)).collect()
};
for ((id, k2), present) in snap.iter().take(20) {
let got = e.get(&pk(*id, k2));
r.fetch_add(1, Ordering::Relaxed);
if let Ok(result) = got {
let actual = result.is_some();
if actual != *present {
g.fetch_add(1, Ordering::Relaxed);
}
}
}
tokio::task::yield_now().await;
}
}));
}
const TOTAL_OPS: i64 = 20_000;
for i in 0..TOTAL_OPS {
let id = i % 2_000;
let key2 = format!("k{:04}", i % 97).into_bytes();
let pk_vals = pk(id, &key2);
let decision = i % 100;
if decision < 5 {
engine.delete(pk_vals.clone()).await.unwrap();
shadow.lock().unwrap().insert((id, key2), false);
} else {
engine
.put(pk_vals.clone(), mk_row(id, &key2))
.await
.unwrap();
shadow.lock().unwrap().insert((id, key2), true);
}
}
stop.store(true, Ordering::Relaxed);
for h in reader_handles {
let _ = h.await;
}
engine.flush().await.unwrap();
engine.compact().await.unwrap();
tokio::time::sleep(Duration::from_millis(200)).await;
let final_shadow: Vec<((i64, Vec<u8>), bool)> = shadow
.lock()
.unwrap()
.iter()
.map(|(k, v)| (k.clone(), *v))
.collect();
let mut post_ghosts: Vec<(i64, Vec<u8>)> = Vec::new();
let mut post_missing: Vec<(i64, Vec<u8>)> = Vec::new();
for ((id, key2), expected_present) in &final_shadow {
let got = engine.get(&pk(*id, key2)).unwrap();
let actual_present = got.is_some();
if actual_present != *expected_present {
if *expected_present {
post_missing.push((*id, key2.clone()));
} else {
post_ghosts.push((*id, key2.clone()));
}
}
}
let cg = concurrent_ghosts.load(Ordering::Relaxed);
let cr = concurrent_reads.load(Ordering::Relaxed);
eprintln!(
"issue39 stress: {} concurrent reads, {} concurrent ghost-class observations \
(shadow-lag window); {} post-quiescence ghosts; {} post-quiescence missing",
cr,
cg,
post_ghosts.len(),
post_missing.len()
);
assert!(
post_ghosts.is_empty(),
"Issue #39 regression: {} deleted keys resurrected after quiescence. \
First 5: {:?}",
post_ghosts.len(),
post_ghosts.iter().take(5).collect::<Vec<_>>()
);
assert!(
post_missing.is_empty(),
"Issue #39 regression: {} live keys disappeared after quiescence. \
First 5: {:?}",
post_missing.len(),
post_missing.iter().take(5).collect::<Vec<_>>()
);
}