1use std::path::PathBuf;
19use std::sync::Arc;
20use std::sync::atomic::AtomicBool;
21
22use serde_json::Value;
23use tokio::process::Command;
24use tokio::sync::mpsc;
25
26use super::agents::{AgentConfig, resolve_binary_in_path, resolve_shell_path};
27use super::jsonrpc::JsonRpcClient;
28use super::message_handler::handle_incoming_messages;
29use super::permissions::SafePaths;
30use super::protocol::{
31 ClientCapabilities, ClientInfo, ContentBlock, InitializeParams, PermissionOption,
32 PermissionOutcome, RequestPermissionResponse, SessionNewParams, SessionPromptParams,
33 SessionResult, SessionUpdate,
34};
35
/// Connection state of an [`Agent`], as stored in `Agent::status` and reported
/// to the UI through [`AgentMessage::StatusChanged`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AgentStatus {
    /// No agent process is attached (initial state, and the state after
    /// `Agent::disconnect`).
    Disconnected,
    /// `Agent::connect` is in progress: the process is being spawned and the
    /// handshake has not finished yet.
    Connecting,
    /// The handshake completed and a session id is available.
    Connected,
    /// Connecting failed; the payload is a human-readable description.
    Error(String),
}
52
53#[derive(Debug)]
55pub enum AgentMessage {
56 StatusChanged(AgentStatus),
58 SessionUpdate(SessionUpdate),
60 PermissionRequest {
62 request_id: u64,
63 tool_call: Value,
64 options: Vec<PermissionOption>,
65 },
66 PromptComplete,
68 PromptStarted,
70 ConfigUpdate {
72 updates: std::collections::HashMap<String, serde_json::Value>,
73 reply: tokio::sync::oneshot::Sender<Result<(), String>>,
74 },
75 ClientReady(Arc<JsonRpcClient>),
78 AutoApproved(String),
80}
81
82pub struct Agent {
88 pub config: AgentConfig,
90 pub status: AgentStatus,
92 pub session_id: Option<String>,
94 child: Option<tokio::process::Child>,
96 pub client: Option<Arc<JsonRpcClient>>,
98 ui_tx: mpsc::UnboundedSender<AgentMessage>,
100 pub auto_approve: Arc<AtomicBool>,
102 safe_paths: SafePaths,
104 mcp_server_bin: PathBuf,
106}
107
108impl Agent {
109 pub fn new(
117 config: AgentConfig,
118 ui_tx: mpsc::UnboundedSender<AgentMessage>,
119 safe_paths: SafePaths,
120 mcp_server_bin: PathBuf,
121 ) -> Self {
122 Self {
123 config,
124 status: AgentStatus::Disconnected,
125 session_id: None,
126 child: None,
127 client: None,
128 ui_tx,
129 auto_approve: Arc::new(AtomicBool::new(false)),
130 safe_paths,
131 mcp_server_bin,
132 }
133 }
134
135 pub async fn connect(
141 &mut self,
142 cwd: &str,
143 capabilities: ClientCapabilities,
144 extra_roots: &[String],
145 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
146 let base_run_command_template = self
148 .config
149 .run_command_for_platform()
150 .ok_or("No run command for current platform")?
151 .to_string();
152 let run_command_template = super::session::adapt_run_command_for_extra_roots(
153 &self.config,
154 &base_run_command_template,
155 extra_roots,
156 );
157
158 self.set_status(AgentStatus::Connecting);
159
160 let shell_path = resolve_shell_path();
170 let run_command = if resolve_binary_in_path(&run_command_template).is_none() {
171 if let Some(ref sp) = shell_path {
173 let mut tokens = run_command_template.split_whitespace();
174 if let Some(binary) = tokens.next() {
175 if let Some(abs) = super::agents::resolve_binary_in_path_str(binary, sp) {
176 log::info!("ACP: resolved '{binary}' to '{}'", abs.display());
177 let rest: String = tokens.collect::<Vec<_>>().join(" ");
178 if rest.is_empty() {
179 abs.to_string_lossy().to_string()
180 } else {
181 format!("{} {rest}", abs.to_string_lossy())
182 }
183 } else {
184 run_command_template.clone()
185 }
186 } else {
187 run_command_template.clone()
188 }
189 } else {
190 run_command_template.clone()
191 }
192 } else {
193 run_command_template.clone()
194 };
195
196 fn has_shell_metacharacters(s: &str) -> bool {
213 s.chars().any(|c| {
214 matches!(
215 c,
216 '|' | '&' | ';' | '$' | '`' | '(' | ')' | '>' | '<' | '!' | '{' | '}'
217 )
218 })
219 }
220
221 let mut cmd;
222 let use_direct_spawn = !has_shell_metacharacters(&run_command);
223
224 if use_direct_spawn {
225 let tokens =
228 shell_words::split(&run_command).unwrap_or_else(|_| vec![run_command.clone()]);
229 let (binary, args) = if tokens.is_empty() {
230 (run_command.clone(), vec![])
231 } else {
232 (tokens[0].clone(), tokens[1..].to_vec())
233 };
234 log::info!(
235 "ACP: spawning agent '{}' directly: {:?} {:?} in cwd={cwd}",
236 self.config.identity,
237 binary,
238 args,
239 );
240 cmd = Command::new(&binary);
241 cmd.args(&args);
242 } else {
243 const KNOWN_SHELLS: &[&str] =
256 &["sh", "bash", "zsh", "fish", "dash", "ksh", "tcsh", "csh"];
257
258 let shell = {
259 let raw = std::env::var("SHELL").unwrap_or_default();
260 let valid = !raw.is_empty()
261 && raw.starts_with('/')
262 && std::path::Path::new(&raw)
263 .file_name()
264 .and_then(|n| n.to_str())
265 .map(|name| KNOWN_SHELLS.contains(&name))
266 .unwrap_or(false);
267 if valid {
268 raw
269 } else {
270 if !raw.is_empty() {
271 log::warn!(
272 "ACP: SHELL env var '{}' is not an absolute path to a known shell; \
273 falling back to /bin/sh",
274 raw
275 );
276 }
277 "/bin/sh".to_string()
278 }
279 };
280
281 log::warn!(
293 "ACP: agent '{}' using shell fallback mode (SHELL -lc). \
294 Agent TOML files are a trust boundary — only install agents from \
295 trusted sources. command='{}'",
296 self.config.identity,
297 run_command,
298 );
299 cmd = Command::new(&shell);
300 cmd.arg("-lc").arg(&run_command);
301 }
302
303 cmd.current_dir(cwd)
304 .stdin(std::process::Stdio::piped())
305 .stdout(std::process::Stdio::piped())
306 .stderr(std::process::Stdio::piped());
307
308 if let Some(ref sp) = shell_path {
312 cmd.env("PATH", sp);
313 }
314 cmd.envs(&self.config.env);
315
316 cmd.env_remove("CLAUDECODE");
319
320 let mut child = match cmd.spawn() {
321 Ok(child) => child,
322 Err(e) => {
323 let msg = format!("Failed to spawn agent: {e}");
324 self.set_status(AgentStatus::Error(msg.clone()));
325 return Err(msg.into());
326 }
327 };
328
329 let stdin = child.stdin.take().ok_or("Failed to capture agent stdin")?;
330 let stdout = child
331 .stdout
332 .take()
333 .ok_or("Failed to capture agent stdout")?;
334
335 if let Some(stderr) = child.stderr.take() {
337 let identity = self.config.identity.clone();
338 tokio::spawn(async move {
339 use tokio::io::AsyncBufReadExt;
340 let mut reader = tokio::io::BufReader::new(stderr);
341 let mut line = String::new();
342 loop {
343 line.clear();
344 match reader.read_line(&mut line).await {
345 Ok(0) => break,
346 Ok(_) => {
347 let trimmed = line.trim();
348 if !trimmed.is_empty() {
349 log::warn!("ACP agent [{identity}] stderr: {trimmed}");
350 }
351 }
352 Err(_) => break,
353 }
354 }
355 });
356 }
357
358 let mut rpc_client = JsonRpcClient::new(stdin, stdout);
360 let incoming_rx = rpc_client
361 .take_incoming()
362 .ok_or("Failed to take incoming channel")?;
363 let client = Arc::new(rpc_client);
364
365 const HANDSHAKE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
367
368 let init_params = InitializeParams {
370 protocol_version: 1,
371 client_capabilities: capabilities,
372 client_info: ClientInfo {
373 name: "par-term".to_string(),
374 title: "Par Term".to_string(),
375 version: env!("CARGO_PKG_VERSION").to_string(),
376 },
377 };
378 log::info!("ACP: sending initialize request");
379 let init_response = match tokio::time::timeout(
380 HANDSHAKE_TIMEOUT,
381 client.request("initialize", Some(serde_json::to_value(&init_params)?)),
382 )
383 .await
384 {
385 Ok(Ok(resp)) => resp,
386 Ok(Err(e)) => {
387 let msg = format!("Initialize request failed: {e}");
388 self.set_status(AgentStatus::Error(msg.clone()));
389 let _ = child.kill().await;
390 return Err(msg.into());
391 }
392 Err(_) => {
393 let msg =
394 "Agent handshake timed out (initialize). Is the agent installed?".to_string();
395 self.set_status(AgentStatus::Error(msg.clone()));
396 let _ = child.kill().await;
397 return Err(msg.into());
398 }
399 };
400 if let Some(err) = init_response.error {
401 let msg = format!("Initialize failed: {err}");
402 self.set_status(AgentStatus::Error(msg.clone()));
403 let _ = child.kill().await;
404 return Err(msg.into());
405 }
406 log::info!("ACP: initialize succeeded");
407
408 let mcp_server = super::session::build_mcp_server_descriptor(
413 &self.safe_paths.config_dir,
414 &self.config,
415 &self.mcp_server_bin,
416 );
417 let session_meta =
418 super::session::build_session_meta(&self.config, &run_command_template, extra_roots);
419
420 let session_params = SessionNewParams {
421 cwd: cwd.to_string(),
422 mcp_servers: Some(vec![mcp_server]),
423 meta: session_meta,
424 };
425 log::info!(
426 "ACP: sending session/new (cwd={cwd}, mcp_server_bin={})",
427 self.mcp_server_bin.display()
428 );
429 const SESSION_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);
432 let session_response = match tokio::time::timeout(
433 SESSION_TIMEOUT,
434 client.request("session/new", Some(serde_json::to_value(&session_params)?)),
435 )
436 .await
437 {
438 Ok(Ok(resp)) => resp,
439 Ok(Err(e)) => {
440 let msg = format!("Session creation request failed: {e}");
441 self.set_status(AgentStatus::Error(msg.clone()));
442 let _ = child.kill().await;
443 return Err(msg.into());
444 }
445 Err(_) => {
446 let msg = "Agent handshake timed out (session/new)".to_string();
447 self.set_status(AgentStatus::Error(msg.clone()));
448 let _ = child.kill().await;
449 return Err(msg.into());
450 }
451 };
452 if let Some(err) = session_response.error {
453 let msg = format!("Session creation failed: {err}");
454 self.set_status(AgentStatus::Error(msg.clone()));
455 let _ = child.kill().await;
456 return Err(msg.into());
457 }
458
459 let session_result: SessionResult = serde_json::from_value(
460 session_response
461 .result
462 .ok_or("Missing result in session/new response")?,
463 )?;
464
465 self.session_id = Some(session_result.session_id.clone());
467 self.child = Some(child);
468 self.client = Some(Arc::clone(&client));
469 self.set_status(AgentStatus::Connected);
470 log::info!("ACP: connected, session_id={}", session_result.session_id);
471
472 let ui_tx = self.ui_tx.clone();
474 let handler_client = Arc::clone(&client);
475 let auto_approve = Arc::clone(&self.auto_approve);
476 let safe_paths = self.safe_paths.clone();
477 tokio::spawn(async move {
478 handle_incoming_messages(incoming_rx, handler_client, ui_tx, auto_approve, safe_paths)
479 .await;
480 });
481
482 Ok(())
483 }
484
485 pub async fn disconnect(&mut self) {
487 if let Some(ref mut child) = self.child {
488 let _ = child.kill().await;
489 }
490 self.child = None;
491 self.client = None;
492 self.session_id = None;
493 self.set_status(AgentStatus::Disconnected);
494 }
495
496 pub async fn send_prompt(
498 &self,
499 content: Vec<ContentBlock>,
500 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
501 let client = self.client.as_ref().ok_or("Not connected")?;
502 let session_id = self.session_id.as_ref().ok_or("No active session")?;
503
504 let params = SessionPromptParams {
505 session_id: session_id.clone(),
506 prompt: content,
507 };
508 let response = client
509 .request("session/prompt", Some(serde_json::to_value(¶ms)?))
510 .await?;
511 if let Some(err) = response.error {
512 return Err(format!("Prompt failed: {err}").into());
513 }
514 Ok(())
515 }
516
517 pub async fn set_mode(
522 &self,
523 mode_id: &str,
524 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
525 let client = self.client.as_ref().ok_or("Not connected")?;
526 let session_id = self.session_id.as_ref().ok_or("No active session")?;
527
528 let response = client
529 .request(
530 "session/setMode",
531 Some(serde_json::json!({
532 "sessionId": session_id,
533 "modeId": mode_id,
534 })),
535 )
536 .await?;
537 if let Some(err) = response.error {
538 return Err(format!("setMode failed: {err}").into());
539 }
540 Ok(())
541 }
542
543 pub async fn cancel(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
545 let client = self.client.as_ref().ok_or("Not connected")?;
546 let session_id = self.session_id.as_ref().ok_or("No active session")?;
547
548 client
549 .notify(
550 "session/cancel",
551 Some(serde_json::json!({ "sessionId": session_id })),
552 )
553 .await?;
554 Ok(())
555 }
556
557 pub async fn respond_permission(
559 &self,
560 request_id: u64,
561 option_id: &str,
562 cancelled: bool,
563 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
564 let client = self.client.as_ref().ok_or("Not connected")?;
565
566 let outcome = if cancelled {
567 PermissionOutcome {
568 outcome: "cancelled".to_string(),
569 option_id: None,
570 }
571 } else {
572 PermissionOutcome {
573 outcome: "selected".to_string(),
574 option_id: Some(option_id.to_string()),
575 }
576 };
577
578 let result = RequestPermissionResponse { outcome };
579 client
580 .respond(request_id, Some(serde_json::to_value(&result)?), None)
581 .await?;
582 Ok(())
583 }
584
585 fn set_status(&mut self, status: AgentStatus) {
587 self.status = status.clone();
588 let _ = self.ui_tx.send(AgentMessage::StatusChanged(status));
589 }
590}
591
592impl Drop for Agent {
593 fn drop(&mut self) {
594 if let Some(ref mut child) = self.child {
596 let _ = child.start_kill();
597 }
598 }
599}
600
/// Unit tests for the parts of [`Agent`] that work without a running agent
/// process: construction, status reporting and the "not connected" error paths.
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::sync::atomic::Ordering;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Minimal agent configuration whose only run command is `echo test`.
    fn make_test_config() -> AgentConfig {
        let run_command = HashMap::from([("*".to_string(), "echo test".to_string())]);
        AgentConfig {
            identity: "test.agent".to_string(),
            name: "Test Agent".to_string(),
            short_name: "test".to_string(),
            protocol: "acp".to_string(),
            r#type: "coding".to_string(),
            active: Some(true),
            run_command,
            env: HashMap::new(),
            install_command: None,
            actions: HashMap::new(),
            connector_installed: false,
        }
    }

    /// Creates fresh `config` and `shaders` directories under a temp directory
    /// named after the process id and the current time in nanoseconds.
    fn make_safe_paths() -> SafePaths {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("clock should be after epoch")
            .as_nanos();
        let dir_name = format!("par-term-acp-agent-tests-{}-{}", std::process::id(), nanos);
        let base = std::env::temp_dir().join(dir_name);

        let config_dir = base.join("config");
        let shaders_dir = base.join("shaders");
        std::fs::create_dir_all(&config_dir).expect("create config dir");
        std::fs::create_dir_all(&shaders_dir).expect("create shaders dir");

        SafePaths {
            config_dir,
            shaders_dir,
        }
    }

    /// Builds a disconnected agent plus the receiving end of its UI channel.
    ///
    /// Callers keep the receiver bound so the channel stays open for the
    /// duration of the test.
    fn make_agent() -> (Agent, mpsc::UnboundedReceiver<AgentMessage>) {
        let (tx, rx) = mpsc::unbounded_channel();
        let agent = Agent::new(
            make_test_config(),
            tx,
            make_safe_paths(),
            std::path::PathBuf::from("par-term"),
        );
        (agent, rx)
    }

    #[test]
    fn test_agent_new_disconnected() {
        let (agent, _rx) = make_agent();

        assert!(matches!(agent.status, AgentStatus::Disconnected));
        assert!(agent.session_id.is_none());
        assert!(agent.client.is_none());
        assert!(agent.child.is_none());
        assert!(!agent.auto_approve.load(Ordering::Relaxed));
    }

    #[test]
    fn test_agent_status_variants() {
        let disconnected = AgentStatus::Disconnected;
        let connecting = AgentStatus::Connecting;
        let connected = AgentStatus::Connected;
        let failed = AgentStatus::Error("test error".to_string());

        assert!(matches!(disconnected, AgentStatus::Disconnected));
        assert!(matches!(connecting, AgentStatus::Connecting));
        assert!(matches!(connected, AgentStatus::Connected));
        assert!(matches!(failed, AgentStatus::Error(_)));
    }

    #[test]
    fn test_set_status_sends_message() {
        let (mut agent, mut rx) = make_agent();

        agent.set_status(AgentStatus::Connecting);
        assert!(matches!(agent.status, AgentStatus::Connecting));

        let received = rx.try_recv().unwrap();
        assert!(matches!(
            received,
            AgentMessage::StatusChanged(AgentStatus::Connecting)
        ));
    }

    #[tokio::test]
    async fn test_disconnect_clears_state() {
        let (mut agent, _rx) = make_agent();

        // Fake a connected state without spawning a process.
        agent.session_id = Some("test-session".to_string());
        agent.status = AgentStatus::Connected;

        agent.disconnect().await;

        assert!(matches!(agent.status, AgentStatus::Disconnected));
        assert!(agent.session_id.is_none());
        assert!(agent.client.is_none());
        assert!(agent.child.is_none());
    }

    #[tokio::test]
    async fn test_send_prompt_not_connected() {
        let (agent, _rx) = make_agent();

        let prompt = vec![ContentBlock::Text {
            text: "Hello".to_string(),
        }];
        let result = agent.send_prompt(prompt).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_cancel_not_connected() {
        let (agent, _rx) = make_agent();

        let result = agent.cancel().await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_respond_permission_not_connected() {
        let (agent, _rx) = make_agent();

        let result = agent.respond_permission(1, "allow", false).await;
        assert!(result.is_err());
    }
}