1use crate::bridge::{build_mount_plugin_registry, MountPluginContext};
2pub(crate) use crate::execution::{
3 build_javascript_socket_path_context, canonical_signal_name, error_code,
4 ignore_stale_javascript_sync_rpc_response, javascript_sync_rpc_arg_i32,
5 javascript_sync_rpc_arg_str, javascript_sync_rpc_arg_u32, javascript_sync_rpc_arg_u32_optional,
6 javascript_sync_rpc_arg_u64, javascript_sync_rpc_arg_u64_optional,
7 javascript_sync_rpc_bytes_arg, javascript_sync_rpc_bytes_value, javascript_sync_rpc_encoding,
8 javascript_sync_rpc_error_code, javascript_sync_rpc_option_bool,
9 javascript_sync_rpc_option_u32, parse_signal,
10 sanitize_javascript_child_process_internal_bootstrap_env, service_javascript_sync_rpc,
11 vm_network_resource_counts, JavascriptSyncRpcServiceRequest,
12};
13use crate::extension::{
14 Extension, ExtensionBufferedProcessOutput, ExtensionContext, ExtensionFuture, ExtensionHost,
15 ExtensionSnapshot,
16};
17use crate::filesystem::guest_filesystem_call as filesystem_guest_filesystem_call;
18use crate::limits::DEFAULT_ACP_STDOUT_BUFFER_BYTE_LIMIT;
19use crate::protocol::{
20 AuthenticatedResponse, CloseStdinRequest, DisposeReason, EventFrame, EventPayload,
21 ExecuteRequest, ExtEnvelope, FsPermissionScope, GuestFilesystemCallRequest,
22 GuestFilesystemResultResponse, JavascriptChildProcessSpawnOptions,
23 JavascriptChildProcessSpawnRequest, KillProcessRequest, OpenSessionRequest, OwnershipScope,
24 PatternPermissionRule, PatternPermissionScope, PermissionMode, PermissionsPolicy,
25 ProcessKilledResponse, ProcessStartedResponse, ProtocolSchema, RejectedResponse, RequestFrame,
26 RequestId, RequestPayload, ResponseFrame, ResponsePayload, SessionOpenedResponse,
27 SidecarRequestFrame, SidecarRequestPayload, SidecarResponseFrame, SidecarResponsePayload,
28 SidecarResponseTracker, SidecarResponseTrackerError, SignalDispositionAction,
29 SignalHandlerRegistration, StdinClosedResponse, StdinWrittenResponse, VmLifecycleEvent,
30 VmLifecycleState, WriteStdinRequest,
31};
32use crate::state::{
33 ActiveExecutionEvent, BridgeError, ConnectionState, JavascriptSocketFamily,
34 JavascriptSocketPathContext, ProcessEventEnvelope, SessionState, SharedBridge,
35 SharedSidecarRequestClient, SidecarRequestTransport, VmState, EXECUTION_DRIVER_NAME,
36};
37use crate::tools::register_host_callbacks;
38use crate::NativeSidecarBridge;
39use secure_exec_bridge::{
40 CommandPermissionRequest, EnvironmentAccess, EnvironmentPermissionRequest, FilesystemAccess,
41 FilesystemPermissionRequest, LifecycleEventRecord, LifecycleState, LogLevel, LogRecord,
42 NetworkAccess, NetworkPermissionRequest, StructuredEventRecord,
43};
44use secure_exec_execution::{
45 JavascriptExecutionEngine, JavascriptExecutionError, JavascriptSyncRpcRequest,
46 PythonExecutionEngine, PythonExecutionError, WasmExecutionEngine, WasmExecutionError,
47};
48use secure_exec_kernel::kernel::KernelError;
49use secure_exec_kernel::mount_plugin::{FileSystemPluginRegistry, PluginError};
50use secure_exec_kernel::permissions::{
51 permission_glob_matches, CommandAccessRequest, EnvAccessRequest, EnvironmentOperation,
52 NetworkAccessRequest, NetworkOperation, PermissionDecision,
53};
54use secure_exec_kernel::vfs::VfsError;
56use serde::Deserialize;
57use serde_json::{json, Value};
58use std::collections::{BTreeMap, BTreeSet, VecDeque};
59use std::fmt;
60use std::fs;
61use std::os::unix::fs::PermissionsExt;
62use std::path::{Component, Path, PathBuf};
63use std::sync::{Arc, Mutex};
64use std::task::{Context, Poll, Waker};
65use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
66use tokio::sync::mpsc::{channel, Receiver, Sender};
67use tokio::time;
68
/// Environment keys that `is_internal_runtime_command_request` looks for on a `node` command.
const INTERNAL_JAVASCRIPT_ENTRYPOINT_ENV_KEYS: &[&str] =
    &["AGENT_OS_ENTRYPOINT", "AGENT_OS_BOOTSTRAP_MODULE"];
/// Environment keys that `is_internal_runtime_command_request` looks for on a `wasm` command.
const INTERNAL_WASM_ENTRYPOINT_ENV_KEYS: &[&str] =
    &["AGENT_OS_WASM_MODULE_PATH", "AGENT_OS_WASM_MODULE_BASE64"];
