use std::sync::Arc;
use anyhow::Result;
use bytes::Bytes;
use futures_util::SinkExt;
use http_body_util::Full;
use hyper::body::Incoming;
use hyper::{Request, Response, StatusCode};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio_tungstenite::WebSocketStream;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::tungstenite::handshake::derive_accept_key;
use tokio_tungstenite::tungstenite::protocol::Role;
use super::RemoteState;
use crate::remote::containers;
type Body = Full<Bytes>;
type WsStream = WebSocketStream<hyper_util::rt::TokioIo<hyper::upgrade::Upgraded>>;
pub async fn handle(req: Request<Incoming>, state: Arc<RemoteState>) -> Result<Response<Body>> {
let path = req.uri().path().to_string();
let project = extract_project(&req);
match path.as_str() {
"/ws/logs" => ws_upgrade(req, move |ws| stream_logs(ws, project)),
p if p.starts_with("/ws/agent/") => {
let session_id = p["/ws/agent/".len()..].to_string();
ws_upgrade(req, move |ws| stream_agent(ws, session_id, project))
}
_ => {
let _ = state;
Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Full::new(Bytes::from("not found")))
.unwrap())
}
}
}
fn extract_project(req: &Request<Incoming>) -> Option<String> {
req.uri().query().and_then(|q| {
q.split('&')
.find_map(|p| p.strip_prefix("project=").map(|v| v.to_string()))
})
}
fn ws_upgrade<F, Fut>(req: Request<Incoming>, handler: F) -> Result<Response<Body>>
where
F: FnOnce(WsStream) -> Fut + Send + 'static,
Fut: std::future::Future<Output = Result<()>> + Send,
{
let ws_key = match req.headers().get("sec-websocket-key") {
Some(key) => key.clone(),
None => {
return Ok(Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Full::new(Bytes::from("missing Sec-WebSocket-Key")))
.unwrap());
}
};
let accept_key = derive_accept_key(ws_key.as_bytes());
tokio::task::spawn(async move {
match hyper::upgrade::on(req).await {
Ok(upgraded) => {
let io = hyper_util::rt::TokioIo::new(upgraded);
let ws = WebSocketStream::from_raw_socket(io, Role::Server, None).await;
if let Err(e) = handler(ws).await {
eprintln!("[remote] ws handler error: {e}");
}
}
Err(e) => {
eprintln!("[remote] ws upgrade failed: {e}");
}
}
});
Ok(Response::builder()
.status(StatusCode::SWITCHING_PROTOCOLS)
.header("Upgrade", "websocket")
.header("Connection", "Upgrade")
.header("Sec-WebSocket-Accept", accept_key)
.body(Full::new(Bytes::new()))
.unwrap())
}
async fn stream_logs(mut ws: WsStream, project: Option<String>) -> Result<()> {
let (_app, sidecar) = resolve_ws_containers(project.as_deref())?;
let mut child = containers::exec_stream(
&sidecar,
&[
"tail",
"-f",
"-n",
"0", "/var/log/kap/proxy.jsonl",
],
)
.await?;
let stdout = child
.stdout
.take()
.ok_or_else(|| anyhow::anyhow!("no stdout from tail"))?;
let reader = BufReader::new(stdout);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
if ws.send(Message::Text(line.into())).await.is_err() {
break;
}
}
let _ = child.kill().await;
Ok(())
}
async fn stream_agent(mut ws: WsStream, session_id: String, project: Option<String>) -> Result<()> {
let (app, _sidecar) = resolve_ws_containers(project.as_deref())?;
let path_output = containers::exec_in(
&app,
&[
"sh",
"-c",
&format!(
"find /home /root -name '{session_id}.jsonl' -path '*/.claude/projects/*' 2>/dev/null | head -1"
),
],
)
.ok_or_else(|| anyhow::anyhow!("session {session_id} not found"))?;
let session_path = path_output.trim().to_string();
if session_path.is_empty() {
anyhow::bail!("session {session_id} not found");
}
let mut child = containers::exec_stream(
&app,
&[
"tail",
"-f",
"-n",
"50", &session_path,
],
)
.await?;
let stdout = child
.stdout
.take()
.ok_or_else(|| anyhow::anyhow!("no stdout from tail"))?;
let reader = BufReader::new(stdout);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
let events = crate::remote::agent::parse_session_events(&line);
for event in events {
if let Ok(json) = serde_json::to_string(&event)
&& ws.send(Message::Text(json.into())).await.is_err()
{
let _ = child.kill().await;
return Ok(());
}
}
}
let _ = child.kill().await;
Ok(())
}
fn resolve_ws_containers(project: Option<&str>) -> Result<(String, String)> {
match project {
Some(p) => containers::find_by_project(p),
None => {
let groups = containers::find_all_containers()?;
match groups.len() {
0 => anyhow::bail!("no running devcontainer found"),
1 => Ok((groups[0].app.clone(), groups[0].sidecar.clone())),
n => anyhow::bail!("{n} devcontainers running; specify &project=X"),
}
}
}
}
#[cfg(test)]
mod tests {
fn parse_project_from_query(query: &str) -> Option<String> {
query
.split('&')
.find_map(|p| p.strip_prefix("project=").map(|v| v.to_string()))
}
#[test]
fn extract_project_from_query_string() {
assert_eq!(
parse_project_from_query("token=abc&project=myproj"),
Some("myproj".into())
);
}
#[test]
fn extract_project_missing() {
assert_eq!(parse_project_from_query("token=abc"), None);
}
#[test]
fn extract_project_first_param() {
assert_eq!(
parse_project_from_query("project=foo&token=abc"),
Some("foo".into())
);
}
#[test]
fn extract_project_empty_value() {
assert_eq!(parse_project_from_query("project="), Some("".into()));
}
}