Skip to main content

routa_core/acp/
mod.rs

1//! ACP (Agent Client Protocol) integration.
2//!
3//! Manages ACP agent processes and provides JSON-RPC communication
4//! between the desktop client and coding agents (e.g. OpenCode, Claude, Copilot).
5//!
6//! Architecture (matches the Next.js `AcpProcessManager`):
7//!   - `session/new`    → spawns a child process, sends `initialize` + `session/new`
8//!   - `session/prompt` → reuses the live process, sends `session/prompt`
9//!   - `session/cancel` → sends cancellation notification
10//!   - SSE GET          → subscribes to `broadcast` channel for `session/update` events
11//!
12//! **Claude Code** uses a different protocol (stream-json) instead of ACP.
13//! The `ClaudeCodeProcess` translates Claude's output into ACP-compatible
14//! `session/update` notifications for frontend compatibility.
15//!
16//! **Agent Trace**: All sessions record trace events to JSONL files for
17//! attribution tracking (which model/session/tool affected which files and when).
18
19pub mod binary_manager;
20pub mod claude_code_process;
21pub mod docker;
22pub mod installation_state;
23pub mod mcp_setup;
24pub mod paths;
25pub mod process;
26pub mod provider_adapter;
27pub mod registry_fetch;
28pub mod registry_types;
29pub mod runtime_manager;
30pub mod terminal_manager;
31pub mod warmup;
32
33pub use binary_manager::AcpBinaryManager;
34pub use claude_code_process::{ClaudeCodeConfig, ClaudeCodeProcess};
35pub use installation_state::AcpInstallationState;
36pub use paths::AcpPaths;
37pub use registry_fetch::{fetch_registry, fetch_registry_json};
38pub use registry_types::*;
39pub use runtime_manager::{current_platform, AcpRuntimeManager, RuntimeInfo, RuntimeType};
40pub use warmup::{AcpWarmupService, WarmupState, WarmupStatus};
41
42use std::collections::HashMap;
43use std::sync::Arc;
44
45use serde::{Deserialize, Serialize};
46use tokio::sync::{broadcast, RwLock};
47
48use crate::trace::{Contributor, TraceConversation, TraceEventType, TraceRecord, TraceWriter};
49use process::AcpProcess;
50
/// Numeric value of the Win32 `CREATE_NO_WINDOW` process-creation flag.
///
/// Only compiled on Windows targets.
// NOTE(review): presumably passed as a creation flag when spawning agent child
// processes so no console window appears — confirm at the call sites.
#[cfg(windows)]
pub(crate) const CREATE_NO_WINDOW: u32 = 0x0800_0000;
53
54// ─── Session Record ─────────────────────────────────────────────────────
55
/// Record of an active ACP session persisted for UI listing.
///
/// Serialized with camelCase keys. Optional fields annotated with
/// `skip_serializing_if` are omitted from the output when `None`; the other
/// optional fields (`provider`, `role`, `mode_id`) are always emitted.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AcpSessionRecord {
    /// Our own session ID (the key used by `AcpManager`).
    pub session_id: String,
    /// Display name; `None` until set through `AcpManager::rename_session`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// Working directory the session was created with.
    pub cwd: String,
    /// Workspace this session belongs to.
    pub workspace_id: String,
    /// ROUTA agent ID; `None` until set through `AcpManager::set_routa_agent_id`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub routa_agent_id: Option<String>,
    /// Provider name the session was registered with (e.g. "opencode", "claude").
    pub provider: Option<String>,
    /// Agent role; registration stores "CRAFTER" when the caller supplies none.
    pub role: Option<String>,
    /// Mode identifier; registration always stores `None`.
    pub mode_id: Option<String>,
    /// Model requested at creation time, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
    /// Creation timestamp as an RFC 3339 string (UTC at registration).
    pub created_at: String,
    /// Set to `true` by `AcpManager::mark_first_prompt_sent`; defaults to
    /// `false` when absent from deserialized input.
    #[serde(default)]
    pub first_prompt_sent: bool,
    /// Parent session ID for CRAFTER child sessions
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_session_id: Option<String>,
    /// Specialist ID copied from `SessionLaunchOptions::specialist_id`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub specialist_id: Option<String>,
    /// Specialist system prompt copied from
    /// `SessionLaunchOptions::specialist_system_prompt`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub specialist_system_prompt: Option<String>,
}
83
/// Optional launch-time settings for a new session.
///
/// `Default` yields every field as `None`, which is what
/// `AcpManager::create_session` passes.
#[derive(Debug, Clone, Default)]
pub struct SessionLaunchOptions {
    /// Stored on the session record as `specialist_id`.
    pub specialist_id: Option<String>,
    /// Stored on the session record and, for Claude sessions, passed as the
    /// `append_system_prompt` of the Claude Code config.
    pub specialist_system_prompt: Option<String>,
    /// For Claude sessions, passed as the `allowed_tools` of the Claude Code config.
    pub allowed_native_tools: Option<Vec<String>>,
    /// Forwarded to `AcpProcess::initialize_with_timeout` for ACP sessions.
    pub initialize_timeout_ms: Option<u64>,
    /// Extra CLI arguments appended after the preset's own arguments for
    /// preset-based ACP sessions.
    pub provider_args: Option<Vec<String>>,
}
92
93// ─── Managed Process ────────────────────────────────────────────────────
94
/// Process type enum to support both ACP and Claude stream-json protocols.
///
/// Each variant holds its process behind an `Arc`, so cloning this enum only
/// bumps a reference count and yields another handle to the same process.
#[derive(Clone)]
enum AgentProcessType {
    /// Standard ACP protocol (opencode, gemini, copilot, etc.)
    Acp(Arc<AcpProcess>),
    /// Claude Code stream-json protocol
    Claude(Arc<ClaudeCodeProcess>),
}
103
104impl AgentProcessType {
105    /// Kill the underlying process.
106    async fn kill(&self) {
107        match self {
108            AgentProcessType::Acp(process) => process.kill().await,
109            AgentProcessType::Claude(process) => process.kill().await,
110        }
111    }
112}
113
/// A managed agent process with its metadata.
///
/// One entry is stored per session in `AcpManager::processes`.
struct ManagedProcess {
    /// Handle to the live agent process (ACP or Claude).
    process: AgentProcessType,
    /// The agent's own session ID (returned by `session/new` or claude's session_id).
    acp_session_id: String,
    /// Provider name the session was registered with; used as the trace
    /// contributor and in error messages.
    preset_id: String,
    /// RFC 3339 creation timestamp (currently not read back).
    #[allow(dead_code)]
    created_at: String,
    /// Trace writer for recording agent activities to JSONL
    trace_writer: TraceWriter,
    /// Working directory (for contributor context)
    #[allow(dead_code)]
    cwd: String,
}
128
129// ─── ACP Manager ────────────────────────────────────────────────────────
130
/// Manages ACP agent sessions and process lifecycle.
///
/// Each session maps to a long-lived child process that communicates via
/// stdio JSON-RPC. Notifications are forwarded to subscribers via broadcast.
///
/// All state lives behind `Arc<RwLock<..>>`, so cloning the manager produces
/// another handle to the same maps rather than an independent copy.
#[derive(Clone)]
pub struct AcpManager {
    /// Our sessionId → session record (for UI listing)
    sessions: Arc<RwLock<HashMap<String, AcpSessionRecord>>>,
    /// Our sessionId → managed process (the live agent)
    processes: Arc<RwLock<HashMap<String, ManagedProcess>>>,
    /// Our sessionId → broadcast sender for SSE notifications
    notification_channels: Arc<RwLock<HashMap<String, broadcast::Sender<serde_json::Value>>>>,
    /// Our sessionId → message history (session/update notifications),
    /// capped per session by `push_to_history`
    history: Arc<RwLock<HashMap<String, Vec<serde_json::Value>>>>,
}
146
147impl Default for AcpManager {
148    fn default() -> Self {
149        Self::new()
150    }
151}
152
153impl AcpManager {
154    pub fn rewrite_notification_session_id(
155        session_id: &str,
156        mut notification: serde_json::Value,
157    ) -> serde_json::Value {
158        if let Some(object) = notification.as_object_mut() {
159            object.insert(
160                "sessionId".to_string(),
161                serde_json::Value::String(session_id.to_string()),
162            );
163        }
164        notification
165    }
166
167    pub fn new() -> Self {
168        Self {
169            sessions: Arc::new(RwLock::new(HashMap::new())),
170            processes: Arc::new(RwLock::new(HashMap::new())),
171            notification_channels: Arc::new(RwLock::new(HashMap::new())),
172            history: Arc::new(RwLock::new(HashMap::new())),
173        }
174    }
175
176    /// List all session records.
177    pub async fn list_sessions(&self) -> Vec<AcpSessionRecord> {
178        let sessions = self.sessions.read().await;
179        sessions.values().cloned().collect()
180    }
181
182    /// Get a session record by ID.
183    pub async fn get_session(&self, session_id: &str) -> Option<AcpSessionRecord> {
184        let sessions = self.sessions.read().await;
185        sessions.get(session_id).cloned()
186    }
187
188    /// Rename a session.
189    /// Returns `Some(())` if the session was found and renamed, `None` if not found.
190    pub async fn rename_session(&self, session_id: &str, name: &str) -> Option<()> {
191        let mut sessions = self.sessions.write().await;
192        let session = sessions.get_mut(session_id)?;
193        session.name = Some(name.to_string());
194        Some(())
195    }
196
197    /// Attach a ROUTA agent ID to an existing session record.
198    /// Returns `Some(())` if the session was found, `None` if not found.
199    pub async fn set_routa_agent_id(&self, session_id: &str, routa_agent_id: &str) -> Option<()> {
200        let mut sessions = self.sessions.write().await;
201        let session = sessions.get_mut(session_id)?;
202        session.routa_agent_id = Some(routa_agent_id.to_string());
203        Some(())
204    }
205
206    /// Delete a session.
207    /// Returns `Some(())` if the session was found and deleted, `None` if not found.
208    pub async fn delete_session(&self, session_id: &str) -> Option<()> {
209        let mut sessions = self.sessions.write().await;
210        let mut processes = self.processes.write().await;
211        let mut channels = self.notification_channels.write().await;
212        let mut history = self.history.write().await;
213
214        // Remove session record
215        sessions.remove(session_id)?;
216
217        // Kill the process if it exists
218        if let Some(managed) = processes.remove(session_id) {
219            let _ = managed.process.kill().await;
220        }
221
222        // Remove notification channel
223        channels.remove(session_id);
224
225        // Remove history
226        history.remove(session_id);
227
228        Some(())
229    }
230
231    /// Get session message history.
232    /// Returns `Some(history)` if the session exists, `None` if not found.
233    pub async fn get_session_history(&self, session_id: &str) -> Option<Vec<serde_json::Value>> {
234        let history = self.history.read().await;
235        history.get(session_id).cloned()
236    }
237
238    /// Add a notification to session history.
239    /// Child agent notifications (those with `childAgentId`) are NOT stored in the
240    /// parent session's history — they would flood out the ROUTA coordinator's own
241    /// messages. Child messages are persisted in their own child session's history.
242    pub async fn push_to_history(&self, session_id: &str, notification: serde_json::Value) {
243        // Skip child agent notifications to prevent flooding parent history
244        if notification.get("childAgentId").is_some() {
245            return;
246        }
247        let mut history = self.history.write().await;
248        let entries = history
249            .entry(session_id.to_string())
250            .or_insert_with(Vec::new);
251        entries.push(notification);
252        // Cap at 500 entries (same limit as Next.js backend)
253        if entries.len() > 500 {
254            let drain_count = entries.len() - 500;
255            entries.drain(0..drain_count);
256        }
257    }
258
259    /// Broadcast a synthetic session/update event and persist it into in-memory history.
260    pub async fn emit_session_update(
261        &self,
262        session_id: &str,
263        update: serde_json::Value,
264    ) -> Result<(), String> {
265        let message = serde_json::json!({
266            "jsonrpc": "2.0",
267            "method": "session/update",
268            "params": {
269                "sessionId": session_id,
270                "update": update,
271            }
272        });
273
274        if let Some(channel) = self
275            .notification_channels
276            .read()
277            .await
278            .get(session_id)
279            .cloned()
280        {
281            let _ = channel.send(message.clone());
282        } else {
283            let params = message
284                .get("params")
285                .cloned()
286                .ok_or_else(|| "Missing params in synthetic session/update".to_string())?;
287            self.push_to_history(
288                session_id,
289                Self::rewrite_notification_session_id(session_id, params),
290            )
291            .await;
292        }
293        Ok(())
294    }
295
296    /// Mark a session as having had its first prompt dispatched.
297    pub async fn mark_first_prompt_sent(&self, session_id: &str) {
298        let mut sessions = self.sessions.write().await;
299        if let Some(session) = sessions.get_mut(session_id) {
300            session.first_prompt_sent = true;
301        }
302    }
303
304    /// Create a new ACP session: spawn agent process, initialize, create session.
305    /// Supports both static presets and registry-based agents.
306    /// **Claude** uses stream-json protocol instead of ACP.
307    ///
308    /// Returns `(our_session_id, agent_session_id)`.
309    #[allow(clippy::too_many_arguments)]
310    pub async fn create_session(
311        &self,
312        session_id: String,
313        cwd: String,
314        workspace_id: String,
315        provider: Option<String>,
316        role: Option<String>,
317        model: Option<String>,
318        parent_session_id: Option<String>,
319        tool_mode: Option<String>,
320        mcp_profile: Option<String>,
321    ) -> Result<(String, String), String> {
322        self.create_session_with_options(
323            session_id,
324            cwd,
325            workspace_id,
326            provider,
327            role,
328            model,
329            parent_session_id,
330            tool_mode,
331            mcp_profile,
332            SessionLaunchOptions::default(),
333        )
334        .await
335    }
336
337    fn spawn_history_mirror(&self, session_id: &str, ntx: &broadcast::Sender<serde_json::Value>) {
338        let history_manager = self.clone();
339        let history_session_id = session_id.to_string();
340        let mut history_rx = ntx.subscribe();
341        tokio::spawn(async move {
342            loop {
343                match history_rx.recv().await {
344                    Ok(message) => {
345                        let params = match message.get("params") {
346                            Some(value) => value.clone(),
347                            None => continue,
348                        };
349                        history_manager
350                            .push_to_history(
351                                &history_session_id,
352                                Self::rewrite_notification_session_id(&history_session_id, params),
353                            )
354                            .await;
355                    }
356                    Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
357                    Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
358                        tracing::warn!(
359                            "[AcpManager] Dropped {} session/update notifications for {}",
360                            skipped,
361                            history_session_id
362                        );
363                    }
364                }
365            }
366        });
367    }
368
369    #[allow(clippy::too_many_arguments)]
370    async fn register_managed_session(
371        &self,
372        session_id: String,
373        cwd: String,
374        workspace_id: String,
375        provider_name: String,
376        role: Option<String>,
377        model: Option<String>,
378        parent_session_id: Option<String>,
379        options: &SessionLaunchOptions,
380        process_type: AgentProcessType,
381        acp_session_id: String,
382        ntx: broadcast::Sender<serde_json::Value>,
383    ) {
384        let created_at = chrono::Utc::now().to_rfc3339();
385        let trace_writer = TraceWriter::new(&cwd);
386        let record = AcpSessionRecord {
387            session_id: session_id.clone(),
388            name: None,
389            cwd: cwd.clone(),
390            workspace_id: workspace_id.clone(),
391            routa_agent_id: None,
392            provider: Some(provider_name.clone()),
393            role: role.clone().or(Some("CRAFTER".to_string())),
394            mode_id: None,
395            model: model.clone(),
396            created_at: created_at.clone(),
397            first_prompt_sent: false,
398            parent_session_id: parent_session_id.clone(),
399            specialist_id: options.specialist_id.clone(),
400            specialist_system_prompt: options.specialist_system_prompt.clone(),
401        };
402
403        self.sessions
404            .write()
405            .await
406            .insert(session_id.clone(), record);
407        self.processes.write().await.insert(
408            session_id.clone(),
409            ManagedProcess {
410                process: process_type,
411                acp_session_id: acp_session_id.clone(),
412                preset_id: provider_name.clone(),
413                created_at,
414                trace_writer: trace_writer.clone(),
415                cwd: cwd.clone(),
416            },
417        );
418        self.notification_channels
419            .write()
420            .await
421            .insert(session_id.clone(), ntx.clone());
422        self.spawn_history_mirror(&session_id, &ntx);
423
424        let trace = TraceRecord::new(
425            &session_id,
426            TraceEventType::SessionStart,
427            Contributor::new(&provider_name, None),
428        )
429        .with_workspace_id(&workspace_id)
430        .with_metadata(
431            "role",
432            serde_json::json!(role.as_deref().unwrap_or("CRAFTER")),
433        )
434        .with_metadata("cwd", serde_json::json!(cwd));
435
436        trace_writer.append_safe(&trace).await;
437    }
438
439    #[allow(clippy::too_many_arguments)]
440    pub async fn create_session_from_inline(
441        &self,
442        session_id: String,
443        cwd: String,
444        workspace_id: String,
445        provider_name: String,
446        role: Option<String>,
447        model: Option<String>,
448        parent_session_id: Option<String>,
449        command: String,
450        args: Vec<String>,
451        options: SessionLaunchOptions,
452    ) -> Result<(String, String), String> {
453        let (ntx, _) = broadcast::channel::<serde_json::Value>(256);
454
455        let process = AcpProcess::spawn(
456            &command,
457            &args.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
458            &cwd,
459            ntx.clone(),
460            &provider_name,
461            &session_id,
462        )
463        .await?;
464
465        process
466            .initialize_with_timeout(options.initialize_timeout_ms)
467            .await?;
468
469        let acp_session_id = process.new_session(&cwd).await?;
470        self.register_managed_session(
471            session_id.clone(),
472            cwd.clone(),
473            workspace_id.clone(),
474            provider_name.clone(),
475            role.clone(),
476            model.clone(),
477            parent_session_id.clone(),
478            &options,
479            AgentProcessType::Acp(Arc::new(process)),
480            acp_session_id.clone(),
481            ntx.clone(),
482        )
483        .await;
484
485        tracing::info!(
486            "[AcpManager] Session {} created from inline command (provider: {}, agent session: {})",
487            session_id,
488            provider_name,
489            acp_session_id,
490        );
491
492        Ok((session_id, acp_session_id))
493    }
494
495    #[allow(clippy::too_many_arguments)]
496    pub async fn create_session_with_options(
497        &self,
498        session_id: String,
499        cwd: String,
500        workspace_id: String,
501        provider: Option<String>,
502        role: Option<String>,
503        model: Option<String>,
504        parent_session_id: Option<String>,
505        tool_mode: Option<String>,
506        mcp_profile: Option<String>,
507        options: SessionLaunchOptions,
508    ) -> Result<(String, String), String> {
509        let provider_name = provider.as_deref().unwrap_or("opencode");
510
511        // Create the notification broadcast channel for this session
512        let (ntx, _) = broadcast::channel::<serde_json::Value>(256);
513        let claude_mcp_config = if provider_name == "claude" {
514            Some(mcp_setup::build_claude_mcp_config(
515                &workspace_id,
516                &session_id,
517                tool_mode.as_deref(),
518                mcp_profile.as_deref(),
519            ))
520        } else {
521            None
522        };
523
524        // Check if this is Claude (uses stream-json protocol, not ACP)
525        let (process_type, acp_session_id) = if provider_name == "claude" {
526            // Use Claude Code stream-json protocol
527            let config = ClaudeCodeConfig {
528                command: "claude".to_string(),
529                cwd: cwd.clone(),
530                display_name: format!("Claude-{}", &session_id[..8.min(session_id.len())]),
531                permission_mode: Some("bypassPermissions".to_string()),
532                mcp_configs: claude_mcp_config.into_iter().collect(),
533                append_system_prompt: options.specialist_system_prompt.clone(),
534                allowed_tools: options.allowed_native_tools.clone(),
535            };
536
537            let claude_process = ClaudeCodeProcess::spawn(config, ntx.clone()).await?;
538            let claude_session_id = claude_process
539                .session_id()
540                .await
541                .unwrap_or_else(|| format!("claude-{}", &session_id[..8.min(session_id.len())]));
542
543            (
544                AgentProcessType::Claude(Arc::new(claude_process)),
545                claude_session_id,
546            )
547        } else {
548            // Use standard ACP protocol
549            let preset = get_preset_by_id_with_registry(provider_name).await?;
550
551            if let Some(summary) = mcp_setup::ensure_mcp_for_provider(
552                provider_name,
553                &workspace_id,
554                &session_id,
555                tool_mode.as_deref(),
556                mcp_profile.as_deref(),
557            )
558            .await?
559            {
560                tracing::info!("[AcpManager] {}", summary);
561            }
562
563            // Build args: preset args + optional model flag
564            let mut extra_args: Vec<String> = preset.args.clone();
565            if let Some(provider_args) = options.provider_args.clone() {
566                extra_args.extend(provider_args);
567            }
568            if let Some(ref m) = model {
569                if !m.is_empty() {
570                    // opencode (and future providers) accept -m <model>
571                    extra_args.push("-m".to_string());
572                    extra_args.push(m.clone());
573                }
574            }
575
576            let preset_command = resolve_preset_command(&preset);
577            let process = AcpProcess::spawn(
578                &preset_command,
579                &extra_args.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
580                &cwd,
581                ntx.clone(),
582                &preset.name,
583                &session_id,
584            )
585            .await?;
586
587            // Initialize the protocol
588            process
589                .initialize_with_timeout(options.initialize_timeout_ms)
590                .await?;
591
592            // Create the agent session
593            let agent_session_id = process.new_session(&cwd).await?;
594
595            (AgentProcessType::Acp(Arc::new(process)), agent_session_id)
596        };
597
598        self.register_managed_session(
599            session_id.clone(),
600            cwd.clone(),
601            workspace_id.clone(),
602            provider_name.to_string(),
603            role.clone(),
604            model.clone(),
605            parent_session_id.clone(),
606            &options,
607            process_type,
608            acp_session_id.clone(),
609            ntx.clone(),
610        )
611        .await;
612
613        tracing::info!(
614            "[AcpManager] Session {} created (provider: {}, agent session: {})",
615            session_id,
616            provider_name,
617            acp_session_id,
618        );
619
620        Ok((session_id, acp_session_id))
621    }
622
623    /// Send a prompt to an existing session's agent process.
624    pub async fn prompt(&self, session_id: &str, text: &str) -> Result<serde_json::Value, String> {
625        self.mark_first_prompt_sent(session_id).await;
626
627        let (process, acp_session_id, preset_id, trace_writer) = {
628            let processes = self.processes.read().await;
629            let managed = processes
630                .get(session_id)
631                .ok_or_else(|| format!("No agent process for session: {}", session_id))?;
632            (
633                managed.process.clone(),
634                managed.acp_session_id.clone(),
635                managed.preset_id.clone(),
636                managed.trace_writer.clone(),
637            )
638        };
639
640        let is_alive = match &process {
641            AgentProcessType::Acp(p) => p.is_alive(),
642            AgentProcessType::Claude(p) => p.is_alive(),
643        };
644
645        if !is_alive {
646            return Err(format!("Agent ({}) process is not running", preset_id));
647        }
648
649        // Record UserMessage trace
650        let trace = TraceRecord::new(
651            session_id,
652            TraceEventType::UserMessage,
653            Contributor::new(&preset_id, None),
654        )
655        .with_conversation(TraceConversation {
656            turn: None,
657            role: Some("user".to_string()),
658            content_preview: Some(truncate_content(text, 500)),
659            full_content: None,
660        });
661
662        trace_writer.append_safe(&trace).await;
663
664        tracing::info!(
665            target: "routa_acp_prompt",
666            session_id = %session_id,
667            preset_id = %preset_id,
668            acp_session_id = %acp_session_id,
669            prompt_len = text.len(),
670            "acp prompt start"
671        );
672
673        let result = match &process {
674            AgentProcessType::Acp(p) => p.prompt(&acp_session_id, text).await,
675            AgentProcessType::Claude(p) => {
676                let stop_reason = p.prompt(text).await?;
677                Ok(serde_json::json!({ "stopReason": stop_reason }))
678            }
679        };
680
681        match &result {
682            Ok(_) => tracing::info!(
683                target: "routa_acp_prompt",
684                session_id = %session_id,
685                preset_id = %preset_id,
686                "acp prompt success"
687            ),
688            Err(error) => tracing::error!(
689                target: "routa_acp_prompt",
690                session_id = %session_id,
691                preset_id = %preset_id,
692                error = %error,
693                "acp prompt failed"
694            ),
695        }
696
697        result
698    }
699
700    /// Cancel the current prompt in a session.
701    pub async fn cancel(&self, session_id: &str) {
702        let processes = self.processes.read().await;
703        if let Some(managed) = processes.get(session_id) {
704            match &managed.process {
705                AgentProcessType::Acp(p) => p.cancel(&managed.acp_session_id).await,
706                AgentProcessType::Claude(p) => p.cancel().await,
707            }
708        }
709    }
710
711    /// Kill a session's agent process and remove it.
712    pub async fn kill_session(&self, session_id: &str) {
713        // Kill the process
714        if let Some(managed) = self.processes.write().await.remove(session_id) {
715            // Record SessionEnd trace before killing
716            let trace = TraceRecord::new(
717                session_id,
718                TraceEventType::SessionEnd,
719                Contributor::new(&managed.preset_id, None),
720            );
721            managed.trace_writer.append_safe(&trace).await;
722
723            match &managed.process {
724                AgentProcessType::Acp(p) => p.kill().await,
725                AgentProcessType::Claude(p) => p.kill().await,
726            }
727        }
728        // Remove session record
729        self.sessions.write().await.remove(session_id);
730        // Remove notification channel
731        self.notification_channels.write().await.remove(session_id);
732    }
733
734    /// Subscribe to SSE notifications for a session.
735    /// Returns a broadcast receiver that yields `session/update` JSON-RPC messages.
736    pub async fn subscribe(
737        &self,
738        session_id: &str,
739    ) -> Option<broadcast::Receiver<serde_json::Value>> {
740        let channels = self.notification_channels.read().await;
741        channels.get(session_id).map(|tx| tx.subscribe())
742    }
743
744    /// Check if a session's agent process is alive.
745    pub async fn is_alive(&self, session_id: &str) -> bool {
746        let processes = self.processes.read().await;
747        processes
748            .get(session_id)
749            .map(|m| match &m.process {
750                AgentProcessType::Acp(p) => p.is_alive(),
751                AgentProcessType::Claude(p) => p.is_alive(),
752            })
753            .unwrap_or(false)
754    }
755
756    /// Get the preset ID for a session.
757    pub async fn get_preset_id(&self, session_id: &str) -> Option<String> {
758        let processes = self.processes.read().await;
759        processes.get(session_id).map(|m| m.preset_id.clone())
760    }
761
762    /// Check if a session uses Claude (stream-json protocol, not ACP).
763    pub async fn is_claude_session(&self, session_id: &str) -> bool {
764        let processes = self.processes.read().await;
765        processes
766            .get(session_id)
767            .map(|m| matches!(&m.process, AgentProcessType::Claude(_)))
768            .unwrap_or(false)
769    }
770
771    /// Send a prompt to Claude session and return immediately.
772    /// The actual response is streamed via the broadcast channel.
773    /// Use `subscribe()` to receive notifications.
774    pub async fn prompt_claude_async(&self, session_id: &str, text: &str) -> Result<(), String> {
775        let processes = self.processes.read().await;
776        let managed = processes
777            .get(session_id)
778            .ok_or_else(|| format!("No agent process for session: {}", session_id))?;
779
780        // Record trace
781        let trace = TraceRecord::new(
782            session_id,
783            TraceEventType::UserMessage,
784            Contributor::new(&managed.preset_id, None),
785        )
786        .with_conversation(TraceConversation {
787            turn: None,
788            role: Some("user".to_string()),
789            content_preview: Some(text[..text.len().min(200)].to_string()),
790            full_content: Some(text.to_string()),
791        });
792
793        managed.trace_writer.append_safe(&trace).await;
794
795        match &managed.process {
796            AgentProcessType::Claude(p) => {
797                // Spawn the prompt in a background task so we can return immediately
798                let process = Arc::clone(p);
799                let text = text.to_string();
800                tokio::spawn(async move {
801                    let _ = process.prompt(&text).await;
802                });
803                Ok(())
804            }
805            AgentProcessType::Acp(_) => {
806                Err("prompt_claude_async is only for Claude sessions".to_string())
807            }
808        }
809    }
810}
811
812// ─── ACP Presets ────────────────────────────────────────────────────────
813
/// ACP provider presets for known coding agents.
///
/// A preset describes how to launch one agent CLI: the executable, its
/// arguments, and optionally an environment variable naming an override
/// binary.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AcpPreset {
    /// Unique identifier (lowercase, e.g., "claude", "opencode")
    pub id: String,
    /// Human-readable display name (e.g., "Claude Code", "OpenCode")
    pub name: String,
    /// Executable name to launch (e.g., "opencode").
    pub command: String,
    /// Arguments passed to `command` to start it in ACP mode (e.g., `["acp"]`).
    pub args: Vec<String>,
    /// Short human-readable description of the agent.
    pub description: String,
    /// Name of an environment variable (e.g., "OPENCODE_BIN"); omitted from
    /// serialized output when `None` and defaulted to `None` when absent.
    // NOTE(review): presumably the variable's value replaces `command` when
    // set — confirm in `resolve_preset_command`.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub env_bin_override: Option<String>,
}
828
829/// Get the list of known ACP agent presets (static/builtin only).
830pub fn get_presets() -> Vec<AcpPreset> {
831    vec![
832        AcpPreset {
833            id: "opencode".to_string(),
834            name: "OpenCode".to_string(),
835            command: "opencode".to_string(),
836            args: vec!["acp".to_string()],
837            description: "OpenCode AI coding agent".to_string(),
838            env_bin_override: Some("OPENCODE_BIN".to_string()),
839        },
840        AcpPreset {
841            id: "gemini".to_string(),
842            name: "Gemini".to_string(),
843            command: "gemini".to_string(),
844            args: vec!["--experimental-acp".to_string()],
845            description: "Google Gemini CLI".to_string(),
846            env_bin_override: None,
847        },
848        AcpPreset {
849            id: "codex-acp".to_string(),
850            name: "Codex".to_string(),
851            command: "codex-acp".to_string(),
852            args: vec![],
853            description: "OpenAI Codex CLI (codex-acp wrapper)".to_string(),
854            env_bin_override: Some("CODEX_ACP_BIN".to_string()),
855        },
856        AcpPreset {
857            id: "copilot".to_string(),
858            name: "GitHub Copilot".to_string(),
859            command: "copilot".to_string(),
860            args: vec![
861                "--acp".to_string(),
862                "--allow-all-tools".to_string(),
863                "--no-ask-user".to_string(),
864            ],
865            description: "GitHub Copilot CLI".to_string(),
866            env_bin_override: Some("COPILOT_BIN".to_string()),
867        },
868        AcpPreset {
869            id: "auggie".to_string(),
870            name: "Auggie".to_string(),
871            command: "auggie".to_string(),
872            args: vec!["--acp".to_string()],
873            description: "Augment Code's AI agent".to_string(),
874            env_bin_override: None,
875        },
876        AcpPreset {
877            id: "kimi".to_string(),
878            name: "Kimi".to_string(),
879            command: "kimi".to_string(),
880            args: vec!["acp".to_string()],
881            description: "Moonshot AI's Kimi CLI".to_string(),
882            env_bin_override: None,
883        },
884        AcpPreset {
885            id: "kiro".to_string(),
886            name: "Kiro".to_string(),
887            command: "kiro-cli".to_string(),
888            args: vec!["acp".to_string()],
889            description: "Amazon Kiro AI coding agent".to_string(),
890            env_bin_override: Some("KIRO_BIN".to_string()),
891        },
892        AcpPreset {
893            id: "qoder".to_string(),
894            name: "Qoder".to_string(),
895            command: "qodercli".to_string(),
896            args: vec!["--acp".to_string()],
897            description: "Qoder AI coding agent".to_string(),
898            env_bin_override: Some("QODER_BIN".to_string()),
899        },
900        AcpPreset {
901            id: "claude".to_string(),
902            name: "Claude Code".to_string(),
903            command: "claude".to_string(),
904            // Claude Code uses stream-json protocol, not ACP flags
905            // Args are unused since we use ClaudeCodeProcess directly
906            args: vec![],
907            description: "Anthropic Claude Code (stream-json protocol)".to_string(),
908            env_bin_override: Some("CLAUDE_BIN".to_string()),
909        },
910    ]
911}
912
913/// Get a preset by ID, checking both static presets and registry.
914/// Static presets take precedence.
915///
916/// Supports suffixed IDs like "auggie-registry" to explicitly request
917/// the registry version when both built-in and registry versions exist.
918pub async fn get_preset_by_id_with_registry(id: &str) -> Result<AcpPreset, String> {
919    let normalized_id = match id {
920        "codex" => "codex-acp",
921        "qodercli" => "qoder",
922        other => other,
923    };
924
925    // Handle suffixed IDs (e.g., "auggie-registry")
926    // This allows explicit selection of registry version when both exist
927    const REGISTRY_SUFFIX: &str = "-registry";
928    if let Some(base_id) = normalized_id.strip_suffix(REGISTRY_SUFFIX) {
929        let mut preset = get_registry_preset(base_id).await?;
930        // Keep the suffixed ID in the returned preset for consistency
931        preset.id = id.to_string();
932        return Ok(preset);
933    }
934
935    // Check static presets first (match by id, not name)
936    if let Some(mut preset) = get_presets().into_iter().find(|p| p.id == normalized_id) {
937        if preset.id != id {
938            preset.id = id.to_string();
939        }
940        return Ok(preset);
941    }
942
943    // Fall back to registry
944    let mut preset = get_registry_preset(normalized_id).await?;
945    if preset.id != id {
946        preset.id = id.to_string();
947    }
948    Ok(preset)
949}
950
951/// Get a preset from the ACP registry by ID.
952async fn get_registry_preset(id: &str) -> Result<AcpPreset, String> {
953    let registry: AcpRegistry = fetch_registry().await?;
954
955    // Find the agent
956    let agent = registry
957        .agents
958        .into_iter()
959        .find(|a| a.id == id)
960        .ok_or_else(|| format!("Agent '{}' not found in registry", id))?;
961
962    // Build command from distribution
963    let (command, args) = if let Some(ref npx) = agent.distribution.npx {
964        let mut args = vec!["-y".to_string(), npx.package.clone()];
965        args.extend(npx.args.clone());
966        ("npx".to_string(), args)
967    } else if let Some(ref uvx) = agent.distribution.uvx {
968        let mut args = vec![uvx.package.clone()];
969        args.extend(uvx.args.clone());
970        ("uvx".to_string(), args)
971    } else {
972        return Err(format!(
973            "Agent '{}' has no supported distribution (npx/uvx)",
974            id
975        ));
976    };
977
978    Ok(AcpPreset {
979        id: agent.id.clone(),
980        name: agent.name,
981        command,
982        args,
983        description: agent.description,
984        env_bin_override: None,
985    })
986}
987
988fn resolve_preset_command(preset: &AcpPreset) -> String {
989    if let Some(env_var) = &preset.env_bin_override {
990        if let Ok(custom_command) = std::env::var(env_var) {
991            let trimmed = custom_command.trim();
992            if !trimmed.is_empty() {
993                return trimmed.to_string();
994            }
995        }
996    }
997
998    crate::shell_env::which(&preset.command).unwrap_or_else(|| preset.command.clone())
999}
1000
1001// ─── Utility Functions ─────────────────────────────────────────────────────
1002
/// Truncate content to at most `max_len` characters for storage in traces.
///
/// Lengths are measured in Unicode scalar values (`char`s), not bytes, so a
/// multi-byte character is never split.
///
/// * Text with at most `max_len` characters is returned unchanged.
/// * When `max_len` is 3 or less there is no room for an ellipsis, so the
///   text is cut hard at `max_len` characters.
/// * Otherwise the first `max_len - 3` characters are kept and `"..."` is
///   appended, giving exactly `max_len` characters.
fn truncate_content(text: &str, max_len: usize) -> String {
    const ELLIPSIS: &str = "...";

    // The text fits when there is no character at index `max_len`. Unlike
    // `chars().count()`, this stops decoding after `max_len + 1` characters
    // instead of walking the whole (possibly very large) content.
    if text.chars().nth(max_len).is_none() {
        return text.to_string();
    }

    // Budget too small to hold an ellipsis: plain cut.
    if max_len <= ELLIPSIS.len() {
        return text.chars().take(max_len).collect();
    }

    let mut truncated: String = text.chars().take(max_len - ELLIPSIS.len()).collect();
    truncated.push_str(ELLIPSIS);
    truncated
}
1014
#[cfg(test)]
mod tests {
    use super::{
        get_preset_by_id_with_registry, get_presets, truncate_content, AcpManager, AcpSessionRecord,
    };
    use std::collections::HashMap;
    use std::sync::Arc;
    use tokio::sync::RwLock;

