Skip to main content

secure_exec_sidecar/
stdio.rs

1use crate::wire::{
2    self, AuthenticatedResponse, ExtEnvelope, OwnershipScope, ProtocolCodecError, ProtocolFrame,
3    RequestFrame, RequestId, RequestPayload, ResponseFrame, ResponsePayload, SessionOpenedResponse,
4    SidecarResponseFrame, WireDispatchResult, WireFrameCodec,
5};
6use crate::{
7    Extension, ExtensionInterruptRequest, NativeSidecar, NativeSidecarConfig, SidecarError,
8    SidecarRequestTransport,
9};
10use secure_exec_bridge::queue_tracker::{tracked_sync_channel, TrackedLimit, TrackedSyncSender};
11use secure_exec_bridge::{
12    BridgeTypes, ChmodRequest, ClockBridge, ClockRequest, CommandPermissionRequest,
13    CreateDirRequest, CreateJavascriptContextRequest, CreateWasmContextRequest, DiagnosticRecord,
14    DirectoryEntry, EnvironmentPermissionRequest, EventBridge, ExecutionBridge, ExecutionEvent,
15    ExecutionHandleRequest, FileMetadata, FilesystemBridge, FilesystemPermissionRequest,
16    FilesystemSnapshot, FlushFilesystemStateRequest, GuestContextHandle, KillExecutionRequest,
17    LifecycleEventRecord, LoadFilesystemStateRequest, LogRecord, NetworkPermissionRequest,
18    PathRequest, PermissionBridge, PermissionDecision, PersistenceBridge,
19    PollExecutionEventRequest, RandomBridge, RandomBytesRequest, ReadDirRequest, ReadFileRequest,
20    RenameRequest, ScheduleTimerRequest, ScheduledTimer, StartExecutionRequest, StartedExecution,
21    StructuredEventRecord, SymlinkRequest, TruncateRequest, WriteExecutionStdinRequest,
22    WriteFileRequest,
23};
24use std::collections::{BTreeMap, BTreeSet};
25use std::error::Error;
26use std::fmt;
27use std::fs::{self, OpenOptions};
28use std::io::{self, Read, Write};
29use std::os::unix::fs::{symlink as create_symlink, MetadataExt, PermissionsExt};
30use std::path::{Path, PathBuf};
31use std::sync::{mpsc, Arc, Mutex};
32use std::thread;
33use std::time::{Duration, Instant, SystemTime};
34use tokio::sync::mpsc::{channel, unbounded_channel, Receiver};
35use tokio::time;
36
37// Guest sync fs/module RPCs are serviced by `pump_process_events` on this timer,
38// so a blocked guest call waits up to one interval before the host even sees it.
39// At 5ms this dominated per-call latency (~5ms/stat); 250us cuts it ~11x (stat
40// 7.5s -> ~0.65s over 1500 ops) and the sub-ms tokio timer is honored. Idle
41// pumps are cheap no-ops (try_recv + zero-timeout poll), so the higher cadence
42// costs negligible CPU when no guest is issuing RPCs.
43const EVENT_PUMP_INTERVAL: Duration = Duration::from_micros(250);
44const MAX_STDIN_FRAME_QUEUE: usize = 128;
45const MAX_EVENT_READY_QUEUE: usize = 1;
46// Defense-in-depth headroom for the host-bound frame queue: a burst of output
47// frames from a busy turn should be buffered, so the writer only backpressures
48// when the host genuinely stops reading stdout rather than on every spike.
49const MAX_STDOUT_FRAME_QUEUE: usize = 4096;
50
51#[cfg(test)]
52fn request_frame(
53    request_id: RequestId,
54    ownership: OwnershipScope,
55    payload: RequestPayload,
56) -> RequestFrame {
57    RequestFrame {
58        schema: wire::protocol_schema(),
59        request_id,
60        ownership,
61        payload,
62    }
63}
64
65fn response_frame(
66    request_id: RequestId,
67    ownership: OwnershipScope,
68    payload: ResponsePayload,
69) -> ResponseFrame {
70    ResponseFrame {
71        schema: wire::protocol_schema(),
72        request_id,
73        ownership,
74        payload,
75    }
76}
77
78#[cfg(test)]
79fn connection_ownership(connection_id: &str) -> OwnershipScope {
80    OwnershipScope::ConnectionOwnership(wire::ConnectionOwnership {
81        connection_id: connection_id.to_owned(),
82    })
83}
84
85fn session_ownership(connection_id: &str, session_id: &str) -> OwnershipScope {
86    OwnershipScope::SessionOwnership(wire::SessionOwnership {
87        connection_id: connection_id.to_owned(),
88        session_id: session_id.to_owned(),
89    })
90}
91
92#[cfg(test)]
93fn vm_ownership(connection_id: &str, session_id: &str, vm_id: &str) -> OwnershipScope {
94    OwnershipScope::VmOwnership(wire::VmOwnership {
95        connection_id: connection_id.to_owned(),
96        session_id: session_id.to_owned(),
97        vm_id: vm_id.to_owned(),
98    })
99}
100
101fn wire_protocol_error(error: ProtocolCodecError) -> SidecarError {
102    SidecarError::InvalidState(format!("invalid generated wire protocol frame: {error}"))
103}
104
105pub fn run() -> Result<(), Box<dyn Error>> {
106    run_with_extensions(Vec::new())
107}
108
109pub fn run_with_extensions(extensions: Vec<Box<dyn Extension>>) -> Result<(), Box<dyn Error>> {
110    // Initialize the embedded V8 runtime + platform now, on the long-lived main
111    // thread, so it is never first-initialized on a transient worker thread (e.g. a
112    // VM-create snapshot pre-warm thread that then exits — which corrupts V8's
113    // platform and wedges later isolate creation). Best-effort.
114    if let Err(error) = secure_exec_execution::v8_host::ensure_runtime_initialized() {
115        eprintln!("embedded V8 runtime init failed at startup: {error}");
116    }
117    tokio::runtime::Builder::new_current_thread()
118        .enable_all()
119        .build()?
120        .block_on(run_async(extensions))
121}
122
123async fn run_async(extensions: Vec<Box<dyn Extension>>) -> Result<(), Box<dyn Error>> {
124    let config = NativeSidecarConfig {
125        compile_cache_root: Some(default_compile_cache_root()),
126        ..NativeSidecarConfig::default()
127    };
128    let codec = WireFrameCodec::new(config.max_frame_bytes);
129    let mut sidecar =
130        NativeSidecar::with_config_and_extensions(LocalBridge::default(), config, extensions)?;
131    let mut active_sessions = BTreeSet::<SessionScope>::new();
132    let mut active_connections = BTreeSet::<String>::new();
133    let (stdin_tx, mut stdin_rx) =
134        channel::<Result<Option<ProtocolFrame>, String>>(MAX_STDIN_FRAME_QUEUE);
135    let stdin_gauge = secure_exec_bridge::queue_tracker::register_queue(
136        TrackedLimit::SidecarStdinFrames,
137        MAX_STDIN_FRAME_QUEUE,
138    );
139    let (event_ready_tx, mut event_ready_rx) = channel::<()>(MAX_EVENT_READY_QUEUE);
140    let (write_tx, write_rx) = tracked_sync_channel::<ProtocolFrame>(
141        TrackedLimit::SidecarStdoutFrames,
142        MAX_STDOUT_FRAME_QUEUE,
143    );
144    let (write_error_tx, mut write_error_rx) = unbounded_channel::<String>();
145
146    // Forward limit-registry near-capacity warnings to the host: the global sink
147    // fires (edge-triggered, from arbitrary threads) into this channel, and the
148    // event loop below drains it and emits a `StructuredEvent` (name
149    // "limit_warning"). The unbounded sender is Send+Sync and lives for the whole
150    // process inside the global handler, so the receiver never sees a hangup.
151    let (limit_warning_tx, mut limit_warning_rx) =
152        unbounded_channel::<secure_exec_bridge::queue_tracker::LimitWarning>();
153    secure_exec_bridge::queue_tracker::set_limit_warning_handler(Box::new(move |warning| {
154        let _ = limit_warning_tx.send(warning.clone());
155    }));
156    let callback_transport = Arc::new(FrameSidecarRequestTransport::new(write_tx.clone()));
157    sidecar.set_sidecar_request_transport(callback_transport.clone());
158    let mut event_pump = time::interval(EVENT_PUMP_INTERVAL);
159    let writer_codec = codec.clone();
160    let reader_codec = codec.clone();
161    let writer_error_tx = write_error_tx.clone();
162    thread::spawn(move || {
163        let mut writer = io::BufWriter::new(io::stdout());
164        while let Ok(frame) = write_rx.recv() {
165            if let Err(error) = write_frame(&writer_codec, &mut writer, &frame) {
166                let _ = writer_error_tx.send(error.to_string());
167                break;
168            }
169        }
170    });
171
172    thread::spawn({
173        let callback_transport = callback_transport.clone();
174        let read_error_tx = write_error_tx.clone();
175        move || {
176            let mut stdin = io::stdin();
177            loop {
178                let frame = match read_frame(&reader_codec, &mut stdin) {
179                    Ok(Some(ProtocolFrame::SidecarResponseFrame(response))) => {
180                        if callback_transport.accept_response(response.clone()) {
181                            continue;
182                        }
183                        Ok(Some(ProtocolFrame::SidecarResponseFrame(response)))
184                    }
185                    Ok(Some(frame)) => Ok(Some(frame)),
186                    other => other,
187                }
188                .map_err(|error: Box<dyn Error>| error.to_string());
189                let should_stop = matches!(frame, Ok(None) | Err(_));
190                match enqueue_stdin_frame(&stdin_tx, frame) {
191                    Ok(()) => {
192                        // Sample inbound queue depth so the centralized tracker
193                        // can warn before host requests back up on the sidecar.
194                        stdin_gauge.observe_depth(
195                            stdin_tx.max_capacity().saturating_sub(stdin_tx.capacity()),
196                        );
197                    }
198                    Err(StdinFrameQueueError::Full(message)) => {
199                        let _ = read_error_tx.send(message);
200                        break;
201                    }
202                    Err(StdinFrameQueueError::Closed) => break,
203                }
204                if should_stop {
205                    break;
206                }
207            }
208        }
209    });
210
211    flush_sidecar_requests(&mut sidecar, &write_tx)?;
212    let mut pending_frame: Option<ProtocolFrame> = None;
213    let mut limit_warning_closed = false;
214
215    loop {
216        if let Some(frame) = pending_frame.take() {
217            handle_protocol_frame(
218                frame,
219                &mut sidecar,
220                &mut stdin_rx,
221                &mut pending_frame,
222                &write_tx,
223                &mut active_sessions,
224                &mut active_connections,
225            )
226            .await?;
227            continue;
228        }
229
230        tokio::select! {
231            maybe_frame = stdin_rx.recv() => {
232                let Some(frame) = maybe_frame else {
233                    break;
234                };
235                let Some(frame) = frame.map_err(io::Error::other)? else {
236                    break;
237                };
238                handle_protocol_frame(
239                    frame,
240                    &mut sidecar,
241                    &mut stdin_rx,
242                    &mut pending_frame,
243                    &write_tx,
244                    &mut active_sessions,
245                    &mut active_connections,
246                ).await?;
247            }
248            maybe_warning = limit_warning_rx.recv(), if !limit_warning_closed => {
249                match maybe_warning {
250                    Some(warning) => {
251                        // A limit warning is process-global; deliver it ONCE. The
252                        // stdio transport is single-client, so emit it to the first
253                        // active connection (if any) rather than fanning out a copy
254                        // per connection. Dropped if no client has authenticated yet
255                        // (only the tracing log survives, which is acceptable).
256                        if let Some(connection_id) = active_connections.iter().next() {
257                            let mut detail = std::collections::HashMap::new();
258                            detail.insert(String::from("limit"), warning.name.as_str().to_string());
259                            detail.insert(
260                                String::from("category"),
261                                warning.category.as_str().to_string(),
262                            );
263                            detail.insert(String::from("observed"), warning.observed.to_string());
264                            detail.insert(String::from("capacity"), warning.capacity.to_string());
265                            detail.insert(
266                                String::from("fillPercent"),
267                                warning.fill_percent.to_string(),
268                            );
269                            let frame = crate::service::structured_event_frame(
270                                connection_id,
271                                "limit_warning",
272                                detail,
273                            )?;
274                            send_output_frame(&write_tx, ProtocolFrame::EventFrame(frame))?;
275                        }
276                    }
277                    None => {
278                        // Sender dropped (only possible if another sidecar replaced
279                        // the global handler in-process). Disarm this branch so the
280                        // select! does not hot-spin on an always-ready closed
281                        // receiver; do NOT break — that would tear down the sidecar.
282                        limit_warning_closed = true;
283                    }
284                }
285            }
286            maybe_ready = event_ready_rx.recv() => {
287                let Some(()) = maybe_ready else {
288                    break;
289                };
290                loop {
291                    let mut emitted_frame = false;
292                    for session in active_sessions.iter().cloned().collect::<Vec<_>>() {
293                        if let Some(frame) = sidecar
294                            .poll_event_wire(&session.ownership_scope(), Duration::ZERO)
295                            .await?
296                        {
297                            send_output_frame(&write_tx, ProtocolFrame::EventFrame(frame))?;
298                            emitted_frame = true;
299                        }
300                    }
301
302                    if !emitted_frame {
303                        break;
304                    }
305                }
306                flush_sidecar_requests(&mut sidecar, &write_tx)?;
307            }
308            _ = event_pump.tick() => {
309                for session in active_sessions.iter().cloned().collect::<Vec<_>>() {
310                    if sidecar.pump_process_events(&session.compat_ownership_scope()).await? {
311                        let _ = event_ready_tx.try_send(());
312                    }
313                }
314                flush_sidecar_requests(&mut sidecar, &write_tx)?;
315            }
316            maybe_write_error = write_error_rx.recv() => {
317                if let Some(error) = maybe_write_error {
318                    return Err(io::Error::new(io::ErrorKind::BrokenPipe, error).into());
319                }
320            }
321        }
322    }
323
324    cleanup_connections(&mut sidecar, &active_connections).await;
325    Ok(())
326}
327
328async fn handle_protocol_frame(
329    frame: ProtocolFrame,
330    sidecar: &mut NativeSidecar<LocalBridge>,
331    stdin_rx: &mut Receiver<Result<Option<ProtocolFrame>, String>>,
332    pending_frame: &mut Option<ProtocolFrame>,
333    write_tx: &TrackedSyncSender<ProtocolFrame>,
334    active_sessions: &mut BTreeSet<SessionScope>,
335    active_connections: &mut BTreeSet<String>,
336) -> Result<(), Box<dyn Error>> {
337    match frame {
338        ProtocolFrame::RequestFrame(request) => {
339            let (dispatch, extra_responses) =
340                dispatch_with_prompt_interrupt(sidecar, request.clone(), stdin_rx, pending_frame)
341                    .await?;
342            track_session_state(
343                &dispatch.response.payload,
344                active_sessions,
345                active_connections,
346            );
347
348            send_output_frame(write_tx, ProtocolFrame::ResponseFrame(dispatch.response))?;
349            for response in extra_responses {
350                send_output_frame(write_tx, ProtocolFrame::ResponseFrame(response))?;
351            }
352            for event in dispatch.events {
353                send_output_frame(write_tx, ProtocolFrame::EventFrame(event))?;
354            }
355            flush_sidecar_requests(sidecar, write_tx)?;
356        }
357        ProtocolFrame::SidecarResponseFrame(response) => {
358            sidecar.accept_wire_sidecar_response(response)?;
359            flush_sidecar_requests(sidecar, write_tx)?;
360        }
361        other => {
362            return Err(format!(
363                "expected request or sidecar_response frame on stdin, received {}",
364                frame_kind(&other)
365            )
366            .into());
367        }
368    }
369    Ok(())
370}
371
372async fn dispatch_with_prompt_interrupt(
373    sidecar: &mut NativeSidecar<LocalBridge>,
374    request: RequestFrame,
375    stdin_rx: &mut Receiver<Result<Option<ProtocolFrame>, String>>,
376    pending_frame: &mut Option<ProtocolFrame>,
377) -> Result<(WireDispatchResult, Vec<ResponseFrame>), Box<dyn Error>> {
378    let Some(blocking_request) = blocking_extension_request(sidecar, &request) else {
379        return Ok((sidecar.dispatch_wire(request).await?, Vec::new()));
380    };
381
382    let mut dispatch = Box::pin(sidecar.dispatch_wire(request.clone()));
383    tokio::select! {
384        result = dispatch.as_mut() => Ok((result?, Vec::new())),
385        maybe_frame = stdin_rx.recv() => {
386            let frame = decode_stdin_frame(maybe_frame)?;
387            if let Some(frame) = frame {
388                if let Some(interrupt) = extension_interrupt_response(&blocking_request, &request, &frame) {
389                    drop(dispatch);
390                    let mut extra_responses = Vec::new();
391                    if let Some(response) = interrupt.interrupting_response {
392                        extra_responses.push(response);
393                    } else {
394                        *pending_frame = Some(frame);
395                    }
396                    return Ok((interrupt.interrupted_dispatch, extra_responses));
397                }
398                *pending_frame = Some(frame);
399            }
400            Ok((dispatch.await?, Vec::new()))
401        }
402    }
403}
404
405fn decode_stdin_frame(
406    maybe_frame: Option<Result<Option<ProtocolFrame>, String>>,
407) -> Result<Option<ProtocolFrame>, Box<dyn Error>> {
408    let Some(frame) = maybe_frame else {
409        return Ok(None);
410    };
411    Ok(frame.map_err(io::Error::other)?)
412}
413
414struct BlockingExtensionRequest {
415    namespace: String,
416    payload: Vec<u8>,
417    extension: Arc<dyn Extension>,
418}
419
420struct ExtensionInterruptDispatch {
421    interrupted_dispatch: WireDispatchResult,
422    interrupting_response: Option<ResponseFrame>,
423}
424
425fn blocking_extension_request(
426    sidecar: &NativeSidecar<LocalBridge>,
427    request: &RequestFrame,
428) -> Option<BlockingExtensionRequest> {
429    let RequestPayload::ExtEnvelope(envelope) = &request.payload else {
430        return None;
431    };
432    let extension = sidecar.extensions.get(&envelope.namespace)?.clone();
433    if !extension.is_blocking_request(&envelope.payload) {
434        return None;
435    }
436    Some(BlockingExtensionRequest {
437        namespace: envelope.namespace.clone(),
438        payload: envelope.payload.clone(),
439        extension,
440    })
441}
442
443fn extension_interrupt_response(
444    blocking_request: &BlockingExtensionRequest,
445    active_request: &RequestFrame,
446    frame: &ProtocolFrame,
447) -> Option<ExtensionInterruptDispatch> {
448    match frame {
449        ProtocolFrame::RequestFrame(request) => {
450            if request.ownership != active_request.ownership {
451                return None;
452            }
453            let interrupt = match &request.payload {
454                RequestPayload::ExtEnvelope(envelope)
455                    if envelope.namespace == blocking_request.namespace =>
456                {
457                    blocking_request.extension.interrupt_blocking_request(
458                        &blocking_request.payload,
459                        ExtensionInterruptRequest::ExtensionPayload(&envelope.payload),
460                    )?
461                }
462                RequestPayload::ExtEnvelope(_) => return None,
463                RequestPayload::KillProcessRequest(_) => {
464                    blocking_request.extension.interrupt_blocking_request(
465                        &blocking_request.payload,
466                        ExtensionInterruptRequest::KillProcess,
467                    )?
468                }
469                // Control-plane setup, inspection, filesystem, process plumbing, and
470                // persistence requests run concurrently with an in-flight prompt and
471                // must not interrupt it. DisposeVm is deliberately non-interrupting for
472                // now; see the todo entry about dispose racing a blocked prompt.
473                RequestPayload::AuthenticateRequest(_)
474                | RequestPayload::OpenSessionRequest(_)
475                | RequestPayload::CreateVmRequest(_)
476                | RequestPayload::DisposeVmRequest(_)
477                | RequestPayload::BootstrapRootFilesystemRequest(_)
478                | RequestPayload::ConfigureVmRequest(_)
479                | RequestPayload::RegisterHostCallbacksRequest(_)
480                | RequestPayload::CreateLayerRequest
481                | RequestPayload::SealLayerRequest(_)
482                | RequestPayload::ImportSnapshotRequest(_)
483                | RequestPayload::ExportSnapshotRequest(_)
484                | RequestPayload::CreateOverlayRequest(_)
485                | RequestPayload::GuestFilesystemCallRequest(_)
486                | RequestPayload::SnapshotRootFilesystemRequest
487                | RequestPayload::ExecuteRequest(_)
488                | RequestPayload::WriteStdinRequest(_)
489                | RequestPayload::CloseStdinRequest(_)
490                | RequestPayload::GetProcessSnapshotRequest
491                | RequestPayload::FindListenerRequest(_)
492                | RequestPayload::FindBoundUdpRequest(_)
493                | RequestPayload::VmFetchRequest(_)
494                | RequestPayload::GetSignalStateRequest(_)
495                | RequestPayload::GetZombieTimerCountRequest
496                | RequestPayload::HostFilesystemCallRequest(_)
497                | RequestPayload::PersistenceLoadRequest(_)
498                | RequestPayload::PersistenceFlushRequest(_) => return None,
499            };
500            let interrupted_dispatch = interrupted_extension_dispatch(
501                active_request,
502                &blocking_request.namespace,
503                interrupt.interrupted_response_payload,
504            );
505            let interrupting_response = interrupt.interrupting_response_payload.map(|payload| {
506                response_frame(
507                    request.request_id,
508                    request.ownership.clone(),
509                    ResponsePayload::ExtEnvelope(ExtEnvelope {
510                        namespace: blocking_request.namespace.clone(),
511                        payload,
512                    }),
513                )
514            });
515            Some(ExtensionInterruptDispatch {
516                interrupted_dispatch,
517                interrupting_response,
518            })
519        }
520        // Response, Event, and SidecarRequest frames are sidecar-to-host only. If one
521        // arrives on stdin it is requeued and rejected as a protocol error by
522        // handle_protocol_frame, so it must not synthesize a cancelled prompt first.
523        // SidecarResponse frames answer sidecar-initiated callbacks and may be the very
524        // response the blocked prompt dispatch is waiting on, so they never interrupt.
525        ProtocolFrame::ResponseFrame(_)
526        | ProtocolFrame::EventFrame(_)
527        | ProtocolFrame::SidecarRequestFrame(_)
528        | ProtocolFrame::SidecarResponseFrame(_) => None,
529    }
530}
531
532fn interrupted_extension_dispatch(
533    request: &RequestFrame,
534    namespace: &str,
535    payload: Vec<u8>,
536) -> WireDispatchResult {
537    match &request.payload {
538        RequestPayload::ExtEnvelope(_) => {
539            let response = ResponsePayload::ExtEnvelope(ExtEnvelope {
540                namespace: namespace.to_string(),
541                payload,
542            });
543            WireDispatchResult {
544                response: response_frame(request.request_id, request.ownership.clone(), response),
545                events: Vec::new(),
546            }
547        }
548        RequestPayload::AuthenticateRequest(_)
549        | RequestPayload::OpenSessionRequest(_)
550        | RequestPayload::CreateVmRequest(_)
551        | RequestPayload::DisposeVmRequest(_)
552        | RequestPayload::BootstrapRootFilesystemRequest(_)
553        | RequestPayload::ConfigureVmRequest(_)
554        | RequestPayload::RegisterHostCallbacksRequest(_)
555        | RequestPayload::CreateLayerRequest
556        | RequestPayload::SealLayerRequest(_)
557        | RequestPayload::ImportSnapshotRequest(_)
558        | RequestPayload::ExportSnapshotRequest(_)
559        | RequestPayload::CreateOverlayRequest(_)
560        | RequestPayload::GuestFilesystemCallRequest(_)
561        | RequestPayload::SnapshotRootFilesystemRequest
562        | RequestPayload::ExecuteRequest(_)
563        | RequestPayload::WriteStdinRequest(_)
564        | RequestPayload::CloseStdinRequest(_)
565        | RequestPayload::KillProcessRequest(_)
566        | RequestPayload::GetProcessSnapshotRequest
567        | RequestPayload::FindListenerRequest(_)
568        | RequestPayload::FindBoundUdpRequest(_)
569        | RequestPayload::VmFetchRequest(_)
570        | RequestPayload::GetSignalStateRequest(_)
571        | RequestPayload::GetZombieTimerCountRequest
572        | RequestPayload::HostFilesystemCallRequest(_)
573        | RequestPayload::PersistenceLoadRequest(_)
574        | RequestPayload::PersistenceFlushRequest(_) => {
575            unreachable!("interrupted extension dispatch requires an extension request");
576        }
577    }
578}
579
580async fn cleanup_connections(
581    sidecar: &mut NativeSidecar<LocalBridge>,
582    active_connections: &BTreeSet<String>,
583) {
584    for connection_id in active_connections {
585        let _ = sidecar.remove_connection(connection_id).await;
586    }
587}
588
589fn track_session_state(
590    payload: &ResponsePayload,
591    active_sessions: &mut BTreeSet<SessionScope>,
592    active_connections: &mut BTreeSet<String>,
593) {
594    match payload {
595        ResponsePayload::AuthenticatedResponse(AuthenticatedResponse { connection_id, .. }) => {
596            active_connections.insert(connection_id.clone());
597        }
598        ResponsePayload::SessionOpenedResponse(SessionOpenedResponse {
599            session_id,
600            owner_connection_id,
601        }) => {
602            active_sessions.insert(SessionScope {
603                connection_id: owner_connection_id.clone(),
604                session_id: session_id.clone(),
605            });
606        }
607        _ => {}
608    }
609}
610
611fn read_frame(
612    codec: &WireFrameCodec,
613    reader: &mut impl Read,
614) -> Result<Option<ProtocolFrame>, Box<dyn Error>> {
615    let mut prefix = [0u8; 4];
616    match reader.read_exact(&mut prefix) {
617        Ok(()) => {}
618        Err(error) if error.kind() == io::ErrorKind::UnexpectedEof => {
619            return Ok(None);
620        }
621        Err(error) => return Err(error.into()),
622    }
623
624    let declared_len = u32::from_be_bytes(prefix) as usize;
625    if declared_len > codec.max_frame_bytes() {
626        return Err(ProtocolCodecError::FrameTooLarge {
627            size: declared_len,
628            max: codec.max_frame_bytes(),
629        }
630        .into());
631    }
632    let total_len = prefix.len().saturating_add(declared_len);
633    let mut bytes = Vec::with_capacity(total_len);
634    bytes.extend_from_slice(&prefix);
635    bytes.resize(total_len, 0);
636    reader.read_exact(&mut bytes[prefix.len()..])?;
637
638    Ok(Some(codec.decode(&bytes)?))
639}
640
641fn write_frame(
642    codec: &WireFrameCodec,
643    writer: &mut impl Write,
644    frame: &ProtocolFrame,
645) -> Result<(), Box<dyn Error>> {
646    let bytes = codec.encode(frame)?;
647    writer.write_all(&bytes)?;
648    writer.flush()?;
649    Ok(())
650}
651
652fn frame_kind(frame: &ProtocolFrame) -> &'static str {
653    match frame {
654        ProtocolFrame::RequestFrame(_) => "request",
655        ProtocolFrame::ResponseFrame(_) => "response",
656        ProtocolFrame::EventFrame(_) => "event",
657        ProtocolFrame::SidecarRequestFrame(_) => "sidecar_request",
658        ProtocolFrame::SidecarResponseFrame(_) => "sidecar_response",
659    }
660}
661
662#[derive(Debug, Clone, PartialEq, Eq)]
663enum StdinFrameQueueError {
664    Full(String),
665    Closed,
666}
667
668fn enqueue_stdin_frame(
669    sender: &tokio::sync::mpsc::Sender<Result<Option<ProtocolFrame>, String>>,
670    frame: Result<Option<ProtocolFrame>, String>,
671) -> Result<(), StdinFrameQueueError> {
672    sender.try_send(frame).map_err(|error| match error {
673        tokio::sync::mpsc::error::TrySendError::Full(_) => StdinFrameQueueError::Full(format!(
674            "stdin frame queue exceeded {MAX_STDIN_FRAME_QUEUE} pending frames"
675        )),
676        tokio::sync::mpsc::error::TrySendError::Closed(_) => StdinFrameQueueError::Closed,
677    })
678}
679
680fn flush_sidecar_requests(
681    sidecar: &mut NativeSidecar<LocalBridge>,
682    writer: &TrackedSyncSender<ProtocolFrame>,
683) -> Result<(), Box<dyn Error>> {
684    while let Some(request) = sidecar.pop_wire_sidecar_request()? {
685        send_output_frame(writer, ProtocolFrame::SidecarRequestFrame(request))?;
686    }
687    Ok(())
688}
689
690fn send_output_frame(
691    writer: &TrackedSyncSender<ProtocolFrame>,
692    frame: ProtocolFrame,
693) -> Result<(), io::Error> {
694    // Apply backpressure rather than killing the sidecar when the host reads
695    // stdout slowly. A full queue means the dedicated writer thread is blocked on
696    // the stdout pipe (the host has not drained it yet) — a transient, recoverable
697    // condition. Previously `try_send` turned that backlog into a `BrokenPipe`
698    // error that propagated up and exited the whole sidecar process (code 1),
699    // taking every session with it. A blocking `send` parks the producer until the
700    // writer drains a slot, which transitively backpressures the V8 event bridge
701    // and the guest. It never deadlocks: the writer thread runs independently, and
702    // if it dies (real broken pipe) the receiver is dropped and `send` returns
703    // `Disconnected`, which we still surface as a terminal `BrokenPipe`.
704    writer.send(frame).map_err(|_disconnected| {
705        io::Error::new(io::ErrorKind::BrokenPipe, "stdout writer disconnected")
706    })
707}
708
709fn default_compile_cache_root() -> PathBuf {
710    // Stable across sidecar processes so V8 compile-cache (cachedData) survives a
711    // fresh sidecar/VM and benefits cold starts. Previously keyed by PID, which
712    // gave every process an empty cache — cold module imports never reused
713    // compiled bytecode. Entries are namespaced+validated downstream by
714    // `stable_compile_cache_namespace_hash` + V8's source/version checks, so a
715    // shared root is safe; stale or mismatched entries are simply ignored.
716    std::env::temp_dir().join("secure-exec-sidecar-compile-cache")
717}
718
719#[cfg(test)]
720mod tests {
721    use super::*;
722    use crate::wire::{AuthenticateRequest, KillProcessRequest};
723    use crate::{ExtensionContext, ExtensionFuture, ExtensionInterruptResponse, ExtensionResponse};
724    use std::io::Cursor;
725
726    const TEST_EXTENSION_NAMESPACE: &str = "dev.rivet.secure-exec.test.blocking";
727
728    #[test]
729    fn read_frame_rejects_oversized_prefix_before_allocating_payload() {
730        let codec = WireFrameCodec::new(16);
731        let mut reader = Cursor::new((32_u32).to_be_bytes().to_vec());
732
733        let error = read_frame(&codec, &mut reader).expect_err("oversized frame should fail");
734        let error = error
735            .downcast::<ProtocolCodecError>()
736            .expect("protocol codec error");
737        assert!(matches!(
738            *error,
739            ProtocolCodecError::FrameTooLarge { size: 32, max: 16 }
740        ));
741    }
742
743    #[test]
744    fn stdio_work_queues_are_bounded() {
745        let (stdin_tx, _stdin_rx) =
746            channel::<Result<Option<ProtocolFrame>, String>>(MAX_STDIN_FRAME_QUEUE);
747        for _ in 0..MAX_STDIN_FRAME_QUEUE {
748            enqueue_stdin_frame(&stdin_tx, Ok(None))
749                .expect("stdin frame queue should accept capacity");
750        }
751        assert!(matches!(
752            enqueue_stdin_frame(&stdin_tx, Ok(None)),
753            Err(StdinFrameQueueError::Full(_))
754        ));
755
756        let (event_ready_tx, _event_ready_rx) = channel::<()>(MAX_EVENT_READY_QUEUE);
757        event_ready_tx
758            .try_send(())
759            .expect("event-ready queue should accept capacity");
760        assert!(matches!(
761            event_ready_tx.try_send(()),
762            Err(tokio::sync::mpsc::error::TrySendError::Full(_))
763        ));
764    }
765
766    // Regression: a full stdout frame queue must apply backpressure (block the
767    // producer until the writer drains a slot), NOT tear the sidecar down. The
768    // old `try_send` turned a slow host reader into a `BrokenPipe` error that
769    // propagated up and exited the whole sidecar process (code 1). Here a slow
770    // drainer forces the queue past capacity; with backpressure every send
771    // succeeds, and overflow only fails when the writer (receiver) is gone.
772    #[test]
773    fn stdout_frame_queue_applies_backpressure_instead_of_crashing() {
774        let queue_frame = |request_id: RequestId| {
775            ProtocolFrame::RequestFrame(request_frame(
776                request_id,
777                connection_ownership("conn-queue"),
778                RequestPayload::AuthenticateRequest(AuthenticateRequest {
779                    client_name: String::from("queue-test"),
780                    auth_token: String::from("token"),
781                    protocol_version: wire::PROTOCOL_VERSION,
782                    bridge_version: secure_exec_bridge::bridge_contract().version,
783                }),
784            ))
785        };
786
787        // Small fixed capacity (independent of the production constant) with a
788        // drainer slow enough that the queue fills and the producer is forced
789        // onto the blocking path. The old try_send path errored on the
790        // (capacity + 1)th frame; backpressure accepts all of them.
791        let queue_cap = 8usize;
792        let total_frames = queue_cap * 3;
793        let (stdout_tx, stdout_rx) =
794            tracked_sync_channel::<ProtocolFrame>(TrackedLimit::SidecarStdoutFrames, queue_cap);
795        let drainer = std::thread::spawn(move || {
796            let mut drained = 0usize;
797            while stdout_rx.recv().is_ok() {
798                drained += 1;
799                std::thread::sleep(std::time::Duration::from_millis(1));
800            }
801            drained
802        });
803
804        for request_id in 0..total_frames {
805            send_output_frame(&stdout_tx, queue_frame(request_id as RequestId))
806                .expect("backpressured stdout queue must accept frames, not crash");
807        }
808        drop(stdout_tx);
809        let drained = drainer.join().expect("drainer thread panicked");
810        assert_eq!(
811            drained, total_frames,
812            "every frame must survive the backpressured queue"
813        );
814
815        // When the writer (receiver) is gone, overflow is genuinely terminal and
816        // still surfaces as a BrokenPipe error rather than blocking forever.
817        let (closed_tx, closed_rx) =
818            tracked_sync_channel::<ProtocolFrame>(TrackedLimit::SidecarStdoutFrames, queue_cap);
819        drop(closed_rx);
820        let error = send_output_frame(&closed_tx, queue_frame(0))
821            .expect_err("send to a dropped writer must error");
822        assert_eq!(error.kind(), io::ErrorKind::BrokenPipe);
823    }
824
825    #[test]
826    fn read_frame_decodes_wire_authenticate_request() {
827        let codec = WireFrameCodec::new(wire::DEFAULT_MAX_FRAME_BYTES);
828        let frame = ProtocolFrame::RequestFrame(request_frame(
829            1,
830            connection_ownership("client-hint"),
831            RequestPayload::AuthenticateRequest(AuthenticateRequest {
832                client_name: "probe".to_string(),
833                auth_token: "probe-token".to_string(),
834                protocol_version: wire::PROTOCOL_VERSION,
835                bridge_version: secure_exec_bridge::bridge_contract().version,
836            }),
837        ));
838        let encoded = codec.encode(&frame).expect("encode wire frame");
839        let mut reader = Cursor::new(encoded);
840
841        let decoded = read_frame(&codec, &mut reader)
842            .expect("decode bare frame")
843            .expect("frame present");
844
845        assert_eq!(decoded, frame);
846    }
847
848    #[test]
849    fn extension_close_interrupts_matching_blocking_request() {
850        let ownership = vm_ownership("conn-1", "session-1", "vm-1");
851        let prompt = test_extension_request_frame(10, ownership.clone(), "prompt:ext-session-1");
852        let close = ProtocolFrame::RequestFrame(test_extension_request_frame(
853            11,
854            ownership,
855            "close:ext-session-1",
856        ));
857
858        let blocking_request = blocking_extension_request(&prompt);
859        let interrupt = extension_interrupt_response(&blocking_request, &prompt, &close)
860            .expect("close should interrupt prompt");
861
862        assert_eq!(interrupt.interrupted_dispatch.response.request_id, 10);
863        let ResponsePayload::ExtEnvelope(envelope) =
864            interrupt.interrupted_dispatch.response.payload
865        else {
866            panic!("expected extension response");
867        };
868        assert_eq!(envelope.namespace, TEST_EXTENSION_NAMESPACE);
869        assert_eq!(envelope.payload, b"prompt-cancelled:ext-session-1");
870    }
871
872    #[test]
873    fn extension_cancel_interrupt_gets_synthetic_response() {
874        let ownership = vm_ownership("conn-1", "session-1", "vm-1");
875        let prompt = test_extension_request_frame(10, ownership.clone(), "prompt:ext-session-1");
876        let cancel = ProtocolFrame::RequestFrame(test_extension_request_frame(
877            11,
878            ownership,
879            "cancel:ext-session-1",
880        ));
881
882        let blocking_request = blocking_extension_request(&prompt);
883        let interrupt = extension_interrupt_response(&blocking_request, &prompt, &cancel)
884            .expect("cancel should interrupt prompt");
885        let response = interrupt
886            .interrupting_response
887            .expect("cancel should get a response");
888
889        assert_eq!(response.request_id, 11);
890        let ResponsePayload::ExtEnvelope(envelope) = response.payload else {
891            panic!("expected extension response");
892        };
893        assert_eq!(envelope.namespace, TEST_EXTENSION_NAMESPACE);
894        assert_eq!(envelope.payload, b"cancelled:ext-session-1");
895    }
896
897    #[test]
898    fn kill_process_interrupts_blocking_extension_request() {
899        let ownership = vm_ownership("conn-1", "session-1", "vm-1");
900        let prompt = test_extension_request_frame(10, ownership.clone(), "prompt:ext-session-1");
901        let kill = ProtocolFrame::RequestFrame(request_frame(
902            11,
903            ownership,
904            RequestPayload::KillProcessRequest(KillProcessRequest {
905                process_id: "adapter-process".to_string(),
906                signal: "SIGTERM".to_string(),
907            }),
908        ));
909
910        let blocking_request = blocking_extension_request(&prompt);
911        let interrupt = extension_interrupt_response(&blocking_request, &prompt, &kill)
912            .expect("kill should interrupt prompt");
913
914        assert_eq!(interrupt.interrupted_dispatch.response.request_id, 10);
915        assert!(interrupt.interrupting_response.is_none());
916    }
917
918    fn test_extension_request_frame(
919        request_id: RequestId,
920        ownership: OwnershipScope,
921        payload: &str,
922    ) -> RequestFrame {
923        request_frame(
924            request_id,
925            ownership,
926            RequestPayload::ExtEnvelope(ExtEnvelope {
927                namespace: TEST_EXTENSION_NAMESPACE.to_string(),
928                payload: payload.as_bytes().to_vec(),
929            }),
930        )
931    }
932
933    fn blocking_extension_request(request: &RequestFrame) -> BlockingExtensionRequest {
934        let RequestPayload::ExtEnvelope(envelope) = &request.payload else {
935            panic!("expected extension request");
936        };
937        BlockingExtensionRequest {
938            namespace: TEST_EXTENSION_NAMESPACE.to_string(),
939            payload: envelope.payload.clone(),
940            extension: Arc::new(TestBlockingInterruptExtension),
941        }
942    }
943
944    struct TestBlockingInterruptExtension;
945
946    impl Extension for TestBlockingInterruptExtension {
947        fn namespace(&self) -> &str {
948            TEST_EXTENSION_NAMESPACE
949        }
950
951        fn handle_request<'a>(
952            &'a self,
953            _ctx: ExtensionContext<'a>,
954            _payload: Vec<u8>,
955        ) -> ExtensionFuture<'a, ExtensionResponse> {
956            Box::pin(async { Ok(ExtensionResponse::new(Vec::new())) })
957        }
958
959        fn is_blocking_request(&self, payload: &[u8]) -> bool {
960            parse_test_payload(payload).is_some_and(|(kind, _session_id)| kind == "prompt")
961        }
962
963        fn interrupt_blocking_request(
964            &self,
965            blocking_payload: &[u8],
966            interrupt: ExtensionInterruptRequest<'_>,
967        ) -> Option<ExtensionInterruptResponse> {
968            let (blocking_kind, blocking_session_id) = parse_test_payload(blocking_payload)?;
969            if blocking_kind != "prompt" {
970                return None;
971            }
972
973            let interrupted_response_payload =
974                encode_test_response("prompt-cancelled", blocking_session_id);
975            match interrupt {
976                ExtensionInterruptRequest::KillProcess => Some(ExtensionInterruptResponse {
977                    interrupted_response_payload,
978                    interrupting_response_payload: None,
979                }),
980                ExtensionInterruptRequest::ExtensionPayload(payload) => {
981                    let (interrupt_kind, interrupt_session_id) = parse_test_payload(payload)?;
982                    match interrupt_kind {
983                        "close" if interrupt_session_id == blocking_session_id => {
984                            Some(ExtensionInterruptResponse {
985                                interrupted_response_payload,
986                                interrupting_response_payload: None,
987                            })
988                        }
989                        "cancel" if interrupt_session_id == blocking_session_id => {
990                            Some(ExtensionInterruptResponse {
991                                interrupted_response_payload,
992                                interrupting_response_payload: Some(encode_test_response(
993                                    "cancelled",
994                                    interrupt_session_id,
995                                )),
996                            })
997                        }
998                        "prompt" | "close" | "cancel" => None,
999                        _ => None,
1000                    }
1001                }
1002            }
1003        }
1004    }
1005
1006    fn parse_test_payload(payload: &[u8]) -> Option<(&str, &str)> {
1007        let payload = std::str::from_utf8(payload).ok()?;
1008        payload.split_once(':')
1009    }
1010
1011    fn encode_test_response(kind: &str, session_id: &str) -> Vec<u8> {
1012        format!("{kind}:{session_id}").into_bytes()
1013    }
1014}
1015
1016#[derive(Debug, Clone)]
1017struct LocalBridge {
1018    started_at: Instant,
1019    next_timer_id: usize,
1020    snapshots: BTreeMap<String, FilesystemSnapshot>,
1021}
1022
1023impl Default for LocalBridge {
1024    fn default() -> Self {
1025        Self {
1026            started_at: Instant::now(),
1027            next_timer_id: 0,
1028            snapshots: BTreeMap::new(),
1029        }
1030    }
1031}
1032
1033impl BridgeTypes for LocalBridge {
1034    type Error = LocalBridgeError;
1035}
1036
1037impl FilesystemBridge for LocalBridge {
1038    fn read_file(&mut self, request: ReadFileRequest) -> Result<Vec<u8>, Self::Error> {
1039        fs::read(Self::host_path(&request.path))
1040            .map_err(|error| LocalBridgeError::io("read", &request.path, error))
1041    }
1042
1043    fn write_file(&mut self, request: WriteFileRequest) -> Result<(), Self::Error> {
1044        let host_path = Self::host_path(&request.path);
1045        if let Some(parent) = host_path.parent() {
1046            fs::create_dir_all(parent)
1047                .map_err(|error| LocalBridgeError::io("mkdir", &request.path, error))?;
1048        }
1049        fs::write(host_path, request.contents)
1050            .map_err(|error| LocalBridgeError::io("write", &request.path, error))
1051    }
1052
1053    fn stat(&mut self, request: PathRequest) -> Result<FileMetadata, Self::Error> {
1054        fs::metadata(Self::host_path(&request.path))
1055            .map(Self::file_metadata)
1056            .map_err(|error| LocalBridgeError::io("stat", &request.path, error))
1057    }
1058
1059    fn lstat(&mut self, request: PathRequest) -> Result<FileMetadata, Self::Error> {
1060        fs::symlink_metadata(Self::host_path(&request.path))
1061            .map(Self::file_metadata)
1062            .map_err(|error| LocalBridgeError::io("lstat", &request.path, error))
1063    }
1064
1065    fn read_dir(&mut self, request: ReadDirRequest) -> Result<Vec<DirectoryEntry>, Self::Error> {
1066        let mut entries = fs::read_dir(Self::host_path(&request.path))
1067            .map_err(|error| LocalBridgeError::io("readdir", &request.path, error))?
1068            .map(|entry| {
1069                let entry =
1070                    entry.map_err(|error| LocalBridgeError::io("readdir", &request.path, error))?;
1071                let kind = entry
1072                    .file_type()
1073                    .map(Self::file_kind)
1074                    .map_err(|error| LocalBridgeError::io("readdir", &request.path, error))?;
1075                Ok(DirectoryEntry {
1076                    name: entry.file_name().to_string_lossy().into_owned(),
1077                    kind,
1078                })
1079            })
1080            .collect::<Result<Vec<_>, LocalBridgeError>>()?;
1081        entries.sort_by(|left, right| left.name.cmp(&right.name));
1082        Ok(entries)
1083    }
1084
1085    fn create_dir(&mut self, request: CreateDirRequest) -> Result<(), Self::Error> {
1086        let host_path = Self::host_path(&request.path);
1087        if request.recursive {
1088            fs::create_dir_all(host_path)
1089        } else {
1090            fs::create_dir(host_path)
1091        }
1092        .map_err(|error| LocalBridgeError::io("mkdir", &request.path, error))
1093    }
1094
1095    fn remove_file(&mut self, request: PathRequest) -> Result<(), Self::Error> {
1096        fs::remove_file(Self::host_path(&request.path))
1097            .map_err(|error| LocalBridgeError::io("unlink", &request.path, error))
1098    }
1099
1100    fn remove_dir(&mut self, request: PathRequest) -> Result<(), Self::Error> {
1101        fs::remove_dir(Self::host_path(&request.path))
1102            .map_err(|error| LocalBridgeError::io("rmdir", &request.path, error))
1103    }
1104
1105    fn rename(&mut self, request: RenameRequest) -> Result<(), Self::Error> {
1106        let from_path = Self::host_path(&request.from_path);
1107        let to_path = Self::host_path(&request.to_path);
1108        if let Some(parent) = to_path.parent() {
1109            fs::create_dir_all(parent)
1110                .map_err(|error| LocalBridgeError::io("mkdir", &request.to_path, error))?;
1111        }
1112        fs::rename(from_path, to_path).map_err(|error| {
1113            LocalBridgeError::unsupported(format!(
1114                "rename {} -> {}: {}",
1115                request.from_path, request.to_path, error
1116            ))
1117        })
1118    }
1119
1120    fn symlink(&mut self, request: SymlinkRequest) -> Result<(), Self::Error> {
1121        let link_path = Self::host_path(&request.link_path);
1122        if let Some(parent) = link_path.parent() {
1123            fs::create_dir_all(parent)
1124                .map_err(|error| LocalBridgeError::io("mkdir", &request.link_path, error))?;
1125        }
1126        create_symlink(&request.target_path, link_path)
1127            .map_err(|error| LocalBridgeError::io("symlink", &request.link_path, error))
1128    }
1129
1130    fn read_link(&mut self, request: PathRequest) -> Result<String, Self::Error> {
1131        fs::read_link(Self::host_path(&request.path))
1132            .map(|target| target.to_string_lossy().into_owned())
1133            .map_err(|error| LocalBridgeError::io("readlink", &request.path, error))
1134    }
1135
1136    fn chmod(&mut self, request: ChmodRequest) -> Result<(), Self::Error> {
1137        let permissions = fs::Permissions::from_mode(request.mode);
1138        fs::set_permissions(Self::host_path(&request.path), permissions)
1139            .map_err(|error| LocalBridgeError::io("chmod", &request.path, error))
1140    }
1141
1142    fn truncate(&mut self, request: TruncateRequest) -> Result<(), Self::Error> {
1143        OpenOptions::new()
1144            .write(true)
1145            .create(false)
1146            .open(Self::host_path(&request.path))
1147            .and_then(|file| file.set_len(request.len))
1148            .map_err(|error| LocalBridgeError::io("truncate", &request.path, error))
1149    }
1150
1151    fn exists(&mut self, request: PathRequest) -> Result<bool, Self::Error> {
1152        Ok(fs::symlink_metadata(Self::host_path(&request.path)).is_ok())
1153    }
1154}
1155
1156impl PermissionBridge for LocalBridge {
1157    fn check_filesystem_access(
1158        &mut self,
1159        request: FilesystemPermissionRequest,
1160    ) -> Result<PermissionDecision, Self::Error> {
1161        Ok(PermissionDecision::deny(format!(
1162            "no static filesystem policy registered for {}:{}",
1163            request.vm_id, request.path
1164        )))
1165    }
1166
1167    fn check_network_access(
1168        &mut self,
1169        request: NetworkPermissionRequest,
1170    ) -> Result<PermissionDecision, Self::Error> {
1171        Ok(PermissionDecision::deny(format!(
1172            "no static network policy registered for {}:{}",
1173            request.vm_id, request.resource
1174        )))
1175    }
1176
1177    fn check_command_execution(
1178        &mut self,
1179        request: CommandPermissionRequest,
1180    ) -> Result<PermissionDecision, Self::Error> {
1181        Ok(PermissionDecision::deny(format!(
1182            "no static child_process policy registered for {}:{}",
1183            request.vm_id, request.command
1184        )))
1185    }
1186
1187    fn check_environment_access(
1188        &mut self,
1189        request: EnvironmentPermissionRequest,
1190    ) -> Result<PermissionDecision, Self::Error> {
1191        Ok(PermissionDecision::deny(format!(
1192            "no static env policy registered for {}:{}",
1193            request.vm_id, request.key
1194        )))
1195    }
1196}
1197
1198impl PersistenceBridge for LocalBridge {
1199    fn load_filesystem_state(
1200        &mut self,
1201        request: LoadFilesystemStateRequest,
1202    ) -> Result<Option<FilesystemSnapshot>, Self::Error> {
1203        Ok(self.snapshots.get(&request.vm_id).cloned())
1204    }
1205
1206    fn flush_filesystem_state(
1207        &mut self,
1208        request: FlushFilesystemStateRequest,
1209    ) -> Result<(), Self::Error> {
1210        self.snapshots.insert(request.vm_id, request.snapshot);
1211        Ok(())
1212    }
1213}
1214
1215impl ClockBridge for LocalBridge {
1216    fn wall_clock(&mut self, _request: ClockRequest) -> Result<SystemTime, Self::Error> {
1217        Ok(SystemTime::now())
1218    }
1219
1220    fn monotonic_clock(&mut self, _request: ClockRequest) -> Result<Duration, Self::Error> {
1221        Ok(self.started_at.elapsed())
1222    }
1223
1224    fn schedule_timer(
1225        &mut self,
1226        request: ScheduleTimerRequest,
1227    ) -> Result<ScheduledTimer, Self::Error> {
1228        self.next_timer_id += 1;
1229        Ok(ScheduledTimer {
1230            timer_id: format!("timer-{}", self.next_timer_id),
1231            delay: request.delay,
1232        })
1233    }
1234}
1235
1236impl RandomBridge for LocalBridge {
1237    fn fill_random_bytes(&mut self, request: RandomBytesRequest) -> Result<Vec<u8>, Self::Error> {
1238        Ok(vec![0u8; request.len])
1239    }
1240}
1241
1242impl EventBridge for LocalBridge {
1243    fn emit_structured_event(&mut self, _event: StructuredEventRecord) -> Result<(), Self::Error> {
1244        Ok(())
1245    }
1246
1247    fn emit_diagnostic(&mut self, _event: DiagnosticRecord) -> Result<(), Self::Error> {
1248        Ok(())
1249    }
1250
1251    fn emit_log(&mut self, _event: LogRecord) -> Result<(), Self::Error> {
1252        Ok(())
1253    }
1254
1255    fn emit_lifecycle(&mut self, _event: LifecycleEventRecord) -> Result<(), Self::Error> {
1256        Ok(())
1257    }
1258}
1259
1260impl ExecutionBridge for LocalBridge {
1261    fn create_javascript_context(
1262        &mut self,
1263        _request: CreateJavascriptContextRequest,
1264    ) -> Result<GuestContextHandle, Self::Error> {
1265        Err(LocalBridgeError::unsupported(
1266            "execution bridge is handled internally by the native sidecar",
1267        ))
1268    }
1269
1270    fn create_wasm_context(
1271        &mut self,
1272        _request: CreateWasmContextRequest,
1273    ) -> Result<GuestContextHandle, Self::Error> {
1274        Err(LocalBridgeError::unsupported(
1275            "execution bridge is handled internally by the native sidecar",
1276        ))
1277    }
1278
1279    fn start_execution(
1280        &mut self,
1281        _request: StartExecutionRequest,
1282    ) -> Result<StartedExecution, Self::Error> {
1283        Err(LocalBridgeError::unsupported(
1284            "execution bridge is handled internally by the native sidecar",
1285        ))
1286    }
1287
1288    fn write_stdin(&mut self, _request: WriteExecutionStdinRequest) -> Result<(), Self::Error> {
1289        Err(LocalBridgeError::unsupported(
1290            "execution bridge is handled internally by the native sidecar",
1291        ))
1292    }
1293
1294    fn close_stdin(&mut self, _request: ExecutionHandleRequest) -> Result<(), Self::Error> {
1295        Err(LocalBridgeError::unsupported(
1296            "execution bridge is handled internally by the native sidecar",
1297        ))
1298    }
1299
1300    fn kill_execution(&mut self, _request: KillExecutionRequest) -> Result<(), Self::Error> {
1301        Err(LocalBridgeError::unsupported(
1302            "execution bridge is handled internally by the native sidecar",
1303        ))
1304    }
1305
1306    fn poll_execution_event(
1307        &mut self,
1308        _request: PollExecutionEventRequest,
1309    ) -> Result<Option<ExecutionEvent>, Self::Error> {
1310        Ok(None)
1311    }
1312}
1313
1314#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
1315struct SessionScope {
1316    connection_id: String,
1317    session_id: String,
1318}
1319
1320impl SessionScope {
1321    fn ownership_scope(&self) -> OwnershipScope {
1322        session_ownership(&self.connection_id, &self.session_id)
1323    }
1324
1325    fn compat_ownership_scope(&self) -> crate::protocol::OwnershipScope {
1326        wire::ownership_scope_to_compat(self.ownership_scope())
1327    }
1328}
1329
1330struct FrameSidecarRequestTransport {
1331    writer: TrackedSyncSender<ProtocolFrame>,
1332    pending: Arc<Mutex<BTreeMap<RequestId, mpsc::SyncSender<SidecarResponseFrame>>>>,
1333}
1334
1335impl FrameSidecarRequestTransport {
1336    fn new(writer: TrackedSyncSender<ProtocolFrame>) -> Self {
1337        Self {
1338            writer,
1339            pending: Arc::new(Mutex::new(BTreeMap::new())),
1340        }
1341    }
1342
1343    fn accept_response(&self, response: SidecarResponseFrame) -> bool {
1344        let sender = {
1345            let mut pending = match self.pending.lock() {
1346                Ok(pending) => pending,
1347                Err(_) => return false,
1348            };
1349            pending.remove(&response.request_id)
1350        };
1351        let Some(sender) = sender else {
1352            return false;
1353        };
1354        let _ = sender.send(response);
1355        true
1356    }
1357}
1358
1359impl SidecarRequestTransport for FrameSidecarRequestTransport {
1360    fn send_request(
1361        &self,
1362        request: crate::protocol::SidecarRequestFrame,
1363        timeout: Duration,
1364    ) -> Result<crate::protocol::SidecarResponseFrame, SidecarError> {
1365        let request =
1366            wire::sidecar_request_frame_from_compat(request).map_err(wire_protocol_error)?;
1367        let (sender, receiver) = mpsc::sync_channel(1);
1368        self.pending
1369            .lock()
1370            .map_err(|_| {
1371                SidecarError::Bridge(String::from("sidecar callback waiter map lock poisoned"))
1372            })?
1373            .insert(request.request_id, sender);
1374        // Bound the request-frame WRITE by the caller's deadline. The shared
1375        // `send_output_frame` blocks (correct backpressure for the fire-and-forget
1376        // event/response paths), but this request path has a `timeout` that the
1377        // response wait below already honors — so a stalled host stdout must not
1378        // make the *send* block past it. Poll try_send until a slot frees or the
1379        // deadline passes.
1380        let write_deadline = Instant::now() + timeout;
1381        let mut frame = ProtocolFrame::SidecarRequestFrame(request.clone());
1382        let write_result = loop {
1383            match self.writer.try_send(frame) {
1384                Ok(()) => break Ok(()),
1385                Err(mpsc::TrySendError::Disconnected(_)) => {
1386                    break Err(String::from("stdout writer disconnected"));
1387                }
1388                Err(mpsc::TrySendError::Full(returned)) => {
1389                    if Instant::now() >= write_deadline {
1390                        break Err(format!(
1391                            "timed out writing sidecar request frame after {}s",
1392                            timeout.as_secs()
1393                        ));
1394                    }
1395                    frame = returned;
1396                    thread::sleep(Duration::from_millis(1));
1397                }
1398            }
1399        };
1400        if let Err(message) = write_result {
1401            let _ = self
1402                .pending
1403                .lock()
1404                .map(|mut pending| pending.remove(&request.request_id));
1405            return Err(SidecarError::Io(format!(
1406                "failed to write sidecar request frame: {message}"
1407            )));
1408        }
1409        match receiver.recv_timeout(timeout) {
1410            Ok(response) => {
1411                wire::sidecar_response_frame_to_compat(response).map_err(wire_protocol_error)
1412            }
1413            Err(mpsc::RecvTimeoutError::Timeout) => {
1414                let _ = self
1415                    .pending
1416                    .lock()
1417                    .map(|mut pending| pending.remove(&request.request_id));
1418                Err(SidecarError::Io(format!(
1419                    "timed out waiting for sidecar response after {}s",
1420                    timeout.as_secs()
1421                )))
1422            }
1423            Err(mpsc::RecvTimeoutError::Disconnected) => Err(SidecarError::Io(String::from(
1424                "sidecar response waiter disconnected",
1425            ))),
1426        }
1427    }
1428}
1429
1430#[derive(Debug, Clone, PartialEq, Eq)]
1431struct LocalBridgeError {
1432    message: String,
1433}
1434
1435impl LocalBridgeError {
1436    fn unsupported(message: impl Into<String>) -> Self {
1437        Self {
1438            message: message.into(),
1439        }
1440    }
1441
1442    fn io(operation: &str, path: &str, error: io::Error) -> Self {
1443        Self::unsupported(format!("{operation} {path}: {error}"))
1444    }
1445}
1446
1447impl fmt::Display for LocalBridgeError {
1448    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1449        f.write_str(&self.message)
1450    }
1451}
1452
1453impl Error for LocalBridgeError {}
1454
1455impl LocalBridge {
1456    fn host_path(path: &str) -> PathBuf {
1457        let candidate = Path::new(path);
1458        if candidate.is_absolute() {
1459            candidate.to_path_buf()
1460        } else {
1461            std::env::current_dir()
1462                .unwrap_or_else(|_| PathBuf::from("."))
1463                .join(candidate)
1464        }
1465    }
1466
1467    fn file_metadata(metadata: fs::Metadata) -> FileMetadata {
1468        FileMetadata {
1469            mode: metadata.permissions().mode(),
1470            size: metadata.size(),
1471            kind: Self::file_kind(metadata.file_type()),
1472        }
1473    }
1474
1475    fn file_kind(file_type: fs::FileType) -> secure_exec_bridge::FileKind {
1476        if file_type.is_file() {
1477            secure_exec_bridge::FileKind::File
1478        } else if file_type.is_dir() {
1479            secure_exec_bridge::FileKind::Directory
1480        } else if file_type.is_symlink() {
1481            secure_exec_bridge::FileKind::SymbolicLink
1482        } else {
1483            secure_exec_bridge::FileKind::Other
1484        }
1485    }
1486}