Skip to main content

secure_exec_sidecar/
service.rs

1use crate::bridge::{build_mount_plugin_registry, MountPluginContext};
2pub(crate) use crate::execution::{
3    build_javascript_socket_path_context, canonical_signal_name, error_code,
4    ignore_stale_javascript_sync_rpc_response, javascript_sync_rpc_arg_i32,
5    javascript_sync_rpc_arg_str, javascript_sync_rpc_arg_u32, javascript_sync_rpc_arg_u32_optional,
6    javascript_sync_rpc_arg_u64, javascript_sync_rpc_arg_u64_optional,
7    javascript_sync_rpc_bytes_arg, javascript_sync_rpc_bytes_value, javascript_sync_rpc_encoding,
8    javascript_sync_rpc_error_code, javascript_sync_rpc_option_bool,
9    javascript_sync_rpc_option_u32, parse_signal,
10    sanitize_javascript_child_process_internal_bootstrap_env, service_javascript_sync_rpc,
11    vm_network_resource_counts, JavascriptSyncRpcServiceRequest,
12};
13use crate::extension::{
14    Extension, ExtensionBufferedProcessOutput, ExtensionContext, ExtensionFuture, ExtensionHost,
15    ExtensionSnapshot,
16};
17use crate::filesystem::guest_filesystem_call as filesystem_guest_filesystem_call;
18use crate::limits::DEFAULT_ACP_STDOUT_BUFFER_BYTE_LIMIT;
19use crate::protocol::{
20    AuthenticatedResponse, CloseStdinRequest, DisposeReason, EventFrame, EventPayload,
21    ExecuteRequest, ExtEnvelope, FsPermissionScope, GuestFilesystemCallRequest,
22    GuestFilesystemResultResponse, JavascriptChildProcessSpawnOptions,
23    JavascriptChildProcessSpawnRequest, KillProcessRequest, OpenSessionRequest, OwnershipScope,
24    PatternPermissionRule, PatternPermissionScope, PermissionMode, PermissionsPolicy,
25    ProcessKilledResponse, ProcessStartedResponse, ProtocolSchema, RejectedResponse, RequestFrame,
26    RequestId, RequestPayload, ResponseFrame, ResponsePayload, SessionOpenedResponse,
27    SidecarRequestFrame, SidecarRequestPayload, SidecarResponseFrame, SidecarResponsePayload,
28    SidecarResponseTracker, SidecarResponseTrackerError, SignalDispositionAction,
29    SignalHandlerRegistration, StdinClosedResponse, StdinWrittenResponse, VmLifecycleEvent,
30    VmLifecycleState, WriteStdinRequest,
31};
32use crate::state::{
33    ActiveExecutionEvent, BridgeError, ConnectionState, JavascriptSocketFamily,
34    JavascriptSocketPathContext, ProcessEventEnvelope, SessionState, SharedBridge,
35    SharedSidecarRequestClient, SidecarRequestTransport, VmState, EXECUTION_DRIVER_NAME,
36};
37use crate::tools::register_host_callbacks;
38use crate::NativeSidecarBridge;
39use secure_exec_bridge::{
40    CommandPermissionRequest, EnvironmentAccess, EnvironmentPermissionRequest, FilesystemAccess,
41    FilesystemPermissionRequest, LifecycleEventRecord, LifecycleState, LogLevel, LogRecord,
42    NetworkAccess, NetworkPermissionRequest, StructuredEventRecord,
43};
44use secure_exec_execution::{
45    JavascriptExecutionEngine, JavascriptExecutionError, JavascriptSyncRpcRequest,
46    PythonExecutionEngine, PythonExecutionError, WasmExecutionEngine, WasmExecutionError,
47};
48use secure_exec_kernel::kernel::KernelError;
49use secure_exec_kernel::mount_plugin::{FileSystemPluginRegistry, PluginError};
50use secure_exec_kernel::permissions::{
51    permission_glob_matches, CommandAccessRequest, EnvAccessRequest, EnvironmentOperation,
52    NetworkAccessRequest, NetworkOperation, PermissionDecision,
53};
54// root_fs types moved to crate::vm
55use secure_exec_kernel::vfs::VfsError;
56use serde::Deserialize;
57use serde_json::{json, Value};
58use std::collections::{BTreeMap, BTreeSet, VecDeque};
59use std::fmt;
60use std::fs;
61use std::os::unix::fs::PermissionsExt;
62use std::path::{Component, Path, PathBuf};
63use std::sync::{Arc, Mutex};
64use std::task::{Context, Poll, Waker};
65use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
66use tokio::sync::mpsc::{channel, Receiver, Sender};
67use tokio::time;
68
69// Constants and type aliases moved to crate::state
70
71const INTERNAL_JAVASCRIPT_ENTRYPOINT_ENV_KEYS: &[&str] =
72    &["AGENT_OS_ENTRYPOINT", "AGENT_OS_BOOTSTRAP_MODULE"];
73const INTERNAL_WASM_ENTRYPOINT_ENV_KEYS: &[&str] =
74    &["AGENT_OS_WASM_MODULE_PATH", "AGENT_OS_WASM_MODULE_BASE64"];
75const INTERNAL_PYTHON_ENTRYPOINT_ENV_PREFIXES: &[&str] = &["AGENT_OS_PYTHON_"];
76pub(crate) const MAX_PROCESS_EVENT_QUEUE: usize = 10_000;
77pub(crate) const MAX_PENDING_SIDECAR_RESPONSES: usize = 10_000;
78pub(crate) const MAX_OUTBOUND_SIDECAR_REQUESTS: usize = 10_000;
79pub(crate) const MAX_COMPLETED_SIDECAR_RESPONSES: usize = 10_000;
80
81pub(crate) fn process_event_queue_overflow_error() -> SidecarError {
82    SidecarError::InvalidState(format!(
83        "process event queue exceeded {MAX_PROCESS_EVENT_QUEUE} pending events"
84    ))
85}
86
87fn sidecar_response_pending_overflow_error() -> SidecarError {
88    SidecarError::InvalidState(format!(
89        "sidecar response tracker exceeded {MAX_PENDING_SIDECAR_RESPONSES} pending responses"
90    ))
91}
92
93fn outbound_sidecar_request_queue_overflow_error() -> SidecarError {
94    SidecarError::InvalidState(format!(
95        "outbound sidecar request queue exceeded {MAX_OUTBOUND_SIDECAR_REQUESTS} pending requests"
96    ))
97}
98
99fn wire_protocol_error(error: crate::wire::ProtocolCodecError) -> SidecarError {
100    SidecarError::InvalidState(format!("invalid generated wire protocol frame: {error}"))
101}
102
103// NativeSidecarConfig, DispatchResult, SidecarError moved to crate::state
104pub use crate::state::{DispatchResult, NativeSidecarConfig, SidecarError};
105
106// SharedBridge struct and Clone impl moved to crate::state
107
108#[derive(Debug, Default, Deserialize)]
109struct LegacyJavascriptChildProcessSpawnOptions {
110    #[serde(default)]
111    cwd: Option<String>,
112    #[serde(default)]
113    env: BTreeMap<String, String>,
114    #[serde(default)]
115    input: Option<Value>,
116    #[serde(default)]
117    shell: bool,
118    #[serde(default)]
119    detached: bool,
120    #[serde(default)]
121    stdio: Vec<String>,
122    #[serde(default, rename = "maxBuffer")]
123    max_buffer: Option<usize>,
124    #[serde(default)]
125    timeout: Option<u64>,
126    #[serde(default, rename = "killSignal")]
127    kill_signal: Option<String>,
128}
129
130pub(crate) fn parse_javascript_child_process_spawn_request(
131    vm: &VmState,
132    args: &[Value],
133) -> Result<(JavascriptChildProcessSpawnRequest, Option<usize>), SidecarError> {
134    if let Some(value) = args.first().cloned() {
135        if let Ok(request) = serde_json::from_value::<JavascriptChildProcessSpawnRequest>(value) {
136            return Ok((request, None));
137        }
138    }
139
140    let command = javascript_sync_rpc_arg_str(args, 0, "child_process.spawn command")?.to_owned();
141    let raw_args = javascript_sync_rpc_arg_str(args, 1, "child_process.spawn args")?;
142    let raw_options = javascript_sync_rpc_arg_str(args, 2, "child_process.spawn options")?;
143
144    let parsed_args = serde_json::from_str::<Vec<String>>(raw_args).map_err(|error| {
145        SidecarError::InvalidState(format!("invalid child_process.spawn args payload: {error}"))
146    })?;
147    let parsed_options = serde_json::from_str::<LegacyJavascriptChildProcessSpawnOptions>(
148        raw_options,
149    )
150    .map_err(|error| {
151        SidecarError::InvalidState(format!(
152            "invalid child_process.spawn options payload: {error}"
153        ))
154    })?;
155
156    Ok((
157        JavascriptChildProcessSpawnRequest {
158            command,
159            args: parsed_args,
160            options: JavascriptChildProcessSpawnOptions {
161                cwd: parsed_options.cwd,
162                env: parsed_options.env,
163                internal_bootstrap_env: sanitize_javascript_child_process_internal_bootstrap_env(
164                    &vm.guest_env,
165                ),
166                input: parsed_options.input,
167                shell: parsed_options.shell,
168                detached: parsed_options.detached,
169                stdio: parsed_options.stdio,
170                timeout: parsed_options.timeout,
171                kill_signal: parsed_options.kill_signal,
172            },
173        },
174        parsed_options.max_buffer,
175    ))
176}
177
178impl<B> SharedBridge<B> {
179    fn new(bridge: B) -> Self {
180        Self {
181            inner: Arc::new(Mutex::new(bridge)),
182            permissions: Arc::new(Mutex::new(BTreeMap::new())),
183            #[cfg(test)]
184            set_vm_permissions_outcomes: Arc::new(Mutex::new(VecDeque::new())),
185        }
186    }
187}
188
189impl<B> SharedBridge<B>
190where
191    B: NativeSidecarBridge + Send + 'static,
192    BridgeError<B>: fmt::Debug + Send + Sync + 'static,
193{
194    pub(crate) fn with_mut<T>(
195        &self,
196        operation: impl FnOnce(&mut B) -> Result<T, BridgeError<B>>,
197    ) -> Result<T, SidecarError> {
198        let mut bridge = self.inner.lock().map_err(|_| {
199            SidecarError::Bridge(String::from("native sidecar bridge lock poisoned"))
200        })?;
201        operation(&mut bridge).map_err(|error| SidecarError::Bridge(format!("{error:?}")))
202    }
203
204    fn inspect<T>(&self, operation: impl FnOnce(&mut B) -> T) -> Result<T, SidecarError> {
205        let mut bridge = self.inner.lock().map_err(|_| {
206            SidecarError::Bridge(String::from("native sidecar bridge lock poisoned"))
207        })?;
208        Ok(operation(&mut bridge))
209    }
210
211    #[cfg(test)]
212    #[allow(dead_code)]
213    pub(crate) fn queue_set_vm_permissions_result(
214        &self,
215        result: Result<(), SidecarError>,
216    ) -> Result<(), SidecarError> {
217        let mut outcomes = self.set_vm_permissions_outcomes.lock().map_err(|_| {
218            SidecarError::Bridge(String::from(
219                "native sidecar test set_vm_permissions outcome lock poisoned",
220            ))
221        })?;
222        outcomes.push_back(result.err());
223        Ok(())
224    }
225
226    pub(crate) fn emit_lifecycle(
227        &self,
228        vm_id: &str,
229        state: LifecycleState,
230    ) -> Result<(), SidecarError> {
231        self.with_mut(|bridge| {
232            bridge.emit_lifecycle(LifecycleEventRecord {
233                vm_id: vm_id.to_owned(),
234                state,
235                detail: None,
236            })
237        })
238    }
239
240    pub(crate) fn emit_log(
241        &self,
242        vm_id: &str,
243        message: impl Into<String>,
244    ) -> Result<(), SidecarError> {
245        self.with_mut(|bridge| {
246            bridge.emit_log(LogRecord {
247                vm_id: vm_id.to_owned(),
248                level: LogLevel::Info,
249                message: message.into(),
250            })
251        })
252    }
253
254    pub(crate) fn filesystem_decision(
255        &self,
256        vm_id: &str,
257        path: &str,
258        access: FilesystemAccess,
259    ) -> PermissionDecision {
260        if let Some(decision) = self.static_permission_decision(
261            vm_id,
262            filesystem_permission_capability(access),
263            "fs",
264            Some(path),
265        ) {
266            return decision;
267        }
268        match self.with_mut(|bridge| {
269            bridge.check_filesystem_access(FilesystemPermissionRequest {
270                vm_id: vm_id.to_owned(),
271                path: path.to_owned(),
272                access,
273            })
274        }) {
275            Ok(decision) => map_bridge_permission(decision),
276            Err(error) => PermissionDecision::deny(error.to_string()),
277        }
278    }
279
280    pub(crate) fn command_decision(
281        &self,
282        vm_id: &str,
283        request: &CommandAccessRequest,
284    ) -> PermissionDecision {
285        if is_internal_runtime_command_request(request) {
286            return PermissionDecision::allow();
287        }
288        if let Some(decision) = self.static_permission_decision(
289            vm_id,
290            "child_process.spawn",
291            "child_process",
292            Some(&request.command),
293        ) {
294            return decision;
295        }
296        match self.with_mut(|bridge| {
297            bridge.check_command_execution(CommandPermissionRequest {
298                vm_id: vm_id.to_owned(),
299                command: request.command.clone(),
300                args: request.args.clone(),
301                cwd: request.cwd.clone(),
302                env: request.env.clone(),
303            })
304        }) {
305            Ok(decision) => map_bridge_permission(decision),
306            Err(error) => PermissionDecision::deny(error.to_string()),
307        }
308    }
309
310    pub(crate) fn environment_decision(
311        &self,
312        vm_id: &str,
313        request: &EnvAccessRequest,
314    ) -> PermissionDecision {
315        if let Some(decision) = self.static_permission_decision(
316            vm_id,
317            environment_permission_capability(request.op),
318            "env",
319            Some(&request.key),
320        ) {
321            return decision;
322        }
323        match self.with_mut(|bridge| {
324            bridge.check_environment_access(EnvironmentPermissionRequest {
325                vm_id: vm_id.to_owned(),
326                access: match request.op {
327                    EnvironmentOperation::Read => EnvironmentAccess::Read,
328                    EnvironmentOperation::Write => EnvironmentAccess::Write,
329                },
330                key: request.key.clone(),
331                value: request.value.clone(),
332            })
333        }) {
334            Ok(decision) => map_bridge_permission(decision),
335            Err(error) => PermissionDecision::deny(error.to_string()),
336        }
337    }
338
339    pub(crate) fn network_decision(
340        &self,
341        vm_id: &str,
342        request: &NetworkAccessRequest,
343    ) -> PermissionDecision {
344        if let Some(decision) = self.static_permission_decision(
345            vm_id,
346            network_permission_capability(request.op),
347            "network",
348            Some(&request.resource),
349        ) {
350            return decision;
351        }
352        match self.with_mut(|bridge| {
353            bridge.check_network_access(NetworkPermissionRequest {
354                vm_id: vm_id.to_owned(),
355                access: match request.op {
356                    NetworkOperation::Fetch => NetworkAccess::Fetch,
357                    NetworkOperation::Http => NetworkAccess::Http,
358                    NetworkOperation::Dns => NetworkAccess::Dns,
359                    NetworkOperation::Listen => NetworkAccess::Listen,
360                },
361                resource: request.resource.clone(),
362            })
363        }) {
364            Ok(decision) => map_bridge_permission(decision),
365            Err(error) => PermissionDecision::deny(error.to_string()),
366        }
367    }
368
369    pub(crate) fn require_network_access(
370        &self,
371        vm_id: &str,
372        op: NetworkOperation,
373        resource: impl Into<String>,
374    ) -> Result<(), SidecarError> {
375        let resource = resource.into();
376        let decision = self.network_decision(
377            vm_id,
378            &NetworkAccessRequest {
379                vm_id: vm_id.to_owned(),
380                op,
381                resource: resource.clone(),
382            },
383        );
384        if decision.allow {
385            return Ok(());
386        }
387
388        let message = match decision.reason.as_deref() {
389            Some(reason) => format!("EACCES: permission denied, {resource}: {reason}"),
390            None => format!("EACCES: permission denied, {resource}"),
391        };
392        Err(SidecarError::Execution(message))
393    }
394
395    pub(crate) fn set_vm_permissions(
396        &self,
397        vm_id: &str,
398        permissions: &PermissionsPolicy,
399    ) -> Result<(), SidecarError> {
400        #[cfg(test)]
401        {
402            let mut outcomes = self.set_vm_permissions_outcomes.lock().map_err(|_| {
403                SidecarError::Bridge(String::from(
404                    "native sidecar test set_vm_permissions outcome lock poisoned",
405                ))
406            })?;
407            if let Some(Some(error)) = outcomes.pop_front() {
408                return Err(error);
409            }
410        }
411
412        let mut stored = self.permissions.lock().map_err(|_| {
413            SidecarError::Bridge(String::from(
414                "native sidecar permission policy lock poisoned",
415            ))
416        })?;
417        stored.insert(vm_id.to_owned(), permissions.clone());
418        Ok(())
419    }
420
421    pub(crate) fn restore_vm_permissions_fail_closed(
422        &self,
423        vm_id: &str,
424        original_permissions: &PermissionsPolicy,
425        context: &str,
426        operation_error: &SidecarError,
427    ) -> Result<(), SidecarError> {
428        match self.set_vm_permissions(vm_id, original_permissions) {
429            Ok(()) => Ok(()),
430            Err(restore_error) => {
431                let deny_all = PermissionsPolicy::deny_all();
432                match self.set_vm_permissions(vm_id, &deny_all) {
433                    Ok(()) => Err(SidecarError::InvalidState(format!(
434                        "{context} failed: {operation_error}; restoring original permissions failed: {restore_error}; applied deny-all fallback"
435                    ))),
436                    Err(deny_all_error) => panic!(
437                        "{context} failed: {operation_error}; restoring original permissions failed: {restore_error}; deny-all fallback failed: {deny_all_error}"
438                    ),
439                }
440            }
441        }
442    }
443
444    pub(crate) fn clear_vm_permissions(&self, vm_id: &str) -> Result<(), SidecarError> {
445        let mut stored = self.permissions.lock().map_err(|_| {
446            SidecarError::Bridge(String::from(
447                "native sidecar permission policy lock poisoned",
448            ))
449        })?;
450        stored.remove(vm_id);
451        Ok(())
452    }
453
454    pub(crate) fn static_permission_decision(
455        &self,
456        vm_id: &str,
457        capability: &str,
458        domain: &str,
459        resource: Option<&str>,
460    ) -> Option<PermissionDecision> {
461        let stored = self.permissions.lock().ok()?;
462        let permissions = stored.get(vm_id)?;
463        let mode = evaluate_permissions_policy(permissions, domain, capability, resource);
464        Some(permission_mode_to_kernel_decision(mode, capability))
465    }
466}
467
468pub(crate) fn evaluate_permissions_policy(
469    permissions: &PermissionsPolicy,
470    domain: &str,
471    capability: &str,
472    resource: Option<&str>,
473) -> PermissionMode {
474    match domain {
475        "fs" => evaluate_fs_permission_scope(
476            permissions.fs.as_ref(),
477            capability_operation(capability, domain),
478            resource,
479        ),
480        "network" => evaluate_pattern_permission_scope(
481            permissions.network.as_ref(),
482            capability_operation(capability, domain),
483            resource,
484        ),
485        "child_process" => evaluate_pattern_permission_scope(
486            permissions.child_process.as_ref(),
487            capability_operation(capability, domain),
488            resource,
489        ),
490        "process" => evaluate_pattern_permission_scope(
491            permissions.process.as_ref(),
492            capability_operation(capability, domain),
493            resource,
494        ),
495        "env" => evaluate_pattern_permission_scope(
496            permissions.env.as_ref(),
497            capability_operation(capability, domain),
498            resource,
499        ),
500        "tool" => evaluate_pattern_permission_scope(
501            permissions.tool.as_ref(),
502            capability_operation(capability, domain),
503            resource,
504        ),
505        _ => PermissionMode::Deny,
506    }
507}
508
509fn evaluate_fs_permission_scope(
510    scope: Option<&FsPermissionScope>,
511    operation: &str,
512    resource: Option<&str>,
513) -> PermissionMode {
514    match scope {
515        Some(FsPermissionScope::PermissionMode(mode)) => mode.clone(),
516        Some(FsPermissionScope::FsPermissionRuleSet(rules)) => {
517            let mut mode = rules.default.clone().unwrap_or(PermissionMode::Deny);
518            for rule in &rules.rules {
519                if fs_rule_matches(rule, operation, resource) {
520                    mode = rule.mode.clone();
521                }
522            }
523            mode
524        }
525        None => PermissionMode::Deny,
526    }
527}
528
529fn evaluate_pattern_permission_scope(
530    scope: Option<&PatternPermissionScope>,
531    operation: &str,
532    resource: Option<&str>,
533) -> PermissionMode {
534    match scope {
535        Some(PatternPermissionScope::PermissionMode(mode)) => mode.clone(),
536        Some(PatternPermissionScope::PatternPermissionRuleSet(rules)) => {
537            let mut mode = rules.default.clone().unwrap_or(PermissionMode::Deny);
538            for rule in &rules.rules {
539                if pattern_rule_matches(rule, operation, resource) {
540                    mode = rule.mode.clone();
541                }
542            }
543            mode
544        }
545        None => PermissionMode::Deny,
546    }
547}
548
549fn fs_rule_matches(
550    rule: &crate::protocol::FsPermissionRule,
551    operation: &str,
552    resource: Option<&str>,
553) -> bool {
554    let operations_match = permission_operation_matches(&rule.operations, operation);
555    let paths_match = permission_resource_matches(&rule.paths, resource);
556    operations_match && paths_match
557}
558
559fn pattern_rule_matches(
560    rule: &PatternPermissionRule,
561    operation: &str,
562    resource: Option<&str>,
563) -> bool {
564    let operations_match = permission_operation_matches(&rule.operations, operation);
565    let patterns_match = permission_resource_matches(&rule.patterns, resource);
566    operations_match && patterns_match
567}
568
569fn permission_operation_matches(candidates: &[String], operation: &str) -> bool {
570    candidates
571        .iter()
572        .any(|candidate| candidate == "*" || candidate == operation)
573}
574
575fn permission_resource_matches(patterns: &[String], resource: Option<&str>) -> bool {
576    resource.is_some_and(|value| {
577        patterns
578            .iter()
579            .any(|pattern| permission_glob_matches(pattern, value))
580    })
581}
582
583pub(crate) fn validate_permissions_policy(
584    permissions: &PermissionsPolicy,
585) -> Result<(), SidecarError> {
586    if let Some(scope) = permissions.fs.as_ref() {
587        validate_fs_permission_scope("fs", scope)?;
588    }
589    if let Some(scope) = permissions.network.as_ref() {
590        validate_pattern_permission_scope("network", scope)?;
591    }
592    if let Some(scope) = permissions.child_process.as_ref() {
593        validate_pattern_permission_scope("child_process", scope)?;
594    }
595    if let Some(scope) = permissions.process.as_ref() {
596        validate_pattern_permission_scope("process", scope)?;
597    }
598    if let Some(scope) = permissions.env.as_ref() {
599        validate_pattern_permission_scope("env", scope)?;
600    }
601    if let Some(scope) = permissions.tool.as_ref() {
602        validate_pattern_permission_scope("tool", scope)?;
603    }
604    Ok(())
605}
606
607fn validate_fs_permission_scope(
608    domain: &str,
609    scope: &FsPermissionScope,
610) -> Result<(), SidecarError> {
611    let FsPermissionScope::FsPermissionRuleSet(rule_set) = scope else {
612        return Ok(());
613    };
614
615    for (index, rule) in rule_set.rules.iter().enumerate() {
616        validate_permission_rule_field(
617            &rule.operations,
618            &format!("{domain}.rules[{index}].operations"),
619        )?;
620        validate_permission_rule_field(&rule.paths, &format!("{domain}.rules[{index}].paths"))?;
621    }
622
623    Ok(())
624}
625
626fn validate_pattern_permission_scope(
627    domain: &str,
628    scope: &PatternPermissionScope,
629) -> Result<(), SidecarError> {
630    let PatternPermissionScope::PatternPermissionRuleSet(rule_set) = scope else {
631        return Ok(());
632    };
633
634    for (index, rule) in rule_set.rules.iter().enumerate() {
635        validate_permission_rule_field(
636            &rule.operations,
637            &format!("{domain}.rules[{index}].operations"),
638        )?;
639        validate_permission_rule_field(
640            &rule.patterns,
641            &format!("{domain}.rules[{index}].patterns"),
642        )?;
643    }
644
645    Ok(())
646}
647
648fn validate_permission_rule_field(values: &[String], field: &str) -> Result<(), SidecarError> {
649    if values.is_empty() {
650        return Err(SidecarError::InvalidState(format!(
651            "invalid permissions policy: {field} must not be empty; use [\"*\"] for wildcard"
652        )));
653    }
654    Ok(())
655}
656
657fn capability_operation<'a>(capability: &'a str, domain: &str) -> &'a str {
658    capability
659        .strip_prefix(domain)
660        .and_then(|value| value.strip_prefix('.'))
661        .unwrap_or("")
662}
663
664fn permission_mode_to_kernel_decision(
665    mode: PermissionMode,
666    capability: &str,
667) -> PermissionDecision {
668    match mode {
669        PermissionMode::Allow => PermissionDecision::allow(),
670        PermissionMode::Ask => {
671            PermissionDecision::deny(format!("permission prompt required for {capability}"))
672        }
673        PermissionMode::Deny => PermissionDecision::deny(format!("blocked by {capability} policy")),
674    }
675}
676
677pub(crate) fn filesystem_permission_capability(access: FilesystemAccess) -> &'static str {
678    match access {
679        FilesystemAccess::Read => "fs.read",
680        FilesystemAccess::Write => "fs.write",
681        FilesystemAccess::Stat => "fs.stat",
682        FilesystemAccess::ReadDir => "fs.readdir",
683        FilesystemAccess::CreateDir => "fs.create_dir",
684        FilesystemAccess::Remove => "fs.rm",
685        FilesystemAccess::Rename => "fs.rename",
686        FilesystemAccess::Symlink => "fs.symlink",
687        FilesystemAccess::ReadLink => "fs.readlink",
688        FilesystemAccess::Chmod => "fs.chmod",
689        FilesystemAccess::Truncate => "fs.truncate",
690    }
691}
692
693fn network_permission_capability(operation: NetworkOperation) -> &'static str {
694    match operation {
695        NetworkOperation::Fetch => "network.fetch",
696        NetworkOperation::Http => "network.http",
697        NetworkOperation::Dns => "network.dns",
698        NetworkOperation::Listen => "network.listen",
699    }
700}
701
702fn environment_permission_capability(operation: EnvironmentOperation) -> &'static str {
703    match operation {
704        EnvironmentOperation::Read => "env.read",
705        EnvironmentOperation::Write => "env.write",
706    }
707}
708
709fn is_internal_runtime_command_request(request: &CommandAccessRequest) -> bool {
710    match request.command.as_str() {
711        "node" => request
712            .env
713            .keys()
714            .any(|key| INTERNAL_JAVASCRIPT_ENTRYPOINT_ENV_KEYS.contains(&key.as_str())),
715        "wasm" => request
716            .env
717            .keys()
718            .any(|key| INTERNAL_WASM_ENTRYPOINT_ENV_KEYS.contains(&key.as_str())),
719        "python" => request.env.keys().any(|key| {
720            INTERNAL_PYTHON_ENTRYPOINT_ENV_PREFIXES
721                .iter()
722                .any(|prefix| key.starts_with(prefix))
723        }),
724        _ => false,
725    }
726}
727
728fn ownership_matches_process_event(
729    ownership: &OwnershipScope,
730    event: &ProcessEventEnvelope,
731) -> bool {
732    match ownership {
733        OwnershipScope::ConnectionOwnership(inner) => inner.connection_id == event.connection_id,
734        OwnershipScope::SessionOwnership(inner) => {
735            inner.connection_id == event.connection_id && inner.session_id == event.session_id
736        }
737        OwnershipScope::VmOwnership(inner) => {
738            inner.connection_id == event.connection_id
739                && inner.session_id == event.session_id
740                && inner.vm_id == event.vm_id
741        }
742    }
743}
744
745fn public_process_event_matches_ownership<B>(
746    sidecar: &NativeSidecar<B>,
747    ownership: &OwnershipScope,
748    event: &ProcessEventEnvelope,
749) -> bool
750where
751    B: NativeSidecarBridge + Send + 'static,
752    BridgeError<B>: fmt::Debug + Send + Sync + 'static,
753{
754    if !ownership_matches_process_event(ownership, event) {
755        return false;
756    }
757
758    if event.process_id.contains('/') {
759        return false;
760    }
761
762    // Stale queued events must still be drained through handle_process_event_envelope()
763    // so the sidecar can emit the expected fail-closed log when teardown wins the race.
764    let _ = sidecar;
765    true
766}
767
768fn poll_future_once<F: std::future::Future>(future: std::pin::Pin<&mut F>) -> Option<F::Output> {
769    let mut context = Context::from_waker(Waker::noop());
770    match future.poll(&mut context) {
771        Poll::Ready(output) => Some(output),
772        Poll::Pending => None,
773    }
774}
775
776// ConnectionState, SessionState, VmConfiguration, VmState moved to crate::state
777
778// JavascriptSocketPathContext, JavascriptSocketFamily, VmListenPolicy moved to crate::state
779
780impl JavascriptSocketPathContext {
781    pub(crate) fn loopback_port_allowed(&self, port: u16) -> bool {
782        self.loopback_exempt_ports.contains(&port)
783            || self
784                .tcp_loopback_guest_to_host_ports
785                .keys()
786                .any(|(_, guest_port)| *guest_port == port)
787    }
788
789    pub(crate) fn translate_tcp_loopback_port(
790        &self,
791        family: JavascriptSocketFamily,
792        port: u16,
793    ) -> Option<u16> {
794        self.tcp_loopback_guest_to_host_ports
795            .get(&(family, port))
796            .copied()
797    }
798
799    pub(crate) fn translate_udp_loopback_port(
800        &self,
801        family: JavascriptSocketFamily,
802        port: u16,
803    ) -> Option<u16> {
804        self.udp_loopback_guest_to_host_ports
805            .get(&(family, port))
806            .copied()
807    }
808
809    pub(crate) fn guest_udp_port_for_host_port(
810        &self,
811        family: JavascriptSocketFamily,
812        port: u16,
813    ) -> Option<u16> {
814        self.udp_loopback_host_to_guest_ports
815            .get(&(family, port))
816            .copied()
817    }
818}
819
820// ActiveProcess, NetworkResourceCounts moved to crate::state
821
822pub struct NativeSidecar<B> {
823    pub(crate) config: NativeSidecarConfig,
824    pub(crate) bridge: SharedBridge<B>,
825    pub(crate) mount_plugins: FileSystemPluginRegistry<MountPluginContext<B>>,
826    pub(crate) cache_root: PathBuf,
827    pub(crate) javascript_engine: JavascriptExecutionEngine,
828    pub(crate) python_engine: PythonExecutionEngine,
829    pub(crate) wasm_engine: WasmExecutionEngine,
830    pub(crate) next_connection_id: usize,
831    pub(crate) next_session_id: usize,
832    pub(crate) next_vm_id: usize,
833    pub(crate) next_sidecar_request_id: RequestId,
834    pub(crate) connections: BTreeMap<String, ConnectionState>,
835    pub(crate) sessions: BTreeMap<String, SessionState>,
836    pub(crate) vms: BTreeMap<String, VmState>,
837    #[allow(dead_code)]
838    pub(crate) process_event_sender: Sender<ProcessEventEnvelope>,
839    pub(crate) process_event_receiver: Option<Receiver<ProcessEventEnvelope>>,
840    pub(crate) pending_process_events: VecDeque<ProcessEventEnvelope>,
841    pub(crate) pending_sidecar_responses: SidecarResponseTracker,
842    pub(crate) outbound_sidecar_requests: VecDeque<SidecarRequestFrame>,
843    pub(crate) completed_sidecar_responses: BTreeMap<RequestId, SidecarResponseFrame>,
844    pub(crate) completed_sidecar_response_order: VecDeque<RequestId>,
845    pub(crate) sidecar_requests: SharedSidecarRequestClient,
846    pub(crate) extensions: BTreeMap<String, Arc<dyn Extension>>,
847    pub(crate) extension_sessions: BTreeMap<(String, String), ExtensionSessionResources>,
848    pub(crate) extension_process_output_buffers:
849        BTreeMap<(String, String), ExtensionBufferedProcessOutput>,
850}
851
852#[derive(Debug)]
853pub(crate) struct ExtensionSessionResources {
854    pub(crate) ownership: OwnershipScope,
855    pub(crate) process_ids: BTreeSet<String>,
856    pub(crate) vm_ids: BTreeSet<String>,
857}
858
859impl<B> fmt::Debug for NativeSidecar<B> {
860    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
861        f.debug_struct("NativeSidecar")
862            .field("config", &self.config)
863            .field("cache_root", &self.cache_root)
864            .field("next_connection_id", &self.next_connection_id)
865            .field("next_session_id", &self.next_session_id)
866            .field("next_vm_id", &self.next_vm_id)
867            .field("connection_count", &self.connections.len())
868            .field("session_count", &self.sessions.len())
869            .field("vm_count", &self.vms.len())
870            .field("extension_session_count", &self.extension_sessions.len())
871            .field(
872                "extension_process_output_buffer_count",
873                &self.extension_process_output_buffers.len(),
874            )
875            .finish()
876    }
877}
878
879impl<B> NativeSidecar<B>
880where
881    B: NativeSidecarBridge + Send + 'static,
882    BridgeError<B>: fmt::Debug + Send + Sync + 'static,
883{
884    pub fn new(bridge: B) -> Result<Self, SidecarError> {
885        Self::with_config(bridge, NativeSidecarConfig::default())
886    }
887
888    pub fn with_config(bridge: B, config: NativeSidecarConfig) -> Result<Self, SidecarError> {
889        if matches!(config.expected_auth_token.as_deref(), Some("")) {
890            return Err(SidecarError::InvalidState(String::from(
891                "native sidecar expected_auth_token must not be empty",
892            )));
893        }
894
895        let cache_root = config.compile_cache_root.clone().unwrap_or_else(|| {
896            std::env::temp_dir().join(format!(
897                "{}-{}",
898                config.sidecar_id,
899                SystemTime::now()
900                    .duration_since(UNIX_EPOCH)
901                    .expect("system time before unix epoch")
902                    .as_nanos()
903            ))
904        });
905        fs::create_dir_all(&cache_root).map_err(|error| {
906            SidecarError::Io(format!("failed to prepare sidecar cache root: {error}"))
907        })?;
908
909        let bridge = SharedBridge::new(bridge);
910        let mount_plugins = build_mount_plugin_registry::<B>()?;
911        let (process_event_sender, process_event_receiver) = channel(MAX_PROCESS_EVENT_QUEUE);
912
913        Ok(Self {
914            config,
915            bridge,
916            mount_plugins,
917            cache_root,
918            javascript_engine: JavascriptExecutionEngine::default(),
919            python_engine: PythonExecutionEngine::default(),
920            wasm_engine: WasmExecutionEngine::default(),
921            next_connection_id: 0,
922            next_session_id: 0,
923            next_vm_id: 0,
924            next_sidecar_request_id: -1,
925            connections: BTreeMap::new(),
926            sessions: BTreeMap::new(),
927            vms: BTreeMap::new(),
928            process_event_sender,
929            process_event_receiver: Some(process_event_receiver),
930            pending_process_events: VecDeque::new(),
931            pending_sidecar_responses: SidecarResponseTracker::default(),
932            outbound_sidecar_requests: VecDeque::new(),
933            completed_sidecar_responses: BTreeMap::new(),
934            completed_sidecar_response_order: VecDeque::new(),
935            sidecar_requests: SharedSidecarRequestClient::default(),
936            extensions: BTreeMap::new(),
937            extension_sessions: BTreeMap::new(),
938            extension_process_output_buffers: BTreeMap::new(),
939        })
940    }
941
942    pub fn with_config_and_extensions(
943        bridge: B,
944        config: NativeSidecarConfig,
945        extensions: Vec<Box<dyn Extension>>,
946    ) -> Result<Self, SidecarError> {
947        let mut sidecar = Self::with_config(bridge, config)?;
948        for extension in extensions {
949            sidecar.register_extension(extension)?;
950        }
951        Ok(sidecar)
952    }
953
954    pub(crate) fn prune_extension_process_resource(&mut self, process_id: &str) {
955        self.extension_sessions.retain(|_, resources| {
956            resources.process_ids.remove(process_id);
957            !resources.process_ids.is_empty() || !resources.vm_ids.is_empty()
958        });
959    }
960
961    pub(crate) fn prune_extension_vm_resource(&mut self, vm_id: &str) {
962        self.extension_sessions.retain(|_, resources| {
963            if matches!(
964                &resources.ownership,
965                OwnershipScope::VmOwnership(inner) if inner.vm_id == vm_id
966            ) {
967                resources.process_ids.clear();
968            }
969            resources.vm_ids.remove(vm_id);
970            !resources.process_ids.is_empty() || !resources.vm_ids.is_empty()
971        });
972    }
973
974    pub(crate) fn capture_extension_process_output_event(
975        &mut self,
976        vm_id: &str,
977        process_id: &str,
978        event: &ActiveExecutionEvent,
979    ) -> bool {
980        let Some(buffer) = self
981            .extension_process_output_buffers
982            .get_mut(&(vm_id.to_string(), process_id.to_string()))
983        else {
984            return false;
985        };
986        match event {
987            ActiveExecutionEvent::Stdout(chunk) => {
988                buffer.append_stdout(chunk, DEFAULT_ACP_STDOUT_BUFFER_BYTE_LIMIT);
989                true
990            }
991            ActiveExecutionEvent::Stderr(chunk) => {
992                buffer.append_stderr(chunk, DEFAULT_ACP_STDOUT_BUFFER_BYTE_LIMIT);
993                true
994            }
995            ActiveExecutionEvent::JavascriptSyncRpcRequest(_)
996            | ActiveExecutionEvent::PythonVfsRpcRequest(_)
997            | ActiveExecutionEvent::SignalState { .. }
998            | ActiveExecutionEvent::Exited(_) => false,
999        }
1000    }
1001
1002    fn bind_extension_process_resource(
1003        &mut self,
1004        ownership: OwnershipScope,
1005        namespace: String,
1006        ext_session_id: String,
1007        process_id: String,
1008    ) -> Result<(), SidecarError> {
1009        if ext_session_id.is_empty() {
1010            return Err(SidecarError::InvalidState(String::from(
1011                "extension session id must not be empty",
1012            )));
1013        }
1014        let (connection_id, session_id, vm_id) = self.vm_scope_for(&ownership)?;
1015        self.require_owned_vm(&connection_id, &session_id, &vm_id)?;
1016        let process_exists = self
1017            .vms
1018            .get(&vm_id)
1019            .is_some_and(|vm| vm.active_processes.contains_key(&process_id));
1020        if !process_exists {
1021            return Err(SidecarError::InvalidState(format!(
1022                "VM {vm_id} has no active process {process_id}"
1023            )));
1024        }
1025
1026        let key = (namespace, ext_session_id);
1027        if let Some(resources) = self.extension_sessions.get_mut(&key) {
1028            if resources.ownership != ownership {
1029                return Err(SidecarError::InvalidState(String::from(
1030                    "extension session ownership did not match existing resources",
1031                )));
1032            }
1033            resources.process_ids.insert(process_id);
1034        } else {
1035            self.extension_sessions.insert(
1036                key,
1037                ExtensionSessionResources {
1038                    ownership,
1039                    process_ids: BTreeSet::from([process_id]),
1040                    vm_ids: BTreeSet::new(),
1041                },
1042            );
1043        }
1044        Ok(())
1045    }
1046
1047    fn bind_extension_vm_resource(
1048        &mut self,
1049        ownership: OwnershipScope,
1050        namespace: String,
1051        ext_session_id: String,
1052    ) -> Result<(), SidecarError> {
1053        if ext_session_id.is_empty() {
1054            return Err(SidecarError::InvalidState(String::from(
1055                "extension session id must not be empty",
1056            )));
1057        }
1058        let (connection_id, session_id, vm_id) = self.vm_scope_for(&ownership)?;
1059        self.require_owned_vm(&connection_id, &session_id, &vm_id)?;
1060
1061        let key = (namespace, ext_session_id);
1062        if let Some(resources) = self.extension_sessions.get_mut(&key) {
1063            if resources.ownership != ownership {
1064                return Err(SidecarError::InvalidState(String::from(
1065                    "extension session ownership did not match existing resources",
1066                )));
1067            }
1068            resources.vm_ids.insert(vm_id);
1069        } else {
1070            self.extension_sessions.insert(
1071                key,
1072                ExtensionSessionResources {
1073                    ownership,
1074                    process_ids: BTreeSet::new(),
1075                    vm_ids: BTreeSet::from([vm_id]),
1076                },
1077            );
1078        }
1079        Ok(())
1080    }
1081
1082    pub fn sidecar_id(&self) -> &str {
1083        &self.config.sidecar_id
1084    }
1085
1086    pub fn with_bridge_mut<T>(
1087        &self,
1088        operation: impl FnOnce(&mut B) -> T,
1089    ) -> Result<T, SidecarError> {
1090        self.bridge.inspect(operation)
1091    }
1092
1093    pub fn set_sidecar_request_transport(&mut self, transport: Arc<dyn SidecarRequestTransport>) {
1094        self.sidecar_requests.set_transport(transport);
1095    }
1096
1097    pub fn register_extension(
1098        &mut self,
1099        extension: Box<dyn Extension>,
1100    ) -> Result<(), SidecarError> {
1101        let namespace = extension.namespace().to_owned();
1102        if namespace.is_empty() {
1103            return Err(SidecarError::InvalidState(String::from(
1104                "extension namespace must not be empty",
1105            )));
1106        }
1107        if self.extensions.contains_key(&namespace) {
1108            return Err(SidecarError::Conflict(format!(
1109                "extension namespace {namespace} is already registered",
1110            )));
1111        }
1112        self.extensions.insert(namespace, Arc::from(extension));
1113        Ok(())
1114    }
1115
1116    pub fn set_sidecar_request_handler<F>(&mut self, handler: F)
1117    where
1118        F: Fn(SidecarRequestFrame) -> Result<SidecarResponsePayload, SidecarError>
1119            + Send
1120            + Sync
1121            + 'static,
1122    {
1123        struct HandlerTransport<F>(F);
1124
1125        impl<F> SidecarRequestTransport for HandlerTransport<F>
1126        where
1127            F: Fn(SidecarRequestFrame) -> Result<SidecarResponsePayload, SidecarError>
1128                + Send
1129                + Sync
1130                + 'static,
1131        {
1132            fn send_request(
1133                &self,
1134                request: SidecarRequestFrame,
1135                _timeout: Duration,
1136            ) -> Result<SidecarResponseFrame, SidecarError> {
1137                let payload = (self.0)(request.clone())?;
1138                Ok(SidecarResponseFrame::new(
1139                    request.request_id,
1140                    request.ownership,
1141                    payload,
1142                ))
1143            }
1144        }
1145
1146        self.set_sidecar_request_transport(Arc::new(HandlerTransport(handler)));
1147    }
1148
1149    pub fn set_wire_sidecar_request_handler<F>(&mut self, handler: F)
1150    where
1151        F: Fn(
1152                crate::wire::SidecarRequestFrame,
1153            ) -> Result<crate::wire::SidecarResponseFrame, SidecarError>
1154            + Send
1155            + Sync
1156            + 'static,
1157    {
1158        self.set_sidecar_request_handler(move |request| {
1159            let request = crate::wire::sidecar_request_frame_from_compat(request)
1160                .map_err(wire_protocol_error)?;
1161            let response = handler(request)?;
1162            let response = crate::wire::sidecar_response_frame_to_compat(response)
1163                .map_err(wire_protocol_error)?;
1164            Ok(response.payload)
1165        });
1166    }
1167
1168    pub(crate) fn queue_pending_process_event(
1169        &mut self,
1170        envelope: ProcessEventEnvelope,
1171    ) -> Result<(), SidecarError> {
1172        if self.pending_process_events.len() >= MAX_PROCESS_EVENT_QUEUE {
1173            return Err(process_event_queue_overflow_error());
1174        }
1175        self.pending_process_events.push_back(envelope);
1176        Ok(())
1177    }
1178
1179    pub(crate) fn queue_front_pending_process_event(
1180        &mut self,
1181        envelope: ProcessEventEnvelope,
1182    ) -> Result<(), SidecarError> {
1183        if self.pending_process_events.len() >= MAX_PROCESS_EVENT_QUEUE {
1184            return Err(process_event_queue_overflow_error());
1185        }
1186        self.pending_process_events.push_front(envelope);
1187        Ok(())
1188    }
1189
1190    pub(crate) fn pending_process_event_capacity(&self) -> usize {
1191        MAX_PROCESS_EVENT_QUEUE.saturating_sub(self.pending_process_events.len())
1192    }
1193
1194    pub fn dispatch_blocking(
1195        &mut self,
1196        request: RequestFrame,
1197    ) -> Result<DispatchResult, SidecarError> {
1198        let inside_runtime = tokio::runtime::Handle::try_current().is_ok();
1199        if matches!(
1200            request.payload,
1201            RequestPayload::DisposeVm(_) | RequestPayload::Ext(_)
1202        ) && !inside_runtime
1203        {
1204            return tokio::runtime::Builder::new_current_thread()
1205                .enable_all()
1206                .build()
1207                .expect("sidecar dispatch runtime")
1208                .block_on(self.dispatch(request));
1209        }
1210
1211        let mut future = std::pin::pin!(self.dispatch(request));
1212        match poll_future_once(future.as_mut()) {
1213            Some(result) => result,
1214            None if inside_runtime => Err(SidecarError::InvalidState(String::from(
1215                "dispatch_blocking cannot wait for an async sidecar request inside a Tokio runtime; use dispatch().await",
1216            ))),
1217            None => tokio::runtime::Builder::new_current_thread()
1218                .enable_all()
1219                .build()
1220                .expect("sidecar dispatch runtime")
1221                .block_on(future),
1222        }
1223    }
1224
1225    pub fn dispatch_wire_blocking(
1226        &mut self,
1227        request: crate::wire::RequestFrame,
1228    ) -> Result<crate::wire::WireDispatchResult, SidecarError> {
1229        let request = crate::wire::request_frame_to_compat(request).map_err(wire_protocol_error)?;
1230        let result = self.dispatch_blocking(request)?;
1231        crate::wire::dispatch_result_from_compat(result).map_err(wire_protocol_error)
1232    }
1233
1234    pub fn poll_event_blocking(
1235        &mut self,
1236        ownership: &OwnershipScope,
1237        timeout: Duration,
1238    ) -> Result<Option<EventFrame>, SidecarError> {
1239        tokio::runtime::Builder::new_current_thread()
1240            .enable_all()
1241            .build()
1242            .expect("sidecar poll runtime")
1243            .block_on(self.poll_event(ownership, timeout))
1244    }
1245
1246    pub fn poll_event_wire_blocking(
1247        &mut self,
1248        ownership: &crate::wire::OwnershipScope,
1249        timeout: Duration,
1250    ) -> Result<Option<crate::wire::EventFrame>, SidecarError> {
1251        let ownership = crate::wire::ownership_scope_to_compat(ownership.clone());
1252        self.poll_event_blocking(&ownership, timeout)?
1253            .map(crate::wire::event_frame_from_compat)
1254            .transpose()
1255            .map_err(wire_protocol_error)
1256    }
1257
1258    pub fn close_session_blocking(
1259        &mut self,
1260        connection_id: &str,
1261        session_id: &str,
1262    ) -> Result<Vec<EventFrame>, SidecarError> {
1263        tokio::runtime::Builder::new_current_thread()
1264            .enable_all()
1265            .build()
1266            .expect("sidecar close-session runtime")
1267            .block_on(self.close_session(connection_id, session_id))
1268    }
1269
1270    pub fn remove_connection_blocking(
1271        &mut self,
1272        connection_id: &str,
1273    ) -> Result<Vec<EventFrame>, SidecarError> {
1274        tokio::runtime::Builder::new_current_thread()
1275            .enable_all()
1276            .build()
1277            .expect("sidecar remove-connection runtime")
1278            .block_on(self.remove_connection(connection_id))
1279    }
1280
1281    pub fn dispose_vm_internal_blocking(
1282        &mut self,
1283        connection_id: &str,
1284        session_id: &str,
1285        vm_id: &str,
1286        reason: DisposeReason,
1287    ) -> Result<Vec<EventFrame>, SidecarError> {
1288        tokio::runtime::Builder::new_current_thread()
1289            .enable_all()
1290            .build()
1291            .expect("sidecar dispose-vm runtime")
1292            .block_on(self.dispose_vm_internal(connection_id, session_id, vm_id, reason))
1293    }
1294
1295    pub async fn dispatch(
1296        &mut self,
1297        request: RequestFrame,
1298    ) -> Result<DispatchResult, SidecarError> {
1299        if let Err(error) = self.ensure_request_within_frame_limit(&request) {
1300            return Ok(DispatchResult {
1301                response: self.reject(&request, error_code(&error), &error.to_string()),
1302                events: Vec::new(),
1303            });
1304        }
1305
1306        let result = match request.payload.clone() {
1307            RequestPayload::Authenticate(payload) => {
1308                self.authenticate_connection(&request, payload).await
1309            }
1310            RequestPayload::OpenSession(payload) => self.open_session(&request, payload).await,
1311            RequestPayload::CreateVm(payload) => self.create_vm(&request, payload).await,
1312            RequestPayload::DisposeVm(payload) => self.dispose_vm(&request, payload).await,
1313            RequestPayload::BootstrapRootFilesystem(payload) => {
1314                self.bootstrap_root_filesystem(&request, payload.entries)
1315                    .await
1316            }
1317            RequestPayload::ConfigureVm(payload) => self.configure_vm(&request, payload).await,
1318            RequestPayload::RegisterHostCallbacks(payload) => {
1319                register_host_callbacks(self, &request, payload)
1320            }
1321            RequestPayload::CreateLayer(payload) => self.create_layer(&request, payload).await,
1322            RequestPayload::SealLayer(payload) => self.seal_layer(&request, payload).await,
1323            RequestPayload::ImportSnapshot(payload) => {
1324                self.import_snapshot(&request, payload).await
1325            }
1326            RequestPayload::ExportSnapshot(payload) => {
1327                self.export_snapshot(&request, payload).await
1328            }
1329            RequestPayload::CreateOverlay(payload) => self.create_overlay(&request, payload).await,
1330            RequestPayload::GuestFilesystemCall(payload) => {
1331                self.guest_filesystem_call(&request, payload).await
1332            }
1333            RequestPayload::SnapshotRootFilesystem(payload) => {
1334                self.snapshot_root_filesystem(&request, payload).await
1335            }
1336            RequestPayload::Execute(payload) => self.execute(&request, payload).await,
1337            RequestPayload::WriteStdin(payload) => self.write_stdin(&request, payload).await,
1338            RequestPayload::CloseStdin(payload) => self.close_stdin(&request, payload).await,
1339            RequestPayload::KillProcess(payload) => self.kill_process(&request, payload).await,
1340            RequestPayload::GetProcessSnapshot(payload) => {
1341                self.get_process_snapshot(&request, payload).await
1342            }
1343            RequestPayload::FindListener(payload) => self.find_listener(&request, payload).await,
1344            RequestPayload::FindBoundUdp(payload) => self.find_bound_udp(&request, payload).await,
1345            RequestPayload::VmFetch(payload) => self.vm_fetch(&request, payload).await,
1346            RequestPayload::GetSignalState(payload) => {
1347                self.get_signal_state(&request, payload).await
1348            }
1349            RequestPayload::GetZombieTimerCount(payload) => {
1350                self.get_zombie_timer_count(&request, payload).await
1351            }
1352            RequestPayload::HostFilesystemCall(_)
1353            | RequestPayload::PersistenceLoad(_)
1354            | RequestPayload::PersistenceFlush(_) => Ok(DispatchResult {
1355                response: self.reject(
1356                    &request,
1357                    "unsupported_direction",
1358                    "host callback request categories are sidecar-to-host only in this scaffold",
1359                ),
1360                events: Vec::new(),
1361            }),
1362            RequestPayload::Ext(payload) => {
1363                self.dispatch_extension_request(&request, payload).await
1364            }
1365        };
1366
1367        match result {
1368            Ok(dispatch) => Ok(dispatch),
1369            Err(error @ SidecarError::Io(_)) => Err(error),
1370            Err(error) => Ok(DispatchResult {
1371                response: self.reject(&request, error_code(&error), &error.to_string()),
1372                events: Vec::new(),
1373            }),
1374        }
1375    }
1376
1377    pub async fn dispatch_wire(
1378        &mut self,
1379        request: crate::wire::RequestFrame,
1380    ) -> Result<crate::wire::WireDispatchResult, SidecarError> {
1381        let request = crate::wire::request_frame_to_compat(request).map_err(wire_protocol_error)?;
1382        let result = self.dispatch(request).await?;
1383        crate::wire::dispatch_result_from_compat(result).map_err(wire_protocol_error)
1384    }
1385
1386    pub async fn poll_event_wire(
1387        &mut self,
1388        ownership: &crate::wire::OwnershipScope,
1389        timeout: Duration,
1390    ) -> Result<Option<crate::wire::EventFrame>, SidecarError> {
1391        let ownership = crate::wire::ownership_scope_to_compat(ownership.clone());
1392        self.poll_event(&ownership, timeout)
1393            .await?
1394            .map(crate::wire::event_frame_from_compat)
1395            .transpose()
1396            .map_err(wire_protocol_error)
1397    }
1398
1399    async fn dispatch_extension_request(
1400        &mut self,
1401        request: &RequestFrame,
1402        envelope: ExtEnvelope,
1403    ) -> Result<DispatchResult, SidecarError> {
1404        let namespace = envelope.namespace;
1405        let Some(extension) = self.extensions.get(&namespace).cloned() else {
1406            return Ok(DispatchResult {
1407                response: self.reject(
1408                    request,
1409                    "unknown_extension",
1410                    &format!("no extension registered for namespace {namespace}"),
1411                ),
1412                events: Vec::new(),
1413            });
1414        };
1415        let snapshot = ExtensionSnapshot::new(
1416            namespace.clone(),
1417            request.ownership.clone(),
1418            self.sidecar_requests.clone(),
1419        );
1420        let ctx = ExtensionContext::new(snapshot, self);
1421        let response = extension.handle_request(ctx, envelope.payload).await?;
1422        Ok(DispatchResult {
1423            response: self.respond(
1424                request,
1425                ResponsePayload::ExtResult(ExtEnvelope {
1426                    namespace,
1427                    payload: response.payload,
1428                }),
1429            ),
1430            events: response.events,
1431        })
1432    }
1433
1434    pub async fn poll_event(
1435        &mut self,
1436        ownership: &OwnershipScope,
1437        timeout: Duration,
1438    ) -> Result<Option<EventFrame>, SidecarError> {
1439        let deadline = Instant::now() + timeout;
1440        loop {
1441            if let Some(index) = self
1442                .pending_process_events
1443                .iter()
1444                .position(|event| public_process_event_matches_ownership(self, ownership, event))
1445            {
1446                let Some(envelope) = self.pending_process_events.remove(index) else {
1447                    continue;
1448                };
1449                if let Some(frame) = self.handle_process_event_envelope(envelope)? {
1450                    return Ok(Some(frame));
1451                }
1452                continue;
1453            }
1454
1455            if !timeout.is_zero() {
1456                let _ = self.pump_process_events(ownership).await?;
1457            }
1458
1459            let queued_envelopes = {
1460                let pending_capacity = self.pending_process_event_capacity();
1461                let receiver = self.process_event_receiver.as_mut().ok_or_else(|| {
1462                    SidecarError::InvalidState(String::from("process event receiver unavailable"))
1463                })?;
1464                let mut queued = Vec::new();
1465                loop {
1466                    if queued.len() >= pending_capacity {
1467                        if receiver.is_empty() {
1468                            break;
1469                        }
1470                        return Err(process_event_queue_overflow_error());
1471                    }
1472                    match receiver.try_recv() {
1473                        Ok(envelope) => queued.push(envelope),
1474                        Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
1475                        Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break,
1476                    }
1477                }
1478                queued
1479            };
1480
1481            let mut matching_envelope = None;
1482            for envelope in queued_envelopes {
1483                if matching_envelope.is_none()
1484                    && public_process_event_matches_ownership(self, ownership, &envelope)
1485                {
1486                    matching_envelope = Some(envelope);
1487                } else {
1488                    self.queue_pending_process_event(envelope)?;
1489                }
1490            }
1491
1492            if let Some(envelope) = matching_envelope {
1493                if let Some(frame) = self.handle_process_event_envelope(envelope)? {
1494                    return Ok(Some(frame));
1495                }
1496                continue;
1497            }
1498
1499            if Instant::now() >= deadline {
1500                return Ok(None);
1501            }
1502
1503            let remaining = deadline.saturating_duration_since(Instant::now());
1504            time::sleep(remaining.min(Duration::from_millis(10))).await;
1505        }
1506    }
1507
1508    pub(crate) fn handle_process_event_envelope(
1509        &mut self,
1510        envelope: ProcessEventEnvelope,
1511    ) -> Result<Option<EventFrame>, SidecarError> {
1512        let ProcessEventEnvelope {
1513            connection_id,
1514            session_id,
1515            vm_id,
1516            process_id,
1517            event,
1518        } = envelope;
1519
1520        if matches!(event, ActiveExecutionEvent::Exited(_)) {
1521            let mut trailing = Vec::new();
1522            let mut deferred = VecDeque::new();
1523            while let Some(pending) = self.pending_process_events.pop_front() {
1524                if pending.vm_id == vm_id
1525                    && pending.process_id == process_id
1526                    && !matches!(pending.event, ActiveExecutionEvent::Exited(_))
1527                {
1528                    trailing.push(pending.event);
1529                } else {
1530                    deferred.push_back(pending);
1531                }
1532            }
1533            self.pending_process_events = deferred;
1534            let drain_limit = self
1535                .pending_process_event_capacity()
1536                .saturating_sub(trailing.len().saturating_add(1));
1537            trailing.extend(
1538                self.drain_process_events_blocking_with_limit(&vm_id, &process_id, drain_limit)?
1539                    .into_iter()
1540                    .filter(|event| !matches!(event, ActiveExecutionEvent::Exited(_))),
1541            );
1542
1543            if !trailing.is_empty() {
1544                if self.pending_process_event_capacity() < trailing.len() {
1545                    return Err(process_event_queue_overflow_error());
1546                }
1547                let emit_now = if self.pending_process_event_capacity() == trailing.len() {
1548                    Some(trailing.remove(0))
1549                } else {
1550                    None
1551                };
1552                self.queue_front_pending_process_event(ProcessEventEnvelope {
1553                    connection_id: connection_id.clone(),
1554                    session_id: session_id.clone(),
1555                    vm_id: vm_id.clone(),
1556                    process_id: process_id.clone(),
1557                    event,
1558                })?;
1559                for event in trailing.into_iter().rev() {
1560                    self.queue_front_pending_process_event(ProcessEventEnvelope {
1561                        connection_id: connection_id.clone(),
1562                        session_id: session_id.clone(),
1563                        vm_id: vm_id.clone(),
1564                        process_id: process_id.clone(),
1565                        event,
1566                    })?;
1567                }
1568                if let Some(event) = emit_now {
1569                    return self.handle_execution_event(&vm_id, &process_id, event);
1570                }
1571                return Ok(None);
1572            }
1573        }
1574
1575        self.handle_execution_event(&vm_id, &process_id, event)
1576    }
1577
1578    // try_poll_event moved to crate::execution
1579
1580    pub async fn close_session(
1581        &mut self,
1582        connection_id: &str,
1583        session_id: &str,
1584    ) -> Result<Vec<EventFrame>, SidecarError> {
1585        self.dispose_session(connection_id, session_id, DisposeReason::Requested)
1586            .await
1587    }
1588
1589    pub async fn remove_connection(
1590        &mut self,
1591        connection_id: &str,
1592    ) -> Result<Vec<EventFrame>, SidecarError> {
1593        self.require_authenticated_connection(connection_id)?;
1594
1595        let session_ids = self
1596            .connections
1597            .get(connection_id)
1598            .expect("authenticated connection should exist")
1599            .sessions
1600            .iter()
1601            .cloned()
1602            .collect::<Vec<_>>();
1603
1604        let mut events = Vec::new();
1605        for session_id in session_ids {
1606            events.extend(
1607                self.dispose_session(connection_id, &session_id, DisposeReason::ConnectionClosed)
1608                    .await?,
1609            );
1610        }
1611
1612        self.connections.remove(connection_id);
1613        Ok(events)
1614    }
1615
1616    async fn authenticate_connection(
1617        &mut self,
1618        request: &RequestFrame,
1619        payload: crate::protocol::AuthenticateRequest,
1620    ) -> Result<DispatchResult, SidecarError> {
1621        let _ = self.connection_id_for(&request.ownership)?;
1622        if let Err(error) = self.validate_auth_token(&payload.auth_token) {
1623            let mut fields = audit_fields([
1624                (String::from("source"), payload.client_name.clone()),
1625                (String::from("reason"), error.to_string()),
1626            ]);
1627            if let OwnershipScope::ConnectionOwnership(inner) = &request.ownership {
1628                fields.insert(String::from("connection_id"), inner.connection_id.clone());
1629            }
1630            emit_security_audit_event(
1631                &self.bridge,
1632                &self.config.sidecar_id,
1633                "security.auth.failed",
1634                fields,
1635            );
1636            return Err(error);
1637        }
1638
1639        if payload.protocol_version != crate::wire::PROTOCOL_VERSION {
1640            return Err(SidecarError::ProtocolVersionMismatch(format!(
1641                "sidecar protocol version mismatch: expected {}, got {}",
1642                crate::wire::PROTOCOL_VERSION,
1643                payload.protocol_version
1644            )));
1645        }
1646
1647        let expected_bridge_version = secure_exec_bridge::bridge_contract().version;
1648        if payload.bridge_version != expected_bridge_version {
1649            return Err(SidecarError::BridgeVersionMismatch(format!(
1650                "bridge contract version mismatch: expected {expected_bridge_version}, got {}",
1651                payload.bridge_version
1652            )));
1653        }
1654
1655        let connection_id = self.allocate_connection_id();
1656        self.connections.insert(
1657            connection_id.clone(),
1658            ConnectionState {
1659                auth_token: payload.auth_token,
1660                sessions: BTreeSet::new(),
1661            },
1662        );
1663
1664        let response = self.response_with_ownership(
1665            request.request_id,
1666            OwnershipScope::connection(&connection_id),
1667            ResponsePayload::Authenticated(AuthenticatedResponse {
1668                sidecar_id: self.config.sidecar_id.clone(),
1669                connection_id,
1670                max_frame_bytes: self.config.max_frame_bytes as u32,
1671            }),
1672        );
1673        Ok(DispatchResult {
1674            response,
1675            events: Vec::new(),
1676        })
1677    }
1678
1679    async fn open_session(
1680        &mut self,
1681        request: &RequestFrame,
1682        payload: OpenSessionRequest,
1683    ) -> Result<DispatchResult, SidecarError> {
1684        let connection_id = self.connection_id_for(&request.ownership)?;
1685        self.require_authenticated_connection(&connection_id)?;
1686
1687        self.next_session_id += 1;
1688        let session_id = format!("session-{}", self.next_session_id);
1689        self.sessions.insert(
1690            session_id.clone(),
1691            SessionState {
1692                connection_id: connection_id.clone(),
1693                placement: payload.placement,
1694                metadata: payload.metadata.into_iter().collect(),
1695                vm_ids: BTreeSet::new(),
1696            },
1697        );
1698        self.connections
1699            .get_mut(&connection_id)
1700            .expect("authenticated connection should exist")
1701            .sessions
1702            .insert(session_id.clone());
1703
1704        Ok(DispatchResult {
1705            response: self.respond(
1706                request,
1707                ResponsePayload::SessionOpened(SessionOpenedResponse {
1708                    session_id,
1709                    owner_connection_id: connection_id,
1710                }),
1711            ),
1712            events: Vec::new(),
1713        })
1714    }
1715
1716    // create_vm, dispose_vm, bootstrap_root_filesystem, configure_vm moved to crate::vm
1717
1718    async fn guest_filesystem_call(
1719        &mut self,
1720        request: &RequestFrame,
1721        payload: GuestFilesystemCallRequest,
1722    ) -> Result<DispatchResult, SidecarError> {
1723        filesystem_guest_filesystem_call(self, request, payload).await
1724    }
1725
1726    // snapshot_root_filesystem moved to crate::vm
1727
1728    // execute, write_stdin, close_stdin, kill_process, find_listener, find_bound_udp,
1729    // get_signal_state, get_zombie_timer_count moved to crate::execution
1730
1731    async fn dispose_session(
1732        &mut self,
1733        connection_id: &str,
1734        session_id: &str,
1735        reason: DisposeReason,
1736    ) -> Result<Vec<EventFrame>, SidecarError> {
1737        self.require_owned_session(connection_id, session_id)?;
1738
1739        let vm_ids = self
1740            .sessions
1741            .get(session_id)
1742            .expect("owned session should exist")
1743            .vm_ids
1744            .iter()
1745            .cloned()
1746            .collect::<Vec<_>>();
1747
1748        let mut events = Vec::new();
1749        for vm_id in vm_ids {
1750            events.extend(
1751                self.dispose_vm_internal(connection_id, session_id, &vm_id, reason.clone())
1752                    .await?,
1753            );
1754        }
1755
1756        self.sessions.remove(session_id);
1757        if let Some(connection) = self.connections.get_mut(connection_id) {
1758            connection.sessions.remove(session_id);
1759        }
1760        Ok(events)
1761    }
1762
1763    // dispose_vm_internal, terminate_vm_processes, wait_for_vm_processes_to_exit moved to crate::vm
1764
1765    // kill_process_internal, handle_execution_event, handle_python_vfs_rpc_request,
1766    // resolve_javascript_child_process_execution, spawn_javascript_child_process,
1767    // poll_javascript_child_process, write_javascript_child_process_stdin,
1768    // close_javascript_child_process_stdin, kill_javascript_child_process moved to crate::execution
1769
1770    pub(crate) fn handle_javascript_sync_rpc_request(
1771        &mut self,
1772        vm_id: &str,
1773        process_id: &str,
1774        request: JavascriptSyncRpcRequest,
1775    ) -> Result<(), SidecarError> {
1776        let Some(vm) = self.vms.get(vm_id) else {
1777            log_stale_process_event(&self.bridge, vm_id, process_id, "javascript sync RPC");
1778            return Ok(());
1779        };
1780        if !vm.active_processes.contains_key(process_id) {
1781            log_stale_process_event(&self.bridge, vm_id, process_id, "javascript sync RPC");
1782            return Ok(());
1783        }
1784
1785        let response: Result<Value, SidecarError> = match request.method.as_str() {
1786            "child_process.spawn" => {
1787                let Some(vm) = self.vms.get(vm_id) else {
1788                    log_stale_process_event(
1789                        &self.bridge,
1790                        vm_id,
1791                        process_id,
1792                        "javascript sync RPC child_process.spawn",
1793                    );
1794                    return Ok(());
1795                };
1796                let (payload, _) = parse_javascript_child_process_spawn_request(vm, &request.args)?;
1797                self.spawn_javascript_child_process(vm_id, process_id, payload)
1798            }
1799            "child_process.spawn_sync" => {
1800                let Some(vm) = self.vms.get(vm_id) else {
1801                    log_stale_process_event(
1802                        &self.bridge,
1803                        vm_id,
1804                        process_id,
1805                        "javascript sync RPC child_process.spawn_sync",
1806                    );
1807                    return Ok(());
1808                };
1809                let (payload, max_buffer) =
1810                    parse_javascript_child_process_spawn_request(vm, &request.args)?;
1811                self.spawn_javascript_child_process_sync(vm_id, process_id, payload, max_buffer)
1812            }
1813            "child_process.poll" => {
1814                let child_process_id =
1815                    javascript_sync_rpc_arg_str(&request.args, 0, "child_process.poll child id")?;
1816                let wait_ms = javascript_sync_rpc_arg_u64_optional(
1817                    &request.args,
1818                    1,
1819                    "child_process.poll wait ms",
1820                )?
1821                .unwrap_or_default();
1822                self.poll_javascript_child_process(vm_id, process_id, child_process_id, wait_ms)
1823            }
1824            "child_process.write_stdin" => {
1825                let child_process_id = javascript_sync_rpc_arg_str(
1826                    &request.args,
1827                    0,
1828                    "child_process.write_stdin child id",
1829                )?;
1830                let chunk = javascript_sync_rpc_bytes_arg(
1831                    &request.args,
1832                    1,
1833                    "child_process.write_stdin chunk",
1834                )?;
1835                self.write_javascript_child_process_stdin(
1836                    vm_id,
1837                    process_id,
1838                    child_process_id,
1839                    &chunk,
1840                )?;
1841                Ok(Value::Null)
1842            }
1843            "child_process.close_stdin" => {
1844                let child_process_id = javascript_sync_rpc_arg_str(
1845                    &request.args,
1846                    0,
1847                    "child_process.close_stdin child id",
1848                )?;
1849                self.close_javascript_child_process_stdin(vm_id, process_id, child_process_id)?;
1850                Ok(Value::Null)
1851            }
1852            "child_process.kill" => {
1853                let child_process_id =
1854                    javascript_sync_rpc_arg_str(&request.args, 0, "child_process.kill child id")?;
1855                let signal =
1856                    javascript_sync_rpc_arg_str(&request.args, 1, "child_process.kill signal")?;
1857                self.kill_javascript_child_process(vm_id, process_id, child_process_id, signal)?;
1858                Ok(Value::Null)
1859            }
1860            "process.kill" => {
1861                let target_pid =
1862                    javascript_sync_rpc_arg_i32(&request.args, 0, "process.kill target pid")?;
1863                let signal = javascript_sync_rpc_arg_str(&request.args, 1, "process.kill signal")?;
1864                let parsed_signal = parse_signal(signal)?;
1865                if parsed_signal == 0 {
1866                    let Some(vm) = self.vms.get(vm_id) else {
1867                        log_stale_process_event(
1868                            &self.bridge,
1869                            vm_id,
1870                            process_id,
1871                            "javascript sync RPC process.kill",
1872                        );
1873                        return Ok(());
1874                    };
1875                    if !vm.active_processes.contains_key(process_id) {
1876                        log_stale_process_event(
1877                            &self.bridge,
1878                            vm_id,
1879                            process_id,
1880                            "javascript sync RPC process.kill",
1881                        );
1882                        return Ok(());
1883                    }
1884                    vm.kernel
1885                        .signal_process(EXECUTION_DRIVER_NAME, target_pid, parsed_signal)
1886                        .map(|()| Value::Null)
1887                        .map_err(kernel_error)
1888                } else if target_pid < 0 {
1889                    let caller_kernel_pid = {
1890                        let Some(vm) = self.vms.get(vm_id) else {
1891                            log_stale_process_event(
1892                                &self.bridge,
1893                                vm_id,
1894                                process_id,
1895                                "javascript sync RPC process.kill",
1896                            );
1897                            return Ok(());
1898                        };
1899                        let Some(caller) = vm.active_processes.get(process_id) else {
1900                            log_stale_process_event(
1901                                &self.bridge,
1902                                vm_id,
1903                                process_id,
1904                                "javascript sync RPC process.kill",
1905                            );
1906                            return Ok(());
1907                        };
1908                        caller.kernel_pid
1909                    };
1910                    let pgid = target_pid.unsigned_abs();
1911                    match self.signal_vm_process_group(vm_id, caller_kernel_pid, pgid, signal) {
1912                        Ok(true) => {
1913                            Ok(self.apply_self_process_kill(vm_id, process_id, parsed_signal))
1914                        }
1915                        Ok(false) => Ok(Value::Null),
1916                        Err(error) => Err(error),
1917                    }
1918                } else {
1919                    enum ProcessKillTarget {
1920                        SelfProcess,
1921                        Child(String),
1922                        TopLevel(String),
1923                        KernelPid(u32),
1924                    }
1925                    let target = {
1926                        let Some(vm) = self.vms.get(vm_id) else {
1927                            log_stale_process_event(
1928                                &self.bridge,
1929                                vm_id,
1930                                process_id,
1931                                "javascript sync RPC process.kill",
1932                            );
1933                            return Ok(());
1934                        };
1935                        let Some(caller) = vm.active_processes.get(process_id) else {
1936                            log_stale_process_event(
1937                                &self.bridge,
1938                                vm_id,
1939                                process_id,
1940                                "javascript sync RPC process.kill",
1941                            );
1942                            return Ok(());
1943                        };
1944                        let caller_pid = i32::try_from(caller.kernel_pid).map_err(|_| {
1945                            SidecarError::InvalidState("caller pid exceeds i32".into())
1946                        })?;
1947                        if caller_pid == target_pid {
1948                            ProcessKillTarget::SelfProcess
1949                        } else if let Some((child_process_id, _)) = caller
1950                            .child_processes
1951                            .iter()
1952                            .find(|(_, child)| i32::try_from(child.kernel_pid) == Ok(target_pid))
1953                        {
1954                            ProcessKillTarget::Child(child_process_id.clone())
1955                        } else if let Some((target_process_id, _)) =
1956                            vm.active_processes.iter().find(|(_, process)| {
1957                                i32::try_from(process.kernel_pid) == Ok(target_pid)
1958                            })
1959                        {
1960                            ProcessKillTarget::TopLevel(target_process_id.clone())
1961                        } else {
1962                            let target_kernel_pid = u32::try_from(target_pid).map_err(|_| {
1963                                SidecarError::InvalidState(format!(
1964                                    "EINVAL: invalid process pid {target_pid}"
1965                                ))
1966                            })?;
1967                            ProcessKillTarget::KernelPid(target_kernel_pid)
1968                        }
1969                    };
1970                    match target {
1971                        ProcessKillTarget::SelfProcess => {
1972                            Ok(self.apply_self_process_kill(vm_id, process_id, parsed_signal))
1973                        }
1974                        ProcessKillTarget::Child(child_process_id) => {
1975                            self.kill_javascript_child_process(
1976                                vm_id,
1977                                process_id,
1978                                &child_process_id,
1979                                signal,
1980                            )?;
1981                            Ok(Value::Null)
1982                        }
1983                        ProcessKillTarget::TopLevel(target_process_id) => {
1984                            self.kill_process_internal(vm_id, &target_process_id, signal)?;
1985                            Ok(Value::Null)
1986                        }
1987                        ProcessKillTarget::KernelPid(target_kernel_pid) => {
1988                            // Grandchildren and untracked kernel processes are
1989                            // resolved VM-wide instead of failing with an
1990                            // unknown-pid error.
1991                            self.signal_vm_kernel_pid(vm_id, target_kernel_pid, signal)
1992                                .map(|()| Value::Null)
1993                        }
1994                    }
1995                }
1996            }
1997            "process.signal_state" => {
1998                let signal =
1999                    javascript_sync_rpc_arg_u32(&request.args, 0, "process.signal_state signal")?;
2000                let action =
2001                    javascript_sync_rpc_arg_str(&request.args, 1, "process.signal_state action")?;
2002                let mask_json =
2003                    javascript_sync_rpc_arg_str(&request.args, 2, "process.signal_state mask")?;
2004                let flags =
2005                    javascript_sync_rpc_arg_u32(&request.args, 3, "process.signal_state flags")?;
2006                let mask: Vec<u32> = serde_json::from_str(mask_json).map_err(|error| {
2007                    SidecarError::InvalidState(format!(
2008                        "process.signal_state mask must be valid JSON: {error}"
2009                    ))
2010                })?;
2011                let action = match action.trim().to_ascii_lowercase().as_str() {
2012                    "default" => SignalDispositionAction::Default,
2013                    "ignore" => SignalDispositionAction::Ignore,
2014                    "user" => SignalDispositionAction::User,
2015                    other => {
2016                        return Err(SidecarError::InvalidState(format!(
2017                            "unsupported process.signal_state action {other}"
2018                        )));
2019                    }
2020                };
2021                let Some(vm) = self.vms.get_mut(vm_id) else {
2022                    log_stale_process_event(
2023                        &self.bridge,
2024                        vm_id,
2025                        process_id,
2026                        "javascript sync RPC process.signal_state",
2027                    );
2028                    return Ok(());
2029                };
2030                if action == SignalDispositionAction::Default && mask.is_empty() && flags == 0 {
2031                    let remove_process_entry = vm
2032                        .signal_states
2033                        .get_mut(process_id)
2034                        .map(|handlers| {
2035                            handlers.remove(&signal);
2036                            handlers.is_empty()
2037                        })
2038                        .unwrap_or(false);
2039                    if remove_process_entry {
2040                        vm.signal_states.remove(process_id);
2041                    }
2042                } else {
2043                    vm.signal_states
2044                        .entry(process_id.to_owned())
2045                        .or_default()
2046                        .insert(
2047                            signal,
2048                            SignalHandlerRegistration {
2049                                action,
2050                                mask,
2051                                flags,
2052                            },
2053                        );
2054                }
2055                Ok(Value::Null)
2056            }
2057            _ => {
2058                let Some(vm) = self.vms.get_mut(vm_id) else {
2059                    log_stale_process_event(
2060                        &self.bridge,
2061                        vm_id,
2062                        process_id,
2063                        "javascript sync RPC bridge dispatch",
2064                    );
2065                    return Ok(());
2066                };
2067                let resource_limits = vm.kernel.resource_limits().clone();
2068                let network_counts = vm_network_resource_counts(vm);
2069                let socket_paths = build_javascript_socket_path_context(vm)?;
2070                let Some(process) = vm.active_processes.get_mut(process_id) else {
2071                    log_stale_process_event(
2072                        &self.bridge,
2073                        vm_id,
2074                        process_id,
2075                        "javascript sync RPC bridge dispatch",
2076                    );
2077                    return Ok(());
2078                };
2079                service_javascript_sync_rpc(JavascriptSyncRpcServiceRequest {
2080                    bridge: &self.bridge,
2081                    vm_id,
2082                    dns: &vm.dns,
2083                    socket_paths: &socket_paths,
2084                    kernel: &mut vm.kernel,
2085                    process,
2086                    sync_request: &request,
2087                    resource_limits: &resource_limits,
2088                    network_counts,
2089                })
2090            }
2091        };
2092
2093        let Some(vm) = self.vms.get_mut(vm_id) else {
2094            log_stale_process_event(
2095                &self.bridge,
2096                vm_id,
2097                process_id,
2098                "javascript sync RPC response delivery",
2099            );
2100            return Ok(());
2101        };
2102        let shadow_root = vm.cwd.clone();
2103        let Some(process) = vm.active_processes.get_mut(process_id) else {
2104            log_stale_process_event(
2105                &self.bridge,
2106                vm_id,
2107                process_id,
2108                "javascript sync RPC response delivery",
2109            );
2110            return Ok(());
2111        };
2112
2113        if response.is_ok()
2114            && matches!(
2115                request.method.as_str(),
2116                "fs.chmodSync" | "fs.promises.chmod"
2117            )
2118        {
2119            let guest_path =
2120                javascript_sync_rpc_arg_str(&request.args, 0, "filesystem chmod path")?;
2121            let mode =
2122                javascript_sync_rpc_arg_u32(&request.args, 1, "filesystem chmod mode")? & 0o7777;
2123            let host_path =
2124                shadow_host_path_for_process(&shadow_root, &process.guest_cwd, guest_path);
2125            if host_path.exists() {
2126                fs::set_permissions(&host_path, fs::Permissions::from_mode(mode)).map_err(
2127                    |error| {
2128                        SidecarError::Io(format!(
2129                            "failed to mirror chmod to shadow path {}: {error}",
2130                            host_path.display()
2131                        ))
2132                    },
2133                )?;
2134            }
2135        }
2136
2137        match response {
2138            Ok(result) => process
2139                .execution
2140                .respond_javascript_sync_rpc_success(request.id, result)
2141                .or_else(ignore_stale_javascript_sync_rpc_response),
2142            Err(error) => process
2143                .execution
2144                .respond_javascript_sync_rpc_error(
2145                    request.id,
2146                    javascript_sync_rpc_error_code(&error),
2147                    error.to_string(),
2148                )
2149                .or_else(ignore_stale_javascript_sync_rpc_response),
2150        }
2151    }
2152
2153    /// Applies a `process.kill` aimed at the calling process itself and
2154    /// returns the self-delivery action payload for the bridge.
2155    fn apply_self_process_kill(
2156        &mut self,
2157        vm_id: &str,
2158        process_id: &str,
2159        parsed_signal: i32,
2160    ) -> Value {
2161        let action = self
2162            .vms
2163            .get(vm_id)
2164            .and_then(|vm| vm.signal_states.get(process_id))
2165            .and_then(|handlers| handlers.get(&(parsed_signal as u32)))
2166            .map(|registration| registration.action.clone())
2167            .unwrap_or(SignalDispositionAction::Default);
2168        if action == SignalDispositionAction::Default
2169            && parsed_signal != 0
2170            && !matches!(
2171                canonical_signal_name(parsed_signal),
2172                Some("SIGWINCH" | "SIGCHLD" | "SIGCONT" | "SIGURG")
2173            )
2174        {
2175            if let Some(vm) = self.vms.get_mut(vm_id) {
2176                if let Some(process) = vm.active_processes.get_mut(process_id) {
2177                    process.pending_self_signal_exit = Some(parsed_signal);
2178                }
2179            }
2180        }
2181        json!({
2182            "self": true,
2183            "action": match action {
2184                SignalDispositionAction::Default => "default",
2185                SignalDispositionAction::Ignore => "ignore",
2186                SignalDispositionAction::User => "user",
2187            },
2188        })
2189    }
2190
2191    pub(crate) fn vm_ids_for_scope(
2192        &self,
2193        ownership: &OwnershipScope,
2194    ) -> Result<Vec<String>, SidecarError> {
2195        match ownership {
2196            OwnershipScope::SessionOwnership(inner) => {
2197                self.require_owned_session(&inner.connection_id, &inner.session_id)?;
2198                Ok(self
2199                    .sessions
2200                    .get(&inner.session_id)
2201                    .expect("owned session should exist")
2202                    .vm_ids
2203                    .iter()
2204                    .cloned()
2205                    .collect())
2206            }
2207            OwnershipScope::VmOwnership(inner) => {
2208                self.require_owned_vm(&inner.connection_id, &inner.session_id, &inner.vm_id)?;
2209                Ok(vec![inner.vm_id.clone()])
2210            }
2211            OwnershipScope::ConnectionOwnership(..) => Err(SidecarError::InvalidState(
2212                String::from("event polling requires session or VM ownership scope"),
2213            )),
2214        }
2215    }
2216
2217    pub(crate) fn vm_ownership(&self, vm_id: &str) -> Result<OwnershipScope, SidecarError> {
2218        let vm = self
2219            .vms
2220            .get(vm_id)
2221            .ok_or_else(|| SidecarError::InvalidState(format!("unknown sidecar VM {vm_id}")))?;
2222        Ok(OwnershipScope::vm(&vm.connection_id, &vm.session_id, vm_id))
2223    }
2224
2225    pub(crate) fn vm_has_active_processes(&self, vm_id: &str) -> bool {
2226        self.vms
2227            .get(vm_id)
2228            .is_some_and(|vm| !vm.active_processes.is_empty())
2229    }
2230
2231    fn require_authenticated_connection(&self, connection_id: &str) -> Result<(), SidecarError> {
2232        if self.connections.contains_key(connection_id) {
2233            Ok(())
2234        } else {
2235            Err(SidecarError::InvalidState(format!(
2236                "connection {connection_id} has not authenticated"
2237            )))
2238        }
2239    }
2240
2241    pub(crate) fn require_owned_session(
2242        &self,
2243        connection_id: &str,
2244        session_id: &str,
2245    ) -> Result<(), SidecarError> {
2246        self.require_authenticated_connection(connection_id)?;
2247        let session = self.sessions.get(session_id).ok_or_else(|| {
2248            SidecarError::InvalidState(format!("unknown sidecar session {session_id}"))
2249        })?;
2250        if session.connection_id == connection_id {
2251            Ok(())
2252        } else {
2253            Err(SidecarError::InvalidState(format!(
2254                "session {session_id} is not owned by connection {connection_id}"
2255            )))
2256        }
2257    }
2258
2259    pub(crate) fn require_owned_vm(
2260        &self,
2261        connection_id: &str,
2262        session_id: &str,
2263        vm_id: &str,
2264    ) -> Result<(), SidecarError> {
2265        self.require_owned_session(connection_id, session_id)?;
2266        let vm = self
2267            .vms
2268            .get(vm_id)
2269            .ok_or_else(|| SidecarError::InvalidState(format!("unknown sidecar VM {vm_id}")))?;
2270        if vm.connection_id != connection_id || vm.session_id != session_id {
2271            return Err(SidecarError::InvalidState(format!(
2272                "VM {vm_id} is not owned by {connection_id}/{session_id}"
2273            )));
2274        }
2275        Ok(())
2276    }
2277
2278    fn connection_id_for(&self, ownership: &OwnershipScope) -> Result<String, SidecarError> {
2279        match ownership {
2280            OwnershipScope::ConnectionOwnership(inner) => Ok(inner.connection_id.clone()),
2281            OwnershipScope::SessionOwnership(..) | OwnershipScope::VmOwnership(..) => {
2282                Err(SidecarError::InvalidState(String::from(
2283                    "request requires connection ownership scope",
2284                )))
2285            }
2286        }
2287    }
2288
2289    fn validate_auth_token(&self, auth_token: &str) -> Result<(), SidecarError> {
2290        let Some(expected_auth_token) = self.config.expected_auth_token.as_deref() else {
2291            return Ok(());
2292        };
2293
2294        if auth_token == expected_auth_token {
2295            Ok(())
2296        } else {
2297            Err(SidecarError::Unauthorized(String::from(
2298                "authenticate request provided an invalid auth token",
2299            )))
2300        }
2301    }
2302
2303    fn allocate_connection_id(&mut self) -> String {
2304        self.next_connection_id += 1;
2305        format!("conn-{}", self.next_connection_id)
2306    }
2307
2308    fn take_matching_process_event_envelope(
2309        &mut self,
2310        vm_id: &str,
2311        process_id: &str,
2312    ) -> Result<Option<ProcessEventEnvelope>, SidecarError> {
2313        if let Some(index) = self
2314            .pending_process_events
2315            .iter()
2316            .position(|event| event.vm_id == vm_id && event.process_id == process_id)
2317        {
2318            return Ok(self.pending_process_events.remove(index));
2319        }
2320
2321        let mut matching_envelope = None;
2322        let mut deferred = Vec::new();
2323        {
2324            let pending_capacity = self.pending_process_event_capacity();
2325            let receiver = self.process_event_receiver.as_mut().ok_or_else(|| {
2326                SidecarError::InvalidState(String::from("process event receiver unavailable"))
2327            })?;
2328            loop {
2329                if deferred.len() >= pending_capacity {
2330                    if receiver.is_empty() {
2331                        break;
2332                    }
2333                    return Err(process_event_queue_overflow_error());
2334                }
2335                let envelope = match receiver.try_recv() {
2336                    Ok(envelope) => envelope,
2337                    Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
2338                    Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break,
2339                };
2340                if matching_envelope.is_none()
2341                    && envelope.vm_id == vm_id
2342                    && envelope.process_id == process_id
2343                {
2344                    matching_envelope = Some(envelope);
2345                    break;
2346                }
2347                deferred.push(envelope);
2348            }
2349        }
2350        for envelope in deferred {
2351            self.queue_pending_process_event(envelope)?;
2352        }
2353
2354        Ok(matching_envelope)
2355    }
2356
2357    fn allocate_sidecar_request_id(&mut self) -> RequestId {
2358        let request_id = self.next_sidecar_request_id;
2359        self.next_sidecar_request_id -= 1;
2360        request_id
2361    }
2362
2363    pub(crate) fn session_scope_for(
2364        &self,
2365        ownership: &OwnershipScope,
2366    ) -> Result<(String, String), SidecarError> {
2367        match ownership {
2368            OwnershipScope::SessionOwnership(inner) => {
2369                Ok((inner.connection_id.clone(), inner.session_id.clone()))
2370            }
2371            OwnershipScope::ConnectionOwnership(..) | OwnershipScope::VmOwnership(..) => {
2372                Err(SidecarError::InvalidState(String::from(
2373                    "request requires session ownership scope",
2374                )))
2375            }
2376        }
2377    }
2378
2379    pub(crate) fn vm_scope_for(
2380        &self,
2381        ownership: &OwnershipScope,
2382    ) -> Result<(String, String, String), SidecarError> {
2383        match ownership {
2384            OwnershipScope::VmOwnership(inner) => Ok((
2385                inner.connection_id.clone(),
2386                inner.session_id.clone(),
2387                inner.vm_id.clone(),
2388            )),
2389            OwnershipScope::ConnectionOwnership(..) | OwnershipScope::SessionOwnership(..) => Err(
2390                SidecarError::InvalidState(String::from("request requires VM ownership scope")),
2391            ),
2392        }
2393    }
2394
2395    fn response_with_ownership(
2396        &self,
2397        request_id: RequestId,
2398        ownership: OwnershipScope,
2399        payload: ResponsePayload,
2400    ) -> ResponseFrame {
2401        ResponseFrame {
2402            schema: ProtocolSchema::current(),
2403            request_id,
2404            ownership,
2405            payload,
2406        }
2407    }
2408
2409    pub(crate) fn respond(
2410        &self,
2411        request: &RequestFrame,
2412        payload: ResponsePayload,
2413    ) -> ResponseFrame {
2414        self.response_with_ownership(request.request_id, request.ownership.clone(), payload)
2415    }
2416
2417    fn reject(&self, request: &RequestFrame, code: &str, message: &str) -> ResponseFrame {
2418        self.respond(
2419            request,
2420            ResponsePayload::Rejected(RejectedResponse {
2421                code: code.to_owned(),
2422                message: message.to_owned(),
2423            }),
2424        )
2425    }
2426
2427    pub fn queue_sidecar_request(
2428        &mut self,
2429        ownership: OwnershipScope,
2430        payload: SidecarRequestPayload,
2431    ) -> Result<RequestId, SidecarError> {
2432        if self.outbound_sidecar_requests.len() >= MAX_OUTBOUND_SIDECAR_REQUESTS {
2433            return Err(outbound_sidecar_request_queue_overflow_error());
2434        }
2435        if self.pending_sidecar_responses.pending_count() >= MAX_PENDING_SIDECAR_RESPONSES {
2436            return Err(sidecar_response_pending_overflow_error());
2437        }
2438        let request_id = self.allocate_sidecar_request_id();
2439        let request = SidecarRequestFrame::new(request_id, ownership, payload);
2440        self.pending_sidecar_responses
2441            .register_request(&request)
2442            .map_err(sidecar_response_tracker_error)?;
2443        self.outbound_sidecar_requests.push_back(request);
2444        Ok(request_id)
2445    }
2446
2447    pub fn queue_wire_sidecar_request(
2448        &mut self,
2449        ownership: crate::wire::OwnershipScope,
2450        payload: crate::wire::SidecarRequestPayload,
2451    ) -> Result<crate::wire::RequestId, SidecarError> {
2452        let ownership = crate::wire::ownership_scope_to_compat(ownership);
2453        let payload = crate::wire::sidecar_request_payload_to_compat(&ownership, payload)
2454            .map_err(wire_protocol_error)?;
2455        self.queue_sidecar_request(ownership, payload)
2456    }
2457
2458    pub fn pop_sidecar_request(&mut self) -> Option<SidecarRequestFrame> {
2459        self.outbound_sidecar_requests.pop_front()
2460    }
2461
2462    pub fn pop_wire_sidecar_request(
2463        &mut self,
2464    ) -> Result<Option<crate::wire::SidecarRequestFrame>, SidecarError> {
2465        self.pop_sidecar_request()
2466            .map(crate::wire::sidecar_request_frame_from_compat)
2467            .transpose()
2468            .map_err(wire_protocol_error)
2469    }
2470
2471    pub fn accept_sidecar_response(
2472        &mut self,
2473        response: SidecarResponseFrame,
2474    ) -> Result<(), SidecarError> {
2475        self.pending_sidecar_responses
2476            .accept_response(&response)
2477            .map_err(sidecar_response_tracker_error)?;
2478        self.completed_sidecar_response_order
2479            .push_back(response.request_id);
2480        self.completed_sidecar_responses
2481            .insert(response.request_id, response);
2482        while self.completed_sidecar_responses.len() > MAX_COMPLETED_SIDECAR_RESPONSES {
2483            if let Some(evicted) = self.completed_sidecar_response_order.pop_front() {
2484                self.completed_sidecar_responses.remove(&evicted);
2485            }
2486        }
2487        Ok(())
2488    }
2489
2490    pub fn accept_wire_sidecar_response(
2491        &mut self,
2492        response: crate::wire::SidecarResponseFrame,
2493    ) -> Result<(), SidecarError> {
2494        let response =
2495            crate::wire::sidecar_response_frame_to_compat(response).map_err(wire_protocol_error)?;
2496        self.accept_sidecar_response(response)
2497    }
2498
2499    pub fn take_sidecar_response(&mut self, request_id: RequestId) -> Option<SidecarResponseFrame> {
2500        let response = self.completed_sidecar_responses.remove(&request_id);
2501        if response.is_some() {
2502            self.completed_sidecar_response_order
2503                .retain(|completed_id| completed_id != &request_id);
2504        }
2505        response
2506    }
2507
2508    pub fn take_wire_sidecar_response(
2509        &mut self,
2510        request_id: crate::wire::RequestId,
2511    ) -> Result<Option<crate::wire::SidecarResponseFrame>, SidecarError> {
2512        self.take_sidecar_response(request_id)
2513            .map(|response| {
2514                crate::wire::sidecar_response_frame_from_compat(response)
2515                    .map_err(wire_protocol_error)
2516            })
2517            .transpose()
2518    }
2519
2520    pub(crate) fn vm_lifecycle_event(
2521        &self,
2522        connection_id: &str,
2523        session_id: &str,
2524        vm_id: &str,
2525        state: VmLifecycleState,
2526    ) -> EventFrame {
2527        EventFrame::new(
2528            OwnershipScope::vm(connection_id, session_id, vm_id),
2529            EventPayload::VmLifecycle(VmLifecycleEvent { state }),
2530        )
2531    }
2532
2533    fn ensure_request_within_frame_limit(
2534        &self,
2535        request: &RequestFrame,
2536    ) -> Result<(), SidecarError> {
2537        let frame = crate::protocol::to_generated_protocol_frame(
2538            &crate::protocol::ProtocolFrame::Request(request.clone()),
2539        )
2540        .map_err(|error| {
2541            SidecarError::InvalidState(format!("failed to convert request frame: {error}"))
2542        })?;
2543        let crate::wire::ProtocolFrame::RequestFrame(_) = &frame else {
2544            return Err(SidecarError::InvalidState(String::from(
2545                "request converted to non-request wire frame",
2546            )));
2547        };
2548
2549        crate::wire::WireFrameCodec::new(self.config.max_frame_bytes)
2550            .encode(&frame)
2551            .map(|_| ())
2552            .map_err(|error| SidecarError::FrameTooLarge(error.to_string()))
2553    }
2554}
2555
2556impl<B> ExtensionHost for NativeSidecar<B>
2557where
2558    B: NativeSidecarBridge + Send + 'static,
2559    BridgeError<B>: fmt::Debug + Send + Sync + 'static,
2560{
2561    fn spawn_process<'a>(
2562        &'a mut self,
2563        ownership: OwnershipScope,
2564        payload: ExecuteRequest,
2565    ) -> ExtensionFuture<'a, ProcessStartedResponse> {
2566        Box::pin(async move {
2567            let request = RequestFrame::new(0, ownership, RequestPayload::Execute(payload.clone()));
2568            let dispatch = NativeSidecar::execute(self, &request, payload).await?;
2569            match dispatch.response.payload {
2570                ResponsePayload::ProcessStarted(response) => Ok(response),
2571                other => Err(unexpected_extension_host_response("execute", other)),
2572            }
2573        })
2574    }
2575
2576    fn write_stdin<'a>(
2577        &'a mut self,
2578        ownership: OwnershipScope,
2579        payload: WriteStdinRequest,
2580    ) -> ExtensionFuture<'a, StdinWrittenResponse> {
2581        Box::pin(async move {
2582            let request =
2583                RequestFrame::new(0, ownership, RequestPayload::WriteStdin(payload.clone()));
2584            let dispatch = NativeSidecar::write_stdin(self, &request, payload).await?;
2585            match dispatch.response.payload {
2586                ResponsePayload::StdinWritten(response) => Ok(response),
2587                other => Err(unexpected_extension_host_response("write_stdin", other)),
2588            }
2589        })
2590    }
2591
2592    fn close_stdin<'a>(
2593        &'a mut self,
2594        ownership: OwnershipScope,
2595        payload: CloseStdinRequest,
2596    ) -> ExtensionFuture<'a, StdinClosedResponse> {
2597        Box::pin(async move {
2598            let request =
2599                RequestFrame::new(0, ownership, RequestPayload::CloseStdin(payload.clone()));
2600            let dispatch = NativeSidecar::close_stdin(self, &request, payload).await?;
2601            match dispatch.response.payload {
2602                ResponsePayload::StdinClosed(response) => Ok(response),
2603                other => Err(unexpected_extension_host_response("close_stdin", other)),
2604            }
2605        })
2606    }
2607
2608    fn kill_process<'a>(
2609        &'a mut self,
2610        ownership: OwnershipScope,
2611        payload: KillProcessRequest,
2612    ) -> ExtensionFuture<'a, ProcessKilledResponse> {
2613        Box::pin(async move {
2614            let request =
2615                RequestFrame::new(0, ownership, RequestPayload::KillProcess(payload.clone()));
2616            let dispatch = NativeSidecar::kill_process(self, &request, payload).await?;
2617            match dispatch.response.payload {
2618                ResponsePayload::ProcessKilled(response) => Ok(response),
2619                other => Err(unexpected_extension_host_response("kill_process", other)),
2620            }
2621        })
2622    }
2623
2624    fn poll_event<'a>(
2625        &'a mut self,
2626        ownership: OwnershipScope,
2627        timeout: Duration,
2628    ) -> ExtensionFuture<'a, Option<EventFrame>> {
2629        Box::pin(async move { NativeSidecar::poll_event(self, &ownership, timeout).await })
2630    }
2631
2632    fn guest_filesystem_call<'a>(
2633        &'a mut self,
2634        ownership: OwnershipScope,
2635        payload: GuestFilesystemCallRequest,
2636    ) -> ExtensionFuture<'a, GuestFilesystemResultResponse> {
2637        Box::pin(async move {
2638            let request = RequestFrame::new(
2639                0,
2640                ownership,
2641                RequestPayload::GuestFilesystemCall(payload.clone()),
2642            );
2643            let dispatch = NativeSidecar::guest_filesystem_call(self, &request, payload).await?;
2644            match dispatch.response.payload {
2645                ResponsePayload::GuestFilesystemResult(response) => Ok(response),
2646                other => Err(unexpected_extension_host_response(
2647                    "guest_filesystem_call",
2648                    other,
2649                )),
2650            }
2651        })
2652    }
2653
2654    fn bind_process_to_session<'a>(
2655        &'a mut self,
2656        ownership: OwnershipScope,
2657        namespace: String,
2658        ext_session_id: String,
2659        process_id: String,
2660    ) -> ExtensionFuture<'a, ()> {
2661        Box::pin(async move {
2662            self.bind_extension_process_resource(ownership, namespace, ext_session_id, process_id)
2663        })
2664    }
2665
2666    fn bind_vm_to_session<'a>(
2667        &'a mut self,
2668        ownership: OwnershipScope,
2669        namespace: String,
2670        ext_session_id: String,
2671    ) -> ExtensionFuture<'a, ()> {
2672        Box::pin(
2673            async move { self.bind_extension_vm_resource(ownership, namespace, ext_session_id) },
2674        )
2675    }
2676
2677    fn dispose_session_resources<'a>(
2678        &'a mut self,
2679        ownership: OwnershipScope,
2680        namespace: String,
2681        ext_session_id: String,
2682    ) -> ExtensionFuture<'a, Vec<EventFrame>> {
2683        Box::pin(async move {
2684            let key = (namespace, ext_session_id);
2685            let Some(resources) = self.extension_sessions.get(&key) else {
2686                return Ok(Vec::new());
2687            };
2688            if resources.ownership != ownership {
2689                return Err(SidecarError::InvalidState(String::from(
2690                    "extension session ownership did not match dispose request",
2691                )));
2692            }
2693            let resources = self
2694                .extension_sessions
2695                .remove(&key)
2696                .expect("extension resources existed before removal");
2697            let (connection_id, session_id, vm_id) = self.vm_scope_for(&ownership)?;
2698            for process_id in resources.process_ids {
2699                if self
2700                    .vms
2701                    .get(&vm_id)
2702                    .is_some_and(|vm| vm.active_processes.contains_key(&process_id))
2703                {
2704                    self.kill_process_internal(&vm_id, &process_id, "SIGTERM")?;
2705                }
2706            }
2707            let mut events = Vec::new();
2708            for resource_vm_id in resources.vm_ids {
2709                if self.vms.contains_key(&resource_vm_id) {
2710                    events.extend(
2711                        self.dispose_vm_internal(
2712                            &connection_id,
2713                            &session_id,
2714                            &resource_vm_id,
2715                            DisposeReason::Requested,
2716                        )
2717                        .await?,
2718                    );
2719                }
2720            }
2721            Ok(events)
2722        })
2723    }
2724
2725    fn start_buffering_process_output<'a>(
2726        &'a mut self,
2727        ownership: OwnershipScope,
2728        process_id: String,
2729    ) -> ExtensionFuture<'a, ()> {
2730        Box::pin(async move {
2731            let (connection_id, session_id, vm_id) = self.vm_scope_for(&ownership)?;
2732            self.require_owned_vm(&connection_id, &session_id, &vm_id)?;
2733            let key = (vm_id, process_id);
2734            if self.extension_process_output_buffers.contains_key(&key) {
2735                return Err(SidecarError::Conflict(String::from(
2736                    "extension process output buffering already started",
2737                )));
2738            }
2739            self.extension_process_output_buffers
2740                .insert(key, ExtensionBufferedProcessOutput::default());
2741            Ok(())
2742        })
2743    }
2744
2745    fn handoff_buffered_process_output<'a>(
2746        &'a mut self,
2747        ownership: OwnershipScope,
2748        namespace: String,
2749        ext_session_id: String,
2750        process_id: String,
2751        timeout: Duration,
2752    ) -> ExtensionFuture<'a, ExtensionBufferedProcessOutput> {
2753        Box::pin(async move {
2754            let (connection_id, session_id, vm_id) = self.vm_scope_for(&ownership)?;
2755            self.require_owned_vm(&connection_id, &session_id, &vm_id)?;
2756            let key = (vm_id.clone(), process_id.clone());
2757            let deadline = Instant::now() + timeout;
2758            loop {
2759                self.pump_process_events(&ownership).await?;
2760                while let Some(envelope) =
2761                    self.take_matching_process_event_envelope(&vm_id, &process_id)?
2762                {
2763                    if self.capture_extension_process_output_event(
2764                        &vm_id,
2765                        &process_id,
2766                        &envelope.event,
2767                    ) {
2768                        continue;
2769                    }
2770                    self.queue_pending_process_event(envelope)?;
2771                    break;
2772                }
2773                let buffered = self
2774                    .extension_process_output_buffers
2775                    .get(&key)
2776                    .is_some_and(|buffer| !buffer.stdout.is_empty() || !buffer.stderr.is_empty());
2777                if buffered || timeout.is_zero() || Instant::now() >= deadline {
2778                    break;
2779                }
2780                let remaining = deadline.saturating_duration_since(Instant::now());
2781                time::sleep(remaining.min(Duration::from_millis(10))).await;
2782            }
2783            self.bind_extension_process_resource(
2784                ownership,
2785                namespace,
2786                ext_session_id,
2787                process_id.clone(),
2788            )?;
2789            self.extension_process_output_buffers
2790                .remove(&key)
2791                .ok_or_else(|| {
2792                    SidecarError::InvalidState(String::from(
2793                        "extension process output buffering was not started",
2794                    ))
2795                })
2796        })
2797    }
2798}
2799
2800fn unexpected_extension_host_response(operation: &str, payload: ResponsePayload) -> SidecarError {
2801    match payload {
2802        ResponsePayload::Rejected(response) => SidecarError::InvalidState(format!(
2803            "extension {operation} rejected with {}: {}",
2804            response.code, response.message
2805        )),
2806        other => SidecarError::InvalidState(format!(
2807            "extension {operation} returned unexpected response: {other:?}"
2808        )),
2809    }
2810}
2811
2812fn shadow_host_path_for_process(
2813    shadow_root: &Path,
2814    process_guest_cwd: &str,
2815    guest_path: &str,
2816) -> PathBuf {
2817    let normalized_guest_path = if guest_path.starts_with('/') {
2818        normalize_path(guest_path)
2819    } else {
2820        normalize_path(&format!(
2821            "{}/{}",
2822            process_guest_cwd.trim_end_matches('/'),
2823            guest_path
2824        ))
2825    };
2826    if normalized_guest_path == "/" {
2827        shadow_root.to_path_buf()
2828    } else {
2829        shadow_root.join(normalized_guest_path.trim_start_matches('/'))
2830    }
2831}
2832
2833fn sidecar_response_tracker_error(error: SidecarResponseTrackerError) -> SidecarError {
2834    SidecarError::InvalidState(format!(
2835        "invalid sidecar response correlation state: {error}"
2836    ))
2837}
2838
2839fn map_bridge_permission(decision: secure_exec_bridge::PermissionDecision) -> PermissionDecision {
2840    match decision.verdict {
2841        secure_exec_bridge::PermissionVerdict::Allow => PermissionDecision::allow(),
2842        secure_exec_bridge::PermissionVerdict::Deny => PermissionDecision::deny(
2843            decision
2844                .reason
2845                .unwrap_or_else(|| String::from("denied by host")),
2846        ),
2847        secure_exec_bridge::PermissionVerdict::Prompt => PermissionDecision::deny(
2848            decision
2849                .reason
2850                .unwrap_or_else(|| String::from("permission prompt required")),
2851        ),
2852    }
2853}
2854
2855fn audit_timestamp() -> String {
2856    SystemTime::now()
2857        .duration_since(UNIX_EPOCH)
2858        .expect("system time before unix epoch")
2859        .as_millis()
2860        .to_string()
2861}
2862
2863pub(crate) fn audit_fields<I, K, V>(fields: I) -> BTreeMap<String, String>
2864where
2865    I: IntoIterator<Item = (K, V)>,
2866    K: Into<String>,
2867    V: Into<String>,
2868{
2869    let mut mapped = BTreeMap::from([(String::from("timestamp"), audit_timestamp())]);
2870    for (key, value) in fields {
2871        mapped.insert(key.into(), value.into());
2872    }
2873    mapped
2874}
2875
2876pub(crate) fn emit_structured_event<B>(
2877    bridge: &SharedBridge<B>,
2878    vm_id: &str,
2879    name: &str,
2880    fields: BTreeMap<String, String>,
2881) -> Result<(), SidecarError>
2882where
2883    B: NativeSidecarBridge + Send + 'static,
2884    BridgeError<B>: fmt::Debug + Send + Sync + 'static,
2885{
2886    bridge.with_mut(|bridge| {
2887        bridge.emit_structured_event(StructuredEventRecord {
2888            vm_id: vm_id.to_owned(),
2889            name: name.to_owned(),
2890            fields,
2891        })
2892    })
2893}
2894
2895pub(crate) fn emit_security_audit_event<B>(
2896    bridge: &SharedBridge<B>,
2897    vm_id: &str,
2898    name: &str,
2899    fields: BTreeMap<String, String>,
2900) where
2901    B: NativeSidecarBridge + Send + 'static,
2902    BridgeError<B>: fmt::Debug + Send + Sync + 'static,
2903{
2904    let _ = emit_structured_event(bridge, vm_id, name, fields);
2905}
2906
2907pub(crate) fn log_stale_process_event<B>(
2908    bridge: &SharedBridge<B>,
2909    vm_id: &str,
2910    process_id: &str,
2911    context: &str,
2912) where
2913    B: NativeSidecarBridge + Send + 'static,
2914    BridgeError<B>: fmt::Debug + Send + Sync + 'static,
2915{
2916    let _ = bridge.emit_log(
2917        vm_id,
2918        format!(
2919            "Ignoring stale process event during {context}: VM {vm_id} process {process_id} was already reaped"
2920        ),
2921    );
2922}
2923
2924// filesystem_operation_label moved to crate::vm
2925
2926pub(crate) fn root_filesystem_error(error: impl std::fmt::Display) -> SidecarError {
2927    SidecarError::InvalidState(format!("root filesystem: {error}"))
2928}
2929
2930pub(crate) fn normalize_path(path: &str) -> String {
2931    let mut segments = Vec::new();
2932    for component in Path::new(path).components() {
2933        match component {
2934            Component::RootDir => segments.clear(),
2935            Component::ParentDir => {
2936                segments.pop();
2937            }
2938            Component::CurDir => {}
2939            Component::Normal(value) => segments.push(value.to_string_lossy().into_owned()),
2940            Component::Prefix(prefix) => {
2941                segments.push(prefix.as_os_str().to_string_lossy().into_owned());
2942            }
2943        }
2944    }
2945
2946    let normalized = format!("/{}", segments.join("/"));
2947    if normalized.is_empty() {
2948        String::from("/")
2949    } else {
2950        normalized
2951    }
2952}
2953
2954pub(crate) fn normalize_host_path(path: &Path) -> PathBuf {
2955    let mut normalized = PathBuf::new();
2956
2957    for component in path.components() {
2958        match component {
2959            Component::Prefix(prefix) => normalized.push(prefix.as_os_str()),
2960            Component::RootDir => normalized.push(Path::new("/")),
2961            Component::CurDir => {}
2962            Component::ParentDir => {
2963                if normalized != Path::new("/") {
2964                    normalized.pop();
2965                }
2966            }
2967            Component::Normal(part) => normalized.push(part),
2968        }
2969    }
2970
2971    if normalized.as_os_str().is_empty() {
2972        if path.is_absolute() {
2973            PathBuf::from("/")
2974        } else {
2975            PathBuf::from(".")
2976        }
2977    } else {
2978        normalized
2979    }
2980}
2981
2982pub(crate) fn path_is_within_root(path: &Path, root: &Path) -> bool {
2983    path == root || path.starts_with(root)
2984}
2985
2986pub(crate) fn dirname(path: &str) -> String {
2987    let normalized = normalize_path(path);
2988    let parent = Path::new(&normalized)
2989        .parent()
2990        .unwrap_or_else(|| Path::new("/"));
2991    let value = parent.to_string_lossy();
2992    if value.is_empty() {
2993        String::from("/")
2994    } else {
2995        value.into_owned()
2996    }
2997}
2998
2999pub(crate) fn kernel_error(error: KernelError) -> SidecarError {
3000    SidecarError::Kernel(error.to_string())
3001}
3002
3003pub(crate) fn plugin_error(error: PluginError) -> SidecarError {
3004    SidecarError::Plugin(error.to_string())
3005}
3006
3007pub(crate) fn javascript_error(error: JavascriptExecutionError) -> SidecarError {
3008    SidecarError::Execution(error.to_string())
3009}
3010
3011pub(crate) fn wasm_error(error: WasmExecutionError) -> SidecarError {
3012    SidecarError::Execution(error.to_string())
3013}
3014
3015pub(crate) fn python_error(error: PythonExecutionError) -> SidecarError {
3016    SidecarError::Execution(error.to_string())
3017}
3018
3019pub(crate) fn vfs_error(error: VfsError) -> SidecarError {
3020    SidecarError::Kernel(error.to_string())
3021}
3022
3023/// Actionable guidance shown when guest package resolution fails because the packages live in a
3024/// non-flat `node_modules` whose package store is not visible in the VM. Mounting host `node_modules`
3025/// is a bind mount, so symlinked/store layouts
3026/// do not resolve inside the VM: Node canonicalizes a module to its store
3027/// realpath (e.g. `node_modules/.pnpm/...`, `.bun/...`, `.store/...`) which lives
3028/// above the mounted directory and the guest `fs` cannot read. Plug'n'Play
3029/// (yarn-berry default) has no `node_modules` at all. A flat (hoisted) layout is
3030/// required. The empirically-supported package managers are captured in
3031/// `crates/sidecar/tests/module_layout_e2e.rs`.
3032#[allow(dead_code)]
3033const HOISTED_NODE_MODULES_GUIDANCE: &str = "secure-exec can't load mounted node_modules: the directory uses a non-flat layout (pnpm / bun / yarn workspaces store, or yarn Plug'n'Play) whose package store isn't visible inside the VM. A flat (hoisted) node_modules is required.\n  - pnpm        -> add `node-linker=hoisted` to .npmrc, then reinstall\n  - yarn berry  -> set `nodeLinker: node-modules` in .yarnrc.yml (not pnp/pnpm)\n  - bun         -> install dependencies outside a workspace (workspaces use a .bun store)\n  - npm / yarn classic -> already flat, no change needed";
3034
3035/// Detect, from an adapter's captured stderr, a non-flat-`node_modules` failure
3036/// signature. Returns the actionable guidance to fold into the surfaced error,
3037/// or `None` when the failure is unrelated.
3038///
3039/// Two signatures, both kept specific so they never fire on unrelated crashes:
3040/// - a missing-file / cannot-resolve error referencing a package STORE path that
3041///   lives above the mounted project (`.pnpm`, `.bun`, `.store`, PnP `__virtual__`),
3042/// - a yarn Plug'n'Play fingerprint (`.pnp.cjs`, the zip cache, or PnP's
3043///   "isn't declared in your dependencies" resolver error).
3044#[allow(dead_code)]
3045fn symlinked_node_modules_hint(stderr: &str) -> Option<&'static str> {
3046    // Package stores that only appear in a path when a non-flat layout is used.
3047    // pnpm (isolated), bun (workspace), yarn-berry (nodeLinker: pnpm), and PnP
3048    // virtual instances all keep real package files under these store dirs, which
3049    // sit above the mounted project node_modules and so are not guest-visible.
3050    const STORE_MARKERS: &[&str] = &[
3051        "node_modules/.pnpm/",
3052        "node_modules/.bun/",
3053        "node_modules/.store/",
3054        "/__virtual__/",
3055    ];
3056    // Yarn Plug'n'Play has no node_modules at all; resolution fails against the
3057    // .pnp runtime / zip cache. "isn't declared in your dependencies" is PnP's
3058    // distinctive resolver error and is specific enough to fire on its own.
3059    const PNP_STRICT_MARKERS: &[&str] = &["isn't declared in your dependencies"];
3060    const PNP_PATH_MARKERS: &[&str] = &[".pnp.cjs", ".pnp.loader.mjs", "/.yarn/cache/"];
3061
3062    if PNP_STRICT_MARKERS.iter().any(|m| stderr.contains(m)) {
3063        return Some(HOISTED_NODE_MODULES_GUIDANCE);
3064    }
3065
3066    let missing = stderr.contains("ENOENT")
3067        || stderr.contains("no such file or directory")
3068        || stderr.contains("Cannot find module")
3069        || stderr.contains("MODULE_NOT_FOUND");
3070    if !missing {
3071        return None;
3072    }
3073    if STORE_MARKERS.iter().any(|m| stderr.contains(m))
3074        || PNP_PATH_MARKERS.iter().any(|m| stderr.contains(m))
3075    {
3076        return Some(HOISTED_NODE_MODULES_GUIDANCE);
3077    }
3078    None
3079}
3080
3081#[cfg(test)]
3082mod symlinked_node_modules_hint_tests {
3083    use super::symlinked_node_modules_hint;
3084
3085    // Positive cases: each non-flat package manager's store/PnP signature.
3086    #[test]
3087    fn matches_pnpm_store_enoent() {
3088        // Real pi-coding-agent failure: getPackageDir() falls back to a
3089        // dist/package.json inside the unreachable .pnpm store.
3090        let stderr = "Error: ENOENT: no such file or directory, open '/root/node_modules/.pnpm/@mariozechner+pi-coding-agent@0.60.0_x/node_modules/@mariozechner/pi-coding-agent/dist/package.json'";
3091        let hint = symlinked_node_modules_hint(stderr).expect("expected hoisted guidance");
3092        assert!(hint.contains("secure-exec can't load mounted node_modules"));
3093        assert!(!hint.contains("agent-os"));
3094    }
3095
3096    #[test]
3097    fn matches_bun_store_enoent() {
3098        let stderr = "Error: ENOENT: no such file or directory, open '/root/node_modules/.bun/is-odd@3.0.1/node_modules/is-odd/package.json'";
3099        assert!(symlinked_node_modules_hint(stderr).is_some());
3100    }
3101
3102    #[test]
3103    fn matches_yarn_pnpm_store_enoent() {
3104        let stderr = "Error: ENOENT: no such file or directory, open '/root/node_modules/.store/is-odd-npm-3.0.1-93c3c3f41b/package/package.json'";
3105        assert!(symlinked_node_modules_hint(stderr).is_some());
3106    }
3107
3108    #[test]
3109    fn matches_pnp_declared_error() {
3110        // Yarn PnP's distinctive resolver error (no node_modules at all).
3111        let stderr = "Error: Your application tried to access is-number, but it isn't declared in your dependencies; this makes the require call ambiguous and unsound.";
3112        assert!(symlinked_node_modules_hint(stderr).is_some());
3113    }
3114
3115    #[test]
3116    fn matches_pnp_cjs_module_not_found() {
3117        let stderr = "Error: Cannot find module 'is-odd'\n    at /root/.pnp.cjs:12345:18\n    code: 'MODULE_NOT_FOUND'";
3118        assert!(symlinked_node_modules_hint(stderr).is_some());
3119    }
3120
3121    #[test]
3122    fn matches_virtual_instance() {
3123        let stderr = "Error: ENOENT: no such file or directory, open '/root/.yarn/__virtual__/is-odd-abc/1/node_modules/is-odd/package.json'";
3124        assert!(symlinked_node_modules_hint(stderr).is_some());
3125    }
3126
3127    // Negative cases: must not fire.
3128    #[test]
3129    fn ignores_enoent_outside_a_store() {
3130        let stderr = "Error: ENOENT: no such file or directory, open '/tmp/scratch/config.json'";
3131        assert!(symlinked_node_modules_hint(stderr).is_none());
3132    }
3133
3134    #[test]
3135    fn ignores_store_path_without_missing_file() {
3136        let stderr =
3137            "loaded /root/node_modules/.pnpm/some-pkg@1.0.0/node_modules/some-pkg/index.js";
3138        assert!(symlinked_node_modules_hint(stderr).is_none());
3139    }
3140
3141    #[test]
3142    fn ignores_flat_node_modules_enoent() {
3143        // npm / yarn-nm / pnpm-hoisted: flat, no store dir in the path.
3144        let stderr = "Error: ENOENT: no such file or directory, open '/root/node_modules/is-odd/missing-asset.json'";
3145        assert!(symlinked_node_modules_hint(stderr).is_none());
3146    }
3147
3148    #[test]
3149    fn ignores_unrelated_failure() {
3150        let stderr = "Error: connect ECONNREFUSED 127.0.0.1:443";
3151        assert!(symlinked_node_modules_hint(stderr).is_none());
3152    }
3153}