#![allow(dead_code)] #![allow(clippy::unwrap_used)] #![allow(clippy::cast_precision_loss)] #![allow(clippy::cast_sign_loss)] #![allow(clippy::cast_possible_truncation)] #![allow(clippy::cast_possible_wrap)] #![allow(clippy::cast_lossless)] #![allow(clippy::missing_panics_doc)] #![allow(clippy::missing_errors_doc)] #![allow(missing_docs)] #![allow(clippy::items_after_statements)] #![allow(clippy::used_underscore_binding)] #![allow(clippy::needless_pass_by_value)] #![allow(clippy::match_same_arms)] #![allow(clippy::branches_sharing_code)] #![allow(clippy::undocumented_unsafe_blocks)]
use std::{sync::Arc, time::Duration};
use fraiseql_test_utils::database_url;
use serde_json::json;
use sqlx::PgPool;
use tokio::sync::Mutex;
use wiremock::{
Mock, MockServer, ResponseTemplate,
matchers::{method, path},
};
/// Connect to the test database with a small, bounded connection pool.
///
/// # Panics
/// Panics when the database at `database_url()` cannot be reached.
pub async fn create_test_pool() -> PgPool {
    let options = sqlx::postgres::PgPoolOptions::new().max_connections(5);
    options
        .connect(&database_url())
        .await
        .expect("Failed to connect to test database")
}
/// Create the schema and tables the observer integration tests rely on.
///
/// The change-log table is dropped and recreated so every run starts from a
/// clean slate; all other tables are created only when missing.
///
/// # Errors
/// Returns the first error raised while executing the DDL statements.
pub async fn setup_observer_schema(pool: &PgPool) -> Result<(), sqlx::Error> {
    // DDL statements executed strictly in order: the DROP must precede the
    // change-log CREATE so that table is always rebuilt fresh.
    let ddl: [&str; 6] = [
        "CREATE SCHEMA IF NOT EXISTS core",
        "DROP TABLE IF EXISTS core.tb_entity_change_log CASCADE",
        r"
        CREATE TABLE IF NOT EXISTS core.tb_entity_change_log (
            pk_entity_change_log BIGSERIAL PRIMARY KEY,
            id UUID NOT NULL DEFAULT gen_random_uuid(),
            fk_customer_org TEXT,
            fk_contact TEXT,
            object_type TEXT NOT NULL,
            object_id TEXT NOT NULL,
            modification_type TEXT NOT NULL,
            change_status TEXT,
            object_data JSONB NOT NULL,
            extra_metadata JSONB,
            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
        )
        ",
        // r#...# needed: the retry_config default embeds double quotes.
        r#"
        CREATE TABLE IF NOT EXISTS tb_observer (
            pk_observer BIGSERIAL PRIMARY KEY,
            id UUID NOT NULL DEFAULT gen_random_uuid() UNIQUE,
            name VARCHAR(255) NOT NULL,
            description TEXT,
            entity_type VARCHAR(255),
            event_type VARCHAR(50),
            condition_expression TEXT,
            actions JSONB NOT NULL DEFAULT '[]',
            enabled BOOLEAN NOT NULL DEFAULT true,
            priority INTEGER NOT NULL DEFAULT 100,
            retry_config JSONB NOT NULL DEFAULT '{"max_attempts": 3, "backoff": "exponential", "initial_delay_ms": 100}',
            timeout_ms INTEGER NOT NULL DEFAULT 30000,
            fk_customer_org BIGINT,
            created_by VARCHAR(255),
            updated_by VARCHAR(255),
            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
            updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
            deleted_at TIMESTAMPTZ
        )
        "#,
        r"
        CREATE TABLE IF NOT EXISTS tb_observer_log (
            pk_observer_log BIGSERIAL PRIMARY KEY,
            id UUID NOT NULL DEFAULT gen_random_uuid() UNIQUE,
            fk_observer BIGINT NOT NULL REFERENCES tb_observer(pk_observer),
            fk_entity_change_log BIGINT,
            event_id UUID NOT NULL,
            entity_type VARCHAR(255) NOT NULL,
            entity_id VARCHAR(255) NOT NULL,
            event_type VARCHAR(50) NOT NULL,
            status VARCHAR(50) NOT NULL,
            action_index INTEGER,
            action_type VARCHAR(50),
            started_at TIMESTAMPTZ,
            completed_at TIMESTAMPTZ,
            duration_ms INTEGER,
            error_code VARCHAR(100),
            error_message TEXT,
            attempt_number INTEGER NOT NULL DEFAULT 1,
            max_attempts INTEGER NOT NULL DEFAULT 3,
            trace_id VARCHAR(64),
            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
        )
        ",
        r"
        CREATE TABLE IF NOT EXISTS observer_checkpoints (
            listener_id VARCHAR(255) PRIMARY KEY,
            last_processed_id BIGINT NOT NULL DEFAULT 0,
            last_processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
            batch_size INT NOT NULL DEFAULT 100,
            event_count INT NOT NULL DEFAULT 0,
            consecutive_errors INT NOT NULL DEFAULT 0,
            last_error TEXT,
            updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
        )
        ",
    ];
    for statement in ddl {
        sqlx::query(statement).execute(pool).await?;
    }
    Ok(())
}
/// Best-effort removal of rows created by a single test, matched by `test_id`.
///
/// Child rows are deleted before their parents. Individual failures are
/// deliberately ignored (`.ok()`): cleanup must never fail a test, e.g. when
/// a table does not exist yet.
///
/// # Errors
/// Currently always returns `Ok(())`; the `Result` is kept for call-site
/// uniformity with the other helpers.
pub async fn cleanup_test_data(pool: &PgPool, test_id: &str) -> Result<(), sqlx::Error> {
    let pattern = format!("%{test_id}%");
    let deletes = [
        "DELETE FROM tb_observer_log WHERE event_id::text LIKE $1",
        "DELETE FROM tb_observer WHERE name LIKE $1",
        "DELETE FROM core.tb_entity_change_log WHERE object_type LIKE $1",
        "DELETE FROM observer_checkpoints WHERE listener_id LIKE $1",
    ];
    for sql in deletes {
        // Swallow errors on purpose; see the doc comment above.
        sqlx::query(sql).bind(&pattern).execute(pool).await.ok();
    }
    Ok(())
}
/// A wiremock-backed HTTP server standing in for webhook targets in tests.
pub struct MockWebhookServer {
    /// Underlying wiremock server; public so tests can mount custom mocks.
    pub server: MockServer,
    // JSON bodies of requests the success-path responders handled; shared
    // with the responder closures via Arc.
    requests: Arc<Mutex<Vec<serde_json::Value>>>,
}
impl MockWebhookServer {
    /// Start a fresh mock server with an empty request recorder.
    pub async fn start() -> Self {
        let server = MockServer::start().await;
        let requests = Arc::new(Mutex::new(Vec::new()));
        Self { server, requests }
    }
    /// Mount a responder on `POST /webhook` that always returns 200 with a
    /// `{"status": "success"}` body and records each request's JSON body.
    ///
    /// NOTE(review): the responder closure is synchronous, so it uses
    /// `try_lock`; if the lock is contended the body is silently dropped
    /// (the request still succeeds), so `request_count` may undercount
    /// under heavy concurrency — confirm this is acceptable for the tests.
    pub async fn mock_success(&self) {
        let requests = Arc::clone(&self.requests);
        Mock::given(method("POST"))
            .and(path("/webhook"))
            .respond_with(move |req: &wiremock::Request| {
                // Non-JSON bodies are recorded as an empty object.
                let body: serde_json::Value =
                    serde_json::from_slice(&req.body).unwrap_or_else(|_| serde_json::json!({}));
                if let Ok(mut reqs) = requests.try_lock() {
                    reqs.push(body);
                }
                ResponseTemplate::new(200).set_body_json(serde_json::json!({"status": "success"}))
            })
            .mount(&self.server)
            .await;
    }
    /// Mount a responder on `POST /webhook` that always answers with
    /// `status_code`. Request bodies are NOT recorded, so failed deliveries
    /// do not show up in `request_count`.
    pub async fn mock_failure(&self, status_code: u16) {
        Mock::given(method("POST"))
            .and(path("/webhook"))
            .respond_with(ResponseTemplate::new(status_code))
            .mount(&self.server)
            .await;
    }
    /// Mount a responder on `POST /webhook` that returns 500 for the first
    /// `fail_count` requests, then 200 (recording bodies) afterwards —
    /// useful for exercising retry logic.
    ///
    /// NOTE(review): the counter is taken with `try_lock().expect(...)`, so
    /// concurrent requests could panic instead of counting; fine while each
    /// retry arrives sequentially — verify against the retry scheduler.
    pub async fn mock_transient_failure(&self, fail_count: usize) {
        let counter = Arc::new(Mutex::new(0));
        let requests = Arc::clone(&self.requests);
        Mock::given(method("POST"))
            .and(path("/webhook"))
            .respond_with(move |req: &wiremock::Request| {
                let mut count = counter.try_lock().expect("Counter lock failed");
                *count += 1;
                let current_count = *count;
                // Release the counter before doing any further work.
                drop(count);
                if current_count <= fail_count {
                    ResponseTemplate::new(500)
                } else {
                    let body: serde_json::Value =
                        serde_json::from_slice(&req.body).unwrap_or_else(|_| serde_json::json!({}));
                    if let Ok(mut reqs) = requests.try_lock() {
                        reqs.push(body);
                    }
                    ResponseTemplate::new(200)
                }
            })
            .mount(&self.server)
            .await;
    }
    /// The full URL of the mocked webhook endpoint.
    pub fn webhook_url(&self) -> String {
        format!("{}/webhook", self.server.uri())
    }
    /// Snapshot of every recorded (successfully handled) request body.
    pub async fn received_requests(&self) -> Vec<serde_json::Value> {
        self.requests.lock().await.clone()
    }
    /// Number of recorded (successfully handled) requests.
    pub async fn request_count(&self) -> usize {
        self.requests.lock().await.len()
    }
    /// Like `mock_success`, but delays the 200 response by `delay` —
    /// useful for exercising timeout handling.
    pub async fn mock_delayed_response(&self, delay: Duration) {
        let requests = Arc::clone(&self.requests);
        Mock::given(method("POST"))
            .and(path("/webhook"))
            .respond_with(move |req: &wiremock::Request| {
                let body: serde_json::Value =
                    serde_json::from_slice(&req.body).unwrap_or_else(|_| serde_json::json!({}));
                if let Ok(mut reqs) = requests.try_lock() {
                    reqs.push(body);
                }
                ResponseTemplate::new(200)
                    .set_delay(delay)
                    .set_body_json(serde_json::json!({"status": "success"}))
            })
            .mount(&self.server)
            .await;
    }
    /// Clear all recorded request bodies (mounted mocks stay in place).
    pub async fn reset(&self) {
        let mut reqs = self.requests.lock().await;
        reqs.clear();
    }
}
/// Insert an enabled observer row wired to a single webhook action and
/// return its `pk_observer`.
///
/// # Errors
/// Propagates any database error from the INSERT.
pub async fn create_test_observer(
    pool: &PgPool,
    name: &str,
    entity_type: Option<&str>,
    event_type: Option<&str>,
    condition: Option<&str>,
    webhook_url: &str,
) -> Result<i64, sqlx::Error> {
    // One webhook action posting JSON to the supplied URL.
    let actions = json!([{
        "type": "webhook",
        "url": webhook_url,
        "headers": {
            "Content-Type": "application/json"
        }
    }]);
    // Exponential backoff: 3 attempts, 100ms initial delay, 5s cap.
    let retry_config = json!({
        "max_attempts": 3,
        "backoff": "exponential",
        "initial_delay_ms": 100,
        "max_delay_ms": 5000
    });
    let insert = r"
        INSERT INTO tb_observer (
            name, entity_type, event_type, condition_expression,
            actions, retry_config, enabled
        )
        VALUES ($1, $2, $3, $4, $5, $6, true)
        RETURNING pk_observer
    ";
    let (pk,): (i64,) = sqlx::query_as(insert)
        .bind(name)
        .bind(entity_type)
        .bind(event_type)
        .bind(condition)
        .bind(actions)
        .bind(retry_config)
        .fetch_one(pool)
        .await?;
    Ok(pk)
}
/// Insert a row into `core.tb_entity_change_log` with a Debezium-style
/// envelope in `object_data`, returning the new `pk_entity_change_log`.
///
/// `event_type` is matched case-insensitively; anything unrecognised falls
/// back to the "c" (create) op code.
///
/// # Errors
/// Propagates any database error from the INSERT.
pub async fn insert_change_log_entry(
    pool: &PgPool,
    event_type: &str,
    entity_type: &str,
    entity_id: &str,
    data: serde_json::Value,
    before_data: Option<serde_json::Value>,
) -> Result<i64, sqlx::Error> {
    // Map the caller's event type onto a single-letter op code; the
    // INSERT/"C" arms and the catch-all both yield "c", so they collapse.
    let op_code = match event_type.to_uppercase().as_str() {
        "UPDATE" | "U" => "u",
        "DELETE" | "D" => "d",
        _ => "c",
    };
    let object_data = json!({
        "op": op_code,
        "before": before_data,
        "after": data,
        "source": {
            "db": "fraiseql_test",
            "table": entity_type
        },
        "ts_ms": chrono::Utc::now().timestamp_millis()
    });
    let insert = r"
        INSERT INTO core.tb_entity_change_log (
            object_type, object_id, modification_type, object_data
        )
        VALUES ($1, $2, $3, $4)
        RETURNING pk_entity_change_log
    ";
    let (pk,): (i64,) = sqlx::query_as(insert)
        .bind(entity_type)
        .bind(entity_id)
        .bind(event_type)
        .bind(object_data)
        .fetch_one(pool)
        .await?;
    Ok(pk)
}
/// Poll the mock server until it has recorded at least `expected_count`
/// requests, giving up after `timeout`.
///
/// # Panics
/// Panics with the observed request count when the deadline passes before
/// the expected count is reached.
pub async fn wait_for_webhook(
    server: &MockWebhookServer,
    expected_count: usize,
    timeout: Duration,
) {
    let start = tokio::time::Instant::now();
    loop {
        // Check the success condition BEFORE the deadline: previously the
        // timeout assertion ran first, so a count reached during the final
        // 100ms sleep (at or just past the deadline) panicked spuriously.
        if server.request_count().await >= expected_count {
            return;
        }
        assert!(
            start.elapsed() <= timeout,
            "Timeout waiting for {} webhook calls. Got: {}",
            expected_count,
            server.request_count().await
        );
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}
/// Assert that the most recent `tb_observer_log` row for `entity_id` has the
/// expected status (and, optionally, attempt count) plus a positive duration.
///
/// # Panics
/// Panics when no log row exists, when the status or attempt count differ,
/// or when `duration_ms` is missing or not positive.
///
/// NOTE(review): a sub-millisecond action would record `duration_ms = 0` and
/// fail the final assertion — confirm durations are always >= 1ms in practice.
pub async fn assert_observer_log(
    pool: &PgPool,
    entity_id: &str,
    expected_status: &str,
    expected_attempts: Option<i32>,
) {
    let query = r"
        SELECT status, attempt_number, duration_ms
        FROM tb_observer_log
        WHERE entity_id = $1
        ORDER BY created_at DESC
        LIMIT 1
    ";
    let latest: Option<(String, i32, Option<i32>)> = sqlx::query_as(query)
        .bind(entity_id)
        .fetch_optional(pool)
        .await
        .unwrap();
    let Some((status, attempts, duration)) = latest else {
        panic!("No observer log entry found for entity {entity_id}");
    };
    assert_eq!(status, expected_status, "Expected status {}, got {}", expected_status, status);
    if let Some(expected) = expected_attempts {
        assert_eq!(attempts, expected, "Expected {} attempts, got {}", expected, attempts);
    }
    assert!(duration.is_some_and(|d| d > 0), "Duration should be positive");
}
/// Assert that a webhook payload carries the expected entity id under
/// `after.id`, and optionally that a named `after` field has a given value.
///
/// # Panics
/// Panics when `after.id` is missing, differs from `expected_entity_id`, or
/// when the optional field check fails (including a missing/non-string field).
pub fn assert_webhook_payload(
    payload: &serde_json::Value,
    expected_entity_id: &str,
    expected_field_value: Option<(&str, &str)>,
) {
    let id = payload["after"]["id"].as_str();
    assert!(id.is_some(), "Webhook payload missing after.id");
    assert_eq!(id.unwrap(), expected_entity_id);
    if let Some((field, value)) = expected_field_value {
        let actual = payload["after"][field].as_str().unwrap();
        assert_eq!(actual, value, "Field {} mismatch", field);
    }
}
/// Count the `tb_observer_log` rows currently carrying `status`.
///
/// # Errors
/// Propagates any database error from the query.
pub async fn get_observer_log_count(pool: &PgPool, status: &str) -> Result<i64, sqlx::Error> {
    sqlx::query_as::<_, (i64,)>("SELECT COUNT(*) FROM tb_observer_log WHERE status = $1")
        .bind(status)
        .fetch_one(pool)
        .await
        .map(|(count,)| count)
}
/// Fetch `(status, attempt_number, duration_ms)` for every log row of the
/// given entity, ordered by attempt number.
///
/// # Errors
/// Propagates any database error from the query.
pub async fn get_observer_logs_for_entity(
    pool: &PgPool,
    entity_id: &str,
) -> Result<Vec<(String, i32, Option<i32>)>, sqlx::Error> {
    let query = r"
        SELECT status, attempt_number, duration_ms
        FROM tb_observer_log
        WHERE entity_id = $1
        ORDER BY attempt_number ASC
    ";
    sqlx::query_as(query).bind(entity_id).fetch_all(pool).await
}
/// Return whether a checkpoint row exists for `listener_id`.
///
/// Uses `SELECT EXISTS(...)`, which short-circuits on the first match and
/// always yields exactly one row. The previous `COUNT(*)` + `fetch_optional`
/// combination was misleading: an aggregate always returns a row, so the
/// `Option` branch could never be `None`.
///
/// # Errors
/// Propagates any database error from the query.
pub async fn check_checkpoint_exists(
    pool: &PgPool,
    listener_id: &str,
) -> Result<bool, sqlx::Error> {
    let (exists,): (bool,) = sqlx::query_as(
        "SELECT EXISTS(SELECT 1 FROM observer_checkpoints WHERE listener_id = $1)",
    )
    .bind(listener_id)
    .fetch_one(pool)
    .await?;
    Ok(exists)
}
/// Fetch `last_processed_id` for a listener's checkpoint, defaulting to 0
/// when no checkpoint row exists.
///
/// # Errors
/// Propagates any database error from the query.
pub async fn get_checkpoint_value(pool: &PgPool, listener_id: &str) -> Result<i64, sqlx::Error> {
    let sql = "SELECT last_processed_id FROM observer_checkpoints WHERE listener_id = $1";
    let row: Option<(i64,)> = sqlx::query_as(sql).bind(listener_id).fetch_optional(pool).await?;
    match row {
        Some((id,)) => Ok(id),
        None => Ok(0),
    }
}
/// Poll `tb_observer_log` until at least `expected_count` rows with
/// `expected_status` exist, giving up after `timeout`.
///
/// Transient query errors are ignored and retried on the next iteration.
///
/// # Panics
/// Panics with the last observed count when the deadline passes before the
/// expected count is reached.
pub async fn wait_for_runtime_events(
    pool: &PgPool,
    expected_status: &str,
    expected_count: i64,
    timeout: Duration,
) {
    let start = tokio::time::Instant::now();
    loop {
        // Check the success condition BEFORE the deadline: previously the
        // timeout assertion ran first, so a count reached during the final
        // 100ms sleep (at or just past the deadline) panicked spuriously.
        if let Ok(count) = get_observer_log_count(pool, expected_status).await {
            if count >= expected_count {
                return;
            }
        }
        assert!(
            start.elapsed() <= timeout,
            "Timeout waiting for {} events with status {}. Got: {:?}",
            expected_count,
            expected_status,
            get_observer_log_count(pool, expected_status).await.unwrap_or(0)
        );
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Starting the mock server yields an HTTP webhook URL.
    #[tokio::test]
    async fn test_mock_webhook_server_creation() {
        let url = MockWebhookServer::start().await.webhook_url();
        assert!(url.contains("http://"));
    }

    /// The webhook URL always targets the `/webhook` path.
    #[tokio::test]
    async fn test_webhook_url_format() {
        let url = MockWebhookServer::start().await.webhook_url();
        assert!(url.ends_with("/webhook"));
    }
}