Skip to main content

daemon/
http_listener.rs

1//! marshal's own minimal plain-HTTP listener for the Claude Code
2//! `/hook/*` endpoints.
3//!
4//! Deliberately separate from myko's listener. Stock myko serves
5//! `/myko/mcp` (WS + HTTP MCP) on its own port and we do **not** fork it
6//! to bolt on extra routes. The hooks need a dumb plain-HTTP endpoint a
7//! curl one-liner POSTs raw hook JSON to, so marshal binds a second port
8//! and shares the server's `Arc<CellServerCtx>` — same registry, same
9//! event log, no second source of truth.
10//!
11//! The HTTP handling is intentionally tiny: HTTP/1.1, `Content-Length`
12//! bodies only (curl `--data-binary @-` always sends one), one request
13//! per connection, `Connection: close`. Hook bodies are a few hundred
14//! bytes; anything fancier would be a dependency we don't need.
15
16use std::{net::SocketAddr, sync::Arc};
17
18use myko::server::CellServerCtx;
19use tokio::{
20    io::{AsyncReadExt, AsyncWriteExt},
21    net::{TcpListener, TcpStream},
22};
23
24use crate::hooks;
25
/// Upper bound, in bytes, on a hook request body (256 KiB). Claude Code
/// hook payloads are small; the limit is only here so that a malformed or
/// hostile `Content-Length` cannot drive unbounded allocation.
const MAX_BODY: usize = 256 << 10;
30
31/// Run the hook listener forever. Spawn on a tokio task.
32pub async fn run(bind: SocketAddr, ctx: Arc<CellServerCtx>) -> std::io::Result<()> {
33    let listener = TcpListener::bind(bind).await?;
34    log::info!("marshal hook listener on http://{bind} (/hook/*)");
35    loop {
36        let (stream, _peer) = match listener.accept().await {
37            Ok(pair) => pair,
38            Err(e) => {
39                log::warn!("[hook] accept error: {e}");
40                continue;
41            }
42        };
43        let ctx = ctx.clone();
44        tokio::spawn(async move {
45            if let Err(e) = handle_conn(stream, ctx).await {
46                log::debug!("[hook] connection error: {e}");
47            }
48        });
49    }
50}
51
52async fn handle_conn(mut stream: TcpStream, ctx: Arc<CellServerCtx>) -> std::io::Result<()> {
53    let mut buf: Vec<u8> = Vec::with_capacity(2048);
54    let mut tmp = [0u8; 2048];
55
56    // Read until the end of headers.
57    let header_end = loop {
58        if let Some(pos) = find(&buf, b"\r\n\r\n") {
59            break pos;
60        }
61        let n = stream.read(&mut tmp).await?;
62        if n == 0 {
63            // Connection closed before a complete header block.
64            return Ok(());
65        }
66        buf.extend_from_slice(&tmp[..n]);
67        if buf.len() > MAX_BODY + 8192 {
68            return write_response(&mut stream, 413, "payload too large").await;
69        }
70    };
71
72    let head = String::from_utf8_lossy(&buf[..header_end]).into_owned();
73    let mut lines = head.split("\r\n");
74    let request_line = lines.next().unwrap_or("");
75    let mut rl = request_line.split_whitespace();
76    let method = rl.next().unwrap_or("");
77    let target = rl.next().unwrap_or("");
78
79    let content_length = lines
80        .filter_map(|l| l.split_once(':'))
81        .find(|(k, _)| k.trim().eq_ignore_ascii_case("content-length"))
82        .and_then(|(_, v)| v.trim().parse::<usize>().ok())
83        .unwrap_or(0)
84        .min(MAX_BODY);
85
86    // Body bytes already buffered past the header terminator, plus any
87    // remainder up to Content-Length.
88    let body_start = header_end + 4;
89    let mut body: Vec<u8> = buf
90        .get(body_start..)
91        .map(|s| s.to_vec())
92        .unwrap_or_default();
93    while body.len() < content_length {
94        let n = stream.read(&mut tmp).await?;
95        if n == 0 {
96            break;
97        }
98        body.extend_from_slice(&tmp[..n]);
99    }
100    body.truncate(content_length);
101
102    let (path, query) = match target.split_once('?') {
103        Some((p, q)) => (p, q),
104        None => (target, ""),
105    };
106
107    if method != "POST" {
108        return write_response(&mut stream, 405, "method not allowed").await;
109    }
110
111    match hooks::dispatch(path, query, &body, &ctx) {
112        Some(text) => write_response(&mut stream, 200, &text).await,
113        None => write_response(&mut stream, 404, "not found").await,
114    }
115}
116
117async fn write_response(stream: &mut TcpStream, status: u16, body: &str) -> std::io::Result<()> {
118    let reason = match status {
119        200 => "OK",
120        404 => "Not Found",
121        405 => "Method Not Allowed",
122        413 => "Payload Too Large",
123        _ => "OK",
124    };
125    let head = format!(
126        "HTTP/1.1 {status} {reason}\r\n\
127         Content-Type: text/plain; charset=utf-8\r\n\
128         Content-Length: {}\r\n\
129         Connection: close\r\n\r\n",
130        body.len(),
131    );
132    stream.write_all(head.as_bytes()).await?;
133    stream.write_all(body.as_bytes()).await?;
134    stream.flush().await?;
135    let _ = stream.shutdown().await;
136    Ok(())
137}
138
/// Locate the first occurrence of `needle` inside `haystack`.
///
/// Returns the starting byte offset of the match, or `None` when there is
/// no match, when `needle` is empty, or when `needle` is longer than
/// `haystack`.
fn find(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    let width = needle.len();
    if width == 0 || width > haystack.len() {
        return None;
    }
    // Try every offset at which a full-width window still fits.
    let last_start = haystack.len() - width;
    (0..=last_start).find(|&start| &haystack[start..start + width] == needle)
}