Skip to main content

zeph_acp/agent/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! ACP agent implementation — session management and IDE capability proxying.
5//!
6//! [`ZephAcpAgentState`] manages multiple concurrent ACP sessions. Each session creates
7//! an isolated agent loop via the [`AgentSpawner`] factory, runs it on a
8//! [`LoopbackChannel`], and shuttles messages between the loop and the IDE over the ACP
9//! connection. Use [`run_agent`] to drive the dispatch loop over a given transport.
10//!
11//! IDE capabilities (filesystem, terminal, LSP) are detected during `initialize()` and
12//! surfaced to the agent loop through [`AcpContext`].
13
14use std::path::{Component, PathBuf};
15use std::pin::Pin;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
18
19use parking_lot::{Mutex, RwLock};
20
21use agent_client_protocol as acp;
22use futures::StreamExt as _;
23use tokio::sync::{mpsc, oneshot};
24use tokio_util::sync::CancellationToken;
25use zeph_core::channel::{ChannelMessage, LoopbackChannel, LoopbackHandle};
26use zeph_core::text::truncate_to_chars;
27use zeph_core::{LoopbackEvent, StopHint};
28use zeph_llm::any::AnyProvider;
29use zeph_llm::provider::LlmProvider as _;
30use zeph_mcp::McpManager;
31use zeph_mcp::manager::ServerEntry;
32use zeph_memory::ConversationId;
33use zeph_memory::store::SqliteStore;
34
35use tracing::Instrument as _;
36use zeph_tools::is_private_ip;
37
38use crate::fs::AcpFileExecutor;
39use crate::lsp::DiagnosticsCache;
40use crate::permission::AcpPermissionGate;
41use crate::terminal::AcpShellExecutor;
42use crate::transport::SharedAvailableModels;
43
/// Factory that creates a provider by `{provider}:{model}` key.
///
/// Called when the IDE sends `set_session_config_option` with a new model selection.
/// Returns `None` when the requested key is not recognized.
///
/// The closure is reference-counted and bounded by `Send + Sync`, so one factory
/// can be cloned cheaply and shared between threads.
///
/// # Examples
///
/// ```rust,no_run
/// use std::sync::Arc;
/// use zeph_acp::agent::ProviderFactory;
///
/// let factory: ProviderFactory = Arc::new(|key| {
///     // key format: "openai:gpt-4o" or "ollama:llama3"
///     let _key = key;
///     None // return Some(provider) for known keys
/// });
/// ```
pub type ProviderFactory = Arc<dyn Fn(&str) -> Option<AnyProvider> + Send + Sync>;
62
/// Per-session context passed to the agent spawner.
///
/// Provides the session identity and persistence handles needed to bootstrap
/// an agent loop for an individual ACP session. It is handed by value to the
/// [`AgentSpawner`] closure as its third argument.
///
/// `conversation_id` is `Some` when a SQLite-backed [`ConversationId`] was
/// successfully created or retrieved for this session. `None` means the store
/// was unavailable at session creation time; the agent operates without
/// persistent history in that case.
pub struct SessionContext {
    /// ACP-assigned session identifier.
    pub session_id: acp::schema::SessionId,
    /// `SQLite` conversation ID for persisting message history, if available.
    pub conversation_id: Option<ConversationId>,
    /// Working directory reported by the IDE for this session.
    pub working_dir: PathBuf,
}
80
/// Prompt size ceiling in bytes (1 MiB).
// NOTE(review): the enforcement site is outside this section — confirm where it is applied.
const MAX_PROMPT_BYTES: usize = 1024 * 1024;
/// Ceiling for base64-encoded image payloads in bytes (20 MiB, measured on the encoded form).
const MAX_IMAGE_BASE64_BYTES: usize = 20 * 1024 * 1024;

/// Image MIME types accepted by this module. `image/jpg` is a non-registered
/// spelling kept next to the registered `image/jpeg`.
const SUPPORTED_IMAGE_MIMES: &[&str] =
    &["image/jpeg", "image/jpg", "image/png", "image/gif", "image/webp"];
/// Loopback channel capacity.
// NOTE(review): presumably the bound of the per-session `LoopbackChannel` — confirm at the use site.
const LOOPBACK_CHANNEL_CAPACITY: usize = 64;
/// Maximum bytes accepted from a resource link (1 MiB): larger `file://` targets are
/// rejected, HTTP bodies are cut off at this size.
const MAX_RESOURCE_BYTES: usize = 1024 * 1024;
/// Timeout applied to resource-link I/O: the HTTP request as well as the
/// `file://` metadata lookup and read.
const RESOURCE_FETCH_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);

/// Path component names that expose secrets or kernel internals. A `file://` link is
/// refused when any component of its canonical path equals one of these exactly.
const BLOCKED_PATH_COMPONENTS: &[&str] = &["proc", "sys", "dev", ".ssh", ".gnupg", ".aws"];
99
100/// Resolve a `ResourceLink` URI to its text content.
101///
102/// Supports `file://` and `http(s)://` URIs. Returns an error for unsupported
103/// schemes or security violations (SSRF, path traversal, binary content).
104///
105/// `session_cwd` is used as the allowed root for `file://` URIs. Only paths
106/// that are descendants of `session_cwd` are permitted.
107async fn resolve_resource_link(
108    link: &acp::schema::ResourceLink,
109    session_cwd: &std::path::Path,
110) -> Result<String, crate::error::AcpError> {
111    let uri = &link.uri;
112
113    if let Some(path_str) = uri.strip_prefix("file://") {
114        // Canonicalize to resolve symlinks and `..` — single syscall, no TOCTOU.
115        let path = std::path::Path::new(path_str);
116
117        // Pre-check size to avoid loading large files into memory before rejection.
118        let meta = tokio::time::timeout(RESOURCE_FETCH_TIMEOUT, tokio::fs::metadata(path))
119            .await
120            .map_err(|_| {
121                crate::error::AcpError::ResourceLink(format!("file:// metadata timed out: {uri}"))
122            })?
123            .map_err(|e| {
124                crate::error::AcpError::ResourceLink(format!("file:// stat failed: {e}"))
125            })?;
126
127        if meta.len() > MAX_RESOURCE_BYTES as u64 {
128            return Err(crate::error::AcpError::ResourceLink(format!(
129                "file:// content exceeds size limit ({MAX_RESOURCE_BYTES} bytes): {uri}"
130            )));
131        }
132
133        let canonical = tokio::fs::canonicalize(path).await.map_err(|e| {
134            crate::error::AcpError::ResourceLink(format!("file:// resolution failed: {e}"))
135        })?;
136
137        // Enforce cwd boundary: only files inside the session working directory are allowed.
138        if !canonical.starts_with(session_cwd) {
139            return Err(crate::error::AcpError::ResourceLink(format!(
140                "file:// path outside session working directory: {uri}"
141            )));
142        }
143
144        // Reject pseudo-filesystems and sensitive directories.
145        for component in canonical.components() {
146            if let Component::Normal(name) = component {
147                let name_str = name.to_string_lossy();
148                if BLOCKED_PATH_COMPONENTS
149                    .iter()
150                    .any(|blocked| name_str == *blocked)
151                {
152                    return Err(crate::error::AcpError::ResourceLink(format!(
153                        "file:// path blocked: {uri}"
154                    )));
155                }
156            }
157        }
158
159        let bytes = tokio::time::timeout(RESOURCE_FETCH_TIMEOUT, tokio::fs::read(&canonical))
160            .await
161            .map_err(|_| {
162                crate::error::AcpError::ResourceLink(format!("file:// read timed out: {uri}"))
163            })?
164            .map_err(|e| {
165                crate::error::AcpError::ResourceLink(format!("file:// read failed: {e}"))
166            })?;
167
168        // Reject binary files (null byte check — S-1).
169        if bytes.contains(&0u8) {
170            return Err(crate::error::AcpError::ResourceLink(format!(
171                "binary file not supported as ResourceLink content: {uri}"
172            )));
173        }
174
175        String::from_utf8(bytes).map_err(|_| {
176            crate::error::AcpError::ResourceLink(format!(
177                "file:// content is not valid UTF-8: {uri}"
178            ))
179        })
180    } else if uri.starts_with("http://") || uri.starts_with("https://") {
181        // No-redirect policy prevents redirect-based SSRF bypass.
182        let client = reqwest::Client::builder()
183            .redirect(reqwest::redirect::Policy::none())
184            .timeout(RESOURCE_FETCH_TIMEOUT)
185            .build()
186            .map_err(|e| crate::error::AcpError::ResourceLink(format!("HTTP client error: {e}")))?;
187
188        let resp = client
189            .get(uri.as_str())
190            .header(reqwest::header::ACCEPT, "text/*")
191            .send()
192            .await
193            .map_err(|e| crate::error::AcpError::ResourceLink(format!("HTTP fetch failed: {e}")))?;
194
195        // Post-fetch IP check: eliminates DNS rebinding TOCTOU window (RC-1).
196        // Fail-closed: if remote_addr() is unavailable (e.g. rustls), reject the response.
197        match resp.remote_addr() {
198            None => {
199                return Err(crate::error::AcpError::ResourceLink(format!(
200                    "SSRF check failed: remote address unavailable for {uri}"
201                )));
202            }
203            Some(remote_addr) if is_private_ip(remote_addr.ip()) => {
204                return Err(crate::error::AcpError::ResourceLink(format!(
205                    "SSRF blocked: {uri} resolved to private address {remote_addr}"
206                )));
207            }
208            Some(_) => {}
209        }
210
211        if !resp.status().is_success() {
212            return Err(crate::error::AcpError::ResourceLink(format!(
213                "HTTP fetch returned {}: {uri}",
214                resp.status()
215            )));
216        }
217
218        // Reject non-text content types.
219        let content_type = resp
220            .headers()
221            .get(reqwest::header::CONTENT_TYPE)
222            .and_then(|v| v.to_str().ok())
223            .unwrap_or("");
224        if !content_type.is_empty() && !content_type.starts_with("text/") {
225            return Err(crate::error::AcpError::ResourceLink(format!(
226                "non-text MIME type rejected for ResourceLink: {content_type}"
227            )));
228        }
229
230        // Stream up to MAX_RESOURCE_BYTES to avoid unbounded memory use.
231        let mut body = resp.bytes_stream();
232        let mut buf = Vec::with_capacity(4096);
233        while let Some(chunk) = body.next().await {
234            let chunk = chunk.map_err(|e| {
235                crate::error::AcpError::ResourceLink(format!("HTTP read error: {e}"))
236            })?;
237            if buf.len() + chunk.len() > MAX_RESOURCE_BYTES {
238                buf.extend_from_slice(&chunk[..MAX_RESOURCE_BYTES.saturating_sub(buf.len())]);
239                break;
240            }
241            buf.extend_from_slice(&chunk);
242        }
243
244        String::from_utf8(buf).map_err(|_| {
245            crate::error::AcpError::ResourceLink(format!(
246                "HTTP response body is not valid UTF-8: {uri}"
247            ))
248        })
249    } else {
250        Err(crate::error::AcpError::ResourceLink(format!(
251            "unsupported URI scheme in ResourceLink: {uri}"
252        )))
253    }
254}
255
/// IDE-proxied capabilities passed to the agent loop per session.
///
/// Each field is `None` when the IDE did not advertise the corresponding capability
/// during the ACP `initialize()` handshake. The agent loop should degrade gracefully
/// when optional capabilities are absent.
///
/// Note that `ZephAcpAgentState::build_acp_context` always populates `file_executor`,
/// `shell_executor` and `permission_gate` with `Some`; for the filesystem it conveys the
/// advertised read/write capability through flags passed to the executor's constructor.
pub struct AcpContext {
    /// IDE-proxied filesystem executor (`fs.readTextFile` / `fs.writeTextFile`).
    ///
    /// `None` when the IDE did not advertise filesystem capability.
    pub file_executor: Option<AcpFileExecutor>,
    /// IDE-proxied shell executor (`terminal.create` / `terminal.execute`).
    ///
    /// `None` when the IDE did not advertise terminal capability.
    pub shell_executor: Option<AcpShellExecutor>,
    /// Permission gate for tool-call approval requests sent to the IDE.
    ///
    /// `None` when the IDE did not advertise permission capability.
    pub permission_gate: Option<AcpPermissionGate>,
    /// Shared cancellation signal.
    ///
    /// Notify this to interrupt the currently running agent operation (e.g. on user cancel).
    pub cancel_signal: std::sync::Arc<tokio::sync::Notify>,
    /// Shared slot for runtime model switching via `set_session_config_option`.
    ///
    /// When `Some`, the agent should swap its provider before the next turn.
    pub provider_override: Arc<RwLock<Option<AnyProvider>>>,
    /// Tool call ID of the parent agent's tool call that spawned this subagent session.
    ///
    /// `None` for top-level (non-subagent) sessions; `build_acp_context` always sets `None`.
    pub parent_tool_use_id: Option<String>,
    /// LSP provider when the IDE advertised `meta["lsp"]` capability and LSP support is
    /// enabled in the agent's LSP configuration.
    ///
    /// `None` when the IDE does not support LSP extension methods.
    pub lsp_provider: Option<crate::lsp::AcpLspProvider>,
    /// Shared diagnostics cache — written by the LSP notification handler in `ZephAcpAgent`
    /// and read by the agent loop context builder to inject diagnostics into the system prompt.
    pub diagnostics_cache: Arc<RwLock<DiagnosticsCache>>,
}
294
/// Factory that receives a [`LoopbackChannel`], optional [`AcpContext`], and [`SessionContext`],
/// then drives the agent loop to completion.
///
/// Each invocation creates an independent agent with its own conversation history,
/// enabling true multi-session isolation. The future is `'static` but not `Send`
/// (`Agent<LoopbackChannel>` holds non-`Send` references across `.await`); scheduled
/// via `tokio::task::spawn_local` inside a `LocalSet`. The ACP transport runtime
/// (`serve_stdio`/`serve_connection`) already wraps the dispatcher in a `LocalSet`,
/// so handler code may call `spawn_local` directly without additional setup.
///
/// The closure itself is `Send + Sync + 'static`, so the spawner can be shared across
/// threads even though the futures it returns cannot.
///
/// # Examples
///
/// ```rust,no_run
/// use std::sync::Arc;
/// use zeph_acp::{AgentSpawner, AcpContext, SessionContext};
/// use zeph_core::channel::LoopbackChannel;
///
/// let spawner: AgentSpawner = Arc::new(|channel, ctx, session| {
///     Box::pin(async move {
///         // drive your agent loop here
///         drop((channel, ctx, session));
///     })
/// });
/// ```
pub type AgentSpawner = Arc<
    dyn Fn(
            LoopbackChannel,
            Option<AcpContext>,
            SessionContext,
        ) -> Pin<Box<dyn std::future::Future<Output = ()> + 'static>>
        + Send
        + Sync
        + 'static,
>;
329
/// Thread-safe variant of [`AgentSpawner`] required by the HTTP transport.
///
/// Used with [`AcpHttpState`](crate::transport::http::AcpHttpState) to satisfy
/// `axum::State` requirements (`Send + Sync`). In practice this is the same type
/// alias — the distinction exists to make the intent clear at call sites.
///
/// Only the spawner closure is `Send + Sync`; because this is a plain alias, the
/// future it returns carries no `Send` bound, exactly as with [`AgentSpawner`].
#[cfg(feature = "acp-http")]
pub type SendAgentSpawner = AgentSpawner;
337
/// Sender half for delivering session notifications to the per-session drainer.
///
/// The channel is unbounded. Each item pairs the notification with a `oneshot` sender
/// that serves as an acknowledgement handle: `send_notification` awaits the matching
/// receiver, while `send_notification_nowait` discards it.
pub(crate) type NotifySender =
    mpsc::UnboundedSender<(acp::schema::SessionNotification, oneshot::Sender<()>)>;

