1use std::cell::RefCell;
2use std::collections::{BTreeMap, HashMap, VecDeque};
3use std::io::Write;
4use std::net::{Shutdown, TcpListener, TcpStream};
5use std::rc::Rc;
6use std::sync::{
7 atomic::{AtomicBool, Ordering},
8 mpsc, Arc, RwLock,
9};
10use std::thread;
11use std::time::{Duration, SystemTime};
12
13use crate::value::{VmClosure, VmError, VmValue};
14use crate::vm::Vm;
15
16use base64::Engine;
17use futures::{SinkExt, StreamExt};
18use reqwest_eventsource::{Event as SseEvent, EventSource};
19use sha2::{Digest, Sha256};
20use tokio_tungstenite::tungstenite::client::IntoClientRequest;
21use tokio_tungstenite::tungstenite::handshake::server::{ErrorResponse, Request, Response};
22use tokio_tungstenite::tungstenite::http::{HeaderValue, StatusCode};
23use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
24use tokio_tungstenite::tungstenite::protocol::CloseFrame;
25use tokio_tungstenite::tungstenite::Message as WsMessage;
26use x509_parser::prelude::{FromDer, X509Certificate};
27
28mod mock;
29
30use mock::{
31 clear_http_mocks, consume_http_mock, http_mock_calls_value, parse_mock_responses,
32 register_http_mock, reset_http_mocks, url_matches,
33};
34pub use mock::{http_mock_calls_snapshot, push_http_mock, HttpMockCallSnapshot, HttpMockResponse};
35#[cfg(test)]
36use mock::{mock_call_headers_value, redact_mock_call_url};
37
/// Retry settings carried by [`HttpRequestConfig`] in its `retry` field.
///
/// NOTE(review): the code that interprets these fields is outside this
/// section; the per-field notes are inferred from names and types — confirm
/// against the request executor.
#[derive(Clone)]
struct RetryConfig {
    // Presumably the maximum number of retry attempts.
    max: u32,
    // Presumably the base delay between attempts, in milliseconds.
    backoff_ms: u64,
    // Presumably the HTTP status codes that trigger a retry.
    retryable_statuses: Vec<u16>,
    // Presumably the HTTP method names eligible for retry.
    retryable_methods: Vec<String>,
    // Presumably whether a `Retry-After` response header is honoured.
    respect_retry_after: bool,
}
46
/// Per-request transport configuration: timeouts, retry policy, redirect
/// handling, proxy, TLS and decompression settings.
///
/// NOTE(review): how each field is applied is not visible in this section;
/// the notes below are inferred from names and types.
#[derive(Clone)]
struct HttpRequestConfig {
    // Presumably the overall request deadline, in milliseconds.
    total_timeout_ms: u64,
    // Optional connect-phase timeout in milliseconds (`None` = unset).
    connect_timeout_ms: Option<u64>,
    // Optional read timeout in milliseconds (`None` = unset).
    read_timeout_ms: Option<u64>,
    // Retry policy; see `RetryConfig`.
    retry: RetryConfig,
    follow_redirects: bool,
    max_redirects: usize,
    // Optional proxy settings; see `HttpProxyConfig`.
    proxy: Option<HttpProxyConfig>,
    // TLS material and pins; see `HttpTlsConfig`.
    tls: HttpTlsConfig,
    decompress: bool,
}
59
/// TLS-related options for an HTTP request. `Default` yields no paths and
/// no pins.
///
/// NOTE(review): file formats and the pin encoding are not established in
/// this section — confirm where the config is consumed.
#[derive(Clone, Default)]
struct HttpTlsConfig {
    // Optional path to a CA bundle file.
    ca_bundle_path: Option<String>,
    // Optional path to a client certificate file.
    client_cert_path: Option<String>,
    // Optional path to the client private key file.
    client_key_path: Option<String>,
    // Optional path to a combined client identity file.
    client_identity_path: Option<String>,
    // Presumably SHA-256 pins (as strings) that the peer must match.
    pinned_sha256: Vec<String>,
}
68
/// Proxy settings referenced from [`HttpRequestConfig`] via its `proxy` field.
#[derive(Clone)]
struct HttpProxyConfig {
    // Proxy URL as a string.
    url: String,
    // Optional credential pair — presumably (username, password); confirm.
    auth: Option<(String, String)>,
    // Optional no-proxy specification; its format is not shown here.
    no_proxy: Option<String>,
}
75
/// A stored HTTP session: a `reqwest::Client` paired with an options dict.
///
/// Instances live in the thread-local `HTTP_SESSIONS` map, keyed by a
/// string id.
#[derive(Clone)]
struct HttpSession {
    client: reqwest::Client,
    // Options dict associated with the session — presumably defaults merged
    // into each request made through it; confirm at the call site.
    options: BTreeMap<String, VmValue>,
}
81
/// The pieces of an outgoing HTTP request after option parsing.
struct HttpRequestParts {
    method: reqwest::Method,
    // Headers in the form reqwest sends them.
    headers: reqwest::header::HeaderMap,
    // Headers as VM values — presumably what is recorded for mocks/inspection.
    recorded_headers: BTreeMap<String, VmValue>,
    // Optional textual body.
    body: Option<String>,
    // Optional multipart payload; see `MultipartRequest`.
    multipart: Option<MultipartRequest>,
}
89
/// A parsed `multipart` option, as produced by `parse_multipart_request`.
#[derive(Clone)]
struct MultipartRequest {
    // The parsed fields, in the order they appeared in the options list.
    parts: Vec<MultipartField>,
    // Textual rendering of the parts built by `multipart_mock_body`.
    mock_body: String,
}
95
/// One multipart form field, as produced by `parse_multipart_field`.
#[derive(Clone)]
struct MultipartField {
    // Field name (never empty when built by `parse_multipart_field`).
    name: String,
    // Raw field payload.
    value: Vec<u8>,
    // Optional file name attached to the part.
    filename: Option<String>,
    // Optional MIME type string for the part.
    content_type: Option<String>,
}
103
/// State for one open HTTP response stream, stored in the thread-local
/// `HTTP_STREAMS` map.
struct HttpStreamHandle {
    // Whether the stream is backed by a real response or a fake.
    kind: HttpStreamKind,
    // Response status code.
    status: i64,
    // Response headers as VM values.
    headers: BTreeMap<String, VmValue>,
    // Byte queue — presumably data received but not yet handed to the caller.
    pending: VecDeque<u8>,
    closed: bool,
}
111
/// Backing source of an [`HttpStreamHandle`].
enum HttpStreamKind {
    // A live reqwest response, shared behind an async mutex.
    Real(Rc<tokio::sync::Mutex<reqwest::Response>>),
    // No network response — presumably served from mock data; confirm.
    Fake,
}
116
/// A registered SSE mock, stored in the thread-local `SSE_MOCKS` list.
struct SseMock {
    // Pattern the request URL is matched against (matching logic not shown here).
    url_pattern: String,
    // Events associated with the mock.
    events: Vec<MockStreamEvent>,
}
121
/// One scripted stream event used by SSE mocks and fake SSE streams.
///
/// NOTE(review): the fields look like the SSE `event`, `data`, `id` and
/// `retry` fields — confirm where the events are rendered.
#[derive(Clone)]
struct MockStreamEvent {
    event_type: String,
    data: String,
    id: Option<String>,
    retry_ms: Option<i64>,
}
129
/// Client-side SSE connection state, stored in the thread-local
/// `SSE_HANDLES` map.
struct SseHandle {
    // Real event source or scripted fake.
    kind: SseHandleKind,
    url: String,
    // Presumably the cap on events that may be received through this handle.
    max_events: usize,
    // Presumably the cap on the size of a single message.
    max_message_bytes: usize,
    // Presumably the count of events received so far.
    received: usize,
}
137
/// Backing source of an [`SseHandle`].
enum SseHandleKind {
    // A live `EventSource`; `reset_http_state` closes it when it can take the lock.
    Real(Rc<tokio::sync::Mutex<EventSource>>),
    // A scripted stream; see `FakeSseStream`.
    Fake(Rc<tokio::sync::Mutex<FakeSseStream>>),
}
142
/// Scripted SSE stream used by `SseHandleKind::Fake`.
struct FakeSseStream {
    // Queue of events still held by the fake stream.
    events: VecDeque<MockStreamEvent>,
    // Presumably whether the initial "open" notification was delivered.
    opened: bool,
    closed: bool,
}
148
/// Server-side SSE response state, stored in the thread-local
/// `SSE_SERVER_HANDLES` map.
///
/// NOTE(review): the counters and flags are maintained by code outside
/// this section; the notes are inferred from names.
struct SseServerHandle {
    status: i64,
    headers: BTreeMap<String, VmValue>,
    // Queue of frames as strings — presumably serialized events awaiting flush.
    frames: VecDeque<String>,
    // Presumably the size cap for a single event.
    max_event_bytes: usize,
    // Presumably the cap on the length of `frames`.
    max_buffered_events: usize,
    sent_events: usize,
    flushed_events: usize,
    closed: bool,
    disconnected: bool,
    cancelled: bool,
    // Optional reason text that accompanies `cancelled`.
    cancel_reason: Option<String>,
}
162
/// A registered WebSocket mock, stored in the thread-local
/// `WEBSOCKET_MOCKS` list.
struct WebSocketMock {
    // Pattern the connection URL is matched against (matching logic not shown here).
    url_pattern: String,
    // Messages associated with the mock.
    messages: Vec<MockWsMessage>,
    // Presumably whether sent messages are echoed back to the client.
    echo: bool,
}
168
/// A WebSocket message in transport-neutral form; used by mocks, by the
/// fake socket queue and by the server-side socket channels in this file.
#[derive(Clone)]
struct MockWsMessage {
    // Message kind as a string; the accepted values are not shown here.
    message_type: String,
    // Message payload bytes.
    data: Vec<u8>,
    // Optional close code and reason — presumably set only for close messages.
    close_code: Option<u16>,
    close_reason: Option<String>,
}
176
/// Shorthand for the tokio-tungstenite client stream type (a TCP stream that
/// may be wrapped in TLS) held by `WebSocketHandleKind::Real`.
type RealWebSocket =
    tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
179
/// State for one WebSocket connection, stored in the thread-local
/// `WEBSOCKET_HANDLES` map.
struct WebSocketHandle {
    // Real client socket, scripted fake, or accepted server-side socket.
    kind: WebSocketHandleKind,
    url: String,
    // Presumably the cap on messages that may be received through this handle.
    max_messages: usize,
    // Presumably the cap on the size of a single message.
    max_message_bytes: usize,
    // Presumably the count of messages received so far.
    received: usize,
}
187
/// Backing source of a [`WebSocketHandle`]. Each variant is shared behind
/// an `Rc<tokio::sync::Mutex<_>>`.
enum WebSocketHandleKind {
    // A live client connection.
    Real(Rc<tokio::sync::Mutex<RealWebSocket>>),
    // A scripted connection; see `FakeWebSocket`.
    Fake(Rc<tokio::sync::Mutex<FakeWebSocket>>),
    // A connection on the server side; see `ServerWebSocket`.
    Server(Rc<tokio::sync::Mutex<ServerWebSocket>>),
}
193
/// Scripted WebSocket used by `WebSocketHandleKind::Fake`.
struct FakeWebSocket {
    // Queue of messages still held by the fake socket.
    messages: VecDeque<MockWsMessage>,
    // Presumably whether sent messages are queued back as incoming.
    echo: bool,
    closed: bool,
}
199
/// A running WebSocket server, stored in the thread-local
/// `WEBSOCKET_SERVERS` map.
struct WebSocketServer {
    // Listening address; `reset_http_state` opens a TCP connection to it
    // after clearing `running` (presumably to wake a blocked accept loop).
    addr: String,
    // Route table shared across threads behind an `RwLock`.
    routes: Arc<RwLock<HashMap<String, WebSocketRoute>>>,
    // Receiving end of the channel carrying `WebSocketServerEvent`s.
    events: Rc<tokio::sync::Mutex<mpsc::Receiver<WebSocketServerEvent>>>,
    // Run flag; `reset_http_state` stores `false` here to stop the server.
    running: Arc<AtomicBool>,
}
206
/// Per-path settings for a [`WebSocketServer`] route.
///
/// NOTE(review): enforcement of these limits is outside this section; the
/// notes are inferred from names.
#[derive(Clone)]
struct WebSocketRoute {
    path: String,
    // Optional bearer token — presumably required from connecting clients.
    bearer_token: Option<String>,
    max_messages: usize,
    max_message_bytes: usize,
    // Presumably the capacity of the outgoing message buffer.
    send_buffer_messages: usize,
    // Presumably the idle timeout in milliseconds.
    idle_timeout_ms: u64,
}
216
/// Item type of the channel held in `WebSocketServer::events` — presumably
/// one event per accepted connection; confirm at the producer.
struct WebSocketServerEvent {
    // The server-side socket for the connection.
    handle: ServerWebSocket,
    // Request path of the connection.
    path: String,
    // Peer address rendered as a string.
    peer: String,
    // Request headers as plain strings.
    headers: BTreeMap<String, String>,
    max_messages: usize,
    max_message_bytes: usize,
}
225
/// Server-side WebSocket endpoint backed by std `mpsc` channels.
struct ServerWebSocket {
    // Local queue of messages — presumably drained from `incoming_rx`.
    incoming: VecDeque<MockWsMessage>,
    // Receiving end for messages arriving from the peer.
    incoming_rx: mpsc::Receiver<MockWsMessage>,
    // Bounded sender for commands (send / close) going to the connection.
    outgoing_tx: mpsc::SyncSender<ServerWebSocketCommand>,
    closed: bool,
}
232
/// Command sent through `ServerWebSocket::outgoing_tx`.
enum ServerWebSocketCommand {
    // Send the given message.
    Send(MockWsMessage),
    // Close, with an optional close code and optional reason text.
    Close(Option<u16>, Option<String>),
}
237
/// One recorded transport interaction, appended to the thread-local
/// `TRANSPORT_MOCK_CALLS` log by `record_transport_call`.
#[derive(Clone)]
struct TransportMockCall {
    // Kind of call as a string; the set of values is not shown here.
    kind: String,
    // Optional handle id the call relates to.
    handle: Option<String>,
    url: String,
    // Optional message type and payload for calls that carry a message.
    message_type: Option<String>,
    data: Option<String>,
}
246
/// A route registered on an in-process [`HttpServer`].
#[derive(Clone)]
struct HttpServerRoute {
    // HTTP method; `"*"` matches any method, otherwise `matching_route`
    // compares it ASCII-case-insensitively.
    method: String,
    // Path template matched by `route_template_match`
    // (`{name}` and `:name` segments capture parameters).
    template: String,
    // Closure invoked with the request value when the route matches.
    handler: Rc<VmClosure>,
    // Per-route body size limit; `None` falls back to the server's limit.
    max_body_bytes: Option<usize>,
    // Per-route raw-body retention; `None` falls back to the server's setting.
    retain_raw_body: Option<bool>,
}
255
/// An in-process HTTP server definition, stored in the thread-local
/// `HTTP_SERVERS` map and driven by `run_http_server_request`.
#[derive(Clone)]
struct HttpServer {
    // Routes, searched in order; the first match wins.
    routes: Vec<HttpServerRoute>,
    // Hooks run before the handler. A result with a `status` key
    // short-circuits as the response; any other non-nil result replaces the
    // request.
    before: Vec<Rc<VmClosure>>,
    // Hooks run after the handler with (response, request); a non-nil result
    // replaces the response.
    after: Vec<Rc<VmClosure>>,
    // When false, requests are answered with 503 "server is not ready".
    ready: bool,
    // Optional readiness probe called with the server handle; a falsy result
    // yields 503.
    readiness: Option<Rc<VmClosure>>,
    // Hooks presumably run at shutdown; they are not invoked in this section.
    shutdown_hooks: Vec<Rc<VmClosure>>,
    // When true, requests are answered with 503 "server is shut down".
    shutdown: bool,
    // Default request body limit; larger bodies get a 413 response.
    max_body_bytes: usize,
    // Default for whether the request value carries `raw_body` bytes.
    retain_raw_body: bool,
}
268
// Defaults and capacity limits for the HTTP / SSE / WebSocket builtins.
// NOTE(review): most of these are consumed outside this section; the grouping
// comments describe what the names suggest — confirm at the use sites.

