1use crate::bridge::{build_mount_plugin_registry, MountPluginContext};
2pub(crate) use crate::execution::{
3 build_javascript_socket_path_context, canonical_signal_name, dispatch_loopback_http_request,
4 error_code, format_tcp_resource, ignore_stale_javascript_sync_rpc_response,
5 javascript_sync_rpc_arg_i32, javascript_sync_rpc_arg_str, javascript_sync_rpc_arg_u32,
6 javascript_sync_rpc_arg_u32_optional, javascript_sync_rpc_arg_u64,
7 javascript_sync_rpc_arg_u64_optional, javascript_sync_rpc_bytes_arg,
8 javascript_sync_rpc_bytes_value, javascript_sync_rpc_encoding, javascript_sync_rpc_error_code,
9 javascript_sync_rpc_option_bool, javascript_sync_rpc_option_u32, parse_signal,
10 sanitize_javascript_child_process_internal_bootstrap_env, service_javascript_sync_rpc,
11 vm_network_resource_counts, JavascriptSyncRpcServiceRequest, LoopbackHttpDispatchRequest,
12};
13use crate::extension::{
14 Extension, ExtensionBufferedProcessOutput, ExtensionContext, ExtensionFuture, ExtensionHost,
15 ExtensionSnapshot,
16};
17use crate::filesystem::guest_filesystem_call as filesystem_guest_filesystem_call;
18use crate::limits::DEFAULT_ACP_STDOUT_BUFFER_BYTE_LIMIT;
19use crate::protocol::{
20 AuthenticatedResponse, CloseStdinRequest, DisposeReason, EventFrame, EventPayload,
21 ExecuteRequest, ExtEnvelope, FsPermissionScope, GuestFilesystemCallRequest,
22 GuestFilesystemResultResponse, JavascriptChildProcessSpawnOptions,
23 JavascriptChildProcessSpawnRequest, KillProcessRequest, OpenSessionRequest, OwnershipScope,
24 PatternPermissionRule, PatternPermissionScope, PermissionMode, PermissionsPolicy,
25 ProcessKilledResponse, ProcessStartedResponse, ProtocolSchema, RejectedResponse, RequestFrame,
26 RequestId, RequestPayload, ResponseFrame, ResponsePayload, SessionOpenedResponse,
27 SidecarRequestFrame, SidecarRequestPayload, SidecarResponseFrame, SidecarResponsePayload,
28 SidecarResponseTracker, SidecarResponseTrackerError, SignalDispositionAction,
29 SignalHandlerRegistration, StdinClosedResponse, StdinWrittenResponse, VmLifecycleEvent,
30 VmLifecycleState, WriteStdinRequest,
31};
32use crate::state::{
33 ActiveExecutionEvent, BridgeError, ConnectionState, JavascriptSocketFamily,
34 JavascriptSocketPathContext, ProcessEventEnvelope, SessionState, SharedBridge,
35 SharedSidecarRequestClient, SidecarRequestTransport, VmState, EXECUTION_DRIVER_NAME,
36};
37use crate::tools::register_host_callbacks;
38use crate::NativeSidecarBridge;
39use secure_exec_bridge::queue_tracker::{register_queue, QueueGauge, TrackedLimit};
40use secure_exec_bridge::{
41 CommandPermissionRequest, EnvironmentAccess, EnvironmentPermissionRequest, FilesystemAccess,
42 FilesystemPermissionRequest, LifecycleEventRecord, LifecycleState, LogLevel, LogRecord,
43 NetworkAccess, NetworkPermissionRequest, StructuredEventRecord,
44};
45use secure_exec_execution::{
46 JavascriptExecutionEngine, JavascriptExecutionError, JavascriptSyncRpcRequest,
47 PythonExecutionEngine, PythonExecutionError, WasmExecutionEngine, WasmExecutionError,
48};
49use secure_exec_kernel::kernel::KernelError;
50use secure_exec_kernel::mount_plugin::{FileSystemPluginRegistry, PluginError};
51use secure_exec_kernel::permissions::{
52 permission_glob_matches, CommandAccessRequest, EnvAccessRequest, EnvironmentOperation,
53 NetworkAccessRequest, NetworkOperation, PermissionDecision,
54};
55use secure_exec_kernel::vfs::VfsError;
57use serde::Deserialize;
58use serde_json::{json, Value};
59use std::collections::{BTreeMap, BTreeSet, VecDeque};
60use std::fmt;
61use std::fs;
62use std::os::unix::fs::PermissionsExt;
63use std::path::{Component, Path, PathBuf};
64use std::sync::{Arc, Mutex};
65use std::task::{Context, Poll, Waker};
66use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
67use tokio::sync::mpsc::{channel, Receiver, Sender};
68use tokio::time;
69
/// Environment variable names that mark a `node` command as an internal
/// runtime launch (checked by `is_internal_runtime_command_request`).
const INTERNAL_JAVASCRIPT_ENTRYPOINT_ENV_KEYS: &[&str] =
    &["AGENTOS_ENTRYPOINT", "AGENTOS_BOOTSTRAP_MODULE"];
/// Environment variable names that mark a `wasm` command as an internal
/// runtime launch (checked by `is_internal_runtime_command_request`).
const INTERNAL_WASM_ENTRYPOINT_ENV_KEYS: &[&str] =
    &["AGENTOS_WASM_MODULE_PATH", "AGENTOS_WASM_MODULE_BASE64"];
