use std::collections::{BTreeMap, VecDeque};
use std::pin::Pin;
use std::sync::atomic::Ordering;
use anyhow::Result;
use futures::Stream;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use agent_os_sidecar::protocol::{
CloseAgentSessionRequest, CreateSessionRequest, GetSessionStateRequest, GuestRuntimeKind,
OwnershipScope, RequestPayload, ResponsePayload, SessionCreatedResponse, SessionRequest,
SessionStateResponse,
};
use crate::agent_os::{AgentOs, SessionEntry};
use crate::error::ClientError;
use crate::json_rpc::{JsonRpcError, JsonRpcId, JsonRpcNotification, JsonRpcResponse, SequencedEvent};
use crate::stream::{subscribe_with_replay, Subscription};
use crate::{ACP_SESSION_EVENT_RETENTION_LIMIT, CLOSED_SESSION_ID_RETENTION_LIMIT, PERMISSION_TIMEOUT_MS};
/// Notification method name of the legacy permission request; compared against
/// `notification.method` when session notifications are recorded.
const LEGACY_PERMISSION_METHOD: &str = "request/permission";
/// Notification method name of the ACP permission request; compared against
/// `notification.method` when session notifications are recorded.
const ACP_PERMISSION_METHOD: &str = "session/request_permission";
/// Summary of one tracked session, as returned by `AgentOs::list_sessions`.
///
/// Serialized with the keys `sessionId` and `agentType`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SessionInfo {
/// Identifier of the session.
#[serde(rename = "sessionId")]
pub session_id: String,
/// Agent type the session was registered with (copied from the session entry).
#[serde(rename = "agentType")]
pub agent_type: String,
}
/// One row of the built-in agent registry, as produced by `AgentOs::list_agents`.
///
/// Serialized with the keys `id`, `acpAdapter`, `agentPackage` and `installed`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AgentRegistryEntry {
/// Built-in agent identifier (one of `BUILTIN_AGENT_IDS`).
pub id: String,
/// npm package name of the ACP adapter for this agent.
#[serde(rename = "acpAdapter")]
pub acp_adapter: String,
/// npm package name of the underlying agent.
#[serde(rename = "agentPackage")]
pub agent_package: String,
/// `true` when the adapter package's bin could be resolved under the module-access
/// `node_modules` directory.
pub installed: bool,
}
/// Agent identifiers enumerated by `AgentOs::list_agents`; each one is looked up
/// through `agent_config`.
const BUILTIN_AGENT_IDS: [&str; 5] = ["pi", "pi-cli", "opencode", "claude", "codex"];
/// Instruction/context file paths associated with OpenCode.
// NOTE(review): the consumer of this table is not visible in this part of the file —
// presumably it is used when preparing OpenCode instructions; confirm at the use site.
const OPENCODE_CONTEXT_PATHS: [&str; 12] = [
".github/copilot-instructions.md",
".cursorrules",
".cursor/rules/",
"CLAUDE.md",
"CLAUDE.local.md",
"opencode.md",
"opencode.local.md",
"OpenCode.md",
"OpenCode.local.md",
"OPENCODE.md",
"OPENCODE.local.md",
"/etc/agentos/instructions.md",
];
/// Static description of a built-in agent, as returned by `agent_config`.
struct AgentConfigDef {
/// npm package name of the ACP adapter; its bin is resolved to launch the agent.
acp_adapter: &'static str,
/// npm package name of the underlying agent.
agent_package: &'static str,
/// Default environment variables as `(name, value)` pairs; session creation seeds
/// its environment map from these before applying other sources.
default_env: &'static [(&'static str, &'static str)],
}
/// Looks up the static configuration for a built-in agent type.
///
/// Returns `None` when `agent_type` is not one of the known identifiers
/// (`pi`, `pi-cli`, `opencode`, `claude`, `codex`).
fn agent_config(agent_type: &str) -> Option<AgentConfigDef> {
    // Environment tables are hoisted so the lookup below reads as a plain table.
    const NO_ENV: &[(&str, &str)] = &[];
    const OPENCODE_ENV: &[(&str, &str)] = &[
        ("OPENCODE_DISABLE_CONFIG_DEP_INSTALL", "1"),
        ("OPENCODE_DISABLE_EMBEDDED_WEB_UI", "1"),
    ];
    const CLAUDE_ENV: &[(&str, &str)] = &[
        ("CLAUDE_AGENT_SDK_CLIENT_APP", "@rivet-dev/agent-os"),
        ("CLAUDE_CODE_SIMPLE", "1"),
        ("CLAUDE_CODE_FORCE_AGENT_OS_RIPGREP", "1"),
        ("CLAUDE_CODE_DEFER_GROWTHBOOK_INIT", "1"),
        ("CLAUDE_CODE_DISABLE_CWD_PERSIST", "1"),
        ("CLAUDE_CODE_DISABLE_DEV_NULL_REDIRECT", "1"),
        ("CLAUDE_CODE_NODE_SHELL_WRAPPER", "1"),
        ("CLAUDE_CODE_DISABLE_STREAM_JSON_HOOK_EVENTS", "1"),
        ("CLAUDE_CODE_SHELL", "/bin/sh"),
        ("CLAUDE_CODE_SKIP_INITIAL_MESSAGES", "1"),
        ("CLAUDE_CODE_SKIP_SANDBOX_INIT", "1"),
        ("CLAUDE_CODE_SIMPLE_SHELL_EXEC", "1"),
        ("CLAUDE_CODE_SWAP_STDIO", "0"),
        ("CLAUDE_CODE_USE_PIPE_OUTPUT", "1"),
        ("DISABLE_TELEMETRY", "1"),
        ("SHELL", "/bin/sh"),
        ("USE_BUILTIN_RIPGREP", "0"),
    ];

    let (acp_adapter, agent_package, default_env) = match agent_type {
        "pi" => (
            "@rivet-dev/agent-os-pi",
            "@mariozechner/pi-coding-agent",
            NO_ENV,
        ),
        "pi-cli" => ("pi-acp", "@mariozechner/pi-coding-agent", NO_ENV),
        "opencode" => (
            "@rivet-dev/agent-os-opencode",
            "@rivet-dev/agent-os-opencode",
            OPENCODE_ENV,
        ),
        "claude" => (
            "@rivet-dev/agent-os-claude",
            "@anthropic-ai/claude-agent-sdk",
            CLAUDE_ENV,
        ),
        "codex" => (
            "@rivet-dev/agent-os-codex-agent",
            "@rivet-dev/agent-os-codex",
            NO_ENV,
        ),
        _ => return None,
    };

    Some(AgentConfigDef {
        acp_adapter,
        agent_package,
        default_env,
    })
}
/// Locates the directory of `package_name` beneath `<module_access_cwd>/node_modules`.
///
/// Lookup order:
/// 1. the hoisted location `node_modules/<package_name>`;
/// 2. every `node_modules/.pnpm/<entry>/node_modules/<package_name>`.
///
/// A location only counts when it contains a `package.json` file. When several
/// `.pnpm` entries qualify, the smallest path (by `PathBuf` ordering) is returned:
/// `read_dir` yields entries in a platform- and filesystem-dependent order, so
/// taking "the first hit" would make the result vary between machines.
///
/// Returns `None` when nothing matches or the `.pnpm` directory cannot be read.
fn find_package_dir(module_access_cwd: &str, package_name: &str) -> Option<std::path::PathBuf> {
    let node_modules = std::path::Path::new(module_access_cwd).join("node_modules");
    let hoisted = node_modules.join(package_name);
    if hoisted.join("package.json").is_file() {
        return Some(hoisted);
    }
    let pnpm_store = node_modules.join(".pnpm");
    std::fs::read_dir(&pnpm_store)
        .ok()?
        // Unreadable directory entries are skipped, as before.
        .flatten()
        .map(|entry| entry.path().join("node_modules").join(package_name))
        .filter(|candidate| candidate.join("package.json").is_file())
        .min()
}
/// Maps a host path inside `<module_access_cwd>/node_modules` to the matching guest
/// path under `/root/node_modules/`.
///
/// Returns `None` when `host_path` is not located under that `node_modules`
/// directory. `strip_prefix` is purely lexical, so a path such as
/// `node_modules/pkg/../../etc/passwd` would pass it while actually pointing
/// outside the tree; the depth of the remaining components is therefore tracked
/// and any `..` that would climb above `node_modules` is rejected.
///
/// Guest path segments are always joined with `/`, independent of the host's
/// path separator. Non-escaping `..` segments are kept verbatim.
fn host_node_modules_path_to_guest(module_access_cwd: &str, host_path: &std::path::Path) -> Option<String> {
    use std::path::Component;

    let node_modules = std::path::Path::new(module_access_cwd).join("node_modules");
    let relative = host_path.strip_prefix(&node_modules).ok()?;

    let mut segments: Vec<std::borrow::Cow<'_, str>> = Vec::new();
    // Number of directory levels currently below `node_modules`.
    let mut depth: usize = 0;
    for component in relative.components() {
        match component {
            Component::Normal(part) => {
                depth += 1;
                segments.push(part.to_string_lossy());
            }
            Component::ParentDir => {
                // Climbing above the node_modules root means the path escapes it.
                depth = depth.checked_sub(1)?;
                segments.push("..".into());
            }
            Component::CurDir => {}
            // A stripped relative path should never contain these; refuse if it does.
            Component::Prefix(_) | Component::RootDir => return None,
        }
    }
    Some(format!("/root/node_modules/{}", segments.join("/")))
}
/// Resolves the guest-side path of a package's executable entry.
///
/// The package is located with `find_package_dir`, its `package.json` is read, and
/// the `bin` field is interpreted as follows:
/// * a string is used directly;
/// * an object is searched for `bin_name` (when given), otherwise for
///   `package_name`, otherwise its first value is taken; a chosen value that is
///   not a string counts as "no bin entry";
/// * anything else counts as "no bin entry".
///
/// A leading `./` is stripped from the entry before it is joined onto the package
/// directory and translated with `host_node_modules_path_to_guest`.
///
/// # Errors
/// `ClientError::Sidecar` when the package is not found, `package.json` cannot be
/// read or parsed, no bin entry exists, or the resolved path is outside the
/// module-access `node_modules`.
fn resolve_package_bin(
    module_access_cwd: &str,
    package_name: &str,
    bin_name: Option<&str>,
) -> std::result::Result<String, ClientError> {
    let Some(package_dir) = find_package_dir(module_access_cwd, package_name) else {
        return Err(ClientError::Sidecar(format!(
            "package not found: {package_name} (looked under {module_access_cwd}/node_modules and its .pnpm store)"
        )));
    };

    let manifest_path = package_dir.join("package.json");
    let manifest_text = match std::fs::read_to_string(&manifest_path) {
        Ok(text) => text,
        Err(error) => {
            return Err(ClientError::Sidecar(format!(
                "cannot read {}: {error}",
                manifest_path.display()
            )));
        }
    };
    let manifest: Value = match serde_json::from_str(&manifest_text) {
        Ok(parsed) => parsed,
        Err(error) => {
            return Err(ClientError::Sidecar(format!(
                "invalid package.json for {package_name}: {error}"
            )));
        }
    };

    let declared_bin: Option<&str> = match manifest.get("bin") {
        Some(Value::String(single)) => Some(single.as_str()),
        Some(Value::Object(bins)) => {
            // Preference order: explicit bin name, package name, first entry.
            let chosen = match bin_name.and_then(|name| bins.get(name)) {
                Some(named) => Some(named),
                None => bins.get(package_name).or_else(|| bins.values().next()),
            };
            chosen.and_then(Value::as_str)
        }
        _ => None,
    };
    let Some(declared_bin) = declared_bin else {
        return Err(ClientError::Sidecar(format!(
            "No bin entry found in {package_name}/package.json"
        )));
    };

    let bin_host_path = package_dir.join(declared_bin.trim_start_matches("./"));
    match host_node_modules_path_to_guest(module_access_cwd, &bin_host_path) {
        Some(guest_path) => Ok(guest_path),
        None => Err(ClientError::Sidecar(format!(
            "resolved bin for {package_name} is outside module access node_modules: {}",
            bin_host_path.display()
        ))),
    }
}
// Filesystem-backed tests for `resolve_package_bin`. Each test builds a throwaway
// `node_modules` tree under the system temp directory and removes it again before
// asserting, so a failing assertion does not leave the fixture behind.
#[cfg(test)]
mod resolve_package_bin_tests {
use super::resolve_package_bin;
use std::fs;
use std::path::{Path, PathBuf};
// Returns a per-process, per-label fixture root and clears any leftover directory
// at that path. The directory itself is not created here.
fn fixture(label: &str) -> PathBuf {
let dir = std::env::temp_dir().join(format!(
"agentos-resolve-bin-{}-{}",
std::process::id(),
label
));
let _ = fs::remove_dir_all(&dir);
dir
}
// Writes `<root>/<rel_pkg_dir>/package.json` whose `bin` field is the raw JSON
// fragment `bin_json` (a JSON string or object).
fn write_pkg(root: &Path, rel_pkg_dir: &str, bin_json: &str) {
let pkg_dir = root.join(rel_pkg_dir);
fs::create_dir_all(&pkg_dir).expect("mkdir pkg");
fs::write(
pkg_dir.join("package.json"),
format!(r#"{{"name":"x","bin":{bin_json}}}"#),
)
.expect("write package.json");
}
// A hoisted scoped package maps straight to `/root/node_modules/<scope>/<pkg>/...`.
#[test]
fn resolves_hoisted_package_to_top_level_guest_path() {
let root = fixture("hoisted");
write_pkg(
&root,
"node_modules/@scope/pkg",
r#"{"the-bin":"./dist/adapter.js"}"#,
);
let result = resolve_package_bin(root.to_str().unwrap(), "@scope/pkg", Some("the-bin"));
let _ = fs::remove_dir_all(&root);
assert_eq!(
result.unwrap(),
"/root/node_modules/@scope/pkg/dist/adapter.js"
);
}
// A package that only exists inside the `.pnpm` store keeps its nested location in
// the guest path.
#[test]
fn resolves_pnpm_nested_package_to_its_real_deep_guest_path() {
let root = fixture("pnpm");
let key = "@scope+pkg@1.0.0_peer";
write_pkg(
&root,
&format!("node_modules/.pnpm/{key}/node_modules/@scope/pkg"),
r#"{"the-bin":"./dist/adapter.js"}"#,
);
let result = resolve_package_bin(root.to_str().unwrap(), "@scope/pkg", Some("the-bin"));
let _ = fs::remove_dir_all(&root);
assert_eq!(
result.unwrap(),
format!("/root/node_modules/.pnpm/{key}/node_modules/@scope/pkg/dist/adapter.js")
);
}
// When both locations exist the hoisted copy is chosen; `bin` is a plain string here.
#[test]
fn prefers_hoisted_over_pnpm_when_both_exist() {
let root = fixture("both");
write_pkg(&root, "node_modules/pkg", r#""./hoisted.js""#);
write_pkg(
&root,
"node_modules/.pnpm/pkg@1/node_modules/pkg",
r#""./nested.js""#,
);
let result = resolve_package_bin(root.to_str().unwrap(), "pkg", None);
let _ = fs::remove_dir_all(&root);
assert_eq!(result.unwrap(), "/root/node_modules/pkg/hoisted.js");
}
// An empty `node_modules` (no hoisted copy, no `.pnpm` store) yields an error.
#[test]
fn missing_package_is_an_error() {
let root = fixture("missing");
fs::create_dir_all(root.join("node_modules")).expect("mkdir");
let result = resolve_package_bin(root.to_str().unwrap(), "nope", None);
let _ = fs::remove_dir_all(&root);
assert!(result.is_err());
}
}
/// Configuration of an MCP server attached to a session.
///
/// Serialized as an internally tagged object: the `type` key holds the lowercase
/// variant name (`"local"` or `"remote"`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum McpServerConfig {
/// A server described by a command line.
Local {
/// Command to run.
command: String,
/// Command arguments; omitted from the serialized form when empty.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
args: Vec<String>,
/// Environment variables; omitted from the serialized form when empty.
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
env: BTreeMap<String, String>,
},
/// A server described by a URL.
Remote {
/// Server URL.
url: String,
/// Headers; omitted from the serialized form when empty.
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
headers: BTreeMap<String, String>,
},
}
/// Options accepted by `AgentOs::create_session`.
///
/// The default value has no working directory, an empty environment, no MCP
/// servers, `skip_os_instructions == false` and no additional instructions.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct CreateSessionOptions {
    /// Optional working directory for the session.
    pub cwd: Option<String>,
    /// Caller-supplied environment variables; session creation inserts these last,
    /// so they override agent defaults with the same name.
    pub env: BTreeMap<String, String>,
    /// MCP servers to make available to the session.
    pub mcp_servers: Vec<McpServerConfig>,
    /// Flag consulted when instructions are prepared for the session.
    pub skip_os_instructions: bool,
    /// Optional extra instruction text.
    pub additional_instructions: Option<String>,
}
/// Wrapper around a session identifier; the return type of `create_session`.
///
/// Serialized as `{ "sessionId": ... }`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SessionId {
/// Identifier of the session.
#[serde(rename = "sessionId")]
pub session_id: String,
}
/// Result of a prompt: the raw JSON-RPC response plus a text value.
// NOTE(review): how `text` is assembled is not visible in this part of the file —
// presumably the agent's accumulated message text; confirm at the producer.
#[derive(Debug, Clone, PartialEq)]
pub struct PromptResult {
/// JSON-RPC response returned for the prompt request.
pub response: JsonRpcResponse,
/// Text associated with the prompt result.
pub text: String,
}
/// Filters for querying recorded session events; both fields default to `None`.
// NOTE(review): the consumer is not visible here — presumably `since` is a sequence
// number bound and `method` a notification-method filter; confirm at the use site.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct GetEventsOptions {
/// Optional lower bound for the events to return.
pub since: Option<i64>,
/// Optional method name to filter by.
pub method: Option<String>,
}
/// One mode a session can be in.
///
/// Optional fields are omitted from the serialized form when `None`; keys that are
/// not modelled explicitly are collected into `extra`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SessionMode {
/// Mode identifier (required when deserializing).
pub id: String,
/// Optional mode name.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
/// Optional display label.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub label: Option<String>,
/// Optional description.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
/// Any additional keys present on the wire object.
#[serde(flatten)]
pub extra: BTreeMap<String, Value>,
}
/// Mode state of a session: the active mode id and the list of available modes.
///
/// Both fields fall back to their default (empty string / empty list) when missing
/// from the deserialized object.
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
pub struct SessionModeState {
/// Identifier of the currently active mode (`currentModeId` on the wire).
#[serde(default, rename = "currentModeId")]
pub current_mode_id: String,
/// Modes the session can switch to (`availableModes` on the wire).
#[serde(default, rename = "availableModes")]
pub available_modes: Vec<SessionMode>,
}
/// One permitted value of a session config option.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConfigAllowedValue {
/// Identifier of the value.
pub id: String,
/// Optional display label; omitted from the serialized form when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub label: Option<String>,
}
/// A configurable option of a session.
///
/// Every field is optional on the wire: `id` falls back to an empty string and the
/// `Option` fields to `None`; `None` fields are omitted when serializing.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SessionConfigOption {
/// Option identifier; empty when the wire object has no `id`.
#[serde(default)]
pub id: String,
/// Category of the option; used in this file to look options up (e.g. `"model"`,
/// `"thought_level"`).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub category: Option<String>,
/// Optional display label.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub label: Option<String>,
/// Optional description.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
/// Current value (`currentValue` on the wire).
#[serde(default, rename = "currentValue", skip_serializing_if = "Option::is_none")]
pub current_value: Option<String>,
/// Permitted values (`allowedValues` on the wire).
#[serde(default, rename = "allowedValues", skip_serializing_if = "Option::is_none")]
pub allowed_values: Option<Vec<ConfigAllowedValue>>,
/// Whether the option is read-only (`readOnly` on the wire); a missing value is
/// treated as writable by `set_session_config_by_category`.
#[serde(default, rename = "readOnly", skip_serializing_if = "Option::is_none")]
pub read_only: Option<bool>,
}
/// Prompt-related capability flags reported by an agent.
///
/// Known flags are optional and omitted from the serialized form when `None`;
/// unknown keys are preserved in `extra`.
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
pub struct PromptCapabilities {
/// `audio` flag.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub audio: Option<bool>,
/// `embeddedContext` flag.
#[serde(default, rename = "embeddedContext", skip_serializing_if = "Option::is_none")]
pub embedded_context: Option<bool>,
/// `image` flag.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub image: Option<bool>,
/// Any additional keys present on the wire object.
#[serde(flatten)]
pub extra: BTreeMap<String, Value>,
}
/// Capability flags reported by an agent.
///
/// Every known flag is optional and omitted from the serialized form when `None`.
/// Most keys are snake_case on the wire (the explicit `rename`s repeat the field
/// names); `promptCapabilities` is camelCase. Unknown keys are preserved in `extra`.
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
pub struct AgentCapabilities {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub permissions: Option<bool>,
#[serde(default, rename = "plan_mode", skip_serializing_if = "Option::is_none")]
pub plan_mode: Option<bool>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub questions: Option<bool>,
#[serde(default, rename = "tool_calls", skip_serializing_if = "Option::is_none")]
pub tool_calls: Option<bool>,
#[serde(default, rename = "text_messages", skip_serializing_if = "Option::is_none")]
pub text_messages: Option<bool>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub images: Option<bool>,
#[serde(default, rename = "file_attachments", skip_serializing_if = "Option::is_none")]
pub file_attachments: Option<bool>,
#[serde(default, rename = "session_lifecycle", skip_serializing_if = "Option::is_none")]
pub session_lifecycle: Option<bool>,
#[serde(default, rename = "error_events", skip_serializing_if = "Option::is_none")]
pub error_events: Option<bool>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub reasoning: Option<bool>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub status: Option<bool>,
#[serde(default, rename = "streaming_deltas", skip_serializing_if = "Option::is_none")]
pub streaming_deltas: Option<bool>,
#[serde(default, rename = "mcp_tools", skip_serializing_if = "Option::is_none")]
pub mcp_tools: Option<bool>,
/// Nested prompt capability flags (`promptCapabilities` on the wire).
#[serde(default, rename = "promptCapabilities", skip_serializing_if = "Option::is_none")]
pub prompt_capabilities: Option<PromptCapabilities>,
/// Any additional keys present on the wire object.
#[serde(flatten)]
pub extra: BTreeMap<String, Value>,
}
/// Identification data reported by an agent.
///
/// `name` is required; `title` and `version` are optional and omitted from the
/// serialized form when `None`; unknown keys are preserved in `extra`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct AgentInfo {
/// Agent name.
pub name: String,
/// Optional title.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub title: Option<String>,
/// Optional version string.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub version: Option<String>,
/// Any additional keys present on the wire object.
#[serde(flatten)]
pub extra: BTreeMap<String, Value>,
}
/// Bundle of optional per-session data: modes, config options, capabilities and
/// agent info. Every field defaults to `None` and is omitted from the serialized
/// form when `None`.
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
pub struct SessionInitData {
/// Mode state, if any.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub modes: Option<SessionModeState>,
/// Config options (`configOptions` on the wire), if any.
#[serde(default, rename = "configOptions", skip_serializing_if = "Option::is_none")]
pub config_options: Option<Vec<SessionConfigOption>>,
/// Agent capability flags, if any.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub capabilities: Option<AgentCapabilities>,
/// Agent identification (`agentInfo` on the wire), if any.
#[serde(default, rename = "agentInfo", skip_serializing_if = "Option::is_none")]
pub agent_info: Option<AgentInfo>,
}
/// Cloneable handle used to answer a permission request exactly once.
///
/// All clones share one slot holding the sending half of a oneshot channel; the
/// first call to [`PermissionResponder::respond`] takes the sender out of the
/// slot, so later calls (on any clone) do nothing.
#[derive(Clone)]
pub struct PermissionResponder {
    inner: std::sync::Arc<parking_lot::Mutex<Option<tokio::sync::oneshot::Sender<PermissionReply>>>>,
}

