Skip to main content

routa_core/acp/
mod.rs

1//! ACP (Agent Client Protocol) integration.
2//!
3//! Manages ACP agent processes and provides JSON-RPC communication
4//! between the desktop client and coding agents (e.g. OpenCode, Claude, Copilot).
5//!
6//! Architecture (matches the Next.js `AcpProcessManager`):
7//!   - `session/new`    → spawns a child process, sends `initialize` + `session/new`
8//!   - `session/prompt` → reuses the live process, sends `session/prompt`
9//!   - `session/cancel` → sends cancellation notification
10//!   - SSE GET          → subscribes to `broadcast` channel for `session/update` events
11//!
12//! **Claude Code** uses a different protocol (stream-json) instead of ACP.
13//! The `ClaudeCodeProcess` translates Claude's output into ACP-compatible
14//! `session/update` notifications for frontend compatibility.
15//!
16//! **Agent Trace**: All sessions record trace events to JSONL files for
17//! attribution tracking (which model/session/tool affected which files and when).
18
19pub mod binary_manager;
20pub mod claude_code_process;
21pub mod docker;
22pub mod installation_state;
23pub mod mcp_setup;
24pub mod paths;
25pub mod process;
26pub mod provider_adapter;
27pub mod registry_fetch;
28pub mod registry_types;
29pub mod runtime_manager;
30pub mod terminal_manager;
31pub mod warmup;
32
33pub use binary_manager::AcpBinaryManager;
34pub use claude_code_process::{ClaudeCodeConfig, ClaudeCodeProcess};
35pub use installation_state::AcpInstallationState;
36pub use paths::AcpPaths;
37pub use registry_fetch::{fetch_registry, fetch_registry_json};
38pub use registry_types::*;
39pub use runtime_manager::{current_platform, AcpRuntimeManager, RuntimeInfo, RuntimeType};
40pub use warmup::{AcpWarmupService, WarmupState, WarmupStatus};
41
42use std::collections::HashMap;
43use std::sync::Arc;
44
45use serde::{Deserialize, Serialize};
46use tokio::sync::{broadcast, RwLock};
47
48use crate::trace::{Contributor, TraceConversation, TraceEventType, TraceRecord, TraceWriter};
49use process::AcpProcess;
50
51#[cfg(windows)]
52pub(crate) const CREATE_NO_WINDOW: u32 = 0x0800_0000;
53
54// ─── Session Record ─────────────────────────────────────────────────────
55
/// Record of an active ACP session persisted for UI listing.
///
/// Serialized with camelCase keys. Optional fields annotated with
/// `skip_serializing_if` are omitted from the output when `None`; the
/// remaining optional fields (`provider`, `role`, `mode_id`) carry no such
/// annotation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AcpSessionRecord {
    /// Our own session ID (the key used by `AcpManager`'s maps).
    pub session_id: String,
    /// Display name; `None` until set via `AcpManager::rename_session`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// Working directory the agent process was launched in.
    pub cwd: String,
    /// Workspace this session belongs to.
    pub workspace_id: String,
    /// ROUTA agent ID; `None` until set via `AcpManager::set_routa_agent_id`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub routa_agent_id: Option<String>,
    /// Provider / preset name the session was launched with.
    pub provider: Option<String>,
    /// Agent role; registration fills in `"CRAFTER"` when the caller gives none.
    pub role: Option<String>,
    /// Mode identifier; registration in this module always stores `None`.
    pub mode_id: Option<String>,
    /// Model requested for the session, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
    /// Creation timestamp (RFC 3339, UTC) taken when the session is registered.
    pub created_at: String,
    /// Set to `true` by `AcpManager::mark_first_prompt_sent`; defaults to
    /// `false` when missing from deserialized input.
    #[serde(default)]
    pub first_prompt_sent: bool,
    /// Parent session ID for CRAFTER child sessions
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_session_id: Option<String>,
    /// Specialist identifier copied from `SessionLaunchOptions`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub specialist_id: Option<String>,
    /// Specialist system prompt copied from `SessionLaunchOptions`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub specialist_system_prompt: Option<String>,
}
83
/// Optional knobs applied when launching or loading a session.
///
/// `Default` leaves every option unset.
#[derive(Debug, Clone, Default)]
pub struct SessionLaunchOptions {
    /// Stored on the resulting `AcpSessionRecord`.
    pub specialist_id: Option<String>,
    /// Stored on the resulting `AcpSessionRecord`; for the Claude provider it
    /// is also passed as `append_system_prompt`.
    pub specialist_system_prompt: Option<String>,
    /// Passed as `allowed_tools` to the Claude Code process config.
    pub allowed_native_tools: Option<Vec<String>>,
    /// Forwarded to `AcpProcess::initialize_with_timeout`.
    pub initialize_timeout_ms: Option<u64>,
    /// Extra command-line arguments appended after the preset's own args
    /// (preset-based ACP launches only).
    pub provider_args: Option<Vec<String>>,
}
92
93// ─── Managed Process ────────────────────────────────────────────────────
94
/// Process type enum to support both ACP and Claude stream-json protocols.
///
/// Both variants hold an `Arc`, so cloning the enum yields another handle to
/// the same process value.
#[derive(Clone)]
enum AgentProcessType {
    /// Standard ACP protocol (opencode, gemini, copilot, etc.)
    Acp(Arc<AcpProcess>),
    /// Claude Code stream-json protocol
    Claude(Arc<ClaudeCodeProcess>),
}
103
104impl AgentProcessType {
105    /// Kill the underlying process.
106    async fn kill(&self) {
107        match self {
108            AgentProcessType::Acp(process) => process.kill().await,
109            AgentProcessType::Claude(process) => process.kill().await,
110        }
111    }
112}
113
/// A managed agent process with its metadata.
///
/// One value is stored per session in `AcpManager::processes`.
struct ManagedProcess {
    /// Handle to the live agent process (ACP or Claude stream-json).
    process: AgentProcessType,
    /// The agent's own session ID (returned by `session/new` or claude's session_id).
    acp_session_id: String,
    /// Provider / preset name; used as the contributor in trace records and
    /// in error and log messages.
    preset_id: String,
    /// Creation timestamp (RFC 3339); stored but not read in the visible code,
    /// hence the `dead_code` allowance.
    #[allow(dead_code)]
    created_at: String,
    /// Trace writer for recording agent activities to JSONL
    trace_writer: TraceWriter,
    /// Working directory (for contributor context)
    #[allow(dead_code)]
    cwd: String,
}
128
129// ─── ACP Manager ────────────────────────────────────────────────────────
130
/// Manages ACP agent sessions and process lifecycle.
///
/// Each session maps to a long-lived child process that communicates via
/// stdio JSON-RPC. Notifications are forwarded to subscribers via broadcast.
///
/// Every field is an `Arc`, so cloning the manager produces another handle to
/// the same underlying maps rather than an independent copy.
#[derive(Clone)]
pub struct AcpManager {
    /// Our sessionId → session record (for UI listing)
    sessions: Arc<RwLock<HashMap<String, AcpSessionRecord>>>,
    /// Our sessionId → managed process (the live agent)
    processes: Arc<RwLock<HashMap<String, ManagedProcess>>>,
    /// Our sessionId → broadcast sender for SSE notifications
    notification_channels: Arc<RwLock<HashMap<String, broadcast::Sender<serde_json::Value>>>>,
    /// Our sessionId → message history (session/update notifications)
    history: Arc<RwLock<HashMap<String, Vec<serde_json::Value>>>>,
}
146
147impl Default for AcpManager {
148    fn default() -> Self {
149        Self::new()
150    }
151}
152
153impl AcpManager {
154    pub fn rewrite_notification_session_id(
155        session_id: &str,
156        mut notification: serde_json::Value,
157    ) -> serde_json::Value {
158        if let Some(object) = notification.as_object_mut() {
159            object.insert(
160                "sessionId".to_string(),
161                serde_json::Value::String(session_id.to_string()),
162            );
163        }
164        notification
165    }
166
167    pub fn new() -> Self {
168        Self {
169            sessions: Arc::new(RwLock::new(HashMap::new())),
170            processes: Arc::new(RwLock::new(HashMap::new())),
171            notification_channels: Arc::new(RwLock::new(HashMap::new())),
172            history: Arc::new(RwLock::new(HashMap::new())),
173        }
174    }
175
176    /// List all session records.
177    pub async fn list_sessions(&self) -> Vec<AcpSessionRecord> {
178        let sessions = self.sessions.read().await;
179        sessions.values().cloned().collect()
180    }
181
182    /// Get a session record by ID.
183    pub async fn get_session(&self, session_id: &str) -> Option<AcpSessionRecord> {
184        let sessions = self.sessions.read().await;
185        sessions.get(session_id).cloned()
186    }
187
188    /// Rename a session.
189    /// Returns `Some(())` if the session was found and renamed, `None` if not found.
190    pub async fn rename_session(&self, session_id: &str, name: &str) -> Option<()> {
191        let mut sessions = self.sessions.write().await;
192        let session = sessions.get_mut(session_id)?;
193        session.name = Some(name.to_string());
194        Some(())
195    }
196
197    /// Attach a ROUTA agent ID to an existing session record.
198    /// Returns `Some(())` if the session was found, `None` if not found.
199    pub async fn set_routa_agent_id(&self, session_id: &str, routa_agent_id: &str) -> Option<()> {
200        let mut sessions = self.sessions.write().await;
201        let session = sessions.get_mut(session_id)?;
202        session.routa_agent_id = Some(routa_agent_id.to_string());
203        Some(())
204    }
205
206    /// Delete a session.
207    /// Returns `Some(())` if the session was found and deleted, `None` if not found.
208    pub async fn delete_session(&self, session_id: &str) -> Option<()> {
209        let mut sessions = self.sessions.write().await;
210        let mut processes = self.processes.write().await;
211        let mut channels = self.notification_channels.write().await;
212        let mut history = self.history.write().await;
213
214        // Remove session record
215        sessions.remove(session_id)?;
216
217        // Kill the process if it exists
218        if let Some(managed) = processes.remove(session_id) {
219            let _ = managed.process.kill().await;
220        }
221
222        // Remove notification channel
223        channels.remove(session_id);
224
225        // Remove history
226        history.remove(session_id);
227
228        Some(())
229    }
230
231    /// Get session message history.
232    /// Returns `Some(history)` if the session exists, `None` if not found.
233    pub async fn get_session_history(&self, session_id: &str) -> Option<Vec<serde_json::Value>> {
234        let history = self.history.read().await;
235        history.get(session_id).cloned()
236    }
237
238    /// Add a notification to session history.
239    /// Child agent notifications (those with `childAgentId`) are NOT stored in the
240    /// parent session's history — they would flood out the ROUTA coordinator's own
241    /// messages. Child messages are persisted in their own child session's history.
242    pub async fn push_to_history(&self, session_id: &str, notification: serde_json::Value) {
243        // Skip child agent notifications to prevent flooding parent history
244        if notification.get("childAgentId").is_some() {
245            return;
246        }
247        let mut history = self.history.write().await;
248        let entries = history
249            .entry(session_id.to_string())
250            .or_insert_with(Vec::new);
251        entries.push(notification);
252        // Cap at 500 entries (same limit as Next.js backend)
253        if entries.len() > 500 {
254            let drain_count = entries.len() - 500;
255            entries.drain(0..drain_count);
256        }
257    }
258
259    /// Broadcast a synthetic session/update event and persist it into in-memory history.
260    pub async fn emit_session_update(
261        &self,
262        session_id: &str,
263        update: serde_json::Value,
264    ) -> Result<(), String> {
265        let message = serde_json::json!({
266            "jsonrpc": "2.0",
267            "method": "session/update",
268            "params": {
269                "sessionId": session_id,
270                "update": update,
271            }
272        });
273
274        if let Some(channel) = self
275            .notification_channels
276            .read()
277            .await
278            .get(session_id)
279            .cloned()
280        {
281            let _ = channel.send(message.clone());
282        } else {
283            let params = message
284                .get("params")
285                .cloned()
286                .ok_or_else(|| "Missing params in synthetic session/update".to_string())?;
287            self.push_to_history(
288                session_id,
289                Self::rewrite_notification_session_id(session_id, params),
290            )
291            .await;
292        }
293        Ok(())
294    }
295
296    /// Mark a session as having had its first prompt dispatched.
297    pub async fn mark_first_prompt_sent(&self, session_id: &str) {
298        let mut sessions = self.sessions.write().await;
299        if let Some(session) = sessions.get_mut(session_id) {
300            session.first_prompt_sent = true;
301        }
302    }
303
304    /// Create a new ACP session: spawn agent process, initialize, create session.
305    /// Supports both static presets and registry-based agents.
306    /// **Claude** uses stream-json protocol instead of ACP.
307    ///
308    /// Returns `(our_session_id, agent_session_id)`.
309    #[allow(clippy::too_many_arguments)]
310    pub async fn create_session(
311        &self,
312        session_id: String,
313        cwd: String,
314        workspace_id: String,
315        provider: Option<String>,
316        role: Option<String>,
317        model: Option<String>,
318        parent_session_id: Option<String>,
319        tool_mode: Option<String>,
320        mcp_profile: Option<String>,
321    ) -> Result<(String, String), String> {
322        self.create_session_with_options(
323            session_id,
324            cwd,
325            workspace_id,
326            provider,
327            role,
328            model,
329            parent_session_id,
330            tool_mode,
331            mcp_profile,
332            SessionLaunchOptions::default(),
333        )
334        .await
335    }
336
337    /// Resume a persisted ACP session using the provider's native session/load path.
338    ///
339    /// Returns `(our_session_id, agent_session_id)`.
340    #[allow(clippy::too_many_arguments)]
341    pub async fn load_session_with_options(
342        &self,
343        session_id: String,
344        cwd: String,
345        workspace_id: String,
346        provider: Option<String>,
347        role: Option<String>,
348        model: Option<String>,
349        parent_session_id: Option<String>,
350        tool_mode: Option<String>,
351        mcp_profile: Option<String>,
352        provider_session_id: Option<String>,
353        options: SessionLaunchOptions,
354    ) -> Result<(String, String), String> {
355        let provider_name = provider.as_deref().unwrap_or("opencode");
356
357        if provider_name == "claude" {
358            return Err("Native session/load is not supported for Claude".to_string());
359        }
360
361        let (ntx, _) = broadcast::channel::<serde_json::Value>(256);
362        let preset = get_preset_by_id_with_registry(provider_name).await?;
363
364        if let Some(summary) = mcp_setup::ensure_mcp_for_provider(
365            provider_name,
366            &workspace_id,
367            &session_id,
368            tool_mode.as_deref(),
369            mcp_profile.as_deref(),
370        )
371        .await?
372        {
373            tracing::info!("[AcpManager] {}", summary);
374        }
375
376        let mut extra_args: Vec<String> = preset.args.clone();
377        if let Some(provider_args) = options.provider_args.clone() {
378            extra_args.extend(provider_args);
379        }
380        if let Some(ref m) = model {
381            if !m.is_empty() {
382                extra_args.push("-m".to_string());
383                extra_args.push(m.clone());
384            }
385        }
386
387        let preset_command = resolve_preset_command(&preset);
388        let process = AcpProcess::spawn(
389            &preset_command,
390            &extra_args.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
391            &cwd,
392            ntx.clone(),
393            &preset.name,
394            &session_id,
395        )
396        .await?;
397
398        process
399            .initialize_with_timeout(options.initialize_timeout_ms)
400            .await?;
401
402        let resolved_provider_session_id =
403            provider_session_id.unwrap_or_else(|| session_id.clone());
404        let acp_session_id = process
405            .load_session(&resolved_provider_session_id, &cwd)
406            .await?;
407
408        self.register_managed_session(
409            session_id.clone(),
410            cwd.clone(),
411            workspace_id.clone(),
412            provider_name.to_string(),
413            role.clone(),
414            model.clone(),
415            parent_session_id.clone(),
416            &options,
417            AgentProcessType::Acp(Arc::new(process)),
418            acp_session_id.clone(),
419            ntx.clone(),
420        )
421        .await;
422
423        tracing::info!(
424            "[AcpManager] Session {} loaded (provider: {}, agent session: {})",
425            session_id,
426            provider_name,
427            acp_session_id,
428        );
429
430        Ok((session_id, acp_session_id))
431    }
432
433    #[allow(clippy::too_many_arguments)]
434    pub async fn load_session(
435        &self,
436        session_id: String,
437        cwd: String,
438        workspace_id: String,
439        provider: Option<String>,
440        role: Option<String>,
441        model: Option<String>,
442        parent_session_id: Option<String>,
443        tool_mode: Option<String>,
444        mcp_profile: Option<String>,
445        provider_session_id: Option<String>,
446    ) -> Result<(String, String), String> {
447        self.load_session_with_options(
448            session_id,
449            cwd,
450            workspace_id,
451            provider,
452            role,
453            model,
454            parent_session_id,
455            tool_mode,
456            mcp_profile,
457            provider_session_id,
458            SessionLaunchOptions::default(),
459        )
460        .await
461    }
462
463    fn spawn_history_mirror(&self, session_id: &str, ntx: &broadcast::Sender<serde_json::Value>) {
464        let history_manager = self.clone();
465        let history_session_id = session_id.to_string();
466        let mut history_rx = ntx.subscribe();
467        tokio::spawn(async move {
468            loop {
469                match history_rx.recv().await {
470                    Ok(message) => {
471                        let params = match message.get("params") {
472                            Some(value) => value.clone(),
473                            None => continue,
474                        };
475                        history_manager
476                            .push_to_history(
477                                &history_session_id,
478                                Self::rewrite_notification_session_id(&history_session_id, params),
479                            )
480                            .await;
481                    }
482                    Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
483                    Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
484                        tracing::warn!(
485                            "[AcpManager] Dropped {} session/update notifications for {}",
486                            skipped,
487                            history_session_id
488                        );
489                    }
490                }
491            }
492        });
493    }
494
495    #[allow(clippy::too_many_arguments)]
496    async fn register_managed_session(
497        &self,
498        session_id: String,
499        cwd: String,
500        workspace_id: String,
501        provider_name: String,
502        role: Option<String>,
503        model: Option<String>,
504        parent_session_id: Option<String>,
505        options: &SessionLaunchOptions,
506        process_type: AgentProcessType,
507        acp_session_id: String,
508        ntx: broadcast::Sender<serde_json::Value>,
509    ) {
510        let created_at = chrono::Utc::now().to_rfc3339();
511        let trace_writer = TraceWriter::new(&cwd);
512        let record = AcpSessionRecord {
513            session_id: session_id.clone(),
514            name: None,
515            cwd: cwd.clone(),
516            workspace_id: workspace_id.clone(),
517            routa_agent_id: None,
518            provider: Some(provider_name.clone()),
519            role: role.clone().or(Some("CRAFTER".to_string())),
520            mode_id: None,
521            model: model.clone(),
522            created_at: created_at.clone(),
523            first_prompt_sent: false,
524            parent_session_id: parent_session_id.clone(),
525            specialist_id: options.specialist_id.clone(),
526            specialist_system_prompt: options.specialist_system_prompt.clone(),
527        };
528
529        self.sessions
530            .write()
531            .await
532            .insert(session_id.clone(), record);
533        self.processes.write().await.insert(
534            session_id.clone(),
535            ManagedProcess {
536                process: process_type,
537                acp_session_id: acp_session_id.clone(),
538                preset_id: provider_name.clone(),
539                created_at,
540                trace_writer: trace_writer.clone(),
541                cwd: cwd.clone(),
542            },
543        );
544        self.notification_channels
545            .write()
546            .await
547            .insert(session_id.clone(), ntx.clone());
548        self.spawn_history_mirror(&session_id, &ntx);
549
550        let trace = TraceRecord::new(
551            &session_id,
552            TraceEventType::SessionStart,
553            Contributor::new(&provider_name, None),
554        )
555        .with_workspace_id(&workspace_id)
556        .with_metadata(
557            "role",
558            serde_json::json!(role.as_deref().unwrap_or("CRAFTER")),
559        )
560        .with_metadata("cwd", serde_json::json!(cwd));
561
562        trace_writer.append_safe(&trace).await;
563    }
564
565    #[allow(clippy::too_many_arguments)]
566    pub async fn create_session_from_inline(
567        &self,
568        session_id: String,
569        cwd: String,
570        workspace_id: String,
571        provider_name: String,
572        role: Option<String>,
573        model: Option<String>,
574        parent_session_id: Option<String>,
575        command: String,
576        args: Vec<String>,
577        options: SessionLaunchOptions,
578    ) -> Result<(String, String), String> {
579        let (ntx, _) = broadcast::channel::<serde_json::Value>(256);
580
581        let process = AcpProcess::spawn(
582            &command,
583            &args.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
584            &cwd,
585            ntx.clone(),
586            &provider_name,
587            &session_id,
588        )
589        .await?;
590
591        process
592            .initialize_with_timeout(options.initialize_timeout_ms)
593            .await?;
594
595        let acp_session_id = process.new_session(&cwd).await?;
596        self.register_managed_session(
597            session_id.clone(),
598            cwd.clone(),
599            workspace_id.clone(),
600            provider_name.clone(),
601            role.clone(),
602            model.clone(),
603            parent_session_id.clone(),
604            &options,
605            AgentProcessType::Acp(Arc::new(process)),
606            acp_session_id.clone(),
607            ntx.clone(),
608        )
609        .await;
610
611        tracing::info!(
612            "[AcpManager] Session {} created from inline command (provider: {}, agent session: {})",
613            session_id,
614            provider_name,
615            acp_session_id,
616        );
617
618        Ok((session_id, acp_session_id))
619    }
620
621    #[allow(clippy::too_many_arguments)]
622    pub async fn load_session_from_inline(
623        &self,
624        session_id: String,
625        cwd: String,
626        workspace_id: String,
627        provider_name: String,
628        role: Option<String>,
629        model: Option<String>,
630        parent_session_id: Option<String>,
631        command: String,
632        args: Vec<String>,
633        provider_session_id: Option<String>,
634        options: SessionLaunchOptions,
635    ) -> Result<(String, String), String> {
636        let (ntx, _) = broadcast::channel::<serde_json::Value>(256);
637
638        let process = AcpProcess::spawn(
639            &command,
640            &args.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
641            &cwd,
642            ntx.clone(),
643            &provider_name,
644            &session_id,
645        )
646        .await?;
647
648        process
649            .initialize_with_timeout(options.initialize_timeout_ms)
650            .await?;
651
652        let resolved_provider_session_id =
653            provider_session_id.unwrap_or_else(|| session_id.clone());
654        let acp_session_id = process
655            .load_session(&resolved_provider_session_id, &cwd)
656            .await?;
657
658        self.register_managed_session(
659            session_id.clone(),
660            cwd.clone(),
661            workspace_id.clone(),
662            provider_name.clone(),
663            role.clone(),
664            model.clone(),
665            parent_session_id.clone(),
666            &options,
667            AgentProcessType::Acp(Arc::new(process)),
668            acp_session_id.clone(),
669            ntx.clone(),
670        )
671        .await;
672
673        tracing::info!(
674            "[AcpManager] Session {} loaded from inline command (provider: {}, agent session: {})",
675            session_id,
676            provider_name,
677            acp_session_id,
678        );
679
680        Ok((session_id, acp_session_id))
681    }
682
683    #[allow(clippy::too_many_arguments)]
684    pub async fn create_session_with_options(
685        &self,
686        session_id: String,
687        cwd: String,
688        workspace_id: String,
689        provider: Option<String>,
690        role: Option<String>,
691        model: Option<String>,
692        parent_session_id: Option<String>,
693        tool_mode: Option<String>,
694        mcp_profile: Option<String>,
695        options: SessionLaunchOptions,
696    ) -> Result<(String, String), String> {
697        let provider_name = provider.as_deref().unwrap_or("opencode");
698
699        // Create the notification broadcast channel for this session
700        let (ntx, _) = broadcast::channel::<serde_json::Value>(256);
701        let claude_mcp_config = if provider_name == "claude" {
702            Some(mcp_setup::build_claude_mcp_config(
703                &workspace_id,
704                &session_id,
705                tool_mode.as_deref(),
706                mcp_profile.as_deref(),
707            ))
708        } else {
709            None
710        };
711
712        // Check if this is Claude (uses stream-json protocol, not ACP)
713        let (process_type, acp_session_id) = if provider_name == "claude" {
714            // Use Claude Code stream-json protocol
715            let config = ClaudeCodeConfig {
716                command: "claude".to_string(),
717                cwd: cwd.clone(),
718                display_name: format!("Claude-{}", &session_id[..8.min(session_id.len())]),
719                permission_mode: Some("bypassPermissions".to_string()),
720                mcp_configs: claude_mcp_config.into_iter().collect(),
721                append_system_prompt: options.specialist_system_prompt.clone(),
722                allowed_tools: options.allowed_native_tools.clone(),
723            };
724
725            let claude_process = ClaudeCodeProcess::spawn(config, ntx.clone()).await?;
726            let claude_session_id = claude_process
727                .session_id()
728                .await
729                .unwrap_or_else(|| format!("claude-{}", &session_id[..8.min(session_id.len())]));
730
731            (
732                AgentProcessType::Claude(Arc::new(claude_process)),
733                claude_session_id,
734            )
735        } else {
736            // Use standard ACP protocol
737            let preset = get_preset_by_id_with_registry(provider_name).await?;
738
739            if let Some(summary) = mcp_setup::ensure_mcp_for_provider(
740                provider_name,
741                &workspace_id,
742                &session_id,
743                tool_mode.as_deref(),
744                mcp_profile.as_deref(),
745            )
746            .await?
747            {
748                tracing::info!("[AcpManager] {}", summary);
749            }
750
751            // Build args: preset args + optional model flag
752            let mut extra_args: Vec<String> = preset.args.clone();
753            if let Some(provider_args) = options.provider_args.clone() {
754                extra_args.extend(provider_args);
755            }
756            if let Some(ref m) = model {
757                if !m.is_empty() {
758                    // opencode (and future providers) accept -m <model>
759                    extra_args.push("-m".to_string());
760                    extra_args.push(m.clone());
761                }
762            }
763
764            let preset_command = resolve_preset_command(&preset);
765            let process = AcpProcess::spawn(
766                &preset_command,
767                &extra_args.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
768                &cwd,
769                ntx.clone(),
770                &preset.name,
771                &session_id,
772            )
773            .await?;
774
775            // Initialize the protocol
776            process
777                .initialize_with_timeout(options.initialize_timeout_ms)
778                .await?;
779
780            // Create the agent session
781            let agent_session_id = process.new_session(&cwd).await?;
782
783            (AgentProcessType::Acp(Arc::new(process)), agent_session_id)
784        };
785
786        self.register_managed_session(
787            session_id.clone(),
788            cwd.clone(),
789            workspace_id.clone(),
790            provider_name.to_string(),
791            role.clone(),
792            model.clone(),
793            parent_session_id.clone(),
794            &options,
795            process_type,
796            acp_session_id.clone(),
797            ntx.clone(),
798        )
799        .await;
800
801        tracing::info!(
802            "[AcpManager] Session {} created (provider: {}, agent session: {})",
803            session_id,
804            provider_name,
805            acp_session_id,
806        );
807
808        Ok((session_id, acp_session_id))
809    }
810
811    /// Send a prompt to an existing session's agent process.
812    pub async fn prompt(&self, session_id: &str, text: &str) -> Result<serde_json::Value, String> {
813        self.mark_first_prompt_sent(session_id).await;
814
815        let (process, acp_session_id, preset_id, trace_writer) = {
816            let processes = self.processes.read().await;
817            let managed = processes
818                .get(session_id)
819                .ok_or_else(|| format!("No agent process for session: {}", session_id))?;
820            (
821                managed.process.clone(),
822                managed.acp_session_id.clone(),
823                managed.preset_id.clone(),
824                managed.trace_writer.clone(),
825            )
826        };
827
828        let is_alive = match &process {
829            AgentProcessType::Acp(p) => p.is_alive(),
830            AgentProcessType::Claude(p) => p.is_alive(),
831        };
832
833        if !is_alive {
834            return Err(format!("Agent ({}) process is not running", preset_id));
835        }
836
837        // Record UserMessage trace
838        let trace = TraceRecord::new(
839            session_id,
840            TraceEventType::UserMessage,
841            Contributor::new(&preset_id, None),
842        )
843        .with_conversation(TraceConversation {
844            turn: None,
845            role: Some("user".to_string()),
846            content_preview: Some(truncate_content(text, 500)),
847            full_content: None,
848        });
849
850        trace_writer.append_safe(&trace).await;
851
852        tracing::info!(
853            target: "routa_acp_prompt",
854            session_id = %session_id,
855            preset_id = %preset_id,
856            acp_session_id = %acp_session_id,
857            prompt_len = text.len(),
858            "acp prompt start"
859        );
860
861        let result = match &process {
862            AgentProcessType::Acp(p) => p.prompt(&acp_session_id, text).await,
863            AgentProcessType::Claude(p) => {
864                let stop_reason = p.prompt(text).await?;
865                Ok(serde_json::json!({ "stopReason": stop_reason }))
866            }
867        };
868
869        match &result {
870            Ok(_) => tracing::info!(
871                target: "routa_acp_prompt",
872                session_id = %session_id,
873                preset_id = %preset_id,
874                "acp prompt success"
875            ),
876            Err(error) => tracing::error!(
877                target: "routa_acp_prompt",
878                session_id = %session_id,
879                preset_id = %preset_id,
880                error = %error,
881                "acp prompt failed"
882            ),
883        }
884
885        result
886    }
887
888    /// Cancel the current prompt in a session.
889    pub async fn cancel(&self, session_id: &str) {
890        let processes = self.processes.read().await;
891        if let Some(managed) = processes.get(session_id) {
892            match &managed.process {
893                AgentProcessType::Acp(p) => p.cancel(&managed.acp_session_id).await,
894                AgentProcessType::Claude(p) => p.cancel().await,
895            }
896        }
897    }
898
899    /// Kill a session's agent process and remove it.
900    pub async fn kill_session(&self, session_id: &str) {
901        // Kill the process
902        if let Some(managed) = self.processes.write().await.remove(session_id) {
903            // Record SessionEnd trace before killing
904            let trace = TraceRecord::new(
905                session_id,
906                TraceEventType::SessionEnd,
907                Contributor::new(&managed.preset_id, None),
908            );
909            managed.trace_writer.append_safe(&trace).await;
910
911            match &managed.process {
912                AgentProcessType::Acp(p) => p.kill().await,
913                AgentProcessType::Claude(p) => p.kill().await,
914            }
915        }
916        // Remove session record
917        self.sessions.write().await.remove(session_id);
918        // Remove notification channel
919        self.notification_channels.write().await.remove(session_id);
920    }
921
922    /// Subscribe to SSE notifications for a session.
923    /// Returns a broadcast receiver that yields `session/update` JSON-RPC messages.
924    pub async fn subscribe(
925        &self,
926        session_id: &str,
927    ) -> Option<broadcast::Receiver<serde_json::Value>> {
928        let channels = self.notification_channels.read().await;
929        channels.get(session_id).map(|tx| tx.subscribe())
930    }
931
932    /// Check if a session's agent process is alive.
933    pub async fn is_alive(&self, session_id: &str) -> bool {
934        let processes = self.processes.read().await;
935        processes
936            .get(session_id)
937            .map(|m| match &m.process {
938                AgentProcessType::Acp(p) => p.is_alive(),
939                AgentProcessType::Claude(p) => p.is_alive(),
940            })
941            .unwrap_or(false)
942    }
943
944    /// Get the managed ACP session id for a live session.
945    pub async fn get_acp_session_id(&self, session_id: &str) -> Option<String> {
946        let processes = self.processes.read().await;
947        processes
948            .get(session_id)
949            .map(|managed| managed.acp_session_id.clone())
950    }
951
952    /// Get the preset ID for a session.
953    pub async fn get_preset_id(&self, session_id: &str) -> Option<String> {
954        let processes = self.processes.read().await;
955        processes.get(session_id).map(|m| m.preset_id.clone())
956    }
957
958    /// Check if a session uses Claude (stream-json protocol, not ACP).
959    pub async fn is_claude_session(&self, session_id: &str) -> bool {
960        let processes = self.processes.read().await;
961        processes
962            .get(session_id)
963            .map(|m| matches!(&m.process, AgentProcessType::Claude(_)))
964            .unwrap_or(false)
965    }
966
967    /// Send a prompt to Claude session and return immediately.
968    /// The actual response is streamed via the broadcast channel.
969    /// Use `subscribe()` to receive notifications.
970    pub async fn prompt_claude_async(&self, session_id: &str, text: &str) -> Result<(), String> {
971        let processes = self.processes.read().await;
972        let managed = processes
973            .get(session_id)
974            .ok_or_else(|| format!("No agent process for session: {}", session_id))?;
975
976        // Record trace
977        let trace = TraceRecord::new(
978            session_id,
979            TraceEventType::UserMessage,
980            Contributor::new(&managed.preset_id, None),
981        )
982        .with_conversation(TraceConversation {
983            turn: None,
984            role: Some("user".to_string()),
985            content_preview: Some(truncate_content(text, 500)),
986            full_content: Some(text.to_string()),
987        });
988
989        managed.trace_writer.append_safe(&trace).await;
990
991        match &managed.process {
992            AgentProcessType::Claude(p) => {
993                // Spawn the prompt in a background task so we can return immediately
994                let process = Arc::clone(p);
995                let text = text.to_string();
996                tokio::spawn(async move {
997                    let _ = process.prompt(&text).await;
998                });
999                Ok(())
1000            }
1001            AgentProcessType::Acp(_) => {
1002                Err("prompt_claude_async is only for Claude sessions".to_string())
1003            }
1004        }
1005    }
1006}
1007
1008// ─── ACP Presets ────────────────────────────────────────────────────────
1009
1010/// Resume/continuation capability metadata for a provider.
1011#[derive(Debug, Clone, Serialize, Deserialize)]
1012#[serde(rename_all = "camelCase")]
1013pub struct ResumeCapability {
1014    pub supported: bool,
1015    /// "native" | "replay" | "both"
1016    pub mode: String,
1017    #[serde(default)]
1018    #[serde(skip_serializing_if = "Option::is_none")]
1019    pub supports_fork: Option<bool>,
1020    #[serde(default)]
1021    #[serde(skip_serializing_if = "Option::is_none")]
1022    pub supports_list: Option<bool>,
1023}
1024
1025/// ACP provider presets for known coding agents.
1026#[derive(Debug, Clone, Serialize, Deserialize)]
1027pub struct AcpPreset {
1028    /// Unique identifier (lowercase, e.g., "claude", "opencode")
1029    pub id: String,
1030    /// Human-readable display name (e.g., "Claude Code", "OpenCode")
1031    pub name: String,
1032    pub command: String,
1033    pub args: Vec<String>,
1034    pub description: String,
1035    #[serde(default)]
1036    #[serde(skip_serializing_if = "Option::is_none")]
1037    pub env_bin_override: Option<String>,
1038    /// Resume/continuation capabilities for this provider.
1039    #[serde(default)]
1040    #[serde(skip_serializing_if = "Option::is_none")]
1041    pub resume: Option<ResumeCapability>,
1042}
1043
1044/// Get the list of known ACP agent presets (static/builtin only).
1045pub fn get_presets() -> Vec<AcpPreset> {
1046    vec![
1047        AcpPreset {
1048            id: "opencode".to_string(),
1049            name: "OpenCode".to_string(),
1050            command: "opencode".to_string(),
1051            args: vec!["acp".to_string()],
1052            description: "OpenCode AI coding agent".to_string(),
1053            env_bin_override: Some("OPENCODE_BIN".to_string()),
1054            resume: Some(ResumeCapability { supported: true, mode: "replay".to_string(), supports_fork: None, supports_list: None }),
1055        },
1056        AcpPreset {
1057            id: "gemini".to_string(),
1058            name: "Gemini".to_string(),
1059            command: "gemini".to_string(),
1060            args: vec!["--experimental-acp".to_string()],
1061            description: "Google Gemini CLI".to_string(),
1062            env_bin_override: None,
1063            resume: None,
1064        },
1065        AcpPreset {
1066            id: "codex-acp".to_string(),
1067            name: "Codex".to_string(),
1068            command: "codex-acp".to_string(),
1069            args: vec![],
1070            description: "OpenAI Codex CLI (codex-acp wrapper)".to_string(),
1071            env_bin_override: Some("CODEX_ACP_BIN".to_string()),
1072            resume: Some(ResumeCapability { supported: true, mode: "both".to_string(), supports_fork: None, supports_list: Some(true) }),
1073        },
1074        AcpPreset {
1075            id: "copilot".to_string(),
1076            name: "GitHub Copilot".to_string(),
1077            command: "copilot".to_string(),
1078            args: vec![
1079                "--acp".to_string(),
1080                "--allow-all-tools".to_string(),
1081                "--no-ask-user".to_string(),
1082            ],
1083            description: "GitHub Copilot CLI".to_string(),
1084            env_bin_override: Some("COPILOT_BIN".to_string()),
1085            resume: None,
1086        },
1087        AcpPreset {
1088            id: "auggie".to_string(),
1089            name: "Auggie".to_string(),
1090            command: "auggie".to_string(),
1091            args: vec!["--acp".to_string()],
1092            description: "Augment Code's AI agent".to_string(),
1093            env_bin_override: None,
1094            resume: None,
1095        },
1096        AcpPreset {
1097            id: "kimi".to_string(),
1098            name: "Kimi".to_string(),
1099            command: "kimi".to_string(),
1100            args: vec!["acp".to_string()],
1101            description: "Moonshot AI's Kimi CLI".to_string(),
1102            env_bin_override: None,
1103            resume: None,
1104        },
1105        AcpPreset {
1106            id: "kiro".to_string(),
1107            name: "Kiro".to_string(),
1108            command: "kiro-cli".to_string(),
1109            args: vec!["acp".to_string()],
1110            description: "Amazon Kiro AI coding agent".to_string(),
1111            env_bin_override: Some("KIRO_BIN".to_string()),
1112            resume: None,
1113        },
1114        AcpPreset {
1115            id: "qoder".to_string(),
1116            name: "Qoder".to_string(),
1117            command: "qodercli".to_string(),
1118            args: vec!["--acp".to_string()],
1119            description: "Qoder AI coding agent".to_string(),
1120            env_bin_override: Some("QODER_BIN".to_string()),
1121            resume: None,
1122        },
1123        AcpPreset {
1124            id: "claude".to_string(),
1125            name: "Claude Code".to_string(),
1126            command: "claude".to_string(),
1127            // Claude Code uses stream-json protocol, not ACP flags
1128            // Args are unused since we use ClaudeCodeProcess directly
1129            args: vec![],
1130            description: "Anthropic Claude Code (stream-json protocol)".to_string(),
1131            env_bin_override: Some("CLAUDE_BIN".to_string()),
1132            resume: Some(ResumeCapability { supported: true, mode: "replay".to_string(), supports_fork: Some(true), supports_list: None }),
1133        },
1134    ]
1135}
1136
1137/// Get a static preset by ID (synchronous, no registry lookup).
1138pub fn get_preset_by_id(id: &str) -> Option<AcpPreset> {
1139    let normalized_id = match id {
1140        "codex" => "codex-acp",
1141        "qodercli" => "qoder",
1142        other => other,
1143    };
1144    get_presets().into_iter().find(|p| p.id == normalized_id)
1145}
1146
1147/// Get the resume capability for a provider ID (synchronous).
1148pub fn get_resume_capability(provider: &str) -> Option<ResumeCapability> {
1149    get_preset_by_id(provider).and_then(|p| p.resume)
1150}
1151
1152/// Get a preset by ID, checking both static presets and registry.
1153/// Static presets take precedence.
1154///
1155/// Supports suffixed IDs like "auggie-registry" to explicitly request
1156/// the registry version when both built-in and registry versions exist.
1157pub async fn get_preset_by_id_with_registry(id: &str) -> Result<AcpPreset, String> {
1158    let normalized_id = match id {
1159        "codex" => "codex-acp",
1160        "qodercli" => "qoder",
1161        other => other,
1162    };
1163
1164    // Handle suffixed IDs (e.g., "auggie-registry")
1165    // This allows explicit selection of registry version when both exist
1166    const REGISTRY_SUFFIX: &str = "-registry";
1167    if let Some(base_id) = normalized_id.strip_suffix(REGISTRY_SUFFIX) {
1168        let mut preset = get_registry_preset(base_id).await?;
1169        // Keep the suffixed ID in the returned preset for consistency
1170        preset.id = id.to_string();
1171        return Ok(preset);
1172    }
1173
1174    // Check static presets first (match by id, not name)
1175    if let Some(mut preset) = get_presets().into_iter().find(|p| p.id == normalized_id) {
1176        if preset.id != id {
1177            preset.id = id.to_string();
1178        }
1179        return Ok(preset);
1180    }
1181
1182    // Fall back to registry
1183    let mut preset = get_registry_preset(normalized_id).await?;
1184    if preset.id != id {
1185        preset.id = id.to_string();
1186    }
1187    Ok(preset)
1188}
1189
1190/// Get a preset from the ACP registry by ID.
1191async fn get_registry_preset(id: &str) -> Result<AcpPreset, String> {
1192    let registry: AcpRegistry = fetch_registry().await?;
1193
1194    // Find the agent
1195    let agent = registry
1196        .agents
1197        .into_iter()
1198        .find(|a| a.id == id)
1199        .ok_or_else(|| format!("Agent '{}' not found in registry", id))?;
1200
1201    // Build command from distribution
1202    let (command, args) = if let Some(ref npx) = agent.distribution.npx {
1203        let mut args = vec!["-y".to_string(), npx.package.clone()];
1204        args.extend(npx.args.clone());
1205        ("npx".to_string(), args)
1206    } else if let Some(ref uvx) = agent.distribution.uvx {
1207        let mut args = vec![uvx.package.clone()];
1208        args.extend(uvx.args.clone());
1209        ("uvx".to_string(), args)
1210    } else {
1211        return Err(format!(
1212            "Agent '{}' has no supported distribution (npx/uvx)",
1213            id
1214        ));
1215    };
1216
1217    Ok(AcpPreset {
1218        id: agent.id.clone(),
1219        name: agent.name,
1220        command,
1221        args,
1222        description: agent.description,
1223        env_bin_override: None,
1224        resume: None,
1225    })
1226}
1227
1228fn resolve_preset_command(preset: &AcpPreset) -> String {
1229    if let Some(env_var) = &preset.env_bin_override {
1230        if let Ok(custom_command) = std::env::var(env_var) {
1231            let trimmed = custom_command.trim();
1232            if !trimmed.is_empty() {
1233                return trimmed.to_string();
1234            }
1235        }
1236    }
1237
1238    crate::shell_env::which(&preset.command).unwrap_or_else(|| preset.command.clone())
1239}
1240
1241// ─── Utility Functions ─────────────────────────────────────────────────────
1242
/// Shorten `text` to at most `max_len` characters for storage in traces.
///
/// Lengths are counted in `char`s, so multi-byte text is never split inside a
/// character. Text within the limit is returned unchanged. Longer text is cut
/// to `max_len - 3` characters followed by "...", except when `max_len` is 3
/// or less, where the first `max_len` characters are kept with no ellipsis.
fn truncate_content(text: &str, max_len: usize) -> String {
    // `nth(max_len)` yields a character only when the text exceeds the limit.
    if text.chars().nth(max_len).is_none() {
        return text.to_owned();
    }

    let (kept_chars, suffix) = if max_len > 3 {
        (max_len - 3, "...")
    } else {
        (max_len, "")
    };

    let mut shortened: String = text.chars().take(kept_chars).collect();
    shortened.push_str(suffix);
    shortened
}
1254
#[cfg(test)]
mod tests {
    use super::{
        get_preset_by_id_with_registry, get_presets, truncate_content, AcpManager, AcpSessionRecord,
    };
    use std::collections::HashMap;
    use std::sync::Arc;
    use tokio::sync::RwLock;

