Skip to main content

secure_exec_sidecar/
stdio.rs

1use crate::wire::{
2    self, AuthenticatedResponse, ExtEnvelope, OwnershipScope, ProtocolCodecError, ProtocolFrame,
3    RequestFrame, RequestId, RequestPayload, ResponseFrame, ResponsePayload, SessionOpenedResponse,
4    SidecarResponseFrame, WireDispatchResult, WireFrameCodec,
5};
6use crate::{
7    Extension, ExtensionInterruptRequest, NativeSidecar, NativeSidecarConfig, SidecarError,
8    SidecarRequestTransport,
9};
10use secure_exec_bridge::{
11    BridgeTypes, ChmodRequest, ClockBridge, ClockRequest, CommandPermissionRequest,
12    CreateDirRequest, CreateJavascriptContextRequest, CreateWasmContextRequest, DiagnosticRecord,
13    DirectoryEntry, EnvironmentPermissionRequest, EventBridge, ExecutionBridge, ExecutionEvent,
14    ExecutionHandleRequest, FileMetadata, FilesystemBridge, FilesystemPermissionRequest,
15    FilesystemSnapshot, FlushFilesystemStateRequest, GuestContextHandle, KillExecutionRequest,
16    LifecycleEventRecord, LoadFilesystemStateRequest, LogRecord, NetworkPermissionRequest,
17    PathRequest, PermissionBridge, PermissionDecision, PersistenceBridge,
18    PollExecutionEventRequest, RandomBridge, RandomBytesRequest, ReadDirRequest, ReadFileRequest,
19    RenameRequest, ScheduleTimerRequest, ScheduledTimer, StartExecutionRequest, StartedExecution,
20    StructuredEventRecord, SymlinkRequest, TruncateRequest, WriteExecutionStdinRequest,
21    WriteFileRequest,
22};
23use std::collections::{BTreeMap, BTreeSet};
24use std::error::Error;
25use std::fmt;
26use std::fs::{self, OpenOptions};
27use std::io::{self, Read, Write};
28use std::os::unix::fs::{symlink as create_symlink, MetadataExt, PermissionsExt};
29use std::path::{Path, PathBuf};
30use std::sync::{mpsc, Arc, Mutex};
31use std::thread;
32use std::time::{Duration, Instant, SystemTime};
33use tokio::sync::mpsc::{channel, unbounded_channel, Receiver};
34use tokio::time;
35
// Guest sync fs/module RPCs are serviced by `pump_process_events` on this timer,
// so a blocked guest call waits up to one interval before the host even sees it.
// At 5ms this dominated per-call latency (~5ms/stat); 250us cuts it ~11x (stat
// 7.5s -> ~0.65s over 1500 ops) and the sub-ms tokio timer is honored. Idle
// pumps are cheap no-ops (try_recv + zero-timeout poll), so the higher cadence
// costs negligible CPU when no guest is issuing RPCs.
const EVENT_PUMP_INTERVAL: Duration = Duration::from_micros(250);
// Capacity of the bounded channel carrying decoded stdin frames from the reader
// thread to the async loop; overflow is reported as an error by
// `enqueue_stdin_frame` rather than blocking.
const MAX_STDIN_FRAME_QUEUE: usize = 128;
// Capacity of the "events are ready" wake-up channel. A single slot is enough
// because the signal carries no data (`()`), and the pump uses `try_send` and
// ignores a full queue.
const MAX_EVENT_READY_QUEUE: usize = 1;
// Capacity of the bounded channel feeding the stdout writer thread; overflow is
// reported as an error by `send_output_frame` rather than blocking.
const MAX_STDOUT_FRAME_QUEUE: usize = 128;
46
/// Test-only helper that wraps `payload` in a [`RequestFrame`] stamped with the
/// current protocol schema.
#[cfg(test)]
fn request_frame(
    request_id: RequestId,
    ownership: OwnershipScope,
    payload: RequestPayload,
) -> RequestFrame {
    let schema = wire::protocol_schema();
    RequestFrame {
        schema,
        request_id,
        ownership,
        payload,
    }
}
60
61fn response_frame(
62    request_id: RequestId,
63    ownership: OwnershipScope,
64    payload: ResponsePayload,
65) -> ResponseFrame {
66    ResponseFrame {
67        schema: wire::protocol_schema(),
68        request_id,
69        ownership,
70        payload,
71    }
72}
73
/// Test-only helper: builds a connection-level ownership scope.
#[cfg(test)]
fn connection_ownership(connection_id: &str) -> OwnershipScope {
    let scope = wire::ConnectionOwnership {
        connection_id: String::from(connection_id),
    };
    OwnershipScope::ConnectionOwnership(scope)
}
80
81fn session_ownership(connection_id: &str, session_id: &str) -> OwnershipScope {
82    OwnershipScope::SessionOwnership(wire::SessionOwnership {
83        connection_id: connection_id.to_owned(),
84        session_id: session_id.to_owned(),
85    })
86}
87
/// Test-only helper: builds a VM-level ownership scope.
#[cfg(test)]
fn vm_ownership(connection_id: &str, session_id: &str, vm_id: &str) -> OwnershipScope {
    let scope = wire::VmOwnership {
        connection_id: String::from(connection_id),
        session_id: String::from(session_id),
        vm_id: String::from(vm_id),
    };
    OwnershipScope::VmOwnership(scope)
}
96
97fn wire_protocol_error(error: ProtocolCodecError) -> SidecarError {
98    SidecarError::InvalidState(format!("invalid generated wire protocol frame: {error}"))
99}
100
101pub fn run() -> Result<(), Box<dyn Error>> {
102    run_with_extensions(Vec::new())
103}
104
105pub fn run_with_extensions(extensions: Vec<Box<dyn Extension>>) -> Result<(), Box<dyn Error>> {
106    tokio::runtime::Builder::new_current_thread()
107        .enable_all()
108        .build()?
109        .block_on(run_async(extensions))
110}
111
112async fn run_async(extensions: Vec<Box<dyn Extension>>) -> Result<(), Box<dyn Error>> {
113    let config = NativeSidecarConfig {
114        compile_cache_root: Some(default_compile_cache_root()),
115        ..NativeSidecarConfig::default()
116    };
117    let codec = WireFrameCodec::new(config.max_frame_bytes);
118    let mut sidecar =
119        NativeSidecar::with_config_and_extensions(LocalBridge::default(), config, extensions)?;
120    let mut active_sessions = BTreeSet::<SessionScope>::new();
121    let mut active_connections = BTreeSet::<String>::new();
122    let (stdin_tx, mut stdin_rx) =
123        channel::<Result<Option<ProtocolFrame>, String>>(MAX_STDIN_FRAME_QUEUE);
124    let (event_ready_tx, mut event_ready_rx) = channel::<()>(MAX_EVENT_READY_QUEUE);
125    let (write_tx, write_rx) = mpsc::sync_channel::<ProtocolFrame>(MAX_STDOUT_FRAME_QUEUE);
126    let (write_error_tx, mut write_error_rx) = unbounded_channel::<String>();
127    let callback_transport = Arc::new(FrameSidecarRequestTransport::new(write_tx.clone()));
128    sidecar.set_sidecar_request_transport(callback_transport.clone());
129    let mut event_pump = time::interval(EVENT_PUMP_INTERVAL);
130    let writer_codec = codec.clone();
131    let reader_codec = codec.clone();
132    let writer_error_tx = write_error_tx.clone();
133    thread::spawn(move || {
134        let mut writer = io::BufWriter::new(io::stdout());
135        while let Ok(frame) = write_rx.recv() {
136            if let Err(error) = write_frame(&writer_codec, &mut writer, &frame) {
137                let _ = writer_error_tx.send(error.to_string());
138                break;
139            }
140        }
141    });
142
143    thread::spawn({
144        let callback_transport = callback_transport.clone();
145        let read_error_tx = write_error_tx.clone();
146        move || {
147            let mut stdin = io::stdin();
148            loop {
149                let frame = match read_frame(&reader_codec, &mut stdin) {
150                    Ok(Some(ProtocolFrame::SidecarResponseFrame(response))) => {
151                        if callback_transport.accept_response(response.clone()) {
152                            continue;
153                        }
154                        Ok(Some(ProtocolFrame::SidecarResponseFrame(response)))
155                    }
156                    Ok(Some(frame)) => Ok(Some(frame)),
157                    other => other,
158                }
159                .map_err(|error: Box<dyn Error>| error.to_string());
160                let should_stop = matches!(frame, Ok(None) | Err(_));
161                match enqueue_stdin_frame(&stdin_tx, frame) {
162                    Ok(()) => {}
163                    Err(StdinFrameQueueError::Full(message)) => {
164                        let _ = read_error_tx.send(message);
165                        break;
166                    }
167                    Err(StdinFrameQueueError::Closed) => break,
168                }
169                if should_stop {
170                    break;
171                }
172            }
173        }
174    });
175
176    flush_sidecar_requests(&mut sidecar, &write_tx)?;
177    let mut pending_frame: Option<ProtocolFrame> = None;
178
179    loop {
180        if let Some(frame) = pending_frame.take() {
181            handle_protocol_frame(
182                frame,
183                &mut sidecar,
184                &mut stdin_rx,
185                &mut pending_frame,
186                &write_tx,
187                &mut active_sessions,
188                &mut active_connections,
189            )
190            .await?;
191            continue;
192        }
193
194        tokio::select! {
195            maybe_frame = stdin_rx.recv() => {
196                let Some(frame) = maybe_frame else {
197                    break;
198                };
199                let Some(frame) = frame.map_err(io::Error::other)? else {
200                    break;
201                };
202                handle_protocol_frame(
203                    frame,
204                    &mut sidecar,
205                    &mut stdin_rx,
206                    &mut pending_frame,
207                    &write_tx,
208                    &mut active_sessions,
209                    &mut active_connections,
210                ).await?;
211            }
212            maybe_ready = event_ready_rx.recv() => {
213                let Some(()) = maybe_ready else {
214                    break;
215                };
216                loop {
217                    let mut emitted_frame = false;
218                    for session in active_sessions.iter().cloned().collect::<Vec<_>>() {
219                        if let Some(frame) = sidecar
220                            .poll_event_wire(&session.ownership_scope(), Duration::ZERO)
221                            .await?
222                        {
223                            send_output_frame(&write_tx, ProtocolFrame::EventFrame(frame))?;
224                            emitted_frame = true;
225                        }
226                    }
227
228                    if !emitted_frame {
229                        break;
230                    }
231                }
232                flush_sidecar_requests(&mut sidecar, &write_tx)?;
233            }
234            _ = event_pump.tick() => {
235                for session in active_sessions.iter().cloned().collect::<Vec<_>>() {
236                    if sidecar.pump_process_events(&session.compat_ownership_scope()).await? {
237                        let _ = event_ready_tx.try_send(());
238                    }
239                }
240                flush_sidecar_requests(&mut sidecar, &write_tx)?;
241            }
242            maybe_write_error = write_error_rx.recv() => {
243                if let Some(error) = maybe_write_error {
244                    return Err(io::Error::new(io::ErrorKind::BrokenPipe, error).into());
245                }
246            }
247        }
248    }
249
250    cleanup_connections(&mut sidecar, &active_connections).await;
251    Ok(())
252}
253
254async fn handle_protocol_frame(
255    frame: ProtocolFrame,
256    sidecar: &mut NativeSidecar<LocalBridge>,
257    stdin_rx: &mut Receiver<Result<Option<ProtocolFrame>, String>>,
258    pending_frame: &mut Option<ProtocolFrame>,
259    write_tx: &mpsc::SyncSender<ProtocolFrame>,
260    active_sessions: &mut BTreeSet<SessionScope>,
261    active_connections: &mut BTreeSet<String>,
262) -> Result<(), Box<dyn Error>> {
263    match frame {
264        ProtocolFrame::RequestFrame(request) => {
265            let (dispatch, extra_responses) =
266                dispatch_with_prompt_interrupt(sidecar, request.clone(), stdin_rx, pending_frame)
267                    .await?;
268            track_session_state(
269                &dispatch.response.payload,
270                active_sessions,
271                active_connections,
272            );
273
274            send_output_frame(write_tx, ProtocolFrame::ResponseFrame(dispatch.response))?;
275            for response in extra_responses {
276                send_output_frame(write_tx, ProtocolFrame::ResponseFrame(response))?;
277            }
278            for event in dispatch.events {
279                send_output_frame(write_tx, ProtocolFrame::EventFrame(event))?;
280            }
281            flush_sidecar_requests(sidecar, write_tx)?;
282        }
283        ProtocolFrame::SidecarResponseFrame(response) => {
284            sidecar.accept_wire_sidecar_response(response)?;
285            flush_sidecar_requests(sidecar, write_tx)?;
286        }
287        other => {
288            return Err(format!(
289                "expected request or sidecar_response frame on stdin, received {}",
290                frame_kind(&other)
291            )
292            .into());
293        }
294    }
295    Ok(())
296}
297
/// Dispatches `request`, allowing a blocking extension request to be
/// interrupted by the next frame that arrives on stdin.
///
/// Returns the dispatch result for `request` plus any extra responses that must
/// be sent after it (currently at most the response to the interrupting frame).
/// A frame read from stdin that is not fully handled here is stored in
/// `pending_frame` for the caller to process next.
///
/// # Errors
/// Returns an error if the dispatch itself fails or the stdin channel delivers
/// a read/decode error.
async fn dispatch_with_prompt_interrupt(
    sidecar: &mut NativeSidecar<LocalBridge>,
    request: RequestFrame,
    stdin_rx: &mut Receiver<Result<Option<ProtocolFrame>, String>>,
    pending_frame: &mut Option<ProtocolFrame>,
) -> Result<(WireDispatchResult, Vec<ResponseFrame>), Box<dyn Error>> {
    // Non-blocking requests take the plain path and never look at stdin.
    let Some(blocking_request) = blocking_extension_request(sidecar, &request) else {
        return Ok((sidecar.dispatch_wire(request).await?, Vec::new()));
    };

    // `request` is cloned because the original is still needed below to build
    // the interrupted response.
    let mut dispatch = Box::pin(sidecar.dispatch_wire(request.clone()));
    tokio::select! {
        result = dispatch.as_mut() => Ok((result?, Vec::new())),
        maybe_frame = stdin_rx.recv() => {
            let frame = decode_stdin_frame(maybe_frame)?;
            if let Some(frame) = frame {
                if let Some(interrupt) = extension_interrupt_response(&blocking_request, &request, &frame) {
                    // Abandon the in-flight dispatch; its synthesized response
                    // comes from the extension's interrupt handler instead.
                    drop(dispatch);
                    let mut extra_responses = Vec::new();
                    if let Some(response) = interrupt.interrupting_response {
                        extra_responses.push(response);
                    } else {
                        // No synthetic reply for the interrupting frame, so it is
                        // requeued for normal handling.
                        *pending_frame = Some(frame);
                    }
                    return Ok((interrupt.interrupted_dispatch, extra_responses));
                }
                *pending_frame = Some(frame);
            }
            // NOTE(review): from here the dispatch is awaited to completion
            // without reading stdin again, so only the first frame that arrives
            // during a blocking request can interrupt it — confirm this is
            // intended.
            Ok((dispatch.await?, Vec::new()))
        }
    }
}
330
331fn decode_stdin_frame(
332    maybe_frame: Option<Result<Option<ProtocolFrame>, String>>,
333) -> Result<Option<ProtocolFrame>, Box<dyn Error>> {
334    let Some(frame) = maybe_frame else {
335        return Ok(None);
336    };
337    Ok(frame.map_err(io::Error::other)?)
338}
339
/// Snapshot of an extension request that its extension reported as blocking,
/// kept so a later frame can be checked for interrupting it.
struct BlockingExtensionRequest {
    /// Namespace of the request's extension envelope.
    namespace: String,
    /// Copy of the envelope payload of the blocking request.
    payload: Vec<u8>,
    /// The extension registered under `namespace`.
    extension: Arc<dyn Extension>,
}
345
/// Outcome of a frame interrupting a blocking extension request.
struct ExtensionInterruptDispatch {
    /// Synthesized dispatch result that answers the interrupted request.
    interrupted_dispatch: WireDispatchResult,
    /// Response for the interrupting request, when the extension supplied one;
    /// `None` means the interrupting frame still has to be handled normally.
    interrupting_response: Option<ResponseFrame>,
}
350
351fn blocking_extension_request(
352    sidecar: &NativeSidecar<LocalBridge>,
353    request: &RequestFrame,
354) -> Option<BlockingExtensionRequest> {
355    let RequestPayload::ExtEnvelope(envelope) = &request.payload else {
356        return None;
357    };
358    let extension = sidecar.extensions.get(&envelope.namespace)?.clone();
359    if !extension.is_blocking_request(&envelope.payload) {
360        return None;
361    }
362    Some(BlockingExtensionRequest {
363        namespace: envelope.namespace.clone(),
364        payload: envelope.payload.clone(),
365        extension,
366    })
367}
368
/// Decides whether `frame` interrupts the in-flight blocking extension request.
///
/// Only request frames with the same ownership as `active_request` are
/// considered. An extension envelope in the same namespace, or a kill-process
/// request, is offered to the extension's `interrupt_blocking_request`; if the
/// extension declines (returns `None`), so does this function. Every other
/// request kind and every non-request frame returns `None`.
///
/// On interruption, the result carries the synthesized response for the
/// interrupted request and, when the extension provides a payload for it, a
/// response addressed to the interrupting request.
fn extension_interrupt_response(
    blocking_request: &BlockingExtensionRequest,
    active_request: &RequestFrame,
    frame: &ProtocolFrame,
) -> Option<ExtensionInterruptDispatch> {
    match frame {
        ProtocolFrame::RequestFrame(request) => {
            // Requests from a different ownership scope never interrupt.
            if request.ownership != active_request.ownership {
                return None;
            }
            let interrupt = match &request.payload {
                RequestPayload::ExtEnvelope(envelope)
                    if envelope.namespace == blocking_request.namespace =>
                {
                    blocking_request.extension.interrupt_blocking_request(
                        &blocking_request.payload,
                        ExtensionInterruptRequest::ExtensionPayload(&envelope.payload),
                    )?
                }
                // Envelopes for other namespaces are unrelated to this prompt.
                RequestPayload::ExtEnvelope(_) => return None,
                RequestPayload::KillProcessRequest(_) => {
                    blocking_request.extension.interrupt_blocking_request(
                        &blocking_request.payload,
                        ExtensionInterruptRequest::KillProcess,
                    )?
                }
                // Control-plane setup, inspection, filesystem, process plumbing, and
                // persistence requests run concurrently with an in-flight prompt and
                // must not interrupt it. DisposeVm is deliberately non-interrupting for
                // now; see the todo entry about dispose racing a blocked prompt.
                RequestPayload::AuthenticateRequest(_)
                | RequestPayload::OpenSessionRequest(_)
                | RequestPayload::CreateVmRequest(_)
                | RequestPayload::DisposeVmRequest(_)
                | RequestPayload::BootstrapRootFilesystemRequest(_)
                | RequestPayload::ConfigureVmRequest(_)
                | RequestPayload::RegisterHostCallbacksRequest(_)
                | RequestPayload::CreateLayerRequest
                | RequestPayload::SealLayerRequest(_)
                | RequestPayload::ImportSnapshotRequest(_)
                | RequestPayload::ExportSnapshotRequest(_)
                | RequestPayload::CreateOverlayRequest(_)
                | RequestPayload::GuestFilesystemCallRequest(_)
                | RequestPayload::SnapshotRootFilesystemRequest
                | RequestPayload::ExecuteRequest(_)
                | RequestPayload::WriteStdinRequest(_)
                | RequestPayload::CloseStdinRequest(_)
                | RequestPayload::GetProcessSnapshotRequest
                | RequestPayload::FindListenerRequest(_)
                | RequestPayload::FindBoundUdpRequest(_)
                | RequestPayload::VmFetchRequest(_)
                | RequestPayload::GetSignalStateRequest(_)
                | RequestPayload::GetZombieTimerCountRequest
                | RequestPayload::HostFilesystemCallRequest(_)
                | RequestPayload::PersistenceLoadRequest(_)
                | RequestPayload::PersistenceFlushRequest(_) => return None,
            };
            // Response that completes the interrupted (blocking) request.
            let interrupted_dispatch = interrupted_extension_dispatch(
                active_request,
                &blocking_request.namespace,
                interrupt.interrupted_response_payload,
            );
            // Optional response addressed to the interrupting request itself.
            let interrupting_response = interrupt.interrupting_response_payload.map(|payload| {
                response_frame(
                    request.request_id,
                    request.ownership.clone(),
                    ResponsePayload::ExtEnvelope(ExtEnvelope {
                        namespace: blocking_request.namespace.clone(),
                        payload,
                    }),
                )
            });
            Some(ExtensionInterruptDispatch {
                interrupted_dispatch,
                interrupting_response,
            })
        }
        // Response, Event, and SidecarRequest frames are sidecar-to-host only. If one
        // arrives on stdin it is requeued and rejected as a protocol error by
        // handle_protocol_frame, so it must not synthesize a cancelled prompt first.
        // SidecarResponse frames answer sidecar-initiated callbacks and may be the very
        // response the blocked prompt dispatch is waiting on, so they never interrupt.
        ProtocolFrame::ResponseFrame(_)
        | ProtocolFrame::EventFrame(_)
        | ProtocolFrame::SidecarRequestFrame(_)
        | ProtocolFrame::SidecarResponseFrame(_) => None,
    }
}
457
458fn interrupted_extension_dispatch(
459    request: &RequestFrame,
460    namespace: &str,
461    payload: Vec<u8>,
462) -> WireDispatchResult {
463    match &request.payload {
464        RequestPayload::ExtEnvelope(_) => {
465            let response = ResponsePayload::ExtEnvelope(ExtEnvelope {
466                namespace: namespace.to_string(),
467                payload,
468            });
469            WireDispatchResult {
470                response: response_frame(request.request_id, request.ownership.clone(), response),
471                events: Vec::new(),
472            }
473        }
474        RequestPayload::AuthenticateRequest(_)
475        | RequestPayload::OpenSessionRequest(_)
476        | RequestPayload::CreateVmRequest(_)
477        | RequestPayload::DisposeVmRequest(_)
478        | RequestPayload::BootstrapRootFilesystemRequest(_)
479        | RequestPayload::ConfigureVmRequest(_)
480        | RequestPayload::RegisterHostCallbacksRequest(_)
481        | RequestPayload::CreateLayerRequest
482        | RequestPayload::SealLayerRequest(_)
483        | RequestPayload::ImportSnapshotRequest(_)
484        | RequestPayload::ExportSnapshotRequest(_)
485        | RequestPayload::CreateOverlayRequest(_)
486        | RequestPayload::GuestFilesystemCallRequest(_)
487        | RequestPayload::SnapshotRootFilesystemRequest
488        | RequestPayload::ExecuteRequest(_)
489        | RequestPayload::WriteStdinRequest(_)
490        | RequestPayload::CloseStdinRequest(_)
491        | RequestPayload::KillProcessRequest(_)
492        | RequestPayload::GetProcessSnapshotRequest
493        | RequestPayload::FindListenerRequest(_)
494        | RequestPayload::FindBoundUdpRequest(_)
495        | RequestPayload::VmFetchRequest(_)
496        | RequestPayload::GetSignalStateRequest(_)
497        | RequestPayload::GetZombieTimerCountRequest
498        | RequestPayload::HostFilesystemCallRequest(_)
499        | RequestPayload::PersistenceLoadRequest(_)
500        | RequestPayload::PersistenceFlushRequest(_) => {
501            unreachable!("interrupted extension dispatch requires an extension request");
502        }
503    }
504}
505
506async fn cleanup_connections(
507    sidecar: &mut NativeSidecar<LocalBridge>,
508    active_connections: &BTreeSet<String>,
509) {
510    for connection_id in active_connections {
511        let _ = sidecar.remove_connection(connection_id).await;
512    }
513}
514
515fn track_session_state(
516    payload: &ResponsePayload,
517    active_sessions: &mut BTreeSet<SessionScope>,
518    active_connections: &mut BTreeSet<String>,
519) {
520    match payload {
521        ResponsePayload::AuthenticatedResponse(AuthenticatedResponse { connection_id, .. }) => {
522            active_connections.insert(connection_id.clone());
523        }
524        ResponsePayload::SessionOpenedResponse(SessionOpenedResponse {
525            session_id,
526            owner_connection_id,
527        }) => {
528            active_sessions.insert(SessionScope {
529                connection_id: owner_connection_id.clone(),
530                session_id: session_id.clone(),
531            });
532        }
533        _ => {}
534    }
535}
536
537fn read_frame(
538    codec: &WireFrameCodec,
539    reader: &mut impl Read,
540) -> Result<Option<ProtocolFrame>, Box<dyn Error>> {
541    let mut prefix = [0u8; 4];
542    match reader.read_exact(&mut prefix) {
543        Ok(()) => {}
544        Err(error) if error.kind() == io::ErrorKind::UnexpectedEof => {
545            return Ok(None);
546        }
547        Err(error) => return Err(error.into()),
548    }
549
550    let declared_len = u32::from_be_bytes(prefix) as usize;
551    if declared_len > codec.max_frame_bytes() {
552        return Err(ProtocolCodecError::FrameTooLarge {
553            size: declared_len,
554            max: codec.max_frame_bytes(),
555        }
556        .into());
557    }
558    let total_len = prefix.len().saturating_add(declared_len);
559    let mut bytes = Vec::with_capacity(total_len);
560    bytes.extend_from_slice(&prefix);
561    bytes.resize(total_len, 0);
562    reader.read_exact(&mut bytes[prefix.len()..])?;
563
564    Ok(Some(codec.decode(&bytes)?))
565}
566
567fn write_frame(
568    codec: &WireFrameCodec,
569    writer: &mut impl Write,
570    frame: &ProtocolFrame,
571) -> Result<(), Box<dyn Error>> {
572    let bytes = codec.encode(frame)?;
573    writer.write_all(&bytes)?;
574    writer.flush()?;
575    Ok(())
576}
577
578fn frame_kind(frame: &ProtocolFrame) -> &'static str {
579    match frame {
580        ProtocolFrame::RequestFrame(_) => "request",
581        ProtocolFrame::ResponseFrame(_) => "response",
582        ProtocolFrame::EventFrame(_) => "event",
583        ProtocolFrame::SidecarRequestFrame(_) => "sidecar_request",
584        ProtocolFrame::SidecarResponseFrame(_) => "sidecar_response",
585    }
586}
587
/// Reasons the stdin reader could not hand a frame to the async loop.
#[derive(Debug, Clone, PartialEq, Eq)]
enum StdinFrameQueueError {
    /// The bounded queue was at capacity; carries a human-readable message.
    Full(String),
    /// The receiving side of the queue has been dropped.
    Closed,
}
593
594fn enqueue_stdin_frame(
595    sender: &tokio::sync::mpsc::Sender<Result<Option<ProtocolFrame>, String>>,
596    frame: Result<Option<ProtocolFrame>, String>,
597) -> Result<(), StdinFrameQueueError> {
598    sender.try_send(frame).map_err(|error| match error {
599        tokio::sync::mpsc::error::TrySendError::Full(_) => StdinFrameQueueError::Full(format!(
600            "stdin frame queue exceeded {MAX_STDIN_FRAME_QUEUE} pending frames"
601        )),
602        tokio::sync::mpsc::error::TrySendError::Closed(_) => StdinFrameQueueError::Closed,
603    })
604}
605
606fn flush_sidecar_requests(
607    sidecar: &mut NativeSidecar<LocalBridge>,
608    writer: &mpsc::SyncSender<ProtocolFrame>,
609) -> Result<(), Box<dyn Error>> {
610    while let Some(request) = sidecar.pop_wire_sidecar_request()? {
611        send_output_frame(writer, ProtocolFrame::SidecarRequestFrame(request))?;
612    }
613    Ok(())
614}
615
616fn send_output_frame(
617    writer: &mpsc::SyncSender<ProtocolFrame>,
618    frame: ProtocolFrame,
619) -> Result<(), io::Error> {
620    writer.try_send(frame).map_err(|error| {
621        let message = match error {
622            mpsc::TrySendError::Full(_) => {
623                format!("stdout frame queue exceeded {MAX_STDOUT_FRAME_QUEUE} pending frames")
624            }
625            mpsc::TrySendError::Disconnected(_) => String::from("stdout writer disconnected"),
626        };
627        io::Error::new(io::ErrorKind::BrokenPipe, message)
628    })
629}
630
/// Default directory for the V8 compile cache: a fixed name under the system
/// temp directory.
fn default_compile_cache_root() -> PathBuf {
    // The location is deliberately the same for every sidecar process so cached
    // bytecode (cachedData) outlives a single sidecar/VM and helps cold starts.
    // A PID-keyed root was used before, which meant each process began with an
    // empty cache and cold module imports never reused compiled code. Sharing is
    // safe because entries are namespaced and validated downstream by
    // `stable_compile_cache_namespace_hash` and V8's source/version checks;
    // stale or mismatched entries are just skipped.
    let mut root = std::env::temp_dir();
    root.push("secure-exec-sidecar-compile-cache");
    root
}
640
641#[cfg(test)]
642mod tests {
643    use super::*;
644    use crate::wire::{AuthenticateRequest, KillProcessRequest};
645    use crate::{ExtensionContext, ExtensionFuture, ExtensionInterruptResponse, ExtensionResponse};
646    use std::io::Cursor;
647
648    const TEST_EXTENSION_NAMESPACE: &str = "dev.rivet.secure-exec.test.blocking";
649
650    #[test]
651    fn read_frame_rejects_oversized_prefix_before_allocating_payload() {
652        let codec = WireFrameCodec::new(16);
653        let mut reader = Cursor::new((32_u32).to_be_bytes().to_vec());
654
655        let error = read_frame(&codec, &mut reader).expect_err("oversized frame should fail");
656        let error = error
657            .downcast::<ProtocolCodecError>()
658            .expect("protocol codec error");
659        assert!(matches!(
660            *error,
661            ProtocolCodecError::FrameTooLarge { size: 32, max: 16 }
662        ));
663    }
664
665    #[test]
666    fn stdio_work_queues_are_bounded() {
667        let (stdin_tx, _stdin_rx) =
668            channel::<Result<Option<ProtocolFrame>, String>>(MAX_STDIN_FRAME_QUEUE);
669        for _ in 0..MAX_STDIN_FRAME_QUEUE {
670            enqueue_stdin_frame(&stdin_tx, Ok(None))
671                .expect("stdin frame queue should accept capacity");
672        }
673        assert!(matches!(
674            enqueue_stdin_frame(&stdin_tx, Ok(None)),
675            Err(StdinFrameQueueError::Full(_))
676        ));
677
678        let (event_ready_tx, _event_ready_rx) = channel::<()>(MAX_EVENT_READY_QUEUE);
679        event_ready_tx
680            .try_send(())
681            .expect("event-ready queue should accept capacity");
682        assert!(matches!(
683            event_ready_tx.try_send(()),
684            Err(tokio::sync::mpsc::error::TrySendError::Full(_))
685        ));
686
687        let (stdout_tx, _stdout_rx) = mpsc::sync_channel(MAX_STDOUT_FRAME_QUEUE);
688        for request_id in 0..MAX_STDOUT_FRAME_QUEUE {
689            send_output_frame(
690                &stdout_tx,
691                ProtocolFrame::RequestFrame(request_frame(
692                    request_id as RequestId,
693                    connection_ownership("conn-queue"),
694                    RequestPayload::AuthenticateRequest(AuthenticateRequest {
695                        client_name: String::from("queue-test"),
696                        auth_token: String::from("token"),
697                        protocol_version: wire::PROTOCOL_VERSION,
698                        bridge_version: secure_exec_bridge::bridge_contract().version,
699                    }),
700                )),
701            )
702            .expect("stdout frame queue should accept capacity");
703        }
704        let error = send_output_frame(
705            &stdout_tx,
706            ProtocolFrame::RequestFrame(request_frame(
707                MAX_STDOUT_FRAME_QUEUE as RequestId,
708                connection_ownership("conn-queue"),
709                RequestPayload::AuthenticateRequest(AuthenticateRequest {
710                    client_name: String::from("queue-test"),
711                    auth_token: String::from("token"),
712                    protocol_version: wire::PROTOCOL_VERSION,
713                    bridge_version: secure_exec_bridge::bridge_contract().version,
714                }),
715            )),
716        )
717        .expect_err("stdout frame queue should reject overflow");
718        assert!(
719            error.to_string().contains("stdout frame queue exceeded"),
720            "unexpected stdout queue error: {error}"
721        );
722    }
723
724    #[test]
725    fn read_frame_decodes_wire_authenticate_request() {
726        let codec = WireFrameCodec::new(wire::DEFAULT_MAX_FRAME_BYTES);
727        let frame = ProtocolFrame::RequestFrame(request_frame(
728            1,
729            connection_ownership("client-hint"),
730            RequestPayload::AuthenticateRequest(AuthenticateRequest {
731                client_name: "probe".to_string(),
732                auth_token: "probe-token".to_string(),
733                protocol_version: wire::PROTOCOL_VERSION,
734                bridge_version: secure_exec_bridge::bridge_contract().version,
735            }),
736        ));
737        let encoded = codec.encode(&frame).expect("encode wire frame");
738        let mut reader = Cursor::new(encoded);
739
740        let decoded = read_frame(&codec, &mut reader)
741            .expect("decode bare frame")
742            .expect("frame present");
743
744        assert_eq!(decoded, frame);
745    }
746
747    #[test]
748    fn extension_close_interrupts_matching_blocking_request() {
749        let ownership = vm_ownership("conn-1", "session-1", "vm-1");
750        let prompt = test_extension_request_frame(10, ownership.clone(), "prompt:ext-session-1");
751        let close = ProtocolFrame::RequestFrame(test_extension_request_frame(
752            11,
753            ownership,
754            "close:ext-session-1",
755        ));
756
757        let blocking_request = blocking_extension_request(&prompt);
758        let interrupt = extension_interrupt_response(&blocking_request, &prompt, &close)
759            .expect("close should interrupt prompt");
760
761        assert_eq!(interrupt.interrupted_dispatch.response.request_id, 10);
762        let ResponsePayload::ExtEnvelope(envelope) =
763            interrupt.interrupted_dispatch.response.payload
764        else {
765            panic!("expected extension response");
766        };
767        assert_eq!(envelope.namespace, TEST_EXTENSION_NAMESPACE);
768        assert_eq!(envelope.payload, b"prompt-cancelled:ext-session-1");
769    }
770
771    #[test]
772    fn extension_cancel_interrupt_gets_synthetic_response() {
773        let ownership = vm_ownership("conn-1", "session-1", "vm-1");
774        let prompt = test_extension_request_frame(10, ownership.clone(), "prompt:ext-session-1");
775        let cancel = ProtocolFrame::RequestFrame(test_extension_request_frame(
776            11,
777            ownership,
778            "cancel:ext-session-1",
779        ));
780
781        let blocking_request = blocking_extension_request(&prompt);
782        let interrupt = extension_interrupt_response(&blocking_request, &prompt, &cancel)
783            .expect("cancel should interrupt prompt");
784        let response = interrupt
785            .interrupting_response
786            .expect("cancel should get a response");
787
788        assert_eq!(response.request_id, 11);
789        let ResponsePayload::ExtEnvelope(envelope) = response.payload else {
790            panic!("expected extension response");
791        };
792        assert_eq!(envelope.namespace, TEST_EXTENSION_NAMESPACE);
793        assert_eq!(envelope.payload, b"cancelled:ext-session-1");
794    }
795
796    #[test]
797    fn kill_process_interrupts_blocking_extension_request() {
798        let ownership = vm_ownership("conn-1", "session-1", "vm-1");
799        let prompt = test_extension_request_frame(10, ownership.clone(), "prompt:ext-session-1");
800        let kill = ProtocolFrame::RequestFrame(request_frame(
801            11,
802            ownership,
803            RequestPayload::KillProcessRequest(KillProcessRequest {
804                process_id: "adapter-process".to_string(),
805                signal: "SIGTERM".to_string(),
806            }),
807        ));
808
809        let blocking_request = blocking_extension_request(&prompt);
810        let interrupt = extension_interrupt_response(&blocking_request, &prompt, &kill)
811            .expect("kill should interrupt prompt");
812
813        assert_eq!(interrupt.interrupted_dispatch.response.request_id, 10);
814        assert!(interrupt.interrupting_response.is_none());
815    }
816
817    fn test_extension_request_frame(
818        request_id: RequestId,
819        ownership: OwnershipScope,
820        payload: &str,
821    ) -> RequestFrame {
822        request_frame(
823            request_id,
824            ownership,
825            RequestPayload::ExtEnvelope(ExtEnvelope {
826                namespace: TEST_EXTENSION_NAMESPACE.to_string(),
827                payload: payload.as_bytes().to_vec(),
828            }),
829        )
830    }
831
832    fn blocking_extension_request(request: &RequestFrame) -> BlockingExtensionRequest {
833        let RequestPayload::ExtEnvelope(envelope) = &request.payload else {
834            panic!("expected extension request");
835        };
836        BlockingExtensionRequest {
837            namespace: TEST_EXTENSION_NAMESPACE.to_string(),
838            payload: envelope.payload.clone(),
839            extension: Arc::new(TestBlockingInterruptExtension),
840        }
841    }
842
843    struct TestBlockingInterruptExtension;
844
845    impl Extension for TestBlockingInterruptExtension {
846        fn namespace(&self) -> &str {
847            TEST_EXTENSION_NAMESPACE
848        }
849
850        fn handle_request<'a>(
851            &'a self,
852            _ctx: ExtensionContext<'a>,
853            _payload: Vec<u8>,
854        ) -> ExtensionFuture<'a, ExtensionResponse> {
855            Box::pin(async { Ok(ExtensionResponse::new(Vec::new())) })
856        }
857
858        fn is_blocking_request(&self, payload: &[u8]) -> bool {
859            parse_test_payload(payload).is_some_and(|(kind, _session_id)| kind == "prompt")
860        }
861
862        fn interrupt_blocking_request(
863            &self,
864            blocking_payload: &[u8],
865            interrupt: ExtensionInterruptRequest<'_>,
866        ) -> Option<ExtensionInterruptResponse> {
867            let (blocking_kind, blocking_session_id) = parse_test_payload(blocking_payload)?;
868            if blocking_kind != "prompt" {
869                return None;
870            }
871
872            let interrupted_response_payload =
873                encode_test_response("prompt-cancelled", blocking_session_id);
874            match interrupt {
875                ExtensionInterruptRequest::KillProcess => Some(ExtensionInterruptResponse {
876                    interrupted_response_payload,
877                    interrupting_response_payload: None,
878                }),
879                ExtensionInterruptRequest::ExtensionPayload(payload) => {
880                    let (interrupt_kind, interrupt_session_id) = parse_test_payload(payload)?;
881                    match interrupt_kind {
882                        "close" if interrupt_session_id == blocking_session_id => {
883                            Some(ExtensionInterruptResponse {
884                                interrupted_response_payload,
885                                interrupting_response_payload: None,
886                            })
887                        }
888                        "cancel" if interrupt_session_id == blocking_session_id => {
889                            Some(ExtensionInterruptResponse {
890                                interrupted_response_payload,
891                                interrupting_response_payload: Some(encode_test_response(
892                                    "cancelled",
893                                    interrupt_session_id,
894                                )),
895                            })
896                        }
897                        "prompt" | "close" | "cancel" => None,
898                        _ => None,
899                    }
900                }
901            }
902        }
903    }
904
905    fn parse_test_payload(payload: &[u8]) -> Option<(&str, &str)> {
906        let payload = std::str::from_utf8(payload).ok()?;
907        payload.split_once(':')
908    }
909
910    fn encode_test_response(kind: &str, session_id: &str) -> Vec<u8> {
911        format!("{kind}:{session_id}").into_bytes()
912    }
913}
914
915#[derive(Debug, Clone)]
916struct LocalBridge {
917    started_at: Instant,
918    next_timer_id: usize,
919    snapshots: BTreeMap<String, FilesystemSnapshot>,
920}
921
922impl Default for LocalBridge {
923    fn default() -> Self {
924        Self {
925            started_at: Instant::now(),
926            next_timer_id: 0,
927            snapshots: BTreeMap::new(),
928        }
929    }
930}
931
932impl BridgeTypes for LocalBridge {
933    type Error = LocalBridgeError;
934}
935
936impl FilesystemBridge for LocalBridge {
937    fn read_file(&mut self, request: ReadFileRequest) -> Result<Vec<u8>, Self::Error> {
938        fs::read(Self::host_path(&request.path))
939            .map_err(|error| LocalBridgeError::io("read", &request.path, error))
940    }
941
942    fn write_file(&mut self, request: WriteFileRequest) -> Result<(), Self::Error> {
943        let host_path = Self::host_path(&request.path);
944        if let Some(parent) = host_path.parent() {
945            fs::create_dir_all(parent)
946                .map_err(|error| LocalBridgeError::io("mkdir", &request.path, error))?;
947        }
948        fs::write(host_path, request.contents)
949            .map_err(|error| LocalBridgeError::io("write", &request.path, error))
950    }
951
952    fn stat(&mut self, request: PathRequest) -> Result<FileMetadata, Self::Error> {
953        fs::metadata(Self::host_path(&request.path))
954            .map(Self::file_metadata)
955            .map_err(|error| LocalBridgeError::io("stat", &request.path, error))
956    }
957
958    fn lstat(&mut self, request: PathRequest) -> Result<FileMetadata, Self::Error> {
959        fs::symlink_metadata(Self::host_path(&request.path))
960            .map(Self::file_metadata)
961            .map_err(|error| LocalBridgeError::io("lstat", &request.path, error))
962    }
963
964    fn read_dir(&mut self, request: ReadDirRequest) -> Result<Vec<DirectoryEntry>, Self::Error> {
965        let mut entries = fs::read_dir(Self::host_path(&request.path))
966            .map_err(|error| LocalBridgeError::io("readdir", &request.path, error))?
967            .map(|entry| {
968                let entry =
969                    entry.map_err(|error| LocalBridgeError::io("readdir", &request.path, error))?;
970                let kind = entry
971                    .file_type()
972                    .map(Self::file_kind)
973                    .map_err(|error| LocalBridgeError::io("readdir", &request.path, error))?;
974                Ok(DirectoryEntry {
975                    name: entry.file_name().to_string_lossy().into_owned(),
976                    kind,
977                })
978            })
979            .collect::<Result<Vec<_>, LocalBridgeError>>()?;
980        entries.sort_by(|left, right| left.name.cmp(&right.name));
981        Ok(entries)
982    }
983
984    fn create_dir(&mut self, request: CreateDirRequest) -> Result<(), Self::Error> {
985        let host_path = Self::host_path(&request.path);
986        if request.recursive {
987            fs::create_dir_all(host_path)
988        } else {
989            fs::create_dir(host_path)
990        }
991        .map_err(|error| LocalBridgeError::io("mkdir", &request.path, error))
992    }
993
994    fn remove_file(&mut self, request: PathRequest) -> Result<(), Self::Error> {
995        fs::remove_file(Self::host_path(&request.path))
996            .map_err(|error| LocalBridgeError::io("unlink", &request.path, error))
997    }
998
999    fn remove_dir(&mut self, request: PathRequest) -> Result<(), Self::Error> {
1000        fs::remove_dir(Self::host_path(&request.path))
1001            .map_err(|error| LocalBridgeError::io("rmdir", &request.path, error))
1002    }
1003
1004    fn rename(&mut self, request: RenameRequest) -> Result<(), Self::Error> {
1005        let from_path = Self::host_path(&request.from_path);
1006        let to_path = Self::host_path(&request.to_path);
1007        if let Some(parent) = to_path.parent() {
1008            fs::create_dir_all(parent)
1009                .map_err(|error| LocalBridgeError::io("mkdir", &request.to_path, error))?;
1010        }
1011        fs::rename(from_path, to_path).map_err(|error| {
1012            LocalBridgeError::unsupported(format!(
1013                "rename {} -> {}: {}",
1014                request.from_path, request.to_path, error
1015            ))
1016        })
1017    }
1018
1019    fn symlink(&mut self, request: SymlinkRequest) -> Result<(), Self::Error> {
1020        let link_path = Self::host_path(&request.link_path);
1021        if let Some(parent) = link_path.parent() {
1022            fs::create_dir_all(parent)
1023                .map_err(|error| LocalBridgeError::io("mkdir", &request.link_path, error))?;
1024        }
1025        create_symlink(&request.target_path, link_path)
1026            .map_err(|error| LocalBridgeError::io("symlink", &request.link_path, error))
1027    }
1028
1029    fn read_link(&mut self, request: PathRequest) -> Result<String, Self::Error> {
1030        fs::read_link(Self::host_path(&request.path))
1031            .map(|target| target.to_string_lossy().into_owned())
1032            .map_err(|error| LocalBridgeError::io("readlink", &request.path, error))
1033    }
1034
1035    fn chmod(&mut self, request: ChmodRequest) -> Result<(), Self::Error> {
1036        let permissions = fs::Permissions::from_mode(request.mode);
1037        fs::set_permissions(Self::host_path(&request.path), permissions)
1038            .map_err(|error| LocalBridgeError::io("chmod", &request.path, error))
1039    }
1040
1041    fn truncate(&mut self, request: TruncateRequest) -> Result<(), Self::Error> {
1042        OpenOptions::new()
1043            .write(true)
1044            .create(false)
1045            .open(Self::host_path(&request.path))
1046            .and_then(|file| file.set_len(request.len))
1047            .map_err(|error| LocalBridgeError::io("truncate", &request.path, error))
1048    }
1049
1050    fn exists(&mut self, request: PathRequest) -> Result<bool, Self::Error> {
1051        Ok(fs::symlink_metadata(Self::host_path(&request.path)).is_ok())
1052    }
1053}
1054
1055impl PermissionBridge for LocalBridge {
1056    fn check_filesystem_access(
1057        &mut self,
1058        request: FilesystemPermissionRequest,
1059    ) -> Result<PermissionDecision, Self::Error> {
1060        Ok(PermissionDecision::deny(format!(
1061            "no static filesystem policy registered for {}:{}",
1062            request.vm_id, request.path
1063        )))
1064    }
1065
1066    fn check_network_access(
1067        &mut self,
1068        request: NetworkPermissionRequest,
1069    ) -> Result<PermissionDecision, Self::Error> {
1070        Ok(PermissionDecision::deny(format!(
1071            "no static network policy registered for {}:{}",
1072            request.vm_id, request.resource
1073        )))
1074    }
1075
1076    fn check_command_execution(
1077        &mut self,
1078        request: CommandPermissionRequest,
1079    ) -> Result<PermissionDecision, Self::Error> {
1080        Ok(PermissionDecision::deny(format!(
1081            "no static child_process policy registered for {}:{}",
1082            request.vm_id, request.command
1083        )))
1084    }
1085
1086    fn check_environment_access(
1087        &mut self,
1088        request: EnvironmentPermissionRequest,
1089    ) -> Result<PermissionDecision, Self::Error> {
1090        Ok(PermissionDecision::deny(format!(
1091            "no static env policy registered for {}:{}",
1092            request.vm_id, request.key
1093        )))
1094    }
1095}
1096
1097impl PersistenceBridge for LocalBridge {
1098    fn load_filesystem_state(
1099        &mut self,
1100        request: LoadFilesystemStateRequest,
1101    ) -> Result<Option<FilesystemSnapshot>, Self::Error> {
1102        Ok(self.snapshots.get(&request.vm_id).cloned())
1103    }
1104
1105    fn flush_filesystem_state(
1106        &mut self,
1107        request: FlushFilesystemStateRequest,
1108    ) -> Result<(), Self::Error> {
1109        self.snapshots.insert(request.vm_id, request.snapshot);
1110        Ok(())
1111    }
1112}
1113
1114impl ClockBridge for LocalBridge {
1115    fn wall_clock(&mut self, _request: ClockRequest) -> Result<SystemTime, Self::Error> {
1116        Ok(SystemTime::now())
1117    }
1118
1119    fn monotonic_clock(&mut self, _request: ClockRequest) -> Result<Duration, Self::Error> {
1120        Ok(self.started_at.elapsed())
1121    }
1122
1123    fn schedule_timer(
1124        &mut self,
1125        request: ScheduleTimerRequest,
1126    ) -> Result<ScheduledTimer, Self::Error> {
1127        self.next_timer_id += 1;
1128        Ok(ScheduledTimer {
1129            timer_id: format!("timer-{}", self.next_timer_id),
1130            delay: request.delay,
1131        })
1132    }
1133}
1134
1135impl RandomBridge for LocalBridge {
1136    fn fill_random_bytes(&mut self, request: RandomBytesRequest) -> Result<Vec<u8>, Self::Error> {
1137        Ok(vec![0u8; request.len])
1138    }
1139}
1140
1141impl EventBridge for LocalBridge {
1142    fn emit_structured_event(&mut self, _event: StructuredEventRecord) -> Result<(), Self::Error> {
1143        Ok(())
1144    }
1145
1146    fn emit_diagnostic(&mut self, _event: DiagnosticRecord) -> Result<(), Self::Error> {
1147        Ok(())
1148    }
1149
1150    fn emit_log(&mut self, _event: LogRecord) -> Result<(), Self::Error> {
1151        Ok(())
1152    }
1153
1154    fn emit_lifecycle(&mut self, _event: LifecycleEventRecord) -> Result<(), Self::Error> {
1155        Ok(())
1156    }
1157}
1158
1159impl ExecutionBridge for LocalBridge {
1160    fn create_javascript_context(
1161        &mut self,
1162        _request: CreateJavascriptContextRequest,
1163    ) -> Result<GuestContextHandle, Self::Error> {
1164        Err(LocalBridgeError::unsupported(
1165            "execution bridge is handled internally by the native sidecar",
1166        ))
1167    }
1168
1169    fn create_wasm_context(
1170        &mut self,
1171        _request: CreateWasmContextRequest,
1172    ) -> Result<GuestContextHandle, Self::Error> {
1173        Err(LocalBridgeError::unsupported(
1174            "execution bridge is handled internally by the native sidecar",
1175        ))
1176    }
1177
1178    fn start_execution(
1179        &mut self,
1180        _request: StartExecutionRequest,
1181    ) -> Result<StartedExecution, Self::Error> {
1182        Err(LocalBridgeError::unsupported(
1183            "execution bridge is handled internally by the native sidecar",
1184        ))
1185    }
1186
1187    fn write_stdin(&mut self, _request: WriteExecutionStdinRequest) -> Result<(), Self::Error> {
1188        Err(LocalBridgeError::unsupported(
1189            "execution bridge is handled internally by the native sidecar",
1190        ))
1191    }
1192
1193    fn close_stdin(&mut self, _request: ExecutionHandleRequest) -> Result<(), Self::Error> {
1194        Err(LocalBridgeError::unsupported(
1195            "execution bridge is handled internally by the native sidecar",
1196        ))
1197    }
1198
1199    fn kill_execution(&mut self, _request: KillExecutionRequest) -> Result<(), Self::Error> {
1200        Err(LocalBridgeError::unsupported(
1201            "execution bridge is handled internally by the native sidecar",
1202        ))
1203    }
1204
1205    fn poll_execution_event(
1206        &mut self,
1207        _request: PollExecutionEventRequest,
1208    ) -> Result<Option<ExecutionEvent>, Self::Error> {
1209        Ok(None)
1210    }
1211}
1212
1213#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
1214struct SessionScope {
1215    connection_id: String,
1216    session_id: String,
1217}
1218
1219impl SessionScope {
1220    fn ownership_scope(&self) -> OwnershipScope {
1221        session_ownership(&self.connection_id, &self.session_id)
1222    }
1223
1224    fn compat_ownership_scope(&self) -> crate::protocol::OwnershipScope {
1225        wire::ownership_scope_to_compat(self.ownership_scope())
1226    }
1227}
1228
1229struct FrameSidecarRequestTransport {
1230    writer: mpsc::SyncSender<ProtocolFrame>,
1231    pending: Arc<Mutex<BTreeMap<RequestId, mpsc::SyncSender<SidecarResponseFrame>>>>,
1232}
1233
1234impl FrameSidecarRequestTransport {
1235    fn new(writer: mpsc::SyncSender<ProtocolFrame>) -> Self {
1236        Self {
1237            writer,
1238            pending: Arc::new(Mutex::new(BTreeMap::new())),
1239        }
1240    }
1241
1242    fn accept_response(&self, response: SidecarResponseFrame) -> bool {
1243        let sender = {
1244            let mut pending = match self.pending.lock() {
1245                Ok(pending) => pending,
1246                Err(_) => return false,
1247            };
1248            pending.remove(&response.request_id)
1249        };
1250        let Some(sender) = sender else {
1251            return false;
1252        };
1253        let _ = sender.send(response);
1254        true
1255    }
1256}
1257
1258impl SidecarRequestTransport for FrameSidecarRequestTransport {
1259    fn send_request(
1260        &self,
1261        request: crate::protocol::SidecarRequestFrame,
1262        timeout: Duration,
1263    ) -> Result<crate::protocol::SidecarResponseFrame, SidecarError> {
1264        let request =
1265            wire::sidecar_request_frame_from_compat(request).map_err(wire_protocol_error)?;
1266        let (sender, receiver) = mpsc::sync_channel(1);
1267        self.pending
1268            .lock()
1269            .map_err(|_| {
1270                SidecarError::Bridge(String::from("sidecar callback waiter map lock poisoned"))
1271            })?
1272            .insert(request.request_id, sender);
1273        if let Err(error) = send_output_frame(
1274            &self.writer,
1275            ProtocolFrame::SidecarRequestFrame(request.clone()),
1276        ) {
1277            let _ = self
1278                .pending
1279                .lock()
1280                .map(|mut pending| pending.remove(&request.request_id));
1281            return Err(SidecarError::Io(format!(
1282                "failed to write sidecar request frame: {error}"
1283            )));
1284        }
1285        match receiver.recv_timeout(timeout) {
1286            Ok(response) => {
1287                wire::sidecar_response_frame_to_compat(response).map_err(wire_protocol_error)
1288            }
1289            Err(mpsc::RecvTimeoutError::Timeout) => {
1290                let _ = self
1291                    .pending
1292                    .lock()
1293                    .map(|mut pending| pending.remove(&request.request_id));
1294                Err(SidecarError::Io(format!(
1295                    "timed out waiting for sidecar response after {}s",
1296                    timeout.as_secs()
1297                )))
1298            }
1299            Err(mpsc::RecvTimeoutError::Disconnected) => Err(SidecarError::Io(String::from(
1300                "sidecar response waiter disconnected",
1301            ))),
1302        }
1303    }
1304}
1305
/// Error type of `LocalBridge`: a plain human-readable message.
#[derive(Debug, Clone, PartialEq, Eq)]
struct LocalBridgeError {
    message: String,
}

