1pub mod binary_manager;
20pub mod claude_code_process;
21pub mod docker;
22pub mod installation_state;
23pub mod mcp_setup;
24pub mod paths;
25pub mod process;
26pub mod provider_adapter;
27pub mod registry_fetch;
28pub mod registry_types;
29pub mod runtime_manager;
30pub mod terminal_manager;
31pub mod warmup;
32
33pub use binary_manager::AcpBinaryManager;
34pub use claude_code_process::{ClaudeCodeConfig, ClaudeCodeProcess};
35pub use installation_state::AcpInstallationState;
36pub use paths::AcpPaths;
37pub use registry_fetch::{fetch_registry, fetch_registry_json};
38pub use registry_types::*;
39pub use runtime_manager::{current_platform, AcpRuntimeManager, RuntimeInfo, RuntimeType};
40pub use warmup::{AcpWarmupService, WarmupState, WarmupStatus};
41
42use std::collections::HashMap;
43use std::sync::Arc;
44
45use serde::{Deserialize, Serialize};
46use tokio::sync::{broadcast, RwLock};
47
48use crate::trace::{Contributor, TraceConversation, TraceEventType, TraceRecord, TraceWriter};
49use process::AcpProcess;
50
/// Numeric value of the Win32 `CREATE_NO_WINDOW` process-creation flag.
///
/// Only compiled on Windows targets.
/// NOTE(review): presumably passed as a creation flag when spawning agent
/// binaries so that no console window pops up — the call sites live outside
/// this file; confirm there.
#[cfg(windows)]
pub(crate) const CREATE_NO_WINDOW: u32 = 0x0800_0000;
53
/// Serializable description of one session tracked by [`AcpManager`].
///
/// Field names are (de)serialized in camelCase. Fields annotated with
/// `skip_serializing_if = "Option::is_none"` are omitted from the output when
/// unset; `provider`, `role` and `mode_id` carry no such attribute and are
/// therefore emitted as `null` when unset.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AcpSessionRecord {
    /// Key under which the session is stored in the manager.
    pub session_id: String,
    /// Optional display name; set through `AcpManager::rename_session`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// Working directory the agent process was launched in.
    pub cwd: String,
    /// Workspace the session belongs to.
    pub workspace_id: String,
    /// Set through `AcpManager::set_routa_agent_id`; `None` at registration.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub routa_agent_id: Option<String>,
    /// Provider / preset id the session was launched with.
    pub provider: Option<String>,
    /// Session role; registration falls back to `"CRAFTER"` when none is given.
    pub role: Option<String>,
    /// Always `None` when registered by the code in this file.
    pub mode_id: Option<String>,
    /// Model identifier requested for the session, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
    /// RFC 3339 UTC timestamp taken when the session was registered.
    pub created_at: String,
    /// Flipped to `true` by `AcpManager::mark_first_prompt_sent`; defaults to
    /// `false` when the key is missing during deserialization.
    #[serde(default)]
    pub first_prompt_sent: bool,
    /// Id of the session that spawned this one, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_session_id: Option<String>,
    /// Copied from [`SessionLaunchOptions::specialist_id`] at registration.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub specialist_id: Option<String>,
    /// Copied from [`SessionLaunchOptions::specialist_system_prompt`] at registration.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub specialist_system_prompt: Option<String>,
}
83
/// Optional knobs applied when launching or loading a session.
///
/// `Default` leaves every option unset.
#[derive(Debug, Clone, Default)]
pub struct SessionLaunchOptions {
    /// Stored on the resulting [`AcpSessionRecord`].
    pub specialist_id: Option<String>,
    /// Stored on the session record; for the Claude provider it is also
    /// forwarded as `append_system_prompt`.
    pub specialist_system_prompt: Option<String>,
    /// Forwarded as `allowed_tools` to the Claude provider; not read by the
    /// ACP launch paths in this file.
    pub allowed_native_tools: Option<Vec<String>>,
    /// Passed to `AcpProcess::initialize_with_timeout`.
    pub initialize_timeout_ms: Option<u64>,
    /// Extra command-line arguments appended after the preset's own arguments
    /// for preset-based ACP providers.
    pub provider_args: Option<Vec<String>>,
}
92
/// The backend process driving a session.
///
/// Cloning is cheap: both variants hold the process behind an `Arc`.
#[derive(Clone)]
enum AgentProcessType {
    /// An agent driven through [`AcpProcess`].
    Acp(Arc<AcpProcess>),
    /// A Claude Code agent driven through [`ClaudeCodeProcess`].
    Claude(Arc<ClaudeCodeProcess>),
}
103
104impl AgentProcessType {
105 async fn kill(&self) {
107 match self {
108 AgentProcessType::Acp(process) => process.kill().await,
109 AgentProcessType::Claude(process) => process.kill().await,
110 }
111 }
112}
113
/// Runtime bookkeeping for one live agent process, keyed by session id in
/// `AcpManager::processes`.
struct ManagedProcess {
    /// Handle to the running agent.
    process: AgentProcessType,
    /// Session id reported by the agent itself (result of `new_session` /
    /// `load_session`, or the Claude process's own session id).
    acp_session_id: String,
    /// Provider name the session was registered with.
    preset_id: String,
    /// RFC 3339 timestamp of registration; currently never read.
    #[allow(dead_code)]
    created_at: String,
    /// Trace sink created from the session's working directory.
    trace_writer: TraceWriter,
    /// Working directory of the session; currently never read.
    #[allow(dead_code)]
    cwd: String,
}
128
/// Registry of agent sessions and their backing processes.
///
/// Every map is keyed by session id and lives behind `Arc<RwLock<..>>`, so
/// clones of the manager share the same underlying state.
#[derive(Clone)]
pub struct AcpManager {
    /// Serializable metadata for each registered session.
    sessions: Arc<RwLock<HashMap<String, AcpSessionRecord>>>,
    /// Live agent process (plus trace writer) for each session.
    processes: Arc<RwLock<HashMap<String, ManagedProcess>>>,
    /// Broadcast sender carrying each session's JSON notifications.
    notification_channels: Arc<RwLock<HashMap<String, broadcast::Sender<serde_json::Value>>>>,
    /// Recorded notification `params` per session (see `push_to_history`).
    history: Arc<RwLock<HashMap<String, Vec<serde_json::Value>>>>,
}
146
/// `Default` simply delegates to [`AcpManager::new`], yielding an empty manager.
impl Default for AcpManager {
    fn default() -> Self {
        Self::new()
    }
}
152
153impl AcpManager {
154 pub fn rewrite_notification_session_id(
155 session_id: &str,
156 mut notification: serde_json::Value,
157 ) -> serde_json::Value {
158 if let Some(object) = notification.as_object_mut() {
159 object.insert(
160 "sessionId".to_string(),
161 serde_json::Value::String(session_id.to_string()),
162 );
163 }
164 notification
165 }
166
167 pub fn new() -> Self {
168 Self {
169 sessions: Arc::new(RwLock::new(HashMap::new())),
170 processes: Arc::new(RwLock::new(HashMap::new())),
171 notification_channels: Arc::new(RwLock::new(HashMap::new())),
172 history: Arc::new(RwLock::new(HashMap::new())),
173 }
174 }
175
176 pub async fn list_sessions(&self) -> Vec<AcpSessionRecord> {
178 let sessions = self.sessions.read().await;
179 sessions.values().cloned().collect()
180 }
181
182 pub async fn get_session(&self, session_id: &str) -> Option<AcpSessionRecord> {
184 let sessions = self.sessions.read().await;
185 sessions.get(session_id).cloned()
186 }
187
188 pub async fn rename_session(&self, session_id: &str, name: &str) -> Option<()> {
191 let mut sessions = self.sessions.write().await;
192 let session = sessions.get_mut(session_id)?;
193 session.name = Some(name.to_string());
194 Some(())
195 }
196
197 pub async fn set_routa_agent_id(&self, session_id: &str, routa_agent_id: &str) -> Option<()> {
200 let mut sessions = self.sessions.write().await;
201 let session = sessions.get_mut(session_id)?;
202 session.routa_agent_id = Some(routa_agent_id.to_string());
203 Some(())
204 }
205
206 pub async fn delete_session(&self, session_id: &str) -> Option<()> {
209 let mut sessions = self.sessions.write().await;
210 let mut processes = self.processes.write().await;
211 let mut channels = self.notification_channels.write().await;
212 let mut history = self.history.write().await;
213
214 sessions.remove(session_id)?;
216
217 if let Some(managed) = processes.remove(session_id) {
219 let _ = managed.process.kill().await;
220 }
221
222 channels.remove(session_id);
224
225 history.remove(session_id);
227
228 Some(())
229 }
230
231 pub async fn get_session_history(&self, session_id: &str) -> Option<Vec<serde_json::Value>> {
234 let history = self.history.read().await;
235 history.get(session_id).cloned()
236 }
237
238 pub async fn push_to_history(&self, session_id: &str, notification: serde_json::Value) {
243 if notification.get("childAgentId").is_some() {
245 return;
246 }
247 let mut history = self.history.write().await;
248 let entries = history
249 .entry(session_id.to_string())
250 .or_insert_with(Vec::new);
251 entries.push(notification);
252 if entries.len() > 500 {
254 let drain_count = entries.len() - 500;
255 entries.drain(0..drain_count);
256 }
257 }
258
259 pub async fn emit_session_update(
261 &self,
262 session_id: &str,
263 update: serde_json::Value,
264 ) -> Result<(), String> {
265 let message = serde_json::json!({
266 "jsonrpc": "2.0",
267 "method": "session/update",
268 "params": {
269 "sessionId": session_id,
270 "update": update,
271 }
272 });
273
274 if let Some(channel) = self
275 .notification_channels
276 .read()
277 .await
278 .get(session_id)
279 .cloned()
280 {
281 let _ = channel.send(message.clone());
282 } else {
283 let params = message
284 .get("params")
285 .cloned()
286 .ok_or_else(|| "Missing params in synthetic session/update".to_string())?;
287 self.push_to_history(
288 session_id,
289 Self::rewrite_notification_session_id(session_id, params),
290 )
291 .await;
292 }
293 Ok(())
294 }
295
296 pub async fn mark_first_prompt_sent(&self, session_id: &str) {
298 let mut sessions = self.sessions.write().await;
299 if let Some(session) = sessions.get_mut(session_id) {
300 session.first_prompt_sent = true;
301 }
302 }
303
304 #[allow(clippy::too_many_arguments)]
310 pub async fn create_session(
311 &self,
312 session_id: String,
313 cwd: String,
314 workspace_id: String,
315 provider: Option<String>,
316 role: Option<String>,
317 model: Option<String>,
318 parent_session_id: Option<String>,
319 tool_mode: Option<String>,
320 mcp_profile: Option<String>,
321 ) -> Result<(String, String), String> {
322 self.create_session_with_options(
323 session_id,
324 cwd,
325 workspace_id,
326 provider,
327 role,
328 model,
329 parent_session_id,
330 tool_mode,
331 mcp_profile,
332 SessionLaunchOptions::default(),
333 )
334 .await
335 }
336
337 #[allow(clippy::too_many_arguments)]
341 pub async fn load_session_with_options(
342 &self,
343 session_id: String,
344 cwd: String,
345 workspace_id: String,
346 provider: Option<String>,
347 role: Option<String>,
348 model: Option<String>,
349 parent_session_id: Option<String>,
350 tool_mode: Option<String>,
351 mcp_profile: Option<String>,
352 provider_session_id: Option<String>,
353 options: SessionLaunchOptions,
354 ) -> Result<(String, String), String> {
355 let provider_name = provider.as_deref().unwrap_or("opencode");
356
357 if provider_name == "claude" {
358 return Err("Native session/load is not supported for Claude".to_string());
359 }
360
361 let (ntx, _) = broadcast::channel::<serde_json::Value>(256);
362 let preset = get_preset_by_id_with_registry(provider_name).await?;
363
364 if let Some(summary) = mcp_setup::ensure_mcp_for_provider(
365 provider_name,
366 &workspace_id,
367 &session_id,
368 tool_mode.as_deref(),
369 mcp_profile.as_deref(),
370 )
371 .await?
372 {
373 tracing::info!("[AcpManager] {}", summary);
374 }
375
376 let mut extra_args: Vec<String> = preset.args.clone();
377 if let Some(provider_args) = options.provider_args.clone() {
378 extra_args.extend(provider_args);
379 }
380 if let Some(ref m) = model {
381 if !m.is_empty() {
382 extra_args.push("-m".to_string());
383 extra_args.push(m.clone());
384 }
385 }
386
387 let preset_command = resolve_preset_command(&preset);
388 let process = AcpProcess::spawn(
389 &preset_command,
390 &extra_args.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
391 &cwd,
392 ntx.clone(),
393 &preset.name,
394 &session_id,
395 )
396 .await?;
397
398 process
399 .initialize_with_timeout(options.initialize_timeout_ms)
400 .await?;
401
402 let resolved_provider_session_id =
403 provider_session_id.unwrap_or_else(|| session_id.clone());
404 let acp_session_id = process
405 .load_session(&resolved_provider_session_id, &cwd)
406 .await?;
407
408 self.register_managed_session(
409 session_id.clone(),
410 cwd.clone(),
411 workspace_id.clone(),
412 provider_name.to_string(),
413 role.clone(),
414 model.clone(),
415 parent_session_id.clone(),
416 &options,
417 AgentProcessType::Acp(Arc::new(process)),
418 acp_session_id.clone(),
419 ntx.clone(),
420 )
421 .await;
422
423 tracing::info!(
424 "[AcpManager] Session {} loaded (provider: {}, agent session: {})",
425 session_id,
426 provider_name,
427 acp_session_id,
428 );
429
430 Ok((session_id, acp_session_id))
431 }
432
433 #[allow(clippy::too_many_arguments)]
434 pub async fn load_session(
435 &self,
436 session_id: String,
437 cwd: String,
438 workspace_id: String,
439 provider: Option<String>,
440 role: Option<String>,
441 model: Option<String>,
442 parent_session_id: Option<String>,
443 tool_mode: Option<String>,
444 mcp_profile: Option<String>,
445 provider_session_id: Option<String>,
446 ) -> Result<(String, String), String> {
447 self.load_session_with_options(
448 session_id,
449 cwd,
450 workspace_id,
451 provider,
452 role,
453 model,
454 parent_session_id,
455 tool_mode,
456 mcp_profile,
457 provider_session_id,
458 SessionLaunchOptions::default(),
459 )
460 .await
461 }
462
463 fn spawn_history_mirror(&self, session_id: &str, ntx: &broadcast::Sender<serde_json::Value>) {
464 let history_manager = self.clone();
465 let history_session_id = session_id.to_string();
466 let mut history_rx = ntx.subscribe();
467 tokio::spawn(async move {
468 loop {
469 match history_rx.recv().await {
470 Ok(message) => {
471 let params = match message.get("params") {
472 Some(value) => value.clone(),
473 None => continue,
474 };
475 history_manager
476 .push_to_history(
477 &history_session_id,
478 Self::rewrite_notification_session_id(&history_session_id, params),
479 )
480 .await;
481 }
482 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
483 Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
484 tracing::warn!(
485 "[AcpManager] Dropped {} session/update notifications for {}",
486 skipped,
487 history_session_id
488 );
489 }
490 }
491 }
492 });
493 }
494
495 #[allow(clippy::too_many_arguments)]
496 async fn register_managed_session(
497 &self,
498 session_id: String,
499 cwd: String,
500 workspace_id: String,
501 provider_name: String,
502 role: Option<String>,
503 model: Option<String>,
504 parent_session_id: Option<String>,
505 options: &SessionLaunchOptions,
506 process_type: AgentProcessType,
507 acp_session_id: String,
508 ntx: broadcast::Sender<serde_json::Value>,
509 ) {
510 let created_at = chrono::Utc::now().to_rfc3339();
511 let trace_writer = TraceWriter::new(&cwd);
512 let record = AcpSessionRecord {
513 session_id: session_id.clone(),
514 name: None,
515 cwd: cwd.clone(),
516 workspace_id: workspace_id.clone(),
517 routa_agent_id: None,
518 provider: Some(provider_name.clone()),
519 role: role.clone().or(Some("CRAFTER".to_string())),
520 mode_id: None,
521 model: model.clone(),
522 created_at: created_at.clone(),
523 first_prompt_sent: false,
524 parent_session_id: parent_session_id.clone(),
525 specialist_id: options.specialist_id.clone(),
526 specialist_system_prompt: options.specialist_system_prompt.clone(),
527 };
528
529 self.sessions
530 .write()
531 .await
532 .insert(session_id.clone(), record);
533 self.processes.write().await.insert(
534 session_id.clone(),
535 ManagedProcess {
536 process: process_type,
537 acp_session_id: acp_session_id.clone(),
538 preset_id: provider_name.clone(),
539 created_at,
540 trace_writer: trace_writer.clone(),
541 cwd: cwd.clone(),
542 },
543 );
544 self.notification_channels
545 .write()
546 .await
547 .insert(session_id.clone(), ntx.clone());
548 self.spawn_history_mirror(&session_id, &ntx);
549
550 let trace = TraceRecord::new(
551 &session_id,
552 TraceEventType::SessionStart,
553 Contributor::new(&provider_name, None),
554 )
555 .with_workspace_id(&workspace_id)
556 .with_metadata(
557 "role",
558 serde_json::json!(role.as_deref().unwrap_or("CRAFTER")),
559 )
560 .with_metadata("cwd", serde_json::json!(cwd));
561
562 trace_writer.append_safe(&trace).await;
563 }
564
565 #[allow(clippy::too_many_arguments)]
566 pub async fn create_session_from_inline(
567 &self,
568 session_id: String,
569 cwd: String,
570 workspace_id: String,
571 provider_name: String,
572 role: Option<String>,
573 model: Option<String>,
574 parent_session_id: Option<String>,
575 command: String,
576 args: Vec<String>,
577 options: SessionLaunchOptions,
578 ) -> Result<(String, String), String> {
579 let (ntx, _) = broadcast::channel::<serde_json::Value>(256);
580
581 let process = AcpProcess::spawn(
582 &command,
583 &args.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
584 &cwd,
585 ntx.clone(),
586 &provider_name,
587 &session_id,
588 )
589 .await?;
590
591 process
592 .initialize_with_timeout(options.initialize_timeout_ms)
593 .await?;
594
595 let acp_session_id = process.new_session(&cwd).await?;
596 self.register_managed_session(
597 session_id.clone(),
598 cwd.clone(),
599 workspace_id.clone(),
600 provider_name.clone(),
601 role.clone(),
602 model.clone(),
603 parent_session_id.clone(),
604 &options,
605 AgentProcessType::Acp(Arc::new(process)),
606 acp_session_id.clone(),
607 ntx.clone(),
608 )
609 .await;
610
611 tracing::info!(
612 "[AcpManager] Session {} created from inline command (provider: {}, agent session: {})",
613 session_id,
614 provider_name,
615 acp_session_id,
616 );
617
618 Ok((session_id, acp_session_id))
619 }
620
621 #[allow(clippy::too_many_arguments)]
622 pub async fn load_session_from_inline(
623 &self,
624 session_id: String,
625 cwd: String,
626 workspace_id: String,
627 provider_name: String,
628 role: Option<String>,
629 model: Option<String>,
630 parent_session_id: Option<String>,
631 command: String,
632 args: Vec<String>,
633 provider_session_id: Option<String>,
634 options: SessionLaunchOptions,
635 ) -> Result<(String, String), String> {
636 let (ntx, _) = broadcast::channel::<serde_json::Value>(256);
637
638 let process = AcpProcess::spawn(
639 &command,
640 &args.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
641 &cwd,
642 ntx.clone(),
643 &provider_name,
644 &session_id,
645 )
646 .await?;
647
648 process
649 .initialize_with_timeout(options.initialize_timeout_ms)
650 .await?;
651
652 let resolved_provider_session_id =
653 provider_session_id.unwrap_or_else(|| session_id.clone());
654 let acp_session_id = process
655 .load_session(&resolved_provider_session_id, &cwd)
656 .await?;
657
658 self.register_managed_session(
659 session_id.clone(),
660 cwd.clone(),
661 workspace_id.clone(),
662 provider_name.clone(),
663 role.clone(),
664 model.clone(),
665 parent_session_id.clone(),
666 &options,
667 AgentProcessType::Acp(Arc::new(process)),
668 acp_session_id.clone(),
669 ntx.clone(),
670 )
671 .await;
672
673 tracing::info!(
674 "[AcpManager] Session {} loaded from inline command (provider: {}, agent session: {})",
675 session_id,
676 provider_name,
677 acp_session_id,
678 );
679
680 Ok((session_id, acp_session_id))
681 }
682
683 #[allow(clippy::too_many_arguments)]
684 pub async fn create_session_with_options(
685 &self,
686 session_id: String,
687 cwd: String,
688 workspace_id: String,
689 provider: Option<String>,
690 role: Option<String>,
691 model: Option<String>,
692 parent_session_id: Option<String>,
693 tool_mode: Option<String>,
694 mcp_profile: Option<String>,
695 options: SessionLaunchOptions,
696 ) -> Result<(String, String), String> {
697 let provider_name = provider.as_deref().unwrap_or("opencode");
698
699 let (ntx, _) = broadcast::channel::<serde_json::Value>(256);
701 let claude_mcp_config = if provider_name == "claude" {
702 Some(mcp_setup::build_claude_mcp_config(
703 &workspace_id,
704 &session_id,
705 tool_mode.as_deref(),
706 mcp_profile.as_deref(),
707 ))
708 } else {
709 None
710 };
711
712 let (process_type, acp_session_id) = if provider_name == "claude" {
714 let config = ClaudeCodeConfig {
716 command: "claude".to_string(),
717 cwd: cwd.clone(),
718 display_name: format!("Claude-{}", &session_id[..8.min(session_id.len())]),
719 permission_mode: Some("bypassPermissions".to_string()),
720 mcp_configs: claude_mcp_config.into_iter().collect(),
721 append_system_prompt: options.specialist_system_prompt.clone(),
722 allowed_tools: options.allowed_native_tools.clone(),
723 };
724
725 let claude_process = ClaudeCodeProcess::spawn(config, ntx.clone()).await?;
726 let claude_session_id = claude_process
727 .session_id()
728 .await
729 .unwrap_or_else(|| format!("claude-{}", &session_id[..8.min(session_id.len())]));
730
731 (
732 AgentProcessType::Claude(Arc::new(claude_process)),
733 claude_session_id,
734 )
735 } else {
736 let preset = get_preset_by_id_with_registry(provider_name).await?;
738
739 if let Some(summary) = mcp_setup::ensure_mcp_for_provider(
740 provider_name,
741 &workspace_id,
742 &session_id,
743 tool_mode.as_deref(),
744 mcp_profile.as_deref(),
745 )
746 .await?
747 {
748 tracing::info!("[AcpManager] {}", summary);
749 }
750
751 let mut extra_args: Vec<String> = preset.args.clone();
753 if let Some(provider_args) = options.provider_args.clone() {
754 extra_args.extend(provider_args);
755 }
756 if let Some(ref m) = model {
757 if !m.is_empty() {
758 extra_args.push("-m".to_string());
760 extra_args.push(m.clone());
761 }
762 }
763
764 let preset_command = resolve_preset_command(&preset);
765 let process = AcpProcess::spawn(
766 &preset_command,
767 &extra_args.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
768 &cwd,
769 ntx.clone(),
770 &preset.name,
771 &session_id,
772 )
773 .await?;
774
775 process
777 .initialize_with_timeout(options.initialize_timeout_ms)
778 .await?;
779
780 let agent_session_id = process.new_session(&cwd).await?;
782
783 (AgentProcessType::Acp(Arc::new(process)), agent_session_id)
784 };
785
786 self.register_managed_session(
787 session_id.clone(),
788 cwd.clone(),
789 workspace_id.clone(),
790 provider_name.to_string(),
791 role.clone(),
792 model.clone(),
793 parent_session_id.clone(),
794 &options,
795 process_type,
796 acp_session_id.clone(),
797 ntx.clone(),
798 )
799 .await;
800
801 tracing::info!(
802 "[AcpManager] Session {} created (provider: {}, agent session: {})",
803 session_id,
804 provider_name,
805 acp_session_id,
806 );
807
808 Ok((session_id, acp_session_id))
809 }
810
811 pub async fn prompt(&self, session_id: &str, text: &str) -> Result<serde_json::Value, String> {
813 self.mark_first_prompt_sent(session_id).await;
814
815 let (process, acp_session_id, preset_id, trace_writer) = {
816 let processes = self.processes.read().await;
817 let managed = processes
818 .get(session_id)
819 .ok_or_else(|| format!("No agent process for session: {}", session_id))?;
820 (
821 managed.process.clone(),
822 managed.acp_session_id.clone(),
823 managed.preset_id.clone(),
824 managed.trace_writer.clone(),
825 )
826 };
827
828 let is_alive = match &process {
829 AgentProcessType::Acp(p) => p.is_alive(),
830 AgentProcessType::Claude(p) => p.is_alive(),
831 };
832
833 if !is_alive {
834 return Err(format!("Agent ({}) process is not running", preset_id));
835 }
836
837 let trace = TraceRecord::new(
839 session_id,
840 TraceEventType::UserMessage,
841 Contributor::new(&preset_id, None),
842 )
843 .with_conversation(TraceConversation {
844 turn: None,
845 role: Some("user".to_string()),
846 content_preview: Some(truncate_content(text, 500)),
847 full_content: None,
848 });
849
850 trace_writer.append_safe(&trace).await;
851
852 tracing::info!(
853 target: "routa_acp_prompt",
854 session_id = %session_id,
855 preset_id = %preset_id,
856 acp_session_id = %acp_session_id,
857 prompt_len = text.len(),
858 "acp prompt start"
859 );
860
861 let result = match &process {
862 AgentProcessType::Acp(p) => p.prompt(&acp_session_id, text).await,
863 AgentProcessType::Claude(p) => {
864 let stop_reason = p.prompt(text).await?;
865 Ok(serde_json::json!({ "stopReason": stop_reason }))
866 }
867 };
868
869 match &result {
870 Ok(_) => tracing::info!(
871 target: "routa_acp_prompt",
872 session_id = %session_id,
873 preset_id = %preset_id,
874 "acp prompt success"
875 ),
876 Err(error) => tracing::error!(
877 target: "routa_acp_prompt",
878 session_id = %session_id,
879 preset_id = %preset_id,
880 error = %error,
881 "acp prompt failed"
882 ),
883 }
884
885 result
886 }
887
888 pub async fn cancel(&self, session_id: &str) {
890 let processes = self.processes.read().await;
891 if let Some(managed) = processes.get(session_id) {
892 match &managed.process {
893 AgentProcessType::Acp(p) => p.cancel(&managed.acp_session_id).await,
894 AgentProcessType::Claude(p) => p.cancel().await,
895 }
896 }
897 }
898
899 pub async fn kill_session(&self, session_id: &str) {
901 if let Some(managed) = self.processes.write().await.remove(session_id) {
903 let trace = TraceRecord::new(
905 session_id,
906 TraceEventType::SessionEnd,
907 Contributor::new(&managed.preset_id, None),
908 );
909 managed.trace_writer.append_safe(&trace).await;
910
911 match &managed.process {
912 AgentProcessType::Acp(p) => p.kill().await,
913 AgentProcessType::Claude(p) => p.kill().await,
914 }
915 }
916 self.sessions.write().await.remove(session_id);
918 self.notification_channels.write().await.remove(session_id);
920 }
921
922 pub async fn subscribe(
925 &self,
926 session_id: &str,
927 ) -> Option<broadcast::Receiver<serde_json::Value>> {
928 let channels = self.notification_channels.read().await;
929 channels.get(session_id).map(|tx| tx.subscribe())
930 }
931
932 pub async fn is_alive(&self, session_id: &str) -> bool {
934 let processes = self.processes.read().await;
935 processes
936 .get(session_id)
937 .map(|m| match &m.process {
938 AgentProcessType::Acp(p) => p.is_alive(),
939 AgentProcessType::Claude(p) => p.is_alive(),
940 })
941 .unwrap_or(false)
942 }
943
944 pub async fn get_acp_session_id(&self, session_id: &str) -> Option<String> {
946 let processes = self.processes.read().await;
947 processes
948 .get(session_id)
949 .map(|managed| managed.acp_session_id.clone())
950 }
951
952 pub async fn get_preset_id(&self, session_id: &str) -> Option<String> {
954 let processes = self.processes.read().await;
955 processes.get(session_id).map(|m| m.preset_id.clone())
956 }
957
958 pub async fn is_claude_session(&self, session_id: &str) -> bool {
960 let processes = self.processes.read().await;
961 processes
962 .get(session_id)
963 .map(|m| matches!(&m.process, AgentProcessType::Claude(_)))
964 .unwrap_or(false)
965 }
966
967 pub async fn prompt_claude_async(&self, session_id: &str, text: &str) -> Result<(), String> {
971 let processes = self.processes.read().await;
972 let managed = processes
973 .get(session_id)
974 .ok_or_else(|| format!("No agent process for session: {}", session_id))?;
975
976 let trace = TraceRecord::new(
978 session_id,
979 TraceEventType::UserMessage,
980 Contributor::new(&managed.preset_id, None),
981 )
982 .with_conversation(TraceConversation {
983 turn: None,
984 role: Some("user".to_string()),
985 content_preview: Some(truncate_content(text, 500)),
986 full_content: Some(text.to_string()),
987 });
988
989 managed.trace_writer.append_safe(&trace).await;
990
991 match &managed.process {
992 AgentProcessType::Claude(p) => {
993 let process = Arc::clone(p);
995 let text = text.to_string();
996 tokio::spawn(async move {
997 let _ = process.prompt(&text).await;
998 });
999 Ok(())
1000 }
1001 AgentProcessType::Acp(_) => {
1002 Err("prompt_claude_async is only for Claude sessions".to_string())
1003 }
1004 }
1005 }
1006}
1007
/// Describes whether and how a provider can resume earlier sessions.
///
/// (De)serialized with camelCase field names.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ResumeCapability {
    /// Whether resuming is supported at all.
    pub supported: bool,
    /// Resume mode; the built-in presets in this file use `"replay"` and `"both"`.
    pub mode: String,
    /// Optional fork-support flag; omitted from the output when `None` and
    /// defaulted to `None` when absent from the input.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub supports_fork: Option<bool>,
    /// Optional list-support flag; omitted from the output when `None` and
    /// defaulted to `None` when absent from the input.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub supports_list: Option<bool>,
}
1024
/// Launch recipe for one agent provider.
///
/// Unlike the other serializable types in this file, field names are
/// (de)serialized as written (no `rename_all`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AcpPreset {
    /// Identifier used to look the preset up.
    pub id: String,
    /// Human-readable provider name; also passed to `AcpProcess::spawn`.
    pub name: String,
    /// Executable to run (may be overridden, see `env_bin_override`).
    pub command: String,
    /// Arguments always passed to `command`.
    pub args: Vec<String>,
    /// Short human-readable description.
    pub description: String,
    /// Name of an environment variable whose non-blank value replaces
    /// `command` when the preset is resolved.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub env_bin_override: Option<String>,
    /// Session-resume capabilities, when the provider declares any.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub resume: Option<ResumeCapability>,
}
1043
1044pub fn get_presets() -> Vec<AcpPreset> {
1046 vec![
1047 AcpPreset {
1048 id: "opencode".to_string(),
1049 name: "OpenCode".to_string(),
1050 command: "opencode".to_string(),
1051 args: vec!["acp".to_string()],
1052 description: "OpenCode AI coding agent".to_string(),
1053 env_bin_override: Some("OPENCODE_BIN".to_string()),
1054 resume: Some(ResumeCapability { supported: true, mode: "replay".to_string(), supports_fork: None, supports_list: None }),
1055 },
1056 AcpPreset {
1057 id: "gemini".to_string(),
1058 name: "Gemini".to_string(),
1059 command: "gemini".to_string(),
1060 args: vec!["--experimental-acp".to_string()],
1061 description: "Google Gemini CLI".to_string(),
1062 env_bin_override: None,
1063 resume: None,
1064 },
1065 AcpPreset {
1066 id: "codex-acp".to_string(),
1067 name: "Codex".to_string(),
1068 command: "codex-acp".to_string(),
1069 args: vec![],
1070 description: "OpenAI Codex CLI (codex-acp wrapper)".to_string(),
1071 env_bin_override: Some("CODEX_ACP_BIN".to_string()),
1072 resume: Some(ResumeCapability { supported: true, mode: "both".to_string(), supports_fork: None, supports_list: Some(true) }),
1073 },
1074 AcpPreset {
1075 id: "copilot".to_string(),
1076 name: "GitHub Copilot".to_string(),
1077 command: "copilot".to_string(),
1078 args: vec![
1079 "--acp".to_string(),
1080 "--allow-all-tools".to_string(),
1081 "--no-ask-user".to_string(),
1082 ],
1083 description: "GitHub Copilot CLI".to_string(),
1084 env_bin_override: Some("COPILOT_BIN".to_string()),
1085 resume: None,
1086 },
1087 AcpPreset {
1088 id: "auggie".to_string(),
1089 name: "Auggie".to_string(),
1090 command: "auggie".to_string(),
1091 args: vec!["--acp".to_string()],
1092 description: "Augment Code's AI agent".to_string(),
1093 env_bin_override: None,
1094 resume: None,
1095 },
1096 AcpPreset {
1097 id: "kimi".to_string(),
1098 name: "Kimi".to_string(),
1099 command: "kimi".to_string(),
1100 args: vec!["acp".to_string()],
1101 description: "Moonshot AI's Kimi CLI".to_string(),
1102 env_bin_override: None,
1103 resume: None,
1104 },
1105 AcpPreset {
1106 id: "kiro".to_string(),
1107 name: "Kiro".to_string(),
1108 command: "kiro-cli".to_string(),
1109 args: vec!["acp".to_string()],
1110 description: "Amazon Kiro AI coding agent".to_string(),
1111 env_bin_override: Some("KIRO_BIN".to_string()),
1112 resume: None,
1113 },
1114 AcpPreset {
1115 id: "qoder".to_string(),
1116 name: "Qoder".to_string(),
1117 command: "qodercli".to_string(),
1118 args: vec!["--acp".to_string()],
1119 description: "Qoder AI coding agent".to_string(),
1120 env_bin_override: Some("QODER_BIN".to_string()),
1121 resume: None,
1122 },
1123 AcpPreset {
1124 id: "claude".to_string(),
1125 name: "Claude Code".to_string(),
1126 command: "claude".to_string(),
1127 args: vec![],
1130 description: "Anthropic Claude Code (stream-json protocol)".to_string(),
1131 env_bin_override: Some("CLAUDE_BIN".to_string()),
1132 resume: Some(ResumeCapability { supported: true, mode: "replay".to_string(), supports_fork: Some(true), supports_list: None }),
1133 },
1134 ]
1135}
1136
1137pub fn get_preset_by_id(id: &str) -> Option<AcpPreset> {
1139 let normalized_id = match id {
1140 "codex" => "codex-acp",
1141 "qodercli" => "qoder",
1142 other => other,
1143 };
1144 get_presets().into_iter().find(|p| p.id == normalized_id)
1145}
1146
1147pub fn get_resume_capability(provider: &str) -> Option<ResumeCapability> {
1149 get_preset_by_id(provider).and_then(|p| p.resume)
1150}
1151
1152pub async fn get_preset_by_id_with_registry(id: &str) -> Result<AcpPreset, String> {
1158 let normalized_id = match id {
1159 "codex" => "codex-acp",
1160 "qodercli" => "qoder",
1161 other => other,
1162 };
1163
1164 const REGISTRY_SUFFIX: &str = "-registry";
1167 if let Some(base_id) = normalized_id.strip_suffix(REGISTRY_SUFFIX) {
1168 let mut preset = get_registry_preset(base_id).await?;
1169 preset.id = id.to_string();
1171 return Ok(preset);
1172 }
1173
1174 if let Some(mut preset) = get_presets().into_iter().find(|p| p.id == normalized_id) {
1176 if preset.id != id {
1177 preset.id = id.to_string();
1178 }
1179 return Ok(preset);
1180 }
1181
1182 let mut preset = get_registry_preset(normalized_id).await?;
1184 if preset.id != id {
1185 preset.id = id.to_string();
1186 }
1187 Ok(preset)
1188}
1189
1190async fn get_registry_preset(id: &str) -> Result<AcpPreset, String> {
1192 let registry: AcpRegistry = fetch_registry().await?;
1193
1194 let agent = registry
1196 .agents
1197 .into_iter()
1198 .find(|a| a.id == id)
1199 .ok_or_else(|| format!("Agent '{}' not found in registry", id))?;
1200
1201 let (command, args) = if let Some(ref npx) = agent.distribution.npx {
1203 let mut args = vec!["-y".to_string(), npx.package.clone()];
1204 args.extend(npx.args.clone());
1205 ("npx".to_string(), args)
1206 } else if let Some(ref uvx) = agent.distribution.uvx {
1207 let mut args = vec![uvx.package.clone()];
1208 args.extend(uvx.args.clone());
1209 ("uvx".to_string(), args)
1210 } else {
1211 return Err(format!(
1212 "Agent '{}' has no supported distribution (npx/uvx)",
1213 id
1214 ));
1215 };
1216
1217 Ok(AcpPreset {
1218 id: agent.id.clone(),
1219 name: agent.name,
1220 command,
1221 args,
1222 description: agent.description,
1223 env_bin_override: None,
1224 resume: None,
1225 })
1226}
1227
1228fn resolve_preset_command(preset: &AcpPreset) -> String {
1229 if let Some(env_var) = &preset.env_bin_override {
1230 if let Ok(custom_command) = std::env::var(env_var) {
1231 let trimmed = custom_command.trim();
1232 if !trimmed.is_empty() {
1233 return trimmed.to_string();
1234 }
1235 }
1236 }
1237
1238 crate::shell_env::which(&preset.command).unwrap_or_else(|| preset.command.clone())
1239}
1240
/// Shortens `text` to at most `max_len` characters (Unicode scalar values).
///
/// * Text that already fits is returned unchanged.
/// * With `max_len <= 3` the text is cut hard to `max_len` characters.
/// * Otherwise the first `max_len - 3` characters are kept and `...` is
///   appended, so the result is exactly `max_len` characters long.
///
/// Only the prefix up to the cut point is scanned, so the cost does not grow
/// with the length of an oversized input.
fn truncate_content(text: &str, max_len: usize) -> String {
    // `nth(max_len)` yields a character only when more than `max_len` exist.
    if text.char_indices().nth(max_len).is_none() {
        return text.to_string();
    }

    let ellipsis = max_len > 3;
    let keep = if ellipsis { max_len - 3 } else { max_len };

    // Byte offset of the first dropped character; always a char boundary.
    let cut = text
        .char_indices()
        .nth(keep)
        .map_or(text.len(), |(offset, _)| offset);

    if ellipsis {
        format!("{}...", &text[..cut])
    } else {
        text[..cut].to_string()
    }
}
1254
#[cfg(test)]
mod tests {
    use super::{
        get_preset_by_id_with_registry, get_presets, truncate_content, AcpManager, AcpSessionRecord,
    };
    use std::collections::HashMap;
    use std::sync::Arc;
    use tokio::sync::RwLock;

