use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};
use tempfile::TempDir;
use rhei::{HtapConfig, HtapEngine, OlapEngine, TableSchema};
/// Schema descriptor for the three-column `users` table:
/// `id` (Int64, non-null primary key), `name` (nullable Utf8),
/// `age` (nullable Int64).
fn users_schema() -> TableSchema {
    let fields = vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, true),
        Field::new("age", DataType::Int64, true),
    ];
    let arrow_schema = Arc::new(Schema::new(fields));
    TableSchema::new("users", arrow_schema, vec!["id".to_string()])
}
/// Two-column variant of the `users` schema (`id` PK + nullable `name`),
/// used by the schema-evolution tests as the pre-ALTER starting point.
fn users_two_col_schema() -> TableSchema {
    let fields = vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, true),
    ];
    let arrow_schema = Arc::new(Schema::new(fields));
    TableSchema::new("users", arrow_schema, vec!["id".to_string()])
}
/// Schema descriptor for the `orders` table: `id` (Int64 PK),
/// nullable `user_id` and `total` (both Int64).
fn orders_schema() -> TableSchema {
    let fields = vec![
        Field::new("id", DataType::Int64, false),
        Field::new("user_id", DataType::Int64, true),
        Field::new("total", DataType::Int64, true),
    ];
    let arrow_schema = Arc::new(Schema::new(fields));
    TableSchema::new("orders", arrow_schema, vec!["id".to_string()])
}
/// Runs `SELECT COUNT(*)` against the OLAP side for `table` and returns
/// the single count value.
///
/// This helper is shared by many tests, so failures panic with a message
/// naming the table instead of a bare `unwrap()` — a query error, an empty
/// batch list, or a non-Int64 count column all indicate test-setup problems
/// and should be diagnosable from the panic alone.
async fn olap_count(engine: &HtapEngine, table: &str) -> i64 {
    let batches = engine
        .olap()
        .query(&format!("SELECT COUNT(*) FROM {table}"))
        .await
        .unwrap_or_else(|e| panic!("OLAP COUNT(*) query on `{table}` failed: {e:?}"));
    batches
        .first()
        .unwrap_or_else(|| panic!("OLAP COUNT(*) on `{table}` returned no batches"))
        .column(0)
        .as_any()
        .downcast_ref::<arrow::array::Int64Array>()
        .expect("COUNT(*) column should be an Int64Array")
        .value(0)
}
macro_rules! backend_tests {
($mod_name:ident, $make_engine:ident) => {
mod $mod_name {
use super::*;
use std::time::Duration;
use rhei::OltpEngine;
// End-to-end happy path: create + register a table, insert three rows on the
// OLTP side, run one manual sync, and verify the rows and counters on OLAP.
#[tokio::test]
async fn full_htap_pipeline() {
    let tmp = TempDir::new().unwrap();
    let engine = $make_engine(&tmp).await;
    engine
        .execute(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
            &[],
        )
        .await
        .unwrap();
    engine.register_table(users_schema()).await.unwrap();
    engine
        .execute("INSERT INTO users VALUES (1, 'Alice', 30)", &[])
        .await
        .unwrap();
    engine
        .execute("INSERT INTO users VALUES (2, 'Bob', 25)", &[])
        .await
        .unwrap();
    engine
        .execute("INSERT INTO users VALUES (3, 'Charlie', 35)", &[])
        .await
        .unwrap();
    // One sync must process exactly the three insert events and nothing else.
    let sync_result = engine.sync_now().await.unwrap();
    assert_eq!(sync_result.events_processed, 3);
    assert_eq!(sync_result.rows_inserted, 3);
    assert_eq!(sync_result.rows_updated, 0);
    assert_eq!(sync_result.rows_deleted, 0);
    let olap_batches = engine
        .olap()
        .query("SELECT COUNT(*) FROM users")
        .await
        .unwrap();
    assert_eq!(olap_batches.len(), 1);
    let count = olap_count(&engine, "users").await;
    assert_eq!(count, 3);
    // After a complete sync there should be no outstanding CDC backlog.
    let status = engine.sync_status().await.unwrap();
    assert_eq!(status.lag, 0);
}
// Verifies UPDATE and DELETE events are classified correctly by sync:
// each mutation syncs with the matching counter (rows_updated / rows_deleted).
#[tokio::test]
async fn update_and_delete_sync() {
    let tmp = TempDir::new().unwrap();
    let engine = $make_engine(&tmp).await;
    engine
        .execute(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
            &[],
        )
        .await
        .unwrap();
    engine.register_table(users_schema()).await.unwrap();
    engine
        .execute("INSERT INTO users VALUES (1, 'Alice', 30)", &[])
        .await
        .unwrap();
    // Drain the insert event first so the next sync only sees the UPDATE.
    engine.sync_now().await.unwrap();
    engine
        .execute("UPDATE users SET age = 31 WHERE id = 1", &[])
        .await
        .unwrap();
    let sync_result = engine.sync_now().await.unwrap();
    assert_eq!(sync_result.rows_updated, 1);
    engine
        .execute("DELETE FROM users WHERE id = 1", &[])
        .await
        .unwrap();
    let sync_result = engine.sync_now().await.unwrap();
    assert_eq!(sync_result.rows_deleted, 1);
}
// Exercises the engine's query router: point lookups, aggregates, and an
// explicit `ForceOlap` hint must all return results after a sync.
#[tokio::test]
async fn query_routing() {
    let tmp = TempDir::new().unwrap();
    let engine = $make_engine(&tmp).await;
    engine
        .execute(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
            &[],
        )
        .await
        .unwrap();
    engine.register_table(users_schema()).await.unwrap();
    engine
        .execute("INSERT INTO users VALUES (1, 'Alice', 30)", &[])
        .await
        .unwrap();
    engine
        .execute("INSERT INTO users VALUES (2, 'Bob', 25)", &[])
        .await
        .unwrap();
    engine.sync_now().await.unwrap();
    // Point lookup by primary key.
    let result = engine
        .query("SELECT * FROM users WHERE id = 1")
        .await
        .unwrap();
    assert_eq!(result.len(), 1);
    // Aggregate query.
    let result = engine.query("SELECT COUNT(*) FROM users").await.unwrap();
    assert_eq!(result.len(), 1);
    // Explicitly route to OLAP; both rows must be visible there post-sync.
    let result = engine
        .query_with_hint("SELECT * FROM users", rhei::QueryHint::ForceOlap)
        .await
        .unwrap();
    assert_eq!(result.len(), 1);
    let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
    assert_eq!(total_rows, 2);
}
// Rows inserted BEFORE register_table are not captured by CDC; initial_sync
// must bulk-copy them to OLAP without writing anything to the CDC log.
#[tokio::test]
async fn initial_sync() {
    let tmp = TempDir::new().unwrap();
    let engine = $make_engine(&tmp).await;
    engine
        .execute(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
            &[],
        )
        .await
        .unwrap();
    // Inserts happen before register_table, so no CDC events exist for them.
    engine
        .execute("INSERT INTO users VALUES (1, 'Alice', 30)", &[])
        .await
        .unwrap();
    engine
        .execute("INSERT INTO users VALUES (2, 'Bob', 25)", &[])
        .await
        .unwrap();
    engine
        .execute("INSERT INTO users VALUES (3, 'Charlie', 35)", &[])
        .await
        .unwrap();
    engine.register_table(users_schema()).await.unwrap();
    let count = olap_count(&engine, "users").await;
    assert_eq!(count, 0, "OLAP should be empty before initial_sync");
    let rows = engine.initial_sync("users").await.unwrap();
    assert_eq!(rows, 3);
    let count = olap_count(&engine, "users").await;
    assert_eq!(count, 3, "OLAP should have 3 rows after initial_sync");
    // The bulk copy must not be routed through the CDC log.
    let cdc_batches = engine
        .oltp()
        .unwrap()
        .query("SELECT COUNT(*) FROM _rhei_cdc_log", &[])
        .await
        .unwrap();
    let cdc_count = cdc_batches[0]
        .column(0)
        .as_any()
        .downcast_ref::<arrow::array::Int64Array>()
        .unwrap()
        .value(0);
    assert_eq!(
        cdc_count, 0,
        "CDC log should be empty — initial_sync bypasses CDC"
    );
}
// Schema evolution: after ALTER TABLE + add_column, the registry gains the
// field and newly inserted rows sync with the new column populated in OLAP.
#[tokio::test]
async fn add_column() {
    let tmp = TempDir::new().unwrap();
    let engine = $make_engine(&tmp).await;
    engine
        .execute(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)",
            &[],
        )
        .await
        .unwrap();
    engine.register_table(users_two_col_schema()).await.unwrap();
    engine
        .execute("INSERT INTO users VALUES (1, 'Alice')", &[])
        .await
        .unwrap();
    engine.sync_now().await.unwrap();
    // DDL on the OLTP side plus the matching registry update.
    engine
        .execute("ALTER TABLE users ADD COLUMN age INTEGER", &[])
        .await
        .unwrap();
    engine
        .add_column("users", "age", DataType::Int64)
        .await
        .unwrap();
    assert_eq!(
        engine
            .schema_registry()
            .get("users")
            .unwrap()
            .arrow_schema
            .fields()
            .len(),
        3,
        "schema registry should have 3 fields after add_column"
    );
    // Row inserted after the evolution must carry the new column through sync.
    engine
        .execute("INSERT INTO users VALUES (2, 'Bob', 30)", &[])
        .await
        .unwrap();
    let sync_result = engine.sync_now().await.unwrap();
    assert_eq!(sync_result.rows_inserted, 1);
    let batches = engine
        .olap()
        .query("SELECT age FROM users WHERE id = 2")
        .await
        .unwrap();
    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(total_rows, 1);
    assert!(
        batches[0].schema().index_of("age").is_ok(),
        "OLAP result should include the new 'age' column"
    );
}
// Schema evolution: drop_column shrinks the registry, subsequent inserts sync
// without the column, and dropping a primary-key column must be rejected.
#[tokio::test]
async fn drop_column() {
    let tmp = TempDir::new().unwrap();
    let engine = $make_engine(&tmp).await;
    engine
        .execute(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
            &[],
        )
        .await
        .unwrap();
    engine.register_table(users_schema()).await.unwrap();
    engine
        .execute("INSERT INTO users VALUES (1, 'Alice', 30)", &[])
        .await
        .unwrap();
    engine.sync_now().await.unwrap();
    engine.drop_column("users", "age").await.unwrap();
    assert_eq!(
        engine
            .schema_registry()
            .get("users")
            .unwrap()
            .arrow_schema
            .fields()
            .len(),
        2,
        "schema registry should have 2 fields after drop_column"
    );
    // Insert against the narrowed schema and sync it.
    engine
        .execute("INSERT INTO users VALUES (2, 'Bob')", &[])
        .await
        .unwrap();
    let sync_result = engine.sync_now().await.unwrap();
    assert_eq!(sync_result.rows_inserted, 1);
    let batches = engine
        .olap()
        .query("SELECT * FROM users WHERE id = 2")
        .await
        .unwrap();
    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(total_rows, 1);
    assert!(
        batches[0].schema().index_of("age").is_err(),
        "OLAP result should not include the dropped 'age' column"
    );
    // Primary-key columns must be protected from drops.
    assert!(
        engine.drop_column("users", "id").await.is_err(),
        "dropping a PK column should return an error"
    );
}
// Five separate inserts accumulated before a single sync must all be
// processed in that one sync cycle.
#[tokio::test]
async fn batch_insert_sync() {
    let tmp = TempDir::new().unwrap();
    let engine = $make_engine(&tmp).await;
    engine
        .execute(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
            &[],
        )
        .await
        .unwrap();
    engine.register_table(users_schema()).await.unwrap();
    for i in 1..=5 {
        engine
            .execute(
                &format!("INSERT INTO users VALUES ({i}, 'User{i}', {})", 20 + i),
                &[],
            )
            .await
            .unwrap();
    }
    let result = engine.sync_now().await.unwrap();
    assert_eq!(result.events_processed, 5);
    assert_eq!(result.rows_inserted, 5);
    let count = olap_count(&engine, "users").await;
    assert_eq!(count, 5);
}
// With prune_after_sync enabled (see the engine factory config), a sync must
// report a pruned_count and leave the CDC log table empty afterwards.
#[tokio::test]
async fn cdc_pruning_after_sync() {
    let tmp = TempDir::new().unwrap();
    let engine = $make_engine(&tmp).await;
    engine
        .execute(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
            &[],
        )
        .await
        .unwrap();
    engine.register_table(users_schema()).await.unwrap();
    engine
        .execute("INSERT INTO users VALUES (1, 'Alice', 30)", &[])
        .await
        .unwrap();
    engine
        .execute("INSERT INTO users VALUES (2, 'Bob', 25)", &[])
        .await
        .unwrap();
    // Two insert events should be sitting in the CDC log pre-sync.
    let cdc_before = engine
        .oltp()
        .unwrap()
        .query("SELECT COUNT(*) as cnt FROM _rhei_cdc_log", &[])
        .await
        .unwrap();
    let before_count = cdc_before[0]
        .column(0)
        .as_any()
        .downcast_ref::<arrow::array::Int64Array>()
        .unwrap()
        .value(0);
    assert_eq!(before_count, 2, "CDC log should have 2 events before sync");
    let result = engine.sync_now().await.unwrap();
    assert_eq!(result.events_processed, 2);
    assert!(
        result.pruned_count.is_some(),
        "pruned_count should be set when pruning is enabled"
    );
    assert_eq!(result.pruned_count.unwrap(), 2);
    // The processed events must have been removed from the log.
    let cdc_after = engine
        .oltp()
        .unwrap()
        .query("SELECT COUNT(*) as cnt FROM _rhei_cdc_log", &[])
        .await
        .unwrap();
    let count_array = cdc_after[0]
        .column(0)
        .as_any()
        .downcast_ref::<arrow::array::Int64Array>()
        .unwrap();
    assert_eq!(
        count_array.value(0),
        0,
        "CDC log should be empty after pruning"
    );
}
// The background sync loop (started with a 50 ms interval) should replicate
// pending rows on its own within the 500 ms sleep window, and stop cleanly.
// NOTE(review): the fixed sleep makes this timing-sensitive on slow CI hosts.
#[tokio::test]
async fn background_sync_loop() {
    let tmp = TempDir::new().unwrap();
    let mut engine = $make_engine(&tmp).await;
    engine
        .execute(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
            &[],
        )
        .await
        .unwrap();
    engine.register_table(users_schema()).await.unwrap();
    engine
        .execute("INSERT INTO users VALUES (1, 'Alice', 30)", &[])
        .await
        .unwrap();
    engine.start_sync(Duration::from_millis(50));
    assert!(engine.is_sync_running());
    // Give the loop several intervals to pick up the pending insert.
    tokio::time::sleep(Duration::from_millis(500)).await;
    let count = olap_count(&engine, "users").await;
    assert_eq!(
        count, 1,
        "background sync should have replicated the row to OLAP"
    );
    engine.stop_sync().await;
    assert!(!engine.is_sync_running());
    let status = engine.sync_status().await.unwrap();
    assert!(!status.running);
}
// Eight tasks concurrently insert into the same table while also issuing
// reads; after joining them all, one sync must replicate all eight rows.
#[tokio::test]
async fn concurrent_reads_and_writes() {
    let tmp = TempDir::new().unwrap();
    // Engine is shared across spawned tasks, hence the Arc.
    let engine = Arc::new($make_engine(&tmp).await);
    engine
        .execute(
            "CREATE TABLE items (id INTEGER PRIMARY KEY, val INTEGER)",
            &[],
        )
        .await
        .unwrap();
    engine
        .register_table(TableSchema::new(
            "items",
            Arc::new(arrow::datatypes::Schema::new(vec![
                arrow::datatypes::Field::new("id", DataType::Int64, false),
                arrow::datatypes::Field::new("val", DataType::Int64, true),
            ])),
            vec!["id".to_string()],
        ))
        .await
        .unwrap();
    // Each task performs one write followed by one read.
    let handles: Vec<_> = (1i64..=8)
        .map(|i| {
            let eng = Arc::clone(&engine);
            tokio::spawn(async move {
                eng.execute(
                    &format!("INSERT INTO items VALUES ({i}, {i})"),
                    &[],
                )
                .await
                .unwrap();
                eng.oltp()
                    .unwrap()
                    .query("SELECT COUNT(*) FROM items", &[])
                    .await
                    .unwrap();
            })
        })
        .collect();
    for h in handles {
        h.await.unwrap();
    }
    engine.sync_now().await.unwrap();
    let count = olap_count(&engine, "items").await;
    assert_eq!(count, 8, "all 8 rows should have replicated to OLAP");
}
// Edge case: initial_sync on an empty table is a no-op that reports 0 rows.
#[tokio::test]
async fn initial_sync_empty_table() {
    let tmp = TempDir::new().unwrap();
    let engine = $make_engine(&tmp).await;
    engine
        .execute(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
            &[],
        )
        .await
        .unwrap();
    engine.register_table(users_schema()).await.unwrap();
    let rows = engine.initial_sync("users").await.unwrap();
    assert_eq!(rows, 0);
    let count = olap_count(&engine, "users").await;
    assert_eq!(count, 0);
}
// initial_sync_all must bulk-copy every registered table (users: 2 rows,
// orders: 3 rows) and report the combined row count.
#[tokio::test]
async fn initial_sync_all() {
    let tmp = TempDir::new().unwrap();
    let engine = $make_engine(&tmp).await;
    engine
        .execute(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
            &[],
        )
        .await
        .unwrap();
    engine
        .execute("INSERT INTO users VALUES (1, 'Alice', 30)", &[])
        .await
        .unwrap();
    engine
        .execute("INSERT INTO users VALUES (2, 'Bob', 25)", &[])
        .await
        .unwrap();
    engine
        .execute(
            "CREATE TABLE orders (id INTEGER PRIMARY KEY, user_id INTEGER, total INTEGER)",
            &[],
        )
        .await
        .unwrap();
    engine
        .execute("INSERT INTO orders VALUES (1, 1, 100)", &[])
        .await
        .unwrap();
    engine
        .execute("INSERT INTO orders VALUES (2, 1, 200)", &[])
        .await
        .unwrap();
    engine
        .execute("INSERT INTO orders VALUES (3, 2, 150)", &[])
        .await
        .unwrap();
    // Registration happens after the inserts, so CDC saw none of them.
    engine.register_table(users_schema()).await.unwrap();
    engine.register_table(orders_schema()).await.unwrap();
    let total = engine.initial_sync_all().await.unwrap();
    assert_eq!(total, 5);
    let users_count = olap_count(&engine, "users").await;
    assert_eq!(users_count, 2);
    let orders_count = olap_count(&engine, "orders").await;
    assert_eq!(orders_count, 3);
}
// DDL while the background sync loop is running: add_column must coordinate
// with in-flight syncs so no rows are lost and post-DDL inserts replicate.
// NOTE(review): relies on fixed sleeps, so it is timing-sensitive.
#[tokio::test]
async fn ddl_lock_add_column_during_sync() {
    let tmp = TempDir::new().unwrap();
    let mut engine = $make_engine(&tmp).await;
    engine
        .execute(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)",
            &[],
        )
        .await
        .unwrap();
    engine.register_table(users_two_col_schema()).await.unwrap();
    for i in 1..=5 {
        engine
            .execute(
                &format!("INSERT INTO users VALUES ({}, 'User{}')", i, i),
                &[],
            )
            .await
            .unwrap();
    }
    engine.start_sync(Duration::from_millis(50));
    // Let a few sync cycles run before issuing DDL.
    tokio::time::sleep(Duration::from_millis(150)).await;
    engine
        .execute("ALTER TABLE users ADD COLUMN age INTEGER", &[])
        .await
        .unwrap();
    engine
        .add_column("users", "age", DataType::Int64)
        .await
        .unwrap();
    engine
        .execute("INSERT INTO users VALUES (6, 'User6', 25)", &[])
        .await
        .unwrap();
    // Allow the loop to pick up the post-DDL insert, then shut it down.
    tokio::time::sleep(Duration::from_millis(200)).await;
    engine.stop_sync().await;
    let count = olap_count(&engine, "users").await;
    assert!(
        count >= 5,
        "OLAP should have at least 5 rows, got {}",
        count
    );
    let batches = engine
        .olap()
        .query("SELECT age FROM users WHERE id = 6")
        .await
        .unwrap();
    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(total_rows, 1, "post-add_column row should be in OLAP");
}
// DDL while the background sync loop is running: drop_column must not corrupt
// rows that were replicated before the drop.
// NOTE(review): relies on fixed sleeps, so it is timing-sensitive.
#[tokio::test]
async fn ddl_lock_drop_column_consistency() {
    let tmp = TempDir::new().unwrap();
    let mut engine = $make_engine(&tmp).await;
    engine
        .execute(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
            &[],
        )
        .await
        .unwrap();
    engine.register_table(users_schema()).await.unwrap();
    engine
        .execute("INSERT INTO users VALUES (1, 'Alice', 30)", &[])
        .await
        .unwrap();
    engine.start_sync(Duration::from_millis(50));
    tokio::time::sleep(Duration::from_millis(150)).await;
    engine.drop_column("users", "age").await.unwrap();
    let schema = engine.schema_registry().get("users").unwrap();
    assert_eq!(schema.arrow_schema.fields().len(), 2);
    engine
        .execute("INSERT INTO users VALUES (2, 'Bob')", &[])
        .await
        .unwrap();
    tokio::time::sleep(Duration::from_millis(200)).await;
    engine.stop_sync().await;
    // The pre-drop row must survive the schema change.
    let batches = engine
        .olap()
        .query("SELECT name FROM users WHERE id = 1")
        .await
        .unwrap();
    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(total_rows, 1, "pre-drop row should still be in OLAP");
}
}
};
}
/// Factory for the DataFusion-backed engine used by the shared suite:
/// file-backed OLTP store under `tmp`, in-memory OLAP, batch size 100,
/// pruning after sync, and no background sync interval.
async fn make_datafusion_engine(tmp: &TempDir) -> HtapEngine {
    let db_path = tmp.path().join("test.db");
    let oltp_path = db_path.to_str().unwrap().to_owned();
    let config = HtapConfig {
        oltp_path,
        olap_in_memory: true,
        olap_path: None,
        sync_batch_size: 100,
        prune_after_sync: true,
        sync_interval: None,
        ..Default::default()
    };
    HtapEngine::new(config)
        .await
        .expect("failed to create HtapEngine")
}
backend_tests!(datafusion_tests, make_datafusion_engine);
mod datafusion_extra_tests {
use super::*;
use rhei::OltpEngine;
// Module-local alias over the shared DataFusion engine factory.
async fn make_engine(tmp: &TempDir) -> HtapEngine {
    make_datafusion_engine(tmp).await
}
// A file-backed OLTP store should come up with SQLite WAL journaling enabled.
#[tokio::test]
async fn test_wal_mode_enabled() {
    let tmp = TempDir::new().unwrap();
    let engine = make_engine(&tmp).await;
    let batches = engine
        .oltp()
        .unwrap()
        .query("PRAGMA journal_mode", &[])
        .await
        .unwrap();
    // PRAGMA journal_mode returns a single-row text result.
    let mode = batches[0]
        .column(0)
        .as_any()
        .downcast_ref::<arrow::array::StringArray>()
        .unwrap()
        .value(0);
    assert_eq!(mode, "wal", "WAL mode should be active on a file-backed DB");
}
// After add_column, rows inserted with the new column must sync with the
// value populated — i.e. the CDC capture must cover the new column.
#[tokio::test]
async fn test_schema_evolution_cdc_triggers_rebuilt() {
    let tmp = TempDir::new().unwrap();
    let engine = make_engine(&tmp).await;
    engine
        .execute(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)",
            &[],
        )
        .await
        .unwrap();
    engine.register_table(users_two_col_schema()).await.unwrap();
    engine
        .execute("INSERT INTO users VALUES (1, 'Alice')", &[])
        .await
        .unwrap();
    engine.sync_now().await.unwrap();
    // Evolve the table with a REAL column on both OLTP and registry sides.
    engine
        .execute("ALTER TABLE users ADD COLUMN score REAL", &[])
        .await
        .unwrap();
    engine
        .add_column("users", "score", DataType::Float64)
        .await
        .unwrap();
    engine
        .execute("INSERT INTO users VALUES (2, 'Bob', 9.5)", &[])
        .await
        .unwrap();
    engine.sync_now().await.unwrap();
    let batches = engine
        .olap()
        .query("SELECT score FROM users WHERE id = 2")
        .await
        .unwrap();
    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(total_rows, 1);
    assert!(
        !batches[0].column(0).is_null(0),
        "score should not be NULL — rebuilt CDC trigger must have captured the value"
    );
}
}
/// Factory for the DuckDB-backed engine: same OLTP path, batch size, and
/// pruning settings as the DataFusion factory, plus a 4-connection read pool.
#[cfg(feature = "duckdb-backend")]
async fn make_duckdb_engine(tmp: &TempDir) -> HtapEngine {
    use rhei::OlapBackendType;
    let db_path = tmp.path().join("test.db");
    let oltp_path = db_path.to_str().unwrap().to_owned();
    let config = HtapConfig {
        oltp_path,
        olap_backend: OlapBackendType::DuckDb,
        olap_in_memory: true,
        olap_path: None,
        read_pool_size: 4,
        sync_batch_size: 100,
        prune_after_sync: true,
        sync_interval: None,
        ..Default::default()
    };
    HtapEngine::new(config)
        .await
        .expect("failed to create HtapEngine")
}
// Run the same shared backend suite against the DuckDB-backed engine.
#[cfg(feature = "duckdb-backend")]
backend_tests!(duckdb_tests, make_duckdb_engine);
#[cfg(feature = "duckdb-backend")]
mod duckdb_extra_tests {
use super::*;
use rhei::OltpEngine;
use serde_json::json;
// Module-local alias over the shared DuckDB engine factory.
async fn make_engine(tmp: &TempDir) -> HtapEngine {
    make_duckdb_engine(tmp).await
}
// Basic OLTP round trip on the DuckDB-backed engine: create, insert twice,
// read both rows back through the OLTP handle.
#[tokio::test]
async fn test_oltp_create_and_query() {
    let tmp = TempDir::new().unwrap();
    let engine = make_engine(&tmp).await;
    engine
        .execute(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
            &[],
        )
        .await
        .unwrap();
    engine
        .execute("INSERT INTO users VALUES (1, 'Alice', 30)", &[])
        .await
        .unwrap();
    engine
        .execute("INSERT INTO users VALUES (2, 'Bob', 25)", &[])
        .await
        .unwrap();
    let batches = engine
        .oltp()
        .unwrap()
        .query("SELECT * FROM users ORDER BY id", &[])
        .await
        .unwrap();
    assert_eq!(batches.len(), 1);
    assert_eq!(batches[0].num_rows(), 2);
}
// register_table must install CDC capture: an insert made after registration
// shows up as at least one event in the CDC log table.
#[tokio::test]
async fn test_register_table_sets_up_cdc() {
    let tmp = TempDir::new().unwrap();
    let engine = make_engine(&tmp).await;
    engine
        .execute(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
            &[],
        )
        .await
        .unwrap();
    engine.register_table(users_schema()).await.unwrap();
    engine
        .execute("INSERT INTO users VALUES (1, 'Alice', 30)", &[])
        .await
        .unwrap();
    let cdc_batches = engine
        .oltp()
        .unwrap()
        .query("SELECT * FROM _rhei_cdc_log", &[])
        .await
        .unwrap();
    assert_eq!(cdc_batches.len(), 1);
    assert!(
        cdc_batches[0].num_rows() >= 1,
        "CDC log should have at least 1 event"
    );
}
// execute() with positional `?N` placeholders bound from JSON values must
// insert a row readable through the OLTP handle.
#[tokio::test]
async fn test_parameterized_execute() {
    let tmp = TempDir::new().unwrap();
    let engine = make_engine(&tmp).await;
    engine
        .execute(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
            &[],
        )
        .await
        .unwrap();
    engine
        .execute(
            "INSERT INTO users VALUES (?1, ?2, ?3)",
            &[json!(1), json!("Alice"), json!(30)],
        )
        .await
        .unwrap();
    let batches = engine
        .oltp()
        .unwrap()
        .query("SELECT name FROM users WHERE id = 1", &[])
        .await
        .unwrap();
    assert_eq!(batches[0].num_rows(), 1);
}
}
#[cfg(feature = "datafusion-backend")]
mod temporal_tests {
use super::*;
use arrow::array::{Array, Int64Array, StringArray};
use rhei::SyncMode;
/// Factory for an engine running in temporal sync mode, file-backed at
/// `temporal.db` inside `tmp`; all other options take their defaults.
async fn make_temporal_engine(tmp: &TempDir) -> HtapEngine {
    let db_path = tmp.path().join("temporal.db");
    let config = HtapConfig {
        oltp_path: db_path.to_str().unwrap().to_owned(),
        sync_mode: SyncMode::Temporal,
        ..Default::default()
    };
    HtapEngine::new(config).await.unwrap()
}
// In temporal mode a synced insert carries the bookkeeping columns
// _rhei_valid_from / _rhei_valid_to / _rhei_operation, with operation 'I'
// and an open (NULL) valid_to for the current version.
#[tokio::test]
async fn test_temporal_mode_with_cdc() {
    let tmp = TempDir::new().unwrap();
    let engine = make_temporal_engine(&tmp).await;
    engine
        .execute(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
            &[],
        )
        .await
        .unwrap();
    engine.register_table(users_schema()).await.unwrap();
    engine
        .execute(
            "INSERT INTO users (id, name, age) VALUES (1, 'Alice', 30)",
            &[],
        )
        .await
        .unwrap();
    let result = engine.sync_now().await.unwrap();
    assert!(result.events_processed >= 1);
    assert!(result.rows_inserted >= 1);
    let batches = engine
        .olap()
        .query("SELECT * FROM users WHERE _rhei_operation = 'I' AND name = 'Alice'")
        .await
        .unwrap();
    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(total_rows, 1);
    // All three temporal bookkeeping columns must be present.
    let schema = batches[0].schema();
    assert!(schema.field_with_name("_rhei_valid_from").is_ok());
    assert!(schema.field_with_name("_rhei_valid_to").is_ok());
    assert!(schema.field_with_name("_rhei_operation").is_ok());
    let op_col = batches[0]
        .column(schema.index_of("_rhei_operation").unwrap())
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(op_col.value(0), "I");
    // NULL valid_to marks the row as the current version.
    let valid_to = batches[0]
        .column(schema.index_of("_rhei_valid_to").unwrap())
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();
    assert!(valid_to.is_null(0));
}
// An UPDATE in temporal mode must close the original version (non-NULL
// valid_to) and append a new current 'U' version, yielding two rows.
#[tokio::test]
async fn test_temporal_update_creates_history() {
    let tmp = TempDir::new().unwrap();
    let engine = make_temporal_engine(&tmp).await;
    engine
        .execute(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
            &[],
        )
        .await
        .unwrap();
    engine.register_table(users_schema()).await.unwrap();
    engine
        .execute(
            "INSERT INTO users (id, name, age) VALUES (1, 'Alice', 30)",
            &[],
        )
        .await
        .unwrap();
    engine.sync_now().await.unwrap();
    engine
        .execute("UPDATE users SET name = 'Bob', age = 31 WHERE id = 1", &[])
        .await
        .unwrap();
    let result = engine.sync_now().await.unwrap();
    assert_eq!(result.rows_updated, 1);
    // Ordered by valid_from: row 0 is the original, row 1 the update.
    let batches = engine
        .olap()
        .query("SELECT * FROM users ORDER BY _rhei_valid_from ASC")
        .await
        .unwrap();
    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(total_rows, 2, "should have 2 versions (original + updated)");
    let schema = batches[0].schema();
    let op_idx = schema.index_of("_rhei_operation").unwrap();
    let valid_to_idx = schema.index_of("_rhei_valid_to").unwrap();
    let ops = batches[0]
        .column(op_idx)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let valid_to = batches[0]
        .column(valid_to_idx)
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();
    assert_eq!(ops.value(0), "I");
    assert!(!valid_to.is_null(0), "original version should be closed");
    assert_eq!(ops.value(1), "U");
    assert!(valid_to.is_null(1), "updated version should be current");
}
}
#[cfg(all(feature = "datafusion-backend", feature = "rocksdb-cdc"))]
mod rocksdb_bridge_tests {
use super::*;
use rhei::OltpEngine;
/// Factory for an engine whose CDC events are bridged through RocksDB:
/// SQLite OLTP store plus a RocksDB CDC directory, both inside `tmp`, with
/// the usual in-memory OLAP / pruning / manual-sync settings.
async fn make_bridge_engine(tmp: &TempDir) -> HtapEngine {
    let sqlite_path = tmp.path().join("bridge_test.db");
    let rocks_path = tmp.path().join("bridge_rocksdb");
    let config = HtapConfig {
        oltp_path: sqlite_path.to_str().unwrap().to_owned(),
        rocksdb_cdc_path: Some(rocks_path.to_str().unwrap().to_owned()),
        olap_in_memory: true,
        olap_path: None,
        sync_batch_size: 100,
        prune_after_sync: true,
        sync_interval: None,
        ..Default::default()
    };
    HtapEngine::new(config)
        .await
        .expect("failed to create HtapEngine with RocksDB bridge")
}
// Full pipeline through the RocksDB bridge: three inserts sync with correct
// counters and land in OLAP, leaving zero lag.
#[tokio::test]
async fn test_bridge_full_pipeline() {
    let tmp = TempDir::new().unwrap();
    let engine = make_bridge_engine(&tmp).await;
    engine
        .execute(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
            &[],
        )
        .await
        .unwrap();
    engine.register_table(users_schema()).await.unwrap();
    engine
        .execute("INSERT INTO users VALUES (1, 'Alice', 30)", &[])
        .await
        .unwrap();
    engine
        .execute("INSERT INTO users VALUES (2, 'Bob', 25)", &[])
        .await
        .unwrap();
    engine
        .execute("INSERT INTO users VALUES (3, 'Charlie', 35)", &[])
        .await
        .unwrap();
    let sync_result = engine.sync_now().await.unwrap();
    assert_eq!(
        sync_result.events_processed, 3,
        "bridge should process 3 events"
    );
    assert_eq!(sync_result.rows_inserted, 3);
    assert_eq!(sync_result.rows_updated, 0);
    assert_eq!(sync_result.rows_deleted, 0);
    let count = olap_count(&engine, "users").await;
    assert_eq!(count, 3, "OLAP should have 3 rows after bridge sync");
    let status = engine.sync_status().await.unwrap();
    assert_eq!(status.lag, 0, "no lag after sync");
}
// After a bridge sync, the SQLite-side CDC log must be emptied (events are
// moved into RocksDB rather than left in the OLTP log table).
#[tokio::test]
async fn test_bridge_prunes_sqlite_log() {
    let tmp = TempDir::new().unwrap();
    let engine = make_bridge_engine(&tmp).await;
    engine
        .execute(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
            &[],
        )
        .await
        .unwrap();
    engine.register_table(users_schema()).await.unwrap();
    engine
        .execute("INSERT INTO users VALUES (1, 'Alice', 30)", &[])
        .await
        .unwrap();
    engine
        .execute("INSERT INTO users VALUES (2, 'Bob', 25)", &[])
        .await
        .unwrap();
    let cdc_before = engine
        .oltp()
        .unwrap()
        .query("SELECT COUNT(*) FROM _rhei_cdc_log", &[])
        .await
        .unwrap();
    let before_count = cdc_before[0]
        .column(0)
        .as_any()
        .downcast_ref::<arrow::array::Int64Array>()
        .unwrap()
        .value(0);
    assert_eq!(
        before_count, 2,
        "SQLite CDC log should have 2 events before sync"
    );
    let result = engine.sync_now().await.unwrap();
    assert_eq!(result.events_processed, 2);
    let cdc_after = engine
        .oltp()
        .unwrap()
        .query("SELECT COUNT(*) FROM _rhei_cdc_log", &[])
        .await
        .unwrap();
    let after_count = cdc_after[0]
        .column(0)
        .as_any()
        .downcast_ref::<arrow::array::Int64Array>()
        .unwrap()
        .value(0);
    assert_eq!(
        after_count, 0,
        "SQLite CDC log should be empty after bridge sync (events moved to RocksDB)"
    );
}
// UPDATE and DELETE events must flow through the RocksDB bridge with the
// correct counters, leaving OLAP empty after the delete syncs.
#[tokio::test]
async fn test_bridge_update_delete() {
    let tmp = TempDir::new().unwrap();
    let engine = make_bridge_engine(&tmp).await;
    engine
        .execute(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
            &[],
        )
        .await
        .unwrap();
    engine.register_table(users_schema()).await.unwrap();
    engine
        .execute("INSERT INTO users VALUES (1, 'Alice', 30)", &[])
        .await
        .unwrap();
    engine.sync_now().await.unwrap();
    engine
        .execute("UPDATE users SET age = 31 WHERE id = 1", &[])
        .await
        .unwrap();
    let sync_result = engine.sync_now().await.unwrap();
    assert_eq!(sync_result.rows_updated, 1, "bridge should sync UPDATE");
    engine
        .execute("DELETE FROM users WHERE id = 1", &[])
        .await
        .unwrap();
    let sync_result = engine.sync_now().await.unwrap();
    assert_eq!(sync_result.rows_deleted, 1, "bridge should sync DELETE");
    let count = olap_count(&engine, "users").await;
    assert_eq!(count, 0, "OLAP should be empty after delete");
}
// A second sync after everything has been applied must be a no-op (0 events)
// and must not change the OLAP row count.
#[tokio::test]
async fn test_bridge_rocksdb_durability() {
    let tmp = TempDir::new().unwrap();
    let engine = make_bridge_engine(&tmp).await;
    engine
        .execute(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
            &[],
        )
        .await
        .unwrap();
    engine.register_table(users_schema()).await.unwrap();
    engine
        .execute("INSERT INTO users VALUES (1, 'Alice', 30)", &[])
        .await
        .unwrap();
    engine
        .execute("INSERT INTO users VALUES (2, 'Bob', 25)", &[])
        .await
        .unwrap();
    let result = engine.sync_now().await.unwrap();
    assert_eq!(result.events_processed, 2);
    let result2 = engine.sync_now().await.unwrap();
    assert_eq!(
        result2.events_processed, 0,
        "second sync cycle should process 0 events (already applied and pruned)"
    );
    let count = olap_count(&engine, "users").await;
    assert_eq!(
        count, 2,
        "OLAP should still have 2 rows after idempotent second sync"
    );
}
// Repeated syncs must never re-bridge events: second and third syncs process
// zero events and the OLAP count stays exactly 3.
#[tokio::test]
async fn test_bridge_idempotent_no_duplicates() {
    let tmp = TempDir::new().unwrap();
    let engine = make_bridge_engine(&tmp).await;
    engine
        .execute(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
            &[],
        )
        .await
        .unwrap();
    engine.register_table(users_schema()).await.unwrap();
    engine
        .execute("INSERT INTO users VALUES (1, 'Alice', 30)", &[])
        .await
        .unwrap();
    engine
        .execute("INSERT INTO users VALUES (2, 'Bob', 25)", &[])
        .await
        .unwrap();
    engine
        .execute("INSERT INTO users VALUES (3, 'Charlie', 35)", &[])
        .await
        .unwrap();
    let result1 = engine.sync_now().await.unwrap();
    assert_eq!(
        result1.events_processed, 3,
        "first sync should process 3 events"
    );
    let result2 = engine.sync_now().await.unwrap();
    assert_eq!(
        result2.events_processed, 0,
        "second sync must not re-process already-bridged events"
    );
    let result3 = engine.sync_now().await.unwrap();
    assert_eq!(
        result3.events_processed, 0,
        "third sync must also be a no-op"
    );
    let count = olap_count(&engine, "users").await;
    assert_eq!(
        count, 3,
        "OLAP must have exactly 3 rows; duplicate bridging would inflate this"
    );
}
// sync_status().lag must reflect un-bridged events still in the SQLite log:
// nonzero before a sync, zero after.
#[tokio::test]
async fn test_bridge_lag_reflects_sqlite_backlog() {
    let tmp = TempDir::new().unwrap();
    let engine = make_bridge_engine(&tmp).await;
    engine
        .execute(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
            &[],
        )
        .await
        .unwrap();
    engine.register_table(users_schema()).await.unwrap();
    engine
        .execute("INSERT INTO users VALUES (1, 'Alice', 30)", &[])
        .await
        .unwrap();
    engine
        .execute("INSERT INTO users VALUES (2, 'Bob', 25)", &[])
        .await
        .unwrap();
    let status_before = engine.sync_status().await.unwrap();
    assert!(
        status_before.lag > 0,
        "lag should be nonzero before sync (SQLite backlog not yet bridged); \
got lag = {}",
        status_before.lag
    );
    engine.sync_now().await.unwrap();
    let status_after = engine.sync_status().await.unwrap();
    assert_eq!(
        status_after.lag, 0,
        "lag should be 0 after sync; got {}",
        status_after.lag
    );
}
}
#[cfg(feature = "datafusion-backend")]
mod schema_persistence_tests {
use super::*;
/// Builds an engine whose schema registry is persisted as JSON at
/// `registry_path`, so a second engine on the same paths can reload it.
async fn make_engine_with_registry(oltp_path: &str, registry_path: &str) -> HtapEngine {
    let config = HtapConfig {
        oltp_path: oltp_path.to_owned(),
        schema_registry_path: Some(registry_path.to_owned()),
        ..Default::default()
    };
    let engine = HtapEngine::new(config).await;
    engine.expect("failed to create engine")
}
// Registering a table must write the registry JSON file, and a fresh engine
// pointed at the same paths must reload the full table schema from it.
// (The `&reg_path` arguments below restore text that was mangled by an
// HTML-entity decode of `&reg` in the checked-in copy.)
#[tokio::test]
async fn schema_persists_across_restarts() {
    let tmp = TempDir::new().unwrap();
    let db_path = tmp.path().join("persist.db").to_str().unwrap().to_string();
    let reg_path = tmp
        .path()
        .join("registry.json")
        .to_str()
        .unwrap()
        .to_string();
    // First engine instance: create + register, then drop it at scope end.
    {
        let engine = make_engine_with_registry(&db_path, &reg_path).await;
        engine
            .execute(
                "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
                &[],
            )
            .await
            .unwrap();
        engine.register_table(users_schema()).await.unwrap();
        assert!(
            std::path::Path::new(&reg_path).exists(),
            "registry JSON file should have been created after register_table"
        );
    }
    // Second instance simulates a restart and must see the persisted schema.
    {
        let engine = make_engine_with_registry(&db_path, &reg_path).await;
        let names = engine.schema_registry().table_names();
        assert!(
            names.contains(&"users".to_string()),
            "schema registry should contain 'users' after restart, got: {names:?}"
        );
        let schema = engine.schema_registry().get("users").unwrap();
        assert_eq!(schema.name, "users");
        assert_eq!(schema.primary_key, vec!["id".to_string()]);
        assert_eq!(schema.arrow_schema.fields().len(), 3);
    }
}
// Round-trips a schema with Int64 / Float64 / Utf8 / Boolean / Binary fields
// through the persisted registry and checks each data type survives a
// restart intact. (The `&reg_path` arguments restore text mangled by an
// HTML-entity decode of `&reg` in the checked-in copy.)
#[tokio::test]
async fn schema_persist_round_trips_types() {
    let tmp = TempDir::new().unwrap();
    let db_path = tmp.path().join("types.db").to_str().unwrap().to_string();
    let reg_path = tmp
        .path()
        .join("types_registry.json")
        .to_str()
        .unwrap()
        .to_string();
    let rich_schema = TableSchema::new(
        "events",
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("score", DataType::Float64, true),
            Field::new("label", DataType::Utf8, true),
            Field::new("active", DataType::Boolean, true),
            Field::new("data", DataType::Binary, true),
        ])),
        vec!["id".to_string()],
    );
    {
        let engine = make_engine_with_registry(&db_path, &reg_path).await;
        engine
            .execute(
                "CREATE TABLE events (id INTEGER PRIMARY KEY, score REAL, label TEXT, active INTEGER, data BLOB)",
                &[],
            )
            .await
            .unwrap();
        engine.register_table(rich_schema).await.unwrap();
    }
    {
        let engine = make_engine_with_registry(&db_path, &reg_path).await;
        let schema = engine.schema_registry().get("events").unwrap();
        assert_eq!(schema.arrow_schema.fields().len(), 5);
        // Helper to fetch a field by name from the reloaded schema.
        let f = |name: &str| schema.arrow_schema.field_with_name(name).unwrap().clone();
        assert_eq!(*f("id").data_type(), DataType::Int64);
        assert_eq!(*f("score").data_type(), DataType::Float64);
        assert_eq!(*f("label").data_type(), DataType::Utf8);
        assert_eq!(*f("active").data_type(), DataType::Boolean);
        assert_eq!(*f("data").data_type(), DataType::Binary);
    }
}
}
#[cfg(all(feature = "datafusion-backend", feature = "sidecar"))]
mod sidecar_tests {
    use super::*;
    use arrow::array::StringArray;
    use rhei::{
        DeleteDetection, SidecarConfig, SidecarSource, SyncMode, TimestampCdcConfig,
        TimestampTableConfig,
    };