/// Environment variable name prefixes that mark a `python` command as an
/// internal runtime launch (checked by `is_internal_runtime_command_request`).
const INTERNAL_PYTHON_ENTRYPOINT_ENV_PREFIXES: &[&str] = &["AGENTOS_PYTHON_"];
/// Capacity of the process event channel and the limit registered for the
/// pending-process-events gauge.
pub(crate) const MAX_PROCESS_EVENT_QUEUE: usize = 10_000;
/// Limit registered for the pending-sidecar-responses gauge and quoted in the
/// matching overflow error.
pub(crate) const MAX_PENDING_SIDECAR_RESPONSES: usize = 10_000;
/// Limit registered for the outbound-sidecar-requests gauge and quoted in the
/// matching overflow error.
pub(crate) const MAX_OUTBOUND_SIDECAR_REQUESTS: usize = 10_000;
/// Limit registered for the completed-sidecar-responses gauge.
pub(crate) const MAX_COMPLETED_SIDECAR_RESPONSES: usize = 10_000;
81
82pub(crate) fn process_event_queue_overflow_error() -> SidecarError {
83 SidecarError::InvalidState(format!(
84 "process event queue exceeded {MAX_PROCESS_EVENT_QUEUE} pending events"
85 ))
86}
87
88fn sidecar_response_pending_overflow_error() -> SidecarError {
89 SidecarError::InvalidState(format!(
90 "sidecar response tracker exceeded {MAX_PENDING_SIDECAR_RESPONSES} pending responses"
91 ))
92}
93
94fn outbound_sidecar_request_queue_overflow_error() -> SidecarError {
95 SidecarError::InvalidState(format!(
96 "outbound sidecar request queue exceeded {MAX_OUTBOUND_SIDECAR_REQUESTS} pending requests"
97 ))
98}
99
100fn wire_protocol_error(error: crate::wire::ProtocolCodecError) -> SidecarError {
101 SidecarError::InvalidState(format!("invalid generated wire protocol frame: {error}"))
102}
103
104pub use crate::state::{DispatchResult, NativeSidecarConfig, SidecarError};
106
107#[derive(Debug, Default, Deserialize)]
110struct LegacyJavascriptChildProcessSpawnOptions {
111 #[serde(default)]
112 cwd: Option<String>,
113 #[serde(default)]
114 env: BTreeMap<String, String>,
115 #[serde(default)]
116 input: Option<Value>,
117 #[serde(default)]
118 shell: bool,
119 #[serde(default)]
120 detached: bool,
121 #[serde(default)]
122 stdio: Vec<String>,
123 #[serde(default, rename = "maxBuffer")]
124 max_buffer: Option<usize>,
125 #[serde(default)]
126 timeout: Option<u64>,
127 #[serde(default, rename = "killSignal")]
128 kill_signal: Option<String>,
129}
130
131#[derive(Debug, Deserialize)]
132#[serde(rename_all = "camelCase")]
133struct JavascriptHttpLoopbackRequest {
134 process_id: String,
135 server_id: u64,
136 host: String,
137 port: u16,
138 request: String,
139}
140
/// Returns `true` when `host` is one of the recognised loopback spellings:
/// the literals `127.0.0.1` and `::1`, or `localhost` in any ASCII case.
fn is_javascript_loopback_host(host: &str) -> bool {
    matches!(host, "127.0.0.1" | "::1") || host.eq_ignore_ascii_case("localhost")
}
144
145pub(crate) fn parse_javascript_child_process_spawn_request(
146 vm: &VmState,
147 args: &[Value],
148) -> Result<(JavascriptChildProcessSpawnRequest, Option<usize>), SidecarError> {
149 if let Some(value) = args.first().cloned() {
150 if let Ok(request) = serde_json::from_value::<JavascriptChildProcessSpawnRequest>(value) {
151 return Ok((request, None));
152 }
153 }
154
155 let command = javascript_sync_rpc_arg_str(args, 0, "child_process.spawn command")?.to_owned();
156 let raw_args = javascript_sync_rpc_arg_str(args, 1, "child_process.spawn args")?;
157 let raw_options = javascript_sync_rpc_arg_str(args, 2, "child_process.spawn options")?;
158
159 let parsed_args = serde_json::from_str::<Vec<String>>(raw_args).map_err(|error| {
160 SidecarError::InvalidState(format!("invalid child_process.spawn args payload: {error}"))
161 })?;
162 let parsed_options = serde_json::from_str::<LegacyJavascriptChildProcessSpawnOptions>(
163 raw_options,
164 )
165 .map_err(|error| {
166 SidecarError::InvalidState(format!(
167 "invalid child_process.spawn options payload: {error}"
168 ))
169 })?;
170
171 Ok((
172 JavascriptChildProcessSpawnRequest {
173 command,
174 args: parsed_args,
175 options: JavascriptChildProcessSpawnOptions {
176 cwd: parsed_options.cwd,
177 env: parsed_options.env,
178 internal_bootstrap_env: sanitize_javascript_child_process_internal_bootstrap_env(
179 &vm.guest_env,
180 ),
181 input: parsed_options.input,
182 shell: parsed_options.shell,
183 detached: parsed_options.detached,
184 stdio: parsed_options.stdio,
185 timeout: parsed_options.timeout,
186 kill_signal: parsed_options.kill_signal,
187 },
188 },
189 parsed_options.max_buffer,
190 ))
191}
192
193impl<B> SharedBridge<B> {
194 fn new(bridge: B) -> Self {
195 Self {
196 inner: Arc::new(Mutex::new(bridge)),
197 permissions: Arc::new(Mutex::new(BTreeMap::new())),
198 #[cfg(test)]
199 set_vm_permissions_outcomes: Arc::new(Mutex::new(VecDeque::new())),
200 }
201 }
202}
203
204impl<B> SharedBridge<B>
205where
206 B: NativeSidecarBridge + Send + 'static,
207 BridgeError<B>: fmt::Debug + Send + Sync + 'static,
208{
209 pub(crate) fn with_mut<T>(
210 &self,
211 operation: impl FnOnce(&mut B) -> Result<T, BridgeError<B>>,
212 ) -> Result<T, SidecarError> {
213 let mut bridge = self.inner.lock().map_err(|_| {
214 SidecarError::Bridge(String::from("native sidecar bridge lock poisoned"))
215 })?;
216 operation(&mut bridge).map_err(|error| SidecarError::Bridge(format!("{error:?}")))
217 }
218
219 fn inspect<T>(&self, operation: impl FnOnce(&mut B) -> T) -> Result<T, SidecarError> {
220 let mut bridge = self.inner.lock().map_err(|_| {
221 SidecarError::Bridge(String::from("native sidecar bridge lock poisoned"))
222 })?;
223 Ok(operation(&mut bridge))
224 }
225
226 #[cfg(test)]
227 #[allow(dead_code)]
228 pub(crate) fn queue_set_vm_permissions_result(
229 &self,
230 result: Result<(), SidecarError>,
231 ) -> Result<(), SidecarError> {
232 let mut outcomes = self.set_vm_permissions_outcomes.lock().map_err(|_| {
233 SidecarError::Bridge(String::from(
234 "native sidecar test set_vm_permissions outcome lock poisoned",
235 ))
236 })?;
237 outcomes.push_back(result.err());
238 Ok(())
239 }
240
241 pub(crate) fn emit_lifecycle(
242 &self,
243 vm_id: &str,
244 state: LifecycleState,
245 ) -> Result<(), SidecarError> {
246 self.with_mut(|bridge| {
247 bridge.emit_lifecycle(LifecycleEventRecord {
248 vm_id: vm_id.to_owned(),
249 state,
250 detail: None,
251 })
252 })
253 }
254
255 pub(crate) fn emit_log(
256 &self,
257 vm_id: &str,
258 message: impl Into<String>,
259 ) -> Result<(), SidecarError> {
260 self.with_mut(|bridge| {
261 bridge.emit_log(LogRecord {
262 vm_id: vm_id.to_owned(),
263 level: LogLevel::Info,
264 message: message.into(),
265 })
266 })
267 }
268
269 pub(crate) fn filesystem_decision(
270 &self,
271 vm_id: &str,
272 path: &str,
273 access: FilesystemAccess,
274 ) -> PermissionDecision {
275 if let Some(decision) = self.static_permission_decision(
276 vm_id,
277 filesystem_permission_capability(access),
278 "fs",
279 Some(path),
280 ) {
281 return decision;
282 }
283 match self.with_mut(|bridge| {
284 bridge.check_filesystem_access(FilesystemPermissionRequest {
285 vm_id: vm_id.to_owned(),
286 path: path.to_owned(),
287 access,
288 })
289 }) {
290 Ok(decision) => map_bridge_permission(decision),
291 Err(error) => PermissionDecision::deny(error.to_string()),
292 }
293 }
294
295 pub(crate) fn command_decision(
296 &self,
297 vm_id: &str,
298 request: &CommandAccessRequest,
299 ) -> PermissionDecision {
300 if is_internal_runtime_command_request(request) {
301 return PermissionDecision::allow();
302 }
303 if let Some(decision) = self.static_permission_decision(
304 vm_id,
305 "child_process.spawn",
306 "child_process",
307 Some(&request.command),
308 ) {
309 return decision;
310 }
311 match self.with_mut(|bridge| {
312 bridge.check_command_execution(CommandPermissionRequest {
313 vm_id: vm_id.to_owned(),
314 command: request.command.clone(),
315 args: request.args.clone(),
316 cwd: request.cwd.clone(),
317 env: request.env.clone(),
318 })
319 }) {
320 Ok(decision) => map_bridge_permission(decision),
321 Err(error) => PermissionDecision::deny(error.to_string()),
322 }
323 }
324
325 pub(crate) fn environment_decision(
326 &self,
327 vm_id: &str,
328 request: &EnvAccessRequest,
329 ) -> PermissionDecision {
330 if let Some(decision) = self.static_permission_decision(
331 vm_id,
332 environment_permission_capability(request.op),
333 "env",
334 Some(&request.key),
335 ) {
336 return decision;
337 }
338 match self.with_mut(|bridge| {
339 bridge.check_environment_access(EnvironmentPermissionRequest {
340 vm_id: vm_id.to_owned(),
341 access: match request.op {
342 EnvironmentOperation::Read => EnvironmentAccess::Read,
343 EnvironmentOperation::Write => EnvironmentAccess::Write,
344 },
345 key: request.key.clone(),
346 value: request.value.clone(),
347 })
348 }) {
349 Ok(decision) => map_bridge_permission(decision),
350 Err(error) => PermissionDecision::deny(error.to_string()),
351 }
352 }
353
354 pub(crate) fn network_decision(
355 &self,
356 vm_id: &str,
357 request: &NetworkAccessRequest,
358 ) -> PermissionDecision {
359 if let Some(decision) = self.static_permission_decision(
360 vm_id,
361 network_permission_capability(request.op),
362 "network",
363 Some(&request.resource),
364 ) {
365 return decision;
366 }
367 match self.with_mut(|bridge| {
368 bridge.check_network_access(NetworkPermissionRequest {
369 vm_id: vm_id.to_owned(),
370 access: match request.op {
371 NetworkOperation::Fetch => NetworkAccess::Fetch,
372 NetworkOperation::Http => NetworkAccess::Http,
373 NetworkOperation::Dns => NetworkAccess::Dns,
374 NetworkOperation::Listen => NetworkAccess::Listen,
375 },
376 resource: request.resource.clone(),
377 })
378 }) {
379 Ok(decision) => map_bridge_permission(decision),
380 Err(error) => PermissionDecision::deny(error.to_string()),
381 }
382 }
383
384 pub(crate) fn require_network_access(
385 &self,
386 vm_id: &str,
387 op: NetworkOperation,
388 resource: impl Into<String>,
389 ) -> Result<(), SidecarError> {
390 let resource = resource.into();
391 let decision = self.network_decision(
392 vm_id,
393 &NetworkAccessRequest {
394 vm_id: vm_id.to_owned(),
395 op,
396 resource: resource.clone(),
397 },
398 );
399 if decision.allow {
400 return Ok(());
401 }
402
403 let message = match decision.reason.as_deref() {
404 Some(reason) => format!("EACCES: permission denied, {resource}: {reason}"),
405 None => format!("EACCES: permission denied, {resource}"),
406 };
407 Err(SidecarError::Execution(message))
408 }
409
410 pub(crate) fn set_vm_permissions(
411 &self,
412 vm_id: &str,
413 permissions: &PermissionsPolicy,
414 ) -> Result<(), SidecarError> {
415 #[cfg(test)]
416 {
417 let mut outcomes = self.set_vm_permissions_outcomes.lock().map_err(|_| {
418 SidecarError::Bridge(String::from(
419 "native sidecar test set_vm_permissions outcome lock poisoned",
420 ))
421 })?;
422 if let Some(Some(error)) = outcomes.pop_front() {
423 return Err(error);
424 }
425 }
426
427 let mut stored = self.permissions.lock().map_err(|_| {
428 SidecarError::Bridge(String::from(
429 "native sidecar permission policy lock poisoned",
430 ))
431 })?;
432 stored.insert(vm_id.to_owned(), permissions.clone());
433 Ok(())
434 }
435
436 pub(crate) fn restore_vm_permissions_fail_closed(
437 &self,
438 vm_id: &str,
439 original_permissions: &PermissionsPolicy,
440 context: &str,
441 operation_error: &SidecarError,
442 ) -> Result<(), SidecarError> {
443 match self.set_vm_permissions(vm_id, original_permissions) {
444 Ok(()) => Ok(()),
445 Err(restore_error) => {
446 let deny_all = PermissionsPolicy::deny_all();
447 match self.set_vm_permissions(vm_id, &deny_all) {
448 Ok(()) => Err(SidecarError::InvalidState(format!(
449 "{context} failed: {operation_error}; restoring original permissions failed: {restore_error}; applied deny-all fallback"
450 ))),
451 Err(deny_all_error) => panic!(
452 "{context} failed: {operation_error}; restoring original permissions failed: {restore_error}; deny-all fallback failed: {deny_all_error}"
453 ),
454 }
455 }
456 }
457 }
458
459 pub(crate) fn clear_vm_permissions(&self, vm_id: &str) -> Result<(), SidecarError> {
460 let mut stored = self.permissions.lock().map_err(|_| {
461 SidecarError::Bridge(String::from(
462 "native sidecar permission policy lock poisoned",
463 ))
464 })?;
465 stored.remove(vm_id);
466 Ok(())
467 }
468
469 pub(crate) fn static_permission_decision(
470 &self,
471 vm_id: &str,
472 capability: &str,
473 domain: &str,
474 resource: Option<&str>,
475 ) -> Option<PermissionDecision> {
476 let stored = self.permissions.lock().ok()?;
477 let permissions = stored.get(vm_id)?;
478 let mode = evaluate_permissions_policy(permissions, domain, capability, resource);
479 Some(permission_mode_to_kernel_decision(mode, capability))
480 }
481}
482
483pub(crate) fn evaluate_permissions_policy(
484 permissions: &PermissionsPolicy,
485 domain: &str,
486 capability: &str,
487 resource: Option<&str>,
488) -> PermissionMode {
489 match domain {
490 "fs" => evaluate_fs_permission_scope(
491 permissions.fs.as_ref(),
492 capability_operation(capability, domain),
493 resource,
494 ),
495 "network" => evaluate_pattern_permission_scope(
496 permissions.network.as_ref(),
497 capability_operation(capability, domain),
498 resource,
499 ),
500 "child_process" => evaluate_pattern_permission_scope(
501 permissions.child_process.as_ref(),
502 capability_operation(capability, domain),
503 resource,
504 ),
505 "process" => evaluate_pattern_permission_scope(
506 permissions.process.as_ref(),
507 capability_operation(capability, domain),
508 resource,
509 ),
510 "env" => evaluate_pattern_permission_scope(
511 permissions.env.as_ref(),
512 capability_operation(capability, domain),
513 resource,
514 ),
515 "binding" => evaluate_pattern_permission_scope(
516 permissions.binding.as_ref(),
517 capability_operation(capability, domain),
518 resource,
519 ),
520 _ => PermissionMode::Deny,
521 }
522}
523
524fn evaluate_fs_permission_scope(
525 scope: Option<&FsPermissionScope>,
526 operation: &str,
527 resource: Option<&str>,
528) -> PermissionMode {
529 match scope {
530 Some(FsPermissionScope::PermissionMode(mode)) => mode.clone(),
531 Some(FsPermissionScope::FsPermissionRuleSet(rules)) => {
532 let mut mode = rules.default.clone().unwrap_or(PermissionMode::Deny);
533 for rule in &rules.rules {
534 if fs_rule_matches(rule, operation, resource) {
535 mode = rule.mode.clone();
536 }
537 }
538 mode
539 }
540 None => PermissionMode::Deny,
541 }
542}
543
544fn evaluate_pattern_permission_scope(
545 scope: Option<&PatternPermissionScope>,
546 operation: &str,
547 resource: Option<&str>,
548) -> PermissionMode {
549 match scope {
550 Some(PatternPermissionScope::PermissionMode(mode)) => mode.clone(),
551 Some(PatternPermissionScope::PatternPermissionRuleSet(rules)) => {
552 let mut mode = rules.default.clone().unwrap_or(PermissionMode::Deny);
553 for rule in &rules.rules {
554 if pattern_rule_matches(rule, operation, resource) {
555 mode = rule.mode.clone();
556 }
557 }
558 mode
559 }
560 None => PermissionMode::Deny,
561 }
562}
563
564fn fs_rule_matches(
565 rule: &crate::protocol::FsPermissionRule,
566 operation: &str,
567 resource: Option<&str>,
568) -> bool {
569 let operations_match = permission_operation_matches(&rule.operations, operation);
570 let paths_match = permission_resource_matches(&rule.paths, resource);
571 operations_match && paths_match
572}
573
574fn pattern_rule_matches(
575 rule: &PatternPermissionRule,
576 operation: &str,
577 resource: Option<&str>,
578) -> bool {
579 let operations_match = permission_operation_matches(&rule.operations, operation);
580 let patterns_match = permission_resource_matches(&rule.patterns, resource);
581 operations_match && patterns_match
582}
583
/// Returns `true` when `candidates` contains the wildcard `"*"` or an entry
/// equal to `operation`. An empty list matches nothing.
fn permission_operation_matches(candidates: &[String], operation: &str) -> bool {
    for candidate in candidates {
        let candidate = candidate.as_str();
        if candidate == "*" || candidate == operation {
            return true;
        }
    }
    false
}
589
590fn permission_resource_matches(patterns: &[String], resource: Option<&str>) -> bool {
591 resource.is_some_and(|value| {
592 patterns
593 .iter()
594 .any(|pattern| permission_glob_matches(pattern, value))
595 })
596}
597
598pub(crate) fn validate_permissions_policy(
599 permissions: &PermissionsPolicy,
600) -> Result<(), SidecarError> {
601 if let Some(scope) = permissions.fs.as_ref() {
602 validate_fs_permission_scope("fs", scope)?;
603 }
604 if let Some(scope) = permissions.network.as_ref() {
605 validate_pattern_permission_scope("network", scope)?;
606 }
607 if let Some(scope) = permissions.child_process.as_ref() {
608 validate_pattern_permission_scope("child_process", scope)?;
609 }
610 if let Some(scope) = permissions.process.as_ref() {
611 validate_pattern_permission_scope("process", scope)?;
612 }
613 if let Some(scope) = permissions.env.as_ref() {
614 validate_pattern_permission_scope("env", scope)?;
615 }
616 if let Some(scope) = permissions.binding.as_ref() {
617 validate_pattern_permission_scope("binding", scope)?;
618 }
619 Ok(())
620}
621
622fn validate_fs_permission_scope(
623 domain: &str,
624 scope: &FsPermissionScope,
625) -> Result<(), SidecarError> {
626 let FsPermissionScope::FsPermissionRuleSet(rule_set) = scope else {
627 return Ok(());
628 };
629
630 for (index, rule) in rule_set.rules.iter().enumerate() {
631 validate_permission_rule_field(
632 &rule.operations,
633 &format!("{domain}.rules[{index}].operations"),
634 )?;
635 validate_permission_rule_field(&rule.paths, &format!("{domain}.rules[{index}].paths"))?;
636 }
637
638 Ok(())
639}
640
641fn validate_pattern_permission_scope(
642 domain: &str,
643 scope: &PatternPermissionScope,
644) -> Result<(), SidecarError> {
645 let PatternPermissionScope::PatternPermissionRuleSet(rule_set) = scope else {
646 return Ok(());
647 };
648
649 for (index, rule) in rule_set.rules.iter().enumerate() {
650 validate_permission_rule_field(
651 &rule.operations,
652 &format!("{domain}.rules[{index}].operations"),
653 )?;
654 validate_permission_rule_field(
655 &rule.patterns,
656 &format!("{domain}.rules[{index}].patterns"),
657 )?;
658 }
659
660 Ok(())
661}
662
663fn validate_permission_rule_field(values: &[String], field: &str) -> Result<(), SidecarError> {
664 if values.is_empty() {
665 return Err(SidecarError::InvalidState(format!(
666 "invalid permissions policy: {field} must not be empty; use [\"*\"] for wildcard"
667 )));
668 }
669 Ok(())
670}
671
/// Extracts the operation part of a `"<domain>.<operation>"` capability.
///
/// Returns the text after `"<domain>."`, or an empty string when `capability`
/// does not start with `domain` followed by a dot.
fn capability_operation<'a>(capability: &'a str, domain: &str) -> &'a str {
    match capability.strip_prefix(domain) {
        Some(rest) => rest.strip_prefix('.').unwrap_or(""),
        None => "",
    }
}
678
679fn permission_mode_to_kernel_decision(
680 mode: PermissionMode,
681 capability: &str,
682) -> PermissionDecision {
683 match mode {
684 PermissionMode::Allow => PermissionDecision::allow(),
685 PermissionMode::Ask => {
686 PermissionDecision::deny(format!("permission prompt required for {capability}"))
687 }
688 PermissionMode::Deny => PermissionDecision::deny(format!("blocked by {capability} policy")),
689 }
690}
691
/// Maps a filesystem access kind to its `fs.*` capability name, as used when
/// evaluating the static permission policy.
pub(crate) fn filesystem_permission_capability(access: FilesystemAccess) -> &'static str {
    match access {
        FilesystemAccess::Read => "fs.read",
        FilesystemAccess::Write => "fs.write",
        FilesystemAccess::Stat => "fs.stat",
        FilesystemAccess::ReadDir => "fs.readdir",
        FilesystemAccess::CreateDir => "fs.create_dir",
        // Note the capability is spelled `rm`, not `remove`.
        FilesystemAccess::Remove => "fs.rm",
        FilesystemAccess::Rename => "fs.rename",
        FilesystemAccess::Symlink => "fs.symlink",
        FilesystemAccess::ReadLink => "fs.readlink",
        FilesystemAccess::Chmod => "fs.chmod",
        FilesystemAccess::Truncate => "fs.truncate",
    }
}
707
/// Maps a network operation to its `network.*` capability name, as used when
/// evaluating the static permission policy.
fn network_permission_capability(operation: NetworkOperation) -> &'static str {
    match operation {
        NetworkOperation::Fetch => "network.fetch",
        NetworkOperation::Http => "network.http",
        NetworkOperation::Dns => "network.dns",
        NetworkOperation::Listen => "network.listen",
    }
}
716
/// Maps an environment operation to its `env.*` capability name, as used when
/// evaluating the static permission policy.
fn environment_permission_capability(operation: EnvironmentOperation) -> &'static str {
    match operation {
        EnvironmentOperation::Read => "env.read",
        EnvironmentOperation::Write => "env.write",
    }
}
723
724fn is_internal_runtime_command_request(request: &CommandAccessRequest) -> bool {
725 match request.command.as_str() {
726 "node" => request
727 .env
728 .keys()
729 .any(|key| INTERNAL_JAVASCRIPT_ENTRYPOINT_ENV_KEYS.contains(&key.as_str())),
730 "wasm" => request
731 .env
732 .keys()
733 .any(|key| INTERNAL_WASM_ENTRYPOINT_ENV_KEYS.contains(&key.as_str())),
734 "python" => request.env.keys().any(|key| {
735 INTERNAL_PYTHON_ENTRYPOINT_ENV_PREFIXES
736 .iter()
737 .any(|prefix| key.starts_with(prefix))
738 }),
739 _ => false,
740 }
741}
742
743fn ownership_matches_process_event(
744 ownership: &OwnershipScope,
745 event: &ProcessEventEnvelope,
746) -> bool {
747 match ownership {
748 OwnershipScope::ConnectionOwnership(inner) => inner.connection_id == event.connection_id,
749 OwnershipScope::SessionOwnership(inner) => {
750 inner.connection_id == event.connection_id && inner.session_id == event.session_id
751 }
752 OwnershipScope::VmOwnership(inner) => {
753 inner.connection_id == event.connection_id
754 && inner.session_id == event.session_id
755 && inner.vm_id == event.vm_id
756 }
757 }
758}
759
760fn public_process_event_matches_ownership<B>(
761 sidecar: &NativeSidecar<B>,
762 ownership: &OwnershipScope,
763 event: &ProcessEventEnvelope,
764) -> bool
765where
766 B: NativeSidecarBridge + Send + 'static,
767 BridgeError<B>: fmt::Debug + Send + Sync + 'static,
768{
769 if !ownership_matches_process_event(ownership, event) {
770 return false;
771 }
772
773 if event.process_id.contains('/') {
774 return false;
775 }
776
777 let _ = sidecar;
780 true
781}
782
/// Polls `future` exactly once with a no-op waker.
///
/// Returns `Some(output)` when the future completes on this poll and `None`
/// when it is still pending. Because the waker does nothing, a pending future
/// is not woken through this context.
fn poll_future_once<F: std::future::Future>(future: std::pin::Pin<&mut F>) -> Option<F::Output> {
    let waker = Waker::noop();
    let mut context = Context::from_waker(waker);
    if let Poll::Ready(output) = future.poll(&mut context) {
        Some(output)
    } else {
        None
    }
}
790
791impl JavascriptSocketPathContext {
796 pub(crate) fn loopback_port_allowed(&self, port: u16) -> bool {
797 self.loopback_exempt_ports.contains(&port)
798 || self
799 .tcp_loopback_guest_to_host_ports
800 .keys()
801 .any(|(_, guest_port)| *guest_port == port)
802 }
803
804 pub(crate) fn translate_tcp_loopback_port(
805 &self,
806 family: JavascriptSocketFamily,
807 port: u16,
808 ) -> Option<u16> {
809 self.tcp_loopback_guest_to_host_ports
810 .get(&(family, port))
811 .copied()
812 }
813
814 pub(crate) fn http_loopback_target(
815 &self,
816 family: JavascriptSocketFamily,
817 port: u16,
818 ) -> Option<&crate::state::JavascriptHttpLoopbackTarget> {
819 self.http_loopback_targets.get(&(family, port))
820 }
821
822 pub(crate) fn translate_udp_loopback_port(
823 &self,
824 family: JavascriptSocketFamily,
825 port: u16,
826 ) -> Option<u16> {
827 self.udp_loopback_guest_to_host_ports
828 .get(&(family, port))
829 .copied()
830 }
831
832 pub(crate) fn guest_udp_port_for_host_port(
833 &self,
834 family: JavascriptSocketFamily,
835 port: u16,
836 ) -> Option<u16> {
837 self.udp_loopback_host_to_guest_ports
838 .get(&(family, port))
839 .copied()
840 }
841}
842
/// State of a native sidecar instance, generic over the host bridge `B`.
///
/// Built by `NativeSidecar::with_config`; the `Debug` implementation prints
/// only a summary (configuration, cache root, counters and collection sizes).
pub struct NativeSidecar<B> {
    /// Configuration this sidecar was constructed with.
    pub(crate) config: NativeSidecarConfig,
    /// Shared, mutex-guarded handle to the host bridge and per-VM policies.
    pub(crate) bridge: SharedBridge<B>,
    /// Registry produced by `build_mount_plugin_registry`.
    pub(crate) mount_plugins: FileSystemPluginRegistry<MountPluginContext<B>>,
    /// Cache directory: `config.compile_cache_root` when set, otherwise a
    /// directory under the system temp dir; created during construction.
    pub(crate) cache_root: PathBuf,
    pub(crate) javascript_engine: JavascriptExecutionEngine,
    pub(crate) python_engine: PythonExecutionEngine,
    pub(crate) wasm_engine: WasmExecutionEngine,
    /// Counters initialised to `0` at construction.
    pub(crate) next_connection_id: usize,
    pub(crate) next_session_id: usize,
    pub(crate) next_vm_id: usize,
    /// Initialised to `-1` at construction.
    pub(crate) next_sidecar_request_id: RequestId,
    /// Connection, session and VM state, each keyed by a string id.
    pub(crate) connections: BTreeMap<String, ConnectionState>,
    pub(crate) sessions: BTreeMap<String, SessionState>,
    pub(crate) vms: BTreeMap<String, VmState>,
    /// Sending half of the bounded process event channel
    /// (capacity `MAX_PROCESS_EVENT_QUEUE`).
    #[allow(dead_code)]
    pub(crate) process_event_sender: Sender<ProcessEventEnvelope>,
    /// Receiving half of the process event channel; `Some` after construction.
    pub(crate) process_event_receiver: Option<Receiver<ProcessEventEnvelope>>,
    pub(crate) pending_process_events: VecDeque<ProcessEventEnvelope>,
    pub(crate) pending_sidecar_responses: SidecarResponseTracker,
    pub(crate) outbound_sidecar_requests: VecDeque<SidecarRequestFrame>,
    pub(crate) completed_sidecar_responses: BTreeMap<RequestId, SidecarResponseFrame>,
    pub(crate) completed_sidecar_response_order: VecDeque<RequestId>,
    /// Queue gauges registered at construction with the matching `MAX_*` limits.
    pub(crate) completed_sidecar_responses_gauge: Arc<QueueGauge>,
    pub(crate) pending_process_events_gauge: Arc<QueueGauge>,
    pub(crate) pending_sidecar_responses_gauge: Arc<QueueGauge>,
    pub(crate) outbound_sidecar_requests_gauge: Arc<QueueGauge>,
    pub(crate) sidecar_requests: SharedSidecarRequestClient,
    /// Registered extensions, keyed by a string.
    pub(crate) extensions: BTreeMap<String, Arc<dyn Extension>>,
    /// Resources tracked per extension session; entries are pruned once they
    /// hold neither process ids nor VM ids.
    pub(crate) extension_sessions: BTreeMap<(String, String), ExtensionSessionResources>,
    /// Buffered process output, keyed by `(vm_id, process_id)`.
    pub(crate) extension_process_output_buffers:
        BTreeMap<(String, String), ExtensionBufferedProcessOutput>,
}
878
/// Resources associated with one extension session.
#[derive(Debug)]
pub(crate) struct ExtensionSessionResources {
    /// Ownership scope of the session; a `VmOwnership` scope ties the
    /// session's processes to that VM when the VM is pruned.
    pub(crate) ownership: OwnershipScope,
    /// Ids of processes tracked for this session.
    pub(crate) process_ids: BTreeSet<String>,
    /// Ids of VMs tracked for this session.
    pub(crate) vm_ids: BTreeSet<String>,
}
885
886impl<B> fmt::Debug for NativeSidecar<B> {
887 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
888 f.debug_struct("NativeSidecar")
889 .field("config", &self.config)
890 .field("cache_root", &self.cache_root)
891 .field("next_connection_id", &self.next_connection_id)
892 .field("next_session_id", &self.next_session_id)
893 .field("next_vm_id", &self.next_vm_id)
894 .field("connection_count", &self.connections.len())
895 .field("session_count", &self.sessions.len())
896 .field("vm_count", &self.vms.len())
897 .field("extension_session_count", &self.extension_sessions.len())
898 .field(
899 "extension_process_output_buffer_count",
900 &self.extension_process_output_buffers.len(),
901 )
902 .finish()
903 }
904}
905
906impl<B> NativeSidecar<B>
907where
908 B: NativeSidecarBridge + Send + 'static,
909 BridgeError<B>: fmt::Debug + Send + Sync + 'static,
910{
911 pub fn new(bridge: B) -> Result<Self, SidecarError> {
912 Self::with_config(bridge, NativeSidecarConfig::default())
913 }
914
915 pub fn with_config(bridge: B, config: NativeSidecarConfig) -> Result<Self, SidecarError> {
916 if matches!(config.expected_auth_token.as_deref(), Some("")) {
917 return Err(SidecarError::InvalidState(String::from(
918 "native sidecar expected_auth_token must not be empty",
919 )));
920 }
921
922 let cache_root = config.compile_cache_root.clone().unwrap_or_else(|| {
923 std::env::temp_dir().join(format!(
924 "{}-{}",
925 config.sidecar_id,
926 SystemTime::now()
927 .duration_since(UNIX_EPOCH)
928 .expect("system time before unix epoch")
929 .as_nanos()
930 ))
931 });
932 fs::create_dir_all(&cache_root).map_err(|error| {
933 SidecarError::Io(format!("failed to prepare sidecar cache root: {error}"))
934 })?;
935
936 let bridge = SharedBridge::new(bridge);
937 let mount_plugins = build_mount_plugin_registry::<B>()?;
938 let (process_event_sender, process_event_receiver) = channel(MAX_PROCESS_EVENT_QUEUE);
939
940 Ok(Self {
941 config,
942 bridge,
943 mount_plugins,
944 cache_root,
945 javascript_engine: JavascriptExecutionEngine::default(),
946 python_engine: PythonExecutionEngine::default(),
947 wasm_engine: WasmExecutionEngine::default(),
948 next_connection_id: 0,
949 next_session_id: 0,
950 next_vm_id: 0,
951 next_sidecar_request_id: -1,
952 connections: BTreeMap::new(),
953 sessions: BTreeMap::new(),
954 vms: BTreeMap::new(),
955 process_event_sender,
956 process_event_receiver: Some(process_event_receiver),
957 pending_process_events: VecDeque::new(),
958 pending_sidecar_responses: SidecarResponseTracker::default(),
959 outbound_sidecar_requests: VecDeque::new(),
960 completed_sidecar_responses: BTreeMap::new(),
961 completed_sidecar_response_order: VecDeque::new(),
962 completed_sidecar_responses_gauge: register_queue(
963 TrackedLimit::CompletedSidecarResponses,
964 MAX_COMPLETED_SIDECAR_RESPONSES,
965 ),
966 pending_process_events_gauge: register_queue(
967 TrackedLimit::PendingProcessEvents,
968 MAX_PROCESS_EVENT_QUEUE,
969 ),
970 pending_sidecar_responses_gauge: register_queue(
971 TrackedLimit::PendingSidecarResponses,
972 MAX_PENDING_SIDECAR_RESPONSES,
973 ),
974 outbound_sidecar_requests_gauge: register_queue(
975 TrackedLimit::OutboundSidecarRequests,
976 MAX_OUTBOUND_SIDECAR_REQUESTS,
977 ),
978 sidecar_requests: SharedSidecarRequestClient::default(),
979 extensions: BTreeMap::new(),
980 extension_sessions: BTreeMap::new(),
981 extension_process_output_buffers: BTreeMap::new(),
982 })
983 }
984
985 pub fn with_config_and_extensions(
986 bridge: B,
987 config: NativeSidecarConfig,
988 extensions: Vec<Box<dyn Extension>>,
989 ) -> Result<Self, SidecarError> {
990 let mut sidecar = Self::with_config(bridge, config)?;
991 for extension in extensions {
992 sidecar.register_extension(extension)?;
993 }
994 Ok(sidecar)
995 }
996
997 pub(crate) fn prune_extension_process_resource(&mut self, process_id: &str) {
998 self.extension_sessions.retain(|_, resources| {
999 resources.process_ids.remove(process_id);
1000 !resources.process_ids.is_empty() || !resources.vm_ids.is_empty()
1001 });
1002 }
1003
1004 pub(crate) fn prune_extension_vm_resource(&mut self, vm_id: &str) {
1005 self.extension_sessions.retain(|_, resources| {
1006 if matches!(
1007 &resources.ownership,
1008 OwnershipScope::VmOwnership(inner) if inner.vm_id == vm_id
1009 ) {
1010 resources.process_ids.clear();
1011 }
1012 resources.vm_ids.remove(vm_id);
1013 !resources.process_ids.is_empty() || !resources.vm_ids.is_empty()
1014 });
1015 }
1016
1017 pub(crate) fn capture_extension_process_output_event(
1018 &mut self,
1019 vm_id: &str,
1020 process_id: &str,
1021 event: &ActiveExecutionEvent,
1022 ) -> bool {
1023 let Some(buffer) = self
1024 .extension_process_output_buffers
1025 .get_mut(&(vm_id.to_string(), process_id.to_string()))
1026 else {
1027 return false;
1028 };
1029 match event {
1030 ActiveExecutionEvent::Stdout(chunk) => {
1031 buffer.append_stdout(chunk, DEFAULT_ACP_STDOUT_BUFFER_BYTE_LIMIT);
1032 true
1033 }
1034 ActiveExecutionEvent::Stderr(chunk) => {
1035 buffer.append_stderr(chunk, DEFAULT_ACP_STDOUT_BUFFER_BYTE_LIMIT);
1036 true
1037 }
1038 ActiveExecutionEvent::JavascriptSyncRpcRequest(_)
1039 | ActiveExecutionEvent::PythonVfsRpcRequest(_)
1040 | ActiveExecutionEvent::SignalState { .. }
1041 | ActiveExecutionEvent::Exited(_) => false,
1042 }
1043 }
1044
1045 fn bind_extension_process_resource(
1046 &mut self,
1047 ownership: OwnershipScope,
1048 namespace: String,
1049 ext_session_id: String,
1050 process_id: String,
1051 ) -> Result<(), SidecarError> {
1052 if ext_session_id.is_empty() {
1053 return Err(SidecarError::InvalidState(String::from(
1054 "extension session id must not be empty",
1055 )));
1056 }
1057 let (connection_id, session_id, vm_id) = self.vm_scope_for(&ownership)?;
1058 self.require_owned_vm(&connection_id, &session_id, &vm_id)?;
1059 let process_exists = self
1060 .vms
1061 .get(&vm_id)
1062 .is_some_and(|vm| vm.active_processes.contains_key(&process_id));
1063 if !process_exists {
1064 return Err(SidecarError::InvalidState(format!(
1065 "VM {vm_id} has no active process {process_id}"
1066 )));
1067 }
1068
1069 let key = (namespace, ext_session_id);
1070 if let Some(resources) = self.extension_sessions.get_mut(&key) {
1071 if resources.ownership != ownership {
1072 return Err(SidecarError::InvalidState(String::from(
1073 "extension session ownership did not match existing resources",
1074 )));
1075 }
1076 resources.process_ids.insert(process_id);
1077 } else {
1078 self.extension_sessions.insert(
1079 key,
1080 ExtensionSessionResources {
1081 ownership,
1082 process_ids: BTreeSet::from([process_id]),
1083 vm_ids: BTreeSet::new(),
1084 },
1085 );
1086 }
1087 Ok(())
1088 }
1089
1090 fn bind_extension_vm_resource(
1091 &mut self,
1092 ownership: OwnershipScope,
1093 namespace: String,
1094 ext_session_id: String,
1095 ) -> Result<(), SidecarError> {
1096 if ext_session_id.is_empty() {
1097 return Err(SidecarError::InvalidState(String::from(
1098 "extension session id must not be empty",
1099 )));
1100 }
1101 let (connection_id, session_id, vm_id) = self.vm_scope_for(&ownership)?;
1102 self.require_owned_vm(&connection_id, &session_id, &vm_id)?;
1103
1104 let key = (namespace, ext_session_id);
1105 if let Some(resources) = self.extension_sessions.get_mut(&key) {
1106 if resources.ownership != ownership {
1107 return Err(SidecarError::InvalidState(String::from(
1108 "extension session ownership did not match existing resources",
1109 )));
1110 }
1111 resources.vm_ids.insert(vm_id);
1112 } else {
1113 self.extension_sessions.insert(
1114 key,
1115 ExtensionSessionResources {
1116 ownership,
1117 process_ids: BTreeSet::new(),
1118 vm_ids: BTreeSet::from([vm_id]),
1119 },
1120 );
1121 }
1122 Ok(())
1123 }
1124
1125 pub fn sidecar_id(&self) -> &str {
1126 &self.config.sidecar_id
1127 }
1128
1129 pub fn with_bridge_mut<T>(
1130 &self,
1131 operation: impl FnOnce(&mut B) -> T,
1132 ) -> Result<T, SidecarError> {
1133 self.bridge.inspect(operation)
1134 }
1135
1136 pub fn set_sidecar_request_transport(&mut self, transport: Arc<dyn SidecarRequestTransport>) {
1137 self.sidecar_requests.set_transport(transport);
1138 }
1139
1140 pub fn register_extension(
1141 &mut self,
1142 extension: Box<dyn Extension>,
1143 ) -> Result<(), SidecarError> {
1144 let namespace = extension.namespace().to_owned();
1145 if namespace.is_empty() {
1146 return Err(SidecarError::InvalidState(String::from(
1147 "extension namespace must not be empty",
1148 )));
1149 }
1150 if self.extensions.contains_key(&namespace) {
1151 return Err(SidecarError::Conflict(format!(
1152 "extension namespace {namespace} is already registered",
1153 )));
1154 }
1155 self.extensions.insert(namespace, Arc::from(extension));
1156 Ok(())
1157 }
1158
1159 pub fn set_sidecar_request_handler<F>(&mut self, handler: F)
1160 where
1161 F: Fn(SidecarRequestFrame) -> Result<SidecarResponsePayload, SidecarError>
1162 + Send
1163 + Sync
1164 + 'static,
1165 {
1166 struct HandlerTransport<F>(F);
1167
1168 impl<F> SidecarRequestTransport for HandlerTransport<F>
1169 where
1170 F: Fn(SidecarRequestFrame) -> Result<SidecarResponsePayload, SidecarError>
1171 + Send
1172 + Sync
1173 + 'static,
1174 {
1175 fn send_request(
1176 &self,
1177 request: SidecarRequestFrame,
1178 _timeout: Duration,
1179 ) -> Result<SidecarResponseFrame, SidecarError> {
1180 let payload = (self.0)(request.clone())?;
1181 Ok(SidecarResponseFrame::new(
1182 request.request_id,
1183 request.ownership,
1184 payload,
1185 ))
1186 }
1187 }
1188
1189 self.set_sidecar_request_transport(Arc::new(HandlerTransport(handler)));
1190 }
1191
1192 pub fn set_wire_sidecar_request_handler<F>(&mut self, handler: F)
1193 where
1194 F: Fn(
1195 crate::wire::SidecarRequestFrame,
1196 ) -> Result<crate::wire::SidecarResponseFrame, SidecarError>
1197 + Send
1198 + Sync
1199 + 'static,
1200 {
1201 self.set_sidecar_request_handler(move |request| {
1202 let request = crate::wire::sidecar_request_frame_from_compat(request)
1203 .map_err(wire_protocol_error)?;
1204 let response = handler(request)?;
1205 let response = crate::wire::sidecar_response_frame_to_compat(response)
1206 .map_err(wire_protocol_error)?;
1207 Ok(response.payload)
1208 });
1209 }
1210
1211 pub(crate) fn queue_pending_process_event(
1212 &mut self,
1213 envelope: ProcessEventEnvelope,
1214 ) -> Result<(), SidecarError> {
1215 if self.pending_process_events.len() >= MAX_PROCESS_EVENT_QUEUE {
1216 return Err(process_event_queue_overflow_error());
1217 }
1218 self.pending_process_events.push_back(envelope);
1219 self.pending_process_events_gauge
1220 .observe_depth(self.pending_process_events.len());
1221 Ok(())
1222 }
1223
1224 pub(crate) fn queue_front_pending_process_event(
1225 &mut self,
1226 envelope: ProcessEventEnvelope,
1227 ) -> Result<(), SidecarError> {
1228 if self.pending_process_events.len() >= MAX_PROCESS_EVENT_QUEUE {
1229 return Err(process_event_queue_overflow_error());
1230 }
1231 self.pending_process_events.push_front(envelope);
1232 self.pending_process_events_gauge
1233 .observe_depth(self.pending_process_events.len());
1234 Ok(())
1235 }
1236
1237 pub(crate) fn pending_process_event_capacity(&self) -> usize {
1238 MAX_PROCESS_EVENT_QUEUE.saturating_sub(self.pending_process_events.len())
1239 }
1240
1241 pub fn dispatch_blocking(
1242 &mut self,
1243 request: RequestFrame,
1244 ) -> Result<DispatchResult, SidecarError> {
1245 let inside_runtime = tokio::runtime::Handle::try_current().is_ok();
1246 if matches!(
1247 request.payload,
1248 RequestPayload::DisposeVm(_) | RequestPayload::Ext(_)
1249 ) && !inside_runtime
1250 {
1251 return tokio::runtime::Builder::new_current_thread()
1252 .enable_all()
1253 .build()
1254 .expect("sidecar dispatch runtime")
1255 .block_on(self.dispatch(request));
1256 }
1257
1258 let mut future = std::pin::pin!(self.dispatch(request));
1259 match poll_future_once(future.as_mut()) {
1260 Some(result) => result,
1261 None if inside_runtime => Err(SidecarError::InvalidState(String::from(
1262 "dispatch_blocking cannot wait for an async sidecar request inside a Tokio runtime; use dispatch().await",
1263 ))),
1264 None => tokio::runtime::Builder::new_current_thread()
1265 .enable_all()
1266 .build()
1267 .expect("sidecar dispatch runtime")
1268 .block_on(future),
1269 }
1270 }
1271
1272 pub fn dispatch_wire_blocking(
1273 &mut self,
1274 request: crate::wire::RequestFrame,
1275 ) -> Result<crate::wire::WireDispatchResult, SidecarError> {
1276 let request = crate::wire::request_frame_to_compat(request).map_err(wire_protocol_error)?;
1277 let result = self.dispatch_blocking(request)?;
1278 crate::wire::dispatch_result_from_compat(result).map_err(wire_protocol_error)
1279 }
1280
1281 pub fn poll_event_blocking(
1282 &mut self,
1283 ownership: &OwnershipScope,
1284 timeout: Duration,
1285 ) -> Result<Option<EventFrame>, SidecarError> {
1286 tokio::runtime::Builder::new_current_thread()
1287 .enable_all()
1288 .build()
1289 .expect("sidecar poll runtime")
1290 .block_on(self.poll_event(ownership, timeout))
1291 }
1292
1293 pub fn poll_event_wire_blocking(
1294 &mut self,
1295 ownership: &crate::wire::OwnershipScope,
1296 timeout: Duration,
1297 ) -> Result<Option<crate::wire::EventFrame>, SidecarError> {
1298 let ownership = crate::wire::ownership_scope_to_compat(ownership.clone());
1299 self.poll_event_blocking(&ownership, timeout)?
1300 .map(crate::wire::event_frame_from_compat)
1301 .transpose()
1302 .map_err(wire_protocol_error)
1303 }
1304
1305 pub fn close_session_blocking(
1306 &mut self,
1307 connection_id: &str,
1308 session_id: &str,
1309 ) -> Result<Vec<EventFrame>, SidecarError> {
1310 tokio::runtime::Builder::new_current_thread()
1311 .enable_all()
1312 .build()
1313 .expect("sidecar close-session runtime")
1314 .block_on(self.close_session(connection_id, session_id))
1315 }
1316
1317 pub fn remove_connection_blocking(
1318 &mut self,
1319 connection_id: &str,
1320 ) -> Result<Vec<EventFrame>, SidecarError> {
1321 tokio::runtime::Builder::new_current_thread()
1322 .enable_all()
1323 .build()
1324 .expect("sidecar remove-connection runtime")
1325 .block_on(self.remove_connection(connection_id))
1326 }
1327
1328 pub fn dispose_vm_internal_blocking(
1329 &mut self,
1330 connection_id: &str,
1331 session_id: &str,
1332 vm_id: &str,
1333 reason: DisposeReason,
1334 ) -> Result<Vec<EventFrame>, SidecarError> {
1335 tokio::runtime::Builder::new_current_thread()
1336 .enable_all()
1337 .build()
1338 .expect("sidecar dispose-vm runtime")
1339 .block_on(self.dispose_vm_internal(connection_id, session_id, vm_id, reason))
1340 }
1341
1342 pub async fn dispatch(
1343 &mut self,
1344 request: RequestFrame,
1345 ) -> Result<DispatchResult, SidecarError> {
1346 if let Err(error) = self.ensure_request_within_frame_limit(&request) {
1347 return Ok(DispatchResult {
1348 response: self.reject(&request, error_code(&error), &error.to_string()),
1349 events: Vec::new(),
1350 });
1351 }
1352
1353 let result = match request.payload.clone() {
1354 RequestPayload::Authenticate(payload) => {
1355 self.authenticate_connection(&request, payload).await
1356 }
1357 RequestPayload::OpenSession(payload) => self.open_session(&request, payload).await,
1358 RequestPayload::CreateVm(payload) => self.create_vm(&request, payload).await,
1359 RequestPayload::DisposeVm(payload) => self.dispose_vm(&request, payload).await,
1360 RequestPayload::BootstrapRootFilesystem(payload) => {
1361 self.bootstrap_root_filesystem(&request, payload.entries)
1362 .await
1363 }
1364 RequestPayload::ConfigureVm(payload) => self.configure_vm(&request, payload).await,
1365 RequestPayload::RegisterHostCallbacks(payload) => {
1366 register_host_callbacks(self, &request, payload)
1367 }
1368 RequestPayload::CreateLayer(payload) => self.create_layer(&request, payload).await,
1369 RequestPayload::SealLayer(payload) => self.seal_layer(&request, payload).await,
1370 RequestPayload::ImportSnapshot(payload) => {
1371 self.import_snapshot(&request, payload).await
1372 }
1373 RequestPayload::ExportSnapshot(payload) => {
1374 self.export_snapshot(&request, payload).await
1375 }
1376 RequestPayload::CreateOverlay(payload) => self.create_overlay(&request, payload).await,
1377 RequestPayload::GuestFilesystemCall(payload) => {
1378 self.guest_filesystem_call(&request, payload).await
1379 }
1380 RequestPayload::SnapshotRootFilesystem(payload) => {
1381 self.snapshot_root_filesystem(&request, payload).await
1382 }
1383 RequestPayload::Execute(payload) => self.execute(&request, payload).await,
1384 RequestPayload::WriteStdin(payload) => self.write_stdin(&request, payload).await,
1385 RequestPayload::CloseStdin(payload) => self.close_stdin(&request, payload).await,
1386 RequestPayload::KillProcess(payload) => self.kill_process(&request, payload).await,
1387 RequestPayload::GetProcessSnapshot(payload) => {
1388 self.get_process_snapshot(&request, payload).await
1389 }
1390 RequestPayload::FindListener(payload) => self.find_listener(&request, payload).await,
1391 RequestPayload::FindBoundUdp(payload) => self.find_bound_udp(&request, payload).await,
1392 RequestPayload::VmFetch(payload) => self.vm_fetch(&request, payload).await,
1393 RequestPayload::GetSignalState(payload) => {
1394 self.get_signal_state(&request, payload).await
1395 }
1396 RequestPayload::GetZombieTimerCount(payload) => {
1397 self.get_zombie_timer_count(&request, payload).await
1398 }
1399 RequestPayload::HostFilesystemCall(_)
1400 | RequestPayload::PersistenceLoad(_)
1401 | RequestPayload::PersistenceFlush(_) => Ok(DispatchResult {
1402 response: self.reject(
1403 &request,
1404 "unsupported_direction",
1405 "host callback request categories are sidecar-to-host only in this scaffold",
1406 ),
1407 events: Vec::new(),
1408 }),
1409 RequestPayload::Ext(payload) => {
1410 self.dispatch_extension_request(&request, payload).await
1411 }
1412 };
1413
1414 match result {
1415 Ok(dispatch) => Ok(dispatch),
1416 Err(error @ SidecarError::Io(_)) => Err(error),
1417 Err(error) => Ok(DispatchResult {
1418 response: self.reject(&request, error_code(&error), &error.to_string()),
1419 events: Vec::new(),
1420 }),
1421 }
1422 }
1423
1424 pub async fn dispatch_wire(
1425 &mut self,
1426 request: crate::wire::RequestFrame,
1427 ) -> Result<crate::wire::WireDispatchResult, SidecarError> {
1428 let request = crate::wire::request_frame_to_compat(request).map_err(wire_protocol_error)?;
1429 let result = self.dispatch(request).await?;
1430 crate::wire::dispatch_result_from_compat(result).map_err(wire_protocol_error)
1431 }
1432
1433 pub async fn poll_event_wire(
1434 &mut self,
1435 ownership: &crate::wire::OwnershipScope,
1436 timeout: Duration,
1437 ) -> Result<Option<crate::wire::EventFrame>, SidecarError> {
1438 let ownership = crate::wire::ownership_scope_to_compat(ownership.clone());
1439 self.poll_event(&ownership, timeout)
1440 .await?
1441 .map(crate::wire::event_frame_from_compat)
1442 .transpose()
1443 .map_err(wire_protocol_error)
1444 }
1445
1446 async fn dispatch_extension_request(
1447 &mut self,
1448 request: &RequestFrame,
1449 envelope: ExtEnvelope,
1450 ) -> Result<DispatchResult, SidecarError> {
1451 let namespace = envelope.namespace;
1452 let Some(extension) = self.extensions.get(&namespace).cloned() else {
1453 return Ok(DispatchResult {
1454 response: self.reject(
1455 request,
1456 "unknown_extension",
1457 &format!("no extension registered for namespace {namespace}"),
1458 ),
1459 events: Vec::new(),
1460 });
1461 };
1462 let snapshot = ExtensionSnapshot::new(
1463 namespace.clone(),
1464 request.ownership.clone(),
1465 self.sidecar_requests.clone(),
1466 );
1467 let ctx = ExtensionContext::new(snapshot, self);
1468 let response = extension.handle_request(ctx, envelope.payload).await?;
1469 Ok(DispatchResult {
1470 response: self.respond(
1471 request,
1472 ResponsePayload::ExtResult(ExtEnvelope {
1473 namespace,
1474 payload: response.payload,
1475 }),
1476 ),
1477 events: response.events,
1478 })
1479 }
1480
1481 pub async fn poll_event(
1482 &mut self,
1483 ownership: &OwnershipScope,
1484 timeout: Duration,
1485 ) -> Result<Option<EventFrame>, SidecarError> {
1486 let deadline = Instant::now() + timeout;
1487 loop {
1488 if let Some(index) = self
1489 .pending_process_events
1490 .iter()
1491 .position(|event| public_process_event_matches_ownership(self, ownership, event))
1492 {
1493 let Some(envelope) = self.pending_process_events.remove(index) else {
1494 continue;
1495 };
1496 if let Some(frame) = self.handle_process_event_envelope(envelope)? {
1497 return Ok(Some(frame));
1498 }
1499 continue;
1500 }
1501
1502 if !timeout.is_zero() {
1503 let _ = self.pump_process_events(ownership).await?;
1504 }
1505
1506 let queued_envelopes = {
1507 let pending_capacity = self.pending_process_event_capacity();
1508 let receiver = self.process_event_receiver.as_mut().ok_or_else(|| {
1509 SidecarError::InvalidState(String::from("process event receiver unavailable"))
1510 })?;
1511 let mut queued = Vec::new();
1512 loop {
1513 if queued.len() >= pending_capacity {
1514 if receiver.is_empty() {
1515 break;
1516 }
1517 return Err(process_event_queue_overflow_error());
1518 }
1519 match receiver.try_recv() {
1520 Ok(envelope) => queued.push(envelope),
1521 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
1522 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break,
1523 }
1524 }
1525 queued
1526 };
1527
1528 let mut matching_envelope = None;
1529 for envelope in queued_envelopes {
1530 if matching_envelope.is_none()
1531 && public_process_event_matches_ownership(self, ownership, &envelope)
1532 {
1533 matching_envelope = Some(envelope);
1534 } else {
1535 self.queue_pending_process_event(envelope)?;
1536 }
1537 }
1538
1539 if let Some(envelope) = matching_envelope {
1540 if let Some(frame) = self.handle_process_event_envelope(envelope)? {
1541 return Ok(Some(frame));
1542 }
1543 continue;
1544 }
1545
1546 if Instant::now() >= deadline {
1547 return Ok(None);
1548 }
1549
1550 let remaining = deadline.saturating_duration_since(Instant::now());
1551 time::sleep(remaining.min(Duration::from_millis(10))).await;
1552 }
1553 }
1554
/// Handles one process-event envelope, making sure a process's exit event is
/// not handled before other events of the same process that are still queued.
///
/// For any event other than `Exited`, this simply forwards to
/// `handle_execution_event`.
///
/// For an `Exited` event, every pending non-exit event for the same
/// `(vm_id, process_id)` is pulled out of the pending queue, and further
/// events are drained via `drain_process_events_blocking_with_limit`. If any
/// such trailing events exist, they are pushed back to the front of the
/// pending queue (in their original order) followed by the exit event, so the
/// exit is seen last. In that case this call returns `Ok(None)`, unless the
/// queue has exactly enough room for the trailing events alone, in which case
/// the first trailing event is handled immediately to make room for the exit.
///
/// # Errors
/// Returns the queue-overflow error when the trailing events do not fit in the
/// pending queue, and propagates errors from the drain, the re-queue calls and
/// `handle_execution_event`.
pub(crate) fn handle_process_event_envelope(
    &mut self,
    envelope: ProcessEventEnvelope,
) -> Result<Option<EventFrame>, SidecarError> {
    let ProcessEventEnvelope {
        connection_id,
        session_id,
        vm_id,
        process_id,
        event,
    } = envelope;

    if matches!(event, ActiveExecutionEvent::Exited(_)) {
        // Split the pending queue: non-exit events of this process go to
        // `trailing`; everything else keeps its relative order in `deferred`.
        let mut trailing = Vec::new();
        let mut deferred = VecDeque::new();
        while let Some(pending) = self.pending_process_events.pop_front() {
            if pending.vm_id == vm_id
                && pending.process_id == process_id
                && !matches!(pending.event, ActiveExecutionEvent::Exited(_))
            {
                trailing.push(pending.event);
            } else {
                deferred.push_back(pending);
            }
        }
        self.pending_process_events = deferred;
        // Leave room for what is already in `trailing` plus the exit event.
        let drain_limit = self
            .pending_process_event_capacity()
            .saturating_sub(trailing.len().saturating_add(1));
        // NOTE(review): if this drain fails, the events already moved into
        // `trailing` are dropped together with the returned error.
        trailing.extend(
            self.drain_process_events_blocking_with_limit(&vm_id, &process_id, drain_limit)?
                .into_iter()
                .filter(|event| !matches!(event, ActiveExecutionEvent::Exited(_))),
        );

        if !trailing.is_empty() {
            if self.pending_process_event_capacity() < trailing.len() {
                return Err(process_event_queue_overflow_error());
            }
            // With room for exactly `trailing.len()` entries, the exit event
            // would not fit as well, so the first trailing event is handled
            // right away instead of being re-queued.
            let emit_now = if self.pending_process_event_capacity() == trailing.len() {
                Some(trailing.remove(0))
            } else {
                None
            };
            // Push the exit first, then the trailing events in reverse, so the
            // queue front ends up as: trailing (original order), then exit.
            self.queue_front_pending_process_event(ProcessEventEnvelope {
                connection_id: connection_id.clone(),
                session_id: session_id.clone(),
                vm_id: vm_id.clone(),
                process_id: process_id.clone(),
                event,
            })?;
            for event in trailing.into_iter().rev() {
                self.queue_front_pending_process_event(ProcessEventEnvelope {
                    connection_id: connection_id.clone(),
                    session_id: session_id.clone(),
                    vm_id: vm_id.clone(),
                    process_id: process_id.clone(),
                    event,
                })?;
            }
            if let Some(event) = emit_now {
                return self.handle_execution_event(&vm_id, &process_id, event);
            }
            return Ok(None);
        }
    }

    self.handle_execution_event(&vm_id, &process_id, event)
}
1624
1625 pub async fn close_session(
1628 &mut self,
1629 connection_id: &str,
1630 session_id: &str,
1631 ) -> Result<Vec<EventFrame>, SidecarError> {
1632 self.dispose_session(connection_id, session_id, DisposeReason::Requested)
1633 .await
1634 }
1635
1636 pub async fn remove_connection(
1637 &mut self,
1638 connection_id: &str,
1639 ) -> Result<Vec<EventFrame>, SidecarError> {
1640 self.require_authenticated_connection(connection_id)?;
1641
1642 let session_ids = self
1643 .connections
1644 .get(connection_id)
1645 .expect("authenticated connection should exist")
1646 .sessions
1647 .iter()
1648 .cloned()
1649 .collect::<Vec<_>>();
1650
1651 let mut events = Vec::new();
1652 for session_id in session_ids {
1653 events.extend(
1654 self.dispose_session(connection_id, &session_id, DisposeReason::ConnectionClosed)
1655 .await?,
1656 );
1657 }
1658
1659 self.connections.remove(connection_id);
1660 Ok(events)
1661 }
1662
1663 async fn authenticate_connection(
1664 &mut self,
1665 request: &RequestFrame,
1666 payload: crate::protocol::AuthenticateRequest,
1667 ) -> Result<DispatchResult, SidecarError> {
1668 let _ = self.connection_id_for(&request.ownership)?;
1669 if let Err(error) = self.validate_auth_token(&payload.auth_token) {
1670 let mut fields = audit_fields([
1671 (String::from("source"), payload.client_name.clone()),
1672 (String::from("reason"), error.to_string()),
1673 ]);
1674 if let OwnershipScope::ConnectionOwnership(inner) = &request.ownership {
1675 fields.insert(String::from("connection_id"), inner.connection_id.clone());
1676 }
1677 emit_security_audit_event(
1678 &self.bridge,
1679 &self.config.sidecar_id,
1680 "security.auth.failed",
1681 fields,
1682 );
1683 return Err(error);
1684 }
1685
1686 if payload.protocol_version != crate::wire::PROTOCOL_VERSION {
1687 return Err(SidecarError::ProtocolVersionMismatch(format!(
1688 "sidecar protocol version mismatch: expected {}, got {}",
1689 crate::wire::PROTOCOL_VERSION,
1690 payload.protocol_version
1691 )));
1692 }
1693
1694 let expected_bridge_version = secure_exec_bridge::bridge_contract().version;
1695 if payload.bridge_version != expected_bridge_version {
1696 return Err(SidecarError::BridgeVersionMismatch(format!(
1697 "bridge contract version mismatch: expected {expected_bridge_version}, got {}",
1698 payload.bridge_version
1699 )));
1700 }
1701
1702 let connection_id = self.allocate_connection_id();
1703 self.connections.insert(
1704 connection_id.clone(),
1705 ConnectionState {
1706 auth_token: payload.auth_token,
1707 sessions: BTreeSet::new(),
1708 },
1709 );
1710
1711 let response = self.response_with_ownership(
1712 request.request_id,
1713 OwnershipScope::connection(&connection_id),
1714 ResponsePayload::Authenticated(AuthenticatedResponse {
1715 sidecar_id: self.config.sidecar_id.clone(),
1716 connection_id,
1717 max_frame_bytes: self.config.max_frame_bytes as u32,
1718 }),
1719 );
1720 Ok(DispatchResult {
1721 response,
1722 events: Vec::new(),
1723 })
1724 }
1725
1726 async fn open_session(
1727 &mut self,
1728 request: &RequestFrame,
1729 payload: OpenSessionRequest,
1730 ) -> Result<DispatchResult, SidecarError> {
1731 let connection_id = self.connection_id_for(&request.ownership)?;
1732 self.require_authenticated_connection(&connection_id)?;
1733
1734 self.next_session_id += 1;
1735 let session_id = format!("session-{}", self.next_session_id);
1736 self.sessions.insert(
1737 session_id.clone(),
1738 SessionState {
1739 connection_id: connection_id.clone(),
1740 placement: payload.placement,
1741 metadata: payload.metadata.into_iter().collect(),
1742 vm_ids: BTreeSet::new(),
1743 },
1744 );
1745 self.connections
1746 .get_mut(&connection_id)
1747 .expect("authenticated connection should exist")
1748 .sessions
1749 .insert(session_id.clone());
1750
1751 Ok(DispatchResult {
1752 response: self.respond(
1753 request,
1754 ResponsePayload::SessionOpened(SessionOpenedResponse {
1755 session_id,
1756 owner_connection_id: connection_id,
1757 }),
1758 ),
1759 events: Vec::new(),
1760 })
1761 }
1762
1763 async fn guest_filesystem_call(
1766 &mut self,
1767 request: &RequestFrame,
1768 payload: GuestFilesystemCallRequest,
1769 ) -> Result<DispatchResult, SidecarError> {
1770 filesystem_guest_filesystem_call(self, request, payload).await
1771 }
1772
1773 async fn dispose_session(
1779 &mut self,
1780 connection_id: &str,
1781 session_id: &str,
1782 reason: DisposeReason,
1783 ) -> Result<Vec<EventFrame>, SidecarError> {
1784 self.require_owned_session(connection_id, session_id)?;
1785
1786 let vm_ids = self
1787 .sessions
1788 .get(session_id)
1789 .expect("owned session should exist")
1790 .vm_ids
1791 .iter()
1792 .cloned()
1793 .collect::<Vec<_>>();
1794
1795 let mut events = Vec::new();
1796 for vm_id in vm_ids {
1797 events.extend(
1798 self.dispose_vm_internal(connection_id, session_id, &vm_id, reason.clone())
1799 .await?,
1800 );
1801 }
1802
1803 self.sessions.remove(session_id);
1804 if let Some(connection) = self.connections.get_mut(connection_id) {
1805 connection.sessions.remove(session_id);
1806 }
1807 Ok(events)
1808 }
1809
1810 pub(crate) fn handle_javascript_sync_rpc_request(
1818 &mut self,
1819 vm_id: &str,
1820 process_id: &str,
1821 request: JavascriptSyncRpcRequest,
1822 ) -> Result<(), SidecarError> {
1823 let Some(vm) = self.vms.get(vm_id) else {
1824 log_stale_process_event(&self.bridge, vm_id, process_id, "javascript sync RPC");
1825 return Ok(());
1826 };
1827 if !vm.active_processes.contains_key(process_id) {
1828 log_stale_process_event(&self.bridge, vm_id, process_id, "javascript sync RPC");
1829 return Ok(());
1830 }
1831
1832 let response: Result<Value, SidecarError> = match request.method.as_str() {
1833 "child_process.spawn" => {
1834 let Some(vm) = self.vms.get(vm_id) else {
1835 log_stale_process_event(
1836 &self.bridge,
1837 vm_id,
1838 process_id,
1839 "javascript sync RPC child_process.spawn",
1840 );
1841 return Ok(());
1842 };
1843 let (payload, _) = parse_javascript_child_process_spawn_request(vm, &request.args)?;
1844 self.spawn_javascript_child_process(vm_id, process_id, payload)
1845 }
1846 "child_process.spawn_sync" => {
1847 let Some(vm) = self.vms.get(vm_id) else {
1848 log_stale_process_event(
1849 &self.bridge,
1850 vm_id,
1851 process_id,
1852 "javascript sync RPC child_process.spawn_sync",
1853 );
1854 return Ok(());
1855 };
1856 let (payload, max_buffer) =
1857 parse_javascript_child_process_spawn_request(vm, &request.args)?;
1858 self.spawn_javascript_child_process_sync(vm_id, process_id, payload, max_buffer)
1859 }
1860 "child_process.poll" => {
1861 let child_process_id =
1862 javascript_sync_rpc_arg_str(&request.args, 0, "child_process.poll child id")?;
1863 let wait_ms = javascript_sync_rpc_arg_u64_optional(
1864 &request.args,
1865 1,
1866 "child_process.poll wait ms",
1867 )?
1868 .unwrap_or_default();
1869 self.poll_javascript_child_process(vm_id, process_id, child_process_id, wait_ms)
1870 }
1871 "child_process.write_stdin" => {
1872 let child_process_id = javascript_sync_rpc_arg_str(
1873 &request.args,
1874 0,
1875 "child_process.write_stdin child id",
1876 )?;
1877 let chunk = javascript_sync_rpc_bytes_arg(
1878 &request.args,
1879 1,
1880 "child_process.write_stdin chunk",
1881 )?;
1882 self.write_javascript_child_process_stdin(
1883 vm_id,
1884 process_id,
1885 child_process_id,
1886 &chunk,
1887 )?;
1888 Ok(Value::Null)
1889 }
1890 "child_process.close_stdin" => {
1891 let child_process_id = javascript_sync_rpc_arg_str(
1892 &request.args,
1893 0,
1894 "child_process.close_stdin child id",
1895 )?;
1896 self.close_javascript_child_process_stdin(vm_id, process_id, child_process_id)?;
1897 Ok(Value::Null)
1898 }
1899 "child_process.kill" => {
1900 let child_process_id =
1901 javascript_sync_rpc_arg_str(&request.args, 0, "child_process.kill child id")?;
1902 let signal =
1903 javascript_sync_rpc_arg_str(&request.args, 1, "child_process.kill signal")?;
1904 self.kill_javascript_child_process(vm_id, process_id, child_process_id, signal)?;
1905 Ok(Value::Null)
1906 }
1907 "process.kill" => {
1908 let target_pid =
1909 javascript_sync_rpc_arg_i32(&request.args, 0, "process.kill target pid")?;
1910 let signal = javascript_sync_rpc_arg_str(&request.args, 1, "process.kill signal")?;
1911 let parsed_signal = parse_signal(signal)?;
1912 if parsed_signal == 0 {
1913 let Some(vm) = self.vms.get(vm_id) else {
1914 log_stale_process_event(
1915 &self.bridge,
1916 vm_id,
1917 process_id,
1918 "javascript sync RPC process.kill",
1919 );
1920 return Ok(());
1921 };
1922 if !vm.active_processes.contains_key(process_id) {
1923 log_stale_process_event(
1924 &self.bridge,
1925 vm_id,
1926 process_id,
1927 "javascript sync RPC process.kill",
1928 );
1929 return Ok(());
1930 }
1931 vm.kernel
1932 .signal_process(EXECUTION_DRIVER_NAME, target_pid, parsed_signal)
1933 .map(|()| Value::Null)
1934 .map_err(kernel_error)
1935 } else if target_pid < 0 {
1936 let caller_kernel_pid = {
1937 let Some(vm) = self.vms.get(vm_id) else {
1938 log_stale_process_event(
1939 &self.bridge,
1940 vm_id,
1941 process_id,
1942 "javascript sync RPC process.kill",
1943 );
1944 return Ok(());
1945 };
1946 let Some(caller) = vm.active_processes.get(process_id) else {
1947 log_stale_process_event(
1948 &self.bridge,
1949 vm_id,
1950 process_id,
1951 "javascript sync RPC process.kill",
1952 );
1953 return Ok(());
1954 };
1955 caller.kernel_pid
1956 };
1957 let pgid = target_pid.unsigned_abs();
1958 match self.signal_vm_process_group(vm_id, caller_kernel_pid, pgid, signal) {
1959 Ok(true) => {
1960 Ok(self.apply_self_process_kill(vm_id, process_id, parsed_signal))
1961 }
1962 Ok(false) => Ok(Value::Null),
1963 Err(error) => Err(error),
1964 }
1965 } else {
1966 enum ProcessKillTarget {
1967 SelfProcess,
1968 Child(String),
1969 TopLevel(String),
1970 KernelPid(u32),
1971 }
1972 let target = {
1973 let Some(vm) = self.vms.get(vm_id) else {
1974 log_stale_process_event(
1975 &self.bridge,
1976 vm_id,
1977 process_id,
1978 "javascript sync RPC process.kill",
1979 );
1980 return Ok(());
1981 };
1982 let Some(caller) = vm.active_processes.get(process_id) else {
1983 log_stale_process_event(
1984 &self.bridge,
1985 vm_id,
1986 process_id,
1987 "javascript sync RPC process.kill",
1988 );
1989 return Ok(());
1990 };
1991 let caller_pid = i32::try_from(caller.kernel_pid).map_err(|_| {
1992 SidecarError::InvalidState("caller pid exceeds i32".into())
1993 })?;
1994 if caller_pid == target_pid {
1995 ProcessKillTarget::SelfProcess
1996 } else if let Some((child_process_id, _)) = caller
1997 .child_processes
1998 .iter()
1999 .find(|(_, child)| i32::try_from(child.kernel_pid) == Ok(target_pid))
2000 {
2001 ProcessKillTarget::Child(child_process_id.clone())
2002 } else if let Some((target_process_id, _)) =
2003 vm.active_processes.iter().find(|(_, process)| {
2004 i32::try_from(process.kernel_pid) == Ok(target_pid)
2005 })
2006 {
2007 ProcessKillTarget::TopLevel(target_process_id.clone())
2008 } else {
2009 let target_kernel_pid = u32::try_from(target_pid).map_err(|_| {
2010 SidecarError::InvalidState(format!(
2011 "EINVAL: invalid process pid {target_pid}"
2012 ))
2013 })?;
2014 ProcessKillTarget::KernelPid(target_kernel_pid)
2015 }
2016 };
2017 match target {
2018 ProcessKillTarget::SelfProcess => {
2019 Ok(self.apply_self_process_kill(vm_id, process_id, parsed_signal))
2020 }
2021 ProcessKillTarget::Child(child_process_id) => {
2022 self.kill_javascript_child_process(
2023 vm_id,
2024 process_id,
2025 &child_process_id,
2026 signal,
2027 )?;
2028 Ok(Value::Null)
2029 }
2030 ProcessKillTarget::TopLevel(target_process_id) => {
2031 self.kill_process_internal(vm_id, &target_process_id, signal)?;
2032 Ok(Value::Null)
2033 }
2034 ProcessKillTarget::KernelPid(target_kernel_pid) => {
2035 self.signal_vm_kernel_pid(vm_id, target_kernel_pid, signal)
2039 .map(|()| Value::Null)
2040 }
2041 }
2042 }
2043 }
2044 "process.signal_state" => {
2045 let signal =
2046 javascript_sync_rpc_arg_u32(&request.args, 0, "process.signal_state signal")?;
2047 let action =
2048 javascript_sync_rpc_arg_str(&request.args, 1, "process.signal_state action")?;
2049 let mask_json =
2050 javascript_sync_rpc_arg_str(&request.args, 2, "process.signal_state mask")?;
2051 let flags =
2052 javascript_sync_rpc_arg_u32(&request.args, 3, "process.signal_state flags")?;
2053 let mask: Vec<u32> = serde_json::from_str(mask_json).map_err(|error| {
2054 SidecarError::InvalidState(format!(
2055 "process.signal_state mask must be valid JSON: {error}"
2056 ))
2057 })?;
2058 let action = match action.trim().to_ascii_lowercase().as_str() {
2059 "default" => SignalDispositionAction::Default,
2060 "ignore" => SignalDispositionAction::Ignore,
2061 "user" => SignalDispositionAction::User,
2062 other => {
2063 return Err(SidecarError::InvalidState(format!(
2064 "unsupported process.signal_state action {other}"
2065 )));
2066 }
2067 };
2068 let Some(vm) = self.vms.get_mut(vm_id) else {
2069 log_stale_process_event(
2070 &self.bridge,
2071 vm_id,
2072 process_id,
2073 "javascript sync RPC process.signal_state",
2074 );
2075 return Ok(());
2076 };
2077 if action == SignalDispositionAction::Default && mask.is_empty() && flags == 0 {
2078 let remove_process_entry = vm
2079 .signal_states
2080 .get_mut(process_id)
2081 .map(|handlers| {
2082 handlers.remove(&signal);
2083 handlers.is_empty()
2084 })
2085 .unwrap_or(false);
2086 if remove_process_entry {
2087 vm.signal_states.remove(process_id);
2088 }
2089 } else {
2090 vm.signal_states
2091 .entry(process_id.to_owned())
2092 .or_default()
2093 .insert(
2094 signal,
2095 SignalHandlerRegistration {
2096 action,
2097 mask,
2098 flags,
2099 },
2100 );
2101 }
2102 Ok(Value::Null)
2103 }
2104 "net.http_request" => {
2105 let payload = request
2106 .args
2107 .first()
2108 .cloned()
2109 .ok_or_else(|| {
2110 SidecarError::InvalidState(String::from(
2111 "net.http_request requires a request payload",
2112 ))
2113 })
2114 .and_then(|value| {
2115 serde_json::from_value::<JavascriptHttpLoopbackRequest>(value).map_err(
2116 |error| {
2117 SidecarError::InvalidState(format!(
2118 "invalid net.http_request payload: {error}"
2119 ))
2120 },
2121 )
2122 })?;
2123 if !is_javascript_loopback_host(&payload.host) {
2124 return Err(SidecarError::Execution(format!(
2125 "EACCES: HTTP loopback request requires a loopback host, got {}",
2126 payload.host
2127 )));
2128 }
2129 self.bridge.require_network_access(
2130 vm_id,
2131 NetworkOperation::Http,
2132 format_tcp_resource(&payload.host, payload.port),
2133 )?;
2134 let Some(vm) = self.vms.get_mut(vm_id) else {
2135 log_stale_process_event(
2136 &self.bridge,
2137 vm_id,
2138 process_id,
2139 "javascript sync RPC net.http_request",
2140 );
2141 return Ok(());
2142 };
2143 let resource_limits = vm.kernel.resource_limits().clone();
2144 let socket_paths = build_javascript_socket_path_context(vm)?;
2145 let target_is_current =
2146 [JavascriptSocketFamily::Ipv4, JavascriptSocketFamily::Ipv6]
2147 .iter()
2148 .any(|family| {
2149 socket_paths
2150 .http_loopback_target(*family, payload.port)
2151 .is_some_and(|target| {
2152 target.process_id == payload.process_id
2153 && target.server_id == payload.server_id
2154 })
2155 });
2156 if !target_is_current {
2157 return Err(SidecarError::InvalidState(format!(
2158 "unknown HTTP loopback target {}:{} for server {} in process {}",
2159 payload.host, payload.port, payload.server_id, payload.process_id
2160 )));
2161 }
2162 let Some(target_process) = vm.active_processes.get_mut(&payload.process_id) else {
2163 return Err(SidecarError::InvalidState(format!(
2164 "unknown HTTP loopback process {}",
2165 payload.process_id
2166 )));
2167 };
2168 dispatch_loopback_http_request(LoopbackHttpDispatchRequest {
2169 bridge: &self.bridge,
2170 vm_id,
2171 dns: &vm.dns,
2172 socket_paths: &socket_paths,
2173 kernel: &mut vm.kernel,
2174 process: target_process,
2175 resource_limits: &resource_limits,
2176 server_id: payload.server_id,
2177 request_json: &payload.request,
2178 })
2179 .map(Value::String)
2180 }
2181 _ => {
2182 let Some(vm) = self.vms.get_mut(vm_id) else {
2183 log_stale_process_event(
2184 &self.bridge,
2185 vm_id,
2186 process_id,
2187 "javascript sync RPC bridge dispatch",
2188 );
2189 return Ok(());
2190 };
2191 let resource_limits = vm.kernel.resource_limits().clone();
2192 let network_counts = vm_network_resource_counts(vm);
2193 let socket_paths = build_javascript_socket_path_context(vm)?;
2194 let Some(process) = vm.active_processes.get_mut(process_id) else {
2195 log_stale_process_event(
2196 &self.bridge,
2197 vm_id,
2198 process_id,
2199 "javascript sync RPC bridge dispatch",
2200 );
2201 return Ok(());
2202 };
2203 service_javascript_sync_rpc(JavascriptSyncRpcServiceRequest {
2204 bridge: &self.bridge,
2205 vm_id,
2206 dns: &vm.dns,
2207 socket_paths: &socket_paths,
2208 kernel: &mut vm.kernel,
2209 process,
2210 sync_request: &request,
2211 resource_limits: &resource_limits,
2212 network_counts,
2213 })
2214 }
2215 };
2216
2217 let Some(vm) = self.vms.get_mut(vm_id) else {
2218 log_stale_process_event(
2219 &self.bridge,
2220 vm_id,
2221 process_id,
2222 "javascript sync RPC response delivery",
2223 );
2224 return Ok(());
2225 };
2226 let shadow_root = vm.cwd.clone();
2227 let Some(process) = vm.active_processes.get_mut(process_id) else {
2228 log_stale_process_event(
2229 &self.bridge,
2230 vm_id,
2231 process_id,
2232 "javascript sync RPC response delivery",
2233 );
2234 return Ok(());
2235 };
2236
2237 if response.is_ok()
2238 && matches!(
2239 request.method.as_str(),
2240 "fs.chmodSync" | "fs.promises.chmod"
2241 )
2242 {
2243 let guest_path =
2244 javascript_sync_rpc_arg_str(&request.args, 0, "filesystem chmod path")?;
2245 let mode =
2246 javascript_sync_rpc_arg_u32(&request.args, 1, "filesystem chmod mode")? & 0o7777;
2247 let host_path =
2248 shadow_host_path_for_process(&shadow_root, &process.guest_cwd, guest_path);
2249 if host_path.exists() {
2250 fs::set_permissions(&host_path, fs::Permissions::from_mode(mode)).map_err(
2251 |error| {
2252 SidecarError::Io(format!(
2253 "failed to mirror chmod to shadow path {}: {error}",
2254 host_path.display()
2255 ))
2256 },
2257 )?;
2258 }
2259 }
2260
2261 match response {
2262 Ok(result) => process
2263 .execution
2264 .respond_javascript_sync_rpc_success(request.id, result)
2265 .or_else(ignore_stale_javascript_sync_rpc_response),
2266 Err(error) => process
2267 .execution
2268 .respond_javascript_sync_rpc_error(
2269 request.id,
2270 javascript_sync_rpc_error_code(&error),
2271 error.to_string(),
2272 )
2273 .or_else(ignore_stale_javascript_sync_rpc_response),
2274 }
2275 }
2276
2277 fn apply_self_process_kill(
2280 &mut self,
2281 vm_id: &str,
2282 process_id: &str,
2283 parsed_signal: i32,
2284 ) -> Value {
2285 let action = self
2286 .vms
2287 .get(vm_id)
2288 .and_then(|vm| vm.signal_states.get(process_id))
2289 .and_then(|handlers| handlers.get(&(parsed_signal as u32)))
2290 .map(|registration| registration.action.clone())
2291 .unwrap_or(SignalDispositionAction::Default);
2292 if action == SignalDispositionAction::Default
2293 && parsed_signal != 0
2294 && !matches!(
2295 canonical_signal_name(parsed_signal),
2296 Some("SIGWINCH" | "SIGCHLD" | "SIGCONT" | "SIGURG")
2297 )
2298 {
2299 if let Some(vm) = self.vms.get_mut(vm_id) {
2300 if let Some(process) = vm.active_processes.get_mut(process_id) {
2301 process.pending_self_signal_exit = Some(parsed_signal);
2302 }
2303 }
2304 }
2305 json!({
2306 "self": true,
2307 "action": match action {
2308 SignalDispositionAction::Default => "default",
2309 SignalDispositionAction::Ignore => "ignore",
2310 SignalDispositionAction::User => "user",
2311 },
2312 })
2313 }
2314
2315 pub(crate) fn vm_ids_for_scope(
2316 &self,
2317 ownership: &OwnershipScope,
2318 ) -> Result<Vec<String>, SidecarError> {
2319 match ownership {
2320 OwnershipScope::SessionOwnership(inner) => {
2321 self.require_owned_session(&inner.connection_id, &inner.session_id)?;
2322 Ok(self
2323 .sessions
2324 .get(&inner.session_id)
2325 .expect("owned session should exist")
2326 .vm_ids
2327 .iter()
2328 .cloned()
2329 .collect())
2330 }
2331 OwnershipScope::VmOwnership(inner) => {
2332 self.require_owned_vm(&inner.connection_id, &inner.session_id, &inner.vm_id)?;
2333 Ok(vec![inner.vm_id.clone()])
2334 }
2335 OwnershipScope::ConnectionOwnership(..) => Err(SidecarError::InvalidState(
2336 String::from("event polling requires session or VM ownership scope"),
2337 )),
2338 }
2339 }
2340
2341 pub(crate) fn vm_ownership(&self, vm_id: &str) -> Result<OwnershipScope, SidecarError> {
2342 let vm = self
2343 .vms
2344 .get(vm_id)
2345 .ok_or_else(|| SidecarError::InvalidState(format!("unknown sidecar VM {vm_id}")))?;
2346 Ok(OwnershipScope::vm(&vm.connection_id, &vm.session_id, vm_id))
2347 }
2348
2349 pub(crate) fn vm_has_active_processes(&self, vm_id: &str) -> bool {
2350 self.vms
2351 .get(vm_id)
2352 .is_some_and(|vm| !vm.active_processes.is_empty())
2353 }
2354
2355 fn require_authenticated_connection(&self, connection_id: &str) -> Result<(), SidecarError> {
2356 if self.connections.contains_key(connection_id) {
2357 Ok(())
2358 } else {
2359 Err(SidecarError::InvalidState(format!(
2360 "connection {connection_id} has not authenticated"
2361 )))
2362 }
2363 }
2364
2365 pub(crate) fn require_owned_session(
2366 &self,
2367 connection_id: &str,
2368 session_id: &str,
2369 ) -> Result<(), SidecarError> {
2370 self.require_authenticated_connection(connection_id)?;
2371 let session = self.sessions.get(session_id).ok_or_else(|| {
2372 SidecarError::InvalidState(format!("unknown sidecar session {session_id}"))
2373 })?;
2374 if session.connection_id == connection_id {
2375 Ok(())
2376 } else {
2377 Err(SidecarError::InvalidState(format!(
2378 "session {session_id} is not owned by connection {connection_id}"
2379 )))
2380 }
2381 }
2382
2383 pub(crate) fn require_owned_vm(
2384 &self,
2385 connection_id: &str,
2386 session_id: &str,
2387 vm_id: &str,
2388 ) -> Result<(), SidecarError> {
2389 self.require_owned_session(connection_id, session_id)?;
2390 let vm = self
2391 .vms
2392 .get(vm_id)
2393 .ok_or_else(|| SidecarError::InvalidState(format!("unknown sidecar VM {vm_id}")))?;
2394 if vm.connection_id != connection_id || vm.session_id != session_id {
2395 return Err(SidecarError::InvalidState(format!(
2396 "VM {vm_id} is not owned by {connection_id}/{session_id}"
2397 )));
2398 }
2399 Ok(())
2400 }
2401
2402 fn connection_id_for(&self, ownership: &OwnershipScope) -> Result<String, SidecarError> {
2403 match ownership {
2404 OwnershipScope::ConnectionOwnership(inner) => Ok(inner.connection_id.clone()),
2405 OwnershipScope::SessionOwnership(..) | OwnershipScope::VmOwnership(..) => {
2406 Err(SidecarError::InvalidState(String::from(
2407 "request requires connection ownership scope",
2408 )))
2409 }
2410 }
2411 }
2412
2413 fn validate_auth_token(&self, auth_token: &str) -> Result<(), SidecarError> {
2414 let Some(expected_auth_token) = self.config.expected_auth_token.as_deref() else {
2415 return Ok(());
2416 };
2417
2418 if auth_token == expected_auth_token {
2419 Ok(())
2420 } else {
2421 Err(SidecarError::Unauthorized(String::from(
2422 "authenticate request provided an invalid auth token",
2423 )))
2424 }
2425 }
2426
2427 fn allocate_connection_id(&mut self) -> String {
2428 self.next_connection_id += 1;
2429 format!("conn-{}", self.next_connection_id)
2430 }
2431
2432 fn take_matching_process_event_envelope(
2433 &mut self,
2434 vm_id: &str,
2435 process_id: &str,
2436 ) -> Result<Option<ProcessEventEnvelope>, SidecarError> {
2437 if let Some(index) = self
2438 .pending_process_events
2439 .iter()
2440 .position(|event| event.vm_id == vm_id && event.process_id == process_id)
2441 {
2442 return Ok(self.pending_process_events.remove(index));
2443 }
2444
2445 let mut matching_envelope = None;
2446 let mut deferred = Vec::new();
2447 {
2448 let pending_capacity = self.pending_process_event_capacity();
2449 let receiver = self.process_event_receiver.as_mut().ok_or_else(|| {
2450 SidecarError::InvalidState(String::from("process event receiver unavailable"))
2451 })?;
2452 loop {
2453 if deferred.len() >= pending_capacity {
2454 if receiver.is_empty() {
2455 break;
2456 }
2457 return Err(process_event_queue_overflow_error());
2458 }
2459 let envelope = match receiver.try_recv() {
2460 Ok(envelope) => envelope,
2461 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
2462 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break,
2463 };
2464 if matching_envelope.is_none()
2465 && envelope.vm_id == vm_id
2466 && envelope.process_id == process_id
2467 {
2468 matching_envelope = Some(envelope);
2469 break;
2470 }
2471 deferred.push(envelope);
2472 }
2473 }
2474 for envelope in deferred {
2475 self.queue_pending_process_event(envelope)?;
2476 }
2477
2478 Ok(matching_envelope)
2479 }
2480
2481 fn allocate_sidecar_request_id(&mut self) -> RequestId {
2482 let request_id = self.next_sidecar_request_id;
2483 self.next_sidecar_request_id -= 1;
2484 request_id
2485 }
2486
2487 pub(crate) fn session_scope_for(
2488 &self,
2489 ownership: &OwnershipScope,
2490 ) -> Result<(String, String), SidecarError> {
2491 match ownership {
2492 OwnershipScope::SessionOwnership(inner) => {
2493 Ok((inner.connection_id.clone(), inner.session_id.clone()))
2494 }
2495 OwnershipScope::ConnectionOwnership(..) | OwnershipScope::VmOwnership(..) => {
2496 Err(SidecarError::InvalidState(String::from(
2497 "request requires session ownership scope",
2498 )))
2499 }
2500 }
2501 }
2502
2503 pub(crate) fn vm_scope_for(
2504 &self,
2505 ownership: &OwnershipScope,
2506 ) -> Result<(String, String, String), SidecarError> {
2507 match ownership {
2508 OwnershipScope::VmOwnership(inner) => Ok((
2509 inner.connection_id.clone(),
2510 inner.session_id.clone(),
2511 inner.vm_id.clone(),
2512 )),
2513 OwnershipScope::ConnectionOwnership(..) | OwnershipScope::SessionOwnership(..) => Err(
2514 SidecarError::InvalidState(String::from("request requires VM ownership scope")),
2515 ),
2516 }
2517 }
2518
2519 fn response_with_ownership(
2520 &self,
2521 request_id: RequestId,
2522 ownership: OwnershipScope,
2523 payload: ResponsePayload,
2524 ) -> ResponseFrame {
2525 ResponseFrame {
2526 schema: ProtocolSchema::current(),
2527 request_id,
2528 ownership,
2529 payload,
2530 }
2531 }
2532
2533 pub(crate) fn respond(
2534 &self,
2535 request: &RequestFrame,
2536 payload: ResponsePayload,
2537 ) -> ResponseFrame {
2538 self.response_with_ownership(request.request_id, request.ownership.clone(), payload)
2539 }
2540
2541 fn reject(&self, request: &RequestFrame, code: &str, message: &str) -> ResponseFrame {
2542 self.respond(
2543 request,
2544 ResponsePayload::Rejected(RejectedResponse {
2545 code: code.to_owned(),
2546 message: message.to_owned(),
2547 }),
2548 )
2549 }
2550
2551 pub fn queue_sidecar_request(
2552 &mut self,
2553 ownership: OwnershipScope,
2554 payload: SidecarRequestPayload,
2555 ) -> Result<RequestId, SidecarError> {
2556 if self.outbound_sidecar_requests.len() >= MAX_OUTBOUND_SIDECAR_REQUESTS {
2557 return Err(outbound_sidecar_request_queue_overflow_error());
2558 }
2559 if self.pending_sidecar_responses.pending_count() >= MAX_PENDING_SIDECAR_RESPONSES {
2560 return Err(sidecar_response_pending_overflow_error());
2561 }
2562 let request_id = self.allocate_sidecar_request_id();
2563 let request = SidecarRequestFrame::new(request_id, ownership, payload);
2564 self.pending_sidecar_responses
2565 .register_request(&request)
2566 .map_err(sidecar_response_tracker_error)?;
2567 self.outbound_sidecar_requests.push_back(request);
2568 self.outbound_sidecar_requests_gauge
2569 .observe_depth(self.outbound_sidecar_requests.len());
2570 self.pending_sidecar_responses_gauge
2571 .observe_depth(self.pending_sidecar_responses.pending_count());
2572 Ok(request_id)
2573 }
2574
2575 pub fn queue_wire_sidecar_request(
2576 &mut self,
2577 ownership: crate::wire::OwnershipScope,
2578 payload: crate::wire::SidecarRequestPayload,
2579 ) -> Result<crate::wire::RequestId, SidecarError> {
2580 let ownership = crate::wire::ownership_scope_to_compat(ownership);
2581 let payload = crate::wire::sidecar_request_payload_to_compat(&ownership, payload)
2582 .map_err(wire_protocol_error)?;
2583 self.queue_sidecar_request(ownership, payload)
2584 }
2585
2586 pub fn pop_sidecar_request(&mut self) -> Option<SidecarRequestFrame> {
2587 let request = self.outbound_sidecar_requests.pop_front();
2588 self.outbound_sidecar_requests_gauge
2589 .observe_depth(self.outbound_sidecar_requests.len());
2590 request
2591 }
2592
2593 pub fn pop_wire_sidecar_request(
2594 &mut self,
2595 ) -> Result<Option<crate::wire::SidecarRequestFrame>, SidecarError> {
2596 self.pop_sidecar_request()
2597 .map(crate::wire::sidecar_request_frame_from_compat)
2598 .transpose()
2599 .map_err(wire_protocol_error)
2600 }
2601
2602 pub fn accept_sidecar_response(
2603 &mut self,
2604 response: SidecarResponseFrame,
2605 ) -> Result<(), SidecarError> {
2606 self.pending_sidecar_responses
2607 .accept_response(&response)
2608 .map_err(sidecar_response_tracker_error)?;
2609 self.pending_sidecar_responses_gauge
2610 .observe_depth(self.pending_sidecar_responses.pending_count());
2611 self.completed_sidecar_response_order
2612 .push_back(response.request_id);
2613 self.completed_sidecar_responses
2614 .insert(response.request_id, response);
2615 self.completed_sidecar_responses_gauge
2616 .observe_depth(self.completed_sidecar_responses.len());
2617 while self.completed_sidecar_responses.len() > MAX_COMPLETED_SIDECAR_RESPONSES {
2618 match self.completed_sidecar_response_order.pop_front() {
2619 Some(evicted) => {
2623 if self.completed_sidecar_responses.remove(&evicted).is_some() {
2624 tracing::warn!(
2625 queue = "completed_sidecar_responses",
2626 evicted_request_id = evicted,
2627 capacity = MAX_COMPLETED_SIDECAR_RESPONSES,
2628 "dropping an unretrieved completed sidecar response to stay within cap; the host can no longer fetch it (response lost)"
2629 );
2630 self.completed_sidecar_responses_gauge
2631 .observe_depth(self.completed_sidecar_responses.len());
2632 }
2633 }
2634 None => break,
2635 }
2636 }
2637 Ok(())
2638 }
2639
2640 pub fn accept_wire_sidecar_response(
2641 &mut self,
2642 response: crate::wire::SidecarResponseFrame,
2643 ) -> Result<(), SidecarError> {
2644 let response =
2645 crate::wire::sidecar_response_frame_to_compat(response).map_err(wire_protocol_error)?;
2646 self.accept_sidecar_response(response)
2647 }
2648
2649 pub fn take_sidecar_response(&mut self, request_id: RequestId) -> Option<SidecarResponseFrame> {
2650 let response = self.completed_sidecar_responses.remove(&request_id);
2651 if response.is_some() {
2652 self.completed_sidecar_response_order
2653 .retain(|completed_id| completed_id != &request_id);
2654 self.completed_sidecar_responses_gauge
2655 .observe_depth(self.completed_sidecar_responses.len());
2656 }
2657 response
2658 }
2659
2660 pub fn take_wire_sidecar_response(
2661 &mut self,
2662 request_id: crate::wire::RequestId,
2663 ) -> Result<Option<crate::wire::SidecarResponseFrame>, SidecarError> {
2664 self.take_sidecar_response(request_id)
2665 .map(|response| {
2666 crate::wire::sidecar_response_frame_from_compat(response)
2667 .map_err(wire_protocol_error)
2668 })
2669 .transpose()
2670 }
2671
2672 pub(crate) fn vm_lifecycle_event(
2673 &self,
2674 connection_id: &str,
2675 session_id: &str,
2676 vm_id: &str,
2677 state: VmLifecycleState,
2678 ) -> EventFrame {
2679 EventFrame::new(
2680 OwnershipScope::vm(connection_id, session_id, vm_id),
2681 EventPayload::VmLifecycle(VmLifecycleEvent { state }),
2682 )
2683 }
2684
2685 fn ensure_request_within_frame_limit(
2686 &self,
2687 request: &RequestFrame,
2688 ) -> Result<(), SidecarError> {
2689 let frame = crate::protocol::to_generated_protocol_frame(
2690 &crate::protocol::ProtocolFrame::Request(request.clone()),
2691 )
2692 .map_err(|error| {
2693 SidecarError::InvalidState(format!("failed to convert request frame: {error}"))
2694 })?;
2695 let crate::wire::ProtocolFrame::RequestFrame(_) = &frame else {
2696 return Err(SidecarError::InvalidState(String::from(
2697 "request converted to non-request wire frame",
2698 )));
2699 };
2700
2701 crate::wire::WireFrameCodec::new(self.config.max_frame_bytes)
2702 .encode(&frame)
2703 .map(|_| ())
2704 .map_err(|error| SidecarError::FrameTooLarge(error.to_string()))
2705 }
2706}
2707
2708impl<B> ExtensionHost for NativeSidecar<B>
2709where
2710 B: NativeSidecarBridge + Send + 'static,
2711 BridgeError<B>: fmt::Debug + Send + Sync + 'static,
2712{
2713 fn spawn_process<'a>(
2714 &'a mut self,
2715 ownership: OwnershipScope,
2716 payload: ExecuteRequest,
2717 ) -> ExtensionFuture<'a, ProcessStartedResponse> {
2718 Box::pin(async move {
2719 let request = RequestFrame::new(0, ownership, RequestPayload::Execute(payload.clone()));
2720 let dispatch = NativeSidecar::execute(self, &request, payload).await?;
2721 match dispatch.response.payload {
2722 ResponsePayload::ProcessStarted(response) => Ok(response),
2723 other => Err(unexpected_extension_host_response("execute", other)),
2724 }
2725 })
2726 }
2727
2728 fn write_stdin<'a>(
2729 &'a mut self,
2730 ownership: OwnershipScope,
2731 payload: WriteStdinRequest,
2732 ) -> ExtensionFuture<'a, StdinWrittenResponse> {
2733 Box::pin(async move {
2734 let request =
2735 RequestFrame::new(0, ownership, RequestPayload::WriteStdin(payload.clone()));
2736 let dispatch = NativeSidecar::write_stdin(self, &request, payload).await?;
2737 match dispatch.response.payload {
2738 ResponsePayload::StdinWritten(response) => Ok(response),
2739 other => Err(unexpected_extension_host_response("write_stdin", other)),
2740 }
2741 })
2742 }
2743
2744 fn close_stdin<'a>(
2745 &'a mut self,
2746 ownership: OwnershipScope,
2747 payload: CloseStdinRequest,
2748 ) -> ExtensionFuture<'a, StdinClosedResponse> {
2749 Box::pin(async move {
2750 let request =
2751 RequestFrame::new(0, ownership, RequestPayload::CloseStdin(payload.clone()));
2752 let dispatch = NativeSidecar::close_stdin(self, &request, payload).await?;
2753 match dispatch.response.payload {
2754 ResponsePayload::StdinClosed(response) => Ok(response),
2755 other => Err(unexpected_extension_host_response("close_stdin", other)),
2756 }
2757 })
2758 }
2759
2760 fn kill_process<'a>(
2761 &'a mut self,
2762 ownership: OwnershipScope,
2763 payload: KillProcessRequest,
2764 ) -> ExtensionFuture<'a, ProcessKilledResponse> {
2765 Box::pin(async move {
2766 let request =
2767 RequestFrame::new(0, ownership, RequestPayload::KillProcess(payload.clone()));
2768 let dispatch = NativeSidecar::kill_process(self, &request, payload).await?;
2769 match dispatch.response.payload {
2770 ResponsePayload::ProcessKilled(response) => Ok(response),
2771 other => Err(unexpected_extension_host_response("kill_process", other)),
2772 }
2773 })
2774 }
2775
2776 fn poll_event<'a>(
2777 &'a mut self,
2778 ownership: OwnershipScope,
2779 timeout: Duration,
2780 ) -> ExtensionFuture<'a, Option<EventFrame>> {
2781 Box::pin(async move { NativeSidecar::poll_event(self, &ownership, timeout).await })
2782 }
2783
2784 fn guest_filesystem_call<'a>(
2785 &'a mut self,
2786 ownership: OwnershipScope,
2787 payload: GuestFilesystemCallRequest,
2788 ) -> ExtensionFuture<'a, GuestFilesystemResultResponse> {
2789 Box::pin(async move {
2790 let request = RequestFrame::new(
2791 0,
2792 ownership,
2793 RequestPayload::GuestFilesystemCall(payload.clone()),
2794 );
2795 let dispatch = NativeSidecar::guest_filesystem_call(self, &request, payload).await?;
2796 match dispatch.response.payload {
2797 ResponsePayload::GuestFilesystemResult(response) => Ok(response),
2798 other => Err(unexpected_extension_host_response(
2799 "guest_filesystem_call",
2800 other,
2801 )),
2802 }
2803 })
2804 }
2805
2806 fn bind_process_to_session<'a>(
2807 &'a mut self,
2808 ownership: OwnershipScope,
2809 namespace: String,
2810 ext_session_id: String,
2811 process_id: String,
2812 ) -> ExtensionFuture<'a, ()> {
2813 Box::pin(async move {
2814 self.bind_extension_process_resource(ownership, namespace, ext_session_id, process_id)
2815 })
2816 }
2817
2818 fn bind_vm_to_session<'a>(
2819 &'a mut self,
2820 ownership: OwnershipScope,
2821 namespace: String,
2822 ext_session_id: String,
2823 ) -> ExtensionFuture<'a, ()> {
2824 Box::pin(
2825 async move { self.bind_extension_vm_resource(ownership, namespace, ext_session_id) },
2826 )
2827 }
2828
2829 fn dispose_session_resources<'a>(
2830 &'a mut self,
2831 ownership: OwnershipScope,
2832 namespace: String,
2833 ext_session_id: String,
2834 ) -> ExtensionFuture<'a, Vec<EventFrame>> {
2835 Box::pin(async move {
2836 let key = (namespace, ext_session_id);
2837 let Some(resources) = self.extension_sessions.get(&key) else {
2838 return Ok(Vec::new());
2839 };
2840 if resources.ownership != ownership {
2841 return Err(SidecarError::InvalidState(String::from(
2842 "extension session ownership did not match dispose request",
2843 )));
2844 }
2845 let resources = self
2846 .extension_sessions
2847 .remove(&key)
2848 .expect("extension resources existed before removal");
2849 let (connection_id, session_id, vm_id) = self.vm_scope_for(&ownership)?;
2850 for process_id in resources.process_ids {
2851 if self
2852 .vms
2853 .get(&vm_id)
2854 .is_some_and(|vm| vm.active_processes.contains_key(&process_id))
2855 {
2856 self.kill_process_internal(&vm_id, &process_id, "SIGTERM")?;
2857 }
2858 }
2859 let mut events = Vec::new();
2860 for resource_vm_id in resources.vm_ids {
2861 if self.vms.contains_key(&resource_vm_id) {
2862 events.extend(
2863 self.dispose_vm_internal(
2864 &connection_id,
2865 &session_id,
2866 &resource_vm_id,
2867 DisposeReason::Requested,
2868 )
2869 .await?,
2870 );
2871 }
2872 }
2873 Ok(events)
2874 })
2875 }
2876
2877 fn start_buffering_process_output<'a>(
2878 &'a mut self,
2879 ownership: OwnershipScope,
2880 process_id: String,
2881 ) -> ExtensionFuture<'a, ()> {
2882 Box::pin(async move {
2883 let (connection_id, session_id, vm_id) = self.vm_scope_for(&ownership)?;
2884 self.require_owned_vm(&connection_id, &session_id, &vm_id)?;
2885 let key = (vm_id, process_id);
2886 if self.extension_process_output_buffers.contains_key(&key) {
2887 return Err(SidecarError::Conflict(String::from(
2888 "extension process output buffering already started",
2889 )));
2890 }
2891 self.extension_process_output_buffers
2892 .insert(key, ExtensionBufferedProcessOutput::default());
2893 Ok(())
2894 })
2895 }
2896
2897 fn handoff_buffered_process_output<'a>(
2898 &'a mut self,
2899 ownership: OwnershipScope,
2900 namespace: String,
2901 ext_session_id: String,
2902 process_id: String,
2903 timeout: Duration,
2904 ) -> ExtensionFuture<'a, ExtensionBufferedProcessOutput> {
2905 Box::pin(async move {
2906 let (connection_id, session_id, vm_id) = self.vm_scope_for(&ownership)?;
2907 self.require_owned_vm(&connection_id, &session_id, &vm_id)?;
2908 let key = (vm_id.clone(), process_id.clone());
2909 let deadline = Instant::now() + timeout;
2910 loop {
2911 self.pump_process_events(&ownership).await?;
2912 while let Some(envelope) =
2913 self.take_matching_process_event_envelope(&vm_id, &process_id)?
2914 {
2915 if self.capture_extension_process_output_event(
2916 &vm_id,
2917 &process_id,
2918 &envelope.event,
2919 ) {
2920 continue;
2921 }
2922 self.queue_pending_process_event(envelope)?;
2923 break;
2924 }
2925 let buffered = self
2926 .extension_process_output_buffers
2927 .get(&key)
2928 .is_some_and(|buffer| !buffer.stdout.is_empty() || !buffer.stderr.is_empty());
2929 if buffered || timeout.is_zero() || Instant::now() >= deadline {
2930 break;
2931 }
2932 let remaining = deadline.saturating_duration_since(Instant::now());
2933 time::sleep(remaining.min(Duration::from_millis(10))).await;
2934 }
2935 self.bind_extension_process_resource(
2936 ownership,
2937 namespace,
2938 ext_session_id,
2939 process_id.clone(),
2940 )?;
2941 self.extension_process_output_buffers
2942 .remove(&key)
2943 .ok_or_else(|| {
2944 SidecarError::InvalidState(String::from(
2945 "extension process output buffering was not started",
2946 ))
2947 })
2948 })
2949 }
2950}
2951
2952fn unexpected_extension_host_response(operation: &str, payload: ResponsePayload) -> SidecarError {
2953 match payload {
2954 ResponsePayload::Rejected(response) => SidecarError::InvalidState(format!(
2955 "extension {operation} rejected with {}: {}",
2956 response.code, response.message
2957 )),
2958 other => SidecarError::InvalidState(format!(
2959 "extension {operation} returned unexpected response: {other:?}"
2960 )),
2961 }
2962}
2963
2964fn shadow_host_path_for_process(
2965 shadow_root: &Path,
2966 process_guest_cwd: &str,
2967 guest_path: &str,
2968) -> PathBuf {
2969 let normalized_guest_path = if guest_path.starts_with('/') {
2970 normalize_path(guest_path)
2971 } else {
2972 normalize_path(&format!(
2973 "{}/{}",
2974 process_guest_cwd.trim_end_matches('/'),
2975 guest_path
2976 ))
2977 };
2978 if normalized_guest_path == "/" {
2979 shadow_root.to_path_buf()
2980 } else {
2981 shadow_root.join(normalized_guest_path.trim_start_matches('/'))
2982 }
2983}
2984
2985fn sidecar_response_tracker_error(error: SidecarResponseTrackerError) -> SidecarError {
2986 SidecarError::InvalidState(format!(
2987 "invalid sidecar response correlation state: {error}"
2988 ))
2989}
2990
2991fn map_bridge_permission(decision: secure_exec_bridge::PermissionDecision) -> PermissionDecision {
2992 match decision.verdict {
2993 secure_exec_bridge::PermissionVerdict::Allow => PermissionDecision::allow(),
2994 secure_exec_bridge::PermissionVerdict::Deny => PermissionDecision::deny(
2995 decision
2996 .reason
2997 .unwrap_or_else(|| String::from("denied by host")),
2998 ),
2999 secure_exec_bridge::PermissionVerdict::Prompt => PermissionDecision::deny(
3000 decision
3001 .reason
3002 .unwrap_or_else(|| String::from("permission prompt required")),
3003 ),
3004 }
3005}
3006
/// Returns the current wall-clock time as decimal milliseconds since the
/// Unix epoch.
///
/// If the system clock reads earlier than the epoch (for example on a host
/// with a misconfigured clock), the distance to the epoch is reported as a
/// negative millisecond count instead of panicking, so that building audit
/// fields can never bring the sidecar down.
fn audit_timestamp() -> String {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis().to_string(),
        // `SystemTimeError::duration` is how far the clock is *before* the epoch.
        Err(before_epoch) => format!("-{}", before_epoch.duration().as_millis()),
    }
}
3014
3015pub(crate) fn audit_fields<I, K, V>(fields: I) -> BTreeMap<String, String>
3016where
3017 I: IntoIterator<Item = (K, V)>,
3018 K: Into<String>,
3019 V: Into<String>,
3020{
3021 let mut mapped = BTreeMap::from([(String::from("timestamp"), audit_timestamp())]);
3022 for (key, value) in fields {
3023 mapped.insert(key.into(), value.into());
3024 }
3025 mapped
3026}
3027
3028pub(crate) fn emit_structured_event<B>(
3029 bridge: &SharedBridge<B>,
3030 vm_id: &str,
3031 name: &str,
3032 fields: BTreeMap<String, String>,
3033) -> Result<(), SidecarError>
3034where
3035 B: NativeSidecarBridge + Send + 'static,
3036 BridgeError<B>: fmt::Debug + Send + Sync + 'static,
3037{
3038 bridge.with_mut(|bridge| {
3039 bridge.emit_structured_event(StructuredEventRecord {
3040 vm_id: vm_id.to_owned(),
3041 name: name.to_owned(),
3042 fields,
3043 })
3044 })
3045}
3046
3047pub(crate) fn emit_security_audit_event<B>(
3048 bridge: &SharedBridge<B>,
3049 vm_id: &str,
3050 name: &str,
3051 fields: BTreeMap<String, String>,
3052) where
3053 B: NativeSidecarBridge + Send + 'static,
3054 BridgeError<B>: fmt::Debug + Send + Sync + 'static,
3055{
3056 let _ = emit_structured_event(bridge, vm_id, name, fields);
3057}
3058
3059pub(crate) fn structured_event_frame(
3066 connection_id: &str,
3067 name: &str,
3068 detail: std::collections::HashMap<String, String>,
3069) -> Result<crate::wire::EventFrame, SidecarError> {
3070 let event = EventFrame::new(
3071 OwnershipScope::connection(connection_id),
3072 EventPayload::Structured(crate::protocol::StructuredEvent {
3073 name: name.to_owned(),
3074 detail,
3075 }),
3076 );
3077 crate::wire::event_frame_from_compat(event).map_err(|error| {
3078 SidecarError::InvalidState(format!("invalid structured event frame: {error}"))
3079 })
3080}
3081
3082pub(crate) fn log_stale_process_event<B>(
3083 bridge: &SharedBridge<B>,
3084 vm_id: &str,
3085 process_id: &str,
3086 context: &str,
3087) where
3088 B: NativeSidecarBridge + Send + 'static,
3089 BridgeError<B>: fmt::Debug + Send + Sync + 'static,
3090{
3091 let _ = bridge.emit_log(
3092 vm_id,
3093 format!(
3094 "Ignoring stale process event during {context}: VM {vm_id} process {process_id} was already reaped"
3095 ),
3096 );
3097}
3098
3099pub(crate) fn root_filesystem_error(error: impl std::fmt::Display) -> SidecarError {
3102 SidecarError::InvalidState(format!("root filesystem: {error}"))
3103}
3104
/// Lexically normalizes `path` into an absolute, `/`-separated string.
///
/// No filesystem access is performed. Components are processed left to right:
///
/// * a root component resets the accumulated segments,
/// * `.` is ignored,
/// * `..` removes the previous segment (and is a no-op when there is none,
///   so the result never climbs above `/`),
/// * normal components and any path prefix are appended, converted lossily
///   to UTF-8.
///
/// The result always begins with `/`; relative inputs are therefore treated
/// as relative to the root, and an empty input yields `/`.
pub(crate) fn normalize_path(path: &str) -> String {
    let mut segments: Vec<String> = Vec::new();
    for component in Path::new(path).components() {
        match component {
            Component::RootDir => segments.clear(),
            Component::ParentDir => {
                segments.pop();
            }
            Component::CurDir => {}
            Component::Normal(value) => segments.push(value.to_string_lossy().into_owned()),
            Component::Prefix(prefix) => {
                segments.push(prefix.as_os_str().to_string_lossy().into_owned());
            }
        }
    }

    // The leading slash guarantees a non-empty result, so no empty-string
    // fallback is needed: zero segments already produce "/".
    format!("/{}", segments.join("/"))
}
3128
/// Lexically normalizes a host path without touching the filesystem.
///
/// `.` components are dropped. A `..` component removes the last accumulated
/// component, except when the accumulated path is exactly `/`, which is left
/// untouched. Prefix, root and normal components are appended as-is.
///
/// When nothing remains after normalization the result is `/` for an absolute
/// input and `.` otherwise.
pub(crate) fn normalize_host_path(path: &Path) -> PathBuf {
    let root = Path::new("/");
    let mut resolved = PathBuf::new();

    for part in path.components() {
        match part {
            Component::CurDir => continue,
            // Never pop the bare root.
            Component::ParentDir if resolved == root => continue,
            Component::ParentDir => {
                resolved.pop();
            }
            Component::RootDir => resolved.push(root),
            Component::Prefix(prefix) => resolved.push(prefix.as_os_str()),
            Component::Normal(name) => resolved.push(name),
        }
    }

    if !resolved.as_os_str().is_empty() {
        return resolved;
    }

    // Everything cancelled out: fall back to the root or the current directory.
    PathBuf::from(if path.is_absolute() { "/" } else { "." })
}
3156
/// Reports whether `path` is `root` itself or lies underneath it.
///
/// The comparison is component-wise (`Path::starts_with`), so `/rootfs` is
/// not considered to be within `/root`. A path always starts with itself,
/// which covers the equality case. Neither argument is normalized here.
pub(crate) fn path_is_within_root(path: &Path, root: &Path) -> bool {
    path.starts_with(root)
}
3160
3161pub(crate) fn dirname(path: &str) -> String {
3162 let normalized = normalize_path(path);
3163 let parent = Path::new(&normalized)
3164 .parent()
3165 .unwrap_or_else(|| Path::new("/"));
3166 let value = parent.to_string_lossy();
3167 if value.is_empty() {
3168 String::from("/")
3169 } else {
3170 value.into_owned()
3171 }
3172}
3173
3174pub(crate) fn kernel_error(error: KernelError) -> SidecarError {
3175 SidecarError::Kernel(error.to_string())
3176}
3177
3178pub(crate) fn plugin_error(error: PluginError) -> SidecarError {
3179 SidecarError::Plugin(error.to_string())
3180}
3181
3182pub(crate) fn javascript_error(error: JavascriptExecutionError) -> SidecarError {
3183 SidecarError::Execution(error.to_string())
3184}
3185
3186pub(crate) fn wasm_error(error: WasmExecutionError) -> SidecarError {
3187 SidecarError::Execution(error.to_string())
3188}
3189
3190pub(crate) fn python_error(error: PythonExecutionError) -> SidecarError {
3191 SidecarError::Execution(error.to_string())
3192}
3193
3194pub(crate) fn vfs_error(error: VfsError) -> SidecarError {
3195 SidecarError::Kernel(error.to_string())
3196}
3197
/// Guidance shown when a mounted `node_modules` uses a non-flat layout.
#[allow(dead_code)]
const HOISTED_NODE_MODULES_GUIDANCE: &str = "secure-exec can't load mounted node_modules: the directory uses a non-flat layout (pnpm / bun / yarn workspaces store, or yarn Plug'n'Play) whose package store isn't visible inside the VM. A flat (hoisted) node_modules is required.\n - pnpm -> add `node-linker=hoisted` to .npmrc, then reinstall\n - yarn berry -> set `nodeLinker: node-modules` in .yarnrc.yml (not pnp/pnpm)\n - bun -> install dependencies outside a workspace (workspaces use a .bun store)\n - npm / yarn classic -> already flat, no change needed";

