Skip to main content

routa_core/acp/
mod.rs

1//! ACP (Agent Client Protocol) integration.
2//!
3//! Manages ACP agent processes and provides JSON-RPC communication
4//! between the desktop client and coding agents (e.g. OpenCode, Claude, Copilot).
5//!
6//! Architecture (matches the Next.js `AcpProcessManager`):
7//!   - `session/new`    → spawns a child process, sends `initialize` + `session/new`
8//!   - `session/prompt` → reuses the live process, sends `session/prompt`
9//!   - `session/cancel` → sends cancellation notification
10//!   - SSE GET          → subscribes to `broadcast` channel for `session/update` events
11//!
12//! **Claude Code** uses a different protocol (stream-json) instead of ACP.
13//! The `ClaudeCodeProcess` translates Claude's output into ACP-compatible
14//! `session/update` notifications for frontend compatibility.
15//!
16//! **Agent Trace**: All sessions record trace events to JSONL files for
17//! attribution tracking (which model/session/tool affected which files and when).
18
19pub mod binary_manager;
20pub mod claude_code_process;
21pub mod docker;
22pub mod installation_state;
23pub mod mcp_setup;
24pub mod paths;
25pub mod process;
26pub mod provider_adapter;
27pub mod registry_fetch;
28pub mod registry_types;
29pub mod runtime_manager;
30pub mod terminal_manager;
31pub mod warmup;
32
33pub use binary_manager::AcpBinaryManager;
34pub use claude_code_process::{ClaudeCodeConfig, ClaudeCodeProcess};
35pub use installation_state::AcpInstallationState;
36pub use paths::AcpPaths;
37pub use registry_fetch::{fetch_registry, fetch_registry_json};
38pub use registry_types::*;
39pub use runtime_manager::{current_platform, AcpRuntimeManager, RuntimeInfo, RuntimeType};
40pub use warmup::{AcpWarmupService, WarmupState, WarmupStatus};
41
42use std::collections::HashMap;
43use std::sync::Arc;
44
45use serde::{Deserialize, Serialize};
46use tokio::sync::{broadcast, RwLock};
47
48use crate::trace::{Contributor, TraceConversation, TraceEventType, TraceRecord, TraceWriter};
49use process::AcpProcess;
50
51// ─── Session Record ─────────────────────────────────────────────────────
52
53/// Record of an active ACP session persisted for UI listing.
54#[derive(Debug, Clone, Serialize, Deserialize)]
55#[serde(rename_all = "camelCase")]
56pub struct AcpSessionRecord {
57    pub session_id: String,
58    #[serde(skip_serializing_if = "Option::is_none")]
59    pub name: Option<String>,
60    pub cwd: String,
61    pub workspace_id: String,
62    #[serde(skip_serializing_if = "Option::is_none")]
63    pub routa_agent_id: Option<String>,
64    pub provider: Option<String>,
65    pub role: Option<String>,
66    pub mode_id: Option<String>,
67    #[serde(skip_serializing_if = "Option::is_none")]
68    pub model: Option<String>,
69    pub created_at: String,
70    #[serde(default)]
71    pub first_prompt_sent: bool,
72    /// Parent session ID for CRAFTER child sessions
73    #[serde(skip_serializing_if = "Option::is_none")]
74    pub parent_session_id: Option<String>,
75    #[serde(skip_serializing_if = "Option::is_none")]
76    pub specialist_id: Option<String>,
77    #[serde(skip_serializing_if = "Option::is_none")]
78    pub specialist_system_prompt: Option<String>,
79}
80
/// Optional knobs for `AcpManager::create_session_with_options`.
///
/// `Default` yields every field as `None`, which is what `AcpManager::create_session` passes.
#[derive(Clone, Debug, Default)]
pub struct SessionLaunchOptions {
    /// Stored on the resulting [`AcpSessionRecord`].
    pub specialist_id: Option<String>,
    /// Stored on the session record; for Claude sessions it is also handed to the process as
    /// `append_system_prompt`.
    pub specialist_system_prompt: Option<String>,
    /// For Claude sessions, handed to the process as `allowed_tools`.
    pub allowed_native_tools: Option<Vec<String>>,
    /// For ACP sessions, passed to `AcpProcess::initialize_with_timeout`.
    pub initialize_timeout_ms: Option<u64>,
}
88
89// ─── Managed Process ────────────────────────────────────────────────────
90
91/// Process type enum to support both ACP and Claude stream-json protocols.
92#[derive(Clone)]
93enum AgentProcessType {
94    /// Standard ACP protocol (opencode, gemini, copilot, etc.)
95    Acp(Arc<AcpProcess>),
96    /// Claude Code stream-json protocol
97    Claude(Arc<ClaudeCodeProcess>),
98}
99
100impl AgentProcessType {
101    /// Kill the underlying process.
102    async fn kill(&self) {
103        match self {
104            AgentProcessType::Acp(process) => process.kill().await,
105            AgentProcessType::Claude(process) => process.kill().await,
106        }
107    }
108}
109
110/// A managed agent process with its metadata.
111struct ManagedProcess {
112    process: AgentProcessType,
113    /// The agent's own session ID (returned by `session/new` or claude's session_id).
114    acp_session_id: String,
115    preset_id: String,
116    #[allow(dead_code)]
117    created_at: String,
118    /// Trace writer for recording agent activities to JSONL
119    trace_writer: TraceWriter,
120    /// Working directory (for contributor context)
121    #[allow(dead_code)]
122    cwd: String,
123}
124
125// ─── ACP Manager ────────────────────────────────────────────────────────
126
127/// Manages ACP agent sessions and process lifecycle.
128///
129/// Each session maps to a long-lived child process that communicates via
130/// stdio JSON-RPC. Notifications are forwarded to subscribers via broadcast.
131#[derive(Clone)]
132pub struct AcpManager {
133    /// Our sessionId → session record (for UI listing)
134    sessions: Arc<RwLock<HashMap<String, AcpSessionRecord>>>,
135    /// Our sessionId → managed process (the live agent)
136    processes: Arc<RwLock<HashMap<String, ManagedProcess>>>,
137    /// Our sessionId → broadcast sender for SSE notifications
138    notification_channels: Arc<RwLock<HashMap<String, broadcast::Sender<serde_json::Value>>>>,
139    /// Our sessionId → message history (session/update notifications)
140    history: Arc<RwLock<HashMap<String, Vec<serde_json::Value>>>>,
141}
142
143impl Default for AcpManager {
144    fn default() -> Self {
145        Self::new()
146    }
147}
148
149impl AcpManager {
150    pub fn rewrite_notification_session_id(
151        session_id: &str,
152        mut notification: serde_json::Value,
153    ) -> serde_json::Value {
154        if let Some(object) = notification.as_object_mut() {
155            object.insert(
156                "sessionId".to_string(),
157                serde_json::Value::String(session_id.to_string()),
158            );
159        }
160        notification
161    }
162
163    pub fn new() -> Self {
164        Self {
165            sessions: Arc::new(RwLock::new(HashMap::new())),
166            processes: Arc::new(RwLock::new(HashMap::new())),
167            notification_channels: Arc::new(RwLock::new(HashMap::new())),
168            history: Arc::new(RwLock::new(HashMap::new())),
169        }
170    }
171
172    /// List all session records.
173    pub async fn list_sessions(&self) -> Vec<AcpSessionRecord> {
174        let sessions = self.sessions.read().await;
175        sessions.values().cloned().collect()
176    }
177
178    /// Get a session record by ID.
179    pub async fn get_session(&self, session_id: &str) -> Option<AcpSessionRecord> {
180        let sessions = self.sessions.read().await;
181        sessions.get(session_id).cloned()
182    }
183
184    /// Rename a session.
185    /// Returns `Some(())` if the session was found and renamed, `None` if not found.
186    pub async fn rename_session(&self, session_id: &str, name: &str) -> Option<()> {
187        let mut sessions = self.sessions.write().await;
188        let session = sessions.get_mut(session_id)?;
189        session.name = Some(name.to_string());
190        Some(())
191    }
192
193    /// Attach a ROUTA agent ID to an existing session record.
194    /// Returns `Some(())` if the session was found, `None` if not found.
195    pub async fn set_routa_agent_id(&self, session_id: &str, routa_agent_id: &str) -> Option<()> {
196        let mut sessions = self.sessions.write().await;
197        let session = sessions.get_mut(session_id)?;
198        session.routa_agent_id = Some(routa_agent_id.to_string());
199        Some(())
200    }
201
202    /// Delete a session.
203    /// Returns `Some(())` if the session was found and deleted, `None` if not found.
204    pub async fn delete_session(&self, session_id: &str) -> Option<()> {
205        let mut sessions = self.sessions.write().await;
206        let mut processes = self.processes.write().await;
207        let mut channels = self.notification_channels.write().await;
208        let mut history = self.history.write().await;
209
210        // Remove session record
211        sessions.remove(session_id)?;
212
213        // Kill the process if it exists
214        if let Some(managed) = processes.remove(session_id) {
215            let _ = managed.process.kill().await;
216        }
217
218        // Remove notification channel
219        channels.remove(session_id);
220
221        // Remove history
222        history.remove(session_id);
223
224        Some(())
225    }
226
227    /// Get session message history.
228    /// Returns `Some(history)` if the session exists, `None` if not found.
229    pub async fn get_session_history(&self, session_id: &str) -> Option<Vec<serde_json::Value>> {
230        let history = self.history.read().await;
231        history.get(session_id).cloned()
232    }
233
234    /// Add a notification to session history.
235    /// Child agent notifications (those with `childAgentId`) are NOT stored in the
236    /// parent session's history — they would flood out the ROUTA coordinator's own
237    /// messages. Child messages are persisted in their own child session's history.
238    pub async fn push_to_history(&self, session_id: &str, notification: serde_json::Value) {
239        // Skip child agent notifications to prevent flooding parent history
240        if notification.get("childAgentId").is_some() {
241            return;
242        }
243        let mut history = self.history.write().await;
244        let entries = history
245            .entry(session_id.to_string())
246            .or_insert_with(Vec::new);
247        entries.push(notification);
248        // Cap at 500 entries (same limit as Next.js backend)
249        if entries.len() > 500 {
250            let drain_count = entries.len() - 500;
251            entries.drain(0..drain_count);
252        }
253    }
254
255    /// Broadcast a synthetic session/update event and persist it into in-memory history.
256    pub async fn emit_session_update(
257        &self,
258        session_id: &str,
259        update: serde_json::Value,
260    ) -> Result<(), String> {
261        let message = serde_json::json!({
262            "jsonrpc": "2.0",
263            "method": "session/update",
264            "params": {
265                "sessionId": session_id,
266                "update": update,
267            }
268        });
269
270        if let Some(channel) = self
271            .notification_channels
272            .read()
273            .await
274            .get(session_id)
275            .cloned()
276        {
277            let _ = channel.send(message.clone());
278        } else {
279            let params = message
280                .get("params")
281                .cloned()
282                .ok_or_else(|| "Missing params in synthetic session/update".to_string())?;
283            self.push_to_history(
284                session_id,
285                Self::rewrite_notification_session_id(session_id, params),
286            )
287            .await;
288        }
289        Ok(())
290    }
291
292    /// Mark a session as having had its first prompt dispatched.
293    pub async fn mark_first_prompt_sent(&self, session_id: &str) {
294        let mut sessions = self.sessions.write().await;
295        if let Some(session) = sessions.get_mut(session_id) {
296            session.first_prompt_sent = true;
297        }
298    }
299
300    /// Create a new ACP session: spawn agent process, initialize, create session.
301    /// Supports both static presets and registry-based agents.
302    /// **Claude** uses stream-json protocol instead of ACP.
303    ///
304    /// Returns `(our_session_id, agent_session_id)`.
305    #[allow(clippy::too_many_arguments)]
306    pub async fn create_session(
307        &self,
308        session_id: String,
309        cwd: String,
310        workspace_id: String,
311        provider: Option<String>,
312        role: Option<String>,
313        model: Option<String>,
314        parent_session_id: Option<String>,
315        tool_mode: Option<String>,
316        mcp_profile: Option<String>,
317    ) -> Result<(String, String), String> {
318        self.create_session_with_options(
319            session_id,
320            cwd,
321            workspace_id,
322            provider,
323            role,
324            model,
325            parent_session_id,
326            tool_mode,
327            mcp_profile,
328            SessionLaunchOptions::default(),
329        )
330        .await
331    }
332
333    #[allow(clippy::too_many_arguments)]
334    pub async fn create_session_with_options(
335        &self,
336        session_id: String,
337        cwd: String,
338        workspace_id: String,
339        provider: Option<String>,
340        role: Option<String>,
341        model: Option<String>,
342        parent_session_id: Option<String>,
343        tool_mode: Option<String>,
344        mcp_profile: Option<String>,
345        options: SessionLaunchOptions,
346    ) -> Result<(String, String), String> {
347        let provider_name = provider.as_deref().unwrap_or("opencode");
348
349        // Create the notification broadcast channel for this session
350        let (ntx, _) = broadcast::channel::<serde_json::Value>(256);
351        let claude_mcp_config = if provider_name == "claude" {
352            Some(mcp_setup::build_claude_mcp_config(
353                &workspace_id,
354                &session_id,
355                tool_mode.as_deref(),
356                mcp_profile.as_deref(),
357            ))
358        } else {
359            None
360        };
361
362        // Check if this is Claude (uses stream-json protocol, not ACP)
363        let (process_type, acp_session_id) = if provider_name == "claude" {
364            // Use Claude Code stream-json protocol
365            let config = ClaudeCodeConfig {
366                command: "claude".to_string(),
367                cwd: cwd.clone(),
368                display_name: format!("Claude-{}", &session_id[..8.min(session_id.len())]),
369                permission_mode: Some("bypassPermissions".to_string()),
370                mcp_configs: claude_mcp_config.into_iter().collect(),
371                append_system_prompt: options.specialist_system_prompt.clone(),
372                allowed_tools: options.allowed_native_tools.clone(),
373            };
374
375            let claude_process = ClaudeCodeProcess::spawn(config, ntx.clone()).await?;
376            let claude_session_id = claude_process
377                .session_id()
378                .await
379                .unwrap_or_else(|| format!("claude-{}", &session_id[..8.min(session_id.len())]));
380
381            (
382                AgentProcessType::Claude(Arc::new(claude_process)),
383                claude_session_id,
384            )
385        } else {
386            // Use standard ACP protocol
387            let preset = get_preset_by_id_with_registry(provider_name).await?;
388
389            if let Some(summary) = mcp_setup::ensure_mcp_for_provider(
390                provider_name,
391                &workspace_id,
392                &session_id,
393                tool_mode.as_deref(),
394                mcp_profile.as_deref(),
395            )
396            .await?
397            {
398                tracing::info!("[AcpManager] {}", summary);
399            }
400
401            // Build args: preset args + optional model flag
402            let mut extra_args: Vec<String> = preset.args.clone();
403            if let Some(ref m) = model {
404                if !m.is_empty() {
405                    // opencode (and future providers) accept -m <model>
406                    extra_args.push("-m".to_string());
407                    extra_args.push(m.clone());
408                }
409            }
410
411            let preset_command = resolve_preset_command(&preset);
412            let process = AcpProcess::spawn(
413                &preset_command,
414                &extra_args.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
415                &cwd,
416                ntx.clone(),
417                &preset.name,
418                &session_id,
419            )
420            .await?;
421
422            // Initialize the protocol
423            process
424                .initialize_with_timeout(options.initialize_timeout_ms)
425                .await?;
426
427            // Create the agent session
428            let agent_session_id = process.new_session(&cwd).await?;
429
430            (AgentProcessType::Acp(Arc::new(process)), agent_session_id)
431        };
432
433        // Create TraceWriter for this session
434        let trace_writer = TraceWriter::new(&cwd);
435
436        // Store everything
437        let record = AcpSessionRecord {
438            session_id: session_id.clone(),
439            name: None,
440            cwd: cwd.clone(),
441            workspace_id: workspace_id.clone(),
442            routa_agent_id: None,
443            provider: Some(provider_name.to_string()),
444            role: role.clone().or(Some("CRAFTER".to_string())),
445            mode_id: None,
446            model: model.clone(),
447            created_at: chrono::Utc::now().to_rfc3339(),
448            first_prompt_sent: false,
449            parent_session_id: parent_session_id.clone(),
450            specialist_id: options.specialist_id.clone(),
451            specialist_system_prompt: options.specialist_system_prompt.clone(),
452        };
453
454        self.sessions
455            .write()
456            .await
457            .insert(session_id.clone(), record);
458
459        self.processes.write().await.insert(
460            session_id.clone(),
461            ManagedProcess {
462                process: process_type,
463                acp_session_id: acp_session_id.clone(),
464                preset_id: provider_name.to_string(),
465                created_at: chrono::Utc::now().to_rfc3339(),
466                trace_writer: trace_writer.clone(),
467                cwd: cwd.clone(),
468            },
469        );
470
471        self.notification_channels
472            .write()
473            .await
474            .insert(session_id.clone(), ntx.clone());
475
476        // Keep an in-memory transcript for all live sessions, including prompts
477        // dispatched internally by orchestration instead of the HTTP ACP route.
478        let history_manager = self.clone();
479        let history_session_id = session_id.clone();
480        let mut history_rx = ntx.subscribe();
481        tokio::spawn(async move {
482            loop {
483                match history_rx.recv().await {
484                    Ok(message) => {
485                        let params = match message.get("params") {
486                            Some(value) => value.clone(),
487                            None => continue,
488                        };
489                        history_manager
490                            .push_to_history(
491                                &history_session_id,
492                                Self::rewrite_notification_session_id(&history_session_id, params),
493                            )
494                            .await;
495                    }
496                    Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
497                    Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
498                        tracing::warn!(
499                            "[AcpManager] Dropped {} session/update notifications for {}",
500                            skipped,
501                            history_session_id
502                        );
503                    }
504                }
505            }
506        });
507
508        // Record SessionStart trace
509        let trace = TraceRecord::new(
510            &session_id,
511            TraceEventType::SessionStart,
512            Contributor::new(provider_name, None),
513        )
514        .with_workspace_id(&workspace_id)
515        .with_metadata(
516            "role",
517            serde_json::json!(role.as_deref().unwrap_or("CRAFTER")),
518        )
519        .with_metadata("cwd", serde_json::json!(cwd));
520
521        trace_writer.append_safe(&trace).await;
522
523        tracing::info!(
524            "[AcpManager] Session {} created (provider: {}, agent session: {})",
525            session_id,
526            provider_name,
527            acp_session_id,
528        );
529
530        Ok((session_id, acp_session_id))
531    }
532
533    /// Send a prompt to an existing session's agent process.
534    pub async fn prompt(&self, session_id: &str, text: &str) -> Result<serde_json::Value, String> {
535        self.mark_first_prompt_sent(session_id).await;
536
537        let (process, acp_session_id, preset_id, trace_writer) = {
538            let processes = self.processes.read().await;
539            let managed = processes
540                .get(session_id)
541                .ok_or_else(|| format!("No agent process for session: {}", session_id))?;
542            (
543                managed.process.clone(),
544                managed.acp_session_id.clone(),
545                managed.preset_id.clone(),
546                managed.trace_writer.clone(),
547            )
548        };
549
550        let is_alive = match &process {
551            AgentProcessType::Acp(p) => p.is_alive(),
552            AgentProcessType::Claude(p) => p.is_alive(),
553        };
554
555        if !is_alive {
556            return Err(format!("Agent ({}) process is not running", preset_id));
557        }
558
559        // Record UserMessage trace
560        let trace = TraceRecord::new(
561            session_id,
562            TraceEventType::UserMessage,
563            Contributor::new(&preset_id, None),
564        )
565        .with_conversation(TraceConversation {
566            turn: None,
567            role: Some("user".to_string()),
568            content_preview: Some(truncate_content(text, 500)),
569            full_content: None,
570        });
571
572        trace_writer.append_safe(&trace).await;
573
574        tracing::info!(
575            target: "routa_acp_prompt",
576            session_id = %session_id,
577            preset_id = %preset_id,
578            acp_session_id = %acp_session_id,
579            prompt_len = text.len(),
580            "acp prompt start"
581        );
582
583        let result = match &process {
584            AgentProcessType::Acp(p) => p.prompt(&acp_session_id, text).await,
585            AgentProcessType::Claude(p) => {
586                let stop_reason = p.prompt(text).await?;
587                Ok(serde_json::json!({ "stopReason": stop_reason }))
588            }
589        };
590
591        match &result {
592            Ok(_) => tracing::info!(
593                target: "routa_acp_prompt",
594                session_id = %session_id,
595                preset_id = %preset_id,
596                "acp prompt success"
597            ),
598            Err(error) => tracing::error!(
599                target: "routa_acp_prompt",
600                session_id = %session_id,
601                preset_id = %preset_id,
602                error = %error,
603                "acp prompt failed"
604            ),
605        }
606
607        result
608    }
609
610    /// Cancel the current prompt in a session.
611    pub async fn cancel(&self, session_id: &str) {
612        let processes = self.processes.read().await;
613        if let Some(managed) = processes.get(session_id) {
614            match &managed.process {
615                AgentProcessType::Acp(p) => p.cancel(&managed.acp_session_id).await,
616                AgentProcessType::Claude(p) => p.cancel().await,
617            }
618        }
619    }
620
621    /// Kill a session's agent process and remove it.
622    pub async fn kill_session(&self, session_id: &str) {
623        // Kill the process
624        if let Some(managed) = self.processes.write().await.remove(session_id) {
625            // Record SessionEnd trace before killing
626            let trace = TraceRecord::new(
627                session_id,
628                TraceEventType::SessionEnd,
629                Contributor::new(&managed.preset_id, None),
630            );
631            managed.trace_writer.append_safe(&trace).await;
632
633            match &managed.process {
634                AgentProcessType::Acp(p) => p.kill().await,
635                AgentProcessType::Claude(p) => p.kill().await,
636            }
637        }
638        // Remove session record
639        self.sessions.write().await.remove(session_id);
640        // Remove notification channel
641        self.notification_channels.write().await.remove(session_id);
642    }
643
644    /// Subscribe to SSE notifications for a session.
645    /// Returns a broadcast receiver that yields `session/update` JSON-RPC messages.
646    pub async fn subscribe(
647        &self,
648        session_id: &str,
649    ) -> Option<broadcast::Receiver<serde_json::Value>> {
650        let channels = self.notification_channels.read().await;
651        channels.get(session_id).map(|tx| tx.subscribe())
652    }
653
654    /// Check if a session's agent process is alive.
655    pub async fn is_alive(&self, session_id: &str) -> bool {
656        let processes = self.processes.read().await;
657        processes
658            .get(session_id)
659            .map(|m| match &m.process {
660                AgentProcessType::Acp(p) => p.is_alive(),
661                AgentProcessType::Claude(p) => p.is_alive(),
662            })
663            .unwrap_or(false)
664    }
665
666    /// Get the preset ID for a session.
667    pub async fn get_preset_id(&self, session_id: &str) -> Option<String> {
668        let processes = self.processes.read().await;
669        processes.get(session_id).map(|m| m.preset_id.clone())
670    }
671
672    /// Check if a session uses Claude (stream-json protocol, not ACP).
673    pub async fn is_claude_session(&self, session_id: &str) -> bool {
674        let processes = self.processes.read().await;
675        processes
676            .get(session_id)
677            .map(|m| matches!(&m.process, AgentProcessType::Claude(_)))
678            .unwrap_or(false)
679    }
680
681    /// Send a prompt to Claude session and return immediately.
682    /// The actual response is streamed via the broadcast channel.
683    /// Use `subscribe()` to receive notifications.
684    pub async fn prompt_claude_async(&self, session_id: &str, text: &str) -> Result<(), String> {
685        let processes = self.processes.read().await;
686        let managed = processes
687            .get(session_id)
688            .ok_or_else(|| format!("No agent process for session: {}", session_id))?;
689
690        // Record trace
691        let trace = TraceRecord::new(
692            session_id,
693            TraceEventType::UserMessage,
694            Contributor::new(&managed.preset_id, None),
695        )
696        .with_conversation(TraceConversation {
697            turn: None,
698            role: Some("user".to_string()),
699            content_preview: Some(text[..text.len().min(200)].to_string()),
700            full_content: Some(text.to_string()),
701        });
702
703        managed.trace_writer.append_safe(&trace).await;
704
705        match &managed.process {
706            AgentProcessType::Claude(p) => {
707                // Spawn the prompt in a background task so we can return immediately
708                let process = Arc::clone(p);
709                let text = text.to_string();
710                tokio::spawn(async move {
711                    let _ = process.prompt(&text).await;
712                });
713                Ok(())
714            }
715            AgentProcessType::Acp(_) => {
716                Err("prompt_claude_async is only for Claude sessions".to_string())
717            }
718        }
719    }
720}
721
722// ─── ACP Presets ────────────────────────────────────────────────────────
723
724/// ACP provider presets for known coding agents.
725#[derive(Debug, Clone, Serialize, Deserialize)]
726pub struct AcpPreset {
727    /// Unique identifier (lowercase, e.g., "claude", "opencode")
728    pub id: String,
729    /// Human-readable display name (e.g., "Claude Code", "OpenCode")
730    pub name: String,
731    pub command: String,
732    pub args: Vec<String>,
733    pub description: String,
734    #[serde(default)]
735    #[serde(skip_serializing_if = "Option::is_none")]
736    pub env_bin_override: Option<String>,
737}
738
739/// Get the list of known ACP agent presets (static/builtin only).
740pub fn get_presets() -> Vec<AcpPreset> {
741    vec![
742        AcpPreset {
743            id: "opencode".to_string(),
744            name: "OpenCode".to_string(),
745            command: "opencode".to_string(),
746            args: vec!["acp".to_string()],
747            description: "OpenCode AI coding agent".to_string(),
748            env_bin_override: Some("OPENCODE_BIN".to_string()),
749        },
750        AcpPreset {
751            id: "gemini".to_string(),
752            name: "Gemini".to_string(),
753            command: "gemini".to_string(),
754            args: vec!["--experimental-acp".to_string()],
755            description: "Google Gemini CLI".to_string(),
756            env_bin_override: None,
757        },
758        AcpPreset {
759            id: "codex-acp".to_string(),
760            name: "Codex".to_string(),
761            command: "codex-acp".to_string(),
762            args: vec![],
763            description: "OpenAI Codex CLI (codex-acp wrapper)".to_string(),
764            env_bin_override: Some("CODEX_ACP_BIN".to_string()),
765        },
766        AcpPreset {
767            id: "copilot".to_string(),
768            name: "GitHub Copilot".to_string(),
769            command: "copilot".to_string(),
770            args: vec![
771                "--acp".to_string(),
772                "--allow-all-tools".to_string(),
773                "--no-ask-user".to_string(),
774            ],
775            description: "GitHub Copilot CLI".to_string(),
776            env_bin_override: Some("COPILOT_BIN".to_string()),
777        },
778        AcpPreset {
779            id: "auggie".to_string(),
780            name: "Auggie".to_string(),
781            command: "auggie".to_string(),
782            args: vec!["--acp".to_string()],
783            description: "Augment Code's AI agent".to_string(),
784            env_bin_override: None,
785        },
786        AcpPreset {
787            id: "kimi".to_string(),
788            name: "Kimi".to_string(),
789            command: "kimi".to_string(),
790            args: vec!["acp".to_string()],
791            description: "Moonshot AI's Kimi CLI".to_string(),
792            env_bin_override: None,
793        },
794        AcpPreset {
795            id: "kiro".to_string(),
796            name: "Kiro".to_string(),
797            command: "kiro-cli".to_string(),
798            args: vec!["acp".to_string()],
799            description: "Amazon Kiro AI coding agent".to_string(),
800            env_bin_override: Some("KIRO_BIN".to_string()),
801        },
802        AcpPreset {
803            id: "qoder".to_string(),
804            name: "Qoder".to_string(),
805            command: "qodercli".to_string(),
806            args: vec!["--acp".to_string()],
807            description: "Qoder AI coding agent".to_string(),
808            env_bin_override: Some("QODER_BIN".to_string()),
809        },
810        AcpPreset {
811            id: "claude".to_string(),
812            name: "Claude Code".to_string(),
813            command: "claude".to_string(),
814            // Claude Code uses stream-json protocol, not ACP flags
815            // Args are unused since we use ClaudeCodeProcess directly
816            args: vec![],
817            description: "Anthropic Claude Code (stream-json protocol)".to_string(),
818            env_bin_override: Some("CLAUDE_BIN".to_string()),
819        },
820    ]
821}
822
823/// Get a preset by ID, checking both static presets and registry.
824/// Static presets take precedence.
825///
826/// Supports suffixed IDs like "auggie-registry" to explicitly request
827/// the registry version when both built-in and registry versions exist.
828pub async fn get_preset_by_id_with_registry(id: &str) -> Result<AcpPreset, String> {
829    let normalized_id = match id {
830        "codex" => "codex-acp",
831        "qodercli" => "qoder",
832        other => other,
833    };
834
835    // Handle suffixed IDs (e.g., "auggie-registry")
836    // This allows explicit selection of registry version when both exist
837    const REGISTRY_SUFFIX: &str = "-registry";
838    if let Some(base_id) = normalized_id.strip_suffix(REGISTRY_SUFFIX) {
839        let mut preset = get_registry_preset(base_id).await?;
840        // Keep the suffixed ID in the returned preset for consistency
841        preset.id = id.to_string();
842        return Ok(preset);
843    }
844
845    // Check static presets first (match by id, not name)
846    if let Some(mut preset) = get_presets().into_iter().find(|p| p.id == normalized_id) {
847        if preset.id != id {
848            preset.id = id.to_string();
849        }
850        return Ok(preset);
851    }
852
853    // Fall back to registry
854    let mut preset = get_registry_preset(normalized_id).await?;
855    if preset.id != id {
856        preset.id = id.to_string();
857    }
858    Ok(preset)
859}
860
861/// Get a preset from the ACP registry by ID.
862async fn get_registry_preset(id: &str) -> Result<AcpPreset, String> {
863    let registry: AcpRegistry = fetch_registry().await?;
864
865    // Find the agent
866    let agent = registry
867        .agents
868        .into_iter()
869        .find(|a| a.id == id)
870        .ok_or_else(|| format!("Agent '{}' not found in registry", id))?;
871
872    // Build command from distribution
873    let (command, args) = if let Some(ref npx) = agent.distribution.npx {
874        let mut args = vec!["-y".to_string(), npx.package.clone()];
875        args.extend(npx.args.clone());
876        ("npx".to_string(), args)
877    } else if let Some(ref uvx) = agent.distribution.uvx {
878        let mut args = vec![uvx.package.clone()];
879        args.extend(uvx.args.clone());
880        ("uvx".to_string(), args)
881    } else {
882        return Err(format!(
883            "Agent '{}' has no supported distribution (npx/uvx)",
884            id
885        ));
886    };
887
888    Ok(AcpPreset {
889        id: agent.id.clone(),
890        name: agent.name,
891        command,
892        args,
893        description: agent.description,
894        env_bin_override: None,
895    })
896}
897
898fn resolve_preset_command(preset: &AcpPreset) -> String {
899    if let Some(env_var) = &preset.env_bin_override {
900        if let Ok(custom_command) = std::env::var(env_var) {
901            let trimmed = custom_command.trim();
902            if !trimmed.is_empty() {
903                return trimmed.to_string();
904            }
905        }
906    }
907
908    crate::shell_env::which(&preset.command).unwrap_or_else(|| preset.command.clone())
909}
910
911// ─── Utility Functions ─────────────────────────────────────────────────────
912
/// Truncate content to a maximum length for storage in traces.
///
/// Lengths are measured in Unicode scalar values (`char`s), never bytes, so
/// multi-byte text is always cut on a character boundary.
///
/// - Text with at most `max_len` characters is returned unchanged.
/// - When `max_len <= 3` there is no room for an ellipsis, so the first
///   `max_len` characters are returned as-is.
/// - Otherwise the first `max_len - 3` characters are kept and `"..."` is
///   appended, giving a result of exactly `max_len` characters.
fn truncate_content(text: &str, max_len: usize) -> String {
    // Probe only up to `max_len + 1` characters instead of counting the whole
    // text; trace payloads can be far larger than the limit.
    if text.chars().nth(max_len).is_none() {
        return text.to_string();
    }

    let (kept_chars, marker) = if max_len <= 3 {
        (max_len, "")
    } else {
        (max_len - 3, "...")
    };

    // Byte offset of the first dropped character. The text is known to hold
    // more than `max_len` characters here, so the lookup succeeds; fall back
    // to the full length rather than panicking.
    let cut = text
        .char_indices()
        .nth(kept_chars)
        .map_or(text.len(), |(offset, _)| offset);

    let mut shortened = String::with_capacity(cut + marker.len());
    shortened.push_str(&text[..cut]);
    shortened.push_str(marker);
    shortened
}
924
#[cfg(test)]
mod tests {
    use super::{
        get_preset_by_id_with_registry, get_presets, truncate_content, AcpManager, AcpSessionRecord,
    };
    use std::collections::HashMap;
    use std::sync::Arc;
    use tokio::sync::RwLock;

