1pub mod binary_manager;
20pub mod claude_code_process;
21pub mod docker;
22pub mod installation_state;
23pub mod mcp_setup;
24pub mod paths;
25pub mod process;
26pub mod provider_adapter;
27pub mod registry_fetch;
28pub mod registry_types;
29pub mod runtime_manager;
30pub mod terminal_manager;
31pub mod warmup;
32
33pub use binary_manager::AcpBinaryManager;
34pub use claude_code_process::{ClaudeCodeConfig, ClaudeCodeProcess};
35pub use installation_state::AcpInstallationState;
36pub use paths::AcpPaths;
37pub use registry_fetch::{fetch_registry, fetch_registry_json};
38pub use registry_types::*;
39pub use runtime_manager::{current_platform, AcpRuntimeManager, RuntimeInfo, RuntimeType};
40pub use warmup::{AcpWarmupService, WarmupState, WarmupStatus};
41
42use std::collections::HashMap;
43use std::path::Path;
44use std::sync::Arc;
45
46use serde::{Deserialize, Serialize};
47use tokio::sync::{broadcast, RwLock};
48
49use crate::trace::{Contributor, TraceConversation, TraceEventType, TraceRecord, TraceWriter};
50use process::AcpProcess;
51
52#[cfg(windows)]
53pub(crate) const CREATE_NO_WINDOW: u32 = 0x0800_0000;
54
55fn validate_session_cwd(cwd: &str) -> Result<(), String> {
56 let path = Path::new(cwd);
57 if !path.exists() {
58 return Err(format!(
59 "Invalid session cwd '{}': directory does not exist",
60 cwd
61 ));
62 }
63 if !path.is_dir() {
64 return Err(format!(
65 "Invalid session cwd '{}': path is not a directory",
66 cwd
67 ));
68 }
69 Ok(())
70}
71
72#[derive(Debug, Clone, Serialize, Deserialize)]
76#[serde(rename_all = "camelCase")]
77pub struct AcpSessionRecord {
78 pub session_id: String,
79 #[serde(skip_serializing_if = "Option::is_none")]
80 pub name: Option<String>,
81 pub cwd: String,
82 pub workspace_id: String,
83 #[serde(skip_serializing_if = "Option::is_none")]
84 pub routa_agent_id: Option<String>,
85 pub provider: Option<String>,
86 pub role: Option<String>,
87 pub mode_id: Option<String>,
88 #[serde(skip_serializing_if = "Option::is_none")]
89 pub model: Option<String>,
90 pub created_at: String,
91 #[serde(default)]
92 pub first_prompt_sent: bool,
93 #[serde(skip_serializing_if = "Option::is_none")]
95 pub parent_session_id: Option<String>,
96 #[serde(skip_serializing_if = "Option::is_none")]
97 pub specialist_id: Option<String>,
98 #[serde(skip_serializing_if = "Option::is_none")]
99 pub specialist_system_prompt: Option<String>,
100}
101
102#[derive(Debug, Clone, Default)]
103pub struct SessionLaunchOptions {
104 pub specialist_id: Option<String>,
105 pub specialist_system_prompt: Option<String>,
106 pub allowed_native_tools: Option<Vec<String>>,
107 pub initialize_timeout_ms: Option<u64>,
108 pub provider_args: Option<Vec<String>>,
109 pub acp_mcp_servers: Option<Vec<serde_json::Value>>,
110}
111
112#[derive(Clone)]
116enum AgentProcessType {
117 Acp(Arc<AcpProcess>),
119 Claude(Arc<ClaudeCodeProcess>),
121}
122
123impl AgentProcessType {
124 async fn kill(&self) {
126 match self {
127 AgentProcessType::Acp(process) => process.kill().await,
128 AgentProcessType::Claude(process) => process.kill().await,
129 }
130 }
131}
132
133struct ManagedProcess {
135 process: AgentProcessType,
136 acp_session_id: String,
138 preset_id: String,
139 #[allow(dead_code)]
140 created_at: String,
141 trace_writer: TraceWriter,
143 #[allow(dead_code)]
145 cwd: String,
146}
147
148#[derive(Clone)]
155pub struct AcpManager {
156 sessions: Arc<RwLock<HashMap<String, AcpSessionRecord>>>,
158 processes: Arc<RwLock<HashMap<String, ManagedProcess>>>,
160 notification_channels: Arc<RwLock<HashMap<String, broadcast::Sender<serde_json::Value>>>>,
162 history: Arc<RwLock<HashMap<String, Vec<serde_json::Value>>>>,
164}
165
166impl Default for AcpManager {
167 fn default() -> Self {
168 Self::new()
169 }
170}
171
172impl AcpManager {
173 pub fn rewrite_notification_session_id(
174 session_id: &str,
175 mut notification: serde_json::Value,
176 ) -> serde_json::Value {
177 if let Some(object) = notification.as_object_mut() {
178 object.insert(
179 "sessionId".to_string(),
180 serde_json::Value::String(session_id.to_string()),
181 );
182 }
183 notification
184 }
185
186 pub fn new() -> Self {
187 Self {
188 sessions: Arc::new(RwLock::new(HashMap::new())),
189 processes: Arc::new(RwLock::new(HashMap::new())),
190 notification_channels: Arc::new(RwLock::new(HashMap::new())),
191 history: Arc::new(RwLock::new(HashMap::new())),
192 }
193 }
194
195 pub async fn list_sessions(&self) -> Vec<AcpSessionRecord> {
197 let sessions = self.sessions.read().await;
198 sessions.values().cloned().collect()
199 }
200
201 pub async fn get_session(&self, session_id: &str) -> Option<AcpSessionRecord> {
203 let sessions = self.sessions.read().await;
204 sessions.get(session_id).cloned()
205 }
206
207 pub async fn rename_session(&self, session_id: &str, name: &str) -> Option<()> {
210 let mut sessions = self.sessions.write().await;
211 let session = sessions.get_mut(session_id)?;
212 session.name = Some(name.to_string());
213 Some(())
214 }
215
216 pub async fn set_routa_agent_id(&self, session_id: &str, routa_agent_id: &str) -> Option<()> {
219 let mut sessions = self.sessions.write().await;
220 let session = sessions.get_mut(session_id)?;
221 session.routa_agent_id = Some(routa_agent_id.to_string());
222 Some(())
223 }
224
225 pub async fn delete_session(&self, session_id: &str) -> Option<()> {
228 let mut sessions = self.sessions.write().await;
229 let mut processes = self.processes.write().await;
230 let mut channels = self.notification_channels.write().await;
231 let mut history = self.history.write().await;
232
233 sessions.remove(session_id)?;
235
236 if let Some(managed) = processes.remove(session_id) {
238 let _ = managed.process.kill().await;
239 }
240
241 channels.remove(session_id);
243
244 history.remove(session_id);
246
247 Some(())
248 }
249
250 pub async fn get_session_history(&self, session_id: &str) -> Option<Vec<serde_json::Value>> {
253 let history = self.history.read().await;
254 history.get(session_id).cloned()
255 }
256
257 pub async fn push_to_history(&self, session_id: &str, notification: serde_json::Value) {
262 if notification.get("childAgentId").is_some() {
264 return;
265 }
266 let mut history = self.history.write().await;
267 let entries = history.entry(session_id.to_string()).or_default();
268 entries.push(notification);
269 if entries.len() > 500 {
271 let drain_count = entries.len() - 500;
272 entries.drain(0..drain_count);
273 }
274 }
275
276 pub async fn emit_session_update(
278 &self,
279 session_id: &str,
280 update: serde_json::Value,
281 ) -> Result<(), String> {
282 let message = serde_json::json!({
283 "jsonrpc": "2.0",
284 "method": "session/update",
285 "params": {
286 "sessionId": session_id,
287 "update": update,
288 }
289 });
290
291 if let Some(channel) = self
292 .notification_channels
293 .read()
294 .await
295 .get(session_id)
296 .cloned()
297 {
298 let _ = channel.send(message.clone());
299 } else {
300 let params = message
301 .get("params")
302 .cloned()
303 .ok_or_else(|| "Missing params in synthetic session/update".to_string())?;
304 self.push_to_history(
305 session_id,
306 Self::rewrite_notification_session_id(session_id, params),
307 )
308 .await;
309 }
310 Ok(())
311 }
312
313 pub async fn mark_first_prompt_sent(&self, session_id: &str) {
315 let mut sessions = self.sessions.write().await;
316 if let Some(session) = sessions.get_mut(session_id) {
317 session.first_prompt_sent = true;
318 }
319 }
320
321 #[allow(clippy::too_many_arguments)]
327 pub async fn create_session(
328 &self,
329 session_id: String,
330 cwd: String,
331 workspace_id: String,
332 provider: Option<String>,
333 role: Option<String>,
334 model: Option<String>,
335 parent_session_id: Option<String>,
336 tool_mode: Option<String>,
337 mcp_profile: Option<String>,
338 ) -> Result<(String, String), String> {
339 self.create_session_with_options(
340 session_id,
341 cwd,
342 workspace_id,
343 provider,
344 role,
345 model,
346 parent_session_id,
347 tool_mode,
348 mcp_profile,
349 SessionLaunchOptions::default(),
350 )
351 .await
352 }
353
354 #[allow(clippy::too_many_arguments)]
358 pub async fn load_session_with_options(
359 &self,
360 session_id: String,
361 cwd: String,
362 workspace_id: String,
363 provider: Option<String>,
364 role: Option<String>,
365 model: Option<String>,
366 parent_session_id: Option<String>,
367 tool_mode: Option<String>,
368 mcp_profile: Option<String>,
369 provider_session_id: Option<String>,
370 options: SessionLaunchOptions,
371 ) -> Result<(String, String), String> {
372 validate_session_cwd(&cwd)?;
373 let provider_name = provider.as_deref().unwrap_or("opencode");
374 let acp_mcp_servers = if matches!(provider_name, "codex" | "codex-acp") {
375 options.acp_mcp_servers.clone().unwrap_or_else(|| {
376 mcp_setup::build_acp_http_mcp_servers(
377 &workspace_id,
378 &session_id,
379 tool_mode.as_deref(),
380 mcp_profile.as_deref(),
381 )
382 })
383 } else {
384 Vec::new()
385 };
386
387 if provider_name == "claude" {
388 return Err("Native session/load is not supported for Claude".to_string());
389 }
390
391 let (ntx, _) = broadcast::channel::<serde_json::Value>(256);
392 let preset = get_preset_by_id_with_registry(provider_name).await?;
393
394 if let Some(summary) = mcp_setup::ensure_mcp_for_provider(
395 provider_name,
396 &cwd,
397 &workspace_id,
398 &session_id,
399 tool_mode.as_deref(),
400 mcp_profile.as_deref(),
401 )
402 .await?
403 {
404 tracing::info!("[AcpManager] {}", summary);
405 }
406
407 let mut extra_args: Vec<String> = preset.args.clone();
408 if matches!(provider_name, "codex" | "codex-acp") {
409 for override_arg in mcp_setup::codex_cli_overrides(&cwd)? {
410 extra_args.push("-c".to_string());
411 extra_args.push(override_arg);
412 }
413 }
414 if let Some(provider_args) = options.provider_args.clone() {
415 extra_args.extend(provider_args);
416 }
417 if let Some(ref m) = model {
418 if !m.is_empty() {
419 extra_args.push("-m".to_string());
420 extra_args.push(m.clone());
421 }
422 }
423
424 let preset_command = resolve_preset_command(&preset);
425 let process = AcpProcess::spawn(
426 &preset_command,
427 &extra_args.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
428 &cwd,
429 ntx.clone(),
430 &preset.name,
431 &session_id,
432 )
433 .await?;
434
435 process
436 .initialize_with_timeout(options.initialize_timeout_ms)
437 .await?;
438
439 let resolved_provider_session_id =
440 provider_session_id.unwrap_or_else(|| session_id.clone());
441 let acp_session_id = process
442 .load_session(&resolved_provider_session_id, &cwd, &acp_mcp_servers)
443 .await?;
444
445 self.register_managed_session(
446 session_id.clone(),
447 cwd.clone(),
448 workspace_id.clone(),
449 provider_name.to_string(),
450 role.clone(),
451 model.clone(),
452 parent_session_id.clone(),
453 &options,
454 AgentProcessType::Acp(Arc::new(process)),
455 acp_session_id.clone(),
456 ntx.clone(),
457 )
458 .await;
459
460 tracing::info!(
461 "[AcpManager] Session {} loaded (provider: {}, agent session: {})",
462 session_id,
463 provider_name,
464 acp_session_id,
465 );
466
467 Ok((session_id, acp_session_id))
468 }
469
470 #[allow(clippy::too_many_arguments)]
471 pub async fn load_session(
472 &self,
473 session_id: String,
474 cwd: String,
475 workspace_id: String,
476 provider: Option<String>,
477 role: Option<String>,
478 model: Option<String>,
479 parent_session_id: Option<String>,
480 tool_mode: Option<String>,
481 mcp_profile: Option<String>,
482 provider_session_id: Option<String>,
483 ) -> Result<(String, String), String> {
484 self.load_session_with_options(
485 session_id,
486 cwd,
487 workspace_id,
488 provider,
489 role,
490 model,
491 parent_session_id,
492 tool_mode,
493 mcp_profile,
494 provider_session_id,
495 SessionLaunchOptions::default(),
496 )
497 .await
498 }
499
500 fn spawn_history_mirror(&self, session_id: &str, ntx: &broadcast::Sender<serde_json::Value>) {
501 let history_manager = self.clone();
502 let history_session_id = session_id.to_string();
503 let mut history_rx = ntx.subscribe();
504 tokio::spawn(async move {
505 loop {
506 match history_rx.recv().await {
507 Ok(message) => {
508 let params = match message.get("params") {
509 Some(value) => value.clone(),
510 None => continue,
511 };
512 history_manager
513 .push_to_history(
514 &history_session_id,
515 Self::rewrite_notification_session_id(&history_session_id, params),
516 )
517 .await;
518 }
519 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
520 Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
521 tracing::warn!(
522 "[AcpManager] Dropped {} session/update notifications for {}",
523 skipped,
524 history_session_id
525 );
526 }
527 }
528 }
529 });
530 }
531
532 #[allow(clippy::too_many_arguments)]
533 async fn register_managed_session(
534 &self,
535 session_id: String,
536 cwd: String,
537 workspace_id: String,
538 provider_name: String,
539 role: Option<String>,
540 model: Option<String>,
541 parent_session_id: Option<String>,
542 options: &SessionLaunchOptions,
543 process_type: AgentProcessType,
544 acp_session_id: String,
545 ntx: broadcast::Sender<serde_json::Value>,
546 ) {
547 let created_at = chrono::Utc::now().to_rfc3339();
548 let trace_writer = TraceWriter::new(&cwd);
549 let record = AcpSessionRecord {
550 session_id: session_id.clone(),
551 name: None,
552 cwd: cwd.clone(),
553 workspace_id: workspace_id.clone(),
554 routa_agent_id: None,
555 provider: Some(provider_name.clone()),
556 role: role.clone().or(Some("CRAFTER".to_string())),
557 mode_id: None,
558 model: model.clone(),
559 created_at: created_at.clone(),
560 first_prompt_sent: false,
561 parent_session_id: parent_session_id.clone(),
562 specialist_id: options.specialist_id.clone(),
563 specialist_system_prompt: options.specialist_system_prompt.clone(),
564 };
565
566 self.sessions
567 .write()
568 .await
569 .insert(session_id.clone(), record);
570 self.processes.write().await.insert(
571 session_id.clone(),
572 ManagedProcess {
573 process: process_type,
574 acp_session_id: acp_session_id.clone(),
575 preset_id: provider_name.clone(),
576 created_at,
577 trace_writer: trace_writer.clone(),
578 cwd: cwd.clone(),
579 },
580 );
581 self.notification_channels
582 .write()
583 .await
584 .insert(session_id.clone(), ntx.clone());
585 self.spawn_history_mirror(&session_id, &ntx);
586
587 let trace = TraceRecord::new(
588 &session_id,
589 TraceEventType::SessionStart,
590 Contributor::new(&provider_name, None),
591 )
592 .with_workspace_id(&workspace_id)
593 .with_metadata(
594 "role",
595 serde_json::json!(role.as_deref().unwrap_or("CRAFTER")),
596 )
597 .with_metadata("cwd", serde_json::json!(cwd));
598
599 trace_writer.append_safe(&trace).await;
600 }
601
602 #[allow(clippy::too_many_arguments)]
603 pub async fn create_session_from_inline(
604 &self,
605 session_id: String,
606 cwd: String,
607 workspace_id: String,
608 provider_name: String,
609 role: Option<String>,
610 model: Option<String>,
611 parent_session_id: Option<String>,
612 command: String,
613 args: Vec<String>,
614 options: SessionLaunchOptions,
615 ) -> Result<(String, String), String> {
616 validate_session_cwd(&cwd)?;
617 let (ntx, _) = broadcast::channel::<serde_json::Value>(256);
618
619 let process = AcpProcess::spawn(
620 &command,
621 &args.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
622 &cwd,
623 ntx.clone(),
624 &provider_name,
625 &session_id,
626 )
627 .await?;
628
629 process
630 .initialize_with_timeout(options.initialize_timeout_ms)
631 .await?;
632
633 let acp_session_id = process
634 .new_session(&cwd, options.acp_mcp_servers.as_deref().unwrap_or(&[]))
635 .await?;
636 self.register_managed_session(
637 session_id.clone(),
638 cwd.clone(),
639 workspace_id.clone(),
640 provider_name.clone(),
641 role.clone(),
642 model.clone(),
643 parent_session_id.clone(),
644 &options,
645 AgentProcessType::Acp(Arc::new(process)),
646 acp_session_id.clone(),
647 ntx.clone(),
648 )
649 .await;
650
651 tracing::info!(
652 "[AcpManager] Session {} created from inline command (provider: {}, agent session: {})",
653 session_id,
654 provider_name,
655 acp_session_id,
656 );
657
658 Ok((session_id, acp_session_id))
659 }
660
661 #[allow(clippy::too_many_arguments)]
662 pub async fn load_session_from_inline(
663 &self,
664 session_id: String,
665 cwd: String,
666 workspace_id: String,
667 provider_name: String,
668 role: Option<String>,
669 model: Option<String>,
670 parent_session_id: Option<String>,
671 command: String,
672 args: Vec<String>,
673 provider_session_id: Option<String>,
674 options: SessionLaunchOptions,
675 ) -> Result<(String, String), String> {
676 validate_session_cwd(&cwd)?;
677 let (ntx, _) = broadcast::channel::<serde_json::Value>(256);
678
679 let process = AcpProcess::spawn(
680 &command,
681 &args.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
682 &cwd,
683 ntx.clone(),
684 &provider_name,
685 &session_id,
686 )
687 .await?;
688
689 process
690 .initialize_with_timeout(options.initialize_timeout_ms)
691 .await?;
692
693 let resolved_provider_session_id =
694 provider_session_id.unwrap_or_else(|| session_id.clone());
695 let acp_session_id = process
696 .load_session(
697 &resolved_provider_session_id,
698 &cwd,
699 options.acp_mcp_servers.as_deref().unwrap_or(&[]),
700 )
701 .await?;
702
703 self.register_managed_session(
704 session_id.clone(),
705 cwd.clone(),
706 workspace_id.clone(),
707 provider_name.clone(),
708 role.clone(),
709 model.clone(),
710 parent_session_id.clone(),
711 &options,
712 AgentProcessType::Acp(Arc::new(process)),
713 acp_session_id.clone(),
714 ntx.clone(),
715 )
716 .await;
717
718 tracing::info!(
719 "[AcpManager] Session {} loaded from inline command (provider: {}, agent session: {})",
720 session_id,
721 provider_name,
722 acp_session_id,
723 );
724
725 Ok((session_id, acp_session_id))
726 }
727
728 #[allow(clippy::too_many_arguments)]
729 pub async fn create_session_with_options(
730 &self,
731 session_id: String,
732 cwd: String,
733 workspace_id: String,
734 provider: Option<String>,
735 role: Option<String>,
736 model: Option<String>,
737 parent_session_id: Option<String>,
738 tool_mode: Option<String>,
739 mcp_profile: Option<String>,
740 options: SessionLaunchOptions,
741 ) -> Result<(String, String), String> {
742 validate_session_cwd(&cwd)?;
743 let provider_name = provider.as_deref().unwrap_or("opencode");
744 let acp_mcp_servers = if matches!(provider_name, "codex" | "codex-acp") {
745 options.acp_mcp_servers.clone().unwrap_or_else(|| {
746 mcp_setup::build_acp_http_mcp_servers(
747 &workspace_id,
748 &session_id,
749 tool_mode.as_deref(),
750 mcp_profile.as_deref(),
751 )
752 })
753 } else {
754 Vec::new()
755 };
756
757 let (ntx, _) = broadcast::channel::<serde_json::Value>(256);
759 let claude_mcp_config = if provider_name == "claude" {
760 Some(mcp_setup::build_claude_mcp_config(
761 &workspace_id,
762 &session_id,
763 tool_mode.as_deref(),
764 mcp_profile.as_deref(),
765 ))
766 } else {
767 None
768 };
769
770 let (process_type, acp_session_id) = if provider_name == "claude" {
772 let config = ClaudeCodeConfig {
774 command: "claude".to_string(),
775 cwd: cwd.clone(),
776 display_name: format!("Claude-{}", &session_id[..8.min(session_id.len())]),
777 permission_mode: Some("bypassPermissions".to_string()),
778 mcp_configs: claude_mcp_config.into_iter().collect(),
779 append_system_prompt: options.specialist_system_prompt.clone(),
780 allowed_tools: options.allowed_native_tools.clone(),
781 };
782
783 let claude_process = ClaudeCodeProcess::spawn(config, ntx.clone()).await?;
784 let claude_session_id = claude_process
785 .session_id()
786 .await
787 .unwrap_or_else(|| format!("claude-{}", &session_id[..8.min(session_id.len())]));
788
789 (
790 AgentProcessType::Claude(Arc::new(claude_process)),
791 claude_session_id,
792 )
793 } else {
794 let preset = get_preset_by_id_with_registry(provider_name).await?;
796
797 if let Some(summary) = mcp_setup::ensure_mcp_for_provider(
798 provider_name,
799 &cwd,
800 &workspace_id,
801 &session_id,
802 tool_mode.as_deref(),
803 mcp_profile.as_deref(),
804 )
805 .await?
806 {
807 tracing::info!("[AcpManager] {}", summary);
808 }
809
810 let mut extra_args: Vec<String> = preset.args.clone();
812 if matches!(provider_name, "codex" | "codex-acp") {
813 for override_arg in mcp_setup::codex_cli_overrides(&cwd)? {
814 extra_args.push("-c".to_string());
815 extra_args.push(override_arg);
816 }
817 }
818 if let Some(provider_args) = options.provider_args.clone() {
819 extra_args.extend(provider_args);
820 }
821 if let Some(ref m) = model {
822 if !m.is_empty() {
823 extra_args.push("-m".to_string());
825 extra_args.push(m.clone());
826 }
827 }
828
829 let preset_command = resolve_preset_command(&preset);
830 let process = AcpProcess::spawn(
831 &preset_command,
832 &extra_args.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
833 &cwd,
834 ntx.clone(),
835 &preset.name,
836 &session_id,
837 )
838 .await?;
839
840 process
842 .initialize_with_timeout(options.initialize_timeout_ms)
843 .await?;
844
845 let agent_session_id = process.new_session(&cwd, &acp_mcp_servers).await?;
847
848 (AgentProcessType::Acp(Arc::new(process)), agent_session_id)
849 };
850
851 self.register_managed_session(
852 session_id.clone(),
853 cwd.clone(),
854 workspace_id.clone(),
855 provider_name.to_string(),
856 role.clone(),
857 model.clone(),
858 parent_session_id.clone(),
859 &options,
860 process_type,
861 acp_session_id.clone(),
862 ntx.clone(),
863 )
864 .await;
865
866 tracing::info!(
867 "[AcpManager] Session {} created (provider: {}, agent session: {})",
868 session_id,
869 provider_name,
870 acp_session_id,
871 );
872
873 Ok((session_id, acp_session_id))
874 }
875
876 pub async fn prompt(&self, session_id: &str, text: &str) -> Result<serde_json::Value, String> {
878 self.mark_first_prompt_sent(session_id).await;
879
880 let (process, acp_session_id, preset_id, trace_writer) = {
881 let processes = self.processes.read().await;
882 let managed = processes
883 .get(session_id)
884 .ok_or_else(|| format!("No agent process for session: {}", session_id))?;
885 (
886 managed.process.clone(),
887 managed.acp_session_id.clone(),
888 managed.preset_id.clone(),
889 managed.trace_writer.clone(),
890 )
891 };
892
893 let is_alive = match &process {
894 AgentProcessType::Acp(p) => p.is_alive(),
895 AgentProcessType::Claude(p) => p.is_alive(),
896 };
897
898 if !is_alive {
899 return Err(format!("Agent ({}) process is not running", preset_id));
900 }
901
902 let trace = TraceRecord::new(
904 session_id,
905 TraceEventType::UserMessage,
906 Contributor::new(&preset_id, None),
907 )
908 .with_conversation(TraceConversation {
909 turn: None,
910 role: Some("user".to_string()),
911 content_preview: Some(truncate_content(text, 500)),
912 full_content: None,
913 });
914
915 trace_writer.append_safe(&trace).await;
916
917 tracing::info!(
918 target: "routa_acp_prompt",
919 session_id = %session_id,
920 preset_id = %preset_id,
921 acp_session_id = %acp_session_id,
922 prompt_len = text.len(),
923 "acp prompt start"
924 );
925
926 let result = match &process {
927 AgentProcessType::Acp(p) => p.prompt(&acp_session_id, text).await,
928 AgentProcessType::Claude(p) => {
929 let stop_reason = p.prompt(text).await?;
930 Ok(serde_json::json!({ "stopReason": stop_reason }))
931 }
932 };
933
934 match &result {
935 Ok(_) => tracing::info!(
936 target: "routa_acp_prompt",
937 session_id = %session_id,
938 preset_id = %preset_id,
939 "acp prompt success"
940 ),
941 Err(error) => tracing::error!(
942 target: "routa_acp_prompt",
943 session_id = %session_id,
944 preset_id = %preset_id,
945 error = %error,
946 "acp prompt failed"
947 ),
948 }
949
950 result
951 }
952
953 pub async fn cancel(&self, session_id: &str) {
955 let processes = self.processes.read().await;
956 if let Some(managed) = processes.get(session_id) {
957 match &managed.process {
958 AgentProcessType::Acp(p) => p.cancel(&managed.acp_session_id).await,
959 AgentProcessType::Claude(p) => p.cancel().await,
960 }
961 }
962 }
963
964 pub async fn kill_session(&self, session_id: &str) {
966 if let Some(managed) = self.processes.write().await.remove(session_id) {
968 let trace = TraceRecord::new(
970 session_id,
971 TraceEventType::SessionEnd,
972 Contributor::new(&managed.preset_id, None),
973 );
974 managed.trace_writer.append_safe(&trace).await;
975
976 match &managed.process {
977 AgentProcessType::Acp(p) => p.kill().await,
978 AgentProcessType::Claude(p) => p.kill().await,
979 }
980 }
981 self.sessions.write().await.remove(session_id);
983 self.notification_channels.write().await.remove(session_id);
985 }
986
987 pub async fn subscribe(
990 &self,
991 session_id: &str,
992 ) -> Option<broadcast::Receiver<serde_json::Value>> {
993 let channels = self.notification_channels.read().await;
994 channels.get(session_id).map(|tx| tx.subscribe())
995 }
996
997 pub async fn is_alive(&self, session_id: &str) -> bool {
999 let processes = self.processes.read().await;
1000 processes
1001 .get(session_id)
1002 .map(|m| match &m.process {
1003 AgentProcessType::Acp(p) => p.is_alive(),
1004 AgentProcessType::Claude(p) => p.is_alive(),
1005 })
1006 .unwrap_or(false)
1007 }
1008
1009 pub async fn get_acp_session_id(&self, session_id: &str) -> Option<String> {
1011 let processes = self.processes.read().await;
1012 processes
1013 .get(session_id)
1014 .map(|managed| managed.acp_session_id.clone())
1015 }
1016
1017 pub async fn get_preset_id(&self, session_id: &str) -> Option<String> {
1019 let processes = self.processes.read().await;
1020 processes.get(session_id).map(|m| m.preset_id.clone())
1021 }
1022
1023 pub async fn is_claude_session(&self, session_id: &str) -> bool {
1025 let processes = self.processes.read().await;
1026 processes
1027 .get(session_id)
1028 .map(|m| matches!(&m.process, AgentProcessType::Claude(_)))
1029 .unwrap_or(false)
1030 }
1031
1032 pub async fn prompt_claude_async(&self, session_id: &str, text: &str) -> Result<(), String> {
1036 let processes = self.processes.read().await;
1037 let managed = processes
1038 .get(session_id)
1039 .ok_or_else(|| format!("No agent process for session: {}", session_id))?;
1040
1041 let trace = TraceRecord::new(
1043 session_id,
1044 TraceEventType::UserMessage,
1045 Contributor::new(&managed.preset_id, None),
1046 )
1047 .with_conversation(TraceConversation {
1048 turn: None,
1049 role: Some("user".to_string()),
1050 content_preview: Some(truncate_content(text, 500)),
1051 full_content: Some(text.to_string()),
1052 });
1053
1054 managed.trace_writer.append_safe(&trace).await;
1055
1056 match &managed.process {
1057 AgentProcessType::Claude(p) => {
1058 let process = Arc::clone(p);
1060 let text = text.to_string();
1061 tokio::spawn(async move {
1062 let _ = process.prompt(&text).await;
1063 });
1064 Ok(())
1065 }
1066 AgentProcessType::Acp(_) => {
1067 Err("prompt_claude_async is only for Claude sessions".to_string())
1068 }
1069 }
1070 }
1071}
1072
1073#[derive(Debug, Clone, Serialize, Deserialize)]
1077#[serde(rename_all = "camelCase")]
1078pub struct ResumeCapability {
1079 pub supported: bool,
1080 pub mode: String,
1082 #[serde(default)]
1083 #[serde(skip_serializing_if = "Option::is_none")]
1084 pub supports_fork: Option<bool>,
1085 #[serde(default)]
1086 #[serde(skip_serializing_if = "Option::is_none")]
1087 pub supports_list: Option<bool>,
1088}
1089
1090#[derive(Debug, Clone, Serialize, Deserialize)]
1092pub struct AcpPreset {
1093 pub id: String,
1095 pub name: String,
1097 pub command: String,
1098 pub args: Vec<String>,
1099 pub description: String,
1100 #[serde(default)]
1101 #[serde(skip_serializing_if = "Option::is_none")]
1102 pub env_bin_override: Option<String>,
1103 #[serde(default)]
1105 #[serde(skip_serializing_if = "Option::is_none")]
1106 pub resume: Option<ResumeCapability>,
1107}
1108
1109pub fn get_presets() -> Vec<AcpPreset> {
1111 vec![
1112 AcpPreset {
1113 id: "opencode".to_string(),
1114 name: "OpenCode".to_string(),
1115 command: "opencode".to_string(),
1116 args: vec!["acp".to_string()],
1117 description: "OpenCode AI coding agent".to_string(),
1118 env_bin_override: Some("OPENCODE_BIN".to_string()),
1119 resume: Some(ResumeCapability {
1120 supported: true,
1121 mode: "replay".to_string(),
1122 supports_fork: None,
1123 supports_list: None,
1124 }),
1125 },
1126 AcpPreset {
1127 id: "gemini".to_string(),
1128 name: "Gemini".to_string(),
1129 command: "gemini".to_string(),
1130 args: vec!["--experimental-acp".to_string()],
1131 description: "Google Gemini CLI".to_string(),
1132 env_bin_override: None,
1133 resume: None,
1134 },
1135 AcpPreset {
1136 id: "codex-acp".to_string(),
1137 name: "Codex".to_string(),
1138 command: "codex-acp".to_string(),
1139 args: vec![],
1140 description: "OpenAI Codex CLI (codex-acp wrapper)".to_string(),
1141 env_bin_override: Some("CODEX_ACP_BIN".to_string()),
1142 resume: Some(ResumeCapability {
1143 supported: true,
1144 mode: "both".to_string(),
1145 supports_fork: None,
1146 supports_list: Some(true),
1147 }),
1148 },
1149 AcpPreset {
1150 id: "copilot".to_string(),
1151 name: "GitHub Copilot".to_string(),
1152 command: "copilot".to_string(),
1153 args: vec![
1154 "--acp".to_string(),
1155 "--allow-all-tools".to_string(),
1156 "--no-ask-user".to_string(),
1157 ],
1158 description: "GitHub Copilot CLI".to_string(),
1159 env_bin_override: Some("COPILOT_BIN".to_string()),
1160 resume: None,
1161 },
1162 AcpPreset {
1163 id: "auggie".to_string(),
1164 name: "Auggie".to_string(),
1165 command: "auggie".to_string(),
1166 args: vec!["--acp".to_string()],
1167 description: "Augment Code's AI agent".to_string(),
1168 env_bin_override: None,
1169 resume: None,
1170 },
1171 AcpPreset {
1172 id: "kimi".to_string(),
1173 name: "Kimi".to_string(),
1174 command: "kimi".to_string(),
1175 args: vec!["acp".to_string()],
1176 description: "Moonshot AI's Kimi CLI".to_string(),
1177 env_bin_override: None,
1178 resume: None,
1179 },
1180 AcpPreset {
1181 id: "kiro".to_string(),
1182 name: "Kiro".to_string(),
1183 command: "kiro-cli".to_string(),
1184 args: vec!["acp".to_string()],
1185 description: "Amazon Kiro AI coding agent".to_string(),
1186 env_bin_override: Some("KIRO_BIN".to_string()),
1187 resume: None,
1188 },
1189 AcpPreset {
1190 id: "qoder".to_string(),
1191 name: "Qoder".to_string(),
1192 command: "qodercli".to_string(),
1193 args: vec!["--acp".to_string()],
1194 description: "Qoder AI coding agent".to_string(),
1195 env_bin_override: Some("QODER_BIN".to_string()),
1196 resume: None,
1197 },
1198 AcpPreset {
1199 id: "claude".to_string(),
1200 name: "Claude Code".to_string(),
1201 command: "claude".to_string(),
1202 args: vec![],
1205 description: "Anthropic Claude Code (stream-json protocol)".to_string(),
1206 env_bin_override: Some("CLAUDE_BIN".to_string()),
1207 resume: Some(ResumeCapability {
1208 supported: true,
1209 mode: "replay".to_string(),
1210 supports_fork: Some(true),
1211 supports_list: None,
1212 }),
1213 },
1214 ]
1215}
1216
1217pub fn get_preset_by_id(id: &str) -> Option<AcpPreset> {
1219 let normalized_id = match id {
1220 "codex" => "codex-acp",
1221 "qodercli" => "qoder",
1222 other => other,
1223 };
1224 get_presets().into_iter().find(|p| p.id == normalized_id)
1225}
1226
1227pub fn get_resume_capability(provider: &str) -> Option<ResumeCapability> {
1229 get_preset_by_id(provider).and_then(|p| p.resume)
1230}
1231
1232pub async fn get_preset_by_id_with_registry(id: &str) -> Result<AcpPreset, String> {
1238 let normalized_id = match id {
1239 "codex" => "codex-acp",
1240 "qodercli" => "qoder",
1241 other => other,
1242 };
1243
1244 const REGISTRY_SUFFIX: &str = "-registry";
1247 if let Some(base_id) = normalized_id.strip_suffix(REGISTRY_SUFFIX) {
1248 let mut preset = get_registry_preset(base_id).await?;
1249 preset.id = id.to_string();
1251 return Ok(preset);
1252 }
1253
1254 if let Some(mut preset) = get_presets().into_iter().find(|p| p.id == normalized_id) {
1256 if preset.id != id {
1257 preset.id = id.to_string();
1258 }
1259 return Ok(preset);
1260 }
1261
1262 let mut preset = get_registry_preset(normalized_id).await?;
1264 if preset.id != id {
1265 preset.id = id.to_string();
1266 }
1267 Ok(preset)
1268}
1269
1270async fn get_registry_preset(id: &str) -> Result<AcpPreset, String> {
1272 let registry: AcpRegistry = fetch_registry().await?;
1273
1274 let agent = registry
1276 .agents
1277 .into_iter()
1278 .find(|a| a.id == id)
1279 .ok_or_else(|| format!("Agent '{}' not found in registry", id))?;
1280
1281 let (command, args) = if let Some(ref npx) = agent.distribution.npx {
1283 let mut args = vec!["-y".to_string(), npx.package.clone()];
1284 args.extend(npx.args.clone());
1285 ("npx".to_string(), args)
1286 } else if let Some(ref uvx) = agent.distribution.uvx {
1287 let mut args = vec![uvx.package.clone()];
1288 args.extend(uvx.args.clone());
1289 ("uvx".to_string(), args)
1290 } else {
1291 return Err(format!(
1292 "Agent '{}' has no supported distribution (npx/uvx)",
1293 id
1294 ));
1295 };
1296
1297 Ok(AcpPreset {
1298 id: agent.id.clone(),
1299 name: agent.name,
1300 command,
1301 args,
1302 description: agent.description,
1303 env_bin_override: None,
1304 resume: None,
1305 })
1306}
1307
1308fn resolve_preset_command(preset: &AcpPreset) -> String {
1309 if let Some(env_var) = &preset.env_bin_override {
1310 if let Ok(custom_command) = std::env::var(env_var) {
1311 let trimmed = custom_command.trim();
1312 if !trimmed.is_empty() {
1313 return trimmed.to_string();
1314 }
1315 }
1316 }
1317
1318 crate::shell_env::which(&preset.command).unwrap_or_else(|| preset.command.clone())
1319}
1320
1321fn truncate_content(text: &str, max_len: usize) -> String {
1325 if text.chars().count() <= max_len {
1326 text.to_string()
1327 } else if max_len <= 3 {
1328 text.chars().take(max_len).collect()
1329 } else {
1330 let truncated: String = text.chars().take(max_len - 3).collect();
1331 format!("{truncated}...")
1332 }
1333}
1334
1335#[cfg(test)]
1336mod tests {
1337 use super::{
1338 get_preset_by_id_with_registry, get_presets, truncate_content, validate_session_cwd,
1339 AcpManager, AcpSessionRecord,
1340 };
1341 use std::collections::HashMap;
1342 use std::fs;
1343 use std::sync::Arc;
1344 use tokio::sync::RwLock;
1345
1346 #[test]
1347 fn static_presets_include_codex_acp_for_codex_alias() {
1348 let presets = get_presets();
1349 assert!(presets.iter().any(|preset| preset.id == "codex-acp"));
1350 }
1351
1352 #[test]
1353 fn static_presets_include_qoder() {
1354 let presets = get_presets();
1355 assert!(presets.iter().any(|preset| preset.id == "qoder"));
1356 }
1357
1358 #[tokio::test]
1359 async fn qodercli_alias_resolves_to_qoder_preset() {
1360 let preset = get_preset_by_id_with_registry("qodercli")
1361 .await
1362 .expect("qodercli alias should resolve");
1363 assert_eq!(preset.id, "qodercli");
1364 assert_eq!(preset.command, "qodercli");
1365 assert_eq!(preset.args, vec!["--acp".to_string()]);
1366 }
1367
1368 #[test]
1369 fn validate_session_cwd_rejects_missing_or_non_directory_paths() {
1370 let temp = tempfile::tempdir().expect("tempdir should create");
1371 let missing = temp.path().join("missing-dir");
1372 let file_path = temp.path().join("not-a-dir.txt");
1373 fs::write(&file_path, "content").expect("file should write");
1374
1375 let missing_error = validate_session_cwd(missing.to_string_lossy().as_ref())
1376 .expect_err("missing directory should fail");
1377 assert!(missing_error.contains("directory does not exist"));
1378
1379 let file_error = validate_session_cwd(file_path.to_string_lossy().as_ref())
1380 .expect_err("file path should fail");
1381 assert!(file_error.contains("path is not a directory"));
1382
1383 validate_session_cwd(temp.path().to_string_lossy().as_ref())
1384 .expect("existing directory should pass");
1385 }
1386
1387 #[tokio::test]
1388 async fn mark_first_prompt_sent_updates_live_session_record() {
1389 let manager = AcpManager::new();
1390 let session_id = "session-1".to_string();
1391 manager.sessions.write().await.insert(
1392 session_id.clone(),
1393 AcpSessionRecord {
1394 session_id: session_id.clone(),
1395 name: None,
1396 cwd: ".".to_string(),
1397 workspace_id: "default".to_string(),
1398 routa_agent_id: None,
1399 provider: Some("opencode".to_string()),
1400 role: Some("CRAFTER".to_string()),
1401 mode_id: None,
1402 model: None,
1403 created_at: chrono::Utc::now().to_rfc3339(),
1404 first_prompt_sent: false,
1405 parent_session_id: None,
1406 specialist_id: None,
1407 specialist_system_prompt: None,
1408 },
1409 );
1410
1411 manager.mark_first_prompt_sent(&session_id).await;
1412
1413 let session = manager.get_session(&session_id).await.expect("session");
1414 assert!(session.first_prompt_sent);
1415 }
1416
1417 #[tokio::test]
1418 async fn push_to_history_skips_parent_child_forwarding_noise() {
1419 let manager = AcpManager {
1420 sessions: Arc::new(RwLock::new(HashMap::new())),
1421 processes: Arc::new(RwLock::new(HashMap::new())),
1422 notification_channels: Arc::new(RwLock::new(HashMap::new())),
1423 history: Arc::new(RwLock::new(HashMap::new())),
1424 };
1425
1426 manager
1427 .push_to_history(
1428 "parent",
1429 serde_json::json!({
1430 "sessionId": "parent",
1431 "childAgentId": "child-1",
1432 "update": { "sessionUpdate": "agent_message", "content": { "type": "text", "text": "delegated" } }
1433 }),
1434 )
1435 .await;
1436
1437 let history = manager
1438 .get_session_history("parent")
1439 .await
1440 .unwrap_or_default();
1441 assert!(history.is_empty());
1442 }
1443
1444 #[tokio::test]
1445 async fn emit_session_update_broadcasts_when_channel_exists() {
1446 let (tx, mut rx) = tokio::sync::broadcast::channel(8);
1447 let manager = AcpManager {
1448 sessions: Arc::new(RwLock::new(HashMap::new())),
1449 processes: Arc::new(RwLock::new(HashMap::new())),
1450 notification_channels: Arc::new(RwLock::new(HashMap::from([(
1451 "session-1".to_string(),
1452 tx,
1453 )]))),
1454 history: Arc::new(RwLock::new(HashMap::new())),
1455 };
1456
1457 manager
1458 .emit_session_update(
1459 "session-1",
1460 serde_json::json!({
1461 "sessionUpdate": "turn_complete",
1462 "stopReason": "cancelled"
1463 }),
1464 )
1465 .await
1466 .expect("emit should succeed");
1467
1468 let broadcast = rx.recv().await.expect("broadcast event");
1469 assert_eq!(
1470 broadcast["params"]["update"]["sessionUpdate"].as_str(),
1471 Some("turn_complete")
1472 );
1473 assert_eq!(
1474 broadcast["params"]["update"]["stopReason"].as_str(),
1475 Some("cancelled")
1476 );
1477 }
1478
1479 #[tokio::test]
1480 async fn emit_session_update_persists_history_without_channel() {
1481 let manager = AcpManager {
1482 sessions: Arc::new(RwLock::new(HashMap::new())),
1483 processes: Arc::new(RwLock::new(HashMap::new())),
1484 notification_channels: Arc::new(RwLock::new(HashMap::new())),
1485 history: Arc::new(RwLock::new(HashMap::new())),
1486 };
1487
1488 manager
1489 .emit_session_update(
1490 "session-1",
1491 serde_json::json!({
1492 "sessionUpdate": "turn_complete",
1493 "stopReason": "cancelled"
1494 }),
1495 )
1496 .await
1497 .expect("emit should succeed");
1498
1499 let history = manager
1500 .get_session_history("session-1")
1501 .await
1502 .expect("history should exist");
1503 assert_eq!(history.len(), 1);
1504 assert_eq!(
1505 history[0]["update"]["sessionUpdate"].as_str(),
1506 Some("turn_complete")
1507 );
1508 }
1509
1510 #[test]
1511 fn rewrite_notification_session_id_overrides_provider_session_id() {
1512 let rewritten = AcpManager::rewrite_notification_session_id(
1513 "child-session",
1514 serde_json::json!({
1515 "sessionId": "provider-session",
1516 "update": { "sessionUpdate": "agent_message_chunk", "content": { "text": "hi" } }
1517 }),
1518 );
1519
1520 assert_eq!(rewritten["sessionId"].as_str(), Some("child-session"));
1521 }
1522
1523 #[test]
1524 fn truncate_content_handles_unicode_boundaries() {
1525 assert_eq!(truncate_content("你好世界ABC", 5), "你好...");
1526 assert_eq!(truncate_content("你好世界ABC", 3), "你好世");
1527 assert_eq!(truncate_content("短文本", 10), "短文本");
1528 }
1529}