    /// Builds a manager whose session, process, channel and history maps all start empty.
    fn empty_manager() -> AcpManager {
        AcpManager {
            sessions: Arc::new(RwLock::new(HashMap::new())),
            processes: Arc::new(RwLock::new(HashMap::new())),
            notification_channels: Arc::new(RwLock::new(HashMap::new())),
            history: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Reports whether the static preset list contains an entry with the given id.
    fn has_static_preset(id: &str) -> bool {
        get_presets().iter().any(|preset| preset.id == id)
    }

    #[test]
    fn static_presets_include_codex_acp_for_codex_alias() {
        assert!(has_static_preset("codex-acp"));
    }

    #[test]
    fn static_presets_include_qoder() {
        assert!(has_static_preset("qoder"));
    }

    /// The `qodercli` alias must resolve, and the returned preset keeps the requested id.
    #[tokio::test]
    async fn qodercli_alias_resolves_to_qoder_preset() {
        let resolved = get_preset_by_id_with_registry("qodercli")
            .await
            .expect("qodercli alias should resolve");

        assert_eq!(resolved.id, "qodercli");
        assert_eq!(resolved.command, "qodercli");
        assert_eq!(resolved.args, vec!["--acp".to_string()]);
    }

    /// Marking the first prompt as sent must be visible on the stored session record.
    #[tokio::test]
    async fn mark_first_prompt_sent_updates_live_session_record() {
        let manager = AcpManager::new();
        let session_id = String::from("session-1");
        let record = AcpSessionRecord {
            session_id: session_id.clone(),
            name: None,
            cwd: ".".to_string(),
            workspace_id: "default".to_string(),
            routa_agent_id: None,
            provider: Some("opencode".to_string()),
            role: Some("CRAFTER".to_string()),
            mode_id: None,
            model: None,
            created_at: chrono::Utc::now().to_rfc3339(),
            first_prompt_sent: false,
            parent_session_id: None,
            specialist_id: None,
            specialist_system_prompt: None,
        };
        manager
            .sessions
            .write()
            .await
            .insert(session_id.clone(), record);

        manager.mark_first_prompt_sent(&session_id).await;

        let stored = manager.get_session(&session_id).await.expect("session");
        assert!(stored.first_prompt_sent);
    }

    /// Notifications carrying a `childAgentId` must not end up in the parent's history.
    #[tokio::test]
    async fn push_to_history_skips_parent_child_forwarding_noise() {
        let manager = empty_manager();
        let forwarded = serde_json::json!({
            "sessionId": "parent",
            "childAgentId": "child-1",
            "update": { "sessionUpdate": "agent_message", "content": { "type": "text", "text": "delegated" } }
        });

        manager.push_to_history("parent", forwarded).await;

        let recorded = manager
            .get_session_history("parent")
            .await
            .unwrap_or_default();
        assert!(recorded.is_empty());
    }

    /// With a registered channel, an emitted update is delivered to subscribers.
    #[tokio::test]
    async fn emit_session_update_broadcasts_when_channel_exists() {
        let (tx, mut rx) = tokio::sync::broadcast::channel(8);
        let manager = empty_manager();
        manager
            .notification_channels
            .write()
            .await
            .insert("session-1".to_string(), tx);

        let update = serde_json::json!({
            "sessionUpdate": "turn_complete",
            "stopReason": "cancelled"
        });
        manager
            .emit_session_update("session-1", update)
            .await
            .expect("emit should succeed");

        let event = rx.recv().await.expect("broadcast event");
        let delivered = &event["params"]["update"];
        assert_eq!(delivered["sessionUpdate"].as_str(), Some("turn_complete"));
        assert_eq!(delivered["stopReason"].as_str(), Some("cancelled"));
    }

    /// Without any channel, an emitted update is still recorded in the session history.
    #[tokio::test]
    async fn emit_session_update_persists_history_without_channel() {
        let manager = empty_manager();
        let update = serde_json::json!({
            "sessionUpdate": "turn_complete",
            "stopReason": "cancelled"
        });

        manager
            .emit_session_update("session-1", update)
            .await
            .expect("emit should succeed");

        let recorded = manager
            .get_session_history("session-1")
            .await
            .expect("history should exist");
        assert_eq!(recorded.len(), 1);
        assert_eq!(
            recorded[0]["update"]["sessionUpdate"].as_str(),
            Some("turn_complete")
        );
    }

    /// The session id supplied by the caller replaces the one reported by the provider.
    #[test]
    fn rewrite_notification_session_id_overrides_provider_session_id() {
        let provider_notification = serde_json::json!({
            "sessionId": "provider-session",
            "update": { "sessionUpdate": "agent_message_chunk", "content": { "text": "hi" } }
        });

        let rewritten =
            AcpManager::rewrite_notification_session_id("child-session", provider_notification);

        assert_eq!(rewritten["sessionId"].as_str(), Some("child-session"));
    }

    /// Truncation counts characters, so multi-byte text is never split mid-character.
    #[test]
    fn truncate_content_handles_unicode_boundaries() {
        let cases = [
            ("你好世界ABC", 5, "你好..."),
            ("你好世界ABC", 3, "你好世"),
            ("短文本", 10, "短文本"),
        ];

        for &(input, max_len, expected) in cases.iter() {
            assert_eq!(truncate_content(input, max_len), expected);
        }
    }
}