1pub mod binary_manager;
20pub mod claude_code_process;
21pub mod docker;
22pub mod installation_state;
23pub mod mcp_setup;
24pub mod paths;
25pub mod process;
26pub mod provider_adapter;
27pub mod registry_fetch;
28pub mod registry_types;
29pub mod runtime_manager;
30pub mod terminal_manager;
31pub mod warmup;
32
33pub use binary_manager::AcpBinaryManager;
34pub use claude_code_process::{ClaudeCodeConfig, ClaudeCodeProcess};
35pub use installation_state::AcpInstallationState;
36pub use paths::AcpPaths;
37pub use registry_fetch::{fetch_registry, fetch_registry_json};
38pub use registry_types::*;
39pub use runtime_manager::{current_platform, AcpRuntimeManager, RuntimeInfo, RuntimeType};
40pub use warmup::{AcpWarmupService, WarmupState, WarmupStatus};
41
42use std::collections::HashMap;
43use std::path::Path;
44use std::sync::Arc;
45
46use serde::{Deserialize, Serialize};
47use tokio::sync::{broadcast, RwLock};
48
49use crate::trace::{Contributor, TraceConversation, TraceEventType, TraceRecord, TraceWriter};
50use process::AcpProcess;
51
/// Raw value of the Win32 `CREATE_NO_WINDOW` process-creation flag.
///
/// Only compiled on Windows. Presumably passed as creation flags when spawning
/// agent child processes so that no console window pops up — no call site is
/// visible in this file, confirm against the process-spawning modules.
#[cfg(windows)]
pub(crate) const CREATE_NO_WINDOW: u32 = 0x0800_0000;
54
/// Checks that `cwd` points at an existing directory before a session is
/// started in it.
///
/// # Errors
///
/// Returns a human-readable message when the path does not exist, or when it
/// exists but is not a directory.
fn validate_session_cwd(cwd: &str) -> Result<(), String> {
    let candidate = Path::new(cwd);

    if candidate.is_dir() {
        // A directory necessarily exists, so this is the only success case.
        Ok(())
    } else if candidate.exists() {
        Err(format!(
            "Invalid session cwd '{cwd}': path is not a directory"
        ))
    } else {
        Err(format!(
            "Invalid session cwd '{cwd}': directory does not exist"
        ))
    }
}
69
/// Metadata describing one session tracked by [`AcpManager`].
///
/// Serialized with camelCase field names. Optional fields marked with
/// `skip_serializing_if` are left out of the output when they are `None`; the
/// other optional fields (`provider`, `role`, `mode_id`) are always emitted.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AcpSessionRecord {
    /// Identifier the manager uses as the key for this session.
    pub session_id: String,
    /// Display name; starts as `None` and is set by `AcpManager::rename_session`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// Working directory the session was launched in.
    pub cwd: String,
    /// Workspace the session belongs to.
    pub workspace_id: String,
    /// Set through `AcpManager::set_routa_agent_id`; `None` until then.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub routa_agent_id: Option<String>,
    /// Provider / preset name the session was launched with.
    pub provider: Option<String>,
    /// Agent role; registration fills in `"CRAFTER"` when the caller gave none.
    pub role: Option<String>,
    /// Always `None` at registration in this file.
    pub mode_id: Option<String>,
    /// Model name requested by the caller, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
    /// RFC 3339 UTC timestamp taken when the session was registered.
    pub created_at: String,
    /// `false` at registration; flipped by `AcpManager::mark_first_prompt_sent`.
    /// Defaults to `false` when absent from deserialized input.
    #[serde(default)]
    pub first_prompt_sent: bool,
    /// Parent session identifier supplied by the caller, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_session_id: Option<String>,
    /// Copied from `SessionLaunchOptions::specialist_id`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub specialist_id: Option<String>,
    /// Copied from `SessionLaunchOptions::specialist_system_prompt`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub specialist_system_prompt: Option<String>,
}
99
/// Optional knobs for launching or loading a session. `Default` leaves every
/// field `None`.
#[derive(Debug, Clone, Default)]
pub struct SessionLaunchOptions {
    /// Stored on the resulting `AcpSessionRecord`.
    pub specialist_id: Option<String>,
    /// Stored on the resulting `AcpSessionRecord`; for the `claude` provider it
    /// is also passed as `ClaudeCodeConfig::append_system_prompt`.
    pub specialist_system_prompt: Option<String>,
    /// Passed as `ClaudeCodeConfig::allowed_tools` for the `claude` provider;
    /// not read on the ACP code paths in this file.
    pub allowed_native_tools: Option<Vec<String>>,
    /// Forwarded to `AcpProcess::initialize_with_timeout`.
    pub initialize_timeout_ms: Option<u64>,
    /// Extra command-line arguments appended after the preset's own arguments
    /// (and any codex overrides) and before the `-m <model>` pair.
    pub provider_args: Option<Vec<String>>,
    /// MCP server descriptors handed to the agent's new/load session call.
    /// On the preset-based paths these are only used for codex providers.
    pub acp_mcp_servers: Option<Vec<serde_json::Value>>,
}
109
/// Handle to the child process backing a session. Cloning only clones the
/// `Arc`, so all clones refer to the same process.
#[derive(Clone)]
enum AgentProcessType {
    /// Agent driven through [`AcpProcess`].
    Acp(Arc<AcpProcess>),
    /// Agent driven through [`ClaudeCodeProcess`] (used for the `claude` provider).
    Claude(Arc<ClaudeCodeProcess>),
}
120
121impl AgentProcessType {
122 async fn kill(&self) {
124 match self {
125 AgentProcessType::Acp(process) => process.kill().await,
126 AgentProcessType::Claude(process) => process.kill().await,
127 }
128 }
129}
130
/// Per-session runtime state kept alongside the public `AcpSessionRecord`.
struct ManagedProcess {
    /// Handle to the running agent process.
    process: AgentProcessType,
    /// Session identifier returned by the agent itself (may differ from the
    /// manager's own session id used as the map key).
    acp_session_id: String,
    /// Provider name the session was registered with; used as the trace
    /// contributor and in log/error messages.
    preset_id: String,
    /// RFC 3339 timestamp taken at registration. Not read in this file.
    #[allow(dead_code)]
    created_at: String,
    /// Trace sink created from the session's working directory.
    trace_writer: TraceWriter,
    /// Working directory of the session. Not read in this file.
    #[allow(dead_code)]
    cwd: String,
}
145
/// Registry of agent sessions and the processes behind them.
///
/// Every field is an `Arc`, so cloning the manager yields another handle onto
/// the same underlying maps. All maps are keyed by the manager's session id.
#[derive(Clone)]
pub struct AcpManager {
    /// Public metadata for each session.
    sessions: Arc<RwLock<HashMap<String, AcpSessionRecord>>>,
    /// Process handle and trace writer for each session.
    processes: Arc<RwLock<HashMap<String, ManagedProcess>>>,
    /// Broadcast sender on which each session's notifications are published.
    notification_channels: Arc<RwLock<HashMap<String, broadcast::Sender<serde_json::Value>>>>,
    /// Recorded notification params per session (capped by `push_to_history`).
    history: Arc<RwLock<HashMap<String, Vec<serde_json::Value>>>>,
}
163
164impl Default for AcpManager {
165 fn default() -> Self {
166 Self::new()
167 }
168}
169
170impl AcpManager {
171 pub fn rewrite_notification_session_id(
172 session_id: &str,
173 mut notification: serde_json::Value,
174 ) -> serde_json::Value {
175 if let Some(object) = notification.as_object_mut() {
176 object.insert(
177 "sessionId".to_string(),
178 serde_json::Value::String(session_id.to_string()),
179 );
180 }
181 notification
182 }
183
184 pub fn new() -> Self {
185 Self {
186 sessions: Arc::new(RwLock::new(HashMap::new())),
187 processes: Arc::new(RwLock::new(HashMap::new())),
188 notification_channels: Arc::new(RwLock::new(HashMap::new())),
189 history: Arc::new(RwLock::new(HashMap::new())),
190 }
191 }
192
193 pub async fn list_sessions(&self) -> Vec<AcpSessionRecord> {
195 let sessions = self.sessions.read().await;
196 sessions.values().cloned().collect()
197 }
198
199 pub async fn get_session(&self, session_id: &str) -> Option<AcpSessionRecord> {
201 let sessions = self.sessions.read().await;
202 sessions.get(session_id).cloned()
203 }
204
205 pub async fn rename_session(&self, session_id: &str, name: &str) -> Option<()> {
208 let mut sessions = self.sessions.write().await;
209 let session = sessions.get_mut(session_id)?;
210 session.name = Some(name.to_string());
211 Some(())
212 }
213
214 pub async fn set_routa_agent_id(&self, session_id: &str, routa_agent_id: &str) -> Option<()> {
217 let mut sessions = self.sessions.write().await;
218 let session = sessions.get_mut(session_id)?;
219 session.routa_agent_id = Some(routa_agent_id.to_string());
220 Some(())
221 }
222
223 pub async fn delete_session(&self, session_id: &str) -> Option<()> {
226 let mut sessions = self.sessions.write().await;
227 let mut processes = self.processes.write().await;
228 let mut channels = self.notification_channels.write().await;
229 let mut history = self.history.write().await;
230
231 sessions.remove(session_id)?;
233
234 if let Some(managed) = processes.remove(session_id) {
236 let _ = managed.process.kill().await;
237 }
238
239 channels.remove(session_id);
241
242 history.remove(session_id);
244
245 Some(())
246 }
247
248 pub async fn get_session_history(&self, session_id: &str) -> Option<Vec<serde_json::Value>> {
251 let history = self.history.read().await;
252 history.get(session_id).cloned()
253 }
254
255 pub async fn push_to_history(&self, session_id: &str, notification: serde_json::Value) {
260 if notification.get("childAgentId").is_some() {
262 return;
263 }
264 let mut history = self.history.write().await;
265 let entries = history.entry(session_id.to_string()).or_default();
266 entries.push(notification);
267 if entries.len() > 500 {
269 let drain_count = entries.len() - 500;
270 entries.drain(0..drain_count);
271 }
272 }
273
274 pub async fn emit_session_update(
276 &self,
277 session_id: &str,
278 update: serde_json::Value,
279 ) -> Result<(), String> {
280 let message = serde_json::json!({
281 "jsonrpc": "2.0",
282 "method": "session/update",
283 "params": {
284 "sessionId": session_id,
285 "update": update,
286 }
287 });
288
289 if let Some(channel) = self
290 .notification_channels
291 .read()
292 .await
293 .get(session_id)
294 .cloned()
295 {
296 let _ = channel.send(message.clone());
297 } else {
298 let params = message
299 .get("params")
300 .cloned()
301 .ok_or_else(|| "Missing params in synthetic session/update".to_string())?;
302 self.push_to_history(
303 session_id,
304 Self::rewrite_notification_session_id(session_id, params),
305 )
306 .await;
307 }
308 Ok(())
309 }
310
311 pub async fn mark_first_prompt_sent(&self, session_id: &str) {
313 let mut sessions = self.sessions.write().await;
314 if let Some(session) = sessions.get_mut(session_id) {
315 session.first_prompt_sent = true;
316 }
317 }
318
319 #[allow(clippy::too_many_arguments)]
325 pub async fn create_session(
326 &self,
327 session_id: String,
328 cwd: String,
329 workspace_id: String,
330 provider: Option<String>,
331 role: Option<String>,
332 model: Option<String>,
333 parent_session_id: Option<String>,
334 tool_mode: Option<String>,
335 mcp_profile: Option<String>,
336 ) -> Result<(String, String), String> {
337 self.create_session_with_options(
338 session_id,
339 cwd,
340 workspace_id,
341 provider,
342 role,
343 model,
344 parent_session_id,
345 tool_mode,
346 mcp_profile,
347 SessionLaunchOptions::default(),
348 )
349 .await
350 }
351
    /// Spawns a preset-based ACP agent and asks it to load an existing
    /// provider-side session, then registers the result with this manager.
    ///
    /// `provider` defaults to `"opencode"`. `provider_session_id` is the id the
    /// agent is asked to load; when `None`, `session_id` is used instead.
    ///
    /// Returns `(session_id, agent_session_id)` on success.
    ///
    /// # Errors
    ///
    /// Fails when `cwd` is not an existing directory, when the provider is
    /// `"claude"` (native load is rejected for it), or when preset resolution,
    /// MCP setup, process spawn, initialization or the agent's load call fails.
    #[allow(clippy::too_many_arguments)]
    pub async fn load_session_with_options(
        &self,
        session_id: String,
        cwd: String,
        workspace_id: String,
        provider: Option<String>,
        role: Option<String>,
        model: Option<String>,
        parent_session_id: Option<String>,
        tool_mode: Option<String>,
        mcp_profile: Option<String>,
        provider_session_id: Option<String>,
        options: SessionLaunchOptions,
    ) -> Result<(String, String), String> {
        validate_session_cwd(&cwd)?;
        let provider_name = provider.as_deref().unwrap_or("opencode");
        // Only codex providers receive MCP servers through the ACP session
        // call; caller-supplied servers win over the generated defaults.
        let acp_mcp_servers = if matches!(provider_name, "codex" | "codex-acp") {
            options.acp_mcp_servers.clone().unwrap_or_else(|| {
                mcp_setup::build_acp_http_mcp_servers(
                    &workspace_id,
                    &session_id,
                    tool_mode.as_deref(),
                    mcp_profile.as_deref(),
                )
            })
        } else {
            Vec::new()
        };

        if provider_name == "claude" {
            return Err("Native session/load is not supported for Claude".to_string());
        }

        // Channel on which the spawned process publishes its notifications.
        let (ntx, _) = broadcast::channel::<serde_json::Value>(256);
        let preset = get_preset_by_id_with_registry(provider_name).await?;

        if let Some(summary) = mcp_setup::ensure_mcp_for_provider(
            provider_name,
            &cwd,
            &workspace_id,
            &session_id,
            tool_mode.as_deref(),
            mcp_profile.as_deref(),
        )
        .await?
        {
            tracing::info!("[AcpManager] {}", summary);
        }

        // Argument order: preset args, codex `-c` overrides, caller-supplied
        // provider args, then `-m <model>` when a non-empty model was given.
        let mut extra_args: Vec<String> = preset.args.clone();
        if matches!(provider_name, "codex" | "codex-acp") {
            for override_arg in mcp_setup::codex_cli_overrides(&cwd)? {
                extra_args.push("-c".to_string());
                extra_args.push(override_arg);
            }
        }
        if let Some(provider_args) = options.provider_args.clone() {
            extra_args.extend(provider_args);
        }
        if let Some(ref m) = model {
            if !m.is_empty() {
                extra_args.push("-m".to_string());
                extra_args.push(m.clone());
            }
        }

        let preset_command = resolve_preset_command(&preset);
        let process = AcpProcess::spawn(
            &preset_command,
            &extra_args.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
            &cwd,
            ntx.clone(),
            &preset.name,
            &session_id,
        )
        .await?;

        process
            .initialize_with_timeout(options.initialize_timeout_ms)
            .await?;

        // Fall back to our own session id when the caller did not name the
        // provider-side session to load.
        let resolved_provider_session_id =
            provider_session_id.unwrap_or_else(|| session_id.clone());
        let acp_session_id = process
            .load_session(&resolved_provider_session_id, &cwd, &acp_mcp_servers)
            .await?;

        self.register_managed_session(
            session_id.clone(),
            cwd.clone(),
            workspace_id.clone(),
            provider_name.to_string(),
            role.clone(),
            model.clone(),
            parent_session_id.clone(),
            &options,
            AgentProcessType::Acp(Arc::new(process)),
            acp_session_id.clone(),
            ntx.clone(),
        )
        .await;

        tracing::info!(
            "[AcpManager] Session {} loaded (provider: {}, agent session: {})",
            session_id,
            provider_name,
            acp_session_id,
        );

        Ok((session_id, acp_session_id))
    }
467
468 #[allow(clippy::too_many_arguments)]
469 pub async fn load_session(
470 &self,
471 session_id: String,
472 cwd: String,
473 workspace_id: String,
474 provider: Option<String>,
475 role: Option<String>,
476 model: Option<String>,
477 parent_session_id: Option<String>,
478 tool_mode: Option<String>,
479 mcp_profile: Option<String>,
480 provider_session_id: Option<String>,
481 ) -> Result<(String, String), String> {
482 self.load_session_with_options(
483 session_id,
484 cwd,
485 workspace_id,
486 provider,
487 role,
488 model,
489 parent_session_id,
490 tool_mode,
491 mcp_profile,
492 provider_session_id,
493 SessionLaunchOptions::default(),
494 )
495 .await
496 }
497
498 fn spawn_history_mirror(&self, session_id: &str, ntx: &broadcast::Sender<serde_json::Value>) {
499 let history_manager = self.clone();
500 let history_session_id = session_id.to_string();
501 let mut history_rx = ntx.subscribe();
502 tokio::spawn(async move {
503 loop {
504 match history_rx.recv().await {
505 Ok(message) => {
506 let params = match message.get("params") {
507 Some(value) => value.clone(),
508 None => continue,
509 };
510 history_manager
511 .push_to_history(
512 &history_session_id,
513 Self::rewrite_notification_session_id(&history_session_id, params),
514 )
515 .await;
516 }
517 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
518 Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
519 tracing::warn!(
520 "[AcpManager] Dropped {} session/update notifications for {}",
521 skipped,
522 history_session_id
523 );
524 }
525 }
526 }
527 });
528 }
529
530 #[allow(clippy::too_many_arguments)]
531 async fn register_managed_session(
532 &self,
533 session_id: String,
534 cwd: String,
535 workspace_id: String,
536 provider_name: String,
537 role: Option<String>,
538 model: Option<String>,
539 parent_session_id: Option<String>,
540 options: &SessionLaunchOptions,
541 process_type: AgentProcessType,
542 acp_session_id: String,
543 ntx: broadcast::Sender<serde_json::Value>,
544 ) {
545 let created_at = chrono::Utc::now().to_rfc3339();
546 let trace_writer = TraceWriter::new(&cwd);
547 let record = AcpSessionRecord {
548 session_id: session_id.clone(),
549 name: None,
550 cwd: cwd.clone(),
551 workspace_id: workspace_id.clone(),
552 routa_agent_id: None,
553 provider: Some(provider_name.clone()),
554 role: role.clone().or(Some("CRAFTER".to_string())),
555 mode_id: None,
556 model: model.clone(),
557 created_at: created_at.clone(),
558 first_prompt_sent: false,
559 parent_session_id: parent_session_id.clone(),
560 specialist_id: options.specialist_id.clone(),
561 specialist_system_prompt: options.specialist_system_prompt.clone(),
562 };
563
564 self.sessions
565 .write()
566 .await
567 .insert(session_id.clone(), record);
568 self.processes.write().await.insert(
569 session_id.clone(),
570 ManagedProcess {
571 process: process_type,
572 acp_session_id: acp_session_id.clone(),
573 preset_id: provider_name.clone(),
574 created_at,
575 trace_writer: trace_writer.clone(),
576 cwd: cwd.clone(),
577 },
578 );
579 self.notification_channels
580 .write()
581 .await
582 .insert(session_id.clone(), ntx.clone());
583 self.spawn_history_mirror(&session_id, &ntx);
584
585 let trace = TraceRecord::new(
586 &session_id,
587 TraceEventType::SessionStart,
588 Contributor::new(&provider_name, None),
589 )
590 .with_workspace_id(&workspace_id)
591 .with_metadata(
592 "role",
593 serde_json::json!(role.as_deref().unwrap_or("CRAFTER")),
594 )
595 .with_metadata("cwd", serde_json::json!(cwd));
596
597 trace_writer.append_safe(&trace).await;
598 }
599
600 #[allow(clippy::too_many_arguments)]
601 pub async fn create_session_from_inline(
602 &self,
603 session_id: String,
604 cwd: String,
605 workspace_id: String,
606 provider_name: String,
607 role: Option<String>,
608 model: Option<String>,
609 parent_session_id: Option<String>,
610 command: String,
611 args: Vec<String>,
612 options: SessionLaunchOptions,
613 ) -> Result<(String, String), String> {
614 validate_session_cwd(&cwd)?;
615 let (ntx, _) = broadcast::channel::<serde_json::Value>(256);
616
617 let process = AcpProcess::spawn(
618 &command,
619 &args.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
620 &cwd,
621 ntx.clone(),
622 &provider_name,
623 &session_id,
624 )
625 .await?;
626
627 process
628 .initialize_with_timeout(options.initialize_timeout_ms)
629 .await?;
630
631 let acp_session_id = process
632 .new_session(&cwd, options.acp_mcp_servers.as_deref().unwrap_or(&[]))
633 .await?;
634 self.register_managed_session(
635 session_id.clone(),
636 cwd.clone(),
637 workspace_id.clone(),
638 provider_name.clone(),
639 role.clone(),
640 model.clone(),
641 parent_session_id.clone(),
642 &options,
643 AgentProcessType::Acp(Arc::new(process)),
644 acp_session_id.clone(),
645 ntx.clone(),
646 )
647 .await;
648
649 tracing::info!(
650 "[AcpManager] Session {} created from inline command (provider: {}, agent session: {})",
651 session_id,
652 provider_name,
653 acp_session_id,
654 );
655
656 Ok((session_id, acp_session_id))
657 }
658
659 #[allow(clippy::too_many_arguments)]
660 pub async fn load_session_from_inline(
661 &self,
662 session_id: String,
663 cwd: String,
664 workspace_id: String,
665 provider_name: String,
666 role: Option<String>,
667 model: Option<String>,
668 parent_session_id: Option<String>,
669 command: String,
670 args: Vec<String>,
671 provider_session_id: Option<String>,
672 options: SessionLaunchOptions,
673 ) -> Result<(String, String), String> {
674 validate_session_cwd(&cwd)?;
675 let (ntx, _) = broadcast::channel::<serde_json::Value>(256);
676
677 let process = AcpProcess::spawn(
678 &command,
679 &args.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
680 &cwd,
681 ntx.clone(),
682 &provider_name,
683 &session_id,
684 )
685 .await?;
686
687 process
688 .initialize_with_timeout(options.initialize_timeout_ms)
689 .await?;
690
691 let resolved_provider_session_id =
692 provider_session_id.unwrap_or_else(|| session_id.clone());
693 let acp_session_id = process
694 .load_session(
695 &resolved_provider_session_id,
696 &cwd,
697 options.acp_mcp_servers.as_deref().unwrap_or(&[]),
698 )
699 .await?;
700
701 self.register_managed_session(
702 session_id.clone(),
703 cwd.clone(),
704 workspace_id.clone(),
705 provider_name.clone(),
706 role.clone(),
707 model.clone(),
708 parent_session_id.clone(),
709 &options,
710 AgentProcessType::Acp(Arc::new(process)),
711 acp_session_id.clone(),
712 ntx.clone(),
713 )
714 .await;
715
716 tracing::info!(
717 "[AcpManager] Session {} loaded from inline command (provider: {}, agent session: {})",
718 session_id,
719 provider_name,
720 acp_session_id,
721 );
722
723 Ok((session_id, acp_session_id))
724 }
725
726 #[allow(clippy::too_many_arguments)]
727 pub async fn create_session_with_options(
728 &self,
729 session_id: String,
730 cwd: String,
731 workspace_id: String,
732 provider: Option<String>,
733 role: Option<String>,
734 model: Option<String>,
735 parent_session_id: Option<String>,
736 tool_mode: Option<String>,
737 mcp_profile: Option<String>,
738 options: SessionLaunchOptions,
739 ) -> Result<(String, String), String> {
740 validate_session_cwd(&cwd)?;
741 let provider_name = provider.as_deref().unwrap_or("opencode");
742 let acp_mcp_servers = if matches!(provider_name, "codex" | "codex-acp") {
743 options.acp_mcp_servers.clone().unwrap_or_else(|| {
744 mcp_setup::build_acp_http_mcp_servers(
745 &workspace_id,
746 &session_id,
747 tool_mode.as_deref(),
748 mcp_profile.as_deref(),
749 )
750 })
751 } else {
752 Vec::new()
753 };
754
755 let (ntx, _) = broadcast::channel::<serde_json::Value>(256);
757 let claude_mcp_config = if provider_name == "claude" {
758 Some(mcp_setup::build_claude_mcp_config(
759 &workspace_id,
760 &session_id,
761 tool_mode.as_deref(),
762 mcp_profile.as_deref(),
763 ))
764 } else {
765 None
766 };
767
768 let (process_type, acp_session_id) = if provider_name == "claude" {
770 let config = ClaudeCodeConfig {
772 command: "claude".to_string(),
773 cwd: cwd.clone(),
774 display_name: format!("Claude-{}", &session_id[..8.min(session_id.len())]),
775 permission_mode: Some("bypassPermissions".to_string()),
776 mcp_configs: claude_mcp_config.into_iter().collect(),
777 append_system_prompt: options.specialist_system_prompt.clone(),
778 allowed_tools: options.allowed_native_tools.clone(),
779 };
780
781 let claude_process = ClaudeCodeProcess::spawn(config, ntx.clone()).await?;
782 let claude_session_id = claude_process
783 .session_id()
784 .await
785 .unwrap_or_else(|| format!("claude-{}", &session_id[..8.min(session_id.len())]));
786
787 (
788 AgentProcessType::Claude(Arc::new(claude_process)),
789 claude_session_id,
790 )
791 } else {
792 let preset = get_preset_by_id_with_registry(provider_name).await?;
794
795 if let Some(summary) = mcp_setup::ensure_mcp_for_provider(
796 provider_name,
797 &cwd,
798 &workspace_id,
799 &session_id,
800 tool_mode.as_deref(),
801 mcp_profile.as_deref(),
802 )
803 .await?
804 {
805 tracing::info!("[AcpManager] {}", summary);
806 }
807
808 let mut extra_args: Vec<String> = preset.args.clone();
810 if matches!(provider_name, "codex" | "codex-acp") {
811 for override_arg in mcp_setup::codex_cli_overrides(&cwd)? {
812 extra_args.push("-c".to_string());
813 extra_args.push(override_arg);
814 }
815 }
816 if let Some(provider_args) = options.provider_args.clone() {
817 extra_args.extend(provider_args);
818 }
819 if let Some(ref m) = model {
820 if !m.is_empty() {
821 extra_args.push("-m".to_string());
823 extra_args.push(m.clone());
824 }
825 }
826
827 let preset_command = resolve_preset_command(&preset);
828 let process = AcpProcess::spawn(
829 &preset_command,
830 &extra_args.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
831 &cwd,
832 ntx.clone(),
833 &preset.name,
834 &session_id,
835 )
836 .await?;
837
838 process
840 .initialize_with_timeout(options.initialize_timeout_ms)
841 .await?;
842
843 let agent_session_id = process.new_session(&cwd, &acp_mcp_servers).await?;
845
846 (AgentProcessType::Acp(Arc::new(process)), agent_session_id)
847 };
848
849 self.register_managed_session(
850 session_id.clone(),
851 cwd.clone(),
852 workspace_id.clone(),
853 provider_name.to_string(),
854 role.clone(),
855 model.clone(),
856 parent_session_id.clone(),
857 &options,
858 process_type,
859 acp_session_id.clone(),
860 ntx.clone(),
861 )
862 .await;
863
864 tracing::info!(
865 "[AcpManager] Session {} created (provider: {}, agent session: {})",
866 session_id,
867 provider_name,
868 acp_session_id,
869 );
870
871 Ok((session_id, acp_session_id))
872 }
873
874 pub async fn prompt(&self, session_id: &str, text: &str) -> Result<serde_json::Value, String> {
876 self.mark_first_prompt_sent(session_id).await;
877
878 let (process, acp_session_id, preset_id, trace_writer) = {
879 let processes = self.processes.read().await;
880 let managed = processes
881 .get(session_id)
882 .ok_or_else(|| format!("No agent process for session: {session_id}"))?;
883 (
884 managed.process.clone(),
885 managed.acp_session_id.clone(),
886 managed.preset_id.clone(),
887 managed.trace_writer.clone(),
888 )
889 };
890
891 let is_alive = match &process {
892 AgentProcessType::Acp(p) => p.is_alive(),
893 AgentProcessType::Claude(p) => p.is_alive(),
894 };
895
896 if !is_alive {
897 return Err(format!("Agent ({preset_id}) process is not running"));
898 }
899
900 let trace = TraceRecord::new(
902 session_id,
903 TraceEventType::UserMessage,
904 Contributor::new(&preset_id, None),
905 )
906 .with_conversation(TraceConversation {
907 turn: None,
908 role: Some("user".to_string()),
909 content_preview: Some(truncate_content(text, 500)),
910 full_content: None,
911 });
912
913 trace_writer.append_safe(&trace).await;
914
915 tracing::info!(
916 target: "routa_acp_prompt",
917 session_id = %session_id,
918 preset_id = %preset_id,
919 acp_session_id = %acp_session_id,
920 prompt_len = text.len(),
921 "acp prompt start"
922 );
923
924 let result = match &process {
925 AgentProcessType::Acp(p) => p.prompt(&acp_session_id, text).await,
926 AgentProcessType::Claude(p) => {
927 let stop_reason = p.prompt(text).await?;
928 Ok(serde_json::json!({ "stopReason": stop_reason }))
929 }
930 };
931
932 match &result {
933 Ok(_) => tracing::info!(
934 target: "routa_acp_prompt",
935 session_id = %session_id,
936 preset_id = %preset_id,
937 "acp prompt success"
938 ),
939 Err(error) => tracing::error!(
940 target: "routa_acp_prompt",
941 session_id = %session_id,
942 preset_id = %preset_id,
943 error = %error,
944 "acp prompt failed"
945 ),
946 }
947
948 result
949 }
950
951 pub async fn cancel(&self, session_id: &str) {
953 let processes = self.processes.read().await;
954 if let Some(managed) = processes.get(session_id) {
955 match &managed.process {
956 AgentProcessType::Acp(p) => p.cancel(&managed.acp_session_id).await,
957 AgentProcessType::Claude(p) => p.cancel().await,
958 }
959 }
960 }
961
962 pub async fn kill_session(&self, session_id: &str) {
964 if let Some(managed) = self.processes.write().await.remove(session_id) {
966 let trace = TraceRecord::new(
968 session_id,
969 TraceEventType::SessionEnd,
970 Contributor::new(&managed.preset_id, None),
971 );
972 managed.trace_writer.append_safe(&trace).await;
973
974 match &managed.process {
975 AgentProcessType::Acp(p) => p.kill().await,
976 AgentProcessType::Claude(p) => p.kill().await,
977 }
978 }
979 self.sessions.write().await.remove(session_id);
981 self.notification_channels.write().await.remove(session_id);
983 }
984
985 pub async fn subscribe(
988 &self,
989 session_id: &str,
990 ) -> Option<broadcast::Receiver<serde_json::Value>> {
991 let channels = self.notification_channels.read().await;
992 channels.get(session_id).map(|tx| tx.subscribe())
993 }
994
995 pub async fn is_alive(&self, session_id: &str) -> bool {
997 let processes = self.processes.read().await;
998 processes
999 .get(session_id)
1000 .map(|m| match &m.process {
1001 AgentProcessType::Acp(p) => p.is_alive(),
1002 AgentProcessType::Claude(p) => p.is_alive(),
1003 })
1004 .unwrap_or(false)
1005 }
1006
1007 pub async fn get_acp_session_id(&self, session_id: &str) -> Option<String> {
1009 let processes = self.processes.read().await;
1010 processes
1011 .get(session_id)
1012 .map(|managed| managed.acp_session_id.clone())
1013 }
1014
1015 pub async fn get_preset_id(&self, session_id: &str) -> Option<String> {
1017 let processes = self.processes.read().await;
1018 processes.get(session_id).map(|m| m.preset_id.clone())
1019 }
1020
1021 pub async fn is_claude_session(&self, session_id: &str) -> bool {
1023 let processes = self.processes.read().await;
1024 processes
1025 .get(session_id)
1026 .map(|m| matches!(&m.process, AgentProcessType::Claude(_)))
1027 .unwrap_or(false)
1028 }
1029
1030 pub async fn prompt_claude_async(&self, session_id: &str, text: &str) -> Result<(), String> {
1034 let processes = self.processes.read().await;
1035 let managed = processes
1036 .get(session_id)
1037 .ok_or_else(|| format!("No agent process for session: {session_id}"))?;
1038
1039 let trace = TraceRecord::new(
1041 session_id,
1042 TraceEventType::UserMessage,
1043 Contributor::new(&managed.preset_id, None),
1044 )
1045 .with_conversation(TraceConversation {
1046 turn: None,
1047 role: Some("user".to_string()),
1048 content_preview: Some(truncate_content(text, 500)),
1049 full_content: Some(text.to_string()),
1050 });
1051
1052 managed.trace_writer.append_safe(&trace).await;
1053
1054 match &managed.process {
1055 AgentProcessType::Claude(p) => {
1056 let process = Arc::clone(p);
1058 let text = text.to_string();
1059 tokio::spawn(async move {
1060 let _ = process.prompt(&text).await;
1061 });
1062 Ok(())
1063 }
1064 AgentProcessType::Acp(_) => {
1065 Err("prompt_claude_async is only for Claude sessions".to_string())
1066 }
1067 }
1068 }
1069}
1070
/// Describes whether and how a provider can resume earlier sessions.
///
/// Serialized with camelCase field names; the optional flags default to
/// `None` when absent from input and are omitted from output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ResumeCapability {
    /// Whether the provider supports resuming at all.
    pub supported: bool,
    /// Resume mode name; the built-in presets use `"replay"` and `"both"`.
    pub mode: String,
    /// Whether sessions can be forked, when known.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub supports_fork: Option<bool>,
    /// Whether sessions can be listed, when known.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub supports_list: Option<bool>,
}
1087
/// Launch recipe for one agent provider.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AcpPreset {
    /// Identifier used to look the preset up.
    pub id: String,
    /// Human-readable name; also passed to `AcpProcess::spawn` as the
    /// process's name.
    pub name: String,
    /// Executable to run (subject to `env_bin_override` and PATH lookup in
    /// `resolve_preset_command`).
    pub command: String,
    /// Arguments always passed to `command`.
    pub args: Vec<String>,
    /// Short description of the agent.
    pub description: String,
    /// Name of an environment variable whose non-blank value replaces
    /// `command`.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub env_bin_override: Option<String>,
    /// Resume support of this provider, when declared.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub resume: Option<ResumeCapability>,
}
1106
1107pub fn get_presets() -> Vec<AcpPreset> {
1109 vec![
1110 AcpPreset {
1111 id: "opencode".to_string(),
1112 name: "OpenCode".to_string(),
1113 command: "opencode".to_string(),
1114 args: vec!["acp".to_string()],
1115 description: "OpenCode AI coding agent".to_string(),
1116 env_bin_override: Some("OPENCODE_BIN".to_string()),
1117 resume: Some(ResumeCapability {
1118 supported: true,
1119 mode: "replay".to_string(),
1120 supports_fork: None,
1121 supports_list: None,
1122 }),
1123 },
1124 AcpPreset {
1125 id: "gemini".to_string(),
1126 name: "Gemini".to_string(),
1127 command: "gemini".to_string(),
1128 args: vec!["--experimental-acp".to_string()],
1129 description: "Google Gemini CLI".to_string(),
1130 env_bin_override: None,
1131 resume: None,
1132 },
1133 AcpPreset {
1134 id: "codex-acp".to_string(),
1135 name: "Codex".to_string(),
1136 command: "codex-acp".to_string(),
1137 args: vec![],
1138 description: "OpenAI Codex CLI (codex-acp wrapper)".to_string(),
1139 env_bin_override: Some("CODEX_ACP_BIN".to_string()),
1140 resume: Some(ResumeCapability {
1141 supported: true,
1142 mode: "both".to_string(),
1143 supports_fork: None,
1144 supports_list: Some(true),
1145 }),
1146 },
1147 AcpPreset {
1148 id: "copilot".to_string(),
1149 name: "GitHub Copilot".to_string(),
1150 command: "copilot".to_string(),
1151 args: vec![
1152 "--acp".to_string(),
1153 "--allow-all-tools".to_string(),
1154 "--no-ask-user".to_string(),
1155 ],
1156 description: "GitHub Copilot CLI".to_string(),
1157 env_bin_override: Some("COPILOT_BIN".to_string()),
1158 resume: None,
1159 },
1160 AcpPreset {
1161 id: "auggie".to_string(),
1162 name: "Auggie".to_string(),
1163 command: "auggie".to_string(),
1164 args: vec!["--acp".to_string()],
1165 description: "Augment Code's AI agent".to_string(),
1166 env_bin_override: None,
1167 resume: None,
1168 },
1169 AcpPreset {
1170 id: "kimi".to_string(),
1171 name: "Kimi".to_string(),
1172 command: "kimi".to_string(),
1173 args: vec!["acp".to_string()],
1174 description: "Moonshot AI's Kimi CLI".to_string(),
1175 env_bin_override: None,
1176 resume: None,
1177 },
1178 AcpPreset {
1179 id: "kiro".to_string(),
1180 name: "Kiro".to_string(),
1181 command: "kiro-cli".to_string(),
1182 args: vec!["acp".to_string()],
1183 description: "Amazon Kiro AI coding agent".to_string(),
1184 env_bin_override: Some("KIRO_BIN".to_string()),
1185 resume: None,
1186 },
1187 AcpPreset {
1188 id: "qoder".to_string(),
1189 name: "Qoder".to_string(),
1190 command: "qodercli".to_string(),
1191 args: vec!["--acp".to_string()],
1192 description: "Qoder AI coding agent".to_string(),
1193 env_bin_override: Some("QODER_BIN".to_string()),
1194 resume: None,
1195 },
1196 AcpPreset {
1197 id: "claude".to_string(),
1198 name: "Claude Code".to_string(),
1199 command: "claude".to_string(),
1200 args: vec![],
1203 description: "Anthropic Claude Code (stream-json protocol)".to_string(),
1204 env_bin_override: Some("CLAUDE_BIN".to_string()),
1205 resume: Some(ResumeCapability {
1206 supported: true,
1207 mode: "replay".to_string(),
1208 supports_fork: Some(true),
1209 supports_list: None,
1210 }),
1211 },
1212 ]
1213}
1214
1215pub fn get_preset_by_id(id: &str) -> Option<AcpPreset> {
1217 let normalized_id = match id {
1218 "codex" => "codex-acp",
1219 "qodercli" => "qoder",
1220 other => other,
1221 };
1222 get_presets().into_iter().find(|p| p.id == normalized_id)
1223}
1224
1225pub fn get_resume_capability(provider: &str) -> Option<ResumeCapability> {
1227 get_preset_by_id(provider).and_then(|p| p.resume)
1228}
1229
1230pub async fn get_preset_by_id_with_registry(id: &str) -> Result<AcpPreset, String> {
1236 let normalized_id = match id {
1237 "codex" => "codex-acp",
1238 "qodercli" => "qoder",
1239 other => other,
1240 };
1241
1242 const REGISTRY_SUFFIX: &str = "-registry";
1245 if let Some(base_id) = normalized_id.strip_suffix(REGISTRY_SUFFIX) {
1246 let mut preset = get_registry_preset(base_id).await?;
1247 preset.id = id.to_string();
1249 return Ok(preset);
1250 }
1251
1252 if let Some(mut preset) = get_presets().into_iter().find(|p| p.id == normalized_id) {
1254 if preset.id != id {
1255 preset.id = id.to_string();
1256 }
1257 return Ok(preset);
1258 }
1259
1260 let mut preset = get_registry_preset(normalized_id).await?;
1262 if preset.id != id {
1263 preset.id = id.to_string();
1264 }
1265 Ok(preset)
1266}
1267
1268async fn get_registry_preset(id: &str) -> Result<AcpPreset, String> {
1270 let registry: AcpRegistry = fetch_registry().await?;
1271
1272 let agent = registry
1274 .agents
1275 .into_iter()
1276 .find(|a| a.id == id)
1277 .ok_or_else(|| format!("Agent '{id}' not found in registry"))?;
1278
1279 let (command, args) = if let Some(ref npx) = agent.distribution.npx {
1281 let mut args = vec!["-y".to_string(), npx.package.clone()];
1282 args.extend(npx.args.clone());
1283 ("npx".to_string(), args)
1284 } else if let Some(ref uvx) = agent.distribution.uvx {
1285 let mut args = vec![uvx.package.clone()];
1286 args.extend(uvx.args.clone());
1287 ("uvx".to_string(), args)
1288 } else {
1289 return Err(format!(
1290 "Agent '{id}' has no supported distribution (npx/uvx)"
1291 ));
1292 };
1293
1294 Ok(AcpPreset {
1295 id: agent.id.clone(),
1296 name: agent.name,
1297 command,
1298 args,
1299 description: agent.description,
1300 env_bin_override: None,
1301 resume: None,
1302 })
1303}
1304
1305fn resolve_preset_command(preset: &AcpPreset) -> String {
1306 if let Some(env_var) = &preset.env_bin_override {
1307 if let Ok(custom_command) = std::env::var(env_var) {
1308 let trimmed = custom_command.trim();
1309 if !trimmed.is_empty() {
1310 return trimmed.to_string();
1311 }
1312 }
1313 }
1314
1315 crate::shell_env::which(&preset.command).unwrap_or_else(|| preset.command.clone())
1316}
1317
/// Shortens `text` to at most `max_len` characters (Unicode scalar values).
///
/// Text that already fits is returned unchanged. When it does not fit and
/// `max_len` is greater than 3, the result is the first `max_len - 3`
/// characters followed by `"..."`; for `max_len` of 3 or less there is no room
/// for an ellipsis, so the first `max_len` characters are returned as-is.
/// Counting is done in `char`s, so multi-byte characters are never split.
fn truncate_content(text: &str, max_len: usize) -> String {
    // No character at index `max_len` means the text has at most `max_len` chars.
    if text.chars().nth(max_len).is_none() {
        return text.to_owned();
    }

    let (keep, suffix) = match max_len.checked_sub(3) {
        Some(budget) if budget > 0 => (budget, "..."),
        _ => (max_len, ""),
    };

    let mut shortened: String = text.chars().take(keep).collect();
    shortened.push_str(suffix);
    shortened
}
1331
#[cfg(test)]
mod tests {
    use super::{
        get_preset_by_id, get_preset_by_id_with_registry, get_presets, truncate_content,
        validate_session_cwd, AcpManager, AcpSessionRecord,
    };
    use std::collections::HashMap;
    use std::fs;
    use std::sync::Arc;
    use tokio::sync::RwLock;

