#![allow(clippy::too_many_lines)]
#![allow(clippy::significant_drop_tightening)]
use crate::agent::{
AbortHandle, AbortSignal, AgentEvent, AgentSession, ToolApprovalDecision, ToolApprovalHandler,
ToolApprovalRequest,
};
use crate::agent_cx::AgentCx;
use crate::auth::AuthStorage;
use crate::compaction::ResolvedCompactionSettings;
use crate::config::Config;
use crate::error::{Error, Result};
use crate::model::{AssistantMessageEvent, ContentBlock};
use crate::models::ModelEntry;
use crate::provider::StreamOptions;
use crate::provider_metadata::provider_ids_match;
use crate::providers;
use crate::session::Session;
use crate::tools::ToolRegistry;
use asupersync::channel::oneshot;
use asupersync::runtime::RuntimeHandle;
use asupersync::sync::Mutex;
use asupersync::time::{timeout, wall_now};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use std::collections::HashMap;
use std::io::{self, BufRead, Write};
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::Duration;
/// Incoming JSON-RPC message, deserialized from one line of client input.
#[derive(Debug, Clone, Deserialize)]
struct JsonRpcRequest {
    /// Protocol version string; the dispatch loop rejects anything but `"2.0"`.
    jsonrpc: String,
    /// Request identifier; `None` when the message carries no `id`.
    id: Option<Value>,
    /// Method name used by the dispatch loop to select a handler.
    method: String,
    /// Method parameters; takes the `Value` default when the field is omitted.
    #[serde(default)]
    params: Value,
}
/// Outgoing JSON-RPC response envelope.
///
/// The helpers in this file populate exactly one of `result` / `error`; the
/// unset one is omitted from the serialized output.
#[derive(Debug, Clone, Serialize)]
struct JsonRpcResponse {
    /// Protocol version string written to the wire.
    jsonrpc: String,
    /// Identifier echoed back from the request being answered.
    id: Value,
    /// Success payload; skipped during serialization when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    result: Option<Value>,
    /// Failure payload; skipped during serialization when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<JsonRpcError>,
}
/// Outgoing JSON-RPC notification: a message without an `id` field.
#[derive(Debug, Clone, Serialize)]
struct JsonRpcNotification {
    /// Protocol version string written to the wire.
    jsonrpc: String,
    /// Notification method name (for example `session/update`).
    method: String,
    /// Notification payload.
    params: Value,
}
/// Error object embedded in a [`JsonRpcResponse`].
#[derive(Debug, Clone, Serialize)]
struct JsonRpcError {
    /// Numeric error code (see the `*_ERROR` / `*_NOT_FOUND` constants in this file).
    code: i64,
    /// Human-readable description of the failure.
    message: String,
    /// Optional extra detail; skipped during serialization when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    data: Option<Value>,
}
// Error codes predefined by the JSON-RPC 2.0 specification.
/// The received line was not valid JSON.
const PARSE_ERROR: i64 = -32700;
/// The JSON was valid but is not an acceptable request object.
const INVALID_REQUEST: i64 = -32600;
/// The requested method is not handled by this server.
const METHOD_NOT_FOUND: i64 = -32601;
/// A required parameter is missing or has an unusable value.
const INVALID_PARAMS: i64 = -32602;
/// The server failed while processing an otherwise valid request.
const INTERNAL_ERROR: i64 = -32603;
// Server-specific codes (inside the -32000..=-32099 range the specification
// reserves for implementation-defined server errors).
/// The referenced session id is not registered with this server.
const SESSION_NOT_FOUND: i64 = -32001;
/// The session already has a prompt running.
const PROMPT_IN_PROGRESS: i64 = -32002;
/// Serializes a successful JSON-RPC 2.0 response for request `id` carrying `result`.
///
/// # Panics
/// Panics if the response cannot be serialized to JSON.
fn json_rpc_ok(id: Value, result: Value) -> String {
    let response = JsonRpcResponse {
        jsonrpc: String::from("2.0"),
        id,
        result: Some(result),
        error: None,
    };
    serde_json::to_string(&response).expect("serialize json-rpc response")
}
fn json_rpc_error(id: Value, code: i64, message: impl Into<String>) -> String {
serde_json::to_string(&JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id,
result: None,
error: Some(JsonRpcError {
code,
message: message.into(),
data: None,
}),
})
.expect("serialize json-rpc error")
}
/// Serializes a JSON-RPC 2.0 notification (no `id`) for `method` with `params`.
///
/// # Panics
/// Panics if the notification cannot be serialized to JSON.
fn json_rpc_notification(method: &str, params: Value) -> String {
    let notification = JsonRpcNotification {
        jsonrpc: String::from("2.0"),
        method: method.to_owned(),
        params,
    };
    serde_json::to_string(&notification).expect("serialize json-rpc notification")
}
/// Registry of live sessions keyed by session id; each session has its own lock.
type AcpSessionsMap = Arc<Mutex<HashMap<String, Arc<Mutex<AcpSessionState>>>>>;
/// Outstanding permission requests keyed by the serialized JSON-RPC id of the
/// request; the sender delivers the client's reply to the waiting task.
type PendingPermissionMap = Arc<StdMutex<HashMap<String, oneshot::Sender<Value>>>>;
/// `optionId` offered to the client for approving a single tool call.
const ACP_PERMISSION_ALLOW_ONCE: &str = "allow-once";
/// `optionId` offered to the client for rejecting a single tool call.
const ACP_PERMISSION_REJECT_ONCE: &str = "reject-once";
/// How long a permission request waits for the client's answer, in milliseconds.
const ACP_PERMISSION_TIMEOUT_MS: u64 = 120_000;
/// Model descriptor advertised to the client (serialized with camelCase keys).
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct AcpModel {
    /// Model identifier.
    id: String,
    /// Display name of the model.
    name: String,
    /// Provider the model belongs to; skipped during serialization when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    provider: Option<String>,
}
/// Session mode descriptor advertised to the client (serialized with camelCase keys).
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct AcpMode {
    /// Short machine-readable identifier of the mode.
    slug: String,
    /// Display name of the mode.
    name: String,
    /// One-line explanation of what the mode does.
    description: String,
}
/// Per-session state stored in the session registry.
struct AcpSessionState {
    /// The agent session; `run_prompt` takes it out while a prompt runs and
    /// puts it back afterwards, so it is `None` for the duration of a prompt.
    agent_session: Option<AgentSession>,
    /// Working directory of the session; file requests are validated against it.
    cwd: PathBuf,
}
/// Everything the ACP server needs from its host application.
#[derive(Clone)]
pub struct AcpOptions {
    /// Application configuration (default model/provider, compaction, hooks, ...).
    pub config: Config,
    /// Models that can be selected for new sessions and are advertised to clients.
    pub available_models: Vec<ModelEntry>,
    /// Credential store consulted when resolving the provider API key.
    pub auth: AuthStorage,
    /// Runtime handle used to spawn prompt tasks.
    pub runtime_handle: RuntimeHandle,
}
/// Sends tool-permission requests to the client and waits for the answers.
#[derive(Clone)]
struct AcpPermissionClient {
    /// Outgoing line channel shared with the rest of the server.
    out_tx: std::sync::mpsc::SyncSender<String>,
    /// Registry of requests still waiting for a reply.
    pending: PendingPermissionMap,
    /// Source of sequence numbers for request ids.
    request_counter: Arc<AtomicU64>,
    /// Maximum time to wait for the client's answer.
    timeout: Duration,
    /// Context passed to the reply channel when receiving.
    cx: AgentCx,
}
/// Guard that unregisters a pending permission request when dropped.
struct PendingPermissionGuard {
    /// Registry the entry was inserted into.
    pending: PendingPermissionMap,
    /// Key of the entry to remove on drop.
    key: String,
}
impl Drop for PendingPermissionGuard {
    /// Removes this guard's entry from the registry.
    ///
    /// If the registry lock cannot be acquired the entry is left in place.
    fn drop(&mut self) {
        let Ok(mut registry) = self.pending.lock() else {
            return;
        };
        registry.remove(&self.key);
    }
}
impl AcpPermissionClient {
    /// Builds a tool-approval callback bound to `session_id`.
    ///
    /// Every invocation forwards the approval request to the client through
    /// [`Self::request_permission`].
    fn handler_for_session(&self, session_id: String) -> ToolApprovalHandler {
        let shared = self.clone();
        Arc::new(move |request: ToolApprovalRequest| {
            let client = shared.clone();
            let session_id = session_id.clone();
            Box::pin(async move { client.request_permission(&session_id, request).await })
        })
    }

