mod postgres_helpers;
use anyhow::{Context, Result};
use bytes::Bytes;
use drasi_bootstrap_postgres::{
PostgresBootstrapConfig, PostgresBootstrapProvider, SslMode as BootstrapSslMode,
TableKeyConfig as BootstrapTableKeyConfig,
};
use drasi_lib::ComponentStatus;
use drasi_lib::{config::SourceSubscriptionSettings, DrasiLib, Query, Source, SourceError};
use drasi_reaction_application::{ApplicationReaction, ApplicationReactionHandle};
use drasi_source_postgres::{
PostgresReplicationSource, PostgresSourceConfig, SslMode, TableKeyConfig,
};
use postgres_helpers::{
create_decimal_test_table, create_logical_replication_slot, create_publication,
create_test_table, create_test_table_replica_identity_default, delete_test_row,
grant_replication, grant_table_access, insert_decimal_test_row, insert_test_row,
setup_replication_postgres, update_test_row,
};
use serial_test::serial;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::{Duration, Instant};
/// Name of the table the tests create, publish and configure on the source;
/// the schema-discovery test also compares it against the reported node label.
const TEST_TABLE: &str = "users";
/// Name of the publication the tests create for `TEST_TABLE` and pass to the
/// source and bootstrap configurations.
const TEST_PUBLICATION: &str = "drasi_test_pub";
/// Sets up `env_logger` in test mode, falling back to the `info` filter when
/// the environment does not specify one.
///
/// The outcome of `try_init` is intentionally ignored, so a failed
/// initialisation never fails the calling test.
fn init_logging() {
    let env = env_logger::Env::default().default_filter_or("info");
    let mut builder = env_logger::Builder::from_env(env);
    let _ = builder.is_test(true).try_init();
}
/// Produces a replication slot name of the form `drasi_slot_<uuid>`, where
/// the suffix is a freshly generated v4 UUID in its "simple" rendering.
fn slot_name() -> String {
    let suffix = uuid::Uuid::new_v4().simple();
    format!("drasi_slot_{suffix}")
}
/// Polls `source` every 100 ms until it reports `ComponentStatus::Running`.
///
/// # Panics
///
/// Panics if the source has not reached `Running` within 10 seconds.
async fn wait_for_source_running(source: &PostgresReplicationSource) {
    let deadline = Instant::now() + Duration::from_secs(10);
    let poll_interval = Duration::from_millis(100);

    while Instant::now() < deadline {
        let running = source.status().await == ComponentStatus::Running;
        if running {
            return;
        }
        tokio::time::sleep(poll_interval).await;
    }

    panic!("Source did not reach Running state within 10s");
}
/// Repeatedly fetches the current results of `query_id` from `core` until
/// `predicate` accepts them.
///
/// Polling happens every 250 ms. An error whose text contains
/// `"is not running"` is treated as "not ready yet" and retried.
///
/// # Errors
///
/// Returns any other error from `get_query_results` immediately, and fails
/// with a timeout error once more than 20 seconds have elapsed without the
/// predicate being satisfied.
async fn wait_for_query_results(
    core: &Arc<DrasiLib>,
    query_id: &str,
    predicate: impl Fn(&[serde_json::Value]) -> bool,
) -> Result<()> {
    let deadline = Instant::now() + Duration::from_secs(20);
    let poll_interval = Duration::from_millis(250);

    loop {
        let attempt = core.get_query_results(query_id).await;
        match attempt {
            Ok(rows) if predicate(&rows) => return Ok(()),
            // Results are available but do not satisfy the predicate yet.
            Ok(_) => {}
            Err(err) => {
                // Only the "query not running yet" condition is retried.
                if !err.to_string().contains("is not running") {
                    return Err(err.into());
                }
            }
        }

        if Instant::now() > deadline {
            anyhow::bail!("Timed out waiting for query results for query_id `{query_id}`")
        }
        tokio::time::sleep(poll_interval).await;
    }
}
async fn build_core(
config: &postgres_helpers::ReplicationPostgresConfig,
slot_name: String,
) -> Result<(Arc<DrasiLib>, ApplicationReactionHandle)> {
let source_config = PostgresSourceConfig {
host: config.host.clone(),
port: config.port,
database: config.database.clone(),
user: config.user.clone(),
password: config.password.clone(),
tables: vec![TEST_TABLE.to_string()],
slot_name,
publication_name: TEST_PUBLICATION.to_string(),
ssl_mode: SslMode::Disable,
table_keys: vec![TableKeyConfig {
table: TEST_TABLE.to_string(),
key_columns: vec!["id".to_string()],
}],
};
let bootstrap_config = PostgresBootstrapConfig {
host: source_config.host.clone(),
port: source_config.port,
database: source_config.database.clone(),
user: source_config.user.clone(),
password: source_config.password.clone(),
tables: source_config.tables.clone(),
slot_name: source_config.slot_name.clone(),
publication_name: source_config.publication_name.clone(),
ssl_mode: BootstrapSslMode::Disable,
table_keys: vec![BootstrapTableKeyConfig {
table: TEST_TABLE.to_string(),
key_columns: vec!["id".to_string()],
}],
};
let bootstrap_provider = PostgresBootstrapProvider::new(bootstrap_config);
let source = PostgresReplicationSource::builder("pg-test-source")
.with_config(source_config)
.with_bootstrap_provider(bootstrap_provider)
.build()?;
let query = Query::cypher("test-query")
.query(
r#"
MATCH (u:users)
RETURN u.id AS id, u.name AS name
"#,
)
.from_source("pg-test-source")
.auto_start(true)
.enable_bootstrap(true)
.build();
let (reaction, handle) = ApplicationReaction::builder("test-reaction")
.with_query("test-query")
.build();
let core = Arc::new(
DrasiLib::builder()
.with_id("pg-test-core")
.with_source(source)
.with_query(query)
.with_reaction(reaction)
.build()
.await?,
);
Ok((core, handle))
}
#[tokio::test]
#[serial]
#[ignore]
async fn test_source_connects_and_starts() -> Result<()> {
init_logging();
let pg = setup_replication_postgres().await;
let client = pg.get_client().await?;
grant_replication(&client, "postgres").await?;
create_test_table(&client, TEST_TABLE).await?;
grant_table_access(&client, TEST_TABLE, "postgres").await?;
create_publication(&client, TEST_PUBLICATION, &[TEST_TABLE.to_string()]).await?;
let slot_name = slot_name();
create_logical_replication_slot(&client, &slot_name).await?;
let (core, _handle) = build_core(pg.config(), slot_name).await?;
core.start().await?;
wait_for_query_results(&core, "test-query", |results| results.is_empty()).await?;
let status = core.get_source_status("pg-test-source").await?;
assert_eq!(status, drasi_lib::channels::ComponentStatus::Running);
core.stop().await?;
pg.cleanup().await;
Ok(())
}
/// Verifies that the running source reports a schema containing a node
/// labelled `users` with `id` and `name` properties.
#[tokio::test]
#[serial]
#[ignore]
async fn test_schema_discovery_reports_columns_end_to_end() -> Result<()> {
    init_logging();

    // Database-side prerequisites: table, grants, publication and slot.
    let pg = setup_replication_postgres().await;
    let db = pg.get_client().await?;
    grant_replication(&db, "postgres").await?;
    create_test_table(&db, TEST_TABLE).await?;
    grant_table_access(&db, TEST_TABLE, "postgres").await?;
    create_publication(&db, TEST_PUBLICATION, &[TEST_TABLE.to_string()]).await?;
    let slot = slot_name();
    create_logical_replication_slot(&db, &slot).await?;

    let (core, _handle) = build_core(pg.config(), slot).await?;
    core.start().await?;

    let discovered = core.get_source_schema("pg-test-source").await?;
    let schema = discovered.expect("postgres source should report schema");

    assert!(schema.nodes.iter().any(|node| node.label == TEST_TABLE));
    let users = schema
        .nodes
        .iter()
        .find(|node| node.label == TEST_TABLE)
        .expect("users schema should be present");

    // Both columns returned by the test query must be listed as properties.
    let has_id = users.properties.iter().any(|property| property.name == "id");
    assert!(has_id);
    let has_name = users
        .properties
        .iter()
        .any(|property| property.name == "name");
    assert!(has_name);

    core.stop().await?;
    pg.cleanup().await;
    Ok(())
}
#[tokio::test]
#[serial]
#[ignore]
async fn test_insert_detection() -> Result<()> {
init_logging();
let pg = setup_replication_postgres().await;
let client = pg.get_client().await?;
grant_replication(&client, "postgres").await?;
create_test_table(&client, TEST_TABLE).await?;
grant_table_access(&client, TEST_TABLE, "postgres").await?;
create_publication(&client, TEST_PUBLICATION, &[TEST_TABLE.to_string()]).await?;
let slot_name = slot_name();
create_logical_replication_slot(&client, &slot_name).await?;
let (core, _handle) = build_core(pg.config(), slot_name).await?;
core.start().await?;
insert_test_row(&client, TEST_TABLE, 1, "Alice").await?;
wait_for_query_results(&core, "test-query", |results| {
results
.iter()
.any(|row| row.get("name") == Some(&"Alice".into()))
})
.await?;
core.stop().await?;
pg.cleanup().await;
Ok(())
}
#[tokio::test]
#[serial]
#[ignore]
async fn test_update_detection() -> Result<()> {
init_logging();
let pg = setup_replication_postgres().await;
let client = pg.get_client().await?;
grant_replication(&client, "postgres").await?;
create_test_table(&client, TEST_TABLE).await?;
grant_table_access(&client, TEST_TABLE, "postgres").await?;
create_publication(&client, TEST_PUBLICATION, &[TEST_TABLE.to_string()]).await?;
let slot_name = slot_name();
create_logical_replication_slot(&client, &slot_name).await?;
let (core, _handle) = build_core(pg.config(), slot_name).await?;
core.start().await?;
insert_test_row(&client, TEST_TABLE, 1, "Alice").await?;
wait_for_query_results(&core, "test-query", |results| {
results
.iter()
.any(|row| row.get("name") == Some(&"Alice".into()))
})
.await?;
update_test_row(&client, TEST_TABLE, 1, "Alice Updated").await?;
wait_for_query_results(&core, "test-query", |results| {
results
.iter()
.any(|row| row.get("name") == Some(&"Alice Updated".into()))
})
.await?;
core.stop().await?;
pg.cleanup().await;
Ok(())
}
#[tokio::test]
#[serial]
#[ignore]
async fn test_update_without_old_tuple_stays_update() -> Result<()> {
init_logging();
let pg = setup_replication_postgres().await;
let client = pg.get_client().await?;
grant_replication(&client, "postgres").await?;
create_test_table_replica_identity_default(&client, TEST_TABLE).await?;
grant_table_access(&client, TEST_TABLE, "postgres").await?;
create_publication(&client, TEST_PUBLICATION, &[TEST_TABLE.to_string()]).await?;
let slot_name = slot_name();
create_logical_replication_slot(&client, &slot_name).await?;
let (core, _handle) = build_core(pg.config(), slot_name).await?;
core.start().await?;
insert_test_row(&client, TEST_TABLE, 1, "Alice").await?;
wait_for_query_results(&core, "test-query", |results| {
results
.iter()
.any(|row| row.get("name") == Some(&"Alice".into()))
})
.await?;
update_test_row(&client, TEST_TABLE, 1, "Alice Updated").await?;
wait_for_query_results(&core, "test-query", |results| {
results.len() == 1
&& results
.iter()
.any(|row| row.get("name") == Some(&"Alice Updated".into()))
})
.await?;
core.stop().await?;
pg.cleanup().await;
Ok(())
}
#[tokio::test]
#[serial]
#[ignore]
async fn test_delete_detection() -> Result<()> {
init_logging();
let pg = setup_replication_postgres().await;
let client = pg.get_client().await?;
grant_replication(&client, "postgres").await?;
create_test_table(&client, TEST_TABLE).await?;
grant_table_access(&client, TEST_TABLE, "postgres").await?;
create_publication(&client, TEST_PUBLICATION, &[TEST_TABLE.to_string()]).await?;
let slot_name = slot_name();
create_logical_replication_slot(&client, &slot_name).await?;
let (core, _handle) = build_core(pg.config(), slot_name).await?;
core.start().await?;
insert_test_row(&client, TEST_TABLE, 1, "Alice").await?;
wait_for_query_results(&core, "test-query", |results| {
results
.iter()
.any(|row| row.get("name") == Some(&"Alice".into()))
})
.await?;
delete_test_row(&client, TEST_TABLE, 1).await?;
wait_for_query_results(&core, "test-query", |results| results.is_empty()).await?;
core.stop().await?;
pg.cleanup().await;
Ok(())
}
#[tokio::test]
#[serial]
#[ignore]
async fn test_full_crud_cycle() -> Result<()> {
init_logging();
let pg = setup_replication_postgres().await;
let client = pg.get_client().await?;
grant_replication(&client, "postgres").await?;
create_test_table(&client, TEST_TABLE).await?;
grant_table_access(&client, TEST_TABLE, "postgres").await?;
create_publication(&client, TEST_PUBLICATION, &[TEST_TABLE.to_string()]).await?;
let slot_name = slot_name();
create_logical_replication_slot(&client, &slot_name).await?;
let (core, _handle) = build_core(pg.config(), slot_name).await?;
core.start().await?;
insert_test_row(&client, TEST_TABLE, 1, "Alice").await?;
wait_for_query_results(&core, "test-query", |results| {
results
.iter()
.any(|row| row.get("name") == Some(&"Alice".into()))
})
.await?;
update_test_row(&client, TEST_TABLE, 1, "Alice Updated").await?;
wait_for_query_results(&core, "test-query", |results| {
results
.iter()
.any(|row| row.get("name") == Some(&"Alice Updated".into()))
})
.await?;
delete_test_row(&client, TEST_TABLE, 1).await?;
wait_for_query_results(&core, "test-query", |results| results.is_empty()).await?;
core.stop().await?;
pg.cleanup().await;
Ok(())
}
#[tokio::test]
#[serial]
#[ignore]
async fn test_decimal_datatype_serialization() -> Result<()> {
init_logging();
const DECIMAL_TABLE: &str = "products";
const DECIMAL_PUBLICATION: &str = "drasi_decimal_pub";
let pg = setup_replication_postgres().await;
let client = pg.get_client().await?;
grant_replication(&client, "postgres").await?;
create_decimal_test_table(&client, DECIMAL_TABLE).await?;
grant_table_access(&client, DECIMAL_TABLE, "postgres").await?;
create_publication(&client, DECIMAL_PUBLICATION, &[DECIMAL_TABLE.to_string()]).await?;
let slot_name = slot_name();
create_logical_replication_slot(&client, &slot_name).await?;
let source_config = PostgresSourceConfig {
host: pg.config().host.clone(),
port: pg.config().port,
database: pg.config().database.clone(),
user: pg.config().user.clone(),
password: pg.config().password.clone(),
tables: vec![DECIMAL_TABLE.to_string()],
slot_name: slot_name.clone(),
publication_name: DECIMAL_PUBLICATION.to_string(),
ssl_mode: SslMode::Disable,
table_keys: vec![TableKeyConfig {
table: DECIMAL_TABLE.to_string(),
key_columns: vec!["id".to_string()],
}],
};
let bootstrap_config = PostgresBootstrapConfig {
host: source_config.host.clone(),
port: source_config.port,
database: source_config.database.clone(),
user: source_config.user.clone(),
password: source_config.password.clone(),
tables: source_config.tables.clone(),
slot_name: source_config.slot_name.clone(),
publication_name: source_config.publication_name.clone(),
ssl_mode: BootstrapSslMode::Disable,
table_keys: vec![BootstrapTableKeyConfig {
table: DECIMAL_TABLE.to_string(),
key_columns: vec!["id".to_string()],
}],
};
let bootstrap_provider = PostgresBootstrapProvider::new(bootstrap_config);
let source = PostgresReplicationSource::builder("pg-decimal-test-source")
.with_config(source_config)
.with_bootstrap_provider(bootstrap_provider)
.build()?;
let query = Query::cypher("decimal-test-query")
.query(
r#"
MATCH (p:products)
RETURN p.id AS id, p.price AS price, p.quantity AS quantity, p.total AS total
"#,
)
.from_source("pg-decimal-test-source")
.auto_start(true)
.enable_bootstrap(true)
.build();
let (reaction, _handle) = ApplicationReaction::builder("decimal-test-reaction")
.with_query("decimal-test-query")
.build();
let core = Arc::new(
DrasiLib::builder()
.with_id("pg-decimal-test-core")
.with_source(source)
.with_query(query)
.with_reaction(reaction)
.build()
.await?,
);
core.start().await?;
insert_decimal_test_row(&client, DECIMAL_TABLE, 1, "99.99", "10.5000", "1049.895000").await?;
wait_for_query_results(&core, "decimal-test-query", |results| {
if results.is_empty() {
return false;
}
let row = &results[0];
if let Some(price) = row.get("price") {
if !price.is_number() {
log::error!("price is not a number: {price:?}");
return false;
}
if let Some(price_val) = price.as_f64() {
let expected = 99.99;
if (price_val - expected).abs() > 0.0001 {
log::error!("price value is incorrect: expected {expected}, got {price_val}");
return false;
}
} else {
log::error!("price cannot be converted to f64");
return false;
}
} else {
log::error!("price field is missing");
return false;
}
if let Some(quantity) = row.get("quantity") {
if !quantity.is_number() {
log::error!("quantity is not a number: {quantity:?}");
return false;
}
if let Some(quantity_val) = quantity.as_f64() {
let expected = 10.5;
if (quantity_val - expected).abs() > 0.0001 {
log::error!(
"quantity value is incorrect: expected {expected}, got {quantity_val}"
);
return false;
}
} else {
log::error!("quantity cannot be converted to f64");
return false;
}
} else {
log::error!("quantity field is missing");
return false;
}
if let Some(total) = row.get("total") {
if !total.is_number() {
log::error!("total is not a number: {total:?}");
return false;
}
if let Some(total_val) = total.as_f64() {
let expected = 1049.895000;
if (total_val - expected).abs() > 0.0001 {
log::error!("total value is incorrect: expected {expected}, got {total_val}");
return false;
}
} else {
log::error!("total cannot be converted to f64");
return false;
}
} else {
log::error!("total field is missing");
return false;
}
true
})
.await?;
core.stop().await?;
pg.cleanup().await;
Ok(())
}
/// Creates the `PostgresSourceConfig` for the `users` table tests.
///
/// Connection details are copied from `config`; the source watches
/// `TEST_TABLE` through `TEST_PUBLICATION` using `slot_name`, with SSL
/// disabled and `id` declared as the table's key column.
fn build_source_config(
    config: &postgres_helpers::ReplicationPostgresConfig,
    slot_name: String,
) -> PostgresSourceConfig {
    let users_key = TableKeyConfig {
        table: TEST_TABLE.to_string(),
        key_columns: vec!["id".to_string()],
    };
    let watched_tables = vec![TEST_TABLE.to_string()];

    PostgresSourceConfig {
        host: config.host.clone(),
        port: config.port,
        database: config.database.clone(),
        user: config.user.clone(),
        password: config.password.clone(),
        tables: watched_tables,
        slot_name,
        publication_name: TEST_PUBLICATION.to_string(),
        ssl_mode: SslMode::Disable,
        table_keys: vec![users_key],
    }
}
/// Builds a stand-alone replication source with id `pg-direct-source` and no
/// bootstrap provider, configured via `build_source_config`.
///
/// # Errors
///
/// Returns whatever error the source builder reports.
fn build_source(
    config: &postgres_helpers::ReplicationPostgresConfig,
    slot_name: String,
) -> Result<PostgresReplicationSource> {
    let source_config = build_source_config(config, slot_name);
    let builder = PostgresReplicationSource::builder("pg-direct-source");
    builder.with_config(source_config).build()
}
/// Creates subscription settings for `query_id` on `source_id`.
///
/// Bootstrap is disabled, the node and relation label sets are empty and no
/// last sequence is supplied; `resume_from` and `request_position_handle`
/// are passed through unchanged.
fn subscription_settings(
    source_id: &str,
    query_id: &str,
    resume_from: Option<Bytes>,
    request_position_handle: bool,
) -> SourceSubscriptionSettings {
    let source_id = source_id.to_string();
    let query_id = query_id.to_string();

    SourceSubscriptionSettings {
        source_id,
        query_id,
        enable_bootstrap: false,
        nodes: HashSet::new(),
        relations: HashSet::new(),
        last_sequence: None,
        resume_from,
        request_position_handle,
    }
}
/// Encodes `lsn` as its 8-byte big-endian representation.
fn lsn_to_bytes(lsn: u64) -> Bytes {
    let encoded = lsn.to_be_bytes();
    Bytes::copy_from_slice(&encoded)
}
/// Subscribes with `resume_from` set to LSN 1 and expects the source to
/// reject it with `SourceError::PositionUnavailable`, naming the source, the
/// requested position and an earliest available position.
#[tokio::test]
#[serial]
#[ignore]
async fn test_resume_from_before_slot_watermark_returns_position_unavailable() -> Result<()> {
    init_logging();

    // Database-side prerequisites: table, grants, publication and slot.
    let pg = setup_replication_postgres().await;
    let db = pg.get_client().await?;
    grant_replication(&db, "postgres").await?;
    create_test_table(&db, TEST_TABLE).await?;
    grant_table_access(&db, TEST_TABLE, "postgres").await?;
    create_publication(&db, TEST_PUBLICATION, &[TEST_TABLE.to_string()]).await?;
    let slot = slot_name();
    create_logical_replication_slot(&db, &slot).await?;

    let source = build_source(pg.config(), slot)?;
    source.start().await?;
    wait_for_source_running(&source).await;

    let stale_position = Some(lsn_to_bytes(1));
    let settings =
        subscription_settings("pg-direct-source", "q-unavailable", stale_position, false);

    // The subscription must fail; a success is a test failure.
    let err = match source.subscribe(settings).await {
        Err(err) => err,
        Ok(_) => panic!("Expected PositionUnavailable error"),
    };

    let source_err = err
        .downcast_ref::<SourceError>()
        .expect("should be SourceError");
    match source_err {
        SourceError::PositionUnavailable {
            source_id,
            requested,
            earliest_available,
        } => {
            assert_eq!(source_id, "pg-direct-source");
            assert_eq!(requested, &lsn_to_bytes(1));
            assert!(earliest_available.is_some());
        }
    }

    source.stop().await?;
    pg.cleanup().await;
    Ok(())
}
/// Subscribes directly to the source, inserts a row and checks that the first
/// event received carries an 8-byte, non-zero `source_position` and a
/// sequence number.
#[tokio::test]
#[serial]
#[ignore]
async fn test_events_carry_source_position_bytes() -> Result<()> {
    init_logging();

    // Database-side prerequisites: table, grants, publication and slot.
    let pg = setup_replication_postgres().await;
    let db = pg.get_client().await?;
    grant_replication(&db, "postgres").await?;
    create_test_table(&db, TEST_TABLE).await?;
    grant_table_access(&db, TEST_TABLE, "postgres").await?;
    create_publication(&db, TEST_PUBLICATION, &[TEST_TABLE.to_string()]).await?;
    let slot = slot_name();
    create_logical_replication_slot(&db, &slot).await?;

    let source = build_source(pg.config(), slot)?;
    source.start().await?;
    wait_for_source_running(&source).await;

    let settings = subscription_settings("pg-direct-source", "q-position-test", None, true);
    let response = source.subscribe(settings).await?;
    insert_test_row(&db, TEST_TABLE, 1, "Alice").await?;
    let mut events = response.receiver;

    // Poll in 500 ms slices for up to 15 s; stop at the first event or on a
    // receive error.
    let deadline = Instant::now() + Duration::from_secs(15);
    let mut found_event = false;
    while Instant::now() < deadline {
        let received = tokio::time::timeout(Duration::from_millis(500), events.recv()).await;
        let event = match received {
            Ok(Ok(event)) => event,
            Ok(Err(_)) => break,
            Err(_) => continue,
        };

        let pos = event
            .source_position
            .as_ref()
            .expect("Event should have source_position set");
        assert_eq!(pos.len(), 8, "source_position should be 8 bytes (LSN)");
        let lsn = u64::from_be_bytes(pos[..8].try_into().expect("8 bytes"));
        assert!(lsn > 0, "LSN should be non-zero");
        assert!(
            event.sequence.is_some(),
            "Event should have a sequence number"
        );

        found_event = true;
        break;
    }
    assert!(found_event, "Should have received at least one event");

    source.stop().await?;
    pg.cleanup().await;
    Ok(())
}
/// Records the positions of two insert events, restarts the source, then
/// subscribes again with `resume_from` set to the first recorded position and
/// expects at least two events to be delivered on the new subscription.
#[tokio::test]
#[serial]
#[ignore]
async fn test_resume_subscription_replays_from_lsn() -> Result<()> {
    init_logging();

    // Database-side prerequisites: table, grants, publication and slot.
    let pg = setup_replication_postgres().await;
    let db = pg.get_client().await?;
    grant_replication(&db, "postgres").await?;
    create_test_table(&db, TEST_TABLE).await?;
    grant_table_access(&db, TEST_TABLE, "postgres").await?;
    create_publication(&db, TEST_PUBLICATION, &[TEST_TABLE.to_string()]).await?;
    let slot = slot_name();
    create_logical_replication_slot(&db, &slot).await?;

    let source = build_source(pg.config(), slot)?;
    source.start().await?;
    wait_for_source_running(&source).await;

    // First subscription: collect the positions of the two inserts.
    let first_settings = subscription_settings("pg-direct-source", "q-first", None, true);
    let mut first_rx = source.subscribe(first_settings).await?.receiver;
    insert_test_row(&db, TEST_TABLE, 1, "Alice").await?;
    insert_test_row(&db, TEST_TABLE, 2, "Bob").await?;

    let mut positions: Vec<Bytes> = Vec::new();
    let collect_deadline = Instant::now() + Duration::from_secs(15);
    while positions.len() < 2 && Instant::now() < collect_deadline {
        match tokio::time::timeout(Duration::from_millis(500), first_rx.recv()).await {
            // Events without a position are skipped.
            Ok(Ok(event)) => positions.extend(event.source_position.clone()),
            Ok(Err(_)) => break,
            Err(_) => continue,
        }
    }
    assert_eq!(positions.len(), 2, "Should receive 2 events with positions");
    let resume_lsn = positions[0].clone();

    // Restart the source before resuming.
    source.stop().await?;
    tokio::time::sleep(Duration::from_millis(500)).await;
    source.start().await?;
    wait_for_source_running(&source).await;

    // Second subscription: resume from the first recorded position.
    let resumed_settings =
        subscription_settings("pg-direct-source", "q-resumed", Some(resume_lsn), true);
    let mut resumed_rx = source.subscribe(resumed_settings).await?.receiver;

    let mut replay_count = 0;
    let replay_deadline = Instant::now() + Duration::from_secs(15);
    while replay_count < 2 && Instant::now() < replay_deadline {
        match tokio::time::timeout(Duration::from_millis(500), resumed_rx.recv()).await {
            Ok(Ok(_event)) => replay_count += 1,
            Ok(Err(_)) => break,
            Err(_) => continue,
        }
    }
    assert!(
        replay_count >= 2,
        "Should replay at least 2 events from the resumed LSN, got {replay_count}"
    );

    source.stop().await?;
    pg.cleanup().await;
    Ok(())
}
/// Calls `sub.recv()` up to `attempts` times and returns a clone of the first
/// result entry accepted by `matcher`.
///
/// A `recv` that yields nothing still consumes one attempt.
///
/// # Errors
///
/// Fails with a timeout error when no entry matched within `attempts` receives.
async fn wait_for_subscription_change<F>(
    sub: &mut drasi_reaction_application::subscription::Subscription,
    attempts: usize,
    mut matcher: F,
) -> Result<drasi_lib::channels::ResultDiff>
where
    F: FnMut(&drasi_lib::channels::ResultDiff) -> bool,
{
    let mut remaining = attempts;
    while remaining > 0 {
        remaining -= 1;

        let batch = match sub.recv().await {
            Some(batch) => batch,
            None => continue,
        };
        for candidate in &batch.results {
            if matcher(candidate) {
                return Ok(candidate.clone());
            }
        }
    }

    anyhow::bail!("Timed out waiting for expected change event");
}
/// Tests whether `entry` is a change of kind `change_type` (`"ADD"`,
/// `"DELETE"` or `"UPDATE"`) whose data contains every `(field, expected)`
/// pair in `fields`.
///
/// String values are compared directly; numeric values are compared through
/// their `to_string()` rendering. A missing field, or a value of any other
/// JSON type, counts as a mismatch. Any other `change_type`/variant
/// combination returns `false`.
fn pg_matches_change(
    entry: &drasi_lib::channels::ResultDiff,
    change_type: &str,
    fields: &[(&str, &str)],
) -> bool {
    use drasi_lib::channels::ResultDiff;

    let payload = match (change_type, entry) {
        ("ADD", ResultDiff::Add { data, .. }) => data,
        ("DELETE", ResultDiff::Delete { data, .. }) => data,
        ("UPDATE", ResultDiff::Update { data, .. }) => data,
        _ => return false,
    };

    for (field, expected) in fields {
        let field_matches = match payload.get(*field) {
            Some(serde_json::Value::String(text)) => text == expected,
            Some(serde_json::Value::Number(number)) => number.to_string() == *expected,
            _ => false,
        };
        if !field_matches {
            return false;
        }
    }
    true
}
/// Runs two identical queries (each with its own RocksDB storage directory)
/// against one source, stops query2, inserts row 602, stops the whole core,
/// inserts row 603 and restarts the core.
///
/// After the restart the test expects query2 to report ADDs for both 602 and
/// 603, and query1 to report an ADD for 603 without a second ADD for 602.
#[tokio::test]
#[serial]
#[ignore]
async fn test_postgres_multi_query_no_duplicate_on_restart() -> Result<()> {
use drasi_index_rocksdb::RocksDbIndexProvider;
use drasi_lib::indexes::config::{StorageBackendRef, StorageBackendSpec};
use drasi_reaction_application::subscription::SubscriptionOptions;
init_logging();
// Temporary directory holding the RocksDB data for both queries.
let tmp_dir = tempfile::TempDir::new()?;
// Database-side prerequisites: table, grants, publication and slot.
let pg = setup_replication_postgres().await;
let client = pg.get_client().await?;
grant_replication(&client, "postgres").await?;
create_test_table(&client, TEST_TABLE).await?;
grant_table_access(&client, TEST_TABLE, "postgres").await?;
create_publication(&client, TEST_PUBLICATION, &[TEST_TABLE.to_string()]).await?;
let slot = slot_name();
create_logical_replication_slot(&client, &slot).await?;
// Seed row inserted before the core exists.
insert_test_row(&client, TEST_TABLE, 600, "Seed").await?;
// NOTE(review): fixed 2 s pause; its purpose is not visible here — TODO confirm.
tokio::time::sleep(Duration::from_secs(2)).await;
let source_config = PostgresSourceConfig {
host: pg.config().host.clone(),
port: pg.config().port,
database: pg.config().database.clone(),
user: pg.config().user.clone(),
password: pg.config().password.clone(),
tables: vec![TEST_TABLE.to_string()],
slot_name: slot.clone(),
publication_name: TEST_PUBLICATION.to_string(),
ssl_mode: SslMode::Disable,
table_keys: vec![TableKeyConfig {
table: TEST_TABLE.to_string(),
key_columns: vec!["id".to_string()],
}],
};
// The bootstrap provider mirrors the source's connection settings.
let bootstrap_config = PostgresBootstrapConfig {
host: source_config.host.clone(),
port: source_config.port,
database: source_config.database.clone(),
user: source_config.user.clone(),
password: source_config.password.clone(),
tables: source_config.tables.clone(),
slot_name: source_config.slot_name.clone(),
publication_name: source_config.publication_name.clone(),
ssl_mode: BootstrapSslMode::Disable,
table_keys: vec![BootstrapTableKeyConfig {
table: TEST_TABLE.to_string(),
key_columns: vec!["id".to_string()],
}],
};
let bootstrap_provider = PostgresBootstrapProvider::new(bootstrap_config);
let source = PostgresReplicationSource::builder("pg-mq-source")
.with_config(source_config)
.with_bootstrap_provider(bootstrap_provider)
.build()?;
let q1_id = "mq-query1";
let q2_id = "mq-query2";
// One storage directory per query, both under the temporary directory.
let q1_dir = tmp_dir.path().join("q1");
let q2_dir = tmp_dir.path().join("q2");
std::fs::create_dir_all(&q1_dir)?;
std::fs::create_dir_all(&q2_dir)?;
// query1 and query2 use the same Cypher text; only id and storage path differ.
let query1 = Query::cypher(q1_id)
.query(
r#"
MATCH (u:users)
RETURN u.id AS id, u.name AS name
"#,
)
.from_source("pg-mq-source")
.auto_start(true)
.enable_bootstrap(true)
.with_storage_backend(StorageBackendRef::Inline(StorageBackendSpec::RocksDb {
path: q1_dir.to_string_lossy().to_string(),
enable_archive: false,
direct_io: false,
}))
.build();
let query2 = Query::cypher(q2_id)
.query(
r#"
MATCH (u:users)
RETURN u.id AS id, u.name AS name
"#,
)
.from_source("pg-mq-source")
.auto_start(true)
.enable_bootstrap(true)
.with_storage_backend(StorageBackendRef::Inline(StorageBackendSpec::RocksDb {
path: q2_dir.to_string_lossy().to_string(),
enable_archive: false,
direct_io: false,
}))
.build();
// One application reaction per query so each can be observed separately.
let (reaction1, handle1) = ApplicationReaction::builder("mq-reaction1")
.with_query(q1_id)
.build();
let (reaction2, handle2) = ApplicationReaction::builder("mq-reaction2")
.with_query(q2_id)
.build();
let provider = RocksDbIndexProvider::new(tmp_dir.path(), false, false);
let core = Arc::new(
DrasiLib::builder()
.with_id("pg-multi-query-test")
.with_source(source)
.with_query(query1)
.with_query(query2)
.with_reaction(reaction1)
.with_reaction(reaction2)
.with_index_provider(Arc::new(provider))
.build()
.await?,
);
core.start().await?;
// Both subscriptions are created with a 5 s timeout option.
let sub_opts = SubscriptionOptions::default().with_timeout(Duration::from_secs(5));
let mut sub1 = handle1.subscribe_with_options(sub_opts.clone()).await?;
let mut sub2 = handle2.subscribe_with_options(sub_opts.clone()).await?;
// Phase 1: both queries running — row 601 must reach both subscriptions.
insert_test_row(&client, TEST_TABLE, 601, "Both").await?;
wait_for_subscription_change(&mut sub1, 15, |e| {
pg_matches_change(e, "ADD", &[("id", "601"), ("name", "Both")])
})
.await?;
wait_for_subscription_change(&mut sub2, 15, |e| {
pg_matches_change(e, "ADD", &[("id", "601"), ("name", "Both")])
})
.await?;
tokio::time::sleep(Duration::from_secs(2)).await;
// Phase 2: stop only query2, then insert row 602 and confirm query1 sees it.
core.stop_query(q2_id).await?;
tokio::time::sleep(Duration::from_millis(500)).await;
insert_test_row(&client, TEST_TABLE, 602, "OnlyQ1").await?;
wait_for_subscription_change(&mut sub1, 15, |e| {
pg_matches_change(e, "ADD", &[("id", "602"), ("name", "OnlyQ1")])
})
.await?;
tokio::time::sleep(Duration::from_secs(2)).await;
// Phase 3: stop the whole core and insert row 603 while it is stopped.
core.stop().await?;
tokio::time::sleep(Duration::from_millis(500)).await;
insert_test_row(&client, TEST_TABLE, 603, "WhileStopped").await?;
tokio::time::sleep(Duration::from_secs(2)).await;
// Phase 4: restart; query2 is expected to report both 602 and 603.
core.start().await?;
wait_for_subscription_change(&mut sub2, 20, |e| {
pg_matches_change(e, "ADD", &[("id", "602"), ("name", "OnlyQ1")])
})
.await
.context("query2 did not see replayed row 602")?;
wait_for_subscription_change(&mut sub2, 20, |e| {
pg_matches_change(e, "ADD", &[("id", "603"), ("name", "WhileStopped")])
})
.await
.context("query2 did not see row 603")?;
// query1 already processed 602 before the stop: scan up to 20 receives,
// recording any repeated ADD for 602, and stop once 603 has been seen.
let mut saw_602_duplicate = false;
let mut saw_603 = false;
for _ in 0..20 {
if let Some(result) = sub1.recv().await {
for entry in &result.results {
if pg_matches_change(entry, "ADD", &[("id", "602")]) {
saw_602_duplicate = true;
}
if pg_matches_change(entry, "ADD", &[("id", "603"), ("name", "WhileStopped")]) {
saw_603 = true;
}
}
if saw_603 {
break;
}
}
}
assert!(
saw_603,
"query1 should see the new row 603 inserted while stopped"
);
assert!(
!saw_602_duplicate,
"query1 should NOT see row 602 again — position filtering must suppress it"
);
core.stop().await?;
pg.cleanup().await;
Ok(())
}
/// Runs a single query with RocksDB-backed storage, stops the core, inserts
/// rows 702 and 703 while it is stopped, restarts the core and expects the
/// existing subscription to report ADDs for both rows.
#[tokio::test]
#[serial]
#[ignore]
async fn test_postgres_stop_restart_recovers() -> Result<()> {
use drasi_index_rocksdb::RocksDbIndexProvider;
use drasi_lib::indexes::config::{StorageBackendRef, StorageBackendSpec};
use drasi_reaction_application::subscription::SubscriptionOptions;
init_logging();
// Temporary directory holding the RocksDB data for the query.
let tmp_dir = tempfile::TempDir::new()?;
// Database-side prerequisites: table, grants, publication and slot.
let pg = setup_replication_postgres().await;
let client = pg.get_client().await?;
grant_replication(&client, "postgres").await?;
create_test_table(&client, TEST_TABLE).await?;
grant_table_access(&client, TEST_TABLE, "postgres").await?;
create_publication(&client, TEST_PUBLICATION, &[TEST_TABLE.to_string()]).await?;
let slot = slot_name();
create_logical_replication_slot(&client, &slot).await?;
// Seed row inserted before the core exists.
insert_test_row(&client, TEST_TABLE, 700, "Seed").await?;
// NOTE(review): fixed 2 s pause; its purpose is not visible here — TODO confirm.
tokio::time::sleep(Duration::from_secs(2)).await;
let q_id = "kr-query";
let q_dir = tmp_dir.path().join("kr-q");
std::fs::create_dir_all(&q_dir)?;
let source_config = PostgresSourceConfig {
host: pg.config().host.clone(),
port: pg.config().port,
database: pg.config().database.clone(),
user: pg.config().user.clone(),
password: pg.config().password.clone(),
tables: vec![TEST_TABLE.to_string()],
slot_name: slot.clone(),
publication_name: TEST_PUBLICATION.to_string(),
ssl_mode: SslMode::Disable,
table_keys: vec![TableKeyConfig {
table: TEST_TABLE.to_string(),
key_columns: vec!["id".to_string()],
}],
};
// The bootstrap provider mirrors the source's connection settings.
let bootstrap_config = PostgresBootstrapConfig {
host: source_config.host.clone(),
port: source_config.port,
database: source_config.database.clone(),
user: source_config.user.clone(),
password: source_config.password.clone(),
tables: source_config.tables.clone(),
slot_name: source_config.slot_name.clone(),
publication_name: source_config.publication_name.clone(),
ssl_mode: BootstrapSslMode::Disable,
table_keys: vec![BootstrapTableKeyConfig {
table: TEST_TABLE.to_string(),
key_columns: vec!["id".to_string()],
}],
};
let bootstrap_provider = PostgresBootstrapProvider::new(bootstrap_config);
let source = PostgresReplicationSource::builder("kr-source")
.with_config(source_config)
.with_bootstrap_provider(bootstrap_provider)
.build()?;
// The query stores its data in RocksDB under `q_dir`.
let query = Query::cypher(q_id)
.query(
r#"
MATCH (u:users)
RETURN u.id AS id, u.name AS name
"#,
)
.from_source("kr-source")
.auto_start(true)
.enable_bootstrap(true)
.with_storage_backend(StorageBackendRef::Inline(StorageBackendSpec::RocksDb {
path: q_dir.to_string_lossy().to_string(),
enable_archive: false,
direct_io: false,
}))
.build();
let (reaction, handle) = ApplicationReaction::builder("kr-reaction")
.with_query(q_id)
.build();
let provider = RocksDbIndexProvider::new(tmp_dir.path(), false, false);
let core = Arc::new(
DrasiLib::builder()
.with_id("pg-stop-restart")
.with_source(source)
.with_query(query)
.with_reaction(reaction)
.with_index_provider(Arc::new(provider))
.build()
.await?,
);
core.start().await?;
// The subscription is created with a 5 s timeout option and is reused
// across the stop/start cycle below.
let sub_opts = SubscriptionOptions::default().with_timeout(Duration::from_secs(5));
let mut sub1 = handle.subscribe_with_options(sub_opts.clone()).await?;
// Phase 1: a row inserted while running must be reported.
insert_test_row(&client, TEST_TABLE, 701, "BeforeStop").await?;
wait_for_subscription_change(&mut sub1, 15, |e| {
pg_matches_change(e, "ADD", &[("id", "701"), ("name", "BeforeStop")])
})
.await
.context("query did not see row 701")?;
tokio::time::sleep(Duration::from_secs(3)).await;
// Phase 2: stop the core and insert two rows while it is stopped.
core.stop().await?;
tokio::time::sleep(Duration::from_millis(500)).await;
insert_test_row(&client, TEST_TABLE, 702, "WhileStopped1").await?;
insert_test_row(&client, TEST_TABLE, 703, "WhileStopped2").await?;
tokio::time::sleep(Duration::from_secs(2)).await;
// Phase 3: restart; both rows inserted during the outage must be reported.
core.start().await?;
wait_for_subscription_change(&mut sub1, 20, |e| {
pg_matches_change(e, "ADD", &[("id", "702"), ("name", "WhileStopped1")])
})
.await
.context("query did not see row 702 after restart")?;
wait_for_subscription_change(&mut sub1, 20, |e| {
pg_matches_change(e, "ADD", &[("id", "703"), ("name", "WhileStopped2")])
})
.await
.context("query did not see row 703 after restart")?;
core.stop().await?;
pg.cleanup().await;
Ok(())
}