use super::traits::{Channel, ChannelMessage, SendMessage};
use anyhow::{Result, bail};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
pub struct WebhookChannel {
listen_port: u16,
listen_path: String,
send_url: Option<String>,
send_method: String,
auth_header: Option<String>,
secret: Option<String>,
allow_unsigned: bool,
host: String,
allow_public_bind: bool,
}
#[derive(Debug, Deserialize)]
struct IncomingWebhook {
sender: String,
content: String,
#[serde(default)]
thread_id: Option<String>,
}
#[derive(Debug, Serialize)]
struct OutgoingWebhook {
content: String,
#[serde(skip_serializing_if = "Option::is_none")]
thread_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
recipient: Option<String>,
}
impl WebhookChannel {
pub fn new(
listen_port: u16,
listen_path: Option<String>,
send_url: Option<String>,
send_method: Option<String>,
auth_header: Option<String>,
secret: Option<String>,
allow_unsigned: bool,
) -> Self {
let path = listen_path.unwrap_or_else(|| "/webhook".to_string());
let listen_path = if path.starts_with('/') {
path
} else {
format!("/{path}")
};
Self {
listen_port,
listen_path,
send_url,
send_method: send_method
.unwrap_or_else(|| "POST".to_string())
.to_uppercase(),
auth_header,
secret: secret.filter(|s| !s.trim().is_empty()),
allow_unsigned,
host: "127.0.0.1".to_string(),
allow_public_bind: false,
}
}
#[must_use]
pub fn with_bind(mut self, host: Option<String>, allow_public_bind: bool) -> Self {
self.host = host
.map(|h| h.trim().to_string())
.filter(|h| !h.is_empty())
.unwrap_or_else(|| "127.0.0.1".to_string());
self.allow_public_bind = allow_public_bind;
self
}
fn http_client(&self) -> reqwest::Client {
crate::config::build_runtime_proxy_client("channel.webhook")
}
fn verify_signature(&self, body: &[u8], signature: Option<&str>) -> bool {
let Some(ref secret) = self.secret else {
return true; };
let Some(sig) = signature else {
return false; };
use hmac::{Hmac, Mac};
use sha2::Sha256;
type HmacSha256 = Hmac<Sha256>;
let Ok(mut mac) = HmacSha256::new_from_slice(secret.as_bytes()) else {
return false;
};
mac.update(body);
let Ok(expected) = hex::decode(sig.trim_start_matches("sha256=")) else {
return false;
};
mac.verify_slice(&expected).is_ok()
}
}
#[async_trait]
impl Channel for WebhookChannel {
fn name(&self) -> &str {
"webhook"
}
fn supports_one_off_send(&self) -> bool {
false
}
async fn send(&self, message: &SendMessage) -> Result<()> {
let Some(ref send_url) = self.send_url else {
tracing::debug!("Webhook channel: no send_url configured, skipping outbound message");
return Ok(());
};
let client = self.http_client();
let payload = OutgoingWebhook {
content: message.content.clone(),
thread_id: message.thread_ts.clone(),
recipient: if message.recipient.is_empty() {
None
} else {
Some(message.recipient.clone())
},
};
let mut request = match self.send_method.as_str() {
"PUT" => client.put(send_url),
_ => client.post(send_url),
};
if let Some(ref auth) = self.auth_header {
request = request.header("Authorization", auth);
}
let resp = request.json(&payload).send().await?;
let status = resp.status();
if !status.is_success() {
let body = resp
.text()
.await
.unwrap_or_else(|e| format!("<failed to read response: {e}>"));
bail!("Webhook send failed ({status}): {body}");
}
Ok(())
}
async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> Result<()> {
use axum::{
Router,
body::Bytes,
extract::State,
http::{HeaderMap, StatusCode},
routing::post,
};
use portable_atomic::{AtomicU64, Ordering};
use std::sync::Arc;
let host = self.host.as_str();
let public_bind = crate::security::pairing::is_public_bind(host);
if public_bind && !self.allow_public_bind {
bail!(
"Webhook channel is configured to bind a non-loopback address \
({host}:{port}), which exposes it on the network. Set \
[channels_config.webhook].allow_public_bind = true to opt in (a secret is \
then required), or set host = \"127.0.0.1\" to restrict it to loopback.",
port = self.listen_port
);
}
if self.secret.is_none() {
if public_bind {
bail!(
"Webhook channel binds a non-loopback address ({host}:{port}) but has no \
secret. A network-exposed endpoint must be authenticated: set \
[channels_config.webhook].secret. (allow_unsigned only applies to \
loopback binds.)",
port = self.listen_port
);
} else if self.allow_unsigned {
tracing::warn!(
"Webhook channel: NO secret configured and allow_unsigned=true — accepting \
UNAUTHENTICATED requests on {host}:{}{} (loopback). Any local caller can \
inject messages that trigger the agent; set \
[channels_config.webhook].secret to require HMAC-SHA256 signatures.",
self.listen_port,
self.listen_path
);
} else {
bail!(
"Webhook channel is enabled without a secret. Set \
[channels_config.webhook].secret for HMAC-SHA256 verification, or set \
allow_unsigned=true to deliberately accept unauthenticated requests on \
loopback."
);
}
}
let counter = Arc::new(AtomicU64::new(0));
struct WebhookState {
tx: tokio::sync::mpsc::Sender<ChannelMessage>,
secret: Option<String>,
counter: Arc<AtomicU64>,
}
let state = Arc::new(WebhookState {
tx: tx.clone(),
secret: self.secret.clone(),
counter: counter.clone(),
});
let listen_path = self.listen_path.clone();
async fn handle_webhook(
State(state): State<Arc<WebhookState>>,
headers: HeaderMap,
body: Bytes,
) -> StatusCode {
if let Some(ref secret) = state.secret {
use hmac::{Hmac, Mac};
use sha2::Sha256;
type HmacSha256 = Hmac<Sha256>;
let signature = headers
.get("x-webhook-signature")
.and_then(|v| v.to_str().ok());
let valid = if let Some(sig) = signature {
if let Ok(mut mac) = HmacSha256::new_from_slice(secret.as_bytes()) {
mac.update(&body);
let expected =
hex::decode(sig.trim_start_matches("sha256=")).unwrap_or_default();
mac.verify_slice(&expected).is_ok()
} else {
false
}
} else {
false
};
if !valid {
tracing::warn!("Webhook: invalid signature, rejecting request");
return StatusCode::UNAUTHORIZED;
}
}
let payload: IncomingWebhook = match serde_json::from_slice(&body) {
Ok(p) => p,
Err(e) => {
tracing::warn!("Webhook: invalid JSON payload: {e}");
return StatusCode::BAD_REQUEST;
}
};
if payload.content.is_empty() {
return StatusCode::BAD_REQUEST;
}
let seq = state.counter.fetch_add(1, Ordering::Relaxed);
#[allow(clippy::cast_possible_truncation)]
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let reply_target = payload
.thread_id
.clone()
.unwrap_or_else(|| payload.sender.clone());
let msg = ChannelMessage {
id: format!("webhook_{seq}"),
sender: payload.sender,
reply_target,
content: payload.content,
channel: "webhook".to_string(),
timestamp,
thread_ts: payload.thread_id,
interruption_scope_id: None,
attachments: vec![],
};
if state.tx.send(msg).await.is_err() {
return StatusCode::SERVICE_UNAVAILABLE;
}
StatusCode::OK
}
let app = Router::new()
.route(&listen_path, post(handle_webhook))
.with_state(state);
tracing::info!(
"Webhook channel listening on http://{host}:{}{} ...",
self.listen_port,
self.listen_path
);
let bind_host = host
.strip_prefix('[')
.and_then(|h| h.strip_suffix(']'))
.unwrap_or(host);
let listener = tokio::net::TcpListener::bind((bind_host, self.listen_port)).await?;
axum::serve(listener, app)
.await
.map_err(|e| anyhow::anyhow!("Webhook server error: {e}"))?;
Ok(())
}
async fn health_check(&self) -> bool {
true
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_channel() -> WebhookChannel {
WebhookChannel::new(
8080,
Some("/webhook".into()),
Some("https://example.com/callback".into()),
None,
None,
None,
true,
)
}
fn make_channel_with_secret() -> WebhookChannel {
WebhookChannel::new(
8080,
None,
Some("https://example.com/callback".into()),
None,
None,
Some("mysecret".into()),
false,
)
}
#[test]
fn default_path() {
let ch = WebhookChannel::new(8080, None, None, None, None, None, false);
assert_eq!(ch.listen_path, "/webhook");
}
#[test]
fn path_normalized() {
let ch = WebhookChannel::new(
8080,
Some("hooks/incoming".into()),
None,
None,
None,
None,
false,
);
assert_eq!(ch.listen_path, "/hooks/incoming");
}
#[tokio::test]
async fn listen_fails_closed_without_secret() {
let ch = WebhookChannel::new(0, None, None, None, None, None, false);
let (tx, _rx) = tokio::sync::mpsc::channel(1);
let err = ch.listen(tx).await.unwrap_err().to_string();
assert!(err.contains("without a secret"), "got: {err}");
}
#[test]
fn host_defaults_to_loopback() {
let ch = make_channel();
assert_eq!(ch.host, "127.0.0.1");
assert!(!ch.allow_public_bind);
}
#[test]
fn with_bind_blank_host_falls_back_to_loopback() {
let ch = make_channel_with_secret().with_bind(Some(" ".into()), false);
assert_eq!(ch.host, "127.0.0.1");
let ch2 = make_channel_with_secret().with_bind(None, true);
assert_eq!(ch2.host, "127.0.0.1");
assert!(ch2.allow_public_bind);
}
#[tokio::test]
async fn listen_bails_on_public_bind_without_opt_in() {
let ch = make_channel_with_secret().with_bind(Some("0.0.0.0".into()), false);
let (tx, _rx) = tokio::sync::mpsc::channel(1);
let err = ch.listen(tx).await.unwrap_err().to_string();
assert!(err.contains("non-loopback"), "got: {err}");
assert!(err.contains("allow_public_bind"), "got: {err}");
}
#[tokio::test]
async fn listen_bails_on_public_bind_without_secret_even_with_allow_unsigned() {
let ch = make_channel().with_bind(Some("0.0.0.0".into()), true);
let (tx, _rx) = tokio::sync::mpsc::channel(1);
let err = ch.listen(tx).await.unwrap_err().to_string();
assert!(err.contains("must be authenticated"), "got: {err}");
}
#[test]
fn send_method_default() {
let ch = make_channel();
assert_eq!(ch.send_method, "POST");
}
#[test]
fn send_method_put() {
let ch = WebhookChannel::new(
8080,
None,
Some("https://example.com".into()),
Some("put".into()),
None,
None,
true,
);
assert_eq!(ch.send_method, "PUT");
}
#[test]
fn incoming_payload_deserializes_all_fields() {
let json = r#"{"sender": "revka_user", "content": "hello", "thread_id": "t1"}"#;
let payload: IncomingWebhook = serde_json::from_str(json).unwrap();
assert_eq!(payload.sender, "revka_user");
assert_eq!(payload.content, "hello");
assert_eq!(payload.thread_id.as_deref(), Some("t1"));
}
#[test]
fn incoming_payload_without_thread() {
let json = r#"{"sender": "bob", "content": "hi"}"#;
let payload: IncomingWebhook = serde_json::from_str(json).unwrap();
assert_eq!(payload.sender, "bob");
assert_eq!(payload.content, "hi");
assert!(payload.thread_id.is_none());
}
#[test]
fn outgoing_payload_serializes_content() {
let payload = OutgoingWebhook {
content: "response".into(),
thread_id: Some("t1".into()),
recipient: Some("revka_user".into()),
};
let json = serde_json::to_value(&payload).unwrap();
assert_eq!(json["content"], "response");
assert_eq!(json["thread_id"], "t1");
assert_eq!(json["recipient"], "revka_user");
}
#[test]
fn outgoing_payload_omits_none_fields() {
let payload = OutgoingWebhook {
content: "response".into(),
thread_id: None,
recipient: None,
};
let json = serde_json::to_value(&payload).unwrap();
assert_eq!(json["content"], "response");
assert!(json.get("thread_id").is_none());
assert!(json.get("recipient").is_none());
}
#[test]
fn verify_signature_no_secret() {
let ch = make_channel();
assert!(ch.verify_signature(b"body", None));
}
#[test]
fn verify_signature_missing_header() {
let ch = make_channel_with_secret();
assert!(!ch.verify_signature(b"body", None));
}
#[test]
fn verify_signature_valid() {
use hmac::{Hmac, Mac};
use sha2::Sha256;
type HmacSha256 = Hmac<Sha256>;
let ch = make_channel_with_secret();
let body = b"test body";
let mut mac = HmacSha256::new_from_slice(b"mysecret").unwrap();
mac.update(body);
let sig = hex::encode(mac.finalize().into_bytes());
assert!(ch.verify_signature(body, Some(&sig)));
}
#[test]
fn verify_signature_invalid() {
let ch = make_channel_with_secret();
assert!(!ch.verify_signature(b"body", Some("badhex")));
}
}