use crate::config::LocatorWebhookConfig;
use crate::rwi::gateway::EventCacheEntry;
use serde_json::json;
use std::collections::{HashSet, VecDeque};
use tokio::sync::broadcast;
use tracing::{debug, warn};
const WEBHOOK_CHANNEL_SIZE: usize = 512;
const DEDUP_CACHE_SIZE: usize = 4096;
pub fn start_rwi_webhook_handler(
config: LocatorWebhookConfig,
) -> broadcast::Sender<EventCacheEntry> {
let (tx, rx) = broadcast::channel(WEBHOOK_CHANNEL_SIZE);
crate::utils::spawn(run_rwi_webhook_handler(config, rx));
tx
}
async fn run_rwi_webhook_handler(
config: LocatorWebhookConfig,
mut rx: broadcast::Receiver<EventCacheEntry>,
) {
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_millis(
config.timeout_ms.unwrap_or(5000),
))
.build()
.unwrap_or_else(|_| reqwest::Client::new());
let url = config.url.trim().to_string();
debug!("RWI webhook handler started for {}", url);
let mut dedup: VecDeque<(String, u64)> = VecDeque::with_capacity(DEDUP_CACHE_SIZE + 1);
let mut seen: HashSet<(String, u64)> = HashSet::new();
loop {
let entry = match rx.recv().await {
Ok(entry) => entry,
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("RWI webhook lagged, missed {} events", n);
continue;
}
Err(broadcast::error::RecvError::Closed) => {
break;
}
};
if !entry.call_id.is_empty() {
let dedup_key = (entry.call_id.clone(), entry.sequence);
if seen.contains(&dedup_key) {
debug!("RWI webhook: skipping duplicate event {}", entry.sequence);
continue;
}
seen.insert(dedup_key.clone());
dedup.push_back(dedup_key);
while dedup.len() > DEDUP_CACHE_SIZE {
if let Some(old) = dedup.pop_front() {
seen.remove(&old);
}
}
}
let event_value = &entry.event.payload;
let event_type = entry.event.event_type;
if !config.events.is_empty() && !config.events.iter().any(|e| e == event_type) {
continue;
}
let payload = json!({
"rwi": "1.0",
"sequence": entry.sequence,
"timestamp": entry.cached_at.to_rfc3339(),
"call_id": entry.call_id,
"event_type": event_type,
"event": event_value,
});
let header_map = config.headers.clone().unwrap_or_default();
let req = client.post(&url).json(&payload);
if let Err(e) = crate::http_util::execute_request(req, &header_map, None).await {
warn!(
"RWI webhook send failed for {} (event: {}): {}",
url, event_type, e
);
}
}
}
pub async fn send_test_event(
url: &str,
headers: Option<&std::collections::HashMap<String, String>>,
) -> Result<(), anyhow::Error> {
let test_payload = json!({
"rwi": "1.0",
"sequence": 0,
"timestamp": chrono::Utc::now().to_rfc3339(),
"call_id": "test-call-id",
"event_type": "test",
"event": {
"test": {
"message": "RustPBX RWI webhook test"
}
}
});
let opts = crate::http_util::HttpFetchOptions::new()
.with_timeout(std::time::Duration::from_secs(5))
.with_headers(headers.cloned().unwrap_or_default());
let req = reqwest::Client::new().post(url).json(&test_payload);
crate::http_util::execute_request(req, &opts.headers, opts.timeout).await?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::{Arc, Mutex};
use std::time::Duration;
struct TestHttpServer {
port: u16,
received: Arc<Mutex<Vec<serde_json::Value>>>,
}
impl TestHttpServer {
async fn start() -> Self {
let received: Arc<Mutex<Vec<serde_json::Value>>> = Arc::new(Mutex::new(Vec::new()));
let rc = received.clone();
let app = axum::Router::new().route(
"/hook",
axum::routing::post(move |axum::Json(body): axum::Json<serde_json::Value>| {
rc.lock().unwrap().push(body);
async { axum::Json(serde_json::json!({"status":"ok"})) }
}),
);
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
crate::utils::spawn(async move {
axum::serve(listener, app).await.ok();
});
Self { port, received }
}
fn url(&self) -> String {
format!("http://127.0.0.1:{}/hook", self.port)
}
}
async fn wait_for_events(received: &Arc<Mutex<Vec<serde_json::Value>>>, min: usize, ms: u64) {
let start = std::time::Instant::now();
loop {
if received.lock().unwrap().len() >= min {
return;
}
if start.elapsed() > Duration::from_millis(ms) {
return;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
}
#[tokio::test]
async fn test_webhook_receives_call_ringing() {
let server = TestHttpServer::start().await;
let config = LocatorWebhookConfig {
url: server.url(),
events: vec![],
headers: None,
timeout_ms: Some(5000),
};
let tx = start_rwi_webhook_handler(config);
tokio::time::sleep(Duration::from_millis(50)).await;
let entry = EventCacheEntry {
sequence: 1,
cached_at: chrono::Utc::now(),
call_id: "c1".into(),
event: crate::rwi::event::to_legacy_event(
&crate::rwi::CallRinging {
call_id: "c1".into(),
},
None,
),
};
tx.send(entry).ok();
wait_for_events(&server.received, 1, 2000).await;
let body = &server.received.lock().unwrap()[0];
assert_eq!(body["event_type"], "call_ringing");
}
}