terminal-info 1.4.2

An extensible terminal information CLI and developer toolbox
Documentation
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::{self, JoinHandle};
use std::time::Duration;

use serde::Deserialize;
use serde_json::json;

use crate::ai::agent::ApprovalKind;
use crate::ai::chat::ProviderKind;
use crate::ai::hook::HookEventPayload;
use crate::ai::runtime::{Runtime, RuntimeEvent};
use crate::ai::web;

#[derive(Debug, Clone)]
pub struct LocalApi {
    bind_address: String,
    web_enabled: bool,
    refresh_ms: u64,
}

pub struct LocalApiServer {
    stop: Arc<AtomicBool>,
    handle: Option<JoinHandle<()>>,
}

impl LocalApi {
    pub fn new(bind_address: impl Into<String>, web_enabled: bool, refresh_ms: u64) -> Self {
        Self {
            bind_address: bind_address.into(),
            web_enabled,
            refresh_ms,
        }
    }

    pub fn bind_address(&self) -> &str {
        &self.bind_address
    }

    pub fn start(&self, runtime: Runtime) -> Result<LocalApiServer, String> {
        let listener = TcpListener::bind(&self.bind_address)
            .map_err(|err| format!("Failed to bind local API {}: {err}", self.bind_address))?;
        listener
            .set_nonblocking(true)
            .map_err(|err| format!("Failed to configure local API socket: {err}"))?;

        let stop = Arc::new(AtomicBool::new(false));
        let stop_flag = stop.clone();
        let web_enabled = self.web_enabled;
        let refresh_ms = self.refresh_ms;
        let handle = thread::spawn(move || loop {
            if stop_flag.load(Ordering::SeqCst) {
                return;
            }
            match listener.accept() {
                Ok((mut stream, _)) => {
                    let _ = handle_connection(&mut stream, &runtime, web_enabled, refresh_ms);
                }
                Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                    thread::sleep(Duration::from_millis(100));
                }
                Err(_) => return,
            }
        });

        Ok(LocalApiServer {
            stop,
            handle: Some(handle),
        })
    }
}

impl Drop for LocalApiServer {
    fn drop(&mut self) {
        self.stop.store(true, Ordering::SeqCst);
        if let Some(handle) = self.handle.take() {
            let _ = handle.join();
        }
    }
}

#[derive(Deserialize)]
struct ApprovalDecisionBody {
    id: String,
}

#[derive(Deserialize)]
struct ApprovalRequestBody {
    agent_id: String,
    action: String,
    details: Option<String>,
    kind: Option<String>,
}

#[derive(Deserialize)]
struct ExternalLogBody {
    agent_id: String,
    level: Option<String>,
    message: String,
}

#[derive(Deserialize)]
struct ExternalEventBody {
    agent_id: String,
    event_type: String,
    message: Option<String>,
}

#[derive(Deserialize)]
struct ChatSessionBody {
    session_id: Option<String>,
    provider: Option<String>,
    model: Option<String>,
    system_prompt: Option<String>,
}

#[derive(Deserialize)]
struct ChatMessageBody {
    session_id: String,
    content: String,
}

#[derive(Deserialize)]
struct ChatAgentBody {
    session_id: String,
    agent_id: Option<String>,
}