    /// Build a manager whose session, process, channel and history maps all
    /// start out empty.
    fn empty_manager() -> AcpManager {
        AcpManager {
            sessions: Arc::new(RwLock::new(HashMap::new())),
            processes: Arc::new(RwLock::new(HashMap::new())),
            notification_channels: Arc::new(RwLock::new(HashMap::new())),
            history: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// The "codex" alias maps to "codex-acp", so that built-in must exist.
    #[test]
    fn static_presets_include_codex_acp_for_codex_alias() {
        let has_codex_acp = get_presets().iter().any(|p| p.id == "codex-acp");
        assert!(has_codex_acp);
    }

    /// The "qodercli" alias maps to "qoder", so that built-in must exist.
    #[test]
    fn static_presets_include_qoder() {
        let has_qoder = get_presets().iter().any(|p| p.id == "qoder");
        assert!(has_qoder);
    }

    /// Resolving through the alias keeps the requested ID but yields the
    /// built-in qoder command line.
    #[tokio::test]
    async fn qodercli_alias_resolves_to_qoder_preset() {
        let resolved = get_preset_by_id_with_registry("qodercli")
            .await
            .expect("qodercli alias should resolve");

        assert_eq!(resolved.id, "qodercli");
        assert_eq!(resolved.command, "qodercli");
        assert_eq!(resolved.args, vec![String::from("--acp")]);
    }

    /// Marking the first prompt as sent is visible on the stored record.
    #[tokio::test]
    async fn mark_first_prompt_sent_updates_live_session_record() {
        let manager = AcpManager::new();
        let session_id = String::from("session-1");

        let record = AcpSessionRecord {
            session_id: session_id.clone(),
            name: None,
            cwd: ".".to_string(),
            workspace_id: "default".to_string(),
            routa_agent_id: None,
            provider: Some("opencode".to_string()),
            role: Some("CRAFTER".to_string()),
            mode_id: None,
            model: None,
            created_at: chrono::Utc::now().to_rfc3339(),
            first_prompt_sent: false,
            parent_session_id: None,
            specialist_id: None,
            specialist_system_prompt: None,
        };
        manager
            .sessions
            .write()
            .await
            .insert(session_id.clone(), record);

        manager.mark_first_prompt_sent(&session_id).await;

        let stored = manager.get_session(&session_id).await.expect("session");
        assert!(stored.first_prompt_sent);
    }

    /// Updates tagged with a `childAgentId` are not recorded in the parent's
    /// history.
    #[tokio::test]
    async fn push_to_history_skips_parent_child_forwarding_noise() {
        let manager = empty_manager();
        let forwarded = serde_json::json!({
            "sessionId": "parent",
            "childAgentId": "child-1",
            "update": { "sessionUpdate": "agent_message", "content": { "type": "text", "text": "delegated" } }
        });

        manager.push_to_history("parent", forwarded).await;

        let recorded = manager
            .get_session_history("parent")
            .await
            .unwrap_or_default();
        assert!(recorded.is_empty());
    }

    /// With a registered channel, the update arrives on the broadcast
    /// receiver wrapped under `params.update`.
    #[tokio::test]
    async fn emit_session_update_broadcasts_when_channel_exists() {
        let (tx, mut rx) = tokio::sync::broadcast::channel(8);
        let manager = empty_manager();
        manager
            .notification_channels
            .write()
            .await
            .insert("session-1".to_string(), tx);

        let update = serde_json::json!({
            "sessionUpdate": "turn_complete",
            "stopReason": "cancelled"
        });
        manager
            .emit_session_update("session-1", update)
            .await
            .expect("emit should succeed");

        let received = rx.recv().await.expect("broadcast event");
        let payload = &received["params"]["update"];
        assert_eq!(payload["sessionUpdate"].as_str(), Some("turn_complete"));
        assert_eq!(payload["stopReason"].as_str(), Some("cancelled"));
    }

    /// Without any channel the update is still appended to session history.
    #[tokio::test]
    async fn emit_session_update_persists_history_without_channel() {
        let manager = empty_manager();

        let update = serde_json::json!({
            "sessionUpdate": "turn_complete",
            "stopReason": "cancelled"
        });
        manager
            .emit_session_update("session-1", update)
            .await
            .expect("emit should succeed");

        let recorded = manager
            .get_session_history("session-1")
            .await
            .expect("history should exist");
        assert_eq!(recorded.len(), 1);
        assert_eq!(
            recorded[0]["update"]["sessionUpdate"].as_str(),
            Some("turn_complete")
        );
    }

    /// The provider's own session ID is replaced by the one passed in.
    #[test]
    fn rewrite_notification_session_id_overrides_provider_session_id() {
        let original = serde_json::json!({
            "sessionId": "provider-session",
            "update": { "sessionUpdate": "agent_message_chunk", "content": { "text": "hi" } }
        });

        let rewritten = AcpManager::rewrite_notification_session_id("child-session", original);

        assert_eq!(rewritten["sessionId"].as_str(), Some("child-session"));
    }

    /// Truncation counts characters, so multi-byte text is never split.
    #[test]
    fn truncate_content_handles_unicode_boundaries() {
        let cases = [
            ("你好世界ABC", 5, "你好..."),
            ("你好世界ABC", 3, "你好世"),
            ("短文本", 10, "短文本"),
        ];
        for (input, max_len, expected) in cases {
            assert_eq!(truncate_content(input, max_len), expected);
        }
    }
}