/// Receiver half paired with [`NotifySender`].
///
/// Yields the same `(notification, ack sender)` tuples.
// NOTE(review): the drainer presumably fires the ack once the notification has been
// forwarded to the IDE — the drainer is outside this section, confirm there.
pub(crate) type NotifyReceiver =
    mpsc::UnboundedReceiver<(acp::schema::SessionNotification, oneshot::Sender<()>)>;
345
/// Bookkeeping for one live ACP session, stored in the agent's session map.
pub(crate) struct SessionEntry {
    /// Sending half of a bounded `ChannelMessage` channel.
    // NOTE(review): presumably feeds prompts into the session's agent loop — the
    // consumer is outside this section, confirm there.
    pub(crate) input_tx: mpsc::Sender<ChannelMessage>,
    /// Receiver is owned solely by the `prompt()` handler.
    /// `Mutex` instead of `RefCell` so `SessionEntry` is `Send`.
    ///
    /// The idle reaper only evicts entries whose receiver is currently present (`Some`).
    pub(crate) output_rx: Mutex<Option<mpsc::Receiver<LoopbackEvent>>>,
    /// Cancellation signal for the session; the idle reaper notifies it when it
    /// evicts the entry.
    pub(crate) cancel_signal: Arc<tokio::sync::Notify>,
    /// Epoch milliseconds; updated on every prompt.
    ///
    /// Written by `touch()` and compared against the idle timeout by the idle reaper.
    pub(crate) last_active_ms: AtomicU64,
    /// UTC timestamp of the session.
    // NOTE(review): presumably the creation time, judging by the name — confirm at the
    // construction site.
    pub(crate) created_at: chrono::DateTime<chrono::Utc>,
    /// Working directory associated with the session, if one is known.
    // NOTE(review): who sets/updates this is outside this section — confirm.
    pub(crate) working_dir: Mutex<Option<std::path::PathBuf>>,
    /// Channel for sending notifications to the per-session drainer task.
    pub(crate) notify_tx: NotifySender,
    /// Receiver consumed by the drainer task spawned in `new_session` / `load_session`.
    /// Wrapped in `Mutex` so it can be `take()`-n exactly once.
    pub(crate) notify_rx: Mutex<Option<NotifyReceiver>>,
    /// Shared provider override slot; written by `set_session_config_option`, read by agent loop.
    provider_override: Arc<RwLock<Option<AnyProvider>>>,
    /// Currently selected model identifier (display / tracking only).
    current_model: Mutex<String>,
    /// Current session mode (ask / architect / code).
    current_mode: Mutex<acp::schema::SessionModeId>,
    /// Set after the first successful prompt so title generation fires only once.
    first_prompt_done: AtomicBool,
    /// Auto-generated session title; populated after first prompt via `SessionTitle` event.
    title: Mutex<Option<String>>,
    /// Whether extended thinking is enabled for this session.
    thinking_enabled: AtomicBool,
    /// Auto-approve level for this session ("suggest" | "auto-edit" | "full-auto").
    auto_approve_level: Mutex<String>,
    /// Shell executor for this session, retained so the event loop can release terminals
    /// after `tool_call_update` notifications are sent (ACP requires the terminal to
    /// remain alive until after the notification that embeds it).
    pub(crate) shell_executor: Option<AcpShellExecutor>,
    /// Message-id captured at the start of a `do_prompt` turn.
    ///
    /// The existing one-in-flight-prompt invariant (enforced by taking `output_rx` out of
    /// its `Mutex` for the duration of a prompt) guarantees at most one concurrent writer,
    /// so a plain `std::sync::Mutex` is sufficient here; `parking_lot` is not needed.
    ///
    /// Only compiled with the `unstable-message-id` feature.
    #[cfg(feature = "unstable-message-id")]
    pub(crate) current_message_id: std::sync::Mutex<Option<String>>,
}
387
388impl SessionEntry {
389    #[allow(dead_code)]
390    fn last_active(&self) -> std::time::Instant {
391        let ms = self.last_active_ms.load(Ordering::Relaxed);
392        let now_ms = u64::try_from(
393            std::time::SystemTime::now()
394                .duration_since(std::time::UNIX_EPOCH)
395                .unwrap_or_default()
396                .as_millis(),
397        )
398        .unwrap_or(u64::MAX);
399        let elapsed_ms = now_ms.saturating_sub(ms);
400        std::time::Instant::now()
401            .checked_sub(std::time::Duration::from_millis(elapsed_ms))
402            .unwrap_or_else(std::time::Instant::now)
403    }
404
405    fn touch(&self) {
406        let ms = u64::try_from(
407            std::time::SystemTime::now()
408                .duration_since(std::time::UNIX_EPOCH)
409                .unwrap_or_default()
410                .as_millis(),
411        )
412        .unwrap_or(u64::MAX);
413        self.last_active_ms.store(ms, Ordering::Relaxed);
414    }
415}
416
/// Shared, mutex-guarded map from ACP session id to its [`SessionEntry`].
///
/// Reference-counted so the idle reaper task can hold its own handle to the same map.
type SessionMap = Arc<Mutex<std::collections::HashMap<acp::schema::SessionId, SessionEntry>>>;
418
/// ACP agent state shared across all connections.
///
/// Wraps session management, configuration, and per-session tool executors.
/// Pass an `Arc<ZephAcpAgentState>` to [`run_agent`] to drive the dispatch loop.
pub struct ZephAcpAgentState {
    /// Factory used to start the agent loop of a session.
    pub(crate) spawner: AgentSpawner,
    /// Live sessions keyed by session id; also held by the idle reaper task.
    pub(crate) sessions: SessionMap,
    /// Agent name; defaults to `"zeph"` and can be replaced via `with_agent_info`.
    pub(crate) agent_name: String,
    /// Agent version; defaults to this crate's `CARGO_PKG_VERSION`.
    agent_version: String,
    /// Session limit supplied to `new`.
    // NOTE(review): enforcement is outside this section — confirm how the limit is applied.
    max_sessions: usize,
    /// Idle duration after which the reaper evicts a session.
    idle_timeout: std::time::Duration,
    /// Optional `SQLite` store, attached via `with_store`.
    pub(crate) store: Option<SqliteStore>,
    /// Optional path handed to every `AcpPermissionGate` created in `build_acp_context`.
    permission_file: Option<std::path::PathBuf>,
    /// IDE capabilities received during `initialize()`; used by `build_acp_context`.
    pub(crate) client_caps: RwLock<acp::schema::ClientCapabilities>,
    /// Factory for creating a new provider by `{provider}:{model}` key.
    pub(crate) provider_factory: Option<ProviderFactory>,
    /// Available model identifiers advertised in `new_session` `config_options`.
    available_models: SharedAvailableModels,
    /// Shared MCP manager for `ext_method` add/remove/list.
    pub(crate) mcp_manager: Option<Arc<McpManager>>,
    /// Project rule file paths advertised in `new_session` `_meta`.
    project_rules: Vec<std::path::PathBuf>,
    /// Maximum characters for auto-generated session titles.
    title_max_chars: usize,
    /// Maximum number of sessions returned by `list_sessions` (0 = unlimited).
    max_history: usize,
    /// LSP extension configuration (from `[acp.lsp]`).
    pub(crate) lsp_config: zeph_core::config::AcpLspConfig,
    /// Per-agent diagnostics cache, shared between the agent (writer) and `AcpContext` (reader).
    pub(crate) diagnostics_cache: Arc<RwLock<DiagnosticsCache>>,
    /// Cancellation token for the idle reaper task.
    reaper_cancel: CancellationToken,
    /// Canonicalized allowlist of directories ACP clients may reference in session requests.
    // NOTE(review): `with_additional_directories` copies the paths as given; presumably
    // `AdditionalDir` is already canonical when it gets here — confirm.
    additional_directories_allow: Vec<std::path::PathBuf>,
    /// Auth methods to advertise in the `initialize` response. MVP: always `[Agent]`.
    auth_methods_config: Vec<zeph_core::config::AcpAuthMethod>,
    /// When `true`, echo `PromptRequest.message_id` through responses and chunks.
    message_ids_enabled: bool,
}
459
/// Backward-compatible alias for [`ZephAcpAgentState`]; both names refer to the same type.
pub type ZephAcpAgent = ZephAcpAgentState;
462
463impl ZephAcpAgentState {
464    pub fn new(
465        spawner: AgentSpawner,
466        max_sessions: usize,
467        session_idle_timeout_secs: u64,
468        permission_file: Option<std::path::PathBuf>,
469    ) -> Self {
470        let lsp_config = zeph_core::config::AcpLspConfig::default();
471        let max_diag_files = lsp_config.max_diagnostic_files;
472        Self {
473            spawner,
474            sessions: Arc::new(Mutex::new(std::collections::HashMap::new())),
475            agent_name: "zeph".to_owned(),
476            agent_version: env!("CARGO_PKG_VERSION").to_owned(),
477            max_sessions,
478            idle_timeout: std::time::Duration::from_secs(session_idle_timeout_secs),
479            store: None,
480            permission_file,
481            client_caps: RwLock::new(acp::schema::ClientCapabilities::default()),
482            provider_factory: None,
483            available_models: Arc::new(RwLock::new(Vec::new())),
484            mcp_manager: None,
485            project_rules: Vec::new(),
486            title_max_chars: 60,
487            max_history: 100,
488            lsp_config,
489            diagnostics_cache: Arc::new(RwLock::new(DiagnosticsCache::new(max_diag_files))),
490            reaper_cancel: CancellationToken::new(),
491            additional_directories_allow: Vec::new(),
492            auth_methods_config: vec![zeph_core::config::AcpAuthMethod::Agent],
493            message_ids_enabled: true,
494        }
495    }
496
497    /// Configure the additional-directories allowlist policy.
498    #[must_use]
499    pub fn with_additional_directories(
500        mut self,
501        dirs: Vec<zeph_core::config::AdditionalDir>,
502    ) -> Self {
503        self.additional_directories_allow = dirs
504            .into_iter()
505            .map(|d| d.as_path().to_path_buf())
506            .collect();
507        self
508    }
509
510    /// Configure auth methods advertised in `initialize`.
511    #[must_use]
512    pub fn with_auth_methods(mut self, methods: Vec<zeph_core::config::AcpAuthMethod>) -> Self {
513        self.auth_methods_config = methods;
514        self
515    }
516
517    /// Configure message-id echo behaviour.
518    #[must_use]
519    pub fn with_message_ids_enabled(mut self, enabled: bool) -> Self {
520        self.message_ids_enabled = enabled;
521        self
522    }
523
524    /// Configure LSP extension settings.
525    #[must_use]
526    pub fn with_lsp_config(mut self, config: zeph_core::config::AcpLspConfig) -> Self {
527        let max_files = config.max_diagnostic_files;
528        self.lsp_config = config;
529        self.diagnostics_cache = Arc::new(RwLock::new(DiagnosticsCache::new(max_files)));
530        self
531    }
532
533    #[must_use]
534    pub fn with_store(mut self, store: SqliteStore) -> Self {
535        self.store = Some(store);
536        self
537    }
538
539    #[must_use]
540    pub fn with_agent_info(mut self, name: impl Into<String>, version: impl Into<String>) -> Self {
541        self.agent_name = name.into();
542        self.agent_version = version.into();
543        self
544    }
545
546    #[must_use]
547    pub fn with_provider_factory(
548        mut self,
549        factory: ProviderFactory,
550        available_models: SharedAvailableModels,
551    ) -> Self {
552        self.provider_factory = Some(factory);
553        self.available_models = available_models;
554        self
555    }
556
557    fn available_models_snapshot(&self) -> Vec<String> {
558        self.available_models.read().clone()
559    }
560
561    fn initial_model(&self) -> String {
562        self.available_models_snapshot()
563            .into_iter()
564            .next()
565            .unwrap_or_default()
566    }
567
568    #[must_use]
569    pub fn with_mcp_manager(mut self, manager: Arc<McpManager>) -> Self {
570        self.mcp_manager = Some(manager);
571        self
572    }
573
574    #[must_use]
575    pub fn with_project_rules(mut self, rules: Vec<std::path::PathBuf>) -> Self {
576        self.project_rules = rules;
577        self
578    }
579
580    #[must_use]
581    pub fn with_title_max_chars(mut self, max_chars: usize) -> Self {
582        self.title_max_chars = max_chars;
583        self
584    }
585
586    #[must_use]
587    pub fn with_max_history(mut self, max_history: usize) -> Self {
588        self.max_history = max_history;
589        self
590    }
591
592    /// Spawn a background task that periodically evicts idle sessions.
593    ///
594    /// The task runs until the agent's `reaper_cancel` token is cancelled.
595    /// Tracked via a `tokio::spawn` (not `cx.spawn`) because it must survive
596    /// individual connection teardowns in HTTP/WS mode.
597    pub fn start_idle_reaper(&self) {
598        let sessions = Arc::clone(&self.sessions);
599        let idle_timeout = self.idle_timeout;
600        let cancel = self.reaper_cancel.clone();
601        let span = tracing::info_span!("acp.session.reap");
602        tokio::spawn(
603            async move {
604                let mut interval = tokio::time::interval(std::time::Duration::from_mins(1));
605                interval.tick().await; // skip first tick
606                loop {
607                    tokio::select! {
608                        biased;
609                        () = cancel.cancelled() => break,
610                        _ = interval.tick() => {}
611                    }
612                    let now_ms = u64::try_from(
613                        std::time::SystemTime::now()
614                            .duration_since(std::time::UNIX_EPOCH)
615                            .unwrap_or_default()
616                            .as_millis(),
617                    )
618                    .unwrap_or(u64::MAX);
619                    let idle_timeout_ms =
620                        u64::try_from(idle_timeout.as_millis()).unwrap_or(u64::MAX);
621                    let expired: Vec<acp::schema::SessionId> = sessions
622                        .lock()
623                        .iter()
624                        .filter(|(_, e)| {
625                            let idle_ms =
626                                now_ms.saturating_sub(e.last_active_ms.load(Ordering::Relaxed));
627                            e.output_rx.lock().is_some() && idle_ms > idle_timeout_ms
628                        })
629                        .map(|(id, _)| id.clone())
630                        .collect();
631                    for id in expired {
632                        if let Some(entry) = sessions.lock().remove(&id) {
633                            entry.cancel_signal.notify_one();
634                            tracing::debug!(session_id = %id, "evicted idle ACP session (timeout)");
635                        }
636                    }
637                }
638            }
639            .instrument(span),
640        );
641    }
642
    /// Cancel the idle reaper task.
    ///
    /// Cancels the `reaper_cancel` token; the reaper loop watches that token in its
    /// `select!` and exits when it fires. Sessions themselves are left untouched.
    pub fn shutdown(&self) {
        self.reaper_cancel.cancel();
    }
647
648    pub(crate) fn build_acp_context(
649        &self,
650        session_id: &acp::schema::SessionId,
651        cx: &acp::ConnectionTo<acp::Client>,
652        cancel_signal: Arc<tokio::sync::Notify>,
653        provider_override: Arc<RwLock<Option<AnyProvider>>>,
654        cwd: PathBuf,
655    ) -> AcpContext {
656        // Use actual IDE capabilities from initialize(); default to false (deny by default).
657        let (can_read, can_write, ide_supports_lsp) = {
658            let caps = self.client_caps.read();
659            let r = caps.fs.read_text_file;
660            let w = caps.fs.write_text_file;
661            let lsp = self.lsp_config.enabled
662                && caps.meta.as_ref().is_some_and(|m| m.contains_key("lsp"));
663            (r, w, lsp)
664        };
665
666        let conn = Arc::new(cx.clone());
667
668        let (perm_gate, perm_handler) =
669            AcpPermissionGate::new(Arc::clone(&conn), self.permission_file.clone());
670        tokio::spawn(perm_handler);
671
672        let (fs_exec, fs_handler) = AcpFileExecutor::new(
673            Arc::clone(&conn),
674            session_id.clone(),
675            can_read,
676            can_write,
677            cwd,
678            Some(perm_gate.clone()),
679        );
680        tokio::spawn(fs_handler);
681
682        let (shell_exec, shell_handler) = AcpShellExecutor::new(
683            Arc::clone(&conn),
684            session_id.clone(),
685            Some(perm_gate.clone()),
686            120,
687        );
688        tokio::spawn(shell_handler);
689
690        let lsp_provider = if ide_supports_lsp {
691            let (provider, lsp_handler) = crate::lsp::AcpLspProvider::new(
692                Arc::clone(&conn),
693                true,
694                self.lsp_config.request_timeout_secs,
695                self.lsp_config.max_references,
696                self.lsp_config.max_workspace_symbols,
697            );
698            tokio::spawn(lsp_handler);
699            Some(provider)
700        } else {
701            None
702        };
703
704        AcpContext {
705            file_executor: Some(fs_exec),
706            shell_executor: Some(shell_exec),
707            permission_gate: Some(perm_gate),
708            cancel_signal,
709            provider_override,
710            parent_tool_use_id: None,
711            lsp_provider,
712            diagnostics_cache: Arc::clone(&self.diagnostics_cache),
713        }
714    }
715
716    pub(crate) async fn send_notification(
717        &self,
718        session_id: &acp::schema::SessionId,
719        notification: acp::schema::SessionNotification,
720    ) -> acp::Result<()> {
721        let tx = self
722            .sessions
723            .lock()
724            .get(session_id)
725            .map(|e| e.notify_tx.clone());
726        let Some(tx) = tx else {
727            return Err(acp::Error::internal_error().data("session not found"));
728        };
729        let (ack_tx, ack_rx) = oneshot::channel();
730        tx.send((notification, ack_tx))
731            .map_err(|_| acp::Error::internal_error().data("notification channel closed"))?;
732        ack_rx
733            .await
734            .map_err(|_| acp::Error::internal_error().data("notification ack lost"))
735    }
736
737    /// Fire-and-forget notification via the session's notify channel (no ack).
738    pub(crate) fn send_notification_nowait(
739        &self,
740        session_id: &acp::schema::SessionId,
741        notification: acp::schema::SessionNotification,
742    ) {
743        let tx = self
744            .sessions
745            .lock()
746            .get(session_id)
747            .map(|e| e.notify_tx.clone());
748        if let Some(tx) = tx {
749            let (ack_tx, _) = oneshot::channel();
750            tx.send((notification, ack_tx)).ok();
751        }
752    }
753
754    fn handle_lsp_publish_diagnostics(&self, params: &str) {
755        #[derive(serde::Deserialize)]
756        struct PublishDiagnosticsParams {
757            uri: String,
758            #[serde(default)]
759            diagnostics: Vec<crate::lsp::LspDiagnostic>,
760        }
761
762        match serde_json::from_str::<PublishDiagnosticsParams>(params) {
763            Ok(p) => {
764                let max = self.lsp_config.max_diagnostics_per_file;
765                let mut diags = p.diagnostics;
766                diags.truncate(max);
767                tracing::debug!(
768                    uri = %p.uri,
769                    count = diags.len(),
770                    "lsp/publishDiagnostics: cached"
771                );
772                self.diagnostics_cache.write().update(p.uri, diags);
773            }
774            Err(e) => {
775                tracing::warn!(error = %e, "lsp/publishDiagnostics: failed to parse params");
776            }
777        }
778    }
779
780    #[allow(clippy::unused_async)]
781    async fn handle_lsp_did_save(&self, params: &str, cx: &acp::ConnectionTo<acp::Client>) {
782        #[derive(serde::Deserialize)]
783        struct DidSaveParams {
784            uri: String,
785        }
786
787        if !self.lsp_config.auto_diagnostics_on_save {
788            return;
789        }
790
791        let uri = match serde_json::from_str::<DidSaveParams>(params) {
792            Ok(p) => p.uri,
793            Err(e) => {
794                tracing::warn!(error = %e, "lsp/didSave: failed to parse params");
795                return;
796            }
797        };
798
799        let params_json = serde_json::json!({ "uri": &uri });
800        let raw = match serde_json::value::to_raw_value(&params_json) {
801            Ok(r) => r,
802            Err(e) => {
803                tracing::warn!(error = %e, "lsp/didSave: failed to serialize params");
804                return;
805            }
806        };
807        let params_value =
808            serde_json::from_str::<serde_json::Value>(raw.get()).unwrap_or(serde_json::Value::Null);
809        let req = acp::UntypedMessage::new("lsp/diagnostics", params_value).unwrap_or_else(|_| {
810            acp::UntypedMessage {
811                method: "lsp/diagnostics".to_owned(),
812                params: serde_json::Value::Null,
813            }
814        });
815        let timeout = std::time::Duration::from_secs(self.lsp_config.request_timeout_secs);
816        // Outbound round-trip inside a notification handler: must use cx.spawn to avoid blocking dispatch.
817        let diagnostics_cache = Arc::clone(&self.diagnostics_cache);
818        let max = self.lsp_config.max_diagnostics_per_file;
819        let cx_inner = cx.clone();
820        let uri_clone = uri.clone();
821        cx.spawn(async move {
822            match tokio::time::timeout(timeout, cx_inner.send_request(req).block_task()).await {
823                Ok(Ok(resp)) => {
824                    match serde_json::from_value::<Vec<crate::lsp::LspDiagnostic>>(resp) {
825                        Ok(mut diags) => {
826                            diags.truncate(max);
827                            tracing::debug!(
828                                uri = %uri_clone,
829                                count = diags.len(),
830                                "lsp/didSave: fetched diagnostics"
831                            );
832                            diagnostics_cache.write().update(uri_clone, diags);
833                        }
834                        Err(e) => {
835                            tracing::warn!(error = %e, "lsp/didSave: failed to parse diagnostics response");
836                        }
837                    }
838                }
839                Ok(Err(e)) => {
840                    tracing::warn!(error = %e, "lsp/didSave: diagnostics request failed");
841                }
842                Err(_) => {
843                    tracing::warn!(uri = %uri_clone, "lsp/didSave: diagnostics request timed out");
844                }
845            }
846            Ok(())
847        }).ok();
848    }
849}
850
/// Deserialized request parameters carrying a single `id` string.
///
/// NOTE(review): presumably the payload of the MCP server-removal ext method, with `id`
/// naming the server to remove — confirm against the use site.
#[derive(serde::Deserialize)]
struct McpRemoveParams {
    id: String,
}
855
856/// Look up the `ConversationId` for an existing ACP session, creating one for legacy
857/// sessions that predate migration 026 (where `conversation_id` is `NULL`).
858///
859/// Returns `None` when the store is unavailable or all creation attempts fail, allowing
860/// the caller to proceed in ephemeral (no-history) mode rather than failing the session.
861async fn resolve_conversation_id(
862    store: &zeph_memory::store::SqliteStore,
863    session_id: &acp::schema::SessionId,
864) -> Option<ConversationId> {
865    match store
866        .get_acp_session_conversation_id(&session_id.to_string())
867        .await
868    {
869        Ok(Some(cid)) => Some(cid),
870        Ok(None) => {
871            // Legacy session (conversation_id IS NULL): create and persist.
872            match store.create_conversation().await {
873                Ok(cid) => {
874                    if let Err(e) = store
875                        .set_acp_session_conversation_id(&session_id.to_string(), cid)
876                        .await
877                    {
878                        tracing::warn!(error = %e, "failed to set conversation_id for legacy session");
879                    }
880                    Some(cid)
881                }
882                Err(e) => {
883                    tracing::warn!(error = %e, "failed to create conversation for legacy session; session will have no persistent history");
884                    None
885                }
886            }
887        }
888        Err(e) => {
889            tracing::warn!(error = %e, "failed to look up conversation_id; session will have no persistent history");
890            None
891        }
892    }
893}
894
895/// Handler implementations — called from `run_agent` handler closures.
896impl ZephAcpAgentState {
897    #[allow(clippy::unused_async)]
898    #[tracing::instrument(skip_all, name = "acp.handler.initialize")]
899    pub(crate) async fn do_initialize(
900        &self,
901        args: acp::schema::InitializeRequest,
902    ) -> acp::Result<acp::schema::InitializeResponse> {
903        tracing::debug!("ACP initialize");
904        *self.client_caps.write() = args.client_capabilities;
905        let title = format!("{} AI Agent", self.agent_name);
906
907        // stdio transport implies a trusted local client; do not expose internal
908        // configuration details. Provide only a generic authentication hint.
909        let mut meta = serde_json::Map::new();
910        meta.insert(
911            "auth_hint".to_owned(),
912            serde_json::json!("authentication required"),
913        );
914
915        let mut caps = acp::schema::AgentCapabilities::new()
916            .load_session(true)
917            .prompt_capabilities(
918                acp::schema::PromptCapabilities::new()
919                    .image(true)
920                    .embedded_context(true),
921            )
922            .meta({
923                let mut cap_meta = serde_json::Map::new();
924                cap_meta.insert("config_options".to_owned(), serde_json::json!(true));
925                cap_meta.insert("ext_methods".to_owned(), serde_json::json!(true));
926                if self.lsp_config.enabled {
927                    cap_meta.insert(
928                        "lsp".to_owned(),
929                        serde_json::json!({
930                            "methods": crate::lsp::LSP_METHODS,
931                            "notifications": crate::lsp::LSP_NOTIFICATIONS,
932                        }),
933                    );
934                }
935                cap_meta
936            });
937        // Advertise MCP transport capabilities when McpManager is present.
938        // Only StreamableHTTP (http=true) is supported; SSE is deprecated in MCP spec 2025-11-25.
939        if self.mcp_manager.is_some() {
940            caps = caps.mcp_capabilities(acp::schema::McpCapabilities::new().http(true).sse(false));
941        }
942        #[cfg(any(
943            feature = "unstable-session-close",
944            feature = "unstable-session-fork",
945            feature = "unstable-session-resume",
946        ))]
947        let caps = {
948            let mut session_caps = acp::schema::SessionCapabilities::new();
949            session_caps = session_caps.list(acp::schema::SessionListCapabilities::default());
950            #[cfg(feature = "unstable-session-close")]
951            {
952                session_caps = session_caps.close(acp::schema::SessionCloseCapabilities::default());
953            }
954            #[cfg(feature = "unstable-session-fork")]
955            {
956                session_caps = session_caps.fork(acp::schema::SessionForkCapabilities::default());
957            }
958            #[cfg(feature = "unstable-session-resume")]
959            {
960                session_caps =
961                    session_caps.resume(acp::schema::SessionResumeCapabilities::default());
962            }
963            caps.session_capabilities(session_caps)
964        };
965
966        #[cfg(feature = "unstable-logout")]
967        let caps = caps.auth(
968            acp::schema::AgentAuthCapabilities::default()
969                .logout(acp::schema::LogoutCapabilities::default()),
970        );
971
972        let auth_methods: Vec<acp::schema::AuthMethod> = self
973            .auth_methods_config
974            .iter()
975            .map(|m| match m {
976                zeph_core::config::AcpAuthMethod::Agent => acp::schema::AuthMethod::Agent(
977                    acp::schema::AuthMethodAgent::new("zeph", "Zeph"),
978                ),
979            })
980            .collect();
981
982        Ok(
983            acp::schema::InitializeResponse::new(acp::schema::ProtocolVersion::LATEST)
984                .auth_methods(auth_methods)
985                .agent_info(
986                    acp::schema::Implementation::new(&self.agent_name, &self.agent_version)
987                        .title(title),
988                )
989                .agent_capabilities(caps)
990                .meta(meta),
991        )
992    }
993
994    #[tracing::instrument(skip_all, name = "acp.handler.dispatch")]
995    pub(crate) async fn do_ext_method(
996        &self,
997        args: acp::schema::ExtRequest,
998    ) -> acp::Result<acp::schema::ExtResponse> {
999        if let Some(fut) = crate::custom::dispatch(self, &args) {
1000            return fut.await;
1001        }
1002        self.ext_method_mcp(&args).await
1003    }
1004
1005    pub(crate) async fn do_ext_notification(
1006        &self,
1007        args: acp::schema::ExtNotification,
1008        cx: &acp::ConnectionTo<acp::Client>,
1009    ) -> acp::Result<()> {
1010        tracing::debug!(method = %args.method, "received ext_notification");
1011        match args.method.as_ref() {
1012            "lsp/publishDiagnostics" => {
1013                self.handle_lsp_publish_diagnostics(args.params.get());
1014            }
1015            "lsp/didSave" => {
1016                self.handle_lsp_did_save(args.params.get(), cx).await;
1017            }
1018            _ => {}
1019        }
1020        Ok(())
1021    }
1022
    /// Handles the ACP `authenticate` request.
    ///
    /// The request payload is ignored: every call succeeds and returns the default
    /// `AuthenticateResponse`. No credential check is performed in this handler.
    #[allow(clippy::unused_async)]
    #[tracing::instrument(skip_all, name = "acp.handler.authenticate")]
    pub(crate) async fn do_authenticate(
        &self,
        _args: acp::schema::AuthenticateRequest,
    ) -> acp::Result<acp::schema::AuthenticateResponse> {
        Ok(acp::schema::AuthenticateResponse::default())
    }