/// Inspects process `stderr` and returns the hoisted-`node_modules` guidance
/// when the failure looks like a non-flat package layout.
///
/// The guidance is returned when either:
///
/// * `stderr` contains the Plug'n'Play "isn't declared in your dependencies"
///   message, or
/// * `stderr` reports a missing file/module (`ENOENT`, "no such file or
///   directory", "Cannot find module", `MODULE_NOT_FOUND`) **and** mentions a
///   package-store path or a Plug'n'Play artifact.
///
/// Any other output yields `None`.
#[allow(dead_code)]
fn symlinked_node_modules_hint(stderr: &str) -> Option<&'static str> {
    const PNP_STRICT_MARKERS: &[&str] = &["isn't declared in your dependencies"];
    const MISSING_MARKERS: &[&str] = &[
        "ENOENT",
        "no such file or directory",
        "Cannot find module",
        "MODULE_NOT_FOUND",
    ];
    const STORE_MARKERS: &[&str] = &[
        "node_modules/.pnpm/",
        "node_modules/.bun/",
        "node_modules/.store/",
        "/__virtual__/",
    ];
    const PNP_PATH_MARKERS: &[&str] = &[".pnp.cjs", ".pnp.loader.mjs", "/.yarn/cache/"];

    let mentions_any =
        |markers: &[&str]| markers.iter().any(|marker| stderr.contains(*marker));

    // The strict PnP message is conclusive on its own.
    if mentions_any(PNP_STRICT_MARKERS) {
        return Some(HOISTED_NODE_MODULES_GUIDANCE);
    }

    // Otherwise require both a "missing" signal and a store/PnP path so that
    // ordinary ENOENTs and successful store loads are not misreported.
    let store_miss = mentions_any(MISSING_MARKERS)
        && (mentions_any(STORE_MARKERS) || mentions_any(PNP_PATH_MARKERS));
    if store_miss {
        Some(HOISTED_NODE_MODULES_GUIDANCE)
    } else {
        None
    }
}
3255
#[cfg(test)]
mod symlinked_node_modules_hint_tests {
    use super::symlinked_node_modules_hint;

