// robson-gateway-webhook 0.1.0
//
// Rust async agent orchestrator for automated development workflows.
// Documentation
use anyhow::Result;
use async_trait::async_trait;
use sea_orm::DatabaseConnection;
use tracing::{info, warn};

use robson_core::plugin::{Gateway, MessageEvent, MessageEventKind};

use crate::entities::subscriber::Model as Subscriber;

/// Webhook Gateway plugin.
///
/// - `start`: returns immediately without starting a listener; per its own documentation,
///   inbound messages are handled by the HTTP server's route handlers instead.
/// - `send`: acts only on `MessageEventKind::Delivered` events. It HTTP POSTs the message
///   content to the callback URL carried in `channel_id`, or, when `channel_id` is empty,
///   to every registered subscriber. All other event kinds are ignored.
pub struct WebhookGateway {
    /// Database connection used to look up registered subscribers for the broadcast path.
    db: DatabaseConnection,
}

impl WebhookGateway {
    pub fn new(db: DatabaseConnection) -> Self {
        Self { db }
    }

    pub async fn connect_and_migrate(db_path: &str) -> Result<Self> {
        let db = crate::db::connect_and_migrate(db_path).await?;
        Ok(Self { db })
    }
}

#[async_trait]
impl Gateway for WebhookGateway {
    fn name(&self) -> &'static str {
        "webhook"
    }

    async fn send(&self, msg: MessageEvent) -> Result<()> {
        if !matches!(msg.kind, MessageEventKind::Delivered) {
            return Ok(());
        }

        // Route to the originating callback URL carried in channel_id.
        // If not set, fall back to broadcasting to all registered subscribers.
        if !msg.channel_id.is_empty() {
            let client = reqwest::Client::new();
            let payload = serde_json::json!({ "content": msg.content });
            match client.post(&msg.channel_id).json(&payload).send().await {
                Ok(resp) => {
                    info!(url = %msg.channel_id, status = %resp.status(), "webhook callback sent")
                }
                Err(e) => warn!(url = %msg.channel_id, error = %e, "webhook callback failed"),
            }
            return Ok(());
        }

        // Legacy broadcast path (no originating channel set).
        let subscribers = Subscriber::find_all(&self.db).await?;
        if subscribers.is_empty() {
            return Ok(());
        }

        let client = reqwest::Client::new();
        let payload = serde_json::json!({ "content": msg.content });

        for sub in subscribers {
            match client.post(&sub.callback_url).json(&payload).send().await {
                Ok(resp) => {
                    info!(url = %sub.callback_url, status = %resp.status(), "webhook callback sent")
                }
                Err(e) => warn!(url = %sub.callback_url, error = %e, "webhook callback failed"),
            }
        }

        Ok(())
    }

    /// The webhook gateway does not start a long-running listener —
    /// inbound messages arrive via HTTP POST to the axum server routes and are written
    /// directly to the conversations table by the route handlers.
    /// This method returns immediately.
    async fn start(&self, _db: DatabaseConnection) -> Result<()> {
        Ok(())
    }
}