1031
    /// Handles the ACP `logout` request (only compiled with `unstable-logout`).
    ///
    /// This is a no-op: the request payload is ignored, a debug line is logged, and the
    /// default `LogoutResponse` is returned. No session or credential state is touched.
    #[cfg(feature = "unstable-logout")]
    #[allow(clippy::unused_async, dead_code)]
    #[tracing::instrument(skip_all, name = "acp.handler.logout")]
    pub(crate) async fn do_logout(
        &self,
        _args: acp::schema::LogoutRequest,
    ) -> acp::Result<acp::schema::LogoutResponse> {
        tracing::debug!("ACP logout (no-op: vault-based auth)");
        Ok(acp::schema::LogoutResponse::default())
    }
1042
    /// Creates a new ACP session and starts its agent loop.
    ///
    /// In order, this:
    /// 1. evicts the least recently active idle session when the table already holds
    ///    `max_sessions` entries;
    /// 2. generates a UUID v4 session id, a loopback channel pair and the ACP context;
    /// 3. spawns the per-session notification drainer on `cx`, registers the session
    ///    entry and creates the session's conversation;
    /// 4. spawns the agent loop with `tokio::task::spawn_local`;
    /// 5. builds the response (modes, config options, project-rule file names) and
    ///    queues a commands update for the new session.
    ///
    /// # Errors
    ///
    /// Returns an internal error with `"session limit reached"` when the table is full
    /// and no session is idle, and propagates a failure of `cx.spawn`. With the
    /// `unstable-session-add-dirs` feature, a failed validation of
    /// `additional_directories` is propagated as well.
    ///
    /// # Panics
    ///
    /// Panics if the freshly built session entry has no `notify_rx` to take.
    #[allow(clippy::too_many_lines)]
    #[tracing::instrument(skip_all, name = "acp.handler.new_session")]
    pub(crate) async fn do_new_session(
        &self,
        args: acp::schema::NewSessionRequest,
        cx: &acp::ConnectionTo<acp::Client>,
    ) -> acp::Result<acp::schema::NewSessionResponse> {
        #[cfg(feature = "unstable-session-add-dirs")]
        self.validate_additional_directories(&args.additional_directories)?;
        // LRU eviction: find and remove the oldest idle (non-busy) session when at limit.
        if self.sessions.lock().len() >= self.max_sessions {
            let evict_id = {
                let sessions = self.sessions.lock();
                sessions
                    .iter()
                    // A session is idle while its `output_rx` slot is occupied; `do_prompt`
                    // takes the receiver out of the slot for the duration of a turn.
                    .filter(|(_, e)| e.output_rx.lock().is_some())
                    .min_by_key(|(_, e)| e.last_active_ms.load(Ordering::Relaxed))
                    .map(|(id, _)| id.clone())
            };
            match evict_id {
                Some(id) => {
                    if let Some(entry) = self.sessions.lock().remove(&id) {
                        // Signal cancellation on the evicted session before its entry is dropped.
                        entry.cancel_signal.notify_one();
                        tracing::debug!(session_id = %id, "evicted idle ACP session (LRU)");
                    }
                }
                None => {
                    // Every session is mid-turn: refuse rather than evict a busy one.
                    return Err(acp::Error::internal_error().data("session limit reached"));
                }
            }
        }

        let session_id = acp::schema::SessionId::new(uuid::Uuid::new_v4().to_string());
        tracing::debug!(%session_id, "new ACP session");

        let (channel, handle) = LoopbackChannel::pair(LOOPBACK_CHANNEL_CAPACITY);
        let cancel_signal = Arc::clone(&handle.cancel_signal);
        // The same override slot is shared by the session entry and the ACP context.
        let provider_override: Arc<RwLock<Option<AnyProvider>>> = Arc::new(RwLock::new(None));
        let provider_override_for_ctx = Arc::clone(&provider_override);

        let session_cwd = args.cwd.clone();
        let acp_ctx = self.build_acp_context(
            &session_id,
            cx,
            cancel_signal,
            provider_override_for_ctx,
            session_cwd.clone(),
        );
        let shell_executor = acp_ctx.shell_executor.clone();
        let initial_model = self.initial_model();
        let entry = Self::make_session_entry(
            handle,
            initial_model.clone(),
            session_cwd.clone(),
            shell_executor,
            provider_override,
        );

        // Spawn per-session notification drainer bound to this connection.
        let mut notify_rx = entry
            .notify_rx
            .lock()
            .take()
            .expect("notify_rx consumed once");
        let cx_drain = cx.clone();
        cx.spawn(async move {
            // Forward each queued notification to the client, then acknowledge it to the
            // sender; stop at the first failed send.
            while let Some((notif, ack)) = notify_rx.recv().await {
                let _enter = tracing::info_span!("acp.session.notify").entered();
                if cx_drain.send_notification(notif).is_err() {
                    tracing::warn!("session_notification send failed; drainer exiting");
                    break;
                }
                // The ack receiver may already be gone (fire-and-forget senders); ignore that.
                ack.send(()).ok();
            }
            Ok(())
        })?;

        self.sessions.lock().insert(session_id.clone(), entry);

        let conversation_id = self.create_session_conversation(&session_id).await;

        let session_ctx = SessionContext {
            session_id: session_id.clone(),
            conversation_id,
            working_dir: session_cwd.clone(),
        };

        let spawner = Arc::clone(&self.spawner);
        let span = tracing::info_span!("acp.session.agent_loop", session_id = %session_id);
        // The agent loop runs as a detached local task; its handle is not kept.
        tokio::task::spawn_local(
            async move {
                (spawner)(channel, Some(acp_ctx), session_ctx).await;
            }
            .instrument(span),
        );

        let available_models = self.available_models_snapshot();
        let config_options =
            build_config_options(&available_models, &initial_model, false, "suggest");
        let default_mode_id = acp::schema::SessionModeId::new(DEFAULT_MODE_ID);
        let mut resp = acp::schema::NewSessionResponse::new(session_id.clone())
            .modes(build_mode_state(&default_mode_id));
        if !config_options.is_empty() {
            resp = resp.config_options(config_options);
        }
        if !self.project_rules.is_empty() {
            // Expose only the file names of the project rules, not their full paths.
            let rules: Vec<serde_json::Value> = self
                .project_rules
                .iter()
                .filter_map(|p| p.file_name())
                .map(|n| serde_json::json!({"name": n.to_string_lossy()}))
                .collect();
            let mut meta = serde_json::Map::new();
            meta.insert("projectRules".to_owned(), serde_json::Value::Array(rules));
            resp = resp.meta(meta);
        }

        self.send_commands_update_nowait(&session_id);

        Ok(resp)
    }