    /// Timestamp-CDC configuration for the external `users` table.
    fn external_users_table_config() -> TimestampTableConfig {
        TimestampTableConfig {
            table_name: "users".to_string(),
            created_at_column: "created_at".to_string(),
            updated_at_column: "updated_at".to_string(),
            primary_key: vec!["id".to_string()],
            columns: vec![],
        }
    }

    /// Arrow schema matching the external `users` table, including the
    /// CDC timestamp columns.
    fn external_users_schema() -> TableSchema {
        TableSchema::new(
            "users",
            Arc::new(Schema::new(vec![
                Field::new("id", DataType::Int64, false),
                Field::new("name", DataType::Utf8, true),
                Field::new("created_at", DataType::Int64, false),
                Field::new("updated_at", DataType::Int64, false),
            ])),
            vec!["id".to_string()],
        )
    }

    /// Creates the external SQLite database with a timestamped `users` table.
    fn setup_external_db(path: &str) {
        let conn = rusqlite::Connection::open(path).unwrap();
        conn.execute_batch(
            "CREATE TABLE users (
                id INTEGER PRIMARY KEY,
                name TEXT,
                created_at INTEGER NOT NULL,
                updated_at INTEGER NOT NULL
            )",
        )
        .unwrap();
    }

    /// Builds the engine config shared by every test in this module: an
    /// external SQLite sidecar source with timestamp CDC on `users`, local
    /// OLTP disabled, no watermark persistence. Only `sync_mode` varies
    /// between tests, so it is the sole parameter.
    fn sidecar_config(tmp: &TempDir, ext_path_str: &str, sync_mode: SyncMode) -> HtapConfig {
        HtapConfig {
            oltp_path: tmp.path().join("local.db").to_str().unwrap().to_string(),
            sync_mode,
            sidecar: Some(SidecarConfig {
                source: SidecarSource::Sqlite(ext_path_str.to_string()),
                timestamp_config: TimestampCdcConfig {
                    tables: vec![external_users_table_config()],
                    poll_batch_size: 100,
                    delete_detection: DeleteDetection::Disabled,
                },
                enable_local_oltp: false,
                watermark_path: None,
            }),
            ..Default::default()
        }
    }

    #[tokio::test]
    async fn test_sidecar_temporal_full_pipeline() {
        let tmp = TempDir::new().unwrap();
        let ext_path = tmp.path().join("external.db");
        let ext_path_str = ext_path.to_str().unwrap();
        setup_external_db(ext_path_str);
        // Seed two rows directly in the external database, bypassing the engine.
        {
            let conn = rusqlite::Connection::open(ext_path_str).unwrap();
            conn.execute(
                "INSERT INTO users (id, name, created_at, updated_at) VALUES (1, 'Alice', 1000, 1000)",
                [],
            )
            .unwrap();
            conn.execute(
                "INSERT INTO users (id, name, created_at, updated_at) VALUES (2, 'Bob', 1001, 1001)",
                [],
            )
            .unwrap();
        }
        let config = sidecar_config(&tmp, ext_path_str, SyncMode::Temporal);
        let engine = HtapEngine::new(config).await.unwrap();
        engine
            .register_table(external_users_schema())
            .await
            .unwrap();
        let result = engine.sync_now().await.unwrap();
        assert_eq!(result.events_processed, 2);
        assert_eq!(result.rows_inserted, 2);
        let batches = engine
            .olap()
            .query("SELECT * FROM users ORDER BY id")
            .await
            .unwrap();
        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 2);
        // Temporal mode must add the system-time columns to the OLAP schema.
        let schema = batches[0].schema();
        assert!(schema.field_with_name("_rhei_valid_from").is_ok());
        assert!(schema.field_with_name("_rhei_valid_to").is_ok());
        assert!(schema.field_with_name("_rhei_operation").is_ok());
    }

    #[tokio::test]
    async fn test_sidecar_oltp_disabled() {
        let tmp = TempDir::new().unwrap();
        let ext_path = tmp.path().join("external.db");
        let ext_path_str = ext_path.to_str().unwrap();
        setup_external_db(ext_path_str);
        let config = sidecar_config(&tmp, ext_path_str, SyncMode::Destructive);
        let engine = HtapEngine::new(config).await.unwrap();
        // With `enable_local_oltp: false` there is no local OLTP engine, and
        // any write attempt through the engine must fail rather than silently
        // diverge from the external source.
        assert!(engine.oltp().is_none());
        let err = engine.execute("INSERT INTO foo VALUES (1)", &[]).await;
        assert!(err.is_err());
    }

    #[tokio::test]
    async fn test_sidecar_point_in_time_query() {
        let tmp = TempDir::new().unwrap();
        let ext_path = tmp.path().join("external.db");
        let ext_path_str = ext_path.to_str().unwrap();
        setup_external_db(ext_path_str);
        {
            let conn = rusqlite::Connection::open(ext_path_str).unwrap();
            conn.execute(
                "INSERT INTO users (id, name, created_at, updated_at) VALUES (1, 'Alice', 1000, 1000)",
                [],
            )
            .unwrap();
        }
        let config = sidecar_config(&tmp, ext_path_str, SyncMode::Temporal);
        let engine = HtapEngine::new(config).await.unwrap();
        engine
            .register_table(external_users_schema())
            .await
            .unwrap();
        engine.sync_now().await.unwrap();
        // Update the row externally so the temporal store records two versions.
        {
            let conn = rusqlite::Connection::open(ext_path_str).unwrap();
            conn.execute(
                "UPDATE users SET name = 'Bob', updated_at = 2000 WHERE id = 1",
                [],
            )
            .unwrap();
        }
        let result = engine.sync_now().await.unwrap();
        assert_eq!(result.events_processed, 1);
        assert_eq!(result.rows_updated, 1);
        // As-of time 1500: between the insert (1000) and the update (2000),
        // so the original 'Alice' version must be visible.
        let batches = engine
            .olap()
            .query(
                "SELECT name FROM users WHERE id = 1 \
                 AND _rhei_valid_from <= 1500 \
                 AND (_rhei_valid_to IS NULL OR _rhei_valid_to > 1500)",
            )
            .await
            .unwrap();
        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 1);
        let name = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap()
            .value(0);
        assert_eq!(name, "Alice", "at time 1500, user should still be Alice");
        // As-of time 2500: after the update, so the 'Bob' version wins.
        let batches = engine
            .olap()
            .query(
                "SELECT name FROM users WHERE id = 1 \
                 AND _rhei_valid_from <= 2500 \
                 AND (_rhei_valid_to IS NULL OR _rhei_valid_to > 2500)",
            )
            .await
            .unwrap();
        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 1);
        let name = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap()
            .value(0);
        assert_eq!(name, "Bob", "at time 2500, user should be Bob");
    }
}
mod register_table_idempotency {
use super::*;
async fn make_engine(tmp: &TempDir) -> HtapEngine {
make_datafusion_engine(tmp).await
}
#[tokio::test]
async fn register_table_idempotent_matching_schema() {
let tmp = TempDir::new().unwrap();
let engine = make_engine(&tmp).await;
engine
.execute(
"CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
&[],
)
.await
.unwrap();
engine.register_table(users_schema()).await.unwrap();
engine
.register_table(users_schema())
.await
.expect("register_table with matching schema must be idempotent (Ok)");
}
#[tokio::test]
async fn register_table_errors_on_conflicting_schema() {
let tmp = TempDir::new().unwrap();
let engine = make_engine(&tmp).await;
engine
.execute(
"CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
&[],
)
.await
.unwrap();
engine.register_table(users_schema()).await.unwrap();
let conflicting = TableSchema::new(
"users",
Arc::new(arrow::datatypes::Schema::new(vec![
arrow::datatypes::Field::new("id", DataType::Int64, false),
arrow::datatypes::Field::new("name", DataType::Utf8, true),
arrow::datatypes::Field::new("age", DataType::Float64, true), ])),
vec!["id".to_string()],
);
let result = engine.register_table(conflicting).await;
assert!(
result.is_err(),
"register_table with conflicting schema must return an error"
);
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("different schema"),
"error message should mention 'different schema', got: {err_msg}"
);
}
}