use subtle::ConstantTimeEq;
use axum::{
Json,
body::to_bytes,
extract::{Path, Query, State},
http::{HeaderMap, StatusCode},
response::IntoResponse,
};
use serde::Deserialize;
use serde_json::{Value, json};
use super::AppState;
use super::agent::{
CHANNEL_PROCESSING_ERROR_REPLY, channel_chat_id_for_inbound, process_channel_message,
};
pub async fn webhook_telegram(
State(state): State<AppState>,
headers: HeaderMap,
axum::extract::Json(body): axum::extract::Json<Value>,
) -> impl IntoResponse {
let adapter = match state.telegram.as_ref() {
Some(a) => a,
None => {
return super::problem_response(
StatusCode::SERVICE_UNAVAILABLE,
"Telegram not configured",
);
}
};
if adapter.webhook_secret.is_none() {
return super::problem_response(
StatusCode::SERVICE_UNAVAILABLE,
"Webhook secret not configured",
);
}
if let Some(ref secret) = adapter.webhook_secret {
let header_value = headers
.get("X-Telegram-Bot-Api-Secret-Token")
.and_then(|v| v.to_str().ok());
let matches = header_value
.map(|v| bool::from(v.as_bytes().ct_eq(secret.as_bytes())))
.unwrap_or(false);
if !matches {
return super::problem_response(
StatusCode::UNAUTHORIZED,
"missing or invalid webhook secret",
);
}
}
tracing::debug!("received Telegram webhook");
{
match adapter.process_webhook_update(&body) {
Ok(Some(inbound)) => {
let state = state.clone();
state.channel_router.record_received("telegram").await;
let inbound_for_error = inbound.clone();
tokio::spawn(async move {
if let Err(e) = process_channel_message(&state, inbound).await {
state
.channel_router
.record_processing_error("telegram", e.clone())
.await;
let chat_id = channel_chat_id_for_inbound(&inbound_for_error);
if let Err(send_err) = state
.channel_router
.send_reply(
"telegram",
&chat_id,
CHANNEL_PROCESSING_ERROR_REPLY.to_string(),
)
.await
{
tracing::warn!(
error = %send_err,
"failed to send Telegram webhook processing failure reply"
);
}
tracing::error!(error = %e, "Telegram message processing failed");
}
});
}
Ok(None) => {}
Err(e) => {
tracing::warn!(error = %e, "failed to parse Telegram webhook update");
}
}
}
(StatusCode::OK, Json(json!({"ok": true}))).into_response()
}
pub async fn webhook_whatsapp_verify(
State(state): State<AppState>,
axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
let mode = params.get("hub.mode").map(String::as_str).unwrap_or("");
let token = params
.get("hub.verify_token")
.map(String::as_str)
.unwrap_or("");
let challenge = params.get("hub.challenge").cloned().unwrap_or_default();
match state.whatsapp.as_ref() {
Some(adapter) => match adapter.verify_webhook_challenge(mode, token, &challenge) {
Ok(verified) => (StatusCode::OK, verified).into_response(),
Err(_) => StatusCode::FORBIDDEN.into_response(),
},
None => StatusCode::SERVICE_UNAVAILABLE.into_response(),
}
}
pub async fn webhook_whatsapp(
State(state): State<AppState>,
request: axum::extract::Request,
) -> impl IntoResponse {
let adapter = match state.whatsapp.as_ref() {
Some(a) => a,
None => {
return super::problem_response(
StatusCode::SERVICE_UNAVAILABLE,
"WhatsApp not configured",
);
}
};
let secret = match adapter.app_secret.as_ref() {
Some(s) => s,
None => {
return super::problem_response(
StatusCode::SERVICE_UNAVAILABLE,
"Webhook secret not configured",
);
}
};
const WEBHOOK_BODY_LIMIT: usize = 1024 * 1024;
let (parts, body) = request.into_parts();
let bytes = match to_bytes(body, WEBHOOK_BODY_LIMIT).await {
Ok(b) => b,
Err(_) => {
return super::problem_response(StatusCode::BAD_REQUEST, "body too large or invalid");
}
};
let sig_header = parts
.headers
.get("x-hub-signature-256")
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());
let expected = match &sig_header {
Some(s) if s.starts_with("sha256=") => &s[7..],
_ => {
return super::problem_response(
StatusCode::UNAUTHORIZED,
"missing or invalid X-Hub-Signature-256",
);
}
};
use hmac::Mac;
let mut mac = hmac::Hmac::<sha2::Sha256>::new_from_slice(secret.as_bytes())
.expect("HMAC accepts any key size");
mac.update(&bytes);
let computed = mac.finalize().into_bytes();
let Ok(expected_bytes) = hex::decode(expected) else {
return super::problem_response(
StatusCode::UNAUTHORIZED,
"invalid webhook signature (bad hex)",
);
};
if !bool::from(computed.ct_eq(expected_bytes.as_slice())) {
return super::problem_response(StatusCode::UNAUTHORIZED, "invalid webhook signature");
}
let body_json: Value = match serde_json::from_slice(&bytes) {
Ok(v) => v,
Err(_) => {
return super::problem_response(StatusCode::BAD_REQUEST, "invalid JSON");
}
};
tracing::debug!("received WhatsApp webhook");
match adapter.process_webhook(&body_json) {
Ok(Some(inbound)) => {
let state = state.clone();
state.channel_router.record_received("whatsapp").await;
tokio::spawn(async move {
if let Err(e) = process_channel_message(&state, inbound).await {
state
.channel_router
.record_processing_error("whatsapp", e.clone())
.await;
tracing::error!(error = %e, "WhatsApp message processing failed");
}
});
}
Ok(None) => {}
Err(e) => {
tracing::warn!(error = %e, "failed to parse WhatsApp webhook");
}
}
Json(json!({"ok": true})).into_response()
}
pub async fn get_channels_status(State(state): State<AppState>) -> impl IntoResponse {
let statuses = state.channel_router.channel_status().await;
let mut result: Vec<Value> = vec![json!({
"name": "web",
"connected": true,
"messages_received": 0,
"messages_sent": 0,
"error_count": 0,
"health": "connected",
})];
for s in statuses {
result.push(json!({
"name": s.name,
"connected": s.connected,
"messages_received": s.messages_received,
"messages_sent": s.messages_sent,
"error_count": s.error_count,
"last_error": s.last_error,
"last_activity": s.last_activity,
"last_successful_at": s.last_successful_at,
"health": s.health,
}));
}
{
let a2a = state.a2a.read().await;
result.push(json!({
"name": "a2a",
"connected": a2a.config.enabled,
"sessions_active": a2a.session_count(),
}));
}
Json(json!(result))
}
const KNOWN_PLATFORMS: &[&str] = &[
"telegram", "discord", "whatsapp", "signal", "email", "matrix", "web",
];
pub async fn get_integrations(State(state): State<AppState>) -> impl IntoResponse {
let statuses = state.channel_router.channel_status().await;
let config = state.config.read().await;
let channels_cfg = &config.channels;
let mut platforms: Vec<Value> = Vec::new();
for &platform in KNOWN_PLATFORMS {
let (configured, enabled) = match platform {
"telegram" => (
channels_cfg.telegram.is_some(),
channels_cfg
.telegram
.as_ref()
.map(|c| c.enabled)
.unwrap_or(false),
),
"discord" => (
channels_cfg.discord.is_some(),
channels_cfg
.discord
.as_ref()
.map(|c| c.enabled)
.unwrap_or(false),
),
"whatsapp" => (
channels_cfg.whatsapp.is_some(),
channels_cfg
.whatsapp
.as_ref()
.map(|c| c.enabled)
.unwrap_or(false),
),
"signal" => (
channels_cfg.signal.is_some(),
channels_cfg
.signal
.as_ref()
.map(|c| c.enabled)
.unwrap_or(false),
),
"email" => {
let has_smtp = !channels_cfg.email.smtp_host.is_empty();
(has_smtp, has_smtp && channels_cfg.email.enabled)
}
"matrix" => (
channels_cfg.matrix.is_some(),
channels_cfg
.matrix
.as_ref()
.map(|c| c.enabled)
.unwrap_or(false),
),
"web" => (true, true),
_ => (false, false),
};
let runtime = statuses.iter().find(|s| s.name == platform);
let entry = if let Some(s) = runtime {
json!({
"name": platform,
"configured": configured,
"enabled": enabled,
"health": s.health,
"messages_received": s.messages_received,
"messages_sent": s.messages_sent,
"error_count": s.error_count,
"last_error": s.last_error,
"last_activity": s.last_activity,
"last_successful_at": s.last_successful_at,
})
} else if platform == "web" {
json!({
"name": "web",
"configured": true,
"enabled": true,
"health": "connected",
"messages_received": 0,
"messages_sent": 0,
"error_count": 0,
})
} else {
json!({
"name": platform,
"configured": configured,
"enabled": enabled,
"health": "disconnected",
})
};
platforms.push(entry);
}
Json(json!({ "platforms": platforms }))
}
#[derive(Debug, Deserialize)]
pub struct DeadLetterQuery {
#[serde(default = "default_dead_letter_limit")]
pub limit: usize,
}
fn default_dead_letter_limit() -> usize {
50
}
pub async fn get_dead_letters(
State(state): State<AppState>,
Query(query): Query<DeadLetterQuery>,
) -> impl IntoResponse {
let limit = query.limit.clamp(1, 500);
let dead_letters = state.channel_router.dead_letters(limit).await;
let payload: Vec<Value> = dead_letters
.into_iter()
.map(|item| {
json!({
"id": item.id,
"channel": item.channel,
"recipient_id": item.recipient_id,
"content": item.content,
"idempotency_key": item.idempotency_key,
"attempts": item.attempts,
"max_attempts": item.max_attempts,
"last_error": item.last_error,
"created_at": item.created_at,
})
})
.collect();
Json(json!({ "items": payload, "count": payload.len() }))
}
pub async fn replay_dead_letter(
State(state): State<AppState>,
Path(id): Path<String>,
) -> impl IntoResponse {
let replayed = state.channel_router.replay_dead_letter(&id).await;
if replayed {
(StatusCode::OK, Json(json!({"ok": true, "id": id}))).into_response()
} else {
super::problem_response(StatusCode::NOT_FOUND, "dead-letter item not found")
}
}
pub async fn test_channel(
State(state): State<AppState>,
Path(platform): Path<String>,
) -> impl IntoResponse {
let platform_lower = platform.to_ascii_lowercase();
let statuses = state.channel_router.channel_status().await;
let matched = statuses
.iter()
.find(|s| s.name.to_ascii_lowercase() == platform_lower);
match matched {
Some(status) => {
let diagnostics = json!({
"platform": status.name,
"connected": status.connected,
"health": status.health,
"messages_received": status.messages_received,
"messages_sent": status.messages_sent,
"error_count": status.error_count,
"last_error": status.last_error,
"last_activity": status.last_activity,
"last_successful_at": status.last_successful_at,
"test_result": if status.connected { "pass" } else { "fail" },
"details": if status.connected {
format!("{} adapter is connected and operational", status.name)
} else if let Some(ref err) = status.last_error {
format!("{} adapter is not connected: {}", status.name, err)
} else {
format!("{} adapter is not connected (no error details available)", status.name)
}
});
Json(json!({ "ok": status.connected, "diagnostics": diagnostics })).into_response()
}
None => {
if platform_lower == "web" {
return Json(json!({
"ok": true,
"diagnostics": {
"platform": "web",
"connected": true,
"test_result": "pass",
"details": "Web channel is always available via WebSocket"
}
}))
.into_response();
}
super::problem_response(
StatusCode::NOT_FOUND,
&format!(
"Channel '{}' is not configured. Add [channels.{}] to your roboticus.toml.",
platform, platform_lower
),
)
}
}
}