impl PermissionResponder {
    /// Creates a responder together with the receiver that will observe the reply.
    pub fn new() -> (Self, tokio::sync::oneshot::Receiver<PermissionReply>) {
        let (sender, receiver) = tokio::sync::oneshot::channel();
        let slot = parking_lot::Mutex::new(Some(sender));
        let responder = Self {
            inner: std::sync::Arc::new(slot),
        };
        (responder, receiver)
    }

    /// Delivers `reply` to the receiver if nobody has responded yet.
    ///
    /// A send failure (receiver already dropped) is ignored.
    pub fn respond(&self, reply: PermissionReply) {
        let pending = self.inner.lock().take();
        if let Some(sender) = pending {
            let _ = sender.send(reply);
        }
    }
}
/// A permission request surfaced to the host application.
#[derive(Clone)]
pub struct PermissionRequest {
/// Identifier of the request, taken from the notification's `permissionId`.
pub permission_id: String,
/// Optional description, taken from the notification's `description` string.
pub description: Option<String>,
/// Parameters delivered with the request.
pub params: Value,
/// Handle used to answer the request.
pub responder: PermissionResponder,
}
// Manual `Debug`: prints the three data fields and ends with `..` in place of
// `responder`, which is left out of the output.
impl std::fmt::Debug for PermissionRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PermissionRequest")
.field("permission_id", &self.permission_id)
.field("description", &self.description)
.field("params", &self.params)
.finish_non_exhaustive()
}
}
/// Answer to a permission request; serialized as `"once"`, `"always"` or `"reject"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum PermissionReply {
/// Mapped to the ACP `allow_once` option.
Once,
/// Mapped to the ACP `allow_always` option.
Always,
/// Mapped to the ACP `reject_once` option.
Reject,
}
/// Picks the ACP `optionId` that corresponds to `reply`.
///
/// The offered `options` are scanned in order; the first option that carries a
/// string `optionId` and whose `optionId` or `kind` matches the reply wins, and
/// its `optionId` is returned. Options without a string `optionId` never match.
/// When nothing matches (or no options were offered) the canonical id for the
/// reply is returned: `allow_always`, `allow_once` or `reject_once`.
fn normalize_acp_permission_option_id(
    options: Option<&Vec<Value>>,
    reply: PermissionReply,
) -> String {
    let (accepted_ids, accepted_kinds, fallback): (&[&str], &[&str], &str) = match reply {
        PermissionReply::Always => (&["always", "allow_always"], &["allow_always"], "allow_always"),
        PermissionReply::Once => (&["once", "allow_once"], &["allow_once"], "allow_once"),
        PermissionReply::Reject => (&["reject", "reject_once"], &["reject_once"], "reject_once"),
    };

    for option in options.into_iter().flatten() {
        // An option without a usable id can never be selected, even if its kind fits.
        let Some(option_id) = option.get("optionId").and_then(Value::as_str) else {
            continue;
        };
        let id_matches = accepted_ids.contains(&option_id);
        let kind_matches = option
            .get("kind")
            .and_then(Value::as_str)
            .map_or(false, |kind| accepted_kinds.contains(&kind));
        if id_matches || kind_matches {
            return option_id.to_string();
        }
    }

    fallback.to_string()
}
/// Builds the ACP permission result object for `reply`.
///
/// Only object-valued entries of `params["options"]` are considered when choosing
/// the option id; the result always has the shape
/// `{ "outcome": { "outcome": "selected", "optionId": <id> } }`.
fn build_acp_permission_result(reply: PermissionReply, params: &Value) -> Value {
    let object_options: Option<Vec<Value>> = match params.get("options") {
        Some(Value::Array(offered)) => Some(
            offered
                .iter()
                .filter(|candidate| candidate.is_object())
                .cloned()
                .collect(),
        ),
        _ => None,
    };
    let selected_id = normalize_acp_permission_option_id(object_options.as_ref(), reply);
    json!({
        "outcome": {
            "outcome": "selected",
            "optionId": selected_id,
        }
    })
}
fn agent_capabilities_is_empty(caps: &AgentCapabilities) -> bool {
caps.permissions.is_none()
&& caps.plan_mode.is_none()
&& caps.questions.is_none()
&& caps.tool_calls.is_none()
&& caps.text_messages.is_none()
&& caps.images.is_none()
&& caps.file_attachments.is_none()
&& caps.session_lifecycle.is_none()
&& caps.error_events.is_none()
&& caps.reasoning.is_none()
&& caps.status.is_none()
&& caps.streaming_deltas.is_none()
&& caps.mcp_tools.is_none()
&& caps.prompt_capabilities.is_none()
&& caps.extra.is_empty()
}
/// Only `session/update` notifications are forwarded to session event handlers.
fn should_dispatch_to_session_event_handlers(notification: &JsonRpcNotification) -> bool {
    matches!(notification.method.as_str(), "session/update")
}
/// Merges `incoming` events into `ring`, keyed by sequence number.
///
/// Existing events are replaced by incoming ones with the same sequence number.
/// The result is ordered by ascending sequence number and trimmed from the front
/// so that at most `ACP_SESSION_EVENT_RETENTION_LIMIT` events remain.
fn merge_sequenced_events(ring: &mut VecDeque<SequencedEvent>, incoming: Vec<SequencedEvent>) {
    let mut ordered: BTreeMap<i64, SequencedEvent> = BTreeMap::new();
    // `extend` inserts one by one, so later entries overwrite earlier duplicates.
    ordered.extend(ring.drain(..).map(|event| (event.sequence_number, event)));
    ordered.extend(incoming.into_iter().map(|event| (event.sequence_number, event)));

    let overflow = ordered.len().saturating_sub(ACP_SESSION_EVENT_RETENTION_LIMIT);
    *ring = ordered.into_values().skip(overflow).collect();
}
/// Combines the known highest sequence number with the last event in `ring`.
///
/// An empty ring leaves `current` untouched; otherwise the larger of `current`
/// and the last event's sequence number is returned.
fn next_highest_sequence_number(current: Option<i64>, ring: &VecDeque<SequencedEvent>) -> Option<i64> {
    match (current, ring.back()) {
        (known, None) => known,
        (None, Some(last)) => Some(last.sequence_number),
        (Some(known), Some(last)) => Some(known.max(last.sequence_number)),
    }
}
/// Returns a sequence number one below the smaller of `0` and the lowest
/// sequence number in `ring`, so the result is always negative.
fn next_synthetic_sequence_number(ring: &VecDeque<SequencedEvent>) -> i64 {
    let lowest_seen = ring.iter().map(|event| event.sequence_number).min();
    let floor = lowest_seen.map_or(0i64, |lowest| lowest.min(0));
    floor - 1
}
/// Applies a `session/update` notification to the cached session state.
///
/// The update payload is `params.update` when present, otherwise `params` itself.
/// * `current_mode_update` with a string `currentModeId` updates the current mode,
///   but only if mode state is already cached.
/// * `config_option_update` / `config_options_update` with an array
///   `configOptions` replaces the cached options when the whole array parses.
///
/// Any other notification or payload is ignored.
fn apply_session_update(entry: &SessionEntry, notification: &JsonRpcNotification) {
    if notification.method != "session/update" {
        return;
    }
    // Without params there is nothing that could match below.
    let Some(params) = notification.params.as_ref() else {
        return;
    };
    let update = params.get("update").unwrap_or(params);

    match update.get("sessionUpdate").and_then(Value::as_str) {
        Some("current_mode_update") => {
            let Some(mode_id) = update.get("currentModeId").and_then(Value::as_str) else {
                return;
            };
            if let Some(state) = entry.modes.lock().as_mut() {
                state.current_mode_id = mode_id.to_string();
            }
        }
        Some("config_option_update") | Some("config_options_update") => {
            let Some(raw_options) = update.get("configOptions").and_then(Value::as_array) else {
                return;
            };
            // All-or-nothing: one malformed option leaves the cache untouched.
            let parsed: std::result::Result<Vec<SessionConfigOption>, _> = raw_options
                .iter()
                .cloned()
                .map(serde_json::from_value::<SessionConfigOption>)
                .collect();
            if let Ok(options) = parsed {
                *entry.config_options.lock() = options;
            }
        }
        _ => {}
    }
}
/// Re-applies locally recorded config overrides on top of the cached options.
///
/// For each option the override keyed by its id is preferred (unless the id
/// starts with `PENDING_METHOD_PREFIX`); otherwise the override keyed by its
/// category is used. Options without a matching override are left alone.
fn apply_synthetic_config_overrides(entry: &SessionEntry) {
    // Snapshot so the overrides lock is not held while the options lock is taken.
    let overrides = entry.config_overrides.lock().clone();
    if overrides.is_empty() {
        return;
    }

    for option in entry.config_options.lock().iter_mut() {
        let by_id = if option.id.starts_with(PENDING_METHOD_PREFIX) {
            None
        } else {
            overrides.get(&option.id)
        };
        let chosen = match by_id {
            Some(value) => Some(value.clone()),
            None => option
                .category
                .as_ref()
                .and_then(|category| overrides.get(category))
                .cloned(),
        };
        if let Some(value) = chosen {
            option.current_value = Some(value);
        }
    }
}
/// Key prefix for the bookkeeping entries that `send_session_request` stores in a
/// session's `config_overrides` map (`<prefix><resolver id>` -> method name).
/// `apply_synthetic_config_overrides` skips the id lookup for option ids that start
/// with this prefix.
const PENDING_METHOD_PREFIX: &str = "__pending_method::";
/// Records a session notification and fans it out.
///
/// Steps, in order:
/// 1. merge the event into the session's event ring and raise
///    `highest_sequence_number` if the ring now ends on a larger number;
/// 2. apply `session/update` payloads to the cached mode/config state;
/// 3. forward `session/update` notifications on `event_tx`;
/// 4. for permission notifications (legacy or ACP method) carrying a string or
///    numeric `permissionId`, send a `PermissionRequest` on `permission_tx`.
///
/// Send failures on either channel are ignored.
fn record_session_notification(
    entry: &SessionEntry,
    sequence_number: i64,
    notification: JsonRpcNotification,
) {
    {
        // The ring lock is scoped so it is released before the state update below.
        let mut ring = entry.event_ring.lock();
        merge_sequenced_events(
            &mut ring,
            vec![SequencedEvent {
                sequence_number,
                notification: notification.clone(),
            }],
        );
        let next = next_highest_sequence_number(
            Some(entry.highest_sequence_number.load(Ordering::SeqCst)),
            &ring,
        );
        if let Some(next) = next {
            entry.highest_sequence_number.store(next, Ordering::SeqCst);
        }
    }
    // Both helpers take the notification by reference.
    apply_session_update(entry, &notification);
    if should_dispatch_to_session_event_handlers(&notification) {
        let _ = entry.event_tx.send(SequencedEvent {
            sequence_number,
            notification: notification.clone(),
        });
    }
    if notification.method == LEGACY_PERMISSION_METHOD
        || notification.method == ACP_PERMISSION_METHOD
    {
        let params = notification.params.clone().unwrap_or(Value::Null);
        let permission_id = match params.get("permissionId") {
            Some(Value::String(id)) => Some(id.clone()),
            Some(Value::Number(num)) => Some(num.to_string()),
            _ => None,
        };
        if let Some(permission_id) = permission_id {
            let description = params
                .get("description")
                .and_then(Value::as_str)
                .map(str::to_string);
            // NOTE(review): the receiver half is dropped at the end of this scope, so a
            // reply sent through this responder is discarded. `build_permission_request`
            // hands the receiver back to its caller instead — confirm whether this path
            // should route replies the same way.
            let (responder, _receiver) = PermissionResponder::new();
            let request = PermissionRequest {
                permission_id,
                description,
                params,
                responder,
            };
            let _ = entry.permission_tx.send(request);
        }
    }
}
/// Turns a permission notification into a deliverable `PermissionRequest`.
///
/// Returns `None` when the notification has no string or numeric `permissionId`.
/// Otherwise returns `(permission_id, request, delivered_params, receiver)`:
/// * for the ACP method, the delivered params are the original object (or an
///   empty object when params were not an object) with `permissionId` set to the
///   stringified id and `_acpMethod` set to the notification method;
/// * for any other method, the params are delivered unchanged;
/// * `receiver` observes the reply sent through the request's responder.
fn build_permission_request(
    notification: &JsonRpcNotification,
) -> Option<(
    String,
    PermissionRequest,
    Value,
    tokio::sync::oneshot::Receiver<PermissionReply>,
)> {
    let raw_params = notification.params.clone().unwrap_or(Value::Null);
    let permission_id = match raw_params.get("permissionId")? {
        Value::String(text) => text.clone(),
        Value::Number(number) => number.to_string(),
        _ => return None,
    };

    let is_acp = notification.method == ACP_PERMISSION_METHOD;
    let delivered_params = if !is_acp {
        raw_params
    } else {
        let mut fields = if let Value::Object(existing) = raw_params {
            existing
        } else {
            serde_json::Map::new()
        };
        fields.insert(
            "permissionId".to_string(),
            Value::String(permission_id.clone()),
        );
        fields.insert(
            "_acpMethod".to_string(),
            Value::String(notification.method.clone()),
        );
        Value::Object(fields)
    };

    let description = delivered_params
        .get("description")
        .and_then(Value::as_str)
        .map(str::to_string);
    let (responder, receiver) = PermissionResponder::new();
    let request = PermissionRequest {
        permission_id: permission_id.clone(),
        description,
        params: delivered_params.clone(),
        responder,
    };

    Some((permission_id, request, delivered_params, receiver))
}
fn sync_session_state(entry: &SessionEntry, state: &SessionStateResponse) {
*entry.modes.lock() = state
.modes
.as_ref()
.filter(|value| value.is_object())
.and_then(|value| serde_json::from_value(value.clone()).ok());
*entry.config_options.lock() = state
.config_options
.iter()
.filter_map(|value| serde_json::from_value(value.clone()).ok())
.collect();
apply_synthetic_config_overrides(entry);
*entry.capabilities.lock() = state
.agent_capabilities
.as_ref()
.filter(|value| value.is_object())
.and_then(|value| serde_json::from_value(value.clone()).ok());
*entry.agent_info.lock() = state
.agent_info
.as_ref()
.filter(|value| value.is_object())
.and_then(|value| serde_json::from_value(value.clone()).ok());
let incoming: Vec<SequencedEvent> = state
.events
.iter()
.filter_map(|event| {
serde_json::from_value::<JsonRpcNotification>(event.notification.clone())
.ok()
.map(|notification| SequencedEvent {
sequence_number: event.sequence_number as i64,
notification,
})
})
.collect();
let mut ring = entry.event_ring.lock();
merge_sequenced_events(&mut ring, incoming);
let next = next_highest_sequence_number(
Some(entry.highest_sequence_number.load(Ordering::SeqCst)),
&ring,
);
if let Some(next) = next {
entry.highest_sequence_number.store(next, Ordering::SeqCst);
}
}
fn unsupported_config_response(agent_type: &str, category: &str) -> JsonRpcResponse {
let message = if agent_type == "opencode" && category == "model" {
"OpenCode reports available models, but model switching must be configured before createSession() because ACP session/set_config_option is not implemented.".to_string()
} else {
format!("The {category} config option is read-only for {agent_type} sessions.")
};
JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: Some(JsonRpcId::Null),
result: None,
error: Some(JsonRpcError {
code: -32601,
message,
data: None,
}),
}
}
/// Applies a config change locally: records `value` as an override, emits a
/// synthetic `session/update` event and returns a successful JSON-RPC response.
///
/// Called from `set_session_config_by_category` for codex sessions whose
/// `session/set_config_option` request came back as "method not found".
///
/// The response has a null id and a result holding the resulting `configOptions`
/// plus `"via": "codex-config-fallback"`.
fn apply_codex_config_fallback(entry: &SessionEntry, category: &str, value: &str) -> JsonRpcResponse {
{
// Find the option id for this category first; the options guard is dropped
// explicitly before the overrides lock is taken, so the two are never held together.
let options = entry.config_options.lock();
let matching_id = options
.iter()
.find(|option| option.category.as_deref() == Some(category))
.map(|option| option.id.clone());
drop(options);
// The override is stored under the option id (when one exists) and under the category.
let mut overrides = entry.config_overrides.lock();
if let Some(id) = matching_id {
overrides.insert(id, value.to_string());
}
overrides.insert(category.to_string(), value.to_string());
}
apply_synthetic_config_overrides(entry);
let config_options = entry.config_options.lock().clone();
// Synthetic events get a negative sequence number below everything in the ring.
let synthetic_seq = {
let ring = entry.event_ring.lock();
next_synthetic_sequence_number(&ring)
};
record_session_notification(
entry,
synthetic_seq,
JsonRpcNotification {
jsonrpc: "2.0".to_string(),
method: "session/update".to_string(),
params: Some(json!({
"update": {
"sessionUpdate": "config_option_update",
"configOptions": config_options,
}
})),
},
);
// Re-read the options: recording the notification above replaces the cached
// options with the ones parsed back from the notification payload.
let config_options = entry.config_options.lock().clone();
JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: Some(JsonRpcId::Null),
result: Some(json!({
"configOptions": config_options,
"via": "codex-config-fallback",
})),
error: None,
}
}
/// Adds the currently selected codex model / thought level to prompt params.
///
/// For non-codex sessions, or when neither a `model` nor a `thought_level`
/// option has a current value, `params` is returned unchanged. Otherwise the
/// values are placed under `_meta.agentOsCodexConfig`; an existing `_meta` object
/// is preserved, and non-object `params` are replaced by a fresh object.
fn augment_prompt_params(entry: &SessionEntry, params: Option<Value>) -> Option<Value> {
    if entry.agent_type != "codex" {
        return params;
    }

    let (model, thought_level) = {
        let options = entry.config_options.lock();
        let current_value_of = |category: &str| -> Option<String> {
            options
                .iter()
                .find(|option| option.category.as_deref() == Some(category))
                .and_then(|option| option.current_value.clone())
        };
        (current_value_of("model"), current_value_of("thought_level"))
    };
    if let (None, None) = (&model, &thought_level) {
        return params;
    }

    let mut codex_config = serde_json::Map::new();
    for (key, selected) in [("model", model), ("thought_level", thought_level)] {
        if let Some(selected) = selected {
            codex_config.insert(key.to_string(), Value::String(selected));
        }
    }

    // Start from the caller's `_meta` object, if it supplied one.
    let mut meta = params
        .as_ref()
        .and_then(|existing| existing.get("_meta"))
        .and_then(Value::as_object)
        .cloned()
        .unwrap_or_default();
    meta.insert(
        "agentOsCodexConfig".to_string(),
        Value::Object(codex_config),
    );

    let mut augmented = if let Some(Value::Object(existing)) = params {
        existing
    } else {
        serde_json::Map::new()
    };
    augmented.insert("_meta".to_string(), Value::Object(meta));
    Some(Value::Object(augmented))
}
/// Builds the JSON-RPC error response (code `-32000`, null id) reporting that
/// `session_id` has been closed.
fn session_closed_response(session_id: &str) -> JsonRpcResponse {
    let closed = JsonRpcError {
        code: -32000,
        message: format!("Session closed: {session_id}"),
        data: None,
    };
    JsonRpcResponse {
        jsonrpc: "2.0".to_string(),
        id: Some(JsonRpcId::Null),
        result: None,
        error: Some(closed),
    }
}
impl AgentOs {
/// Builds the VM-level ownership scope (connection id, wire session id, VM id)
/// attached to session-related sidecar requests.
fn session_ownership(&self) -> OwnershipScope {
    let connection = self.connection_id().to_string();
    let wire_session = self.wire_session_id().to_string();
    let vm = self.vm_id().to_string();
    OwnershipScope::vm(connection, wire_session, vm)
}
/// Runs `f` against the entry of `session_id` and returns its result.
///
/// # Errors
/// `ClientError::SessionNotFound` when no such session is tracked.
fn require_session<R>(
    &self,
    session_id: &str,
    f: impl FnOnce(&SessionEntry) -> R,
) -> std::result::Result<R, ClientError> {
    let outcome = self
        .inner()
        .sessions
        .read(session_id, |_, entry| f(entry));
    match outcome {
        Some(value) => Ok(value),
        None => Err(ClientError::SessionNotFound(session_id.to_string())),
    }
}
async fn hydrate_session_state(&self, session_id: &str) -> std::result::Result<(), ClientError> {
let acknowledged = self.require_session(session_id, |entry| {
let highest = entry.highest_sequence_number.load(Ordering::SeqCst);
if highest >= 0 {
Some(highest as u64)
} else {
None
}
})?;
let response = self
.transport()
.request(
self.session_ownership(),
RequestPayload::GetSessionState(GetSessionStateRequest {
session_id: session_id.to_string(),
acknowledged_sequence_number: acknowledged,
}),
)
.await?;
let state = match response {
ResponsePayload::SessionState(state) => state,
ResponsePayload::Rejected(rejected) => {
return Err(ClientError::Kernel {
code: rejected.code,
message: rejected.message,
});
}
other => {
return Err(ClientError::Sidecar(format!(
"unexpected response to GetSessionState: {other:?}"
)));
}
};
self.require_session(session_id, |entry| sync_session_state(entry, &state))?;
Ok(())
}
/// Sends a JSON-RPC style request to a session through the sidecar and returns the
/// session's response.
///
/// While the request is in flight a oneshot "resolver" is registered on the session
/// entry; if it fires (or its sender is dropped) before the transport call completes,
/// that outcome is returned instead of the transport result.
///
/// # Errors
/// * `ClientError::SessionNotFound` when the session is not tracked;
/// * `ClientError::Kernel` when the sidecar rejects the request;
/// * `ClientError::Sidecar` for a malformed or unexpected response payload;
/// * whatever the transport returns on failure.
pub(crate) async fn send_session_request(
&self,
session_id: &str,
method: &str,
params: Option<Value>,
) -> std::result::Result<JsonRpcResponse, ClientError> {
// Prompt requests get their params passed through `augment_prompt_params`.
let request_params = if method == "session/prompt" {
self.require_session(session_id, |entry| augment_prompt_params(entry, params.clone()))?
} else {
params
};
// Register the resolver for this request plus a bookkeeping entry in
// `config_overrides` keyed by `PENDING_METHOD_PREFIX` + resolver id.
let resolver_id = self.inner().request_counter.fetch_add(1, Ordering::SeqCst);
let (resolve_tx, resolve_rx) =
tokio::sync::oneshot::channel::<JsonRpcResponse>();
self.require_session(session_id, |entry| {
let _ = entry.pending_prompt_resolvers.insert(resolver_id, resolve_tx);
entry
.config_overrides
.lock()
.entry(format!("{PENDING_METHOD_PREFIX}{resolver_id}"))
.or_insert_with(|| method.to_string());
})?;
let transport = self.transport();
let ownership = self.session_ownership();
let session_request = SessionRequest {
session_id: session_id.to_string(),
method: method.to_string(),
params: request_params.clone(),
};
let rpc = transport.request(ownership, RequestPayload::SessionRequest(session_request));
tokio::pin!(rpc);
// `biased` makes the select check the resolver branch before the transport branch.
// Both branches remove the resolver registration before continuing.
let response = tokio::select! {
biased;
resolved = resolve_rx => {
self.cleanup_pending_resolver(session_id, resolver_id);
match resolved {
Ok(response) => return Ok(response),
// The sender was dropped without a value: report the session as closed.
Err(_) => return Ok(session_closed_response(session_id)),
}
}
result = &mut rpc => {
self.cleanup_pending_resolver(session_id, resolver_id);
result?
}
};
let response = match response {
ResponsePayload::SessionRpc(rpc) => {
serde_json::from_value::<JsonRpcResponse>(rpc.response).map_err(|err| {
ClientError::Sidecar(format!("malformed session rpc response: {err}"))
})?
}
ResponsePayload::Rejected(rejected) => {
return Err(ClientError::Kernel {
code: rejected.code,
message: rejected.message,
});
}
other => {
return Err(ClientError::Sidecar(format!(
"unexpected response to SessionRequest: {other:?}"
)));
}
};
// Best effort: a failed state refresh does not fail the request.
let _ = self.hydrate_session_state(session_id).await;
// Only successful responses update the cached mode / config values.
if response.error.is_none() {
self.apply_post_send_cache_updates(session_id, method, request_params.as_ref())?;
}
Ok(response)
}
/// Removes the resolver registered for `resolver_id` together with its
/// bookkeeping entry in `config_overrides`. A missing session is ignored.
fn cleanup_pending_resolver(&self, session_id: &str, resolver_id: i64) {
    let marker_key = format!("{PENDING_METHOD_PREFIX}{resolver_id}");
    let _ = self.require_session(session_id, |entry| {
        let _ = entry.pending_prompt_resolvers.remove(&resolver_id);
        let mut overrides = entry.config_overrides.lock();
        overrides.remove(&marker_key);
    });
}
/// Mirrors a successful request into the local session cache.
///
/// * `session/set_mode` with a string `modeId` updates the current mode id, but
///   only if mode state is already cached.
/// * `session/set_config_option` with string `configId` and `value` updates the
///   current value of every cached option with that id.
///
/// Other methods, or missing/non-string parameters, change nothing.
///
/// # Errors
/// `ClientError::SessionNotFound` when the session is not tracked.
fn apply_post_send_cache_updates(
    &self,
    session_id: &str,
    method: &str,
    params: Option<&Value>,
) -> std::result::Result<(), ClientError> {
    self.require_session(session_id, |entry| match method {
        "session/set_mode" => {
            let Some(mode_id) = params
                .and_then(|p| p.get("modeId"))
                .and_then(Value::as_str)
            else {
                return;
            };
            if let Some(state) = entry.modes.lock().as_mut() {
                state.current_mode_id = mode_id.to_string();
            }
        }
        "session/set_config_option" => {
            let config_id = params
                .and_then(|p| p.get("configId"))
                .and_then(Value::as_str);
            let new_value = params.and_then(|p| p.get("value")).and_then(Value::as_str);
            let (Some(config_id), Some(new_value)) = (config_id, new_value) else {
                return;
            };
            entry
                .config_options
                .lock()
                .iter_mut()
                .filter(|option| option.id == config_id)
                .for_each(|option| option.current_value = Some(new_value.to_string()));
        }
        _ => {}
    })
}
/// Sets the config option belonging to `category` to `value`.
///
/// * If the cached option for the category is marked read-only, an "unsupported"
///   error response is returned without contacting the agent.
/// * Otherwise `session/set_config_option` is sent, using the option's id or,
///   when no option is cached for the category, the category itself as `configId`.
/// * For codex sessions, a `-32601` error whose `data.method` names
///   `session/set_config_option` triggers the local fallback instead.
///
/// # Errors
/// Propagates errors from `require_session` and `send_session_request`.
async fn set_session_config_by_category(
    &self,
    session_id: &str,
    category: &str,
    value: &str,
) -> std::result::Result<JsonRpcResponse, ClientError> {
    let (read_only, config_id, agent_type) = self.require_session(session_id, |entry| {
        let options = entry.config_options.lock();
        let matching = options
            .iter()
            .find(|candidate| candidate.category.as_deref() == Some(category));
        let read_only = matches!(matching.and_then(|option| option.read_only), Some(true));
        let config_id = matching.map(|option| option.id.clone());
        (read_only, config_id, entry.agent_type.clone())
    })?;

    if read_only {
        return Ok(unsupported_config_response(&agent_type, category));
    }

    let config_id = match config_id {
        Some(id) => id,
        None => category.to_string(),
    };
    let request = json!({ "configId": config_id, "value": value });
    let response = self
        .send_session_request(session_id, "session/set_config_option", Some(request))
        .await?;

    let method_not_found = response.error.as_ref().map_or(false, |error| {
        let reported_method = error
            .data
            .as_ref()
            .and_then(|data| data.get("method"))
            .and_then(Value::as_str);
        error.code == -32601 && reported_method == Some("session/set_config_option")
    });
    if agent_type == "codex" && method_not_found {
        return self.require_session(session_id, |entry| {
            apply_codex_config_fallback(entry, category, value)
        });
    }

    Ok(response)
}
/// Returns the id and agent type of every session currently registered.
pub fn list_sessions(&self) -> Vec<SessionInfo> {
    let mut listed = Vec::new();
    self.inner().sessions.scan(|id, entry| {
        let info = SessionInfo {
            session_id: id.clone(),
            agent_type: entry.agent_type.clone(),
        };
        listed.push(info);
    });
    listed
}
/// Lists the built-in agents together with whether their adapter resolves.
///
/// For each id in `BUILTIN_AGENT_IDS` that has an `agent_config`, `installed`
/// is `true` when `resolve_package_bin` succeeds for the ACP adapter package,
/// searching from the configured `module_access_cwd` (or `"."` when unset).
pub fn list_agents(&self) -> Vec<AgentRegistryEntry> {
    let search_root = self
        .config()
        .module_access_cwd
        .clone()
        .unwrap_or_else(|| ".".to_string());
    let mut registry = Vec::new();
    for &agent_id in BUILTIN_AGENT_IDS.iter() {
        let Some(definition) = agent_config(agent_id) else {
            continue;
        };
        let installed = resolve_package_bin(&search_root, definition.acp_adapter, None).is_ok();
        registry.push(AgentRegistryEntry {
            id: agent_id.to_string(),
            acp_adapter: definition.acp_adapter.to_string(),
            agent_package: definition.agent_package.to_string(),
            installed,
        });
    }
    registry
}
/// Starts a new agent session of `agent_type` and registers it locally.
///
/// Steps, in order:
/// 1. Look up the agent definition and resolve its ACP adapter entrypoint.
/// 2. Prepare instruction arguments / environment via `prepare_instructions`.
/// 3. Build the environment: agent defaults, then prepared values, then
///    `options.env` (later layers overwrite earlier ones).
/// 4. For `pi` / `pi-cli`, fill in `PI_ACP_PI_COMMAND` from the agent package
///    when it is not already set and the `pi` binary resolves.
/// 5. Send `CreateSession` over the transport and register the result.
///
/// MCP server definitions that fail to serialize are skipped.
///
/// # Errors
///
/// Fails for an unknown agent type, an unresolvable adapter, instruction
/// preparation or transport failures, a rejected or unexpected response, or a
/// failure while registering the session.
pub async fn create_session(
    &self,
    agent_type: &str,
    options: CreateSessionOptions,
) -> Result<SessionId> {
    let Some(definition) = agent_config(agent_type) else {
        return Err(ClientError::Sidecar(format!("Unknown agent type: {agent_type}")).into());
    };
    let package_root = self
        .config()
        .module_access_cwd
        .clone()
        .unwrap_or_else(|| ".".to_string());
    let adapter_entrypoint = resolve_package_bin(&package_root, definition.acp_adapter, None)?;
    let (args, prepared_env) = self.prepare_instructions(agent_type, &options).await?;

    // Layer the environment: defaults < prepared instructions < caller overrides.
    let mut env: BTreeMap<String, String> = BTreeMap::new();
    for &(key, value) in definition.default_env {
        env.insert(key.to_string(), value.to_string());
    }
    env.extend(prepared_env);
    for (key, value) in &options.env {
        env.insert(key.clone(), value.clone());
    }

    let is_pi_agent = matches!(agent_type, "pi" | "pi-cli");
    if is_pi_agent && !env.contains_key("PI_ACP_PI_COMMAND") {
        if let Ok(pi_command) =
            resolve_package_bin(&package_root, definition.agent_package, Some("pi"))
        {
            env.insert("PI_ACP_PI_COMMAND".to_string(), pi_command);
        }
    }

    let cwd = match options.cwd.clone() {
        Some(dir) => dir,
        None => "/home/user".to_string(),
    };
    let mut mcp_servers: Vec<Value> = Vec::new();
    for server in options.mcp_servers.iter() {
        if let Ok(encoded) = serde_json::to_value(server) {
            mcp_servers.push(encoded);
        }
    }
    let client_capabilities = json!({
        "fs": { "readTextFile": true, "writeTextFile": true },
        "terminal": true,
    });

    let request = CreateSessionRequest {
        agent_type: agent_type.to_string(),
        runtime: GuestRuntimeKind::JavaScript,
        adapter_entrypoint,
        args,
        env,
        cwd,
        mcp_servers,
        protocol_version: crate::ACP_PROTOCOL_VERSION,
        client_capabilities,
    };
    let response = self
        .transport()
        .request(
            self.session_ownership(),
            RequestPayload::CreateSession(request),
        )
        .await?;

    let created: SessionCreatedResponse = match response {
        ResponsePayload::SessionCreated(payload) => payload,
        ResponsePayload::Rejected(rejected) => {
            let rejection = ClientError::Kernel {
                code: rejected.code,
                message: rejected.message,
            };
            return Err(rejection.into());
        }
        other => {
            let unexpected =
                ClientError::Sidecar(format!("unexpected create_session response: {other:?}"));
            return Err(unexpected.into());
        }
    };

    // Seed the local entry from the creation response; no events exist yet.
    let session_id = created.session_id.clone();
    let initial_state = SessionStateResponse {
        session_id: created.session_id,
        agent_type: agent_type.to_string(),
        process_id: String::new(),
        pid: created.pid,
        closed: false,
        modes: created.modes,
        config_options: created.config_options,
        agent_capabilities: created.agent_capabilities,
        agent_info: created.agent_info,
        events: Vec::new(),
    };
    self.register_session(&session_id, agent_type, &initial_state)
        .await?;
    Ok(SessionId { session_id })
}
/// Creates the local bookkeeping entry for a session and hydrates it.
///
/// The id is first dropped from the list of closed session ids. A fresh
/// `SessionEntry` is then built, seeded from `state` via `sync_session_state`,
/// and inserted into the session table before `hydrate_session_state` runs.
/// If hydration fails the session is removed from the table again.
///
/// # Errors
///
/// Returns the error produced by `hydrate_session_state`.
pub(crate) async fn register_session(
    &self,
    session_id: &str,
    agent_type: &str,
    state: &SessionStateResponse,
) -> std::result::Result<(), ClientError> {
    self.inner()
        .closed_session_ids
        .lock()
        .retain(|closed_id| closed_id != session_id);

    // The broadcast capacity is clamped to at least 1.
    let event_capacity = ACP_SESSION_EVENT_RETENTION_LIMIT.max(1);
    let (event_tx, _) = tokio::sync::broadcast::channel(event_capacity);
    let (permission_tx, _) = tokio::sync::broadcast::channel(64);

    let fresh_entry = SessionEntry {
        agent_type: agent_type.to_string(),
        modes: parking_lot::Mutex::new(None),
        config_options: parking_lot::Mutex::new(Vec::new()),
        capabilities: parking_lot::Mutex::new(None),
        agent_info: parking_lot::Mutex::new(None),
        config_overrides: parking_lot::Mutex::new(BTreeMap::new()),
        event_ring: parking_lot::Mutex::new(VecDeque::new()),
        highest_sequence_number: std::sync::atomic::AtomicI64::new(-1),
        event_tx,
        permission_tx,
        pending_permission_replies: scc::HashMap::new(),
        pending_prompt_resolvers: scc::HashMap::new(),
    };
    sync_session_state(&fresh_entry, state);
    let _ = self
        .inner()
        .sessions
        .insert(session_id.to_string(), fresh_entry);

    let hydration = self.hydrate_session_state(session_id).await;
    if hydration.is_err() {
        // Roll back the registration so a half-initialised session is not visible.
        let _ = self.inner().sessions.remove(session_id);
    }
    hydration
}
/// Assembles the instruction text handed to an agent.
///
/// Unless `skip_base` is set, the contents of `/etc/agentos/instructions.md`
/// (lossily decoded as UTF-8) come first, followed by `additional` when it is
/// present and non-empty. If neither contributes anything the result is an
/// empty string; otherwise a trailing `---` section is appended and the
/// sections are joined with blank lines.
///
/// # Errors
///
/// Fails when the base instructions file cannot be read.
async fn read_vm_instructions(
    &self,
    additional: Option<&str>,
    skip_base: bool,
) -> Result<String> {
    let mut sections: Vec<String> = Vec::new();
    if !skip_base {
        let bytes = self.read_file("/etc/agentos/instructions.md").await?;
        sections.push(String::from_utf8_lossy(&bytes).into_owned());
    }
    if let Some(extra) = additional.filter(|text| !text.is_empty()) {
        sections.push(extra.to_string());
    }
    if sections.is_empty() {
        return Ok(String::new());
    }
    sections.push("---".to_string());
    Ok(sections.join("\n\n"))
}
/// Works out how instructions are passed to the given agent type.
///
/// Returns `(args, env)`:
/// * `pi`, `pi-cli`, `claude`, `codex` — instructions are passed as a CLI flag
///   (`--append-developer-instructions` for `codex`, `--append-system-prompt`
///   otherwise) followed by the assembled text; nothing is added when the text
///   is empty or when the base is skipped and no additional text was given.
/// * `opencode` — a JSON list of context paths is exported through
///   `OPENCODE_CONTEXTPATHS`; non-empty additional instructions are written to
///   `/tmp/agentos-additional-instructions.md` and appended to that list.
/// * anything else — no arguments and no environment.
///
/// # Errors
///
/// Fails when reading the base instructions or writing the additional
/// instructions file fails.
async fn prepare_instructions(
    &self,
    agent_type: &str,
    options: &CreateSessionOptions,
) -> Result<(Vec<String>, BTreeMap<String, String>)> {
    let skip_base = options.skip_os_instructions;
    let extra = options.additional_instructions.as_deref();
    match agent_type {
        "pi" | "pi-cli" | "claude" | "codex" => {
            if skip_base && extra.is_none() {
                return Ok((Vec::new(), BTreeMap::new()));
            }
            let flag = match agent_type {
                "codex" => "--append-developer-instructions",
                _ => "--append-system-prompt",
            };
            let instructions = self.read_vm_instructions(extra, skip_base).await?;
            if instructions.is_empty() {
                Ok((Vec::new(), BTreeMap::new()))
            } else {
                Ok((vec![flag.to_string(), instructions], BTreeMap::new()))
            }
        }
        "opencode" => {
            let mut context_paths: Vec<String> = Vec::new();
            if !skip_base {
                context_paths.extend(OPENCODE_CONTEXT_PATHS.iter().copied().map(String::from));
            }
            if let Some(text) = extra.filter(|text| !text.is_empty()) {
                let path = "/tmp/agentos-additional-instructions.md";
                self.write_file(path, crate::fs::FileContent::Text(text.to_string()))
                    .await?;
                context_paths.push(path.to_string());
            }
            if context_paths.is_empty() {
                return Ok((Vec::new(), BTreeMap::new()));
            }
            let encoded_paths = serde_json::to_string(&context_paths).unwrap_or_default();
            let env = BTreeMap::from([("OPENCODE_CONTEXTPATHS".to_string(), encoded_paths)]);
            Ok((Vec::new(), env))
        }
        _ => Ok((Vec::new(), BTreeMap::new())),
    }
}
/// Returns a handle for a session that is already registered.
///
/// # Errors
///
/// Returns the `require_session` error when the session lookup fails.
pub fn resume_session(&self, session_id: &str) -> std::result::Result<SessionId, ClientError> {
    self.require_session(session_id, |_| ()).map(|()| SessionId {
        session_id: session_id.to_string(),
    })
}
/// Cancels any in-flight work on a session and then closes it.
///
/// The cancel step is best-effort: its outcome is ignored.
///
/// # Errors
///
/// Fails when the session lookup fails or when closing the session fails.
pub async fn destroy_session(&self, session_id: &str) -> Result<()> {
    self.require_session(session_id, |_| ())?;
    let _ = self.cancel_session(session_id).await;
    self.close_session_internal(session_id)
        .await
        .map_err(Into::into)
}
/// Sends a text prompt to the session and collects the agent's reply text.
///
/// While the `session/prompt` request is in flight, `agent_message_chunk`
/// updates are gathered from the session's event broadcast. After the request
/// completes, the receiver is drained and the event ring is re-read, so chunks
/// that were not received live are still picked up. Chunks are keyed by
/// sequence number, so an event seen through more than one path is stored once
/// and the final text is concatenated in sequence order.
///
/// # Errors
///
/// Fails when the session lookup fails or the `session/prompt` request fails.
/// The request error is only surfaced after event collection has finished.
pub async fn prompt(&self, session_id: &str, text: &str) -> Result<PromptResult> {
// Subscribe and take the cut-off sequence number while the ring lock is held.
// The cut-off is the newest ring event that is dispatched to session event
// handlers, or `i64::MIN` when the ring holds none.
let (mut rx, start_after) = self.require_session(session_id, |entry| {
let ring = entry.event_ring.lock();
let latest = ring
.iter()
.rev()
.find(|event| should_dispatch_to_session_event_handlers(&event.notification))
.map(|event| event.sequence_number)
.unwrap_or(i64::MIN);
(entry.event_tx.subscribe(), latest)
})?;
let mut chunks: BTreeMap<i64, String> = BTreeMap::new();
// Records `update.content.text` of `agent_message_chunk` updates that are
// newer than the cut-off; everything else is ignored.
let accumulate = |event: &SequencedEvent, chunks: &mut BTreeMap<i64, String>| {
if event.sequence_number <= start_after {
return;
}
let params = event.notification.params.clone().unwrap_or(Value::Null);
let update = params.get("update").cloned().unwrap_or(Value::Null);
if update.get("sessionUpdate").and_then(Value::as_str) == Some("agent_message_chunk") {
if let Some(chunk) = update
.get("content")
.and_then(|content| content.get("text"))
.and_then(Value::as_str)
{
chunks.insert(event.sequence_number, chunk.to_string());
}
}
};
let request = self.send_session_request(
session_id,
"session/prompt",
Some(json!({ "prompt": [{ "type": "text", "text": text }] })),
);
tokio::pin!(request);
// Drive the request and the event receiver together. `biased` polls the
// request first, so a finished request always ends the loop.
let response = loop {
tokio::select! {
biased;
result = &mut request => break result,
event = rx.recv() => {
match event {
Ok(event) => accumulate(&event, &mut chunks),
// Lagged events are not handled here; the ring re-read below covers them.
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {}
// No more events can arrive; just wait for the request itself.
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
break (&mut request).await;
}
}
}
}
};
// Drain whatever is still queued on the receiver without waiting.
loop {
match rx.try_recv() {
Ok(event) => accumulate(&event, &mut chunks),
Err(tokio::sync::broadcast::error::TryRecvError::Lagged(_)) => continue,
Err(tokio::sync::broadcast::error::TryRecvError::Empty)
| Err(tokio::sync::broadcast::error::TryRecvError::Closed) => break,
}
}
drop(rx);
// Re-read the ring after the cut-off to pick up events not seen above.
// A failed lookup here is ignored.
if let Ok(ring_events) = self.get_session_events(
session_id,
GetEventsOptions {
since: Some(start_after),
method: None,
},
) {
for event in &ring_events {
accumulate(event, &mut chunks);
}
}
let response = response?;
// BTreeMap iteration yields the chunks in ascending sequence order.
let agent_text: String = chunks.into_values().collect();
Ok(PromptResult {
response,
text: agent_text,
})
}
/// Cancels the work currently running in a session.
///
/// If a pending `session/prompt` request was resolved locally as cancelled,
/// the `session/cancel` request is sent from a background task and a
/// synthetic success response (`"via": "prompt-fallback"`) is returned right
/// away. Otherwise the `session/cancel` request is awaited and its response
/// returned.
///
/// # Errors
///
/// Fails when the session lookup fails or the awaited cancel request fails.
pub async fn cancel_session(&self, session_id: &str) -> Result<JsonRpcResponse> {
    self.require_session(session_id, |_| ())?;
    if !self.cancel_pending_prompt_requests(session_id)? {
        let response = self
            .send_session_request(session_id, "session/cancel", None)
            .await?;
        return Ok(response);
    }

    // The prompt caller has already been released; notify the agent in the background.
    let client = self.clone();
    let owned_id = session_id.to_string();
    tokio::spawn(async move {
        let _ = client
            .send_session_request(&owned_id, "session/cancel", None)
            .await;
    });
    let result = json!({
        "cancelled": true,
        "requested": true,
        "via": "prompt-fallback",
    });
    Ok(JsonRpcResponse {
        jsonrpc: "2.0".to_string(),
        id: Some(JsonRpcId::Null),
        result: Some(result),
        error: None,
    })
}
/// Resolves every pending `session/prompt` request of a session as cancelled.
///
/// Pending requests are found through the `PENDING_METHOD_PREFIX`-keyed
/// entries of the session's override map whose value is `session/prompt`.
/// Each matching resolver receives a `{"stopReason": "cancelled"}` result and
/// its override entry is removed.
///
/// Returns `true` when at least one resolver was found and resolved.
///
/// # Errors
///
/// Returns the `require_session` error when the session lookup fails.
fn cancel_pending_prompt_requests(
    &self,
    session_id: &str,
) -> std::result::Result<bool, ClientError> {
    self.require_session(session_id, |entry| {
        // Collect the ids first so the override lock is not held while resolving.
        let mut prompt_ids: Vec<i64> = Vec::new();
        {
            let overrides = entry.config_overrides.lock();
            prompt_ids.extend(
                overrides
                    .iter()
                    .filter(|(_, method)| *method == "session/prompt")
                    .filter_map(|(key, _)| key.strip_prefix(PENDING_METHOD_PREFIX))
                    .filter_map(|raw_id| raw_id.parse::<i64>().ok()),
            );
        }

        let mut any_cancelled = false;
        for resolver_id in prompt_ids {
            let override_key = format!("{PENDING_METHOD_PREFIX}{resolver_id}");
            if let Some((_, resolver)) = entry.pending_prompt_resolvers.remove(&resolver_id) {
                let cancelled_reply = JsonRpcResponse {
                    jsonrpc: "2.0".to_string(),
                    id: Some(JsonRpcId::Null),
                    result: Some(json!({ "stopReason": "cancelled" })),
                    error: None,
                };
                let _ = resolver.send(cancelled_reply);
                any_cancelled = true;
            }
            entry.config_overrides.lock().remove(&override_key);
        }
        any_cancelled
    })
}
/// Resolves every pending request of a session with a "session closed" reply.
///
/// Each registered resolver is removed and sent `session_closed_response`, and
/// the matching `PENDING_METHOD_PREFIX`-keyed override entry is dropped. The
/// call does nothing when the session cannot be looked up.
fn abort_pending_session_requests(&self, session_id: &str) {
    let _ = self.require_session(session_id, |entry| {
        let mut pending_ids: Vec<i64> = Vec::new();
        entry
            .pending_prompt_resolvers
            .scan(|resolver_id, _| pending_ids.push(*resolver_id));
        for resolver_id in pending_ids {
            let override_key = format!("{PENDING_METHOD_PREFIX}{resolver_id}");
            if let Some((_, resolver)) = entry.pending_prompt_resolvers.remove(&resolver_id) {
                let _ = resolver.send(session_closed_response(session_id));
            }
            entry.config_overrides.lock().remove(&override_key);
        }
    });
}
/// Discards every pending permission reply slot of a session.
///
/// The slots are removed from the map and dropped. The call does nothing when
/// the session cannot be looked up.
fn reject_pending_permission_replies(&self, session_id: &str) {
    let _ = self.require_session(session_id, |entry| {
        let mut stale_ids = Vec::new();
        entry
            .pending_permission_replies
            .scan(|permission_id, _| stale_ids.push(permission_id.clone()));
        stale_ids.iter().for_each(|permission_id| {
            let _ = entry.pending_permission_replies.remove(permission_id);
        });
    });
}
/// Requests that a session be closed, without waiting for the close to finish.
///
/// The session counts as known when it is registered, currently closing, or
/// remembered as already closed. For a known session the id is marked as
/// closing and the actual close runs on a spawned task, which clears the
/// closing mark when it is done; the outcome of that close is ignored.
///
/// # Errors
///
/// Returns [`ClientError::SessionNotFound`] for an unknown session id.
pub fn close_session(&self, session_id: &str) -> std::result::Result<(), ClientError> {
    let is_registered = self.inner().sessions.contains(session_id);
    let is_closing = || self.inner().closing_session_ids.contains(session_id);
    let was_closed = || {
        self.inner()
            .closed_session_ids
            .lock()
            .iter()
            .any(|id| id == session_id)
    };
    if !(is_registered || is_closing() || was_closed()) {
        return Err(ClientError::SessionNotFound(session_id.to_string()));
    }

    let owned_id = session_id.to_string();
    let _ = self.inner().closing_session_ids.insert(owned_id.clone());
    let client = self.clone();
    tokio::spawn(async move {
        let _ = client.close_session_internal(&owned_id).await;
        let _ = client.inner().closing_session_ids.remove(&owned_id);
    });
    Ok(())
}
/// Closes a session locally and asks the sidecar to close it as well.
///
/// Closing an id that is already remembered as closed is a no-op. Otherwise
/// pending requests are resolved with a "session closed" reply, pending
/// permission slots are dropped, the session is removed from the session
/// table, its id is appended to the bounded list of closed ids, and a
/// `CloseAgentSession` request is sent over the transport.
///
/// Removing the entry from the session table is what decides ownership of the
/// close: only the caller whose `remove` actually took the entry proceeds. A
/// separate `contains` check followed by `remove` would let two concurrent
/// closers both pass the check, both record the id as closed and both send
/// `CloseAgentSession`.
///
/// # Errors
///
/// * [`ClientError::SessionNotFound`] when the session is not registered (or
///   another caller removed it first).
/// * [`ClientError::Kernel`] when the sidecar rejects the request.
/// * [`ClientError::Sidecar`] for an unexpected response payload.
/// * Any transport error from sending the request.
pub(crate) async fn close_session_internal(
    &self,
    session_id: &str,
) -> std::result::Result<(), ClientError> {
    let already_closed = self
        .inner()
        .closed_session_ids
        .lock()
        .iter()
        .any(|id| id == session_id);
    if already_closed {
        return Ok(());
    }

    // Both helpers are no-ops when the session is not registered.
    self.abort_pending_session_requests(session_id);
    self.reject_pending_permission_replies(session_id);

    // Atomically claim the close: whoever removes the entry owns the rest.
    if self.inner().sessions.remove(session_id).is_none() {
        return Err(ClientError::SessionNotFound(session_id.to_string()));
    }

    {
        let mut closed = self.inner().closed_session_ids.lock();
        closed.push_back(session_id.to_string());
        // Keep only the most recent ids so the list stays bounded.
        while closed.len() > CLOSED_SESSION_ID_RETENTION_LIMIT {
            closed.pop_front();
        }
    }

    let response = self
        .transport()
        .request(
            self.session_ownership(),
            RequestPayload::CloseAgentSession(CloseAgentSessionRequest {
                session_id: session_id.to_string(),
            }),
        )
        .await?;
    match response {
        ResponsePayload::AgentSessionClosed(_) => Ok(()),
        ResponsePayload::Rejected(rejected) => Err(ClientError::Kernel {
            code: rejected.code,
            message: rejected.message,
        }),
        other => Err(ClientError::Sidecar(format!(
            "unexpected response to CloseAgentSession: {other:?}"
        ))),
    }
}
/// Returns the events retained in a session's event ring, in ring order.
///
/// `options.since` keeps only events with a strictly greater sequence number;
/// `options.method` keeps only notifications with that exact method. An unset
/// filter matches everything.
///
/// # Errors
///
/// Returns the `require_session` error when the session lookup fails.
pub fn get_session_events(
    &self,
    session_id: &str,
    options: GetEventsOptions,
) -> std::result::Result<Vec<SequencedEvent>, ClientError> {
    self.require_session(session_id, |entry| {
        let since = options.since;
        let wanted_method = options.method.as_ref();
        let mut selected = Vec::new();
        for event in entry.event_ring.lock().iter() {
            let is_newer = since.map_or(true, |cursor| event.sequence_number > cursor);
            let method_matches =
                wanted_method.map_or(true, |method| &event.notification.method == method);
            if is_newer && method_matches {
                selected.push(event.clone());
            }
        }
        selected
    })
}
/// Answers a permission request raised by the agent.
///
/// When a reply slot is pending for `permission_id`, the reply is delivered
/// through it and a synthetic response (`"via": "sidecar-request"`) is
/// returned. Otherwise the reply is forwarded to the agent with the legacy
/// `request/permission` method.
///
/// # Errors
///
/// Fails when the session lookup fails or the forwarded request fails.
pub async fn respond_permission(
    &self,
    session_id: &str,
    permission_id: &str,
    reply: PermissionReply,
) -> Result<JsonRpcResponse> {
    let pending_slot = self.require_session(session_id, |entry| {
        entry
            .pending_permission_replies
            .remove(permission_id)
            .map(|(_, slot)| slot)
    })?;

    let Some(slot) = pending_slot else {
        let params = json!({ "permissionId": permission_id, "reply": reply });
        let forwarded = self
            .send_session_request(session_id, LEGACY_PERMISSION_METHOD, Some(params))
            .await?;
        return Ok(forwarded);
    };

    // A dropped receiver just means nobody is waiting for the reply any more.
    let _ = slot.send(reply);
    let result = json!({
        "permissionId": permission_id,
        "reply": reply,
        "via": "sidecar-request",
    });
    Ok(JsonRpcResponse {
        jsonrpc: "2.0".to_string(),
        id: Some(JsonRpcId::Null),
        result: Some(result),
        error: None,
    })
}
/// Sends a `session/set_mode` request selecting `mode_id` for the session.
///
/// # Errors
///
/// Fails when sending the request fails.
pub async fn set_session_mode(
    &self,
    session_id: &str,
    mode_id: &str,
) -> Result<JsonRpcResponse> {
    let params = json!({ "modeId": mode_id });
    let response = self
        .send_session_request(session_id, "session/set_mode", Some(params))
        .await?;
    Ok(response)
}
/// Returns a copy of the cached mode state of a session.
///
/// Yields `None` when the session lookup fails or no mode state is cached.
pub fn get_session_modes(&self, session_id: &str) -> Option<SessionModeState> {
    match self.require_session(session_id, |entry| entry.modes.lock().clone()) {
        Ok(cached) => cached,
        Err(_) => None,
    }
}
/// Sets the session's model through the `model` config category.
///
/// # Errors
///
/// Fails when `set_session_config_by_category` fails.
pub async fn set_session_model(
    &self,
    session_id: &str,
    model: &str,
) -> Result<JsonRpcResponse> {
    self.set_session_config_by_category(session_id, "model", model)
        .await
        .map_err(Into::into)
}
/// Sets the session's thought level through the `thought_level` config category.
///
/// # Errors
///
/// Fails when `set_session_config_by_category` fails.
pub async fn set_session_thought_level(
    &self,
    session_id: &str,
    level: &str,
) -> Result<JsonRpcResponse> {
    self.set_session_config_by_category(session_id, "thought_level", level)
        .await
        .map_err(Into::into)
}
/// Returns a copy of the cached config options of a session.
///
/// Yields an empty list when the session lookup fails.
pub fn get_session_config_options(&self, session_id: &str) -> Vec<SessionConfigOption> {
    match self.require_session(session_id, |entry| entry.config_options.lock().clone()) {
        Ok(options) => options,
        Err(_) => Vec::new(),
    }
}
/// Returns a copy of the cached agent capabilities of a session.
///
/// Yields `None` when the session lookup fails, when nothing is cached, or
/// when `agent_capabilities_is_empty` reports the cached value as empty.
pub fn get_session_capabilities(&self, session_id: &str) -> Option<AgentCapabilities> {
    let cached = self
        .require_session(session_id, |entry| entry.capabilities.lock().clone())
        .ok()??;
    if agent_capabilities_is_empty(&cached) {
        None
    } else {
        Some(cached)
    }
}
/// Returns a copy of the cached agent info of a session.
///
/// Yields `None` when the session lookup fails or nothing is cached.
pub fn get_session_agent_info(&self, session_id: &str) -> Option<AgentInfo> {
    match self.require_session(session_id, |entry| entry.agent_info.lock().clone()) {
        Ok(cached) => cached,
        Err(_) => None,
    }
}
/// Sends an arbitrary JSON-RPC request to a session and returns its response.
///
/// # Errors
///
/// Fails when sending the request fails.
pub async fn raw_session_send(
    &self,
    session_id: &str,
    method: &str,
    params: Option<Value>,
) -> Result<JsonRpcResponse> {
    self.send_session_request(session_id, method, params)
        .await
        .map_err(Into::into)
}
/// Alias for [`Self::raw_session_send`]; forwards all arguments unchanged.
///
/// # Errors
///
/// Fails when `raw_session_send` fails.
pub async fn raw_send(
    &self,
    session_id: &str,
    method: &str,
    params: Option<Value>,
) -> Result<JsonRpcResponse> {
    let response = self.raw_session_send(session_id, method, params).await?;
    Ok(response)
}
/// Subscribes to the notifications of a session.
///
/// The returned stream is built by `subscribe_with_replay` from the
/// dispatchable events currently held in the event ring plus a live broadcast
/// receiver. The ring snapshot and the subscription are taken under the same
/// ring lock. The accompanying [`Subscription`] is a no-op handle.
///
/// # Errors
///
/// Returns the `require_session` error when the session lookup fails.
pub fn on_session_event(
    &self,
    session_id: &str,
) -> std::result::Result<
    (Pin<Box<dyn Stream<Item = JsonRpcNotification> + Send>>, Subscription),
    ClientError,