    /// True when `stderr` makes `symlinked_node_modules_hint` return guidance.
    fn hinted(stderr: &str) -> bool {
        symlinked_node_modules_hint(stderr).is_some()
    }

    // --- Outputs that must produce the guidance ---

    #[test]
    fn matches_pnpm_store_enoent() {
        let stderr = "Error: ENOENT: no such file or directory, open '/root/node_modules/.pnpm/@mariozechner+pi-coding-agent@0.60.0_x/node_modules/@mariozechner/pi-coding-agent/dist/package.json'";
        let hint = symlinked_node_modules_hint(stderr).expect("expected hoisted guidance");
        assert!(hint.contains("secure-exec can't load mounted node_modules"));
        assert!(!hint.contains("agentos"));
    }

    #[test]
    fn matches_bun_store_enoent() {
        assert!(hinted(
            "Error: ENOENT: no such file or directory, open '/root/node_modules/.bun/is-odd@3.0.1/node_modules/is-odd/package.json'"
        ));
    }

    #[test]
    fn matches_yarn_pnpm_store_enoent() {
        assert!(hinted(
            "Error: ENOENT: no such file or directory, open '/root/node_modules/.store/is-odd-npm-3.0.1-93c3c3f41b/package/package.json'"
        ));
    }

    #[test]
    fn matches_pnp_declared_error() {
        assert!(hinted(
            "Error: Your application tried to access is-number, but it isn't declared in your dependencies; this makes the require call ambiguous and unsound."
        ));
    }

