use crate::bridge::{build_mount_plugin_registry, MountPluginContext};
pub(crate) use crate::execution::{
build_javascript_socket_path_context, canonical_signal_name, dispatch_loopback_http_request,
error_code, format_tcp_resource, ignore_stale_javascript_sync_rpc_response,
javascript_sync_rpc_arg_i32, javascript_sync_rpc_arg_str, javascript_sync_rpc_arg_u32,
javascript_sync_rpc_arg_u32_optional, javascript_sync_rpc_arg_u64,
javascript_sync_rpc_arg_u64_optional, javascript_sync_rpc_bytes_arg,
javascript_sync_rpc_bytes_value, javascript_sync_rpc_encoding, javascript_sync_rpc_error_code,
javascript_sync_rpc_option_bool, javascript_sync_rpc_option_u32, parse_signal,
sanitize_javascript_child_process_internal_bootstrap_env, service_javascript_sync_rpc,
vm_network_resource_counts, JavascriptSyncRpcServiceRequest, LoopbackHttpDispatchRequest,
};
use crate::extension::{
Extension, ExtensionBufferedProcessOutput, ExtensionContext, ExtensionFuture, ExtensionHost,
ExtensionSnapshot,
};
use crate::filesystem::guest_filesystem_call as filesystem_guest_filesystem_call;
use crate::limits::DEFAULT_ACP_STDOUT_BUFFER_BYTE_LIMIT;
use crate::protocol::{
AuthenticatedResponse, CloseStdinRequest, DisposeReason, EventFrame, EventPayload,
ExecuteRequest, ExtEnvelope, FsPermissionScope, GuestFilesystemCallRequest,
GuestFilesystemResultResponse, JavascriptChildProcessSpawnOptions,
JavascriptChildProcessSpawnRequest, KillProcessRequest, OpenSessionRequest, OwnershipScope,
PatternPermissionRule, PatternPermissionScope, PermissionMode, PermissionsPolicy,
ProcessKilledResponse, ProcessStartedResponse, ProtocolSchema, RejectedResponse, RequestFrame,
RequestId, RequestPayload, ResponseFrame, ResponsePayload, SessionOpenedResponse,
SidecarRequestFrame, SidecarRequestPayload, SidecarResponseFrame, SidecarResponsePayload,
SidecarResponseTracker, SidecarResponseTrackerError, SignalDispositionAction,
SignalHandlerRegistration, StdinClosedResponse, StdinWrittenResponse, VmLifecycleEvent,
VmLifecycleState, WriteStdinRequest,
};
use crate::state::{
ActiveExecutionEvent, BridgeError, ConnectionState, JavascriptSocketFamily,
JavascriptSocketPathContext, ProcessEventEnvelope, SessionState, SharedBridge,
SharedSidecarRequestClient, SidecarRequestTransport, VmState, EXECUTION_DRIVER_NAME,
};
use crate::tools::register_host_callbacks;
use crate::NativeSidecarBridge;
use secure_exec_bridge::queue_tracker::{register_queue, QueueGauge, TrackedLimit};
use secure_exec_bridge::{
CommandPermissionRequest, EnvironmentAccess, EnvironmentPermissionRequest, FilesystemAccess,
FilesystemPermissionRequest, LifecycleEventRecord, LifecycleState, LogLevel, LogRecord,
NetworkAccess, NetworkPermissionRequest, StructuredEventRecord,
};
use secure_exec_execution::{
JavascriptExecutionEngine, JavascriptExecutionError, JavascriptSyncRpcRequest,
PythonExecutionEngine, PythonExecutionError, WasmExecutionEngine, WasmExecutionError,
};
use secure_exec_kernel::kernel::KernelError;
use secure_exec_kernel::mount_plugin::{FileSystemPluginRegistry, PluginError};
use secure_exec_kernel::permissions::{
permission_glob_matches, CommandAccessRequest, EnvAccessRequest, EnvironmentOperation,
NetworkAccessRequest, NetworkOperation, PermissionDecision,
};
// root_fs types moved to crate::vm
use secure_exec_kernel::vfs::VfsError;
use serde::Deserialize;
use serde_json::{json, Value};
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt;
use std::fs;
use std::os::unix::fs::PermissionsExt;
use std::path::{Component, Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::time;
// Constants and type aliases moved to crate::state
/// Environment keys whose presence on a `node` command marks the request as an
/// internal runtime launch (see `is_internal_runtime_command_request`).
const INTERNAL_JAVASCRIPT_ENTRYPOINT_ENV_KEYS: &[&str] =
&["AGENTOS_ENTRYPOINT", "AGENTOS_BOOTSTRAP_MODULE"];
/// Environment keys whose presence on a `wasm` command marks the request as an
/// internal runtime launch.
const INTERNAL_WASM_ENTRYPOINT_ENV_KEYS: &[&str] =
&["AGENTOS_WASM_MODULE_PATH", "AGENTOS_WASM_MODULE_BASE64"];
/// Environment key prefixes that mark a `python` command as an internal runtime launch.
const INTERNAL_PYTHON_ENTRYPOINT_ENV_PREFIXES: &[&str] = &["AGENTOS_PYTHON_"];
/// Capacity of the process event channel and the limit reported for the
/// pending-process-events gauge.
pub(crate) const MAX_PROCESS_EVENT_QUEUE: usize = 10_000;
/// Limit reported for the pending-sidecar-responses gauge and quoted in its overflow error.
pub(crate) const MAX_PENDING_SIDECAR_RESPONSES: usize = 10_000;
/// Limit reported for the outbound-sidecar-requests gauge and quoted in its overflow error.
pub(crate) const MAX_OUTBOUND_SIDECAR_REQUESTS: usize = 10_000;
/// Limit reported for the completed-sidecar-responses gauge.
pub(crate) const MAX_COMPLETED_SIDECAR_RESPONSES: usize = 10_000;
/// Builds the error reported when more than `MAX_PROCESS_EVENT_QUEUE` process
/// events are pending.
pub(crate) fn process_event_queue_overflow_error() -> SidecarError {
    let message =
        format!("process event queue exceeded {MAX_PROCESS_EVENT_QUEUE} pending events");
    SidecarError::InvalidState(message)
}
/// Builds the error reported when the sidecar response tracker holds more than
/// `MAX_PENDING_SIDECAR_RESPONSES` pending responses.
fn sidecar_response_pending_overflow_error() -> SidecarError {
    let message = format!(
        "sidecar response tracker exceeded {MAX_PENDING_SIDECAR_RESPONSES} pending responses"
    );
    SidecarError::InvalidState(message)
}
/// Builds the error reported when the outbound sidecar request queue holds more
/// than `MAX_OUTBOUND_SIDECAR_REQUESTS` pending requests.
fn outbound_sidecar_request_queue_overflow_error() -> SidecarError {
    let message = format!(
        "outbound sidecar request queue exceeded {MAX_OUTBOUND_SIDECAR_REQUESTS} pending requests"
    );
    SidecarError::InvalidState(message)
}
/// Wraps a wire codec failure in `SidecarError::InvalidState`, keeping the
/// codec's own message as the detail.
fn wire_protocol_error(error: crate::wire::ProtocolCodecError) -> SidecarError {
    let message = format!("invalid generated wire protocol frame: {error}");
    SidecarError::InvalidState(message)
}
// NativeSidecarConfig, DispatchResult, SidecarError moved to crate::state
pub use crate::state::{DispatchResult, NativeSidecarConfig, SidecarError};
// SharedBridge struct and Clone impl moved to crate::state
/// Options object accepted by the legacy positional form of
/// `child_process.spawn` (command, JSON args, JSON options).
///
/// Every field is optional on the wire (`#[serde(default)]`). The parsed value is
/// copied into a `JavascriptChildProcessSpawnOptions` by
/// `parse_javascript_child_process_spawn_request`, except `max_buffer`, which is
/// returned to the caller separately.
#[derive(Debug, Default, Deserialize)]
struct LegacyJavascriptChildProcessSpawnOptions {
// Working directory requested for the child, if any.
#[serde(default)]
cwd: Option<String>,
// Environment variables supplied with the spawn request.
#[serde(default)]
env: BTreeMap<String, String>,
// Optional stdin payload, kept as raw JSON.
#[serde(default)]
input: Option<Value>,
#[serde(default)]
shell: bool,
#[serde(default)]
detached: bool,
#[serde(default)]
stdio: Vec<String>,
// Serialized as `maxBuffer`; not part of the structured options type.
#[serde(default, rename = "maxBuffer")]
max_buffer: Option<usize>,
// NOTE(review): unit is not visible here — presumably milliseconds as in Node; confirm.
#[serde(default)]
timeout: Option<u64>,
// Serialized as `killSignal`.
#[serde(default, rename = "killSignal")]
kill_signal: Option<String>,
}
/// Deserialized description of an HTTP request aimed at a loopback server.
///
/// Field names are camelCase on the wire (`processId`, `serverId`, ...).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct JavascriptHttpLoopbackRequest {
process_id: String,
server_id: u64,
host: String,
port: u16,
// NOTE(review): encoding of the request body string is not visible in this chunk.
request: String,
}
/// Returns `true` when `host` is one of the recognised loopback spellings:
/// the literals `127.0.0.1` and `::1`, or `localhost` in any ASCII case.
fn is_javascript_loopback_host(host: &str) -> bool {
    if matches!(host, "127.0.0.1" | "::1") {
        return true;
    }
    host.eq_ignore_ascii_case("localhost")
}
pub(crate) fn parse_javascript_child_process_spawn_request(
vm: &VmState,
args: &[Value],
) -> Result<(JavascriptChildProcessSpawnRequest, Option<usize>), SidecarError> {
if let Some(value) = args.first().cloned() {
if let Ok(request) = serde_json::from_value::<JavascriptChildProcessSpawnRequest>(value) {
return Ok((request, None));
}
}
let command = javascript_sync_rpc_arg_str(args, 0, "child_process.spawn command")?.to_owned();
let raw_args = javascript_sync_rpc_arg_str(args, 1, "child_process.spawn args")?;
let raw_options = javascript_sync_rpc_arg_str(args, 2, "child_process.spawn options")?;
let parsed_args = serde_json::from_str::<Vec<String>>(raw_args).map_err(|error| {
SidecarError::InvalidState(format!("invalid child_process.spawn args payload: {error}"))
})?;
let parsed_options = serde_json::from_str::<LegacyJavascriptChildProcessSpawnOptions>(
raw_options,
)
.map_err(|error| {
SidecarError::InvalidState(format!(
"invalid child_process.spawn options payload: {error}"
))
})?;
Ok((
JavascriptChildProcessSpawnRequest {
command,
args: parsed_args,
options: JavascriptChildProcessSpawnOptions {
cwd: parsed_options.cwd,
env: parsed_options.env,
internal_bootstrap_env: sanitize_javascript_child_process_internal_bootstrap_env(
&vm.guest_env,
),
input: parsed_options.input,
shell: parsed_options.shell,
detached: parsed_options.detached,
stdio: parsed_options.stdio,
timeout: parsed_options.timeout,
kill_signal: parsed_options.kill_signal,
},
},
parsed_options.max_buffer,
))
}
impl<B> SharedBridge<B> {
    /// Wraps `bridge` in a shared, mutex-guarded handle with an empty per-VM
    /// permission table.
    fn new(bridge: B) -> Self {
        let inner = Arc::new(Mutex::new(bridge));
        let permissions = Arc::new(Mutex::new(BTreeMap::new()));
        Self {
            inner,
            permissions,
            // Test builds also carry a queue of scripted `set_vm_permissions` outcomes.
            #[cfg(test)]
            set_vm_permissions_outcomes: Arc::new(Mutex::new(VecDeque::new())),
        }
    }
}
impl<B> SharedBridge<B>
where
B: NativeSidecarBridge + Send + 'static,
BridgeError<B>: fmt::Debug + Send + Sync + 'static,
{
pub(crate) fn with_mut<T>(
&self,
operation: impl FnOnce(&mut B) -> Result<T, BridgeError<B>>,
) -> Result<T, SidecarError> {
let mut bridge = self.inner.lock().map_err(|_| {
SidecarError::Bridge(String::from("native sidecar bridge lock poisoned"))
})?;
operation(&mut bridge).map_err(|error| SidecarError::Bridge(format!("{error:?}")))
}
/// Runs an infallible `operation` with exclusive access to the bridge and
/// returns its result.
///
/// # Errors
/// Returns `SidecarError::Bridge` when the bridge lock is poisoned.
fn inspect<T>(&self, operation: impl FnOnce(&mut B) -> T) -> Result<T, SidecarError> {
    match self.inner.lock() {
        Ok(mut guard) => Ok(operation(&mut guard)),
        Err(_) => Err(SidecarError::Bridge(String::from(
            "native sidecar bridge lock poisoned",
        ))),
    }
}
/// Test-only hook: queues the outcome for a later `set_vm_permissions` call.
///
/// Only the error half of `result` is stored; an `Ok` is queued as "no error".
///
/// # Errors
/// Returns `SidecarError::Bridge` when the outcome queue lock is poisoned.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn queue_set_vm_permissions_result(
    &self,
    result: Result<(), SidecarError>,
) -> Result<(), SidecarError> {
    let queued_error = result.err();
    match self.set_vm_permissions_outcomes.lock() {
        Ok(mut outcomes) => {
            outcomes.push_back(queued_error);
            Ok(())
        }
        Err(_) => Err(SidecarError::Bridge(String::from(
            "native sidecar test set_vm_permissions outcome lock poisoned",
        ))),
    }
}
/// Emits a lifecycle event for `vm_id` through the bridge, with no detail text.
///
/// # Errors
/// Propagates any failure from `with_mut`.
pub(crate) fn emit_lifecycle(
    &self,
    vm_id: &str,
    state: LifecycleState,
) -> Result<(), SidecarError> {
    let record = LifecycleEventRecord {
        vm_id: vm_id.to_owned(),
        state,
        detail: None,
    };
    self.with_mut(move |bridge| bridge.emit_lifecycle(record))
}
/// Emits an info-level log record for `vm_id` through the bridge.
///
/// # Errors
/// Propagates any failure from `with_mut`.
pub(crate) fn emit_log(
    &self,
    vm_id: &str,
    message: impl Into<String>,
) -> Result<(), SidecarError> {
    let record = LogRecord {
        vm_id: vm_id.to_owned(),
        level: LogLevel::Info,
        message: message.into(),
    };
    self.with_mut(move |bridge| bridge.emit_log(record))
}
/// Decides whether `vm_id` may perform `access` on `path`.
///
/// A stored static policy for the VM takes precedence. Only when none applies
/// is the bridge consulted; a bridge failure is reported as a denial carrying
/// the error text.
pub(crate) fn filesystem_decision(
    &self,
    vm_id: &str,
    path: &str,
    access: FilesystemAccess,
) -> PermissionDecision {
    let capability = filesystem_permission_capability(access);
    if let Some(decision) = self.static_permission_decision(vm_id, capability, "fs", Some(path)) {
        return decision;
    }

    let bridge_request = FilesystemPermissionRequest {
        vm_id: vm_id.to_owned(),
        path: path.to_owned(),
        access,
    };
    let outcome = self.with_mut(|bridge| bridge.check_filesystem_access(bridge_request));
    match outcome {
        Ok(decision) => map_bridge_permission(decision),
        Err(error) => PermissionDecision::deny(error.to_string()),
    }
}
/// Decides whether `vm_id` may execute the command described by `request`.
///
/// Internal runtime launches are always allowed. Otherwise a stored static
/// policy (capability `child_process.spawn`) takes precedence, and the bridge
/// is consulted last; a bridge failure is reported as a denial.
pub(crate) fn command_decision(
    &self,
    vm_id: &str,
    request: &CommandAccessRequest,
) -> PermissionDecision {
    if is_internal_runtime_command_request(request) {
        return PermissionDecision::allow();
    }

    let static_decision = self.static_permission_decision(
        vm_id,
        "child_process.spawn",
        "child_process",
        Some(&request.command),
    );
    if let Some(decision) = static_decision {
        return decision;
    }

    let bridge_request = CommandPermissionRequest {
        vm_id: vm_id.to_owned(),
        command: request.command.clone(),
        args: request.args.clone(),
        cwd: request.cwd.clone(),
        env: request.env.clone(),
    };
    let outcome = self.with_mut(|bridge| bridge.check_command_execution(bridge_request));
    match outcome {
        Ok(decision) => map_bridge_permission(decision),
        Err(error) => PermissionDecision::deny(error.to_string()),
    }
}
/// Decides whether `vm_id` may read or write the environment key in `request`.
///
/// A stored static policy takes precedence; otherwise the bridge is consulted
/// and a bridge failure is reported as a denial.
pub(crate) fn environment_decision(
    &self,
    vm_id: &str,
    request: &EnvAccessRequest,
) -> PermissionDecision {
    let capability = environment_permission_capability(request.op);
    let static_decision =
        self.static_permission_decision(vm_id, capability, "env", Some(&request.key));
    if let Some(decision) = static_decision {
        return decision;
    }

    let access = match request.op {
        EnvironmentOperation::Read => EnvironmentAccess::Read,
        EnvironmentOperation::Write => EnvironmentAccess::Write,
    };
    let bridge_request = EnvironmentPermissionRequest {
        vm_id: vm_id.to_owned(),
        access,
        key: request.key.clone(),
        value: request.value.clone(),
    };
    let outcome = self.with_mut(|bridge| bridge.check_environment_access(bridge_request));
    match outcome {
        Ok(decision) => map_bridge_permission(decision),
        Err(error) => PermissionDecision::deny(error.to_string()),
    }
}
/// Decides whether `vm_id` may perform the network operation in `request`.
///
/// A stored static policy takes precedence; otherwise the bridge is consulted
/// and a bridge failure is reported as a denial.
pub(crate) fn network_decision(
    &self,
    vm_id: &str,
    request: &NetworkAccessRequest,
) -> PermissionDecision {
    let capability = network_permission_capability(request.op);
    let static_decision =
        self.static_permission_decision(vm_id, capability, "network", Some(&request.resource));
    if let Some(decision) = static_decision {
        return decision;
    }

    let access = match request.op {
        NetworkOperation::Fetch => NetworkAccess::Fetch,
        NetworkOperation::Http => NetworkAccess::Http,
        NetworkOperation::Dns => NetworkAccess::Dns,
        NetworkOperation::Listen => NetworkAccess::Listen,
    };
    let bridge_request = NetworkPermissionRequest {
        vm_id: vm_id.to_owned(),
        access,
        resource: request.resource.clone(),
    };
    let outcome = self.with_mut(|bridge| bridge.check_network_access(bridge_request));
    match outcome {
        Ok(decision) => map_bridge_permission(decision),
        Err(error) => PermissionDecision::deny(error.to_string()),
    }
}
/// Ensures `vm_id` may perform `op` on `resource`.
///
/// # Errors
/// Returns `SidecarError::Execution` with an `EACCES` message (including the
/// denial reason when one is present) if the network decision is not an allow.
pub(crate) fn require_network_access(
    &self,
    vm_id: &str,
    op: NetworkOperation,
    resource: impl Into<String>,
) -> Result<(), SidecarError> {
    let resource = resource.into();
    let request = NetworkAccessRequest {
        vm_id: vm_id.to_owned(),
        op,
        resource: resource.clone(),
    };
    let decision = self.network_decision(vm_id, &request);
    if decision.allow {
        return Ok(());
    }

    let message = decision.reason.as_deref().map_or_else(
        || format!("EACCES: permission denied, {resource}"),
        |reason| format!("EACCES: permission denied, {resource}: {reason}"),
    );
    Err(SidecarError::Execution(message))
}
/// Stores `permissions` as the static policy for `vm_id`, replacing any
/// previous entry.
///
/// In test builds, a queued outcome (see `queue_set_vm_permissions_result`) is
/// consumed first; a queued error is returned without touching the stored policy.
///
/// # Errors
/// Returns `SidecarError::Bridge` when a lock is poisoned.
pub(crate) fn set_vm_permissions(
    &self,
    vm_id: &str,
    permissions: &PermissionsPolicy,
) -> Result<(), SidecarError> {
    #[cfg(test)]
    {
        let Ok(mut queued) = self.set_vm_permissions_outcomes.lock() else {
            return Err(SidecarError::Bridge(String::from(
                "native sidecar test set_vm_permissions outcome lock poisoned",
            )));
        };
        if let Some(Some(error)) = queued.pop_front() {
            return Err(error);
        }
    }
    match self.permissions.lock() {
        Ok(mut policies) => {
            policies.insert(vm_id.to_owned(), permissions.clone());
            Ok(())
        }
        Err(_) => Err(SidecarError::Bridge(String::from(
            "native sidecar permission policy lock poisoned",
        ))),
    }
}
/// Restores `original_permissions` for `vm_id` after `operation_error`.
///
/// If the restore fails, a deny-all policy is applied instead and an
/// `InvalidState` error describing both failures is returned.
///
/// # Panics
/// Panics when even the deny-all fallback cannot be stored, so the VM is never
/// left running under an unknown policy.
pub(crate) fn restore_vm_permissions_fail_closed(
    &self,
    vm_id: &str,
    original_permissions: &PermissionsPolicy,
    context: &str,
    operation_error: &SidecarError,
) -> Result<(), SidecarError> {
    let Err(restore_error) = self.set_vm_permissions(vm_id, original_permissions) else {
        return Ok(());
    };

    let deny_all = PermissionsPolicy::deny_all();
    if let Err(deny_all_error) = self.set_vm_permissions(vm_id, &deny_all) {
        panic!(
            "{context} failed: {operation_error}; restoring original permissions failed: {restore_error}; deny-all fallback failed: {deny_all_error}"
        );
    }
    Err(SidecarError::InvalidState(format!(
        "{context} failed: {operation_error}; restoring original permissions failed: {restore_error}; applied deny-all fallback"
    )))
}
/// Removes the stored static policy for `vm_id`, if any.
///
/// # Errors
/// Returns `SidecarError::Bridge` when the policy lock is poisoned.
pub(crate) fn clear_vm_permissions(&self, vm_id: &str) -> Result<(), SidecarError> {
    match self.permissions.lock() {
        Ok(mut policies) => {
            policies.remove(vm_id);
            Ok(())
        }
        Err(_) => Err(SidecarError::Bridge(String::from(
            "native sidecar permission policy lock poisoned",
        ))),
    }
}
pub(crate) fn static_permission_decision(
&self,
vm_id: &str,
capability: &str,
domain: &str,
resource: Option<&str>,
) -> Option<PermissionDecision> {
let stored = self.permissions.lock().ok()?;
let permissions = stored.get(vm_id)?;
let mode = evaluate_permissions_policy(permissions, domain, capability, resource);
Some(permission_mode_to_kernel_decision(mode, capability))
}
}
/// Evaluates `permissions` for a capability within a domain.
///
/// The operation name is the part of `capability` after the `"<domain>."`
/// prefix. The `fs` domain uses the filesystem scope evaluator; `network`,
/// `child_process`, `process`, `env` and `binding` use the pattern scope
/// evaluator. Any other domain is denied.
pub(crate) fn evaluate_permissions_policy(
    permissions: &PermissionsPolicy,
    domain: &str,
    capability: &str,
    resource: Option<&str>,
) -> PermissionMode {
    let operation = capability_operation(capability, domain);
    let pattern_scope = match domain {
        "fs" => {
            return evaluate_fs_permission_scope(permissions.fs.as_ref(), operation, resource);
        }
        "network" => permissions.network.as_ref(),
        "child_process" => permissions.child_process.as_ref(),
        "process" => permissions.process.as_ref(),
        "env" => permissions.env.as_ref(),
        "binding" => permissions.binding.as_ref(),
        _ => return PermissionMode::Deny,
    };
    evaluate_pattern_permission_scope(pattern_scope, operation, resource)
}
/// Resolves the permission mode for a filesystem operation.
///
/// A missing scope denies. A bare mode applies unconditionally. For a rule set,
/// the result starts at the rule set's default (deny when unset) and every
/// matching rule overrides it in order, so the last matching rule wins.
fn evaluate_fs_permission_scope(
    scope: Option<&FsPermissionScope>,
    operation: &str,
    resource: Option<&str>,
) -> PermissionMode {
    let Some(scope) = scope else {
        return PermissionMode::Deny;
    };
    match scope {
        FsPermissionScope::PermissionMode(mode) => mode.clone(),
        FsPermissionScope::FsPermissionRuleSet(rules) => {
            let fallback = rules.default.clone().unwrap_or(PermissionMode::Deny);
            rules.rules.iter().fold(fallback, |current, rule| {
                if fs_rule_matches(rule, operation, resource) {
                    rule.mode.clone()
                } else {
                    current
                }
            })
        }
    }
}
/// Resolves the permission mode for a pattern-based domain.
///
/// A missing scope denies. A bare mode applies unconditionally. For a rule set,
/// the result starts at the rule set's default (deny when unset) and every
/// matching rule overrides it in order, so the last matching rule wins.
fn evaluate_pattern_permission_scope(
    scope: Option<&PatternPermissionScope>,
    operation: &str,
    resource: Option<&str>,
) -> PermissionMode {
    let Some(scope) = scope else {
        return PermissionMode::Deny;
    };
    match scope {
        PatternPermissionScope::PermissionMode(mode) => mode.clone(),
        PatternPermissionScope::PatternPermissionRuleSet(rules) => {
            let fallback = rules.default.clone().unwrap_or(PermissionMode::Deny);
            rules.rules.iter().fold(fallback, |current, rule| {
                if pattern_rule_matches(rule, operation, resource) {
                    rule.mode.clone()
                } else {
                    current
                }
            })
        }
    }
}
/// Returns `true` when `rule` covers both the operation and the resource path.
fn fs_rule_matches(
    rule: &crate::protocol::FsPermissionRule,
    operation: &str,
    resource: Option<&str>,
) -> bool {
    // Both checks are always evaluated, mirroring the two-step comparison.
    let checks = (
        permission_operation_matches(&rule.operations, operation),
        permission_resource_matches(&rule.paths, resource),
    );
    matches!(checks, (true, true))
}
/// Returns `true` when `rule` covers both the operation and the resource pattern.
fn pattern_rule_matches(
    rule: &PatternPermissionRule,
    operation: &str,
    resource: Option<&str>,
) -> bool {
    // Both checks are always evaluated, mirroring the two-step comparison.
    let checks = (
        permission_operation_matches(&rule.operations, operation),
        permission_resource_matches(&rule.patterns, resource),
    );
    matches!(checks, (true, true))
}
/// Returns `true` when `candidates` contains the wildcard `"*"` or the exact
/// `operation` name. An empty list never matches.
fn permission_operation_matches(candidates: &[String], operation: &str) -> bool {
    for candidate in candidates {
        if candidate == "*" || candidate == operation {
            return true;
        }
    }
    false
}
/// Returns `true` when a resource is present and at least one glob in
/// `patterns` matches it. A missing resource never matches.
fn permission_resource_matches(patterns: &[String], resource: Option<&str>) -> bool {
    match resource {
        Some(value) => patterns
            .iter()
            .any(|pattern| permission_glob_matches(pattern, value)),
        None => false,
    }
}
/// Validates every scope present in `permissions`.
///
/// Scopes are checked in the order `fs`, `network`, `child_process`, `process`,
/// `env`, `binding`; the first invalid rule aborts validation.
///
/// # Errors
/// Returns the error produced by the first failing scope validator.
pub(crate) fn validate_permissions_policy(
    permissions: &PermissionsPolicy,
) -> Result<(), SidecarError> {
    if let Some(scope) = permissions.fs.as_ref() {
        validate_fs_permission_scope("fs", scope)?;
    }

    let pattern_scopes = [
        ("network", permissions.network.as_ref()),
        ("child_process", permissions.child_process.as_ref()),
        ("process", permissions.process.as_ref()),
        ("env", permissions.env.as_ref()),
        ("binding", permissions.binding.as_ref()),
    ];
    for (domain, scope) in pattern_scopes {
        if let Some(scope) = scope {
            validate_pattern_permission_scope(domain, scope)?;
        }
    }
    Ok(())
}
/// Validates a filesystem scope: every rule in a rule set must list at least
/// one operation and one path. Scopes that are not rule sets are accepted.
///
/// # Errors
/// Returns the first error reported by `validate_permission_rule_field`.
fn validate_fs_permission_scope(
    domain: &str,
    scope: &FsPermissionScope,
) -> Result<(), SidecarError> {
    match scope {
        FsPermissionScope::FsPermissionRuleSet(rule_set) => rule_set
            .rules
            .iter()
            .enumerate()
            .try_for_each(|(index, rule)| {
                let operations_field = format!("{domain}.rules[{index}].operations");
                validate_permission_rule_field(&rule.operations, &operations_field)?;
                let paths_field = format!("{domain}.rules[{index}].paths");
                validate_permission_rule_field(&rule.paths, &paths_field)
            }),
        _ => Ok(()),
    }
}
/// Validates a pattern scope: every rule in a rule set must list at least one
/// operation and one pattern. Scopes that are not rule sets are accepted.
///
/// # Errors
/// Returns the first error reported by `validate_permission_rule_field`.
fn validate_pattern_permission_scope(
    domain: &str,
    scope: &PatternPermissionScope,
) -> Result<(), SidecarError> {
    match scope {
        PatternPermissionScope::PatternPermissionRuleSet(rule_set) => rule_set
            .rules
            .iter()
            .enumerate()
            .try_for_each(|(index, rule)| {
                let operations_field = format!("{domain}.rules[{index}].operations");
                validate_permission_rule_field(&rule.operations, &operations_field)?;
                let patterns_field = format!("{domain}.rules[{index}].patterns");
                validate_permission_rule_field(&rule.patterns, &patterns_field)
            }),
        _ => Ok(()),
    }
}
/// Rejects an empty rule field; `field` names the offending policy path.
///
/// # Errors
/// Returns `SidecarError::InvalidState` when `values` is empty.
fn validate_permission_rule_field(values: &[String], field: &str) -> Result<(), SidecarError> {
    if !values.is_empty() {
        return Ok(());
    }
    Err(SidecarError::InvalidState(format!(
        "invalid permissions policy: {field} must not be empty; use [\"*\"] for wildcard"
    )))
}
/// Extracts the operation part of a `"<domain>.<operation>"` capability.
///
/// Returns an empty string when `capability` does not start with `domain`
/// followed by a dot.
fn capability_operation<'a>(capability: &'a str, domain: &str) -> &'a str {
    match capability.strip_prefix(domain) {
        Some(remainder) => remainder.strip_prefix('.').unwrap_or(""),
        None => "",
    }
}
/// Converts a policy mode into a kernel decision.
///
/// `Allow` allows. `Ask` and `Deny` both deny, each with its own reason text
/// naming `capability`.
fn permission_mode_to_kernel_decision(
    mode: PermissionMode,
    capability: &str,
) -> PermissionDecision {
    let denial_reason = match mode {
        PermissionMode::Allow => return PermissionDecision::allow(),
        PermissionMode::Ask => format!("permission prompt required for {capability}"),
        PermissionMode::Deny => format!("blocked by {capability} policy"),
    };
    PermissionDecision::deny(denial_reason)
}
/// Maps a filesystem access kind to its `fs.<operation>` capability name.
///
/// The match is exhaustive, so adding a `FilesystemAccess` variant forces this
/// table to be updated.
pub(crate) fn filesystem_permission_capability(access: FilesystemAccess) -> &'static str {
match access {
FilesystemAccess::Read => "fs.read",
FilesystemAccess::Write => "fs.write",
FilesystemAccess::Stat => "fs.stat",
FilesystemAccess::ReadDir => "fs.readdir",
FilesystemAccess::CreateDir => "fs.create_dir",
// Note: the capability is spelled `rm`, not `remove`.
FilesystemAccess::Remove => "fs.rm",
FilesystemAccess::Rename => "fs.rename",
FilesystemAccess::Symlink => "fs.symlink",
FilesystemAccess::ReadLink => "fs.readlink",
FilesystemAccess::Chmod => "fs.chmod",
FilesystemAccess::Truncate => "fs.truncate",
}
}
/// Maps a network operation to its `network.<operation>` capability name.
fn network_permission_capability(operation: NetworkOperation) -> &'static str {
match operation {
NetworkOperation::Fetch => "network.fetch",
NetworkOperation::Http => "network.http",
NetworkOperation::Dns => "network.dns",
NetworkOperation::Listen => "network.listen",
}
}
/// Maps an environment operation to its `env.<operation>` capability name.
fn environment_permission_capability(operation: EnvironmentOperation) -> &'static str {
match operation {
EnvironmentOperation::Read => "env.read",
EnvironmentOperation::Write => "env.write",
}
}
/// Returns `true` when `request` looks like an internal runtime launch.
///
/// The command must be `node`, `wasm` or `python`, and its environment must
/// carry the matching internal entrypoint key (or, for `python`, a key with an
/// internal prefix).
///
/// NOTE(review): `command_decision` allows such requests without consulting any
/// policy. This assumes guests cannot set these environment keys themselves —
/// confirm against the callers that build the request.
fn is_internal_runtime_command_request(request: &CommandAccessRequest) -> bool {
    let env_has_any_key = |allowed: &[&str]| {
        request
            .env
            .keys()
            .any(|key| allowed.contains(&key.as_str()))
    };
    match request.command.as_str() {
        "node" => env_has_any_key(INTERNAL_JAVASCRIPT_ENTRYPOINT_ENV_KEYS),
        "wasm" => env_has_any_key(INTERNAL_WASM_ENTRYPOINT_ENV_KEYS),
        "python" => request.env.keys().any(|key| {
            INTERNAL_PYTHON_ENTRYPOINT_ENV_PREFIXES
                .iter()
                .any(|prefix| key.starts_with(prefix))
        }),
        _ => false,
    }
}
/// Returns `true` when `event` falls inside `ownership`.
///
/// Connection ownership compares the connection id, session ownership also
/// compares the session id, and VM ownership additionally compares the VM id.
fn ownership_matches_process_event(
    ownership: &OwnershipScope,
    event: &ProcessEventEnvelope,
) -> bool {
    match ownership {
        OwnershipScope::ConnectionOwnership(scope) => scope.connection_id == event.connection_id,
        OwnershipScope::SessionOwnership(scope) => {
            if scope.connection_id != event.connection_id {
                return false;
            }
            scope.session_id == event.session_id
        }
        OwnershipScope::VmOwnership(scope) => {
            if scope.connection_id != event.connection_id {
                return false;
            }
            if scope.session_id != event.session_id {
                return false;
            }
            scope.vm_id == event.vm_id
        }
    }
}
/// Returns `true` when `event` belongs to `ownership` and its process id
/// contains no `/`.
///
/// `sidecar` is intentionally unused; see the comment in the body.
fn public_process_event_matches_ownership<B>(
    sidecar: &NativeSidecar<B>,
    ownership: &OwnershipScope,
    event: &ProcessEventEnvelope,
) -> bool
where
    B: NativeSidecarBridge + Send + 'static,
    BridgeError<B>: fmt::Debug + Send + Sync + 'static,
{
    // Stale queued events must still be drained through handle_process_event_envelope()
    // so the sidecar can emit the expected fail-closed log when teardown wins the race.
    let _ = sidecar;
    ownership_matches_process_event(ownership, event) && !event.process_id.contains('/')
}
/// Polls `future` exactly once with a no-op waker.
///
/// Returns `Some(output)` if the future completed on that poll, `None` if it is
/// still pending. Because the waker does nothing, a pending future is not
/// scheduled for a later wake-up by this call.
fn poll_future_once<F: std::future::Future>(future: std::pin::Pin<&mut F>) -> Option<F::Output> {
    let waker = Waker::noop();
    let mut cx = Context::from_waker(waker);
    if let Poll::Ready(output) = future.poll(&mut cx) {
        Some(output)
    } else {
        None
    }
}
// ConnectionState, SessionState, VmConfiguration, VmState moved to crate::state
// JavascriptSocketPathContext, JavascriptSocketFamily, VmListenPolicy moved to crate::state
impl JavascriptSocketPathContext {
    /// Returns `true` when `port` is in the loopback-exempt set or is the guest
    /// port of any TCP loopback mapping, whatever its socket family.
    pub(crate) fn loopback_port_allowed(&self, port: u16) -> bool {
        if self.loopback_exempt_ports.contains(&port) {
            return true;
        }
        self.tcp_loopback_guest_to_host_ports
            .keys()
            .any(|key| key.1 == port)
    }

