1use crate::wire::{
2 self, AuthenticatedResponse, ExtEnvelope, OwnershipScope, ProtocolCodecError, ProtocolFrame,
3 RequestFrame, RequestId, RequestPayload, ResponseFrame, ResponsePayload, SessionOpenedResponse,
4 SidecarResponseFrame, WireDispatchResult, WireFrameCodec,
5};
6use crate::{
7 Extension, ExtensionInterruptRequest, NativeSidecar, NativeSidecarConfig, SidecarError,
8 SidecarRequestTransport,
9};
10use secure_exec_bridge::queue_tracker::{tracked_sync_channel, TrackedLimit, TrackedSyncSender};
11use secure_exec_bridge::{
12 BridgeTypes, ChmodRequest, ClockBridge, ClockRequest, CommandPermissionRequest,
13 CreateDirRequest, CreateJavascriptContextRequest, CreateWasmContextRequest, DiagnosticRecord,
14 DirectoryEntry, EnvironmentPermissionRequest, EventBridge, ExecutionBridge, ExecutionEvent,
15 ExecutionHandleRequest, FileMetadata, FilesystemBridge, FilesystemPermissionRequest,
16 FilesystemSnapshot, FlushFilesystemStateRequest, GuestContextHandle, KillExecutionRequest,
17 LifecycleEventRecord, LoadFilesystemStateRequest, LogRecord, NetworkPermissionRequest,
18 PathRequest, PermissionBridge, PermissionDecision, PersistenceBridge,
19 PollExecutionEventRequest, RandomBridge, RandomBytesRequest, ReadDirRequest, ReadFileRequest,
20 RenameRequest, ScheduleTimerRequest, ScheduledTimer, StartExecutionRequest, StartedExecution,
21 StructuredEventRecord, SymlinkRequest, TruncateRequest, WriteExecutionStdinRequest,
22 WriteFileRequest,
23};
24use std::collections::{BTreeMap, BTreeSet};
25use std::error::Error;
26use std::fmt;
27use std::fs::{self, OpenOptions};
28use std::io::{self, Read, Write};
29use std::os::unix::fs::{symlink as create_symlink, MetadataExt, PermissionsExt};
30use std::path::{Path, PathBuf};
31use std::sync::{mpsc, Arc, Mutex};
32use std::thread;
33use std::time::{Duration, Instant, SystemTime};
34use tokio::sync::mpsc::{channel, unbounded_channel, Receiver};
35use tokio::time;
36
/// Period of the timer in `run_async` that pumps process events for every active session.
const EVENT_PUMP_INTERVAL: Duration = Duration::from_micros(250);
/// Capacity of the channel carrying decoded stdin frames from the reader thread to the main loop.
const MAX_STDIN_FRAME_QUEUE: usize = 128;
/// Capacity of the event-ready wake-up channel; with a single slot, signals sent while one is
/// already pending are dropped by the `try_send` in `run_async`.
const MAX_EVENT_READY_QUEUE: usize = 1;
/// Capacity of the tracked channel that feeds frames to the stdout writer thread.
const MAX_STDOUT_FRAME_QUEUE: usize = 4096;
50
/// Test-only helper: wraps `payload` in a [`RequestFrame`] carrying the schema returned by
/// `wire::protocol_schema()`.
#[cfg(test)]
fn request_frame(
    request_id: RequestId,
    ownership: OwnershipScope,
    payload: RequestPayload,
) -> RequestFrame {
    let schema = wire::protocol_schema();
    RequestFrame {
        schema,
        request_id,
        ownership,
        payload,
    }
}
64
65fn response_frame(
66 request_id: RequestId,
67 ownership: OwnershipScope,
68 payload: ResponsePayload,
69) -> ResponseFrame {
70 ResponseFrame {
71 schema: wire::protocol_schema(),
72 request_id,
73 ownership,
74 payload,
75 }
76}
77
/// Test-only helper: builds a connection-level [`OwnershipScope`] for `connection_id`.
#[cfg(test)]
fn connection_ownership(connection_id: &str) -> OwnershipScope {
    let scope = wire::ConnectionOwnership {
        connection_id: String::from(connection_id),
    };
    OwnershipScope::ConnectionOwnership(scope)
}
84
85fn session_ownership(connection_id: &str, session_id: &str) -> OwnershipScope {
86 OwnershipScope::SessionOwnership(wire::SessionOwnership {
87 connection_id: connection_id.to_owned(),
88 session_id: session_id.to_owned(),
89 })
90}
91
/// Test-only helper: builds a VM-level [`OwnershipScope`] from connection, session and VM ids.
#[cfg(test)]
fn vm_ownership(connection_id: &str, session_id: &str, vm_id: &str) -> OwnershipScope {
    let scope = wire::VmOwnership {
        connection_id: String::from(connection_id),
        session_id: String::from(session_id),
        vm_id: String::from(vm_id),
    };
    OwnershipScope::VmOwnership(scope)
}
100
101fn wire_protocol_error(error: ProtocolCodecError) -> SidecarError {
102 SidecarError::InvalidState(format!("invalid generated wire protocol frame: {error}"))
103}
104
105pub fn run() -> Result<(), Box<dyn Error>> {
106 run_with_extensions(Vec::new())
107}
108
109pub fn run_with_extensions(extensions: Vec<Box<dyn Extension>>) -> Result<(), Box<dyn Error>> {
110 if let Err(error) = secure_exec_execution::v8_host::ensure_runtime_initialized() {
115 eprintln!("embedded V8 runtime init failed at startup: {error}");
116 }
117 tokio::runtime::Builder::new_current_thread()
118 .enable_all()
119 .build()?
120 .block_on(run_async(extensions))
121}
122
123async fn run_async(extensions: Vec<Box<dyn Extension>>) -> Result<(), Box<dyn Error>> {
124 let config = NativeSidecarConfig {
125 compile_cache_root: Some(default_compile_cache_root()),
126 ..NativeSidecarConfig::default()
127 };
128 let codec = WireFrameCodec::new(config.max_frame_bytes);
129 let mut sidecar =
130 NativeSidecar::with_config_and_extensions(LocalBridge::default(), config, extensions)?;
131 let mut active_sessions = BTreeSet::<SessionScope>::new();
132 let mut active_connections = BTreeSet::<String>::new();
133 let (stdin_tx, mut stdin_rx) =
134 channel::<Result<Option<ProtocolFrame>, String>>(MAX_STDIN_FRAME_QUEUE);
135 let stdin_gauge = secure_exec_bridge::queue_tracker::register_queue(
136 TrackedLimit::SidecarStdinFrames,
137 MAX_STDIN_FRAME_QUEUE,
138 );
139 let (event_ready_tx, mut event_ready_rx) = channel::<()>(MAX_EVENT_READY_QUEUE);
140 let (write_tx, write_rx) = tracked_sync_channel::<ProtocolFrame>(
141 TrackedLimit::SidecarStdoutFrames,
142 MAX_STDOUT_FRAME_QUEUE,
143 );
144 let (write_error_tx, mut write_error_rx) = unbounded_channel::<String>();
145
146 let (limit_warning_tx, mut limit_warning_rx) =
152 unbounded_channel::<secure_exec_bridge::queue_tracker::LimitWarning>();
153 secure_exec_bridge::queue_tracker::set_limit_warning_handler(Box::new(move |warning| {
154 let _ = limit_warning_tx.send(warning.clone());
155 }));
156 let callback_transport = Arc::new(FrameSidecarRequestTransport::new(write_tx.clone()));
157 sidecar.set_sidecar_request_transport(callback_transport.clone());
158 let mut event_pump = time::interval(EVENT_PUMP_INTERVAL);
159 let writer_codec = codec.clone();
160 let reader_codec = codec.clone();
161 let writer_error_tx = write_error_tx.clone();
162 thread::spawn(move || {
163 let mut writer = io::BufWriter::new(io::stdout());
164 while let Ok(frame) = write_rx.recv() {
165 if let Err(error) = write_frame(&writer_codec, &mut writer, &frame) {
166 let _ = writer_error_tx.send(error.to_string());
167 break;
168 }
169 }
170 });
171
172 thread::spawn({
173 let callback_transport = callback_transport.clone();
174 let read_error_tx = write_error_tx.clone();
175 move || {
176 let mut stdin = io::stdin();
177 loop {
178 let frame = match read_frame(&reader_codec, &mut stdin) {
179 Ok(Some(ProtocolFrame::SidecarResponseFrame(response))) => {
180 if callback_transport.accept_response(response.clone()) {
181 continue;
182 }
183 Ok(Some(ProtocolFrame::SidecarResponseFrame(response)))
184 }
185 Ok(Some(frame)) => Ok(Some(frame)),
186 other => other,
187 }
188 .map_err(|error: Box<dyn Error>| error.to_string());
189 let should_stop = matches!(frame, Ok(None) | Err(_));
190 match enqueue_stdin_frame(&stdin_tx, frame) {
191 Ok(()) => {
192 stdin_gauge.observe_depth(
195 stdin_tx.max_capacity().saturating_sub(stdin_tx.capacity()),
196 );
197 }
198 Err(StdinFrameQueueError::Full(message)) => {
199 let _ = read_error_tx.send(message);
200 break;
201 }
202 Err(StdinFrameQueueError::Closed) => break,
203 }
204 if should_stop {
205 break;
206 }
207 }
208 }
209 });
210
211 flush_sidecar_requests(&mut sidecar, &write_tx)?;
212 let mut pending_frame: Option<ProtocolFrame> = None;
213 let mut limit_warning_closed = false;
214
215 loop {
216 if let Some(frame) = pending_frame.take() {
217 handle_protocol_frame(
218 frame,
219 &mut sidecar,
220 &mut stdin_rx,
221 &mut pending_frame,
222 &write_tx,
223 &mut active_sessions,
224 &mut active_connections,
225 )
226 .await?;
227 continue;
228 }
229
230 tokio::select! {
231 maybe_frame = stdin_rx.recv() => {
232 let Some(frame) = maybe_frame else {
233 break;
234 };
235 let Some(frame) = frame.map_err(io::Error::other)? else {
236 break;
237 };
238 handle_protocol_frame(
239 frame,
240 &mut sidecar,
241 &mut stdin_rx,
242 &mut pending_frame,
243 &write_tx,
244 &mut active_sessions,
245 &mut active_connections,
246 ).await?;
247 }
248 maybe_warning = limit_warning_rx.recv(), if !limit_warning_closed => {
249 match maybe_warning {
250 Some(warning) => {
251 if let Some(connection_id) = active_connections.iter().next() {
257 let mut detail = std::collections::HashMap::new();
258 detail.insert(String::from("limit"), warning.name.as_str().to_string());
259 detail.insert(
260 String::from("category"),
261 warning.category.as_str().to_string(),
262 );
263 detail.insert(String::from("observed"), warning.observed.to_string());
264 detail.insert(String::from("capacity"), warning.capacity.to_string());
265 detail.insert(
266 String::from("fillPercent"),
267 warning.fill_percent.to_string(),
268 );
269 let frame = crate::service::structured_event_frame(
270 connection_id,
271 "limit_warning",
272 detail,
273 )?;
274 send_output_frame(&write_tx, ProtocolFrame::EventFrame(frame))?;
275 }
276 }
277 None => {
278 limit_warning_closed = true;
283 }
284 }
285 }
286 maybe_ready = event_ready_rx.recv() => {
287 let Some(()) = maybe_ready else {
288 break;
289 };
290 loop {
291 let mut emitted_frame = false;
292 for session in active_sessions.iter().cloned().collect::<Vec<_>>() {
293 if let Some(frame) = sidecar
294 .poll_event_wire(&session.ownership_scope(), Duration::ZERO)
295 .await?
296 {
297 send_output_frame(&write_tx, ProtocolFrame::EventFrame(frame))?;
298 emitted_frame = true;
299 }
300 }
301
302 if !emitted_frame {
303 break;
304 }
305 }
306 flush_sidecar_requests(&mut sidecar, &write_tx)?;
307 }
308 _ = event_pump.tick() => {
309 for session in active_sessions.iter().cloned().collect::<Vec<_>>() {
310 if sidecar.pump_process_events(&session.compat_ownership_scope()).await? {
311 let _ = event_ready_tx.try_send(());
312 }
313 }
314 flush_sidecar_requests(&mut sidecar, &write_tx)?;
315 }
316 maybe_write_error = write_error_rx.recv() => {
317 if let Some(error) = maybe_write_error {
318 return Err(io::Error::new(io::ErrorKind::BrokenPipe, error).into());
319 }
320 }
321 }
322 }
323
324 cleanup_connections(&mut sidecar, &active_connections).await;
325 Ok(())
326}
327
328async fn handle_protocol_frame(
329 frame: ProtocolFrame,
330 sidecar: &mut NativeSidecar<LocalBridge>,
331 stdin_rx: &mut Receiver<Result<Option<ProtocolFrame>, String>>,
332 pending_frame: &mut Option<ProtocolFrame>,
333 write_tx: &TrackedSyncSender<ProtocolFrame>,
334 active_sessions: &mut BTreeSet<SessionScope>,
335 active_connections: &mut BTreeSet<String>,
336) -> Result<(), Box<dyn Error>> {
337 match frame {
338 ProtocolFrame::RequestFrame(request) => {
339 let (dispatch, extra_responses) =
340 dispatch_with_prompt_interrupt(sidecar, request.clone(), stdin_rx, pending_frame)
341 .await?;
342 track_session_state(
343 &dispatch.response.payload,
344 active_sessions,
345 active_connections,
346 );
347
348 send_output_frame(write_tx, ProtocolFrame::ResponseFrame(dispatch.response))?;
349 for response in extra_responses {
350 send_output_frame(write_tx, ProtocolFrame::ResponseFrame(response))?;
351 }
352 for event in dispatch.events {
353 send_output_frame(write_tx, ProtocolFrame::EventFrame(event))?;
354 }
355 flush_sidecar_requests(sidecar, write_tx)?;
356 }
357 ProtocolFrame::SidecarResponseFrame(response) => {
358 sidecar.accept_wire_sidecar_response(response)?;
359 flush_sidecar_requests(sidecar, write_tx)?;
360 }
361 other => {
362 return Err(format!(
363 "expected request or sidecar_response frame on stdin, received {}",
364 frame_kind(&other)
365 )
366 .into());
367 }
368 }
369 Ok(())
370}
371
/// Dispatches `request`, letting a later stdin frame interrupt it when the request is a
/// blocking extension request.
///
/// Non-blocking requests are dispatched directly. For a blocking extension request the dispatch
/// is raced against the next frame on `stdin_rx`:
///
/// * If the dispatch finishes first, its result is returned with no extra responses.
/// * If a frame arrives and the extension accepts it as an interrupt, the in-flight dispatch is
///   dropped and the synthesized "interrupted" result is returned. When the interrupt carries
///   its own response, that response is returned as an extra response; otherwise the
///   interrupting frame is stored in `pending_frame` so the caller processes it normally.
/// * If a frame arrives that is not an interrupt, it is stored in `pending_frame` and the
///   dispatch is awaited to completion. Only this first frame is examined; later frames are not
///   considered while the dispatch is still running.
/// * If the channel is closed or signals end-of-stream, the dispatch is awaited to completion.
///
/// # Errors
///
/// Returns an error if the dispatch fails or the received stdin item is a read error.
async fn dispatch_with_prompt_interrupt(
    sidecar: &mut NativeSidecar<LocalBridge>,
    request: RequestFrame,
    stdin_rx: &mut Receiver<Result<Option<ProtocolFrame>, String>>,
    pending_frame: &mut Option<ProtocolFrame>,
) -> Result<(WireDispatchResult, Vec<ResponseFrame>), Box<dyn Error>> {
    let Some(blocking_request) = blocking_extension_request(sidecar, &request) else {
        return Ok((sidecar.dispatch_wire(request).await?, Vec::new()));
    };

    // Pinned on the heap so it can be polled inside `select!` and resumed afterwards.
    let mut dispatch = Box::pin(sidecar.dispatch_wire(request.clone()));
    tokio::select! {
        result = dispatch.as_mut() => Ok((result?, Vec::new())),
        maybe_frame = stdin_rx.recv() => {
            let frame = decode_stdin_frame(maybe_frame)?;
            if let Some(frame) = frame {
                if let Some(interrupt) = extension_interrupt_response(&blocking_request, &request, &frame) {
                    // Abandon the blocking dispatch; its response is synthesized instead.
                    drop(dispatch);
                    let mut extra_responses = Vec::new();
                    if let Some(response) = interrupt.interrupting_response {
                        extra_responses.push(response);
                    } else {
                        // No synthetic reply for the interrupter: let it run as a normal frame.
                        *pending_frame = Some(frame);
                    }
                    return Ok((interrupt.interrupted_dispatch, extra_responses));
                }
                *pending_frame = Some(frame);
            }
            Ok((dispatch.await?, Vec::new()))
        }
    }
}
404
405fn decode_stdin_frame(
406 maybe_frame: Option<Result<Option<ProtocolFrame>, String>>,
407) -> Result<Option<ProtocolFrame>, Box<dyn Error>> {
408 let Some(frame) = maybe_frame else {
409 return Ok(None);
410 };
411 Ok(frame.map_err(io::Error::other)?)
412}
413
/// An in-flight extension request that its extension reported as blocking.
struct BlockingExtensionRequest {
    /// Namespace of the extension envelope the request was addressed to.
    namespace: String,
    /// Raw payload of the blocking request, passed back to the extension when deciding whether
    /// another frame interrupts it.
    payload: Vec<u8>,
    /// The extension registered for `namespace`.
    extension: Arc<dyn Extension>,
}
419
/// Outcome of interrupting a blocking extension request.
struct ExtensionInterruptDispatch {
    /// Synthesized dispatch result answering the interrupted (blocking) request.
    interrupted_dispatch: WireDispatchResult,
    /// Response to the interrupting request itself, when the extension supplied one.
    interrupting_response: Option<ResponseFrame>,
}
424
425fn blocking_extension_request(
426 sidecar: &NativeSidecar<LocalBridge>,
427 request: &RequestFrame,
428) -> Option<BlockingExtensionRequest> {
429 let RequestPayload::ExtEnvelope(envelope) = &request.payload else {
430 return None;
431 };
432 let extension = sidecar.extensions.get(&envelope.namespace)?.clone();
433 if !extension.is_blocking_request(&envelope.payload) {
434 return None;
435 }
436 Some(BlockingExtensionRequest {
437 namespace: envelope.namespace.clone(),
438 payload: envelope.payload.clone(),
439 extension,
440 })
441}
442
/// Decides whether `frame` interrupts the blocking extension request `active_request`.
///
/// Only request frames with the same ownership scope as `active_request` are considered. Of
/// those, an extension envelope for the same namespace or a kill-process request is offered to
/// the extension via `interrupt_blocking_request`; every other payload, and every non-request
/// frame, yields `None`. The extension may itself decline by returning `None`.
///
/// On an accepted interrupt the result carries the synthesized response for the interrupted
/// request and, when the extension supplied a payload for it, a response addressed to the
/// interrupting request.
fn extension_interrupt_response(
    blocking_request: &BlockingExtensionRequest,
    active_request: &RequestFrame,
    frame: &ProtocolFrame,
) -> Option<ExtensionInterruptDispatch> {
    match frame {
        ProtocolFrame::RequestFrame(request) => {
            if request.ownership != active_request.ownership {
                return None;
            }
            // The payload variants are listed exhaustively (no `_` arm).
            let interrupt = match &request.payload {
                RequestPayload::ExtEnvelope(envelope)
                    if envelope.namespace == blocking_request.namespace =>
                {
                    blocking_request.extension.interrupt_blocking_request(
                        &blocking_request.payload,
                        ExtensionInterruptRequest::ExtensionPayload(&envelope.payload),
                    )?
                }
                // Envelope for a different extension namespace.
                RequestPayload::ExtEnvelope(_) => return None,
                RequestPayload::KillProcessRequest(_) => {
                    blocking_request.extension.interrupt_blocking_request(
                        &blocking_request.payload,
                        ExtensionInterruptRequest::KillProcess,
                    )?
                }
                RequestPayload::AuthenticateRequest(_)
                | RequestPayload::OpenSessionRequest(_)
                | RequestPayload::CreateVmRequest(_)
                | RequestPayload::DisposeVmRequest(_)
                | RequestPayload::BootstrapRootFilesystemRequest(_)
                | RequestPayload::ConfigureVmRequest(_)
                | RequestPayload::RegisterHostCallbacksRequest(_)
                | RequestPayload::CreateLayerRequest
                | RequestPayload::SealLayerRequest(_)
                | RequestPayload::ImportSnapshotRequest(_)
                | RequestPayload::ExportSnapshotRequest(_)
                | RequestPayload::CreateOverlayRequest(_)
                | RequestPayload::GuestFilesystemCallRequest(_)
                | RequestPayload::SnapshotRootFilesystemRequest
                | RequestPayload::ExecuteRequest(_)
                | RequestPayload::WriteStdinRequest(_)
                | RequestPayload::CloseStdinRequest(_)
                | RequestPayload::GetProcessSnapshotRequest
                | RequestPayload::FindListenerRequest(_)
                | RequestPayload::FindBoundUdpRequest(_)
                | RequestPayload::VmFetchRequest(_)
                | RequestPayload::GetSignalStateRequest(_)
                | RequestPayload::GetZombieTimerCountRequest
                | RequestPayload::HostFilesystemCallRequest(_)
                | RequestPayload::PersistenceLoadRequest(_)
                | RequestPayload::PersistenceFlushRequest(_) => return None,
            };
            let interrupted_dispatch = interrupted_extension_dispatch(
                active_request,
                &blocking_request.namespace,
                interrupt.interrupted_response_payload,
            );
            // The reply to the interrupter reuses its own request id and ownership.
            let interrupting_response = interrupt.interrupting_response_payload.map(|payload| {
                response_frame(
                    request.request_id,
                    request.ownership.clone(),
                    ResponsePayload::ExtEnvelope(ExtEnvelope {
                        namespace: blocking_request.namespace.clone(),
                        payload,
                    }),
                )
            });
            Some(ExtensionInterruptDispatch {
                interrupted_dispatch,
                interrupting_response,
            })
        }
        ProtocolFrame::ResponseFrame(_)
        | ProtocolFrame::EventFrame(_)
        | ProtocolFrame::SidecarRequestFrame(_)
        | ProtocolFrame::SidecarResponseFrame(_) => None,
    }
}
531
/// Builds the dispatch result that answers an interrupted extension request.
///
/// The response reuses the request id and ownership of `request` and wraps `payload` in an
/// extension envelope for `namespace`; no events are attached.
///
/// # Panics
///
/// Panics (via `unreachable!`) if `request` is not an extension envelope request.
fn interrupted_extension_dispatch(
    request: &RequestFrame,
    namespace: &str,
    payload: Vec<u8>,
) -> WireDispatchResult {
    match &request.payload {
        RequestPayload::ExtEnvelope(_) => {
            let response = ResponsePayload::ExtEnvelope(ExtEnvelope {
                namespace: namespace.to_string(),
                payload,
            });
            WireDispatchResult {
                response: response_frame(request.request_id, request.ownership.clone(), response),
                events: Vec::new(),
            }
        }
        // Every other variant is listed explicitly (no `_` arm).
        RequestPayload::AuthenticateRequest(_)
        | RequestPayload::OpenSessionRequest(_)
        | RequestPayload::CreateVmRequest(_)
        | RequestPayload::DisposeVmRequest(_)
        | RequestPayload::BootstrapRootFilesystemRequest(_)
        | RequestPayload::ConfigureVmRequest(_)
        | RequestPayload::RegisterHostCallbacksRequest(_)
        | RequestPayload::CreateLayerRequest
        | RequestPayload::SealLayerRequest(_)
        | RequestPayload::ImportSnapshotRequest(_)
        | RequestPayload::ExportSnapshotRequest(_)
        | RequestPayload::CreateOverlayRequest(_)
        | RequestPayload::GuestFilesystemCallRequest(_)
        | RequestPayload::SnapshotRootFilesystemRequest
        | RequestPayload::ExecuteRequest(_)
        | RequestPayload::WriteStdinRequest(_)
        | RequestPayload::CloseStdinRequest(_)
        | RequestPayload::KillProcessRequest(_)
        | RequestPayload::GetProcessSnapshotRequest
        | RequestPayload::FindListenerRequest(_)
        | RequestPayload::FindBoundUdpRequest(_)
        | RequestPayload::VmFetchRequest(_)
        | RequestPayload::GetSignalStateRequest(_)
        | RequestPayload::GetZombieTimerCountRequest
        | RequestPayload::HostFilesystemCallRequest(_)
        | RequestPayload::PersistenceLoadRequest(_)
        | RequestPayload::PersistenceFlushRequest(_) => {
            unreachable!("interrupted extension dispatch requires an extension request");
        }
    }
}
579
580async fn cleanup_connections(
581 sidecar: &mut NativeSidecar<LocalBridge>,
582 active_connections: &BTreeSet<String>,
583) {
584 for connection_id in active_connections {
585 let _ = sidecar.remove_connection(connection_id).await;
586 }
587}
588
589fn track_session_state(
590 payload: &ResponsePayload,
591 active_sessions: &mut BTreeSet<SessionScope>,
592 active_connections: &mut BTreeSet<String>,
593) {
594 match payload {
595 ResponsePayload::AuthenticatedResponse(AuthenticatedResponse { connection_id, .. }) => {
596 active_connections.insert(connection_id.clone());
597 }
598 ResponsePayload::SessionOpenedResponse(SessionOpenedResponse {
599 session_id,
600 owner_connection_id,
601 }) => {
602 active_sessions.insert(SessionScope {
603 connection_id: owner_connection_id.clone(),
604 session_id: session_id.clone(),
605 });
606 }
607 _ => {}
608 }
609}
610
611fn read_frame(
612 codec: &WireFrameCodec,
613 reader: &mut impl Read,
614) -> Result<Option<ProtocolFrame>, Box<dyn Error>> {
615 let mut prefix = [0u8; 4];
616 match reader.read_exact(&mut prefix) {
617 Ok(()) => {}
618 Err(error) if error.kind() == io::ErrorKind::UnexpectedEof => {
619 return Ok(None);
620 }
621 Err(error) => return Err(error.into()),
622 }
623
624 let declared_len = u32::from_be_bytes(prefix) as usize;
625 if declared_len > codec.max_frame_bytes() {
626 return Err(ProtocolCodecError::FrameTooLarge {
627 size: declared_len,
628 max: codec.max_frame_bytes(),
629 }
630 .into());
631 }
632 let total_len = prefix.len().saturating_add(declared_len);
633 let mut bytes = Vec::with_capacity(total_len);
634 bytes.extend_from_slice(&prefix);
635 bytes.resize(total_len, 0);
636 reader.read_exact(&mut bytes[prefix.len()..])?;
637
638 Ok(Some(codec.decode(&bytes)?))
639}
640
641fn write_frame(
642 codec: &WireFrameCodec,
643 writer: &mut impl Write,
644 frame: &ProtocolFrame,
645) -> Result<(), Box<dyn Error>> {
646 let bytes = codec.encode(frame)?;
647 writer.write_all(&bytes)?;
648 writer.flush()?;
649 Ok(())
650}
651
652fn frame_kind(frame: &ProtocolFrame) -> &'static str {
653 match frame {
654 ProtocolFrame::RequestFrame(_) => "request",
655 ProtocolFrame::ResponseFrame(_) => "response",
656 ProtocolFrame::EventFrame(_) => "event",
657 ProtocolFrame::SidecarRequestFrame(_) => "sidecar_request",
658 ProtocolFrame::SidecarResponseFrame(_) => "sidecar_response",
659 }
660}
661
/// Reasons a decoded stdin frame could not be queued for the main loop.
#[derive(Debug, Clone, PartialEq, Eq)]
enum StdinFrameQueueError {
    /// The queue is at capacity; carries a human-readable description.
    Full(String),
    /// The receiving side of the queue has been dropped.
    Closed,
}
667
668fn enqueue_stdin_frame(
669 sender: &tokio::sync::mpsc::Sender<Result<Option<ProtocolFrame>, String>>,
670 frame: Result<Option<ProtocolFrame>, String>,
671) -> Result<(), StdinFrameQueueError> {
672 sender.try_send(frame).map_err(|error| match error {
673 tokio::sync::mpsc::error::TrySendError::Full(_) => StdinFrameQueueError::Full(format!(
674 "stdin frame queue exceeded {MAX_STDIN_FRAME_QUEUE} pending frames"
675 )),
676 tokio::sync::mpsc::error::TrySendError::Closed(_) => StdinFrameQueueError::Closed,
677 })
678}
679
680fn flush_sidecar_requests(
681 sidecar: &mut NativeSidecar<LocalBridge>,
682 writer: &TrackedSyncSender<ProtocolFrame>,
683) -> Result<(), Box<dyn Error>> {
684 while let Some(request) = sidecar.pop_wire_sidecar_request()? {
685 send_output_frame(writer, ProtocolFrame::SidecarRequestFrame(request))?;
686 }
687 Ok(())
688}
689
690fn send_output_frame(
691 writer: &TrackedSyncSender<ProtocolFrame>,
692 frame: ProtocolFrame,
693) -> Result<(), io::Error> {
694 writer.send(frame).map_err(|_disconnected| {
705 io::Error::new(io::ErrorKind::BrokenPipe, "stdout writer disconnected")
706 })
707}
708
/// Returns the default compile cache directory: `secure-exec-sidecar-compile-cache` inside the
/// platform's temporary directory.
fn default_compile_cache_root() -> PathBuf {
    let mut root = std::env::temp_dir();
    root.push("secure-exec-sidecar-compile-cache");
    root
}
718
/// Unit tests for frame reading, queue bounds and blocking-extension interrupts.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::wire::{AuthenticateRequest, KillProcessRequest};
    use crate::{ExtensionContext, ExtensionFuture, ExtensionInterruptResponse, ExtensionResponse};
    use std::io::Cursor;

    /// Namespace served by [`TestBlockingInterruptExtension`].
    const TEST_EXTENSION_NAMESPACE: &str = "dev.rivet.secure-exec.test.blocking";

    /// A length prefix above the codec limit must fail even though no payload bytes follow.
    #[test]
    fn read_frame_rejects_oversized_prefix_before_allocating_payload() {
        let codec = WireFrameCodec::new(16);
        let mut reader = Cursor::new((32_u32).to_be_bytes().to_vec());

        let error = read_frame(&codec, &mut reader).expect_err("oversized frame should fail");
        let error = error
            .downcast::<ProtocolCodecError>()
            .expect("protocol codec error");
        assert!(matches!(
            *error,
            ProtocolCodecError::FrameTooLarge { size: 32, max: 16 }
        ));
    }

    /// The stdin and event-ready queues accept exactly their capacity and then report `Full`.
    #[test]
    fn stdio_work_queues_are_bounded() {
        let (stdin_tx, _stdin_rx) =
            channel::<Result<Option<ProtocolFrame>, String>>(MAX_STDIN_FRAME_QUEUE);
        for _ in 0..MAX_STDIN_FRAME_QUEUE {
            enqueue_stdin_frame(&stdin_tx, Ok(None))
                .expect("stdin frame queue should accept capacity");
        }
        assert!(matches!(
            enqueue_stdin_frame(&stdin_tx, Ok(None)),
            Err(StdinFrameQueueError::Full(_))
        ));

        let (event_ready_tx, _event_ready_rx) = channel::<()>(MAX_EVENT_READY_QUEUE);
        event_ready_tx
            .try_send(())
            .expect("event-ready queue should accept capacity");
        assert!(matches!(
            event_ready_tx.try_send(()),
            Err(tokio::sync::mpsc::error::TrySendError::Full(_))
        ));
    }

    /// Sending more frames than the stdout queue holds must succeed while a slow consumer
    /// drains it, and sending to a dropped receiver must yield `BrokenPipe`.
    #[test]
    fn stdout_frame_queue_applies_backpressure_instead_of_crashing() {
        let queue_frame = |request_id: RequestId| {
            ProtocolFrame::RequestFrame(request_frame(
                request_id,
                connection_ownership("conn-queue"),
                RequestPayload::AuthenticateRequest(AuthenticateRequest {
                    client_name: String::from("queue-test"),
                    auth_token: String::from("token"),
                    protocol_version: wire::PROTOCOL_VERSION,
                    bridge_version: secure_exec_bridge::bridge_contract().version,
                }),
            ))
        };

        // Small queue, three times as many frames: the producer must outrun the queue.
        let queue_cap = 8usize;
        let total_frames = queue_cap * 3;
        let (stdout_tx, stdout_rx) =
            tracked_sync_channel::<ProtocolFrame>(TrackedLimit::SidecarStdoutFrames, queue_cap);
        let drainer = std::thread::spawn(move || {
            let mut drained = 0usize;
            while stdout_rx.recv().is_ok() {
                drained += 1;
                // Deliberately slow consumer.
                std::thread::sleep(std::time::Duration::from_millis(1));
            }
            drained
        });

        for request_id in 0..total_frames {
            send_output_frame(&stdout_tx, queue_frame(request_id as RequestId))
                .expect("backpressured stdout queue must accept frames, not crash");
        }
        drop(stdout_tx);
        let drained = drainer.join().expect("drainer thread panicked");
        assert_eq!(
            drained, total_frames,
            "every frame must survive the backpressured queue"
        );

        let (closed_tx, closed_rx) =
            tracked_sync_channel::<ProtocolFrame>(TrackedLimit::SidecarStdoutFrames, queue_cap);
        drop(closed_rx);
        let error = send_output_frame(&closed_tx, queue_frame(0))
            .expect_err("send to a dropped writer must error");
        assert_eq!(error.kind(), io::ErrorKind::BrokenPipe);
    }

    /// A frame encoded by the codec round-trips through `read_frame`.
    #[test]
    fn read_frame_decodes_wire_authenticate_request() {
        let codec = WireFrameCodec::new(wire::DEFAULT_MAX_FRAME_BYTES);
        let frame = ProtocolFrame::RequestFrame(request_frame(
            1,
            connection_ownership("client-hint"),
            RequestPayload::AuthenticateRequest(AuthenticateRequest {
                client_name: "probe".to_string(),
                auth_token: "probe-token".to_string(),
                protocol_version: wire::PROTOCOL_VERSION,
                bridge_version: secure_exec_bridge::bridge_contract().version,
            }),
        ));
        let encoded = codec.encode(&frame).expect("encode wire frame");
        let mut reader = Cursor::new(encoded);

        let decoded = read_frame(&codec, &mut reader)
            .expect("decode bare frame")
            .expect("frame present");

        assert_eq!(decoded, frame);
    }

    /// A `close` for the same extension session interrupts the prompt; the prompt receives a
    /// synthesized `prompt-cancelled` response under its own request id.
    #[test]
    fn extension_close_interrupts_matching_blocking_request() {
        let ownership = vm_ownership("conn-1", "session-1", "vm-1");
        let prompt = test_extension_request_frame(10, ownership.clone(), "prompt:ext-session-1");
        let close = ProtocolFrame::RequestFrame(test_extension_request_frame(
            11,
            ownership,
            "close:ext-session-1",
        ));

        let blocking_request = blocking_extension_request(&prompt);
        let interrupt = extension_interrupt_response(&blocking_request, &prompt, &close)
            .expect("close should interrupt prompt");

        assert_eq!(interrupt.interrupted_dispatch.response.request_id, 10);
        let ResponsePayload::ExtEnvelope(envelope) =
            interrupt.interrupted_dispatch.response.payload
        else {
            panic!("expected extension response");
        };
        assert_eq!(envelope.namespace, TEST_EXTENSION_NAMESPACE);
        assert_eq!(envelope.payload, b"prompt-cancelled:ext-session-1");
    }

    /// A `cancel` interrupt additionally gets its own synthetic response, addressed to the
    /// cancel request's id.
    #[test]
    fn extension_cancel_interrupt_gets_synthetic_response() {
        let ownership = vm_ownership("conn-1", "session-1", "vm-1");
        let prompt = test_extension_request_frame(10, ownership.clone(), "prompt:ext-session-1");
        let cancel = ProtocolFrame::RequestFrame(test_extension_request_frame(
            11,
            ownership,
            "cancel:ext-session-1",
        ));

        let blocking_request = blocking_extension_request(&prompt);
        let interrupt = extension_interrupt_response(&blocking_request, &prompt, &cancel)
            .expect("cancel should interrupt prompt");
        let response = interrupt
            .interrupting_response
            .expect("cancel should get a response");

        assert_eq!(response.request_id, 11);
        let ResponsePayload::ExtEnvelope(envelope) = response.payload else {
            panic!("expected extension response");
        };
        assert_eq!(envelope.namespace, TEST_EXTENSION_NAMESPACE);
        assert_eq!(envelope.payload, b"cancelled:ext-session-1");
    }

    /// A kill-process request interrupts the prompt but receives no synthetic response.
    #[test]
    fn kill_process_interrupts_blocking_extension_request() {
        let ownership = vm_ownership("conn-1", "session-1", "vm-1");
        let prompt = test_extension_request_frame(10, ownership.clone(), "prompt:ext-session-1");
        let kill = ProtocolFrame::RequestFrame(request_frame(
            11,
            ownership,
            RequestPayload::KillProcessRequest(KillProcessRequest {
                process_id: "adapter-process".to_string(),
                signal: "SIGTERM".to_string(),
            }),
        ));

        let blocking_request = blocking_extension_request(&prompt);
        let interrupt = extension_interrupt_response(&blocking_request, &prompt, &kill)
            .expect("kill should interrupt prompt");

        assert_eq!(interrupt.interrupted_dispatch.response.request_id, 10);
        assert!(interrupt.interrupting_response.is_none());
    }

    /// Builds an extension envelope request for the test namespace with a UTF-8 payload.
    fn test_extension_request_frame(
        request_id: RequestId,
        ownership: OwnershipScope,
        payload: &str,
    ) -> RequestFrame {
        request_frame(
            request_id,
            ownership,
            RequestPayload::ExtEnvelope(ExtEnvelope {
                namespace: TEST_EXTENSION_NAMESPACE.to_string(),
                payload: payload.as_bytes().to_vec(),
            }),
        )
    }

    /// Test-local variant of the parent module's `blocking_extension_request` that needs no
    /// sidecar: it always pairs the request with [`TestBlockingInterruptExtension`].
    fn blocking_extension_request(request: &RequestFrame) -> BlockingExtensionRequest {
        let RequestPayload::ExtEnvelope(envelope) = &request.payload else {
            panic!("expected extension request");
        };
        BlockingExtensionRequest {
            namespace: TEST_EXTENSION_NAMESPACE.to_string(),
            payload: envelope.payload.clone(),
            extension: Arc::new(TestBlockingInterruptExtension),
        }
    }

    /// Extension speaking a tiny `kind:session_id` text protocol: `prompt` blocks, while
    /// `close` and `cancel` for the same session interrupt it.
    struct TestBlockingInterruptExtension;

    impl Extension for TestBlockingInterruptExtension {
        fn namespace(&self) -> &str {
            TEST_EXTENSION_NAMESPACE
        }

        // Request handling is not exercised by these tests; reply with an empty payload.
        fn handle_request<'a>(
            &'a self,
            _ctx: ExtensionContext<'a>,
            _payload: Vec<u8>,
        ) -> ExtensionFuture<'a, ExtensionResponse> {
            Box::pin(async { Ok(ExtensionResponse::new(Vec::new())) })
        }

        fn is_blocking_request(&self, payload: &[u8]) -> bool {
            parse_test_payload(payload).is_some_and(|(kind, _session_id)| kind == "prompt")
        }

        fn interrupt_blocking_request(
            &self,
            blocking_payload: &[u8],
            interrupt: ExtensionInterruptRequest<'_>,
        ) -> Option<ExtensionInterruptResponse> {
            let (blocking_kind, blocking_session_id) = parse_test_payload(blocking_payload)?;
            if blocking_kind != "prompt" {
                return None;
            }

            let interrupted_response_payload =
                encode_test_response("prompt-cancelled", blocking_session_id);
            match interrupt {
                ExtensionInterruptRequest::KillProcess => Some(ExtensionInterruptResponse {
                    interrupted_response_payload,
                    interrupting_response_payload: None,
                }),
                ExtensionInterruptRequest::ExtensionPayload(payload) => {
                    let (interrupt_kind, interrupt_session_id) = parse_test_payload(payload)?;
                    match interrupt_kind {
                        "close" if interrupt_session_id == blocking_session_id => {
                            Some(ExtensionInterruptResponse {
                                interrupted_response_payload,
                                interrupting_response_payload: None,
                            })
                        }
                        "cancel" if interrupt_session_id == blocking_session_id => {
                            Some(ExtensionInterruptResponse {
                                interrupted_response_payload,
                                interrupting_response_payload: Some(encode_test_response(
                                    "cancelled",
                                    interrupt_session_id,
                                )),
                            })
                        }
                        // Known kinds for a different session, and unknown kinds, do not
                        // interrupt.
                        "prompt" | "close" | "cancel" => None,
                        _ => None,
                    }
                }
            }
        }
    }

    /// Splits a UTF-8 `kind:session_id` payload at the first `:`.
    fn parse_test_payload(payload: &[u8]) -> Option<(&str, &str)> {
        let payload = std::str::from_utf8(payload).ok()?;
        payload.split_once(':')
    }

    /// Encodes a `kind:session_id` payload.
    fn encode_test_response(kind: &str, session_id: &str) -> Vec<u8> {
        format!("{kind}:{session_id}").into_bytes()
    }
}
1015
/// Bridge implementation used by the stdio sidecar; its `FilesystemBridge` methods operate on
/// the host filesystem through `std::fs`.
#[derive(Debug, Clone)]
struct LocalBridge {
    /// Instant at which the bridge was created.
    started_at: Instant,
    /// Counter for timer ids; starts at 0. NOTE(review): presumably incremented per scheduled
    /// timer — confirm against the timer implementation.
    next_timer_id: usize,
    /// Filesystem snapshots keyed by string. NOTE(review): the meaning of the key is not
    /// visible here — confirm against the persistence implementation.
    snapshots: BTreeMap<String, FilesystemSnapshot>,
}
1022
1023impl Default for LocalBridge {
1024 fn default() -> Self {
1025 Self {
1026 started_at: Instant::now(),
1027 next_timer_id: 0,
1028 snapshots: BTreeMap::new(),
1029 }
1030 }
1031}
1032
1033impl BridgeTypes for LocalBridge {
1034 type Error = LocalBridgeError;
1035}
1036
1037impl FilesystemBridge for LocalBridge {
1038 fn read_file(&mut self, request: ReadFileRequest) -> Result<Vec<u8>, Self::Error> {
1039 fs::read(Self::host_path(&request.path))
1040 .map_err(|error| LocalBridgeError::io("read", &request.path, error))
1041 }
1042
1043 fn write_file(&mut self, request: WriteFileRequest) -> Result<(), Self::Error> {
1044 let host_path = Self::host_path(&request.path);
1045 if let Some(parent) = host_path.parent() {
1046 fs::create_dir_all(parent)
1047 .map_err(|error| LocalBridgeError::io("mkdir", &request.path, error))?;
1048 }
1049 fs::write(host_path, request.contents)
1050 .map_err(|error| LocalBridgeError::io("write", &request.path, error))
1051 }
1052
1053 fn stat(&mut self, request: PathRequest) -> Result<FileMetadata, Self::Error> {
1054 fs::metadata(Self::host_path(&request.path))
1055 .map(Self::file_metadata)
1056 .map_err(|error| LocalBridgeError::io("stat", &request.path, error))
1057 }
1058
1059 fn lstat(&mut self, request: PathRequest) -> Result<FileMetadata, Self::Error> {
1060 fs::symlink_metadata(Self::host_path(&request.path))
1061 .map(Self::file_metadata)
1062 .map_err(|error| LocalBridgeError::io("lstat", &request.path, error))
1063 }
1064
1065 fn read_dir(&mut self, request: ReadDirRequest) -> Result<Vec<DirectoryEntry>, Self::Error> {
1066 let mut entries = fs::read_dir(Self::host_path(&request.path))
1067 .map_err(|error| LocalBridgeError::io("readdir", &request.path, error))?
1068 .map(|entry| {
1069 let entry =
1070 entry.map_err(|error| LocalBridgeError::io("readdir", &request.path, error))?;
1071 let kind = entry
1072 .file_type()
1073 .map(Self::file_kind)
1074 .map_err(|error| LocalBridgeError::io("readdir", &request.path, error))?;
1075 Ok(DirectoryEntry {
1076 name: entry.file_name().to_string_lossy().into_owned(),
1077 kind,
1078 })
1079 })
1080 .collect::<Result<Vec<_>, LocalBridgeError>>()?;
1081 entries.sort_by(|left, right| left.name.cmp(&right.name));
1082 Ok(entries)
1083 }
1084
1085 fn create_dir(&mut self, request: CreateDirRequest) -> Result<(), Self::Error> {
1086 let host_path = Self::host_path(&request.path);
1087 if request.recursive {
1088 fs::create_dir_all(host_path)
1089 } else {
1090 fs::create_dir(host_path)
1091 }
1092 .map_err(|error| LocalBridgeError::io("mkdir", &request.path, error))
1093 }
1094
1095 fn remove_file(&mut self, request: PathRequest) -> Result<(), Self::Error> {
1096 fs::remove_file(Self::host_path(&request.path))
1097 .map_err(|error| LocalBridgeError::io("unlink", &request.path, error))
1098 }
1099
1100 fn remove_dir(&mut self, request: PathRequest) -> Result<(), Self::Error> {
1101 fs::remove_dir(Self::host_path(&request.path))
1102 .map_err(|error| LocalBridgeError::io("rmdir", &request.path, error))
1103 }
1104
1105 fn rename(&mut self, request: RenameRequest) -> Result<(), Self::Error> {
1106 let from_path = Self::host_path(&request.from_path);
1107 let to_path = Self::host_path(&request.to_path);
1108 if let Some(parent) = to_path.parent() {
1109 fs::create_dir_all(parent)
1110 .map_err(|error| LocalBridgeError::io("mkdir", &request.to_path, error))?;
1111 }
1112 fs::rename(from_path, to_path).map_err(|error| {
1113 LocalBridgeError::unsupported(format!(
1114 "rename {} -> {}: {}",
1115 request.from_path, request.to_path, error
1116 ))
1117 })
1118 }
1119
1120 fn symlink(&mut self, request: SymlinkRequest) -> Result<(), Self::Error> {
1121 let link_path = Self::host_path(&request.link_path);
1122 if let Some(parent) = link_path.parent() {
1123 fs::create_dir_all(parent)
1124 .map_err(|error| LocalBridgeError::io("mkdir", &request.link_path, error))?;
1125 }
1126 create_symlink(&request.target_path, link_path)
1127 .map_err(|error| LocalBridgeError::io("symlink", &request.link_path, error))
1128 }
1129
1130 fn read_link(&mut self, request: PathRequest) -> Result<String, Self::Error> {
1131 fs::read_link(Self::host_path(&request.path))
1132 .map(|target| target.to_string_lossy().into_owned())
1133 .map_err(|error| LocalBridgeError::io("readlink", &request.path, error))
1134 }
1135
1136 fn chmod(&mut self, request: ChmodRequest) -> Result<(), Self::Error> {
1137 let permissions = fs::Permissions::from_mode(request.mode);
1138 fs::set_permissions(Self::host_path(&request.path), permissions)
1139 .map_err(|error| LocalBridgeError::io("chmod", &request.path, error))
1140 }
1141
1142 fn truncate(&mut self, request: TruncateRequest) -> Result<(), Self::Error> {
1143 OpenOptions::new()
1144 .write(true)
1145 .create(false)
1146 .open(Self::host_path(&request.path))
1147 .and_then(|file| file.set_len(request.len))
1148 .map_err(|error| LocalBridgeError::io("truncate", &request.path, error))
1149 }
1150
1151 fn exists(&mut self, request: PathRequest) -> Result<bool, Self::Error> {
1152 Ok(fs::symlink_metadata(Self::host_path(&request.path)).is_ok())
1153 }
1154}
1155
1156impl PermissionBridge for LocalBridge {
1157 fn check_filesystem_access(
1158 &mut self,
1159 request: FilesystemPermissionRequest,
1160 ) -> Result<PermissionDecision, Self::Error> {
1161 Ok(PermissionDecision::deny(format!(
1162 "no static filesystem policy registered for {}:{}",
1163 request.vm_id, request.path
1164 )))
1165 }
1166
1167 fn check_network_access(
1168 &mut self,
1169 request: NetworkPermissionRequest,
1170 ) -> Result<PermissionDecision, Self::Error> {
1171 Ok(PermissionDecision::deny(format!(
1172 "no static network policy registered for {}:{}",
1173 request.vm_id, request.resource
1174 )))
1175 }
1176
1177 fn check_command_execution(
1178 &mut self,
1179 request: CommandPermissionRequest,
1180 ) -> Result<PermissionDecision, Self::Error> {
1181 Ok(PermissionDecision::deny(format!(
1182 "no static child_process policy registered for {}:{}",
1183 request.vm_id, request.command
1184 )))
1185 }
1186
1187 fn check_environment_access(
1188 &mut self,
1189 request: EnvironmentPermissionRequest,
1190 ) -> Result<PermissionDecision, Self::Error> {
1191 Ok(PermissionDecision::deny(format!(
1192 "no static env policy registered for {}:{}",
1193 request.vm_id, request.key
1194 )))
1195 }
1196}
1197
1198impl PersistenceBridge for LocalBridge {
1199 fn load_filesystem_state(
1200 &mut self,
1201 request: LoadFilesystemStateRequest,
1202 ) -> Result<Option<FilesystemSnapshot>, Self::Error> {
1203 Ok(self.snapshots.get(&request.vm_id).cloned())
1204 }
1205
1206 fn flush_filesystem_state(
1207 &mut self,
1208 request: FlushFilesystemStateRequest,
1209 ) -> Result<(), Self::Error> {
1210 self.snapshots.insert(request.vm_id, request.snapshot);
1211 Ok(())
1212 }
1213}
1214
1215impl ClockBridge for LocalBridge {
1216 fn wall_clock(&mut self, _request: ClockRequest) -> Result<SystemTime, Self::Error> {
1217 Ok(SystemTime::now())
1218 }
1219
1220 fn monotonic_clock(&mut self, _request: ClockRequest) -> Result<Duration, Self::Error> {
1221 Ok(self.started_at.elapsed())
1222 }
1223
1224 fn schedule_timer(
1225 &mut self,
1226 request: ScheduleTimerRequest,
1227 ) -> Result<ScheduledTimer, Self::Error> {
1228 self.next_timer_id += 1;
1229 Ok(ScheduledTimer {
1230 timer_id: format!("timer-{}", self.next_timer_id),
1231 delay: request.delay,
1232 })
1233 }
1234}
1235
1236impl RandomBridge for LocalBridge {
1237 fn fill_random_bytes(&mut self, request: RandomBytesRequest) -> Result<Vec<u8>, Self::Error> {
1238 Ok(vec![0u8; request.len])
1239 }
1240}
1241
1242impl EventBridge for LocalBridge {
1243 fn emit_structured_event(&mut self, _event: StructuredEventRecord) -> Result<(), Self::Error> {
1244 Ok(())
1245 }
1246
1247 fn emit_diagnostic(&mut self, _event: DiagnosticRecord) -> Result<(), Self::Error> {
1248 Ok(())
1249 }
1250
1251 fn emit_log(&mut self, _event: LogRecord) -> Result<(), Self::Error> {
1252 Ok(())
1253 }
1254
1255 fn emit_lifecycle(&mut self, _event: LifecycleEventRecord) -> Result<(), Self::Error> {
1256 Ok(())
1257 }
1258}
1259
1260impl ExecutionBridge for LocalBridge {
1261 fn create_javascript_context(
1262 &mut self,
1263 _request: CreateJavascriptContextRequest,
1264 ) -> Result<GuestContextHandle, Self::Error> {
1265 Err(LocalBridgeError::unsupported(
1266 "execution bridge is handled internally by the native sidecar",
1267 ))
1268 }
1269
1270 fn create_wasm_context(
1271 &mut self,
1272 _request: CreateWasmContextRequest,
1273 ) -> Result<GuestContextHandle, Self::Error> {
1274 Err(LocalBridgeError::unsupported(
1275 "execution bridge is handled internally by the native sidecar",
1276 ))
1277 }
1278
1279 fn start_execution(
1280 &mut self,
1281 _request: StartExecutionRequest,
1282 ) -> Result<StartedExecution, Self::Error> {
1283 Err(LocalBridgeError::unsupported(
1284 "execution bridge is handled internally by the native sidecar",
1285 ))
1286 }
1287
1288 fn write_stdin(&mut self, _request: WriteExecutionStdinRequest) -> Result<(), Self::Error> {
1289 Err(LocalBridgeError::unsupported(
1290 "execution bridge is handled internally by the native sidecar",
1291 ))
1292 }
1293
1294 fn close_stdin(&mut self, _request: ExecutionHandleRequest) -> Result<(), Self::Error> {
1295 Err(LocalBridgeError::unsupported(
1296 "execution bridge is handled internally by the native sidecar",
1297 ))
1298 }
1299
1300 fn kill_execution(&mut self, _request: KillExecutionRequest) -> Result<(), Self::Error> {
1301 Err(LocalBridgeError::unsupported(
1302 "execution bridge is handled internally by the native sidecar",
1303 ))
1304 }
1305
1306 fn poll_execution_event(
1307 &mut self,
1308 _request: PollExecutionEventRequest,
1309 ) -> Result<Option<ExecutionEvent>, Self::Error> {
1310 Ok(None)
1311 }
1312}
1313
1314#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
1315struct SessionScope {
1316 connection_id: String,
1317 session_id: String,
1318}
1319
1320impl SessionScope {
1321 fn ownership_scope(&self) -> OwnershipScope {
1322 session_ownership(&self.connection_id, &self.session_id)
1323 }
1324
1325 fn compat_ownership_scope(&self) -> crate::protocol::OwnershipScope {
1326 wire::ownership_scope_to_compat(self.ownership_scope())
1327 }
1328}
1329
1330struct FrameSidecarRequestTransport {
1331 writer: TrackedSyncSender<ProtocolFrame>,
1332 pending: Arc<Mutex<BTreeMap<RequestId, mpsc::SyncSender<SidecarResponseFrame>>>>,
1333}
1334
1335impl FrameSidecarRequestTransport {
1336 fn new(writer: TrackedSyncSender<ProtocolFrame>) -> Self {
1337 Self {
1338 writer,
1339 pending: Arc::new(Mutex::new(BTreeMap::new())),
1340 }
1341 }
1342
1343 fn accept_response(&self, response: SidecarResponseFrame) -> bool {
1344 let sender = {
1345 let mut pending = match self.pending.lock() {
1346 Ok(pending) => pending,
1347 Err(_) => return false,
1348 };
1349 pending.remove(&response.request_id)
1350 };
1351 let Some(sender) = sender else {
1352 return false;
1353 };
1354 let _ = sender.send(response);
1355 true
1356 }
1357}
1358
1359impl SidecarRequestTransport for FrameSidecarRequestTransport {
1360 fn send_request(
1361 &self,
1362 request: crate::protocol::SidecarRequestFrame,
1363 timeout: Duration,
1364 ) -> Result<crate::protocol::SidecarResponseFrame, SidecarError> {
1365 let request =
1366 wire::sidecar_request_frame_from_compat(request).map_err(wire_protocol_error)?;
1367 let (sender, receiver) = mpsc::sync_channel(1);
1368 self.pending
1369 .lock()
1370 .map_err(|_| {
1371 SidecarError::Bridge(String::from("sidecar callback waiter map lock poisoned"))
1372 })?
1373 .insert(request.request_id, sender);
1374 let write_deadline = Instant::now() + timeout;
1381 let mut frame = ProtocolFrame::SidecarRequestFrame(request.clone());
1382 let write_result = loop {
1383 match self.writer.try_send(frame) {
1384 Ok(()) => break Ok(()),
1385 Err(mpsc::TrySendError::Disconnected(_)) => {
1386 break Err(String::from("stdout writer disconnected"));
1387 }
1388 Err(mpsc::TrySendError::Full(returned)) => {
1389 if Instant::now() >= write_deadline {
1390 break Err(format!(
1391 "timed out writing sidecar request frame after {}s",
1392 timeout.as_secs()
1393 ));
1394 }
1395 frame = returned;
1396 thread::sleep(Duration::from_millis(1));
1397 }
1398 }
1399 };
1400 if let Err(message) = write_result {
1401 let _ = self
1402 .pending
1403 .lock()
1404 .map(|mut pending| pending.remove(&request.request_id));
1405 return Err(SidecarError::Io(format!(
1406 "failed to write sidecar request frame: {message}"
1407 )));
1408 }
1409 match receiver.recv_timeout(timeout) {
1410 Ok(response) => {
1411 wire::sidecar_response_frame_to_compat(response).map_err(wire_protocol_error)
1412 }
1413 Err(mpsc::RecvTimeoutError::Timeout) => {
1414 let _ = self
1415 .pending
1416 .lock()
1417 .map(|mut pending| pending.remove(&request.request_id));
1418 Err(SidecarError::Io(format!(
1419 "timed out waiting for sidecar response after {}s",
1420 timeout.as_secs()
1421 )))
1422 }
1423 Err(mpsc::RecvTimeoutError::Disconnected) => Err(SidecarError::Io(String::from(
1424 "sidecar response waiter disconnected",
1425 ))),
1426 }
1427 }
1428}
1429
/// Error type for every `LocalBridge` operation: a human-readable message only.
#[derive(Debug, Clone, PartialEq, Eq)]
struct LocalBridgeError {
    message: String,
}

