use std::{
sync::{Arc, Weak},
time::Duration,
};
use merutable::engine::{background::BackgroundWorkers, EngineConfig, MeruEngine};
use merutable::types::schema::{ColumnDef, ColumnType, TableSchema};
use tempfile::TempDir;
fn schema() -> TableSchema {
TableSchema {
table_name: "bgdrop".into(),
columns: vec![ColumnDef {
name: "id".into(),
col_type: ColumnType::Int64,
nullable: false,
..Default::default()
}],
primary_key: vec![0],
..Default::default()
}
}
fn config(tmp: &TempDir) -> EngineConfig {
EngineConfig {
schema: schema(),
catalog_uri: tmp.path().to_string_lossy().to_string(),
object_store_prefix: tmp.path().to_string_lossy().to_string(),
wal_dir: tmp.path().join("wal"),
flush_parallelism: 1,
compaction_parallelism: 2,
..Default::default()
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn drop_without_shutdown_releases_engine_arc() {
let tmp = tempfile::tempdir().unwrap();
let engine: Arc<MeruEngine> = MeruEngine::open(config(&tmp)).await.unwrap();
let weak: Weak<MeruEngine> = Arc::downgrade(&engine);
let workers = BackgroundWorkers::spawn(engine.clone());
assert!(
weak.strong_count() > 1,
"workers should hold strong refs while alive"
);
drop(workers);
let deadline = std::time::Instant::now() + Duration::from_secs(2);
while weak.strong_count() > 1 && std::time::Instant::now() < deadline {
tokio::time::sleep(Duration::from_millis(10)).await;
tokio::task::yield_now().await;
}
assert_eq!(
weak.strong_count(),
1,
"background workers must release Arc<MeruEngine> after drop; \
lingering strong_count = {} implies tasks are still orphaned",
weak.strong_count()
);
engine.close().await.unwrap();
}