#![cfg(feature = "observers")]
mod observer_test_helpers;
use std::time::Duration;
use observer_test_helpers::*;
use uuid::Uuid;
#[tokio::test]
#[ignore = "requires PostgreSQL"]
async fn test_observer_happy_path_insert_webhook() {
let test_id = Uuid::new_v4().to_string();
let pool = create_test_pool().await;
setup_observer_schema(&pool).await.expect("Failed to setup schema");
let mock_server = MockWebhookServer::start().await;
mock_server.mock_success().await;
let _observer_id = create_test_observer(
&pool,
&format!("test-observer-{}", test_id),
Some("Order"),
Some("INSERT"),
None, &mock_server.webhook_url(),
)
.await
.expect("Failed to create observer");
let order_id = Uuid::new_v4();
let order_data = serde_json::json!({
"id": order_id.to_string(),
"status": "pending",
"amount": 100.0,
"customer": "test"
});
let _change_log_id = insert_change_log_entry(
&pool,
"INSERT",
&format!("Order_{}", test_id),
&order_id.to_string(),
order_data.clone(),
None,
)
.await
.expect("Failed to insert change log entry");
let timeout = Duration::from_secs(10);
wait_for_webhook(&mock_server, 1, timeout).await;
let requests = mock_server.received_requests().await;
assert_eq!(requests.len(), 1, "Expected exactly 1 webhook call, got {}", requests.len());
let webhook_payload = &requests[0];
assert_webhook_payload(webhook_payload, &order_id.to_string(), Some(("status", "pending")));
assert_observer_log(
&pool,
&order_id.to_string(),
"success",
Some(1), )
.await;
let success_count = get_observer_log_count(&pool, "success")
.await
.expect("Failed to query observer logs");
assert_eq!(success_count, 1, "Expected 1 success log entry");
cleanup_test_data(&pool, &test_id).await.expect("Failed to cleanup");
}
#[tokio::test]
#[ignore = "requires PostgreSQL"]
async fn test_observer_conditional_execution() {
let test_id = Uuid::new_v4().to_string();
let pool = create_test_pool().await;
setup_observer_schema(&pool).await.expect("Failed to setup schema");
let mock_server = MockWebhookServer::start().await;
mock_server.mock_success().await;
let _observer_id = create_test_observer(
&pool,
&format!("test-conditional-{}", test_id),
Some("Order"),
Some("UPDATE"),
Some("status == 'shipped'"), &mock_server.webhook_url(),
)
.await
.expect("Failed to create observer");
let order_id_1 = Uuid::new_v4();
let _ = insert_change_log_entry(
&pool,
"UPDATE",
&format!("Order_{}", test_id),
&order_id_1.to_string(),
serde_json::json!({"id": order_id_1.to_string(), "status": "pending"}),
Some(serde_json::json!({"id": order_id_1.to_string(), "status": "created"})),
)
.await
.expect("Failed to insert change log entry");
tokio::time::sleep(Duration::from_millis(500)).await;
assert_eq!(
mock_server.request_count().await,
0,
"Webhook should not fire for status='pending'"
);
let order_id_2 = Uuid::new_v4();
let _ = insert_change_log_entry(
&pool,
"UPDATE",
&format!("Order_{}", test_id),
&order_id_2.to_string(),
serde_json::json!({"id": order_id_2.to_string(), "status": "shipped"}),
Some(serde_json::json!({"id": order_id_2.to_string(), "status": "pending"})),
)
.await
.expect("Failed to insert change log entry");
wait_for_webhook(&mock_server, 1, Duration::from_secs(10)).await;
let requests = mock_server.received_requests().await;
assert_eq!(requests.len(), 1, "Expected 1 webhook call for shipped status");
assert_eq!(requests[0]["after"]["status"], "shipped");
cleanup_test_data(&pool, &test_id).await.expect("Failed to cleanup");
}
#[tokio::test]
#[ignore = "requires PostgreSQL"]
async fn test_multiple_observers_single_event() {
let test_id = Uuid::new_v4().to_string();
let pool = create_test_pool().await;
setup_observer_schema(&pool).await.expect("Failed to setup schema");
let mock_server_1 = MockWebhookServer::start().await;
let mock_server_2 = MockWebhookServer::start().await;
mock_server_1.mock_success().await;
mock_server_2.mock_success().await;
let _observer_id_1 = create_test_observer(
&pool,
&format!("test-multi-1-{}", test_id),
Some("Order"),
Some("INSERT"),
None,
&mock_server_1.webhook_url(),
)
.await
.expect("Failed to create observer 1");
let _observer_id_2 = create_test_observer(
&pool,
&format!("test-multi-2-{}", test_id),
Some("Order"),
Some("INSERT"),
None,
&mock_server_2.webhook_url(),
)
.await
.expect("Failed to create observer 2");
let order_id = Uuid::new_v4();
let _ = insert_change_log_entry(
&pool,
"INSERT",
&format!("Order_{}", test_id),
&order_id.to_string(),
serde_json::json!({"id": order_id.to_string(), "status": "new"}),
None,
)
.await
.expect("Failed to insert change log entry");
wait_for_webhook(&mock_server_1, 1, Duration::from_secs(10)).await;
wait_for_webhook(&mock_server_2, 1, Duration::from_secs(10)).await;
assert_eq!(mock_server_1.request_count().await, 1, "Observer 1 should fire once");
assert_eq!(mock_server_2.request_count().await, 1, "Observer 2 should fire once");
let success_count = get_observer_log_count(&pool, "success")
.await
.expect("Failed to query observer logs");
assert_eq!(success_count, 2, "Expected 2 success log entries");
cleanup_test_data(&pool, &test_id).await.expect("Failed to cleanup");
}
#[tokio::test]
#[ignore = "requires PostgreSQL"]
async fn test_observer_retry_exponential_backoff() {
let test_id = Uuid::new_v4().to_string();
let pool = create_test_pool().await;
setup_observer_schema(&pool).await.expect("Failed to setup schema");
let mock_server = MockWebhookServer::start().await;
mock_server.mock_transient_failure(2).await;
let _observer_id = create_test_observer(
&pool,
&format!("test-retry-{}", test_id),
Some("Order"),
Some("INSERT"),
None,
&mock_server.webhook_url(),
)
.await
.expect("Failed to create observer");
let order_id = Uuid::new_v4();
let _ = insert_change_log_entry(
&pool,
"INSERT",
&format!("Order_{}", test_id),
&order_id.to_string(),
serde_json::json!({"id": order_id.to_string()}),
None,
)
.await
.expect("Failed to insert change log entry");
wait_for_webhook(&mock_server, 1, Duration::from_secs(15)).await;
let logs = get_observer_logs_for_entity(&pool, &order_id.to_string())
.await
.expect("Failed to fetch observer logs");
assert!(!logs.is_empty(), "Expected observer log entries for entity {}", order_id);
let final_status = &logs.last().expect("Should have at least one log").0;
assert_eq!(final_status, "success", "Final status should be success");
cleanup_test_data(&pool, &test_id).await.expect("Failed to cleanup");
}
#[tokio::test]
#[ignore = "requires PostgreSQL"]
async fn test_observer_dlq_permanent_failure() {
let test_id = Uuid::new_v4().to_string();
let pool = create_test_pool().await;
setup_observer_schema(&pool).await.expect("Failed to setup schema");
let mock_server = MockWebhookServer::start().await;
mock_server.mock_failure(500).await;
let _observer_id = create_test_observer(
&pool,
&format!("test-dlq-{}", test_id),
Some("Order"),
Some("INSERT"),
None,
&mock_server.webhook_url(),
)
.await
.expect("Failed to create observer");
let order_id = Uuid::new_v4();
let _ = insert_change_log_entry(
&pool,
"INSERT",
&format!("Order_{}", test_id),
&order_id.to_string(),
serde_json::json!({"id": order_id.to_string()}),
None,
)
.await
.expect("Failed to insert change log entry");
tokio::time::sleep(Duration::from_secs(10)).await;
let failed_count = get_observer_log_count(&pool, "failed")
.await
.expect("Failed to query observer logs");
assert!(failed_count >= 1, "Expected at least 1 failed attempt, got {}", failed_count);
let success_count = get_observer_log_count(&pool, "success")
.await
.expect("Failed to query observer logs");
assert_eq!(success_count, 0, "Expected 0 success entries for permanent failure");
let webhook_calls = mock_server.request_count().await;
assert!(
webhook_calls > 1,
"Expected multiple webhook calls due to retries, got {}",
webhook_calls
);
cleanup_test_data(&pool, &test_id).await.expect("Failed to cleanup");
}
#[tokio::test]
#[ignore = "requires PostgreSQL"]
async fn test_multiple_event_types_same_entity() {
let test_id = Uuid::new_v4().to_string();
let pool = create_test_pool().await;
setup_observer_schema(&pool).await.expect("Failed to setup schema");
let mock_server = MockWebhookServer::start().await;
mock_server.mock_success().await;
let _insert_observer = create_test_observer(
&pool,
&format!("test-insert-{}", test_id),
Some("Order"),
Some("INSERT"),
None,
&mock_server.webhook_url(),
)
.await
.expect("Failed to create insert observer");
let _update_observer = create_test_observer(
&pool,
&format!("test-update-{}", test_id),
Some("Order"),
Some("UPDATE"),
None,
&mock_server.webhook_url(),
)
.await
.expect("Failed to create update observer");
let order_id = Uuid::new_v4();
let entity_type = format!("Order_{}", test_id);
let _ = insert_change_log_entry(
&pool,
"INSERT",
&entity_type,
&order_id.to_string(),
serde_json::json!({"id": order_id.to_string(), "status": "created"}),
None,
)
.await
.expect("Failed to insert INSERT event");
wait_for_webhook(&mock_server, 1, Duration::from_secs(10)).await;
let _ = insert_change_log_entry(
&pool,
"UPDATE",
&entity_type,
&order_id.to_string(),
serde_json::json!({"id": order_id.to_string(), "status": "shipped"}),
Some(serde_json::json!({"id": order_id.to_string(), "status": "created"})),
)
.await
.expect("Failed to insert UPDATE event");
wait_for_webhook(&mock_server, 2, Duration::from_secs(10)).await;
let calls = mock_server.request_count().await;
assert_eq!(calls, 2, "Expected 2 webhook calls (1 INSERT + 1 UPDATE), got {}", calls);
cleanup_test_data(&pool, &test_id).await.expect("Failed to cleanup");
}
#[tokio::test]
#[ignore = "requires PostgreSQL"]
async fn test_batch_processing() {
let test_id = Uuid::new_v4().to_string();
let pool = create_test_pool().await;
setup_observer_schema(&pool).await.expect("Failed to setup schema");
let mock_server = MockWebhookServer::start().await;
mock_server.mock_success().await;
let _observer_id = create_test_observer(
&pool,
&format!("test-batch-{}", test_id),
Some("Order"),
Some("INSERT"),
None,
&mock_server.webhook_url(),
)
.await
.expect("Failed to create observer");
let event_count = 10;
for i in 0..event_count {
let order_id = Uuid::new_v4();
let _ = insert_change_log_entry(
&pool,
"INSERT",
&format!("Order_{}", test_id),
&order_id.to_string(),
serde_json::json!({"id": order_id.to_string(), "sequence": i}),
None,
)
.await
.expect("Failed to insert change log entry");
}
wait_for_webhook(&mock_server, event_count, Duration::from_secs(30)).await;
let calls = mock_server.request_count().await;
assert_eq!(calls, event_count, "Expected {} webhook calls, got {}", event_count, calls);
cleanup_test_data(&pool, &test_id).await.expect("Failed to cleanup");
}
#[tokio::test]
#[ignore = "requires PostgreSQL - performance benchmark"]
async fn benchmark_observer_latency() {
let test_id = Uuid::new_v4().to_string();
let pool = create_test_pool().await;
setup_observer_schema(&pool).await.expect("Failed to setup schema");
let mock_server = MockWebhookServer::start().await;
mock_server.mock_success().await;
let _observer_id = create_test_observer(
&pool,
&format!("bench-latency-{}", test_id),
Some("Order"),
Some("INSERT"),
None,
&mock_server.webhook_url(),
)
.await
.expect("Failed to create observer");
let mut latencies = Vec::new();
for _ in 0..20 {
let order_id = Uuid::new_v4();
let start = tokio::time::Instant::now();
let _ = insert_change_log_entry(
&pool,
"INSERT",
&format!("Order_{}", test_id),
&order_id.to_string(),
serde_json::json!({"id": order_id.to_string()}),
None,
)
.await
.expect("Failed to insert change log entry");
let poll_timeout = Duration::from_secs(10);
let poll_start = tokio::time::Instant::now();
let expected_count = latencies.len() + 1;
while mock_server.request_count().await < expected_count
&& poll_start.elapsed() < poll_timeout
{
tokio::time::sleep(Duration::from_millis(10)).await;
}
let latency = start.elapsed();
latencies.push(latency);
tokio::time::sleep(Duration::from_millis(50)).await;
}
latencies.sort();
let p50_idx = latencies.len() / 2;
let p95_idx = (latencies.len() * 95) / 100;
let p99_idx = (latencies.len() * 99) / 100;
let p50 = latencies[p50_idx];
let p95 = latencies.get(p95_idx).copied().unwrap_or(p50);
let p99 = latencies.get(p99_idx).copied().unwrap_or(p95);
println!("\n=== Observer Latency Benchmark ===");
println!("p50: {:?} ({:.1}ms)", p50, p50.as_millis());
println!("p95: {:?} ({:.1}ms)", p95, p95.as_millis());
println!("p99: {:?} ({:.1}ms)", p99, p99.as_millis());
println!("Min: {:?}", latencies.first());
println!("Max: {:?}", latencies.last());
assert!(
p99 < Duration::from_millis(500),
"p99 latency {} exceeds 500ms threshold",
p99.as_millis()
);
cleanup_test_data(&pool, &test_id).await.expect("Failed to cleanup");
}