    /// Manager with every map empty, built field by field.
    fn empty_manager() -> AcpManager {
        AcpManager {
            sessions: Arc::new(RwLock::new(HashMap::new())),
            processes: Arc::new(RwLock::new(HashMap::new())),
            notification_channels: Arc::new(RwLock::new(HashMap::new())),
            history: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// The `turn_complete` update payload shared by the emit tests.
    fn cancelled_turn_update() -> serde_json::Value {
        serde_json::json!({
            "sessionUpdate": "turn_complete",
            "stopReason": "cancelled"
        })
    }

    /// IDs of all built-in presets.
    fn builtin_ids() -> Vec<String> {
        get_presets().into_iter().map(|preset| preset.id).collect()
    }

    #[test]
    fn static_presets_include_codex_acp_for_codex_alias() {
        assert!(builtin_ids().iter().any(|id| id == "codex-acp"));
    }

    #[test]
    fn static_presets_include_qoder() {
        assert!(builtin_ids().iter().any(|id| id == "qoder"));
    }

    #[tokio::test]
    async fn qodercli_alias_resolves_to_qoder_preset() {
        let resolved = get_preset_by_id_with_registry("qodercli")
            .await
            .expect("qodercli alias should resolve");

        // The alias is kept as the reported ID; the launch data is qoder's.
        assert_eq!(resolved.id, "qodercli");
        assert_eq!(resolved.command, "qodercli");
        assert_eq!(resolved.args, vec!["--acp".to_string()]);
    }

    #[tokio::test]
    async fn mark_first_prompt_sent_updates_live_session_record() {
        let manager = AcpManager::new();
        let session_id = "session-1".to_string();
        let record = AcpSessionRecord {
            session_id: session_id.clone(),
            name: None,
            cwd: ".".to_string(),
            workspace_id: "default".to_string(),
            routa_agent_id: None,
            provider: Some("opencode".to_string()),
            role: Some("CRAFTER".to_string()),
            mode_id: None,
            model: None,
            created_at: chrono::Utc::now().to_rfc3339(),
            first_prompt_sent: false,
            parent_session_id: None,
            specialist_id: None,
            specialist_system_prompt: None,
        };
        manager
            .sessions
            .write()
            .await
            .insert(session_id.clone(), record);

        manager.mark_first_prompt_sent(&session_id).await;

        let stored = manager.get_session(&session_id).await.expect("session");
        assert!(stored.first_prompt_sent);
    }

    #[tokio::test]
    async fn push_to_history_skips_parent_child_forwarding_noise() {
        let manager = empty_manager();
        let forwarded = serde_json::json!({
            "sessionId": "parent",
            "childAgentId": "child-1",
            "update": { "sessionUpdate": "agent_message", "content": { "type": "text", "text": "delegated" } }
        });

        manager.push_to_history("parent", forwarded).await;

        let recorded = manager
            .get_session_history("parent")
            .await
            .unwrap_or_default();
        assert!(recorded.is_empty());
    }

    #[tokio::test]
    async fn emit_session_update_broadcasts_when_channel_exists() {
        let (tx, mut rx) = tokio::sync::broadcast::channel(8);
        let manager = empty_manager();
        manager
            .notification_channels
            .write()
            .await
            .insert("session-1".to_string(), tx);

        manager
            .emit_session_update("session-1", cancelled_turn_update())
            .await
            .expect("emit should succeed");

        let broadcast = rx.recv().await.expect("broadcast event");
        let update = &broadcast["params"]["update"];
        assert_eq!(update["sessionUpdate"].as_str(), Some("turn_complete"));
        assert_eq!(update["stopReason"].as_str(), Some("cancelled"));
    }

    #[tokio::test]
    async fn emit_session_update_persists_history_without_channel() {
        let manager = empty_manager();

        manager
            .emit_session_update("session-1", cancelled_turn_update())
            .await
            .expect("emit should succeed");

        let recorded = manager
            .get_session_history("session-1")
            .await
            .expect("history should exist");
        assert_eq!(recorded.len(), 1);
        assert_eq!(
            recorded[0]["update"]["sessionUpdate"].as_str(),
            Some("turn_complete")
        );
    }

    #[test]
    fn rewrite_notification_session_id_overrides_provider_session_id() {
        let provider_notification = serde_json::json!({
            "sessionId": "provider-session",
            "update": { "sessionUpdate": "agent_message_chunk", "content": { "text": "hi" } }
        });

        let rewritten =
            AcpManager::rewrite_notification_session_id("child-session", provider_notification);

        assert_eq!(rewritten["sessionId"].as_str(), Some("child-session"));
    }

    #[test]
    fn truncate_content_handles_unicode_boundaries() {
        let mixed = "你好世界ABC";
        assert_eq!(truncate_content(mixed, 5), "你好...");
        assert_eq!(truncate_content(mixed, 3), "你好世");
        assert_eq!(truncate_content("短文本", 10), "短文本");
    }
}