1use crate::bridge::{build_mount_plugin_registry, MountPluginContext};
2pub(crate) use crate::execution::{
3 build_javascript_socket_path_context, canonical_signal_name, dispatch_loopback_http_request,
4 error_code, format_tcp_resource, ignore_stale_javascript_sync_rpc_response,
5 javascript_sync_rpc_arg_i32, javascript_sync_rpc_arg_str, javascript_sync_rpc_arg_u32,
6 javascript_sync_rpc_arg_u32_optional, javascript_sync_rpc_arg_u64,
7 javascript_sync_rpc_arg_u64_optional, javascript_sync_rpc_bytes_arg,
8 javascript_sync_rpc_bytes_value, javascript_sync_rpc_encoding, javascript_sync_rpc_error_code,
9 javascript_sync_rpc_option_bool, javascript_sync_rpc_option_u32, parse_signal,
10 sanitize_javascript_child_process_internal_bootstrap_env, service_javascript_sync_rpc,
11 vm_network_resource_counts, JavascriptSyncRpcServiceRequest, LoopbackHttpDispatchRequest,
12};
13use crate::extension::{
14 Extension, ExtensionBufferedProcessOutput, ExtensionContext, ExtensionFuture, ExtensionHost,
15 ExtensionSnapshot,
16};
17use crate::filesystem::guest_filesystem_call as filesystem_guest_filesystem_call;
18use crate::limits::DEFAULT_ACP_STDOUT_BUFFER_BYTE_LIMIT;
19use crate::protocol::{
20 AuthenticatedResponse, CloseStdinRequest, DisposeReason, EventFrame, EventPayload,
21 ExecuteRequest, ExtEnvelope, FsPermissionScope, GuestFilesystemCallRequest,
22 GuestFilesystemResultResponse, JavascriptChildProcessSpawnOptions,
23 JavascriptChildProcessSpawnRequest, KillProcessRequest, OpenSessionRequest, OwnershipScope,
24 PatternPermissionRule, PatternPermissionScope, PermissionMode, PermissionsPolicy,
25 ProcessKilledResponse, ProcessStartedResponse, ProtocolSchema, RejectedResponse, RequestFrame,
26 RequestId, RequestPayload, ResponseFrame, ResponsePayload, SessionOpenedResponse,
27 SidecarRequestFrame, SidecarRequestPayload, SidecarResponseFrame, SidecarResponsePayload,
28 SidecarResponseTracker, SidecarResponseTrackerError, SignalDispositionAction,
29 SignalHandlerRegistration, StdinClosedResponse, StdinWrittenResponse, VmLifecycleEvent,
30 VmLifecycleState, WriteStdinRequest,
31};
32use crate::state::{
33 ActiveExecutionEvent, BridgeError, ConnectionState, JavascriptSocketFamily,
34 JavascriptSocketPathContext, ProcessEventEnvelope, SessionState, SharedBridge,
35 SharedSidecarRequestClient, SidecarRequestTransport, VmState, EXECUTION_DRIVER_NAME,
36};
37use crate::tools::register_host_callbacks;
38use crate::NativeSidecarBridge;
39use secure_exec_bridge::{
40 CommandPermissionRequest, EnvironmentAccess, EnvironmentPermissionRequest, FilesystemAccess,
41 FilesystemPermissionRequest, LifecycleEventRecord, LifecycleState, LogLevel, LogRecord,
42 NetworkAccess, NetworkPermissionRequest, StructuredEventRecord,
43};
44use secure_exec_execution::{
45 JavascriptExecutionEngine, JavascriptExecutionError, JavascriptSyncRpcRequest,
46 PythonExecutionEngine, PythonExecutionError, WasmExecutionEngine, WasmExecutionError,
47};
48use secure_exec_kernel::kernel::KernelError;
49use secure_exec_kernel::mount_plugin::{FileSystemPluginRegistry, PluginError};
50use secure_exec_kernel::permissions::{
51 permission_glob_matches, CommandAccessRequest, EnvAccessRequest, EnvironmentOperation,
52 NetworkAccessRequest, NetworkOperation, PermissionDecision,
53};
54use secure_exec_kernel::vfs::VfsError;
56use serde::Deserialize;
57use serde_json::{json, Value};
58use std::collections::{BTreeMap, BTreeSet, VecDeque};
59use std::fmt;
60use std::fs;
61use std::os::unix::fs::PermissionsExt;
62use std::path::{Component, Path, PathBuf};
63use std::sync::{Arc, Mutex};
64use std::task::{Context, Poll, Waker};
65use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
66use tokio::sync::mpsc::{channel, Receiver, Sender};
67use tokio::time;
68
69const INTERNAL_JAVASCRIPT_ENTRYPOINT_ENV_KEYS: &[&str] =
72 &["AGENTOS_ENTRYPOINT", "AGENTOS_BOOTSTRAP_MODULE"];
73const INTERNAL_WASM_ENTRYPOINT_ENV_KEYS: &[&str] =
74 &["AGENTOS_WASM_MODULE_PATH", "AGENTOS_WASM_MODULE_BASE64"];
75const INTERNAL_PYTHON_ENTRYPOINT_ENV_PREFIXES: &[&str] = &["AGENTOS_PYTHON_"];
76pub(crate) const MAX_PROCESS_EVENT_QUEUE: usize = 10_000;
77pub(crate) const MAX_PENDING_SIDECAR_RESPONSES: usize = 10_000;
78pub(crate) const MAX_OUTBOUND_SIDECAR_REQUESTS: usize = 10_000;
79pub(crate) const MAX_COMPLETED_SIDECAR_RESPONSES: usize = 10_000;
80
81pub(crate) fn process_event_queue_overflow_error() -> SidecarError {
82 SidecarError::InvalidState(format!(
83 "process event queue exceeded {MAX_PROCESS_EVENT_QUEUE} pending events"
84 ))
85}
86
87fn sidecar_response_pending_overflow_error() -> SidecarError {
88 SidecarError::InvalidState(format!(
89 "sidecar response tracker exceeded {MAX_PENDING_SIDECAR_RESPONSES} pending responses"
90 ))
91}
92
93fn outbound_sidecar_request_queue_overflow_error() -> SidecarError {
94 SidecarError::InvalidState(format!(
95 "outbound sidecar request queue exceeded {MAX_OUTBOUND_SIDECAR_REQUESTS} pending requests"
96 ))
97}
98
99fn wire_protocol_error(error: crate::wire::ProtocolCodecError) -> SidecarError {
100 SidecarError::InvalidState(format!("invalid generated wire protocol frame: {error}"))
101}
102
103pub use crate::state::{DispatchResult, NativeSidecarConfig, SidecarError};
105
106#[derive(Debug, Default, Deserialize)]
109struct LegacyJavascriptChildProcessSpawnOptions {
110 #[serde(default)]
111 cwd: Option<String>,
112 #[serde(default)]
113 env: BTreeMap<String, String>,
114 #[serde(default)]
115 input: Option<Value>,
116 #[serde(default)]
117 shell: bool,
118 #[serde(default)]
119 detached: bool,
120 #[serde(default)]
121 stdio: Vec<String>,
122 #[serde(default, rename = "maxBuffer")]
123 max_buffer: Option<usize>,
124 #[serde(default)]
125 timeout: Option<u64>,
126 #[serde(default, rename = "killSignal")]
127 kill_signal: Option<String>,
128}
129
130#[derive(Debug, Deserialize)]
131#[serde(rename_all = "camelCase")]
132struct JavascriptHttpLoopbackRequest {
133 process_id: String,
134 server_id: u64,
135 host: String,
136 port: u16,
137 request: String,
138}
139
140fn is_javascript_loopback_host(host: &str) -> bool {
141 host == "127.0.0.1" || host == "::1" || host.eq_ignore_ascii_case("localhost")
142}
143
144pub(crate) fn parse_javascript_child_process_spawn_request(
145 vm: &VmState,
146 args: &[Value],
147) -> Result<(JavascriptChildProcessSpawnRequest, Option<usize>), SidecarError> {
148 if let Some(value) = args.first().cloned() {
149 if let Ok(request) = serde_json::from_value::<JavascriptChildProcessSpawnRequest>(value) {
150 return Ok((request, None));
151 }
152 }
153
154 let command = javascript_sync_rpc_arg_str(args, 0, "child_process.spawn command")?.to_owned();
155 let raw_args = javascript_sync_rpc_arg_str(args, 1, "child_process.spawn args")?;
156 let raw_options = javascript_sync_rpc_arg_str(args, 2, "child_process.spawn options")?;
157
158 let parsed_args = serde_json::from_str::<Vec<String>>(raw_args).map_err(|error| {
159 SidecarError::InvalidState(format!("invalid child_process.spawn args payload: {error}"))
160 })?;
161 let parsed_options = serde_json::from_str::<LegacyJavascriptChildProcessSpawnOptions>(
162 raw_options,
163 )
164 .map_err(|error| {
165 SidecarError::InvalidState(format!(
166 "invalid child_process.spawn options payload: {error}"
167 ))
168 })?;
169
170 Ok((
171 JavascriptChildProcessSpawnRequest {
172 command,
173 args: parsed_args,
174 options: JavascriptChildProcessSpawnOptions {
175 cwd: parsed_options.cwd,
176 env: parsed_options.env,
177 internal_bootstrap_env: sanitize_javascript_child_process_internal_bootstrap_env(
178 &vm.guest_env,
179 ),
180 input: parsed_options.input,
181 shell: parsed_options.shell,
182 detached: parsed_options.detached,
183 stdio: parsed_options.stdio,
184 timeout: parsed_options.timeout,
185 kill_signal: parsed_options.kill_signal,
186 },
187 },
188 parsed_options.max_buffer,
189 ))
190}
191
192impl<B> SharedBridge<B> {
193 fn new(bridge: B) -> Self {
194 Self {
195 inner: Arc::new(Mutex::new(bridge)),
196 permissions: Arc::new(Mutex::new(BTreeMap::new())),
197 #[cfg(test)]
198 set_vm_permissions_outcomes: Arc::new(Mutex::new(VecDeque::new())),
199 }
200 }
201}
202
203impl<B> SharedBridge<B>
204where
205 B: NativeSidecarBridge + Send + 'static,
206 BridgeError<B>: fmt::Debug + Send + Sync + 'static,
207{
208 pub(crate) fn with_mut<T>(
209 &self,
210 operation: impl FnOnce(&mut B) -> Result<T, BridgeError<B>>,
211 ) -> Result<T, SidecarError> {
212 let mut bridge = self.inner.lock().map_err(|_| {
213 SidecarError::Bridge(String::from("native sidecar bridge lock poisoned"))
214 })?;
215 operation(&mut bridge).map_err(|error| SidecarError::Bridge(format!("{error:?}")))
216 }
217
218 fn inspect<T>(&self, operation: impl FnOnce(&mut B) -> T) -> Result<T, SidecarError> {
219 let mut bridge = self.inner.lock().map_err(|_| {
220 SidecarError::Bridge(String::from("native sidecar bridge lock poisoned"))
221 })?;
222 Ok(operation(&mut bridge))
223 }
224
225 #[cfg(test)]
226 #[allow(dead_code)]
227 pub(crate) fn queue_set_vm_permissions_result(
228 &self,
229 result: Result<(), SidecarError>,
230 ) -> Result<(), SidecarError> {
231 let mut outcomes = self.set_vm_permissions_outcomes.lock().map_err(|_| {
232 SidecarError::Bridge(String::from(
233 "native sidecar test set_vm_permissions outcome lock poisoned",
234 ))
235 })?;
236 outcomes.push_back(result.err());
237 Ok(())
238 }
239
240 pub(crate) fn emit_lifecycle(
241 &self,
242 vm_id: &str,
243 state: LifecycleState,
244 ) -> Result<(), SidecarError> {
245 self.with_mut(|bridge| {
246 bridge.emit_lifecycle(LifecycleEventRecord {
247 vm_id: vm_id.to_owned(),
248 state,
249 detail: None,
250 })
251 })
252 }
253
254 pub(crate) fn emit_log(
255 &self,
256 vm_id: &str,
257 message: impl Into<String>,
258 ) -> Result<(), SidecarError> {
259 self.with_mut(|bridge| {
260 bridge.emit_log(LogRecord {
261 vm_id: vm_id.to_owned(),
262 level: LogLevel::Info,
263 message: message.into(),
264 })
265 })
266 }
267
268 pub(crate) fn filesystem_decision(
269 &self,
270 vm_id: &str,
271 path: &str,
272 access: FilesystemAccess,
273 ) -> PermissionDecision {
274 if let Some(decision) = self.static_permission_decision(
275 vm_id,
276 filesystem_permission_capability(access),
277 "fs",
278 Some(path),
279 ) {
280 return decision;
281 }
282 match self.with_mut(|bridge| {
283 bridge.check_filesystem_access(FilesystemPermissionRequest {
284 vm_id: vm_id.to_owned(),
285 path: path.to_owned(),
286 access,
287 })
288 }) {
289 Ok(decision) => map_bridge_permission(decision),
290 Err(error) => PermissionDecision::deny(error.to_string()),
291 }
292 }
293
294 pub(crate) fn command_decision(
295 &self,
296 vm_id: &str,
297 request: &CommandAccessRequest,
298 ) -> PermissionDecision {
299 if is_internal_runtime_command_request(request) {
300 return PermissionDecision::allow();
301 }
302 if let Some(decision) = self.static_permission_decision(
303 vm_id,
304 "child_process.spawn",
305 "child_process",
306 Some(&request.command),
307 ) {
308 return decision;
309 }
310 match self.with_mut(|bridge| {
311 bridge.check_command_execution(CommandPermissionRequest {
312 vm_id: vm_id.to_owned(),
313 command: request.command.clone(),
314 args: request.args.clone(),
315 cwd: request.cwd.clone(),
316 env: request.env.clone(),
317 })
318 }) {
319 Ok(decision) => map_bridge_permission(decision),
320 Err(error) => PermissionDecision::deny(error.to_string()),
321 }
322 }
323
324 pub(crate) fn environment_decision(
325 &self,
326 vm_id: &str,
327 request: &EnvAccessRequest,
328 ) -> PermissionDecision {
329 if let Some(decision) = self.static_permission_decision(
330 vm_id,
331 environment_permission_capability(request.op),
332 "env",
333 Some(&request.key),
334 ) {
335 return decision;
336 }
337 match self.with_mut(|bridge| {
338 bridge.check_environment_access(EnvironmentPermissionRequest {
339 vm_id: vm_id.to_owned(),
340 access: match request.op {
341 EnvironmentOperation::Read => EnvironmentAccess::Read,
342 EnvironmentOperation::Write => EnvironmentAccess::Write,
343 },
344 key: request.key.clone(),
345 value: request.value.clone(),
346 })
347 }) {
348 Ok(decision) => map_bridge_permission(decision),
349 Err(error) => PermissionDecision::deny(error.to_string()),
350 }
351 }
352
353 pub(crate) fn network_decision(
354 &self,
355 vm_id: &str,
356 request: &NetworkAccessRequest,
357 ) -> PermissionDecision {
358 if let Some(decision) = self.static_permission_decision(
359 vm_id,
360 network_permission_capability(request.op),
361 "network",
362 Some(&request.resource),
363 ) {
364 return decision;
365 }
366 match self.with_mut(|bridge| {
367 bridge.check_network_access(NetworkPermissionRequest {
368 vm_id: vm_id.to_owned(),
369 access: match request.op {
370 NetworkOperation::Fetch => NetworkAccess::Fetch,
371 NetworkOperation::Http => NetworkAccess::Http,
372 NetworkOperation::Dns => NetworkAccess::Dns,
373 NetworkOperation::Listen => NetworkAccess::Listen,
374 },
375 resource: request.resource.clone(),
376 })
377 }) {
378 Ok(decision) => map_bridge_permission(decision),
379 Err(error) => PermissionDecision::deny(error.to_string()),
380 }
381 }
382
383 pub(crate) fn require_network_access(
384 &self,
385 vm_id: &str,
386 op: NetworkOperation,
387 resource: impl Into<String>,
388 ) -> Result<(), SidecarError> {
389 let resource = resource.into();
390 let decision = self.network_decision(
391 vm_id,
392 &NetworkAccessRequest {
393 vm_id: vm_id.to_owned(),
394 op,
395 resource: resource.clone(),
396 },
397 );
398 if decision.allow {
399 return Ok(());
400 }
401
402 let message = match decision.reason.as_deref() {
403 Some(reason) => format!("EACCES: permission denied, {resource}: {reason}"),
404 None => format!("EACCES: permission denied, {resource}"),
405 };
406 Err(SidecarError::Execution(message))
407 }
408
409 pub(crate) fn set_vm_permissions(
410 &self,
411 vm_id: &str,
412 permissions: &PermissionsPolicy,
413 ) -> Result<(), SidecarError> {
414 #[cfg(test)]
415 {
416 let mut outcomes = self.set_vm_permissions_outcomes.lock().map_err(|_| {
417 SidecarError::Bridge(String::from(
418 "native sidecar test set_vm_permissions outcome lock poisoned",
419 ))
420 })?;
421 if let Some(Some(error)) = outcomes.pop_front() {
422 return Err(error);
423 }
424 }
425
426 let mut stored = self.permissions.lock().map_err(|_| {
427 SidecarError::Bridge(String::from(
428 "native sidecar permission policy lock poisoned",
429 ))
430 })?;
431 stored.insert(vm_id.to_owned(), permissions.clone());
432 Ok(())
433 }
434
435 pub(crate) fn restore_vm_permissions_fail_closed(
436 &self,
437 vm_id: &str,
438 original_permissions: &PermissionsPolicy,
439 context: &str,
440 operation_error: &SidecarError,
441 ) -> Result<(), SidecarError> {
442 match self.set_vm_permissions(vm_id, original_permissions) {
443 Ok(()) => Ok(()),
444 Err(restore_error) => {
445 let deny_all = PermissionsPolicy::deny_all();
446 match self.set_vm_permissions(vm_id, &deny_all) {
447 Ok(()) => Err(SidecarError::InvalidState(format!(
448 "{context} failed: {operation_error}; restoring original permissions failed: {restore_error}; applied deny-all fallback"
449 ))),
450 Err(deny_all_error) => panic!(
451 "{context} failed: {operation_error}; restoring original permissions failed: {restore_error}; deny-all fallback failed: {deny_all_error}"
452 ),
453 }
454 }
455 }
456 }
457
458 pub(crate) fn clear_vm_permissions(&self, vm_id: &str) -> Result<(), SidecarError> {
459 let mut stored = self.permissions.lock().map_err(|_| {
460 SidecarError::Bridge(String::from(
461 "native sidecar permission policy lock poisoned",
462 ))
463 })?;
464 stored.remove(vm_id);
465 Ok(())
466 }
467
468 pub(crate) fn static_permission_decision(
469 &self,
470 vm_id: &str,
471 capability: &str,
472 domain: &str,
473 resource: Option<&str>,
474 ) -> Option<PermissionDecision> {
475 let stored = self.permissions.lock().ok()?;
476 let permissions = stored.get(vm_id)?;
477 let mode = evaluate_permissions_policy(permissions, domain, capability, resource);
478 Some(permission_mode_to_kernel_decision(mode, capability))
479 }
480}
481
482pub(crate) fn evaluate_permissions_policy(
483 permissions: &PermissionsPolicy,
484 domain: &str,
485 capability: &str,
486 resource: Option<&str>,
487) -> PermissionMode {
488 match domain {
489 "fs" => evaluate_fs_permission_scope(
490 permissions.fs.as_ref(),
491 capability_operation(capability, domain),
492 resource,
493 ),
494 "network" => evaluate_pattern_permission_scope(
495 permissions.network.as_ref(),
496 capability_operation(capability, domain),
497 resource,
498 ),
499 "child_process" => evaluate_pattern_permission_scope(
500 permissions.child_process.as_ref(),
501 capability_operation(capability, domain),
502 resource,
503 ),
504 "process" => evaluate_pattern_permission_scope(
505 permissions.process.as_ref(),
506 capability_operation(capability, domain),
507 resource,
508 ),
509 "env" => evaluate_pattern_permission_scope(
510 permissions.env.as_ref(),
511 capability_operation(capability, domain),
512 resource,
513 ),
514 "binding" => evaluate_pattern_permission_scope(
515 permissions.binding.as_ref(),
516 capability_operation(capability, domain),
517 resource,
518 ),
519 _ => PermissionMode::Deny,
520 }
521}
522
523fn evaluate_fs_permission_scope(
524 scope: Option<&FsPermissionScope>,
525 operation: &str,
526 resource: Option<&str>,
527) -> PermissionMode {
528 match scope {
529 Some(FsPermissionScope::PermissionMode(mode)) => mode.clone(),
530 Some(FsPermissionScope::FsPermissionRuleSet(rules)) => {
531 let mut mode = rules.default.clone().unwrap_or(PermissionMode::Deny);
532 for rule in &rules.rules {
533 if fs_rule_matches(rule, operation, resource) {
534 mode = rule.mode.clone();
535 }
536 }
537 mode
538 }
539 None => PermissionMode::Deny,
540 }
541}
542
543fn evaluate_pattern_permission_scope(
544 scope: Option<&PatternPermissionScope>,
545 operation: &str,
546 resource: Option<&str>,
547) -> PermissionMode {
548 match scope {
549 Some(PatternPermissionScope::PermissionMode(mode)) => mode.clone(),
550 Some(PatternPermissionScope::PatternPermissionRuleSet(rules)) => {
551 let mut mode = rules.default.clone().unwrap_or(PermissionMode::Deny);
552 for rule in &rules.rules {
553 if pattern_rule_matches(rule, operation, resource) {
554 mode = rule.mode.clone();
555 }
556 }
557 mode
558 }
559 None => PermissionMode::Deny,
560 }
561}
562
563fn fs_rule_matches(
564 rule: &crate::protocol::FsPermissionRule,
565 operation: &str,
566 resource: Option<&str>,
567) -> bool {
568 let operations_match = permission_operation_matches(&rule.operations, operation);
569 let paths_match = permission_resource_matches(&rule.paths, resource);
570 operations_match && paths_match
571}
572
573fn pattern_rule_matches(
574 rule: &PatternPermissionRule,
575 operation: &str,
576 resource: Option<&str>,
577) -> bool {
578 let operations_match = permission_operation_matches(&rule.operations, operation);
579 let patterns_match = permission_resource_matches(&rule.patterns, resource);
580 operations_match && patterns_match
581}
582
583fn permission_operation_matches(candidates: &[String], operation: &str) -> bool {
584 candidates
585 .iter()
586 .any(|candidate| candidate == "*" || candidate == operation)
587}
588
589fn permission_resource_matches(patterns: &[String], resource: Option<&str>) -> bool {
590 resource.is_some_and(|value| {
591 patterns
592 .iter()
593 .any(|pattern| permission_glob_matches(pattern, value))
594 })
595}
596
597pub(crate) fn validate_permissions_policy(
598 permissions: &PermissionsPolicy,
599) -> Result<(), SidecarError> {
600 if let Some(scope) = permissions.fs.as_ref() {
601 validate_fs_permission_scope("fs", scope)?;
602 }
603 if let Some(scope) = permissions.network.as_ref() {
604 validate_pattern_permission_scope("network", scope)?;
605 }
606 if let Some(scope) = permissions.child_process.as_ref() {
607 validate_pattern_permission_scope("child_process", scope)?;
608 }
609 if let Some(scope) = permissions.process.as_ref() {
610 validate_pattern_permission_scope("process", scope)?;
611 }
612 if let Some(scope) = permissions.env.as_ref() {
613 validate_pattern_permission_scope("env", scope)?;
614 }
615 if let Some(scope) = permissions.binding.as_ref() {
616 validate_pattern_permission_scope("binding", scope)?;
617 }
618 Ok(())
619}
620
621fn validate_fs_permission_scope(
622 domain: &str,
623 scope: &FsPermissionScope,
624) -> Result<(), SidecarError> {
625 let FsPermissionScope::FsPermissionRuleSet(rule_set) = scope else {
626 return Ok(());
627 };
628
629 for (index, rule) in rule_set.rules.iter().enumerate() {
630 validate_permission_rule_field(
631 &rule.operations,
632 &format!("{domain}.rules[{index}].operations"),
633 )?;
634 validate_permission_rule_field(&rule.paths, &format!("{domain}.rules[{index}].paths"))?;
635 }
636
637 Ok(())
638}
639
640fn validate_pattern_permission_scope(
641 domain: &str,
642 scope: &PatternPermissionScope,
643) -> Result<(), SidecarError> {
644 let PatternPermissionScope::PatternPermissionRuleSet(rule_set) = scope else {
645 return Ok(());
646 };
647
648 for (index, rule) in rule_set.rules.iter().enumerate() {
649 validate_permission_rule_field(
650 &rule.operations,
651 &format!("{domain}.rules[{index}].operations"),
652 )?;
653 validate_permission_rule_field(
654 &rule.patterns,
655 &format!("{domain}.rules[{index}].patterns"),
656 )?;
657 }
658
659 Ok(())
660}
661
662fn validate_permission_rule_field(values: &[String], field: &str) -> Result<(), SidecarError> {
663 if values.is_empty() {
664 return Err(SidecarError::InvalidState(format!(
665 "invalid permissions policy: {field} must not be empty; use [\"*\"] for wildcard"
666 )));
667 }
668 Ok(())
669}
670
671fn capability_operation<'a>(capability: &'a str, domain: &str) -> &'a str {
672 capability
673 .strip_prefix(domain)
674 .and_then(|value| value.strip_prefix('.'))
675 .unwrap_or("")
676}
677
678fn permission_mode_to_kernel_decision(
679 mode: PermissionMode,
680 capability: &str,
681) -> PermissionDecision {
682 match mode {
683 PermissionMode::Allow => PermissionDecision::allow(),
684 PermissionMode::Ask => {
685 PermissionDecision::deny(format!("permission prompt required for {capability}"))
686 }
687 PermissionMode::Deny => PermissionDecision::deny(format!("blocked by {capability} policy")),
688 }
689}
690
691pub(crate) fn filesystem_permission_capability(access: FilesystemAccess) -> &'static str {
692 match access {
693 FilesystemAccess::Read => "fs.read",
694 FilesystemAccess::Write => "fs.write",
695 FilesystemAccess::Stat => "fs.stat",
696 FilesystemAccess::ReadDir => "fs.readdir",
697 FilesystemAccess::CreateDir => "fs.create_dir",
698 FilesystemAccess::Remove => "fs.rm",
699 FilesystemAccess::Rename => "fs.rename",
700 FilesystemAccess::Symlink => "fs.symlink",
701 FilesystemAccess::ReadLink => "fs.readlink",
702 FilesystemAccess::Chmod => "fs.chmod",
703 FilesystemAccess::Truncate => "fs.truncate",
704 }
705}
706
707fn network_permission_capability(operation: NetworkOperation) -> &'static str {
708 match operation {
709 NetworkOperation::Fetch => "network.fetch",
710 NetworkOperation::Http => "network.http",
711 NetworkOperation::Dns => "network.dns",
712 NetworkOperation::Listen => "network.listen",
713 }
714}
715
716fn environment_permission_capability(operation: EnvironmentOperation) -> &'static str {
717 match operation {
718 EnvironmentOperation::Read => "env.read",
719 EnvironmentOperation::Write => "env.write",
720 }
721}
722
723fn is_internal_runtime_command_request(request: &CommandAccessRequest) -> bool {
724 match request.command.as_str() {
725 "node" => request
726 .env
727 .keys()
728 .any(|key| INTERNAL_JAVASCRIPT_ENTRYPOINT_ENV_KEYS.contains(&key.as_str())),
729 "wasm" => request
730 .env
731 .keys()
732 .any(|key| INTERNAL_WASM_ENTRYPOINT_ENV_KEYS.contains(&key.as_str())),
733 "python" => request.env.keys().any(|key| {
734 INTERNAL_PYTHON_ENTRYPOINT_ENV_PREFIXES
735 .iter()
736 .any(|prefix| key.starts_with(prefix))
737 }),
738 _ => false,
739 }
740}
741
742fn ownership_matches_process_event(
743 ownership: &OwnershipScope,
744 event: &ProcessEventEnvelope,
745) -> bool {
746 match ownership {
747 OwnershipScope::ConnectionOwnership(inner) => inner.connection_id == event.connection_id,
748 OwnershipScope::SessionOwnership(inner) => {
749 inner.connection_id == event.connection_id && inner.session_id == event.session_id
750 }
751 OwnershipScope::VmOwnership(inner) => {
752 inner.connection_id == event.connection_id
753 && inner.session_id == event.session_id
754 && inner.vm_id == event.vm_id
755 }
756 }
757}
758
759fn public_process_event_matches_ownership<B>(
760 sidecar: &NativeSidecar<B>,
761 ownership: &OwnershipScope,
762 event: &ProcessEventEnvelope,
763) -> bool
764where
765 B: NativeSidecarBridge + Send + 'static,
766 BridgeError<B>: fmt::Debug + Send + Sync + 'static,
767{
768 if !ownership_matches_process_event(ownership, event) {
769 return false;
770 }
771
772 if event.process_id.contains('/') {
773 return false;
774 }
775
776 let _ = sidecar;
779 true
780}
781
782fn poll_future_once<F: std::future::Future>(future: std::pin::Pin<&mut F>) -> Option<F::Output> {
783 let mut context = Context::from_waker(Waker::noop());
784 match future.poll(&mut context) {
785 Poll::Ready(output) => Some(output),
786 Poll::Pending => None,
787 }
788}
789
790impl JavascriptSocketPathContext {
795 pub(crate) fn loopback_port_allowed(&self, port: u16) -> bool {
796 self.loopback_exempt_ports.contains(&port)
797 || self
798 .tcp_loopback_guest_to_host_ports
799 .keys()
800 .any(|(_, guest_port)| *guest_port == port)
801 }
802
803 pub(crate) fn translate_tcp_loopback_port(
804 &self,
805 family: JavascriptSocketFamily,
806 port: u16,
807 ) -> Option<u16> {
808 self.tcp_loopback_guest_to_host_ports
809 .get(&(family, port))
810 .copied()
811 }
812
813 pub(crate) fn http_loopback_target(
814 &self,
815 family: JavascriptSocketFamily,
816 port: u16,
817 ) -> Option<&crate::state::JavascriptHttpLoopbackTarget> {
818 self.http_loopback_targets.get(&(family, port))
819 }
820
821 pub(crate) fn translate_udp_loopback_port(
822 &self,
823 family: JavascriptSocketFamily,
824 port: u16,
825 ) -> Option<u16> {
826 self.udp_loopback_guest_to_host_ports
827 .get(&(family, port))
828 .copied()
829 }
830
831 pub(crate) fn guest_udp_port_for_host_port(
832 &self,
833 family: JavascriptSocketFamily,
834 port: u16,
835 ) -> Option<u16> {
836 self.udp_loopback_host_to_guest_ports
837 .get(&(family, port))
838 .copied()
839 }
840}
841
842pub struct NativeSidecar<B> {
845 pub(crate) config: NativeSidecarConfig,
846 pub(crate) bridge: SharedBridge<B>,
847 pub(crate) mount_plugins: FileSystemPluginRegistry<MountPluginContext<B>>,
848 pub(crate) cache_root: PathBuf,
849 pub(crate) javascript_engine: JavascriptExecutionEngine,
850 pub(crate) python_engine: PythonExecutionEngine,
851 pub(crate) wasm_engine: WasmExecutionEngine,
852 pub(crate) next_connection_id: usize,
853 pub(crate) next_session_id: usize,
854 pub(crate) next_vm_id: usize,
855 pub(crate) next_sidecar_request_id: RequestId,
856 pub(crate) connections: BTreeMap<String, ConnectionState>,
857 pub(crate) sessions: BTreeMap<String, SessionState>,
858 pub(crate) vms: BTreeMap<String, VmState>,
859 #[allow(dead_code)]
860 pub(crate) process_event_sender: Sender<ProcessEventEnvelope>,
861 pub(crate) process_event_receiver: Option<Receiver<ProcessEventEnvelope>>,
862 pub(crate) pending_process_events: VecDeque<ProcessEventEnvelope>,
863 pub(crate) pending_sidecar_responses: SidecarResponseTracker,
864 pub(crate) outbound_sidecar_requests: VecDeque<SidecarRequestFrame>,
865 pub(crate) completed_sidecar_responses: BTreeMap<RequestId, SidecarResponseFrame>,
866 pub(crate) completed_sidecar_response_order: VecDeque<RequestId>,
867 pub(crate) sidecar_requests: SharedSidecarRequestClient,
868 pub(crate) extensions: BTreeMap<String, Arc<dyn Extension>>,
869 pub(crate) extension_sessions: BTreeMap<(String, String), ExtensionSessionResources>,
870 pub(crate) extension_process_output_buffers:
871 BTreeMap<(String, String), ExtensionBufferedProcessOutput>,
872}
873
874#[derive(Debug)]
875pub(crate) struct ExtensionSessionResources {
876 pub(crate) ownership: OwnershipScope,
877 pub(crate) process_ids: BTreeSet<String>,
878 pub(crate) vm_ids: BTreeSet<String>,
879}
880
881impl<B> fmt::Debug for NativeSidecar<B> {
882 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
883 f.debug_struct("NativeSidecar")
884 .field("config", &self.config)
885 .field("cache_root", &self.cache_root)
886 .field("next_connection_id", &self.next_connection_id)
887 .field("next_session_id", &self.next_session_id)
888 .field("next_vm_id", &self.next_vm_id)
889 .field("connection_count", &self.connections.len())
890 .field("session_count", &self.sessions.len())
891 .field("vm_count", &self.vms.len())
892 .field("extension_session_count", &self.extension_sessions.len())
893 .field(
894 "extension_process_output_buffer_count",
895 &self.extension_process_output_buffers.len(),
896 )
897 .finish()
898 }
899}
900
901impl<B> NativeSidecar<B>
902where
903 B: NativeSidecarBridge + Send + 'static,
904 BridgeError<B>: fmt::Debug + Send + Sync + 'static,
905{
906 pub fn new(bridge: B) -> Result<Self, SidecarError> {
907 Self::with_config(bridge, NativeSidecarConfig::default())
908 }
909
910 pub fn with_config(bridge: B, config: NativeSidecarConfig) -> Result<Self, SidecarError> {
911 if matches!(config.expected_auth_token.as_deref(), Some("")) {
912 return Err(SidecarError::InvalidState(String::from(
913 "native sidecar expected_auth_token must not be empty",
914 )));
915 }
916
917 let cache_root = config.compile_cache_root.clone().unwrap_or_else(|| {
918 std::env::temp_dir().join(format!(
919 "{}-{}",
920 config.sidecar_id,
921 SystemTime::now()
922 .duration_since(UNIX_EPOCH)
923 .expect("system time before unix epoch")
924 .as_nanos()
925 ))
926 });
927 fs::create_dir_all(&cache_root).map_err(|error| {
928 SidecarError::Io(format!("failed to prepare sidecar cache root: {error}"))
929 })?;
930
931 let bridge = SharedBridge::new(bridge);
932 let mount_plugins = build_mount_plugin_registry::<B>()?;
933 let (process_event_sender, process_event_receiver) = channel(MAX_PROCESS_EVENT_QUEUE);
934
935 Ok(Self {
936 config,
937 bridge,
938 mount_plugins,
939 cache_root,
940 javascript_engine: JavascriptExecutionEngine::default(),
941 python_engine: PythonExecutionEngine::default(),
942 wasm_engine: WasmExecutionEngine::default(),
943 next_connection_id: 0,
944 next_session_id: 0,
945 next_vm_id: 0,
946 next_sidecar_request_id: -1,
947 connections: BTreeMap::new(),
948 sessions: BTreeMap::new(),
949 vms: BTreeMap::new(),
950 process_event_sender,
951 process_event_receiver: Some(process_event_receiver),
952 pending_process_events: VecDeque::new(),
953 pending_sidecar_responses: SidecarResponseTracker::default(),
954 outbound_sidecar_requests: VecDeque::new(),
955 completed_sidecar_responses: BTreeMap::new(),
956 completed_sidecar_response_order: VecDeque::new(),
957 sidecar_requests: SharedSidecarRequestClient::default(),
958 extensions: BTreeMap::new(),
959 extension_sessions: BTreeMap::new(),
960 extension_process_output_buffers: BTreeMap::new(),
961 })
962 }
963
964 pub fn with_config_and_extensions(
965 bridge: B,
966 config: NativeSidecarConfig,
967 extensions: Vec<Box<dyn Extension>>,
968 ) -> Result<Self, SidecarError> {
969 let mut sidecar = Self::with_config(bridge, config)?;
970 for extension in extensions {
971 sidecar.register_extension(extension)?;
972 }
973 Ok(sidecar)
974 }
975
976 pub(crate) fn prune_extension_process_resource(&mut self, process_id: &str) {
977 self.extension_sessions.retain(|_, resources| {
978 resources.process_ids.remove(process_id);
979 !resources.process_ids.is_empty() || !resources.vm_ids.is_empty()
980 });
981 }
982
983 pub(crate) fn prune_extension_vm_resource(&mut self, vm_id: &str) {
984 self.extension_sessions.retain(|_, resources| {
985 if matches!(
986 &resources.ownership,
987 OwnershipScope::VmOwnership(inner) if inner.vm_id == vm_id
988 ) {
989 resources.process_ids.clear();
990 }
991 resources.vm_ids.remove(vm_id);
992 !resources.process_ids.is_empty() || !resources.vm_ids.is_empty()
993 });
994 }
995
996 pub(crate) fn capture_extension_process_output_event(
997 &mut self,
998 vm_id: &str,
999 process_id: &str,
1000 event: &ActiveExecutionEvent,
1001 ) -> bool {
1002 let Some(buffer) = self
1003 .extension_process_output_buffers
1004 .get_mut(&(vm_id.to_string(), process_id.to_string()))
1005 else {
1006 return false;
1007 };
1008 match event {
1009 ActiveExecutionEvent::Stdout(chunk) => {
1010 buffer.append_stdout(chunk, DEFAULT_ACP_STDOUT_BUFFER_BYTE_LIMIT);
1011 true
1012 }
1013 ActiveExecutionEvent::Stderr(chunk) => {
1014 buffer.append_stderr(chunk, DEFAULT_ACP_STDOUT_BUFFER_BYTE_LIMIT);
1015 true
1016 }
1017 ActiveExecutionEvent::JavascriptSyncRpcRequest(_)
1018 | ActiveExecutionEvent::PythonVfsRpcRequest(_)
1019 | ActiveExecutionEvent::SignalState { .. }
1020 | ActiveExecutionEvent::Exited(_) => false,
1021 }
1022 }
1023
1024 fn bind_extension_process_resource(
1025 &mut self,
1026 ownership: OwnershipScope,
1027 namespace: String,
1028 ext_session_id: String,
1029 process_id: String,
1030 ) -> Result<(), SidecarError> {
1031 if ext_session_id.is_empty() {
1032 return Err(SidecarError::InvalidState(String::from(
1033 "extension session id must not be empty",
1034 )));
1035 }
1036 let (connection_id, session_id, vm_id) = self.vm_scope_for(&ownership)?;
1037 self.require_owned_vm(&connection_id, &session_id, &vm_id)?;
1038 let process_exists = self
1039 .vms
1040 .get(&vm_id)
1041 .is_some_and(|vm| vm.active_processes.contains_key(&process_id));
1042 if !process_exists {
1043 return Err(SidecarError::InvalidState(format!(
1044 "VM {vm_id} has no active process {process_id}"
1045 )));
1046 }
1047
1048 let key = (namespace, ext_session_id);
1049 if let Some(resources) = self.extension_sessions.get_mut(&key) {
1050 if resources.ownership != ownership {
1051 return Err(SidecarError::InvalidState(String::from(
1052 "extension session ownership did not match existing resources",
1053 )));
1054 }
1055 resources.process_ids.insert(process_id);
1056 } else {
1057 self.extension_sessions.insert(
1058 key,
1059 ExtensionSessionResources {
1060 ownership,
1061 process_ids: BTreeSet::from([process_id]),
1062 vm_ids: BTreeSet::new(),
1063 },
1064 );
1065 }
1066 Ok(())
1067 }
1068
1069 fn bind_extension_vm_resource(
1070 &mut self,
1071 ownership: OwnershipScope,
1072 namespace: String,
1073 ext_session_id: String,
1074 ) -> Result<(), SidecarError> {
1075 if ext_session_id.is_empty() {
1076 return Err(SidecarError::InvalidState(String::from(
1077 "extension session id must not be empty",
1078 )));
1079 }
1080 let (connection_id, session_id, vm_id) = self.vm_scope_for(&ownership)?;
1081 self.require_owned_vm(&connection_id, &session_id, &vm_id)?;
1082
1083 let key = (namespace, ext_session_id);
1084 if let Some(resources) = self.extension_sessions.get_mut(&key) {
1085 if resources.ownership != ownership {
1086 return Err(SidecarError::InvalidState(String::from(
1087 "extension session ownership did not match existing resources",
1088 )));
1089 }
1090 resources.vm_ids.insert(vm_id);
1091 } else {
1092 self.extension_sessions.insert(
1093 key,
1094 ExtensionSessionResources {
1095 ownership,
1096 process_ids: BTreeSet::new(),
1097 vm_ids: BTreeSet::from([vm_id]),
1098 },
1099 );
1100 }
1101 Ok(())
1102 }
1103
1104 pub fn sidecar_id(&self) -> &str {
1105 &self.config.sidecar_id
1106 }
1107
1108 pub fn with_bridge_mut<T>(
1109 &self,
1110 operation: impl FnOnce(&mut B) -> T,
1111 ) -> Result<T, SidecarError> {
1112 self.bridge.inspect(operation)
1113 }
1114
1115 pub fn set_sidecar_request_transport(&mut self, transport: Arc<dyn SidecarRequestTransport>) {
1116 self.sidecar_requests.set_transport(transport);
1117 }
1118
1119 pub fn register_extension(
1120 &mut self,
1121 extension: Box<dyn Extension>,
1122 ) -> Result<(), SidecarError> {
1123 let namespace = extension.namespace().to_owned();
1124 if namespace.is_empty() {
1125 return Err(SidecarError::InvalidState(String::from(
1126 "extension namespace must not be empty",
1127 )));
1128 }
1129 if self.extensions.contains_key(&namespace) {
1130 return Err(SidecarError::Conflict(format!(
1131 "extension namespace {namespace} is already registered",
1132 )));
1133 }
1134 self.extensions.insert(namespace, Arc::from(extension));
1135 Ok(())
1136 }
1137
1138 pub fn set_sidecar_request_handler<F>(&mut self, handler: F)
1139 where
1140 F: Fn(SidecarRequestFrame) -> Result<SidecarResponsePayload, SidecarError>
1141 + Send
1142 + Sync
1143 + 'static,
1144 {
1145 struct HandlerTransport<F>(F);
1146
1147 impl<F> SidecarRequestTransport for HandlerTransport<F>
1148 where
1149 F: Fn(SidecarRequestFrame) -> Result<SidecarResponsePayload, SidecarError>
1150 + Send
1151 + Sync
1152 + 'static,
1153 {
1154 fn send_request(
1155 &self,
1156 request: SidecarRequestFrame,
1157 _timeout: Duration,
1158 ) -> Result<SidecarResponseFrame, SidecarError> {
1159 let payload = (self.0)(request.clone())?;
1160 Ok(SidecarResponseFrame::new(
1161 request.request_id,
1162 request.ownership,
1163 payload,
1164 ))
1165 }
1166 }
1167
1168 self.set_sidecar_request_transport(Arc::new(HandlerTransport(handler)));
1169 }
1170
1171 pub fn set_wire_sidecar_request_handler<F>(&mut self, handler: F)
1172 where
1173 F: Fn(
1174 crate::wire::SidecarRequestFrame,
1175 ) -> Result<crate::wire::SidecarResponseFrame, SidecarError>
1176 + Send
1177 + Sync
1178 + 'static,
1179 {
1180 self.set_sidecar_request_handler(move |request| {
1181 let request = crate::wire::sidecar_request_frame_from_compat(request)
1182 .map_err(wire_protocol_error)?;
1183 let response = handler(request)?;
1184 let response = crate::wire::sidecar_response_frame_to_compat(response)
1185 .map_err(wire_protocol_error)?;
1186 Ok(response.payload)
1187 });
1188 }
1189
1190 pub(crate) fn queue_pending_process_event(
1191 &mut self,
1192 envelope: ProcessEventEnvelope,
1193 ) -> Result<(), SidecarError> {
1194 if self.pending_process_events.len() >= MAX_PROCESS_EVENT_QUEUE {
1195 return Err(process_event_queue_overflow_error());
1196 }
1197 self.pending_process_events.push_back(envelope);
1198 Ok(())
1199 }
1200
1201 pub(crate) fn queue_front_pending_process_event(
1202 &mut self,
1203 envelope: ProcessEventEnvelope,
1204 ) -> Result<(), SidecarError> {
1205 if self.pending_process_events.len() >= MAX_PROCESS_EVENT_QUEUE {
1206 return Err(process_event_queue_overflow_error());
1207 }
1208 self.pending_process_events.push_front(envelope);
1209 Ok(())
1210 }
1211
1212 pub(crate) fn pending_process_event_capacity(&self) -> usize {
1213 MAX_PROCESS_EVENT_QUEUE.saturating_sub(self.pending_process_events.len())
1214 }
1215
1216 pub fn dispatch_blocking(
1217 &mut self,
1218 request: RequestFrame,
1219 ) -> Result<DispatchResult, SidecarError> {
1220 let inside_runtime = tokio::runtime::Handle::try_current().is_ok();
1221 if matches!(
1222 request.payload,
1223 RequestPayload::DisposeVm(_) | RequestPayload::Ext(_)
1224 ) && !inside_runtime
1225 {
1226 return tokio::runtime::Builder::new_current_thread()
1227 .enable_all()
1228 .build()
1229 .expect("sidecar dispatch runtime")
1230 .block_on(self.dispatch(request));
1231 }
1232
1233 let mut future = std::pin::pin!(self.dispatch(request));
1234 match poll_future_once(future.as_mut()) {
1235 Some(result) => result,
1236 None if inside_runtime => Err(SidecarError::InvalidState(String::from(
1237 "dispatch_blocking cannot wait for an async sidecar request inside a Tokio runtime; use dispatch().await",
1238 ))),
1239 None => tokio::runtime::Builder::new_current_thread()
1240 .enable_all()
1241 .build()
1242 .expect("sidecar dispatch runtime")
1243 .block_on(future),
1244 }
1245 }
1246
1247 pub fn dispatch_wire_blocking(
1248 &mut self,
1249 request: crate::wire::RequestFrame,
1250 ) -> Result<crate::wire::WireDispatchResult, SidecarError> {
1251 let request = crate::wire::request_frame_to_compat(request).map_err(wire_protocol_error)?;
1252 let result = self.dispatch_blocking(request)?;
1253 crate::wire::dispatch_result_from_compat(result).map_err(wire_protocol_error)
1254 }
1255
1256 pub fn poll_event_blocking(
1257 &mut self,
1258 ownership: &OwnershipScope,
1259 timeout: Duration,
1260 ) -> Result<Option<EventFrame>, SidecarError> {
1261 tokio::runtime::Builder::new_current_thread()
1262 .enable_all()
1263 .build()
1264 .expect("sidecar poll runtime")
1265 .block_on(self.poll_event(ownership, timeout))
1266 }
1267
1268 pub fn poll_event_wire_blocking(
1269 &mut self,
1270 ownership: &crate::wire::OwnershipScope,
1271 timeout: Duration,
1272 ) -> Result<Option<crate::wire::EventFrame>, SidecarError> {
1273 let ownership = crate::wire::ownership_scope_to_compat(ownership.clone());
1274 self.poll_event_blocking(&ownership, timeout)?
1275 .map(crate::wire::event_frame_from_compat)
1276 .transpose()
1277 .map_err(wire_protocol_error)
1278 }
1279
1280 pub fn close_session_blocking(
1281 &mut self,
1282 connection_id: &str,
1283 session_id: &str,
1284 ) -> Result<Vec<EventFrame>, SidecarError> {
1285 tokio::runtime::Builder::new_current_thread()
1286 .enable_all()
1287 .build()
1288 .expect("sidecar close-session runtime")
1289 .block_on(self.close_session(connection_id, session_id))
1290 }
1291
1292 pub fn remove_connection_blocking(
1293 &mut self,
1294 connection_id: &str,
1295 ) -> Result<Vec<EventFrame>, SidecarError> {
1296 tokio::runtime::Builder::new_current_thread()
1297 .enable_all()
1298 .build()
1299 .expect("sidecar remove-connection runtime")
1300 .block_on(self.remove_connection(connection_id))
1301 }
1302
1303 pub fn dispose_vm_internal_blocking(
1304 &mut self,
1305 connection_id: &str,
1306 session_id: &str,
1307 vm_id: &str,
1308 reason: DisposeReason,
1309 ) -> Result<Vec<EventFrame>, SidecarError> {
1310 tokio::runtime::Builder::new_current_thread()
1311 .enable_all()
1312 .build()
1313 .expect("sidecar dispose-vm runtime")
1314 .block_on(self.dispose_vm_internal(connection_id, session_id, vm_id, reason))
1315 }
1316
1317 pub async fn dispatch(
1318 &mut self,
1319 request: RequestFrame,
1320 ) -> Result<DispatchResult, SidecarError> {
1321 if let Err(error) = self.ensure_request_within_frame_limit(&request) {
1322 return Ok(DispatchResult {
1323 response: self.reject(&request, error_code(&error), &error.to_string()),
1324 events: Vec::new(),
1325 });
1326 }
1327
1328 let result = match request.payload.clone() {
1329 RequestPayload::Authenticate(payload) => {
1330 self.authenticate_connection(&request, payload).await
1331 }
1332 RequestPayload::OpenSession(payload) => self.open_session(&request, payload).await,
1333 RequestPayload::CreateVm(payload) => self.create_vm(&request, payload).await,
1334 RequestPayload::DisposeVm(payload) => self.dispose_vm(&request, payload).await,
1335 RequestPayload::BootstrapRootFilesystem(payload) => {
1336 self.bootstrap_root_filesystem(&request, payload.entries)
1337 .await
1338 }
1339 RequestPayload::ConfigureVm(payload) => self.configure_vm(&request, payload).await,
1340 RequestPayload::RegisterHostCallbacks(payload) => {
1341 register_host_callbacks(self, &request, payload)
1342 }
1343 RequestPayload::CreateLayer(payload) => self.create_layer(&request, payload).await,
1344 RequestPayload::SealLayer(payload) => self.seal_layer(&request, payload).await,
1345 RequestPayload::ImportSnapshot(payload) => {
1346 self.import_snapshot(&request, payload).await
1347 }
1348 RequestPayload::ExportSnapshot(payload) => {
1349 self.export_snapshot(&request, payload).await
1350 }
1351 RequestPayload::CreateOverlay(payload) => self.create_overlay(&request, payload).await,
1352 RequestPayload::GuestFilesystemCall(payload) => {
1353 self.guest_filesystem_call(&request, payload).await
1354 }
1355 RequestPayload::SnapshotRootFilesystem(payload) => {
1356 self.snapshot_root_filesystem(&request, payload).await
1357 }
1358 RequestPayload::Execute(payload) => self.execute(&request, payload).await,
1359 RequestPayload::WriteStdin(payload) => self.write_stdin(&request, payload).await,
1360 RequestPayload::CloseStdin(payload) => self.close_stdin(&request, payload).await,
1361 RequestPayload::KillProcess(payload) => self.kill_process(&request, payload).await,
1362 RequestPayload::GetProcessSnapshot(payload) => {
1363 self.get_process_snapshot(&request, payload).await
1364 }
1365 RequestPayload::FindListener(payload) => self.find_listener(&request, payload).await,
1366 RequestPayload::FindBoundUdp(payload) => self.find_bound_udp(&request, payload).await,
1367 RequestPayload::VmFetch(payload) => self.vm_fetch(&request, payload).await,
1368 RequestPayload::GetSignalState(payload) => {
1369 self.get_signal_state(&request, payload).await
1370 }
1371 RequestPayload::GetZombieTimerCount(payload) => {
1372 self.get_zombie_timer_count(&request, payload).await
1373 }
1374 RequestPayload::HostFilesystemCall(_)
1375 | RequestPayload::PersistenceLoad(_)
1376 | RequestPayload::PersistenceFlush(_) => Ok(DispatchResult {
1377 response: self.reject(
1378 &request,
1379 "unsupported_direction",
1380 "host callback request categories are sidecar-to-host only in this scaffold",
1381 ),
1382 events: Vec::new(),
1383 }),
1384 RequestPayload::Ext(payload) => {
1385 self.dispatch_extension_request(&request, payload).await
1386 }
1387 };
1388
1389 match result {
1390 Ok(dispatch) => Ok(dispatch),
1391 Err(error @ SidecarError::Io(_)) => Err(error),
1392 Err(error) => Ok(DispatchResult {
1393 response: self.reject(&request, error_code(&error), &error.to_string()),
1394 events: Vec::new(),
1395 }),
1396 }
1397 }
1398
1399 pub async fn dispatch_wire(
1400 &mut self,
1401 request: crate::wire::RequestFrame,
1402 ) -> Result<crate::wire::WireDispatchResult, SidecarError> {
1403 let request = crate::wire::request_frame_to_compat(request).map_err(wire_protocol_error)?;
1404 let result = self.dispatch(request).await?;
1405 crate::wire::dispatch_result_from_compat(result).map_err(wire_protocol_error)
1406 }
1407
1408 pub async fn poll_event_wire(
1409 &mut self,
1410 ownership: &crate::wire::OwnershipScope,
1411 timeout: Duration,
1412 ) -> Result<Option<crate::wire::EventFrame>, SidecarError> {
1413 let ownership = crate::wire::ownership_scope_to_compat(ownership.clone());
1414 self.poll_event(&ownership, timeout)
1415 .await?
1416 .map(crate::wire::event_frame_from_compat)
1417 .transpose()
1418 .map_err(wire_protocol_error)
1419 }
1420
1421 async fn dispatch_extension_request(
1422 &mut self,
1423 request: &RequestFrame,
1424 envelope: ExtEnvelope,
1425 ) -> Result<DispatchResult, SidecarError> {
1426 let namespace = envelope.namespace;
1427 let Some(extension) = self.extensions.get(&namespace).cloned() else {
1428 return Ok(DispatchResult {
1429 response: self.reject(
1430 request,
1431 "unknown_extension",
1432 &format!("no extension registered for namespace {namespace}"),
1433 ),
1434 events: Vec::new(),
1435 });
1436 };
1437 let snapshot = ExtensionSnapshot::new(
1438 namespace.clone(),
1439 request.ownership.clone(),
1440 self.sidecar_requests.clone(),
1441 );
1442 let ctx = ExtensionContext::new(snapshot, self);
1443 let response = extension.handle_request(ctx, envelope.payload).await?;
1444 Ok(DispatchResult {
1445 response: self.respond(
1446 request,
1447 ResponsePayload::ExtResult(ExtEnvelope {
1448 namespace,
1449 payload: response.payload,
1450 }),
1451 ),
1452 events: response.events,
1453 })
1454 }
1455
1456 pub async fn poll_event(
1457 &mut self,
1458 ownership: &OwnershipScope,
1459 timeout: Duration,
1460 ) -> Result<Option<EventFrame>, SidecarError> {
1461 let deadline = Instant::now() + timeout;
1462 loop {
1463 if let Some(index) = self
1464 .pending_process_events
1465 .iter()
1466 .position(|event| public_process_event_matches_ownership(self, ownership, event))
1467 {
1468 let Some(envelope) = self.pending_process_events.remove(index) else {
1469 continue;
1470 };
1471 if let Some(frame) = self.handle_process_event_envelope(envelope)? {
1472 return Ok(Some(frame));
1473 }
1474 continue;
1475 }
1476
1477 if !timeout.is_zero() {
1478 let _ = self.pump_process_events(ownership).await?;
1479 }
1480
1481 let queued_envelopes = {
1482 let pending_capacity = self.pending_process_event_capacity();
1483 let receiver = self.process_event_receiver.as_mut().ok_or_else(|| {
1484 SidecarError::InvalidState(String::from("process event receiver unavailable"))
1485 })?;
1486 let mut queued = Vec::new();
1487 loop {
1488 if queued.len() >= pending_capacity {
1489 if receiver.is_empty() {
1490 break;
1491 }
1492 return Err(process_event_queue_overflow_error());
1493 }
1494 match receiver.try_recv() {
1495 Ok(envelope) => queued.push(envelope),
1496 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
1497 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break,
1498 }
1499 }
1500 queued
1501 };
1502
1503 let mut matching_envelope = None;
1504 for envelope in queued_envelopes {
1505 if matching_envelope.is_none()
1506 && public_process_event_matches_ownership(self, ownership, &envelope)
1507 {
1508 matching_envelope = Some(envelope);
1509 } else {
1510 self.queue_pending_process_event(envelope)?;
1511 }
1512 }
1513
1514 if let Some(envelope) = matching_envelope {
1515 if let Some(frame) = self.handle_process_event_envelope(envelope)? {
1516 return Ok(Some(frame));
1517 }
1518 continue;
1519 }
1520
1521 if Instant::now() >= deadline {
1522 return Ok(None);
1523 }
1524
1525 let remaining = deadline.saturating_duration_since(Instant::now());
1526 time::sleep(remaining.min(Duration::from_millis(10))).await;
1527 }
1528 }
1529
1530 pub(crate) fn handle_process_event_envelope(
1531 &mut self,
1532 envelope: ProcessEventEnvelope,
1533 ) -> Result<Option<EventFrame>, SidecarError> {
1534 let ProcessEventEnvelope {
1535 connection_id,
1536 session_id,
1537 vm_id,
1538 process_id,
1539 event,
1540 } = envelope;
1541
1542 if matches!(event, ActiveExecutionEvent::Exited(_)) {
1543 let mut trailing = Vec::new();
1544 let mut deferred = VecDeque::new();
1545 while let Some(pending) = self.pending_process_events.pop_front() {
1546 if pending.vm_id == vm_id
1547 && pending.process_id == process_id
1548 && !matches!(pending.event, ActiveExecutionEvent::Exited(_))
1549 {
1550 trailing.push(pending.event);
1551 } else {
1552 deferred.push_back(pending);
1553 }
1554 }
1555 self.pending_process_events = deferred;
1556 let drain_limit = self
1557 .pending_process_event_capacity()
1558 .saturating_sub(trailing.len().saturating_add(1));
1559 trailing.extend(
1560 self.drain_process_events_blocking_with_limit(&vm_id, &process_id, drain_limit)?
1561 .into_iter()
1562 .filter(|event| !matches!(event, ActiveExecutionEvent::Exited(_))),
1563 );
1564
1565 if !trailing.is_empty() {
1566 if self.pending_process_event_capacity() < trailing.len() {
1567 return Err(process_event_queue_overflow_error());
1568 }
1569 let emit_now = if self.pending_process_event_capacity() == trailing.len() {
1570 Some(trailing.remove(0))
1571 } else {
1572 None
1573 };
1574 self.queue_front_pending_process_event(ProcessEventEnvelope {
1575 connection_id: connection_id.clone(),
1576 session_id: session_id.clone(),
1577 vm_id: vm_id.clone(),
1578 process_id: process_id.clone(),
1579 event,
1580 })?;
1581 for event in trailing.into_iter().rev() {
1582 self.queue_front_pending_process_event(ProcessEventEnvelope {
1583 connection_id: connection_id.clone(),
1584 session_id: session_id.clone(),
1585 vm_id: vm_id.clone(),
1586 process_id: process_id.clone(),
1587 event,
1588 })?;
1589 }
1590 if let Some(event) = emit_now {
1591 return self.handle_execution_event(&vm_id, &process_id, event);
1592 }
1593 return Ok(None);
1594 }
1595 }
1596
1597 self.handle_execution_event(&vm_id, &process_id, event)
1598 }
1599
1600 pub async fn close_session(
1603 &mut self,
1604 connection_id: &str,
1605 session_id: &str,
1606 ) -> Result<Vec<EventFrame>, SidecarError> {
1607 self.dispose_session(connection_id, session_id, DisposeReason::Requested)
1608 .await
1609 }
1610
1611 pub async fn remove_connection(
1612 &mut self,
1613 connection_id: &str,
1614 ) -> Result<Vec<EventFrame>, SidecarError> {
1615 self.require_authenticated_connection(connection_id)?;
1616
1617 let session_ids = self
1618 .connections
1619 .get(connection_id)
1620 .expect("authenticated connection should exist")
1621 .sessions
1622 .iter()
1623 .cloned()
1624 .collect::<Vec<_>>();
1625
1626 let mut events = Vec::new();
1627 for session_id in session_ids {
1628 events.extend(
1629 self.dispose_session(connection_id, &session_id, DisposeReason::ConnectionClosed)
1630 .await?,
1631 );
1632 }
1633
1634 self.connections.remove(connection_id);
1635 Ok(events)
1636 }
1637
1638 async fn authenticate_connection(
1639 &mut self,
1640 request: &RequestFrame,
1641 payload: crate::protocol::AuthenticateRequest,
1642 ) -> Result<DispatchResult, SidecarError> {
1643 let _ = self.connection_id_for(&request.ownership)?;
1644 if let Err(error) = self.validate_auth_token(&payload.auth_token) {
1645 let mut fields = audit_fields([
1646 (String::from("source"), payload.client_name.clone()),
1647 (String::from("reason"), error.to_string()),
1648 ]);
1649 if let OwnershipScope::ConnectionOwnership(inner) = &request.ownership {
1650 fields.insert(String::from("connection_id"), inner.connection_id.clone());
1651 }
1652 emit_security_audit_event(
1653 &self.bridge,
1654 &self.config.sidecar_id,
1655 "security.auth.failed",
1656 fields,
1657 );
1658 return Err(error);
1659 }
1660
1661 if payload.protocol_version != crate::wire::PROTOCOL_VERSION {
1662 return Err(SidecarError::ProtocolVersionMismatch(format!(
1663 "sidecar protocol version mismatch: expected {}, got {}",
1664 crate::wire::PROTOCOL_VERSION,
1665 payload.protocol_version
1666 )));
1667 }
1668
1669 let expected_bridge_version = secure_exec_bridge::bridge_contract().version;
1670 if payload.bridge_version != expected_bridge_version {
1671 return Err(SidecarError::BridgeVersionMismatch(format!(
1672 "bridge contract version mismatch: expected {expected_bridge_version}, got {}",
1673 payload.bridge_version
1674 )));
1675 }
1676
1677 let connection_id = self.allocate_connection_id();
1678 self.connections.insert(
1679 connection_id.clone(),
1680 ConnectionState {
1681 auth_token: payload.auth_token,
1682 sessions: BTreeSet::new(),
1683 },
1684 );
1685
1686 let response = self.response_with_ownership(
1687 request.request_id,
1688 OwnershipScope::connection(&connection_id),
1689 ResponsePayload::Authenticated(AuthenticatedResponse {
1690 sidecar_id: self.config.sidecar_id.clone(),
1691 connection_id,
1692 max_frame_bytes: self.config.max_frame_bytes as u32,
1693 }),
1694 );
1695 Ok(DispatchResult {
1696 response,
1697 events: Vec::new(),
1698 })
1699 }
1700
1701 async fn open_session(
1702 &mut self,
1703 request: &RequestFrame,
1704 payload: OpenSessionRequest,
1705 ) -> Result<DispatchResult, SidecarError> {
1706 let connection_id = self.connection_id_for(&request.ownership)?;
1707 self.require_authenticated_connection(&connection_id)?;
1708
1709 self.next_session_id += 1;
1710 let session_id = format!("session-{}", self.next_session_id);
1711 self.sessions.insert(
1712 session_id.clone(),
1713 SessionState {
1714 connection_id: connection_id.clone(),
1715 placement: payload.placement,
1716 metadata: payload.metadata.into_iter().collect(),
1717 vm_ids: BTreeSet::new(),
1718 },
1719 );
1720 self.connections
1721 .get_mut(&connection_id)
1722 .expect("authenticated connection should exist")
1723 .sessions
1724 .insert(session_id.clone());
1725
1726 Ok(DispatchResult {
1727 response: self.respond(
1728 request,
1729 ResponsePayload::SessionOpened(SessionOpenedResponse {
1730 session_id,
1731 owner_connection_id: connection_id,
1732 }),
1733 ),
1734 events: Vec::new(),
1735 })
1736 }
1737
1738 async fn guest_filesystem_call(
1741 &mut self,
1742 request: &RequestFrame,
1743 payload: GuestFilesystemCallRequest,
1744 ) -> Result<DispatchResult, SidecarError> {
1745 filesystem_guest_filesystem_call(self, request, payload).await
1746 }
1747
1748 async fn dispose_session(
1754 &mut self,
1755 connection_id: &str,
1756 session_id: &str,
1757 reason: DisposeReason,
1758 ) -> Result<Vec<EventFrame>, SidecarError> {
1759 self.require_owned_session(connection_id, session_id)?;
1760
1761 let vm_ids = self
1762 .sessions
1763 .get(session_id)
1764 .expect("owned session should exist")
1765 .vm_ids
1766 .iter()
1767 .cloned()
1768 .collect::<Vec<_>>();
1769
1770 let mut events = Vec::new();
1771 for vm_id in vm_ids {
1772 events.extend(
1773 self.dispose_vm_internal(connection_id, session_id, &vm_id, reason.clone())
1774 .await?,
1775 );
1776 }
1777
1778 self.sessions.remove(session_id);
1779 if let Some(connection) = self.connections.get_mut(connection_id) {
1780 connection.sessions.remove(session_id);
1781 }
1782 Ok(events)
1783 }
1784
1785 pub(crate) fn handle_javascript_sync_rpc_request(
1793 &mut self,
1794 vm_id: &str,
1795 process_id: &str,
1796 request: JavascriptSyncRpcRequest,
1797 ) -> Result<(), SidecarError> {
1798 let Some(vm) = self.vms.get(vm_id) else {
1799 log_stale_process_event(&self.bridge, vm_id, process_id, "javascript sync RPC");
1800 return Ok(());
1801 };
1802 if !vm.active_processes.contains_key(process_id) {
1803 log_stale_process_event(&self.bridge, vm_id, process_id, "javascript sync RPC");
1804 return Ok(());
1805 }
1806
1807 let response: Result<Value, SidecarError> = match request.method.as_str() {
1808 "child_process.spawn" => {
1809 let Some(vm) = self.vms.get(vm_id) else {
1810 log_stale_process_event(
1811 &self.bridge,
1812 vm_id,
1813 process_id,
1814 "javascript sync RPC child_process.spawn",
1815 );
1816 return Ok(());
1817 };
1818 let (payload, _) = parse_javascript_child_process_spawn_request(vm, &request.args)?;
1819 self.spawn_javascript_child_process(vm_id, process_id, payload)
1820 }
1821 "child_process.spawn_sync" => {
1822 let Some(vm) = self.vms.get(vm_id) else {
1823 log_stale_process_event(
1824 &self.bridge,
1825 vm_id,
1826 process_id,
1827 "javascript sync RPC child_process.spawn_sync",
1828 );
1829 return Ok(());
1830 };
1831 let (payload, max_buffer) =
1832 parse_javascript_child_process_spawn_request(vm, &request.args)?;
1833 self.spawn_javascript_child_process_sync(vm_id, process_id, payload, max_buffer)
1834 }
1835 "child_process.poll" => {
1836 let child_process_id =
1837 javascript_sync_rpc_arg_str(&request.args, 0, "child_process.poll child id")?;
1838 let wait_ms = javascript_sync_rpc_arg_u64_optional(
1839 &request.args,
1840 1,
1841 "child_process.poll wait ms",
1842 )?
1843 .unwrap_or_default();
1844 self.poll_javascript_child_process(vm_id, process_id, child_process_id, wait_ms)
1845 }
1846 "child_process.write_stdin" => {
1847 let child_process_id = javascript_sync_rpc_arg_str(
1848 &request.args,
1849 0,
1850 "child_process.write_stdin child id",
1851 )?;
1852 let chunk = javascript_sync_rpc_bytes_arg(
1853 &request.args,
1854 1,
1855 "child_process.write_stdin chunk",
1856 )?;
1857 self.write_javascript_child_process_stdin(
1858 vm_id,
1859 process_id,
1860 child_process_id,
1861 &chunk,
1862 )?;
1863 Ok(Value::Null)
1864 }
1865 "child_process.close_stdin" => {
1866 let child_process_id = javascript_sync_rpc_arg_str(
1867 &request.args,
1868 0,
1869 "child_process.close_stdin child id",
1870 )?;
1871 self.close_javascript_child_process_stdin(vm_id, process_id, child_process_id)?;
1872 Ok(Value::Null)
1873 }
1874 "child_process.kill" => {
1875 let child_process_id =
1876 javascript_sync_rpc_arg_str(&request.args, 0, "child_process.kill child id")?;
1877 let signal =
1878 javascript_sync_rpc_arg_str(&request.args, 1, "child_process.kill signal")?;
1879 self.kill_javascript_child_process(vm_id, process_id, child_process_id, signal)?;
1880 Ok(Value::Null)
1881 }
1882 "process.kill" => {
1883 let target_pid =
1884 javascript_sync_rpc_arg_i32(&request.args, 0, "process.kill target pid")?;
1885 let signal = javascript_sync_rpc_arg_str(&request.args, 1, "process.kill signal")?;
1886 let parsed_signal = parse_signal(signal)?;
1887 if parsed_signal == 0 {
1888 let Some(vm) = self.vms.get(vm_id) else {
1889 log_stale_process_event(
1890 &self.bridge,
1891 vm_id,
1892 process_id,
1893 "javascript sync RPC process.kill",
1894 );
1895 return Ok(());
1896 };
1897 if !vm.active_processes.contains_key(process_id) {
1898 log_stale_process_event(
1899 &self.bridge,
1900 vm_id,
1901 process_id,
1902 "javascript sync RPC process.kill",
1903 );
1904 return Ok(());
1905 }
1906 vm.kernel
1907 .signal_process(EXECUTION_DRIVER_NAME, target_pid, parsed_signal)
1908 .map(|()| Value::Null)
1909 .map_err(kernel_error)
1910 } else if target_pid < 0 {
1911 let caller_kernel_pid = {
1912 let Some(vm) = self.vms.get(vm_id) else {
1913 log_stale_process_event(
1914 &self.bridge,
1915 vm_id,
1916 process_id,
1917 "javascript sync RPC process.kill",
1918 );
1919 return Ok(());
1920 };
1921 let Some(caller) = vm.active_processes.get(process_id) else {
1922 log_stale_process_event(
1923 &self.bridge,
1924 vm_id,
1925 process_id,
1926 "javascript sync RPC process.kill",
1927 );
1928 return Ok(());
1929 };
1930 caller.kernel_pid
1931 };
1932 let pgid = target_pid.unsigned_abs();
1933 match self.signal_vm_process_group(vm_id, caller_kernel_pid, pgid, signal) {
1934 Ok(true) => {
1935 Ok(self.apply_self_process_kill(vm_id, process_id, parsed_signal))
1936 }
1937 Ok(false) => Ok(Value::Null),
1938 Err(error) => Err(error),
1939 }
1940 } else {
1941 enum ProcessKillTarget {
1942 SelfProcess,
1943 Child(String),
1944 TopLevel(String),
1945 KernelPid(u32),
1946 }
1947 let target = {
1948 let Some(vm) = self.vms.get(vm_id) else {
1949 log_stale_process_event(
1950 &self.bridge,
1951 vm_id,
1952 process_id,
1953 "javascript sync RPC process.kill",
1954 );
1955 return Ok(());
1956 };
1957 let Some(caller) = vm.active_processes.get(process_id) else {
1958 log_stale_process_event(
1959 &self.bridge,
1960 vm_id,
1961 process_id,
1962 "javascript sync RPC process.kill",
1963 );
1964 return Ok(());
1965 };
1966 let caller_pid = i32::try_from(caller.kernel_pid).map_err(|_| {
1967 SidecarError::InvalidState("caller pid exceeds i32".into())
1968 })?;
1969 if caller_pid == target_pid {
1970 ProcessKillTarget::SelfProcess
1971 } else if let Some((child_process_id, _)) = caller
1972 .child_processes
1973 .iter()
1974 .find(|(_, child)| i32::try_from(child.kernel_pid) == Ok(target_pid))
1975 {
1976 ProcessKillTarget::Child(child_process_id.clone())
1977 } else if let Some((target_process_id, _)) =
1978 vm.active_processes.iter().find(|(_, process)| {
1979 i32::try_from(process.kernel_pid) == Ok(target_pid)
1980 })
1981 {
1982 ProcessKillTarget::TopLevel(target_process_id.clone())
1983 } else {
1984 let target_kernel_pid = u32::try_from(target_pid).map_err(|_| {
1985 SidecarError::InvalidState(format!(
1986 "EINVAL: invalid process pid {target_pid}"
1987 ))
1988 })?;
1989 ProcessKillTarget::KernelPid(target_kernel_pid)
1990 }
1991 };
1992 match target {
1993 ProcessKillTarget::SelfProcess => {
1994 Ok(self.apply_self_process_kill(vm_id, process_id, parsed_signal))
1995 }
1996 ProcessKillTarget::Child(child_process_id) => {
1997 self.kill_javascript_child_process(
1998 vm_id,
1999 process_id,
2000 &child_process_id,
2001 signal,
2002 )?;
2003 Ok(Value::Null)
2004 }
2005 ProcessKillTarget::TopLevel(target_process_id) => {
2006 self.kill_process_internal(vm_id, &target_process_id, signal)?;
2007 Ok(Value::Null)
2008 }
2009 ProcessKillTarget::KernelPid(target_kernel_pid) => {
2010 self.signal_vm_kernel_pid(vm_id, target_kernel_pid, signal)
2014 .map(|()| Value::Null)
2015 }
2016 }
2017 }
2018 }
2019 "process.signal_state" => {
2020 let signal =
2021 javascript_sync_rpc_arg_u32(&request.args, 0, "process.signal_state signal")?;
2022 let action =
2023 javascript_sync_rpc_arg_str(&request.args, 1, "process.signal_state action")?;
2024 let mask_json =
2025 javascript_sync_rpc_arg_str(&request.args, 2, "process.signal_state mask")?;
2026 let flags =
2027 javascript_sync_rpc_arg_u32(&request.args, 3, "process.signal_state flags")?;
2028 let mask: Vec<u32> = serde_json::from_str(mask_json).map_err(|error| {
2029 SidecarError::InvalidState(format!(
2030 "process.signal_state mask must be valid JSON: {error}"
2031 ))
2032 })?;
2033 let action = match action.trim().to_ascii_lowercase().as_str() {
2034 "default" => SignalDispositionAction::Default,
2035 "ignore" => SignalDispositionAction::Ignore,
2036 "user" => SignalDispositionAction::User,
2037 other => {
2038 return Err(SidecarError::InvalidState(format!(
2039 "unsupported process.signal_state action {other}"
2040 )));
2041 }
2042 };
2043 let Some(vm) = self.vms.get_mut(vm_id) else {
2044 log_stale_process_event(
2045 &self.bridge,
2046 vm_id,
2047 process_id,
2048 "javascript sync RPC process.signal_state",
2049 );
2050 return Ok(());
2051 };
2052 if action == SignalDispositionAction::Default && mask.is_empty() && flags == 0 {
2053 let remove_process_entry = vm
2054 .signal_states
2055 .get_mut(process_id)
2056 .map(|handlers| {
2057 handlers.remove(&signal);
2058 handlers.is_empty()
2059 })
2060 .unwrap_or(false);
2061 if remove_process_entry {
2062 vm.signal_states.remove(process_id);
2063 }
2064 } else {
2065 vm.signal_states
2066 .entry(process_id.to_owned())
2067 .or_default()
2068 .insert(
2069 signal,
2070 SignalHandlerRegistration {
2071 action,
2072 mask,
2073 flags,
2074 },
2075 );
2076 }
2077 Ok(Value::Null)
2078 }
2079 "net.http_request" => {
2080 let payload = request
2081 .args
2082 .first()
2083 .cloned()
2084 .ok_or_else(|| {
2085 SidecarError::InvalidState(String::from(
2086 "net.http_request requires a request payload",
2087 ))
2088 })
2089 .and_then(|value| {
2090 serde_json::from_value::<JavascriptHttpLoopbackRequest>(value).map_err(
2091 |error| {
2092 SidecarError::InvalidState(format!(
2093 "invalid net.http_request payload: {error}"
2094 ))
2095 },
2096 )
2097 })?;
2098 if !is_javascript_loopback_host(&payload.host) {
2099 return Err(SidecarError::Execution(format!(
2100 "EACCES: HTTP loopback request requires a loopback host, got {}",
2101 payload.host
2102 )));
2103 }
2104 self.bridge.require_network_access(
2105 vm_id,
2106 NetworkOperation::Http,
2107 format_tcp_resource(&payload.host, payload.port),
2108 )?;
2109 let Some(vm) = self.vms.get_mut(vm_id) else {
2110 log_stale_process_event(
2111 &self.bridge,
2112 vm_id,
2113 process_id,
2114 "javascript sync RPC net.http_request",
2115 );
2116 return Ok(());
2117 };
2118 let resource_limits = vm.kernel.resource_limits().clone();
2119 let socket_paths = build_javascript_socket_path_context(vm)?;
2120 let target_is_current =
2121 [JavascriptSocketFamily::Ipv4, JavascriptSocketFamily::Ipv6]
2122 .iter()
2123 .any(|family| {
2124 socket_paths
2125 .http_loopback_target(*family, payload.port)
2126 .is_some_and(|target| {
2127 target.process_id == payload.process_id
2128 && target.server_id == payload.server_id
2129 })
2130 });
2131 if !target_is_current {
2132 return Err(SidecarError::InvalidState(format!(
2133 "unknown HTTP loopback target {}:{} for server {} in process {}",
2134 payload.host, payload.port, payload.server_id, payload.process_id
2135 )));
2136 }
2137 let Some(target_process) = vm.active_processes.get_mut(&payload.process_id) else {
2138 return Err(SidecarError::InvalidState(format!(
2139 "unknown HTTP loopback process {}",
2140 payload.process_id
2141 )));
2142 };
2143 dispatch_loopback_http_request(LoopbackHttpDispatchRequest {
2144 bridge: &self.bridge,
2145 vm_id,
2146 dns: &vm.dns,
2147 socket_paths: &socket_paths,
2148 kernel: &mut vm.kernel,
2149 process: target_process,
2150 resource_limits: &resource_limits,
2151 server_id: payload.server_id,
2152 request_json: &payload.request,
2153 })
2154 .map(Value::String)
2155 }
2156 _ => {
2157 let Some(vm) = self.vms.get_mut(vm_id) else {
2158 log_stale_process_event(
2159 &self.bridge,
2160 vm_id,
2161 process_id,
2162 "javascript sync RPC bridge dispatch",
2163 );
2164 return Ok(());
2165 };
2166 let resource_limits = vm.kernel.resource_limits().clone();
2167 let network_counts = vm_network_resource_counts(vm);
2168 let socket_paths = build_javascript_socket_path_context(vm)?;
2169 let Some(process) = vm.active_processes.get_mut(process_id) else {
2170 log_stale_process_event(
2171 &self.bridge,
2172 vm_id,
2173 process_id,
2174 "javascript sync RPC bridge dispatch",
2175 );
2176 return Ok(());
2177 };
2178 service_javascript_sync_rpc(JavascriptSyncRpcServiceRequest {
2179 bridge: &self.bridge,
2180 vm_id,
2181 dns: &vm.dns,
2182 socket_paths: &socket_paths,
2183 kernel: &mut vm.kernel,
2184 process,
2185 sync_request: &request,
2186 resource_limits: &resource_limits,
2187 network_counts,
2188 })
2189 }
2190 };
2191
2192 let Some(vm) = self.vms.get_mut(vm_id) else {
2193 log_stale_process_event(
2194 &self.bridge,
2195 vm_id,
2196 process_id,
2197 "javascript sync RPC response delivery",
2198 );
2199 return Ok(());
2200 };
2201 let shadow_root = vm.cwd.clone();
2202 let Some(process) = vm.active_processes.get_mut(process_id) else {
2203 log_stale_process_event(
2204 &self.bridge,
2205 vm_id,
2206 process_id,
2207 "javascript sync RPC response delivery",
2208 );
2209 return Ok(());
2210 };
2211
2212 if response.is_ok()
2213 && matches!(
2214 request.method.as_str(),
2215 "fs.chmodSync" | "fs.promises.chmod"
2216 )
2217 {
2218 let guest_path =
2219 javascript_sync_rpc_arg_str(&request.args, 0, "filesystem chmod path")?;
2220 let mode =
2221 javascript_sync_rpc_arg_u32(&request.args, 1, "filesystem chmod mode")? & 0o7777;
2222 let host_path =
2223 shadow_host_path_for_process(&shadow_root, &process.guest_cwd, guest_path);
2224 if host_path.exists() {
2225 fs::set_permissions(&host_path, fs::Permissions::from_mode(mode)).map_err(
2226 |error| {
2227 SidecarError::Io(format!(
2228 "failed to mirror chmod to shadow path {}: {error}",
2229 host_path.display()
2230 ))
2231 },
2232 )?;
2233 }
2234 }
2235
2236 match response {
2237 Ok(result) => process
2238 .execution
2239 .respond_javascript_sync_rpc_success(request.id, result)
2240 .or_else(ignore_stale_javascript_sync_rpc_response),
2241 Err(error) => process
2242 .execution
2243 .respond_javascript_sync_rpc_error(
2244 request.id,
2245 javascript_sync_rpc_error_code(&error),
2246 error.to_string(),
2247 )
2248 .or_else(ignore_stale_javascript_sync_rpc_response),
2249 }
2250 }
2251
2252 fn apply_self_process_kill(
2255 &mut self,
2256 vm_id: &str,
2257 process_id: &str,
2258 parsed_signal: i32,
2259 ) -> Value {
2260 let action = self
2261 .vms
2262 .get(vm_id)
2263 .and_then(|vm| vm.signal_states.get(process_id))
2264 .and_then(|handlers| handlers.get(&(parsed_signal as u32)))
2265 .map(|registration| registration.action.clone())
2266 .unwrap_or(SignalDispositionAction::Default);
2267 if action == SignalDispositionAction::Default
2268 && parsed_signal != 0
2269 && !matches!(
2270 canonical_signal_name(parsed_signal),
2271 Some("SIGWINCH" | "SIGCHLD" | "SIGCONT" | "SIGURG")
2272 )
2273 {
2274 if let Some(vm) = self.vms.get_mut(vm_id) {
2275 if let Some(process) = vm.active_processes.get_mut(process_id) {
2276 process.pending_self_signal_exit = Some(parsed_signal);
2277 }
2278 }
2279 }
2280 json!({
2281 "self": true,
2282 "action": match action {
2283 SignalDispositionAction::Default => "default",
2284 SignalDispositionAction::Ignore => "ignore",
2285 SignalDispositionAction::User => "user",
2286 },
2287 })
2288 }
2289
2290 pub(crate) fn vm_ids_for_scope(
2291 &self,
2292 ownership: &OwnershipScope,
2293 ) -> Result<Vec<String>, SidecarError> {
2294 match ownership {
2295 OwnershipScope::SessionOwnership(inner) => {
2296 self.require_owned_session(&inner.connection_id, &inner.session_id)?;
2297 Ok(self
2298 .sessions
2299 .get(&inner.session_id)
2300 .expect("owned session should exist")
2301 .vm_ids
2302 .iter()
2303 .cloned()
2304 .collect())
2305 }
2306 OwnershipScope::VmOwnership(inner) => {
2307 self.require_owned_vm(&inner.connection_id, &inner.session_id, &inner.vm_id)?;
2308 Ok(vec![inner.vm_id.clone()])
2309 }
2310 OwnershipScope::ConnectionOwnership(..) => Err(SidecarError::InvalidState(
2311 String::from("event polling requires session or VM ownership scope"),
2312 )),
2313 }
2314 }
2315
2316 pub(crate) fn vm_ownership(&self, vm_id: &str) -> Result<OwnershipScope, SidecarError> {
2317 let vm = self
2318 .vms
2319 .get(vm_id)
2320 .ok_or_else(|| SidecarError::InvalidState(format!("unknown sidecar VM {vm_id}")))?;
2321 Ok(OwnershipScope::vm(&vm.connection_id, &vm.session_id, vm_id))
2322 }
2323
2324 pub(crate) fn vm_has_active_processes(&self, vm_id: &str) -> bool {
2325 self.vms
2326 .get(vm_id)
2327 .is_some_and(|vm| !vm.active_processes.is_empty())
2328 }
2329
2330 fn require_authenticated_connection(&self, connection_id: &str) -> Result<(), SidecarError> {
2331 if self.connections.contains_key(connection_id) {
2332 Ok(())
2333 } else {
2334 Err(SidecarError::InvalidState(format!(
2335 "connection {connection_id} has not authenticated"
2336 )))
2337 }
2338 }
2339
2340 pub(crate) fn require_owned_session(
2341 &self,
2342 connection_id: &str,
2343 session_id: &str,
2344 ) -> Result<(), SidecarError> {
2345 self.require_authenticated_connection(connection_id)?;
2346 let session = self.sessions.get(session_id).ok_or_else(|| {
2347 SidecarError::InvalidState(format!("unknown sidecar session {session_id}"))
2348 })?;
2349 if session.connection_id == connection_id {
2350 Ok(())
2351 } else {
2352 Err(SidecarError::InvalidState(format!(
2353 "session {session_id} is not owned by connection {connection_id}"
2354 )))
2355 }
2356 }
2357
2358 pub(crate) fn require_owned_vm(
2359 &self,
2360 connection_id: &str,
2361 session_id: &str,
2362 vm_id: &str,
2363 ) -> Result<(), SidecarError> {
2364 self.require_owned_session(connection_id, session_id)?;
2365 let vm = self
2366 .vms
2367 .get(vm_id)
2368 .ok_or_else(|| SidecarError::InvalidState(format!("unknown sidecar VM {vm_id}")))?;
2369 if vm.connection_id != connection_id || vm.session_id != session_id {
2370 return Err(SidecarError::InvalidState(format!(
2371 "VM {vm_id} is not owned by {connection_id}/{session_id}"
2372 )));
2373 }
2374 Ok(())
2375 }
2376
2377 fn connection_id_for(&self, ownership: &OwnershipScope) -> Result<String, SidecarError> {
2378 match ownership {
2379 OwnershipScope::ConnectionOwnership(inner) => Ok(inner.connection_id.clone()),
2380 OwnershipScope::SessionOwnership(..) | OwnershipScope::VmOwnership(..) => {
2381 Err(SidecarError::InvalidState(String::from(
2382 "request requires connection ownership scope",
2383 )))
2384 }
2385 }
2386 }
2387
2388 fn validate_auth_token(&self, auth_token: &str) -> Result<(), SidecarError> {
2389 let Some(expected_auth_token) = self.config.expected_auth_token.as_deref() else {
2390 return Ok(());
2391 };
2392
2393 if auth_token == expected_auth_token {
2394 Ok(())
2395 } else {
2396 Err(SidecarError::Unauthorized(String::from(
2397 "authenticate request provided an invalid auth token",
2398 )))
2399 }
2400 }
2401
2402 fn allocate_connection_id(&mut self) -> String {
2403 self.next_connection_id += 1;
2404 format!("conn-{}", self.next_connection_id)
2405 }
2406
2407 fn take_matching_process_event_envelope(
2408 &mut self,
2409 vm_id: &str,
2410 process_id: &str,
2411 ) -> Result<Option<ProcessEventEnvelope>, SidecarError> {
2412 if let Some(index) = self
2413 .pending_process_events
2414 .iter()
2415 .position(|event| event.vm_id == vm_id && event.process_id == process_id)
2416 {
2417 return Ok(self.pending_process_events.remove(index));
2418 }
2419
2420 let mut matching_envelope = None;
2421 let mut deferred = Vec::new();
2422 {
2423 let pending_capacity = self.pending_process_event_capacity();
2424 let receiver = self.process_event_receiver.as_mut().ok_or_else(|| {
2425 SidecarError::InvalidState(String::from("process event receiver unavailable"))
2426 })?;
2427 loop {
2428 if deferred.len() >= pending_capacity {
2429 if receiver.is_empty() {
2430 break;
2431 }
2432 return Err(process_event_queue_overflow_error());
2433 }
2434 let envelope = match receiver.try_recv() {
2435 Ok(envelope) => envelope,
2436 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
2437 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break,
2438 };
2439 if matching_envelope.is_none()
2440 && envelope.vm_id == vm_id
2441 && envelope.process_id == process_id
2442 {
2443 matching_envelope = Some(envelope);
2444 break;
2445 }
2446 deferred.push(envelope);
2447 }
2448 }
2449 for envelope in deferred {
2450 self.queue_pending_process_event(envelope)?;
2451 }
2452
2453 Ok(matching_envelope)
2454 }
2455
2456 fn allocate_sidecar_request_id(&mut self) -> RequestId {
2457 let request_id = self.next_sidecar_request_id;
2458 self.next_sidecar_request_id -= 1;
2459 request_id
2460 }
2461
2462 pub(crate) fn session_scope_for(
2463 &self,
2464 ownership: &OwnershipScope,
2465 ) -> Result<(String, String), SidecarError> {
2466 match ownership {
2467 OwnershipScope::SessionOwnership(inner) => {
2468 Ok((inner.connection_id.clone(), inner.session_id.clone()))
2469 }
2470 OwnershipScope::ConnectionOwnership(..) | OwnershipScope::VmOwnership(..) => {
2471 Err(SidecarError::InvalidState(String::from(
2472 "request requires session ownership scope",
2473 )))
2474 }
2475 }
2476 }
2477
2478 pub(crate) fn vm_scope_for(
2479 &self,
2480 ownership: &OwnershipScope,
2481 ) -> Result<(String, String, String), SidecarError> {
2482 match ownership {
2483 OwnershipScope::VmOwnership(inner) => Ok((
2484 inner.connection_id.clone(),
2485 inner.session_id.clone(),
2486 inner.vm_id.clone(),
2487 )),
2488 OwnershipScope::ConnectionOwnership(..) | OwnershipScope::SessionOwnership(..) => Err(
2489 SidecarError::InvalidState(String::from("request requires VM ownership scope")),
2490 ),
2491 }
2492 }
2493
2494 fn response_with_ownership(
2495 &self,
2496 request_id: RequestId,
2497 ownership: OwnershipScope,
2498 payload: ResponsePayload,
2499 ) -> ResponseFrame {
2500 ResponseFrame {
2501 schema: ProtocolSchema::current(),
2502 request_id,
2503 ownership,
2504 payload,
2505 }
2506 }
2507
2508 pub(crate) fn respond(
2509 &self,
2510 request: &RequestFrame,
2511 payload: ResponsePayload,
2512 ) -> ResponseFrame {
2513 self.response_with_ownership(request.request_id, request.ownership.clone(), payload)
2514 }
2515
2516 fn reject(&self, request: &RequestFrame, code: &str, message: &str) -> ResponseFrame {
2517 self.respond(
2518 request,
2519 ResponsePayload::Rejected(RejectedResponse {
2520 code: code.to_owned(),
2521 message: message.to_owned(),
2522 }),
2523 )
2524 }
2525
2526 pub fn queue_sidecar_request(
2527 &mut self,
2528 ownership: OwnershipScope,
2529 payload: SidecarRequestPayload,
2530 ) -> Result<RequestId, SidecarError> {
2531 if self.outbound_sidecar_requests.len() >= MAX_OUTBOUND_SIDECAR_REQUESTS {
2532 return Err(outbound_sidecar_request_queue_overflow_error());
2533 }
2534 if self.pending_sidecar_responses.pending_count() >= MAX_PENDING_SIDECAR_RESPONSES {
2535 return Err(sidecar_response_pending_overflow_error());
2536 }
2537 let request_id = self.allocate_sidecar_request_id();
2538 let request = SidecarRequestFrame::new(request_id, ownership, payload);
2539 self.pending_sidecar_responses
2540 .register_request(&request)
2541 .map_err(sidecar_response_tracker_error)?;
2542 self.outbound_sidecar_requests.push_back(request);
2543 Ok(request_id)
2544 }
2545
2546 pub fn queue_wire_sidecar_request(
2547 &mut self,
2548 ownership: crate::wire::OwnershipScope,
2549 payload: crate::wire::SidecarRequestPayload,
2550 ) -> Result<crate::wire::RequestId, SidecarError> {
2551 let ownership = crate::wire::ownership_scope_to_compat(ownership);
2552 let payload = crate::wire::sidecar_request_payload_to_compat(&ownership, payload)
2553 .map_err(wire_protocol_error)?;
2554 self.queue_sidecar_request(ownership, payload)
2555 }
2556
2557 pub fn pop_sidecar_request(&mut self) -> Option<SidecarRequestFrame> {
2558 self.outbound_sidecar_requests.pop_front()
2559 }
2560
2561 pub fn pop_wire_sidecar_request(
2562 &mut self,
2563 ) -> Result<Option<crate::wire::SidecarRequestFrame>, SidecarError> {
2564 self.pop_sidecar_request()
2565 .map(crate::wire::sidecar_request_frame_from_compat)
2566 .transpose()
2567 .map_err(wire_protocol_error)
2568 }
2569
2570 pub fn accept_sidecar_response(
2571 &mut self,
2572 response: SidecarResponseFrame,
2573 ) -> Result<(), SidecarError> {
2574 self.pending_sidecar_responses
2575 .accept_response(&response)
2576 .map_err(sidecar_response_tracker_error)?;
2577 self.completed_sidecar_response_order
2578 .push_back(response.request_id);
2579 self.completed_sidecar_responses
2580 .insert(response.request_id, response);
2581 while self.completed_sidecar_responses.len() > MAX_COMPLETED_SIDECAR_RESPONSES {
2582 if let Some(evicted) = self.completed_sidecar_response_order.pop_front() {
2583 self.completed_sidecar_responses.remove(&evicted);
2584 }
2585 }
2586 Ok(())
2587 }
2588
2589 pub fn accept_wire_sidecar_response(
2590 &mut self,
2591 response: crate::wire::SidecarResponseFrame,
2592 ) -> Result<(), SidecarError> {
2593 let response =
2594 crate::wire::sidecar_response_frame_to_compat(response).map_err(wire_protocol_error)?;
2595 self.accept_sidecar_response(response)
2596 }
2597
2598 pub fn take_sidecar_response(&mut self, request_id: RequestId) -> Option<SidecarResponseFrame> {
2599 let response = self.completed_sidecar_responses.remove(&request_id);
2600 if response.is_some() {
2601 self.completed_sidecar_response_order
2602 .retain(|completed_id| completed_id != &request_id);
2603 }
2604 response
2605 }
2606
2607 pub fn take_wire_sidecar_response(
2608 &mut self,
2609 request_id: crate::wire::RequestId,
2610 ) -> Result<Option<crate::wire::SidecarResponseFrame>, SidecarError> {
2611 self.take_sidecar_response(request_id)
2612 .map(|response| {
2613 crate::wire::sidecar_response_frame_from_compat(response)
2614 .map_err(wire_protocol_error)
2615 })
2616 .transpose()
2617 }
2618
2619 pub(crate) fn vm_lifecycle_event(
2620 &self,
2621 connection_id: &str,
2622 session_id: &str,
2623 vm_id: &str,
2624 state: VmLifecycleState,
2625 ) -> EventFrame {
2626 EventFrame::new(
2627 OwnershipScope::vm(connection_id, session_id, vm_id),
2628 EventPayload::VmLifecycle(VmLifecycleEvent { state }),
2629 )
2630 }
2631
2632 fn ensure_request_within_frame_limit(
2633 &self,
2634 request: &RequestFrame,
2635 ) -> Result<(), SidecarError> {
2636 let frame = crate::protocol::to_generated_protocol_frame(
2637 &crate::protocol::ProtocolFrame::Request(request.clone()),
2638 )
2639 .map_err(|error| {
2640 SidecarError::InvalidState(format!("failed to convert request frame: {error}"))
2641 })?;
2642 let crate::wire::ProtocolFrame::RequestFrame(_) = &frame else {
2643 return Err(SidecarError::InvalidState(String::from(
2644 "request converted to non-request wire frame",
2645 )));
2646 };
2647
2648 crate::wire::WireFrameCodec::new(self.config.max_frame_bytes)
2649 .encode(&frame)
2650 .map(|_| ())
2651 .map_err(|error| SidecarError::FrameTooLarge(error.to_string()))
2652 }
2653}
2654
2655impl<B> ExtensionHost for NativeSidecar<B>
2656where
2657 B: NativeSidecarBridge + Send + 'static,
2658 BridgeError<B>: fmt::Debug + Send + Sync + 'static,
2659{
2660 fn spawn_process<'a>(
2661 &'a mut self,
2662 ownership: OwnershipScope,
2663 payload: ExecuteRequest,
2664 ) -> ExtensionFuture<'a, ProcessStartedResponse> {
2665 Box::pin(async move {
2666 let request = RequestFrame::new(0, ownership, RequestPayload::Execute(payload.clone()));
2667 let dispatch = NativeSidecar::execute(self, &request, payload).await?;
2668 match dispatch.response.payload {
2669 ResponsePayload::ProcessStarted(response) => Ok(response),
2670 other => Err(unexpected_extension_host_response("execute", other)),
2671 }
2672 })
2673 }
2674
2675 fn write_stdin<'a>(
2676 &'a mut self,
2677 ownership: OwnershipScope,
2678 payload: WriteStdinRequest,
2679 ) -> ExtensionFuture<'a, StdinWrittenResponse> {
2680 Box::pin(async move {
2681 let request =
2682 RequestFrame::new(0, ownership, RequestPayload::WriteStdin(payload.clone()));
2683 let dispatch = NativeSidecar::write_stdin(self, &request, payload).await?;
2684 match dispatch.response.payload {
2685 ResponsePayload::StdinWritten(response) => Ok(response),
2686 other => Err(unexpected_extension_host_response("write_stdin", other)),
2687 }
2688 })
2689 }
2690
2691 fn close_stdin<'a>(
2692 &'a mut self,
2693 ownership: OwnershipScope,
2694 payload: CloseStdinRequest,
2695 ) -> ExtensionFuture<'a, StdinClosedResponse> {
2696 Box::pin(async move {
2697 let request =
2698 RequestFrame::new(0, ownership, RequestPayload::CloseStdin(payload.clone()));
2699 let dispatch = NativeSidecar::close_stdin(self, &request, payload).await?;
2700 match dispatch.response.payload {
2701 ResponsePayload::StdinClosed(response) => Ok(response),
2702 other => Err(unexpected_extension_host_response("close_stdin", other)),
2703 }
2704 })
2705 }
2706
2707 fn kill_process<'a>(
2708 &'a mut self,
2709 ownership: OwnershipScope,
2710 payload: KillProcessRequest,
2711 ) -> ExtensionFuture<'a, ProcessKilledResponse> {
2712 Box::pin(async move {
2713 let request =
2714 RequestFrame::new(0, ownership, RequestPayload::KillProcess(payload.clone()));
2715 let dispatch = NativeSidecar::kill_process(self, &request, payload).await?;
2716 match dispatch.response.payload {
2717 ResponsePayload::ProcessKilled(response) => Ok(response),
2718 other => Err(unexpected_extension_host_response("kill_process", other)),
2719 }
2720 })
2721 }
2722
2723 fn poll_event<'a>(
2724 &'a mut self,
2725 ownership: OwnershipScope,
2726 timeout: Duration,
2727 ) -> ExtensionFuture<'a, Option<EventFrame>> {
2728 Box::pin(async move { NativeSidecar::poll_event(self, &ownership, timeout).await })
2729 }
2730
2731 fn guest_filesystem_call<'a>(
2732 &'a mut self,
2733 ownership: OwnershipScope,
2734 payload: GuestFilesystemCallRequest,
2735 ) -> ExtensionFuture<'a, GuestFilesystemResultResponse> {
2736 Box::pin(async move {
2737 let request = RequestFrame::new(
2738 0,
2739 ownership,
2740 RequestPayload::GuestFilesystemCall(payload.clone()),
2741 );
2742 let dispatch = NativeSidecar::guest_filesystem_call(self, &request, payload).await?;
2743 match dispatch.response.payload {
2744 ResponsePayload::GuestFilesystemResult(response) => Ok(response),
2745 other => Err(unexpected_extension_host_response(
2746 "guest_filesystem_call",
2747 other,
2748 )),
2749 }
2750 })
2751 }
2752
2753 fn bind_process_to_session<'a>(
2754 &'a mut self,
2755 ownership: OwnershipScope,
2756 namespace: String,
2757 ext_session_id: String,
2758 process_id: String,
2759 ) -> ExtensionFuture<'a, ()> {
2760 Box::pin(async move {
2761 self.bind_extension_process_resource(ownership, namespace, ext_session_id, process_id)
2762 })
2763 }
2764
2765 fn bind_vm_to_session<'a>(
2766 &'a mut self,
2767 ownership: OwnershipScope,
2768 namespace: String,
2769 ext_session_id: String,
2770 ) -> ExtensionFuture<'a, ()> {
2771 Box::pin(
2772 async move { self.bind_extension_vm_resource(ownership, namespace, ext_session_id) },
2773 )
2774 }
2775
2776 fn dispose_session_resources<'a>(
2777 &'a mut self,
2778 ownership: OwnershipScope,
2779 namespace: String,
2780 ext_session_id: String,
2781 ) -> ExtensionFuture<'a, Vec<EventFrame>> {
2782 Box::pin(async move {
2783 let key = (namespace, ext_session_id);
2784 let Some(resources) = self.extension_sessions.get(&key) else {
2785 return Ok(Vec::new());
2786 };
2787 if resources.ownership != ownership {
2788 return Err(SidecarError::InvalidState(String::from(
2789 "extension session ownership did not match dispose request",
2790 )));
2791 }
2792 let resources = self
2793 .extension_sessions
2794 .remove(&key)
2795 .expect("extension resources existed before removal");
2796 let (connection_id, session_id, vm_id) = self.vm_scope_for(&ownership)?;
2797 for process_id in resources.process_ids {
2798 if self
2799 .vms
2800 .get(&vm_id)
2801 .is_some_and(|vm| vm.active_processes.contains_key(&process_id))
2802 {
2803 self.kill_process_internal(&vm_id, &process_id, "SIGTERM")?;
2804 }
2805 }
2806 let mut events = Vec::new();
2807 for resource_vm_id in resources.vm_ids {
2808 if self.vms.contains_key(&resource_vm_id) {
2809 events.extend(
2810 self.dispose_vm_internal(
2811 &connection_id,
2812 &session_id,
2813 &resource_vm_id,
2814 DisposeReason::Requested,
2815 )
2816 .await?,
2817 );
2818 }
2819 }
2820 Ok(events)
2821 })
2822 }
2823
2824 fn start_buffering_process_output<'a>(
2825 &'a mut self,
2826 ownership: OwnershipScope,
2827 process_id: String,
2828 ) -> ExtensionFuture<'a, ()> {
2829 Box::pin(async move {
2830 let (connection_id, session_id, vm_id) = self.vm_scope_for(&ownership)?;
2831 self.require_owned_vm(&connection_id, &session_id, &vm_id)?;
2832 let key = (vm_id, process_id);
2833 if self.extension_process_output_buffers.contains_key(&key) {
2834 return Err(SidecarError::Conflict(String::from(
2835 "extension process output buffering already started",
2836 )));
2837 }
2838 self.extension_process_output_buffers
2839 .insert(key, ExtensionBufferedProcessOutput::default());
2840 Ok(())
2841 })
2842 }
2843
2844 fn handoff_buffered_process_output<'a>(
2845 &'a mut self,
2846 ownership: OwnershipScope,
2847 namespace: String,
2848 ext_session_id: String,
2849 process_id: String,
2850 timeout: Duration,
2851 ) -> ExtensionFuture<'a, ExtensionBufferedProcessOutput> {
2852 Box::pin(async move {
2853 let (connection_id, session_id, vm_id) = self.vm_scope_for(&ownership)?;
2854 self.require_owned_vm(&connection_id, &session_id, &vm_id)?;
2855 let key = (vm_id.clone(), process_id.clone());
2856 let deadline = Instant::now() + timeout;
2857 loop {
2858 self.pump_process_events(&ownership).await?;
2859 while let Some(envelope) =
2860 self.take_matching_process_event_envelope(&vm_id, &process_id)?
2861 {
2862 if self.capture_extension_process_output_event(
2863 &vm_id,
2864 &process_id,
2865 &envelope.event,
2866 ) {
2867 continue;
2868 }
2869 self.queue_pending_process_event(envelope)?;
2870 break;
2871 }
2872 let buffered = self
2873 .extension_process_output_buffers
2874 .get(&key)
2875 .is_some_and(|buffer| !buffer.stdout.is_empty() || !buffer.stderr.is_empty());
2876 if buffered || timeout.is_zero() || Instant::now() >= deadline {
2877 break;
2878 }
2879 let remaining = deadline.saturating_duration_since(Instant::now());
2880 time::sleep(remaining.min(Duration::from_millis(10))).await;
2881 }
2882 self.bind_extension_process_resource(
2883 ownership,
2884 namespace,
2885 ext_session_id,
2886 process_id.clone(),
2887 )?;
2888 self.extension_process_output_buffers
2889 .remove(&key)
2890 .ok_or_else(|| {
2891 SidecarError::InvalidState(String::from(
2892 "extension process output buffering was not started",
2893 ))
2894 })
2895 })
2896 }
2897}
2898
2899fn unexpected_extension_host_response(operation: &str, payload: ResponsePayload) -> SidecarError {
2900 match payload {
2901 ResponsePayload::Rejected(response) => SidecarError::InvalidState(format!(
2902 "extension {operation} rejected with {}: {}",
2903 response.code, response.message
2904 )),
2905 other => SidecarError::InvalidState(format!(
2906 "extension {operation} returned unexpected response: {other:?}"
2907 )),
2908 }
2909}
2910
2911fn shadow_host_path_for_process(
2912 shadow_root: &Path,
2913 process_guest_cwd: &str,
2914 guest_path: &str,
2915) -> PathBuf {
2916 let normalized_guest_path = if guest_path.starts_with('/') {
2917 normalize_path(guest_path)
2918 } else {
2919 normalize_path(&format!(
2920 "{}/{}",
2921 process_guest_cwd.trim_end_matches('/'),
2922 guest_path
2923 ))
2924 };
2925 if normalized_guest_path == "/" {
2926 shadow_root.to_path_buf()
2927 } else {
2928 shadow_root.join(normalized_guest_path.trim_start_matches('/'))
2929 }
2930}
2931
2932fn sidecar_response_tracker_error(error: SidecarResponseTrackerError) -> SidecarError {
2933 SidecarError::InvalidState(format!(
2934 "invalid sidecar response correlation state: {error}"
2935 ))
2936}
2937
2938fn map_bridge_permission(decision: secure_exec_bridge::PermissionDecision) -> PermissionDecision {
2939 match decision.verdict {
2940 secure_exec_bridge::PermissionVerdict::Allow => PermissionDecision::allow(),
2941 secure_exec_bridge::PermissionVerdict::Deny => PermissionDecision::deny(
2942 decision
2943 .reason
2944 .unwrap_or_else(|| String::from("denied by host")),
2945 ),
2946 secure_exec_bridge::PermissionVerdict::Prompt => PermissionDecision::deny(
2947 decision
2948 .reason
2949 .unwrap_or_else(|| String::from("permission prompt required")),
2950 ),
2951 }
2952}
2953
2954fn audit_timestamp() -> String {
2955 SystemTime::now()
2956 .duration_since(UNIX_EPOCH)
2957 .expect("system time before unix epoch")
2958 .as_millis()
2959 .to_string()
2960}
2961
2962pub(crate) fn audit_fields<I, K, V>(fields: I) -> BTreeMap<String, String>
2963where
2964 I: IntoIterator<Item = (K, V)>,
2965 K: Into<String>,
2966 V: Into<String>,
2967{
2968 let mut mapped = BTreeMap::from([(String::from("timestamp"), audit_timestamp())]);
2969 for (key, value) in fields {
2970 mapped.insert(key.into(), value.into());
2971 }
2972 mapped
2973}
2974
2975pub(crate) fn emit_structured_event<B>(
2976 bridge: &SharedBridge<B>,
2977 vm_id: &str,
2978 name: &str,
2979 fields: BTreeMap<String, String>,
2980) -> Result<(), SidecarError>
2981where
2982 B: NativeSidecarBridge + Send + 'static,
2983 BridgeError<B>: fmt::Debug + Send + Sync + 'static,
2984{
2985 bridge.with_mut(|bridge| {
2986 bridge.emit_structured_event(StructuredEventRecord {
2987 vm_id: vm_id.to_owned(),
2988 name: name.to_owned(),
2989 fields,
2990 })
2991 })
2992}
2993
2994pub(crate) fn emit_security_audit_event<B>(
2995 bridge: &SharedBridge<B>,
2996 vm_id: &str,
2997 name: &str,
2998 fields: BTreeMap<String, String>,
2999) where
3000 B: NativeSidecarBridge + Send + 'static,
3001 BridgeError<B>: fmt::Debug + Send + Sync + 'static,
3002{
3003 let _ = emit_structured_event(bridge, vm_id, name, fields);
3004}
3005
3006pub(crate) fn log_stale_process_event<B>(
3007 bridge: &SharedBridge<B>,
3008 vm_id: &str,
3009 process_id: &str,
3010 context: &str,
3011) where
3012 B: NativeSidecarBridge + Send + 'static,
3013 BridgeError<B>: fmt::Debug + Send + Sync + 'static,
3014{
3015 let _ = bridge.emit_log(
3016 vm_id,
3017 format!(
3018 "Ignoring stale process event during {context}: VM {vm_id} process {process_id} was already reaped"
3019 ),
3020 );
3021}
3022
3023pub(crate) fn root_filesystem_error(error: impl std::fmt::Display) -> SidecarError {
3026 SidecarError::InvalidState(format!("root filesystem: {error}"))
3027}
3028
3029pub(crate) fn normalize_path(path: &str) -> String {
3030 let mut segments = Vec::new();
3031 for component in Path::new(path).components() {
3032 match component {
3033 Component::RootDir => segments.clear(),
3034 Component::ParentDir => {
3035 segments.pop();
3036 }
3037 Component::CurDir => {}
3038 Component::Normal(value) => segments.push(value.to_string_lossy().into_owned()),
3039 Component::Prefix(prefix) => {
3040 segments.push(prefix.as_os_str().to_string_lossy().into_owned());
3041 }
3042 }
3043 }
3044
3045 let normalized = format!("/{}", segments.join("/"));
3046 if normalized.is_empty() {
3047 String::from("/")
3048 } else {
3049 normalized
3050 }
3051}
3052
3053pub(crate) fn normalize_host_path(path: &Path) -> PathBuf {
3054 let mut normalized = PathBuf::new();
3055
3056 for component in path.components() {
3057 match component {
3058 Component::Prefix(prefix) => normalized.push(prefix.as_os_str()),
3059 Component::RootDir => normalized.push(Path::new("/")),
3060 Component::CurDir => {}
3061 Component::ParentDir => {
3062 if normalized != Path::new("/") {
3063 normalized.pop();
3064 }
3065 }
3066 Component::Normal(part) => normalized.push(part),
3067 }
3068 }
3069
3070 if normalized.as_os_str().is_empty() {
3071 if path.is_absolute() {
3072 PathBuf::from("/")
3073 } else {
3074 PathBuf::from(".")
3075 }
3076 } else {
3077 normalized
3078 }
3079}
3080
3081pub(crate) fn path_is_within_root(path: &Path, root: &Path) -> bool {
3082 path == root || path.starts_with(root)
3083}
3084
3085pub(crate) fn dirname(path: &str) -> String {
3086 let normalized = normalize_path(path);
3087 let parent = Path::new(&normalized)
3088 .parent()
3089 .unwrap_or_else(|| Path::new("/"));
3090 let value = parent.to_string_lossy();
3091 if value.is_empty() {
3092 String::from("/")
3093 } else {
3094 value.into_owned()
3095 }
3096}
3097
3098pub(crate) fn kernel_error(error: KernelError) -> SidecarError {
3099 SidecarError::Kernel(error.to_string())
3100}
3101
3102pub(crate) fn plugin_error(error: PluginError) -> SidecarError {
3103 SidecarError::Plugin(error.to_string())
3104}
3105
3106pub(crate) fn javascript_error(error: JavascriptExecutionError) -> SidecarError {
3107 SidecarError::Execution(error.to_string())
3108}
3109
3110pub(crate) fn wasm_error(error: WasmExecutionError) -> SidecarError {
3111 SidecarError::Execution(error.to_string())
3112}
3113
3114pub(crate) fn python_error(error: PythonExecutionError) -> SidecarError {
3115 SidecarError::Execution(error.to_string())
3116}
3117
3118pub(crate) fn vfs_error(error: VfsError) -> SidecarError {
3119 SidecarError::Kernel(error.to_string())
3120}
3121
3122#[allow(dead_code)]
3132const HOISTED_NODE_MODULES_GUIDANCE: &str = "secure-exec can't load mounted node_modules: the directory uses a non-flat layout (pnpm / bun / yarn workspaces store, or yarn Plug'n'Play) whose package store isn't visible inside the VM. A flat (hoisted) node_modules is required.\n - pnpm -> add `node-linker=hoisted` to .npmrc, then reinstall\n - yarn berry -> set `nodeLinker: node-modules` in .yarnrc.yml (not pnp/pnpm)\n - bun -> install dependencies outside a workspace (workspaces use a .bun store)\n - npm / yarn classic -> already flat, no change needed";
3133
3134#[allow(dead_code)]
3144fn symlinked_node_modules_hint(stderr: &str) -> Option<&'static str> {
3145 const STORE_MARKERS: &[&str] = &[
3150 "node_modules/.pnpm/",
3151 "node_modules/.bun/",
3152 "node_modules/.store/",
3153 "/__virtual__/",
3154 ];
3155 const PNP_STRICT_MARKERS: &[&str] = &["isn't declared in your dependencies"];
3159 const PNP_PATH_MARKERS: &[&str] = &[".pnp.cjs", ".pnp.loader.mjs", "/.yarn/cache/"];
3160
3161 if PNP_STRICT_MARKERS.iter().any(|m| stderr.contains(m)) {
3162 return Some(HOISTED_NODE_MODULES_GUIDANCE);
3163 }
3164
3165 let missing = stderr.contains("ENOENT")
3166 || stderr.contains("no such file or directory")
3167 || stderr.contains("Cannot find module")
3168 || stderr.contains("MODULE_NOT_FOUND");
3169 if !missing {
3170 return None;
3171 }
3172 if STORE_MARKERS.iter().any(|m| stderr.contains(m))
3173 || PNP_PATH_MARKERS.iter().any(|m| stderr.contains(m))
3174 {
3175 return Some(HOISTED_NODE_MODULES_GUIDANCE);
3176 }
3177 None
3178}
3179
3180#[cfg(test)]
3181mod symlinked_node_modules_hint_tests {
3182 use super::symlinked_node_modules_hint;
3183
3184 #[test]
3186 fn matches_pnpm_store_enoent() {
3187 let stderr = "Error: ENOENT: no such file or directory, open '/root/node_modules/.pnpm/@mariozechner+pi-coding-agent@0.60.0_x/node_modules/@mariozechner/pi-coding-agent/dist/package.json'";
3190 let hint = symlinked_node_modules_hint(stderr).expect("expected hoisted guidance");
3191 assert!(hint.contains("secure-exec can't load mounted node_modules"));
3192 assert!(!hint.contains("agentos"));
3193 }
3194
3195 #[test]
3196 fn matches_bun_store_enoent() {
3197 let stderr = "Error: ENOENT: no such file or directory, open '/root/node_modules/.bun/is-odd@3.0.1/node_modules/is-odd/package.json'";
3198 assert!(symlinked_node_modules_hint(stderr).is_some());
3199 }
3200
3201 #[test]
3202 fn matches_yarn_pnpm_store_enoent() {
3203 let stderr = "Error: ENOENT: no such file or directory, open '/root/node_modules/.store/is-odd-npm-3.0.1-93c3c3f41b/package/package.json'";
3204 assert!(symlinked_node_modules_hint(stderr).is_some());
3205 }
3206
3207 #[test]
3208 fn matches_pnp_declared_error() {
3209 let stderr = "Error: Your application tried to access is-number, but it isn't declared in your dependencies; this makes the require call ambiguous and unsound.";
3211 assert!(symlinked_node_modules_hint(stderr).is_some());
3212 }
3213
3214 #[test]
3215 fn matches_pnp_cjs_module_not_found() {
3216 let stderr = "Error: Cannot find module 'is-odd'\n at /root/.pnp.cjs:12345:18\n code: 'MODULE_NOT_FOUND'";
3217 assert!(symlinked_node_modules_hint(stderr).is_some());
3218 }
3219
3220 #[test]
3221 fn matches_virtual_instance() {
3222 let stderr = "Error: ENOENT: no such file or directory, open '/root/.yarn/__virtual__/is-odd-abc/1/node_modules/is-odd/package.json'";
3223 assert!(symlinked_node_modules_hint(stderr).is_some());
3224 }
3225
3226 #[test]
3228 fn ignores_enoent_outside_a_store() {
3229 let stderr = "Error: ENOENT: no such file or directory, open '/tmp/scratch/config.json'";
3230 assert!(symlinked_node_modules_hint(stderr).is_none());
3231 }
3232
3233 #[test]
3234 fn ignores_store_path_without_missing_file() {
3235 let stderr =
3236 "loaded /root/node_modules/.pnpm/some-pkg@1.0.0/node_modules/some-pkg/index.js";
3237 assert!(symlinked_node_modules_hint(stderr).is_none());
3238 }
3239
3240 #[test]
3241 fn ignores_flat_node_modules_enoent() {
3242 let stderr = "Error: ENOENT: no such file or directory, open '/root/node_modules/is-odd/missing-asset.json'";
3244 assert!(symlinked_node_modules_hint(stderr).is_none());
3245 }
3246
3247 #[test]
3248 fn ignores_unrelated_failure() {
3249 let stderr = "Error: connect ECONNREFUSED 127.0.0.1:443";
3250 assert!(symlinked_node_modules_hint(stderr).is_none());
3251 }
3252}