impl LocalBridgeError {
    /// Wraps an arbitrary message in an error.
    fn unsupported(message: impl Into<String>) -> Self {
        let message = message.into();
        Self { message }
    }

    /// Builds an error for a failed I/O `operation` on `path`, formatted as
    /// `<operation> <path>: <error>`.
    fn io(operation: &str, path: &str, error: io::Error) -> Self {
        Self {
            message: format!("{operation} {path}: {error}"),
        }
    }
}

impl fmt::Display for LocalBridgeError {
    /// Writes the stored message verbatim.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.write_str(self.message.as_str())
    }
}

impl Error for LocalBridgeError {}
1454
1455impl LocalBridge {
1456 fn host_path(path: &str) -> PathBuf {
1457 let candidate = Path::new(path);
1458 if candidate.is_absolute() {
1459 candidate.to_path_buf()
1460 } else {
1461 std::env::current_dir()
1462 .unwrap_or_else(|_| PathBuf::from("."))
1463 .join(candidate)
1464 }
1465 }
1466
1467 fn file_metadata(metadata: fs::Metadata) -> FileMetadata {
1468 FileMetadata {
1469 mode: metadata.permissions().mode(),
1470 size: metadata.size(),
1471 kind: Self::file_kind(metadata.file_type()),
1472 }
1473 }
1474
1475 fn file_kind(file_type: fs::FileType) -> secure_exec_bridge::FileKind {
1476 if file_type.is_file() {
1477 secure_exec_bridge::FileKind::File
1478 } else if file_type.is_dir() {
1479 secure_exec_bridge::FileKind::Directory
1480 } else if file_type.is_symlink() {
1481 secure_exec_bridge::FileKind::SymbolicLink
1482 } else {
1483 secure_exec_bridge::FileKind::Other
1484 }
1485 }
1486}