#![cfg(feature = "observers")]
#![allow(clippy::unwrap_used)]
mod observer_test_helpers;
use std::time::Duration;
use fraiseql_server::observers::runtime::{ObserverRuntime, ObserverRuntimeConfig};
use observer_test_helpers::*;
use uuid::Uuid;
/// Installs the global `tracing` subscriber for this test binary exactly once.
///
/// The filter is read from the default environment variable when that parses and
/// falls back to the `info` directive otherwise. Output is routed through the
/// test writer. A function-local [`std::sync::Once`] guards the installation, so
/// every test may call this unconditionally.
fn init_test_tracing() {
    use std::sync::Once;

    static INIT: Once = Once::new();

    INIT.call_once(|| {
        // Prefer the environment-provided filter; any error selects "info".
        let filter = match tracing_subscriber::EnvFilter::try_from_default_env() {
            Ok(from_env) => from_env,
            Err(_) => "info".into(),
        };

        tracing_subscriber::fmt()
            .with_env_filter(filter)
            .with_test_writer()
            .init();
    });
}
/// Exercises a full start → process → stop cycle of [`ObserverRuntime`] against a live database.
///
/// Flow: empty the observer tables, register one `INSERT` observer that targets a
/// mock webhook server, start the runtime, insert a single change-log row, and
/// then check that exactly one request was recorded by the mock, at least one
/// `success` observer log exists, and a checkpoint exists for the entity type.
/// The runtime is stopped and the per-run test data cleaned up at the end.
#[tokio::test]
#[ignore = "requires PostgreSQL"]
async fn test_runtime_start_stop_lifecycle() {
    init_test_tracing();
    let test_id = Uuid::new_v4().to_string();
    let pool = create_test_pool().await;
    setup_observer_schema(&pool).await.expect("Failed to setup schema");

    // Empty the tables (logs first, then observers, then the change log) so the
    // counts asserted below only reflect this run.
    for (sql, failure) in [
        ("DELETE FROM tb_observer_log", "Failed to clean observer logs"),
        ("DELETE FROM tb_observer", "Failed to clean observers"),
        ("DELETE FROM core.tb_entity_change_log", "Failed to clean change log"),
    ] {
        sqlx::query(sql).execute(&pool).await.expect(failure);
    }

    let mock_server = MockWebhookServer::start().await;
    mock_server.mock_success().await;

    // Both names embed the per-run UUID.
    let entity_type = format!("Order_{}", test_id);
    let observer_name = format!("test-lifecycle-{}", test_id);
    let webhook_url = mock_server.webhook_url();
    let _observer_id = create_test_observer(
        &pool,
        &observer_name,
        Some(&entity_type),
        Some("INSERT"),
        None,
        &webhook_url,
    )
    .await
    .expect("Failed to create observer");

    let mut runtime =
        ObserverRuntime::new(ObserverRuntimeConfig::new(pool.clone()).with_poll_interval(50));
    runtime.start().await.expect("Failed to start runtime");

    // Diagnostic only: issue a GET against the mock URL and print the outcome.
    // NOTE(review): confirm the mock helper does not count this probe among the
    // received requests asserted further down.
    let probe_url = mock_server.webhook_url();
    let probe = reqwest::Client::new().get(&probe_url).send().await;
    eprintln!("[DIAG] mock server probe at {probe_url}: {probe:?}");

    let order_id = Uuid::new_v4();
    let order_key = order_id.to_string();
    let _ = insert_change_log_entry(
        &pool,
        "INSERT",
        &entity_type,
        &order_key,
        serde_json::json!({"id": order_id.to_string(), "status": "new"}),
        None,
    )
    .await
    .expect("Failed to insert change log entry");

    // Make sure the row is really in the table before blaming the runtime.
    let (change_log_rows,): (i64,) =
        sqlx::query_as("SELECT COUNT(*) FROM core.tb_entity_change_log WHERE object_type = $1")
            .bind(&entity_type)
            .fetch_one(&pool)
            .await
            .expect("Failed to query change log");
    eprintln!("[DIAG] change log entries for {entity_type}: {}", change_log_rows);
    assert!(change_log_rows > 0, "change log entry must exist in DB");

    // Fixed pause before sampling runtime health.
    tokio::time::sleep(Duration::from_secs(3)).await;
    let health = runtime.health();
    eprintln!(
        "[DIAG] runtime health: running={}, observer_count={}, events_processed={}, errors={}",
        health.running, health.observer_count, health.events_processed, health.errors
    );
    assert!(health.errors == 0, "runtime should have zero errors, got {}", health.errors);

    wait_for_webhook(&mock_server, 1, Duration::from_secs(15)).await;
    let requests = mock_server.received_requests().await;
    let delivered = requests.len();
    assert_eq!(delivered, 1, "Expected 1 webhook call during lifecycle, got {}", delivered);

    let successful_logs = get_observer_log_count(&pool, "success")
        .await
        .expect("Failed to query observer logs");
    assert!(successful_logs > 0, "Expected at least 1 successful observer log");

    let has_checkpoint = check_checkpoint_exists(&pool, &entity_type)
        .await
        .expect("Failed to check checkpoint");
    assert!(has_checkpoint, "Expected checkpoint to be saved after processing");

    runtime.stop().await.expect("Failed to stop runtime");
    cleanup_test_data(&pool, &test_id).await.expect("Failed to cleanup");
}
/// Verifies checkpoint advancement and duplicate-free delivery across two batches of events.
///
/// Five `INSERT` change-log rows are written and awaited, then five more. After
/// each batch the mock server's request count is asserted (5, then 10), the
/// stored checkpoint for the entity type must be positive, and the `after.id`
/// values extracted from the received payloads must be unique.
///
/// NOTE(review): despite the name, the runtime is never stopped and restarted
/// inside this test; both batches are handled by the same running instance.
#[tokio::test]
#[ignore = "requires PostgreSQL"]
async fn test_checkpoint_recovery_after_restart() {
    init_test_tracing();
    let test_id = Uuid::new_v4().to_string();
    let pool = create_test_pool().await;
    setup_observer_schema(&pool).await.expect("Failed to setup schema");

    // Empty the tables (logs first, then observers, then the change log).
    for (sql, failure) in [
        ("DELETE FROM tb_observer_log", "Failed to clean observer logs"),
        ("DELETE FROM tb_observer", "Failed to clean observers"),
        ("DELETE FROM core.tb_entity_change_log", "Failed to clean change log"),
    ] {
        sqlx::query(sql).execute(&pool).await.expect(failure);
    }

    let mock_server = MockWebhookServer::start().await;
    mock_server.mock_success().await;

    let entity_type = format!("Order_{}", test_id);
    let _observer_id = create_test_observer(
        &pool,
        &format!("test-checkpoint-{}", test_id),
        Some(&entity_type),
        Some("INSERT"),
        None,
        &mock_server.webhook_url(),
    )
    .await
    .expect("Failed to create observer");

    let config = ObserverRuntimeConfig::new(pool.clone()).with_poll_interval(50);
    let mut runtime = ObserverRuntime::new(config);
    runtime.start().await.expect("Failed to start runtime");

    // First batch: sequences 0..5.
    for i in 0..5 {
        let order_id = Uuid::new_v4();
        let _ = insert_change_log_entry(
            &pool,
            "INSERT",
            &entity_type,
            &order_id.to_string(),
            serde_json::json!({"id": order_id.to_string(), "sequence": i}),
            None,
        )
        .await
        .expect("Failed to insert change log entry");
    }
    wait_for_webhook(&mock_server, 5, Duration::from_secs(20)).await;
    let first_request_count = mock_server.request_count().await;
    assert_eq!(
        first_request_count, 5,
        "Expected 5 webhooks after first batch, got {}",
        first_request_count
    );

    // Second batch: sequences 5..10.
    for i in 5..10 {
        let order_id = Uuid::new_v4();
        let _ = insert_change_log_entry(
            &pool,
            "INSERT",
            &entity_type,
            &order_id.to_string(),
            serde_json::json!({"id": order_id.to_string(), "sequence": i}),
            None,
        )
        .await
        .expect("Failed to insert change log entry");
    }
    wait_for_webhook(&mock_server, 10, Duration::from_secs(20)).await;

    // Mirror the first batch's check: the running total must be exactly 10, so
    // over-delivery of already-processed events is caught here.
    let total_request_count = mock_server.request_count().await;
    assert_eq!(
        total_request_count, 10,
        "Expected 10 webhooks after second batch, got {}",
        total_request_count
    );

    let checkpoint_after_second = get_checkpoint_value(&pool, &entity_type)
        .await
        .expect("Failed to get checkpoint");
    assert!(
        checkpoint_after_second > 0,
        "Expected checkpoint to be updated after second batch"
    );

    // Payloads without a string `after.id` are skipped by the `filter_map`, so
    // they take no part in the uniqueness comparison below.
    let requests = mock_server.received_requests().await;
    let ids: Vec<String> = requests
        .iter()
        .filter_map(|r| r["after"]["id"].as_str().map(|s| s.to_string()))
        .collect();
    let unique_ids: std::collections::HashSet<_> = ids.iter().cloned().collect();
    assert_eq!(ids.len(), unique_ids.len(), "Expected no duplicate IDs in webhook payloads");

    runtime.stop().await.expect("Failed to stop runtime");
    cleanup_test_data(&pool, &test_id).await.expect("Failed to cleanup");
}
/// Checks that an observer added while the runtime is running is picked up by `reload_observers`.
///
/// One `INSERT` observer (mock server 1) exists at start-up and receives one
/// event. A second `UPDATE` observer (mock server 2) is then created, the
/// runtime is asked to reload, and an `UPDATE` change-log row is inserted. At
/// the end each mock server must have recorded exactly one request.
#[tokio::test]
#[ignore = "requires PostgreSQL"]
async fn test_hot_reload_observers() {
    init_test_tracing();
    let test_id = Uuid::new_v4().to_string();
    let pool = create_test_pool().await;
    setup_observer_schema(&pool).await.expect("Failed to setup schema");

    // Empty the tables (logs first, then observers, then the change log).
    for (sql, failure) in [
        ("DELETE FROM tb_observer_log", "Failed to clean observer logs"),
        ("DELETE FROM tb_observer", "Failed to clean observers"),
        ("DELETE FROM core.tb_entity_change_log", "Failed to clean change log"),
    ] {
        sqlx::query(sql).execute(&pool).await.expect(failure);
    }

    // One mock endpoint per observer so deliveries can be attributed.
    let mock_server_1 = MockWebhookServer::start().await;
    let mock_server_2 = MockWebhookServer::start().await;
    mock_server_1.mock_success().await;
    mock_server_2.mock_success().await;

    let entity_type = format!("Order_{}", test_id);

    let first_observer_name = format!("test-reload-1-{}", test_id);
    let first_webhook_url = mock_server_1.webhook_url();
    let _observer_id_1 = create_test_observer(
        &pool,
        &first_observer_name,
        Some(&entity_type),
        Some("INSERT"),
        None,
        &first_webhook_url,
    )
    .await
    .expect("Failed to create observer 1");

    let mut runtime =
        ObserverRuntime::new(ObserverRuntimeConfig::new(pool.clone()).with_poll_interval(50));
    runtime.start().await.expect("Failed to start runtime");

    // Event for the observer that was present at start-up.
    let order_id_1 = Uuid::new_v4();
    let first_order_key = order_id_1.to_string();
    let _ = insert_change_log_entry(
        &pool,
        "INSERT",
        &entity_type,
        &first_order_key,
        serde_json::json!({"id": order_id_1.to_string(), "status": "created"}),
        None,
    )
    .await
    .expect("Failed to insert change log entry 1");
    wait_for_webhook(&mock_server_1, 1, Duration::from_secs(15)).await;
    assert_eq!(mock_server_1.request_count().await, 1);

    // Register the second observer while the runtime is live, then reload.
    let second_observer_name = format!("test-reload-2-{}", test_id);
    let second_webhook_url = mock_server_2.webhook_url();
    let _observer_id_2 = create_test_observer(
        &pool,
        &second_observer_name,
        Some(&entity_type),
        Some("UPDATE"),
        None,
        &second_webhook_url,
    )
    .await
    .expect("Failed to create observer 2");

    let observer_count = runtime.reload_observers().await.expect("Failed to reload observers");
    assert_eq!(observer_count, 2, "Should have 2 observers after reload");

    // An UPDATE row, carrying both the new and the previous object state.
    let order_id_2 = Uuid::new_v4();
    let second_order_key = order_id_2.to_string();
    let _ = insert_change_log_entry(
        &pool,
        "UPDATE",
        &entity_type,
        &second_order_key,
        serde_json::json!({"id": order_id_2.to_string(), "status": "updated"}),
        Some(serde_json::json!({"id": order_id_2.to_string(), "status": "created"})),
    )
    .await
    .expect("Failed to insert change log entry 2");
    wait_for_webhook(&mock_server_2, 1, Duration::from_secs(15)).await;

    assert_eq!(mock_server_1.request_count().await, 1, "Observer 1 should have 1 event");
    assert_eq!(
        mock_server_2.request_count().await,
        1,
        "Observer 2 should have 1 event after reload"
    );

    runtime.stop().await.expect("Failed to stop runtime");
    cleanup_test_data(&pool, &test_id).await.expect("Failed to cleanup");
}
/// Stops the runtime after it has been working against a slow webhook endpoint.
///
/// The mock server is configured with a two-second delayed response. Five
/// `INSERT` rows are queued, the test sleeps for eleven seconds, and then
/// asserts that a checkpoint exists and that the mock recorded at least one
/// request before `stop()` is called and must succeed.
#[tokio::test]
#[ignore = "requires PostgreSQL"]
async fn test_graceful_shutdown_mid_processing() {
    init_test_tracing();
    let test_id = Uuid::new_v4().to_string();
    let pool = create_test_pool().await;
    setup_observer_schema(&pool).await.expect("Failed to setup schema");

    // Empty the tables (logs first, then observers, then the change log).
    for (sql, failure) in [
        ("DELETE FROM tb_observer_log", "Failed to clean observer logs"),
        ("DELETE FROM tb_observer", "Failed to clean observers"),
        ("DELETE FROM core.tb_entity_change_log", "Failed to clean change log"),
    ] {
        sqlx::query(sql).execute(&pool).await.expect(failure);
    }

    let mock_server = MockWebhookServer::start().await;
    mock_server.mock_delayed_response(Duration::from_secs(2)).await;

    let entity_type = format!("Order_{}", test_id);
    let observer_name = format!("test-shutdown-{}", test_id);
    let webhook_url = mock_server.webhook_url();
    let _observer_id = create_test_observer(
        &pool,
        &observer_name,
        Some(&entity_type),
        Some("INSERT"),
        None,
        &webhook_url,
    )
    .await
    .expect("Failed to create observer");

    let mut runtime =
        ObserverRuntime::new(ObserverRuntimeConfig::new(pool.clone()).with_poll_interval(50));
    runtime.start().await.expect("Failed to start runtime");

    // Queue five events, each with a fresh id and its sequence number.
    for sequence in 0..5 {
        let order_id = Uuid::new_v4();
        let order_key = order_id.to_string();
        let _ = insert_change_log_entry(
            &pool,
            "INSERT",
            &entity_type,
            &order_key,
            serde_json::json!({"id": order_id.to_string(), "sequence": sequence}),
            None,
        )
        .await
        .expect("Failed to insert change log entry");
    }

    // Fixed wait; presumably sized to cover five 2-second responses plus margin.
    tokio::time::sleep(Duration::from_secs(11)).await;

    let has_checkpoint = check_checkpoint_exists(&pool, &entity_type)
        .await
        .expect("Failed to check checkpoint");
    assert!(has_checkpoint, "Expected checkpoint to exist");

    let requests_so_far = mock_server.request_count().await;
    assert!(requests_so_far > 0, "Expected at least one event to start processing");

    runtime.stop().await.expect("Failed to stop runtime");
    cleanup_test_data(&pool, &test_id).await.expect("Failed to cleanup");
}
/// Checks that the runtime keeps delivering events after its webhook endpoint has been failing.
///
/// Phase one configures the mock with `mock_transient_failure(2)` (presumably
/// the first calls fail before one succeeds; confirm against the helper),
/// inserts one event, and expects one recorded request plus at least one
/// observer log for that entity id. Phase two resets the mock to plain success,
/// inserts a second event, and expects exactly one request on the reset mock.
#[tokio::test]
#[ignore = "requires PostgreSQL"]
async fn test_runtime_continues_after_errors() {
    init_test_tracing();
    let test_id = Uuid::new_v4().to_string();
    let pool = create_test_pool().await;
    setup_observer_schema(&pool).await.expect("Failed to setup schema");

    // Empty the tables (logs first, then observers, then the change log).
    for (sql, failure) in [
        ("DELETE FROM tb_observer_log", "Failed to clean observer logs"),
        ("DELETE FROM tb_observer", "Failed to clean observers"),
        ("DELETE FROM core.tb_entity_change_log", "Failed to clean change log"),
    ] {
        sqlx::query(sql).execute(&pool).await.expect(failure);
    }

    let mock_server = MockWebhookServer::start().await;
    mock_server.mock_transient_failure(2).await;

    let entity_type = format!("Order_{}", test_id);
    let observer_name = format!("test-error-resilience-{}", test_id);
    let webhook_url = mock_server.webhook_url();
    let _observer_id = create_test_observer(
        &pool,
        &observer_name,
        Some(&entity_type),
        Some("INSERT"),
        None,
        &webhook_url,
    )
    .await
    .expect("Failed to create observer");

    let mut runtime =
        ObserverRuntime::new(ObserverRuntimeConfig::new(pool.clone()).with_poll_interval(50));
    runtime.start().await.expect("Failed to start runtime");

    // Phase one: an event delivered while the endpoint is flaky.
    let order_id_1 = Uuid::new_v4();
    let first_order_key = order_id_1.to_string();
    let _ = insert_change_log_entry(
        &pool,
        "INSERT",
        &entity_type,
        &first_order_key,
        serde_json::json!({"id": order_id_1.to_string(), "sequence": 1}),
        None,
    )
    .await
    .expect("Failed to insert change log entry 1");
    wait_for_webhook(&mock_server, 1, Duration::from_secs(20)).await;

    let requests = mock_server.received_requests().await;
    assert_eq!(requests.len(), 1, "Expected 1 successful webhook after retries");

    let logs = get_observer_logs_for_entity(&pool, &first_order_key)
        .await
        .expect("Failed to fetch observer logs");
    assert!(!logs.is_empty(), "Expected observer logs for event with retries");

    // Phase two: reset the mock to a healthy endpoint and send another event.
    mock_server.reset().await;
    mock_server.mock_success().await;

    let order_id_2 = Uuid::new_v4();
    let second_order_key = order_id_2.to_string();
    let _ = insert_change_log_entry(
        &pool,
        "INSERT",
        &entity_type,
        &second_order_key,
        serde_json::json!({"id": order_id_2.to_string(), "sequence": 2}),
        None,
    )
    .await
    .expect("Failed to insert change log entry 2");
    wait_for_webhook(&mock_server, 1, Duration::from_secs(15)).await;

    let second_count = mock_server.request_count().await;
    assert_eq!(second_count, 1, "Expected runtime to continue processing after errors");

    runtime.stop().await.expect("Failed to stop runtime");
    cleanup_test_data(&pool, &test_id).await.expect("Failed to cleanup");
}
/// Pushes 100 `INSERT` events through the runtime and checks every one is delivered once.
///
/// After waiting (up to 60 seconds) for the mock server, the test asserts that
/// the request count equals the number of events, that the `after.id` values
/// extracted from the payloads contain no duplicates, and that the number of
/// `success` observer logs is at least 90% of the event count.
#[tokio::test]
#[ignore = "requires PostgreSQL"]
async fn test_high_throughput_processing() {
    init_test_tracing();
    let test_id = Uuid::new_v4().to_string();
    let pool = create_test_pool().await;
    setup_observer_schema(&pool).await.expect("Failed to setup schema");

    // Empty the tables (logs first, then observers, then the change log).
    for (sql, failure) in [
        ("DELETE FROM tb_observer_log", "Failed to clean observer logs"),
        ("DELETE FROM tb_observer", "Failed to clean observers"),
        ("DELETE FROM core.tb_entity_change_log", "Failed to clean change log"),
    ] {
        sqlx::query(sql).execute(&pool).await.expect(failure);
    }

    let mock_server = MockWebhookServer::start().await;
    mock_server.mock_success().await;

    let entity_type = format!("Order_{}", test_id);
    let observer_name = format!("test-throughput-{}", test_id);
    let webhook_url = mock_server.webhook_url();
    let _observer_id = create_test_observer(
        &pool,
        &observer_name,
        Some(&entity_type),
        Some("INSERT"),
        None,
        &webhook_url,
    )
    .await
    .expect("Failed to create observer");

    let mut runtime =
        ObserverRuntime::new(ObserverRuntimeConfig::new(pool.clone()).with_poll_interval(50));
    runtime.start().await.expect("Failed to start runtime");

    let event_count = 100;
    for sequence in 0..event_count {
        let order_id = Uuid::new_v4();
        let order_key = order_id.to_string();
        let _ = insert_change_log_entry(
            &pool,
            "INSERT",
            &entity_type,
            &order_key,
            serde_json::json!({"id": order_id.to_string(), "sequence": sequence, "batch": "throughput"}),
            None,
        )
        .await
        .expect("Failed to insert change log entry");
    }

    wait_for_webhook(&mock_server, event_count, Duration::from_secs(60)).await;
    let request_count = mock_server.request_count().await;
    assert_eq!(
        request_count, event_count,
        "Expected {} webhooks for high throughput test, got {}",
        event_count, request_count
    );

    // Collect every string `after.id`; payloads lacking one are skipped.
    let requests = mock_server.received_requests().await;
    let mut ids: Vec<String> = Vec::new();
    for request in requests.iter() {
        if let Some(id) = request["after"]["id"].as_str() {
            ids.push(id.to_string());
        }
    }
    let unique_ids: std::collections::HashSet<&str> = ids.iter().map(String::as_str).collect();
    assert_eq!(ids.len(), unique_ids.len(), "Expected no duplicates in high throughput test");

    // A count that does not fit in `usize` is treated as zero and fails the check.
    let success_count = get_observer_log_count(&pool, "success")
        .await
        .expect("Failed to query observer logs");
    let minimum_successes = event_count * 90 / 100;
    assert!(
        usize::try_from(success_count).unwrap_or(0) >= minimum_successes,
        "Expected at least 90% of events logged as success, got {}",
        success_count
    );

    runtime.stop().await.expect("Failed to stop runtime");
    cleanup_test_data(&pool, &test_id).await.expect("Failed to cleanup");
}
/// Starts and stops a runtime with no observers, then starts a second instance on the same pool.
///
/// The first runtime uses a 50 poll interval and both its `start` and `stop`
/// results are asserted. The second runtime uses the default configuration;
/// only its `start` result is asserted and its `stop` result is discarded.
#[tokio::test]
#[ignore = "requires PostgreSQL"]
async fn test_runtime_basic_lifecycle() {
    init_test_tracing();
    let pool = create_test_pool().await;
    setup_observer_schema(&pool).await.expect("Failed to setup schema");

    // Empty the tables (logs first, then observers, then the change log).
    for (sql, failure) in [
        ("DELETE FROM tb_observer_log", "Failed to clean observer logs"),
        ("DELETE FROM tb_observer", "Failed to clean observers"),
        ("DELETE FROM core.tb_entity_change_log", "Failed to clean change log"),
    ] {
        sqlx::query(sql).execute(&pool).await.expect(failure);
    }

    // First instance: explicit poll interval, full start/stop round trip.
    let mut runtime =
        ObserverRuntime::new(ObserverRuntimeConfig::new(pool.clone()).with_poll_interval(50));

    let first_start = runtime.start().await;
    assert!(first_start.is_ok(), "Failed to start runtime: {:?}", first_start);

    let first_stop = runtime.stop().await;
    assert!(first_stop.is_ok(), "Failed to stop runtime: {:?}", first_stop);

    // Second instance takes ownership of the pool and uses default settings.
    let second_config = ObserverRuntimeConfig::new(pool);
    let mut runtime2 = ObserverRuntime::new(second_config);
    let second_start = runtime2.start().await;
    assert!(second_start.is_ok(), "Failed to start runtime second time");

    // Best effort: the outcome of the final stop is intentionally ignored.
    runtime2.stop().await.ok();
}
/// Diagnostic walkthrough of one event passing through the runtime, printing counts at each step.
///
/// The only hard assertion is that exactly one enabled observer exists after
/// creation. Everything after that (change-log row count, webhook calls seen
/// after a two-second sleep, observer log rows) is printed rather than asserted.
#[tokio::test]
#[ignore = "requires PostgreSQL"]
async fn test_debug_event_processing() {
    init_test_tracing();
    let pool = create_test_pool().await;
    setup_observer_schema(&pool).await.expect("Failed to setup schema");

    // Empty the tables (logs first, then observers, then the change log).
    for (sql, failure) in [
        ("DELETE FROM tb_observer_log", "Failed to clean observer logs"),
        ("DELETE FROM tb_observer", "Failed to clean observers"),
        ("DELETE FROM core.tb_entity_change_log", "Failed to clean change log"),
    ] {
        sqlx::query(sql).execute(&pool).await.expect(failure);
    }

    let mock_server = MockWebhookServer::start().await;
    mock_server.mock_success().await;

    let webhook_url = mock_server.webhook_url();
    let _observer_id = create_test_observer(
        &pool,
        "debug-observer",
        Some("TestOrder"),
        Some("INSERT"),
        None,
        &webhook_url,
    )
    .await
    .expect("Failed to create observer");

    let (observers_in_db,): (i64,) =
        sqlx::query_as("SELECT COUNT(*) FROM tb_observer WHERE enabled = true")
            .fetch_one(&pool)
            .await
            .expect("Failed to count observers");
    println!("✓ Created observer. Count in DB: {}", observers_in_db);
    assert_eq!(observers_in_db, 1, "Observer not in database");

    let mut runtime =
        ObserverRuntime::new(ObserverRuntimeConfig::new(pool.clone()).with_poll_interval(10));
    runtime.start().await.expect("Failed to start runtime");
    println!("✓ Runtime started");

    let order_id = uuid::Uuid::new_v4();
    let order_key = order_id.to_string();
    let _change_log_id = insert_change_log_entry(
        &pool,
        "INSERT",
        "TestOrder",
        &order_key,
        serde_json::json!({"id": order_id.to_string(), "amount": 100}),
        None,
    )
    .await
    .expect("Failed to insert change log entry");
    println!("✓ Inserted change log entry");

    let (change_log_rows,): (i64,) =
        sqlx::query_as("SELECT COUNT(*) FROM core.tb_entity_change_log")
            .fetch_one(&pool)
            .await
            .expect("Failed to count entries");
    println!("✓ Change log entries in DB: {}", change_log_rows);

    // Fixed pause before sampling what the mock server has seen.
    tokio::time::sleep(Duration::from_secs(2)).await;
    let requests = mock_server.received_requests().await;
    let webhook_calls = requests.len();
    println!("✓ Webhook calls received: {}", webhook_calls);

    // A failed count query is reported as zero rather than failing the test.
    let (observer_log_rows,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM tb_observer_log")
        .fetch_one(&pool)
        .await
        .unwrap_or((0,));
    println!("✓ Observer log entries: {}", observer_log_rows);

    runtime.stop().await.expect("Failed to stop runtime");

    println!("\nDebug Results:");
    println!(" Observers in DB: {}", observers_in_db);
    println!(" Change log entries: {}", change_log_rows);
    println!(" Webhook calls: {}", webhook_calls);
    println!(" Observer logs: {}", observer_log_rows);
}
/// Creates one observer through the test helper and reads it back directly from `tb_observer`.
///
/// The row is looked up by its fixed name `load-test-observer`; its `name`,
/// `entity_type` and `event_type` columns are asserted, and its `actions` JSON
/// is pretty-printed for inspection.
///
/// The tables are emptied first, matching the sibling tests in this file. The
/// observer name is fixed rather than per-run, so without the cleanup a row
/// left by an earlier run of this test could interfere with creation or lookup.
#[tokio::test]
#[ignore = "requires PostgreSQL"]
async fn test_observer_loading() {
    init_test_tracing();
    let pool = create_test_pool().await;
    setup_observer_schema(&pool).await.expect("Failed to setup schema");

    // Empty the tables (logs first, then observers, then the change log) so a
    // stale `load-test-observer` row cannot survive into this run.
    for (sql, failure) in [
        ("DELETE FROM tb_observer_log", "Failed to clean observer logs"),
        ("DELETE FROM tb_observer", "Failed to clean observers"),
        ("DELETE FROM core.tb_entity_change_log", "Failed to clean change log"),
    ] {
        sqlx::query(sql).execute(&pool).await.expect(failure);
    }

    let mock_server = MockWebhookServer::start().await;
    mock_server.mock_success().await;

    let _observer_id = create_test_observer(
        &pool,
        "load-test-observer",
        Some("Product"),
        Some("INSERT"),
        None,
        &mock_server.webhook_url(),
    )
    .await
    .expect("Failed to create observer");

    // Read the scalar columns back.
    let observer: Option<(String, Option<String>, Option<String>)> = sqlx::query_as(
        "SELECT name, entity_type, event_type FROM tb_observer WHERE name = 'load-test-observer'",
    )
    .fetch_optional(&pool)
    .await
    .expect("Failed to query observer");
    let (name, entity_type, event_type) = observer.expect("Observer not found");

    println!("✓ Observer in DB:");
    println!(" name: {}", name);
    println!(" entity_type: {:?}", entity_type);
    println!(" event_type: {:?}", event_type);

    // The actions column is only printed, never asserted.
    let actions: Option<(serde_json::Value,)> =
        sqlx::query_as("SELECT actions FROM tb_observer WHERE name = 'load-test-observer'")
            .fetch_optional(&pool)
            .await
            .expect("Failed to query actions");
    if let Some((actions_json,)) = actions {
        println!("✓ Actions:");
        println!(" {}", serde_json::to_string_pretty(&actions_json).unwrap());
    }

    assert_eq!(name, "load-test-observer");
    assert_eq!(entity_type.as_deref(), Some("Product"));
    assert_eq!(event_type.as_deref(), Some("INSERT"));
}
/// Starts a default-configured runtime while one observer row exists and checks `start` succeeds.
///
/// The mock server is started only to provide a webhook URL for the observer;
/// no events are inserted. After a 100 ms pause the runtime is stopped and the
/// result of `stop` is discarded.
#[tokio::test]
#[ignore = "requires PostgreSQL"]
async fn test_runtime_loads_observers() {
    init_test_tracing();
    let pool = create_test_pool().await;
    setup_observer_schema(&pool).await.expect("Failed to setup schema");

    // Empty the tables (logs first, then observers, then the change log).
    for (sql, failure) in [
        ("DELETE FROM tb_observer_log", "Failed to clean observer logs"),
        ("DELETE FROM tb_observer", "Failed to clean observers"),
        ("DELETE FROM core.tb_entity_change_log", "Failed to clean change log"),
    ] {
        sqlx::query(sql).execute(&pool).await.expect(failure);
    }

    let mock_server = MockWebhookServer::start().await;
    let webhook_url = mock_server.webhook_url();
    let _observer_id = create_test_observer(
        &pool,
        "runtime-load-test",
        Some("User"),
        Some("INSERT"),
        None,
        &webhook_url,
    )
    .await
    .expect("Failed to create observer");

    let mut runtime = ObserverRuntime::new(ObserverRuntimeConfig::new(pool.clone()));

    let start_result = runtime.start().await;
    println!("✓ Runtime.start() result: {:?}", start_result);
    assert!(start_result.is_ok(), "Failed to start: {:?}", start_result);

    // Let the runtime run briefly before shutting it down.
    tokio::time::sleep(Duration::from_millis(100)).await;

    // Best effort: the outcome of stop is intentionally ignored.
    runtime.stop().await.ok();
    println!("✓ Runtime started and stopped successfully");
}
/// Inserts one change-log row and prints how it is stored, including the `op` field of `object_data`.
///
/// The tables are emptied first, so the single row read back with `LIMIT 1` is
/// the one inserted here. The first character of `op` is classified as
/// `c`/`u`/`d` (create/update/delete) or reported as unrecognized; that
/// classification is printed, not asserted.
///
/// # Panics
///
/// Panics if no row can be read back after the insert succeeded, so the test
/// can no longer pass without having inspected anything.
#[tokio::test]
#[ignore = "requires PostgreSQL"]
async fn test_debug_debezium_envelope() {
    init_test_tracing();
    let pool = create_test_pool().await;
    setup_observer_schema(&pool).await.expect("Failed to setup schema");

    // Empty the tables (logs first, then observers, then the change log).
    for (sql, failure) in [
        ("DELETE FROM tb_observer_log", "Failed to clean observer logs"),
        ("DELETE FROM tb_observer", "Failed to clean observers"),
        ("DELETE FROM core.tb_entity_change_log", "Failed to clean change log"),
    ] {
        sqlx::query(sql).execute(&pool).await.expect(failure);
    }

    let order_id = uuid::Uuid::new_v4();
    let _ = insert_change_log_entry(
        &pool,
        "INSERT",
        "Order",
        &order_id.to_string(),
        serde_json::json!({"id": order_id.to_string(), "total": 50}),
        None,
    )
    .await
    .expect("Failed to insert");

    let entry: Option<(i64, Option<i64>, String, String, String, Option<String>, chrono::DateTime<chrono::Utc>, serde_json::Value)> = sqlx::query_as(
        "SELECT pk_entity_change_log, fk_customer_org, object_type, object_id, modification_type, change_status, created_at, object_data FROM core.tb_entity_change_log LIMIT 1"
    )
    .fetch_optional(&pool)
    .await
    .expect("Query failed");

    // The insert above succeeded, so a missing row is a real failure.
    let (pk, _fk_cust, obj_type, obj_id, mod_type, change_status, _created_at, obj_data) =
        entry.expect("No change log entry found!");

    println!("✓ Change log entry found:");
    println!(" pk: {}", pk);
    println!(" object_type: {}", obj_type);
    println!(" object_id: {}", obj_id);
    println!(" modification_type: {}", mod_type);
    println!(" change_status: {:?}", change_status);
    println!(" object_data (Debezium envelope):");
    println!(" {}", serde_json::to_string_pretty(&obj_data).unwrap());

    if let Some(op_val) = obj_data.get("op") {
        println!(" ✓ op field: {:?}", op_val);
        // Only the first character of a string-valued `op` is inspected.
        if let Some(op_char) = op_val.as_str().and_then(|s| s.chars().next()) {
            println!(" ✓ op first char: '{}'", op_char);
            match op_char {
                'c' => println!(" → Recognized as CREATE"),
                'u' => println!(" → Recognized as UPDATE"),
                'd' => println!(" → Recognized as DELETE"),
                x => println!(" → UNRECOGNIZED: '{}'", x),
            }
        }
    }
}
/// Deserializes a one-element JSON array describing a webhook action into `Vec<ActionConfig>`.
///
/// On success every parsed action is printed; on failure the error is printed
/// and the test panics.
///
/// NOTE(review): nothing in this body touches the database or awaits anything,
/// yet the test is marked async and ignored as "requires PostgreSQL"; confirm
/// whether that is intentional.
#[tokio::test]
#[ignore = "requires PostgreSQL"]
async fn test_action_parsing() {
    let actions_json = serde_json::json!([
        {
            "type": "webhook",
            "url": "http://127.0.0.1:8080/webhook",
            "method": "POST",
            "headers": {
                "Content-Type": "application/json"
            }
        }
    ]);
    println!("Input JSON: {}", serde_json::to_string_pretty(&actions_json).unwrap());

    let parsed =
        serde_json::from_value::<Vec<fraiseql_observers::config::ActionConfig>>(actions_json);

    // Report the error on stdout before failing the test.
    let actions = parsed.unwrap_or_else(|e| {
        println!("✗ Failed to parse actions: {}", e);
        panic!("Action parsing failed")
    });

    println!("✓ Successfully parsed {} actions", actions.len());
    for (index, action) in actions.iter().enumerate() {
        println!(" Action {}: {:?}", index, action);
    }
}
/// Inserts one `Widget` event and polls the mock server for up to 500 ms for a webhook call.
///
/// The runtime uses a poll interval of 5. The test checks the mock every 10 ms
/// (50 attempts), printing progress on every tenth attempt. If nothing arrived
/// it prints table counts for diagnosis before the final assertion fails.
#[tokio::test]
#[ignore = "requires PostgreSQL"]
async fn test_with_longer_polling() {
    init_test_tracing();
    let pool = create_test_pool().await;
    setup_observer_schema(&pool).await.expect("Failed to setup schema");

    // Empty the tables (logs first, then observers, then the change log).
    for (sql, failure) in [
        ("DELETE FROM tb_observer_log", "Failed to clean observer logs"),
        ("DELETE FROM tb_observer", "Failed to clean observers"),
        ("DELETE FROM core.tb_entity_change_log", "Failed to clean change log"),
    ] {
        sqlx::query(sql).execute(&pool).await.expect(failure);
    }

    let mock_server = MockWebhookServer::start().await;
    mock_server.mock_success().await;

    let webhook_url = mock_server.webhook_url();
    let _observer_id = create_test_observer(
        &pool,
        "long-poll-test",
        Some("Widget"),
        Some("INSERT"),
        None,
        &webhook_url,
    )
    .await
    .expect("Failed to create observer");

    let mut runtime =
        ObserverRuntime::new(ObserverRuntimeConfig::new(pool.clone()).with_poll_interval(5));
    runtime.start().await.expect("Failed to start runtime");
    println!("Runtime initialized");

    let widget_id = uuid::Uuid::new_v4();
    let widget_key = widget_id.to_string();
    println!("Inserting change log entry...");
    let _ = insert_change_log_entry(
        &pool,
        "INSERT",
        "Widget",
        &widget_key,
        serde_json::json!({"id": widget_id.to_string(), "name": "Test Widget"}),
        None,
    )
    .await
    .expect("Failed to insert");
    println!("Change log entry inserted");

    println!("Waiting for event processing...");
    for attempt in 0..50 {
        tokio::time::sleep(Duration::from_millis(10)).await;
        let elapsed_ms = (attempt + 1) * 10;

        let seen = mock_server.received_requests().await;
        if !seen.is_empty() {
            println!("✓ Webhook called after {} ms", elapsed_ms);
            break;
        }
        if attempt % 10 == 0 {
            println!(" Still waiting... ({} ms elapsed)", elapsed_ms);
        }
    }

    // Take the final reading after the polling loop has finished.
    let requests = mock_server.received_requests().await;
    println!("Final webhook calls: {}", requests.len());
    println!("Expected: 1");

    // Best effort: the outcome of stop is intentionally ignored.
    runtime.stop().await.ok();

    let nothing_received = requests.is_empty();
    if nothing_received {
        // Dump table counts to help explain the failure; a failed count query reads as 0.
        println!("\nDEBUG: Checking database state...");

        let (change_log_rows,): (i64,) =
            sqlx::query_as("SELECT COUNT(*) FROM core.tb_entity_change_log")
                .fetch_one(&pool)
                .await
                .unwrap_or((0,));
        println!(" Change log entries: {}", change_log_rows);

        let (enabled_observers,): (i64,) =
            sqlx::query_as("SELECT COUNT(*) FROM tb_observer WHERE enabled = true")
                .fetch_one(&pool)
                .await
                .unwrap_or((0,));
        println!(" Observers enabled: {}", enabled_observers);

        let (observer_log_rows,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM tb_observer_log")
            .fetch_one(&pool)
            .await
            .unwrap_or((0,));
        println!(" Observer logs: {}", observer_log_rows);
    }

    assert!(!nothing_received, "No webhook calls received after 500ms");
}
/// Drives `ChangeLogListener` directly, without an `ObserverRuntime`.
///
/// One `Product` change-log row is inserted, a listener is built with a poll
/// interval of 10, and a single `next_batch()` call must return a non-empty
/// batch. Every returned entry must convert via `to_entity_event()`; any
/// listener or conversion error panics.
#[tokio::test]
#[ignore = "requires PostgreSQL"]
async fn test_listener_direct() {
    init_test_tracing();
    let pool = create_test_pool().await;
    setup_observer_schema(&pool).await.expect("Failed to setup schema");

    // Empty the tables (logs first, then observers, then the change log).
    for (sql, failure) in [
        ("DELETE FROM tb_observer_log", "Failed to clean observer logs"),
        ("DELETE FROM tb_observer", "Failed to clean observers"),
        ("DELETE FROM core.tb_entity_change_log", "Failed to clean change log"),
    ] {
        sqlx::query(sql).execute(&pool).await.expect(failure);
    }

    let product_id = uuid::Uuid::new_v4();
    let product_key = product_id.to_string();
    insert_change_log_entry(
        &pool,
        "INSERT",
        "Product",
        &product_key,
        serde_json::json!({"id": product_id.to_string(), "name": "Test"}),
        None,
    )
    .await
    .expect("Failed to insert");

    let listener_config =
        fraiseql_observers::listener::change_log::ChangeLogListenerConfig::new(pool.clone())
            .with_poll_interval(10);
    let mut listener =
        fraiseql_observers::listener::change_log::ChangeLogListener::new(listener_config);

    println!("Calling listener.next_batch()...");
    let entries = match listener.next_batch().await {
        Ok(batch) => batch,
        Err(e) => panic!("Listener failed: {}", e),
    };

    println!("✓ Got {} entries from listener", entries.len());
    assert!(!entries.is_empty(), "Listener should have found entries");

    for entry in entries {
        println!(
            " Entry: pk={}, object_type={}, op={:?}",
            entry.id,
            entry.object_type,
            entry.object_data.get("op")
        );

        // A conversion failure is printed first, then fails the test.
        match entry.to_entity_event() {
            Err(e) => {
                println!(" ✗ Failed to convert: {}", e);
                panic!("Failed to convert: {}", e);
            },
            Ok(event) => {
                println!(" ✓ Converted to EntityEvent: {:?}", event.event_type);
            },
        }
    }
}