Skip to main content

harn_vm/
http.rs

1use std::cell::RefCell;
2use std::collections::{BTreeMap, HashMap, VecDeque};
3use std::io::Write;
4use std::net::{Shutdown, TcpListener, TcpStream};
5use std::rc::Rc;
6use std::sync::{
7    atomic::{AtomicBool, Ordering},
8    mpsc, Arc, RwLock,
9};
10use std::thread;
11use std::time::{Duration, SystemTime};
12
13use crate::value::{VmClosure, VmError, VmValue};
14use crate::vm::Vm;
15
16use base64::Engine;
17use futures::{SinkExt, StreamExt};
18use reqwest_eventsource::{Event as SseEvent, EventSource};
19use sha2::{Digest, Sha256};
20use tokio_tungstenite::tungstenite::client::IntoClientRequest;
21use tokio_tungstenite::tungstenite::handshake::server::{ErrorResponse, Request, Response};
22use tokio_tungstenite::tungstenite::http::{HeaderValue, StatusCode};
23use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
24use tokio_tungstenite::tungstenite::protocol::CloseFrame;
25use tokio_tungstenite::tungstenite::Message as WsMessage;
26use x509_parser::prelude::{FromDer, X509Certificate};
27
28// Mock HTTP framework (thread-local, mirrors the mock LLM pattern).
29
/// Internal representation of one mocked HTTP response.
///
/// Unlike the public [`HttpMockResponse`], header values are stored as
/// `VmValue`s; the `From<HttpMockResponse>` impl performs that conversion.
#[derive(Clone)]
struct MockResponse {
    /// Status code reported for the mocked request.
    status: i64,
    /// Response body text.
    body: String,
    /// Response headers keyed by header name.
    headers: BTreeMap<String, VmValue>,
}
36
/// Public description of one mocked HTTP response, accepted by
/// `push_http_mock`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct HttpMockResponse {
    /// Status code reported for the mocked request.
    pub status: i64,
    /// Response body text.
    pub body: String,
    /// Response headers as plain strings, keyed by header name.
    pub headers: BTreeMap<String, String>,
}
43
44impl HttpMockResponse {
45    pub fn new(status: i64, body: impl Into<String>) -> Self {
46        Self {
47            status,
48            body: body.into(),
49            headers: BTreeMap::new(),
50        }
51    }
52
53    pub fn with_header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
54        self.headers.insert(name.into(), value.into());
55        self
56    }
57}
58
59impl From<HttpMockResponse> for MockResponse {
60    fn from(value: HttpMockResponse) -> Self {
61        Self {
62            status: value.status,
63            body: value.body,
64            headers: value
65                .headers
66                .into_iter()
67                .map(|(key, value)| (key, VmValue::String(Rc::from(value))))
68                .collect(),
69        }
70    }
71}
72
/// A registered HTTP mock: a method/URL pattern plus its scripted responses.
struct HttpMock {
    /// HTTP method to match; `"*"` matches any method and other values are
    /// compared ASCII case-insensitively (see `consume_http_mock`).
    method: String,
    /// Exact URL or `*` glob, as interpreted by `url_matches`.
    url_pattern: String,
    /// Scripted responses, served in order.
    responses: Vec<MockResponse>,
    /// Index of the next response to serve; it stops advancing once the last
    /// response is reached, so that one keeps being returned.
    next_response: usize,
}
79
/// Record of one request that was answered by a mock; pushed onto
/// `HTTP_MOCK_CALLS` by `consume_http_mock`.
#[derive(Clone)]
struct HttpMockCall {
    /// Method of the request.
    method: String,
    /// Full URL of the request.
    url: String,
    /// Request headers as supplied by the caller.
    headers: BTreeMap<String, VmValue>,
    /// Request body, if any.
    body: Option<String>,
}
87
/// Public, plain-string copy of a recorded mock call, produced by
/// `http_mock_calls_snapshot`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct HttpMockCallSnapshot {
    /// Method of the recorded request.
    pub method: String,
    /// URL of the recorded request.
    pub url: String,
    /// Request headers, each value rendered with `VmValue::display`.
    pub headers: BTreeMap<String, String>,
    /// Request body, if any.
    pub body: Option<String>,
}
95
/// Retry settings carried inside [`HttpRequestConfig`].
///
/// NOTE(review): the code that consumes these fields is outside this view;
/// the field names suggest a retry count, a base backoff in milliseconds,
/// the statuses/methods eligible for retry, and whether `Retry-After` is
/// honoured — confirm against the request loop.
#[derive(Clone)]
struct RetryConfig {
    max: u32,
    backoff_ms: u64,
    retryable_statuses: Vec<u16>,
    retryable_methods: Vec<String>,
    respect_retry_after: bool,
}
104
/// Per-request transport settings: timeouts, retry policy, redirect
/// handling, proxy, TLS and decompression.
///
/// NOTE(review): the parsing and use of these fields live outside this view;
/// timeouts are presumably in milliseconds, as the `_ms` suffixes indicate.
#[derive(Clone)]
struct HttpRequestConfig {
    total_timeout_ms: u64,
    connect_timeout_ms: Option<u64>,
    read_timeout_ms: Option<u64>,
    retry: RetryConfig,
    follow_redirects: bool,
    max_redirects: usize,
    proxy: Option<HttpProxyConfig>,
    tls: HttpTlsConfig,
    decompress: bool,
}
117
/// TLS-related request settings; `Default` leaves every path unset and the
/// pin list empty.
///
/// NOTE(review): how the paths and `pinned_sha256` entries are interpreted
/// is decided by code outside this view.
#[derive(Clone, Default)]
struct HttpTlsConfig {
    ca_bundle_path: Option<String>,
    client_cert_path: Option<String>,
    client_key_path: Option<String>,
    client_identity_path: Option<String>,
    pinned_sha256: Vec<String>,
}
126
/// Proxy settings attached to an [`HttpRequestConfig`].
#[derive(Clone)]
struct HttpProxyConfig {
    /// Proxy URL.
    url: String,
    /// Optional credential pair — presumably (username, password); confirm
    /// against the client builder.
    auth: Option<(String, String)>,
    /// Optional no-proxy specification (format defined by the consumer).
    no_proxy: Option<String>,
}
133
/// A stored HTTP session: a `reqwest::Client` together with the options dict
/// it is associated with. Instances live in the `HTTP_SESSIONS` registry.
#[derive(Clone)]
struct HttpSession {
    client: reqwest::Client,
    options: BTreeMap<String, VmValue>,
}
139
/// The pieces of an outgoing request after option parsing.
struct HttpRequestParts {
    /// Request method.
    method: reqwest::Method,
    /// Headers in `reqwest` form.
    headers: reqwest::header::HeaderMap,
    /// The same headers as VM values — presumably what gets recorded for
    /// mock-call inspection; confirm against the caller.
    recorded_headers: BTreeMap<String, VmValue>,
    /// Plain request body, if any.
    body: Option<String>,
    /// Multipart payload, if the request is a multipart form.
    multipart: Option<MultipartRequest>,
}
147
/// Parsed multipart form payload, built by `parse_multipart_request`.
#[derive(Clone)]
struct MultipartRequest {
    /// The individual form fields, in the order they were supplied.
    parts: Vec<MultipartField>,
    /// Textual rendering of the form produced by `multipart_mock_body`.
    mock_body: String,
}
153
/// One field of a multipart form, built by `parse_multipart_field`.
#[derive(Clone)]
struct MultipartField {
    /// Form field name (never empty).
    name: String,
    /// Raw field content.
    value: Vec<u8>,
    /// Optional file name; defaults to the file name of `path` when the
    /// content was read from disk.
    filename: Option<String>,
    /// Optional MIME type for the part.
    content_type: Option<String>,
}
161
/// State for one open streaming HTTP response, stored in `HTTP_STREAMS`.
struct HttpStreamHandle {
    /// Whether the stream is backed by a real response or a fake one.
    kind: HttpStreamKind,
    /// Status code of the response.
    status: i64,
    /// Response headers.
    headers: BTreeMap<String, VmValue>,
    /// Buffered bytes — presumably data received but not yet handed to the
    /// reader; confirm against the read path.
    pending: VecDeque<u8>,
    /// Set once the stream has been closed.
    closed: bool,
}
169
/// Backing source of an [`HttpStreamHandle`].
enum HttpStreamKind {
    /// A live `reqwest::Response`, shared behind an async mutex.
    Real(Rc<tokio::sync::Mutex<reqwest::Response>>),
    /// A stream with no network response behind it.
    Fake,
}
174
/// A registered server-sent-events mock, stored in `SSE_MOCKS`.
struct SseMock {
    /// URL pattern the mock applies to.
    url_pattern: String,
    /// Events the mocked stream delivers.
    events: Vec<MockStreamEvent>,
}
179
/// One scripted server-sent event.
#[derive(Clone)]
struct MockStreamEvent {
    /// Event type name.
    event_type: String,
    /// Event payload.
    data: String,
    /// Optional event id.
    id: Option<String>,
    /// Optional retry hint in milliseconds.
    retry_ms: Option<i64>,
}
187
/// State for one SSE client connection, stored in `SSE_HANDLES`.
struct SseHandle {
    /// Real or fake event source.
    kind: SseHandleKind,
    /// URL the stream was opened against.
    url: String,
    /// Limit on the number of events — enforcement lives outside this view.
    max_events: usize,
    /// Limit on the size of a single message in bytes — enforcement lives
    /// outside this view.
    max_message_bytes: usize,
    /// Number of events received so far.
    received: usize,
}
195
/// Backing source of an [`SseHandle`].
enum SseHandleKind {
    /// A live `EventSource`; `reset_http_state` closes it when it can take
    /// the lock.
    Real(Rc<tokio::sync::Mutex<EventSource>>),
    /// A scripted in-memory stream.
    Fake(Rc<tokio::sync::Mutex<FakeSseStream>>),
}
200
/// In-memory SSE stream used in place of a real connection.
struct FakeSseStream {
    /// Remaining scripted events, consumed from the front.
    events: VecDeque<MockStreamEvent>,
    /// Presumably set once the "open" notification was delivered — confirm
    /// against the receive path.
    opened: bool,
    /// Set once the stream has been closed.
    closed: bool,
}
206
/// State for one server-side SSE response, stored in `SSE_SERVER_HANDLES`.
///
/// NOTE(review): the send/flush/cancel logic that maintains these fields is
/// outside this view; the per-field notes describe only what the types and
/// names establish.
struct SseServerHandle {
    /// Status code of the SSE response.
    status: i64,
    /// Response headers.
    headers: BTreeMap<String, VmValue>,
    /// Buffered, already-encoded event frames.
    frames: VecDeque<String>,
    /// Size limit for a single event, in bytes.
    max_event_bytes: usize,
    /// Limit on how many events may be buffered.
    max_buffered_events: usize,
    /// Counter of events sent.
    sent_events: usize,
    /// Counter of events flushed.
    flushed_events: usize,
    closed: bool,
    disconnected: bool,
    cancelled: bool,
    /// Optional reason accompanying a cancellation.
    cancel_reason: Option<String>,
}
220
/// A registered WebSocket mock, stored in `WEBSOCKET_MOCKS`.
struct WebSocketMock {
    /// URL pattern the mock applies to.
    url_pattern: String,
    /// Messages the mocked socket delivers.
    messages: Vec<MockWsMessage>,
    /// Echo flag — presumably makes sent messages come back to the sender;
    /// confirm against the fake socket implementation.
    echo: bool,
}
226
/// A WebSocket message in transport-neutral form; used for mocks and for
/// traffic on server-side sockets (see `ServerWebSocket`).
#[derive(Clone)]
struct MockWsMessage {
    /// Message kind as a string (the accepted values are defined by the
    /// code that produces and consumes these messages).
    message_type: String,
    /// Raw payload bytes.
    data: Vec<u8>,
    /// Close code, when the message is a close frame.
    close_code: Option<u16>,
    /// Close reason, when the message is a close frame.
    close_reason: Option<String>,
}
234
/// Concrete stream type of a live client WebSocket: a tungstenite stream over
/// a (possibly TLS-wrapped) tokio TCP connection.
type RealWebSocket =
    tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
