use allsource_core::{
application::services::webhook::{DeliveryStatus, RegisterWebhookRequest},
domain::entities::Event,
store::EventStore,
webhook_worker,
};
use serde_json::json;
use std::sync::Arc;
fn create_test_event(entity_id: &str, event_type: &str, payload: serde_json::Value) -> Event {
Event::from_strings(
event_type.to_string(),
entity_id.to_string(),
"default".to_string(),
payload,
None,
)
.unwrap()
}
#[tokio::test]
async fn test_webhook_register_ingest_delivery() {
let store = Arc::new(EventStore::new());
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
store.set_webhook_tx(tx);
let registry = store.webhook_registry();
let registry_clone = Arc::clone(®istry);
let worker_handle = tokio::spawn(async move {
webhook_worker::run_webhook_delivery_worker(rx, registry_clone).await;
});
let webhook = registry.register(RegisterWebhookRequest {
tenant_id: "default".to_string(),
url: "http://127.0.0.1:19999/webhook-test".to_string(), event_types: vec!["user.*".to_string()],
entity_ids: vec![],
secret: Some("test-secret-123".to_string()),
description: Some("Integration test webhook".to_string()),
});
assert!(registry.get(webhook.id).is_some());
assert!(webhook.active);
let event = create_test_event("user-42", "user.created", json!({"name": "Alice"}));
let event_id = event.id;
store.ingest(&event).unwrap();
let mut found_delivery = false;
for _ in 0..30 {
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let deliveries = registry.get_deliveries(webhook.id, 100);
if !deliveries.is_empty() {
found_delivery = true;
let first = &deliveries[0];
assert_eq!(first.webhook_id, webhook.id);
assert_eq!(first.event_id, event_id);
assert!(
first.status == DeliveryStatus::Retrying || first.status == DeliveryStatus::Failed,
"Expected Retrying or Failed, got {:?}",
first.status
);
assert_eq!(first.attempt, 1);
assert!(first.error.is_some() || first.response_status.is_some());
break;
}
}
assert!(found_delivery, "Expected at least one delivery record");
drop(store);
let _ = tokio::time::timeout(std::time::Duration::from_secs(5), worker_handle).await;
}
#[tokio::test]
async fn test_webhook_event_type_filter() {
let store = Arc::new(EventStore::new());
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
store.set_webhook_tx(tx);
let registry = store.webhook_registry();
let registry_clone = Arc::clone(®istry);
let worker_handle = tokio::spawn(async move {
webhook_worker::run_webhook_delivery_worker(rx, registry_clone).await;
});
let webhook = registry.register(RegisterWebhookRequest {
tenant_id: "default".to_string(),
url: "http://127.0.0.1:19998/webhook-filter-test".to_string(),
event_types: vec!["order.*".to_string()],
entity_ids: vec![],
secret: None,
description: None,
});
let event = create_test_event("user-1", "user.created", json!({"name": "Bob"}));
store.ingest(&event).unwrap();
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
let deliveries = registry.get_deliveries(webhook.id, 100);
assert!(
deliveries.is_empty(),
"Non-matching event should not trigger delivery"
);
drop(store);
let _ = tokio::time::timeout(std::time::Duration::from_secs(2), worker_handle).await;
}
#[test]
fn test_webhook_crud_lifecycle() {
let registry = allsource_core::application::services::webhook::WebhookRegistry::new();
let webhook = registry.register(RegisterWebhookRequest {
tenant_id: "tenant-1".to_string(),
url: "https://example.com/hook".to_string(),
event_types: vec!["user.*".to_string()],
entity_ids: vec![],
secret: Some("secret-key".to_string()),
description: Some("Test hook".to_string()),
});
let id = webhook.id;
let fetched = registry.get(id).unwrap();
assert_eq!(fetched.url, "https://example.com/hook");
assert!(fetched.active);
let hooks = registry.list_by_tenant("tenant-1");
assert_eq!(hooks.len(), 1);
let updated = registry
.update(
id,
allsource_core::application::services::webhook::UpdateWebhookRequest {
url: Some("https://example.com/new-hook".to_string()),
event_types: None,
entity_ids: None,
active: Some(false),
description: None,
},
)
.unwrap();
assert_eq!(updated.url, "https://example.com/new-hook");
assert!(!updated.active);
let deleted = registry.delete(id);
assert!(deleted.is_some());
assert!(registry.get(id).is_none());
assert!(registry.list_by_tenant("tenant-1").is_empty());
}
#[test]
fn test_delivery_status_tracking() {
let registry = allsource_core::application::services::webhook::WebhookRegistry::new();
let webhook_id = uuid::Uuid::new_v4();
let event_id = uuid::Uuid::new_v4();
registry.record_delivery(
allsource_core::application::services::webhook::WebhookDelivery {
id: uuid::Uuid::new_v4(),
webhook_id,
event_id,
status: DeliveryStatus::Retrying,
attempt: 1,
max_attempts: 3,
response_status: Some(500),
response_body: Some("Internal Server Error".to_string()),
error: None,
created_at: chrono::Utc::now(),
next_retry_at: Some(chrono::Utc::now() + chrono::Duration::seconds(1)),
},
);
registry.record_delivery(
allsource_core::application::services::webhook::WebhookDelivery {
id: uuid::Uuid::new_v4(),
webhook_id,
event_id,
status: DeliveryStatus::Success,
attempt: 2,
max_attempts: 3,
response_status: Some(200),
response_body: None,
error: None,
created_at: chrono::Utc::now(),
next_retry_at: None,
},
);
let deliveries = registry.get_deliveries(webhook_id, 10);
assert_eq!(deliveries.len(), 2);
assert_eq!(deliveries[0].status, DeliveryStatus::Retrying);
assert_eq!(deliveries[0].attempt, 1);
assert_eq!(deliveries[1].status, DeliveryStatus::Success);
assert_eq!(deliveries[1].attempt, 2);
}