Skip to main content

routa_core/acp/
mod.rs

1//! ACP (Agent Client Protocol) integration.
2//!
3//! Manages ACP agent processes and provides JSON-RPC communication
4//! between the desktop client and coding agents (e.g. OpenCode, Claude, Copilot).
5//!
6//! Architecture (matches the Next.js `AcpProcessManager`):
7//!   - `session/new`    → spawns a child process, sends `initialize` + `session/new`
8//!   - `session/prompt` → reuses the live process, sends `session/prompt`
9//!   - `session/cancel` → sends cancellation notification
10//!   - SSE GET          → subscribes to `broadcast` channel for `session/update` events
11//!
12//! **Claude Code** uses a different protocol (stream-json) instead of ACP.
13//! The `ClaudeCodeProcess` translates Claude's output into ACP-compatible
14//! `session/update` notifications for frontend compatibility.
15//!
16//! **Agent Trace**: All sessions record trace events to JSONL files for
17//! attribution tracking (which model/session/tool affected which files and when).
18
19pub mod binary_manager;
20pub mod claude_code_process;
21pub mod docker;
22pub mod installation_state;
23pub mod mcp_setup;
24pub mod paths;
25pub mod process;
26pub mod provider_adapter;
27pub mod registry_fetch;
28pub mod registry_types;
29pub mod runtime_manager;
30pub mod terminal_manager;
31pub mod warmup;
32
33pub use binary_manager::AcpBinaryManager;
34pub use claude_code_process::{ClaudeCodeConfig, ClaudeCodeProcess};
35pub use installation_state::AcpInstallationState;
36pub use paths::AcpPaths;
37pub use registry_fetch::{fetch_registry, fetch_registry_json};
38pub use registry_types::*;
39pub use runtime_manager::{current_platform, AcpRuntimeManager, RuntimeInfo, RuntimeType};
40pub use warmup::{AcpWarmupService, WarmupState, WarmupStatus};
41
42use std::collections::HashMap;
43use std::path::Path;
44use std::sync::Arc;
45
46use serde::{Deserialize, Serialize};
47use tokio::sync::{broadcast, RwLock};
48
49use crate::trace::{Contributor, TraceConversation, TraceEventType, TraceRecord, TraceWriter};
50use process::AcpProcess;
51
52#[cfg(windows)]
53pub(crate) const CREATE_NO_WINDOW: u32 = 0x0800_0000;
54
/// Checks that `cwd` names an existing directory on disk.
///
/// # Errors
/// Returns a descriptive message when the path does not exist, or exists but
/// is not a directory.
fn validate_session_cwd(cwd: &str) -> Result<(), String> {
    let candidate = Path::new(cwd);
    let problem = if !candidate.exists() {
        "directory does not exist"
    } else if !candidate.is_dir() {
        "path is not a directory"
    } else {
        return Ok(());
    };
    Err(format!("Invalid session cwd '{}': {}", cwd, problem))
}
71
72// ─── Session Record ─────────────────────────────────────────────────────
73
74/// Record of an active ACP session persisted for UI listing.
///
/// Serialized with camelCase keys. Fields marked `skip_serializing_if` are
/// omitted from the output when they are `None`; the remaining `Option`
/// fields are always written.
75#[derive(Debug, Clone, Serialize, Deserialize)]
76#[serde(rename_all = "camelCase")]
77pub struct AcpSessionRecord {
    /// Our own session ID; the key used by `AcpManager`'s maps.
78    pub session_id: String,
    /// Display name; starts as `None` and is set by `AcpManager::rename_session`.
79    #[serde(skip_serializing_if = "Option::is_none")]
80    pub name: Option<String>,
    /// Working directory the session was launched with.
81    pub cwd: String,
    /// Workspace the session belongs to.
82    pub workspace_id: String,
    /// ROUTA agent ID; attached later via `AcpManager::set_routa_agent_id`.
83    #[serde(skip_serializing_if = "Option::is_none")]
84    pub routa_agent_id: Option<String>,
    /// Provider name the session was started with.
85    pub provider: Option<String>,
    /// Session role; registration fills in `"CRAFTER"` when none was given.
86    pub role: Option<String>,
    /// Registration in this module always stores `None` here.
87    pub mode_id: Option<String>,
    /// Model requested at launch, if any.
88    #[serde(skip_serializing_if = "Option::is_none")]
89    pub model: Option<String>,
    /// RFC 3339 timestamp taken (in UTC) when the session was registered.
90    pub created_at: String,
    /// Flipped to `true` by `AcpManager::mark_first_prompt_sent`; defaults to
    /// `false` when the key is missing during deserialization.
91    #[serde(default)]
92    pub first_prompt_sent: bool,
93    /// Parent session ID for CRAFTER child sessions
94    #[serde(skip_serializing_if = "Option::is_none")]
95    pub parent_session_id: Option<String>,
    /// Copied from `SessionLaunchOptions::specialist_id`.
96    #[serde(skip_serializing_if = "Option::is_none")]
97    pub specialist_id: Option<String>,
    /// Copied from `SessionLaunchOptions::specialist_system_prompt`.
98    #[serde(skip_serializing_if = "Option::is_none")]
99    pub specialist_system_prompt: Option<String>,
100}
101
/// Optional knobs for launching or resuming a session.
///
/// `Default` leaves every field `None`, which is what the `create_session` /
/// `load_session` convenience wrappers pass.
102#[derive(Debug, Clone, Default)]
103pub struct SessionLaunchOptions {
    /// Stored on the resulting `AcpSessionRecord`.
104    pub specialist_id: Option<String>,
    /// Stored on the session record; for the Claude provider it is also passed
    /// as `append_system_prompt`.
105    pub specialist_system_prompt: Option<String>,
    /// Passed to the Claude provider as `allowed_tools`.
106    pub allowed_native_tools: Option<Vec<String>>,
    /// Forwarded to `AcpProcess::initialize_with_timeout`.
107    pub initialize_timeout_ms: Option<u64>,
    /// Extra command-line arguments appended after the preset's own arguments.
108    pub provider_args: Option<Vec<String>>,
    /// MCP server entries handed to the agent's `new_session` / `load_session`
    /// call. For preset-based launches only the codex providers use them.
109    pub acp_mcp_servers: Option<Vec<serde_json::Value>>,
110}
111
112// ─── Managed Process ────────────────────────────────────────────────────
113
114/// Process type enum to support both ACP and Claude stream-json protocols.
///
/// Each variant holds an `Arc`, so cloning shares the same underlying
/// process handle rather than duplicating it.
115#[derive(Clone)]
116enum AgentProcessType {
117    /// Standard ACP protocol (opencode, gemini, copilot, etc.)
118    Acp(Arc<AcpProcess>),
119    /// Claude Code stream-json protocol
120    Claude(Arc<ClaudeCodeProcess>),
121}
122
123impl AgentProcessType {
124    /// Kill the underlying process.
125    async fn kill(&self) {
126        match self {
127            AgentProcessType::Acp(process) => process.kill().await,
128            AgentProcessType::Claude(process) => process.kill().await,
129        }
130    }
131}
132
133/// A managed agent process with its metadata.
134struct ManagedProcess {
    /// Handle to the live agent process (ACP or Claude).
135    process: AgentProcessType,
136    /// The agent's own session ID (returned by `session/new` or claude's session_id).
137    acp_session_id: String,
    /// Provider name the session was registered under; used in error messages
    /// and as the trace contributor.
138    preset_id: String,
    /// RFC 3339 registration timestamp; stored but not read in this struct's users.
139    #[allow(dead_code)]
140    created_at: String,
141    /// Trace writer for recording agent activities to JSONL
142    trace_writer: TraceWriter,
143    /// Working directory (for contributor context)
144    #[allow(dead_code)]
145    cwd: String,
146}
147
148// ─── ACP Manager ────────────────────────────────────────────────────────
149
150/// Manages ACP agent sessions and process lifecycle.
151///
152/// Each session maps to a long-lived child process that communicates via
153/// stdio JSON-RPC. Notifications are forwarded to subscribers via broadcast.
///
/// Every field is an `Arc` around an `RwLock`-guarded map keyed by our session
/// ID, so a clone is a second handle onto the same shared state.
154#[derive(Clone)]
155pub struct AcpManager {
156    /// Our sessionId → session record (for UI listing)
157    sessions: Arc<RwLock<HashMap<String, AcpSessionRecord>>>,
158    /// Our sessionId → managed process (the live agent)
159    processes: Arc<RwLock<HashMap<String, ManagedProcess>>>,
160    /// Our sessionId → broadcast sender for SSE notifications
161    notification_channels: Arc<RwLock<HashMap<String, broadcast::Sender<serde_json::Value>>>>,
162    /// Our sessionId → message history (session/update notifications)
    /// Capped at 500 entries per session by `push_to_history`.
163    history: Arc<RwLock<HashMap<String, Vec<serde_json::Value>>>>,
164}
165
166impl Default for AcpManager {
167    fn default() -> Self {
168        Self::new()
169    }
170}
171
172impl AcpManager {
173    pub fn rewrite_notification_session_id(
174        session_id: &str,
175        mut notification: serde_json::Value,
176    ) -> serde_json::Value {
177        if let Some(object) = notification.as_object_mut() {
178            object.insert(
179                "sessionId".to_string(),
180                serde_json::Value::String(session_id.to_string()),
181            );
182        }
183        notification
184    }
185
186    pub fn new() -> Self {
187        Self {
188            sessions: Arc::new(RwLock::new(HashMap::new())),
189            processes: Arc::new(RwLock::new(HashMap::new())),
190            notification_channels: Arc::new(RwLock::new(HashMap::new())),
191            history: Arc::new(RwLock::new(HashMap::new())),
192        }
193    }
194
195    /// List all session records.
196    pub async fn list_sessions(&self) -> Vec<AcpSessionRecord> {
197        let sessions = self.sessions.read().await;
198        sessions.values().cloned().collect()
199    }
200
201    /// Get a session record by ID.
202    pub async fn get_session(&self, session_id: &str) -> Option<AcpSessionRecord> {
203        let sessions = self.sessions.read().await;
204        sessions.get(session_id).cloned()
205    }
206
207    /// Rename a session.
208    /// Returns `Some(())` if the session was found and renamed, `None` if not found.
209    pub async fn rename_session(&self, session_id: &str, name: &str) -> Option<()> {
210        let mut sessions = self.sessions.write().await;
211        let session = sessions.get_mut(session_id)?;
212        session.name = Some(name.to_string());
213        Some(())
214    }
215
216    /// Attach a ROUTA agent ID to an existing session record.
217    /// Returns `Some(())` if the session was found, `None` if not found.
218    pub async fn set_routa_agent_id(&self, session_id: &str, routa_agent_id: &str) -> Option<()> {
219        let mut sessions = self.sessions.write().await;
220        let session = sessions.get_mut(session_id)?;
221        session.routa_agent_id = Some(routa_agent_id.to_string());
222        Some(())
223    }
224
225    /// Delete a session.
226    /// Returns `Some(())` if the session was found and deleted, `None` if not found.
227    pub async fn delete_session(&self, session_id: &str) -> Option<()> {
228        let mut sessions = self.sessions.write().await;
229        let mut processes = self.processes.write().await;
230        let mut channels = self.notification_channels.write().await;
231        let mut history = self.history.write().await;
232
233        // Remove session record
234        sessions.remove(session_id)?;
235
236        // Kill the process if it exists
237        if let Some(managed) = processes.remove(session_id) {
238            let _ = managed.process.kill().await;
239        }
240
241        // Remove notification channel
242        channels.remove(session_id);
243
244        // Remove history
245        history.remove(session_id);
246
247        Some(())
248    }
249
250    /// Get session message history.
251    /// Returns `Some(history)` if the session exists, `None` if not found.
252    pub async fn get_session_history(&self, session_id: &str) -> Option<Vec<serde_json::Value>> {
253        let history = self.history.read().await;
254        history.get(session_id).cloned()
255    }
256
257    /// Add a notification to session history.
258    /// Child agent notifications (those with `childAgentId`) are NOT stored in the
259    /// parent session's history — they would flood out the ROUTA coordinator's own
260    /// messages. Child messages are persisted in their own child session's history.
261    pub async fn push_to_history(&self, session_id: &str, notification: serde_json::Value) {
262        // Skip child agent notifications to prevent flooding parent history
263        if notification.get("childAgentId").is_some() {
264            return;
265        }
266        let mut history = self.history.write().await;
267        let entries = history.entry(session_id.to_string()).or_default();
268        entries.push(notification);
269        // Cap at 500 entries (same limit as Next.js backend)
270        if entries.len() > 500 {
271            let drain_count = entries.len() - 500;
272            entries.drain(0..drain_count);
273        }
274    }
275
276    /// Broadcast a synthetic session/update event and persist it into in-memory history.
277    pub async fn emit_session_update(
278        &self,
279        session_id: &str,
280        update: serde_json::Value,
281    ) -> Result<(), String> {
282        let message = serde_json::json!({
283            "jsonrpc": "2.0",
284            "method": "session/update",
285            "params": {
286                "sessionId": session_id,
287                "update": update,
288            }
289        });
290
291        if let Some(channel) = self
292            .notification_channels
293            .read()
294            .await
295            .get(session_id)
296            .cloned()
297        {
298            let _ = channel.send(message.clone());
299        } else {
300            let params = message
301                .get("params")
302                .cloned()
303                .ok_or_else(|| "Missing params in synthetic session/update".to_string())?;
304            self.push_to_history(
305                session_id,
306                Self::rewrite_notification_session_id(session_id, params),
307            )
308            .await;
309        }
310        Ok(())
311    }
312
313    /// Mark a session as having had its first prompt dispatched.
314    pub async fn mark_first_prompt_sent(&self, session_id: &str) {
315        let mut sessions = self.sessions.write().await;
316        if let Some(session) = sessions.get_mut(session_id) {
317            session.first_prompt_sent = true;
318        }
319    }
320
321    /// Create a new ACP session: spawn agent process, initialize, create session.
322    /// Supports both static presets and registry-based agents.
323    /// **Claude** uses stream-json protocol instead of ACP.
324    ///
325    /// Returns `(our_session_id, agent_session_id)`.
326    #[allow(clippy::too_many_arguments)]
327    pub async fn create_session(
328        &self,
329        session_id: String,
330        cwd: String,
331        workspace_id: String,
332        provider: Option<String>,
333        role: Option<String>,
334        model: Option<String>,
335        parent_session_id: Option<String>,
336        tool_mode: Option<String>,
337        mcp_profile: Option<String>,
338    ) -> Result<(String, String), String> {
339        self.create_session_with_options(
340            session_id,
341            cwd,
342            workspace_id,
343            provider,
344            role,
345            model,
346            parent_session_id,
347            tool_mode,
348            mcp_profile,
349            SessionLaunchOptions::default(),
350        )
351        .await
352    }
353
354    /// Resume a persisted ACP session using the provider's native session/load path.
355    ///
356    /// Returns `(our_session_id, agent_session_id)`.
357    #[allow(clippy::too_many_arguments)]
358    pub async fn load_session_with_options(
359        &self,
360        session_id: String,
361        cwd: String,
362        workspace_id: String,
363        provider: Option<String>,
364        role: Option<String>,
365        model: Option<String>,
366        parent_session_id: Option<String>,
367        tool_mode: Option<String>,
368        mcp_profile: Option<String>,
369        provider_session_id: Option<String>,
370        options: SessionLaunchOptions,
371    ) -> Result<(String, String), String> {
372        validate_session_cwd(&cwd)?;
373        let provider_name = provider.as_deref().unwrap_or("opencode");
374        let acp_mcp_servers = if matches!(provider_name, "codex" | "codex-acp") {
375            options.acp_mcp_servers.clone().unwrap_or_else(|| {
376                mcp_setup::build_acp_http_mcp_servers(
377                    &workspace_id,
378                    &session_id,
379                    tool_mode.as_deref(),
380                    mcp_profile.as_deref(),
381                )
382            })
383        } else {
384            Vec::new()
385        };
386
387        if provider_name == "claude" {
388            return Err("Native session/load is not supported for Claude".to_string());
389        }
390
391        let (ntx, _) = broadcast::channel::<serde_json::Value>(256);
392        let preset = get_preset_by_id_with_registry(provider_name).await?;
393
394        if let Some(summary) = mcp_setup::ensure_mcp_for_provider(
395            provider_name,
396            &cwd,
397            &workspace_id,
398            &session_id,
399            tool_mode.as_deref(),
400            mcp_profile.as_deref(),
401        )
402        .await?
403        {
404            tracing::info!("[AcpManager] {}", summary);
405        }
406
407        let mut extra_args: Vec<String> = preset.args.clone();
408        if matches!(provider_name, "codex" | "codex-acp") {
409            for override_arg in mcp_setup::codex_cli_overrides(&cwd)? {
410                extra_args.push("-c".to_string());
411                extra_args.push(override_arg);
412            }
413        }
414        if let Some(provider_args) = options.provider_args.clone() {
415            extra_args.extend(provider_args);
416        }
417        if let Some(ref m) = model {
418            if !m.is_empty() {
419                extra_args.push("-m".to_string());
420                extra_args.push(m.clone());
421            }
422        }
423
424        let preset_command = resolve_preset_command(&preset);
425        let process = AcpProcess::spawn(
426            &preset_command,
427            &extra_args.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
428            &cwd,
429            ntx.clone(),
430            &preset.name,
431            &session_id,
432        )
433        .await?;
434
435        process
436            .initialize_with_timeout(options.initialize_timeout_ms)
437            .await?;
438
439        let resolved_provider_session_id =
440            provider_session_id.unwrap_or_else(|| session_id.clone());
441        let acp_session_id = process
442            .load_session(&resolved_provider_session_id, &cwd, &acp_mcp_servers)
443            .await?;
444
445        self.register_managed_session(
446            session_id.clone(),
447            cwd.clone(),
448            workspace_id.clone(),
449            provider_name.to_string(),
450            role.clone(),
451            model.clone(),
452            parent_session_id.clone(),
453            &options,
454            AgentProcessType::Acp(Arc::new(process)),
455            acp_session_id.clone(),
456            ntx.clone(),
457        )
458        .await;
459
460        tracing::info!(
461            "[AcpManager] Session {} loaded (provider: {}, agent session: {})",
462            session_id,
463            provider_name,
464            acp_session_id,
465        );
466
467        Ok((session_id, acp_session_id))
468    }
469
470    #[allow(clippy::too_many_arguments)]
471    pub async fn load_session(
472        &self,
473        session_id: String,
474        cwd: String,
475        workspace_id: String,
476        provider: Option<String>,
477        role: Option<String>,
478        model: Option<String>,
479        parent_session_id: Option<String>,
480        tool_mode: Option<String>,
481        mcp_profile: Option<String>,
482        provider_session_id: Option<String>,
483    ) -> Result<(String, String), String> {
484        self.load_session_with_options(
485            session_id,
486            cwd,
487            workspace_id,
488            provider,
489            role,
490            model,
491            parent_session_id,
492            tool_mode,
493            mcp_profile,
494            provider_session_id,
495            SessionLaunchOptions::default(),
496        )
497        .await
498    }
499
500    fn spawn_history_mirror(&self, session_id: &str, ntx: &broadcast::Sender<serde_json::Value>) {
501        let history_manager = self.clone();
502        let history_session_id = session_id.to_string();
503        let mut history_rx = ntx.subscribe();
504        tokio::spawn(async move {
505            loop {
506                match history_rx.recv().await {
507                    Ok(message) => {
508                        let params = match message.get("params") {
509                            Some(value) => value.clone(),
510                            None => continue,
511                        };
512                        history_manager
513                            .push_to_history(
514                                &history_session_id,
515                                Self::rewrite_notification_session_id(&history_session_id, params),
516                            )
517                            .await;
518                    }
519                    Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
520                    Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
521                        tracing::warn!(
522                            "[AcpManager] Dropped {} session/update notifications for {}",
523                            skipped,
524                            history_session_id
525                        );
526                    }
527                }
528            }
529        });
530    }
531
532    #[allow(clippy::too_many_arguments)]
533    async fn register_managed_session(
534        &self,
535        session_id: String,
536        cwd: String,
537        workspace_id: String,
538        provider_name: String,
539        role: Option<String>,
540        model: Option<String>,
541        parent_session_id: Option<String>,
542        options: &SessionLaunchOptions,
543        process_type: AgentProcessType,
544        acp_session_id: String,
545        ntx: broadcast::Sender<serde_json::Value>,
546    ) {
547        let created_at = chrono::Utc::now().to_rfc3339();
548        let trace_writer = TraceWriter::new(&cwd);
549        let record = AcpSessionRecord {
550            session_id: session_id.clone(),
551            name: None,
552            cwd: cwd.clone(),
553            workspace_id: workspace_id.clone(),
554            routa_agent_id: None,
555            provider: Some(provider_name.clone()),
556            role: role.clone().or(Some("CRAFTER".to_string())),
557            mode_id: None,
558            model: model.clone(),
559            created_at: created_at.clone(),
560            first_prompt_sent: false,
561            parent_session_id: parent_session_id.clone(),
562            specialist_id: options.specialist_id.clone(),
563            specialist_system_prompt: options.specialist_system_prompt.clone(),
564        };
565
566        self.sessions
567            .write()
568            .await
569            .insert(session_id.clone(), record);
570        self.processes.write().await.insert(
571            session_id.clone(),
572            ManagedProcess {
573                process: process_type,
574                acp_session_id: acp_session_id.clone(),
575                preset_id: provider_name.clone(),
576                created_at,
577                trace_writer: trace_writer.clone(),
578                cwd: cwd.clone(),
579            },
580        );
581        self.notification_channels
582            .write()
583            .await
584            .insert(session_id.clone(), ntx.clone());
585        self.spawn_history_mirror(&session_id, &ntx);
586
587        let trace = TraceRecord::new(
588            &session_id,
589            TraceEventType::SessionStart,
590            Contributor::new(&provider_name, None),
591        )
592        .with_workspace_id(&workspace_id)
593        .with_metadata(
594            "role",
595            serde_json::json!(role.as_deref().unwrap_or("CRAFTER")),
596        )
597        .with_metadata("cwd", serde_json::json!(cwd));
598
599        trace_writer.append_safe(&trace).await;
600    }
601
602    #[allow(clippy::too_many_arguments)]
603    pub async fn create_session_from_inline(
604        &self,
605        session_id: String,
606        cwd: String,
607        workspace_id: String,
608        provider_name: String,
609        role: Option<String>,
610        model: Option<String>,
611        parent_session_id: Option<String>,
612        command: String,
613        args: Vec<String>,
614        options: SessionLaunchOptions,
615    ) -> Result<(String, String), String> {
616        validate_session_cwd(&cwd)?;
617        let (ntx, _) = broadcast::channel::<serde_json::Value>(256);
618
619        let process = AcpProcess::spawn(
620            &command,
621            &args.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
622            &cwd,
623            ntx.clone(),
624            &provider_name,
625            &session_id,
626        )
627        .await?;
628
629        process
630            .initialize_with_timeout(options.initialize_timeout_ms)
631            .await?;
632
633        let acp_session_id = process
634            .new_session(&cwd, options.acp_mcp_servers.as_deref().unwrap_or(&[]))
635            .await?;
636        self.register_managed_session(
637            session_id.clone(),
638            cwd.clone(),
639            workspace_id.clone(),
640            provider_name.clone(),
641            role.clone(),
642            model.clone(),
643            parent_session_id.clone(),
644            &options,
645            AgentProcessType::Acp(Arc::new(process)),
646            acp_session_id.clone(),
647            ntx.clone(),
648        )
649        .await;
650
651        tracing::info!(
652            "[AcpManager] Session {} created from inline command (provider: {}, agent session: {})",
653            session_id,
654            provider_name,
655            acp_session_id,
656        );
657
658        Ok((session_id, acp_session_id))
659    }
660
661    #[allow(clippy::too_many_arguments)]
662    pub async fn load_session_from_inline(
663        &self,
664        session_id: String,
665        cwd: String,
666        workspace_id: String,
667        provider_name: String,
668        role: Option<String>,
669        model: Option<String>,
670        parent_session_id: Option<String>,
671        command: String,
672        args: Vec<String>,
673        provider_session_id: Option<String>,
674        options: SessionLaunchOptions,
675    ) -> Result<(String, String), String> {
676        validate_session_cwd(&cwd)?;
677        let (ntx, _) = broadcast::channel::<serde_json::Value>(256);
678
679        let process = AcpProcess::spawn(
680            &command,
681            &args.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
682            &cwd,
683            ntx.clone(),
684            &provider_name,
685            &session_id,
686        )
687        .await?;
688
689        process
690            .initialize_with_timeout(options.initialize_timeout_ms)
691            .await?;
692
693        let resolved_provider_session_id =
694            provider_session_id.unwrap_or_else(|| session_id.clone());
695        let acp_session_id = process
696            .load_session(
697                &resolved_provider_session_id,
698                &cwd,
699                options.acp_mcp_servers.as_deref().unwrap_or(&[]),
700            )
701            .await?;
702
703        self.register_managed_session(
704            session_id.clone(),
705            cwd.clone(),
706            workspace_id.clone(),
707            provider_name.clone(),
708            role.clone(),
709            model.clone(),
710            parent_session_id.clone(),
711            &options,
712            AgentProcessType::Acp(Arc::new(process)),
713            acp_session_id.clone(),
714            ntx.clone(),
715        )
716        .await;
717
718        tracing::info!(
719            "[AcpManager] Session {} loaded from inline command (provider: {}, agent session: {})",
720            session_id,
721            provider_name,
722            acp_session_id,
723        );
724
725        Ok((session_id, acp_session_id))
726    }
727
728    #[allow(clippy::too_many_arguments)]
729    pub async fn create_session_with_options(
730        &self,
731        session_id: String,
732        cwd: String,
733        workspace_id: String,
734        provider: Option<String>,
735        role: Option<String>,
736        model: Option<String>,
737        parent_session_id: Option<String>,
738        tool_mode: Option<String>,
739        mcp_profile: Option<String>,
740        options: SessionLaunchOptions,
741    ) -> Result<(String, String), String> {
742        validate_session_cwd(&cwd)?;
743        let provider_name = provider.as_deref().unwrap_or("opencode");
744        let acp_mcp_servers = if matches!(provider_name, "codex" | "codex-acp") {
745            options.acp_mcp_servers.clone().unwrap_or_else(|| {
746                mcp_setup::build_acp_http_mcp_servers(
747                    &workspace_id,
748                    &session_id,
749                    tool_mode.as_deref(),
750                    mcp_profile.as_deref(),
751                )
752            })
753        } else {
754            Vec::new()
755        };
756
757        // Create the notification broadcast channel for this session
758        let (ntx, _) = broadcast::channel::<serde_json::Value>(256);
759        let claude_mcp_config = if provider_name == "claude" {
760            Some(mcp_setup::build_claude_mcp_config(
761                &workspace_id,
762                &session_id,
763                tool_mode.as_deref(),
764                mcp_profile.as_deref(),
765            ))
766        } else {
767            None
768        };
769
770        // Check if this is Claude (uses stream-json protocol, not ACP)
771        let (process_type, acp_session_id) = if provider_name == "claude" {
772            // Use Claude Code stream-json protocol
773            let config = ClaudeCodeConfig {
774                command: "claude".to_string(),
775                cwd: cwd.clone(),
776                display_name: format!("Claude-{}", &session_id[..8.min(session_id.len())]),
777                permission_mode: Some("bypassPermissions".to_string()),
778                mcp_configs: claude_mcp_config.into_iter().collect(),
779                append_system_prompt: options.specialist_system_prompt.clone(),
780                allowed_tools: options.allowed_native_tools.clone(),
781            };
782
783            let claude_process = ClaudeCodeProcess::spawn(config, ntx.clone()).await?;
784            let claude_session_id = claude_process
785                .session_id()
786                .await
787                .unwrap_or_else(|| format!("claude-{}", &session_id[..8.min(session_id.len())]));
788
789            (
790                AgentProcessType::Claude(Arc::new(claude_process)),
791                claude_session_id,
792            )
793        } else {
794            // Use standard ACP protocol
795            let preset = get_preset_by_id_with_registry(provider_name).await?;
796
797            if let Some(summary) = mcp_setup::ensure_mcp_for_provider(
798                provider_name,
799                &cwd,
800                &workspace_id,
801                &session_id,
802                tool_mode.as_deref(),
803                mcp_profile.as_deref(),
804            )
805            .await?
806            {
807                tracing::info!("[AcpManager] {}", summary);
808            }
809
810            // Build args: preset args + optional model flag
811            let mut extra_args: Vec<String> = preset.args.clone();
812            if matches!(provider_name, "codex" | "codex-acp") {
813                for override_arg in mcp_setup::codex_cli_overrides(&cwd)? {
814                    extra_args.push("-c".to_string());
815                    extra_args.push(override_arg);
816                }
817            }
818            if let Some(provider_args) = options.provider_args.clone() {
819                extra_args.extend(provider_args);
820            }
821            if let Some(ref m) = model {
822                if !m.is_empty() {
823                    // opencode (and future providers) accept -m <model>
824                    extra_args.push("-m".to_string());
825                    extra_args.push(m.clone());
826                }
827            }
828
829            let preset_command = resolve_preset_command(&preset);
830            let process = AcpProcess::spawn(
831                &preset_command,
832                &extra_args.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
833                &cwd,
834                ntx.clone(),
835                &preset.name,
836                &session_id,
837            )
838            .await?;
839
840            // Initialize the protocol
841            process
842                .initialize_with_timeout(options.initialize_timeout_ms)
843                .await?;
844
845            // Create the agent session
846            let agent_session_id = process.new_session(&cwd, &acp_mcp_servers).await?;
847
848            (AgentProcessType::Acp(Arc::new(process)), agent_session_id)
849        };
850
851        self.register_managed_session(
852            session_id.clone(),
853            cwd.clone(),
854            workspace_id.clone(),
855            provider_name.to_string(),
856            role.clone(),
857            model.clone(),
858            parent_session_id.clone(),
859            &options,
860            process_type,
861            acp_session_id.clone(),
862            ntx.clone(),
863        )
864        .await;
865
866        tracing::info!(
867            "[AcpManager] Session {} created (provider: {}, agent session: {})",
868            session_id,
869            provider_name,
870            acp_session_id,
871        );
872
873        Ok((session_id, acp_session_id))
874    }
875
876    /// Send a prompt to an existing session's agent process.
877    pub async fn prompt(&self, session_id: &str, text: &str) -> Result<serde_json::Value, String> {
878        self.mark_first_prompt_sent(session_id).await;
879
880        let (process, acp_session_id, preset_id, trace_writer) = {
881            let processes = self.processes.read().await;
882            let managed = processes
883                .get(session_id)
884                .ok_or_else(|| format!("No agent process for session: {}", session_id))?;
885            (
886                managed.process.clone(),
887                managed.acp_session_id.clone(),
888                managed.preset_id.clone(),
889                managed.trace_writer.clone(),
890            )
891        };
892
893        let is_alive = match &process {
894            AgentProcessType::Acp(p) => p.is_alive(),
895            AgentProcessType::Claude(p) => p.is_alive(),
896        };
897
898        if !is_alive {
899            return Err(format!("Agent ({}) process is not running", preset_id));
900        }
901
902        // Record UserMessage trace
903        let trace = TraceRecord::new(
904            session_id,
905            TraceEventType::UserMessage,
906            Contributor::new(&preset_id, None),
907        )
908        .with_conversation(TraceConversation {
909            turn: None,
910            role: Some("user".to_string()),
911            content_preview: Some(truncate_content(text, 500)),
912            full_content: None,
913        });
914
915        trace_writer.append_safe(&trace).await;
916
917        tracing::info!(
918            target: "routa_acp_prompt",
919            session_id = %session_id,
920            preset_id = %preset_id,
921            acp_session_id = %acp_session_id,
922            prompt_len = text.len(),
923            "acp prompt start"
924        );
925
926        let result = match &process {
927            AgentProcessType::Acp(p) => p.prompt(&acp_session_id, text).await,
928            AgentProcessType::Claude(p) => {
929                let stop_reason = p.prompt(text).await?;
930                Ok(serde_json::json!({ "stopReason": stop_reason }))
931            }
932        };
933
934        match &result {
935            Ok(_) => tracing::info!(
936                target: "routa_acp_prompt",
937                session_id = %session_id,
938                preset_id = %preset_id,
939                "acp prompt success"
940            ),
941            Err(error) => tracing::error!(
942                target: "routa_acp_prompt",
943                session_id = %session_id,
944                preset_id = %preset_id,
945                error = %error,
946                "acp prompt failed"
947            ),
948        }
949
950        result
951    }
952
953    /// Cancel the current prompt in a session.
954    pub async fn cancel(&self, session_id: &str) {
955        let processes = self.processes.read().await;
956        if let Some(managed) = processes.get(session_id) {
957            match &managed.process {
958                AgentProcessType::Acp(p) => p.cancel(&managed.acp_session_id).await,
959                AgentProcessType::Claude(p) => p.cancel().await,
960            }
961        }
962    }
963
964    /// Kill a session's agent process and remove it.
965    pub async fn kill_session(&self, session_id: &str) {
966        // Kill the process
967        if let Some(managed) = self.processes.write().await.remove(session_id) {
968            // Record SessionEnd trace before killing
969            let trace = TraceRecord::new(
970                session_id,
971                TraceEventType::SessionEnd,
972                Contributor::new(&managed.preset_id, None),
973            );
974            managed.trace_writer.append_safe(&trace).await;
975
976            match &managed.process {
977                AgentProcessType::Acp(p) => p.kill().await,
978                AgentProcessType::Claude(p) => p.kill().await,
979            }
980        }
981        // Remove session record
982        self.sessions.write().await.remove(session_id);
983        // Remove notification channel
984        self.notification_channels.write().await.remove(session_id);
985    }
986
987    /// Subscribe to SSE notifications for a session.
988    /// Returns a broadcast receiver that yields `session/update` JSON-RPC messages.
989    pub async fn subscribe(
990        &self,
991        session_id: &str,
992    ) -> Option<broadcast::Receiver<serde_json::Value>> {
993        let channels = self.notification_channels.read().await;
994        channels.get(session_id).map(|tx| tx.subscribe())
995    }
996
997    /// Check if a session's agent process is alive.
998    pub async fn is_alive(&self, session_id: &str) -> bool {
999        let processes = self.processes.read().await;
1000        processes
1001            .get(session_id)
1002            .map(|m| match &m.process {
1003                AgentProcessType::Acp(p) => p.is_alive(),
1004                AgentProcessType::Claude(p) => p.is_alive(),
1005            })
1006            .unwrap_or(false)
1007    }
1008
1009    /// Get the managed ACP session id for a live session.
1010    pub async fn get_acp_session_id(&self, session_id: &str) -> Option<String> {
1011        let processes = self.processes.read().await;
1012        processes
1013            .get(session_id)
1014            .map(|managed| managed.acp_session_id.clone())
1015    }
1016
1017    /// Get the preset ID for a session.
1018    pub async fn get_preset_id(&self, session_id: &str) -> Option<String> {
1019        let processes = self.processes.read().await;
1020        processes.get(session_id).map(|m| m.preset_id.clone())
1021    }
1022
1023    /// Check if a session uses Claude (stream-json protocol, not ACP).
1024    pub async fn is_claude_session(&self, session_id: &str) -> bool {
1025        let processes = self.processes.read().await;
1026        processes
1027            .get(session_id)
1028            .map(|m| matches!(&m.process, AgentProcessType::Claude(_)))
1029            .unwrap_or(false)
1030    }
1031
1032    /// Send a prompt to Claude session and return immediately.
1033    /// The actual response is streamed via the broadcast channel.
1034    /// Use `subscribe()` to receive notifications.
1035    pub async fn prompt_claude_async(&self, session_id: &str, text: &str) -> Result<(), String> {
1036        let processes = self.processes.read().await;
1037        let managed = processes
1038            .get(session_id)
1039            .ok_or_else(|| format!("No agent process for session: {}", session_id))?;
1040
1041        // Record trace
1042        let trace = TraceRecord::new(
1043            session_id,
1044            TraceEventType::UserMessage,
1045            Contributor::new(&managed.preset_id, None),
1046        )
1047        .with_conversation(TraceConversation {
1048            turn: None,
1049            role: Some("user".to_string()),
1050            content_preview: Some(truncate_content(text, 500)),
1051            full_content: Some(text.to_string()),
1052        });
1053
1054        managed.trace_writer.append_safe(&trace).await;
1055
1056        match &managed.process {
1057            AgentProcessType::Claude(p) => {
1058                // Spawn the prompt in a background task so we can return immediately
1059                let process = Arc::clone(p);
1060                let text = text.to_string();
1061                tokio::spawn(async move {
1062                    let _ = process.prompt(&text).await;
1063                });
1064                Ok(())
1065            }
1066            AgentProcessType::Acp(_) => {
1067                Err("prompt_claude_async is only for Claude sessions".to_string())
1068            }
1069        }
1070    }
1071}
1072
1073// ─── ACP Presets ────────────────────────────────────────────────────────
1074
1075/// Resume/continuation capability metadata for a provider.
1076#[derive(Debug, Clone, Serialize, Deserialize)]
1077#[serde(rename_all = "camelCase")]
1078pub struct ResumeCapability {
1079    pub supported: bool,
1080    /// "native" | "replay" | "both"
1081    pub mode: String,
1082    #[serde(default)]
1083    #[serde(skip_serializing_if = "Option::is_none")]
1084    pub supports_fork: Option<bool>,
1085    #[serde(default)]
1086    #[serde(skip_serializing_if = "Option::is_none")]
1087    pub supports_list: Option<bool>,
1088}
1089
1090/// ACP provider presets for known coding agents.
1091#[derive(Debug, Clone, Serialize, Deserialize)]
1092pub struct AcpPreset {
1093    /// Unique identifier (lowercase, e.g., "claude", "opencode")
1094    pub id: String,
1095    /// Human-readable display name (e.g., "Claude Code", "OpenCode")
1096    pub name: String,
1097    pub command: String,
1098    pub args: Vec<String>,
1099    pub description: String,
1100    #[serde(default)]
1101    #[serde(skip_serializing_if = "Option::is_none")]
1102    pub env_bin_override: Option<String>,
1103    /// Resume/continuation capabilities for this provider.
1104    #[serde(default)]
1105    #[serde(skip_serializing_if = "Option::is_none")]
1106    pub resume: Option<ResumeCapability>,
1107}
1108
1109/// Get the list of known ACP agent presets (static/builtin only).
1110pub fn get_presets() -> Vec<AcpPreset> {
1111    vec![
1112        AcpPreset {
1113            id: "opencode".to_string(),
1114            name: "OpenCode".to_string(),
1115            command: "opencode".to_string(),
1116            args: vec!["acp".to_string()],
1117            description: "OpenCode AI coding agent".to_string(),
1118            env_bin_override: Some("OPENCODE_BIN".to_string()),
1119            resume: Some(ResumeCapability {
1120                supported: true,
1121                mode: "replay".to_string(),
1122                supports_fork: None,
1123                supports_list: None,
1124            }),
1125        },
1126        AcpPreset {
1127            id: "gemini".to_string(),
1128            name: "Gemini".to_string(),
1129            command: "gemini".to_string(),
1130            args: vec!["--experimental-acp".to_string()],
1131            description: "Google Gemini CLI".to_string(),
1132            env_bin_override: None,
1133            resume: None,
1134        },
1135        AcpPreset {
1136            id: "codex-acp".to_string(),
1137            name: "Codex".to_string(),
1138            command: "codex-acp".to_string(),
1139            args: vec![],
1140            description: "OpenAI Codex CLI (codex-acp wrapper)".to_string(),
1141            env_bin_override: Some("CODEX_ACP_BIN".to_string()),
1142            resume: Some(ResumeCapability {
1143                supported: true,
1144                mode: "both".to_string(),
1145                supports_fork: None,
1146                supports_list: Some(true),
1147            }),
1148        },
1149        AcpPreset {
1150            id: "copilot".to_string(),
1151            name: "GitHub Copilot".to_string(),
1152            command: "copilot".to_string(),
1153            args: vec![
1154                "--acp".to_string(),
1155                "--allow-all-tools".to_string(),
1156                "--no-ask-user".to_string(),
1157            ],
1158            description: "GitHub Copilot CLI".to_string(),
1159            env_bin_override: Some("COPILOT_BIN".to_string()),
1160            resume: None,
1161        },
1162        AcpPreset {
1163            id: "auggie".to_string(),
1164            name: "Auggie".to_string(),
1165            command: "auggie".to_string(),
1166            args: vec!["--acp".to_string()],
1167            description: "Augment Code's AI agent".to_string(),
1168            env_bin_override: None,
1169            resume: None,
1170        },
1171        AcpPreset {
1172            id: "kimi".to_string(),
1173            name: "Kimi".to_string(),
1174            command: "kimi".to_string(),
1175            args: vec!["acp".to_string()],
1176            description: "Moonshot AI's Kimi CLI".to_string(),
1177            env_bin_override: None,
1178            resume: None,
1179        },
1180        AcpPreset {
1181            id: "kiro".to_string(),
1182            name: "Kiro".to_string(),
1183            command: "kiro-cli".to_string(),
1184            args: vec!["acp".to_string()],
1185            description: "Amazon Kiro AI coding agent".to_string(),
1186            env_bin_override: Some("KIRO_BIN".to_string()),
1187            resume: None,
1188        },
1189        AcpPreset {
1190            id: "qoder".to_string(),
1191            name: "Qoder".to_string(),
1192            command: "qodercli".to_string(),
1193            args: vec!["--acp".to_string()],
1194            description: "Qoder AI coding agent".to_string(),
1195            env_bin_override: Some("QODER_BIN".to_string()),
1196            resume: None,
1197        },
1198        AcpPreset {
1199            id: "claude".to_string(),
1200            name: "Claude Code".to_string(),
1201            command: "claude".to_string(),
1202            // Claude Code uses stream-json protocol, not ACP flags
1203            // Args are unused since we use ClaudeCodeProcess directly
1204            args: vec![],
1205            description: "Anthropic Claude Code (stream-json protocol)".to_string(),
1206            env_bin_override: Some("CLAUDE_BIN".to_string()),
1207            resume: Some(ResumeCapability {
1208                supported: true,
1209                mode: "replay".to_string(),
1210                supports_fork: Some(true),
1211                supports_list: None,
1212            }),
1213        },
1214    ]
1215}
1216
1217/// Get a static preset by ID (synchronous, no registry lookup).
1218pub fn get_preset_by_id(id: &str) -> Option<AcpPreset> {
1219    let normalized_id = match id {
1220        "codex" => "codex-acp",
1221        "qodercli" => "qoder",
1222        other => other,
1223    };
1224    get_presets().into_iter().find(|p| p.id == normalized_id)
1225}
1226
1227/// Get the resume capability for a provider ID (synchronous).
1228pub fn get_resume_capability(provider: &str) -> Option<ResumeCapability> {
1229    get_preset_by_id(provider).and_then(|p| p.resume)
1230}
1231
1232/// Get a preset by ID, checking both static presets and registry.
1233/// Static presets take precedence.
1234///
1235/// Supports suffixed IDs like "auggie-registry" to explicitly request
1236/// the registry version when both built-in and registry versions exist.
1237pub async fn get_preset_by_id_with_registry(id: &str) -> Result<AcpPreset, String> {
1238    let normalized_id = match id {
1239        "codex" => "codex-acp",
1240        "qodercli" => "qoder",
1241        other => other,
1242    };
1243
1244    // Handle suffixed IDs (e.g., "auggie-registry")
1245    // This allows explicit selection of registry version when both exist
1246    const REGISTRY_SUFFIX: &str = "-registry";
1247    if let Some(base_id) = normalized_id.strip_suffix(REGISTRY_SUFFIX) {
1248        let mut preset = get_registry_preset(base_id).await?;
1249        // Keep the suffixed ID in the returned preset for consistency
1250        preset.id = id.to_string();
1251        return Ok(preset);
1252    }
1253
1254    // Check static presets first (match by id, not name)
1255    if let Some(mut preset) = get_presets().into_iter().find(|p| p.id == normalized_id) {
1256        if preset.id != id {
1257            preset.id = id.to_string();
1258        }
1259        return Ok(preset);
1260    }
1261
1262    // Fall back to registry
1263    let mut preset = get_registry_preset(normalized_id).await?;
1264    if preset.id != id {
1265        preset.id = id.to_string();
1266    }
1267    Ok(preset)
1268}
1269
1270/// Get a preset from the ACP registry by ID.
1271async fn get_registry_preset(id: &str) -> Result<AcpPreset, String> {
1272    let registry: AcpRegistry = fetch_registry().await?;
1273
1274    // Find the agent
1275    let agent = registry
1276        .agents
1277        .into_iter()
1278        .find(|a| a.id == id)
1279        .ok_or_else(|| format!("Agent '{}' not found in registry", id))?;
1280
1281    // Build command from distribution
1282    let (command, args) = if let Some(ref npx) = agent.distribution.npx {
1283        let mut args = vec!["-y".to_string(), npx.package.clone()];
1284        args.extend(npx.args.clone());
1285        ("npx".to_string(), args)
1286    } else if let Some(ref uvx) = agent.distribution.uvx {
1287        let mut args = vec![uvx.package.clone()];
1288        args.extend(uvx.args.clone());
1289        ("uvx".to_string(), args)
1290    } else {
1291        return Err(format!(
1292            "Agent '{}' has no supported distribution (npx/uvx)",
1293            id
1294        ));
1295    };
1296
1297    Ok(AcpPreset {
1298        id: agent.id.clone(),
1299        name: agent.name,
1300        command,
1301        args,
1302        description: agent.description,
1303        env_bin_override: None,
1304        resume: None,
1305    })
1306}
1307
1308fn resolve_preset_command(preset: &AcpPreset) -> String {
1309    if let Some(env_var) = &preset.env_bin_override {
1310        if let Ok(custom_command) = std::env::var(env_var) {
1311            let trimmed = custom_command.trim();
1312            if !trimmed.is_empty() {
1313                return trimmed.to_string();
1314            }
1315        }
1316    }
1317
1318    crate::shell_env::which(&preset.command).unwrap_or_else(|| preset.command.clone())
1319}
1320
1321// ─── Utility Functions ─────────────────────────────────────────────────────
1322
/// Shorten `text` to at most `max_len` characters for storage in traces.
///
/// Lengths are counted in `char`s, so multi-byte text is never split inside a
/// character. Text that already fits is returned unchanged. Otherwise the
/// result is the first `max_len - 3` characters followed by `"..."`, or, when
/// `max_len` is 3 or less, just the first `max_len` characters with no
/// ellipsis.
fn truncate_content(text: &str, max_len: usize) -> String {
    // No character at index `max_len` means the text has at most `max_len` chars.
    if text.char_indices().nth(max_len).is_none() {
        return text.to_string();
    }

    let with_ellipsis = max_len > 3;
    let kept_chars = if with_ellipsis { max_len - 3 } else { max_len };

    // Byte offset at which the kept prefix ends (always a char boundary).
    let cut = text
        .char_indices()
        .nth(kept_chars)
        .map_or(text.len(), |(offset, _)| offset);
    let head = &text[..cut];

    if with_ellipsis {
        format!("{head}...")
    } else {
        head.to_string()
    }
}
1334
#[cfg(test)]
mod tests {
    use super::{
        get_preset_by_id_with_registry, get_presets, truncate_content, validate_session_cwd,
        AcpManager, AcpSessionRecord,
    };
    use std::collections::HashMap;
    use std::fs;
    use std::sync::Arc;
    use tokio::sync::{broadcast, RwLock};