    #[test]
    fn matches_pnp_cjs_module_not_found() {
        assert!(hinted(
            "Error: Cannot find module 'is-odd'\n at /root/.pnp.cjs:12345:18\n code: 'MODULE_NOT_FOUND'"
        ));
    }

    #[test]
    fn matches_virtual_instance() {
        assert!(hinted(
            "Error: ENOENT: no such file or directory, open '/root/.yarn/__virtual__/is-odd-abc/1/node_modules/is-odd/package.json'"
        ));
    }

    // --- Outputs that must NOT produce the guidance ---

    #[test]
    fn ignores_enoent_outside_a_store() {
        assert!(!hinted(
            "Error: ENOENT: no such file or directory, open '/tmp/scratch/config.json'"
        ));
    }

    #[test]
    fn ignores_store_path_without_missing_file() {
        assert!(!hinted(
            "loaded /root/node_modules/.pnpm/some-pkg@1.0.0/node_modules/some-pkg/index.js"
        ));
    }

    #[test]
    fn ignores_flat_node_modules_enoent() {
        assert!(!hinted(
            "Error: ENOENT: no such file or directory, open '/root/node_modules/is-odd/missing-asset.json'"
        ));
    }

    #[test]
    fn ignores_unrelated_failure() {
        assert!(!hinted("Error: connect ECONNREFUSED 127.0.0.1:443"));
    }
}
3329
#[cfg(test)]
mod structured_event_frame_tests {
    use super::*;

