1use std::cell::RefCell;
2use std::collections::{BTreeMap, HashMap, VecDeque};
3use std::io::Write;
4use std::net::{Shutdown, TcpListener, TcpStream};
5use std::rc::Rc;
6use std::sync::{
7 atomic::{AtomicBool, Ordering},
8 mpsc, Arc, RwLock,
9};
10use std::thread;
11use std::time::{Duration, SystemTime};
12
13use crate::value::{VmClosure, VmError, VmValue};
14use crate::vm::Vm;
15
16use base64::Engine;
17use futures::{SinkExt, StreamExt};
18use reqwest_eventsource::{Event as SseEvent, EventSource};
19use sha2::{Digest, Sha256};
20use tokio_tungstenite::tungstenite::client::IntoClientRequest;
21use tokio_tungstenite::tungstenite::handshake::server::{ErrorResponse, Request, Response};
22use tokio_tungstenite::tungstenite::http::{HeaderValue, StatusCode};
23use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
24use tokio_tungstenite::tungstenite::protocol::CloseFrame;
25use tokio_tungstenite::tungstenite::Message as WsMessage;
26use x509_parser::prelude::{FromDer, X509Certificate};
27
28#[derive(Clone)]
31struct MockResponse {
32 status: i64,
33 body: String,
34 headers: BTreeMap<String, VmValue>,
35}
36
37#[derive(Clone, Debug, PartialEq, Eq)]
38pub struct HttpMockResponse {
39 pub status: i64,
40 pub body: String,
41 pub headers: BTreeMap<String, String>,
42}
43
44impl HttpMockResponse {
45 pub fn new(status: i64, body: impl Into<String>) -> Self {
46 Self {
47 status,
48 body: body.into(),
49 headers: BTreeMap::new(),
50 }
51 }
52
53 pub fn with_header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
54 self.headers.insert(name.into(), value.into());
55 self
56 }
57}
58
59impl From<HttpMockResponse> for MockResponse {
60 fn from(value: HttpMockResponse) -> Self {
61 Self {
62 status: value.status,
63 body: value.body,
64 headers: value
65 .headers
66 .into_iter()
67 .map(|(key, value)| (key, VmValue::String(Rc::from(value))))
68 .collect(),
69 }
70 }
71}
72
73struct HttpMock {
74 method: String,
75 url_pattern: String,
76 responses: Vec<MockResponse>,
77 next_response: usize,
78}
79
80#[derive(Clone)]
81struct HttpMockCall {
82 method: String,
83 url: String,
84 headers: BTreeMap<String, VmValue>,
85 body: Option<String>,
86}
87
88#[derive(Clone, Debug, PartialEq, Eq)]
89pub struct HttpMockCallSnapshot {
90 pub method: String,
91 pub url: String,
92 pub headers: BTreeMap<String, String>,
93 pub body: Option<String>,
94}
95
96#[derive(Clone)]
97struct RetryConfig {
98 max: u32,
99 backoff_ms: u64,
100 retryable_statuses: Vec<u16>,
101 retryable_methods: Vec<String>,
102 respect_retry_after: bool,
103}
104
105#[derive(Clone)]
106struct HttpRequestConfig {
107 total_timeout_ms: u64,
108 connect_timeout_ms: Option<u64>,
109 read_timeout_ms: Option<u64>,
110 retry: RetryConfig,
111 follow_redirects: bool,
112 max_redirects: usize,
113 proxy: Option<HttpProxyConfig>,
114 tls: HttpTlsConfig,
115 decompress: bool,
116}
117
118#[derive(Clone, Default)]
119struct HttpTlsConfig {
120 ca_bundle_path: Option<String>,
121 client_cert_path: Option<String>,
122 client_key_path: Option<String>,
123 client_identity_path: Option<String>,
124 pinned_sha256: Vec<String>,
125}
126
127#[derive(Clone)]
128struct HttpProxyConfig {
129 url: String,
130 auth: Option<(String, String)>,
131 no_proxy: Option<String>,
132}
133
134#[derive(Clone)]
135struct HttpSession {
136 client: reqwest::Client,
137 options: BTreeMap<String, VmValue>,
138}
139
140struct HttpRequestParts {
141 method: reqwest::Method,
142 headers: reqwest::header::HeaderMap,
143 recorded_headers: BTreeMap<String, VmValue>,
144 body: Option<String>,
145 multipart: Option<MultipartRequest>,
146}
147
148#[derive(Clone)]
149struct MultipartRequest {
150 parts: Vec<MultipartField>,
151 mock_body: String,
152}
153
154#[derive(Clone)]
155struct MultipartField {
156 name: String,
157 value: Vec<u8>,
158 filename: Option<String>,
159 content_type: Option<String>,
160}
161
162struct HttpStreamHandle {
163 kind: HttpStreamKind,
164 status: i64,
165 headers: BTreeMap<String, VmValue>,
166 pending: VecDeque<u8>,
167 closed: bool,
168}
169
170enum HttpStreamKind {
171 Real(Rc<tokio::sync::Mutex<reqwest::Response>>),
172 Fake,
173}
174
175struct SseMock {
176 url_pattern: String,
177 events: Vec<MockStreamEvent>,
178}
179
180#[derive(Clone)]
181struct MockStreamEvent {
182 event_type: String,
183 data: String,
184 id: Option<String>,
185 retry_ms: Option<i64>,
186}
187
188struct SseHandle {
189 kind: SseHandleKind,
190 url: String,
191 max_events: usize,
192 max_message_bytes: usize,
193 received: usize,
194}
195
196enum SseHandleKind {
197 Real(Rc<tokio::sync::Mutex<EventSource>>),
198 Fake(Rc<tokio::sync::Mutex<FakeSseStream>>),
199}
200
201struct FakeSseStream {
202 events: VecDeque<MockStreamEvent>,
203 opened: bool,
204 closed: bool,
205}
206
207struct SseServerHandle {
208 status: i64,
209 headers: BTreeMap<String, VmValue>,
210 frames: VecDeque<String>,
211 max_event_bytes: usize,
212 max_buffered_events: usize,
213 sent_events: usize,
214 flushed_events: usize,
215 closed: bool,
216 disconnected: bool,
217 cancelled: bool,
218 cancel_reason: Option<String>,
219}
220
221struct WebSocketMock {
222 url_pattern: String,
223 messages: Vec<MockWsMessage>,
224 echo: bool,
225}
226
227#[derive(Clone)]
228struct MockWsMessage {
229 message_type: String,
230 data: Vec<u8>,
231 close_code: Option<u16>,
232 close_reason: Option<String>,
233}
234
235type RealWebSocket =
236 tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
237
238struct WebSocketHandle {
239 kind: WebSocketHandleKind,
240 url: String,
241 max_messages: usize,
242 max_message_bytes: usize,
243 received: usize,
244}
245
246enum WebSocketHandleKind {
247 Real(Rc<tokio::sync::Mutex<RealWebSocket>>),
248 Fake(Rc<tokio::sync::Mutex<FakeWebSocket>>),
249 Server(Rc<tokio::sync::Mutex<ServerWebSocket>>),
250}
251
252struct FakeWebSocket {
253 messages: VecDeque<MockWsMessage>,
254 echo: bool,
255 closed: bool,
256}
257
258struct WebSocketServer {
259 addr: String,
260 routes: Arc<RwLock<HashMap<String, WebSocketRoute>>>,
261 events: Rc<tokio::sync::Mutex<mpsc::Receiver<WebSocketServerEvent>>>,
262 running: Arc<AtomicBool>,
263}
264
265#[derive(Clone)]
266struct WebSocketRoute {
267 path: String,
268 bearer_token: Option<String>,
269 max_messages: usize,
270 max_message_bytes: usize,
271 send_buffer_messages: usize,
272 idle_timeout_ms: u64,
273}
274
275struct WebSocketServerEvent {
276 handle: ServerWebSocket,
277 path: String,
278 peer: String,
279 headers: BTreeMap<String, String>,
280 max_messages: usize,
281 max_message_bytes: usize,
282}
283
284struct ServerWebSocket {
285 incoming: VecDeque<MockWsMessage>,
286 incoming_rx: mpsc::Receiver<MockWsMessage>,
287 outgoing_tx: mpsc::SyncSender<ServerWebSocketCommand>,
288 closed: bool,
289}
290
291enum ServerWebSocketCommand {
292 Send(MockWsMessage),
293 Close(Option<u16>, Option<String>),
294}
295
296#[derive(Clone)]
297struct TransportMockCall {
298 kind: String,
299 handle: Option<String>,
300 url: String,
301 message_type: Option<String>,
302 data: Option<String>,
303}
304
305#[derive(Clone)]
306struct HttpServerRoute {
307 method: String,
308 template: String,
309 handler: Rc<VmClosure>,
310 max_body_bytes: Option<usize>,
311 retain_raw_body: Option<bool>,
312}
313
314#[derive(Clone)]
315struct HttpServer {
316 routes: Vec<HttpServerRoute>,
317 before: Vec<Rc<VmClosure>>,
318 after: Vec<Rc<VmClosure>>,
319 ready: bool,
320 readiness: Option<Rc<VmClosure>>,
321 shutdown_hooks: Vec<Rc<VmClosure>>,
322 shutdown: bool,
323 max_body_bytes: usize,
324 retain_raw_body: bool,
325}
326
327const DEFAULT_TIMEOUT_MS: u64 = 30_000;
328const DEFAULT_BACKOFF_MS: u64 = 1_000;
329const MAX_RETRY_DELAY_MS: u64 = 60_000;
330const DEFAULT_RETRYABLE_STATUSES: [u16; 6] = [408, 429, 500, 502, 503, 504];
331const DEFAULT_RETRYABLE_METHODS: [&str; 5] = ["GET", "HEAD", "PUT", "DELETE", "OPTIONS"];
332const DEFAULT_TRANSPORT_RECEIVE_TIMEOUT_MS: u64 = 30_000;
333const DEFAULT_MAX_STREAM_EVENTS: usize = 10_000;
334const DEFAULT_MAX_MESSAGE_BYTES: usize = 1024 * 1024;
335const DEFAULT_SERVER_MAX_BODY_BYTES: usize = 1024 * 1024;
336const DEFAULT_WEBSOCKET_SERVER_IDLE_TIMEOUT_MS: u64 = 30_000;
337const MAX_HTTP_SESSIONS: usize = 64;
338const MAX_HTTP_STREAMS: usize = 64;
339const MAX_SSE_STREAMS: usize = 64;
340const MAX_SSE_SERVER_STREAMS: usize = 64;
341const MAX_WEBSOCKETS: usize = 64;
342const MULTIPART_MOCK_BOUNDARY: &str = "harn-boundary";
343const MAX_HTTP_SERVERS: usize = 128;
344const MAX_WEBSOCKET_SERVERS: usize = 16;
345
346thread_local! {
347 static HTTP_MOCKS: RefCell<Vec<HttpMock>> = const { RefCell::new(Vec::new()) };
348 static HTTP_MOCK_CALLS: RefCell<Vec<HttpMockCall>> = const { RefCell::new(Vec::new()) };
349 static HTTP_CLIENTS: RefCell<HashMap<String, reqwest::Client>> = RefCell::new(HashMap::new());
350 static HTTP_SESSIONS: RefCell<HashMap<String, HttpSession>> = RefCell::new(HashMap::new());
351 static HTTP_STREAMS: RefCell<HashMap<String, HttpStreamHandle>> = RefCell::new(HashMap::new());
352 static SSE_MOCKS: RefCell<Vec<SseMock>> = const { RefCell::new(Vec::new()) };
353 static SSE_HANDLES: RefCell<HashMap<String, SseHandle>> = RefCell::new(HashMap::new());
354 static SSE_SERVER_HANDLES: RefCell<HashMap<String, SseServerHandle>> = RefCell::new(HashMap::new());
355 static WEBSOCKET_MOCKS: RefCell<Vec<WebSocketMock>> = const { RefCell::new(Vec::new()) };
356 static WEBSOCKET_HANDLES: RefCell<HashMap<String, WebSocketHandle>> = RefCell::new(HashMap::new());
357 static WEBSOCKET_SERVERS: RefCell<HashMap<String, WebSocketServer>> = RefCell::new(HashMap::new());
358 static TRANSPORT_MOCK_CALLS: RefCell<Vec<TransportMockCall>> = const { RefCell::new(Vec::new()) };
359 static TRANSPORT_HANDLE_COUNTER: RefCell<u64> = const { RefCell::new(0) };
360 static HTTP_SERVERS: RefCell<HashMap<String, HttpServer>> = RefCell::new(HashMap::new());
361}
362
363pub fn reset_http_state() {
365 HTTP_MOCKS.with(|m| m.borrow_mut().clear());
366 HTTP_MOCK_CALLS.with(|c| c.borrow_mut().clear());
367 HTTP_CLIENTS.with(|clients| clients.borrow_mut().clear());
368 HTTP_SESSIONS.with(|sessions| sessions.borrow_mut().clear());
369 HTTP_STREAMS.with(|streams| streams.borrow_mut().clear());
370 SSE_MOCKS.with(|mocks| mocks.borrow_mut().clear());
371 SSE_HANDLES.with(|handles| {
372 for handle in handles.borrow_mut().values_mut() {
373 if let SseHandleKind::Real(stream) = &handle.kind {
374 if let Ok(mut stream) = stream.try_lock() {
375 stream.close();
376 }
377 }
378 }
379 handles.borrow_mut().clear();
380 });
381 SSE_SERVER_HANDLES.with(|handles| handles.borrow_mut().clear());
382 WEBSOCKET_MOCKS.with(|mocks| mocks.borrow_mut().clear());
383 WEBSOCKET_HANDLES.with(|handles| handles.borrow_mut().clear());
384 WEBSOCKET_SERVERS.with(|servers| {
385 let mut servers = servers.borrow_mut();
386 for server in servers.values() {
387 server.running.store(false, Ordering::SeqCst);
388 let _ = TcpStream::connect(&server.addr);
389 }
390 servers.clear();
391 });
392 TRANSPORT_MOCK_CALLS.with(|calls| calls.borrow_mut().clear());
393 TRANSPORT_HANDLE_COUNTER.with(|counter| *counter.borrow_mut() = 0);
394 HTTP_SERVERS.with(|servers| servers.borrow_mut().clear());
395}
396
397pub fn push_http_mock(
398 method: impl Into<String>,
399 url_pattern: impl Into<String>,
400 responses: Vec<HttpMockResponse>,
401) {
402 let responses = if responses.is_empty() {
403 vec![MockResponse::from(HttpMockResponse::new(200, ""))]
404 } else {
405 responses.into_iter().map(MockResponse::from).collect()
406 };
407 let method = method.into();
408 let url_pattern = url_pattern.into();
409 HTTP_MOCKS.with(|mocks| {
410 let mut mocks = mocks.borrow_mut();
411 mocks.retain(|mock| !(mock.method == method && mock.url_pattern == url_pattern));
416 mocks.push(HttpMock {
417 method,
418 url_pattern,
419 responses,
420 next_response: 0,
421 });
422 });
423}
424
425pub fn http_mock_calls_snapshot() -> Vec<HttpMockCallSnapshot> {
426 HTTP_MOCK_CALLS.with(|calls| {
427 calls
428 .borrow()
429 .iter()
430 .map(|call| HttpMockCallSnapshot {
431 method: call.method.clone(),
432 url: call.url.clone(),
433 headers: call
434 .headers
435 .iter()
436 .map(|(key, value)| (key.clone(), value.display()))
437 .collect(),
438 body: call.body.clone(),
439 })
440 .collect()
441 })
442}
443
444fn url_matches(pattern: &str, url: &str) -> bool {
446 if pattern == "*" {
447 return true;
448 }
449 if !pattern.contains('*') {
450 return pattern == url;
451 }
452 let parts: Vec<&str> = pattern.split('*').collect();
454 let mut remaining = url;
455 for (i, part) in parts.iter().enumerate() {
456 if part.is_empty() {
457 continue;
458 }
459 if i == 0 {
460 if !remaining.starts_with(part) {
461 return false;
462 }
463 remaining = &remaining[part.len()..];
464 } else if i == parts.len() - 1 {
465 if !remaining.ends_with(part) {
466 return false;
467 }
468 remaining = "";
469 } else {
470 match remaining.find(part) {
471 Some(pos) => remaining = &remaining[pos + part.len()..],
472 None => return false,
473 }
474 }
475 }
476 true
477}
478
479fn build_http_response(status: i64, headers: BTreeMap<String, VmValue>, body: String) -> VmValue {
481 let mut result = BTreeMap::new();
482 result.insert("status".to_string(), VmValue::Int(status));
483 result.insert("headers".to_string(), VmValue::Dict(Rc::new(headers)));
484 result.insert("body".to_string(), VmValue::String(Rc::from(body)));
485 result.insert(
486 "ok".to_string(),
487 VmValue::Bool((200..300).contains(&(status as u16))),
488 );
489 VmValue::Dict(Rc::new(result))
490}
491
492fn build_http_download_response(
493 status: i64,
494 headers: BTreeMap<String, VmValue>,
495 bytes_written: u64,
496) -> VmValue {
497 let mut result = BTreeMap::new();
498 result.insert("status".to_string(), VmValue::Int(status));
499 result.insert("headers".to_string(), VmValue::Dict(Rc::new(headers)));
500 result.insert(
501 "bytes_written".to_string(),
502 VmValue::Int(bytes_written as i64),
503 );
504 result.insert(
505 "ok".to_string(),
506 VmValue::Bool((200..300).contains(&(status as u16))),
507 );
508 VmValue::Dict(Rc::new(result))
509}
510
511fn response_headers(headers: &reqwest::header::HeaderMap) -> BTreeMap<String, VmValue> {
512 let mut resp_headers = BTreeMap::new();
513 for (name, value) in headers {
514 if let Ok(v) = value.to_str() {
515 resp_headers.insert(name.as_str().to_string(), VmValue::String(Rc::from(v)));
516 }
517 }
518 resp_headers
519}
520
521fn vm_error(message: impl Into<String>) -> VmError {
522 VmError::Thrown(VmValue::String(Rc::from(message.into())))
523}
524
525fn next_transport_handle(prefix: &str) -> String {
526 TRANSPORT_HANDLE_COUNTER.with(|counter| {
527 let mut counter = counter.borrow_mut();
528 *counter += 1;
529 format!("{prefix}-{}", *counter)
530 })
531}
532
533fn handle_from_value(value: &VmValue, builtin: &str) -> Result<String, VmError> {
534 match value {
535 VmValue::String(handle) => Ok(handle.to_string()),
536 VmValue::Dict(dict) => dict
537 .get("id")
538 .map(|id| id.display())
539 .filter(|id| !id.is_empty())
540 .ok_or_else(|| vm_error(format!("{builtin}: handle dict must contain id"))),
541 _ => Err(vm_error(format!(
542 "{builtin}: first argument must be a handle string or dict"
543 ))),
544 }
545}
546
547fn get_options_arg(args: &[VmValue], index: usize) -> BTreeMap<String, VmValue> {
548 args.get(index)
549 .and_then(|value| value.as_dict())
550 .cloned()
551 .unwrap_or_default()
552}
553
554fn merge_options(
555 base: &BTreeMap<String, VmValue>,
556 overrides: &BTreeMap<String, VmValue>,
557) -> BTreeMap<String, VmValue> {
558 let mut merged = base.clone();
559 for (key, value) in overrides {
560 merged.insert(key.clone(), value.clone());
561 }
562 merged
563}
564
565fn resolve_http_path(
566 builtin: &str,
567 path: &str,
568 access: crate::stdlib::sandbox::FsAccess,
569) -> Result<std::path::PathBuf, VmError> {
570 let resolved = crate::stdlib::process::resolve_source_relative_path(path);
571 crate::stdlib::sandbox::enforce_fs_path(builtin, &resolved, access)?;
572 Ok(resolved)
573}
574
575fn value_to_bytes(value: &VmValue) -> Vec<u8> {
576 match value {
577 VmValue::Bytes(bytes) => bytes.as_ref().clone(),
578 other => other.display().into_bytes(),
579 }
580}
581
582fn parse_multipart_field(value: &VmValue) -> Result<MultipartField, VmError> {
583 let dict = value
584 .as_dict()
585 .ok_or_else(|| vm_error("http: multipart entries must be dicts"))?;
586 let name = dict
587 .get("name")
588 .map(|value| value.display())
589 .filter(|value| !value.is_empty())
590 .ok_or_else(|| vm_error("http: multipart entry requires name"))?;
591 let content_type = dict
592 .get("content_type")
593 .or_else(|| dict.get("mime_type"))
594 .map(|value| value.display())
595 .filter(|value| !value.is_empty());
596
597 let mut filename = dict
598 .get("filename")
599 .map(|value| value.display())
600 .filter(|value| !value.is_empty());
601 let value = if let Some(path_value) = dict.get("path") {
602 let path = path_value.display();
603 let resolved = resolve_http_path(
604 "http multipart",
605 &path,
606 crate::stdlib::sandbox::FsAccess::Read,
607 )?;
608 if filename.is_none() {
609 filename = resolved
610 .file_name()
611 .and_then(|name| name.to_str())
612 .map(|name| name.to_string());
613 }
614 std::fs::read(&resolved).map_err(|error| {
615 vm_error(format!(
616 "http: failed to read multipart file {}: {error}",
617 resolved.display()
618 ))
619 })?
620 } else if let Some(base64_value) = dict.get("value_base64").or_else(|| dict.get("base64")) {
621 base64::engine::general_purpose::STANDARD
622 .decode(base64_value.display())
623 .map_err(|error| vm_error(format!("http: invalid multipart base64 value: {error}")))?
624 } else {
625 dict.get("value").map(value_to_bytes).ok_or_else(|| {
626 vm_error("http: multipart entry requires value, value_base64, or path")
627 })?
628 };
629
630 Ok(MultipartField {
631 name,
632 value,
633 filename,
634 content_type,
635 })
636}
637
638fn parse_multipart_request(
639 options: &BTreeMap<String, VmValue>,
640) -> Result<Option<MultipartRequest>, VmError> {
641 let Some(value) = options.get("multipart") else {
642 return Ok(None);
643 };
644 let VmValue::List(items) = value else {
645 return Err(vm_error("http: multipart must be a list"));
646 };
647 let parts = items
648 .iter()
649 .map(parse_multipart_field)
650 .collect::<Result<Vec<_>, _>>()?;
651 let mock_body = multipart_mock_body(&parts);
652 Ok(Some(MultipartRequest { parts, mock_body }))
653}
654
655fn multipart_mock_body(parts: &[MultipartField]) -> String {
656 let mut out = String::new();
657 for part in parts {
658 out.push_str("--");
659 out.push_str(MULTIPART_MOCK_BOUNDARY);
660 out.push_str("\r\nContent-Disposition: form-data; name=\"");
661 out.push_str(&part.name);
662 out.push('"');
663 if let Some(filename) = &part.filename {
664 out.push_str("; filename=\"");
665 out.push_str(filename);
666 out.push('"');
667 }
668 out.push_str("\r\n");
669 if let Some(content_type) = &part.content_type {
670 out.push_str("Content-Type: ");
671 out.push_str(content_type);
672 out.push_str("\r\n");
673 }
674 out.push_str("\r\n");
675 out.push_str(&String::from_utf8_lossy(&part.value));
676 out.push_str("\r\n");
677 }
678 out.push_str("--");
679 out.push_str(MULTIPART_MOCK_BOUNDARY);
680 out.push_str("--\r\n");
681 out
682}
683
684fn multipart_form(request: &MultipartRequest) -> Result<reqwest::multipart::Form, VmError> {
685 let mut form = reqwest::multipart::Form::new();
686 for field in &request.parts {
687 let mut part = reqwest::multipart::Part::bytes(field.value.clone());
688 if let Some(filename) = &field.filename {
689 part = part.file_name(filename.clone());
690 }
691 if let Some(content_type) = &field.content_type {
692 part = part.mime_str(content_type).map_err(|error| {
693 vm_error(format!("http: invalid multipart content_type: {error}"))
694 })?;
695 }
696 form = form.part(field.name.clone(), part);
697 }
698 Ok(form)
699}
700
701fn transport_limit_option(options: &BTreeMap<String, VmValue>, key: &str, default: usize) -> usize {
702 options
703 .get(key)
704 .and_then(|value| value.as_int())
705 .map(|value| value.max(0) as usize)
706 .unwrap_or(default)
707}
708
709fn receive_timeout_arg(args: &[VmValue], index: usize) -> u64 {
710 match args.get(index) {
711 Some(VmValue::Duration(ms)) => (*ms).max(0) as u64,
712 Some(value) => value
713 .as_int()
714 .map(|ms| ms.max(0) as u64)
715 .unwrap_or(DEFAULT_TRANSPORT_RECEIVE_TIMEOUT_MS),
716 None => DEFAULT_TRANSPORT_RECEIVE_TIMEOUT_MS,
717 }
718}
719
720fn timeout_event() -> VmValue {
721 let mut dict = BTreeMap::new();
722 dict.insert("type".to_string(), VmValue::String(Rc::from("timeout")));
723 VmValue::Dict(Rc::new(dict))
724}
725
726fn closed_event() -> VmValue {
727 let mut dict = BTreeMap::new();
728 dict.insert("type".to_string(), VmValue::String(Rc::from("close")));
729 VmValue::Dict(Rc::new(dict))
730}
731
732fn sse_server_closed_event() -> VmValue {
733 let mut dict = BTreeMap::new();
734 dict.insert("type".to_string(), VmValue::String(Rc::from("close")));
735 dict.insert("server_closed".to_string(), VmValue::Bool(true));
736 VmValue::Dict(Rc::new(dict))
737}
738
739fn record_transport_call(call: TransportMockCall) {
740 TRANSPORT_MOCK_CALLS.with(|calls| calls.borrow_mut().push(call));
741}
742
743async fn http_verb_handler(
747 method: &str,
748 has_body: bool,
749 args: Vec<VmValue>,
750) -> Result<VmValue, VmError> {
751 let url = args.first().map(|a| a.display()).unwrap_or_default();
752 if url.is_empty() {
753 return Err(VmError::Thrown(VmValue::String(Rc::from(format!(
754 "http_{}: URL is required",
755 method.to_ascii_lowercase()
756 )))));
757 }
758 let mut options = if has_body {
759 match (args.get(1), args.get(2)) {
760 (Some(VmValue::Dict(d)), None) => (**d).clone(),
761 (_, Some(VmValue::Dict(d))) => (**d).clone(),
762 _ => BTreeMap::new(),
763 }
764 } else {
765 match args.get(1) {
766 Some(VmValue::Dict(d)) => (**d).clone(),
767 _ => BTreeMap::new(),
768 }
769 };
770 if has_body && !(matches!(args.get(1), Some(VmValue::Dict(_))) && args.get(2).is_none()) {
771 let body = args.get(1).map(|a| a.display()).unwrap_or_default();
772 options.insert("body".to_string(), VmValue::String(Rc::from(body)));
773 }
774 vm_execute_http_request(method, &url, &options).await
775}
776
777fn parse_mock_response_dict(response: &BTreeMap<String, VmValue>) -> MockResponse {
778 let status = response
779 .get("status")
780 .and_then(|v| v.as_int())
781 .unwrap_or(200);
782 let body = response
783 .get("body")
784 .map(|v| v.display())
785 .unwrap_or_default();
786 let headers = response
787 .get("headers")
788 .and_then(|v| v.as_dict())
789 .cloned()
790 .unwrap_or_default();
791 MockResponse {
792 status,
793 body,
794 headers,
795 }
796}
797
798fn parse_mock_responses(response: &BTreeMap<String, VmValue>) -> Vec<MockResponse> {
799 let scripted = response
800 .get("responses")
801 .and_then(|value| match value {
802 VmValue::List(items) => Some(
803 items
804 .iter()
805 .filter_map(|item| item.as_dict().map(parse_mock_response_dict))
806 .collect::<Vec<_>>(),
807 ),
808 _ => None,
809 })
810 .unwrap_or_default();
811
812 if scripted.is_empty() {
813 vec![parse_mock_response_dict(response)]
814 } else {
815 scripted
816 }
817}
818
819fn consume_http_mock(
820 method: &str,
821 url: &str,
822 headers: BTreeMap<String, VmValue>,
823 body: Option<String>,
824) -> Option<MockResponse> {
825 let response = HTTP_MOCKS.with(|mocks| {
826 let mut mocks = mocks.borrow_mut();
827 for mock in mocks.iter_mut() {
828 if (mock.method == "*" || mock.method.eq_ignore_ascii_case(method))
829 && url_matches(&mock.url_pattern, url)
830 {
831 let Some(last_index) = mock.responses.len().checked_sub(1) else {
832 continue;
833 };
834 let index = mock.next_response.min(last_index);
835 let response = mock.responses[index].clone();
836 if mock.next_response < last_index {
837 mock.next_response += 1;
838 }
839 return Some(response);
840 }
841 }
842 None
843 })?;
844
845 HTTP_MOCK_CALLS.with(|calls| {
846 calls.borrow_mut().push(HttpMockCall {
847 method: method.to_string(),
848 url: url.to_string(),
849 headers,
850 body,
851 });
852 });
853
854 Some(response)
855}
856
857fn vm_string(value: impl AsRef<str>) -> VmValue {
858 VmValue::String(Rc::from(value.as_ref()))
859}
860
861fn dict_value(entries: BTreeMap<String, VmValue>) -> VmValue {
862 VmValue::Dict(Rc::new(entries))
863}
864
865fn get_bool_option(options: &BTreeMap<String, VmValue>, key: &str, default: bool) -> bool {
866 match options.get(key) {
867 Some(VmValue::Bool(value)) => *value,
868 _ => default,
869 }
870}
871
872fn get_usize_option(
873 options: &BTreeMap<String, VmValue>,
874 key: &str,
875 default: usize,
876) -> Result<usize, VmError> {
877 match options.get(key).and_then(VmValue::as_int) {
878 Some(value) if value >= 0 => Ok(value as usize),
879 Some(_) => Err(vm_error(format!("http_server: {key} must be non-negative"))),
880 None => Ok(default),
881 }
882}
883
884fn get_optional_usize_option(
885 options: &BTreeMap<String, VmValue>,
886 key: &str,
887) -> Result<Option<usize>, VmError> {
888 match options.get(key).and_then(VmValue::as_int) {
889 Some(value) if value >= 0 => Ok(Some(value as usize)),
890 Some(_) => Err(vm_error(format!(
891 "http_server_route: {key} must be non-negative"
892 ))),
893 None => Ok(None),
894 }
895}
896
897fn server_from_value(value: &VmValue, builtin: &str) -> Result<String, VmError> {
898 handle_from_value(value, builtin)
899}
900
901fn closure_arg(args: &[VmValue], index: usize, builtin: &str) -> Result<Rc<VmClosure>, VmError> {
902 match args.get(index) {
903 Some(VmValue::Closure(closure)) => Ok(closure.clone()),
904 Some(other) => Err(vm_error(format!(
905 "{builtin}: argument {} must be a closure, got {}",
906 index + 1,
907 other.type_name()
908 ))),
909 None => Err(vm_error(format!(
910 "{builtin}: missing closure argument {}",
911 index + 1
912 ))),
913 }
914}
915
916fn http_server_handle_value(id: &str) -> VmValue {
917 let mut dict = BTreeMap::new();
918 dict.insert("id".to_string(), vm_string(id));
919 dict.insert("kind".to_string(), vm_string("http_server"));
920 dict_value(dict)
921}
922
923fn header_lookup_value(headers: &BTreeMap<String, VmValue>, name: &str) -> VmValue {
924 headers
925 .iter()
926 .find(|(candidate, _)| candidate.eq_ignore_ascii_case(name))
927 .map(|(_, value)| value.clone())
928 .unwrap_or(VmValue::Nil)
929}
930
931fn headers_from_value(value: &VmValue) -> BTreeMap<String, VmValue> {
932 match value {
933 VmValue::Dict(dict) => dict
934 .get("headers")
935 .and_then(VmValue::as_dict)
936 .map(|headers| {
937 headers
938 .iter()
939 .map(|(key, value)| (key.to_ascii_lowercase(), vm_string(value.display())))
940 .collect()
941 })
942 .unwrap_or_else(|| {
943 dict.iter()
944 .map(|(key, value)| (key.to_ascii_lowercase(), vm_string(value.display())))
945 .collect()
946 }),
947 _ => BTreeMap::new(),
948 }
949}
950
951fn normalize_headers(value: Option<&VmValue>) -> BTreeMap<String, VmValue> {
952 match value.and_then(VmValue::as_dict) {
953 Some(headers) => headers
954 .iter()
955 .map(|(key, value)| (key.to_ascii_lowercase(), vm_string(value.display())))
956 .collect(),
957 None => BTreeMap::new(),
958 }
959}
960
961fn percent_decode(input: &str) -> String {
962 let bytes = input.as_bytes();
963 let mut out = Vec::with_capacity(bytes.len());
964 let mut i = 0;
965 while i < bytes.len() {
966 if bytes[i] == b'+' {
967 out.push(b' ');
968 i += 1;
969 continue;
970 }
971 if bytes[i] == b'%' && i + 2 < bytes.len() {
972 if let (Some(hi), Some(lo)) = (hex_val(bytes[i + 1]), hex_val(bytes[i + 2])) {
973 out.push((hi << 4) | lo);
974 i += 3;
975 continue;
976 }
977 }
978 out.push(bytes[i]);
979 i += 1;
980 }
981 String::from_utf8_lossy(&out).into_owned()
982}
983
984fn hex_val(byte: u8) -> Option<u8> {
985 match byte {
986 b'0'..=b'9' => Some(byte - b'0'),
987 b'a'..=b'f' => Some(byte - b'a' + 10),
988 b'A'..=b'F' => Some(byte - b'A' + 10),
989 _ => None,
990 }
991}
992
993fn split_path_and_query(raw_path: &str) -> (String, BTreeMap<String, VmValue>) {
994 let (path, query) = raw_path.split_once('?').unwrap_or((raw_path, ""));
995 let mut query_map = BTreeMap::new();
996 for pair in query.split('&').filter(|part| !part.is_empty()) {
997 let (key, value) = pair.split_once('=').unwrap_or((pair, ""));
998 query_map.insert(percent_decode(key), vm_string(percent_decode(value)));
999 }
1000 (
1001 if path.is_empty() { "/" } else { path }.to_string(),
1002 query_map,
1003 )
1004}
1005
1006fn request_body_bytes(input: &BTreeMap<String, VmValue>) -> Vec<u8> {
1007 match input.get("raw_body").or_else(|| input.get("body")) {
1008 Some(VmValue::Bytes(bytes)) => bytes.as_ref().clone(),
1009 Some(value) => value.display().into_bytes(),
1010 None => Vec::new(),
1011 }
1012}
1013
1014fn request_value(
1015 method: &str,
1016 path: &str,
1017 path_params: BTreeMap<String, VmValue>,
1018 mut query: BTreeMap<String, VmValue>,
1019 input: &BTreeMap<String, VmValue>,
1020 body_bytes: &[u8],
1021 retain_raw_body: bool,
1022) -> VmValue {
1023 if let Some(explicit_query) = input.get("query").and_then(VmValue::as_dict) {
1024 query.extend(
1025 explicit_query
1026 .iter()
1027 .map(|(key, value)| (key.clone(), value.clone())),
1028 );
1029 }
1030
1031 let headers = normalize_headers(input.get("headers"));
1032 let body = String::from_utf8_lossy(body_bytes).into_owned();
1033 let mut request = BTreeMap::new();
1034 request.insert("method".to_string(), vm_string(method));
1035 request.insert("path".to_string(), vm_string(path));
1036 let path_params = dict_value(path_params);
1037 request.insert("path_params".to_string(), path_params.clone());
1038 request.insert("params".to_string(), path_params);
1039 request.insert("query".to_string(), dict_value(query));
1040 request.insert("headers".to_string(), dict_value(headers));
1041 request.insert("body".to_string(), vm_string(body));
1042 request.insert(
1043 "raw_body".to_string(),
1044 if retain_raw_body {
1045 VmValue::Bytes(Rc::new(body_bytes.to_vec()))
1046 } else {
1047 VmValue::Nil
1048 },
1049 );
1050 request.insert(
1051 "body_bytes".to_string(),
1052 VmValue::Int(body_bytes.len() as i64),
1053 );
1054 request.insert(
1055 "remote_addr".to_string(),
1056 input
1057 .get("remote_addr")
1058 .or_else(|| input.get("remote"))
1059 .map(|value| vm_string(value.display()))
1060 .unwrap_or(VmValue::Nil),
1061 );
1062 request.insert(
1063 "client_ip".to_string(),
1064 input
1065 .get("client_ip")
1066 .or_else(|| input.get("remote_ip"))
1067 .or_else(|| input.get("ip"))
1068 .map(|value| vm_string(value.display()))
1069 .unwrap_or(VmValue::Nil),
1070 );
1071 dict_value(request)
1072}
1073
1074fn normalize_status(status: i64) -> i64 {
1075 if (100..=999).contains(&status) {
1076 status
1077 } else {
1078 500
1079 }
1080}
1081
1082fn response_with_kind(
1083 status: i64,
1084 mut headers: BTreeMap<String, VmValue>,
1085 body: VmValue,
1086 body_kind: &str,
1087) -> VmValue {
1088 let status = normalize_status(status);
1089 let mut response = BTreeMap::new();
1090 if body_kind == "json" && matches!(header_lookup_value(&headers, "content-type"), VmValue::Nil)
1091 {
1092 headers.insert(
1093 "content-type".to_string(),
1094 vm_string("application/json; charset=utf-8"),
1095 );
1096 } else if body_kind == "text"
1097 && matches!(header_lookup_value(&headers, "content-type"), VmValue::Nil)
1098 {
1099 headers.insert(
1100 "content-type".to_string(),
1101 vm_string("text/plain; charset=utf-8"),
1102 );
1103 }
1104 response.insert("status".to_string(), VmValue::Int(status));
1105 response.insert("headers".to_string(), dict_value(headers));
1106 response.insert(
1107 "ok".to_string(),
1108 VmValue::Bool((200..300).contains(&status)),
1109 );
1110 response.insert("body_kind".to_string(), vm_string(body_kind));
1111 match body {
1112 VmValue::Bytes(bytes) => {
1113 response.insert(
1114 "body".to_string(),
1115 vm_string(String::from_utf8_lossy(&bytes)),
1116 );
1117 response.insert("raw_body".to_string(), VmValue::Bytes(bytes));
1118 }
1119 other => {
1120 response.insert("body".to_string(), vm_string(other.display()));
1121 response.insert(
1122 "raw_body".to_string(),
1123 VmValue::Bytes(Rc::new(other.display().into_bytes())),
1124 );
1125 }
1126 }
1127 dict_value(response)
1128}
1129
1130fn normalize_response(value: VmValue) -> VmValue {
1131 match value {
1132 VmValue::Dict(dict) if dict.contains_key("status") => {
1133 let status = dict.get("status").and_then(VmValue::as_int).unwrap_or(200);
1134 let headers = dict
1135 .get("headers")
1136 .and_then(VmValue::as_dict)
1137 .cloned()
1138 .unwrap_or_default();
1139 let body_kind = dict
1140 .get("body_kind")
1141 .or_else(|| dict.get("kind"))
1142 .map(|value| value.display())
1143 .unwrap_or_else(|| "text".to_string());
1144 let body = dict
1145 .get("raw_body")
1146 .filter(|value| matches!(value, VmValue::Bytes(_)))
1147 .or_else(|| dict.get("body"))
1148 .cloned()
1149 .unwrap_or(VmValue::Nil);
1150 response_with_kind(status, headers, body, &body_kind)
1151 }
1152 VmValue::Nil => response_with_kind(204, BTreeMap::new(), VmValue::Nil, "text"),
1153 other => response_with_kind(200, BTreeMap::new(), other, "text"),
1154 }
1155}
1156
1157fn body_limit_response(limit: usize, actual: usize) -> VmValue {
1158 let mut headers = BTreeMap::new();
1159 headers.insert(
1160 "content-type".to_string(),
1161 vm_string("text/plain; charset=utf-8"),
1162 );
1163 headers.insert("connection".to_string(), vm_string("close"));
1164 headers.insert(
1165 "x-harn-body-limit".to_string(),
1166 vm_string(limit.to_string()),
1167 );
1168 response_with_kind(
1169 413,
1170 headers,
1171 vm_string(format!("request body too large: {actual} > {limit} bytes")),
1172 "text",
1173 )
1174}
1175
1176fn not_found_response(method: &str, path: &str) -> VmValue {
1177 response_with_kind(
1178 404,
1179 BTreeMap::new(),
1180 vm_string(format!("no route for {method} {path}")),
1181 "text",
1182 )
1183}
1184
1185fn unavailable_response(message: &str) -> VmValue {
1186 response_with_kind(503, BTreeMap::new(), vm_string(message), "text")
1187}
1188
1189fn route_template_match(template: &str, path: &str) -> Option<BTreeMap<String, VmValue>> {
1190 let template_segments: Vec<&str> = template.trim_matches('/').split('/').collect();
1191 let path_segments: Vec<&str> = path.trim_matches('/').split('/').collect();
1192 if template == "/" && path == "/" {
1193 return Some(BTreeMap::new());
1194 }
1195 if template_segments.len() != path_segments.len() {
1196 return None;
1197 }
1198 let mut params = BTreeMap::new();
1199 for (tmpl, actual) in template_segments.iter().zip(path_segments.iter()) {
1200 if tmpl.starts_with('{') && tmpl.ends_with('}') && tmpl.len() > 2 {
1201 params.insert(
1202 tmpl[1..tmpl.len() - 1].to_string(),
1203 vm_string(percent_decode(actual)),
1204 );
1205 } else if tmpl.starts_with(':') && tmpl.len() > 1 {
1206 params.insert(tmpl[1..].to_string(), vm_string(percent_decode(actual)));
1207 } else if tmpl != actual {
1208 return None;
1209 }
1210 }
1211 Some(params)
1212}
1213
1214fn matching_route(
1215 server: &HttpServer,
1216 method: &str,
1217 path: &str,
1218) -> Option<(HttpServerRoute, BTreeMap<String, VmValue>)> {
1219 server.routes.iter().find_map(|route| {
1220 if route.method != "*" && !route.method.eq_ignore_ascii_case(method) {
1221 return None;
1222 }
1223 route_template_match(&route.template, path).map(|params| (route.clone(), params))
1224 })
1225}
1226
1227async fn call_server_closure(
1228 closure: &Rc<VmClosure>,
1229 args: &[VmValue],
1230 builtin: &str,
1231) -> Result<VmValue, VmError> {
1232 let mut vm = crate::vm::clone_async_builtin_child_vm()
1233 .ok_or_else(|| vm_error(format!("{builtin}: requires an async builtin VM context")))?;
1234 vm.call_closure_pub(closure, args).await
1235}
1236
1237fn value_is_response(value: &VmValue) -> bool {
1238 matches!(value, VmValue::Dict(dict) if dict.contains_key("status"))
1239}
1240
1241async fn run_http_server_request(server_id: &str, request: VmValue) -> Result<VmValue, VmError> {
1242 let server = HTTP_SERVERS.with(|servers| servers.borrow().get(server_id).cloned());
1243 let Some(server) = server else {
1244 return Err(vm_error(format!(
1245 "http_server_request: unknown server handle '{server_id}'"
1246 )));
1247 };
1248 if server.shutdown {
1249 return Ok(unavailable_response("server is shut down"));
1250 }
1251 if !server.ready {
1252 return Ok(unavailable_response("server is not ready"));
1253 }
1254 if let Some(readiness) = &server.readiness {
1255 let ready = call_server_closure(
1256 readiness,
1257 &[http_server_handle_value(server_id)],
1258 "http_server_request",
1259 )
1260 .await?;
1261 if !ready.is_truthy() {
1262 return Ok(unavailable_response("server is not ready"));
1263 }
1264 }
1265
1266 let input = request.as_dict().cloned().unwrap_or_default();
1267 let method = input
1268 .get("method")
1269 .map(|value| value.display())
1270 .filter(|value| !value.is_empty())
1271 .unwrap_or_else(|| "GET".to_string())
1272 .to_ascii_uppercase();
1273 let raw_path = input
1274 .get("path")
1275 .map(|value| value.display())
1276 .filter(|value| !value.is_empty())
1277 .unwrap_or_else(|| "/".to_string());
1278 let (path, query) = split_path_and_query(&raw_path);
1279 let body_bytes = request_body_bytes(&input);
1280
1281 let Some((route, path_params)) = matching_route(&server, &method, &path) else {
1282 return Ok(not_found_response(&method, &path));
1283 };
1284
1285 let limit = route.max_body_bytes.unwrap_or(server.max_body_bytes);
1286 if body_bytes.len() > limit {
1287 return Ok(body_limit_response(limit, body_bytes.len()));
1288 }
1289 let retain_raw_body = route.retain_raw_body.unwrap_or(server.retain_raw_body);
1290 let mut req = request_value(
1291 &method,
1292 &path,
1293 path_params,
1294 query,
1295 &input,
1296 &body_bytes,
1297 retain_raw_body,
1298 );
1299
1300 for before in &server.before {
1301 let result = call_server_closure(before, &[req.clone()], "http_server_request").await?;
1302 if value_is_response(&result) {
1303 return Ok(normalize_response(result));
1304 }
1305 if !matches!(result, VmValue::Nil) {
1306 req = result;
1307 }
1308 }
1309
1310 let handler_result =
1311 call_server_closure(&route.handler, &[req.clone()], "http_server_request").await?;
1312 let mut response = normalize_response(handler_result);
1313
1314 for after in &server.after {
1315 let result = call_server_closure(
1316 after,
1317 &[response.clone(), req.clone()],
1318 "http_server_request",
1319 )
1320 .await?;
1321 if !matches!(result, VmValue::Nil) {
1322 response = normalize_response(result);
1323 }
1324 }
1325
1326 Ok(response)
1327}
1328
1329pub fn register_http_builtins(vm: &mut Vm) {
1331 vm.register_builtin("http_server_tls_plain", |_args, _out| {
1332 Ok(http_server_tls_config_value(
1333 "plain",
1334 false,
1335 "http",
1336 false,
1337 BTreeMap::new(),
1338 ))
1339 });
1340 vm.register_builtin("http_server_tls_edge", |args, _out| {
1341 let options = get_options_arg(args, 0);
1342 Ok(http_server_tls_config_value(
1343 "edge",
1344 false,
1345 "https",
1346 vm_get_bool_option(&options, "hsts", true),
1347 hsts_options(&options),
1348 ))
1349 });
1350 vm.register_builtin("http_server_tls_pem", |args, _out| {
1351 if args.len() < 2 {
1352 return Err(vm_error(
1353 "http_server_tls_pem: requires cert path and key path",
1354 ));
1355 }
1356 let cert_path = args[0].display();
1357 let key_path = args[1].display();
1358 if !std::path::Path::new(&cert_path).is_file() {
1359 return Err(vm_error(format!(
1360 "http_server_tls_pem: certificate not found: {cert_path}"
1361 )));
1362 }
1363 if !std::path::Path::new(&key_path).is_file() {
1364 return Err(vm_error(format!(
1365 "http_server_tls_pem: private key not found: {key_path}"
1366 )));
1367 }
1368 let mut extra = BTreeMap::new();
1369 extra.insert(
1370 "cert_path".to_string(),
1371 VmValue::String(Rc::from(cert_path)),
1372 );
1373 extra.insert("key_path".to_string(), VmValue::String(Rc::from(key_path)));
1374 Ok(http_server_tls_config_value(
1375 "pem", true, "https", true, extra,
1376 ))
1377 });
1378 vm.register_builtin("http_server_tls_self_signed_dev", |args, _out| {
1379 let hosts = tls_hosts_arg(args.first())?;
1380 let cert = rcgen::generate_simple_self_signed(hosts.clone()).map_err(|error| {
1381 vm_error(format!(
1382 "http_server_tls_self_signed_dev: failed to generate certificate: {error}"
1383 ))
1384 })?;
1385 let mut extra = BTreeMap::new();
1386 extra.insert(
1387 "hosts".to_string(),
1388 VmValue::List(Rc::new(
1389 hosts
1390 .into_iter()
1391 .map(|host| VmValue::String(Rc::from(host)))
1392 .collect(),
1393 )),
1394 );
1395 extra.insert(
1396 "cert_pem".to_string(),
1397 VmValue::String(Rc::from(cert.cert.pem())),
1398 );
1399 extra.insert(
1400 "key_pem".to_string(),
1401 VmValue::String(Rc::from(cert.key_pair.serialize_pem())),
1402 );
1403 Ok(http_server_tls_config_value(
1404 "self_signed_dev",
1405 true,
1406 "https",
1407 false,
1408 extra,
1409 ))
1410 });
1411 vm.register_builtin("http_server_security_headers", |args, _out| {
1412 let Some(VmValue::Dict(config)) = args.first() else {
1413 return Err(vm_error(
1414 "http_server_security_headers: requires a TLS config dict",
1415 ));
1416 };
1417 Ok(VmValue::Dict(Rc::new(http_server_security_headers(config))))
1418 });
1419
1420 vm.register_async_builtin("http_get", |args| async move {
1421 http_verb_handler("GET", false, args).await
1422 });
1423 vm.register_async_builtin("http_post", |args| async move {
1424 http_verb_handler("POST", true, args).await
1425 });
1426 vm.register_async_builtin("http_put", |args| async move {
1427 http_verb_handler("PUT", true, args).await
1428 });
1429 vm.register_async_builtin("http_patch", |args| async move {
1430 http_verb_handler("PATCH", true, args).await
1431 });
1432 vm.register_async_builtin("http_delete", |args| async move {
1433 http_verb_handler("DELETE", false, args).await
1434 });
1435
1436 vm.register_builtin("http_server", |args, _out| {
1439 let options = get_options_arg(args, 0);
1440 let server = HttpServer {
1441 routes: Vec::new(),
1442 before: Vec::new(),
1443 after: Vec::new(),
1444 ready: get_bool_option(&options, "ready", true),
1445 readiness: None,
1446 shutdown_hooks: Vec::new(),
1447 shutdown: false,
1448 max_body_bytes: get_usize_option(
1449 &options,
1450 "max_body_bytes",
1451 DEFAULT_SERVER_MAX_BODY_BYTES,
1452 )?,
1453 retain_raw_body: get_bool_option(&options, "retain_raw_body", true),
1454 };
1455 let id = next_transport_handle("http-server");
1456 HTTP_SERVERS.with(|servers| {
1457 let mut servers = servers.borrow_mut();
1458 if servers.len() >= MAX_HTTP_SERVERS {
1459 return Err(vm_error(format!(
1460 "http_server: maximum open servers ({MAX_HTTP_SERVERS}) reached"
1461 )));
1462 }
1463 servers.insert(id.clone(), server);
1464 Ok(())
1465 })?;
1466 Ok(http_server_handle_value(&id))
1467 });
1468
1469 vm.register_builtin("http_server_route", |args, _out| {
1470 if args.len() < 4 {
1471 return Err(vm_error(
1472 "http_server_route: requires server, method, path template, and handler",
1473 ));
1474 }
1475 let server_id = server_from_value(&args[0], "http_server_route")?;
1476 let method = args[1].display().to_ascii_uppercase();
1477 if method.is_empty() {
1478 return Err(vm_error("http_server_route: method is required"));
1479 }
1480 let template = args[2].display();
1481 if !template.starts_with('/') {
1482 return Err(vm_error(
1483 "http_server_route: path template must start with '/'",
1484 ));
1485 }
1486 let handler = closure_arg(args, 3, "http_server_route")?;
1487 let options = get_options_arg(args, 4);
1488 let route = HttpServerRoute {
1489 method,
1490 template,
1491 handler,
1492 max_body_bytes: get_optional_usize_option(&options, "max_body_bytes")?,
1493 retain_raw_body: match options.get("retain_raw_body") {
1494 Some(VmValue::Bool(value)) => Some(*value),
1495 _ => None,
1496 },
1497 };
1498 HTTP_SERVERS.with(|servers| {
1499 let mut servers = servers.borrow_mut();
1500 let server = servers.get_mut(&server_id).ok_or_else(|| {
1501 vm_error(format!("http_server_route: unknown server '{server_id}'"))
1502 })?;
1503 server.routes.push(route);
1504 Ok::<_, VmError>(())
1505 })?;
1506 Ok(http_server_handle_value(&server_id))
1507 });
1508
1509 vm.register_builtin("http_server_before", |args, _out| {
1510 if args.len() < 2 {
1511 return Err(vm_error("http_server_before: requires server and handler"));
1512 }
1513 let server_id = server_from_value(&args[0], "http_server_before")?;
1514 let handler = closure_arg(args, 1, "http_server_before")?;
1515 HTTP_SERVERS.with(|servers| {
1516 let mut servers = servers.borrow_mut();
1517 let server = servers.get_mut(&server_id).ok_or_else(|| {
1518 vm_error(format!("http_server_before: unknown server '{server_id}'"))
1519 })?;
1520 server.before.push(handler);
1521 Ok::<_, VmError>(())
1522 })?;
1523 Ok(http_server_handle_value(&server_id))
1524 });
1525
1526 vm.register_builtin("http_server_after", |args, _out| {
1527 if args.len() < 2 {
1528 return Err(vm_error("http_server_after: requires server and handler"));
1529 }
1530 let server_id = server_from_value(&args[0], "http_server_after")?;
1531 let handler = closure_arg(args, 1, "http_server_after")?;
1532 HTTP_SERVERS.with(|servers| {
1533 let mut servers = servers.borrow_mut();
1534 let server = servers.get_mut(&server_id).ok_or_else(|| {
1535 vm_error(format!("http_server_after: unknown server '{server_id}'"))
1536 })?;
1537 server.after.push(handler);
1538 Ok::<_, VmError>(())
1539 })?;
1540 Ok(http_server_handle_value(&server_id))
1541 });
1542
1543 vm.register_async_builtin("http_server_request", |args| async move {
1544 if args.len() < 2 {
1545 return Err(vm_error("http_server_request: requires server and request"));
1546 }
1547 let server_id = server_from_value(&args[0], "http_server_request")?;
1548 run_http_server_request(&server_id, args[1].clone()).await
1549 });
1550
1551 vm.register_async_builtin("http_server_test", |args| async move {
1552 if args.len() < 2 {
1553 return Err(vm_error("http_server_test: requires server and request"));
1554 }
1555 let server_id = server_from_value(&args[0], "http_server_test")?;
1556 run_http_server_request(&server_id, args[1].clone()).await
1557 });
1558
1559 vm.register_builtin("http_server_set_ready", |args, _out| {
1560 if args.len() < 2 {
1561 return Err(vm_error(
1562 "http_server_set_ready: requires server and ready bool",
1563 ));
1564 }
1565 let server_id = server_from_value(&args[0], "http_server_set_ready")?;
1566 let ready = matches!(args[1], VmValue::Bool(true));
1567 HTTP_SERVERS.with(|servers| {
1568 let mut servers = servers.borrow_mut();
1569 let server = servers.get_mut(&server_id).ok_or_else(|| {
1570 vm_error(format!(
1571 "http_server_set_ready: unknown server '{server_id}'"
1572 ))
1573 })?;
1574 server.ready = ready;
1575 Ok::<_, VmError>(())
1576 })?;
1577 Ok(VmValue::Bool(ready))
1578 });
1579
1580 vm.register_builtin("http_server_readiness", |args, _out| {
1581 if args.len() < 2 {
1582 return Err(vm_error(
1583 "http_server_readiness: requires server and readiness closure",
1584 ));
1585 }
1586 let server_id = server_from_value(&args[0], "http_server_readiness")?;
1587 let handler = closure_arg(args, 1, "http_server_readiness")?;
1588 HTTP_SERVERS.with(|servers| {
1589 let mut servers = servers.borrow_mut();
1590 let server = servers.get_mut(&server_id).ok_or_else(|| {
1591 vm_error(format!(
1592 "http_server_readiness: unknown server '{server_id}'"
1593 ))
1594 })?;
1595 server.readiness = Some(handler);
1596 Ok::<_, VmError>(())
1597 })?;
1598 Ok(http_server_handle_value(&server_id))
1599 });
1600
1601 vm.register_async_builtin("http_server_ready", |args| async move {
1602 let Some(server_arg) = args.first() else {
1603 return Err(vm_error("http_server_ready: requires server"));
1604 };
1605 let server_id = server_from_value(server_arg, "http_server_ready")?;
1606 let server = HTTP_SERVERS.with(|servers| servers.borrow().get(&server_id).cloned());
1607 let Some(server) = server else {
1608 return Err(vm_error(format!(
1609 "http_server_ready: unknown server '{server_id}'"
1610 )));
1611 };
1612 if server.shutdown {
1613 return Ok(VmValue::Bool(false));
1614 }
1615 let Some(readiness) = server.readiness else {
1616 return Ok(VmValue::Bool(server.ready));
1617 };
1618 let result = call_server_closure(
1619 &readiness,
1620 &[http_server_handle_value(&server_id)],
1621 "http_server_ready",
1622 )
1623 .await?;
1624 Ok(VmValue::Bool(result.is_truthy()))
1625 });
1626
1627 vm.register_builtin("http_server_on_shutdown", |args, _out| {
1628 if args.len() < 2 {
1629 return Err(vm_error(
1630 "http_server_on_shutdown: requires server and handler",
1631 ));
1632 }
1633 let server_id = server_from_value(&args[0], "http_server_on_shutdown")?;
1634 let handler = closure_arg(args, 1, "http_server_on_shutdown")?;
1635 HTTP_SERVERS.with(|servers| {
1636 let mut servers = servers.borrow_mut();
1637 let server = servers.get_mut(&server_id).ok_or_else(|| {
1638 vm_error(format!(
1639 "http_server_on_shutdown: unknown server '{server_id}'"
1640 ))
1641 })?;
1642 server.shutdown_hooks.push(handler);
1643 Ok::<_, VmError>(())
1644 })?;
1645 Ok(http_server_handle_value(&server_id))
1646 });
1647
1648 vm.register_async_builtin("http_server_shutdown", |args| async move {
1649 let Some(server_arg) = args.first() else {
1650 return Err(vm_error("http_server_shutdown: requires server"));
1651 };
1652 let server_id = server_from_value(server_arg, "http_server_shutdown")?;
1653 let hooks = HTTP_SERVERS.with(|servers| {
1654 let mut servers = servers.borrow_mut();
1655 let server = servers.get_mut(&server_id).ok_or_else(|| {
1656 vm_error(format!(
1657 "http_server_shutdown: unknown server '{server_id}'"
1658 ))
1659 })?;
1660 server.shutdown = true;
1661 Ok::<_, VmError>(server.shutdown_hooks.clone())
1662 })?;
1663 for hook in hooks {
1664 let _ = call_server_closure(
1665 &hook,
1666 &[http_server_handle_value(&server_id)],
1667 "http_server_shutdown",
1668 )
1669 .await?;
1670 }
1671 Ok(VmValue::Bool(true))
1672 });
1673
1674 vm.register_builtin("http_response", |args, _out| {
1675 let status = args.first().and_then(VmValue::as_int).unwrap_or(200);
1676 let body = args.get(1).cloned().unwrap_or(VmValue::Nil);
1677 let headers = args
1678 .get(2)
1679 .and_then(VmValue::as_dict)
1680 .cloned()
1681 .unwrap_or_default();
1682 Ok(response_with_kind(status, headers, body, "text"))
1683 });
1684
1685 vm.register_builtin("http_response_text", |args, _out| {
1686 let body = args.first().cloned().unwrap_or(VmValue::Nil);
1687 let options = get_options_arg(args, 1);
1688 let status = options
1689 .get("status")
1690 .and_then(VmValue::as_int)
1691 .unwrap_or(200);
1692 let headers = options
1693 .get("headers")
1694 .and_then(VmValue::as_dict)
1695 .cloned()
1696 .unwrap_or_default();
1697 Ok(response_with_kind(status, headers, body, "text"))
1698 });
1699
1700 vm.register_builtin("http_response_json", |args, _out| {
1701 let body = args
1702 .first()
1703 .map(crate::stdlib::json::vm_value_to_json)
1704 .map(vm_string)
1705 .unwrap_or_else(|| vm_string("null"));
1706 let options = get_options_arg(args, 1);
1707 let status = options
1708 .get("status")
1709 .and_then(VmValue::as_int)
1710 .unwrap_or(200);
1711 let headers = options
1712 .get("headers")
1713 .and_then(VmValue::as_dict)
1714 .cloned()
1715 .unwrap_or_default();
1716 Ok(response_with_kind(status, headers, body, "json"))
1717 });
1718
1719 vm.register_builtin("http_response_bytes", |args, _out| {
1720 let body = match args.first() {
1721 Some(VmValue::Bytes(bytes)) => VmValue::Bytes(bytes.clone()),
1722 Some(value) => VmValue::Bytes(Rc::new(value.display().into_bytes())),
1723 None => VmValue::Bytes(Rc::new(Vec::new())),
1724 };
1725 let options = get_options_arg(args, 1);
1726 let status = options
1727 .get("status")
1728 .and_then(VmValue::as_int)
1729 .unwrap_or(200);
1730 let headers = options
1731 .get("headers")
1732 .and_then(VmValue::as_dict)
1733 .cloned()
1734 .unwrap_or_default();
1735 Ok(response_with_kind(status, headers, body, "bytes"))
1736 });
1737
1738 vm.register_builtin("http_header", |args, _out| {
1739 if args.len() < 2 {
1740 return Err(vm_error(
1741 "http_header: requires headers/request/response and name",
1742 ));
1743 }
1744 let headers = headers_from_value(&args[0]);
1745 Ok(header_lookup_value(&headers, &args[1].display()))
1746 });
1747
1748 vm.register_builtin("http_mock", |args, _out| {
1756 let method = args.first().map(|a| a.display()).unwrap_or_default();
1757 let url_pattern = args.get(1).map(|a| a.display()).unwrap_or_default();
1758 let response = args
1759 .get(2)
1760 .and_then(|a| a.as_dict())
1761 .cloned()
1762 .unwrap_or_default();
1763 let responses = parse_mock_responses(&response);
1764
1765 HTTP_MOCKS.with(|mocks| {
1766 let mut mocks = mocks.borrow_mut();
1767 mocks.retain(|mock| !(mock.method == method && mock.url_pattern == url_pattern));
1768 mocks.push(HttpMock {
1769 method,
1770 url_pattern,
1771 responses,
1772 next_response: 0,
1773 });
1774 });
1775 Ok(VmValue::Nil)
1776 });
1777
1778 vm.register_builtin("http_mock_clear", |_args, _out| {
1780 HTTP_MOCKS.with(|mocks| mocks.borrow_mut().clear());
1781 HTTP_MOCK_CALLS.with(|calls| calls.borrow_mut().clear());
1782 HTTP_STREAMS.with(|streams| streams.borrow_mut().clear());
1783 Ok(VmValue::Nil)
1784 });
1785
1786 vm.register_builtin("http_mock_calls", |_args, _out| {
1788 let calls = HTTP_MOCK_CALLS.with(|calls| calls.borrow().clone());
1789 let result: Vec<VmValue> = calls
1790 .iter()
1791 .map(|c| {
1792 let mut dict = BTreeMap::new();
1793 dict.insert(
1794 "method".to_string(),
1795 VmValue::String(Rc::from(c.method.as_str())),
1796 );
1797 dict.insert("url".to_string(), VmValue::String(Rc::from(c.url.as_str())));
1798 dict.insert(
1799 "headers".to_string(),
1800 VmValue::Dict(Rc::new(c.headers.clone())),
1801 );
1802 dict.insert(
1803 "body".to_string(),
1804 match &c.body {
1805 Some(b) => VmValue::String(Rc::from(b.as_str())),
1806 None => VmValue::Nil,
1807 },
1808 );
1809 VmValue::Dict(Rc::new(dict))
1810 })
1811 .collect();
1812 Ok(VmValue::List(Rc::new(result)))
1813 });
1814
1815 vm.register_async_builtin("http_request", |args| async move {
1816 let method = args
1817 .first()
1818 .map(|a| a.display())
1819 .unwrap_or_default()
1820 .to_uppercase();
1821 if method.is_empty() {
1822 return Err(VmError::Thrown(VmValue::String(Rc::from(
1823 "http_request: method is required",
1824 ))));
1825 }
1826 let url = args.get(1).map(|a| a.display()).unwrap_or_default();
1827 if url.is_empty() {
1828 return Err(VmError::Thrown(VmValue::String(Rc::from(
1829 "http_request: URL is required",
1830 ))));
1831 }
1832 let options = match args.get(2) {
1833 Some(VmValue::Dict(d)) => (**d).clone(),
1834 _ => BTreeMap::new(),
1835 };
1836 vm_execute_http_request(&method, &url, &options).await
1837 });
1838
1839 vm.register_async_builtin("http_download", |args| async move {
1840 let url = args.first().map(|a| a.display()).unwrap_or_default();
1841 if url.is_empty() {
1842 return Err(vm_error("http_download: URL is required"));
1843 }
1844 let dst_path = args.get(1).map(|a| a.display()).unwrap_or_default();
1845 if dst_path.is_empty() {
1846 return Err(vm_error("http_download: destination path is required"));
1847 }
1848 let options = get_options_arg(&args, 2);
1849 vm_http_download(&url, &dst_path, &options).await
1850 });
1851
1852 vm.register_async_builtin("http_stream_open", |args| async move {
1853 let url = args.first().map(|a| a.display()).unwrap_or_default();
1854 if url.is_empty() {
1855 return Err(vm_error("http_stream_open: URL is required"));
1856 }
1857 let options = get_options_arg(&args, 1);
1858 vm_http_stream_open(&url, &options).await
1859 });
1860
1861 vm.register_async_builtin("http_stream_read", |args| async move {
1862 let Some(handle) = args.first() else {
1863 return Err(vm_error("http_stream_read: requires a stream handle"));
1864 };
1865 let stream_id = handle_from_value(handle, "http_stream_read")?;
1866 let max_bytes = args
1867 .get(1)
1868 .and_then(|value| value.as_int())
1869 .map(|value| value.max(1) as usize)
1870 .unwrap_or(DEFAULT_MAX_MESSAGE_BYTES);
1871 vm_http_stream_read(&stream_id, max_bytes).await
1872 });
1873
1874 vm.register_builtin("http_stream_info", |args, _out| {
1875 let Some(handle) = args.first() else {
1876 return Err(vm_error("http_stream_info: requires a stream handle"));
1877 };
1878 let stream_id = handle_from_value(handle, "http_stream_info")?;
1879 vm_http_stream_info(&stream_id)
1880 });
1881
1882 vm.register_builtin("http_stream_close", |args, _out| {
1883 let Some(handle) = args.first() else {
1884 return Err(vm_error("http_stream_close: requires a stream handle"));
1885 };
1886 let stream_id = handle_from_value(handle, "http_stream_close")?;
1887 let removed = HTTP_STREAMS.with(|streams| streams.borrow_mut().remove(&stream_id));
1888 Ok(VmValue::Bool(removed.is_some()))
1889 });
1890
1891 vm.register_builtin("http_session", |args, _out| {
1892 let options = get_options_arg(args, 0);
1893 let config = parse_http_options(&options);
1894 let client = build_http_client(&config)?;
1895 let id = next_transport_handle("http-session");
1896 HTTP_SESSIONS.with(|sessions| {
1897 let mut sessions = sessions.borrow_mut();
1898 if sessions.len() >= MAX_HTTP_SESSIONS {
1899 return Err(vm_error(format!(
1900 "http_session: maximum open sessions ({MAX_HTTP_SESSIONS}) reached"
1901 )));
1902 }
1903 sessions.insert(id.clone(), HttpSession { client, options });
1904 Ok(())
1905 })?;
1906 Ok(VmValue::String(Rc::from(id)))
1907 });
1908
1909 vm.register_async_builtin("http_session_request", |args| async move {
1910 if args.len() < 3 {
1911 return Err(vm_error(
1912 "http_session_request: requires session, method, and URL",
1913 ));
1914 }
1915 let session_id = handle_from_value(&args[0], "http_session_request")?;
1916 let method = args[1].display().to_uppercase();
1917 if method.is_empty() {
1918 return Err(vm_error("http_session_request: method is required"));
1919 }
1920 let url = args[2].display();
1921 if url.is_empty() {
1922 return Err(vm_error("http_session_request: URL is required"));
1923 }
1924 let options = get_options_arg(&args, 3);
1925 vm_execute_http_session_request(&session_id, &method, &url, &options).await
1926 });
1927
1928 vm.register_builtin("http_session_close", |args, _out| {
1929 let Some(handle) = args.first() else {
1930 return Err(vm_error("http_session_close: requires a session handle"));
1931 };
1932 let session_id = handle_from_value(handle, "http_session_close")?;
1933 let removed = HTTP_SESSIONS.with(|sessions| sessions.borrow_mut().remove(&session_id));
1934 Ok(VmValue::Bool(removed.is_some()))
1935 });
1936
1937 vm.register_builtin("sse_mock", |args, _out| {
1938 let url_pattern = args.first().map(|arg| arg.display()).unwrap_or_default();
1939 if url_pattern.is_empty() {
1940 return Err(vm_error("sse_mock: URL pattern is required"));
1941 }
1942 let events = parse_mock_stream_events(args.get(1));
1943 SSE_MOCKS.with(|mocks| {
1944 mocks.borrow_mut().push(SseMock {
1945 url_pattern,
1946 events,
1947 });
1948 });
1949 Ok(VmValue::Nil)
1950 });
1951
1952 vm.register_async_builtin("sse_connect", |args| async move {
1953 let method = args
1954 .first()
1955 .map(|arg| arg.display())
1956 .filter(|method| !method.is_empty())
1957 .unwrap_or_else(|| "GET".to_string())
1958 .to_uppercase();
1959 let url = args.get(1).map(|arg| arg.display()).unwrap_or_default();
1960 if url.is_empty() {
1961 return Err(vm_error("sse_connect: URL is required"));
1962 }
1963 let options = get_options_arg(&args, 2);
1964 vm_sse_connect(&method, &url, &options).await
1965 });
1966
1967 vm.register_async_builtin("sse_receive", |args| async move {
1968 let Some(handle) = args.first() else {
1969 return Err(vm_error("sse_receive: requires a stream handle"));
1970 };
1971 let stream_id = handle_from_value(handle, "sse_receive")?;
1972 let timeout_ms = receive_timeout_arg(&args, 1);
1973 vm_sse_receive(&stream_id, timeout_ms).await
1974 });
1975
1976 vm.register_builtin("sse_event", |args, _out| {
1977 let Some(event) = args.first() else {
1978 return Err(vm_error("sse_event: requires event data or an event dict"));
1979 };
1980 let options = get_options_arg(args, 1);
1981 Ok(VmValue::String(Rc::from(vm_sse_event_frame(
1982 event, &options,
1983 )?)))
1984 });
1985
1986 vm.register_builtin("sse_server_response", |args, _out| {
1987 let options = get_options_arg(args, 0);
1988 vm_sse_server_response(&options)
1989 });
1990
1991 vm.register_builtin("sse_server_send", |args, _out| {
1992 if args.len() < 2 {
1993 return Err(vm_error("sse_server_send: requires stream and event"));
1994 }
1995 let stream_id = handle_from_value(&args[0], "sse_server_send")?;
1996 let options = get_options_arg(args, 2);
1997 vm_sse_server_send(&stream_id, &args[1], &options)
1998 });
1999
2000 vm.register_builtin("sse_server_heartbeat", |args, _out| {
2001 let Some(handle) = args.first() else {
2002 return Err(vm_error("sse_server_heartbeat: requires a stream handle"));
2003 };
2004 let stream_id = handle_from_value(handle, "sse_server_heartbeat")?;
2005 vm_sse_server_heartbeat(&stream_id, args.get(1))
2006 });
2007
2008 vm.register_builtin("sse_server_flush", |args, _out| {
2009 let Some(handle) = args.first() else {
2010 return Err(vm_error("sse_server_flush: requires a stream handle"));
2011 };
2012 let stream_id = handle_from_value(handle, "sse_server_flush")?;
2013 vm_sse_server_flush(&stream_id)
2014 });
2015
2016 vm.register_builtin("sse_server_close", |args, _out| {
2017 let Some(handle) = args.first() else {
2018 return Err(vm_error("sse_server_close: requires a stream handle"));
2019 };
2020 let stream_id = handle_from_value(handle, "sse_server_close")?;
2021 vm_sse_server_close(&stream_id)
2022 });
2023
2024 vm.register_builtin("sse_server_cancel", |args, _out| {
2025 let Some(handle) = args.first() else {
2026 return Err(vm_error("sse_server_cancel: requires a stream handle"));
2027 };
2028 let stream_id = handle_from_value(handle, "sse_server_cancel")?;
2029 vm_sse_server_cancel(&stream_id, args.get(1))
2030 });
2031
2032 vm.register_builtin("sse_server_status", |args, _out| {
2033 let Some(handle) = args.first() else {
2034 return Err(vm_error("sse_server_status: requires a stream handle"));
2035 };
2036 let stream_id = handle_from_value(handle, "sse_server_status")?;
2037 vm_sse_server_status(&stream_id)
2038 });
2039
2040 vm.register_builtin("sse_server_disconnected", |args, _out| {
2041 let Some(handle) = args.first() else {
2042 return Err(vm_error(
2043 "sse_server_disconnected: requires a stream handle",
2044 ));
2045 };
2046 let stream_id = handle_from_value(handle, "sse_server_disconnected")?;
2047 vm_sse_server_observed_bool(&stream_id, "sse_server_disconnected", |handle| {
2048 handle.disconnected
2049 })
2050 });
2051
2052 vm.register_builtin("sse_server_cancelled", |args, _out| {
2053 let Some(handle) = args.first() else {
2054 return Err(vm_error("sse_server_cancelled: requires a stream handle"));
2055 };
2056 let stream_id = handle_from_value(handle, "sse_server_cancelled")?;
2057 vm_sse_server_observed_bool(&stream_id, "sse_server_cancelled", |handle| {
2058 handle.cancelled
2059 })
2060 });
2061
2062 vm.register_builtin("sse_server_mock_receive", |args, _out| {
2063 let Some(handle) = args.first() else {
2064 return Err(vm_error(
2065 "sse_server_mock_receive: requires a stream handle",
2066 ));
2067 };
2068 let stream_id = handle_from_value(handle, "sse_server_mock_receive")?;
2069 vm_sse_server_mock_receive(&stream_id)
2070 });
2071
2072 vm.register_builtin("sse_server_mock_disconnect", |args, _out| {
2073 let Some(handle) = args.first() else {
2074 return Err(vm_error(
2075 "sse_server_mock_disconnect: requires a stream handle",
2076 ));
2077 };
2078 let stream_id = handle_from_value(handle, "sse_server_mock_disconnect")?;
2079 vm_sse_server_mock_disconnect(&stream_id)
2080 });
2081
2082 vm.register_builtin("sse_close", |args, _out| {
2083 let Some(handle) = args.first() else {
2084 return Err(vm_error("sse_close: requires a stream handle"));
2085 };
2086 let stream_id = handle_from_value(handle, "sse_close")?;
2087 let removed = SSE_HANDLES.with(|handles| {
2088 let mut handles = handles.borrow_mut();
2089 let removed = handles.remove(&stream_id);
2090 if let Some(handle) = &removed {
2091 if let SseHandleKind::Real(stream) = &handle.kind {
2092 if let Ok(mut stream) = stream.try_lock() {
2093 stream.close();
2094 }
2095 }
2096 }
2097 removed
2098 });
2099 Ok(VmValue::Bool(removed.is_some()))
2100 });
2101
2102 vm.register_builtin("websocket_mock", |args, _out| {
2103 let url_pattern = args.first().map(|arg| arg.display()).unwrap_or_default();
2104 if url_pattern.is_empty() {
2105 return Err(vm_error("websocket_mock: URL pattern is required"));
2106 }
2107 let (messages, echo) = parse_websocket_mock(args.get(1));
2108 WEBSOCKET_MOCKS.with(|mocks| {
2109 mocks.borrow_mut().push(WebSocketMock {
2110 url_pattern,
2111 messages,
2112 echo,
2113 });
2114 });
2115 Ok(VmValue::Nil)
2116 });
2117
2118 vm.register_async_builtin("websocket_connect", |args| async move {
2119 let url = args.first().map(|arg| arg.display()).unwrap_or_default();
2120 if url.is_empty() {
2121 return Err(vm_error("websocket_connect: URL is required"));
2122 }
2123 let options = get_options_arg(&args, 1);
2124 vm_websocket_connect(&url, &options).await
2125 });
2126
2127 vm.register_builtin("websocket_server", |args, _out| {
2128 let bind = args
2129 .first()
2130 .map(|arg| arg.display())
2131 .filter(|bind| !bind.is_empty())
2132 .unwrap_or_else(|| "127.0.0.1:0".to_string());
2133 let options = get_options_arg(args, 1);
2134 vm_websocket_server(&bind, &options)
2135 });
2136
2137 vm.register_builtin("websocket_route", |args, _out| {
2138 if args.len() < 2 {
2139 return Err(vm_error(
2140 "websocket_route: requires server handle and route path",
2141 ));
2142 }
2143 let server_id = handle_from_value(&args[0], "websocket_route")?;
2144 let path = args[1].display();
2145 if path.is_empty() || !path.starts_with('/') {
2146 return Err(vm_error("websocket_route: path must start with '/'"));
2147 }
2148 let options = get_options_arg(args, 2);
2149 vm_websocket_route(&server_id, &path, &options)
2150 });
2151
2152 vm.register_async_builtin("websocket_accept", |args| async move {
2153 let Some(handle) = args.first() else {
2154 return Err(vm_error("websocket_accept: requires a server handle"));
2155 };
2156 let server_id = handle_from_value(handle, "websocket_accept")?;
2157 let timeout_ms = receive_timeout_arg(&args, 1);
2158 vm_websocket_accept(&server_id, timeout_ms).await
2159 });
2160
2161 vm.register_async_builtin("websocket_send", |args| async move {
2162 if args.len() < 2 {
2163 return Err(vm_error(
2164 "websocket_send: requires socket handle and message",
2165 ));
2166 }
2167 let socket_id = handle_from_value(&args[0], "websocket_send")?;
2168 let message = args[1].clone();
2169 let options = get_options_arg(&args, 2);
2170 vm_websocket_send(&socket_id, message, &options).await
2171 });
2172
2173 vm.register_async_builtin("websocket_receive", |args| async move {
2174 let Some(handle) = args.first() else {
2175 return Err(vm_error("websocket_receive: requires a socket handle"));
2176 };
2177 let socket_id = handle_from_value(handle, "websocket_receive")?;
2178 let timeout_ms = receive_timeout_arg(&args, 1);
2179 vm_websocket_receive(&socket_id, timeout_ms).await
2180 });
2181
2182 vm.register_async_builtin("websocket_close", |args| async move {
2183 let Some(handle) = args.first() else {
2184 return Err(vm_error("websocket_close: requires a socket handle"));
2185 };
2186 let socket_id = handle_from_value(handle, "websocket_close")?;
2187 vm_websocket_close(&socket_id).await
2188 });
2189
2190 vm.register_builtin("websocket_server_close", |args, _out| {
2191 let Some(handle) = args.first() else {
2192 return Err(vm_error("websocket_server_close: requires a server handle"));
2193 };
2194 let server_id = handle_from_value(handle, "websocket_server_close")?;
2195 vm_websocket_server_close(&server_id)
2196 });
2197
2198 vm.register_builtin("transport_mock_clear", |_args, _out| {
2199 HTTP_STREAMS.with(|streams| streams.borrow_mut().clear());
2200 SSE_MOCKS.with(|mocks| mocks.borrow_mut().clear());
2201 SSE_HANDLES.with(|handles| handles.borrow_mut().clear());
2202 WEBSOCKET_MOCKS.with(|mocks| mocks.borrow_mut().clear());
2203 WEBSOCKET_HANDLES.with(|handles| handles.borrow_mut().clear());
2204 WEBSOCKET_SERVERS.with(|servers| {
2205 let mut servers = servers.borrow_mut();
2206 for server in servers.values() {
2207 server.running.store(false, Ordering::SeqCst);
2208 let _ = TcpStream::connect(&server.addr);
2209 }
2210 servers.clear();
2211 });
2212 TRANSPORT_MOCK_CALLS.with(|calls| calls.borrow_mut().clear());
2213 Ok(VmValue::Nil)
2214 });
2215
2216 vm.register_builtin("transport_mock_calls", |_args, _out| {
2217 let calls = TRANSPORT_MOCK_CALLS.with(|calls| calls.borrow().clone());
2218 let values = calls
2219 .iter()
2220 .map(transport_mock_call_value)
2221 .collect::<Vec<_>>();
2222 Ok(VmValue::List(Rc::new(values)))
2223 });
2224}
2225
2226fn http_server_tls_config_value(
2227 mode: &str,
2228 terminate_tls: bool,
2229 scheme: &str,
2230 hsts: bool,
2231 extra: BTreeMap<String, VmValue>,
2232) -> VmValue {
2233 let mut dict = BTreeMap::new();
2234 dict.insert("mode".to_string(), VmValue::String(Rc::from(mode)));
2235 dict.insert("terminate_tls".to_string(), VmValue::Bool(terminate_tls));
2236 dict.insert("scheme".to_string(), VmValue::String(Rc::from(scheme)));
2237 dict.insert("hsts".to_string(), VmValue::Bool(hsts));
2238 for (key, value) in extra {
2239 dict.insert(key, value);
2240 }
2241 VmValue::Dict(Rc::new(dict))
2242}
2243
2244fn hsts_options(options: &BTreeMap<String, VmValue>) -> BTreeMap<String, VmValue> {
2245 let mut hsts = BTreeMap::new();
2246 hsts.insert(
2247 "hsts_max_age_seconds".to_string(),
2248 VmValue::Int(vm_get_int_option(
2249 options,
2250 "hsts_max_age_seconds",
2251 31_536_000,
2252 )),
2253 );
2254 hsts.insert(
2255 "hsts_include_subdomains".to_string(),
2256 VmValue::Bool(vm_get_bool_option(
2257 options,
2258 "hsts_include_subdomains",
2259 false,
2260 )),
2261 );
2262 hsts.insert(
2263 "hsts_preload".to_string(),
2264 VmValue::Bool(vm_get_bool_option(options, "hsts_preload", false)),
2265 );
2266 hsts
2267}
2268
2269fn http_server_security_headers(config: &BTreeMap<String, VmValue>) -> BTreeMap<String, VmValue> {
2270 let hsts_enabled = vm_get_bool_option(config, "hsts", false);
2271 if !hsts_enabled {
2272 return BTreeMap::new();
2273 }
2274 let mut value = format!(
2275 "max-age={}",
2276 vm_get_int_option(config, "hsts_max_age_seconds", 31_536_000).max(0)
2277 );
2278 if vm_get_bool_option(config, "hsts_include_subdomains", false) {
2279 value.push_str("; includeSubDomains");
2280 }
2281 if vm_get_bool_option(config, "hsts_preload", false) {
2282 value.push_str("; preload");
2283 }
2284 BTreeMap::from([(
2285 "strict-transport-security".to_string(),
2286 VmValue::String(Rc::from(value)),
2287 )])
2288}
2289
2290fn tls_hosts_arg(value: Option<&VmValue>) -> Result<Vec<String>, VmError> {
2291 match value {
2292 None | Some(VmValue::Nil) => Ok(vec!["localhost".to_string(), "127.0.0.1".to_string()]),
2293 Some(VmValue::List(hosts)) => {
2294 let mut parsed = Vec::new();
2295 for host in hosts.iter() {
2296 let host = host.display();
2297 if host.is_empty() {
2298 return Err(vm_error(
2299 "http_server_tls_self_signed_dev: host names must be non-empty",
2300 ));
2301 }
2302 parsed.push(host);
2303 }
2304 if parsed.is_empty() {
2305 return Err(vm_error(
2306 "http_server_tls_self_signed_dev: host list must not be empty",
2307 ));
2308 }
2309 Ok(parsed)
2310 }
2311 Some(other) => {
2312 let host = other.display();
2313 if host.is_empty() {
2314 return Err(vm_error(
2315 "http_server_tls_self_signed_dev: host name must be non-empty",
2316 ));
2317 }
2318 Ok(vec![host])
2319 }
2320 }
2321}
2322
2323fn vm_get_int_option(options: &BTreeMap<String, VmValue>, key: &str, default: i64) -> i64 {
2324 options.get(key).and_then(|v| v.as_int()).unwrap_or(default)
2325}
2326
2327fn vm_get_bool_option(options: &BTreeMap<String, VmValue>, key: &str, default: bool) -> bool {
2328 match options.get(key) {
2329 Some(VmValue::Bool(b)) => *b,
2330 _ => default,
2331 }
2332}
2333
2334fn vm_get_int_option_prefer(
2335 options: &BTreeMap<String, VmValue>,
2336 canonical: &str,
2337 alias: &str,
2338 default: i64,
2339) -> i64 {
2340 options
2341 .get(canonical)
2342 .and_then(|value| value.as_int())
2343 .or_else(|| options.get(alias).and_then(|value| value.as_int()))
2344 .unwrap_or(default)
2345}
2346
2347fn vm_get_optional_int_option(options: &BTreeMap<String, VmValue>, key: &str) -> Option<u64> {
2348 options
2349 .get(key)
2350 .and_then(|value| value.as_int())
2351 .map(|value| value.max(0) as u64)
2352}
2353
2354fn string_option(options: &BTreeMap<String, VmValue>, key: &str) -> Option<String> {
2355 options
2356 .get(key)
2357 .map(|value| value.display())
2358 .filter(|value| !value.is_empty())
2359}
2360
2361fn parse_proxy_config(options: &BTreeMap<String, VmValue>) -> Option<HttpProxyConfig> {
2362 let proxy = options.get("proxy")?;
2363 let (url, no_proxy) = match proxy {
2364 VmValue::Dict(dict) => (
2365 dict.get("url")
2366 .map(|value| value.display())
2367 .filter(|value| !value.is_empty())?,
2368 dict.get("no_proxy")
2369 .map(|value| value.display())
2370 .filter(|value| !value.is_empty()),
2371 ),
2372 other => (other.display(), None),
2373 };
2374 if url.is_empty() {
2375 return None;
2376 }
2377 let auth = options
2378 .get("proxy_auth")
2379 .and_then(|value| value.as_dict())
2380 .map(|dict| {
2381 (
2382 dict.get("user")
2383 .map(|value| value.display())
2384 .unwrap_or_default(),
2385 dict.get("pass")
2386 .or_else(|| dict.get("password"))
2387 .map(|value| value.display())
2388 .unwrap_or_default(),
2389 )
2390 })
2391 .filter(|(user, _)| !user.is_empty());
2392 Some(HttpProxyConfig {
2393 url,
2394 auth,
2395 no_proxy,
2396 })
2397}
2398
2399fn parse_tls_config(options: &BTreeMap<String, VmValue>) -> HttpTlsConfig {
2400 let Some(tls) = options.get("tls").and_then(|value| value.as_dict()) else {
2401 return HttpTlsConfig::default();
2402 };
2403 let pinned_sha256 = match tls.get("pinned_sha256") {
2404 Some(VmValue::List(values)) => values
2405 .iter()
2406 .map(|value| value.display())
2407 .filter(|value| !value.is_empty())
2408 .collect(),
2409 Some(value) => {
2410 let value = value.display();
2411 if value.is_empty() {
2412 Vec::new()
2413 } else {
2414 vec![value]
2415 }
2416 }
2417 None => Vec::new(),
2418 };
2419 HttpTlsConfig {
2420 ca_bundle_path: string_option(tls, "ca_bundle_path"),
2421 client_cert_path: string_option(tls, "client_cert_path"),
2422 client_key_path: string_option(tls, "client_key_path"),
2423 client_identity_path: string_option(tls, "client_identity_path"),
2424 pinned_sha256,
2425 }
2426}
2427
2428fn parse_retry_statuses(options: &BTreeMap<String, VmValue>) -> Vec<u16> {
2429 match options.get("retry_on") {
2430 Some(VmValue::List(values)) => {
2431 let statuses: Vec<u16> = values
2432 .iter()
2433 .filter_map(|value| value.as_int())
2434 .filter(|status| (0..=u16::MAX as i64).contains(status))
2435 .map(|status| status as u16)
2436 .collect();
2437 if statuses.is_empty() {
2438 DEFAULT_RETRYABLE_STATUSES.to_vec()
2439 } else {
2440 statuses
2441 }
2442 }
2443 _ => DEFAULT_RETRYABLE_STATUSES.to_vec(),
2444 }
2445}
2446
2447fn parse_retry_methods(options: &BTreeMap<String, VmValue>) -> Vec<String> {
2448 match options.get("retry_methods") {
2449 Some(VmValue::List(values)) => {
2450 let methods: Vec<String> = values
2451 .iter()
2452 .map(|value| value.display().trim().to_ascii_uppercase())
2453 .filter(|value| !value.is_empty())
2454 .collect();
2455 if methods.is_empty() {
2456 DEFAULT_RETRYABLE_METHODS
2457 .iter()
2458 .map(|method| (*method).to_string())
2459 .collect()
2460 } else {
2461 methods
2462 }
2463 }
2464 _ => DEFAULT_RETRYABLE_METHODS
2465 .iter()
2466 .map(|method| (*method).to_string())
2467 .collect(),
2468 }
2469}
2470
2471fn parse_http_options(options: &BTreeMap<String, VmValue>) -> HttpRequestConfig {
2472 let total_timeout_ms = vm_get_int_option(options, "total_timeout_ms", -1);
2473 let total_timeout_ms = if total_timeout_ms >= 0 {
2474 total_timeout_ms as u64
2475 } else {
2476 vm_get_int_option_prefer(options, "timeout_ms", "timeout", DEFAULT_TIMEOUT_MS as i64).max(0)
2477 as u64
2478 };
2479 let retry_options = options.get("retry").and_then(|value| value.as_dict());
2480 let retry_max = retry_options
2481 .and_then(|retry| retry.get("max"))
2482 .and_then(|value| value.as_int())
2483 .unwrap_or_else(|| vm_get_int_option(options, "retries", 0))
2484 .max(0) as u32;
2485 let retry_backoff_ms = retry_options
2486 .and_then(|retry| retry.get("backoff_ms"))
2487 .and_then(|value| value.as_int())
2488 .unwrap_or_else(|| vm_get_int_option(options, "backoff", DEFAULT_BACKOFF_MS as i64))
2489 .max(0) as u64;
2490 let respect_retry_after = vm_get_bool_option(options, "respect_retry_after", true);
2491 let follow_redirects = vm_get_bool_option(options, "follow_redirects", true);
2492 let max_redirects = vm_get_int_option(options, "max_redirects", 10).max(0) as usize;
2493
2494 HttpRequestConfig {
2495 total_timeout_ms,
2496 connect_timeout_ms: vm_get_optional_int_option(options, "connect_timeout_ms"),
2497 read_timeout_ms: vm_get_optional_int_option(options, "read_timeout_ms"),
2498 retry: RetryConfig {
2499 max: retry_max,
2500 backoff_ms: retry_backoff_ms,
2501 retryable_statuses: parse_retry_statuses(options),
2502 retryable_methods: parse_retry_methods(options),
2503 respect_retry_after,
2504 },
2505 follow_redirects,
2506 max_redirects,
2507 proxy: parse_proxy_config(options),
2508 tls: parse_tls_config(options),
2509 decompress: vm_get_bool_option(options, "decompress", true),
2510 }
2511}
2512
2513fn http_client_key(config: &HttpRequestConfig) -> String {
2514 format!(
2515 "follow_redirects={};max_redirects={};connect_timeout={:?};read_timeout={:?};proxy={};proxy_auth={};no_proxy={};ca={};client_cert={};client_key={};identity={};pins={};decompress={}",
2516 config.follow_redirects,
2517 config.max_redirects,
2518 config.connect_timeout_ms,
2519 config.read_timeout_ms,
2520 config
2521 .proxy
2522 .as_ref()
2523 .map(|proxy| proxy.url.as_str())
2524 .unwrap_or(""),
2525 config
2526 .proxy
2527 .as_ref()
2528 .and_then(|proxy| proxy.auth.as_ref())
2529 .map(|(user, _)| user.as_str())
2530 .unwrap_or(""),
2531 config
2532 .proxy
2533 .as_ref()
2534 .and_then(|proxy| proxy.no_proxy.as_deref())
2535 .unwrap_or(""),
2536 config.tls.ca_bundle_path.as_deref().unwrap_or(""),
2537 config.tls.client_cert_path.as_deref().unwrap_or(""),
2538 config.tls.client_key_path.as_deref().unwrap_or(""),
2539 config.tls.client_identity_path.as_deref().unwrap_or(""),
2540 config.tls.pinned_sha256.join(","),
2541 config.decompress,
2542 )
2543}
2544
2545fn build_http_client(config: &HttpRequestConfig) -> Result<reqwest::Client, VmError> {
2546 let redirect_policy = if config.follow_redirects {
2547 let max_redirects = config.max_redirects;
2548 reqwest::redirect::Policy::custom(move |attempt| {
2549 if attempt.previous().len() >= max_redirects {
2550 attempt.error("too many redirects")
2551 } else if crate::egress::redirect_url_allowed("http_redirect", attempt.url().as_str()) {
2552 attempt.follow()
2553 } else {
2554 attempt.error("egress policy blocked redirect target")
2555 }
2556 })
2557 } else {
2558 reqwest::redirect::Policy::none()
2559 };
2560
2561 let mut builder = reqwest::Client::builder().redirect(redirect_policy);
2562 if let Some(ms) = config.connect_timeout_ms {
2563 builder = builder.connect_timeout(Duration::from_millis(ms));
2564 }
2565 if let Some(ms) = config.read_timeout_ms {
2566 builder = builder.read_timeout(Duration::from_millis(ms));
2567 }
2568 if !config.decompress {
2569 builder = builder.no_gzip().no_brotli().no_deflate().no_zstd();
2570 }
2571 if let Some(proxy_config) = &config.proxy {
2572 let mut proxy = reqwest::Proxy::all(&proxy_config.url)
2573 .map_err(|e| vm_error(format!("http: invalid proxy '{}': {e}", proxy_config.url)))?;
2574 if let Some((user, pass)) = &proxy_config.auth {
2575 proxy = proxy.basic_auth(user, pass);
2576 }
2577 if let Some(no_proxy) = &proxy_config.no_proxy {
2578 proxy = proxy.no_proxy(reqwest::NoProxy::from_string(no_proxy));
2579 }
2580 builder = builder.proxy(proxy);
2581 }
2582 builder = configure_tls(builder, &config.tls)?;
2583 builder
2584 .build()
2585 .map_err(|e| vm_error(format!("http: failed to build client: {e}")))
2586}
2587
2588fn configure_tls(
2589 mut builder: reqwest::ClientBuilder,
2590 tls: &HttpTlsConfig,
2591) -> Result<reqwest::ClientBuilder, VmError> {
2592 if let Some(path) = &tls.ca_bundle_path {
2593 let resolved = resolve_http_path("http tls", path, crate::stdlib::sandbox::FsAccess::Read)?;
2594 let bytes = std::fs::read(&resolved).map_err(|error| {
2595 vm_error(format!(
2596 "http: failed to read CA bundle {}: {error}",
2597 resolved.display()
2598 ))
2599 })?;
2600 match reqwest::Certificate::from_pem_bundle(&bytes) {
2601 Ok(certs) => {
2602 for cert in certs {
2603 builder = builder.add_root_certificate(cert);
2604 }
2605 }
2606 Err(pem_error) => {
2607 let cert = reqwest::Certificate::from_der(&bytes).map_err(|der_error| {
2608 vm_error(format!(
2609 "http: failed to parse CA bundle {} as PEM ({pem_error}) or DER ({der_error})",
2610 resolved.display()
2611 ))
2612 })?;
2613 builder = builder.add_root_certificate(cert);
2614 }
2615 }
2616 }
2617
2618 if let Some(path) = &tls.client_identity_path {
2619 let resolved = resolve_http_path("http tls", path, crate::stdlib::sandbox::FsAccess::Read)?;
2620 let bytes = std::fs::read(&resolved).map_err(|error| {
2621 vm_error(format!(
2622 "http: failed to read client identity {}: {error}",
2623 resolved.display()
2624 ))
2625 })?;
2626 let identity = reqwest::Identity::from_pem(&bytes).map_err(|error| {
2627 vm_error(format!(
2628 "http: failed to parse client identity {}: {error}",
2629 resolved.display()
2630 ))
2631 })?;
2632 builder = builder.identity(identity);
2633 } else if let Some(cert_path) = &tls.client_cert_path {
2634 let cert = {
2635 let resolved = resolve_http_path(
2636 "http tls",
2637 cert_path,
2638 crate::stdlib::sandbox::FsAccess::Read,
2639 )?;
2640 std::fs::read(&resolved).map_err(|error| {
2641 vm_error(format!(
2642 "http: failed to read client certificate {}: {error}",
2643 resolved.display()
2644 ))
2645 })?
2646 };
2647 let mut identity_pem = cert;
2648 if let Some(key_path) = &tls.client_key_path {
2649 let resolved =
2650 resolve_http_path("http tls", key_path, crate::stdlib::sandbox::FsAccess::Read)?;
2651 let key = std::fs::read(&resolved).map_err(|error| {
2652 vm_error(format!(
2653 "http: failed to read client key {}: {error}",
2654 resolved.display()
2655 ))
2656 })?;
2657 identity_pem.extend_from_slice(b"\n");
2658 identity_pem.extend_from_slice(&key);
2659 }
2660 let identity = reqwest::Identity::from_pem(&identity_pem)
2661 .map_err(|error| vm_error(format!("http: failed to parse client identity: {error}")))?;
2662 builder = builder.identity(identity);
2663 }
2664
2665 if !tls.pinned_sha256.is_empty() {
2666 builder = builder.tls_info(true);
2667 }
2668 Ok(builder)
2669}
2670
2671fn pooled_http_client(config: &HttpRequestConfig) -> Result<reqwest::Client, VmError> {
2672 let key = http_client_key(config);
2673 if let Some(client) = HTTP_CLIENTS.with(|clients| clients.borrow().get(&key).cloned()) {
2674 return Ok(client);
2675 }
2676
2677 let client = build_http_client(config)?;
2678 HTTP_CLIENTS.with(|clients| {
2679 clients.borrow_mut().insert(key, client.clone());
2680 });
2681 Ok(client)
2682}
2683
2684fn normalize_pin(value: &str) -> String {
2685 let trimmed = value.trim();
2686 let trimmed = trimmed
2687 .strip_prefix("sha256/")
2688 .or_else(|| trimmed.strip_prefix("sha256:"))
2689 .unwrap_or(trimmed);
2690 let compact = trimmed.replace(':', "");
2691 if !compact.is_empty() && compact.chars().all(|ch| ch.is_ascii_hexdigit()) {
2692 compact.to_ascii_lowercase()
2693 } else {
2694 compact
2695 }
2696}
2697
2698fn verify_tls_pin(response: &reqwest::Response, pins: &[String]) -> Result<(), VmError> {
2699 if pins.is_empty() {
2700 return Ok(());
2701 }
2702 let Some(info) = response.extensions().get::<reqwest::tls::TlsInfo>() else {
2703 return Err(vm_error(
2704 "http: TLS pinning requested but TLS info is unavailable",
2705 ));
2706 };
2707 let Some(cert_der) = info.peer_certificate() else {
2708 return Err(vm_error(
2709 "http: TLS pinning requested but no peer certificate was presented",
2710 ));
2711 };
2712 let (_, cert) = X509Certificate::from_der(cert_der)
2713 .map_err(|error| vm_error(format!("http: failed to parse peer certificate: {error}")))?;
2714 let digest = Sha256::digest(cert.tbs_certificate.subject_pki.raw);
2715 let hex_pin = hex::encode(digest.as_slice());
2716 let base64_pin = base64::engine::general_purpose::STANDARD.encode(digest);
2717 let base64url_pin = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(digest);
2718 let wanted = pins
2719 .iter()
2720 .map(|pin| normalize_pin(pin))
2721 .collect::<Vec<_>>();
2722 if wanted
2723 .iter()
2724 .any(|pin| pin == &hex_pin || pin == &base64_pin || pin == &base64url_pin)
2725 {
2726 Ok(())
2727 } else {
2728 Err(vm_error("http: TLS SPKI pin mismatch"))
2729 }
2730}
2731
2732fn parse_http_request_parts(
2733 method: &str,
2734 options: &BTreeMap<String, VmValue>,
2735) -> Result<HttpRequestParts, VmError> {
2736 let req_method = method
2737 .parse::<reqwest::Method>()
2738 .map_err(|e| vm_error(format!("http: invalid method '{method}': {e}")))?;
2739
2740 let mut header_map = reqwest::header::HeaderMap::new();
2741 let mut recorded_headers = BTreeMap::new();
2742
2743 if let Some(auth_val) = options.get("auth") {
2744 match auth_val {
2745 VmValue::String(s) => {
2746 let hv = reqwest::header::HeaderValue::from_str(s)
2747 .map_err(|e| vm_error(format!("http: invalid auth header value: {e}")))?;
2748 header_map.insert(reqwest::header::AUTHORIZATION, hv);
2749 recorded_headers.insert(
2750 "Authorization".to_string(),
2751 VmValue::String(Rc::from(s.as_ref())),
2752 );
2753 }
2754 VmValue::Dict(d) => {
2755 if let Some(bearer) = d.get("bearer") {
2756 let token = bearer.display();
2757 let authorization = format!("Bearer {token}");
2758 let hv = reqwest::header::HeaderValue::from_str(&authorization)
2759 .map_err(|e| vm_error(format!("http: invalid bearer token: {e}")))?;
2760 header_map.insert(reqwest::header::AUTHORIZATION, hv);
2761 recorded_headers.insert(
2762 "Authorization".to_string(),
2763 VmValue::String(Rc::from(authorization)),
2764 );
2765 } else if let Some(VmValue::Dict(basic)) = d.get("basic") {
2766 let user = basic.get("user").map(|v| v.display()).unwrap_or_default();
2767 let password = basic
2768 .get("password")
2769 .map(|v| v.display())
2770 .unwrap_or_default();
2771 use base64::Engine;
2772 let encoded = base64::engine::general_purpose::STANDARD
2773 .encode(format!("{user}:{password}"));
2774 let authorization = format!("Basic {encoded}");
2775 let hv = reqwest::header::HeaderValue::from_str(&authorization)
2776 .map_err(|e| vm_error(format!("http: invalid basic auth: {e}")))?;
2777 header_map.insert(reqwest::header::AUTHORIZATION, hv);
2778 recorded_headers.insert(
2779 "Authorization".to_string(),
2780 VmValue::String(Rc::from(authorization)),
2781 );
2782 }
2783 }
2784 _ => {}
2785 }
2786 }
2787
2788 if let Some(VmValue::Dict(hdrs)) = options.get("headers") {
2789 for (k, v) in hdrs.iter() {
2790 let name = reqwest::header::HeaderName::from_bytes(k.as_bytes())
2791 .map_err(|e| vm_error(format!("http: invalid header name '{k}': {e}")))?;
2792 let val = reqwest::header::HeaderValue::from_str(&v.display())
2793 .map_err(|e| vm_error(format!("http: invalid header value for '{k}': {e}")))?;
2794 header_map.insert(name, val);
2795 recorded_headers.insert(k.clone(), VmValue::String(Rc::from(v.display())));
2796 }
2797 }
2798
2799 let multipart = parse_multipart_request(options)?;
2800 if multipart.is_some() {
2801 if options.contains_key("body") {
2802 return Err(vm_error(
2803 "http: body and multipart options are mutually exclusive",
2804 ));
2805 }
2806 recorded_headers.insert(
2807 "content-type".to_string(),
2808 VmValue::String(Rc::from(format!(
2809 "multipart/form-data; boundary={MULTIPART_MOCK_BOUNDARY}"
2810 ))),
2811 );
2812 }
2813
2814 Ok(HttpRequestParts {
2815 method: req_method,
2816 headers: header_map,
2817 recorded_headers,
2818 body: if multipart.is_some() {
2819 multipart.as_ref().map(|request| request.mock_body.clone())
2820 } else {
2821 options.get("body").map(|v| v.display())
2822 },
2823 multipart,
2824 })
2825}
2826
2827fn session_from_options(options: &BTreeMap<String, VmValue>) -> Option<String> {
2828 options
2829 .get("session")
2830 .and_then(|value| handle_from_value(value, "http_request").ok())
2831}
2832
2833fn parse_mock_stream_event(value: &VmValue) -> MockStreamEvent {
2834 match value {
2835 VmValue::Dict(dict) => MockStreamEvent {
2836 event_type: dict
2837 .get("event")
2838 .or_else(|| dict.get("type"))
2839 .map(|value| value.display())
2840 .filter(|value| !value.is_empty())
2841 .unwrap_or_else(|| "message".to_string()),
2842 data: dict
2843 .get("data")
2844 .map(|value| value.display())
2845 .unwrap_or_default(),
2846 id: dict
2847 .get("id")
2848 .map(|value| value.display())
2849 .filter(|value| !value.is_empty()),
2850 retry_ms: dict.get("retry_ms").and_then(|value| value.as_int()),
2851 },
2852 _ => MockStreamEvent {
2853 event_type: "message".to_string(),
2854 data: value.display(),
2855 id: None,
2856 retry_ms: None,
2857 },
2858 }
2859}
2860
2861fn parse_mock_stream_events(value: Option<&VmValue>) -> Vec<MockStreamEvent> {
2862 let Some(value) = value else {
2863 return Vec::new();
2864 };
2865 match value {
2866 VmValue::Dict(dict) => dict
2867 .get("events")
2868 .and_then(|events| match events {
2869 VmValue::List(items) => Some(items.iter().map(parse_mock_stream_event).collect()),
2870 _ => None,
2871 })
2872 .unwrap_or_default(),
2873 VmValue::List(items) => items.iter().map(parse_mock_stream_event).collect(),
2874 other => vec![parse_mock_stream_event(other)],
2875 }
2876}
2877
2878fn sse_event_value(event: &MockStreamEvent) -> VmValue {
2879 let mut dict = BTreeMap::new();
2880 dict.insert("type".to_string(), VmValue::String(Rc::from("event")));
2881 dict.insert(
2882 "event".to_string(),
2883 VmValue::String(Rc::from(event.event_type.as_str())),
2884 );
2885 dict.insert(
2886 "data".to_string(),
2887 VmValue::String(Rc::from(event.data.as_str())),
2888 );
2889 dict.insert(
2890 "id".to_string(),
2891 event
2892 .id
2893 .as_deref()
2894 .map(|id| VmValue::String(Rc::from(id)))
2895 .unwrap_or(VmValue::Nil),
2896 );
2897 dict.insert(
2898 "retry_ms".to_string(),
2899 event.retry_ms.map(VmValue::Int).unwrap_or(VmValue::Nil),
2900 );
2901 VmValue::Dict(Rc::new(dict))
2902}
2903
2904fn sse_server_response_value(id: &str, handle: &SseServerHandle) -> VmValue {
2905 let mut dict = BTreeMap::new();
2906 dict.insert("id".to_string(), VmValue::String(Rc::from(id)));
2907 dict.insert(
2908 "type".to_string(),
2909 VmValue::String(Rc::from("sse_response")),
2910 );
2911 dict.insert("status".to_string(), VmValue::Int(handle.status));
2912 dict.insert(
2913 "headers".to_string(),
2914 VmValue::Dict(Rc::new(handle.headers.clone())),
2915 );
2916 dict.insert("body".to_string(), VmValue::Nil);
2917 dict.insert("streaming".to_string(), VmValue::Bool(true));
2918 dict.insert(
2919 "max_event_bytes".to_string(),
2920 VmValue::Int(handle.max_event_bytes as i64),
2921 );
2922 dict.insert(
2923 "max_buffered_events".to_string(),
2924 VmValue::Int(handle.max_buffered_events as i64),
2925 );
2926 VmValue::Dict(Rc::new(dict))
2927}
2928
2929fn default_sse_response_headers() -> BTreeMap<String, VmValue> {
2930 BTreeMap::from([
2931 (
2932 "content-type".to_string(),
2933 VmValue::String(Rc::from("text/event-stream; charset=utf-8")),
2934 ),
2935 (
2936 "cache-control".to_string(),
2937 VmValue::String(Rc::from("no-cache")),
2938 ),
2939 (
2940 "connection".to_string(),
2941 VmValue::String(Rc::from("keep-alive")),
2942 ),
2943 (
2944 "x-accel-buffering".to_string(),
2945 VmValue::String(Rc::from("no")),
2946 ),
2947 ])
2948}
2949
2950fn sse_response_headers(options: &BTreeMap<String, VmValue>) -> BTreeMap<String, VmValue> {
2951 let mut headers = default_sse_response_headers();
2952 if let Some(VmValue::Dict(custom)) = options.get("headers") {
2953 for (name, value) in custom.iter() {
2954 headers.retain(|existing, _| !existing.eq_ignore_ascii_case(name));
2955 headers.insert(name.clone(), VmValue::String(Rc::from(value.display())));
2956 }
2957 }
2958 if !headers
2959 .keys()
2960 .any(|name| name.eq_ignore_ascii_case("content-type"))
2961 {
2962 headers.insert(
2963 "content-type".to_string(),
2964 VmValue::String(Rc::from("text/event-stream; charset=utf-8")),
2965 );
2966 }
2967 headers
2968}
2969
2970fn validate_sse_field(field: &str, value: &str) -> Result<(), VmError> {
2971 if value.contains('\n') || value.contains('\r') {
2972 return Err(vm_error(format!(
2973 "sse_event: {field} must not contain newlines"
2974 )));
2975 }
2976 Ok(())
2977}
2978
2979fn push_sse_multiline_field(frame: &mut String, field: &str, value: &str) {
2980 let normalized = value.replace("\r\n", "\n").replace('\r', "\n");
2981 if normalized.is_empty() {
2982 frame.push_str(field);
2983 frame.push_str(": \n");
2984 return;
2985 }
2986 for line in normalized.split('\n') {
2987 frame.push_str(field);
2988 frame.push_str(": ");
2989 frame.push_str(line);
2990 frame.push('\n');
2991 }
2992}
2993
2994fn vm_sse_event_frame(
2995 event: &VmValue,
2996 options: &BTreeMap<String, VmValue>,
2997) -> Result<String, VmError> {
2998 let mut frame = String::new();
2999 let mut has_event_payload = false;
3000
3001 match event {
3002 VmValue::Dict(dict) => {
3003 if let Some(comment) = dict.get("comment").or_else(|| options.get("comment")) {
3004 push_sse_comment(&mut frame, &comment.display());
3005 }
3006 if let Some(id) = dict.get("id").or_else(|| options.get("id")) {
3007 let id = id.display();
3008 validate_sse_field("id", &id)?;
3009 frame.push_str("id: ");
3010 frame.push_str(&id);
3011 frame.push('\n');
3012 }
3013 if let Some(event_type) = dict
3014 .get("event")
3015 .or_else(|| dict.get("name"))
3016 .or_else(|| options.get("event"))
3017 {
3018 let event_type = event_type.display();
3019 validate_sse_field("event", &event_type)?;
3020 frame.push_str("event: ");
3021 frame.push_str(&event_type);
3022 frame.push('\n');
3023 has_event_payload = true;
3024 }
3025 if let Some(retry) = dict
3026 .get("retry")
3027 .or_else(|| dict.get("retry_ms"))
3028 .or_else(|| options.get("retry"))
3029 .or_else(|| options.get("retry_ms"))
3030 {
3031 let retry_ms = retry.as_int().ok_or_else(|| {
3032 vm_error("sse_event: retry/retry_ms must be a non-negative integer")
3033 })?;
3034 if retry_ms < 0 {
3035 return Err(vm_error(
3036 "sse_event: retry/retry_ms must be a non-negative integer",
3037 ));
3038 }
3039 frame.push_str("retry: ");
3040 frame.push_str(&retry_ms.to_string());
3041 frame.push('\n');
3042 has_event_payload = true;
3043 }
3044 if let Some(data) = dict.get("data").or_else(|| options.get("data")) {
3045 push_sse_multiline_field(&mut frame, "data", &data.display());
3046 has_event_payload = true;
3047 } else if !frame.is_empty() && !dict.contains_key("comment") {
3048 push_sse_multiline_field(&mut frame, "data", "");
3049 has_event_payload = true;
3050 }
3051 }
3052 other => {
3053 if let Some(comment) = options.get("comment") {
3054 push_sse_comment(&mut frame, &comment.display());
3055 }
3056 if let Some(id) = options.get("id") {
3057 let id = id.display();
3058 validate_sse_field("id", &id)?;
3059 frame.push_str("id: ");
3060 frame.push_str(&id);
3061 frame.push('\n');
3062 }
3063 if let Some(event_type) = options.get("event") {
3064 let event_type = event_type.display();
3065 validate_sse_field("event", &event_type)?;
3066 frame.push_str("event: ");
3067 frame.push_str(&event_type);
3068 frame.push('\n');
3069 }
3070 if let Some(retry) = options.get("retry").or_else(|| options.get("retry_ms")) {
3071 let retry_ms = retry.as_int().ok_or_else(|| {
3072 vm_error("sse_event: retry/retry_ms must be a non-negative integer")
3073 })?;
3074 if retry_ms < 0 {
3075 return Err(vm_error(
3076 "sse_event: retry/retry_ms must be a non-negative integer",
3077 ));
3078 }
3079 frame.push_str("retry: ");
3080 frame.push_str(&retry_ms.to_string());
3081 frame.push('\n');
3082 }
3083 push_sse_multiline_field(&mut frame, "data", &other.display());
3084 has_event_payload = true;
3085 }
3086 }
3087
3088 if frame.is_empty() || !has_event_payload && !frame.starts_with(':') {
3089 push_sse_comment(&mut frame, "");
3090 }
3091 frame.push('\n');
3092 Ok(frame)
3093}
3094
3095fn push_sse_comment(frame: &mut String, comment: &str) {
3096 let normalized = comment.replace("\r\n", "\n").replace('\r', "\n");
3097 if normalized.is_empty() {
3098 frame.push_str(":\n");
3099 return;
3100 }
3101 for line in normalized.split('\n') {
3102 frame.push_str(": ");
3103 frame.push_str(line);
3104 frame.push('\n');
3105 }
3106}
3107
3108fn vm_sse_server_response(options: &BTreeMap<String, VmValue>) -> Result<VmValue, VmError> {
3109 let id = next_transport_handle("sse-server");
3110 let status = options
3111 .get("status")
3112 .and_then(|value| value.as_int())
3113 .unwrap_or(200)
3114 .clamp(100, 599);
3115 let handle = SseServerHandle {
3116 status,
3117 headers: sse_response_headers(options),
3118 frames: VecDeque::new(),
3119 max_event_bytes: transport_limit_option(
3120 options,
3121 "max_event_bytes",
3122 DEFAULT_MAX_MESSAGE_BYTES,
3123 )
3124 .max(1),
3125 max_buffered_events: transport_limit_option(
3126 options,
3127 "max_buffered_events",
3128 DEFAULT_MAX_STREAM_EVENTS,
3129 )
3130 .max(1),
3131 sent_events: 0,
3132 flushed_events: 0,
3133 closed: false,
3134 disconnected: false,
3135 cancelled: false,
3136 cancel_reason: None,
3137 };
3138 let value = sse_server_response_value(&id, &handle);
3139 SSE_SERVER_HANDLES.with(|handles| {
3140 let mut handles = handles.borrow_mut();
3141 if handles.len() >= MAX_SSE_SERVER_STREAMS {
3142 return Err(vm_error(format!(
3143 "sse_server_response: maximum open streams ({MAX_SSE_SERVER_STREAMS}) reached"
3144 )));
3145 }
3146 handles.insert(id, handle);
3147 Ok(())
3148 })?;
3149 Ok(value)
3150}
3151
3152fn sse_server_status_value(id: &str, handle: &SseServerHandle) -> VmValue {
3153 let mut dict = BTreeMap::new();
3154 dict.insert("id".to_string(), VmValue::String(Rc::from(id)));
3155 dict.insert("status".to_string(), VmValue::Int(handle.status));
3156 dict.insert(
3157 "headers".to_string(),
3158 VmValue::Dict(Rc::new(handle.headers.clone())),
3159 );
3160 dict.insert(
3161 "buffered_events".to_string(),
3162 VmValue::Int(handle.frames.len() as i64),
3163 );
3164 dict.insert(
3165 "sent_events".to_string(),
3166 VmValue::Int(handle.sent_events as i64),
3167 );
3168 dict.insert(
3169 "flushed_events".to_string(),
3170 VmValue::Int(handle.flushed_events as i64),
3171 );
3172 dict.insert("closed".to_string(), VmValue::Bool(handle.closed));
3173 dict.insert(
3174 "disconnected".to_string(),
3175 VmValue::Bool(handle.disconnected),
3176 );
3177 dict.insert("cancelled".to_string(), VmValue::Bool(handle.cancelled));
3178 dict.insert(
3179 "cancel_reason".to_string(),
3180 handle
3181 .cancel_reason
3182 .as_deref()
3183 .map(|reason| VmValue::String(Rc::from(reason)))
3184 .unwrap_or(VmValue::Nil),
3185 );
3186 dict.insert(
3187 "max_event_bytes".to_string(),
3188 VmValue::Int(handle.max_event_bytes as i64),
3189 );
3190 dict.insert(
3191 "max_buffered_events".to_string(),
3192 VmValue::Int(handle.max_buffered_events as i64),
3193 );
3194 VmValue::Dict(Rc::new(dict))
3195}
3196
3197fn vm_sse_server_status(stream_id: &str) -> Result<VmValue, VmError> {
3198 SSE_SERVER_HANDLES.with(|handles| {
3199 handles
3200 .borrow()
3201 .get(stream_id)
3202 .map(|handle| sse_server_status_value(stream_id, handle))
3203 .ok_or_else(|| vm_error(format!("sse_server_status: unknown stream '{stream_id}'")))
3204 })
3205}
3206
3207fn vm_sse_server_send(
3208 stream_id: &str,
3209 event: &VmValue,
3210 options: &BTreeMap<String, VmValue>,
3211) -> Result<VmValue, VmError> {
3212 let frame = vm_sse_event_frame(event, options)?;
3213 SSE_SERVER_HANDLES.with(|handles| {
3214 let mut handles = handles.borrow_mut();
3215 let handle = handles
3216 .get_mut(stream_id)
3217 .ok_or_else(|| vm_error(format!("sse_server_send: unknown stream '{stream_id}'")))?;
3218 if handle.closed || handle.cancelled || handle.disconnected {
3219 return Ok(VmValue::Bool(false));
3220 }
3221 if frame.len() > handle.max_event_bytes {
3222 return Err(vm_error(format!(
3223 "sse_server_send: event exceeded max_event_bytes ({})",
3224 handle.max_event_bytes
3225 )));
3226 }
3227 if handle.frames.len() >= handle.max_buffered_events {
3228 return Err(vm_error(format!(
3229 "sse_server_send: buffered events exceeded max_buffered_events ({})",
3230 handle.max_buffered_events
3231 )));
3232 }
3233 handle.frames.push_back(frame);
3234 handle.sent_events += 1;
3235 Ok(VmValue::Bool(true))
3236 })
3237}
3238
3239fn vm_sse_server_heartbeat(stream_id: &str, comment: Option<&VmValue>) -> Result<VmValue, VmError> {
3240 let mut frame = String::new();
3241 push_sse_comment(
3242 &mut frame,
3243 &comment
3244 .map(|value| value.display())
3245 .unwrap_or_else(|| "heartbeat".to_string()),
3246 );
3247 frame.push('\n');
3248 SSE_SERVER_HANDLES.with(|handles| {
3249 let mut handles = handles.borrow_mut();
3250 let handle = handles.get_mut(stream_id).ok_or_else(|| {
3251 vm_error(format!(
3252 "sse_server_heartbeat: unknown stream '{stream_id}'"
3253 ))
3254 })?;
3255 if handle.closed || handle.cancelled || handle.disconnected {
3256 return Ok(VmValue::Bool(false));
3257 }
3258 if frame.len() > handle.max_event_bytes {
3259 return Err(vm_error(format!(
3260 "sse_server_heartbeat: event exceeded max_event_bytes ({})",
3261 handle.max_event_bytes
3262 )));
3263 }
3264 if handle.frames.len() >= handle.max_buffered_events {
3265 return Err(vm_error(format!(
3266 "sse_server_heartbeat: buffered events exceeded max_buffered_events ({})",
3267 handle.max_buffered_events
3268 )));
3269 }
3270 handle.frames.push_back(frame);
3271 handle.sent_events += 1;
3272 Ok(VmValue::Bool(true))
3273 })
3274}
3275
3276fn vm_sse_server_flush(stream_id: &str) -> Result<VmValue, VmError> {
3277 SSE_SERVER_HANDLES.with(|handles| {
3278 let mut handles = handles.borrow_mut();
3279 let handle = handles
3280 .get_mut(stream_id)
3281 .ok_or_else(|| vm_error(format!("sse_server_flush: unknown stream '{stream_id}'")))?;
3282 if handle.disconnected || handle.cancelled {
3283 return Ok(VmValue::Bool(false));
3284 }
3285 handle.flushed_events = handle.sent_events;
3286 Ok(VmValue::Bool(!handle.closed))
3287 })
3288}
3289
3290fn vm_sse_server_close(stream_id: &str) -> Result<VmValue, VmError> {
3291 SSE_SERVER_HANDLES.with(|handles| {
3292 let mut handles = handles.borrow_mut();
3293 let handle = handles
3294 .get_mut(stream_id)
3295 .ok_or_else(|| vm_error(format!("sse_server_close: unknown stream '{stream_id}'")))?;
3296 if handle.closed {
3297 return Ok(VmValue::Bool(false));
3298 }
3299 handle.closed = true;
3300 Ok(VmValue::Bool(true))
3301 })
3302}
3303
3304fn vm_sse_server_cancel(stream_id: &str, reason: Option<&VmValue>) -> Result<VmValue, VmError> {
3305 SSE_SERVER_HANDLES.with(|handles| {
3306 let mut handles = handles.borrow_mut();
3307 let handle = handles
3308 .get_mut(stream_id)
3309 .ok_or_else(|| vm_error(format!("sse_server_cancel: unknown stream '{stream_id}'")))?;
3310 if handle.cancelled {
3311 return Ok(VmValue::Bool(false));
3312 }
3313 handle.cancelled = true;
3314 handle.closed = true;
3315 handle.cancel_reason = reason
3316 .map(|value| value.display())
3317 .filter(|value| !value.is_empty());
3318 Ok(VmValue::Bool(true))
3319 })
3320}
3321
3322fn vm_sse_server_observed_bool(
3323 stream_id: &str,
3324 builtin: &str,
3325 predicate: impl Fn(&SseServerHandle) -> bool,
3326) -> Result<VmValue, VmError> {
3327 SSE_SERVER_HANDLES.with(|handles| {
3328 handles
3329 .borrow()
3330 .get(stream_id)
3331 .map(|handle| VmValue::Bool(predicate(handle)))
3332 .ok_or_else(|| vm_error(format!("{builtin}: unknown stream '{stream_id}'")))
3333 })
3334}
3335
3336fn vm_sse_server_mock_receive(stream_id: &str) -> Result<VmValue, VmError> {
3337 SSE_SERVER_HANDLES.with(|handles| {
3338 let mut handles = handles.borrow_mut();
3339 let handle = handles.get_mut(stream_id).ok_or_else(|| {
3340 vm_error(format!(
3341 "sse_server_mock_receive: unknown stream '{stream_id}'"
3342 ))
3343 })?;
3344 if let Some(frame) = handle.frames.pop_front() {
3345 return Ok(sse_server_mock_frame_value(&frame));
3346 }
3347 if handle.closed || handle.cancelled || handle.disconnected {
3348 return Ok(sse_server_closed_event());
3349 }
3350 Ok(timeout_event())
3351 })
3352}
3353
3354fn vm_sse_server_mock_disconnect(stream_id: &str) -> Result<VmValue, VmError> {
3355 SSE_SERVER_HANDLES.with(|handles| {
3356 let mut handles = handles.borrow_mut();
3357 let handle = handles.get_mut(stream_id).ok_or_else(|| {
3358 vm_error(format!(
3359 "sse_server_mock_disconnect: unknown stream '{stream_id}'"
3360 ))
3361 })?;
3362 if handle.disconnected {
3363 return Ok(VmValue::Bool(false));
3364 }
3365 handle.disconnected = true;
3366 handle.closed = true;
3367 Ok(VmValue::Bool(true))
3368 })
3369}
3370
3371fn sse_server_mock_frame_value(frame: &str) -> VmValue {
3372 let mut event = MockStreamEvent {
3373 event_type: "message".to_string(),
3374 data: String::new(),
3375 id: None,
3376 retry_ms: None,
3377 };
3378 let mut data_lines = Vec::new();
3379 let mut comments = Vec::new();
3380 for raw in frame.lines() {
3381 if raw.is_empty() {
3382 continue;
3383 }
3384 if let Some(comment) = raw.strip_prefix(':') {
3385 comments.push(comment.strip_prefix(' ').unwrap_or(comment).to_string());
3386 continue;
3387 }
3388 let (field, value) = raw.split_once(':').unwrap_or((raw, ""));
3389 let value = value.strip_prefix(' ').unwrap_or(value);
3390 match field {
3391 "event" => event.event_type = value.to_string(),
3392 "data" => data_lines.push(value.to_string()),
3393 "id" => event.id = Some(value.to_string()).filter(|value| !value.is_empty()),
3394 "retry" => event.retry_ms = value.parse::<i64>().ok(),
3395 _ => {}
3396 }
3397 }
3398 if !data_lines.is_empty() {
3399 event.data = data_lines.join("\n");
3400 }
3401 let mut value = if comments.is_empty() || !data_lines.is_empty() {
3402 sse_event_value(&event)
3403 } else {
3404 let mut dict = BTreeMap::new();
3405 dict.insert("type".to_string(), VmValue::String(Rc::from("comment")));
3406 dict.insert(
3407 "comment".to_string(),
3408 VmValue::String(Rc::from(comments.join("\n"))),
3409 );
3410 VmValue::Dict(Rc::new(dict))
3411 };
3412 if let VmValue::Dict(dict) = &mut value {
3413 let mut owned = (**dict).clone();
3414 owned.insert("raw".to_string(), VmValue::String(Rc::from(frame)));
3415 value = VmValue::Dict(Rc::new(owned));
3416 }
3417 value
3418}
3419
3420fn real_sse_event_value(event: SseEvent) -> VmValue {
3421 match event {
3422 SseEvent::Open => {
3423 let mut dict = BTreeMap::new();
3424 dict.insert("type".to_string(), VmValue::String(Rc::from("open")));
3425 VmValue::Dict(Rc::new(dict))
3426 }
3427 SseEvent::Message(message) => {
3428 let retry_ms = message.retry.map(|retry| retry.as_millis() as i64);
3429 sse_event_value(&MockStreamEvent {
3430 event_type: if message.event.is_empty() {
3431 "message".to_string()
3432 } else {
3433 message.event
3434 },
3435 data: message.data,
3436 id: if message.id.is_empty() {
3437 None
3438 } else {
3439 Some(message.id)
3440 },
3441 retry_ms,
3442 })
3443 }
3444 }
3445}
3446
3447fn consume_sse_mock(url: &str) -> Option<Vec<MockStreamEvent>> {
3448 SSE_MOCKS.with(|mocks| {
3449 mocks
3450 .borrow()
3451 .iter()
3452 .find(|mock| url_matches(&mock.url_pattern, url))
3453 .map(|mock| mock.events.clone())
3454 })
3455}
3456
3457fn parse_ws_message(value: &VmValue) -> MockWsMessage {
3458 match value {
3459 VmValue::Dict(dict) => {
3460 let message_type = dict
3461 .get("type")
3462 .map(|value| value.display())
3463 .filter(|value| !value.is_empty())
3464 .unwrap_or_else(|| "text".to_string());
3465 let data = if dict
3466 .get("base64")
3467 .and_then(|value| match value {
3468 VmValue::Bool(value) => Some(*value),
3469 _ => None,
3470 })
3471 .unwrap_or(false)
3472 {
3473 use base64::Engine;
3474 dict.get("data")
3475 .map(|value| value.display())
3476 .and_then(|data| base64::engine::general_purpose::STANDARD.decode(data).ok())
3477 .unwrap_or_default()
3478 } else {
3479 dict.get("data")
3480 .map(|value| value.display().into_bytes())
3481 .unwrap_or_default()
3482 };
3483 MockWsMessage {
3484 message_type,
3485 data,
3486 close_code: dict
3487 .get("code")
3488 .and_then(|value| value.as_int())
3489 .map(|value| value as u16),
3490 close_reason: dict.get("reason").map(|value| value.display()),
3491 }
3492 }
3493 VmValue::Bytes(bytes) => MockWsMessage {
3494 message_type: "binary".to_string(),
3495 data: bytes.as_ref().clone(),
3496 close_code: None,
3497 close_reason: None,
3498 },
3499 other => MockWsMessage {
3500 message_type: "text".to_string(),
3501 data: other.display().into_bytes(),
3502 close_code: None,
3503 close_reason: None,
3504 },
3505 }
3506}
3507
3508fn parse_websocket_mock(value: Option<&VmValue>) -> (Vec<MockWsMessage>, bool) {
3509 let Some(value) = value else {
3510 return (Vec::new(), false);
3511 };
3512 match value {
3513 VmValue::Dict(dict) => {
3514 let echo = dict
3515 .get("echo")
3516 .and_then(|value| match value {
3517 VmValue::Bool(value) => Some(*value),
3518 _ => None,
3519 })
3520 .unwrap_or(false);
3521 let messages = dict
3522 .get("messages")
3523 .and_then(|messages| match messages {
3524 VmValue::List(items) => Some(items.iter().map(parse_ws_message).collect()),
3525 _ => None,
3526 })
3527 .unwrap_or_default();
3528 (messages, echo)
3529 }
3530 VmValue::List(items) => (items.iter().map(parse_ws_message).collect(), false),
3531 other => (vec![parse_ws_message(other)], false),
3532 }
3533}
3534
3535fn consume_websocket_mock(url: &str) -> Option<(Vec<MockWsMessage>, bool)> {
3536 WEBSOCKET_MOCKS.with(|mocks| {
3537 mocks
3538 .borrow()
3539 .iter()
3540 .find(|mock| url_matches(&mock.url_pattern, url))
3541 .map(|mock| (mock.messages.clone(), mock.echo))
3542 })
3543}
3544
3545fn ws_message_data(message: &MockWsMessage) -> String {
3546 match message.message_type.as_str() {
3547 "text" => String::from_utf8_lossy(&message.data).into_owned(),
3548 _ => {
3549 use base64::Engine;
3550 base64::engine::general_purpose::STANDARD.encode(&message.data)
3551 }
3552 }
3553}
3554
3555fn closed_event_with(code: Option<u16>, reason: Option<String>) -> VmValue {
3556 let mut dict = BTreeMap::new();
3557 dict.insert("type".to_string(), VmValue::String(Rc::from("close")));
3558 if let Some(code) = code {
3559 dict.insert("code".to_string(), VmValue::Int(i64::from(code)));
3560 }
3561 if let Some(reason) = reason {
3562 dict.insert("reason".to_string(), VmValue::String(Rc::from(reason)));
3563 }
3564 VmValue::Dict(Rc::new(dict))
3565}
3566
3567fn ws_event_value(message: MockWsMessage) -> VmValue {
3568 if message.message_type == "close" {
3569 return closed_event_with(message.close_code, message.close_reason);
3570 }
3571 let mut dict = BTreeMap::new();
3572 dict.insert(
3573 "type".to_string(),
3574 VmValue::String(Rc::from(message.message_type.as_str())),
3575 );
3576 match message.message_type.as_str() {
3577 "text" => {
3578 dict.insert(
3579 "data".to_string(),
3580 VmValue::String(Rc::from(String::from_utf8_lossy(&message.data).as_ref())),
3581 );
3582 }
3583 _ => {
3584 use base64::Engine;
3585 dict.insert(
3586 "data_base64".to_string(),
3587 VmValue::String(Rc::from(
3588 base64::engine::general_purpose::STANDARD
3589 .encode(&message.data)
3590 .as_str(),
3591 )),
3592 );
3593 }
3594 }
3595 VmValue::Dict(Rc::new(dict))
3596}
3597
3598fn real_ws_event_value(message: WsMessage) -> VmValue {
3599 match message {
3600 WsMessage::Text(text) => ws_event_value(MockWsMessage {
3601 message_type: "text".to_string(),
3602 data: text.as_bytes().to_vec(),
3603 close_code: None,
3604 close_reason: None,
3605 }),
3606 WsMessage::Binary(bytes) => ws_event_value(MockWsMessage {
3607 message_type: "binary".to_string(),
3608 data: bytes.to_vec(),
3609 close_code: None,
3610 close_reason: None,
3611 }),
3612 WsMessage::Ping(bytes) => ws_event_value(MockWsMessage {
3613 message_type: "ping".to_string(),
3614 data: bytes.to_vec(),
3615 close_code: None,
3616 close_reason: None,
3617 }),
3618 WsMessage::Pong(bytes) => ws_event_value(MockWsMessage {
3619 message_type: "pong".to_string(),
3620 data: bytes.to_vec(),
3621 close_code: None,
3622 close_reason: None,
3623 }),
3624 WsMessage::Close(frame) => match frame {
3625 Some(frame) => {
3626 closed_event_with(Some(u16::from(frame.code)), Some(frame.reason.to_string()))
3627 }
3628 None => closed_event(),
3629 },
3630 WsMessage::Frame(_) => VmValue::Nil,
3631 }
3632}
3633
3634fn transport_mock_call_value(call: &TransportMockCall) -> VmValue {
3635 let mut dict = BTreeMap::new();
3636 dict.insert(
3637 "kind".to_string(),
3638 VmValue::String(Rc::from(call.kind.as_str())),
3639 );
3640 dict.insert(
3641 "url".to_string(),
3642 VmValue::String(Rc::from(call.url.as_str())),
3643 );
3644 dict.insert(
3645 "handle".to_string(),
3646 call.handle
3647 .as_deref()
3648 .map(|handle| VmValue::String(Rc::from(handle)))
3649 .unwrap_or(VmValue::Nil),
3650 );
3651 dict.insert(
3652 "type".to_string(),
3653 call.message_type
3654 .as_deref()
3655 .map(|message_type| VmValue::String(Rc::from(message_type)))
3656 .unwrap_or(VmValue::Nil),
3657 );
3658 dict.insert(
3659 "data".to_string(),
3660 call.data
3661 .as_deref()
3662 .map(|data| VmValue::String(Rc::from(data)))
3663 .unwrap_or(VmValue::Nil),
3664 );
3665 VmValue::Dict(Rc::new(dict))
3666}
3667
3668fn method_is_retryable(retry: &RetryConfig, method: &reqwest::Method) -> bool {
3669 retry
3670 .retryable_methods
3671 .iter()
3672 .any(|candidate| candidate.eq_ignore_ascii_case(method.as_str()))
3673}
3674
3675fn should_retry_response(
3676 config: &HttpRequestConfig,
3677 method: &reqwest::Method,
3678 status: u16,
3679 attempt: u32,
3680) -> bool {
3681 attempt < config.retry.max
3682 && method_is_retryable(&config.retry, method)
3683 && config.retry.retryable_statuses.contains(&status)
3684}
3685
3686fn should_retry_transport(
3687 config: &HttpRequestConfig,
3688 method: &reqwest::Method,
3689 error: &reqwest::Error,
3690 attempt: u32,
3691) -> bool {
3692 attempt < config.retry.max
3693 && method_is_retryable(&config.retry, method)
3694 && (error.is_timeout() || error.is_connect())
3695}
3696
3697fn parse_retry_after_value(value: &str) -> Option<Duration> {
3698 let value = value.trim();
3699 if value.is_empty() {
3700 return None;
3701 }
3702
3703 if let Ok(secs) = value.parse::<f64>() {
3704 if !secs.is_finite() || secs < 0.0 {
3705 return Some(Duration::from_millis(0));
3706 }
3707 let millis = (secs * 1_000.0) as u64;
3708 return Some(Duration::from_millis(millis.min(MAX_RETRY_DELAY_MS)));
3709 }
3710
3711 if let Ok(target) = httpdate::parse_http_date(value) {
3712 let millis = target
3713 .duration_since(SystemTime::now())
3714 .map(|delta| delta.as_millis() as u64)
3715 .unwrap_or(0);
3716 return Some(Duration::from_millis(millis.min(MAX_RETRY_DELAY_MS)));
3717 }
3718
3719 None
3720}
3721
3722fn parse_retry_after_header(value: &reqwest::header::HeaderValue) -> Option<Duration> {
3723 value.to_str().ok().and_then(parse_retry_after_value)
3724}
3725
3726fn mock_retry_after(status: u16, headers: &BTreeMap<String, VmValue>) -> Option<Duration> {
3727 if !(status == 429 || status == 503) {
3728 return None;
3729 }
3730
3731 headers
3732 .iter()
3733 .find(|(name, _)| name.eq_ignore_ascii_case("retry-after"))
3734 .and_then(|(_, value)| parse_retry_after_value(&value.display()))
3735}
3736
3737fn response_retry_after(
3738 status: u16,
3739 headers: &reqwest::header::HeaderMap,
3740 respect_retry_after: bool,
3741) -> Option<Duration> {
3742 if !respect_retry_after || !(status == 429 || status == 503) {
3743 return None;
3744 }
3745 headers
3746 .get(reqwest::header::RETRY_AFTER)
3747 .and_then(parse_retry_after_header)
3748}
3749
3750fn compute_retry_delay(attempt: u32, base_ms: u64, retry_after: Option<Duration>) -> Duration {
3751 use rand::RngExt;
3752
3753 let base_delay = base_ms.saturating_mul(1u64 << attempt.min(30));
3754 let jitter: f64 = rand::rng().random_range(0.75..=1.25);
3755 let exponential_ms = ((base_delay as f64 * jitter) as u64).min(MAX_RETRY_DELAY_MS);
3756 let retry_after_ms = retry_after
3757 .map(|duration| duration.as_millis() as u64)
3758 .unwrap_or(0)
3759 .min(MAX_RETRY_DELAY_MS);
3760 Duration::from_millis(exponential_ms.max(retry_after_ms))
3761}
3762
3763async fn vm_execute_http_request(
3764 method: &str,
3765 url: &str,
3766 options: &BTreeMap<String, VmValue>,
3767) -> Result<VmValue, VmError> {
3768 if let Some(session_id) = session_from_options(options) {
3769 return vm_execute_http_session_request(&session_id, method, url, options).await;
3770 }
3771
3772 let config = parse_http_options(options);
3773 let client = pooled_http_client(&config)?;
3774 vm_execute_http_request_with_client(client, &config, method, url, options).await
3775}
3776
3777async fn vm_execute_http_session_request(
3778 session_id: &str,
3779 method: &str,
3780 url: &str,
3781 options: &BTreeMap<String, VmValue>,
3782) -> Result<VmValue, VmError> {
3783 let session = HTTP_SESSIONS.with(|sessions| sessions.borrow().get(session_id).cloned());
3784 let Some(session) = session else {
3785 return Err(vm_error(format!(
3786 "http_session_request: unknown HTTP session '{session_id}'"
3787 )));
3788 };
3789 let merged_options = merge_options(&session.options, options);
3790 let config = parse_http_options(&merged_options);
3791 vm_execute_http_request_with_client(session.client, &config, method, url, &merged_options).await
3792}
3793
3794async fn vm_execute_http_request_with_client(
3795 client: reqwest::Client,
3796 config: &HttpRequestConfig,
3797 method: &str,
3798 url: &str,
3799 options: &BTreeMap<String, VmValue>,
3800) -> Result<VmValue, VmError> {
3801 let parts = parse_http_request_parts(method, options)?;
3802
3803 if !url.starts_with("http://") && !url.starts_with("https://") {
3804 return Err(vm_error(format!(
3805 "http: URL must start with http:// or https://, got '{url}'"
3806 )));
3807 }
3808 crate::egress::enforce_url_allowed("http_request", url).await?;
3809
3810 for attempt in 0..=config.retry.max {
3811 if let Some(mock_response) = consume_http_mock(
3812 method,
3813 url,
3814 parts.recorded_headers.clone(),
3815 parts.body.clone(),
3816 ) {
3817 let status = mock_response.status.clamp(0, u16::MAX as i64) as u16;
3818 if should_retry_response(config, &parts.method, status, attempt) {
3819 let retry_after = if config.retry.respect_retry_after {
3820 mock_retry_after(status, &mock_response.headers)
3821 } else {
3822 None
3823 };
3824 tokio::time::sleep(compute_retry_delay(
3825 attempt,
3826 config.retry.backoff_ms,
3827 retry_after,
3828 ))
3829 .await;
3830 continue;
3831 }
3832
3833 return Ok(build_http_response(
3834 mock_response.status,
3835 mock_response.headers,
3836 mock_response.body,
3837 ));
3838 }
3839
3840 let mut req = client.request(parts.method.clone(), url);
3841 req = req
3842 .headers(parts.headers.clone())
3843 .timeout(Duration::from_millis(config.total_timeout_ms));
3844 if let Some(multipart) = &parts.multipart {
3845 req = req.multipart(multipart_form(multipart)?);
3846 } else if let Some(ref b) = parts.body {
3847 req = req.body(b.clone());
3848 }
3849
3850 match req.send().await {
3851 Ok(response) => {
3852 verify_tls_pin(&response, &config.tls.pinned_sha256)?;
3853 let status = response.status().as_u16();
3854 if should_retry_response(config, &parts.method, status, attempt) {
3855 let retry_after = response_retry_after(
3856 status,
3857 response.headers(),
3858 config.retry.respect_retry_after,
3859 );
3860 tokio::time::sleep(compute_retry_delay(
3861 attempt,
3862 config.retry.backoff_ms,
3863 retry_after,
3864 ))
3865 .await;
3866 continue;
3867 }
3868
3869 let resp_headers = response_headers(response.headers());
3870
3871 let body_text = response
3872 .text()
3873 .await
3874 .map_err(|e| vm_error(format!("http: failed to read response body: {e}")))?;
3875 return Ok(build_http_response(status as i64, resp_headers, body_text));
3876 }
3877 Err(e) => {
3878 if should_retry_transport(config, &parts.method, &e, attempt) {
3879 tokio::time::sleep(compute_retry_delay(attempt, config.retry.backoff_ms, None))
3880 .await;
3881 continue;
3882 }
3883 return Err(vm_error(format!("http: request failed: {e}")));
3884 }
3885 }
3886 }
3887
3888 Err(vm_error("http: request failed"))
3889}
3890
3891async fn vm_http_download(
3892 url: &str,
3893 dst_path: &str,
3894 options: &BTreeMap<String, VmValue>,
3895) -> Result<VmValue, VmError> {
3896 let method = options
3897 .get("method")
3898 .map(|value| value.display())
3899 .filter(|value| !value.is_empty())
3900 .unwrap_or_else(|| "GET".to_string())
3901 .to_uppercase();
3902 let parts = parse_http_request_parts(&method, options)?;
3903 if let Some(mock_response) = consume_http_mock(
3904 &method,
3905 url,
3906 parts.recorded_headers.clone(),
3907 parts.body.clone(),
3908 ) {
3909 let resolved = resolve_http_path(
3910 "http_download",
3911 dst_path,
3912 crate::stdlib::sandbox::FsAccess::Write,
3913 )?;
3914 std::fs::write(&resolved, mock_response.body.as_bytes()).map_err(|error| {
3915 vm_error(format!(
3916 "http_download: failed to write {}: {error}",
3917 resolved.display()
3918 ))
3919 })?;
3920 return Ok(build_http_download_response(
3921 mock_response.status,
3922 mock_response.headers,
3923 mock_response.body.len() as u64,
3924 ));
3925 }
3926
3927 if !url.starts_with("http://") && !url.starts_with("https://") {
3928 return Err(vm_error(format!(
3929 "http_download: URL must start with http:// or https://, got '{url}'"
3930 )));
3931 }
3932 crate::egress::enforce_url_allowed("http_download", url).await?;
3933 let config = parse_http_options(options);
3934 let client = if let Some(session_id) = session_from_options(options) {
3935 HTTP_SESSIONS
3936 .with(|sessions| sessions.borrow().get(&session_id).cloned())
3937 .map(|session| session.client)
3938 .ok_or_else(|| {
3939 vm_error(format!(
3940 "http_download: unknown HTTP session '{session_id}'"
3941 ))
3942 })?
3943 } else {
3944 pooled_http_client(&config)?
3945 };
3946 let mut request = client
3947 .request(parts.method, url)
3948 .headers(parts.headers)
3949 .timeout(Duration::from_millis(config.total_timeout_ms));
3950 if let Some(multipart) = &parts.multipart {
3951 request = request.multipart(multipart_form(multipart)?);
3952 } else if let Some(body) = parts.body {
3953 request = request.body(body);
3954 }
3955 let mut response = request
3956 .send()
3957 .await
3958 .map_err(|error| vm_error(format!("http_download: request failed: {error}")))?;
3959 verify_tls_pin(&response, &config.tls.pinned_sha256)?;
3960 let status = response.status().as_u16() as i64;
3961 let headers = response_headers(response.headers());
3962 let resolved = resolve_http_path(
3963 "http_download",
3964 dst_path,
3965 crate::stdlib::sandbox::FsAccess::Write,
3966 )?;
3967 let mut file = std::fs::File::create(&resolved).map_err(|error| {
3968 vm_error(format!(
3969 "http_download: failed to create {}: {error}",
3970 resolved.display()
3971 ))
3972 })?;
3973 let mut bytes_written = 0_u64;
3974 while let Some(chunk) = response.chunk().await.map_err(|error| {
3975 vm_error(format!(
3976 "http_download: failed to read response body: {error}"
3977 ))
3978 })? {
3979 file.write_all(&chunk).map_err(|error| {
3980 vm_error(format!(
3981 "http_download: failed to write {}: {error}",
3982 resolved.display()
3983 ))
3984 })?;
3985 bytes_written += chunk.len() as u64;
3986 }
3987 Ok(build_http_download_response(status, headers, bytes_written))
3988}
3989
3990async fn vm_http_stream_open(
3991 url: &str,
3992 options: &BTreeMap<String, VmValue>,
3993) -> Result<VmValue, VmError> {
3994 let method = options
3995 .get("method")
3996 .map(|value| value.display())
3997 .filter(|value| !value.is_empty())
3998 .unwrap_or_else(|| "GET".to_string())
3999 .to_uppercase();
4000 let parts = parse_http_request_parts(&method, options)?;
4001 let id = next_transport_handle("http-stream");
4002 if let Some(mock_response) = consume_http_mock(
4003 &method,
4004 url,
4005 parts.recorded_headers.clone(),
4006 parts.body.clone(),
4007 ) {
4008 let handle = HttpStreamHandle {
4009 kind: HttpStreamKind::Fake,
4010 status: mock_response.status,
4011 headers: mock_response.headers,
4012 pending: mock_response.body.into_bytes().into(),
4013 closed: false,
4014 };
4015 HTTP_STREAMS.with(|streams| {
4016 let mut streams = streams.borrow_mut();
4017 if streams.len() >= MAX_HTTP_STREAMS {
4018 return Err(vm_error(format!(
4019 "http_stream_open: maximum open streams ({MAX_HTTP_STREAMS}) reached"
4020 )));
4021 }
4022 streams.insert(id.clone(), handle);
4023 Ok(())
4024 })?;
4025 return Ok(VmValue::String(Rc::from(id)));
4026 }
4027
4028 if !url.starts_with("http://") && !url.starts_with("https://") {
4029 return Err(vm_error(format!(
4030 "http_stream_open: URL must start with http:// or https://, got '{url}'"
4031 )));
4032 }
4033 crate::egress::enforce_url_allowed("http_stream_open", url).await?;
4034 let config = parse_http_options(options);
4035 let client = if let Some(session_id) = session_from_options(options) {
4036 HTTP_SESSIONS
4037 .with(|sessions| sessions.borrow().get(&session_id).cloned())
4038 .map(|session| session.client)
4039 .ok_or_else(|| {
4040 vm_error(format!(
4041 "http_stream_open: unknown HTTP session '{session_id}'"
4042 ))
4043 })?
4044 } else {
4045 pooled_http_client(&config)?
4046 };
4047 let mut request = client
4048 .request(parts.method, url)
4049 .headers(parts.headers)
4050 .timeout(Duration::from_millis(config.total_timeout_ms));
4051 if let Some(multipart) = &parts.multipart {
4052 request = request.multipart(multipart_form(multipart)?);
4053 } else if let Some(body) = parts.body {
4054 request = request.body(body);
4055 }
4056 let response = request
4057 .send()
4058 .await
4059 .map_err(|error| vm_error(format!("http_stream_open: request failed: {error}")))?;
4060 verify_tls_pin(&response, &config.tls.pinned_sha256)?;
4061 let status = response.status().as_u16() as i64;
4062 let headers = response_headers(response.headers());
4063 let handle = HttpStreamHandle {
4064 kind: HttpStreamKind::Real(Rc::new(tokio::sync::Mutex::new(response))),
4065 status,
4066 headers,
4067 pending: VecDeque::new(),
4068 closed: false,
4069 };
4070 HTTP_STREAMS.with(|streams| {
4071 let mut streams = streams.borrow_mut();
4072 if streams.len() >= MAX_HTTP_STREAMS {
4073 return Err(vm_error(format!(
4074 "http_stream_open: maximum open streams ({MAX_HTTP_STREAMS}) reached"
4075 )));
4076 }
4077 streams.insert(id.clone(), handle);
4078 Ok(())
4079 })?;
4080 Ok(VmValue::String(Rc::from(id)))
4081}
4082
4083async fn vm_http_stream_read(stream_id: &str, max_bytes: usize) -> Result<VmValue, VmError> {
4084 let (kind, mut pending, closed) = HTTP_STREAMS
4085 .with(|streams| {
4086 let mut streams = streams.borrow_mut();
4087 let handle = streams.get_mut(stream_id)?;
4088 let kind = match &handle.kind {
4089 HttpStreamKind::Real(response) => HttpStreamKind::Real(response.clone()),
4090 HttpStreamKind::Fake => HttpStreamKind::Fake,
4091 };
4092 let pending = std::mem::take(&mut handle.pending);
4093 Some((kind, pending, handle.closed))
4094 })
4095 .ok_or_else(|| vm_error(format!("http_stream_read: unknown stream '{stream_id}'")))?;
4096 if closed {
4097 return Ok(VmValue::Nil);
4098 }
4099 if pending.is_empty() {
4100 match kind {
4101 HttpStreamKind::Fake => {}
4102 HttpStreamKind::Real(response) => {
4103 let mut response = response.lock().await;
4104 if let Some(chunk) = response.chunk().await.map_err(|error| {
4105 vm_error(format!(
4106 "http_stream_read: failed to read response body: {error}"
4107 ))
4108 })? {
4109 pending.extend(chunk);
4110 }
4111 }
4112 }
4113 }
4114 if pending.is_empty() {
4115 HTTP_STREAMS.with(|streams| {
4116 if let Some(handle) = streams.borrow_mut().get_mut(stream_id) {
4117 handle.closed = true;
4118 }
4119 });
4120 return Ok(VmValue::Nil);
4121 }
4122 let take = pending.len().min(max_bytes.max(1));
4123 let chunk = pending.drain(..take).collect::<Vec<_>>();
4124 HTTP_STREAMS.with(|streams| {
4125 if let Some(handle) = streams.borrow_mut().get_mut(stream_id) {
4126 handle.pending = pending;
4127 }
4128 });
4129 Ok(VmValue::Bytes(Rc::new(chunk)))
4130}
4131
4132fn vm_http_stream_info(stream_id: &str) -> Result<VmValue, VmError> {
4133 HTTP_STREAMS.with(|streams| {
4134 let streams = streams.borrow();
4135 let handle = streams
4136 .get(stream_id)
4137 .ok_or_else(|| vm_error(format!("http_stream_info: unknown stream '{stream_id}'")))?;
4138 let mut dict = BTreeMap::new();
4139 dict.insert("status".to_string(), VmValue::Int(handle.status));
4140 dict.insert(
4141 "headers".to_string(),
4142 VmValue::Dict(Rc::new(handle.headers.clone())),
4143 );
4144 dict.insert(
4145 "ok".to_string(),
4146 VmValue::Bool((200..300).contains(&(handle.status as u16))),
4147 );
4148 Ok(VmValue::Dict(Rc::new(dict)))
4149 })
4150}
4151
4152async fn vm_sse_connect(
4153 method: &str,
4154 url: &str,
4155 options: &BTreeMap<String, VmValue>,
4156) -> Result<VmValue, VmError> {
4157 let id = next_transport_handle("sse");
4158 let max_events =
4159 transport_limit_option(options, "max_events", DEFAULT_MAX_STREAM_EVENTS).max(1);
4160 let max_message_bytes =
4161 transport_limit_option(options, "max_message_bytes", DEFAULT_MAX_MESSAGE_BYTES).max(1);
4162
4163 if let Some(events) = consume_sse_mock(url) {
4164 let handle = SseHandle {
4165 kind: SseHandleKind::Fake(Rc::new(tokio::sync::Mutex::new(FakeSseStream {
4166 events: events.into(),
4167 opened: false,
4168 closed: false,
4169 }))),
4170 url: url.to_string(),
4171 max_events,
4172 max_message_bytes,
4173 received: 0,
4174 };
4175 SSE_HANDLES.with(|handles| {
4176 let mut handles = handles.borrow_mut();
4177 if handles.len() >= MAX_SSE_STREAMS {
4178 return Err(vm_error(format!(
4179 "sse_connect: maximum open streams ({MAX_SSE_STREAMS}) reached"
4180 )));
4181 }
4182 handles.insert(id.clone(), handle);
4183 Ok(())
4184 })?;
4185 record_transport_call(TransportMockCall {
4186 kind: "sse_connect".to_string(),
4187 handle: Some(id.clone()),
4188 url: url.to_string(),
4189 message_type: None,
4190 data: None,
4191 });
4192 return Ok(VmValue::String(Rc::from(id)));
4193 }
4194
4195 if !url.starts_with("http://") && !url.starts_with("https://") {
4196 return Err(vm_error(format!(
4197 "sse_connect: URL must start with http:// or https://, got '{url}'"
4198 )));
4199 }
4200 crate::egress::enforce_url_allowed("sse_connect", url).await?;
4201
4202 let config = parse_http_options(options);
4203 let client = if let Some(session_id) = session_from_options(options) {
4204 let session = HTTP_SESSIONS.with(|sessions| sessions.borrow().get(&session_id).cloned());
4205 session
4206 .map(|session| session.client)
4207 .ok_or_else(|| vm_error(format!("sse_connect: unknown HTTP session '{session_id}'")))?
4208 } else {
4209 pooled_http_client(&config)?
4210 };
4211 let parts = parse_http_request_parts(method, options)?;
4212 let mut request = client
4213 .request(parts.method, url)
4214 .headers(parts.headers)
4215 .timeout(Duration::from_millis(config.total_timeout_ms));
4216 if let Some(body) = parts.body {
4217 request = request.body(body);
4218 }
4219 let stream = EventSource::new(request)
4220 .map_err(|error| vm_error(format!("sse_connect: failed to create stream: {error}")))?;
4221 let handle = SseHandle {
4222 kind: SseHandleKind::Real(Rc::new(tokio::sync::Mutex::new(stream))),
4223 url: url.to_string(),
4224 max_events,
4225 max_message_bytes,
4226 received: 0,
4227 };
4228 SSE_HANDLES.with(|handles| {
4229 let mut handles = handles.borrow_mut();
4230 if handles.len() >= MAX_SSE_STREAMS {
4231 return Err(vm_error(format!(
4232 "sse_connect: maximum open streams ({MAX_SSE_STREAMS}) reached"
4233 )));
4234 }
4235 handles.insert(id.clone(), handle);
4236 Ok(())
4237 })?;
4238 Ok(VmValue::String(Rc::from(id)))
4239}
4240
4241async fn vm_sse_receive(stream_id: &str, timeout_ms: u64) -> Result<VmValue, VmError> {
4242 let stream = SSE_HANDLES.with(|handles| {
4243 let mut handles = handles.borrow_mut();
4244 let handle = handles.get_mut(stream_id)?;
4245 if handle.received >= handle.max_events {
4246 return Some(Err(vm_error(format!(
4247 "sse_receive: stream '{stream_id}' exceeded max_events"
4248 ))));
4249 }
4250 handle.received += 1;
4251 let url = handle.url.clone();
4252 let max_message_bytes = handle.max_message_bytes;
4253 let kind = match &handle.kind {
4254 SseHandleKind::Real(stream) => SseHandleKind::Real(stream.clone()),
4255 SseHandleKind::Fake(stream) => SseHandleKind::Fake(stream.clone()),
4256 };
4257 Some(Ok((kind, url, max_message_bytes)))
4258 });
4259 let Some(stream) = stream else {
4260 return Err(vm_error(format!(
4261 "sse_receive: unknown stream '{stream_id}'"
4262 )));
4263 };
4264 let (kind, _url, max_message_bytes) = stream?;
4265
4266 match kind {
4267 SseHandleKind::Fake(stream) => {
4268 let mut stream = stream.lock().await;
4269 if stream.closed {
4270 return Ok(VmValue::Nil);
4271 }
4272 if !stream.opened {
4273 stream.opened = true;
4274 let mut dict = BTreeMap::new();
4275 dict.insert("type".to_string(), VmValue::String(Rc::from("open")));
4276 return Ok(VmValue::Dict(Rc::new(dict)));
4277 }
4278 let Some(event) = stream.events.pop_front() else {
4279 stream.closed = true;
4280 return Ok(VmValue::Nil);
4281 };
4282 if event.data.len() > max_message_bytes {
4283 return Err(vm_error(format!(
4284 "sse_receive: message exceeded max_message_bytes ({max_message_bytes})"
4285 )));
4286 }
4287 Ok(sse_event_value(&event))
4288 }
4289 SseHandleKind::Real(stream) => {
4290 let mut stream = stream.lock().await;
4291 let next = stream.next();
4292 let event = match tokio::time::timeout(Duration::from_millis(timeout_ms), next).await {
4293 Ok(Some(Ok(event))) => event,
4294 Ok(Some(Err(error))) => {
4295 return Err(vm_error(format!("sse_receive: stream error: {error}")));
4296 }
4297 Ok(None) => return Ok(VmValue::Nil),
4298 Err(_) => return Ok(timeout_event()),
4299 };
4300 if let SseEvent::Message(message) = &event {
4301 if message.data.len() > max_message_bytes {
4302 stream.close();
4303 return Err(vm_error(format!(
4304 "sse_receive: message exceeded max_message_bytes ({max_message_bytes})"
4305 )));
4306 }
4307 }
4308 Ok(real_sse_event_value(event))
4309 }
4310 }
4311}
4312
4313fn websocket_route_from_options(path: &str, options: &BTreeMap<String, VmValue>) -> WebSocketRoute {
4314 let bearer_token = options.get("auth").and_then(|auth| match auth {
4315 VmValue::Dict(dict) => dict.get("bearer").map(|value| value.display()),
4316 other => {
4317 let value = other.display();
4318 (!value.is_empty()).then_some(value)
4319 }
4320 });
4321 WebSocketRoute {
4322 path: path.to_string(),
4323 bearer_token,
4324 max_messages: transport_limit_option(options, "max_messages", DEFAULT_MAX_STREAM_EVENTS)
4325 .max(1),
4326 max_message_bytes: transport_limit_option(
4327 options,
4328 "max_message_bytes",
4329 DEFAULT_MAX_MESSAGE_BYTES,
4330 )
4331 .max(1),
4332 send_buffer_messages: transport_limit_option(options, "send_buffer_messages", 64),
4333 idle_timeout_ms: vm_get_int_option(
4334 options,
4335 "idle_timeout_ms",
4336 DEFAULT_WEBSOCKET_SERVER_IDLE_TIMEOUT_MS as i64,
4337 )
4338 .max(0) as u64,
4339 }
4340}
4341
4342fn vm_websocket_server(
4343 bind: &str,
4344 options: &BTreeMap<String, VmValue>,
4345) -> Result<VmValue, VmError> {
4346 let listener = TcpListener::bind(bind)
4347 .map_err(|error| vm_error(format!("websocket_server: bind failed: {error}")))?;
4348 listener
4349 .set_nonblocking(true)
4350 .map_err(|error| vm_error(format!("websocket_server: nonblocking failed: {error}")))?;
4351 let local_addr = listener
4352 .local_addr()
4353 .map_err(|error| vm_error(format!("websocket_server: local addr failed: {error}")))?;
4354 let id = next_transport_handle("websocket-server");
4355 let addr = local_addr.to_string();
4356 let url = format!("ws://{addr}");
4357 let routes = Arc::new(RwLock::new(HashMap::<String, WebSocketRoute>::new()));
4358 if let Some(path) = options
4359 .get("path")
4360 .map(|value| value.display())
4361 .filter(|path| !path.is_empty())
4362 {
4363 if !path.starts_with('/') {
4364 return Err(vm_error("websocket_server: path must start with '/'"));
4365 }
4366 routes
4367 .write()
4368 .expect("websocket routes poisoned")
4369 .insert(path.clone(), websocket_route_from_options(&path, options));
4370 }
4371 let (event_tx, event_rx) = mpsc::channel();
4372 let running = Arc::new(AtomicBool::new(true));
4373 let server_routes = routes.clone();
4374 let server_running = running.clone();
4375 thread::Builder::new()
4376 .name(format!("harn-ws-{id}"))
4377 .spawn(move || websocket_server_loop(listener, server_routes, event_tx, server_running))
4378 .map_err(|error| vm_error(format!("websocket_server: spawn failed: {error}")))?;
4379 WEBSOCKET_SERVERS.with(|servers| {
4380 let mut servers = servers.borrow_mut();
4381 if servers.len() >= MAX_WEBSOCKET_SERVERS {
4382 running.store(false, Ordering::SeqCst);
4383 let _ = TcpStream::connect(&addr);
4384 return Err(vm_error(format!(
4385 "websocket_server: maximum open servers ({MAX_WEBSOCKET_SERVERS}) reached"
4386 )));
4387 }
4388 servers.insert(
4389 id.clone(),
4390 WebSocketServer {
4391 addr: addr.clone(),
4392 routes,
4393 events: Rc::new(tokio::sync::Mutex::new(event_rx)),
4394 running,
4395 },
4396 );
4397 Ok(())
4398 })?;
4399 let mut dict = BTreeMap::new();
4400 dict.insert("id".to_string(), VmValue::String(Rc::from(id)));
4401 dict.insert("addr".to_string(), VmValue::String(Rc::from(addr)));
4402 dict.insert("url".to_string(), VmValue::String(Rc::from(url)));
4403 Ok(VmValue::Dict(Rc::new(dict)))
4404}
4405
4406fn vm_websocket_route(
4407 server_id: &str,
4408 path: &str,
4409 options: &BTreeMap<String, VmValue>,
4410) -> Result<VmValue, VmError> {
4411 let routes = WEBSOCKET_SERVERS.with(|servers| {
4412 servers
4413 .borrow()
4414 .get(server_id)
4415 .map(|server| server.routes.clone())
4416 });
4417 let Some(routes) = routes else {
4418 return Err(vm_error(format!(
4419 "websocket_route: unknown server '{server_id}'"
4420 )));
4421 };
4422 routes.write().expect("websocket routes poisoned").insert(
4423 path.to_string(),
4424 websocket_route_from_options(path, options),
4425 );
4426 Ok(VmValue::Bool(true))
4427}
4428
4429async fn vm_websocket_accept(server_id: &str, timeout_ms: u64) -> Result<VmValue, VmError> {
4430 let receiver = WEBSOCKET_SERVERS.with(|servers| {
4431 servers
4432 .borrow()
4433 .get(server_id)
4434 .map(|server| server.events.clone())
4435 });
4436 let Some(receiver) = receiver else {
4437 return Err(vm_error(format!(
4438 "websocket_accept: unknown server '{server_id}'"
4439 )));
4440 };
4441 let started = std::time::Instant::now();
4442 loop {
4443 let event = {
4444 let receiver = receiver.lock().await;
4445 receiver.try_recv()
4446 };
4447 match event {
4448 Ok(event) => return register_accepted_websocket(event),
4449 Err(mpsc::TryRecvError::Disconnected) => return Ok(VmValue::Nil),
4450 Err(mpsc::TryRecvError::Empty) => {
4451 if timeout_ms == 0 || started.elapsed() >= Duration::from_millis(timeout_ms) {
4452 return Ok(timeout_event());
4453 }
4454 tokio::time::sleep(Duration::from_millis(10)).await;
4455 }
4456 }
4457 }
4458}
4459
4460fn register_accepted_websocket(event: WebSocketServerEvent) -> Result<VmValue, VmError> {
4461 let WebSocketServerEvent {
4462 handle,
4463 path,
4464 peer,
4465 headers,
4466 max_messages,
4467 max_message_bytes,
4468 } = event;
4469 let id = next_transport_handle("websocket");
4470 WEBSOCKET_HANDLES.with(|handles| {
4471 let mut handles = handles.borrow_mut();
4472 if handles.len() >= MAX_WEBSOCKETS {
4473 return Err(vm_error(format!(
4474 "websocket_accept: maximum open sockets ({MAX_WEBSOCKETS}) reached"
4475 )));
4476 }
4477 handles.insert(
4478 id.clone(),
4479 WebSocketHandle {
4480 kind: WebSocketHandleKind::Server(Rc::new(tokio::sync::Mutex::new(handle))),
4481 url: path.clone(),
4482 max_messages,
4483 max_message_bytes,
4484 received: 0,
4485 },
4486 );
4487 Ok(())
4488 })?;
4489 let mut metadata = BTreeMap::new();
4490 metadata.insert("id".to_string(), VmValue::String(Rc::from(id)));
4491 metadata.insert("path".to_string(), VmValue::String(Rc::from(path)));
4492 metadata.insert("peer".to_string(), VmValue::String(Rc::from(peer)));
4493 metadata.insert(
4494 "headers".to_string(),
4495 VmValue::Dict(Rc::new(
4496 headers
4497 .into_iter()
4498 .map(|(name, value)| (name, VmValue::String(Rc::from(value))))
4499 .collect(),
4500 )),
4501 );
4502 Ok(VmValue::Dict(Rc::new(metadata)))
4503}
4504
4505fn vm_websocket_server_close(server_id: &str) -> Result<VmValue, VmError> {
4506 let server = WEBSOCKET_SERVERS.with(|servers| servers.borrow_mut().remove(server_id));
4507 let Some(server) = server else {
4508 return Ok(VmValue::Bool(false));
4509 };
4510 server.running.store(false, Ordering::SeqCst);
4511 let _ = TcpStream::connect(&server.addr);
4512 Ok(VmValue::Bool(true))
4513}
4514
4515fn websocket_server_loop(
4516 listener: TcpListener,
4517 routes: Arc<RwLock<HashMap<String, WebSocketRoute>>>,
4518 event_tx: mpsc::Sender<WebSocketServerEvent>,
4519 running: Arc<AtomicBool>,
4520) {
4521 while running.load(Ordering::SeqCst) {
4522 match listener.accept() {
4523 Ok((stream, peer)) => {
4524 let routes = routes.clone();
4525 let event_tx = event_tx.clone();
4526 let running = running.clone();
4527 let peer = peer.to_string();
4528 let _ = thread::Builder::new()
4529 .name("harn-ws-conn".to_string())
4530 .spawn(move || {
4531 websocket_connection_thread(stream, peer, routes, event_tx, running);
4532 });
4533 }
4534 Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => {
4535 thread::sleep(Duration::from_millis(10));
4536 }
4537 Err(_) => break,
4538 }
4539 }
4540}
4541
4542fn error_response(status: StatusCode, body: &str) -> ErrorResponse {
4543 let mut response = ErrorResponse::new(Some(body.to_string()));
4544 *response.status_mut() = status;
4545 response
4546}
4547
4548fn route_for_request(
4549 request: &Request,
4550 routes: &Arc<RwLock<HashMap<String, WebSocketRoute>>>,
4551) -> Result<WebSocketRoute, ErrorResponse> {
4552 let path = request.uri().path().to_string();
4553 let Some(route) = routes
4554 .read()
4555 .expect("websocket routes poisoned")
4556 .get(&path)
4557 .cloned()
4558 else {
4559 return Err(error_response(
4560 StatusCode::NOT_FOUND,
4561 "websocket route not found",
4562 ));
4563 };
4564 if let Some(token) = route.bearer_token.as_ref() {
4565 let expected = format!("Bearer {token}");
4566 let authorized = request
4567 .headers()
4568 .get("authorization")
4569 .and_then(|value| value.to_str().ok())
4570 .map(|value| value == expected)
4571 .unwrap_or(false);
4572 if !authorized {
4573 return Err(error_response(
4574 StatusCode::UNAUTHORIZED,
4575 "websocket route unauthorized",
4576 ));
4577 }
4578 }
4579 Ok(route)
4580}
4581
4582fn websocket_connection_thread(
4583 stream: TcpStream,
4584 peer: String,
4585 routes: Arc<RwLock<HashMap<String, WebSocketRoute>>>,
4586 event_tx: mpsc::Sender<WebSocketServerEvent>,
4587 running: Arc<AtomicBool>,
4588) {
4589 let accepted_route = Arc::new(std::sync::Mutex::new(
4590 None::<(WebSocketRoute, BTreeMap<String, String>)>,
4591 ));
4592 let callback_route = accepted_route.clone();
4593 let callback =
4594 move |request: &Request, response: Response| -> Result<Response, ErrorResponse> {
4595 let route = route_for_request(request, &routes)?;
4596 let headers = request
4597 .headers()
4598 .iter()
4599 .filter_map(|(name, value)| {
4600 value
4601 .to_str()
4602 .ok()
4603 .map(|value| (name.as_str().to_ascii_lowercase(), value.to_string()))
4604 })
4605 .collect::<BTreeMap<_, _>>();
4606 *callback_route
4607 .lock()
4608 .expect("websocket route metadata poisoned") = Some((route, headers));
4609 Ok(response)
4610 };
4611 let Ok(mut socket) = tokio_tungstenite::tungstenite::accept_hdr(stream, callback) else {
4612 return;
4613 };
4614 let Some((route, headers)) = accepted_route
4615 .lock()
4616 .expect("websocket route metadata poisoned")
4617 .clone()
4618 else {
4619 let _ = socket.close(None);
4620 return;
4621 };
4622 let _ = socket
4623 .get_mut()
4624 .set_read_timeout(Some(Duration::from_millis(50)));
4625 let (incoming_tx, incoming_rx) = mpsc::channel();
4626 let (outgoing_tx, outgoing_rx) = mpsc::sync_channel(route.send_buffer_messages);
4627 let event = WebSocketServerEvent {
4628 handle: ServerWebSocket {
4629 incoming: VecDeque::new(),
4630 incoming_rx,
4631 outgoing_tx,
4632 closed: false,
4633 },
4634 path: route.path.clone(),
4635 peer,
4636 headers,
4637 max_messages: route.max_messages,
4638 max_message_bytes: route.max_message_bytes,
4639 };
4640 if event_tx.send(event).is_err() {
4641 let _ = socket.close(None);
4642 return;
4643 }
4644 let mut last_activity = std::time::Instant::now();
4645 while running.load(Ordering::SeqCst) {
4646 while let Ok(command) = outgoing_rx.try_recv() {
4647 match command {
4648 ServerWebSocketCommand::Send(message) => {
4649 let Ok(message) = real_ws_message(&message) else {
4650 continue;
4651 };
4652 if socket.send(message).is_err() {
4653 return;
4654 }
4655 last_activity = std::time::Instant::now();
4656 }
4657 ServerWebSocketCommand::Close(code, reason) => {
4658 let _ = socket.close(close_frame(code, reason));
4659 return;
4660 }
4661 }
4662 }
4663 if route.idle_timeout_ms > 0
4664 && last_activity.elapsed() >= Duration::from_millis(route.idle_timeout_ms)
4665 {
4666 let _ = socket.close(close_frame(Some(1001), Some("idle timeout".to_string())));
4667 let _ = incoming_tx.send(MockWsMessage {
4668 message_type: "close".to_string(),
4669 data: Vec::new(),
4670 close_code: Some(1001),
4671 close_reason: Some("idle timeout".to_string()),
4672 });
4673 return;
4674 }
4675 match socket.read() {
4676 Ok(message) => {
4677 last_activity = std::time::Instant::now();
4678 if incoming_tx
4679 .send(mock_ws_message_from_real(message))
4680 .is_err()
4681 {
4682 return;
4683 }
4684 }
4685 Err(tokio_tungstenite::tungstenite::Error::Io(error))
4686 if error.kind() == std::io::ErrorKind::WouldBlock
4687 || error.kind() == std::io::ErrorKind::TimedOut =>
4688 {
4689 continue;
4690 }
4691 Err(_) => {
4692 let _ = incoming_tx.send(MockWsMessage {
4693 message_type: "close".to_string(),
4694 data: Vec::new(),
4695 close_code: None,
4696 close_reason: None,
4697 });
4698 return;
4699 }
4700 }
4701 }
4702 let _ = socket.get_mut().shutdown(Shutdown::Both);
4703}
4704
4705async fn vm_websocket_connect(
4706 url: &str,
4707 options: &BTreeMap<String, VmValue>,
4708) -> Result<VmValue, VmError> {
4709 let id = next_transport_handle("websocket");
4710 let max_messages =
4711 transport_limit_option(options, "max_messages", DEFAULT_MAX_STREAM_EVENTS).max(1);
4712 let max_message_bytes =
4713 transport_limit_option(options, "max_message_bytes", DEFAULT_MAX_MESSAGE_BYTES).max(1);
4714
4715 if let Some((messages, echo)) = consume_websocket_mock(url) {
4716 let handle = WebSocketHandle {
4717 kind: WebSocketHandleKind::Fake(Rc::new(tokio::sync::Mutex::new(FakeWebSocket {
4718 messages: messages.into(),
4719 echo,
4720 closed: false,
4721 }))),
4722 url: url.to_string(),
4723 max_messages,
4724 max_message_bytes,
4725 received: 0,
4726 };
4727 WEBSOCKET_HANDLES.with(|handles| {
4728 let mut handles = handles.borrow_mut();
4729 if handles.len() >= MAX_WEBSOCKETS {
4730 return Err(vm_error(format!(
4731 "websocket_connect: maximum open sockets ({MAX_WEBSOCKETS}) reached"
4732 )));
4733 }
4734 handles.insert(id.clone(), handle);
4735 Ok(())
4736 })?;
4737 record_transport_call(TransportMockCall {
4738 kind: "websocket_connect".to_string(),
4739 handle: Some(id.clone()),
4740 url: url.to_string(),
4741 message_type: None,
4742 data: None,
4743 });
4744 return Ok(VmValue::String(Rc::from(id)));
4745 }
4746
4747 if !url.starts_with("ws://") && !url.starts_with("wss://") {
4748 return Err(vm_error(format!(
4749 "websocket_connect: URL must start with ws:// or wss://, got '{url}'"
4750 )));
4751 }
4752 crate::egress::enforce_url_allowed("websocket_connect", url).await?;
4753 let timeout_ms = vm_get_int_option_prefer(
4754 options,
4755 "timeout_ms",
4756 "timeout",
4757 DEFAULT_TIMEOUT_MS as i64,
4758 )
4759 .max(0) as u64;
4760 let request = websocket_client_request(url, options)?;
4761 let connect = tokio_tungstenite::connect_async(request);
4762 let (socket, _) = tokio::time::timeout(Duration::from_millis(timeout_ms), connect)
4763 .await
4764 .map_err(|_| vm_error(format!("websocket_connect: timed out after {timeout_ms}ms")))?
4765 .map_err(|error| vm_error(format!("websocket_connect: failed: {error}")))?;
4766 let handle = WebSocketHandle {
4767 kind: WebSocketHandleKind::Real(Rc::new(tokio::sync::Mutex::new(socket))),
4768 url: url.to_string(),
4769 max_messages,
4770 max_message_bytes,
4771 received: 0,
4772 };
4773 WEBSOCKET_HANDLES.with(|handles| {
4774 let mut handles = handles.borrow_mut();
4775 if handles.len() >= MAX_WEBSOCKETS {
4776 return Err(vm_error(format!(
4777 "websocket_connect: maximum open sockets ({MAX_WEBSOCKETS}) reached"
4778 )));
4779 }
4780 handles.insert(id.clone(), handle);
4781 Ok(())
4782 })?;
4783 Ok(VmValue::String(Rc::from(id)))
4784}
4785
4786fn websocket_message_from_vm(
4787 value: VmValue,
4788 options: &BTreeMap<String, VmValue>,
4789) -> Result<MockWsMessage, VmError> {
4790 let message_type = options
4791 .get("type")
4792 .map(|value| value.display())
4793 .filter(|value| !value.is_empty())
4794 .unwrap_or_else(|| match value {
4795 VmValue::Bytes(_) => "binary".to_string(),
4796 _ => "text".to_string(),
4797 });
4798 let data = match value {
4799 VmValue::Bytes(bytes) => bytes.as_ref().clone(),
4800 other
4801 if options
4802 .get("base64")
4803 .and_then(|value| match value {
4804 VmValue::Bool(value) => Some(*value),
4805 _ => None,
4806 })
4807 .unwrap_or(false) =>
4808 {
4809 use base64::Engine;
4810 base64::engine::general_purpose::STANDARD
4811 .decode(other.display())
4812 .map_err(|error| vm_error(format!("websocket_send: invalid base64: {error}")))?
4813 }
4814 other => other.display().into_bytes(),
4815 };
4816 Ok(MockWsMessage {
4817 message_type,
4818 data,
4819 close_code: options
4820 .get("code")
4821 .or_else(|| options.get("close_code"))
4822 .and_then(|value| value.as_int())
4823 .map(|value| value as u16),
4824 close_reason: options
4825 .get("reason")
4826 .or_else(|| options.get("close_reason"))
4827 .map(|value| value.display()),
4828 })
4829}
4830
4831fn websocket_client_request(
4832 url: &str,
4833 options: &BTreeMap<String, VmValue>,
4834) -> Result<tokio_tungstenite::tungstenite::http::Request<()>, VmError> {
4835 let mut request = url
4836 .into_client_request()
4837 .map_err(|error| vm_error(format!("websocket_connect: invalid request: {error}")))?;
4838 if let Some(headers) = options.get("headers").and_then(|value| value.as_dict()) {
4839 for (name, value) in headers {
4840 let header_name = tokio_tungstenite::tungstenite::http::header::HeaderName::from_bytes(
4841 name.as_bytes(),
4842 )
4843 .map_err(|error| {
4844 vm_error(format!(
4845 "websocket_connect: invalid header name '{name}': {error}"
4846 ))
4847 })?;
4848 let header_value = HeaderValue::from_str(&value.display()).map_err(|error| {
4849 vm_error(format!(
4850 "websocket_connect: invalid header value for '{name}': {error}"
4851 ))
4852 })?;
4853 request.headers_mut().insert(header_name, header_value);
4854 }
4855 }
4856 if let Some(auth) = options.get("auth") {
4857 let bearer = match auth {
4858 VmValue::Dict(dict) => dict.get("bearer").map(|value| value.display()),
4859 other => Some(other.display()),
4860 };
4861 if let Some(token) = bearer.filter(|token| !token.is_empty()) {
4862 let value = HeaderValue::from_str(&format!("Bearer {token}")).map_err(|error| {
4863 vm_error(format!(
4864 "websocket_connect: invalid authorization header: {error}"
4865 ))
4866 })?;
4867 request.headers_mut().insert("authorization", value);
4868 }
4869 }
4870 Ok(request)
4871}
4872
4873fn close_frame(code: Option<u16>, reason: Option<String>) -> Option<CloseFrame> {
4874 code.or(reason.as_ref().map(|_| 1000))
4875 .map(|code| CloseFrame {
4876 code: CloseCode::from(code),
4877 reason: reason.unwrap_or_default().into(),
4878 })
4879}
4880
4881fn real_ws_message(message: &MockWsMessage) -> Result<WsMessage, VmError> {
4882 match message.message_type.as_str() {
4883 "text" => Ok(WsMessage::Text(
4884 String::from_utf8(message.data.clone())
4885 .map_err(|error| vm_error(format!("websocket_send: text is not UTF-8: {error}")))?
4886 .into(),
4887 )),
4888 "binary" => Ok(WsMessage::Binary(message.data.clone().into())),
4889 "ping" => Ok(WsMessage::Ping(message.data.clone().into())),
4890 "pong" => Ok(WsMessage::Pong(message.data.clone().into())),
4891 "close" => Ok(WsMessage::Close(close_frame(
4892 message.close_code,
4893 message.close_reason.clone(),
4894 ))),
4895 other => Err(vm_error(format!(
4896 "websocket_send: unsupported message type '{other}'"
4897 ))),
4898 }
4899}
4900
4901fn mock_ws_message_from_real(message: WsMessage) -> MockWsMessage {
4902 match message {
4903 WsMessage::Text(text) => MockWsMessage {
4904 message_type: "text".to_string(),
4905 data: text.as_bytes().to_vec(),
4906 close_code: None,
4907 close_reason: None,
4908 },
4909 WsMessage::Binary(bytes) => MockWsMessage {
4910 message_type: "binary".to_string(),
4911 data: bytes.to_vec(),
4912 close_code: None,
4913 close_reason: None,
4914 },
4915 WsMessage::Ping(bytes) => MockWsMessage {
4916 message_type: "ping".to_string(),
4917 data: bytes.to_vec(),
4918 close_code: None,
4919 close_reason: None,
4920 },
4921 WsMessage::Pong(bytes) => MockWsMessage {
4922 message_type: "pong".to_string(),
4923 data: bytes.to_vec(),
4924 close_code: None,
4925 close_reason: None,
4926 },
4927 WsMessage::Close(frame) => {
4928 let (close_code, close_reason) = frame
4929 .map(|frame| (Some(u16::from(frame.code)), Some(frame.reason.to_string())))
4930 .unwrap_or((None, None));
4931 MockWsMessage {
4932 message_type: "close".to_string(),
4933 data: Vec::new(),
4934 close_code,
4935 close_reason,
4936 }
4937 }
4938 WsMessage::Frame(_) => MockWsMessage {
4939 message_type: "close".to_string(),
4940 data: Vec::new(),
4941 close_code: None,
4942 close_reason: None,
4943 },
4944 }
4945}
4946
4947async fn vm_websocket_send(
4948 socket_id: &str,
4949 value: VmValue,
4950 options: &BTreeMap<String, VmValue>,
4951) -> Result<VmValue, VmError> {
4952 let message = websocket_message_from_vm(value, options)?;
4953 let socket = WEBSOCKET_HANDLES.with(|handles| {
4954 let handles = handles.borrow();
4955 let handle = handles.get(socket_id)?;
4956 let url = handle.url.clone();
4957 let max_message_bytes = handle.max_message_bytes;
4958 let kind = match &handle.kind {
4959 WebSocketHandleKind::Real(socket) => WebSocketHandleKind::Real(socket.clone()),
4960 WebSocketHandleKind::Fake(socket) => WebSocketHandleKind::Fake(socket.clone()),
4961 WebSocketHandleKind::Server(socket) => WebSocketHandleKind::Server(socket.clone()),
4962 };
4963 Some((kind, url, max_message_bytes))
4964 });
4965 let Some((kind, url, max_message_bytes)) = socket else {
4966 return Err(vm_error(format!(
4967 "websocket_send: unknown socket '{socket_id}'"
4968 )));
4969 };
4970 if message.data.len() > max_message_bytes {
4971 return Err(vm_error(format!(
4972 "websocket_send: message exceeded max_message_bytes ({max_message_bytes})"
4973 )));
4974 }
4975 match kind {
4976 WebSocketHandleKind::Fake(socket) => {
4977 let mut socket = socket.lock().await;
4978 if socket.closed {
4979 return Ok(VmValue::Bool(false));
4980 }
4981 if message.message_type == "close" {
4982 socket.closed = true;
4983 } else if socket.echo {
4984 socket.messages.push_back(message.clone());
4985 }
4986 record_transport_call(TransportMockCall {
4987 kind: "websocket_send".to_string(),
4988 handle: Some(socket_id.to_string()),
4989 url,
4990 message_type: Some(message.message_type.clone()),
4991 data: Some(ws_message_data(&message)),
4992 });
4993 Ok(VmValue::Bool(true))
4994 }
4995 WebSocketHandleKind::Real(socket) => {
4996 let mut socket = socket.lock().await;
4997 socket
4998 .send(real_ws_message(&message)?)
4999 .await
5000 .map_err(|error| vm_error(format!("websocket_send: failed: {error}")))?;
5001 Ok(VmValue::Bool(true))
5002 }
5003 WebSocketHandleKind::Server(socket) => {
5004 let mut socket = socket.lock().await;
5005 if socket.closed {
5006 return Ok(VmValue::Bool(false));
5007 }
5008 let command = if message.message_type == "close" {
5009 socket.closed = true;
5010 ServerWebSocketCommand::Close(message.close_code, message.close_reason.clone())
5011 } else {
5012 ServerWebSocketCommand::Send(message.clone())
5013 };
5014 socket
5015 .outgoing_tx
5016 .try_send(command)
5017 .map_err(|error| match error {
5018 mpsc::TrySendError::Full(_) => vm_error("websocket_send: send buffer full"),
5019 mpsc::TrySendError::Disconnected(_) => {
5020 vm_error("websocket_send: connection closed")
5021 }
5022 })?;
5023 Ok(VmValue::Bool(true))
5024 }
5025 }
5026}
5027
5028async fn vm_websocket_receive(socket_id: &str, timeout_ms: u64) -> Result<VmValue, VmError> {
5029 let socket = WEBSOCKET_HANDLES.with(|handles| {
5030 let mut handles = handles.borrow_mut();
5031 let handle = handles.get_mut(socket_id)?;
5032 if handle.received >= handle.max_messages {
5033 return Some(Err(vm_error(format!(
5034 "websocket_receive: socket '{socket_id}' exceeded max_messages"
5035 ))));
5036 }
5037 handle.received += 1;
5038 let max_message_bytes = handle.max_message_bytes;
5039 let kind = match &handle.kind {
5040 WebSocketHandleKind::Real(socket) => WebSocketHandleKind::Real(socket.clone()),
5041 WebSocketHandleKind::Fake(socket) => WebSocketHandleKind::Fake(socket.clone()),
5042 WebSocketHandleKind::Server(socket) => WebSocketHandleKind::Server(socket.clone()),
5043 };
5044 Some(Ok((kind, max_message_bytes)))
5045 });
5046 let Some(socket) = socket else {
5047 return Err(vm_error(format!(
5048 "websocket_receive: unknown socket '{socket_id}'"
5049 )));
5050 };
5051 let (kind, max_message_bytes) = socket?;
5052 match kind {
5053 WebSocketHandleKind::Fake(socket) => {
5054 let mut socket = socket.lock().await;
5055 if socket.closed {
5056 return Ok(VmValue::Nil);
5057 }
5058 let Some(message) = socket.messages.pop_front() else {
5059 return Ok(timeout_event());
5060 };
5061 if message.data.len() > max_message_bytes {
5062 socket.closed = true;
5063 return Err(vm_error(format!(
5064 "websocket_receive: message exceeded max_message_bytes ({max_message_bytes})"
5065 )));
5066 }
5067 if message.message_type == "close" {
5068 socket.closed = true;
5069 }
5070 Ok(ws_event_value(message))
5071 }
5072 WebSocketHandleKind::Real(socket) => {
5073 let mut socket = socket.lock().await;
5074 let next = socket.next();
5075 let message = match tokio::time::timeout(Duration::from_millis(timeout_ms), next).await
5076 {
5077 Ok(Some(Ok(message))) => message,
5078 Ok(Some(Err(error))) => {
5079 return Err(vm_error(format!("websocket_receive: failed: {error}")));
5080 }
5081 Ok(None) => return Ok(VmValue::Nil),
5082 Err(_) => return Ok(timeout_event()),
5083 };
5084 match &message {
5085 WsMessage::Text(text) if text.len() > max_message_bytes => {
5086 return Err(vm_error(format!(
5087 "websocket_receive: message exceeded max_message_bytes ({max_message_bytes})"
5088 )));
5089 }
5090 WsMessage::Binary(bytes) | WsMessage::Ping(bytes) | WsMessage::Pong(bytes)
5091 if bytes.len() > max_message_bytes =>
5092 {
5093 return Err(vm_error(format!(
5094 "websocket_receive: message exceeded max_message_bytes ({max_message_bytes})"
5095 )));
5096 }
5097 _ => {}
5098 }
5099 Ok(real_ws_event_value(message))
5100 }
5101 WebSocketHandleKind::Server(socket) => {
5102 let started = std::time::Instant::now();
5103 loop {
5104 let message = {
5105 let mut socket = socket.lock().await;
5106 if socket.closed {
5107 return Ok(VmValue::Nil);
5108 }
5109 while let Ok(message) = socket.incoming_rx.try_recv() {
5110 socket.incoming.push_back(message);
5111 }
5112 socket.incoming.pop_front()
5113 };
5114 if let Some(message) = message {
5115 if message.data.len() > max_message_bytes {
5116 let mut socket = socket.lock().await;
5117 socket.closed = true;
5118 let _ = socket.outgoing_tx.try_send(ServerWebSocketCommand::Close(
5119 Some(1009),
5120 Some("message too large".to_string()),
5121 ));
5122 return Err(vm_error(format!(
5123 "websocket_receive: message exceeded max_message_bytes ({max_message_bytes})"
5124 )));
5125 }
5126 if message.message_type == "close" {
5127 let mut socket = socket.lock().await;
5128 socket.closed = true;
5129 }
5130 return Ok(ws_event_value(message));
5131 }
5132 if timeout_ms == 0 || started.elapsed() >= Duration::from_millis(timeout_ms) {
5133 return Ok(timeout_event());
5134 }
5135 tokio::time::sleep(Duration::from_millis(10)).await;
5136 }
5137 }
5138 }
5139}
5140
5141async fn vm_websocket_close(socket_id: &str) -> Result<VmValue, VmError> {
5142 let removed = WEBSOCKET_HANDLES.with(|handles| handles.borrow_mut().remove(socket_id));
5143 let Some(handle) = removed else {
5144 return Ok(VmValue::Bool(false));
5145 };
5146 match handle.kind {
5147 WebSocketHandleKind::Fake(socket) => {
5148 socket.lock().await.closed = true;
5149 record_transport_call(TransportMockCall {
5150 kind: "websocket_close".to_string(),
5151 handle: Some(socket_id.to_string()),
5152 url: handle.url,
5153 message_type: None,
5154 data: None,
5155 });
5156 Ok(VmValue::Bool(true))
5157 }
5158 WebSocketHandleKind::Real(socket) => {
5159 let mut socket = socket.lock().await;
5160 socket
5161 .close(None)
5162 .await
5163 .map_err(|error| vm_error(format!("websocket_close: failed: {error}")))?;
5164 Ok(VmValue::Bool(true))
5165 }
5166 WebSocketHandleKind::Server(socket) => {
5167 let mut socket = socket.lock().await;
5168 socket.closed = true;
5169 let _ = socket
5170 .outgoing_tx
5171 .try_send(ServerWebSocketCommand::Close(None, None));
5172 Ok(VmValue::Bool(true))
5173 }
5174 }
5175}
5176
5177#[cfg(test)]
5178mod tests {
5179 use super::{
5180 compute_retry_delay, handle_from_value, http_mock_calls_snapshot, parse_retry_after_value,
5181 push_http_mock, reset_http_state, vm_execute_http_request, vm_http_download,
5182 vm_http_stream_info, vm_http_stream_open, vm_http_stream_read, vm_sse_event_frame,
5183 vm_sse_server_cancel, vm_sse_server_heartbeat, vm_sse_server_mock_disconnect,
5184 vm_sse_server_mock_receive, vm_sse_server_observed_bool, vm_sse_server_response,
5185 vm_sse_server_send, HttpMockResponse,
5186 };
5187 use crate::connectors::test_util::{
5188 accept_http_connection, read_http_request, write_http_response,
5189 };
5190 use crate::value::VmValue;
5191 use base64::Engine;
5192 use rcgen::generate_simple_self_signed;
5193 use rustls::pki_types::PrivatePkcs8KeyDer;
5194 use rustls::{ServerConfig, ServerConnection, StreamOwned};
5195 use sha2::{Digest, Sha256};
5196 use std::collections::BTreeMap;
5197 use std::io::{Read, Write};
5198 use std::net::TcpListener;
5199 use std::rc::Rc;
5200 use std::sync::{Arc, Once};
5201 use std::time::{Duration, SystemTime};
5202 use tempfile::TempDir;
5203 use x509_parser::prelude::{FromDer, X509Certificate};
5204
5205 fn expect_bool(value: VmValue) -> bool {
5206 let VmValue::Bool(value) = value else {
5207 panic!("expected bool, got {}", value.display());
5208 };
5209 value
5210 }
5211
5212 #[test]
5213 fn parses_retry_after_delta_seconds() {
5214 assert_eq!(parse_retry_after_value("5"), Some(Duration::from_secs(5)));
5215 }
5216
5217 #[test]
5218 fn parses_retry_after_http_date() {
5219 let header = httpdate::fmt_http_date(SystemTime::now() + Duration::from_secs(2));
5220 let parsed = parse_retry_after_value(&header).expect("http-date should parse");
5221 assert!(parsed <= Duration::from_secs(2));
5222 assert!(parsed <= Duration::from_secs(60));
5223 }
5224
5225 #[test]
5226 fn malformed_retry_after_returns_none() {
5227 assert_eq!(parse_retry_after_value("soon-ish"), None);
5228 }
5229
5230 #[test]
5231 fn retry_delay_honors_retry_after_floor() {
5232 let delay = compute_retry_delay(0, 1, Some(Duration::from_millis(250)));
5233 assert!(delay >= Duration::from_millis(250));
5234 assert!(delay <= Duration::from_secs(60));
5235 }
5236
5237 #[tokio::test]
5238 async fn typed_mock_api_drives_http_request_retries() {
5239 reset_http_state();
5240 push_http_mock(
5241 "GET",
5242 "https://api.example.com/retry",
5243 vec![
5244 HttpMockResponse::new(503, "busy").with_header("retry-after", "0"),
5245 HttpMockResponse::new(200, "ok"),
5246 ],
5247 );
5248 let result = vm_execute_http_request(
5249 "GET",
5250 "https://api.example.com/retry",
5251 &BTreeMap::from([
5252 ("retries".to_string(), VmValue::Int(1)),
5253 ("backoff".to_string(), VmValue::Int(0)),
5254 ]),
5255 )
5256 .await
5257 .expect("mocked request should succeed after retry");
5258
5259 let dict = result.as_dict().expect("response dict");
5260 assert_eq!(dict["status"].as_int(), Some(200));
5261 let calls = http_mock_calls_snapshot();
5262 assert_eq!(calls.len(), 2);
5263 assert_eq!(calls[0].url, "https://api.example.com/retry");
5264 reset_http_state();
5265 }
5266
5267 #[tokio::test]
5268 async fn multipart_requests_are_mock_visible() {
5269 reset_http_state();
5270 push_http_mock(
5271 "POST",
5272 "https://api.example.com/upload",
5273 vec![HttpMockResponse::new(201, "uploaded")],
5274 );
5275 let options = BTreeMap::from([(
5276 "multipart".to_string(),
5277 VmValue::List(Rc::new(vec![
5278 VmValue::Dict(Rc::new(BTreeMap::from([
5279 ("name".to_string(), VmValue::String(Rc::from("meta"))),
5280 ("value".to_string(), VmValue::String(Rc::from("hello"))),
5281 ]))),
5282 VmValue::Dict(Rc::new(BTreeMap::from([
5283 ("name".to_string(), VmValue::String(Rc::from("blob"))),
5284 (
5285 "filename".to_string(),
5286 VmValue::String(Rc::from("blob.bin")),
5287 ),
5288 (
5289 "content_type".to_string(),
5290 VmValue::String(Rc::from("application/octet-stream")),
5291 ),
5292 (
5293 "value".to_string(),
5294 VmValue::Bytes(Rc::new(vec![0, 1, 2, 3])),
5295 ),
5296 ]))),
5297 ])),
5298 )]);
5299
5300 let response = vm_execute_http_request("POST", "https://api.example.com/upload", &options)
5301 .await
5302 .expect("multipart mock request should succeed");
5303 let response = response.as_dict().expect("response dict");
5304 assert_eq!(response["status"].as_int(), Some(201));
5305
5306 let calls = http_mock_calls_snapshot();
5307 assert_eq!(calls.len(), 1);
5308 assert!(calls[0]
5309 .headers
5310 .get("content-type")
5311 .expect("content-type recorded")
5312 .contains("multipart/form-data"));
5313 let body = calls[0].body.as_deref().expect("multipart body recorded");
5314 assert!(body.contains("name=\"meta\""));
5315 assert!(body.contains("hello"));
5316 assert!(body.contains("filename=\"blob.bin\""));
5317 reset_http_state();
5318 }
5319
5320 #[tokio::test]
5321 async fn http_stream_mock_reads_in_chunks() {
5322 reset_http_state();
5323 push_http_mock(
5324 "GET",
5325 "https://api.example.com/stream",
5326 vec![HttpMockResponse::new(200, "stream-body")],
5327 );
5328
5329 let handle = vm_http_stream_open("https://api.example.com/stream", &BTreeMap::new())
5330 .await
5331 .expect("stream open");
5332 let stream_id = handle.display();
5333 let info = vm_http_stream_info(&stream_id).expect("stream info");
5334 let info = info.as_dict().expect("info dict");
5335 assert_eq!(info["status"].as_int(), Some(200));
5336
5337 let first = vm_http_stream_read(&stream_id, 6)
5338 .await
5339 .expect("first chunk");
5340 let second = vm_http_stream_read(&stream_id, 64)
5341 .await
5342 .expect("second chunk");
5343 let end = vm_http_stream_read(&stream_id, 64)
5344 .await
5345 .expect("end marker");
5346 assert_eq!(first.as_bytes().expect("bytes"), b"stream");
5347 assert_eq!(second.as_bytes().expect("bytes"), b"-body");
5348 assert!(matches!(end, VmValue::Nil));
5349 reset_http_state();
5350 }
5351
5352 #[tokio::test]
5353 async fn http_download_mock_writes_file() {
5354 reset_http_state();
5355 let temp = TempDir::new().expect("tempdir");
5356 let path = temp.path().join("download.bin");
5357 push_http_mock(
5358 "GET",
5359 "https://api.example.com/download",
5360 vec![HttpMockResponse::new(200, "downloaded")],
5361 );
5362
5363 let response = vm_http_download(
5364 "https://api.example.com/download",
5365 &path.display().to_string(),
5366 &BTreeMap::new(),
5367 )
5368 .await
5369 .expect("download response");
5370 let response = response.as_dict().expect("response dict");
5371 assert_eq!(response["bytes_written"].as_int(), Some(10));
5372 assert_eq!(
5373 std::fs::read_to_string(path).expect("downloaded file"),
5374 "downloaded"
5375 );
5376 reset_http_state();
5377 }
5378
5379 #[tokio::test]
5380 async fn http_proxy_routes_requests_through_configured_proxy() {
5381 reset_http_state();
5382 let listener = TcpListener::bind("127.0.0.1:0").expect("bind proxy listener");
5383 let proxy_addr = listener.local_addr().expect("proxy addr");
5384 let proxy_thread = std::thread::spawn(move || {
5385 let mut stream = accept_http_connection(&listener, "proxy listener");
5386 let request = read_http_request(&mut stream);
5387 assert_eq!(request.method, "GET");
5388 assert_eq!(request.path, "http://example.invalid/proxy-check");
5389 assert_eq!(
5390 request
5391 .headers
5392 .get("proxy-authorization")
5393 .map(String::as_str),
5394 Some("Basic dXNlcjpwYXNz")
5395 );
5396 write_http_response(
5397 &mut stream,
5398 200,
5399 &[("content-type", "text/plain".to_string())],
5400 "proxied",
5401 );
5402 });
5403
5404 let options = BTreeMap::from([
5405 (
5406 "proxy".to_string(),
5407 VmValue::String(Rc::from(format!("http://{proxy_addr}"))),
5408 ),
5409 (
5410 "proxy_auth".to_string(),
5411 VmValue::Dict(Rc::new(BTreeMap::from([
5412 ("user".to_string(), VmValue::String(Rc::from("user"))),
5413 ("pass".to_string(), VmValue::String(Rc::from("pass"))),
5414 ]))),
5415 ),
5416 ("timeout_ms".to_string(), VmValue::Int(1_000)),
5417 ]);
5418
5419 let response =
5420 vm_execute_http_request("GET", "http://example.invalid/proxy-check", &options)
5421 .await
5422 .expect("proxied response");
5423 let response = response.as_dict().expect("response dict");
5424 assert_eq!(response["status"].as_int(), Some(200));
5425 assert_eq!(response["body"].display(), "proxied");
5426 proxy_thread.join().expect("proxy thread");
5427 reset_http_state();
5428 }
5429
5430 #[tokio::test]
5431 async fn custom_tls_ca_bundle_and_pin_allow_request() {
5432 reset_http_state();
5433 install_rustls_provider();
5434 let temp = TempDir::new().expect("tempdir");
5435 let cert =
5436 generate_simple_self_signed(vec!["localhost".to_string(), "127.0.0.1".to_string()])
5437 .expect("generate cert");
5438 let cert_pem = cert.cert.pem();
5439 let cert_path = temp.path().join("cert.pem");
5440 std::fs::write(&cert_path, cert_pem.as_bytes()).expect("write cert");
5441 let pin = spki_pin_base64(cert.cert.der().as_ref());
5442
5443 let listener = TcpListener::bind("127.0.0.1:0").expect("bind tls listener");
5444 let port = listener.local_addr().expect("tls addr").port();
5445 let server_config = Arc::new(
5446 ServerConfig::builder()
5447 .with_no_client_auth()
5448 .with_single_cert(
5449 vec![cert.cert.der().clone()],
5450 PrivatePkcs8KeyDer::from(cert.key_pair.serialize_der()).into(),
5451 )
5452 .expect("build tls config"),
5453 );
5454 let thread = std::thread::spawn(move || {
5455 let (tcp, _) = listener.accept().expect("accept tls client");
5456 let conn = ServerConnection::new(server_config).expect("server connection");
5457 let mut stream = StreamOwned::new(conn, tcp);
5458 let request = read_http_request_generic(&mut stream);
5459 assert!(request.starts_with("GET /secure HTTP/1.1\r\n"));
5460 write_http_response_generic(
5461 &mut stream,
5462 200,
5463 &[("content-type", "text/plain".to_string())],
5464 "secure",
5465 );
5466 });
5467
5468 let options = BTreeMap::from([(
5469 "tls".to_string(),
5470 VmValue::Dict(Rc::new(BTreeMap::from([
5471 (
5472 "ca_bundle_path".to_string(),
5473 VmValue::String(Rc::from(cert_path.display().to_string())),
5474 ),
5475 (
5476 "pinned_sha256".to_string(),
5477 VmValue::List(Rc::new(vec![VmValue::String(Rc::from(pin))])),
5478 ),
5479 ]))),
5480 )]);
5481 let response =
5482 vm_execute_http_request("GET", &format!("https://localhost:{port}/secure"), &options)
5483 .await
5484 .expect("tls request should succeed");
5485 let response = response.as_dict().expect("response dict");
5486 assert_eq!(response["body"].display(), "secure");
5487 thread.join().expect("tls thread");
5488 reset_http_state();
5489 }
5490
5491 #[tokio::test]
5492 async fn custom_tls_pin_mismatch_is_rejected() {
5493 reset_http_state();
5494 install_rustls_provider();
5495 let temp = TempDir::new().expect("tempdir");
5496 let cert =
5497 generate_simple_self_signed(vec!["localhost".to_string(), "127.0.0.1".to_string()])
5498 .expect("generate cert");
5499 let cert_pem = cert.cert.pem();
5500 let cert_path = temp.path().join("cert.pem");
5501 std::fs::write(&cert_path, cert_pem.as_bytes()).expect("write cert");
5502
5503 let listener = TcpListener::bind("127.0.0.1:0").expect("bind tls listener");
5504 let port = listener.local_addr().expect("tls addr").port();
5505 let server_config = Arc::new(
5506 ServerConfig::builder()
5507 .with_no_client_auth()
5508 .with_single_cert(
5509 vec![cert.cert.der().clone()],
5510 PrivatePkcs8KeyDer::from(cert.key_pair.serialize_der()).into(),
5511 )
5512 .expect("build tls config"),
5513 );
5514 let thread = std::thread::spawn(move || {
5515 let (tcp, _) = listener.accept().expect("accept tls client");
5516 let conn = ServerConnection::new(server_config).expect("server connection");
5517 let mut stream = StreamOwned::new(conn, tcp);
5518 let _ = read_http_request_generic(&mut stream);
5519 write_http_response_generic(
5520 &mut stream,
5521 200,
5522 &[("content-type", "text/plain".to_string())],
5523 "secure",
5524 );
5525 });
5526
5527 let options = BTreeMap::from([(
5528 "tls".to_string(),
5529 VmValue::Dict(Rc::new(BTreeMap::from([
5530 (
5531 "ca_bundle_path".to_string(),
5532 VmValue::String(Rc::from(cert_path.display().to_string())),
5533 ),
5534 (
5535 "pinned_sha256".to_string(),
5536 VmValue::List(Rc::new(vec![VmValue::String(Rc::from("deadbeef"))])),
5537 ),
5538 ]))),
5539 )]);
5540 let error =
5541 vm_execute_http_request("GET", &format!("https://localhost:{port}/secure"), &options)
5542 .await
5543 .expect_err("pin mismatch should fail");
5544 let message = error.to_string();
5545 assert!(message.contains("TLS SPKI pin mismatch"), "{message}");
5546 thread.join().expect("tls thread");
5547 reset_http_state();
5548 }
5549
5550 fn install_rustls_provider() {
5551 static INSTALL: Once = Once::new();
5552 INSTALL.call_once(|| {
5553 let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
5554 });
5555 }
5556
5557 fn spki_pin_base64(cert_der: &[u8]) -> String {
5558 let (_, cert) = X509Certificate::from_der(cert_der).expect("parse cert");
5559 base64::engine::general_purpose::STANDARD
5560 .encode(Sha256::digest(cert.tbs_certificate.subject_pki.raw))
5561 }
5562
5563 fn read_http_request_generic<T: Read>(stream: &mut T) -> String {
5564 let mut buffer = Vec::new();
5565 let mut chunk = [0u8; 4096];
5566 loop {
5567 let read = stream.read(&mut chunk).expect("read request");
5568 assert!(read > 0, "request closed before headers");
5569 buffer.extend_from_slice(&chunk[..read]);
5570 if buffer.windows(4).any(|window| window == b"\r\n\r\n") {
5571 return String::from_utf8_lossy(&buffer).into_owned();
5572 }
5573 }
5574 }
5575
5576 fn write_http_response_generic<T: Write>(
5577 stream: &mut T,
5578 status: u16,
5579 headers: &[(&str, String)],
5580 body: &str,
5581 ) {
5582 let mut response = format!(
5583 "HTTP/1.1 {status} OK\r\ncontent-length: {}\r\nconnection: close\r\n",
5584 body.len()
5585 );
5586 for (name, value) in headers {
5587 response.push_str(name);
5588 response.push_str(": ");
5589 response.push_str(value);
5590 response.push_str("\r\n");
5591 }
5592 response.push_str("\r\n");
5593 response.push_str(body);
5594 stream
5595 .write_all(response.as_bytes())
5596 .expect("write response");
5597 stream.flush().expect("flush response");
5598 }
5599
5600 #[test]
5601 fn formats_sse_event_fields_and_multiline_data() {
5602 let frame = vm_sse_event_frame(
5603 &VmValue::Dict(Rc::new(BTreeMap::from([
5604 ("event".to_string(), VmValue::String(Rc::from("progress"))),
5605 ("data".to_string(), VmValue::String(Rc::from("one\ntwo"))),
5606 ("id".to_string(), VmValue::String(Rc::from("evt-1"))),
5607 ("retry_ms".to_string(), VmValue::Int(2500)),
5608 ]))),
5609 &BTreeMap::new(),
5610 )
5611 .expect("event frame");
5612 assert_eq!(
5613 frame,
5614 "id: evt-1\nevent: progress\nretry: 2500\ndata: one\ndata: two\n\n"
5615 );
5616 }
5617
5618 #[test]
5619 fn rejects_sse_event_control_fields_with_newlines() {
5620 let err = vm_sse_event_frame(
5621 &VmValue::Dict(Rc::new(BTreeMap::from([(
5622 "event".to_string(),
5623 VmValue::String(Rc::from("bad\nname")),
5624 )]))),
5625 &BTreeMap::new(),
5626 )
5627 .expect_err("newline should reject");
5628 assert!(err.to_string().contains("event must not contain newlines"));
5629 }
5630
5631 #[test]
5632 fn server_sse_mock_client_observes_heartbeat_disconnect_and_cancel() {
5633 reset_http_state();
5634 let response = vm_sse_server_response(&BTreeMap::from([(
5635 "max_buffered_events".to_string(),
5636 VmValue::Int(4),
5637 )]))
5638 .expect("response");
5639 let stream_id = handle_from_value(&response, "test").expect("handle");
5640
5641 assert!(expect_bool(
5642 vm_sse_server_send(
5643 &stream_id,
5644 &VmValue::Dict(Rc::new(BTreeMap::from([
5645 ("event".to_string(), VmValue::String(Rc::from("progress")),),
5646 ("data".to_string(), VmValue::String(Rc::from("50"))),
5647 ]))),
5648 &BTreeMap::new(),
5649 )
5650 .expect("send")
5651 ));
5652 assert!(expect_bool(
5653 vm_sse_server_heartbeat(&stream_id, Some(&VmValue::String(Rc::from("tick"))))
5654 .expect("heartbeat")
5655 ));
5656
5657 let first = vm_sse_server_mock_receive(&stream_id).expect("first");
5658 let first = first.as_dict().expect("first dict");
5659 assert_eq!(first["event"].display(), "progress");
5660 assert_eq!(first["data"].display(), "50");
5661 let heartbeat = vm_sse_server_mock_receive(&stream_id).expect("heartbeat read");
5662 let heartbeat = heartbeat.as_dict().expect("heartbeat dict");
5663 assert_eq!(heartbeat["type"].display(), "comment");
5664 assert_eq!(heartbeat["comment"].display(), "tick");
5665
5666 assert!(expect_bool(
5667 vm_sse_server_mock_disconnect(&stream_id).expect("disconnect")
5668 ));
5669 assert!(expect_bool(
5670 vm_sse_server_observed_bool(&stream_id, "test", |handle| handle.disconnected)
5671 .expect("observed")
5672 ));
5673 assert!(!expect_bool(
5674 vm_sse_server_send(
5675 &stream_id,
5676 &VmValue::String(Rc::from("late")),
5677 &BTreeMap::new()
5678 )
5679 .expect("late send")
5680 ));
5681
5682 let cancelled = vm_sse_server_response(&BTreeMap::new()).expect("cancelled response");
5683 let cancelled_id = handle_from_value(&cancelled, "test").expect("cancelled handle");
5684 assert!(expect_bool(
5685 vm_sse_server_cancel(&cancelled_id, Some(&VmValue::String(Rc::from("stop"))))
5686 .expect("cancel")
5687 ));
5688 assert!(expect_bool(
5689 vm_sse_server_observed_bool(&cancelled_id, "test", |handle| handle.cancelled)
5690 .expect("cancelled observed")
5691 ));
5692 reset_http_state();
5693 }
5694
5695 #[test]
5696 fn server_sse_rejects_oversized_events() {
5697 reset_http_state();
5698 let response = vm_sse_server_response(&BTreeMap::from([(
5699 "max_event_bytes".to_string(),
5700 VmValue::Int(12),
5701 )]))
5702 .expect("response");
5703 let stream_id = handle_from_value(&response, "test").expect("handle");
5704 let err = vm_sse_server_send(
5705 &stream_id,
5706 &VmValue::String(Rc::from("this is too large")),
5707 &BTreeMap::new(),
5708 )
5709 .expect_err("oversized event should reject");
5710 assert!(err.to_string().contains("max_event_bytes"));
5711 reset_http_state();
5712 }
5713}