/// Environment key prefixes that `is_internal_runtime_command_request` looks for on a `python`
/// command.
const INTERNAL_PYTHON_ENTRYPOINT_ENV_PREFIXES: &[&str] = &["AGENT_OS_PYTHON_"];
/// Capacity of the process event channel created in `NativeSidecar::with_config`; also quoted in
/// the process event queue overflow error.
pub(crate) const MAX_PROCESS_EVENT_QUEUE: usize = 10_000;
/// Limit quoted in the sidecar response tracker overflow error.
pub(crate) const MAX_PENDING_SIDECAR_RESPONSES: usize = 10_000;
/// Limit quoted in the outbound sidecar request queue overflow error.
pub(crate) const MAX_OUTBOUND_SIDECAR_REQUESTS: usize = 10_000;
// NOTE(review): presumably bounds `completed_sidecar_responses`; the enforcing code is not in
// this part of the file — confirm.
pub(crate) const MAX_COMPLETED_SIDECAR_RESPONSES: usize = 10_000;
80
81pub(crate) fn process_event_queue_overflow_error() -> SidecarError {
82 SidecarError::InvalidState(format!(
83 "process event queue exceeded {MAX_PROCESS_EVENT_QUEUE} pending events"
84 ))
85}
86
87fn sidecar_response_pending_overflow_error() -> SidecarError {
88 SidecarError::InvalidState(format!(
89 "sidecar response tracker exceeded {MAX_PENDING_SIDECAR_RESPONSES} pending responses"
90 ))
91}
92
93fn outbound_sidecar_request_queue_overflow_error() -> SidecarError {
94 SidecarError::InvalidState(format!(
95 "outbound sidecar request queue exceeded {MAX_OUTBOUND_SIDECAR_REQUESTS} pending requests"
96 ))
97}
98
99fn wire_protocol_error(error: crate::wire::ProtocolCodecError) -> SidecarError {
100 SidecarError::InvalidState(format!("invalid generated wire protocol frame: {error}"))
101}
102
103pub use crate::state::{DispatchResult, NativeSidecarConfig, SidecarError};
105
106#[derive(Debug, Default, Deserialize)]
109struct LegacyJavascriptChildProcessSpawnOptions {
110 #[serde(default)]
111 cwd: Option<String>,
112 #[serde(default)]
113 env: BTreeMap<String, String>,
114 #[serde(default)]
115 input: Option<Value>,
116 #[serde(default)]
117 shell: bool,
118 #[serde(default)]
119 detached: bool,
120 #[serde(default)]
121 stdio: Vec<String>,
122 #[serde(default, rename = "maxBuffer")]
123 max_buffer: Option<usize>,
124 #[serde(default)]
125 timeout: Option<u64>,
126 #[serde(default, rename = "killSignal")]
127 kill_signal: Option<String>,
128}
129
130pub(crate) fn parse_javascript_child_process_spawn_request(
131 vm: &VmState,
132 args: &[Value],
133) -> Result<(JavascriptChildProcessSpawnRequest, Option<usize>), SidecarError> {
134 if let Some(value) = args.first().cloned() {
135 if let Ok(request) = serde_json::from_value::<JavascriptChildProcessSpawnRequest>(value) {
136 return Ok((request, None));
137 }
138 }
139
140 let command = javascript_sync_rpc_arg_str(args, 0, "child_process.spawn command")?.to_owned();
141 let raw_args = javascript_sync_rpc_arg_str(args, 1, "child_process.spawn args")?;
142 let raw_options = javascript_sync_rpc_arg_str(args, 2, "child_process.spawn options")?;
143
144 let parsed_args = serde_json::from_str::<Vec<String>>(raw_args).map_err(|error| {
145 SidecarError::InvalidState(format!("invalid child_process.spawn args payload: {error}"))
146 })?;
147 let parsed_options = serde_json::from_str::<LegacyJavascriptChildProcessSpawnOptions>(
148 raw_options,
149 )
150 .map_err(|error| {
151 SidecarError::InvalidState(format!(
152 "invalid child_process.spawn options payload: {error}"
153 ))
154 })?;
155
156 Ok((
157 JavascriptChildProcessSpawnRequest {
158 command,
159 args: parsed_args,
160 options: JavascriptChildProcessSpawnOptions {
161 cwd: parsed_options.cwd,
162 env: parsed_options.env,
163 internal_bootstrap_env: sanitize_javascript_child_process_internal_bootstrap_env(
164 &vm.guest_env,
165 ),
166 input: parsed_options.input,
167 shell: parsed_options.shell,
168 detached: parsed_options.detached,
169 stdio: parsed_options.stdio,
170 timeout: parsed_options.timeout,
171 kill_signal: parsed_options.kill_signal,
172 },
173 },
174 parsed_options.max_buffer,
175 ))
176}
177
178impl<B> SharedBridge<B> {
179 fn new(bridge: B) -> Self {
180 Self {
181 inner: Arc::new(Mutex::new(bridge)),
182 permissions: Arc::new(Mutex::new(BTreeMap::new())),
183 #[cfg(test)]
184 set_vm_permissions_outcomes: Arc::new(Mutex::new(VecDeque::new())),
185 }
186 }
187}
188
189impl<B> SharedBridge<B>
190where
191 B: NativeSidecarBridge + Send + 'static,
192 BridgeError<B>: fmt::Debug + Send + Sync + 'static,
193{
194 pub(crate) fn with_mut<T>(
195 &self,
196 operation: impl FnOnce(&mut B) -> Result<T, BridgeError<B>>,
197 ) -> Result<T, SidecarError> {
198 let mut bridge = self.inner.lock().map_err(|_| {
199 SidecarError::Bridge(String::from("native sidecar bridge lock poisoned"))
200 })?;
201 operation(&mut bridge).map_err(|error| SidecarError::Bridge(format!("{error:?}")))
202 }
203
204 fn inspect<T>(&self, operation: impl FnOnce(&mut B) -> T) -> Result<T, SidecarError> {
205 let mut bridge = self.inner.lock().map_err(|_| {
206 SidecarError::Bridge(String::from("native sidecar bridge lock poisoned"))
207 })?;
208 Ok(operation(&mut bridge))
209 }
210
/// Test hook: queues the outcome for a future `set_vm_permissions` call.
///
/// Only the error half of `result` is stored; an `Ok` result is queued as `None`, which lets
/// the corresponding call proceed normally.
///
/// # Errors
///
/// Returns `SidecarError::Bridge` when the outcome queue lock is poisoned.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn queue_set_vm_permissions_result(
    &self,
    result: Result<(), SidecarError>,
) -> Result<(), SidecarError> {
    match self.set_vm_permissions_outcomes.lock() {
        Ok(mut queue) => {
            queue.push_back(result.err());
            Ok(())
        }
        Err(_) => Err(SidecarError::Bridge(String::from(
            "native sidecar test set_vm_permissions outcome lock poisoned",
        ))),
    }
}
225
226 pub(crate) fn emit_lifecycle(
227 &self,
228 vm_id: &str,
229 state: LifecycleState,
230 ) -> Result<(), SidecarError> {
231 self.with_mut(|bridge| {
232 bridge.emit_lifecycle(LifecycleEventRecord {
233 vm_id: vm_id.to_owned(),
234 state,
235 detail: None,
236 })
237 })
238 }
239
240 pub(crate) fn emit_log(
241 &self,
242 vm_id: &str,
243 message: impl Into<String>,
244 ) -> Result<(), SidecarError> {
245 self.with_mut(|bridge| {
246 bridge.emit_log(LogRecord {
247 vm_id: vm_id.to_owned(),
248 level: LogLevel::Info,
249 message: message.into(),
250 })
251 })
252 }
253
254 pub(crate) fn filesystem_decision(
255 &self,
256 vm_id: &str,
257 path: &str,
258 access: FilesystemAccess,
259 ) -> PermissionDecision {
260 if let Some(decision) = self.static_permission_decision(
261 vm_id,
262 filesystem_permission_capability(access),
263 "fs",
264 Some(path),
265 ) {
266 return decision;
267 }
268 match self.with_mut(|bridge| {
269 bridge.check_filesystem_access(FilesystemPermissionRequest {
270 vm_id: vm_id.to_owned(),
271 path: path.to_owned(),
272 access,
273 })
274 }) {
275 Ok(decision) => map_bridge_permission(decision),
276 Err(error) => PermissionDecision::deny(error.to_string()),
277 }
278 }
279
280 pub(crate) fn command_decision(
281 &self,
282 vm_id: &str,
283 request: &CommandAccessRequest,
284 ) -> PermissionDecision {
285 if is_internal_runtime_command_request(request) {
286 return PermissionDecision::allow();
287 }
288 if let Some(decision) = self.static_permission_decision(
289 vm_id,
290 "child_process.spawn",
291 "child_process",
292 Some(&request.command),
293 ) {
294 return decision;
295 }
296 match self.with_mut(|bridge| {
297 bridge.check_command_execution(CommandPermissionRequest {
298 vm_id: vm_id.to_owned(),
299 command: request.command.clone(),
300 args: request.args.clone(),
301 cwd: request.cwd.clone(),
302 env: request.env.clone(),
303 })
304 }) {
305 Ok(decision) => map_bridge_permission(decision),
306 Err(error) => PermissionDecision::deny(error.to_string()),
307 }
308 }
309
310 pub(crate) fn environment_decision(
311 &self,
312 vm_id: &str,
313 request: &EnvAccessRequest,
314 ) -> PermissionDecision {
315 if let Some(decision) = self.static_permission_decision(
316 vm_id,
317 environment_permission_capability(request.op),
318 "env",
319 Some(&request.key),
320 ) {
321 return decision;
322 }
323 match self.with_mut(|bridge| {
324 bridge.check_environment_access(EnvironmentPermissionRequest {
325 vm_id: vm_id.to_owned(),
326 access: match request.op {
327 EnvironmentOperation::Read => EnvironmentAccess::Read,
328 EnvironmentOperation::Write => EnvironmentAccess::Write,
329 },
330 key: request.key.clone(),
331 value: request.value.clone(),
332 })
333 }) {
334 Ok(decision) => map_bridge_permission(decision),
335 Err(error) => PermissionDecision::deny(error.to_string()),
336 }
337 }
338
339 pub(crate) fn network_decision(
340 &self,
341 vm_id: &str,
342 request: &NetworkAccessRequest,
343 ) -> PermissionDecision {
344 if let Some(decision) = self.static_permission_decision(
345 vm_id,
346 network_permission_capability(request.op),
347 "network",
348 Some(&request.resource),
349 ) {
350 return decision;
351 }
352 match self.with_mut(|bridge| {
353 bridge.check_network_access(NetworkPermissionRequest {
354 vm_id: vm_id.to_owned(),
355 access: match request.op {
356 NetworkOperation::Fetch => NetworkAccess::Fetch,
357 NetworkOperation::Http => NetworkAccess::Http,
358 NetworkOperation::Dns => NetworkAccess::Dns,
359 NetworkOperation::Listen => NetworkAccess::Listen,
360 },
361 resource: request.resource.clone(),
362 })
363 }) {
364 Ok(decision) => map_bridge_permission(decision),
365 Err(error) => PermissionDecision::deny(error.to_string()),
366 }
367 }
368
369 pub(crate) fn require_network_access(
370 &self,
371 vm_id: &str,
372 op: NetworkOperation,
373 resource: impl Into<String>,
374 ) -> Result<(), SidecarError> {
375 let resource = resource.into();
376 let decision = self.network_decision(
377 vm_id,
378 &NetworkAccessRequest {
379 vm_id: vm_id.to_owned(),
380 op,
381 resource: resource.clone(),
382 },
383 );
384 if decision.allow {
385 return Ok(());
386 }
387
388 let message = match decision.reason.as_deref() {
389 Some(reason) => format!("EACCES: permission denied, {resource}: {reason}"),
390 None => format!("EACCES: permission denied, {resource}"),
391 };
392 Err(SidecarError::Execution(message))
393 }
394
395 pub(crate) fn set_vm_permissions(
396 &self,
397 vm_id: &str,
398 permissions: &PermissionsPolicy,
399 ) -> Result<(), SidecarError> {
400 #[cfg(test)]
401 {
402 let mut outcomes = self.set_vm_permissions_outcomes.lock().map_err(|_| {
403 SidecarError::Bridge(String::from(
404 "native sidecar test set_vm_permissions outcome lock poisoned",
405 ))
406 })?;
407 if let Some(Some(error)) = outcomes.pop_front() {
408 return Err(error);
409 }
410 }
411
412 let mut stored = self.permissions.lock().map_err(|_| {
413 SidecarError::Bridge(String::from(
414 "native sidecar permission policy lock poisoned",
415 ))
416 })?;
417 stored.insert(vm_id.to_owned(), permissions.clone());
418 Ok(())
419 }
420
421 pub(crate) fn restore_vm_permissions_fail_closed(
422 &self,
423 vm_id: &str,
424 original_permissions: &PermissionsPolicy,
425 context: &str,
426 operation_error: &SidecarError,
427 ) -> Result<(), SidecarError> {
428 match self.set_vm_permissions(vm_id, original_permissions) {
429 Ok(()) => Ok(()),
430 Err(restore_error) => {
431 let deny_all = PermissionsPolicy::deny_all();
432 match self.set_vm_permissions(vm_id, &deny_all) {
433 Ok(()) => Err(SidecarError::InvalidState(format!(
434 "{context} failed: {operation_error}; restoring original permissions failed: {restore_error}; applied deny-all fallback"
435 ))),
436 Err(deny_all_error) => panic!(
437 "{context} failed: {operation_error}; restoring original permissions failed: {restore_error}; deny-all fallback failed: {deny_all_error}"
438 ),
439 }
440 }
441 }
442 }
443
444 pub(crate) fn clear_vm_permissions(&self, vm_id: &str) -> Result<(), SidecarError> {
445 let mut stored = self.permissions.lock().map_err(|_| {
446 SidecarError::Bridge(String::from(
447 "native sidecar permission policy lock poisoned",
448 ))
449 })?;
450 stored.remove(vm_id);
451 Ok(())
452 }
453
454 pub(crate) fn static_permission_decision(
455 &self,
456 vm_id: &str,
457 capability: &str,
458 domain: &str,
459 resource: Option<&str>,
460 ) -> Option<PermissionDecision> {
461 let stored = self.permissions.lock().ok()?;
462 let permissions = stored.get(vm_id)?;
463 let mode = evaluate_permissions_policy(permissions, domain, capability, resource);
464 Some(permission_mode_to_kernel_decision(mode, capability))
465 }
466}
467
468pub(crate) fn evaluate_permissions_policy(
469 permissions: &PermissionsPolicy,
470 domain: &str,
471 capability: &str,
472 resource: Option<&str>,
473) -> PermissionMode {
474 match domain {
475 "fs" => evaluate_fs_permission_scope(
476 permissions.fs.as_ref(),
477 capability_operation(capability, domain),
478 resource,
479 ),
480 "network" => evaluate_pattern_permission_scope(
481 permissions.network.as_ref(),
482 capability_operation(capability, domain),
483 resource,
484 ),
485 "child_process" => evaluate_pattern_permission_scope(
486 permissions.child_process.as_ref(),
487 capability_operation(capability, domain),
488 resource,
489 ),
490 "process" => evaluate_pattern_permission_scope(
491 permissions.process.as_ref(),
492 capability_operation(capability, domain),
493 resource,
494 ),
495 "env" => evaluate_pattern_permission_scope(
496 permissions.env.as_ref(),
497 capability_operation(capability, domain),
498 resource,
499 ),
500 "tool" => evaluate_pattern_permission_scope(
501 permissions.tool.as_ref(),
502 capability_operation(capability, domain),
503 resource,
504 ),
505 _ => PermissionMode::Deny,
506 }
507}
508
509fn evaluate_fs_permission_scope(
510 scope: Option<&FsPermissionScope>,
511 operation: &str,
512 resource: Option<&str>,
513) -> PermissionMode {
514 match scope {
515 Some(FsPermissionScope::PermissionMode(mode)) => mode.clone(),
516 Some(FsPermissionScope::FsPermissionRuleSet(rules)) => {
517 let mut mode = rules.default.clone().unwrap_or(PermissionMode::Deny);
518 for rule in &rules.rules {
519 if fs_rule_matches(rule, operation, resource) {
520 mode = rule.mode.clone();
521 }
522 }
523 mode
524 }
525 None => PermissionMode::Deny,
526 }
527}
528
529fn evaluate_pattern_permission_scope(
530 scope: Option<&PatternPermissionScope>,
531 operation: &str,
532 resource: Option<&str>,
533) -> PermissionMode {
534 match scope {
535 Some(PatternPermissionScope::PermissionMode(mode)) => mode.clone(),
536 Some(PatternPermissionScope::PatternPermissionRuleSet(rules)) => {
537 let mut mode = rules.default.clone().unwrap_or(PermissionMode::Deny);
538 for rule in &rules.rules {
539 if pattern_rule_matches(rule, operation, resource) {
540 mode = rule.mode.clone();
541 }
542 }
543 mode
544 }
545 None => PermissionMode::Deny,
546 }
547}
548
549fn fs_rule_matches(
550 rule: &crate::protocol::FsPermissionRule,
551 operation: &str,
552 resource: Option<&str>,
553) -> bool {
554 let operations_match = permission_operation_matches(&rule.operations, operation);
555 let paths_match = permission_resource_matches(&rule.paths, resource);
556 operations_match && paths_match
557}
558
559fn pattern_rule_matches(
560 rule: &PatternPermissionRule,
561 operation: &str,
562 resource: Option<&str>,
563) -> bool {
564 let operations_match = permission_operation_matches(&rule.operations, operation);
565 let patterns_match = permission_resource_matches(&rule.patterns, resource);
566 operations_match && patterns_match
567}
568
/// Returns `true` when `candidates` contains `operation` itself or the `*` wildcard.
fn permission_operation_matches(candidates: &[String], operation: &str) -> bool {
    for candidate in candidates {
        let candidate = candidate.as_str();
        if candidate == "*" || candidate == operation {
            return true;
        }
    }
    false
}
574
575fn permission_resource_matches(patterns: &[String], resource: Option<&str>) -> bool {
576 resource.is_some_and(|value| {
577 patterns
578 .iter()
579 .any(|pattern| permission_glob_matches(pattern, value))
580 })
581}
582
583pub(crate) fn validate_permissions_policy(
584 permissions: &PermissionsPolicy,
585) -> Result<(), SidecarError> {
586 if let Some(scope) = permissions.fs.as_ref() {
587 validate_fs_permission_scope("fs", scope)?;
588 }
589 if let Some(scope) = permissions.network.as_ref() {
590 validate_pattern_permission_scope("network", scope)?;
591 }
592 if let Some(scope) = permissions.child_process.as_ref() {
593 validate_pattern_permission_scope("child_process", scope)?;
594 }
595 if let Some(scope) = permissions.process.as_ref() {
596 validate_pattern_permission_scope("process", scope)?;
597 }
598 if let Some(scope) = permissions.env.as_ref() {
599 validate_pattern_permission_scope("env", scope)?;
600 }
601 if let Some(scope) = permissions.tool.as_ref() {
602 validate_pattern_permission_scope("tool", scope)?;
603 }
604 Ok(())
605}
606
607fn validate_fs_permission_scope(
608 domain: &str,
609 scope: &FsPermissionScope,
610) -> Result<(), SidecarError> {
611 let FsPermissionScope::FsPermissionRuleSet(rule_set) = scope else {
612 return Ok(());
613 };
614
615 for (index, rule) in rule_set.rules.iter().enumerate() {
616 validate_permission_rule_field(
617 &rule.operations,
618 &format!("{domain}.rules[{index}].operations"),
619 )?;
620 validate_permission_rule_field(&rule.paths, &format!("{domain}.rules[{index}].paths"))?;
621 }
622
623 Ok(())
624}
625
626fn validate_pattern_permission_scope(
627 domain: &str,
628 scope: &PatternPermissionScope,
629) -> Result<(), SidecarError> {
630 let PatternPermissionScope::PatternPermissionRuleSet(rule_set) = scope else {
631 return Ok(());
632 };
633
634 for (index, rule) in rule_set.rules.iter().enumerate() {
635 validate_permission_rule_field(
636 &rule.operations,
637 &format!("{domain}.rules[{index}].operations"),
638 )?;
639 validate_permission_rule_field(
640 &rule.patterns,
641 &format!("{domain}.rules[{index}].patterns"),
642 )?;
643 }
644
645 Ok(())
646}
647
648fn validate_permission_rule_field(values: &[String], field: &str) -> Result<(), SidecarError> {
649 if values.is_empty() {
650 return Err(SidecarError::InvalidState(format!(
651 "invalid permissions policy: {field} must not be empty; use [\"*\"] for wildcard"
652 )));
653 }
654 Ok(())
655}
656
/// Extracts the operation name from a `<domain>.<operation>` capability string.
///
/// Returns the empty string when `capability` does not start with `domain` followed by a dot.
fn capability_operation<'a>(capability: &'a str, domain: &str) -> &'a str {
    match capability.strip_prefix(domain) {
        Some(rest) => rest.strip_prefix('.').unwrap_or(""),
        None => "",
    }
}
663
664fn permission_mode_to_kernel_decision(
665 mode: PermissionMode,
666 capability: &str,
667) -> PermissionDecision {
668 match mode {
669 PermissionMode::Allow => PermissionDecision::allow(),
670 PermissionMode::Ask => {
671 PermissionDecision::deny(format!("permission prompt required for {capability}"))
672 }
673 PermissionMode::Deny => PermissionDecision::deny(format!("blocked by {capability} policy")),
674 }
675}
676
677pub(crate) fn filesystem_permission_capability(access: FilesystemAccess) -> &'static str {
678 match access {
679 FilesystemAccess::Read => "fs.read",
680 FilesystemAccess::Write => "fs.write",
681 FilesystemAccess::Stat => "fs.stat",
682 FilesystemAccess::ReadDir => "fs.readdir",
683 FilesystemAccess::CreateDir => "fs.create_dir",
684 FilesystemAccess::Remove => "fs.rm",
685 FilesystemAccess::Rename => "fs.rename",
686 FilesystemAccess::Symlink => "fs.symlink",
687 FilesystemAccess::ReadLink => "fs.readlink",
688 FilesystemAccess::Chmod => "fs.chmod",
689 FilesystemAccess::Truncate => "fs.truncate",
690 }
691}
692
693fn network_permission_capability(operation: NetworkOperation) -> &'static str {
694 match operation {
695 NetworkOperation::Fetch => "network.fetch",
696 NetworkOperation::Http => "network.http",
697 NetworkOperation::Dns => "network.dns",
698 NetworkOperation::Listen => "network.listen",
699 }
700}
701
702fn environment_permission_capability(operation: EnvironmentOperation) -> &'static str {
703 match operation {
704 EnvironmentOperation::Read => "env.read",
705 EnvironmentOperation::Write => "env.write",
706 }
707}
708
709fn is_internal_runtime_command_request(request: &CommandAccessRequest) -> bool {
710 match request.command.as_str() {
711 "node" => request
712 .env
713 .keys()
714 .any(|key| INTERNAL_JAVASCRIPT_ENTRYPOINT_ENV_KEYS.contains(&key.as_str())),
715 "wasm" => request
716 .env
717 .keys()
718 .any(|key| INTERNAL_WASM_ENTRYPOINT_ENV_KEYS.contains(&key.as_str())),
719 "python" => request.env.keys().any(|key| {
720 INTERNAL_PYTHON_ENTRYPOINT_ENV_PREFIXES
721 .iter()
722 .any(|prefix| key.starts_with(prefix))
723 }),
724 _ => false,
725 }
726}
727
728fn ownership_matches_process_event(
729 ownership: &OwnershipScope,
730 event: &ProcessEventEnvelope,
731) -> bool {
732 match ownership {
733 OwnershipScope::ConnectionOwnership(inner) => inner.connection_id == event.connection_id,
734 OwnershipScope::SessionOwnership(inner) => {
735 inner.connection_id == event.connection_id && inner.session_id == event.session_id
736 }
737 OwnershipScope::VmOwnership(inner) => {
738 inner.connection_id == event.connection_id
739 && inner.session_id == event.session_id
740 && inner.vm_id == event.vm_id
741 }
742 }
743}
744
745fn public_process_event_matches_ownership<B>(
746 sidecar: &NativeSidecar<B>,
747 ownership: &OwnershipScope,
748 event: &ProcessEventEnvelope,
749) -> bool
750where
751 B: NativeSidecarBridge + Send + 'static,
752 BridgeError<B>: fmt::Debug + Send + Sync + 'static,
753{
754 if !ownership_matches_process_event(ownership, event) {
755 return false;
756 }
757
758 if event.process_id.contains('/') {
759 return false;
760 }
761
762 let _ = sidecar;
765 true
766}
767
/// Polls `future` exactly once with a no-op waker.
///
/// Returns `Some(output)` if the future completed on that poll and `None` if it is still
/// pending. Because the waker does nothing, a pending future will not signal readiness later.
fn poll_future_once<F: std::future::Future>(future: std::pin::Pin<&mut F>) -> Option<F::Output> {
    let mut context = Context::from_waker(Waker::noop());
    if let Poll::Ready(output) = future.poll(&mut context) {
        Some(output)
    } else {
        None
    }
}
775
776impl JavascriptSocketPathContext {
781 pub(crate) fn loopback_port_allowed(&self, port: u16) -> bool {
782 self.loopback_exempt_ports.contains(&port)
783 || self
784 .tcp_loopback_guest_to_host_ports
785 .keys()
786 .any(|(_, guest_port)| *guest_port == port)
787 }
788
789 pub(crate) fn translate_tcp_loopback_port(
790 &self,
791 family: JavascriptSocketFamily,
792 port: u16,
793 ) -> Option<u16> {
794 self.tcp_loopback_guest_to_host_ports
795 .get(&(family, port))
796 .copied()
797 }
798
799 pub(crate) fn translate_udp_loopback_port(
800 &self,
801 family: JavascriptSocketFamily,
802 port: u16,
803 ) -> Option<u16> {
804 self.udp_loopback_guest_to_host_ports
805 .get(&(family, port))
806 .copied()
807 }
808
809 pub(crate) fn guest_udp_port_for_host_port(
810 &self,
811 family: JavascriptSocketFamily,
812 port: u16,
813 ) -> Option<u16> {
814 self.udp_loopback_host_to_guest_ports
815 .get(&(family, port))
816 .copied()
817 }
818}
819
/// State of one native sidecar instance, generic over the host bridge type `B`.
pub struct NativeSidecar<B> {
    /// Configuration the sidecar was constructed with.
    pub(crate) config: NativeSidecarConfig,
    /// Shared handle to the host bridge and the per-VM static permission table.
    pub(crate) bridge: SharedBridge<B>,
    /// Mount plugin registry produced by `build_mount_plugin_registry` at construction.
    pub(crate) mount_plugins: FileSystemPluginRegistry<MountPluginContext<B>>,
    /// Cache directory: `config.compile_cache_root` when set, otherwise a generated directory
    /// under the system temp dir. It is created during construction.
    pub(crate) cache_root: PathBuf,
    pub(crate) javascript_engine: JavascriptExecutionEngine,
    pub(crate) python_engine: PythonExecutionEngine,
    pub(crate) wasm_engine: WasmExecutionEngine,
    // The three counters below start at 0.
    pub(crate) next_connection_id: usize,
    pub(crate) next_session_id: usize,
    pub(crate) next_vm_id: usize,
    /// Starts at -1.
    pub(crate) next_sidecar_request_id: RequestId,
    // NOTE(review): presumably keyed by connection id, session id and VM id respectively —
    // confirm against the code that inserts into these maps.
    pub(crate) connections: BTreeMap<String, ConnectionState>,
    pub(crate) sessions: BTreeMap<String, SessionState>,
    pub(crate) vms: BTreeMap<String, VmState>,
    /// Sending half of the process event channel (capacity `MAX_PROCESS_EVENT_QUEUE`).
    #[allow(dead_code)]
    pub(crate) process_event_sender: Sender<ProcessEventEnvelope>,
    /// Receiving half of the process event channel; `Some` right after construction.
    pub(crate) process_event_receiver: Option<Receiver<ProcessEventEnvelope>>,
    pub(crate) pending_process_events: VecDeque<ProcessEventEnvelope>,
    pub(crate) pending_sidecar_responses: SidecarResponseTracker,
    pub(crate) outbound_sidecar_requests: VecDeque<SidecarRequestFrame>,
    pub(crate) completed_sidecar_responses: BTreeMap<RequestId, SidecarResponseFrame>,
    pub(crate) completed_sidecar_response_order: VecDeque<RequestId>,
    pub(crate) sidecar_requests: SharedSidecarRequestClient,
    pub(crate) extensions: BTreeMap<String, Arc<dyn Extension>>,
    /// Resources bound to extension sessions, keyed by `(namespace, extension session id)`.
    pub(crate) extension_sessions: BTreeMap<(String, String), ExtensionSessionResources>,
    /// Buffered stdout/stderr of extension-owned processes, keyed by `(vm id, process id)`.
    pub(crate) extension_process_output_buffers:
        BTreeMap<(String, String), ExtensionBufferedProcessOutput>,
}
851
/// Resources bound to one extension session.
#[derive(Debug)]
pub(crate) struct ExtensionSessionResources {
    /// Ownership scope the session was bound with; later process bindings must present an equal
    /// scope.
    pub(crate) ownership: OwnershipScope,
    /// Ids of processes bound to the session.
    pub(crate) process_ids: BTreeSet<String>,
    /// Ids of VMs bound to the session.
    pub(crate) vm_ids: BTreeSet<String>,
}
858
859impl<B> fmt::Debug for NativeSidecar<B> {
860 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
861 f.debug_struct("NativeSidecar")
862 .field("config", &self.config)
863 .field("cache_root", &self.cache_root)
864 .field("next_connection_id", &self.next_connection_id)
865 .field("next_session_id", &self.next_session_id)
866 .field("next_vm_id", &self.next_vm_id)
867 .field("connection_count", &self.connections.len())
868 .field("session_count", &self.sessions.len())
869 .field("vm_count", &self.vms.len())
870 .field("extension_session_count", &self.extension_sessions.len())
871 .field(
872 "extension_process_output_buffer_count",
873 &self.extension_process_output_buffers.len(),
874 )
875 .finish()
876 }
877}
878
879impl<B> NativeSidecar<B>
880where
881 B: NativeSidecarBridge + Send + 'static,
882 BridgeError<B>: fmt::Debug + Send + Sync + 'static,
883{
884 pub fn new(bridge: B) -> Result<Self, SidecarError> {
885 Self::with_config(bridge, NativeSidecarConfig::default())
886 }
887
888 pub fn with_config(bridge: B, config: NativeSidecarConfig) -> Result<Self, SidecarError> {
889 if matches!(config.expected_auth_token.as_deref(), Some("")) {
890 return Err(SidecarError::InvalidState(String::from(
891 "native sidecar expected_auth_token must not be empty",
892 )));
893 }
894
895 let cache_root = config.compile_cache_root.clone().unwrap_or_else(|| {
896 std::env::temp_dir().join(format!(
897 "{}-{}",
898 config.sidecar_id,
899 SystemTime::now()
900 .duration_since(UNIX_EPOCH)
901 .expect("system time before unix epoch")
902 .as_nanos()
903 ))
904 });
905 fs::create_dir_all(&cache_root).map_err(|error| {
906 SidecarError::Io(format!("failed to prepare sidecar cache root: {error}"))
907 })?;
908
909 let bridge = SharedBridge::new(bridge);
910 let mount_plugins = build_mount_plugin_registry::<B>()?;
911 let (process_event_sender, process_event_receiver) = channel(MAX_PROCESS_EVENT_QUEUE);
912
913 Ok(Self {
914 config,
915 bridge,
916 mount_plugins,
917 cache_root,
918 javascript_engine: JavascriptExecutionEngine::default(),
919 python_engine: PythonExecutionEngine::default(),
920 wasm_engine: WasmExecutionEngine::default(),
921 next_connection_id: 0,
922 next_session_id: 0,
923 next_vm_id: 0,
924 next_sidecar_request_id: -1,
925 connections: BTreeMap::new(),
926 sessions: BTreeMap::new(),
927 vms: BTreeMap::new(),
928 process_event_sender,
929 process_event_receiver: Some(process_event_receiver),
930 pending_process_events: VecDeque::new(),
931 pending_sidecar_responses: SidecarResponseTracker::default(),
932 outbound_sidecar_requests: VecDeque::new(),
933 completed_sidecar_responses: BTreeMap::new(),
934 completed_sidecar_response_order: VecDeque::new(),
935 sidecar_requests: SharedSidecarRequestClient::default(),
936 extensions: BTreeMap::new(),
937 extension_sessions: BTreeMap::new(),
938 extension_process_output_buffers: BTreeMap::new(),
939 })
940 }
941
942 pub fn with_config_and_extensions(
943 bridge: B,
944 config: NativeSidecarConfig,
945 extensions: Vec<Box<dyn Extension>>,
946 ) -> Result<Self, SidecarError> {
947 let mut sidecar = Self::with_config(bridge, config)?;
948 for extension in extensions {
949 sidecar.register_extension(extension)?;
950 }
951 Ok(sidecar)
952 }
953
954 pub(crate) fn prune_extension_process_resource(&mut self, process_id: &str) {
955 self.extension_sessions.retain(|_, resources| {
956 resources.process_ids.remove(process_id);
957 !resources.process_ids.is_empty() || !resources.vm_ids.is_empty()
958 });
959 }
960
961 pub(crate) fn prune_extension_vm_resource(&mut self, vm_id: &str) {
962 self.extension_sessions.retain(|_, resources| {
963 if matches!(
964 &resources.ownership,
965 OwnershipScope::VmOwnership(inner) if inner.vm_id == vm_id
966 ) {
967 resources.process_ids.clear();
968 }
969 resources.vm_ids.remove(vm_id);
970 !resources.process_ids.is_empty() || !resources.vm_ids.is_empty()
971 });
972 }
973
974 pub(crate) fn capture_extension_process_output_event(
975 &mut self,
976 vm_id: &str,
977 process_id: &str,
978 event: &ActiveExecutionEvent,
979 ) -> bool {
980 let Some(buffer) = self
981 .extension_process_output_buffers
982 .get_mut(&(vm_id.to_string(), process_id.to_string()))
983 else {
984 return false;
985 };
986 match event {
987 ActiveExecutionEvent::Stdout(chunk) => {
988 buffer.append_stdout(chunk, DEFAULT_ACP_STDOUT_BUFFER_BYTE_LIMIT);
989 true
990 }
991 ActiveExecutionEvent::Stderr(chunk) => {
992 buffer.append_stderr(chunk, DEFAULT_ACP_STDOUT_BUFFER_BYTE_LIMIT);
993 true
994 }
995 ActiveExecutionEvent::JavascriptSyncRpcRequest(_)
996 | ActiveExecutionEvent::PythonVfsRpcRequest(_)
997 | ActiveExecutionEvent::SignalState { .. }
998 | ActiveExecutionEvent::Exited(_) => false,
999 }
1000 }
1001
1002 fn bind_extension_process_resource(
1003 &mut self,
1004 ownership: OwnershipScope,
1005 namespace: String,
1006 ext_session_id: String,
1007 process_id: String,
1008 ) -> Result<(), SidecarError> {
1009 if ext_session_id.is_empty() {
1010 return Err(SidecarError::InvalidState(String::from(
1011 "extension session id must not be empty",
1012 )));
1013 }
1014 let (connection_id, session_id, vm_id) = self.vm_scope_for(&ownership)?;
1015 self.require_owned_vm(&connection_id, &session_id, &vm_id)?;
1016 let process_exists = self
1017 .vms
1018 .get(&vm_id)
1019 .is_some_and(|vm| vm.active_processes.contains_key(&process_id));
1020 if !process_exists {
1021 return Err(SidecarError::InvalidState(format!(
1022 "VM {vm_id} has no active process {process_id}"
1023 )));
1024 }
1025
1026 let key = (namespace, ext_session_id);
1027 if let Some(resources) = self.extension_sessions.get_mut(&key) {
1028 if resources.ownership != ownership {
1029 return Err(SidecarError::InvalidState(String::from(
1030 "extension session ownership did not match existing resources",
1031 )));
1032 }
1033 resources.process_ids.insert(process_id);
1034 } else {
1035 self.extension_sessions.insert(
1036 key,
1037 ExtensionSessionResources {
1038 ownership,
1039 process_ids: BTreeSet::from([process_id]),
1040 vm_ids: BTreeSet::new(),
1041 },
1042 );
1043 }
1044 Ok(())
1045 }
1046
1047 fn bind_extension_vm_resource(
1048 &mut self,
1049 ownership: OwnershipScope,
1050 namespace: String,
1051 ext_session_id: String,
1052 ) -> Result<(), SidecarError> {
1053 if ext_session_id.is_empty() {
1054 return Err(SidecarError::InvalidState(String::from(
1055 "extension session id must not be empty",
1056 )));
1057 }
1058 let (connection_id, session_id, vm_id) = self.vm_scope_for(&ownership)?;
1059 self.require_owned_vm(&connection_id, &session_id, &vm_id)?;
1060
1061 let key = (namespace, ext_session_id);
1062 if let Some(resources) = self.extension_sessions.get_mut(&key) {
1063 if resources.ownership != ownership {
1064 return Err(SidecarError::InvalidState(String::from(
1065 "extension session ownership did not match existing resources",
1066 )));
1067 }
1068 resources.vm_ids.insert(vm_id);
1069 } else {
1070 self.extension_sessions.insert(
1071 key,
1072 ExtensionSessionResources {
1073 ownership,
1074 process_ids: BTreeSet::new(),
1075 vm_ids: BTreeSet::from([vm_id]),
1076 },
1077 );
1078 }
1079 Ok(())
1080 }
1081
1082 pub fn sidecar_id(&self) -> &str {
1083 &self.config.sidecar_id
1084 }
1085
1086 pub fn with_bridge_mut<T>(
1087 &self,
1088 operation: impl FnOnce(&mut B) -> T,
1089 ) -> Result<T, SidecarError> {
1090 self.bridge.inspect(operation)
1091 }
1092
1093 pub fn set_sidecar_request_transport(&mut self, transport: Arc<dyn SidecarRequestTransport>) {
1094 self.sidecar_requests.set_transport(transport);
1095 }
1096
1097 pub fn register_extension(
1098 &mut self,
1099 extension: Box<dyn Extension>,
1100 ) -> Result<(), SidecarError> {
1101 let namespace = extension.namespace().to_owned();
1102 if namespace.is_empty() {
1103 return Err(SidecarError::InvalidState(String::from(
1104 "extension namespace must not be empty",
1105 )));
1106 }
1107 if self.extensions.contains_key(&namespace) {
1108 return Err(SidecarError::Conflict(format!(
1109 "extension namespace {namespace} is already registered",
1110 )));
1111 }
1112 self.extensions.insert(namespace, Arc::from(extension));
1113 Ok(())
1114 }
1115
1116 pub fn set_sidecar_request_handler<F>(&mut self, handler: F)
1117 where
1118 F: Fn(SidecarRequestFrame) -> Result<SidecarResponsePayload, SidecarError>
1119 + Send
1120 + Sync
1121 + 'static,
1122 {
1123 struct HandlerTransport<F>(F);
1124
1125 impl<F> SidecarRequestTransport for HandlerTransport<F>
1126 where
1127 F: Fn(SidecarRequestFrame) -> Result<SidecarResponsePayload, SidecarError>
1128 + Send
1129 + Sync
1130 + 'static,
1131 {
1132 fn send_request(
1133 &self,
1134 request: SidecarRequestFrame,
1135 _timeout: Duration,
1136 ) -> Result<SidecarResponseFrame, SidecarError> {
1137 let payload = (self.0)(request.clone())?;
1138 Ok(SidecarResponseFrame::new(
1139 request.request_id,
1140 request.ownership,
1141 payload,
1142 ))
1143 }
1144 }
1145
1146 self.set_sidecar_request_transport(Arc::new(HandlerTransport(handler)));
1147 }
1148
1149 pub fn set_wire_sidecar_request_handler<F>(&mut self, handler: F)
1150 where
1151 F: Fn(
1152 crate::wire::SidecarRequestFrame,
1153 ) -> Result<crate::wire::SidecarResponseFrame, SidecarError>
1154 + Send
1155 + Sync
1156 + 'static,
1157 {
1158 self.set_sidecar_request_handler(move |request| {
1159 let request = crate::wire::sidecar_request_frame_from_compat(request)
1160 .map_err(wire_protocol_error)?;
1161 let response = handler(request)?;
1162 let response = crate::wire::sidecar_response_frame_to_compat(response)
1163 .map_err(wire_protocol_error)?;
1164 Ok(response.payload)
1165 });
1166 }
1167
1168 pub(crate) fn queue_pending_process_event(
1169 &mut self,
1170 envelope: ProcessEventEnvelope,
1171 ) -> Result<(), SidecarError> {
1172 if self.pending_process_events.len() >= MAX_PROCESS_EVENT_QUEUE {
1173 return Err(process_event_queue_overflow_error());
1174 }
1175 self.pending_process_events.push_back(envelope);
1176 Ok(())
1177 }
1178
1179 pub(crate) fn queue_front_pending_process_event(
1180 &mut self,
1181 envelope: ProcessEventEnvelope,
1182 ) -> Result<(), SidecarError> {
1183 if self.pending_process_events.len() >= MAX_PROCESS_EVENT_QUEUE {
1184 return Err(process_event_queue_overflow_error());
1185 }
1186 self.pending_process_events.push_front(envelope);
1187 Ok(())
1188 }
1189
1190 pub(crate) fn pending_process_event_capacity(&self) -> usize {
1191 MAX_PROCESS_EVENT_QUEUE.saturating_sub(self.pending_process_events.len())
1192 }
1193
1194 pub fn dispatch_blocking(
1195 &mut self,
1196 request: RequestFrame,
1197 ) -> Result<DispatchResult, SidecarError> {
1198 let inside_runtime = tokio::runtime::Handle::try_current().is_ok();
1199 if matches!(
1200 request.payload,
1201 RequestPayload::DisposeVm(_) | RequestPayload::Ext(_)
1202 ) && !inside_runtime
1203 {
1204 return tokio::runtime::Builder::new_current_thread()
1205 .enable_all()
1206 .build()
1207 .expect("sidecar dispatch runtime")
1208 .block_on(self.dispatch(request));
1209 }
1210
1211 let mut future = std::pin::pin!(self.dispatch(request));
1212 match poll_future_once(future.as_mut()) {
1213 Some(result) => result,
1214 None if inside_runtime => Err(SidecarError::InvalidState(String::from(
1215 "dispatch_blocking cannot wait for an async sidecar request inside a Tokio runtime; use dispatch().await",
1216 ))),
1217 None => tokio::runtime::Builder::new_current_thread()
1218 .enable_all()
1219 .build()
1220 .expect("sidecar dispatch runtime")
1221 .block_on(future),
1222 }
1223 }
1224
1225 pub fn dispatch_wire_blocking(
1226 &mut self,
1227 request: crate::wire::RequestFrame,
1228 ) -> Result<crate::wire::WireDispatchResult, SidecarError> {
1229 let request = crate::wire::request_frame_to_compat(request).map_err(wire_protocol_error)?;
1230 let result = self.dispatch_blocking(request)?;
1231 crate::wire::dispatch_result_from_compat(result).map_err(wire_protocol_error)
1232 }
1233
1234 pub fn poll_event_blocking(
1235 &mut self,
1236 ownership: &OwnershipScope,
1237 timeout: Duration,
1238 ) -> Result<Option<EventFrame>, SidecarError> {
1239 tokio::runtime::Builder::new_current_thread()
1240 .enable_all()
1241 .build()
1242 .expect("sidecar poll runtime")
1243 .block_on(self.poll_event(ownership, timeout))
1244 }
1245
1246 pub fn poll_event_wire_blocking(
1247 &mut self,
1248 ownership: &crate::wire::OwnershipScope,
1249 timeout: Duration,
1250 ) -> Result<Option<crate::wire::EventFrame>, SidecarError> {
1251 let ownership = crate::wire::ownership_scope_to_compat(ownership.clone());
1252 self.poll_event_blocking(&ownership, timeout)?
1253 .map(crate::wire::event_frame_from_compat)
1254 .transpose()
1255 .map_err(wire_protocol_error)
1256 }
1257
1258 pub fn close_session_blocking(
1259 &mut self,
1260 connection_id: &str,
1261 session_id: &str,
1262 ) -> Result<Vec<EventFrame>, SidecarError> {
1263 tokio::runtime::Builder::new_current_thread()
1264 .enable_all()
1265 .build()
1266 .expect("sidecar close-session runtime")
1267 .block_on(self.close_session(connection_id, session_id))
1268 }
1269
1270 pub fn remove_connection_blocking(
1271 &mut self,
1272 connection_id: &str,
1273 ) -> Result<Vec<EventFrame>, SidecarError> {
1274 tokio::runtime::Builder::new_current_thread()
1275 .enable_all()
1276 .build()
1277 .expect("sidecar remove-connection runtime")
1278 .block_on(self.remove_connection(connection_id))
1279 }
1280
1281 pub fn dispose_vm_internal_blocking(
1282 &mut self,
1283 connection_id: &str,
1284 session_id: &str,
1285 vm_id: &str,
1286 reason: DisposeReason,
1287 ) -> Result<Vec<EventFrame>, SidecarError> {
1288 tokio::runtime::Builder::new_current_thread()
1289 .enable_all()
1290 .build()
1291 .expect("sidecar dispose-vm runtime")
1292 .block_on(self.dispose_vm_internal(connection_id, session_id, vm_id, reason))
1293 }
1294
1295 pub async fn dispatch(
1296 &mut self,
1297 request: RequestFrame,
1298 ) -> Result<DispatchResult, SidecarError> {
1299 if let Err(error) = self.ensure_request_within_frame_limit(&request) {
1300 return Ok(DispatchResult {
1301 response: self.reject(&request, error_code(&error), &error.to_string()),
1302 events: Vec::new(),
1303 });
1304 }
1305
1306 let result = match request.payload.clone() {
1307 RequestPayload::Authenticate(payload) => {
1308 self.authenticate_connection(&request, payload).await
1309 }
1310 RequestPayload::OpenSession(payload) => self.open_session(&request, payload).await,
1311 RequestPayload::CreateVm(payload) => self.create_vm(&request, payload).await,
1312 RequestPayload::DisposeVm(payload) => self.dispose_vm(&request, payload).await,
1313 RequestPayload::BootstrapRootFilesystem(payload) => {
1314 self.bootstrap_root_filesystem(&request, payload.entries)
1315 .await
1316 }
1317 RequestPayload::ConfigureVm(payload) => self.configure_vm(&request, payload).await,
1318 RequestPayload::RegisterHostCallbacks(payload) => {
1319 register_host_callbacks(self, &request, payload)
1320 }
1321 RequestPayload::CreateLayer(payload) => self.create_layer(&request, payload).await,
1322 RequestPayload::SealLayer(payload) => self.seal_layer(&request, payload).await,
1323 RequestPayload::ImportSnapshot(payload) => {
1324 self.import_snapshot(&request, payload).await
1325 }
1326 RequestPayload::ExportSnapshot(payload) => {
1327 self.export_snapshot(&request, payload).await
1328 }
1329 RequestPayload::CreateOverlay(payload) => self.create_overlay(&request, payload).await,
1330 RequestPayload::GuestFilesystemCall(payload) => {
1331 self.guest_filesystem_call(&request, payload).await
1332 }
1333 RequestPayload::SnapshotRootFilesystem(payload) => {
1334 self.snapshot_root_filesystem(&request, payload).await
1335 }
1336 RequestPayload::Execute(payload) => self.execute(&request, payload).await,
1337 RequestPayload::WriteStdin(payload) => self.write_stdin(&request, payload).await,
1338 RequestPayload::CloseStdin(payload) => self.close_stdin(&request, payload).await,
1339 RequestPayload::KillProcess(payload) => self.kill_process(&request, payload).await,
1340 RequestPayload::GetProcessSnapshot(payload) => {
1341 self.get_process_snapshot(&request, payload).await
1342 }
1343 RequestPayload::FindListener(payload) => self.find_listener(&request, payload).await,
1344 RequestPayload::FindBoundUdp(payload) => self.find_bound_udp(&request, payload).await,
1345 RequestPayload::VmFetch(payload) => self.vm_fetch(&request, payload).await,
1346 RequestPayload::GetSignalState(payload) => {
1347 self.get_signal_state(&request, payload).await
1348 }
1349 RequestPayload::GetZombieTimerCount(payload) => {
1350 self.get_zombie_timer_count(&request, payload).await
1351 }
1352 RequestPayload::HostFilesystemCall(_)
1353 | RequestPayload::PersistenceLoad(_)
1354 | RequestPayload::PersistenceFlush(_) => Ok(DispatchResult {
1355 response: self.reject(
1356 &request,
1357 "unsupported_direction",
1358 "host callback request categories are sidecar-to-host only in this scaffold",
1359 ),
1360 events: Vec::new(),
1361 }),
1362 RequestPayload::Ext(payload) => {
1363 self.dispatch_extension_request(&request, payload).await
1364 }
1365 };
1366
1367 match result {
1368 Ok(dispatch) => Ok(dispatch),
1369 Err(error @ SidecarError::Io(_)) => Err(error),
1370 Err(error) => Ok(DispatchResult {
1371 response: self.reject(&request, error_code(&error), &error.to_string()),
1372 events: Vec::new(),
1373 }),
1374 }
1375 }
1376
1377 pub async fn dispatch_wire(
1378 &mut self,
1379 request: crate::wire::RequestFrame,
1380 ) -> Result<crate::wire::WireDispatchResult, SidecarError> {
1381 let request = crate::wire::request_frame_to_compat(request).map_err(wire_protocol_error)?;
1382 let result = self.dispatch(request).await?;
1383 crate::wire::dispatch_result_from_compat(result).map_err(wire_protocol_error)
1384 }
1385
1386 pub async fn poll_event_wire(
1387 &mut self,
1388 ownership: &crate::wire::OwnershipScope,
1389 timeout: Duration,
1390 ) -> Result<Option<crate::wire::EventFrame>, SidecarError> {
1391 let ownership = crate::wire::ownership_scope_to_compat(ownership.clone());
1392 self.poll_event(&ownership, timeout)
1393 .await?
1394 .map(crate::wire::event_frame_from_compat)
1395 .transpose()
1396 .map_err(wire_protocol_error)
1397 }
1398
1399 async fn dispatch_extension_request(
1400 &mut self,
1401 request: &RequestFrame,
1402 envelope: ExtEnvelope,
1403 ) -> Result<DispatchResult, SidecarError> {
1404 let namespace = envelope.namespace;
1405 let Some(extension) = self.extensions.get(&namespace).cloned() else {
1406 return Ok(DispatchResult {
1407 response: self.reject(
1408 request,
1409 "unknown_extension",
1410 &format!("no extension registered for namespace {namespace}"),
1411 ),
1412 events: Vec::new(),
1413 });
1414 };
1415 let snapshot = ExtensionSnapshot::new(
1416 namespace.clone(),
1417 request.ownership.clone(),
1418 self.sidecar_requests.clone(),
1419 );
1420 let ctx = ExtensionContext::new(snapshot, self);
1421 let response = extension.handle_request(ctx, envelope.payload).await?;
1422 Ok(DispatchResult {
1423 response: self.respond(
1424 request,
1425 ResponsePayload::ExtResult(ExtEnvelope {
1426 namespace,
1427 payload: response.payload,
1428 }),
1429 ),
1430 events: response.events,
1431 })
1432 }
1433
1434 pub async fn poll_event(
1435 &mut self,
1436 ownership: &OwnershipScope,
1437 timeout: Duration,
1438 ) -> Result<Option<EventFrame>, SidecarError> {
1439 let deadline = Instant::now() + timeout;
1440 loop {
1441 if let Some(index) = self
1442 .pending_process_events
1443 .iter()
1444 .position(|event| public_process_event_matches_ownership(self, ownership, event))
1445 {
1446 let Some(envelope) = self.pending_process_events.remove(index) else {
1447 continue;
1448 };
1449 if let Some(frame) = self.handle_process_event_envelope(envelope)? {
1450 return Ok(Some(frame));
1451 }
1452 continue;
1453 }
1454
1455 if !timeout.is_zero() {
1456 let _ = self.pump_process_events(ownership).await?;
1457 }
1458
1459 let queued_envelopes = {
1460 let pending_capacity = self.pending_process_event_capacity();
1461 let receiver = self.process_event_receiver.as_mut().ok_or_else(|| {
1462 SidecarError::InvalidState(String::from("process event receiver unavailable"))
1463 })?;
1464 let mut queued = Vec::new();
1465 loop {
1466 if queued.len() >= pending_capacity {
1467 if receiver.is_empty() {
1468 break;
1469 }
1470 return Err(process_event_queue_overflow_error());
1471 }
1472 match receiver.try_recv() {
1473 Ok(envelope) => queued.push(envelope),
1474 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
1475 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break,
1476 }
1477 }
1478 queued
1479 };
1480
1481 let mut matching_envelope = None;
1482 for envelope in queued_envelopes {
1483 if matching_envelope.is_none()
1484 && public_process_event_matches_ownership(self, ownership, &envelope)
1485 {
1486 matching_envelope = Some(envelope);
1487 } else {
1488 self.queue_pending_process_event(envelope)?;
1489 }
1490 }
1491
1492 if let Some(envelope) = matching_envelope {
1493 if let Some(frame) = self.handle_process_event_envelope(envelope)? {
1494 return Ok(Some(frame));
1495 }
1496 continue;
1497 }
1498
1499 if Instant::now() >= deadline {
1500 return Ok(None);
1501 }
1502
1503 let remaining = deadline.saturating_duration_since(Instant::now());
1504 time::sleep(remaining.min(Duration::from_millis(10))).await;
1505 }
1506 }
1507
/// Handles one process event envelope, making sure a process's remaining
/// output is delivered before its `Exited` event.
///
/// For any non-`Exited` event the envelope is passed straight to
/// `handle_execution_event`. For an `Exited` event, every non-`Exited`
/// event of the same `(vm_id, process_id)` that is still pending or can
/// still be drained is collected first. If any exist, they are re-queued at
/// the front of the pending queue ahead of the exit event, and the exit is
/// deferred until they have been consumed.
///
/// Returns the frame produced by `handle_execution_event`, or `Ok(None)`
/// when the exit event was deferred behind re-queued trailing events.
///
/// # Errors
///
/// Returns the queue-overflow error when the trailing events do not fit in
/// the pending queue, and propagates errors from draining or handling
/// events.
pub(crate) fn handle_process_event_envelope(
    &mut self,
    envelope: ProcessEventEnvelope,
) -> Result<Option<EventFrame>, SidecarError> {
    let ProcessEventEnvelope {
        connection_id,
        session_id,
        vm_id,
        process_id,
        event,
    } = envelope;

    if matches!(event, ActiveExecutionEvent::Exited(_)) {
        // Split the pending queue: non-exit events of this process go to
        // `trailing`; everything else keeps its relative order in `deferred`.
        let mut trailing = Vec::new();
        let mut deferred = VecDeque::new();
        while let Some(pending) = self.pending_process_events.pop_front() {
            if pending.vm_id == vm_id
                && pending.process_id == process_id
                && !matches!(pending.event, ActiveExecutionEvent::Exited(_))
            {
                trailing.push(pending.event);
            } else {
                deferred.push_back(pending);
            }
        }
        self.pending_process_events = deferred;
        // Reserve room for what is already in `trailing` plus the exit
        // event itself before draining more.
        let drain_limit = self
            .pending_process_event_capacity()
            .saturating_sub(trailing.len().saturating_add(1));
        trailing.extend(
            self.drain_process_events_blocking_with_limit(&vm_id, &process_id, drain_limit)?
                .into_iter()
                .filter(|event| !matches!(event, ActiveExecutionEvent::Exited(_))),
        );

        if !trailing.is_empty() {
            if self.pending_process_event_capacity() < trailing.len() {
                return Err(process_event_queue_overflow_error());
            }
            // With room for exactly `trailing.len()` entries, the exit
            // event only fits if the first trailing event is emitted now.
            let emit_now = if self.pending_process_event_capacity() == trailing.len() {
                Some(trailing.remove(0))
            } else {
                None
            };
            // Push the exit first, then the trailing events in reverse, so
            // the queue front reads: trailing (original order), then exit.
            self.queue_front_pending_process_event(ProcessEventEnvelope {
                connection_id: connection_id.clone(),
                session_id: session_id.clone(),
                vm_id: vm_id.clone(),
                process_id: process_id.clone(),
                event,
            })?;
            for event in trailing.into_iter().rev() {
                self.queue_front_pending_process_event(ProcessEventEnvelope {
                    connection_id: connection_id.clone(),
                    session_id: session_id.clone(),
                    vm_id: vm_id.clone(),
                    process_id: process_id.clone(),
                    event,
                })?;
            }
            if let Some(event) = emit_now {
                return self.handle_execution_event(&vm_id, &process_id, event);
            }
            return Ok(None);
        }
    }

    // Non-exit event, or an exit with no trailing output: handle it now.
    self.handle_execution_event(&vm_id, &process_id, event)
}
1577
1578 pub async fn close_session(
1581 &mut self,
1582 connection_id: &str,
1583 session_id: &str,
1584 ) -> Result<Vec<EventFrame>, SidecarError> {
1585 self.dispose_session(connection_id, session_id, DisposeReason::Requested)
1586 .await
1587 }
1588
1589 pub async fn remove_connection(
1590 &mut self,
1591 connection_id: &str,
1592 ) -> Result<Vec<EventFrame>, SidecarError> {
1593 self.require_authenticated_connection(connection_id)?;
1594
1595 let session_ids = self
1596 .connections
1597 .get(connection_id)
1598 .expect("authenticated connection should exist")
1599 .sessions
1600 .iter()
1601 .cloned()
1602 .collect::<Vec<_>>();
1603
1604 let mut events = Vec::new();
1605 for session_id in session_ids {
1606 events.extend(
1607 self.dispose_session(connection_id, &session_id, DisposeReason::ConnectionClosed)
1608 .await?,
1609 );
1610 }
1611
1612 self.connections.remove(connection_id);
1613 Ok(events)
1614 }
1615
1616 async fn authenticate_connection(
1617 &mut self,
1618 request: &RequestFrame,
1619 payload: crate::protocol::AuthenticateRequest,
1620 ) -> Result<DispatchResult, SidecarError> {
1621 let _ = self.connection_id_for(&request.ownership)?;
1622 if let Err(error) = self.validate_auth_token(&payload.auth_token) {
1623 let mut fields = audit_fields([
1624 (String::from("source"), payload.client_name.clone()),
1625 (String::from("reason"), error.to_string()),
1626 ]);
1627 if let OwnershipScope::ConnectionOwnership(inner) = &request.ownership {
1628 fields.insert(String::from("connection_id"), inner.connection_id.clone());
1629 }
1630 emit_security_audit_event(
1631 &self.bridge,
1632 &self.config.sidecar_id,
1633 "security.auth.failed",
1634 fields,
1635 );
1636 return Err(error);
1637 }
1638
1639 if payload.protocol_version != crate::wire::PROTOCOL_VERSION {
1640 return Err(SidecarError::ProtocolVersionMismatch(format!(
1641 "sidecar protocol version mismatch: expected {}, got {}",
1642 crate::wire::PROTOCOL_VERSION,
1643 payload.protocol_version
1644 )));
1645 }
1646
1647 let expected_bridge_version = secure_exec_bridge::bridge_contract().version;
1648 if payload.bridge_version != expected_bridge_version {
1649 return Err(SidecarError::BridgeVersionMismatch(format!(
1650 "bridge contract version mismatch: expected {expected_bridge_version}, got {}",
1651 payload.bridge_version
1652 )));
1653 }
1654
1655 let connection_id = self.allocate_connection_id();
1656 self.connections.insert(
1657 connection_id.clone(),
1658 ConnectionState {
1659 auth_token: payload.auth_token,
1660 sessions: BTreeSet::new(),
1661 },
1662 );
1663
1664 let response = self.response_with_ownership(
1665 request.request_id,
1666 OwnershipScope::connection(&connection_id),
1667 ResponsePayload::Authenticated(AuthenticatedResponse {
1668 sidecar_id: self.config.sidecar_id.clone(),
1669 connection_id,
1670 max_frame_bytes: self.config.max_frame_bytes as u32,
1671 }),
1672 );
1673 Ok(DispatchResult {
1674 response,
1675 events: Vec::new(),
1676 })
1677 }
1678
1679 async fn open_session(
1680 &mut self,
1681 request: &RequestFrame,
1682 payload: OpenSessionRequest,
1683 ) -> Result<DispatchResult, SidecarError> {
1684 let connection_id = self.connection_id_for(&request.ownership)?;
1685 self.require_authenticated_connection(&connection_id)?;
1686
1687 self.next_session_id += 1;
1688 let session_id = format!("session-{}", self.next_session_id);
1689 self.sessions.insert(
1690 session_id.clone(),
1691 SessionState {
1692 connection_id: connection_id.clone(),
1693 placement: payload.placement,
1694 metadata: payload.metadata.into_iter().collect(),
1695 vm_ids: BTreeSet::new(),
1696 },
1697 );
1698 self.connections
1699 .get_mut(&connection_id)
1700 .expect("authenticated connection should exist")
1701 .sessions
1702 .insert(session_id.clone());
1703
1704 Ok(DispatchResult {
1705 response: self.respond(
1706 request,
1707 ResponsePayload::SessionOpened(SessionOpenedResponse {
1708 session_id,
1709 owner_connection_id: connection_id,
1710 }),
1711 ),
1712 events: Vec::new(),
1713 })
1714 }
1715
1716 async fn guest_filesystem_call(
1719 &mut self,
1720 request: &RequestFrame,
1721 payload: GuestFilesystemCallRequest,
1722 ) -> Result<DispatchResult, SidecarError> {
1723 filesystem_guest_filesystem_call(self, request, payload).await
1724 }
1725
1726 async fn dispose_session(
1732 &mut self,
1733 connection_id: &str,
1734 session_id: &str,
1735 reason: DisposeReason,
1736 ) -> Result<Vec<EventFrame>, SidecarError> {
1737 self.require_owned_session(connection_id, session_id)?;
1738
1739 let vm_ids = self
1740 .sessions
1741 .get(session_id)
1742 .expect("owned session should exist")
1743 .vm_ids
1744 .iter()
1745 .cloned()
1746 .collect::<Vec<_>>();
1747
1748 let mut events = Vec::new();
1749 for vm_id in vm_ids {
1750 events.extend(
1751 self.dispose_vm_internal(connection_id, session_id, &vm_id, reason.clone())
1752 .await?,
1753 );
1754 }
1755
1756 self.sessions.remove(session_id);
1757 if let Some(connection) = self.connections.get_mut(connection_id) {
1758 connection.sessions.remove(session_id);
1759 }
1760 Ok(events)
1761 }
1762
1763 pub(crate) fn handle_javascript_sync_rpc_request(
1771 &mut self,
1772 vm_id: &str,
1773 process_id: &str,
1774 request: JavascriptSyncRpcRequest,
1775 ) -> Result<(), SidecarError> {
1776 let Some(vm) = self.vms.get(vm_id) else {
1777 log_stale_process_event(&self.bridge, vm_id, process_id, "javascript sync RPC");
1778 return Ok(());
1779 };
1780 if !vm.active_processes.contains_key(process_id) {
1781 log_stale_process_event(&self.bridge, vm_id, process_id, "javascript sync RPC");
1782 return Ok(());
1783 }
1784
1785 let response: Result<Value, SidecarError> = match request.method.as_str() {
1786 "child_process.spawn" => {
1787 let Some(vm) = self.vms.get(vm_id) else {
1788 log_stale_process_event(
1789 &self.bridge,
1790 vm_id,
1791 process_id,
1792 "javascript sync RPC child_process.spawn",
1793 );
1794 return Ok(());
1795 };
1796 let (payload, _) = parse_javascript_child_process_spawn_request(vm, &request.args)?;
1797 self.spawn_javascript_child_process(vm_id, process_id, payload)
1798 }
1799 "child_process.spawn_sync" => {
1800 let Some(vm) = self.vms.get(vm_id) else {
1801 log_stale_process_event(
1802 &self.bridge,
1803 vm_id,
1804 process_id,
1805 "javascript sync RPC child_process.spawn_sync",
1806 );
1807 return Ok(());
1808 };
1809 let (payload, max_buffer) =
1810 parse_javascript_child_process_spawn_request(vm, &request.args)?;
1811 self.spawn_javascript_child_process_sync(vm_id, process_id, payload, max_buffer)
1812 }
1813 "child_process.poll" => {
1814 let child_process_id =
1815 javascript_sync_rpc_arg_str(&request.args, 0, "child_process.poll child id")?;
1816 let wait_ms = javascript_sync_rpc_arg_u64_optional(
1817 &request.args,
1818 1,
1819 "child_process.poll wait ms",
1820 )?
1821 .unwrap_or_default();
1822 self.poll_javascript_child_process(vm_id, process_id, child_process_id, wait_ms)
1823 }
1824 "child_process.write_stdin" => {
1825 let child_process_id = javascript_sync_rpc_arg_str(
1826 &request.args,
1827 0,
1828 "child_process.write_stdin child id",
1829 )?;
1830 let chunk = javascript_sync_rpc_bytes_arg(
1831 &request.args,
1832 1,
1833 "child_process.write_stdin chunk",
1834 )?;
1835 self.write_javascript_child_process_stdin(
1836 vm_id,
1837 process_id,
1838 child_process_id,
1839 &chunk,
1840 )?;
1841 Ok(Value::Null)
1842 }
1843 "child_process.close_stdin" => {
1844 let child_process_id = javascript_sync_rpc_arg_str(
1845 &request.args,
1846 0,
1847 "child_process.close_stdin child id",
1848 )?;
1849 self.close_javascript_child_process_stdin(vm_id, process_id, child_process_id)?;
1850 Ok(Value::Null)
1851 }
1852 "child_process.kill" => {
1853 let child_process_id =
1854 javascript_sync_rpc_arg_str(&request.args, 0, "child_process.kill child id")?;
1855 let signal =
1856 javascript_sync_rpc_arg_str(&request.args, 1, "child_process.kill signal")?;
1857 self.kill_javascript_child_process(vm_id, process_id, child_process_id, signal)?;
1858 Ok(Value::Null)
1859 }
1860 "process.kill" => {
1861 let target_pid =
1862 javascript_sync_rpc_arg_i32(&request.args, 0, "process.kill target pid")?;
1863 let signal = javascript_sync_rpc_arg_str(&request.args, 1, "process.kill signal")?;
1864 let parsed_signal = parse_signal(signal)?;
1865 if parsed_signal == 0 {
1866 let Some(vm) = self.vms.get(vm_id) else {
1867 log_stale_process_event(
1868 &self.bridge,
1869 vm_id,
1870 process_id,
1871 "javascript sync RPC process.kill",
1872 );
1873 return Ok(());
1874 };
1875 if !vm.active_processes.contains_key(process_id) {
1876 log_stale_process_event(
1877 &self.bridge,
1878 vm_id,
1879 process_id,
1880 "javascript sync RPC process.kill",
1881 );
1882 return Ok(());
1883 }
1884 vm.kernel
1885 .signal_process(EXECUTION_DRIVER_NAME, target_pid, parsed_signal)
1886 .map(|()| Value::Null)
1887 .map_err(kernel_error)
1888 } else if target_pid < 0 {
1889 let caller_kernel_pid = {
1890 let Some(vm) = self.vms.get(vm_id) else {
1891 log_stale_process_event(
1892 &self.bridge,
1893 vm_id,
1894 process_id,
1895 "javascript sync RPC process.kill",
1896 );
1897 return Ok(());
1898 };
1899 let Some(caller) = vm.active_processes.get(process_id) else {
1900 log_stale_process_event(
1901 &self.bridge,
1902 vm_id,
1903 process_id,
1904 "javascript sync RPC process.kill",
1905 );
1906 return Ok(());
1907 };
1908 caller.kernel_pid
1909 };
1910 let pgid = target_pid.unsigned_abs();
1911 match self.signal_vm_process_group(vm_id, caller_kernel_pid, pgid, signal) {
1912 Ok(true) => {
1913 Ok(self.apply_self_process_kill(vm_id, process_id, parsed_signal))
1914 }
1915 Ok(false) => Ok(Value::Null),
1916 Err(error) => Err(error),
1917 }
1918 } else {
1919 enum ProcessKillTarget {
1920 SelfProcess,
1921 Child(String),
1922 TopLevel(String),
1923 KernelPid(u32),
1924 }
1925 let target = {
1926 let Some(vm) = self.vms.get(vm_id) else {
1927 log_stale_process_event(
1928 &self.bridge,
1929 vm_id,
1930 process_id,
1931 "javascript sync RPC process.kill",
1932 );
1933 return Ok(());
1934 };
1935 let Some(caller) = vm.active_processes.get(process_id) else {
1936 log_stale_process_event(
1937 &self.bridge,
1938 vm_id,
1939 process_id,
1940 "javascript sync RPC process.kill",
1941 );
1942 return Ok(());
1943 };
1944 let caller_pid = i32::try_from(caller.kernel_pid).map_err(|_| {
1945 SidecarError::InvalidState("caller pid exceeds i32".into())
1946 })?;
1947 if caller_pid == target_pid {
1948 ProcessKillTarget::SelfProcess
1949 } else if let Some((child_process_id, _)) = caller
1950 .child_processes
1951 .iter()
1952 .find(|(_, child)| i32::try_from(child.kernel_pid) == Ok(target_pid))
1953 {
1954 ProcessKillTarget::Child(child_process_id.clone())
1955 } else if let Some((target_process_id, _)) =
1956 vm.active_processes.iter().find(|(_, process)| {
1957 i32::try_from(process.kernel_pid) == Ok(target_pid)
1958 })
1959 {
1960 ProcessKillTarget::TopLevel(target_process_id.clone())
1961 } else {
1962 let target_kernel_pid = u32::try_from(target_pid).map_err(|_| {
1963 SidecarError::InvalidState(format!(
1964 "EINVAL: invalid process pid {target_pid}"
1965 ))
1966 })?;
1967 ProcessKillTarget::KernelPid(target_kernel_pid)
1968 }
1969 };
1970 match target {
1971 ProcessKillTarget::SelfProcess => {
1972 Ok(self.apply_self_process_kill(vm_id, process_id, parsed_signal))
1973 }
1974 ProcessKillTarget::Child(child_process_id) => {
1975 self.kill_javascript_child_process(
1976 vm_id,
1977 process_id,
1978 &child_process_id,
1979 signal,
1980 )?;
1981 Ok(Value::Null)
1982 }
1983 ProcessKillTarget::TopLevel(target_process_id) => {
1984 self.kill_process_internal(vm_id, &target_process_id, signal)?;
1985 Ok(Value::Null)
1986 }
1987 ProcessKillTarget::KernelPid(target_kernel_pid) => {
1988 self.signal_vm_kernel_pid(vm_id, target_kernel_pid, signal)
1992 .map(|()| Value::Null)
1993 }
1994 }
1995 }
1996 }
1997 "process.signal_state" => {
1998 let signal =
1999 javascript_sync_rpc_arg_u32(&request.args, 0, "process.signal_state signal")?;
2000 let action =
2001 javascript_sync_rpc_arg_str(&request.args, 1, "process.signal_state action")?;
2002 let mask_json =
2003 javascript_sync_rpc_arg_str(&request.args, 2, "process.signal_state mask")?;
2004 let flags =
2005 javascript_sync_rpc_arg_u32(&request.args, 3, "process.signal_state flags")?;
2006 let mask: Vec<u32> = serde_json::from_str(mask_json).map_err(|error| {
2007 SidecarError::InvalidState(format!(
2008 "process.signal_state mask must be valid JSON: {error}"
2009 ))
2010 })?;
2011 let action = match action.trim().to_ascii_lowercase().as_str() {
2012 "default" => SignalDispositionAction::Default,
2013 "ignore" => SignalDispositionAction::Ignore,
2014 "user" => SignalDispositionAction::User,
2015 other => {
2016 return Err(SidecarError::InvalidState(format!(
2017 "unsupported process.signal_state action {other}"
2018 )));
2019 }
2020 };
2021 let Some(vm) = self.vms.get_mut(vm_id) else {
2022 log_stale_process_event(
2023 &self.bridge,
2024 vm_id,
2025 process_id,
2026 "javascript sync RPC process.signal_state",
2027 );
2028 return Ok(());
2029 };
2030 if action == SignalDispositionAction::Default && mask.is_empty() && flags == 0 {
2031 let remove_process_entry = vm
2032 .signal_states
2033 .get_mut(process_id)
2034 .map(|handlers| {
2035 handlers.remove(&signal);
2036 handlers.is_empty()
2037 })
2038 .unwrap_or(false);
2039 if remove_process_entry {
2040 vm.signal_states.remove(process_id);
2041 }
2042 } else {
2043 vm.signal_states
2044 .entry(process_id.to_owned())
2045 .or_default()
2046 .insert(
2047 signal,
2048 SignalHandlerRegistration {
2049 action,
2050 mask,
2051 flags,
2052 },
2053 );
2054 }
2055 Ok(Value::Null)
2056 }
2057 _ => {
2058 let Some(vm) = self.vms.get_mut(vm_id) else {
2059 log_stale_process_event(
2060 &self.bridge,
2061 vm_id,
2062 process_id,
2063 "javascript sync RPC bridge dispatch",
2064 );
2065 return Ok(());
2066 };
2067 let resource_limits = vm.kernel.resource_limits().clone();
2068 let network_counts = vm_network_resource_counts(vm);
2069 let socket_paths = build_javascript_socket_path_context(vm)?;
2070 let Some(process) = vm.active_processes.get_mut(process_id) else {
2071 log_stale_process_event(
2072 &self.bridge,
2073 vm_id,
2074 process_id,
2075 "javascript sync RPC bridge dispatch",
2076 );
2077 return Ok(());
2078 };
2079 service_javascript_sync_rpc(JavascriptSyncRpcServiceRequest {
2080 bridge: &self.bridge,
2081 vm_id,
2082 dns: &vm.dns,
2083 socket_paths: &socket_paths,
2084 kernel: &mut vm.kernel,
2085 process,
2086 sync_request: &request,
2087 resource_limits: &resource_limits,
2088 network_counts,
2089 })
2090 }
2091 };
2092
2093 let Some(vm) = self.vms.get_mut(vm_id) else {
2094 log_stale_process_event(
2095 &self.bridge,
2096 vm_id,
2097 process_id,
2098 "javascript sync RPC response delivery",
2099 );
2100 return Ok(());
2101 };
2102 let shadow_root = vm.cwd.clone();
2103 let Some(process) = vm.active_processes.get_mut(process_id) else {
2104 log_stale_process_event(
2105 &self.bridge,
2106 vm_id,
2107 process_id,
2108 "javascript sync RPC response delivery",
2109 );
2110 return Ok(());
2111 };
2112
2113 if response.is_ok()
2114 && matches!(
2115 request.method.as_str(),
2116 "fs.chmodSync" | "fs.promises.chmod"
2117 )
2118 {
2119 let guest_path =
2120 javascript_sync_rpc_arg_str(&request.args, 0, "filesystem chmod path")?;
2121 let mode =
2122 javascript_sync_rpc_arg_u32(&request.args, 1, "filesystem chmod mode")? & 0o7777;
2123 let host_path =
2124 shadow_host_path_for_process(&shadow_root, &process.guest_cwd, guest_path);
2125 if host_path.exists() {
2126 fs::set_permissions(&host_path, fs::Permissions::from_mode(mode)).map_err(
2127 |error| {
2128 SidecarError::Io(format!(
2129 "failed to mirror chmod to shadow path {}: {error}",
2130 host_path.display()
2131 ))
2132 },
2133 )?;
2134 }
2135 }
2136
2137 match response {
2138 Ok(result) => process
2139 .execution
2140 .respond_javascript_sync_rpc_success(request.id, result)
2141 .or_else(ignore_stale_javascript_sync_rpc_response),
2142 Err(error) => process
2143 .execution
2144 .respond_javascript_sync_rpc_error(
2145 request.id,
2146 javascript_sync_rpc_error_code(&error),
2147 error.to_string(),
2148 )
2149 .or_else(ignore_stale_javascript_sync_rpc_response),
2150 }
2151 }
2152
2153 fn apply_self_process_kill(
2156 &mut self,
2157 vm_id: &str,
2158 process_id: &str,
2159 parsed_signal: i32,
2160 ) -> Value {
2161 let action = self
2162 .vms
2163 .get(vm_id)
2164 .and_then(|vm| vm.signal_states.get(process_id))
2165 .and_then(|handlers| handlers.get(&(parsed_signal as u32)))
2166 .map(|registration| registration.action.clone())
2167 .unwrap_or(SignalDispositionAction::Default);
2168 if action == SignalDispositionAction::Default
2169 && parsed_signal != 0
2170 && !matches!(
2171 canonical_signal_name(parsed_signal),
2172 Some("SIGWINCH" | "SIGCHLD" | "SIGCONT" | "SIGURG")
2173 )
2174 {
2175 if let Some(vm) = self.vms.get_mut(vm_id) {
2176 if let Some(process) = vm.active_processes.get_mut(process_id) {
2177 process.pending_self_signal_exit = Some(parsed_signal);
2178 }
2179 }
2180 }
2181 json!({
2182 "self": true,
2183 "action": match action {
2184 SignalDispositionAction::Default => "default",
2185 SignalDispositionAction::Ignore => "ignore",
2186 SignalDispositionAction::User => "user",
2187 },
2188 })
2189 }
2190
2191 pub(crate) fn vm_ids_for_scope(
2192 &self,
2193 ownership: &OwnershipScope,
2194 ) -> Result<Vec<String>, SidecarError> {
2195 match ownership {
2196 OwnershipScope::SessionOwnership(inner) => {
2197 self.require_owned_session(&inner.connection_id, &inner.session_id)?;
2198 Ok(self
2199 .sessions
2200 .get(&inner.session_id)
2201 .expect("owned session should exist")
2202 .vm_ids
2203 .iter()
2204 .cloned()
2205 .collect())
2206 }
2207 OwnershipScope::VmOwnership(inner) => {
2208 self.require_owned_vm(&inner.connection_id, &inner.session_id, &inner.vm_id)?;
2209 Ok(vec![inner.vm_id.clone()])
2210 }
2211 OwnershipScope::ConnectionOwnership(..) => Err(SidecarError::InvalidState(
2212 String::from("event polling requires session or VM ownership scope"),
2213 )),
2214 }
2215 }
2216
2217 pub(crate) fn vm_ownership(&self, vm_id: &str) -> Result<OwnershipScope, SidecarError> {
2218 let vm = self
2219 .vms
2220 .get(vm_id)
2221 .ok_or_else(|| SidecarError::InvalidState(format!("unknown sidecar VM {vm_id}")))?;
2222 Ok(OwnershipScope::vm(&vm.connection_id, &vm.session_id, vm_id))
2223 }
2224
    /// Reports whether the VM exists and has at least one entry in its
    /// `active_processes` map. An unknown `vm_id` yields `false`.
    pub(crate) fn vm_has_active_processes(&self, vm_id: &str) -> bool {
        self.vms
            .get(vm_id)
            .is_some_and(|vm| !vm.active_processes.is_empty())
    }
2230
2231 fn require_authenticated_connection(&self, connection_id: &str) -> Result<(), SidecarError> {
2232 if self.connections.contains_key(connection_id) {
2233 Ok(())
2234 } else {
2235 Err(SidecarError::InvalidState(format!(
2236 "connection {connection_id} has not authenticated"
2237 )))
2238 }
2239 }
2240
2241 pub(crate) fn require_owned_session(
2242 &self,
2243 connection_id: &str,
2244 session_id: &str,
2245 ) -> Result<(), SidecarError> {
2246 self.require_authenticated_connection(connection_id)?;
2247 let session = self.sessions.get(session_id).ok_or_else(|| {
2248 SidecarError::InvalidState(format!("unknown sidecar session {session_id}"))
2249 })?;
2250 if session.connection_id == connection_id {
2251 Ok(())
2252 } else {
2253 Err(SidecarError::InvalidState(format!(
2254 "session {session_id} is not owned by connection {connection_id}"
2255 )))
2256 }
2257 }
2258
2259 pub(crate) fn require_owned_vm(
2260 &self,
2261 connection_id: &str,
2262 session_id: &str,
2263 vm_id: &str,
2264 ) -> Result<(), SidecarError> {
2265 self.require_owned_session(connection_id, session_id)?;
2266 let vm = self
2267 .vms
2268 .get(vm_id)
2269 .ok_or_else(|| SidecarError::InvalidState(format!("unknown sidecar VM {vm_id}")))?;
2270 if vm.connection_id != connection_id || vm.session_id != session_id {
2271 return Err(SidecarError::InvalidState(format!(
2272 "VM {vm_id} is not owned by {connection_id}/{session_id}"
2273 )));
2274 }
2275 Ok(())
2276 }
2277
2278 fn connection_id_for(&self, ownership: &OwnershipScope) -> Result<String, SidecarError> {
2279 match ownership {
2280 OwnershipScope::ConnectionOwnership(inner) => Ok(inner.connection_id.clone()),
2281 OwnershipScope::SessionOwnership(..) | OwnershipScope::VmOwnership(..) => {
2282 Err(SidecarError::InvalidState(String::from(
2283 "request requires connection ownership scope",
2284 )))
2285 }
2286 }
2287 }
2288
2289 fn validate_auth_token(&self, auth_token: &str) -> Result<(), SidecarError> {
2290 let Some(expected_auth_token) = self.config.expected_auth_token.as_deref() else {
2291 return Ok(());
2292 };
2293
2294 if auth_token == expected_auth_token {
2295 Ok(())
2296 } else {
2297 Err(SidecarError::Unauthorized(String::from(
2298 "authenticate request provided an invalid auth token",
2299 )))
2300 }
2301 }
2302
    /// Increments the `next_connection_id` counter and returns a new id of
    /// the form `conn-<counter>`.
    fn allocate_connection_id(&mut self) -> String {
        self.next_connection_id += 1;
        format!("conn-{}", self.next_connection_id)
    }
2307
    /// Removes and returns the next process event for `vm_id`/`process_id`.
    ///
    /// Already-queued events in `pending_process_events` are searched first.
    /// If none matches, the process event receiver is drained with
    /// `try_recv` until a matching envelope arrives or the channel is empty
    /// or disconnected. Non-matching envelopes read on the way are re-queued
    /// through `queue_pending_process_event`.
    ///
    /// # Errors
    /// Returns `InvalidState` when the receiver is unavailable, the queue
    /// overflow error when more non-matching events are waiting than the
    /// pending capacity allows, and any error from re-queueing.
    fn take_matching_process_event_envelope(
        &mut self,
        vm_id: &str,
        process_id: &str,
    ) -> Result<Option<ProcessEventEnvelope>, SidecarError> {
        // Fast path: an earlier drain may already have parked a matching event.
        if let Some(index) = self
            .pending_process_events
            .iter()
            .position(|event| event.vm_id == vm_id && event.process_id == process_id)
        {
            return Ok(self.pending_process_events.remove(index));
        }

        let mut matching_envelope = None;
        let mut deferred = Vec::new();
        {
            // Read the capacity before taking the mutable borrow of the receiver.
            let pending_capacity = self.pending_process_event_capacity();
            let receiver = self.process_event_receiver.as_mut().ok_or_else(|| {
                SidecarError::InvalidState(String::from("process event receiver unavailable"))
            })?;
            loop {
                if deferred.len() >= pending_capacity {
                    // At capacity: stop if nothing else is waiting, otherwise
                    // report overflow.
                    // NOTE(review): returning here drops the envelopes already
                    // collected in `deferred` — confirm that loss is intended.
                    if receiver.is_empty() {
                        break;
                    }
                    return Err(process_event_queue_overflow_error());
                }
                let envelope = match receiver.try_recv() {
                    Ok(envelope) => envelope,
                    // Both an empty and a disconnected channel end the drain.
                    Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
                    Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break,
                };
                if matching_envelope.is_none()
                    && envelope.vm_id == vm_id
                    && envelope.process_id == process_id
                {
                    matching_envelope = Some(envelope);
                    break;
                }
                deferred.push(envelope);
            }
        }
        // Park everything that was read but did not match, in arrival order.
        for envelope in deferred {
            self.queue_pending_process_event(envelope)?;
        }

        Ok(matching_envelope)
    }
2356
    /// Returns the current `next_sidecar_request_id` and then decrements the
    /// counter, so successive ids count downwards.
    // NOTE(review): presumably descending ids keep sidecar-originated requests
    // distinct from client request ids — confirm against the protocol docs.
    fn allocate_sidecar_request_id(&mut self) -> RequestId {
        let request_id = self.next_sidecar_request_id;
        self.next_sidecar_request_id -= 1;
        request_id
    }
2362
2363 pub(crate) fn session_scope_for(
2364 &self,
2365 ownership: &OwnershipScope,
2366 ) -> Result<(String, String), SidecarError> {
2367 match ownership {
2368 OwnershipScope::SessionOwnership(inner) => {
2369 Ok((inner.connection_id.clone(), inner.session_id.clone()))
2370 }
2371 OwnershipScope::ConnectionOwnership(..) | OwnershipScope::VmOwnership(..) => {
2372 Err(SidecarError::InvalidState(String::from(
2373 "request requires session ownership scope",
2374 )))
2375 }
2376 }
2377 }
2378
2379 pub(crate) fn vm_scope_for(
2380 &self,
2381 ownership: &OwnershipScope,
2382 ) -> Result<(String, String, String), SidecarError> {
2383 match ownership {
2384 OwnershipScope::VmOwnership(inner) => Ok((
2385 inner.connection_id.clone(),
2386 inner.session_id.clone(),
2387 inner.vm_id.clone(),
2388 )),
2389 OwnershipScope::ConnectionOwnership(..) | OwnershipScope::SessionOwnership(..) => Err(
2390 SidecarError::InvalidState(String::from("request requires VM ownership scope")),
2391 ),
2392 }
2393 }
2394
    /// Assembles a `ResponseFrame` from its parts, stamping it with the
    /// current protocol schema.
    fn response_with_ownership(
        &self,
        request_id: RequestId,
        ownership: OwnershipScope,
        payload: ResponsePayload,
    ) -> ResponseFrame {
        ResponseFrame {
            schema: ProtocolSchema::current(),
            request_id,
            ownership,
            payload,
        }
    }
2408
    /// Builds a response to `request`, reusing the request's id and a clone
    /// of its ownership scope.
    pub(crate) fn respond(
        &self,
        request: &RequestFrame,
        payload: ResponsePayload,
    ) -> ResponseFrame {
        self.response_with_ownership(request.request_id, request.ownership.clone(), payload)
    }
2416
2417 fn reject(&self, request: &RequestFrame, code: &str, message: &str) -> ResponseFrame {
2418 self.respond(
2419 request,
2420 ResponsePayload::Rejected(RejectedResponse {
2421 code: code.to_owned(),
2422 message: message.to_owned(),
2423 }),
2424 )
2425 }
2426
2427 pub fn queue_sidecar_request(
2428 &mut self,
2429 ownership: OwnershipScope,
2430 payload: SidecarRequestPayload,
2431 ) -> Result<RequestId, SidecarError> {
2432 if self.outbound_sidecar_requests.len() >= MAX_OUTBOUND_SIDECAR_REQUESTS {
2433 return Err(outbound_sidecar_request_queue_overflow_error());
2434 }
2435 if self.pending_sidecar_responses.pending_count() >= MAX_PENDING_SIDECAR_RESPONSES {
2436 return Err(sidecar_response_pending_overflow_error());
2437 }
2438 let request_id = self.allocate_sidecar_request_id();
2439 let request = SidecarRequestFrame::new(request_id, ownership, payload);
2440 self.pending_sidecar_responses
2441 .register_request(&request)
2442 .map_err(sidecar_response_tracker_error)?;
2443 self.outbound_sidecar_requests.push_back(request);
2444 Ok(request_id)
2445 }
2446
    /// Wire-type variant of `queue_sidecar_request`: converts the wire
    /// ownership scope and payload to their compat forms, then queues them.
    ///
    /// # Errors
    /// Returns the mapped wire protocol error when the payload conversion
    /// fails, or any error from `queue_sidecar_request`.
    pub fn queue_wire_sidecar_request(
        &mut self,
        ownership: crate::wire::OwnershipScope,
        payload: crate::wire::SidecarRequestPayload,
    ) -> Result<crate::wire::RequestId, SidecarError> {
        let ownership = crate::wire::ownership_scope_to_compat(ownership);
        let payload = crate::wire::sidecar_request_payload_to_compat(&ownership, payload)
            .map_err(wire_protocol_error)?;
        self.queue_sidecar_request(ownership, payload)
    }
2457
    /// Removes and returns the oldest queued outbound sidecar request, or
    /// `None` when the queue is empty.
    pub fn pop_sidecar_request(&mut self) -> Option<SidecarRequestFrame> {
        self.outbound_sidecar_requests.pop_front()
    }
2461
    /// Wire-type variant of `pop_sidecar_request`: pops the next outbound
    /// request and converts it to its wire frame.
    ///
    /// # Errors
    /// Returns the mapped wire protocol error when the conversion fails.
    pub fn pop_wire_sidecar_request(
        &mut self,
    ) -> Result<Option<crate::wire::SidecarRequestFrame>, SidecarError> {
        self.pop_sidecar_request()
            .map(crate::wire::sidecar_request_frame_from_compat)
            .transpose()
            .map_err(wire_protocol_error)
    }
2470
2471 pub fn accept_sidecar_response(
2472 &mut self,
2473 response: SidecarResponseFrame,
2474 ) -> Result<(), SidecarError> {
2475 self.pending_sidecar_responses
2476 .accept_response(&response)
2477 .map_err(sidecar_response_tracker_error)?;
2478 self.completed_sidecar_response_order
2479 .push_back(response.request_id);
2480 self.completed_sidecar_responses
2481 .insert(response.request_id, response);
2482 while self.completed_sidecar_responses.len() > MAX_COMPLETED_SIDECAR_RESPONSES {
2483 if let Some(evicted) = self.completed_sidecar_response_order.pop_front() {
2484 self.completed_sidecar_responses.remove(&evicted);
2485 }
2486 }
2487 Ok(())
2488 }
2489
    /// Wire-type variant of `accept_sidecar_response`: converts the wire
    /// frame to its compat form and records it.
    ///
    /// # Errors
    /// Returns the mapped wire protocol error when the conversion fails, or
    /// any error from `accept_sidecar_response`.
    pub fn accept_wire_sidecar_response(
        &mut self,
        response: crate::wire::SidecarResponseFrame,
    ) -> Result<(), SidecarError> {
        let response =
            crate::wire::sidecar_response_frame_to_compat(response).map_err(wire_protocol_error)?;
        self.accept_sidecar_response(response)
    }
2498
2499 pub fn take_sidecar_response(&mut self, request_id: RequestId) -> Option<SidecarResponseFrame> {
2500 let response = self.completed_sidecar_responses.remove(&request_id);
2501 if response.is_some() {
2502 self.completed_sidecar_response_order
2503 .retain(|completed_id| completed_id != &request_id);
2504 }
2505 response
2506 }
2507
    /// Wire-type variant of `take_sidecar_response`: takes the completed
    /// response for `request_id` and converts it to its wire frame.
    ///
    /// # Errors
    /// Returns the mapped wire protocol error when the conversion fails.
    pub fn take_wire_sidecar_response(
        &mut self,
        request_id: crate::wire::RequestId,
    ) -> Result<Option<crate::wire::SidecarResponseFrame>, SidecarError> {
        self.take_sidecar_response(request_id)
            .map(|response| {
                crate::wire::sidecar_response_frame_from_compat(response)
                    .map_err(wire_protocol_error)
            })
            .transpose()
    }
2519
    /// Builds a VM-scoped `EventFrame` announcing that the VM entered
    /// `state`.
    pub(crate) fn vm_lifecycle_event(
        &self,
        connection_id: &str,
        session_id: &str,
        vm_id: &str,
        state: VmLifecycleState,
    ) -> EventFrame {
        EventFrame::new(
            OwnershipScope::vm(connection_id, session_id, vm_id),
            EventPayload::VmLifecycle(VmLifecycleEvent { state }),
        )
    }
2532
2533 fn ensure_request_within_frame_limit(
2534 &self,
2535 request: &RequestFrame,
2536 ) -> Result<(), SidecarError> {
2537 let frame = crate::protocol::to_generated_protocol_frame(
2538 &crate::protocol::ProtocolFrame::Request(request.clone()),
2539 )
2540 .map_err(|error| {
2541 SidecarError::InvalidState(format!("failed to convert request frame: {error}"))
2542 })?;
2543 let crate::wire::ProtocolFrame::RequestFrame(_) = &frame else {
2544 return Err(SidecarError::InvalidState(String::from(
2545 "request converted to non-request wire frame",
2546 )));
2547 };
2548
2549 crate::wire::WireFrameCodec::new(self.config.max_frame_bytes)
2550 .encode(&frame)
2551 .map(|_| ())
2552 .map_err(|error| SidecarError::FrameTooLarge(error.to_string()))
2553 }
2554}
2555
/// `ExtensionHost` implementation backed by the sidecar's own request
/// handlers. Most methods wrap their payload in a `RequestFrame` with request
/// id 0, call the matching `NativeSidecar` handler, and unwrap the expected
/// response variant.
impl<B> ExtensionHost for NativeSidecar<B>
where
    B: NativeSidecarBridge + Send + 'static,
    BridgeError<B>: fmt::Debug + Send + Sync + 'static,
{
    /// Starts a process through `NativeSidecar::execute`; any response other
    /// than `ProcessStarted` becomes an error.
    fn spawn_process<'a>(
        &'a mut self,
        ownership: OwnershipScope,
        payload: ExecuteRequest,
    ) -> ExtensionFuture<'a, ProcessStartedResponse> {
        Box::pin(async move {
            let request = RequestFrame::new(0, ownership, RequestPayload::Execute(payload.clone()));
            let dispatch = NativeSidecar::execute(self, &request, payload).await?;
            match dispatch.response.payload {
                ResponsePayload::ProcessStarted(response) => Ok(response),
                other => Err(unexpected_extension_host_response("execute", other)),
            }
        })
    }

    /// Writes to a process's stdin through `NativeSidecar::write_stdin`; any
    /// response other than `StdinWritten` becomes an error.
    fn write_stdin<'a>(
        &'a mut self,
        ownership: OwnershipScope,
        payload: WriteStdinRequest,
    ) -> ExtensionFuture<'a, StdinWrittenResponse> {
        Box::pin(async move {
            let request =
                RequestFrame::new(0, ownership, RequestPayload::WriteStdin(payload.clone()));
            let dispatch = NativeSidecar::write_stdin(self, &request, payload).await?;
            match dispatch.response.payload {
                ResponsePayload::StdinWritten(response) => Ok(response),
                other => Err(unexpected_extension_host_response("write_stdin", other)),
            }
        })
    }

    /// Closes a process's stdin through `NativeSidecar::close_stdin`; any
    /// response other than `StdinClosed` becomes an error.
    fn close_stdin<'a>(
        &'a mut self,
        ownership: OwnershipScope,
        payload: CloseStdinRequest,
    ) -> ExtensionFuture<'a, StdinClosedResponse> {
        Box::pin(async move {
            let request =
                RequestFrame::new(0, ownership, RequestPayload::CloseStdin(payload.clone()));
            let dispatch = NativeSidecar::close_stdin(self, &request, payload).await?;
            match dispatch.response.payload {
                ResponsePayload::StdinClosed(response) => Ok(response),
                other => Err(unexpected_extension_host_response("close_stdin", other)),
            }
        })
    }

    /// Kills a process through `NativeSidecar::kill_process`; any response
    /// other than `ProcessKilled` becomes an error.
    fn kill_process<'a>(
        &'a mut self,
        ownership: OwnershipScope,
        payload: KillProcessRequest,
    ) -> ExtensionFuture<'a, ProcessKilledResponse> {
        Box::pin(async move {
            let request =
                RequestFrame::new(0, ownership, RequestPayload::KillProcess(payload.clone()));
            let dispatch = NativeSidecar::kill_process(self, &request, payload).await?;
            match dispatch.response.payload {
                ResponsePayload::ProcessKilled(response) => Ok(response),
                other => Err(unexpected_extension_host_response("kill_process", other)),
            }
        })
    }

    /// Forwards to `NativeSidecar::poll_event` for the given scope and
    /// timeout.
    fn poll_event<'a>(
        &'a mut self,
        ownership: OwnershipScope,
        timeout: Duration,
    ) -> ExtensionFuture<'a, Option<EventFrame>> {
        Box::pin(async move { NativeSidecar::poll_event(self, &ownership, timeout).await })
    }

    /// Performs a guest filesystem call through
    /// `NativeSidecar::guest_filesystem_call`; any response other than
    /// `GuestFilesystemResult` becomes an error.
    fn guest_filesystem_call<'a>(
        &'a mut self,
        ownership: OwnershipScope,
        payload: GuestFilesystemCallRequest,
    ) -> ExtensionFuture<'a, GuestFilesystemResultResponse> {
        Box::pin(async move {
            let request = RequestFrame::new(
                0,
                ownership,
                RequestPayload::GuestFilesystemCall(payload.clone()),
            );
            let dispatch = NativeSidecar::guest_filesystem_call(self, &request, payload).await?;
            match dispatch.response.payload {
                ResponsePayload::GuestFilesystemResult(response) => Ok(response),
                other => Err(unexpected_extension_host_response(
                    "guest_filesystem_call",
                    other,
                )),
            }
        })
    }

    /// Associates `process_id` with the extension session identified by
    /// `(namespace, ext_session_id)` via `bind_extension_process_resource`.
    fn bind_process_to_session<'a>(
        &'a mut self,
        ownership: OwnershipScope,
        namespace: String,
        ext_session_id: String,
        process_id: String,
    ) -> ExtensionFuture<'a, ()> {
        Box::pin(async move {
            self.bind_extension_process_resource(ownership, namespace, ext_session_id, process_id)
        })
    }

    /// Associates the VM in `ownership` with the extension session
    /// identified by `(namespace, ext_session_id)` via
    /// `bind_extension_vm_resource`.
    fn bind_vm_to_session<'a>(
        &'a mut self,
        ownership: OwnershipScope,
        namespace: String,
        ext_session_id: String,
    ) -> ExtensionFuture<'a, ()> {
        Box::pin(
            async move { self.bind_extension_vm_resource(ownership, namespace, ext_session_id) },
        )
    }

    /// Tears down everything bound to an extension session.
    ///
    /// Bound processes that are still active receive `SIGTERM`; bound VMs
    /// that still exist are disposed, and the events from those disposals are
    /// returned. An unknown session yields an empty list. A session whose
    /// recorded ownership differs from `ownership` is an `InvalidState`
    /// error.
    fn dispose_session_resources<'a>(
        &'a mut self,
        ownership: OwnershipScope,
        namespace: String,
        ext_session_id: String,
    ) -> ExtensionFuture<'a, Vec<EventFrame>> {
        Box::pin(async move {
            let key = (namespace, ext_session_id);
            let Some(resources) = self.extension_sessions.get(&key) else {
                return Ok(Vec::new());
            };
            if resources.ownership != ownership {
                return Err(SidecarError::InvalidState(String::from(
                    "extension session ownership did not match dispose request",
                )));
            }
            let resources = self
                .extension_sessions
                .remove(&key)
                .expect("extension resources existed before removal");
            // NOTE(review): the entry is already removed at this point, so a
            // failure from `vm_scope_for` or a later `?` leaves the remaining
            // resources untracked — confirm whether that is acceptable.
            let (connection_id, session_id, vm_id) = self.vm_scope_for(&ownership)?;
            for process_id in resources.process_ids {
                // Only signal processes that are still registered as active.
                if self
                    .vms
                    .get(&vm_id)
                    .is_some_and(|vm| vm.active_processes.contains_key(&process_id))
                {
                    self.kill_process_internal(&vm_id, &process_id, "SIGTERM")?;
                }
            }
            let mut events = Vec::new();
            for resource_vm_id in resources.vm_ids {
                if self.vms.contains_key(&resource_vm_id) {
                    events.extend(
                        self.dispose_vm_internal(
                            &connection_id,
                            &session_id,
                            &resource_vm_id,
                            DisposeReason::Requested,
                        )
                        .await?,
                    );
                }
            }
            Ok(events)
        })
    }

    /// Starts capturing output for `process_id` by registering an empty
    /// buffer keyed by `(vm_id, process_id)`.
    ///
    /// Fails with `Conflict` when a buffer for that key already exists, and
    /// with the ownership error when the scope is not an owned VM.
    fn start_buffering_process_output<'a>(
        &'a mut self,
        ownership: OwnershipScope,
        process_id: String,
    ) -> ExtensionFuture<'a, ()> {
        Box::pin(async move {
            let (connection_id, session_id, vm_id) = self.vm_scope_for(&ownership)?;
            self.require_owned_vm(&connection_id, &session_id, &vm_id)?;
            let key = (vm_id, process_id);
            if self.extension_process_output_buffers.contains_key(&key) {
                return Err(SidecarError::Conflict(String::from(
                    "extension process output buffering already started",
                )));
            }
            self.extension_process_output_buffers
                .insert(key, ExtensionBufferedProcessOutput::default());
            Ok(())
        })
    }

    /// Hands the buffered output of `process_id` over to an extension
    /// session.
    ///
    /// Pumps process events, capturing those for this process into its
    /// buffer, until some stdout or stderr has been buffered, `timeout` is
    /// zero, or the deadline passes; it sleeps at most 10 ms between rounds.
    /// The process is then bound to the extension session and its buffer is
    /// removed and returned.
    ///
    /// Fails with `InvalidState` when no buffer was started for the process.
    fn handoff_buffered_process_output<'a>(
        &'a mut self,
        ownership: OwnershipScope,
        namespace: String,
        ext_session_id: String,
        process_id: String,
        timeout: Duration,
    ) -> ExtensionFuture<'a, ExtensionBufferedProcessOutput> {
        Box::pin(async move {
            let (connection_id, session_id, vm_id) = self.vm_scope_for(&ownership)?;
            self.require_owned_vm(&connection_id, &session_id, &vm_id)?;
            let key = (vm_id.clone(), process_id.clone());
            // NOTE(review): `Instant + Duration` panics on overflow, so an
            // extremely large `timeout` would panic here — consider
            // `checked_add`.
            let deadline = Instant::now() + timeout;
            loop {
                self.pump_process_events(&ownership).await?;
                while let Some(envelope) =
                    self.take_matching_process_event_envelope(&vm_id, &process_id)?
                {
                    if self.capture_extension_process_output_event(
                        &vm_id,
                        &process_id,
                        &envelope.event,
                    ) {
                        continue;
                    }
                    // Not captured: put the event back and stop draining for
                    // this round.
                    self.queue_pending_process_event(envelope)?;
                    break;
                }
                let buffered = self
                    .extension_process_output_buffers
                    .get(&key)
                    .is_some_and(|buffer| !buffer.stdout.is_empty() || !buffer.stderr.is_empty());
                if buffered || timeout.is_zero() || Instant::now() >= deadline {
                    break;
                }
                let remaining = deadline.saturating_duration_since(Instant::now());
                time::sleep(remaining.min(Duration::from_millis(10))).await;
            }
            self.bind_extension_process_resource(
                ownership,
                namespace,
                ext_session_id,
                process_id.clone(),
            )?;
            self.extension_process_output_buffers
                .remove(&key)
                .ok_or_else(|| {
                    SidecarError::InvalidState(String::from(
                        "extension process output buffering was not started",
                    ))
                })
        })
    }
}
2799
2800fn unexpected_extension_host_response(operation: &str, payload: ResponsePayload) -> SidecarError {
2801 match payload {
2802 ResponsePayload::Rejected(response) => SidecarError::InvalidState(format!(
2803 "extension {operation} rejected with {}: {}",
2804 response.code, response.message
2805 )),
2806 other => SidecarError::InvalidState(format!(
2807 "extension {operation} returned unexpected response: {other:?}"
2808 )),
2809 }
2810}
2811
2812fn shadow_host_path_for_process(
2813 shadow_root: &Path,
2814 process_guest_cwd: &str,
2815 guest_path: &str,
2816) -> PathBuf {
2817 let normalized_guest_path = if guest_path.starts_with('/') {
2818 normalize_path(guest_path)
2819 } else {
2820 normalize_path(&format!(
2821 "{}/{}",
2822 process_guest_cwd.trim_end_matches('/'),
2823 guest_path
2824 ))
2825 };
2826 if normalized_guest_path == "/" {
2827 shadow_root.to_path_buf()
2828 } else {
2829 shadow_root.join(normalized_guest_path.trim_start_matches('/'))
2830 }
2831}
2832
/// Wraps a response tracker error in `SidecarError::InvalidState`,
/// prefixing its message with the correlation-state context.
fn sidecar_response_tracker_error(error: SidecarResponseTrackerError) -> SidecarError {
    SidecarError::InvalidState(format!(
        "invalid sidecar response correlation state: {error}"
    ))
}
2838
2839fn map_bridge_permission(decision: secure_exec_bridge::PermissionDecision) -> PermissionDecision {
2840 match decision.verdict {
2841 secure_exec_bridge::PermissionVerdict::Allow => PermissionDecision::allow(),
2842 secure_exec_bridge::PermissionVerdict::Deny => PermissionDecision::deny(
2843 decision
2844 .reason
2845 .unwrap_or_else(|| String::from("denied by host")),
2846 ),
2847 secure_exec_bridge::PermissionVerdict::Prompt => PermissionDecision::deny(
2848 decision
2849 .reason
2850 .unwrap_or_else(|| String::from("permission prompt required")),
2851 ),
2852 }
2853}
2854
/// Returns the current wall-clock time as milliseconds since the Unix
/// epoch, formatted as a decimal string.
///
/// If the system clock reports a time before the epoch, `"0"` is returned
/// instead of panicking, so a misconfigured clock cannot crash audit
/// logging.
fn audit_timestamp() -> String {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis()
        .to_string()
}
2862
2863pub(crate) fn audit_fields<I, K, V>(fields: I) -> BTreeMap<String, String>
2864where
2865 I: IntoIterator<Item = (K, V)>,
2866 K: Into<String>,
2867 V: Into<String>,
2868{
2869 let mut mapped = BTreeMap::from([(String::from("timestamp"), audit_timestamp())]);
2870 for (key, value) in fields {
2871 mapped.insert(key.into(), value.into());
2872 }
2873 mapped
2874}
2875
2876pub(crate) fn emit_structured_event<B>(
2877 bridge: &SharedBridge<B>,
2878 vm_id: &str,
2879 name: &str,
2880 fields: BTreeMap<String, String>,
2881) -> Result<(), SidecarError>
2882where
2883 B: NativeSidecarBridge + Send + 'static,
2884 BridgeError<B>: fmt::Debug + Send + Sync + 'static,
2885{
2886 bridge.with_mut(|bridge| {
2887 bridge.emit_structured_event(StructuredEventRecord {
2888 vm_id: vm_id.to_owned(),
2889 name: name.to_owned(),
2890 fields,
2891 })
2892 })
2893}
2894
/// Best-effort variant of `emit_structured_event`: the event is emitted and
/// any error from the bridge is deliberately discarded.
pub(crate) fn emit_security_audit_event<B>(
    bridge: &SharedBridge<B>,
    vm_id: &str,
    name: &str,
    fields: BTreeMap<String, String>,
) where
    B: NativeSidecarBridge + Send + 'static,
    BridgeError<B>: fmt::Debug + Send + Sync + 'static,
{
    // Emission failures are intentionally ignored.
    let _ = emit_structured_event(bridge, vm_id, name, fields);
}
2906
2907pub(crate) fn log_stale_process_event<B>(
2908 bridge: &SharedBridge<B>,
2909 vm_id: &str,
2910 process_id: &str,
2911 context: &str,
2912) where
2913 B: NativeSidecarBridge + Send + 'static,
2914 BridgeError<B>: fmt::Debug + Send + Sync + 'static,
2915{
2916 let _ = bridge.emit_log(
2917 vm_id,
2918 format!(
2919 "Ignoring stale process event during {context}: VM {vm_id} process {process_id} was already reaped"
2920 ),
2921 );
2922}
2923
/// Wraps any displayable error in `SidecarError::InvalidState`, prefixed
/// with `root filesystem: `.
pub(crate) fn root_filesystem_error(error: impl std::fmt::Display) -> SidecarError {
    SidecarError::InvalidState(format!("root filesystem: {error}"))
}
2929
/// Normalizes a guest path into an absolute, `/`-separated string.
///
/// `.` components are dropped, `..` removes the previous segment (and is a
/// no-op at the root), and a root component resets the accumulated
/// segments. The result always starts with `/`; an empty or fully
/// cancelled-out input yields `/`. Relative inputs are treated as if rooted
/// at `/`.
pub(crate) fn normalize_path(path: &str) -> String {
    let mut segments: Vec<String> = Vec::new();
    for component in Path::new(path).components() {
        match component {
            Component::RootDir => segments.clear(),
            Component::ParentDir => {
                segments.pop();
            }
            Component::CurDir => {}
            Component::Normal(value) => segments.push(value.to_string_lossy().into_owned()),
            Component::Prefix(prefix) => {
                segments.push(prefix.as_os_str().to_string_lossy().into_owned());
            }
        }
    }

    // The leading slash makes the result non-empty even with no segments,
    // so no separate empty-string fallback is needed.
    format!("/{}", segments.join("/"))
}
2953
/// Lexically normalizes a host path without touching the filesystem.
///
/// `.` components are dropped and `..` removes the last accumulated
/// component, except that it never climbs above `/`. When nothing remains,
/// the result is `/` for an absolute input and `.` for a relative one.
pub(crate) fn normalize_host_path(path: &Path) -> PathBuf {
    let filesystem_root = Path::new("/");
    let mut resolved = PathBuf::new();

    for part in path.components() {
        match part {
            Component::CurDir => {}
            // Already at the root: there is nothing above it to pop.
            Component::ParentDir if resolved == filesystem_root => {}
            Component::ParentDir => {
                resolved.pop();
            }
            Component::RootDir => resolved.push(filesystem_root),
            Component::Prefix(prefix) => resolved.push(prefix.as_os_str()),
            Component::Normal(name) => resolved.push(name),
        }
    }

    if !resolved.as_os_str().is_empty() {
        return resolved;
    }
    PathBuf::from(if path.is_absolute() { "/" } else { "." })
}
2981
/// Reports whether `path` is `root` itself or lies beneath it, comparing
/// whole path components rather than raw string prefixes.
pub(crate) fn path_is_within_root(path: &Path, root: &Path) -> bool {
    // `strip_prefix` succeeds exactly when `root` is a component-wise prefix
    // of `path`, which includes the case where the two are equal.
    path.strip_prefix(root).is_ok()
}
2985
2986pub(crate) fn dirname(path: &str) -> String {
2987 let normalized = normalize_path(path);
2988 let parent = Path::new(&normalized)
2989 .parent()
2990 .unwrap_or_else(|| Path::new("/"));
2991 let value = parent.to_string_lossy();
2992 if value.is_empty() {
2993 String::from("/")
2994 } else {
2995 value.into_owned()
2996 }
2997}
2998
/// Converts a `KernelError` into `SidecarError::Kernel`, keeping only its
/// display message.
pub(crate) fn kernel_error(error: KernelError) -> SidecarError {
    SidecarError::Kernel(error.to_string())
}
3002
/// Converts a `PluginError` into `SidecarError::Plugin`, keeping only its
/// display message.
pub(crate) fn plugin_error(error: PluginError) -> SidecarError {
    SidecarError::Plugin(error.to_string())
}
3006
/// Converts a `JavascriptExecutionError` into `SidecarError::Execution`,
/// keeping only its display message.
pub(crate) fn javascript_error(error: JavascriptExecutionError) -> SidecarError {
    SidecarError::Execution(error.to_string())
}
3010
/// Converts a `WasmExecutionError` into `SidecarError::Execution`, keeping
/// only its display message.
pub(crate) fn wasm_error(error: WasmExecutionError) -> SidecarError {
    SidecarError::Execution(error.to_string())
}
3014
/// Converts a `PythonExecutionError` into `SidecarError::Execution`,
/// keeping only its display message.
pub(crate) fn python_error(error: PythonExecutionError) -> SidecarError {
    SidecarError::Execution(error.to_string())
}
3018
/// Converts a `VfsError` into `SidecarError::Kernel` (the same variant used
/// for kernel errors), keeping only its display message.
pub(crate) fn vfs_error(error: VfsError) -> SidecarError {
    SidecarError::Kernel(error.to_string())
}
3022
/// User-facing guidance returned by `symlinked_node_modules_hint` when a
/// failure looks like it was caused by a non-flat `node_modules` layout.
/// It lists the per-package-manager setting that produces a flat layout.
// NOTE(review): `dead_code` is allowed because no non-test caller is visible
// in this part of the file — confirm whether one exists elsewhere.
#[allow(dead_code)]
const HOISTED_NODE_MODULES_GUIDANCE: &str = "secure-exec can't load mounted node_modules: the directory uses a non-flat layout (pnpm / bun / yarn workspaces store, or yarn Plug'n'Play) whose package store isn't visible inside the VM. A flat (hoisted) node_modules is required.\n - pnpm -> add `node-linker=hoisted` to .npmrc, then reinstall\n - yarn berry -> set `nodeLinker: node-modules` in .yarnrc.yml (not pnp/pnpm)\n - bun -> install dependencies outside a workspace (workspaces use a .bun store)\n - npm / yarn classic -> already flat, no change needed";
3034
3035#[allow(dead_code)]
3045fn symlinked_node_modules_hint(stderr: &str) -> Option<&'static str> {
3046 const STORE_MARKERS: &[&str] = &[
3051 "node_modules/.pnpm/",
3052 "node_modules/.bun/",
3053 "node_modules/.store/",
3054 "/__virtual__/",
3055 ];
3056 const PNP_STRICT_MARKERS: &[&str] = &["isn't declared in your dependencies"];
3060 const PNP_PATH_MARKERS: &[&str] = &[".pnp.cjs", ".pnp.loader.mjs", "/.yarn/cache/"];
3061
3062 if PNP_STRICT_MARKERS.iter().any(|m| stderr.contains(m)) {
3063 return Some(HOISTED_NODE_MODULES_GUIDANCE);
3064 }
3065
3066 let missing = stderr.contains("ENOENT")
3067 || stderr.contains("no such file or directory")
3068 || stderr.contains("Cannot find module")
3069 || stderr.contains("MODULE_NOT_FOUND");
3070 if !missing {
3071 return None;
3072 }
3073 if STORE_MARKERS.iter().any(|m| stderr.contains(m))
3074 || PNP_PATH_MARKERS.iter().any(|m| stderr.contains(m))
3075 {
3076 return Some(HOISTED_NODE_MODULES_GUIDANCE);
3077 }
3078 None
3079}
3080
/// Unit tests for `symlinked_node_modules_hint`: each case feeds one stderr
/// sample and checks whether hoisted-layout guidance is produced.
#[cfg(test)]
mod symlinked_node_modules_hint_tests {
    use super::symlinked_node_modules_hint;