    /// A structured event built by `structured_event_frame` must survive the
    /// compat -> wire -> compat conversion with its name, detail entries and
    /// connection ownership intact.
    #[test]
    fn structured_event_frame_round_trips_limit_warning() {
        let limit_name = TrackedLimit::JavascriptEventChannel.as_str();
        let detail = std::collections::HashMap::from([
            (String::from("limit"), String::from(limit_name)),
            (String::from("fillPercent"), String::from("82")),
        ]);

        let wire = structured_event_frame("conn-1", "limit_warning", detail)
            .expect("build structured event frame");
        let compat = crate::wire::event_frame_to_compat(wire).expect("convert to compat");

        // Payload: name and both detail entries are preserved.
        match compat.payload {
            EventPayload::Structured(event) => {
                assert_eq!(event.name, "limit_warning");
                assert_eq!(
                    event.detail.get("limit").map(String::as_str),
                    Some(limit_name)
                );
                assert_eq!(
                    event.detail.get("fillPercent").map(String::as_str),
                    Some("82")
                );
            }
            other => panic!("expected structured payload, got {other:?}"),
        }

        // Ownership: still scoped to the originating connection.
        match compat.ownership {
            OwnershipScope::ConnectionOwnership(inner) => {
                assert_eq!(inner.connection_id, "conn-1");
            }
            other => panic!("expected connection ownership, got {other:?}"),
        }
    }
}