use std::collections::{BTreeMap, VecDeque};
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use scc::{HashMap as SccHashMap, HashSet as SccHashSet};
use tokio::sync::{broadcast, oneshot, watch};
use tokio::task::JoinHandle;
use agent_os_sidecar::protocol::{
ConfigureVmRequest, CreateVmRequest, DisposeReason, DisposeVmRequest, EventPayload,
GuestRuntimeKind, MountDescriptor, MountPluginDescriptor, OpenSessionRequest, OwnershipScope,
PermissionsPolicy, RegisterToolkitRequest, RegisteredToolDefinition, RequestPayload,
ResponsePayload, RootFilesystemDescriptor, SidecarPlacement, SidecarRequestPayload,
SidecarResponsePayload, SoftwareDescriptor, ToolInvocationRequest, ToolInvocationResultResponse,
VmLifecycleState,
};
use crate::config::{AgentOsConfig, HostTool, SoftwareKind, TimerScheduleDriver};
use crate::cron::CronManager;
use crate::error::ClientError;
use crate::json_rpc::SequencedEvent;
use crate::process::SYNTHETIC_PID_BASE;
use crate::session::{
AgentCapabilities, AgentInfo, PermissionReply, PermissionRequest, SessionConfigOption,
SessionModeState,
};
use crate::sidecar::{AgentOsSidecar, AgentOsSidecarPlacement, AgentOsSidecarVmLease};
use crate::transport::{SidecarCallback, SidecarTransport};
use once_cell::sync::OnceCell;
/// Bookkeeping record for one tracked process.
///
/// Instances are stored in `AgentOsInner::processes`, keyed by a `u32`.
pub(crate) struct ProcessEntry {
/// Command string recorded for the process.
pub command: String,
/// Arguments recorded alongside `command`.
pub args: Vec<String>,
/// Broadcast sender of raw byte chunks; presumably the process's stdout — confirm at the send site.
pub stdout_tx: broadcast::Sender<Vec<u8>>,
/// Broadcast sender of raw byte chunks; presumably the process's stderr — confirm at the send site.
pub stderr_tx: broadcast::Sender<Vec<u8>>,
/// Watch channel holding an `Option<i32>`; presumably `None` until exit, then the exit code — TODO confirm.
pub exit_tx: watch::Sender<Option<i32>>,
/// String identifier of the process, separate from the numeric map key.
pub process_id: String,
/// Watch channel holding an `Option<u32>`; presumably the kernel-assigned pid once known — TODO confirm.
pub kernel_pid: watch::Sender<Option<u32>>,
}
/// Bookkeeping record for one tracked shell.
///
/// Instances are stored in `AgentOsInner::shells`, keyed by a `String`.
pub(crate) struct ShellEntry {
/// Numeric pid associated with the shell.
pub pid: u32,
/// Broadcast sender of raw byte chunks; presumably the shell's primary output stream — confirm at the send site.
pub data_tx: broadcast::Sender<Vec<u8>>,
/// Broadcast sender of raw byte chunks; presumably the shell's stderr — confirm at the send site.
pub stderr_tx: broadcast::Sender<Vec<u8>>,
/// String identifier of the underlying process.
pub process_id: String,
/// Watch channel holding a `bool`; presumably flips to `true` once the shell has been spawned — TODO confirm.
pub spawned_tx: watch::Sender<bool>,
}
/// State kept for one agent session.
///
/// Instances are stored in `AgentOsInner::sessions`, keyed by a `String`. Mutable
/// pieces are wrapped in `parking_lot::Mutex` or concurrent maps, so the entry can be
/// updated through a shared reference.
pub(crate) struct SessionEntry {
/// Agent type name for this session.
pub agent_type: String,
/// Mode state for the session, if any has been recorded.
pub modes: parking_lot::Mutex<Option<SessionModeState>>,
/// Configuration options recorded for the session.
pub config_options: parking_lot::Mutex<Vec<SessionConfigOption>>,
/// Agent capabilities, if any have been recorded.
pub capabilities: parking_lot::Mutex<Option<AgentCapabilities>>,
/// Agent info, if any has been recorded.
pub agent_info: parking_lot::Mutex<Option<AgentInfo>>,
/// String-to-string configuration overrides, kept in key order.
pub config_overrides: parking_lot::Mutex<std::collections::BTreeMap<String, String>>,
/// Queue of sequenced events; presumably a bounded replay buffer — the bound is not visible here.
pub event_ring: parking_lot::Mutex<VecDeque<SequencedEvent>>,
/// Atomic `i64`; by its name, the highest event sequence number seen so far — confirm at the update site.
pub highest_sequence_number: AtomicI64,
/// Broadcast sender for sequenced events.
pub event_tx: broadcast::Sender<SequencedEvent>,
/// Broadcast sender for permission requests.
pub permission_tx: broadcast::Sender<PermissionRequest>,
/// One-shot reply senders for outstanding permission requests, keyed by a `String` id.
pub pending_permission_replies: SccHashMap<String, oneshot::Sender<PermissionReply>>,
/// One-shot senders awaiting a JSON-RPC response, keyed by an `i64` (presumably the request id).
pub pending_prompt_resolvers: SccHashMap<i64, oneshot::Sender<crate::json_rpc::JsonRpcResponse>>,
}
/// Handle to one VM running on a sidecar.
///
/// The handle wraps an `Arc`, so cloning it is cheap and every clone refers to the
/// same underlying `AgentOsInner` state.
#[derive(Clone)]
pub struct AgentOs {
/// Shared state behind the handle.
inner: Arc<AgentOsInner>,
}
/// Shared state behind an `AgentOs` handle; built once by `AgentOs::create`.
pub(crate) struct AgentOsInner {
/// Transport returned by the sidecar's `ensure_connection()`; requests are sent through it.
pub(crate) transport: Arc<SidecarTransport>,
/// Connection id returned together with the transport.
pub(crate) connection_id: String,
/// Id of the session opened during `create` (taken from the `SessionOpened` response).
pub(crate) session_id: String,
/// Id of the VM created during `create` (taken from the `VmCreated` response).
pub(crate) vm_id: String,
/// Initialised to 1 in `create`; presumably the next outgoing request id — confirm at use sites.
pub(crate) request_counter: AtomicI64,
/// Initialised to -1 in `create`; NOTE(review): the negative start suggests a separate id space — confirm.
pub(crate) sidecar_request_counter: AtomicI64,
/// Initialised from the frame-size value returned by `ensure_connection()`.
pub(crate) max_frame_bytes: AtomicUsize,
/// Tracked processes keyed by a `u32` (presumably the pid).
pub(crate) processes: SccHashMap<u32, ProcessEntry>,
/// Initialised to 1 in `create`.
pub(crate) process_counter: AtomicU64,
/// Initialised to `SYNTHETIC_PID_BASE` in `create`.
pub(crate) synthetic_pid_counter: AtomicU64,
/// `f64` values keyed by a `String`; key meaning and time unit are not visible here.
pub(crate) observed_process_start_times: SccHashMap<String, f64>,
/// `f64` values keyed by a `String`; key meaning and time unit are not visible here.
pub(crate) observed_process_exit_times: SccHashMap<String, f64>,
/// Tracked shells keyed by a `String` id.
pub(crate) shells: SccHashMap<String, ShellEntry>,
/// Initialised to 0 in `create`.
pub(crate) shell_counter: AtomicU64,
/// Join handles that `shutdown` drains and awaits, bounded by `SHELL_DISPOSE_TIMEOUT_MS`.
pub(crate) pending_shell_exits: SccHashMap<u64, JoinHandle<()>>,
/// Set of `u32` pids; by its name, pids that belong to ACP terminals — confirm at use sites.
pub(crate) acp_terminal_pids: SccHashSet<u32>,
/// Agent sessions keyed by a `String` id.
pub(crate) sessions: SccHashMap<String, SessionEntry>,
/// Queue of session ids; by its name, sessions that have already been closed.
pub(crate) closed_session_ids: parking_lot::Mutex<VecDeque<String>>,
/// Set of session ids; by its name, sessions currently being closed.
pub(crate) closing_session_ids: SccHashSet<String>,
/// Cron manager built in `create` from the configured schedule driver, or a
/// `TimerScheduleDriver` when none is configured; `shutdown` calls its `dispose()`.
pub(crate) cron: Arc<CronManager>,
/// The configuration passed to `create`.
pub(crate) config: Arc<AgentOsConfig>,
/// The sidecar hosting this VM.
pub(crate) sidecar: Arc<AgentOsSidecar>,
/// `Some` from `create` until `shutdown` takes the lease out and disposes it.
pub(crate) sidecar_lease: parking_lot::Mutex<Option<AgentOsSidecarVmLease>>,
/// Mounted filesystems keyed by a `String` (presumably the mount path).
pub(crate) in_process_mounts: SccHashMap<String, crate::fs::MountedFs>,
/// Starts `false`; the first `shutdown` call swaps it to `true`, making later calls no-ops.
pub(crate) disposed: AtomicBool,
}
impl AgentOs {
pub async fn create(options: AgentOsConfig) -> Result<AgentOs, ClientError> {
let config = Arc::new(options);
let sidecar = match &config.sidecar {
Some(crate::config::AgentOsSidecarConfig::Explicit { handle }) => handle.clone(),
Some(crate::config::AgentOsSidecarConfig::Shared { pool }) => {
AgentOs::get_shared_sidecar(pool.clone()).await?
}
None => AgentOs::get_shared_sidecar(None).await?,
};
let (transport, connection_id, max_frame_bytes) = sidecar.ensure_connection().await?;
let session = match transport
.request(
OwnershipScope::connection(&connection_id),
RequestPayload::OpenSession(OpenSessionRequest {
placement: sidecar_wire_placement(&sidecar),
metadata: BTreeMap::new(),
}),
)
.await?
{
ResponsePayload::SessionOpened(opened) => opened,
ResponsePayload::Rejected(rejected) => return Err(rejected_to_error(rejected)),
_ => return Err(ClientError::Sidecar("unexpected open_session response".to_string())),
};
let session_id = session.session_id;
let mut events = transport.subscribe_events();
let vm = match transport
.request(
OwnershipScope::session(&connection_id, &session_id),
RequestPayload::CreateVm(CreateVmRequest {
runtime: GuestRuntimeKind::JavaScript,
metadata: BTreeMap::new(),
root_filesystem: RootFilesystemDescriptor::default(),
permissions: Some(PermissionsPolicy::allow_all()),
}),
)
.await?
{
ResponsePayload::VmCreated(created) => created,
ResponsePayload::Rejected(rejected) => return Err(rejected_to_error(rejected)),
_ => return Err(ClientError::Sidecar("unexpected create_vm response".to_string())),
};
let vm_id = vm.vm_id;
wait_for_vm_ready(&mut events, &vm_id, crate::VM_READY_TIMEOUT_MS).await?;
let resolved_software = resolve_software(&config)?;
let command_mounts = build_command_mounts(&resolved_software);
let software: Vec<SoftwareDescriptor> = resolved_software
.into_iter()
.map(|entry| entry.descriptor)
.collect();
match transport
.request(
OwnershipScope::vm(&connection_id, &session_id, &vm_id),
RequestPayload::ConfigureVm(ConfigureVmRequest {
mounts: command_mounts,
software,
permissions: Some(PermissionsPolicy::allow_all()),
module_access_cwd: config.module_access_cwd.clone(),
instructions: config
.additional_instructions
.clone()
.into_iter()
.collect(),
projected_modules: Vec::new(),
command_permissions: BTreeMap::new(),
allowed_node_builtins: config.allowed_node_builtins.clone().unwrap_or_default(),
loopback_exempt_ports: config.loopback_exempt_ports.clone(),
}),
)
.await?
{
ResponsePayload::VmConfigured(_) => {}
ResponsePayload::Rejected(rejected) => return Err(rejected_to_error(rejected)),
_ => return Err(ClientError::Sidecar("unexpected configure_vm response".to_string())),
}
if !config.tool_kits.is_empty() {
let mut tool_map: std::collections::HashMap<String, HostTool> =
std::collections::HashMap::new();
for kit in &config.tool_kits {
let mut tools = BTreeMap::new();
for tool in &kit.tools {
tools.insert(
tool.name.clone(),
RegisteredToolDefinition {
description: tool.description.clone(),
input_schema: tool.input_schema.clone(),
timeout_ms: tool.timeout_ms,
examples: Vec::new(),
},
);
tool_map.insert(format!("{}:{}", kit.name, tool.name), tool.clone());
}
match transport
.request(
OwnershipScope::vm(&connection_id, &session_id, &vm_id),
RequestPayload::RegisterToolkit(RegisterToolkitRequest {
name: kit.name.clone(),
description: kit.description.clone(),
tools,
}),
)
.await?
{
ResponsePayload::ToolkitRegistered(_) => {}
ResponsePayload::Rejected(rejected) => return Err(rejected_to_error(rejected)),
_ => {
return Err(ClientError::Sidecar(
"unexpected register_toolkit response".to_string(),
))
}
}
}
let _ = vm_tools().insert(vm_id.clone(), Arc::new(tool_map));
transport.register_callback("tool_invocation", tool_invocation_callback());
}
sidecar.active_vm_count.fetch_add(1, Ordering::SeqCst);
let lease = AgentOsSidecarVmLease {
vm_id: vm_id.clone(),
sidecar: sidecar.clone(),
};
let driver = config
.schedule_driver
.clone()
.unwrap_or_else(|| Arc::new(TimerScheduleDriver::new()));
let cron = Arc::new(CronManager::new(driver));
let inner = AgentOsInner {
transport,
connection_id,
session_id,
vm_id,
request_counter: AtomicI64::new(1),
sidecar_request_counter: AtomicI64::new(-1),
max_frame_bytes: AtomicUsize::new(max_frame_bytes),
processes: SccHashMap::new(),
process_counter: AtomicU64::new(1),
synthetic_pid_counter: AtomicU64::new(SYNTHETIC_PID_BASE),
observed_process_start_times: SccHashMap::new(),
observed_process_exit_times: SccHashMap::new(),
shells: SccHashMap::new(),
shell_counter: AtomicU64::new(0),
pending_shell_exits: SccHashMap::new(),
acp_terminal_pids: SccHashSet::new(),
sessions: SccHashMap::new(),
closed_session_ids: parking_lot::Mutex::new(VecDeque::new()),
closing_session_ids: SccHashSet::new(),
cron,
config,
sidecar,
sidecar_lease: parking_lot::Mutex::new(Some(lease)),
in_process_mounts: SccHashMap::new(),
disposed: AtomicBool::new(false),
};
Ok(AgentOs {
inner: Arc::new(inner),
})
}
/// Tears the VM down and releases its sidecar lease.
///
/// Only the first call does any work; it flips `disposed`, and every later call
/// returns `Ok(())` immediately. Because of that, each teardown step below runs even
/// when an earlier one fails — a skipped step could never be retried.
///
/// Steps, in order: dispose the cron manager, wait (bounded by
/// `crate::SHELL_DISPOSE_TIMEOUT_MS`) for pending shell-exit tasks, send a best-effort
/// `DisposeVm`, drop this VM's entry from the tool table, dispose the lease, and — if
/// the sidecar reports no active VMs — kill its connection and dispose it.
///
/// # Errors
///
/// Returns the error from disposing the lease, if any. Failures of the `DisposeVm`
/// request and of the sidecar's own disposal are deliberately ignored.
pub async fn shutdown(&self) -> Result<(), ClientError> {
    if self.inner.disposed.swap(true, Ordering::SeqCst) {
        return Ok(());
    }
    self.inner.cron.dispose();
    let mut exit_tasks = Vec::new();
    self.inner.pending_shell_exits.retain(|_, task| {
        // `retain` only lends `&mut JoinHandle`, so the real handle is swapped out for
        // a throwaway no-op task; returning `false` then removes the entry.
        exit_tasks.push(std::mem::replace(task, tokio::spawn(async {})));
        false
    });
    if !exit_tasks.is_empty() {
        let drain = async {
            for task in exit_tasks {
                let _ = task.await;
            }
        };
        // Bounded wait: on timeout the remaining handles are dropped, not awaited.
        let _ = tokio::time::timeout(
            Duration::from_millis(crate::SHELL_DISPOSE_TIMEOUT_MS),
            drain,
        )
        .await;
    }
    // The mutex guard is a temporary of this statement, so it is not held across the
    // awaits below.
    let lease = self.inner.sidecar_lease.lock().take();
    let _ = self
        .transport()
        .request(
            OwnershipScope::vm(
                &self.inner.connection_id,
                &self.inner.session_id,
                &self.inner.vm_id,
            ),
            RequestPayload::DisposeVm(DisposeVmRequest {
                reason: DisposeReason::Requested,
            }),
        )
        .await;
    let _ = vm_tools().remove(&self.inner.vm_id);
    let sidecar = self.inner.sidecar.clone();
    // Keep the lease error instead of returning it right away: returning here would
    // skip the sidecar teardown below for good.
    let lease_outcome = match lease {
        Some(lease) => lease.dispose().await.map(drop),
        None => Ok(()),
    };
    if sidecar.active_vm_count.load(Ordering::SeqCst) == 0 {
        sidecar.kill_connection().await;
        let _ = sidecar.dispose().await;
    }
    lease_outcome?;
    Ok(())
}
/// Borrows the shared state behind this handle.
pub(crate) fn inner(&self) -> &AgentOsInner {
    &*self.inner
}
pub(crate) fn transport(&self) -> &Arc<SidecarTransport> {
&self.inner.transport
}
/// Returns the id of the sidecar connection this VM lives on.
pub(crate) fn connection_id(&self) -> &str {
    self.inner.connection_id.as_str()
}
/// Returns the id of the sidecar session opened for this VM.
pub(crate) fn wire_session_id(&self) -> &str {
    self.inner.session_id.as_str()
}
/// Returns the id of the VM behind this handle.
pub(crate) fn vm_id(&self) -> &str {
    self.inner.vm_id.as_str()
}
pub(crate) fn config(&self) -> &Arc<AgentOsConfig> {
&self.inner.config
}
pub(crate) fn cron(&self) -> &Arc<CronManager> {
&self.inner.cron
}
/// Returns a new shared reference to the sidecar hosting this VM.
pub fn sidecar(&self) -> Arc<AgentOsSidecar> {
    Arc::clone(&self.inner.sidecar)
}
}
/// Translates the sidecar's client-side placement into the wire-protocol
/// `SidecarPlacement`, cloning the payload of whichever variant is set.
fn sidecar_wire_placement(sidecar: &AgentOsSidecar) -> SidecarPlacement {
    let placement = &sidecar.placement;
    match placement {
        AgentOsSidecarPlacement::Explicit { sidecar_id } => {
            let sidecar_id = sidecar_id.clone();
            SidecarPlacement::Explicit { sidecar_id }
        }
        AgentOsSidecarPlacement::Shared { pool } => {
            let pool = pool.clone();
            SidecarPlacement::Shared { pool }
        }
    }
}
async fn wait_for_vm_ready(
events: &mut broadcast::Receiver<(OwnershipScope, EventPayload)>,
vm_id: &str,
timeout_ms: u64,
) -> Result<(), ClientError> {
let wait = async {
loop {
match events.recv().await {
Ok((ownership, payload)) => match payload {
EventPayload::VmLifecycle(event) => {
if matches!(event.state, VmLifecycleState::Ready)
&& ownership_vm_id(&ownership) == Some(vm_id)
{
return Ok(());
}
}
EventPayload::ProcessOutput(_)
| EventPayload::ProcessExited(_)
| EventPayload::Structured(_) => {}
},
Err(broadcast::error::RecvError::Lagged(_)) => {}
Err(broadcast::error::RecvError::Closed) => {
return Err(ClientError::Sidecar(
"sidecar transport closed before the VM became ready".to_string(),
));
}
}
}
};
tokio::time::timeout(Duration::from_millis(timeout_ms), wait)
.await
.map_err(|_| {
ClientError::Sidecar("timed out waiting for the VM to become ready".to_string())
})?
}
/// Process-wide table of host tools, keyed by VM id.
///
/// Each value maps a tool key to its `HostTool`. The cell is filled lazily by
/// `vm_tools()`.
static VM_TOOLS: OnceCell<SccHashMap<String, Arc<std::collections::HashMap<String, HostTool>>>> =
OnceCell::new();
/// Returns the process-wide host-tool table, creating an empty one on first use.
fn vm_tools() -> &'static SccHashMap<String, Arc<std::collections::HashMap<String, HostTool>>> {
VM_TOOLS.get_or_init(SccHashMap::new)
}
/// Builds the sidecar callback that serves tool invocations.
///
/// The callback always answers with a `ToolInvocationResult`. A tool-invocation
/// payload is forwarded to `run_tool_invocation`; any other payload gets an error
/// result carrying the placeholder invocation id `"unknown"`.
fn tool_invocation_callback() -> SidecarCallback {
    Arc::new(|payload, ownership| {
        Box::pin(async move {
            let response = if let SidecarRequestPayload::ToolInvocation(request) = payload {
                run_tool_invocation(&ownership, request).await
            } else {
                ToolInvocationResultResponse {
                    invocation_id: "unknown".to_string(),
                    result: None,
                    error: Some(
                        "tool-invocation callback received a non-tool request".to_string(),
                    ),
                }
            };
            Ok(SidecarResponsePayload::ToolInvocationResult(response))
        })
    })
}
async fn run_tool_invocation(
ownership: &OwnershipScope,
request: ToolInvocationRequest,
) -> ToolInvocationResultResponse {
let vm_id = ownership_vm_id(ownership).unwrap_or("");
let tool = vm_tools()
.read(vm_id, |_, map| map.clone())
.and_then(|map| map.get(&request.tool_key).cloned());
let Some(tool) = tool else {
return ToolInvocationResultResponse {
invocation_id: request.invocation_id,
result: None,
error: Some(format!("Unknown tool \"{}\"", request.tool_key)),
};
};
let timeout = Duration::from_millis(request.timeout_ms.max(1));
match tokio::time::timeout(timeout, (tool.execute)(request.input)).await {
Ok(Ok(value)) => ToolInvocationResultResponse {
invocation_id: request.invocation_id,
result: Some(value),
error: None,
},
Ok(Err(error)) => ToolInvocationResultResponse {
invocation_id: request.invocation_id,
result: None,
error: Some(error),
},
Err(_) => ToolInvocationResultResponse {
invocation_id: request.invocation_id,
result: None,
error: Some(format!(
"Tool \"{}\" timed out after {}ms",
request.tool_key, request.timeout_ms
)),
},
}
}
/// A configured software package after its on-disk root has been located.
struct ResolvedSoftware {
/// Package name and resolved root path, in the form sent to the sidecar.
descriptor: SoftwareDescriptor,
/// Kind of the package; `build_command_mounts` mounts only `WasmCommands`.
kind: SoftwareKind,
}
/// Locates every configured software package under
/// `<module_access_cwd>/node_modules/<package>`, using `"."` when no
/// `module_access_cwd` is configured.
///
/// Packages are resolved in configuration order; an empty list yields an empty
/// result.
///
/// # Errors
///
/// Returns `ClientError::Sidecar` naming the first package whose directory does not
/// exist.
fn resolve_software(config: &AgentOsConfig) -> Result<Vec<ResolvedSoftware>, ClientError> {
    let base_dir = config.module_access_cwd.as_deref().unwrap_or(".");
    let modules_dir = std::path::Path::new(base_dir).join("node_modules");
    config
        .software
        .iter()
        .map(|input| {
            let root = modules_dir.join(&input.package);
            if root.exists() {
                Ok(ResolvedSoftware {
                    descriptor: SoftwareDescriptor {
                        package_name: input.package.clone(),
                        root: root.to_string_lossy().into_owned(),
                    },
                    kind: input.kind,
                })
            } else {
                Err(ClientError::Sidecar(format!(
                    "software package not found: {} (looked in {})",
                    input.package,
                    root.display()
                )))
            }
        })
        // Collecting into `Result` stops at the first missing package.
        .collect()
}
/// Builds one read-only `host_dir` mount per `WasmCommands` package.
///
/// Mounts are placed at `/__agentos/commands/NNN`, where `NNN` is the zero-padded
/// position of the package among the `WasmCommands` entries only. Other kinds
/// produce no mount and do not consume an index.
fn build_command_mounts(resolved: &[ResolvedSoftware]) -> Vec<MountDescriptor> {
    resolved
        .iter()
        // Exhaustive on purpose: a new `SoftwareKind` must be classified here.
        .filter(|entry| match entry.kind {
            SoftwareKind::WasmCommands => true,
            SoftwareKind::Agent | SoftwareKind::Tool => false,
        })
        .enumerate()
        .map(|(index, entry)| {
            let plugin = MountPluginDescriptor {
                id: "host_dir".to_owned(),
                config: serde_json::json!({
                    "hostPath": entry.descriptor.root,
                    "readOnly": true,
                }),
            };
            MountDescriptor {
                guest_path: format!("/__agentos/commands/{index:03}"),
                read_only: true,
                plugin,
            }
        })
        .collect()
}
/// Returns the VM id carried by a VM-level ownership scope, or `None` for
/// connection- and session-level scopes.
fn ownership_vm_id(ownership: &OwnershipScope) -> Option<&str> {
    let vm_id: &str = match ownership {
        OwnershipScope::Vm { vm_id, .. } => vm_id,
        OwnershipScope::Connection { .. } | OwnershipScope::Session { .. } => return None,
    };
    Some(vm_id)
}
/// Converts a sidecar rejection into `ClientError::Kernel`, carrying over its code
/// and message unchanged.
fn rejected_to_error(rejected: agent_os_sidecar::protocol::RejectedResponse) -> ClientError {
    let code = rejected.code;
    let message = rejected.message;
    ClientError::Kernel { code, message }
}