fn handle_connection(
    stream: &mut TcpStream,
    runtime: &Runtime,
    web_enabled: bool,
    refresh_ms: u64,
) -> Result<(), String> {
    let mut buffer = [0_u8; 64 * 1024];
    let read = stream
        .read(&mut buffer)
        .map_err(|err| format!("Failed to read local API request: {err}"))?;
    if read == 0 {
        return Ok(());
    }

    let request = String::from_utf8_lossy(&buffer[..read]).to_string();
    let (head, body) = request.split_once("\r\n\r\n").unwrap_or((&request, ""));
    let mut lines = head.lines();
    let request_line = lines
        .next()
        .ok_or_else(|| "Missing request line".to_string())?;
    let mut parts = request_line.split_whitespace();
    let method = parts.next().unwrap_or("");
    let path = parts.next().unwrap_or("/");
    let path_only = path.split('?').next().unwrap_or(path);

    if web_enabled {
        match (method, path_only) {
            ("GET", "/") => return write_html(stream, &web::index_html(refresh_ms)),
            ("GET", "/index.html") => return write_html(stream, &web::index_html(refresh_ms)),
            ("GET", "/stream") => return write_snapshot_stream(stream, runtime, refresh_ms),
            _ => {}
        }
    }

    if let Some(agent_id) = path_only
        .strip_prefix("/agents/")
        .and_then(|tail| tail.split('/').next())
    {
        let suffix = path_only.trim_start_matches("/agents/").trim_start_matches(agent_id);
        return match (method, suffix) {
            ("GET", "") => {
                if let Some(agent) = runtime.agent(agent_id) {
                    write_json(stream, &agent)
                } else {
                    write_text(stream, "404 Not Found", "agent not found")
                }
            }
            ("POST", "/start") => {
                runtime.start_agent(agent_id)?;
                write_text(stream, "200 OK", "started")
            }
            ("POST", "/pause") => {
                runtime.pause_agent(agent_id)?;
                write_text(stream, "200 OK", "paused")
            }
            ("POST", "/resume") => {
                runtime.resume_agent(agent_id)?;
                write_text(stream, "200 OK", "resumed")
            }
            ("POST", "/stop") => {
                runtime.stop_agent(agent_id)?;
                write_text(stream, "200 OK", "stopped")
            }
            _ => write_text(stream, "404 Not Found", "not found"),
        };
    }

    match (method, path_only) {
        ("GET", "/agents") => write_json(stream, &runtime.snapshot().agents),
        ("GET", "/logs") => write_json(stream, &runtime.snapshot().logs),
        ("GET", "/events") => write_json(stream, &runtime.snapshot().events),
        ("GET", "/approvals") => write_json(stream, &runtime.snapshot().approvals),
        ("GET", "/chat/session") => write_json(stream, &runtime.chat_sessions_payload()),
        ("POST", "/approve") => {
            let payload: ApprovalDecisionBody = serde_json::from_str(body)
                .map_err(|err| format!("Invalid /approve payload: {err}"))?;
            runtime.approve_request(&payload.id)?;
            write_text(stream, "200 OK", "approved")
        }
        ("POST", "/deny") => {
            let payload: ApprovalDecisionBody = serde_json::from_str(body)
                .map_err(|err| format!("Invalid /deny payload: {err}"))?;
            runtime.deny_request(&payload.id)?;
            write_text(stream, "200 OK", "denied")
        }
        ("POST", "/approvals/request") => {
            let payload: ApprovalRequestBody = serde_json::from_str(body)
                .map_err(|err| format!("Invalid approval request payload: {err}"))?;
            let request = runtime.add_approval_request(
                payload.agent_id,
                parse_approval_kind(payload.kind.as_deref()),
                payload.action,
                payload.details,
            );
            write_json(stream, &request)
        }
        ("POST", "/logs") => {
            let payload: ExternalLogBody =
                serde_json::from_str(body).map_err(|err| format!("Invalid log payload: {err}"))?;
            runtime.append_external_log(
                payload.agent_id,
                payload.level.unwrap_or_else(|| "info".to_string()),
                payload.message,
            );
            write_text(stream, "200 OK", "logged")
        }
        ("POST", "/events") => {
            let payload: ExternalEventBody = serde_json::from_str(body)
                .map_err(|err| format!("Invalid event payload: {err}"))?;
            runtime.append_external_event(event_from_body(payload));
            write_text(stream, "200 OK", "event recorded")
        }
        ("POST", "/hook/event") => {
            let payload: HookEventPayload = serde_json::from_str(body)
                .map_err(|err| format!("Invalid hook event payload: {err}"))?;
            runtime.ingest_hook_event(payload)?;
            write_text(stream, "200 OK", "hook event recorded")
        }
        ("POST", "/chat/session") => {
            let payload: ChatSessionBody = serde_json::from_str(body)
                .map_err(|err| format!("Invalid chat session payload: {err}"))?;
            let session_id = runtime.ensure_chat_session(
                payload.session_id,
                payload.provider.as_deref().map(ProviderKind::from_label),
                payload.model,
                payload.system_prompt,
            )?;
            let session = runtime
                .chat_sessions_payload()
                .sessions
                .into_iter()
                .find(|session| session.id() == session_id)
                .ok_or_else(|| "Chat session was not created.".to_string())?;
            write_json(stream, &session)
        }
        ("POST", "/chat/message") => {
            let payload: ChatMessageBody = serde_json::from_str(body)
                .map_err(|err| format!("Invalid chat message payload: {err}"))?;
            runtime.send_chat_message(&payload.session_id, payload.content)?;
            write_text(stream, "200 OK", "queued")
        }
        ("POST", "/chat/send-to-agent") => {
            let payload: ChatAgentBody = serde_json::from_str(body)
                .map_err(|err| format!("Invalid send-to-agent payload: {err}"))?;
            let task = runtime.send_chat_to_agent(&payload.session_id, payload.agent_id)?;
            write_json(stream, &task)
        }
        _ => write_text(stream, "404 Not Found", "not found"),
    }
}