// Request timing and retry defaults (milliseconds).
const DEFAULT_TIMEOUT_MS: u64 = 30_000;
const DEFAULT_BACKOFF_MS: u64 = 1_000;
const MAX_RETRY_DELAY_MS: u64 = 60_000;
// Default retry policy: statuses and methods.
const DEFAULT_RETRYABLE_STATUSES: [u16; 6] = [408, 429, 500, 502, 503, 504];
const DEFAULT_RETRYABLE_METHODS: [&str; 5] = ["GET", "HEAD", "PUT", "DELETE", "OPTIONS"];
// Fallback used by `receive_timeout_arg` when no usable timeout argument is given.
const DEFAULT_TRANSPORT_RECEIVE_TIMEOUT_MS: u64 = 30_000;
// Default stream limits: event count and message size (1 MiB).
const DEFAULT_MAX_STREAM_EVENTS: usize = 10_000;
const DEFAULT_MAX_MESSAGE_BYTES: usize = 1024 * 1024;
// Default server-side request body limit (1 MiB).
const DEFAULT_SERVER_MAX_BODY_BYTES: usize = 1024 * 1024;
const DEFAULT_WEBSOCKET_SERVER_IDLE_TIMEOUT_MS: u64 = 30_000;
// Presumably caps on the number of live entries in the per-thread registries.
const MAX_HTTP_SESSIONS: usize = 64;
const MAX_HTTP_STREAMS: usize = 64;
const MAX_SSE_STREAMS: usize = 64;
const MAX_SSE_SERVER_STREAMS: usize = 64;
const MAX_WEBSOCKETS: usize = 64;
// Boundary string used by `multipart_mock_body` when rendering multipart bodies.
const MULTIPART_MOCK_BOUNDARY: &str = "harn-boundary";
const MAX_HTTP_SERVERS: usize = 128;
const MAX_WEBSOCKET_SERVERS: usize = 16;
287
// Per-thread registries for the HTTP / SSE / WebSocket builtins. Each is a
// `RefCell`, so access goes through `.with(|cell| cell.borrow[_mut]())`.
// `reset_http_state` clears every one of them.
thread_local! {
    // reqwest clients keyed by string.
    static HTTP_CLIENTS: RefCell<HashMap<String, reqwest::Client>> = RefCell::new(HashMap::new());
    // Sessions keyed by string id.
    static HTTP_SESSIONS: RefCell<HashMap<String, HttpSession>> = RefCell::new(HashMap::new());
    // Open HTTP response streams keyed by handle string.
    static HTTP_STREAMS: RefCell<HashMap<String, HttpStreamHandle>> = RefCell::new(HashMap::new());
    // Registered SSE mocks.
    static SSE_MOCKS: RefCell<Vec<SseMock>> = const { RefCell::new(Vec::new()) };
    // Client-side SSE connections keyed by handle string.
    static SSE_HANDLES: RefCell<HashMap<String, SseHandle>> = RefCell::new(HashMap::new());
    // Server-side SSE responses keyed by handle string.
    static SSE_SERVER_HANDLES: RefCell<HashMap<String, SseServerHandle>> = RefCell::new(HashMap::new());
    // Registered WebSocket mocks.
    static WEBSOCKET_MOCKS: RefCell<Vec<WebSocketMock>> = const { RefCell::new(Vec::new()) };
    // WebSocket connections keyed by handle string.
    static WEBSOCKET_HANDLES: RefCell<HashMap<String, WebSocketHandle>> = RefCell::new(HashMap::new());
    // Running WebSocket servers keyed by string.
    static WEBSOCKET_SERVERS: RefCell<HashMap<String, WebSocketServer>> = RefCell::new(HashMap::new());
    // Log of recorded transport calls (see `record_transport_call`).
    static TRANSPORT_MOCK_CALLS: RefCell<Vec<TransportMockCall>> = const { RefCell::new(Vec::new()) };
    // Counter behind `next_transport_handle`; starts at 0.
    static TRANSPORT_HANDLE_COUNTER: RefCell<u64> = const { RefCell::new(0) };
    // In-process HTTP servers keyed by handle id.
    static HTTP_SERVERS: RefCell<HashMap<String, HttpServer>> = RefCell::new(HashMap::new());
}
302
303pub fn reset_http_state() {
305 reset_http_mocks();
306 HTTP_CLIENTS.with(|clients| clients.borrow_mut().clear());
307 HTTP_SESSIONS.with(|sessions| sessions.borrow_mut().clear());
308 HTTP_STREAMS.with(|streams| streams.borrow_mut().clear());
309 SSE_MOCKS.with(|mocks| mocks.borrow_mut().clear());
310 SSE_HANDLES.with(|handles| {
311 for handle in handles.borrow_mut().values_mut() {
312 if let SseHandleKind::Real(stream) = &handle.kind {
313 if let Ok(mut stream) = stream.try_lock() {
314 stream.close();
315 }
316 }
317 }
318 handles.borrow_mut().clear();
319 });
320 SSE_SERVER_HANDLES.with(|handles| handles.borrow_mut().clear());
321 WEBSOCKET_MOCKS.with(|mocks| mocks.borrow_mut().clear());
322 WEBSOCKET_HANDLES.with(|handles| handles.borrow_mut().clear());
323 WEBSOCKET_SERVERS.with(|servers| {
324 let mut servers = servers.borrow_mut();
325 for server in servers.values() {
326 server.running.store(false, Ordering::SeqCst);
327 let _ = TcpStream::connect(&server.addr);
328 }
329 servers.clear();
330 });
331 TRANSPORT_MOCK_CALLS.with(|calls| calls.borrow_mut().clear());
332 TRANSPORT_HANDLE_COUNTER.with(|counter| *counter.borrow_mut() = 0);
333 HTTP_SERVERS.with(|servers| servers.borrow_mut().clear());
334}
335
336fn build_http_response(status: i64, headers: BTreeMap<String, VmValue>, body: String) -> VmValue {
338 let mut result = BTreeMap::new();
339 result.insert("status".to_string(), VmValue::Int(status));
340 result.insert("headers".to_string(), VmValue::Dict(Rc::new(headers)));
341 result.insert("body".to_string(), VmValue::String(Rc::from(body)));
342 result.insert(
343 "ok".to_string(),
344 VmValue::Bool((200..300).contains(&(status as u16))),
345 );
346 VmValue::Dict(Rc::new(result))
347}
348
349fn build_http_download_response(
350 status: i64,
351 headers: BTreeMap<String, VmValue>,
352 bytes_written: u64,
353) -> VmValue {
354 let mut result = BTreeMap::new();
355 result.insert("status".to_string(), VmValue::Int(status));
356 result.insert("headers".to_string(), VmValue::Dict(Rc::new(headers)));
357 result.insert(
358 "bytes_written".to_string(),
359 VmValue::Int(bytes_written as i64),
360 );
361 result.insert(
362 "ok".to_string(),
363 VmValue::Bool((200..300).contains(&(status as u16))),
364 );
365 VmValue::Dict(Rc::new(result))
366}
367
368fn response_headers(headers: &reqwest::header::HeaderMap) -> BTreeMap<String, VmValue> {
369 let mut resp_headers = BTreeMap::new();
370 for (name, value) in headers {
371 if let Ok(v) = value.to_str() {
372 resp_headers.insert(name.as_str().to_string(), VmValue::String(Rc::from(v)));
373 }
374 }
375 resp_headers
376}
377
378fn vm_error(message: impl Into<String>) -> VmError {
379 VmError::Thrown(VmValue::String(Rc::from(message.into())))
380}
381
382fn next_transport_handle(prefix: &str) -> String {
383 TRANSPORT_HANDLE_COUNTER.with(|counter| {
384 let mut counter = counter.borrow_mut();
385 *counter += 1;
386 format!("{prefix}-{}", *counter)
387 })
388}
389
390fn handle_from_value(value: &VmValue, builtin: &str) -> Result<String, VmError> {
391 match value {
392 VmValue::String(handle) => Ok(handle.to_string()),
393 VmValue::Dict(dict) => dict
394 .get("id")
395 .map(|id| id.display())
396 .filter(|id| !id.is_empty())
397 .ok_or_else(|| vm_error(format!("{builtin}: handle dict must contain id"))),
398 _ => Err(vm_error(format!(
399 "{builtin}: first argument must be a handle string or dict"
400 ))),
401 }
402}
403
404fn get_options_arg(args: &[VmValue], index: usize) -> BTreeMap<String, VmValue> {
405 args.get(index)
406 .and_then(|value| value.as_dict())
407 .cloned()
408 .unwrap_or_default()
409}
410
411fn merge_options(
412 base: &BTreeMap<String, VmValue>,
413 overrides: &BTreeMap<String, VmValue>,
414) -> BTreeMap<String, VmValue> {
415 let mut merged = base.clone();
416 for (key, value) in overrides {
417 merged.insert(key.clone(), value.clone());
418 }
419 merged
420}
421
422fn resolve_http_path(
423 builtin: &str,
424 path: &str,
425 access: crate::stdlib::sandbox::FsAccess,
426) -> Result<std::path::PathBuf, VmError> {
427 let resolved = crate::stdlib::process::resolve_source_relative_path(path);
428 crate::stdlib::sandbox::enforce_fs_path(builtin, &resolved, access)?;
429 Ok(resolved)
430}
431
432fn value_to_bytes(value: &VmValue) -> Vec<u8> {
433 match value {
434 VmValue::Bytes(bytes) => bytes.as_ref().clone(),
435 other => other.display().into_bytes(),
436 }
437}
438
439fn parse_multipart_field(value: &VmValue) -> Result<MultipartField, VmError> {
440 let dict = value
441 .as_dict()
442 .ok_or_else(|| vm_error("http: multipart entries must be dicts"))?;
443 let name = dict
444 .get("name")
445 .map(|value| value.display())
446 .filter(|value| !value.is_empty())
447 .ok_or_else(|| vm_error("http: multipart entry requires name"))?;
448 let content_type = dict
449 .get("content_type")
450 .or_else(|| dict.get("mime_type"))
451 .map(|value| value.display())
452 .filter(|value| !value.is_empty());
453
454 let mut filename = dict
455 .get("filename")
456 .map(|value| value.display())
457 .filter(|value| !value.is_empty());
458 let value = if let Some(path_value) = dict.get("path") {
459 let path = path_value.display();
460 let resolved = resolve_http_path(
461 "http multipart",
462 &path,
463 crate::stdlib::sandbox::FsAccess::Read,
464 )?;
465 if filename.is_none() {
466 filename = resolved
467 .file_name()
468 .and_then(|name| name.to_str())
469 .map(|name| name.to_string());
470 }
471 std::fs::read(&resolved).map_err(|error| {
472 vm_error(format!(
473 "http: failed to read multipart file {}: {error}",
474 resolved.display()
475 ))
476 })?
477 } else if let Some(base64_value) = dict.get("value_base64").or_else(|| dict.get("base64")) {
478 base64::engine::general_purpose::STANDARD
479 .decode(base64_value.display())
480 .map_err(|error| vm_error(format!("http: invalid multipart base64 value: {error}")))?
481 } else {
482 dict.get("value").map(value_to_bytes).ok_or_else(|| {
483 vm_error("http: multipart entry requires value, value_base64, or path")
484 })?
485 };
486
487 Ok(MultipartField {
488 name,
489 value,
490 filename,
491 content_type,
492 })
493}
494
495fn parse_multipart_request(
496 options: &BTreeMap<String, VmValue>,
497) -> Result<Option<MultipartRequest>, VmError> {
498 let Some(value) = options.get("multipart") else {
499 return Ok(None);
500 };
501 let VmValue::List(items) = value else {
502 return Err(vm_error("http: multipart must be a list"));
503 };
504 let parts = items
505 .iter()
506 .map(parse_multipart_field)
507 .collect::<Result<Vec<_>, _>>()?;
508 let mock_body = multipart_mock_body(&parts);
509 Ok(Some(MultipartRequest { parts, mock_body }))
510}
511
512fn multipart_mock_body(parts: &[MultipartField]) -> String {
513 let mut out = String::new();
514 for part in parts {
515 out.push_str("--");
516 out.push_str(MULTIPART_MOCK_BOUNDARY);
517 out.push_str("\r\nContent-Disposition: form-data; name=\"");
518 out.push_str(&part.name);
519 out.push('"');
520 if let Some(filename) = &part.filename {
521 out.push_str("; filename=\"");
522 out.push_str(filename);
523 out.push('"');
524 }
525 out.push_str("\r\n");
526 if let Some(content_type) = &part.content_type {
527 out.push_str("Content-Type: ");
528 out.push_str(content_type);
529 out.push_str("\r\n");
530 }
531 out.push_str("\r\n");
532 out.push_str(&String::from_utf8_lossy(&part.value));
533 out.push_str("\r\n");
534 }
535 out.push_str("--");
536 out.push_str(MULTIPART_MOCK_BOUNDARY);
537 out.push_str("--\r\n");
538 out
539}
540
541fn multipart_form(request: &MultipartRequest) -> Result<reqwest::multipart::Form, VmError> {
542 let mut form = reqwest::multipart::Form::new();
543 for field in &request.parts {
544 let mut part = reqwest::multipart::Part::bytes(field.value.clone());
545 if let Some(filename) = &field.filename {
546 part = part.file_name(filename.clone());
547 }
548 if let Some(content_type) = &field.content_type {
549 part = part.mime_str(content_type).map_err(|error| {
550 vm_error(format!("http: invalid multipart content_type: {error}"))
551 })?;
552 }
553 form = form.part(field.name.clone(), part);
554 }
555 Ok(form)
556}
557
558fn transport_limit_option(options: &BTreeMap<String, VmValue>, key: &str, default: usize) -> usize {
559 options
560 .get(key)
561 .and_then(|value| value.as_int())
562 .map(|value| value.max(0) as usize)
563 .unwrap_or(default)
564}
565
566fn receive_timeout_arg(args: &[VmValue], index: usize) -> u64 {
567 match args.get(index) {
568 Some(VmValue::Duration(ms)) => (*ms).max(0) as u64,
569 Some(value) => value
570 .as_int()
571 .map(|ms| ms.max(0) as u64)
572 .unwrap_or(DEFAULT_TRANSPORT_RECEIVE_TIMEOUT_MS),
573 None => DEFAULT_TRANSPORT_RECEIVE_TIMEOUT_MS,
574 }
575}
576
577fn timeout_event() -> VmValue {
578 let mut dict = BTreeMap::new();
579 dict.insert("type".to_string(), VmValue::String(Rc::from("timeout")));
580 VmValue::Dict(Rc::new(dict))
581}
582
583fn closed_event() -> VmValue {
584 let mut dict = BTreeMap::new();
585 dict.insert("type".to_string(), VmValue::String(Rc::from("close")));
586 VmValue::Dict(Rc::new(dict))
587}
588
589fn sse_server_closed_event() -> VmValue {
590 let mut dict = BTreeMap::new();
591 dict.insert("type".to_string(), VmValue::String(Rc::from("close")));
592 dict.insert("server_closed".to_string(), VmValue::Bool(true));
593 VmValue::Dict(Rc::new(dict))
594}
595
596fn record_transport_call(call: TransportMockCall) {
597 TRANSPORT_MOCK_CALLS.with(|calls| calls.borrow_mut().push(call));
598}
599
600async fn http_verb_handler(
604 method: &str,
605 has_body: bool,
606 args: Vec<VmValue>,
607) -> Result<VmValue, VmError> {
608 let url = args.first().map(|a| a.display()).unwrap_or_default();
609 if url.is_empty() {
610 return Err(VmError::Thrown(VmValue::String(Rc::from(format!(
611 "http_{}: URL is required",
612 method.to_ascii_lowercase()
613 )))));
614 }
615 let mut options = if has_body {
616 match (args.get(1), args.get(2)) {
617 (Some(VmValue::Dict(d)), None) => (**d).clone(),
618 (_, Some(VmValue::Dict(d))) => (**d).clone(),
619 _ => BTreeMap::new(),
620 }
621 } else {
622 match args.get(1) {
623 Some(VmValue::Dict(d)) => (**d).clone(),
624 _ => BTreeMap::new(),
625 }
626 };
627 if has_body && !(matches!(args.get(1), Some(VmValue::Dict(_))) && args.get(2).is_none()) {
628 let body = args.get(1).map(|a| a.display()).unwrap_or_default();
629 options.insert("body".to_string(), VmValue::String(Rc::from(body)));
630 }
631 vm_execute_http_request(method, &url, &options).await
632}
633
634fn vm_string(value: impl AsRef<str>) -> VmValue {
635 VmValue::String(Rc::from(value.as_ref()))
636}
637
638fn dict_value(entries: BTreeMap<String, VmValue>) -> VmValue {
639 VmValue::Dict(Rc::new(entries))
640}
641
642fn get_bool_option(options: &BTreeMap<String, VmValue>, key: &str, default: bool) -> bool {
643 match options.get(key) {
644 Some(VmValue::Bool(value)) => *value,
645 _ => default,
646 }
647}
648
649fn get_usize_option(
650 options: &BTreeMap<String, VmValue>,
651 key: &str,
652 default: usize,
653) -> Result<usize, VmError> {
654 match options.get(key).and_then(VmValue::as_int) {
655 Some(value) if value >= 0 => Ok(value as usize),
656 Some(_) => Err(vm_error(format!("http_server: {key} must be non-negative"))),
657 None => Ok(default),
658 }
659}
660
661fn get_optional_usize_option(
662 options: &BTreeMap<String, VmValue>,
663 key: &str,
664) -> Result<Option<usize>, VmError> {
665 match options.get(key).and_then(VmValue::as_int) {
666 Some(value) if value >= 0 => Ok(Some(value as usize)),
667 Some(_) => Err(vm_error(format!(
668 "http_server_route: {key} must be non-negative"
669 ))),
670 None => Ok(None),
671 }
672}
673
674fn server_from_value(value: &VmValue, builtin: &str) -> Result<String, VmError> {
675 handle_from_value(value, builtin)
676}
677
678fn closure_arg(args: &[VmValue], index: usize, builtin: &str) -> Result<Rc<VmClosure>, VmError> {
679 match args.get(index) {
680 Some(VmValue::Closure(closure)) => Ok(closure.clone()),
681 Some(other) => Err(vm_error(format!(
682 "{builtin}: argument {} must be a closure, got {}",
683 index + 1,
684 other.type_name()
685 ))),
686 None => Err(vm_error(format!(
687 "{builtin}: missing closure argument {}",
688 index + 1
689 ))),
690 }
691}
692
693fn http_server_handle_value(id: &str) -> VmValue {
694 let mut dict = BTreeMap::new();
695 dict.insert("id".to_string(), vm_string(id));
696 dict.insert("kind".to_string(), vm_string("http_server"));
697 dict_value(dict)
698}
699
700fn header_lookup_value(headers: &BTreeMap<String, VmValue>, name: &str) -> VmValue {
701 headers
702 .iter()
703 .find(|(candidate, _)| candidate.eq_ignore_ascii_case(name))
704 .map(|(_, value)| value.clone())
705 .unwrap_or(VmValue::Nil)
706}
707
708fn headers_from_value(value: &VmValue) -> BTreeMap<String, VmValue> {
709 match value {
710 VmValue::Dict(dict) => dict
711 .get("headers")
712 .and_then(VmValue::as_dict)
713 .map(|headers| {
714 headers
715 .iter()
716 .map(|(key, value)| (key.to_ascii_lowercase(), vm_string(value.display())))
717 .collect()
718 })
719 .unwrap_or_else(|| {
720 dict.iter()
721 .map(|(key, value)| (key.to_ascii_lowercase(), vm_string(value.display())))
722 .collect()
723 }),
724 _ => BTreeMap::new(),
725 }
726}
727
728fn normalize_headers(value: Option<&VmValue>) -> BTreeMap<String, VmValue> {
729 match value.and_then(VmValue::as_dict) {
730 Some(headers) => headers
731 .iter()
732 .map(|(key, value)| (key.to_ascii_lowercase(), vm_string(value.display())))
733 .collect(),
734 None => BTreeMap::new(),
735 }
736}
737
738fn percent_decode(input: &str) -> String {
739 let bytes = input.as_bytes();
740 let mut out = Vec::with_capacity(bytes.len());
741 let mut i = 0;
742 while i < bytes.len() {
743 if bytes[i] == b'+' {
744 out.push(b' ');
745 i += 1;
746 continue;
747 }
748 if bytes[i] == b'%' && i + 2 < bytes.len() {
749 if let (Some(hi), Some(lo)) = (hex_val(bytes[i + 1]), hex_val(bytes[i + 2])) {
750 out.push((hi << 4) | lo);
751 i += 3;
752 continue;
753 }
754 }
755 out.push(bytes[i]);
756 i += 1;
757 }
758 String::from_utf8_lossy(&out).into_owned()
759}
760
/// Returns the numeric value (0–15) of an ASCII hex digit, accepting both
/// cases, or `None` for any other byte.
fn hex_val(byte: u8) -> Option<u8> {
    // `to_digit(16)` accepts exactly 0-9, a-f and A-F; the result is < 16,
    // so narrowing to u8 is lossless.
    char::from(byte).to_digit(16).map(|digit| digit as u8)
}
769
770fn split_path_and_query(raw_path: &str) -> (String, BTreeMap<String, VmValue>) {
771 let (path, query) = raw_path.split_once('?').unwrap_or((raw_path, ""));
772 let mut query_map = BTreeMap::new();
773 for pair in query.split('&').filter(|part| !part.is_empty()) {
774 let (key, value) = pair.split_once('=').unwrap_or((pair, ""));
775 query_map.insert(percent_decode(key), vm_string(percent_decode(value)));
776 }
777 (
778 if path.is_empty() { "/" } else { path }.to_string(),
779 query_map,
780 )
781}
782
783fn request_body_bytes(input: &BTreeMap<String, VmValue>) -> Vec<u8> {
784 match input.get("raw_body").or_else(|| input.get("body")) {
785 Some(VmValue::Bytes(bytes)) => bytes.as_ref().clone(),
786 Some(value) => value.display().into_bytes(),
787 None => Vec::new(),
788 }
789}
790
791fn request_value(
792 method: &str,
793 path: &str,
794 path_params: BTreeMap<String, VmValue>,
795 mut query: BTreeMap<String, VmValue>,
796 input: &BTreeMap<String, VmValue>,
797 body_bytes: &[u8],
798 retain_raw_body: bool,
799) -> VmValue {
800 if let Some(explicit_query) = input.get("query").and_then(VmValue::as_dict) {
801 query.extend(
802 explicit_query
803 .iter()
804 .map(|(key, value)| (key.clone(), value.clone())),
805 );
806 }
807
808 let headers = normalize_headers(input.get("headers"));
809 let body = String::from_utf8_lossy(body_bytes).into_owned();
810 let mut request = BTreeMap::new();
811 request.insert("method".to_string(), vm_string(method));
812 request.insert("path".to_string(), vm_string(path));
813 let path_params = dict_value(path_params);
814 request.insert("path_params".to_string(), path_params.clone());
815 request.insert("params".to_string(), path_params);
816 request.insert("query".to_string(), dict_value(query));
817 request.insert("headers".to_string(), dict_value(headers));
818 request.insert("body".to_string(), vm_string(body));
819 request.insert(
820 "raw_body".to_string(),
821 if retain_raw_body {
822 VmValue::Bytes(Rc::new(body_bytes.to_vec()))
823 } else {
824 VmValue::Nil
825 },
826 );
827 request.insert(
828 "body_bytes".to_string(),
829 VmValue::Int(body_bytes.len() as i64),
830 );
831 request.insert(
832 "remote_addr".to_string(),
833 input
834 .get("remote_addr")
835 .or_else(|| input.get("remote"))
836 .map(|value| vm_string(value.display()))
837 .unwrap_or(VmValue::Nil),
838 );
839 request.insert(
840 "client_ip".to_string(),
841 input
842 .get("client_ip")
843 .or_else(|| input.get("remote_ip"))
844 .or_else(|| input.get("ip"))
845 .map(|value| vm_string(value.display()))
846 .unwrap_or(VmValue::Nil),
847 );
848 dict_value(request)
849}
850
/// Returns `status` when it lies in `100..=999`, and 500 for anything else.
fn normalize_status(status: i64) -> i64 {
    match status {
        100..=999 => status,
        _ => 500,
    }
}
858
859fn response_with_kind(
860 status: i64,
861 mut headers: BTreeMap<String, VmValue>,
862 body: VmValue,
863 body_kind: &str,
864) -> VmValue {
865 let status = normalize_status(status);
866 let mut response = BTreeMap::new();
867 if body_kind == "json" && matches!(header_lookup_value(&headers, "content-type"), VmValue::Nil)
868 {
869 headers.insert(
870 "content-type".to_string(),
871 vm_string("application/json; charset=utf-8"),
872 );
873 } else if body_kind == "text"
874 && matches!(header_lookup_value(&headers, "content-type"), VmValue::Nil)
875 {
876 headers.insert(
877 "content-type".to_string(),
878 vm_string("text/plain; charset=utf-8"),
879 );
880 }
881 response.insert("status".to_string(), VmValue::Int(status));
882 response.insert("headers".to_string(), dict_value(headers));
883 response.insert(
884 "ok".to_string(),
885 VmValue::Bool((200..300).contains(&status)),
886 );
887 response.insert("body_kind".to_string(), vm_string(body_kind));
888 match body {
889 VmValue::Bytes(bytes) => {
890 response.insert(
891 "body".to_string(),
892 vm_string(String::from_utf8_lossy(&bytes)),
893 );
894 response.insert("raw_body".to_string(), VmValue::Bytes(bytes));
895 }
896 other => {
897 response.insert("body".to_string(), vm_string(other.display()));
898 response.insert(
899 "raw_body".to_string(),
900 VmValue::Bytes(Rc::new(other.display().into_bytes())),
901 );
902 }
903 }
904 dict_value(response)
905}
906
907fn normalize_response(value: VmValue) -> VmValue {
908 match value {
909 VmValue::Dict(dict) if dict.contains_key("status") => {
910 let status = dict.get("status").and_then(VmValue::as_int).unwrap_or(200);
911 let headers = dict
912 .get("headers")
913 .and_then(VmValue::as_dict)
914 .cloned()
915 .unwrap_or_default();
916 let body_kind = dict
917 .get("body_kind")
918 .or_else(|| dict.get("kind"))
919 .map(|value| value.display())
920 .unwrap_or_else(|| "text".to_string());
921 let body = dict
922 .get("raw_body")
923 .filter(|value| matches!(value, VmValue::Bytes(_)))
924 .or_else(|| dict.get("body"))
925 .cloned()
926 .unwrap_or(VmValue::Nil);
927 response_with_kind(status, headers, body, &body_kind)
928 }
929 VmValue::Nil => response_with_kind(204, BTreeMap::new(), VmValue::Nil, "text"),
930 other => response_with_kind(200, BTreeMap::new(), other, "text"),
931 }
932}
933
934fn body_limit_response(limit: usize, actual: usize) -> VmValue {
935 let mut headers = BTreeMap::new();
936 headers.insert(
937 "content-type".to_string(),
938 vm_string("text/plain; charset=utf-8"),
939 );
940 headers.insert("connection".to_string(), vm_string("close"));
941 headers.insert(
942 "x-harn-body-limit".to_string(),
943 vm_string(limit.to_string()),
944 );
945 response_with_kind(
946 413,
947 headers,
948 vm_string(format!("request body too large: {actual} > {limit} bytes")),
949 "text",
950 )
951}
952
953fn not_found_response(method: &str, path: &str) -> VmValue {
954 response_with_kind(
955 404,
956 BTreeMap::new(),
957 vm_string(format!("no route for {method} {path}")),
958 "text",
959 )
960}
961
962fn unavailable_response(message: &str) -> VmValue {
963 response_with_kind(503, BTreeMap::new(), vm_string(message), "text")
964}
965
966fn route_template_match(template: &str, path: &str) -> Option<BTreeMap<String, VmValue>> {
967 let template_segments: Vec<&str> = template.trim_matches('/').split('/').collect();
968 let path_segments: Vec<&str> = path.trim_matches('/').split('/').collect();
969 if template == "/" && path == "/" {
970 return Some(BTreeMap::new());
971 }
972 if template_segments.len() != path_segments.len() {
973 return None;
974 }
975 let mut params = BTreeMap::new();
976 for (tmpl, actual) in template_segments.iter().zip(path_segments.iter()) {
977 if tmpl.starts_with('{') && tmpl.ends_with('}') && tmpl.len() > 2 {
978 params.insert(
979 tmpl[1..tmpl.len() - 1].to_string(),
980 vm_string(percent_decode(actual)),
981 );
982 } else if tmpl.starts_with(':') && tmpl.len() > 1 {
983 params.insert(tmpl[1..].to_string(), vm_string(percent_decode(actual)));
984 } else if tmpl != actual {
985 return None;
986 }
987 }
988 Some(params)
989}
990
991fn matching_route(
992 server: &HttpServer,
993 method: &str,
994 path: &str,
995) -> Option<(HttpServerRoute, BTreeMap<String, VmValue>)> {
996 server.routes.iter().find_map(|route| {
997 if route.method != "*" && !route.method.eq_ignore_ascii_case(method) {
998 return None;
999 }
1000 route_template_match(&route.template, path).map(|params| (route.clone(), params))
1001 })
1002}
1003
1004async fn call_server_closure(
1005 closure: &Rc<VmClosure>,
1006 args: &[VmValue],
1007 builtin: &str,
1008) -> Result<VmValue, VmError> {
1009 let mut vm = crate::vm::clone_async_builtin_child_vm()
1010 .ok_or_else(|| vm_error(format!("{builtin}: requires an async builtin VM context")))?;
1011 vm.call_closure_pub(closure, args).await
1012}
1013
1014fn value_is_response(value: &VmValue) -> bool {
1015 matches!(value, VmValue::Dict(dict) if dict.contains_key("status"))
1016}
1017
1018async fn run_http_server_request(server_id: &str, request: VmValue) -> Result<VmValue, VmError> {
1019 let server = HTTP_SERVERS.with(|servers| servers.borrow().get(server_id).cloned());
1020 let Some(server) = server else {
1021 return Err(vm_error(format!(
1022 "http_server_request: unknown server handle '{server_id}'"
1023 )));
1024 };
1025 if server.shutdown {
1026 return Ok(unavailable_response("server is shut down"));
1027 }
1028 if !server.ready {
1029 return Ok(unavailable_response("server is not ready"));
1030 }
1031 if let Some(readiness) = &server.readiness {
1032 let ready = call_server_closure(
1033 readiness,
1034 &[http_server_handle_value(server_id)],
1035 "http_server_request",
1036 )
1037 .await?;
1038 if !ready.is_truthy() {
1039 return Ok(unavailable_response("server is not ready"));
1040 }
1041 }
1042
1043 let input = request.as_dict().cloned().unwrap_or_default();
1044 let method = input
1045 .get("method")
1046 .map(|value| value.display())
1047 .filter(|value| !value.is_empty())
1048 .unwrap_or_else(|| "GET".to_string())
1049 .to_ascii_uppercase();
1050 let raw_path = input
1051 .get("path")
1052 .map(|value| value.display())
1053 .filter(|value| !value.is_empty())
1054 .unwrap_or_else(|| "/".to_string());
1055 let (path, query) = split_path_and_query(&raw_path);
1056 let body_bytes = request_body_bytes(&input);
1057
1058 let Some((route, path_params)) = matching_route(&server, &method, &path) else {
1059 return Ok(not_found_response(&method, &path));
1060 };
1061
1062 let limit = route.max_body_bytes.unwrap_or(server.max_body_bytes);
1063 if body_bytes.len() > limit {
1064 return Ok(body_limit_response(limit, body_bytes.len()));
1065 }
1066 let retain_raw_body = route.retain_raw_body.unwrap_or(server.retain_raw_body);
1067 let mut req = request_value(
1068 &method,
1069 &path,
1070 path_params,
1071 query,
1072 &input,
1073 &body_bytes,
1074 retain_raw_body,
1075 );
1076
1077 for before in &server.before {
1078 let result = call_server_closure(before, &[req.clone()], "http_server_request").await?;
1079 if value_is_response(&result) {
1080 return Ok(normalize_response(result));
1081 }
1082 if !matches!(result, VmValue::Nil) {
1083 req = result;
1084 }
1085 }
1086
1087 let handler_result =
1088 call_server_closure(&route.handler, &[req.clone()], "http_server_request").await?;
1089 let mut response = normalize_response(handler_result);
1090
1091 for after in &server.after {
1092 let result = call_server_closure(
1093 after,
1094 &[response.clone(), req.clone()],
1095 "http_server_request",
1096 )
1097 .await?;
1098 if !matches!(result, VmValue::Nil) {
1099 response = normalize_response(result);
1100 }
1101 }
1102
1103 Ok(response)
1104}
1105
/// Registers the HTTP-related builtins on `vm`: server TLS config helpers,
/// HTTP client verbs, the in-process HTTP server and its response
/// constructors, HTTP mocks, streaming and session clients, SSE client and
/// server helpers, WebSocket client and server helpers, and the transport
/// mock utilities.
///
/// Builtins that never await are registered with `register_builtin` and take
/// `(args, out)`; builtins that await are registered with
/// `register_async_builtin` and take `args` only.
pub fn register_http_builtins(vm: &mut Vm) {
    // ---- Server TLS configuration values ----

    // "plain": no TLS termination, `http` scheme, HSTS off, no extra keys.
    vm.register_builtin("http_server_tls_plain", |_args, _out| {
        Ok(http_server_tls_config_value(
            "plain",
            false,
            "http",
            false,
            BTreeMap::new(),
        ))
    });
    // "edge": `https` scheme without local termination; HSTS defaults to on
    // and the HSTS tuning options are copied from the options dict.
    vm.register_builtin("http_server_tls_edge", |args, _out| {
        let options = get_options_arg(args, 0);
        Ok(http_server_tls_config_value(
            "edge",
            false,
            "https",
            vm_get_bool_option(&options, "hsts", true),
            hsts_options(&options),
        ))
    });
    // "pem": local termination using a certificate/key pair on disk. Both
    // paths must point at existing files; only the paths are stored.
    vm.register_builtin("http_server_tls_pem", |args, _out| {
        if args.len() < 2 {
            return Err(vm_error(
                "http_server_tls_pem: requires cert path and key path",
            ));
        }
        let cert_path = args[0].display();
        let key_path = args[1].display();
        if !std::path::Path::new(&cert_path).is_file() {
            return Err(vm_error(format!(
                "http_server_tls_pem: certificate not found: {cert_path}"
            )));
        }
        if !std::path::Path::new(&key_path).is_file() {
            return Err(vm_error(format!(
                "http_server_tls_pem: private key not found: {key_path}"
            )));
        }
        let mut extra = BTreeMap::new();
        extra.insert(
            "cert_path".to_string(),
            VmValue::String(Rc::from(cert_path)),
        );
        extra.insert("key_path".to_string(), VmValue::String(Rc::from(key_path)));
        Ok(http_server_tls_config_value(
            "pem", true, "https", true, extra,
        ))
    });
    // "self_signed_dev": generates a self-signed certificate for the given
    // hosts and embeds the hosts, certificate PEM, and key PEM in the config.
    // HSTS is off for this mode.
    vm.register_builtin("http_server_tls_self_signed_dev", |args, _out| {
        let hosts = tls_hosts_arg(args.first())?;
        let cert = rcgen::generate_simple_self_signed(hosts.clone()).map_err(|error| {
            vm_error(format!(
                "http_server_tls_self_signed_dev: failed to generate certificate: {error}"
            ))
        })?;
        let mut extra = BTreeMap::new();
        extra.insert(
            "hosts".to_string(),
            VmValue::List(Rc::new(
                hosts
                    .into_iter()
                    .map(|host| VmValue::String(Rc::from(host)))
                    .collect(),
            )),
        );
        extra.insert(
            "cert_pem".to_string(),
            VmValue::String(Rc::from(cert.cert.pem())),
        );
        extra.insert(
            "key_pem".to_string(),
            VmValue::String(Rc::from(cert.key_pair.serialize_pem())),
        );
        Ok(http_server_tls_config_value(
            "self_signed_dev",
            true,
            "https",
            false,
            extra,
        ))
    });
    // Derives the security response headers from a TLS config dict.
    vm.register_builtin("http_server_security_headers", |args, _out| {
        let Some(VmValue::Dict(config)) = args.first() else {
            return Err(vm_error(
                "http_server_security_headers: requires a TLS config dict",
            ));
        };
        Ok(VmValue::Dict(Rc::new(http_server_security_headers(config))))
    });

    // ---- HTTP client verb shortcuts ----
    // NOTE(review): the boolean passed to `http_verb_handler` is `true` for
    // POST/PUT/PATCH only; presumably it marks verbs that carry a body —
    // confirm against `http_verb_handler`.
    vm.register_async_builtin("http_get", |args| async move {
        http_verb_handler("GET", false, args).await
    });
    vm.register_async_builtin("http_post", |args| async move {
        http_verb_handler("POST", true, args).await
    });
    vm.register_async_builtin("http_put", |args| async move {
        http_verb_handler("PUT", true, args).await
    });
    vm.register_async_builtin("http_patch", |args| async move {
        http_verb_handler("PATCH", true, args).await
    });
    vm.register_async_builtin("http_delete", |args| async move {
        http_verb_handler("DELETE", false, args).await
    });

    // ---- In-process HTTP server ----

    // Creates a server with no routes or hooks and stores it in
    // `HTTP_SERVERS`. Note that the handle id is allocated before the
    // `MAX_HTTP_SERVERS` capacity check runs.
    vm.register_builtin("http_server", |args, _out| {
        let options = get_options_arg(args, 0);
        let server = HttpServer {
            routes: Vec::new(),
            before: Vec::new(),
            after: Vec::new(),
            ready: get_bool_option(&options, "ready", true),
            readiness: None,
            shutdown_hooks: Vec::new(),
            shutdown: false,
            max_body_bytes: get_usize_option(
                &options,
                "max_body_bytes",
                DEFAULT_SERVER_MAX_BODY_BYTES,
            )?,
            retain_raw_body: get_bool_option(&options, "retain_raw_body", true),
        };
        let id = next_transport_handle("http-server");
        HTTP_SERVERS.with(|servers| {
            let mut servers = servers.borrow_mut();
            if servers.len() >= MAX_HTTP_SERVERS {
                return Err(vm_error(format!(
                    "http_server: maximum open servers ({MAX_HTTP_SERVERS}) reached"
                )));
            }
            servers.insert(id.clone(), server);
            Ok(())
        })?;
        Ok(http_server_handle_value(&id))
    });

    // Appends a route (server, method, path template, handler, [options]).
    // The method is upper-cased and the template must start with '/'.
    // Returns the server handle.
    vm.register_builtin("http_server_route", |args, _out| {
        if args.len() < 4 {
            return Err(vm_error(
                "http_server_route: requires server, method, path template, and handler",
            ));
        }
        let server_id = server_from_value(&args[0], "http_server_route")?;
        let method = args[1].display().to_ascii_uppercase();
        if method.is_empty() {
            return Err(vm_error("http_server_route: method is required"));
        }
        let template = args[2].display();
        if !template.starts_with('/') {
            return Err(vm_error(
                "http_server_route: path template must start with '/'",
            ));
        }
        let handler = closure_arg(args, 3, "http_server_route")?;
        let options = get_options_arg(args, 4);
        let route = HttpServerRoute {
            method,
            template,
            handler,
            max_body_bytes: get_optional_usize_option(&options, "max_body_bytes")?,
            // Only an explicit bool overrides the server default; any other
            // value (or a missing key) leaves the override unset.
            retain_raw_body: match options.get("retain_raw_body") {
                Some(VmValue::Bool(value)) => Some(*value),
                _ => None,
            },
        };
        HTTP_SERVERS.with(|servers| {
            let mut servers = servers.borrow_mut();
            let server = servers.get_mut(&server_id).ok_or_else(|| {
                vm_error(format!("http_server_route: unknown server '{server_id}'"))
            })?;
            server.routes.push(route);
            Ok::<_, VmError>(())
        })?;
        Ok(http_server_handle_value(&server_id))
    });

    // Appends a hook to the server's `before` list; returns the server handle.
    vm.register_builtin("http_server_before", |args, _out| {
        if args.len() < 2 {
            return Err(vm_error("http_server_before: requires server and handler"));
        }
        let server_id = server_from_value(&args[0], "http_server_before")?;
        let handler = closure_arg(args, 1, "http_server_before")?;
        HTTP_SERVERS.with(|servers| {
            let mut servers = servers.borrow_mut();
            let server = servers.get_mut(&server_id).ok_or_else(|| {
                vm_error(format!("http_server_before: unknown server '{server_id}'"))
            })?;
            server.before.push(handler);
            Ok::<_, VmError>(())
        })?;
        Ok(http_server_handle_value(&server_id))
    });

    // Appends a hook to the server's `after` list; returns the server handle.
    vm.register_builtin("http_server_after", |args, _out| {
        if args.len() < 2 {
            return Err(vm_error("http_server_after: requires server and handler"));
        }
        let server_id = server_from_value(&args[0], "http_server_after")?;
        let handler = closure_arg(args, 1, "http_server_after")?;
        HTTP_SERVERS.with(|servers| {
            let mut servers = servers.borrow_mut();
            let server = servers.get_mut(&server_id).ok_or_else(|| {
                vm_error(format!("http_server_after: unknown server '{server_id}'"))
            })?;
            server.after.push(handler);
            Ok::<_, VmError>(())
        })?;
        Ok(http_server_handle_value(&server_id))
    });

    // `http_server_request` and `http_server_test` both dispatch through
    // `run_http_server_request`; they differ only in the builtin name used
    // for argument errors.
    vm.register_async_builtin("http_server_request", |args| async move {
        if args.len() < 2 {
            return Err(vm_error("http_server_request: requires server and request"));
        }
        let server_id = server_from_value(&args[0], "http_server_request")?;
        run_http_server_request(&server_id, args[1].clone()).await
    });

    vm.register_async_builtin("http_server_test", |args| async move {
        if args.len() < 2 {
            return Err(vm_error("http_server_test: requires server and request"));
        }
        let server_id = server_from_value(&args[0], "http_server_test")?;
        run_http_server_request(&server_id, args[1].clone()).await
    });

    // Sets the server's static `ready` flag and returns the stored value.
    vm.register_builtin("http_server_set_ready", |args, _out| {
        if args.len() < 2 {
            return Err(vm_error(
                "http_server_set_ready: requires server and ready bool",
            ));
        }
        let server_id = server_from_value(&args[0], "http_server_set_ready")?;
        // Only a literal `true` marks the server ready; every other value,
        // including non-bool values, is stored as not ready.
        let ready = matches!(args[1], VmValue::Bool(true));
        HTTP_SERVERS.with(|servers| {
            let mut servers = servers.borrow_mut();
            let server = servers.get_mut(&server_id).ok_or_else(|| {
                vm_error(format!(
                    "http_server_set_ready: unknown server '{server_id}'"
                ))
            })?;
            server.ready = ready;
            Ok::<_, VmError>(())
        })?;
        Ok(VmValue::Bool(ready))
    });

    // Installs (or replaces) the readiness closure; returns the server handle.
    vm.register_builtin("http_server_readiness", |args, _out| {
        if args.len() < 2 {
            return Err(vm_error(
                "http_server_readiness: requires server and readiness closure",
            ));
        }
        let server_id = server_from_value(&args[0], "http_server_readiness")?;
        let handler = closure_arg(args, 1, "http_server_readiness")?;
        HTTP_SERVERS.with(|servers| {
            let mut servers = servers.borrow_mut();
            let server = servers.get_mut(&server_id).ok_or_else(|| {
                vm_error(format!(
                    "http_server_readiness: unknown server '{server_id}'"
                ))
            })?;
            server.readiness = Some(handler);
            Ok::<_, VmError>(())
        })?;
        Ok(http_server_handle_value(&server_id))
    });

    // Reports readiness as a bool: `false` once shut down; otherwise the
    // truthiness of the readiness closure when one is installed, falling
    // back to the static `ready` flag when none is. Unlike request dispatch,
    // the static flag is not consulted when a readiness closure exists.
    vm.register_async_builtin("http_server_ready", |args| async move {
        let Some(server_arg) = args.first() else {
            return Err(vm_error("http_server_ready: requires server"));
        };
        let server_id = server_from_value(server_arg, "http_server_ready")?;
        let server = HTTP_SERVERS.with(|servers| servers.borrow().get(&server_id).cloned());
        let Some(server) = server else {
            return Err(vm_error(format!(
                "http_server_ready: unknown server '{server_id}'"
            )));
        };
        if server.shutdown {
            return Ok(VmValue::Bool(false));
        }
        let Some(readiness) = server.readiness else {
            return Ok(VmValue::Bool(server.ready));
        };
        let result = call_server_closure(
            &readiness,
            &[http_server_handle_value(&server_id)],
            "http_server_ready",
        )
        .await?;
        Ok(VmValue::Bool(result.is_truthy()))
    });

    // Appends a shutdown hook; returns the server handle.
    vm.register_builtin("http_server_on_shutdown", |args, _out| {
        if args.len() < 2 {
            return Err(vm_error(
                "http_server_on_shutdown: requires server and handler",
            ));
        }
        let server_id = server_from_value(&args[0], "http_server_on_shutdown")?;
        let handler = closure_arg(args, 1, "http_server_on_shutdown")?;
        HTTP_SERVERS.with(|servers| {
            let mut servers = servers.borrow_mut();
            let server = servers.get_mut(&server_id).ok_or_else(|| {
                vm_error(format!(
                    "http_server_on_shutdown: unknown server '{server_id}'"
                ))
            })?;
            server.shutdown_hooks.push(handler);
            Ok::<_, VmError>(())
        })?;
        Ok(http_server_handle_value(&server_id))
    });

    // Marks the server as shut down, then runs the shutdown hooks in
    // registration order. The hook list is cloned out first so the registry
    // borrow is released before any hook is awaited. A failing hook
    // propagates its error via `?`, so later hooks do not run; the server
    // entry itself stays in `HTTP_SERVERS`.
    vm.register_async_builtin("http_server_shutdown", |args| async move {
        let Some(server_arg) = args.first() else {
            return Err(vm_error("http_server_shutdown: requires server"));
        };
        let server_id = server_from_value(server_arg, "http_server_shutdown")?;
        let hooks = HTTP_SERVERS.with(|servers| {
            let mut servers = servers.borrow_mut();
            let server = servers.get_mut(&server_id).ok_or_else(|| {
                vm_error(format!(
                    "http_server_shutdown: unknown server '{server_id}'"
                ))
            })?;
            server.shutdown = true;
            Ok::<_, VmError>(server.shutdown_hooks.clone())
        })?;
        for hook in hooks {
            let _ = call_server_closure(
                &hook,
                &[http_server_handle_value(&server_id)],
                "http_server_shutdown",
            )
            .await?;
        }
        Ok(VmValue::Bool(true))
    });

    // ---- Response constructors ----

    // Positional form: (status = 200, body = nil, headers = {}), kind "text".
    vm.register_builtin("http_response", |args, _out| {
        let status = args.first().and_then(VmValue::as_int).unwrap_or(200);
        let body = args.get(1).cloned().unwrap_or(VmValue::Nil);
        let headers = args
            .get(2)
            .and_then(VmValue::as_dict)
            .cloned()
            .unwrap_or_default();
        Ok(response_with_kind(status, headers, body, "text"))
    });

    // (body, options): `status` defaults to 200 and `headers` to empty.
    vm.register_builtin("http_response_text", |args, _out| {
        let body = args.first().cloned().unwrap_or(VmValue::Nil);
        let options = get_options_arg(args, 1);
        let status = options
            .get("status")
            .and_then(VmValue::as_int)
            .unwrap_or(200);
        let headers = options
            .get("headers")
            .and_then(VmValue::as_dict)
            .cloned()
            .unwrap_or_default();
        Ok(response_with_kind(status, headers, body, "text"))
    });

    // Same options as above; the body is the first argument converted with
    // `vm_value_to_json`, or the string "null" when no argument is given.
    vm.register_builtin("http_response_json", |args, _out| {
        let body = args
            .first()
            .map(crate::stdlib::json::vm_value_to_json)
            .map(vm_string)
            .unwrap_or_else(|| vm_string("null"));
        let options = get_options_arg(args, 1);
        let status = options
            .get("status")
            .and_then(VmValue::as_int)
            .unwrap_or(200);
        let headers = options
            .get("headers")
            .and_then(VmValue::as_dict)
            .cloned()
            .unwrap_or_default();
        Ok(response_with_kind(status, headers, body, "json"))
    });

    // Same options as above; a non-bytes body is converted through its
    // display string, and a missing body becomes empty bytes.
    vm.register_builtin("http_response_bytes", |args, _out| {
        let body = match args.first() {
            Some(VmValue::Bytes(bytes)) => VmValue::Bytes(bytes.clone()),
            Some(value) => VmValue::Bytes(Rc::new(value.display().into_bytes())),
            None => VmValue::Bytes(Rc::new(Vec::new())),
        };
        let options = get_options_arg(args, 1);
        let status = options
            .get("status")
            .and_then(VmValue::as_int)
            .unwrap_or(200);
        let headers = options
            .get("headers")
            .and_then(VmValue::as_dict)
            .cloned()
            .unwrap_or_default();
        Ok(response_with_kind(status, headers, body, "bytes"))
    });

    // Looks up a header by name in a headers/request/response value.
    vm.register_builtin("http_header", |args, _out| {
        if args.len() < 2 {
            return Err(vm_error(
                "http_header: requires headers/request/response and name",
            ));
        }
        let headers = headers_from_value(&args[0]);
        Ok(header_lookup_value(&headers, &args[1].display()))
    });

    // ---- HTTP client mocks ----

    // (method, url_pattern, response): missing arguments fall back to empty
    // strings / an empty dict; the dict is expanded by `parse_mock_responses`.
    vm.register_builtin("http_mock", |args, _out| {
        let method = args.first().map(|a| a.display()).unwrap_or_default();
        let url_pattern = args.get(1).map(|a| a.display()).unwrap_or_default();
        let response = args
            .get(2)
            .and_then(|a| a.as_dict())
            .cloned()
            .unwrap_or_default();
        let responses = parse_mock_responses(&response);

        register_http_mock(method, url_pattern, responses);
        Ok(VmValue::Nil)
    });

    // Clears the HTTP mocks and also drops every open HTTP stream handle.
    vm.register_builtin("http_mock_clear", |_args, _out| {
        clear_http_mocks();
        HTTP_STREAMS.with(|streams| streams.borrow_mut().clear());
        Ok(VmValue::Nil)
    });

    // Lists recorded mock calls. Redaction is on by default:
    // `redact_sensitive` falls back to `redact_headers` (default true), and
    // either `include_sensitive` or `include_sensitive_headers` turns
    // redaction off regardless.
    vm.register_builtin("http_mock_calls", |args, _out| {
        let options = get_options_arg(args, 0);
        let include_sensitive = get_bool_option(&options, "include_sensitive", false)
            || get_bool_option(&options, "include_sensitive_headers", false);
        let redact_sensitive = get_bool_option(
            &options,
            "redact_sensitive",
            get_bool_option(&options, "redact_headers", true),
        ) && !include_sensitive;
        Ok(VmValue::List(Rc::new(http_mock_calls_value(
            redact_sensitive,
        ))))
    });

    // ---- HTTP client: generic request, download, streaming ----

    // (method, url, [options dict]): the method is upper-cased; an empty
    // method or URL is an error. A non-dict third argument is treated as
    // empty options.
    vm.register_async_builtin("http_request", |args| async move {
        let method = args
            .first()
            .map(|a| a.display())
            .unwrap_or_default()
            .to_uppercase();
        if method.is_empty() {
            return Err(VmError::Thrown(VmValue::String(Rc::from(
                "http_request: method is required",
            ))));
        }
        let url = args.get(1).map(|a| a.display()).unwrap_or_default();
        if url.is_empty() {
            return Err(VmError::Thrown(VmValue::String(Rc::from(
                "http_request: URL is required",
            ))));
        }
        let options = match args.get(2) {
            Some(VmValue::Dict(d)) => (**d).clone(),
            _ => BTreeMap::new(),
        };
        vm_execute_http_request(&method, &url, &options).await
    });

    // (url, destination path, [options]).
    vm.register_async_builtin("http_download", |args| async move {
        let url = args.first().map(|a| a.display()).unwrap_or_default();
        if url.is_empty() {
            return Err(vm_error("http_download: URL is required"));
        }
        let dst_path = args.get(1).map(|a| a.display()).unwrap_or_default();
        if dst_path.is_empty() {
            return Err(vm_error("http_download: destination path is required"));
        }
        let options = get_options_arg(&args, 2);
        vm_http_download(&url, &dst_path, &options).await
    });

    // (url, [options]).
    vm.register_async_builtin("http_stream_open", |args| async move {
        let url = args.first().map(|a| a.display()).unwrap_or_default();
        if url.is_empty() {
            return Err(vm_error("http_stream_open: URL is required"));
        }
        let options = get_options_arg(&args, 1);
        vm_http_stream_open(&url, &options).await
    });

    // (stream, [max_bytes]): an integer `max_bytes` is clamped to at least
    // 1; otherwise `DEFAULT_MAX_MESSAGE_BYTES` is used.
    vm.register_async_builtin("http_stream_read", |args| async move {
        let Some(handle) = args.first() else {
            return Err(vm_error("http_stream_read: requires a stream handle"));
        };
        let stream_id = handle_from_value(handle, "http_stream_read")?;
        let max_bytes = args
            .get(1)
            .and_then(|value| value.as_int())
            .map(|value| value.max(1) as usize)
            .unwrap_or(DEFAULT_MAX_MESSAGE_BYTES);
        vm_http_stream_read(&stream_id, max_bytes).await
    });

    vm.register_builtin("http_stream_info", |args, _out| {
        let Some(handle) = args.first() else {
            return Err(vm_error("http_stream_info: requires a stream handle"));
        };
        let stream_id = handle_from_value(handle, "http_stream_info")?;
        vm_http_stream_info(&stream_id)
    });

    // Returns whether a stream with that handle existed and was removed.
    vm.register_builtin("http_stream_close", |args, _out| {
        let Some(handle) = args.first() else {
            return Err(vm_error("http_stream_close: requires a stream handle"));
        };
        let stream_id = handle_from_value(handle, "http_stream_close")?;
        let removed = HTTP_STREAMS.with(|streams| streams.borrow_mut().remove(&stream_id));
        Ok(VmValue::Bool(removed.is_some()))
    });

    // ---- HTTP sessions ----

    // Builds a client from the options and stores it, together with the
    // options, in `HTTP_SESSIONS`. The client is built and the handle id
    // allocated before the `MAX_HTTP_SESSIONS` capacity check runs.
    vm.register_builtin("http_session", |args, _out| {
        let options = get_options_arg(args, 0);
        let config = parse_http_options(&options);
        let client = build_http_client(&config)?;
        let id = next_transport_handle("http-session");
        HTTP_SESSIONS.with(|sessions| {
            let mut sessions = sessions.borrow_mut();
            if sessions.len() >= MAX_HTTP_SESSIONS {
                return Err(vm_error(format!(
                    "http_session: maximum open sessions ({MAX_HTTP_SESSIONS}) reached"
                )));
            }
            sessions.insert(id.clone(), HttpSession { client, options });
            Ok(())
        })?;
        Ok(VmValue::String(Rc::from(id)))
    });

    // (session, method, url, [options]): the method is upper-cased.
    vm.register_async_builtin("http_session_request", |args| async move {
        if args.len() < 3 {
            return Err(vm_error(
                "http_session_request: requires session, method, and URL",
            ));
        }
        let session_id = handle_from_value(&args[0], "http_session_request")?;
        let method = args[1].display().to_uppercase();
        if method.is_empty() {
            return Err(vm_error("http_session_request: method is required"));
        }
        let url = args[2].display();
        if url.is_empty() {
            return Err(vm_error("http_session_request: URL is required"));
        }
        let options = get_options_arg(&args, 3);
        vm_execute_http_session_request(&session_id, &method, &url, &options).await
    });

    // Returns whether a session with that handle existed and was removed.
    vm.register_builtin("http_session_close", |args, _out| {
        let Some(handle) = args.first() else {
            return Err(vm_error("http_session_close: requires a session handle"));
        };
        let session_id = handle_from_value(handle, "http_session_close")?;
        let removed = HTTP_SESSIONS.with(|sessions| sessions.borrow_mut().remove(&session_id));
        Ok(VmValue::Bool(removed.is_some()))
    });

    // ---- SSE client ----

    // (url_pattern, [events]): appends a mock to `SSE_MOCKS`.
    vm.register_builtin("sse_mock", |args, _out| {
        let url_pattern = args.first().map(|arg| arg.display()).unwrap_or_default();
        if url_pattern.is_empty() {
            return Err(vm_error("sse_mock: URL pattern is required"));
        }
        let events = parse_mock_stream_events(args.get(1));
        SSE_MOCKS.with(|mocks| {
            mocks.borrow_mut().push(SseMock {
                url_pattern,
                events,
            });
        });
        Ok(VmValue::Nil)
    });

    // (method, url, [options]): a missing or empty method defaults to GET.
    vm.register_async_builtin("sse_connect", |args| async move {
        let method = args
            .first()
            .map(|arg| arg.display())
            .filter(|method| !method.is_empty())
            .unwrap_or_else(|| "GET".to_string())
            .to_uppercase();
        let url = args.get(1).map(|arg| arg.display()).unwrap_or_default();
        if url.is_empty() {
            return Err(vm_error("sse_connect: URL is required"));
        }
        let options = get_options_arg(&args, 2);
        vm_sse_connect(&method, &url, &options).await
    });

    // (stream, [timeout]): the timeout is read by `receive_timeout_arg`.
    vm.register_async_builtin("sse_receive", |args| async move {
        let Some(handle) = args.first() else {
            return Err(vm_error("sse_receive: requires a stream handle"));
        };
        let stream_id = handle_from_value(handle, "sse_receive")?;
        let timeout_ms = receive_timeout_arg(&args, 1);
        vm_sse_receive(&stream_id, timeout_ms).await
    });

    // ---- SSE server ----

    // (event, [options]): returns the frame built by `vm_sse_event_frame`
    // as a string.
    vm.register_builtin("sse_event", |args, _out| {
        let Some(event) = args.first() else {
            return Err(vm_error("sse_event: requires event data or an event dict"));
        };
        let options = get_options_arg(args, 1);
        Ok(VmValue::String(Rc::from(vm_sse_event_frame(
            event, &options,
        )?)))
    });

    vm.register_builtin("sse_server_response", |args, _out| {
        let options = get_options_arg(args, 0);
        vm_sse_server_response(&options)
    });

    // (stream, event, [options]).
    vm.register_builtin("sse_server_send", |args, _out| {
        if args.len() < 2 {
            return Err(vm_error("sse_server_send: requires stream and event"));
        }
        let stream_id = handle_from_value(&args[0], "sse_server_send")?;
        let options = get_options_arg(args, 2);
        vm_sse_server_send(&stream_id, &args[1], &options)
    });

    // (stream, [optional second argument passed through unchanged]).
    vm.register_builtin("sse_server_heartbeat", |args, _out| {
        let Some(handle) = args.first() else {
            return Err(vm_error("sse_server_heartbeat: requires a stream handle"));
        };
        let stream_id = handle_from_value(handle, "sse_server_heartbeat")?;
        vm_sse_server_heartbeat(&stream_id, args.get(1))
    });

    vm.register_builtin("sse_server_flush", |args, _out| {
        let Some(handle) = args.first() else {
            return Err(vm_error("sse_server_flush: requires a stream handle"));
        };
        let stream_id = handle_from_value(handle, "sse_server_flush")?;
        vm_sse_server_flush(&stream_id)
    });

    vm.register_builtin("sse_server_close", |args, _out| {
        let Some(handle) = args.first() else {
            return Err(vm_error("sse_server_close: requires a stream handle"));
        };
        let stream_id = handle_from_value(handle, "sse_server_close")?;
        vm_sse_server_close(&stream_id)
    });

    // (stream, [optional second argument passed through unchanged]).
    vm.register_builtin("sse_server_cancel", |args, _out| {
        let Some(handle) = args.first() else {
            return Err(vm_error("sse_server_cancel: requires a stream handle"));
        };
        let stream_id = handle_from_value(handle, "sse_server_cancel")?;
        vm_sse_server_cancel(&stream_id, args.get(1))
    });

    vm.register_builtin("sse_server_status", |args, _out| {
        let Some(handle) = args.first() else {
            return Err(vm_error("sse_server_status: requires a stream handle"));
        };
        let stream_id = handle_from_value(handle, "sse_server_status")?;
        vm_sse_server_status(&stream_id)
    });

    // Reads the handle's `disconnected` flag.
    vm.register_builtin("sse_server_disconnected", |args, _out| {
        let Some(handle) = args.first() else {
            return Err(vm_error(
                "sse_server_disconnected: requires a stream handle",
            ));
        };
        let stream_id = handle_from_value(handle, "sse_server_disconnected")?;
        vm_sse_server_observed_bool(&stream_id, "sse_server_disconnected", |handle| {
            handle.disconnected
        })
    });

    // Reads the handle's `cancelled` flag.
    vm.register_builtin("sse_server_cancelled", |args, _out| {
        let Some(handle) = args.first() else {
            return Err(vm_error("sse_server_cancelled: requires a stream handle"));
        };
        let stream_id = handle_from_value(handle, "sse_server_cancelled")?;
        vm_sse_server_observed_bool(&stream_id, "sse_server_cancelled", |handle| {
            handle.cancelled
        })
    });

    vm.register_builtin("sse_server_mock_receive", |args, _out| {
        let Some(handle) = args.first() else {
            return Err(vm_error(
                "sse_server_mock_receive: requires a stream handle",
            ));
        };
        let stream_id = handle_from_value(handle, "sse_server_mock_receive")?;
        vm_sse_server_mock_receive(&stream_id)
    });

    vm.register_builtin("sse_server_mock_disconnect", |args, _out| {
        let Some(handle) = args.first() else {
            return Err(vm_error(
                "sse_server_mock_disconnect: requires a stream handle",
            ));
        };
        let stream_id = handle_from_value(handle, "sse_server_mock_disconnect")?;
        vm_sse_server_mock_disconnect(&stream_id)
    });

    // Removes the client SSE handle and returns whether one existed. For a
    // `Real` stream, `close()` is called only if the stream lock can be
    // taken without blocking; otherwise the handle is still removed but
    // `close()` is skipped.
    vm.register_builtin("sse_close", |args, _out| {
        let Some(handle) = args.first() else {
            return Err(vm_error("sse_close: requires a stream handle"));
        };
        let stream_id = handle_from_value(handle, "sse_close")?;
        let removed = SSE_HANDLES.with(|handles| {
            let mut handles = handles.borrow_mut();
            let removed = handles.remove(&stream_id);
            if let Some(handle) = &removed {
                if let SseHandleKind::Real(stream) = &handle.kind {
                    if let Ok(mut stream) = stream.try_lock() {
                        stream.close();
                    }
                }
            }
            removed
        });
        Ok(VmValue::Bool(removed.is_some()))
    });

    // ---- WebSocket client and server ----

    // (url_pattern, [config]): appends a mock to `WEBSOCKET_MOCKS`.
    vm.register_builtin("websocket_mock", |args, _out| {
        let url_pattern = args.first().map(|arg| arg.display()).unwrap_or_default();
        if url_pattern.is_empty() {
            return Err(vm_error("websocket_mock: URL pattern is required"));
        }
        let (messages, echo) = parse_websocket_mock(args.get(1));
        WEBSOCKET_MOCKS.with(|mocks| {
            mocks.borrow_mut().push(WebSocketMock {
                url_pattern,
                messages,
                echo,
            });
        });
        Ok(VmValue::Nil)
    });

    // (url, [options]).
    vm.register_async_builtin("websocket_connect", |args| async move {
        let url = args.first().map(|arg| arg.display()).unwrap_or_default();
        if url.is_empty() {
            return Err(vm_error("websocket_connect: URL is required"));
        }
        let options = get_options_arg(&args, 1);
        vm_websocket_connect(&url, &options).await
    });

    // ([bind], [options]): a missing or empty bind address defaults to
    // "127.0.0.1:0".
    vm.register_builtin("websocket_server", |args, _out| {
        let bind = args
            .first()
            .map(|arg| arg.display())
            .filter(|bind| !bind.is_empty())
            .unwrap_or_else(|| "127.0.0.1:0".to_string());
        let options = get_options_arg(args, 1);
        vm_websocket_server(&bind, &options)
    });

    // (server, path, [options]): the path must be non-empty and start
    // with '/'.
    vm.register_builtin("websocket_route", |args, _out| {
        if args.len() < 2 {
            return Err(vm_error(
                "websocket_route: requires server handle and route path",
            ));
        }
        let server_id = handle_from_value(&args[0], "websocket_route")?;
        let path = args[1].display();
        if path.is_empty() || !path.starts_with('/') {
            return Err(vm_error("websocket_route: path must start with '/'"));
        }
        let options = get_options_arg(args, 2);
        vm_websocket_route(&server_id, &path, &options)
    });

    // (server, [timeout]).
    vm.register_async_builtin("websocket_accept", |args| async move {
        let Some(handle) = args.first() else {
            return Err(vm_error("websocket_accept: requires a server handle"));
        };
        let server_id = handle_from_value(handle, "websocket_accept")?;
        let timeout_ms = receive_timeout_arg(&args, 1);
        vm_websocket_accept(&server_id, timeout_ms).await
    });

    // (socket, message, [options]).
    vm.register_async_builtin("websocket_send", |args| async move {
        if args.len() < 2 {
            return Err(vm_error(
                "websocket_send: requires socket handle and message",
            ));
        }
        let socket_id = handle_from_value(&args[0], "websocket_send")?;
        let message = args[1].clone();
        let options = get_options_arg(&args, 2);
        vm_websocket_send(&socket_id, message, &options).await
    });

    // (socket, [timeout]).
    vm.register_async_builtin("websocket_receive", |args| async move {
        let Some(handle) = args.first() else {
            return Err(vm_error("websocket_receive: requires a socket handle"));
        };
        let socket_id = handle_from_value(handle, "websocket_receive")?;
        let timeout_ms = receive_timeout_arg(&args, 1);
        vm_websocket_receive(&socket_id, timeout_ms).await
    });

    vm.register_async_builtin("websocket_close", |args| async move {
        let Some(handle) = args.first() else {
            return Err(vm_error("websocket_close: requires a socket handle"));
        };
        let socket_id = handle_from_value(handle, "websocket_close")?;
        vm_websocket_close(&socket_id).await
    });

    vm.register_builtin("websocket_server_close", |args, _out| {
        let Some(handle) = args.first() else {
            return Err(vm_error("websocket_server_close: requires a server handle"));
        };
        let server_id = handle_from_value(handle, "websocket_server_close")?;
        vm_websocket_server_close(&server_id)
    });

    // ---- Transport mock utilities ----

    // Clears HTTP streams, SSE mocks and handles, WebSocket mocks, handles
    // and servers, and the recorded transport mock calls. HTTP mocks,
    // sessions, and HTTP servers are not touched here.
    vm.register_builtin("transport_mock_clear", |_args, _out| {
        HTTP_STREAMS.with(|streams| streams.borrow_mut().clear());
        SSE_MOCKS.with(|mocks| mocks.borrow_mut().clear());
        SSE_HANDLES.with(|handles| handles.borrow_mut().clear());
        WEBSOCKET_MOCKS.with(|mocks| mocks.borrow_mut().clear());
        WEBSOCKET_HANDLES.with(|handles| handles.borrow_mut().clear());
        WEBSOCKET_SERVERS.with(|servers| {
            let mut servers = servers.borrow_mut();
            for server in servers.values() {
                server.running.store(false, Ordering::SeqCst);
                // NOTE(review): the throwaway connection presumably wakes a
                // blocked accept loop so it can observe `running == false` —
                // confirm against the server thread. The result is ignored.
                let _ = TcpStream::connect(&server.addr);
            }
            servers.clear();
        });
        TRANSPORT_MOCK_CALLS.with(|calls| calls.borrow_mut().clear());
        Ok(VmValue::Nil)
    });

    // Returns a list built from a snapshot of the recorded transport mock
    // calls.
    vm.register_builtin("transport_mock_calls", |_args, _out| {
        let calls = TRANSPORT_MOCK_CALLS.with(|calls| calls.borrow().clone());
        let values = calls
            .iter()
            .map(transport_mock_call_value)
            .collect::<Vec<_>>();
        Ok(VmValue::List(Rc::new(values)))
    });
}
1978
1979fn http_server_tls_config_value(
1980 mode: &str,
1981 terminate_tls: bool,
1982 scheme: &str,
1983 hsts: bool,
1984 extra: BTreeMap<String, VmValue>,
1985) -> VmValue {
1986 let mut dict = BTreeMap::new();
1987 dict.insert("mode".to_string(), VmValue::String(Rc::from(mode)));
1988 dict.insert("terminate_tls".to_string(), VmValue::Bool(terminate_tls));
1989 dict.insert("scheme".to_string(), VmValue::String(Rc::from(scheme)));
1990 dict.insert("hsts".to_string(), VmValue::Bool(hsts));
1991 for (key, value) in extra {
1992 dict.insert(key, value);
1993 }
1994 VmValue::Dict(Rc::new(dict))
1995}
1996
1997fn hsts_options(options: &BTreeMap<String, VmValue>) -> BTreeMap<String, VmValue> {
1998 let mut hsts = BTreeMap::new();
1999 hsts.insert(
2000 "hsts_max_age_seconds".to_string(),
2001 VmValue::Int(vm_get_int_option(
2002 options,
2003 "hsts_max_age_seconds",
2004 31_536_000,
2005 )),
2006 );
2007 hsts.insert(
2008 "hsts_include_subdomains".to_string(),
2009 VmValue::Bool(vm_get_bool_option(
2010 options,
2011 "hsts_include_subdomains",
2012 false,
2013 )),
2014 );
2015 hsts.insert(
2016 "hsts_preload".to_string(),
2017 VmValue::Bool(vm_get_bool_option(options, "hsts_preload", false)),
2018 );
2019 hsts
2020}
2021
2022fn http_server_security_headers(config: &BTreeMap<String, VmValue>) -> BTreeMap<String, VmValue> {
2023 let hsts_enabled = vm_get_bool_option(config, "hsts", false);
2024 if !hsts_enabled {
2025 return BTreeMap::new();
2026 }
2027 let mut value = format!(
2028 "max-age={}",
2029 vm_get_int_option(config, "hsts_max_age_seconds", 31_536_000).max(0)
2030 );
2031 if vm_get_bool_option(config, "hsts_include_subdomains", false) {
2032 value.push_str("; includeSubDomains");
2033 }
2034 if vm_get_bool_option(config, "hsts_preload", false) {
2035 value.push_str("; preload");
2036 }
2037 BTreeMap::from([(
2038 "strict-transport-security".to_string(),
2039 VmValue::String(Rc::from(value)),
2040 )])
2041}
2042
2043fn tls_hosts_arg(value: Option<&VmValue>) -> Result<Vec<String>, VmError> {
2044 match value {
2045 None | Some(VmValue::Nil) => Ok(vec!["localhost".to_string(), "127.0.0.1".to_string()]),
2046 Some(VmValue::List(hosts)) => {
2047 let mut parsed = Vec::new();
2048 for host in hosts.iter() {
2049 let host = host.display();
2050 if host.is_empty() {
2051 return Err(vm_error(
2052 "http_server_tls_self_signed_dev: host names must be non-empty",
2053 ));
2054 }
2055 parsed.push(host);
2056 }
2057 if parsed.is_empty() {
2058 return Err(vm_error(
2059 "http_server_tls_self_signed_dev: host list must not be empty",
2060 ));
2061 }
2062 Ok(parsed)
2063 }
2064 Some(other) => {
2065 let host = other.display();
2066 if host.is_empty() {
2067 return Err(vm_error(
2068 "http_server_tls_self_signed_dev: host name must be non-empty",
2069 ));
2070 }
2071 Ok(vec![host])
2072 }
2073 }
2074}
2075
2076fn vm_get_int_option(options: &BTreeMap<String, VmValue>, key: &str, default: i64) -> i64 {
2077 options.get(key).and_then(|v| v.as_int()).unwrap_or(default)
2078}
2079
2080fn vm_get_bool_option(options: &BTreeMap<String, VmValue>, key: &str, default: bool) -> bool {
2081 match options.get(key) {
2082 Some(VmValue::Bool(b)) => *b,
2083 _ => default,
2084 }
2085}
2086
2087fn vm_get_int_option_prefer(
2088 options: &BTreeMap<String, VmValue>,
2089 canonical: &str,
2090 alias: &str,
2091 default: i64,
2092) -> i64 {
2093 options
2094 .get(canonical)
2095 .and_then(|value| value.as_int())
2096 .or_else(|| options.get(alias).and_then(|value| value.as_int()))
2097 .unwrap_or(default)
2098}
2099
2100fn vm_get_optional_int_option(options: &BTreeMap<String, VmValue>, key: &str) -> Option<u64> {
2101 options
2102 .get(key)
2103 .and_then(|value| value.as_int())
2104 .map(|value| value.max(0) as u64)
2105}
2106
2107fn string_option(options: &BTreeMap<String, VmValue>, key: &str) -> Option<String> {
2108 options
2109 .get(key)
2110 .map(|value| value.display())
2111 .filter(|value| !value.is_empty())
2112}
2113
2114fn parse_proxy_config(options: &BTreeMap<String, VmValue>) -> Option<HttpProxyConfig> {
2115 let proxy = options.get("proxy")?;
2116 let (url, no_proxy) = match proxy {
2117 VmValue::Dict(dict) => (
2118 dict.get("url")
2119 .map(|value| value.display())
2120 .filter(|value| !value.is_empty())?,
2121 dict.get("no_proxy")
2122 .map(|value| value.display())
2123 .filter(|value| !value.is_empty()),
2124 ),
2125 other => (other.display(), None),
2126 };
2127 if url.is_empty() {
2128 return None;
2129 }
2130 let auth = options
2131 .get("proxy_auth")
2132 .and_then(|value| value.as_dict())
2133 .map(|dict| {
2134 (
2135 dict.get("user")
2136 .map(|value| value.display())
2137 .unwrap_or_default(),
2138 dict.get("pass")
2139 .or_else(|| dict.get("password"))
2140 .map(|value| value.display())
2141 .unwrap_or_default(),
2142 )
2143 })
2144 .filter(|(user, _)| !user.is_empty());
2145 Some(HttpProxyConfig {
2146 url,
2147 auth,
2148 no_proxy,
2149 })
2150}
2151
2152fn parse_tls_config(options: &BTreeMap<String, VmValue>) -> HttpTlsConfig {
2153 let Some(tls) = options.get("tls").and_then(|value| value.as_dict()) else {
2154 return HttpTlsConfig::default();
2155 };
2156 let pinned_sha256 = match tls.get("pinned_sha256") {
2157 Some(VmValue::List(values)) => values
2158 .iter()
2159 .map(|value| value.display())
2160 .filter(|value| !value.is_empty())
2161 .collect(),
2162 Some(value) => {
2163 let value = value.display();
2164 if value.is_empty() {
2165 Vec::new()
2166 } else {
2167 vec![value]
2168 }
2169 }
2170 None => Vec::new(),
2171 };
2172 HttpTlsConfig {
2173 ca_bundle_path: string_option(tls, "ca_bundle_path"),
2174 client_cert_path: string_option(tls, "client_cert_path"),
2175 client_key_path: string_option(tls, "client_key_path"),
2176 client_identity_path: string_option(tls, "client_identity_path"),
2177 pinned_sha256,
2178 }
2179}
2180
2181fn parse_retry_statuses(options: &BTreeMap<String, VmValue>) -> Vec<u16> {
2182 match options.get("retry_on") {
2183 Some(VmValue::List(values)) => {
2184 let statuses: Vec<u16> = values
2185 .iter()
2186 .filter_map(|value| value.as_int())
2187 .filter(|status| (0..=u16::MAX as i64).contains(status))
2188 .map(|status| status as u16)
2189 .collect();
2190 if statuses.is_empty() {
2191 DEFAULT_RETRYABLE_STATUSES.to_vec()
2192 } else {
2193 statuses
2194 }
2195 }
2196 _ => DEFAULT_RETRYABLE_STATUSES.to_vec(),
2197 }
2198}
2199
2200fn parse_retry_methods(options: &BTreeMap<String, VmValue>) -> Vec<String> {
2201 match options.get("retry_methods") {
2202 Some(VmValue::List(values)) => {
2203 let methods: Vec<String> = values
2204 .iter()
2205 .map(|value| value.display().trim().to_ascii_uppercase())
2206 .filter(|value| !value.is_empty())
2207 .collect();
2208 if methods.is_empty() {
2209 DEFAULT_RETRYABLE_METHODS
2210 .iter()
2211 .map(|method| (*method).to_string())
2212 .collect()
2213 } else {
2214 methods
2215 }
2216 }
2217 _ => DEFAULT_RETRYABLE_METHODS
2218 .iter()
2219 .map(|method| (*method).to_string())
2220 .collect(),
2221 }
2222}
2223
2224fn parse_http_options(options: &BTreeMap<String, VmValue>) -> HttpRequestConfig {
2225 let total_timeout_ms = vm_get_int_option(options, "total_timeout_ms", -1);
2226 let total_timeout_ms = if total_timeout_ms >= 0 {
2227 total_timeout_ms as u64
2228 } else {
2229 vm_get_int_option_prefer(options, "timeout_ms", "timeout", DEFAULT_TIMEOUT_MS as i64).max(0)
2230 as u64
2231 };
2232 let retry_options = options.get("retry").and_then(|value| value.as_dict());
2233 let retry_max = retry_options
2234 .and_then(|retry| retry.get("max"))
2235 .and_then(|value| value.as_int())
2236 .unwrap_or_else(|| vm_get_int_option(options, "retries", 0))
2237 .max(0) as u32;
2238 let retry_backoff_ms = retry_options
2239 .and_then(|retry| retry.get("backoff_ms"))
2240 .and_then(|value| value.as_int())
2241 .unwrap_or_else(|| vm_get_int_option(options, "backoff", DEFAULT_BACKOFF_MS as i64))
2242 .max(0) as u64;
2243 let respect_retry_after = vm_get_bool_option(options, "respect_retry_after", true);
2244 let follow_redirects = vm_get_bool_option(options, "follow_redirects", true);
2245 let max_redirects = vm_get_int_option(options, "max_redirects", 10).max(0) as usize;
2246
2247 HttpRequestConfig {
2248 total_timeout_ms,
2249 connect_timeout_ms: vm_get_optional_int_option(options, "connect_timeout_ms"),
2250 read_timeout_ms: vm_get_optional_int_option(options, "read_timeout_ms"),
2251 retry: RetryConfig {
2252 max: retry_max,
2253 backoff_ms: retry_backoff_ms,
2254 retryable_statuses: parse_retry_statuses(options),
2255 retryable_methods: parse_retry_methods(options),
2256 respect_retry_after,
2257 },
2258 follow_redirects,
2259 max_redirects,
2260 proxy: parse_proxy_config(options),
2261 tls: parse_tls_config(options),
2262 decompress: vm_get_bool_option(options, "decompress", true),
2263 }
2264}
2265
2266fn http_client_key(config: &HttpRequestConfig) -> String {
2267 format!(
2268 "follow_redirects={};max_redirects={};connect_timeout={:?};read_timeout={:?};proxy={};proxy_auth={};no_proxy={};ca={};client_cert={};client_key={};identity={};pins={};decompress={}",
2269 config.follow_redirects,
2270 config.max_redirects,
2271 config.connect_timeout_ms,
2272 config.read_timeout_ms,
2273 config
2274 .proxy
2275 .as_ref()
2276 .map(|proxy| proxy.url.as_str())
2277 .unwrap_or(""),
2278 config
2279 .proxy
2280 .as_ref()
2281 .and_then(|proxy| proxy.auth.as_ref())
2282 .map(|(user, _)| user.as_str())
2283 .unwrap_or(""),
2284 config
2285 .proxy
2286 .as_ref()
2287 .and_then(|proxy| proxy.no_proxy.as_deref())
2288 .unwrap_or(""),
2289 config.tls.ca_bundle_path.as_deref().unwrap_or(""),
2290 config.tls.client_cert_path.as_deref().unwrap_or(""),
2291 config.tls.client_key_path.as_deref().unwrap_or(""),
2292 config.tls.client_identity_path.as_deref().unwrap_or(""),
2293 config.tls.pinned_sha256.join(","),
2294 config.decompress,
2295 )
2296}
2297
2298fn build_http_client(config: &HttpRequestConfig) -> Result<reqwest::Client, VmError> {
2299 let redirect_policy = if config.follow_redirects {
2300 let max_redirects = config.max_redirects;
2301 reqwest::redirect::Policy::custom(move |attempt| {
2302 if attempt.previous().len() >= max_redirects {
2303 attempt.error("too many redirects")
2304 } else if crate::egress::redirect_url_allowed("http_redirect", attempt.url().as_str()) {
2305 attempt.follow()
2306 } else {
2307 attempt.error("egress policy blocked redirect target")
2308 }
2309 })
2310 } else {
2311 reqwest::redirect::Policy::none()
2312 };
2313
2314 let mut builder = reqwest::Client::builder().redirect(redirect_policy);
2315 if let Some(ms) = config.connect_timeout_ms {
2316 builder = builder.connect_timeout(Duration::from_millis(ms));
2317 }
2318 if let Some(ms) = config.read_timeout_ms {
2319 builder = builder.read_timeout(Duration::from_millis(ms));
2320 }
2321 if !config.decompress {
2322 builder = builder.no_gzip().no_brotli().no_deflate().no_zstd();
2323 }
2324 if let Some(proxy_config) = &config.proxy {
2325 let mut proxy = reqwest::Proxy::all(&proxy_config.url)
2326 .map_err(|e| vm_error(format!("http: invalid proxy '{}': {e}", proxy_config.url)))?;
2327 if let Some((user, pass)) = &proxy_config.auth {
2328 proxy = proxy.basic_auth(user, pass);
2329 }
2330 if let Some(no_proxy) = &proxy_config.no_proxy {
2331 proxy = proxy.no_proxy(reqwest::NoProxy::from_string(no_proxy));
2332 }
2333 builder = builder.proxy(proxy);
2334 }
2335 builder = configure_tls(builder, &config.tls)?;
2336 builder
2337 .build()
2338 .map_err(|e| vm_error(format!("http: failed to build client: {e}")))
2339}
2340
2341fn configure_tls(
2342 mut builder: reqwest::ClientBuilder,
2343 tls: &HttpTlsConfig,
2344) -> Result<reqwest::ClientBuilder, VmError> {
2345 if let Some(path) = &tls.ca_bundle_path {
2346 let resolved = resolve_http_path("http tls", path, crate::stdlib::sandbox::FsAccess::Read)?;
2347 let bytes = std::fs::read(&resolved).map_err(|error| {
2348 vm_error(format!(
2349 "http: failed to read CA bundle {}: {error}",
2350 resolved.display()
2351 ))
2352 })?;
2353 match reqwest::Certificate::from_pem_bundle(&bytes) {
2354 Ok(certs) => {
2355 for cert in certs {
2356 builder = builder.add_root_certificate(cert);
2357 }
2358 }
2359 Err(pem_error) => {
2360 let cert = reqwest::Certificate::from_der(&bytes).map_err(|der_error| {
2361 vm_error(format!(
2362 "http: failed to parse CA bundle {} as PEM ({pem_error}) or DER ({der_error})",
2363 resolved.display()
2364 ))
2365 })?;
2366 builder = builder.add_root_certificate(cert);
2367 }
2368 }
2369 }
2370
2371 if let Some(path) = &tls.client_identity_path {
2372 let resolved = resolve_http_path("http tls", path, crate::stdlib::sandbox::FsAccess::Read)?;
2373 let bytes = std::fs::read(&resolved).map_err(|error| {
2374 vm_error(format!(
2375 "http: failed to read client identity {}: {error}",
2376 resolved.display()
2377 ))
2378 })?;
2379 let identity = reqwest::Identity::from_pem(&bytes).map_err(|error| {
2380 vm_error(format!(
2381 "http: failed to parse client identity {}: {error}",
2382 resolved.display()
2383 ))
2384 })?;
2385 builder = builder.identity(identity);
2386 } else if let Some(cert_path) = &tls.client_cert_path {
2387 let cert = {
2388 let resolved = resolve_http_path(
2389 "http tls",
2390 cert_path,
2391 crate::stdlib::sandbox::FsAccess::Read,
2392 )?;
2393 std::fs::read(&resolved).map_err(|error| {
2394 vm_error(format!(
2395 "http: failed to read client certificate {}: {error}",
2396 resolved.display()
2397 ))
2398 })?
2399 };
2400 let mut identity_pem = cert;
2401 if let Some(key_path) = &tls.client_key_path {
2402 let resolved =
2403 resolve_http_path("http tls", key_path, crate::stdlib::sandbox::FsAccess::Read)?;
2404 let key = std::fs::read(&resolved).map_err(|error| {
2405 vm_error(format!(
2406 "http: failed to read client key {}: {error}",
2407 resolved.display()
2408 ))
2409 })?;
2410 identity_pem.extend_from_slice(b"\n");
2411 identity_pem.extend_from_slice(&key);
2412 }
2413 let identity = reqwest::Identity::from_pem(&identity_pem)
2414 .map_err(|error| vm_error(format!("http: failed to parse client identity: {error}")))?;
2415 builder = builder.identity(identity);
2416 }
2417
2418 if !tls.pinned_sha256.is_empty() {
2419 builder = builder.tls_info(true);
2420 }
2421 Ok(builder)
2422}
2423
2424fn pooled_http_client(config: &HttpRequestConfig) -> Result<reqwest::Client, VmError> {
2425 let key = http_client_key(config);
2426 if let Some(client) = HTTP_CLIENTS.with(|clients| clients.borrow().get(&key).cloned()) {
2427 return Ok(client);
2428 }
2429
2430 let client = build_http_client(config)?;
2431 HTTP_CLIENTS.with(|clients| {
2432 clients.borrow_mut().insert(key, client.clone());
2433 });
2434 Ok(client)
2435}
2436
/// Normalizes a user-supplied certificate pin for comparison.
///
/// Surrounding whitespace is trimmed, one leading `sha256/` or `sha256:`
/// prefix is removed, and all `:` separators are dropped. If what remains is
/// non-empty and consists only of ASCII hex digits it is lower-cased;
/// otherwise it is returned with its case untouched.
fn normalize_pin(value: &str) -> String {
    let body = value.trim();
    let body = ["sha256/", "sha256:"]
        .iter()
        .find_map(|prefix| body.strip_prefix(*prefix))
        .unwrap_or(body);

    let mut compact: String = body.chars().filter(|ch| *ch != ':').collect();
    let looks_hex = !compact.is_empty() && compact.chars().all(|ch| ch.is_ascii_hexdigit());
    if looks_hex {
        compact.make_ascii_lowercase();
    }
    compact
}
2450
2451fn verify_tls_pin(response: &reqwest::Response, pins: &[String]) -> Result<(), VmError> {
2452 if pins.is_empty() {
2453 return Ok(());
2454 }
2455 let Some(info) = response.extensions().get::<reqwest::tls::TlsInfo>() else {
2456 return Err(vm_error(
2457 "http: TLS pinning requested but TLS info is unavailable",
2458 ));
2459 };
2460 let Some(cert_der) = info.peer_certificate() else {
2461 return Err(vm_error(
2462 "http: TLS pinning requested but no peer certificate was presented",
2463 ));
2464 };
2465 let (_, cert) = X509Certificate::from_der(cert_der)
2466 .map_err(|error| vm_error(format!("http: failed to parse peer certificate: {error}")))?;
2467 let digest = Sha256::digest(cert.tbs_certificate.subject_pki.raw);
2468 let hex_pin = hex::encode(digest.as_slice());
2469 let base64_pin = base64::engine::general_purpose::STANDARD.encode(digest);
2470 let base64url_pin = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(digest);
2471 let wanted = pins
2472 .iter()
2473 .map(|pin| normalize_pin(pin))
2474 .collect::<Vec<_>>();
2475 if wanted
2476 .iter()
2477 .any(|pin| pin == &hex_pin || pin == &base64_pin || pin == &base64url_pin)
2478 {
2479 Ok(())
2480 } else {
2481 Err(vm_error("http: TLS SPKI pin mismatch"))
2482 }
2483}
2484
2485fn parse_http_request_parts(
2486 method: &str,
2487 options: &BTreeMap<String, VmValue>,
2488) -> Result<HttpRequestParts, VmError> {
2489 let req_method = method
2490 .parse::<reqwest::Method>()
2491 .map_err(|e| vm_error(format!("http: invalid method '{method}': {e}")))?;
2492
2493 let mut header_map = reqwest::header::HeaderMap::new();
2494 let mut recorded_headers = BTreeMap::new();
2495
2496 if let Some(auth_val) = options.get("auth") {
2497 match auth_val {
2498 VmValue::String(s) => {
2499 let hv = reqwest::header::HeaderValue::from_str(s)
2500 .map_err(|e| vm_error(format!("http: invalid auth header value: {e}")))?;
2501 header_map.insert(reqwest::header::AUTHORIZATION, hv);
2502 recorded_headers.insert(
2503 "authorization".to_string(),
2504 VmValue::String(Rc::from(s.as_ref())),
2505 );
2506 }
2507 VmValue::Dict(d) => {
2508 if let Some(bearer) = d.get("bearer") {
2509 let token = bearer.display();
2510 let authorization = format!("Bearer {token}");
2511 let hv = reqwest::header::HeaderValue::from_str(&authorization)
2512 .map_err(|e| vm_error(format!("http: invalid bearer token: {e}")))?;
2513 header_map.insert(reqwest::header::AUTHORIZATION, hv);
2514 recorded_headers.insert(
2515 "authorization".to_string(),
2516 VmValue::String(Rc::from(authorization)),
2517 );
2518 } else if let Some(VmValue::Dict(basic)) = d.get("basic") {
2519 let user = basic.get("user").map(|v| v.display()).unwrap_or_default();
2520 let password = basic
2521 .get("password")
2522 .map(|v| v.display())
2523 .unwrap_or_default();
2524 use base64::Engine;
2525 let encoded = base64::engine::general_purpose::STANDARD
2526 .encode(format!("{user}:{password}"));
2527 let authorization = format!("Basic {encoded}");
2528 let hv = reqwest::header::HeaderValue::from_str(&authorization)
2529 .map_err(|e| vm_error(format!("http: invalid basic auth: {e}")))?;
2530 header_map.insert(reqwest::header::AUTHORIZATION, hv);
2531 recorded_headers.insert(
2532 "authorization".to_string(),
2533 VmValue::String(Rc::from(authorization)),
2534 );
2535 }
2536 }
2537 _ => {}
2538 }
2539 }
2540
2541 if let Some(VmValue::Dict(hdrs)) = options.get("headers") {
2542 for (k, v) in hdrs.iter() {
2543 let name = reqwest::header::HeaderName::from_bytes(k.as_bytes())
2544 .map_err(|e| vm_error(format!("http: invalid header name '{k}': {e}")))?;
2545 let val = reqwest::header::HeaderValue::from_str(&v.display())
2546 .map_err(|e| vm_error(format!("http: invalid header value for '{k}': {e}")))?;
2547 header_map.insert(name, val);
2548 recorded_headers.insert(
2549 k.to_ascii_lowercase(),
2550 VmValue::String(Rc::from(v.display())),
2551 );
2552 }
2553 }
2554
2555 let multipart = parse_multipart_request(options)?;
2556 if multipart.is_some() {
2557 if options.contains_key("body") {
2558 return Err(vm_error(
2559 "http: body and multipart options are mutually exclusive",
2560 ));
2561 }
2562 recorded_headers.insert(
2563 "content-type".to_string(),
2564 VmValue::String(Rc::from(format!(
2565 "multipart/form-data; boundary={MULTIPART_MOCK_BOUNDARY}"
2566 ))),
2567 );
2568 }
2569
2570 Ok(HttpRequestParts {
2571 method: req_method,
2572 headers: header_map,
2573 recorded_headers,
2574 body: if multipart.is_some() {
2575 multipart.as_ref().map(|request| request.mock_body.clone())
2576 } else {
2577 options.get("body").map(|v| v.display())
2578 },
2579 multipart,
2580 })
2581}
2582
2583fn final_http_url(
2584 url: &str,
2585 options: &BTreeMap<String, VmValue>,
2586 builtin: &str,
2587) -> Result<String, VmError> {
2588 let Some(query) = options.get("query").and_then(VmValue::as_dict) else {
2589 return Ok(url.to_string());
2590 };
2591 let mut parsed = url::Url::parse(url)
2592 .map_err(|error| vm_error(format!("{builtin}: invalid URL '{url}': {error}")))?;
2593 {
2594 let mut pairs = parsed.query_pairs_mut();
2595 for (key, value) in query.iter() {
2596 if !matches!(value, VmValue::Nil) {
2597 pairs.append_pair(key, &value.display());
2598 }
2599 }
2600 }
2601 Ok(parsed.to_string())
2602}
2603
2604fn session_from_options(options: &BTreeMap<String, VmValue>) -> Option<String> {
2605 options
2606 .get("session")
2607 .and_then(|value| handle_from_value(value, "http_request").ok())
2608}
2609
2610fn parse_mock_stream_event(value: &VmValue) -> MockStreamEvent {
2611 match value {
2612 VmValue::Dict(dict) => MockStreamEvent {
2613 event_type: dict
2614 .get("event")
2615 .or_else(|| dict.get("type"))
2616 .map(|value| value.display())
2617 .filter(|value| !value.is_empty())
2618 .unwrap_or_else(|| "message".to_string()),
2619 data: dict
2620 .get("data")
2621 .map(|value| value.display())
2622 .unwrap_or_default(),
2623 id: dict
2624 .get("id")
2625 .map(|value| value.display())
2626 .filter(|value| !value.is_empty()),
2627 retry_ms: dict.get("retry_ms").and_then(|value| value.as_int()),
2628 },
2629 _ => MockStreamEvent {
2630 event_type: "message".to_string(),
2631 data: value.display(),
2632 id: None,
2633 retry_ms: None,
2634 },
2635 }
2636}
2637
2638fn parse_mock_stream_events(value: Option<&VmValue>) -> Vec<MockStreamEvent> {
2639 let Some(value) = value else {
2640 return Vec::new();
2641 };
2642 match value {
2643 VmValue::Dict(dict) => dict
2644 .get("events")
2645 .and_then(|events| match events {
2646 VmValue::List(items) => Some(items.iter().map(parse_mock_stream_event).collect()),
2647 _ => None,
2648 })
2649 .unwrap_or_default(),
2650 VmValue::List(items) => items.iter().map(parse_mock_stream_event).collect(),
2651 other => vec![parse_mock_stream_event(other)],
2652 }
2653}
2654
2655fn sse_event_value(event: &MockStreamEvent) -> VmValue {
2656 let mut dict = BTreeMap::new();
2657 dict.insert("type".to_string(), VmValue::String(Rc::from("event")));
2658 dict.insert(
2659 "event".to_string(),
2660 VmValue::String(Rc::from(event.event_type.as_str())),
2661 );
2662 dict.insert(
2663 "data".to_string(),
2664 VmValue::String(Rc::from(event.data.as_str())),
2665 );
2666 dict.insert(
2667 "id".to_string(),
2668 event
2669 .id
2670 .as_deref()
2671 .map(|id| VmValue::String(Rc::from(id)))
2672 .unwrap_or(VmValue::Nil),
2673 );
2674 dict.insert(
2675 "retry_ms".to_string(),
2676 event.retry_ms.map(VmValue::Int).unwrap_or(VmValue::Nil),
2677 );
2678 VmValue::Dict(Rc::new(dict))
2679}
2680
2681fn sse_server_response_value(id: &str, handle: &SseServerHandle) -> VmValue {
2682 let mut dict = BTreeMap::new();
2683 dict.insert("id".to_string(), VmValue::String(Rc::from(id)));
2684 dict.insert(
2685 "type".to_string(),
2686 VmValue::String(Rc::from("sse_response")),
2687 );
2688 dict.insert("status".to_string(), VmValue::Int(handle.status));
2689 dict.insert(
2690 "headers".to_string(),
2691 VmValue::Dict(Rc::new(handle.headers.clone())),
2692 );
2693 dict.insert("body".to_string(), VmValue::Nil);
2694 dict.insert("streaming".to_string(), VmValue::Bool(true));
2695 dict.insert(
2696 "max_event_bytes".to_string(),
2697 VmValue::Int(handle.max_event_bytes as i64),
2698 );
2699 dict.insert(
2700 "max_buffered_events".to_string(),
2701 VmValue::Int(handle.max_buffered_events as i64),
2702 );
2703 VmValue::Dict(Rc::new(dict))
2704}
2705
2706fn default_sse_response_headers() -> BTreeMap<String, VmValue> {
2707 BTreeMap::from([
2708 (
2709 "content-type".to_string(),
2710 VmValue::String(Rc::from("text/event-stream; charset=utf-8")),
2711 ),
2712 (
2713 "cache-control".to_string(),
2714 VmValue::String(Rc::from("no-cache")),
2715 ),
2716 (
2717 "connection".to_string(),
2718 VmValue::String(Rc::from("keep-alive")),
2719 ),
2720 (
2721 "x-accel-buffering".to_string(),
2722 VmValue::String(Rc::from("no")),
2723 ),
2724 ])
2725}
2726
2727fn sse_response_headers(options: &BTreeMap<String, VmValue>) -> BTreeMap<String, VmValue> {
2728 let mut headers = default_sse_response_headers();
2729 if let Some(VmValue::Dict(custom)) = options.get("headers") {
2730 for (name, value) in custom.iter() {
2731 headers.retain(|existing, _| !existing.eq_ignore_ascii_case(name));
2732 headers.insert(name.clone(), VmValue::String(Rc::from(value.display())));
2733 }
2734 }
2735 if !headers
2736 .keys()
2737 .any(|name| name.eq_ignore_ascii_case("content-type"))
2738 {
2739 headers.insert(
2740 "content-type".to_string(),
2741 VmValue::String(Rc::from("text/event-stream; charset=utf-8")),
2742 );
2743 }
2744 headers
2745}
2746
2747fn validate_sse_field(field: &str, value: &str) -> Result<(), VmError> {
2748 if value.contains('\n') || value.contains('\r') {
2749 return Err(vm_error(format!(
2750 "sse_event: {field} must not contain newlines"
2751 )));
2752 }
2753 Ok(())
2754}
2755
/// Appends `value` to `frame` as one `field: <line>` row per line.
///
/// `\r\n` and lone `\r` are treated as `\n`. An empty value produces a single
/// row with an empty payload (`field: ` followed by a newline).
fn push_sse_multiline_field(frame: &mut String, field: &str, value: &str) {
    let text = value.replace("\r\n", "\n").replace('\r', "\n");
    // Splitting an empty string yields one empty segment, so the empty value
    // needs no special case: it still emits exactly one `field: ` row.
    for segment in text.split('\n') {
        frame.extend([field, ": ", segment, "\n"]);
    }
}
2770
2771fn vm_sse_event_frame(
2772 event: &VmValue,
2773 options: &BTreeMap<String, VmValue>,
2774) -> Result<String, VmError> {
2775 let mut frame = String::new();
2776 let mut has_event_payload = false;
2777
2778 match event {
2779 VmValue::Dict(dict) => {
2780 if let Some(comment) = dict.get("comment").or_else(|| options.get("comment")) {
2781 push_sse_comment(&mut frame, &comment.display());
2782 }
2783 if let Some(id) = dict.get("id").or_else(|| options.get("id")) {
2784 let id = id.display();
2785 validate_sse_field("id", &id)?;
2786 frame.push_str("id: ");
2787 frame.push_str(&id);
2788 frame.push('\n');
2789 }
2790 if let Some(event_type) = dict
2791 .get("event")
2792 .or_else(|| dict.get("name"))
2793 .or_else(|| options.get("event"))
2794 {
2795 let event_type = event_type.display();
2796 validate_sse_field("event", &event_type)?;
2797 frame.push_str("event: ");
2798 frame.push_str(&event_type);
2799 frame.push('\n');
2800 has_event_payload = true;
2801 }
2802 if let Some(retry) = dict
2803 .get("retry")
2804 .or_else(|| dict.get("retry_ms"))
2805 .or_else(|| options.get("retry"))
2806 .or_else(|| options.get("retry_ms"))
2807 {
2808 let retry_ms = retry.as_int().ok_or_else(|| {
2809 vm_error("sse_event: retry/retry_ms must be a non-negative integer")
2810 })?;
2811 if retry_ms < 0 {
2812 return Err(vm_error(
2813 "sse_event: retry/retry_ms must be a non-negative integer",
2814 ));
2815 }
2816 frame.push_str("retry: ");
2817 frame.push_str(&retry_ms.to_string());
2818 frame.push('\n');
2819 has_event_payload = true;
2820 }
2821 if let Some(data) = dict.get("data").or_else(|| options.get("data")) {
2822 push_sse_multiline_field(&mut frame, "data", &data.display());
2823 has_event_payload = true;
2824 } else if !frame.is_empty() && !dict.contains_key("comment") {
2825 push_sse_multiline_field(&mut frame, "data", "");
2826 has_event_payload = true;
2827 }
2828 }
2829 other => {
2830 if let Some(comment) = options.get("comment") {
2831 push_sse_comment(&mut frame, &comment.display());
2832 }
2833 if let Some(id) = options.get("id") {
2834 let id = id.display();
2835 validate_sse_field("id", &id)?;
2836 frame.push_str("id: ");
2837 frame.push_str(&id);
2838 frame.push('\n');
2839 }
2840 if let Some(event_type) = options.get("event") {
2841 let event_type = event_type.display();
2842 validate_sse_field("event", &event_type)?;
2843 frame.push_str("event: ");
2844 frame.push_str(&event_type);
2845 frame.push('\n');
2846 }
2847 if let Some(retry) = options.get("retry").or_else(|| options.get("retry_ms")) {
2848 let retry_ms = retry.as_int().ok_or_else(|| {
2849 vm_error("sse_event: retry/retry_ms must be a non-negative integer")
2850 })?;
2851 if retry_ms < 0 {
2852 return Err(vm_error(
2853 "sse_event: retry/retry_ms must be a non-negative integer",
2854 ));
2855 }
2856 frame.push_str("retry: ");
2857 frame.push_str(&retry_ms.to_string());
2858 frame.push('\n');
2859 }
2860 push_sse_multiline_field(&mut frame, "data", &other.display());
2861 has_event_payload = true;
2862 }
2863 }
2864
2865 if frame.is_empty() || !has_event_payload && !frame.starts_with(':') {
2866 push_sse_comment(&mut frame, "");
2867 }
2868 frame.push('\n');
2869 Ok(frame)
2870}
2871
/// Appends `comment` to `frame` as SSE comment rows.
///
/// `\r\n` and lone `\r` are treated as `\n`. An empty comment is written as a
/// bare `:` row; otherwise every line becomes `: <line>`.
fn push_sse_comment(frame: &mut String, comment: &str) {
    let text = comment.replace("\r\n", "\n").replace('\r', "\n");
    match text.as_str() {
        "" => frame.push_str(":\n"),
        lines => lines.split('\n').for_each(|line| {
            frame.push_str(": ");
            frame.push_str(line);
            frame.push('\n');
        }),
    }
}
2884
2885fn vm_sse_server_response(options: &BTreeMap<String, VmValue>) -> Result<VmValue, VmError> {
2886 let id = next_transport_handle("sse-server");
2887 let status = options
2888 .get("status")
2889 .and_then(|value| value.as_int())
2890 .unwrap_or(200)
2891 .clamp(100, 599);
2892 let handle = SseServerHandle {
2893 status,
2894 headers: sse_response_headers(options),
2895 frames: VecDeque::new(),
2896 max_event_bytes: transport_limit_option(
2897 options,
2898 "max_event_bytes",
2899 DEFAULT_MAX_MESSAGE_BYTES,
2900 )
2901 .max(1),
2902 max_buffered_events: transport_limit_option(
2903 options,
2904 "max_buffered_events",
2905 DEFAULT_MAX_STREAM_EVENTS,
2906 )
2907 .max(1),
2908 sent_events: 0,
2909 flushed_events: 0,
2910 closed: false,
2911 disconnected: false,
2912 cancelled: false,
2913 cancel_reason: None,
2914 };
2915 let value = sse_server_response_value(&id, &handle);
2916 SSE_SERVER_HANDLES.with(|handles| {
2917 let mut handles = handles.borrow_mut();
2918 if handles.len() >= MAX_SSE_SERVER_STREAMS {
2919 return Err(vm_error(format!(
2920 "sse_server_response: maximum open streams ({MAX_SSE_SERVER_STREAMS}) reached"
2921 )));
2922 }
2923 handles.insert(id, handle);
2924 Ok(())
2925 })?;
2926 Ok(value)
2927}
2928
2929fn sse_server_status_value(id: &str, handle: &SseServerHandle) -> VmValue {
2930 let mut dict = BTreeMap::new();
2931 dict.insert("id".to_string(), VmValue::String(Rc::from(id)));
2932 dict.insert("status".to_string(), VmValue::Int(handle.status));
2933 dict.insert(
2934 "headers".to_string(),
2935 VmValue::Dict(Rc::new(handle.headers.clone())),
2936 );
2937 dict.insert(
2938 "buffered_events".to_string(),
2939 VmValue::Int(handle.frames.len() as i64),
2940 );
2941 dict.insert(
2942 "sent_events".to_string(),
2943 VmValue::Int(handle.sent_events as i64),
2944 );
2945 dict.insert(
2946 "flushed_events".to_string(),
2947 VmValue::Int(handle.flushed_events as i64),
2948 );
2949 dict.insert("closed".to_string(), VmValue::Bool(handle.closed));
2950 dict.insert(
2951 "disconnected".to_string(),
2952 VmValue::Bool(handle.disconnected),
2953 );
2954 dict.insert("cancelled".to_string(), VmValue::Bool(handle.cancelled));
2955 dict.insert(
2956 "cancel_reason".to_string(),
2957 handle
2958 .cancel_reason
2959 .as_deref()
2960 .map(|reason| VmValue::String(Rc::from(reason)))
2961 .unwrap_or(VmValue::Nil),
2962 );
2963 dict.insert(
2964 "max_event_bytes".to_string(),
2965 VmValue::Int(handle.max_event_bytes as i64),
2966 );
2967 dict.insert(
2968 "max_buffered_events".to_string(),
2969 VmValue::Int(handle.max_buffered_events as i64),
2970 );
2971 VmValue::Dict(Rc::new(dict))
2972}
2973
2974fn vm_sse_server_status(stream_id: &str) -> Result<VmValue, VmError> {
2975 SSE_SERVER_HANDLES.with(|handles| {
2976 handles
2977 .borrow()
2978 .get(stream_id)
2979 .map(|handle| sse_server_status_value(stream_id, handle))
2980 .ok_or_else(|| vm_error(format!("sse_server_status: unknown stream '{stream_id}'")))
2981 })
2982}
2983
2984fn vm_sse_server_send(
2985 stream_id: &str,
2986 event: &VmValue,
2987 options: &BTreeMap<String, VmValue>,
2988) -> Result<VmValue, VmError> {
2989 let frame = vm_sse_event_frame(event, options)?;
2990 SSE_SERVER_HANDLES.with(|handles| {
2991 let mut handles = handles.borrow_mut();
2992 let handle = handles
2993 .get_mut(stream_id)
2994 .ok_or_else(|| vm_error(format!("sse_server_send: unknown stream '{stream_id}'")))?;
2995 if handle.closed || handle.cancelled || handle.disconnected {
2996 return Ok(VmValue::Bool(false));
2997 }
2998 if frame.len() > handle.max_event_bytes {
2999 return Err(vm_error(format!(
3000 "sse_server_send: event exceeded max_event_bytes ({})",
3001 handle.max_event_bytes
3002 )));
3003 }
3004 if handle.frames.len() >= handle.max_buffered_events {
3005 return Err(vm_error(format!(
3006 "sse_server_send: buffered events exceeded max_buffered_events ({})",
3007 handle.max_buffered_events
3008 )));
3009 }
3010 handle.frames.push_back(frame);
3011 handle.sent_events += 1;
3012 Ok(VmValue::Bool(true))
3013 })
3014}
3015
3016fn vm_sse_server_heartbeat(stream_id: &str, comment: Option<&VmValue>) -> Result<VmValue, VmError> {
3017 let mut frame = String::new();
3018 push_sse_comment(
3019 &mut frame,
3020 &comment
3021 .map(|value| value.display())
3022 .unwrap_or_else(|| "heartbeat".to_string()),
3023 );
3024 frame.push('\n');
3025 SSE_SERVER_HANDLES.with(|handles| {
3026 let mut handles = handles.borrow_mut();
3027 let handle = handles.get_mut(stream_id).ok_or_else(|| {
3028 vm_error(format!(
3029 "sse_server_heartbeat: unknown stream '{stream_id}'"
3030 ))
3031 })?;
3032 if handle.closed || handle.cancelled || handle.disconnected {
3033 return Ok(VmValue::Bool(false));
3034 }
3035 if frame.len() > handle.max_event_bytes {
3036 return Err(vm_error(format!(
3037 "sse_server_heartbeat: event exceeded max_event_bytes ({})",
3038 handle.max_event_bytes
3039 )));
3040 }
3041 if handle.frames.len() >= handle.max_buffered_events {
3042 return Err(vm_error(format!(
3043 "sse_server_heartbeat: buffered events exceeded max_buffered_events ({})",
3044 handle.max_buffered_events
3045 )));
3046 }
3047 handle.frames.push_back(frame);
3048 handle.sent_events += 1;
3049 Ok(VmValue::Bool(true))
3050 })
3051}
3052
3053fn vm_sse_server_flush(stream_id: &str) -> Result<VmValue, VmError> {
3054 SSE_SERVER_HANDLES.with(|handles| {
3055 let mut handles = handles.borrow_mut();
3056 let handle = handles
3057 .get_mut(stream_id)
3058 .ok_or_else(|| vm_error(format!("sse_server_flush: unknown stream '{stream_id}'")))?;
3059 if handle.disconnected || handle.cancelled {
3060 return Ok(VmValue::Bool(false));
3061 }
3062 handle.flushed_events = handle.sent_events;
3063 Ok(VmValue::Bool(!handle.closed))
3064 })
3065}
3066
3067fn vm_sse_server_close(stream_id: &str) -> Result<VmValue, VmError> {
3068 SSE_SERVER_HANDLES.with(|handles| {
3069 let mut handles = handles.borrow_mut();
3070 let handle = handles
3071 .get_mut(stream_id)
3072 .ok_or_else(|| vm_error(format!("sse_server_close: unknown stream '{stream_id}'")))?;
3073 if handle.closed {
3074 return Ok(VmValue::Bool(false));
3075 }
3076 handle.closed = true;
3077 Ok(VmValue::Bool(true))
3078 })
3079}
3080
3081fn vm_sse_server_cancel(stream_id: &str, reason: Option<&VmValue>) -> Result<VmValue, VmError> {
3082 SSE_SERVER_HANDLES.with(|handles| {
3083 let mut handles = handles.borrow_mut();
3084 let handle = handles
3085 .get_mut(stream_id)
3086 .ok_or_else(|| vm_error(format!("sse_server_cancel: unknown stream '{stream_id}'")))?;
3087 if handle.cancelled {
3088 return Ok(VmValue::Bool(false));
3089 }
3090 handle.cancelled = true;
3091 handle.closed = true;
3092 handle.cancel_reason = reason
3093 .map(|value| value.display())
3094 .filter(|value| !value.is_empty());
3095 Ok(VmValue::Bool(true))
3096 })
3097}
3098
3099fn vm_sse_server_observed_bool(
3100 stream_id: &str,
3101 builtin: &str,
3102 predicate: impl Fn(&SseServerHandle) -> bool,
3103) -> Result<VmValue, VmError> {
3104 SSE_SERVER_HANDLES.with(|handles| {
3105 handles
3106 .borrow()
3107 .get(stream_id)
3108 .map(|handle| VmValue::Bool(predicate(handle)))
3109 .ok_or_else(|| vm_error(format!("{builtin}: unknown stream '{stream_id}'")))
3110 })
3111}
3112
3113fn vm_sse_server_mock_receive(stream_id: &str) -> Result<VmValue, VmError> {
3114 SSE_SERVER_HANDLES.with(|handles| {
3115 let mut handles = handles.borrow_mut();
3116 let handle = handles.get_mut(stream_id).ok_or_else(|| {
3117 vm_error(format!(
3118 "sse_server_mock_receive: unknown stream '{stream_id}'"
3119 ))
3120 })?;
3121 if let Some(frame) = handle.frames.pop_front() {
3122 return Ok(sse_server_mock_frame_value(&frame));
3123 }
3124 if handle.closed || handle.cancelled || handle.disconnected {
3125 return Ok(sse_server_closed_event());
3126 }
3127 Ok(timeout_event())
3128 })
3129}
3130
3131fn vm_sse_server_mock_disconnect(stream_id: &str) -> Result<VmValue, VmError> {
3132 SSE_SERVER_HANDLES.with(|handles| {
3133 let mut handles = handles.borrow_mut();
3134 let handle = handles.get_mut(stream_id).ok_or_else(|| {
3135 vm_error(format!(
3136 "sse_server_mock_disconnect: unknown stream '{stream_id}'"
3137 ))
3138 })?;
3139 if handle.disconnected {
3140 return Ok(VmValue::Bool(false));
3141 }
3142 handle.disconnected = true;
3143 handle.closed = true;
3144 Ok(VmValue::Bool(true))
3145 })
3146}
3147
3148fn sse_server_mock_frame_value(frame: &str) -> VmValue {
3149 let mut event = MockStreamEvent {
3150 event_type: "message".to_string(),
3151 data: String::new(),
3152 id: None,
3153 retry_ms: None,
3154 };
3155 let mut data_lines = Vec::new();
3156 let mut comments = Vec::new();
3157 for raw in frame.lines() {
3158 if raw.is_empty() {
3159 continue;
3160 }
3161 if let Some(comment) = raw.strip_prefix(':') {
3162 comments.push(comment.strip_prefix(' ').unwrap_or(comment).to_string());
3163 continue;
3164 }
3165 let (field, value) = raw.split_once(':').unwrap_or((raw, ""));
3166 let value = value.strip_prefix(' ').unwrap_or(value);
3167 match field {
3168 "event" => event.event_type = value.to_string(),
3169 "data" => data_lines.push(value.to_string()),
3170 "id" => event.id = Some(value.to_string()).filter(|value| !value.is_empty()),
3171 "retry" => event.retry_ms = value.parse::<i64>().ok(),
3172 _ => {}
3173 }
3174 }
3175 if !data_lines.is_empty() {
3176 event.data = data_lines.join("\n");
3177 }
3178 let mut value = if comments.is_empty() || !data_lines.is_empty() {
3179 sse_event_value(&event)
3180 } else {
3181 let mut dict = BTreeMap::new();
3182 dict.insert("type".to_string(), VmValue::String(Rc::from("comment")));
3183 dict.insert(
3184 "comment".to_string(),
3185 VmValue::String(Rc::from(comments.join("\n"))),
3186 );
3187 VmValue::Dict(Rc::new(dict))
3188 };
3189 if let VmValue::Dict(dict) = &mut value {
3190 let mut owned = (**dict).clone();
3191 owned.insert("raw".to_string(), VmValue::String(Rc::from(frame)));
3192 value = VmValue::Dict(Rc::new(owned));
3193 }
3194 value
3195}
3196
3197fn real_sse_event_value(event: SseEvent) -> VmValue {
3198 match event {
3199 SseEvent::Open => {
3200 let mut dict = BTreeMap::new();
3201 dict.insert("type".to_string(), VmValue::String(Rc::from("open")));
3202 VmValue::Dict(Rc::new(dict))
3203 }
3204 SseEvent::Message(message) => {
3205 let retry_ms = message.retry.map(|retry| retry.as_millis() as i64);
3206 sse_event_value(&MockStreamEvent {
3207 event_type: if message.event.is_empty() {
3208 "message".to_string()
3209 } else {
3210 message.event
3211 },
3212 data: message.data,
3213 id: if message.id.is_empty() {
3214 None
3215 } else {
3216 Some(message.id)
3217 },
3218 retry_ms,
3219 })
3220 }
3221 }
3222}
3223
3224fn consume_sse_mock(url: &str) -> Option<Vec<MockStreamEvent>> {
3225 SSE_MOCKS.with(|mocks| {
3226 mocks
3227 .borrow()
3228 .iter()
3229 .find(|mock| url_matches(&mock.url_pattern, url))
3230 .map(|mock| mock.events.clone())
3231 })
3232}
3233
3234fn parse_ws_message(value: &VmValue) -> MockWsMessage {
3235 match value {
3236 VmValue::Dict(dict) => {
3237 let message_type = dict
3238 .get("type")
3239 .map(|value| value.display())
3240 .filter(|value| !value.is_empty())
3241 .unwrap_or_else(|| "text".to_string());
3242 let data = if dict
3243 .get("base64")
3244 .and_then(|value| match value {
3245 VmValue::Bool(value) => Some(*value),
3246 _ => None,
3247 })
3248 .unwrap_or(false)
3249 {
3250 use base64::Engine;
3251 dict.get("data")
3252 .map(|value| value.display())
3253 .and_then(|data| base64::engine::general_purpose::STANDARD.decode(data).ok())
3254 .unwrap_or_default()
3255 } else {
3256 dict.get("data")
3257 .map(|value| value.display().into_bytes())
3258 .unwrap_or_default()
3259 };
3260 MockWsMessage {
3261 message_type,
3262 data,
3263 close_code: dict
3264 .get("code")
3265 .and_then(|value| value.as_int())
3266 .map(|value| value as u16),
3267 close_reason: dict.get("reason").map(|value| value.display()),
3268 }
3269 }
3270 VmValue::Bytes(bytes) => MockWsMessage {
3271 message_type: "binary".to_string(),
3272 data: bytes.as_ref().clone(),
3273 close_code: None,
3274 close_reason: None,
3275 },
3276 other => MockWsMessage {
3277 message_type: "text".to_string(),
3278 data: other.display().into_bytes(),
3279 close_code: None,
3280 close_reason: None,
3281 },
3282 }
3283}
3284
3285fn parse_websocket_mock(value: Option<&VmValue>) -> (Vec<MockWsMessage>, bool) {
3286 let Some(value) = value else {
3287 return (Vec::new(), false);
3288 };
3289 match value {
3290 VmValue::Dict(dict) => {
3291 let echo = dict
3292 .get("echo")
3293 .and_then(|value| match value {
3294 VmValue::Bool(value) => Some(*value),
3295 _ => None,
3296 })
3297 .unwrap_or(false);
3298 let messages = dict
3299 .get("messages")
3300 .and_then(|messages| match messages {
3301 VmValue::List(items) => Some(items.iter().map(parse_ws_message).collect()),
3302 _ => None,
3303 })
3304 .unwrap_or_default();
3305 (messages, echo)
3306 }
3307 VmValue::List(items) => (items.iter().map(parse_ws_message).collect(), false),
3308 other => (vec![parse_ws_message(other)], false),
3309 }
3310}
3311
3312fn consume_websocket_mock(url: &str) -> Option<(Vec<MockWsMessage>, bool)> {
3313 WEBSOCKET_MOCKS.with(|mocks| {
3314 mocks
3315 .borrow()
3316 .iter()
3317 .find(|mock| url_matches(&mock.url_pattern, url))
3318 .map(|mock| (mock.messages.clone(), mock.echo))
3319 })
3320}
3321
3322fn ws_message_data(message: &MockWsMessage) -> String {
3323 match message.message_type.as_str() {
3324 "text" => String::from_utf8_lossy(&message.data).into_owned(),
3325 _ => {
3326 use base64::Engine;
3327 base64::engine::general_purpose::STANDARD.encode(&message.data)
3328 }
3329 }
3330}
3331
3332fn closed_event_with(code: Option<u16>, reason: Option<String>) -> VmValue {
3333 let mut dict = BTreeMap::new();
3334 dict.insert("type".to_string(), VmValue::String(Rc::from("close")));
3335 if let Some(code) = code {
3336 dict.insert("code".to_string(), VmValue::Int(i64::from(code)));
3337 }
3338 if let Some(reason) = reason {
3339 dict.insert("reason".to_string(), VmValue::String(Rc::from(reason)));
3340 }
3341 VmValue::Dict(Rc::new(dict))
3342}
3343
3344fn ws_event_value(message: MockWsMessage) -> VmValue {
3345 if message.message_type == "close" {
3346 return closed_event_with(message.close_code, message.close_reason);
3347 }
3348 let mut dict = BTreeMap::new();
3349 dict.insert(
3350 "type".to_string(),
3351 VmValue::String(Rc::from(message.message_type.as_str())),
3352 );
3353 match message.message_type.as_str() {
3354 "text" => {
3355 dict.insert(
3356 "data".to_string(),
3357 VmValue::String(Rc::from(String::from_utf8_lossy(&message.data).as_ref())),
3358 );
3359 }
3360 _ => {
3361 use base64::Engine;
3362 dict.insert(
3363 "data_base64".to_string(),
3364 VmValue::String(Rc::from(
3365 base64::engine::general_purpose::STANDARD
3366 .encode(&message.data)
3367 .as_str(),
3368 )),
3369 );
3370 }
3371 }
3372 VmValue::Dict(Rc::new(dict))
3373}
3374
3375fn real_ws_event_value(message: WsMessage) -> VmValue {
3376 match message {
3377 WsMessage::Text(text) => ws_event_value(MockWsMessage {
3378 message_type: "text".to_string(),
3379 data: text.as_bytes().to_vec(),
3380 close_code: None,
3381 close_reason: None,
3382 }),
3383 WsMessage::Binary(bytes) => ws_event_value(MockWsMessage {
3384 message_type: "binary".to_string(),
3385 data: bytes.to_vec(),
3386 close_code: None,
3387 close_reason: None,
3388 }),
3389 WsMessage::Ping(bytes) => ws_event_value(MockWsMessage {
3390 message_type: "ping".to_string(),
3391 data: bytes.to_vec(),
3392 close_code: None,
3393 close_reason: None,
3394 }),
3395 WsMessage::Pong(bytes) => ws_event_value(MockWsMessage {
3396 message_type: "pong".to_string(),
3397 data: bytes.to_vec(),
3398 close_code: None,
3399 close_reason: None,
3400 }),
3401 WsMessage::Close(frame) => match frame {
3402 Some(frame) => {
3403 closed_event_with(Some(u16::from(frame.code)), Some(frame.reason.to_string()))
3404 }
3405 None => closed_event(),
3406 },
3407 WsMessage::Frame(_) => VmValue::Nil,
3408 }
3409}
3410
3411fn transport_mock_call_value(call: &TransportMockCall) -> VmValue {
3412 let mut dict = BTreeMap::new();
3413 dict.insert(
3414 "kind".to_string(),
3415 VmValue::String(Rc::from(call.kind.as_str())),
3416 );
3417 dict.insert(
3418 "url".to_string(),
3419 VmValue::String(Rc::from(call.url.as_str())),
3420 );
3421 dict.insert(
3422 "handle".to_string(),
3423 call.handle
3424 .as_deref()
3425 .map(|handle| VmValue::String(Rc::from(handle)))
3426 .unwrap_or(VmValue::Nil),
3427 );
3428 dict.insert(
3429 "type".to_string(),
3430 call.message_type
3431 .as_deref()
3432 .map(|message_type| VmValue::String(Rc::from(message_type)))
3433 .unwrap_or(VmValue::Nil),
3434 );
3435 dict.insert(
3436 "data".to_string(),
3437 call.data
3438 .as_deref()
3439 .map(|data| VmValue::String(Rc::from(data)))
3440 .unwrap_or(VmValue::Nil),
3441 );
3442 VmValue::Dict(Rc::new(dict))
3443}
3444
3445fn method_is_retryable(retry: &RetryConfig, method: &reqwest::Method) -> bool {
3446 retry
3447 .retryable_methods
3448 .iter()
3449 .any(|candidate| candidate.eq_ignore_ascii_case(method.as_str()))
3450}
3451
3452fn should_retry_response(
3453 config: &HttpRequestConfig,
3454 method: &reqwest::Method,
3455 status: u16,
3456 attempt: u32,
3457) -> bool {
3458 attempt < config.retry.max
3459 && method_is_retryable(&config.retry, method)
3460 && config.retry.retryable_statuses.contains(&status)
3461}
3462
3463fn should_retry_transport(
3464 config: &HttpRequestConfig,
3465 method: &reqwest::Method,
3466 error: &reqwest::Error,
3467 attempt: u32,
3468) -> bool {
3469 attempt < config.retry.max
3470 && method_is_retryable(&config.retry, method)
3471 && (error.is_timeout() || error.is_connect())
3472}
3473
3474fn parse_retry_after_value(value: &str) -> Option<Duration> {
3475 let value = value.trim();
3476 if value.is_empty() {
3477 return None;
3478 }
3479
3480 if let Ok(secs) = value.parse::<f64>() {
3481 if !secs.is_finite() || secs < 0.0 {
3482 return Some(Duration::from_millis(0));
3483 }
3484 let millis = (secs * 1_000.0) as u64;
3485 return Some(Duration::from_millis(millis.min(MAX_RETRY_DELAY_MS)));
3486 }
3487
3488 if let Ok(target) = httpdate::parse_http_date(value) {
3489 let millis = target
3490 .duration_since(SystemTime::now())
3491 .map(|delta| delta.as_millis() as u64)
3492 .unwrap_or(0);
3493 return Some(Duration::from_millis(millis.min(MAX_RETRY_DELAY_MS)));
3494 }
3495
3496 None
3497}
3498
3499fn parse_retry_after_header(value: &reqwest::header::HeaderValue) -> Option<Duration> {
3500 value.to_str().ok().and_then(parse_retry_after_value)
3501}
3502
3503fn mock_retry_after(status: u16, headers: &BTreeMap<String, VmValue>) -> Option<Duration> {
3504 if !(status == 429 || status == 503) {
3505 return None;
3506 }
3507
3508 headers
3509 .iter()
3510 .find(|(name, _)| name.eq_ignore_ascii_case("retry-after"))
3511 .and_then(|(_, value)| parse_retry_after_value(&value.display()))
3512}
3513
3514fn response_retry_after(
3515 status: u16,
3516 headers: &reqwest::header::HeaderMap,
3517 respect_retry_after: bool,
3518) -> Option<Duration> {
3519 if !respect_retry_after || !(status == 429 || status == 503) {
3520 return None;
3521 }
3522 headers
3523 .get(reqwest::header::RETRY_AFTER)
3524 .and_then(parse_retry_after_header)
3525}
3526
3527fn compute_retry_delay(attempt: u32, base_ms: u64, retry_after: Option<Duration>) -> Duration {
3528 use rand::RngExt;
3529
3530 let base_delay = base_ms.saturating_mul(1u64 << attempt.min(30));
3531 let jitter: f64 = rand::rng().random_range(0.75..=1.25);
3532 let exponential_ms = ((base_delay as f64 * jitter) as u64).min(MAX_RETRY_DELAY_MS);
3533 let retry_after_ms = retry_after
3534 .map(|duration| duration.as_millis() as u64)
3535 .unwrap_or(0)
3536 .min(MAX_RETRY_DELAY_MS);
3537 Duration::from_millis(exponential_ms.max(retry_after_ms))
3538}
3539
3540async fn vm_execute_http_request(
3541 method: &str,
3542 url: &str,
3543 options: &BTreeMap<String, VmValue>,
3544) -> Result<VmValue, VmError> {
3545 if let Some(session_id) = session_from_options(options) {
3546 return vm_execute_http_session_request(&session_id, method, url, options).await;
3547 }
3548
3549 let config = parse_http_options(options);
3550 let client = pooled_http_client(&config)?;
3551 vm_execute_http_request_with_client(client, &config, method, url, options).await
3552}
3553
3554async fn vm_execute_http_session_request(
3555 session_id: &str,
3556 method: &str,
3557 url: &str,
3558 options: &BTreeMap<String, VmValue>,
3559) -> Result<VmValue, VmError> {
3560 let session = HTTP_SESSIONS.with(|sessions| sessions.borrow().get(session_id).cloned());
3561 let Some(session) = session else {
3562 return Err(vm_error(format!(
3563 "http_session_request: unknown HTTP session '{session_id}'"
3564 )));
3565 };
3566 let merged_options = merge_options(&session.options, options);
3567 let config = parse_http_options(&merged_options);
3568 vm_execute_http_request_with_client(session.client, &config, method, url, &merged_options).await
3569}
3570
3571async fn vm_execute_http_request_with_client(
3572 client: reqwest::Client,
3573 config: &HttpRequestConfig,
3574 method: &str,
3575 url: &str,
3576 options: &BTreeMap<String, VmValue>,
3577) -> Result<VmValue, VmError> {
3578 let parts = parse_http_request_parts(method, options)?;
3579 let final_url = final_http_url(url, options, "http")?;
3580
3581 if !final_url.starts_with("http://") && !final_url.starts_with("https://") {
3582 return Err(vm_error(format!(
3583 "http: URL must start with http:// or https://, got '{url}'"
3584 )));
3585 }
3586 crate::egress::enforce_url_allowed("http_request", &final_url).await?;
3587
3588 for attempt in 0..=config.retry.max {
3589 if let Some(mock_response) = consume_http_mock(
3590 method,
3591 &final_url,
3592 parts.recorded_headers.clone(),
3593 parts.body.clone(),
3594 ) {
3595 let status = mock_response.status.clamp(0, u16::MAX as i64) as u16;
3596 if should_retry_response(config, &parts.method, status, attempt) {
3597 let retry_after = if config.retry.respect_retry_after {
3598 mock_retry_after(status, &mock_response.headers)
3599 } else {
3600 None
3601 };
3602 tokio::time::sleep(compute_retry_delay(
3603 attempt,
3604 config.retry.backoff_ms,
3605 retry_after,
3606 ))
3607 .await;
3608 continue;
3609 }
3610
3611 return Ok(build_http_response(
3612 mock_response.status,
3613 mock_response.headers,
3614 mock_response.body,
3615 ));
3616 }
3617
3618 let mut req = client.request(parts.method.clone(), &final_url);
3619 req = req
3620 .headers(parts.headers.clone())
3621 .timeout(Duration::from_millis(config.total_timeout_ms));
3622 if let Some(multipart) = &parts.multipart {
3623 req = req.multipart(multipart_form(multipart)?);
3624 } else if let Some(ref b) = parts.body {
3625 req = req.body(b.clone());
3626 }
3627
3628 match req.send().await {
3629 Ok(response) => {
3630 verify_tls_pin(&response, &config.tls.pinned_sha256)?;
3631 let status = response.status().as_u16();
3632 if should_retry_response(config, &parts.method, status, attempt) {
3633 let retry_after = response_retry_after(
3634 status,
3635 response.headers(),
3636 config.retry.respect_retry_after,
3637 );
3638 tokio::time::sleep(compute_retry_delay(
3639 attempt,
3640 config.retry.backoff_ms,
3641 retry_after,
3642 ))
3643 .await;
3644 continue;
3645 }
3646
3647 let resp_headers = response_headers(response.headers());
3648
3649 let body_text = response
3650 .text()
3651 .await
3652 .map_err(|e| vm_error(format!("http: failed to read response body: {e}")))?;
3653 return Ok(build_http_response(status as i64, resp_headers, body_text));
3654 }
3655 Err(e) => {
3656 if should_retry_transport(config, &parts.method, &e, attempt) {
3657 tokio::time::sleep(compute_retry_delay(attempt, config.retry.backoff_ms, None))
3658 .await;
3659 continue;
3660 }
3661 return Err(vm_error(format!("http: request failed: {e}")));
3662 }
3663 }
3664 }
3665
3666 Err(vm_error("http: request failed"))
3667}
3668
3669async fn vm_http_download(
3670 url: &str,
3671 dst_path: &str,
3672 options: &BTreeMap<String, VmValue>,
3673) -> Result<VmValue, VmError> {
3674 let method = options
3675 .get("method")
3676 .map(|value| value.display())
3677 .filter(|value| !value.is_empty())
3678 .unwrap_or_else(|| "GET".to_string())
3679 .to_uppercase();
3680 let parts = parse_http_request_parts(&method, options)?;
3681 let final_url = final_http_url(url, options, "http_download")?;
3682 if let Some(mock_response) = consume_http_mock(
3683 &method,
3684 &final_url,
3685 parts.recorded_headers.clone(),
3686 parts.body.clone(),
3687 ) {
3688 let resolved = resolve_http_path(
3689 "http_download",
3690 dst_path,
3691 crate::stdlib::sandbox::FsAccess::Write,
3692 )?;
3693 std::fs::write(&resolved, mock_response.body.as_bytes()).map_err(|error| {
3694 vm_error(format!(
3695 "http_download: failed to write {}: {error}",
3696 resolved.display()
3697 ))
3698 })?;
3699 return Ok(build_http_download_response(
3700 mock_response.status,
3701 mock_response.headers,
3702 mock_response.body.len() as u64,
3703 ));
3704 }
3705
3706 if !final_url.starts_with("http://") && !final_url.starts_with("https://") {
3707 return Err(vm_error(format!(
3708 "http_download: URL must start with http:// or https://, got '{url}'"
3709 )));
3710 }
3711 crate::egress::enforce_url_allowed("http_download", &final_url).await?;
3712 let config = parse_http_options(options);
3713 let client = if let Some(session_id) = session_from_options(options) {
3714 HTTP_SESSIONS
3715 .with(|sessions| sessions.borrow().get(&session_id).cloned())
3716 .map(|session| session.client)
3717 .ok_or_else(|| {
3718 vm_error(format!(
3719 "http_download: unknown HTTP session '{session_id}'"
3720 ))
3721 })?
3722 } else {
3723 pooled_http_client(&config)?
3724 };
3725 let mut request = client
3726 .request(parts.method, &final_url)
3727 .headers(parts.headers)
3728 .timeout(Duration::from_millis(config.total_timeout_ms));
3729 if let Some(multipart) = &parts.multipart {
3730 request = request.multipart(multipart_form(multipart)?);
3731 } else if let Some(body) = parts.body {
3732 request = request.body(body);
3733 }
3734 let mut response = request
3735 .send()
3736 .await
3737 .map_err(|error| vm_error(format!("http_download: request failed: {error}")))?;
3738 verify_tls_pin(&response, &config.tls.pinned_sha256)?;
3739 let status = response.status().as_u16() as i64;
3740 let headers = response_headers(response.headers());
3741 let resolved = resolve_http_path(
3742 "http_download",
3743 dst_path,
3744 crate::stdlib::sandbox::FsAccess::Write,
3745 )?;
3746 let mut file = std::fs::File::create(&resolved).map_err(|error| {
3747 vm_error(format!(
3748 "http_download: failed to create {}: {error}",
3749 resolved.display()
3750 ))
3751 })?;
3752 let mut bytes_written = 0_u64;
3753 while let Some(chunk) = response.chunk().await.map_err(|error| {
3754 vm_error(format!(
3755 "http_download: failed to read response body: {error}"
3756 ))
3757 })? {
3758 file.write_all(&chunk).map_err(|error| {
3759 vm_error(format!(
3760 "http_download: failed to write {}: {error}",
3761 resolved.display()
3762 ))
3763 })?;
3764 bytes_written += chunk.len() as u64;
3765 }
3766 Ok(build_http_download_response(status, headers, bytes_written))
3767}
3768
3769async fn vm_http_stream_open(
3770 url: &str,
3771 options: &BTreeMap<String, VmValue>,
3772) -> Result<VmValue, VmError> {
3773 let method = options
3774 .get("method")
3775 .map(|value| value.display())
3776 .filter(|value| !value.is_empty())
3777 .unwrap_or_else(|| "GET".to_string())
3778 .to_uppercase();
3779 let parts = parse_http_request_parts(&method, options)?;
3780 let final_url = final_http_url(url, options, "http_stream_open")?;
3781 let id = next_transport_handle("http-stream");
3782 if let Some(mock_response) = consume_http_mock(
3783 &method,
3784 &final_url,
3785 parts.recorded_headers.clone(),
3786 parts.body.clone(),
3787 ) {
3788 let handle = HttpStreamHandle {
3789 kind: HttpStreamKind::Fake,
3790 status: mock_response.status,
3791 headers: mock_response.headers,
3792 pending: mock_response.body.into_bytes().into(),
3793 closed: false,
3794 };
3795 HTTP_STREAMS.with(|streams| {
3796 let mut streams = streams.borrow_mut();
3797 if streams.len() >= MAX_HTTP_STREAMS {
3798 return Err(vm_error(format!(
3799 "http_stream_open: maximum open streams ({MAX_HTTP_STREAMS}) reached"
3800 )));
3801 }
3802 streams.insert(id.clone(), handle);
3803 Ok(())
3804 })?;
3805 return Ok(VmValue::String(Rc::from(id)));
3806 }
3807
3808 if !final_url.starts_with("http://") && !final_url.starts_with("https://") {
3809 return Err(vm_error(format!(
3810 "http_stream_open: URL must start with http:// or https://, got '{url}'"
3811 )));
3812 }
3813 crate::egress::enforce_url_allowed("http_stream_open", &final_url).await?;
3814 let config = parse_http_options(options);
3815 let client = if let Some(session_id) = session_from_options(options) {
3816 HTTP_SESSIONS
3817 .with(|sessions| sessions.borrow().get(&session_id).cloned())
3818 .map(|session| session.client)
3819 .ok_or_else(|| {
3820 vm_error(format!(
3821 "http_stream_open: unknown HTTP session '{session_id}'"
3822 ))
3823 })?
3824 } else {
3825 pooled_http_client(&config)?
3826 };
3827 let mut request = client
3828 .request(parts.method, &final_url)
3829 .headers(parts.headers)
3830 .timeout(Duration::from_millis(config.total_timeout_ms));
3831 if let Some(multipart) = &parts.multipart {
3832 request = request.multipart(multipart_form(multipart)?);
3833 } else if let Some(body) = parts.body {
3834 request = request.body(body);
3835 }
3836 let response = request
3837 .send()
3838 .await
3839 .map_err(|error| vm_error(format!("http_stream_open: request failed: {error}")))?;
3840 verify_tls_pin(&response, &config.tls.pinned_sha256)?;
3841 let status = response.status().as_u16() as i64;
3842 let headers = response_headers(response.headers());
3843 let handle = HttpStreamHandle {
3844 kind: HttpStreamKind::Real(Rc::new(tokio::sync::Mutex::new(response))),
3845 status,
3846 headers,
3847 pending: VecDeque::new(),
3848 closed: false,
3849 };
3850 HTTP_STREAMS.with(|streams| {
3851 let mut streams = streams.borrow_mut();
3852 if streams.len() >= MAX_HTTP_STREAMS {
3853 return Err(vm_error(format!(
3854 "http_stream_open: maximum open streams ({MAX_HTTP_STREAMS}) reached"
3855 )));
3856 }
3857 streams.insert(id.clone(), handle);
3858 Ok(())
3859 })?;
3860 Ok(VmValue::String(Rc::from(id)))
3861}
3862
3863async fn vm_http_stream_read(stream_id: &str, max_bytes: usize) -> Result<VmValue, VmError> {
3864 let (kind, mut pending, closed) = HTTP_STREAMS
3865 .with(|streams| {
3866 let mut streams = streams.borrow_mut();
3867 let handle = streams.get_mut(stream_id)?;
3868 let kind = match &handle.kind {
3869 HttpStreamKind::Real(response) => HttpStreamKind::Real(response.clone()),
3870 HttpStreamKind::Fake => HttpStreamKind::Fake,
3871 };
3872 let pending = std::mem::take(&mut handle.pending);
3873 Some((kind, pending, handle.closed))
3874 })
3875 .ok_or_else(|| vm_error(format!("http_stream_read: unknown stream '{stream_id}'")))?;
3876 if closed {
3877 return Ok(VmValue::Nil);
3878 }
3879 if pending.is_empty() {
3880 match kind {
3881 HttpStreamKind::Fake => {}
3882 HttpStreamKind::Real(response) => {
3883 let mut response = response.lock().await;
3884 if let Some(chunk) = response.chunk().await.map_err(|error| {
3885 vm_error(format!(
3886 "http_stream_read: failed to read response body: {error}"
3887 ))
3888 })? {
3889 pending.extend(chunk);
3890 }
3891 }
3892 }
3893 }
3894 if pending.is_empty() {
3895 HTTP_STREAMS.with(|streams| {
3896 if let Some(handle) = streams.borrow_mut().get_mut(stream_id) {
3897 handle.closed = true;
3898 }
3899 });
3900 return Ok(VmValue::Nil);
3901 }
3902 let take = pending.len().min(max_bytes.max(1));
3903 let chunk = pending.drain(..take).collect::<Vec<_>>();
3904 HTTP_STREAMS.with(|streams| {
3905 if let Some(handle) = streams.borrow_mut().get_mut(stream_id) {
3906 handle.pending = pending;
3907 }
3908 });
3909 Ok(VmValue::Bytes(Rc::new(chunk)))
3910}
3911
3912fn vm_http_stream_info(stream_id: &str) -> Result<VmValue, VmError> {
3913 HTTP_STREAMS.with(|streams| {
3914 let streams = streams.borrow();
3915 let handle = streams
3916 .get(stream_id)
3917 .ok_or_else(|| vm_error(format!("http_stream_info: unknown stream '{stream_id}'")))?;
3918 let mut dict = BTreeMap::new();
3919 dict.insert("status".to_string(), VmValue::Int(handle.status));
3920 dict.insert(
3921 "headers".to_string(),
3922 VmValue::Dict(Rc::new(handle.headers.clone())),
3923 );
3924 dict.insert(
3925 "ok".to_string(),
3926 VmValue::Bool((200..300).contains(&(handle.status as u16))),
3927 );
3928 Ok(VmValue::Dict(Rc::new(dict)))
3929 })
3930}
3931
3932async fn vm_sse_connect(
3933 method: &str,
3934 url: &str,
3935 options: &BTreeMap<String, VmValue>,
3936) -> Result<VmValue, VmError> {
3937 let id = next_transport_handle("sse");
3938 let max_events =
3939 transport_limit_option(options, "max_events", DEFAULT_MAX_STREAM_EVENTS).max(1);
3940 let max_message_bytes =
3941 transport_limit_option(options, "max_message_bytes", DEFAULT_MAX_MESSAGE_BYTES).max(1);
3942
3943 if let Some(events) = consume_sse_mock(url) {
3944 let handle = SseHandle {
3945 kind: SseHandleKind::Fake(Rc::new(tokio::sync::Mutex::new(FakeSseStream {
3946 events: events.into(),
3947 opened: false,
3948 closed: false,
3949 }))),
3950 url: url.to_string(),
3951 max_events,
3952 max_message_bytes,
3953 received: 0,
3954 };
3955 SSE_HANDLES.with(|handles| {
3956 let mut handles = handles.borrow_mut();
3957 if handles.len() >= MAX_SSE_STREAMS {
3958 return Err(vm_error(format!(
3959 "sse_connect: maximum open streams ({MAX_SSE_STREAMS}) reached"
3960 )));
3961 }
3962 handles.insert(id.clone(), handle);
3963 Ok(())
3964 })?;
3965 record_transport_call(TransportMockCall {
3966 kind: "sse_connect".to_string(),
3967 handle: Some(id.clone()),
3968 url: url.to_string(),
3969 message_type: None,
3970 data: None,
3971 });
3972 return Ok(VmValue::String(Rc::from(id)));
3973 }
3974
3975 if !url.starts_with("http://") && !url.starts_with("https://") {
3976 return Err(vm_error(format!(
3977 "sse_connect: URL must start with http:// or https://, got '{url}'"
3978 )));
3979 }
3980 crate::egress::enforce_url_allowed("sse_connect", url).await?;
3981
3982 let config = parse_http_options(options);
3983 let client = if let Some(session_id) = session_from_options(options) {
3984 let session = HTTP_SESSIONS.with(|sessions| sessions.borrow().get(&session_id).cloned());
3985 session
3986 .map(|session| session.client)
3987 .ok_or_else(|| vm_error(format!("sse_connect: unknown HTTP session '{session_id}'")))?
3988 } else {
3989 pooled_http_client(&config)?
3990 };
3991 let parts = parse_http_request_parts(method, options)?;
3992 let mut request = client
3993 .request(parts.method, url)
3994 .headers(parts.headers)
3995 .timeout(Duration::from_millis(config.total_timeout_ms));
3996 if let Some(body) = parts.body {
3997 request = request.body(body);
3998 }
3999 let stream = EventSource::new(request)
4000 .map_err(|error| vm_error(format!("sse_connect: failed to create stream: {error}")))?;
4001 let handle = SseHandle {
4002 kind: SseHandleKind::Real(Rc::new(tokio::sync::Mutex::new(stream))),
4003 url: url.to_string(),
4004 max_events,
4005 max_message_bytes,
4006 received: 0,
4007 };
4008 SSE_HANDLES.with(|handles| {
4009 let mut handles = handles.borrow_mut();
4010 if handles.len() >= MAX_SSE_STREAMS {
4011 return Err(vm_error(format!(
4012 "sse_connect: maximum open streams ({MAX_SSE_STREAMS}) reached"
4013 )));
4014 }
4015 handles.insert(id.clone(), handle);
4016 Ok(())
4017 })?;
4018 Ok(VmValue::String(Rc::from(id)))
4019}
4020
4021async fn vm_sse_receive(stream_id: &str, timeout_ms: u64) -> Result<VmValue, VmError> {
4022 let stream = SSE_HANDLES.with(|handles| {
4023 let mut handles = handles.borrow_mut();
4024 let handle = handles.get_mut(stream_id)?;
4025 if handle.received >= handle.max_events {
4026 return Some(Err(vm_error(format!(
4027 "sse_receive: stream '{stream_id}' exceeded max_events"
4028 ))));
4029 }
4030 handle.received += 1;
4031 let url = handle.url.clone();
4032 let max_message_bytes = handle.max_message_bytes;
4033 let kind = match &handle.kind {
4034 SseHandleKind::Real(stream) => SseHandleKind::Real(stream.clone()),
4035 SseHandleKind::Fake(stream) => SseHandleKind::Fake(stream.clone()),
4036 };
4037 Some(Ok((kind, url, max_message_bytes)))
4038 });
4039 let Some(stream) = stream else {
4040 return Err(vm_error(format!(
4041 "sse_receive: unknown stream '{stream_id}'"
4042 )));
4043 };
4044 let (kind, _url, max_message_bytes) = stream?;
4045
4046 match kind {
4047 SseHandleKind::Fake(stream) => {
4048 let mut stream = stream.lock().await;
4049 if stream.closed {
4050 return Ok(VmValue::Nil);
4051 }
4052 if !stream.opened {
4053 stream.opened = true;
4054 let mut dict = BTreeMap::new();
4055 dict.insert("type".to_string(), VmValue::String(Rc::from("open")));
4056 return Ok(VmValue::Dict(Rc::new(dict)));
4057 }
4058 let Some(event) = stream.events.pop_front() else {
4059 stream.closed = true;
4060 return Ok(VmValue::Nil);
4061 };
4062 if event.data.len() > max_message_bytes {
4063 return Err(vm_error(format!(
4064 "sse_receive: message exceeded max_message_bytes ({max_message_bytes})"
4065 )));
4066 }
4067 Ok(sse_event_value(&event))
4068 }
4069 SseHandleKind::Real(stream) => {
4070 let mut stream = stream.lock().await;
4071 let next = stream.next();
4072 let event = match tokio::time::timeout(Duration::from_millis(timeout_ms), next).await {
4073 Ok(Some(Ok(event))) => event,
4074 Ok(Some(Err(error))) => {
4075 return Err(vm_error(format!("sse_receive: stream error: {error}")));
4076 }
4077 Ok(None) => return Ok(VmValue::Nil),
4078 Err(_) => return Ok(timeout_event()),
4079 };
4080 if let SseEvent::Message(message) = &event {
4081 if message.data.len() > max_message_bytes {
4082 stream.close();
4083 return Err(vm_error(format!(
4084 "sse_receive: message exceeded max_message_bytes ({max_message_bytes})"
4085 )));
4086 }
4087 }
4088 Ok(real_sse_event_value(event))
4089 }
4090 }
4091}
4092
4093fn websocket_route_from_options(path: &str, options: &BTreeMap<String, VmValue>) -> WebSocketRoute {
4094 let bearer_token = options.get("auth").and_then(|auth| match auth {
4095 VmValue::Dict(dict) => dict.get("bearer").map(|value| value.display()),
4096 other => {
4097 let value = other.display();
4098 (!value.is_empty()).then_some(value)
4099 }
4100 });
4101 WebSocketRoute {
4102 path: path.to_string(),
4103 bearer_token,
4104 max_messages: transport_limit_option(options, "max_messages", DEFAULT_MAX_STREAM_EVENTS)
4105 .max(1),
4106 max_message_bytes: transport_limit_option(
4107 options,
4108 "max_message_bytes",
4109 DEFAULT_MAX_MESSAGE_BYTES,
4110 )
4111 .max(1),
4112 send_buffer_messages: transport_limit_option(options, "send_buffer_messages", 64),
4113 idle_timeout_ms: vm_get_int_option(
4114 options,
4115 "idle_timeout_ms",
4116 DEFAULT_WEBSOCKET_SERVER_IDLE_TIMEOUT_MS as i64,
4117 )
4118 .max(0) as u64,
4119 }
4120}
4121
4122fn vm_websocket_server(
4123 bind: &str,
4124 options: &BTreeMap<String, VmValue>,
4125) -> Result<VmValue, VmError> {
4126 let listener = TcpListener::bind(bind)
4127 .map_err(|error| vm_error(format!("websocket_server: bind failed: {error}")))?;
4128 listener
4129 .set_nonblocking(true)
4130 .map_err(|error| vm_error(format!("websocket_server: nonblocking failed: {error}")))?;
4131 let local_addr = listener
4132 .local_addr()
4133 .map_err(|error| vm_error(format!("websocket_server: local addr failed: {error}")))?;
4134 let id = next_transport_handle("websocket-server");
4135 let addr = local_addr.to_string();
4136 let url = format!("ws://{addr}");
4137 let routes = Arc::new(RwLock::new(HashMap::<String, WebSocketRoute>::new()));
4138 if let Some(path) = options
4139 .get("path")
4140 .map(|value| value.display())
4141 .filter(|path| !path.is_empty())
4142 {
4143 if !path.starts_with('/') {
4144 return Err(vm_error("websocket_server: path must start with '/'"));
4145 }
4146 routes
4147 .write()
4148 .expect("websocket routes poisoned")
4149 .insert(path.clone(), websocket_route_from_options(&path, options));
4150 }
4151 let (event_tx, event_rx) = mpsc::channel();
4152 let running = Arc::new(AtomicBool::new(true));
4153 let server_routes = routes.clone();
4154 let server_running = running.clone();
4155 thread::Builder::new()
4156 .name(format!("harn-ws-{id}"))
4157 .spawn(move || websocket_server_loop(listener, server_routes, event_tx, server_running))
4158 .map_err(|error| vm_error(format!("websocket_server: spawn failed: {error}")))?;
4159 WEBSOCKET_SERVERS.with(|servers| {
4160 let mut servers = servers.borrow_mut();
4161 if servers.len() >= MAX_WEBSOCKET_SERVERS {
4162 running.store(false, Ordering::SeqCst);
4163 let _ = TcpStream::connect(&addr);
4164 return Err(vm_error(format!(
4165 "websocket_server: maximum open servers ({MAX_WEBSOCKET_SERVERS}) reached"
4166 )));
4167 }
4168 servers.insert(
4169 id.clone(),
4170 WebSocketServer {
4171 addr: addr.clone(),
4172 routes,
4173 events: Rc::new(tokio::sync::Mutex::new(event_rx)),
4174 running,
4175 },
4176 );
4177 Ok(())
4178 })?;
4179 let mut dict = BTreeMap::new();
4180 dict.insert("id".to_string(), VmValue::String(Rc::from(id)));
4181 dict.insert("addr".to_string(), VmValue::String(Rc::from(addr)));
4182 dict.insert("url".to_string(), VmValue::String(Rc::from(url)));
4183 Ok(VmValue::Dict(Rc::new(dict)))
4184}
4185
4186fn vm_websocket_route(
4187 server_id: &str,
4188 path: &str,
4189 options: &BTreeMap<String, VmValue>,
4190) -> Result<VmValue, VmError> {
4191 let routes = WEBSOCKET_SERVERS.with(|servers| {
4192 servers
4193 .borrow()
4194 .get(server_id)
4195 .map(|server| server.routes.clone())
4196 });
4197 let Some(routes) = routes else {
4198 return Err(vm_error(format!(
4199 "websocket_route: unknown server '{server_id}'"
4200 )));
4201 };
4202 routes.write().expect("websocket routes poisoned").insert(
4203 path.to_string(),
4204 websocket_route_from_options(path, options),
4205 );
4206 Ok(VmValue::Bool(true))
4207}
4208
4209async fn vm_websocket_accept(server_id: &str, timeout_ms: u64) -> Result<VmValue, VmError> {
4210 let receiver = WEBSOCKET_SERVERS.with(|servers| {
4211 servers
4212 .borrow()
4213 .get(server_id)
4214 .map(|server| server.events.clone())
4215 });
4216 let Some(receiver) = receiver else {
4217 return Err(vm_error(format!(
4218 "websocket_accept: unknown server '{server_id}'"
4219 )));
4220 };
4221 let started = std::time::Instant::now();
4222 loop {
4223 let event = {
4224 let receiver = receiver.lock().await;
4225 receiver.try_recv()
4226 };
4227 match event {
4228 Ok(event) => return register_accepted_websocket(event),
4229 Err(mpsc::TryRecvError::Disconnected) => return Ok(VmValue::Nil),
4230 Err(mpsc::TryRecvError::Empty) => {
4231 if timeout_ms == 0 || started.elapsed() >= Duration::from_millis(timeout_ms) {
4232 return Ok(timeout_event());
4233 }
4234 tokio::time::sleep(Duration::from_millis(10)).await;
4235 }
4236 }
4237 }
4238}
4239
4240fn register_accepted_websocket(event: WebSocketServerEvent) -> Result<VmValue, VmError> {
4241 let WebSocketServerEvent {
4242 handle,
4243 path,
4244 peer,
4245 headers,
4246 max_messages,
4247 max_message_bytes,
4248 } = event;
4249 let id = next_transport_handle("websocket");
4250 WEBSOCKET_HANDLES.with(|handles| {
4251 let mut handles = handles.borrow_mut();
4252 if handles.len() >= MAX_WEBSOCKETS {
4253 return Err(vm_error(format!(
4254 "websocket_accept: maximum open sockets ({MAX_WEBSOCKETS}) reached"
4255 )));
4256 }
4257 handles.insert(
4258 id.clone(),
4259 WebSocketHandle {
4260 kind: WebSocketHandleKind::Server(Rc::new(tokio::sync::Mutex::new(handle))),
4261 url: path.clone(),
4262 max_messages,
4263 max_message_bytes,
4264 received: 0,
4265 },
4266 );
4267 Ok(())
4268 })?;
4269 let mut metadata = BTreeMap::new();
4270 metadata.insert("id".to_string(), VmValue::String(Rc::from(id)));
4271 metadata.insert("path".to_string(), VmValue::String(Rc::from(path)));
4272 metadata.insert("peer".to_string(), VmValue::String(Rc::from(peer)));
4273 metadata.insert(
4274 "headers".to_string(),
4275 VmValue::Dict(Rc::new(
4276 headers
4277 .into_iter()
4278 .map(|(name, value)| (name, VmValue::String(Rc::from(value))))
4279 .collect(),
4280 )),
4281 );
4282 Ok(VmValue::Dict(Rc::new(metadata)))
4283}
4284
4285fn vm_websocket_server_close(server_id: &str) -> Result<VmValue, VmError> {
4286 let server = WEBSOCKET_SERVERS.with(|servers| servers.borrow_mut().remove(server_id));
4287 let Some(server) = server else {
4288 return Ok(VmValue::Bool(false));
4289 };
4290 server.running.store(false, Ordering::SeqCst);
4291 let _ = TcpStream::connect(&server.addr);
4292 Ok(VmValue::Bool(true))
4293}
4294
4295fn websocket_server_loop(
4296 listener: TcpListener,
4297 routes: Arc<RwLock<HashMap<String, WebSocketRoute>>>,
4298 event_tx: mpsc::Sender<WebSocketServerEvent>,
4299 running: Arc<AtomicBool>,
4300) {
4301 while running.load(Ordering::SeqCst) {
4302 match listener.accept() {
4303 Ok((stream, peer)) => {
4304 let routes = routes.clone();
4305 let event_tx = event_tx.clone();
4306 let running = running.clone();
4307 let peer = peer.to_string();
4308 let _ = thread::Builder::new()
4309 .name("harn-ws-conn".to_string())
4310 .spawn(move || {
4311 websocket_connection_thread(stream, peer, routes, event_tx, running);
4312 });
4313 }
4314 Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => {
4315 thread::sleep(Duration::from_millis(10));
4316 }
4317 Err(_) => break,
4318 }
4319 }
4320}
4321
4322fn error_response(status: StatusCode, body: &str) -> ErrorResponse {
4323 let mut response = ErrorResponse::new(Some(body.to_string()));
4324 *response.status_mut() = status;
4325 response
4326}
4327
4328fn route_for_request(
4329 request: &Request,
4330 routes: &Arc<RwLock<HashMap<String, WebSocketRoute>>>,
4331) -> Result<WebSocketRoute, ErrorResponse> {
4332 let path = request.uri().path().to_string();
4333 let Some(route) = routes
4334 .read()
4335 .expect("websocket routes poisoned")
4336 .get(&path)
4337 .cloned()
4338 else {
4339 return Err(error_response(
4340 StatusCode::NOT_FOUND,
4341 "websocket route not found",
4342 ));
4343 };
4344 if let Some(token) = route.bearer_token.as_ref() {
4345 let expected = format!("Bearer {token}");
4346 let authorized = request
4347 .headers()
4348 .get("authorization")
4349 .and_then(|value| value.to_str().ok())
4350 .map(|value| value == expected)
4351 .unwrap_or(false);
4352 if !authorized {
4353 return Err(error_response(
4354 StatusCode::UNAUTHORIZED,
4355 "websocket route unauthorized",
4356 ));
4357 }
4358 }
4359 Ok(route)
4360}
4361
4362fn websocket_connection_thread(
4363 stream: TcpStream,
4364 peer: String,
4365 routes: Arc<RwLock<HashMap<String, WebSocketRoute>>>,
4366 event_tx: mpsc::Sender<WebSocketServerEvent>,
4367 running: Arc<AtomicBool>,
4368) {
4369 let accepted_route = Arc::new(std::sync::Mutex::new(
4370 None::<(WebSocketRoute, BTreeMap<String, String>)>,
4371 ));
4372 let callback_route = accepted_route.clone();
4373 let callback =
4374 move |request: &Request, response: Response| -> Result<Response, ErrorResponse> {
4375 let route = route_for_request(request, &routes)?;
4376 let headers = request
4377 .headers()
4378 .iter()
4379 .filter_map(|(name, value)| {
4380 value
4381 .to_str()
4382 .ok()
4383 .map(|value| (name.as_str().to_ascii_lowercase(), value.to_string()))
4384 })
4385 .collect::<BTreeMap<_, _>>();
4386 *callback_route
4387 .lock()
4388 .expect("websocket route metadata poisoned") = Some((route, headers));
4389 Ok(response)
4390 };
4391 let Ok(mut socket) = tokio_tungstenite::tungstenite::accept_hdr(stream, callback) else {
4392 return;
4393 };
4394 let Some((route, headers)) = accepted_route
4395 .lock()
4396 .expect("websocket route metadata poisoned")
4397 .clone()
4398 else {
4399 let _ = socket.close(None);
4400 return;
4401 };
4402 let _ = socket
4403 .get_mut()
4404 .set_read_timeout(Some(Duration::from_millis(50)));
4405 let (incoming_tx, incoming_rx) = mpsc::channel();
4406 let (outgoing_tx, outgoing_rx) = mpsc::sync_channel(route.send_buffer_messages);
4407 let event = WebSocketServerEvent {
4408 handle: ServerWebSocket {
4409 incoming: VecDeque::new(),
4410 incoming_rx,
4411 outgoing_tx,
4412 closed: false,
4413 },
4414 path: route.path.clone(),
4415 peer,
4416 headers,
4417 max_messages: route.max_messages,
4418 max_message_bytes: route.max_message_bytes,
4419 };
4420 if event_tx.send(event).is_err() {
4421 let _ = socket.close(None);
4422 return;
4423 }
4424 let mut last_activity = std::time::Instant::now();
4425 while running.load(Ordering::SeqCst) {
4426 while let Ok(command) = outgoing_rx.try_recv() {
4427 match command {
4428 ServerWebSocketCommand::Send(message) => {
4429 let Ok(message) = real_ws_message(&message) else {
4430 continue;
4431 };
4432 if socket.send(message).is_err() {
4433 return;
4434 }
4435 last_activity = std::time::Instant::now();
4436 }
4437 ServerWebSocketCommand::Close(code, reason) => {
4438 let _ = socket.close(close_frame(code, reason));
4439 return;
4440 }
4441 }
4442 }
4443 if route.idle_timeout_ms > 0
4444 && last_activity.elapsed() >= Duration::from_millis(route.idle_timeout_ms)
4445 {
4446 let _ = socket.close(close_frame(Some(1001), Some("idle timeout".to_string())));
4447 let _ = incoming_tx.send(MockWsMessage {
4448 message_type: "close".to_string(),
4449 data: Vec::new(),
4450 close_code: Some(1001),
4451 close_reason: Some("idle timeout".to_string()),
4452 });
4453 return;
4454 }
4455 match socket.read() {
4456 Ok(message) => {
4457 last_activity = std::time::Instant::now();
4458 if incoming_tx
4459 .send(mock_ws_message_from_real(message))
4460 .is_err()
4461 {
4462 return;
4463 }
4464 }
4465 Err(tokio_tungstenite::tungstenite::Error::Io(error))
4466 if error.kind() == std::io::ErrorKind::WouldBlock
4467 || error.kind() == std::io::ErrorKind::TimedOut =>
4468 {
4469 continue;
4470 }
4471 Err(_) => {
4472 let _ = incoming_tx.send(MockWsMessage {
4473 message_type: "close".to_string(),
4474 data: Vec::new(),
4475 close_code: None,
4476 close_reason: None,
4477 });
4478 return;
4479 }
4480 }
4481 }
4482 let _ = socket.get_mut().shutdown(Shutdown::Both);
4483}
4484
4485async fn vm_websocket_connect(
4486 url: &str,
4487 options: &BTreeMap<String, VmValue>,
4488) -> Result<VmValue, VmError> {
4489 let id = next_transport_handle("websocket");
4490 let max_messages =
4491 transport_limit_option(options, "max_messages", DEFAULT_MAX_STREAM_EVENTS).max(1);
4492 let max_message_bytes =
4493 transport_limit_option(options, "max_message_bytes", DEFAULT_MAX_MESSAGE_BYTES).max(1);
4494
4495 if let Some((messages, echo)) = consume_websocket_mock(url) {
4496 let handle = WebSocketHandle {
4497 kind: WebSocketHandleKind::Fake(Rc::new(tokio::sync::Mutex::new(FakeWebSocket {
4498 messages: messages.into(),
4499 echo,
4500 closed: false,
4501 }))),
4502 url: url.to_string(),
4503 max_messages,
4504 max_message_bytes,
4505 received: 0,
4506 };
4507 WEBSOCKET_HANDLES.with(|handles| {
4508 let mut handles = handles.borrow_mut();
4509 if handles.len() >= MAX_WEBSOCKETS {
4510 return Err(vm_error(format!(
4511 "websocket_connect: maximum open sockets ({MAX_WEBSOCKETS}) reached"
4512 )));
4513 }
4514 handles.insert(id.clone(), handle);
4515 Ok(())
4516 })?;
4517 record_transport_call(TransportMockCall {
4518 kind: "websocket_connect".to_string(),
4519 handle: Some(id.clone()),
4520 url: url.to_string(),
4521 message_type: None,
4522 data: None,
4523 });
4524 return Ok(VmValue::String(Rc::from(id)));
4525 }
4526
4527 if !url.starts_with("ws://") && !url.starts_with("wss://") {
4528 return Err(vm_error(format!(
4529 "websocket_connect: URL must start with ws:// or wss://, got '{url}'"
4530 )));
4531 }
4532 crate::egress::enforce_url_allowed("websocket_connect", url).await?;
4533 let timeout_ms = vm_get_int_option_prefer(
4534 options,
4535 "timeout_ms",
4536 "timeout",
4537 DEFAULT_TIMEOUT_MS as i64,
4538 )
4539 .max(0) as u64;
4540 let request = websocket_client_request(url, options)?;
4541 let connect = tokio_tungstenite::connect_async(request);
4542 let (socket, _) = tokio::time::timeout(Duration::from_millis(timeout_ms), connect)
4543 .await
4544 .map_err(|_| vm_error(format!("websocket_connect: timed out after {timeout_ms}ms")))?
4545 .map_err(|error| vm_error(format!("websocket_connect: failed: {error}")))?;
4546 let handle = WebSocketHandle {
4547 kind: WebSocketHandleKind::Real(Rc::new(tokio::sync::Mutex::new(socket))),
4548 url: url.to_string(),
4549 max_messages,
4550 max_message_bytes,
4551 received: 0,
4552 };
4553 WEBSOCKET_HANDLES.with(|handles| {
4554 let mut handles = handles.borrow_mut();
4555 if handles.len() >= MAX_WEBSOCKETS {
4556 return Err(vm_error(format!(
4557 "websocket_connect: maximum open sockets ({MAX_WEBSOCKETS}) reached"
4558 )));
4559 }
4560 handles.insert(id.clone(), handle);
4561 Ok(())
4562 })?;
4563 Ok(VmValue::String(Rc::from(id)))
4564}
4565
4566fn websocket_message_from_vm(
4567 value: VmValue,
4568 options: &BTreeMap<String, VmValue>,
4569) -> Result<MockWsMessage, VmError> {
4570 let message_type = options
4571 .get("type")
4572 .map(|value| value.display())
4573 .filter(|value| !value.is_empty())
4574 .unwrap_or_else(|| match value {
4575 VmValue::Bytes(_) => "binary".to_string(),
4576 _ => "text".to_string(),
4577 });
4578 let data = match value {
4579 VmValue::Bytes(bytes) => bytes.as_ref().clone(),
4580 other
4581 if options
4582 .get("base64")
4583 .and_then(|value| match value {
4584 VmValue::Bool(value) => Some(*value),
4585 _ => None,
4586 })
4587 .unwrap_or(false) =>
4588 {
4589 use base64::Engine;
4590 base64::engine::general_purpose::STANDARD
4591 .decode(other.display())
4592 .map_err(|error| vm_error(format!("websocket_send: invalid base64: {error}")))?
4593 }
4594 other => other.display().into_bytes(),
4595 };
4596 Ok(MockWsMessage {
4597 message_type,
4598 data,
4599 close_code: options
4600 .get("code")
4601 .or_else(|| options.get("close_code"))
4602 .and_then(|value| value.as_int())
4603 .map(|value| value as u16),
4604 close_reason: options
4605 .get("reason")
4606 .or_else(|| options.get("close_reason"))
4607 .map(|value| value.display()),
4608 })
4609}
4610
4611fn websocket_client_request(
4612 url: &str,
4613 options: &BTreeMap<String, VmValue>,
4614) -> Result<tokio_tungstenite::tungstenite::http::Request<()>, VmError> {
4615 let mut request = url
4616 .into_client_request()
4617 .map_err(|error| vm_error(format!("websocket_connect: invalid request: {error}")))?;
4618 if let Some(headers) = options.get("headers").and_then(|value| value.as_dict()) {
4619 for (name, value) in headers {
4620 let header_name = tokio_tungstenite::tungstenite::http::header::HeaderName::from_bytes(
4621 name.as_bytes(),
4622 )
4623 .map_err(|error| {
4624 vm_error(format!(
4625 "websocket_connect: invalid header name '{name}': {error}"
4626 ))
4627 })?;
4628 let header_value = HeaderValue::from_str(&value.display()).map_err(|error| {
4629 vm_error(format!(
4630 "websocket_connect: invalid header value for '{name}': {error}"
4631 ))
4632 })?;
4633 request.headers_mut().insert(header_name, header_value);
4634 }
4635 }
4636 if let Some(auth) = options.get("auth") {
4637 let bearer = match auth {
4638 VmValue::Dict(dict) => dict.get("bearer").map(|value| value.display()),
4639 other => Some(other.display()),
4640 };
4641 if let Some(token) = bearer.filter(|token| !token.is_empty()) {
4642 let value = HeaderValue::from_str(&format!("Bearer {token}")).map_err(|error| {
4643 vm_error(format!(
4644 "websocket_connect: invalid authorization header: {error}"
4645 ))
4646 })?;
4647 request.headers_mut().insert("authorization", value);
4648 }
4649 }
4650 Ok(request)
4651}
4652
4653fn close_frame(code: Option<u16>, reason: Option<String>) -> Option<CloseFrame> {
4654 code.or(reason.as_ref().map(|_| 1000))
4655 .map(|code| CloseFrame {
4656 code: CloseCode::from(code),
4657 reason: reason.unwrap_or_default().into(),
4658 })
4659}
4660
4661fn real_ws_message(message: &MockWsMessage) -> Result<WsMessage, VmError> {
4662 match message.message_type.as_str() {
4663 "text" => Ok(WsMessage::Text(
4664 String::from_utf8(message.data.clone())
4665 .map_err(|error| vm_error(format!("websocket_send: text is not UTF-8: {error}")))?
4666 .into(),
4667 )),
4668 "binary" => Ok(WsMessage::Binary(message.data.clone().into())),
4669 "ping" => Ok(WsMessage::Ping(message.data.clone().into())),
4670 "pong" => Ok(WsMessage::Pong(message.data.clone().into())),
4671 "close" => Ok(WsMessage::Close(close_frame(
4672 message.close_code,
4673 message.close_reason.clone(),
4674 ))),
4675 other => Err(vm_error(format!(
4676 "websocket_send: unsupported message type '{other}'"
4677 ))),
4678 }
4679}
4680
4681fn mock_ws_message_from_real(message: WsMessage) -> MockWsMessage {
4682 match message {
4683 WsMessage::Text(text) => MockWsMessage {
4684 message_type: "text".to_string(),
4685 data: text.as_bytes().to_vec(),
4686 close_code: None,
4687 close_reason: None,
4688 },
4689 WsMessage::Binary(bytes) => MockWsMessage {
4690 message_type: "binary".to_string(),
4691 data: bytes.to_vec(),
4692 close_code: None,
4693 close_reason: None,
4694 },
4695 WsMessage::Ping(bytes) => MockWsMessage {
4696 message_type: "ping".to_string(),
4697 data: bytes.to_vec(),
4698 close_code: None,
4699 close_reason: None,
4700 },
4701 WsMessage::Pong(bytes) => MockWsMessage {
4702 message_type: "pong".to_string(),
4703 data: bytes.to_vec(),
4704 close_code: None,
4705 close_reason: None,
4706 },
4707 WsMessage::Close(frame) => {
4708 let (close_code, close_reason) = frame
4709 .map(|frame| (Some(u16::from(frame.code)), Some(frame.reason.to_string())))
4710 .unwrap_or((None, None));
4711 MockWsMessage {
4712 message_type: "close".to_string(),
4713 data: Vec::new(),
4714 close_code,
4715 close_reason,
4716 }
4717 }
4718 WsMessage::Frame(_) => MockWsMessage {
4719 message_type: "close".to_string(),
4720 data: Vec::new(),
4721 close_code: None,
4722 close_reason: None,
4723 },
4724 }
4725}
4726
4727async fn vm_websocket_send(
4728 socket_id: &str,
4729 value: VmValue,
4730 options: &BTreeMap<String, VmValue>,
4731) -> Result<VmValue, VmError> {
4732 let message = websocket_message_from_vm(value, options)?;
4733 let socket = WEBSOCKET_HANDLES.with(|handles| {
4734 let handles = handles.borrow();
4735 let handle = handles.get(socket_id)?;
4736 let url = handle.url.clone();
4737 let max_message_bytes = handle.max_message_bytes;
4738 let kind = match &handle.kind {
4739 WebSocketHandleKind::Real(socket) => WebSocketHandleKind::Real(socket.clone()),
4740 WebSocketHandleKind::Fake(socket) => WebSocketHandleKind::Fake(socket.clone()),
4741 WebSocketHandleKind::Server(socket) => WebSocketHandleKind::Server(socket.clone()),
4742 };
4743 Some((kind, url, max_message_bytes))
4744 });
4745 let Some((kind, url, max_message_bytes)) = socket else {
4746 return Err(vm_error(format!(
4747 "websocket_send: unknown socket '{socket_id}'"
4748 )));
4749 };
4750 if message.data.len() > max_message_bytes {
4751 return Err(vm_error(format!(
4752 "websocket_send: message exceeded max_message_bytes ({max_message_bytes})"
4753 )));
4754 }
4755 match kind {
4756 WebSocketHandleKind::Fake(socket) => {
4757 let mut socket = socket.lock().await;
4758 if socket.closed {
4759 return Ok(VmValue::Bool(false));
4760 }
4761 if message.message_type == "close" {
4762 socket.closed = true;
4763 } else if socket.echo {
4764 socket.messages.push_back(message.clone());
4765 }
4766 record_transport_call(TransportMockCall {
4767 kind: "websocket_send".to_string(),
4768 handle: Some(socket_id.to_string()),
4769 url,
4770 message_type: Some(message.message_type.clone()),
4771 data: Some(ws_message_data(&message)),
4772 });
4773 Ok(VmValue::Bool(true))
4774 }
4775 WebSocketHandleKind::Real(socket) => {
4776 let mut socket = socket.lock().await;
4777 socket
4778 .send(real_ws_message(&message)?)
4779 .await
4780 .map_err(|error| vm_error(format!("websocket_send: failed: {error}")))?;
4781 Ok(VmValue::Bool(true))
4782 }
4783 WebSocketHandleKind::Server(socket) => {
4784 let mut socket = socket.lock().await;
4785 if socket.closed {
4786 return Ok(VmValue::Bool(false));
4787 }
4788 let command = if message.message_type == "close" {
4789 socket.closed = true;
4790 ServerWebSocketCommand::Close(message.close_code, message.close_reason.clone())
4791 } else {
4792 ServerWebSocketCommand::Send(message.clone())
4793 };
4794 socket
4795 .outgoing_tx
4796 .try_send(command)
4797 .map_err(|error| match error {
4798 mpsc::TrySendError::Full(_) => vm_error("websocket_send: send buffer full"),
4799 mpsc::TrySendError::Disconnected(_) => {
4800 vm_error("websocket_send: connection closed")
4801 }
4802 })?;
4803 Ok(VmValue::Bool(true))
4804 }
4805 }
4806}
4807
4808async fn vm_websocket_receive(socket_id: &str, timeout_ms: u64) -> Result<VmValue, VmError> {
4809 let socket = WEBSOCKET_HANDLES.with(|handles| {
4810 let mut handles = handles.borrow_mut();
4811 let handle = handles.get_mut(socket_id)?;
4812 if handle.received >= handle.max_messages {
4813 return Some(Err(vm_error(format!(
4814 "websocket_receive: socket '{socket_id}' exceeded max_messages"
4815 ))));
4816 }
4817 handle.received += 1;
4818 let max_message_bytes = handle.max_message_bytes;
4819 let kind = match &handle.kind {
4820 WebSocketHandleKind::Real(socket) => WebSocketHandleKind::Real(socket.clone()),
4821 WebSocketHandleKind::Fake(socket) => WebSocketHandleKind::Fake(socket.clone()),
4822 WebSocketHandleKind::Server(socket) => WebSocketHandleKind::Server(socket.clone()),
4823 };
4824 Some(Ok((kind, max_message_bytes)))
4825 });
4826 let Some(socket) = socket else {
4827 return Err(vm_error(format!(
4828 "websocket_receive: unknown socket '{socket_id}'"
4829 )));
4830 };
4831 let (kind, max_message_bytes) = socket?;
4832 match kind {
4833 WebSocketHandleKind::Fake(socket) => {
4834 let mut socket = socket.lock().await;
4835 if socket.closed {
4836 return Ok(VmValue::Nil);
4837 }
4838 let Some(message) = socket.messages.pop_front() else {
4839 return Ok(timeout_event());
4840 };
4841 if message.data.len() > max_message_bytes {
4842 socket.closed = true;
4843 return Err(vm_error(format!(
4844 "websocket_receive: message exceeded max_message_bytes ({max_message_bytes})"
4845 )));
4846 }
4847 if message.message_type == "close" {
4848 socket.closed = true;
4849 }
4850 Ok(ws_event_value(message))
4851 }
4852 WebSocketHandleKind::Real(socket) => {
4853 let mut socket = socket.lock().await;
4854 let next = socket.next();
4855 let message = match tokio::time::timeout(Duration::from_millis(timeout_ms), next).await
4856 {
4857 Ok(Some(Ok(message))) => message,
4858 Ok(Some(Err(error))) => {
4859 return Err(vm_error(format!("websocket_receive: failed: {error}")));
4860 }
4861 Ok(None) => return Ok(VmValue::Nil),
4862 Err(_) => return Ok(timeout_event()),
4863 };
4864 match &message {
4865 WsMessage::Text(text) if text.len() > max_message_bytes => {
4866 return Err(vm_error(format!(
4867 "websocket_receive: message exceeded max_message_bytes ({max_message_bytes})"
4868 )));
4869 }
4870 WsMessage::Binary(bytes) | WsMessage::Ping(bytes) | WsMessage::Pong(bytes)
4871 if bytes.len() > max_message_bytes =>
4872 {
4873 return Err(vm_error(format!(
4874 "websocket_receive: message exceeded max_message_bytes ({max_message_bytes})"
4875 )));
4876 }
4877 _ => {}
4878 }
4879 Ok(real_ws_event_value(message))
4880 }
4881 WebSocketHandleKind::Server(socket) => {
4882 let started = std::time::Instant::now();
4883 loop {
4884 let message = {
4885 let mut socket = socket.lock().await;
4886 if socket.closed {
4887 return Ok(VmValue::Nil);
4888 }
4889 while let Ok(message) = socket.incoming_rx.try_recv() {
4890 socket.incoming.push_back(message);
4891 }
4892 socket.incoming.pop_front()
4893 };
4894 if let Some(message) = message {
4895 if message.data.len() > max_message_bytes {
4896 let mut socket = socket.lock().await;
4897 socket.closed = true;
4898 let _ = socket.outgoing_tx.try_send(ServerWebSocketCommand::Close(
4899 Some(1009),
4900 Some("message too large".to_string()),
4901 ));
4902 return Err(vm_error(format!(
4903 "websocket_receive: message exceeded max_message_bytes ({max_message_bytes})"
4904 )));
4905 }
4906 if message.message_type == "close" {
4907 let mut socket = socket.lock().await;
4908 socket.closed = true;
4909 }
4910 return Ok(ws_event_value(message));
4911 }
4912 if timeout_ms == 0 || started.elapsed() >= Duration::from_millis(timeout_ms) {
4913 return Ok(timeout_event());
4914 }
4915 tokio::time::sleep(Duration::from_millis(10)).await;
4916 }
4917 }
4918 }
4919}
4920
4921async fn vm_websocket_close(socket_id: &str) -> Result<VmValue, VmError> {
4922 let removed = WEBSOCKET_HANDLES.with(|handles| handles.borrow_mut().remove(socket_id));
4923 let Some(handle) = removed else {
4924 return Ok(VmValue::Bool(false));
4925 };
4926 match handle.kind {
4927 WebSocketHandleKind::Fake(socket) => {
4928 socket.lock().await.closed = true;
4929 record_transport_call(TransportMockCall {
4930 kind: "websocket_close".to_string(),
4931 handle: Some(socket_id.to_string()),
4932 url: handle.url,
4933 message_type: None,
4934 data: None,
4935 });
4936 Ok(VmValue::Bool(true))
4937 }
4938 WebSocketHandleKind::Real(socket) => {
4939 let mut socket = socket.lock().await;
4940 socket
4941 .close(None)
4942 .await
4943 .map_err(|error| vm_error(format!("websocket_close: failed: {error}")))?;
4944 Ok(VmValue::Bool(true))
4945 }
4946 WebSocketHandleKind::Server(socket) => {
4947 let mut socket = socket.lock().await;
4948 socket.closed = true;
4949 let _ = socket
4950 .outgoing_tx
4951 .try_send(ServerWebSocketCommand::Close(None, None));
4952 Ok(VmValue::Bool(true))
4953 }
4954 }
4955}
4956
4957#[cfg(test)]
4958mod tests {
4959 use super::{
4960 compute_retry_delay, handle_from_value, http_mock_calls_snapshot, mock_call_headers_value,
4961 parse_retry_after_value, push_http_mock, redact_mock_call_url, reset_http_state,
4962 vm_execute_http_request, vm_http_download, vm_http_stream_info, vm_http_stream_open,
4963 vm_http_stream_read, vm_sse_event_frame, vm_sse_server_cancel, vm_sse_server_heartbeat,
4964 vm_sse_server_mock_disconnect, vm_sse_server_mock_receive, vm_sse_server_observed_bool,
4965 vm_sse_server_response, vm_sse_server_send, HttpMockResponse,
4966 };
4967 use crate::connectors::test_util::{
4968 accept_http_connection, read_http_request, write_http_response,
4969 };
4970 use crate::value::VmValue;
4971 use base64::Engine;
4972 use rcgen::generate_simple_self_signed;
4973 use rustls::pki_types::PrivatePkcs8KeyDer;
4974 use rustls::{ServerConfig, ServerConnection, StreamOwned};
4975 use sha2::{Digest, Sha256};
4976 use std::collections::BTreeMap;
4977 use std::io::{Read, Write};
4978 use std::net::TcpListener;
4979 use std::rc::Rc;
4980 use std::sync::{Arc, Once};
4981 use std::time::{Duration, SystemTime};
4982 use tempfile::TempDir;
4983 use x509_parser::prelude::{FromDer, X509Certificate};
4984
4985 fn expect_bool(value: VmValue) -> bool {
4986 let VmValue::Bool(value) = value else {
4987 panic!("expected bool, got {}", value.display());
4988 };
4989 value
4990 }
4991
4992 #[test]
4993 fn parses_retry_after_delta_seconds() {
4994 assert_eq!(parse_retry_after_value("5"), Some(Duration::from_secs(5)));
4995 }
4996
/// An HTTP-date `Retry-After` value two seconds in the future parses to a
/// delay that does not exceed that offset.
#[test]
fn parses_retry_after_http_date() {
    let offset = Duration::from_secs(2);
    let retry_at = SystemTime::now() + offset;
    let header = httpdate::fmt_http_date(retry_at);
    let delay = parse_retry_after_value(&header).expect("http-date should parse");
    assert!(delay <= offset);
    // Kept from the original test; this ceiling is already implied by the one above.
    assert!(delay <= Duration::from_secs(60));
}
5004
/// A value that is neither a number nor an HTTP-date yields `None`.
#[test]
fn malformed_retry_after_returns_none() {
    let parsed = parse_retry_after_value("soon-ish");
    assert!(parsed.is_none());
}
5009
/// The computed retry delay is at least the supplied `Retry-After` hint and
/// at most sixty seconds.
#[test]
fn retry_delay_honors_retry_after_floor() {
    let retry_after = Duration::from_millis(250);
    let ceiling = Duration::from_secs(60);
    let delay = compute_retry_delay(0, 1, Some(retry_after));
    assert!(retry_after <= delay);
    assert!(ceiling >= delay);
}
5016
5017 #[tokio::test]
5018 async fn typed_mock_api_drives_http_request_retries() {
5019 reset_http_state();
5020 push_http_mock(
5021 "GET",
5022 "https://api.example.com/retry",
5023 vec![
5024 HttpMockResponse::new(503, "busy").with_header("retry-after", "0"),
5025 HttpMockResponse::new(200, "ok"),
5026 ],
5027 );
5028 let result = vm_execute_http_request(
5029 "GET",
5030 "https://api.example.com/retry",
5031 &BTreeMap::from([
5032 ("retries".to_string(), VmValue::Int(1)),
5033 ("backoff".to_string(), VmValue::Int(0)),
5034 ]),
5035 )
5036 .await
5037 .expect("mocked request should succeed after retry");
5038
5039 let dict = result.as_dict().expect("response dict");
5040 assert_eq!(dict["status"].as_int(), Some(200));
5041 let calls = http_mock_calls_snapshot();
5042 assert_eq!(calls.len(), 2);
5043 assert_eq!(calls[0].url, "https://api.example.com/retry");
5044 reset_http_state();
5045 }
5046
5047 #[tokio::test]
5048 async fn http_mock_records_normalized_headers_and_final_query_url() {
5049 reset_http_state();
5050 push_http_mock(
5051 "GET",
5052 "https://api.example.com/items?api_key=secret&limit=2",
5053 vec![HttpMockResponse::new(200, "ok")],
5054 );
5055 let options = BTreeMap::from([
5056 (
5057 "headers".to_string(),
5058 VmValue::Dict(Rc::new(BTreeMap::from([
5059 (
5060 "Authorization".to_string(),
5061 VmValue::String(Rc::from("Bearer secret")),
5062 ),
5063 ("X-Trace".to_string(), VmValue::String(Rc::from("trace-1"))),
5064 ]))),
5065 ),
5066 (
5067 "query".to_string(),
5068 VmValue::Dict(Rc::new(BTreeMap::from([
5069 ("api_key".to_string(), VmValue::String(Rc::from("secret"))),
5070 ("limit".to_string(), VmValue::Int(2)),
5071 ]))),
5072 ),
5073 ]);
5074
5075 let response = vm_execute_http_request("GET", "https://api.example.com/items", &options)
5076 .await
5077 .expect("mocked request with query");
5078 let response = response.as_dict().expect("response dict");
5079 assert_eq!(response["status"].as_int(), Some(200));
5080
5081 let calls = http_mock_calls_snapshot();
5082 assert_eq!(calls.len(), 1);
5083 assert_eq!(
5084 calls[0].url,
5085 "https://api.example.com/items?api_key=secret&limit=2"
5086 );
5087 assert_eq!(
5088 calls[0].headers.get("authorization").map(String::as_str),
5089 Some("Bearer secret")
5090 );
5091 assert_eq!(
5092 calls[0].headers.get("x-trace").map(String::as_str),
5093 Some("trace-1")
5094 );
5095 reset_http_state();
5096 }
5097
/// With redaction on, `authorization` and `x-api-key` display as
/// `[redacted]` while `accept` is untouched; with redaction off the raw
/// value is returned.
#[test]
fn mock_call_headers_redact_sensitive_values() {
    let headers: BTreeMap<String, VmValue> = [
        ("authorization", "Bearer secret"),
        ("accept", "application/json"),
        ("x-api-key", "secret"),
    ]
    .into_iter()
    .map(|(name, value)| (name.to_string(), VmValue::String(Rc::from(value))))
    .collect();

    let redacted = mock_call_headers_value(&headers, true);
    assert_eq!(redacted["authorization"].display(), "[redacted]");
    assert_eq!(redacted["x-api-key"].display(), "[redacted]");
    assert_eq!(redacted["accept"].display(), "application/json");

    let unredacted = mock_call_headers_value(&headers, false);
    assert_eq!(unredacted["authorization"].display(), "Bearer secret");
}
5119
/// Table-driven check of URL redaction: sensitive query values are replaced
/// when redaction is on, left alone when it is off, and non-sensitive
/// percent-encoded values pass through unchanged.
#[test]
fn mock_call_url_redacts_sensitive_query_values() {
    // (input URL, redact flag, expected output)
    let cases = [
        (
            "https://api.example.com/items?api_key=secret&limit=2&access_token=token",
            true,
            "https://api.example.com/items?api_key=%5Bredacted%5D&limit=2&access_token=%5Bredacted%5D",
        ),
        (
            "https://api.example.com/items?api_key=secret",
            false,
            "https://api.example.com/items?api_key=secret",
        ),
        (
            "https://api.example.com/items?q=a%20b",
            true,
            "https://api.example.com/items?q=a%20b",
        ),
    ];

    for (input, redact, expected) in cases {
        assert_eq!(redact_mock_call_url(input, redact), expected);
    }
}
5138
5139 #[tokio::test]
5140 async fn multipart_requests_are_mock_visible() {
5141 reset_http_state();
5142 push_http_mock(
5143 "POST",
5144 "https://api.example.com/upload",
5145 vec![HttpMockResponse::new(201, "uploaded")],
5146 );
5147 let options = BTreeMap::from([(
5148 "multipart".to_string(),
5149 VmValue::List(Rc::new(vec![
5150 VmValue::Dict(Rc::new(BTreeMap::from([
5151 ("name".to_string(), VmValue::String(Rc::from("meta"))),
5152 ("value".to_string(), VmValue::String(Rc::from("hello"))),
5153 ]))),
5154 VmValue::Dict(Rc::new(BTreeMap::from([
5155 ("name".to_string(), VmValue::String(Rc::from("blob"))),
5156 (
5157 "filename".to_string(),
5158 VmValue::String(Rc::from("blob.bin")),
5159 ),
5160 (
5161 "content_type".to_string(),
5162 VmValue::String(Rc::from("application/octet-stream")),
5163 ),
5164 (
5165 "value".to_string(),
5166 VmValue::Bytes(Rc::new(vec![0, 1, 2, 3])),
5167 ),
5168 ]))),
5169 ])),
5170 )]);
5171
5172 let response = vm_execute_http_request("POST", "https://api.example.com/upload", &options)
5173 .await
5174 .expect("multipart mock request should succeed");
5175 let response = response.as_dict().expect("response dict");
5176 assert_eq!(response["status"].as_int(), Some(201));
5177
5178 let calls = http_mock_calls_snapshot();
5179 assert_eq!(calls.len(), 1);
5180 assert!(calls[0]
5181 .headers
5182 .get("content-type")
5183 .expect("content-type recorded")
5184 .contains("multipart/form-data"));
5185 let body = calls[0].body.as_deref().expect("multipart body recorded");
5186 assert!(body.contains("name=\"meta\""));
5187 assert!(body.contains("hello"));
5188 assert!(body.contains("filename=\"blob.bin\""));
5189 reset_http_state();
5190 }
5191
5192 #[tokio::test]
5193 async fn http_stream_mock_reads_in_chunks() {
5194 reset_http_state();
5195 push_http_mock(
5196 "GET",
5197 "https://api.example.com/stream",
5198 vec![HttpMockResponse::new(200, "stream-body")],
5199 );
5200
5201 let handle = vm_http_stream_open("https://api.example.com/stream", &BTreeMap::new())
5202 .await
5203 .expect("stream open");
5204 let stream_id = handle.display();
5205 let info = vm_http_stream_info(&stream_id).expect("stream info");
5206 let info = info.as_dict().expect("info dict");
5207 assert_eq!(info["status"].as_int(), Some(200));
5208
5209 let first = vm_http_stream_read(&stream_id, 6)
5210 .await
5211 .expect("first chunk");
5212 let second = vm_http_stream_read(&stream_id, 64)
5213 .await
5214 .expect("second chunk");
5215 let end = vm_http_stream_read(&stream_id, 64)
5216 .await
5217 .expect("end marker");
5218 assert_eq!(first.as_bytes().expect("bytes"), b"stream");
5219 assert_eq!(second.as_bytes().expect("bytes"), b"-body");
5220 assert!(matches!(end, VmValue::Nil));
5221 reset_http_state();
5222 }
5223
5224 #[tokio::test]
5225 async fn http_download_mock_writes_file() {
5226 reset_http_state();
5227 let temp = TempDir::new().expect("tempdir");
5228 let path = temp.path().join("download.bin");
5229 push_http_mock(
5230 "GET",
5231 "https://api.example.com/download",
5232 vec![HttpMockResponse::new(200, "downloaded")],
5233 );
5234
5235 let response = vm_http_download(
5236 "https://api.example.com/download",
5237 &path.display().to_string(),
5238 &BTreeMap::new(),
5239 )
5240 .await
5241 .expect("download response");
5242 let response = response.as_dict().expect("response dict");
5243 assert_eq!(response["bytes_written"].as_int(), Some(10));
5244 assert_eq!(
5245 std::fs::read_to_string(path).expect("downloaded file"),
5246 "downloaded"
5247 );
5248 reset_http_state();
5249 }
5250
5251 #[tokio::test]
5252 async fn http_proxy_routes_requests_through_configured_proxy() {
5253 reset_http_state();
5254 let listener = TcpListener::bind("127.0.0.1:0").expect("bind proxy listener");
5255 let proxy_addr = listener.local_addr().expect("proxy addr");
5256 let proxy_thread = std::thread::spawn(move || {
5257 let mut stream = accept_http_connection(&listener, "proxy listener");
5258 let request = read_http_request(&mut stream);
5259 assert_eq!(request.method, "GET");
5260 assert_eq!(request.path, "http://example.invalid/proxy-check");
5261 assert_eq!(
5262 request
5263 .headers
5264 .get("proxy-authorization")
5265 .map(String::as_str),
5266 Some("Basic dXNlcjpwYXNz")
5267 );
5268 write_http_response(
5269 &mut stream,
5270 200,
5271 &[("content-type", "text/plain".to_string())],
5272 "proxied",
5273 );
5274 });
5275
5276 let options = BTreeMap::from([
5277 (
5278 "proxy".to_string(),
5279 VmValue::String(Rc::from(format!("http://{proxy_addr}"))),
5280 ),
5281 (
5282 "proxy_auth".to_string(),
5283 VmValue::Dict(Rc::new(BTreeMap::from([
5284 ("user".to_string(), VmValue::String(Rc::from("user"))),
5285 ("pass".to_string(), VmValue::String(Rc::from("pass"))),
5286 ]))),
5287 ),
5288 ("timeout_ms".to_string(), VmValue::Int(1_000)),
5289 ]);
5290
5291 let response =
5292 vm_execute_http_request("GET", "http://example.invalid/proxy-check", &options)
5293 .await
5294 .expect("proxied response");
5295 let response = response.as_dict().expect("response dict");
5296 assert_eq!(response["status"].as_int(), Some(200));
5297 assert_eq!(response["body"].display(), "proxied");
5298 proxy_thread.join().expect("proxy thread");
5299 reset_http_state();
5300 }
5301
5302 #[tokio::test]
5303 async fn custom_tls_ca_bundle_and_pin_allow_request() {
5304 reset_http_state();
5305 install_rustls_provider();
5306 let temp = TempDir::new().expect("tempdir");
5307 let cert =
5308 generate_simple_self_signed(vec!["localhost".to_string(), "127.0.0.1".to_string()])
5309 .expect("generate cert");
5310 let cert_pem = cert.cert.pem();
5311 let cert_path = temp.path().join("cert.pem");
5312 std::fs::write(&cert_path, cert_pem.as_bytes()).expect("write cert");
5313 let pin = spki_pin_base64(cert.cert.der().as_ref());
5314
5315 let listener = TcpListener::bind("127.0.0.1:0").expect("bind tls listener");
5316 let port = listener.local_addr().expect("tls addr").port();
5317 let server_config = Arc::new(
5318 ServerConfig::builder()
5319 .with_no_client_auth()
5320 .with_single_cert(
5321 vec![cert.cert.der().clone()],
5322 PrivatePkcs8KeyDer::from(cert.key_pair.serialize_der()).into(),
5323 )
5324 .expect("build tls config"),
5325 );
5326 let thread = std::thread::spawn(move || {
5327 let (tcp, _) = listener.accept().expect("accept tls client");
5328 let conn = ServerConnection::new(server_config).expect("server connection");
5329 let mut stream = StreamOwned::new(conn, tcp);
5330 let request = read_http_request_generic(&mut stream);
5331 assert!(request.starts_with("GET /secure HTTP/1.1\r\n"));
5332 write_http_response_generic(
5333 &mut stream,
5334 200,
5335 &[("content-type", "text/plain".to_string())],
5336 "secure",
5337 );
5338 });
5339
5340 let options = BTreeMap::from([(
5341 "tls".to_string(),
5342 VmValue::Dict(Rc::new(BTreeMap::from([
5343 (
5344 "ca_bundle_path".to_string(),
5345 VmValue::String(Rc::from(cert_path.display().to_string())),
5346 ),
5347 (
5348 "pinned_sha256".to_string(),
5349 VmValue::List(Rc::new(vec![VmValue::String(Rc::from(pin))])),
5350 ),
5351 ]))),
5352 )]);
5353 let response =
5354 vm_execute_http_request("GET", &format!("https://localhost:{port}/secure"), &options)
5355 .await
5356 .expect("tls request should succeed");
5357 let response = response.as_dict().expect("response dict");
5358 assert_eq!(response["body"].display(), "secure");
5359 thread.join().expect("tls thread");
5360 reset_http_state();
5361 }
5362
5363 #[tokio::test]
5364 async fn custom_tls_pin_mismatch_is_rejected() {
5365 reset_http_state();
5366 install_rustls_provider();
5367 let temp = TempDir::new().expect("tempdir");
5368 let cert =
5369 generate_simple_self_signed(vec!["localhost".to_string(), "127.0.0.1".to_string()])
5370 .expect("generate cert");
5371 let cert_pem = cert.cert.pem();
5372 let cert_path = temp.path().join("cert.pem");
5373 std::fs::write(&cert_path, cert_pem.as_bytes()).expect("write cert");
5374
5375 let listener = TcpListener::bind("127.0.0.1:0").expect("bind tls listener");
5376 let port = listener.local_addr().expect("tls addr").port();
5377 let server_config = Arc::new(
5378 ServerConfig::builder()
5379 .with_no_client_auth()
5380 .with_single_cert(
5381 vec![cert.cert.der().clone()],
5382 PrivatePkcs8KeyDer::from(cert.key_pair.serialize_der()).into(),
5383 )
5384 .expect("build tls config"),
5385 );
5386 let thread = std::thread::spawn(move || {
5387 let (tcp, _) = listener.accept().expect("accept tls client");
5388 let conn = ServerConnection::new(server_config).expect("server connection");
5389 let mut stream = StreamOwned::new(conn, tcp);
5390 let _ = read_http_request_generic(&mut stream);
5391 write_http_response_generic(
5392 &mut stream,
5393 200,
5394 &[("content-type", "text/plain".to_string())],
5395 "secure",
5396 );
5397 });
5398
5399 let options = BTreeMap::from([(
5400 "tls".to_string(),
5401 VmValue::Dict(Rc::new(BTreeMap::from([
5402 (
5403 "ca_bundle_path".to_string(),
5404 VmValue::String(Rc::from(cert_path.display().to_string())),
5405 ),
5406 (
5407 "pinned_sha256".to_string(),
5408 VmValue::List(Rc::new(vec![VmValue::String(Rc::from("deadbeef"))])),
5409 ),
5410 ]))),
5411 )]);
5412 let error =
5413 vm_execute_http_request("GET", &format!("https://localhost:{port}/secure"), &options)
5414 .await
5415 .expect_err("pin mismatch should fail");
5416 let message = error.to_string();
5417 assert!(message.contains("TLS SPKI pin mismatch"), "{message}");
5418 thread.join().expect("tls thread");
5419 reset_http_state();
5420 }
5421
5422 fn install_rustls_provider() {
5423 static INSTALL: Once = Once::new();
5424 INSTALL.call_once(|| {
5425 let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
5426 });
5427 }
5428
5429 fn spki_pin_base64(cert_der: &[u8]) -> String {
5430 let (_, cert) = X509Certificate::from_der(cert_der).expect("parse cert");
5431 base64::engine::general_purpose::STANDARD
5432 .encode(Sha256::digest(cert.tbs_certificate.subject_pki.raw))
5433 }
5434
/// Reads from `stream` until the accumulated bytes contain the blank line
/// (`\r\n\r\n`) that ends an HTTP header block, then returns everything read
/// so far as a lossily-decoded string.
///
/// Bytes that arrived after the terminator in the final read are included in
/// the returned string.
///
/// # Panics
/// Panics if a read fails or the stream reports EOF before the terminator.
fn read_http_request_generic<T: Read>(stream: &mut T) -> String {
    const TERMINATOR: &[u8] = b"\r\n\r\n";

    let mut buffer = Vec::new();
    let mut chunk = [0u8; 4096];
    loop {
        let read = stream.read(&mut chunk).expect("read request");
        assert!(read > 0, "request closed before headers");

        // The bytes already buffered were searched on earlier iterations, so a
        // new match must end inside the fresh data. Restart the scan just far
        // enough back to catch a terminator split across two reads instead of
        // re-scanning the whole buffer every time.
        let scan_from = buffer.len().saturating_sub(TERMINATOR.len() - 1);
        buffer.extend_from_slice(&chunk[..read]);

        let terminated = buffer[scan_from..]
            .windows(TERMINATOR.len())
            .any(|window| window == TERMINATOR);
        if terminated {
            return String::from_utf8_lossy(&buffer).into_owned();
        }
    }
}
5447
/// Writes a minimal HTTP/1.1 response to `stream` in a single `write_all`
/// and then flushes.
///
/// The status line always uses the reason phrase `OK`, and `content-length`
/// plus `connection: close` are emitted before the caller's `headers`.
///
/// # Panics
/// Panics if writing or flushing fails.
fn write_http_response_generic<T: Write>(
    stream: &mut T,
    status: u16,
    headers: &[(&str, String)],
    body: &str,
) {
    let status_and_framing = format!(
        "HTTP/1.1 {status} OK\r\ncontent-length: {}\r\nconnection: close\r\n",
        body.len()
    );
    let caller_headers: String = headers
        .iter()
        .map(|(name, value)| format!("{name}: {value}\r\n"))
        .collect();
    // Head, caller headers, blank separator line, then the body.
    let wire = [
        status_and_framing.as_str(),
        caller_headers.as_str(),
        "\r\n",
        body,
    ]
    .concat();

    stream.write_all(wire.as_bytes()).expect("write response");
    stream.flush().expect("flush response");
}
5471
/// An event dict with `event`, `data`, `id` and `retry_ms` is framed as
/// `id`, `event`, `retry`, then one `data:` line per line of data, ending
/// with a blank line.
#[test]
fn formats_sse_event_fields_and_multiline_data() {
    let mut fields = BTreeMap::new();
    fields.insert("event".to_string(), VmValue::String(Rc::from("progress")));
    fields.insert("data".to_string(), VmValue::String(Rc::from("one\ntwo")));
    fields.insert("id".to_string(), VmValue::String(Rc::from("evt-1")));
    fields.insert("retry_ms".to_string(), VmValue::Int(2500));
    let event = VmValue::Dict(Rc::new(fields));

    let frame = vm_sse_event_frame(&event, &BTreeMap::new()).expect("event frame");

    let expected = "id: evt-1\nevent: progress\nretry: 2500\ndata: one\ndata: two\n\n";
    assert_eq!(frame, expected);
}
5489
/// An `event` name containing a newline is rejected with an error that
/// mentions "event must not contain newlines".
#[test]
fn rejects_sse_event_control_fields_with_newlines() {
    let mut fields = BTreeMap::new();
    fields.insert("event".to_string(), VmValue::String(Rc::from("bad\nname")));
    let event = VmValue::Dict(Rc::new(fields));

    let failure =
        vm_sse_event_frame(&event, &BTreeMap::new()).expect_err("newline should reject");
    let message = failure.to_string();
    assert!(message.contains("event must not contain newlines"));
}
5502
/// Walks a server-side SSE stream through its mock-client lifecycle:
/// an event and a heartbeat are sent and read back in order, a mock
/// disconnect is observed and makes a later send return `false`, and a
/// second stream reports `cancelled` after being cancelled.
#[test]
fn server_sse_mock_client_observes_heartbeat_disconnect_and_cancel() {
    reset_http_state();
    let stream_options =
        BTreeMap::from([("max_buffered_events".to_string(), VmValue::Int(4))]);
    let response = vm_sse_server_response(&stream_options).expect("response");
    let stream_id = handle_from_value(&response, "test").expect("handle");

    // Send one event followed by one heartbeat comment.
    let progress_event = VmValue::Dict(Rc::new(BTreeMap::from([
        ("event".to_string(), VmValue::String(Rc::from("progress"))),
        ("data".to_string(), VmValue::String(Rc::from("50"))),
    ])));
    let sent = vm_sse_server_send(&stream_id, &progress_event, &BTreeMap::new()).expect("send");
    assert!(expect_bool(sent));
    let tick = VmValue::String(Rc::from("tick"));
    let beat = vm_sse_server_heartbeat(&stream_id, Some(&tick)).expect("heartbeat");
    assert!(expect_bool(beat));

    // The mock client receives them in the order they were sent.
    let first_value = vm_sse_server_mock_receive(&stream_id).expect("first");
    let first_fields = first_value.as_dict().expect("first dict");
    assert_eq!(first_fields["event"].display(), "progress");
    assert_eq!(first_fields["data"].display(), "50");
    let heartbeat_value = vm_sse_server_mock_receive(&stream_id).expect("heartbeat read");
    let heartbeat_fields = heartbeat_value.as_dict().expect("heartbeat dict");
    assert_eq!(heartbeat_fields["type"].display(), "comment");
    assert_eq!(heartbeat_fields["comment"].display(), "tick");

    // After a mock disconnect the handle reports it and later sends return false.
    let disconnected = vm_sse_server_mock_disconnect(&stream_id).expect("disconnect");
    assert!(expect_bool(disconnected));
    let observed_disconnect =
        vm_sse_server_observed_bool(&stream_id, "test", |handle| handle.disconnected)
            .expect("observed");
    assert!(expect_bool(observed_disconnect));
    let late_event = VmValue::String(Rc::from("late"));
    let late_sent =
        vm_sse_server_send(&stream_id, &late_event, &BTreeMap::new()).expect("late send");
    assert!(!expect_bool(late_sent));

    // A separate stream is cancelled and must report the cancelled flag.
    let cancelled = vm_sse_server_response(&BTreeMap::new()).expect("cancelled response");
    let cancelled_id = handle_from_value(&cancelled, "test").expect("cancelled handle");
    let stop_reason = VmValue::String(Rc::from("stop"));
    let cancel_result =
        vm_sse_server_cancel(&cancelled_id, Some(&stop_reason)).expect("cancel");
    assert!(expect_bool(cancel_result));
    let observed_cancel =
        vm_sse_server_observed_bool(&cancelled_id, "test", |handle| handle.cancelled)
            .expect("cancelled observed");
    assert!(expect_bool(observed_cancel));
    reset_http_state();
}
5566
/// With `max_event_bytes` set to 12, sending a longer string payload fails
/// with an error that names `max_event_bytes`.
#[test]
fn server_sse_rejects_oversized_events() {
    reset_http_state();
    let stream_options = BTreeMap::from([("max_event_bytes".to_string(), VmValue::Int(12))]);
    let response = vm_sse_server_response(&stream_options).expect("response");
    let stream_id = handle_from_value(&response, "test").expect("handle");

    let oversized = VmValue::String(Rc::from("this is too large"));
    let failure = vm_sse_server_send(&stream_id, &oversized, &BTreeMap::new())
        .expect_err("oversized event should reject");
    let message = failure.to_string();
    assert!(message.contains("max_event_bytes"));
    reset_http_state();
}
5585}