    /// Asserts that `stderr` yields guidance.
    #[track_caller]
    fn assert_hint(stderr: &str) {
        assert!(symlinked_node_modules_hint(stderr).is_some());
    }

    /// Asserts that `stderr` yields no guidance.
    #[track_caller]
    fn assert_no_hint(stderr: &str) {
        assert!(symlinked_node_modules_hint(stderr).is_none());
    }

    // --- Inputs that should produce guidance ---

    #[test]
    fn matches_pnpm_store_enoent() {
        let guidance = symlinked_node_modules_hint(
            "Error: ENOENT: no such file or directory, open '/root/node_modules/.pnpm/@mariozechner+pi-coding-agent@0.60.0_x/node_modules/@mariozechner/pi-coding-agent/dist/package.json'",
        )
        .expect("expected hoisted guidance");
        // The message must carry the secure-exec wording and must not mention "agent-os".
        assert!(guidance.contains("secure-exec can't load mounted node_modules"));
        assert!(!guidance.contains("agent-os"));
    }

    #[test]
    fn matches_bun_store_enoent() {
        assert_hint(
            "Error: ENOENT: no such file or directory, open '/root/node_modules/.bun/is-odd@3.0.1/node_modules/is-odd/package.json'",
        );
    }

    #[test]
    fn matches_yarn_pnpm_store_enoent() {
        assert_hint(
            "Error: ENOENT: no such file or directory, open '/root/node_modules/.store/is-odd-npm-3.0.1-93c3c3f41b/package/package.json'",
        );
    }

