1pub mod binary_manager;
20pub mod claude_code_process;
21pub mod docker;
22pub mod installation_state;
23pub mod mcp_setup;
24pub mod paths;
25pub mod process;
26pub mod provider_adapter;
27pub mod registry_fetch;
28pub mod registry_types;
29pub mod runtime_manager;
30pub mod terminal_manager;
31pub mod warmup;
32
33pub use binary_manager::AcpBinaryManager;
34pub use claude_code_process::{ClaudeCodeConfig, ClaudeCodeProcess};
35pub use installation_state::AcpInstallationState;
36pub use paths::AcpPaths;
37pub use registry_fetch::{fetch_registry, fetch_registry_json};
38pub use registry_types::*;
39pub use runtime_manager::{current_platform, AcpRuntimeManager, RuntimeInfo, RuntimeType};
40pub use warmup::{AcpWarmupService, WarmupState, WarmupStatus};
41
42use std::collections::HashMap;
43use std::sync::Arc;
44
45use serde::{Deserialize, Serialize};
46use tokio::sync::{broadcast, RwLock};
47
48use crate::trace::{Contributor, TraceConversation, TraceEventType, TraceRecord, TraceWriter};
49use process::AcpProcess;
50
51#[cfg(windows)]
52pub(crate) const CREATE_NO_WINDOW: u32 = 0x0800_0000;
53
54#[derive(Debug, Clone, Serialize, Deserialize)]
58#[serde(rename_all = "camelCase")]
59pub struct AcpSessionRecord {
60 pub session_id: String,
61 #[serde(skip_serializing_if = "Option::is_none")]
62 pub name: Option<String>,
63 pub cwd: String,
64 pub workspace_id: String,
65 #[serde(skip_serializing_if = "Option::is_none")]
66 pub routa_agent_id: Option<String>,
67 pub provider: Option<String>,
68 pub role: Option<String>,
69 pub mode_id: Option<String>,
70 #[serde(skip_serializing_if = "Option::is_none")]
71 pub model: Option<String>,
72 pub created_at: String,
73 #[serde(default)]
74 pub first_prompt_sent: bool,
75 #[serde(skip_serializing_if = "Option::is_none")]
77 pub parent_session_id: Option<String>,
78 #[serde(skip_serializing_if = "Option::is_none")]
79 pub specialist_id: Option<String>,
80 #[serde(skip_serializing_if = "Option::is_none")]
81 pub specialist_system_prompt: Option<String>,
82}
83
84#[derive(Debug, Clone, Default)]
85pub struct SessionLaunchOptions {
86 pub specialist_id: Option<String>,
87 pub specialist_system_prompt: Option<String>,
88 pub allowed_native_tools: Option<Vec<String>>,
89 pub initialize_timeout_ms: Option<u64>,
90 pub provider_args: Option<Vec<String>>,
91}
92
93#[derive(Clone)]
97enum AgentProcessType {
98 Acp(Arc<AcpProcess>),
100 Claude(Arc<ClaudeCodeProcess>),
102}
103
104impl AgentProcessType {
105 async fn kill(&self) {
107 match self {
108 AgentProcessType::Acp(process) => process.kill().await,
109 AgentProcessType::Claude(process) => process.kill().await,
110 }
111 }
112}
113
114struct ManagedProcess {
116 process: AgentProcessType,
117 acp_session_id: String,
119 preset_id: String,
120 #[allow(dead_code)]
121 created_at: String,
122 trace_writer: TraceWriter,
124 #[allow(dead_code)]
126 cwd: String,
127}
128
129#[derive(Clone)]
136pub struct AcpManager {
137 sessions: Arc<RwLock<HashMap<String, AcpSessionRecord>>>,
139 processes: Arc<RwLock<HashMap<String, ManagedProcess>>>,
141 notification_channels: Arc<RwLock<HashMap<String, broadcast::Sender<serde_json::Value>>>>,
143 history: Arc<RwLock<HashMap<String, Vec<serde_json::Value>>>>,
145}
146
147impl Default for AcpManager {
148 fn default() -> Self {
149 Self::new()
150 }
151}
152
153impl AcpManager {
154 pub fn rewrite_notification_session_id(
155 session_id: &str,
156 mut notification: serde_json::Value,
157 ) -> serde_json::Value {
158 if let Some(object) = notification.as_object_mut() {
159 object.insert(
160 "sessionId".to_string(),
161 serde_json::Value::String(session_id.to_string()),
162 );
163 }
164 notification
165 }
166
167 pub fn new() -> Self {
168 Self {
169 sessions: Arc::new(RwLock::new(HashMap::new())),
170 processes: Arc::new(RwLock::new(HashMap::new())),
171 notification_channels: Arc::new(RwLock::new(HashMap::new())),
172 history: Arc::new(RwLock::new(HashMap::new())),
173 }
174 }
175
176 pub async fn list_sessions(&self) -> Vec<AcpSessionRecord> {
178 let sessions = self.sessions.read().await;
179 sessions.values().cloned().collect()
180 }
181
182 pub async fn get_session(&self, session_id: &str) -> Option<AcpSessionRecord> {
184 let sessions = self.sessions.read().await;
185 sessions.get(session_id).cloned()
186 }
187
188 pub async fn rename_session(&self, session_id: &str, name: &str) -> Option<()> {
191 let mut sessions = self.sessions.write().await;
192 let session = sessions.get_mut(session_id)?;
193 session.name = Some(name.to_string());
194 Some(())
195 }
196
197 pub async fn set_routa_agent_id(&self, session_id: &str, routa_agent_id: &str) -> Option<()> {
200 let mut sessions = self.sessions.write().await;
201 let session = sessions.get_mut(session_id)?;
202 session.routa_agent_id = Some(routa_agent_id.to_string());
203 Some(())
204 }
205
206 pub async fn delete_session(&self, session_id: &str) -> Option<()> {
209 let mut sessions = self.sessions.write().await;
210 let mut processes = self.processes.write().await;
211 let mut channels = self.notification_channels.write().await;
212 let mut history = self.history.write().await;
213
214 sessions.remove(session_id)?;
216
217 if let Some(managed) = processes.remove(session_id) {
219 let _ = managed.process.kill().await;
220 }
221
222 channels.remove(session_id);
224
225 history.remove(session_id);
227
228 Some(())
229 }
230
231 pub async fn get_session_history(&self, session_id: &str) -> Option<Vec<serde_json::Value>> {
234 let history = self.history.read().await;
235 history.get(session_id).cloned()
236 }
237
238 pub async fn push_to_history(&self, session_id: &str, notification: serde_json::Value) {
243 if notification.get("childAgentId").is_some() {
245 return;
246 }
247 let mut history = self.history.write().await;
248 let entries = history
249 .entry(session_id.to_string())
250 .or_insert_with(Vec::new);
251 entries.push(notification);
252 if entries.len() > 500 {
254 let drain_count = entries.len() - 500;
255 entries.drain(0..drain_count);
256 }
257 }
258
259 pub async fn emit_session_update(
261 &self,
262 session_id: &str,
263 update: serde_json::Value,
264 ) -> Result<(), String> {
265 let message = serde_json::json!({
266 "jsonrpc": "2.0",
267 "method": "session/update",
268 "params": {
269 "sessionId": session_id,
270 "update": update,
271 }
272 });
273
274 if let Some(channel) = self
275 .notification_channels
276 .read()
277 .await
278 .get(session_id)
279 .cloned()
280 {
281 let _ = channel.send(message.clone());
282 } else {
283 let params = message
284 .get("params")
285 .cloned()
286 .ok_or_else(|| "Missing params in synthetic session/update".to_string())?;
287 self.push_to_history(
288 session_id,
289 Self::rewrite_notification_session_id(session_id, params),
290 )
291 .await;
292 }
293 Ok(())
294 }
295
296 pub async fn mark_first_prompt_sent(&self, session_id: &str) {
298 let mut sessions = self.sessions.write().await;
299 if let Some(session) = sessions.get_mut(session_id) {
300 session.first_prompt_sent = true;
301 }
302 }
303
304 #[allow(clippy::too_many_arguments)]
310 pub async fn create_session(
311 &self,
312 session_id: String,
313 cwd: String,
314 workspace_id: String,
315 provider: Option<String>,
316 role: Option<String>,
317 model: Option<String>,
318 parent_session_id: Option<String>,
319 tool_mode: Option<String>,
320 mcp_profile: Option<String>,
321 ) -> Result<(String, String), String> {
322 self.create_session_with_options(
323 session_id,
324 cwd,
325 workspace_id,
326 provider,
327 role,
328 model,
329 parent_session_id,
330 tool_mode,
331 mcp_profile,
332 SessionLaunchOptions::default(),
333 )
334 .await
335 }
336
337 fn spawn_history_mirror(&self, session_id: &str, ntx: &broadcast::Sender<serde_json::Value>) {
338 let history_manager = self.clone();
339 let history_session_id = session_id.to_string();
340 let mut history_rx = ntx.subscribe();
341 tokio::spawn(async move {
342 loop {
343 match history_rx.recv().await {
344 Ok(message) => {
345 let params = match message.get("params") {
346 Some(value) => value.clone(),
347 None => continue,
348 };
349 history_manager
350 .push_to_history(
351 &history_session_id,
352 Self::rewrite_notification_session_id(&history_session_id, params),
353 )
354 .await;
355 }
356 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
357 Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
358 tracing::warn!(
359 "[AcpManager] Dropped {} session/update notifications for {}",
360 skipped,
361 history_session_id
362 );
363 }
364 }
365 }
366 });
367 }
368
369 #[allow(clippy::too_many_arguments)]
370 async fn register_managed_session(
371 &self,
372 session_id: String,
373 cwd: String,
374 workspace_id: String,
375 provider_name: String,
376 role: Option<String>,
377 model: Option<String>,
378 parent_session_id: Option<String>,
379 options: &SessionLaunchOptions,
380 process_type: AgentProcessType,
381 acp_session_id: String,
382 ntx: broadcast::Sender<serde_json::Value>,
383 ) {
384 let created_at = chrono::Utc::now().to_rfc3339();
385 let trace_writer = TraceWriter::new(&cwd);
386 let record = AcpSessionRecord {
387 session_id: session_id.clone(),
388 name: None,
389 cwd: cwd.clone(),
390 workspace_id: workspace_id.clone(),
391 routa_agent_id: None,
392 provider: Some(provider_name.clone()),
393 role: role.clone().or(Some("CRAFTER".to_string())),
394 mode_id: None,
395 model: model.clone(),
396 created_at: created_at.clone(),
397 first_prompt_sent: false,
398 parent_session_id: parent_session_id.clone(),
399 specialist_id: options.specialist_id.clone(),
400 specialist_system_prompt: options.specialist_system_prompt.clone(),
401 };
402
403 self.sessions
404 .write()
405 .await
406 .insert(session_id.clone(), record);
407 self.processes.write().await.insert(
408 session_id.clone(),
409 ManagedProcess {
410 process: process_type,
411 acp_session_id: acp_session_id.clone(),
412 preset_id: provider_name.clone(),
413 created_at,
414 trace_writer: trace_writer.clone(),
415 cwd: cwd.clone(),
416 },
417 );
418 self.notification_channels
419 .write()
420 .await
421 .insert(session_id.clone(), ntx.clone());
422 self.spawn_history_mirror(&session_id, &ntx);
423
424 let trace = TraceRecord::new(
425 &session_id,
426 TraceEventType::SessionStart,
427 Contributor::new(&provider_name, None),
428 )
429 .with_workspace_id(&workspace_id)
430 .with_metadata(
431 "role",
432 serde_json::json!(role.as_deref().unwrap_or("CRAFTER")),
433 )
434 .with_metadata("cwd", serde_json::json!(cwd));
435
436 trace_writer.append_safe(&trace).await;
437 }
438
439 #[allow(clippy::too_many_arguments)]
440 pub async fn create_session_from_inline(
441 &self,
442 session_id: String,
443 cwd: String,
444 workspace_id: String,
445 provider_name: String,
446 role: Option<String>,
447 model: Option<String>,
448 parent_session_id: Option<String>,
449 command: String,
450 args: Vec<String>,
451 options: SessionLaunchOptions,
452 ) -> Result<(String, String), String> {
453 let (ntx, _) = broadcast::channel::<serde_json::Value>(256);
454
455 let process = AcpProcess::spawn(
456 &command,
457 &args.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
458 &cwd,
459 ntx.clone(),
460 &provider_name,
461 &session_id,
462 )
463 .await?;
464
465 process
466 .initialize_with_timeout(options.initialize_timeout_ms)
467 .await?;
468
469 let acp_session_id = process.new_session(&cwd).await?;
470 self.register_managed_session(
471 session_id.clone(),
472 cwd.clone(),
473 workspace_id.clone(),
474 provider_name.clone(),
475 role.clone(),
476 model.clone(),
477 parent_session_id.clone(),
478 &options,
479 AgentProcessType::Acp(Arc::new(process)),
480 acp_session_id.clone(),
481 ntx.clone(),
482 )
483 .await;
484
485 tracing::info!(
486 "[AcpManager] Session {} created from inline command (provider: {}, agent session: {})",
487 session_id,
488 provider_name,
489 acp_session_id,
490 );
491
492 Ok((session_id, acp_session_id))
493 }
494
495 #[allow(clippy::too_many_arguments)]
496 pub async fn create_session_with_options(
497 &self,
498 session_id: String,
499 cwd: String,
500 workspace_id: String,
501 provider: Option<String>,
502 role: Option<String>,
503 model: Option<String>,
504 parent_session_id: Option<String>,
505 tool_mode: Option<String>,
506 mcp_profile: Option<String>,
507 options: SessionLaunchOptions,
508 ) -> Result<(String, String), String> {
509 let provider_name = provider.as_deref().unwrap_or("opencode");
510
511 let (ntx, _) = broadcast::channel::<serde_json::Value>(256);
513 let claude_mcp_config = if provider_name == "claude" {
514 Some(mcp_setup::build_claude_mcp_config(
515 &workspace_id,
516 &session_id,
517 tool_mode.as_deref(),
518 mcp_profile.as_deref(),
519 ))
520 } else {
521 None
522 };
523
524 let (process_type, acp_session_id) = if provider_name == "claude" {
526 let config = ClaudeCodeConfig {
528 command: "claude".to_string(),
529 cwd: cwd.clone(),
530 display_name: format!("Claude-{}", &session_id[..8.min(session_id.len())]),
531 permission_mode: Some("bypassPermissions".to_string()),
532 mcp_configs: claude_mcp_config.into_iter().collect(),
533 append_system_prompt: options.specialist_system_prompt.clone(),
534 allowed_tools: options.allowed_native_tools.clone(),
535 };
536
537 let claude_process = ClaudeCodeProcess::spawn(config, ntx.clone()).await?;
538 let claude_session_id = claude_process
539 .session_id()
540 .await
541 .unwrap_or_else(|| format!("claude-{}", &session_id[..8.min(session_id.len())]));
542
543 (
544 AgentProcessType::Claude(Arc::new(claude_process)),
545 claude_session_id,
546 )
547 } else {
548 let preset = get_preset_by_id_with_registry(provider_name).await?;
550
551 if let Some(summary) = mcp_setup::ensure_mcp_for_provider(
552 provider_name,
553 &workspace_id,
554 &session_id,
555 tool_mode.as_deref(),
556 mcp_profile.as_deref(),
557 )
558 .await?
559 {
560 tracing::info!("[AcpManager] {}", summary);
561 }
562
563 let mut extra_args: Vec<String> = preset.args.clone();
565 if let Some(provider_args) = options.provider_args.clone() {
566 extra_args.extend(provider_args);
567 }
568 if let Some(ref m) = model {
569 if !m.is_empty() {
570 extra_args.push("-m".to_string());
572 extra_args.push(m.clone());
573 }
574 }
575
576 let preset_command = resolve_preset_command(&preset);
577 let process = AcpProcess::spawn(
578 &preset_command,
579 &extra_args.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
580 &cwd,
581 ntx.clone(),
582 &preset.name,
583 &session_id,
584 )
585 .await?;
586
587 process
589 .initialize_with_timeout(options.initialize_timeout_ms)
590 .await?;
591
592 let agent_session_id = process.new_session(&cwd).await?;
594
595 (AgentProcessType::Acp(Arc::new(process)), agent_session_id)
596 };
597
598 self.register_managed_session(
599 session_id.clone(),
600 cwd.clone(),
601 workspace_id.clone(),
602 provider_name.to_string(),
603 role.clone(),
604 model.clone(),
605 parent_session_id.clone(),
606 &options,
607 process_type,
608 acp_session_id.clone(),
609 ntx.clone(),
610 )
611 .await;
612
613 tracing::info!(
614 "[AcpManager] Session {} created (provider: {}, agent session: {})",
615 session_id,
616 provider_name,
617 acp_session_id,
618 );
619
620 Ok((session_id, acp_session_id))
621 }
622
623 pub async fn prompt(&self, session_id: &str, text: &str) -> Result<serde_json::Value, String> {
625 self.mark_first_prompt_sent(session_id).await;
626
627 let (process, acp_session_id, preset_id, trace_writer) = {
628 let processes = self.processes.read().await;
629 let managed = processes
630 .get(session_id)
631 .ok_or_else(|| format!("No agent process for session: {}", session_id))?;
632 (
633 managed.process.clone(),
634 managed.acp_session_id.clone(),
635 managed.preset_id.clone(),
636 managed.trace_writer.clone(),
637 )
638 };
639
640 let is_alive = match &process {
641 AgentProcessType::Acp(p) => p.is_alive(),
642 AgentProcessType::Claude(p) => p.is_alive(),
643 };
644
645 if !is_alive {
646 return Err(format!("Agent ({}) process is not running", preset_id));
647 }
648
649 let trace = TraceRecord::new(
651 session_id,
652 TraceEventType::UserMessage,
653 Contributor::new(&preset_id, None),
654 )
655 .with_conversation(TraceConversation {
656 turn: None,
657 role: Some("user".to_string()),
658 content_preview: Some(truncate_content(text, 500)),
659 full_content: None,
660 });
661
662 trace_writer.append_safe(&trace).await;
663
664 tracing::info!(
665 target: "routa_acp_prompt",
666 session_id = %session_id,
667 preset_id = %preset_id,
668 acp_session_id = %acp_session_id,
669 prompt_len = text.len(),
670 "acp prompt start"
671 );
672
673 let result = match &process {
674 AgentProcessType::Acp(p) => p.prompt(&acp_session_id, text).await,
675 AgentProcessType::Claude(p) => {
676 let stop_reason = p.prompt(text).await?;
677 Ok(serde_json::json!({ "stopReason": stop_reason }))
678 }
679 };
680
681 match &result {
682 Ok(_) => tracing::info!(
683 target: "routa_acp_prompt",
684 session_id = %session_id,
685 preset_id = %preset_id,
686 "acp prompt success"
687 ),
688 Err(error) => tracing::error!(
689 target: "routa_acp_prompt",
690 session_id = %session_id,
691 preset_id = %preset_id,
692 error = %error,
693 "acp prompt failed"
694 ),
695 }
696
697 result
698 }
699
700 pub async fn cancel(&self, session_id: &str) {
702 let processes = self.processes.read().await;
703 if let Some(managed) = processes.get(session_id) {
704 match &managed.process {
705 AgentProcessType::Acp(p) => p.cancel(&managed.acp_session_id).await,
706 AgentProcessType::Claude(p) => p.cancel().await,
707 }
708 }
709 }
710
711 pub async fn kill_session(&self, session_id: &str) {
713 if let Some(managed) = self.processes.write().await.remove(session_id) {
715 let trace = TraceRecord::new(
717 session_id,
718 TraceEventType::SessionEnd,
719 Contributor::new(&managed.preset_id, None),
720 );
721 managed.trace_writer.append_safe(&trace).await;
722
723 match &managed.process {
724 AgentProcessType::Acp(p) => p.kill().await,
725 AgentProcessType::Claude(p) => p.kill().await,
726 }
727 }
728 self.sessions.write().await.remove(session_id);
730 self.notification_channels.write().await.remove(session_id);
732 }
733
734 pub async fn subscribe(
737 &self,
738 session_id: &str,
739 ) -> Option<broadcast::Receiver<serde_json::Value>> {
740 let channels = self.notification_channels.read().await;
741 channels.get(session_id).map(|tx| tx.subscribe())
742 }
743
744 pub async fn is_alive(&self, session_id: &str) -> bool {
746 let processes = self.processes.read().await;
747 processes
748 .get(session_id)
749 .map(|m| match &m.process {
750 AgentProcessType::Acp(p) => p.is_alive(),
751 AgentProcessType::Claude(p) => p.is_alive(),
752 })
753 .unwrap_or(false)
754 }
755
756 pub async fn get_preset_id(&self, session_id: &str) -> Option<String> {
758 let processes = self.processes.read().await;
759 processes.get(session_id).map(|m| m.preset_id.clone())
760 }
761
762 pub async fn is_claude_session(&self, session_id: &str) -> bool {
764 let processes = self.processes.read().await;
765 processes
766 .get(session_id)
767 .map(|m| matches!(&m.process, AgentProcessType::Claude(_)))
768 .unwrap_or(false)
769 }
770
771 pub async fn prompt_claude_async(&self, session_id: &str, text: &str) -> Result<(), String> {
775 let processes = self.processes.read().await;
776 let managed = processes
777 .get(session_id)
778 .ok_or_else(|| format!("No agent process for session: {}", session_id))?;
779
780 let trace = TraceRecord::new(
782 session_id,
783 TraceEventType::UserMessage,
784 Contributor::new(&managed.preset_id, None),
785 )
786 .with_conversation(TraceConversation {
787 turn: None,
788 role: Some("user".to_string()),
789 content_preview: Some(text[..text.len().min(200)].to_string()),
790 full_content: Some(text.to_string()),
791 });
792
793 managed.trace_writer.append_safe(&trace).await;
794
795 match &managed.process {
796 AgentProcessType::Claude(p) => {
797 let process = Arc::clone(p);
799 let text = text.to_string();
800 tokio::spawn(async move {
801 let _ = process.prompt(&text).await;
802 });
803 Ok(())
804 }
805 AgentProcessType::Acp(_) => {
806 Err("prompt_claude_async is only for Claude sessions".to_string())
807 }
808 }
809 }
810}
811
812#[derive(Debug, Clone, Serialize, Deserialize)]
816pub struct AcpPreset {
817 pub id: String,
819 pub name: String,
821 pub command: String,
822 pub args: Vec<String>,
823 pub description: String,
824 #[serde(default)]
825 #[serde(skip_serializing_if = "Option::is_none")]
826 pub env_bin_override: Option<String>,
827}
828
829pub fn get_presets() -> Vec<AcpPreset> {
831 vec![
832 AcpPreset {
833 id: "opencode".to_string(),
834 name: "OpenCode".to_string(),
835 command: "opencode".to_string(),
836 args: vec!["acp".to_string()],
837 description: "OpenCode AI coding agent".to_string(),
838 env_bin_override: Some("OPENCODE_BIN".to_string()),
839 },
840 AcpPreset {
841 id: "gemini".to_string(),
842 name: "Gemini".to_string(),
843 command: "gemini".to_string(),
844 args: vec!["--experimental-acp".to_string()],
845 description: "Google Gemini CLI".to_string(),
846 env_bin_override: None,
847 },
848 AcpPreset {
849 id: "codex-acp".to_string(),
850 name: "Codex".to_string(),
851 command: "codex-acp".to_string(),
852 args: vec![],
853 description: "OpenAI Codex CLI (codex-acp wrapper)".to_string(),
854 env_bin_override: Some("CODEX_ACP_BIN".to_string()),
855 },
856 AcpPreset {
857 id: "copilot".to_string(),
858 name: "GitHub Copilot".to_string(),
859 command: "copilot".to_string(),
860 args: vec![
861 "--acp".to_string(),
862 "--allow-all-tools".to_string(),
863 "--no-ask-user".to_string(),
864 ],
865 description: "GitHub Copilot CLI".to_string(),
866 env_bin_override: Some("COPILOT_BIN".to_string()),
867 },
868 AcpPreset {
869 id: "auggie".to_string(),
870 name: "Auggie".to_string(),
871 command: "auggie".to_string(),
872 args: vec!["--acp".to_string()],
873 description: "Augment Code's AI agent".to_string(),
874 env_bin_override: None,
875 },
876 AcpPreset {
877 id: "kimi".to_string(),
878 name: "Kimi".to_string(),
879 command: "kimi".to_string(),
880 args: vec!["acp".to_string()],
881 description: "Moonshot AI's Kimi CLI".to_string(),
882 env_bin_override: None,
883 },
884 AcpPreset {
885 id: "kiro".to_string(),
886 name: "Kiro".to_string(),
887 command: "kiro-cli".to_string(),
888 args: vec!["acp".to_string()],
889 description: "Amazon Kiro AI coding agent".to_string(),
890 env_bin_override: Some("KIRO_BIN".to_string()),
891 },
892 AcpPreset {
893 id: "qoder".to_string(),
894 name: "Qoder".to_string(),
895 command: "qodercli".to_string(),
896 args: vec!["--acp".to_string()],
897 description: "Qoder AI coding agent".to_string(),
898 env_bin_override: Some("QODER_BIN".to_string()),
899 },
900 AcpPreset {
901 id: "claude".to_string(),
902 name: "Claude Code".to_string(),
903 command: "claude".to_string(),
904 args: vec![],
907 description: "Anthropic Claude Code (stream-json protocol)".to_string(),
908 env_bin_override: Some("CLAUDE_BIN".to_string()),
909 },
910 ]
911}
912
913pub async fn get_preset_by_id_with_registry(id: &str) -> Result<AcpPreset, String> {
919 let normalized_id = match id {
920 "codex" => "codex-acp",
921 "qodercli" => "qoder",
922 other => other,
923 };
924
925 const REGISTRY_SUFFIX: &str = "-registry";
928 if let Some(base_id) = normalized_id.strip_suffix(REGISTRY_SUFFIX) {
929 let mut preset = get_registry_preset(base_id).await?;
930 preset.id = id.to_string();
932 return Ok(preset);
933 }
934
935 if let Some(mut preset) = get_presets().into_iter().find(|p| p.id == normalized_id) {
937 if preset.id != id {
938 preset.id = id.to_string();
939 }
940 return Ok(preset);
941 }
942
943 let mut preset = get_registry_preset(normalized_id).await?;
945 if preset.id != id {
946 preset.id = id.to_string();
947 }
948 Ok(preset)
949}
950
951async fn get_registry_preset(id: &str) -> Result<AcpPreset, String> {
953 let registry: AcpRegistry = fetch_registry().await?;
954
955 let agent = registry
957 .agents
958 .into_iter()
959 .find(|a| a.id == id)
960 .ok_or_else(|| format!("Agent '{}' not found in registry", id))?;
961
962 let (command, args) = if let Some(ref npx) = agent.distribution.npx {
964 let mut args = vec!["-y".to_string(), npx.package.clone()];
965 args.extend(npx.args.clone());
966 ("npx".to_string(), args)
967 } else if let Some(ref uvx) = agent.distribution.uvx {
968 let mut args = vec![uvx.package.clone()];
969 args.extend(uvx.args.clone());
970 ("uvx".to_string(), args)
971 } else {
972 return Err(format!(
973 "Agent '{}' has no supported distribution (npx/uvx)",
974 id
975 ));
976 };
977
978 Ok(AcpPreset {
979 id: agent.id.clone(),
980 name: agent.name,
981 command,
982 args,
983 description: agent.description,
984 env_bin_override: None,
985 })
986}
987
988fn resolve_preset_command(preset: &AcpPreset) -> String {
989 if let Some(env_var) = &preset.env_bin_override {
990 if let Ok(custom_command) = std::env::var(env_var) {
991 let trimmed = custom_command.trim();
992 if !trimmed.is_empty() {
993 return trimmed.to_string();
994 }
995 }
996 }
997
998 crate::shell_env::which(&preset.command).unwrap_or_else(|| preset.command.clone())
999}
1000
1001fn truncate_content(text: &str, max_len: usize) -> String {
1005 if text.chars().count() <= max_len {
1006 text.to_string()
1007 } else if max_len <= 3 {
1008 text.chars().take(max_len).collect()
1009 } else {
1010 let truncated: String = text.chars().take(max_len - 3).collect();
1011 format!("{truncated}...")
1012 }
1013}
1014
1015#[cfg(test)]
1016mod tests {
1017 use super::{
1018 get_preset_by_id_with_registry, get_presets, truncate_content, AcpManager, AcpSessionRecord,
1019 };
1020 use std::collections::HashMap;
1021 use std::sync::Arc;
1022 use tokio::sync::RwLock;
1023
1024 #[test]
1025 fn static_presets_include_codex_acp_for_codex_alias() {
1026 let presets = get_presets();
1027 assert!(presets.iter().any(|preset| preset.id == "codex-acp"));
1028 }
1029
1030 #[test]
1031 fn static_presets_include_qoder() {
1032 let presets = get_presets();
1033 assert!(presets.iter().any(|preset| preset.id == "qoder"));
1034 }
1035
1036 #[tokio::test]
1037 async fn qodercli_alias_resolves_to_qoder_preset() {
1038 let preset = get_preset_by_id_with_registry("qodercli")
1039 .await
1040 .expect("qodercli alias should resolve");
1041 assert_eq!(preset.id, "qodercli");
1042 assert_eq!(preset.command, "qodercli");
1043 assert_eq!(preset.args, vec!["--acp".to_string()]);
1044 }
1045
1046 #[tokio::test]
1047 async fn mark_first_prompt_sent_updates_live_session_record() {
1048 let manager = AcpManager::new();
1049 let session_id = "session-1".to_string();
1050 manager.sessions.write().await.insert(
1051 session_id.clone(),
1052 AcpSessionRecord {
1053 session_id: session_id.clone(),
1054 name: None,
1055 cwd: ".".to_string(),
1056 workspace_id: "default".to_string(),
1057 routa_agent_id: None,
1058 provider: Some("opencode".to_string()),
1059 role: Some("CRAFTER".to_string()),
1060 mode_id: None,
1061 model: None,
1062 created_at: chrono::Utc::now().to_rfc3339(),
1063 first_prompt_sent: false,
1064 parent_session_id: None,
1065 specialist_id: None,
1066 specialist_system_prompt: None,
1067 },
1068 );
1069
1070 manager.mark_first_prompt_sent(&session_id).await;
1071
1072 let session = manager.get_session(&session_id).await.expect("session");
1073 assert!(session.first_prompt_sent);
1074 }
1075
1076 #[tokio::test]
1077 async fn push_to_history_skips_parent_child_forwarding_noise() {
1078 let manager = AcpManager {
1079 sessions: Arc::new(RwLock::new(HashMap::new())),
1080 processes: Arc::new(RwLock::new(HashMap::new())),
1081 notification_channels: Arc::new(RwLock::new(HashMap::new())),
1082 history: Arc::new(RwLock::new(HashMap::new())),
1083 };
1084
1085 manager
1086 .push_to_history(
1087 "parent",
1088 serde_json::json!({
1089 "sessionId": "parent",
1090 "childAgentId": "child-1",
1091 "update": { "sessionUpdate": "agent_message", "content": { "type": "text", "text": "delegated" } }
1092 }),
1093 )
1094 .await;
1095
1096 let history = manager
1097 .get_session_history("parent")
1098 .await
1099 .unwrap_or_default();
1100 assert!(history.is_empty());
1101 }
1102
1103 #[tokio::test]
1104 async fn emit_session_update_broadcasts_when_channel_exists() {
1105 let (tx, mut rx) = tokio::sync::broadcast::channel(8);
1106 let manager = AcpManager {
1107 sessions: Arc::new(RwLock::new(HashMap::new())),
1108 processes: Arc::new(RwLock::new(HashMap::new())),
1109 notification_channels: Arc::new(RwLock::new(HashMap::from([(
1110 "session-1".to_string(),
1111 tx,
1112 )]))),
1113 history: Arc::new(RwLock::new(HashMap::new())),
1114 };
1115
1116 manager
1117 .emit_session_update(
1118 "session-1",
1119 serde_json::json!({
1120 "sessionUpdate": "turn_complete",
1121 "stopReason": "cancelled"
1122 }),
1123 )
1124 .await
1125 .expect("emit should succeed");
1126
1127 let broadcast = rx.recv().await.expect("broadcast event");
1128 assert_eq!(
1129 broadcast["params"]["update"]["sessionUpdate"].as_str(),
1130 Some("turn_complete")
1131 );
1132 assert_eq!(
1133 broadcast["params"]["update"]["stopReason"].as_str(),
1134 Some("cancelled")
1135 );
1136 }
1137
1138 #[tokio::test]
1139 async fn emit_session_update_persists_history_without_channel() {
1140 let manager = AcpManager {
1141 sessions: Arc::new(RwLock::new(HashMap::new())),
1142 processes: Arc::new(RwLock::new(HashMap::new())),
1143 notification_channels: Arc::new(RwLock::new(HashMap::new())),
1144 history: Arc::new(RwLock::new(HashMap::new())),
1145 };
1146
1147 manager
1148 .emit_session_update(
1149 "session-1",
1150 serde_json::json!({
1151 "sessionUpdate": "turn_complete",
1152 "stopReason": "cancelled"
1153 }),
1154 )
1155 .await
1156 .expect("emit should succeed");
1157
1158 let history = manager
1159 .get_session_history("session-1")
1160 .await
1161 .expect("history should exist");
1162 assert_eq!(history.len(), 1);
1163 assert_eq!(
1164 history[0]["update"]["sessionUpdate"].as_str(),
1165 Some("turn_complete")
1166 );
1167 }
1168
1169 #[test]
1170 fn rewrite_notification_session_id_overrides_provider_session_id() {
1171 let rewritten = AcpManager::rewrite_notification_session_id(
1172 "child-session",
1173 serde_json::json!({
1174 "sessionId": "provider-session",
1175 "update": { "sessionUpdate": "agent_message_chunk", "content": { "text": "hi" } }
1176 }),
1177 );
1178
1179 assert_eq!(rewritten["sessionId"].as_str(), Some("child-session"));
1180 }
1181
1182 #[test]
1183 fn truncate_content_handles_unicode_boundaries() {
1184 assert_eq!(truncate_content("你好世界ABC", 5), "你好...");
1185 assert_eq!(truncate_content("你好世界ABC", 3), "你好世");
1186 assert_eq!(truncate_content("短文本", 10), "短文本");
1187 }
1188}