    /// Builds a manager whose session, process, channel, and history maps are all empty.
    fn bare_manager() -> AcpManager {
        AcpManager {
            sessions: Arc::new(RwLock::new(HashMap::new())),
            processes: Arc::new(RwLock::new(HashMap::new())),
            notification_channels: Arc::new(RwLock::new(HashMap::new())),
            history: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    // ─── Presets ───────────────────────────────────────────────────────────

    #[test]
    fn static_presets_include_codex_acp_for_codex_alias() {
        let has_codex_acp = get_presets().iter().any(|p| p.id == "codex-acp");
        assert!(has_codex_acp);
    }

    #[test]
    fn static_presets_include_qoder() {
        let has_qoder = get_presets().iter().any(|p| p.id == "qoder");
        assert!(has_qoder);
    }

    #[tokio::test]
    async fn qodercli_alias_resolves_to_qoder_preset() {
        let resolved = get_preset_by_id_with_registry("qodercli")
            .await
            .expect("qodercli alias should resolve");

        // The alias is echoed back as the ID while the command comes from the preset.
        assert_eq!(resolved.id, "qodercli");
        assert_eq!(resolved.command, "qodercli");
        assert_eq!(resolved.args, vec!["--acp".to_string()]);
    }

    // ─── Session bookkeeping ───────────────────────────────────────────────

    #[tokio::test]
    async fn mark_first_prompt_sent_updates_live_session_record() {
        let manager = AcpManager::new();
        let sid = "session-1".to_string();

        let record = AcpSessionRecord {
            session_id: sid.clone(),
            name: None,
            cwd: ".".to_string(),
            workspace_id: "default".to_string(),
            routa_agent_id: None,
            provider: Some("opencode".to_string()),
            role: Some("CRAFTER".to_string()),
            mode_id: None,
            model: None,
            created_at: chrono::Utc::now().to_rfc3339(),
            first_prompt_sent: false,
            parent_session_id: None,
            specialist_id: None,
            specialist_system_prompt: None,
        };
        manager.sessions.write().await.insert(sid.clone(), record);

        manager.mark_first_prompt_sent(&sid).await;

        let stored = manager.get_session(&sid).await.expect("session");
        assert!(stored.first_prompt_sent);
    }

    // ─── History and notifications ─────────────────────────────────────────

    #[tokio::test]
    async fn push_to_history_skips_parent_child_forwarding_noise() {
        let manager = bare_manager();

        // An update forwarded from a child agent (note `childAgentId`).
        let forwarded = serde_json::json!({
            "sessionId": "parent",
            "childAgentId": "child-1",
            "update": { "sessionUpdate": "agent_message", "content": { "type": "text", "text": "delegated" } }
        });
        manager.push_to_history("parent", forwarded).await;

        let entries = manager
            .get_session_history("parent")
            .await
            .unwrap_or_default();
        assert!(entries.is_empty());
    }

    #[tokio::test]
    async fn emit_session_update_broadcasts_when_channel_exists() {
        let (sender, mut receiver) = tokio::sync::broadcast::channel(8);
        let manager = bare_manager();
        manager
            .notification_channels
            .write()
            .await
            .insert("session-1".to_string(), sender);

        let update = serde_json::json!({
            "sessionUpdate": "turn_complete",
            "stopReason": "cancelled"
        });
        manager
            .emit_session_update("session-1", update)
            .await
            .expect("emit should succeed");

        let event = receiver.recv().await.expect("broadcast event");
        let payload = &event["params"]["update"];
        assert_eq!(payload["sessionUpdate"].as_str(), Some("turn_complete"));
        assert_eq!(payload["stopReason"].as_str(), Some("cancelled"));
    }

    #[tokio::test]
    async fn emit_session_update_persists_history_without_channel() {
        let manager = bare_manager();

        let update = serde_json::json!({
            "sessionUpdate": "turn_complete",
            "stopReason": "cancelled"
        });
        manager
            .emit_session_update("session-1", update)
            .await
            .expect("emit should succeed");

        let entries = manager
            .get_session_history("session-1")
            .await
            .expect("history should exist");
        assert_eq!(entries.len(), 1);
        assert_eq!(
            entries[0]["update"]["sessionUpdate"].as_str(),
            Some("turn_complete")
        );
    }

    #[test]
    fn rewrite_notification_session_id_overrides_provider_session_id() {
        let incoming = serde_json::json!({
            "sessionId": "provider-session",
            "update": { "sessionUpdate": "agent_message_chunk", "content": { "text": "hi" } }
        });

        let rewritten = AcpManager::rewrite_notification_session_id("child-session", incoming);

        assert_eq!(rewritten["sessionId"].as_str(), Some("child-session"));
    }

    // ─── Utilities ─────────────────────────────────────────────────────────

    #[test]
    fn truncate_content_handles_unicode_boundaries() {
        // (input, limit, expected)
        let cases = [
            ("你好世界ABC", 5, "你好..."),
            ("你好世界ABC", 3, "你好世"),
            ("短文本", 10, "短文本"),
        ];
        for (input, limit, expected) in cases {
            assert_eq!(truncate_content(input, limit), expected);
        }
    }
}