1use crate::wire::{
2 self, AuthenticatedResponse, ExtEnvelope, OwnershipScope, ProtocolCodecError, ProtocolFrame,
3 RequestFrame, RequestId, RequestPayload, ResponseFrame, ResponsePayload, SessionOpenedResponse,
4 SidecarResponseFrame, WireDispatchResult, WireFrameCodec,
5};
6use crate::{
7 Extension, ExtensionInterruptRequest, NativeSidecar, NativeSidecarConfig, SidecarError,
8 SidecarRequestTransport,
9};
10use secure_exec_bridge::{
11 BridgeTypes, ChmodRequest, ClockBridge, ClockRequest, CommandPermissionRequest,
12 CreateDirRequest, CreateJavascriptContextRequest, CreateWasmContextRequest, DiagnosticRecord,
13 DirectoryEntry, EnvironmentPermissionRequest, EventBridge, ExecutionBridge, ExecutionEvent,
14 ExecutionHandleRequest, FileMetadata, FilesystemBridge, FilesystemPermissionRequest,
15 FilesystemSnapshot, FlushFilesystemStateRequest, GuestContextHandle, KillExecutionRequest,
16 LifecycleEventRecord, LoadFilesystemStateRequest, LogRecord, NetworkPermissionRequest,
17 PathRequest, PermissionBridge, PermissionDecision, PersistenceBridge,
18 PollExecutionEventRequest, RandomBridge, RandomBytesRequest, ReadDirRequest, ReadFileRequest,
19 RenameRequest, ScheduleTimerRequest, ScheduledTimer, StartExecutionRequest, StartedExecution,
20 StructuredEventRecord, SymlinkRequest, TruncateRequest, WriteExecutionStdinRequest,
21 WriteFileRequest,
22};
23use std::collections::{BTreeMap, BTreeSet};
24use std::error::Error;
25use std::fmt;
26use std::fs::{self, OpenOptions};
27use std::io::{self, Read, Write};
28use std::os::unix::fs::{symlink as create_symlink, MetadataExt, PermissionsExt};
29use std::path::{Path, PathBuf};
30use std::sync::{mpsc, Arc, Mutex};
31use std::thread;
32use std::time::{Duration, Instant, SystemTime};
33use tokio::sync::mpsc::{channel, unbounded_channel, Receiver};
34use tokio::time;
35
36const EVENT_PUMP_INTERVAL: Duration = Duration::from_micros(250);
43const MAX_STDIN_FRAME_QUEUE: usize = 128;
44const MAX_EVENT_READY_QUEUE: usize = 1;
45const MAX_STDOUT_FRAME_QUEUE: usize = 128;
46
47#[cfg(test)]
48fn request_frame(
49 request_id: RequestId,
50 ownership: OwnershipScope,
51 payload: RequestPayload,
52) -> RequestFrame {
53 RequestFrame {
54 schema: wire::protocol_schema(),
55 request_id,
56 ownership,
57 payload,
58 }
59}
60
61fn response_frame(
62 request_id: RequestId,
63 ownership: OwnershipScope,
64 payload: ResponsePayload,
65) -> ResponseFrame {
66 ResponseFrame {
67 schema: wire::protocol_schema(),
68 request_id,
69 ownership,
70 payload,
71 }
72}
73
74#[cfg(test)]
75fn connection_ownership(connection_id: &str) -> OwnershipScope {
76 OwnershipScope::ConnectionOwnership(wire::ConnectionOwnership {
77 connection_id: connection_id.to_owned(),
78 })
79}
80
81fn session_ownership(connection_id: &str, session_id: &str) -> OwnershipScope {
82 OwnershipScope::SessionOwnership(wire::SessionOwnership {
83 connection_id: connection_id.to_owned(),
84 session_id: session_id.to_owned(),
85 })
86}
87
88#[cfg(test)]
89fn vm_ownership(connection_id: &str, session_id: &str, vm_id: &str) -> OwnershipScope {
90 OwnershipScope::VmOwnership(wire::VmOwnership {
91 connection_id: connection_id.to_owned(),
92 session_id: session_id.to_owned(),
93 vm_id: vm_id.to_owned(),
94 })
95}
96
97fn wire_protocol_error(error: ProtocolCodecError) -> SidecarError {
98 SidecarError::InvalidState(format!("invalid generated wire protocol frame: {error}"))
99}
100
101pub fn run() -> Result<(), Box<dyn Error>> {
102 run_with_extensions(Vec::new())
103}
104
105pub fn run_with_extensions(extensions: Vec<Box<dyn Extension>>) -> Result<(), Box<dyn Error>> {
106 tokio::runtime::Builder::new_current_thread()
107 .enable_all()
108 .build()?
109 .block_on(run_async(extensions))
110}
111
112async fn run_async(extensions: Vec<Box<dyn Extension>>) -> Result<(), Box<dyn Error>> {
113 let config = NativeSidecarConfig {
114 compile_cache_root: Some(default_compile_cache_root()),
115 ..NativeSidecarConfig::default()
116 };
117 let codec = WireFrameCodec::new(config.max_frame_bytes);
118 let mut sidecar =
119 NativeSidecar::with_config_and_extensions(LocalBridge::default(), config, extensions)?;
120 let mut active_sessions = BTreeSet::<SessionScope>::new();
121 let mut active_connections = BTreeSet::<String>::new();
122 let (stdin_tx, mut stdin_rx) =
123 channel::<Result<Option<ProtocolFrame>, String>>(MAX_STDIN_FRAME_QUEUE);
124 let (event_ready_tx, mut event_ready_rx) = channel::<()>(MAX_EVENT_READY_QUEUE);
125 let (write_tx, write_rx) = mpsc::sync_channel::<ProtocolFrame>(MAX_STDOUT_FRAME_QUEUE);
126 let (write_error_tx, mut write_error_rx) = unbounded_channel::<String>();
127 let callback_transport = Arc::new(FrameSidecarRequestTransport::new(write_tx.clone()));
128 sidecar.set_sidecar_request_transport(callback_transport.clone());
129 let mut event_pump = time::interval(EVENT_PUMP_INTERVAL);
130 let writer_codec = codec.clone();
131 let reader_codec = codec.clone();
132 let writer_error_tx = write_error_tx.clone();
133 thread::spawn(move || {
134 let mut writer = io::BufWriter::new(io::stdout());
135 while let Ok(frame) = write_rx.recv() {
136 if let Err(error) = write_frame(&writer_codec, &mut writer, &frame) {
137 let _ = writer_error_tx.send(error.to_string());
138 break;
139 }
140 }
141 });
142
143 thread::spawn({
144 let callback_transport = callback_transport.clone();
145 let read_error_tx = write_error_tx.clone();
146 move || {
147 let mut stdin = io::stdin();
148 loop {
149 let frame = match read_frame(&reader_codec, &mut stdin) {
150 Ok(Some(ProtocolFrame::SidecarResponseFrame(response))) => {
151 if callback_transport.accept_response(response.clone()) {
152 continue;
153 }
154 Ok(Some(ProtocolFrame::SidecarResponseFrame(response)))
155 }
156 Ok(Some(frame)) => Ok(Some(frame)),
157 other => other,
158 }
159 .map_err(|error: Box<dyn Error>| error.to_string());
160 let should_stop = matches!(frame, Ok(None) | Err(_));
161 match enqueue_stdin_frame(&stdin_tx, frame) {
162 Ok(()) => {}
163 Err(StdinFrameQueueError::Full(message)) => {
164 let _ = read_error_tx.send(message);
165 break;
166 }
167 Err(StdinFrameQueueError::Closed) => break,
168 }
169 if should_stop {
170 break;
171 }
172 }
173 }
174 });
175
176 flush_sidecar_requests(&mut sidecar, &write_tx)?;
177 let mut pending_frame: Option<ProtocolFrame> = None;
178
179 loop {
180 if let Some(frame) = pending_frame.take() {
181 handle_protocol_frame(
182 frame,
183 &mut sidecar,
184 &mut stdin_rx,
185 &mut pending_frame,
186 &write_tx,
187 &mut active_sessions,
188 &mut active_connections,
189 )
190 .await?;
191 continue;
192 }
193
194 tokio::select! {
195 maybe_frame = stdin_rx.recv() => {
196 let Some(frame) = maybe_frame else {
197 break;
198 };
199 let Some(frame) = frame.map_err(io::Error::other)? else {
200 break;
201 };
202 handle_protocol_frame(
203 frame,
204 &mut sidecar,
205 &mut stdin_rx,
206 &mut pending_frame,
207 &write_tx,
208 &mut active_sessions,
209 &mut active_connections,
210 ).await?;
211 }
212 maybe_ready = event_ready_rx.recv() => {
213 let Some(()) = maybe_ready else {
214 break;
215 };
216 loop {
217 let mut emitted_frame = false;
218 for session in active_sessions.iter().cloned().collect::<Vec<_>>() {
219 if let Some(frame) = sidecar
220 .poll_event_wire(&session.ownership_scope(), Duration::ZERO)
221 .await?
222 {
223 send_output_frame(&write_tx, ProtocolFrame::EventFrame(frame))?;
224 emitted_frame = true;
225 }
226 }
227
228 if !emitted_frame {
229 break;
230 }
231 }
232 flush_sidecar_requests(&mut sidecar, &write_tx)?;
233 }
234 _ = event_pump.tick() => {
235 for session in active_sessions.iter().cloned().collect::<Vec<_>>() {
236 if sidecar.pump_process_events(&session.compat_ownership_scope()).await? {
237 let _ = event_ready_tx.try_send(());
238 }
239 }
240 flush_sidecar_requests(&mut sidecar, &write_tx)?;
241 }
242 maybe_write_error = write_error_rx.recv() => {
243 if let Some(error) = maybe_write_error {
244 return Err(io::Error::new(io::ErrorKind::BrokenPipe, error).into());
245 }
246 }
247 }
248 }
249
250 cleanup_connections(&mut sidecar, &active_connections).await;
251 Ok(())
252}
253
254async fn handle_protocol_frame(
255 frame: ProtocolFrame,
256 sidecar: &mut NativeSidecar<LocalBridge>,
257 stdin_rx: &mut Receiver<Result<Option<ProtocolFrame>, String>>,
258 pending_frame: &mut Option<ProtocolFrame>,
259 write_tx: &mpsc::SyncSender<ProtocolFrame>,
260 active_sessions: &mut BTreeSet<SessionScope>,
261 active_connections: &mut BTreeSet<String>,
262) -> Result<(), Box<dyn Error>> {
263 match frame {
264 ProtocolFrame::RequestFrame(request) => {
265 let (dispatch, extra_responses) =
266 dispatch_with_prompt_interrupt(sidecar, request.clone(), stdin_rx, pending_frame)
267 .await?;
268 track_session_state(
269 &dispatch.response.payload,
270 active_sessions,
271 active_connections,
272 );
273
274 send_output_frame(write_tx, ProtocolFrame::ResponseFrame(dispatch.response))?;
275 for response in extra_responses {
276 send_output_frame(write_tx, ProtocolFrame::ResponseFrame(response))?;
277 }
278 for event in dispatch.events {
279 send_output_frame(write_tx, ProtocolFrame::EventFrame(event))?;
280 }
281 flush_sidecar_requests(sidecar, write_tx)?;
282 }
283 ProtocolFrame::SidecarResponseFrame(response) => {
284 sidecar.accept_wire_sidecar_response(response)?;
285 flush_sidecar_requests(sidecar, write_tx)?;
286 }
287 other => {
288 return Err(format!(
289 "expected request or sidecar_response frame on stdin, received {}",
290 frame_kind(&other)
291 )
292 .into());
293 }
294 }
295 Ok(())
296}
297
298async fn dispatch_with_prompt_interrupt(
299 sidecar: &mut NativeSidecar<LocalBridge>,
300 request: RequestFrame,
301 stdin_rx: &mut Receiver<Result<Option<ProtocolFrame>, String>>,
302 pending_frame: &mut Option<ProtocolFrame>,
303) -> Result<(WireDispatchResult, Vec<ResponseFrame>), Box<dyn Error>> {
304 let Some(blocking_request) = blocking_extension_request(sidecar, &request) else {
305 return Ok((sidecar.dispatch_wire(request).await?, Vec::new()));
306 };
307
308 let mut dispatch = Box::pin(sidecar.dispatch_wire(request.clone()));
309 tokio::select! {
310 result = dispatch.as_mut() => Ok((result?, Vec::new())),
311 maybe_frame = stdin_rx.recv() => {
312 let frame = decode_stdin_frame(maybe_frame)?;
313 if let Some(frame) = frame {
314 if let Some(interrupt) = extension_interrupt_response(&blocking_request, &request, &frame) {
315 drop(dispatch);
316 let mut extra_responses = Vec::new();
317 if let Some(response) = interrupt.interrupting_response {
318 extra_responses.push(response);
319 } else {
320 *pending_frame = Some(frame);
321 }
322 return Ok((interrupt.interrupted_dispatch, extra_responses));
323 }
324 *pending_frame = Some(frame);
325 }
326 Ok((dispatch.await?, Vec::new()))
327 }
328 }
329}
330
331fn decode_stdin_frame(
332 maybe_frame: Option<Result<Option<ProtocolFrame>, String>>,
333) -> Result<Option<ProtocolFrame>, Box<dyn Error>> {
334 let Some(frame) = maybe_frame else {
335 return Ok(None);
336 };
337 Ok(frame.map_err(io::Error::other)?)
338}
339
340struct BlockingExtensionRequest {
341 namespace: String,
342 payload: Vec<u8>,
343 extension: Arc<dyn Extension>,
344}
345
346struct ExtensionInterruptDispatch {
347 interrupted_dispatch: WireDispatchResult,
348 interrupting_response: Option<ResponseFrame>,
349}
350
351fn blocking_extension_request(
352 sidecar: &NativeSidecar<LocalBridge>,
353 request: &RequestFrame,
354) -> Option<BlockingExtensionRequest> {
355 let RequestPayload::ExtEnvelope(envelope) = &request.payload else {
356 return None;
357 };
358 let extension = sidecar.extensions.get(&envelope.namespace)?.clone();
359 if !extension.is_blocking_request(&envelope.payload) {
360 return None;
361 }
362 Some(BlockingExtensionRequest {
363 namespace: envelope.namespace.clone(),
364 payload: envelope.payload.clone(),
365 extension,
366 })
367}
368
369fn extension_interrupt_response(
370 blocking_request: &BlockingExtensionRequest,
371 active_request: &RequestFrame,
372 frame: &ProtocolFrame,
373) -> Option<ExtensionInterruptDispatch> {
374 match frame {
375 ProtocolFrame::RequestFrame(request) => {
376 if request.ownership != active_request.ownership {
377 return None;
378 }
379 let interrupt = match &request.payload {
380 RequestPayload::ExtEnvelope(envelope)
381 if envelope.namespace == blocking_request.namespace =>
382 {
383 blocking_request.extension.interrupt_blocking_request(
384 &blocking_request.payload,
385 ExtensionInterruptRequest::ExtensionPayload(&envelope.payload),
386 )?
387 }
388 RequestPayload::ExtEnvelope(_) => return None,
389 RequestPayload::KillProcessRequest(_) => {
390 blocking_request.extension.interrupt_blocking_request(
391 &blocking_request.payload,
392 ExtensionInterruptRequest::KillProcess,
393 )?
394 }
395 RequestPayload::AuthenticateRequest(_)
400 | RequestPayload::OpenSessionRequest(_)
401 | RequestPayload::CreateVmRequest(_)
402 | RequestPayload::DisposeVmRequest(_)
403 | RequestPayload::BootstrapRootFilesystemRequest(_)
404 | RequestPayload::ConfigureVmRequest(_)
405 | RequestPayload::RegisterHostCallbacksRequest(_)
406 | RequestPayload::CreateLayerRequest
407 | RequestPayload::SealLayerRequest(_)
408 | RequestPayload::ImportSnapshotRequest(_)
409 | RequestPayload::ExportSnapshotRequest(_)
410 | RequestPayload::CreateOverlayRequest(_)
411 | RequestPayload::GuestFilesystemCallRequest(_)
412 | RequestPayload::SnapshotRootFilesystemRequest
413 | RequestPayload::ExecuteRequest(_)
414 | RequestPayload::WriteStdinRequest(_)
415 | RequestPayload::CloseStdinRequest(_)
416 | RequestPayload::GetProcessSnapshotRequest
417 | RequestPayload::FindListenerRequest(_)
418 | RequestPayload::FindBoundUdpRequest(_)
419 | RequestPayload::VmFetchRequest(_)
420 | RequestPayload::GetSignalStateRequest(_)
421 | RequestPayload::GetZombieTimerCountRequest
422 | RequestPayload::HostFilesystemCallRequest(_)
423 | RequestPayload::PersistenceLoadRequest(_)
424 | RequestPayload::PersistenceFlushRequest(_) => return None,
425 };
426 let interrupted_dispatch = interrupted_extension_dispatch(
427 active_request,
428 &blocking_request.namespace,
429 interrupt.interrupted_response_payload,
430 );
431 let interrupting_response = interrupt.interrupting_response_payload.map(|payload| {
432 response_frame(
433 request.request_id,
434 request.ownership.clone(),
435 ResponsePayload::ExtEnvelope(ExtEnvelope {
436 namespace: blocking_request.namespace.clone(),
437 payload,
438 }),
439 )
440 });
441 Some(ExtensionInterruptDispatch {
442 interrupted_dispatch,
443 interrupting_response,
444 })
445 }
446 ProtocolFrame::ResponseFrame(_)
452 | ProtocolFrame::EventFrame(_)
453 | ProtocolFrame::SidecarRequestFrame(_)
454 | ProtocolFrame::SidecarResponseFrame(_) => None,
455 }
456}
457
458fn interrupted_extension_dispatch(
459 request: &RequestFrame,
460 namespace: &str,
461 payload: Vec<u8>,
462) -> WireDispatchResult {
463 match &request.payload {
464 RequestPayload::ExtEnvelope(_) => {
465 let response = ResponsePayload::ExtEnvelope(ExtEnvelope {
466 namespace: namespace.to_string(),
467 payload,
468 });
469 WireDispatchResult {
470 response: response_frame(request.request_id, request.ownership.clone(), response),
471 events: Vec::new(),
472 }
473 }
474 RequestPayload::AuthenticateRequest(_)
475 | RequestPayload::OpenSessionRequest(_)
476 | RequestPayload::CreateVmRequest(_)
477 | RequestPayload::DisposeVmRequest(_)
478 | RequestPayload::BootstrapRootFilesystemRequest(_)
479 | RequestPayload::ConfigureVmRequest(_)
480 | RequestPayload::RegisterHostCallbacksRequest(_)
481 | RequestPayload::CreateLayerRequest
482 | RequestPayload::SealLayerRequest(_)
483 | RequestPayload::ImportSnapshotRequest(_)
484 | RequestPayload::ExportSnapshotRequest(_)
485 | RequestPayload::CreateOverlayRequest(_)
486 | RequestPayload::GuestFilesystemCallRequest(_)
487 | RequestPayload::SnapshotRootFilesystemRequest
488 | RequestPayload::ExecuteRequest(_)
489 | RequestPayload::WriteStdinRequest(_)
490 | RequestPayload::CloseStdinRequest(_)
491 | RequestPayload::KillProcessRequest(_)
492 | RequestPayload::GetProcessSnapshotRequest
493 | RequestPayload::FindListenerRequest(_)
494 | RequestPayload::FindBoundUdpRequest(_)
495 | RequestPayload::VmFetchRequest(_)
496 | RequestPayload::GetSignalStateRequest(_)
497 | RequestPayload::GetZombieTimerCountRequest
498 | RequestPayload::HostFilesystemCallRequest(_)
499 | RequestPayload::PersistenceLoadRequest(_)
500 | RequestPayload::PersistenceFlushRequest(_) => {
501 unreachable!("interrupted extension dispatch requires an extension request");
502 }
503 }
504}
505
506async fn cleanup_connections(
507 sidecar: &mut NativeSidecar<LocalBridge>,
508 active_connections: &BTreeSet<String>,
509) {
510 for connection_id in active_connections {
511 let _ = sidecar.remove_connection(connection_id).await;
512 }
513}
514
515fn track_session_state(
516 payload: &ResponsePayload,
517 active_sessions: &mut BTreeSet<SessionScope>,
518 active_connections: &mut BTreeSet<String>,
519) {
520 match payload {
521 ResponsePayload::AuthenticatedResponse(AuthenticatedResponse { connection_id, .. }) => {
522 active_connections.insert(connection_id.clone());
523 }
524 ResponsePayload::SessionOpenedResponse(SessionOpenedResponse {
525 session_id,
526 owner_connection_id,
527 }) => {
528 active_sessions.insert(SessionScope {
529 connection_id: owner_connection_id.clone(),
530 session_id: session_id.clone(),
531 });
532 }
533 _ => {}
534 }
535}
536
537fn read_frame(
538 codec: &WireFrameCodec,
539 reader: &mut impl Read,
540) -> Result<Option<ProtocolFrame>, Box<dyn Error>> {
541 let mut prefix = [0u8; 4];
542 match reader.read_exact(&mut prefix) {
543 Ok(()) => {}
544 Err(error) if error.kind() == io::ErrorKind::UnexpectedEof => {
545 return Ok(None);
546 }
547 Err(error) => return Err(error.into()),
548 }
549
550 let declared_len = u32::from_be_bytes(prefix) as usize;
551 if declared_len > codec.max_frame_bytes() {
552 return Err(ProtocolCodecError::FrameTooLarge {
553 size: declared_len,
554 max: codec.max_frame_bytes(),
555 }
556 .into());
557 }
558 let total_len = prefix.len().saturating_add(declared_len);
559 let mut bytes = Vec::with_capacity(total_len);
560 bytes.extend_from_slice(&prefix);
561 bytes.resize(total_len, 0);
562 reader.read_exact(&mut bytes[prefix.len()..])?;
563
564 Ok(Some(codec.decode(&bytes)?))
565}
566
567fn write_frame(
568 codec: &WireFrameCodec,
569 writer: &mut impl Write,
570 frame: &ProtocolFrame,
571) -> Result<(), Box<dyn Error>> {
572 let bytes = codec.encode(frame)?;
573 writer.write_all(&bytes)?;
574 writer.flush()?;
575 Ok(())
576}
577
578fn frame_kind(frame: &ProtocolFrame) -> &'static str {
579 match frame {
580 ProtocolFrame::RequestFrame(_) => "request",
581 ProtocolFrame::ResponseFrame(_) => "response",
582 ProtocolFrame::EventFrame(_) => "event",
583 ProtocolFrame::SidecarRequestFrame(_) => "sidecar_request",
584 ProtocolFrame::SidecarResponseFrame(_) => "sidecar_response",
585 }
586}
587
588#[derive(Debug, Clone, PartialEq, Eq)]
589enum StdinFrameQueueError {
590 Full(String),
591 Closed,
592}
593
594fn enqueue_stdin_frame(
595 sender: &tokio::sync::mpsc::Sender<Result<Option<ProtocolFrame>, String>>,
596 frame: Result<Option<ProtocolFrame>, String>,
597) -> Result<(), StdinFrameQueueError> {
598 sender.try_send(frame).map_err(|error| match error {
599 tokio::sync::mpsc::error::TrySendError::Full(_) => StdinFrameQueueError::Full(format!(
600 "stdin frame queue exceeded {MAX_STDIN_FRAME_QUEUE} pending frames"
601 )),
602 tokio::sync::mpsc::error::TrySendError::Closed(_) => StdinFrameQueueError::Closed,
603 })
604}
605
606fn flush_sidecar_requests(
607 sidecar: &mut NativeSidecar<LocalBridge>,
608 writer: &mpsc::SyncSender<ProtocolFrame>,
609) -> Result<(), Box<dyn Error>> {
610 while let Some(request) = sidecar.pop_wire_sidecar_request()? {
611 send_output_frame(writer, ProtocolFrame::SidecarRequestFrame(request))?;
612 }
613 Ok(())
614}
615
616fn send_output_frame(
617 writer: &mpsc::SyncSender<ProtocolFrame>,
618 frame: ProtocolFrame,
619) -> Result<(), io::Error> {
620 writer.try_send(frame).map_err(|error| {
621 let message = match error {
622 mpsc::TrySendError::Full(_) => {
623 format!("stdout frame queue exceeded {MAX_STDOUT_FRAME_QUEUE} pending frames")
624 }
625 mpsc::TrySendError::Disconnected(_) => String::from("stdout writer disconnected"),
626 };
627 io::Error::new(io::ErrorKind::BrokenPipe, message)
628 })
629}
630
631fn default_compile_cache_root() -> PathBuf {
632 std::env::temp_dir().join("secure-exec-sidecar-compile-cache")
639}
640
641#[cfg(test)]
642mod tests {
643 use super::*;
644 use crate::wire::{AuthenticateRequest, KillProcessRequest};
645 use crate::{ExtensionContext, ExtensionFuture, ExtensionInterruptResponse, ExtensionResponse};
646 use std::io::Cursor;
647
648 const TEST_EXTENSION_NAMESPACE: &str = "dev.rivet.secure-exec.test.blocking";
649
650 #[test]
651 fn read_frame_rejects_oversized_prefix_before_allocating_payload() {
652 let codec = WireFrameCodec::new(16);
653 let mut reader = Cursor::new((32_u32).to_be_bytes().to_vec());
654
655 let error = read_frame(&codec, &mut reader).expect_err("oversized frame should fail");
656 let error = error
657 .downcast::<ProtocolCodecError>()
658 .expect("protocol codec error");
659 assert!(matches!(
660 *error,
661 ProtocolCodecError::FrameTooLarge { size: 32, max: 16 }
662 ));
663 }
664
665 #[test]
666 fn stdio_work_queues_are_bounded() {
667 let (stdin_tx, _stdin_rx) =
668 channel::<Result<Option<ProtocolFrame>, String>>(MAX_STDIN_FRAME_QUEUE);
669 for _ in 0..MAX_STDIN_FRAME_QUEUE {
670 enqueue_stdin_frame(&stdin_tx, Ok(None))
671 .expect("stdin frame queue should accept capacity");
672 }
673 assert!(matches!(
674 enqueue_stdin_frame(&stdin_tx, Ok(None)),
675 Err(StdinFrameQueueError::Full(_))
676 ));
677
678 let (event_ready_tx, _event_ready_rx) = channel::<()>(MAX_EVENT_READY_QUEUE);
679 event_ready_tx
680 .try_send(())
681 .expect("event-ready queue should accept capacity");
682 assert!(matches!(
683 event_ready_tx.try_send(()),
684 Err(tokio::sync::mpsc::error::TrySendError::Full(_))
685 ));
686
687 let (stdout_tx, _stdout_rx) = mpsc::sync_channel(MAX_STDOUT_FRAME_QUEUE);
688 for request_id in 0..MAX_STDOUT_FRAME_QUEUE {
689 send_output_frame(
690 &stdout_tx,
691 ProtocolFrame::RequestFrame(request_frame(
692 request_id as RequestId,
693 connection_ownership("conn-queue"),
694 RequestPayload::AuthenticateRequest(AuthenticateRequest {
695 client_name: String::from("queue-test"),
696 auth_token: String::from("token"),
697 protocol_version: wire::PROTOCOL_VERSION,
698 bridge_version: secure_exec_bridge::bridge_contract().version,
699 }),
700 )),
701 )
702 .expect("stdout frame queue should accept capacity");
703 }
704 let error = send_output_frame(
705 &stdout_tx,
706 ProtocolFrame::RequestFrame(request_frame(
707 MAX_STDOUT_FRAME_QUEUE as RequestId,
708 connection_ownership("conn-queue"),
709 RequestPayload::AuthenticateRequest(AuthenticateRequest {
710 client_name: String::from("queue-test"),
711 auth_token: String::from("token"),
712 protocol_version: wire::PROTOCOL_VERSION,
713 bridge_version: secure_exec_bridge::bridge_contract().version,
714 }),
715 )),
716 )
717 .expect_err("stdout frame queue should reject overflow");
718 assert!(
719 error.to_string().contains("stdout frame queue exceeded"),
720 "unexpected stdout queue error: {error}"
721 );
722 }
723
724 #[test]
725 fn read_frame_decodes_wire_authenticate_request() {
726 let codec = WireFrameCodec::new(wire::DEFAULT_MAX_FRAME_BYTES);
727 let frame = ProtocolFrame::RequestFrame(request_frame(
728 1,
729 connection_ownership("client-hint"),
730 RequestPayload::AuthenticateRequest(AuthenticateRequest {
731 client_name: "probe".to_string(),
732 auth_token: "probe-token".to_string(),
733 protocol_version: wire::PROTOCOL_VERSION,
734 bridge_version: secure_exec_bridge::bridge_contract().version,
735 }),
736 ));
737 let encoded = codec.encode(&frame).expect("encode wire frame");
738 let mut reader = Cursor::new(encoded);
739
740 let decoded = read_frame(&codec, &mut reader)
741 .expect("decode bare frame")
742 .expect("frame present");
743
744 assert_eq!(decoded, frame);
745 }
746
747 #[test]
748 fn extension_close_interrupts_matching_blocking_request() {
749 let ownership = vm_ownership("conn-1", "session-1", "vm-1");
750 let prompt = test_extension_request_frame(10, ownership.clone(), "prompt:ext-session-1");
751 let close = ProtocolFrame::RequestFrame(test_extension_request_frame(
752 11,
753 ownership,
754 "close:ext-session-1",
755 ));
756
757 let blocking_request = blocking_extension_request(&prompt);
758 let interrupt = extension_interrupt_response(&blocking_request, &prompt, &close)
759 .expect("close should interrupt prompt");
760
761 assert_eq!(interrupt.interrupted_dispatch.response.request_id, 10);
762 let ResponsePayload::ExtEnvelope(envelope) =
763 interrupt.interrupted_dispatch.response.payload
764 else {
765 panic!("expected extension response");
766 };
767 assert_eq!(envelope.namespace, TEST_EXTENSION_NAMESPACE);
768 assert_eq!(envelope.payload, b"prompt-cancelled:ext-session-1");
769 }
770
771 #[test]
772 fn extension_cancel_interrupt_gets_synthetic_response() {
773 let ownership = vm_ownership("conn-1", "session-1", "vm-1");
774 let prompt = test_extension_request_frame(10, ownership.clone(), "prompt:ext-session-1");
775 let cancel = ProtocolFrame::RequestFrame(test_extension_request_frame(
776 11,
777 ownership,
778 "cancel:ext-session-1",
779 ));
780
781 let blocking_request = blocking_extension_request(&prompt);
782 let interrupt = extension_interrupt_response(&blocking_request, &prompt, &cancel)
783 .expect("cancel should interrupt prompt");
784 let response = interrupt
785 .interrupting_response
786 .expect("cancel should get a response");
787
788 assert_eq!(response.request_id, 11);
789 let ResponsePayload::ExtEnvelope(envelope) = response.payload else {
790 panic!("expected extension response");
791 };
792 assert_eq!(envelope.namespace, TEST_EXTENSION_NAMESPACE);
793 assert_eq!(envelope.payload, b"cancelled:ext-session-1");
794 }
795
796 #[test]
797 fn kill_process_interrupts_blocking_extension_request() {
798 let ownership = vm_ownership("conn-1", "session-1", "vm-1");
799 let prompt = test_extension_request_frame(10, ownership.clone(), "prompt:ext-session-1");
800 let kill = ProtocolFrame::RequestFrame(request_frame(
801 11,
802 ownership,
803 RequestPayload::KillProcessRequest(KillProcessRequest {
804 process_id: "adapter-process".to_string(),
805 signal: "SIGTERM".to_string(),
806 }),
807 ));
808
809 let blocking_request = blocking_extension_request(&prompt);
810 let interrupt = extension_interrupt_response(&blocking_request, &prompt, &kill)
811 .expect("kill should interrupt prompt");
812
813 assert_eq!(interrupt.interrupted_dispatch.response.request_id, 10);
814 assert!(interrupt.interrupting_response.is_none());
815 }
816
817 fn test_extension_request_frame(
818 request_id: RequestId,
819 ownership: OwnershipScope,
820 payload: &str,
821 ) -> RequestFrame {
822 request_frame(
823 request_id,
824 ownership,
825 RequestPayload::ExtEnvelope(ExtEnvelope {
826 namespace: TEST_EXTENSION_NAMESPACE.to_string(),
827 payload: payload.as_bytes().to_vec(),
828 }),
829 )
830 }
831
832 fn blocking_extension_request(request: &RequestFrame) -> BlockingExtensionRequest {
833 let RequestPayload::ExtEnvelope(envelope) = &request.payload else {
834 panic!("expected extension request");
835 };
836 BlockingExtensionRequest {
837 namespace: TEST_EXTENSION_NAMESPACE.to_string(),
838 payload: envelope.payload.clone(),
839 extension: Arc::new(TestBlockingInterruptExtension),
840 }
841 }
842
843 struct TestBlockingInterruptExtension;
844
845 impl Extension for TestBlockingInterruptExtension {
846 fn namespace(&self) -> &str {
847 TEST_EXTENSION_NAMESPACE
848 }
849
850 fn handle_request<'a>(
851 &'a self,
852 _ctx: ExtensionContext<'a>,
853 _payload: Vec<u8>,
854 ) -> ExtensionFuture<'a, ExtensionResponse> {
855 Box::pin(async { Ok(ExtensionResponse::new(Vec::new())) })
856 }
857
858 fn is_blocking_request(&self, payload: &[u8]) -> bool {
859 parse_test_payload(payload).is_some_and(|(kind, _session_id)| kind == "prompt")
860 }
861
862 fn interrupt_blocking_request(
863 &self,
864 blocking_payload: &[u8],
865 interrupt: ExtensionInterruptRequest<'_>,
866 ) -> Option<ExtensionInterruptResponse> {
867 let (blocking_kind, blocking_session_id) = parse_test_payload(blocking_payload)?;
868 if blocking_kind != "prompt" {
869 return None;
870 }
871
872 let interrupted_response_payload =
873 encode_test_response("prompt-cancelled", blocking_session_id);
874 match interrupt {
875 ExtensionInterruptRequest::KillProcess => Some(ExtensionInterruptResponse {
876 interrupted_response_payload,
877 interrupting_response_payload: None,
878 }),
879 ExtensionInterruptRequest::ExtensionPayload(payload) => {
880 let (interrupt_kind, interrupt_session_id) = parse_test_payload(payload)?;
881 match interrupt_kind {
882 "close" if interrupt_session_id == blocking_session_id => {
883 Some(ExtensionInterruptResponse {
884 interrupted_response_payload,
885 interrupting_response_payload: None,
886 })
887 }
888 "cancel" if interrupt_session_id == blocking_session_id => {
889 Some(ExtensionInterruptResponse {
890 interrupted_response_payload,
891 interrupting_response_payload: Some(encode_test_response(
892 "cancelled",
893 interrupt_session_id,
894 )),
895 })
896 }
897 "prompt" | "close" | "cancel" => None,
898 _ => None,
899 }
900 }
901 }
902 }
903 }
904
905 fn parse_test_payload(payload: &[u8]) -> Option<(&str, &str)> {
906 let payload = std::str::from_utf8(payload).ok()?;
907 payload.split_once(':')
908 }
909
910 fn encode_test_response(kind: &str, session_id: &str) -> Vec<u8> {
911 format!("{kind}:{session_id}").into_bytes()
912 }
913}
914
915#[derive(Debug, Clone)]
916struct LocalBridge {
917 started_at: Instant,
918 next_timer_id: usize,
919 snapshots: BTreeMap<String, FilesystemSnapshot>,
920}
921
922impl Default for LocalBridge {
923 fn default() -> Self {
924 Self {
925 started_at: Instant::now(),
926 next_timer_id: 0,
927 snapshots: BTreeMap::new(),
928 }
929 }
930}
931
932impl BridgeTypes for LocalBridge {
933 type Error = LocalBridgeError;
934}
935
936impl FilesystemBridge for LocalBridge {
937 fn read_file(&mut self, request: ReadFileRequest) -> Result<Vec<u8>, Self::Error> {
938 fs::read(Self::host_path(&request.path))
939 .map_err(|error| LocalBridgeError::io("read", &request.path, error))
940 }
941
942 fn write_file(&mut self, request: WriteFileRequest) -> Result<(), Self::Error> {
943 let host_path = Self::host_path(&request.path);
944 if let Some(parent) = host_path.parent() {
945 fs::create_dir_all(parent)
946 .map_err(|error| LocalBridgeError::io("mkdir", &request.path, error))?;
947 }
948 fs::write(host_path, request.contents)
949 .map_err(|error| LocalBridgeError::io("write", &request.path, error))
950 }
951
952 fn stat(&mut self, request: PathRequest) -> Result<FileMetadata, Self::Error> {
953 fs::metadata(Self::host_path(&request.path))
954 .map(Self::file_metadata)
955 .map_err(|error| LocalBridgeError::io("stat", &request.path, error))
956 }
957
958 fn lstat(&mut self, request: PathRequest) -> Result<FileMetadata, Self::Error> {
959 fs::symlink_metadata(Self::host_path(&request.path))
960 .map(Self::file_metadata)
961 .map_err(|error| LocalBridgeError::io("lstat", &request.path, error))
962 }
963
964 fn read_dir(&mut self, request: ReadDirRequest) -> Result<Vec<DirectoryEntry>, Self::Error> {
965 let mut entries = fs::read_dir(Self::host_path(&request.path))
966 .map_err(|error| LocalBridgeError::io("readdir", &request.path, error))?
967 .map(|entry| {
968 let entry =
969 entry.map_err(|error| LocalBridgeError::io("readdir", &request.path, error))?;
970 let kind = entry
971 .file_type()
972 .map(Self::file_kind)
973 .map_err(|error| LocalBridgeError::io("readdir", &request.path, error))?;
974 Ok(DirectoryEntry {
975 name: entry.file_name().to_string_lossy().into_owned(),
976 kind,
977 })
978 })
979 .collect::<Result<Vec<_>, LocalBridgeError>>()?;
980 entries.sort_by(|left, right| left.name.cmp(&right.name));
981 Ok(entries)
982 }
983
984 fn create_dir(&mut self, request: CreateDirRequest) -> Result<(), Self::Error> {
985 let host_path = Self::host_path(&request.path);
986 if request.recursive {
987 fs::create_dir_all(host_path)
988 } else {
989 fs::create_dir(host_path)
990 }
991 .map_err(|error| LocalBridgeError::io("mkdir", &request.path, error))
992 }
993
994 fn remove_file(&mut self, request: PathRequest) -> Result<(), Self::Error> {
995 fs::remove_file(Self::host_path(&request.path))
996 .map_err(|error| LocalBridgeError::io("unlink", &request.path, error))
997 }
998
999 fn remove_dir(&mut self, request: PathRequest) -> Result<(), Self::Error> {
1000 fs::remove_dir(Self::host_path(&request.path))
1001 .map_err(|error| LocalBridgeError::io("rmdir", &request.path, error))
1002 }
1003
1004 fn rename(&mut self, request: RenameRequest) -> Result<(), Self::Error> {
1005 let from_path = Self::host_path(&request.from_path);
1006 let to_path = Self::host_path(&request.to_path);
1007 if let Some(parent) = to_path.parent() {
1008 fs::create_dir_all(parent)
1009 .map_err(|error| LocalBridgeError::io("mkdir", &request.to_path, error))?;
1010 }
1011 fs::rename(from_path, to_path).map_err(|error| {
1012 LocalBridgeError::unsupported(format!(
1013 "rename {} -> {}: {}",
1014 request.from_path, request.to_path, error
1015 ))
1016 })
1017 }
1018
1019 fn symlink(&mut self, request: SymlinkRequest) -> Result<(), Self::Error> {
1020 let link_path = Self::host_path(&request.link_path);
1021 if let Some(parent) = link_path.parent() {
1022 fs::create_dir_all(parent)
1023 .map_err(|error| LocalBridgeError::io("mkdir", &request.link_path, error))?;
1024 }
1025 create_symlink(&request.target_path, link_path)
1026 .map_err(|error| LocalBridgeError::io("symlink", &request.link_path, error))
1027 }
1028
1029 fn read_link(&mut self, request: PathRequest) -> Result<String, Self::Error> {
1030 fs::read_link(Self::host_path(&request.path))
1031 .map(|target| target.to_string_lossy().into_owned())
1032 .map_err(|error| LocalBridgeError::io("readlink", &request.path, error))
1033 }
1034
1035 fn chmod(&mut self, request: ChmodRequest) -> Result<(), Self::Error> {
1036 let permissions = fs::Permissions::from_mode(request.mode);
1037 fs::set_permissions(Self::host_path(&request.path), permissions)
1038 .map_err(|error| LocalBridgeError::io("chmod", &request.path, error))
1039 }
1040
1041 fn truncate(&mut self, request: TruncateRequest) -> Result<(), Self::Error> {
1042 OpenOptions::new()
1043 .write(true)
1044 .create(false)
1045 .open(Self::host_path(&request.path))
1046 .and_then(|file| file.set_len(request.len))
1047 .map_err(|error| LocalBridgeError::io("truncate", &request.path, error))
1048 }
1049
1050 fn exists(&mut self, request: PathRequest) -> Result<bool, Self::Error> {
1051 Ok(fs::symlink_metadata(Self::host_path(&request.path)).is_ok())
1052 }
1053}
1054
1055impl PermissionBridge for LocalBridge {
1056 fn check_filesystem_access(
1057 &mut self,
1058 request: FilesystemPermissionRequest,
1059 ) -> Result<PermissionDecision, Self::Error> {
1060 Ok(PermissionDecision::deny(format!(
1061 "no static filesystem policy registered for {}:{}",
1062 request.vm_id, request.path
1063 )))
1064 }
1065
1066 fn check_network_access(
1067 &mut self,
1068 request: NetworkPermissionRequest,
1069 ) -> Result<PermissionDecision, Self::Error> {
1070 Ok(PermissionDecision::deny(format!(
1071 "no static network policy registered for {}:{}",
1072 request.vm_id, request.resource
1073 )))
1074 }
1075
1076 fn check_command_execution(
1077 &mut self,
1078 request: CommandPermissionRequest,
1079 ) -> Result<PermissionDecision, Self::Error> {
1080 Ok(PermissionDecision::deny(format!(
1081 "no static child_process policy registered for {}:{}",
1082 request.vm_id, request.command
1083 )))
1084 }
1085
1086 fn check_environment_access(
1087 &mut self,
1088 request: EnvironmentPermissionRequest,
1089 ) -> Result<PermissionDecision, Self::Error> {
1090 Ok(PermissionDecision::deny(format!(
1091 "no static env policy registered for {}:{}",
1092 request.vm_id, request.key
1093 )))
1094 }
1095}
1096
1097impl PersistenceBridge for LocalBridge {
1098 fn load_filesystem_state(
1099 &mut self,
1100 request: LoadFilesystemStateRequest,
1101 ) -> Result<Option<FilesystemSnapshot>, Self::Error> {
1102 Ok(self.snapshots.get(&request.vm_id).cloned())
1103 }
1104
1105 fn flush_filesystem_state(
1106 &mut self,
1107 request: FlushFilesystemStateRequest,
1108 ) -> Result<(), Self::Error> {
1109 self.snapshots.insert(request.vm_id, request.snapshot);
1110 Ok(())
1111 }
1112}
1113
1114impl ClockBridge for LocalBridge {
1115 fn wall_clock(&mut self, _request: ClockRequest) -> Result<SystemTime, Self::Error> {
1116 Ok(SystemTime::now())
1117 }
1118
1119 fn monotonic_clock(&mut self, _request: ClockRequest) -> Result<Duration, Self::Error> {
1120 Ok(self.started_at.elapsed())
1121 }
1122
1123 fn schedule_timer(
1124 &mut self,
1125 request: ScheduleTimerRequest,
1126 ) -> Result<ScheduledTimer, Self::Error> {
1127 self.next_timer_id += 1;
1128 Ok(ScheduledTimer {
1129 timer_id: format!("timer-{}", self.next_timer_id),
1130 delay: request.delay,
1131 })
1132 }
1133}
1134
1135impl RandomBridge for LocalBridge {
1136 fn fill_random_bytes(&mut self, request: RandomBytesRequest) -> Result<Vec<u8>, Self::Error> {
1137 Ok(vec![0u8; request.len])
1138 }
1139}
1140
1141impl EventBridge for LocalBridge {
1142 fn emit_structured_event(&mut self, _event: StructuredEventRecord) -> Result<(), Self::Error> {
1143 Ok(())
1144 }
1145
1146 fn emit_diagnostic(&mut self, _event: DiagnosticRecord) -> Result<(), Self::Error> {
1147 Ok(())
1148 }
1149
1150 fn emit_log(&mut self, _event: LogRecord) -> Result<(), Self::Error> {
1151 Ok(())
1152 }
1153
1154 fn emit_lifecycle(&mut self, _event: LifecycleEventRecord) -> Result<(), Self::Error> {
1155 Ok(())
1156 }
1157}
1158
1159impl ExecutionBridge for LocalBridge {
1160 fn create_javascript_context(
1161 &mut self,
1162 _request: CreateJavascriptContextRequest,
1163 ) -> Result<GuestContextHandle, Self::Error> {
1164 Err(LocalBridgeError::unsupported(
1165 "execution bridge is handled internally by the native sidecar",
1166 ))
1167 }
1168
1169 fn create_wasm_context(
1170 &mut self,
1171 _request: CreateWasmContextRequest,
1172 ) -> Result<GuestContextHandle, Self::Error> {
1173 Err(LocalBridgeError::unsupported(
1174 "execution bridge is handled internally by the native sidecar",
1175 ))
1176 }
1177
1178 fn start_execution(
1179 &mut self,
1180 _request: StartExecutionRequest,
1181 ) -> Result<StartedExecution, Self::Error> {
1182 Err(LocalBridgeError::unsupported(
1183 "execution bridge is handled internally by the native sidecar",
1184 ))
1185 }
1186
1187 fn write_stdin(&mut self, _request: WriteExecutionStdinRequest) -> Result<(), Self::Error> {
1188 Err(LocalBridgeError::unsupported(
1189 "execution bridge is handled internally by the native sidecar",
1190 ))
1191 }
1192
1193 fn close_stdin(&mut self, _request: ExecutionHandleRequest) -> Result<(), Self::Error> {
1194 Err(LocalBridgeError::unsupported(
1195 "execution bridge is handled internally by the native sidecar",
1196 ))
1197 }
1198
1199 fn kill_execution(&mut self, _request: KillExecutionRequest) -> Result<(), Self::Error> {
1200 Err(LocalBridgeError::unsupported(
1201 "execution bridge is handled internally by the native sidecar",
1202 ))
1203 }
1204
1205 fn poll_execution_event(
1206 &mut self,
1207 _request: PollExecutionEventRequest,
1208 ) -> Result<Option<ExecutionEvent>, Self::Error> {
1209 Ok(None)
1210 }
1211}
1212
1213#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
1214struct SessionScope {
1215 connection_id: String,
1216 session_id: String,
1217}
1218
1219impl SessionScope {
1220 fn ownership_scope(&self) -> OwnershipScope {
1221 session_ownership(&self.connection_id, &self.session_id)
1222 }
1223
1224 fn compat_ownership_scope(&self) -> crate::protocol::OwnershipScope {
1225 wire::ownership_scope_to_compat(self.ownership_scope())
1226 }
1227}
1228
1229struct FrameSidecarRequestTransport {
1230 writer: mpsc::SyncSender<ProtocolFrame>,
1231 pending: Arc<Mutex<BTreeMap<RequestId, mpsc::SyncSender<SidecarResponseFrame>>>>,
1232}
1233
1234impl FrameSidecarRequestTransport {
1235 fn new(writer: mpsc::SyncSender<ProtocolFrame>) -> Self {
1236 Self {
1237 writer,
1238 pending: Arc::new(Mutex::new(BTreeMap::new())),
1239 }
1240 }
1241
1242 fn accept_response(&self, response: SidecarResponseFrame) -> bool {
1243 let sender = {
1244 let mut pending = match self.pending.lock() {
1245 Ok(pending) => pending,
1246 Err(_) => return false,
1247 };
1248 pending.remove(&response.request_id)
1249 };
1250 let Some(sender) = sender else {
1251 return false;
1252 };
1253 let _ = sender.send(response);
1254 true
1255 }
1256}
1257
1258impl SidecarRequestTransport for FrameSidecarRequestTransport {
1259 fn send_request(
1260 &self,
1261 request: crate::protocol::SidecarRequestFrame,
1262 timeout: Duration,
1263 ) -> Result<crate::protocol::SidecarResponseFrame, SidecarError> {
1264 let request =
1265 wire::sidecar_request_frame_from_compat(request).map_err(wire_protocol_error)?;
1266 let (sender, receiver) = mpsc::sync_channel(1);
1267 self.pending
1268 .lock()
1269 .map_err(|_| {
1270 SidecarError::Bridge(String::from("sidecar callback waiter map lock poisoned"))
1271 })?
1272 .insert(request.request_id, sender);
1273 if let Err(error) = send_output_frame(
1274 &self.writer,
1275 ProtocolFrame::SidecarRequestFrame(request.clone()),
1276 ) {
1277 let _ = self
1278 .pending
1279 .lock()
1280 .map(|mut pending| pending.remove(&request.request_id));
1281 return Err(SidecarError::Io(format!(
1282 "failed to write sidecar request frame: {error}"
1283 )));
1284 }
1285 match receiver.recv_timeout(timeout) {
1286 Ok(response) => {
1287 wire::sidecar_response_frame_to_compat(response).map_err(wire_protocol_error)
1288 }
1289 Err(mpsc::RecvTimeoutError::Timeout) => {
1290 let _ = self
1291 .pending
1292 .lock()
1293 .map(|mut pending| pending.remove(&request.request_id));
1294 Err(SidecarError::Io(format!(
1295 "timed out waiting for sidecar response after {}s",
1296 timeout.as_secs()
1297 )))
1298 }
1299 Err(mpsc::RecvTimeoutError::Disconnected) => Err(SidecarError::Io(String::from(
1300 "sidecar response waiter disconnected",
1301 ))),
1302 }
1303 }
1304}
1305
1306#[derive(Debug, Clone, PartialEq, Eq)]
1307struct LocalBridgeError {
1308 message: String,
1309}
1310
1311impl LocalBridgeError {
1312 fn unsupported(message: impl Into<String>) -> Self {
1313 Self {
1314 message: message.into(),
1315 }
1316 }
1317
1318 fn io(operation: &str, path: &str, error: io::Error) -> Self {
1319 Self::unsupported(format!("{operation} {path}: {error}"))
1320 }
1321}
1322
1323impl fmt::Display for LocalBridgeError {
1324 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1325 f.write_str(&self.message)
1326 }
1327}
1328
1329impl Error for LocalBridgeError {}
1330
1331impl LocalBridge {
1332 fn host_path(path: &str) -> PathBuf {
1333 let candidate = Path::new(path);
1334 if candidate.is_absolute() {
1335 candidate.to_path_buf()
1336 } else {
1337 std::env::current_dir()
1338 .unwrap_or_else(|_| PathBuf::from("."))
1339 .join(candidate)
1340 }
1341 }
1342
1343 fn file_metadata(metadata: fs::Metadata) -> FileMetadata {
1344 FileMetadata {
1345 mode: metadata.permissions().mode(),
1346 size: metadata.size(),
1347 kind: Self::file_kind(metadata.file_type()),
1348 }
1349 }
1350
1351 fn file_kind(file_type: fs::FileType) -> secure_exec_bridge::FileKind {
1352 if file_type.is_file() {
1353 secure_exec_bridge::FileKind::File
1354 } else if file_type.is_dir() {
1355 secure_exec_bridge::FileKind::Directory
1356 } else if file_type.is_symlink() {
1357 secure_exec_bridge::FileKind::SymbolicLink
1358 } else {
1359 secure_exec_bridge::FileKind::Other
1360 }
1361 }
1362}