use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use serde_json::Value;
use tokio::process::Command;
use tokio::sync::mpsc;
use super::agents::{AgentConfig, resolve_binary_in_path, resolve_shell_path};
use super::jsonrpc::JsonRpcClient;
use super::message_handler::handle_incoming_messages;
use super::permissions::SafePaths;
use super::protocol::{
ClientCapabilities, ClientInfo, ContentBlock, InitializeParams, PermissionOption,
PermissionOutcome, RequestPermissionResponse, SessionNewParams, SessionPromptParams,
SessionResult, SessionUpdate,
};
#[derive(Debug, Clone, PartialEq)]
pub enum AgentStatus {
Disconnected,
Connecting,
Connected,
Error(String),
}
#[derive(Debug)]
pub enum AgentMessage {
StatusChanged(AgentStatus),
SessionUpdate(SessionUpdate),
PermissionRequest {
request_id: u64,
tool_call: Value,
options: Vec<PermissionOption>,
},
PromptComplete,
PromptStarted,
ConfigUpdate {
updates: std::collections::HashMap<String, serde_json::Value>,
reply: tokio::sync::oneshot::Sender<Result<(), String>>,
},
ClientReady(Arc<JsonRpcClient>),
AutoApproved(String),
}
pub struct Agent {
pub config: AgentConfig,
pub status: AgentStatus,
pub session_id: Option<String>,
child: Option<tokio::process::Child>,
pub client: Option<Arc<JsonRpcClient>>,
ui_tx: mpsc::UnboundedSender<AgentMessage>,
pub auto_approve: Arc<AtomicBool>,
safe_paths: SafePaths,
mcp_server_bin: PathBuf,
}
impl Agent {
pub fn new(
config: AgentConfig,
ui_tx: mpsc::UnboundedSender<AgentMessage>,
safe_paths: SafePaths,
mcp_server_bin: PathBuf,
) -> Self {
Self {
config,
status: AgentStatus::Disconnected,
session_id: None,
child: None,
client: None,
ui_tx,
auto_approve: Arc::new(AtomicBool::new(false)),
safe_paths,
mcp_server_bin,
}
}
pub async fn connect(
&mut self,
cwd: &str,
capabilities: ClientCapabilities,
extra_roots: &[String],
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let base_run_command_template = self
.config
.run_command_for_platform()
.ok_or("No run command for current platform")?
.to_string();
let run_command_template = super::session::adapt_run_command_for_extra_roots(
&self.config,
&base_run_command_template,
extra_roots,
);
self.set_status(AgentStatus::Connecting);
let shell_path = resolve_shell_path();
let run_command = if resolve_binary_in_path(&run_command_template).is_none() {
if let Some(ref sp) = shell_path {
let mut tokens = run_command_template.split_whitespace();
if let Some(binary) = tokens.next() {
if let Some(abs) = super::agents::resolve_binary_in_path_str(binary, sp) {
log::info!("ACP: resolved '{binary}' to '{}'", abs.display());
let rest: String = tokens.collect::<Vec<_>>().join(" ");
if rest.is_empty() {
abs.to_string_lossy().to_string()
} else {
format!("{} {rest}", abs.to_string_lossy())
}
} else {
run_command_template.clone()
}
} else {
run_command_template.clone()
}
} else {
run_command_template.clone()
}
} else {
run_command_template.clone()
};
fn has_shell_metacharacters(s: &str) -> bool {
s.chars().any(|c| {
matches!(
c,
'|' | '&' | ';' | '$' | '`' | '(' | ')' | '>' | '<' | '!' | '{' | '}'
)
})
}
let mut cmd;
let use_direct_spawn = !has_shell_metacharacters(&run_command);
if use_direct_spawn {
let tokens =
shell_words::split(&run_command).unwrap_or_else(|_| vec![run_command.clone()]);
let (binary, args) = if tokens.is_empty() {
(run_command.clone(), vec![])
} else {
(tokens[0].clone(), tokens[1..].to_vec())
};
log::info!(
"ACP: spawning agent '{}' directly: {:?} {:?} in cwd={cwd}",
self.config.identity,
binary,
args,
);
cmd = Command::new(&binary);
cmd.args(&args);
} else {
const KNOWN_SHELLS: &[&str] =
&["sh", "bash", "zsh", "fish", "dash", "ksh", "tcsh", "csh"];
let shell = {
let raw = std::env::var("SHELL").unwrap_or_default();
let valid = !raw.is_empty()
&& raw.starts_with('/')
&& std::path::Path::new(&raw)
.file_name()
.and_then(|n| n.to_str())
.map(|name| KNOWN_SHELLS.contains(&name))
.unwrap_or(false);
if valid {
raw
} else {
if !raw.is_empty() {
log::warn!(
"ACP: SHELL env var '{}' is not an absolute path to a known shell; \
falling back to /bin/sh",
raw
);
}
"/bin/sh".to_string()
}
};
log::warn!(
"ACP: agent '{}' using shell fallback mode (SHELL -lc). \
Agent TOML files are a trust boundary — only install agents from \
trusted sources. command='{}'",
self.config.identity,
run_command,
);
cmd = Command::new(&shell);
cmd.arg("-lc").arg(&run_command);
}
cmd.current_dir(cwd)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped());
if let Some(ref sp) = shell_path {
cmd.env("PATH", sp);
}
cmd.envs(&self.config.env);
cmd.env_remove("CLAUDECODE");
let mut child = match cmd.spawn() {
Ok(child) => child,
Err(e) => {
let msg = format!("Failed to spawn agent: {e}");
self.set_status(AgentStatus::Error(msg.clone()));
return Err(msg.into());
}
};
let stdin = child.stdin.take().ok_or("Failed to capture agent stdin")?;
let stdout = child
.stdout
.take()
.ok_or("Failed to capture agent stdout")?;
if let Some(stderr) = child.stderr.take() {
let identity = self.config.identity.clone();
tokio::spawn(async move {
use tokio::io::AsyncBufReadExt;
let mut reader = tokio::io::BufReader::new(stderr);
let mut line = String::new();
loop {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) => break,
Ok(_) => {
let trimmed = line.trim();
if !trimmed.is_empty() {
log::warn!("ACP agent [{identity}] stderr: {trimmed}");
}
}
Err(_) => break,
}
}
});
}
let mut rpc_client = JsonRpcClient::new(stdin, stdout);
let incoming_rx = rpc_client
.take_incoming()
.ok_or("Failed to take incoming channel")?;
let client = Arc::new(rpc_client);
const HANDSHAKE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
let init_params = InitializeParams {
protocol_version: 1,
client_capabilities: capabilities,
client_info: ClientInfo {
name: "par-term".to_string(),
title: "Par Term".to_string(),
version: env!("CARGO_PKG_VERSION").to_string(),
},
};
log::info!("ACP: sending initialize request");
let init_response = match tokio::time::timeout(
HANDSHAKE_TIMEOUT,
client.request("initialize", Some(serde_json::to_value(&init_params)?)),
)
.await
{
Ok(Ok(resp)) => resp,
Ok(Err(e)) => {
let msg = format!("Initialize request failed: {e}");
self.set_status(AgentStatus::Error(msg.clone()));
let _ = child.kill().await;
return Err(msg.into());
}
Err(_) => {
let msg =
"Agent handshake timed out (initialize). Is the agent installed?".to_string();
self.set_status(AgentStatus::Error(msg.clone()));
let _ = child.kill().await;
return Err(msg.into());
}
};
if let Some(err) = init_response.error {
let msg = format!("Initialize failed: {err}");
self.set_status(AgentStatus::Error(msg.clone()));
let _ = child.kill().await;
return Err(msg.into());
}
log::info!("ACP: initialize succeeded");
let mcp_server = super::session::build_mcp_server_descriptor(
&self.safe_paths.config_dir,
&self.config,
&self.mcp_server_bin,
);
let session_meta =
super::session::build_session_meta(&self.config, &run_command_template, extra_roots);
let session_params = SessionNewParams {
cwd: cwd.to_string(),
mcp_servers: Some(vec![mcp_server]),
meta: session_meta,
};
log::info!(
"ACP: sending session/new (cwd={cwd}, mcp_server_bin={})",
self.mcp_server_bin.display()
);
const SESSION_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);
let session_response = match tokio::time::timeout(
SESSION_TIMEOUT,
client.request("session/new", Some(serde_json::to_value(&session_params)?)),
)
.await
{
Ok(Ok(resp)) => resp,
Ok(Err(e)) => {
let msg = format!("Session creation request failed: {e}");
self.set_status(AgentStatus::Error(msg.clone()));
let _ = child.kill().await;
return Err(msg.into());
}
Err(_) => {
let msg = "Agent handshake timed out (session/new)".to_string();
self.set_status(AgentStatus::Error(msg.clone()));
let _ = child.kill().await;
return Err(msg.into());
}
};
if let Some(err) = session_response.error {
let msg = format!("Session creation failed: {err}");
self.set_status(AgentStatus::Error(msg.clone()));
let _ = child.kill().await;
return Err(msg.into());
}
let session_result: SessionResult = serde_json::from_value(
session_response
.result
.ok_or("Missing result in session/new response")?,
)?;
self.session_id = Some(session_result.session_id.clone());
self.child = Some(child);
self.client = Some(Arc::clone(&client));
self.set_status(AgentStatus::Connected);
log::info!("ACP: connected, session_id={}", session_result.session_id);
let ui_tx = self.ui_tx.clone();
let handler_client = Arc::clone(&client);
let auto_approve = Arc::clone(&self.auto_approve);
let safe_paths = self.safe_paths.clone();
tokio::spawn(async move {
handle_incoming_messages(incoming_rx, handler_client, ui_tx, auto_approve, safe_paths)
.await;
});
Ok(())
}
pub async fn disconnect(&mut self) {
if let Some(ref mut child) = self.child {
let _ = child.kill().await;
}
self.child = None;
self.client = None;
self.session_id = None;
self.set_status(AgentStatus::Disconnected);
}
pub async fn send_prompt(
&self,
content: Vec<ContentBlock>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let client = self.client.as_ref().ok_or("Not connected")?;
let session_id = self.session_id.as_ref().ok_or("No active session")?;
let params = SessionPromptParams {
session_id: session_id.clone(),
prompt: content,
};
let response = client
.request("session/prompt", Some(serde_json::to_value(¶ms)?))
.await?;
if let Some(err) = response.error {
return Err(format!("Prompt failed: {err}").into());
}
Ok(())
}
pub async fn set_mode(
&self,
mode_id: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let client = self.client.as_ref().ok_or("Not connected")?;
let session_id = self.session_id.as_ref().ok_or("No active session")?;
let response = client
.request(
"session/setMode",
Some(serde_json::json!({
"sessionId": session_id,
"modeId": mode_id,
})),
)
.await?;
if let Some(err) = response.error {
return Err(format!("setMode failed: {err}").into());
}
Ok(())
}
pub async fn cancel(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let client = self.client.as_ref().ok_or("Not connected")?;
let session_id = self.session_id.as_ref().ok_or("No active session")?;
client
.notify(
"session/cancel",
Some(serde_json::json!({ "sessionId": session_id })),
)
.await?;
Ok(())
}
pub async fn respond_permission(
&self,
request_id: u64,
option_id: &str,
cancelled: bool,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let client = self.client.as_ref().ok_or("Not connected")?;
let outcome = if cancelled {
PermissionOutcome {
outcome: "cancelled".to_string(),
option_id: None,
}
} else {
PermissionOutcome {
outcome: "selected".to_string(),
option_id: Some(option_id.to_string()),
}
};
let result = RequestPermissionResponse { outcome };
client
.respond(request_id, Some(serde_json::to_value(&result)?), None)
.await?;
Ok(())
}
fn set_status(&mut self, status: AgentStatus) {
self.status = status.clone();
let _ = self.ui_tx.send(AgentMessage::StatusChanged(status));
}
}
impl Drop for Agent {
fn drop(&mut self) {
if let Some(ref mut child) = self.child {
let _ = child.start_kill();
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
use std::sync::atomic::Ordering;
use std::time::{SystemTime, UNIX_EPOCH};
fn make_test_config() -> AgentConfig {
AgentConfig {
identity: "test.agent".to_string(),
name: "Test Agent".to_string(),
short_name: "test".to_string(),
protocol: "acp".to_string(),
r#type: "coding".to_string(),
active: Some(true),
run_command: {
let mut m = HashMap::new();
m.insert("*".to_string(), "echo test".to_string());
m
},
env: HashMap::new(),
install_command: None,
actions: HashMap::new(),
connector_installed: false,
}
}
fn make_safe_paths() -> SafePaths {
let base = std::env::temp_dir().join(format!(
"par-term-acp-agent-tests-{}-{}",
std::process::id(),
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("clock should be after epoch")
.as_nanos()
));
let config_dir = base.join("config");
let shaders_dir = base.join("shaders");
std::fs::create_dir_all(&config_dir).expect("create config dir");
std::fs::create_dir_all(&shaders_dir).expect("create shaders dir");
SafePaths {
config_dir,
shaders_dir,
}
}
#[test]
fn test_agent_new_disconnected() {
let (tx, _rx) = mpsc::unbounded_channel();
let agent = Agent::new(
make_test_config(),
tx,
make_safe_paths(),
std::path::PathBuf::from("par-term"),
);
assert!(matches!(agent.status, AgentStatus::Disconnected));
assert!(agent.session_id.is_none());
assert!(agent.client.is_none());
assert!(agent.child.is_none());
assert!(!agent.auto_approve.load(Ordering::Relaxed));
}
#[test]
fn test_agent_status_variants() {
let status = AgentStatus::Disconnected;
assert!(matches!(status, AgentStatus::Disconnected));
let status = AgentStatus::Connecting;
assert!(matches!(status, AgentStatus::Connecting));
let status = AgentStatus::Connected;
assert!(matches!(status, AgentStatus::Connected));
let status = AgentStatus::Error("test error".to_string());
assert!(matches!(status, AgentStatus::Error(_)));
}
#[test]
fn test_set_status_sends_message() {
let (tx, mut rx) = mpsc::unbounded_channel();
let mut agent = Agent::new(
make_test_config(),
tx,
make_safe_paths(),
std::path::PathBuf::from("par-term"),
);
agent.set_status(AgentStatus::Connecting);
assert!(matches!(agent.status, AgentStatus::Connecting));
let msg = rx.try_recv().unwrap();
assert!(matches!(
msg,
AgentMessage::StatusChanged(AgentStatus::Connecting)
));
}
#[tokio::test]
async fn test_disconnect_clears_state() {
let (tx, _rx) = mpsc::unbounded_channel();
let mut agent = Agent::new(
make_test_config(),
tx,
make_safe_paths(),
std::path::PathBuf::from("par-term"),
);
agent.session_id = Some("test-session".to_string());
agent.status = AgentStatus::Connected;
agent.disconnect().await;
assert!(matches!(agent.status, AgentStatus::Disconnected));
assert!(agent.session_id.is_none());
assert!(agent.client.is_none());
assert!(agent.child.is_none());
}
#[tokio::test]
async fn test_send_prompt_not_connected() {
let (tx, _rx) = mpsc::unbounded_channel();
let agent = Agent::new(
make_test_config(),
tx,
make_safe_paths(),
std::path::PathBuf::from("par-term"),
);
let result = agent
.send_prompt(vec![ContentBlock::Text {
text: "Hello".to_string(),
}])
.await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_cancel_not_connected() {
let (tx, _rx) = mpsc::unbounded_channel();
let agent = Agent::new(
make_test_config(),
tx,
make_safe_paths(),
std::path::PathBuf::from("par-term"),
);
let result = agent.cancel().await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_respond_permission_not_connected() {
let (tx, _rx) = mpsc::unbounded_channel();
let agent = Agent::new(
make_test_config(),
tx,
make_safe_paths(),
std::path::PathBuf::from("par-term"),
);
let result = agent.respond_permission(1, "allow", false).await;
assert!(result.is_err());
}
}