1164
1165    #[tracing::instrument(skip_all, name = "acp.handler.prompt", fields(session_id = %args.session_id))]
1166    #[allow(clippy::too_many_lines)]
1167    pub(crate) async fn do_prompt(
1168        &self,
1169        args: acp::schema::PromptRequest,
1170    ) -> acp::Result<acp::schema::PromptResponse> {
1171        tracing::debug!(session_id = %args.session_id, "ACP prompt");
1172
1173        // Capture message_id; written to per-session slot only AFTER output_rx take succeeds
1174        // to prevent stale id from leaking when a prompt is rejected as "already in progress".
1175        #[cfg(feature = "unstable-message-id")]
1176        let turn_message_id: Option<String> = if self.message_ids_enabled {
1177            args.message_id.clone()
1178        } else {
1179            None
1180        };
1181
1182        // Capture session cwd for file:// boundary enforcement.
1183        let session_cwd = self
1184            .sessions
1185            .lock()
1186            .get(&args.session_id)
1187            .and_then(|e| e.working_dir.lock().clone())
1188            .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
1189
1190        let (text, attachments) = self
1191            .collect_prompt_content(&args.prompt, &session_cwd)
1192            .await?;
1193
1194        let trimmed_text = text.trim_start();
1195        if trimmed_text.starts_with('/') {
1196            let is_acp_native = trimmed_text == "/help"
1197                || trimmed_text.starts_with("/help ")
1198                || trimmed_text == "/mode"
1199                || trimmed_text.starts_with("/mode ")
1200                || trimmed_text == "/clear"
1201                || trimmed_text.starts_with("/review")
1202                || trimmed_text == "/model"
1203                || trimmed_text.starts_with("/model ");
1204            if is_acp_native {
1205                return self
1206                    .handle_slash_command(&args.session_id, trimmed_text)
1207                    .await;
1208            }
1209        }
1210
1211        let (input_tx, output_rx) = {
1212            let sessions = self.sessions.lock();
1213            let entry = sessions
1214                .get(&args.session_id)
1215                .ok_or_else(|| acp::Error::internal_error().data("session not found"))?;
1216            let rx =
1217                entry.output_rx.lock().take().ok_or_else(|| {
1218                    acp::Error::internal_error().data("prompt already in progress")
1219                })?;
1220            entry.touch();
1221            // Write message_id here — output_rx take succeeded, prompt will proceed.
1222            #[cfg(feature = "unstable-message-id")]
1223            if let Some(ref mid) = turn_message_id {
1224                *entry
1225                    .current_message_id
1226                    .lock()
1227                    .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(mid.clone());
1228            }
1229            (entry.input_tx.clone(), rx)
1230        };
1231
1232        // Persist user message before sending to agent.
1233        if let Some(ref store) = self.store {
1234            let sid = args.session_id.to_string();
1235            let payload = text.clone();
1236            let store = store.clone();
1237            tokio::spawn(async move {
1238                if let Err(e) = store.save_acp_event(&sid, "user_message", &payload).await {
1239                    tracing::warn!(error = %e, "failed to persist user message");
1240                }
1241            });
1242        }
1243
1244        input_tx
1245            .send(ChannelMessage {
1246                text: text.clone(),
1247                attachments,
1248            })
1249            .await
1250            .map_err(|_| acp::Error::internal_error().data("agent channel closed"))?;
1251
1252        // Grab the cancel_signal so we can detect cancellation during the drain loop.
1253        let cancel_signal = self
1254            .sessions
1255            .lock()
1256            .get(&args.session_id)
1257            .map(|e| Arc::clone(&e.cancel_signal));
1258
1259        // Block until the agent finishes this turn (signals via Flush or channel close).
1260        let (cancelled, stop_hint, rx) = self
1261            .drain_agent_events(&args.session_id, output_rx, cancel_signal)
1262            .await;
1263
1264        // Return the receiver so future prompt() calls on this session can proceed.
1265        if let Some(entry) = self.sessions.lock().get(&args.session_id) {
1266            *entry.output_rx.lock() = Some(rx);
1267        }
1268
1269        let stop_reason = if cancelled {
1270            acp::schema::StopReason::Cancelled
1271        } else {
1272            match stop_hint {
1273                Some(StopHint::MaxTokens) => acp::schema::StopReason::MaxTokens,
1274                Some(StopHint::MaxTurnRequests) => acp::schema::StopReason::MaxTurnRequests,
1275                None => acp::schema::StopReason::EndTurn,
1276            }
1277        };
1278
1279        // Generate session title after first successful agent response (fire-and-forget).
1280        if !cancelled {
1281            self.maybe_generate_session_title(&args.session_id, &text);
1282        }
1283
1284        // Clear per-turn message-id slot now that the turn is complete.
1285        #[cfg(feature = "unstable-message-id")]
1286        if let Some(entry) = self.sessions.lock().get(&args.session_id) {
1287            *entry
1288                .current_message_id
1289                .lock()
1290                .unwrap_or_else(std::sync::PoisonError::into_inner) = None;
1291        }
1292
1293        #[cfg(feature = "unstable-message-id")]
1294        let resp = {
1295            let r = acp::schema::PromptResponse::new(stop_reason);
1296            if let Some(mid) = turn_message_id.as_ref() {
1297                r.user_message_id(mid.clone())
1298            } else {
1299                r
1300            }
1301        };
1302        #[cfg(not(feature = "unstable-message-id"))]
1303        let resp = acp::schema::PromptResponse::new(stop_reason);
1304        Ok(resp)
1305    }
1306
1307    #[allow(clippy::unused_async)]
1308    #[tracing::instrument(skip_all, name = "acp.handler.cancel", fields(session_id = %args.session_id))]
1309    pub(crate) async fn do_cancel(&self, args: acp::schema::CancelNotification) -> acp::Result<()> {
1310        tracing::debug!(session_id = %args.session_id, "ACP cancel");
1311        if let Some(entry) = self.sessions.lock().get(&args.session_id) {
1312            entry.cancel_signal.notify_one();
1313        }
1314        Ok(())
1315    }
1316
1317    #[cfg(feature = "unstable-session-close")]
1318    #[allow(clippy::unused_async, dead_code)]
1319    #[tracing::instrument(skip_all, name = "acp.handler.close_session", fields(session_id = %args.session_id))]
1320    pub(crate) async fn do_close_session(
1321        &self,
1322        args: acp::schema::CloseSessionRequest,
1323    ) -> acp::Result<acp::schema::CloseSessionResponse> {
1324        tracing::debug!(session_id = %args.session_id, "ACP session closed");
1325        if let Some(entry) = self.sessions.lock().remove(&args.session_id) {
1326            entry.cancel_signal.notify_one();
1327        }
1328        Ok(acp::schema::CloseSessionResponse::default())
1329    }
1330
    /// Loads a persisted session back into memory and replays its stored events.
    ///
    /// A session that is already in memory is a no-op returning an empty response.
    /// Otherwise the session must exist in the store: its events are loaded, its
    /// conversation id is resolved, a new session entry, notification drainer and agent
    /// loop are set up, and the stored events are replayed before the response is built.
    ///
    /// NOTE(review): unlike `do_new_session`, this path performs no eviction against
    /// `max_sessions` — confirm whether that is intentional.
    ///
    /// # Errors
    ///
    /// - internal error `"session not found"` when there is no store or the store has
    ///   no such session;
    /// - internal error `"internal error"` when the existence check or the event load
    ///   fails (the underlying error is only logged);
    /// - a failure of `cx.spawn` is propagated. With the `unstable-session-add-dirs`
    ///   feature, a failed validation of `additional_directories` is propagated too.
    ///
    /// # Panics
    ///
    /// Panics if the freshly built session entry has no `notify_rx` to take.
    #[tracing::instrument(skip_all, name = "acp.handler.load_session", fields(session_id = %args.session_id))]
    pub(crate) async fn do_load_session(
        &self,
        args: acp::schema::LoadSessionRequest,
        cx: &acp::ConnectionTo<acp::Client>,
    ) -> acp::Result<acp::schema::LoadSessionResponse> {
        #[cfg(feature = "unstable-session-add-dirs")]
        self.validate_additional_directories(&args.additional_directories)?;
        // Already live in memory: nothing to load or replay.
        if self.sessions.lock().contains_key(&args.session_id) {
            return Ok(acp::schema::LoadSessionResponse::new());
        }

        let Some(ref store) = self.store else {
            return Err(acp::Error::internal_error().data("session not found"));
        };

        // Store failures are logged in detail but reported to the client generically.
        let exists = store
            .acp_session_exists(&args.session_id.to_string())
            .await
            .map_err(|e| {
                tracing::warn!(error = %e, session_id = %args.session_id, "failed to check ACP session existence");
                acp::Error::internal_error().data("internal error")
            })?;

        if !exists {
            return Err(acp::Error::internal_error().data("session not found"));
        }

        let events = store
            .load_acp_events(&args.session_id.to_string())
            .await
            .map_err(|e| {
                tracing::warn!(error = %e, session_id = %args.session_id, "failed to load ACP session events");
                acp::Error::internal_error().data("internal error")
            })?;

        let session_cwd = args.cwd.clone();
        let conversation_id = resolve_conversation_id(store, &args.session_id).await;

        let (channel, handle) = LoopbackChannel::pair(LOOPBACK_CHANNEL_CAPACITY);
        let cancel_signal = Arc::clone(&handle.cancel_signal);
        // The same override slot is shared by the session entry and the ACP context.
        let provider_override: Arc<RwLock<Option<AnyProvider>>> = Arc::new(RwLock::new(None));
        let provider_override_for_ctx = Arc::clone(&provider_override);
        let acp_ctx = self.build_acp_context(
            &args.session_id,
            cx,
            cancel_signal,
            provider_override_for_ctx,
            session_cwd.clone(),
        );
        let shell_executor = acp_ctx.shell_executor.clone();
        let initial_model = self.initial_model();
        let entry = Self::make_session_entry(
            handle,
            initial_model,
            session_cwd.clone(),
            shell_executor,
            provider_override,
        );

        // Spawn per-session notification drainer.
        let mut notify_rx = entry
            .notify_rx
            .lock()
            .take()
            .expect("notify_rx consumed once");
        let cx_drain = cx.clone();
        cx.spawn(async move {
            // Forward each queued notification to the client, then acknowledge it to the
            // sender; stop at the first failed send.
            while let Some((notif, ack)) = notify_rx.recv().await {
                let _enter = tracing::info_span!("acp.session.notify").entered();
                if cx_drain.send_notification(notif).is_err() {
                    tracing::warn!("session_notification send failed; drainer exiting");
                    break;
                }
                // The ack receiver may already be gone (fire-and-forget senders); ignore that.
                ack.send(()).ok();
            }
            Ok(())
        })?;

        self.sessions.lock().insert(args.session_id.clone(), entry);

        let session_ctx = SessionContext {
            session_id: args.session_id.clone(),
            conversation_id,
            working_dir: session_cwd,
        };

        let spawner = Arc::clone(&self.spawner);
        let span = tracing::info_span!("acp.session.agent_loop", session_id = %args.session_id);
        // The agent loop runs as a detached local task; its handle is not kept.
        tokio::task::spawn_local(
            async move {
                (spawner)(channel, Some(acp_ctx), session_ctx).await;
            }
            .instrument(span),
        );

        // Replay happens after the session is registered and its drainer is running.
        self.replay_session_events(&args.session_id, events).await;

        let default_mode_id = acp::schema::SessionModeId::new(DEFAULT_MODE_ID);
        let load_resp =
            acp::schema::LoadSessionResponse::new().modes(build_mode_state(&default_mode_id));

        self.send_commands_update_nowait(&args.session_id);

        Ok(load_resp)
    }
