Skip to main content

routa_core/acp/
mod.rs

1//! ACP (Agent Client Protocol) integration.
2//!
3//! Manages ACP agent processes and provides JSON-RPC communication
4//! between the desktop client and coding agents (e.g. OpenCode, Claude, Copilot).
5//!
6//! Architecture (matches the Next.js `AcpProcessManager`):
7//!   - `session/new`    → spawns a child process, sends `initialize` + `session/new`
8//!   - `session/prompt` → reuses the live process, sends `session/prompt`
9//!   - `session/cancel` → sends cancellation notification
10//!   - SSE GET          → subscribes to `broadcast` channel for `session/update` events
11//!
12//! **Claude Code** uses a different protocol (stream-json) instead of ACP.
13//! The `ClaudeCodeProcess` translates Claude's output into ACP-compatible
14//! `session/update` notifications for frontend compatibility.
15//!
16//! **Agent Trace**: All sessions record trace events to JSONL files for
17//! attribution tracking (which model/session/tool affected which files and when).
18
19pub mod binary_manager;
20pub mod claude_code_process;
21pub mod docker;
22pub mod installation_state;
23pub mod mcp_setup;
24pub mod paths;
25pub mod process;
26pub mod provider_adapter;
27pub mod registry_fetch;
28pub mod registry_types;
29pub mod runtime_manager;
30pub mod terminal_manager;
31pub mod warmup;
32
33pub use binary_manager::AcpBinaryManager;
34pub use claude_code_process::{ClaudeCodeConfig, ClaudeCodeProcess};
35pub use installation_state::AcpInstallationState;
36pub use paths::AcpPaths;
37pub use registry_fetch::{fetch_registry, fetch_registry_json};
38pub use registry_types::*;
39pub use runtime_manager::{current_platform, AcpRuntimeManager, RuntimeInfo, RuntimeType};
40pub use warmup::{AcpWarmupService, WarmupState, WarmupStatus};
41
42use std::collections::HashMap;
43use std::path::Path;
44use std::sync::Arc;
45
46use serde::{Deserialize, Serialize};
47use tokio::sync::{broadcast, RwLock};
48
49use crate::trace::{Contributor, TraceConversation, TraceEventType, TraceRecord, TraceWriter};
50use process::AcpProcess;
51
/// Numeric value of the Win32 `CREATE_NO_WINDOW` process-creation flag.
///
/// Only compiled on Windows targets.
/// NOTE(review): presumably passed as creation flags when spawning agent child
/// processes so that no console window pops up — the use sites are not in this
/// part of the file; confirm against the process modules.
#[cfg(windows)]
pub(crate) const CREATE_NO_WINDOW: u32 = 0x0800_0000;
54
/// Checks that `cwd` names an existing directory before a session is launched in it.
///
/// # Errors
///
/// Returns a human-readable message when the path does not exist, or when it
/// exists but is not a directory.
fn validate_session_cwd(cwd: &str) -> Result<(), String> {
    let candidate = Path::new(cwd);

    // Existence is tested first so a missing path is never reported as "not a directory".
    let problem = if !candidate.exists() {
        "directory does not exist"
    } else if !candidate.is_dir() {
        "path is not a directory"
    } else {
        return Ok(());
    };

    Err(format!("Invalid session cwd '{cwd}': {problem}"))
}
69
70// ─── Session Record ─────────────────────────────────────────────────────
71
/// Record of an active ACP session persisted for UI listing.
///
/// Serialized with camelCase keys. Optional fields marked
/// `skip_serializing_if` are omitted from the output when `None`; the
/// remaining `Option` fields have no such attribute.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AcpSessionRecord {
    /// Our own session ID (the key used in `AcpManager`'s maps).
    pub session_id: String,
    /// Display name; `None` at registration, set later by `rename_session`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// Working directory the session was launched with.
    pub cwd: String,
    /// Workspace the session belongs to.
    pub workspace_id: String,
    /// ROUTA agent ID; `None` at registration, set by `set_routa_agent_id`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub routa_agent_id: Option<String>,
    /// Provider/preset name the session was registered with.
    pub provider: Option<String>,
    /// Agent role; registration falls back to `"CRAFTER"` when none is supplied.
    pub role: Option<String>,
    /// Mode identifier; registration always leaves this `None`.
    pub mode_id: Option<String>,
    /// Model requested for the session, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
    /// Registration time as an RFC 3339 UTC timestamp.
    pub created_at: String,
    /// Whether a prompt has been dispatched yet (see `mark_first_prompt_sent`).
    /// Defaults to `false` when absent from deserialized input.
    #[serde(default)]
    pub first_prompt_sent: bool,
    /// Parent session ID for CRAFTER child sessions
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_session_id: Option<String>,
    /// Specialist identifier copied from `SessionLaunchOptions`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub specialist_id: Option<String>,
    /// Specialist system prompt copied from `SessionLaunchOptions`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub specialist_system_prompt: Option<String>,
}
99
/// Optional knobs for launching or resuming a session.
///
/// `Default` leaves every field `None`, which is what the option-less
/// `create_session` / `load_session` wrappers pass.
#[derive(Debug, Clone, Default)]
pub struct SessionLaunchOptions {
    /// Copied verbatim into the session record.
    pub specialist_id: Option<String>,
    /// Copied into the session record; for Claude sessions it is also passed as
    /// the appended system prompt.
    pub specialist_system_prompt: Option<String>,
    /// Forwarded as the allowed-tools list of a Claude session's config.
    pub allowed_native_tools: Option<Vec<String>>,
    /// Forwarded to the ACP process's `initialize_with_timeout`.
    pub initialize_timeout_ms: Option<u64>,
    /// Extra command-line arguments appended after the preset's own arguments
    /// (preset-based launches only).
    pub provider_args: Option<Vec<String>>,
    /// MCP server descriptions sent with `session/new` / `session/load`.
    /// Preset-based launches use them only for codex providers (replacing the
    /// generated defaults); inline launches pass them as-is, or an empty list.
    pub acp_mcp_servers: Option<Vec<serde_json::Value>>,
}
109
110// ─── Managed Process ────────────────────────────────────────────────────
111
/// Process type enum to support both ACP and Claude stream-json protocols.
///
/// Each variant holds its process behind an `Arc`, so cloning a value shares
/// the same underlying process rather than duplicating it.
#[derive(Clone)]
enum AgentProcessType {
    /// Standard ACP protocol (opencode, gemini, copilot, etc.)
    Acp(Arc<AcpProcess>),
    /// Claude Code stream-json protocol
    Claude(Arc<ClaudeCodeProcess>),
}
120
121impl AgentProcessType {
122    /// Kill the underlying process.
123    async fn kill(&self) {
124        match self {
125            AgentProcessType::Acp(process) => process.kill().await,
126            AgentProcessType::Claude(process) => process.kill().await,
127        }
128    }
129}
130
/// A managed agent process with its metadata.
///
/// One of these is stored per session in `AcpManager::processes`.
struct ManagedProcess {
    /// Handle to the live agent (ACP or Claude stream-json).
    process: AgentProcessType,
    /// The agent's own session ID (returned by `session/new` or claude's session_id).
    acp_session_id: String,
    /// Provider name the session was registered with; used in error messages
    /// and as the trace contributor.
    preset_id: String,
    // Written at registration; no reader is visible in this part of the file,
    // hence the lint suppression.
    #[allow(dead_code)]
    created_at: String,
    /// Trace writer for recording agent activities to JSONL
    trace_writer: TraceWriter,
    /// Working directory (for contributor context)
    // Same as `created_at`: stored at registration, not read in the visible code.
    #[allow(dead_code)]
    cwd: String,
}
145
146// ─── ACP Manager ────────────────────────────────────────────────────────
147
/// Manages ACP agent sessions and process lifecycle.
///
/// Each session maps to a long-lived child process that communicates via
/// stdio JSON-RPC. Notifications are forwarded to subscribers via broadcast.
///
/// Every map lives behind an `Arc`, so clones of the manager share the same
/// state. Each map has its own lock; entries for one session are inserted and
/// removed map by map rather than atomically across all four.
#[derive(Clone)]
pub struct AcpManager {
    /// Our sessionId → session record (for UI listing)
    sessions: Arc<RwLock<HashMap<String, AcpSessionRecord>>>,
    /// Our sessionId → managed process (the live agent)
    processes: Arc<RwLock<HashMap<String, ManagedProcess>>>,
    /// Our sessionId → broadcast sender for SSE notifications
    notification_channels: Arc<RwLock<HashMap<String, broadcast::Sender<serde_json::Value>>>>,
    /// Our sessionId → message history (session/update notifications)
    history: Arc<RwLock<HashMap<String, Vec<serde_json::Value>>>>,
}
163
164impl Default for AcpManager {
165    fn default() -> Self {
166        Self::new()
167    }
168}
169
170impl AcpManager {
171    pub fn rewrite_notification_session_id(
172        session_id: &str,
173        mut notification: serde_json::Value,
174    ) -> serde_json::Value {
175        if let Some(object) = notification.as_object_mut() {
176            object.insert(
177                "sessionId".to_string(),
178                serde_json::Value::String(session_id.to_string()),
179            );
180        }
181        notification
182    }
183
184    pub fn new() -> Self {
185        Self {
186            sessions: Arc::new(RwLock::new(HashMap::new())),
187            processes: Arc::new(RwLock::new(HashMap::new())),
188            notification_channels: Arc::new(RwLock::new(HashMap::new())),
189            history: Arc::new(RwLock::new(HashMap::new())),
190        }
191    }
192
193    /// List all session records.
194    pub async fn list_sessions(&self) -> Vec<AcpSessionRecord> {
195        let sessions = self.sessions.read().await;
196        sessions.values().cloned().collect()
197    }
198
199    /// Get a session record by ID.
200    pub async fn get_session(&self, session_id: &str) -> Option<AcpSessionRecord> {
201        let sessions = self.sessions.read().await;
202        sessions.get(session_id).cloned()
203    }
204
205    /// Rename a session.
206    /// Returns `Some(())` if the session was found and renamed, `None` if not found.
207    pub async fn rename_session(&self, session_id: &str, name: &str) -> Option<()> {
208        let mut sessions = self.sessions.write().await;
209        let session = sessions.get_mut(session_id)?;
210        session.name = Some(name.to_string());
211        Some(())
212    }
213
214    /// Attach a ROUTA agent ID to an existing session record.
215    /// Returns `Some(())` if the session was found, `None` if not found.
216    pub async fn set_routa_agent_id(&self, session_id: &str, routa_agent_id: &str) -> Option<()> {
217        let mut sessions = self.sessions.write().await;
218        let session = sessions.get_mut(session_id)?;
219        session.routa_agent_id = Some(routa_agent_id.to_string());
220        Some(())
221    }
222
223    /// Delete a session.
224    /// Returns `Some(())` if the session was found and deleted, `None` if not found.
225    pub async fn delete_session(&self, session_id: &str) -> Option<()> {
226        let mut sessions = self.sessions.write().await;
227        let mut processes = self.processes.write().await;
228        let mut channels = self.notification_channels.write().await;
229        let mut history = self.history.write().await;
230
231        // Remove session record
232        sessions.remove(session_id)?;
233
234        // Kill the process if it exists
235        if let Some(managed) = processes.remove(session_id) {
236            let _ = managed.process.kill().await;
237        }
238
239        // Remove notification channel
240        channels.remove(session_id);
241
242        // Remove history
243        history.remove(session_id);
244
245        Some(())
246    }
247
248    /// Get session message history.
249    /// Returns `Some(history)` if the session exists, `None` if not found.
250    pub async fn get_session_history(&self, session_id: &str) -> Option<Vec<serde_json::Value>> {
251        let history = self.history.read().await;
252        history.get(session_id).cloned()
253    }
254
255    /// Add a notification to session history.
256    /// Child agent notifications (those with `childAgentId`) are NOT stored in the
257    /// parent session's history — they would flood out the ROUTA coordinator's own
258    /// messages. Child messages are persisted in their own child session's history.
259    pub async fn push_to_history(&self, session_id: &str, notification: serde_json::Value) {
260        // Skip child agent notifications to prevent flooding parent history
261        if notification.get("childAgentId").is_some() {
262            return;
263        }
264        let mut history = self.history.write().await;
265        let entries = history.entry(session_id.to_string()).or_default();
266        entries.push(notification);
267        // Cap at 500 entries (same limit as Next.js backend)
268        if entries.len() > 500 {
269            let drain_count = entries.len() - 500;
270            entries.drain(0..drain_count);
271        }
272    }
273
274    /// Broadcast a synthetic session/update event and persist it into in-memory history.
275    pub async fn emit_session_update(
276        &self,
277        session_id: &str,
278        update: serde_json::Value,
279    ) -> Result<(), String> {
280        let message = serde_json::json!({
281            "jsonrpc": "2.0",
282            "method": "session/update",
283            "params": {
284                "sessionId": session_id,
285                "update": update,
286            }
287        });
288
289        if let Some(channel) = self
290            .notification_channels
291            .read()
292            .await
293            .get(session_id)
294            .cloned()
295        {
296            let _ = channel.send(message.clone());
297        } else {
298            let params = message
299                .get("params")
300                .cloned()
301                .ok_or_else(|| "Missing params in synthetic session/update".to_string())?;
302            self.push_to_history(
303                session_id,
304                Self::rewrite_notification_session_id(session_id, params),
305            )
306            .await;
307        }
308        Ok(())
309    }
310
311    /// Mark a session as having had its first prompt dispatched.
312    pub async fn mark_first_prompt_sent(&self, session_id: &str) {
313        let mut sessions = self.sessions.write().await;
314        if let Some(session) = sessions.get_mut(session_id) {
315            session.first_prompt_sent = true;
316        }
317    }
318
319    /// Create a new ACP session: spawn agent process, initialize, create session.
320    /// Supports both static presets and registry-based agents.
321    /// **Claude** uses stream-json protocol instead of ACP.
322    ///
323    /// Returns `(our_session_id, agent_session_id)`.
324    #[allow(clippy::too_many_arguments)]
325    pub async fn create_session(
326        &self,
327        session_id: String,
328        cwd: String,
329        workspace_id: String,
330        provider: Option<String>,
331        role: Option<String>,
332        model: Option<String>,
333        parent_session_id: Option<String>,
334        tool_mode: Option<String>,
335        mcp_profile: Option<String>,
336    ) -> Result<(String, String), String> {
337        self.create_session_with_options(
338            session_id,
339            cwd,
340            workspace_id,
341            provider,
342            role,
343            model,
344            parent_session_id,
345            tool_mode,
346            mcp_profile,
347            SessionLaunchOptions::default(),
348        )
349        .await
350    }
351
352    /// Resume a persisted ACP session using the provider's native session/load path.
353    ///
354    /// Returns `(our_session_id, agent_session_id)`.
355    #[allow(clippy::too_many_arguments)]
356    pub async fn load_session_with_options(
357        &self,
358        session_id: String,
359        cwd: String,
360        workspace_id: String,
361        provider: Option<String>,
362        role: Option<String>,
363        model: Option<String>,
364        parent_session_id: Option<String>,
365        tool_mode: Option<String>,
366        mcp_profile: Option<String>,
367        provider_session_id: Option<String>,
368        options: SessionLaunchOptions,
369    ) -> Result<(String, String), String> {
370        validate_session_cwd(&cwd)?;
371        let provider_name = provider.as_deref().unwrap_or("opencode");
372        let acp_mcp_servers = if matches!(provider_name, "codex" | "codex-acp") {
373            options.acp_mcp_servers.clone().unwrap_or_else(|| {
374                mcp_setup::build_acp_http_mcp_servers(
375                    &workspace_id,
376                    &session_id,
377                    tool_mode.as_deref(),
378                    mcp_profile.as_deref(),
379                )
380            })
381        } else {
382            Vec::new()
383        };
384
385        if provider_name == "claude" {
386            return Err("Native session/load is not supported for Claude".to_string());
387        }
388
389        let (ntx, _) = broadcast::channel::<serde_json::Value>(256);
390        let preset = get_preset_by_id_with_registry(provider_name).await?;
391
392        if let Some(summary) = mcp_setup::ensure_mcp_for_provider(
393            provider_name,
394            &cwd,
395            &workspace_id,
396            &session_id,
397            tool_mode.as_deref(),
398            mcp_profile.as_deref(),
399        )
400        .await?
401        {
402            tracing::info!("[AcpManager] {}", summary);
403        }
404
405        let mut extra_args: Vec<String> = preset.args.clone();
406        if matches!(provider_name, "codex" | "codex-acp") {
407            for override_arg in mcp_setup::codex_cli_overrides(&cwd)? {
408                extra_args.push("-c".to_string());
409                extra_args.push(override_arg);
410            }
411        }
412        if let Some(provider_args) = options.provider_args.clone() {
413            extra_args.extend(provider_args);
414        }
415        if let Some(ref m) = model {
416            if !m.is_empty() {
417                extra_args.push("-m".to_string());
418                extra_args.push(m.clone());
419            }
420        }
421
422        let preset_command = resolve_preset_command(&preset);
423        let process = AcpProcess::spawn(
424            &preset_command,
425            &extra_args.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
426            &cwd,
427            ntx.clone(),
428            &preset.name,
429            &session_id,
430        )
431        .await?;
432
433        process
434            .initialize_with_timeout(options.initialize_timeout_ms)
435            .await?;
436
437        let resolved_provider_session_id =
438            provider_session_id.unwrap_or_else(|| session_id.clone());
439        let acp_session_id = process
440            .load_session(&resolved_provider_session_id, &cwd, &acp_mcp_servers)
441            .await?;
442
443        self.register_managed_session(
444            session_id.clone(),
445            cwd.clone(),
446            workspace_id.clone(),
447            provider_name.to_string(),
448            role.clone(),
449            model.clone(),
450            parent_session_id.clone(),
451            &options,
452            AgentProcessType::Acp(Arc::new(process)),
453            acp_session_id.clone(),
454            ntx.clone(),
455        )
456        .await;
457
458        tracing::info!(
459            "[AcpManager] Session {} loaded (provider: {}, agent session: {})",
460            session_id,
461            provider_name,
462            acp_session_id,
463        );
464
465        Ok((session_id, acp_session_id))
466    }
467
468    #[allow(clippy::too_many_arguments)]
469    pub async fn load_session(
470        &self,
471        session_id: String,
472        cwd: String,
473        workspace_id: String,
474        provider: Option<String>,
475        role: Option<String>,
476        model: Option<String>,
477        parent_session_id: Option<String>,
478        tool_mode: Option<String>,
479        mcp_profile: Option<String>,
480        provider_session_id: Option<String>,
481    ) -> Result<(String, String), String> {
482        self.load_session_with_options(
483            session_id,
484            cwd,
485            workspace_id,
486            provider,
487            role,
488            model,
489            parent_session_id,
490            tool_mode,
491            mcp_profile,
492            provider_session_id,
493            SessionLaunchOptions::default(),
494        )
495        .await
496    }
497
498    fn spawn_history_mirror(&self, session_id: &str, ntx: &broadcast::Sender<serde_json::Value>) {
499        let history_manager = self.clone();
500        let history_session_id = session_id.to_string();
501        let mut history_rx = ntx.subscribe();
502        tokio::spawn(async move {
503            loop {
504                match history_rx.recv().await {
505                    Ok(message) => {
506                        let params = match message.get("params") {
507                            Some(value) => value.clone(),
508                            None => continue,
509                        };
510                        history_manager
511                            .push_to_history(
512                                &history_session_id,
513                                Self::rewrite_notification_session_id(&history_session_id, params),
514                            )
515                            .await;
516                    }
517                    Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
518                    Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
519                        tracing::warn!(
520                            "[AcpManager] Dropped {} session/update notifications for {}",
521                            skipped,
522                            history_session_id
523                        );
524                    }
525                }
526            }
527        });
528    }
529
530    #[allow(clippy::too_many_arguments)]
531    async fn register_managed_session(
532        &self,
533        session_id: String,
534        cwd: String,
535        workspace_id: String,
536        provider_name: String,
537        role: Option<String>,
538        model: Option<String>,
539        parent_session_id: Option<String>,
540        options: &SessionLaunchOptions,
541        process_type: AgentProcessType,
542        acp_session_id: String,
543        ntx: broadcast::Sender<serde_json::Value>,
544    ) {
545        let created_at = chrono::Utc::now().to_rfc3339();
546        let trace_writer = TraceWriter::new(&cwd);
547        let record = AcpSessionRecord {
548            session_id: session_id.clone(),
549            name: None,
550            cwd: cwd.clone(),
551            workspace_id: workspace_id.clone(),
552            routa_agent_id: None,
553            provider: Some(provider_name.clone()),
554            role: role.clone().or(Some("CRAFTER".to_string())),
555            mode_id: None,
556            model: model.clone(),
557            created_at: created_at.clone(),
558            first_prompt_sent: false,
559            parent_session_id: parent_session_id.clone(),
560            specialist_id: options.specialist_id.clone(),
561            specialist_system_prompt: options.specialist_system_prompt.clone(),
562        };
563
564        self.sessions
565            .write()
566            .await
567            .insert(session_id.clone(), record);
568        self.processes.write().await.insert(
569            session_id.clone(),
570            ManagedProcess {
571                process: process_type,
572                acp_session_id: acp_session_id.clone(),
573                preset_id: provider_name.clone(),
574                created_at,
575                trace_writer: trace_writer.clone(),
576                cwd: cwd.clone(),
577            },
578        );
579        self.notification_channels
580            .write()
581            .await
582            .insert(session_id.clone(), ntx.clone());
583        self.spawn_history_mirror(&session_id, &ntx);
584
585        let trace = TraceRecord::new(
586            &session_id,
587            TraceEventType::SessionStart,
588            Contributor::new(&provider_name, None),
589        )
590        .with_workspace_id(&workspace_id)
591        .with_metadata(
592            "role",
593            serde_json::json!(role.as_deref().unwrap_or("CRAFTER")),
594        )
595        .with_metadata("cwd", serde_json::json!(cwd));
596
597        trace_writer.append_safe(&trace).await;
598    }
599
600    #[allow(clippy::too_many_arguments)]
601    pub async fn create_session_from_inline(
602        &self,
603        session_id: String,
604        cwd: String,
605        workspace_id: String,
606        provider_name: String,
607        role: Option<String>,
608        model: Option<String>,
609        parent_session_id: Option<String>,
610        command: String,
611        args: Vec<String>,
612        options: SessionLaunchOptions,
613    ) -> Result<(String, String), String> {
614        validate_session_cwd(&cwd)?;
615        let (ntx, _) = broadcast::channel::<serde_json::Value>(256);
616
617        let process = AcpProcess::spawn(
618            &command,
619            &args.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
620            &cwd,
621            ntx.clone(),
622            &provider_name,
623            &session_id,
624        )
625        .await?;
626
627        process
628            .initialize_with_timeout(options.initialize_timeout_ms)
629            .await?;
630
631        let acp_session_id = process
632            .new_session(&cwd, options.acp_mcp_servers.as_deref().unwrap_or(&[]))
633            .await?;
634        self.register_managed_session(
635            session_id.clone(),
636            cwd.clone(),
637            workspace_id.clone(),
638            provider_name.clone(),
639            role.clone(),
640            model.clone(),
641            parent_session_id.clone(),
642            &options,
643            AgentProcessType::Acp(Arc::new(process)),
644            acp_session_id.clone(),
645            ntx.clone(),
646        )
647        .await;
648
649        tracing::info!(
650            "[AcpManager] Session {} created from inline command (provider: {}, agent session: {})",
651            session_id,
652            provider_name,
653            acp_session_id,
654        );
655
656        Ok((session_id, acp_session_id))
657    }
658
659    #[allow(clippy::too_many_arguments)]
660    pub async fn load_session_from_inline(
661        &self,
662        session_id: String,
663        cwd: String,
664        workspace_id: String,
665        provider_name: String,
666        role: Option<String>,
667        model: Option<String>,
668        parent_session_id: Option<String>,
669        command: String,
670        args: Vec<String>,
671        provider_session_id: Option<String>,
672        options: SessionLaunchOptions,
673    ) -> Result<(String, String), String> {
674        validate_session_cwd(&cwd)?;
675        let (ntx, _) = broadcast::channel::<serde_json::Value>(256);
676
677        let process = AcpProcess::spawn(
678            &command,
679            &args.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
680            &cwd,
681            ntx.clone(),
682            &provider_name,
683            &session_id,
684        )
685        .await?;
686
687        process
688            .initialize_with_timeout(options.initialize_timeout_ms)
689            .await?;
690
691        let resolved_provider_session_id =
692            provider_session_id.unwrap_or_else(|| session_id.clone());
693        let acp_session_id = process
694            .load_session(
695                &resolved_provider_session_id,
696                &cwd,
697                options.acp_mcp_servers.as_deref().unwrap_or(&[]),
698            )
699            .await?;
700
701        self.register_managed_session(
702            session_id.clone(),
703            cwd.clone(),
704            workspace_id.clone(),
705            provider_name.clone(),
706            role.clone(),
707            model.clone(),
708            parent_session_id.clone(),
709            &options,
710            AgentProcessType::Acp(Arc::new(process)),
711            acp_session_id.clone(),
712            ntx.clone(),
713        )
714        .await;
715
716        tracing::info!(
717            "[AcpManager] Session {} loaded from inline command (provider: {}, agent session: {})",
718            session_id,
719            provider_name,
720            acp_session_id,
721        );
722
723        Ok((session_id, acp_session_id))
724    }
725
726    #[allow(clippy::too_many_arguments)]
727    pub async fn create_session_with_options(
728        &self,
729        session_id: String,
730        cwd: String,
731        workspace_id: String,
732        provider: Option<String>,
733        role: Option<String>,
734        model: Option<String>,
735        parent_session_id: Option<String>,
736        tool_mode: Option<String>,
737        mcp_profile: Option<String>,
738        options: SessionLaunchOptions,
739    ) -> Result<(String, String), String> {
740        validate_session_cwd(&cwd)?;
741        let provider_name = provider.as_deref().unwrap_or("opencode");
742        let acp_mcp_servers = if matches!(provider_name, "codex" | "codex-acp") {
743            options.acp_mcp_servers.clone().unwrap_or_else(|| {
744                mcp_setup::build_acp_http_mcp_servers(
745                    &workspace_id,
746                    &session_id,
747                    tool_mode.as_deref(),
748                    mcp_profile.as_deref(),
749                )
750            })
751        } else {
752            Vec::new()
753        };
754
755        // Create the notification broadcast channel for this session
756        let (ntx, _) = broadcast::channel::<serde_json::Value>(256);
757        let claude_mcp_config = if provider_name == "claude" {
758            Some(mcp_setup::build_claude_mcp_config(
759                &workspace_id,
760                &session_id,
761                tool_mode.as_deref(),
762                mcp_profile.as_deref(),
763            ))
764        } else {
765            None
766        };
767
768        // Check if this is Claude (uses stream-json protocol, not ACP)
769        let (process_type, acp_session_id) = if provider_name == "claude" {
770            // Use Claude Code stream-json protocol
771            let config = ClaudeCodeConfig {
772                command: "claude".to_string(),
773                cwd: cwd.clone(),
774                display_name: format!("Claude-{}", &session_id[..8.min(session_id.len())]),
775                permission_mode: Some("bypassPermissions".to_string()),
776                mcp_configs: claude_mcp_config.into_iter().collect(),
777                append_system_prompt: options.specialist_system_prompt.clone(),
778                allowed_tools: options.allowed_native_tools.clone(),
779            };
780
781            let claude_process = ClaudeCodeProcess::spawn(config, ntx.clone()).await?;
782            let claude_session_id = claude_process
783                .session_id()
784                .await
785                .unwrap_or_else(|| format!("claude-{}", &session_id[..8.min(session_id.len())]));
786
787            (
788                AgentProcessType::Claude(Arc::new(claude_process)),
789                claude_session_id,
790            )
791        } else {
792            // Use standard ACP protocol
793            let preset = get_preset_by_id_with_registry(provider_name).await?;
794
795            if let Some(summary) = mcp_setup::ensure_mcp_for_provider(
796                provider_name,
797                &cwd,
798                &workspace_id,
799                &session_id,
800                tool_mode.as_deref(),
801                mcp_profile.as_deref(),
802            )
803            .await?
804            {
805                tracing::info!("[AcpManager] {}", summary);
806            }
807
808            // Build args: preset args + optional model flag
809            let mut extra_args: Vec<String> = preset.args.clone();
810            if matches!(provider_name, "codex" | "codex-acp") {
811                for override_arg in mcp_setup::codex_cli_overrides(&cwd)? {
812                    extra_args.push("-c".to_string());
813                    extra_args.push(override_arg);
814                }
815            }
816            if let Some(provider_args) = options.provider_args.clone() {
817                extra_args.extend(provider_args);
818            }
819            if let Some(ref m) = model {
820                if !m.is_empty() {
821                    // opencode (and future providers) accept -m <model>
822                    extra_args.push("-m".to_string());
823                    extra_args.push(m.clone());
824                }
825            }
826
827            let preset_command = resolve_preset_command(&preset);
828            let process = AcpProcess::spawn(
829                &preset_command,
830                &extra_args.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
831                &cwd,
832                ntx.clone(),
833                &preset.name,
834                &session_id,
835            )
836            .await?;
837
838            // Initialize the protocol
839            process
840                .initialize_with_timeout(options.initialize_timeout_ms)
841                .await?;
842
843            // Create the agent session
844            let agent_session_id = process.new_session(&cwd, &acp_mcp_servers).await?;
845
846            (AgentProcessType::Acp(Arc::new(process)), agent_session_id)
847        };
848
849        self.register_managed_session(
850            session_id.clone(),
851            cwd.clone(),
852            workspace_id.clone(),
853            provider_name.to_string(),
854            role.clone(),
855            model.clone(),
856            parent_session_id.clone(),
857            &options,
858            process_type,
859            acp_session_id.clone(),
860            ntx.clone(),
861        )
862        .await;
863
864        tracing::info!(
865            "[AcpManager] Session {} created (provider: {}, agent session: {})",
866            session_id,
867            provider_name,
868            acp_session_id,
869        );
870
871        Ok((session_id, acp_session_id))
872    }
873
874    /// Send a prompt to an existing session's agent process.
875    pub async fn prompt(&self, session_id: &str, text: &str) -> Result<serde_json::Value, String> {
876        self.mark_first_prompt_sent(session_id).await;
877
878        let (process, acp_session_id, preset_id, trace_writer) = {
879            let processes = self.processes.read().await;
880            let managed = processes
881                .get(session_id)
882                .ok_or_else(|| format!("No agent process for session: {session_id}"))?;
883            (
884                managed.process.clone(),
885                managed.acp_session_id.clone(),
886                managed.preset_id.clone(),
887                managed.trace_writer.clone(),
888            )
889        };
890
891        let is_alive = match &process {
892            AgentProcessType::Acp(p) => p.is_alive(),
893            AgentProcessType::Claude(p) => p.is_alive(),
894        };
895
896        if !is_alive {
897            return Err(format!("Agent ({preset_id}) process is not running"));
898        }
899
900        // Record UserMessage trace
901        let trace = TraceRecord::new(
902            session_id,
903            TraceEventType::UserMessage,
904            Contributor::new(&preset_id, None),
905        )
906        .with_conversation(TraceConversation {
907            turn: None,
908            role: Some("user".to_string()),
909            content_preview: Some(truncate_content(text, 500)),
910            full_content: None,
911        });
912
913        trace_writer.append_safe(&trace).await;
914
915        tracing::info!(
916            target: "routa_acp_prompt",
917            session_id = %session_id,
918            preset_id = %preset_id,
919            acp_session_id = %acp_session_id,
920            prompt_len = text.len(),
921            "acp prompt start"
922        );
923
924        let result = match &process {
925            AgentProcessType::Acp(p) => p.prompt(&acp_session_id, text).await,
926            AgentProcessType::Claude(p) => {
927                let stop_reason = p.prompt(text).await?;
928                Ok(serde_json::json!({ "stopReason": stop_reason }))
929            }
930        };
931
932        match &result {
933            Ok(_) => tracing::info!(
934                target: "routa_acp_prompt",
935                session_id = %session_id,
936                preset_id = %preset_id,
937                "acp prompt success"
938            ),
939            Err(error) => tracing::error!(
940                target: "routa_acp_prompt",
941                session_id = %session_id,
942                preset_id = %preset_id,
943                error = %error,
944                "acp prompt failed"
945            ),
946        }
947
948        result
949    }
950
951    /// Cancel the current prompt in a session.
952    pub async fn cancel(&self, session_id: &str) {
953        let processes = self.processes.read().await;
954        if let Some(managed) = processes.get(session_id) {
955            match &managed.process {
956                AgentProcessType::Acp(p) => p.cancel(&managed.acp_session_id).await,
957                AgentProcessType::Claude(p) => p.cancel().await,
958            }
959        }
960    }
961
962    /// Kill a session's agent process and remove it.
963    pub async fn kill_session(&self, session_id: &str) {
964        // Kill the process
965        if let Some(managed) = self.processes.write().await.remove(session_id) {
966            // Record SessionEnd trace before killing
967            let trace = TraceRecord::new(
968                session_id,
969                TraceEventType::SessionEnd,
970                Contributor::new(&managed.preset_id, None),
971            );
972            managed.trace_writer.append_safe(&trace).await;
973
974            match &managed.process {
975                AgentProcessType::Acp(p) => p.kill().await,
976                AgentProcessType::Claude(p) => p.kill().await,
977            }
978        }
979        // Remove session record
980        self.sessions.write().await.remove(session_id);
981        // Remove notification channel
982        self.notification_channels.write().await.remove(session_id);
983    }
984
985    /// Subscribe to SSE notifications for a session.
986    /// Returns a broadcast receiver that yields `session/update` JSON-RPC messages.
987    pub async fn subscribe(
988        &self,
989        session_id: &str,
990    ) -> Option<broadcast::Receiver<serde_json::Value>> {
991        let channels = self.notification_channels.read().await;
992        channels.get(session_id).map(|tx| tx.subscribe())
993    }
994
995    /// Check if a session's agent process is alive.
996    pub async fn is_alive(&self, session_id: &str) -> bool {
997        let processes = self.processes.read().await;
998        processes
999            .get(session_id)
1000            .map(|m| match &m.process {
1001                AgentProcessType::Acp(p) => p.is_alive(),
1002                AgentProcessType::Claude(p) => p.is_alive(),
1003            })
1004            .unwrap_or(false)
1005    }
1006
1007    /// Get the managed ACP session id for a live session.
1008    pub async fn get_acp_session_id(&self, session_id: &str) -> Option<String> {
1009        let processes = self.processes.read().await;
1010        processes
1011            .get(session_id)
1012            .map(|managed| managed.acp_session_id.clone())
1013    }
1014
1015    /// Get the preset ID for a session.
1016    pub async fn get_preset_id(&self, session_id: &str) -> Option<String> {
1017        let processes = self.processes.read().await;
1018        processes.get(session_id).map(|m| m.preset_id.clone())
1019    }
1020
1021    /// Check if a session uses Claude (stream-json protocol, not ACP).
1022    pub async fn is_claude_session(&self, session_id: &str) -> bool {
1023        let processes = self.processes.read().await;
1024        processes
1025            .get(session_id)
1026            .map(|m| matches!(&m.process, AgentProcessType::Claude(_)))
1027            .unwrap_or(false)
1028    }
1029
1030    /// Send a prompt to Claude session and return immediately.
1031    /// The actual response is streamed via the broadcast channel.
1032    /// Use `subscribe()` to receive notifications.
1033    pub async fn prompt_claude_async(&self, session_id: &str, text: &str) -> Result<(), String> {
1034        let processes = self.processes.read().await;
1035        let managed = processes
1036            .get(session_id)
1037            .ok_or_else(|| format!("No agent process for session: {session_id}"))?;
1038
1039        // Record trace
1040        let trace = TraceRecord::new(
1041            session_id,
1042            TraceEventType::UserMessage,
1043            Contributor::new(&managed.preset_id, None),
1044        )
1045        .with_conversation(TraceConversation {
1046            turn: None,
1047            role: Some("user".to_string()),
1048            content_preview: Some(truncate_content(text, 500)),
1049            full_content: Some(text.to_string()),
1050        });
1051
1052        managed.trace_writer.append_safe(&trace).await;
1053
1054        match &managed.process {
1055            AgentProcessType::Claude(p) => {
1056                // Spawn the prompt in a background task so we can return immediately
1057                let process = Arc::clone(p);
1058                let text = text.to_string();
1059                tokio::spawn(async move {
1060                    let _ = process.prompt(&text).await;
1061                });
1062                Ok(())
1063            }
1064            AgentProcessType::Acp(_) => {
1065                Err("prompt_claude_async is only for Claude sessions".to_string())
1066            }
1067        }
1068    }
1069}
1070
1071// ─── ACP Presets ────────────────────────────────────────────────────────
1072
1073/// Resume/continuation capability metadata for a provider.
1074#[derive(Debug, Clone, Serialize, Deserialize)]
1075#[serde(rename_all = "camelCase")]
1076pub struct ResumeCapability {
1077    pub supported: bool,
1078    /// "native" | "replay" | "both"
1079    pub mode: String,
1080    #[serde(default)]
1081    #[serde(skip_serializing_if = "Option::is_none")]
1082    pub supports_fork: Option<bool>,
1083    #[serde(default)]
1084    #[serde(skip_serializing_if = "Option::is_none")]
1085    pub supports_list: Option<bool>,
1086}
1087
1088/// ACP provider presets for known coding agents.
1089#[derive(Debug, Clone, Serialize, Deserialize)]
1090pub struct AcpPreset {
1091    /// Unique identifier (lowercase, e.g., "claude", "opencode")
1092    pub id: String,
1093    /// Human-readable display name (e.g., "Claude Code", "OpenCode")
1094    pub name: String,
1095    pub command: String,
1096    pub args: Vec<String>,
1097    pub description: String,
1098    #[serde(default)]
1099    #[serde(skip_serializing_if = "Option::is_none")]
1100    pub env_bin_override: Option<String>,
1101    /// Resume/continuation capabilities for this provider.
1102    #[serde(default)]
1103    #[serde(skip_serializing_if = "Option::is_none")]
1104    pub resume: Option<ResumeCapability>,
1105}
1106
1107/// Get the list of known ACP agent presets (static/builtin only).
1108pub fn get_presets() -> Vec<AcpPreset> {
1109    vec![
1110        AcpPreset {
1111            id: "opencode".to_string(),
1112            name: "OpenCode".to_string(),
1113            command: "opencode".to_string(),
1114            args: vec!["acp".to_string()],
1115            description: "OpenCode AI coding agent".to_string(),
1116            env_bin_override: Some("OPENCODE_BIN".to_string()),
1117            resume: Some(ResumeCapability {
1118                supported: true,
1119                mode: "replay".to_string(),
1120                supports_fork: None,
1121                supports_list: None,
1122            }),
1123        },
1124        AcpPreset {
1125            id: "gemini".to_string(),
1126            name: "Gemini".to_string(),
1127            command: "gemini".to_string(),
1128            args: vec!["--experimental-acp".to_string()],
1129            description: "Google Gemini CLI".to_string(),
1130            env_bin_override: None,
1131            resume: None,
1132        },
1133        AcpPreset {
1134            id: "codex-acp".to_string(),
1135            name: "Codex".to_string(),
1136            command: "codex-acp".to_string(),
1137            args: vec![],
1138            description: "OpenAI Codex CLI (codex-acp wrapper)".to_string(),
1139            env_bin_override: Some("CODEX_ACP_BIN".to_string()),
1140            resume: Some(ResumeCapability {
1141                supported: true,
1142                mode: "both".to_string(),
1143                supports_fork: None,
1144                supports_list: Some(true),
1145            }),
1146        },
1147        AcpPreset {
1148            id: "copilot".to_string(),
1149            name: "GitHub Copilot".to_string(),
1150            command: "copilot".to_string(),
1151            args: vec![
1152                "--acp".to_string(),
1153                "--allow-all-tools".to_string(),
1154                "--no-ask-user".to_string(),
1155            ],
1156            description: "GitHub Copilot CLI".to_string(),
1157            env_bin_override: Some("COPILOT_BIN".to_string()),
1158            resume: None,
1159        },
1160        AcpPreset {
1161            id: "auggie".to_string(),
1162            name: "Auggie".to_string(),
1163            command: "auggie".to_string(),
1164            args: vec!["--acp".to_string()],
1165            description: "Augment Code's AI agent".to_string(),
1166            env_bin_override: None,
1167            resume: None,
1168        },
1169        AcpPreset {
1170            id: "kimi".to_string(),
1171            name: "Kimi".to_string(),
1172            command: "kimi".to_string(),
1173            args: vec!["acp".to_string()],
1174            description: "Moonshot AI's Kimi CLI".to_string(),
1175            env_bin_override: None,
1176            resume: None,
1177        },
1178        AcpPreset {
1179            id: "kiro".to_string(),
1180            name: "Kiro".to_string(),
1181            command: "kiro-cli".to_string(),
1182            args: vec!["acp".to_string()],
1183            description: "Amazon Kiro AI coding agent".to_string(),
1184            env_bin_override: Some("KIRO_BIN".to_string()),
1185            resume: None,
1186        },
1187        AcpPreset {
1188            id: "qoder".to_string(),
1189            name: "Qoder".to_string(),
1190            command: "qodercli".to_string(),
1191            args: vec!["--acp".to_string()],
1192            description: "Qoder AI coding agent".to_string(),
1193            env_bin_override: Some("QODER_BIN".to_string()),
1194            resume: None,
1195        },
1196        AcpPreset {
1197            id: "claude".to_string(),
1198            name: "Claude Code".to_string(),
1199            command: "claude".to_string(),
1200            // Claude Code uses stream-json protocol, not ACP flags
1201            // Args are unused since we use ClaudeCodeProcess directly
1202            args: vec![],
1203            description: "Anthropic Claude Code (stream-json protocol)".to_string(),
1204            env_bin_override: Some("CLAUDE_BIN".to_string()),
1205            resume: Some(ResumeCapability {
1206                supported: true,
1207                mode: "replay".to_string(),
1208                supports_fork: Some(true),
1209                supports_list: None,
1210            }),
1211        },
1212    ]
1213}
1214
1215/// Get a static preset by ID (synchronous, no registry lookup).
1216pub fn get_preset_by_id(id: &str) -> Option<AcpPreset> {
1217    let normalized_id = match id {
1218        "codex" => "codex-acp",
1219        "qodercli" => "qoder",
1220        other => other,
1221    };
1222    get_presets().into_iter().find(|p| p.id == normalized_id)
1223}
1224
1225/// Get the resume capability for a provider ID (synchronous).
1226pub fn get_resume_capability(provider: &str) -> Option<ResumeCapability> {
1227    get_preset_by_id(provider).and_then(|p| p.resume)
1228}
1229
1230/// Get a preset by ID, checking both static presets and registry.
1231/// Static presets take precedence.
1232///
1233/// Supports suffixed IDs like "auggie-registry" to explicitly request
1234/// the registry version when both built-in and registry versions exist.
1235pub async fn get_preset_by_id_with_registry(id: &str) -> Result<AcpPreset, String> {
1236    let normalized_id = match id {
1237        "codex" => "codex-acp",
1238        "qodercli" => "qoder",
1239        other => other,
1240    };
1241
1242    // Handle suffixed IDs (e.g., "auggie-registry")
1243    // This allows explicit selection of registry version when both exist
1244    const REGISTRY_SUFFIX: &str = "-registry";
1245    if let Some(base_id) = normalized_id.strip_suffix(REGISTRY_SUFFIX) {
1246        let mut preset = get_registry_preset(base_id).await?;
1247        // Keep the suffixed ID in the returned preset for consistency
1248        preset.id = id.to_string();
1249        return Ok(preset);
1250    }
1251
1252    // Check static presets first (match by id, not name)
1253    if let Some(mut preset) = get_presets().into_iter().find(|p| p.id == normalized_id) {
1254        if preset.id != id {
1255            preset.id = id.to_string();
1256        }
1257        return Ok(preset);
1258    }
1259
1260    // Fall back to registry
1261    let mut preset = get_registry_preset(normalized_id).await?;
1262    if preset.id != id {
1263        preset.id = id.to_string();
1264    }
1265    Ok(preset)
1266}
1267
1268/// Get a preset from the ACP registry by ID.
1269async fn get_registry_preset(id: &str) -> Result<AcpPreset, String> {
1270    let registry: AcpRegistry = fetch_registry().await?;
1271
1272    // Find the agent
1273    let agent = registry
1274        .agents
1275        .into_iter()
1276        .find(|a| a.id == id)
1277        .ok_or_else(|| format!("Agent '{id}' not found in registry"))?;
1278
1279    // Build command from distribution
1280    let (command, args) = if let Some(ref npx) = agent.distribution.npx {
1281        let mut args = vec!["-y".to_string(), npx.package.clone()];
1282        args.extend(npx.args.clone());
1283        ("npx".to_string(), args)
1284    } else if let Some(ref uvx) = agent.distribution.uvx {
1285        let mut args = vec![uvx.package.clone()];
1286        args.extend(uvx.args.clone());
1287        ("uvx".to_string(), args)
1288    } else {
1289        return Err(format!(
1290            "Agent '{id}' has no supported distribution (npx/uvx)"
1291        ));
1292    };
1293
1294    Ok(AcpPreset {
1295        id: agent.id.clone(),
1296        name: agent.name,
1297        command,
1298        args,
1299        description: agent.description,
1300        env_bin_override: None,
1301        resume: None,
1302    })
1303}
1304
1305fn resolve_preset_command(preset: &AcpPreset) -> String {
1306    if let Some(env_var) = &preset.env_bin_override {
1307        if let Ok(custom_command) = std::env::var(env_var) {
1308            let trimmed = custom_command.trim();
1309            if !trimmed.is_empty() {
1310                return trimmed.to_string();
1311            }
1312        }
1313    }
1314
1315    crate::shell_env::which(&preset.command).unwrap_or_else(|| preset.command.clone())
1316}
1317
1318// ─── Utility Functions ─────────────────────────────────────────────────────
1319
/// Shorten `text` to at most `max_len` characters for storage in traces.
///
/// Lengths are counted in `char`s, so multi-byte text is never split inside a
/// character. Text that already fits is returned unchanged. Otherwise the
/// result is the first `max_len - 3` characters followed by "...", except when
/// `max_len <= 3`, where the first `max_len` characters are returned with no
/// ellipsis.
fn truncate_content(text: &str, max_len: usize) -> String {
    const ELLIPSIS: &str = "...";

    // Byte offset at which the character with index `n` starts, if it exists.
    let byte_offset_of = |n: usize| text.char_indices().nth(n).map(|(offset, _)| offset);

    match byte_offset_of(max_len) {
        // There is no character at index `max_len`: the text fits as-is.
        None => text.to_string(),
        // Too little room for an ellipsis: hard cut.
        Some(cut) if max_len <= ELLIPSIS.len() => text[..cut].to_string(),
        Some(_) => {
            let keep = byte_offset_of(max_len - ELLIPSIS.len()).unwrap_or(text.len());
            format!("{}{}", &text[..keep], ELLIPSIS)
        }
    }
}
1331
#[cfg(test)]
mod tests {
    use super::{
        get_preset_by_id_with_registry, get_presets, truncate_content, validate_session_cwd,
        AcpManager, AcpSessionRecord,
    };
    use std::collections::HashMap;
    use std::fs;
    use std::sync::Arc;
    use tokio::sync::RwLock;