impl LocalBridgeError {
    /// Wraps `message` verbatim.
    fn unsupported(message: impl Into<String>) -> Self {
        let message = message.into();
        Self { message }
    }

    /// Formats an I/O failure as `"{operation} {path}: {error}"`.
    fn io(operation: &str, path: &str, error: io::Error) -> Self {
        let message = format!("{operation} {path}: {error}");
        Self { message }
    }
}

impl fmt::Display for LocalBridgeError {
    /// Writes the stored message unchanged.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.message.as_str())
    }
}

impl Error for LocalBridgeError {}
1330
1331impl LocalBridge {
1332    fn host_path(path: &str) -> PathBuf {
1333        let candidate = Path::new(path);
1334        if candidate.is_absolute() {
1335            candidate.to_path_buf()
1336        } else {
1337            std::env::current_dir()
1338                .unwrap_or_else(|_| PathBuf::from("."))
1339                .join(candidate)
1340        }
1341    }
1342
1343    fn file_metadata(metadata: fs::Metadata) -> FileMetadata {
1344        FileMetadata {
1345            mode: metadata.permissions().mode(),
1346            size: metadata.size(),
1347            kind: Self::file_kind(metadata.file_type()),
1348        }
1349    }
1350
1351    fn file_kind(file_type: fs::FileType) -> secure_exec_bridge::FileKind {
1352        if file_type.is_file() {
1353            secure_exec_bridge::FileKind::File
1354        } else if file_type.is_dir() {
1355            secure_exec_bridge::FileKind::Directory
1356        } else if file_type.is_symlink() {
1357            secure_exec_bridge::FileKind::SymbolicLink
1358        } else {
1359            secure_exec_bridge::FileKind::Other
1360        }
1361    }
1362}