mod helpers;
use rand::distributions::{Alphanumeric, DistString};
use sqlx_ledger::{account::*, event::*, journal::*, tx_template::*, *};
/// Helper: create a journal, two accounts, and a simple transfer template
/// (a balanced 100 USD SETTLED debit/credit between sender and recipient).
/// Returns (ledger, journal_id, sender_account_id, recipient_account_id, tx_code).
async fn setup_ledger(
    pool: &sqlx::PgPool,
) -> anyhow::Result<(SqlxLedger, JournalId, AccountId, AccountId, String)> {
    // Creates an account named "<label> <code>" with a fresh random 32-char
    // code; random codes keep concurrently running tests from colliding on
    // the unique account-code constraint.
    async fn create_account(ledger: &SqlxLedger, label: &str) -> anyhow::Result<AccountId> {
        let code = Alphanumeric.sample_string(&mut rand::thread_rng(), 32);
        let new_account = NewAccount::builder()
            .id(uuid::Uuid::new_v4())
            .name(format!("{label} {code}"))
            .code(code)
            .build()?;
        Ok(ledger.accounts().create(new_account).await?)
    }
    let tx_code = Alphanumeric.sample_string(&mut rand::thread_rng(), 32);
    let name = Alphanumeric.sample_string(&mut rand::thread_rng(), 32);
    let new_journal = NewJournal::builder().name(name).build()?;
    let ledger = SqlxLedger::new(pool);
    let journal_id = ledger.journals().create(new_journal).await?;
    let sender_account_id = create_account(&ledger, "Sender").await?;
    let recipient_account_id = create_account(&ledger, "Recipient").await?;
    // Template parameters: account/journal ids are UUIDs supplied per post;
    // `effective` falls back to today's date via the default expression.
    let params = vec![
        ParamDefinition::builder()
            .name("recipient")
            .r#type(ParamDataType::UUID)
            .build()?,
        ParamDefinition::builder()
            .name("sender")
            .r#type(ParamDataType::UUID)
            .build()?,
        ParamDefinition::builder()
            .name("journal_id")
            .r#type(ParamDataType::UUID)
            .build()?,
        ParamDefinition::builder()
            .name("external_id")
            .r#type(ParamDataType::STRING)
            .build()?,
        ParamDefinition::builder()
            .name("effective")
            .r#type(ParamDataType::DATE)
            .default_expr("date()")
            .build()?,
    ];
    // Balanced double entry: debit the sender and credit the recipient
    // 100 USD on the SETTLED layer.
    let entries = vec![
        EntryInput::builder()
            .entry_type("'TEST_DR'")
            .account_id("params.sender")
            .layer("SETTLED")
            .direction("DEBIT")
            .units("decimal('100')")
            .currency("'USD'")
            .build()?,
        EntryInput::builder()
            .entry_type("'TEST_CR'")
            .account_id("params.recipient")
            .layer("SETTLED")
            .direction("CREDIT")
            .units("decimal('100')")
            .currency("'USD'")
            .build()?,
    ];
    let new_template = NewTxTemplate::builder()
        .id(uuid::Uuid::new_v4())
        .code(&tx_code)
        .params(params)
        .tx_input(
            TxInput::builder()
                .effective("params.effective")
                .journal_id("params.journal_id")
                .external_id("params.external_id")
                .build()?,
        )
        .entries(entries)
        .build()?;
    ledger.tx_templates().create(new_template).await?;
    Ok((
        ledger,
        journal_id,
        sender_account_id,
        recipient_account_id,
        tx_code,
    ))
}
/// Helper: post a single transaction using `tx_code` and return the random
/// external_id it was posted with (so tests can attribute events to it).
async fn post_one_transaction(
    ledger: &SqlxLedger,
    tx_code: &str,
    journal_id: JournalId,
    sender_account_id: AccountId,
    recipient_account_id: AccountId,
) -> anyhow::Result<String> {
    let external_id = uuid::Uuid::new_v4().to_string();
    let mut params = TxParams::new();
    params.insert("journal_id", journal_id);
    params.insert("sender", sender_account_id);
    params.insert("recipient", recipient_account_id);
    params.insert("external_id", external_id.clone());
    // Propagate failures via the Result return instead of panicking here, so
    // the calling test surfaces the error with its own context.
    ledger
        .post_transaction(TransactionId::new(), tx_code, Some(params))
        .await?;
    Ok(external_id)
}
// Tests the DB catch-up (reload) path: many transactions are posted before
// any subscription exists, then subscribing with after_id must replay all of
// them from the database, in strictly increasing id order.
#[tokio::test]
async fn after_id_catches_up_on_many_events() -> anyhow::Result<()> {
    let pool = helpers::init_pool().await?;
    let (ledger, journal_id, sender_id, recipient_id, tx_code) = setup_ledger(&pool).await?;
    // Snapshot the current max event id so only this test's events count.
    let baseline_id: i64 =
        sqlx::query_scalar::<_, i64>("SELECT COALESCE(MAX(id), 0) FROM sqlx_ledger_events")
            .fetch_one(&pool)
            .await?;
    let num_transactions = 20;
    for _ in 0..num_transactions {
        post_one_transaction(&ledger, &tx_code, journal_id, sender_id, recipient_id).await?;
    }
    let events = ledger
        .events(EventSubscriberOpts {
            after_id: Some(SqlxLedgerEventId::from(baseline_id)),
            buffer: 1000,
            ..Default::default()
        })
        .await?;
    let mut stream = events.all().expect("subscriber should be open");
    // 3 events per transaction: 1 TransactionCreated + 2 BalanceUpdated.
    let expected_min_events = num_transactions * 3;
    let mut collected = Vec::new();
    let recv_timeout = tokio::time::Duration::from_millis(500);
    let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(10);
    loop {
        if collected.len() >= expected_min_events || tokio::time::Instant::now() >= deadline {
            break;
        }
        match tokio::time::timeout(recv_timeout, stream.recv()).await {
            Ok(Ok(event)) => {
                assert!(
                    event.id > SqlxLedgerEventId::from(baseline_id),
                    "Received event {:?} at or before baseline {:?}",
                    event.id,
                    baseline_id
                );
                // Other tests may run against the same DB; keep only events
                // from this test's journal.
                if event.journal_id() == journal_id {
                    collected.push(event);
                }
            }
            Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(n))) => {
                eprintln!("Lagged by {n} events");
            }
            Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
                panic!("Event subscriber closed unexpectedly");
            }
            Err(_) => {}
        }
    }
    assert!(
        collected.len() >= expected_min_events,
        "Expected at least {expected_min_events} events, got {}",
        collected.len()
    );
    // Replayed events must arrive in strictly increasing id order.
    for (prev, next) in collected.iter().zip(collected.iter().skip(1)) {
        assert!(
            prev.id < next.id,
            "Events out of order: {:?} >= {:?}",
            prev.id,
            next.id
        );
    }
    Ok(())
}
// Exercises the catch-up -> live transition: transactions posted before
// subscribing are replayed from the DB via after_id, then a transaction
// posted afterwards must arrive through the live notification path on the
// same receiver, with ids strictly increasing across both phases.
#[tokio::test]
async fn reload_to_live_transition_receives_new_events() -> anyhow::Result<()> {
    let pool = helpers::init_pool().await?;
    let (ledger, journal_id, sender_id, recipient_id, tx_code) = setup_ledger(&pool).await?;
    // Snapshot the highest existing event id so the subscription only replays
    // events created by this test run.
    let baseline_id: i64 =
        sqlx::query_scalar::<_, i64>("SELECT COALESCE(MAX(id), 0) FROM sqlx_ledger_events")
            .fetch_one(&pool)
            .await?;
    // Post transactions BEFORE subscribing so they can only be delivered via
    // the DB catch-up (reload) path.
    let num_pre_txns = 5;
    for _ in 0..num_pre_txns {
        post_one_transaction(&ledger, &tx_code, journal_id, sender_id, recipient_id).await?;
    }
    let events = ledger
        .events(EventSubscriberOpts {
            after_id: Some(SqlxLedgerEventId::from(baseline_id)),
            buffer: 1000,
            ..Default::default()
        })
        .await?;
    let mut all_events = events.all().expect("subscriber should be open");
    // 3 events per transaction: 1 TransactionCreated + 2 BalanceUpdated.
    let expected_catchup = num_pre_txns * 3;
    let per_event_timeout = tokio::time::Duration::from_millis(500);
    let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(10);
    let mut catchup_events = Vec::new();
    // Drain the catch-up phase, ignoring events from other journals (other
    // tests may be running against the same database concurrently).
    while tokio::time::Instant::now() < deadline && catchup_events.len() < expected_catchup {
        match tokio::time::timeout(per_event_timeout, all_events.recv()).await {
            Ok(Ok(event)) if event.journal_id() == journal_id => {
                catchup_events.push(event);
            }
            Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(n))) => {
                eprintln!("Lagged by {n}");
            }
            Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
                panic!("Subscriber closed during catch-up");
            }
            // Other-journal events and per-recv timeouts are skipped; the
            // outer deadline bounds the total wait.
            _ => {}
        }
    }
    assert!(
        catchup_events.len() >= expected_catchup,
        "Catch-up: expected at least {expected_catchup} events, got {}",
        catchup_events.len()
    );
    // Phase 2: post one more transaction now that catch-up is complete; its
    // events must arrive on the same receiver via the live path.
    let live_ext_id =
        post_one_transaction(&ledger, &tx_code, journal_id, sender_id, recipient_id).await?;
    let live_deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(3);
    let mut live_events = Vec::new();
    while tokio::time::Instant::now() < live_deadline && live_events.len() < 3 {
        match tokio::time::timeout(per_event_timeout, all_events.recv()).await {
            Ok(Ok(event)) => {
                // Attribute events to this phase by external_id (transaction)
                // or by account id (balance updates).
                let is_ours = match &event.data {
                    SqlxLedgerEventData::TransactionCreated(tx) => tx.external_id == live_ext_id,
                    SqlxLedgerEventData::BalanceUpdated(b) => {
                        b.account_id == sender_id || b.account_id == recipient_id
                    }
                    _ => false,
                };
                if is_ours {
                    live_events.push(event);
                }
            }
            Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(n))) => {
                eprintln!("Lagged by {n}");
            }
            Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
                panic!("Subscriber closed during live phase");
            }
            Err(_) => {}
        }
    }
    // Only >= 2 of the 3 live events are required — presumably to tolerate a
    // delivery gap around the catch-up/live boundary; TODO confirm against
    // the subscriber implementation.
    assert!(
        live_events.len() >= 2,
        "Live phase: expected at least 2 events from post-catchup transaction, got {}",
        live_events.len()
    );
    // Ids must be strictly increasing across the catch-up -> live boundary.
    let all: Vec<_> = catchup_events.iter().chain(live_events.iter()).collect();
    for window in all.windows(2) {
        assert!(
            window[0].id < window[1].id,
            "Events out of order: {:?} >= {:?}",
            window[0].id,
            window[1].id
        );
    }
    Ok(())
}
// Multiple tasks post transactions concurrently, then a subscriber catches up
// via after_id. Each producer uses unique accounts to avoid DuplicateKey
// conflicts on sqlx_ledger_current_balances.
#[tokio::test]
async fn concurrent_producers_events_ordered() -> anyhow::Result<()> {
    let pool = helpers::init_pool().await?;
    // Snapshot the max event id before any producer runs; everything after it
    // is new for the purposes of this test.
    let baseline_id: i64 =
        sqlx::query_scalar::<_, i64>("SELECT COALESCE(MAX(id), 0) FROM sqlx_ledger_events")
            .fetch_one(&pool)
            .await?;
    let num_producers = 5;
    let txns_per_producer = 4;
    let mut handles = Vec::new();
    // Spawn producers; each creates its own journal/accounts/template so the
    // concurrent writes never contend on the same balance rows.
    for _ in 0..num_producers {
        let pool_clone = pool.clone();
        let handle = tokio::spawn(async move {
            let (producer_ledger, journal_id, sender_id, recipient_id, tx_code) =
                setup_ledger(&pool_clone).await.unwrap();
            for _ in 0..txns_per_producer {
                post_one_transaction(
                    &producer_ledger,
                    &tx_code,
                    journal_id,
                    sender_id,
                    recipient_id,
                )
                .await
                .unwrap();
            }
            // Return this producer's journal id so the consumer below can
            // filter events down to our producers only.
            journal_id
        });
        handles.push(handle);
    }
    let mut producer_journal_ids = std::collections::HashSet::new();
    for handle in handles {
        producer_journal_ids.insert(handle.await?);
    }
    // Each transaction produces 3 events (1 TransactionCreated + 2 BalanceUpdated)
    let total_transactions = num_producers * txns_per_producer;
    let expected_events = total_transactions * 3;
    // Sanity-check the event count directly in the DB before exercising the
    // subscriber's catch-up path.
    let actual_new_events: i64 =
        sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM sqlx_ledger_events WHERE id > $1")
            .bind(baseline_id)
            .fetch_one(&pool)
            .await?;
    assert!(
        actual_new_events >= expected_events as i64,
        "Expected at least {expected_events} new events in DB, got {actual_new_events}"
    );
    // Subscribe only after all producers finished, forcing a full catch-up.
    let ledger = SqlxLedger::new(&pool);
    let events = ledger
        .events(EventSubscriberOpts {
            after_id: Some(SqlxLedgerEventId::from(baseline_id)),
            buffer: 1000,
            ..Default::default()
        })
        .await?;
    let mut all_events = events.all().expect("subscriber should be open");
    let mut received = Vec::new();
    let per_event_timeout = tokio::time::Duration::from_millis(500);
    let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(10);
    while tokio::time::Instant::now() < deadline && received.len() < expected_events {
        match tokio::time::timeout(per_event_timeout, all_events.recv()).await {
            Ok(Ok(event)) => {
                assert!(
                    event.id > SqlxLedgerEventId::from(baseline_id),
                    "Received event {:?} at or before baseline {:?}",
                    event.id,
                    baseline_id
                );
                // Only count events belonging to our producers' journals
                if producer_journal_ids.contains(&event.journal_id()) {
                    received.push(event);
                }
            }
            Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(n))) => {
                eprintln!("Lagged by {n}");
            }
            Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
                panic!("Subscriber closed");
            }
            Err(_) => {}
        }
    }
    assert!(
        received.len() >= expected_events,
        "Expected at least {expected_events} events, got {}",
        received.len()
    );
    // Even with concurrent writers, delivery must follow event-id order.
    for window in received.windows(2) {
        assert!(
            window[0].id < window[1].id,
            "Events out of order: {:?} >= {:?}",
            window[0].id,
            window[1].id
        );
    }
    Ok(())
}
// Verifies close_on_lag=true shuts the subscriber down when its internal
// dispatch loop falls behind. With buffer=1, the subscribe() producer pushes
// events faster than the dispatch loop drains them, so its broadcast
// receiver reports RecvError::Lagged and the subscriber must close.
#[tokio::test]
async fn close_on_lag_closes_subscriber() -> anyhow::Result<()> {
    let pool = helpers::init_pool().await?;
    let (ledger, journal_id, sender_id, recipient_id, tx_code) = setup_ledger(&pool).await?;
    let events = ledger
        .events(EventSubscriberOpts {
            close_on_lag: true,
            buffer: 1,
            ..Default::default()
        })
        .await?;
    // Flood 20 transactions (~60 events) into a buffer of 1.
    for _ in 0..20 {
        post_one_transaction(&ledger, &tx_code, journal_id, sender_id, recipient_id).await?;
    }
    // Poll until all() starts failing, i.e. the subscriber closed itself;
    // lag detection is not instantaneous, so bound the wait with a timeout.
    let wait_for_close = async {
        while events.all().is_ok() {
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        }
    };
    let closed = tokio::time::timeout(tokio::time::Duration::from_secs(5), wait_for_close).await;
    assert!(
        closed.is_ok(),
        "Expected EventSubscriber to close due to lag within timeout"
    );
    Ok(())
}
// Tests the reload path triggered by payloads exceeding the pg_notify 8KB limit.
// The PG trigger sends a minimal payload (no data field) for large events,
// causing deserialization to fail. The subscribe loop detects this and reloads
// from the DB. We subscribe from a known baseline to isolate our events.
#[tokio::test]
async fn large_payload_triggers_reload() -> anyhow::Result<()> {
    let pool = helpers::init_pool().await?;
    let ledger = SqlxLedger::new(&pool);
    // Reuse the journal/accounts from a standard setup; a second template
    // with oversized metadata is created below for the actual test post.
    let (_, journal_id, sender_account_id, recipient_account_id, small_tx_code) =
        setup_ledger(&pool).await?;
    // Post one ordinary transaction first so the baseline captured next sits
    // after some real events.
    post_one_transaction(
        &ledger,
        &small_tx_code,
        journal_id,
        sender_account_id,
        recipient_account_id,
    )
    .await?;
    let baseline_max_id: i64 =
        sqlx::query_scalar::<_, i64>("SELECT COALESCE(MAX(id), 0) FROM sqlx_ledger_events")
            .fetch_one(&pool)
            .await?;
    // Create a template with large metadata (>8KB) to exceed pg_notify limit
    let large_tx_code = Alphanumeric.sample_string(&mut rand::thread_rng(), 32);
    let large_value = "x".repeat(9000);
    let large_metadata = format!(r#"{{"large_field": "{large_value}"}}"#);
    // Same parameter/entry shape as setup_ledger's template, but with the
    // oversized metadata attached to the tx input.
    let params = vec![
        ParamDefinition::builder()
            .name("recipient")
            .r#type(ParamDataType::UUID)
            .build()
            .unwrap(),
        ParamDefinition::builder()
            .name("sender")
            .r#type(ParamDataType::UUID)
            .build()
            .unwrap(),
        ParamDefinition::builder()
            .name("journal_id")
            .r#type(ParamDataType::UUID)
            .build()
            .unwrap(),
        ParamDefinition::builder()
            .name("external_id")
            .r#type(ParamDataType::STRING)
            .build()
            .unwrap(),
        ParamDefinition::builder()
            .name("effective")
            .r#type(ParamDataType::DATE)
            .default_expr("date()")
            .build()
            .unwrap(),
    ];
    let entries = vec![
        EntryInput::builder()
            .entry_type("'TEST_DR'")
            .account_id("params.sender")
            .layer("SETTLED")
            .direction("DEBIT")
            .units("decimal('100')")
            .currency("'USD'")
            .build()
            .unwrap(),
        EntryInput::builder()
            .entry_type("'TEST_CR'")
            .account_id("params.recipient")
            .layer("SETTLED")
            .direction("CREDIT")
            .units("decimal('100')")
            .currency("'USD'")
            .build()
            .unwrap(),
    ];
    let new_template = NewTxTemplate::builder()
        .id(uuid::Uuid::new_v4())
        .code(&large_tx_code)
        .params(params)
        .tx_input(
            TxInput::builder()
                .effective("params.effective")
                .journal_id("params.journal_id")
                .external_id("params.external_id")
                .metadata(&large_metadata)
                .build()
                .unwrap(),
        )
        .entries(entries)
        .build()
        .unwrap();
    ledger.tx_templates().create(new_template).await.unwrap();
    // Subscribe before posting so the large-payload event arrives while the
    // subscriber is live (forcing it through the notify-then-reload path).
    let events = ledger
        .events(EventSubscriberOpts {
            after_id: Some(SqlxLedgerEventId::from(baseline_max_id)),
            buffer: 10000,
            ..Default::default()
        })
        .await?;
    let mut all_events = events.all().expect("subscriber should be open");
    let external_id = uuid::Uuid::new_v4().to_string();
    let mut tx_params = TxParams::new();
    tx_params.insert("journal_id", journal_id);
    tx_params.insert("sender", sender_account_id);
    tx_params.insert("recipient", recipient_account_id);
    tx_params.insert("external_id", external_id.clone());
    // Post with the large-metadata template; the resulting notification
    // payload should exceed the pg_notify limit and trigger the reload.
    ledger
        .post_transaction(TransactionId::new(), &large_tx_code, Some(tx_params))
        .await
        .unwrap();
    // Verify events exist in the DB
    let db_event_count: i64 =
        sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM sqlx_ledger_events WHERE id > $1")
            .bind(baseline_max_id)
            .fetch_one(&pool)
            .await?;
    assert!(
        db_event_count >= 3,
        "Expected at least 3 new events in DB, got {db_event_count}"
    );
    let per_event_timeout = tokio::time::Duration::from_millis(500);
    let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(15);
    let mut our_events = Vec::new();
    while tokio::time::Instant::now() < deadline && our_events.len() < 3 {
        match tokio::time::timeout(per_event_timeout, all_events.recv()).await {
            Ok(Ok(event)) => {
                // Attribute events to this test by external_id (transaction)
                // or by account id (balance updates).
                let is_ours = match &event.data {
                    SqlxLedgerEventData::TransactionCreated(tx) => tx.external_id == external_id,
                    SqlxLedgerEventData::BalanceUpdated(b) => {
                        b.account_id == sender_account_id || b.account_id == recipient_account_id
                    }
                    _ => false,
                };
                if is_ours {
                    our_events.push(event);
                }
            }
            Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(n))) => {
                eprintln!("Lagged by {n} events");
            }
            Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
                panic!("Subscriber closed unexpectedly");
            }
            Err(_) => {}
        }
    }
    // TransactionCreated can be lost due to the resubscribe() gap: events fetched
    // during catch-up may be sent into the broadcast channel before the user's
    // receiver is created via all() -> resubscribe(). TransactionCreated is the
    // first event and most likely to fall into this window under concurrent load.
    assert!(
        our_events.len() >= 2,
        "Expected at least 2 events from large-payload transaction, got {}. \
        Types received: {:?}. \
        The reload path may not have recovered correctly.",
        our_events.len(),
        our_events
            .iter()
            .map(|e| format!("{:?}", e.r#type))
            .collect::<Vec<_>>()
    );
    // Regardless of the TransactionCreated race, both BalanceUpdated events
    // must always be delivered.
    let balance_count = our_events
        .iter()
        .filter(|e| matches!(&e.data, SqlxLedgerEventData::BalanceUpdated(_)))
        .count();
    assert_eq!(balance_count, 2, "Expected exactly 2 BalanceUpdated events");
    Ok(())
}
// Simulates a gap in the live NOTIFY stream by injecting a fake notification
// whose id is far ahead of the real sequence; the subscriber should detect
// the jump, reload from the DB, and continue delivering real events.
#[tokio::test]
async fn live_gap_triggers_reload_and_recovers() -> anyhow::Result<()> {
    let pool = helpers::init_pool().await?;
    let (ledger, journal_id, sender_id, recipient_id, tx_code) = setup_ledger(&pool).await?;
    let baseline_id: i64 =
        sqlx::query_scalar::<_, i64>("SELECT COALESCE(MAX(id), 0) FROM sqlx_ledger_events")
            .fetch_one(&pool)
            .await?;
    let events = ledger
        .events(EventSubscriberOpts {
            after_id: Some(SqlxLedgerEventId::from(baseline_id)),
            buffer: 1000,
            ..Default::default()
        })
        .await?;
    let mut all_events = events.all().expect("subscriber should be open");
    // Phase 1: confirm the live path works before injecting the gap.
    let _ext_id_1 =
        post_one_transaction(&ledger, &tx_code, journal_id, sender_id, recipient_id).await?;
    let per_event_timeout = tokio::time::Duration::from_millis(500);
    let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(5);
    let mut phase1_events = Vec::new();
    while tokio::time::Instant::now() < deadline && phase1_events.len() < 3 {
        match tokio::time::timeout(per_event_timeout, all_events.recv()).await {
            Ok(Ok(event)) if event.journal_id() == journal_id => {
                phase1_events.push(event);
            }
            Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(n))) => {
                eprintln!("Lagged by {n} events");
            }
            Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
                panic!("Subscriber closed unexpectedly in phase 1");
            }
            _ => {}
        }
    }
    assert!(
        phase1_events.len() >= 2,
        "Phase 1: expected at least 2 events, got {}",
        phase1_events.len()
    );
    // Hand-craft a syntactically valid BalanceUpdated payload whose id is far
    // beyond anything in the table, then NOTIFY it directly; to the
    // subscriber this looks like ~100k events were skipped.
    let fake_id = baseline_id + 100_000;
    let fake_payload = format!(
        r#"{{"id": {fake_id}, "type": "BalanceUpdated", "data": {{"journal_id": "00000000-0000-0000-0000-000000000000", "account_id": "00000000-0000-0000-0000-000000000000", "entry_id": "00000000-0000-0000-0000-000000000000", "currency": "USD", "settled_dr_balance": "0", "settled_cr_balance": "0", "settled_entry_id": "00000000-0000-0000-0000-000000000000", "settled_modified_at": "2024-01-01T00:00:00Z", "pending_dr_balance": "0", "pending_cr_balance": "0", "pending_entry_id": "00000000-0000-0000-0000-000000000000", "pending_modified_at": "2024-01-01T00:00:00Z", "encumbered_dr_balance": "0", "encumbered_cr_balance": "0", "encumbered_entry_id": "00000000-0000-0000-0000-000000000000", "encumbered_modified_at": "2024-01-01T00:00:00Z", "version": 1, "modified_at": "2024-01-01T00:00:00Z", "created_at": "2024-01-01T00:00:00Z"}}, "recorded_at": "2024-01-01T00:00:00Z"}}"#
    );
    sqlx::query(&format!("NOTIFY sqlx_ledger_events, '{fake_payload}'"))
        .execute(&pool)
        .await?;
    // Give the subscriber a moment to process the fake notification and start
    // its reload before the next real transaction is posted.
    tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
    // Phase 2: a real transaction posted after the gap must still arrive.
    let ext_id_2 =
        post_one_transaction(&ledger, &tx_code, journal_id, sender_id, recipient_id).await?;
    let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(5);
    let mut phase2_events = Vec::new();
    while tokio::time::Instant::now() < deadline && phase2_events.len() < 3 {
        match tokio::time::timeout(per_event_timeout, all_events.recv()).await {
            Ok(Ok(event)) => {
                // Attribute events to phase 2 by external_id or account id.
                let is_ours = match &event.data {
                    SqlxLedgerEventData::TransactionCreated(tx) => tx.external_id == ext_id_2,
                    SqlxLedgerEventData::BalanceUpdated(b) => {
                        b.account_id == sender_id || b.account_id == recipient_id
                    }
                    _ => false,
                };
                if is_ours && event.id > SqlxLedgerEventId::from(baseline_id) {
                    phase2_events.push(event);
                }
            }
            Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(n))) => {
                eprintln!("Lagged by {n} events");
            }
            Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
                panic!("Subscriber closed unexpectedly in phase 2");
            }
            Err(_) => {}
        }
    }
    assert!(
        phase2_events.len() >= 2,
        "Phase 2: expected at least 2 events after gap recovery, got {}",
        phase2_events.len()
    );
    Ok(())
}
// Forces batch_size down to 3 so the catch-up reload must page through the
// backlog (50 transactions => 150 events) in many small fetches; every event
// must still arrive, in strictly increasing id order.
#[tokio::test]
async fn paginated_reload_fetches_multiple_batches() -> anyhow::Result<()> {
    let pool = helpers::init_pool().await?;
    let (ledger, journal_id, sender_id, recipient_id, tx_code) = setup_ledger(&pool).await?;
    // Snapshot the current max event id so only this test's events count.
    let baseline_id: i64 =
        sqlx::query_scalar::<_, i64>("SELECT COALESCE(MAX(id), 0) FROM sqlx_ledger_events")
            .fetch_one(&pool)
            .await?;
    let num_transactions = 50;
    for _ in 0..num_transactions {
        post_one_transaction(&ledger, &tx_code, journal_id, sender_id, recipient_id).await?;
    }
    // 3 events per transaction: 1 TransactionCreated + 2 BalanceUpdated.
    let expected_events = num_transactions * 3;
    let events = ledger
        .events(EventSubscriberOpts {
            after_id: Some(SqlxLedgerEventId::from(baseline_id)),
            buffer: 1000,
            batch_size: 3,
            ..Default::default()
        })
        .await?;
    let mut stream = events.all().expect("subscriber should be open");
    let recv_timeout = tokio::time::Duration::from_millis(500);
    let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(10);
    let mut collected = Vec::new();
    while collected.len() < expected_events && tokio::time::Instant::now() < deadline {
        match tokio::time::timeout(recv_timeout, stream.recv()).await {
            // Keep only events from this test's journal.
            Ok(Ok(event)) if event.journal_id() == journal_id => collected.push(event),
            Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(n))) => {
                eprintln!("Lagged by {n}");
            }
            Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
                panic!("Subscriber closed unexpectedly");
            }
            _ => {}
        }
    }
    assert!(
        collected.len() >= expected_events,
        "Expected at least {expected_events} events across multiple batches (batch_size=3), got {}",
        collected.len()
    );
    // Pagination must not reorder: ids strictly increase across batches.
    for (prev, next) in collected.iter().zip(collected.iter().skip(1)) {
        assert!(
            prev.id < next.id,
            "Events out of order: {:?} >= {:?}",
            prev.id,
            next.id
        );
    }
    Ok(())
}
// Terminates the Postgres backend(s) behind the subscriber's LISTEN
// connection mid-stream and verifies the subscriber re-establishes itself
// (via its reload path) and keeps delivering events afterwards.
#[tokio::test]
async fn pg_listener_error_triggers_reload_and_recovers() -> anyhow::Result<()> {
    let pool = helpers::init_pool().await?;
    let (ledger, journal_id, sender_id, recipient_id, tx_code) = setup_ledger(&pool).await?;
    let baseline_id: i64 =
        sqlx::query_scalar::<_, i64>("SELECT COALESCE(MAX(id), 0) FROM sqlx_ledger_events")
            .fetch_one(&pool)
            .await?;
    let _ext_id_1 =
        post_one_transaction(&ledger, &tx_code, journal_id, sender_id, recipient_id).await?;
    let events = ledger
        .events(EventSubscriberOpts {
            after_id: Some(SqlxLedgerEventId::from(baseline_id)),
            buffer: 1000,
            ..Default::default()
        })
        .await?;
    let mut all_events = events.all().expect("subscriber should be open");
    let per_event_timeout = tokio::time::Duration::from_millis(500);
    let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(5);
    let mut phase1_events = Vec::new();
    // Phase 1: confirm delivery works before killing the listener connection.
    while tokio::time::Instant::now() < deadline && phase1_events.len() < 3 {
        match tokio::time::timeout(per_event_timeout, all_events.recv()).await {
            Ok(Ok(event)) if event.journal_id() == journal_id => {
                phase1_events.push(event);
            }
            Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(n))) => {
                eprintln!("Lagged by {n}");
            }
            Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
                panic!("Subscriber closed unexpectedly in phase 1");
            }
            _ => {}
        }
    }
    assert!(
        phase1_events.len() >= 2,
        "Phase 1: expected at least 2 events, got {}",
        phase1_events.len()
    );
    // Locate the backend pid(s) of the LISTEN connection(s). NOTE(review):
    // matching on query text is a heuristic — it assumes the listener
    // backend's last statement visible in pg_stat_activity is the LISTEN.
    let listener_pids: Vec<i32> = sqlx::query_scalar::<_, i32>(
        "SELECT pid FROM pg_stat_activity WHERE query ILIKE '%LISTEN%sqlx_ledger_events%' AND pid != pg_backend_pid()"
    )
    .fetch_all(&pool)
    .await?;
    assert!(
        !listener_pids.is_empty(),
        "Expected to find at least one PgListener backend to terminate"
    );
    // Kill each listener backend server-side; the client side should observe
    // a connection error on its next poll.
    for pid in &listener_pids {
        sqlx::query("SELECT pg_terminate_backend($1)")
            .bind(pid)
            .execute(&pool)
            .await?;
    }
    // Allow time for the subscriber to notice the dropped connection and
    // re-establish its listener before posting again.
    tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
    // Phase 2: events posted after recovery must still be delivered.
    let ext_id_2 =
        post_one_transaction(&ledger, &tx_code, journal_id, sender_id, recipient_id).await?;
    let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(10);
    let mut phase2_events = Vec::new();
    while tokio::time::Instant::now() < deadline && phase2_events.len() < 3 {
        match tokio::time::timeout(per_event_timeout, all_events.recv()).await {
            Ok(Ok(event)) => {
                // Attribute events to phase 2 by external_id or account id.
                let is_ours = match &event.data {
                    SqlxLedgerEventData::TransactionCreated(tx) => tx.external_id == ext_id_2,
                    SqlxLedgerEventData::BalanceUpdated(b) => {
                        b.account_id == sender_id || b.account_id == recipient_id
                    }
                    _ => false,
                };
                if is_ours {
                    phase2_events.push(event);
                }
            }
            Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(n))) => {
                eprintln!("Lagged by {n}");
            }
            Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
                panic!("Subscriber closed unexpectedly in phase 2");
            }
            Err(_) => {}
        }
    }
    assert!(
        phase2_events.len() >= 2,
        "Phase 2: expected at least 2 events after PgListener recovery, got {}",
        phase2_events.len()
    );
    Ok(())
}