fn write_snapshot_stream(
    stream: &mut TcpStream,
    runtime: &Runtime,
    refresh_ms: u64,
) -> Result<(), String> {
    write!(
        stream,
        "HTTP/1.1 200 OK\r\nContent-Type: text/event-stream\r\nCache-Control: no-cache\r\nConnection: keep-alive\r\nAccess-Control-Allow-Origin: *\r\n\r\n"
    )
    .map_err(|err| format!("Failed to start live stream response: {err}"))?;
    stream
        .flush()
        .map_err(|err| format!("Failed to flush live stream headers: {err}"))?;

    let interval = Duration::from_millis(refresh_ms.max(100));
    loop {
        let payload = serde_json::to_string(&snapshot_payload(runtime))
            .map_err(|err| format!("Failed to encode live stream payload: {err}"))?;
        if write!(stream, "event: snapshot\r\ndata: {payload}\r\n\r\n")
            .and_then(|_| stream.flush())
            .is_err()
        {
            return Ok(());
        }
        thread::sleep(interval);
    }
}

fn snapshot_payload(runtime: &Runtime) -> serde_json::Value {
    let snapshot = runtime.snapshot();
    let chat = runtime.chat_sessions_payload();
    json!({
        "agents": snapshot.agents,
        "approvals": snapshot.approvals,
        "logs": snapshot.logs,
        "events": snapshot.events,
        "chat": chat,
    })
}

fn parse_approval_kind(value: Option<&str>) -> ApprovalKind {
    match value.unwrap_or("other") {
        "shell_command" => ApprovalKind::ShellCommand,
        "file_write" => ApprovalKind::FileWrite,
        "network_call" => ApprovalKind::NetworkCall,
        "package_install" => ApprovalKind::PackageInstall,
        _ => ApprovalKind::Other,
    }
}

fn event_from_body(payload: ExternalEventBody) -> RuntimeEvent {
    match payload.event_type.as_str() {
        "step_started" => RuntimeEvent::StepStarted {
            agent_id: payload.agent_id,
            step: payload.message.unwrap_or_else(|| "step".to_string()),
        },
        "tool_called" => RuntimeEvent::ToolCalled {
            agent_id: payload.agent_id,
            tool: payload.message.unwrap_or_else(|| "tool".to_string()),
        },
        "waiting_approval" => RuntimeEvent::WaitingApproval {
            agent_id: payload.agent_id,
            request_id: payload.message.unwrap_or_else(|| "request".to_string()),
        },
        "finished" => RuntimeEvent::Finished {
            agent_id: payload.agent_id,
        },
        "error" => RuntimeEvent::Error {
            agent_id: payload.agent_id,
            message: payload.message.unwrap_or_else(|| "error".to_string()),
        },
        _ => RuntimeEvent::OutputStream {
            agent_id: payload.agent_id,
            chunk: payload.message.unwrap_or_else(|| "event".to_string()),
        },
    }
}

fn write_json<T: serde::Serialize>(stream: &mut TcpStream, value: &T) -> Result<(), String> {
    let body =
        serde_json::to_string_pretty(value).map_err(|err| format!("JSON encode failed: {err}"))?;
    write_response(stream, "200 OK", "application/json; charset=utf-8", &body)
}

fn write_text(stream: &mut TcpStream, status: &str, body: &str) -> Result<(), String> {
    write_response(stream, status, "text/plain; charset=utf-8", body)
}

fn write_html(stream: &mut TcpStream, body: &str) -> Result<(), String> {
    write_response(stream, "200 OK", "text/html; charset=utf-8", body)
}

fn write_response(
    stream: &mut TcpStream,
    status: &str,
    content_type: &str,
    body: &str,
) -> Result<(), String> {
    let response = format!(
        "HTTP/1.1 {status}\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\nCache-Control: no-store\r\nConnection: close\r\n\r\n{}",
        body.len(),
        body
    );
    stream
        .write_all(response.as_bytes())
        .map_err(|err| format!("Failed to write local API response: {err}"))
}