kap 0.0.1-pre10

Run AI agents in secure capsules. Built on devcontainers with network controls and remote access.
use std::sync::Arc;

use anyhow::Result;
use bytes::Bytes;
use futures_util::SinkExt;
use http_body_util::Full;
use hyper::body::Incoming;
use hyper::{Request, Response, StatusCode};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio_tungstenite::WebSocketStream;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::tungstenite::handshake::derive_accept_key;
use tokio_tungstenite::tungstenite::protocol::Role;

use super::RemoteState;
use crate::remote::containers;

/// Response body used by the handlers in this file: one complete, in-memory chunk of bytes.
type Body = Full<Bytes>;
/// WebSocket stream running over a hyper-upgraded connection wrapped in `TokioIo`.
type WsStream = WebSocketStream<hyper_util::rt::TokioIo<hyper::upgrade::Upgraded>>;

/// Route an incoming WebSocket request to the matching stream handler.
///
/// Recognised paths:
/// - `/ws/logs` — upgraded and served by `stream_logs`.
/// - `/ws/agent/<session_id>` — upgraded and served by `stream_agent`.
///
/// Any other path gets a plain `404` response with the body `not found`. The optional
/// `project` query parameter is read up front and passed to whichever handler is chosen.
pub async fn handle(req: Request<Incoming>, state: Arc<RemoteState>) -> Result<Response<Body>> {
    // No route consults `state`; this only marks the parameter as intentionally unused.
    let _ = state;

    let project = extract_project(&req);
    let route = req.uri().path().to_owned();

    if route == "/ws/logs" {
        return ws_upgrade(req, move |ws| stream_logs(ws, project));
    }

    if let Some(id) = route.strip_prefix("/ws/agent/") {
        let session_id = id.to_owned();
        return ws_upgrade(req, move |ws| stream_agent(ws, session_id, project));
    }

    let not_found = Response::builder()
        .status(StatusCode::NOT_FOUND)
        .body(Full::new(Bytes::from("not found")))
        .unwrap();
    Ok(not_found)
}

/// Pull the `project` value out of the request's query string, if there is one.
///
/// The query is split on `&` and the first segment that starts with `project=` wins; the text
/// after the `=` is returned as-is (it is not percent-decoded here). Returns `None` when the
/// URI has no query string or no such segment.
fn extract_project(req: &Request<Incoming>) -> Option<String> {
    let query = req.uri().query()?;
    for segment in query.split('&') {
        if let Some(value) = segment.strip_prefix("project=") {
            return Some(value.to_owned());
        }
    }
    None
}

/// Answer a WebSocket handshake and run `handler` on the upgraded connection.
///
/// If the request carries no `Sec-WebSocket-Key` header, a `400` response is returned and
/// nothing is spawned. Otherwise a background task is spawned that awaits the hyper upgrade,
/// wraps the upgraded I/O as a server-role `WebSocketStream`, and awaits `handler` on it.
/// Upgrade failures and handler errors are written to stderr. The `101 Switching Protocols`
/// response, including the derived `Sec-WebSocket-Accept` value, is returned to the caller.
fn ws_upgrade<F, Fut>(req: Request<Incoming>, handler: F) -> Result<Response<Body>>
where
    F: FnOnce(WsStream) -> Fut + Send + 'static,
    Fut: std::future::Future<Output = Result<()>> + Send,
{
    let Some(ws_key) = req.headers().get("sec-websocket-key") else {
        let bad_request = Response::builder()
            .status(StatusCode::BAD_REQUEST)
            .body(Full::new(Bytes::from("missing Sec-WebSocket-Key")))
            .unwrap();
        return Ok(bad_request);
    };

    // Computed before `req` is moved into the task below.
    let accept_key = derive_accept_key(ws_key.as_bytes());

    tokio::task::spawn(async move {
        let upgraded = match hyper::upgrade::on(req).await {
            Ok(upgraded) => upgraded,
            Err(e) => {
                eprintln!("[remote] ws upgrade failed: {e}");
                return;
            }
        };

        let io = hyper_util::rt::TokioIo::new(upgraded);
        let ws = WebSocketStream::from_raw_socket(io, Role::Server, None).await;

        if let Err(e) = handler(ws).await {
            eprintln!("[remote] ws handler error: {e}");
        }
    });

    let switching = Response::builder()
        .status(StatusCode::SWITCHING_PROTOCOLS)
        .header("Upgrade", "websocket")
        .header("Connection", "Upgrade")
        .header("Sec-WebSocket-Accept", accept_key)
        .body(Full::new(Bytes::new()))
        .unwrap();
    Ok(switching)
}