> {
    let (replay, live) = self.require_session(session_id, |entry| {
        let ring = entry.event_ring.lock();
        let mut replay: VecDeque<SequencedEvent> = VecDeque::new();
        for event in ring.iter() {
            if should_dispatch_to_session_event_handlers(&event.notification) {
                replay.push_back(event.clone());
            }
        }
        (replay, entry.event_tx.subscribe())
    })?;
    let sequenced = subscribe_with_replay(replay, live, i64::MIN, true);
    let notifications = futures::StreamExt::map(sequenced, |event| event.notification);
    Ok((Box::pin(notifications), Subscription::noop()))
}
pub fn on_permission_request(
&self,
session_id: &str,
) -> std::result::Result<
(Pin<Box<dyn Stream<Item = PermissionRequest> + Send>>, Subscription),
ClientError,
> {
let rx = self.require_session(session_id, |entry| entry.permission_tx.subscribe())?;
let stream = futures::stream::unfold(rx, move |mut rx| async move {
loop {
match rx.recv().await {
Ok(request) => return Some((request, rx)),
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
Err(tokio::sync::broadcast::error::RecvError::Closed) => return None,
}
}
});
Ok((Box::pin(stream), Subscription::noop()))
}
/// Delivers a permission notification to subscribers and waits for a reply.
///
/// The outcome is `PermissionReply::Reject` when the notification cannot be
/// turned into a request, when the session lookup fails, when nobody is
/// subscribed to permission requests, when the reply slot is dropped without
/// an answer, or when no reply arrives within `PERMISSION_TIMEOUT_MS`.
/// Otherwise it is the reply that was sent through the registered slot.
///
/// The returned `PermissionDelivery` is built with the delivered params and
/// a flag saying whether the notification used the ACP permission method.
pub(crate) async fn deliver_permission_request(
&self,
session_id: &str,
notification: &JsonRpcNotification,
) -> PermissionDelivery {
let is_acp = notification.method == ACP_PERMISSION_METHOD;
let Some((permission_id, request, delivered_params, responder_rx)) =
build_permission_request(notification)
else {
return PermissionDelivery::new(PermissionReply::Reject, is_acp, &Value::Null);
};
// `slot_tx` is parked in the session; whoever answers (see `respond_permission`)
// removes it and sends the reply, which wakes `slot_rx` below.
let (slot_tx, slot_rx) = tokio::sync::oneshot::channel::<PermissionReply>();
let registered = self.require_session(session_id, |entry| {
// Without any subscriber nobody could ever answer, so do not register.
if entry.permission_tx.receiver_count() == 0 {
return false;
}
let _ = entry
.pending_permission_replies
.insert(permission_id.clone(), slot_tx);
let _ = entry.permission_tx.send(request);
true
});
match registered {
Ok(true) => {}
Ok(false) | Err(_) => {
return PermissionDelivery::new(PermissionReply::Reject, is_acp, &delivered_params)
}
}
// Bridge replies arriving on `responder_rx` into `respond_permission`.
// The task is detached and its result is ignored.
let this = self.clone();
let session_owned = session_id.to_string();
let permission_owned = permission_id.clone();
tokio::spawn(async move {
if let Ok(reply) = responder_rx.await {
let _ = this
.respond_permission(&session_owned, &permission_owned, reply)
.await;
}
});
let timeout =
tokio::time::sleep(std::time::Duration::from_millis(PERMISSION_TIMEOUT_MS));
tokio::pin!(timeout);
let reply = tokio::select! {
// A dropped sender (no answer will ever come) counts as a rejection.
reply = slot_rx => reply.unwrap_or(PermissionReply::Reject),
_ = &mut timeout => {
// Timed out: unregister the slot and reject.
let _ = self.require_session(session_id, |entry| {
let _ = entry.pending_permission_replies.remove(&permission_id);
});
PermissionReply::Reject
}
};
PermissionDelivery::new(reply, is_acp, &delivered_params)
}
}
/// Outcome of delivering a permission request: the reply that was decided on
/// together with the JSON result payload derived from it.
#[derive(Debug, Clone, PartialEq)]
pub struct PermissionDelivery {
/// The reply chosen for the permission request.
pub reply: PermissionReply,
/// JSON payload built from `reply` by `PermissionDelivery::new`.
pub result: Value,
}
impl PermissionDelivery {
    /// Builds a delivery outcome for `reply`.
    ///
    /// For ACP requests the result payload comes from
    /// `build_acp_permission_result(reply, params)`; otherwise it is the JSON
    /// object `{"reply": reply}` and `params` is not used.
    fn new(reply: PermissionReply, is_acp: bool, params: &Value) -> Self {
        if !is_acp {
            let result = json!({ "reply": reply });
            return Self { reply, result };
        }
        let result = build_acp_permission_result(reply, params);
        Self { reply, result }
    }
}