use std::sync::Arc;
use std::time::Duration;
use arrow::array::{Array, Float64Array, Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use tempfile::TempDir;
use rhei::{HtapConfig, HtapEngine, OlapEngine, OltpEngine, TableSchema};
/// Builds the `TableSchema` for the `users` table used throughout these tests.
///
/// Columns: a non-nullable `id` (Int64) plus nullable `name` (Utf8) and
/// `age` (Int64). The final argument lists `id` as the key column
/// (presumably the primary key, matching the `CREATE TABLE` statements in
/// the tests — confirm against `TableSchema::new`).
fn users_schema() -> TableSchema {
    let columns = vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, true),
        Field::new("age", DataType::Int64, true),
    ];
    let key_columns = vec![String::from("id")];
    let arrow_schema = Arc::new(Schema::new(columns));
    TableSchema::new("users", arrow_schema, key_columns)
}
/// Builds the `TableSchema` for the `products` table used by the
/// multi-table test.
///
/// Columns: a non-nullable `id` (Int64) plus nullable `name` (Utf8) and
/// `price` (Float64). The final argument lists `id` as the key column
/// (presumably the primary key — confirm against `TableSchema::new`).
fn products_schema() -> TableSchema {
    let columns = vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, true),
        Field::new("price", DataType::Float64, true),
    ];
    let key_columns = vec![String::from("id")];
    let arrow_schema = Arc::new(Schema::new(columns));
    TableSchema::new("products", arrow_schema, key_columns)
}
/// Runs `sql` against the OLAP engine and returns the string stored at
/// column `col`, row `row` of the first record batch.
///
/// # Panics
///
/// Panics (with the offending SQL in the message) if the query fails, if it
/// yields no record batches, or if the column is not a `StringArray`.
/// Out-of-range `col` / `row` values panic inside Arrow.
async fn olap_string(engine: &HtapEngine, sql: &str, col: usize, row: usize) -> String {
    let batches = engine
        .olap()
        .query(sql)
        .await
        .unwrap_or_else(|err| panic!("OLAP query failed: {sql}: {err:?}"));
    // `first()` instead of `[0]` so an empty result names the query rather
    // than producing a bare index-out-of-bounds panic.
    let batch = batches
        .first()
        .unwrap_or_else(|| panic!("OLAP query returned no record batches: {sql}"));
    batch
        .column(col)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap_or_else(|| panic!("column {col} is not a StringArray for query: {sql}"))
        .value(row)
        .to_string()
}
/// Runs `sql` against the OLAP engine and returns the `i64` stored at
/// column `col`, row `row` of the first record batch.
///
/// # Panics
///
/// Panics (with the offending SQL in the message) if the query fails, if it
/// yields no record batches, or if the column is not an `Int64Array`.
/// Out-of-range `col` / `row` values panic inside Arrow.
async fn olap_i64(engine: &HtapEngine, sql: &str, col: usize, row: usize) -> i64 {
    let batches = engine
        .olap()
        .query(sql)
        .await
        .unwrap_or_else(|err| panic!("OLAP query failed: {sql}: {err:?}"));
    // `first()` instead of `[0]` so an empty result names the query rather
    // than producing a bare index-out-of-bounds panic.
    let batch = batches
        .first()
        .unwrap_or_else(|| panic!("OLAP query returned no record batches: {sql}"));
    batch
        .column(col)
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap_or_else(|| panic!("column {col} is not an Int64Array for query: {sql}"))
        .value(row)
}
/// Runs `sql` against the OLAP engine and returns the `f64` stored at
/// column `col`, row `row` of the first record batch.
///
/// # Panics
///
/// Panics (with the offending SQL in the message) if the query fails, if it
/// yields no record batches, or if the column is not a `Float64Array`.
/// Out-of-range `col` / `row` values panic inside Arrow.
async fn olap_f64(engine: &HtapEngine, sql: &str, col: usize, row: usize) -> f64 {
    let batches = engine
        .olap()
        .query(sql)
        .await
        .unwrap_or_else(|err| panic!("OLAP query failed: {sql}: {err:?}"));
    // `first()` instead of `[0]` so an empty result names the query rather
    // than producing a bare index-out-of-bounds panic.
    let batch = batches
        .first()
        .unwrap_or_else(|| panic!("OLAP query returned no record batches: {sql}"));
    batch
        .column(col)
        .as_any()
        .downcast_ref::<Float64Array>()
        .unwrap_or_else(|| panic!("column {col} is not a Float64Array for query: {sql}"))
        .value(row)
}
async fn olap_count(engine: &HtapEngine, table: &str) -> i64 {
let batches = engine
.olap()
.query(&format!("SELECT COUNT(*) FROM {table}"))
.await
.unwrap();
batches[0]
.column(0)
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.value(0)
}
/// Returns the number of rows currently in the `_rhei_cdc_log` table,
/// queried through the OLTP engine.
///
/// # Panics
///
/// Panics if the OLTP engine is unavailable, the query fails, or the count
/// column is not an `Int64Array`.
async fn cdc_log_count(engine: &HtapEngine) -> i64 {
    let oltp = engine.oltp().unwrap();
    let result_batches = oltp
        .query("SELECT COUNT(*) FROM _rhei_cdc_log", &[])
        .await
        .unwrap();
    let count_column = result_batches[0]
        .column(0)
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();
    count_column.value(0)
}
/// Reports whether `column` can be selected from `table` on the OLAP engine.
///
/// Issues a `LIMIT 0` probe query and returns `true` when it succeeds.
/// Any query error is treated as "column absent", so a missing table also
/// yields `false`. Both names are interpolated into the SQL verbatim and
/// must be trusted identifiers (test code only).
async fn olap_has_column(engine: &HtapEngine, table: &str, column: &str) -> bool {
    let probe = format!("SELECT \"{column}\" FROM {table} LIMIT 0");
    engine.olap().query(&probe).await.is_ok()
}
/// Generates a module of end-to-end HTAP tests bound to one engine factory.
///
/// * `$mod_name` — name of the generated test module.
/// * `$make_engine` — async function taking `&TempDir` and returning an
///   `HtapEngine`; it is called at the start of every test so each test
///   gets its own engine backed by its own temporary directory.
///
/// Every test creates its tables through `engine.execute`, registers the
/// matching schema, and then checks what the OLAP side sees via the
/// file-level `olap_*` helpers.
macro_rules! htap_e2e_tests {
    ($mod_name:ident, $make_engine:ident) => {
        mod $mod_name {
            use super::*;

            /// Polls the OLAP row count of `table` until it equals `expected`.
            ///
            /// Used instead of a fixed sleep so the background-sync test does
            /// not depend on scheduler timing. Panics if the count has not
            /// been reached after five seconds.
            async fn wait_for_olap_count(engine: &HtapEngine, table: &str, expected: i64) {
                let poll = async {
                    while olap_count(engine, table).await != expected {
                        tokio::time::sleep(Duration::from_millis(20)).await;
                    }
                };
                tokio::time::timeout(Duration::from_secs(5), poll)
                    .await
                    .unwrap_or_else(|_| {
                        panic!(
                            "background sync did not bring `{table}` to {expected} rows within 5s"
                        )
                    });
            }

            /// Insert, update and delete rows, syncing after each phase and
            /// checking both the sync counters and the OLAP contents.
            #[tokio::test]
            async fn e2e_crud_lifecycle() {
                let tmp = TempDir::new().unwrap();
                let engine = $make_engine(&tmp).await;

                engine
                    .execute(
                        "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
                        &[],
                    )
                    .await
                    .unwrap();
                engine.register_table(users_schema()).await.unwrap();

                // Phase 1: three inserts, each expected to produce one CDC event.
                engine
                    .execute("INSERT INTO users VALUES (1, 'Alice', 30)", &[])
                    .await
                    .unwrap();
                engine
                    .execute("INSERT INTO users VALUES (2, 'Bob', 25)", &[])
                    .await
                    .unwrap();
                engine
                    .execute("INSERT INTO users VALUES (3, 'Charlie', 35)", &[])
                    .await
                    .unwrap();

                let cdc_count = cdc_log_count(&engine).await;
                assert_eq!(cdc_count, 3, "CDC log should have 3 INSERT events");

                let result = engine.sync_now().await.unwrap();
                assert_eq!(result.events_processed, 3);
                assert_eq!(result.rows_inserted, 3);
                assert_eq!(result.rows_updated, 0);
                assert_eq!(result.rows_deleted, 0);

                assert_eq!(olap_count(&engine, "users").await, 3);
                assert_eq!(
                    olap_string(&engine, "SELECT name FROM users WHERE id = 1", 0, 0).await,
                    "Alice"
                );
                assert_eq!(
                    olap_i64(&engine, "SELECT age FROM users WHERE id = 2", 0, 0).await,
                    25
                );
                assert_eq!(
                    olap_string(&engine, "SELECT name FROM users WHERE id = 3", 0, 0).await,
                    "Charlie"
                );

                let status = engine.sync_status().await.unwrap();
                assert_eq!(status.lag, 0, "no pending CDC events after sync");

                // Phase 2: update one row; the other rows must be untouched.
                engine
                    .execute(
                        "UPDATE users SET name = 'Alice Updated', age = 31 WHERE id = 1",
                        &[],
                    )
                    .await
                    .unwrap();
                let result = engine.sync_now().await.unwrap();
                assert_eq!(result.rows_updated, 1);
                assert_eq!(
                    olap_string(&engine, "SELECT name FROM users WHERE id = 1", 0, 0).await,
                    "Alice Updated"
                );
                assert_eq!(
                    olap_i64(&engine, "SELECT age FROM users WHERE id = 1", 0, 0).await,
                    31
                );
                assert_eq!(
                    olap_string(&engine, "SELECT name FROM users WHERE id = 2", 0, 0).await,
                    "Bob"
                );

                // Phase 3: delete one row and check exactly ids 1 and 3 remain.
                engine
                    .execute("DELETE FROM users WHERE id = 2", &[])
                    .await
                    .unwrap();
                let result = engine.sync_now().await.unwrap();
                assert_eq!(result.rows_deleted, 1);
                assert_eq!(olap_count(&engine, "users").await, 2);

                let batches = engine
                    .olap()
                    .query("SELECT id FROM users ORDER BY id")
                    .await
                    .unwrap();
                let ids = batches[0]
                    .column(0)
                    .as_any()
                    .downcast_ref::<Int64Array>()
                    .unwrap();
                assert_eq!(ids.value(0), 1, "Alice should remain");
                assert_eq!(ids.value(1), 3, "Charlie should remain");
            }

            /// Interleaves writes to two registered tables and checks a single
            /// sync delivers all four events to the right OLAP tables.
            #[tokio::test]
            async fn e2e_multi_table_sync() {
                let tmp = TempDir::new().unwrap();
                let engine = $make_engine(&tmp).await;

                engine
                    .execute(
                        "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
                        &[],
                    )
                    .await
                    .unwrap();
                engine.register_table(users_schema()).await.unwrap();
                engine
                    .execute(
                        "CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT, price REAL)",
                        &[],
                    )
                    .await
                    .unwrap();
                engine.register_table(products_schema()).await.unwrap();

                // Alternate between the two tables so their events interleave.
                engine
                    .execute("INSERT INTO users VALUES (1, 'Alice', 30)", &[])
                    .await
                    .unwrap();
                engine
                    .execute("INSERT INTO products VALUES (1, 'Widget', 9.99)", &[])
                    .await
                    .unwrap();
                engine
                    .execute("INSERT INTO users VALUES (2, 'Bob', 25)", &[])
                    .await
                    .unwrap();
                engine
                    .execute("INSERT INTO products VALUES (2, 'Gadget', 19.99)", &[])
                    .await
                    .unwrap();

                let result = engine.sync_now().await.unwrap();
                assert_eq!(result.events_processed, 4);

                assert_eq!(olap_count(&engine, "users").await, 2);
                assert_eq!(
                    olap_string(&engine, "SELECT name FROM users WHERE id = 1", 0, 0).await,
                    "Alice"
                );
                assert_eq!(olap_count(&engine, "products").await, 2);
                assert_eq!(
                    olap_string(&engine, "SELECT name FROM products WHERE id = 1", 0, 0).await,
                    "Widget"
                );

                // Float comparison with a tolerance rather than exact equality.
                let price = olap_f64(
                    &engine,
                    "SELECT price FROM products WHERE id = 2",
                    0,
                    0,
                )
                .await;
                assert!((price - 19.99).abs() < 0.01, "price should be 19.99");

                // A query spanning both tables; rows may be split across batches,
                // so count them over all returned batches.
                let batches = engine
                    .olap()
                    .query("SELECT COUNT(*) as total FROM users UNION ALL SELECT COUNT(*) FROM products")
                    .await
                    .unwrap();
                let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
                assert_eq!(total_rows, 2, "UNION ALL should return 2 rows");
            }

            /// Adds and then drops a column, checking the schema registry and
            /// the OLAP table follow along and that sync keeps working.
            #[tokio::test]
            async fn e2e_schema_evolution() {
                let tmp = TempDir::new().unwrap();
                let engine = $make_engine(&tmp).await;

                // Start with a two-column table (no `age` yet).
                engine
                    .execute(
                        "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)",
                        &[],
                    )
                    .await
                    .unwrap();
                engine
                    .register_table(TableSchema::new(
                        "users",
                        Arc::new(Schema::new(vec![
                            Field::new("id", DataType::Int64, false),
                            Field::new("name", DataType::Utf8, true),
                        ])),
                        vec!["id".to_string()],
                    ))
                    .await
                    .unwrap();

                engine
                    .execute("INSERT INTO users VALUES (1, 'Alice')", &[])
                    .await
                    .unwrap();
                engine.sync_now().await.unwrap();
                assert_eq!(olap_count(&engine, "users").await, 1);
                assert!(!olap_has_column(&engine, "users", "age").await);

                // Add `age`: once via SQL through `execute`, once via the
                // engine's `add_column` API.
                engine
                    .execute("ALTER TABLE users ADD COLUMN age INTEGER", &[])
                    .await
                    .unwrap();
                engine
                    .add_column("users", "age", DataType::Int64)
                    .await
                    .unwrap();
                let schema = engine.schema_registry().get("users").unwrap();
                assert_eq!(schema.arrow_schema.fields().len(), 3);
                assert!(olap_has_column(&engine, "users", "age").await);

                engine
                    .execute("INSERT INTO users VALUES (2, 'Bob', 25)", &[])
                    .await
                    .unwrap();
                let result = engine.sync_now().await.unwrap();
                assert_eq!(result.rows_inserted, 1);
                assert_eq!(
                    olap_i64(&engine, "SELECT age FROM users WHERE id = 2", 0, 0).await,
                    25
                );

                // Drop `age` again; the follow-up insert supplies only two values.
                engine.drop_column("users", "age").await.unwrap();
                let schema = engine.schema_registry().get("users").unwrap();
                assert_eq!(schema.arrow_schema.fields().len(), 2);
                assert!(!olap_has_column(&engine, "users", "age").await);

                engine
                    .execute("INSERT INTO users VALUES (3, 'Charlie')", &[])
                    .await
                    .unwrap();
                engine.sync_now().await.unwrap();
                assert_eq!(olap_count(&engine, "users").await, 3);
                assert_eq!(
                    olap_string(&engine, "SELECT name FROM users WHERE id = 3", 0, 0).await,
                    "Charlie"
                );
            }

            /// Checks that a sync reports the processed events as pruned and
            /// leaves the CDC log table empty.
            #[tokio::test]
            async fn e2e_cdc_pruning() {
                let tmp = TempDir::new().unwrap();
                let engine = $make_engine(&tmp).await;

                engine
                    .execute(
                        "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
                        &[],
                    )
                    .await
                    .unwrap();
                engine.register_table(users_schema()).await.unwrap();

                for i in 1..=5 {
                    engine
                        .execute(
                            &format!("INSERT INTO users VALUES ({i}, 'User{i}', {})", 20 + i),
                            &[],
                        )
                        .await
                        .unwrap();
                }
                assert_eq!(cdc_log_count(&engine).await, 5);

                let result = engine.sync_now().await.unwrap();
                assert_eq!(result.events_processed, 5);
                // One assertion covers both "is Some" and "equals 5", and prints
                // the actual value on failure.
                assert_eq!(result.pruned_count, Some(5));
                assert_eq!(cdc_log_count(&engine).await, 0);

                assert_eq!(olap_count(&engine, "users").await, 5);
                assert_eq!(
                    olap_i64(&engine, "SELECT age FROM users WHERE id = 3", 0, 0).await,
                    23
                );
            }

            /// Starts the periodic background sync and checks that writes
            /// reach the OLAP side without an explicit `sync_now` call.
            #[tokio::test]
            async fn e2e_background_sync() {
                let tmp = TempDir::new().unwrap();
                let mut engine = $make_engine(&tmp).await;

                engine
                    .execute(
                        "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
                        &[],
                    )
                    .await
                    .unwrap();
                engine.register_table(users_schema()).await.unwrap();

                engine.start_sync(Duration::from_millis(50));

                engine
                    .execute("INSERT INTO users VALUES (1, 'Alice', 30)", &[])
                    .await
                    .unwrap();
                engine
                    .execute("INSERT INTO users VALUES (2, 'Bob', 25)", &[])
                    .await
                    .unwrap();

                // Poll with a deadline instead of sleeping a fixed 200 ms, so a
                // slow or loaded machine cannot make the test fail spuriously.
                wait_for_olap_count(&engine, "users", 2).await;
                assert_eq!(
                    olap_string(&engine, "SELECT name FROM users WHERE id = 1", 0, 0).await,
                    "Alice"
                );

                // A write made while the background sync is already running
                // must be picked up as well.
                engine
                    .execute("INSERT INTO users VALUES (3, 'Charlie', 35)", &[])
                    .await
                    .unwrap();
                wait_for_olap_count(&engine, "users", 3).await;

                engine.stop_sync().await;
            }

            /// Exercises the engine's top-level `query` / `query_with_hint`
            /// entry points (rather than `olap()` directly) after a sync.
            #[tokio::test]
            async fn e2e_query_routing() {
                let tmp = TempDir::new().unwrap();
                let engine = $make_engine(&tmp).await;

                engine
                    .execute(
                        "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
                        &[],
                    )
                    .await
                    .unwrap();
                engine.register_table(users_schema()).await.unwrap();

                engine
                    .execute("INSERT INTO users VALUES (1, 'Alice', 30)", &[])
                    .await
                    .unwrap();
                engine
                    .execute("INSERT INTO users VALUES (2, 'Bob', 25)", &[])
                    .await
                    .unwrap();
                engine
                    .execute("INSERT INTO users VALUES (3, 'Charlie', 35)", &[])
                    .await
                    .unwrap();
                engine.sync_now().await.unwrap();

                // Aggregate query: AVG is read back as Float64.
                let batches = engine
                    .query("SELECT AVG(age) as avg_age FROM users")
                    .await
                    .unwrap();
                let avg = batches[0]
                    .column(0)
                    .as_any()
                    .downcast_ref::<Float64Array>()
                    .unwrap()
                    .value(0);
                assert!((avg - 30.0).abs() < 0.01, "AVG(age) should be 30.0");

                // MIN / MAX keep the column's Int64 type.
                let batches = engine
                    .query("SELECT MIN(age), MAX(age) FROM users")
                    .await
                    .unwrap();
                let min = batches[0]
                    .column(0)
                    .as_any()
                    .downcast_ref::<Int64Array>()
                    .unwrap()
                    .value(0);
                let max = batches[0]
                    .column(1)
                    .as_any()
                    .downcast_ref::<Int64Array>()
                    .unwrap()
                    .value(0);
                assert_eq!(min, 25);
                assert_eq!(max, 35);

                // Explicit routing hint.
                let batches = engine
                    .query_with_hint(
                        "SELECT * FROM users ORDER BY id",
                        rhei::QueryHint::ForceOlap,
                    )
                    .await
                    .unwrap();
                let total: usize = batches.iter().map(|b| b.num_rows()).sum();
                assert_eq!(total, 3);

                // Single-row lookup by key through the same `query` entry point
                // (which backend serves it is decided by the engine).
                let batches = engine
                    .query("SELECT name FROM users WHERE id = 1")
                    .await
                    .unwrap();
                let name = batches[0]
                    .column(0)
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .unwrap()
                    .value(0);
                assert_eq!(name, "Alice");
            }

            /// Registers a table that already holds rows and checks
            /// `initial_sync` copies them to the OLAP side, after which
            /// ordinary syncing takes over.
            #[tokio::test]
            async fn e2e_initial_sync() {
                let tmp = TempDir::new().unwrap();
                let engine = $make_engine(&tmp).await;

                engine
                    .execute(
                        "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
                        &[],
                    )
                    .await
                    .unwrap();

                // These rows are written BEFORE the table is registered.
                engine
                    .execute("INSERT INTO users VALUES (1, 'Alice', 30)", &[])
                    .await
                    .unwrap();
                engine
                    .execute("INSERT INTO users VALUES (2, 'Bob', 25)", &[])
                    .await
                    .unwrap();

                engine.register_table(users_schema()).await.unwrap();
                assert_eq!(olap_count(&engine, "users").await, 0);

                let rows = engine.initial_sync("users").await.unwrap();
                assert_eq!(rows, 2);
                assert_eq!(olap_count(&engine, "users").await, 2);
                assert_eq!(
                    olap_string(&engine, "SELECT name FROM users WHERE id = 1", 0, 0).await,
                    "Alice"
                );
                assert_eq!(
                    olap_i64(&engine, "SELECT age FROM users WHERE id = 2", 0, 0).await,
                    25
                );
                // The test expects the pre-registration inserts to have left
                // nothing in the CDC log.
                assert_eq!(cdc_log_count(&engine).await, 0);

                // A write after registration goes through the normal sync path.
                engine
                    .execute("INSERT INTO users VALUES (3, 'Charlie', 35)", &[])
                    .await
                    .unwrap();
                let result = engine.sync_now().await.unwrap();
                assert_eq!(result.rows_inserted, 1);
                assert_eq!(olap_count(&engine, "users").await, 3);
            }

            /// Issues ten inserts from separately spawned tasks sharing one
            /// engine, then checks both the OLTP and (after sync) OLAP views.
            #[tokio::test]
            async fn e2e_concurrent_writes() {
                let tmp = TempDir::new().unwrap();
                // `Arc` so every spawned task can own a handle to the engine.
                let engine = Arc::new($make_engine(&tmp).await);

                engine
                    .execute(
                        "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)",
                        &[],
                    )
                    .await
                    .unwrap();
                engine.register_table(users_schema()).await.unwrap();

                let handles: Vec<_> = (1i64..=10)
                    .map(|i| {
                        let eng = Arc::clone(&engine);
                        tokio::spawn(async move {
                            eng.execute(
                                &format!("INSERT INTO users VALUES ({i}, 'User{i}', {})", 20 + i),
                                &[],
                            )
                            .await
                            .unwrap();
                        })
                    })
                    .collect();
                // Awaiting each handle also surfaces a panic from inside a task.
                for h in handles {
                    h.await.unwrap();
                }

                let batches = engine
                    .oltp()
                    .unwrap()
                    .query("SELECT COUNT(*) FROM users", &[])
                    .await
                    .unwrap();
                let oltp_count = batches[0]
                    .column(0)
                    .as_any()
                    .downcast_ref::<Int64Array>()
                    .unwrap()
                    .value(0);
                assert_eq!(oltp_count, 10, "OLTP should have 10 rows");

                engine.sync_now().await.unwrap();
                assert_eq!(olap_count(&engine, "users").await, 10);
                assert_eq!(
                    olap_i64(&engine, "SELECT age FROM users WHERE id = 5", 0, 0).await,
                    25
                );
                assert_eq!(
                    olap_string(&engine, "SELECT name FROM users WHERE id = 10", 0, 0).await,
                    "User10"
                );

                // Ages are 21..=30, which sum to 255. The CAST pins the result
                // to a 64-bit integer so it can be read with `olap_i64`.
                let sum = olap_i64(
                    &engine,
                    "SELECT CAST(SUM(age) AS BIGINT) FROM users",
                    0,
                    0,
                )
                .await;
                assert_eq!(sum, 255, "SUM(age) should be 255");
            }
        }
    };
}
/// Creates an `HtapEngine` whose OLTP database is `test.db` inside `tmp`,
/// leaving every other `HtapConfig` field at its default (presumably this
/// selects the DataFusion OLAP backend — confirm against
/// `HtapConfig::default`).
///
/// # Panics
///
/// Panics if the temporary path is not valid UTF-8 or engine construction
/// fails.
#[cfg(feature = "datafusion-backend")]
async fn make_datafusion_engine(tmp: &TempDir) -> HtapEngine {
    let db_file = tmp.path().join("test.db");
    let oltp_path = db_file.to_str().unwrap().to_owned();
    let config = HtapConfig {
        oltp_path,
        ..Default::default()
    };
    let engine = HtapEngine::new(config).await;
    engine.unwrap()
}
/// Creates an `HtapEngine` whose OLTP database is `test.db` inside `tmp`
/// and whose OLAP backend is explicitly set to `OlapBackendType::DuckDb`.
///
/// # Panics
///
/// Panics if the temporary path is not valid UTF-8 or engine construction
/// fails.
#[cfg(feature = "duckdb-backend")]
async fn make_duckdb_engine(tmp: &TempDir) -> HtapEngine {
    use rhei::OlapBackendType;

    let db_file = tmp.path().join("test.db");
    let oltp_path = db_file.to_str().unwrap().to_owned();
    let config = HtapConfig {
        oltp_path,
        olap_backend: OlapBackendType::DuckDb,
        ..Default::default()
    };
    let engine = HtapEngine::new(config).await;
    engine.unwrap()
}
// Instantiate the shared end-to-end suite once per enabled backend feature.
// Each invocation expands to its own module, so the same test names can
// coexist (e.g. `datafusion_e2e::e2e_crud_lifecycle`).
#[cfg(feature = "datafusion-backend")]
htap_e2e_tests!(datafusion_e2e, make_datafusion_engine);
// Same suite, built with the engine factory that selects the DuckDB backend.
#[cfg(feature = "duckdb-backend")]
htap_e2e_tests!(duckdb_e2e, make_duckdb_engine);