    #[test]
    fn matches_pnp_declared_error() {
        assert_hint(
            "Error: Your application tried to access is-number, but it isn't declared in your dependencies; this makes the require call ambiguous and unsound.",
        );
    }

    #[test]
    fn matches_pnp_cjs_module_not_found() {
        assert_hint(
            "Error: Cannot find module 'is-odd'\n at /root/.pnp.cjs:12345:18\n code: 'MODULE_NOT_FOUND'",
        );
    }

    #[test]
    fn matches_virtual_instance() {
        assert_hint(
            "Error: ENOENT: no such file or directory, open '/root/.yarn/__virtual__/is-odd-abc/1/node_modules/is-odd/package.json'",
        );
    }

    // --- Inputs that should stay silent ---

    #[test]
    fn ignores_enoent_outside_a_store() {
        assert_no_hint("Error: ENOENT: no such file or directory, open '/tmp/scratch/config.json'");
    }

    #[test]
    fn ignores_store_path_without_missing_file() {
        assert_no_hint(
            "loaded /root/node_modules/.pnpm/some-pkg@1.0.0/node_modules/some-pkg/index.js",
        );
    }

    #[test]
    fn ignores_flat_node_modules_enoent() {
        assert_no_hint(
            "Error: ENOENT: no such file or directory, open '/root/node_modules/is-odd/missing-asset.json'",
        );
    }

    #[test]
    fn ignores_unrelated_failure() {
        assert_no_hint("Error: connect ECONNREFUSED 127.0.0.1:443");
    }
}