use motedb::{Database, DBConfig, StreamingQueryResult};
use motedb::types::Value;
use tempfile::TempDir;
/// Runs `sql` against `db` and returns the materialized result.
///
/// # Panics
/// Panics with "execute SQL" if `execute` fails and with "materialize" if
/// materializing the streaming result fails.
fn exec(db: &Database, sql: &str) -> motedb::sql::QueryResult {
    let streaming = db.execute(sql).expect("execute SQL");
    streaming.materialize().expect("materialize")
}
/// Counts the rows produced by streaming `SELECT id FROM t`.
///
/// Returns the number of rows yielded before the stream ended. If the stream
/// yields an error, counting stops there and a diagnostic is written to
/// stderr, so a failing scan is not mistaken for a short table. The partial
/// count is still returned so callers that poll and retry keep working.
/// A result that is not `SelectStreaming` yields 0.
///
/// # Panics
/// Panics if `execute` itself fails.
fn count_rows(db: &Database) -> usize {
    let mut count = 0;
    let mut result = db.execute("SELECT id FROM t").unwrap();
    if let StreamingQueryResult::SelectStreaming { ref mut rows, .. } = result {
        loop {
            match rows.next() {
                Some(Ok(_)) => count += 1,
                Some(Err(_)) => {
                    // The error value is not formatted because its type is not
                    // visible here; flag the truncated scan instead.
                    eprintln!(
                        "count_rows: row stream returned an error after {} rows; count may be short",
                        count
                    );
                    break;
                }
                None => break,
            }
        }
    }
    count
}
/// Fetches the first row returned by `SELECT * FROM t WHERE id = <id>`.
///
/// Returns `None` when the query yields no rows or when the result is not a
/// `Select` variant.
fn get_row(db: &Database, id: i64) -> Option<Vec<Value>> {
    let query = format!("SELECT * FROM t WHERE id = {}", id);
    if let motedb::sql::QueryResult::Select { rows, .. } = exec(db, &query) {
        rows.into_iter().next()
    } else {
        None
    }
}
/// Returns the text stored in column 1 of the row with the given `id`.
///
/// # Panics
/// Panics if the row does not exist, if it has fewer than two columns
/// (index out of bounds), or if column 1 is not `Value::Text`.
fn get_val(db: &Database, id: i64) -> String {
    let columns = match get_row(db, id) {
        Some(columns) => columns,
        None => panic!("Row {} should exist", id),
    };
    let cell = &columns[1];
    if let Value::Text(text) = cell {
        text.clone()
    } else {
        panic!("Row {} col 1 expected Text, got {:?}", id, cell)
    }
}
/// Blocks the calling thread for five seconds.
///
/// NOTE(review): presumably this gives background compaction time to run;
/// nothing here observes compaction state, so it is a fixed delay only.
fn wait_for_compaction() {
    let pause = std::time::Duration::from_secs(5);
    std::thread::sleep(pause);
}
/// Creates a fresh temporary directory and a database inside it using
/// `DBConfig::for_edge()`.
///
/// The `TempDir` is returned alongside the database so the caller controls
/// how long the directory lives.
///
/// # Panics
/// Panics if the temporary directory or the database cannot be created.
fn make_db() -> (TempDir, Database) {
    let dir = TempDir::new().unwrap();
    let config = DBConfig::for_edge();
    let db = Database::create_with_config(dir.path(), config).unwrap();
    (dir, db)
}
/// Creates table `t` with columns `id` (integer primary key), `status`
/// (text) and `amount` (float). The query result is discarded.
fn create_table(db: &Database) {
    let ddl = "CREATE TABLE t (id INTEGER PRIMARY KEY, status TEXT, amount FLOAT)";
    let _ = exec(db, ddl);
}
/// Builds the path `<dir with extension "mote">/lsm` for the given temp dir.
///
/// NOTE(review): presumably this is where the engine stores its SSTables;
/// the layout is assumed by the tests rather than queried from the database.
fn lsm_dir_for(dir: &TempDir) -> std::path::PathBuf {
    let mut lsm = dir.path().with_extension("mote");
    lsm.push("lsm");
    lsm
}
/// Counts directory entries under `lsm_dir_for(dir)` whose extension is `sst`.
///
/// Entries that fail to read are skipped.
///
/// # Panics
/// Panics if the directory cannot be opened for reading.
fn count_sst_files(dir: &TempDir) -> usize {
    let lsm = lsm_dir_for(dir);
    let mut sst_files = 0;
    for entry in std::fs::read_dir(&lsm).unwrap() {
        if let Ok(entry) = entry {
            let path = entry.path();
            if path.extension().and_then(|ext| ext.to_str()) == Some("sst") {
                sst_files += 1;
            }
        }
    }
    sst_files
}
/// Inserts ids 1..=10000 in five batches of 2000, flushing after each batch,
/// then waits and checks that the scan sees all 10000 rows and that every
/// 100th id is retrievable with at least two columns.
#[test]
fn test_compaction_preserves_all_rows() {
    let (_dir, db) = make_db();
    create_table(&db);
    for batch in 0..5i64 {
        let first = batch * 2000 + 1;
        for id in first..first + 2000 {
            let insert = format!("INSERT INTO t VALUES ({}, 'v_{}', 0.0)", id, id);
            exec(&db, &insert);
        }
        db.flush().unwrap();
    }
    wait_for_compaction();
    let count = count_rows(&db);
    assert_eq!(count, 10000, "After compaction: expected 10000 rows, got {}", count);
    for i in (1..=10000).step_by(100) {
        match get_row(&db, i) {
            Some(row) => assert!(row.len() >= 2, "Row {} has too few columns", i),
            None => panic!("Row {} missing after compaction", i),
        }
    }
}
/// Writes three overlapping generations of the same keys (insert 1..=100,
/// update 1..=50 to 'v1', update 1..=25 to 'v2'), flushing after each, and
/// then polls up to eight times until the table has 100 rows and every row
/// carries the newest status for its id.
#[test]
fn test_compaction_with_overlapping_keys() {
    let (_dir, db) = make_db();
    create_table(&db);
    for id in 1..=100i64 {
        exec(&db, &format!("INSERT INTO t VALUES ({}, 'initial', 0.0)", id));
    }
    db.flush().unwrap();
    for id in 1..=50i64 {
        exec(&db, &format!("UPDATE t SET status = 'v1' WHERE id = {}", id));
    }
    db.flush().unwrap();
    for id in 1..=25i64 {
        exec(&db, &format!("UPDATE t SET status = 'v2' WHERE id = {}", id));
    }
    db.flush().unwrap();
    wait_for_compaction();
    db.flush().unwrap();

    // Newest status each id should carry after all three write generations.
    let expected_status = |id: i64| -> &'static str {
        if id <= 25 {
            "v2"
        } else if id <= 50 {
            "v1"
        } else {
            "initial"
        }
    };

    let mut ok = false;
    for attempt in 0..8 {
        let count = count_rows(&db);
        if count == 100 {
            // `all` stops at the first mismatching id, in ascending id order.
            let settled = (1..=100i64).all(|id| get_val(&db, id) == expected_status(id));
            if settled {
                ok = true;
                break;
            }
            eprintln!("Attempt {}: values not yet correct, waiting...", attempt);
        } else {
            eprintln!("Attempt {}: count={}, waiting...", attempt, count);
        }
        std::thread::sleep(std::time::Duration::from_secs(3));
        db.flush().unwrap();
    }
    assert!(ok, "Compaction did not settle with correct values after 8 attempts");
    assert_eq!(count_rows(&db), 100, "Should have 100 rows after compaction");
}
/// Inserts ids 1..=500, flushes, deletes ids 1..=250, flushes, waits, and
/// then checks that exactly the upper half of the ids remains visible.
#[test]
fn test_compaction_after_delete() {
    let (_dir, db) = make_db();
    create_table(&db);
    for id in 1..=500i64 {
        let insert = format!("INSERT INTO t VALUES ({}, 'active', {})", id, id);
        exec(&db, &insert);
    }
    db.flush().unwrap();
    for id in 1..=250i64 {
        let delete = format!("DELETE FROM t WHERE id = {}", id);
        exec(&db, &delete);
    }
    db.flush().unwrap();
    wait_for_compaction();

    let remaining = count_rows(&db);
    assert_eq!(remaining, 250, "Should have 250 rows after delete + compaction");
    // Boundary ids on both sides of the delete range.
    let first_deleted = get_row(&db, 1);
    assert!(first_deleted.is_none(), "Deleted row 1 should be gone");
    let last_deleted = get_row(&db, 250);
    assert!(last_deleted.is_none(), "Deleted row 250 should be gone");
    let first_kept = get_row(&db, 251);
    assert!(first_kept.is_some(), "Row 251 should exist");
    let last_kept = get_row(&db, 500);
    assert!(last_kept.is_some(), "Row 500 should exist");
}
/// Inserts a single row (id 42), flushes, deletes it, flushes again, waits,
/// and checks that the row stays invisible and the table scans as empty.
#[test]
fn test_compaction_tombstone_propagation() {
    let (_dir, db) = make_db();
    create_table(&db);
    let lookup = || get_row(&db, 42);

    exec(&db, "INSERT INTO t VALUES (42, 'alive', 1.0)");
    db.flush().unwrap();
    assert!(lookup().is_some(), "Row should exist after flush");

    exec(&db, "DELETE FROM t WHERE id = 42");
    db.flush().unwrap();
    wait_for_compaction();
    assert!(lookup().is_none(), "Row should be gone after tombstone compaction");

    let remaining = count_rows(&db);
    assert_eq!(remaining, 0, "Table should be empty");
}
/// Inserts ids 1..=10000 in five batches of 2000, flushing and pausing
/// 500 ms after each batch, then waits and checks that all rows are counted
/// and every 200th id is retrievable.
#[test]
fn test_multi_level_compaction() {
    let (_dir, db) = make_db();
    create_table(&db);
    let pause_between_batches = std::time::Duration::from_millis(500);
    for batch in 0..5i64 {
        let first = batch * 2000 + 1;
        let last = first + 1999;
        for id in first..=last {
            let insert = format!("INSERT INTO t VALUES ({}, 'data', {:.1})", id, id as f64);
            exec(&db, &insert);
        }
        db.flush().unwrap();
        std::thread::sleep(pause_between_batches);
    }
    wait_for_compaction();
    let count = count_rows(&db);
    assert_eq!(count, 10000, "All 10000 rows should survive multi-level compaction, got {}", count);
    for i in (1..=10000).step_by(200) {
        let found = get_row(&db, i);
        assert!(found.is_some(), "Row {} missing", i);
    }
}
/// Seeds 1000 flushed rows, then runs a writer thread (ids 1001..=3000) and a
/// scanner thread (ten full scans) side by side. Every scan must see at least
/// the 1000 seeded rows, and the final scan must see all 3000.
#[test]
fn test_concurrent_writes_and_scan() {
    let (_dir, db) = make_db();
    create_table(&db);
    for id in 1..=1000i64 {
        exec(&db, &format!("INSERT INTO t VALUES ({}, 'base', 0.0)", id));
    }
    db.flush().unwrap();

    let scan_counts = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
    std::thread::scope(|scope| {
        let shared_db = &db;
        let recorder = std::sync::Arc::clone(&scan_counts);
        // Writer: appends 2000 new rows while scans are in flight.
        scope.spawn(move || {
            (1001..=3000i64).for_each(|id| {
                exec(shared_db, &format!("INSERT INTO t VALUES ({}, 'new', 0.0)", id));
            });
        });
        // Scanner: records the row count seen by each of ten scans.
        scope.spawn(move || {
            for _ in 0..10 {
                let seen = count_rows(shared_db);
                recorder.lock().unwrap().push(seen);
            }
        });
    });

    let observed = scan_counts.lock().unwrap();
    for (scan, &seen) in observed.iter().enumerate() {
        assert!(seen >= 1000, "Scan {} saw {} rows, expected >= 1000", scan, seen);
    }
    let final_count = count_rows(&db);
    assert_eq!(final_count, 3000, "Final count should be 3000, got {}", final_count);
}
/// Inserts 3000 rows without flushing and scans the table ten times, checking
/// that no scan sees fewer or more than 3000 rows.
///
/// NOTE(review): despite the name, this body neither calls `flush` nor spawns
/// threads; it only verifies repeated scans over unflushed data. Confirm
/// whether a concurrent flush was intended.
#[test]
fn test_concurrent_flush_and_scan() {
    let (_dir, db) = make_db();
    create_table(&db);
    for i in 1..=3000i64 {
        exec(&db, &format!("INSERT INTO t VALUES ({}, 'x', {})", i, i));
    }
    let mut min_count = usize::MAX;
    let mut max_count = 0;
    for _ in 0..10 {
        let c = count_rows(&db);
        min_count = min_count.min(c);
        max_count = max_count.max(c);
    }
    assert!(min_count >= 3000, "Min scan count {} < 3000", min_count);
    // The maximum was tracked but never checked; a scan that returns
    // duplicate rows would otherwise go unnoticed.
    assert!(max_count <= 3000, "Max scan count {} > 3000", max_count);
    let final_count = count_rows(&db);
    assert_eq!(final_count, 3000, "Final count should be 3000, got {}", final_count);
}
/// Inserts and flushes 5000 rows, point-reads every 50th id, updates ids
/// 1..=100, flushes, and then verifies sampled updated and untouched rows
/// plus the total row count.
#[test]
fn test_concurrent_compaction_and_point_get() {
    let (_dir, db) = make_db();
    create_table(&db);
    for id in 1..=5000i64 {
        let insert = format!("INSERT INTO t VALUES ({}, 'val_{}', {:.2})", id, id, id as f64 * 1.5);
        exec(&db, &insert);
    }
    db.flush().unwrap();

    for i in (1..=5000).step_by(50) {
        let row = match get_row(&db, i) {
            Some(row) => row,
            None => panic!("Row {} should exist during compaction", i),
        };
        assert_eq!(row[0], Value::Integer(i), "Row {} has wrong id", i);
    }

    for id in 1..=100i64 {
        let update = format!("UPDATE t SET status = 'updated' WHERE id = {}", id);
        exec(&db, &update);
    }
    db.flush().unwrap();

    for i in (1..=100).step_by(10) {
        let status = get_val(&db, i);
        assert_eq!(status, "updated", "Row {} should be updated", i);
    }
    for i in (101..=5000).step_by(100) {
        let status = get_val(&db, i);
        assert_eq!(status, format!("val_{}", i), "Row {} should be original", i);
    }
    let total = count_rows(&db);
    assert_eq!(total, 5000, "All 5000 rows should be present");
}
/// Performs 20 rounds of 10 inserts, flushing after each round, then waits
/// and checks that all 200 rows are counted and individually retrievable.
///
/// Round `r` uses ids `r * 1000 + 1 ..= r * 1000 + 10`, so the rounds cover
/// disjoint, widely spaced id ranges.
#[test]
fn test_many_small_sstables() {
    let (_dir, db) = make_db();
    create_table(&db);
    for round in 0..20i64 {
        let base = round * 1000 + 1;
        for i in 0..10i64 {
            let id = base + i;
            exec(&db, &format!("INSERT INTO t VALUES ({}, 'small', 0.0)", id));
        }
        db.flush().unwrap();
    }
    wait_for_compaction();
    // Scan once so the value in the failure message is the value that was
    // actually compared, not the result of a second scan.
    let count = count_rows(&db);
    assert_eq!(count, 200, "Should have 200 rows across 20 SSTables, got {}", count);
    for round in 0..20i64 {
        let base = round * 1000 + 1;
        for i in 0..10i64 {
            let id = base + i;
            assert!(get_row(&db, id).is_some(), "Row {} missing", id);
        }
    }
}
/// Checks that a freshly created table scans as empty both before and after
/// a flush.
#[test]
fn test_empty_table_scan() {
    let (_dir, db) = make_db();
    create_table(&db);

    let before_flush = count_rows(&db);
    assert_eq!(before_flush, 0, "Empty table should have 0 rows");

    db.flush().unwrap();
    let after_flush = count_rows(&db);
    assert_eq!(after_flush, 0, "Empty table after flush should have 0 rows");
}
// Walks one row (id 1) through insert -> update -> delete, reading it back
// both before and after each flush, and once more after the compaction wait.
// The order of writes, flushes and reads is the whole point of this test.
#[test]
fn test_single_row_lifecycle() {
let (_dir, db) = make_db();
create_table(&db);
// Insert: visible before any flush, and still visible after the flush.
exec(&db, "INSERT INTO t VALUES (1, 'original', 10.0)");
assert!(get_row(&db, 1).is_some(), "Row exists in memtable");
assert_eq!(get_val(&db, 1), "original");
db.flush().unwrap();
assert!(get_row(&db, 1).is_some(), "Row exists in SSTable");
assert_eq!(get_val(&db, 1), "original");
// Update: the new status must win over the already-flushed value,
// both before and after the update itself is flushed.
exec(&db, "UPDATE t SET status = 'updated' WHERE id = 1");
assert_eq!(get_val(&db, 1), "updated");
db.flush().unwrap();
assert_eq!(get_val(&db, 1), "updated");
// Delete: the row must stay hidden before the flush, after the flush,
// and after the fixed compaction wait.
exec(&db, "DELETE FROM t WHERE id = 1");
assert!(get_row(&db, 1).is_none(), "Row gone from memtable");
db.flush().unwrap();
assert!(get_row(&db, 1).is_none(), "Row gone after flush");
wait_for_compaction();
assert!(get_row(&db, 1).is_none(), "Row gone after compaction");
}
/// Inserts ids 1..=20000 in four flushed batches of 5000 (300 ms pause after
/// each), waits, verifies the count and a sample of ids, then updates ids
/// 1..=2000 to 'final', flushes, waits again and re-verifies.
#[test]
fn test_large_dataset_compaction() {
    let (_dir, db) = make_db();
    create_table(&db);
    let pause_between_batches = std::time::Duration::from_millis(300);
    for batch in 0..4i64 {
        let first = batch * 5000 + 1;
        let last = first + 4999;
        for id in first..=last {
            let insert = format!("INSERT INTO t VALUES ({}, 'v_{}', {:.2})", id, id, id as f64);
            exec(&db, &insert);
        }
        db.flush().unwrap();
        std::thread::sleep(pause_between_batches);
    }
    wait_for_compaction();

    let after_insert = count_rows(&db);
    assert_eq!(after_insert, 20000, "Should have 20000 rows after compaction");
    for i in (1..=20000).step_by(500) {
        let found = get_row(&db, i);
        assert!(found.is_some(), "Row {} missing", i);
    }

    for id in 1..=2000i64 {
        let update = format!("UPDATE t SET status = 'final' WHERE id = {}", id);
        exec(&db, &update);
    }
    db.flush().unwrap();
    wait_for_compaction();

    let after_update = count_rows(&db);
    assert_eq!(after_update, 20000, "Still 20000 after UPDATE");
    for i in (1..=2000).step_by(500) {
        let status = get_val(&db, i);
        assert_eq!(status, "final", "Row {} should be updated", i);
    }
}
/// Deletes an id that was never inserted (999) and checks that the one
/// existing row (id 1) is unaffected and the missing id stays missing.
#[test]
fn test_delete_nonexistent_key() {
    let (_dir, db) = make_db();
    create_table(&db);
    exec(&db, "INSERT INTO t VALUES (1, 'here', 1.0)");
    exec(&db, "DELETE FROM t WHERE id = 999");

    let total = count_rows(&db);
    assert_eq!(total, 1, "Only row 1 should exist");
    let existing = get_row(&db, 1);
    assert!(existing.is_some(), "Row 1 should still exist");
    let absent = get_row(&db, 999);
    assert!(absent.is_none(), "Row 999 should not exist");
}
/// Inserts 5000 'pending' rows, marks ids 1..=2500 'completed' and then ids
/// 1..=1000 'final' (flushing after each phase), and polls up to eight times
/// until a full scan shows 5000 rows split 1000 / 1500 / 2500 across
/// final / completed / pending.
///
/// # Panics
/// Panics if a scanned row carries any other status value.
#[test]
fn test_update_then_scan_preserves_all() {
    let (_dir, db) = make_db();
    create_table(&db);
    for id in 1..=5000i64 {
        let insert = format!("INSERT INTO t VALUES ({}, 'pending', {:.2})", id, id as f64 * 10.0);
        exec(&db, &insert);
    }
    db.flush().unwrap();
    for id in 1..=2500i64 {
        exec(&db, &format!("UPDATE t SET status = 'completed' WHERE id = {}", id));
    }
    db.flush().unwrap();
    for id in 1..=1000i64 {
        exec(&db, &format!("UPDATE t SET status = 'final' WHERE id = {}", id));
    }
    db.flush().unwrap();

    let retry_delay = std::time::Duration::from_secs(3);
    let mut ok = false;
    for attempt in 0..8 {
        db.flush().unwrap();
        let total = count_rows(&db);
        if total == 5000 {
            let (mut final_count, mut completed_count, mut pending_count) = (0u64, 0u64, 0u64);
            let mut result = db.execute("SELECT id, status FROM t").unwrap();
            if let StreamingQueryResult::SelectStreaming { ref mut rows, .. } = result {
                // Tally statuses until the stream ends or yields an error.
                loop {
                    let row = match rows.next() {
                        Some(Ok(row)) => row,
                        _ => break,
                    };
                    let tally = match &row[1] {
                        Value::Text(s) if s == "final" => &mut final_count,
                        Value::Text(s) if s == "completed" => &mut completed_count,
                        Value::Text(s) if s == "pending" => &mut pending_count,
                        other => panic!("Unexpected status: {:?}", other),
                    };
                    *tally += 1;
                }
            }
            if final_count == 1000 && completed_count == 1500 && pending_count == 2500 {
                ok = true;
                break;
            }
            eprintln!("Attempt {}: final={}, completed={}, pending={}", attempt, final_count, completed_count, pending_count);
        } else {
            eprintln!("Attempt {}: count={}, waiting...", attempt, total);
        }
        std::thread::sleep(retry_delay);
    }
    assert!(ok, "Status distribution did not settle after 8 attempts");
}
/// Flushes two 500-row batches, waits, and checks that at least two `.sst`
/// files are present in the LSM directory; then flushes a third batch, waits
/// again and checks that all 1500 rows are still counted.
#[test]
fn test_deferred_deletion_keeps_files_alive() {
    let (dir, db) = make_db();
    create_table(&db);
    for id in 1..=500i64 {
        let insert = format!("INSERT INTO t VALUES ({}, 'a', 0.0)", id);
        exec(&db, &insert);
    }
    db.flush().unwrap();
    for id in 501..=1000i64 {
        let insert = format!("INSERT INTO t VALUES ({}, 'b', 0.0)", id);
        exec(&db, &insert);
    }
    db.flush().unwrap();
    wait_for_compaction();

    let file_count = count_sst_files(&dir);
    assert!(file_count >= 2, "Deferred deletion: expected >= 2 sst files, got {}", file_count);

    for id in 1001..=1500i64 {
        let insert = format!("INSERT INTO t VALUES ({}, 'c', 0.0)", id);
        exec(&db, &insert);
    }
    db.flush().unwrap();
    wait_for_compaction();

    let total = count_rows(&db);
    assert_eq!(total, 1500, "All 1500 rows should be present");
}
/// Creates and closes a database with 100 rows, drops a bogus `.sst` file
/// into its LSM directory, reopens the database, and checks that the bogus
/// file was removed while the original 100 rows are still readable.
#[test]
fn test_orphan_cleanup_on_open() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().to_path_buf();
    {
        let db = Database::create_with_config(&path, DBConfig::for_edge()).unwrap();
        exec(&db, "CREATE TABLE t (id INTEGER PRIMARY KEY, status TEXT)");
        for id in 1..=100i64 {
            let insert = format!("INSERT INTO t VALUES ({}, 'ok')", id);
            exec(&db, &insert);
        }
        db.flush().unwrap();
        db.close().unwrap();
    }

    // Plant a file that looks like an SSTable by name only.
    let mut orphan = path.with_extension("mote");
    orphan.push("lsm");
    orphan.push("l0_orphan_999999.sst");
    std::fs::write(&orphan, b"garbage data not a real sstable").unwrap();
    assert!(orphan.exists(), "Orphan file should exist before open");

    {
        let reopened = Database::open_with_config(&path, DBConfig::for_edge()).unwrap();
        assert!(!orphan.exists(), "Orphan SSTable should be cleaned up on open");
        let count = count_rows_via(&reopened, "SELECT id FROM t");
        assert_eq!(count, 100, "Original data should survive reopen");
    }
}
/// Counts the rows produced by streaming the given `sql` query.
///
/// Returns the number of rows yielded before the stream ended. If the stream
/// yields an error, counting stops there and a diagnostic is written to
/// stderr, so a failing scan is not mistaken for a short result. The partial
/// count is still returned. A result that is not `SelectStreaming` yields 0.
///
/// # Panics
/// Panics if `execute` itself fails.
fn count_rows_via(db: &Database, sql: &str) -> usize {
    let mut count = 0;
    let mut result = db.execute(sql).unwrap();
    if let StreamingQueryResult::SelectStreaming { ref mut rows, .. } = result {
        loop {
            match rows.next() {
                Some(Ok(_)) => count += 1,
                Some(Err(_)) => {
                    // The error value is not formatted because its type is not
                    // visible here; flag the truncated scan instead.
                    eprintln!(
                        "count_rows_via: row stream for `{}` returned an error after {} rows; count may be short",
                        sql, count
                    );
                    break;
                }
                None => break,
            }
        }
    }
    count
}
/// Writes 5000 rows, flushes and closes the database, then reopens it and
/// checks the row count plus the stored text of every 100th id.
#[test]
fn test_data_survives_restart() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().to_path_buf();
    {
        let db = Database::create_with_config(&path, DBConfig::for_edge()).unwrap();
        exec(&db, "CREATE TABLE t (id INTEGER PRIMARY KEY, status TEXT)");
        for id in 1..=5000i64 {
            let insert = format!("INSERT INTO t VALUES ({}, 'val_{}')", id, id);
            exec(&db, &insert);
        }
        db.flush().unwrap();
        db.close().unwrap();
    }
    {
        let db = Database::open_with_config(&path, DBConfig::for_edge()).unwrap();
        let count = count_rows_via(&db, "SELECT id FROM t");
        assert_eq!(count, 5000, "All rows should survive restart, got {}", count);
        for i in (1..=5000).step_by(100) {
            let sql = format!("SELECT * FROM t WHERE id = {}", i);
            let rows = match exec(&db, &sql) {
                motedb::sql::QueryResult::Select { rows, .. } => rows,
                _ => panic!("Expected Select result for row {}", i),
            };
            let row = match rows.into_iter().next() {
                Some(row) => row,
                None => panic!("Row {} missing after restart", i),
            };
            assert_eq!(row[1], Value::Text(format!("val_{}", i)),
                "Row {} value mismatch after restart", i);
        }
    }
}
/// Writes ids 1..=10000 in five flushed batches (500 ms pause after each),
/// waits, closes the database, then reopens it and checks the row count and
/// that every 200th id is still retrievable.
#[test]
fn test_compaction_result_survives_restart() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().to_path_buf();
    {
        let db = Database::create_with_config(&path, DBConfig::for_edge()).unwrap();
        exec(&db, "CREATE TABLE t (id INTEGER PRIMARY KEY, status TEXT)");
        let pause_between_batches = std::time::Duration::from_millis(500);
        for batch in 0..5i64 {
            let first = batch * 2000 + 1;
            let last = first + 1999;
            for id in first..=last {
                let insert = format!("INSERT INTO t VALUES ({}, 'data')", id);
                exec(&db, &insert);
            }
            db.flush().unwrap();
            std::thread::sleep(pause_between_batches);
        }
        wait_for_compaction();
        db.close().unwrap();
    }
    {
        let db = Database::open_with_config(&path, DBConfig::for_edge()).unwrap();
        let count = count_rows_via(&db, "SELECT id FROM t");
        assert_eq!(count, 10000, "All 10000 rows should survive compaction + restart, got {}", count);
        for i in (1..=10000).step_by(200) {
            let sql = format!("SELECT * FROM t WHERE id = {}", i);
            if let motedb::sql::QueryResult::Select { rows, .. } = exec(&db, &sql) {
                let first_row = rows.into_iter().next();
                assert!(first_row.is_some(), "Row {} missing after restart", i);
            } else {
                panic!("Expected Select result for row {}", i);
            }
        }
    }
}