1use std::{net::SocketAddr, sync::Arc};
17
18use myko::server::CellServerCtx;
19use tokio::{
20 io::{AsyncReadExt, AsyncWriteExt},
21 net::{TcpListener, TcpStream},
22};
23
24use crate::hooks;
25
/// Upper bound, in bytes, on the request body this listener will buffer (256 KiB).
const MAX_BODY: usize = 256 << 10;
30
31pub async fn run(bind: SocketAddr, ctx: Arc<CellServerCtx>) -> std::io::Result<()> {
33 let listener = TcpListener::bind(bind).await?;
34 log::info!("marshal hook listener on http://{bind} (/hook/*)");
35 loop {
36 let (stream, _peer) = match listener.accept().await {
37 Ok(pair) => pair,
38 Err(e) => {
39 log::warn!("[hook] accept error: {e}");
40 continue;
41 }
42 };
43 let ctx = ctx.clone();
44 tokio::spawn(async move {
45 if let Err(e) = handle_conn(stream, ctx).await {
46 log::debug!("[hook] connection error: {e}");
47 }
48 });
49 }
50}
51
52async fn handle_conn(mut stream: TcpStream, ctx: Arc<CellServerCtx>) -> std::io::Result<()> {
53 let mut buf: Vec<u8> = Vec::with_capacity(2048);
54 let mut tmp = [0u8; 2048];
55
56 let header_end = loop {
58 if let Some(pos) = find(&buf, b"\r\n\r\n") {
59 break pos;
60 }
61 let n = stream.read(&mut tmp).await?;
62 if n == 0 {
63 return Ok(());
65 }
66 buf.extend_from_slice(&tmp[..n]);
67 if buf.len() > MAX_BODY + 8192 {
68 return write_response(&mut stream, 413, "payload too large").await;
69 }
70 };
71
72 let head = String::from_utf8_lossy(&buf[..header_end]).into_owned();
73 let mut lines = head.split("\r\n");
74 let request_line = lines.next().unwrap_or("");
75 let mut rl = request_line.split_whitespace();
76 let method = rl.next().unwrap_or("");
77 let target = rl.next().unwrap_or("");
78
79 let content_length = lines
80 .filter_map(|l| l.split_once(':'))
81 .find(|(k, _)| k.trim().eq_ignore_ascii_case("content-length"))
82 .and_then(|(_, v)| v.trim().parse::<usize>().ok())
83 .unwrap_or(0)
84 .min(MAX_BODY);
85
86 let body_start = header_end + 4;
89 let mut body: Vec<u8> = buf
90 .get(body_start..)
91 .map(|s| s.to_vec())
92 .unwrap_or_default();
93 while body.len() < content_length {
94 let n = stream.read(&mut tmp).await?;
95 if n == 0 {
96 break;
97 }
98 body.extend_from_slice(&tmp[..n]);
99 }
100 body.truncate(content_length);
101
102 let (path, query) = match target.split_once('?') {
103 Some((p, q)) => (p, q),
104 None => (target, ""),
105 };
106
107 if method != "POST" {
108 return write_response(&mut stream, 405, "method not allowed").await;
109 }
110
111 match hooks::dispatch(path, query, &body, &ctx) {
112 Some(text) => write_response(&mut stream, 200, &text).await,
113 None => write_response(&mut stream, 404, "not found").await,
114 }
115}
116
117async fn write_response(stream: &mut TcpStream, status: u16, body: &str) -> std::io::Result<()> {
118 let reason = match status {
119 200 => "OK",
120 404 => "Not Found",
121 405 => "Method Not Allowed",
122 413 => "Payload Too Large",
123 _ => "OK",
124 };
125 let head = format!(
126 "HTTP/1.1 {status} {reason}\r\n\
127 Content-Type: text/plain; charset=utf-8\r\n\
128 Content-Length: {}\r\n\
129 Connection: close\r\n\r\n",
130 body.len(),
131 );
132 stream.write_all(head.as_bytes()).await?;
133 stream.write_all(body.as_bytes()).await?;
134 stream.flush().await?;
135 let _ = stream.shutdown().await;
136 Ok(())
137}
138
/// Returns the index of the first occurrence of `needle` in `haystack`.
///
/// Returns `None` when `needle` is empty, when it is longer than `haystack`,
/// or when it does not occur.
fn find(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    let width = needle.len();
    if width == 0 || width > haystack.len() {
        return None;
    }
    // Last index at which a full-width match could still start.
    let last_start = haystack.len() - width;
    (0..=last_start).find(|&start| haystack[start..].starts_with(needle))
}