    /// Manager with empty session, process and history tables and the given
    /// notification channels.
    fn manager_with_channels(
        channels: HashMap<String, broadcast::Sender<serde_json::Value>>,
    ) -> AcpManager {
        AcpManager {
            sessions: Arc::new(RwLock::new(HashMap::new())),
            processes: Arc::new(RwLock::new(HashMap::new())),
            notification_channels: Arc::new(RwLock::new(channels)),
            history: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Update payload shared by the `emit_session_update` tests.
    fn cancelled_turn_update() -> serde_json::Value {
        serde_json::json!({
            "sessionUpdate": "turn_complete",
            "stopReason": "cancelled"
        })
    }

    /// Whether the static preset list contains an entry with the given id.
    fn has_static_preset(id: &str) -> bool {
        get_presets().iter().any(|preset| preset.id == id)
    }

    #[test]
    fn static_presets_include_codex_acp_for_codex_alias() {
        assert!(has_static_preset("codex-acp"));
    }

    #[test]
    fn static_presets_include_qoder() {
        assert!(has_static_preset("qoder"));
    }

    #[tokio::test]
    async fn qodercli_alias_resolves_to_qoder_preset() {
        let resolved = get_preset_by_id_with_registry("qodercli")
            .await
            .expect("qodercli alias should resolve");

        // The requested alias is kept as the id; the launch data is the qoder preset's.
        assert_eq!(resolved.id, "qodercli");
        assert_eq!(resolved.command, "qodercli");
        assert_eq!(resolved.args, vec!["--acp".to_string()]);
    }

    #[test]
    fn validate_session_cwd_rejects_missing_or_non_directory_paths() {
        let sandbox = tempfile::tempdir().expect("tempdir should create");
        let absent_dir = sandbox.path().join("missing-dir");
        let plain_file = sandbox.path().join("not-a-dir.txt");
        fs::write(&plain_file, "content").expect("file should write");

        let absent_message = validate_session_cwd(absent_dir.to_string_lossy().as_ref())
            .expect_err("missing directory should fail");
        assert!(absent_message.contains("directory does not exist"));

        let file_message = validate_session_cwd(plain_file.to_string_lossy().as_ref())
            .expect_err("file path should fail");
        assert!(file_message.contains("path is not a directory"));

        validate_session_cwd(sandbox.path().to_string_lossy().as_ref())
            .expect("existing directory should pass");
    }

    #[tokio::test]
    async fn mark_first_prompt_sent_updates_live_session_record() {
        let manager = AcpManager::new();
        let session_id = "session-1".to_string();

        let record = AcpSessionRecord {
            session_id: session_id.clone(),
            name: None,
            cwd: ".".to_string(),
            workspace_id: "default".to_string(),
            routa_agent_id: None,
            provider: Some("opencode".to_string()),
            role: Some("CRAFTER".to_string()),
            mode_id: None,
            model: None,
            created_at: chrono::Utc::now().to_rfc3339(),
            first_prompt_sent: false,
            parent_session_id: None,
            specialist_id: None,
            specialist_system_prompt: None,
        };
        manager
            .sessions
            .write()
            .await
            .insert(session_id.clone(), record);

        manager.mark_first_prompt_sent(&session_id).await;

        let stored = manager.get_session(&session_id).await.expect("session");
        assert!(stored.first_prompt_sent);
    }

    #[tokio::test]
    async fn push_to_history_skips_parent_child_forwarding_noise() {
        let manager = manager_with_channels(HashMap::new());

        // A notification carrying a `childAgentId` must not be stored.
        let forwarded = serde_json::json!({
            "sessionId": "parent",
            "childAgentId": "child-1",
            "update": { "sessionUpdate": "agent_message", "content": { "type": "text", "text": "delegated" } }
        });
        manager.push_to_history("parent", forwarded).await;

        let stored = manager
            .get_session_history("parent")
            .await
            .unwrap_or_default();
        assert!(stored.is_empty());
    }

    #[tokio::test]
    async fn emit_session_update_broadcasts_when_channel_exists() {
        let (sender, mut receiver) = broadcast::channel(8);
        let manager =
            manager_with_channels(HashMap::from([("session-1".to_string(), sender)]));

        manager
            .emit_session_update("session-1", cancelled_turn_update())
            .await
            .expect("emit should succeed");

        let event = receiver.recv().await.expect("broadcast event");
        let update = &event["params"]["update"];
        assert_eq!(update["sessionUpdate"].as_str(), Some("turn_complete"));
        assert_eq!(update["stopReason"].as_str(), Some("cancelled"));
    }

    #[tokio::test]
    async fn emit_session_update_persists_history_without_channel() {
        let manager = manager_with_channels(HashMap::new());

        manager
            .emit_session_update("session-1", cancelled_turn_update())
            .await
            .expect("emit should succeed");

        let stored = manager
            .get_session_history("session-1")
            .await
            .expect("history should exist");
        assert_eq!(stored.len(), 1);
        assert_eq!(
            stored[0]["update"]["sessionUpdate"].as_str(),
            Some("turn_complete")
        );
    }

    #[test]
    fn rewrite_notification_session_id_overrides_provider_session_id() {
        let provider_notification = serde_json::json!({
            "sessionId": "provider-session",
            "update": { "sessionUpdate": "agent_message_chunk", "content": { "text": "hi" } }
        });

        let rewritten =
            AcpManager::rewrite_notification_session_id("child-session", provider_notification);

        assert_eq!(rewritten["sessionId"].as_str(), Some("child-session"));
    }

    #[test]
    fn truncate_content_handles_unicode_boundaries() {
        let cases = [
            ("你好世界ABC", 5, "你好..."),
            ("你好世界ABC", 3, "你好世"),
            ("短文本", 10, "短文本"),
        ];
        for (input, max_len, expected) in cases {
            assert_eq!(truncate_content(input, max_len), expected);
        }
    }
}