1437
1438    #[tracing::instrument(skip_all, name = "acp.handler.list_sessions")]
1439    pub(crate) async fn do_list_sessions(
1440        &self,
1441        args: acp::schema::ListSessionsRequest,
1442    ) -> acp::Result<acp::schema::ListSessionsResponse> {
1443        let mut result: std::collections::HashMap<String, acp::schema::SessionInfo> = {
1444            let sessions = self.sessions.lock();
1445            sessions
1446                .iter()
1447                .filter_map(|(session_id, entry)| {
1448                    let working_dir = entry.working_dir.lock().clone().unwrap_or_default();
1449                    if let Some(ref filter) = args.cwd
1450                        && &working_dir != filter
1451                    {
1452                        return None;
1453                    }
1454                    let meta = model_meta(&entry.current_model.lock());
1455                    let mut info = acp::schema::SessionInfo::new(session_id.clone(), working_dir)
1456                        .updated_at(entry.created_at.to_rfc3339())
1457                        .meta(meta);
1458                    if let Some(ref t) = *entry.title.lock() {
1459                        info = info.title(t.clone());
1460                    }
1461                    Some((session_id.to_string(), info))
1462                })
1463                .collect()
1464        };
1465
1466        if let Some(ref store) = self.store {
1467            match store.list_acp_sessions(self.max_history).await {
1468                Ok(persisted) => {
1469                    for persisted_info in persisted {
1470                        let sid = acp::schema::SessionId::new(&*persisted_info.id);
1471                        if result.contains_key(&persisted_info.id) {
1472                            continue;
1473                        }
1474                        let info = acp::schema::SessionInfo::new(sid, std::path::PathBuf::new())
1475                            .title(persisted_info.title)
1476                            .updated_at(persisted_info.updated_at);
1477                        result.insert(persisted_info.id, info);
1478                    }
1479                }
1480                Err(e) => {
1481                    tracing::warn!(error = %e, "failed to list persisted ACP sessions");
1482                }
1483            }
1484        }
1485
1486        let mut sessions_vec: Vec<acp::schema::SessionInfo> = result.into_values().collect();
1487        sessions_vec.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
1488
1489        Ok(acp::schema::ListSessionsResponse::new(sessions_vec))
1490    }
1491
1492    #[cfg(feature = "unstable-session-fork")]
1493    #[allow(dead_code, clippy::too_many_lines)]
1494    #[tracing::instrument(skip_all, name = "acp.handler.fork_session")]
1495    pub(crate) async fn do_fork_session(
1496        &self,
1497        args: acp::schema::ForkSessionRequest,
1498        cx: &acp::ConnectionTo<acp::Client>,
1499    ) -> acp::Result<acp::schema::ForkSessionResponse> {
1500        #[cfg(feature = "unstable-session-add-dirs")]
1501        self.validate_additional_directories(&args.additional_directories)?;
1502        let in_memory = self.sessions.lock().contains_key(&args.session_id);
1503
1504        if !in_memory {
1505            match self.store.as_ref() {
1506                None => return Err(acp::Error::internal_error().data("session not found")),
1507                Some(s) => {
1508                    let exists = s
1509                        .acp_session_exists(&args.session_id.to_string())
1510                        .await
1511                        .map_err(|e| {
1512                            tracing::warn!(error = %e, "failed to check ACP session existence");
1513                            acp::Error::internal_error().data("internal error")
1514                        })?;
1515                    if !exists {
1516                        return Err(acp::Error::internal_error().data("session not found"));
1517                    }
1518                }
1519            }
1520        }
1521
1522        if self.sessions.lock().len() >= self.max_sessions {
1523            let evict_id = {
1524                let sessions = self.sessions.lock();
1525                sessions
1526                    .iter()
1527                    .filter(|(_, e)| e.output_rx.lock().is_some())
1528                    .min_by_key(|(_, e)| e.last_active_ms.load(Ordering::Relaxed))
1529                    .map(|(id, _)| id.clone())
1530            };
1531            match evict_id {
1532                Some(id) => {
1533                    if let Some(entry) = self.sessions.lock().remove(&id) {
1534                        entry.cancel_signal.notify_one();
1535                        tracing::debug!(session_id = %id, "evicted idle ACP session (LRU)");
1536                    }
1537                }
1538                None => {
1539                    return Err(acp::Error::internal_error().data("session limit reached"));
1540                }
1541            }
1542        }
1543
1544        let new_id = acp::schema::SessionId::new(uuid::Uuid::new_v4().to_string());
1545        tracing::debug!(source = %args.session_id, new = %new_id, "forking ACP session");
1546
1547        let new_conversation_id = self.fork_conversation(&args.session_id, &new_id).await?;
1548
1549        let (channel, handle) = LoopbackChannel::pair(LOOPBACK_CHANNEL_CAPACITY);
1550        let cancel_signal = Arc::clone(&handle.cancel_signal);
1551        let provider_override: Arc<RwLock<Option<AnyProvider>>> = Arc::new(RwLock::new(None));
1552        let provider_override_for_ctx = Arc::clone(&provider_override);
1553        let acp_ctx = self.build_acp_context(
1554            &new_id,
1555            cx,
1556            cancel_signal,
1557            provider_override_for_ctx,
1558            args.cwd.clone(),
1559        );
1560        let shell_executor = acp_ctx.shell_executor.clone();
1561        let initial_model = self.initial_model();
1562        let entry = Self::make_session_entry(
1563            handle,
1564            initial_model.clone(),
1565            args.cwd.clone(),
1566            shell_executor,
1567            provider_override,
1568        );
1569
1570        let mut notify_rx = entry
1571            .notify_rx
1572            .lock()
1573            .take()
1574            .expect("notify_rx consumed once");
1575        let cx_drain = cx.clone();
1576        cx.spawn(async move {
1577            while let Some((notif, ack)) = notify_rx.recv().await {
1578                if cx_drain.send_notification(notif).is_err() {
1579                    break;
1580                }
1581                ack.send(()).ok();
1582            }
1583            Ok(())
1584        })?;
1585
1586        self.sessions.lock().insert(new_id.clone(), entry);
1587
1588        let session_ctx = SessionContext {
1589            session_id: new_id.clone(),
1590            conversation_id: new_conversation_id,
1591            working_dir: args.cwd.clone(),
1592        };
1593
1594        let spawner = Arc::clone(&self.spawner);
1595        let span = tracing::info_span!("acp.session.agent_loop", session_id = %new_id);
1596        tokio::task::spawn_local(
1597            async move {
1598                (spawner)(channel, Some(acp_ctx), session_ctx).await;
1599            }
1600            .instrument(span),
1601        );
1602
1603        let available_models = self.available_models_snapshot();
1604        let config_options =
1605            build_config_options(&available_models, &initial_model, false, "suggest");
1606        let default_mode_id = acp::schema::SessionModeId::new(DEFAULT_MODE_ID);
1607        let mut resp =
1608            acp::schema::ForkSessionResponse::new(new_id).modes(build_mode_state(&default_mode_id));
1609        if !config_options.is_empty() {
1610            resp = resp.config_options(config_options);
1611        }
1612        Ok(resp)
1613    }
1614
1615    #[cfg(feature = "unstable-session-resume")]
1616    #[allow(dead_code)]
1617    #[tracing::instrument(skip_all, name = "acp.handler.resume_session")]
1618    pub(crate) async fn do_resume_session(
1619        &self,
1620        args: acp::schema::ResumeSessionRequest,
1621        cx: &acp::ConnectionTo<acp::Client>,
1622    ) -> acp::Result<acp::schema::ResumeSessionResponse> {
1623        #[cfg(feature = "unstable-session-add-dirs")]
1624        self.validate_additional_directories(&args.additional_directories)?;
1625        if self.sessions.lock().contains_key(&args.session_id) {
1626            return Ok(acp::schema::ResumeSessionResponse::new());
1627        }
1628
1629        let Some(ref store) = self.store else {
1630            return Err(acp::Error::internal_error().data("session not found"));
1631        };
1632
1633        let exists = store
1634            .acp_session_exists(&args.session_id.to_string())
1635            .await
1636            .map_err(|e| {
1637                tracing::warn!(error = %e, session_id = %args.session_id, "failed to check ACP session existence");
1638                acp::Error::internal_error().data("internal error")
1639            })?;
1640
1641        if !exists {
1642            return Err(acp::Error::internal_error().data("session not found"));
1643        }
1644
1645        if self.sessions.lock().len() >= self.max_sessions {
1646            let evict_id = {
1647                let sessions = self.sessions.lock();
1648                sessions
1649                    .iter()
1650                    .filter(|(id, e)| *id != &args.session_id && e.output_rx.lock().is_some())
1651                    .min_by_key(|(_, e)| e.last_active_ms.load(Ordering::Relaxed))
1652                    .map(|(id, _)| id.clone())
1653            };
1654            match evict_id {
1655                Some(id) => {
1656                    if let Some(entry) = self.sessions.lock().remove(&id) {
1657                        entry.cancel_signal.notify_one();
1658                        tracing::debug!(session_id = %id, "evicted idle ACP session (LRU)");
1659                    }
1660                }
1661                None => {
1662                    return Err(acp::Error::internal_error().data("session limit reached"));
1663                }
1664            }
1665        }
1666
1667        let conversation_id = resolve_conversation_id(store, &args.session_id).await;
1668
1669        let (channel, handle) = LoopbackChannel::pair(LOOPBACK_CHANNEL_CAPACITY);
1670        let cancel_signal = Arc::clone(&handle.cancel_signal);
1671        let provider_override: Arc<RwLock<Option<AnyProvider>>> = Arc::new(RwLock::new(None));
1672        let provider_override_for_ctx = Arc::clone(&provider_override);
1673        let acp_ctx = self.build_acp_context(
1674            &args.session_id,
1675            cx,
1676            cancel_signal,
1677            provider_override_for_ctx,
1678            args.cwd.clone(),
1679        );
1680        let shell_executor = acp_ctx.shell_executor.clone();
1681        let initial_model = self.initial_model();
1682        let entry = Self::make_session_entry(
1683            handle,
1684            initial_model,
1685            args.cwd.clone(),
1686            shell_executor,
1687            provider_override,
1688        );
1689
1690        let mut notify_rx = entry
1691            .notify_rx
1692            .lock()
1693            .take()
1694            .expect("notify_rx consumed once");
1695        let cx_drain = cx.clone();
1696        cx.spawn(async move {
1697            while let Some((notif, ack)) = notify_rx.recv().await {
1698                if cx_drain.send_notification(notif).is_err() {
1699                    break;
1700                }
1701                ack.send(()).ok();
1702            }
1703            Ok(())
1704        })?;
1705
1706        self.sessions.lock().insert(args.session_id.clone(), entry);
1707
1708        let session_ctx = SessionContext {
1709            session_id: args.session_id.clone(),
1710            conversation_id,
1711            working_dir: args.cwd,
1712        };
1713
1714        let spawner = Arc::clone(&self.spawner);
1715        let span = tracing::info_span!("acp.session.agent_loop", session_id = %args.session_id);
1716        tokio::task::spawn_local(
1717            async move {
1718                (spawner)(channel, Some(acp_ctx), session_ctx).await;
1719            }
1720            .instrument(span),
1721        );
1722
1723        Ok(acp::schema::ResumeSessionResponse::new())
1724    }
1725
1726    #[allow(clippy::unused_async)]
1727    #[tracing::instrument(skip_all, name = "acp.handler.set_session_config_option")]
1728    pub(crate) async fn do_set_session_config_option(
1729        &self,
1730        args: acp::schema::SetSessionConfigOptionRequest,
1731    ) -> acp::Result<acp::schema::SetSessionConfigOptionResponse> {
1732        let config_id = args.config_id.0.clone();
1733        #[cfg(not(feature = "unstable-boolean-config"))]
1734        let value_str: std::sync::Arc<str> = args.value.0.clone();
1735        #[cfg(feature = "unstable-boolean-config")]
1736        let value_str: std::sync::Arc<str> = match &args.value {
1737            acp::schema::SessionConfigOptionValue::ValueId { value } => value.0.clone(),
1738            acp::schema::SessionConfigOptionValue::Boolean { value } => {
1739                if *value { "true" } else { "false" }.into()
1740            }
1741            _ => "".into(),
1742        };
1743        let value: &str = &value_str;
1744
1745        let (current_model, thinking, auto_approve) = {
1746            let sessions = self.sessions.lock();
1747            let entry = sessions
1748                .get(&args.session_id)
1749                .ok_or_else(|| acp::Error::invalid_request().data("session not found"))?;
1750
1751            self.apply_session_config(entry, config_id.as_ref(), value, &args.session_id)?;
1752
1753            (
1754                entry.current_model.lock().clone(),
1755                entry.thinking_enabled.load(Ordering::Relaxed),
1756                entry.auto_approve_level.lock().clone(),
1757            )
1758        };
1759
1760        let config_options = build_config_options(
1761            &self.available_models_snapshot(),
1762            &current_model,
1763            thinking,
1764            &auto_approve,
1765        );
1766
1767        let changed_option = config_options.iter().find(|o| o.id.0 == config_id).cloned();
1768
1769        if let Some(option) = changed_option {
1770            let update = acp::schema::SessionUpdate::ConfigOptionUpdate(
1771                acp::schema::ConfigOptionUpdate::new(vec![option]),
1772            );
1773            self.send_notification_nowait(
1774                &args.session_id,
1775                acp::schema::SessionNotification::new(args.session_id.clone(), update),
1776            );
1777
1778            if config_id.as_ref() == "model" {
1779                let info_update = acp::schema::SessionUpdate::SessionInfoUpdate(
1780                    acp::schema::SessionInfoUpdate::new().meta(model_meta(&current_model)),
1781                );
1782                self.send_notification_nowait(
1783                    &args.session_id,
1784                    acp::schema::SessionNotification::new(args.session_id.clone(), info_update),
1785                );
1786            }
1787        }
1788
1789        Ok(acp::schema::SetSessionConfigOptionResponse::new(
1790            config_options,
1791        ))
1792    }
1793
1794    #[tracing::instrument(skip_all, name = "acp.handler.set_session_mode")]
1795    pub(crate) async fn do_set_session_mode(
1796        &self,
1797        args: acp::schema::SetSessionModeRequest,
1798    ) -> acp::Result<acp::schema::SetSessionModeResponse> {
1799        let valid_ids: &[&str] = &["code", "architect", "ask"];
1800        let mode_str = args.mode_id.0.as_ref();
1801        if !valid_ids.contains(&mode_str) {
1802            return Err(acp::Error::invalid_request().data(format!("unknown mode: {mode_str}")));
1803        }
1804
1805        {
1806            let sessions = self.sessions.lock();
1807            let entry = sessions
1808                .get(&args.session_id)
1809                .ok_or_else(|| acp::Error::invalid_request().data("session not found"))?;
1810            *entry.current_mode.lock() = args.mode_id.clone();
1811        }
1812
1813        tracing::debug!(session_id = %args.session_id, mode = %mode_str, "ACP session mode switched");
1814
1815        let update = acp::schema::SessionUpdate::CurrentModeUpdate(
1816            acp::schema::CurrentModeUpdate::new(args.mode_id.clone()),
1817        );
1818        let notification = acp::schema::SessionNotification::new(args.session_id.clone(), update);
1819        if let Err(e) = self.send_notification(&args.session_id, notification).await {
1820            tracing::warn!(error = %e, "failed to send current_mode_update");
1821        }
1822
1823        Ok(acp::schema::SetSessionModeResponse::new())
1824    }
1825
1826    /// Validate `requested` paths against the configured allowlist.
1827    ///
1828    /// Each requested path is canonicalized and checked with `Path::starts_with` (component-aware)
1829    /// against every entry in `self.additional_directories_allow`. Returns an `invalid_params`
1830    /// error if any path is not covered by the allowlist.
1831    #[cfg(feature = "unstable-session-add-dirs")]
1832    fn validate_additional_directories(
1833        &self,
1834        requested: &[std::path::PathBuf],
1835    ) -> acp::Result<Vec<std::path::PathBuf>> {
1836        if requested.is_empty() {
1837            return Ok(Vec::new());
1838        }
1839        if self.additional_directories_allow.is_empty() {
1840            return Err(acp::Error::invalid_params()
1841                .data("additional_directories not permitted: allowlist is empty"));
1842        }
1843        let mut out = Vec::with_capacity(requested.len());
1844        for p in requested {
1845            let canon = std::fs::canonicalize(p).map_err(|e| {
1846                acp::Error::invalid_params()
1847                    .data(format!("cannot canonicalize {}: {e}", p.display()))
1848            })?;
1849            let allowed = self
1850                .additional_directories_allow
1851                .iter()
1852                .any(|allow| canon.starts_with(allow));
1853            if !allowed {
1854                return Err(acp::Error::invalid_params().data(format!(
1855                    "{} is not in the additional_directories allowlist",
1856                    canon.display()
1857                )));
1858            }
1859            out.push(canon);
1860        }
1861        Ok(out)
1862    }
1863
1864    #[cfg(feature = "unstable-session-model")]
1865    #[allow(clippy::unused_async, dead_code)]
1866    #[tracing::instrument(skip_all, name = "acp.handler.set_session_model")]
1867    pub(crate) async fn do_set_session_model(
1868        &self,
1869        args: acp::schema::SetSessionModelRequest,
1870    ) -> acp::Result<acp::schema::SetSessionModelResponse> {
1871        let model_id: &str = &args.model_id.0;
1872
1873        let Some(ref factory) = self.provider_factory else {
1874            return Err(acp::Error::internal_error().data("model switching not configured"));
1875        };
1876
1877        if !self
1878            .available_models_snapshot()
1879            .iter()
1880            .any(|m| m == model_id)
1881        {
1882            return Err(acp::Error::invalid_request().data("model not in allowed list"));
1883        }
1884
1885        let Some(new_provider) = factory(model_id) else {
1886            return Err(acp::Error::invalid_request().data("unknown model"));
1887        };
1888
1889        {
1890            let sessions = self.sessions.lock();
1891            let entry = sessions
1892                .get(&args.session_id)
1893                .ok_or_else(|| acp::Error::internal_error().data("session not found"))?;
1894            *entry.provider_override.write() = Some(new_provider);
1895            model_id.clone_into(&mut *entry.current_model.lock());
1896        }
1897
1898        tracing::debug!(session_id = %args.session_id, model = %model_id, "ACP session model switched via set_session_model");
1899
1900        let info_update = acp::schema::SessionUpdate::SessionInfoUpdate(
1901            acp::schema::SessionInfoUpdate::new().meta(model_meta(model_id)),
1902        );
1903        self.send_notification_nowait(
1904            &args.session_id,
1905            acp::schema::SessionNotification::new(args.session_id.clone(), info_update),
1906        );
1907
1908        Ok(acp::schema::SetSessionModelResponse::new())
1909    }
1910}
1911
1912impl ZephAcpAgentState {
1913    fn apply_session_config(
1914        &self,
1915        entry: &SessionEntry,
1916        config_id: &str,
1917        value: &str,
1918        session_id: &acp::schema::SessionId,
1919    ) -> acp::Result<()> {
1920        match config_id {
1921            "model" => {
1922                let Some(ref factory) = self.provider_factory else {
1923                    return Err(acp::Error::internal_error().data("model switching not configured"));
1924                };
1925                let available_models = self.available_models_snapshot();
1926                if !available_models.iter().any(|m| m == value) {
1927                    return Err(acp::Error::invalid_request().data("model not in allowed list"));
1928                }
1929                let Some(new_provider) = factory(value) else {
1930                    return Err(acp::Error::invalid_request().data("unknown model"));
1931                };
1932                *entry.provider_override.write() = Some(new_provider);
1933                value.clone_into(&mut *entry.current_model.lock());
1934                tracing::debug!(session_id = %session_id, model = %value, "ACP model switched");
1935            }
1936            "thinking" => {
1937                let enabled = match value {
1938                    "on" => true,
1939                    "off" => false,
1940                    _ => {
1941                        return Err(
1942                            acp::Error::invalid_request().data("thinking value must be on or off")
1943                        );
1944                    }
1945                };
1946                entry.thinking_enabled.store(enabled, Ordering::Relaxed);
1947                tracing::debug!(session_id = %session_id, thinking = %enabled, "ACP thinking toggled");
1948            }
1949            "auto_approve" => {
1950                if !["suggest", "auto-edit", "full-auto"].contains(&value) {
1951                    return Err(acp::Error::invalid_request()
1952                        .data("auto_approve must be suggest, auto-edit, or full-auto"));
1953                }
1954                value.clone_into(&mut *entry.auto_approve_level.lock());
1955                tracing::debug!(session_id = %session_id, auto_approve = %value, "ACP auto-approve level changed");
1956            }
1957            _ => {
1958                return Err(acp::Error::invalid_request().data("unknown config_id"));
1959            }
1960        }
1961        Ok(())
1962    }
1963
1964    /// Dispatch a slash command, returning a short-circuit `PromptResponse`.
1965    async fn handle_slash_command(
1966        &self,
1967        session_id: &acp::schema::SessionId,
1968        text: &str,
1969    ) -> acp::Result<acp::schema::PromptResponse> {
1970        let mut parts = text.splitn(2, ' ');
1971        let cmd = parts.next().unwrap_or("").trim();
1972        let arg = parts.next().unwrap_or("").trim();
1973
1974        let reply = match cmd {
1975            "/help" => "Available commands:\n\
1976                 /help — show this message\n\
1977                 /model <id> — switch the active model\n\
1978                 /mode <code|architect|ask> — switch session mode\n\
1979                 /clear — clear session history\n\
1980                 /compact — summarize and compact context\n\
1981                 /review [path] — review recent changes (read-only)"
1982                .to_owned(),
1983            "/model" => self.handle_model_command(session_id, arg)?,
1984            "/review" => {
1985                return self.handle_review_command(session_id, arg);
1986            }
1987            "/mode" => {
1988                let valid_ids: &[&str] = &["code", "architect", "ask"];
1989                if !valid_ids.contains(&arg) {
1990                    return Err(acp::Error::invalid_request().data(format!("unknown mode: {arg}")));
1991                }
1992                {
1993                    let sessions = self.sessions.lock();
1994                    let entry = sessions
1995                        .get(session_id)
1996                        .ok_or_else(|| acp::Error::invalid_request().data("session not found"))?;
1997                    *entry.current_mode.lock() = acp::schema::SessionModeId::new(arg);
1998                }
1999                let update = acp::schema::SessionUpdate::CurrentModeUpdate(
2000                    acp::schema::CurrentModeUpdate::new(acp::schema::SessionModeId::new(arg)),
2001                );
2002                let notification =
2003                    acp::schema::SessionNotification::new(session_id.clone(), update);
2004                if let Err(e) = self.send_notification(session_id, notification).await {
2005                    tracing::warn!(error = %e, "failed to send current_mode_update from /mode");
2006                }
2007                format!("Switched to mode: {arg}")
2008            }
2009            "/clear" => {
2010                if let Some(ref store) = self.store {
2011                    let sid = session_id.to_string();
2012                    let store = store.clone();
2013                    tokio::spawn(async move {
2014                        if let Err(e) = store.delete_acp_session(&sid).await {
2015                            tracing::warn!(error = %e, "failed to clear session history");
2016                        }
2017                        if let Err(e) = store.create_acp_session(&sid).await {
2018                            tracing::warn!(error = %e, "failed to recreate session after clear");
2019                        }
2020                    });
2021                }
2022                // Send sentinel to clear in-memory agent context.
2023                let tx = self
2024                    .sessions
2025                    .lock()
2026                    .get(session_id)
2027                    .map(|e| e.input_tx.clone());
2028                if let Some(tx) = tx {
2029                    let _ = tx.try_send(ChannelMessage {
2030                        text: "/clear".to_owned(),
2031                        attachments: vec![],
2032                    });
2033                }
2034                "Session history cleared.".to_owned()
2035            }
2036            _ => {
2037                return Err(acp::Error::invalid_request().data(format!("unknown command: {cmd}")));
2038            }
2039        };
2040
2041        let update = acp::schema::SessionUpdate::AgentMessageChunk(acp::schema::ContentChunk::new(
2042            reply.clone().into(),
2043        ));
2044        let notification = acp::schema::SessionNotification::new(session_id.clone(), update);
2045        if let Err(e) = self.send_notification(session_id, notification).await {
2046            tracing::warn!(error = %e, "failed to send command reply");
2047        }
2048
2049        Ok(acp::schema::PromptResponse::new(
2050            acp::schema::StopReason::EndTurn,
2051        ))
2052    }
2053
2054    fn handle_review_command(
2055        &self,
2056        session_id: &acp::schema::SessionId,
2057        arg: &str,
2058    ) -> acp::Result<acp::schema::PromptResponse> {
2059        // Validate arg to prevent prompt injection: allow only safe path characters.
2060        if !arg.is_empty() {
2061            let valid = arg
2062                .chars()
2063                .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '.' | '/' | ' ' | '-'));
2064            if !valid || arg.len() > 512 {
2065                return Err(acp::Error::invalid_request()
2066                    .data("invalid path argument: only alphanumeric, _, ., /, space, - allowed (max 512 chars)"));
2067            }
2068        }
2069        let review_prompt = if arg.is_empty() {
2070            "Review the recent changes in this workspace. Show a plain-text diff summary. \
2071             Use only read_file and list_directory tools. Do not execute any commands or \
2072             write any files."
2073                .to_owned()
2074        } else {
2075            format!(
2076                "Review the following file or path: {arg}. Show a plain-text diff summary. \
2077                 Use only read_file and list_directory tools. Do not execute any commands or \
2078                 write any files."
2079            )
2080        };
2081
2082        let tx = self
2083            .sessions
2084            .lock()
2085            .get(session_id)
2086            .map(|e| e.input_tx.clone());
2087        let Some(tx) = tx else {
2088            return Err(acp::Error::invalid_request().data("session not found"));
2089        };
2090        if tx
2091            .try_send(ChannelMessage {
2092                text: review_prompt,
2093                attachments: vec![],
2094            })
2095            .is_err()
2096        {
2097            tracing::warn!(%session_id, "failed to forward /review to agent input");
2098        }
2099
2100        Ok(acp::schema::PromptResponse::new(
2101            acp::schema::StopReason::EndTurn,
2102        ))
2103    }
2104
2105    fn resolve_model_fuzzy(&self, query: &str) -> acp::Result<String> {
2106        let available_models = self.available_models_snapshot();
2107        if available_models.iter().any(|m| m == query) {
2108            return Ok(query.to_owned());
2109        }
2110        let tokens: Vec<String> = query
2111            .to_lowercase()
2112            .split_whitespace()
2113            .map(String::from)
2114            .collect();
2115        let candidates: Vec<&String> = available_models
2116            .iter()
2117            .filter(|m| {
2118                let lower = m.to_lowercase();
2119                tokens.iter().all(|t| lower.contains(t.as_str()))
2120            })
2121            .collect();
2122        match candidates.len() {
2123            0 => {
2124                let models = available_models.join(", ");
2125                Err(acp::Error::invalid_request()
2126                    .data(format!("no matching model found. Available: {models}")))
2127            }
2128            1 => Ok(candidates[0].clone()),
2129            _ => {
2130                let names: Vec<&str> = candidates.iter().map(|s| s.as_str()).collect();
2131                Err(acp::Error::invalid_request()
2132                    .data(format!("ambiguous model, candidates: {}", names.join(", "))))
2133            }
2134        }
2135    }
2136
2137    fn handle_model_command(
2138        &self,
2139        session_id: &acp::schema::SessionId,
2140        arg: &str,
2141    ) -> acp::Result<String> {
2142        let available_models = self.available_models_snapshot();
2143        if arg.is_empty() {
2144            let models = available_models.join(", ");
2145            return Ok(format!("Available models: {models}"));
2146        }
2147        let Some(ref factory) = self.provider_factory else {
2148            return Err(acp::Error::internal_error().data("model switching not configured"));
2149        };
2150        let resolved = self.resolve_model_fuzzy(arg)?;
2151        let Some(new_provider) = factory(&resolved) else {
2152            return Err(acp::Error::invalid_request().data("unknown model"));
2153        };
2154        let sessions = self.sessions.lock();
2155        let entry = sessions
2156            .get(session_id)
2157            .ok_or_else(|| acp::Error::internal_error().data("session not found"))?;
2158        *entry.provider_override.write() = Some(new_provider);
2159        resolved.clone_into(&mut *entry.current_model.lock());
2160        Ok(format!("Switched to model: {resolved}"))
2161    }
2162
2163    /// Collect text and attachments from ACP content blocks.
2164    ///
2165    /// Resolves `ResourceLink` URIs, decodes images, and formats embedded resources.
2166    /// Returns an error if the resulting text exceeds `MAX_PROMPT_BYTES`.
2167    async fn collect_prompt_content(
2168        &self,
2169        blocks: &[acp::schema::ContentBlock],
2170        session_cwd: &std::path::Path,
2171    ) -> acp::Result<(String, Vec<zeph_core::channel::Attachment>)> {
2172        let mut text = String::new();
2173        let mut attachments = Vec::new();
2174        for block in blocks {
2175            match block {
2176                acp::schema::ContentBlock::Text(t) => {
2177                    if !text.is_empty() {
2178                        text.push('\n');
2179                    }
2180                    text.push_str(&t.text);
2181                }
2182                acp::schema::ContentBlock::Image(img) => {
2183                    if !SUPPORTED_IMAGE_MIMES.contains(&img.mime_type.as_str()) {
2184                        tracing::debug!(mime_type = %img.mime_type, "unsupported image MIME type in ACP prompt, skipping");
2185                    } else if img.data.len() > MAX_IMAGE_BASE64_BYTES {
2186                        tracing::warn!(
2187                            size = img.data.len(),
2188                            max = MAX_IMAGE_BASE64_BYTES,
2189                            "image base64 data exceeds size limit, skipping"
2190                        );
2191                    } else {
2192                        use base64::Engine as _;
2193                        match base64::engine::general_purpose::STANDARD.decode(&img.data) {
2194                            Ok(bytes) => {
2195                                attachments.push(zeph_core::channel::Attachment {
2196                                    kind: zeph_core::channel::AttachmentKind::Image,
2197                                    data: bytes,
2198                                    filename: Some(format!(
2199                                        "image.{}",
2200                                        mime_to_ext(&img.mime_type)
2201                                    )),
2202                                });
2203                            }
2204                            Err(e) => {
2205                                tracing::debug!(error = %e, "failed to decode image base64, skipping");
2206                            }
2207                        }
2208                    }
2209                }
2210                acp::schema::ContentBlock::Resource(embedded) => {
2211                    if let acp::schema::EmbeddedResourceResource::TextResourceContents(res) =
2212                        &embedded.resource
2213                    {
2214                        if !text.is_empty() {
2215                            text.push('\n');
2216                        }
2217                        if res
2218                            .mime_type
2219                            .as_deref()
2220                            .is_some_and(|m| m == DIAGNOSTICS_MIME_TYPE)
2221                        {
2222                            format_diagnostics_block(&res.text, &mut text);
2223                        } else if res.mime_type.is_some()
2224                            && res.mime_type.as_deref() != Some("text/plain")
2225                        {
2226                            tracing::debug!(mime_type = ?res.mime_type, uri = %res.uri, "unknown resource mime type — skipping");
2227                        } else {
2228                            text.push_str("<resource name=\"");
2229                            text.push_str(&res.uri.replace('"', "&quot;"));
2230                            text.push_str("\">");
2231                            text.push_str(&res.text);
2232                            text.push_str("</resource>");
2233                        }
2234                    }
2235                }
2236                acp::schema::ContentBlock::Audio(_) => {
2237                    tracing::warn!("unsupported content block: Audio — skipping");
2238                }
2239                acp::schema::ContentBlock::ResourceLink(link) => {
2240                    match resolve_resource_link(link, session_cwd).await {
2241                        Ok(content) => {
2242                            // S-2: XML-escape URI (attribute) and content (body) using full escaping.
2243                            let escaped_uri = xml_escape(&link.uri);
2244                            let escaped_content = xml_escape(&content);
2245                            if !text.is_empty() {
2246                                text.push('\n');
2247                            }
2248                            text.push_str("<resource uri=\"");
2249                            text.push_str(&escaped_uri);
2250                            text.push_str("\">");
2251                            text.push_str(&escaped_content);
2252                            text.push_str("</resource>");
2253                        }
2254                        Err(e) => {
2255                            tracing::warn!(uri = %link.uri, error = %e, "ResourceLink resolution failed — skipping");
2256                        }
2257                    }
2258                }
2259                &_ => {
2260                    tracing::warn!("unsupported content block: unknown — skipping");
2261                }
2262            }
2263        }
2264        if text.len() > MAX_PROMPT_BYTES {
2265            return Err(acp::Error::invalid_request().data("prompt too large"));
2266        }
2267        Ok((text, attachments))
2268    }
2269
2270    /// Drain events from `rx` until `Flush` or channel close, forwarding each as an ACP
2271    /// notification. Returns `(cancelled, stop_hint, rx)`.
2272    async fn drain_agent_events(
2273        &self,
2274        session_id: &acp::schema::SessionId,
2275        output_rx: tokio::sync::mpsc::Receiver<LoopbackEvent>,
2276        cancel_signal: Option<std::sync::Arc<tokio::sync::Notify>>,
2277    ) -> (
2278        bool,
2279        Option<StopHint>,
2280        tokio::sync::mpsc::Receiver<LoopbackEvent>,
2281    ) {
2282        let mut rx = output_rx;
2283        let mut cancelled = false;
2284        let mut stop_hint: Option<StopHint> = None;
2285        // Capture turn message_id once per drain to avoid re-locking sessions per event.
2286        #[cfg(feature = "unstable-message-id")]
2287        let turn_mid: Option<String> = self
2288            .sessions
2289            .lock()
2290            .get(session_id)
2291            .and_then(|e| e.current_message_id.lock().ok().and_then(|g| g.clone()));
2292        loop {
2293            let event = if let Some(ref signal) = cancel_signal {
2294                tokio::select! {
2295                    biased;
2296                    () = signal.notified() => { cancelled = true; break; }
2297                    ev = rx.recv() => ev,
2298                }
2299            } else {
2300                rx.recv().await
2301            };
2302            let Some(event) = event else { break };
2303            if let LoopbackEvent::Stop(hint) = event {
2304                stop_hint = Some(hint);
2305                continue;
2306            }
2307            let is_flush = matches!(event, LoopbackEvent::Flush);
2308            // Extract terminal_id before consuming the event so we can release after notify.
2309            let pending_terminal_release = if let LoopbackEvent::ToolOutput(ref data) = event {
2310                data.terminal_id.clone()
2311            } else {
2312                None
2313            };
2314            for update in loopback_event_to_updates(event) {
2315                if let Some(ref store) = self.store {
2316                    let sid = session_id.to_string();
2317                    let (event_type, payload) = session_update_to_event(&update);
2318                    let store = store.clone();
2319                    tokio::spawn(async move {
2320                        if let Err(e) = store.save_acp_event(&sid, event_type, &payload).await {
2321                            tracing::warn!(error = %e, "failed to persist session event");
2322                        }
2323                    });
2324                }
2325                #[cfg(feature = "unstable-message-id")]
2326                let update = apply_message_id_to_chunk(update, turn_mid.as_deref());
2327                #[cfg(not(feature = "unstable-message-id"))]
2328                let update = update;
2329                let notification =
2330                    acp::schema::SessionNotification::new(session_id.clone(), update);
2331                if let Err(e) = self.send_notification(session_id, notification).await {
2332                    tracing::warn!(error = %e, "failed to send notification");
2333                    break;
2334                }
2335            }
2336            // Release the terminal after tool_call_update has been sent.
2337            if let Some(terminal_id) = pending_terminal_release {
2338                let executor = self
2339                    .sessions
2340                    .lock()
2341                    .get(session_id)
2342                    .and_then(|e| e.shell_executor.clone());
2343                if let Some(executor) = executor {
2344                    executor.release_terminal(terminal_id);
2345                }
2346            }
2347            if is_flush {
2348                break;
2349            }
2350        }
2351        (cancelled, stop_hint, rx)
2352    }
2353
2354    /// Create a forked conversation for `new_id` from `source_id`.
2355    ///
2356    /// Copies ACP events and conversation history from the source session synchronously before
2357    /// the agent loop is spawned to eliminate race conditions where the agent starts
2358    /// `load_history()` before the copy completes.
2359    #[allow(dead_code)]
2360    async fn fork_conversation(
2361        &self,
2362        source_id: &acp::schema::SessionId,
2363        new_id: &acp::schema::SessionId,
2364    ) -> acp::Result<Option<ConversationId>> {
2365        let Some(s) = &self.store else {
2366            return Ok(None);
2367        };
2368        let source_events = s
2369            .load_acp_events(&source_id.to_string())
2370            .await
2371            .map_err(|e| {
2372                tracing::warn!(error = %e, "failed to load ACP session events for fork");
2373                acp::Error::internal_error().data("internal error")
2374            })?;
2375
2376        let new_id_str = new_id.to_string();
2377        let pairs: Vec<(&str, &str)> = source_events
2378            .iter()
2379            .map(|ev| (ev.event_type.as_str(), ev.payload.as_str()))
2380            .collect();
2381
2382        match s.create_conversation().await {
2383            Ok(forked_cid) => {
2384                let forked_from_cid = s
2385                    .get_acp_session_conversation_id(&source_id.to_string())
2386                    .await
2387                    .unwrap_or(None);
2388                if let Err(e) = s
2389                    .create_acp_session_with_conversation(&new_id_str, forked_cid)
2390                    .await
2391                {
2392                    tracing::warn!(error = %e, "failed to persist forked ACP session mapping");
2393                }
2394                if let Err(e) = s.import_acp_events(&new_id_str, &pairs).await {
2395                    tracing::warn!(error = %e, "failed to import events for forked session");
2396                }
2397                if let Some(src_cid) = forked_from_cid
2398                    && let Err(e) = s.copy_conversation(src_cid, forked_cid).await
2399                {
2400                    tracing::warn!(error = %e, "failed to copy conversation for forked session");
2401                }
2402                Ok(Some(forked_cid))
2403            }
2404            Err(e) => {
2405                tracing::warn!(error = %e, "failed to create conversation for forked session; history will not be copied");
2406                if let Err(e2) = s.create_acp_session(&new_id_str).await {
2407                    tracing::warn!(error = %e2, "failed to persist forked ACP session");
2408                }
2409                if let Err(e2) = s.import_acp_events(&new_id_str, &pairs).await {
2410                    tracing::warn!(error = %e2, "failed to import events for forked session");
2411                }
2412                Ok(None)
2413            }
2414        }
2415    }
2416
2417    /// Spawn a background title-generation task for the session's first prompt.
2418    fn maybe_generate_session_title(&self, session_id: &acp::schema::SessionId, user_text: &str) {
2419        let (should_generate, current_model, notify_tx) = {
2420            let sessions = self.sessions.lock();
2421            let Some(entry) = sessions.get(session_id) else {
2422                return;
2423            };
2424            let already_done = entry.first_prompt_done.load(Ordering::Relaxed);
2425            if already_done {
2426                return;
2427            }
2428            entry.first_prompt_done.store(true, Ordering::Relaxed);
2429            let model = entry.current_model.lock().clone();
2430            let tx = entry.notify_tx.clone();
2431            (true, model, tx)
2432        };
2433        if !should_generate {
2434            return;
2435        }
2436        if let Some(ref factory) = self.provider_factory
2437            && !current_model.is_empty()
2438            && let Some(provider) = factory(&current_model)
2439        {
2440            let user_text = user_text.to_owned();
2441            let sid = session_id.clone();
2442            let store = self.store.clone();
2443            let title_max_chars = self.title_max_chars;
2444            let sessions = Arc::clone(&self.sessions);
2445            tokio::spawn(async move {
2446                let prompt = format!(
2447                    "Generate a concise 5-7 word title for a conversation that starts \
2448                     with: {user_text}\nRespond with only the title, no quotes."
2449                );
2450                let messages = vec![zeph_llm::provider::Message::from_legacy(
2451                    zeph_llm::provider::Role::User,
2452                    &prompt,
2453                )];
2454                let sid_str = sid.to_string();
2455                let sid_prefix = &sid_str[..8.min(sid_str.len())];
2456                let fallback_title = format!("Session {sid_prefix}");
2457                let title = match tokio::time::timeout(
2458                    std::time::Duration::from_secs(15),
2459                    provider.chat(&messages),
2460                )
2461                .await
2462                {
2463                    Ok(Ok(t)) => truncate_to_chars(t.trim(), title_max_chars),
2464                    Ok(Err(e)) => {
2465                        tracing::debug!(error = %e, "title generation LLM call failed");
2466                        fallback_title
2467                    }
2468                    Err(_) => {
2469                        tracing::debug!("title generation timed out");
2470                        fallback_title
2471                    }
2472                };
2473                if let Some(ref store) = store {
2474                    let _ = store.update_session_title(&sid.to_string(), &title).await;
2475                }
2476                if let Some(entry) = sessions.lock().get(&sid) {
2477                    *entry.title.lock() = Some(title.clone());
2478                }
2479                let update = acp::schema::SessionUpdate::SessionInfoUpdate(
2480                    acp::schema::SessionInfoUpdate::new().title(title),
2481                );
2482                let notification = acp::schema::SessionNotification::new(sid, update);
2483                let (tx, _rx) = oneshot::channel();
2484                notify_tx.send((notification, tx)).ok();
2485            });
2486        }
2487    }
2488
2489    /// Build a fresh `SessionEntry` from a `LoopbackHandle`.
2490    fn make_session_entry(
2491        handle: LoopbackHandle,
2492        initial_model: String,
2493        cwd: PathBuf,
2494        shell_executor: Option<AcpShellExecutor>,
2495        provider_override: Arc<RwLock<Option<AnyProvider>>>,
2496    ) -> SessionEntry {
2497        let (notify_tx, notify_rx) = mpsc::unbounded_channel();
2498        let now_ms = u64::try_from(
2499            std::time::SystemTime::now()
2500                .duration_since(std::time::UNIX_EPOCH)
2501                .unwrap_or_default()
2502                .as_millis(),
2503        )
2504        .unwrap_or(u64::MAX);
2505        SessionEntry {
2506            input_tx: handle.input_tx,
2507            output_rx: Mutex::new(Some(handle.output_rx)),
2508            cancel_signal: handle.cancel_signal,
2509            last_active_ms: AtomicU64::new(now_ms),
2510            created_at: chrono::Utc::now(),
2511            working_dir: Mutex::new(Some(cwd)),
2512            notify_tx,
2513            notify_rx: Mutex::new(Some(notify_rx)),
2514            provider_override,
2515            current_model: Mutex::new(initial_model),
2516            current_mode: Mutex::new(acp::schema::SessionModeId::new(DEFAULT_MODE_ID)),
2517            first_prompt_done: AtomicBool::new(false),
2518            title: Mutex::new(None),
2519            thinking_enabled: AtomicBool::new(false),
2520            auto_approve_level: Mutex::new("suggest".to_owned()),
2521            shell_executor,
2522            #[cfg(feature = "unstable-message-id")]
2523            current_message_id: std::sync::Mutex::new(None),
2524        }
2525    }
2526
2527    /// Replay stored `AcpSessionEvent` records as ACP notifications for the session.
2528    async fn replay_session_events(
2529        &self,
2530        session_id: &acp::schema::SessionId,
2531        events: Vec<zeph_memory::store::AcpSessionEvent>,
2532    ) {
2533        for ev in events {
2534            let update = match ev.event_type.as_str() {
2535                "user_message" => acp::schema::SessionUpdate::UserMessageChunk(
2536                    acp::schema::ContentChunk::new(ev.payload.into()),
2537                ),
2538                "agent_message" => acp::schema::SessionUpdate::AgentMessageChunk(
2539                    acp::schema::ContentChunk::new(ev.payload.into()),
2540                ),
2541                "agent_thought" => acp::schema::SessionUpdate::AgentThoughtChunk(
2542                    acp::schema::ContentChunk::new(ev.payload.into()),
2543                ),
2544                "tool_call" => match serde_json::from_str::<acp::schema::ToolCall>(&ev.payload) {
2545                    Ok(tc) => acp::schema::SessionUpdate::ToolCall(tc),
2546                    Err(e) => {
2547                        tracing::warn!(error = %e, "failed to deserialize tool call event during replay");
2548                        continue;
2549                    }
2550                },
2551                other => {
2552                    tracing::debug!(
2553                        event_type = other,
2554                        "skipping unknown event type during replay"
2555                    );
2556                    continue;
2557                }
2558            };
2559            let notification = acp::schema::SessionNotification::new(session_id.clone(), update);
2560            if let Err(e) = self.send_notification(session_id, notification).await {
2561                tracing::warn!(error = %e, "failed to replay notification");
2562                break;
2563            }
2564        }
2565    }
2566
2567    /// Create a new conversation for `session_id` and persist the mapping.
2568    async fn create_session_conversation(
2569        &self,
2570        session_id: &acp::schema::SessionId,
2571    ) -> Option<ConversationId> {
2572        let store = self.store.as_ref()?;
2573        let sid = session_id.to_string();
2574        match store.create_conversation().await {
2575            Ok(cid) => {
2576                if let Err(e) = store.create_acp_session_with_conversation(&sid, cid).await {
2577                    tracing::warn!(error = %e, "failed to persist ACP session mapping; history may not survive restart");
2578                }
2579                Some(cid)
2580            }
2581            Err(e) => {
2582                tracing::warn!(error = %e, "failed to create conversation for ACP session; session will have no persistent history");
2583                if let Err(e2) = store.create_acp_session(&sid).await {
2584                    tracing::warn!(error = %e2, "failed to persist ACP session");
2585                }
2586                None
2587            }
2588        }
2589    }
2590
2591    /// Fire-and-forget the `AvailableCommandsUpdate` notification for a session.
2592    fn send_commands_update_nowait(&self, session_id: &acp::schema::SessionId) {
2593        let cmds_update = acp::schema::SessionUpdate::AvailableCommandsUpdate(
2594            acp::schema::AvailableCommandsUpdate::new(build_available_commands()),
2595        );
2596        self.send_notification_nowait(
2597            session_id,
2598            acp::schema::SessionNotification::new(session_id.clone(), cmds_update),
2599        );
2600    }
2601
2602    async fn ext_method_mcp(
2603        &self,
2604        args: &acp::schema::ExtRequest,
2605    ) -> acp::Result<acp::schema::ExtResponse> {
2606        let method = args.method.as_ref();
2607        match method {
2608            "_agent/mcp/list" => {
2609                let Some(ref manager) = self.mcp_manager else {
2610                    return Err(acp::Error::internal_error().data("MCP manager not configured"));
2611                };
2612                let servers = manager.list_servers().await;
2613                let json = serde_json::to_string(&servers).map_err(|e| {
2614                    tracing::error!(error = %e, "failed to serialize MCP server list");
2615                    acp::Error::internal_error().data("internal error")
2616                })?;
2617                let raw: Box<serde_json::value::RawValue> =
2618                    serde_json::value::RawValue::from_string(json).map_err(|e| {
2619                        tracing::error!(error = %e, "failed to build MCP list response");
2620                        acp::Error::internal_error().data("internal error")
2621                    })?;
2622                Ok(acp::schema::ExtResponse::new(raw.into()))
2623            }
2624            "_agent/mcp/add" => {
2625                let Some(ref manager) = self.mcp_manager else {
2626                    return Err(acp::Error::internal_error().data("MCP manager not configured"));
2627                };
2628                let entry: ServerEntry = serde_json::from_str(args.params.get())
2629                    .map_err(|e| acp::Error::invalid_request().data(e.to_string()))?;
2630                let tools = manager.add_server(&entry).await.map_err(|e| {
2631                    tracing::error!(error = %e, "failed to add MCP server");
2632                    acp::Error::internal_error().data("internal error")
2633                })?;
2634                let json = serde_json::json!({ "added": entry.id, "tools": tools.len() });
2635                let raw =
2636                    serde_json::value::RawValue::from_string(json.to_string()).map_err(|e| {
2637                        tracing::error!(error = %e, "failed to build MCP add response");
2638                        acp::Error::internal_error().data("internal error")
2639                    })?;
2640                Ok(acp::schema::ExtResponse::new(raw.into()))
2641            }
2642            "_agent/mcp/remove" => {
2643                let Some(ref manager) = self.mcp_manager else {
2644                    return Err(acp::Error::internal_error().data("MCP manager not configured"));
2645                };
2646                let params: McpRemoveParams = serde_json::from_str(args.params.get())
2647                    .map_err(|e| acp::Error::invalid_request().data(e.to_string()))?;
2648                manager.remove_server(&params.id).await.map_err(|e| {
2649                    tracing::error!(error = %e, "failed to remove MCP server");
2650                    acp::Error::internal_error().data("internal error")
2651                })?;
2652                let raw = serde_json::value::RawValue::from_string(
2653                    serde_json::json!({ "removed": params.id }).to_string(),
2654                )
2655                .map_err(|e| {
2656                    tracing::error!(error = %e, "failed to build MCP remove response");
2657                    acp::Error::internal_error().data("internal error")
2658                })?;
2659                Ok(acp::schema::ExtResponse::new(raw.into()))
2660            }
2661            _ => Ok(acp::schema::ExtResponse::new(
2662                serde_json::value::RawValue::NULL.to_owned().into(),
2663            )),
2664        }
2665    }
2666}
2667
2668pub(super) mod helpers;
2669use helpers::{
2670    DEFAULT_MODE_ID, DIAGNOSTICS_MIME_TYPE, build_available_commands, build_config_options,
2671    build_mode_state, format_diagnostics_block, loopback_event_to_updates, mime_to_ext, model_meta,
2672    session_update_to_event, xml_escape,
2673};
2674
2675pub(crate) mod handlers;
2676
2677/// Run the ACP agent loop over the provided transport until the connection closes.
2678///
2679/// Builds the ACP 0.11 handler chain from `state` and connects it to `transport`.
2680/// All request handlers delegate to the corresponding `do_*` methods on
2681/// [`ZephAcpAgentState`] which carry all session management logic.
2682///
2683/// # Errors
2684///
2685/// Returns an `acp::Error` if the underlying JSON-RPC transport fails.
2686///
2687/// # Examples
2688///
2689/// ```no_run
2690/// use std::sync::Arc;
2691/// use agent_client_protocol as acp;
2692/// use agent_client_protocol::ByteStreams;
2693/// use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
2694/// use zeph_acp::agent::{ZephAcpAgentState, run_agent};
2695/// use zeph_acp::AgentSpawner;
2696///
2697/// # async fn example(spawner: AgentSpawner) -> acp::Result<()> {
2698/// let state = Arc::new(ZephAcpAgentState::new(spawner, 4, 1800, None));
2699/// run_agent(
2700///     state,
2701///     ByteStreams::new(
2702///         tokio::io::stdout().compat_write(),
2703///         tokio::io::stdin().compat(),
2704///     ),
2705/// ).await
2706/// # }
2707/// ```
2708#[allow(clippy::too_many_lines)]
2709pub async fn run_agent(
2710    state: Arc<ZephAcpAgentState>,
2711    transport: impl acp::ConnectTo<acp::Agent>,
2712) -> acp::Result<()> {
2713    #[cfg(feature = "unstable-session-close")]
2714    use handlers::close_session;
2715    #[cfg(feature = "unstable-session-fork")]
2716    use handlers::fork_session;
2717    #[cfg(feature = "unstable-logout")]
2718    use handlers::logout;
2719    #[cfg(feature = "unstable-session-resume")]
2720    use handlers::resume_session;
2721    #[cfg(feature = "unstable-session-model")]
2722    use handlers::set_session_model;
2723    use handlers::{
2724        authenticate, cancel, dispatch, initialize, list_sessions, load_session, new_session,
2725        prompt, set_session_config_option, set_session_mode,
2726    };
2727
2728    macro_rules! req_handler {
2729        ($handler:path) => {{
2730            let s = Arc::clone(&state);
2731            move |req, responder, cx| {
2732                let s = Arc::clone(&s);
2733                async move { $handler(req, responder, cx, s).await }
2734            }
2735        }};
2736    }
2737
2738    macro_rules! notif_handler {
2739        ($handler:path) => {{
2740            let s = Arc::clone(&state);
2741            move |notif, cx| {
2742                let s = Arc::clone(&s);
2743                async move { $handler(notif, cx, s).await }
2744            }
2745        }};
2746    }
2747
2748    let builder = acp::Agent
2749        .builder()
2750        .on_receive_request(
2751            req_handler!(initialize::handle_initialize),
2752            acp::on_receive_request!(),
2753        )
2754        .on_receive_request(
2755            req_handler!(authenticate::handle_authenticate),
2756            acp::on_receive_request!(),
2757        )
2758        .on_receive_request(
2759            req_handler!(new_session::handle_new_session),
2760            acp::on_receive_request!(),
2761        )
2762        .on_receive_request(
2763            req_handler!(prompt::handle_prompt),
2764            acp::on_receive_request!(),
2765        )
2766        .on_receive_request(
2767            req_handler!(list_sessions::handle_list_sessions),
2768            acp::on_receive_request!(),
2769        )
2770        .on_receive_request(
2771            req_handler!(load_session::handle_load_session),
2772            acp::on_receive_request!(),
2773        )
2774        .on_receive_request(
2775            req_handler!(set_session_config_option::handle_set_session_config_option),
2776            acp::on_receive_request!(),
2777        )
2778        .on_receive_request(
2779            req_handler!(set_session_mode::handle_set_session_mode),
2780            acp::on_receive_request!(),
2781        )
2782        .on_receive_notification(
2783            notif_handler!(cancel::handle_cancel),
2784            acp::on_receive_notification!(),
2785        );
2786
2787    #[cfg(feature = "unstable-session-close")]
2788    let builder = builder.on_receive_request(
2789        req_handler!(close_session::handle_close_session),
2790        acp::on_receive_request!(),
2791    );
2792    #[cfg(feature = "unstable-session-fork")]
2793    let builder = builder.on_receive_request(
2794        req_handler!(fork_session::handle_fork_session),
2795        acp::on_receive_request!(),
2796    );
2797    #[cfg(feature = "unstable-session-resume")]
2798    let builder = builder.on_receive_request(
2799        req_handler!(resume_session::handle_resume_session),
2800        acp::on_receive_request!(),
2801    );
2802    #[cfg(feature = "unstable-session-model")]
2803    let builder = builder.on_receive_request(
2804        req_handler!(set_session_model::handle_set_session_model),
2805        acp::on_receive_request!(),
2806    );
2807    #[cfg(feature = "unstable-logout")]
2808    let builder = builder.on_receive_request(
2809        req_handler!(logout::handle_logout),
2810        acp::on_receive_request!(),
2811    );
2812
2813    builder
2814        .on_receive_dispatch(
2815            {
2816                let s = Arc::clone(&state);
2817                move |msg, cx| {
2818                    let s = Arc::clone(&s);
2819                    async move { dispatch::handle_dispatch(msg, cx, s).await }
2820                }
2821            },
2822            acp::on_receive_dispatch!(),
2823        )
2824        .connect_to(transport)
2825        .await
2826}
2827
/// Stamp the turn's `message_id` onto message-chunk updates.
///
/// `AgentMessageChunk`, `UserMessageChunk` and `AgentThoughtChunk` updates receive the id;
/// every other update, and every update when `message_id` is `None`, is returned unchanged.
#[cfg(feature = "unstable-message-id")]
fn apply_message_id_to_chunk(
    update: acp::schema::SessionUpdate,
    message_id: Option<&str>,
) -> acp::schema::SessionUpdate {
    match (message_id, update) {
        (Some(mid), acp::schema::SessionUpdate::AgentMessageChunk(chunk)) => {
            acp::schema::SessionUpdate::AgentMessageChunk(chunk.message_id(mid.to_owned()))
        }
        (Some(mid), acp::schema::SessionUpdate::UserMessageChunk(chunk)) => {
            acp::schema::SessionUpdate::UserMessageChunk(chunk.message_id(mid.to_owned()))
        }
        (Some(mid), acp::schema::SessionUpdate::AgentThoughtChunk(chunk)) => {
            acp::schema::SessionUpdate::AgentThoughtChunk(chunk.message_id(mid.to_owned()))
        }
        // No id for this turn, or an update kind that carries no message id.
        (_, untouched) => untouched,
    }
}
2851
2852/// Compile-time assertions that ACP state and executors are `Send + Sync`.
2853const _: () = {
2854    #[allow(clippy::used_underscore_items)]
2855    fn assert_send_sync<T: Send + Sync>() {}
2856    fn check_send_sync() {
2857        assert_send_sync::<ZephAcpAgentState>();
2858        assert_send_sync::<crate::fs::AcpFileExecutor>();
2859        assert_send_sync::<crate::terminal::AcpShellExecutor>();
2860        assert_send_sync::<crate::permission::AcpPermissionGate>();
2861    }
2862    let _ = check_send_sync;
2863};
2864
2865#[cfg(any())] // ACP 0.10 tests disabled — rewrite for 0.11 tracked in #3267
2866mod tests;
2867
/// Unit tests for `apply_message_id_to_chunk`, covering every chunk kind it rewrites
/// and the no-op case when the turn has no message id.
#[cfg(all(test, feature = "unstable-message-id"))]
mod message_id_tests {
    use super::*;

