use subtle::ConstantTimeEq;
use axum::{
Json,
body::to_bytes,
extract::{Path, Query, State},
http::{HeaderMap, StatusCode},
response::IntoResponse,
};
use serde::Deserialize;
use serde_json::{Value, json};
use super::AppState;
use super::agent::{
CHANNEL_PROCESSING_ERROR_REPLY, channel_chat_id_for_inbound, process_channel_message,
};
pub async fn webhook_telegram(
State(state): State<AppState>,
headers: HeaderMap,
axum::extract::Json(body): axum::extract::Json<Value>,
) -> impl IntoResponse {
let adapter = match state.telegram.as_ref() {
Some(a) => a,
None => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(json!({"ok": false, "error": "Telegram not configured"})),
)
.into_response();
}
};
if adapter.webhook_secret.is_none() {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(json!({"ok": false, "error": "Webhook secret not configured"})),
)
.into_response();
}
if let Some(ref secret) = adapter.webhook_secret {
let header_value = headers
.get("X-Telegram-Bot-Api-Secret-Token")
.and_then(|v| v.to_str().ok());
let matches = header_value
.map(|v| bool::from(v.as_bytes().ct_eq(secret.as_bytes())))
.unwrap_or(false);
if !matches {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"ok": false, "error": "missing or invalid webhook secret"})),
)
.into_response();
}
}
tracing::debug!("received Telegram webhook");
{
match adapter.process_webhook_update(&body) {
Ok(Some(inbound)) => {
let state = state.clone();
state.channel_router.record_received("telegram").await;
let inbound_for_error = inbound.clone();
tokio::spawn(async move {
if let Err(e) = process_channel_message(&state, inbound).await {
state
.channel_router
.record_processing_error("telegram", e.clone())
.await;
let chat_id = channel_chat_id_for_inbound(&inbound_for_error);
if let Err(send_err) = state
.channel_router
.send_reply(
"telegram",
&chat_id,
CHANNEL_PROCESSING_ERROR_REPLY.to_string(),
)
.await
{
tracing::warn!(
error = %send_err,
"failed to send Telegram webhook processing failure reply"
);
}
tracing::error!(error = %e, "Telegram message processing failed");
}
});
}
Ok(None) => {}
Err(e) => {
tracing::warn!(error = %e, "failed to parse Telegram webhook update");
}
}
}
(StatusCode::OK, Json(json!({"ok": true}))).into_response()
}
pub async fn webhook_whatsapp_verify(
State(state): State<AppState>,
axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
let mode = params.get("hub.mode").map(String::as_str).unwrap_or("");
let token = params
.get("hub.verify_token")
.map(String::as_str)
.unwrap_or("");
let challenge = params.get("hub.challenge").cloned().unwrap_or_default();
match state.whatsapp.as_ref() {
Some(adapter) => match adapter.verify_webhook_challenge(mode, token, &challenge) {
Ok(verified) => (StatusCode::OK, verified).into_response(),
Err(_) => StatusCode::FORBIDDEN.into_response(),
},
None => StatusCode::SERVICE_UNAVAILABLE.into_response(),
}
}
pub async fn webhook_whatsapp(
State(state): State<AppState>,
request: axum::extract::Request,
) -> impl IntoResponse {
let adapter = match state.whatsapp.as_ref() {
Some(a) => a,
None => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(json!({"ok": false, "error": "WhatsApp not configured"})),
)
.into_response();
}
};
let secret = match adapter.app_secret.as_ref() {
Some(s) => s,
None => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(json!({"ok": false, "error": "Webhook secret not configured"})),
)
.into_response();
}
};
const WEBHOOK_BODY_LIMIT: usize = 1024 * 1024;
let (parts, body) = request.into_parts();
let bytes = match to_bytes(body, WEBHOOK_BODY_LIMIT).await {
Ok(b) => b,
Err(_) => {
return (
StatusCode::BAD_REQUEST,
Json(json!({"ok": false, "error": "body too large or invalid"})),
)
.into_response();
}
};
let sig_header = parts
.headers
.get("x-hub-signature-256")
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());
let expected = match &sig_header {
Some(s) if s.starts_with("sha256=") => &s[7..],
_ => {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"ok": false, "error": "missing or invalid X-Hub-Signature-256"})),
)
.into_response();
}
};
use hmac::Mac;
let mut mac = hmac::Hmac::<sha2::Sha256>::new_from_slice(secret.as_bytes())
.expect("HMAC accepts any key size");
mac.update(&bytes);
let computed = mac.finalize().into_bytes();
let Ok(expected_bytes) = hex::decode(expected) else {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"ok": false, "error": "invalid webhook signature (bad hex)"})),
)
.into_response();
};
if !bool::from(computed.ct_eq(expected_bytes.as_slice())) {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"ok": false, "error": "invalid webhook signature"})),
)
.into_response();
}
let body_json: Value = match serde_json::from_slice(&bytes) {
Ok(v) => v,
Err(_) => {
return (
StatusCode::BAD_REQUEST,
Json(json!({"ok": false, "error": "invalid JSON"})),
)
.into_response();
}
};
tracing::debug!("received WhatsApp webhook");
match adapter.process_webhook(&body_json) {
Ok(Some(inbound)) => {
let state = state.clone();
state.channel_router.record_received("whatsapp").await;
tokio::spawn(async move {
if let Err(e) = process_channel_message(&state, inbound).await {
state
.channel_router
.record_processing_error("whatsapp", e.clone())
.await;
tracing::error!(error = %e, "WhatsApp message processing failed");
}
});
}
Ok(None) => {}
Err(e) => {
tracing::warn!(error = %e, "failed to parse WhatsApp webhook");
}
}
Json(json!({"ok": true})).into_response()
}
pub async fn get_channels_status(State(state): State<AppState>) -> impl IntoResponse {
let statuses = state.channel_router.channel_status().await;
let mut result: Vec<Value> = vec![json!({
"name": "web",
"connected": true,
"messages_received": 0,
"messages_sent": 0,
})];
for s in statuses {
result.push(json!({
"name": s.name,
"connected": s.connected,
"messages_received": s.messages_received,
"messages_sent": s.messages_sent,
"last_error": s.last_error,
"last_activity": s.last_activity,
}));
}
{
let a2a = state.a2a.read().await;
result.push(json!({
"name": "a2a",
"connected": a2a.config.enabled,
"sessions_active": a2a.session_count(),
}));
}
Json(json!(result))
}
#[derive(Debug, Deserialize)]
pub struct DeadLetterQuery {
#[serde(default = "default_dead_letter_limit")]
pub limit: usize,
}
fn default_dead_letter_limit() -> usize {
50
}
pub async fn get_dead_letters(
State(state): State<AppState>,
Query(query): Query<DeadLetterQuery>,
) -> impl IntoResponse {
let limit = query.limit.clamp(1, 500);
let dead_letters = state.channel_router.dead_letters(limit).await;
let payload: Vec<Value> = dead_letters
.into_iter()
.map(|item| {
json!({
"id": item.id,
"channel": item.channel,
"recipient_id": item.recipient_id,
"content": item.content,
"idempotency_key": item.idempotency_key,
"attempts": item.attempts,
"max_attempts": item.max_attempts,
"last_error": item.last_error,
"created_at": item.created_at,
})
})
.collect();
Json(json!({ "items": payload, "count": payload.len() }))
}
pub async fn replay_dead_letter(
State(state): State<AppState>,
Path(id): Path<String>,
) -> impl IntoResponse {
let replayed = state.channel_router.replay_dead_letter(&id).await;
if replayed {
(StatusCode::OK, Json(json!({"ok": true, "id": id}))).into_response()
} else {
(
StatusCode::NOT_FOUND,
Json(json!({"ok": false, "error": "dead-letter item not found"})),
)
.into_response()
}
}