Skip to main content

secure_exec_sidecar/
service.rs

1use crate::bridge::{build_mount_plugin_registry, MountPluginContext};
2pub(crate) use crate::execution::{
3    build_javascript_socket_path_context, canonical_signal_name, dispatch_loopback_http_request,
4    error_code, format_tcp_resource, ignore_stale_javascript_sync_rpc_response,
5    javascript_sync_rpc_arg_i32, javascript_sync_rpc_arg_str, javascript_sync_rpc_arg_u32,
6    javascript_sync_rpc_arg_u32_optional, javascript_sync_rpc_arg_u64,
7    javascript_sync_rpc_arg_u64_optional, javascript_sync_rpc_bytes_arg,
8    javascript_sync_rpc_bytes_value, javascript_sync_rpc_encoding, javascript_sync_rpc_error_code,
9    javascript_sync_rpc_option_bool, javascript_sync_rpc_option_u32, parse_signal,
10    sanitize_javascript_child_process_internal_bootstrap_env, service_javascript_sync_rpc,
11    vm_network_resource_counts, JavascriptSyncRpcServiceRequest, LoopbackHttpDispatchRequest,
12};
13use crate::extension::{
14    Extension, ExtensionBufferedProcessOutput, ExtensionContext, ExtensionFuture, ExtensionHost,
15    ExtensionSnapshot,
16};
17use crate::filesystem::guest_filesystem_call as filesystem_guest_filesystem_call;
18use crate::limits::DEFAULT_ACP_STDOUT_BUFFER_BYTE_LIMIT;
19use crate::protocol::{
20    AuthenticatedResponse, CloseStdinRequest, DisposeReason, EventFrame, EventPayload,
21    ExecuteRequest, ExtEnvelope, FsPermissionScope, GuestFilesystemCallRequest,
22    GuestFilesystemResultResponse, JavascriptChildProcessSpawnOptions,
23    JavascriptChildProcessSpawnRequest, KillProcessRequest, OpenSessionRequest, OwnershipScope,
24    PatternPermissionRule, PatternPermissionScope, PermissionMode, PermissionsPolicy,
25    ProcessKilledResponse, ProcessStartedResponse, ProtocolSchema, RejectedResponse, RequestFrame,
26    RequestId, RequestPayload, ResponseFrame, ResponsePayload, SessionOpenedResponse,
27    SidecarRequestFrame, SidecarRequestPayload, SidecarResponseFrame, SidecarResponsePayload,
28    SidecarResponseTracker, SidecarResponseTrackerError, SignalDispositionAction,
29    SignalHandlerRegistration, StdinClosedResponse, StdinWrittenResponse, VmLifecycleEvent,
30    VmLifecycleState, WriteStdinRequest,
31};
32use crate::state::{
33    ActiveExecutionEvent, BridgeError, ConnectionState, JavascriptSocketFamily,
34    JavascriptSocketPathContext, ProcessEventEnvelope, SessionState, SharedBridge,
35    SharedSidecarRequestClient, SidecarRequestTransport, VmState, EXECUTION_DRIVER_NAME,
36};
37use crate::tools::register_host_callbacks;
38use crate::NativeSidecarBridge;
39use secure_exec_bridge::queue_tracker::{register_queue, QueueGauge, TrackedLimit};
40use secure_exec_bridge::{
41    CommandPermissionRequest, EnvironmentAccess, EnvironmentPermissionRequest, FilesystemAccess,
42    FilesystemPermissionRequest, LifecycleEventRecord, LifecycleState, LogLevel, LogRecord,
43    NetworkAccess, NetworkPermissionRequest, StructuredEventRecord,
44};
45use secure_exec_execution::{
46    JavascriptExecutionEngine, JavascriptExecutionError, JavascriptSyncRpcRequest,
47    PythonExecutionEngine, PythonExecutionError, WasmExecutionEngine, WasmExecutionError,
48};
49use secure_exec_kernel::kernel::KernelError;
50use secure_exec_kernel::mount_plugin::{FileSystemPluginRegistry, PluginError};
51use secure_exec_kernel::permissions::{
52    permission_glob_matches, CommandAccessRequest, EnvAccessRequest, EnvironmentOperation,
53    NetworkAccessRequest, NetworkOperation, PermissionDecision,
54};
55// root_fs types moved to crate::vm
56use secure_exec_kernel::vfs::VfsError;
57use serde::Deserialize;
58use serde_json::{json, Value};
59use std::collections::{BTreeMap, BTreeSet, VecDeque};
60use std::fmt;
61use std::fs;
62use std::os::unix::fs::PermissionsExt;
63use std::path::{Component, Path, PathBuf};
64use std::sync::{Arc, Mutex};
65use std::task::{Context, Poll, Waker};
66use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
67use tokio::sync::mpsc::{channel, Receiver, Sender};
68use tokio::time;
69
70// Constants and type aliases moved to crate::state
71
72const INTERNAL_JAVASCRIPT_ENTRYPOINT_ENV_KEYS: &[&str] =
73    &["AGENTOS_ENTRYPOINT", "AGENTOS_BOOTSTRAP_MODULE"];
74const INTERNAL_WASM_ENTRYPOINT_ENV_KEYS: &[&str] =
75    &["AGENTOS_WASM_MODULE_PATH", "AGENTOS_WASM_MODULE_BASE64"];
76const INTERNAL_PYTHON_ENTRYPOINT_ENV_PREFIXES: &[&str] = &["AGENTOS_PYTHON_"];
77pub(crate) const MAX_PROCESS_EVENT_QUEUE: usize = 10_000;
78pub(crate) const MAX_PENDING_SIDECAR_RESPONSES: usize = 10_000;
79pub(crate) const MAX_OUTBOUND_SIDECAR_REQUESTS: usize = 10_000;
80pub(crate) const MAX_COMPLETED_SIDECAR_RESPONSES: usize = 10_000;
81
82pub(crate) fn process_event_queue_overflow_error() -> SidecarError {
83    SidecarError::InvalidState(format!(
84        "process event queue exceeded {MAX_PROCESS_EVENT_QUEUE} pending events"
85    ))
86}
87
88fn sidecar_response_pending_overflow_error() -> SidecarError {
89    SidecarError::InvalidState(format!(
90        "sidecar response tracker exceeded {MAX_PENDING_SIDECAR_RESPONSES} pending responses"
91    ))
92}
93
94fn outbound_sidecar_request_queue_overflow_error() -> SidecarError {
95    SidecarError::InvalidState(format!(
96        "outbound sidecar request queue exceeded {MAX_OUTBOUND_SIDECAR_REQUESTS} pending requests"
97    ))
98}
99
100fn wire_protocol_error(error: crate::wire::ProtocolCodecError) -> SidecarError {
101    SidecarError::InvalidState(format!("invalid generated wire protocol frame: {error}"))
102}
103
104// NativeSidecarConfig, DispatchResult, SidecarError moved to crate::state
105pub use crate::state::{DispatchResult, NativeSidecarConfig, SidecarError};
106
107// SharedBridge struct and Clone impl moved to crate::state
108
109#[derive(Debug, Default, Deserialize)]
110struct LegacyJavascriptChildProcessSpawnOptions {
111    #[serde(default)]
112    cwd: Option<String>,
113    #[serde(default)]
114    env: BTreeMap<String, String>,
115    #[serde(default)]
116    input: Option<Value>,
117    #[serde(default)]
118    shell: bool,
119    #[serde(default)]
120    detached: bool,
121    #[serde(default)]
122    stdio: Vec<String>,
123    #[serde(default, rename = "maxBuffer")]
124    max_buffer: Option<usize>,
125    #[serde(default)]
126    timeout: Option<u64>,
127    #[serde(default, rename = "killSignal")]
128    kill_signal: Option<String>,
129}
130
131#[derive(Debug, Deserialize)]
132#[serde(rename_all = "camelCase")]
133struct JavascriptHttpLoopbackRequest {
134    process_id: String,
135    server_id: u64,
136    host: String,
137    port: u16,
138    request: String,
139}
140
141fn is_javascript_loopback_host(host: &str) -> bool {
142    host == "127.0.0.1" || host == "::1" || host.eq_ignore_ascii_case("localhost")
143}
144
145pub(crate) fn parse_javascript_child_process_spawn_request(
146    vm: &VmState,
147    args: &[Value],
148) -> Result<(JavascriptChildProcessSpawnRequest, Option<usize>), SidecarError> {
149    if let Some(value) = args.first().cloned() {
150        if let Ok(request) = serde_json::from_value::<JavascriptChildProcessSpawnRequest>(value) {
151            return Ok((request, None));
152        }
153    }
154
155    let command = javascript_sync_rpc_arg_str(args, 0, "child_process.spawn command")?.to_owned();
156    let raw_args = javascript_sync_rpc_arg_str(args, 1, "child_process.spawn args")?;
157    let raw_options = javascript_sync_rpc_arg_str(args, 2, "child_process.spawn options")?;
158
159    let parsed_args = serde_json::from_str::<Vec<String>>(raw_args).map_err(|error| {
160        SidecarError::InvalidState(format!("invalid child_process.spawn args payload: {error}"))
161    })?;
162    let parsed_options = serde_json::from_str::<LegacyJavascriptChildProcessSpawnOptions>(
163        raw_options,
164    )
165    .map_err(|error| {
166        SidecarError::InvalidState(format!(
167            "invalid child_process.spawn options payload: {error}"
168        ))
169    })?;
170
171    Ok((
172        JavascriptChildProcessSpawnRequest {
173            command,
174            args: parsed_args,
175            options: JavascriptChildProcessSpawnOptions {
176                cwd: parsed_options.cwd,
177                env: parsed_options.env,
178                internal_bootstrap_env: sanitize_javascript_child_process_internal_bootstrap_env(
179                    &vm.guest_env,
180                ),
181                input: parsed_options.input,
182                shell: parsed_options.shell,
183                detached: parsed_options.detached,
184                stdio: parsed_options.stdio,
185                timeout: parsed_options.timeout,
186                kill_signal: parsed_options.kill_signal,
187            },
188        },
189        parsed_options.max_buffer,
190    ))
191}
192
193impl<B> SharedBridge<B> {
194    fn new(bridge: B) -> Self {
195        Self {
196            inner: Arc::new(Mutex::new(bridge)),
197            permissions: Arc::new(Mutex::new(BTreeMap::new())),
198            #[cfg(test)]
199            set_vm_permissions_outcomes: Arc::new(Mutex::new(VecDeque::new())),
200        }
201    }
202}
203
204impl<B> SharedBridge<B>
205where
206    B: NativeSidecarBridge + Send + 'static,
207    BridgeError<B>: fmt::Debug + Send + Sync + 'static,
208{
209    pub(crate) fn with_mut<T>(
210        &self,
211        operation: impl FnOnce(&mut B) -> Result<T, BridgeError<B>>,
212    ) -> Result<T, SidecarError> {
213        let mut bridge = self.inner.lock().map_err(|_| {
214            SidecarError::Bridge(String::from("native sidecar bridge lock poisoned"))
215        })?;
216        operation(&mut bridge).map_err(|error| SidecarError::Bridge(format!("{error:?}")))
217    }
218
219    fn inspect<T>(&self, operation: impl FnOnce(&mut B) -> T) -> Result<T, SidecarError> {
220        let mut bridge = self.inner.lock().map_err(|_| {
221            SidecarError::Bridge(String::from("native sidecar bridge lock poisoned"))
222        })?;
223        Ok(operation(&mut bridge))
224    }
225
226    #[cfg(test)]
227    #[allow(dead_code)]
228    pub(crate) fn queue_set_vm_permissions_result(
229        &self,
230        result: Result<(), SidecarError>,
231    ) -> Result<(), SidecarError> {
232        let mut outcomes = self.set_vm_permissions_outcomes.lock().map_err(|_| {
233            SidecarError::Bridge(String::from(
234                "native sidecar test set_vm_permissions outcome lock poisoned",
235            ))
236        })?;
237        outcomes.push_back(result.err());
238        Ok(())
239    }
240
241    pub(crate) fn emit_lifecycle(
242        &self,
243        vm_id: &str,
244        state: LifecycleState,
245    ) -> Result<(), SidecarError> {
246        self.with_mut(|bridge| {
247            bridge.emit_lifecycle(LifecycleEventRecord {
248                vm_id: vm_id.to_owned(),
249                state,
250                detail: None,
251            })
252        })
253    }
254
255    pub(crate) fn emit_log(
256        &self,
257        vm_id: &str,
258        message: impl Into<String>,
259    ) -> Result<(), SidecarError> {
260        self.with_mut(|bridge| {
261            bridge.emit_log(LogRecord {
262                vm_id: vm_id.to_owned(),
263                level: LogLevel::Info,
264                message: message.into(),
265            })
266        })
267    }
268
269    pub(crate) fn filesystem_decision(
270        &self,
271        vm_id: &str,
272        path: &str,
273        access: FilesystemAccess,
274    ) -> PermissionDecision {
275        if let Some(decision) = self.static_permission_decision(
276            vm_id,
277            filesystem_permission_capability(access),
278            "fs",
279            Some(path),
280        ) {
281            return decision;
282        }
283        match self.with_mut(|bridge| {
284            bridge.check_filesystem_access(FilesystemPermissionRequest {
285                vm_id: vm_id.to_owned(),
286                path: path.to_owned(),
287                access,
288            })
289        }) {
290            Ok(decision) => map_bridge_permission(decision),
291            Err(error) => PermissionDecision::deny(error.to_string()),
292        }
293    }
294
295    pub(crate) fn command_decision(
296        &self,
297        vm_id: &str,
298        request: &CommandAccessRequest,
299    ) -> PermissionDecision {
300        if is_internal_runtime_command_request(request) {
301            return PermissionDecision::allow();
302        }
303        if let Some(decision) = self.static_permission_decision(
304            vm_id,
305            "child_process.spawn",
306            "child_process",
307            Some(&request.command),
308        ) {
309            return decision;
310        }
311        match self.with_mut(|bridge| {
312            bridge.check_command_execution(CommandPermissionRequest {
313                vm_id: vm_id.to_owned(),
314                command: request.command.clone(),
315                args: request.args.clone(),
316                cwd: request.cwd.clone(),
317                env: request.env.clone(),
318            })
319        }) {
320            Ok(decision) => map_bridge_permission(decision),
321            Err(error) => PermissionDecision::deny(error.to_string()),
322        }
323    }
324
325    pub(crate) fn environment_decision(
326        &self,
327        vm_id: &str,
328        request: &EnvAccessRequest,
329    ) -> PermissionDecision {
330        if let Some(decision) = self.static_permission_decision(
331            vm_id,
332            environment_permission_capability(request.op),
333            "env",
334            Some(&request.key),
335        ) {
336            return decision;
337        }
338        match self.with_mut(|bridge| {
339            bridge.check_environment_access(EnvironmentPermissionRequest {
340                vm_id: vm_id.to_owned(),
341                access: match request.op {
342                    EnvironmentOperation::Read => EnvironmentAccess::Read,
343                    EnvironmentOperation::Write => EnvironmentAccess::Write,
344                },
345                key: request.key.clone(),
346                value: request.value.clone(),
347            })
348        }) {
349            Ok(decision) => map_bridge_permission(decision),
350            Err(error) => PermissionDecision::deny(error.to_string()),
351        }
352    }
353
354    pub(crate) fn network_decision(
355        &self,
356        vm_id: &str,
357        request: &NetworkAccessRequest,
358    ) -> PermissionDecision {
359        if let Some(decision) = self.static_permission_decision(
360            vm_id,
361            network_permission_capability(request.op),
362            "network",
363            Some(&request.resource),
364        ) {
365            return decision;
366        }
367        match self.with_mut(|bridge| {
368            bridge.check_network_access(NetworkPermissionRequest {
369                vm_id: vm_id.to_owned(),
370                access: match request.op {
371                    NetworkOperation::Fetch => NetworkAccess::Fetch,
372                    NetworkOperation::Http => NetworkAccess::Http,
373                    NetworkOperation::Dns => NetworkAccess::Dns,
374                    NetworkOperation::Listen => NetworkAccess::Listen,
375                },
376                resource: request.resource.clone(),
377            })
378        }) {
379            Ok(decision) => map_bridge_permission(decision),
380            Err(error) => PermissionDecision::deny(error.to_string()),
381        }
382    }
383
384    pub(crate) fn require_network_access(
385        &self,
386        vm_id: &str,
387        op: NetworkOperation,
388        resource: impl Into<String>,
389    ) -> Result<(), SidecarError> {
390        let resource = resource.into();
391        let decision = self.network_decision(
392            vm_id,
393            &NetworkAccessRequest {
394                vm_id: vm_id.to_owned(),
395                op,
396                resource: resource.clone(),
397            },
398        );
399        if decision.allow {
400            return Ok(());
401        }
402
403        let message = match decision.reason.as_deref() {
404            Some(reason) => format!("EACCES: permission denied, {resource}: {reason}"),
405            None => format!("EACCES: permission denied, {resource}"),
406        };
407        Err(SidecarError::Execution(message))
408    }
409
410    pub(crate) fn set_vm_permissions(
411        &self,
412        vm_id: &str,
413        permissions: &PermissionsPolicy,
414    ) -> Result<(), SidecarError> {
415        #[cfg(test)]
416        {
417            let mut outcomes = self.set_vm_permissions_outcomes.lock().map_err(|_| {
418                SidecarError::Bridge(String::from(
419                    "native sidecar test set_vm_permissions outcome lock poisoned",
420                ))
421            })?;
422            if let Some(Some(error)) = outcomes.pop_front() {
423                return Err(error);
424            }
425        }
426
427        let mut stored = self.permissions.lock().map_err(|_| {
428            SidecarError::Bridge(String::from(
429                "native sidecar permission policy lock poisoned",
430            ))
431        })?;
432        stored.insert(vm_id.to_owned(), permissions.clone());
433        Ok(())
434    }
435
436    pub(crate) fn restore_vm_permissions_fail_closed(
437        &self,
438        vm_id: &str,
439        original_permissions: &PermissionsPolicy,
440        context: &str,
441        operation_error: &SidecarError,
442    ) -> Result<(), SidecarError> {
443        match self.set_vm_permissions(vm_id, original_permissions) {
444            Ok(()) => Ok(()),
445            Err(restore_error) => {
446                let deny_all = PermissionsPolicy::deny_all();
447                match self.set_vm_permissions(vm_id, &deny_all) {
448                    Ok(()) => Err(SidecarError::InvalidState(format!(
449                        "{context} failed: {operation_error}; restoring original permissions failed: {restore_error}; applied deny-all fallback"
450                    ))),
451                    Err(deny_all_error) => panic!(
452                        "{context} failed: {operation_error}; restoring original permissions failed: {restore_error}; deny-all fallback failed: {deny_all_error}"
453                    ),
454                }
455            }
456        }
457    }
458
459    pub(crate) fn clear_vm_permissions(&self, vm_id: &str) -> Result<(), SidecarError> {
460        let mut stored = self.permissions.lock().map_err(|_| {
461            SidecarError::Bridge(String::from(
462                "native sidecar permission policy lock poisoned",
463            ))
464        })?;
465        stored.remove(vm_id);
466        Ok(())
467    }
468
469    pub(crate) fn static_permission_decision(
470        &self,
471        vm_id: &str,
472        capability: &str,
473        domain: &str,
474        resource: Option<&str>,
475    ) -> Option<PermissionDecision> {
476        let stored = self.permissions.lock().ok()?;
477        let permissions = stored.get(vm_id)?;
478        let mode = evaluate_permissions_policy(permissions, domain, capability, resource);
479        Some(permission_mode_to_kernel_decision(mode, capability))
480    }
481}
482
483pub(crate) fn evaluate_permissions_policy(
484    permissions: &PermissionsPolicy,
485    domain: &str,
486    capability: &str,
487    resource: Option<&str>,
488) -> PermissionMode {
489    match domain {
490        "fs" => evaluate_fs_permission_scope(
491            permissions.fs.as_ref(),
492            capability_operation(capability, domain),
493            resource,
494        ),
495        "network" => evaluate_pattern_permission_scope(
496            permissions.network.as_ref(),
497            capability_operation(capability, domain),
498            resource,
499        ),
500        "child_process" => evaluate_pattern_permission_scope(
501            permissions.child_process.as_ref(),
502            capability_operation(capability, domain),
503            resource,
504        ),
505        "process" => evaluate_pattern_permission_scope(
506            permissions.process.as_ref(),
507            capability_operation(capability, domain),
508            resource,
509        ),
510        "env" => evaluate_pattern_permission_scope(
511            permissions.env.as_ref(),
512            capability_operation(capability, domain),
513            resource,
514        ),
515        "binding" => evaluate_pattern_permission_scope(
516            permissions.binding.as_ref(),
517            capability_operation(capability, domain),
518            resource,
519        ),
520        _ => PermissionMode::Deny,
521    }
522}
523
524fn evaluate_fs_permission_scope(
525    scope: Option<&FsPermissionScope>,
526    operation: &str,
527    resource: Option<&str>,
528) -> PermissionMode {
529    match scope {
530        Some(FsPermissionScope::PermissionMode(mode)) => mode.clone(),
531        Some(FsPermissionScope::FsPermissionRuleSet(rules)) => {
532            let mut mode = rules.default.clone().unwrap_or(PermissionMode::Deny);
533            for rule in &rules.rules {
534                if fs_rule_matches(rule, operation, resource) {
535                    mode = rule.mode.clone();
536                }
537            }
538            mode
539        }
540        None => PermissionMode::Deny,
541    }
542}
543
544fn evaluate_pattern_permission_scope(
545    scope: Option<&PatternPermissionScope>,
546    operation: &str,
547    resource: Option<&str>,
548) -> PermissionMode {
549    match scope {
550        Some(PatternPermissionScope::PermissionMode(mode)) => mode.clone(),
551        Some(PatternPermissionScope::PatternPermissionRuleSet(rules)) => {
552            let mut mode = rules.default.clone().unwrap_or(PermissionMode::Deny);
553            for rule in &rules.rules {
554                if pattern_rule_matches(rule, operation, resource) {
555                    mode = rule.mode.clone();
556                }
557            }
558            mode
559        }
560        None => PermissionMode::Deny,
561    }
562}
563
564fn fs_rule_matches(
565    rule: &crate::protocol::FsPermissionRule,
566    operation: &str,
567    resource: Option<&str>,
568) -> bool {
569    let operations_match = permission_operation_matches(&rule.operations, operation);
570    let paths_match = permission_resource_matches(&rule.paths, resource);
571    operations_match && paths_match
572}
573
574fn pattern_rule_matches(
575    rule: &PatternPermissionRule,
576    operation: &str,
577    resource: Option<&str>,
578) -> bool {
579    let operations_match = permission_operation_matches(&rule.operations, operation);
580    let patterns_match = permission_resource_matches(&rule.patterns, resource);
581    operations_match && patterns_match
582}
583
584fn permission_operation_matches(candidates: &[String], operation: &str) -> bool {
585    candidates
586        .iter()
587        .any(|candidate| candidate == "*" || candidate == operation)
588}
589
590fn permission_resource_matches(patterns: &[String], resource: Option<&str>) -> bool {
591    resource.is_some_and(|value| {
592        patterns
593            .iter()
594            .any(|pattern| permission_glob_matches(pattern, value))
595    })
596}
597
598pub(crate) fn validate_permissions_policy(
599    permissions: &PermissionsPolicy,
600) -> Result<(), SidecarError> {
601    if let Some(scope) = permissions.fs.as_ref() {
602        validate_fs_permission_scope("fs", scope)?;
603    }
604    if let Some(scope) = permissions.network.as_ref() {
605        validate_pattern_permission_scope("network", scope)?;
606    }
607    if let Some(scope) = permissions.child_process.as_ref() {
608        validate_pattern_permission_scope("child_process", scope)?;
609    }
610    if let Some(scope) = permissions.process.as_ref() {
611        validate_pattern_permission_scope("process", scope)?;
612    }
613    if let Some(scope) = permissions.env.as_ref() {
614        validate_pattern_permission_scope("env", scope)?;
615    }
616    if let Some(scope) = permissions.binding.as_ref() {
617        validate_pattern_permission_scope("binding", scope)?;
618    }
619    Ok(())
620}
621
622fn validate_fs_permission_scope(
623    domain: &str,
624    scope: &FsPermissionScope,
625) -> Result<(), SidecarError> {
626    let FsPermissionScope::FsPermissionRuleSet(rule_set) = scope else {
627        return Ok(());
628    };
629
630    for (index, rule) in rule_set.rules.iter().enumerate() {
631        validate_permission_rule_field(
632            &rule.operations,
633            &format!("{domain}.rules[{index}].operations"),
634        )?;
635        validate_permission_rule_field(&rule.paths, &format!("{domain}.rules[{index}].paths"))?;
636    }
637
638    Ok(())
639}
640
641fn validate_pattern_permission_scope(
642    domain: &str,
643    scope: &PatternPermissionScope,
644) -> Result<(), SidecarError> {
645    let PatternPermissionScope::PatternPermissionRuleSet(rule_set) = scope else {
646        return Ok(());
647    };
648
649    for (index, rule) in rule_set.rules.iter().enumerate() {
650        validate_permission_rule_field(
651            &rule.operations,
652            &format!("{domain}.rules[{index}].operations"),
653        )?;
654        validate_permission_rule_field(
655            &rule.patterns,
656            &format!("{domain}.rules[{index}].patterns"),
657        )?;
658    }
659
660    Ok(())
661}
662
663fn validate_permission_rule_field(values: &[String], field: &str) -> Result<(), SidecarError> {
664    if values.is_empty() {
665        return Err(SidecarError::InvalidState(format!(
666            "invalid permissions policy: {field} must not be empty; use [\"*\"] for wildcard"
667        )));
668    }
669    Ok(())
670}
671
672fn capability_operation<'a>(capability: &'a str, domain: &str) -> &'a str {
673    capability
674        .strip_prefix(domain)
675        .and_then(|value| value.strip_prefix('.'))
676        .unwrap_or("")
677}
678
679fn permission_mode_to_kernel_decision(
680    mode: PermissionMode,
681    capability: &str,
682) -> PermissionDecision {
683    match mode {
684        PermissionMode::Allow => PermissionDecision::allow(),
685        PermissionMode::Ask => {
686            PermissionDecision::deny(format!("permission prompt required for {capability}"))
687        }
688        PermissionMode::Deny => PermissionDecision::deny(format!("blocked by {capability} policy")),
689    }
690}
691
692pub(crate) fn filesystem_permission_capability(access: FilesystemAccess) -> &'static str {
693    match access {
694        FilesystemAccess::Read => "fs.read",
695        FilesystemAccess::Write => "fs.write",
696        FilesystemAccess::Stat => "fs.stat",
697        FilesystemAccess::ReadDir => "fs.readdir",
698        FilesystemAccess::CreateDir => "fs.create_dir",
699        FilesystemAccess::Remove => "fs.rm",
700        FilesystemAccess::Rename => "fs.rename",
701        FilesystemAccess::Symlink => "fs.symlink",
702        FilesystemAccess::ReadLink => "fs.readlink",
703        FilesystemAccess::Chmod => "fs.chmod",
704        FilesystemAccess::Truncate => "fs.truncate",
705    }
706}
707
708fn network_permission_capability(operation: NetworkOperation) -> &'static str {
709    match operation {
710        NetworkOperation::Fetch => "network.fetch",
711        NetworkOperation::Http => "network.http",
712        NetworkOperation::Dns => "network.dns",
713        NetworkOperation::Listen => "network.listen",
714    }
715}
716
717fn environment_permission_capability(operation: EnvironmentOperation) -> &'static str {
718    match operation {
719        EnvironmentOperation::Read => "env.read",
720        EnvironmentOperation::Write => "env.write",
721    }
722}
723
724fn is_internal_runtime_command_request(request: &CommandAccessRequest) -> bool {
725    match request.command.as_str() {
726        "node" => request
727            .env
728            .keys()
729            .any(|key| INTERNAL_JAVASCRIPT_ENTRYPOINT_ENV_KEYS.contains(&key.as_str())),
730        "wasm" => request
731            .env
732            .keys()
733            .any(|key| INTERNAL_WASM_ENTRYPOINT_ENV_KEYS.contains(&key.as_str())),
734        "python" => request.env.keys().any(|key| {
735            INTERNAL_PYTHON_ENTRYPOINT_ENV_PREFIXES
736                .iter()
737                .any(|prefix| key.starts_with(prefix))
738        }),
739        _ => false,
740    }
741}
742
743fn ownership_matches_process_event(
744    ownership: &OwnershipScope,
745    event: &ProcessEventEnvelope,
746) -> bool {
747    match ownership {
748        OwnershipScope::ConnectionOwnership(inner) => inner.connection_id == event.connection_id,
749        OwnershipScope::SessionOwnership(inner) => {
750            inner.connection_id == event.connection_id && inner.session_id == event.session_id
751        }
752        OwnershipScope::VmOwnership(inner) => {
753            inner.connection_id == event.connection_id
754                && inner.session_id == event.session_id
755                && inner.vm_id == event.vm_id
756        }
757    }
758}
759
760fn public_process_event_matches_ownership<B>(
761    sidecar: &NativeSidecar<B>,
762    ownership: &OwnershipScope,
763    event: &ProcessEventEnvelope,
764) -> bool
765where
766    B: NativeSidecarBridge + Send + 'static,
767    BridgeError<B>: fmt::Debug + Send + Sync + 'static,
768{
769    if !ownership_matches_process_event(ownership, event) {
770        return false;
771    }
772
773    if event.process_id.contains('/') {
774        return false;
775    }
776
777    // Stale queued events must still be drained through handle_process_event_envelope()
778    // so the sidecar can emit the expected fail-closed log when teardown wins the race.
779    let _ = sidecar;
780    true
781}
782
783fn poll_future_once<F: std::future::Future>(future: std::pin::Pin<&mut F>) -> Option<F::Output> {
784    let mut context = Context::from_waker(Waker::noop());
785    match future.poll(&mut context) {
786        Poll::Ready(output) => Some(output),
787        Poll::Pending => None,
788    }
789}
790
791// ConnectionState, SessionState, VmConfiguration, VmState moved to crate::state
792
793// JavascriptSocketPathContext, JavascriptSocketFamily, VmListenPolicy moved to crate::state
794
795impl JavascriptSocketPathContext {
796    pub(crate) fn loopback_port_allowed(&self, port: u16) -> bool {
797        self.loopback_exempt_ports.contains(&port)
798            || self
799                .tcp_loopback_guest_to_host_ports
800                .keys()
801                .any(|(_, guest_port)| *guest_port == port)
802    }
803
804    pub(crate) fn translate_tcp_loopback_port(
805        &self,
806        family: JavascriptSocketFamily,
807        port: u16,
808    ) -> Option<u16> {
809        self.tcp_loopback_guest_to_host_ports
810            .get(&(family, port))
811            .copied()
812    }
813
814    pub(crate) fn http_loopback_target(
815        &self,
816        family: JavascriptSocketFamily,
817        port: u16,
818    ) -> Option<&crate::state::JavascriptHttpLoopbackTarget> {
819        self.http_loopback_targets.get(&(family, port))
820    }
821
822    pub(crate) fn translate_udp_loopback_port(
823        &self,
824        family: JavascriptSocketFamily,
825        port: u16,
826    ) -> Option<u16> {
827        self.udp_loopback_guest_to_host_ports
828            .get(&(family, port))
829            .copied()
830    }
831
832    pub(crate) fn guest_udp_port_for_host_port(
833        &self,
834        family: JavascriptSocketFamily,
835        port: u16,
836    ) -> Option<u16> {
837        self.udp_loopback_host_to_guest_ports
838            .get(&(family, port))
839            .copied()
840    }
841}
842
843// ActiveProcess, NetworkResourceCounts moved to crate::state
844
845pub struct NativeSidecar<B> {
846    pub(crate) config: NativeSidecarConfig,
847    pub(crate) bridge: SharedBridge<B>,
848    pub(crate) mount_plugins: FileSystemPluginRegistry<MountPluginContext<B>>,
849    pub(crate) cache_root: PathBuf,
850    pub(crate) javascript_engine: JavascriptExecutionEngine,
851    pub(crate) python_engine: PythonExecutionEngine,
852    pub(crate) wasm_engine: WasmExecutionEngine,
853    pub(crate) next_connection_id: usize,
854    pub(crate) next_session_id: usize,
855    pub(crate) next_vm_id: usize,
856    pub(crate) next_sidecar_request_id: RequestId,
857    pub(crate) connections: BTreeMap<String, ConnectionState>,
858    pub(crate) sessions: BTreeMap<String, SessionState>,
859    pub(crate) vms: BTreeMap<String, VmState>,
860    #[allow(dead_code)]
861    pub(crate) process_event_sender: Sender<ProcessEventEnvelope>,
862    pub(crate) process_event_receiver: Option<Receiver<ProcessEventEnvelope>>,
863    pub(crate) pending_process_events: VecDeque<ProcessEventEnvelope>,
864    pub(crate) pending_sidecar_responses: SidecarResponseTracker,
865    pub(crate) outbound_sidecar_requests: VecDeque<SidecarRequestFrame>,
866    pub(crate) completed_sidecar_responses: BTreeMap<RequestId, SidecarResponseFrame>,
867    pub(crate) completed_sidecar_response_order: VecDeque<RequestId>,
868    pub(crate) completed_sidecar_responses_gauge: Arc<QueueGauge>,
869    pub(crate) pending_process_events_gauge: Arc<QueueGauge>,
870    pub(crate) pending_sidecar_responses_gauge: Arc<QueueGauge>,
871    pub(crate) outbound_sidecar_requests_gauge: Arc<QueueGauge>,
872    pub(crate) sidecar_requests: SharedSidecarRequestClient,
873    pub(crate) extensions: BTreeMap<String, Arc<dyn Extension>>,
874    pub(crate) extension_sessions: BTreeMap<(String, String), ExtensionSessionResources>,
875    pub(crate) extension_process_output_buffers:
876        BTreeMap<(String, String), ExtensionBufferedProcessOutput>,
877}
878
879#[derive(Debug)]
880pub(crate) struct ExtensionSessionResources {
881    pub(crate) ownership: OwnershipScope,
882    pub(crate) process_ids: BTreeSet<String>,
883    pub(crate) vm_ids: BTreeSet<String>,
884}
885
886impl<B> fmt::Debug for NativeSidecar<B> {
887    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
888        f.debug_struct("NativeSidecar")
889            .field("config", &self.config)
890            .field("cache_root", &self.cache_root)
891            .field("next_connection_id", &self.next_connection_id)
892            .field("next_session_id", &self.next_session_id)
893            .field("next_vm_id", &self.next_vm_id)
894            .field("connection_count", &self.connections.len())
895            .field("session_count", &self.sessions.len())
896            .field("vm_count", &self.vms.len())
897            .field("extension_session_count", &self.extension_sessions.len())
898            .field(
899                "extension_process_output_buffer_count",
900                &self.extension_process_output_buffers.len(),
901            )
902            .finish()
903    }
904}
905
906impl<B> NativeSidecar<B>
907where
908    B: NativeSidecarBridge + Send + 'static,
909    BridgeError<B>: fmt::Debug + Send + Sync + 'static,
910{
911    pub fn new(bridge: B) -> Result<Self, SidecarError> {
912        Self::with_config(bridge, NativeSidecarConfig::default())
913    }
914
915    pub fn with_config(bridge: B, config: NativeSidecarConfig) -> Result<Self, SidecarError> {
916        if matches!(config.expected_auth_token.as_deref(), Some("")) {
917            return Err(SidecarError::InvalidState(String::from(
918                "native sidecar expected_auth_token must not be empty",
919            )));
920        }
921
922        let cache_root = config.compile_cache_root.clone().unwrap_or_else(|| {
923            std::env::temp_dir().join(format!(
924                "{}-{}",
925                config.sidecar_id,
926                SystemTime::now()
927                    .duration_since(UNIX_EPOCH)
928                    .expect("system time before unix epoch")
929                    .as_nanos()
930            ))
931        });
932        fs::create_dir_all(&cache_root).map_err(|error| {
933            SidecarError::Io(format!("failed to prepare sidecar cache root: {error}"))
934        })?;
935
936        let bridge = SharedBridge::new(bridge);
937        let mount_plugins = build_mount_plugin_registry::<B>()?;
938        let (process_event_sender, process_event_receiver) = channel(MAX_PROCESS_EVENT_QUEUE);
939
940        Ok(Self {
941            config,
942            bridge,
943            mount_plugins,
944            cache_root,
945            javascript_engine: JavascriptExecutionEngine::default(),
946            python_engine: PythonExecutionEngine::default(),
947            wasm_engine: WasmExecutionEngine::default(),
948            next_connection_id: 0,
949            next_session_id: 0,
950            next_vm_id: 0,
951            next_sidecar_request_id: -1,
952            connections: BTreeMap::new(),
953            sessions: BTreeMap::new(),
954            vms: BTreeMap::new(),
955            process_event_sender,
956            process_event_receiver: Some(process_event_receiver),
957            pending_process_events: VecDeque::new(),
958            pending_sidecar_responses: SidecarResponseTracker::default(),
959            outbound_sidecar_requests: VecDeque::new(),
960            completed_sidecar_responses: BTreeMap::new(),
961            completed_sidecar_response_order: VecDeque::new(),
962            completed_sidecar_responses_gauge: register_queue(
963                TrackedLimit::CompletedSidecarResponses,
964                MAX_COMPLETED_SIDECAR_RESPONSES,
965            ),
966            pending_process_events_gauge: register_queue(
967                TrackedLimit::PendingProcessEvents,
968                MAX_PROCESS_EVENT_QUEUE,
969            ),
970            pending_sidecar_responses_gauge: register_queue(
971                TrackedLimit::PendingSidecarResponses,
972                MAX_PENDING_SIDECAR_RESPONSES,
973            ),
974            outbound_sidecar_requests_gauge: register_queue(
975                TrackedLimit::OutboundSidecarRequests,
976                MAX_OUTBOUND_SIDECAR_REQUESTS,
977            ),
978            sidecar_requests: SharedSidecarRequestClient::default(),
979            extensions: BTreeMap::new(),
980            extension_sessions: BTreeMap::new(),
981            extension_process_output_buffers: BTreeMap::new(),
982        })
983    }
984
985    pub fn with_config_and_extensions(
986        bridge: B,
987        config: NativeSidecarConfig,
988        extensions: Vec<Box<dyn Extension>>,
989    ) -> Result<Self, SidecarError> {
990        let mut sidecar = Self::with_config(bridge, config)?;
991        for extension in extensions {
992            sidecar.register_extension(extension)?;
993        }
994        Ok(sidecar)
995    }
996
997    pub(crate) fn prune_extension_process_resource(&mut self, process_id: &str) {
998        self.extension_sessions.retain(|_, resources| {
999            resources.process_ids.remove(process_id);
1000            !resources.process_ids.is_empty() || !resources.vm_ids.is_empty()
1001        });
1002    }
1003
1004    pub(crate) fn prune_extension_vm_resource(&mut self, vm_id: &str) {
1005        self.extension_sessions.retain(|_, resources| {
1006            if matches!(
1007                &resources.ownership,
1008                OwnershipScope::VmOwnership(inner) if inner.vm_id == vm_id
1009            ) {
1010                resources.process_ids.clear();
1011            }
1012            resources.vm_ids.remove(vm_id);
1013            !resources.process_ids.is_empty() || !resources.vm_ids.is_empty()
1014        });
1015    }
1016
1017    pub(crate) fn capture_extension_process_output_event(
1018        &mut self,
1019        vm_id: &str,
1020        process_id: &str,
1021        event: &ActiveExecutionEvent,
1022    ) -> bool {
1023        let Some(buffer) = self
1024            .extension_process_output_buffers
1025            .get_mut(&(vm_id.to_string(), process_id.to_string()))
1026        else {
1027            return false;
1028        };
1029        match event {
1030            ActiveExecutionEvent::Stdout(chunk) => {
1031                buffer.append_stdout(chunk, DEFAULT_ACP_STDOUT_BUFFER_BYTE_LIMIT);
1032                true
1033            }
1034            ActiveExecutionEvent::Stderr(chunk) => {
1035                buffer.append_stderr(chunk, DEFAULT_ACP_STDOUT_BUFFER_BYTE_LIMIT);
1036                true
1037            }
1038            ActiveExecutionEvent::JavascriptSyncRpcRequest(_)
1039            | ActiveExecutionEvent::PythonVfsRpcRequest(_)
1040            | ActiveExecutionEvent::SignalState { .. }
1041            | ActiveExecutionEvent::Exited(_) => false,
1042        }
1043    }
1044
1045    fn bind_extension_process_resource(
1046        &mut self,
1047        ownership: OwnershipScope,
1048        namespace: String,
1049        ext_session_id: String,
1050        process_id: String,
1051    ) -> Result<(), SidecarError> {
1052        if ext_session_id.is_empty() {
1053            return Err(SidecarError::InvalidState(String::from(
1054                "extension session id must not be empty",
1055            )));
1056        }
1057        let (connection_id, session_id, vm_id) = self.vm_scope_for(&ownership)?;
1058        self.require_owned_vm(&connection_id, &session_id, &vm_id)?;
1059        let process_exists = self
1060            .vms
1061            .get(&vm_id)
1062            .is_some_and(|vm| vm.active_processes.contains_key(&process_id));
1063        if !process_exists {
1064            return Err(SidecarError::InvalidState(format!(
1065                "VM {vm_id} has no active process {process_id}"
1066            )));
1067        }
1068
1069        let key = (namespace, ext_session_id);
1070        if let Some(resources) = self.extension_sessions.get_mut(&key) {
1071            if resources.ownership != ownership {
1072                return Err(SidecarError::InvalidState(String::from(
1073                    "extension session ownership did not match existing resources",
1074                )));
1075            }
1076            resources.process_ids.insert(process_id);
1077        } else {
1078            self.extension_sessions.insert(
1079                key,
1080                ExtensionSessionResources {
1081                    ownership,
1082                    process_ids: BTreeSet::from([process_id]),
1083                    vm_ids: BTreeSet::new(),
1084                },
1085            );
1086        }
1087        Ok(())
1088    }
1089
1090    fn bind_extension_vm_resource(
1091        &mut self,
1092        ownership: OwnershipScope,
1093        namespace: String,
1094        ext_session_id: String,
1095    ) -> Result<(), SidecarError> {
1096        if ext_session_id.is_empty() {
1097            return Err(SidecarError::InvalidState(String::from(
1098                "extension session id must not be empty",
1099            )));
1100        }
1101        let (connection_id, session_id, vm_id) = self.vm_scope_for(&ownership)?;
1102        self.require_owned_vm(&connection_id, &session_id, &vm_id)?;
1103
1104        let key = (namespace, ext_session_id);
1105        if let Some(resources) = self.extension_sessions.get_mut(&key) {
1106            if resources.ownership != ownership {
1107                return Err(SidecarError::InvalidState(String::from(
1108                    "extension session ownership did not match existing resources",
1109                )));
1110            }
1111            resources.vm_ids.insert(vm_id);
1112        } else {
1113            self.extension_sessions.insert(
1114                key,
1115                ExtensionSessionResources {
1116                    ownership,
1117                    process_ids: BTreeSet::new(),
1118                    vm_ids: BTreeSet::from([vm_id]),
1119                },
1120            );
1121        }
1122        Ok(())
1123    }
1124
1125    pub fn sidecar_id(&self) -> &str {
1126        &self.config.sidecar_id
1127    }
1128
1129    pub fn with_bridge_mut<T>(
1130        &self,
1131        operation: impl FnOnce(&mut B) -> T,
1132    ) -> Result<T, SidecarError> {
1133        self.bridge.inspect(operation)
1134    }
1135
1136    pub fn set_sidecar_request_transport(&mut self, transport: Arc<dyn SidecarRequestTransport>) {
1137        self.sidecar_requests.set_transport(transport);
1138    }
1139
1140    pub fn register_extension(
1141        &mut self,
1142        extension: Box<dyn Extension>,
1143    ) -> Result<(), SidecarError> {
1144        let namespace = extension.namespace().to_owned();
1145        if namespace.is_empty() {
1146            return Err(SidecarError::InvalidState(String::from(
1147                "extension namespace must not be empty",
1148            )));
1149        }
1150        if self.extensions.contains_key(&namespace) {
1151            return Err(SidecarError::Conflict(format!(
1152                "extension namespace {namespace} is already registered",
1153            )));
1154        }
1155        self.extensions.insert(namespace, Arc::from(extension));
1156        Ok(())
1157    }
1158
1159    pub fn set_sidecar_request_handler<F>(&mut self, handler: F)
1160    where
1161        F: Fn(SidecarRequestFrame) -> Result<SidecarResponsePayload, SidecarError>
1162            + Send
1163            + Sync
1164            + 'static,
1165    {
1166        struct HandlerTransport<F>(F);
1167
1168        impl<F> SidecarRequestTransport for HandlerTransport<F>
1169        where
1170            F: Fn(SidecarRequestFrame) -> Result<SidecarResponsePayload, SidecarError>
1171                + Send
1172                + Sync
1173                + 'static,
1174        {
1175            fn send_request(
1176                &self,
1177                request: SidecarRequestFrame,
1178                _timeout: Duration,
1179            ) -> Result<SidecarResponseFrame, SidecarError> {
1180                let payload = (self.0)(request.clone())?;
1181                Ok(SidecarResponseFrame::new(
1182                    request.request_id,
1183                    request.ownership,
1184                    payload,
1185                ))
1186            }
1187        }
1188
1189        self.set_sidecar_request_transport(Arc::new(HandlerTransport(handler)));
1190    }
1191
1192    pub fn set_wire_sidecar_request_handler<F>(&mut self, handler: F)
1193    where
1194        F: Fn(
1195                crate::wire::SidecarRequestFrame,
1196            ) -> Result<crate::wire::SidecarResponseFrame, SidecarError>
1197            + Send
1198            + Sync
1199            + 'static,
1200    {
1201        self.set_sidecar_request_handler(move |request| {
1202            let request = crate::wire::sidecar_request_frame_from_compat(request)
1203                .map_err(wire_protocol_error)?;
1204            let response = handler(request)?;
1205            let response = crate::wire::sidecar_response_frame_to_compat(response)
1206                .map_err(wire_protocol_error)?;
1207            Ok(response.payload)
1208        });
1209    }
1210
1211    pub(crate) fn queue_pending_process_event(
1212        &mut self,
1213        envelope: ProcessEventEnvelope,
1214    ) -> Result<(), SidecarError> {
1215        if self.pending_process_events.len() >= MAX_PROCESS_EVENT_QUEUE {
1216            return Err(process_event_queue_overflow_error());
1217        }
1218        self.pending_process_events.push_back(envelope);
1219        self.pending_process_events_gauge
1220            .observe_depth(self.pending_process_events.len());
1221        Ok(())
1222    }
1223
1224    pub(crate) fn queue_front_pending_process_event(
1225        &mut self,
1226        envelope: ProcessEventEnvelope,
1227    ) -> Result<(), SidecarError> {
1228        if self.pending_process_events.len() >= MAX_PROCESS_EVENT_QUEUE {
1229            return Err(process_event_queue_overflow_error());
1230        }
1231        self.pending_process_events.push_front(envelope);
1232        self.pending_process_events_gauge
1233            .observe_depth(self.pending_process_events.len());
1234        Ok(())
1235    }
1236
1237    pub(crate) fn pending_process_event_capacity(&self) -> usize {
1238        MAX_PROCESS_EVENT_QUEUE.saturating_sub(self.pending_process_events.len())
1239    }
1240
1241    pub fn dispatch_blocking(
1242        &mut self,
1243        request: RequestFrame,
1244    ) -> Result<DispatchResult, SidecarError> {
1245        let inside_runtime = tokio::runtime::Handle::try_current().is_ok();
1246        if matches!(
1247            request.payload,
1248            RequestPayload::DisposeVm(_) | RequestPayload::Ext(_)
1249        ) && !inside_runtime
1250        {
1251            return tokio::runtime::Builder::new_current_thread()
1252                .enable_all()
1253                .build()
1254                .expect("sidecar dispatch runtime")
1255                .block_on(self.dispatch(request));
1256        }
1257
1258        let mut future = std::pin::pin!(self.dispatch(request));
1259        match poll_future_once(future.as_mut()) {
1260            Some(result) => result,
1261            None if inside_runtime => Err(SidecarError::InvalidState(String::from(
1262                "dispatch_blocking cannot wait for an async sidecar request inside a Tokio runtime; use dispatch().await",
1263            ))),
1264            None => tokio::runtime::Builder::new_current_thread()
1265                .enable_all()
1266                .build()
1267                .expect("sidecar dispatch runtime")
1268                .block_on(future),
1269        }
1270    }
1271
1272    pub fn dispatch_wire_blocking(
1273        &mut self,
1274        request: crate::wire::RequestFrame,
1275    ) -> Result<crate::wire::WireDispatchResult, SidecarError> {
1276        let request = crate::wire::request_frame_to_compat(request).map_err(wire_protocol_error)?;
1277        let result = self.dispatch_blocking(request)?;
1278        crate::wire::dispatch_result_from_compat(result).map_err(wire_protocol_error)
1279    }
1280
1281    pub fn poll_event_blocking(
1282        &mut self,
1283        ownership: &OwnershipScope,
1284        timeout: Duration,
1285    ) -> Result<Option<EventFrame>, SidecarError> {
1286        tokio::runtime::Builder::new_current_thread()
1287            .enable_all()
1288            .build()
1289            .expect("sidecar poll runtime")
1290            .block_on(self.poll_event(ownership, timeout))
1291    }
1292
1293    pub fn poll_event_wire_blocking(
1294        &mut self,
1295        ownership: &crate::wire::OwnershipScope,
1296        timeout: Duration,
1297    ) -> Result<Option<crate::wire::EventFrame>, SidecarError> {
1298        let ownership = crate::wire::ownership_scope_to_compat(ownership.clone());
1299        self.poll_event_blocking(&ownership, timeout)?
1300            .map(crate::wire::event_frame_from_compat)
1301            .transpose()
1302            .map_err(wire_protocol_error)
1303    }
1304
1305    pub fn close_session_blocking(
1306        &mut self,
1307        connection_id: &str,
1308        session_id: &str,
1309    ) -> Result<Vec<EventFrame>, SidecarError> {
1310        tokio::runtime::Builder::new_current_thread()
1311            .enable_all()
1312            .build()
1313            .expect("sidecar close-session runtime")
1314            .block_on(self.close_session(connection_id, session_id))
1315    }
1316
1317    pub fn remove_connection_blocking(
1318        &mut self,
1319        connection_id: &str,
1320    ) -> Result<Vec<EventFrame>, SidecarError> {
1321        tokio::runtime::Builder::new_current_thread()
1322            .enable_all()
1323            .build()
1324            .expect("sidecar remove-connection runtime")
1325            .block_on(self.remove_connection(connection_id))
1326    }
1327
1328    pub fn dispose_vm_internal_blocking(
1329        &mut self,
1330        connection_id: &str,
1331        session_id: &str,
1332        vm_id: &str,
1333        reason: DisposeReason,
1334    ) -> Result<Vec<EventFrame>, SidecarError> {
1335        tokio::runtime::Builder::new_current_thread()
1336            .enable_all()
1337            .build()
1338            .expect("sidecar dispose-vm runtime")
1339            .block_on(self.dispose_vm_internal(connection_id, session_id, vm_id, reason))
1340    }
1341
1342    pub async fn dispatch(
1343        &mut self,
1344        request: RequestFrame,
1345    ) -> Result<DispatchResult, SidecarError> {
1346        if let Err(error) = self.ensure_request_within_frame_limit(&request) {
1347            return Ok(DispatchResult {
1348                response: self.reject(&request, error_code(&error), &error.to_string()),
1349                events: Vec::new(),
1350            });
1351        }
1352
1353        let result = match request.payload.clone() {
1354            RequestPayload::Authenticate(payload) => {
1355                self.authenticate_connection(&request, payload).await
1356            }
1357            RequestPayload::OpenSession(payload) => self.open_session(&request, payload).await,
1358            RequestPayload::CreateVm(payload) => self.create_vm(&request, payload).await,
1359            RequestPayload::DisposeVm(payload) => self.dispose_vm(&request, payload).await,
1360            RequestPayload::BootstrapRootFilesystem(payload) => {
1361                self.bootstrap_root_filesystem(&request, payload.entries)
1362                    .await
1363            }
1364            RequestPayload::ConfigureVm(payload) => self.configure_vm(&request, payload).await,
1365            RequestPayload::RegisterHostCallbacks(payload) => {
1366                register_host_callbacks(self, &request, payload)
1367            }
1368            RequestPayload::CreateLayer(payload) => self.create_layer(&request, payload).await,
1369            RequestPayload::SealLayer(payload) => self.seal_layer(&request, payload).await,
1370            RequestPayload::ImportSnapshot(payload) => {
1371                self.import_snapshot(&request, payload).await
1372            }
1373            RequestPayload::ExportSnapshot(payload) => {
1374                self.export_snapshot(&request, payload).await
1375            }
1376            RequestPayload::CreateOverlay(payload) => self.create_overlay(&request, payload).await,
1377            RequestPayload::GuestFilesystemCall(payload) => {
1378                self.guest_filesystem_call(&request, payload).await
1379            }
1380            RequestPayload::SnapshotRootFilesystem(payload) => {
1381                self.snapshot_root_filesystem(&request, payload).await
1382            }
1383            RequestPayload::Execute(payload) => self.execute(&request, payload).await,
1384            RequestPayload::WriteStdin(payload) => self.write_stdin(&request, payload).await,
1385            RequestPayload::CloseStdin(payload) => self.close_stdin(&request, payload).await,
1386            RequestPayload::KillProcess(payload) => self.kill_process(&request, payload).await,
1387            RequestPayload::GetProcessSnapshot(payload) => {
1388                self.get_process_snapshot(&request, payload).await
1389            }
1390            RequestPayload::FindListener(payload) => self.find_listener(&request, payload).await,
1391            RequestPayload::FindBoundUdp(payload) => self.find_bound_udp(&request, payload).await,
1392            RequestPayload::VmFetch(payload) => self.vm_fetch(&request, payload).await,
1393            RequestPayload::GetSignalState(payload) => {
1394                self.get_signal_state(&request, payload).await
1395            }
1396            RequestPayload::GetZombieTimerCount(payload) => {
1397                self.get_zombie_timer_count(&request, payload).await
1398            }
1399            RequestPayload::HostFilesystemCall(_)
1400            | RequestPayload::PersistenceLoad(_)
1401            | RequestPayload::PersistenceFlush(_) => Ok(DispatchResult {
1402                response: self.reject(
1403                    &request,
1404                    "unsupported_direction",
1405                    "host callback request categories are sidecar-to-host only in this scaffold",
1406                ),
1407                events: Vec::new(),
1408            }),
1409            RequestPayload::Ext(payload) => {
1410                self.dispatch_extension_request(&request, payload).await
1411            }
1412        };
1413
1414        match result {
1415            Ok(dispatch) => Ok(dispatch),
1416            Err(error @ SidecarError::Io(_)) => Err(error),
1417            Err(error) => Ok(DispatchResult {
1418                response: self.reject(&request, error_code(&error), &error.to_string()),
1419                events: Vec::new(),
1420            }),
1421        }
1422    }
1423
1424    pub async fn dispatch_wire(
1425        &mut self,
1426        request: crate::wire::RequestFrame,
1427    ) -> Result<crate::wire::WireDispatchResult, SidecarError> {
1428        let request = crate::wire::request_frame_to_compat(request).map_err(wire_protocol_error)?;
1429        let result = self.dispatch(request).await?;
1430        crate::wire::dispatch_result_from_compat(result).map_err(wire_protocol_error)
1431    }
1432
1433    pub async fn poll_event_wire(
1434        &mut self,
1435        ownership: &crate::wire::OwnershipScope,
1436        timeout: Duration,
1437    ) -> Result<Option<crate::wire::EventFrame>, SidecarError> {
1438        let ownership = crate::wire::ownership_scope_to_compat(ownership.clone());
1439        self.poll_event(&ownership, timeout)
1440            .await?
1441            .map(crate::wire::event_frame_from_compat)
1442            .transpose()
1443            .map_err(wire_protocol_error)
1444    }
1445
1446    async fn dispatch_extension_request(
1447        &mut self,
1448        request: &RequestFrame,
1449        envelope: ExtEnvelope,
1450    ) -> Result<DispatchResult, SidecarError> {
1451        let namespace = envelope.namespace;
1452        let Some(extension) = self.extensions.get(&namespace).cloned() else {
1453            return Ok(DispatchResult {
1454                response: self.reject(
1455                    request,
1456                    "unknown_extension",
1457                    &format!("no extension registered for namespace {namespace}"),
1458                ),
1459                events: Vec::new(),
1460            });
1461        };
1462        let snapshot = ExtensionSnapshot::new(
1463            namespace.clone(),
1464            request.ownership.clone(),
1465            self.sidecar_requests.clone(),
1466        );
1467        let ctx = ExtensionContext::new(snapshot, self);
1468        let response = extension.handle_request(ctx, envelope.payload).await?;
1469        Ok(DispatchResult {
1470            response: self.respond(
1471                request,
1472                ResponsePayload::ExtResult(ExtEnvelope {
1473                    namespace,
1474                    payload: response.payload,
1475                }),
1476            ),
1477            events: response.events,
1478        })
1479    }
1480
1481    pub async fn poll_event(
1482        &mut self,
1483        ownership: &OwnershipScope,
1484        timeout: Duration,
1485    ) -> Result<Option<EventFrame>, SidecarError> {
1486        let deadline = Instant::now() + timeout;
1487        loop {
1488            if let Some(index) = self
1489                .pending_process_events
1490                .iter()
1491                .position(|event| public_process_event_matches_ownership(self, ownership, event))
1492            {
1493                let Some(envelope) = self.pending_process_events.remove(index) else {
1494                    continue;
1495                };
1496                if let Some(frame) = self.handle_process_event_envelope(envelope)? {
1497                    return Ok(Some(frame));
1498                }
1499                continue;
1500            }
1501
1502            if !timeout.is_zero() {
1503                let _ = self.pump_process_events(ownership).await?;
1504            }
1505
1506            let queued_envelopes = {
1507                let pending_capacity = self.pending_process_event_capacity();
1508                let receiver = self.process_event_receiver.as_mut().ok_or_else(|| {
1509                    SidecarError::InvalidState(String::from("process event receiver unavailable"))
1510                })?;
1511                let mut queued = Vec::new();
1512                loop {
1513                    if queued.len() >= pending_capacity {
1514                        if receiver.is_empty() {
1515                            break;
1516                        }
1517                        return Err(process_event_queue_overflow_error());
1518                    }
1519                    match receiver.try_recv() {
1520                        Ok(envelope) => queued.push(envelope),
1521                        Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
1522                        Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break,
1523                    }
1524                }
1525                queued
1526            };
1527
1528            let mut matching_envelope = None;
1529            for envelope in queued_envelopes {
1530                if matching_envelope.is_none()
1531                    && public_process_event_matches_ownership(self, ownership, &envelope)
1532                {
1533                    matching_envelope = Some(envelope);
1534                } else {
1535                    self.queue_pending_process_event(envelope)?;
1536                }
1537            }
1538
1539            if let Some(envelope) = matching_envelope {
1540                if let Some(frame) = self.handle_process_event_envelope(envelope)? {
1541                    return Ok(Some(frame));
1542                }
1543                continue;
1544            }
1545
1546            if Instant::now() >= deadline {
1547                return Ok(None);
1548            }
1549
1550            let remaining = deadline.saturating_duration_since(Instant::now());
1551            time::sleep(remaining.min(Duration::from_millis(10))).await;
1552        }
1553    }
1554
1555    pub(crate) fn handle_process_event_envelope(
1556        &mut self,
1557        envelope: ProcessEventEnvelope,
1558    ) -> Result<Option<EventFrame>, SidecarError> {
1559        let ProcessEventEnvelope {
1560            connection_id,
1561            session_id,
1562            vm_id,
1563            process_id,
1564            event,
1565        } = envelope;
1566
1567        if matches!(event, ActiveExecutionEvent::Exited(_)) {
1568            let mut trailing = Vec::new();
1569            let mut deferred = VecDeque::new();
1570            while let Some(pending) = self.pending_process_events.pop_front() {
1571                if pending.vm_id == vm_id
1572                    && pending.process_id == process_id
1573                    && !matches!(pending.event, ActiveExecutionEvent::Exited(_))
1574                {
1575                    trailing.push(pending.event);
1576                } else {
1577                    deferred.push_back(pending);
1578                }
1579            }
1580            self.pending_process_events = deferred;
1581            let drain_limit = self
1582                .pending_process_event_capacity()
1583                .saturating_sub(trailing.len().saturating_add(1));
1584            trailing.extend(
1585                self.drain_process_events_blocking_with_limit(&vm_id, &process_id, drain_limit)?
1586                    .into_iter()
1587                    .filter(|event| !matches!(event, ActiveExecutionEvent::Exited(_))),
1588            );
1589
1590            if !trailing.is_empty() {
1591                if self.pending_process_event_capacity() < trailing.len() {
1592                    return Err(process_event_queue_overflow_error());
1593                }
1594                let emit_now = if self.pending_process_event_capacity() == trailing.len() {
1595                    Some(trailing.remove(0))
1596                } else {
1597                    None
1598                };
1599                self.queue_front_pending_process_event(ProcessEventEnvelope {
1600                    connection_id: connection_id.clone(),
1601                    session_id: session_id.clone(),
1602                    vm_id: vm_id.clone(),
1603                    process_id: process_id.clone(),
1604                    event,
1605                })?;
1606                for event in trailing.into_iter().rev() {
1607                    self.queue_front_pending_process_event(ProcessEventEnvelope {
1608                        connection_id: connection_id.clone(),
1609                        session_id: session_id.clone(),
1610                        vm_id: vm_id.clone(),
1611                        process_id: process_id.clone(),
1612                        event,
1613                    })?;
1614                }
1615                if let Some(event) = emit_now {
1616                    return self.handle_execution_event(&vm_id, &process_id, event);
1617                }
1618                return Ok(None);
1619            }
1620        }
1621
1622        self.handle_execution_event(&vm_id, &process_id, event)
1623    }
1624
1625    // try_poll_event moved to crate::execution
1626
1627    pub async fn close_session(
1628        &mut self,
1629        connection_id: &str,
1630        session_id: &str,
1631    ) -> Result<Vec<EventFrame>, SidecarError> {
1632        self.dispose_session(connection_id, session_id, DisposeReason::Requested)
1633            .await
1634    }
1635
1636    pub async fn remove_connection(
1637        &mut self,
1638        connection_id: &str,
1639    ) -> Result<Vec<EventFrame>, SidecarError> {
1640        self.require_authenticated_connection(connection_id)?;
1641
1642        let session_ids = self
1643            .connections
1644            .get(connection_id)
1645            .expect("authenticated connection should exist")
1646            .sessions
1647            .iter()
1648            .cloned()
1649            .collect::<Vec<_>>();
1650
1651        let mut events = Vec::new();
1652        for session_id in session_ids {
1653            events.extend(
1654                self.dispose_session(connection_id, &session_id, DisposeReason::ConnectionClosed)
1655                    .await?,
1656            );
1657        }
1658
1659        self.connections.remove(connection_id);
1660        Ok(events)
1661    }
1662
1663    async fn authenticate_connection(
1664        &mut self,
1665        request: &RequestFrame,
1666        payload: crate::protocol::AuthenticateRequest,
1667    ) -> Result<DispatchResult, SidecarError> {
1668        let _ = self.connection_id_for(&request.ownership)?;
1669        if let Err(error) = self.validate_auth_token(&payload.auth_token) {
1670            let mut fields = audit_fields([
1671                (String::from("source"), payload.client_name.clone()),
1672                (String::from("reason"), error.to_string()),
1673            ]);
1674            if let OwnershipScope::ConnectionOwnership(inner) = &request.ownership {
1675                fields.insert(String::from("connection_id"), inner.connection_id.clone());
1676            }
1677            emit_security_audit_event(
1678                &self.bridge,
1679                &self.config.sidecar_id,
1680                "security.auth.failed",
1681                fields,
1682            );
1683            return Err(error);
1684        }
1685
1686        if payload.protocol_version != crate::wire::PROTOCOL_VERSION {
1687            return Err(SidecarError::ProtocolVersionMismatch(format!(
1688                "sidecar protocol version mismatch: expected {}, got {}",
1689                crate::wire::PROTOCOL_VERSION,
1690                payload.protocol_version
1691            )));
1692        }
1693
1694        let expected_bridge_version = secure_exec_bridge::bridge_contract().version;
1695        if payload.bridge_version != expected_bridge_version {
1696            return Err(SidecarError::BridgeVersionMismatch(format!(
1697                "bridge contract version mismatch: expected {expected_bridge_version}, got {}",
1698                payload.bridge_version
1699            )));
1700        }
1701
1702        let connection_id = self.allocate_connection_id();
1703        self.connections.insert(
1704            connection_id.clone(),
1705            ConnectionState {
1706                auth_token: payload.auth_token,
1707                sessions: BTreeSet::new(),
1708            },
1709        );
1710
1711        let response = self.response_with_ownership(
1712            request.request_id,
1713            OwnershipScope::connection(&connection_id),
1714            ResponsePayload::Authenticated(AuthenticatedResponse {
1715                sidecar_id: self.config.sidecar_id.clone(),
1716                connection_id,
1717                max_frame_bytes: self.config.max_frame_bytes as u32,
1718            }),
1719        );
1720        Ok(DispatchResult {
1721            response,
1722            events: Vec::new(),
1723        })
1724    }
1725
1726    async fn open_session(
1727        &mut self,
1728        request: &RequestFrame,
1729        payload: OpenSessionRequest,
1730    ) -> Result<DispatchResult, SidecarError> {
1731        let connection_id = self.connection_id_for(&request.ownership)?;
1732        self.require_authenticated_connection(&connection_id)?;
1733
1734        self.next_session_id += 1;
1735        let session_id = format!("session-{}", self.next_session_id);
1736        self.sessions.insert(
1737            session_id.clone(),
1738            SessionState {
1739                connection_id: connection_id.clone(),
1740                placement: payload.placement,
1741                metadata: payload.metadata.into_iter().collect(),
1742                vm_ids: BTreeSet::new(),
1743            },
1744        );
1745        self.connections
1746            .get_mut(&connection_id)
1747            .expect("authenticated connection should exist")
1748            .sessions
1749            .insert(session_id.clone());
1750
1751        Ok(DispatchResult {
1752            response: self.respond(
1753                request,
1754                ResponsePayload::SessionOpened(SessionOpenedResponse {
1755                    session_id,
1756                    owner_connection_id: connection_id,
1757                }),
1758            ),
1759            events: Vec::new(),
1760        })
1761    }
1762
1763    // create_vm, dispose_vm, bootstrap_root_filesystem, configure_vm moved to crate::vm
1764
1765    async fn guest_filesystem_call(
1766        &mut self,
1767        request: &RequestFrame,
1768        payload: GuestFilesystemCallRequest,
1769    ) -> Result<DispatchResult, SidecarError> {
1770        filesystem_guest_filesystem_call(self, request, payload).await
1771    }
1772
1773    // snapshot_root_filesystem moved to crate::vm
1774
1775    // execute, write_stdin, close_stdin, kill_process, find_listener, find_bound_udp,
1776    // get_signal_state, get_zombie_timer_count moved to crate::execution
1777
1778    async fn dispose_session(
1779        &mut self,
1780        connection_id: &str,
1781        session_id: &str,
1782        reason: DisposeReason,
1783    ) -> Result<Vec<EventFrame>, SidecarError> {
1784        self.require_owned_session(connection_id, session_id)?;
1785
1786        let vm_ids = self
1787            .sessions
1788            .get(session_id)
1789            .expect("owned session should exist")
1790            .vm_ids
1791            .iter()
1792            .cloned()
1793            .collect::<Vec<_>>();
1794
1795        let mut events = Vec::new();
1796        for vm_id in vm_ids {
1797            events.extend(
1798                self.dispose_vm_internal(connection_id, session_id, &vm_id, reason.clone())
1799                    .await?,
1800            );
1801        }
1802
1803        self.sessions.remove(session_id);
1804        if let Some(connection) = self.connections.get_mut(connection_id) {
1805            connection.sessions.remove(session_id);
1806        }
1807        Ok(events)
1808    }
1809
1810    // dispose_vm_internal, terminate_vm_processes, wait_for_vm_processes_to_exit moved to crate::vm
1811
1812    // kill_process_internal, handle_execution_event, handle_python_vfs_rpc_request,
1813    // resolve_javascript_child_process_execution, spawn_javascript_child_process,
1814    // poll_javascript_child_process, write_javascript_child_process_stdin,
1815    // close_javascript_child_process_stdin, kill_javascript_child_process moved to crate::execution
1816
1817    pub(crate) fn handle_javascript_sync_rpc_request(
1818        &mut self,
1819        vm_id: &str,
1820        process_id: &str,
1821        request: JavascriptSyncRpcRequest,
1822    ) -> Result<(), SidecarError> {
1823        let Some(vm) = self.vms.get(vm_id) else {
1824            log_stale_process_event(&self.bridge, vm_id, process_id, "javascript sync RPC");
1825            return Ok(());
1826        };
1827        if !vm.active_processes.contains_key(process_id) {
1828            log_stale_process_event(&self.bridge, vm_id, process_id, "javascript sync RPC");
1829            return Ok(());
1830        }
1831
1832        let response: Result<Value, SidecarError> = match request.method.as_str() {
1833            "child_process.spawn" => {
1834                let Some(vm) = self.vms.get(vm_id) else {
1835                    log_stale_process_event(
1836                        &self.bridge,
1837                        vm_id,
1838                        process_id,
1839                        "javascript sync RPC child_process.spawn",
1840                    );
1841                    return Ok(());
1842                };
1843                let (payload, _) = parse_javascript_child_process_spawn_request(vm, &request.args)?;
1844                self.spawn_javascript_child_process(vm_id, process_id, payload)
1845            }
1846            "child_process.spawn_sync" => {
1847                let Some(vm) = self.vms.get(vm_id) else {
1848                    log_stale_process_event(
1849                        &self.bridge,
1850                        vm_id,
1851                        process_id,
1852                        "javascript sync RPC child_process.spawn_sync",
1853                    );
1854                    return Ok(());
1855                };
1856                let (payload, max_buffer) =
1857                    parse_javascript_child_process_spawn_request(vm, &request.args)?;
1858                self.spawn_javascript_child_process_sync(vm_id, process_id, payload, max_buffer)
1859            }
1860            "child_process.poll" => {
1861                let child_process_id =
1862                    javascript_sync_rpc_arg_str(&request.args, 0, "child_process.poll child id")?;
1863                let wait_ms = javascript_sync_rpc_arg_u64_optional(
1864                    &request.args,
1865                    1,
1866                    "child_process.poll wait ms",
1867                )?
1868                .unwrap_or_default();
1869                self.poll_javascript_child_process(vm_id, process_id, child_process_id, wait_ms)
1870            }
1871            "child_process.write_stdin" => {
1872                let child_process_id = javascript_sync_rpc_arg_str(
1873                    &request.args,
1874                    0,
1875                    "child_process.write_stdin child id",
1876                )?;
1877                let chunk = javascript_sync_rpc_bytes_arg(
1878                    &request.args,
1879                    1,
1880                    "child_process.write_stdin chunk",
1881                )?;
1882                self.write_javascript_child_process_stdin(
1883                    vm_id,
1884                    process_id,
1885                    child_process_id,
1886                    &chunk,
1887                )?;
1888                Ok(Value::Null)
1889            }
1890            "child_process.close_stdin" => {
1891                let child_process_id = javascript_sync_rpc_arg_str(
1892                    &request.args,
1893                    0,
1894                    "child_process.close_stdin child id",
1895                )?;
1896                self.close_javascript_child_process_stdin(vm_id, process_id, child_process_id)?;
1897                Ok(Value::Null)
1898            }
1899            "child_process.kill" => {
1900                let child_process_id =
1901                    javascript_sync_rpc_arg_str(&request.args, 0, "child_process.kill child id")?;
1902                let signal =
1903                    javascript_sync_rpc_arg_str(&request.args, 1, "child_process.kill signal")?;
1904                self.kill_javascript_child_process(vm_id, process_id, child_process_id, signal)?;
1905                Ok(Value::Null)
1906            }
1907            "process.kill" => {
1908                let target_pid =
1909                    javascript_sync_rpc_arg_i32(&request.args, 0, "process.kill target pid")?;
1910                let signal = javascript_sync_rpc_arg_str(&request.args, 1, "process.kill signal")?;
1911                let parsed_signal = parse_signal(signal)?;
1912                if parsed_signal == 0 {
1913                    let Some(vm) = self.vms.get(vm_id) else {
1914                        log_stale_process_event(
1915                            &self.bridge,
1916                            vm_id,
1917                            process_id,
1918                            "javascript sync RPC process.kill",
1919                        );
1920                        return Ok(());
1921                    };
1922                    if !vm.active_processes.contains_key(process_id) {
1923                        log_stale_process_event(
1924                            &self.bridge,
1925                            vm_id,
1926                            process_id,
1927                            "javascript sync RPC process.kill",
1928                        );
1929                        return Ok(());
1930                    }
1931                    vm.kernel
1932                        .signal_process(EXECUTION_DRIVER_NAME, target_pid, parsed_signal)
1933                        .map(|()| Value::Null)
1934                        .map_err(kernel_error)
1935                } else if target_pid < 0 {
1936                    let caller_kernel_pid = {
1937                        let Some(vm) = self.vms.get(vm_id) else {
1938                            log_stale_process_event(
1939                                &self.bridge,
1940                                vm_id,
1941                                process_id,
1942                                "javascript sync RPC process.kill",
1943                            );
1944                            return Ok(());
1945                        };
1946                        let Some(caller) = vm.active_processes.get(process_id) else {
1947                            log_stale_process_event(
1948                                &self.bridge,
1949                                vm_id,
1950                                process_id,
1951                                "javascript sync RPC process.kill",
1952                            );
1953                            return Ok(());
1954                        };
1955                        caller.kernel_pid
1956                    };
1957                    let pgid = target_pid.unsigned_abs();
1958                    match self.signal_vm_process_group(vm_id, caller_kernel_pid, pgid, signal) {
1959                        Ok(true) => {
1960                            Ok(self.apply_self_process_kill(vm_id, process_id, parsed_signal))
1961                        }
1962                        Ok(false) => Ok(Value::Null),
1963                        Err(error) => Err(error),
1964                    }
1965                } else {
1966                    enum ProcessKillTarget {
1967                        SelfProcess,
1968                        Child(String),
1969                        TopLevel(String),
1970                        KernelPid(u32),
1971                    }
1972                    let target = {
1973                        let Some(vm) = self.vms.get(vm_id) else {
1974                            log_stale_process_event(
1975                                &self.bridge,
1976                                vm_id,
1977                                process_id,
1978                                "javascript sync RPC process.kill",
1979                            );
1980                            return Ok(());
1981                        };
1982                        let Some(caller) = vm.active_processes.get(process_id) else {
1983                            log_stale_process_event(
1984                                &self.bridge,
1985                                vm_id,
1986                                process_id,
1987                                "javascript sync RPC process.kill",
1988                            );
1989                            return Ok(());
1990                        };
1991                        let caller_pid = i32::try_from(caller.kernel_pid).map_err(|_| {
1992                            SidecarError::InvalidState("caller pid exceeds i32".into())
1993                        })?;
1994                        if caller_pid == target_pid {
1995                            ProcessKillTarget::SelfProcess
1996                        } else if let Some((child_process_id, _)) = caller
1997                            .child_processes
1998                            .iter()
1999                            .find(|(_, child)| i32::try_from(child.kernel_pid) == Ok(target_pid))
2000                        {
2001                            ProcessKillTarget::Child(child_process_id.clone())
2002                        } else if let Some((target_process_id, _)) =
2003                            vm.active_processes.iter().find(|(_, process)| {
2004                                i32::try_from(process.kernel_pid) == Ok(target_pid)
2005                            })
2006                        {
2007                            ProcessKillTarget::TopLevel(target_process_id.clone())
2008                        } else {
2009                            let target_kernel_pid = u32::try_from(target_pid).map_err(|_| {
2010                                SidecarError::InvalidState(format!(
2011                                    "EINVAL: invalid process pid {target_pid}"
2012                                ))
2013                            })?;
2014                            ProcessKillTarget::KernelPid(target_kernel_pid)
2015                        }
2016                    };
2017                    match target {
2018                        ProcessKillTarget::SelfProcess => {
2019                            Ok(self.apply_self_process_kill(vm_id, process_id, parsed_signal))
2020                        }
2021                        ProcessKillTarget::Child(child_process_id) => {
2022                            self.kill_javascript_child_process(
2023                                vm_id,
2024                                process_id,
2025                                &child_process_id,
2026                                signal,
2027                            )?;
2028                            Ok(Value::Null)
2029                        }
2030                        ProcessKillTarget::TopLevel(target_process_id) => {
2031                            self.kill_process_internal(vm_id, &target_process_id, signal)?;
2032                            Ok(Value::Null)
2033                        }
2034                        ProcessKillTarget::KernelPid(target_kernel_pid) => {
2035                            // Grandchildren and untracked kernel processes are
2036                            // resolved VM-wide instead of failing with an
2037                            // unknown-pid error.
2038                            self.signal_vm_kernel_pid(vm_id, target_kernel_pid, signal)
2039                                .map(|()| Value::Null)
2040                        }
2041                    }
2042                }
2043            }
2044            "process.signal_state" => {
2045                let signal =
2046                    javascript_sync_rpc_arg_u32(&request.args, 0, "process.signal_state signal")?;
2047                let action =
2048                    javascript_sync_rpc_arg_str(&request.args, 1, "process.signal_state action")?;
2049                let mask_json =
2050                    javascript_sync_rpc_arg_str(&request.args, 2, "process.signal_state mask")?;
2051                let flags =
2052                    javascript_sync_rpc_arg_u32(&request.args, 3, "process.signal_state flags")?;
2053                let mask: Vec<u32> = serde_json::from_str(mask_json).map_err(|error| {
2054                    SidecarError::InvalidState(format!(
2055                        "process.signal_state mask must be valid JSON: {error}"
2056                    ))
2057                })?;
2058                let action = match action.trim().to_ascii_lowercase().as_str() {
2059                    "default" => SignalDispositionAction::Default,
2060                    "ignore" => SignalDispositionAction::Ignore,
2061                    "user" => SignalDispositionAction::User,
2062                    other => {
2063                        return Err(SidecarError::InvalidState(format!(
2064                            "unsupported process.signal_state action {other}"
2065                        )));
2066                    }
2067                };
2068                let Some(vm) = self.vms.get_mut(vm_id) else {
2069                    log_stale_process_event(
2070                        &self.bridge,
2071                        vm_id,
2072                        process_id,
2073                        "javascript sync RPC process.signal_state",
2074                    );
2075                    return Ok(());
2076                };
2077                if action == SignalDispositionAction::Default && mask.is_empty() && flags == 0 {
2078                    let remove_process_entry = vm
2079                        .signal_states
2080                        .get_mut(process_id)
2081                        .map(|handlers| {
2082                            handlers.remove(&signal);
2083                            handlers.is_empty()
2084                        })
2085                        .unwrap_or(false);
2086                    if remove_process_entry {
2087                        vm.signal_states.remove(process_id);
2088                    }
2089                } else {
2090                    vm.signal_states
2091                        .entry(process_id.to_owned())
2092                        .or_default()
2093                        .insert(
2094                            signal,
2095                            SignalHandlerRegistration {
2096                                action,
2097                                mask,
2098                                flags,
2099                            },
2100                        );
2101                }
2102                Ok(Value::Null)
2103            }
2104            "net.http_request" => {
2105                let payload = request
2106                    .args
2107                    .first()
2108                    .cloned()
2109                    .ok_or_else(|| {
2110                        SidecarError::InvalidState(String::from(
2111                            "net.http_request requires a request payload",
2112                        ))
2113                    })
2114                    .and_then(|value| {
2115                        serde_json::from_value::<JavascriptHttpLoopbackRequest>(value).map_err(
2116                            |error| {
2117                                SidecarError::InvalidState(format!(
2118                                    "invalid net.http_request payload: {error}"
2119                                ))
2120                            },
2121                        )
2122                    })?;
2123                if !is_javascript_loopback_host(&payload.host) {
2124                    return Err(SidecarError::Execution(format!(
2125                        "EACCES: HTTP loopback request requires a loopback host, got {}",
2126                        payload.host
2127                    )));
2128                }
2129                self.bridge.require_network_access(
2130                    vm_id,
2131                    NetworkOperation::Http,
2132                    format_tcp_resource(&payload.host, payload.port),
2133                )?;
2134                let Some(vm) = self.vms.get_mut(vm_id) else {
2135                    log_stale_process_event(
2136                        &self.bridge,
2137                        vm_id,
2138                        process_id,
2139                        "javascript sync RPC net.http_request",
2140                    );
2141                    return Ok(());
2142                };
2143                let resource_limits = vm.kernel.resource_limits().clone();
2144                let socket_paths = build_javascript_socket_path_context(vm)?;
2145                let target_is_current =
2146                    [JavascriptSocketFamily::Ipv4, JavascriptSocketFamily::Ipv6]
2147                        .iter()
2148                        .any(|family| {
2149                            socket_paths
2150                                .http_loopback_target(*family, payload.port)
2151                                .is_some_and(|target| {
2152                                    target.process_id == payload.process_id
2153                                        && target.server_id == payload.server_id
2154                                })
2155                        });
2156                if !target_is_current {
2157                    return Err(SidecarError::InvalidState(format!(
2158                        "unknown HTTP loopback target {}:{} for server {} in process {}",
2159                        payload.host, payload.port, payload.server_id, payload.process_id
2160                    )));
2161                }
2162                let Some(target_process) = vm.active_processes.get_mut(&payload.process_id) else {
2163                    return Err(SidecarError::InvalidState(format!(
2164                        "unknown HTTP loopback process {}",
2165                        payload.process_id
2166                    )));
2167                };
2168                dispatch_loopback_http_request(LoopbackHttpDispatchRequest {
2169                    bridge: &self.bridge,
2170                    vm_id,
2171                    dns: &vm.dns,
2172                    socket_paths: &socket_paths,
2173                    kernel: &mut vm.kernel,
2174                    process: target_process,
2175                    resource_limits: &resource_limits,
2176                    server_id: payload.server_id,
2177                    request_json: &payload.request,
2178                })
2179                .map(Value::String)
2180            }
2181            _ => {
2182                let Some(vm) = self.vms.get_mut(vm_id) else {
2183                    log_stale_process_event(
2184                        &self.bridge,
2185                        vm_id,
2186                        process_id,
2187                        "javascript sync RPC bridge dispatch",
2188                    );
2189                    return Ok(());
2190                };
2191                let resource_limits = vm.kernel.resource_limits().clone();
2192                let network_counts = vm_network_resource_counts(vm);
2193                let socket_paths = build_javascript_socket_path_context(vm)?;
2194                let Some(process) = vm.active_processes.get_mut(process_id) else {
2195                    log_stale_process_event(
2196                        &self.bridge,
2197                        vm_id,
2198                        process_id,
2199                        "javascript sync RPC bridge dispatch",
2200                    );
2201                    return Ok(());
2202                };
2203                service_javascript_sync_rpc(JavascriptSyncRpcServiceRequest {
2204                    bridge: &self.bridge,
2205                    vm_id,
2206                    dns: &vm.dns,
2207                    socket_paths: &socket_paths,
2208                    kernel: &mut vm.kernel,
2209                    process,
2210                    sync_request: &request,
2211                    resource_limits: &resource_limits,
2212                    network_counts,
2213                })
2214            }
2215        };
2216
2217        let Some(vm) = self.vms.get_mut(vm_id) else {
2218            log_stale_process_event(
2219                &self.bridge,
2220                vm_id,
2221                process_id,
2222                "javascript sync RPC response delivery",
2223            );
2224            return Ok(());
2225        };
2226        let shadow_root = vm.cwd.clone();
2227        let Some(process) = vm.active_processes.get_mut(process_id) else {
2228            log_stale_process_event(
2229                &self.bridge,
2230                vm_id,
2231                process_id,
2232                "javascript sync RPC response delivery",
2233            );
2234            return Ok(());
2235        };
2236
2237        if response.is_ok()
2238            && matches!(
2239                request.method.as_str(),
2240                "fs.chmodSync" | "fs.promises.chmod"
2241            )
2242        {
2243            let guest_path =
2244                javascript_sync_rpc_arg_str(&request.args, 0, "filesystem chmod path")?;
2245            let mode =
2246                javascript_sync_rpc_arg_u32(&request.args, 1, "filesystem chmod mode")? & 0o7777;
2247            let host_path =
2248                shadow_host_path_for_process(&shadow_root, &process.guest_cwd, guest_path);
2249            if host_path.exists() {
2250                fs::set_permissions(&host_path, fs::Permissions::from_mode(mode)).map_err(
2251                    |error| {
2252                        SidecarError::Io(format!(
2253                            "failed to mirror chmod to shadow path {}: {error}",
2254                            host_path.display()
2255                        ))
2256                    },
2257                )?;
2258            }
2259        }
2260
2261        match response {
2262            Ok(result) => process
2263                .execution
2264                .respond_javascript_sync_rpc_success(request.id, result)
2265                .or_else(ignore_stale_javascript_sync_rpc_response),
2266            Err(error) => process
2267                .execution
2268                .respond_javascript_sync_rpc_error(
2269                    request.id,
2270                    javascript_sync_rpc_error_code(&error),
2271                    error.to_string(),
2272                )
2273                .or_else(ignore_stale_javascript_sync_rpc_response),
2274        }
2275    }
2276
2277    /// Applies a `process.kill` aimed at the calling process itself and
2278    /// returns the self-delivery action payload for the bridge.
2279    fn apply_self_process_kill(
2280        &mut self,
2281        vm_id: &str,
2282        process_id: &str,
2283        parsed_signal: i32,
2284    ) -> Value {
2285        let action = self
2286            .vms
2287            .get(vm_id)
2288            .and_then(|vm| vm.signal_states.get(process_id))
2289            .and_then(|handlers| handlers.get(&(parsed_signal as u32)))
2290            .map(|registration| registration.action.clone())
2291            .unwrap_or(SignalDispositionAction::Default);
2292        if action == SignalDispositionAction::Default
2293            && parsed_signal != 0
2294            && !matches!(
2295                canonical_signal_name(parsed_signal),
2296                Some("SIGWINCH" | "SIGCHLD" | "SIGCONT" | "SIGURG")
2297            )
2298        {
2299            if let Some(vm) = self.vms.get_mut(vm_id) {
2300                if let Some(process) = vm.active_processes.get_mut(process_id) {
2301                    process.pending_self_signal_exit = Some(parsed_signal);
2302                }
2303            }
2304        }
2305        json!({
2306            "self": true,
2307            "action": match action {
2308                SignalDispositionAction::Default => "default",
2309                SignalDispositionAction::Ignore => "ignore",
2310                SignalDispositionAction::User => "user",
2311            },
2312        })
2313    }
2314
2315    pub(crate) fn vm_ids_for_scope(
2316        &self,
2317        ownership: &OwnershipScope,
2318    ) -> Result<Vec<String>, SidecarError> {
2319        match ownership {
2320            OwnershipScope::SessionOwnership(inner) => {
2321                self.require_owned_session(&inner.connection_id, &inner.session_id)?;
2322                Ok(self
2323                    .sessions
2324                    .get(&inner.session_id)
2325                    .expect("owned session should exist")
2326                    .vm_ids
2327                    .iter()
2328                    .cloned()
2329                    .collect())
2330            }
2331            OwnershipScope::VmOwnership(inner) => {
2332                self.require_owned_vm(&inner.connection_id, &inner.session_id, &inner.vm_id)?;
2333                Ok(vec![inner.vm_id.clone()])
2334            }
2335            OwnershipScope::ConnectionOwnership(..) => Err(SidecarError::InvalidState(
2336                String::from("event polling requires session or VM ownership scope"),
2337            )),
2338        }
2339    }
2340
2341    pub(crate) fn vm_ownership(&self, vm_id: &str) -> Result<OwnershipScope, SidecarError> {
2342        let vm = self
2343            .vms
2344            .get(vm_id)
2345            .ok_or_else(|| SidecarError::InvalidState(format!("unknown sidecar VM {vm_id}")))?;
2346        Ok(OwnershipScope::vm(&vm.connection_id, &vm.session_id, vm_id))
2347    }
2348
2349    pub(crate) fn vm_has_active_processes(&self, vm_id: &str) -> bool {
2350        self.vms
2351            .get(vm_id)
2352            .is_some_and(|vm| !vm.active_processes.is_empty())
2353    }
2354
2355    fn require_authenticated_connection(&self, connection_id: &str) -> Result<(), SidecarError> {
2356        if self.connections.contains_key(connection_id) {
2357            Ok(())
2358        } else {
2359            Err(SidecarError::InvalidState(format!(
2360                "connection {connection_id} has not authenticated"
2361            )))
2362        }
2363    }
2364
2365    pub(crate) fn require_owned_session(
2366        &self,
2367        connection_id: &str,
2368        session_id: &str,
2369    ) -> Result<(), SidecarError> {
2370        self.require_authenticated_connection(connection_id)?;
2371        let session = self.sessions.get(session_id).ok_or_else(|| {
2372            SidecarError::InvalidState(format!("unknown sidecar session {session_id}"))
2373        })?;
2374        if session.connection_id == connection_id {
2375            Ok(())
2376        } else {
2377            Err(SidecarError::InvalidState(format!(
2378                "session {session_id} is not owned by connection {connection_id}"
2379            )))
2380        }
2381    }
2382
2383    pub(crate) fn require_owned_vm(
2384        &self,
2385        connection_id: &str,
2386        session_id: &str,
2387        vm_id: &str,
2388    ) -> Result<(), SidecarError> {
2389        self.require_owned_session(connection_id, session_id)?;
2390        let vm = self
2391            .vms
2392            .get(vm_id)
2393            .ok_or_else(|| SidecarError::InvalidState(format!("unknown sidecar VM {vm_id}")))?;
2394        if vm.connection_id != connection_id || vm.session_id != session_id {
2395            return Err(SidecarError::InvalidState(format!(
2396                "VM {vm_id} is not owned by {connection_id}/{session_id}"
2397            )));
2398        }
2399        Ok(())
2400    }
2401
2402    fn connection_id_for(&self, ownership: &OwnershipScope) -> Result<String, SidecarError> {
2403        match ownership {
2404            OwnershipScope::ConnectionOwnership(inner) => Ok(inner.connection_id.clone()),
2405            OwnershipScope::SessionOwnership(..) | OwnershipScope::VmOwnership(..) => {
2406                Err(SidecarError::InvalidState(String::from(
2407                    "request requires connection ownership scope",
2408                )))
2409            }
2410        }
2411    }
2412
2413    fn validate_auth_token(&self, auth_token: &str) -> Result<(), SidecarError> {
2414        let Some(expected_auth_token) = self.config.expected_auth_token.as_deref() else {
2415            return Ok(());
2416        };
2417
2418        if auth_token == expected_auth_token {
2419            Ok(())
2420        } else {
2421            Err(SidecarError::Unauthorized(String::from(
2422                "authenticate request provided an invalid auth token",
2423            )))
2424        }
2425    }
2426
2427    fn allocate_connection_id(&mut self) -> String {
2428        self.next_connection_id += 1;
2429        format!("conn-{}", self.next_connection_id)
2430    }
2431
2432    fn take_matching_process_event_envelope(
2433        &mut self,
2434        vm_id: &str,
2435        process_id: &str,
2436    ) -> Result<Option<ProcessEventEnvelope>, SidecarError> {
2437        if let Some(index) = self
2438            .pending_process_events
2439            .iter()
2440            .position(|event| event.vm_id == vm_id && event.process_id == process_id)
2441        {
2442            return Ok(self.pending_process_events.remove(index));
2443        }
2444
2445        let mut matching_envelope = None;
2446        let mut deferred = Vec::new();
2447        {
2448            let pending_capacity = self.pending_process_event_capacity();
2449            let receiver = self.process_event_receiver.as_mut().ok_or_else(|| {
2450                SidecarError::InvalidState(String::from("process event receiver unavailable"))
2451            })?;
2452            loop {
2453                if deferred.len() >= pending_capacity {
2454                    if receiver.is_empty() {
2455                        break;
2456                    }
2457                    return Err(process_event_queue_overflow_error());
2458                }
2459                let envelope = match receiver.try_recv() {
2460                    Ok(envelope) => envelope,
2461                    Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
2462                    Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break,
2463                };
2464                if matching_envelope.is_none()
2465                    && envelope.vm_id == vm_id
2466                    && envelope.process_id == process_id
2467                {
2468                    matching_envelope = Some(envelope);
2469                    break;
2470                }
2471                deferred.push(envelope);
2472            }
2473        }
2474        for envelope in deferred {
2475            self.queue_pending_process_event(envelope)?;
2476        }
2477
2478        Ok(matching_envelope)
2479    }
2480
2481    fn allocate_sidecar_request_id(&mut self) -> RequestId {
2482        let request_id = self.next_sidecar_request_id;
2483        self.next_sidecar_request_id -= 1;
2484        request_id
2485    }
2486
2487    pub(crate) fn session_scope_for(
2488        &self,
2489        ownership: &OwnershipScope,
2490    ) -> Result<(String, String), SidecarError> {
2491        match ownership {
2492            OwnershipScope::SessionOwnership(inner) => {
2493                Ok((inner.connection_id.clone(), inner.session_id.clone()))
2494            }
2495            OwnershipScope::ConnectionOwnership(..) | OwnershipScope::VmOwnership(..) => {
2496                Err(SidecarError::InvalidState(String::from(
2497                    "request requires session ownership scope",
2498                )))
2499            }
2500        }
2501    }
2502
2503    pub(crate) fn vm_scope_for(
2504        &self,
2505        ownership: &OwnershipScope,
2506    ) -> Result<(String, String, String), SidecarError> {
2507        match ownership {
2508            OwnershipScope::VmOwnership(inner) => Ok((
2509                inner.connection_id.clone(),
2510                inner.session_id.clone(),
2511                inner.vm_id.clone(),
2512            )),
2513            OwnershipScope::ConnectionOwnership(..) | OwnershipScope::SessionOwnership(..) => Err(
2514                SidecarError::InvalidState(String::from("request requires VM ownership scope")),
2515            ),
2516        }
2517    }
2518
2519    fn response_with_ownership(
2520        &self,
2521        request_id: RequestId,
2522        ownership: OwnershipScope,
2523        payload: ResponsePayload,
2524    ) -> ResponseFrame {
2525        ResponseFrame {
2526            schema: ProtocolSchema::current(),
2527            request_id,
2528            ownership,
2529            payload,
2530        }
2531    }
2532
2533    pub(crate) fn respond(
2534        &self,
2535        request: &RequestFrame,
2536        payload: ResponsePayload,
2537    ) -> ResponseFrame {
2538        self.response_with_ownership(request.request_id, request.ownership.clone(), payload)
2539    }
2540
2541    fn reject(&self, request: &RequestFrame, code: &str, message: &str) -> ResponseFrame {
2542        self.respond(
2543            request,
2544            ResponsePayload::Rejected(RejectedResponse {
2545                code: code.to_owned(),
2546                message: message.to_owned(),
2547            }),
2548        )
2549    }
2550
2551    pub fn queue_sidecar_request(
2552        &mut self,
2553        ownership: OwnershipScope,
2554        payload: SidecarRequestPayload,
2555    ) -> Result<RequestId, SidecarError> {
2556        if self.outbound_sidecar_requests.len() >= MAX_OUTBOUND_SIDECAR_REQUESTS {
2557            return Err(outbound_sidecar_request_queue_overflow_error());
2558        }
2559        if self.pending_sidecar_responses.pending_count() >= MAX_PENDING_SIDECAR_RESPONSES {
2560            return Err(sidecar_response_pending_overflow_error());
2561        }
2562        let request_id = self.allocate_sidecar_request_id();
2563        let request = SidecarRequestFrame::new(request_id, ownership, payload);
2564        self.pending_sidecar_responses
2565            .register_request(&request)
2566            .map_err(sidecar_response_tracker_error)?;
2567        self.outbound_sidecar_requests.push_back(request);
2568        self.outbound_sidecar_requests_gauge
2569            .observe_depth(self.outbound_sidecar_requests.len());
2570        self.pending_sidecar_responses_gauge
2571            .observe_depth(self.pending_sidecar_responses.pending_count());
2572        Ok(request_id)
2573    }
2574
2575    pub fn queue_wire_sidecar_request(
2576        &mut self,
2577        ownership: crate::wire::OwnershipScope,
2578        payload: crate::wire::SidecarRequestPayload,
2579    ) -> Result<crate::wire::RequestId, SidecarError> {
2580        let ownership = crate::wire::ownership_scope_to_compat(ownership);
2581        let payload = crate::wire::sidecar_request_payload_to_compat(&ownership, payload)
2582            .map_err(wire_protocol_error)?;
2583        self.queue_sidecar_request(ownership, payload)
2584    }
2585
2586    pub fn pop_sidecar_request(&mut self) -> Option<SidecarRequestFrame> {
2587        let request = self.outbound_sidecar_requests.pop_front();
2588        self.outbound_sidecar_requests_gauge
2589            .observe_depth(self.outbound_sidecar_requests.len());
2590        request
2591    }
2592
2593    pub fn pop_wire_sidecar_request(
2594        &mut self,
2595    ) -> Result<Option<crate::wire::SidecarRequestFrame>, SidecarError> {
2596        self.pop_sidecar_request()
2597            .map(crate::wire::sidecar_request_frame_from_compat)
2598            .transpose()
2599            .map_err(wire_protocol_error)
2600    }
2601
2602    pub fn accept_sidecar_response(
2603        &mut self,
2604        response: SidecarResponseFrame,
2605    ) -> Result<(), SidecarError> {
2606        self.pending_sidecar_responses
2607            .accept_response(&response)
2608            .map_err(sidecar_response_tracker_error)?;
2609        self.pending_sidecar_responses_gauge
2610            .observe_depth(self.pending_sidecar_responses.pending_count());
2611        self.completed_sidecar_response_order
2612            .push_back(response.request_id);
2613        self.completed_sidecar_responses
2614            .insert(response.request_id, response);
2615        self.completed_sidecar_responses_gauge
2616            .observe_depth(self.completed_sidecar_responses.len());
2617        while self.completed_sidecar_responses.len() > MAX_COMPLETED_SIDECAR_RESPONSES {
2618            match self.completed_sidecar_response_order.pop_front() {
2619                // Only a response that was never retrieved is a real loss; an id
2620                // already taken via take_sidecar_response leaves a stale order
2621                // entry that removes to None and is not a dropped response.
2622                Some(evicted) => {
2623                    if self.completed_sidecar_responses.remove(&evicted).is_some() {
2624                        tracing::warn!(
2625                            queue = "completed_sidecar_responses",
2626                            evicted_request_id = evicted,
2627                            capacity = MAX_COMPLETED_SIDECAR_RESPONSES,
2628                            "dropping an unretrieved completed sidecar response to stay within cap; the host can no longer fetch it (response lost)"
2629                        );
2630                        self.completed_sidecar_responses_gauge
2631                            .observe_depth(self.completed_sidecar_responses.len());
2632                    }
2633                }
2634                None => break,
2635            }
2636        }
2637        Ok(())
2638    }
2639
2640    pub fn accept_wire_sidecar_response(
2641        &mut self,
2642        response: crate::wire::SidecarResponseFrame,
2643    ) -> Result<(), SidecarError> {
2644        let response =
2645            crate::wire::sidecar_response_frame_to_compat(response).map_err(wire_protocol_error)?;
2646        self.accept_sidecar_response(response)
2647    }
2648
2649    pub fn take_sidecar_response(&mut self, request_id: RequestId) -> Option<SidecarResponseFrame> {
2650        let response = self.completed_sidecar_responses.remove(&request_id);
2651        if response.is_some() {
2652            self.completed_sidecar_response_order
2653                .retain(|completed_id| completed_id != &request_id);
2654            self.completed_sidecar_responses_gauge
2655                .observe_depth(self.completed_sidecar_responses.len());
2656        }
2657        response
2658    }
2659
2660    pub fn take_wire_sidecar_response(
2661        &mut self,
2662        request_id: crate::wire::RequestId,
2663    ) -> Result<Option<crate::wire::SidecarResponseFrame>, SidecarError> {
2664        self.take_sidecar_response(request_id)
2665            .map(|response| {
2666                crate::wire::sidecar_response_frame_from_compat(response)
2667                    .map_err(wire_protocol_error)
2668            })
2669            .transpose()
2670    }
2671
2672    pub(crate) fn vm_lifecycle_event(
2673        &self,
2674        connection_id: &str,
2675        session_id: &str,
2676        vm_id: &str,
2677        state: VmLifecycleState,
2678    ) -> EventFrame {
2679        EventFrame::new(
2680            OwnershipScope::vm(connection_id, session_id, vm_id),
2681            EventPayload::VmLifecycle(VmLifecycleEvent { state }),
2682        )
2683    }
2684
2685    fn ensure_request_within_frame_limit(
2686        &self,
2687        request: &RequestFrame,
2688    ) -> Result<(), SidecarError> {
2689        let frame = crate::protocol::to_generated_protocol_frame(
2690            &crate::protocol::ProtocolFrame::Request(request.clone()),
2691        )
2692        .map_err(|error| {
2693            SidecarError::InvalidState(format!("failed to convert request frame: {error}"))
2694        })?;
2695        let crate::wire::ProtocolFrame::RequestFrame(_) = &frame else {
2696            return Err(SidecarError::InvalidState(String::from(
2697                "request converted to non-request wire frame",
2698            )));
2699        };
2700
2701        crate::wire::WireFrameCodec::new(self.config.max_frame_bytes)
2702            .encode(&frame)
2703            .map(|_| ())
2704            .map_err(|error| SidecarError::FrameTooLarge(error.to_string()))
2705    }
2706}
2707
2708impl<B> ExtensionHost for NativeSidecar<B>
2709where
2710    B: NativeSidecarBridge + Send + 'static,
2711    BridgeError<B>: fmt::Debug + Send + Sync + 'static,
2712{
2713    fn spawn_process<'a>(
2714        &'a mut self,
2715        ownership: OwnershipScope,
2716        payload: ExecuteRequest,
2717    ) -> ExtensionFuture<'a, ProcessStartedResponse> {
2718        Box::pin(async move {
2719            let request = RequestFrame::new(0, ownership, RequestPayload::Execute(payload.clone()));
2720            let dispatch = NativeSidecar::execute(self, &request, payload).await?;
2721            match dispatch.response.payload {
2722                ResponsePayload::ProcessStarted(response) => Ok(response),
2723                other => Err(unexpected_extension_host_response("execute", other)),
2724            }
2725        })
2726    }
2727
2728    fn write_stdin<'a>(
2729        &'a mut self,
2730        ownership: OwnershipScope,
2731        payload: WriteStdinRequest,
2732    ) -> ExtensionFuture<'a, StdinWrittenResponse> {
2733        Box::pin(async move {
2734            let request =
2735                RequestFrame::new(0, ownership, RequestPayload::WriteStdin(payload.clone()));
2736            let dispatch = NativeSidecar::write_stdin(self, &request, payload).await?;
2737            match dispatch.response.payload {
2738                ResponsePayload::StdinWritten(response) => Ok(response),
2739                other => Err(unexpected_extension_host_response("write_stdin", other)),
2740            }
2741        })
2742    }
2743
2744    fn close_stdin<'a>(
2745        &'a mut self,
2746        ownership: OwnershipScope,
2747        payload: CloseStdinRequest,
2748    ) -> ExtensionFuture<'a, StdinClosedResponse> {
2749        Box::pin(async move {
2750            let request =
2751                RequestFrame::new(0, ownership, RequestPayload::CloseStdin(payload.clone()));
2752            let dispatch = NativeSidecar::close_stdin(self, &request, payload).await?;
2753            match dispatch.response.payload {
2754                ResponsePayload::StdinClosed(response) => Ok(response),
2755                other => Err(unexpected_extension_host_response("close_stdin", other)),
2756            }
2757        })
2758    }
2759
2760    fn kill_process<'a>(
2761        &'a mut self,
2762        ownership: OwnershipScope,
2763        payload: KillProcessRequest,
2764    ) -> ExtensionFuture<'a, ProcessKilledResponse> {
2765        Box::pin(async move {
2766            let request =
2767                RequestFrame::new(0, ownership, RequestPayload::KillProcess(payload.clone()));
2768            let dispatch = NativeSidecar::kill_process(self, &request, payload).await?;
2769            match dispatch.response.payload {
2770                ResponsePayload::ProcessKilled(response) => Ok(response),
2771                other => Err(unexpected_extension_host_response("kill_process", other)),
2772            }
2773        })
2774    }
2775
2776    fn poll_event<'a>(
2777        &'a mut self,
2778        ownership: OwnershipScope,
2779        timeout: Duration,
2780    ) -> ExtensionFuture<'a, Option<EventFrame>> {
2781        Box::pin(async move { NativeSidecar::poll_event(self, &ownership, timeout).await })
2782    }
2783
2784    fn guest_filesystem_call<'a>(
2785        &'a mut self,
2786        ownership: OwnershipScope,
2787        payload: GuestFilesystemCallRequest,
2788    ) -> ExtensionFuture<'a, GuestFilesystemResultResponse> {
2789        Box::pin(async move {
2790            let request = RequestFrame::new(
2791                0,
2792                ownership,
2793                RequestPayload::GuestFilesystemCall(payload.clone()),
2794            );
2795            let dispatch = NativeSidecar::guest_filesystem_call(self, &request, payload).await?;
2796            match dispatch.response.payload {
2797                ResponsePayload::GuestFilesystemResult(response) => Ok(response),
2798                other => Err(unexpected_extension_host_response(
2799                    "guest_filesystem_call",
2800                    other,
2801                )),
2802            }
2803        })
2804    }
2805
2806    fn bind_process_to_session<'a>(
2807        &'a mut self,
2808        ownership: OwnershipScope,
2809        namespace: String,
2810        ext_session_id: String,
2811        process_id: String,
2812    ) -> ExtensionFuture<'a, ()> {
2813        Box::pin(async move {
2814            self.bind_extension_process_resource(ownership, namespace, ext_session_id, process_id)
2815        })
2816    }
2817
2818    fn bind_vm_to_session<'a>(
2819        &'a mut self,
2820        ownership: OwnershipScope,
2821        namespace: String,
2822        ext_session_id: String,
2823    ) -> ExtensionFuture<'a, ()> {
2824        Box::pin(
2825            async move { self.bind_extension_vm_resource(ownership, namespace, ext_session_id) },
2826        )
2827    }
2828
2829    fn dispose_session_resources<'a>(
2830        &'a mut self,
2831        ownership: OwnershipScope,
2832        namespace: String,
2833        ext_session_id: String,
2834    ) -> ExtensionFuture<'a, Vec<EventFrame>> {
2835        Box::pin(async move {
2836            let key = (namespace, ext_session_id);
2837            let Some(resources) = self.extension_sessions.get(&key) else {
2838                return Ok(Vec::new());
2839            };
2840            if resources.ownership != ownership {
2841                return Err(SidecarError::InvalidState(String::from(
2842                    "extension session ownership did not match dispose request",
2843                )));
2844            }
2845            let resources = self
2846                .extension_sessions
2847                .remove(&key)
2848                .expect("extension resources existed before removal");
2849            let (connection_id, session_id, vm_id) = self.vm_scope_for(&ownership)?;
2850            for process_id in resources.process_ids {
2851                if self
2852                    .vms
2853                    .get(&vm_id)
2854                    .is_some_and(|vm| vm.active_processes.contains_key(&process_id))
2855                {
2856                    self.kill_process_internal(&vm_id, &process_id, "SIGTERM")?;
2857                }
2858            }
2859            let mut events = Vec::new();
2860            for resource_vm_id in resources.vm_ids {
2861                if self.vms.contains_key(&resource_vm_id) {
2862                    events.extend(
2863                        self.dispose_vm_internal(
2864                            &connection_id,
2865                            &session_id,
2866                            &resource_vm_id,
2867                            DisposeReason::Requested,
2868                        )
2869                        .await?,
2870                    );
2871                }
2872            }
2873            Ok(events)
2874        })
2875    }
2876
2877    fn start_buffering_process_output<'a>(
2878        &'a mut self,
2879        ownership: OwnershipScope,
2880        process_id: String,
2881    ) -> ExtensionFuture<'a, ()> {
2882        Box::pin(async move {
2883            let (connection_id, session_id, vm_id) = self.vm_scope_for(&ownership)?;
2884            self.require_owned_vm(&connection_id, &session_id, &vm_id)?;
2885            let key = (vm_id, process_id);
2886            if self.extension_process_output_buffers.contains_key(&key) {
2887                return Err(SidecarError::Conflict(String::from(
2888                    "extension process output buffering already started",
2889                )));
2890            }
2891            self.extension_process_output_buffers
2892                .insert(key, ExtensionBufferedProcessOutput::default());
2893            Ok(())
2894        })
2895    }
2896
2897    fn handoff_buffered_process_output<'a>(
2898        &'a mut self,
2899        ownership: OwnershipScope,
2900        namespace: String,
2901        ext_session_id: String,
2902        process_id: String,
2903        timeout: Duration,
2904    ) -> ExtensionFuture<'a, ExtensionBufferedProcessOutput> {
2905        Box::pin(async move {
2906            let (connection_id, session_id, vm_id) = self.vm_scope_for(&ownership)?;
2907            self.require_owned_vm(&connection_id, &session_id, &vm_id)?;
2908            let key = (vm_id.clone(), process_id.clone());
2909            let deadline = Instant::now() + timeout;
2910            loop {
2911                self.pump_process_events(&ownership).await?;
2912                while let Some(envelope) =
2913                    self.take_matching_process_event_envelope(&vm_id, &process_id)?
2914                {
2915                    if self.capture_extension_process_output_event(
2916                        &vm_id,
2917                        &process_id,
2918                        &envelope.event,
2919                    ) {
2920                        continue;
2921                    }
2922                    self.queue_pending_process_event(envelope)?;
2923                    break;
2924                }
2925                let buffered = self
2926                    .extension_process_output_buffers
2927                    .get(&key)
2928                    .is_some_and(|buffer| !buffer.stdout.is_empty() || !buffer.stderr.is_empty());
2929                if buffered || timeout.is_zero() || Instant::now() >= deadline {
2930                    break;
2931                }
2932                let remaining = deadline.saturating_duration_since(Instant::now());
2933                time::sleep(remaining.min(Duration::from_millis(10))).await;
2934            }
2935            self.bind_extension_process_resource(
2936                ownership,
2937                namespace,
2938                ext_session_id,
2939                process_id.clone(),
2940            )?;
2941            self.extension_process_output_buffers
2942                .remove(&key)
2943                .ok_or_else(|| {
2944                    SidecarError::InvalidState(String::from(
2945                        "extension process output buffering was not started",
2946                    ))
2947                })
2948        })
2949    }
2950}
2951
2952fn unexpected_extension_host_response(operation: &str, payload: ResponsePayload) -> SidecarError {
2953    match payload {
2954        ResponsePayload::Rejected(response) => SidecarError::InvalidState(format!(
2955            "extension {operation} rejected with {}: {}",
2956            response.code, response.message
2957        )),
2958        other => SidecarError::InvalidState(format!(
2959            "extension {operation} returned unexpected response: {other:?}"
2960        )),
2961    }
2962}
2963
2964fn shadow_host_path_for_process(
2965    shadow_root: &Path,
2966    process_guest_cwd: &str,
2967    guest_path: &str,
2968) -> PathBuf {
2969    let normalized_guest_path = if guest_path.starts_with('/') {
2970        normalize_path(guest_path)
2971    } else {
2972        normalize_path(&format!(
2973            "{}/{}",
2974            process_guest_cwd.trim_end_matches('/'),
2975            guest_path
2976        ))
2977    };
2978    if normalized_guest_path == "/" {
2979        shadow_root.to_path_buf()
2980    } else {
2981        shadow_root.join(normalized_guest_path.trim_start_matches('/'))
2982    }
2983}
2984
2985fn sidecar_response_tracker_error(error: SidecarResponseTrackerError) -> SidecarError {
2986    SidecarError::InvalidState(format!(
2987        "invalid sidecar response correlation state: {error}"
2988    ))
2989}
2990
2991fn map_bridge_permission(decision: secure_exec_bridge::PermissionDecision) -> PermissionDecision {
2992    match decision.verdict {
2993        secure_exec_bridge::PermissionVerdict::Allow => PermissionDecision::allow(),
2994        secure_exec_bridge::PermissionVerdict::Deny => PermissionDecision::deny(
2995            decision
2996                .reason
2997                .unwrap_or_else(|| String::from("denied by host")),
2998        ),
2999        secure_exec_bridge::PermissionVerdict::Prompt => PermissionDecision::deny(
3000            decision
3001                .reason
3002                .unwrap_or_else(|| String::from("permission prompt required")),
3003        ),
3004    }
3005}
3006
3007fn audit_timestamp() -> String {
3008    SystemTime::now()
3009        .duration_since(UNIX_EPOCH)
3010        .expect("system time before unix epoch")
3011        .as_millis()
3012        .to_string()
3013}
3014
3015pub(crate) fn audit_fields<I, K, V>(fields: I) -> BTreeMap<String, String>
3016where
3017    I: IntoIterator<Item = (K, V)>,
3018    K: Into<String>,
3019    V: Into<String>,
3020{
3021    let mut mapped = BTreeMap::from([(String::from("timestamp"), audit_timestamp())]);
3022    for (key, value) in fields {
3023        mapped.insert(key.into(), value.into());
3024    }
3025    mapped
3026}
3027
3028pub(crate) fn emit_structured_event<B>(
3029    bridge: &SharedBridge<B>,
3030    vm_id: &str,
3031    name: &str,
3032    fields: BTreeMap<String, String>,
3033) -> Result<(), SidecarError>
3034where
3035    B: NativeSidecarBridge + Send + 'static,
3036    BridgeError<B>: fmt::Debug + Send + Sync + 'static,
3037{
3038    bridge.with_mut(|bridge| {
3039        bridge.emit_structured_event(StructuredEventRecord {
3040            vm_id: vm_id.to_owned(),
3041            name: name.to_owned(),
3042            fields,
3043        })
3044    })
3045}
3046
3047pub(crate) fn emit_security_audit_event<B>(
3048    bridge: &SharedBridge<B>,
3049    vm_id: &str,
3050    name: &str,
3051    fields: BTreeMap<String, String>,
3052) where
3053    B: NativeSidecarBridge + Send + 'static,
3054    BridgeError<B>: fmt::Debug + Send + Sync + 'static,
3055{
3056    let _ = emit_structured_event(bridge, vm_id, name, fields);
3057}
3058
3059/// Build a wire `EventFrame` carrying a `StructuredEvent` (name + string-map
3060/// detail) scoped to a connection. Used to forward limit-registry warnings to the
3061/// host as `{type:"structured", name:"limit_warning", detail}` events without a
3062/// protocol schema change. Emitted directly to the host (not via the polled,
3063/// per-session bridge queue, which is a no-op in the stdio sidecar), so a
3064/// process-global signal is delivered against the active connection.
3065pub(crate) fn structured_event_frame(
3066    connection_id: &str,
3067    name: &str,
3068    detail: std::collections::HashMap<String, String>,
3069) -> Result<crate::wire::EventFrame, SidecarError> {
3070    let event = EventFrame::new(
3071        OwnershipScope::connection(connection_id),
3072        EventPayload::Structured(crate::protocol::StructuredEvent {
3073            name: name.to_owned(),
3074            detail,
3075        }),
3076    );
3077    crate::wire::event_frame_from_compat(event).map_err(|error| {
3078        SidecarError::InvalidState(format!("invalid structured event frame: {error}"))
3079    })
3080}
3081
3082pub(crate) fn log_stale_process_event<B>(
3083    bridge: &SharedBridge<B>,
3084    vm_id: &str,
3085    process_id: &str,
3086    context: &str,
3087) where
3088    B: NativeSidecarBridge + Send + 'static,
3089    BridgeError<B>: fmt::Debug + Send + Sync + 'static,
3090{
3091    let _ = bridge.emit_log(
3092        vm_id,
3093        format!(
3094            "Ignoring stale process event during {context}: VM {vm_id} process {process_id} was already reaped"
3095        ),
3096    );
3097}
3098
3099// filesystem_operation_label moved to crate::vm
3100
3101pub(crate) fn root_filesystem_error(error: impl std::fmt::Display) -> SidecarError {
3102    SidecarError::InvalidState(format!("root filesystem: {error}"))
3103}
3104
3105pub(crate) fn normalize_path(path: &str) -> String {
3106    let mut segments = Vec::new();
3107    for component in Path::new(path).components() {
3108        match component {
3109            Component::RootDir => segments.clear(),
3110            Component::ParentDir => {
3111                segments.pop();
3112            }
3113            Component::CurDir => {}
3114            Component::Normal(value) => segments.push(value.to_string_lossy().into_owned()),
3115            Component::Prefix(prefix) => {
3116                segments.push(prefix.as_os_str().to_string_lossy().into_owned());
3117            }
3118        }
3119    }
3120
3121    let normalized = format!("/{}", segments.join("/"));
3122    if normalized.is_empty() {
3123        String::from("/")
3124    } else {
3125        normalized
3126    }
3127}
3128
3129pub(crate) fn normalize_host_path(path: &Path) -> PathBuf {
3130    let mut normalized = PathBuf::new();
3131
3132    for component in path.components() {
3133        match component {
3134            Component::Prefix(prefix) => normalized.push(prefix.as_os_str()),
3135            Component::RootDir => normalized.push(Path::new("/")),
3136            Component::CurDir => {}
3137            Component::ParentDir => {
3138                if normalized != Path::new("/") {
3139                    normalized.pop();
3140                }
3141            }
3142            Component::Normal(part) => normalized.push(part),
3143        }
3144    }
3145
3146    if normalized.as_os_str().is_empty() {
3147        if path.is_absolute() {
3148            PathBuf::from("/")
3149        } else {
3150            PathBuf::from(".")
3151        }
3152    } else {
3153        normalized
3154    }
3155}
3156
3157pub(crate) fn path_is_within_root(path: &Path, root: &Path) -> bool {
3158    path == root || path.starts_with(root)
3159}
3160
3161pub(crate) fn dirname(path: &str) -> String {
3162    let normalized = normalize_path(path);
3163    let parent = Path::new(&normalized)
3164        .parent()
3165        .unwrap_or_else(|| Path::new("/"));
3166    let value = parent.to_string_lossy();
3167    if value.is_empty() {
3168        String::from("/")
3169    } else {
3170        value.into_owned()
3171    }
3172}
3173
3174pub(crate) fn kernel_error(error: KernelError) -> SidecarError {
3175    SidecarError::Kernel(error.to_string())
3176}
3177
3178pub(crate) fn plugin_error(error: PluginError) -> SidecarError {
3179    SidecarError::Plugin(error.to_string())
3180}
3181
3182pub(crate) fn javascript_error(error: JavascriptExecutionError) -> SidecarError {
3183    SidecarError::Execution(error.to_string())
3184}
3185
3186pub(crate) fn wasm_error(error: WasmExecutionError) -> SidecarError {
3187    SidecarError::Execution(error.to_string())
3188}
3189
3190pub(crate) fn python_error(error: PythonExecutionError) -> SidecarError {
3191    SidecarError::Execution(error.to_string())
3192}
3193
3194pub(crate) fn vfs_error(error: VfsError) -> SidecarError {
3195    SidecarError::Kernel(error.to_string())
3196}
3197
3198/// Actionable guidance shown when guest package resolution fails because the packages live in a
3199/// non-flat `node_modules` whose package store is not visible in the VM. Mounting host `node_modules`
3200/// is a bind mount, so symlinked/store layouts
3201/// do not resolve inside the VM: Node canonicalizes a module to its store
3202/// realpath (e.g. `node_modules/.pnpm/...`, `.bun/...`, `.store/...`) which lives
3203/// above the mounted directory and the guest `fs` cannot read. Plug'n'Play
3204/// (yarn-berry default) has no `node_modules` at all. A flat (hoisted) layout is
3205/// required. The empirically-supported package managers are captured in
3206/// `crates/sidecar/tests/module_layout_e2e.rs`.
3207#[allow(dead_code)]
3208const HOISTED_NODE_MODULES_GUIDANCE: &str = "secure-exec can't load mounted node_modules: the directory uses a non-flat layout (pnpm / bun / yarn workspaces store, or yarn Plug'n'Play) whose package store isn't visible inside the VM. A flat (hoisted) node_modules is required.\n  - pnpm        -> add `node-linker=hoisted` to .npmrc, then reinstall\n  - yarn berry  -> set `nodeLinker: node-modules` in .yarnrc.yml (not pnp/pnpm)\n  - bun         -> install dependencies outside a workspace (workspaces use a .bun store)\n  - npm / yarn classic -> already flat, no change needed";
3209
3210/// Detect, from an adapter's captured stderr, a non-flat-`node_modules` failure
3211/// signature. Returns the actionable guidance to fold into the surfaced error,
3212/// or `None` when the failure is unrelated.
3213///
3214/// Two signatures, both kept specific so they never fire on unrelated crashes:
3215/// - a missing-file / cannot-resolve error referencing a package STORE path that
3216///   lives above the mounted project (`.pnpm`, `.bun`, `.store`, PnP `__virtual__`),
3217/// - a yarn Plug'n'Play fingerprint (`.pnp.cjs`, the zip cache, or PnP's
3218///   "isn't declared in your dependencies" resolver error).
3219#[allow(dead_code)]
3220fn symlinked_node_modules_hint(stderr: &str) -> Option<&'static str> {
3221    // Package stores that only appear in a path when a non-flat layout is used.
3222    // pnpm (isolated), bun (workspace), yarn-berry (nodeLinker: pnpm), and PnP
3223    // virtual instances all keep real package files under these store dirs, which
3224    // sit above the mounted project node_modules and so are not guest-visible.
3225    const STORE_MARKERS: &[&str] = &[
3226        "node_modules/.pnpm/",
3227        "node_modules/.bun/",
3228        "node_modules/.store/",
3229        "/__virtual__/",
3230    ];
3231    // Yarn Plug'n'Play has no node_modules at all; resolution fails against the
3232    // .pnp runtime / zip cache. "isn't declared in your dependencies" is PnP's
3233    // distinctive resolver error and is specific enough to fire on its own.
3234    const PNP_STRICT_MARKERS: &[&str] = &["isn't declared in your dependencies"];
3235    const PNP_PATH_MARKERS: &[&str] = &[".pnp.cjs", ".pnp.loader.mjs", "/.yarn/cache/"];
3236
3237    if PNP_STRICT_MARKERS.iter().any(|m| stderr.contains(m)) {
3238        return Some(HOISTED_NODE_MODULES_GUIDANCE);
3239    }
3240
3241    let missing = stderr.contains("ENOENT")
3242        || stderr.contains("no such file or directory")
3243        || stderr.contains("Cannot find module")
3244        || stderr.contains("MODULE_NOT_FOUND");
3245    if !missing {
3246        return None;
3247    }
3248    if STORE_MARKERS.iter().any(|m| stderr.contains(m))
3249        || PNP_PATH_MARKERS.iter().any(|m| stderr.contains(m))
3250    {
3251        return Some(HOISTED_NODE_MODULES_GUIDANCE);
3252    }
3253    None
3254}
3255
3256#[cfg(test)]
3257mod symlinked_node_modules_hint_tests {
3258    use super::symlinked_node_modules_hint;
3259
3260    // Positive cases: each non-flat package manager's store/PnP signature.
3261    #[test]
3262    fn matches_pnpm_store_enoent() {
3263        // Real pi-coding-agent failure: getPackageDir() falls back to a
3264        // dist/package.json inside the unreachable .pnpm store.
3265        let stderr = "Error: ENOENT: no such file or directory, open '/root/node_modules/.pnpm/@mariozechner+pi-coding-agent@0.60.0_x/node_modules/@mariozechner/pi-coding-agent/dist/package.json'";
3266        let hint = symlinked_node_modules_hint(stderr).expect("expected hoisted guidance");
3267        assert!(hint.contains("secure-exec can't load mounted node_modules"));
3268        assert!(!hint.contains("agentos"));
3269    }
3270
3271    #[test]
3272    fn matches_bun_store_enoent() {
3273        let stderr = "Error: ENOENT: no such file or directory, open '/root/node_modules/.bun/is-odd@3.0.1/node_modules/is-odd/package.json'";
3274        assert!(symlinked_node_modules_hint(stderr).is_some());
3275    }
3276
3277    #[test]
3278    fn matches_yarn_pnpm_store_enoent() {
3279        let stderr = "Error: ENOENT: no such file or directory, open '/root/node_modules/.store/is-odd-npm-3.0.1-93c3c3f41b/package/package.json'";
3280        assert!(symlinked_node_modules_hint(stderr).is_some());
3281    }
3282
3283    #[test]
3284    fn matches_pnp_declared_error() {
3285        // Yarn PnP's distinctive resolver error (no node_modules at all).
3286        let stderr = "Error: Your application tried to access is-number, but it isn't declared in your dependencies; this makes the require call ambiguous and unsound.";
3287        assert!(symlinked_node_modules_hint(stderr).is_some());
3288    }
3289
3290    #[test]
3291    fn matches_pnp_cjs_module_not_found() {
3292        let stderr = "Error: Cannot find module 'is-odd'\n    at /root/.pnp.cjs:12345:18\n    code: 'MODULE_NOT_FOUND'";
3293        assert!(symlinked_node_modules_hint(stderr).is_some());
3294    }
3295
3296    #[test]
3297    fn matches_virtual_instance() {
3298        let stderr = "Error: ENOENT: no such file or directory, open '/root/.yarn/__virtual__/is-odd-abc/1/node_modules/is-odd/package.json'";
3299        assert!(symlinked_node_modules_hint(stderr).is_some());
3300    }
3301
3302    // Negative cases: must not fire.
3303    #[test]
3304    fn ignores_enoent_outside_a_store() {
3305        let stderr = "Error: ENOENT: no such file or directory, open '/tmp/scratch/config.json'";
3306        assert!(symlinked_node_modules_hint(stderr).is_none());
3307    }
3308
3309    #[test]
3310    fn ignores_store_path_without_missing_file() {
3311        let stderr =
3312            "loaded /root/node_modules/.pnpm/some-pkg@1.0.0/node_modules/some-pkg/index.js";
3313        assert!(symlinked_node_modules_hint(stderr).is_none());
3314    }
3315
3316    #[test]
3317    fn ignores_flat_node_modules_enoent() {
3318        // npm / yarn-nm / pnpm-hoisted: flat, no store dir in the path.
3319        let stderr = "Error: ENOENT: no such file or directory, open '/root/node_modules/is-odd/missing-asset.json'";
3320        assert!(symlinked_node_modules_hint(stderr).is_none());
3321    }
3322
3323    #[test]
3324    fn ignores_unrelated_failure() {
3325        let stderr = "Error: connect ECONNREFUSED 127.0.0.1:443";
3326        assert!(symlinked_node_modules_hint(stderr).is_none());
3327    }
3328}
3329
3330#[cfg(test)]
3331mod structured_event_frame_tests {
3332    use super::*;
3333
3334    #[test]
3335    fn structured_event_frame_round_trips_limit_warning() {
3336        let mut detail = std::collections::HashMap::new();
3337        // Pin a real emitted limit name rather than a fictional string.
3338        let limit_name = TrackedLimit::JavascriptEventChannel.as_str();
3339        detail.insert(String::from("limit"), String::from(limit_name));
3340        detail.insert(String::from("fillPercent"), String::from("82"));
3341
3342        let wire = structured_event_frame("conn-1", "limit_warning", detail)
3343            .expect("build structured event frame");
3344        let compat = crate::wire::event_frame_to_compat(wire).expect("convert to compat");
3345
3346        match compat.payload {
3347            EventPayload::Structured(event) => {
3348                assert_eq!(event.name, "limit_warning");
3349                assert_eq!(
3350                    event.detail.get("limit").map(String::as_str),
3351                    Some(limit_name)
3352                );
3353                assert_eq!(
3354                    event.detail.get("fillPercent").map(String::as_str),
3355                    Some("82")
3356                );
3357            }
3358            other => panic!("expected structured payload, got {other:?}"),
3359        }
3360        match compat.ownership {
3361            OwnershipScope::ConnectionOwnership(inner) => {
3362                assert_eq!(inner.connection_id, "conn-1");
3363            }
3364            other => panic!("expected connection ownership, got {other:?}"),
3365        }
3366    }
3367}