#![cfg(all(feature = "datafusion-backend", feature = "sidecar"))]
use std::sync::Arc;
use std::time::Instant;
use arrow::array::{Array, Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use tempfile::TempDir;
use rhei::{
DeleteDetection, HtapConfig, HtapEngine, OlapEngine, SidecarConfig, SidecarSource, SyncMode,
TableSchema, TimestampCdcConfig, TimestampTableConfig,
};
/// Creates the external SQLite source database at `path` with empty
/// `orders` and `customers` tables.
///
/// Both tables carry `created_at`, `updated_at` and a nullable `deleted_at`
/// column, which are the columns the timestamp table configs and the
/// soft-delete setting in this file refer to.
///
/// # Panics
///
/// Panics if the database cannot be opened or the DDL fails.
fn setup_external_db(path: &str) {
    // The literal's continuation lines are deliberately left unindented so the
    // SQL text sent to SQLite stays exactly as it was.
    let ddl = "
CREATE TABLE orders (
id INTEGER PRIMARY KEY,
customer_id INTEGER NOT NULL,
amount INTEGER NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
deleted_at INTEGER
);
CREATE TABLE customers (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
email TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
deleted_at INTEGER
);
";
    let db = rusqlite::Connection::open(path).unwrap();
    db.execute_batch(ddl).unwrap();
}
/// Timestamp-CDC table settings for the external `orders` table.
///
/// Uses `created_at` / `updated_at` as the change-tracking columns and `id`
/// as the primary key. `columns` is left empty.
// NOTE(review): an empty `columns` list presumably means "all columns" — confirm
// against the `TimestampTableConfig` documentation.
fn orders_table_config() -> TimestampTableConfig {
    let table_name = "orders".into();
    let created_at_column = "created_at".into();
    let updated_at_column = "updated_at".into();
    let primary_key = vec!["id".into()];
    TimestampTableConfig {
        table_name,
        created_at_column,
        updated_at_column,
        primary_key,
        columns: Vec::new(),
    }
}
/// Timestamp-CDC table settings for the external `customers` table.
///
/// Uses `created_at` / `updated_at` as the change-tracking columns and `id`
/// as the primary key. `columns` is left empty.
// NOTE(review): an empty `columns` list presumably means "all columns" — confirm
// against the `TimestampTableConfig` documentation.
fn customers_table_config() -> TimestampTableConfig {
    let table_name = "customers".into();
    let created_at_column = "created_at".into();
    let updated_at_column = "updated_at".into();
    let primary_key = vec!["id".into()];
    TimestampTableConfig {
        table_name,
        created_at_column,
        updated_at_column,
        primary_key,
        columns: Vec::new(),
    }
}
fn orders_schema() -> TableSchema {
TableSchema::new(
"orders",
Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("customer_id", DataType::Int64, false),
Field::new("amount", DataType::Int64, false),
Field::new("status", DataType::Utf8, false),
Field::new("created_at", DataType::Int64, false),
Field::new("updated_at", DataType::Int64, false),
Field::new("deleted_at", DataType::Int64, true),
])),
vec!["id".into()],
)
}
fn customers_schema() -> TableSchema {
TableSchema::new(
"customers",
Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("name", DataType::Utf8, false),
Field::new("email", DataType::Utf8, true),
Field::new("created_at", DataType::Int64, false),
Field::new("updated_at", DataType::Int64, false),
Field::new("deleted_at", DataType::Int64, true),
])),
vec!["id".into()],
)
}
/// Builds an `HtapEngine` configured as a sidecar over the external SQLite
/// database at `ext_path`, then registers the `orders` and `customers` schemas.
///
/// The engine's own OLTP path is `local.db` inside `tmp`, local OLTP is
/// disabled, sync mode is `SyncMode::Temporal`, and deletes are detected via
/// the `deleted_at` soft-delete column.
///
/// # Panics
///
/// Panics if the temp path is not valid UTF-8, or if engine construction or
/// table registration fails.
async fn make_sidecar_engine(tmp: &TempDir, ext_path: &str) -> HtapEngine {
    let oltp_path = tmp.path().join("local.db").to_str().unwrap().to_string();

    // Timestamp-based change capture over both source tables.
    let timestamp_config = TimestampCdcConfig {
        tables: vec![orders_table_config(), customers_table_config()],
        poll_batch_size: 500,
        delete_detection: DeleteDetection::SoftDelete {
            column: "deleted_at".into(),
        },
    };

    let sidecar = SidecarConfig {
        source: SidecarSource::Sqlite(ext_path.to_string()),
        timestamp_config,
        enable_local_oltp: false,
        watermark_path: None,
    };

    let config = HtapConfig {
        oltp_path,
        sync_mode: SyncMode::Temporal,
        sidecar: Some(sidecar),
        ..Default::default()
    };

    let engine = HtapEngine::new(config).await.unwrap();
    let orders = orders_schema();
    engine.register_table(orders).await.unwrap();
    let customers = customers_schema();
    engine.register_table(customers).await.unwrap();
    engine
}
/// Opens a fresh connection to the SQLite database at `path` and runs `sql`
/// as a batch; the connection is dropped when the function returns.
///
/// # Panics
///
/// Panics if the database cannot be opened or the batch fails.
fn ext_exec(path: &str, sql: &str) {
    rusqlite::Connection::open(path)
        .unwrap()
        .execute_batch(sql)
        .unwrap();
}
/// Runs `sql` on the OLAP engine and returns the first column of the first
/// result row as an `i64`.
///
/// Returns `0` when the query produces no rows at all.
///
/// # Panics
///
/// Panics if the query fails or if the first column of the result is not an
/// `Int64Array`.
async fn olap_count(engine: &HtapEngine, sql: &str) -> i64 {
    let batches = engine.olap().query(sql).await.unwrap();
    // Look for the first batch that actually holds rows rather than trusting
    // `batches[0]`: an empty leading batch must not be mistaken for an empty
    // result and silently reported as 0.
    batches
        .iter()
        .find(|batch| batch.num_rows() > 0)
        .map_or(0, |batch| {
            batch
                .column(0)
                .as_any()
                .downcast_ref::<Int64Array>()
                .unwrap_or_else(|| panic!("first column is not an Int64Array for query: {sql}"))
                .value(0)
        })
}
/// Runs `sql` on the OLAP engine and returns the first column of the first
/// result row as an owned `String`.
///
/// # Panics
///
/// Panics if the query fails, if it returns no rows, or if the first column
/// of the result is not a `StringArray`. The panic message includes the SQL
/// so a failing assertion site can be identified from the test output.
async fn olap_string(engine: &HtapEngine, sql: &str) -> String {
    let batches = engine.olap().query(sql).await.unwrap();
    // Pick the first batch that holds rows. Indexing `batches[0]` directly
    // would fail with a bare index-out-of-bounds on an empty result and would
    // miss rows that follow an empty leading batch.
    let batch = batches
        .iter()
        .find(|batch| batch.num_rows() > 0)
        .unwrap_or_else(|| panic!("query returned no rows: {sql}"));
    batch
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap_or_else(|| panic!("first column is not a StringArray for query: {sql}"))
        .value(0)
        .to_string()
}
/// End-to-end sidecar scenario: an external SQLite database is mutated
/// directly and each change set is pulled into the OLAP store with
/// `sync_now()`, then verified through OLAP queries.
///
/// The phases run strictly in order and each one depends on the state left
/// behind by the previous ones: setup, batch insert, update, point-in-time
/// queries, soft delete, analytical queries, no-op re-sync, bulk insert, and
/// sync status. Sync latencies are measured and printed but never asserted.
#[tokio::test]
async fn test_sidecar_e2e_full_pipeline() {
// Both the external source database and the engine's local files live in
// one temporary directory.
let tmp = TempDir::new().unwrap();
let ext_path = tmp.path().join("source.db");
let ext = ext_path.to_str().unwrap();
// Phase 1: create the source schema and the sidecar engine, then check that
// no OLTP handle is exposed, that `execute()` is rejected, and that both
// OLAP tables exist after registration.
println!("\n=== Phase 1: Setup ===");
setup_external_db(ext);
let engine = make_sidecar_engine(&tmp, ext).await;
assert!(
engine.oltp().is_none(),
"OLTP should be None in sidecar mode"
);
assert!(
engine.execute("SELECT 1", &[]).await.is_err(),
"execute() should fail without OLTP"
);
assert!(engine.olap().table_exists("orders").await.unwrap());
assert!(engine.olap().table_exists("customers").await.unwrap());
println!(" OLAP tables created: orders, customers");
// Phase 2: seed 3 customers and 4 orders in the source (timestamps
// 1000..=1003, `deleted_at` NULL) and sync them in one pass.
println!("\n=== Phase 2: INSERT (batch) ===");
ext_exec(
ext,
"
INSERT INTO customers VALUES (1, 'Alice', 'alice@test.com', 1000, 1000, NULL);
INSERT INTO customers VALUES (2, 'Bob', 'bob@test.com', 1001, 1001, NULL);
INSERT INTO customers VALUES (3, 'Carol', 'carol@test.com', 1002, 1002, NULL);
INSERT INTO orders VALUES (100, 1, 5000, 'pending', 1000, 1000, NULL);
INSERT INTO orders VALUES (101, 1, 3000, 'pending', 1001, 1001, NULL);
INSERT INTO orders VALUES (102, 2, 7500, 'pending', 1002, 1002, NULL);
INSERT INTO orders VALUES (103, 3, 1200, 'confirmed', 1003, 1003, NULL);
",
);
let t0 = Instant::now();
let result = engine.sync_now().await.unwrap();
let sync_latency_1 = t0.elapsed();
println!(
" Sync result: {} events, {} inserted | latency: {:?}",
result.events_processed, result.rows_inserted, sync_latency_1
);
assert_eq!(result.events_processed, 7, "3 customers + 4 orders");
assert_eq!(result.rows_inserted, 7);
// The OLAP `orders` table must expose the three `_rhei_*` temporal columns
// in addition to the registered fields.
let batches = engine
.olap()
.query("SELECT * FROM orders LIMIT 1")
.await
.unwrap();
let schema = batches[0].schema();
assert!(schema.field_with_name("_rhei_valid_from").is_ok());
assert!(schema.field_with_name("_rhei_valid_to").is_ok());
assert!(schema.field_with_name("_rhei_operation").is_ok());
println!(
" OLAP orders schema: {} fields (including 3 temporal)",
schema.fields().len()
);
let customer_count = olap_count(&engine, "SELECT COUNT(*) FROM customers").await;
let order_count = olap_count(&engine, "SELECT COUNT(*) FROM orders").await;
assert_eq!(customer_count, 3);
assert_eq!(order_count, 4);
println!(" OLAP: {customer_count} customers, {order_count} orders");
// Every freshly inserted order is expected to be an open version
// (`_rhei_valid_to IS NULL`) tagged with operation 'I'.
let current_orders = olap_count(
&engine,
"SELECT COUNT(*) FROM orders WHERE _rhei_valid_to IS NULL AND _rhei_operation = 'I'",
)
.await;
assert_eq!(current_orders, 4);
// Phase 3: change two orders and one customer in the source, bumping
// `updated_at` to 2000..=2002 so the timestamp poll picks them up.
println!("\n=== Phase 3: UPDATE ===");
ext_exec(
ext,
"
UPDATE orders SET status = 'confirmed', updated_at = 2000 WHERE id = 100;
UPDATE orders SET status = 'shipped', updated_at = 2001 WHERE id = 101;
UPDATE customers SET email = 'alice_new@test.com', updated_at = 2002 WHERE id = 1;
",
);
let t1 = Instant::now();
let result = engine.sync_now().await.unwrap();
let sync_latency_2 = t1.elapsed();
println!(
" Sync result: {} events, {} updated | latency: {:?}",
result.events_processed, result.rows_updated, sync_latency_2
);
assert_eq!(result.events_processed, 3);
assert_eq!(result.rows_updated, 3);
// An update must add a new version row rather than overwrite: order 100
// now has two rows, exactly one of which is closed.
let order_100_versions =
olap_count(&engine, "SELECT COUNT(*) FROM orders WHERE id = 100").await;
assert_eq!(
order_100_versions, 2,
"order 100 should have 2 versions (original + update)"
);
let closed_count = olap_count(
&engine,
"SELECT COUNT(*) FROM orders WHERE id = 100 AND _rhei_valid_to IS NOT NULL",
)
.await;
assert_eq!(closed_count, 1, "one version should be closed");
let current_status = olap_string(
&engine,
"SELECT status FROM orders WHERE id = 100 AND _rhei_valid_to IS NULL",
)
.await;
assert_eq!(current_status, "confirmed");
println!(" Order 100: {order_100_versions} versions, current status = '{current_status}'");
// Phase 4: point-in-time reads. A version is visible at time t when
// `_rhei_valid_from <= t` and `_rhei_valid_to` is NULL or greater than t.
println!("\n=== Phase 4: Point-in-time queries ===");
// t=1500 lies between the insert (1000) and the update (2000).
let status_at_1500 = olap_string(
&engine,
"SELECT status FROM orders
WHERE id = 100
AND _rhei_valid_from <= 1500
AND (_rhei_valid_to IS NULL OR _rhei_valid_to > 1500)",
)
.await;
assert_eq!(
status_at_1500, "pending",
"at t=1500, order should be pending"
);
println!(" Order 100 at t=1500: status = '{status_at_1500}'");
// t=2500 lies after the update (2000).
let status_at_2500 = olap_string(
&engine,
"SELECT status FROM orders
WHERE id = 100
AND _rhei_valid_from <= 2500
AND (_rhei_valid_to IS NULL OR _rhei_valid_to > 2500)",
)
.await;
assert_eq!(
status_at_2500, "confirmed",
"at t=2500, order should be confirmed"
);
println!(" Order 100 at t=2500: status = '{status_at_2500}'");
// The same visibility rule applied to the customers table.
let email_at_1500 = olap_string(
&engine,
"SELECT email FROM customers
WHERE id = 1
AND _rhei_valid_from <= 1500
AND (_rhei_valid_to IS NULL OR _rhei_valid_to > 1500)",
)
.await;
assert_eq!(email_at_1500, "alice@test.com");
println!(" Alice email at t=1500: '{email_at_1500}'");
let email_current = olap_string(
&engine,
"SELECT email FROM customers WHERE id = 1 AND _rhei_valid_to IS NULL",
)
.await;
assert_eq!(email_current, "alice_new@test.com");
println!(" Alice email current: '{email_current}'");
// Phase 5: soft-delete one order and one customer by setting `deleted_at`
// (and bumping `updated_at`) in the source.
println!("\n=== Phase 5: Soft-DELETE ===");
ext_exec(
ext,
"
UPDATE orders SET deleted_at = 3000, updated_at = 3000 WHERE id = 103;
UPDATE customers SET deleted_at = 3001, updated_at = 3001 WHERE id = 3;
",
);
let t2 = Instant::now();
let result = engine.sync_now().await.unwrap();
let sync_latency_3 = t2.elapsed();
println!(
" Sync result: {} events ({} updated, {} deleted) | latency: {:?}",
result.events_processed, result.rows_updated, result.rows_deleted, sync_latency_3
);
// Only a lower bound on the event count is asserted here; the
// updated/deleted split and the version count of order 103 are printed
// for inspection but not checked.
assert!(
result.events_processed >= 2,
"should detect at least the 2 updated rows"
);
let order_103_versions =
olap_count(&engine, "SELECT COUNT(*) FROM orders WHERE id = 103").await;
println!(" Order 103: {order_103_versions} versions (including tombstone if detected)");
// Phase 6: analytical queries over the versioned data.
println!("\n=== Phase 6: Analytical queries ===");
// Sum of `amount` over open versions not tagged 'D'; printed only.
let total_active = olap_count(
&engine,
"SELECT COALESCE(SUM(amount), 0) FROM orders
WHERE _rhei_valid_to IS NULL AND _rhei_operation != 'D'",
)
.await;
println!(" Total active order amount: {total_active}");
// Per-order version counts; only the first result batch is printed.
let batches = engine
.olap()
.query("SELECT id, COUNT(*) as versions FROM orders GROUP BY id ORDER BY id")
.await
.unwrap();
let ids = batches[0]
.column(0)
.as_any()
.downcast_ref::<Int64Array>()
.unwrap();
let versions = batches[0]
.column(1)
.as_any()
.downcast_ref::<Int64Array>()
.unwrap();
for i in 0..batches[0].num_rows() {
println!(" Order {}: {} version(s)", ids.value(i), versions.value(i));
}
// The snapshot at t=1500 predates every update and delete, so all four
// original orders must be visible.
let snapshot_count = olap_count(
&engine,
"SELECT COUNT(*) FROM orders
WHERE _rhei_valid_from <= 1500
AND (_rhei_valid_to IS NULL OR _rhei_valid_to > 1500)",
)
.await;
println!(" Orders visible at t=1500: {snapshot_count}");
assert_eq!(snapshot_count, 4, "all 4 orders existed at t=1500");
// Phase 7: syncing again with no source changes must process nothing.
println!("\n=== Phase 7: Idempotent re-sync ===");
let t3 = Instant::now();
let result = engine.sync_now().await.unwrap();
let sync_latency_4 = t3.elapsed();
println!(
" Sync result: {} events (should be 0) | latency: {:?}",
result.events_processed, sync_latency_4
);
assert_eq!(result.events_processed, 0, "no new changes to sync");
// Phase 8: insert 100 more orders (ids 200..=299, timestamps 4200..=4299)
// through a prepared statement. The connection and statement are scoped to
// the block, so they are dropped before the sync starts.
println!("\n=== Phase 8: Bulk insert throughput ===");
{
let conn = rusqlite::Connection::open(ext).unwrap();
let mut stmt = conn
.prepare("INSERT INTO orders VALUES (?1, ?2, ?3, 'pending', ?4, ?4, NULL)")
.unwrap();
let base_ts = 4000i64;
for i in 200..300 {
stmt.execute(rusqlite::params![i, (i % 3) + 1, i * 100, base_ts + i])
.unwrap();
}
}
let t4 = Instant::now();
let result = engine.sync_now().await.unwrap();
let sync_latency_5 = t4.elapsed();
// Guard the division in case the measured duration is zero.
let throughput = if sync_latency_5.as_secs_f64() > 0.0 {
result.events_processed as f64 / sync_latency_5.as_secs_f64()
} else {
f64::INFINITY
};
println!(
" Synced {} events in {:?} ({:.0} events/sec)",
result.events_processed, sync_latency_5, throughput
);
assert_eq!(result.events_processed, 100);
assert_eq!(result.rows_inserted, 100);
// Lower bound only: the number of rows the Phase 5 soft delete added is
// not pinned down by this test.
let total_rows = olap_count(&engine, "SELECT COUNT(*) FROM orders").await;
println!(" Total OLAP order rows (all versions): {total_rows}");
assert!(
total_rows >= 106,
"4 original + 2 updated versions + 100 bulk = 106+"
);
// Phase 9: report sync status; values are printed, not asserted.
println!("\n=== Phase 9: Sync status ===");
let status = engine.sync_status().await.unwrap();
println!(" Last synced seq: {:?}", status.last_synced_seq);
println!(" Latest available: {:?}", status.latest_available_seq);
println!(" Lag: {} events", status.lag);
// Informational latency summary for the five measured syncs.
println!("\n=== Latency Summary ===");
println!(" Initial sync (7 events): {:?}", sync_latency_1);
println!(" Update sync (3 events): {:?}", sync_latency_2);
println!(" Delete sync (2+ events): {:?}", sync_latency_3);
println!(" No-op sync (0 events): {:?}", sync_latency_4);
println!(" Bulk sync (100 events): {:?}", sync_latency_5);
println!(" Throughput: {:.0} events/sec", throughput);
println!("\n=== E2E Test PASSED ===\n");
}