    /// Build a manager whose session, process, channel and history maps are empty.
    fn empty_manager() -> AcpManager {
        AcpManager {
            sessions: Arc::new(RwLock::new(HashMap::new())),
            processes: Arc::new(RwLock::new(HashMap::new())),
            notification_channels: Arc::new(RwLock::new(HashMap::new())),
            history: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Whether the built-in preset list contains a preset with this ID.
    fn has_static_preset(id: &str) -> bool {
        get_presets().into_iter().any(|preset| preset.id == id)
    }

    /// Payload shared by the `emit_session_update` tests.
    fn turn_complete_update() -> serde_json::Value {
        serde_json::json!({
            "sessionUpdate": "turn_complete",
            "stopReason": "cancelled"
        })
    }

    #[test]
    fn static_presets_include_codex_acp_for_codex_alias() {
        assert!(has_static_preset("codex-acp"));
    }

    #[test]
    fn static_presets_include_qoder() {
        assert!(has_static_preset("qoder"));
    }

    #[tokio::test]
    async fn qodercli_alias_resolves_to_qoder_preset() {
        let resolved = get_preset_by_id_with_registry("qodercli")
            .await
            .expect("qodercli alias should resolve");

        // The requested alias is kept as the ID; the launch data is qoder's.
        assert_eq!(resolved.id, "qodercli");
        assert_eq!(resolved.command, "qodercli");
        assert_eq!(resolved.args, vec!["--acp".to_string()]);
    }

    #[test]
    fn validate_session_cwd_rejects_missing_or_non_directory_paths() {
        let sandbox = tempfile::tempdir().expect("tempdir should create");
        let absent_dir = sandbox.path().join("missing-dir");
        let plain_file = sandbox.path().join("not-a-dir.txt");
        fs::write(&plain_file, "content").expect("file should write");

        let absent_error = validate_session_cwd(absent_dir.to_string_lossy().as_ref())
            .expect_err("missing directory should fail");
        assert!(absent_error.contains("directory does not exist"));

        let file_error = validate_session_cwd(plain_file.to_string_lossy().as_ref())
            .expect_err("file path should fail");
        assert!(file_error.contains("path is not a directory"));

        validate_session_cwd(sandbox.path().to_string_lossy().as_ref())
            .expect("existing directory should pass");
    }

    #[tokio::test]
    async fn mark_first_prompt_sent_updates_live_session_record() {
        let manager = AcpManager::new();
        let session_id = "session-1".to_string();
        let record = AcpSessionRecord {
            session_id: session_id.clone(),
            name: None,
            cwd: ".".to_string(),
            workspace_id: "default".to_string(),
            routa_agent_id: None,
            provider: Some("opencode".to_string()),
            role: Some("CRAFTER".to_string()),
            mode_id: None,
            model: None,
            created_at: chrono::Utc::now().to_rfc3339(),
            first_prompt_sent: false,
            parent_session_id: None,
            specialist_id: None,
            specialist_system_prompt: None,
        };
        manager
            .sessions
            .write()
            .await
            .insert(session_id.clone(), record);

        manager.mark_first_prompt_sent(&session_id).await;

        let stored = manager.get_session(&session_id).await.expect("session");
        assert!(stored.first_prompt_sent);
    }

    #[tokio::test]
    async fn push_to_history_skips_parent_child_forwarding_noise() {
        let manager = empty_manager();
        let forwarded = serde_json::json!({
            "sessionId": "parent",
            "childAgentId": "child-1",
            "update": { "sessionUpdate": "agent_message", "content": { "type": "text", "text": "delegated" } }
        });

        manager.push_to_history("parent", forwarded).await;

        let recorded = manager
            .get_session_history("parent")
            .await
            .unwrap_or_default();
        assert!(recorded.is_empty());
    }

    #[tokio::test]
    async fn emit_session_update_broadcasts_when_channel_exists() {
        let (sender, mut receiver) = tokio::sync::broadcast::channel(8);
        let manager = empty_manager();
        manager
            .notification_channels
            .write()
            .await
            .insert("session-1".to_string(), sender);

        manager
            .emit_session_update("session-1", turn_complete_update())
            .await
            .expect("emit should succeed");

        let event = receiver.recv().await.expect("broadcast event");
        let update = &event["params"]["update"];
        assert_eq!(update["sessionUpdate"].as_str(), Some("turn_complete"));
        assert_eq!(update["stopReason"].as_str(), Some("cancelled"));
    }

    #[tokio::test]
    async fn emit_session_update_persists_history_without_channel() {
        let manager = empty_manager();

        manager
            .emit_session_update("session-1", turn_complete_update())
            .await
            .expect("emit should succeed");

        let recorded = manager
            .get_session_history("session-1")
            .await
            .expect("history should exist");
        assert_eq!(recorded.len(), 1);
        assert_eq!(
            recorded[0]["update"]["sessionUpdate"].as_str(),
            Some("turn_complete")
        );
    }

    #[test]
    fn rewrite_notification_session_id_overrides_provider_session_id() {
        let provider_notification = serde_json::json!({
            "sessionId": "provider-session",
            "update": { "sessionUpdate": "agent_message_chunk", "content": { "text": "hi" } }
        });

        let rewritten =
            AcpManager::rewrite_notification_session_id("child-session", provider_notification);

        assert_eq!(rewritten["sessionId"].as_str(), Some("child-session"));
    }

    #[test]
    fn truncate_content_handles_unicode_boundaries() {
        // (input, max_len, expected)
        let cases = [
            ("你好世界ABC", 5, "你好..."),
            ("你好世界ABC", 3, "你好世"),
            ("短文本", 10, "短文本"),
        ];
        for (input, max_len, expected) in cases {
            assert_eq!(truncate_content(input, max_len), expected);
        }
    }
}