    /// Asks the client whether the tool call described by `request` may run.
    ///
    /// The request is registered under a fresh id, written to the output
    /// channel, and the reply is awaited for at most `self.timeout`. Any
    /// failure along the way (registry unavailable, output channel closed,
    /// reply channel closed, timeout) results in a denial.
    async fn request_permission(
        &self,
        session_id: &str,
        request: ToolApprovalRequest,
    ) -> ToolApprovalDecision {
        let sequence = self.request_counter.fetch_add(1, Ordering::SeqCst);
        let request_id = Value::String(format!("pi-tool-permission-{sequence}"));
        let request_key = json_rpc_id_key(&request_id);

        let (reply_tx, mut reply_rx) = oneshot::channel();
        match self.pending.lock() {
            Ok(mut registry) => {
                registry.insert(request_key.clone(), reply_tx);
            }
            Err(_) => {
                return ToolApprovalDecision::deny("permission request registry unavailable");
            }
        }
        // From here on the registry entry is removed on every exit path.
        let _pending_guard = PendingPermissionGuard {
            pending: Arc::clone(&self.pending),
            key: request_key,
        };

        let request_line = json_rpc_permission_request(&request_id, session_id, &request);
        if self.out_tx.send(request_line).is_err() {
            return ToolApprovalDecision::deny("permission request client disconnected");
        }

        let waited = timeout(
            wall_now(),
            self.timeout,
            Box::pin(reply_rx.recv(self.cx.cx())),
        )
        .await;
        let Ok(received) = waited else {
            return ToolApprovalDecision::deny("permission request timed out");
        };
        received.map_or_else(
            |_| ToolApprovalDecision::deny("permission response channel closed"),
            |value| permission_response_to_decision(&value),
        )
    }
}
/// Runs the ACP server over the process's stdin and stdout.
///
/// Two helper threads are spawned: one reads newline-delimited messages from
/// stdin and forwards each non-empty trimmed line to the server loop, the
/// other writes every outgoing line to stdout followed by a newline and a
/// flush. The server loop itself runs in [`run`].
///
/// # Errors
/// Returns whatever [`run`] returns.
pub async fn run_stdio(options: AcpOptions) -> Result<()> {
    let (in_tx, in_rx) = asupersync::channel::mpsc::channel::<String>(256);
    let (out_tx, out_rx) = std::sync::mpsc::sync_channel::<String>(1024);

    // Reader thread: stdin -> in_tx.
    std::thread::spawn(move || {
        let stdin = io::stdin();
        let mut reader = io::BufReader::new(stdin.lock());
        let mut buffer = String::new();
        loop {
            buffer.clear();
            // Stop on end of input or on a read error.
            if !matches!(reader.read_line(&mut buffer), Ok(read) if read > 0) {
                break;
            }
            let payload = buffer.trim();
            if payload.is_empty() {
                continue;
            }
            let mut outgoing = payload.to_string();
            // Retry with a short sleep while the channel is full; give up for
            // good on any other send error.
            loop {
                match in_tx.try_send(outgoing) {
                    Ok(()) => break,
                    Err(asupersync::channel::mpsc::SendError::Full(returned)) => {
                        outgoing = returned;
                        std::thread::sleep(std::time::Duration::from_millis(10));
                    }
                    Err(_) => return,
                }
            }
        }
    });

    // Writer thread: out_rx -> stdout, one flushed line per message.
    std::thread::spawn(move || {
        let stdout = io::stdout();
        let mut writer = io::BufWriter::new(stdout.lock());
        let mut emit = |message: &str| -> io::Result<()> {
            writer.write_all(message.as_bytes())?;
            writer.write_all(b"\n")?;
            writer.flush()
        };
        for message in out_rx {
            if emit(&message).is_err() {
                break;
            }
        }
    });

    run(options, in_rx, out_tx).await
}
async fn run(
options: AcpOptions,
mut in_rx: asupersync::channel::mpsc::Receiver<String>,
out_tx: std::sync::mpsc::SyncSender<String>,
) -> Result<()> {
let cx = AgentCx::for_current_or_request();
let sessions: AcpSessionsMap = Arc::new(Mutex::new(HashMap::new()));
let prompt_counter = Arc::new(AtomicU64::new(0));
let permission_counter = Arc::new(AtomicU64::new(0));
let pending_permissions: PendingPermissionMap = Arc::new(StdMutex::new(HashMap::new()));
let active_prompts: Arc<Mutex<HashMap<String, AbortHandle>>> =
Arc::new(Mutex::new(HashMap::new()));
let initialized = Arc::new(AtomicBool::new(false));
while let Ok(line) = in_rx.recv(&cx).await {
let raw_message: Value = match serde_json::from_str(&line) {
Ok(value) => value,
Err(err) => {
let _ = out_tx.send(json_rpc_error(
Value::Null,
PARSE_ERROR,
format!("Parse error: {err}"),
));
continue;
}
};
if raw_message.get("method").is_none() {
let _ = route_permission_response(&raw_message, &pending_permissions, &cx);
continue;
}
let request: JsonRpcRequest = match serde_json::from_value(raw_message) {
Ok(req) => req,
Err(err) => {
let _ = out_tx.send(json_rpc_error(
Value::Null,
INVALID_REQUEST,
format!("Invalid request: {err}"),
));
continue;
}
};
if request.jsonrpc != "2.0" {
if let Some(ref id) = request.id {
let _ = out_tx.send(json_rpc_error(
id.clone(),
INVALID_REQUEST,
"Expected jsonrpc version 2.0",
));
}
continue;
}
let id = request.id.clone().unwrap_or(Value::Null);
match request.method.as_str() {
"initialize" => {
let result = handle_initialize();
initialized.store(true, Ordering::SeqCst);
let _ = out_tx.send(json_rpc_ok(id, result));
}
"initialized" => {}
"shutdown" => {
let _ = out_tx.send(json_rpc_ok(id, json!(null)));
}
"exit" => {
break;
}
"session/new" => {
if !initialized.load(Ordering::SeqCst) {
let _ = out_tx.send(json_rpc_error(
id,
INVALID_REQUEST,
"Server not initialized. Call 'initialize' first.",
));
continue;
}
let permission_client = AcpPermissionClient {
out_tx: out_tx.clone(),
pending: Arc::clone(&pending_permissions),
request_counter: Arc::clone(&permission_counter),
timeout: acp_permission_timeout(),
cx: cx.clone(),
};
match handle_session_new(&request.params, &options, Some(&permission_client)) {
Ok((session_id, state)) => {
let models: Vec<AcpModel> = options
.available_models
.iter()
.map(|entry| AcpModel {
id: entry.model.id.clone(),
name: entry.model.name.clone(),
provider: Some(entry.model.provider.clone()),
})
.collect();
let modes = vec![
AcpMode {
slug: "agent".to_string(),
name: "Agent".to_string(),
description: "Full autonomous coding agent with tool access"
.to_string(),
},
AcpMode {
slug: "chat".to_string(),
name: "Chat".to_string(),
description: "Conversational mode without tool execution"
.to_string(),
},
];
let state_arc = Arc::new(Mutex::new(state));
if let Ok(mut guard) = sessions.lock(&cx).await {
guard.insert(session_id.clone(), state_arc);
}
let _ = out_tx.send(json_rpc_ok(
id,
json!({
"sessionId": session_id,
"models": models,
"modes": modes,
}),
));
}
Err(err) => {
let _ = out_tx.send(json_rpc_error(
id,
INTERNAL_ERROR,
format!("Failed to create session: {err}"),
));
}
}
}
"session/prompt" => {
if !initialized.load(Ordering::SeqCst) {
let _ = out_tx.send(json_rpc_error(
id,
INVALID_REQUEST,
"Server not initialized",
));
continue;
}
let session_id = request
.params
.get("sessionId")
.and_then(Value::as_str)
.map(String::from);
let Some(session_id) = session_id else {
let _ = out_tx.send(json_rpc_error(
id,
INVALID_PARAMS,
"Missing required parameter: sessionId",
));
continue;
};
let prompt_blocks = request.params.get("prompt").and_then(Value::as_array);
let Some(prompt_blocks) = prompt_blocks else {
let _ = out_tx.send(json_rpc_error(
id,
INVALID_PARAMS,
"Missing required parameter: prompt (expected array of ContentBlock)",
));
continue;
};
let message_text = match extract_prompt_text(prompt_blocks) {
Ok(text) => text,
Err(err) => {
let _ = out_tx.send(json_rpc_error(id, INVALID_PARAMS, err));
continue;
}
};
let session_state = {
sessions
.lock(&cx)
.await
.map_or_else(|_| None, |guard| guard.get(&session_id).cloned())
};
let Some(session_state) = session_state else {
let _ = out_tx.send(json_rpc_error(
id,
SESSION_NOT_FOUND,
format!("Session not found: {session_id}"),
));
continue;
};
{
let has_active = active_prompts
.lock(&cx)
.await
.is_ok_and(|guard| guard.contains_key(&session_id));
if has_active {
let _ = out_tx.send(json_rpc_error(
id,
PROMPT_IN_PROGRESS,
format!("Session {session_id} already has an active prompt"),
));
continue;
}
}
let _ = prompt_counter.fetch_add(1, Ordering::SeqCst);
let (abort_handle, abort_signal) = AbortHandle::new();
if let Ok(mut guard) = active_prompts.lock(&cx).await {
guard.insert(session_id.clone(), abort_handle);
}
let out_tx_prompt = out_tx.clone();
let active_prompts_cleanup = Arc::clone(&active_prompts);
let prompt_cx = cx.clone();
let prompt_session_id = session_id.clone();
let response_id = id.clone();
options.runtime_handle.spawn(async move {
let stop_reason = run_prompt(
session_state,
message_text,
abort_signal,
out_tx_prompt.clone(),
prompt_session_id.clone(),
prompt_cx.clone(),
)
.await;
if let Ok(mut guard) = active_prompts_cleanup.lock(&prompt_cx).await {
guard.remove(&prompt_session_id);
}
let _ = out_tx_prompt.send(json_rpc_ok(
response_id,
json!({ "stopReason": stop_reason }),
));
});
}
"session/cancel" => {
let session_id_opt = request
.params
.get("sessionId")
.and_then(Value::as_str)
.map(String::from);
let Some(session_id) = session_id_opt else {
if request.id.is_some() {
let _ = out_tx.send(json_rpc_error(
id,
INVALID_PARAMS,
"Missing required parameter: sessionId",
));
}
continue;
};
if let Ok(guard) = active_prompts.lock(&cx).await {
if let Some(handle) = guard.get(&session_id) {
handle.abort();
}
}
if request.id.is_some() {
let _ = out_tx.send(json_rpc_ok(id, json!({})));
}
}
"session/list" => {
let session_list: Vec<Value> = sessions.lock(&cx).await.map_or_else(
|_| Vec::new(),
|guard| {
guard
.keys()
.map(|sid| json!({ "sessionId": sid }))
.collect()
},
);
let _ = out_tx.send(json_rpc_ok(id, json!({ "sessions": session_list })));
}
"session/load" => {
let session_id = request
.params
.get("sessionId")
.and_then(Value::as_str)
.map(String::from);
let Some(session_id) = session_id else {
let _ = out_tx.send(json_rpc_error(
id,
INVALID_PARAMS,
"Missing required parameter: sessionId",
));
continue;
};
let exists = sessions
.lock(&cx)
.await
.is_ok_and(|guard| guard.contains_key(&session_id));
if exists {
let models: Vec<AcpModel> = options
.available_models
.iter()
.map(|entry| AcpModel {
id: entry.model.id.clone(),
name: entry.model.name.clone(),
provider: Some(entry.model.provider.clone()),
})
.collect();
let _ = out_tx.send(json_rpc_ok(
id,
json!({
"sessionId": session_id,
"models": models,
}),
));
} else {
let _ = out_tx.send(json_rpc_error(
id,
SESSION_NOT_FOUND,
format!("Session not found: {session_id}"),
));
}
}
"session/resume" => {
let session_id = request
.params
.get("sessionId")
.and_then(Value::as_str)
.map(String::from);
let Some(session_id) = session_id else {
let _ = out_tx.send(json_rpc_error(
id,
INVALID_PARAMS,
"Missing required parameter: sessionId",
));
continue;
};
let exists = sessions
.lock(&cx)
.await
.is_ok_and(|guard| guard.contains_key(&session_id));
if exists {
let _ = out_tx.send(json_rpc_ok(
id,
json!({
"sessionId": session_id,
"resumed": true,
}),
));
} else {
let _ = out_tx.send(json_rpc_error(
id,
SESSION_NOT_FOUND,
format!("Session not found: {session_id}"),
));
}
}
"read_text_file" => {
let path_str = match request.params.get("path").and_then(Value::as_str) {
Some(p) if !p.is_empty() => p,
_ => {
let _ = out_tx.send(json_rpc_error(
id,
INVALID_PARAMS,
"Missing or empty required parameter: path",
));
continue;
}
};
let session_id = request.params.get("sessionId").and_then(Value::as_str);
if let Err(msg) = validate_file_path(path_str, session_id, &sessions, &cx).await {
let _ = out_tx.send(json_rpc_error(id, INVALID_PARAMS, msg));
continue;
}
let max_bytes = 10 * 1024 * 1024; match asupersync::fs::metadata(path_str).await {
Ok(meta) if meta.len() > max_bytes => {
let _ = out_tx.send(json_rpc_error(
id,
INTERNAL_ERROR,
format!(
"File too large ({} bytes). Maximum allowed via ACP is {} bytes.",
meta.len(),
max_bytes
),
));
continue;
}
_ => {}
}
match asupersync::fs::read(path_str).await {
Ok(bytes) => {
let contents = String::from_utf8_lossy(&bytes).into_owned();
let _ = out_tx.send(json_rpc_ok(id, json!({ "contents": contents })));
}
Err(err) => {
let _ = out_tx.send(json_rpc_error(
id,
INTERNAL_ERROR,
format!("Failed to read file: {err}"),
));
}
}
}
"write_text_file" => {
let path_str = match request.params.get("path").and_then(Value::as_str) {
Some(p) if !p.is_empty() => p,
_ => {
let _ = out_tx.send(json_rpc_error(
id,
INVALID_PARAMS,
"Missing or empty required parameter: path",
));
continue;
}
};
let Some(contents) = request.params.get("contents").and_then(Value::as_str) else {
let _ = out_tx.send(json_rpc_error(
id,
INVALID_PARAMS,
"Missing required parameter: contents",
));
continue;
};
let session_id = request.params.get("sessionId").and_then(Value::as_str);
if let Err(msg) = validate_file_path(path_str, session_id, &sessions, &cx).await {
let _ = out_tx.send(json_rpc_error(id, INVALID_PARAMS, msg));
continue;
}
match asupersync::fs::write(path_str, contents.as_bytes()).await {
Ok(()) => {
let _ = out_tx.send(json_rpc_ok(id, json!({ "success": true })));
}
Err(err) => {
let _ = out_tx.send(json_rpc_error(
id,
INTERNAL_ERROR,
format!("Failed to write file: {err}"),
));
}
}
}
_ => {
let _ = out_tx.send(json_rpc_error(
id,
METHOD_NOT_FOUND,
format!("Method not found: {}", request.method),
));
}
}
}
Ok(())
}
/// Returns how long a permission request waits for the client's answer.
const fn acp_permission_timeout() -> Duration {
    const WAIT: Duration = Duration::from_millis(ACP_PERMISSION_TIMEOUT_MS);
    WAIT
}
/// Turns a JSON-RPC id into the string key used by the pending-permission registry.
///
/// The key is the id's JSON serialization, falling back to its `Display`
/// rendering should serialization fail.
fn json_rpc_id_key(id: &Value) -> String {
    match serde_json::to_string(id) {
        Ok(encoded) => encoded,
        Err(_) => id.to_string(),
    }
}
fn route_permission_response(
message: &Value,
pending: &PendingPermissionMap,
cx: &AgentCx,
) -> bool {
if message.get("jsonrpc").and_then(Value::as_str) != Some("2.0") {
return false;
}
let Some(id) = message.get("id") else {
return false;
};
let key = json_rpc_id_key(id);
let response = message
.get("result")
.cloned()
.or_else(|| message.get("error").map(|error| json!({ "error": error })))
.unwrap_or(Value::Null);
let sender = pending.lock().ok().and_then(|mut guard| guard.remove(&key));
sender.is_some_and(|sender| {
let _ = sender.send(cx.cx(), response);
true
})
}
/// Serializes the `session/request_permission` request sent to the client for
/// one tool call.
///
/// The client is offered exactly two options: allow once and reject once.
///
/// # Panics
/// Panics if the request cannot be serialized to JSON.
fn json_rpc_permission_request(
    request_id: &Value,
    session_id: &str,
    request: &ToolApprovalRequest,
) -> String {
    let tool_call = json!({
        "sessionUpdate": "tool_call_update",
        "toolCallId": request.tool_call_id,
        "title": request.tool_name,
        "kind": classify_tool_kind(&request.tool_name),
        "status": "pending",
        "rawInput": request.arguments,
    });
    let choices = json!([
        {
            "optionId": ACP_PERMISSION_ALLOW_ONCE,
            "name": "Allow once",
            "kind": "allow_once",
        },
        {
            "optionId": ACP_PERMISSION_REJECT_ONCE,
            "name": "Reject",
            "kind": "reject_once",
        },
    ]);
    let envelope = json!({
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "session/request_permission",
        "params": {
            "sessionId": session_id,
            "toolCall": tool_call,
            "options": choices,
        },
    });
    serde_json::to_string(&envelope).expect("serialize json-rpc permission request")
}
/// Translates the client's reply to a permission request into a decision.
///
/// Only a `selected` outcome whose `optionId` is the allow-once option yields
/// `Allow`; every other shape (error payload, missing or malformed outcome,
/// rejection, cancellation, unknown option) is a denial with a reason.
fn permission_response_to_decision(response: &Value) -> ToolApprovalDecision {
    if response.get("error").is_some() {
        return ToolApprovalDecision::deny("permission request failed");
    }
    let Some(outcome) = response.get("outcome") else {
        return ToolApprovalDecision::deny("permission response missing outcome");
    };

    let kind = outcome.get("outcome").and_then(Value::as_str);
    let chosen = outcome.get("optionId").and_then(Value::as_str);
    match (kind, chosen) {
        (Some("selected"), Some(ACP_PERMISSION_ALLOW_ONCE)) => ToolApprovalDecision::Allow,
        (Some("selected"), Some(ACP_PERMISSION_REJECT_ONCE)) => {
            ToolApprovalDecision::deny("permission rejected by client")
        }
        (Some("selected"), Some(_)) => {
            ToolApprovalDecision::deny("permission response selected unknown option")
        }
        (Some("selected"), None) => {
            ToolApprovalDecision::deny("permission response selected without optionId")
        }
        (Some("cancelled"), _) => ToolApprovalDecision::deny("permission request cancelled"),
        (Some(_), _) => ToolApprovalDecision::deny("permission response has unknown outcome"),
        (None, _) => ToolApprovalDecision::deny("permission response outcome malformed"),
    }
}
/// Checks that `path_str` lies inside the working directory of a session.
///
/// The path is canonicalized first. For a path that does not exist yet, its
/// parent directory is canonicalized and the file name is appended. When
/// `session_id` is given, only that session's working directory is accepted;
/// otherwise the working directory of any registered session is.
///
/// # Errors
/// Returns a human-readable message when:
/// - the path is a symbolic link whose target cannot be resolved,
/// - neither the path nor its parent can be canonicalized,
/// - a lock cannot be acquired, no session is registered, or `session_id` is
///   unknown,
/// - the resolved path is outside every allowed working directory.
///
/// NOTE(review): this is a check-then-use validation; the filesystem can
/// still change between this call and the caller's read or write.
async fn validate_file_path(
    path_str: &str,
    session_id: Option<&str>,
    sessions: &AcpSessionsMap,
    cx: &AgentCx,
) -> std::result::Result<(), String> {
    let requested = std::path::Path::new(path_str);
    let resolved = if let Ok(canonical) = requested.canonicalize() {
        canonical
    } else {
        // A symlink that cannot be canonicalized (for example a dangling one)
        // would pass the parent-directory check below while a later write
        // follows it to a target that may be outside the working directory.
        if requested
            .symlink_metadata()
            .is_ok_and(|meta| meta.file_type().is_symlink())
        {
            return Err(format!(
                "Path is a symbolic link that cannot be resolved: {path_str}"
            ));
        }
        let parent_dir = requested.parent().and_then(|p| p.canonicalize().ok());
        match parent_dir {
            Some(dir) => dir.join(requested.file_name().unwrap_or_default()),
            None => {
                return Err(format!(
                    "Path does not exist and parent is invalid: {path_str}"
                ));
            }
        }
    };

    let guard = sessions
        .lock(cx)
        .await
        .map_err(|e| format!("Lock failed: {e}"))?;
    if guard.is_empty() {
        return Err("No active sessions — cannot validate file path".to_string());
    }

    let allowed_cwds: Vec<PathBuf> = if let Some(sid) = session_id {
        match guard.get(sid) {
            Some(state) => {
                if let Ok(s) = state.lock(cx).await {
                    vec![s.cwd.clone()]
                } else {
                    return Err("Session lock failed".to_string());
                }
            }
            None => return Err(format!("Session not found: {sid}")),
        }
    } else {
        // No session given: accept the working directory of any session whose
        // state can be locked.
        let mut cwds = Vec::new();
        for state in guard.values() {
            if let Ok(s) = state.lock(cx).await {
                cwds.push(s.cwd.clone());
            }
        }
        cwds
    };

    for cwd in &allowed_cwds {
        if let Ok(canonical_cwd) = cwd.canonicalize() {
            if resolved.starts_with(&canonical_cwd) {
                return Ok(());
            }
        }
        // Fallback for a working directory that cannot be canonicalized.
        // Restricted to absolute paths: `Path::starts_with` is true for an
        // empty base, so an empty cwd would otherwise allow every path.
        if cwd.is_absolute() && resolved.starts_with(cwd) {
            return Ok(());
        }
    }
    Err(format!(
        "Path '{path_str}' is outside all session working directories",
    ))
}
/// Builds the result payload of the `initialize` request.
///
/// Advertises protocol version 1, the agent's name and crate version, and a
/// capability set in which session loading, MCP transports and non-text
/// prompt content are all disabled; tool-approval support is announced under
/// the `pi.dev` key of `_meta`. No authentication methods are offered.
fn handle_initialize() -> Value {
    let agent_info = json!({
        "name": "pi-agent",
        "version": env!("CARGO_PKG_VERSION"),
    });
    let mcp_capabilities = json!({
        "http": false,
        "sse": false,
    });
    let prompt_capabilities = json!({
        "audio": false,
        "embeddedContext": false,
        "image": false,
    });
    let vendor_meta = json!({
        "pi.dev": {
            "toolApproval": true,
            "requestPermission": true,
        },
    });
    let agent_capabilities = json!({
        "loadSession": false,
        "mcpCapabilities": mcp_capabilities,
        "promptCapabilities": prompt_capabilities,
        "sessionCapabilities": {},
        "_meta": vendor_meta,
    });
    json!({
        "protocolVersion": 1,
        "agentInfo": agent_info,
        "agentCapabilities": agent_capabilities,
        "authMethods": [],
    })
}
/// Picks the model entry a new session should use.
///
/// Preference order:
/// 1. the entry matching both the configured default provider and model,
/// 2. the first entry of the configured default provider,
/// 3. the first entry whose id equals the configured default model
///    (ASCII case-insensitive),
/// 4. the first available entry.
///
/// Returns `None` only when `available_models` is empty.
fn select_acp_model_entry(config: &Config, available_models: &[ModelEntry]) -> Option<ModelEntry> {
    let wanted_provider = config.default_provider.as_deref();
    let wanted_model = config.default_model.as_deref();

    let exact = wanted_provider
        .zip(wanted_model)
        .and_then(|(provider, model)| {
            available_models.iter().find(|&entry| {
                provider_ids_match(&entry.model.provider, provider)
                    && entry.model.id.eq_ignore_ascii_case(model)
            })
        });
    let same_provider = || {
        wanted_provider.and_then(|provider| {
            available_models
                .iter()
                .find(|&entry| provider_ids_match(&entry.model.provider, provider))
        })
    };
    let same_model = || {
        wanted_model.and_then(|model| {
            available_models
                .iter()
                .find(|&entry| entry.model.id.eq_ignore_ascii_case(model))
        })
    };

    exact
        .or_else(same_provider)
        .or_else(same_model)
        .or_else(|| available_models.first())
        .cloned()
}
fn resolve_acp_thinking_level(
config: &Config,
model_entry: &ModelEntry,
) -> crate::model::ThinkingLevel {
let requested = config
.default_thinking_level
.as_deref()
.and_then(|value| value.parse().ok())
.unwrap_or(crate::model::ThinkingLevel::XHigh);
model_entry.clamp_thinking_level(requested)
}
/// Assembles the system prompt for a session rooted at `cwd`.
///
/// The prompt consists of a fixed introduction, one bullet per known tool
/// that appears in `enabled_tools`, general guidance, the contents of any of
/// `pi.md`, `AGENTS.md` and `.pi` that exist as readable files in `cwd`, the
/// current UTC date and time, and the working directory.
fn build_acp_system_prompt(cwd: &std::path::Path, enabled_tools: &[&str]) -> String {
    use std::fmt::Write as _;

    const TOOL_DESCRIPTIONS: [(&str, &str); 7] = [
        ("read", "Read file contents"),
        ("bash", "Execute bash commands"),
        ("edit", "Make surgical edits to files"),
        ("write", "Write file contents"),
        ("grep", "Search file contents with regex"),
        ("find", "Find files by name pattern"),
        ("ls", "List directory contents"),
    ];
    const CONTEXT_FILES: [&str; 3] = ["pi.md", "AGENTS.md", ".pi"];

    let mut prompt = String::from(
        "You are a helpful AI coding assistant integrated into the user's editor via ACP (Agent Client Protocol). \
         You have access to the following tools:\n\n",
    );

    let listed = TOOL_DESCRIPTIONS
        .iter()
        .filter(|(name, _)| enabled_tools.contains(name));
    for (name, description) in listed {
        let _ = writeln!(prompt, "- **{name}**: {description}");
    }

    prompt.push_str(
        "\nUse these tools to help the user with coding tasks. \
         Be concise and precise. When making file changes, explain what you're doing.\n",
    );

    // Append project instruction files; anything missing or unreadable is skipped.
    for filename in CONTEXT_FILES.iter() {
        let candidate = cwd.join(filename);
        if !candidate.is_file() {
            continue;
        }
        let Ok(content) = std::fs::read_to_string(&candidate) else {
            continue;
        };
        let _ = write!(prompt, "\n## {filename}\n\n{content}\n\n");
    }

    let date_time = chrono::Utc::now()
        .format("%Y-%m-%d %H:%M:%S UTC")
        .to_string();
    let _ = write!(prompt, "\nCurrent date and time: {date_time}");
    let _ = write!(prompt, "\nCurrent working directory: {}", cwd.display());
    prompt
}
/// Creates the state for a new ACP session.
///
/// The working directory comes from the `cwd` parameter, falling back to the
/// process's current directory (or `.` if that cannot be determined). The
/// session is in-memory, uses the fixed built-in tool set, and runs on the
/// model chosen by [`select_acp_model_entry`]. When `permission_client` is
/// given, tool calls are routed through it for approval.
///
/// Returns the new session id together with its state.
///
/// # Errors
/// Fails when no model is available or the provider cannot be created.
fn handle_session_new(
    params: &Value,
    options: &AcpOptions,
    permission_client: Option<&AcpPermissionClient>,
) -> Result<(String, AcpSessionState)> {
    let cwd = match params.get("cwd").and_then(Value::as_str) {
        Some(requested) => PathBuf::from(requested),
        None => std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
    };

    let mut session = Session::in_memory();
    session.header.cwd = cwd.display().to_string();
    let session_id = session.header.id.clone();

    let enabled_tools: Vec<&str> = vec!["read", "bash", "edit", "write", "grep", "find", "ls"];
    let tools = ToolRegistry::new(&enabled_tools, &cwd, Some(&options.config));

    let model_entry = select_acp_model_entry(&options.config, &options.available_models)
        .ok_or_else(|| Error::provider("acp", "No models available"))?;
    let provider = providers::create_provider(&model_entry, None)
        .map_err(|e| Error::provider("acp", e.to_string()))?;
    let system_prompt = build_acp_system_prompt(&cwd, &enabled_tools);

    // Prefer the stored credential, then the key attached to the model entry;
    // a key that is blank after trimming counts as absent.
    let api_key = options
        .auth
        .resolve_api_key(&model_entry.model.provider, None)
        .or_else(|| model_entry.api_key.clone())
        .map(|key| key.trim().to_string())
        .filter(|key| !key.is_empty());
    let thinking_level = resolve_acp_thinking_level(&options.config, &model_entry);
    let stream_options = StreamOptions {
        api_key,
        thinking_level: Some(thinking_level),
        headers: model_entry.headers.clone(),
        ..StreamOptions::default()
    };

    let tool_approval =
        permission_client.map(|client| client.handler_for_session(session_id.clone()));
    let agent_config = crate::agent::AgentConfig {
        system_prompt: Some(system_prompt),
        max_tool_iterations: crate::agent::resolved_max_tool_iterations_default(),
        stream_options,
        block_images: options.config.image_block_images(),
        fail_closed_hooks: options.config.fail_closed_hooks(),
        tool_approval,
    };
    let agent = crate::agent::Agent::new(provider, tools, agent_config);
    let session_arc = Arc::new(Mutex::new(session));

    // A model reporting a zero context window falls back to the default size.
    let context_window_tokens = match model_entry.model.context_window {
        0 => ResolvedCompactionSettings::default().context_window_tokens,
        declared => declared,
    };
    let compaction_settings = ResolvedCompactionSettings {
        enabled: options.config.compaction_enabled(),
        reserve_tokens: options.config.compaction_reserve_tokens(),
        keep_recent_tokens: options.config.compaction_keep_recent_tokens(),
        context_window_tokens,
    };

    let agent_session = AgentSession::new(agent, session_arc, false, compaction_settings)
        .with_runtime_handle(options.runtime_handle.clone());
    let state = AcpSessionState {
        agent_session: Some(agent_session),
        cwd,
    };
    Ok((session_id, state))
}
/// Runs one prompt on a session and returns the ACP stop reason.
///
/// The agent session is taken out of `session_state` for the duration of the
/// run and put back afterwards, so the session lock is not held while the
/// agent works. Progress is reported through `session/update` notifications
/// on `out_tx`.
///
/// Returns `ACP_STOP_REASON_ERROR` when the session cannot be locked, when
/// its agent session is currently taken, or when the agent run fails.
async fn run_prompt(
    session_state: Arc<Mutex<AcpSessionState>>,
    message: String,
    abort_signal: AbortSignal,
    out_tx: std::sync::mpsc::SyncSender<String>,
    session_id: String,
    cx: AgentCx,
) -> &'static str {
    let event_handler = build_acp_event_handler(out_tx.clone(), session_id.clone());

    let checked_out = match session_state.lock(&cx).await {
        Ok(mut state) => state.agent_session.take(),
        Err(_) => None,
    };
    let Some(mut agent_session) = checked_out else {
        return ACP_STOP_REASON_ERROR;
    };

    let outcome = agent_session
        .run_text_with_abort(message, Some(abort_signal), event_handler)
        .await;

    // Return the agent session regardless of how the run ended.
    if let Ok(mut state) = session_state.lock(&cx).await {
        state.agent_session = Some(agent_session);
    }

    outcome.map_or(ACP_STOP_REASON_ERROR, |msg| map_stop_reason(msg.stop_reason))
}
/// Stop reason reported when the agent finished its turn.
const ACP_STOP_REASON_END_TURN: &str = "end_turn";
/// Stop reason reported when generation stopped because of the length limit.
const ACP_STOP_REASON_MAX_TOKENS: &str = "max_tokens";
/// Stop reason reported when the prompt was aborted.
const ACP_STOP_REASON_CANCELLED: &str = "cancelled";
/// Stop reason reported when the prompt failed.
// NOTE(review): this has the same wire value as `ACP_STOP_REASON_END_TURN`,
// so clients cannot tell a failed prompt from a normal end of turn —
// presumably because the protocol defines no dedicated error stop reason;
// confirm this is intentional.
const ACP_STOP_REASON_ERROR: &str = "end_turn";
const fn map_stop_reason(reason: crate::model::StopReason) -> &'static str {
use crate::model::StopReason;
match reason {
StopReason::Stop | StopReason::ToolUse => ACP_STOP_REASON_END_TURN,
StopReason::Length => ACP_STOP_REASON_MAX_TOKENS,
StopReason::Aborted => ACP_STOP_REASON_CANCELLED,
StopReason::Error => ACP_STOP_REASON_ERROR,
}
}
/// Flattens ACP prompt content blocks into a single text message.
///
/// `text` blocks contribute their `text`, `resource_link` blocks their `uri`.
/// A newline separator is inserted before a piece only when the accumulated
/// text is non-empty.
///
/// # Errors
/// Returns a message when a block has no string `type`, has an unsupported
/// type, or lacks the field required for its type.
fn extract_prompt_text(blocks: &[Value]) -> std::result::Result<String, String> {
    let mut combined = String::new();
    for block in blocks {
        let piece = match block.get("type").and_then(Value::as_str) {
            Some("text") => block.get("text").and_then(Value::as_str).ok_or_else(|| {
                "Prompt block of type \"text\" missing required field \"text\"".to_string()
            })?,
            Some("resource_link") => {
                block.get("uri").and_then(Value::as_str).ok_or_else(|| {
                    "Prompt block of type \"resource_link\" missing required field \"uri\""
                        .to_string()
                })?
            }
            None | Some("") => {
                return Err(
                    "Prompt block missing required discriminator field \"type\"".to_string()
                );
            }
            Some(other) => {
                return Err(format!(
                    "Prompt block type \"{other}\" is not supported by this agent (advertised capabilities only allow text and resource_link)"
                ));
            }
        };
        if !combined.is_empty() {
            combined.push('\n');
        }
        combined.push_str(piece);
    }
    Ok(combined)
}
fn build_acp_event_handler(
out_tx: std::sync::mpsc::SyncSender<String>,
session_id: String,
) -> impl Fn(AgentEvent) + Send + Sync + 'static {
move |event: AgentEvent| {
let update = match &event {
AgentEvent::MessageUpdate {
assistant_message_event,
..
} => match assistant_message_event {
AssistantMessageEvent::TextDelta { delta, .. } => Some(json!({
"sessionUpdate": "agent_message_chunk",
"content": { "type": "text", "text": delta },
})),
AssistantMessageEvent::ThinkingDelta { delta, .. } => Some(json!({
"sessionUpdate": "agent_thought_chunk",
"content": { "type": "text", "text": delta },
})),
_ => None,
},
AgentEvent::ToolExecutionStart {
tool_call_id,
tool_name,
args,
} => Some(json!({
"sessionUpdate": "tool_call",
"toolCallId": tool_call_id,
"title": tool_name,
"kind": classify_tool_kind(tool_name),
"status": "pending",
"rawInput": args,
})),
AgentEvent::ToolExecutionUpdate {
tool_call_id,
tool_name: _,
args: _,
partial_result,
} => {
let content_text = partial_result
.content
.iter()
.filter_map(|block| match block {
ContentBlock::Text(t) => Some(t.text.as_str()),
_ => None,
})
.collect::<Vec<_>>()
.join("\n");
let mut update = json!({
"sessionUpdate": "tool_call_update",
"toolCallId": tool_call_id,
"status": "in_progress",
});
if !content_text.is_empty() {
update["content"] = json!([{
"type": "content",
"content": { "type": "text", "text": content_text },
}]);
}
Some(update)
}
AgentEvent::ToolExecutionEnd {
tool_call_id,
tool_name: _,
result,
is_error,
} => {
let content_text = result
.content
.iter()
.filter_map(|block| match block {
ContentBlock::Text(t) => Some(t.text.as_str()),
_ => None,
})
.collect::<Vec<_>>()
.join("\n");
Some(json!({
"sessionUpdate": "tool_call_update",
"toolCallId": tool_call_id,
"status": if *is_error { "failed" } else { "completed" },
"content": [{
"type": "content",
"content": { "type": "text", "text": content_text },
}],
}))
}
_ => None,
};
if let Some(update) = update {
let _ = out_tx.send(json_rpc_notification(
"session/update",
json!({
"sessionId": session_id,
"update": update,
}),
));
}
}
}
/// Maps a tool name (ASCII case-insensitive) to the tool-kind string reported
/// to the client; names not listed here are classified as `"other"`.
fn classify_tool_kind(tool_name: &str) -> &'static str {
    match tool_name.to_ascii_lowercase().as_str() {
        "read" | "read_text_file" | "view" | "cat" => "read",
        "edit" | "write" | "write_text_file" | "patch" | "apply_patch" | "create" => "edit",
        "delete" | "rm" | "remove" => "delete",
        "move" | "mv" | "rename" => "move",
        "search" | "grep" | "ripgrep" | "rg" | "find" | "glob" => "search",
        "execute" | "bash" | "shell" | "run" | "exec" => "execute",
        "fetch" | "http" | "curl" | "web_fetch" => "fetch",
        "think" | "thinking" => "think",
        _ => "other",
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::provider::{InputType, Model, ModelCost};
use asupersync::runtime::RuntimeBuilder;
use std::collections::HashMap;
fn test_model_entry(provider: &str, id: &str) -> ModelEntry {
ModelEntry {
model: Model {
id: id.to_string(),
name: id.to_string(),
api: "openai-responses".to_string(),
provider: provider.to_string(),
base_url: "https://example.invalid".to_string(),
reasoning: true,
input: vec![InputType::Text],
cost: ModelCost {
input: 0.0,
output: 0.0,
cache_read: 0.0,
cache_write: 0.0,
},
context_window: 128_000,
max_tokens: 8_192,
headers: HashMap::new(),
},
api_key: None,
headers: HashMap::new(),
auth_header: true,
compat: None,
oauth_config: None,
}
}
fn pending_permissions_empty(pending: &PendingPermissionMap) -> bool {
pending.lock().is_ok_and(|guard| guard.is_empty())
}
#[test]
fn json_rpc_ok_response_format() {
let response = json_rpc_ok(Value::Number(1.into()), json!({"key": "value"}));
let parsed: Value = serde_json::from_str(&response).expect("valid json");
assert_eq!(parsed["jsonrpc"], "2.0");
assert_eq!(parsed["id"], 1);
assert_eq!(parsed["result"]["key"], "value");
assert!(parsed.get("error").is_none());
}
/// An error response carries `jsonrpc`, `id`, and an `error` object with
/// `code` and `message`, and has no `result` member.
#[test]
fn json_rpc_error_response_format() {
    let request_id = Value::String("test-id".into());
    let raw = json_rpc_error(request_id, PARSE_ERROR, "bad json");
    let envelope: Value = serde_json::from_str(&raw).expect("valid json");
    assert_eq!(envelope["jsonrpc"], "2.0");
    assert_eq!(envelope["id"], "test-id");
    assert!(envelope.get("result").is_none());
    let error = &envelope["error"];
    assert_eq!(error["code"], PARSE_ERROR);
    assert_eq!(error["message"], "bad json");
}
/// A notification carries `jsonrpc`, `method`, and `params`, and has no `id`
/// member.
#[test]
fn json_rpc_notification_format() {
    let notif = json_rpc_notification(
        "session/update",
        json!({
            "sessionId": "sess-1",
            "update": {
                "sessionUpdate": "agent_message_chunk",
                "content": { "type": "text", "text": "hi" },
            },
        }),
    );
    // Parse the serialized notification back so the assertions check the
    // wire format rather than the in-memory struct.
    let parsed: Value = serde_json::from_str(&notif).expect("valid json");
    assert_eq!(parsed["jsonrpc"], "2.0");
    assert_eq!(parsed["method"], "session/update");
    assert_eq!(parsed["params"]["sessionId"], "sess-1");
    assert_eq!(
        parsed["params"]["update"]["sessionUpdate"],
        "agent_message_chunk"
    );
    assert!(parsed.get("id").is_none());
}
/// Checks the `initialize` result: protocol version, agent info, capability
/// flags, an empty `authMethods` array, and the absence of `serverInfo` and
/// `capabilities` keys.
#[test]
fn handle_initialize_returns_correct_shape() {
    let result = handle_initialize();
    assert_eq!(result["protocolVersion"], 1);

    let agent_info = &result["agentInfo"];
    assert_eq!(agent_info["name"], "pi-agent");
    assert_eq!(agent_info["version"], env!("CARGO_PKG_VERSION"));

    let capabilities = &result["agentCapabilities"];
    assert_eq!(capabilities["loadSession"], false);
    assert_eq!(capabilities["promptCapabilities"]["audio"], false);
    assert_eq!(capabilities["promptCapabilities"]["image"], false);
    assert_eq!(capabilities["mcpCapabilities"]["http"], false);
    assert_eq!(capabilities["mcpCapabilities"]["sse"], false);

    let pi_meta = &capabilities["_meta"]["pi.dev"];
    assert_eq!(pi_meta["toolApproval"], true);
    assert_eq!(pi_meta["requestPermission"], true);

    assert!(result["authMethods"].is_array());
    assert_eq!(result["authMethods"].as_array().unwrap().len(), 0);
    assert!(result.get("serverInfo").is_none());
    assert!(result.get("capabilities").is_none());
}
/// With both a default provider and a default model configured, the entry
/// matching both is chosen even though it is not first in the list.
#[test]
fn select_acp_model_entry_prefers_exact_configured_model() {
    let candidates = vec![
        test_model_entry("openai", "gpt-5.2"),
        test_model_entry("anthropic", "claude-opus-4-5"),
    ];
    let config = Config {
        default_provider: Some(String::from("anthropic")),
        default_model: Some(String::from("claude-opus-4-5")),
        ..Config::default()
    };
    let chosen = select_acp_model_entry(&config, &candidates).expect("selected model");
    assert_eq!(chosen.model.provider, "anthropic");
    assert_eq!(chosen.model.id, "claude-opus-4-5");
}
/// With only a default provider configured, an entry from that provider is
/// chosen over an earlier entry from another provider.
#[test]
fn select_acp_model_entry_prefers_default_provider_when_model_is_unset() {
    let candidates = vec![
        test_model_entry("openai", "gpt-5.2"),
        test_model_entry("anthropic", "claude-sonnet-4"),
    ];
    let config = Config {
        default_provider: Some(String::from("anthropic")),
        ..Config::default()
    };
    let chosen = select_acp_model_entry(&config, &candidates).expect("selected model");
    assert_eq!(chosen.model.provider, "anthropic");
    assert_eq!(chosen.model.id, "claude-sonnet-4");
}
/// With only a default model configured, the entry with that model id is
/// chosen over an earlier entry with a different id.
#[test]
fn select_acp_model_entry_prefers_default_model_when_provider_is_unset() {
    let candidates = vec![
        test_model_entry("anthropic", "claude-sonnet-4"),
        test_model_entry("openai", "gpt-5.2"),
    ];
    let config = Config {
        default_model: Some(String::from("gpt-5.2")),
        ..Config::default()
    };
    let chosen = select_acp_model_entry(&config, &candidates).expect("selected model");
    assert_eq!(chosen.model.provider, "openai");
    assert_eq!(chosen.model.id, "gpt-5.2");
}
/// A configured provider of `gemini-cli` selects the entry whose provider is
/// `google-gemini-cli`.
#[test]
fn select_acp_model_entry_matches_provider_aliases() {
    let candidates = vec![
        test_model_entry("openai", "gpt-5.2"),
        test_model_entry("google-gemini-cli", "gemini-2.5-pro"),
    ];
    let config = Config {
        default_provider: Some(String::from("gemini-cli")),
        default_model: Some(String::from("gemini-2.5-pro")),
        ..Config::default()
    };
    let chosen = select_acp_model_entry(&config, &candidates).expect("selected model");
    assert_eq!(chosen.model.provider, "google-gemini-cli");
    assert_eq!(chosen.model.id, "gemini-2.5-pro");
}
/// With a default `Config`, the first entry in the list is chosen.
#[test]
fn select_acp_model_entry_falls_back_to_first_available_model() {
    let candidates = vec![
        test_model_entry("openai", "gpt-5.2"),
        test_model_entry("anthropic", "claude-sonnet-4"),
    ];
    let config = Config::default();
    let chosen = select_acp_model_entry(&config, &candidates).expect("selected model");
    assert_eq!(chosen.model.provider, "openai");
    assert_eq!(chosen.model.id, "gpt-5.2");
}
/// With a default `Config` and the reasoning-enabled `openai`/`gpt-5.2`
/// fixture, the resolved level is `XHigh`.
#[test]
fn resolve_acp_thinking_level_defaults_to_highest_supported_level() {
    use crate::model::ThinkingLevel;

    let entry = test_model_entry("openai", "gpt-5.2");
    let resolved = resolve_acp_thinking_level(&Config::default(), &entry);
    assert_eq!(resolved, ThinkingLevel::XHigh);
}
/// With a default `Config` and a model whose `reasoning` flag is `false`,
/// the resolved level is `Off`.
#[test]
fn resolve_acp_thinking_level_clamps_non_reasoning_models_to_off() {
    use crate::model::ThinkingLevel;

    let mut entry = test_model_entry("ollama", "llama3.2");
    entry.model.reasoning = false;
    let resolved = resolve_acp_thinking_level(&Config::default(), &entry);
    assert_eq!(resolved, ThinkingLevel::Off);
}
/// Two text blocks are joined with a single newline.
#[test]
fn extract_prompt_text_concatenates_text_blocks() {
    let first = json!({ "type": "text", "text": "first line" });
    let second = json!({ "type": "text", "text": "second line" });
    let prompt = vec![first, second];
    let text = extract_prompt_text(&prompt).expect("extracts text");
    assert_eq!(text, "first line\nsecond line");
}
/// A `resource_link` block contributes its `uri` as a new line after the
/// preceding text.
#[test]
fn extract_prompt_text_appends_resource_link_uri() {
    let text_block = json!({ "type": "text", "text": "see also" });
    let link_block = json!({
        "type": "resource_link",
        "uri": "file:///tmp/notes.md",
        "name": "notes.md",
    });
    let prompt = vec![text_block, link_block];
    let text = extract_prompt_text(&prompt).expect("extracts text");
    assert_eq!(text, "see also\nfile:///tmp/notes.md");
}
/// An `image` block is rejected with an error mentioning "not supported".
#[test]
fn extract_prompt_text_rejects_unsupported_block_type() {
    let prompt = vec![json!({ "type": "image", "data": "base64..." })];
    let outcome = extract_prompt_text(&prompt);
    let err = outcome.expect_err("should reject");
    assert!(err.contains("not supported"), "got: {err}");
}
/// A `text` block with no `text` member is rejected with a "missing required
/// field" error.
#[test]
fn extract_prompt_text_rejects_text_block_without_text_field() {
    let prompt = vec![json!({ "type": "text" })];
    let outcome = extract_prompt_text(&prompt);
    let err = outcome.expect_err("should reject");
    let mentions_missing_text = err.contains("missing required field \"text\"");
    assert!(mentions_missing_text, "got: {err}");
}
/// A block with no `type` member is rejected with a "missing required
/// discriminator" error.
#[test]
fn extract_prompt_text_rejects_block_without_type_discriminator() {
    let prompt = vec![json!({ "text": "no type" })];
    let outcome = extract_prompt_text(&prompt);
    let err = outcome.expect_err("should reject");
    assert!(err.contains("missing required discriminator"), "got: {err}");
}
/// Pins the string each `StopReason` variant listed below maps to.
#[test]
fn map_stop_reason_covers_acp_values() {
    use crate::model::StopReason;

    let expectations = [
        (StopReason::Stop, "end_turn"),
        (StopReason::ToolUse, "end_turn"),
        (StopReason::Length, "max_tokens"),
        (StopReason::Aborted, "cancelled"),
        (StopReason::Error, "end_turn"),
    ];
    for (reason, expected) in expectations {
        assert_eq!(map_stop_reason(reason), expected);
    }
}
/// Spot-checks one or two aliases per tool kind, an upper-case name, and an
/// unknown name that falls through to `"other"`.
#[test]
fn classify_tool_kind_maps_common_names() {
    let expectations = [
        ("read", "read"),
        ("read_text_file", "read"),
        ("EDIT", "edit"),
        ("apply_patch", "edit"),
        ("rg", "search"),
        ("bash", "execute"),
        ("curl", "fetch"),
        ("rm", "delete"),
        ("mv", "move"),
        ("think", "think"),
        ("playwright_screenshot", "other"),
    ];
    for (tool_name, expected_kind) in expectations {
        assert_eq!(classify_tool_kind(tool_name), expected_kind);
    }
}
/// A `selected` outcome carrying the allow-once option id maps to `Allow`.
#[test]
fn permission_response_approves_allow_once() {
    let response = json!({
        "outcome": {
            "outcome": "selected",
            "optionId": ACP_PERMISSION_ALLOW_ONCE,
        },
    });
    let decision = permission_response_to_decision(&response);
    assert_eq!(decision, ToolApprovalDecision::Allow);
}
/// A reject-once selection and a `cancelled` outcome both map to `Deny`,
/// with reasons mentioning "rejected" and "cancelled" respectively.
#[test]
fn permission_response_denies_reject_once_and_cancelled() {
    let reject_response = json!({
        "outcome": {
            "outcome": "selected",
            "optionId": ACP_PERMISSION_REJECT_ONCE,
        },
    });
    let rejected = permission_response_to_decision(&reject_response);

    let cancel_response = json!({
        "outcome": {
            "outcome": "cancelled",
        },
    });
    let cancelled = permission_response_to_decision(&cancel_response);

    let rejected_ok = matches!(
        rejected,
        ToolApprovalDecision::Deny { ref reason }
            if reason.contains("rejected")
    );
    assert!(rejected_ok);

    let cancelled_ok = matches!(
        cancelled,
        ToolApprovalDecision::Deny { ref reason }
            if reason.contains("cancelled")
    );
    assert!(cancelled_ok);
}
/// Every malformed or unrecognized response shape listed below must map to
/// `Deny`.
#[test]
fn permission_response_malformed_is_denied() {
    let malformed = [
        json!({}),
        json!({ "outcome": { "outcome": "selected" } }),
        json!({ "outcome": { "outcome": "selected", "optionId": "unknown" } }),
        json!({ "outcome": { "outcome": "weird" } }),
        json!({ "error": { "code": -32601, "message": "Method not found" } }),
    ];
    for payload in malformed {
        let decision = permission_response_to_decision(&payload);
        let denied = matches!(decision, ToolApprovalDecision::Deny { .. });
        assert!(denied, "case should deny: {payload}");
    }
}
/// Drives a permission request through `AcpPermissionClient`, inspects the
/// outbound `session/request_permission` message, and feeds an allow-once
/// response back through `route_permission_response`.
///
/// Expects the decision to be `Allow` and the pending map to be empty
/// afterwards.
#[test]
fn permission_request_emits_json_rpc_and_accepts_routed_response() {
    let rt = RuntimeBuilder::current_thread()
        .build()
        .expect("runtime build");
    rt.block_on(async {
        let (out_tx, out_rx) = std::sync::mpsc::sync_channel::<String>(8);
        let pending = Arc::new(StdMutex::new(HashMap::new()));
        let cx = AgentCx::for_testing();
        let client = AcpPermissionClient {
            out_tx,
            pending: Arc::clone(&pending),
            request_counter: Arc::new(AtomicU64::new(0)),
            timeout: Duration::from_secs(1),
            cx: cx.clone(),
        };
        let approval_request = ToolApprovalRequest {
            tool_call_id: "call-1".to_string(),
            tool_name: "bash".to_string(),
            arguments: json!({ "command": "echo ok" }),
        };

        // Plays the client side: read the outbound request, check its shape,
        // then route an allow-once response carrying the same id.
        let responder = async {
            let wire = out_rx.recv().expect("permission request");
            let parsed: Value = serde_json::from_str(&wire).expect("valid request json");
            assert_eq!(parsed["jsonrpc"], "2.0");
            assert_eq!(parsed["method"], "session/request_permission");

            let params = &parsed["params"];
            assert_eq!(params["sessionId"], "sess-1");

            let tool_call = &params["toolCall"];
            assert_eq!(tool_call["sessionUpdate"], "tool_call_update");
            assert_eq!(tool_call["toolCallId"], "call-1");
            assert_eq!(tool_call["kind"], "execute");
            assert_eq!(tool_call["status"], "pending");
            assert_eq!(params["options"][0]["optionId"], ACP_PERMISSION_ALLOW_ONCE);

            let reply = json!({
                "jsonrpc": "2.0",
                "id": parsed["id"].clone(),
                "result": {
                    "outcome": {
                        "outcome": "selected",
                        "optionId": ACP_PERMISSION_ALLOW_ONCE,
                    },
                },
            });
            assert!(route_permission_response(&reply, &pending, &cx));
        };

        let (decision, ()) = futures::join!(
            client.request_permission("sess-1", approval_request),
            responder
        );
        assert_eq!(decision, ToolApprovalDecision::Allow);
        assert!(pending_permissions_empty(&pending));
    });
}
/// With a 1 ms timeout and nobody answering, the request resolves to `Deny`
/// with a reason mentioning "timed out", and the pending map ends up empty.
#[test]
fn permission_request_times_out_fail_closed() {
    let rt = RuntimeBuilder::current_thread()
        .build()
        .expect("runtime build");
    rt.block_on(async {
        // The receiver stays bound (not dropped) for the whole test; nothing
        // ever reads from it.
        let (out_tx, _out_rx) = std::sync::mpsc::sync_channel::<String>(8);
        let pending = Arc::new(StdMutex::new(HashMap::new()));
        let client = AcpPermissionClient {
            out_tx,
            pending: Arc::clone(&pending),
            request_counter: Arc::new(AtomicU64::new(0)),
            timeout: Duration::from_millis(1),
            cx: AgentCx::for_testing(),
        };
        let approval_request = ToolApprovalRequest {
            tool_call_id: "call-1".to_string(),
            tool_name: "edit".to_string(),
            arguments: json!({}),
        };
        let decision = client.request_permission("sess-1", approval_request).await;
        let timed_out = matches!(
            decision,
            ToolApprovalDecision::Deny { ref reason }
                if reason.contains("timed out")
        );
        assert!(timed_out);
        assert!(pending_permissions_empty(&pending));
    });
}
/// With the outbound receiver dropped before the request is made, the
/// request resolves to `Deny` with a reason mentioning "disconnected", and
/// the pending map ends up empty.
#[test]
fn permission_request_client_disconnect_fail_closed() {
    let rt = RuntimeBuilder::current_thread()
        .build()
        .expect("runtime build");
    rt.block_on(async {
        let (out_tx, out_rx) = std::sync::mpsc::sync_channel::<String>(8);
        // Drop the receiving end up front so the channel is already closed.
        drop(out_rx);
        let pending = Arc::new(StdMutex::new(HashMap::new()));
        let client = AcpPermissionClient {
            out_tx,
            pending: Arc::clone(&pending),
            request_counter: Arc::new(AtomicU64::new(0)),
            timeout: Duration::from_secs(1),
            cx: AgentCx::for_testing(),
        };
        let approval_request = ToolApprovalRequest {
            tool_call_id: "call-1".to_string(),
            tool_name: "write".to_string(),
            arguments: json!({}),
        };
        let decision = client.request_permission("sess-1", approval_request).await;
        let disconnected = matches!(
            decision,
            ToolApprovalDecision::Deny { ref reason }
                if reason.contains("disconnected")
        );
        assert!(disconnected);
        assert!(pending_permissions_empty(&pending));
    });
}
}