Skip to main content

harn_vm/
http.rs

1use std::cell::RefCell;
2use std::collections::{BTreeMap, HashMap, VecDeque};
3use std::io::Write;
4use std::net::{Shutdown, TcpListener, TcpStream};
5use std::rc::Rc;
6use std::sync::{
7    atomic::{AtomicBool, Ordering},
8    mpsc, Arc, RwLock,
9};
10use std::thread;
11use std::time::{Duration, SystemTime};
12
13use crate::value::{VmClosure, VmError, VmValue};
14use crate::vm::Vm;
15
16use base64::Engine;
17use futures::{SinkExt, StreamExt};
18use reqwest_eventsource::{Event as SseEvent, EventSource};
19use sha2::{Digest, Sha256};
20use tokio_tungstenite::tungstenite::client::IntoClientRequest;
21use tokio_tungstenite::tungstenite::handshake::server::{ErrorResponse, Request, Response};
22use tokio_tungstenite::tungstenite::http::{HeaderValue, StatusCode};
23use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
24use tokio_tungstenite::tungstenite::protocol::CloseFrame;
25use tokio_tungstenite::tungstenite::Message as WsMessage;
26use x509_parser::prelude::{FromDer, X509Certificate};
27
28mod mock;
29
30use mock::{
31    clear_http_mocks, consume_http_mock, http_mock_calls_value, parse_mock_responses,
32    register_http_mock, reset_http_mocks, url_matches,
33};
34pub use mock::{http_mock_calls_snapshot, push_http_mock, HttpMockCallSnapshot, HttpMockResponse};
35#[cfg(test)]
36use mock::{mock_call_headers_value, redact_mock_call_url};
37
38#[derive(Clone)]
39struct RetryConfig {
40    max: u32,
41    backoff_ms: u64,
42    retryable_statuses: Vec<u16>,
43    retryable_methods: Vec<String>,
44    respect_retry_after: bool,
45}
46
47#[derive(Clone)]
48struct HttpRequestConfig {
49    total_timeout_ms: u64,
50    connect_timeout_ms: Option<u64>,
51    read_timeout_ms: Option<u64>,
52    retry: RetryConfig,
53    follow_redirects: bool,
54    max_redirects: usize,
55    proxy: Option<HttpProxyConfig>,
56    tls: HttpTlsConfig,
57    decompress: bool,
58}
59
60#[derive(Clone, Default)]
61struct HttpTlsConfig {
62    ca_bundle_path: Option<String>,
63    client_cert_path: Option<String>,
64    client_key_path: Option<String>,
65    client_identity_path: Option<String>,
66    pinned_sha256: Vec<String>,
67}
68
69#[derive(Clone)]
70struct HttpProxyConfig {
71    url: String,
72    auth: Option<(String, String)>,
73    no_proxy: Option<String>,
74}
75
76#[derive(Clone)]
77struct HttpSession {
78    client: reqwest::Client,
79    options: BTreeMap<String, VmValue>,
80}
81
82struct HttpRequestParts {
83    method: reqwest::Method,
84    headers: reqwest::header::HeaderMap,
85    recorded_headers: BTreeMap<String, VmValue>,
86    body: Option<String>,
87    multipart: Option<MultipartRequest>,
88}
89
90#[derive(Clone)]
91struct MultipartRequest {
92    parts: Vec<MultipartField>,
93    mock_body: String,
94}
95
96#[derive(Clone)]
97struct MultipartField {
98    name: String,
99    value: Vec<u8>,
100    filename: Option<String>,
101    content_type: Option<String>,
102}
103
104struct HttpStreamHandle {
105    kind: HttpStreamKind,
106    status: i64,
107    headers: BTreeMap<String, VmValue>,
108    pending: VecDeque<u8>,
109    closed: bool,
110}
111
112enum HttpStreamKind {
113    Real(Rc<tokio::sync::Mutex<reqwest::Response>>),
114    Fake,
115}
116
117struct SseMock {
118    url_pattern: String,
119    events: Vec<MockStreamEvent>,
120}
121
122#[derive(Clone)]
123struct MockStreamEvent {
124    event_type: String,
125    data: String,
126    id: Option<String>,
127    retry_ms: Option<i64>,
128}
129
130struct SseHandle {
131    kind: SseHandleKind,
132    url: String,
133    max_events: usize,
134    max_message_bytes: usize,
135    received: usize,
136}
137
138enum SseHandleKind {
139    Real(Rc<tokio::sync::Mutex<EventSource>>),
140    Fake(Rc<tokio::sync::Mutex<FakeSseStream>>),
141}
142
143struct FakeSseStream {
144    events: VecDeque<MockStreamEvent>,
145    opened: bool,
146    closed: bool,
147}
148
149struct SseServerHandle {
150    status: i64,
151    headers: BTreeMap<String, VmValue>,
152    frames: VecDeque<String>,
153    max_event_bytes: usize,
154    max_buffered_events: usize,
155    sent_events: usize,
156    flushed_events: usize,
157    closed: bool,
158    disconnected: bool,
159    cancelled: bool,
160    cancel_reason: Option<String>,
161}
162
163struct WebSocketMock {
164    url_pattern: String,
165    messages: Vec<MockWsMessage>,
166    echo: bool,
167}
168
169#[derive(Clone)]
170struct MockWsMessage {
171    message_type: String,
172    data: Vec<u8>,
173    close_code: Option<u16>,
174    close_reason: Option<String>,
175}
176
177type RealWebSocket =
178    tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
179
180struct WebSocketHandle {
181    kind: WebSocketHandleKind,
182    url: String,
183    max_messages: usize,
184    max_message_bytes: usize,
185    received: usize,
186}
187
188enum WebSocketHandleKind {
189    Real(Rc<tokio::sync::Mutex<RealWebSocket>>),
190    Fake(Rc<tokio::sync::Mutex<FakeWebSocket>>),
191    Server(Rc<tokio::sync::Mutex<ServerWebSocket>>),
192}
193
194struct FakeWebSocket {
195    messages: VecDeque<MockWsMessage>,
196    echo: bool,
197    closed: bool,
198}
199
200struct WebSocketServer {
201    addr: String,
202    routes: Arc<RwLock<HashMap<String, WebSocketRoute>>>,
203    events: Rc<tokio::sync::Mutex<mpsc::Receiver<WebSocketServerEvent>>>,
204    running: Arc<AtomicBool>,
205}
206
207#[derive(Clone)]
208struct WebSocketRoute {
209    path: String,
210    bearer_token: Option<String>,
211    max_messages: usize,
212    max_message_bytes: usize,
213    send_buffer_messages: usize,
214    idle_timeout_ms: u64,
215}
216
217struct WebSocketServerEvent {
218    handle: ServerWebSocket,
219    path: String,
220    peer: String,
221    headers: BTreeMap<String, String>,
222    max_messages: usize,
223    max_message_bytes: usize,
224}
225
226struct ServerWebSocket {
227    incoming: VecDeque<MockWsMessage>,
228    incoming_rx: mpsc::Receiver<MockWsMessage>,
229    outgoing_tx: mpsc::SyncSender<ServerWebSocketCommand>,
230    closed: bool,
231}
232
233enum ServerWebSocketCommand {
234    Send(MockWsMessage),
235    Close(Option<u16>, Option<String>),
236}
237
238#[derive(Clone)]
239struct TransportMockCall {
240    kind: String,
241    handle: Option<String>,
242    url: String,
243    message_type: Option<String>,
244    data: Option<String>,
245}
246
247#[derive(Clone)]
248struct HttpServerRoute {
249    method: String,
250    template: String,
251    handler: Rc<VmClosure>,
252    max_body_bytes: Option<usize>,
253    retain_raw_body: Option<bool>,
254}
255
256#[derive(Clone)]
257struct HttpServer {
258    routes: Vec<HttpServerRoute>,
259    before: Vec<Rc<VmClosure>>,
260    after: Vec<Rc<VmClosure>>,
261    ready: bool,
262    readiness: Option<Rc<VmClosure>>,
263    shutdown_hooks: Vec<Rc<VmClosure>>,
264    shutdown: bool,
265    max_body_bytes: usize,
266    retain_raw_body: bool,
267}
268
269const DEFAULT_TIMEOUT_MS: u64 = 30_000;
270const DEFAULT_BACKOFF_MS: u64 = 1_000;
271const MAX_RETRY_DELAY_MS: u64 = 60_000;
272const DEFAULT_RETRYABLE_STATUSES: [u16; 6] = [408, 429, 500, 502, 503, 504];
273const DEFAULT_RETRYABLE_METHODS: [&str; 5] = ["GET", "HEAD", "PUT", "DELETE", "OPTIONS"];
274const DEFAULT_TRANSPORT_RECEIVE_TIMEOUT_MS: u64 = 30_000;
275const DEFAULT_MAX_STREAM_EVENTS: usize = 10_000;
276const DEFAULT_MAX_MESSAGE_BYTES: usize = 1024 * 1024;
277const DEFAULT_SERVER_MAX_BODY_BYTES: usize = 1024 * 1024;
278const DEFAULT_WEBSOCKET_SERVER_IDLE_TIMEOUT_MS: u64 = 30_000;
279const MAX_HTTP_SESSIONS: usize = 64;
280const MAX_HTTP_STREAMS: usize = 64;
281const MAX_SSE_STREAMS: usize = 64;
282const MAX_SSE_SERVER_STREAMS: usize = 64;
283const MAX_WEBSOCKETS: usize = 64;
284const MULTIPART_MOCK_BOUNDARY: &str = "harn-boundary";
285const MAX_HTTP_SERVERS: usize = 128;
286const MAX_WEBSOCKET_SERVERS: usize = 16;
287
288thread_local! {
289    static HTTP_CLIENTS: RefCell<HashMap<String, reqwest::Client>> = RefCell::new(HashMap::new());
290    static HTTP_SESSIONS: RefCell<HashMap<String, HttpSession>> = RefCell::new(HashMap::new());
291    static HTTP_STREAMS: RefCell<HashMap<String, HttpStreamHandle>> = RefCell::new(HashMap::new());
292    static SSE_MOCKS: RefCell<Vec<SseMock>> = const { RefCell::new(Vec::new()) };
293    static SSE_HANDLES: RefCell<HashMap<String, SseHandle>> = RefCell::new(HashMap::new());
294    static SSE_SERVER_HANDLES: RefCell<HashMap<String, SseServerHandle>> = RefCell::new(HashMap::new());
295    static WEBSOCKET_MOCKS: RefCell<Vec<WebSocketMock>> = const { RefCell::new(Vec::new()) };
296    static WEBSOCKET_HANDLES: RefCell<HashMap<String, WebSocketHandle>> = RefCell::new(HashMap::new());
297    static WEBSOCKET_SERVERS: RefCell<HashMap<String, WebSocketServer>> = RefCell::new(HashMap::new());
298    static TRANSPORT_MOCK_CALLS: RefCell<Vec<TransportMockCall>> = const { RefCell::new(Vec::new()) };
299    static TRANSPORT_HANDLE_COUNTER: RefCell<u64> = const { RefCell::new(0) };
300    static HTTP_SERVERS: RefCell<HashMap<String, HttpServer>> = RefCell::new(HashMap::new());
301}
302
303/// Reset thread-local HTTP mock state. Call between test runs.
304pub fn reset_http_state() {
305    reset_http_mocks();
306    HTTP_CLIENTS.with(|clients| clients.borrow_mut().clear());
307    HTTP_SESSIONS.with(|sessions| sessions.borrow_mut().clear());
308    HTTP_STREAMS.with(|streams| streams.borrow_mut().clear());
309    SSE_MOCKS.with(|mocks| mocks.borrow_mut().clear());
310    SSE_HANDLES.with(|handles| {
311        for handle in handles.borrow_mut().values_mut() {
312            if let SseHandleKind::Real(stream) = &handle.kind {
313                if let Ok(mut stream) = stream.try_lock() {
314                    stream.close();
315                }
316            }
317        }
318        handles.borrow_mut().clear();
319    });
320    SSE_SERVER_HANDLES.with(|handles| handles.borrow_mut().clear());
321    WEBSOCKET_MOCKS.with(|mocks| mocks.borrow_mut().clear());
322    WEBSOCKET_HANDLES.with(|handles| handles.borrow_mut().clear());
323    WEBSOCKET_SERVERS.with(|servers| {
324        let mut servers = servers.borrow_mut();
325        for server in servers.values() {
326            server.running.store(false, Ordering::SeqCst);
327            let _ = TcpStream::connect(&server.addr);
328        }
329        servers.clear();
330    });
331    TRANSPORT_MOCK_CALLS.with(|calls| calls.borrow_mut().clear());
332    TRANSPORT_HANDLE_COUNTER.with(|counter| *counter.borrow_mut() = 0);
333    HTTP_SERVERS.with(|servers| servers.borrow_mut().clear());
334}
335
336/// Build a standard HTTP response dict with status, headers, body, and ok fields.
337fn build_http_response(status: i64, headers: BTreeMap<String, VmValue>, body: String) -> VmValue {
338    let mut result = BTreeMap::new();
339    result.insert("status".to_string(), VmValue::Int(status));
340    result.insert("headers".to_string(), VmValue::Dict(Rc::new(headers)));
341    result.insert("body".to_string(), VmValue::String(Rc::from(body)));
342    result.insert(
343        "ok".to_string(),
344        VmValue::Bool((200..300).contains(&(status as u16))),
345    );
346    VmValue::Dict(Rc::new(result))
347}
348
349fn build_http_download_response(
350    status: i64,
351    headers: BTreeMap<String, VmValue>,
352    bytes_written: u64,
353) -> VmValue {
354    let mut result = BTreeMap::new();
355    result.insert("status".to_string(), VmValue::Int(status));
356    result.insert("headers".to_string(), VmValue::Dict(Rc::new(headers)));
357    result.insert(
358        "bytes_written".to_string(),
359        VmValue::Int(bytes_written as i64),
360    );
361    result.insert(
362        "ok".to_string(),
363        VmValue::Bool((200..300).contains(&(status as u16))),
364    );
365    VmValue::Dict(Rc::new(result))
366}
367
368fn response_headers(headers: &reqwest::header::HeaderMap) -> BTreeMap<String, VmValue> {
369    let mut resp_headers = BTreeMap::new();
370    for (name, value) in headers {
371        if let Ok(v) = value.to_str() {
372            resp_headers.insert(name.as_str().to_string(), VmValue::String(Rc::from(v)));
373        }
374    }
375    resp_headers
376}
377
378fn vm_error(message: impl Into<String>) -> VmError {
379    VmError::Thrown(VmValue::String(Rc::from(message.into())))
380}
381
382fn next_transport_handle(prefix: &str) -> String {
383    TRANSPORT_HANDLE_COUNTER.with(|counter| {
384        let mut counter = counter.borrow_mut();
385        *counter += 1;
386        format!("{prefix}-{}", *counter)
387    })
388}
389
390fn handle_from_value(value: &VmValue, builtin: &str) -> Result<String, VmError> {
391    match value {
392        VmValue::String(handle) => Ok(handle.to_string()),
393        VmValue::Dict(dict) => dict
394            .get("id")
395            .map(|id| id.display())
396            .filter(|id| !id.is_empty())
397            .ok_or_else(|| vm_error(format!("{builtin}: handle dict must contain id"))),
398        _ => Err(vm_error(format!(
399            "{builtin}: first argument must be a handle string or dict"
400        ))),
401    }
402}
403
404fn get_options_arg(args: &[VmValue], index: usize) -> BTreeMap<String, VmValue> {
405    args.get(index)
406        .and_then(|value| value.as_dict())
407        .cloned()
408        .unwrap_or_default()
409}
410
411fn merge_options(
412    base: &BTreeMap<String, VmValue>,
413    overrides: &BTreeMap<String, VmValue>,
414) -> BTreeMap<String, VmValue> {
415    let mut merged = base.clone();
416    for (key, value) in overrides {
417        merged.insert(key.clone(), value.clone());
418    }
419    merged
420}
421
422fn resolve_http_path(
423    builtin: &str,
424    path: &str,
425    access: crate::stdlib::sandbox::FsAccess,
426) -> Result<std::path::PathBuf, VmError> {
427    let resolved = crate::stdlib::process::resolve_source_relative_path(path);
428    crate::stdlib::sandbox::enforce_fs_path(builtin, &resolved, access)?;
429    Ok(resolved)
430}
431
432fn value_to_bytes(value: &VmValue) -> Vec<u8> {
433    match value {
434        VmValue::Bytes(bytes) => bytes.as_ref().clone(),
435        other => other.display().into_bytes(),
436    }
437}
438
439fn parse_multipart_field(value: &VmValue) -> Result<MultipartField, VmError> {
440    let dict = value
441        .as_dict()
442        .ok_or_else(|| vm_error("http: multipart entries must be dicts"))?;
443    let name = dict
444        .get("name")
445        .map(|value| value.display())
446        .filter(|value| !value.is_empty())
447        .ok_or_else(|| vm_error("http: multipart entry requires name"))?;
448    let content_type = dict
449        .get("content_type")
450        .or_else(|| dict.get("mime_type"))
451        .map(|value| value.display())
452        .filter(|value| !value.is_empty());
453
454    let mut filename = dict
455        .get("filename")
456        .map(|value| value.display())
457        .filter(|value| !value.is_empty());
458    let value = if let Some(path_value) = dict.get("path") {
459        let path = path_value.display();
460        let resolved = resolve_http_path(
461            "http multipart",
462            &path,
463            crate::stdlib::sandbox::FsAccess::Read,
464        )?;
465        if filename.is_none() {
466            filename = resolved
467                .file_name()
468                .and_then(|name| name.to_str())
469                .map(|name| name.to_string());
470        }
471        std::fs::read(&resolved).map_err(|error| {
472            vm_error(format!(
473                "http: failed to read multipart file {}: {error}",
474                resolved.display()
475            ))
476        })?
477    } else if let Some(base64_value) = dict.get("value_base64").or_else(|| dict.get("base64")) {
478        base64::engine::general_purpose::STANDARD
479            .decode(base64_value.display())
480            .map_err(|error| vm_error(format!("http: invalid multipart base64 value: {error}")))?
481    } else {
482        dict.get("value").map(value_to_bytes).ok_or_else(|| {
483            vm_error("http: multipart entry requires value, value_base64, or path")
484        })?
485    };
486
487    Ok(MultipartField {
488        name,
489        value,
490        filename,
491        content_type,
492    })
493}
494
495fn parse_multipart_request(
496    options: &BTreeMap<String, VmValue>,
497) -> Result<Option<MultipartRequest>, VmError> {
498    let Some(value) = options.get("multipart") else {
499        return Ok(None);
500    };
501    let VmValue::List(items) = value else {
502        return Err(vm_error("http: multipart must be a list"));
503    };
504    let parts = items
505        .iter()
506        .map(parse_multipart_field)
507        .collect::<Result<Vec<_>, _>>()?;
508    let mock_body = multipart_mock_body(&parts);
509    Ok(Some(MultipartRequest { parts, mock_body }))
510}
511
512fn multipart_mock_body(parts: &[MultipartField]) -> String {
513    let mut out = String::new();
514    for part in parts {
515        out.push_str("--");
516        out.push_str(MULTIPART_MOCK_BOUNDARY);
517        out.push_str("\r\nContent-Disposition: form-data; name=\"");
518        out.push_str(&part.name);
519        out.push('"');
520        if let Some(filename) = &part.filename {
521            out.push_str("; filename=\"");
522            out.push_str(filename);
523            out.push('"');
524        }
525        out.push_str("\r\n");
526        if let Some(content_type) = &part.content_type {
527            out.push_str("Content-Type: ");
528            out.push_str(content_type);
529            out.push_str("\r\n");
530        }
531        out.push_str("\r\n");
532        out.push_str(&String::from_utf8_lossy(&part.value));
533        out.push_str("\r\n");
534    }
535    out.push_str("--");
536    out.push_str(MULTIPART_MOCK_BOUNDARY);
537    out.push_str("--\r\n");
538    out
539}
540
541fn multipart_form(request: &MultipartRequest) -> Result<reqwest::multipart::Form, VmError> {
542    let mut form = reqwest::multipart::Form::new();
543    for field in &request.parts {
544        let mut part = reqwest::multipart::Part::bytes(field.value.clone());
545        if let Some(filename) = &field.filename {
546            part = part.file_name(filename.clone());
547        }
548        if let Some(content_type) = &field.content_type {
549            part = part.mime_str(content_type).map_err(|error| {
550                vm_error(format!("http: invalid multipart content_type: {error}"))
551            })?;
552        }
553        form = form.part(field.name.clone(), part);
554    }
555    Ok(form)
556}
557
558fn transport_limit_option(options: &BTreeMap<String, VmValue>, key: &str, default: usize) -> usize {
559    options
560        .get(key)
561        .and_then(|value| value.as_int())
562        .map(|value| value.max(0) as usize)
563        .unwrap_or(default)
564}
565
566fn receive_timeout_arg(args: &[VmValue], index: usize) -> u64 {
567    match args.get(index) {
568        Some(VmValue::Duration(ms)) => (*ms).max(0) as u64,
569        Some(value) => value
570            .as_int()
571            .map(|ms| ms.max(0) as u64)
572            .unwrap_or(DEFAULT_TRANSPORT_RECEIVE_TIMEOUT_MS),
573        None => DEFAULT_TRANSPORT_RECEIVE_TIMEOUT_MS,
574    }
575}
576
577fn timeout_event() -> VmValue {
578    let mut dict = BTreeMap::new();
579    dict.insert("type".to_string(), VmValue::String(Rc::from("timeout")));
580    VmValue::Dict(Rc::new(dict))
581}
582
583fn closed_event() -> VmValue {
584    let mut dict = BTreeMap::new();
585    dict.insert("type".to_string(), VmValue::String(Rc::from("close")));
586    VmValue::Dict(Rc::new(dict))
587}
588
589fn sse_server_closed_event() -> VmValue {
590    let mut dict = BTreeMap::new();
591    dict.insert("type".to_string(), VmValue::String(Rc::from("close")));
592    dict.insert("server_closed".to_string(), VmValue::Bool(true));
593    VmValue::Dict(Rc::new(dict))
594}
595
596fn record_transport_call(call: TransportMockCall) {
597    TRANSPORT_MOCK_CALLS.with(|calls| calls.borrow_mut().push(call));
598}
599
600/// Extract URL, validate it, and pull an options dict from `args`.
601/// For methods with a body (POST/PUT/PATCH), the body is at index 1 and
602/// options at index 2; for methods without (GET/DELETE), options are at index 1.
603async fn http_verb_handler(
604    method: &str,
605    has_body: bool,
606    args: Vec<VmValue>,
607) -> Result<VmValue, VmError> {
608    let url = args.first().map(|a| a.display()).unwrap_or_default();
609    if url.is_empty() {
610        return Err(VmError::Thrown(VmValue::String(Rc::from(format!(
611            "http_{}: URL is required",
612            method.to_ascii_lowercase()
613        )))));
614    }
615    let mut options = if has_body {
616        match (args.get(1), args.get(2)) {
617            (Some(VmValue::Dict(d)), None) => (**d).clone(),
618            (_, Some(VmValue::Dict(d))) => (**d).clone(),
619            _ => BTreeMap::new(),
620        }
621    } else {
622        match args.get(1) {
623            Some(VmValue::Dict(d)) => (**d).clone(),
624            _ => BTreeMap::new(),
625        }
626    };
627    if has_body && !(matches!(args.get(1), Some(VmValue::Dict(_))) && args.get(2).is_none()) {
628        let body = args.get(1).map(|a| a.display()).unwrap_or_default();
629        options.insert("body".to_string(), VmValue::String(Rc::from(body)));
630    }
631    vm_execute_http_request(method, &url, &options).await
632}
633
634fn vm_string(value: impl AsRef<str>) -> VmValue {
635    VmValue::String(Rc::from(value.as_ref()))
636}
637
638fn dict_value(entries: BTreeMap<String, VmValue>) -> VmValue {
639    VmValue::Dict(Rc::new(entries))
640}
641
642fn get_bool_option(options: &BTreeMap<String, VmValue>, key: &str, default: bool) -> bool {
643    match options.get(key) {
644        Some(VmValue::Bool(value)) => *value,
645        _ => default,
646    }
647}
648
649fn get_usize_option(
650    options: &BTreeMap<String, VmValue>,
651    key: &str,
652    default: usize,
653) -> Result<usize, VmError> {
654    match options.get(key).and_then(VmValue::as_int) {
655        Some(value) if value >= 0 => Ok(value as usize),
656        Some(_) => Err(vm_error(format!("http_server: {key} must be non-negative"))),
657        None => Ok(default),
658    }
659}
660
661fn get_optional_usize_option(
662    options: &BTreeMap<String, VmValue>,
663    key: &str,
664) -> Result<Option<usize>, VmError> {
665    match options.get(key).and_then(VmValue::as_int) {
666        Some(value) if value >= 0 => Ok(Some(value as usize)),
667        Some(_) => Err(vm_error(format!(
668            "http_server_route: {key} must be non-negative"
669        ))),
670        None => Ok(None),
671    }
672}
673
674fn server_from_value(value: &VmValue, builtin: &str) -> Result<String, VmError> {
675    handle_from_value(value, builtin)
676}
677
678fn closure_arg(args: &[VmValue], index: usize, builtin: &str) -> Result<Rc<VmClosure>, VmError> {
679    match args.get(index) {
680        Some(VmValue::Closure(closure)) => Ok(closure.clone()),
681        Some(other) => Err(vm_error(format!(
682            "{builtin}: argument {} must be a closure, got {}",
683            index + 1,
684            other.type_name()
685        ))),
686        None => Err(vm_error(format!(
687            "{builtin}: missing closure argument {}",
688            index + 1
689        ))),
690    }
691}
692
693fn http_server_handle_value(id: &str) -> VmValue {
694    let mut dict = BTreeMap::new();
695    dict.insert("id".to_string(), vm_string(id));
696    dict.insert("kind".to_string(), vm_string("http_server"));
697    dict_value(dict)
698}
699
700fn header_lookup_value(headers: &BTreeMap<String, VmValue>, name: &str) -> VmValue {
701    headers
702        .iter()
703        .find(|(candidate, _)| candidate.eq_ignore_ascii_case(name))
704        .map(|(_, value)| value.clone())
705        .unwrap_or(VmValue::Nil)
706}
707
708fn headers_from_value(value: &VmValue) -> BTreeMap<String, VmValue> {
709    match value {
710        VmValue::Dict(dict) => dict
711            .get("headers")
712            .and_then(VmValue::as_dict)
713            .map(|headers| {
714                headers
715                    .iter()
716                    .map(|(key, value)| (key.to_ascii_lowercase(), vm_string(value.display())))
717                    .collect()
718            })
719            .unwrap_or_else(|| {
720                dict.iter()
721                    .map(|(key, value)| (key.to_ascii_lowercase(), vm_string(value.display())))
722                    .collect()
723            }),
724        _ => BTreeMap::new(),
725    }
726}
727
728fn normalize_headers(value: Option<&VmValue>) -> BTreeMap<String, VmValue> {
729    match value.and_then(VmValue::as_dict) {
730        Some(headers) => headers
731            .iter()
732            .map(|(key, value)| (key.to_ascii_lowercase(), vm_string(value.display())))
733            .collect(),
734        None => BTreeMap::new(),
735    }
736}
737
738fn percent_decode(input: &str) -> String {
739    let bytes = input.as_bytes();
740    let mut out = Vec::with_capacity(bytes.len());
741    let mut i = 0;
742    while i < bytes.len() {
743        if bytes[i] == b'+' {
744            out.push(b' ');
745            i += 1;
746            continue;
747        }
748        if bytes[i] == b'%' && i + 2 < bytes.len() {
749            if let (Some(hi), Some(lo)) = (hex_val(bytes[i + 1]), hex_val(bytes[i + 2])) {
750                out.push((hi << 4) | lo);
751                i += 3;
752                continue;
753            }
754        }
755        out.push(bytes[i]);
756        i += 1;
757    }
758    String::from_utf8_lossy(&out).into_owned()
759}
760
761fn hex_val(byte: u8) -> Option<u8> {
762    match byte {
763        b'0'..=b'9' => Some(byte - b'0'),
764        b'a'..=b'f' => Some(byte - b'a' + 10),
765        b'A'..=b'F' => Some(byte - b'A' + 10),
766        _ => None,
767    }
768}
769
770fn split_path_and_query(raw_path: &str) -> (String, BTreeMap<String, VmValue>) {
771    let (path, query) = raw_path.split_once('?').unwrap_or((raw_path, ""));
772    let mut query_map = BTreeMap::new();
773    for pair in query.split('&').filter(|part| !part.is_empty()) {
774        let (key, value) = pair.split_once('=').unwrap_or((pair, ""));
775        query_map.insert(percent_decode(key), vm_string(percent_decode(value)));
776    }
777    (
778        if path.is_empty() { "/" } else { path }.to_string(),
779        query_map,
780    )
781}
782
783fn request_body_bytes(input: &BTreeMap<String, VmValue>) -> Vec<u8> {
784    match input.get("raw_body").or_else(|| input.get("body")) {
785        Some(VmValue::Bytes(bytes)) => bytes.as_ref().clone(),
786        Some(value) => value.display().into_bytes(),
787        None => Vec::new(),
788    }
789}
790
791fn request_value(
792    method: &str,
793    path: &str,
794    path_params: BTreeMap<String, VmValue>,
795    mut query: BTreeMap<String, VmValue>,
796    input: &BTreeMap<String, VmValue>,
797    body_bytes: &[u8],
798    retain_raw_body: bool,
799) -> VmValue {
800    if let Some(explicit_query) = input.get("query").and_then(VmValue::as_dict) {
801        query.extend(
802            explicit_query
803                .iter()
804                .map(|(key, value)| (key.clone(), value.clone())),
805        );
806    }
807
808    let headers = normalize_headers(input.get("headers"));
809    let body = String::from_utf8_lossy(body_bytes).into_owned();
810    let mut request = BTreeMap::new();
811    request.insert("method".to_string(), vm_string(method));
812    request.insert("path".to_string(), vm_string(path));
813    let path_params = dict_value(path_params);
814    request.insert("path_params".to_string(), path_params.clone());
815    request.insert("params".to_string(), path_params);
816    request.insert("query".to_string(), dict_value(query));
817    request.insert("headers".to_string(), dict_value(headers));
818    request.insert("body".to_string(), vm_string(body));
819    request.insert(
820        "raw_body".to_string(),
821        if retain_raw_body {
822            VmValue::Bytes(Rc::new(body_bytes.to_vec()))
823        } else {
824            VmValue::Nil
825        },
826    );
827    request.insert(
828        "body_bytes".to_string(),
829        VmValue::Int(body_bytes.len() as i64),
830    );
831    request.insert(
832        "remote_addr".to_string(),
833        input
834            .get("remote_addr")
835            .or_else(|| input.get("remote"))
836            .map(|value| vm_string(value.display()))
837            .unwrap_or(VmValue::Nil),
838    );
839    request.insert(
840        "client_ip".to_string(),
841        input
842            .get("client_ip")
843            .or_else(|| input.get("remote_ip"))
844            .or_else(|| input.get("ip"))
845            .map(|value| vm_string(value.display()))
846            .unwrap_or(VmValue::Nil),
847    );
848    dict_value(request)
849}
850
851fn normalize_status(status: i64) -> i64 {
852    if (100..=999).contains(&status) {
853        status
854    } else {
855        500
856    }
857}
858
859fn response_with_kind(
860    status: i64,
861    mut headers: BTreeMap<String, VmValue>,
862    body: VmValue,
863    body_kind: &str,
864) -> VmValue {
865    let status = normalize_status(status);
866    let mut response = BTreeMap::new();
867    if body_kind == "json" && matches!(header_lookup_value(&headers, "content-type"), VmValue::Nil)
868    {
869        headers.insert(
870            "content-type".to_string(),
871            vm_string("application/json; charset=utf-8"),
872        );
873    } else if body_kind == "text"
874        && matches!(header_lookup_value(&headers, "content-type"), VmValue::Nil)
875    {
876        headers.insert(
877            "content-type".to_string(),
878            vm_string("text/plain; charset=utf-8"),
879        );
880    }
881    response.insert("status".to_string(), VmValue::Int(status));
882    response.insert("headers".to_string(), dict_value(headers));
883    response.insert(
884        "ok".to_string(),
885        VmValue::Bool((200..300).contains(&status)),
886    );
887    response.insert("body_kind".to_string(), vm_string(body_kind));
888    match body {
889        VmValue::Bytes(bytes) => {
890            response.insert(
891                "body".to_string(),
892                vm_string(String::from_utf8_lossy(&bytes)),
893            );
894            response.insert("raw_body".to_string(), VmValue::Bytes(bytes));
895        }
896        other => {
897            response.insert("body".to_string(), vm_string(other.display()));
898            response.insert(
899                "raw_body".to_string(),
900                VmValue::Bytes(Rc::new(other.display().into_bytes())),
901            );
902        }
903    }
904    dict_value(response)
905}
906
907fn normalize_response(value: VmValue) -> VmValue {
908    match value {
909        VmValue::Dict(dict) if dict.contains_key("status") => {
910            let status = dict.get("status").and_then(VmValue::as_int).unwrap_or(200);
911            let headers = dict
912                .get("headers")
913                .and_then(VmValue::as_dict)
914                .cloned()
915                .unwrap_or_default();
916            let body_kind = dict
917                .get("body_kind")
918                .or_else(|| dict.get("kind"))
919                .map(|value| value.display())
920                .unwrap_or_else(|| "text".to_string());
921            let body = dict
922                .get("raw_body")
923                .filter(|value| matches!(value, VmValue::Bytes(_)))
924                .or_else(|| dict.get("body"))
925                .cloned()
926                .unwrap_or(VmValue::Nil);
927            response_with_kind(status, headers, body, &body_kind)
928        }
929        VmValue::Nil => response_with_kind(204, BTreeMap::new(), VmValue::Nil, "text"),
930        other => response_with_kind(200, BTreeMap::new(), other, "text"),
931    }
932}
933
934fn body_limit_response(limit: usize, actual: usize) -> VmValue {
935    let mut headers = BTreeMap::new();
936    headers.insert(
937        "content-type".to_string(),
938        vm_string("text/plain; charset=utf-8"),
939    );
940    headers.insert("connection".to_string(), vm_string("close"));
941    headers.insert(
942        "x-harn-body-limit".to_string(),
943        vm_string(limit.to_string()),
944    );
945    response_with_kind(
946        413,
947        headers,
948        vm_string(format!("request body too large: {actual} > {limit} bytes")),
949        "text",
950    )
951}
952
953fn not_found_response(method: &str, path: &str) -> VmValue {
954    response_with_kind(
955        404,
956        BTreeMap::new(),
957        vm_string(format!("no route for {method} {path}")),
958        "text",
959    )
960}
961
962fn unavailable_response(message: &str) -> VmValue {
963    response_with_kind(503, BTreeMap::new(), vm_string(message), "text")
964}
965
966fn route_template_match(template: &str, path: &str) -> Option<BTreeMap<String, VmValue>> {
967    let template_segments: Vec<&str> = template.trim_matches('/').split('/').collect();
968    let path_segments: Vec<&str> = path.trim_matches('/').split('/').collect();
969    if template == "/" && path == "/" {
970        return Some(BTreeMap::new());
971    }
972    if template_segments.len() != path_segments.len() {
973        return None;
974    }
975    let mut params = BTreeMap::new();
976    for (tmpl, actual) in template_segments.iter().zip(path_segments.iter()) {
977        if tmpl.starts_with('{') && tmpl.ends_with('}') && tmpl.len() > 2 {
978            params.insert(
979                tmpl[1..tmpl.len() - 1].to_string(),
980                vm_string(percent_decode(actual)),
981            );
982        } else if tmpl.starts_with(':') && tmpl.len() > 1 {
983            params.insert(tmpl[1..].to_string(), vm_string(percent_decode(actual)));
984        } else if tmpl != actual {
985            return None;
986        }
987    }
988    Some(params)
989}
990
991fn matching_route(
992    server: &HttpServer,
993    method: &str,
994    path: &str,
995) -> Option<(HttpServerRoute, BTreeMap<String, VmValue>)> {
996    server.routes.iter().find_map(|route| {
997        if route.method != "*" && !route.method.eq_ignore_ascii_case(method) {
998            return None;
999        }
1000        route_template_match(&route.template, path).map(|params| (route.clone(), params))
1001    })
1002}
1003
1004async fn call_server_closure(
1005    closure: &Rc<VmClosure>,
1006    args: &[VmValue],
1007    builtin: &str,
1008) -> Result<VmValue, VmError> {
1009    let mut vm = crate::vm::clone_async_builtin_child_vm()
1010        .ok_or_else(|| vm_error(format!("{builtin}: requires an async builtin VM context")))?;
1011    vm.call_closure_pub(closure, args).await
1012}
1013
1014fn value_is_response(value: &VmValue) -> bool {
1015    matches!(value, VmValue::Dict(dict) if dict.contains_key("status"))
1016}
1017
1018async fn run_http_server_request(server_id: &str, request: VmValue) -> Result<VmValue, VmError> {
1019    let server = HTTP_SERVERS.with(|servers| servers.borrow().get(server_id).cloned());
1020    let Some(server) = server else {
1021        return Err(vm_error(format!(
1022            "http_server_request: unknown server handle '{server_id}'"
1023        )));
1024    };
1025    if server.shutdown {
1026        return Ok(unavailable_response("server is shut down"));
1027    }
1028    if !server.ready {
1029        return Ok(unavailable_response("server is not ready"));
1030    }
1031    if let Some(readiness) = &server.readiness {
1032        let ready = call_server_closure(
1033            readiness,
1034            &[http_server_handle_value(server_id)],
1035            "http_server_request",
1036        )
1037        .await?;
1038        if !ready.is_truthy() {
1039            return Ok(unavailable_response("server is not ready"));
1040        }
1041    }
1042
1043    let input = request.as_dict().cloned().unwrap_or_default();
1044    let method = input
1045        .get("method")
1046        .map(|value| value.display())
1047        .filter(|value| !value.is_empty())
1048        .unwrap_or_else(|| "GET".to_string())
1049        .to_ascii_uppercase();
1050    let raw_path = input
1051        .get("path")
1052        .map(|value| value.display())
1053        .filter(|value| !value.is_empty())
1054        .unwrap_or_else(|| "/".to_string());
1055    let (path, query) = split_path_and_query(&raw_path);
1056    let body_bytes = request_body_bytes(&input);
1057
1058    let Some((route, path_params)) = matching_route(&server, &method, &path) else {
1059        return Ok(not_found_response(&method, &path));
1060    };
1061
1062    let limit = route.max_body_bytes.unwrap_or(server.max_body_bytes);
1063    if body_bytes.len() > limit {
1064        return Ok(body_limit_response(limit, body_bytes.len()));
1065    }
1066    let retain_raw_body = route.retain_raw_body.unwrap_or(server.retain_raw_body);
1067    let mut req = request_value(
1068        &method,
1069        &path,
1070        path_params,
1071        query,
1072        &input,
1073        &body_bytes,
1074        retain_raw_body,
1075    );
1076
1077    for before in &server.before {
1078        let result = call_server_closure(before, &[req.clone()], "http_server_request").await?;
1079        if value_is_response(&result) {
1080            return Ok(normalize_response(result));
1081        }
1082        if !matches!(result, VmValue::Nil) {
1083            req = result;
1084        }
1085    }
1086
1087    let handler_result =
1088        call_server_closure(&route.handler, &[req.clone()], "http_server_request").await?;
1089    let mut response = normalize_response(handler_result);
1090
1091    for after in &server.after {
1092        let result = call_server_closure(
1093            after,
1094            &[response.clone(), req.clone()],
1095            "http_server_request",
1096        )
1097        .await?;
1098        if !matches!(result, VmValue::Nil) {
1099            response = normalize_response(result);
1100        }
1101    }
1102
1103    Ok(response)
1104}
1105
1106/// Register HTTP builtins on a VM.
1107pub fn register_http_builtins(vm: &mut Vm) {
1108    vm.register_builtin("http_server_tls_plain", |_args, _out| {
1109        Ok(http_server_tls_config_value(
1110            "plain",
1111            false,
1112            "http",
1113            false,
1114            BTreeMap::new(),
1115        ))
1116    });
1117    vm.register_builtin("http_server_tls_edge", |args, _out| {
1118        let options = get_options_arg(args, 0);
1119        Ok(http_server_tls_config_value(
1120            "edge",
1121            false,
1122            "https",
1123            vm_get_bool_option(&options, "hsts", true),
1124            hsts_options(&options),
1125        ))
1126    });
1127    vm.register_builtin("http_server_tls_pem", |args, _out| {
1128        if args.len() < 2 {
1129            return Err(vm_error(
1130                "http_server_tls_pem: requires cert path and key path",
1131            ));
1132        }
1133        let cert_path = args[0].display();
1134        let key_path = args[1].display();
1135        if !std::path::Path::new(&cert_path).is_file() {
1136            return Err(vm_error(format!(
1137                "http_server_tls_pem: certificate not found: {cert_path}"
1138            )));
1139        }
1140        if !std::path::Path::new(&key_path).is_file() {
1141            return Err(vm_error(format!(
1142                "http_server_tls_pem: private key not found: {key_path}"
1143            )));
1144        }
1145        let mut extra = BTreeMap::new();
1146        extra.insert(
1147            "cert_path".to_string(),
1148            VmValue::String(Rc::from(cert_path)),
1149        );
1150        extra.insert("key_path".to_string(), VmValue::String(Rc::from(key_path)));
1151        Ok(http_server_tls_config_value(
1152            "pem", true, "https", true, extra,
1153        ))
1154    });
1155    vm.register_builtin("http_server_tls_self_signed_dev", |args, _out| {
1156        let hosts = tls_hosts_arg(args.first())?;
1157        let cert = rcgen::generate_simple_self_signed(hosts.clone()).map_err(|error| {
1158            vm_error(format!(
1159                "http_server_tls_self_signed_dev: failed to generate certificate: {error}"
1160            ))
1161        })?;
1162        let mut extra = BTreeMap::new();
1163        extra.insert(
1164            "hosts".to_string(),
1165            VmValue::List(Rc::new(
1166                hosts
1167                    .into_iter()
1168                    .map(|host| VmValue::String(Rc::from(host)))
1169                    .collect(),
1170            )),
1171        );
1172        extra.insert(
1173            "cert_pem".to_string(),
1174            VmValue::String(Rc::from(cert.cert.pem())),
1175        );
1176        extra.insert(
1177            "key_pem".to_string(),
1178            VmValue::String(Rc::from(cert.key_pair.serialize_pem())),
1179        );
1180        Ok(http_server_tls_config_value(
1181            "self_signed_dev",
1182            true,
1183            "https",
1184            false,
1185            extra,
1186        ))
1187    });
1188    vm.register_builtin("http_server_security_headers", |args, _out| {
1189        let Some(VmValue::Dict(config)) = args.first() else {
1190            return Err(vm_error(
1191                "http_server_security_headers: requires a TLS config dict",
1192            ));
1193        };
1194        Ok(VmValue::Dict(Rc::new(http_server_security_headers(config))))
1195    });
1196
1197    vm.register_async_builtin("http_get", |args| async move {
1198        http_verb_handler("GET", false, args).await
1199    });
1200    vm.register_async_builtin("http_post", |args| async move {
1201        http_verb_handler("POST", true, args).await
1202    });
1203    vm.register_async_builtin("http_put", |args| async move {
1204        http_verb_handler("PUT", true, args).await
1205    });
1206    vm.register_async_builtin("http_patch", |args| async move {
1207        http_verb_handler("PATCH", true, args).await
1208    });
1209    vm.register_async_builtin("http_delete", |args| async move {
1210        http_verb_handler("DELETE", false, args).await
1211    });
1212
1213    // --- Inbound HTTP server primitives ---
1214
1215    vm.register_builtin("http_server", |args, _out| {
1216        let options = get_options_arg(args, 0);
1217        let server = HttpServer {
1218            routes: Vec::new(),
1219            before: Vec::new(),
1220            after: Vec::new(),
1221            ready: get_bool_option(&options, "ready", true),
1222            readiness: None,
1223            shutdown_hooks: Vec::new(),
1224            shutdown: false,
1225            max_body_bytes: get_usize_option(
1226                &options,
1227                "max_body_bytes",
1228                DEFAULT_SERVER_MAX_BODY_BYTES,
1229            )?,
1230            retain_raw_body: get_bool_option(&options, "retain_raw_body", true),
1231        };
1232        let id = next_transport_handle("http-server");
1233        HTTP_SERVERS.with(|servers| {
1234            let mut servers = servers.borrow_mut();
1235            if servers.len() >= MAX_HTTP_SERVERS {
1236                return Err(vm_error(format!(
1237                    "http_server: maximum open servers ({MAX_HTTP_SERVERS}) reached"
1238                )));
1239            }
1240            servers.insert(id.clone(), server);
1241            Ok(())
1242        })?;
1243        Ok(http_server_handle_value(&id))
1244    });
1245
1246    vm.register_builtin("http_server_route", |args, _out| {
1247        if args.len() < 4 {
1248            return Err(vm_error(
1249                "http_server_route: requires server, method, path template, and handler",
1250            ));
1251        }
1252        let server_id = server_from_value(&args[0], "http_server_route")?;
1253        let method = args[1].display().to_ascii_uppercase();
1254        if method.is_empty() {
1255            return Err(vm_error("http_server_route: method is required"));
1256        }
1257        let template = args[2].display();
1258        if !template.starts_with('/') {
1259            return Err(vm_error(
1260                "http_server_route: path template must start with '/'",
1261            ));
1262        }
1263        let handler = closure_arg(args, 3, "http_server_route")?;
1264        let options = get_options_arg(args, 4);
1265        let route = HttpServerRoute {
1266            method,
1267            template,
1268            handler,
1269            max_body_bytes: get_optional_usize_option(&options, "max_body_bytes")?,
1270            retain_raw_body: match options.get("retain_raw_body") {
1271                Some(VmValue::Bool(value)) => Some(*value),
1272                _ => None,
1273            },
1274        };
1275        HTTP_SERVERS.with(|servers| {
1276            let mut servers = servers.borrow_mut();
1277            let server = servers.get_mut(&server_id).ok_or_else(|| {
1278                vm_error(format!("http_server_route: unknown server '{server_id}'"))
1279            })?;
1280            server.routes.push(route);
1281            Ok::<_, VmError>(())
1282        })?;
1283        Ok(http_server_handle_value(&server_id))
1284    });
1285
1286    vm.register_builtin("http_server_before", |args, _out| {
1287        if args.len() < 2 {
1288            return Err(vm_error("http_server_before: requires server and handler"));
1289        }
1290        let server_id = server_from_value(&args[0], "http_server_before")?;
1291        let handler = closure_arg(args, 1, "http_server_before")?;
1292        HTTP_SERVERS.with(|servers| {
1293            let mut servers = servers.borrow_mut();
1294            let server = servers.get_mut(&server_id).ok_or_else(|| {
1295                vm_error(format!("http_server_before: unknown server '{server_id}'"))
1296            })?;
1297            server.before.push(handler);
1298            Ok::<_, VmError>(())
1299        })?;
1300        Ok(http_server_handle_value(&server_id))
1301    });
1302
1303    vm.register_builtin("http_server_after", |args, _out| {
1304        if args.len() < 2 {
1305            return Err(vm_error("http_server_after: requires server and handler"));
1306        }
1307        let server_id = server_from_value(&args[0], "http_server_after")?;
1308        let handler = closure_arg(args, 1, "http_server_after")?;
1309        HTTP_SERVERS.with(|servers| {
1310            let mut servers = servers.borrow_mut();
1311            let server = servers.get_mut(&server_id).ok_or_else(|| {
1312                vm_error(format!("http_server_after: unknown server '{server_id}'"))
1313            })?;
1314            server.after.push(handler);
1315            Ok::<_, VmError>(())
1316        })?;
1317        Ok(http_server_handle_value(&server_id))
1318    });
1319
1320    vm.register_async_builtin("http_server_request", |args| async move {
1321        if args.len() < 2 {
1322            return Err(vm_error("http_server_request: requires server and request"));
1323        }
1324        let server_id = server_from_value(&args[0], "http_server_request")?;
1325        run_http_server_request(&server_id, args[1].clone()).await
1326    });
1327
1328    vm.register_async_builtin("http_server_test", |args| async move {
1329        if args.len() < 2 {
1330            return Err(vm_error("http_server_test: requires server and request"));
1331        }
1332        let server_id = server_from_value(&args[0], "http_server_test")?;
1333        run_http_server_request(&server_id, args[1].clone()).await
1334    });
1335
1336    vm.register_builtin("http_server_set_ready", |args, _out| {
1337        if args.len() < 2 {
1338            return Err(vm_error(
1339                "http_server_set_ready: requires server and ready bool",
1340            ));
1341        }
1342        let server_id = server_from_value(&args[0], "http_server_set_ready")?;
1343        let ready = matches!(args[1], VmValue::Bool(true));
1344        HTTP_SERVERS.with(|servers| {
1345            let mut servers = servers.borrow_mut();
1346            let server = servers.get_mut(&server_id).ok_or_else(|| {
1347                vm_error(format!(
1348                    "http_server_set_ready: unknown server '{server_id}'"
1349                ))
1350            })?;
1351            server.ready = ready;
1352            Ok::<_, VmError>(())
1353        })?;
1354        Ok(VmValue::Bool(ready))
1355    });
1356
1357    vm.register_builtin("http_server_readiness", |args, _out| {
1358        if args.len() < 2 {
1359            return Err(vm_error(
1360                "http_server_readiness: requires server and readiness closure",
1361            ));
1362        }
1363        let server_id = server_from_value(&args[0], "http_server_readiness")?;
1364        let handler = closure_arg(args, 1, "http_server_readiness")?;
1365        HTTP_SERVERS.with(|servers| {
1366            let mut servers = servers.borrow_mut();
1367            let server = servers.get_mut(&server_id).ok_or_else(|| {
1368                vm_error(format!(
1369                    "http_server_readiness: unknown server '{server_id}'"
1370                ))
1371            })?;
1372            server.readiness = Some(handler);
1373            Ok::<_, VmError>(())
1374        })?;
1375        Ok(http_server_handle_value(&server_id))
1376    });
1377
1378    vm.register_async_builtin("http_server_ready", |args| async move {
1379        let Some(server_arg) = args.first() else {
1380            return Err(vm_error("http_server_ready: requires server"));
1381        };
1382        let server_id = server_from_value(server_arg, "http_server_ready")?;
1383        let server = HTTP_SERVERS.with(|servers| servers.borrow().get(&server_id).cloned());
1384        let Some(server) = server else {
1385            return Err(vm_error(format!(
1386                "http_server_ready: unknown server '{server_id}'"
1387            )));
1388        };
1389        if server.shutdown {
1390            return Ok(VmValue::Bool(false));
1391        }
1392        let Some(readiness) = server.readiness else {
1393            return Ok(VmValue::Bool(server.ready));
1394        };
1395        let result = call_server_closure(
1396            &readiness,
1397            &[http_server_handle_value(&server_id)],
1398            "http_server_ready",
1399        )
1400        .await?;
1401        Ok(VmValue::Bool(result.is_truthy()))
1402    });
1403
1404    vm.register_builtin("http_server_on_shutdown", |args, _out| {
1405        if args.len() < 2 {
1406            return Err(vm_error(
1407                "http_server_on_shutdown: requires server and handler",
1408            ));
1409        }
1410        let server_id = server_from_value(&args[0], "http_server_on_shutdown")?;
1411        let handler = closure_arg(args, 1, "http_server_on_shutdown")?;
1412        HTTP_SERVERS.with(|servers| {
1413            let mut servers = servers.borrow_mut();
1414            let server = servers.get_mut(&server_id).ok_or_else(|| {
1415                vm_error(format!(
1416                    "http_server_on_shutdown: unknown server '{server_id}'"
1417                ))
1418            })?;
1419            server.shutdown_hooks.push(handler);
1420            Ok::<_, VmError>(())
1421        })?;
1422        Ok(http_server_handle_value(&server_id))
1423    });
1424
1425    vm.register_async_builtin("http_server_shutdown", |args| async move {
1426        let Some(server_arg) = args.first() else {
1427            return Err(vm_error("http_server_shutdown: requires server"));
1428        };
1429        let server_id = server_from_value(server_arg, "http_server_shutdown")?;
1430        let hooks = HTTP_SERVERS.with(|servers| {
1431            let mut servers = servers.borrow_mut();
1432            let server = servers.get_mut(&server_id).ok_or_else(|| {
1433                vm_error(format!(
1434                    "http_server_shutdown: unknown server '{server_id}'"
1435                ))
1436            })?;
1437            server.shutdown = true;
1438            Ok::<_, VmError>(server.shutdown_hooks.clone())
1439        })?;
1440        for hook in hooks {
1441            let _ = call_server_closure(
1442                &hook,
1443                &[http_server_handle_value(&server_id)],
1444                "http_server_shutdown",
1445            )
1446            .await?;
1447        }
1448        Ok(VmValue::Bool(true))
1449    });
1450
1451    vm.register_builtin("http_response", |args, _out| {
1452        let status = args.first().and_then(VmValue::as_int).unwrap_or(200);
1453        let body = args.get(1).cloned().unwrap_or(VmValue::Nil);
1454        let headers = args
1455            .get(2)
1456            .and_then(VmValue::as_dict)
1457            .cloned()
1458            .unwrap_or_default();
1459        Ok(response_with_kind(status, headers, body, "text"))
1460    });
1461
1462    vm.register_builtin("http_response_text", |args, _out| {
1463        let body = args.first().cloned().unwrap_or(VmValue::Nil);
1464        let options = get_options_arg(args, 1);
1465        let status = options
1466            .get("status")
1467            .and_then(VmValue::as_int)
1468            .unwrap_or(200);
1469        let headers = options
1470            .get("headers")
1471            .and_then(VmValue::as_dict)
1472            .cloned()
1473            .unwrap_or_default();
1474        Ok(response_with_kind(status, headers, body, "text"))
1475    });
1476
1477    vm.register_builtin("http_response_json", |args, _out| {
1478        let body = args
1479            .first()
1480            .map(crate::stdlib::json::vm_value_to_json)
1481            .map(vm_string)
1482            .unwrap_or_else(|| vm_string("null"));
1483        let options = get_options_arg(args, 1);
1484        let status = options
1485            .get("status")
1486            .and_then(VmValue::as_int)
1487            .unwrap_or(200);
1488        let headers = options
1489            .get("headers")
1490            .and_then(VmValue::as_dict)
1491            .cloned()
1492            .unwrap_or_default();
1493        Ok(response_with_kind(status, headers, body, "json"))
1494    });
1495
1496    vm.register_builtin("http_response_bytes", |args, _out| {
1497        let body = match args.first() {
1498            Some(VmValue::Bytes(bytes)) => VmValue::Bytes(bytes.clone()),
1499            Some(value) => VmValue::Bytes(Rc::new(value.display().into_bytes())),
1500            None => VmValue::Bytes(Rc::new(Vec::new())),
1501        };
1502        let options = get_options_arg(args, 1);
1503        let status = options
1504            .get("status")
1505            .and_then(VmValue::as_int)
1506            .unwrap_or(200);
1507        let headers = options
1508            .get("headers")
1509            .and_then(VmValue::as_dict)
1510            .cloned()
1511            .unwrap_or_default();
1512        Ok(response_with_kind(status, headers, body, "bytes"))
1513    });
1514
1515    vm.register_builtin("http_header", |args, _out| {
1516        if args.len() < 2 {
1517            return Err(vm_error(
1518                "http_header: requires headers/request/response and name",
1519            ));
1520        }
1521        let headers = headers_from_value(&args[0]);
1522        Ok(header_lookup_value(&headers, &args[1].display()))
1523    });
1524
1525    // --- Mock HTTP builtins ---
1526
1527    // http_mock(method, url_pattern, response) -> nil
1528    //
1529    // Calling http_mock again with the same (method, url_pattern) tuple
1530    // *replaces* the prior mock for that target — tests can override a
1531    // per-case response without first calling http_mock_clear().
1532    vm.register_builtin("http_mock", |args, _out| {
1533        let method = args.first().map(|a| a.display()).unwrap_or_default();
1534        let url_pattern = args.get(1).map(|a| a.display()).unwrap_or_default();
1535        let response = args
1536            .get(2)
1537            .and_then(|a| a.as_dict())
1538            .cloned()
1539            .unwrap_or_default();
1540        let responses = parse_mock_responses(&response);
1541
1542        register_http_mock(method, url_pattern, responses);
1543        Ok(VmValue::Nil)
1544    });
1545
1546    // http_mock_clear() -> nil
1547    vm.register_builtin("http_mock_clear", |_args, _out| {
1548        clear_http_mocks();
1549        HTTP_STREAMS.with(|streams| streams.borrow_mut().clear());
1550        Ok(VmValue::Nil)
1551    });
1552
1553    // http_mock_calls(options?) -> list of {method, url, headers, body}
1554    vm.register_builtin("http_mock_calls", |args, _out| {
1555        let options = get_options_arg(args, 0);
1556        let include_sensitive = get_bool_option(&options, "include_sensitive", false)
1557            || get_bool_option(&options, "include_sensitive_headers", false);
1558        let redact_sensitive = get_bool_option(
1559            &options,
1560            "redact_sensitive",
1561            get_bool_option(&options, "redact_headers", true),
1562        ) && !include_sensitive;
1563        Ok(VmValue::List(Rc::new(http_mock_calls_value(
1564            redact_sensitive,
1565        ))))
1566    });
1567
1568    vm.register_async_builtin("http_request", |args| async move {
1569        let method = args
1570            .first()
1571            .map(|a| a.display())
1572            .unwrap_or_default()
1573            .to_uppercase();
1574        if method.is_empty() {
1575            return Err(VmError::Thrown(VmValue::String(Rc::from(
1576                "http_request: method is required",
1577            ))));
1578        }
1579        let url = args.get(1).map(|a| a.display()).unwrap_or_default();
1580        if url.is_empty() {
1581            return Err(VmError::Thrown(VmValue::String(Rc::from(
1582                "http_request: URL is required",
1583            ))));
1584        }
1585        let options = match args.get(2) {
1586            Some(VmValue::Dict(d)) => (**d).clone(),
1587            _ => BTreeMap::new(),
1588        };
1589        vm_execute_http_request(&method, &url, &options).await
1590    });
1591
1592    vm.register_async_builtin("http_download", |args| async move {
1593        let url = args.first().map(|a| a.display()).unwrap_or_default();
1594        if url.is_empty() {
1595            return Err(vm_error("http_download: URL is required"));
1596        }
1597        let dst_path = args.get(1).map(|a| a.display()).unwrap_or_default();
1598        if dst_path.is_empty() {
1599            return Err(vm_error("http_download: destination path is required"));
1600        }
1601        let options = get_options_arg(&args, 2);
1602        vm_http_download(&url, &dst_path, &options).await
1603    });
1604
1605    vm.register_async_builtin("http_stream_open", |args| async move {
1606        let url = args.first().map(|a| a.display()).unwrap_or_default();
1607        if url.is_empty() {
1608            return Err(vm_error("http_stream_open: URL is required"));
1609        }
1610        let options = get_options_arg(&args, 1);
1611        vm_http_stream_open(&url, &options).await
1612    });
1613
1614    vm.register_async_builtin("http_stream_read", |args| async move {
1615        let Some(handle) = args.first() else {
1616            return Err(vm_error("http_stream_read: requires a stream handle"));
1617        };
1618        let stream_id = handle_from_value(handle, "http_stream_read")?;
1619        let max_bytes = args
1620            .get(1)
1621            .and_then(|value| value.as_int())
1622            .map(|value| value.max(1) as usize)
1623            .unwrap_or(DEFAULT_MAX_MESSAGE_BYTES);
1624        vm_http_stream_read(&stream_id, max_bytes).await
1625    });
1626
1627    vm.register_builtin("http_stream_info", |args, _out| {
1628        let Some(handle) = args.first() else {
1629            return Err(vm_error("http_stream_info: requires a stream handle"));
1630        };
1631        let stream_id = handle_from_value(handle, "http_stream_info")?;
1632        vm_http_stream_info(&stream_id)
1633    });
1634
1635    vm.register_builtin("http_stream_close", |args, _out| {
1636        let Some(handle) = args.first() else {
1637            return Err(vm_error("http_stream_close: requires a stream handle"));
1638        };
1639        let stream_id = handle_from_value(handle, "http_stream_close")?;
1640        let removed = HTTP_STREAMS.with(|streams| streams.borrow_mut().remove(&stream_id));
1641        Ok(VmValue::Bool(removed.is_some()))
1642    });
1643
1644    vm.register_builtin("http_session", |args, _out| {
1645        let options = get_options_arg(args, 0);
1646        let config = parse_http_options(&options);
1647        let client = build_http_client(&config)?;
1648        let id = next_transport_handle("http-session");
1649        HTTP_SESSIONS.with(|sessions| {
1650            let mut sessions = sessions.borrow_mut();
1651            if sessions.len() >= MAX_HTTP_SESSIONS {
1652                return Err(vm_error(format!(
1653                    "http_session: maximum open sessions ({MAX_HTTP_SESSIONS}) reached"
1654                )));
1655            }
1656            sessions.insert(id.clone(), HttpSession { client, options });
1657            Ok(())
1658        })?;
1659        Ok(VmValue::String(Rc::from(id)))
1660    });
1661
1662    vm.register_async_builtin("http_session_request", |args| async move {
1663        if args.len() < 3 {
1664            return Err(vm_error(
1665                "http_session_request: requires session, method, and URL",
1666            ));
1667        }
1668        let session_id = handle_from_value(&args[0], "http_session_request")?;
1669        let method = args[1].display().to_uppercase();
1670        if method.is_empty() {
1671            return Err(vm_error("http_session_request: method is required"));
1672        }
1673        let url = args[2].display();
1674        if url.is_empty() {
1675            return Err(vm_error("http_session_request: URL is required"));
1676        }
1677        let options = get_options_arg(&args, 3);
1678        vm_execute_http_session_request(&session_id, &method, &url, &options).await
1679    });
1680
1681    vm.register_builtin("http_session_close", |args, _out| {
1682        let Some(handle) = args.first() else {
1683            return Err(vm_error("http_session_close: requires a session handle"));
1684        };
1685        let session_id = handle_from_value(handle, "http_session_close")?;
1686        let removed = HTTP_SESSIONS.with(|sessions| sessions.borrow_mut().remove(&session_id));
1687        Ok(VmValue::Bool(removed.is_some()))
1688    });
1689
1690    vm.register_builtin("sse_mock", |args, _out| {
1691        let url_pattern = args.first().map(|arg| arg.display()).unwrap_or_default();
1692        if url_pattern.is_empty() {
1693            return Err(vm_error("sse_mock: URL pattern is required"));
1694        }
1695        let events = parse_mock_stream_events(args.get(1));
1696        SSE_MOCKS.with(|mocks| {
1697            mocks.borrow_mut().push(SseMock {
1698                url_pattern,
1699                events,
1700            });
1701        });
1702        Ok(VmValue::Nil)
1703    });
1704
1705    vm.register_async_builtin("sse_connect", |args| async move {
1706        let method = args
1707            .first()
1708            .map(|arg| arg.display())
1709            .filter(|method| !method.is_empty())
1710            .unwrap_or_else(|| "GET".to_string())
1711            .to_uppercase();
1712        let url = args.get(1).map(|arg| arg.display()).unwrap_or_default();
1713        if url.is_empty() {
1714            return Err(vm_error("sse_connect: URL is required"));
1715        }
1716        let options = get_options_arg(&args, 2);
1717        vm_sse_connect(&method, &url, &options).await
1718    });
1719
1720    vm.register_async_builtin("sse_receive", |args| async move {
1721        let Some(handle) = args.first() else {
1722            return Err(vm_error("sse_receive: requires a stream handle"));
1723        };
1724        let stream_id = handle_from_value(handle, "sse_receive")?;
1725        let timeout_ms = receive_timeout_arg(&args, 1);
1726        vm_sse_receive(&stream_id, timeout_ms).await
1727    });
1728
1729    vm.register_builtin("sse_event", |args, _out| {
1730        let Some(event) = args.first() else {
1731            return Err(vm_error("sse_event: requires event data or an event dict"));
1732        };
1733        let options = get_options_arg(args, 1);
1734        Ok(VmValue::String(Rc::from(vm_sse_event_frame(
1735            event, &options,
1736        )?)))
1737    });
1738
1739    vm.register_builtin("sse_server_response", |args, _out| {
1740        let options = get_options_arg(args, 0);
1741        vm_sse_server_response(&options)
1742    });
1743
1744    vm.register_builtin("sse_server_send", |args, _out| {
1745        if args.len() < 2 {
1746            return Err(vm_error("sse_server_send: requires stream and event"));
1747        }
1748        let stream_id = handle_from_value(&args[0], "sse_server_send")?;
1749        let options = get_options_arg(args, 2);
1750        vm_sse_server_send(&stream_id, &args[1], &options)
1751    });
1752
1753    vm.register_builtin("sse_server_heartbeat", |args, _out| {
1754        let Some(handle) = args.first() else {
1755            return Err(vm_error("sse_server_heartbeat: requires a stream handle"));
1756        };
1757        let stream_id = handle_from_value(handle, "sse_server_heartbeat")?;
1758        vm_sse_server_heartbeat(&stream_id, args.get(1))
1759    });
1760
1761    vm.register_builtin("sse_server_flush", |args, _out| {
1762        let Some(handle) = args.first() else {
1763            return Err(vm_error("sse_server_flush: requires a stream handle"));
1764        };
1765        let stream_id = handle_from_value(handle, "sse_server_flush")?;
1766        vm_sse_server_flush(&stream_id)
1767    });
1768
1769    vm.register_builtin("sse_server_close", |args, _out| {
1770        let Some(handle) = args.first() else {
1771            return Err(vm_error("sse_server_close: requires a stream handle"));
1772        };
1773        let stream_id = handle_from_value(handle, "sse_server_close")?;
1774        vm_sse_server_close(&stream_id)
1775    });
1776
1777    vm.register_builtin("sse_server_cancel", |args, _out| {
1778        let Some(handle) = args.first() else {
1779            return Err(vm_error("sse_server_cancel: requires a stream handle"));
1780        };
1781        let stream_id = handle_from_value(handle, "sse_server_cancel")?;
1782        vm_sse_server_cancel(&stream_id, args.get(1))
1783    });
1784
1785    vm.register_builtin("sse_server_status", |args, _out| {
1786        let Some(handle) = args.first() else {
1787            return Err(vm_error("sse_server_status: requires a stream handle"));
1788        };
1789        let stream_id = handle_from_value(handle, "sse_server_status")?;
1790        vm_sse_server_status(&stream_id)
1791    });
1792
1793    vm.register_builtin("sse_server_disconnected", |args, _out| {
1794        let Some(handle) = args.first() else {
1795            return Err(vm_error(
1796                "sse_server_disconnected: requires a stream handle",
1797            ));
1798        };
1799        let stream_id = handle_from_value(handle, "sse_server_disconnected")?;
1800        vm_sse_server_observed_bool(&stream_id, "sse_server_disconnected", |handle| {
1801            handle.disconnected
1802        })
1803    });
1804
1805    vm.register_builtin("sse_server_cancelled", |args, _out| {
1806        let Some(handle) = args.first() else {
1807            return Err(vm_error("sse_server_cancelled: requires a stream handle"));
1808        };
1809        let stream_id = handle_from_value(handle, "sse_server_cancelled")?;
1810        vm_sse_server_observed_bool(&stream_id, "sse_server_cancelled", |handle| {
1811            handle.cancelled
1812        })
1813    });
1814
1815    vm.register_builtin("sse_server_mock_receive", |args, _out| {
1816        let Some(handle) = args.first() else {
1817            return Err(vm_error(
1818                "sse_server_mock_receive: requires a stream handle",
1819            ));
1820        };
1821        let stream_id = handle_from_value(handle, "sse_server_mock_receive")?;
1822        vm_sse_server_mock_receive(&stream_id)
1823    });
1824
1825    vm.register_builtin("sse_server_mock_disconnect", |args, _out| {
1826        let Some(handle) = args.first() else {
1827            return Err(vm_error(
1828                "sse_server_mock_disconnect: requires a stream handle",
1829            ));
1830        };
1831        let stream_id = handle_from_value(handle, "sse_server_mock_disconnect")?;
1832        vm_sse_server_mock_disconnect(&stream_id)
1833    });
1834
1835    vm.register_builtin("sse_close", |args, _out| {
1836        let Some(handle) = args.first() else {
1837            return Err(vm_error("sse_close: requires a stream handle"));
1838        };
1839        let stream_id = handle_from_value(handle, "sse_close")?;
1840        let removed = SSE_HANDLES.with(|handles| {
1841            let mut handles = handles.borrow_mut();
1842            let removed = handles.remove(&stream_id);
1843            if let Some(handle) = &removed {
1844                if let SseHandleKind::Real(stream) = &handle.kind {
1845                    if let Ok(mut stream) = stream.try_lock() {
1846                        stream.close();
1847                    }
1848                }
1849            }
1850            removed
1851        });
1852        Ok(VmValue::Bool(removed.is_some()))
1853    });
1854
1855    vm.register_builtin("websocket_mock", |args, _out| {
1856        let url_pattern = args.first().map(|arg| arg.display()).unwrap_or_default();
1857        if url_pattern.is_empty() {
1858            return Err(vm_error("websocket_mock: URL pattern is required"));
1859        }
1860        let (messages, echo) = parse_websocket_mock(args.get(1));
1861        WEBSOCKET_MOCKS.with(|mocks| {
1862            mocks.borrow_mut().push(WebSocketMock {
1863                url_pattern,
1864                messages,
1865                echo,
1866            });
1867        });
1868        Ok(VmValue::Nil)
1869    });
1870
1871    vm.register_async_builtin("websocket_connect", |args| async move {
1872        let url = args.first().map(|arg| arg.display()).unwrap_or_default();
1873        if url.is_empty() {
1874            return Err(vm_error("websocket_connect: URL is required"));
1875        }
1876        let options = get_options_arg(&args, 1);
1877        vm_websocket_connect(&url, &options).await
1878    });
1879
1880    vm.register_builtin("websocket_server", |args, _out| {
1881        let bind = args
1882            .first()
1883            .map(|arg| arg.display())
1884            .filter(|bind| !bind.is_empty())
1885            .unwrap_or_else(|| "127.0.0.1:0".to_string());
1886        let options = get_options_arg(args, 1);
1887        vm_websocket_server(&bind, &options)
1888    });
1889
1890    vm.register_builtin("websocket_route", |args, _out| {
1891        if args.len() < 2 {
1892            return Err(vm_error(
1893                "websocket_route: requires server handle and route path",
1894            ));
1895        }
1896        let server_id = handle_from_value(&args[0], "websocket_route")?;
1897        let path = args[1].display();
1898        if path.is_empty() || !path.starts_with('/') {
1899            return Err(vm_error("websocket_route: path must start with '/'"));
1900        }
1901        let options = get_options_arg(args, 2);
1902        vm_websocket_route(&server_id, &path, &options)
1903    });
1904
1905    vm.register_async_builtin("websocket_accept", |args| async move {
1906        let Some(handle) = args.first() else {
1907            return Err(vm_error("websocket_accept: requires a server handle"));
1908        };
1909        let server_id = handle_from_value(handle, "websocket_accept")?;
1910        let timeout_ms = receive_timeout_arg(&args, 1);
1911        vm_websocket_accept(&server_id, timeout_ms).await
1912    });
1913
1914    vm.register_async_builtin("websocket_send", |args| async move {
1915        if args.len() < 2 {
1916            return Err(vm_error(
1917                "websocket_send: requires socket handle and message",
1918            ));
1919        }
1920        let socket_id = handle_from_value(&args[0], "websocket_send")?;
1921        let message = args[1].clone();
1922        let options = get_options_arg(&args, 2);
1923        vm_websocket_send(&socket_id, message, &options).await
1924    });
1925
1926    vm.register_async_builtin("websocket_receive", |args| async move {
1927        let Some(handle) = args.first() else {
1928            return Err(vm_error("websocket_receive: requires a socket handle"));
1929        };
1930        let socket_id = handle_from_value(handle, "websocket_receive")?;
1931        let timeout_ms = receive_timeout_arg(&args, 1);
1932        vm_websocket_receive(&socket_id, timeout_ms).await
1933    });
1934
1935    vm.register_async_builtin("websocket_close", |args| async move {
1936        let Some(handle) = args.first() else {
1937            return Err(vm_error("websocket_close: requires a socket handle"));
1938        };
1939        let socket_id = handle_from_value(handle, "websocket_close")?;
1940        vm_websocket_close(&socket_id).await
1941    });
1942
1943    vm.register_builtin("websocket_server_close", |args, _out| {
1944        let Some(handle) = args.first() else {
1945            return Err(vm_error("websocket_server_close: requires a server handle"));
1946        };
1947        let server_id = handle_from_value(handle, "websocket_server_close")?;
1948        vm_websocket_server_close(&server_id)
1949    });
1950
1951    vm.register_builtin("transport_mock_clear", |_args, _out| {
1952        HTTP_STREAMS.with(|streams| streams.borrow_mut().clear());
1953        SSE_MOCKS.with(|mocks| mocks.borrow_mut().clear());
1954        SSE_HANDLES.with(|handles| handles.borrow_mut().clear());
1955        WEBSOCKET_MOCKS.with(|mocks| mocks.borrow_mut().clear());
1956        WEBSOCKET_HANDLES.with(|handles| handles.borrow_mut().clear());
1957        WEBSOCKET_SERVERS.with(|servers| {
1958            let mut servers = servers.borrow_mut();
1959            for server in servers.values() {
1960                server.running.store(false, Ordering::SeqCst);
1961                let _ = TcpStream::connect(&server.addr);
1962            }
1963            servers.clear();
1964        });
1965        TRANSPORT_MOCK_CALLS.with(|calls| calls.borrow_mut().clear());
1966        Ok(VmValue::Nil)
1967    });
1968
1969    vm.register_builtin("transport_mock_calls", |_args, _out| {
1970        let calls = TRANSPORT_MOCK_CALLS.with(|calls| calls.borrow().clone());
1971        let values = calls
1972            .iter()
1973            .map(transport_mock_call_value)
1974            .collect::<Vec<_>>();
1975        Ok(VmValue::List(Rc::new(values)))
1976    });
1977}
1978
1979fn http_server_tls_config_value(
1980    mode: &str,
1981    terminate_tls: bool,
1982    scheme: &str,
1983    hsts: bool,
1984    extra: BTreeMap<String, VmValue>,
1985) -> VmValue {
1986    let mut dict = BTreeMap::new();
1987    dict.insert("mode".to_string(), VmValue::String(Rc::from(mode)));
1988    dict.insert("terminate_tls".to_string(), VmValue::Bool(terminate_tls));
1989    dict.insert("scheme".to_string(), VmValue::String(Rc::from(scheme)));
1990    dict.insert("hsts".to_string(), VmValue::Bool(hsts));
1991    for (key, value) in extra {
1992        dict.insert(key, value);
1993    }
1994    VmValue::Dict(Rc::new(dict))
1995}
1996
1997fn hsts_options(options: &BTreeMap<String, VmValue>) -> BTreeMap<String, VmValue> {
1998    let mut hsts = BTreeMap::new();
1999    hsts.insert(
2000        "hsts_max_age_seconds".to_string(),
2001        VmValue::Int(vm_get_int_option(
2002            options,
2003            "hsts_max_age_seconds",
2004            31_536_000,
2005        )),
2006    );
2007    hsts.insert(
2008        "hsts_include_subdomains".to_string(),
2009        VmValue::Bool(vm_get_bool_option(
2010            options,
2011            "hsts_include_subdomains",
2012            false,
2013        )),
2014    );
2015    hsts.insert(
2016        "hsts_preload".to_string(),
2017        VmValue::Bool(vm_get_bool_option(options, "hsts_preload", false)),
2018    );
2019    hsts
2020}
2021
2022fn http_server_security_headers(config: &BTreeMap<String, VmValue>) -> BTreeMap<String, VmValue> {
2023    let hsts_enabled = vm_get_bool_option(config, "hsts", false);
2024    if !hsts_enabled {
2025        return BTreeMap::new();
2026    }
2027    let mut value = format!(
2028        "max-age={}",
2029        vm_get_int_option(config, "hsts_max_age_seconds", 31_536_000).max(0)
2030    );
2031    if vm_get_bool_option(config, "hsts_include_subdomains", false) {
2032        value.push_str("; includeSubDomains");
2033    }
2034    if vm_get_bool_option(config, "hsts_preload", false) {
2035        value.push_str("; preload");
2036    }
2037    BTreeMap::from([(
2038        "strict-transport-security".to_string(),
2039        VmValue::String(Rc::from(value)),
2040    )])
2041}
2042
2043fn tls_hosts_arg(value: Option<&VmValue>) -> Result<Vec<String>, VmError> {
2044    match value {
2045        None | Some(VmValue::Nil) => Ok(vec!["localhost".to_string(), "127.0.0.1".to_string()]),
2046        Some(VmValue::List(hosts)) => {
2047            let mut parsed = Vec::new();
2048            for host in hosts.iter() {
2049                let host = host.display();
2050                if host.is_empty() {
2051                    return Err(vm_error(
2052                        "http_server_tls_self_signed_dev: host names must be non-empty",
2053                    ));
2054                }
2055                parsed.push(host);
2056            }
2057            if parsed.is_empty() {
2058                return Err(vm_error(
2059                    "http_server_tls_self_signed_dev: host list must not be empty",
2060                ));
2061            }
2062            Ok(parsed)
2063        }
2064        Some(other) => {
2065            let host = other.display();
2066            if host.is_empty() {
2067                return Err(vm_error(
2068                    "http_server_tls_self_signed_dev: host name must be non-empty",
2069                ));
2070            }
2071            Ok(vec![host])
2072        }
2073    }
2074}
2075
2076fn vm_get_int_option(options: &BTreeMap<String, VmValue>, key: &str, default: i64) -> i64 {
2077    options.get(key).and_then(|v| v.as_int()).unwrap_or(default)
2078}
2079
2080fn vm_get_bool_option(options: &BTreeMap<String, VmValue>, key: &str, default: bool) -> bool {
2081    match options.get(key) {
2082        Some(VmValue::Bool(b)) => *b,
2083        _ => default,
2084    }
2085}
2086
2087fn vm_get_int_option_prefer(
2088    options: &BTreeMap<String, VmValue>,
2089    canonical: &str,
2090    alias: &str,
2091    default: i64,
2092) -> i64 {
2093    options
2094        .get(canonical)
2095        .and_then(|value| value.as_int())
2096        .or_else(|| options.get(alias).and_then(|value| value.as_int()))
2097        .unwrap_or(default)
2098}
2099
2100fn vm_get_optional_int_option(options: &BTreeMap<String, VmValue>, key: &str) -> Option<u64> {
2101    options
2102        .get(key)
2103        .and_then(|value| value.as_int())
2104        .map(|value| value.max(0) as u64)
2105}
2106
2107fn string_option(options: &BTreeMap<String, VmValue>, key: &str) -> Option<String> {
2108    options
2109        .get(key)
2110        .map(|value| value.display())
2111        .filter(|value| !value.is_empty())
2112}
2113
2114fn parse_proxy_config(options: &BTreeMap<String, VmValue>) -> Option<HttpProxyConfig> {
2115    let proxy = options.get("proxy")?;
2116    let (url, no_proxy) = match proxy {
2117        VmValue::Dict(dict) => (
2118            dict.get("url")
2119                .map(|value| value.display())
2120                .filter(|value| !value.is_empty())?,
2121            dict.get("no_proxy")
2122                .map(|value| value.display())
2123                .filter(|value| !value.is_empty()),
2124        ),
2125        other => (other.display(), None),
2126    };
2127    if url.is_empty() {
2128        return None;
2129    }
2130    let auth = options
2131        .get("proxy_auth")
2132        .and_then(|value| value.as_dict())
2133        .map(|dict| {
2134            (
2135                dict.get("user")
2136                    .map(|value| value.display())
2137                    .unwrap_or_default(),
2138                dict.get("pass")
2139                    .or_else(|| dict.get("password"))
2140                    .map(|value| value.display())
2141                    .unwrap_or_default(),
2142            )
2143        })
2144        .filter(|(user, _)| !user.is_empty());
2145    Some(HttpProxyConfig {
2146        url,
2147        auth,
2148        no_proxy,
2149    })
2150}
2151
2152fn parse_tls_config(options: &BTreeMap<String, VmValue>) -> HttpTlsConfig {
2153    let Some(tls) = options.get("tls").and_then(|value| value.as_dict()) else {
2154        return HttpTlsConfig::default();
2155    };
2156    let pinned_sha256 = match tls.get("pinned_sha256") {
2157        Some(VmValue::List(values)) => values
2158            .iter()
2159            .map(|value| value.display())
2160            .filter(|value| !value.is_empty())
2161            .collect(),
2162        Some(value) => {
2163            let value = value.display();
2164            if value.is_empty() {
2165                Vec::new()
2166            } else {
2167                vec![value]
2168            }
2169        }
2170        None => Vec::new(),
2171    };
2172    HttpTlsConfig {
2173        ca_bundle_path: string_option(tls, "ca_bundle_path"),
2174        client_cert_path: string_option(tls, "client_cert_path"),
2175        client_key_path: string_option(tls, "client_key_path"),
2176        client_identity_path: string_option(tls, "client_identity_path"),
2177        pinned_sha256,
2178    }
2179}
2180
2181fn parse_retry_statuses(options: &BTreeMap<String, VmValue>) -> Vec<u16> {
2182    match options.get("retry_on") {
2183        Some(VmValue::List(values)) => {
2184            let statuses: Vec<u16> = values
2185                .iter()
2186                .filter_map(|value| value.as_int())
2187                .filter(|status| (0..=u16::MAX as i64).contains(status))
2188                .map(|status| status as u16)
2189                .collect();
2190            if statuses.is_empty() {
2191                DEFAULT_RETRYABLE_STATUSES.to_vec()
2192            } else {
2193                statuses
2194            }
2195        }
2196        _ => DEFAULT_RETRYABLE_STATUSES.to_vec(),
2197    }
2198}
2199
2200fn parse_retry_methods(options: &BTreeMap<String, VmValue>) -> Vec<String> {
2201    match options.get("retry_methods") {
2202        Some(VmValue::List(values)) => {
2203            let methods: Vec<String> = values
2204                .iter()
2205                .map(|value| value.display().trim().to_ascii_uppercase())
2206                .filter(|value| !value.is_empty())
2207                .collect();
2208            if methods.is_empty() {
2209                DEFAULT_RETRYABLE_METHODS
2210                    .iter()
2211                    .map(|method| (*method).to_string())
2212                    .collect()
2213            } else {
2214                methods
2215            }
2216        }
2217        _ => DEFAULT_RETRYABLE_METHODS
2218            .iter()
2219            .map(|method| (*method).to_string())
2220            .collect(),
2221    }
2222}
2223
2224fn parse_http_options(options: &BTreeMap<String, VmValue>) -> HttpRequestConfig {
2225    let total_timeout_ms = vm_get_int_option(options, "total_timeout_ms", -1);
2226    let total_timeout_ms = if total_timeout_ms >= 0 {
2227        total_timeout_ms as u64
2228    } else {
2229        vm_get_int_option_prefer(options, "timeout_ms", "timeout", DEFAULT_TIMEOUT_MS as i64).max(0)
2230            as u64
2231    };
2232    let retry_options = options.get("retry").and_then(|value| value.as_dict());
2233    let retry_max = retry_options
2234        .and_then(|retry| retry.get("max"))
2235        .and_then(|value| value.as_int())
2236        .unwrap_or_else(|| vm_get_int_option(options, "retries", 0))
2237        .max(0) as u32;
2238    let retry_backoff_ms = retry_options
2239        .and_then(|retry| retry.get("backoff_ms"))
2240        .and_then(|value| value.as_int())
2241        .unwrap_or_else(|| vm_get_int_option(options, "backoff", DEFAULT_BACKOFF_MS as i64))
2242        .max(0) as u64;
2243    let respect_retry_after = vm_get_bool_option(options, "respect_retry_after", true);
2244    let follow_redirects = vm_get_bool_option(options, "follow_redirects", true);
2245    let max_redirects = vm_get_int_option(options, "max_redirects", 10).max(0) as usize;
2246
2247    HttpRequestConfig {
2248        total_timeout_ms,
2249        connect_timeout_ms: vm_get_optional_int_option(options, "connect_timeout_ms"),
2250        read_timeout_ms: vm_get_optional_int_option(options, "read_timeout_ms"),
2251        retry: RetryConfig {
2252            max: retry_max,
2253            backoff_ms: retry_backoff_ms,
2254            retryable_statuses: parse_retry_statuses(options),
2255            retryable_methods: parse_retry_methods(options),
2256            respect_retry_after,
2257        },
2258        follow_redirects,
2259        max_redirects,
2260        proxy: parse_proxy_config(options),
2261        tls: parse_tls_config(options),
2262        decompress: vm_get_bool_option(options, "decompress", true),
2263    }
2264}
2265
2266fn http_client_key(config: &HttpRequestConfig) -> String {
2267    format!(
2268        "follow_redirects={};max_redirects={};connect_timeout={:?};read_timeout={:?};proxy={};proxy_auth={};no_proxy={};ca={};client_cert={};client_key={};identity={};pins={};decompress={}",
2269        config.follow_redirects,
2270        config.max_redirects,
2271        config.connect_timeout_ms,
2272        config.read_timeout_ms,
2273        config
2274            .proxy
2275            .as_ref()
2276            .map(|proxy| proxy.url.as_str())
2277            .unwrap_or(""),
2278        config
2279            .proxy
2280            .as_ref()
2281            .and_then(|proxy| proxy.auth.as_ref())
2282            .map(|(user, _)| user.as_str())
2283            .unwrap_or(""),
2284        config
2285            .proxy
2286            .as_ref()
2287            .and_then(|proxy| proxy.no_proxy.as_deref())
2288            .unwrap_or(""),
2289        config.tls.ca_bundle_path.as_deref().unwrap_or(""),
2290        config.tls.client_cert_path.as_deref().unwrap_or(""),
2291        config.tls.client_key_path.as_deref().unwrap_or(""),
2292        config.tls.client_identity_path.as_deref().unwrap_or(""),
2293        config.tls.pinned_sha256.join(","),
2294        config.decompress,
2295    )
2296}
2297
2298fn build_http_client(config: &HttpRequestConfig) -> Result<reqwest::Client, VmError> {
2299    let redirect_policy = if config.follow_redirects {
2300        let max_redirects = config.max_redirects;
2301        reqwest::redirect::Policy::custom(move |attempt| {
2302            if attempt.previous().len() >= max_redirects {
2303                attempt.error("too many redirects")
2304            } else if crate::egress::redirect_url_allowed("http_redirect", attempt.url().as_str()) {
2305                attempt.follow()
2306            } else {
2307                attempt.error("egress policy blocked redirect target")
2308            }
2309        })
2310    } else {
2311        reqwest::redirect::Policy::none()
2312    };
2313
2314    let mut builder = reqwest::Client::builder().redirect(redirect_policy);
2315    if let Some(ms) = config.connect_timeout_ms {
2316        builder = builder.connect_timeout(Duration::from_millis(ms));
2317    }
2318    if let Some(ms) = config.read_timeout_ms {
2319        builder = builder.read_timeout(Duration::from_millis(ms));
2320    }
2321    if !config.decompress {
2322        builder = builder.no_gzip().no_brotli().no_deflate().no_zstd();
2323    }
2324    if let Some(proxy_config) = &config.proxy {
2325        let mut proxy = reqwest::Proxy::all(&proxy_config.url)
2326            .map_err(|e| vm_error(format!("http: invalid proxy '{}': {e}", proxy_config.url)))?;
2327        if let Some((user, pass)) = &proxy_config.auth {
2328            proxy = proxy.basic_auth(user, pass);
2329        }
2330        if let Some(no_proxy) = &proxy_config.no_proxy {
2331            proxy = proxy.no_proxy(reqwest::NoProxy::from_string(no_proxy));
2332        }
2333        builder = builder.proxy(proxy);
2334    }
2335    builder = configure_tls(builder, &config.tls)?;
2336    builder
2337        .build()
2338        .map_err(|e| vm_error(format!("http: failed to build client: {e}")))
2339}
2340
2341fn configure_tls(
2342    mut builder: reqwest::ClientBuilder,
2343    tls: &HttpTlsConfig,
2344) -> Result<reqwest::ClientBuilder, VmError> {
2345    if let Some(path) = &tls.ca_bundle_path {
2346        let resolved = resolve_http_path("http tls", path, crate::stdlib::sandbox::FsAccess::Read)?;
2347        let bytes = std::fs::read(&resolved).map_err(|error| {
2348            vm_error(format!(
2349                "http: failed to read CA bundle {}: {error}",
2350                resolved.display()
2351            ))
2352        })?;
2353        match reqwest::Certificate::from_pem_bundle(&bytes) {
2354            Ok(certs) => {
2355                for cert in certs {
2356                    builder = builder.add_root_certificate(cert);
2357                }
2358            }
2359            Err(pem_error) => {
2360                let cert = reqwest::Certificate::from_der(&bytes).map_err(|der_error| {
2361                    vm_error(format!(
2362                        "http: failed to parse CA bundle {} as PEM ({pem_error}) or DER ({der_error})",
2363                        resolved.display()
2364                    ))
2365                })?;
2366                builder = builder.add_root_certificate(cert);
2367            }
2368        }
2369    }
2370
2371    if let Some(path) = &tls.client_identity_path {
2372        let resolved = resolve_http_path("http tls", path, crate::stdlib::sandbox::FsAccess::Read)?;
2373        let bytes = std::fs::read(&resolved).map_err(|error| {
2374            vm_error(format!(
2375                "http: failed to read client identity {}: {error}",
2376                resolved.display()
2377            ))
2378        })?;
2379        let identity = reqwest::Identity::from_pem(&bytes).map_err(|error| {
2380            vm_error(format!(
2381                "http: failed to parse client identity {}: {error}",
2382                resolved.display()
2383            ))
2384        })?;
2385        builder = builder.identity(identity);
2386    } else if let Some(cert_path) = &tls.client_cert_path {
2387        let cert = {
2388            let resolved = resolve_http_path(
2389                "http tls",
2390                cert_path,
2391                crate::stdlib::sandbox::FsAccess::Read,
2392            )?;
2393            std::fs::read(&resolved).map_err(|error| {
2394                vm_error(format!(
2395                    "http: failed to read client certificate {}: {error}",
2396                    resolved.display()
2397                ))
2398            })?
2399        };
2400        let mut identity_pem = cert;
2401        if let Some(key_path) = &tls.client_key_path {
2402            let resolved =
2403                resolve_http_path("http tls", key_path, crate::stdlib::sandbox::FsAccess::Read)?;
2404            let key = std::fs::read(&resolved).map_err(|error| {
2405                vm_error(format!(
2406                    "http: failed to read client key {}: {error}",
2407                    resolved.display()
2408                ))
2409            })?;
2410            identity_pem.extend_from_slice(b"\n");
2411            identity_pem.extend_from_slice(&key);
2412        }
2413        let identity = reqwest::Identity::from_pem(&identity_pem)
2414            .map_err(|error| vm_error(format!("http: failed to parse client identity: {error}")))?;
2415        builder = builder.identity(identity);
2416    }
2417
2418    if !tls.pinned_sha256.is_empty() {
2419        builder = builder.tls_info(true);
2420    }
2421    Ok(builder)
2422}
2423
2424fn pooled_http_client(config: &HttpRequestConfig) -> Result<reqwest::Client, VmError> {
2425    let key = http_client_key(config);
2426    if let Some(client) = HTTP_CLIENTS.with(|clients| clients.borrow().get(&key).cloned()) {
2427        return Ok(client);
2428    }
2429
2430    let client = build_http_client(config)?;
2431    HTTP_CLIENTS.with(|clients| {
2432        clients.borrow_mut().insert(key, client.clone());
2433    });
2434    Ok(client)
2435}
2436
2437fn normalize_pin(value: &str) -> String {
2438    let trimmed = value.trim();
2439    let trimmed = trimmed
2440        .strip_prefix("sha256/")
2441        .or_else(|| trimmed.strip_prefix("sha256:"))
2442        .unwrap_or(trimmed);
2443    let compact = trimmed.replace(':', "");
2444    if !compact.is_empty() && compact.chars().all(|ch| ch.is_ascii_hexdigit()) {
2445        compact.to_ascii_lowercase()
2446    } else {
2447        compact
2448    }
2449}
2450
2451fn verify_tls_pin(response: &reqwest::Response, pins: &[String]) -> Result<(), VmError> {
2452    if pins.is_empty() {
2453        return Ok(());
2454    }
2455    let Some(info) = response.extensions().get::<reqwest::tls::TlsInfo>() else {
2456        return Err(vm_error(
2457            "http: TLS pinning requested but TLS info is unavailable",
2458        ));
2459    };
2460    let Some(cert_der) = info.peer_certificate() else {
2461        return Err(vm_error(
2462            "http: TLS pinning requested but no peer certificate was presented",
2463        ));
2464    };
2465    let (_, cert) = X509Certificate::from_der(cert_der)
2466        .map_err(|error| vm_error(format!("http: failed to parse peer certificate: {error}")))?;
2467    let digest = Sha256::digest(cert.tbs_certificate.subject_pki.raw);
2468    let hex_pin = hex::encode(digest.as_slice());
2469    let base64_pin = base64::engine::general_purpose::STANDARD.encode(digest);
2470    let base64url_pin = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(digest);
2471    let wanted = pins
2472        .iter()
2473        .map(|pin| normalize_pin(pin))
2474        .collect::<Vec<_>>();
2475    if wanted
2476        .iter()
2477        .any(|pin| pin == &hex_pin || pin == &base64_pin || pin == &base64url_pin)
2478    {
2479        Ok(())
2480    } else {
2481        Err(vm_error("http: TLS SPKI pin mismatch"))
2482    }
2483}
2484
2485fn parse_http_request_parts(
2486    method: &str,
2487    options: &BTreeMap<String, VmValue>,
2488) -> Result<HttpRequestParts, VmError> {
2489    let req_method = method
2490        .parse::<reqwest::Method>()
2491        .map_err(|e| vm_error(format!("http: invalid method '{method}': {e}")))?;
2492
2493    let mut header_map = reqwest::header::HeaderMap::new();
2494    let mut recorded_headers = BTreeMap::new();
2495
2496    if let Some(auth_val) = options.get("auth") {
2497        match auth_val {
2498            VmValue::String(s) => {
2499                let hv = reqwest::header::HeaderValue::from_str(s)
2500                    .map_err(|e| vm_error(format!("http: invalid auth header value: {e}")))?;
2501                header_map.insert(reqwest::header::AUTHORIZATION, hv);
2502                recorded_headers.insert(
2503                    "authorization".to_string(),
2504                    VmValue::String(Rc::from(s.as_ref())),
2505                );
2506            }
2507            VmValue::Dict(d) => {
2508                if let Some(bearer) = d.get("bearer") {
2509                    let token = bearer.display();
2510                    let authorization = format!("Bearer {token}");
2511                    let hv = reqwest::header::HeaderValue::from_str(&authorization)
2512                        .map_err(|e| vm_error(format!("http: invalid bearer token: {e}")))?;
2513                    header_map.insert(reqwest::header::AUTHORIZATION, hv);
2514                    recorded_headers.insert(
2515                        "authorization".to_string(),
2516                        VmValue::String(Rc::from(authorization)),
2517                    );
2518                } else if let Some(VmValue::Dict(basic)) = d.get("basic") {
2519                    let user = basic.get("user").map(|v| v.display()).unwrap_or_default();
2520                    let password = basic
2521                        .get("password")
2522                        .map(|v| v.display())
2523                        .unwrap_or_default();
2524                    use base64::Engine;
2525                    let encoded = base64::engine::general_purpose::STANDARD
2526                        .encode(format!("{user}:{password}"));
2527                    let authorization = format!("Basic {encoded}");
2528                    let hv = reqwest::header::HeaderValue::from_str(&authorization)
2529                        .map_err(|e| vm_error(format!("http: invalid basic auth: {e}")))?;
2530                    header_map.insert(reqwest::header::AUTHORIZATION, hv);
2531                    recorded_headers.insert(
2532                        "authorization".to_string(),
2533                        VmValue::String(Rc::from(authorization)),
2534                    );
2535                }
2536            }
2537            _ => {}
2538        }
2539    }
2540
2541    if let Some(VmValue::Dict(hdrs)) = options.get("headers") {
2542        for (k, v) in hdrs.iter() {
2543            let name = reqwest::header::HeaderName::from_bytes(k.as_bytes())
2544                .map_err(|e| vm_error(format!("http: invalid header name '{k}': {e}")))?;
2545            let val = reqwest::header::HeaderValue::from_str(&v.display())
2546                .map_err(|e| vm_error(format!("http: invalid header value for '{k}': {e}")))?;
2547            header_map.insert(name, val);
2548            recorded_headers.insert(
2549                k.to_ascii_lowercase(),
2550                VmValue::String(Rc::from(v.display())),
2551            );
2552        }
2553    }
2554
2555    let multipart = parse_multipart_request(options)?;
2556    if multipart.is_some() {
2557        if options.contains_key("body") {
2558            return Err(vm_error(
2559                "http: body and multipart options are mutually exclusive",
2560            ));
2561        }
2562        recorded_headers.insert(
2563            "content-type".to_string(),
2564            VmValue::String(Rc::from(format!(
2565                "multipart/form-data; boundary={MULTIPART_MOCK_BOUNDARY}"
2566            ))),
2567        );
2568    }
2569
2570    Ok(HttpRequestParts {
2571        method: req_method,
2572        headers: header_map,
2573        recorded_headers,
2574        body: if multipart.is_some() {
2575            multipart.as_ref().map(|request| request.mock_body.clone())
2576        } else {
2577            options.get("body").map(|v| v.display())
2578        },
2579        multipart,
2580    })
2581}
2582
2583fn final_http_url(
2584    url: &str,
2585    options: &BTreeMap<String, VmValue>,
2586    builtin: &str,
2587) -> Result<String, VmError> {
2588    let Some(query) = options.get("query").and_then(VmValue::as_dict) else {
2589        return Ok(url.to_string());
2590    };
2591    let mut parsed = url::Url::parse(url)
2592        .map_err(|error| vm_error(format!("{builtin}: invalid URL '{url}': {error}")))?;
2593    {
2594        let mut pairs = parsed.query_pairs_mut();
2595        for (key, value) in query.iter() {
2596            if !matches!(value, VmValue::Nil) {
2597                pairs.append_pair(key, &value.display());
2598            }
2599        }
2600    }
2601    Ok(parsed.to_string())
2602}
2603
2604fn session_from_options(options: &BTreeMap<String, VmValue>) -> Option<String> {
2605    options
2606        .get("session")
2607        .and_then(|value| handle_from_value(value, "http_request").ok())
2608}
2609
2610fn parse_mock_stream_event(value: &VmValue) -> MockStreamEvent {
2611    match value {
2612        VmValue::Dict(dict) => MockStreamEvent {
2613            event_type: dict
2614                .get("event")
2615                .or_else(|| dict.get("type"))
2616                .map(|value| value.display())
2617                .filter(|value| !value.is_empty())
2618                .unwrap_or_else(|| "message".to_string()),
2619            data: dict
2620                .get("data")
2621                .map(|value| value.display())
2622                .unwrap_or_default(),
2623            id: dict
2624                .get("id")
2625                .map(|value| value.display())
2626                .filter(|value| !value.is_empty()),
2627            retry_ms: dict.get("retry_ms").and_then(|value| value.as_int()),
2628        },
2629        _ => MockStreamEvent {
2630            event_type: "message".to_string(),
2631            data: value.display(),
2632            id: None,
2633            retry_ms: None,
2634        },
2635    }
2636}
2637
2638fn parse_mock_stream_events(value: Option<&VmValue>) -> Vec<MockStreamEvent> {
2639    let Some(value) = value else {
2640        return Vec::new();
2641    };
2642    match value {
2643        VmValue::Dict(dict) => dict
2644            .get("events")
2645            .and_then(|events| match events {
2646                VmValue::List(items) => Some(items.iter().map(parse_mock_stream_event).collect()),
2647                _ => None,
2648            })
2649            .unwrap_or_default(),
2650        VmValue::List(items) => items.iter().map(parse_mock_stream_event).collect(),
2651        other => vec![parse_mock_stream_event(other)],
2652    }
2653}
2654
2655fn sse_event_value(event: &MockStreamEvent) -> VmValue {
2656    let mut dict = BTreeMap::new();
2657    dict.insert("type".to_string(), VmValue::String(Rc::from("event")));
2658    dict.insert(
2659        "event".to_string(),
2660        VmValue::String(Rc::from(event.event_type.as_str())),
2661    );
2662    dict.insert(
2663        "data".to_string(),
2664        VmValue::String(Rc::from(event.data.as_str())),
2665    );
2666    dict.insert(
2667        "id".to_string(),
2668        event
2669            .id
2670            .as_deref()
2671            .map(|id| VmValue::String(Rc::from(id)))
2672            .unwrap_or(VmValue::Nil),
2673    );
2674    dict.insert(
2675        "retry_ms".to_string(),
2676        event.retry_ms.map(VmValue::Int).unwrap_or(VmValue::Nil),
2677    );
2678    VmValue::Dict(Rc::new(dict))
2679}
2680
2681fn sse_server_response_value(id: &str, handle: &SseServerHandle) -> VmValue {
2682    let mut dict = BTreeMap::new();
2683    dict.insert("id".to_string(), VmValue::String(Rc::from(id)));
2684    dict.insert(
2685        "type".to_string(),
2686        VmValue::String(Rc::from("sse_response")),
2687    );
2688    dict.insert("status".to_string(), VmValue::Int(handle.status));
2689    dict.insert(
2690        "headers".to_string(),
2691        VmValue::Dict(Rc::new(handle.headers.clone())),
2692    );
2693    dict.insert("body".to_string(), VmValue::Nil);
2694    dict.insert("streaming".to_string(), VmValue::Bool(true));
2695    dict.insert(
2696        "max_event_bytes".to_string(),
2697        VmValue::Int(handle.max_event_bytes as i64),
2698    );
2699    dict.insert(
2700        "max_buffered_events".to_string(),
2701        VmValue::Int(handle.max_buffered_events as i64),
2702    );
2703    VmValue::Dict(Rc::new(dict))
2704}
2705
2706fn default_sse_response_headers() -> BTreeMap<String, VmValue> {
2707    BTreeMap::from([
2708        (
2709            "content-type".to_string(),
2710            VmValue::String(Rc::from("text/event-stream; charset=utf-8")),
2711        ),
2712        (
2713            "cache-control".to_string(),
2714            VmValue::String(Rc::from("no-cache")),
2715        ),
2716        (
2717            "connection".to_string(),
2718            VmValue::String(Rc::from("keep-alive")),
2719        ),
2720        (
2721            "x-accel-buffering".to_string(),
2722            VmValue::String(Rc::from("no")),
2723        ),
2724    ])
2725}
2726
2727fn sse_response_headers(options: &BTreeMap<String, VmValue>) -> BTreeMap<String, VmValue> {
2728    let mut headers = default_sse_response_headers();
2729    if let Some(VmValue::Dict(custom)) = options.get("headers") {
2730        for (name, value) in custom.iter() {
2731            headers.retain(|existing, _| !existing.eq_ignore_ascii_case(name));
2732            headers.insert(name.clone(), VmValue::String(Rc::from(value.display())));
2733        }
2734    }
2735    if !headers
2736        .keys()
2737        .any(|name| name.eq_ignore_ascii_case("content-type"))
2738    {
2739        headers.insert(
2740            "content-type".to_string(),
2741            VmValue::String(Rc::from("text/event-stream; charset=utf-8")),
2742        );
2743    }
2744    headers
2745}
2746
2747fn validate_sse_field(field: &str, value: &str) -> Result<(), VmError> {
2748    if value.contains('\n') || value.contains('\r') {
2749        return Err(vm_error(format!(
2750            "sse_event: {field} must not contain newlines"
2751        )));
2752    }
2753    Ok(())
2754}
2755
2756fn push_sse_multiline_field(frame: &mut String, field: &str, value: &str) {
2757    let normalized = value.replace("\r\n", "\n").replace('\r', "\n");
2758    if normalized.is_empty() {
2759        frame.push_str(field);
2760        frame.push_str(": \n");
2761        return;
2762    }
2763    for line in normalized.split('\n') {
2764        frame.push_str(field);
2765        frame.push_str(": ");
2766        frame.push_str(line);
2767        frame.push('\n');
2768    }
2769}
2770
2771fn vm_sse_event_frame(
2772    event: &VmValue,
2773    options: &BTreeMap<String, VmValue>,
2774) -> Result<String, VmError> {
2775    let mut frame = String::new();
2776    let mut has_event_payload = false;
2777
2778    match event {
2779        VmValue::Dict(dict) => {
2780            if let Some(comment) = dict.get("comment").or_else(|| options.get("comment")) {
2781                push_sse_comment(&mut frame, &comment.display());
2782            }
2783            if let Some(id) = dict.get("id").or_else(|| options.get("id")) {
2784                let id = id.display();
2785                validate_sse_field("id", &id)?;
2786                frame.push_str("id: ");
2787                frame.push_str(&id);
2788                frame.push('\n');
2789            }
2790            if let Some(event_type) = dict
2791                .get("event")
2792                .or_else(|| dict.get("name"))
2793                .or_else(|| options.get("event"))
2794            {
2795                let event_type = event_type.display();
2796                validate_sse_field("event", &event_type)?;
2797                frame.push_str("event: ");
2798                frame.push_str(&event_type);
2799                frame.push('\n');
2800                has_event_payload = true;
2801            }
2802            if let Some(retry) = dict
2803                .get("retry")
2804                .or_else(|| dict.get("retry_ms"))
2805                .or_else(|| options.get("retry"))
2806                .or_else(|| options.get("retry_ms"))
2807            {
2808                let retry_ms = retry.as_int().ok_or_else(|| {
2809                    vm_error("sse_event: retry/retry_ms must be a non-negative integer")
2810                })?;
2811                if retry_ms < 0 {
2812                    return Err(vm_error(
2813                        "sse_event: retry/retry_ms must be a non-negative integer",
2814                    ));
2815                }
2816                frame.push_str("retry: ");
2817                frame.push_str(&retry_ms.to_string());
2818                frame.push('\n');
2819                has_event_payload = true;
2820            }
2821            if let Some(data) = dict.get("data").or_else(|| options.get("data")) {
2822                push_sse_multiline_field(&mut frame, "data", &data.display());
2823                has_event_payload = true;
2824            } else if !frame.is_empty() && !dict.contains_key("comment") {
2825                push_sse_multiline_field(&mut frame, "data", "");
2826                has_event_payload = true;
2827            }
2828        }
2829        other => {
2830            if let Some(comment) = options.get("comment") {
2831                push_sse_comment(&mut frame, &comment.display());
2832            }
2833            if let Some(id) = options.get("id") {
2834                let id = id.display();
2835                validate_sse_field("id", &id)?;
2836                frame.push_str("id: ");
2837                frame.push_str(&id);
2838                frame.push('\n');
2839            }
2840            if let Some(event_type) = options.get("event") {
2841                let event_type = event_type.display();
2842                validate_sse_field("event", &event_type)?;
2843                frame.push_str("event: ");
2844                frame.push_str(&event_type);
2845                frame.push('\n');
2846            }
2847            if let Some(retry) = options.get("retry").or_else(|| options.get("retry_ms")) {
2848                let retry_ms = retry.as_int().ok_or_else(|| {
2849                    vm_error("sse_event: retry/retry_ms must be a non-negative integer")
2850                })?;
2851                if retry_ms < 0 {
2852                    return Err(vm_error(
2853                        "sse_event: retry/retry_ms must be a non-negative integer",
2854                    ));
2855                }
2856                frame.push_str("retry: ");
2857                frame.push_str(&retry_ms.to_string());
2858                frame.push('\n');
2859            }
2860            push_sse_multiline_field(&mut frame, "data", &other.display());
2861            has_event_payload = true;
2862        }
2863    }
2864
2865    if frame.is_empty() || !has_event_payload && !frame.starts_with(':') {
2866        push_sse_comment(&mut frame, "");
2867    }
2868    frame.push('\n');
2869    Ok(frame)
2870}
2871
2872fn push_sse_comment(frame: &mut String, comment: &str) {
2873    let normalized = comment.replace("\r\n", "\n").replace('\r', "\n");
2874    if normalized.is_empty() {
2875        frame.push_str(":\n");
2876        return;
2877    }
2878    for line in normalized.split('\n') {
2879        frame.push_str(": ");
2880        frame.push_str(line);
2881        frame.push('\n');
2882    }
2883}
2884
2885fn vm_sse_server_response(options: &BTreeMap<String, VmValue>) -> Result<VmValue, VmError> {
2886    let id = next_transport_handle("sse-server");
2887    let status = options
2888        .get("status")
2889        .and_then(|value| value.as_int())
2890        .unwrap_or(200)
2891        .clamp(100, 599);
2892    let handle = SseServerHandle {
2893        status,
2894        headers: sse_response_headers(options),
2895        frames: VecDeque::new(),
2896        max_event_bytes: transport_limit_option(
2897            options,
2898            "max_event_bytes",
2899            DEFAULT_MAX_MESSAGE_BYTES,
2900        )
2901        .max(1),
2902        max_buffered_events: transport_limit_option(
2903            options,
2904            "max_buffered_events",
2905            DEFAULT_MAX_STREAM_EVENTS,
2906        )
2907        .max(1),
2908        sent_events: 0,
2909        flushed_events: 0,
2910        closed: false,
2911        disconnected: false,
2912        cancelled: false,
2913        cancel_reason: None,
2914    };
2915    let value = sse_server_response_value(&id, &handle);
2916    SSE_SERVER_HANDLES.with(|handles| {
2917        let mut handles = handles.borrow_mut();
2918        if handles.len() >= MAX_SSE_SERVER_STREAMS {
2919            return Err(vm_error(format!(
2920                "sse_server_response: maximum open streams ({MAX_SSE_SERVER_STREAMS}) reached"
2921            )));
2922        }
2923        handles.insert(id, handle);
2924        Ok(())
2925    })?;
2926    Ok(value)
2927}
2928
2929fn sse_server_status_value(id: &str, handle: &SseServerHandle) -> VmValue {
2930    let mut dict = BTreeMap::new();
2931    dict.insert("id".to_string(), VmValue::String(Rc::from(id)));
2932    dict.insert("status".to_string(), VmValue::Int(handle.status));
2933    dict.insert(
2934        "headers".to_string(),
2935        VmValue::Dict(Rc::new(handle.headers.clone())),
2936    );
2937    dict.insert(
2938        "buffered_events".to_string(),
2939        VmValue::Int(handle.frames.len() as i64),
2940    );
2941    dict.insert(
2942        "sent_events".to_string(),
2943        VmValue::Int(handle.sent_events as i64),
2944    );
2945    dict.insert(
2946        "flushed_events".to_string(),
2947        VmValue::Int(handle.flushed_events as i64),
2948    );
2949    dict.insert("closed".to_string(), VmValue::Bool(handle.closed));
2950    dict.insert(
2951        "disconnected".to_string(),
2952        VmValue::Bool(handle.disconnected),
2953    );
2954    dict.insert("cancelled".to_string(), VmValue::Bool(handle.cancelled));
2955    dict.insert(
2956        "cancel_reason".to_string(),
2957        handle
2958            .cancel_reason
2959            .as_deref()
2960            .map(|reason| VmValue::String(Rc::from(reason)))
2961            .unwrap_or(VmValue::Nil),
2962    );
2963    dict.insert(
2964        "max_event_bytes".to_string(),
2965        VmValue::Int(handle.max_event_bytes as i64),
2966    );
2967    dict.insert(
2968        "max_buffered_events".to_string(),
2969        VmValue::Int(handle.max_buffered_events as i64),
2970    );
2971    VmValue::Dict(Rc::new(dict))
2972}
2973
2974fn vm_sse_server_status(stream_id: &str) -> Result<VmValue, VmError> {
2975    SSE_SERVER_HANDLES.with(|handles| {
2976        handles
2977            .borrow()
2978            .get(stream_id)
2979            .map(|handle| sse_server_status_value(stream_id, handle))
2980            .ok_or_else(|| vm_error(format!("sse_server_status: unknown stream '{stream_id}'")))
2981    })
2982}
2983
2984fn vm_sse_server_send(
2985    stream_id: &str,
2986    event: &VmValue,
2987    options: &BTreeMap<String, VmValue>,
2988) -> Result<VmValue, VmError> {
2989    let frame = vm_sse_event_frame(event, options)?;
2990    SSE_SERVER_HANDLES.with(|handles| {
2991        let mut handles = handles.borrow_mut();
2992        let handle = handles
2993            .get_mut(stream_id)
2994            .ok_or_else(|| vm_error(format!("sse_server_send: unknown stream '{stream_id}'")))?;
2995        if handle.closed || handle.cancelled || handle.disconnected {
2996            return Ok(VmValue::Bool(false));
2997        }
2998        if frame.len() > handle.max_event_bytes {
2999            return Err(vm_error(format!(
3000                "sse_server_send: event exceeded max_event_bytes ({})",
3001                handle.max_event_bytes
3002            )));
3003        }
3004        if handle.frames.len() >= handle.max_buffered_events {
3005            return Err(vm_error(format!(
3006                "sse_server_send: buffered events exceeded max_buffered_events ({})",
3007                handle.max_buffered_events
3008            )));
3009        }
3010        handle.frames.push_back(frame);
3011        handle.sent_events += 1;
3012        Ok(VmValue::Bool(true))
3013    })
3014}
3015
3016fn vm_sse_server_heartbeat(stream_id: &str, comment: Option<&VmValue>) -> Result<VmValue, VmError> {
3017    let mut frame = String::new();
3018    push_sse_comment(
3019        &mut frame,
3020        &comment
3021            .map(|value| value.display())
3022            .unwrap_or_else(|| "heartbeat".to_string()),
3023    );
3024    frame.push('\n');
3025    SSE_SERVER_HANDLES.with(|handles| {
3026        let mut handles = handles.borrow_mut();
3027        let handle = handles.get_mut(stream_id).ok_or_else(|| {
3028            vm_error(format!(
3029                "sse_server_heartbeat: unknown stream '{stream_id}'"
3030            ))
3031        })?;
3032        if handle.closed || handle.cancelled || handle.disconnected {
3033            return Ok(VmValue::Bool(false));
3034        }
3035        if frame.len() > handle.max_event_bytes {
3036            return Err(vm_error(format!(
3037                "sse_server_heartbeat: event exceeded max_event_bytes ({})",
3038                handle.max_event_bytes
3039            )));
3040        }
3041        if handle.frames.len() >= handle.max_buffered_events {
3042            return Err(vm_error(format!(
3043                "sse_server_heartbeat: buffered events exceeded max_buffered_events ({})",
3044                handle.max_buffered_events
3045            )));
3046        }
3047        handle.frames.push_back(frame);
3048        handle.sent_events += 1;
3049        Ok(VmValue::Bool(true))
3050    })
3051}
3052
3053fn vm_sse_server_flush(stream_id: &str) -> Result<VmValue, VmError> {
3054    SSE_SERVER_HANDLES.with(|handles| {
3055        let mut handles = handles.borrow_mut();
3056        let handle = handles
3057            .get_mut(stream_id)
3058            .ok_or_else(|| vm_error(format!("sse_server_flush: unknown stream '{stream_id}'")))?;
3059        if handle.disconnected || handle.cancelled {
3060            return Ok(VmValue::Bool(false));
3061        }
3062        handle.flushed_events = handle.sent_events;
3063        Ok(VmValue::Bool(!handle.closed))
3064    })
3065}
3066
3067fn vm_sse_server_close(stream_id: &str) -> Result<VmValue, VmError> {
3068    SSE_SERVER_HANDLES.with(|handles| {
3069        let mut handles = handles.borrow_mut();
3070        let handle = handles
3071            .get_mut(stream_id)
3072            .ok_or_else(|| vm_error(format!("sse_server_close: unknown stream '{stream_id}'")))?;
3073        if handle.closed {
3074            return Ok(VmValue::Bool(false));
3075        }
3076        handle.closed = true;
3077        Ok(VmValue::Bool(true))
3078    })
3079}
3080
3081fn vm_sse_server_cancel(stream_id: &str, reason: Option<&VmValue>) -> Result<VmValue, VmError> {
3082    SSE_SERVER_HANDLES.with(|handles| {
3083        let mut handles = handles.borrow_mut();
3084        let handle = handles
3085            .get_mut(stream_id)
3086            .ok_or_else(|| vm_error(format!("sse_server_cancel: unknown stream '{stream_id}'")))?;
3087        if handle.cancelled {
3088            return Ok(VmValue::Bool(false));
3089        }
3090        handle.cancelled = true;
3091        handle.closed = true;
3092        handle.cancel_reason = reason
3093            .map(|value| value.display())
3094            .filter(|value| !value.is_empty());
3095        Ok(VmValue::Bool(true))
3096    })
3097}
3098
3099fn vm_sse_server_observed_bool(
3100    stream_id: &str,
3101    builtin: &str,
3102    predicate: impl Fn(&SseServerHandle) -> bool,
3103) -> Result<VmValue, VmError> {
3104    SSE_SERVER_HANDLES.with(|handles| {
3105        handles
3106            .borrow()
3107            .get(stream_id)
3108            .map(|handle| VmValue::Bool(predicate(handle)))
3109            .ok_or_else(|| vm_error(format!("{builtin}: unknown stream '{stream_id}'")))
3110    })
3111}
3112
3113fn vm_sse_server_mock_receive(stream_id: &str) -> Result<VmValue, VmError> {
3114    SSE_SERVER_HANDLES.with(|handles| {
3115        let mut handles = handles.borrow_mut();
3116        let handle = handles.get_mut(stream_id).ok_or_else(|| {
3117            vm_error(format!(
3118                "sse_server_mock_receive: unknown stream '{stream_id}'"
3119            ))
3120        })?;
3121        if let Some(frame) = handle.frames.pop_front() {
3122            return Ok(sse_server_mock_frame_value(&frame));
3123        }
3124        if handle.closed || handle.cancelled || handle.disconnected {
3125            return Ok(sse_server_closed_event());
3126        }
3127        Ok(timeout_event())
3128    })
3129}
3130
3131fn vm_sse_server_mock_disconnect(stream_id: &str) -> Result<VmValue, VmError> {
3132    SSE_SERVER_HANDLES.with(|handles| {
3133        let mut handles = handles.borrow_mut();
3134        let handle = handles.get_mut(stream_id).ok_or_else(|| {
3135            vm_error(format!(
3136                "sse_server_mock_disconnect: unknown stream '{stream_id}'"
3137            ))
3138        })?;
3139        if handle.disconnected {
3140            return Ok(VmValue::Bool(false));
3141        }
3142        handle.disconnected = true;
3143        handle.closed = true;
3144        Ok(VmValue::Bool(true))
3145    })
3146}
3147
3148fn sse_server_mock_frame_value(frame: &str) -> VmValue {
3149    let mut event = MockStreamEvent {
3150        event_type: "message".to_string(),
3151        data: String::new(),
3152        id: None,
3153        retry_ms: None,
3154    };
3155    let mut data_lines = Vec::new();
3156    let mut comments = Vec::new();
3157    for raw in frame.lines() {
3158        if raw.is_empty() {
3159            continue;
3160        }
3161        if let Some(comment) = raw.strip_prefix(':') {
3162            comments.push(comment.strip_prefix(' ').unwrap_or(comment).to_string());
3163            continue;
3164        }
3165        let (field, value) = raw.split_once(':').unwrap_or((raw, ""));
3166        let value = value.strip_prefix(' ').unwrap_or(value);
3167        match field {
3168            "event" => event.event_type = value.to_string(),
3169            "data" => data_lines.push(value.to_string()),
3170            "id" => event.id = Some(value.to_string()).filter(|value| !value.is_empty()),
3171            "retry" => event.retry_ms = value.parse::<i64>().ok(),
3172            _ => {}
3173        }
3174    }
3175    if !data_lines.is_empty() {
3176        event.data = data_lines.join("\n");
3177    }
3178    let mut value = if comments.is_empty() || !data_lines.is_empty() {
3179        sse_event_value(&event)
3180    } else {
3181        let mut dict = BTreeMap::new();
3182        dict.insert("type".to_string(), VmValue::String(Rc::from("comment")));
3183        dict.insert(
3184            "comment".to_string(),
3185            VmValue::String(Rc::from(comments.join("\n"))),
3186        );
3187        VmValue::Dict(Rc::new(dict))
3188    };
3189    if let VmValue::Dict(dict) = &mut value {
3190        let mut owned = (**dict).clone();
3191        owned.insert("raw".to_string(), VmValue::String(Rc::from(frame)));
3192        value = VmValue::Dict(Rc::new(owned));
3193    }
3194    value
3195}
3196
3197fn real_sse_event_value(event: SseEvent) -> VmValue {
3198    match event {
3199        SseEvent::Open => {
3200            let mut dict = BTreeMap::new();
3201            dict.insert("type".to_string(), VmValue::String(Rc::from("open")));
3202            VmValue::Dict(Rc::new(dict))
3203        }
3204        SseEvent::Message(message) => {
3205            let retry_ms = message.retry.map(|retry| retry.as_millis() as i64);
3206            sse_event_value(&MockStreamEvent {
3207                event_type: if message.event.is_empty() {
3208                    "message".to_string()
3209                } else {
3210                    message.event
3211                },
3212                data: message.data,
3213                id: if message.id.is_empty() {
3214                    None
3215                } else {
3216                    Some(message.id)
3217                },
3218                retry_ms,
3219            })
3220        }
3221    }
3222}
3223
3224fn consume_sse_mock(url: &str) -> Option<Vec<MockStreamEvent>> {
3225    SSE_MOCKS.with(|mocks| {
3226        mocks
3227            .borrow()
3228            .iter()
3229            .find(|mock| url_matches(&mock.url_pattern, url))
3230            .map(|mock| mock.events.clone())
3231    })
3232}
3233
3234fn parse_ws_message(value: &VmValue) -> MockWsMessage {
3235    match value {
3236        VmValue::Dict(dict) => {
3237            let message_type = dict
3238                .get("type")
3239                .map(|value| value.display())
3240                .filter(|value| !value.is_empty())
3241                .unwrap_or_else(|| "text".to_string());
3242            let data = if dict
3243                .get("base64")
3244                .and_then(|value| match value {
3245                    VmValue::Bool(value) => Some(*value),
3246                    _ => None,
3247                })
3248                .unwrap_or(false)
3249            {
3250                use base64::Engine;
3251                dict.get("data")
3252                    .map(|value| value.display())
3253                    .and_then(|data| base64::engine::general_purpose::STANDARD.decode(data).ok())
3254                    .unwrap_or_default()
3255            } else {
3256                dict.get("data")
3257                    .map(|value| value.display().into_bytes())
3258                    .unwrap_or_default()
3259            };
3260            MockWsMessage {
3261                message_type,
3262                data,
3263                close_code: dict
3264                    .get("code")
3265                    .and_then(|value| value.as_int())
3266                    .map(|value| value as u16),
3267                close_reason: dict.get("reason").map(|value| value.display()),
3268            }
3269        }
3270        VmValue::Bytes(bytes) => MockWsMessage {
3271            message_type: "binary".to_string(),
3272            data: bytes.as_ref().clone(),
3273            close_code: None,
3274            close_reason: None,
3275        },
3276        other => MockWsMessage {
3277            message_type: "text".to_string(),
3278            data: other.display().into_bytes(),
3279            close_code: None,
3280            close_reason: None,
3281        },
3282    }
3283}
3284
3285fn parse_websocket_mock(value: Option<&VmValue>) -> (Vec<MockWsMessage>, bool) {
3286    let Some(value) = value else {
3287        return (Vec::new(), false);
3288    };
3289    match value {
3290        VmValue::Dict(dict) => {
3291            let echo = dict
3292                .get("echo")
3293                .and_then(|value| match value {
3294                    VmValue::Bool(value) => Some(*value),
3295                    _ => None,
3296                })
3297                .unwrap_or(false);
3298            let messages = dict
3299                .get("messages")
3300                .and_then(|messages| match messages {
3301                    VmValue::List(items) => Some(items.iter().map(parse_ws_message).collect()),
3302                    _ => None,
3303                })
3304                .unwrap_or_default();
3305            (messages, echo)
3306        }
3307        VmValue::List(items) => (items.iter().map(parse_ws_message).collect(), false),
3308        other => (vec![parse_ws_message(other)], false),
3309    }
3310}
3311
3312fn consume_websocket_mock(url: &str) -> Option<(Vec<MockWsMessage>, bool)> {
3313    WEBSOCKET_MOCKS.with(|mocks| {
3314        mocks
3315            .borrow()
3316            .iter()
3317            .find(|mock| url_matches(&mock.url_pattern, url))
3318            .map(|mock| (mock.messages.clone(), mock.echo))
3319    })
3320}
3321
3322fn ws_message_data(message: &MockWsMessage) -> String {
3323    match message.message_type.as_str() {
3324        "text" => String::from_utf8_lossy(&message.data).into_owned(),
3325        _ => {
3326            use base64::Engine;
3327            base64::engine::general_purpose::STANDARD.encode(&message.data)
3328        }
3329    }
3330}
3331
3332fn closed_event_with(code: Option<u16>, reason: Option<String>) -> VmValue {
3333    let mut dict = BTreeMap::new();
3334    dict.insert("type".to_string(), VmValue::String(Rc::from("close")));
3335    if let Some(code) = code {
3336        dict.insert("code".to_string(), VmValue::Int(i64::from(code)));
3337    }
3338    if let Some(reason) = reason {
3339        dict.insert("reason".to_string(), VmValue::String(Rc::from(reason)));
3340    }
3341    VmValue::Dict(Rc::new(dict))
3342}
3343
3344fn ws_event_value(message: MockWsMessage) -> VmValue {
3345    if message.message_type == "close" {
3346        return closed_event_with(message.close_code, message.close_reason);
3347    }
3348    let mut dict = BTreeMap::new();
3349    dict.insert(
3350        "type".to_string(),
3351        VmValue::String(Rc::from(message.message_type.as_str())),
3352    );
3353    match message.message_type.as_str() {
3354        "text" => {
3355            dict.insert(
3356                "data".to_string(),
3357                VmValue::String(Rc::from(String::from_utf8_lossy(&message.data).as_ref())),
3358            );
3359        }
3360        _ => {
3361            use base64::Engine;
3362            dict.insert(
3363                "data_base64".to_string(),
3364                VmValue::String(Rc::from(
3365                    base64::engine::general_purpose::STANDARD
3366                        .encode(&message.data)
3367                        .as_str(),
3368                )),
3369            );
3370        }
3371    }
3372    VmValue::Dict(Rc::new(dict))
3373}
3374
3375fn real_ws_event_value(message: WsMessage) -> VmValue {
3376    match message {
3377        WsMessage::Text(text) => ws_event_value(MockWsMessage {
3378            message_type: "text".to_string(),
3379            data: text.as_bytes().to_vec(),
3380            close_code: None,
3381            close_reason: None,
3382        }),
3383        WsMessage::Binary(bytes) => ws_event_value(MockWsMessage {
3384            message_type: "binary".to_string(),
3385            data: bytes.to_vec(),
3386            close_code: None,
3387            close_reason: None,
3388        }),
3389        WsMessage::Ping(bytes) => ws_event_value(MockWsMessage {
3390            message_type: "ping".to_string(),
3391            data: bytes.to_vec(),
3392            close_code: None,
3393            close_reason: None,
3394        }),
3395        WsMessage::Pong(bytes) => ws_event_value(MockWsMessage {
3396            message_type: "pong".to_string(),
3397            data: bytes.to_vec(),
3398            close_code: None,
3399            close_reason: None,
3400        }),
3401        WsMessage::Close(frame) => match frame {
3402            Some(frame) => {
3403                closed_event_with(Some(u16::from(frame.code)), Some(frame.reason.to_string()))
3404            }
3405            None => closed_event(),
3406        },
3407        WsMessage::Frame(_) => VmValue::Nil,
3408    }
3409}
3410
3411fn transport_mock_call_value(call: &TransportMockCall) -> VmValue {
3412    let mut dict = BTreeMap::new();
3413    dict.insert(
3414        "kind".to_string(),
3415        VmValue::String(Rc::from(call.kind.as_str())),
3416    );
3417    dict.insert(
3418        "url".to_string(),
3419        VmValue::String(Rc::from(call.url.as_str())),
3420    );
3421    dict.insert(
3422        "handle".to_string(),
3423        call.handle
3424            .as_deref()
3425            .map(|handle| VmValue::String(Rc::from(handle)))
3426            .unwrap_or(VmValue::Nil),
3427    );
3428    dict.insert(
3429        "type".to_string(),
3430        call.message_type
3431            .as_deref()
3432            .map(|message_type| VmValue::String(Rc::from(message_type)))
3433            .unwrap_or(VmValue::Nil),
3434    );
3435    dict.insert(
3436        "data".to_string(),
3437        call.data
3438            .as_deref()
3439            .map(|data| VmValue::String(Rc::from(data)))
3440            .unwrap_or(VmValue::Nil),
3441    );
3442    VmValue::Dict(Rc::new(dict))
3443}
3444
3445fn method_is_retryable(retry: &RetryConfig, method: &reqwest::Method) -> bool {
3446    retry
3447        .retryable_methods
3448        .iter()
3449        .any(|candidate| candidate.eq_ignore_ascii_case(method.as_str()))
3450}
3451
3452fn should_retry_response(
3453    config: &HttpRequestConfig,
3454    method: &reqwest::Method,
3455    status: u16,
3456    attempt: u32,
3457) -> bool {
3458    attempt < config.retry.max
3459        && method_is_retryable(&config.retry, method)
3460        && config.retry.retryable_statuses.contains(&status)
3461}
3462
3463fn should_retry_transport(
3464    config: &HttpRequestConfig,
3465    method: &reqwest::Method,
3466    error: &reqwest::Error,
3467    attempt: u32,
3468) -> bool {
3469    attempt < config.retry.max
3470        && method_is_retryable(&config.retry, method)
3471        && (error.is_timeout() || error.is_connect())
3472}
3473
3474fn parse_retry_after_value(value: &str) -> Option<Duration> {
3475    let value = value.trim();
3476    if value.is_empty() {
3477        return None;
3478    }
3479
3480    if let Ok(secs) = value.parse::<f64>() {
3481        if !secs.is_finite() || secs < 0.0 {
3482            return Some(Duration::from_millis(0));
3483        }
3484        let millis = (secs * 1_000.0) as u64;
3485        return Some(Duration::from_millis(millis.min(MAX_RETRY_DELAY_MS)));
3486    }
3487
3488    if let Ok(target) = httpdate::parse_http_date(value) {
3489        let millis = target
3490            .duration_since(SystemTime::now())
3491            .map(|delta| delta.as_millis() as u64)
3492            .unwrap_or(0);
3493        return Some(Duration::from_millis(millis.min(MAX_RETRY_DELAY_MS)));
3494    }
3495
3496    None
3497}
3498
3499fn parse_retry_after_header(value: &reqwest::header::HeaderValue) -> Option<Duration> {
3500    value.to_str().ok().and_then(parse_retry_after_value)
3501}
3502
3503fn mock_retry_after(status: u16, headers: &BTreeMap<String, VmValue>) -> Option<Duration> {
3504    if !(status == 429 || status == 503) {
3505        return None;
3506    }
3507
3508    headers
3509        .iter()
3510        .find(|(name, _)| name.eq_ignore_ascii_case("retry-after"))
3511        .and_then(|(_, value)| parse_retry_after_value(&value.display()))
3512}
3513
3514fn response_retry_after(
3515    status: u16,
3516    headers: &reqwest::header::HeaderMap,
3517    respect_retry_after: bool,
3518) -> Option<Duration> {
3519    if !respect_retry_after || !(status == 429 || status == 503) {
3520        return None;
3521    }
3522    headers
3523        .get(reqwest::header::RETRY_AFTER)
3524        .and_then(parse_retry_after_header)
3525}
3526
3527fn compute_retry_delay(attempt: u32, base_ms: u64, retry_after: Option<Duration>) -> Duration {
3528    use rand::RngExt;
3529
3530    let base_delay = base_ms.saturating_mul(1u64 << attempt.min(30));
3531    let jitter: f64 = rand::rng().random_range(0.75..=1.25);
3532    let exponential_ms = ((base_delay as f64 * jitter) as u64).min(MAX_RETRY_DELAY_MS);
3533    let retry_after_ms = retry_after
3534        .map(|duration| duration.as_millis() as u64)
3535        .unwrap_or(0)
3536        .min(MAX_RETRY_DELAY_MS);
3537    Duration::from_millis(exponential_ms.max(retry_after_ms))
3538}
3539
3540async fn vm_execute_http_request(
3541    method: &str,
3542    url: &str,
3543    options: &BTreeMap<String, VmValue>,
3544) -> Result<VmValue, VmError> {
3545    if let Some(session_id) = session_from_options(options) {
3546        return vm_execute_http_session_request(&session_id, method, url, options).await;
3547    }
3548
3549    let config = parse_http_options(options);
3550    let client = pooled_http_client(&config)?;
3551    vm_execute_http_request_with_client(client, &config, method, url, options).await
3552}
3553
3554async fn vm_execute_http_session_request(
3555    session_id: &str,
3556    method: &str,
3557    url: &str,
3558    options: &BTreeMap<String, VmValue>,
3559) -> Result<VmValue, VmError> {
3560    let session = HTTP_SESSIONS.with(|sessions| sessions.borrow().get(session_id).cloned());
3561    let Some(session) = session else {
3562        return Err(vm_error(format!(
3563            "http_session_request: unknown HTTP session '{session_id}'"
3564        )));
3565    };
3566    let merged_options = merge_options(&session.options, options);
3567    let config = parse_http_options(&merged_options);
3568    vm_execute_http_request_with_client(session.client, &config, method, url, &merged_options).await
3569}
3570
3571async fn vm_execute_http_request_with_client(
3572    client: reqwest::Client,
3573    config: &HttpRequestConfig,
3574    method: &str,
3575    url: &str,
3576    options: &BTreeMap<String, VmValue>,
3577) -> Result<VmValue, VmError> {
3578    let parts = parse_http_request_parts(method, options)?;
3579    let final_url = final_http_url(url, options, "http")?;
3580
3581    if !final_url.starts_with("http://") && !final_url.starts_with("https://") {
3582        return Err(vm_error(format!(
3583            "http: URL must start with http:// or https://, got '{url}'"
3584        )));
3585    }
3586    crate::egress::enforce_url_allowed("http_request", &final_url).await?;
3587
3588    for attempt in 0..=config.retry.max {
3589        if let Some(mock_response) = consume_http_mock(
3590            method,
3591            &final_url,
3592            parts.recorded_headers.clone(),
3593            parts.body.clone(),
3594        ) {
3595            let status = mock_response.status.clamp(0, u16::MAX as i64) as u16;
3596            if should_retry_response(config, &parts.method, status, attempt) {
3597                let retry_after = if config.retry.respect_retry_after {
3598                    mock_retry_after(status, &mock_response.headers)
3599                } else {
3600                    None
3601                };
3602                tokio::time::sleep(compute_retry_delay(
3603                    attempt,
3604                    config.retry.backoff_ms,
3605                    retry_after,
3606                ))
3607                .await;
3608                continue;
3609            }
3610
3611            return Ok(build_http_response(
3612                mock_response.status,
3613                mock_response.headers,
3614                mock_response.body,
3615            ));
3616        }
3617
3618        let mut req = client.request(parts.method.clone(), &final_url);
3619        req = req
3620            .headers(parts.headers.clone())
3621            .timeout(Duration::from_millis(config.total_timeout_ms));
3622        if let Some(multipart) = &parts.multipart {
3623            req = req.multipart(multipart_form(multipart)?);
3624        } else if let Some(ref b) = parts.body {
3625            req = req.body(b.clone());
3626        }
3627
3628        match req.send().await {
3629            Ok(response) => {
3630                verify_tls_pin(&response, &config.tls.pinned_sha256)?;
3631                let status = response.status().as_u16();
3632                if should_retry_response(config, &parts.method, status, attempt) {
3633                    let retry_after = response_retry_after(
3634                        status,
3635                        response.headers(),
3636                        config.retry.respect_retry_after,
3637                    );
3638                    tokio::time::sleep(compute_retry_delay(
3639                        attempt,
3640                        config.retry.backoff_ms,
3641                        retry_after,
3642                    ))
3643                    .await;
3644                    continue;
3645                }
3646
3647                let resp_headers = response_headers(response.headers());
3648
3649                let body_text = response
3650                    .text()
3651                    .await
3652                    .map_err(|e| vm_error(format!("http: failed to read response body: {e}")))?;
3653                return Ok(build_http_response(status as i64, resp_headers, body_text));
3654            }
3655            Err(e) => {
3656                if should_retry_transport(config, &parts.method, &e, attempt) {
3657                    tokio::time::sleep(compute_retry_delay(attempt, config.retry.backoff_ms, None))
3658                        .await;
3659                    continue;
3660                }
3661                return Err(vm_error(format!("http: request failed: {e}")));
3662            }
3663        }
3664    }
3665
3666    Err(vm_error("http: request failed"))
3667}
3668
3669async fn vm_http_download(
3670    url: &str,
3671    dst_path: &str,
3672    options: &BTreeMap<String, VmValue>,
3673) -> Result<VmValue, VmError> {
3674    let method = options
3675        .get("method")
3676        .map(|value| value.display())
3677        .filter(|value| !value.is_empty())
3678        .unwrap_or_else(|| "GET".to_string())
3679        .to_uppercase();
3680    let parts = parse_http_request_parts(&method, options)?;
3681    let final_url = final_http_url(url, options, "http_download")?;
3682    if let Some(mock_response) = consume_http_mock(
3683        &method,
3684        &final_url,
3685        parts.recorded_headers.clone(),
3686        parts.body.clone(),
3687    ) {
3688        let resolved = resolve_http_path(
3689            "http_download",
3690            dst_path,
3691            crate::stdlib::sandbox::FsAccess::Write,
3692        )?;
3693        std::fs::write(&resolved, mock_response.body.as_bytes()).map_err(|error| {
3694            vm_error(format!(
3695                "http_download: failed to write {}: {error}",
3696                resolved.display()
3697            ))
3698        })?;
3699        return Ok(build_http_download_response(
3700            mock_response.status,
3701            mock_response.headers,
3702            mock_response.body.len() as u64,
3703        ));
3704    }
3705
3706    if !final_url.starts_with("http://") && !final_url.starts_with("https://") {
3707        return Err(vm_error(format!(
3708            "http_download: URL must start with http:// or https://, got '{url}'"
3709        )));
3710    }
3711    crate::egress::enforce_url_allowed("http_download", &final_url).await?;
3712    let config = parse_http_options(options);
3713    let client = if let Some(session_id) = session_from_options(options) {
3714        HTTP_SESSIONS
3715            .with(|sessions| sessions.borrow().get(&session_id).cloned())
3716            .map(|session| session.client)
3717            .ok_or_else(|| {
3718                vm_error(format!(
3719                    "http_download: unknown HTTP session '{session_id}'"
3720                ))
3721            })?
3722    } else {
3723        pooled_http_client(&config)?
3724    };
3725    let mut request = client
3726        .request(parts.method, &final_url)
3727        .headers(parts.headers)
3728        .timeout(Duration::from_millis(config.total_timeout_ms));
3729    if let Some(multipart) = &parts.multipart {
3730        request = request.multipart(multipart_form(multipart)?);
3731    } else if let Some(body) = parts.body {
3732        request = request.body(body);
3733    }
3734    let mut response = request
3735        .send()
3736        .await
3737        .map_err(|error| vm_error(format!("http_download: request failed: {error}")))?;
3738    verify_tls_pin(&response, &config.tls.pinned_sha256)?;
3739    let status = response.status().as_u16() as i64;
3740    let headers = response_headers(response.headers());
3741    let resolved = resolve_http_path(
3742        "http_download",
3743        dst_path,
3744        crate::stdlib::sandbox::FsAccess::Write,
3745    )?;
3746    let mut file = std::fs::File::create(&resolved).map_err(|error| {
3747        vm_error(format!(
3748            "http_download: failed to create {}: {error}",
3749            resolved.display()
3750        ))
3751    })?;
3752    let mut bytes_written = 0_u64;
3753    while let Some(chunk) = response.chunk().await.map_err(|error| {
3754        vm_error(format!(
3755            "http_download: failed to read response body: {error}"
3756        ))
3757    })? {
3758        file.write_all(&chunk).map_err(|error| {
3759            vm_error(format!(
3760                "http_download: failed to write {}: {error}",
3761                resolved.display()
3762            ))
3763        })?;
3764        bytes_written += chunk.len() as u64;
3765    }
3766    Ok(build_http_download_response(status, headers, bytes_written))
3767}
3768
3769async fn vm_http_stream_open(
3770    url: &str,
3771    options: &BTreeMap<String, VmValue>,
3772) -> Result<VmValue, VmError> {
3773    let method = options
3774        .get("method")
3775        .map(|value| value.display())
3776        .filter(|value| !value.is_empty())
3777        .unwrap_or_else(|| "GET".to_string())
3778        .to_uppercase();
3779    let parts = parse_http_request_parts(&method, options)?;
3780    let final_url = final_http_url(url, options, "http_stream_open")?;
3781    let id = next_transport_handle("http-stream");
3782    if let Some(mock_response) = consume_http_mock(
3783        &method,
3784        &final_url,
3785        parts.recorded_headers.clone(),
3786        parts.body.clone(),
3787    ) {
3788        let handle = HttpStreamHandle {
3789            kind: HttpStreamKind::Fake,
3790            status: mock_response.status,
3791            headers: mock_response.headers,
3792            pending: mock_response.body.into_bytes().into(),
3793            closed: false,
3794        };
3795        HTTP_STREAMS.with(|streams| {
3796            let mut streams = streams.borrow_mut();
3797            if streams.len() >= MAX_HTTP_STREAMS {
3798                return Err(vm_error(format!(
3799                    "http_stream_open: maximum open streams ({MAX_HTTP_STREAMS}) reached"
3800                )));
3801            }
3802            streams.insert(id.clone(), handle);
3803            Ok(())
3804        })?;
3805        return Ok(VmValue::String(Rc::from(id)));
3806    }
3807
3808    if !final_url.starts_with("http://") && !final_url.starts_with("https://") {
3809        return Err(vm_error(format!(
3810            "http_stream_open: URL must start with http:// or https://, got '{url}'"
3811        )));
3812    }
3813    crate::egress::enforce_url_allowed("http_stream_open", &final_url).await?;
3814    let config = parse_http_options(options);
3815    let client = if let Some(session_id) = session_from_options(options) {
3816        HTTP_SESSIONS
3817            .with(|sessions| sessions.borrow().get(&session_id).cloned())
3818            .map(|session| session.client)
3819            .ok_or_else(|| {
3820                vm_error(format!(
3821                    "http_stream_open: unknown HTTP session '{session_id}'"
3822                ))
3823            })?
3824    } else {
3825        pooled_http_client(&config)?
3826    };
3827    let mut request = client
3828        .request(parts.method, &final_url)
3829        .headers(parts.headers)
3830        .timeout(Duration::from_millis(config.total_timeout_ms));
3831    if let Some(multipart) = &parts.multipart {
3832        request = request.multipart(multipart_form(multipart)?);
3833    } else if let Some(body) = parts.body {
3834        request = request.body(body);
3835    }
3836    let response = request
3837        .send()
3838        .await
3839        .map_err(|error| vm_error(format!("http_stream_open: request failed: {error}")))?;
3840    verify_tls_pin(&response, &config.tls.pinned_sha256)?;
3841    let status = response.status().as_u16() as i64;
3842    let headers = response_headers(response.headers());
3843    let handle = HttpStreamHandle {
3844        kind: HttpStreamKind::Real(Rc::new(tokio::sync::Mutex::new(response))),
3845        status,
3846        headers,
3847        pending: VecDeque::new(),
3848        closed: false,
3849    };
3850    HTTP_STREAMS.with(|streams| {
3851        let mut streams = streams.borrow_mut();
3852        if streams.len() >= MAX_HTTP_STREAMS {
3853            return Err(vm_error(format!(
3854                "http_stream_open: maximum open streams ({MAX_HTTP_STREAMS}) reached"
3855            )));
3856        }
3857        streams.insert(id.clone(), handle);
3858        Ok(())
3859    })?;
3860    Ok(VmValue::String(Rc::from(id)))
3861}
3862
3863async fn vm_http_stream_read(stream_id: &str, max_bytes: usize) -> Result<VmValue, VmError> {
3864    let (kind, mut pending, closed) = HTTP_STREAMS
3865        .with(|streams| {
3866            let mut streams = streams.borrow_mut();
3867            let handle = streams.get_mut(stream_id)?;
3868            let kind = match &handle.kind {
3869                HttpStreamKind::Real(response) => HttpStreamKind::Real(response.clone()),
3870                HttpStreamKind::Fake => HttpStreamKind::Fake,
3871            };
3872            let pending = std::mem::take(&mut handle.pending);
3873            Some((kind, pending, handle.closed))
3874        })
3875        .ok_or_else(|| vm_error(format!("http_stream_read: unknown stream '{stream_id}'")))?;
3876    if closed {
3877        return Ok(VmValue::Nil);
3878    }
3879    if pending.is_empty() {
3880        match kind {
3881            HttpStreamKind::Fake => {}
3882            HttpStreamKind::Real(response) => {
3883                let mut response = response.lock().await;
3884                if let Some(chunk) = response.chunk().await.map_err(|error| {
3885                    vm_error(format!(
3886                        "http_stream_read: failed to read response body: {error}"
3887                    ))
3888                })? {
3889                    pending.extend(chunk);
3890                }
3891            }
3892        }
3893    }
3894    if pending.is_empty() {
3895        HTTP_STREAMS.with(|streams| {
3896            if let Some(handle) = streams.borrow_mut().get_mut(stream_id) {
3897                handle.closed = true;
3898            }
3899        });
3900        return Ok(VmValue::Nil);
3901    }
3902    let take = pending.len().min(max_bytes.max(1));
3903    let chunk = pending.drain(..take).collect::<Vec<_>>();
3904    HTTP_STREAMS.with(|streams| {
3905        if let Some(handle) = streams.borrow_mut().get_mut(stream_id) {
3906            handle.pending = pending;
3907        }
3908    });
3909    Ok(VmValue::Bytes(Rc::new(chunk)))
3910}
3911
3912fn vm_http_stream_info(stream_id: &str) -> Result<VmValue, VmError> {
3913    HTTP_STREAMS.with(|streams| {
3914        let streams = streams.borrow();
3915        let handle = streams
3916            .get(stream_id)
3917            .ok_or_else(|| vm_error(format!("http_stream_info: unknown stream '{stream_id}'")))?;
3918        let mut dict = BTreeMap::new();
3919        dict.insert("status".to_string(), VmValue::Int(handle.status));
3920        dict.insert(
3921            "headers".to_string(),
3922            VmValue::Dict(Rc::new(handle.headers.clone())),
3923        );
3924        dict.insert(
3925            "ok".to_string(),
3926            VmValue::Bool((200..300).contains(&(handle.status as u16))),
3927        );
3928        Ok(VmValue::Dict(Rc::new(dict)))
3929    })
3930}
3931
3932async fn vm_sse_connect(
3933    method: &str,
3934    url: &str,
3935    options: &BTreeMap<String, VmValue>,
3936) -> Result<VmValue, VmError> {
3937    let id = next_transport_handle("sse");
3938    let max_events =
3939        transport_limit_option(options, "max_events", DEFAULT_MAX_STREAM_EVENTS).max(1);
3940    let max_message_bytes =
3941        transport_limit_option(options, "max_message_bytes", DEFAULT_MAX_MESSAGE_BYTES).max(1);
3942
3943    if let Some(events) = consume_sse_mock(url) {
3944        let handle = SseHandle {
3945            kind: SseHandleKind::Fake(Rc::new(tokio::sync::Mutex::new(FakeSseStream {
3946                events: events.into(),
3947                opened: false,
3948                closed: false,
3949            }))),
3950            url: url.to_string(),
3951            max_events,
3952            max_message_bytes,
3953            received: 0,
3954        };
3955        SSE_HANDLES.with(|handles| {
3956            let mut handles = handles.borrow_mut();
3957            if handles.len() >= MAX_SSE_STREAMS {
3958                return Err(vm_error(format!(
3959                    "sse_connect: maximum open streams ({MAX_SSE_STREAMS}) reached"
3960                )));
3961            }
3962            handles.insert(id.clone(), handle);
3963            Ok(())
3964        })?;
3965        record_transport_call(TransportMockCall {
3966            kind: "sse_connect".to_string(),
3967            handle: Some(id.clone()),
3968            url: url.to_string(),
3969            message_type: None,
3970            data: None,
3971        });
3972        return Ok(VmValue::String(Rc::from(id)));
3973    }
3974
3975    if !url.starts_with("http://") && !url.starts_with("https://") {
3976        return Err(vm_error(format!(
3977            "sse_connect: URL must start with http:// or https://, got '{url}'"
3978        )));
3979    }
3980    crate::egress::enforce_url_allowed("sse_connect", url).await?;
3981
3982    let config = parse_http_options(options);
3983    let client = if let Some(session_id) = session_from_options(options) {
3984        let session = HTTP_SESSIONS.with(|sessions| sessions.borrow().get(&session_id).cloned());
3985        session
3986            .map(|session| session.client)
3987            .ok_or_else(|| vm_error(format!("sse_connect: unknown HTTP session '{session_id}'")))?
3988    } else {
3989        pooled_http_client(&config)?
3990    };
3991    let parts = parse_http_request_parts(method, options)?;
3992    let mut request = client
3993        .request(parts.method, url)
3994        .headers(parts.headers)
3995        .timeout(Duration::from_millis(config.total_timeout_ms));
3996    if let Some(body) = parts.body {
3997        request = request.body(body);
3998    }
3999    let stream = EventSource::new(request)
4000        .map_err(|error| vm_error(format!("sse_connect: failed to create stream: {error}")))?;
4001    let handle = SseHandle {
4002        kind: SseHandleKind::Real(Rc::new(tokio::sync::Mutex::new(stream))),
4003        url: url.to_string(),
4004        max_events,
4005        max_message_bytes,
4006        received: 0,
4007    };
4008    SSE_HANDLES.with(|handles| {
4009        let mut handles = handles.borrow_mut();
4010        if handles.len() >= MAX_SSE_STREAMS {
4011            return Err(vm_error(format!(
4012                "sse_connect: maximum open streams ({MAX_SSE_STREAMS}) reached"
4013            )));
4014        }
4015        handles.insert(id.clone(), handle);
4016        Ok(())
4017    })?;
4018    Ok(VmValue::String(Rc::from(id)))
4019}
4020
4021async fn vm_sse_receive(stream_id: &str, timeout_ms: u64) -> Result<VmValue, VmError> {
4022    let stream = SSE_HANDLES.with(|handles| {
4023        let mut handles = handles.borrow_mut();
4024        let handle = handles.get_mut(stream_id)?;
4025        if handle.received >= handle.max_events {
4026            return Some(Err(vm_error(format!(
4027                "sse_receive: stream '{stream_id}' exceeded max_events"
4028            ))));
4029        }
4030        handle.received += 1;
4031        let url = handle.url.clone();
4032        let max_message_bytes = handle.max_message_bytes;
4033        let kind = match &handle.kind {
4034            SseHandleKind::Real(stream) => SseHandleKind::Real(stream.clone()),
4035            SseHandleKind::Fake(stream) => SseHandleKind::Fake(stream.clone()),
4036        };
4037        Some(Ok((kind, url, max_message_bytes)))
4038    });
4039    let Some(stream) = stream else {
4040        return Err(vm_error(format!(
4041            "sse_receive: unknown stream '{stream_id}'"
4042        )));
4043    };
4044    let (kind, _url, max_message_bytes) = stream?;
4045
4046    match kind {
4047        SseHandleKind::Fake(stream) => {
4048            let mut stream = stream.lock().await;
4049            if stream.closed {
4050                return Ok(VmValue::Nil);
4051            }
4052            if !stream.opened {
4053                stream.opened = true;
4054                let mut dict = BTreeMap::new();
4055                dict.insert("type".to_string(), VmValue::String(Rc::from("open")));
4056                return Ok(VmValue::Dict(Rc::new(dict)));
4057            }
4058            let Some(event) = stream.events.pop_front() else {
4059                stream.closed = true;
4060                return Ok(VmValue::Nil);
4061            };
4062            if event.data.len() > max_message_bytes {
4063                return Err(vm_error(format!(
4064                    "sse_receive: message exceeded max_message_bytes ({max_message_bytes})"
4065                )));
4066            }
4067            Ok(sse_event_value(&event))
4068        }
4069        SseHandleKind::Real(stream) => {
4070            let mut stream = stream.lock().await;
4071            let next = stream.next();
4072            let event = match tokio::time::timeout(Duration::from_millis(timeout_ms), next).await {
4073                Ok(Some(Ok(event))) => event,
4074                Ok(Some(Err(error))) => {
4075                    return Err(vm_error(format!("sse_receive: stream error: {error}")));
4076                }
4077                Ok(None) => return Ok(VmValue::Nil),
4078                Err(_) => return Ok(timeout_event()),
4079            };
4080            if let SseEvent::Message(message) = &event {
4081                if message.data.len() > max_message_bytes {
4082                    stream.close();
4083                    return Err(vm_error(format!(
4084                        "sse_receive: message exceeded max_message_bytes ({max_message_bytes})"
4085                    )));
4086                }
4087            }
4088            Ok(real_sse_event_value(event))
4089        }
4090    }
4091}
4092
4093fn websocket_route_from_options(path: &str, options: &BTreeMap<String, VmValue>) -> WebSocketRoute {
4094    let bearer_token = options.get("auth").and_then(|auth| match auth {
4095        VmValue::Dict(dict) => dict.get("bearer").map(|value| value.display()),
4096        other => {
4097            let value = other.display();
4098            (!value.is_empty()).then_some(value)
4099        }
4100    });
4101    WebSocketRoute {
4102        path: path.to_string(),
4103        bearer_token,
4104        max_messages: transport_limit_option(options, "max_messages", DEFAULT_MAX_STREAM_EVENTS)
4105            .max(1),
4106        max_message_bytes: transport_limit_option(
4107            options,
4108            "max_message_bytes",
4109            DEFAULT_MAX_MESSAGE_BYTES,
4110        )
4111        .max(1),
4112        send_buffer_messages: transport_limit_option(options, "send_buffer_messages", 64),
4113        idle_timeout_ms: vm_get_int_option(
4114            options,
4115            "idle_timeout_ms",
4116            DEFAULT_WEBSOCKET_SERVER_IDLE_TIMEOUT_MS as i64,
4117        )
4118        .max(0) as u64,
4119    }
4120}
4121
4122fn vm_websocket_server(
4123    bind: &str,
4124    options: &BTreeMap<String, VmValue>,
4125) -> Result<VmValue, VmError> {
4126    let listener = TcpListener::bind(bind)
4127        .map_err(|error| vm_error(format!("websocket_server: bind failed: {error}")))?;
4128    listener
4129        .set_nonblocking(true)
4130        .map_err(|error| vm_error(format!("websocket_server: nonblocking failed: {error}")))?;
4131    let local_addr = listener
4132        .local_addr()
4133        .map_err(|error| vm_error(format!("websocket_server: local addr failed: {error}")))?;
4134    let id = next_transport_handle("websocket-server");
4135    let addr = local_addr.to_string();
4136    let url = format!("ws://{addr}");
4137    let routes = Arc::new(RwLock::new(HashMap::<String, WebSocketRoute>::new()));
4138    if let Some(path) = options
4139        .get("path")
4140        .map(|value| value.display())
4141        .filter(|path| !path.is_empty())
4142    {
4143        if !path.starts_with('/') {
4144            return Err(vm_error("websocket_server: path must start with '/'"));
4145        }
4146        routes
4147            .write()
4148            .expect("websocket routes poisoned")
4149            .insert(path.clone(), websocket_route_from_options(&path, options));
4150    }
4151    let (event_tx, event_rx) = mpsc::channel();
4152    let running = Arc::new(AtomicBool::new(true));
4153    let server_routes = routes.clone();
4154    let server_running = running.clone();
4155    thread::Builder::new()
4156        .name(format!("harn-ws-{id}"))
4157        .spawn(move || websocket_server_loop(listener, server_routes, event_tx, server_running))
4158        .map_err(|error| vm_error(format!("websocket_server: spawn failed: {error}")))?;
4159    WEBSOCKET_SERVERS.with(|servers| {
4160        let mut servers = servers.borrow_mut();
4161        if servers.len() >= MAX_WEBSOCKET_SERVERS {
4162            running.store(false, Ordering::SeqCst);
4163            let _ = TcpStream::connect(&addr);
4164            return Err(vm_error(format!(
4165                "websocket_server: maximum open servers ({MAX_WEBSOCKET_SERVERS}) reached"
4166            )));
4167        }
4168        servers.insert(
4169            id.clone(),
4170            WebSocketServer {
4171                addr: addr.clone(),
4172                routes,
4173                events: Rc::new(tokio::sync::Mutex::new(event_rx)),
4174                running,
4175            },
4176        );
4177        Ok(())
4178    })?;
4179    let mut dict = BTreeMap::new();
4180    dict.insert("id".to_string(), VmValue::String(Rc::from(id)));
4181    dict.insert("addr".to_string(), VmValue::String(Rc::from(addr)));
4182    dict.insert("url".to_string(), VmValue::String(Rc::from(url)));
4183    Ok(VmValue::Dict(Rc::new(dict)))
4184}
4185
4186fn vm_websocket_route(
4187    server_id: &str,
4188    path: &str,
4189    options: &BTreeMap<String, VmValue>,
4190) -> Result<VmValue, VmError> {
4191    let routes = WEBSOCKET_SERVERS.with(|servers| {
4192        servers
4193            .borrow()
4194            .get(server_id)
4195            .map(|server| server.routes.clone())
4196    });
4197    let Some(routes) = routes else {
4198        return Err(vm_error(format!(
4199            "websocket_route: unknown server '{server_id}'"
4200        )));
4201    };
4202    routes.write().expect("websocket routes poisoned").insert(
4203        path.to_string(),
4204        websocket_route_from_options(path, options),
4205    );
4206    Ok(VmValue::Bool(true))
4207}
4208
4209async fn vm_websocket_accept(server_id: &str, timeout_ms: u64) -> Result<VmValue, VmError> {
4210    let receiver = WEBSOCKET_SERVERS.with(|servers| {
4211        servers
4212            .borrow()
4213            .get(server_id)
4214            .map(|server| server.events.clone())
4215    });
4216    let Some(receiver) = receiver else {
4217        return Err(vm_error(format!(
4218            "websocket_accept: unknown server '{server_id}'"
4219        )));
4220    };
4221    let started = std::time::Instant::now();
4222    loop {
4223        let event = {
4224            let receiver = receiver.lock().await;
4225            receiver.try_recv()
4226        };
4227        match event {
4228            Ok(event) => return register_accepted_websocket(event),
4229            Err(mpsc::TryRecvError::Disconnected) => return Ok(VmValue::Nil),
4230            Err(mpsc::TryRecvError::Empty) => {
4231                if timeout_ms == 0 || started.elapsed() >= Duration::from_millis(timeout_ms) {
4232                    return Ok(timeout_event());
4233                }
4234                tokio::time::sleep(Duration::from_millis(10)).await;
4235            }
4236        }
4237    }
4238}
4239
4240fn register_accepted_websocket(event: WebSocketServerEvent) -> Result<VmValue, VmError> {
4241    let WebSocketServerEvent {
4242        handle,
4243        path,
4244        peer,
4245        headers,
4246        max_messages,
4247        max_message_bytes,
4248    } = event;
4249    let id = next_transport_handle("websocket");
4250    WEBSOCKET_HANDLES.with(|handles| {
4251        let mut handles = handles.borrow_mut();
4252        if handles.len() >= MAX_WEBSOCKETS {
4253            return Err(vm_error(format!(
4254                "websocket_accept: maximum open sockets ({MAX_WEBSOCKETS}) reached"
4255            )));
4256        }
4257        handles.insert(
4258            id.clone(),
4259            WebSocketHandle {
4260                kind: WebSocketHandleKind::Server(Rc::new(tokio::sync::Mutex::new(handle))),
4261                url: path.clone(),
4262                max_messages,
4263                max_message_bytes,
4264                received: 0,
4265            },
4266        );
4267        Ok(())
4268    })?;
4269    let mut metadata = BTreeMap::new();
4270    metadata.insert("id".to_string(), VmValue::String(Rc::from(id)));
4271    metadata.insert("path".to_string(), VmValue::String(Rc::from(path)));
4272    metadata.insert("peer".to_string(), VmValue::String(Rc::from(peer)));
4273    metadata.insert(
4274        "headers".to_string(),
4275        VmValue::Dict(Rc::new(
4276            headers
4277                .into_iter()
4278                .map(|(name, value)| (name, VmValue::String(Rc::from(value))))
4279                .collect(),
4280        )),
4281    );
4282    Ok(VmValue::Dict(Rc::new(metadata)))
4283}
4284
4285fn vm_websocket_server_close(server_id: &str) -> Result<VmValue, VmError> {
4286    let server = WEBSOCKET_SERVERS.with(|servers| servers.borrow_mut().remove(server_id));
4287    let Some(server) = server else {
4288        return Ok(VmValue::Bool(false));
4289    };
4290    server.running.store(false, Ordering::SeqCst);
4291    let _ = TcpStream::connect(&server.addr);
4292    Ok(VmValue::Bool(true))
4293}
4294
4295fn websocket_server_loop(
4296    listener: TcpListener,
4297    routes: Arc<RwLock<HashMap<String, WebSocketRoute>>>,
4298    event_tx: mpsc::Sender<WebSocketServerEvent>,
4299    running: Arc<AtomicBool>,
4300) {
4301    while running.load(Ordering::SeqCst) {
4302        match listener.accept() {
4303            Ok((stream, peer)) => {
4304                let routes = routes.clone();
4305                let event_tx = event_tx.clone();
4306                let running = running.clone();
4307                let peer = peer.to_string();
4308                let _ = thread::Builder::new()
4309                    .name("harn-ws-conn".to_string())
4310                    .spawn(move || {
4311                        websocket_connection_thread(stream, peer, routes, event_tx, running);
4312                    });
4313            }
4314            Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => {
4315                thread::sleep(Duration::from_millis(10));
4316            }
4317            Err(_) => break,
4318        }
4319    }
4320}
4321
4322fn error_response(status: StatusCode, body: &str) -> ErrorResponse {
4323    let mut response = ErrorResponse::new(Some(body.to_string()));
4324    *response.status_mut() = status;
4325    response
4326}
4327
4328fn route_for_request(
4329    request: &Request,
4330    routes: &Arc<RwLock<HashMap<String, WebSocketRoute>>>,
4331) -> Result<WebSocketRoute, ErrorResponse> {
4332    let path = request.uri().path().to_string();
4333    let Some(route) = routes
4334        .read()
4335        .expect("websocket routes poisoned")
4336        .get(&path)
4337        .cloned()
4338    else {
4339        return Err(error_response(
4340            StatusCode::NOT_FOUND,
4341            "websocket route not found",
4342        ));
4343    };
4344    if let Some(token) = route.bearer_token.as_ref() {
4345        let expected = format!("Bearer {token}");
4346        let authorized = request
4347            .headers()
4348            .get("authorization")
4349            .and_then(|value| value.to_str().ok())
4350            .map(|value| value == expected)
4351            .unwrap_or(false);
4352        if !authorized {
4353            return Err(error_response(
4354                StatusCode::UNAUTHORIZED,
4355                "websocket route unauthorized",
4356            ));
4357        }
4358    }
4359    Ok(route)
4360}
4361
4362fn websocket_connection_thread(
4363    stream: TcpStream,
4364    peer: String,
4365    routes: Arc<RwLock<HashMap<String, WebSocketRoute>>>,
4366    event_tx: mpsc::Sender<WebSocketServerEvent>,
4367    running: Arc<AtomicBool>,
4368) {
4369    let accepted_route = Arc::new(std::sync::Mutex::new(
4370        None::<(WebSocketRoute, BTreeMap<String, String>)>,
4371    ));
4372    let callback_route = accepted_route.clone();
4373    let callback =
4374        move |request: &Request, response: Response| -> Result<Response, ErrorResponse> {
4375            let route = route_for_request(request, &routes)?;
4376            let headers = request
4377                .headers()
4378                .iter()
4379                .filter_map(|(name, value)| {
4380                    value
4381                        .to_str()
4382                        .ok()
4383                        .map(|value| (name.as_str().to_ascii_lowercase(), value.to_string()))
4384                })
4385                .collect::<BTreeMap<_, _>>();
4386            *callback_route
4387                .lock()
4388                .expect("websocket route metadata poisoned") = Some((route, headers));
4389            Ok(response)
4390        };
4391    let Ok(mut socket) = tokio_tungstenite::tungstenite::accept_hdr(stream, callback) else {
4392        return;
4393    };
4394    let Some((route, headers)) = accepted_route
4395        .lock()
4396        .expect("websocket route metadata poisoned")
4397        .clone()
4398    else {
4399        let _ = socket.close(None);
4400        return;
4401    };
4402    let _ = socket
4403        .get_mut()
4404        .set_read_timeout(Some(Duration::from_millis(50)));
4405    let (incoming_tx, incoming_rx) = mpsc::channel();
4406    let (outgoing_tx, outgoing_rx) = mpsc::sync_channel(route.send_buffer_messages);
4407    let event = WebSocketServerEvent {
4408        handle: ServerWebSocket {
4409            incoming: VecDeque::new(),
4410            incoming_rx,
4411            outgoing_tx,
4412            closed: false,
4413        },
4414        path: route.path.clone(),
4415        peer,
4416        headers,
4417        max_messages: route.max_messages,
4418        max_message_bytes: route.max_message_bytes,
4419    };
4420    if event_tx.send(event).is_err() {
4421        let _ = socket.close(None);
4422        return;
4423    }
4424    let mut last_activity = std::time::Instant::now();
4425    while running.load(Ordering::SeqCst) {
4426        while let Ok(command) = outgoing_rx.try_recv() {
4427            match command {
4428                ServerWebSocketCommand::Send(message) => {
4429                    let Ok(message) = real_ws_message(&message) else {
4430                        continue;
4431                    };
4432                    if socket.send(message).is_err() {
4433                        return;
4434                    }
4435                    last_activity = std::time::Instant::now();
4436                }
4437                ServerWebSocketCommand::Close(code, reason) => {
4438                    let _ = socket.close(close_frame(code, reason));
4439                    return;
4440                }
4441            }
4442        }
4443        if route.idle_timeout_ms > 0
4444            && last_activity.elapsed() >= Duration::from_millis(route.idle_timeout_ms)
4445        {
4446            let _ = socket.close(close_frame(Some(1001), Some("idle timeout".to_string())));
4447            let _ = incoming_tx.send(MockWsMessage {
4448                message_type: "close".to_string(),
4449                data: Vec::new(),
4450                close_code: Some(1001),
4451                close_reason: Some("idle timeout".to_string()),
4452            });
4453            return;
4454        }
4455        match socket.read() {
4456            Ok(message) => {
4457                last_activity = std::time::Instant::now();
4458                if incoming_tx
4459                    .send(mock_ws_message_from_real(message))
4460                    .is_err()
4461                {
4462                    return;
4463                }
4464            }
4465            Err(tokio_tungstenite::tungstenite::Error::Io(error))
4466                if error.kind() == std::io::ErrorKind::WouldBlock
4467                    || error.kind() == std::io::ErrorKind::TimedOut =>
4468            {
4469                continue;
4470            }
4471            Err(_) => {
4472                let _ = incoming_tx.send(MockWsMessage {
4473                    message_type: "close".to_string(),
4474                    data: Vec::new(),
4475                    close_code: None,
4476                    close_reason: None,
4477                });
4478                return;
4479            }
4480        }
4481    }
4482    let _ = socket.get_mut().shutdown(Shutdown::Both);
4483}
4484
4485async fn vm_websocket_connect(
4486    url: &str,
4487    options: &BTreeMap<String, VmValue>,
4488) -> Result<VmValue, VmError> {
4489    let id = next_transport_handle("websocket");
4490    let max_messages =
4491        transport_limit_option(options, "max_messages", DEFAULT_MAX_STREAM_EVENTS).max(1);
4492    let max_message_bytes =
4493        transport_limit_option(options, "max_message_bytes", DEFAULT_MAX_MESSAGE_BYTES).max(1);
4494
4495    if let Some((messages, echo)) = consume_websocket_mock(url) {
4496        let handle = WebSocketHandle {
4497            kind: WebSocketHandleKind::Fake(Rc::new(tokio::sync::Mutex::new(FakeWebSocket {
4498                messages: messages.into(),
4499                echo,
4500                closed: false,
4501            }))),
4502            url: url.to_string(),
4503            max_messages,
4504            max_message_bytes,
4505            received: 0,
4506        };
4507        WEBSOCKET_HANDLES.with(|handles| {
4508            let mut handles = handles.borrow_mut();
4509            if handles.len() >= MAX_WEBSOCKETS {
4510                return Err(vm_error(format!(
4511                    "websocket_connect: maximum open sockets ({MAX_WEBSOCKETS}) reached"
4512                )));
4513            }
4514            handles.insert(id.clone(), handle);
4515            Ok(())
4516        })?;
4517        record_transport_call(TransportMockCall {
4518            kind: "websocket_connect".to_string(),
4519            handle: Some(id.clone()),
4520            url: url.to_string(),
4521            message_type: None,
4522            data: None,
4523        });
4524        return Ok(VmValue::String(Rc::from(id)));
4525    }
4526
4527    if !url.starts_with("ws://") && !url.starts_with("wss://") {
4528        return Err(vm_error(format!(
4529            "websocket_connect: URL must start with ws:// or wss://, got '{url}'"
4530        )));
4531    }
4532    crate::egress::enforce_url_allowed("websocket_connect", url).await?;
4533    let timeout_ms = vm_get_int_option_prefer(
4534        options,
4535        "timeout_ms",
4536        "timeout",
4537        DEFAULT_TIMEOUT_MS as i64,
4538    )
4539    .max(0) as u64;
4540    let request = websocket_client_request(url, options)?;
4541    let connect = tokio_tungstenite::connect_async(request);
4542    let (socket, _) = tokio::time::timeout(Duration::from_millis(timeout_ms), connect)
4543        .await
4544        .map_err(|_| vm_error(format!("websocket_connect: timed out after {timeout_ms}ms")))?
4545        .map_err(|error| vm_error(format!("websocket_connect: failed: {error}")))?;
4546    let handle = WebSocketHandle {
4547        kind: WebSocketHandleKind::Real(Rc::new(tokio::sync::Mutex::new(socket))),
4548        url: url.to_string(),
4549        max_messages,
4550        max_message_bytes,
4551        received: 0,
4552    };
4553    WEBSOCKET_HANDLES.with(|handles| {
4554        let mut handles = handles.borrow_mut();
4555        if handles.len() >= MAX_WEBSOCKETS {
4556            return Err(vm_error(format!(
4557                "websocket_connect: maximum open sockets ({MAX_WEBSOCKETS}) reached"
4558            )));
4559        }
4560        handles.insert(id.clone(), handle);
4561        Ok(())
4562    })?;
4563    Ok(VmValue::String(Rc::from(id)))
4564}
4565
4566fn websocket_message_from_vm(
4567    value: VmValue,
4568    options: &BTreeMap<String, VmValue>,
4569) -> Result<MockWsMessage, VmError> {
4570    let message_type = options
4571        .get("type")
4572        .map(|value| value.display())
4573        .filter(|value| !value.is_empty())
4574        .unwrap_or_else(|| match value {
4575            VmValue::Bytes(_) => "binary".to_string(),
4576            _ => "text".to_string(),
4577        });
4578    let data = match value {
4579        VmValue::Bytes(bytes) => bytes.as_ref().clone(),
4580        other
4581            if options
4582                .get("base64")
4583                .and_then(|value| match value {
4584                    VmValue::Bool(value) => Some(*value),
4585                    _ => None,
4586                })
4587                .unwrap_or(false) =>
4588        {
4589            use base64::Engine;
4590            base64::engine::general_purpose::STANDARD
4591                .decode(other.display())
4592                .map_err(|error| vm_error(format!("websocket_send: invalid base64: {error}")))?
4593        }
4594        other => other.display().into_bytes(),
4595    };
4596    Ok(MockWsMessage {
4597        message_type,
4598        data,
4599        close_code: options
4600            .get("code")
4601            .or_else(|| options.get("close_code"))
4602            .and_then(|value| value.as_int())
4603            .map(|value| value as u16),
4604        close_reason: options
4605            .get("reason")
4606            .or_else(|| options.get("close_reason"))
4607            .map(|value| value.display()),
4608    })
4609}
4610
4611fn websocket_client_request(
4612    url: &str,
4613    options: &BTreeMap<String, VmValue>,
4614) -> Result<tokio_tungstenite::tungstenite::http::Request<()>, VmError> {
4615    let mut request = url
4616        .into_client_request()
4617        .map_err(|error| vm_error(format!("websocket_connect: invalid request: {error}")))?;
4618    if let Some(headers) = options.get("headers").and_then(|value| value.as_dict()) {
4619        for (name, value) in headers {
4620            let header_name = tokio_tungstenite::tungstenite::http::header::HeaderName::from_bytes(
4621                name.as_bytes(),
4622            )
4623            .map_err(|error| {
4624                vm_error(format!(
4625                    "websocket_connect: invalid header name '{name}': {error}"
4626                ))
4627            })?;
4628            let header_value = HeaderValue::from_str(&value.display()).map_err(|error| {
4629                vm_error(format!(
4630                    "websocket_connect: invalid header value for '{name}': {error}"
4631                ))
4632            })?;
4633            request.headers_mut().insert(header_name, header_value);
4634        }
4635    }
4636    if let Some(auth) = options.get("auth") {
4637        let bearer = match auth {
4638            VmValue::Dict(dict) => dict.get("bearer").map(|value| value.display()),
4639            other => Some(other.display()),
4640        };
4641        if let Some(token) = bearer.filter(|token| !token.is_empty()) {
4642            let value = HeaderValue::from_str(&format!("Bearer {token}")).map_err(|error| {
4643                vm_error(format!(
4644                    "websocket_connect: invalid authorization header: {error}"
4645                ))
4646            })?;
4647            request.headers_mut().insert("authorization", value);
4648        }
4649    }
4650    Ok(request)
4651}
4652
4653fn close_frame(code: Option<u16>, reason: Option<String>) -> Option<CloseFrame> {
4654    code.or(reason.as_ref().map(|_| 1000))
4655        .map(|code| CloseFrame {
4656            code: CloseCode::from(code),
4657            reason: reason.unwrap_or_default().into(),
4658        })
4659}
4660
4661fn real_ws_message(message: &MockWsMessage) -> Result<WsMessage, VmError> {
4662    match message.message_type.as_str() {
4663        "text" => Ok(WsMessage::Text(
4664            String::from_utf8(message.data.clone())
4665                .map_err(|error| vm_error(format!("websocket_send: text is not UTF-8: {error}")))?
4666                .into(),
4667        )),
4668        "binary" => Ok(WsMessage::Binary(message.data.clone().into())),
4669        "ping" => Ok(WsMessage::Ping(message.data.clone().into())),
4670        "pong" => Ok(WsMessage::Pong(message.data.clone().into())),
4671        "close" => Ok(WsMessage::Close(close_frame(
4672            message.close_code,
4673            message.close_reason.clone(),
4674        ))),
4675        other => Err(vm_error(format!(
4676            "websocket_send: unsupported message type '{other}'"
4677        ))),
4678    }
4679}
4680
4681fn mock_ws_message_from_real(message: WsMessage) -> MockWsMessage {
4682    match message {
4683        WsMessage::Text(text) => MockWsMessage {
4684            message_type: "text".to_string(),
4685            data: text.as_bytes().to_vec(),
4686            close_code: None,
4687            close_reason: None,
4688        },
4689        WsMessage::Binary(bytes) => MockWsMessage {
4690            message_type: "binary".to_string(),
4691            data: bytes.to_vec(),
4692            close_code: None,
4693            close_reason: None,
4694        },
4695        WsMessage::Ping(bytes) => MockWsMessage {
4696            message_type: "ping".to_string(),
4697            data: bytes.to_vec(),
4698            close_code: None,
4699            close_reason: None,
4700        },
4701        WsMessage::Pong(bytes) => MockWsMessage {
4702            message_type: "pong".to_string(),
4703            data: bytes.to_vec(),
4704            close_code: None,
4705            close_reason: None,
4706        },
4707        WsMessage::Close(frame) => {
4708            let (close_code, close_reason) = frame
4709                .map(|frame| (Some(u16::from(frame.code)), Some(frame.reason.to_string())))
4710                .unwrap_or((None, None));
4711            MockWsMessage {
4712                message_type: "close".to_string(),
4713                data: Vec::new(),
4714                close_code,
4715                close_reason,
4716            }
4717        }
4718        WsMessage::Frame(_) => MockWsMessage {
4719            message_type: "close".to_string(),
4720            data: Vec::new(),
4721            close_code: None,
4722            close_reason: None,
4723        },
4724    }
4725}
4726
4727async fn vm_websocket_send(
4728    socket_id: &str,
4729    value: VmValue,
4730    options: &BTreeMap<String, VmValue>,
4731) -> Result<VmValue, VmError> {
4732    let message = websocket_message_from_vm(value, options)?;
4733    let socket = WEBSOCKET_HANDLES.with(|handles| {
4734        let handles = handles.borrow();
4735        let handle = handles.get(socket_id)?;
4736        let url = handle.url.clone();
4737        let max_message_bytes = handle.max_message_bytes;
4738        let kind = match &handle.kind {
4739            WebSocketHandleKind::Real(socket) => WebSocketHandleKind::Real(socket.clone()),
4740            WebSocketHandleKind::Fake(socket) => WebSocketHandleKind::Fake(socket.clone()),
4741            WebSocketHandleKind::Server(socket) => WebSocketHandleKind::Server(socket.clone()),
4742        };
4743        Some((kind, url, max_message_bytes))
4744    });
4745    let Some((kind, url, max_message_bytes)) = socket else {
4746        return Err(vm_error(format!(
4747            "websocket_send: unknown socket '{socket_id}'"
4748        )));
4749    };
4750    if message.data.len() > max_message_bytes {
4751        return Err(vm_error(format!(
4752            "websocket_send: message exceeded max_message_bytes ({max_message_bytes})"
4753        )));
4754    }
4755    match kind {
4756        WebSocketHandleKind::Fake(socket) => {
4757            let mut socket = socket.lock().await;
4758            if socket.closed {
4759                return Ok(VmValue::Bool(false));
4760            }
4761            if message.message_type == "close" {
4762                socket.closed = true;
4763            } else if socket.echo {
4764                socket.messages.push_back(message.clone());
4765            }
4766            record_transport_call(TransportMockCall {
4767                kind: "websocket_send".to_string(),
4768                handle: Some(socket_id.to_string()),
4769                url,
4770                message_type: Some(message.message_type.clone()),
4771                data: Some(ws_message_data(&message)),
4772            });
4773            Ok(VmValue::Bool(true))
4774        }
4775        WebSocketHandleKind::Real(socket) => {
4776            let mut socket = socket.lock().await;
4777            socket
4778                .send(real_ws_message(&message)?)
4779                .await
4780                .map_err(|error| vm_error(format!("websocket_send: failed: {error}")))?;
4781            Ok(VmValue::Bool(true))
4782        }
4783        WebSocketHandleKind::Server(socket) => {
4784            let mut socket = socket.lock().await;
4785            if socket.closed {
4786                return Ok(VmValue::Bool(false));
4787            }
4788            let command = if message.message_type == "close" {
4789                socket.closed = true;
4790                ServerWebSocketCommand::Close(message.close_code, message.close_reason.clone())
4791            } else {
4792                ServerWebSocketCommand::Send(message.clone())
4793            };
4794            socket
4795                .outgoing_tx
4796                .try_send(command)
4797                .map_err(|error| match error {
4798                    mpsc::TrySendError::Full(_) => vm_error("websocket_send: send buffer full"),
4799                    mpsc::TrySendError::Disconnected(_) => {
4800                        vm_error("websocket_send: connection closed")
4801                    }
4802                })?;
4803            Ok(VmValue::Bool(true))
4804        }
4805    }
4806}
4807
4808async fn vm_websocket_receive(socket_id: &str, timeout_ms: u64) -> Result<VmValue, VmError> {
4809    let socket = WEBSOCKET_HANDLES.with(|handles| {
4810        let mut handles = handles.borrow_mut();
4811        let handle = handles.get_mut(socket_id)?;
4812        if handle.received >= handle.max_messages {
4813            return Some(Err(vm_error(format!(
4814                "websocket_receive: socket '{socket_id}' exceeded max_messages"
4815            ))));
4816        }
4817        handle.received += 1;
4818        let max_message_bytes = handle.max_message_bytes;
4819        let kind = match &handle.kind {
4820            WebSocketHandleKind::Real(socket) => WebSocketHandleKind::Real(socket.clone()),
4821            WebSocketHandleKind::Fake(socket) => WebSocketHandleKind::Fake(socket.clone()),
4822            WebSocketHandleKind::Server(socket) => WebSocketHandleKind::Server(socket.clone()),
4823        };
4824        Some(Ok((kind, max_message_bytes)))
4825    });
4826    let Some(socket) = socket else {
4827        return Err(vm_error(format!(
4828            "websocket_receive: unknown socket '{socket_id}'"
4829        )));
4830    };
4831    let (kind, max_message_bytes) = socket?;
4832    match kind {
4833        WebSocketHandleKind::Fake(socket) => {
4834            let mut socket = socket.lock().await;
4835            if socket.closed {
4836                return Ok(VmValue::Nil);
4837            }
4838            let Some(message) = socket.messages.pop_front() else {
4839                return Ok(timeout_event());
4840            };
4841            if message.data.len() > max_message_bytes {
4842                socket.closed = true;
4843                return Err(vm_error(format!(
4844                    "websocket_receive: message exceeded max_message_bytes ({max_message_bytes})"
4845                )));
4846            }
4847            if message.message_type == "close" {
4848                socket.closed = true;
4849            }
4850            Ok(ws_event_value(message))
4851        }
4852        WebSocketHandleKind::Real(socket) => {
4853            let mut socket = socket.lock().await;
4854            let next = socket.next();
4855            let message = match tokio::time::timeout(Duration::from_millis(timeout_ms), next).await
4856            {
4857                Ok(Some(Ok(message))) => message,
4858                Ok(Some(Err(error))) => {
4859                    return Err(vm_error(format!("websocket_receive: failed: {error}")));
4860                }
4861                Ok(None) => return Ok(VmValue::Nil),
4862                Err(_) => return Ok(timeout_event()),
4863            };
4864            match &message {
4865                WsMessage::Text(text) if text.len() > max_message_bytes => {
4866                    return Err(vm_error(format!(
4867                        "websocket_receive: message exceeded max_message_bytes ({max_message_bytes})"
4868                    )));
4869                }
4870                WsMessage::Binary(bytes) | WsMessage::Ping(bytes) | WsMessage::Pong(bytes)
4871                    if bytes.len() > max_message_bytes =>
4872                {
4873                    return Err(vm_error(format!(
4874                        "websocket_receive: message exceeded max_message_bytes ({max_message_bytes})"
4875                    )));
4876                }
4877                _ => {}
4878            }
4879            Ok(real_ws_event_value(message))
4880        }
4881        WebSocketHandleKind::Server(socket) => {
4882            let started = std::time::Instant::now();
4883            loop {
4884                let message = {
4885                    let mut socket = socket.lock().await;
4886                    if socket.closed {
4887                        return Ok(VmValue::Nil);
4888                    }
4889                    while let Ok(message) = socket.incoming_rx.try_recv() {
4890                        socket.incoming.push_back(message);
4891                    }
4892                    socket.incoming.pop_front()
4893                };
4894                if let Some(message) = message {
4895                    if message.data.len() > max_message_bytes {
4896                        let mut socket = socket.lock().await;
4897                        socket.closed = true;
4898                        let _ = socket.outgoing_tx.try_send(ServerWebSocketCommand::Close(
4899                            Some(1009),
4900                            Some("message too large".to_string()),
4901                        ));
4902                        return Err(vm_error(format!(
4903                            "websocket_receive: message exceeded max_message_bytes ({max_message_bytes})"
4904                        )));
4905                    }
4906                    if message.message_type == "close" {
4907                        let mut socket = socket.lock().await;
4908                        socket.closed = true;
4909                    }
4910                    return Ok(ws_event_value(message));
4911                }
4912                if timeout_ms == 0 || started.elapsed() >= Duration::from_millis(timeout_ms) {
4913                    return Ok(timeout_event());
4914                }
4915                tokio::time::sleep(Duration::from_millis(10)).await;
4916            }
4917        }
4918    }
4919}
4920
4921async fn vm_websocket_close(socket_id: &str) -> Result<VmValue, VmError> {
4922    let removed = WEBSOCKET_HANDLES.with(|handles| handles.borrow_mut().remove(socket_id));
4923    let Some(handle) = removed else {
4924        return Ok(VmValue::Bool(false));
4925    };
4926    match handle.kind {
4927        WebSocketHandleKind::Fake(socket) => {
4928            socket.lock().await.closed = true;
4929            record_transport_call(TransportMockCall {
4930                kind: "websocket_close".to_string(),
4931                handle: Some(socket_id.to_string()),
4932                url: handle.url,
4933                message_type: None,
4934                data: None,
4935            });
4936            Ok(VmValue::Bool(true))
4937        }
4938        WebSocketHandleKind::Real(socket) => {
4939            let mut socket = socket.lock().await;
4940            socket
4941                .close(None)
4942                .await
4943                .map_err(|error| vm_error(format!("websocket_close: failed: {error}")))?;
4944            Ok(VmValue::Bool(true))
4945        }
4946        WebSocketHandleKind::Server(socket) => {
4947            let mut socket = socket.lock().await;
4948            socket.closed = true;
4949            let _ = socket
4950                .outgoing_tx
4951                .try_send(ServerWebSocketCommand::Close(None, None));
4952            Ok(VmValue::Bool(true))
4953        }
4954    }
4955}
4956
4957#[cfg(test)]
4958mod tests {
4959    use super::{
4960        compute_retry_delay, handle_from_value, http_mock_calls_snapshot, mock_call_headers_value,
4961        parse_retry_after_value, push_http_mock, redact_mock_call_url, reset_http_state,
4962        vm_execute_http_request, vm_http_download, vm_http_stream_info, vm_http_stream_open,
4963        vm_http_stream_read, vm_sse_event_frame, vm_sse_server_cancel, vm_sse_server_heartbeat,
4964        vm_sse_server_mock_disconnect, vm_sse_server_mock_receive, vm_sse_server_observed_bool,
4965        vm_sse_server_response, vm_sse_server_send, HttpMockResponse,
4966    };
4967    use crate::connectors::test_util::{
4968        accept_http_connection, read_http_request, write_http_response,
4969    };
4970    use crate::value::VmValue;
4971    use base64::Engine;
4972    use rcgen::generate_simple_self_signed;
4973    use rustls::pki_types::PrivatePkcs8KeyDer;
4974    use rustls::{ServerConfig, ServerConnection, StreamOwned};
4975    use sha2::{Digest, Sha256};
4976    use std::collections::BTreeMap;
4977    use std::io::{Read, Write};
4978    use std::net::TcpListener;
4979    use std::rc::Rc;
4980    use std::sync::{Arc, Once};
4981    use std::time::{Duration, SystemTime};
4982    use tempfile::TempDir;
4983    use x509_parser::prelude::{FromDer, X509Certificate};
4984
4985    fn expect_bool(value: VmValue) -> bool {
4986        let VmValue::Bool(value) = value else {
4987            panic!("expected bool, got {}", value.display());
4988        };
4989        value
4990    }
4991
4992    #[test]
4993    fn parses_retry_after_delta_seconds() {
4994        assert_eq!(parse_retry_after_value("5"), Some(Duration::from_secs(5)));
4995    }
4996
4997    #[test]
4998    fn parses_retry_after_http_date() {
4999        let header = httpdate::fmt_http_date(SystemTime::now() + Duration::from_secs(2));
5000        let parsed = parse_retry_after_value(&header).expect("http-date should parse");
5001        assert!(parsed <= Duration::from_secs(2));
5002        assert!(parsed <= Duration::from_secs(60));
5003    }
5004
5005    #[test]
5006    fn malformed_retry_after_returns_none() {
5007        assert_eq!(parse_retry_after_value("soon-ish"), None);
5008    }
5009
5010    #[test]
5011    fn retry_delay_honors_retry_after_floor() {
5012        let delay = compute_retry_delay(0, 1, Some(Duration::from_millis(250)));
5013        assert!(delay >= Duration::from_millis(250));
5014        assert!(delay <= Duration::from_secs(60));
5015    }
5016
5017    #[tokio::test]
5018    async fn typed_mock_api_drives_http_request_retries() {
5019        reset_http_state();
5020        push_http_mock(
5021            "GET",
5022            "https://api.example.com/retry",
5023            vec![
5024                HttpMockResponse::new(503, "busy").with_header("retry-after", "0"),
5025                HttpMockResponse::new(200, "ok"),
5026            ],
5027        );
5028        let result = vm_execute_http_request(
5029            "GET",
5030            "https://api.example.com/retry",
5031            &BTreeMap::from([
5032                ("retries".to_string(), VmValue::Int(1)),
5033                ("backoff".to_string(), VmValue::Int(0)),
5034            ]),
5035        )
5036        .await
5037        .expect("mocked request should succeed after retry");
5038
5039        let dict = result.as_dict().expect("response dict");
5040        assert_eq!(dict["status"].as_int(), Some(200));
5041        let calls = http_mock_calls_snapshot();
5042        assert_eq!(calls.len(), 2);
5043        assert_eq!(calls[0].url, "https://api.example.com/retry");
5044        reset_http_state();
5045    }
5046
5047    #[tokio::test]
5048    async fn http_mock_records_normalized_headers_and_final_query_url() {
5049        reset_http_state();
5050        push_http_mock(
5051            "GET",
5052            "https://api.example.com/items?api_key=secret&limit=2",
5053            vec![HttpMockResponse::new(200, "ok")],
5054        );
5055        let options = BTreeMap::from([
5056            (
5057                "headers".to_string(),
5058                VmValue::Dict(Rc::new(BTreeMap::from([
5059                    (
5060                        "Authorization".to_string(),
5061                        VmValue::String(Rc::from("Bearer secret")),
5062                    ),
5063                    ("X-Trace".to_string(), VmValue::String(Rc::from("trace-1"))),
5064                ]))),
5065            ),
5066            (
5067                "query".to_string(),
5068                VmValue::Dict(Rc::new(BTreeMap::from([
5069                    ("api_key".to_string(), VmValue::String(Rc::from("secret"))),
5070                    ("limit".to_string(), VmValue::Int(2)),
5071                ]))),
5072            ),
5073        ]);
5074
5075        let response = vm_execute_http_request("GET", "https://api.example.com/items", &options)
5076            .await
5077            .expect("mocked request with query");
5078        let response = response.as_dict().expect("response dict");
5079        assert_eq!(response["status"].as_int(), Some(200));
5080
5081        let calls = http_mock_calls_snapshot();
5082        assert_eq!(calls.len(), 1);
5083        assert_eq!(
5084            calls[0].url,
5085            "https://api.example.com/items?api_key=secret&limit=2"
5086        );
5087        assert_eq!(
5088            calls[0].headers.get("authorization").map(String::as_str),
5089            Some("Bearer secret")
5090        );
5091        assert_eq!(
5092            calls[0].headers.get("x-trace").map(String::as_str),
5093            Some("trace-1")
5094        );
5095        reset_http_state();
5096    }
5097
5098    #[test]
5099    fn mock_call_headers_redact_sensitive_values() {
5100        let headers = BTreeMap::from([
5101            (
5102                "authorization".to_string(),
5103                VmValue::String(Rc::from("Bearer secret")),
5104            ),
5105            (
5106                "accept".to_string(),
5107                VmValue::String(Rc::from("application/json")),
5108            ),
5109            ("x-api-key".to_string(), VmValue::String(Rc::from("secret"))),
5110        ]);
5111        let redacted = mock_call_headers_value(&headers, true);
5112        assert_eq!(redacted["authorization"].display(), "[redacted]");
5113        assert_eq!(redacted["x-api-key"].display(), "[redacted]");
5114        assert_eq!(redacted["accept"].display(), "application/json");
5115
5116        let raw = mock_call_headers_value(&headers, false);
5117        assert_eq!(raw["authorization"].display(), "Bearer secret");
5118    }
5119
5120    #[test]
5121    fn mock_call_url_redacts_sensitive_query_values() {
5122        assert_eq!(
5123            redact_mock_call_url(
5124                "https://api.example.com/items?api_key=secret&limit=2&access_token=token",
5125                true,
5126            ),
5127            "https://api.example.com/items?api_key=%5Bredacted%5D&limit=2&access_token=%5Bredacted%5D"
5128        );
5129        assert_eq!(
5130            redact_mock_call_url("https://api.example.com/items?api_key=secret", false),
5131            "https://api.example.com/items?api_key=secret"
5132        );
5133        assert_eq!(
5134            redact_mock_call_url("https://api.example.com/items?q=a%20b", true),
5135            "https://api.example.com/items?q=a%20b"
5136        );
5137    }
5138
5139    #[tokio::test]
5140    async fn multipart_requests_are_mock_visible() {
5141        reset_http_state();
5142        push_http_mock(
5143            "POST",
5144            "https://api.example.com/upload",
5145            vec![HttpMockResponse::new(201, "uploaded")],
5146        );
5147        let options = BTreeMap::from([(
5148            "multipart".to_string(),
5149            VmValue::List(Rc::new(vec![
5150                VmValue::Dict(Rc::new(BTreeMap::from([
5151                    ("name".to_string(), VmValue::String(Rc::from("meta"))),
5152                    ("value".to_string(), VmValue::String(Rc::from("hello"))),
5153                ]))),
5154                VmValue::Dict(Rc::new(BTreeMap::from([
5155                    ("name".to_string(), VmValue::String(Rc::from("blob"))),
5156                    (
5157                        "filename".to_string(),
5158                        VmValue::String(Rc::from("blob.bin")),
5159                    ),
5160                    (
5161                        "content_type".to_string(),
5162                        VmValue::String(Rc::from("application/octet-stream")),
5163                    ),
5164                    (
5165                        "value".to_string(),
5166                        VmValue::Bytes(Rc::new(vec![0, 1, 2, 3])),
5167                    ),
5168                ]))),
5169            ])),
5170        )]);
5171
5172        let response = vm_execute_http_request("POST", "https://api.example.com/upload", &options)
5173            .await
5174            .expect("multipart mock request should succeed");
5175        let response = response.as_dict().expect("response dict");
5176        assert_eq!(response["status"].as_int(), Some(201));
5177
5178        let calls = http_mock_calls_snapshot();
5179        assert_eq!(calls.len(), 1);
5180        assert!(calls[0]
5181            .headers
5182            .get("content-type")
5183            .expect("content-type recorded")
5184            .contains("multipart/form-data"));
5185        let body = calls[0].body.as_deref().expect("multipart body recorded");
5186        assert!(body.contains("name=\"meta\""));
5187        assert!(body.contains("hello"));
5188        assert!(body.contains("filename=\"blob.bin\""));
5189        reset_http_state();
5190    }
5191
5192    #[tokio::test]
5193    async fn http_stream_mock_reads_in_chunks() {
5194        reset_http_state();
5195        push_http_mock(
5196            "GET",
5197            "https://api.example.com/stream",
5198            vec![HttpMockResponse::new(200, "stream-body")],
5199        );
5200
5201        let handle = vm_http_stream_open("https://api.example.com/stream", &BTreeMap::new())
5202            .await
5203            .expect("stream open");
5204        let stream_id = handle.display();
5205        let info = vm_http_stream_info(&stream_id).expect("stream info");
5206        let info = info.as_dict().expect("info dict");
5207        assert_eq!(info["status"].as_int(), Some(200));
5208
5209        let first = vm_http_stream_read(&stream_id, 6)
5210            .await
5211            .expect("first chunk");
5212        let second = vm_http_stream_read(&stream_id, 64)
5213            .await
5214            .expect("second chunk");
5215        let end = vm_http_stream_read(&stream_id, 64)
5216            .await
5217            .expect("end marker");
5218        assert_eq!(first.as_bytes().expect("bytes"), b"stream");
5219        assert_eq!(second.as_bytes().expect("bytes"), b"-body");
5220        assert!(matches!(end, VmValue::Nil));
5221        reset_http_state();
5222    }
5223
5224    #[tokio::test]
5225    async fn http_download_mock_writes_file() {
5226        reset_http_state();
5227        let temp = TempDir::new().expect("tempdir");
5228        let path = temp.path().join("download.bin");
5229        push_http_mock(
5230            "GET",
5231            "https://api.example.com/download",
5232            vec![HttpMockResponse::new(200, "downloaded")],
5233        );
5234
5235        let response = vm_http_download(
5236            "https://api.example.com/download",
5237            &path.display().to_string(),
5238            &BTreeMap::new(),
5239        )
5240        .await
5241        .expect("download response");
5242        let response = response.as_dict().expect("response dict");
5243        assert_eq!(response["bytes_written"].as_int(), Some(10));
5244        assert_eq!(
5245            std::fs::read_to_string(path).expect("downloaded file"),
5246            "downloaded"
5247        );
5248        reset_http_state();
5249    }
5250
5251    #[tokio::test]
5252    async fn http_proxy_routes_requests_through_configured_proxy() {
5253        reset_http_state();
5254        let listener = TcpListener::bind("127.0.0.1:0").expect("bind proxy listener");
5255        let proxy_addr = listener.local_addr().expect("proxy addr");
5256        let proxy_thread = std::thread::spawn(move || {
5257            let mut stream = accept_http_connection(&listener, "proxy listener");
5258            let request = read_http_request(&mut stream);
5259            assert_eq!(request.method, "GET");
5260            assert_eq!(request.path, "http://example.invalid/proxy-check");
5261            assert_eq!(
5262                request
5263                    .headers
5264                    .get("proxy-authorization")
5265                    .map(String::as_str),
5266                Some("Basic dXNlcjpwYXNz")
5267            );
5268            write_http_response(
5269                &mut stream,
5270                200,
5271                &[("content-type", "text/plain".to_string())],
5272                "proxied",
5273            );
5274        });
5275
5276        let options = BTreeMap::from([
5277            (
5278                "proxy".to_string(),
5279                VmValue::String(Rc::from(format!("http://{proxy_addr}"))),
5280            ),
5281            (
5282                "proxy_auth".to_string(),
5283                VmValue::Dict(Rc::new(BTreeMap::from([
5284                    ("user".to_string(), VmValue::String(Rc::from("user"))),
5285                    ("pass".to_string(), VmValue::String(Rc::from("pass"))),
5286                ]))),
5287            ),
5288            ("timeout_ms".to_string(), VmValue::Int(1_000)),
5289        ]);
5290
5291        let response =
5292            vm_execute_http_request("GET", "http://example.invalid/proxy-check", &options)
5293                .await
5294                .expect("proxied response");
5295        let response = response.as_dict().expect("response dict");
5296        assert_eq!(response["status"].as_int(), Some(200));
5297        assert_eq!(response["body"].display(), "proxied");
5298        proxy_thread.join().expect("proxy thread");
5299        reset_http_state();
5300    }
5301
5302    #[tokio::test]
5303    async fn custom_tls_ca_bundle_and_pin_allow_request() {
5304        reset_http_state();
5305        install_rustls_provider();
5306        let temp = TempDir::new().expect("tempdir");
5307        let cert =
5308            generate_simple_self_signed(vec!["localhost".to_string(), "127.0.0.1".to_string()])
5309                .expect("generate cert");
5310        let cert_pem = cert.cert.pem();
5311        let cert_path = temp.path().join("cert.pem");
5312        std::fs::write(&cert_path, cert_pem.as_bytes()).expect("write cert");
5313        let pin = spki_pin_base64(cert.cert.der().as_ref());
5314
5315        let listener = TcpListener::bind("127.0.0.1:0").expect("bind tls listener");
5316        let port = listener.local_addr().expect("tls addr").port();
5317        let server_config = Arc::new(
5318            ServerConfig::builder()
5319                .with_no_client_auth()
5320                .with_single_cert(
5321                    vec![cert.cert.der().clone()],
5322                    PrivatePkcs8KeyDer::from(cert.key_pair.serialize_der()).into(),
5323                )
5324                .expect("build tls config"),
5325        );
5326        let thread = std::thread::spawn(move || {
5327            let (tcp, _) = listener.accept().expect("accept tls client");
5328            let conn = ServerConnection::new(server_config).expect("server connection");
5329            let mut stream = StreamOwned::new(conn, tcp);
5330            let request = read_http_request_generic(&mut stream);
5331            assert!(request.starts_with("GET /secure HTTP/1.1\r\n"));
5332            write_http_response_generic(
5333                &mut stream,
5334                200,
5335                &[("content-type", "text/plain".to_string())],
5336                "secure",
5337            );
5338        });
5339
5340        let options = BTreeMap::from([(
5341            "tls".to_string(),
5342            VmValue::Dict(Rc::new(BTreeMap::from([
5343                (
5344                    "ca_bundle_path".to_string(),
5345                    VmValue::String(Rc::from(cert_path.display().to_string())),
5346                ),
5347                (
5348                    "pinned_sha256".to_string(),
5349                    VmValue::List(Rc::new(vec![VmValue::String(Rc::from(pin))])),
5350                ),
5351            ]))),
5352        )]);
5353        let response =
5354            vm_execute_http_request("GET", &format!("https://localhost:{port}/secure"), &options)
5355                .await
5356                .expect("tls request should succeed");
5357        let response = response.as_dict().expect("response dict");
5358        assert_eq!(response["body"].display(), "secure");
5359        thread.join().expect("tls thread");
5360        reset_http_state();
5361    }
5362
5363    #[tokio::test]
5364    async fn custom_tls_pin_mismatch_is_rejected() {
5365        reset_http_state();
5366        install_rustls_provider();
5367        let temp = TempDir::new().expect("tempdir");
5368        let cert =
5369            generate_simple_self_signed(vec!["localhost".to_string(), "127.0.0.1".to_string()])
5370                .expect("generate cert");
5371        let cert_pem = cert.cert.pem();
5372        let cert_path = temp.path().join("cert.pem");
5373        std::fs::write(&cert_path, cert_pem.as_bytes()).expect("write cert");
5374
5375        let listener = TcpListener::bind("127.0.0.1:0").expect("bind tls listener");
5376        let port = listener.local_addr().expect("tls addr").port();
5377        let server_config = Arc::new(
5378            ServerConfig::builder()
5379                .with_no_client_auth()
5380                .with_single_cert(
5381                    vec![cert.cert.der().clone()],
5382                    PrivatePkcs8KeyDer::from(cert.key_pair.serialize_der()).into(),
5383                )
5384                .expect("build tls config"),
5385        );
5386        let thread = std::thread::spawn(move || {
5387            let (tcp, _) = listener.accept().expect("accept tls client");
5388            let conn = ServerConnection::new(server_config).expect("server connection");
5389            let mut stream = StreamOwned::new(conn, tcp);
5390            let _ = read_http_request_generic(&mut stream);
5391            write_http_response_generic(
5392                &mut stream,
5393                200,
5394                &[("content-type", "text/plain".to_string())],
5395                "secure",
5396            );
5397        });
5398
5399        let options = BTreeMap::from([(
5400            "tls".to_string(),
5401            VmValue::Dict(Rc::new(BTreeMap::from([
5402                (
5403                    "ca_bundle_path".to_string(),
5404                    VmValue::String(Rc::from(cert_path.display().to_string())),
5405                ),
5406                (
5407                    "pinned_sha256".to_string(),
5408                    VmValue::List(Rc::new(vec![VmValue::String(Rc::from("deadbeef"))])),
5409                ),
5410            ]))),
5411        )]);
5412        let error =
5413            vm_execute_http_request("GET", &format!("https://localhost:{port}/secure"), &options)
5414                .await
5415                .expect_err("pin mismatch should fail");
5416        let message = error.to_string();
5417        assert!(message.contains("TLS SPKI pin mismatch"), "{message}");
5418        thread.join().expect("tls thread");
5419        reset_http_state();
5420    }
5421
5422    fn install_rustls_provider() {
5423        static INSTALL: Once = Once::new();
5424        INSTALL.call_once(|| {
5425            let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
5426        });
5427    }
5428
5429    fn spki_pin_base64(cert_der: &[u8]) -> String {
5430        let (_, cert) = X509Certificate::from_der(cert_der).expect("parse cert");
5431        base64::engine::general_purpose::STANDARD
5432            .encode(Sha256::digest(cert.tbs_certificate.subject_pki.raw))
5433    }
5434
5435    fn read_http_request_generic<T: Read>(stream: &mut T) -> String {
5436        let mut buffer = Vec::new();
5437        let mut chunk = [0u8; 4096];
5438        loop {
5439            let read = stream.read(&mut chunk).expect("read request");
5440            assert!(read > 0, "request closed before headers");
5441            buffer.extend_from_slice(&chunk[..read]);
5442            if buffer.windows(4).any(|window| window == b"\r\n\r\n") {
5443                return String::from_utf8_lossy(&buffer).into_owned();
5444            }
5445        }
5446    }
5447
5448    fn write_http_response_generic<T: Write>(
5449        stream: &mut T,
5450        status: u16,
5451        headers: &[(&str, String)],
5452        body: &str,
5453    ) {
5454        let mut response = format!(
5455            "HTTP/1.1 {status} OK\r\ncontent-length: {}\r\nconnection: close\r\n",
5456            body.len()
5457        );
5458        for (name, value) in headers {
5459            response.push_str(name);
5460            response.push_str(": ");
5461            response.push_str(value);
5462            response.push_str("\r\n");
5463        }
5464        response.push_str("\r\n");
5465        response.push_str(body);
5466        stream
5467            .write_all(response.as_bytes())
5468            .expect("write response");
5469        stream.flush().expect("flush response");
5470    }
5471
5472    #[test]
5473    fn formats_sse_event_fields_and_multiline_data() {
5474        let frame = vm_sse_event_frame(
5475            &VmValue::Dict(Rc::new(BTreeMap::from([
5476                ("event".to_string(), VmValue::String(Rc::from("progress"))),
5477                ("data".to_string(), VmValue::String(Rc::from("one\ntwo"))),
5478                ("id".to_string(), VmValue::String(Rc::from("evt-1"))),
5479                ("retry_ms".to_string(), VmValue::Int(2500)),
5480            ]))),
5481            &BTreeMap::new(),
5482        )
5483        .expect("event frame");
5484        assert_eq!(
5485            frame,
5486            "id: evt-1\nevent: progress\nretry: 2500\ndata: one\ndata: two\n\n"
5487        );
5488    }
5489
5490    #[test]
5491    fn rejects_sse_event_control_fields_with_newlines() {
5492        let err = vm_sse_event_frame(
5493            &VmValue::Dict(Rc::new(BTreeMap::from([(
5494                "event".to_string(),
5495                VmValue::String(Rc::from("bad\nname")),
5496            )]))),
5497            &BTreeMap::new(),
5498        )
5499        .expect_err("newline should reject");
5500        assert!(err.to_string().contains("event must not contain newlines"));
5501    }
5502
5503    #[test]
5504    fn server_sse_mock_client_observes_heartbeat_disconnect_and_cancel() {
5505        reset_http_state();
5506        let response = vm_sse_server_response(&BTreeMap::from([(
5507            "max_buffered_events".to_string(),
5508            VmValue::Int(4),
5509        )]))
5510        .expect("response");
5511        let stream_id = handle_from_value(&response, "test").expect("handle");
5512
5513        assert!(expect_bool(
5514            vm_sse_server_send(
5515                &stream_id,
5516                &VmValue::Dict(Rc::new(BTreeMap::from([
5517                    ("event".to_string(), VmValue::String(Rc::from("progress")),),
5518                    ("data".to_string(), VmValue::String(Rc::from("50"))),
5519                ]))),
5520                &BTreeMap::new(),
5521            )
5522            .expect("send")
5523        ));
5524        assert!(expect_bool(
5525            vm_sse_server_heartbeat(&stream_id, Some(&VmValue::String(Rc::from("tick"))))
5526                .expect("heartbeat")
5527        ));
5528
5529        let first = vm_sse_server_mock_receive(&stream_id).expect("first");
5530        let first = first.as_dict().expect("first dict");
5531        assert_eq!(first["event"].display(), "progress");
5532        assert_eq!(first["data"].display(), "50");
5533        let heartbeat = vm_sse_server_mock_receive(&stream_id).expect("heartbeat read");
5534        let heartbeat = heartbeat.as_dict().expect("heartbeat dict");
5535        assert_eq!(heartbeat["type"].display(), "comment");
5536        assert_eq!(heartbeat["comment"].display(), "tick");
5537
5538        assert!(expect_bool(
5539            vm_sse_server_mock_disconnect(&stream_id).expect("disconnect")
5540        ));
5541        assert!(expect_bool(
5542            vm_sse_server_observed_bool(&stream_id, "test", |handle| handle.disconnected)
5543                .expect("observed")
5544        ));
5545        assert!(!expect_bool(
5546            vm_sse_server_send(
5547                &stream_id,
5548                &VmValue::String(Rc::from("late")),
5549                &BTreeMap::new()
5550            )
5551            .expect("late send")
5552        ));
5553
5554        let cancelled = vm_sse_server_response(&BTreeMap::new()).expect("cancelled response");
5555        let cancelled_id = handle_from_value(&cancelled, "test").expect("cancelled handle");
5556        assert!(expect_bool(
5557            vm_sse_server_cancel(&cancelled_id, Some(&VmValue::String(Rc::from("stop"))))
5558                .expect("cancel")
5559        ));
5560        assert!(expect_bool(
5561            vm_sse_server_observed_bool(&cancelled_id, "test", |handle| handle.cancelled)
5562                .expect("cancelled observed")
5563        ));
5564        reset_http_state();
5565    }
5566
5567    #[test]
5568    fn server_sse_rejects_oversized_events() {
5569        reset_http_state();
5570        let response = vm_sse_server_response(&BTreeMap::from([(
5571            "max_event_bytes".to_string(),
5572            VmValue::Int(12),
5573        )]))
5574        .expect("response");
5575        let stream_id = handle_from_value(&response, "test").expect("handle");
5576        let err = vm_sse_server_send(
5577            &stream_id,
5578            &VmValue::String(Rc::from("this is too large")),
5579            &BTreeMap::new(),
5580        )
5581        .expect_err("oversized event should reject");
5582        assert!(err.to_string().contains("max_event_bytes"));
5583        reset_http_state();
5584    }
5585}