    /// Looks up the host port mapped to a guest TCP loopback `port` for `family`.
    pub(crate) fn translate_tcp_loopback_port(
        &self,
        family: JavascriptSocketFamily,
        port: u16,
    ) -> Option<u16> {
        let key = (family, port);
        self.tcp_loopback_guest_to_host_ports.get(&key).copied()
    }

    /// Looks up the HTTP loopback target registered for `family` and `port`.
    pub(crate) fn http_loopback_target(
        &self,
        family: JavascriptSocketFamily,
        port: u16,
    ) -> Option<&crate::state::JavascriptHttpLoopbackTarget> {
        let key = (family, port);
        self.http_loopback_targets.get(&key)
    }

    /// Looks up the host port mapped to a guest UDP loopback `port` for `family`.
    pub(crate) fn translate_udp_loopback_port(
        &self,
        family: JavascriptSocketFamily,
        port: u16,
    ) -> Option<u16> {
        let key = (family, port);
        self.udp_loopback_guest_to_host_ports.get(&key).copied()
    }

    /// Reverse UDP lookup: the guest port mapped to a host `port` for `family`.
    pub(crate) fn guest_udp_port_for_host_port(
        &self,
        family: JavascriptSocketFamily,
        port: u16,
    ) -> Option<u16> {
        let key = (family, port);
        self.udp_loopback_host_to_guest_ports.get(&key).copied()
    }
}
// ActiveProcess, NetworkResourceCounts moved to crate::state
/// Native sidecar state: configuration, the shared bridge, execution engines,
/// connection/session/VM tables, and the queues used to exchange process events
/// and sidecar requests/responses.
pub struct NativeSidecar<B> {
pub(crate) config: NativeSidecarConfig,
pub(crate) bridge: SharedBridge<B>,
pub(crate) mount_plugins: FileSystemPluginRegistry<MountPluginContext<B>>,
// Directory created by `with_config`; either the configured compile cache root
// or a generated directory under the system temp dir.
pub(crate) cache_root: PathBuf,
pub(crate) javascript_engine: JavascriptExecutionEngine,
pub(crate) python_engine: PythonExecutionEngine,
pub(crate) wasm_engine: WasmExecutionEngine,
// Counters start at 0 in `with_config`.
pub(crate) next_connection_id: usize,
pub(crate) next_session_id: usize,
pub(crate) next_vm_id: usize,
// Starts at -1 in `with_config`.
// NOTE(review): presumably sidecar-originated ids count downward to stay
// disjoint from client request ids — confirm where the id is allocated.
pub(crate) next_sidecar_request_id: RequestId,
pub(crate) connections: BTreeMap<String, ConnectionState>,
pub(crate) sessions: BTreeMap<String, SessionState>,
pub(crate) vms: BTreeMap<String, VmState>,
// Sending half of the process event channel (capacity `MAX_PROCESS_EVENT_QUEUE`).
#[allow(dead_code)]
pub(crate) process_event_sender: Sender<ProcessEventEnvelope>,
// Receiving half; wrapped in `Option`, initialised to `Some` in `with_config`.
pub(crate) process_event_receiver: Option<Receiver<ProcessEventEnvelope>>,
pub(crate) pending_process_events: VecDeque<ProcessEventEnvelope>,
pub(crate) pending_sidecar_responses: SidecarResponseTracker,
pub(crate) outbound_sidecar_requests: VecDeque<SidecarRequestFrame>,
pub(crate) completed_sidecar_responses: BTreeMap<RequestId, SidecarResponseFrame>,
pub(crate) completed_sidecar_response_order: VecDeque<RequestId>,
// Gauges registered with the queue tracker, one per bounded queue above.
pub(crate) completed_sidecar_responses_gauge: Arc<QueueGauge>,
pub(crate) pending_process_events_gauge: Arc<QueueGauge>,
pub(crate) pending_sidecar_responses_gauge: Arc<QueueGauge>,
pub(crate) outbound_sidecar_requests_gauge: Arc<QueueGauge>,
pub(crate) sidecar_requests: SharedSidecarRequestClient,
// Registered extensions keyed by namespace.
pub(crate) extensions: BTreeMap<String, Arc<dyn Extension>>,
// Resources bound to extension sessions, keyed by (namespace, extension session id).
pub(crate) extension_sessions: BTreeMap<(String, String), ExtensionSessionResources>,
// Buffered process output keyed by (vm id, process id).
pub(crate) extension_process_output_buffers:
BTreeMap<(String, String), ExtensionBufferedProcessOutput>,
}
/// Processes and VMs bound to one extension session, together with the
/// ownership scope every binding for that session must match.
#[derive(Debug)]
pub(crate) struct ExtensionSessionResources {
// Ownership recorded when the session entry was created.
pub(crate) ownership: OwnershipScope,
// Ids of processes bound to the session.
pub(crate) process_ids: BTreeSet<String>,
// Ids of VMs bound to the session.
pub(crate) vm_ids: BTreeSet<String>,
}
/// Summary `Debug` output: configuration, cache root, id counters and the sizes
/// of the main tables. The bridge, engines and queues are left out.
impl<B> fmt::Debug for NativeSidecar<B> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut summary = f.debug_struct("NativeSidecar");
        summary.field("config", &self.config);
        summary.field("cache_root", &self.cache_root);
        summary.field("next_connection_id", &self.next_connection_id);
        summary.field("next_session_id", &self.next_session_id);
        summary.field("next_vm_id", &self.next_vm_id);
        summary.field("connection_count", &self.connections.len());
        summary.field("session_count", &self.sessions.len());
        summary.field("vm_count", &self.vms.len());
        summary.field("extension_session_count", &self.extension_sessions.len());
        summary.field(
            "extension_process_output_buffer_count",
            &self.extension_process_output_buffers.len(),
        );
        summary.finish()
    }
}
impl<B> NativeSidecar<B>
where
B: NativeSidecarBridge + Send + 'static,
BridgeError<B>: fmt::Debug + Send + Sync + 'static,
{
/// Creates a sidecar around `bridge` using `NativeSidecarConfig::default()`.
///
/// # Errors
/// Returns whatever `with_config` returns for the default configuration.
pub fn new(bridge: B) -> Result<Self, SidecarError> {
Self::with_config(bridge, NativeSidecarConfig::default())
}
/// Creates a sidecar around `bridge` with an explicit configuration.
///
/// The cache root is `config.compile_cache_root` when set. Otherwise it is a
/// directory under the system temp dir named `<sidecar_id>-<nanos since epoch>`.
/// The directory is created before the sidecar is returned.
///
/// # Errors
/// - `SidecarError::InvalidState` when `expected_auth_token` is an empty string.
/// - `SidecarError::Io` when the system clock is before the Unix epoch (only
///   checked when a cache root has to be generated) or when the cache root
///   cannot be created.
/// - Any error from building the mount plugin registry.
pub fn with_config(bridge: B, config: NativeSidecarConfig) -> Result<Self, SidecarError> {
    if matches!(config.expected_auth_token.as_deref(), Some("")) {
        return Err(SidecarError::InvalidState(String::from(
            "native sidecar expected_auth_token must not be empty",
        )));
    }
    let cache_root = match config.compile_cache_root.clone() {
        Some(root) => root,
        None => {
            // A misconfigured clock is an environmental failure, not a bug:
            // report it through the constructor's Result instead of panicking.
            let since_epoch = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map_err(|error| {
                    SidecarError::Io(format!(
                        "failed to prepare sidecar cache root: system time before unix epoch: {error}"
                    ))
                })?;
            std::env::temp_dir().join(format!(
                "{}-{}",
                config.sidecar_id,
                since_epoch.as_nanos()
            ))
        }
    };
    fs::create_dir_all(&cache_root).map_err(|error| {
        SidecarError::Io(format!("failed to prepare sidecar cache root: {error}"))
    })?;
    let bridge = SharedBridge::new(bridge);
    let mount_plugins = build_mount_plugin_registry::<B>()?;
    let (process_event_sender, process_event_receiver) = channel(MAX_PROCESS_EVENT_QUEUE);
    Ok(Self {
        config,
        bridge,
        mount_plugins,
        cache_root,
        javascript_engine: JavascriptExecutionEngine::default(),
        python_engine: PythonExecutionEngine::default(),
        wasm_engine: WasmExecutionEngine::default(),
        next_connection_id: 0,
        next_session_id: 0,
        next_vm_id: 0,
        next_sidecar_request_id: -1,
        connections: BTreeMap::new(),
        sessions: BTreeMap::new(),
        vms: BTreeMap::new(),
        process_event_sender,
        process_event_receiver: Some(process_event_receiver),
        pending_process_events: VecDeque::new(),
        pending_sidecar_responses: SidecarResponseTracker::default(),
        outbound_sidecar_requests: VecDeque::new(),
        completed_sidecar_responses: BTreeMap::new(),
        completed_sidecar_response_order: VecDeque::new(),
        completed_sidecar_responses_gauge: register_queue(
            TrackedLimit::CompletedSidecarResponses,
            MAX_COMPLETED_SIDECAR_RESPONSES,
        ),
        pending_process_events_gauge: register_queue(
            TrackedLimit::PendingProcessEvents,
            MAX_PROCESS_EVENT_QUEUE,
        ),
        pending_sidecar_responses_gauge: register_queue(
            TrackedLimit::PendingSidecarResponses,
            MAX_PENDING_SIDECAR_RESPONSES,
        ),
        outbound_sidecar_requests_gauge: register_queue(
            TrackedLimit::OutboundSidecarRequests,
            MAX_OUTBOUND_SIDECAR_REQUESTS,
        ),
        sidecar_requests: SharedSidecarRequestClient::default(),
        extensions: BTreeMap::new(),
        extension_sessions: BTreeMap::new(),
        extension_process_output_buffers: BTreeMap::new(),
    })
}
/// Creates a sidecar with `config` and registers each extension in order.
///
/// # Errors
/// Returns the construction error, or the first registration failure; later
/// extensions are not registered once one fails.
pub fn with_config_and_extensions(
    bridge: B,
    config: NativeSidecarConfig,
    extensions: Vec<Box<dyn Extension>>,
) -> Result<Self, SidecarError> {
    let mut sidecar = Self::with_config(bridge, config)?;
    extensions
        .into_iter()
        .try_for_each(|extension| sidecar.register_extension(extension))?;
    Ok(sidecar)
}
/// Unbinds `process_id` from every extension session and drops sessions that
/// are left with neither processes nor VMs.
pub(crate) fn prune_extension_process_resource(&mut self, process_id: &str) {
    self.extension_sessions.retain(|_, session| {
        session.process_ids.remove(process_id);
        let drained = session.process_ids.is_empty() && session.vm_ids.is_empty();
        !drained
    });
}
/// Unbinds `vm_id` from every extension session and drops sessions that are
/// left with neither processes nor VMs.
///
/// Sessions whose ownership is VM-scoped to `vm_id` also lose all of their
/// bound processes.
pub(crate) fn prune_extension_vm_resource(&mut self, vm_id: &str) {
    self.extension_sessions.retain(|_, session| {
        let owned_by_vm = match &session.ownership {
            OwnershipScope::VmOwnership(inner) => inner.vm_id == vm_id,
            _ => false,
        };
        if owned_by_vm {
            session.process_ids.clear();
        }
        session.vm_ids.remove(vm_id);
        let drained = session.process_ids.is_empty() && session.vm_ids.is_empty();
        !drained
    });
}
/// Appends stdout/stderr output to the extension buffer registered for the
/// `(vm_id, process_id)` pair.
///
/// Returns `true` when the event was a stdout or stderr chunk and a buffer
/// exists for the process. Returns `false` when no buffer is registered or the
/// event is of any other kind.
pub(crate) fn capture_extension_process_output_event(
    &mut self,
    vm_id: &str,
    process_id: &str,
    event: &ActiveExecutionEvent,
) -> bool {
    let key = (vm_id.to_string(), process_id.to_string());
    let Some(buffer) = self.extension_process_output_buffers.get_mut(&key) else {
        return false;
    };
    match event {
        ActiveExecutionEvent::Stdout(chunk) => {
            buffer.append_stdout(chunk, DEFAULT_ACP_STDOUT_BUFFER_BYTE_LIMIT);
        }
        ActiveExecutionEvent::Stderr(chunk) => {
            buffer.append_stderr(chunk, DEFAULT_ACP_STDOUT_BUFFER_BYTE_LIMIT);
        }
        // Non-output events are never captured; the list stays explicit so a
        // new variant forces a decision here.
        ActiveExecutionEvent::JavascriptSyncRpcRequest(_)
        | ActiveExecutionEvent::PythonVfsRpcRequest(_)
        | ActiveExecutionEvent::SignalState { .. }
        | ActiveExecutionEvent::Exited(_) => return false,
    }
    true
}
fn bind_extension_process_resource(
&mut self,
ownership: OwnershipScope,
namespace: String,
ext_session_id: String,
process_id: String,
) -> Result<(), SidecarError> {
if ext_session_id.is_empty() {
return Err(SidecarError::InvalidState(String::from(
"extension session id must not be empty",
)));
}
let (connection_id, session_id, vm_id) = self.vm_scope_for(&ownership)?;
self.require_owned_vm(&connection_id, &session_id, &vm_id)?;
let process_exists = self
.vms
.get(&vm_id)
.is_some_and(|vm| vm.active_processes.contains_key(&process_id));
if !process_exists {
return Err(SidecarError::InvalidState(format!(
"VM {vm_id} has no active process {process_id}"
)));
}
let key = (namespace, ext_session_id);
if let Some(resources) = self.extension_sessions.get_mut(&key) {
if resources.ownership != ownership {
return Err(SidecarError::InvalidState(String::from(
"extension session ownership did not match existing resources",
)));
}
resources.process_ids.insert(process_id);
} else {
self.extension_sessions.insert(
key,
ExtensionSessionResources {
ownership,
process_ids: BTreeSet::from([process_id]),
vm_ids: BTreeSet::new(),
},
);
}
Ok(())
}
/// Records the VM named by `ownership` as a resource of the extension session
/// keyed by `(namespace, ext_session_id)`.
///
/// The VM is resolved through `vm_scope_for` and must pass `require_owned_vm`.
/// An unseen key gets a fresh record; an existing record is only extended when
/// it was created with an equal `ownership`.
///
/// # Errors
///
/// Returns `SidecarError::InvalidState` for an empty session id or an
/// ownership mismatch, and propagates failures from `vm_scope_for` /
/// `require_owned_vm`.
fn bind_extension_vm_resource(
    &mut self,
    ownership: OwnershipScope,
    namespace: String,
    ext_session_id: String,
) -> Result<(), SidecarError> {
    if ext_session_id.is_empty() {
        let message = String::from("extension session id must not be empty");
        return Err(SidecarError::InvalidState(message));
    }

    let (connection_id, session_id, vm_id) = self.vm_scope_for(&ownership)?;
    self.require_owned_vm(&connection_id, &session_id, &vm_id)?;

    let session_key = (namespace, ext_session_id);
    match self.extension_sessions.get_mut(&session_key) {
        Some(existing) => {
            if existing.ownership != ownership {
                let message =
                    String::from("extension session ownership did not match existing resources");
                return Err(SidecarError::InvalidState(message));
            }
            existing.vm_ids.insert(vm_id);
        }
        None => {
            let fresh = ExtensionSessionResources {
                ownership,
                process_ids: BTreeSet::new(),
                vm_ids: BTreeSet::from([vm_id]),
            };
            self.extension_sessions.insert(session_key, fresh);
        }
    }
    Ok(())
}
/// Returns the sidecar id stored in this sidecar's configuration.
pub fn sidecar_id(&self) -> &str {
&self.config.sidecar_id
}
/// Hands `operation` to `self.bridge.inspect` and returns whatever that call
/// yields.
///
/// The closure receives `&mut B` even though this method only borrows `self`
/// immutably (presumably the bridge wrapper provides the interior mutability;
/// confirm against its definition).
pub fn with_bridge_mut<T>(
&self,
operation: impl FnOnce(&mut B) -> T,
) -> Result<T, SidecarError> {
self.bridge.inspect(operation)
}
/// Installs `transport` on `self.sidecar_requests` by forwarding it to
/// `set_transport`.
pub fn set_sidecar_request_transport(&mut self, transport: Arc<dyn SidecarRequestTransport>) {
self.sidecar_requests.set_transport(transport);
}
/// Registers `extension` under the namespace it reports.
///
/// # Errors
///
/// Returns `SidecarError::InvalidState` when the namespace is empty and
/// `SidecarError::Conflict` when the namespace is already registered; the
/// extension is not stored in either case.
pub fn register_extension(
    &mut self,
    extension: Box<dyn Extension>,
) -> Result<(), SidecarError> {
    let namespace = extension.namespace().to_owned();

    // The empty-namespace check comes first so it wins over the conflict check.
    let rejection = if namespace.is_empty() {
        Some(SidecarError::InvalidState(String::from(
            "extension namespace must not be empty",
        )))
    } else if self.extensions.contains_key(&namespace) {
        Some(SidecarError::Conflict(format!(
            "extension namespace {namespace} is already registered"
        )))
    } else {
        None
    };

    match rejection {
        Some(error) => Err(error),
        None => {
            self.extensions.insert(namespace, Arc::from(extension));
            Ok(())
        }
    }
}
/// Installs a closure as the sidecar request transport.
///
/// The closure receives a clone of each outgoing request and returns only the
/// response payload; the response frame is assembled from the original
/// request's `request_id` and `ownership`. The transport's timeout argument is
/// ignored.
pub fn set_sidecar_request_handler<F>(&mut self, handler: F)
where
    F: Fn(SidecarRequestFrame) -> Result<SidecarResponsePayload, SidecarError>
        + Send
        + Sync
        + 'static,
{
    /// Adapter that lets a plain closure act as a `SidecarRequestTransport`.
    struct ClosureTransport<F>(F);

    impl<F> SidecarRequestTransport for ClosureTransport<F>
    where
        F: Fn(SidecarRequestFrame) -> Result<SidecarResponsePayload, SidecarError>
            + Send
            + Sync
            + 'static,
    {
        fn send_request(
            &self,
            request: SidecarRequestFrame,
            _timeout: Duration,
        ) -> Result<SidecarResponseFrame, SidecarError> {
            let Self(callback) = self;
            let payload = callback(request.clone())?;
            let frame = SidecarResponseFrame::new(request.request_id, request.ownership, payload);
            Ok(frame)
        }
    }

    let transport = Arc::new(ClosureTransport(handler));
    self.set_sidecar_request_transport(transport);
}
pub fn set_wire_sidecar_request_handler<F>(&mut self, handler: F)
where
F: Fn(
crate::wire::SidecarRequestFrame,
) -> Result<crate::wire::SidecarResponseFrame, SidecarError>
+ Send
+ Sync
+ 'static,
{
self.set_sidecar_request_handler(move |request| {
let request = crate::wire::sidecar_request_frame_from_compat(request)
.map_err(wire_protocol_error)?;
let response = handler(request)?;
let response = crate::wire::sidecar_response_frame_to_compat(response)
.map_err(wire_protocol_error)?;
Ok(response.payload)
});
}
/// Appends `envelope` to the back of the pending process-event queue and
/// reports the new depth to the queue gauge.
///
/// # Errors
///
/// Returns the queue-overflow error, without enqueuing, when the queue already
/// holds `MAX_PROCESS_EVENT_QUEUE` entries.
pub(crate) fn queue_pending_process_event(
    &mut self,
    envelope: ProcessEventEnvelope,
) -> Result<(), SidecarError> {
    let queue_is_full = self.pending_process_events.len() >= MAX_PROCESS_EVENT_QUEUE;
    if queue_is_full {
        return Err(process_event_queue_overflow_error());
    }

    self.pending_process_events.push_back(envelope);
    let depth = self.pending_process_events.len();
    self.pending_process_events_gauge.observe_depth(depth);
    Ok(())
}
/// Inserts `envelope` at the front of the pending process-event queue, so it
/// is the next entry seen, and reports the new depth to the queue gauge.
///
/// # Errors
///
/// Returns the queue-overflow error, without enqueuing, when the queue already
/// holds `MAX_PROCESS_EVENT_QUEUE` entries.
pub(crate) fn queue_front_pending_process_event(
    &mut self,
    envelope: ProcessEventEnvelope,
) -> Result<(), SidecarError> {
    let queue_is_full = self.pending_process_events.len() >= MAX_PROCESS_EVENT_QUEUE;
    if queue_is_full {
        return Err(process_event_queue_overflow_error());
    }

    self.pending_process_events.push_front(envelope);
    let depth = self.pending_process_events.len();
    self.pending_process_events_gauge.observe_depth(depth);
    Ok(())
}
/// Returns how many more envelopes the pending process-event queue can hold
/// before reaching `MAX_PROCESS_EVENT_QUEUE`; saturates at zero.
pub(crate) fn pending_process_event_capacity(&self) -> usize {
MAX_PROCESS_EVENT_QUEUE.saturating_sub(self.pending_process_events.len())
}
pub fn dispatch_blocking(
&mut self,
request: RequestFrame,
) -> Result<DispatchResult, SidecarError> {
let inside_runtime = tokio::runtime::Handle::try_current().is_ok();
if matches!(
request.payload,
RequestPayload::DisposeVm(_) | RequestPayload::Ext(_)
) && !inside_runtime
{
return tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("sidecar dispatch runtime")
.block_on(self.dispatch(request));
}
let mut future = std::pin::pin!(self.dispatch(request));
match poll_future_once(future.as_mut()) {
Some(result) => result,
None if inside_runtime => Err(SidecarError::InvalidState(String::from(
"dispatch_blocking cannot wait for an async sidecar request inside a Tokio runtime; use dispatch().await",
))),
None => tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("sidecar dispatch runtime")
.block_on(future),
}
}
pub fn dispatch_wire_blocking(
&mut self,
request: crate::wire::RequestFrame,
) -> Result<crate::wire::WireDispatchResult, SidecarError> {
let request = crate::wire::request_frame_to_compat(request).map_err(wire_protocol_error)?;
let result = self.dispatch_blocking(request)?;
crate::wire::dispatch_result_from_compat(result).map_err(wire_protocol_error)
}
pub fn poll_event_blocking(
&mut self,
ownership: &OwnershipScope,
timeout: Duration,
) -> Result<Option<EventFrame>, SidecarError> {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("sidecar poll runtime")
.block_on(self.poll_event(ownership, timeout))
}
/// Wire-format counterpart of [`Self::poll_event_blocking`].
///
/// The ownership scope is converted to the compat representation before
/// polling, and any returned event frame is converted back to its wire form.
/// A failed event conversion is mapped through `wire_protocol_error`.
pub fn poll_event_wire_blocking(
    &mut self,
    ownership: &crate::wire::OwnershipScope,
    timeout: Duration,
) -> Result<Option<crate::wire::EventFrame>, SidecarError> {
    let compat_scope = crate::wire::ownership_scope_to_compat(ownership.clone());
    match self.poll_event_blocking(&compat_scope, timeout)? {
        Some(frame) => crate::wire::event_frame_from_compat(frame)
            .map(Some)
            .map_err(wire_protocol_error),
        None => Ok(None),
    }
}
pub fn close_session_blocking(
&mut self,
connection_id: &str,
session_id: &str,
) -> Result<Vec<EventFrame>, SidecarError> {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("sidecar close-session runtime")
.block_on(self.close_session(connection_id, session_id))
}
pub fn remove_connection_blocking(
&mut self,
connection_id: &str,
) -> Result<Vec<EventFrame>, SidecarError> {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("sidecar remove-connection runtime")
.block_on(self.remove_connection(connection_id))
}
pub fn dispose_vm_internal_blocking(
&mut self,
connection_id: &str,
session_id: &str,
vm_id: &str,
reason: DisposeReason,
) -> Result<Vec<EventFrame>, SidecarError> {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("sidecar dispose-vm runtime")
.block_on(self.dispose_vm_internal(connection_id, session_id, vm_id, reason))
}
pub async fn dispatch(
&mut self,
request: RequestFrame,
) -> Result<DispatchResult, SidecarError> {
if let Err(error) = self.ensure_request_within_frame_limit(&request) {
return Ok(DispatchResult {
response: self.reject(&request, error_code(&error), &error.to_string()),
events: Vec::new(),
});
}
let result = match request.payload.clone() {
RequestPayload::Authenticate(payload) => {
self.authenticate_connection(&request, payload).await
}
RequestPayload::OpenSession(payload) => self.open_session(&request, payload).await,
RequestPayload::CreateVm(payload) => self.create_vm(&request, payload).await,
RequestPayload::DisposeVm(payload) => self.dispose_vm(&request, payload).await,
RequestPayload::BootstrapRootFilesystem(payload) => {
self.bootstrap_root_filesystem(&request, payload.entries)
.await
}
RequestPayload::ConfigureVm(payload) => self.configure_vm(&request, payload).await,
RequestPayload::RegisterHostCallbacks(payload) => {
register_host_callbacks(self, &request, payload)
}
RequestPayload::CreateLayer(payload) => self.create_layer(&request, payload).await,
RequestPayload::SealLayer(payload) => self.seal_layer(&request, payload).await,
RequestPayload::ImportSnapshot(payload) => {
self.import_snapshot(&request, payload).await
}
RequestPayload::ExportSnapshot(payload) => {
self.export_snapshot(&request, payload).await
}
RequestPayload::CreateOverlay(payload) => self.create_overlay(&request, payload).await,
RequestPayload::GuestFilesystemCall(payload) => {
self.guest_filesystem_call(&request, payload).await
}
RequestPayload::SnapshotRootFilesystem(payload) => {
self.snapshot_root_filesystem(&request, payload).await
}
RequestPayload::Execute(payload) => self.execute(&request, payload).await,
RequestPayload::WriteStdin(payload) => self.write_stdin(&request, payload).await,
RequestPayload::CloseStdin(payload) => self.close_stdin(&request, payload).await,
RequestPayload::KillProcess(payload) => self.kill_process(&request, payload).await,
RequestPayload::GetProcessSnapshot(payload) => {
self.get_process_snapshot(&request, payload).await
}
RequestPayload::FindListener(payload) => self.find_listener(&request, payload).await,
RequestPayload::FindBoundUdp(payload) => self.find_bound_udp(&request, payload).await,
RequestPayload::VmFetch(payload) => self.vm_fetch(&request, payload).await,
RequestPayload::GetSignalState(payload) => {
self.get_signal_state(&request, payload).await
}
RequestPayload::GetZombieTimerCount(payload) => {
self.get_zombie_timer_count(&request, payload).await
}
RequestPayload::HostFilesystemCall(_)
| RequestPayload::PersistenceLoad(_)
| RequestPayload::PersistenceFlush(_) => Ok(DispatchResult {
response: self.reject(
&request,
"unsupported_direction",
"host callback request categories are sidecar-to-host only in this scaffold",
),
events: Vec::new(),
}),
RequestPayload::Ext(payload) => {
self.dispatch_extension_request(&request, payload).await
}
};
match result {
Ok(dispatch) => Ok(dispatch),
Err(error @ SidecarError::Io(_)) => Err(error),
Err(error) => Ok(DispatchResult {
response: self.reject(&request, error_code(&error), &error.to_string()),
events: Vec::new(),
}),
}
}
pub async fn dispatch_wire(
&mut self,
request: crate::wire::RequestFrame,
) -> Result<crate::wire::WireDispatchResult, SidecarError> {
let request = crate::wire::request_frame_to_compat(request).map_err(wire_protocol_error)?;
let result = self.dispatch(request).await?;
crate::wire::dispatch_result_from_compat(result).map_err(wire_protocol_error)
}
/// Wire-format counterpart of [`Self::poll_event`].
///
/// The ownership scope is converted to the compat representation before
/// polling, and any returned event frame is converted back to its wire form.
/// A failed event conversion is mapped through `wire_protocol_error`.
pub async fn poll_event_wire(
    &mut self,
    ownership: &crate::wire::OwnershipScope,
    timeout: Duration,
) -> Result<Option<crate::wire::EventFrame>, SidecarError> {
    let compat_scope = crate::wire::ownership_scope_to_compat(ownership.clone());
    let polled = self.poll_event(&compat_scope, timeout).await?;
    match polled {
        Some(frame) => crate::wire::event_frame_from_compat(frame)
            .map(Some)
            .map_err(wire_protocol_error),
        None => Ok(None),
    }
}
/// Forwards an `Ext` request to the extension registered for its namespace.
///
/// An unregistered namespace is answered with an `unknown_extension`
/// rejection. Otherwise the extension receives an `ExtensionContext` built
/// from a snapshot of the namespace, the request's ownership and a clone of
/// the sidecar request client. Its reply payload is wrapped in an `ExtResult`
/// envelope for the same namespace, and its events are returned alongside.
///
/// # Errors
///
/// Propagates any error returned by the extension's `handle_request`.
async fn dispatch_extension_request(
    &mut self,
    request: &RequestFrame,
    envelope: ExtEnvelope,
) -> Result<DispatchResult, SidecarError> {
    let ExtEnvelope { namespace, payload } = envelope;

    let extension = match self.extensions.get(&namespace).cloned() {
        Some(extension) => extension,
        None => {
            let message = format!("no extension registered for namespace {namespace}");
            return Ok(DispatchResult {
                response: self.reject(request, "unknown_extension", &message),
                events: Vec::new(),
            });
        }
    };

    let snapshot = ExtensionSnapshot::new(
        namespace.clone(),
        request.ownership.clone(),
        self.sidecar_requests.clone(),
    );
    let ctx = ExtensionContext::new(snapshot, self);
    let reply = extension.handle_request(ctx, payload).await?;

    let result_envelope = ExtEnvelope {
        namespace,
        payload: reply.payload,
    };
    let response = self.respond(request, ResponsePayload::ExtResult(result_envelope));
    Ok(DispatchResult {
        response,
        events: reply.events,
    })
}
/// Waits up to `timeout` for the next public event visible to `ownership`.
///
/// Each iteration:
/// 1. serves the first already-pending envelope that matches `ownership`;
/// 2. when `timeout` is non-zero, pumps process events for `ownership`;
/// 3. drains the process-event receiver without blocking, handles the first
///    matching envelope and queues every other one as pending;
/// 4. returns `Ok(None)` once the deadline has passed, otherwise sleeps for at
///    most 10 ms and retries.
///
/// An envelope that is handled without producing a frame restarts the loop
/// immediately.
///
/// A `timeout` too large to represent as a deadline (for example
/// `Duration::MAX`) is treated as "no deadline" instead of panicking on
/// `Instant` overflow.
///
/// # Errors
///
/// Returns `SidecarError::InvalidState` when the process-event receiver is
/// unavailable and the queue-overflow error when more events are waiting than
/// the pending queue can hold; also propagates event-handling failures.
pub async fn poll_event(
    &mut self,
    ownership: &OwnershipScope,
    timeout: Duration,
) -> Result<Option<EventFrame>, SidecarError> {
    // `Instant + Duration` panics on overflow; `None` means the deadline is unreachable.
    let deadline = Instant::now().checked_add(timeout);
    let poll_interval = Duration::from_millis(10);
    loop {
        if let Some(index) = self
            .pending_process_events
            .iter()
            .position(|event| public_process_event_matches_ownership(self, ownership, event))
        {
            let Some(envelope) = self.pending_process_events.remove(index) else {
                continue;
            };
            if let Some(frame) = self.handle_process_event_envelope(envelope)? {
                return Ok(Some(frame));
            }
            continue;
        }
        if !timeout.is_zero() {
            let _ = self.pump_process_events(ownership).await?;
        }
        let queued_envelopes = {
            let pending_capacity = self.pending_process_event_capacity();
            let receiver = self.process_event_receiver.as_mut().ok_or_else(|| {
                SidecarError::InvalidState(String::from("process event receiver unavailable"))
            })?;
            let mut queued = Vec::new();
            loop {
                // Never pull more than the pending queue could absorb.
                if queued.len() >= pending_capacity {
                    if receiver.is_empty() {
                        break;
                    }
                    return Err(process_event_queue_overflow_error());
                }
                match receiver.try_recv() {
                    Ok(envelope) => queued.push(envelope),
                    Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
                    Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break,
                }
            }
            queued
        };
        let mut matching_envelope = None;
        for envelope in queued_envelopes {
            if matching_envelope.is_none()
                && public_process_event_matches_ownership(self, ownership, &envelope)
            {
                matching_envelope = Some(envelope);
            } else {
                self.queue_pending_process_event(envelope)?;
            }
        }
        if let Some(envelope) = matching_envelope {
            if let Some(frame) = self.handle_process_event_envelope(envelope)? {
                return Ok(Some(frame));
            }
            continue;
        }
        let now = Instant::now();
        let nap = match deadline {
            Some(deadline) if now >= deadline => return Ok(None),
            Some(deadline) => deadline.saturating_duration_since(now).min(poll_interval),
            None => poll_interval,
        };
        time::sleep(nap).await;
    }
}
/// Handles one process-event envelope and returns the event frame it produces,
/// if any.
///
/// Non-exit events go straight to `handle_execution_event`. An `Exited` event
/// is held back while other events for the same VM and process are still
/// outstanding: those trailing events are gathered from the pending queue and
/// from `drain_process_events_blocking_with_limit`, then re-queued at the front
/// in their original order with the exit event placed right after them. In
/// that case this call returns `Ok(None)`, or the result of handling the first
/// trailing event when the queue would otherwise have no room for the exit
/// event.
///
/// # Errors
///
/// Returns the queue-overflow error when the trailing events do not fit in the
/// pending queue, and propagates failures from draining, re-queuing and
/// `handle_execution_event`.
pub(crate) fn handle_process_event_envelope(
&mut self,
envelope: ProcessEventEnvelope,
) -> Result<Option<EventFrame>, SidecarError> {
let ProcessEventEnvelope {
connection_id,
session_id,
vm_id,
process_id,
event,
} = envelope;
if matches!(event, ActiveExecutionEvent::Exited(_)) {
// Split the pending queue: non-exit events of this process become
// `trailing`; everything else keeps its relative order in `deferred`.
let mut trailing = Vec::new();
let mut deferred = VecDeque::new();
while let Some(pending) = self.pending_process_events.pop_front() {
if pending.vm_id == vm_id
&& pending.process_id == process_id
&& !matches!(pending.event, ActiveExecutionEvent::Exited(_))
{
trailing.push(pending.event);
} else {
deferred.push_back(pending);
}
}
self.pending_process_events = deferred;
// Leave room for the events already collected plus the exit event itself.
let drain_limit = self
.pending_process_event_capacity()
.saturating_sub(trailing.len().saturating_add(1));
// Any exit events returned by the drain are discarded here.
trailing.extend(
self.drain_process_events_blocking_with_limit(&vm_id, &process_id, drain_limit)?
.into_iter()
.filter(|event| !matches!(event, ActiveExecutionEvent::Exited(_))),
);
if !trailing.is_empty() {
if self.pending_process_event_capacity() < trailing.len() {
return Err(process_event_queue_overflow_error());
}
// With exactly `trailing.len()` free slots there is no room for the exit
// event, so the first trailing event is handled immediately instead of
// being re-queued.
let emit_now = if self.pending_process_event_capacity() == trailing.len() {
Some(trailing.remove(0))
} else {
None
};
// Push the exit event first, then the trailing events in reverse, so the
// queue front reads: trailing events in order, followed by the exit.
self.queue_front_pending_process_event(ProcessEventEnvelope {
connection_id: connection_id.clone(),
session_id: session_id.clone(),
vm_id: vm_id.clone(),
process_id: process_id.clone(),
event,
})?;
for event in trailing.into_iter().rev() {
self.queue_front_pending_process_event(ProcessEventEnvelope {
connection_id: connection_id.clone(),
session_id: session_id.clone(),
vm_id: vm_id.clone(),
process_id: process_id.clone(),
event,
})?;
}
if let Some(event) = emit_now {
return self.handle_execution_event(&vm_id, &process_id, event);
}
return Ok(None);
}
}
self.handle_execution_event(&vm_id, &process_id, event)
}
// try_poll_event moved to crate::execution
/// Closes a session at the caller's request by delegating to
/// `dispose_session` with `DisposeReason::Requested`, returning the event
/// frames that call produces.
pub async fn close_session(
&mut self,
connection_id: &str,
session_id: &str,
) -> Result<Vec<EventFrame>, SidecarError> {
self.dispose_session(connection_id, session_id, DisposeReason::Requested)
.await
}
/// Disposes every session of an authenticated connection and then forgets the
/// connection.
///
/// Sessions are disposed with `DisposeReason::ConnectionClosed`, and the event
/// frames from each disposal are returned in order.
///
/// # Errors
///
/// Fails when the connection is not authenticated. If a session disposal
/// fails, the error is returned immediately and the connection entry is left
/// in place.
pub async fn remove_connection(
    &mut self,
    connection_id: &str,
) -> Result<Vec<EventFrame>, SidecarError> {
    self.require_authenticated_connection(connection_id)?;

    // Snapshot the ids first: disposing a session mutates the connection's session set.
    let session_ids: Vec<_> = self
        .connections
        .get(connection_id)
        .expect("authenticated connection should exist")
        .sessions
        .iter()
        .cloned()
        .collect();

    let mut events = Vec::new();
    for session_id in session_ids {
        let disposed = self
            .dispose_session(connection_id, &session_id, DisposeReason::ConnectionClosed)
            .await?;
        events.extend(disposed);
    }

    self.connections.remove(connection_id);
    Ok(events)
}
/// Authenticates a client and allocates a connection for it.
///
/// Checks run in this order: the request's ownership must resolve to a
/// connection id, the auth token must validate, the protocol version must
/// equal `crate::wire::PROTOCOL_VERSION`, and the bridge version must equal
/// the bridge contract version. A rejected token also emits a
/// `security.auth.failed` audit event carrying the client name, the failure
/// reason and, for connection-scoped requests, the connection id.
///
/// On success a fresh connection id is allocated and registered, and the
/// response is scoped to that new connection. The advertised
/// `max_frame_bytes` saturates at `u32::MAX` rather than wrapping when the
/// configured limit does not fit in a `u32`.
///
/// # Errors
///
/// Returns the token validation error, `SidecarError::ProtocolVersionMismatch`
/// or `SidecarError::BridgeVersionMismatch`, or the error from
/// `connection_id_for`.
async fn authenticate_connection(
    &mut self,
    request: &RequestFrame,
    payload: crate::protocol::AuthenticateRequest,
) -> Result<DispatchResult, SidecarError> {
    let _ = self.connection_id_for(&request.ownership)?;
    if let Err(error) = self.validate_auth_token(&payload.auth_token) {
        let mut fields = audit_fields([
            (String::from("source"), payload.client_name.clone()),
            (String::from("reason"), error.to_string()),
        ]);
        if let OwnershipScope::ConnectionOwnership(inner) = &request.ownership {
            fields.insert(String::from("connection_id"), inner.connection_id.clone());
        }
        emit_security_audit_event(
            &self.bridge,
            &self.config.sidecar_id,
            "security.auth.failed",
            fields,
        );
        return Err(error);
    }
    if payload.protocol_version != crate::wire::PROTOCOL_VERSION {
        return Err(SidecarError::ProtocolVersionMismatch(format!(
            "sidecar protocol version mismatch: expected {}, got {}",
            crate::wire::PROTOCOL_VERSION,
            payload.protocol_version
        )));
    }
    let expected_bridge_version = secure_exec_bridge::bridge_contract().version;
    if payload.bridge_version != expected_bridge_version {
        return Err(SidecarError::BridgeVersionMismatch(format!(
            "bridge contract version mismatch: expected {expected_bridge_version}, got {}",
            payload.bridge_version
        )));
    }
    let connection_id = self.allocate_connection_id();
    self.connections.insert(
        connection_id.clone(),
        ConnectionState {
            auth_token: payload.auth_token,
            sessions: BTreeSet::new(),
        },
    );
    // An `as u32` cast would wrap for limits above `u32::MAX` and advertise a bogus,
    // tiny frame size; saturating keeps the advertised limit conservative.
    let max_frame_bytes = u32::try_from(self.config.max_frame_bytes).unwrap_or(u32::MAX);
    let response = self.response_with_ownership(
        request.request_id,
        OwnershipScope::connection(&connection_id),
        ResponsePayload::Authenticated(AuthenticatedResponse {
            sidecar_id: self.config.sidecar_id.clone(),
            connection_id,
            max_frame_bytes,
        }),
    );
    Ok(DispatchResult {
        response,
        events: Vec::new(),
    })
}
/// Opens a new session on the authenticated connection named by the request's
/// ownership scope.
///
/// The session id is `session-<n>`, where `n` is the incremented session
/// counter. The new session starts with no VMs and is recorded both in the
/// session table and in the owning connection's session set.
///
/// # Errors
///
/// Fails when the ownership scope does not resolve to a connection id or the
/// connection is not authenticated.
async fn open_session(
    &mut self,
    request: &RequestFrame,
    payload: OpenSessionRequest,
) -> Result<DispatchResult, SidecarError> {
    let connection_id = self.connection_id_for(&request.ownership)?;
    self.require_authenticated_connection(&connection_id)?;

    self.next_session_id += 1;
    let session_id = format!("session-{}", self.next_session_id);

    let state = SessionState {
        connection_id: connection_id.clone(),
        placement: payload.placement,
        metadata: payload.metadata.into_iter().collect(),
        vm_ids: BTreeSet::new(),
    };
    self.sessions.insert(session_id.clone(), state);

    let owner = self
        .connections
        .get_mut(&connection_id)
        .expect("authenticated connection should exist");
    owner.sessions.insert(session_id.clone());

    let opened = SessionOpenedResponse {
        session_id,
        owner_connection_id: connection_id,
    };
    let response = self.respond(request, ResponsePayload::SessionOpened(opened));
    Ok(DispatchResult {
        response,
        events: Vec::new(),
    })
}
// create_vm, dispose_vm, bootstrap_root_filesystem, configure_vm moved to crate::vm
/// Forwards a guest filesystem call to `crate::filesystem::guest_filesystem_call`
/// (imported here as `filesystem_guest_filesystem_call`) and returns its result
/// unchanged.
async fn guest_filesystem_call(
&mut self,
request: &RequestFrame,
payload: GuestFilesystemCallRequest,
) -> Result<DispatchResult, SidecarError> {
filesystem_guest_filesystem_call(self, request, payload).await
}
// snapshot_root_filesystem moved to crate::vm
// execute, write_stdin, close_stdin, kill_process, find_listener, find_bound_udp,
// get_signal_state, get_zombie_timer_count moved to crate::execution
/// Disposes every VM of a session and then removes the session itself.
///
/// Each VM is disposed through `dispose_vm_internal` with a clone of `reason`,
/// and the event frames from each disposal are returned in order. Afterwards
/// the session is dropped from the session table and from the owning
/// connection's session set (if that connection still exists).
///
/// # Errors
///
/// Fails when the session is not owned by `connection_id`. If a VM disposal
/// fails, the error is returned immediately and the session is left in place.
async fn dispose_session(
    &mut self,
    connection_id: &str,
    session_id: &str,
    reason: DisposeReason,
) -> Result<Vec<EventFrame>, SidecarError> {
    self.require_owned_session(connection_id, session_id)?;

    // Snapshot the ids first: disposing a VM needs `&mut self`.
    let vm_ids: Vec<_> = self
        .sessions
        .get(session_id)
        .expect("owned session should exist")
        .vm_ids
        .iter()
        .cloned()
        .collect();

    let mut events = Vec::new();
    for vm_id in vm_ids {
        let disposed = self
            .dispose_vm_internal(connection_id, session_id, &vm_id, reason.clone())
            .await?;
        events.extend(disposed);
    }

    self.sessions.remove(session_id);
    match self.connections.get_mut(connection_id) {
        Some(connection) => {
            connection.sessions.remove(session_id);
        }
        None => {}
    }
    Ok(events)
}
// dispose_vm_internal, terminate_vm_processes, wait_for_vm_processes_to_exit moved to crate::vm
// kill_process_internal, handle_execution_event, handle_python_vfs_rpc_request,
// resolve_javascript_child_process_execution, spawn_javascript_child_process,
// poll_javascript_child_process, write_javascript_child_process_stdin,
// close_javascript_child_process_stdin, kill_javascript_child_process moved to crate::execution
/// Services one synchronous RPC request issued by a JavaScript guest process
/// and delivers the reply to that process's execution.
///
/// Requests for a VM or process that is no longer tracked are logged as stale
/// and dropped with `Ok(())`. Otherwise the request is routed on
/// `request.method`:
///
/// * `child_process.*` — spawn, poll, stdin and kill operations on the caller's
///   child processes;
/// * `process.kill` — signal delivery to the caller itself, one of its
///   children, another top-level process, a process group, or a raw kernel pid;
/// * `process.signal_state` — registers or clears a signal handler registration
///   for the calling process;
/// * `net.http_request` — an HTTP request to a loopback server hosted inside
///   the same VM;
/// * anything else — forwarded to `service_javascript_sync_rpc`.
///
/// After a successful `fs.chmodSync` / `fs.promises.chmod`, the mode is also
/// applied to the corresponding shadow host path when that path exists.
///
/// The outcome is sent back with `respond_javascript_sync_rpc_success` or
/// `respond_javascript_sync_rpc_error`; a failure of that delivery is passed
/// through `ignore_stale_javascript_sync_rpc_response`.
///
/// NOTE(review): errors propagated with `?` inside the match arms, and the
/// explicit `return Err(..)` paths, leave this function before the
/// response-delivery step. Whether the caller answers the guest in that case
/// is not visible from here.
pub(crate) fn handle_javascript_sync_rpc_request(
&mut self,
vm_id: &str,
process_id: &str,
request: JavascriptSyncRpcRequest,
) -> Result<(), SidecarError> {
// Drop requests whose VM or process has already gone away.
let Some(vm) = self.vms.get(vm_id) else {
log_stale_process_event(&self.bridge, vm_id, process_id, "javascript sync RPC");
return Ok(());
};
if !vm.active_processes.contains_key(process_id) {
log_stale_process_event(&self.bridge, vm_id, process_id, "javascript sync RPC");
return Ok(());
}
let response: Result<Value, SidecarError> = match request.method.as_str() {
"child_process.spawn" => {
let Some(vm) = self.vms.get(vm_id) else {
log_stale_process_event(
&self.bridge,
vm_id,
process_id,
"javascript sync RPC child_process.spawn",
);
return Ok(());
};
// The second tuple element (used as `max_buffer` by spawn_sync) is not needed here.
let (payload, _) = parse_javascript_child_process_spawn_request(vm, &request.args)?;
self.spawn_javascript_child_process(vm_id, process_id, payload)
}
"child_process.spawn_sync" => {
let Some(vm) = self.vms.get(vm_id) else {
log_stale_process_event(
&self.bridge,
vm_id,
process_id,
"javascript sync RPC child_process.spawn_sync",
);
return Ok(());
};
let (payload, max_buffer) =
parse_javascript_child_process_spawn_request(vm, &request.args)?;
self.spawn_javascript_child_process_sync(vm_id, process_id, payload, max_buffer)
}
"child_process.poll" => {
let child_process_id =
javascript_sync_rpc_arg_str(&request.args, 0, "child_process.poll child id")?;
// A missing wait argument falls back to the default value (zero).
let wait_ms = javascript_sync_rpc_arg_u64_optional(
&request.args,
1,
"child_process.poll wait ms",
)?
.unwrap_or_default();
self.poll_javascript_child_process(vm_id, process_id, child_process_id, wait_ms)
}
"child_process.write_stdin" => {
let child_process_id = javascript_sync_rpc_arg_str(
&request.args,
0,
"child_process.write_stdin child id",
)?;
let chunk = javascript_sync_rpc_bytes_arg(
&request.args,
1,
"child_process.write_stdin chunk",
)?;
self.write_javascript_child_process_stdin(
vm_id,
process_id,
child_process_id,
&chunk,
)?;
Ok(Value::Null)
}
"child_process.close_stdin" => {
let child_process_id = javascript_sync_rpc_arg_str(
&request.args,
0,
"child_process.close_stdin child id",
)?;
self.close_javascript_child_process_stdin(vm_id, process_id, child_process_id)?;
Ok(Value::Null)
}
"child_process.kill" => {
let child_process_id =
javascript_sync_rpc_arg_str(&request.args, 0, "child_process.kill child id")?;
let signal =
javascript_sync_rpc_arg_str(&request.args, 1, "child_process.kill signal")?;
self.kill_javascript_child_process(vm_id, process_id, child_process_id, signal)?;
Ok(Value::Null)
}
"process.kill" => {
let target_pid =
javascript_sync_rpc_arg_i32(&request.args, 0, "process.kill target pid")?;
let signal = javascript_sync_rpc_arg_str(&request.args, 1, "process.kill signal")?;
let parsed_signal = parse_signal(signal)?;
if parsed_signal == 0 {
// Signal 0 is handed directly to the kernel for `target_pid`.
let Some(vm) = self.vms.get(vm_id) else {
log_stale_process_event(
&self.bridge,
vm_id,
process_id,
"javascript sync RPC process.kill",
);
return Ok(());
};
if !vm.active_processes.contains_key(process_id) {
log_stale_process_event(
&self.bridge,
vm_id,
process_id,
"javascript sync RPC process.kill",
);
return Ok(());
}
vm.kernel
.signal_process(EXECUTION_DRIVER_NAME, target_pid, parsed_signal)
.map(|()| Value::Null)
.map_err(kernel_error)
} else if target_pid < 0 {
// A negative pid addresses the process group whose id is its absolute value.
let caller_kernel_pid = {
let Some(vm) = self.vms.get(vm_id) else {
log_stale_process_event(
&self.bridge,
vm_id,
process_id,
"javascript sync RPC process.kill",
);
return Ok(());
};
let Some(caller) = vm.active_processes.get(process_id) else {
log_stale_process_event(
&self.bridge,
vm_id,
process_id,
"javascript sync RPC process.kill",
);
return Ok(());
};
caller.kernel_pid
};
let pgid = target_pid.unsigned_abs();
// NOTE(review): `Ok(true)` presumably means the caller itself was among the
// signalled processes — confirm against `signal_vm_process_group`.
match self.signal_vm_process_group(vm_id, caller_kernel_pid, pgid, signal) {
Ok(true) => {
Ok(self.apply_self_process_kill(vm_id, process_id, parsed_signal))
}
Ok(false) => Ok(Value::Null),
Err(error) => Err(error),
}
} else {
// Resolution order for a non-negative pid: the caller itself, one of its
// children, another top-level process, then a raw kernel pid.
enum ProcessKillTarget {
SelfProcess,
Child(String),
TopLevel(String),
KernelPid(u32),
}
let target = {
let Some(vm) = self.vms.get(vm_id) else {
log_stale_process_event(
&self.bridge,
vm_id,
process_id,
"javascript sync RPC process.kill",
);
return Ok(());
};
let Some(caller) = vm.active_processes.get(process_id) else {
log_stale_process_event(
&self.bridge,
vm_id,
process_id,
"javascript sync RPC process.kill",
);
return Ok(());
};
let caller_pid = i32::try_from(caller.kernel_pid).map_err(|_| {
SidecarError::InvalidState("caller pid exceeds i32".into())
})?;
if caller_pid == target_pid {
ProcessKillTarget::SelfProcess
} else if let Some((child_process_id, _)) = caller
.child_processes
.iter()
.find(|(_, child)| i32::try_from(child.kernel_pid) == Ok(target_pid))
{
ProcessKillTarget::Child(child_process_id.clone())
} else if let Some((target_process_id, _)) =
vm.active_processes.iter().find(|(_, process)| {
i32::try_from(process.kernel_pid) == Ok(target_pid)
})
{
ProcessKillTarget::TopLevel(target_process_id.clone())
} else {
let target_kernel_pid = u32::try_from(target_pid).map_err(|_| {
SidecarError::InvalidState(format!(
"EINVAL: invalid process pid {target_pid}"
))
})?;
ProcessKillTarget::KernelPid(target_kernel_pid)
}
};
match target {
ProcessKillTarget::SelfProcess => {
Ok(self.apply_self_process_kill(vm_id, process_id, parsed_signal))
}
ProcessKillTarget::Child(child_process_id) => {
self.kill_javascript_child_process(
vm_id,
process_id,
&child_process_id,
signal,
)?;
Ok(Value::Null)
}
ProcessKillTarget::TopLevel(target_process_id) => {
self.kill_process_internal(vm_id, &target_process_id, signal)?;
Ok(Value::Null)
}
ProcessKillTarget::KernelPid(target_kernel_pid) => {
// Grandchildren and untracked kernel processes are
// resolved VM-wide instead of failing with an
// unknown-pid error.
self.signal_vm_kernel_pid(vm_id, target_kernel_pid, signal)
.map(|()| Value::Null)
}
}
}
}
"process.signal_state" => {
let signal =
javascript_sync_rpc_arg_u32(&request.args, 0, "process.signal_state signal")?;
let action =
javascript_sync_rpc_arg_str(&request.args, 1, "process.signal_state action")?;
let mask_json =
javascript_sync_rpc_arg_str(&request.args, 2, "process.signal_state mask")?;
let flags =
javascript_sync_rpc_arg_u32(&request.args, 3, "process.signal_state flags")?;
// The mask arrives as a JSON-encoded array of signal numbers.
let mask: Vec<u32> = serde_json::from_str(mask_json).map_err(|error| {
SidecarError::InvalidState(format!(
"process.signal_state mask must be valid JSON: {error}"
))
})?;
// The action name is matched after trimming and lowercasing.
let action = match action.trim().to_ascii_lowercase().as_str() {
"default" => SignalDispositionAction::Default,
"ignore" => SignalDispositionAction::Ignore,
"user" => SignalDispositionAction::User,
other => {
return Err(SidecarError::InvalidState(format!(
"unsupported process.signal_state action {other}"
)));
}
};
let Some(vm) = self.vms.get_mut(vm_id) else {
log_stale_process_event(
&self.bridge,
vm_id,
process_id,
"javascript sync RPC process.signal_state",
);
return Ok(());
};
// A default action with an empty mask and no flags clears the registration;
// the per-process map is dropped once it becomes empty.
if action == SignalDispositionAction::Default && mask.is_empty() && flags == 0 {
let remove_process_entry = vm
.signal_states
.get_mut(process_id)
.map(|handlers| {
handlers.remove(&signal);
handlers.is_empty()
})
.unwrap_or(false);
if remove_process_entry {
vm.signal_states.remove(process_id);
}
} else {
vm.signal_states
.entry(process_id.to_owned())
.or_default()
.insert(
signal,
SignalHandlerRegistration {
action,
mask,
flags,
},
);
}
Ok(Value::Null)
}
"net.http_request" => {
// The first argument must deserialize into a loopback request payload.
let payload = request
.args
.first()
.cloned()
.ok_or_else(|| {
SidecarError::InvalidState(String::from(
"net.http_request requires a request payload",
))
})
.and_then(|value| {
serde_json::from_value::<JavascriptHttpLoopbackRequest>(value).map_err(
|error| {
SidecarError::InvalidState(format!(
"invalid net.http_request payload: {error}"
))
},
)
})?;
if !is_javascript_loopback_host(&payload.host) {
return Err(SidecarError::Execution(format!(
"EACCES: HTTP loopback request requires a loopback host, got {}",
payload.host
)));
}
self.bridge.require_network_access(
vm_id,
NetworkOperation::Http,
format_tcp_resource(&payload.host, payload.port),
)?;
let Some(vm) = self.vms.get_mut(vm_id) else {
log_stale_process_event(
&self.bridge,
vm_id,
process_id,
"javascript sync RPC net.http_request",
);
return Ok(());
};
let resource_limits = vm.kernel.resource_limits().clone();
let socket_paths = build_javascript_socket_path_context(vm)?;
// The named process/server pair must be the current loopback target for the
// port on at least one of the two address families.
let target_is_current =
[JavascriptSocketFamily::Ipv4, JavascriptSocketFamily::Ipv6]
.iter()
.any(|family| {
socket_paths
.http_loopback_target(*family, payload.port)
.is_some_and(|target| {
target.process_id == payload.process_id
&& target.server_id == payload.server_id
})
});
if !target_is_current {
return Err(SidecarError::InvalidState(format!(
"unknown HTTP loopback target {}:{} for server {} in process {}",
payload.host, payload.port, payload.server_id, payload.process_id
)));
}
// The request is dispatched against the process hosting the server, which
// is looked up by the payload's process id rather than the caller's.
let Some(target_process) = vm.active_processes.get_mut(&payload.process_id) else {
return Err(SidecarError::InvalidState(format!(
"unknown HTTP loopback process {}",
payload.process_id
)));
};
dispatch_loopback_http_request(LoopbackHttpDispatchRequest {
bridge: &self.bridge,
vm_id,
dns: &vm.dns,
socket_paths: &socket_paths,
kernel: &mut vm.kernel,
process: target_process,
resource_limits: &resource_limits,
server_id: payload.server_id,
request_json: &payload.request,
})
.map(Value::String)
}
_ => {
// Every other method goes to the general sync-RPC service.
let Some(vm) = self.vms.get_mut(vm_id) else {
log_stale_process_event(
&self.bridge,
vm_id,
process_id,
"javascript sync RPC bridge dispatch",
);
return Ok(());
};
let resource_limits = vm.kernel.resource_limits().clone();
let network_counts = vm_network_resource_counts(vm);
let socket_paths = build_javascript_socket_path_context(vm)?;
let Some(process) = vm.active_processes.get_mut(process_id) else {
log_stale_process_event(
&self.bridge,
vm_id,
process_id,
"javascript sync RPC bridge dispatch",
);
return Ok(());
};
service_javascript_sync_rpc(JavascriptSyncRpcServiceRequest {
bridge: &self.bridge,
vm_id,
dns: &vm.dns,
socket_paths: &socket_paths,
kernel: &mut vm.kernel,
process,
sync_request: &request,
resource_limits: &resource_limits,
network_counts,
})
}
};
// Re-resolve the VM and process: servicing the request may have removed them.
let Some(vm) = self.vms.get_mut(vm_id) else {
log_stale_process_event(
&self.bridge,
vm_id,
process_id,
"javascript sync RPC response delivery",
);
return Ok(());
};
let shadow_root = vm.cwd.clone();
let Some(process) = vm.active_processes.get_mut(process_id) else {
log_stale_process_event(
&self.bridge,
vm_id,
process_id,
"javascript sync RPC response delivery",
);
return Ok(());
};
// Mirror a successful guest chmod onto the shadow host path, when it exists.
if response.is_ok()
&& matches!(
request.method.as_str(),
"fs.chmodSync" | "fs.promises.chmod"
)
{
let guest_path =
javascript_sync_rpc_arg_str(&request.args, 0, "filesystem chmod path")?;
// Only the permission bits (0o7777) are applied.
let mode =
javascript_sync_rpc_arg_u32(&request.args, 1, "filesystem chmod mode")? & 0o7777;
let host_path =
shadow_host_path_for_process(&shadow_root, &process.guest_cwd, guest_path);
if host_path.exists() {
fs::set_permissions(&host_path, fs::Permissions::from_mode(mode)).map_err(
|error| {
SidecarError::Io(format!(
"failed to mirror chmod to shadow path {}: {error}",
host_path.display()
))
},
)?;
}
}
// Deliver the outcome; a delivery failure is passed through
// `ignore_stale_javascript_sync_rpc_response`.
match response {
Ok(result) => process
.execution
.respond_javascript_sync_rpc_success(request.id, result)
.or_else(ignore_stale_javascript_sync_rpc_response),
Err(error) => process
.execution
.respond_javascript_sync_rpc_error(
request.id,
javascript_sync_rpc_error_code(&error),
error.to_string(),
)
.or_else(ignore_stale_javascript_sync_rpc_response),
}
}
/// Applies a `process.kill` aimed at the calling process itself and returns
/// the self-delivery action payload for the bridge.
///
/// The action comes from the signal registration recorded for this process and
/// signal, falling back to `Default` when none exists. For a `Default` action
/// on a non-zero signal other than `SIGWINCH`, `SIGCHLD`, `SIGCONT` or
/// `SIGURG`, the signal is recorded in the process's
/// `pending_self_signal_exit`.
///
/// The returned JSON object has the shape `{"self": true, "action": <name>}`,
/// where `<name>` is `"default"`, `"ignore"` or `"user"`.
fn apply_self_process_kill(
    &mut self,
    vm_id: &str,
    process_id: &str,
    parsed_signal: i32,
) -> Value {
    let registered = self
        .vms
        .get(vm_id)
        .and_then(|vm| vm.signal_states.get(process_id))
        .and_then(|handlers| handlers.get(&(parsed_signal as u32)));
    let action = match registered {
        Some(registration) => registration.action.clone(),
        None => SignalDispositionAction::Default,
    };

    let records_pending_exit = action == SignalDispositionAction::Default
        && parsed_signal != 0
        && !matches!(
            canonical_signal_name(parsed_signal),
            Some("SIGWINCH" | "SIGCHLD" | "SIGCONT" | "SIGURG")
        );
    if records_pending_exit {
        let caller = self
            .vms
            .get_mut(vm_id)
            .and_then(|vm| vm.active_processes.get_mut(process_id));
        if let Some(process) = caller {
            process.pending_self_signal_exit = Some(parsed_signal);
        }
    }

    let action_name = match action {
        SignalDispositionAction::Default => "default",
        SignalDispositionAction::Ignore => "ignore",
        SignalDispositionAction::User => "user",
    };
    json!({
        "self": true,
        "action": action_name,
    })
}
/// Lists the VM ids addressed by `ownership` after verifying the caller owns them.
///
/// A session scope yields every VM id recorded on that session; a VM scope
/// yields just that VM. Connection scope is rejected with `InvalidState`.
pub(crate) fn vm_ids_for_scope(
    &self,
    ownership: &OwnershipScope,
) -> Result<Vec<String>, SidecarError> {
    match ownership {
        OwnershipScope::ConnectionOwnership(..) => Err(SidecarError::InvalidState(
            String::from("event polling requires session or VM ownership scope"),
        )),
        OwnershipScope::VmOwnership(inner) => {
            self.require_owned_vm(&inner.connection_id, &inner.session_id, &inner.vm_id)?;
            Ok(vec![inner.vm_id.clone()])
        }
        OwnershipScope::SessionOwnership(inner) => {
            self.require_owned_session(&inner.connection_id, &inner.session_id)?;
            // `require_owned_session` has just confirmed the session is present.
            let session = self
                .sessions
                .get(&inner.session_id)
                .expect("owned session should exist");
            Ok(session.vm_ids.iter().cloned().collect())
        }
    }
}
/// Builds the VM-level ownership scope for `vm_id` from the connection and
/// session ids stored on the VM; fails with `InvalidState` for an unknown VM.
pub(crate) fn vm_ownership(&self, vm_id: &str) -> Result<OwnershipScope, SidecarError> {
    match self.vms.get(vm_id) {
        Some(vm) => Ok(OwnershipScope::vm(&vm.connection_id, &vm.session_id, vm_id)),
        None => Err(SidecarError::InvalidState(format!(
            "unknown sidecar VM {vm_id}"
        ))),
    }
}
/// Reports whether `vm_id` is a known VM with at least one active process.
/// An unknown VM yields `false`.
pub(crate) fn vm_has_active_processes(&self, vm_id: &str) -> bool {
    match self.vms.get(vm_id) {
        Some(vm) => !vm.active_processes.is_empty(),
        None => false,
    }
}
/// Succeeds only when `connection_id` is present in the connection table;
/// otherwise returns `InvalidState` naming the unauthenticated connection.
fn require_authenticated_connection(&self, connection_id: &str) -> Result<(), SidecarError> {
    if !self.connections.contains_key(connection_id) {
        return Err(SidecarError::InvalidState(format!(
            "connection {connection_id} has not authenticated"
        )));
    }
    Ok(())
}
/// Verifies that `connection_id` is authenticated and that `session_id` exists
/// and belongs to that connection. Every failure is reported as `InvalidState`.
pub(crate) fn require_owned_session(
    &self,
    connection_id: &str,
    session_id: &str,
) -> Result<(), SidecarError> {
    self.require_authenticated_connection(connection_id)?;
    let Some(session) = self.sessions.get(session_id) else {
        return Err(SidecarError::InvalidState(format!(
            "unknown sidecar session {session_id}"
        )));
    };
    if session.connection_id != connection_id {
        return Err(SidecarError::InvalidState(format!(
            "session {session_id} is not owned by connection {connection_id}"
        )));
    }
    Ok(())
}
/// Verifies the session ownership chain and then that `vm_id` exists and is
/// recorded against exactly this connection/session pair.
pub(crate) fn require_owned_vm(
    &self,
    connection_id: &str,
    session_id: &str,
    vm_id: &str,
) -> Result<(), SidecarError> {
    self.require_owned_session(connection_id, session_id)?;
    let Some(vm) = self.vms.get(vm_id) else {
        return Err(SidecarError::InvalidState(format!(
            "unknown sidecar VM {vm_id}"
        )));
    };
    let owned = vm.connection_id == connection_id && vm.session_id == session_id;
    if owned {
        Ok(())
    } else {
        Err(SidecarError::InvalidState(format!(
            "VM {vm_id} is not owned by {connection_id}/{session_id}"
        )))
    }
}
/// Extracts the connection id from a connection-level ownership scope.
/// Session and VM scopes are rejected with `InvalidState`.
fn connection_id_for(&self, ownership: &OwnershipScope) -> Result<String, SidecarError> {
    match ownership {
        OwnershipScope::SessionOwnership(..) | OwnershipScope::VmOwnership(..) => {
            let message = String::from("request requires connection ownership scope");
            Err(SidecarError::InvalidState(message))
        }
        OwnershipScope::ConnectionOwnership(inner) => Ok(inner.connection_id.clone()),
    }
}
/// Checks `auth_token` against the configured expected token.
///
/// When no expected token is configured every token is accepted. Otherwise the
/// tokens must be byte-for-byte equal, or `SidecarError::Unauthorized` is
/// returned.
///
/// The comparison deliberately does not stop at the first mismatching byte:
/// a short-circuiting `==` on a secret lets a caller infer how long a prefix of
/// the guess was correct from response timing. Only a length difference remains
/// observable.
fn validate_auth_token(&self, auth_token: &str) -> Result<(), SidecarError> {
    let Some(expected_auth_token) = self.config.expected_auth_token.as_deref() else {
        return Ok(());
    };
    let provided = auth_token.as_bytes();
    let expected = expected_auth_token.as_bytes();
    // OR together the XOR of every byte pair so the amount of work is the same
    // wherever (or whether) the tokens differ.
    let difference = provided
        .iter()
        .zip(expected)
        .fold(0u8, |acc, (left, right)| acc | (left ^ right));
    if provided.len() == expected.len() && difference == 0 {
        Ok(())
    } else {
        Err(SidecarError::Unauthorized(String::from(
            "authenticate request provided an invalid auth token",
        )))
    }
}
/// Advances the connection counter and returns the new id as `conn-<n>`.
fn allocate_connection_id(&mut self) -> String {
    let next = self.next_connection_id + 1;
    self.next_connection_id = next;
    format!("conn-{}", next)
}
// Removes and returns the next process event envelope for (`vm_id`, `process_id`).
//
// Already-pending envelopes are searched first. If none match, the process event
// receiver is drained with `try_recv` until a matching envelope is found or the
// channel is empty/disconnected; every non-matching envelope read along the way
// is re-queued as pending. Returns `Ok(None)` when no matching envelope is
// available right now.
//
// Errors: `InvalidState` when the receiver is unavailable, the queue-overflow
// error when the deferred batch reaches the pending capacity while the receiver
// still has items, and any error from `queue_pending_process_event`.
fn take_matching_process_event_envelope(
&mut self,
vm_id: &str,
process_id: &str,
) -> Result<Option<ProcessEventEnvelope>, SidecarError> {
// Fast path: a matching envelope is already sitting in the pending queue.
if let Some(index) = self
.pending_process_events
.iter()
.position(|event| event.vm_id == vm_id && event.process_id == process_id)
{
return Ok(self.pending_process_events.remove(index));
}
let mut matching_envelope = None;
let mut deferred = Vec::new();
{
// Read the capacity before mutably borrowing the receiver field.
let pending_capacity = self.pending_process_event_capacity();
let receiver = self.process_event_receiver.as_mut().ok_or_else(|| {
SidecarError::InvalidState(String::from("process event receiver unavailable"))
})?;
loop {
if deferred.len() >= pending_capacity {
if receiver.is_empty() {
break;
}
// NOTE(review): envelopes already moved into `deferred` are dropped by
// this early return — confirm that losing them on overflow is intended.
return Err(process_event_queue_overflow_error());
}
let envelope = match receiver.try_recv() {
Ok(envelope) => envelope,
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break,
};
if matching_envelope.is_none()
&& envelope.vm_id == vm_id
&& envelope.process_id == process_id
{
matching_envelope = Some(envelope);
break;
}
deferred.push(envelope);
}
}
// Hand every unrelated envelope back to the pending queue in arrival order.
for envelope in deferred {
self.queue_pending_process_event(envelope)?;
}
Ok(matching_envelope)
}
/// Returns the current sidecar request id and steps the counter down by one
/// for the next allocation.
fn allocate_sidecar_request_id(&mut self) -> RequestId {
    let allocated = self.next_sidecar_request_id;
    self.next_sidecar_request_id = allocated - 1;
    allocated
}
/// Extracts `(connection_id, session_id)` from a session-level ownership scope.
/// Connection and VM scopes are rejected with `InvalidState`.
pub(crate) fn session_scope_for(
    &self,
    ownership: &OwnershipScope,
) -> Result<(String, String), SidecarError> {
    let OwnershipScope::SessionOwnership(inner) = ownership else {
        return Err(SidecarError::InvalidState(String::from(
            "request requires session ownership scope",
        )));
    };
    let connection_id = inner.connection_id.clone();
    let session_id = inner.session_id.clone();
    Ok((connection_id, session_id))
}
/// Extracts `(connection_id, session_id, vm_id)` from a VM-level ownership
/// scope. Connection and session scopes are rejected with `InvalidState`.
pub(crate) fn vm_scope_for(
    &self,
    ownership: &OwnershipScope,
) -> Result<(String, String, String), SidecarError> {
    let OwnershipScope::VmOwnership(inner) = ownership else {
        return Err(SidecarError::InvalidState(String::from(
            "request requires VM ownership scope",
        )));
    };
    let connection_id = inner.connection_id.clone();
    let session_id = inner.session_id.clone();
    let vm_id = inner.vm_id.clone();
    Ok((connection_id, session_id, vm_id))
}
/// Assembles a response frame stamped with the current protocol schema for the
/// given request id, ownership scope and payload.
fn response_with_ownership(
    &self,
    request_id: RequestId,
    ownership: OwnershipScope,
    payload: ResponsePayload,
) -> ResponseFrame {
    let schema = ProtocolSchema::current();
    ResponseFrame {
        schema,
        request_id,
        ownership,
        payload,
    }
}
/// Builds the response to `request`, echoing its request id and a copy of its
/// ownership scope.
pub(crate) fn respond(
    &self,
    request: &RequestFrame,
    payload: ResponsePayload,
) -> ResponseFrame {
    let ownership = request.ownership.clone();
    self.response_with_ownership(request.request_id, ownership, payload)
}
/// Builds a `Rejected` response to `request` carrying the given code and message.
fn reject(&self, request: &RequestFrame, code: &str, message: &str) -> ResponseFrame {
    let rejection = RejectedResponse {
        code: code.to_owned(),
        message: message.to_owned(),
    };
    self.respond(request, ResponsePayload::Rejected(rejection))
}
pub fn queue_sidecar_request(
&mut self,
ownership: OwnershipScope,
payload: SidecarRequestPayload,
) -> Result<RequestId, SidecarError> {
if self.outbound_sidecar_requests.len() >= MAX_OUTBOUND_SIDECAR_REQUESTS {
return Err(outbound_sidecar_request_queue_overflow_error());
}
if self.pending_sidecar_responses.pending_count() >= MAX_PENDING_SIDECAR_RESPONSES {
return Err(sidecar_response_pending_overflow_error());
}
let request_id = self.allocate_sidecar_request_id();
let request = SidecarRequestFrame::new(request_id, ownership, payload);
self.pending_sidecar_responses
.register_request(&request)
.map_err(sidecar_response_tracker_error)?;
self.outbound_sidecar_requests.push_back(request);
self.outbound_sidecar_requests_gauge
.observe_depth(self.outbound_sidecar_requests.len());
self.pending_sidecar_responses_gauge
.observe_depth(self.pending_sidecar_responses.pending_count());
Ok(request_id)
}
pub fn queue_wire_sidecar_request(
&mut self,
ownership: crate::wire::OwnershipScope,
payload: crate::wire::SidecarRequestPayload,
) -> Result<crate::wire::RequestId, SidecarError> {
let ownership = crate::wire::ownership_scope_to_compat(ownership);
let payload = crate::wire::sidecar_request_payload_to_compat(&ownership, payload)
.map_err(wire_protocol_error)?;
self.queue_sidecar_request(ownership, payload)
}
/// Removes the oldest queued outbound sidecar request, if any, and refreshes
/// the outbound-queue depth gauge.
pub fn pop_sidecar_request(&mut self) -> Option<SidecarRequestFrame> {
    let popped = self.outbound_sidecar_requests.pop_front();
    let depth = self.outbound_sidecar_requests.len();
    self.outbound_sidecar_requests_gauge.observe_depth(depth);
    popped
}
/// Wire-type variant of `pop_sidecar_request`: pops the next outbound request
/// and converts it to its wire representation. An empty queue yields `Ok(None)`;
/// a conversion failure is reported through `wire_protocol_error`.
pub fn pop_wire_sidecar_request(
    &mut self,
) -> Result<Option<crate::wire::SidecarRequestFrame>, SidecarError> {
    match self.pop_sidecar_request() {
        None => Ok(None),
        Some(frame) => crate::wire::sidecar_request_frame_from_compat(frame)
            .map(Some)
            .map_err(wire_protocol_error),
    }
}
pub fn accept_sidecar_response(
&mut self,
response: SidecarResponseFrame,
) -> Result<(), SidecarError> {
self.pending_sidecar_responses
.accept_response(&response)
.map_err(sidecar_response_tracker_error)?;
self.pending_sidecar_responses_gauge
.observe_depth(self.pending_sidecar_responses.pending_count());
self.completed_sidecar_response_order
.push_back(response.request_id);
self.completed_sidecar_responses
.insert(response.request_id, response);
self.completed_sidecar_responses_gauge
.observe_depth(self.completed_sidecar_responses.len());
while self.completed_sidecar_responses.len() > MAX_COMPLETED_SIDECAR_RESPONSES {
match self.completed_sidecar_response_order.pop_front() {
// Only a response that was never retrieved is a real loss; an id
// already taken via take_sidecar_response leaves a stale order
// entry that removes to None and is not a dropped response.
Some(evicted) => {
if self.completed_sidecar_responses.remove(&evicted).is_some() {
tracing::warn!(
queue = "completed_sidecar_responses",
evicted_request_id = evicted,
capacity = MAX_COMPLETED_SIDECAR_RESPONSES,
"dropping an unretrieved completed sidecar response to stay within cap; the host can no longer fetch it (response lost)"
);
self.completed_sidecar_responses_gauge
.observe_depth(self.completed_sidecar_responses.len());
}
}
None => break,
}
}
Ok(())
}
pub fn accept_wire_sidecar_response(
&mut self,
response: crate::wire::SidecarResponseFrame,
) -> Result<(), SidecarError> {
let response =
crate::wire::sidecar_response_frame_to_compat(response).map_err(wire_protocol_error)?;
self.accept_sidecar_response(response)
}
/// Removes and returns the completed response stored for `request_id`, if any.
/// On a hit the id is also dropped from the completion-order queue and the
/// completed-responses gauge is refreshed.
pub fn take_sidecar_response(&mut self, request_id: RequestId) -> Option<SidecarResponseFrame> {
    // Nothing stored for this id: leave the order queue and gauge untouched.
    let taken = self.completed_sidecar_responses.remove(&request_id)?;
    self.completed_sidecar_response_order
        .retain(|completed_id| completed_id != &request_id);
    let depth = self.completed_sidecar_responses.len();
    self.completed_sidecar_responses_gauge.observe_depth(depth);
    Some(taken)
}
/// Wire-type variant of `take_sidecar_response`: takes the stored response for
/// `request_id` and converts it to its wire representation. A missing response
/// yields `Ok(None)`; a conversion failure is reported through
/// `wire_protocol_error`.
pub fn take_wire_sidecar_response(
    &mut self,
    request_id: crate::wire::RequestId,
) -> Result<Option<crate::wire::SidecarResponseFrame>, SidecarError> {
    match self.take_sidecar_response(request_id) {
        None => Ok(None),
        Some(response) => crate::wire::sidecar_response_frame_from_compat(response)
            .map(Some)
            .map_err(wire_protocol_error),
    }
}
/// Builds a VM-scoped event frame announcing that the VM entered `state`.
pub(crate) fn vm_lifecycle_event(
    &self,
    connection_id: &str,
    session_id: &str,
    vm_id: &str,
    state: VmLifecycleState,
) -> EventFrame {
    let scope = OwnershipScope::vm(connection_id, session_id, vm_id);
    let payload = EventPayload::VmLifecycle(VmLifecycleEvent { state });
    EventFrame::new(scope, payload)
}
fn ensure_request_within_frame_limit(
&self,
request: &RequestFrame,
) -> Result<(), SidecarError> {
let frame = crate::protocol::to_generated_protocol_frame(
&crate::protocol::ProtocolFrame::Request(request.clone()),
)
.map_err(|error| {
SidecarError::InvalidState(format!("failed to convert request frame: {error}"))
})?;
let crate::wire::ProtocolFrame::RequestFrame(_) = &frame else {
return Err(SidecarError::InvalidState(String::from(
"request converted to non-request wire frame",
)));
};
crate::wire::WireFrameCodec::new(self.config.max_frame_bytes)
.encode(&frame)
.map(|_| ())
.map_err(|error| SidecarError::FrameTooLarge(error.to_string()))
}
}
// `ExtensionHost` implementation for the native sidecar.
//
// Most methods wrap the payload in a `RequestFrame` with request id 0, call the
// matching inherent `NativeSidecar` method, and unwrap the expected response
// payload; any other payload (including `Rejected`) becomes an error through
// `unexpected_extension_host_response`.
impl<B> ExtensionHost for NativeSidecar<B>
where
B: NativeSidecarBridge + Send + 'static,
BridgeError<B>: fmt::Debug + Send + Sync + 'static,
{
// Runs `NativeSidecar::execute` and returns its `ProcessStarted` response.
fn spawn_process<'a>(
&'a mut self,
ownership: OwnershipScope,
payload: ExecuteRequest,
) -> ExtensionFuture<'a, ProcessStartedResponse> {
Box::pin(async move {
let request = RequestFrame::new(0, ownership, RequestPayload::Execute(payload.clone()));
let dispatch = NativeSidecar::execute(self, &request, payload).await?;
match dispatch.response.payload {
ResponsePayload::ProcessStarted(response) => Ok(response),
other => Err(unexpected_extension_host_response("execute", other)),
}
})
}
// Runs `NativeSidecar::write_stdin` and returns its `StdinWritten` response.
fn write_stdin<'a>(
&'a mut self,
ownership: OwnershipScope,
payload: WriteStdinRequest,
) -> ExtensionFuture<'a, StdinWrittenResponse> {
Box::pin(async move {
let request =
RequestFrame::new(0, ownership, RequestPayload::WriteStdin(payload.clone()));
let dispatch = NativeSidecar::write_stdin(self, &request, payload).await?;
match dispatch.response.payload {
ResponsePayload::StdinWritten(response) => Ok(response),
other => Err(unexpected_extension_host_response("write_stdin", other)),
}
})
}
// Runs `NativeSidecar::close_stdin` and returns its `StdinClosed` response.
fn close_stdin<'a>(
&'a mut self,
ownership: OwnershipScope,
payload: CloseStdinRequest,
) -> ExtensionFuture<'a, StdinClosedResponse> {
Box::pin(async move {
let request =
RequestFrame::new(0, ownership, RequestPayload::CloseStdin(payload.clone()));
let dispatch = NativeSidecar::close_stdin(self, &request, payload).await?;
match dispatch.response.payload {
ResponsePayload::StdinClosed(response) => Ok(response),
other => Err(unexpected_extension_host_response("close_stdin", other)),
}
})
}
// Runs `NativeSidecar::kill_process` and returns its `ProcessKilled` response.
fn kill_process<'a>(
&'a mut self,
ownership: OwnershipScope,
payload: KillProcessRequest,
) -> ExtensionFuture<'a, ProcessKilledResponse> {
Box::pin(async move {
let request =
RequestFrame::new(0, ownership, RequestPayload::KillProcess(payload.clone()));
let dispatch = NativeSidecar::kill_process(self, &request, payload).await?;
match dispatch.response.payload {
ResponsePayload::ProcessKilled(response) => Ok(response),
other => Err(unexpected_extension_host_response("kill_process", other)),
}
})
}
// Delegates directly to `NativeSidecar::poll_event` for the given scope and timeout.
fn poll_event<'a>(
&'a mut self,
ownership: OwnershipScope,
timeout: Duration,
) -> ExtensionFuture<'a, Option<EventFrame>> {
Box::pin(async move { NativeSidecar::poll_event(self, &ownership, timeout).await })
}
// Runs `NativeSidecar::guest_filesystem_call` and returns its
// `GuestFilesystemResult` response.
fn guest_filesystem_call<'a>(
&'a mut self,
ownership: OwnershipScope,
payload: GuestFilesystemCallRequest,
) -> ExtensionFuture<'a, GuestFilesystemResultResponse> {
Box::pin(async move {
let request = RequestFrame::new(
0,
ownership,
RequestPayload::GuestFilesystemCall(payload.clone()),
);
let dispatch = NativeSidecar::guest_filesystem_call(self, &request, payload).await?;
match dispatch.response.payload {
ResponsePayload::GuestFilesystemResult(response) => Ok(response),
other => Err(unexpected_extension_host_response(
"guest_filesystem_call",
other,
)),
}
})
}
// Associates `process_id` with the extension session identified by
// (`namespace`, `ext_session_id`) via `bind_extension_process_resource`.
fn bind_process_to_session<'a>(
&'a mut self,
ownership: OwnershipScope,
namespace: String,
ext_session_id: String,
process_id: String,
) -> ExtensionFuture<'a, ()> {
Box::pin(async move {
self.bind_extension_process_resource(ownership, namespace, ext_session_id, process_id)
})
}
// Associates the VM named by `ownership` with the extension session via
// `bind_extension_vm_resource`.
fn bind_vm_to_session<'a>(
&'a mut self,
ownership: OwnershipScope,
namespace: String,
ext_session_id: String,
) -> ExtensionFuture<'a, ()> {
Box::pin(
async move { self.bind_extension_vm_resource(ownership, namespace, ext_session_id) },
)
}
// Tears down everything bound to the extension session (`namespace`,
// `ext_session_id`): sends SIGTERM to each bound process that is still active
// in the scoped VM, then disposes each bound VM that still exists, returning
// the events produced by those disposals. An unknown session yields an empty
// list; an ownership mismatch is an `InvalidState` error.
fn dispose_session_resources<'a>(
&'a mut self,
ownership: OwnershipScope,
namespace: String,
ext_session_id: String,
) -> ExtensionFuture<'a, Vec<EventFrame>> {
Box::pin(async move {
let key = (namespace, ext_session_id);
let Some(resources) = self.extension_sessions.get(&key) else {
return Ok(Vec::new());
};
if resources.ownership != ownership {
return Err(SidecarError::InvalidState(String::from(
"extension session ownership did not match dispose request",
)));
}
let resources = self
.extension_sessions
.remove(&key)
.expect("extension resources existed before removal");
// NOTE(review): the entry is already removed when `vm_scope_for` (and the
// kill/dispose calls below) can fail, so an error here leaves the remaining
// resources untracked — confirm bound ownership is always VM-scoped.
let (connection_id, session_id, vm_id) = self.vm_scope_for(&ownership)?;
for process_id in resources.process_ids {
// Skip processes that are no longer active in the VM.
if self
.vms
.get(&vm_id)
.is_some_and(|vm| vm.active_processes.contains_key(&process_id))
{
self.kill_process_internal(&vm_id, &process_id, "SIGTERM")?;
}
}
let mut events = Vec::new();
for resource_vm_id in resources.vm_ids {
if self.vms.contains_key(&resource_vm_id) {
events.extend(
self.dispose_vm_internal(
&connection_id,
&session_id,
&resource_vm_id,
DisposeReason::Requested,
)
.await?,
);
}
}
Ok(events)
})
}
// Starts capturing output for `process_id` in the scoped VM by registering an
// empty buffer keyed by (vm id, process id). Requires a VM ownership scope the
// caller owns; returns `Conflict` if a buffer is already registered.
fn start_buffering_process_output<'a>(
&'a mut self,
ownership: OwnershipScope,
process_id: String,
) -> ExtensionFuture<'a, ()> {
Box::pin(async move {
let (connection_id, session_id, vm_id) = self.vm_scope_for(&ownership)?;
self.require_owned_vm(&connection_id, &session_id, &vm_id)?;
let key = (vm_id, process_id);
if self.extension_process_output_buffers.contains_key(&key) {
return Err(SidecarError::Conflict(String::from(
"extension process output buffering already started",
)));
}
self.extension_process_output_buffers
.insert(key, ExtensionBufferedProcessOutput::default());
Ok(())
})
}
// Waits (up to `timeout`) for buffered output of `process_id`, then binds the
// process to the extension session and returns the removed buffer.
//
// Each iteration pumps process events and feeds this process's envelopes to
// `capture_extension_process_output_event`. The wait ends as soon as the buffer
// holds any stdout or stderr, the timeout is zero, or the deadline has passed.
// Fails with `InvalidState` if buffering was never started for the process.
fn handoff_buffered_process_output<'a>(
&'a mut self,
ownership: OwnershipScope,
namespace: String,
ext_session_id: String,
process_id: String,
timeout: Duration,
) -> ExtensionFuture<'a, ExtensionBufferedProcessOutput> {
Box::pin(async move {
let (connection_id, session_id, vm_id) = self.vm_scope_for(&ownership)?;
self.require_owned_vm(&connection_id, &session_id, &vm_id)?;
let key = (vm_id.clone(), process_id.clone());
let deadline = Instant::now() + timeout;
loop {
self.pump_process_events(&ownership).await?;
while let Some(envelope) =
self.take_matching_process_event_envelope(&vm_id, &process_id)?
{
if self.capture_extension_process_output_event(
&vm_id,
&process_id,
&envelope.event,
) {
continue;
}
// The event was not captured: put it back for regular delivery and stop
// draining for this iteration.
self.queue_pending_process_event(envelope)?;
break;
}
let buffered = self
.extension_process_output_buffers
.get(&key)
.is_some_and(|buffer| !buffer.stdout.is_empty() || !buffer.stderr.is_empty());
if buffered || timeout.is_zero() || Instant::now() >= deadline {
break;
}
// Sleep at most 10 ms per iteration, never past the deadline.
let remaining = deadline.saturating_duration_since(Instant::now());
time::sleep(remaining.min(Duration::from_millis(10))).await;
}
// NOTE(review): the process is bound before the buffer lookup below, so a
// "buffering was not started" error still leaves the binding in place — confirm.
self.bind_extension_process_resource(
ownership,
namespace,
ext_session_id,
process_id.clone(),
)?;
self.extension_process_output_buffers
.remove(&key)
.ok_or_else(|| {
SidecarError::InvalidState(String::from(
"extension process output buffering was not started",
))
})
})
}
}
/// Turns a response payload that an extension-host `operation` did not expect
/// into an `InvalidState` error. A `Rejected` payload is reported with its code
/// and message; anything else is reported via its `Debug` rendering.
fn unexpected_extension_host_response(operation: &str, payload: ResponsePayload) -> SidecarError {
    let message = match payload {
        ResponsePayload::Rejected(response) => format!(
            "extension {operation} rejected with {}: {}",
            response.code, response.message
        ),
        other => format!("extension {operation} returned unexpected response: {other:?}"),
    };
    SidecarError::InvalidState(message)
}
/// Maps a guest path to its location under the host shadow root.
///
/// A relative `guest_path` is resolved against `process_guest_cwd` first. The
/// result is normalized with `normalize_path` and re-rooted under `shadow_root`;
/// the guest root `/` maps to `shadow_root` itself.
fn shadow_host_path_for_process(
    shadow_root: &Path,
    process_guest_cwd: &str,
    guest_path: &str,
) -> PathBuf {
    let absolute_guest_path = if !guest_path.starts_with('/') {
        let base = process_guest_cwd.trim_end_matches('/');
        normalize_path(&format!("{}/{}", base, guest_path))
    } else {
        normalize_path(guest_path)
    };
    if absolute_guest_path == "/" {
        return shadow_root.to_path_buf();
    }
    // Strip the leading slash so `join` appends instead of replacing the root.
    let relative = absolute_guest_path.trim_start_matches('/');
    shadow_root.join(relative)
}
/// Wraps a response-tracker error as `SidecarError::InvalidState`.
fn sidecar_response_tracker_error(error: SidecarResponseTrackerError) -> SidecarError {
    let message = format!("invalid sidecar response correlation state: {error}");
    SidecarError::InvalidState(message)
}
/// Converts a bridge permission decision into a `PermissionDecision`.
///
/// `Allow` maps to an allow decision. Both `Deny` and `Prompt` map to a deny
/// decision carrying the bridge's reason when present, or a verdict-specific
/// fallback reason otherwise.
fn map_bridge_permission(decision: secure_exec_bridge::PermissionDecision) -> PermissionDecision {
    let fallback_reason = match decision.verdict {
        secure_exec_bridge::PermissionVerdict::Allow => return PermissionDecision::allow(),
        secure_exec_bridge::PermissionVerdict::Deny => "denied by host",
        secure_exec_bridge::PermissionVerdict::Prompt => "permission prompt required",
    };
    let reason = decision
        .reason
        .unwrap_or_else(|| String::from(fallback_reason));
    PermissionDecision::deny(reason)
}
/// Returns the current wall-clock time as milliseconds relative to the Unix
/// epoch, rendered as a decimal string.
///
/// A host clock set before the epoch yields a negative value instead of
/// panicking, so a misconfigured clock cannot take the sidecar down from inside
/// audit logging.
fn audit_timestamp() -> String {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis().to_string(),
        // `duration()` is how far *before* the epoch the clock reads.
        Err(skew) => match skew.duration().as_millis() {
            0 => String::from("0"),
            millis => format!("-{millis}"),
        },
    }
}
/// Collects audit fields into an ordered map seeded with a `timestamp` entry
/// from `audit_timestamp`. Caller-supplied fields are inserted afterwards, so a
/// caller-provided `timestamp` key replaces the generated one.
pub(crate) fn audit_fields<I, K, V>(fields: I) -> BTreeMap<String, String>
where
    I: IntoIterator<Item = (K, V)>,
    K: Into<String>,
    V: Into<String>,
{
    let mut mapped = BTreeMap::new();
    mapped.insert(String::from("timestamp"), audit_timestamp());
    mapped.extend(
        fields
            .into_iter()
            .map(|(key, value)| -> (String, String) { (key.into(), value.into()) }),
    );
    mapped
}
/// Emits a structured event named `name` for `vm_id` through the shared bridge,
/// returning whatever result the bridge call produces.
pub(crate) fn emit_structured_event<B>(
    bridge: &SharedBridge<B>,
    vm_id: &str,
    name: &str,
    fields: BTreeMap<String, String>,
) -> Result<(), SidecarError>
where
    B: NativeSidecarBridge + Send + 'static,
    BridgeError<B>: fmt::Debug + Send + Sync + 'static,
{
    let record = StructuredEventRecord {
        vm_id: vm_id.to_owned(),
        name: name.to_owned(),
        fields,
    };
    bridge.with_mut(|bridge| bridge.emit_structured_event(record))
}
/// Best-effort variant of `emit_structured_event` for security audit records:
/// any emission error is intentionally discarded.
pub(crate) fn emit_security_audit_event<B>(
    bridge: &SharedBridge<B>,
    vm_id: &str,
    name: &str,
    fields: BTreeMap<String, String>,
) where
    B: NativeSidecarBridge + Send + 'static,
    BridgeError<B>: fmt::Debug + Send + Sync + 'static,
{
    // The result is dropped on purpose; a failed audit emission is not surfaced.
    drop(emit_structured_event(bridge, vm_id, name, fields));
}
/// Build a wire `EventFrame` carrying a `StructuredEvent` (name + string-map
/// detail) scoped to a connection. Used to forward limit-registry warnings to the
/// host as `{type:"structured", name:"limit_warning", detail}` events without a
/// protocol schema change. Emitted directly to the host (not via the polled,
/// per-session bridge queue, which is a no-op in the stdio sidecar), so a
/// process-global signal is delivered against the active connection.
pub(crate) fn structured_event_frame(
connection_id: &str,
name: &str,
detail: std::collections::HashMap<String, String>,
) -> Result<crate::wire::EventFrame, SidecarError> {
let event = EventFrame::new(
OwnershipScope::connection(connection_id),
EventPayload::Structured(crate::protocol::StructuredEvent {
name: name.to_owned(),
detail,
}),
);
crate::wire::event_frame_from_compat(event).map_err(|error| {
SidecarError::InvalidState(format!("invalid structured event frame: {error}"))
})
}
/// Logs, through the bridge, that a process event arriving during `context`
/// was ignored because the VM/process pair is no longer tracked. The outcome of
/// the log call itself is discarded.
pub(crate) fn log_stale_process_event<B>(
    bridge: &SharedBridge<B>,
    vm_id: &str,
    process_id: &str,
    context: &str,
) where
    B: NativeSidecarBridge + Send + 'static,
    BridgeError<B>: fmt::Debug + Send + Sync + 'static,
{
    let message = format!(
        "Ignoring stale process event during {context}: VM {vm_id} process {process_id} was already reaped"
    );
    let _ = bridge.emit_log(vm_id, message);
}
// filesystem_operation_label moved to crate::vm
/// Wraps any displayable root-filesystem failure as `SidecarError::InvalidState`
/// with a `root filesystem: ` prefix.
pub(crate) fn root_filesystem_error(error: impl std::fmt::Display) -> SidecarError {
    let message = format!("root filesystem: {error}");
    SidecarError::InvalidState(message)
}
/// Lexically normalizes `path` into an absolute, `/`-separated string.
///
/// `.` components are dropped, `..` removes the previous segment (and is a
/// no-op at the root), and a root component resets the accumulated segments.
/// The result always starts with `/`; an empty or relative input is treated as
/// relative to `/`. No filesystem access is performed, so symlinks are not
/// resolved.
pub(crate) fn normalize_path(path: &str) -> String {
    // Segments borrow from `path` where possible instead of allocating a
    // `String` per component.
    let mut segments = Vec::new();
    for component in Path::new(path).components() {
        match component {
            Component::RootDir => segments.clear(),
            Component::ParentDir => {
                segments.pop();
            }
            Component::CurDir => {}
            Component::Normal(value) => segments.push(value.to_string_lossy()),
            Component::Prefix(prefix) => segments.push(prefix.as_os_str().to_string_lossy()),
        }
    }
    // The leading slash guarantees a non-empty result; with no segments this is "/".
    format!("/{}", segments.join("/"))
}
/// Lexically normalizes a host path without touching the filesystem.
///
/// `.` components are dropped and `..` removes the previously accumulated
/// component (a no-op at `/` or when nothing has been accumulated). A path that
/// collapses to nothing becomes `/` when the input was absolute and `.`
/// otherwise.
pub(crate) fn normalize_host_path(path: &Path) -> PathBuf {
    let mut cleaned = PathBuf::new();
    for part in path.components() {
        match part {
            Component::CurDir => {}
            // Never climb above the filesystem root.
            Component::ParentDir if cleaned == Path::new("/") => {}
            Component::ParentDir => {
                cleaned.pop();
            }
            Component::RootDir => cleaned.push(Path::new("/")),
            Component::Prefix(prefix) => cleaned.push(prefix.as_os_str()),
            Component::Normal(name) => cleaned.push(name),
        }
    }
    if !cleaned.as_os_str().is_empty() {
        return cleaned;
    }
    PathBuf::from(if path.is_absolute() { "/" } else { "." })
}
/// Reports whether `path` is `root` itself or lies beneath it.
///
/// The check is purely lexical and component-wise, so `/ab` is not considered
/// to be inside `/a`.
pub(crate) fn path_is_within_root(path: &Path, root: &Path) -> bool {
    // `starts_with` compares whole components and already holds for an exact match.
    path.starts_with(root)
}
/// Returns the parent directory of `path` after normalizing it with
/// `normalize_path`. When the normalized path has no parent, or the parent
/// renders as an empty string, `/` is returned.
pub(crate) fn dirname(path: &str) -> String {
    let absolute = normalize_path(path);
    match Path::new(&absolute).parent() {
        Some(parent) if !parent.as_os_str().is_empty() => parent.to_string_lossy().into_owned(),
        _ => String::from("/"),
    }
}
/// Converts a kernel error into `SidecarError::Kernel`, keeping its display text.
pub(crate) fn kernel_error(error: KernelError) -> SidecarError {
    let message = error.to_string();
    SidecarError::Kernel(message)
}
/// Converts a plugin error into `SidecarError::Plugin`, keeping its display text.
pub(crate) fn plugin_error(error: PluginError) -> SidecarError {
    let message = error.to_string();
    SidecarError::Plugin(message)
}
/// Converts a JavaScript execution error into `SidecarError::Execution`,
/// keeping its display text.
pub(crate) fn javascript_error(error: JavascriptExecutionError) -> SidecarError {
    let message = error.to_string();
    SidecarError::Execution(message)
}
/// Converts a WebAssembly execution error into `SidecarError::Execution`,
/// keeping its display text.
pub(crate) fn wasm_error(error: WasmExecutionError) -> SidecarError {
    let message = error.to_string();
    SidecarError::Execution(message)
}
/// Converts a Python execution error into `SidecarError::Execution`, keeping
/// its display text.
pub(crate) fn python_error(error: PythonExecutionError) -> SidecarError {
    let message = error.to_string();
    SidecarError::Execution(message)
}
/// Converts a VFS error into `SidecarError::Kernel`, keeping its display text.
pub(crate) fn vfs_error(error: VfsError) -> SidecarError {
    let message = error.to_string();
    SidecarError::Kernel(message)
}
/// Actionable guidance shown when guest package resolution fails because the
/// packages live in a non-flat `node_modules` whose package store is not
/// visible in the VM.
///
/// Mounting host `node_modules` is a bind mount, so symlinked/store layouts do
/// not resolve inside the VM: Node canonicalizes a module to its store realpath
/// (e.g. `node_modules/.pnpm/...`, `.bun/...`, `.store/...`), which lives above
/// the mounted directory where the guest `fs` cannot read. Plug'n'Play
/// (yarn-berry default) has no `node_modules` at all. A flat (hoisted) layout is
/// required. The empirically-supported package managers are captured in
/// `crates/sidecar/tests/module_layout_e2e.rs`.
#[allow(dead_code)]
const HOISTED_NODE_MODULES_GUIDANCE: &str = "secure-exec can't load mounted node_modules: the directory uses a non-flat layout (pnpm / bun / yarn workspaces store, or yarn Plug'n'Play) whose package store isn't visible inside the VM. A flat (hoisted) node_modules is required.\n - pnpm -> add `node-linker=hoisted` to .npmrc, then reinstall\n - yarn berry -> set `nodeLinker: node-modules` in .yarnrc.yml (not pnp/pnpm)\n - bun -> install dependencies outside a workspace (workspaces use a .bun store)\n - npm / yarn classic -> already flat, no change needed";
/// Inspects an adapter's captured stderr for a non-flat-`node_modules` failure
/// signature and returns the guidance to fold into the surfaced error, or
/// `None` when the failure looks unrelated.
///
/// Two signatures are recognized, both kept narrow so unrelated crashes do not
/// trigger the hint:
/// - yarn Plug'n'Play's "isn't declared in your dependencies" resolver error,
///   which is distinctive enough to match on its own;
/// - a missing-file / cannot-resolve error that also mentions a package store
///   path (`.pnpm`, `.bun`, `.store`, PnP `__virtual__`) or a PnP runtime path
///   (`.pnp.cjs`, `.pnp.loader.mjs`, the `.yarn` zip cache).
#[allow(dead_code)]
fn symlinked_node_modules_hint(stderr: &str) -> Option<&'static str> {
    /// True when `haystack` contains at least one of `needles`.
    fn contains_any(haystack: &str, needles: &[&str]) -> bool {
        needles.iter().any(|needle| haystack.contains(needle))
    }

    // PnP's resolver error; sufficient without any missing-file wording.
    const PNP_RESOLVER_ERRORS: &[&str] = &["isn't declared in your dependencies"];
    // Wording that indicates a file or module could not be found.
    const MISSING_FILE_SIGNALS: &[&str] = &[
        "ENOENT",
        "no such file or directory",
        "Cannot find module",
        "MODULE_NOT_FOUND",
    ];
    // Store directories that only show up in a path under a non-flat layout,
    // followed by the Plug'n'Play runtime / zip-cache paths.
    const NON_FLAT_PATH_MARKERS: &[&str] = &[
        "node_modules/.pnpm/",
        "node_modules/.bun/",
        "node_modules/.store/",
        "/__virtual__/",
        ".pnp.cjs",
        ".pnp.loader.mjs",
        "/.yarn/cache/",
    ];

    if contains_any(stderr, PNP_RESOLVER_ERRORS) {
        return Some(HOISTED_NODE_MODULES_GUIDANCE);
    }
    let store_file_missing = contains_any(stderr, MISSING_FILE_SIGNALS)
        && contains_any(stderr, NON_FLAT_PATH_MARKERS);
    store_file_missing.then_some(HOISTED_NODE_MODULES_GUIDANCE)
}
// Unit tests for `symlinked_node_modules_hint`: every recognized store / PnP
// signature must produce the guidance, and look-alike failures must not.
#[cfg(test)]
mod symlinked_node_modules_hint_tests {
use super::symlinked_node_modules_hint;
// Positive cases: each non-flat package manager's store/PnP signature.
#[test]
fn matches_pnpm_store_enoent() {
// Real pi-coding-agent failure: getPackageDir() falls back to a
// dist/package.json inside the unreachable .pnpm store.
let stderr = "Error: ENOENT: no such file or directory, open '/root/node_modules/.pnpm/@mariozechner+pi-coding-agent@0.60.0_x/node_modules/@mariozechner/pi-coding-agent/dist/package.json'";
let hint = symlinked_node_modules_hint(stderr).expect("expected hoisted guidance");
assert!(hint.contains("secure-exec can't load mounted node_modules"));
// The guidance text must not mention "agentos".
assert!(!hint.contains("agentos"));
}
#[test]
fn matches_bun_store_enoent() {
let stderr = "Error: ENOENT: no such file or directory, open '/root/node_modules/.bun/is-odd@3.0.1/node_modules/is-odd/package.json'";
assert!(symlinked_node_modules_hint(stderr).is_some());
}
#[test]
fn matches_yarn_pnpm_store_enoent() {
let stderr = "Error: ENOENT: no such file or directory, open '/root/node_modules/.store/is-odd-npm-3.0.1-93c3c3f41b/package/package.json'";
assert!(symlinked_node_modules_hint(stderr).is_some());
}
#[test]
fn matches_pnp_declared_error() {
// Yarn PnP's distinctive resolver error (no node_modules at all).
let stderr = "Error: Your application tried to access is-number, but it isn't declared in your dependencies; this makes the require call ambiguous and unsound.";
assert!(symlinked_node_modules_hint(stderr).is_some());
}
#[test]
fn matches_pnp_cjs_module_not_found() {
// A module-not-found error whose stack mentions the `.pnp.cjs` runtime.
let stderr = "Error: Cannot find module 'is-odd'\n at /root/.pnp.cjs:12345:18\n code: 'MODULE_NOT_FOUND'";
assert!(symlinked_node_modules_hint(stderr).is_some());
}
#[test]
fn matches_virtual_instance() {
// A missing file under a PnP `__virtual__` instance directory.
let stderr = "Error: ENOENT: no such file or directory, open '/root/.yarn/__virtual__/is-odd-abc/1/node_modules/is-odd/package.json'";
assert!(symlinked_node_modules_hint(stderr).is_some());
}
// Negative cases: must not fire.
#[test]
fn ignores_enoent_outside_a_store() {
let stderr = "Error: ENOENT: no such file or directory, open '/tmp/scratch/config.json'";
assert!(symlinked_node_modules_hint(stderr).is_none());
}
#[test]
fn ignores_store_path_without_missing_file() {
// A store path alone, with no missing-file wording, is not a failure signature.
let stderr =
"loaded /root/node_modules/.pnpm/some-pkg@1.0.0/node_modules/some-pkg/index.js";
assert!(symlinked_node_modules_hint(stderr).is_none());
}
#[test]
fn ignores_flat_node_modules_enoent() {
// npm / yarn-nm / pnpm-hoisted: flat, no store dir in the path.
let stderr = "Error: ENOENT: no such file or directory, open '/root/node_modules/is-odd/missing-asset.json'";
assert!(symlinked_node_modules_hint(stderr).is_none());
}
#[test]
fn ignores_unrelated_failure() {
let stderr = "Error: connect ECONNREFUSED 127.0.0.1:443";
assert!(symlinked_node_modules_hint(stderr).is_none());
}
}
// Unit test for `structured_event_frame`: the wire frame it builds must convert
// back to a compat frame with the same event name, detail entries and
// connection-level ownership.
#[cfg(test)]
mod structured_event_frame_tests {
use super::*;
#[test]
fn structured_event_frame_round_trips_limit_warning() {
let mut detail = std::collections::HashMap::new();
// Pin a real emitted limit name rather than a fictional string.
let limit_name = TrackedLimit::JavascriptEventChannel.as_str();
detail.insert(String::from("limit"), String::from(limit_name));
detail.insert(String::from("fillPercent"), String::from("82"));
let wire = structured_event_frame("conn-1", "limit_warning", detail)
.expect("build structured event frame");
// Convert back to the compat representation to inspect what was encoded.
let compat = crate::wire::event_frame_to_compat(wire).expect("convert to compat");
match compat.payload {
EventPayload::Structured(event) => {
assert_eq!(event.name, "limit_warning");
assert_eq!(
event.detail.get("limit").map(String::as_str),
Some(limit_name)
);
assert_eq!(
event.detail.get("fillPercent").map(String::as_str),
Some("82")
);
}
other => panic!("expected structured payload, got {other:?}"),
}
// The frame must be scoped to the connection it was built for.
match compat.ownership {
OwnershipScope::ConnectionOwnership(inner) => {
assert_eq!(inner.connection_id, "conn-1");
}
other => panic!("expected connection ownership, got {other:?}"),
}
}
}