Skip to main content

aver/services/
http_server.rs

1use std::collections::HashMap;
2use std::io::{BufRead, BufReader, Read, Write};
3use std::net::{TcpListener, TcpStream};
4use std::time::Duration;
5
6use crate::value::{RuntimeError, Value, list_from_vec, list_to_vec};
7
/// One parsed incoming HTTP request, as produced by `parse_http_request`.
#[derive(Debug, Clone)]
struct ServerRequest {
    /// First whitespace-separated token of the request line.
    method: String,
    /// Second whitespace-separated token of the request line, kept as received.
    path: String,
    /// Request body bytes decoded lossily as UTF-8.
    body: String,
    /// Header `(name, value)` pairs in arrival order, each side trimmed.
    headers: Vec<(String, String)>,
}
15
/// Response to be serialized onto the wire by `write_http_response`.
#[derive(Debug, Clone)]
struct ServerResponse {
    /// Numeric status code written into the status line.
    status: i64,
    /// Response body; its byte length becomes the `Content-Length` header.
    body: String,
    /// Extra `(name, value)` headers to send with the response.
    headers: Vec<(String, String)>,
}
22
23pub fn register(global: &mut HashMap<String, Value>) {
24    let mut members = HashMap::new();
25    members.insert(
26        "listen".to_string(),
27        Value::Builtin("HttpServer.listen".to_string()),
28    );
29    members.insert(
30        "listenWith".to_string(),
31        Value::Builtin("HttpServer.listenWith".to_string()),
32    );
33    global.insert(
34        "HttpServer".to_string(),
35        Value::Namespace {
36            name: "HttpServer".to_string(),
37            members,
38        },
39    );
40}
41
/// Returns the effect labels required by the builtin called `name`.
///
/// Both `HttpServer.listen` and `HttpServer.listenWith` require the single
/// `HttpServer` effect; every other name yields an empty slice.
pub fn effects(name: &str) -> &'static [&'static str] {
    const SERVER_EFFECTS: &[&str] = &["HttpServer"];

    if matches!(name, "HttpServer.listen" | "HttpServer.listenWith") {
        SERVER_EFFECTS
    } else {
        &[]
    }
}
48
/// Runtime-free dispatch entry point for this module.
///
/// Always returns `None`: no builtin is handled here. The `HttpServer`
/// builtins are dispatched by `call_with_runtime`, which receives the
/// handler-invocation callback they need.
pub fn call(_name: &str, _args: &[Value]) -> Option<Result<Value, RuntimeError>> {
    None
}
52
53pub fn call_with_runtime<F>(
54    name: &str,
55    args: &[Value],
56    mut invoke_handler: F,
57    skip_server: bool,
58) -> Option<Result<Value, RuntimeError>>
59where
60    F: FnMut(Value, Vec<Value>, String) -> Result<Value, RuntimeError>,
61{
62    match name {
63        "HttpServer.listen" => Some(listen(args, false, &mut invoke_handler, skip_server)),
64        "HttpServer.listenWith" => Some(listen(args, true, &mut invoke_handler, skip_server)),
65        _ => None,
66    }
67}
68
69fn listen<F>(
70    args: &[Value],
71    with_context: bool,
72    invoke_handler: &mut F,
73    skip_server: bool,
74) -> Result<Value, RuntimeError>
75where
76    F: FnMut(Value, Vec<Value>, String) -> Result<Value, RuntimeError>,
77{
78    let expected = if with_context { 3 } else { 2 };
79    if args.len() != expected {
80        let sig = if with_context {
81            "HttpServer.listenWith(port, context, handler)"
82        } else {
83            "HttpServer.listen(port, handler)"
84        };
85        return Err(RuntimeError::Error(format!(
86            "{} expects {} arguments, got {}",
87            sig,
88            expected,
89            args.len()
90        )));
91    }
92
93    if skip_server {
94        return Ok(Value::Unit);
95    }
96
97    let port = match &args[0] {
98        Value::Int(n) if (0..=65535).contains(n) => *n as u16,
99        Value::Int(n) => {
100            return Err(RuntimeError::Error(format!(
101                "HttpServer.listen: port {} is out of range (0-65535)",
102                n
103            )));
104        }
105        _ => {
106            return Err(RuntimeError::Error(
107                "HttpServer.listen: port must be an Int".to_string(),
108            ));
109        }
110    };
111    let (context, handler) = if with_context {
112        (Some(args[1].clone()), args[2].clone())
113    } else {
114        (None, args[1].clone())
115    };
116
117    let listener = TcpListener::bind(("0.0.0.0", port)).map_err(|e| {
118        RuntimeError::Error(format!(
119            "HttpServer.listen: failed to bind on {}: {}",
120            port, e
121        ))
122    })?;
123
124    for incoming in listener.incoming() {
125        let mut stream = match incoming {
126            Ok(s) => s,
127            Err(e) => {
128                return Err(RuntimeError::Error(format!(
129                    "HttpServer.listen: failed to accept connection: {}",
130                    e
131                )));
132            }
133        };
134
135        if let Err(e) = stream.set_read_timeout(Some(Duration::from_secs(30))) {
136            let _ = write_http_response(
137                &mut stream,
138                &ServerResponse {
139                    status: 500,
140                    body: format!("HttpServer: failed to set read timeout: {}", e),
141                    headers: vec![],
142                },
143            );
144            continue;
145        }
146        if let Err(e) = stream.set_write_timeout(Some(Duration::from_secs(30))) {
147            let _ = write_http_response(
148                &mut stream,
149                &ServerResponse {
150                    status: 500,
151                    body: format!("HttpServer: failed to set write timeout: {}", e),
152                    headers: vec![],
153                },
154            );
155            continue;
156        }
157
158        let request = match parse_http_request(&mut stream) {
159            Ok(req) => req,
160            Err(msg) => {
161                let _ = write_http_response(
162                    &mut stream,
163                    &ServerResponse {
164                        status: 400,
165                        body: format!("Bad Request: {}", msg),
166                        headers: vec![],
167                    },
168                );
169                continue;
170            }
171        };
172
173        let request_value = http_request_to_value(&request);
174        let callback_entry = format!("<HttpServer {} {}>", request.method, request.path);
175        let callback_args = match &context {
176            Some(ctx) => vec![ctx.clone(), request_value],
177            None => vec![request_value],
178        };
179        let callback_result = invoke_handler(handler.clone(), callback_args, callback_entry);
180
181        let response = match callback_result {
182            Ok(value) => match http_response_from_value(value) {
183                Ok(resp) => resp,
184                Err(e) => ServerResponse {
185                    status: 500,
186                    body: format!("HttpServer handler return error: {}", e),
187                    headers: vec![],
188                },
189            },
190            Err(e) => ServerResponse {
191                status: 500,
192                body: format!("HttpServer handler execution error: {}", e),
193                headers: vec![],
194            },
195        };
196
197        let _ = write_http_response(&mut stream, &response);
198    }
199
200    Ok(Value::Unit)
201}
202
203fn parse_http_request(stream: &mut TcpStream) -> Result<ServerRequest, String> {
204    const BODY_LIMIT: usize = 10 * 1024 * 1024; // 10 MB
205
206    let reader_stream = stream
207        .try_clone()
208        .map_err(|e| format!("cannot clone TCP stream: {}", e))?;
209    let mut reader = BufReader::new(reader_stream);
210
211    let mut request_line = String::new();
212    let line_len = reader
213        .read_line(&mut request_line)
214        .map_err(|e| format!("cannot read request line: {}", e))?;
215    if line_len == 0 {
216        return Err("empty request".to_string());
217    }
218
219    let request_line = request_line.trim_end_matches(&['\r', '\n'][..]);
220    let mut request_parts = request_line.split_whitespace();
221    let method = request_parts
222        .next()
223        .ok_or_else(|| "missing HTTP method".to_string())?
224        .to_string();
225    let path = request_parts
226        .next()
227        .ok_or_else(|| "missing request path".to_string())?
228        .to_string();
229    let _version = request_parts
230        .next()
231        .ok_or_else(|| "missing HTTP version".to_string())?;
232
233    let mut headers = Vec::new();
234    let mut content_length = 0usize;
235
236    loop {
237        let mut line = String::new();
238        let bytes = reader
239            .read_line(&mut line)
240            .map_err(|e| format!("cannot read header line: {}", e))?;
241        if bytes == 0 {
242            break;
243        }
244
245        let trimmed = line.trim_end_matches(&['\r', '\n'][..]);
246        if trimmed.is_empty() {
247            break;
248        }
249
250        let (name, value) = trimmed
251            .split_once(':')
252            .ok_or_else(|| format!("malformed header: '{}'", trimmed))?;
253        let name = name.trim().to_string();
254        let value = value.trim().to_string();
255
256        if name.eq_ignore_ascii_case("Content-Length") {
257            content_length = value
258                .parse::<usize>()
259                .map_err(|_| format!("invalid Content-Length value: '{}'", value))?;
260            if content_length > BODY_LIMIT {
261                return Err(format!("request body exceeds {} bytes limit", BODY_LIMIT));
262            }
263        }
264
265        headers.push((name, value));
266    }
267
268    let mut body_bytes = vec![0_u8; content_length];
269    if content_length > 0 {
270        reader
271            .read_exact(&mut body_bytes)
272            .map_err(|e| format!("cannot read request body: {}", e))?;
273    }
274    let body = String::from_utf8_lossy(&body_bytes).into_owned();
275
276    Ok(ServerRequest {
277        method,
278        path,
279        body,
280        headers,
281    })
282}
283
284fn http_request_to_value(req: &ServerRequest) -> Value {
285    let headers = req
286        .headers
287        .iter()
288        .map(|(name, value)| Value::Record {
289            type_name: "Header".to_string(),
290            fields: vec![
291                ("name".to_string(), Value::Str(name.clone())),
292                ("value".to_string(), Value::Str(value.clone())),
293            ],
294        })
295        .collect::<Vec<_>>();
296
297    Value::Record {
298        type_name: "HttpRequest".to_string(),
299        fields: vec![
300            ("method".to_string(), Value::Str(req.method.clone())),
301            ("path".to_string(), Value::Str(req.path.clone())),
302            ("body".to_string(), Value::Str(req.body.clone())),
303            ("headers".to_string(), list_from_vec(headers)),
304        ],
305    }
306}
307
308fn http_response_from_value(val: Value) -> Result<ServerResponse, RuntimeError> {
309    let (type_name, fields) = match val {
310        Value::Record { type_name, fields } => (type_name, fields),
311        _ => {
312            return Err(RuntimeError::Error(
313                "HttpServer handler must return HttpResponse record".to_string(),
314            ));
315        }
316    };
317
318    if type_name != "HttpResponse" {
319        return Err(RuntimeError::Error(format!(
320            "HttpServer handler must return HttpResponse, got {}",
321            type_name
322        )));
323    }
324
325    let mut status = None;
326    let mut body = None;
327    let mut headers = Vec::new();
328
329    for (name, value) in fields {
330        match name.as_str() {
331            "status" => {
332                if let Value::Int(n) = value {
333                    status = Some(n);
334                } else {
335                    return Err(RuntimeError::Error(
336                        "HttpResponse.status must be Int".to_string(),
337                    ));
338                }
339            }
340            "body" => {
341                if let Value::Str(s) = value {
342                    body = Some(s);
343                } else {
344                    return Err(RuntimeError::Error(
345                        "HttpResponse.body must be String".to_string(),
346                    ));
347                }
348            }
349            "headers" => {
350                headers = parse_http_response_headers(value)?;
351            }
352            _ => {}
353        }
354    }
355
356    Ok(ServerResponse {
357        status: status
358            .ok_or_else(|| RuntimeError::Error("HttpResponse.status is required".to_string()))?,
359        body: body
360            .ok_or_else(|| RuntimeError::Error("HttpResponse.body is required".to_string()))?,
361        headers,
362    })
363}
364
365fn parse_http_response_headers(val: Value) -> Result<Vec<(String, String)>, RuntimeError> {
366    let list = list_to_vec(&val).ok_or_else(|| {
367        RuntimeError::Error("HttpResponse.headers must be List<Header>".to_string())
368    })?;
369
370    let mut out = Vec::new();
371    for item in list {
372        let fields = match item {
373            Value::Record { fields, .. } => fields,
374            _ => {
375                return Err(RuntimeError::Error(
376                    "HttpResponse.headers entries must be Header records".to_string(),
377                ));
378            }
379        };
380
381        let mut name = None;
382        let mut value = None;
383        for (field_name, field_val) in fields {
384            match (field_name.as_str(), field_val) {
385                ("name", Value::Str(s)) => name = Some(s),
386                ("value", Value::Str(s)) => value = Some(s),
387                _ => {}
388            }
389        }
390
391        let name = name.ok_or_else(|| {
392            RuntimeError::Error("HttpResponse header missing String 'name'".to_string())
393        })?;
394        let value = value.ok_or_else(|| {
395            RuntimeError::Error("HttpResponse header missing String 'value'".to_string())
396        })?;
397        out.push((name, value));
398    }
399
400    Ok(out)
401}
402
/// Returns the reason phrase to put after `status` in the HTTP status line.
///
/// Common codes map to their conventional phrase. Any other code falls back
/// to the name of its class (`1xx` to `5xx`), and codes outside 100-599
/// yield `"Unknown"`, so an unlisted error code is never labelled `"OK"`.
fn status_reason(status: i64) -> &'static str {
    match status {
        100 => "Continue",
        101 => "Switching Protocols",
        200 => "OK",
        201 => "Created",
        202 => "Accepted",
        204 => "No Content",
        206 => "Partial Content",
        301 => "Moved Permanently",
        302 => "Found",
        303 => "See Other",
        304 => "Not Modified",
        307 => "Temporary Redirect",
        308 => "Permanent Redirect",
        400 => "Bad Request",
        401 => "Unauthorized",
        403 => "Forbidden",
        404 => "Not Found",
        405 => "Method Not Allowed",
        406 => "Not Acceptable",
        408 => "Request Timeout",
        409 => "Conflict",
        410 => "Gone",
        411 => "Length Required",
        413 => "Payload Too Large",
        414 => "URI Too Long",
        415 => "Unsupported Media Type",
        418 => "I'm a teapot",
        422 => "Unprocessable Entity",
        429 => "Too Many Requests",
        500 => "Internal Server Error",
        501 => "Not Implemented",
        502 => "Bad Gateway",
        503 => "Service Unavailable",
        504 => "Gateway Timeout",
        505 => "HTTP Version Not Supported",
        // Unlisted codes: describe the class rather than claim success.
        100..=199 => "Informational",
        200..=299 => "Success",
        300..=399 => "Redirection",
        400..=499 => "Client Error",
        500..=599 => "Server Error",
        _ => "Unknown",
    }
}
426
427fn write_http_response(stream: &mut TcpStream, response: &ServerResponse) -> std::io::Result<()> {
428    let mut headers = response
429        .headers
430        .iter()
431        .filter(|(name, _)| {
432            !name.eq_ignore_ascii_case("Content-Length") && !name.eq_ignore_ascii_case("Connection")
433        })
434        .cloned()
435        .collect::<Vec<_>>();
436
437    if !headers
438        .iter()
439        .any(|(name, _)| name.eq_ignore_ascii_case("Content-Type"))
440    {
441        headers.push((
442            "Content-Type".to_string(),
443            "text/plain; charset=utf-8".to_string(),
444        ));
445    }
446
447    headers.push((
448        "Content-Length".to_string(),
449        response.body.len().to_string(),
450    ));
451    headers.push(("Connection".to_string(), "close".to_string()));
452
453    let mut head = format!(
454        "HTTP/1.1 {} {}\r\n",
455        response.status,
456        status_reason(response.status)
457    );
458    for (name, value) in headers {
459        head.push_str(&format!("{}: {}\r\n", name, value));
460    }
461    head.push_str("\r\n");
462
463    stream.write_all(head.as_bytes())?;
464    stream.write_all(response.body.as_bytes())?;
465    stream.flush()?;
466    Ok(())
467}