    /// Builds a manager with no sessions, processes, channels, or history so
    /// each test starts from a known-empty state.
    fn empty_manager() -> AcpManager {
        AcpManager {
            sessions: Arc::new(RwLock::new(HashMap::new())),
            processes: Arc::new(RwLock::new(HashMap::new())),
            notification_channels: Arc::new(RwLock::new(HashMap::new())),
            history: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// The static list must contain `codex-acp`, and the `codex` alias must
    /// resolve to it through `get_preset_by_id`.
    #[test]
    fn static_presets_include_codex_acp_for_codex_alias() {
        let presets = get_presets();
        assert!(presets.iter().any(|preset| preset.id == "codex-acp"));

        let aliased = get_preset_by_id("codex").expect("codex alias should resolve");
        assert_eq!(aliased.id, "codex-acp");
    }

    /// The static list must contain the `qoder` preset.
    #[test]
    fn static_presets_include_qoder() {
        let presets = get_presets();
        assert!(presets.iter().any(|preset| preset.id == "qoder"));
    }

    /// The `qodercli` alias resolves to the static qoder preset while the
    /// returned id echoes the alias the caller used.
    #[tokio::test]
    async fn qodercli_alias_resolves_to_qoder_preset() {
        let preset = get_preset_by_id_with_registry("qodercli")
            .await
            .expect("qodercli alias should resolve");
        assert_eq!(preset.id, "qodercli");
        assert_eq!(preset.command, "qodercli");
        assert_eq!(preset.args, vec!["--acp".to_string()]);
    }

    /// Missing paths and plain files are rejected with distinct messages; an
    /// existing directory is accepted.
    #[test]
    fn validate_session_cwd_rejects_missing_or_non_directory_paths() {
        let temp = tempfile::tempdir().expect("tempdir should create");
        let missing = temp.path().join("missing-dir");
        let file_path = temp.path().join("not-a-dir.txt");
        fs::write(&file_path, "content").expect("file should write");

        let missing_error = validate_session_cwd(missing.to_string_lossy().as_ref())
            .expect_err("missing directory should fail");
        assert!(missing_error.contains("directory does not exist"));

        let file_error = validate_session_cwd(file_path.to_string_lossy().as_ref())
            .expect_err("file path should fail");
        assert!(file_error.contains("path is not a directory"));

        validate_session_cwd(temp.path().to_string_lossy().as_ref())
            .expect("existing directory should pass");
    }

    /// Marking the first prompt as sent flips the flag on the in-memory record.
    #[tokio::test]
    async fn mark_first_prompt_sent_updates_live_session_record() {
        let manager = AcpManager::new();
        let session_id = "session-1".to_string();
        manager.sessions.write().await.insert(
            session_id.clone(),
            AcpSessionRecord {
                session_id: session_id.clone(),
                name: None,
                cwd: ".".to_string(),
                workspace_id: "default".to_string(),
                routa_agent_id: None,
                provider: Some("opencode".to_string()),
                role: Some("CRAFTER".to_string()),
                mode_id: None,
                model: None,
                created_at: chrono::Utc::now().to_rfc3339(),
                first_prompt_sent: false,
                parent_session_id: None,
                specialist_id: None,
                specialist_system_prompt: None,
            },
        );

        manager.mark_first_prompt_sent(&session_id).await;

        let session = manager.get_session(&session_id).await.expect("session");
        assert!(session.first_prompt_sent);
    }

    /// A notification carrying `childAgentId` must not be recorded in the
    /// parent session's history.
    #[tokio::test]
    async fn push_to_history_skips_parent_child_forwarding_noise() {
        let manager = empty_manager();

        manager
            .push_to_history(
                "parent",
                serde_json::json!({
                    "sessionId": "parent",
                    "childAgentId": "child-1",
                    "update": { "sessionUpdate": "agent_message", "content": { "type": "text", "text": "delegated" } }
                }),
            )
            .await;

        let history = manager
            .get_session_history("parent")
            .await
            .unwrap_or_default();
        assert!(history.is_empty());
    }

    /// With a registered channel, an emitted update reaches subscribers under
    /// `params.update`.
    #[tokio::test]
    async fn emit_session_update_broadcasts_when_channel_exists() {
        let (tx, mut rx) = tokio::sync::broadcast::channel(8);
        let manager = empty_manager();
        manager
            .notification_channels
            .write()
            .await
            .insert("session-1".to_string(), tx);

        manager
            .emit_session_update(
                "session-1",
                serde_json::json!({
                    "sessionUpdate": "turn_complete",
                    "stopReason": "cancelled"
                }),
            )
            .await
            .expect("emit should succeed");

        let broadcast = rx.recv().await.expect("broadcast event");
        assert_eq!(
            broadcast["params"]["update"]["sessionUpdate"].as_str(),
            Some("turn_complete")
        );
        assert_eq!(
            broadcast["params"]["update"]["stopReason"].as_str(),
            Some("cancelled")
        );
    }

    /// Without a registered channel, an emitted update is still appended to
    /// the session history.
    #[tokio::test]
    async fn emit_session_update_persists_history_without_channel() {
        let manager = empty_manager();

        manager
            .emit_session_update(
                "session-1",
                serde_json::json!({
                    "sessionUpdate": "turn_complete",
                    "stopReason": "cancelled"
                }),
            )
            .await
            .expect("emit should succeed");

        let history = manager
            .get_session_history("session-1")
            .await
            .expect("history should exist");
        assert_eq!(history.len(), 1);
        assert_eq!(
            history[0]["update"]["sessionUpdate"].as_str(),
            Some("turn_complete")
        );
    }

    /// The provider-supplied `sessionId` is replaced by the session id passed
    /// to the rewrite helper.
    #[test]
    fn rewrite_notification_session_id_overrides_provider_session_id() {
        let rewritten = AcpManager::rewrite_notification_session_id(
            "child-session",
            serde_json::json!({
                "sessionId": "provider-session",
                "update": { "sessionUpdate": "agent_message_chunk", "content": { "text": "hi" } }
            }),
        );

        assert_eq!(rewritten["sessionId"].as_str(), Some("child-session"));
    }

    /// Truncation counts characters rather than bytes, so multi-byte text is
    /// cut on character boundaries.
    #[test]
    fn truncate_content_handles_unicode_boundaries() {
        assert_eq!(truncate_content("你好世界ABC", 5), "你好...");
        assert_eq!(truncate_content("你好世界ABC", 3), "你好世");
        assert_eq!(truncate_content("短文本", 10), "短文本");
    }
}