cerebro 1.1.6

A blazing-fast AI memory layer that enables teams of specialized agents to collaborate through a shared cognitive architecture.
Documentation
//! # HTTP Gateway
//!
//! Exposes the Cerebro Swarm engine over a REST API and WebSockets.
//! Requires the `api` feature flag.

use std::sync::Arc;
use axum::{
    routing::{post, get},
    Router,
    Json,
    extract::{State, ws::{WebSocketUpgrade, WebSocket, Message}},
};
use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;
use tower_http::cors::CorsLayer;

use crate::traits::Result;
use super::orchestrator::{SwarmOrchestrator, SwarmPattern};
use super::patterns::SwarmResult;

/// State shared across HTTP handlers.
#[derive(Clone)]
pub struct GatewayState {
    // We wrap it in Arc<Mutex> so handlers can mutably register things or just execute.
    // In a real prod env, orchestrator might be structured to allow concurrent execute(&self).
    // SwarmOrchestrator::execute takes &self, so Arc is enough.
    pub orchestrator: Arc<SwarmOrchestrator>,
}

#[derive(Deserialize)]
pub struct ExecuteRequest {
    pub pattern: SwarmPattern,
    pub input: String,
}

#[derive(Serialize)]
pub struct ExecuteResponse {
    pub status: String,
    pub result: Option<SwarmResult>,
    pub error: Option<String>,
}

/// Create the Axum router for the Swarm API.
pub fn create_router(orchestrator: Arc<SwarmOrchestrator>) -> Router {
    let state = GatewayState { orchestrator };

    Router::new()
        .route("/api/swarm/execute", post(handle_execute))
        .route("/api/swarm/stream", get(handle_stream))
        .layer(CorsLayer::permissive())
        .with_state(state)
}

/// POST /api/swarm/execute
/// Synchronously executes a swarm pattern.
async fn handle_execute(
    State(state): State<GatewayState>,
    Json(req): Json<ExecuteRequest>,
) -> Json<ExecuteResponse> {
    match state.orchestrator.execute(req.pattern, &req.input).await {
        Ok(res) => Json(ExecuteResponse {
            status: "success".into(),
            result: Some(res),
            error: None,
        }),
        Err(e) => Json(ExecuteResponse {
            status: "error".into(),
            result: None,
            error: Some(e.to_string()),
        }),
    }
}

/// GET /api/swarm/stream
/// Upgrades connection to a WebSocket for event streaming.
async fn handle_stream(ws: WebSocketUpgrade) -> axum::response::Response {
    ws.on_upgrade(|socket| async move {
        handle_websocket(socket).await;
    })
}

async fn handle_websocket(mut socket: WebSocket) {
    if let Err(e) = socket.send(Message::Text("Connected to Cerebro Swarm Stream".into())).await {
        eprintln!("Websocket error: {}", e);
        return;
    }
    
    // In a full implementation, you would attach a broadcast channel to ExecutionTracer
    // and loop here awaiting channel receivers, forwarding TraceStep json to the socket.
    while let Some(msg) = socket.recv().await {
        if let Ok(_) = msg {
            // Echo or handle commands
            let _ = socket.send(Message::Text("Streaming not fully implemented. Check trace logs.".into())).await;
        } else {
            break;
        }
    }
}