237
/// State for one WebSocket connection, stored in `WEBSOCKET_HANDLES`.
struct WebSocketHandle {
    /// Real client socket, fake socket, or server-side socket.
    kind: WebSocketHandleKind,
    /// URL associated with the connection.
    url: String,
    /// Limit on the number of messages — enforcement lives outside this view.
    max_messages: usize,
    /// Limit on the size of a single message in bytes — enforcement lives
    /// outside this view.
    max_message_bytes: usize,
    /// Number of messages received so far.
    received: usize,
}
245
/// Backing implementation of a [`WebSocketHandle`].
enum WebSocketHandleKind {
    /// A live client connection.
    Real(Rc<tokio::sync::Mutex<RealWebSocket>>),
    /// A scripted in-memory socket.
    Fake(Rc<tokio::sync::Mutex<FakeWebSocket>>),
    /// A connection accepted by a local WebSocket server.
    Server(Rc<tokio::sync::Mutex<ServerWebSocket>>),
}
251
/// In-memory WebSocket used in place of a real connection.
struct FakeWebSocket {
    /// Remaining scripted messages, consumed from the front.
    messages: VecDeque<MockWsMessage>,
    /// Echo flag copied from the mock definition (semantics defined by the
    /// send path, which is outside this view).
    echo: bool,
    /// Set once the socket has been closed.
    closed: bool,
}
257
/// A running local WebSocket server, stored in `WEBSOCKET_SERVERS`.
struct WebSocketServer {
    /// Address the server listens on; `reset_http_state` connects to it once
    /// after clearing `running`.
    addr: String,
    /// Registered routes, shared with other threads (`Arc<RwLock<..>>`).
    routes: Arc<RwLock<HashMap<String, WebSocketRoute>>>,
    /// Receiving end of the channel that delivers accepted connections.
    events: Rc<tokio::sync::Mutex<mpsc::Receiver<WebSocketServerEvent>>>,
    /// Run flag; set to `false` by `reset_http_state` to request shutdown.
    running: Arc<AtomicBool>,
}
264
/// Configuration of one route on a [`WebSocketServer`].
///
/// NOTE(review): the limits and timeout are enforced by the accept/serve
/// code outside this view.
#[derive(Clone)]
struct WebSocketRoute {
    /// Request path the route serves.
    path: String,
    /// Optional bearer token associated with the route.
    bearer_token: Option<String>,
    max_messages: usize,
    max_message_bytes: usize,
    send_buffer_messages: usize,
    idle_timeout_ms: u64,
}
274
/// Notification that a WebSocket server accepted a connection; delivered
/// through the `events` channel of [`WebSocketServer`].
struct WebSocketServerEvent {
    /// The server-side socket for the new connection.
    handle: ServerWebSocket,
    /// Request path of the connection.
    path: String,
    /// Peer address as text.
    peer: String,
    /// Request headers of the handshake.
    headers: BTreeMap<String, String>,
    /// Message-count limit to apply to the connection.
    max_messages: usize,
    /// Per-message size limit, in bytes, to apply to the connection.
    max_message_bytes: usize,
}
283
/// VM-side end of a connection accepted by a local WebSocket server.
struct ServerWebSocket {
    /// Messages already pulled from `incoming_rx` but not yet consumed —
    /// presumably; confirm against the receive path.
    incoming: VecDeque<MockWsMessage>,
    /// Channel on which messages from the peer arrive.
    incoming_rx: mpsc::Receiver<MockWsMessage>,
    /// Bounded channel for commands (send/close) directed at the peer.
    outgoing_tx: mpsc::SyncSender<ServerWebSocketCommand>,
    /// Set once the socket has been closed.
    closed: bool,
}
290
/// Command sent over `ServerWebSocket::outgoing_tx`.
enum ServerWebSocketCommand {
    /// Send a message to the peer.
    Send(MockWsMessage),
    /// Close the connection with an optional close code and reason.
    Close(Option<u16>, Option<String>),
}
295
/// Record of one mocked transport (SSE/WebSocket) interaction; appended to
/// `TRANSPORT_MOCK_CALLS` by `record_transport_call`.
#[derive(Clone)]
struct TransportMockCall {
    /// Kind of interaction (value set defined by the callers).
    kind: String,
    /// Handle id involved, if any.
    handle: Option<String>,
    /// URL of the connection.
    url: String,
    /// Message type, when the call carried a message.
    message_type: Option<String>,
    /// Message payload as text, when the call carried a message.
    data: Option<String>,
}
304
/// One route registered on an in-VM [`HttpServer`].
#[derive(Clone)]
struct HttpServerRoute {
    /// HTTP method of the route.
    method: String,
    /// Path template of the route (template syntax is defined by the
    /// matcher, which is outside this view).
    template: String,
    /// Closure invoked to handle matching requests.
    handler: Rc<VmClosure>,
    /// Optional body-size limit for this route; `None` presumably falls back
    /// to the server-wide `max_body_bytes` — confirm.
    max_body_bytes: Option<usize>,
    /// Optional raw-body retention setting for this route; `None`
    /// presumably falls back to the server-wide `retain_raw_body` — confirm.
    retain_raw_body: Option<bool>,
}
313
/// An in-VM HTTP server definition, stored in `HTTP_SERVERS`.
///
/// NOTE(review): dispatch, readiness and shutdown handling live outside this
/// view; the notes below describe only what the field types establish.
#[derive(Clone)]
struct HttpServer {
    /// Registered routes.
    routes: Vec<HttpServerRoute>,
    /// Closures registered as "before" hooks.
    before: Vec<Rc<VmClosure>>,
    /// Closures registered as "after" hooks.
    after: Vec<Rc<VmClosure>>,
    /// Readiness flag.
    ready: bool,
    /// Optional readiness closure.
    readiness: Option<Rc<VmClosure>>,
    /// Closures registered as shutdown hooks.
    shutdown_hooks: Vec<Rc<VmClosure>>,
    /// Shutdown flag.
    shutdown: bool,
    /// Server-wide request body size limit in bytes.
    max_body_bytes: usize,
    /// Server-wide raw-body retention setting.
    retain_raw_body: bool,
}
326
// Request defaults. NOTE(review): the request/retry code that reads these is
// outside this view; units follow the `_MS` suffixes (milliseconds).
const DEFAULT_TIMEOUT_MS: u64 = 30_000;
const DEFAULT_BACKOFF_MS: u64 = 1_000;
const MAX_RETRY_DELAY_MS: u64 = 60_000;
const DEFAULT_RETRYABLE_STATUSES: [u16; 6] = [408, 429, 500, 502, 503, 504];
const DEFAULT_RETRYABLE_METHODS: [&str; 5] = ["GET", "HEAD", "PUT", "DELETE", "OPTIONS"];
// Fallback used by `receive_timeout_arg` when no usable timeout is passed.
const DEFAULT_TRANSPORT_RECEIVE_TIMEOUT_MS: u64 = 30_000;
// Default limits for streaming transports and server bodies (1024 * 1024
// bytes = 1 MiB).
const DEFAULT_MAX_STREAM_EVENTS: usize = 10_000;
const DEFAULT_MAX_MESSAGE_BYTES: usize = 1024 * 1024;
const DEFAULT_SERVER_MAX_BODY_BYTES: usize = 1024 * 1024;
const DEFAULT_WEBSOCKET_SERVER_IDLE_TIMEOUT_MS: u64 = 30_000;
// Caps on the number of live entries per registry — presumably checked when
// a new handle is created; confirm against the constructors.
const MAX_HTTP_SESSIONS: usize = 64;
const MAX_HTTP_STREAMS: usize = 64;
const MAX_SSE_STREAMS: usize = 64;
const MAX_SSE_SERVER_STREAMS: usize = 64;
const MAX_WEBSOCKETS: usize = 64;
// Boundary string used by `multipart_mock_body` when rendering a multipart
// form as text.
const MULTIPART_MOCK_BOUNDARY: &str = "harn-boundary";
const MAX_HTTP_SERVERS: usize = 128;
const MAX_WEBSOCKET_SERVERS: usize = 16;
345
// Per-thread registries for the HTTP/SSE/WebSocket builtins. Every entry is
// emptied (and the counter zeroed) by `reset_http_state`.
thread_local! {
    // Registered HTTP mocks (`push_http_mock`) and the calls they answered
    // (`consume_http_mock`).
    static HTTP_MOCKS: RefCell<Vec<HttpMock>> = const { RefCell::new(Vec::new()) };
    static HTTP_MOCK_CALLS: RefCell<Vec<HttpMockCall>> = const { RefCell::new(Vec::new()) };
    // Live HTTP clients, sessions and response streams, keyed by string id.
    static HTTP_CLIENTS: RefCell<HashMap<String, reqwest::Client>> = RefCell::new(HashMap::new());
    static HTTP_SESSIONS: RefCell<HashMap<String, HttpSession>> = RefCell::new(HashMap::new());
    static HTTP_STREAMS: RefCell<HashMap<String, HttpStreamHandle>> = RefCell::new(HashMap::new());
    // SSE mocks plus client-side and server-side SSE handles.
    static SSE_MOCKS: RefCell<Vec<SseMock>> = const { RefCell::new(Vec::new()) };
    static SSE_HANDLES: RefCell<HashMap<String, SseHandle>> = RefCell::new(HashMap::new());
    static SSE_SERVER_HANDLES: RefCell<HashMap<String, SseServerHandle>> = RefCell::new(HashMap::new());
    // WebSocket mocks, connection handles and local servers.
    static WEBSOCKET_MOCKS: RefCell<Vec<WebSocketMock>> = const { RefCell::new(Vec::new()) };
    static WEBSOCKET_HANDLES: RefCell<HashMap<String, WebSocketHandle>> = RefCell::new(HashMap::new());
    static WEBSOCKET_SERVERS: RefCell<HashMap<String, WebSocketServer>> = RefCell::new(HashMap::new());
    // Log written by `record_transport_call`.
    static TRANSPORT_MOCK_CALLS: RefCell<Vec<TransportMockCall>> = const { RefCell::new(Vec::new()) };
    // Monotonic counter behind `next_transport_handle`.
    static TRANSPORT_HANDLE_COUNTER: RefCell<u64> = const { RefCell::new(0) };
    // In-VM HTTP server definitions.
    static HTTP_SERVERS: RefCell<HashMap<String, HttpServer>> = RefCell::new(HashMap::new());
}
362
363/// Reset thread-local HTTP mock state. Call between test runs.
364pub fn reset_http_state() {
365    HTTP_MOCKS.with(|m| m.borrow_mut().clear());
366    HTTP_MOCK_CALLS.with(|c| c.borrow_mut().clear());
367    HTTP_CLIENTS.with(|clients| clients.borrow_mut().clear());
368    HTTP_SESSIONS.with(|sessions| sessions.borrow_mut().clear());
369    HTTP_STREAMS.with(|streams| streams.borrow_mut().clear());
370    SSE_MOCKS.with(|mocks| mocks.borrow_mut().clear());
371    SSE_HANDLES.with(|handles| {
372        for handle in handles.borrow_mut().values_mut() {
373            if let SseHandleKind::Real(stream) = &handle.kind {
374                if let Ok(mut stream) = stream.try_lock() {
375                    stream.close();
376                }
377            }
378        }
379        handles.borrow_mut().clear();
380    });
381    SSE_SERVER_HANDLES.with(|handles| handles.borrow_mut().clear());
382    WEBSOCKET_MOCKS.with(|mocks| mocks.borrow_mut().clear());
383    WEBSOCKET_HANDLES.with(|handles| handles.borrow_mut().clear());
384    WEBSOCKET_SERVERS.with(|servers| {
385        let mut servers = servers.borrow_mut();
386        for server in servers.values() {
387            server.running.store(false, Ordering::SeqCst);
388            let _ = TcpStream::connect(&server.addr);
389        }
390        servers.clear();
391    });
392    TRANSPORT_MOCK_CALLS.with(|calls| calls.borrow_mut().clear());
393    TRANSPORT_HANDLE_COUNTER.with(|counter| *counter.borrow_mut() = 0);
394    HTTP_SERVERS.with(|servers| servers.borrow_mut().clear());
395}
396
397pub fn push_http_mock(
398    method: impl Into<String>,
399    url_pattern: impl Into<String>,
400    responses: Vec<HttpMockResponse>,
401) {
402    let responses = if responses.is_empty() {
403        vec![MockResponse::from(HttpMockResponse::new(200, ""))]
404    } else {
405        responses.into_iter().map(MockResponse::from).collect()
406    };
407    let method = method.into();
408    let url_pattern = url_pattern.into();
409    HTTP_MOCKS.with(|mocks| {
410        let mut mocks = mocks.borrow_mut();
411        // Re-registering the same (method, url_pattern) replaces the prior
412        // mock so tests can override per-case responses without first calling
413        // http_mock_clear(). Without this, the original mock keeps matching
414        // forever and the new one is dead.
415        mocks.retain(|mock| !(mock.method == method && mock.url_pattern == url_pattern));
416        mocks.push(HttpMock {
417            method,
418            url_pattern,
419            responses,
420            next_response: 0,
421        });
422    });
423}
424
425pub fn http_mock_calls_snapshot() -> Vec<HttpMockCallSnapshot> {
426    HTTP_MOCK_CALLS.with(|calls| {
427        calls
428            .borrow()
429            .iter()
430            .map(|call| HttpMockCallSnapshot {
431                method: call.method.clone(),
432                url: call.url.clone(),
433                headers: call
434                    .headers
435                    .iter()
436                    .map(|(key, value)| (key.clone(), value.display()))
437                    .collect(),
438                body: call.body.clone(),
439            })
440            .collect()
441    })
442}
443
/// Reports whether `url` matches `pattern`.
///
/// A pattern without `*` must equal the URL exactly. Otherwise each `*`
/// matches any (possibly empty) run of characters: the text before the first
/// `*` must be a prefix of the URL, the text after the last `*` must be a
/// suffix of what remains, and the pieces in between must occur in order.
fn url_matches(pattern: &str, url: &str) -> bool {
    if pattern == "*" {
        return true;
    }
    if !pattern.contains('*') {
        return pattern == url;
    }
    let segments: Vec<&str> = pattern.split('*').collect();
    let last = segments.len() - 1;
    let mut rest = url;
    for (idx, segment) in segments.iter().copied().enumerate() {
        // Empty segments come from leading, trailing or doubled `*`.
        if segment.is_empty() {
            continue;
        }
        rest = if idx == 0 {
            match rest.strip_prefix(segment) {
                Some(tail) => tail,
                None => return false,
            }
        } else if idx == last {
            if !rest.ends_with(segment) {
                return false;
            }
            ""
        } else {
            // Taking the leftmost occurrence leaves the most text for the
            // segments that follow.
            match rest.split_once(segment) {
                Some((_, tail)) => tail,
                None => return false,
            }
        };
    }
    true
}
478
479/// Build a standard HTTP response dict with status, headers, body, and ok fields.
480fn build_http_response(status: i64, headers: BTreeMap<String, VmValue>, body: String) -> VmValue {
481    let mut result = BTreeMap::new();
482    result.insert("status".to_string(), VmValue::Int(status));
483    result.insert("headers".to_string(), VmValue::Dict(Rc::new(headers)));
484    result.insert("body".to_string(), VmValue::String(Rc::from(body)));
485    result.insert(
486        "ok".to_string(),
487        VmValue::Bool((200..300).contains(&(status as u16))),
488    );
489    VmValue::Dict(Rc::new(result))
490}
491
492fn build_http_download_response(
493    status: i64,
494    headers: BTreeMap<String, VmValue>,
495    bytes_written: u64,
496) -> VmValue {
497    let mut result = BTreeMap::new();
498    result.insert("status".to_string(), VmValue::Int(status));
499    result.insert("headers".to_string(), VmValue::Dict(Rc::new(headers)));
500    result.insert(
501        "bytes_written".to_string(),
502        VmValue::Int(bytes_written as i64),
503    );
504    result.insert(
505        "ok".to_string(),
506        VmValue::Bool((200..300).contains(&(status as u16))),
507    );
508    VmValue::Dict(Rc::new(result))
509}
510
511fn response_headers(headers: &reqwest::header::HeaderMap) -> BTreeMap<String, VmValue> {
512    let mut resp_headers = BTreeMap::new();
513    for (name, value) in headers {
514        if let Ok(v) = value.to_str() {
515            resp_headers.insert(name.as_str().to_string(), VmValue::String(Rc::from(v)));
516        }
517    }
518    resp_headers
519}
520
521fn vm_error(message: impl Into<String>) -> VmError {
522    VmError::Thrown(VmValue::String(Rc::from(message.into())))
523}
524
525fn next_transport_handle(prefix: &str) -> String {
526    TRANSPORT_HANDLE_COUNTER.with(|counter| {
527        let mut counter = counter.borrow_mut();
528        *counter += 1;
529        format!("{prefix}-{}", *counter)
530    })
531}
532
533fn handle_from_value(value: &VmValue, builtin: &str) -> Result<String, VmError> {
534    match value {
535        VmValue::String(handle) => Ok(handle.to_string()),
536        VmValue::Dict(dict) => dict
537            .get("id")
538            .map(|id| id.display())
539            .filter(|id| !id.is_empty())
540            .ok_or_else(|| vm_error(format!("{builtin}: handle dict must contain id"))),
541        _ => Err(vm_error(format!(
542            "{builtin}: first argument must be a handle string or dict"
543        ))),
544    }
545}
546
547fn get_options_arg(args: &[VmValue], index: usize) -> BTreeMap<String, VmValue> {
548    args.get(index)
549        .and_then(|value| value.as_dict())
550        .cloned()
551        .unwrap_or_default()
552}
553
554fn merge_options(
555    base: &BTreeMap<String, VmValue>,
556    overrides: &BTreeMap<String, VmValue>,
557) -> BTreeMap<String, VmValue> {
558    let mut merged = base.clone();
559    for (key, value) in overrides {
560        merged.insert(key.clone(), value.clone());
561    }
562    merged
563}
564
565fn resolve_http_path(
566    builtin: &str,
567    path: &str,
568    access: crate::stdlib::sandbox::FsAccess,
569) -> Result<std::path::PathBuf, VmError> {
570    let resolved = crate::stdlib::process::resolve_source_relative_path(path);
571    crate::stdlib::sandbox::enforce_fs_path(builtin, &resolved, access)?;
572    Ok(resolved)
573}
574
575fn value_to_bytes(value: &VmValue) -> Vec<u8> {
576    match value {
577        VmValue::Bytes(bytes) => bytes.as_ref().clone(),
578        other => other.display().into_bytes(),
579    }
580}
581
582fn parse_multipart_field(value: &VmValue) -> Result<MultipartField, VmError> {
583    let dict = value
584        .as_dict()
585        .ok_or_else(|| vm_error("http: multipart entries must be dicts"))?;
586    let name = dict
587        .get("name")
588        .map(|value| value.display())
589        .filter(|value| !value.is_empty())
590        .ok_or_else(|| vm_error("http: multipart entry requires name"))?;
591    let content_type = dict
592        .get("content_type")
593        .or_else(|| dict.get("mime_type"))
594        .map(|value| value.display())
595        .filter(|value| !value.is_empty());
596
597    let mut filename = dict
598        .get("filename")
599        .map(|value| value.display())
600        .filter(|value| !value.is_empty());
601    let value = if let Some(path_value) = dict.get("path") {
602        let path = path_value.display();
603        let resolved = resolve_http_path(
604            "http multipart",
605            &path,
606            crate::stdlib::sandbox::FsAccess::Read,
607        )?;
608        if filename.is_none() {
609            filename = resolved
610                .file_name()
611                .and_then(|name| name.to_str())
612                .map(|name| name.to_string());
613        }
614        std::fs::read(&resolved).map_err(|error| {
615            vm_error(format!(
616                "http: failed to read multipart file {}: {error}",
617                resolved.display()
618            ))
619        })?
620    } else if let Some(base64_value) = dict.get("value_base64").or_else(|| dict.get("base64")) {
621        base64::engine::general_purpose::STANDARD
622            .decode(base64_value.display())
623            .map_err(|error| vm_error(format!("http: invalid multipart base64 value: {error}")))?
624    } else {
625        dict.get("value").map(value_to_bytes).ok_or_else(|| {
626            vm_error("http: multipart entry requires value, value_base64, or path")
627        })?
628    };
629
630    Ok(MultipartField {
631        name,
632        value,
633        filename,
634        content_type,
635    })
636}
637
638fn parse_multipart_request(
639    options: &BTreeMap<String, VmValue>,
640) -> Result<Option<MultipartRequest>, VmError> {
641    let Some(value) = options.get("multipart") else {
642        return Ok(None);
643    };
644    let VmValue::List(items) = value else {
645        return Err(vm_error("http: multipart must be a list"));
646    };
647    let parts = items
648        .iter()
649        .map(parse_multipart_field)
650        .collect::<Result<Vec<_>, _>>()?;
651    let mock_body = multipart_mock_body(&parts);
652    Ok(Some(MultipartRequest { parts, mock_body }))
653}
654
655fn multipart_mock_body(parts: &[MultipartField]) -> String {
656    let mut out = String::new();
657    for part in parts {
658        out.push_str("--");
659        out.push_str(MULTIPART_MOCK_BOUNDARY);
660        out.push_str("\r\nContent-Disposition: form-data; name=\"");
661        out.push_str(&part.name);
662        out.push('"');
663        if let Some(filename) = &part.filename {
664            out.push_str("; filename=\"");
665            out.push_str(filename);
666            out.push('"');
667        }
668        out.push_str("\r\n");
669        if let Some(content_type) = &part.content_type {
670            out.push_str("Content-Type: ");
671            out.push_str(content_type);
672            out.push_str("\r\n");
673        }
674        out.push_str("\r\n");
675        out.push_str(&String::from_utf8_lossy(&part.value));
676        out.push_str("\r\n");
677    }
678    out.push_str("--");
679    out.push_str(MULTIPART_MOCK_BOUNDARY);
680    out.push_str("--\r\n");
681    out
682}
683
684fn multipart_form(request: &MultipartRequest) -> Result<reqwest::multipart::Form, VmError> {
685    let mut form = reqwest::multipart::Form::new();
686    for field in &request.parts {
687        let mut part = reqwest::multipart::Part::bytes(field.value.clone());
688        if let Some(filename) = &field.filename {
689            part = part.file_name(filename.clone());
690        }
691        if let Some(content_type) = &field.content_type {
692            part = part.mime_str(content_type).map_err(|error| {
693                vm_error(format!("http: invalid multipart content_type: {error}"))
694            })?;
695        }
696        form = form.part(field.name.clone(), part);
697    }
698    Ok(form)
699}
700
701fn transport_limit_option(options: &BTreeMap<String, VmValue>, key: &str, default: usize) -> usize {
702    options
703        .get(key)
704        .and_then(|value| value.as_int())
705        .map(|value| value.max(0) as usize)
706        .unwrap_or(default)
707}
708
709fn receive_timeout_arg(args: &[VmValue], index: usize) -> u64 {
710    match args.get(index) {
711        Some(VmValue::Duration(ms)) => (*ms).max(0) as u64,
712        Some(value) => value
713            .as_int()
714            .map(|ms| ms.max(0) as u64)
715            .unwrap_or(DEFAULT_TRANSPORT_RECEIVE_TIMEOUT_MS),
716        None => DEFAULT_TRANSPORT_RECEIVE_TIMEOUT_MS,
717    }
718}
719
720fn timeout_event() -> VmValue {
721    let mut dict = BTreeMap::new();
722    dict.insert("type".to_string(), VmValue::String(Rc::from("timeout")));
723    VmValue::Dict(Rc::new(dict))
724}
725
726fn closed_event() -> VmValue {
727    let mut dict = BTreeMap::new();
728    dict.insert("type".to_string(), VmValue::String(Rc::from("close")));
729    VmValue::Dict(Rc::new(dict))
730}
731
732fn sse_server_closed_event() -> VmValue {
733    let mut dict = BTreeMap::new();
734    dict.insert("type".to_string(), VmValue::String(Rc::from("close")));
735    dict.insert("server_closed".to_string(), VmValue::Bool(true));
736    VmValue::Dict(Rc::new(dict))
737}
738
739fn record_transport_call(call: TransportMockCall) {
740    TRANSPORT_MOCK_CALLS.with(|calls| calls.borrow_mut().push(call));
741}
742
743/// Extract URL, validate it, and pull an options dict from `args`.
744/// For methods with a body (POST/PUT/PATCH), the body is at index 1 and
745/// options at index 2; for methods without (GET/DELETE), options are at index 1.
746async fn http_verb_handler(
747    method: &str,
748    has_body: bool,
749    args: Vec<VmValue>,
750) -> Result<VmValue, VmError> {
751    let url = args.first().map(|a| a.display()).unwrap_or_default();
752    if url.is_empty() {
753        return Err(VmError::Thrown(VmValue::String(Rc::from(format!(
754            "http_{}: URL is required",
755            method.to_ascii_lowercase()
756        )))));
757    }
758    let mut options = if has_body {
759        match (args.get(1), args.get(2)) {
760            (Some(VmValue::Dict(d)), None) => (**d).clone(),
761            (_, Some(VmValue::Dict(d))) => (**d).clone(),
762            _ => BTreeMap::new(),
763        }
764    } else {
765        match args.get(1) {
766            Some(VmValue::Dict(d)) => (**d).clone(),
767            _ => BTreeMap::new(),
768        }
769    };
770    if has_body && !(matches!(args.get(1), Some(VmValue::Dict(_))) && args.get(2).is_none()) {
771        let body = args.get(1).map(|a| a.display()).unwrap_or_default();
772        options.insert("body".to_string(), VmValue::String(Rc::from(body)));
773    }
774    vm_execute_http_request(method, &url, &options).await
775}
776
777fn parse_mock_response_dict(response: &BTreeMap<String, VmValue>) -> MockResponse {
778    let status = response
779        .get("status")
780        .and_then(|v| v.as_int())
781        .unwrap_or(200);
782    let body = response
783        .get("body")
784        .map(|v| v.display())
785        .unwrap_or_default();
786    let headers = response
787        .get("headers")
788        .and_then(|v| v.as_dict())
789        .cloned()
790        .unwrap_or_default();
791    MockResponse {
792        status,
793        body,
794        headers,
795    }
796}
797
798fn parse_mock_responses(response: &BTreeMap<String, VmValue>) -> Vec<MockResponse> {
799    let scripted = response
800        .get("responses")
801        .and_then(|value| match value {
802            VmValue::List(items) => Some(
803                items
804                    .iter()
805                    .filter_map(|item| item.as_dict().map(parse_mock_response_dict))
806                    .collect::<Vec<_>>(),
807            ),
808            _ => None,
809        })
810        .unwrap_or_default();
811
812    if scripted.is_empty() {
813        vec![parse_mock_response_dict(response)]
814    } else {
815        scripted
816    }
817}
818
819fn consume_http_mock(
820    method: &str,
821    url: &str,
822    headers: BTreeMap<String, VmValue>,
823    body: Option<String>,
824) -> Option<MockResponse> {
825    let response = HTTP_MOCKS.with(|mocks| {
826        let mut mocks = mocks.borrow_mut();
827        for mock in mocks.iter_mut() {
828            if (mock.method == "*" || mock.method.eq_ignore_ascii_case(method))
829                && url_matches(&mock.url_pattern, url)
830            {
831                let Some(last_index) = mock.responses.len().checked_sub(1) else {
832                    continue;
833                };
834                let index = mock.next_response.min(last_index);
835                let response = mock.responses[index].clone();
836                if mock.next_response < last_index {
837                    mock.next_response += 1;
838                }
839                return Some(response);
840            }
841        }
842        None
843    })?;
844
845    HTTP_MOCK_CALLS.with(|calls| {
846        calls.borrow_mut().push(HttpMockCall {
847            method: method.to_string(),
848            url: url.to_string(),
849            headers,
850            body,
851        });
852    });
853
854    Some(response)
855}
856
857fn vm_string(value: impl AsRef<str>) -> VmValue {
858    VmValue::String(Rc::from(value.as_ref()))
859}
860
861fn dict_value(entries: BTreeMap<String, VmValue>) -> VmValue {
862    VmValue::Dict(Rc::new(entries))
863}
864
865fn get_bool_option(options: &BTreeMap<String, VmValue>, key: &str, default: bool) -> bool {
866    match options.get(key) {
867        Some(VmValue::Bool(value)) => *value,
868        _ => default,
869    }
870}
871
872fn get_usize_option(
873    options: &BTreeMap<String, VmValue>,
874    key: &str,
875    default: usize,
876) -> Result<usize, VmError> {
877    match options.get(key).and_then(VmValue::as_int) {
878        Some(value) if value >= 0 => Ok(value as usize),
879        Some(_) => Err(vm_error(format!("http_server: {key} must be non-negative"))),
880        None => Ok(default),
881    }
882}
883
884fn get_optional_usize_option(
885    options: &BTreeMap<String, VmValue>,
886    key: &str,
887) -> Result<Option<usize>, VmError> {
888    match options.get(key).and_then(VmValue::as_int) {
889        Some(value) if value >= 0 => Ok(Some(value as usize)),
890        Some(_) => Err(vm_error(format!(
891            "http_server_route: {key} must be non-negative"
892        ))),
893        None => Ok(None),
894    }
895}
896
897fn server_from_value(value: &VmValue, builtin: &str) -> Result<String, VmError> {
898    handle_from_value(value, builtin)
899}
900
901fn closure_arg(args: &[VmValue], index: usize, builtin: &str) -> Result<Rc<VmClosure>, VmError> {
902    match args.get(index) {
903        Some(VmValue::Closure(closure)) => Ok(closure.clone()),
904        Some(other) => Err(vm_error(format!(
905            "{builtin}: argument {} must be a closure, got {}",
906            index + 1,
907            other.type_name()
908        ))),
909        None => Err(vm_error(format!(
910            "{builtin}: missing closure argument {}",
911            index + 1
912        ))),
913    }
914}
915
916fn http_server_handle_value(id: &str) -> VmValue {
917    let mut dict = BTreeMap::new();
918    dict.insert("id".to_string(), vm_string(id));
919    dict.insert("kind".to_string(), vm_string("http_server"));
920    dict_value(dict)
921}
922
923fn header_lookup_value(headers: &BTreeMap<String, VmValue>, name: &str) -> VmValue {
924    headers
925        .iter()
926        .find(|(candidate, _)| candidate.eq_ignore_ascii_case(name))
927        .map(|(_, value)| value.clone())
928        .unwrap_or(VmValue::Nil)
929}
930
931fn headers_from_value(value: &VmValue) -> BTreeMap<String, VmValue> {
932    match value {
933        VmValue::Dict(dict) => dict
934            .get("headers")
935            .and_then(VmValue::as_dict)
936            .map(|headers| {
937                headers
938                    .iter()
939                    .map(|(key, value)| (key.to_ascii_lowercase(), vm_string(value.display())))
940                    .collect()
941            })
942            .unwrap_or_else(|| {
943                dict.iter()
944                    .map(|(key, value)| (key.to_ascii_lowercase(), vm_string(value.display())))
945                    .collect()
946            }),
947        _ => BTreeMap::new(),
948    }
949}
950
951fn normalize_headers(value: Option<&VmValue>) -> BTreeMap<String, VmValue> {
952    match value.and_then(VmValue::as_dict) {
953        Some(headers) => headers
954            .iter()
955            .map(|(key, value)| (key.to_ascii_lowercase(), vm_string(value.display())))
956            .collect(),
957        None => BTreeMap::new(),
958    }
959}
960
961fn percent_decode(input: &str) -> String {
962    let bytes = input.as_bytes();
963    let mut out = Vec::with_capacity(bytes.len());
964    let mut i = 0;
965    while i < bytes.len() {
966        if bytes[i] == b'+' {
967            out.push(b' ');
968            i += 1;
969            continue;
970        }
971        if bytes[i] == b'%' && i + 2 < bytes.len() {
972            if let (Some(hi), Some(lo)) = (hex_val(bytes[i + 1]), hex_val(bytes[i + 2])) {
973                out.push((hi << 4) | lo);
974                i += 3;
975                continue;
976            }
977        }
978        out.push(bytes[i]);
979        i += 1;
980    }
981    String::from_utf8_lossy(&out).into_owned()
982}
983
/// Returns the numeric value (0–15) of an ASCII hexadecimal digit, accepting
/// both lowercase and uppercase letters, or `None` for any other byte.
fn hex_val(byte: u8) -> Option<u8> {
    // `to_digit(16)` only accepts 0-9, a-f and A-F; the result is at most 15,
    // so narrowing to `u8` cannot lose information.
    char::from(byte).to_digit(16).map(|digit| digit as u8)
}
992
993fn split_path_and_query(raw_path: &str) -> (String, BTreeMap<String, VmValue>) {
994    let (path, query) = raw_path.split_once('?').unwrap_or((raw_path, ""));
995    let mut query_map = BTreeMap::new();
996    for pair in query.split('&').filter(|part| !part.is_empty()) {
997        let (key, value) = pair.split_once('=').unwrap_or((pair, ""));
998        query_map.insert(percent_decode(key), vm_string(percent_decode(value)));
999    }
1000    (
1001        if path.is_empty() { "/" } else { path }.to_string(),
1002        query_map,
1003    )
1004}
1005
1006fn request_body_bytes(input: &BTreeMap<String, VmValue>) -> Vec<u8> {
1007    match input.get("raw_body").or_else(|| input.get("body")) {
1008        Some(VmValue::Bytes(bytes)) => bytes.as_ref().clone(),
1009        Some(value) => value.display().into_bytes(),
1010        None => Vec::new(),
1011    }
1012}
1013
1014fn request_value(
1015    method: &str,
1016    path: &str,
1017    path_params: BTreeMap<String, VmValue>,
1018    mut query: BTreeMap<String, VmValue>,
1019    input: &BTreeMap<String, VmValue>,
1020    body_bytes: &[u8],
1021    retain_raw_body: bool,
1022) -> VmValue {
1023    if let Some(explicit_query) = input.get("query").and_then(VmValue::as_dict) {
1024        query.extend(
1025            explicit_query
1026                .iter()
1027                .map(|(key, value)| (key.clone(), value.clone())),
1028        );
1029    }
1030
1031    let headers = normalize_headers(input.get("headers"));
1032    let body = String::from_utf8_lossy(body_bytes).into_owned();
1033    let mut request = BTreeMap::new();
1034    request.insert("method".to_string(), vm_string(method));
1035    request.insert("path".to_string(), vm_string(path));
1036    let path_params = dict_value(path_params);
1037    request.insert("path_params".to_string(), path_params.clone());
1038    request.insert("params".to_string(), path_params);
1039    request.insert("query".to_string(), dict_value(query));
1040    request.insert("headers".to_string(), dict_value(headers));
1041    request.insert("body".to_string(), vm_string(body));
1042    request.insert(
1043        "raw_body".to_string(),
1044        if retain_raw_body {
1045            VmValue::Bytes(Rc::new(body_bytes.to_vec()))
1046        } else {
1047            VmValue::Nil
1048        },
1049    );
1050    request.insert(
1051        "body_bytes".to_string(),
1052        VmValue::Int(body_bytes.len() as i64),
1053    );
1054    request.insert(
1055        "remote_addr".to_string(),
1056        input
1057            .get("remote_addr")
1058            .or_else(|| input.get("remote"))
1059            .map(|value| vm_string(value.display()))
1060            .unwrap_or(VmValue::Nil),
1061    );
1062    request.insert(
1063        "client_ip".to_string(),
1064        input
1065            .get("client_ip")
1066            .or_else(|| input.get("remote_ip"))
1067            .or_else(|| input.get("ip"))
1068            .map(|value| vm_string(value.display()))
1069            .unwrap_or(VmValue::Nil),
1070    );
1071    dict_value(request)
1072}
1073
/// Returns `status` unchanged when it is a three-digit code (100–999);
/// any other value is replaced with 500.
fn normalize_status(status: i64) -> i64 {
    match status {
        100..=999 => status,
        _ => 500,
    }
}
1081
1082fn response_with_kind(
1083    status: i64,
1084    mut headers: BTreeMap<String, VmValue>,
1085    body: VmValue,
1086    body_kind: &str,
1087) -> VmValue {
1088    let status = normalize_status(status);
1089    let mut response = BTreeMap::new();
1090    if body_kind == "json" && matches!(header_lookup_value(&headers, "content-type"), VmValue::Nil)
1091    {
1092        headers.insert(
1093            "content-type".to_string(),
1094            vm_string("application/json; charset=utf-8"),
1095        );
1096    } else if body_kind == "text"
1097        && matches!(header_lookup_value(&headers, "content-type"), VmValue::Nil)
1098    {
1099        headers.insert(
1100            "content-type".to_string(),
1101            vm_string("text/plain; charset=utf-8"),
1102        );
1103    }
1104    response.insert("status".to_string(), VmValue::Int(status));
1105    response.insert("headers".to_string(), dict_value(headers));
1106    response.insert(
1107        "ok".to_string(),
1108        VmValue::Bool((200..300).contains(&status)),
1109    );
1110    response.insert("body_kind".to_string(), vm_string(body_kind));
1111    match body {
1112        VmValue::Bytes(bytes) => {
1113            response.insert(
1114                "body".to_string(),
1115                vm_string(String::from_utf8_lossy(&bytes)),
1116            );
1117            response.insert("raw_body".to_string(), VmValue::Bytes(bytes));
1118        }
1119        other => {
1120            response.insert("body".to_string(), vm_string(other.display()));
1121            response.insert(
1122                "raw_body".to_string(),
1123                VmValue::Bytes(Rc::new(other.display().into_bytes())),
1124            );
1125        }
1126    }
1127    dict_value(response)
1128}
1129
1130fn normalize_response(value: VmValue) -> VmValue {
1131    match value {
1132        VmValue::Dict(dict) if dict.contains_key("status") => {
1133            let status = dict.get("status").and_then(VmValue::as_int).unwrap_or(200);
1134            let headers = dict
1135                .get("headers")
1136                .and_then(VmValue::as_dict)
1137                .cloned()
1138                .unwrap_or_default();
1139            let body_kind = dict
1140                .get("body_kind")
1141                .or_else(|| dict.get("kind"))
1142                .map(|value| value.display())
1143                .unwrap_or_else(|| "text".to_string());
1144            let body = dict
1145                .get("raw_body")
1146                .filter(|value| matches!(value, VmValue::Bytes(_)))
1147                .or_else(|| dict.get("body"))
1148                .cloned()
1149                .unwrap_or(VmValue::Nil);
1150            response_with_kind(status, headers, body, &body_kind)
1151        }
1152        VmValue::Nil => response_with_kind(204, BTreeMap::new(), VmValue::Nil, "text"),
1153        other => response_with_kind(200, BTreeMap::new(), other, "text"),
1154    }
1155}
1156
1157fn body_limit_response(limit: usize, actual: usize) -> VmValue {
1158    let mut headers = BTreeMap::new();
1159    headers.insert(
1160        "content-type".to_string(),
1161        vm_string("text/plain; charset=utf-8"),
1162    );
1163    headers.insert("connection".to_string(), vm_string("close"));
1164    headers.insert(
1165        "x-harn-body-limit".to_string(),
1166        vm_string(limit.to_string()),
1167    );
1168    response_with_kind(
1169        413,
1170        headers,
1171        vm_string(format!("request body too large: {actual} > {limit} bytes")),
1172        "text",
1173    )
1174}
1175
1176fn not_found_response(method: &str, path: &str) -> VmValue {
1177    response_with_kind(
1178        404,
1179        BTreeMap::new(),
1180        vm_string(format!("no route for {method} {path}")),
1181        "text",
1182    )
1183}
1184
1185fn unavailable_response(message: &str) -> VmValue {
1186    response_with_kind(503, BTreeMap::new(), vm_string(message), "text")
1187}
1188
1189fn route_template_match(template: &str, path: &str) -> Option<BTreeMap<String, VmValue>> {
1190    let template_segments: Vec<&str> = template.trim_matches('/').split('/').collect();
1191    let path_segments: Vec<&str> = path.trim_matches('/').split('/').collect();
1192    if template == "/" && path == "/" {
1193        return Some(BTreeMap::new());
1194    }
1195    if template_segments.len() != path_segments.len() {
1196        return None;
1197    }
1198    let mut params = BTreeMap::new();
1199    for (tmpl, actual) in template_segments.iter().zip(path_segments.iter()) {
1200        if tmpl.starts_with('{') && tmpl.ends_with('}') && tmpl.len() > 2 {
1201            params.insert(
1202                tmpl[1..tmpl.len() - 1].to_string(),
1203                vm_string(percent_decode(actual)),
1204            );
1205        } else if tmpl.starts_with(':') && tmpl.len() > 1 {
1206            params.insert(tmpl[1..].to_string(), vm_string(percent_decode(actual)));
1207        } else if tmpl != actual {
1208            return None;
1209        }
1210    }
1211    Some(params)
1212}
1213
1214fn matching_route(
1215    server: &HttpServer,
1216    method: &str,
1217    path: &str,
1218) -> Option<(HttpServerRoute, BTreeMap<String, VmValue>)> {
1219    server.routes.iter().find_map(|route| {
1220        if route.method != "*" && !route.method.eq_ignore_ascii_case(method) {
1221            return None;
1222        }
1223        route_template_match(&route.template, path).map(|params| (route.clone(), params))
1224    })
1225}
1226
1227async fn call_server_closure(
1228    closure: &Rc<VmClosure>,
1229    args: &[VmValue],
1230    builtin: &str,
1231) -> Result<VmValue, VmError> {
1232    let mut vm = crate::vm::clone_async_builtin_child_vm()
1233        .ok_or_else(|| vm_error(format!("{builtin}: requires an async builtin VM context")))?;
1234    vm.call_closure_pub(closure, args).await
1235}
1236
1237fn value_is_response(value: &VmValue) -> bool {
1238    matches!(value, VmValue::Dict(dict) if dict.contains_key("status"))
1239}
1240
1241async fn run_http_server_request(server_id: &str, request: VmValue) -> Result<VmValue, VmError> {
1242    let server = HTTP_SERVERS.with(|servers| servers.borrow().get(server_id).cloned());
1243    let Some(server) = server else {
1244        return Err(vm_error(format!(
1245            "http_server_request: unknown server handle '{server_id}'"
1246        )));
1247    };
1248    if server.shutdown {
1249        return Ok(unavailable_response("server is shut down"));
1250    }
1251    if !server.ready {
1252        return Ok(unavailable_response("server is not ready"));
1253    }
1254    if let Some(readiness) = &server.readiness {
1255        let ready = call_server_closure(
1256            readiness,
1257            &[http_server_handle_value(server_id)],
1258            "http_server_request",
1259        )
1260        .await?;
1261        if !ready.is_truthy() {
1262            return Ok(unavailable_response("server is not ready"));
1263        }
1264    }
1265
1266    let input = request.as_dict().cloned().unwrap_or_default();
1267    let method = input
1268        .get("method")
1269        .map(|value| value.display())
1270        .filter(|value| !value.is_empty())
1271        .unwrap_or_else(|| "GET".to_string())
1272        .to_ascii_uppercase();
1273    let raw_path = input
1274        .get("path")
1275        .map(|value| value.display())
1276        .filter(|value| !value.is_empty())
1277        .unwrap_or_else(|| "/".to_string());
1278    let (path, query) = split_path_and_query(&raw_path);
1279    let body_bytes = request_body_bytes(&input);
1280
1281    let Some((route, path_params)) = matching_route(&server, &method, &path) else {
1282        return Ok(not_found_response(&method, &path));
1283    };
1284
1285    let limit = route.max_body_bytes.unwrap_or(server.max_body_bytes);
1286    if body_bytes.len() > limit {
1287        return Ok(body_limit_response(limit, body_bytes.len()));
1288    }
1289    let retain_raw_body = route.retain_raw_body.unwrap_or(server.retain_raw_body);
1290    let mut req = request_value(
1291        &method,
1292        &path,
1293        path_params,
1294        query,
1295        &input,
1296        &body_bytes,
1297        retain_raw_body,
1298    );
1299
1300    for before in &server.before {
1301        let result = call_server_closure(before, &[req.clone()], "http_server_request").await?;
1302        if value_is_response(&result) {
1303            return Ok(normalize_response(result));
1304        }
1305        if !matches!(result, VmValue::Nil) {
1306            req = result;
1307        }
1308    }
1309
1310    let handler_result =
1311        call_server_closure(&route.handler, &[req.clone()], "http_server_request").await?;
1312    let mut response = normalize_response(handler_result);
1313
1314    for after in &server.after {
1315        let result = call_server_closure(
1316            after,
1317            &[response.clone(), req.clone()],
1318            "http_server_request",
1319        )
1320        .await?;
1321        if !matches!(result, VmValue::Nil) {
1322            response = normalize_response(result);
1323        }
1324    }
1325
1326    Ok(response)
1327}
1328
1329/// Register HTTP builtins on a VM.
1330pub fn register_http_builtins(vm: &mut Vm) {
1331    vm.register_builtin("http_server_tls_plain", |_args, _out| {
1332        Ok(http_server_tls_config_value(
1333            "plain",
1334            false,
1335            "http",
1336            false,
1337            BTreeMap::new(),
1338        ))
1339    });
1340    vm.register_builtin("http_server_tls_edge", |args, _out| {
1341        let options = get_options_arg(args, 0);
1342        Ok(http_server_tls_config_value(
1343            "edge",
1344            false,
1345            "https",
1346            vm_get_bool_option(&options, "hsts", true),
1347            hsts_options(&options),
1348        ))
1349    });
1350    vm.register_builtin("http_server_tls_pem", |args, _out| {
1351        if args.len() < 2 {
1352            return Err(vm_error(
1353                "http_server_tls_pem: requires cert path and key path",
1354            ));
1355        }
1356        let cert_path = args[0].display();
1357        let key_path = args[1].display();
1358        if !std::path::Path::new(&cert_path).is_file() {
1359            return Err(vm_error(format!(
1360                "http_server_tls_pem: certificate not found: {cert_path}"
1361            )));
1362        }
1363        if !std::path::Path::new(&key_path).is_file() {
1364            return Err(vm_error(format!(
1365                "http_server_tls_pem: private key not found: {key_path}"
1366            )));
1367        }
1368        let mut extra = BTreeMap::new();
1369        extra.insert(
1370            "cert_path".to_string(),
1371            VmValue::String(Rc::from(cert_path)),
1372        );
1373        extra.insert("key_path".to_string(), VmValue::String(Rc::from(key_path)));
1374        Ok(http_server_tls_config_value(
1375            "pem", true, "https", true, extra,
1376        ))
1377    });
1378    vm.register_builtin("http_server_tls_self_signed_dev", |args, _out| {
1379        let hosts = tls_hosts_arg(args.first())?;
1380        let cert = rcgen::generate_simple_self_signed(hosts.clone()).map_err(|error| {
1381            vm_error(format!(
1382                "http_server_tls_self_signed_dev: failed to generate certificate: {error}"
1383            ))
1384        })?;
1385        let mut extra = BTreeMap::new();
1386        extra.insert(
1387            "hosts".to_string(),
1388            VmValue::List(Rc::new(
1389                hosts
1390                    .into_iter()
1391                    .map(|host| VmValue::String(Rc::from(host)))
1392                    .collect(),
1393            )),
1394        );
1395        extra.insert(
1396            "cert_pem".to_string(),
1397            VmValue::String(Rc::from(cert.cert.pem())),
1398        );
1399        extra.insert(
1400            "key_pem".to_string(),
1401            VmValue::String(Rc::from(cert.key_pair.serialize_pem())),
1402        );
1403        Ok(http_server_tls_config_value(
1404            "self_signed_dev",
1405            true,
1406            "https",
1407            false,
1408            extra,
1409        ))
1410    });
1411    vm.register_builtin("http_server_security_headers", |args, _out| {
1412        let Some(VmValue::Dict(config)) = args.first() else {
1413            return Err(vm_error(
1414                "http_server_security_headers: requires a TLS config dict",
1415            ));
1416        };
1417        Ok(VmValue::Dict(Rc::new(http_server_security_headers(config))))
1418    });
1419
1420    vm.register_async_builtin("http_get", |args| async move {
1421        http_verb_handler("GET", false, args).await
1422    });
1423    vm.register_async_builtin("http_post", |args| async move {
1424        http_verb_handler("POST", true, args).await
1425    });
1426    vm.register_async_builtin("http_put", |args| async move {
1427        http_verb_handler("PUT", true, args).await
1428    });
1429    vm.register_async_builtin("http_patch", |args| async move {
1430        http_verb_handler("PATCH", true, args).await
1431    });
1432    vm.register_async_builtin("http_delete", |args| async move {
1433        http_verb_handler("DELETE", false, args).await
1434    });
1435
1436    // --- Inbound HTTP server primitives ---
1437
1438    vm.register_builtin("http_server", |args, _out| {
1439        let options = get_options_arg(args, 0);
1440        let server = HttpServer {
1441            routes: Vec::new(),
1442            before: Vec::new(),
1443            after: Vec::new(),
1444            ready: get_bool_option(&options, "ready", true),
1445            readiness: None,
1446            shutdown_hooks: Vec::new(),
1447            shutdown: false,
1448            max_body_bytes: get_usize_option(
1449                &options,
1450                "max_body_bytes",
1451                DEFAULT_SERVER_MAX_BODY_BYTES,
1452            )?,
1453            retain_raw_body: get_bool_option(&options, "retain_raw_body", true),
1454        };
1455        let id = next_transport_handle("http-server");
1456        HTTP_SERVERS.with(|servers| {
1457            let mut servers = servers.borrow_mut();
1458            if servers.len() >= MAX_HTTP_SERVERS {
1459                return Err(vm_error(format!(
1460                    "http_server: maximum open servers ({MAX_HTTP_SERVERS}) reached"
1461                )));
1462            }
1463            servers.insert(id.clone(), server);
1464            Ok(())
1465        })?;
1466        Ok(http_server_handle_value(&id))
1467    });
1468
1469    vm.register_builtin("http_server_route", |args, _out| {
1470        if args.len() < 4 {
1471            return Err(vm_error(
1472                "http_server_route: requires server, method, path template, and handler",
1473            ));
1474        }
1475        let server_id = server_from_value(&args[0], "http_server_route")?;
1476        let method = args[1].display().to_ascii_uppercase();
1477        if method.is_empty() {
1478            return Err(vm_error("http_server_route: method is required"));
1479        }
1480        let template = args[2].display();
1481        if !template.starts_with('/') {
1482            return Err(vm_error(
1483                "http_server_route: path template must start with '/'",
1484            ));
1485        }
1486        let handler = closure_arg(args, 3, "http_server_route")?;
1487        let options = get_options_arg(args, 4);
1488        let route = HttpServerRoute {
1489            method,
1490            template,
1491            handler,
1492            max_body_bytes: get_optional_usize_option(&options, "max_body_bytes")?,
1493            retain_raw_body: match options.get("retain_raw_body") {
1494                Some(VmValue::Bool(value)) => Some(*value),
1495                _ => None,
1496            },
1497        };
1498        HTTP_SERVERS.with(|servers| {
1499            let mut servers = servers.borrow_mut();
1500            let server = servers.get_mut(&server_id).ok_or_else(|| {
1501                vm_error(format!("http_server_route: unknown server '{server_id}'"))
1502            })?;
1503            server.routes.push(route);
1504            Ok::<_, VmError>(())
1505        })?;
1506        Ok(http_server_handle_value(&server_id))
1507    });
1508
1509    vm.register_builtin("http_server_before", |args, _out| {
1510        if args.len() < 2 {
1511            return Err(vm_error("http_server_before: requires server and handler"));
1512        }
1513        let server_id = server_from_value(&args[0], "http_server_before")?;
1514        let handler = closure_arg(args, 1, "http_server_before")?;
1515        HTTP_SERVERS.with(|servers| {
1516            let mut servers = servers.borrow_mut();
1517            let server = servers.get_mut(&server_id).ok_or_else(|| {
1518                vm_error(format!("http_server_before: unknown server '{server_id}'"))
1519            })?;
1520            server.before.push(handler);
1521            Ok::<_, VmError>(())
1522        })?;
1523        Ok(http_server_handle_value(&server_id))
1524    });
1525
1526    vm.register_builtin("http_server_after", |args, _out| {
1527        if args.len() < 2 {
1528            return Err(vm_error("http_server_after: requires server and handler"));
1529        }
1530        let server_id = server_from_value(&args[0], "http_server_after")?;
1531        let handler = closure_arg(args, 1, "http_server_after")?;
1532        HTTP_SERVERS.with(|servers| {
1533            let mut servers = servers.borrow_mut();
1534            let server = servers.get_mut(&server_id).ok_or_else(|| {
1535                vm_error(format!("http_server_after: unknown server '{server_id}'"))
1536            })?;
1537            server.after.push(handler);
1538            Ok::<_, VmError>(())
1539        })?;
1540        Ok(http_server_handle_value(&server_id))
1541    });
1542
1543    vm.register_async_builtin("http_server_request", |args| async move {
1544        if args.len() < 2 {
1545            return Err(vm_error("http_server_request: requires server and request"));
1546        }
1547        let server_id = server_from_value(&args[0], "http_server_request")?;
1548        run_http_server_request(&server_id, args[1].clone()).await
1549    });
1550
1551    vm.register_async_builtin("http_server_test", |args| async move {
1552        if args.len() < 2 {
1553            return Err(vm_error("http_server_test: requires server and request"));
1554        }
1555        let server_id = server_from_value(&args[0], "http_server_test")?;
1556        run_http_server_request(&server_id, args[1].clone()).await
1557    });
1558
1559    vm.register_builtin("http_server_set_ready", |args, _out| {
1560        if args.len() < 2 {
1561            return Err(vm_error(
1562                "http_server_set_ready: requires server and ready bool",
1563            ));
1564        }
1565        let server_id = server_from_value(&args[0], "http_server_set_ready")?;
1566        let ready = matches!(args[1], VmValue::Bool(true));
1567        HTTP_SERVERS.with(|servers| {
1568            let mut servers = servers.borrow_mut();
1569            let server = servers.get_mut(&server_id).ok_or_else(|| {
1570                vm_error(format!(
1571                    "http_server_set_ready: unknown server '{server_id}'"
1572                ))
1573            })?;
1574            server.ready = ready;
1575            Ok::<_, VmError>(())
1576        })?;
1577        Ok(VmValue::Bool(ready))
1578    });
1579
1580    vm.register_builtin("http_server_readiness", |args, _out| {
1581        if args.len() < 2 {
1582            return Err(vm_error(
1583                "http_server_readiness: requires server and readiness closure",
1584            ));
1585        }
1586        let server_id = server_from_value(&args[0], "http_server_readiness")?;
1587        let handler = closure_arg(args, 1, "http_server_readiness")?;
1588        HTTP_SERVERS.with(|servers| {
1589            let mut servers = servers.borrow_mut();
1590            let server = servers.get_mut(&server_id).ok_or_else(|| {
1591                vm_error(format!(
1592                    "http_server_readiness: unknown server '{server_id}'"
1593                ))
1594            })?;
1595            server.readiness = Some(handler);
1596            Ok::<_, VmError>(())
1597        })?;
1598        Ok(http_server_handle_value(&server_id))
1599    });
1600
1601    vm.register_async_builtin("http_server_ready", |args| async move {
1602        let Some(server_arg) = args.first() else {
1603            return Err(vm_error("http_server_ready: requires server"));
1604        };
1605        let server_id = server_from_value(server_arg, "http_server_ready")?;
1606        let server = HTTP_SERVERS.with(|servers| servers.borrow().get(&server_id).cloned());
1607        let Some(server) = server else {
1608            return Err(vm_error(format!(
1609                "http_server_ready: unknown server '{server_id}'"
1610            )));
1611        };
1612        if server.shutdown {
1613            return Ok(VmValue::Bool(false));
1614        }
1615        let Some(readiness) = server.readiness else {
1616            return Ok(VmValue::Bool(server.ready));
1617        };
1618        let result = call_server_closure(
1619            &readiness,
1620            &[http_server_handle_value(&server_id)],
1621            "http_server_ready",
1622        )
1623        .await?;
1624        Ok(VmValue::Bool(result.is_truthy()))
1625    });
1626
1627    vm.register_builtin("http_server_on_shutdown", |args, _out| {
1628        if args.len() < 2 {
1629            return Err(vm_error(
1630                "http_server_on_shutdown: requires server and handler",
1631            ));
1632        }
1633        let server_id = server_from_value(&args[0], "http_server_on_shutdown")?;
1634        let handler = closure_arg(args, 1, "http_server_on_shutdown")?;
1635        HTTP_SERVERS.with(|servers| {
1636            let mut servers = servers.borrow_mut();
1637            let server = servers.get_mut(&server_id).ok_or_else(|| {
1638                vm_error(format!(
1639                    "http_server_on_shutdown: unknown server '{server_id}'"
1640                ))
1641            })?;
1642            server.shutdown_hooks.push(handler);
1643            Ok::<_, VmError>(())
1644        })?;
1645        Ok(http_server_handle_value(&server_id))
1646    });
1647
1648    vm.register_async_builtin("http_server_shutdown", |args| async move {
1649        let Some(server_arg) = args.first() else {
1650            return Err(vm_error("http_server_shutdown: requires server"));
1651        };
1652        let server_id = server_from_value(server_arg, "http_server_shutdown")?;
1653        let hooks = HTTP_SERVERS.with(|servers| {
1654            let mut servers = servers.borrow_mut();
1655            let server = servers.get_mut(&server_id).ok_or_else(|| {
1656                vm_error(format!(
1657                    "http_server_shutdown: unknown server '{server_id}'"
1658                ))
1659            })?;
1660            server.shutdown = true;
1661            Ok::<_, VmError>(server.shutdown_hooks.clone())
1662        })?;
1663        for hook in hooks {
1664            let _ = call_server_closure(
1665                &hook,
1666                &[http_server_handle_value(&server_id)],
1667                "http_server_shutdown",
1668            )
1669            .await?;
1670        }
1671        Ok(VmValue::Bool(true))
1672    });
1673
1674    vm.register_builtin("http_response", |args, _out| {
1675        let status = args.first().and_then(VmValue::as_int).unwrap_or(200);
1676        let body = args.get(1).cloned().unwrap_or(VmValue::Nil);
1677        let headers = args
1678            .get(2)
1679            .and_then(VmValue::as_dict)
1680            .cloned()
1681            .unwrap_or_default();
1682        Ok(response_with_kind(status, headers, body, "text"))
1683    });
1684
1685    vm.register_builtin("http_response_text", |args, _out| {
1686        let body = args.first().cloned().unwrap_or(VmValue::Nil);
1687        let options = get_options_arg(args, 1);
1688        let status = options
1689            .get("status")
1690            .and_then(VmValue::as_int)
1691            .unwrap_or(200);
1692        let headers = options
1693            .get("headers")
1694            .and_then(VmValue::as_dict)
1695            .cloned()
1696            .unwrap_or_default();
1697        Ok(response_with_kind(status, headers, body, "text"))
1698    });
1699
1700    vm.register_builtin("http_response_json", |args, _out| {
1701        let body = args
1702            .first()
1703            .map(crate::stdlib::json::vm_value_to_json)
1704            .map(vm_string)
1705            .unwrap_or_else(|| vm_string("null"));
1706        let options = get_options_arg(args, 1);
1707        let status = options
1708            .get("status")
1709            .and_then(VmValue::as_int)
1710            .unwrap_or(200);
1711        let headers = options
1712            .get("headers")
1713            .and_then(VmValue::as_dict)
1714            .cloned()
1715            .unwrap_or_default();
1716        Ok(response_with_kind(status, headers, body, "json"))
1717    });
1718
1719    vm.register_builtin("http_response_bytes", |args, _out| {
1720        let body = match args.first() {
1721            Some(VmValue::Bytes(bytes)) => VmValue::Bytes(bytes.clone()),
1722            Some(value) => VmValue::Bytes(Rc::new(value.display().into_bytes())),
1723            None => VmValue::Bytes(Rc::new(Vec::new())),
1724        };
1725        let options = get_options_arg(args, 1);
1726        let status = options
1727            .get("status")
1728            .and_then(VmValue::as_int)
1729            .unwrap_or(200);
1730        let headers = options
1731            .get("headers")
1732            .and_then(VmValue::as_dict)
1733            .cloned()
1734            .unwrap_or_default();
1735        Ok(response_with_kind(status, headers, body, "bytes"))
1736    });
1737
1738    vm.register_builtin("http_header", |args, _out| {
1739        if args.len() < 2 {
1740            return Err(vm_error(
1741                "http_header: requires headers/request/response and name",
1742            ));
1743        }
1744        let headers = headers_from_value(&args[0]);
1745        Ok(header_lookup_value(&headers, &args[1].display()))
1746    });
1747
1748    // --- Mock HTTP builtins ---
1749
1750    // http_mock(method, url_pattern, response) -> nil
1751    //
1752    // Calling http_mock again with the same (method, url_pattern) tuple
1753    // *replaces* the prior mock for that target — tests can override a
1754    // per-case response without first calling http_mock_clear().
1755    vm.register_builtin("http_mock", |args, _out| {
1756        let method = args.first().map(|a| a.display()).unwrap_or_default();
1757        let url_pattern = args.get(1).map(|a| a.display()).unwrap_or_default();
1758        let response = args
1759            .get(2)
1760            .and_then(|a| a.as_dict())
1761            .cloned()
1762            .unwrap_or_default();
1763        let responses = parse_mock_responses(&response);
1764
1765        HTTP_MOCKS.with(|mocks| {
1766            let mut mocks = mocks.borrow_mut();
1767            mocks.retain(|mock| !(mock.method == method && mock.url_pattern == url_pattern));
1768            mocks.push(HttpMock {
1769                method,
1770                url_pattern,
1771                responses,
1772                next_response: 0,
1773            });
1774        });
1775        Ok(VmValue::Nil)
1776    });
1777
1778    // http_mock_clear() -> nil
1779    vm.register_builtin("http_mock_clear", |_args, _out| {
1780        HTTP_MOCKS.with(|mocks| mocks.borrow_mut().clear());
1781        HTTP_MOCK_CALLS.with(|calls| calls.borrow_mut().clear());
1782        HTTP_STREAMS.with(|streams| streams.borrow_mut().clear());
1783        Ok(VmValue::Nil)
1784    });
1785
1786    // http_mock_calls() -> list of {method, url, headers, body}
1787    vm.register_builtin("http_mock_calls", |_args, _out| {
1788        let calls = HTTP_MOCK_CALLS.with(|calls| calls.borrow().clone());
1789        let result: Vec<VmValue> = calls
1790            .iter()
1791            .map(|c| {
1792                let mut dict = BTreeMap::new();
1793                dict.insert(
1794                    "method".to_string(),
1795                    VmValue::String(Rc::from(c.method.as_str())),
1796                );
1797                dict.insert("url".to_string(), VmValue::String(Rc::from(c.url.as_str())));
1798                dict.insert(
1799                    "headers".to_string(),
1800                    VmValue::Dict(Rc::new(c.headers.clone())),
1801                );
1802                dict.insert(
1803                    "body".to_string(),
1804                    match &c.body {
1805                        Some(b) => VmValue::String(Rc::from(b.as_str())),
1806                        None => VmValue::Nil,
1807                    },
1808                );
1809                VmValue::Dict(Rc::new(dict))
1810            })
1811            .collect();
1812        Ok(VmValue::List(Rc::new(result)))
1813    });
1814
1815    vm.register_async_builtin("http_request", |args| async move {
1816        let method = args
1817            .first()
1818            .map(|a| a.display())
1819            .unwrap_or_default()
1820            .to_uppercase();
1821        if method.is_empty() {
1822            return Err(VmError::Thrown(VmValue::String(Rc::from(
1823                "http_request: method is required",
1824            ))));
1825        }
1826        let url = args.get(1).map(|a| a.display()).unwrap_or_default();
1827        if url.is_empty() {
1828            return Err(VmError::Thrown(VmValue::String(Rc::from(
1829                "http_request: URL is required",
1830            ))));
1831        }
1832        let options = match args.get(2) {
1833            Some(VmValue::Dict(d)) => (**d).clone(),
1834            _ => BTreeMap::new(),
1835        };
1836        vm_execute_http_request(&method, &url, &options).await
1837    });
1838
1839    vm.register_async_builtin("http_download", |args| async move {
1840        let url = args.first().map(|a| a.display()).unwrap_or_default();
1841        if url.is_empty() {
1842            return Err(vm_error("http_download: URL is required"));
1843        }
1844        let dst_path = args.get(1).map(|a| a.display()).unwrap_or_default();
1845        if dst_path.is_empty() {
1846            return Err(vm_error("http_download: destination path is required"));
1847        }
1848        let options = get_options_arg(&args, 2);
1849        vm_http_download(&url, &dst_path, &options).await
1850    });
1851
1852    vm.register_async_builtin("http_stream_open", |args| async move {
1853        let url = args.first().map(|a| a.display()).unwrap_or_default();
1854        if url.is_empty() {
1855            return Err(vm_error("http_stream_open: URL is required"));
1856        }
1857        let options = get_options_arg(&args, 1);
1858        vm_http_stream_open(&url, &options).await
1859    });
1860
1861    vm.register_async_builtin("http_stream_read", |args| async move {
1862        let Some(handle) = args.first() else {
1863            return Err(vm_error("http_stream_read: requires a stream handle"));
1864        };
1865        let stream_id = handle_from_value(handle, "http_stream_read")?;
1866        let max_bytes = args
1867            .get(1)
1868            .and_then(|value| value.as_int())
1869            .map(|value| value.max(1) as usize)
1870            .unwrap_or(DEFAULT_MAX_MESSAGE_BYTES);
1871        vm_http_stream_read(&stream_id, max_bytes).await
1872    });
1873
1874    vm.register_builtin("http_stream_info", |args, _out| {
1875        let Some(handle) = args.first() else {
1876            return Err(vm_error("http_stream_info: requires a stream handle"));
1877        };
1878        let stream_id = handle_from_value(handle, "http_stream_info")?;
1879        vm_http_stream_info(&stream_id)
1880    });
1881
1882    vm.register_builtin("http_stream_close", |args, _out| {
1883        let Some(handle) = args.first() else {
1884            return Err(vm_error("http_stream_close: requires a stream handle"));
1885        };
1886        let stream_id = handle_from_value(handle, "http_stream_close")?;
1887        let removed = HTTP_STREAMS.with(|streams| streams.borrow_mut().remove(&stream_id));
1888        Ok(VmValue::Bool(removed.is_some()))
1889    });
1890
1891    vm.register_builtin("http_session", |args, _out| {
1892        let options = get_options_arg(args, 0);
1893        let config = parse_http_options(&options);
1894        let client = build_http_client(&config)?;
1895        let id = next_transport_handle("http-session");
1896        HTTP_SESSIONS.with(|sessions| {
1897            let mut sessions = sessions.borrow_mut();
1898            if sessions.len() >= MAX_HTTP_SESSIONS {
1899                return Err(vm_error(format!(
1900                    "http_session: maximum open sessions ({MAX_HTTP_SESSIONS}) reached"
1901                )));
1902            }
1903            sessions.insert(id.clone(), HttpSession { client, options });
1904            Ok(())
1905        })?;
1906        Ok(VmValue::String(Rc::from(id)))
1907    });
1908
1909    vm.register_async_builtin("http_session_request", |args| async move {
1910        if args.len() < 3 {
1911            return Err(vm_error(
1912                "http_session_request: requires session, method, and URL",
1913            ));
1914        }
1915        let session_id = handle_from_value(&args[0], "http_session_request")?;
1916        let method = args[1].display().to_uppercase();
1917        if method.is_empty() {
1918            return Err(vm_error("http_session_request: method is required"));
1919        }
1920        let url = args[2].display();
1921        if url.is_empty() {
1922            return Err(vm_error("http_session_request: URL is required"));
1923        }
1924        let options = get_options_arg(&args, 3);
1925        vm_execute_http_session_request(&session_id, &method, &url, &options).await
1926    });
1927
1928    vm.register_builtin("http_session_close", |args, _out| {
1929        let Some(handle) = args.first() else {
1930            return Err(vm_error("http_session_close: requires a session handle"));
1931        };
1932        let session_id = handle_from_value(handle, "http_session_close")?;
1933        let removed = HTTP_SESSIONS.with(|sessions| sessions.borrow_mut().remove(&session_id));
1934        Ok(VmValue::Bool(removed.is_some()))
1935    });
1936
1937    vm.register_builtin("sse_mock", |args, _out| {
1938        let url_pattern = args.first().map(|arg| arg.display()).unwrap_or_default();
1939        if url_pattern.is_empty() {
1940            return Err(vm_error("sse_mock: URL pattern is required"));
1941        }
1942        let events = parse_mock_stream_events(args.get(1));
1943        SSE_MOCKS.with(|mocks| {
1944            mocks.borrow_mut().push(SseMock {
1945                url_pattern,
1946                events,
1947            });
1948        });
1949        Ok(VmValue::Nil)
1950    });
1951
1952    vm.register_async_builtin("sse_connect", |args| async move {
1953        let method = args
1954            .first()
1955            .map(|arg| arg.display())
1956            .filter(|method| !method.is_empty())
1957            .unwrap_or_else(|| "GET".to_string())
1958            .to_uppercase();
1959        let url = args.get(1).map(|arg| arg.display()).unwrap_or_default();
1960        if url.is_empty() {
1961            return Err(vm_error("sse_connect: URL is required"));
1962        }
1963        let options = get_options_arg(&args, 2);
1964        vm_sse_connect(&method, &url, &options).await
1965    });
1966
1967    vm.register_async_builtin("sse_receive", |args| async move {
1968        let Some(handle) = args.first() else {
1969            return Err(vm_error("sse_receive: requires a stream handle"));
1970        };
1971        let stream_id = handle_from_value(handle, "sse_receive")?;
1972        let timeout_ms = receive_timeout_arg(&args, 1);
1973        vm_sse_receive(&stream_id, timeout_ms).await
1974    });
1975
1976    vm.register_builtin("sse_event", |args, _out| {
1977        let Some(event) = args.first() else {
1978            return Err(vm_error("sse_event: requires event data or an event dict"));
1979        };
1980        let options = get_options_arg(args, 1);
1981        Ok(VmValue::String(Rc::from(vm_sse_event_frame(
1982            event, &options,
1983        )?)))
1984    });
1985
1986    vm.register_builtin("sse_server_response", |args, _out| {
1987        let options = get_options_arg(args, 0);
1988        vm_sse_server_response(&options)
1989    });
1990
1991    vm.register_builtin("sse_server_send", |args, _out| {
1992        if args.len() < 2 {
1993            return Err(vm_error("sse_server_send: requires stream and event"));
1994        }
1995        let stream_id = handle_from_value(&args[0], "sse_server_send")?;
1996        let options = get_options_arg(args, 2);
1997        vm_sse_server_send(&stream_id, &args[1], &options)
1998    });
1999
2000    vm.register_builtin("sse_server_heartbeat", |args, _out| {
2001        let Some(handle) = args.first() else {
2002            return Err(vm_error("sse_server_heartbeat: requires a stream handle"));
2003        };
2004        let stream_id = handle_from_value(handle, "sse_server_heartbeat")?;
2005        vm_sse_server_heartbeat(&stream_id, args.get(1))
2006    });
2007
2008    vm.register_builtin("sse_server_flush", |args, _out| {
2009        let Some(handle) = args.first() else {
2010            return Err(vm_error("sse_server_flush: requires a stream handle"));
2011        };
2012        let stream_id = handle_from_value(handle, "sse_server_flush")?;
2013        vm_sse_server_flush(&stream_id)
2014    });
2015
2016    vm.register_builtin("sse_server_close", |args, _out| {
2017        let Some(handle) = args.first() else {
2018            return Err(vm_error("sse_server_close: requires a stream handle"));
2019        };
2020        let stream_id = handle_from_value(handle, "sse_server_close")?;
2021        vm_sse_server_close(&stream_id)
2022    });
2023
2024    vm.register_builtin("sse_server_cancel", |args, _out| {
2025        let Some(handle) = args.first() else {
2026            return Err(vm_error("sse_server_cancel: requires a stream handle"));
2027        };
2028        let stream_id = handle_from_value(handle, "sse_server_cancel")?;
2029        vm_sse_server_cancel(&stream_id, args.get(1))
2030    });
2031
2032    vm.register_builtin("sse_server_status", |args, _out| {
2033        let Some(handle) = args.first() else {
2034            return Err(vm_error("sse_server_status: requires a stream handle"));
2035        };
2036        let stream_id = handle_from_value(handle, "sse_server_status")?;
2037        vm_sse_server_status(&stream_id)
2038    });
2039
2040    vm.register_builtin("sse_server_disconnected", |args, _out| {
2041        let Some(handle) = args.first() else {
2042            return Err(vm_error(
2043                "sse_server_disconnected: requires a stream handle",
2044            ));
2045        };
2046        let stream_id = handle_from_value(handle, "sse_server_disconnected")?;
2047        vm_sse_server_observed_bool(&stream_id, "sse_server_disconnected", |handle| {
2048            handle.disconnected
2049        })
2050    });
2051
2052    vm.register_builtin("sse_server_cancelled", |args, _out| {
2053        let Some(handle) = args.first() else {
2054            return Err(vm_error("sse_server_cancelled: requires a stream handle"));
2055        };
2056        let stream_id = handle_from_value(handle, "sse_server_cancelled")?;
2057        vm_sse_server_observed_bool(&stream_id, "sse_server_cancelled", |handle| {
2058            handle.cancelled
2059        })
2060    });
2061
2062    vm.register_builtin("sse_server_mock_receive", |args, _out| {
2063        let Some(handle) = args.first() else {
2064            return Err(vm_error(
2065                "sse_server_mock_receive: requires a stream handle",
2066            ));
2067        };
2068        let stream_id = handle_from_value(handle, "sse_server_mock_receive")?;
2069        vm_sse_server_mock_receive(&stream_id)
2070    });
2071
2072    vm.register_builtin("sse_server_mock_disconnect", |args, _out| {
2073        let Some(handle) = args.first() else {
2074            return Err(vm_error(
2075                "sse_server_mock_disconnect: requires a stream handle",
2076            ));
2077        };
2078        let stream_id = handle_from_value(handle, "sse_server_mock_disconnect")?;
2079        vm_sse_server_mock_disconnect(&stream_id)
2080    });
2081
2082    vm.register_builtin("sse_close", |args, _out| {
2083        let Some(handle) = args.first() else {
2084            return Err(vm_error("sse_close: requires a stream handle"));
2085        };
2086        let stream_id = handle_from_value(handle, "sse_close")?;
2087        let removed = SSE_HANDLES.with(|handles| {
2088            let mut handles = handles.borrow_mut();
2089            let removed = handles.remove(&stream_id);
2090            if let Some(handle) = &removed {
2091                if let SseHandleKind::Real(stream) = &handle.kind {
2092                    if let Ok(mut stream) = stream.try_lock() {
2093                        stream.close();
2094                    }
2095                }
2096            }
2097            removed
2098        });
2099        Ok(VmValue::Bool(removed.is_some()))
2100    });
2101
2102    vm.register_builtin("websocket_mock", |args, _out| {
2103        let url_pattern = args.first().map(|arg| arg.display()).unwrap_or_default();
2104        if url_pattern.is_empty() {
2105            return Err(vm_error("websocket_mock: URL pattern is required"));
2106        }
2107        let (messages, echo) = parse_websocket_mock(args.get(1));
2108        WEBSOCKET_MOCKS.with(|mocks| {
2109            mocks.borrow_mut().push(WebSocketMock {
2110                url_pattern,
2111                messages,
2112                echo,
2113            });
2114        });
2115        Ok(VmValue::Nil)
2116    });
2117
2118    vm.register_async_builtin("websocket_connect", |args| async move {
2119        let url = args.first().map(|arg| arg.display()).unwrap_or_default();
2120        if url.is_empty() {
2121            return Err(vm_error("websocket_connect: URL is required"));
2122        }
2123        let options = get_options_arg(&args, 1);
2124        vm_websocket_connect(&url, &options).await
2125    });
2126
2127    vm.register_builtin("websocket_server", |args, _out| {
2128        let bind = args
2129            .first()
2130            .map(|arg| arg.display())
2131            .filter(|bind| !bind.is_empty())
2132            .unwrap_or_else(|| "127.0.0.1:0".to_string());
2133        let options = get_options_arg(args, 1);
2134        vm_websocket_server(&bind, &options)
2135    });
2136
2137    vm.register_builtin("websocket_route", |args, _out| {
2138        if args.len() < 2 {
2139            return Err(vm_error(
2140                "websocket_route: requires server handle and route path",
2141            ));
2142        }
2143        let server_id = handle_from_value(&args[0], "websocket_route")?;
2144        let path = args[1].display();
2145        if path.is_empty() || !path.starts_with('/') {
2146            return Err(vm_error("websocket_route: path must start with '/'"));
2147        }
2148        let options = get_options_arg(args, 2);
2149        vm_websocket_route(&server_id, &path, &options)
2150    });
2151
2152    vm.register_async_builtin("websocket_accept", |args| async move {
2153        let Some(handle) = args.first() else {
2154            return Err(vm_error("websocket_accept: requires a server handle"));
2155        };
2156        let server_id = handle_from_value(handle, "websocket_accept")?;
2157        let timeout_ms = receive_timeout_arg(&args, 1);
2158        vm_websocket_accept(&server_id, timeout_ms).await
2159    });
2160
2161    vm.register_async_builtin("websocket_send", |args| async move {
2162        if args.len() < 2 {
2163            return Err(vm_error(
2164                "websocket_send: requires socket handle and message",
2165            ));
2166        }
2167        let socket_id = handle_from_value(&args[0], "websocket_send")?;
2168        let message = args[1].clone();
2169        let options = get_options_arg(&args, 2);
2170        vm_websocket_send(&socket_id, message, &options).await
2171    });
2172
2173    vm.register_async_builtin("websocket_receive", |args| async move {
2174        let Some(handle) = args.first() else {
2175            return Err(vm_error("websocket_receive: requires a socket handle"));
2176        };
2177        let socket_id = handle_from_value(handle, "websocket_receive")?;
2178        let timeout_ms = receive_timeout_arg(&args, 1);
2179        vm_websocket_receive(&socket_id, timeout_ms).await
2180    });
2181
2182    vm.register_async_builtin("websocket_close", |args| async move {
2183        let Some(handle) = args.first() else {
2184            return Err(vm_error("websocket_close: requires a socket handle"));
2185        };
2186        let socket_id = handle_from_value(handle, "websocket_close")?;
2187        vm_websocket_close(&socket_id).await
2188    });
2189
2190    vm.register_builtin("websocket_server_close", |args, _out| {
2191        let Some(handle) = args.first() else {
2192            return Err(vm_error("websocket_server_close: requires a server handle"));
2193        };
2194        let server_id = handle_from_value(handle, "websocket_server_close")?;
2195        vm_websocket_server_close(&server_id)
2196    });
2197
2198    vm.register_builtin("transport_mock_clear", |_args, _out| {
2199        HTTP_STREAMS.with(|streams| streams.borrow_mut().clear());
2200        SSE_MOCKS.with(|mocks| mocks.borrow_mut().clear());
2201        SSE_HANDLES.with(|handles| handles.borrow_mut().clear());
2202        WEBSOCKET_MOCKS.with(|mocks| mocks.borrow_mut().clear());
2203        WEBSOCKET_HANDLES.with(|handles| handles.borrow_mut().clear());
2204        WEBSOCKET_SERVERS.with(|servers| {
2205            let mut servers = servers.borrow_mut();
2206            for server in servers.values() {
2207                server.running.store(false, Ordering::SeqCst);
2208                let _ = TcpStream::connect(&server.addr);
2209            }
2210            servers.clear();
2211        });
2212        TRANSPORT_MOCK_CALLS.with(|calls| calls.borrow_mut().clear());
2213        Ok(VmValue::Nil)
2214    });
2215
2216    vm.register_builtin("transport_mock_calls", |_args, _out| {
2217        let calls = TRANSPORT_MOCK_CALLS.with(|calls| calls.borrow().clone());
2218        let values = calls
2219            .iter()
2220            .map(transport_mock_call_value)
2221            .collect::<Vec<_>>();
2222        Ok(VmValue::List(Rc::new(values)))
2223    });
2224}
2225
2226fn http_server_tls_config_value(
2227    mode: &str,
2228    terminate_tls: bool,
2229    scheme: &str,
2230    hsts: bool,
2231    extra: BTreeMap<String, VmValue>,
2232) -> VmValue {
2233    let mut dict = BTreeMap::new();
2234    dict.insert("mode".to_string(), VmValue::String(Rc::from(mode)));
2235    dict.insert("terminate_tls".to_string(), VmValue::Bool(terminate_tls));
2236    dict.insert("scheme".to_string(), VmValue::String(Rc::from(scheme)));
2237    dict.insert("hsts".to_string(), VmValue::Bool(hsts));
2238    for (key, value) in extra {
2239        dict.insert(key, value);
2240    }
2241    VmValue::Dict(Rc::new(dict))
2242}
2243
2244fn hsts_options(options: &BTreeMap<String, VmValue>) -> BTreeMap<String, VmValue> {
2245    let mut hsts = BTreeMap::new();
2246    hsts.insert(
2247        "hsts_max_age_seconds".to_string(),
2248        VmValue::Int(vm_get_int_option(
2249            options,
2250            "hsts_max_age_seconds",
2251            31_536_000,
2252        )),
2253    );
2254    hsts.insert(
2255        "hsts_include_subdomains".to_string(),
2256        VmValue::Bool(vm_get_bool_option(
2257            options,
2258            "hsts_include_subdomains",
2259            false,
2260        )),
2261    );
2262    hsts.insert(
2263        "hsts_preload".to_string(),
2264        VmValue::Bool(vm_get_bool_option(options, "hsts_preload", false)),
2265    );
2266    hsts
2267}
2268
2269fn http_server_security_headers(config: &BTreeMap<String, VmValue>) -> BTreeMap<String, VmValue> {
2270    let hsts_enabled = vm_get_bool_option(config, "hsts", false);
2271    if !hsts_enabled {
2272        return BTreeMap::new();
2273    }
2274    let mut value = format!(
2275        "max-age={}",
2276        vm_get_int_option(config, "hsts_max_age_seconds", 31_536_000).max(0)
2277    );
2278    if vm_get_bool_option(config, "hsts_include_subdomains", false) {
2279        value.push_str("; includeSubDomains");
2280    }
2281    if vm_get_bool_option(config, "hsts_preload", false) {
2282        value.push_str("; preload");
2283    }
2284    BTreeMap::from([(
2285        "strict-transport-security".to_string(),
2286        VmValue::String(Rc::from(value)),
2287    )])
2288}
2289
2290fn tls_hosts_arg(value: Option<&VmValue>) -> Result<Vec<String>, VmError> {
2291    match value {
2292        None | Some(VmValue::Nil) => Ok(vec!["localhost".to_string(), "127.0.0.1".to_string()]),
2293        Some(VmValue::List(hosts)) => {
2294            let mut parsed = Vec::new();
2295            for host in hosts.iter() {
2296                let host = host.display();
2297                if host.is_empty() {
2298                    return Err(vm_error(
2299                        "http_server_tls_self_signed_dev: host names must be non-empty",
2300                    ));
2301                }
2302                parsed.push(host);
2303            }
2304            if parsed.is_empty() {
2305                return Err(vm_error(
2306                    "http_server_tls_self_signed_dev: host list must not be empty",
2307                ));
2308            }
2309            Ok(parsed)
2310        }
2311        Some(other) => {
2312            let host = other.display();
2313            if host.is_empty() {
2314                return Err(vm_error(
2315                    "http_server_tls_self_signed_dev: host name must be non-empty",
2316                ));
2317            }
2318            Ok(vec![host])
2319        }
2320    }
2321}
2322
2323fn vm_get_int_option(options: &BTreeMap<String, VmValue>, key: &str, default: i64) -> i64 {
2324    options.get(key).and_then(|v| v.as_int()).unwrap_or(default)
2325}
2326
2327fn vm_get_bool_option(options: &BTreeMap<String, VmValue>, key: &str, default: bool) -> bool {
2328    match options.get(key) {
2329        Some(VmValue::Bool(b)) => *b,
2330        _ => default,
2331    }
2332}
2333
2334fn vm_get_int_option_prefer(
2335    options: &BTreeMap<String, VmValue>,
2336    canonical: &str,
2337    alias: &str,
2338    default: i64,
2339) -> i64 {
2340    options
2341        .get(canonical)
2342        .and_then(|value| value.as_int())
2343        .or_else(|| options.get(alias).and_then(|value| value.as_int()))
2344        .unwrap_or(default)
2345}
2346
2347fn vm_get_optional_int_option(options: &BTreeMap<String, VmValue>, key: &str) -> Option<u64> {
2348    options
2349        .get(key)
2350        .and_then(|value| value.as_int())
2351        .map(|value| value.max(0) as u64)
2352}
2353
2354fn string_option(options: &BTreeMap<String, VmValue>, key: &str) -> Option<String> {
2355    options
2356        .get(key)
2357        .map(|value| value.display())
2358        .filter(|value| !value.is_empty())
2359}
2360
2361fn parse_proxy_config(options: &BTreeMap<String, VmValue>) -> Option<HttpProxyConfig> {
2362    let proxy = options.get("proxy")?;
2363    let (url, no_proxy) = match proxy {
2364        VmValue::Dict(dict) => (
2365            dict.get("url")
2366                .map(|value| value.display())
2367                .filter(|value| !value.is_empty())?,
2368            dict.get("no_proxy")
2369                .map(|value| value.display())
2370                .filter(|value| !value.is_empty()),
2371        ),
2372        other => (other.display(), None),
2373    };
2374    if url.is_empty() {
2375        return None;
2376    }
2377    let auth = options
2378        .get("proxy_auth")
2379        .and_then(|value| value.as_dict())
2380        .map(|dict| {
2381            (
2382                dict.get("user")
2383                    .map(|value| value.display())
2384                    .unwrap_or_default(),
2385                dict.get("pass")
2386                    .or_else(|| dict.get("password"))
2387                    .map(|value| value.display())
2388                    .unwrap_or_default(),
2389            )
2390        })
2391        .filter(|(user, _)| !user.is_empty());
2392    Some(HttpProxyConfig {
2393        url,
2394        auth,
2395        no_proxy,
2396    })
2397}
2398
2399fn parse_tls_config(options: &BTreeMap<String, VmValue>) -> HttpTlsConfig {
2400    let Some(tls) = options.get("tls").and_then(|value| value.as_dict()) else {
2401        return HttpTlsConfig::default();
2402    };
2403    let pinned_sha256 = match tls.get("pinned_sha256") {
2404        Some(VmValue::List(values)) => values
2405            .iter()
2406            .map(|value| value.display())
2407            .filter(|value| !value.is_empty())
2408            .collect(),
2409        Some(value) => {
2410            let value = value.display();
2411            if value.is_empty() {
2412                Vec::new()
2413            } else {
2414                vec![value]
2415            }
2416        }
2417        None => Vec::new(),
2418    };
2419    HttpTlsConfig {
2420        ca_bundle_path: string_option(tls, "ca_bundle_path"),
2421        client_cert_path: string_option(tls, "client_cert_path"),
2422        client_key_path: string_option(tls, "client_key_path"),
2423        client_identity_path: string_option(tls, "client_identity_path"),
2424        pinned_sha256,
2425    }
2426}
2427
2428fn parse_retry_statuses(options: &BTreeMap<String, VmValue>) -> Vec<u16> {
2429    match options.get("retry_on") {
2430        Some(VmValue::List(values)) => {
2431            let statuses: Vec<u16> = values
2432                .iter()
2433                .filter_map(|value| value.as_int())
2434                .filter(|status| (0..=u16::MAX as i64).contains(status))
2435                .map(|status| status as u16)
2436                .collect();
2437            if statuses.is_empty() {
2438                DEFAULT_RETRYABLE_STATUSES.to_vec()
2439            } else {
2440                statuses
2441            }
2442        }
2443        _ => DEFAULT_RETRYABLE_STATUSES.to_vec(),
2444    }
2445}
2446
2447fn parse_retry_methods(options: &BTreeMap<String, VmValue>) -> Vec<String> {
2448    match options.get("retry_methods") {
2449        Some(VmValue::List(values)) => {
2450            let methods: Vec<String> = values
2451                .iter()
2452                .map(|value| value.display().trim().to_ascii_uppercase())
2453                .filter(|value| !value.is_empty())
2454                .collect();
2455            if methods.is_empty() {
2456                DEFAULT_RETRYABLE_METHODS
2457                    .iter()
2458                    .map(|method| (*method).to_string())
2459                    .collect()
2460            } else {
2461                methods
2462            }
2463        }
2464        _ => DEFAULT_RETRYABLE_METHODS
2465            .iter()
2466            .map(|method| (*method).to_string())
2467            .collect(),
2468    }
2469}
2470
2471fn parse_http_options(options: &BTreeMap<String, VmValue>) -> HttpRequestConfig {
2472    let total_timeout_ms = vm_get_int_option(options, "total_timeout_ms", -1);
2473    let total_timeout_ms = if total_timeout_ms >= 0 {
2474        total_timeout_ms as u64
2475    } else {
2476        vm_get_int_option_prefer(options, "timeout_ms", "timeout", DEFAULT_TIMEOUT_MS as i64).max(0)
2477            as u64
2478    };
2479    let retry_options = options.get("retry").and_then(|value| value.as_dict());
2480    let retry_max = retry_options
2481        .and_then(|retry| retry.get("max"))
2482        .and_then(|value| value.as_int())
2483        .unwrap_or_else(|| vm_get_int_option(options, "retries", 0))
2484        .max(0) as u32;
2485    let retry_backoff_ms = retry_options
2486        .and_then(|retry| retry.get("backoff_ms"))
2487        .and_then(|value| value.as_int())
2488        .unwrap_or_else(|| vm_get_int_option(options, "backoff", DEFAULT_BACKOFF_MS as i64))
2489        .max(0) as u64;
2490    let respect_retry_after = vm_get_bool_option(options, "respect_retry_after", true);
2491    let follow_redirects = vm_get_bool_option(options, "follow_redirects", true);
2492    let max_redirects = vm_get_int_option(options, "max_redirects", 10).max(0) as usize;
2493
2494    HttpRequestConfig {
2495        total_timeout_ms,
2496        connect_timeout_ms: vm_get_optional_int_option(options, "connect_timeout_ms"),
2497        read_timeout_ms: vm_get_optional_int_option(options, "read_timeout_ms"),
2498        retry: RetryConfig {
2499            max: retry_max,
2500            backoff_ms: retry_backoff_ms,
2501            retryable_statuses: parse_retry_statuses(options),
2502            retryable_methods: parse_retry_methods(options),
2503            respect_retry_after,
2504        },
2505        follow_redirects,
2506        max_redirects,
2507        proxy: parse_proxy_config(options),
2508        tls: parse_tls_config(options),
2509        decompress: vm_get_bool_option(options, "decompress", true),
2510    }
2511}
2512
2513fn http_client_key(config: &HttpRequestConfig) -> String {
2514    format!(
2515        "follow_redirects={};max_redirects={};connect_timeout={:?};read_timeout={:?};proxy={};proxy_auth={};no_proxy={};ca={};client_cert={};client_key={};identity={};pins={};decompress={}",
2516        config.follow_redirects,
2517        config.max_redirects,
2518        config.connect_timeout_ms,
2519        config.read_timeout_ms,
2520        config
2521            .proxy
2522            .as_ref()
2523            .map(|proxy| proxy.url.as_str())
2524            .unwrap_or(""),
2525        config
2526            .proxy
2527            .as_ref()
2528            .and_then(|proxy| proxy.auth.as_ref())
2529            .map(|(user, _)| user.as_str())
2530            .unwrap_or(""),
2531        config
2532            .proxy
2533            .as_ref()
2534            .and_then(|proxy| proxy.no_proxy.as_deref())
2535            .unwrap_or(""),
2536        config.tls.ca_bundle_path.as_deref().unwrap_or(""),
2537        config.tls.client_cert_path.as_deref().unwrap_or(""),
2538        config.tls.client_key_path.as_deref().unwrap_or(""),
2539        config.tls.client_identity_path.as_deref().unwrap_or(""),
2540        config.tls.pinned_sha256.join(","),
2541        config.decompress,
2542    )
2543}
2544
2545fn build_http_client(config: &HttpRequestConfig) -> Result<reqwest::Client, VmError> {
2546    let redirect_policy = if config.follow_redirects {
2547        let max_redirects = config.max_redirects;
2548        reqwest::redirect::Policy::custom(move |attempt| {
2549            if attempt.previous().len() >= max_redirects {
2550                attempt.error("too many redirects")
2551            } else if crate::egress::redirect_url_allowed("http_redirect", attempt.url().as_str()) {
2552                attempt.follow()
2553            } else {
2554                attempt.error("egress policy blocked redirect target")
2555            }
2556        })
2557    } else {
2558        reqwest::redirect::Policy::none()
2559    };
2560
2561    let mut builder = reqwest::Client::builder().redirect(redirect_policy);
2562    if let Some(ms) = config.connect_timeout_ms {
2563        builder = builder.connect_timeout(Duration::from_millis(ms));
2564    }
2565    if let Some(ms) = config.read_timeout_ms {
2566        builder = builder.read_timeout(Duration::from_millis(ms));
2567    }
2568    if !config.decompress {
2569        builder = builder.no_gzip().no_brotli().no_deflate().no_zstd();
2570    }
2571    if let Some(proxy_config) = &config.proxy {
2572        let mut proxy = reqwest::Proxy::all(&proxy_config.url)
2573            .map_err(|e| vm_error(format!("http: invalid proxy '{}': {e}", proxy_config.url)))?;
2574        if let Some((user, pass)) = &proxy_config.auth {
2575            proxy = proxy.basic_auth(user, pass);
2576        }
2577        if let Some(no_proxy) = &proxy_config.no_proxy {
2578            proxy = proxy.no_proxy(reqwest::NoProxy::from_string(no_proxy));
2579        }
2580        builder = builder.proxy(proxy);
2581    }
2582    builder = configure_tls(builder, &config.tls)?;
2583    builder
2584        .build()
2585        .map_err(|e| vm_error(format!("http: failed to build client: {e}")))
2586}
2587
2588fn configure_tls(
2589    mut builder: reqwest::ClientBuilder,
2590    tls: &HttpTlsConfig,
2591) -> Result<reqwest::ClientBuilder, VmError> {
2592    if let Some(path) = &tls.ca_bundle_path {
2593        let resolved = resolve_http_path("http tls", path, crate::stdlib::sandbox::FsAccess::Read)?;
2594        let bytes = std::fs::read(&resolved).map_err(|error| {
2595            vm_error(format!(
2596                "http: failed to read CA bundle {}: {error}",
2597                resolved.display()
2598            ))
2599        })?;
2600        match reqwest::Certificate::from_pem_bundle(&bytes) {
2601            Ok(certs) => {
2602                for cert in certs {
2603                    builder = builder.add_root_certificate(cert);
2604                }
2605            }
2606            Err(pem_error) => {
2607                let cert = reqwest::Certificate::from_der(&bytes).map_err(|der_error| {
2608                    vm_error(format!(
2609                        "http: failed to parse CA bundle {} as PEM ({pem_error}) or DER ({der_error})",
2610                        resolved.display()
2611                    ))
2612                })?;
2613                builder = builder.add_root_certificate(cert);
2614            }
2615        }
2616    }
2617
2618    if let Some(path) = &tls.client_identity_path {
2619        let resolved = resolve_http_path("http tls", path, crate::stdlib::sandbox::FsAccess::Read)?;
2620        let bytes = std::fs::read(&resolved).map_err(|error| {
2621            vm_error(format!(
2622                "http: failed to read client identity {}: {error}",
2623                resolved.display()
2624            ))
2625        })?;
2626        let identity = reqwest::Identity::from_pem(&bytes).map_err(|error| {
2627            vm_error(format!(
2628                "http: failed to parse client identity {}: {error}",
2629                resolved.display()
2630            ))
2631        })?;
2632        builder = builder.identity(identity);
2633    } else if let Some(cert_path) = &tls.client_cert_path {
2634        let cert = {
2635            let resolved = resolve_http_path(
2636                "http tls",
2637                cert_path,
2638                crate::stdlib::sandbox::FsAccess::Read,
2639            )?;
2640            std::fs::read(&resolved).map_err(|error| {
2641                vm_error(format!(
2642                    "http: failed to read client certificate {}: {error}",
2643                    resolved.display()
2644                ))
2645            })?
2646        };
2647        let mut identity_pem = cert;
2648        if let Some(key_path) = &tls.client_key_path {
2649            let resolved =
2650                resolve_http_path("http tls", key_path, crate::stdlib::sandbox::FsAccess::Read)?;
2651            let key = std::fs::read(&resolved).map_err(|error| {
2652                vm_error(format!(
2653                    "http: failed to read client key {}: {error}",
2654                    resolved.display()
2655                ))
2656            })?;
2657            identity_pem.extend_from_slice(b"\n");
2658            identity_pem.extend_from_slice(&key);
2659        }
2660        let identity = reqwest::Identity::from_pem(&identity_pem)
2661            .map_err(|error| vm_error(format!("http: failed to parse client identity: {error}")))?;
2662        builder = builder.identity(identity);
2663    }
2664
2665    if !tls.pinned_sha256.is_empty() {
2666        builder = builder.tls_info(true);
2667    }
2668    Ok(builder)
2669}
2670
2671fn pooled_http_client(config: &HttpRequestConfig) -> Result<reqwest::Client, VmError> {
2672    let key = http_client_key(config);
2673    if let Some(client) = HTTP_CLIENTS.with(|clients| clients.borrow().get(&key).cloned()) {
2674        return Ok(client);
2675    }
2676
2677    let client = build_http_client(config)?;
2678    HTTP_CLIENTS.with(|clients| {
2679        clients.borrow_mut().insert(key, client.clone());
2680    });
2681    Ok(client)
2682}
2683
/// Canonicalises a user-supplied certificate pin for comparison.
///
/// Surrounding whitespace is trimmed, one leading `sha256/` or `sha256:`
/// prefix is removed, and every `:` is dropped. If what remains is non-empty
/// and consists only of ASCII hex digits it is lower-cased; any other value is
/// returned with its case untouched.
fn normalize_pin(value: &str) -> String {
    let body = value.trim();
    let body = ["sha256/", "sha256:"]
        .iter()
        .find_map(|prefix| body.strip_prefix(*prefix))
        .unwrap_or(body);

    let mut compact: String = body.chars().filter(|&ch| ch != ':').collect();
    let looks_like_hex =
        !compact.is_empty() && compact.chars().all(|ch| ch.is_ascii_hexdigit());
    if looks_like_hex {
        compact.make_ascii_lowercase();
    }
    compact
}
2697
2698fn verify_tls_pin(response: &reqwest::Response, pins: &[String]) -> Result<(), VmError> {
2699    if pins.is_empty() {
2700        return Ok(());
2701    }
2702    let Some(info) = response.extensions().get::<reqwest::tls::TlsInfo>() else {
2703        return Err(vm_error(
2704            "http: TLS pinning requested but TLS info is unavailable",
2705        ));
2706    };
2707    let Some(cert_der) = info.peer_certificate() else {
2708        return Err(vm_error(
2709            "http: TLS pinning requested but no peer certificate was presented",
2710        ));
2711    };
2712    let (_, cert) = X509Certificate::from_der(cert_der)
2713        .map_err(|error| vm_error(format!("http: failed to parse peer certificate: {error}")))?;
2714    let digest = Sha256::digest(cert.tbs_certificate.subject_pki.raw);
2715    let hex_pin = hex::encode(digest.as_slice());
2716    let base64_pin = base64::engine::general_purpose::STANDARD.encode(digest);
2717    let base64url_pin = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(digest);
2718    let wanted = pins
2719        .iter()
2720        .map(|pin| normalize_pin(pin))
2721        .collect::<Vec<_>>();
2722    if wanted
2723        .iter()
2724        .any(|pin| pin == &hex_pin || pin == &base64_pin || pin == &base64url_pin)
2725    {
2726        Ok(())
2727    } else {
2728        Err(vm_error("http: TLS SPKI pin mismatch"))
2729    }
2730}
2731
2732fn parse_http_request_parts(
2733    method: &str,
2734    options: &BTreeMap<String, VmValue>,
2735) -> Result<HttpRequestParts, VmError> {
2736    let req_method = method
2737        .parse::<reqwest::Method>()
2738        .map_err(|e| vm_error(format!("http: invalid method '{method}': {e}")))?;
2739
2740    let mut header_map = reqwest::header::HeaderMap::new();
2741    let mut recorded_headers = BTreeMap::new();
2742
2743    if let Some(auth_val) = options.get("auth") {
2744        match auth_val {
2745            VmValue::String(s) => {
2746                let hv = reqwest::header::HeaderValue::from_str(s)
2747                    .map_err(|e| vm_error(format!("http: invalid auth header value: {e}")))?;
2748                header_map.insert(reqwest::header::AUTHORIZATION, hv);
2749                recorded_headers.insert(
2750                    "Authorization".to_string(),
2751                    VmValue::String(Rc::from(s.as_ref())),
2752                );
2753            }
2754            VmValue::Dict(d) => {
2755                if let Some(bearer) = d.get("bearer") {
2756                    let token = bearer.display();
2757                    let authorization = format!("Bearer {token}");
2758                    let hv = reqwest::header::HeaderValue::from_str(&authorization)
2759                        .map_err(|e| vm_error(format!("http: invalid bearer token: {e}")))?;
2760                    header_map.insert(reqwest::header::AUTHORIZATION, hv);
2761                    recorded_headers.insert(
2762                        "Authorization".to_string(),
2763                        VmValue::String(Rc::from(authorization)),
2764                    );
2765                } else if let Some(VmValue::Dict(basic)) = d.get("basic") {
2766                    let user = basic.get("user").map(|v| v.display()).unwrap_or_default();
2767                    let password = basic
2768                        .get("password")
2769                        .map(|v| v.display())
2770                        .unwrap_or_default();
2771                    use base64::Engine;
2772                    let encoded = base64::engine::general_purpose::STANDARD
2773                        .encode(format!("{user}:{password}"));
2774                    let authorization = format!("Basic {encoded}");
2775                    let hv = reqwest::header::HeaderValue::from_str(&authorization)
2776                        .map_err(|e| vm_error(format!("http: invalid basic auth: {e}")))?;
2777                    header_map.insert(reqwest::header::AUTHORIZATION, hv);
2778                    recorded_headers.insert(
2779                        "Authorization".to_string(),
2780                        VmValue::String(Rc::from(authorization)),
2781                    );
2782                }
2783            }
2784            _ => {}
2785        }
2786    }
2787
2788    if let Some(VmValue::Dict(hdrs)) = options.get("headers") {
2789        for (k, v) in hdrs.iter() {
2790            let name = reqwest::header::HeaderName::from_bytes(k.as_bytes())
2791                .map_err(|e| vm_error(format!("http: invalid header name '{k}': {e}")))?;
2792            let val = reqwest::header::HeaderValue::from_str(&v.display())
2793                .map_err(|e| vm_error(format!("http: invalid header value for '{k}': {e}")))?;
2794            header_map.insert(name, val);
2795            recorded_headers.insert(k.clone(), VmValue::String(Rc::from(v.display())));
2796        }
2797    }
2798
2799    let multipart = parse_multipart_request(options)?;
2800    if multipart.is_some() {
2801        if options.contains_key("body") {
2802            return Err(vm_error(
2803                "http: body and multipart options are mutually exclusive",
2804            ));
2805        }
2806        recorded_headers.insert(
2807            "content-type".to_string(),
2808            VmValue::String(Rc::from(format!(
2809                "multipart/form-data; boundary={MULTIPART_MOCK_BOUNDARY}"
2810            ))),
2811        );
2812    }
2813
2814    Ok(HttpRequestParts {
2815        method: req_method,
2816        headers: header_map,
2817        recorded_headers,
2818        body: if multipart.is_some() {
2819            multipart.as_ref().map(|request| request.mock_body.clone())
2820        } else {
2821            options.get("body").map(|v| v.display())
2822        },
2823        multipart,
2824    })
2825}
2826
2827fn session_from_options(options: &BTreeMap<String, VmValue>) -> Option<String> {
2828    options
2829        .get("session")
2830        .and_then(|value| handle_from_value(value, "http_request").ok())
2831}
2832
2833fn parse_mock_stream_event(value: &VmValue) -> MockStreamEvent {
2834    match value {
2835        VmValue::Dict(dict) => MockStreamEvent {
2836            event_type: dict
2837                .get("event")
2838                .or_else(|| dict.get("type"))
2839                .map(|value| value.display())
2840                .filter(|value| !value.is_empty())
2841                .unwrap_or_else(|| "message".to_string()),
2842            data: dict
2843                .get("data")
2844                .map(|value| value.display())
2845                .unwrap_or_default(),
2846            id: dict
2847                .get("id")
2848                .map(|value| value.display())
2849                .filter(|value| !value.is_empty()),
2850            retry_ms: dict.get("retry_ms").and_then(|value| value.as_int()),
2851        },
2852        _ => MockStreamEvent {
2853            event_type: "message".to_string(),
2854            data: value.display(),
2855            id: None,
2856            retry_ms: None,
2857        },
2858    }
2859}
2860
2861fn parse_mock_stream_events(value: Option<&VmValue>) -> Vec<MockStreamEvent> {
2862    let Some(value) = value else {
2863        return Vec::new();
2864    };
2865    match value {
2866        VmValue::Dict(dict) => dict
2867            .get("events")
2868            .and_then(|events| match events {
2869                VmValue::List(items) => Some(items.iter().map(parse_mock_stream_event).collect()),
2870                _ => None,
2871            })
2872            .unwrap_or_default(),
2873        VmValue::List(items) => items.iter().map(parse_mock_stream_event).collect(),
2874        other => vec![parse_mock_stream_event(other)],
2875    }
2876}
2877
2878fn sse_event_value(event: &MockStreamEvent) -> VmValue {
2879    let mut dict = BTreeMap::new();
2880    dict.insert("type".to_string(), VmValue::String(Rc::from("event")));
2881    dict.insert(
2882        "event".to_string(),
2883        VmValue::String(Rc::from(event.event_type.as_str())),
2884    );
2885    dict.insert(
2886        "data".to_string(),
2887        VmValue::String(Rc::from(event.data.as_str())),
2888    );
2889    dict.insert(
2890        "id".to_string(),
2891        event
2892            .id
2893            .as_deref()
2894            .map(|id| VmValue::String(Rc::from(id)))
2895            .unwrap_or(VmValue::Nil),
2896    );
2897    dict.insert(
2898        "retry_ms".to_string(),
2899        event.retry_ms.map(VmValue::Int).unwrap_or(VmValue::Nil),
2900    );
2901    VmValue::Dict(Rc::new(dict))
2902}
2903
2904fn sse_server_response_value(id: &str, handle: &SseServerHandle) -> VmValue {
2905    let mut dict = BTreeMap::new();
2906    dict.insert("id".to_string(), VmValue::String(Rc::from(id)));
2907    dict.insert(
2908        "type".to_string(),
2909        VmValue::String(Rc::from("sse_response")),
2910    );
2911    dict.insert("status".to_string(), VmValue::Int(handle.status));
2912    dict.insert(
2913        "headers".to_string(),
2914        VmValue::Dict(Rc::new(handle.headers.clone())),
2915    );
2916    dict.insert("body".to_string(), VmValue::Nil);
2917    dict.insert("streaming".to_string(), VmValue::Bool(true));
2918    dict.insert(
2919        "max_event_bytes".to_string(),
2920        VmValue::Int(handle.max_event_bytes as i64),
2921    );
2922    dict.insert(
2923        "max_buffered_events".to_string(),
2924        VmValue::Int(handle.max_buffered_events as i64),
2925    );
2926    VmValue::Dict(Rc::new(dict))
2927}
2928
2929fn default_sse_response_headers() -> BTreeMap<String, VmValue> {
2930    BTreeMap::from([
2931        (
2932            "content-type".to_string(),
2933            VmValue::String(Rc::from("text/event-stream; charset=utf-8")),
2934        ),
2935        (
2936            "cache-control".to_string(),
2937            VmValue::String(Rc::from("no-cache")),
2938        ),
2939        (
2940            "connection".to_string(),
2941            VmValue::String(Rc::from("keep-alive")),
2942        ),
2943        (
2944            "x-accel-buffering".to_string(),
2945            VmValue::String(Rc::from("no")),
2946        ),
2947    ])
2948}
2949
2950fn sse_response_headers(options: &BTreeMap<String, VmValue>) -> BTreeMap<String, VmValue> {
2951    let mut headers = default_sse_response_headers();
2952    if let Some(VmValue::Dict(custom)) = options.get("headers") {
2953        for (name, value) in custom.iter() {
2954            headers.retain(|existing, _| !existing.eq_ignore_ascii_case(name));
2955            headers.insert(name.clone(), VmValue::String(Rc::from(value.display())));
2956        }
2957    }
2958    if !headers
2959        .keys()
2960        .any(|name| name.eq_ignore_ascii_case("content-type"))
2961    {
2962        headers.insert(
2963            "content-type".to_string(),
2964            VmValue::String(Rc::from("text/event-stream; charset=utf-8")),
2965        );
2966    }
2967    headers
2968}
2969
2970fn validate_sse_field(field: &str, value: &str) -> Result<(), VmError> {
2971    if value.contains('\n') || value.contains('\r') {
2972        return Err(vm_error(format!(
2973            "sse_event: {field} must not contain newlines"
2974        )));
2975    }
2976    Ok(())
2977}
2978
/// Appends `value` to `frame` as one `field: <line>` entry per line.
///
/// `\r\n` and bare `\r` are treated as `\n`. An empty value still emits a
/// single `field: ` line, which is what splitting the empty string on `\n`
/// yields.
fn push_sse_multiline_field(frame: &mut String, field: &str, value: &str) {
    let unified = value.replace("\r\n", "\n").replace('\r', "\n");
    for line in unified.split('\n') {
        frame.extend([field, ": ", line, "\n"]);
    }
}
2993
2994fn vm_sse_event_frame(
2995    event: &VmValue,
2996    options: &BTreeMap<String, VmValue>,
2997) -> Result<String, VmError> {
2998    let mut frame = String::new();
2999    let mut has_event_payload = false;
3000
3001    match event {
3002        VmValue::Dict(dict) => {
3003            if let Some(comment) = dict.get("comment").or_else(|| options.get("comment")) {
3004                push_sse_comment(&mut frame, &comment.display());
3005            }
3006            if let Some(id) = dict.get("id").or_else(|| options.get("id")) {
3007                let id = id.display();
3008                validate_sse_field("id", &id)?;
3009                frame.push_str("id: ");
3010                frame.push_str(&id);
3011                frame.push('\n');
3012            }
3013            if let Some(event_type) = dict
3014                .get("event")
3015                .or_else(|| dict.get("name"))
3016                .or_else(|| options.get("event"))
3017            {
3018                let event_type = event_type.display();
3019                validate_sse_field("event", &event_type)?;
3020                frame.push_str("event: ");
3021                frame.push_str(&event_type);
3022                frame.push('\n');
3023                has_event_payload = true;
3024            }
3025            if let Some(retry) = dict
3026                .get("retry")
3027                .or_else(|| dict.get("retry_ms"))
3028                .or_else(|| options.get("retry"))
3029                .or_else(|| options.get("retry_ms"))
3030            {
3031                let retry_ms = retry.as_int().ok_or_else(|| {
3032                    vm_error("sse_event: retry/retry_ms must be a non-negative integer")
3033                })?;
3034                if retry_ms < 0 {
3035                    return Err(vm_error(
3036                        "sse_event: retry/retry_ms must be a non-negative integer",
3037                    ));
3038                }
3039                frame.push_str("retry: ");
3040                frame.push_str(&retry_ms.to_string());
3041                frame.push('\n');
3042                has_event_payload = true;
3043            }
3044            if let Some(data) = dict.get("data").or_else(|| options.get("data")) {
3045                push_sse_multiline_field(&mut frame, "data", &data.display());
3046                has_event_payload = true;
3047            } else if !frame.is_empty() && !dict.contains_key("comment") {
3048                push_sse_multiline_field(&mut frame, "data", "");
3049                has_event_payload = true;
3050            }
3051        }
3052        other => {
3053            if let Some(comment) = options.get("comment") {
3054                push_sse_comment(&mut frame, &comment.display());
3055            }
3056            if let Some(id) = options.get("id") {
3057                let id = id.display();
3058                validate_sse_field("id", &id)?;
3059                frame.push_str("id: ");
3060                frame.push_str(&id);
3061                frame.push('\n');
3062            }
3063            if let Some(event_type) = options.get("event") {
3064                let event_type = event_type.display();
3065                validate_sse_field("event", &event_type)?;
3066                frame.push_str("event: ");
3067                frame.push_str(&event_type);
3068                frame.push('\n');
3069            }
3070            if let Some(retry) = options.get("retry").or_else(|| options.get("retry_ms")) {
3071                let retry_ms = retry.as_int().ok_or_else(|| {
3072                    vm_error("sse_event: retry/retry_ms must be a non-negative integer")
3073                })?;
3074                if retry_ms < 0 {
3075                    return Err(vm_error(
3076                        "sse_event: retry/retry_ms must be a non-negative integer",
3077                    ));
3078                }
3079                frame.push_str("retry: ");
3080                frame.push_str(&retry_ms.to_string());
3081                frame.push('\n');
3082            }
3083            push_sse_multiline_field(&mut frame, "data", &other.display());
3084            has_event_payload = true;
3085        }
3086    }
3087
3088    if frame.is_empty() || !has_event_payload && !frame.starts_with(':') {
3089        push_sse_comment(&mut frame, "");
3090    }
3091    frame.push('\n');
3092    Ok(frame)
3093}
3094
/// Appends `comment` to `frame` as SSE comment lines.
///
/// An empty comment is written as a bare `:` line. Otherwise `\r\n` and bare
/// `\r` are treated as `\n` and every resulting line is written as
/// `: <line>`.
fn push_sse_comment(frame: &mut String, comment: &str) {
    // Line-ending normalisation never empties a non-empty string, so the
    // emptiness check can be made on the raw input.
    if comment.is_empty() {
        frame.push_str(":\n");
        return;
    }
    let unified = comment.replace("\r\n", "\n").replace('\r', "\n");
    for line in unified.split('\n') {
        frame.extend([": ", line, "\n"]);
    }
}
3107
3108fn vm_sse_server_response(options: &BTreeMap<String, VmValue>) -> Result<VmValue, VmError> {
3109    let id = next_transport_handle("sse-server");
3110    let status = options
3111        .get("status")
3112        .and_then(|value| value.as_int())
3113        .unwrap_or(200)
3114        .clamp(100, 599);
3115    let handle = SseServerHandle {
3116        status,
3117        headers: sse_response_headers(options),
3118        frames: VecDeque::new(),
3119        max_event_bytes: transport_limit_option(
3120            options,
3121            "max_event_bytes",
3122            DEFAULT_MAX_MESSAGE_BYTES,
3123        )
3124        .max(1),
3125        max_buffered_events: transport_limit_option(
3126            options,
3127            "max_buffered_events",
3128            DEFAULT_MAX_STREAM_EVENTS,
3129        )
3130        .max(1),
3131        sent_events: 0,
3132        flushed_events: 0,
3133        closed: false,
3134        disconnected: false,
3135        cancelled: false,
3136        cancel_reason: None,
3137    };
3138    let value = sse_server_response_value(&id, &handle);
3139    SSE_SERVER_HANDLES.with(|handles| {
3140        let mut handles = handles.borrow_mut();
3141        if handles.len() >= MAX_SSE_SERVER_STREAMS {
3142            return Err(vm_error(format!(
3143                "sse_server_response: maximum open streams ({MAX_SSE_SERVER_STREAMS}) reached"
3144            )));
3145        }
3146        handles.insert(id, handle);
3147        Ok(())
3148    })?;
3149    Ok(value)
3150}
3151
3152fn sse_server_status_value(id: &str, handle: &SseServerHandle) -> VmValue {
3153    let mut dict = BTreeMap::new();
3154    dict.insert("id".to_string(), VmValue::String(Rc::from(id)));
3155    dict.insert("status".to_string(), VmValue::Int(handle.status));
3156    dict.insert(
3157        "headers".to_string(),
3158        VmValue::Dict(Rc::new(handle.headers.clone())),
3159    );
3160    dict.insert(
3161        "buffered_events".to_string(),
3162        VmValue::Int(handle.frames.len() as i64),
3163    );
3164    dict.insert(
3165        "sent_events".to_string(),
3166        VmValue::Int(handle.sent_events as i64),
3167    );
3168    dict.insert(
3169        "flushed_events".to_string(),
3170        VmValue::Int(handle.flushed_events as i64),
3171    );
3172    dict.insert("closed".to_string(), VmValue::Bool(handle.closed));
3173    dict.insert(
3174        "disconnected".to_string(),
3175        VmValue::Bool(handle.disconnected),
3176    );
3177    dict.insert("cancelled".to_string(), VmValue::Bool(handle.cancelled));
3178    dict.insert(
3179        "cancel_reason".to_string(),
3180        handle
3181            .cancel_reason
3182            .as_deref()
3183            .map(|reason| VmValue::String(Rc::from(reason)))
3184            .unwrap_or(VmValue::Nil),
3185    );
3186    dict.insert(
3187        "max_event_bytes".to_string(),
3188        VmValue::Int(handle.max_event_bytes as i64),
3189    );
3190    dict.insert(
3191        "max_buffered_events".to_string(),
3192        VmValue::Int(handle.max_buffered_events as i64),
3193    );
3194    VmValue::Dict(Rc::new(dict))
3195}
3196
3197fn vm_sse_server_status(stream_id: &str) -> Result<VmValue, VmError> {
3198    SSE_SERVER_HANDLES.with(|handles| {
3199        handles
3200            .borrow()
3201            .get(stream_id)
3202            .map(|handle| sse_server_status_value(stream_id, handle))
3203            .ok_or_else(|| vm_error(format!("sse_server_status: unknown stream '{stream_id}'")))
3204    })
3205}
3206
3207fn vm_sse_server_send(
3208    stream_id: &str,
3209    event: &VmValue,
3210    options: &BTreeMap<String, VmValue>,
3211) -> Result<VmValue, VmError> {
3212    let frame = vm_sse_event_frame(event, options)?;
3213    SSE_SERVER_HANDLES.with(|handles| {
3214        let mut handles = handles.borrow_mut();
3215        let handle = handles
3216            .get_mut(stream_id)
3217            .ok_or_else(|| vm_error(format!("sse_server_send: unknown stream '{stream_id}'")))?;
3218        if handle.closed || handle.cancelled || handle.disconnected {
3219            return Ok(VmValue::Bool(false));
3220        }
3221        if frame.len() > handle.max_event_bytes {
3222            return Err(vm_error(format!(
3223                "sse_server_send: event exceeded max_event_bytes ({})",
3224                handle.max_event_bytes
3225            )));
3226        }
3227        if handle.frames.len() >= handle.max_buffered_events {
3228            return Err(vm_error(format!(
3229                "sse_server_send: buffered events exceeded max_buffered_events ({})",
3230                handle.max_buffered_events
3231            )));
3232        }
3233        handle.frames.push_back(frame);
3234        handle.sent_events += 1;
3235        Ok(VmValue::Bool(true))
3236    })
3237}
3238
3239fn vm_sse_server_heartbeat(stream_id: &str, comment: Option<&VmValue>) -> Result<VmValue, VmError> {
3240    let mut frame = String::new();
3241    push_sse_comment(
3242        &mut frame,
3243        &comment
3244            .map(|value| value.display())
3245            .unwrap_or_else(|| "heartbeat".to_string()),
3246    );
3247    frame.push('\n');
3248    SSE_SERVER_HANDLES.with(|handles| {
3249        let mut handles = handles.borrow_mut();
3250        let handle = handles.get_mut(stream_id).ok_or_else(|| {
3251            vm_error(format!(
3252                "sse_server_heartbeat: unknown stream '{stream_id}'"
3253            ))
3254        })?;
3255        if handle.closed || handle.cancelled || handle.disconnected {
3256            return Ok(VmValue::Bool(false));
3257        }
3258        if frame.len() > handle.max_event_bytes {
3259            return Err(vm_error(format!(
3260                "sse_server_heartbeat: event exceeded max_event_bytes ({})",
3261                handle.max_event_bytes
3262            )));
3263        }
3264        if handle.frames.len() >= handle.max_buffered_events {
3265            return Err(vm_error(format!(
3266                "sse_server_heartbeat: buffered events exceeded max_buffered_events ({})",
3267                handle.max_buffered_events
3268            )));
3269        }
3270        handle.frames.push_back(frame);
3271        handle.sent_events += 1;
3272        Ok(VmValue::Bool(true))
3273    })
3274}
3275
3276fn vm_sse_server_flush(stream_id: &str) -> Result<VmValue, VmError> {
3277    SSE_SERVER_HANDLES.with(|handles| {
3278        let mut handles = handles.borrow_mut();
3279        let handle = handles
3280            .get_mut(stream_id)
3281            .ok_or_else(|| vm_error(format!("sse_server_flush: unknown stream '{stream_id}'")))?;
3282        if handle.disconnected || handle.cancelled {
3283            return Ok(VmValue::Bool(false));
3284        }
3285        handle.flushed_events = handle.sent_events;
3286        Ok(VmValue::Bool(!handle.closed))
3287    })
3288}
3289
3290fn vm_sse_server_close(stream_id: &str) -> Result<VmValue, VmError> {
3291    SSE_SERVER_HANDLES.with(|handles| {
3292        let mut handles = handles.borrow_mut();
3293        let handle = handles
3294            .get_mut(stream_id)
3295            .ok_or_else(|| vm_error(format!("sse_server_close: unknown stream '{stream_id}'")))?;
3296        if handle.closed {
3297            return Ok(VmValue::Bool(false));
3298        }
3299        handle.closed = true;
3300        Ok(VmValue::Bool(true))
3301    })
3302}
3303
3304fn vm_sse_server_cancel(stream_id: &str, reason: Option<&VmValue>) -> Result<VmValue, VmError> {
3305    SSE_SERVER_HANDLES.with(|handles| {
3306        let mut handles = handles.borrow_mut();
3307        let handle = handles
3308            .get_mut(stream_id)
3309            .ok_or_else(|| vm_error(format!("sse_server_cancel: unknown stream '{stream_id}'")))?;
3310        if handle.cancelled {
3311            return Ok(VmValue::Bool(false));
3312        }
3313        handle.cancelled = true;
3314        handle.closed = true;
3315        handle.cancel_reason = reason
3316            .map(|value| value.display())
3317            .filter(|value| !value.is_empty());
3318        Ok(VmValue::Bool(true))
3319    })
3320}
3321
3322fn vm_sse_server_observed_bool(
3323    stream_id: &str,
3324    builtin: &str,
3325    predicate: impl Fn(&SseServerHandle) -> bool,
3326) -> Result<VmValue, VmError> {
3327    SSE_SERVER_HANDLES.with(|handles| {
3328        handles
3329            .borrow()
3330            .get(stream_id)
3331            .map(|handle| VmValue::Bool(predicate(handle)))
3332            .ok_or_else(|| vm_error(format!("{builtin}: unknown stream '{stream_id}'")))
3333    })
3334}
3335
3336fn vm_sse_server_mock_receive(stream_id: &str) -> Result<VmValue, VmError> {
3337    SSE_SERVER_HANDLES.with(|handles| {
3338        let mut handles = handles.borrow_mut();
3339        let handle = handles.get_mut(stream_id).ok_or_else(|| {
3340            vm_error(format!(
3341                "sse_server_mock_receive: unknown stream '{stream_id}'"
3342            ))
3343        })?;
3344        if let Some(frame) = handle.frames.pop_front() {
3345            return Ok(sse_server_mock_frame_value(&frame));
3346        }
3347        if handle.closed || handle.cancelled || handle.disconnected {
3348            return Ok(sse_server_closed_event());
3349        }
3350        Ok(timeout_event())
3351    })
3352}
3353
3354fn vm_sse_server_mock_disconnect(stream_id: &str) -> Result<VmValue, VmError> {
3355    SSE_SERVER_HANDLES.with(|handles| {
3356        let mut handles = handles.borrow_mut();
3357        let handle = handles.get_mut(stream_id).ok_or_else(|| {
3358            vm_error(format!(
3359                "sse_server_mock_disconnect: unknown stream '{stream_id}'"
3360            ))
3361        })?;
3362        if handle.disconnected {
3363            return Ok(VmValue::Bool(false));
3364        }
3365        handle.disconnected = true;
3366        handle.closed = true;
3367        Ok(VmValue::Bool(true))
3368    })
3369}
3370
3371fn sse_server_mock_frame_value(frame: &str) -> VmValue {
3372    let mut event = MockStreamEvent {
3373        event_type: "message".to_string(),
3374        data: String::new(),
3375        id: None,
3376        retry_ms: None,
3377    };
3378    let mut data_lines = Vec::new();
3379    let mut comments = Vec::new();
3380    for raw in frame.lines() {
3381        if raw.is_empty() {
3382            continue;
3383        }
3384        if let Some(comment) = raw.strip_prefix(':') {
3385            comments.push(comment.strip_prefix(' ').unwrap_or(comment).to_string());
3386            continue;
3387        }
3388        let (field, value) = raw.split_once(':').unwrap_or((raw, ""));
3389        let value = value.strip_prefix(' ').unwrap_or(value);
3390        match field {
3391            "event" => event.event_type = value.to_string(),
3392            "data" => data_lines.push(value.to_string()),
3393            "id" => event.id = Some(value.to_string()).filter(|value| !value.is_empty()),
3394            "retry" => event.retry_ms = value.parse::<i64>().ok(),
3395            _ => {}
3396        }
3397    }
3398    if !data_lines.is_empty() {
3399        event.data = data_lines.join("\n");
3400    }
3401    let mut value = if comments.is_empty() || !data_lines.is_empty() {
3402        sse_event_value(&event)
3403    } else {
3404        let mut dict = BTreeMap::new();
3405        dict.insert("type".to_string(), VmValue::String(Rc::from("comment")));
3406        dict.insert(
3407            "comment".to_string(),
3408            VmValue::String(Rc::from(comments.join("\n"))),
3409        );
3410        VmValue::Dict(Rc::new(dict))
3411    };
3412    if let VmValue::Dict(dict) = &mut value {
3413        let mut owned = (**dict).clone();
3414        owned.insert("raw".to_string(), VmValue::String(Rc::from(frame)));
3415        value = VmValue::Dict(Rc::new(owned));
3416    }
3417    value
3418}
3419
3420fn real_sse_event_value(event: SseEvent) -> VmValue {
3421    match event {
3422        SseEvent::Open => {
3423            let mut dict = BTreeMap::new();
3424            dict.insert("type".to_string(), VmValue::String(Rc::from("open")));
3425            VmValue::Dict(Rc::new(dict))
3426        }
3427        SseEvent::Message(message) => {
3428            let retry_ms = message.retry.map(|retry| retry.as_millis() as i64);
3429            sse_event_value(&MockStreamEvent {
3430                event_type: if message.event.is_empty() {
3431                    "message".to_string()
3432                } else {
3433                    message.event
3434                },
3435                data: message.data,
3436                id: if message.id.is_empty() {
3437                    None
3438                } else {
3439                    Some(message.id)
3440                },
3441                retry_ms,
3442            })
3443        }
3444    }
3445}
3446
3447fn consume_sse_mock(url: &str) -> Option<Vec<MockStreamEvent>> {
3448    SSE_MOCKS.with(|mocks| {
3449        mocks
3450            .borrow()
3451            .iter()
3452            .find(|mock| url_matches(&mock.url_pattern, url))
3453            .map(|mock| mock.events.clone())
3454    })
3455}
3456
3457fn parse_ws_message(value: &VmValue) -> MockWsMessage {
3458    match value {
3459        VmValue::Dict(dict) => {
3460            let message_type = dict
3461                .get("type")
3462                .map(|value| value.display())
3463                .filter(|value| !value.is_empty())
3464                .unwrap_or_else(|| "text".to_string());
3465            let data = if dict
3466                .get("base64")
3467                .and_then(|value| match value {
3468                    VmValue::Bool(value) => Some(*value),
3469                    _ => None,
3470                })
3471                .unwrap_or(false)
3472            {
3473                use base64::Engine;
3474                dict.get("data")
3475                    .map(|value| value.display())
3476                    .and_then(|data| base64::engine::general_purpose::STANDARD.decode(data).ok())
3477                    .unwrap_or_default()
3478            } else {
3479                dict.get("data")
3480                    .map(|value| value.display().into_bytes())
3481                    .unwrap_or_default()
3482            };
3483            MockWsMessage {
3484                message_type,
3485                data,
3486                close_code: dict
3487                    .get("code")
3488                    .and_then(|value| value.as_int())
3489                    .map(|value| value as u16),
3490                close_reason: dict.get("reason").map(|value| value.display()),
3491            }
3492        }
3493        VmValue::Bytes(bytes) => MockWsMessage {
3494            message_type: "binary".to_string(),
3495            data: bytes.as_ref().clone(),
3496            close_code: None,
3497            close_reason: None,
3498        },
3499        other => MockWsMessage {
3500            message_type: "text".to_string(),
3501            data: other.display().into_bytes(),
3502            close_code: None,
3503            close_reason: None,
3504        },
3505    }
3506}
3507
3508fn parse_websocket_mock(value: Option<&VmValue>) -> (Vec<MockWsMessage>, bool) {
3509    let Some(value) = value else {
3510        return (Vec::new(), false);
3511    };
3512    match value {
3513        VmValue::Dict(dict) => {
3514            let echo = dict
3515                .get("echo")
3516                .and_then(|value| match value {
3517                    VmValue::Bool(value) => Some(*value),
3518                    _ => None,
3519                })
3520                .unwrap_or(false);
3521            let messages = dict
3522                .get("messages")
3523                .and_then(|messages| match messages {
3524                    VmValue::List(items) => Some(items.iter().map(parse_ws_message).collect()),
3525                    _ => None,
3526                })
3527                .unwrap_or_default();
3528            (messages, echo)
3529        }
3530        VmValue::List(items) => (items.iter().map(parse_ws_message).collect(), false),
3531        other => (vec![parse_ws_message(other)], false),
3532    }
3533}
3534
3535fn consume_websocket_mock(url: &str) -> Option<(Vec<MockWsMessage>, bool)> {
3536    WEBSOCKET_MOCKS.with(|mocks| {
3537        mocks
3538            .borrow()
3539            .iter()
3540            .find(|mock| url_matches(&mock.url_pattern, url))
3541            .map(|mock| (mock.messages.clone(), mock.echo))
3542    })
3543}
3544
3545fn ws_message_data(message: &MockWsMessage) -> String {
3546    match message.message_type.as_str() {
3547        "text" => String::from_utf8_lossy(&message.data).into_owned(),
3548        _ => {
3549            use base64::Engine;
3550            base64::engine::general_purpose::STANDARD.encode(&message.data)
3551        }
3552    }
3553}
3554
3555fn closed_event_with(code: Option<u16>, reason: Option<String>) -> VmValue {
3556    let mut dict = BTreeMap::new();
3557    dict.insert("type".to_string(), VmValue::String(Rc::from("close")));
3558    if let Some(code) = code {
3559        dict.insert("code".to_string(), VmValue::Int(i64::from(code)));
3560    }
3561    if let Some(reason) = reason {
3562        dict.insert("reason".to_string(), VmValue::String(Rc::from(reason)));
3563    }
3564    VmValue::Dict(Rc::new(dict))
3565}
3566
3567fn ws_event_value(message: MockWsMessage) -> VmValue {
3568    if message.message_type == "close" {
3569        return closed_event_with(message.close_code, message.close_reason);
3570    }
3571    let mut dict = BTreeMap::new();
3572    dict.insert(
3573        "type".to_string(),
3574        VmValue::String(Rc::from(message.message_type.as_str())),
3575    );
3576    match message.message_type.as_str() {
3577        "text" => {
3578            dict.insert(
3579                "data".to_string(),
3580                VmValue::String(Rc::from(String::from_utf8_lossy(&message.data).as_ref())),
3581            );
3582        }
3583        _ => {
3584            use base64::Engine;
3585            dict.insert(
3586                "data_base64".to_string(),
3587                VmValue::String(Rc::from(
3588                    base64::engine::general_purpose::STANDARD
3589                        .encode(&message.data)
3590                        .as_str(),
3591                )),
3592            );
3593        }
3594    }
3595    VmValue::Dict(Rc::new(dict))
3596}
3597
3598fn real_ws_event_value(message: WsMessage) -> VmValue {
3599    match message {
3600        WsMessage::Text(text) => ws_event_value(MockWsMessage {
3601            message_type: "text".to_string(),
3602            data: text.as_bytes().to_vec(),
3603            close_code: None,
3604            close_reason: None,
3605        }),
3606        WsMessage::Binary(bytes) => ws_event_value(MockWsMessage {
3607            message_type: "binary".to_string(),
3608            data: bytes.to_vec(),
3609            close_code: None,
3610            close_reason: None,
3611        }),
3612        WsMessage::Ping(bytes) => ws_event_value(MockWsMessage {
3613            message_type: "ping".to_string(),
3614            data: bytes.to_vec(),
3615            close_code: None,
3616            close_reason: None,
3617        }),
3618        WsMessage::Pong(bytes) => ws_event_value(MockWsMessage {
3619            message_type: "pong".to_string(),
3620            data: bytes.to_vec(),
3621            close_code: None,
3622            close_reason: None,
3623        }),
3624        WsMessage::Close(frame) => match frame {
3625            Some(frame) => {
3626                closed_event_with(Some(u16::from(frame.code)), Some(frame.reason.to_string()))
3627            }
3628            None => closed_event(),
3629        },
3630        WsMessage::Frame(_) => VmValue::Nil,
3631    }
3632}
3633
3634fn transport_mock_call_value(call: &TransportMockCall) -> VmValue {
3635    let mut dict = BTreeMap::new();
3636    dict.insert(
3637        "kind".to_string(),
3638        VmValue::String(Rc::from(call.kind.as_str())),
3639    );
3640    dict.insert(
3641        "url".to_string(),
3642        VmValue::String(Rc::from(call.url.as_str())),
3643    );
3644    dict.insert(
3645        "handle".to_string(),
3646        call.handle
3647            .as_deref()
3648            .map(|handle| VmValue::String(Rc::from(handle)))
3649            .unwrap_or(VmValue::Nil),
3650    );
3651    dict.insert(
3652        "type".to_string(),
3653        call.message_type
3654            .as_deref()
3655            .map(|message_type| VmValue::String(Rc::from(message_type)))
3656            .unwrap_or(VmValue::Nil),
3657    );
3658    dict.insert(
3659        "data".to_string(),
3660        call.data
3661            .as_deref()
3662            .map(|data| VmValue::String(Rc::from(data)))
3663            .unwrap_or(VmValue::Nil),
3664    );
3665    VmValue::Dict(Rc::new(dict))
3666}
3667
3668fn method_is_retryable(retry: &RetryConfig, method: &reqwest::Method) -> bool {
3669    retry
3670        .retryable_methods
3671        .iter()
3672        .any(|candidate| candidate.eq_ignore_ascii_case(method.as_str()))
3673}
3674
3675fn should_retry_response(
3676    config: &HttpRequestConfig,
3677    method: &reqwest::Method,
3678    status: u16,
3679    attempt: u32,
3680) -> bool {
3681    attempt < config.retry.max
3682        && method_is_retryable(&config.retry, method)
3683        && config.retry.retryable_statuses.contains(&status)
3684}
3685
3686fn should_retry_transport(
3687    config: &HttpRequestConfig,
3688    method: &reqwest::Method,
3689    error: &reqwest::Error,
3690    attempt: u32,
3691) -> bool {
3692    attempt < config.retry.max
3693        && method_is_retryable(&config.retry, method)
3694        && (error.is_timeout() || error.is_connect())
3695}
3696
3697fn parse_retry_after_value(value: &str) -> Option<Duration> {
3698    let value = value.trim();
3699    if value.is_empty() {
3700        return None;
3701    }
3702
3703    if let Ok(secs) = value.parse::<f64>() {
3704        if !secs.is_finite() || secs < 0.0 {
3705            return Some(Duration::from_millis(0));
3706        }
3707        let millis = (secs * 1_000.0) as u64;
3708        return Some(Duration::from_millis(millis.min(MAX_RETRY_DELAY_MS)));
3709    }
3710
3711    if let Ok(target) = httpdate::parse_http_date(value) {
3712        let millis = target
3713            .duration_since(SystemTime::now())
3714            .map(|delta| delta.as_millis() as u64)
3715            .unwrap_or(0);
3716        return Some(Duration::from_millis(millis.min(MAX_RETRY_DELAY_MS)));
3717    }
3718
3719    None
3720}
3721
3722fn parse_retry_after_header(value: &reqwest::header::HeaderValue) -> Option<Duration> {
3723    value.to_str().ok().and_then(parse_retry_after_value)
3724}
3725
3726fn mock_retry_after(status: u16, headers: &BTreeMap<String, VmValue>) -> Option<Duration> {
3727    if !(status == 429 || status == 503) {
3728        return None;
3729    }
3730
3731    headers
3732        .iter()
3733        .find(|(name, _)| name.eq_ignore_ascii_case("retry-after"))
3734        .and_then(|(_, value)| parse_retry_after_value(&value.display()))
3735}
3736
3737fn response_retry_after(
3738    status: u16,
3739    headers: &reqwest::header::HeaderMap,
3740    respect_retry_after: bool,
3741) -> Option<Duration> {
3742    if !respect_retry_after || !(status == 429 || status == 503) {
3743        return None;
3744    }
3745    headers
3746        .get(reqwest::header::RETRY_AFTER)
3747        .and_then(parse_retry_after_header)
3748}
3749
3750fn compute_retry_delay(attempt: u32, base_ms: u64, retry_after: Option<Duration>) -> Duration {
3751    use rand::RngExt;
3752
3753    let base_delay = base_ms.saturating_mul(1u64 << attempt.min(30));
3754    let jitter: f64 = rand::rng().random_range(0.75..=1.25);
3755    let exponential_ms = ((base_delay as f64 * jitter) as u64).min(MAX_RETRY_DELAY_MS);
3756    let retry_after_ms = retry_after
3757        .map(|duration| duration.as_millis() as u64)
3758        .unwrap_or(0)
3759        .min(MAX_RETRY_DELAY_MS);
3760    Duration::from_millis(exponential_ms.max(retry_after_ms))
3761}
3762
3763async fn vm_execute_http_request(
3764    method: &str,
3765    url: &str,
3766    options: &BTreeMap<String, VmValue>,
3767) -> Result<VmValue, VmError> {
3768    if let Some(session_id) = session_from_options(options) {
3769        return vm_execute_http_session_request(&session_id, method, url, options).await;
3770    }
3771
3772    let config = parse_http_options(options);
3773    let client = pooled_http_client(&config)?;
3774    vm_execute_http_request_with_client(client, &config, method, url, options).await
3775}
3776
3777async fn vm_execute_http_session_request(
3778    session_id: &str,
3779    method: &str,
3780    url: &str,
3781    options: &BTreeMap<String, VmValue>,
3782) -> Result<VmValue, VmError> {
3783    let session = HTTP_SESSIONS.with(|sessions| sessions.borrow().get(session_id).cloned());
3784    let Some(session) = session else {
3785        return Err(vm_error(format!(
3786            "http_session_request: unknown HTTP session '{session_id}'"
3787        )));
3788    };
3789    let merged_options = merge_options(&session.options, options);
3790    let config = parse_http_options(&merged_options);
3791    vm_execute_http_request_with_client(session.client, &config, method, url, &merged_options).await
3792}
3793
3794async fn vm_execute_http_request_with_client(
3795    client: reqwest::Client,
3796    config: &HttpRequestConfig,
3797    method: &str,
3798    url: &str,
3799    options: &BTreeMap<String, VmValue>,
3800) -> Result<VmValue, VmError> {
3801    let parts = parse_http_request_parts(method, options)?;
3802
3803    if !url.starts_with("http://") && !url.starts_with("https://") {
3804        return Err(vm_error(format!(
3805            "http: URL must start with http:// or https://, got '{url}'"
3806        )));
3807    }
3808    crate::egress::enforce_url_allowed("http_request", url).await?;
3809
3810    for attempt in 0..=config.retry.max {
3811        if let Some(mock_response) = consume_http_mock(
3812            method,
3813            url,
3814            parts.recorded_headers.clone(),
3815            parts.body.clone(),
3816        ) {
3817            let status = mock_response.status.clamp(0, u16::MAX as i64) as u16;
3818            if should_retry_response(config, &parts.method, status, attempt) {
3819                let retry_after = if config.retry.respect_retry_after {
3820                    mock_retry_after(status, &mock_response.headers)
3821                } else {
3822                    None
3823                };
3824                tokio::time::sleep(compute_retry_delay(
3825                    attempt,
3826                    config.retry.backoff_ms,
3827                    retry_after,
3828                ))
3829                .await;
3830                continue;
3831            }
3832
3833            return Ok(build_http_response(
3834                mock_response.status,
3835                mock_response.headers,
3836                mock_response.body,
3837            ));
3838        }
3839
3840        let mut req = client.request(parts.method.clone(), url);
3841        req = req
3842            .headers(parts.headers.clone())
3843            .timeout(Duration::from_millis(config.total_timeout_ms));
3844        if let Some(multipart) = &parts.multipart {
3845            req = req.multipart(multipart_form(multipart)?);
3846        } else if let Some(ref b) = parts.body {
3847            req = req.body(b.clone());
3848        }
3849
3850        match req.send().await {
3851            Ok(response) => {
3852                verify_tls_pin(&response, &config.tls.pinned_sha256)?;
3853                let status = response.status().as_u16();
3854                if should_retry_response(config, &parts.method, status, attempt) {
3855                    let retry_after = response_retry_after(
3856                        status,
3857                        response.headers(),
3858                        config.retry.respect_retry_after,
3859                    );
3860                    tokio::time::sleep(compute_retry_delay(
3861                        attempt,
3862                        config.retry.backoff_ms,
3863                        retry_after,
3864                    ))
3865                    .await;
3866                    continue;
3867                }
3868
3869                let resp_headers = response_headers(response.headers());
3870
3871                let body_text = response
3872                    .text()
3873                    .await
3874                    .map_err(|e| vm_error(format!("http: failed to read response body: {e}")))?;
3875                return Ok(build_http_response(status as i64, resp_headers, body_text));
3876            }
3877            Err(e) => {
3878                if should_retry_transport(config, &parts.method, &e, attempt) {
3879                    tokio::time::sleep(compute_retry_delay(attempt, config.retry.backoff_ms, None))
3880                        .await;
3881                    continue;
3882                }
3883                return Err(vm_error(format!("http: request failed: {e}")));
3884            }
3885        }
3886    }
3887
3888    Err(vm_error("http: request failed"))
3889}
3890
3891async fn vm_http_download(
3892    url: &str,
3893    dst_path: &str,
3894    options: &BTreeMap<String, VmValue>,
3895) -> Result<VmValue, VmError> {
3896    let method = options
3897        .get("method")
3898        .map(|value| value.display())
3899        .filter(|value| !value.is_empty())
3900        .unwrap_or_else(|| "GET".to_string())
3901        .to_uppercase();
3902    let parts = parse_http_request_parts(&method, options)?;
3903    if let Some(mock_response) = consume_http_mock(
3904        &method,
3905        url,
3906        parts.recorded_headers.clone(),
3907        parts.body.clone(),
3908    ) {
3909        let resolved = resolve_http_path(
3910            "http_download",
3911            dst_path,
3912            crate::stdlib::sandbox::FsAccess::Write,
3913        )?;
3914        std::fs::write(&resolved, mock_response.body.as_bytes()).map_err(|error| {
3915            vm_error(format!(
3916                "http_download: failed to write {}: {error}",
3917                resolved.display()
3918            ))
3919        })?;
3920        return Ok(build_http_download_response(
3921            mock_response.status,
3922            mock_response.headers,
3923            mock_response.body.len() as u64,
3924        ));
3925    }
3926
3927    if !url.starts_with("http://") && !url.starts_with("https://") {
3928        return Err(vm_error(format!(
3929            "http_download: URL must start with http:// or https://, got '{url}'"
3930        )));
3931    }
3932    crate::egress::enforce_url_allowed("http_download", url).await?;
3933    let config = parse_http_options(options);
3934    let client = if let Some(session_id) = session_from_options(options) {
3935        HTTP_SESSIONS
3936            .with(|sessions| sessions.borrow().get(&session_id).cloned())
3937            .map(|session| session.client)
3938            .ok_or_else(|| {
3939                vm_error(format!(
3940                    "http_download: unknown HTTP session '{session_id}'"
3941                ))
3942            })?
3943    } else {
3944        pooled_http_client(&config)?
3945    };
3946    let mut request = client
3947        .request(parts.method, url)
3948        .headers(parts.headers)
3949        .timeout(Duration::from_millis(config.total_timeout_ms));
3950    if let Some(multipart) = &parts.multipart {
3951        request = request.multipart(multipart_form(multipart)?);
3952    } else if let Some(body) = parts.body {
3953        request = request.body(body);
3954    }
3955    let mut response = request
3956        .send()
3957        .await
3958        .map_err(|error| vm_error(format!("http_download: request failed: {error}")))?;
3959    verify_tls_pin(&response, &config.tls.pinned_sha256)?;
3960    let status = response.status().as_u16() as i64;
3961    let headers = response_headers(response.headers());
3962    let resolved = resolve_http_path(
3963        "http_download",
3964        dst_path,
3965        crate::stdlib::sandbox::FsAccess::Write,
3966    )?;
3967    let mut file = std::fs::File::create(&resolved).map_err(|error| {
3968        vm_error(format!(
3969            "http_download: failed to create {}: {error}",
3970            resolved.display()
3971        ))
3972    })?;
3973    let mut bytes_written = 0_u64;
3974    while let Some(chunk) = response.chunk().await.map_err(|error| {
3975        vm_error(format!(
3976            "http_download: failed to read response body: {error}"
3977        ))
3978    })? {
3979        file.write_all(&chunk).map_err(|error| {
3980            vm_error(format!(
3981                "http_download: failed to write {}: {error}",
3982                resolved.display()
3983            ))
3984        })?;
3985        bytes_written += chunk.len() as u64;
3986    }
3987    Ok(build_http_download_response(status, headers, bytes_written))
3988}
3989
3990async fn vm_http_stream_open(
3991    url: &str,
3992    options: &BTreeMap<String, VmValue>,
3993) -> Result<VmValue, VmError> {
3994    let method = options
3995        .get("method")
3996        .map(|value| value.display())
3997        .filter(|value| !value.is_empty())
3998        .unwrap_or_else(|| "GET".to_string())
3999        .to_uppercase();
4000    let parts = parse_http_request_parts(&method, options)?;
4001    let id = next_transport_handle("http-stream");
4002    if let Some(mock_response) = consume_http_mock(
4003        &method,
4004        url,
4005        parts.recorded_headers.clone(),
4006        parts.body.clone(),
4007    ) {
4008        let handle = HttpStreamHandle {
4009            kind: HttpStreamKind::Fake,
4010            status: mock_response.status,
4011            headers: mock_response.headers,
4012            pending: mock_response.body.into_bytes().into(),
4013            closed: false,
4014        };
4015        HTTP_STREAMS.with(|streams| {
4016            let mut streams = streams.borrow_mut();
4017            if streams.len() >= MAX_HTTP_STREAMS {
4018                return Err(vm_error(format!(
4019                    "http_stream_open: maximum open streams ({MAX_HTTP_STREAMS}) reached"
4020                )));
4021            }
4022            streams.insert(id.clone(), handle);
4023            Ok(())
4024        })?;
4025        return Ok(VmValue::String(Rc::from(id)));
4026    }
4027
4028    if !url.starts_with("http://") && !url.starts_with("https://") {
4029        return Err(vm_error(format!(
4030            "http_stream_open: URL must start with http:// or https://, got '{url}'"
4031        )));
4032    }
4033    crate::egress::enforce_url_allowed("http_stream_open", url).await?;
4034    let config = parse_http_options(options);
4035    let client = if let Some(session_id) = session_from_options(options) {
4036        HTTP_SESSIONS
4037            .with(|sessions| sessions.borrow().get(&session_id).cloned())
4038            .map(|session| session.client)
4039            .ok_or_else(|| {
4040                vm_error(format!(
4041                    "http_stream_open: unknown HTTP session '{session_id}'"
4042                ))
4043            })?
4044    } else {
4045        pooled_http_client(&config)?
4046    };
4047    let mut request = client
4048        .request(parts.method, url)
4049        .headers(parts.headers)
4050        .timeout(Duration::from_millis(config.total_timeout_ms));
4051    if let Some(multipart) = &parts.multipart {
4052        request = request.multipart(multipart_form(multipart)?);
4053    } else if let Some(body) = parts.body {
4054        request = request.body(body);
4055    }
4056    let response = request
4057        .send()
4058        .await
4059        .map_err(|error| vm_error(format!("http_stream_open: request failed: {error}")))?;
4060    verify_tls_pin(&response, &config.tls.pinned_sha256)?;
4061    let status = response.status().as_u16() as i64;
4062    let headers = response_headers(response.headers());
4063    let handle = HttpStreamHandle {
4064        kind: HttpStreamKind::Real(Rc::new(tokio::sync::Mutex::new(response))),
4065        status,
4066        headers,
4067        pending: VecDeque::new(),
4068        closed: false,
4069    };
4070    HTTP_STREAMS.with(|streams| {
4071        let mut streams = streams.borrow_mut();
4072        if streams.len() >= MAX_HTTP_STREAMS {
4073            return Err(vm_error(format!(
4074                "http_stream_open: maximum open streams ({MAX_HTTP_STREAMS}) reached"
4075            )));
4076        }
4077        streams.insert(id.clone(), handle);
4078        Ok(())
4079    })?;
4080    Ok(VmValue::String(Rc::from(id)))
4081}
4082
4083async fn vm_http_stream_read(stream_id: &str, max_bytes: usize) -> Result<VmValue, VmError> {
4084    let (kind, mut pending, closed) = HTTP_STREAMS
4085        .with(|streams| {
4086            let mut streams = streams.borrow_mut();
4087            let handle = streams.get_mut(stream_id)?;
4088            let kind = match &handle.kind {
4089                HttpStreamKind::Real(response) => HttpStreamKind::Real(response.clone()),
4090                HttpStreamKind::Fake => HttpStreamKind::Fake,
4091            };
4092            let pending = std::mem::take(&mut handle.pending);
4093            Some((kind, pending, handle.closed))
4094        })
4095        .ok_or_else(|| vm_error(format!("http_stream_read: unknown stream '{stream_id}'")))?;
4096    if closed {
4097        return Ok(VmValue::Nil);
4098    }
4099    if pending.is_empty() {
4100        match kind {
4101            HttpStreamKind::Fake => {}
4102            HttpStreamKind::Real(response) => {
4103                let mut response = response.lock().await;
4104                if let Some(chunk) = response.chunk().await.map_err(|error| {
4105                    vm_error(format!(
4106                        "http_stream_read: failed to read response body: {error}"
4107                    ))
4108                })? {
4109                    pending.extend(chunk);
4110                }
4111            }
4112        }
4113    }
4114    if pending.is_empty() {
4115        HTTP_STREAMS.with(|streams| {
4116            if let Some(handle) = streams.borrow_mut().get_mut(stream_id) {
4117                handle.closed = true;
4118            }
4119        });
4120        return Ok(VmValue::Nil);
4121    }
4122    let take = pending.len().min(max_bytes.max(1));
4123    let chunk = pending.drain(..take).collect::<Vec<_>>();
4124    HTTP_STREAMS.with(|streams| {
4125        if let Some(handle) = streams.borrow_mut().get_mut(stream_id) {
4126            handle.pending = pending;
4127        }
4128    });
4129    Ok(VmValue::Bytes(Rc::new(chunk)))
4130}
4131
4132fn vm_http_stream_info(stream_id: &str) -> Result<VmValue, VmError> {
4133    HTTP_STREAMS.with(|streams| {
4134        let streams = streams.borrow();
4135        let handle = streams
4136            .get(stream_id)
4137            .ok_or_else(|| vm_error(format!("http_stream_info: unknown stream '{stream_id}'")))?;
4138        let mut dict = BTreeMap::new();
4139        dict.insert("status".to_string(), VmValue::Int(handle.status));
4140        dict.insert(
4141            "headers".to_string(),
4142            VmValue::Dict(Rc::new(handle.headers.clone())),
4143        );
4144        dict.insert(
4145            "ok".to_string(),
4146            VmValue::Bool((200..300).contains(&(handle.status as u16))),
4147        );
4148        Ok(VmValue::Dict(Rc::new(dict)))
4149    })
4150}
4151
4152async fn vm_sse_connect(
4153    method: &str,
4154    url: &str,
4155    options: &BTreeMap<String, VmValue>,
4156) -> Result<VmValue, VmError> {
4157    let id = next_transport_handle("sse");
4158    let max_events =
4159        transport_limit_option(options, "max_events", DEFAULT_MAX_STREAM_EVENTS).max(1);
4160    let max_message_bytes =
4161        transport_limit_option(options, "max_message_bytes", DEFAULT_MAX_MESSAGE_BYTES).max(1);
4162
4163    if let Some(events) = consume_sse_mock(url) {
4164        let handle = SseHandle {
4165            kind: SseHandleKind::Fake(Rc::new(tokio::sync::Mutex::new(FakeSseStream {
4166                events: events.into(),
4167                opened: false,
4168                closed: false,
4169            }))),
4170            url: url.to_string(),
4171            max_events,
4172            max_message_bytes,
4173            received: 0,
4174        };
4175        SSE_HANDLES.with(|handles| {
4176            let mut handles = handles.borrow_mut();
4177            if handles.len() >= MAX_SSE_STREAMS {
4178                return Err(vm_error(format!(
4179                    "sse_connect: maximum open streams ({MAX_SSE_STREAMS}) reached"
4180                )));
4181            }
4182            handles.insert(id.clone(), handle);
4183            Ok(())
4184        })?;
4185        record_transport_call(TransportMockCall {
4186            kind: "sse_connect".to_string(),
4187            handle: Some(id.clone()),
4188            url: url.to_string(),
4189            message_type: None,
4190            data: None,
4191        });
4192        return Ok(VmValue::String(Rc::from(id)));
4193    }
4194
4195    if !url.starts_with("http://") && !url.starts_with("https://") {
4196        return Err(vm_error(format!(
4197            "sse_connect: URL must start with http:// or https://, got '{url}'"
4198        )));
4199    }
4200    crate::egress::enforce_url_allowed("sse_connect", url).await?;
4201
4202    let config = parse_http_options(options);
4203    let client = if let Some(session_id) = session_from_options(options) {
4204        let session = HTTP_SESSIONS.with(|sessions| sessions.borrow().get(&session_id).cloned());
4205        session
4206            .map(|session| session.client)
4207            .ok_or_else(|| vm_error(format!("sse_connect: unknown HTTP session '{session_id}'")))?
4208    } else {
4209        pooled_http_client(&config)?
4210    };
4211    let parts = parse_http_request_parts(method, options)?;
4212    let mut request = client
4213        .request(parts.method, url)
4214        .headers(parts.headers)
4215        .timeout(Duration::from_millis(config.total_timeout_ms));
4216    if let Some(body) = parts.body {
4217        request = request.body(body);
4218    }
4219    let stream = EventSource::new(request)
4220        .map_err(|error| vm_error(format!("sse_connect: failed to create stream: {error}")))?;
4221    let handle = SseHandle {
4222        kind: SseHandleKind::Real(Rc::new(tokio::sync::Mutex::new(stream))),
4223        url: url.to_string(),
4224        max_events,
4225        max_message_bytes,
4226        received: 0,
4227    };
4228    SSE_HANDLES.with(|handles| {
4229        let mut handles = handles.borrow_mut();
4230        if handles.len() >= MAX_SSE_STREAMS {
4231            return Err(vm_error(format!(
4232                "sse_connect: maximum open streams ({MAX_SSE_STREAMS}) reached"
4233            )));
4234        }
4235        handles.insert(id.clone(), handle);
4236        Ok(())
4237    })?;
4238    Ok(VmValue::String(Rc::from(id)))
4239}
4240
4241async fn vm_sse_receive(stream_id: &str, timeout_ms: u64) -> Result<VmValue, VmError> {
4242    let stream = SSE_HANDLES.with(|handles| {
4243        let mut handles = handles.borrow_mut();
4244        let handle = handles.get_mut(stream_id)?;
4245        if handle.received >= handle.max_events {
4246            return Some(Err(vm_error(format!(
4247                "sse_receive: stream '{stream_id}' exceeded max_events"
4248            ))));
4249        }
4250        handle.received += 1;
4251        let url = handle.url.clone();
4252        let max_message_bytes = handle.max_message_bytes;
4253        let kind = match &handle.kind {
4254            SseHandleKind::Real(stream) => SseHandleKind::Real(stream.clone()),
4255            SseHandleKind::Fake(stream) => SseHandleKind::Fake(stream.clone()),
4256        };
4257        Some(Ok((kind, url, max_message_bytes)))
4258    });
4259    let Some(stream) = stream else {
4260        return Err(vm_error(format!(
4261            "sse_receive: unknown stream '{stream_id}'"
4262        )));
4263    };
4264    let (kind, _url, max_message_bytes) = stream?;
4265
4266    match kind {
4267        SseHandleKind::Fake(stream) => {
4268            let mut stream = stream.lock().await;
4269            if stream.closed {
4270                return Ok(VmValue::Nil);
4271            }
4272            if !stream.opened {
4273                stream.opened = true;
4274                let mut dict = BTreeMap::new();
4275                dict.insert("type".to_string(), VmValue::String(Rc::from("open")));
4276                return Ok(VmValue::Dict(Rc::new(dict)));
4277            }
4278            let Some(event) = stream.events.pop_front() else {
4279                stream.closed = true;
4280                return Ok(VmValue::Nil);
4281            };
4282            if event.data.len() > max_message_bytes {
4283                return Err(vm_error(format!(
4284                    "sse_receive: message exceeded max_message_bytes ({max_message_bytes})"
4285                )));
4286            }
4287            Ok(sse_event_value(&event))
4288        }
4289        SseHandleKind::Real(stream) => {
4290            let mut stream = stream.lock().await;
4291            let next = stream.next();
4292            let event = match tokio::time::timeout(Duration::from_millis(timeout_ms), next).await {
4293                Ok(Some(Ok(event))) => event,
4294                Ok(Some(Err(error))) => {
4295                    return Err(vm_error(format!("sse_receive: stream error: {error}")));
4296                }
4297                Ok(None) => return Ok(VmValue::Nil),
4298                Err(_) => return Ok(timeout_event()),
4299            };
4300            if let SseEvent::Message(message) = &event {
4301                if message.data.len() > max_message_bytes {
4302                    stream.close();
4303                    return Err(vm_error(format!(
4304                        "sse_receive: message exceeded max_message_bytes ({max_message_bytes})"
4305                    )));
4306                }
4307            }
4308            Ok(real_sse_event_value(event))
4309        }
4310    }
4311}
4312
4313fn websocket_route_from_options(path: &str, options: &BTreeMap<String, VmValue>) -> WebSocketRoute {
4314    let bearer_token = options.get("auth").and_then(|auth| match auth {
4315        VmValue::Dict(dict) => dict.get("bearer").map(|value| value.display()),
4316        other => {
4317            let value = other.display();
4318            (!value.is_empty()).then_some(value)
4319        }
4320    });
4321    WebSocketRoute {
4322        path: path.to_string(),
4323        bearer_token,
4324        max_messages: transport_limit_option(options, "max_messages", DEFAULT_MAX_STREAM_EVENTS)
4325            .max(1),
4326        max_message_bytes: transport_limit_option(
4327            options,
4328            "max_message_bytes",
4329            DEFAULT_MAX_MESSAGE_BYTES,
4330        )
4331        .max(1),
4332        send_buffer_messages: transport_limit_option(options, "send_buffer_messages", 64),
4333        idle_timeout_ms: vm_get_int_option(
4334            options,
4335            "idle_timeout_ms",
4336            DEFAULT_WEBSOCKET_SERVER_IDLE_TIMEOUT_MS as i64,
4337        )
4338        .max(0) as u64,
4339    }
4340}
4341
4342fn vm_websocket_server(
4343    bind: &str,
4344    options: &BTreeMap<String, VmValue>,
4345) -> Result<VmValue, VmError> {
4346    let listener = TcpListener::bind(bind)
4347        .map_err(|error| vm_error(format!("websocket_server: bind failed: {error}")))?;
4348    listener
4349        .set_nonblocking(true)
4350        .map_err(|error| vm_error(format!("websocket_server: nonblocking failed: {error}")))?;
4351    let local_addr = listener
4352        .local_addr()
4353        .map_err(|error| vm_error(format!("websocket_server: local addr failed: {error}")))?;
4354    let id = next_transport_handle("websocket-server");
4355    let addr = local_addr.to_string();
4356    let url = format!("ws://{addr}");
4357    let routes = Arc::new(RwLock::new(HashMap::<String, WebSocketRoute>::new()));
4358    if let Some(path) = options
4359        .get("path")
4360        .map(|value| value.display())
4361        .filter(|path| !path.is_empty())
4362    {
4363        if !path.starts_with('/') {
4364            return Err(vm_error("websocket_server: path must start with '/'"));
4365        }
4366        routes
4367            .write()
4368            .expect("websocket routes poisoned")
4369            .insert(path.clone(), websocket_route_from_options(&path, options));
4370    }
4371    let (event_tx, event_rx) = mpsc::channel();
4372    let running = Arc::new(AtomicBool::new(true));
4373    let server_routes = routes.clone();
4374    let server_running = running.clone();
4375    thread::Builder::new()
4376        .name(format!("harn-ws-{id}"))
4377        .spawn(move || websocket_server_loop(listener, server_routes, event_tx, server_running))
4378        .map_err(|error| vm_error(format!("websocket_server: spawn failed: {error}")))?;
4379    WEBSOCKET_SERVERS.with(|servers| {
4380        let mut servers = servers.borrow_mut();
4381        if servers.len() >= MAX_WEBSOCKET_SERVERS {
4382            running.store(false, Ordering::SeqCst);
4383            let _ = TcpStream::connect(&addr);
4384            return Err(vm_error(format!(
4385                "websocket_server: maximum open servers ({MAX_WEBSOCKET_SERVERS}) reached"
4386            )));
4387        }
4388        servers.insert(
4389            id.clone(),
4390            WebSocketServer {
4391                addr: addr.clone(),
4392                routes,
4393                events: Rc::new(tokio::sync::Mutex::new(event_rx)),
4394                running,
4395            },
4396        );
4397        Ok(())
4398    })?;
4399    let mut dict = BTreeMap::new();
4400    dict.insert("id".to_string(), VmValue::String(Rc::from(id)));
4401    dict.insert("addr".to_string(), VmValue::String(Rc::from(addr)));
4402    dict.insert("url".to_string(), VmValue::String(Rc::from(url)));
4403    Ok(VmValue::Dict(Rc::new(dict)))
4404}
4405
4406fn vm_websocket_route(
4407    server_id: &str,
4408    path: &str,
4409    options: &BTreeMap<String, VmValue>,
4410) -> Result<VmValue, VmError> {
4411    let routes = WEBSOCKET_SERVERS.with(|servers| {
4412        servers
4413            .borrow()
4414            .get(server_id)
4415            .map(|server| server.routes.clone())
4416    });
4417    let Some(routes) = routes else {
4418        return Err(vm_error(format!(
4419            "websocket_route: unknown server '{server_id}'"
4420        )));
4421    };
4422    routes.write().expect("websocket routes poisoned").insert(
4423        path.to_string(),
4424        websocket_route_from_options(path, options),
4425    );
4426    Ok(VmValue::Bool(true))
4427}
4428
4429async fn vm_websocket_accept(server_id: &str, timeout_ms: u64) -> Result<VmValue, VmError> {
4430    let receiver = WEBSOCKET_SERVERS.with(|servers| {
4431        servers
4432            .borrow()
4433            .get(server_id)
4434            .map(|server| server.events.clone())
4435    });
4436    let Some(receiver) = receiver else {
4437        return Err(vm_error(format!(
4438            "websocket_accept: unknown server '{server_id}'"
4439        )));
4440    };
4441    let started = std::time::Instant::now();
4442    loop {
4443        let event = {
4444            let receiver = receiver.lock().await;
4445            receiver.try_recv()
4446        };
4447        match event {
4448            Ok(event) => return register_accepted_websocket(event),
4449            Err(mpsc::TryRecvError::Disconnected) => return Ok(VmValue::Nil),
4450            Err(mpsc::TryRecvError::Empty) => {
4451                if timeout_ms == 0 || started.elapsed() >= Duration::from_millis(timeout_ms) {
4452                    return Ok(timeout_event());
4453                }
4454                tokio::time::sleep(Duration::from_millis(10)).await;
4455            }
4456        }
4457    }
4458}
4459
4460fn register_accepted_websocket(event: WebSocketServerEvent) -> Result<VmValue, VmError> {
4461    let WebSocketServerEvent {
4462        handle,
4463        path,
4464        peer,
4465        headers,
4466        max_messages,
4467        max_message_bytes,
4468    } = event;
4469    let id = next_transport_handle("websocket");
4470    WEBSOCKET_HANDLES.with(|handles| {
4471        let mut handles = handles.borrow_mut();
4472        if handles.len() >= MAX_WEBSOCKETS {
4473            return Err(vm_error(format!(
4474                "websocket_accept: maximum open sockets ({MAX_WEBSOCKETS}) reached"
4475            )));
4476        }
4477        handles.insert(
4478            id.clone(),
4479            WebSocketHandle {
4480                kind: WebSocketHandleKind::Server(Rc::new(tokio::sync::Mutex::new(handle))),
4481                url: path.clone(),
4482                max_messages,
4483                max_message_bytes,
4484                received: 0,
4485            },
4486        );
4487        Ok(())
4488    })?;
4489    let mut metadata = BTreeMap::new();
4490    metadata.insert("id".to_string(), VmValue::String(Rc::from(id)));
4491    metadata.insert("path".to_string(), VmValue::String(Rc::from(path)));
4492    metadata.insert("peer".to_string(), VmValue::String(Rc::from(peer)));
4493    metadata.insert(
4494        "headers".to_string(),
4495        VmValue::Dict(Rc::new(
4496            headers
4497                .into_iter()
4498                .map(|(name, value)| (name, VmValue::String(Rc::from(value))))
4499                .collect(),
4500        )),
4501    );
4502    Ok(VmValue::Dict(Rc::new(metadata)))
4503}
4504
4505fn vm_websocket_server_close(server_id: &str) -> Result<VmValue, VmError> {
4506    let server = WEBSOCKET_SERVERS.with(|servers| servers.borrow_mut().remove(server_id));
4507    let Some(server) = server else {
4508        return Ok(VmValue::Bool(false));
4509    };
4510    server.running.store(false, Ordering::SeqCst);
4511    let _ = TcpStream::connect(&server.addr);
4512    Ok(VmValue::Bool(true))
4513}
4514
4515fn websocket_server_loop(
4516    listener: TcpListener,
4517    routes: Arc<RwLock<HashMap<String, WebSocketRoute>>>,
4518    event_tx: mpsc::Sender<WebSocketServerEvent>,
4519    running: Arc<AtomicBool>,
4520) {
4521    while running.load(Ordering::SeqCst) {
4522        match listener.accept() {
4523            Ok((stream, peer)) => {
4524                let routes = routes.clone();
4525                let event_tx = event_tx.clone();
4526                let running = running.clone();
4527                let peer = peer.to_string();
4528                let _ = thread::Builder::new()
4529                    .name("harn-ws-conn".to_string())
4530                    .spawn(move || {
4531                        websocket_connection_thread(stream, peer, routes, event_tx, running);
4532                    });
4533            }
4534            Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => {
4535                thread::sleep(Duration::from_millis(10));
4536            }
4537            Err(_) => break,
4538        }
4539    }
4540}
4541
4542fn error_response(status: StatusCode, body: &str) -> ErrorResponse {
4543    let mut response = ErrorResponse::new(Some(body.to_string()));
4544    *response.status_mut() = status;
4545    response
4546}
4547
4548fn route_for_request(
4549    request: &Request,
4550    routes: &Arc<RwLock<HashMap<String, WebSocketRoute>>>,
4551) -> Result<WebSocketRoute, ErrorResponse> {
4552    let path = request.uri().path().to_string();
4553    let Some(route) = routes
4554        .read()
4555        .expect("websocket routes poisoned")
4556        .get(&path)
4557        .cloned()
4558    else {
4559        return Err(error_response(
4560            StatusCode::NOT_FOUND,
4561            "websocket route not found",
4562        ));
4563    };
4564    if let Some(token) = route.bearer_token.as_ref() {
4565        let expected = format!("Bearer {token}");
4566        let authorized = request
4567            .headers()
4568            .get("authorization")
4569            .and_then(|value| value.to_str().ok())
4570            .map(|value| value == expected)
4571            .unwrap_or(false);
4572        if !authorized {
4573            return Err(error_response(
4574                StatusCode::UNAUTHORIZED,
4575                "websocket route unauthorized",
4576            ));
4577        }
4578    }
4579    Ok(route)
4580}
4581
4582fn websocket_connection_thread(
4583    stream: TcpStream,
4584    peer: String,
4585    routes: Arc<RwLock<HashMap<String, WebSocketRoute>>>,
4586    event_tx: mpsc::Sender<WebSocketServerEvent>,
4587    running: Arc<AtomicBool>,
4588) {
4589    let accepted_route = Arc::new(std::sync::Mutex::new(
4590        None::<(WebSocketRoute, BTreeMap<String, String>)>,
4591    ));
4592    let callback_route = accepted_route.clone();
4593    let callback =
4594        move |request: &Request, response: Response| -> Result<Response, ErrorResponse> {
4595            let route = route_for_request(request, &routes)?;
4596            let headers = request
4597                .headers()
4598                .iter()
4599                .filter_map(|(name, value)| {
4600                    value
4601                        .to_str()
4602                        .ok()
4603                        .map(|value| (name.as_str().to_ascii_lowercase(), value.to_string()))
4604                })
4605                .collect::<BTreeMap<_, _>>();
4606            *callback_route
4607                .lock()
4608                .expect("websocket route metadata poisoned") = Some((route, headers));
4609            Ok(response)
4610        };
4611    let Ok(mut socket) = tokio_tungstenite::tungstenite::accept_hdr(stream, callback) else {
4612        return;
4613    };
4614    let Some((route, headers)) = accepted_route
4615        .lock()
4616        .expect("websocket route metadata poisoned")
4617        .clone()
4618    else {
4619        let _ = socket.close(None);
4620        return;
4621    };
4622    let _ = socket
4623        .get_mut()
4624        .set_read_timeout(Some(Duration::from_millis(50)));
4625    let (incoming_tx, incoming_rx) = mpsc::channel();
4626    let (outgoing_tx, outgoing_rx) = mpsc::sync_channel(route.send_buffer_messages);
4627    let event = WebSocketServerEvent {
4628        handle: ServerWebSocket {
4629            incoming: VecDeque::new(),
4630            incoming_rx,
4631            outgoing_tx,
4632            closed: false,
4633        },
4634        path: route.path.clone(),
4635        peer,
4636        headers,
4637        max_messages: route.max_messages,
4638        max_message_bytes: route.max_message_bytes,
4639    };
4640    if event_tx.send(event).is_err() {
4641        let _ = socket.close(None);
4642        return;
4643    }
4644    let mut last_activity = std::time::Instant::now();
4645    while running.load(Ordering::SeqCst) {
4646        while let Ok(command) = outgoing_rx.try_recv() {
4647            match command {
4648                ServerWebSocketCommand::Send(message) => {
4649                    let Ok(message) = real_ws_message(&message) else {
4650                        continue;
4651                    };
4652                    if socket.send(message).is_err() {
4653                        return;
4654                    }
4655                    last_activity = std::time::Instant::now();
4656                }
4657                ServerWebSocketCommand::Close(code, reason) => {
4658                    let _ = socket.close(close_frame(code, reason));
4659                    return;
4660                }
4661            }
4662        }
4663        if route.idle_timeout_ms > 0
4664            && last_activity.elapsed() >= Duration::from_millis(route.idle_timeout_ms)
4665        {
4666            let _ = socket.close(close_frame(Some(1001), Some("idle timeout".to_string())));
4667            let _ = incoming_tx.send(MockWsMessage {
4668                message_type: "close".to_string(),
4669                data: Vec::new(),
4670                close_code: Some(1001),
4671                close_reason: Some("idle timeout".to_string()),
4672            });
4673            return;
4674        }
4675        match socket.read() {
4676            Ok(message) => {
4677                last_activity = std::time::Instant::now();
4678                if incoming_tx
4679                    .send(mock_ws_message_from_real(message))
4680                    .is_err()
4681                {
4682                    return;
4683                }
4684            }
4685            Err(tokio_tungstenite::tungstenite::Error::Io(error))
4686                if error.kind() == std::io::ErrorKind::WouldBlock
4687                    || error.kind() == std::io::ErrorKind::TimedOut =>
4688            {
4689                continue;
4690            }
4691            Err(_) => {
4692                let _ = incoming_tx.send(MockWsMessage {
4693                    message_type: "close".to_string(),
4694                    data: Vec::new(),
4695                    close_code: None,
4696                    close_reason: None,
4697                });
4698                return;
4699            }
4700        }
4701    }
4702    let _ = socket.get_mut().shutdown(Shutdown::Both);
4703}
4704
4705async fn vm_websocket_connect(
4706    url: &str,
4707    options: &BTreeMap<String, VmValue>,
4708) -> Result<VmValue, VmError> {
4709    let id = next_transport_handle("websocket");
4710    let max_messages =
4711        transport_limit_option(options, "max_messages", DEFAULT_MAX_STREAM_EVENTS).max(1);
4712    let max_message_bytes =
4713        transport_limit_option(options, "max_message_bytes", DEFAULT_MAX_MESSAGE_BYTES).max(1);
4714
4715    if let Some((messages, echo)) = consume_websocket_mock(url) {
4716        let handle = WebSocketHandle {
4717            kind: WebSocketHandleKind::Fake(Rc::new(tokio::sync::Mutex::new(FakeWebSocket {
4718                messages: messages.into(),
4719                echo,
4720                closed: false,
4721            }))),
4722            url: url.to_string(),
4723            max_messages,
4724            max_message_bytes,
4725            received: 0,
4726        };
4727        WEBSOCKET_HANDLES.with(|handles| {
4728            let mut handles = handles.borrow_mut();
4729            if handles.len() >= MAX_WEBSOCKETS {
4730                return Err(vm_error(format!(
4731                    "websocket_connect: maximum open sockets ({MAX_WEBSOCKETS}) reached"
4732                )));
4733            }
4734            handles.insert(id.clone(), handle);
4735            Ok(())
4736        })?;
4737        record_transport_call(TransportMockCall {
4738            kind: "websocket_connect".to_string(),
4739            handle: Some(id.clone()),
4740            url: url.to_string(),
4741            message_type: None,
4742            data: None,
4743        });
4744        return Ok(VmValue::String(Rc::from(id)));
4745    }
4746
4747    if !url.starts_with("ws://") && !url.starts_with("wss://") {
4748        return Err(vm_error(format!(
4749            "websocket_connect: URL must start with ws:// or wss://, got '{url}'"
4750        )));
4751    }
4752    crate::egress::enforce_url_allowed("websocket_connect", url).await?;
4753    let timeout_ms = vm_get_int_option_prefer(
4754        options,
4755        "timeout_ms",
4756        "timeout",
4757        DEFAULT_TIMEOUT_MS as i64,
4758    )
4759    .max(0) as u64;
4760    let request = websocket_client_request(url, options)?;
4761    let connect = tokio_tungstenite::connect_async(request);
4762    let (socket, _) = tokio::time::timeout(Duration::from_millis(timeout_ms), connect)
4763        .await
4764        .map_err(|_| vm_error(format!("websocket_connect: timed out after {timeout_ms}ms")))?
4765        .map_err(|error| vm_error(format!("websocket_connect: failed: {error}")))?;
4766    let handle = WebSocketHandle {
4767        kind: WebSocketHandleKind::Real(Rc::new(tokio::sync::Mutex::new(socket))),
4768        url: url.to_string(),
4769        max_messages,
4770        max_message_bytes,
4771        received: 0,
4772    };
4773    WEBSOCKET_HANDLES.with(|handles| {
4774        let mut handles = handles.borrow_mut();
4775        if handles.len() >= MAX_WEBSOCKETS {
4776            return Err(vm_error(format!(
4777                "websocket_connect: maximum open sockets ({MAX_WEBSOCKETS}) reached"
4778            )));
4779        }
4780        handles.insert(id.clone(), handle);
4781        Ok(())
4782    })?;
4783    Ok(VmValue::String(Rc::from(id)))
4784}
4785
4786fn websocket_message_from_vm(
4787    value: VmValue,
4788    options: &BTreeMap<String, VmValue>,
4789) -> Result<MockWsMessage, VmError> {
4790    let message_type = options
4791        .get("type")
4792        .map(|value| value.display())
4793        .filter(|value| !value.is_empty())
4794        .unwrap_or_else(|| match value {
4795            VmValue::Bytes(_) => "binary".to_string(),
4796            _ => "text".to_string(),
4797        });
4798    let data = match value {
4799        VmValue::Bytes(bytes) => bytes.as_ref().clone(),
4800        other
4801            if options
4802                .get("base64")
4803                .and_then(|value| match value {
4804                    VmValue::Bool(value) => Some(*value),
4805                    _ => None,
4806                })
4807                .unwrap_or(false) =>
4808        {
4809            use base64::Engine;
4810            base64::engine::general_purpose::STANDARD
4811                .decode(other.display())
4812                .map_err(|error| vm_error(format!("websocket_send: invalid base64: {error}")))?
4813        }
4814        other => other.display().into_bytes(),
4815    };
4816    Ok(MockWsMessage {
4817        message_type,
4818        data,
4819        close_code: options
4820            .get("code")
4821            .or_else(|| options.get("close_code"))
4822            .and_then(|value| value.as_int())
4823            .map(|value| value as u16),
4824        close_reason: options
4825            .get("reason")
4826            .or_else(|| options.get("close_reason"))
4827            .map(|value| value.display()),
4828    })
4829}
4830
4831fn websocket_client_request(
4832    url: &str,
4833    options: &BTreeMap<String, VmValue>,
4834) -> Result<tokio_tungstenite::tungstenite::http::Request<()>, VmError> {
4835    let mut request = url
4836        .into_client_request()
4837        .map_err(|error| vm_error(format!("websocket_connect: invalid request: {error}")))?;
4838    if let Some(headers) = options.get("headers").and_then(|value| value.as_dict()) {
4839        for (name, value) in headers {
4840            let header_name = tokio_tungstenite::tungstenite::http::header::HeaderName::from_bytes(
4841                name.as_bytes(),
4842            )
4843            .map_err(|error| {
4844                vm_error(format!(
4845                    "websocket_connect: invalid header name '{name}': {error}"
4846                ))
4847            })?;
4848            let header_value = HeaderValue::from_str(&value.display()).map_err(|error| {
4849                vm_error(format!(
4850                    "websocket_connect: invalid header value for '{name}': {error}"
4851                ))
4852            })?;
4853            request.headers_mut().insert(header_name, header_value);
4854        }
4855    }
4856    if let Some(auth) = options.get("auth") {
4857        let bearer = match auth {
4858            VmValue::Dict(dict) => dict.get("bearer").map(|value| value.display()),
4859            other => Some(other.display()),
4860        };
4861        if let Some(token) = bearer.filter(|token| !token.is_empty()) {
4862            let value = HeaderValue::from_str(&format!("Bearer {token}")).map_err(|error| {
4863                vm_error(format!(
4864                    "websocket_connect: invalid authorization header: {error}"
4865                ))
4866            })?;
4867            request.headers_mut().insert("authorization", value);
4868        }
4869    }
4870    Ok(request)
4871}
4872
4873fn close_frame(code: Option<u16>, reason: Option<String>) -> Option<CloseFrame> {
4874    code.or(reason.as_ref().map(|_| 1000))
4875        .map(|code| CloseFrame {
4876            code: CloseCode::from(code),
4877            reason: reason.unwrap_or_default().into(),
4878        })
4879}
4880
4881fn real_ws_message(message: &MockWsMessage) -> Result<WsMessage, VmError> {
4882    match message.message_type.as_str() {
4883        "text" => Ok(WsMessage::Text(
4884            String::from_utf8(message.data.clone())
4885                .map_err(|error| vm_error(format!("websocket_send: text is not UTF-8: {error}")))?
4886                .into(),
4887        )),
4888        "binary" => Ok(WsMessage::Binary(message.data.clone().into())),
4889        "ping" => Ok(WsMessage::Ping(message.data.clone().into())),
4890        "pong" => Ok(WsMessage::Pong(message.data.clone().into())),
4891        "close" => Ok(WsMessage::Close(close_frame(
4892            message.close_code,
4893            message.close_reason.clone(),
4894        ))),
4895        other => Err(vm_error(format!(
4896            "websocket_send: unsupported message type '{other}'"
4897        ))),
4898    }
4899}
4900
4901fn mock_ws_message_from_real(message: WsMessage) -> MockWsMessage {
4902    match message {
4903        WsMessage::Text(text) => MockWsMessage {
4904            message_type: "text".to_string(),
4905            data: text.as_bytes().to_vec(),
4906            close_code: None,
4907            close_reason: None,
4908        },
4909        WsMessage::Binary(bytes) => MockWsMessage {
4910            message_type: "binary".to_string(),
4911            data: bytes.to_vec(),
4912            close_code: None,
4913            close_reason: None,
4914        },
4915        WsMessage::Ping(bytes) => MockWsMessage {
4916            message_type: "ping".to_string(),
4917            data: bytes.to_vec(),
4918            close_code: None,
4919            close_reason: None,
4920        },
4921        WsMessage::Pong(bytes) => MockWsMessage {
4922            message_type: "pong".to_string(),
4923            data: bytes.to_vec(),
4924            close_code: None,
4925            close_reason: None,
4926        },
4927        WsMessage::Close(frame) => {
4928            let (close_code, close_reason) = frame
4929                .map(|frame| (Some(u16::from(frame.code)), Some(frame.reason.to_string())))
4930                .unwrap_or((None, None));
4931            MockWsMessage {
4932                message_type: "close".to_string(),
4933                data: Vec::new(),
4934                close_code,
4935                close_reason,
4936            }
4937        }
4938        WsMessage::Frame(_) => MockWsMessage {
4939            message_type: "close".to_string(),
4940            data: Vec::new(),
4941            close_code: None,
4942            close_reason: None,
4943        },
4944    }
4945}
4946
4947async fn vm_websocket_send(
4948    socket_id: &str,
4949    value: VmValue,
4950    options: &BTreeMap<String, VmValue>,
4951) -> Result<VmValue, VmError> {
4952    let message = websocket_message_from_vm(value, options)?;
4953    let socket = WEBSOCKET_HANDLES.with(|handles| {
4954        let handles = handles.borrow();
4955        let handle = handles.get(socket_id)?;
4956        let url = handle.url.clone();
4957        let max_message_bytes = handle.max_message_bytes;
4958        let kind = match &handle.kind {
4959            WebSocketHandleKind::Real(socket) => WebSocketHandleKind::Real(socket.clone()),
4960            WebSocketHandleKind::Fake(socket) => WebSocketHandleKind::Fake(socket.clone()),
4961            WebSocketHandleKind::Server(socket) => WebSocketHandleKind::Server(socket.clone()),
4962        };
4963        Some((kind, url, max_message_bytes))
4964    });
4965    let Some((kind, url, max_message_bytes)) = socket else {
4966        return Err(vm_error(format!(
4967            "websocket_send: unknown socket '{socket_id}'"
4968        )));
4969    };
4970    if message.data.len() > max_message_bytes {
4971        return Err(vm_error(format!(
4972            "websocket_send: message exceeded max_message_bytes ({max_message_bytes})"
4973        )));
4974    }
4975    match kind {
4976        WebSocketHandleKind::Fake(socket) => {
4977            let mut socket = socket.lock().await;
4978            if socket.closed {
4979                return Ok(VmValue::Bool(false));
4980            }
4981            if message.message_type == "close" {
4982                socket.closed = true;
4983            } else if socket.echo {
4984                socket.messages.push_back(message.clone());
4985            }
4986            record_transport_call(TransportMockCall {
4987                kind: "websocket_send".to_string(),
4988                handle: Some(socket_id.to_string()),
4989                url,
4990                message_type: Some(message.message_type.clone()),
4991                data: Some(ws_message_data(&message)),
4992            });
4993            Ok(VmValue::Bool(true))
4994        }
4995        WebSocketHandleKind::Real(socket) => {
4996            let mut socket = socket.lock().await;
4997            socket
4998                .send(real_ws_message(&message)?)
4999                .await
5000                .map_err(|error| vm_error(format!("websocket_send: failed: {error}")))?;
5001            Ok(VmValue::Bool(true))
5002        }
5003        WebSocketHandleKind::Server(socket) => {
5004            let mut socket = socket.lock().await;
5005            if socket.closed {
5006                return Ok(VmValue::Bool(false));
5007            }
5008            let command = if message.message_type == "close" {
5009                socket.closed = true;
5010                ServerWebSocketCommand::Close(message.close_code, message.close_reason.clone())
5011            } else {
5012                ServerWebSocketCommand::Send(message.clone())
5013            };
5014            socket
5015                .outgoing_tx
5016                .try_send(command)
5017                .map_err(|error| match error {
5018                    mpsc::TrySendError::Full(_) => vm_error("websocket_send: send buffer full"),
5019                    mpsc::TrySendError::Disconnected(_) => {
5020                        vm_error("websocket_send: connection closed")
5021                    }
5022                })?;
5023            Ok(VmValue::Bool(true))
5024        }
5025    }
5026}
5027
5028async fn vm_websocket_receive(socket_id: &str, timeout_ms: u64) -> Result<VmValue, VmError> {
5029    let socket = WEBSOCKET_HANDLES.with(|handles| {
5030        let mut handles = handles.borrow_mut();
5031        let handle = handles.get_mut(socket_id)?;
5032        if handle.received >= handle.max_messages {
5033            return Some(Err(vm_error(format!(
5034                "websocket_receive: socket '{socket_id}' exceeded max_messages"
5035            ))));
5036        }
5037        handle.received += 1;
5038        let max_message_bytes = handle.max_message_bytes;
5039        let kind = match &handle.kind {
5040            WebSocketHandleKind::Real(socket) => WebSocketHandleKind::Real(socket.clone()),
5041            WebSocketHandleKind::Fake(socket) => WebSocketHandleKind::Fake(socket.clone()),
5042            WebSocketHandleKind::Server(socket) => WebSocketHandleKind::Server(socket.clone()),
5043        };
5044        Some(Ok((kind, max_message_bytes)))
5045    });
5046    let Some(socket) = socket else {
5047        return Err(vm_error(format!(
5048            "websocket_receive: unknown socket '{socket_id}'"
5049        )));
5050    };
5051    let (kind, max_message_bytes) = socket?;
5052    match kind {
5053        WebSocketHandleKind::Fake(socket) => {
5054            let mut socket = socket.lock().await;
5055            if socket.closed {
5056                return Ok(VmValue::Nil);
5057            }
5058            let Some(message) = socket.messages.pop_front() else {
5059                return Ok(timeout_event());
5060            };
5061            if message.data.len() > max_message_bytes {
5062                socket.closed = true;
5063                return Err(vm_error(format!(
5064                    "websocket_receive: message exceeded max_message_bytes ({max_message_bytes})"
5065                )));
5066            }
5067            if message.message_type == "close" {
5068                socket.closed = true;
5069            }
5070            Ok(ws_event_value(message))
5071        }
5072        WebSocketHandleKind::Real(socket) => {
5073            let mut socket = socket.lock().await;
5074            let next = socket.next();
5075            let message = match tokio::time::timeout(Duration::from_millis(timeout_ms), next).await
5076            {
5077                Ok(Some(Ok(message))) => message,
5078                Ok(Some(Err(error))) => {
5079                    return Err(vm_error(format!("websocket_receive: failed: {error}")));
5080                }
5081                Ok(None) => return Ok(VmValue::Nil),
5082                Err(_) => return Ok(timeout_event()),
5083            };
5084            match &message {
5085                WsMessage::Text(text) if text.len() > max_message_bytes => {
5086                    return Err(vm_error(format!(
5087                        "websocket_receive: message exceeded max_message_bytes ({max_message_bytes})"
5088                    )));
5089                }
5090                WsMessage::Binary(bytes) | WsMessage::Ping(bytes) | WsMessage::Pong(bytes)
5091                    if bytes.len() > max_message_bytes =>
5092                {
5093                    return Err(vm_error(format!(
5094                        "websocket_receive: message exceeded max_message_bytes ({max_message_bytes})"
5095                    )));
5096                }
5097                _ => {}
5098            }
5099            Ok(real_ws_event_value(message))
5100        }
5101        WebSocketHandleKind::Server(socket) => {
5102            let started = std::time::Instant::now();
5103            loop {
5104                let message = {
5105                    let mut socket = socket.lock().await;
5106                    if socket.closed {
5107                        return Ok(VmValue::Nil);
5108                    }
5109                    while let Ok(message) = socket.incoming_rx.try_recv() {
5110                        socket.incoming.push_back(message);
5111                    }
5112                    socket.incoming.pop_front()
5113                };
5114                if let Some(message) = message {
5115                    if message.data.len() > max_message_bytes {
5116                        let mut socket = socket.lock().await;
5117                        socket.closed = true;
5118                        let _ = socket.outgoing_tx.try_send(ServerWebSocketCommand::Close(
5119                            Some(1009),
5120                            Some("message too large".to_string()),
5121                        ));
5122                        return Err(vm_error(format!(
5123                            "websocket_receive: message exceeded max_message_bytes ({max_message_bytes})"
5124                        )));
5125                    }
5126                    if message.message_type == "close" {
5127                        let mut socket = socket.lock().await;
5128                        socket.closed = true;
5129                    }
5130                    return Ok(ws_event_value(message));
5131                }
5132                if timeout_ms == 0 || started.elapsed() >= Duration::from_millis(timeout_ms) {
5133                    return Ok(timeout_event());
5134                }
5135                tokio::time::sleep(Duration::from_millis(10)).await;
5136            }
5137        }
5138    }
5139}
5140
5141async fn vm_websocket_close(socket_id: &str) -> Result<VmValue, VmError> {
5142    let removed = WEBSOCKET_HANDLES.with(|handles| handles.borrow_mut().remove(socket_id));
5143    let Some(handle) = removed else {
5144        return Ok(VmValue::Bool(false));
5145    };
5146    match handle.kind {
5147        WebSocketHandleKind::Fake(socket) => {
5148            socket.lock().await.closed = true;
5149            record_transport_call(TransportMockCall {
5150                kind: "websocket_close".to_string(),
5151                handle: Some(socket_id.to_string()),
5152                url: handle.url,
5153                message_type: None,
5154                data: None,
5155            });
5156            Ok(VmValue::Bool(true))
5157        }
5158        WebSocketHandleKind::Real(socket) => {
5159            let mut socket = socket.lock().await;
5160            socket
5161                .close(None)
5162                .await
5163                .map_err(|error| vm_error(format!("websocket_close: failed: {error}")))?;
5164            Ok(VmValue::Bool(true))
5165        }
5166        WebSocketHandleKind::Server(socket) => {
5167            let mut socket = socket.lock().await;
5168            socket.closed = true;
5169            let _ = socket
5170                .outgoing_tx
5171                .try_send(ServerWebSocketCommand::Close(None, None));
5172            Ok(VmValue::Bool(true))
5173        }
5174    }
5175}
5176
5177#[cfg(test)]
5178mod tests {
5179    use super::{
5180        compute_retry_delay, handle_from_value, http_mock_calls_snapshot, parse_retry_after_value,
5181        push_http_mock, reset_http_state, vm_execute_http_request, vm_http_download,
5182        vm_http_stream_info, vm_http_stream_open, vm_http_stream_read, vm_sse_event_frame,
5183        vm_sse_server_cancel, vm_sse_server_heartbeat, vm_sse_server_mock_disconnect,
5184        vm_sse_server_mock_receive, vm_sse_server_observed_bool, vm_sse_server_response,
5185        vm_sse_server_send, HttpMockResponse,
5186    };
5187    use crate::connectors::test_util::{
5188        accept_http_connection, read_http_request, write_http_response,
5189    };
5190    use crate::value::VmValue;
5191    use base64::Engine;
5192    use rcgen::generate_simple_self_signed;
5193    use rustls::pki_types::PrivatePkcs8KeyDer;
5194    use rustls::{ServerConfig, ServerConnection, StreamOwned};
5195    use sha2::{Digest, Sha256};
5196    use std::collections::BTreeMap;
5197    use std::io::{Read, Write};
5198    use std::net::TcpListener;
5199    use std::rc::Rc;
5200    use std::sync::{Arc, Once};
5201    use std::time::{Duration, SystemTime};
5202    use tempfile::TempDir;
5203    use x509_parser::prelude::{FromDer, X509Certificate};
5204
5205    fn expect_bool(value: VmValue) -> bool {
5206        let VmValue::Bool(value) = value else {
5207            panic!("expected bool, got {}", value.display());
5208        };
5209        value
5210    }
5211
5212    #[test]
5213    fn parses_retry_after_delta_seconds() {
5214        assert_eq!(parse_retry_after_value("5"), Some(Duration::from_secs(5)));
5215    }
5216
5217    #[test]
5218    fn parses_retry_after_http_date() {
5219        let header = httpdate::fmt_http_date(SystemTime::now() + Duration::from_secs(2));
5220        let parsed = parse_retry_after_value(&header).expect("http-date should parse");
5221        assert!(parsed <= Duration::from_secs(2));
5222        assert!(parsed <= Duration::from_secs(60));
5223    }
5224
5225    #[test]
5226    fn malformed_retry_after_returns_none() {
5227        assert_eq!(parse_retry_after_value("soon-ish"), None);
5228    }
5229
5230    #[test]
5231    fn retry_delay_honors_retry_after_floor() {
5232        let delay = compute_retry_delay(0, 1, Some(Duration::from_millis(250)));
5233        assert!(delay >= Duration::from_millis(250));
5234        assert!(delay <= Duration::from_secs(60));
5235    }
5236
5237    #[tokio::test]
5238    async fn typed_mock_api_drives_http_request_retries() {
5239        reset_http_state();
5240        push_http_mock(
5241            "GET",
5242            "https://api.example.com/retry",
5243            vec![
5244                HttpMockResponse::new(503, "busy").with_header("retry-after", "0"),
5245                HttpMockResponse::new(200, "ok"),
5246            ],
5247        );
5248        let result = vm_execute_http_request(
5249            "GET",
5250            "https://api.example.com/retry",
5251            &BTreeMap::from([
5252                ("retries".to_string(), VmValue::Int(1)),
5253                ("backoff".to_string(), VmValue::Int(0)),
5254            ]),
5255        )
5256        .await
5257        .expect("mocked request should succeed after retry");
5258
5259        let dict = result.as_dict().expect("response dict");
5260        assert_eq!(dict["status"].as_int(), Some(200));
5261        let calls = http_mock_calls_snapshot();
5262        assert_eq!(calls.len(), 2);
5263        assert_eq!(calls[0].url, "https://api.example.com/retry");
5264        reset_http_state();
5265    }
5266
5267    #[tokio::test]
5268    async fn multipart_requests_are_mock_visible() {
5269        reset_http_state();
5270        push_http_mock(
5271            "POST",
5272            "https://api.example.com/upload",
5273            vec![HttpMockResponse::new(201, "uploaded")],
5274        );
5275        let options = BTreeMap::from([(
5276            "multipart".to_string(),
5277            VmValue::List(Rc::new(vec![
5278                VmValue::Dict(Rc::new(BTreeMap::from([
5279                    ("name".to_string(), VmValue::String(Rc::from("meta"))),
5280                    ("value".to_string(), VmValue::String(Rc::from("hello"))),
5281                ]))),
5282                VmValue::Dict(Rc::new(BTreeMap::from([
5283                    ("name".to_string(), VmValue::String(Rc::from("blob"))),
5284                    (
5285                        "filename".to_string(),
5286                        VmValue::String(Rc::from("blob.bin")),
5287                    ),
5288                    (
5289                        "content_type".to_string(),
5290                        VmValue::String(Rc::from("application/octet-stream")),
5291                    ),
5292                    (
5293                        "value".to_string(),
5294                        VmValue::Bytes(Rc::new(vec![0, 1, 2, 3])),
5295                    ),
5296                ]))),
5297            ])),
5298        )]);
5299
5300        let response = vm_execute_http_request("POST", "https://api.example.com/upload", &options)
5301            .await
5302            .expect("multipart mock request should succeed");
5303        let response = response.as_dict().expect("response dict");
5304        assert_eq!(response["status"].as_int(), Some(201));
5305
5306        let calls = http_mock_calls_snapshot();
5307        assert_eq!(calls.len(), 1);
5308        assert!(calls[0]
5309            .headers
5310            .get("content-type")
5311            .expect("content-type recorded")
5312            .contains("multipart/form-data"));
5313        let body = calls[0].body.as_deref().expect("multipart body recorded");
5314        assert!(body.contains("name=\"meta\""));
5315        assert!(body.contains("hello"));
5316        assert!(body.contains("filename=\"blob.bin\""));
5317        reset_http_state();
5318    }
5319
5320    #[tokio::test]
5321    async fn http_stream_mock_reads_in_chunks() {
5322        reset_http_state();
5323        push_http_mock(
5324            "GET",
5325            "https://api.example.com/stream",
5326            vec![HttpMockResponse::new(200, "stream-body")],
5327        );
5328
5329        let handle = vm_http_stream_open("https://api.example.com/stream", &BTreeMap::new())
5330            .await
5331            .expect("stream open");
5332        let stream_id = handle.display();
5333        let info = vm_http_stream_info(&stream_id).expect("stream info");
5334        let info = info.as_dict().expect("info dict");
5335        assert_eq!(info["status"].as_int(), Some(200));
5336
5337        let first = vm_http_stream_read(&stream_id, 6)
5338            .await
5339            .expect("first chunk");
5340        let second = vm_http_stream_read(&stream_id, 64)
5341            .await
5342            .expect("second chunk");
5343        let end = vm_http_stream_read(&stream_id, 64)
5344            .await
5345            .expect("end marker");
5346        assert_eq!(first.as_bytes().expect("bytes"), b"stream");
5347        assert_eq!(second.as_bytes().expect("bytes"), b"-body");
5348        assert!(matches!(end, VmValue::Nil));
5349        reset_http_state();
5350    }
5351
5352    #[tokio::test]
5353    async fn http_download_mock_writes_file() {
5354        reset_http_state();
5355        let temp = TempDir::new().expect("tempdir");
5356        let path = temp.path().join("download.bin");
5357        push_http_mock(
5358            "GET",
5359            "https://api.example.com/download",
5360            vec![HttpMockResponse::new(200, "downloaded")],
5361        );
5362
5363        let response = vm_http_download(
5364            "https://api.example.com/download",
5365            &path.display().to_string(),
5366            &BTreeMap::new(),
5367        )
5368        .await
5369        .expect("download response");
5370        let response = response.as_dict().expect("response dict");
5371        assert_eq!(response["bytes_written"].as_int(), Some(10));
5372        assert_eq!(
5373            std::fs::read_to_string(path).expect("downloaded file"),
5374            "downloaded"
5375        );
5376        reset_http_state();
5377    }
5378
5379    #[tokio::test]
5380    async fn http_proxy_routes_requests_through_configured_proxy() {
5381        reset_http_state();
5382        let listener = TcpListener::bind("127.0.0.1:0").expect("bind proxy listener");
5383        let proxy_addr = listener.local_addr().expect("proxy addr");
5384        let proxy_thread = std::thread::spawn(move || {
5385            let mut stream = accept_http_connection(&listener, "proxy listener");
5386            let request = read_http_request(&mut stream);
5387            assert_eq!(request.method, "GET");
5388            assert_eq!(request.path, "http://example.invalid/proxy-check");
5389            assert_eq!(
5390                request
5391                    .headers
5392                    .get("proxy-authorization")
5393                    .map(String::as_str),
5394                Some("Basic dXNlcjpwYXNz")
5395            );
5396            write_http_response(
5397                &mut stream,
5398                200,
5399                &[("content-type", "text/plain".to_string())],
5400                "proxied",
5401            );
5402        });
5403
5404        let options = BTreeMap::from([
5405            (
5406                "proxy".to_string(),
5407                VmValue::String(Rc::from(format!("http://{proxy_addr}"))),
5408            ),
5409            (
5410                "proxy_auth".to_string(),
5411                VmValue::Dict(Rc::new(BTreeMap::from([
5412                    ("user".to_string(), VmValue::String(Rc::from("user"))),
5413                    ("pass".to_string(), VmValue::String(Rc::from("pass"))),
5414                ]))),
5415            ),
5416            ("timeout_ms".to_string(), VmValue::Int(1_000)),
5417        ]);
5418
5419        let response =
5420            vm_execute_http_request("GET", "http://example.invalid/proxy-check", &options)
5421                .await
5422                .expect("proxied response");
5423        let response = response.as_dict().expect("response dict");
5424        assert_eq!(response["status"].as_int(), Some(200));
5425        assert_eq!(response["body"].display(), "proxied");
5426        proxy_thread.join().expect("proxy thread");
5427        reset_http_state();
5428    }
5429
5430    #[tokio::test]
5431    async fn custom_tls_ca_bundle_and_pin_allow_request() {
5432        reset_http_state();
5433        install_rustls_provider();
5434        let temp = TempDir::new().expect("tempdir");
5435        let cert =
5436            generate_simple_self_signed(vec!["localhost".to_string(), "127.0.0.1".to_string()])
5437                .expect("generate cert");
5438        let cert_pem = cert.cert.pem();
5439        let cert_path = temp.path().join("cert.pem");
5440        std::fs::write(&cert_path, cert_pem.as_bytes()).expect("write cert");
5441        let pin = spki_pin_base64(cert.cert.der().as_ref());
5442
5443        let listener = TcpListener::bind("127.0.0.1:0").expect("bind tls listener");
5444        let port = listener.local_addr().expect("tls addr").port();
5445        let server_config = Arc::new(
5446            ServerConfig::builder()
5447                .with_no_client_auth()
5448                .with_single_cert(
5449                    vec![cert.cert.der().clone()],
5450                    PrivatePkcs8KeyDer::from(cert.key_pair.serialize_der()).into(),
5451                )
5452                .expect("build tls config"),
5453        );
5454        let thread = std::thread::spawn(move || {
5455            let (tcp, _) = listener.accept().expect("accept tls client");
5456            let conn = ServerConnection::new(server_config).expect("server connection");
5457            let mut stream = StreamOwned::new(conn, tcp);
5458            let request = read_http_request_generic(&mut stream);
5459            assert!(request.starts_with("GET /secure HTTP/1.1\r\n"));
5460            write_http_response_generic(
5461                &mut stream,
5462                200,
5463                &[("content-type", "text/plain".to_string())],
5464                "secure",
5465            );
5466        });
5467
5468        let options = BTreeMap::from([(
5469            "tls".to_string(),
5470            VmValue::Dict(Rc::new(BTreeMap::from([
5471                (
5472                    "ca_bundle_path".to_string(),
5473                    VmValue::String(Rc::from(cert_path.display().to_string())),
5474                ),
5475                (
5476                    "pinned_sha256".to_string(),
5477                    VmValue::List(Rc::new(vec![VmValue::String(Rc::from(pin))])),
5478                ),
5479            ]))),
5480        )]);
5481        let response =
5482            vm_execute_http_request("GET", &format!("https://localhost:{port}/secure"), &options)
5483                .await
5484                .expect("tls request should succeed");
5485        let response = response.as_dict().expect("response dict");
5486        assert_eq!(response["body"].display(), "secure");
5487        thread.join().expect("tls thread");
5488        reset_http_state();
5489    }
5490
5491    #[tokio::test]
5492    async fn custom_tls_pin_mismatch_is_rejected() {
5493        reset_http_state();
5494        install_rustls_provider();
5495        let temp = TempDir::new().expect("tempdir");
5496        let cert =
5497            generate_simple_self_signed(vec!["localhost".to_string(), "127.0.0.1".to_string()])
5498                .expect("generate cert");
5499        let cert_pem = cert.cert.pem();
5500        let cert_path = temp.path().join("cert.pem");
5501        std::fs::write(&cert_path, cert_pem.as_bytes()).expect("write cert");
5502
5503        let listener = TcpListener::bind("127.0.0.1:0").expect("bind tls listener");
5504        let port = listener.local_addr().expect("tls addr").port();
5505        let server_config = Arc::new(
5506            ServerConfig::builder()
5507                .with_no_client_auth()
5508                .with_single_cert(
5509                    vec![cert.cert.der().clone()],
5510                    PrivatePkcs8KeyDer::from(cert.key_pair.serialize_der()).into(),
5511                )
5512                .expect("build tls config"),
5513        );
5514        let thread = std::thread::spawn(move || {
5515            let (tcp, _) = listener.accept().expect("accept tls client");
5516            let conn = ServerConnection::new(server_config).expect("server connection");
5517            let mut stream = StreamOwned::new(conn, tcp);
5518            let _ = read_http_request_generic(&mut stream);
5519            write_http_response_generic(
5520                &mut stream,
5521                200,
5522                &[("content-type", "text/plain".to_string())],
5523                "secure",
5524            );
5525        });
5526
5527        let options = BTreeMap::from([(
5528            "tls".to_string(),
5529            VmValue::Dict(Rc::new(BTreeMap::from([
5530                (
5531                    "ca_bundle_path".to_string(),
5532                    VmValue::String(Rc::from(cert_path.display().to_string())),
5533                ),
5534                (
5535                    "pinned_sha256".to_string(),
5536                    VmValue::List(Rc::new(vec![VmValue::String(Rc::from("deadbeef"))])),
5537                ),
5538            ]))),
5539        )]);
5540        let error =
5541            vm_execute_http_request("GET", &format!("https://localhost:{port}/secure"), &options)
5542                .await
5543                .expect_err("pin mismatch should fail");
5544        let message = error.to_string();
5545        assert!(message.contains("TLS SPKI pin mismatch"), "{message}");
5546        thread.join().expect("tls thread");
5547        reset_http_state();
5548    }
5549
5550    fn install_rustls_provider() {
5551        static INSTALL: Once = Once::new();
5552        INSTALL.call_once(|| {
5553            let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
5554        });
5555    }
5556
5557    fn spki_pin_base64(cert_der: &[u8]) -> String {
5558        let (_, cert) = X509Certificate::from_der(cert_der).expect("parse cert");
5559        base64::engine::general_purpose::STANDARD
5560            .encode(Sha256::digest(cert.tbs_certificate.subject_pki.raw))
5561    }
5562
5563    fn read_http_request_generic<T: Read>(stream: &mut T) -> String {
5564        let mut buffer = Vec::new();
5565        let mut chunk = [0u8; 4096];
5566        loop {
5567            let read = stream.read(&mut chunk).expect("read request");
5568            assert!(read > 0, "request closed before headers");
5569            buffer.extend_from_slice(&chunk[..read]);
5570            if buffer.windows(4).any(|window| window == b"\r\n\r\n") {
5571                return String::from_utf8_lossy(&buffer).into_owned();
5572            }
5573        }
5574    }
5575
5576    fn write_http_response_generic<T: Write>(
5577        stream: &mut T,
5578        status: u16,
5579        headers: &[(&str, String)],
5580        body: &str,
5581    ) {
5582        let mut response = format!(
5583            "HTTP/1.1 {status} OK\r\ncontent-length: {}\r\nconnection: close\r\n",
5584            body.len()
5585        );
5586        for (name, value) in headers {
5587            response.push_str(name);
5588            response.push_str(": ");
5589            response.push_str(value);
5590            response.push_str("\r\n");
5591        }
5592        response.push_str("\r\n");
5593        response.push_str(body);
5594        stream
5595            .write_all(response.as_bytes())
5596            .expect("write response");
5597        stream.flush().expect("flush response");
5598    }
5599
5600    #[test]
5601    fn formats_sse_event_fields_and_multiline_data() {
5602        let frame = vm_sse_event_frame(
5603            &VmValue::Dict(Rc::new(BTreeMap::from([
5604                ("event".to_string(), VmValue::String(Rc::from("progress"))),
5605                ("data".to_string(), VmValue::String(Rc::from("one\ntwo"))),
5606                ("id".to_string(), VmValue::String(Rc::from("evt-1"))),
5607                ("retry_ms".to_string(), VmValue::Int(2500)),
5608            ]))),
5609            &BTreeMap::new(),
5610        )
5611        .expect("event frame");
5612        assert_eq!(
5613            frame,
5614            "id: evt-1\nevent: progress\nretry: 2500\ndata: one\ndata: two\n\n"
5615        );
5616    }
5617
5618    #[test]
5619    fn rejects_sse_event_control_fields_with_newlines() {
5620        let err = vm_sse_event_frame(
5621            &VmValue::Dict(Rc::new(BTreeMap::from([(
5622                "event".to_string(),
5623                VmValue::String(Rc::from("bad\nname")),
5624            )]))),
5625            &BTreeMap::new(),
5626        )
5627        .expect_err("newline should reject");
5628        assert!(err.to_string().contains("event must not contain newlines"));
5629    }
5630
5631    #[test]
5632    fn server_sse_mock_client_observes_heartbeat_disconnect_and_cancel() {
5633        reset_http_state();
5634        let response = vm_sse_server_response(&BTreeMap::from([(
5635            "max_buffered_events".to_string(),
5636            VmValue::Int(4),
5637        )]))
5638        .expect("response");
5639        let stream_id = handle_from_value(&response, "test").expect("handle");
5640
5641        assert!(expect_bool(
5642            vm_sse_server_send(
5643                &stream_id,
5644                &VmValue::Dict(Rc::new(BTreeMap::from([
5645                    ("event".to_string(), VmValue::String(Rc::from("progress")),),
5646                    ("data".to_string(), VmValue::String(Rc::from("50"))),
5647                ]))),
5648                &BTreeMap::new(),
5649            )
5650            .expect("send")
5651        ));
5652        assert!(expect_bool(
5653            vm_sse_server_heartbeat(&stream_id, Some(&VmValue::String(Rc::from("tick"))))
5654                .expect("heartbeat")
5655        ));
5656
5657        let first = vm_sse_server_mock_receive(&stream_id).expect("first");
5658        let first = first.as_dict().expect("first dict");
5659        assert_eq!(first["event"].display(), "progress");
5660        assert_eq!(first["data"].display(), "50");
5661        let heartbeat = vm_sse_server_mock_receive(&stream_id).expect("heartbeat read");
5662        let heartbeat = heartbeat.as_dict().expect("heartbeat dict");
5663        assert_eq!(heartbeat["type"].display(), "comment");
5664        assert_eq!(heartbeat["comment"].display(), "tick");
5665
5666        assert!(expect_bool(
5667            vm_sse_server_mock_disconnect(&stream_id).expect("disconnect")
5668        ));
5669        assert!(expect_bool(
5670            vm_sse_server_observed_bool(&stream_id, "test", |handle| handle.disconnected)
5671                .expect("observed")
5672        ));
5673        assert!(!expect_bool(
5674            vm_sse_server_send(
5675                &stream_id,
5676                &VmValue::String(Rc::from("late")),
5677                &BTreeMap::new()
5678            )
5679            .expect("late send")
5680        ));
5681
5682        let cancelled = vm_sse_server_response(&BTreeMap::new()).expect("cancelled response");
5683        let cancelled_id = handle_from_value(&cancelled, "test").expect("cancelled handle");
5684        assert!(expect_bool(
5685            vm_sse_server_cancel(&cancelled_id, Some(&VmValue::String(Rc::from("stop"))))
5686                .expect("cancel")
5687        ));
5688        assert!(expect_bool(
5689            vm_sse_server_observed_bool(&cancelled_id, "test", |handle| handle.cancelled)
5690                .expect("cancelled observed")
5691        ));
5692        reset_http_state();
5693    }
5694
5695    #[test]
5696    fn server_sse_rejects_oversized_events() {
5697        reset_http_state();
5698        let response = vm_sse_server_response(&BTreeMap::from([(
5699            "max_event_bytes".to_string(),
5700            VmValue::Int(12),
5701        )]))
5702        .expect("response");
5703        let stream_id = handle_from_value(&response, "test").expect("handle");
5704        let err = vm_sse_server_send(
5705            &stream_id,
5706            &VmValue::String(Rc::from("this is too large")),
5707            &BTreeMap::new(),
5708        )
5709        .expect_err("oversized event should reject");
5710        assert!(err.to_string().contains("max_event_bytes"));
5711        reset_http_state();
5712    }
5713}