1pub mod binary_manager;
20pub mod claude_code_process;
21pub mod docker;
22pub mod installation_state;
23pub mod mcp_setup;
24pub mod paths;
25pub mod process;
26pub mod provider_adapter;
27pub mod registry_fetch;
28pub mod registry_types;
29pub mod runtime_manager;
30pub mod terminal_manager;
31pub mod warmup;
32
33pub use binary_manager::AcpBinaryManager;
34pub use claude_code_process::{ClaudeCodeConfig, ClaudeCodeProcess};
35pub use installation_state::AcpInstallationState;
36pub use paths::AcpPaths;
37pub use registry_fetch::{fetch_registry, fetch_registry_json};
38pub use registry_types::*;
39pub use runtime_manager::{current_platform, AcpRuntimeManager, RuntimeInfo, RuntimeType};
40pub use warmup::{AcpWarmupService, WarmupState, WarmupStatus};
41
42use std::collections::HashMap;
43use std::sync::Arc;
44
45use serde::{Deserialize, Serialize};
46use tokio::sync::{broadcast, RwLock};
47
48use crate::trace::{Contributor, TraceConversation, TraceEventType, TraceRecord, TraceWriter};
49use process::AcpProcess;
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
55#[serde(rename_all = "camelCase")]
56pub struct AcpSessionRecord {
57 pub session_id: String,
58 #[serde(skip_serializing_if = "Option::is_none")]
59 pub name: Option<String>,
60 pub cwd: String,
61 pub workspace_id: String,
62 #[serde(skip_serializing_if = "Option::is_none")]
63 pub routa_agent_id: Option<String>,
64 pub provider: Option<String>,
65 pub role: Option<String>,
66 pub mode_id: Option<String>,
67 #[serde(skip_serializing_if = "Option::is_none")]
68 pub model: Option<String>,
69 pub created_at: String,
70 #[serde(default)]
71 pub first_prompt_sent: bool,
72 #[serde(skip_serializing_if = "Option::is_none")]
74 pub parent_session_id: Option<String>,
75 #[serde(skip_serializing_if = "Option::is_none")]
76 pub specialist_id: Option<String>,
77 #[serde(skip_serializing_if = "Option::is_none")]
78 pub specialist_system_prompt: Option<String>,
79}
80
81#[derive(Debug, Clone, Default)]
82pub struct SessionLaunchOptions {
83 pub specialist_id: Option<String>,
84 pub specialist_system_prompt: Option<String>,
85 pub allowed_native_tools: Option<Vec<String>>,
86 pub initialize_timeout_ms: Option<u64>,
87}
88
89#[derive(Clone)]
93enum AgentProcessType {
94 Acp(Arc<AcpProcess>),
96 Claude(Arc<ClaudeCodeProcess>),
98}
99
100impl AgentProcessType {
101 async fn kill(&self) {
103 match self {
104 AgentProcessType::Acp(process) => process.kill().await,
105 AgentProcessType::Claude(process) => process.kill().await,
106 }
107 }
108}
109
110struct ManagedProcess {
112 process: AgentProcessType,
113 acp_session_id: String,
115 preset_id: String,
116 #[allow(dead_code)]
117 created_at: String,
118 trace_writer: TraceWriter,
120 #[allow(dead_code)]
122 cwd: String,
123}
124
125#[derive(Clone)]
132pub struct AcpManager {
133 sessions: Arc<RwLock<HashMap<String, AcpSessionRecord>>>,
135 processes: Arc<RwLock<HashMap<String, ManagedProcess>>>,
137 notification_channels: Arc<RwLock<HashMap<String, broadcast::Sender<serde_json::Value>>>>,
139 history: Arc<RwLock<HashMap<String, Vec<serde_json::Value>>>>,
141}
142
143impl Default for AcpManager {
144 fn default() -> Self {
145 Self::new()
146 }
147}
148
149impl AcpManager {
150 pub fn rewrite_notification_session_id(
151 session_id: &str,
152 mut notification: serde_json::Value,
153 ) -> serde_json::Value {
154 if let Some(object) = notification.as_object_mut() {
155 object.insert(
156 "sessionId".to_string(),
157 serde_json::Value::String(session_id.to_string()),
158 );
159 }
160 notification
161 }
162
163 pub fn new() -> Self {
164 Self {
165 sessions: Arc::new(RwLock::new(HashMap::new())),
166 processes: Arc::new(RwLock::new(HashMap::new())),
167 notification_channels: Arc::new(RwLock::new(HashMap::new())),
168 history: Arc::new(RwLock::new(HashMap::new())),
169 }
170 }
171
172 pub async fn list_sessions(&self) -> Vec<AcpSessionRecord> {
174 let sessions = self.sessions.read().await;
175 sessions.values().cloned().collect()
176 }
177
178 pub async fn get_session(&self, session_id: &str) -> Option<AcpSessionRecord> {
180 let sessions = self.sessions.read().await;
181 sessions.get(session_id).cloned()
182 }
183
184 pub async fn rename_session(&self, session_id: &str, name: &str) -> Option<()> {
187 let mut sessions = self.sessions.write().await;
188 let session = sessions.get_mut(session_id)?;
189 session.name = Some(name.to_string());
190 Some(())
191 }
192
193 pub async fn set_routa_agent_id(&self, session_id: &str, routa_agent_id: &str) -> Option<()> {
196 let mut sessions = self.sessions.write().await;
197 let session = sessions.get_mut(session_id)?;
198 session.routa_agent_id = Some(routa_agent_id.to_string());
199 Some(())
200 }
201
202 pub async fn delete_session(&self, session_id: &str) -> Option<()> {
205 let mut sessions = self.sessions.write().await;
206 let mut processes = self.processes.write().await;
207 let mut channels = self.notification_channels.write().await;
208 let mut history = self.history.write().await;
209
210 sessions.remove(session_id)?;
212
213 if let Some(managed) = processes.remove(session_id) {
215 let _ = managed.process.kill().await;
216 }
217
218 channels.remove(session_id);
220
221 history.remove(session_id);
223
224 Some(())
225 }
226
227 pub async fn get_session_history(&self, session_id: &str) -> Option<Vec<serde_json::Value>> {
230 let history = self.history.read().await;
231 history.get(session_id).cloned()
232 }
233
234 pub async fn push_to_history(&self, session_id: &str, notification: serde_json::Value) {
239 if notification.get("childAgentId").is_some() {
241 return;
242 }
243 let mut history = self.history.write().await;
244 let entries = history
245 .entry(session_id.to_string())
246 .or_insert_with(Vec::new);
247 entries.push(notification);
248 if entries.len() > 500 {
250 let drain_count = entries.len() - 500;
251 entries.drain(0..drain_count);
252 }
253 }
254
255 pub async fn emit_session_update(
257 &self,
258 session_id: &str,
259 update: serde_json::Value,
260 ) -> Result<(), String> {
261 let message = serde_json::json!({
262 "jsonrpc": "2.0",
263 "method": "session/update",
264 "params": {
265 "sessionId": session_id,
266 "update": update,
267 }
268 });
269
270 if let Some(channel) = self
271 .notification_channels
272 .read()
273 .await
274 .get(session_id)
275 .cloned()
276 {
277 let _ = channel.send(message.clone());
278 } else {
279 let params = message
280 .get("params")
281 .cloned()
282 .ok_or_else(|| "Missing params in synthetic session/update".to_string())?;
283 self.push_to_history(
284 session_id,
285 Self::rewrite_notification_session_id(session_id, params),
286 )
287 .await;
288 }
289 Ok(())
290 }
291
292 pub async fn mark_first_prompt_sent(&self, session_id: &str) {
294 let mut sessions = self.sessions.write().await;
295 if let Some(session) = sessions.get_mut(session_id) {
296 session.first_prompt_sent = true;
297 }
298 }
299
300 #[allow(clippy::too_many_arguments)]
306 pub async fn create_session(
307 &self,
308 session_id: String,
309 cwd: String,
310 workspace_id: String,
311 provider: Option<String>,
312 role: Option<String>,
313 model: Option<String>,
314 parent_session_id: Option<String>,
315 tool_mode: Option<String>,
316 mcp_profile: Option<String>,
317 ) -> Result<(String, String), String> {
318 self.create_session_with_options(
319 session_id,
320 cwd,
321 workspace_id,
322 provider,
323 role,
324 model,
325 parent_session_id,
326 tool_mode,
327 mcp_profile,
328 SessionLaunchOptions::default(),
329 )
330 .await
331 }
332
333 #[allow(clippy::too_many_arguments)]
334 pub async fn create_session_with_options(
335 &self,
336 session_id: String,
337 cwd: String,
338 workspace_id: String,
339 provider: Option<String>,
340 role: Option<String>,
341 model: Option<String>,
342 parent_session_id: Option<String>,
343 tool_mode: Option<String>,
344 mcp_profile: Option<String>,
345 options: SessionLaunchOptions,
346 ) -> Result<(String, String), String> {
347 let provider_name = provider.as_deref().unwrap_or("opencode");
348
349 let (ntx, _) = broadcast::channel::<serde_json::Value>(256);
351 let claude_mcp_config = if provider_name == "claude" {
352 Some(mcp_setup::build_claude_mcp_config(
353 &workspace_id,
354 &session_id,
355 tool_mode.as_deref(),
356 mcp_profile.as_deref(),
357 ))
358 } else {
359 None
360 };
361
362 let (process_type, acp_session_id) = if provider_name == "claude" {
364 let config = ClaudeCodeConfig {
366 command: "claude".to_string(),
367 cwd: cwd.clone(),
368 display_name: format!("Claude-{}", &session_id[..8.min(session_id.len())]),
369 permission_mode: Some("bypassPermissions".to_string()),
370 mcp_configs: claude_mcp_config.into_iter().collect(),
371 append_system_prompt: options.specialist_system_prompt.clone(),
372 allowed_tools: options.allowed_native_tools.clone(),
373 };
374
375 let claude_process = ClaudeCodeProcess::spawn(config, ntx.clone()).await?;
376 let claude_session_id = claude_process
377 .session_id()
378 .await
379 .unwrap_or_else(|| format!("claude-{}", &session_id[..8.min(session_id.len())]));
380
381 (
382 AgentProcessType::Claude(Arc::new(claude_process)),
383 claude_session_id,
384 )
385 } else {
386 let preset = get_preset_by_id_with_registry(provider_name).await?;
388
389 if let Some(summary) = mcp_setup::ensure_mcp_for_provider(
390 provider_name,
391 &workspace_id,
392 &session_id,
393 tool_mode.as_deref(),
394 mcp_profile.as_deref(),
395 )
396 .await?
397 {
398 tracing::info!("[AcpManager] {}", summary);
399 }
400
401 let mut extra_args: Vec<String> = preset.args.clone();
403 if let Some(ref m) = model {
404 if !m.is_empty() {
405 extra_args.push("-m".to_string());
407 extra_args.push(m.clone());
408 }
409 }
410
411 let preset_command = resolve_preset_command(&preset);
412 let process = AcpProcess::spawn(
413 &preset_command,
414 &extra_args.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
415 &cwd,
416 ntx.clone(),
417 &preset.name,
418 &session_id,
419 )
420 .await?;
421
422 process
424 .initialize_with_timeout(options.initialize_timeout_ms)
425 .await?;
426
427 let agent_session_id = process.new_session(&cwd).await?;
429
430 (AgentProcessType::Acp(Arc::new(process)), agent_session_id)
431 };
432
433 let trace_writer = TraceWriter::new(&cwd);
435
436 let record = AcpSessionRecord {
438 session_id: session_id.clone(),
439 name: None,
440 cwd: cwd.clone(),
441 workspace_id: workspace_id.clone(),
442 routa_agent_id: None,
443 provider: Some(provider_name.to_string()),
444 role: role.clone().or(Some("CRAFTER".to_string())),
445 mode_id: None,
446 model: model.clone(),
447 created_at: chrono::Utc::now().to_rfc3339(),
448 first_prompt_sent: false,
449 parent_session_id: parent_session_id.clone(),
450 specialist_id: options.specialist_id.clone(),
451 specialist_system_prompt: options.specialist_system_prompt.clone(),
452 };
453
454 self.sessions
455 .write()
456 .await
457 .insert(session_id.clone(), record);
458
459 self.processes.write().await.insert(
460 session_id.clone(),
461 ManagedProcess {
462 process: process_type,
463 acp_session_id: acp_session_id.clone(),
464 preset_id: provider_name.to_string(),
465 created_at: chrono::Utc::now().to_rfc3339(),
466 trace_writer: trace_writer.clone(),
467 cwd: cwd.clone(),
468 },
469 );
470
471 self.notification_channels
472 .write()
473 .await
474 .insert(session_id.clone(), ntx.clone());
475
476 let history_manager = self.clone();
479 let history_session_id = session_id.clone();
480 let mut history_rx = ntx.subscribe();
481 tokio::spawn(async move {
482 loop {
483 match history_rx.recv().await {
484 Ok(message) => {
485 let params = match message.get("params") {
486 Some(value) => value.clone(),
487 None => continue,
488 };
489 history_manager
490 .push_to_history(
491 &history_session_id,
492 Self::rewrite_notification_session_id(&history_session_id, params),
493 )
494 .await;
495 }
496 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
497 Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
498 tracing::warn!(
499 "[AcpManager] Dropped {} session/update notifications for {}",
500 skipped,
501 history_session_id
502 );
503 }
504 }
505 }
506 });
507
508 let trace = TraceRecord::new(
510 &session_id,
511 TraceEventType::SessionStart,
512 Contributor::new(provider_name, None),
513 )
514 .with_workspace_id(&workspace_id)
515 .with_metadata(
516 "role",
517 serde_json::json!(role.as_deref().unwrap_or("CRAFTER")),
518 )
519 .with_metadata("cwd", serde_json::json!(cwd));
520
521 trace_writer.append_safe(&trace).await;
522
523 tracing::info!(
524 "[AcpManager] Session {} created (provider: {}, agent session: {})",
525 session_id,
526 provider_name,
527 acp_session_id,
528 );
529
530 Ok((session_id, acp_session_id))
531 }
532
533 pub async fn prompt(&self, session_id: &str, text: &str) -> Result<serde_json::Value, String> {
535 self.mark_first_prompt_sent(session_id).await;
536
537 let (process, acp_session_id, preset_id, trace_writer) = {
538 let processes = self.processes.read().await;
539 let managed = processes
540 .get(session_id)
541 .ok_or_else(|| format!("No agent process for session: {}", session_id))?;
542 (
543 managed.process.clone(),
544 managed.acp_session_id.clone(),
545 managed.preset_id.clone(),
546 managed.trace_writer.clone(),
547 )
548 };
549
550 let is_alive = match &process {
551 AgentProcessType::Acp(p) => p.is_alive(),
552 AgentProcessType::Claude(p) => p.is_alive(),
553 };
554
555 if !is_alive {
556 return Err(format!("Agent ({}) process is not running", preset_id));
557 }
558
559 let trace = TraceRecord::new(
561 session_id,
562 TraceEventType::UserMessage,
563 Contributor::new(&preset_id, None),
564 )
565 .with_conversation(TraceConversation {
566 turn: None,
567 role: Some("user".to_string()),
568 content_preview: Some(truncate_content(text, 500)),
569 full_content: None,
570 });
571
572 trace_writer.append_safe(&trace).await;
573
574 tracing::info!(
575 target: "routa_acp_prompt",
576 session_id = %session_id,
577 preset_id = %preset_id,
578 acp_session_id = %acp_session_id,
579 prompt_len = text.len(),
580 "acp prompt start"
581 );
582
583 let result = match &process {
584 AgentProcessType::Acp(p) => p.prompt(&acp_session_id, text).await,
585 AgentProcessType::Claude(p) => {
586 let stop_reason = p.prompt(text).await?;
587 Ok(serde_json::json!({ "stopReason": stop_reason }))
588 }
589 };
590
591 match &result {
592 Ok(_) => tracing::info!(
593 target: "routa_acp_prompt",
594 session_id = %session_id,
595 preset_id = %preset_id,
596 "acp prompt success"
597 ),
598 Err(error) => tracing::error!(
599 target: "routa_acp_prompt",
600 session_id = %session_id,
601 preset_id = %preset_id,
602 error = %error,
603 "acp prompt failed"
604 ),
605 }
606
607 result
608 }
609
610 pub async fn cancel(&self, session_id: &str) {
612 let processes = self.processes.read().await;
613 if let Some(managed) = processes.get(session_id) {
614 match &managed.process {
615 AgentProcessType::Acp(p) => p.cancel(&managed.acp_session_id).await,
616 AgentProcessType::Claude(p) => p.cancel().await,
617 }
618 }
619 }
620
621 pub async fn kill_session(&self, session_id: &str) {
623 if let Some(managed) = self.processes.write().await.remove(session_id) {
625 let trace = TraceRecord::new(
627 session_id,
628 TraceEventType::SessionEnd,
629 Contributor::new(&managed.preset_id, None),
630 );
631 managed.trace_writer.append_safe(&trace).await;
632
633 match &managed.process {
634 AgentProcessType::Acp(p) => p.kill().await,
635 AgentProcessType::Claude(p) => p.kill().await,
636 }
637 }
638 self.sessions.write().await.remove(session_id);
640 self.notification_channels.write().await.remove(session_id);
642 }
643
644 pub async fn subscribe(
647 &self,
648 session_id: &str,
649 ) -> Option<broadcast::Receiver<serde_json::Value>> {
650 let channels = self.notification_channels.read().await;
651 channels.get(session_id).map(|tx| tx.subscribe())
652 }
653
654 pub async fn is_alive(&self, session_id: &str) -> bool {
656 let processes = self.processes.read().await;
657 processes
658 .get(session_id)
659 .map(|m| match &m.process {
660 AgentProcessType::Acp(p) => p.is_alive(),
661 AgentProcessType::Claude(p) => p.is_alive(),
662 })
663 .unwrap_or(false)
664 }
665
666 pub async fn get_preset_id(&self, session_id: &str) -> Option<String> {
668 let processes = self.processes.read().await;
669 processes.get(session_id).map(|m| m.preset_id.clone())
670 }
671
672 pub async fn is_claude_session(&self, session_id: &str) -> bool {
674 let processes = self.processes.read().await;
675 processes
676 .get(session_id)
677 .map(|m| matches!(&m.process, AgentProcessType::Claude(_)))
678 .unwrap_or(false)
679 }
680
681 pub async fn prompt_claude_async(&self, session_id: &str, text: &str) -> Result<(), String> {
685 let processes = self.processes.read().await;
686 let managed = processes
687 .get(session_id)
688 .ok_or_else(|| format!("No agent process for session: {}", session_id))?;
689
690 let trace = TraceRecord::new(
692 session_id,
693 TraceEventType::UserMessage,
694 Contributor::new(&managed.preset_id, None),
695 )
696 .with_conversation(TraceConversation {
697 turn: None,
698 role: Some("user".to_string()),
699 content_preview: Some(text[..text.len().min(200)].to_string()),
700 full_content: Some(text.to_string()),
701 });
702
703 managed.trace_writer.append_safe(&trace).await;
704
705 match &managed.process {
706 AgentProcessType::Claude(p) => {
707 let process = Arc::clone(p);
709 let text = text.to_string();
710 tokio::spawn(async move {
711 let _ = process.prompt(&text).await;
712 });
713 Ok(())
714 }
715 AgentProcessType::Acp(_) => {
716 Err("prompt_claude_async is only for Claude sessions".to_string())
717 }
718 }
719 }
720}
721
722#[derive(Debug, Clone, Serialize, Deserialize)]
726pub struct AcpPreset {
727 pub id: String,
729 pub name: String,
731 pub command: String,
732 pub args: Vec<String>,
733 pub description: String,
734 #[serde(default)]
735 #[serde(skip_serializing_if = "Option::is_none")]
736 pub env_bin_override: Option<String>,
737}
738
739pub fn get_presets() -> Vec<AcpPreset> {
741 vec![
742 AcpPreset {
743 id: "opencode".to_string(),
744 name: "OpenCode".to_string(),
745 command: "opencode".to_string(),
746 args: vec!["acp".to_string()],
747 description: "OpenCode AI coding agent".to_string(),
748 env_bin_override: Some("OPENCODE_BIN".to_string()),
749 },
750 AcpPreset {
751 id: "gemini".to_string(),
752 name: "Gemini".to_string(),
753 command: "gemini".to_string(),
754 args: vec!["--experimental-acp".to_string()],
755 description: "Google Gemini CLI".to_string(),
756 env_bin_override: None,
757 },
758 AcpPreset {
759 id: "codex-acp".to_string(),
760 name: "Codex".to_string(),
761 command: "codex-acp".to_string(),
762 args: vec![],
763 description: "OpenAI Codex CLI (codex-acp wrapper)".to_string(),
764 env_bin_override: Some("CODEX_ACP_BIN".to_string()),
765 },
766 AcpPreset {
767 id: "copilot".to_string(),
768 name: "GitHub Copilot".to_string(),
769 command: "copilot".to_string(),
770 args: vec![
771 "--acp".to_string(),
772 "--allow-all-tools".to_string(),
773 "--no-ask-user".to_string(),
774 ],
775 description: "GitHub Copilot CLI".to_string(),
776 env_bin_override: Some("COPILOT_BIN".to_string()),
777 },
778 AcpPreset {
779 id: "auggie".to_string(),
780 name: "Auggie".to_string(),
781 command: "auggie".to_string(),
782 args: vec!["--acp".to_string()],
783 description: "Augment Code's AI agent".to_string(),
784 env_bin_override: None,
785 },
786 AcpPreset {
787 id: "kimi".to_string(),
788 name: "Kimi".to_string(),
789 command: "kimi".to_string(),
790 args: vec!["acp".to_string()],
791 description: "Moonshot AI's Kimi CLI".to_string(),
792 env_bin_override: None,
793 },
794 AcpPreset {
795 id: "kiro".to_string(),
796 name: "Kiro".to_string(),
797 command: "kiro-cli".to_string(),
798 args: vec!["acp".to_string()],
799 description: "Amazon Kiro AI coding agent".to_string(),
800 env_bin_override: Some("KIRO_BIN".to_string()),
801 },
802 AcpPreset {
803 id: "claude".to_string(),
804 name: "Claude Code".to_string(),
805 command: "claude".to_string(),
806 args: vec![],
809 description: "Anthropic Claude Code (stream-json protocol)".to_string(),
810 env_bin_override: Some("CLAUDE_BIN".to_string()),
811 },
812 ]
813}
814
815pub async fn get_preset_by_id_with_registry(id: &str) -> Result<AcpPreset, String> {
821 let normalized_id = match id {
822 "codex" => "codex-acp",
823 other => other,
824 };
825
826 const REGISTRY_SUFFIX: &str = "-registry";
829 if let Some(base_id) = normalized_id.strip_suffix(REGISTRY_SUFFIX) {
830 let mut preset = get_registry_preset(base_id).await?;
831 preset.id = id.to_string();
833 return Ok(preset);
834 }
835
836 if let Some(mut preset) = get_presets().into_iter().find(|p| p.id == normalized_id) {
838 if preset.id != id {
839 preset.id = id.to_string();
840 }
841 return Ok(preset);
842 }
843
844 let mut preset = get_registry_preset(normalized_id).await?;
846 if preset.id != id {
847 preset.id = id.to_string();
848 }
849 Ok(preset)
850}
851
852async fn get_registry_preset(id: &str) -> Result<AcpPreset, String> {
854 let registry: AcpRegistry = fetch_registry().await?;
855
856 let agent = registry
858 .agents
859 .into_iter()
860 .find(|a| a.id == id)
861 .ok_or_else(|| format!("Agent '{}' not found in registry", id))?;
862
863 let (command, args) = if let Some(ref npx) = agent.distribution.npx {
865 let mut args = vec!["-y".to_string(), npx.package.clone()];
866 args.extend(npx.args.clone());
867 ("npx".to_string(), args)
868 } else if let Some(ref uvx) = agent.distribution.uvx {
869 let mut args = vec![uvx.package.clone()];
870 args.extend(uvx.args.clone());
871 ("uvx".to_string(), args)
872 } else {
873 return Err(format!(
874 "Agent '{}' has no supported distribution (npx/uvx)",
875 id
876 ));
877 };
878
879 Ok(AcpPreset {
880 id: agent.id.clone(),
881 name: agent.name,
882 command,
883 args,
884 description: agent.description,
885 env_bin_override: None,
886 })
887}
888
889fn resolve_preset_command(preset: &AcpPreset) -> String {
890 if let Some(env_var) = &preset.env_bin_override {
891 if let Ok(custom_command) = std::env::var(env_var) {
892 let trimmed = custom_command.trim();
893 if !trimmed.is_empty() {
894 return trimmed.to_string();
895 }
896 }
897 }
898
899 crate::shell_env::which(&preset.command).unwrap_or_else(|| preset.command.clone())
900}
901
902fn truncate_content(text: &str, max_len: usize) -> String {
906 if text.chars().count() <= max_len {
907 text.to_string()
908 } else if max_len <= 3 {
909 text.chars().take(max_len).collect()
910 } else {
911 let truncated: String = text.chars().take(max_len - 3).collect();
912 format!("{truncated}...")
913 }
914}
915
916#[cfg(test)]
917mod tests {
918 use super::{get_presets, truncate_content, AcpManager, AcpSessionRecord};
919 use std::collections::HashMap;
920 use std::sync::Arc;
921 use tokio::sync::RwLock;
922
923 #[test]
924 fn static_presets_include_codex_acp_for_codex_alias() {
925 let presets = get_presets();
926 assert!(presets.iter().any(|preset| preset.id == "codex-acp"));
927 }
928
929 #[tokio::test]
930 async fn mark_first_prompt_sent_updates_live_session_record() {
931 let manager = AcpManager::new();
932 let session_id = "session-1".to_string();
933 manager.sessions.write().await.insert(
934 session_id.clone(),
935 AcpSessionRecord {
936 session_id: session_id.clone(),
937 name: None,
938 cwd: ".".to_string(),
939 workspace_id: "default".to_string(),
940 routa_agent_id: None,
941 provider: Some("opencode".to_string()),
942 role: Some("CRAFTER".to_string()),
943 mode_id: None,
944 model: None,
945 created_at: chrono::Utc::now().to_rfc3339(),
946 first_prompt_sent: false,
947 parent_session_id: None,
948 specialist_id: None,
949 specialist_system_prompt: None,
950 },
951 );
952
953 manager.mark_first_prompt_sent(&session_id).await;
954
955 let session = manager.get_session(&session_id).await.expect("session");
956 assert!(session.first_prompt_sent);
957 }
958
959 #[tokio::test]
960 async fn push_to_history_skips_parent_child_forwarding_noise() {
961 let manager = AcpManager {
962 sessions: Arc::new(RwLock::new(HashMap::new())),
963 processes: Arc::new(RwLock::new(HashMap::new())),
964 notification_channels: Arc::new(RwLock::new(HashMap::new())),
965 history: Arc::new(RwLock::new(HashMap::new())),
966 };
967
968 manager
969 .push_to_history(
970 "parent",
971 serde_json::json!({
972 "sessionId": "parent",
973 "childAgentId": "child-1",
974 "update": { "sessionUpdate": "agent_message", "content": { "type": "text", "text": "delegated" } }
975 }),
976 )
977 .await;
978
979 let history = manager
980 .get_session_history("parent")
981 .await
982 .unwrap_or_default();
983 assert!(history.is_empty());
984 }
985
986 #[tokio::test]
987 async fn emit_session_update_broadcasts_when_channel_exists() {
988 let (tx, mut rx) = tokio::sync::broadcast::channel(8);
989 let manager = AcpManager {
990 sessions: Arc::new(RwLock::new(HashMap::new())),
991 processes: Arc::new(RwLock::new(HashMap::new())),
992 notification_channels: Arc::new(RwLock::new(HashMap::from([(
993 "session-1".to_string(),
994 tx,
995 )]))),
996 history: Arc::new(RwLock::new(HashMap::new())),
997 };
998
999 manager
1000 .emit_session_update(
1001 "session-1",
1002 serde_json::json!({
1003 "sessionUpdate": "turn_complete",
1004 "stopReason": "cancelled"
1005 }),
1006 )
1007 .await
1008 .expect("emit should succeed");
1009
1010 let broadcast = rx.recv().await.expect("broadcast event");
1011 assert_eq!(
1012 broadcast["params"]["update"]["sessionUpdate"].as_str(),
1013 Some("turn_complete")
1014 );
1015 assert_eq!(
1016 broadcast["params"]["update"]["stopReason"].as_str(),
1017 Some("cancelled")
1018 );
1019 }
1020
1021 #[tokio::test]
1022 async fn emit_session_update_persists_history_without_channel() {
1023 let manager = AcpManager {
1024 sessions: Arc::new(RwLock::new(HashMap::new())),
1025 processes: Arc::new(RwLock::new(HashMap::new())),
1026 notification_channels: Arc::new(RwLock::new(HashMap::new())),
1027 history: Arc::new(RwLock::new(HashMap::new())),
1028 };
1029
1030 manager
1031 .emit_session_update(
1032 "session-1",
1033 serde_json::json!({
1034 "sessionUpdate": "turn_complete",
1035 "stopReason": "cancelled"
1036 }),
1037 )
1038 .await
1039 .expect("emit should succeed");
1040
1041 let history = manager
1042 .get_session_history("session-1")
1043 .await
1044 .expect("history should exist");
1045 assert_eq!(history.len(), 1);
1046 assert_eq!(
1047 history[0]["update"]["sessionUpdate"].as_str(),
1048 Some("turn_complete")
1049 );
1050 }
1051
1052 #[test]
1053 fn rewrite_notification_session_id_overrides_provider_session_id() {
1054 let rewritten = AcpManager::rewrite_notification_session_id(
1055 "child-session",
1056 serde_json::json!({
1057 "sessionId": "provider-session",
1058 "update": { "sessionUpdate": "agent_message_chunk", "content": { "text": "hi" } }
1059 }),
1060 );
1061
1062 assert_eq!(rewritten["sessionId"].as_str(), Some("child-session"));
1063 }
1064
1065 #[test]
1066 fn truncate_content_handles_unicode_boundaries() {
1067 assert_eq!(truncate_content("你好世界ABC", 5), "你好...");
1068 assert_eq!(truncate_content("你好世界ABC", 3), "你好世");
1069 assert_eq!(truncate_content("短文本", 10), "短文本");
1070 }
1071}