#![cfg(all(feature = "datafusion-backend", feature = "sidecar"))]
use std::sync::Arc;
use std::time::Instant;
use arrow::array::{Array, Float64Array, Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use rhei::{
DeleteDetection, OlapEngine, SyncMode, TableSchema, TimestampCdcConfig, TimestampCdcConsumer,
TimestampTableConfig,
};
/// Key/value connection string (host, port, user, password, database) for the
/// Postgres instance these tests talk to. It is handed to
/// `postgres::Client::connect` and to `SidecarSource::Postgres` below.
const PG_CONN_STR: &str = "host=localhost port=5432 user=rhei password=rhei_test dbname=rhei_test";
/// Reports whether the calling test should be skipped.
///
/// Returns `false` when the `RHEI_TEST_POSTGRES` environment variable can be
/// read (any value), and `true` otherwise, in which case a skip notice is
/// written to stderr first.
fn skip_unless_postgres() -> bool {
    let postgres_enabled = std::env::var("RHEI_TEST_POSTGRES").is_ok();
    if !postgres_enabled {
        eprintln!(" SKIP: set RHEI_TEST_POSTGRES=1 to run Postgres tests");
    }
    !postgres_enabled
}
/// Opens a synchronous Postgres connection using `PG_CONN_STR`.
///
/// Tries up to ten times, logging each failure to stderr and sleeping 500 ms
/// before the next try.
///
/// # Panics
///
/// Panics with the last connection error once the tenth attempt has failed.
fn pg_connect() -> postgres::Client {
    use postgres::{Client, NoTls};
    let retry_delay = std::time::Duration::from_millis(500);
    let mut attempt = 1;
    loop {
        let e = match Client::connect(PG_CONN_STR, NoTls) {
            Ok(client) => return client,
            Err(e) => e,
        };
        // The final attempt is fatal; earlier ones are logged and retried.
        if attempt >= 10 {
            panic!("failed to connect to Postgres after 10 attempts: {e}");
        }
        eprintln!(" pg_connect attempt {attempt}/10 failed: {e}, retrying...");
        std::thread::sleep(retry_delay);
        attempt += 1;
    }
}
/// Runs a batch of SQL statements against Postgres from async code.
///
/// The work happens on a blocking thread with a fresh connection from
/// `pg_connect`; any SQL error or join failure panics.
async fn pg_exec(sql: &'static str) {
    let job = tokio::task::spawn_blocking(move || {
        // The temporary connection lives only for this one statement batch.
        pg_connect().batch_execute(sql).unwrap();
    });
    job.await.unwrap();
}
async fn pg_bulk_insert(count: i64, id_start: i64, ts_start: i64) {
tokio::task::spawn_blocking(move || {
use postgres::{Client, NoTls};
let mut client = Client::connect(PG_CONN_STR, NoTls).unwrap();
let stmt = client
.prepare("INSERT INTO orders VALUES ($1, $2, $3, 'pending', $4, $4, NULL)")
.unwrap();
for i in 0..count {
let id = id_start + i;
let ts = ts_start + i;
client
.execute(
&stmt,
&[&id, &format!("Customer {id}"), &((id as f64) * 10.0), &ts],
)
.unwrap();
}
})
.await
.unwrap();
}
/// Builds a timestamp-CDC consumer backed by a new Postgres connection.
///
/// The connection comes from `pg_connect`, so this blocks (and may panic)
/// exactly as that helper does.
fn make_pg_consumer(
    config: TimestampCdcConfig,
) -> TimestampCdcConsumer<connector_arrow::postgres::PostgresConnection> {
    let connection = connector_arrow::postgres::PostgresConnection::new(pg_connect());
    TimestampCdcConsumer::new(connection, config)
}
/// CDC table configuration for the `orders` table: `created_at` / `updated_at`
/// as the timestamp columns, `id` as the sole primary-key column, and an
/// empty `columns` list.
fn orders_table_config() -> TimestampTableConfig {
    let table_name = "orders".into();
    let created_at_column = "created_at".into();
    let updated_at_column = "updated_at".into();
    let primary_key = vec!["id".into()];
    TimestampTableConfig {
        table_name,
        created_at_column,
        updated_at_column,
        primary_key,
        columns: Vec::new(),
    }
}
fn orders_schema() -> TableSchema {
TableSchema::new(
"orders",
Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("customer_name", DataType::Utf8, false),
Field::new("amount", DataType::Float64, false),
Field::new("status", DataType::Utf8, false),
Field::new("created_at", DataType::Int64, false),
Field::new("updated_at", DataType::Int64, false),
Field::new("deleted_at", DataType::Int64, true),
])),
vec!["id".into()],
)
}
#[tokio::test]
async fn test_postgres_source_connector() {
if skip_unless_postgres() {
return;
}
println!("\n=== Postgres SourceConnector Test ===");
pg_exec(
"
DROP TABLE IF EXISTS pg_test;
CREATE TABLE pg_test (
id BIGINT PRIMARY KEY,
name TEXT NOT NULL,
score DOUBLE PRECISION,
active BOOLEAN DEFAULT true,
created_at BIGINT NOT NULL,
updated_at BIGINT NOT NULL
);
INSERT INTO pg_test VALUES (1, 'Alice', 95.5, true, 1000, 1000);
INSERT INTO pg_test VALUES (2, 'Bob', 87.3, false, 1001, 1001);
INSERT INTO pg_test VALUES (3, 'Carol', NULL, true, 1002, 1002);
",
)
.await;
let batches = tokio::task::spawn_blocking(|| {
use connector_arrow::postgres::PostgresConnection;
use postgres::{Client, NoTls};
let client = Client::connect(PG_CONN_STR, NoTls).unwrap();
let mut conn = PostgresConnection::new(client);
use rhei::SourceConnector;
conn.query("SELECT * FROM pg_test ORDER BY id").unwrap()
})
.await
.unwrap();
assert_eq!(batches.len(), 1);
let batch = &batches[0];
assert_eq!(batch.num_rows(), 3);
let ids = batch
.column(0)
.as_any()
.downcast_ref::<Int64Array>()
.unwrap();
assert_eq!(ids.value(0), 1);
assert_eq!(ids.value(2), 3);
let names = batch
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(names.value(0), "Alice");
let scores = batch
.column(2)
.as_any()
.downcast_ref::<Float64Array>()
.unwrap();
assert!((scores.value(0) - 95.5).abs() < f64::EPSILON);
assert!(scores.is_null(2));
println!(" Types verified: int64, text, float64, bool, null");
pg_exec("DROP TABLE pg_test").await;
println!("=== SourceConnector PASSED ===\n");
}
/// End-to-end run of the timestamp-CDC sidecar against a live Postgres
/// `orders` table, synced into a DataFusion engine in temporal mode.
///
/// Phases: insert, update, point-in-time query, soft delete, idempotent
/// re-sync, and a 500-row bulk insert, with the sync counters asserted after
/// each one. Returns early unless `RHEI_TEST_POSTGRES` is set.
#[tokio::test]
async fn test_postgres_sidecar_full_pipeline() {
    if skip_unless_postgres() {
        return;
    }
    println!("\n=== Postgres Sidecar Full Pipeline ===");
    pg_exec(
        "
DROP TABLE IF EXISTS orders;
CREATE TABLE orders (
id BIGINT PRIMARY KEY,
customer_name TEXT NOT NULL,
amount DOUBLE PRECISION NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
created_at BIGINT NOT NULL,
updated_at BIGINT NOT NULL,
deleted_at BIGINT
);
",
    )
    .await;
    // `make_pg_consumer` opens a synchronous connection (with blocking
    // retries), so build it on a blocking thread.
    let consumer = tokio::task::spawn_blocking(|| {
        make_pg_consumer(TimestampCdcConfig {
            tables: vec![orders_table_config()],
            poll_batch_size: 500,
            delete_detection: DeleteDetection::SoftDelete {
                column: "deleted_at".into(),
            },
        })
    })
    .await
    .unwrap();
    let olap = rhei::SharedDataFusionEngine::new(rhei::DataFusionEngine::new());
    let schema_registry = rhei::SchemaRegistry::new();
    schema_registry.register(orders_schema()).unwrap();
    let temporal_schema = rhei::temporalize_schema(&orders_schema().arrow_schema);
    olap.create_table("orders", &temporal_schema, &orders_schema().primary_key)
        .await
        .unwrap();
    let sync_engine = Arc::new(
        rhei::CdcSyncEngine::new(consumer, olap.clone(), schema_registry, 1000)
            .with_sync_mode(SyncMode::Temporal),
    );
    use rhei::SyncEngine;

    println!(" Phase 1: INSERT");
    pg_exec(
        "
INSERT INTO orders VALUES (1, 'Alice', 99.99, 'pending', 1000, 1000, NULL);
INSERT INTO orders VALUES (2, 'Bob', 149.50, 'pending', 1001, 1001, NULL);
INSERT INTO orders VALUES (3, 'Carol', 75.00, 'confirmed', 1002, 1002, NULL);
",
    )
    .await;
    let t0 = Instant::now();
    let result = sync_engine.sync_once().await.unwrap();
    println!(
        " {} events, {} inserted | {:?}",
        result.events_processed,
        result.rows_inserted,
        t0.elapsed()
    );
    assert_eq!(result.events_processed, 3);
    assert_eq!(result.rows_inserted, 3);
    let batches = olap.query("SELECT COUNT(*) FROM orders").await.unwrap();
    let count = batches[0]
        .column(0)
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap()
        .value(0);
    assert_eq!(count, 3);

    println!(" Phase 2: UPDATE");
    pg_exec("UPDATE orders SET status = 'shipped', updated_at = 2000 WHERE id = 1").await;
    // Time only this sync; reusing the Phase 1 timer would fold the earlier
    // sync and the UPDATE round trip into the reported latency.
    let update_timer = Instant::now();
    let result = sync_engine.sync_once().await.unwrap();
    println!(
        " {} events, {} updated | {:?}",
        result.events_processed,
        result.rows_updated,
        update_timer.elapsed()
    );
    assert_eq!(result.rows_updated, 1);
    let batches = olap
        .query("SELECT COUNT(*) FROM orders WHERE id = 1")
        .await
        .unwrap();
    let versions = batches[0]
        .column(0)
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap()
        .value(0);
    // The update is expected to leave two rows for order 1 in the OLAP table.
    assert_eq!(versions, 2);
    println!(" Order 1: {versions} versions");

    println!(" Phase 3: Point-in-time");
    // t=1500 falls between the insert (1000) and the update (2000), so the
    // version valid then must still carry the original status.
    let batches = olap
        .query(
            "SELECT status FROM orders WHERE id = 1
AND _rhei_valid_from <= 1500
AND (_rhei_valid_to IS NULL OR _rhei_valid_to > 1500)",
        )
        .await
        .unwrap();
    let status = batches[0]
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap()
        .value(0);
    assert_eq!(status, "pending");
    println!(" at t=1500: '{status}'");
    let batches = olap
        .query("SELECT status FROM orders WHERE id = 1 AND _rhei_valid_to IS NULL")
        .await
        .unwrap();
    let status = batches[0]
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap()
        .value(0);
    assert_eq!(status, "shipped");
    println!(" current: '{status}'");

    println!(" Phase 4: Soft-DELETE");
    pg_exec("UPDATE orders SET deleted_at = 3000, updated_at = 3000 WHERE id = 3").await;
    let result = sync_engine.sync_once().await.unwrap();
    println!(
        " {} events ({} updated, {} deleted)",
        result.events_processed, result.rows_updated, result.rows_deleted
    );
    assert!(result.events_processed >= 1);

    println!(" Phase 5: Re-sync (idempotent)");
    // Nothing changed upstream, so a second sync must see no events.
    let result = sync_engine.sync_once().await.unwrap();
    assert_eq!(result.events_processed, 0);
    println!(" {} events", result.events_processed);

    println!(" Phase 6: Bulk INSERT (500 rows)");
    pg_bulk_insert(500, 100, 4000).await;
    let t1 = Instant::now();
    let result = sync_engine.sync_once().await.unwrap();
    let elapsed = t1.elapsed();
    let throughput = result.events_processed as f64 / elapsed.as_secs_f64();
    println!(
        " {} events in {:?} ({:.0} events/sec)",
        result.events_processed, elapsed, throughput
    );
    assert_eq!(result.events_processed, 500);

    // NOTE(review): the engine (and the Postgres consumer inside it) is
    // released on a blocking thread, presumably because the synchronous client
    // should not be torn down on an async worker — confirm.
    tokio::task::spawn_blocking(move || drop(sync_engine))
        .await
        .unwrap();
    pg_exec("DROP TABLE orders").await;
    println!("\n=== Postgres Pipeline PASSED ===\n");
}
/// Polls a single `type_test` row through the timestamp-CDC consumer and
/// checks how each Postgres column type surfaces in the event's `new_data`
/// JSON object (integers, double, text, boolean, NULL).
///
/// Returns early unless `RHEI_TEST_POSTGRES` is set.
#[tokio::test]
async fn test_postgres_type_mapping() {
    if skip_unless_postgres() {
        return;
    }
    println!("\n=== Postgres Type Mapping ===");
    pg_exec(
        "
DROP TABLE IF EXISTS type_test;
CREATE TABLE type_test (
id BIGINT PRIMARY KEY,
small_int SMALLINT,
regular_int INTEGER,
big_int BIGINT,
real_val REAL,
double_val DOUBLE PRECISION,
text_val TEXT,
bool_val BOOLEAN,
nullable_text TEXT,
created_at BIGINT NOT NULL,
updated_at BIGINT NOT NULL
);
INSERT INTO type_test VALUES (
1, 42, 100000, 9999999999, 3.14, 2.718281828, 'hello', true, NULL, 1000, 1000
);
",
    )
    .await;
    // `make_pg_consumer` opens a synchronous connection (with blocking
    // retries), so build it on a blocking thread.
    let consumer = tokio::task::spawn_blocking(|| {
        make_pg_consumer(TimestampCdcConfig {
            tables: vec![TimestampTableConfig {
                table_name: "type_test".into(),
                created_at_column: "created_at".into(),
                updated_at_column: "updated_at".into(),
                primary_key: vec!["id".into()],
                columns: vec![],
            }],
            poll_batch_size: 100,
            delete_detection: DeleteDetection::Disabled,
        })
    })
    .await
    .unwrap();
    use rhei_core::CdcConsumer;
    let events = consumer.poll(None, 100).await.unwrap();
    assert_eq!(events.len(), 1);
    let data = events[0].new_data.as_ref().unwrap();
    let obj = data.as_object().unwrap();
    assert_eq!(obj.get("id").unwrap().as_i64().unwrap(), 1);
    assert_eq!(obj.get("small_int").unwrap().as_i64().unwrap(), 42);
    assert_eq!(obj.get("regular_int").unwrap().as_i64().unwrap(), 100000);
    assert_eq!(obj.get("big_int").unwrap().as_i64().unwrap(), 9999999999);
    // The inserted value is e truncated to nine decimals; comparing against
    // the std constant stays well inside the 1e-6 tolerance and avoids a
    // hand-typed approximation (clippy `approx_constant`).
    let double_val = obj.get("double_val").unwrap().as_f64().unwrap();
    assert!((double_val - std::f64::consts::E).abs() < 1e-6);
    assert_eq!(obj.get("text_val").unwrap().as_str().unwrap(), "hello");
    assert!(obj.get("bool_val").unwrap().as_bool().unwrap());
    assert!(obj.get("nullable_text").unwrap().is_null());
    println!(" int64, int16, int32, float64, text, bool, null: all correct");
    // NOTE(review): the consumer is released on a blocking thread, presumably
    // because the synchronous client should not be torn down on an async
    // worker — confirm.
    tokio::task::spawn_blocking(move || drop(consumer))
        .await
        .unwrap();
    pg_exec("DROP TABLE type_test").await;
    println!("=== Type Mapping PASSED ===\n");
}
/// Checks that `HtapEngine::new` succeeds when configured with a Postgres
/// sidecar source, no tracked tables, and local OLTP disabled.
///
/// Returns early unless `RHEI_TEST_POSTGRES` is set.
#[tokio::test]
async fn test_htap_engine_sidecar_postgres_source() {
    use rhei::{DeleteDetection, HtapConfig, HtapEngine, SidecarConfig, SidecarSource};
    if skip_unless_postgres() {
        return;
    }
    println!("\n=== HtapEngine SidecarSource::Postgres ===");
    let timestamp_config = TimestampCdcConfig {
        tables: Vec::new(),
        poll_batch_size: 100,
        delete_detection: DeleteDetection::Disabled,
    };
    let sidecar = SidecarConfig {
        source: SidecarSource::Postgres(PG_CONN_STR.to_string()),
        timestamp_config,
        enable_local_oltp: false,
        watermark_path: None,
    };
    let config = HtapConfig {
        oltp_path: ":memory:".to_string(),
        sidecar: Some(sidecar),
        ..Default::default()
    };
    // Drive the async constructor to completion from a blocking thread,
    // using the handle of the runtime this test is running on.
    let build_engine = move || {
        let runtime = tokio::runtime::Handle::current();
        runtime.block_on(HtapEngine::new(config))
    };
    let engine = tokio::task::spawn_blocking(build_engine).await.unwrap();
    assert!(
        engine.is_ok(),
        "HtapEngine::new with Postgres sidecar failed: {:?}",
        engine.err()
    );
    println!("=== SidecarSource::Postgres construction PASSED ===\n");
}