async fn stream_logs(mut ws: WsStream, project: Option<String>) -> Result<()> {
    let (_app, sidecar) = resolve_ws_containers(project.as_deref())?;

    let mut child = containers::exec_stream(
        &sidecar,
        &[
            "tail",
            "-f",
            "-n",
            "0", // no catch-up — REST handles initial load
            "/var/log/kap/proxy.jsonl",
        ],
    )
    .await?;

    let stdout = child
        .stdout
        .take()
        .ok_or_else(|| anyhow::anyhow!("no stdout from tail"))?;

    let reader = BufReader::new(stdout);
    let mut lines = reader.lines();

    while let Ok(Some(line)) = lines.next_line().await {
        if ws.send(Message::Text(line.into())).await.is_err() {
            break;
        }
    }

    let _ = child.kill().await;
    Ok(())
}

async fn stream_agent(mut ws: WsStream, session_id: String, project: Option<String>) -> Result<()> {
    let (app, _sidecar) = resolve_ws_containers(project.as_deref())?;

    // Find the session file path
    let path_output = containers::exec_in(
        &app,
        &[
            "sh",
            "-c",
            &format!(
                "find /home /root -name '{session_id}.jsonl' -path '*/.claude/projects/*' 2>/dev/null | head -1"
            ),
        ],
    )
    .ok_or_else(|| anyhow::anyhow!("session {session_id} not found"))?;

    let session_path = path_output.trim().to_string();
    if session_path.is_empty() {
        anyhow::bail!("session {session_id} not found");
    }

    let mut child = containers::exec_stream(
        &app,
        &[
            "tail",
            "-f",
            "-n",
            "50", // send last 50 lines as catch-up
            &session_path,
        ],
    )
    .await?;

    let stdout = child
        .stdout
        .take()
        .ok_or_else(|| anyhow::anyhow!("no stdout from tail"))?;

    let reader = BufReader::new(stdout);
    let mut lines = reader.lines();

    while let Ok(Some(line)) = lines.next_line().await {
        // Parse and filter like the agent module does, then send as JSON
        let events = crate::remote::agent::parse_session_events(&line);
        for event in events {
            if let Ok(json) = serde_json::to_string(&event)
                && ws.send(Message::Text(json.into())).await.is_err()
            {
                let _ = child.kill().await;
                return Ok(());
            }
        }
    }

    let _ = child.kill().await;
    Ok(())
}

/// Work out which `(app, sidecar)` container pair a WebSocket request refers to.
///
/// With a project name, the lookup is delegated to `containers::find_by_project`. Without
/// one, all container groups are listed and the single group found is used.
///
/// # Errors
/// Propagates lookup failures, and fails when no project is given and the number of groups
/// found is zero or more than one.
fn resolve_ws_containers(project: Option<&str>) -> Result<(String, String)> {
    if let Some(name) = project {
        return containers::find_by_project(name);
    }

    let groups = containers::find_all_containers()?;
    let n = groups.len();
    if n == 0 {
        anyhow::bail!("no running devcontainer found");
    }
    if n > 1 {
        anyhow::bail!("{n} devcontainers running; specify &project=X");
    }

    // Exactly one group remains at this point.
    let only = &groups[0];
    Ok((only.app.clone(), only.sidecar.clone()))
}

#[cfg(test)]
mod tests {
    fn parse_project_from_query(query: &str) -> Option<String> {
        query
            .split('&')
            .find_map(|p| p.strip_prefix("project=").map(|v| v.to_string()))
    }

    #[test]
    fn extract_project_from_query_string() {
        assert_eq!(
            parse_project_from_query("token=abc&project=myproj"),
            Some("myproj".into())
        );
    }

    #[test]
    fn extract_project_missing() {
        assert_eq!(parse_project_from_query("token=abc"), None);
    }

    #[test]
    fn extract_project_first_param() {
        assert_eq!(
            parse_project_from_query("project=foo&token=abc"),
            Some("foo".into())
        );
    }

    #[test]
    fn extract_project_empty_value() {
        assert_eq!(parse_project_from_query("project="), Some("".into()));
    }
}