convergio-ipc 0.1.2

Message bus, SSE event streaming, agent registry
Documentation
//! IPC route handlers — agent management, messaging, and status.

use std::sync::Arc;

use axum::extract::{Query, State};
use axum::response::sse::{self, Sse};
use axum::response::Json;
use serde::Deserialize;

use crate::sse::create_sse_stream;

use super::IpcState;

pub async fn handle_status(State(state): State<Arc<IpcState>>) -> Json<serde_json::Value> {
    let conn = match state.pool.get() {
        Ok(c) => c,
        Err(e) => return Json(serde_json::json!({"error": e.to_string()})),
    };
    let agents: u64 = conn
        .query_row("SELECT count(*) FROM ipc_agents", [], |r| r.get(0))
        .unwrap_or(0);
    let messages: u64 = conn
        .query_row("SELECT count(*) FROM ipc_messages", [], |r| r.get(0))
        .unwrap_or(0);
    let channels: u64 = conn
        .query_row("SELECT count(*) FROM ipc_channels", [], |r| r.get(0))
        .unwrap_or(0);
    Json(serde_json::json!({
        "agents": agents,
        "messages": messages,
        "channels": channels,
    }))
}

pub async fn handle_agents(State(state): State<Arc<IpcState>>) -> Json<serde_json::Value> {
    match crate::agents::list(&state.pool) {
        Ok(agents) => Json(serde_json::json!(agents)),
        Err(e) => Json(serde_json::json!({"error": e.to_string()})),
    }
}

pub async fn handle_channels(State(state): State<Arc<IpcState>>) -> Json<serde_json::Value> {
    match crate::channels::list_channels(&state.pool) {
        Ok(ch) => Json(serde_json::json!(ch)),
        Err(e) => Json(serde_json::json!({"error": e.to_string()})),
    }
}

pub async fn handle_context(State(state): State<Arc<IpcState>>) -> Json<serde_json::Value> {
    match crate::channels::context_list(&state.pool) {
        Ok(ctx) => Json(serde_json::json!(ctx)),
        Err(e) => Json(serde_json::json!({"error": e.to_string()})),
    }
}

#[derive(Debug, Deserialize)]
pub struct MessagesQuery {
    pub agent: Option<String>,
    pub channel: Option<String>,
    pub limit: Option<u32>,
}

pub async fn handle_messages(
    State(state): State<Arc<IpcState>>,
    Query(params): Query<MessagesQuery>,
) -> Json<serde_json::Value> {
    let limit = params.limit.unwrap_or(50).min(200);
    match crate::messaging::history(
        &state.pool,
        params.agent.as_deref(),
        params.channel.as_deref(),
        limit,
        None,
    ) {
        Ok(msgs) => Json(serde_json::json!(msgs)),
        Err(e) => Json(serde_json::json!({"error": e.to_string()})),
    }
}

#[derive(Debug, Deserialize)]
pub struct StreamQuery {
    pub agent: Option<String>,
}

pub async fn handle_stream(
    State(state): State<Arc<IpcState>>,
    Query(params): Query<StreamQuery>,
) -> Sse<impl futures_core::Stream<Item = Result<sse::Event, std::convert::Infallible>>> {
    Sse::new(create_sse_stream(
        Arc::clone(&state.event_bus),
        params.agent,
    ))
}

#[derive(Debug, Deserialize)]
pub struct SendRequest {
    pub from: String,
    pub to: String,
    pub content: String,
    #[serde(default = "default_msg_type")]
    pub msg_type: String,
    #[serde(default)]
    pub priority: i32,
}

fn default_msg_type() -> String {
    "text".into()
}

pub async fn handle_send(
    State(state): State<Arc<IpcState>>,
    Json(body): Json<SendRequest>,
) -> Json<serde_json::Value> {
    let params = crate::messaging::SendParams {
        from: &body.from,
        to: &body.to,
        content: &body.content,
        msg_type: &body.msg_type,
        priority: body.priority,
        rate_limit: state.rate_limit,
    };
    match crate::messaging::send(&state.pool, &state.notify, &params) {
        Ok(id) => Json(serde_json::json!({"id": id})),
        Err(e) => Json(serde_json::json!({"error": e.to_string()})),
    }
}

#[derive(Debug, Deserialize)]
pub struct RegisterAgentRequest {
    pub name: String,
    #[serde(default = "default_agent_type")]
    pub agent_type: String,
    pub pid: Option<u32>,
    pub host: Option<String>,
    pub metadata: Option<String>,
    pub parent_agent: Option<String>,
}

fn default_agent_type() -> String {
    "claude".into()
}

#[derive(Debug, Deserialize)]
pub struct ContextSetRequest {
    pub key: String,
    pub value: String,
    #[serde(default = "default_context_set_by")]
    pub set_by: String,
}

fn default_context_set_by() -> String {
    "api".into()
}

pub async fn handle_context_set(
    State(state): State<Arc<IpcState>>,
    Json(body): Json<ContextSetRequest>,
) -> Json<serde_json::Value> {
    match crate::channels::context_set(&state.pool, &body.key, &body.value, &body.set_by) {
        Ok(()) => Json(serde_json::json!({"ok": true})),
        Err(e) => Json(serde_json::json!({"error": e.to_string()})),
    }
}

pub async fn handle_register_agent(
    State(state): State<Arc<IpcState>>,
    Json(body): Json<RegisterAgentRequest>,
) -> Json<serde_json::Value> {
    let host = body.host.unwrap_or_else(super::local_hostname);
    match crate::agents::register(
        &state.pool,
        &body.name,
        &body.agent_type,
        body.pid,
        &host,
        body.metadata.as_deref(),
        body.parent_agent.as_deref(),
    ) {
        Ok(()) => Json(serde_json::json!({"ok": true})),
        Err(e) => Json(serde_json::json!({"error": e.to_string()})),
    }
}

pub async fn handle_unregister_agent(
    State(state): State<Arc<IpcState>>,
    axum::extract::Path(name): axum::extract::Path<String>,
) -> Json<serde_json::Value> {
    let host = super::local_hostname();
    match crate::agents::unregister(&state.pool, &name, &host) {
        Ok(()) => Json(serde_json::json!({"ok": true})),
        Err(e) => Json(serde_json::json!({"error": e.to_string()})),
    }
}

#[derive(Debug, Deserialize)]
pub struct HeartbeatAgentRequest {
    pub name: String,
    pub host: Option<String>,
}

pub async fn handle_agent_heartbeat(
    State(state): State<Arc<IpcState>>,
    Json(body): Json<HeartbeatAgentRequest>,
) -> Json<serde_json::Value> {
    let host = body.host.unwrap_or_else(super::local_hostname);
    match crate::agents::heartbeat(&state.pool, &body.name, &host) {
        Ok(()) => Json(serde_json::json!({"ok": true})),
        Err(e) => Json(serde_json::json!({"error": e.to_string()})),
    }
}