    /// Build an `AgentMessageChunk` update carrying `text`.
    fn agent_chunk(text: &str) -> acp::schema::SessionUpdate {
        acp::schema::SessionUpdate::AgentMessageChunk(acp::schema::ContentChunk::new(
            text.to_owned().into(),
        ))
    }

    /// Build a `UserMessageChunk` update carrying `text`.
    fn user_chunk(text: &str) -> acp::schema::SessionUpdate {
        acp::schema::SessionUpdate::UserMessageChunk(acp::schema::ContentChunk::new(
            text.to_owned().into(),
        ))
    }

    /// Build an `AgentThoughtChunk` update carrying `text`.
    fn thought_chunk(text: &str) -> acp::schema::SessionUpdate {
        acp::schema::SessionUpdate::AgentThoughtChunk(acp::schema::ContentChunk::new(
            text.to_owned().into(),
        ))
    }

    #[test]
    fn apply_sets_message_id_on_agent_chunk() {
        let result = apply_message_id_to_chunk(agent_chunk("hello"), Some("msg-001"));
        let acp::schema::SessionUpdate::AgentMessageChunk(chunk) = result else {
            panic!("expected AgentMessageChunk");
        };
        assert_eq!(chunk.message_id, Some("msg-001".to_owned()));
    }

    #[test]
    fn apply_sets_message_id_on_user_chunk() {
        let result = apply_message_id_to_chunk(user_chunk("hi"), Some("msg-002"));
        let acp::schema::SessionUpdate::UserMessageChunk(chunk) = result else {
            panic!("expected UserMessageChunk");
        };
        assert_eq!(chunk.message_id, Some("msg-002".to_owned()));
    }

    // The thought-chunk arm is rewritten too, so it gets the same coverage as the others.
    #[test]
    fn apply_sets_message_id_on_thought_chunk() {
        let result = apply_message_id_to_chunk(thought_chunk("hmm"), Some("msg-003"));
        let acp::schema::SessionUpdate::AgentThoughtChunk(chunk) = result else {
            panic!("expected AgentThoughtChunk");
        };
        assert_eq!(chunk.message_id, Some("msg-003".to_owned()));
    }

    #[test]
    fn apply_none_message_id_is_noop() {
        let result = apply_message_id_to_chunk(agent_chunk("hello"), None);
        let acp::schema::SessionUpdate::AgentMessageChunk(chunk) = result else {
            panic!("expected AgentMessageChunk");
        };
        assert_eq!(chunk.message_id, None);
    }
}