1use std::path::{Component, PathBuf};
15use std::pin::Pin;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
18
19use parking_lot::{Mutex, RwLock};
20
21use agent_client_protocol as acp;
22use futures::StreamExt as _;
23use tokio::sync::{mpsc, oneshot};
24use tokio_util::sync::CancellationToken;
25use zeph_core::channel::{ChannelMessage, LoopbackChannel, LoopbackHandle};
26use zeph_core::text::truncate_to_chars;
27use zeph_core::{LoopbackEvent, StopHint};
28use zeph_llm::any::AnyProvider;
29use zeph_llm::provider::LlmProvider as _;
30use zeph_mcp::McpManager;
31use zeph_mcp::manager::ServerEntry;
32use zeph_memory::ConversationId;
33use zeph_memory::store::SqliteStore;
34
35use tracing::Instrument as _;
36use zeph_tools::is_private_ip;
37
38use crate::fs::AcpFileExecutor;
39use crate::lsp::DiagnosticsCache;
40use crate::permission::AcpPermissionGate;
41use crate::terminal::AcpShellExecutor;
42use crate::transport::SharedAvailableModels;
43
44pub type ProviderFactory = Arc<dyn Fn(&str) -> Option<AnyProvider> + Send + Sync>;
62
/// Per-session data handed to the agent loop when it is spawned.
pub struct SessionContext {
    /// ACP session identifier assigned in `do_new_session`.
    pub session_id: acp::schema::SessionId,
    /// Persistent conversation id, when a store is configured and the
    /// lookup/creation succeeded; `None` means no persistent history.
    pub conversation_id: Option<ConversationId>,
    /// Working directory supplied by the client in the new-session request.
    pub working_dir: PathBuf,
}
80
// Upper bound on prompt text size (1 MiB); presumably enforced during
// prompt collection — defined here alongside the other limits.
const MAX_PROMPT_BYTES: usize = 1_048_576;
// Upper bound on base64-encoded image payloads in prompts (20 MiB).
const MAX_IMAGE_BASE64_BYTES: usize = 20 * 1_048_576;
// Image MIME types accepted in prompt content blocks.
const SUPPORTED_IMAGE_MIMES: &[&str] = &[
    "image/jpeg",
    "image/jpg",
    "image/png",
    "image/gif",
    "image/webp",
];
// Bounded capacity of the loopback channel between the ACP handlers and the
// agent loop (see `LoopbackChannel::pair` in `do_new_session`).
const LOOPBACK_CHANNEL_CAPACITY: usize = 64;
// Upper bound on bytes fetched for a ResourceLink (file:// or http(s)://).
const MAX_RESOURCE_BYTES: usize = 1_048_576;
// Per-operation timeout for each I/O step while resolving a ResourceLink.
const RESOURCE_FETCH_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);

// Path components never served via file:// ResourceLinks: system
// pseudo-filesystems and credential directories.
const BLOCKED_PATH_COMPONENTS: &[&str] = &["proc", "sys", "dev", ".ssh", ".gnupg", ".aws"];
99
/// Fetches the textual content behind a prompt `ResourceLink`.
///
/// Supports `file://` URIs (restricted to `session_cwd` after
/// canonicalization, with a deny-list of sensitive path components) and
/// `http(s)://` URIs (redirects disabled, private addresses rejected).
/// Content is capped at `MAX_RESOURCE_BYTES` and each I/O step is bounded
/// by `RESOURCE_FETCH_TIMEOUT`.
///
/// # Errors
/// Returns `AcpError::ResourceLink` on timeout, I/O failure, oversize,
/// path escape, blocked path, SSRF detection, non-text MIME, invalid
/// UTF-8, or an unsupported URI scheme.
async fn resolve_resource_link(
    link: &acp::schema::ResourceLink,
    session_cwd: &std::path::Path,
) -> Result<String, crate::error::AcpError> {
    let uri = &link.uri;

    if let Some(path_str) = uri.strip_prefix("file://") {
        let path = std::path::Path::new(path_str);

        // Size check via metadata before reading anything.
        let meta = tokio::time::timeout(RESOURCE_FETCH_TIMEOUT, tokio::fs::metadata(path))
            .await
            .map_err(|_| {
                crate::error::AcpError::ResourceLink(format!("file:// metadata timed out: {uri}"))
            })?
            .map_err(|e| {
                crate::error::AcpError::ResourceLink(format!("file:// stat failed: {e}"))
            })?;

        if meta.len() > MAX_RESOURCE_BYTES as u64 {
            return Err(crate::error::AcpError::ResourceLink(format!(
                "file:// content exceeds size limit ({MAX_RESOURCE_BYTES} bytes): {uri}"
            )));
        }

        // Canonicalize to defeat `..` and symlink escapes before the
        // containment check below.
        let canonical = tokio::fs::canonicalize(path).await.map_err(|e| {
            crate::error::AcpError::ResourceLink(format!("file:// resolution failed: {e}"))
        })?;

        // NOTE(review): `session_cwd` is compared as-is; if the cwd itself
        // contains a symlink, `starts_with` may reject paths that are
        // logically inside it — confirm callers pass a canonical cwd.
        if !canonical.starts_with(session_cwd) {
            return Err(crate::error::AcpError::ResourceLink(format!(
                "file:// path outside session working directory: {uri}"
            )));
        }

        // Deny-list check on every component of the resolved path.
        for component in canonical.components() {
            if let Component::Normal(name) = component {
                let name_str = name.to_string_lossy();
                if BLOCKED_PATH_COMPONENTS
                    .iter()
                    .any(|blocked| name_str == *blocked)
                {
                    return Err(crate::error::AcpError::ResourceLink(format!(
                        "file:// path blocked: {uri}"
                    )));
                }
            }
        }

        let bytes = tokio::time::timeout(RESOURCE_FETCH_TIMEOUT, tokio::fs::read(&canonical))
            .await
            .map_err(|_| {
                crate::error::AcpError::ResourceLink(format!("file:// read timed out: {uri}"))
            })?
            .map_err(|e| {
                crate::error::AcpError::ResourceLink(format!("file:// read failed: {e}"))
            })?;

        // Cheap binary-content heuristic: NUL never appears in valid text.
        if bytes.contains(&0u8) {
            return Err(crate::error::AcpError::ResourceLink(format!(
                "binary file not supported as ResourceLink content: {uri}"
            )));
        }

        String::from_utf8(bytes).map_err(|_| {
            crate::error::AcpError::ResourceLink(format!(
                "file:// content is not valid UTF-8: {uri}"
            ))
        })
    } else if uri.starts_with("http://") || uri.starts_with("https://") {
        // Redirects are disabled so the SSRF check below applies to the
        // address actually contacted, not a post-redirect destination.
        let client = reqwest::Client::builder()
            .redirect(reqwest::redirect::Policy::none())
            .timeout(RESOURCE_FETCH_TIMEOUT)
            .build()
            .map_err(|e| crate::error::AcpError::ResourceLink(format!("HTTP client error: {e}")))?;

        let resp = client
            .get(uri.as_str())
            .header(reqwest::header::ACCEPT, "text/*")
            .send()
            .await
            .map_err(|e| crate::error::AcpError::ResourceLink(format!("HTTP fetch failed: {e}")))?;

        // NOTE(review): this private-address check runs after the request has
        // already been sent, so a private host still receives the GET; the
        // body is discarded, but a pre-connect check would be stricter.
        match resp.remote_addr() {
            None => {
                return Err(crate::error::AcpError::ResourceLink(format!(
                    "SSRF check failed: remote address unavailable for {uri}"
                )));
            }
            Some(remote_addr) if is_private_ip(remote_addr.ip()) => {
                return Err(crate::error::AcpError::ResourceLink(format!(
                    "SSRF blocked: {uri} resolved to private address {remote_addr}"
                )));
            }
            Some(_) => {}
        }

        if !resp.status().is_success() {
            return Err(crate::error::AcpError::ResourceLink(format!(
                "HTTP fetch returned {}: {uri}",
                resp.status()
            )));
        }

        // A missing Content-Type is tolerated; an explicit non-text type is
        // rejected.
        let content_type = resp
            .headers()
            .get(reqwest::header::CONTENT_TYPE)
            .and_then(|v| v.to_str().ok())
            .unwrap_or("");
        if !content_type.is_empty() && !content_type.starts_with("text/") {
            return Err(crate::error::AcpError::ResourceLink(format!(
                "non-text MIME type rejected for ResourceLink: {content_type}"
            )));
        }

        // Stream the body with a hard cap. NOTE(review): an oversized HTTP
        // response is truncated at MAX_RESOURCE_BYTES rather than rejected,
        // unlike the file:// branch above which errors on oversize.
        let mut body = resp.bytes_stream();
        let mut buf = Vec::with_capacity(4096);
        while let Some(chunk) = body.next().await {
            let chunk = chunk.map_err(|e| {
                crate::error::AcpError::ResourceLink(format!("HTTP read error: {e}"))
            })?;
            if buf.len() + chunk.len() > MAX_RESOURCE_BYTES {
                buf.extend_from_slice(&chunk[..MAX_RESOURCE_BYTES.saturating_sub(buf.len())]);
                break;
            }
            buf.extend_from_slice(&chunk);
        }

        // Truncation may split a multi-byte UTF-8 sequence; that surfaces
        // here as an invalid-UTF-8 error.
        String::from_utf8(buf).map_err(|_| {
            crate::error::AcpError::ResourceLink(format!(
                "HTTP response body is not valid UTF-8: {uri}"
            ))
        })
    } else {
        Err(crate::error::AcpError::ResourceLink(format!(
            "unsupported URI scheme in ResourceLink: {uri}"
        )))
    }
}
255
/// Capabilities and shared handles injected into a spawned agent loop.
pub struct AcpContext {
    /// Bridges file reads/writes to the ACP client (built from the client's
    /// advertised `fs` capabilities).
    pub file_executor: Option<AcpFileExecutor>,
    /// Bridges shell/terminal command execution to the ACP client.
    pub shell_executor: Option<AcpShellExecutor>,
    /// Routes permission requests to the client for approval.
    pub permission_gate: Option<AcpPermissionGate>,
    /// Notified when the session is cancelled or evicted.
    pub cancel_signal: std::sync::Arc<tokio::sync::Notify>,
    /// Per-session LLM provider override, shared with the session entry;
    /// starts as `None`.
    pub provider_override: Arc<RwLock<Option<AnyProvider>>>,
    /// Parent tool-use id for nested tool calls; `None` at session start.
    pub parent_tool_use_id: Option<String>,
    /// LSP bridge, present only when LSP is enabled in config and the client
    /// advertised an "lsp" capability entry (see `build_acp_context`).
    pub lsp_provider: Option<crate::lsp::AcpLspProvider>,
    /// Cache of client-published diagnostics, shared with the agent state.
    pub diagnostics_cache: Arc<RwLock<DiagnosticsCache>>,
}
294
/// Factory invoked once per session to start an agent loop: receives the
/// loopback channel, the optional ACP context, and the session context, and
/// returns the loop's future. The future is not required to be `Send`
/// (it is run via `tokio::task::spawn_local`).
pub type AgentSpawner = Arc<
    dyn Fn(
            LoopbackChannel,
            Option<AcpContext>,
            SessionContext,
        ) -> Pin<Box<dyn std::future::Future<Output = ()> + 'static>>
        + Send
        + Sync
        + 'static,
>;

/// Alias used by the HTTP transport; identical to [`AgentSpawner`].
#[cfg(feature = "acp-http")]
pub type SendAgentSpawner = AgentSpawner;

/// Sender half of the per-session notification queue; the paired oneshot is
/// the delivery acknowledgement from the drainer task.
pub(crate) type NotifySender =
    mpsc::UnboundedSender<(acp::schema::SessionNotification, oneshot::Sender<()>)>;

/// Receiver half, taken exactly once by the per-session notification drainer.
pub(crate) type NotifyReceiver =
    mpsc::UnboundedReceiver<(acp::schema::SessionNotification, oneshot::Sender<()>)>;
345
/// Book-keeping for one live ACP session.
pub(crate) struct SessionEntry {
    /// Sends user prompts into the agent loop.
    pub(crate) input_tx: mpsc::Sender<ChannelMessage>,
    /// Agent event stream; `Some` when idle and taken (`None`) while a
    /// prompt is in flight — doubles as the "prompt in progress" flag used
    /// by the reaper and LRU eviction.
    pub(crate) output_rx: Mutex<Option<mpsc::Receiver<LoopbackEvent>>>,
    /// Notified to cancel the running agent loop (also fired on eviction).
    pub(crate) cancel_signal: Arc<tokio::sync::Notify>,
    /// Last activity as milliseconds since the Unix epoch (see `touch`).
    pub(crate) last_active_ms: AtomicU64,
    /// Session creation timestamp.
    pub(crate) created_at: chrono::DateTime<chrono::Utc>,
    /// Working directory reported by the client, if any.
    pub(crate) working_dir: Mutex<Option<std::path::PathBuf>>,
    /// Queue for session notifications, drained by a spawned task.
    pub(crate) notify_tx: NotifySender,
    /// Receiver taken exactly once by the notification drainer.
    pub(crate) notify_rx: Mutex<Option<NotifyReceiver>>,
    /// Per-session provider override, shared with the agent loop's context.
    provider_override: Arc<RwLock<Option<AnyProvider>>>,
    /// Currently selected model id.
    current_model: Mutex<String>,
    /// Currently selected session mode.
    current_mode: Mutex<acp::schema::SessionModeId>,
    // Set once the first prompt completes; presumably gates one-time work
    // such as title generation — TODO confirm against its writers.
    first_prompt_done: AtomicBool,
    /// Generated session title, if any.
    title: Mutex<Option<String>>,
    // Whether extended "thinking" output is enabled — usage not visible in
    // this excerpt; verify against the prompt pipeline.
    thinking_enabled: AtomicBool,
    // Auto-approve level for permission prompts (stringly-typed; semantics
    // defined by the permission gate — TODO confirm).
    auto_approve_level: Mutex<String>,
    /// Shell executor handle kept for terminal-related requests.
    pub(crate) shell_executor: Option<AcpShellExecutor>,
    /// Message id of the in-flight prompt turn, set/cleared in `do_prompt`
    /// when message ids are enabled.
    #[cfg(feature = "unstable-message-id")]
    pub(crate) current_message_id: std::sync::Mutex<Option<String>>,
}
387
388impl SessionEntry {
389 #[allow(dead_code)]
390 fn last_active(&self) -> std::time::Instant {
391 let ms = self.last_active_ms.load(Ordering::Relaxed);
392 let now_ms = u64::try_from(
393 std::time::SystemTime::now()
394 .duration_since(std::time::UNIX_EPOCH)
395 .unwrap_or_default()
396 .as_millis(),
397 )
398 .unwrap_or(u64::MAX);
399 let elapsed_ms = now_ms.saturating_sub(ms);
400 std::time::Instant::now()
401 .checked_sub(std::time::Duration::from_millis(elapsed_ms))
402 .unwrap_or_else(std::time::Instant::now)
403 }
404
405 fn touch(&self) {
406 let ms = u64::try_from(
407 std::time::SystemTime::now()
408 .duration_since(std::time::UNIX_EPOCH)
409 .unwrap_or_default()
410 .as_millis(),
411 )
412 .unwrap_or(u64::MAX);
413 self.last_active_ms.store(ms, Ordering::Relaxed);
414 }
415}
416
417type SessionMap = Arc<Mutex<std::collections::HashMap<acp::schema::SessionId, SessionEntry>>>;
418
/// Shared state behind the ACP agent: the session registry, configuration,
/// and handles to persistence, MCP, and LSP subsystems.
pub struct ZephAcpAgentState {
    /// Factory invoked once per session to start the agent loop.
    pub(crate) spawner: AgentSpawner,
    /// Live sessions keyed by session id.
    pub(crate) sessions: SessionMap,
    /// Agent name reported in `initialize`.
    pub(crate) agent_name: String,
    /// Agent version reported in `initialize`.
    agent_version: String,
    /// Hard cap on concurrent sessions; an idle session is LRU-evicted when
    /// a new session would exceed it.
    max_sessions: usize,
    /// Idle span after which the reaper evicts a session.
    idle_timeout: std::time::Duration,
    /// Optional SQLite persistence for events and conversations.
    pub(crate) store: Option<SqliteStore>,
    // Optional file handed to the permission gate; presumably persists
    // permission decisions — TODO confirm against AcpPermissionGate::new.
    permission_file: Option<std::path::PathBuf>,
    /// Capabilities the client reported in `initialize`.
    pub(crate) client_caps: RwLock<acp::schema::ClientCapabilities>,
    /// Builds a provider for a model id; `None` disables provider overrides.
    pub(crate) provider_factory: Option<ProviderFactory>,
    /// Shared, updatable list of model ids offered to clients.
    available_models: SharedAvailableModels,
    /// MCP server manager; its presence enables MCP capabilities.
    pub(crate) mcp_manager: Option<Arc<McpManager>>,
    /// Project rule files surfaced in new-session metadata.
    project_rules: Vec<std::path::PathBuf>,
    /// Max character length for generated session titles.
    title_max_chars: usize,
    // Max history entries; enforcement not visible in this excerpt — TODO
    // confirm where it is applied.
    max_history: usize,
    /// LSP bridging configuration.
    pub(crate) lsp_config: zeph_core::config::AcpLspConfig,
    /// Diagnostics published by the client, shared with agent loops.
    pub(crate) diagnostics_cache: Arc<RwLock<DiagnosticsCache>>,
    /// Cancels the idle-session reaper task on shutdown.
    reaper_cancel: CancellationToken,
    /// Extra directories permitted alongside the session cwd.
    additional_directories_allow: Vec<std::path::PathBuf>,
    /// Auth methods advertised in `initialize`.
    auth_methods_config: Vec<zeph_core::config::AcpAuthMethod>,
    /// Whether client-supplied message ids are honored in `do_prompt`.
    message_ids_enabled: bool,
}
459
460pub type ZephAcpAgent = ZephAcpAgentState;
462
463impl ZephAcpAgentState {
464 pub fn new(
465 spawner: AgentSpawner,
466 max_sessions: usize,
467 session_idle_timeout_secs: u64,
468 permission_file: Option<std::path::PathBuf>,
469 ) -> Self {
470 let lsp_config = zeph_core::config::AcpLspConfig::default();
471 let max_diag_files = lsp_config.max_diagnostic_files;
472 Self {
473 spawner,
474 sessions: Arc::new(Mutex::new(std::collections::HashMap::new())),
475 agent_name: "zeph".to_owned(),
476 agent_version: env!("CARGO_PKG_VERSION").to_owned(),
477 max_sessions,
478 idle_timeout: std::time::Duration::from_secs(session_idle_timeout_secs),
479 store: None,
480 permission_file,
481 client_caps: RwLock::new(acp::schema::ClientCapabilities::default()),
482 provider_factory: None,
483 available_models: Arc::new(RwLock::new(Vec::new())),
484 mcp_manager: None,
485 project_rules: Vec::new(),
486 title_max_chars: 60,
487 max_history: 100,
488 lsp_config,
489 diagnostics_cache: Arc::new(RwLock::new(DiagnosticsCache::new(max_diag_files))),
490 reaper_cancel: CancellationToken::new(),
491 additional_directories_allow: Vec::new(),
492 auth_methods_config: vec![zeph_core::config::AcpAuthMethod::Agent],
493 message_ids_enabled: true,
494 }
495 }
496
    /// Restricts access to the given additional directories (stored as
    /// plain paths; enforced by `validate_additional_directories` —
    /// TODO confirm exact enforcement point).
    #[must_use]
    pub fn with_additional_directories(
        mut self,
        dirs: Vec<zeph_core::config::AdditionalDir>,
    ) -> Self {
        self.additional_directories_allow = dirs
            .into_iter()
            .map(|d| d.as_path().to_path_buf())
            .collect();
        self
    }

    /// Overrides the authentication methods advertised in `initialize`.
    #[must_use]
    pub fn with_auth_methods(mut self, methods: Vec<zeph_core::config::AcpAuthMethod>) -> Self {
        self.auth_methods_config = methods;
        self
    }

    /// Enables or disables honoring client-supplied message ids.
    #[must_use]
    pub fn with_message_ids_enabled(mut self, enabled: bool) -> Self {
        self.message_ids_enabled = enabled;
        self
    }

    /// Sets the LSP configuration and resizes the diagnostics cache to the
    /// new per-file limit.
    #[must_use]
    pub fn with_lsp_config(mut self, config: zeph_core::config::AcpLspConfig) -> Self {
        let max_files = config.max_diagnostic_files;
        self.lsp_config = config;
        self.diagnostics_cache = Arc::new(RwLock::new(DiagnosticsCache::new(max_files)));
        self
    }

    /// Attaches a SQLite store for session/conversation persistence.
    #[must_use]
    pub fn with_store(mut self, store: SqliteStore) -> Self {
        self.store = Some(store);
        self
    }

    /// Sets the agent name and version reported during `initialize`.
    #[must_use]
    pub fn with_agent_info(mut self, name: impl Into<String>, version: impl Into<String>) -> Self {
        self.agent_name = name.into();
        self.agent_version = version.into();
        self
    }

    /// Installs the provider factory together with the shared list of
    /// available model ids.
    #[must_use]
    pub fn with_provider_factory(
        mut self,
        factory: ProviderFactory,
        available_models: SharedAvailableModels,
    ) -> Self {
        self.provider_factory = Some(factory);
        self.available_models = available_models;
        self
    }

    /// Snapshot of the currently available model ids.
    fn available_models_snapshot(&self) -> Vec<String> {
        self.available_models.read().clone()
    }

    /// First available model id, or the empty string when none are
    /// configured.
    fn initial_model(&self) -> String {
        self.available_models_snapshot()
            .into_iter()
            .next()
            .unwrap_or_default()
    }

    /// Attaches the MCP server manager; its presence turns on MCP
    /// capabilities in `do_initialize`.
    #[must_use]
    pub fn with_mcp_manager(mut self, manager: Arc<McpManager>) -> Self {
        self.mcp_manager = Some(manager);
        self
    }

    /// Sets the project rule files surfaced in new-session metadata.
    #[must_use]
    pub fn with_project_rules(mut self, rules: Vec<std::path::PathBuf>) -> Self {
        self.project_rules = rules;
        self
    }

    /// Sets the maximum character length for generated session titles.
    #[must_use]
    pub fn with_title_max_chars(mut self, max_chars: usize) -> Self {
        self.title_max_chars = max_chars;
        self
    }

    /// Sets the maximum number of history entries (enforcement not visible
    /// in this excerpt — TODO confirm).
    #[must_use]
    pub fn with_max_history(mut self, max_history: usize) -> Self {
        self.max_history = max_history;
        self
    }
591
592 pub fn start_idle_reaper(&self) {
598 let sessions = Arc::clone(&self.sessions);
599 let idle_timeout = self.idle_timeout;
600 let cancel = self.reaper_cancel.clone();
601 let span = tracing::info_span!("acp.session.reap");
602 tokio::spawn(
603 async move {
604 let mut interval = tokio::time::interval(std::time::Duration::from_mins(1));
605 interval.tick().await; loop {
607 tokio::select! {
608 biased;
609 () = cancel.cancelled() => break,
610 _ = interval.tick() => {}
611 }
612 let now_ms = u64::try_from(
613 std::time::SystemTime::now()
614 .duration_since(std::time::UNIX_EPOCH)
615 .unwrap_or_default()
616 .as_millis(),
617 )
618 .unwrap_or(u64::MAX);
619 let idle_timeout_ms =
620 u64::try_from(idle_timeout.as_millis()).unwrap_or(u64::MAX);
621 let expired: Vec<acp::schema::SessionId> = sessions
622 .lock()
623 .iter()
624 .filter(|(_, e)| {
625 let idle_ms =
626 now_ms.saturating_sub(e.last_active_ms.load(Ordering::Relaxed));
627 e.output_rx.lock().is_some() && idle_ms > idle_timeout_ms
628 })
629 .map(|(id, _)| id.clone())
630 .collect();
631 for id in expired {
632 if let Some(entry) = sessions.lock().remove(&id) {
633 entry.cancel_signal.notify_one();
634 tracing::debug!(session_id = %id, "evicted idle ACP session (timeout)");
635 }
636 }
637 }
638 }
639 .instrument(span),
640 );
641 }
642
    /// Stops the background task started by [`Self::start_idle_reaper`].
    pub fn shutdown(&self) {
        self.reaper_cancel.cancel();
    }
647
    /// Assembles the per-session [`AcpContext`]: spawns the permission,
    /// file, shell, and (optionally) LSP bridge handler tasks wired to the
    /// client connection, gated on the capabilities the client advertised
    /// during `initialize`.
    pub(crate) fn build_acp_context(
        &self,
        session_id: &acp::schema::SessionId,
        cx: &acp::ConnectionTo<acp::Client>,
        cancel_signal: Arc<tokio::sync::Notify>,
        provider_override: Arc<RwLock<Option<AnyProvider>>>,
        cwd: PathBuf,
    ) -> AcpContext {
        // Snapshot client capabilities under a short read lock.
        let (can_read, can_write, ide_supports_lsp) = {
            let caps = self.client_caps.read();
            let r = caps.fs.read_text_file;
            let w = caps.fs.write_text_file;
            // LSP requires both local config opt-in and an "lsp" entry in
            // the client's capability metadata.
            let lsp = self.lsp_config.enabled
                && caps.meta.as_ref().is_some_and(|m| m.contains_key("lsp"));
            (r, w, lsp)
        };

        let conn = Arc::new(cx.clone());

        let (perm_gate, perm_handler) =
            AcpPermissionGate::new(Arc::clone(&conn), self.permission_file.clone());
        tokio::spawn(perm_handler);

        let (fs_exec, fs_handler) = AcpFileExecutor::new(
            Arc::clone(&conn),
            session_id.clone(),
            can_read,
            can_write,
            cwd,
            Some(perm_gate.clone()),
        );
        tokio::spawn(fs_handler);

        // 120: presumably a shell timeout in seconds — TODO confirm against
        // `AcpShellExecutor::new`'s parameter.
        let (shell_exec, shell_handler) = AcpShellExecutor::new(
            Arc::clone(&conn),
            session_id.clone(),
            Some(perm_gate.clone()),
            120,
        );
        tokio::spawn(shell_handler);

        let lsp_provider = if ide_supports_lsp {
            let (provider, lsp_handler) = crate::lsp::AcpLspProvider::new(
                Arc::clone(&conn),
                true,
                self.lsp_config.request_timeout_secs,
                self.lsp_config.max_references,
                self.lsp_config.max_workspace_symbols,
            );
            tokio::spawn(lsp_handler);
            Some(provider)
        } else {
            None
        };

        AcpContext {
            file_executor: Some(fs_exec),
            shell_executor: Some(shell_exec),
            permission_gate: Some(perm_gate),
            cancel_signal,
            provider_override,
            parent_tool_use_id: None,
            lsp_provider,
            diagnostics_cache: Arc::clone(&self.diagnostics_cache),
        }
    }
715
    /// Queues a session notification and waits for the drainer task to
    /// acknowledge delivery to the client.
    ///
    /// # Errors
    /// Fails when the session is unknown, the notification channel is
    /// closed, or the acknowledgement is dropped.
    pub(crate) async fn send_notification(
        &self,
        session_id: &acp::schema::SessionId,
        notification: acp::schema::SessionNotification,
    ) -> acp::Result<()> {
        // Clone the sender inside a short lock scope so the (non-async)
        // sessions lock is never held across the await below.
        let tx = self
            .sessions
            .lock()
            .get(session_id)
            .map(|e| e.notify_tx.clone());
        let Some(tx) = tx else {
            return Err(acp::Error::internal_error().data("session not found"));
        };
        let (ack_tx, ack_rx) = oneshot::channel();
        tx.send((notification, ack_tx))
            .map_err(|_| acp::Error::internal_error().data("notification channel closed"))?;
        ack_rx
            .await
            .map_err(|_| acp::Error::internal_error().data("notification ack lost"))
    }
736
    /// Fire-and-forget variant of [`Self::send_notification`]: queues the
    /// notification without awaiting delivery; unknown sessions and send
    /// failures are silently ignored.
    pub(crate) fn send_notification_nowait(
        &self,
        session_id: &acp::schema::SessionId,
        notification: acp::schema::SessionNotification,
    ) {
        let tx = self
            .sessions
            .lock()
            .get(session_id)
            .map(|e| e.notify_tx.clone());
        if let Some(tx) = tx {
            // The ack receiver is dropped immediately; the drainer's
            // `ack.send(())` then fails harmlessly.
            let (ack_tx, _) = oneshot::channel();
            tx.send((notification, ack_tx)).ok();
        }
    }
753
754 fn handle_lsp_publish_diagnostics(&self, params: &str) {
755 #[derive(serde::Deserialize)]
756 struct PublishDiagnosticsParams {
757 uri: String,
758 #[serde(default)]
759 diagnostics: Vec<crate::lsp::LspDiagnostic>,
760 }
761
762 match serde_json::from_str::<PublishDiagnosticsParams>(params) {
763 Ok(p) => {
764 let max = self.lsp_config.max_diagnostics_per_file;
765 let mut diags = p.diagnostics;
766 diags.truncate(max);
767 tracing::debug!(
768 uri = %p.uri,
769 count = diags.len(),
770 "lsp/publishDiagnostics: cached"
771 );
772 self.diagnostics_cache.write().update(p.uri, diags);
773 }
774 Err(e) => {
775 tracing::warn!(error = %e, "lsp/publishDiagnostics: failed to parse params");
776 }
777 }
778 }
779
    /// Handles the `lsp/didSave` client notification: when
    /// `auto_diagnostics_on_save` is enabled, requests fresh diagnostics for
    /// the saved file from the client (via the `lsp/diagnostics` extension
    /// request) and caches the truncated result. All failures are logged
    /// and swallowed — this is best-effort.
    #[allow(clippy::unused_async)]
    async fn handle_lsp_did_save(&self, params: &str, cx: &acp::ConnectionTo<acp::Client>) {
        #[derive(serde::Deserialize)]
        struct DidSaveParams {
            uri: String,
        }

        if !self.lsp_config.auto_diagnostics_on_save {
            return;
        }

        let uri = match serde_json::from_str::<DidSaveParams>(params) {
            Ok(p) => p.uri,
            Err(e) => {
                tracing::warn!(error = %e, "lsp/didSave: failed to parse params");
                return;
            }
        };

        let params_json = serde_json::json!({ "uri": &uri });
        let raw = match serde_json::value::to_raw_value(&params_json) {
            Ok(r) => r,
            Err(e) => {
                tracing::warn!(error = %e, "lsp/didSave: failed to serialize params");
                return;
            }
        };
        // Round-trip through RawValue to a plain Value for UntypedMessage;
        // on failure fall back to a null-params request.
        let params_value =
            serde_json::from_str::<serde_json::Value>(raw.get()).unwrap_or(serde_json::Value::Null);
        let req = acp::UntypedMessage::new("lsp/diagnostics", params_value).unwrap_or_else(|_| {
            acp::UntypedMessage {
                method: "lsp/diagnostics".to_owned(),
                params: serde_json::Value::Null,
            }
        });
        let timeout = std::time::Duration::from_secs(self.lsp_config.request_timeout_secs);
        let diagnostics_cache = Arc::clone(&self.diagnostics_cache);
        let max = self.lsp_config.max_diagnostics_per_file;
        let cx_inner = cx.clone();
        let uri_clone = uri.clone();
        // Fetch diagnostics in the background so this notification handler
        // returns immediately.
        cx.spawn(async move {
            match tokio::time::timeout(timeout, cx_inner.send_request(req).block_task()).await {
                Ok(Ok(resp)) => {
                    match serde_json::from_value::<Vec<crate::lsp::LspDiagnostic>>(resp) {
                        Ok(mut diags) => {
                            diags.truncate(max);
                            tracing::debug!(
                                uri = %uri_clone,
                                count = diags.len(),
                                "lsp/didSave: fetched diagnostics"
                            );
                            diagnostics_cache.write().update(uri_clone, diags);
                        }
                        Err(e) => {
                            tracing::warn!(error = %e, "lsp/didSave: failed to parse diagnostics response");
                        }
                    }
                }
                Ok(Err(e)) => {
                    tracing::warn!(error = %e, "lsp/didSave: diagnostics request failed");
                }
                Err(_) => {
                    tracing::warn!(uri = %uri_clone, "lsp/didSave: diagnostics request timed out");
                }
            }
            Ok(())
        }).ok();
    }
849}
850
/// Parameters of the MCP server-removal extension method.
#[derive(serde::Deserialize)]
struct McpRemoveParams {
    // Identifier of the server entry to remove — presumably consumed by
    // `ext_method_mcp`; confirm against that handler.
    id: String,
}
855
856async fn resolve_conversation_id(
862 store: &zeph_memory::store::SqliteStore,
863 session_id: &acp::schema::SessionId,
864) -> Option<ConversationId> {
865 match store
866 .get_acp_session_conversation_id(&session_id.to_string())
867 .await
868 {
869 Ok(Some(cid)) => Some(cid),
870 Ok(None) => {
871 match store.create_conversation().await {
873 Ok(cid) => {
874 if let Err(e) = store
875 .set_acp_session_conversation_id(&session_id.to_string(), cid)
876 .await
877 {
878 tracing::warn!(error = %e, "failed to set conversation_id for legacy session");
879 }
880 Some(cid)
881 }
882 Err(e) => {
883 tracing::warn!(error = %e, "failed to create conversation for legacy session; session will have no persistent history");
884 None
885 }
886 }
887 }
888 Err(e) => {
889 tracing::warn!(error = %e, "failed to look up conversation_id; session will have no persistent history");
890 None
891 }
892 }
893}
894
895impl ZephAcpAgentState {
    /// Handles `initialize`: records the client's capabilities for later use
    /// (see `build_acp_context`) and reports this agent's identity,
    /// feature-gated capabilities, and supported auth methods.
    #[allow(clippy::unused_async)]
    #[tracing::instrument(skip_all, name = "acp.handler.initialize")]
    pub(crate) async fn do_initialize(
        &self,
        args: acp::schema::InitializeRequest,
    ) -> acp::Result<acp::schema::InitializeResponse> {
        tracing::debug!("ACP initialize");
        *self.client_caps.write() = args.client_capabilities;
        let title = format!("{} AI Agent", self.agent_name);

        let mut meta = serde_json::Map::new();
        meta.insert(
            "auth_hint".to_owned(),
            serde_json::json!("authentication required"),
        );

        let mut caps = acp::schema::AgentCapabilities::new()
            .load_session(true)
            .prompt_capabilities(
                acp::schema::PromptCapabilities::new()
                    .image(true)
                    .embedded_context(true),
            )
            .meta({
                let mut cap_meta = serde_json::Map::new();
                cap_meta.insert("config_options".to_owned(), serde_json::json!(true));
                cap_meta.insert("ext_methods".to_owned(), serde_json::json!(true));
                // Advertise LSP bridging only when enabled in config.
                if self.lsp_config.enabled {
                    cap_meta.insert(
                        "lsp".to_owned(),
                        serde_json::json!({
                            "methods": crate::lsp::LSP_METHODS,
                            "notifications": crate::lsp::LSP_NOTIFICATIONS,
                        }),
                    );
                }
                cap_meta
            });
        // MCP capability is advertised iff a manager is attached.
        if self.mcp_manager.is_some() {
            caps = caps.mcp_capabilities(acp::schema::McpCapabilities::new().http(true).sse(false));
        }
        // Session capabilities only exist when at least one unstable session
        // feature is compiled in; note `caps` is deliberately shadowed.
        #[cfg(any(
            feature = "unstable-session-close",
            feature = "unstable-session-fork",
            feature = "unstable-session-resume",
        ))]
        let caps = {
            let mut session_caps = acp::schema::SessionCapabilities::new();
            session_caps = session_caps.list(acp::schema::SessionListCapabilities::default());
            #[cfg(feature = "unstable-session-close")]
            {
                session_caps = session_caps.close(acp::schema::SessionCloseCapabilities::default());
            }
            #[cfg(feature = "unstable-session-fork")]
            {
                session_caps = session_caps.fork(acp::schema::SessionForkCapabilities::default());
            }
            #[cfg(feature = "unstable-session-resume")]
            {
                session_caps =
                    session_caps.resume(acp::schema::SessionResumeCapabilities::default());
            }
            caps.session_capabilities(session_caps)
        };

        #[cfg(feature = "unstable-logout")]
        let caps = caps.auth(
            acp::schema::AgentAuthCapabilities::default()
                .logout(acp::schema::LogoutCapabilities::default()),
        );

        // Map configured auth methods to their wire representation.
        let auth_methods: Vec<acp::schema::AuthMethod> = self
            .auth_methods_config
            .iter()
            .map(|m| match m {
                zeph_core::config::AcpAuthMethod::Agent => acp::schema::AuthMethod::Agent(
                    acp::schema::AuthMethodAgent::new("zeph", "Zeph"),
                ),
            })
            .collect();

        Ok(
            acp::schema::InitializeResponse::new(acp::schema::ProtocolVersion::LATEST)
                .auth_methods(auth_methods)
                .agent_info(
                    acp::schema::Implementation::new(&self.agent_name, &self.agent_version)
                        .title(title),
                )
                .agent_capabilities(caps)
                .meta(meta),
        )
    }
993
994 #[tracing::instrument(skip_all, name = "acp.handler.dispatch")]
995 pub(crate) async fn do_ext_method(
996 &self,
997 args: acp::schema::ExtRequest,
998 ) -> acp::Result<acp::schema::ExtResponse> {
999 if let Some(fut) = crate::custom::dispatch(self, &args) {
1000 return fut.await;
1001 }
1002 self.ext_method_mcp(&args).await
1003 }
1004
    /// Handles extension notifications from the client. Only the LSP
    /// diagnostics-push and did-save hooks are recognized; every other
    /// method is silently ignored.
    pub(crate) async fn do_ext_notification(
        &self,
        args: acp::schema::ExtNotification,
        cx: &acp::ConnectionTo<acp::Client>,
    ) -> acp::Result<()> {
        tracing::debug!(method = %args.method, "received ext_notification");
        match args.method.as_ref() {
            "lsp/publishDiagnostics" => {
                self.handle_lsp_publish_diagnostics(args.params.get());
            }
            "lsp/didSave" => {
                self.handle_lsp_did_save(args.params.get(), cx).await;
            }
            _ => {}
        }
        Ok(())
    }
1022
    /// Accepts any `authenticate` request unconditionally — a no-op
    /// acknowledgement (per `do_logout`, auth here is vault-based).
    #[allow(clippy::unused_async)]
    #[tracing::instrument(skip_all, name = "acp.handler.authenticate")]
    pub(crate) async fn do_authenticate(
        &self,
        _args: acp::schema::AuthenticateRequest,
    ) -> acp::Result<acp::schema::AuthenticateResponse> {
        Ok(acp::schema::AuthenticateResponse::default())
    }
1031
    /// Feature-gated logout endpoint; a no-op because authentication is
    /// vault-based rather than per-connection.
    #[cfg(feature = "unstable-logout")]
    #[allow(clippy::unused_async, dead_code)]
    #[tracing::instrument(skip_all, name = "acp.handler.logout")]
    pub(crate) async fn do_logout(
        &self,
        _args: acp::schema::LogoutRequest,
    ) -> acp::Result<acp::schema::LogoutResponse> {
        tracing::debug!("ACP logout (no-op: vault-based auth)");
        Ok(acp::schema::LogoutResponse::default())
    }
1042
    /// Handles `session/new`: enforces the session cap (LRU-evicting an idle
    /// session when possible), builds the loopback channel and ACP context,
    /// spawns the agent loop and the notification drainer, registers the
    /// session, and returns the session id plus mode/config metadata.
    #[allow(clippy::too_many_lines)]
    #[tracing::instrument(skip_all, name = "acp.handler.new_session")]
    pub(crate) async fn do_new_session(
        &self,
        args: acp::schema::NewSessionRequest,
        cx: &acp::ConnectionTo<acp::Client>,
    ) -> acp::Result<acp::schema::NewSessionResponse> {
        #[cfg(feature = "unstable-session-add-dirs")]
        self.validate_additional_directories(&args.additional_directories)?;
        // At capacity: evict the least-recently-active idle session
        // (`output_rx` present means no prompt in flight), or fail if every
        // session is mid-prompt.
        if self.sessions.lock().len() >= self.max_sessions {
            let evict_id = {
                let sessions = self.sessions.lock();
                sessions
                    .iter()
                    .filter(|(_, e)| e.output_rx.lock().is_some())
                    .min_by_key(|(_, e)| e.last_active_ms.load(Ordering::Relaxed))
                    .map(|(id, _)| id.clone())
            };
            match evict_id {
                Some(id) => {
                    if let Some(entry) = self.sessions.lock().remove(&id) {
                        entry.cancel_signal.notify_one();
                        tracing::debug!(session_id = %id, "evicted idle ACP session (LRU)");
                    }
                }
                None => {
                    return Err(acp::Error::internal_error().data("session limit reached"));
                }
            }
        }

        let session_id = acp::schema::SessionId::new(uuid::Uuid::new_v4().to_string());
        tracing::debug!(%session_id, "new ACP session");

        let (channel, handle) = LoopbackChannel::pair(LOOPBACK_CHANNEL_CAPACITY);
        let cancel_signal = Arc::clone(&handle.cancel_signal);
        let provider_override: Arc<RwLock<Option<AnyProvider>>> = Arc::new(RwLock::new(None));
        let provider_override_for_ctx = Arc::clone(&provider_override);

        let session_cwd = args.cwd.clone();
        let acp_ctx = self.build_acp_context(
            &session_id,
            cx,
            cancel_signal,
            provider_override_for_ctx,
            session_cwd.clone(),
        );
        let shell_executor = acp_ctx.shell_executor.clone();
        let initial_model = self.initial_model();
        let entry = Self::make_session_entry(
            handle,
            initial_model.clone(),
            session_cwd.clone(),
            shell_executor,
            provider_override,
        );

        // Take the notification receiver (present exactly once on a fresh
        // entry) and spawn the drainer that forwards each notification to
        // the client and acks it back to `send_notification`.
        let mut notify_rx = entry
            .notify_rx
            .lock()
            .take()
            .expect("notify_rx consumed once");
        let cx_drain = cx.clone();
        cx.spawn(async move {
            while let Some((notif, ack)) = notify_rx.recv().await {
                let _enter = tracing::info_span!("acp.session.notify").entered();
                if cx_drain.send_notification(notif).is_err() {
                    tracing::warn!("session_notification send failed; drainer exiting");
                    break;
                }
                ack.send(()).ok();
            }
            Ok(())
        })?;

        self.sessions.lock().insert(session_id.clone(), entry);

        let conversation_id = self.create_session_conversation(&session_id).await;

        let session_ctx = SessionContext {
            session_id: session_id.clone(),
            conversation_id,
            working_dir: session_cwd.clone(),
        };

        // spawn_local: the AgentSpawner future carries no Send bound.
        let spawner = Arc::clone(&self.spawner);
        let span = tracing::info_span!("acp.session.agent_loop", session_id = %session_id);
        tokio::task::spawn_local(
            async move {
                (spawner)(channel, Some(acp_ctx), session_ctx).await;
            }
            .instrument(span),
        );

        let available_models = self.available_models_snapshot();
        let config_options =
            build_config_options(&available_models, &initial_model, false, "suggest");
        let default_mode_id = acp::schema::SessionModeId::new(DEFAULT_MODE_ID);
        let mut resp = acp::schema::NewSessionResponse::new(session_id.clone())
            .modes(build_mode_state(&default_mode_id));
        if !config_options.is_empty() {
            resp = resp.config_options(config_options);
        }
        // Surface configured project rule file names in response metadata.
        if !self.project_rules.is_empty() {
            let rules: Vec<serde_json::Value> = self
                .project_rules
                .iter()
                .filter_map(|p| p.file_name())
                .map(|n| serde_json::json!({"name": n.to_string_lossy()}))
                .collect();
            let mut meta = serde_json::Map::new();
            meta.insert("projectRules".to_owned(), serde_json::Value::Array(rules));
            resp = resp.meta(meta);
        }

        self.send_commands_update_nowait(&session_id);

        Ok(resp)
    }
1164
    /// Handles one ACP `session/prompt` turn end-to-end.
    ///
    /// Flow: resolve the session working directory, flatten the prompt
    /// content blocks into text + attachments, short-circuit ACP-native
    /// slash commands, forward the message to the agent loop over the
    /// session's loopback channel, drain agent events until the turn ends,
    /// then map the outcome to a `StopReason`.
    #[tracing::instrument(skip_all, name = "acp.handler.prompt", fields(session_id = %args.session_id))]
    #[allow(clippy::too_many_lines)]
    pub(crate) async fn do_prompt(
        &self,
        args: acp::schema::PromptRequest,
    ) -> acp::Result<acp::schema::PromptResponse> {
        tracing::debug!(session_id = %args.session_id, "ACP prompt");

        // Echo the client-supplied message id only when the unstable
        // capability was enabled; otherwise ignore it entirely.
        #[cfg(feature = "unstable-message-id")]
        let turn_message_id: Option<String> = if self.message_ids_enabled {
            args.message_id.clone()
        } else {
            None
        };

        // Fall back to the process cwd when the session has no working dir.
        let session_cwd = self
            .sessions
            .lock()
            .get(&args.session_id)
            .and_then(|e| e.working_dir.lock().clone())
            .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());

        let (text, attachments) = self
            .collect_prompt_content(&args.prompt, &session_cwd)
            .await?;

        // Slash commands handled natively by this server never reach the
        // agent loop; anything else starting with '/' falls through.
        let trimmed_text = text.trim_start();
        if trimmed_text.starts_with('/') {
            let is_acp_native = trimmed_text == "/help"
                || trimmed_text.starts_with("/help ")
                || trimmed_text == "/mode"
                || trimmed_text.starts_with("/mode ")
                || trimmed_text == "/clear"
                || trimmed_text.starts_with("/review")
                || trimmed_text == "/model"
                || trimmed_text.starts_with("/model ");
            if is_acp_native {
                return self
                    .handle_slash_command(&args.session_id, trimmed_text)
                    .await;
            }
        }

        // Taking `output_rx` doubles as the "turn in progress" guard: a
        // concurrent prompt on the same session finds it empty and errors.
        let (input_tx, output_rx) = {
            let sessions = self.sessions.lock();
            let entry = sessions
                .get(&args.session_id)
                .ok_or_else(|| acp::Error::internal_error().data("session not found"))?;
            let rx =
                entry.output_rx.lock().take().ok_or_else(|| {
                    acp::Error::internal_error().data("prompt already in progress")
                })?;
            entry.touch();
            #[cfg(feature = "unstable-message-id")]
            if let Some(ref mid) = turn_message_id {
                *entry
                    .current_message_id
                    .lock()
                    .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(mid.clone());
            }
            (entry.input_tx.clone(), rx)
        };

        // Persist the user message fire-and-forget; a storage failure is
        // logged but never blocks the turn.
        if let Some(ref store) = self.store {
            let sid = args.session_id.to_string();
            let payload = text.clone();
            let store = store.clone();
            tokio::spawn(async move {
                if let Err(e) = store.save_acp_event(&sid, "user_message", &payload).await {
                    tracing::warn!(error = %e, "failed to persist user message");
                }
            });
        }

        input_tx
            .send(ChannelMessage {
                text: text.clone(),
                attachments,
            })
            .await
            .map_err(|_| acp::Error::internal_error().data("agent channel closed"))?;

        let cancel_signal = self
            .sessions
            .lock()
            .get(&args.session_id)
            .map(|e| Arc::clone(&e.cancel_signal));

        // Blocks until the agent signals end-of-turn (or cancellation);
        // returns the receiver so it can be put back for the next turn.
        let (cancelled, stop_hint, rx) = self
            .drain_agent_events(&args.session_id, output_rx, cancel_signal)
            .await;

        // Restore the receiver — this releases the "turn in progress" guard.
        if let Some(entry) = self.sessions.lock().get(&args.session_id) {
            *entry.output_rx.lock() = Some(rx);
        }

        let stop_reason = if cancelled {
            acp::schema::StopReason::Cancelled
        } else {
            match stop_hint {
                Some(StopHint::MaxTokens) => acp::schema::StopReason::MaxTokens,
                Some(StopHint::MaxTurnRequests) => acp::schema::StopReason::MaxTurnRequests,
                None => acp::schema::StopReason::EndTurn,
            }
        };

        // Only completed turns contribute to the auto-generated title.
        if !cancelled {
            self.maybe_generate_session_title(&args.session_id, &text);
        }

        // Clear the per-turn message id now that the turn is over.
        #[cfg(feature = "unstable-message-id")]
        if let Some(entry) = self.sessions.lock().get(&args.session_id) {
            *entry
                .current_message_id
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner) = None;
        }

        #[cfg(feature = "unstable-message-id")]
        let resp = {
            let r = acp::schema::PromptResponse::new(stop_reason);
            if let Some(mid) = turn_message_id.as_ref() {
                r.user_message_id(mid.clone())
            } else {
                r
            }
        };
        #[cfg(not(feature = "unstable-message-id"))]
        let resp = acp::schema::PromptResponse::new(stop_reason);
        Ok(resp)
    }
1306
1307 #[allow(clippy::unused_async)]
1308 #[tracing::instrument(skip_all, name = "acp.handler.cancel", fields(session_id = %args.session_id))]
1309 pub(crate) async fn do_cancel(&self, args: acp::schema::CancelNotification) -> acp::Result<()> {
1310 tracing::debug!(session_id = %args.session_id, "ACP cancel");
1311 if let Some(entry) = self.sessions.lock().get(&args.session_id) {
1312 entry.cancel_signal.notify_one();
1313 }
1314 Ok(())
1315 }
1316
1317 #[cfg(feature = "unstable-session-close")]
1318 #[allow(clippy::unused_async, dead_code)]
1319 #[tracing::instrument(skip_all, name = "acp.handler.close_session", fields(session_id = %args.session_id))]
1320 pub(crate) async fn do_close_session(
1321 &self,
1322 args: acp::schema::CloseSessionRequest,
1323 ) -> acp::Result<acp::schema::CloseSessionResponse> {
1324 tracing::debug!(session_id = %args.session_id, "ACP session closed");
1325 if let Some(entry) = self.sessions.lock().remove(&args.session_id) {
1326 entry.cancel_signal.notify_one();
1327 }
1328 Ok(acp::schema::CloseSessionResponse::default())
1329 }
1330
    /// Rehydrates a persisted session: verifies it exists in the store,
    /// rebuilds the in-memory session entry and agent loop, then replays the
    /// saved events to the client.
    #[tracing::instrument(skip_all, name = "acp.handler.load_session", fields(session_id = %args.session_id))]
    pub(crate) async fn do_load_session(
        &self,
        args: acp::schema::LoadSessionRequest,
        cx: &acp::ConnectionTo<acp::Client>,
    ) -> acp::Result<acp::schema::LoadSessionResponse> {
        #[cfg(feature = "unstable-session-add-dirs")]
        self.validate_additional_directories(&args.additional_directories)?;
        // Already live in memory: nothing to rebuild.
        if self.sessions.lock().contains_key(&args.session_id) {
            return Ok(acp::schema::LoadSessionResponse::new());
        }

        // Without a persistence store there is nothing to load from.
        let Some(ref store) = self.store else {
            return Err(acp::Error::internal_error().data("session not found"));
        };

        let exists = store
            .acp_session_exists(&args.session_id.to_string())
            .await
            .map_err(|e| {
                tracing::warn!(error = %e, session_id = %args.session_id, "failed to check ACP session existence");
                acp::Error::internal_error().data("internal error")
            })?;

        if !exists {
            return Err(acp::Error::internal_error().data("session not found"));
        }

        // Load the saved events up front; they are replayed only after the
        // session is fully wired up below.
        let events = store
            .load_acp_events(&args.session_id.to_string())
            .await
            .map_err(|e| {
                tracing::warn!(error = %e, session_id = %args.session_id, "failed to load ACP session events");
                acp::Error::internal_error().data("internal error")
            })?;

        let session_cwd = args.cwd.clone();
        let conversation_id = resolve_conversation_id(store, &args.session_id).await;

        // Loopback channel pair connecting this handler to the agent loop,
        // plus the per-session provider override slot shared with the ACP
        // context.
        let (channel, handle) = LoopbackChannel::pair(LOOPBACK_CHANNEL_CAPACITY);
        let cancel_signal = Arc::clone(&handle.cancel_signal);
        let provider_override: Arc<RwLock<Option<AnyProvider>>> = Arc::new(RwLock::new(None));
        let provider_override_for_ctx = Arc::clone(&provider_override);
        let acp_ctx = self.build_acp_context(
            &args.session_id,
            cx,
            cancel_signal,
            provider_override_for_ctx,
            session_cwd.clone(),
        );
        let shell_executor = acp_ctx.shell_executor.clone();
        let initial_model = self.initial_model();
        let entry = Self::make_session_entry(
            handle,
            initial_model,
            session_cwd.clone(),
            shell_executor,
            provider_override,
        );

        // Drainer task: forwards session notifications to the client and
        // acks each one so the producer can apply backpressure.
        let mut notify_rx = entry
            .notify_rx
            .lock()
            .take()
            .expect("notify_rx consumed once");
        let cx_drain = cx.clone();
        cx.spawn(async move {
            while let Some((notif, ack)) = notify_rx.recv().await {
                let _enter = tracing::info_span!("acp.session.notify").entered();
                if cx_drain.send_notification(notif).is_err() {
                    tracing::warn!("session_notification send failed; drainer exiting");
                    break;
                }
                ack.send(()).ok();
            }
            Ok(())
        })?;

        self.sessions.lock().insert(args.session_id.clone(), entry);

        let session_ctx = SessionContext {
            session_id: args.session_id.clone(),
            conversation_id,
            working_dir: session_cwd,
        };

        // spawn_local keeps the agent-loop future on the current thread (it
        // is not required to be Send); it runs for the life of the session.
        let spawner = Arc::clone(&self.spawner);
        let span = tracing::info_span!("acp.session.agent_loop", session_id = %args.session_id);
        tokio::task::spawn_local(
            async move {
                (spawner)(channel, Some(acp_ctx), session_ctx).await;
            }
            .instrument(span),
        );

        // Replay history only after the session entry and drainer exist.
        self.replay_session_events(&args.session_id, events).await;

        let default_mode_id = acp::schema::SessionModeId::new(DEFAULT_MODE_ID);
        let load_resp =
            acp::schema::LoadSessionResponse::new().modes(build_mode_state(&default_mode_id));

        self.send_commands_update_nowait(&args.session_id);

        Ok(load_resp)
    }
1437
1438 #[tracing::instrument(skip_all, name = "acp.handler.list_sessions")]
1439 pub(crate) async fn do_list_sessions(
1440 &self,
1441 args: acp::schema::ListSessionsRequest,
1442 ) -> acp::Result<acp::schema::ListSessionsResponse> {
1443 let mut result: std::collections::HashMap<String, acp::schema::SessionInfo> = {
1444 let sessions = self.sessions.lock();
1445 sessions
1446 .iter()
1447 .filter_map(|(session_id, entry)| {
1448 let working_dir = entry.working_dir.lock().clone().unwrap_or_default();
1449 if let Some(ref filter) = args.cwd
1450 && &working_dir != filter
1451 {
1452 return None;
1453 }
1454 let meta = model_meta(&entry.current_model.lock());
1455 let mut info = acp::schema::SessionInfo::new(session_id.clone(), working_dir)
1456 .updated_at(entry.created_at.to_rfc3339())
1457 .meta(meta);
1458 if let Some(ref t) = *entry.title.lock() {
1459 info = info.title(t.clone());
1460 }
1461 Some((session_id.to_string(), info))
1462 })
1463 .collect()
1464 };
1465
1466 if let Some(ref store) = self.store {
1467 match store.list_acp_sessions(self.max_history).await {
1468 Ok(persisted) => {
1469 for persisted_info in persisted {
1470 let sid = acp::schema::SessionId::new(&*persisted_info.id);
1471 if result.contains_key(&persisted_info.id) {
1472 continue;
1473 }
1474 let info = acp::schema::SessionInfo::new(sid, std::path::PathBuf::new())
1475 .title(persisted_info.title)
1476 .updated_at(persisted_info.updated_at);
1477 result.insert(persisted_info.id, info);
1478 }
1479 }
1480 Err(e) => {
1481 tracing::warn!(error = %e, "failed to list persisted ACP sessions");
1482 }
1483 }
1484 }
1485
1486 let mut sessions_vec: Vec<acp::schema::SessionInfo> = result.into_values().collect();
1487 sessions_vec.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
1488
1489 Ok(acp::schema::ListSessionsResponse::new(sessions_vec))
1490 }
1491
    /// Forks an existing session (in-memory or persisted) into a brand-new
    /// session id with its own agent loop, copying the conversation history
    /// via `fork_conversation`.
    #[cfg(feature = "unstable-session-fork")]
    #[allow(dead_code, clippy::too_many_lines)]
    #[tracing::instrument(skip_all, name = "acp.handler.fork_session")]
    pub(crate) async fn do_fork_session(
        &self,
        args: acp::schema::ForkSessionRequest,
        cx: &acp::ConnectionTo<acp::Client>,
    ) -> acp::Result<acp::schema::ForkSessionResponse> {
        #[cfg(feature = "unstable-session-add-dirs")]
        self.validate_additional_directories(&args.additional_directories)?;
        let in_memory = self.sessions.lock().contains_key(&args.session_id);

        // A session not in memory must at least exist in the store to be
        // forkable.
        if !in_memory {
            match self.store.as_ref() {
                None => return Err(acp::Error::internal_error().data("session not found")),
                Some(s) => {
                    let exists = s
                        .acp_session_exists(&args.session_id.to_string())
                        .await
                        .map_err(|e| {
                            tracing::warn!(error = %e, "failed to check ACP session existence");
                            acp::Error::internal_error().data("internal error")
                        })?;
                    if !exists {
                        return Err(acp::Error::internal_error().data("session not found"));
                    }
                }
            }
        }

        // At capacity: evict the least-recently-active idle session (idle ==
        // output_rx present, i.e. no prompt in flight). NOTE(review): the
        // lock is released between the length check and the removal, so this
        // eviction is best-effort under concurrent session creation.
        if self.sessions.lock().len() >= self.max_sessions {
            let evict_id = {
                let sessions = self.sessions.lock();
                sessions
                    .iter()
                    .filter(|(_, e)| e.output_rx.lock().is_some())
                    .min_by_key(|(_, e)| e.last_active_ms.load(Ordering::Relaxed))
                    .map(|(id, _)| id.clone())
            };
            match evict_id {
                Some(id) => {
                    if let Some(entry) = self.sessions.lock().remove(&id) {
                        entry.cancel_signal.notify_one();
                        tracing::debug!(session_id = %id, "evicted idle ACP session (LRU)");
                    }
                }
                None => {
                    return Err(acp::Error::internal_error().data("session limit reached"));
                }
            }
        }

        let new_id = acp::schema::SessionId::new(uuid::Uuid::new_v4().to_string());
        tracing::debug!(source = %args.session_id, new = %new_id, "forking ACP session");

        let new_conversation_id = self.fork_conversation(&args.session_id, &new_id).await?;

        // Wire up the loopback channel, provider-override slot, and ACP
        // context for the new session.
        let (channel, handle) = LoopbackChannel::pair(LOOPBACK_CHANNEL_CAPACITY);
        let cancel_signal = Arc::clone(&handle.cancel_signal);
        let provider_override: Arc<RwLock<Option<AnyProvider>>> = Arc::new(RwLock::new(None));
        let provider_override_for_ctx = Arc::clone(&provider_override);
        let acp_ctx = self.build_acp_context(
            &new_id,
            cx,
            cancel_signal,
            provider_override_for_ctx,
            args.cwd.clone(),
        );
        let shell_executor = acp_ctx.shell_executor.clone();
        let initial_model = self.initial_model();
        let entry = Self::make_session_entry(
            handle,
            initial_model.clone(),
            args.cwd.clone(),
            shell_executor,
            provider_override,
        );

        // Drainer task: forwards notifications to the client, acking each so
        // the producer can apply backpressure.
        let mut notify_rx = entry
            .notify_rx
            .lock()
            .take()
            .expect("notify_rx consumed once");
        let cx_drain = cx.clone();
        cx.spawn(async move {
            while let Some((notif, ack)) = notify_rx.recv().await {
                if cx_drain.send_notification(notif).is_err() {
                    break;
                }
                ack.send(()).ok();
            }
            Ok(())
        })?;

        self.sessions.lock().insert(new_id.clone(), entry);

        let session_ctx = SessionContext {
            session_id: new_id.clone(),
            conversation_id: new_conversation_id,
            working_dir: args.cwd.clone(),
        };

        // spawn_local keeps the agent-loop future on the current thread.
        let spawner = Arc::clone(&self.spawner);
        let span = tracing::info_span!("acp.session.agent_loop", session_id = %new_id);
        tokio::task::spawn_local(
            async move {
                (spawner)(channel, Some(acp_ctx), session_ctx).await;
            }
            .instrument(span),
        );

        // The fork starts from defaults: thinking off, auto-approve
        // "suggest", default mode.
        let available_models = self.available_models_snapshot();
        let config_options =
            build_config_options(&available_models, &initial_model, false, "suggest");
        let default_mode_id = acp::schema::SessionModeId::new(DEFAULT_MODE_ID);
        let mut resp =
            acp::schema::ForkSessionResponse::new(new_id).modes(build_mode_state(&default_mode_id));
        if !config_options.is_empty() {
            resp = resp.config_options(config_options);
        }
        Ok(resp)
    }
1614
    /// Resumes a persisted session in place (same session id), starting a
    /// fresh agent loop without replaying history to the client.
    #[cfg(feature = "unstable-session-resume")]
    #[allow(dead_code)]
    #[tracing::instrument(skip_all, name = "acp.handler.resume_session")]
    pub(crate) async fn do_resume_session(
        &self,
        args: acp::schema::ResumeSessionRequest,
        cx: &acp::ConnectionTo<acp::Client>,
    ) -> acp::Result<acp::schema::ResumeSessionResponse> {
        #[cfg(feature = "unstable-session-add-dirs")]
        self.validate_additional_directories(&args.additional_directories)?;
        // Already live: resuming is a no-op.
        if self.sessions.lock().contains_key(&args.session_id) {
            return Ok(acp::schema::ResumeSessionResponse::new());
        }

        let Some(ref store) = self.store else {
            return Err(acp::Error::internal_error().data("session not found"));
        };

        let exists = store
            .acp_session_exists(&args.session_id.to_string())
            .await
            .map_err(|e| {
                tracing::warn!(error = %e, session_id = %args.session_id, "failed to check ACP session existence");
                acp::Error::internal_error().data("internal error")
            })?;

        if !exists {
            return Err(acp::Error::internal_error().data("session not found"));
        }

        // At capacity: evict the least-recently-active idle session, never
        // the one being resumed. NOTE(review): the lock is released between
        // the length check and the removal, so eviction is best-effort under
        // concurrent session creation.
        if self.sessions.lock().len() >= self.max_sessions {
            let evict_id = {
                let sessions = self.sessions.lock();
                sessions
                    .iter()
                    .filter(|(id, e)| *id != &args.session_id && e.output_rx.lock().is_some())
                    .min_by_key(|(_, e)| e.last_active_ms.load(Ordering::Relaxed))
                    .map(|(id, _)| id.clone())
            };
            match evict_id {
                Some(id) => {
                    if let Some(entry) = self.sessions.lock().remove(&id) {
                        entry.cancel_signal.notify_one();
                        tracing::debug!(session_id = %id, "evicted idle ACP session (LRU)");
                    }
                }
                None => {
                    return Err(acp::Error::internal_error().data("session limit reached"));
                }
            }
        }

        let conversation_id = resolve_conversation_id(store, &args.session_id).await;

        // Wire up the loopback channel, provider-override slot, and ACP
        // context, same as session creation.
        let (channel, handle) = LoopbackChannel::pair(LOOPBACK_CHANNEL_CAPACITY);
        let cancel_signal = Arc::clone(&handle.cancel_signal);
        let provider_override: Arc<RwLock<Option<AnyProvider>>> = Arc::new(RwLock::new(None));
        let provider_override_for_ctx = Arc::clone(&provider_override);
        let acp_ctx = self.build_acp_context(
            &args.session_id,
            cx,
            cancel_signal,
            provider_override_for_ctx,
            args.cwd.clone(),
        );
        let shell_executor = acp_ctx.shell_executor.clone();
        let initial_model = self.initial_model();
        let entry = Self::make_session_entry(
            handle,
            initial_model,
            args.cwd.clone(),
            shell_executor,
            provider_override,
        );

        // Drainer task: forwards notifications to the client, acking each so
        // the producer can apply backpressure.
        let mut notify_rx = entry
            .notify_rx
            .lock()
            .take()
            .expect("notify_rx consumed once");
        let cx_drain = cx.clone();
        cx.spawn(async move {
            while let Some((notif, ack)) = notify_rx.recv().await {
                if cx_drain.send_notification(notif).is_err() {
                    break;
                }
                ack.send(()).ok();
            }
            Ok(())
        })?;

        self.sessions.lock().insert(args.session_id.clone(), entry);

        let session_ctx = SessionContext {
            session_id: args.session_id.clone(),
            conversation_id,
            working_dir: args.cwd,
        };

        // spawn_local keeps the agent-loop future on the current thread.
        let spawner = Arc::clone(&self.spawner);
        let span = tracing::info_span!("acp.session.agent_loop", session_id = %args.session_id);
        tokio::task::spawn_local(
            async move {
                (spawner)(channel, Some(acp_ctx), session_ctx).await;
            }
            .instrument(span),
        );

        Ok(acp::schema::ResumeSessionResponse::new())
    }
1725
1726 #[allow(clippy::unused_async)]
1727 #[tracing::instrument(skip_all, name = "acp.handler.set_session_config_option")]
1728 pub(crate) async fn do_set_session_config_option(
1729 &self,
1730 args: acp::schema::SetSessionConfigOptionRequest,
1731 ) -> acp::Result<acp::schema::SetSessionConfigOptionResponse> {
1732 let config_id = args.config_id.0.clone();
1733 #[cfg(not(feature = "unstable-boolean-config"))]
1734 let value_str: std::sync::Arc<str> = args.value.0.clone();
1735 #[cfg(feature = "unstable-boolean-config")]
1736 let value_str: std::sync::Arc<str> = match &args.value {
1737 acp::schema::SessionConfigOptionValue::ValueId { value } => value.0.clone(),
1738 acp::schema::SessionConfigOptionValue::Boolean { value } => {
1739 if *value { "true" } else { "false" }.into()
1740 }
1741 _ => "".into(),
1742 };
1743 let value: &str = &value_str;
1744
1745 let (current_model, thinking, auto_approve) = {
1746 let sessions = self.sessions.lock();
1747 let entry = sessions
1748 .get(&args.session_id)
1749 .ok_or_else(|| acp::Error::invalid_request().data("session not found"))?;
1750
1751 self.apply_session_config(entry, config_id.as_ref(), value, &args.session_id)?;
1752
1753 (
1754 entry.current_model.lock().clone(),
1755 entry.thinking_enabled.load(Ordering::Relaxed),
1756 entry.auto_approve_level.lock().clone(),
1757 )
1758 };
1759
1760 let config_options = build_config_options(
1761 &self.available_models_snapshot(),
1762 ¤t_model,
1763 thinking,
1764 &auto_approve,
1765 );
1766
1767 let changed_option = config_options.iter().find(|o| o.id.0 == config_id).cloned();
1768
1769 if let Some(option) = changed_option {
1770 let update = acp::schema::SessionUpdate::ConfigOptionUpdate(
1771 acp::schema::ConfigOptionUpdate::new(vec![option]),
1772 );
1773 self.send_notification_nowait(
1774 &args.session_id,
1775 acp::schema::SessionNotification::new(args.session_id.clone(), update),
1776 );
1777
1778 if config_id.as_ref() == "model" {
1779 let info_update = acp::schema::SessionUpdate::SessionInfoUpdate(
1780 acp::schema::SessionInfoUpdate::new().meta(model_meta(¤t_model)),
1781 );
1782 self.send_notification_nowait(
1783 &args.session_id,
1784 acp::schema::SessionNotification::new(args.session_id.clone(), info_update),
1785 );
1786 }
1787 }
1788
1789 Ok(acp::schema::SetSessionConfigOptionResponse::new(
1790 config_options,
1791 ))
1792 }
1793
1794 #[tracing::instrument(skip_all, name = "acp.handler.set_session_mode")]
1795 pub(crate) async fn do_set_session_mode(
1796 &self,
1797 args: acp::schema::SetSessionModeRequest,
1798 ) -> acp::Result<acp::schema::SetSessionModeResponse> {
1799 let valid_ids: &[&str] = &["code", "architect", "ask"];
1800 let mode_str = args.mode_id.0.as_ref();
1801 if !valid_ids.contains(&mode_str) {
1802 return Err(acp::Error::invalid_request().data(format!("unknown mode: {mode_str}")));
1803 }
1804
1805 {
1806 let sessions = self.sessions.lock();
1807 let entry = sessions
1808 .get(&args.session_id)
1809 .ok_or_else(|| acp::Error::invalid_request().data("session not found"))?;
1810 *entry.current_mode.lock() = args.mode_id.clone();
1811 }
1812
1813 tracing::debug!(session_id = %args.session_id, mode = %mode_str, "ACP session mode switched");
1814
1815 let update = acp::schema::SessionUpdate::CurrentModeUpdate(
1816 acp::schema::CurrentModeUpdate::new(args.mode_id.clone()),
1817 );
1818 let notification = acp::schema::SessionNotification::new(args.session_id.clone(), update);
1819 if let Err(e) = self.send_notification(&args.session_id, notification).await {
1820 tracing::warn!(error = %e, "failed to send current_mode_update");
1821 }
1822
1823 Ok(acp::schema::SetSessionModeResponse::new())
1824 }
1825
1826 #[cfg(feature = "unstable-session-add-dirs")]
1832 fn validate_additional_directories(
1833 &self,
1834 requested: &[std::path::PathBuf],
1835 ) -> acp::Result<Vec<std::path::PathBuf>> {
1836 if requested.is_empty() {
1837 return Ok(Vec::new());
1838 }
1839 if self.additional_directories_allow.is_empty() {
1840 return Err(acp::Error::invalid_params()
1841 .data("additional_directories not permitted: allowlist is empty"));
1842 }
1843 let mut out = Vec::with_capacity(requested.len());
1844 for p in requested {
1845 let canon = std::fs::canonicalize(p).map_err(|e| {
1846 acp::Error::invalid_params()
1847 .data(format!("cannot canonicalize {}: {e}", p.display()))
1848 })?;
1849 let allowed = self
1850 .additional_directories_allow
1851 .iter()
1852 .any(|allow| canon.starts_with(allow));
1853 if !allowed {
1854 return Err(acp::Error::invalid_params().data(format!(
1855 "{} is not in the additional_directories allowlist",
1856 canon.display()
1857 )));
1858 }
1859 out.push(canon);
1860 }
1861 Ok(out)
1862 }
1863
1864 #[cfg(feature = "unstable-session-model")]
1865 #[allow(clippy::unused_async, dead_code)]
1866 #[tracing::instrument(skip_all, name = "acp.handler.set_session_model")]
1867 pub(crate) async fn do_set_session_model(
1868 &self,
1869 args: acp::schema::SetSessionModelRequest,
1870 ) -> acp::Result<acp::schema::SetSessionModelResponse> {
1871 let model_id: &str = &args.model_id.0;
1872
1873 let Some(ref factory) = self.provider_factory else {
1874 return Err(acp::Error::internal_error().data("model switching not configured"));
1875 };
1876
1877 if !self
1878 .available_models_snapshot()
1879 .iter()
1880 .any(|m| m == model_id)
1881 {
1882 return Err(acp::Error::invalid_request().data("model not in allowed list"));
1883 }
1884
1885 let Some(new_provider) = factory(model_id) else {
1886 return Err(acp::Error::invalid_request().data("unknown model"));
1887 };
1888
1889 {
1890 let sessions = self.sessions.lock();
1891 let entry = sessions
1892 .get(&args.session_id)
1893 .ok_or_else(|| acp::Error::internal_error().data("session not found"))?;
1894 *entry.provider_override.write() = Some(new_provider);
1895 model_id.clone_into(&mut *entry.current_model.lock());
1896 }
1897
1898 tracing::debug!(session_id = %args.session_id, model = %model_id, "ACP session model switched via set_session_model");
1899
1900 let info_update = acp::schema::SessionUpdate::SessionInfoUpdate(
1901 acp::schema::SessionInfoUpdate::new().meta(model_meta(model_id)),
1902 );
1903 self.send_notification_nowait(
1904 &args.session_id,
1905 acp::schema::SessionNotification::new(args.session_id.clone(), info_update),
1906 );
1907
1908 Ok(acp::schema::SetSessionModelResponse::new())
1909 }
1910}
1911
1912impl ZephAcpAgentState {
1913 fn apply_session_config(
1914 &self,
1915 entry: &SessionEntry,
1916 config_id: &str,
1917 value: &str,
1918 session_id: &acp::schema::SessionId,
1919 ) -> acp::Result<()> {
1920 match config_id {
1921 "model" => {
1922 let Some(ref factory) = self.provider_factory else {
1923 return Err(acp::Error::internal_error().data("model switching not configured"));
1924 };
1925 let available_models = self.available_models_snapshot();
1926 if !available_models.iter().any(|m| m == value) {
1927 return Err(acp::Error::invalid_request().data("model not in allowed list"));
1928 }
1929 let Some(new_provider) = factory(value) else {
1930 return Err(acp::Error::invalid_request().data("unknown model"));
1931 };
1932 *entry.provider_override.write() = Some(new_provider);
1933 value.clone_into(&mut *entry.current_model.lock());
1934 tracing::debug!(session_id = %session_id, model = %value, "ACP model switched");
1935 }
1936 "thinking" => {
1937 let enabled = match value {
1938 "on" => true,
1939 "off" => false,
1940 _ => {
1941 return Err(
1942 acp::Error::invalid_request().data("thinking value must be on or off")
1943 );
1944 }
1945 };
1946 entry.thinking_enabled.store(enabled, Ordering::Relaxed);
1947 tracing::debug!(session_id = %session_id, thinking = %enabled, "ACP thinking toggled");
1948 }
1949 "auto_approve" => {
1950 if !["suggest", "auto-edit", "full-auto"].contains(&value) {
1951 return Err(acp::Error::invalid_request()
1952 .data("auto_approve must be suggest, auto-edit, or full-auto"));
1953 }
1954 value.clone_into(&mut *entry.auto_approve_level.lock());
1955 tracing::debug!(session_id = %session_id, auto_approve = %value, "ACP auto-approve level changed");
1956 }
1957 _ => {
1958 return Err(acp::Error::invalid_request().data("unknown config_id"));
1959 }
1960 }
1961 Ok(())
1962 }
1963
    /// Dispatches an ACP-native slash command (`/help`, `/model`, `/mode`,
    /// `/clear`, `/review`) without involving the agent loop, then streams
    /// the textual reply back as an agent message chunk.
    async fn handle_slash_command(
        &self,
        session_id: &acp::schema::SessionId,
        text: &str,
    ) -> acp::Result<acp::schema::PromptResponse> {
        // Split into "/cmd" and an optional single argument string.
        let mut parts = text.splitn(2, ' ');
        let cmd = parts.next().unwrap_or("").trim();
        let arg = parts.next().unwrap_or("").trim();

        let reply = match cmd {
            "/help" => "Available commands:\n\
                /help — show this message\n\
                /model <id> — switch the active model\n\
                /mode <code|architect|ask> — switch session mode\n\
                /clear — clear session history\n\
                /compact — summarize and compact context\n\
                /review [path] — review recent changes (read-only)"
                .to_owned(),
            "/model" => self.handle_model_command(session_id, arg)?,
            "/review" => {
                // /review returns its own PromptResponse and sends no chunk.
                return self.handle_review_command(session_id, arg);
            }
            "/mode" => {
                let valid_ids: &[&str] = &["code", "architect", "ask"];
                if !valid_ids.contains(&arg) {
                    return Err(acp::Error::invalid_request().data(format!("unknown mode: {arg}")));
                }
                // Update the stored mode; guard scoped so the lock is
                // released before the await below.
                {
                    let sessions = self.sessions.lock();
                    let entry = sessions
                        .get(session_id)
                        .ok_or_else(|| acp::Error::invalid_request().data("session not found"))?;
                    *entry.current_mode.lock() = acp::schema::SessionModeId::new(arg);
                }
                let update = acp::schema::SessionUpdate::CurrentModeUpdate(
                    acp::schema::CurrentModeUpdate::new(acp::schema::SessionModeId::new(arg)),
                );
                let notification =
                    acp::schema::SessionNotification::new(session_id.clone(), update);
                if let Err(e) = self.send_notification(session_id, notification).await {
                    tracing::warn!(error = %e, "failed to send current_mode_update from /mode");
                }
                format!("Switched to mode: {arg}")
            }
            "/clear" => {
                // Best-effort: drop and recreate the persisted session in
                // the background; failures are only logged.
                if let Some(ref store) = self.store {
                    let sid = session_id.to_string();
                    let store = store.clone();
                    tokio::spawn(async move {
                        if let Err(e) = store.delete_acp_session(&sid).await {
                            tracing::warn!(error = %e, "failed to clear session history");
                        }
                        if let Err(e) = store.create_acp_session(&sid).await {
                            tracing::warn!(error = %e, "failed to recreate session after clear");
                        }
                    });
                }
                // Also tell the agent loop to reset its in-memory context;
                // try_send drops the message if the channel is full.
                let tx = self
                    .sessions
                    .lock()
                    .get(session_id)
                    .map(|e| e.input_tx.clone());
                if let Some(tx) = tx {
                    let _ = tx.try_send(ChannelMessage {
                        text: "/clear".to_owned(),
                        attachments: vec![],
                    });
                }
                "Session history cleared.".to_owned()
            }
            _ => {
                return Err(acp::Error::invalid_request().data(format!("unknown command: {cmd}")));
            }
        };

        // Stream the reply as a normal agent message chunk.
        let update = acp::schema::SessionUpdate::AgentMessageChunk(acp::schema::ContentChunk::new(
            reply.clone().into(),
        ));
        let notification = acp::schema::SessionNotification::new(session_id.clone(), update);
        if let Err(e) = self.send_notification(session_id, notification).await {
            tracing::warn!(error = %e, "failed to send command reply");
        }

        Ok(acp::schema::PromptResponse::new(
            acp::schema::StopReason::EndTurn,
        ))
    }
2053
2054 fn handle_review_command(
2055 &self,
2056 session_id: &acp::schema::SessionId,
2057 arg: &str,
2058 ) -> acp::Result<acp::schema::PromptResponse> {
2059 if !arg.is_empty() {
2061 let valid = arg
2062 .chars()
2063 .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '.' | '/' | ' ' | '-'));
2064 if !valid || arg.len() > 512 {
2065 return Err(acp::Error::invalid_request()
2066 .data("invalid path argument: only alphanumeric, _, ., /, space, - allowed (max 512 chars)"));
2067 }
2068 }
2069 let review_prompt = if arg.is_empty() {
2070 "Review the recent changes in this workspace. Show a plain-text diff summary. \
2071 Use only read_file and list_directory tools. Do not execute any commands or \
2072 write any files."
2073 .to_owned()
2074 } else {
2075 format!(
2076 "Review the following file or path: {arg}. Show a plain-text diff summary. \
2077 Use only read_file and list_directory tools. Do not execute any commands or \
2078 write any files."
2079 )
2080 };
2081
2082 let tx = self
2083 .sessions
2084 .lock()
2085 .get(session_id)
2086 .map(|e| e.input_tx.clone());
2087 let Some(tx) = tx else {
2088 return Err(acp::Error::invalid_request().data("session not found"));
2089 };
2090 if tx
2091 .try_send(ChannelMessage {
2092 text: review_prompt,
2093 attachments: vec![],
2094 })
2095 .is_err()
2096 {
2097 tracing::warn!(%session_id, "failed to forward /review to agent input");
2098 }
2099
2100 Ok(acp::schema::PromptResponse::new(
2101 acp::schema::StopReason::EndTurn,
2102 ))
2103 }
2104
2105 fn resolve_model_fuzzy(&self, query: &str) -> acp::Result<String> {
2106 let available_models = self.available_models_snapshot();
2107 if available_models.iter().any(|m| m == query) {
2108 return Ok(query.to_owned());
2109 }
2110 let tokens: Vec<String> = query
2111 .to_lowercase()
2112 .split_whitespace()
2113 .map(String::from)
2114 .collect();
2115 let candidates: Vec<&String> = available_models
2116 .iter()
2117 .filter(|m| {
2118 let lower = m.to_lowercase();
2119 tokens.iter().all(|t| lower.contains(t.as_str()))
2120 })
2121 .collect();
2122 match candidates.len() {
2123 0 => {
2124 let models = available_models.join(", ");
2125 Err(acp::Error::invalid_request()
2126 .data(format!("no matching model found. Available: {models}")))
2127 }
2128 1 => Ok(candidates[0].clone()),
2129 _ => {
2130 let names: Vec<&str> = candidates.iter().map(|s| s.as_str()).collect();
2131 Err(acp::Error::invalid_request()
2132 .data(format!("ambiguous model, candidates: {}", names.join(", "))))
2133 }
2134 }
2135 }
2136
2137 fn handle_model_command(
2138 &self,
2139 session_id: &acp::schema::SessionId,
2140 arg: &str,
2141 ) -> acp::Result<String> {
2142 let available_models = self.available_models_snapshot();
2143 if arg.is_empty() {
2144 let models = available_models.join(", ");
2145 return Ok(format!("Available models: {models}"));
2146 }
2147 let Some(ref factory) = self.provider_factory else {
2148 return Err(acp::Error::internal_error().data("model switching not configured"));
2149 };
2150 let resolved = self.resolve_model_fuzzy(arg)?;
2151 let Some(new_provider) = factory(&resolved) else {
2152 return Err(acp::Error::invalid_request().data("unknown model"));
2153 };
2154 let sessions = self.sessions.lock();
2155 let entry = sessions
2156 .get(session_id)
2157 .ok_or_else(|| acp::Error::internal_error().data("session not found"))?;
2158 *entry.provider_override.write() = Some(new_provider);
2159 resolved.clone_into(&mut *entry.current_model.lock());
2160 Ok(format!("Switched to model: {resolved}"))
2161 }
2162
2163 async fn collect_prompt_content(
2168 &self,
2169 blocks: &[acp::schema::ContentBlock],
2170 session_cwd: &std::path::Path,
2171 ) -> acp::Result<(String, Vec<zeph_core::channel::Attachment>)> {
2172 let mut text = String::new();
2173 let mut attachments = Vec::new();
2174 for block in blocks {
2175 match block {
2176 acp::schema::ContentBlock::Text(t) => {
2177 if !text.is_empty() {
2178 text.push('\n');
2179 }
2180 text.push_str(&t.text);
2181 }
2182 acp::schema::ContentBlock::Image(img) => {
2183 if !SUPPORTED_IMAGE_MIMES.contains(&img.mime_type.as_str()) {
2184 tracing::debug!(mime_type = %img.mime_type, "unsupported image MIME type in ACP prompt, skipping");
2185 } else if img.data.len() > MAX_IMAGE_BASE64_BYTES {
2186 tracing::warn!(
2187 size = img.data.len(),
2188 max = MAX_IMAGE_BASE64_BYTES,
2189 "image base64 data exceeds size limit, skipping"
2190 );
2191 } else {
2192 use base64::Engine as _;
2193 match base64::engine::general_purpose::STANDARD.decode(&img.data) {
2194 Ok(bytes) => {
2195 attachments.push(zeph_core::channel::Attachment {
2196 kind: zeph_core::channel::AttachmentKind::Image,
2197 data: bytes,
2198 filename: Some(format!(
2199 "image.{}",
2200 mime_to_ext(&img.mime_type)
2201 )),
2202 });
2203 }
2204 Err(e) => {
2205 tracing::debug!(error = %e, "failed to decode image base64, skipping");
2206 }
2207 }
2208 }
2209 }
2210 acp::schema::ContentBlock::Resource(embedded) => {
2211 if let acp::schema::EmbeddedResourceResource::TextResourceContents(res) =
2212 &embedded.resource
2213 {
2214 if !text.is_empty() {
2215 text.push('\n');
2216 }
2217 if res
2218 .mime_type
2219 .as_deref()
2220 .is_some_and(|m| m == DIAGNOSTICS_MIME_TYPE)
2221 {
2222 format_diagnostics_block(&res.text, &mut text);
2223 } else if res.mime_type.is_some()
2224 && res.mime_type.as_deref() != Some("text/plain")
2225 {
2226 tracing::debug!(mime_type = ?res.mime_type, uri = %res.uri, "unknown resource mime type — skipping");
2227 } else {
2228 text.push_str("<resource name=\"");
2229 text.push_str(&res.uri.replace('"', """));
2230 text.push_str("\">");
2231 text.push_str(&res.text);
2232 text.push_str("</resource>");
2233 }
2234 }
2235 }
2236 acp::schema::ContentBlock::Audio(_) => {
2237 tracing::warn!("unsupported content block: Audio — skipping");
2238 }
2239 acp::schema::ContentBlock::ResourceLink(link) => {
2240 match resolve_resource_link(link, session_cwd).await {
2241 Ok(content) => {
2242 let escaped_uri = xml_escape(&link.uri);
2244 let escaped_content = xml_escape(&content);
2245 if !text.is_empty() {
2246 text.push('\n');
2247 }
2248 text.push_str("<resource uri=\"");
2249 text.push_str(&escaped_uri);
2250 text.push_str("\">");
2251 text.push_str(&escaped_content);
2252 text.push_str("</resource>");
2253 }
2254 Err(e) => {
2255 tracing::warn!(uri = %link.uri, error = %e, "ResourceLink resolution failed — skipping");
2256 }
2257 }
2258 }
2259 &_ => {
2260 tracing::warn!("unsupported content block: unknown — skipping");
2261 }
2262 }
2263 }
2264 if text.len() > MAX_PROMPT_BYTES {
2265 return Err(acp::Error::invalid_request().data("prompt too large"));
2266 }
2267 Ok((text, attachments))
2268 }
2269
    /// Pumps [`LoopbackEvent`]s from the agent loop and forwards them to the
    /// client as session notifications, until a `Flush` event, channel close,
    /// or cancellation.
    ///
    /// Returns `(cancelled, stop_hint, rx)` so the caller can see why the
    /// turn ended and reuse the receiver for the next turn.
    async fn drain_agent_events(
        &self,
        session_id: &acp::schema::SessionId,
        output_rx: tokio::sync::mpsc::Receiver<LoopbackEvent>,
        cancel_signal: Option<std::sync::Arc<tokio::sync::Notify>>,
    ) -> (
        bool,
        Option<StopHint>,
        tokio::sync::mpsc::Receiver<LoopbackEvent>,
    ) {
        let mut rx = output_rx;
        let mut cancelled = false;
        let mut stop_hint: Option<StopHint> = None;
        // Snapshot the current turn's message id once, up front; it is applied
        // to every chunk update emitted below.
        #[cfg(feature = "unstable-message-id")]
        let turn_mid: Option<String> = self
            .sessions
            .lock()
            .get(session_id)
            .and_then(|e| e.current_message_id.lock().ok().and_then(|g| g.clone()));
        loop {
            // `biased` so a pending cancellation wins over queued events.
            let event = if let Some(ref signal) = cancel_signal {
                tokio::select! {
                    biased;
                    () = signal.notified() => { cancelled = true; break; }
                    ev = rx.recv() => ev,
                }
            } else {
                rx.recv().await
            };
            // Channel closed: the agent loop is gone.
            let Some(event) = event else { break };
            // Stop hints are recorded but never forwarded; keep draining.
            if let LoopbackEvent::Stop(hint) = event {
                stop_hint = Some(hint);
                continue;
            }
            let is_flush = matches!(event, LoopbackEvent::Flush);
            // A terminal id attached to a ToolOutput is released only after the
            // corresponding updates have been sent to the client.
            let pending_terminal_release = if let LoopbackEvent::ToolOutput(ref data) = event {
                data.terminal_id.clone()
            } else {
                None
            };
            for update in loopback_event_to_updates(event) {
                // Best-effort persistence on a detached task; failures are
                // logged, never fatal to the stream.
                if let Some(ref store) = self.store {
                    let sid = session_id.to_string();
                    let (event_type, payload) = session_update_to_event(&update);
                    let store = store.clone();
                    tokio::spawn(async move {
                        if let Err(e) = store.save_acp_event(&sid, event_type, &payload).await {
                            tracing::warn!(error = %e, "failed to persist session event");
                        }
                    });
                }
                #[cfg(feature = "unstable-message-id")]
                let update = apply_message_id_to_chunk(update, turn_mid.as_deref());
                // Rebind unchanged so both cfg branches produce `update`.
                #[cfg(not(feature = "unstable-message-id"))]
                let update = update;
                let notification =
                    acp::schema::SessionNotification::new(session_id.clone(), update);
                if let Err(e) = self.send_notification(session_id, notification).await {
                    tracing::warn!(error = %e, "failed to send notification");
                    break;
                }
            }
            if let Some(terminal_id) = pending_terminal_release {
                let executor = self
                    .sessions
                    .lock()
                    .get(session_id)
                    .and_then(|e| e.shell_executor.clone());
                if let Some(executor) = executor {
                    executor.release_terminal(terminal_id);
                }
            }
            // Flush marks the end of the turn.
            if is_flush {
                break;
            }
        }
        (cancelled, stop_hint, rx)
    }
2353
    #[allow(dead_code)]
    /// Forks the persisted history of `source_id` into the new session
    /// `new_id`.
    ///
    /// Copies the ACP event log, and when a fresh conversation can be created,
    /// links the new session to it and copies the source conversation's
    /// contents. Persistence failures degrade gracefully (logged; the fork
    /// still succeeds, possibly without history). Returns the forked
    /// conversation id, or `None` when no store is configured or conversation
    /// creation failed.
    ///
    /// # Errors
    /// Returns `internal_error` only when loading the source session's events
    /// fails.
    async fn fork_conversation(
        &self,
        source_id: &acp::schema::SessionId,
        new_id: &acp::schema::SessionId,
    ) -> acp::Result<Option<ConversationId>> {
        // Without a store there is nothing to fork.
        let Some(s) = &self.store else {
            return Ok(None);
        };
        let source_events = s
            .load_acp_events(&source_id.to_string())
            .await
            .map_err(|e| {
                tracing::warn!(error = %e, "failed to load ACP session events for fork");
                acp::Error::internal_error().data("internal error")
            })?;

        let new_id_str = new_id.to_string();
        // Borrowed (event_type, payload) pairs in the shape import_acp_events expects.
        let pairs: Vec<(&str, &str)> = source_events
            .iter()
            .map(|ev| (ev.event_type.as_str(), ev.payload.as_str()))
            .collect();

        match s.create_conversation().await {
            Ok(forked_cid) => {
                // The source session's conversation id (if any) — used to copy
                // conversation contents into the fork below.
                let forked_from_cid = s
                    .get_acp_session_conversation_id(&source_id.to_string())
                    .await
                    .unwrap_or(None);
                if let Err(e) = s
                    .create_acp_session_with_conversation(&new_id_str, forked_cid)
                    .await
                {
                    tracing::warn!(error = %e, "failed to persist forked ACP session mapping");
                }
                if let Err(e) = s.import_acp_events(&new_id_str, &pairs).await {
                    tracing::warn!(error = %e, "failed to import events for forked session");
                }
                if let Some(src_cid) = forked_from_cid
                    && let Err(e) = s.copy_conversation(src_cid, forked_cid).await
                {
                    tracing::warn!(error = %e, "failed to copy conversation for forked session");
                }
                Ok(Some(forked_cid))
            }
            Err(e) => {
                // Fallback: still register the forked session and copy its
                // event log, just without a backing conversation.
                tracing::warn!(error = %e, "failed to create conversation for forked session; history will not be copied");
                if let Err(e2) = s.create_acp_session(&new_id_str).await {
                    tracing::warn!(error = %e2, "failed to persist forked ACP session");
                }
                if let Err(e2) = s.import_acp_events(&new_id_str, &pairs).await {
                    tracing::warn!(error = %e2, "failed to import events for forked session");
                }
                Ok(None)
            }
        }
    }
2416
2417 fn maybe_generate_session_title(&self, session_id: &acp::schema::SessionId, user_text: &str) {
2419 let (should_generate, current_model, notify_tx) = {
2420 let sessions = self.sessions.lock();
2421 let Some(entry) = sessions.get(session_id) else {
2422 return;
2423 };
2424 let already_done = entry.first_prompt_done.load(Ordering::Relaxed);
2425 if already_done {
2426 return;
2427 }
2428 entry.first_prompt_done.store(true, Ordering::Relaxed);
2429 let model = entry.current_model.lock().clone();
2430 let tx = entry.notify_tx.clone();
2431 (true, model, tx)
2432 };
2433 if !should_generate {
2434 return;
2435 }
2436 if let Some(ref factory) = self.provider_factory
2437 && !current_model.is_empty()
2438 && let Some(provider) = factory(¤t_model)
2439 {
2440 let user_text = user_text.to_owned();
2441 let sid = session_id.clone();
2442 let store = self.store.clone();
2443 let title_max_chars = self.title_max_chars;
2444 let sessions = Arc::clone(&self.sessions);
2445 tokio::spawn(async move {
2446 let prompt = format!(
2447 "Generate a concise 5-7 word title for a conversation that starts \
2448 with: {user_text}\nRespond with only the title, no quotes."
2449 );
2450 let messages = vec![zeph_llm::provider::Message::from_legacy(
2451 zeph_llm::provider::Role::User,
2452 &prompt,
2453 )];
2454 let sid_str = sid.to_string();
2455 let sid_prefix = &sid_str[..8.min(sid_str.len())];
2456 let fallback_title = format!("Session {sid_prefix}");
2457 let title = match tokio::time::timeout(
2458 std::time::Duration::from_secs(15),
2459 provider.chat(&messages),
2460 )
2461 .await
2462 {
2463 Ok(Ok(t)) => truncate_to_chars(t.trim(), title_max_chars),
2464 Ok(Err(e)) => {
2465 tracing::debug!(error = %e, "title generation LLM call failed");
2466 fallback_title
2467 }
2468 Err(_) => {
2469 tracing::debug!("title generation timed out");
2470 fallback_title
2471 }
2472 };
2473 if let Some(ref store) = store {
2474 let _ = store.update_session_title(&sid.to_string(), &title).await;
2475 }
2476 if let Some(entry) = sessions.lock().get(&sid) {
2477 *entry.title.lock() = Some(title.clone());
2478 }
2479 let update = acp::schema::SessionUpdate::SessionInfoUpdate(
2480 acp::schema::SessionInfoUpdate::new().title(title),
2481 );
2482 let notification = acp::schema::SessionNotification::new(sid, update);
2483 let (tx, _rx) = oneshot::channel();
2484 notify_tx.send((notification, tx)).ok();
2485 });
2486 }
2487 }
2488
2489 fn make_session_entry(
2491 handle: LoopbackHandle,
2492 initial_model: String,
2493 cwd: PathBuf,
2494 shell_executor: Option<AcpShellExecutor>,
2495 provider_override: Arc<RwLock<Option<AnyProvider>>>,
2496 ) -> SessionEntry {
2497 let (notify_tx, notify_rx) = mpsc::unbounded_channel();
2498 let now_ms = u64::try_from(
2499 std::time::SystemTime::now()
2500 .duration_since(std::time::UNIX_EPOCH)
2501 .unwrap_or_default()
2502 .as_millis(),
2503 )
2504 .unwrap_or(u64::MAX);
2505 SessionEntry {
2506 input_tx: handle.input_tx,
2507 output_rx: Mutex::new(Some(handle.output_rx)),
2508 cancel_signal: handle.cancel_signal,
2509 last_active_ms: AtomicU64::new(now_ms),
2510 created_at: chrono::Utc::now(),
2511 working_dir: Mutex::new(Some(cwd)),
2512 notify_tx,
2513 notify_rx: Mutex::new(Some(notify_rx)),
2514 provider_override,
2515 current_model: Mutex::new(initial_model),
2516 current_mode: Mutex::new(acp::schema::SessionModeId::new(DEFAULT_MODE_ID)),
2517 first_prompt_done: AtomicBool::new(false),
2518 title: Mutex::new(None),
2519 thinking_enabled: AtomicBool::new(false),
2520 auto_approve_level: Mutex::new("suggest".to_owned()),
2521 shell_executor,
2522 #[cfg(feature = "unstable-message-id")]
2523 current_message_id: std::sync::Mutex::new(None),
2524 }
2525 }
2526
2527 async fn replay_session_events(
2529 &self,
2530 session_id: &acp::schema::SessionId,
2531 events: Vec<zeph_memory::store::AcpSessionEvent>,
2532 ) {
2533 for ev in events {
2534 let update = match ev.event_type.as_str() {
2535 "user_message" => acp::schema::SessionUpdate::UserMessageChunk(
2536 acp::schema::ContentChunk::new(ev.payload.into()),
2537 ),
2538 "agent_message" => acp::schema::SessionUpdate::AgentMessageChunk(
2539 acp::schema::ContentChunk::new(ev.payload.into()),
2540 ),
2541 "agent_thought" => acp::schema::SessionUpdate::AgentThoughtChunk(
2542 acp::schema::ContentChunk::new(ev.payload.into()),
2543 ),
2544 "tool_call" => match serde_json::from_str::<acp::schema::ToolCall>(&ev.payload) {
2545 Ok(tc) => acp::schema::SessionUpdate::ToolCall(tc),
2546 Err(e) => {
2547 tracing::warn!(error = %e, "failed to deserialize tool call event during replay");
2548 continue;
2549 }
2550 },
2551 other => {
2552 tracing::debug!(
2553 event_type = other,
2554 "skipping unknown event type during replay"
2555 );
2556 continue;
2557 }
2558 };
2559 let notification = acp::schema::SessionNotification::new(session_id.clone(), update);
2560 if let Err(e) = self.send_notification(session_id, notification).await {
2561 tracing::warn!(error = %e, "failed to replay notification");
2562 break;
2563 }
2564 }
2565 }
2566
2567 async fn create_session_conversation(
2569 &self,
2570 session_id: &acp::schema::SessionId,
2571 ) -> Option<ConversationId> {
2572 let store = self.store.as_ref()?;
2573 let sid = session_id.to_string();
2574 match store.create_conversation().await {
2575 Ok(cid) => {
2576 if let Err(e) = store.create_acp_session_with_conversation(&sid, cid).await {
2577 tracing::warn!(error = %e, "failed to persist ACP session mapping; history may not survive restart");
2578 }
2579 Some(cid)
2580 }
2581 Err(e) => {
2582 tracing::warn!(error = %e, "failed to create conversation for ACP session; session will have no persistent history");
2583 if let Err(e2) = store.create_acp_session(&sid).await {
2584 tracing::warn!(error = %e2, "failed to persist ACP session");
2585 }
2586 None
2587 }
2588 }
2589 }
2590
2591 fn send_commands_update_nowait(&self, session_id: &acp::schema::SessionId) {
2593 let cmds_update = acp::schema::SessionUpdate::AvailableCommandsUpdate(
2594 acp::schema::AvailableCommandsUpdate::new(build_available_commands()),
2595 );
2596 self.send_notification_nowait(
2597 session_id,
2598 acp::schema::SessionNotification::new(session_id.clone(), cmds_update),
2599 );
2600 }
2601
2602 async fn ext_method_mcp(
2603 &self,
2604 args: &acp::schema::ExtRequest,
2605 ) -> acp::Result<acp::schema::ExtResponse> {
2606 let method = args.method.as_ref();
2607 match method {
2608 "_agent/mcp/list" => {
2609 let Some(ref manager) = self.mcp_manager else {
2610 return Err(acp::Error::internal_error().data("MCP manager not configured"));
2611 };
2612 let servers = manager.list_servers().await;
2613 let json = serde_json::to_string(&servers).map_err(|e| {
2614 tracing::error!(error = %e, "failed to serialize MCP server list");
2615 acp::Error::internal_error().data("internal error")
2616 })?;
2617 let raw: Box<serde_json::value::RawValue> =
2618 serde_json::value::RawValue::from_string(json).map_err(|e| {
2619 tracing::error!(error = %e, "failed to build MCP list response");
2620 acp::Error::internal_error().data("internal error")
2621 })?;
2622 Ok(acp::schema::ExtResponse::new(raw.into()))
2623 }
2624 "_agent/mcp/add" => {
2625 let Some(ref manager) = self.mcp_manager else {
2626 return Err(acp::Error::internal_error().data("MCP manager not configured"));
2627 };
2628 let entry: ServerEntry = serde_json::from_str(args.params.get())
2629 .map_err(|e| acp::Error::invalid_request().data(e.to_string()))?;
2630 let tools = manager.add_server(&entry).await.map_err(|e| {
2631 tracing::error!(error = %e, "failed to add MCP server");
2632 acp::Error::internal_error().data("internal error")
2633 })?;
2634 let json = serde_json::json!({ "added": entry.id, "tools": tools.len() });
2635 let raw =
2636 serde_json::value::RawValue::from_string(json.to_string()).map_err(|e| {
2637 tracing::error!(error = %e, "failed to build MCP add response");
2638 acp::Error::internal_error().data("internal error")
2639 })?;
2640 Ok(acp::schema::ExtResponse::new(raw.into()))
2641 }
2642 "_agent/mcp/remove" => {
2643 let Some(ref manager) = self.mcp_manager else {
2644 return Err(acp::Error::internal_error().data("MCP manager not configured"));
2645 };
2646 let params: McpRemoveParams = serde_json::from_str(args.params.get())
2647 .map_err(|e| acp::Error::invalid_request().data(e.to_string()))?;
2648 manager.remove_server(¶ms.id).await.map_err(|e| {
2649 tracing::error!(error = %e, "failed to remove MCP server");
2650 acp::Error::internal_error().data("internal error")
2651 })?;
2652 let raw = serde_json::value::RawValue::from_string(
2653 serde_json::json!({ "removed": params.id }).to_string(),
2654 )
2655 .map_err(|e| {
2656 tracing::error!(error = %e, "failed to build MCP remove response");
2657 acp::Error::internal_error().data("internal error")
2658 })?;
2659 Ok(acp::schema::ExtResponse::new(raw.into()))
2660 }
2661 _ => Ok(acp::schema::ExtResponse::new(
2662 serde_json::value::RawValue::NULL.to_owned().into(),
2663 )),
2664 }
2665 }
2666}
2667
2668pub(super) mod helpers;
2669use helpers::{
2670 DEFAULT_MODE_ID, DIAGNOSTICS_MIME_TYPE, build_available_commands, build_config_options,
2671 build_mode_state, format_diagnostics_block, loopback_event_to_updates, mime_to_ext, model_meta,
2672 session_update_to_event, xml_escape,
2673};
2674
2675pub(crate) mod handlers;
2676
/// Wires every ACP request/notification handler up against the shared agent
/// `state` and runs the agent on `transport` until the connection closes.
///
/// Feature-gated handlers (session close/fork/resume, per-session model
/// selection, logout) are registered only when the corresponding `unstable-*`
/// cargo feature is enabled.
#[allow(clippy::too_many_lines)]
pub async fn run_agent(
    state: Arc<ZephAcpAgentState>,
    transport: impl acp::ConnectTo<acp::Agent>,
) -> acp::Result<()> {
    #[cfg(feature = "unstable-session-close")]
    use handlers::close_session;
    #[cfg(feature = "unstable-session-fork")]
    use handlers::fork_session;
    #[cfg(feature = "unstable-logout")]
    use handlers::logout;
    #[cfg(feature = "unstable-session-resume")]
    use handlers::resume_session;
    #[cfg(feature = "unstable-session-model")]
    use handlers::set_session_model;
    use handlers::{
        authenticate, cancel, dispatch, initialize, list_sessions, load_session, new_session,
        prompt, set_session_config_option, set_session_mode,
    };

    // Adapts a request-handler fn to the builder's closure shape, cloning the
    // shared state Arc into each invocation.
    macro_rules! req_handler {
        ($handler:path) => {{
            let s = Arc::clone(&state);
            move |req, responder, cx| {
                let s = Arc::clone(&s);
                async move { $handler(req, responder, cx, s).await }
            }
        }};
    }

    // Same adapter for notification handlers (no responder argument).
    macro_rules! notif_handler {
        ($handler:path) => {{
            let s = Arc::clone(&state);
            move |notif, cx| {
                let s = Arc::clone(&s);
                async move { $handler(notif, cx, s).await }
            }
        }};
    }

    // Always-available handlers.
    let builder = acp::Agent
        .builder()
        .on_receive_request(
            req_handler!(initialize::handle_initialize),
            acp::on_receive_request!(),
        )
        .on_receive_request(
            req_handler!(authenticate::handle_authenticate),
            acp::on_receive_request!(),
        )
        .on_receive_request(
            req_handler!(new_session::handle_new_session),
            acp::on_receive_request!(),
        )
        .on_receive_request(
            req_handler!(prompt::handle_prompt),
            acp::on_receive_request!(),
        )
        .on_receive_request(
            req_handler!(list_sessions::handle_list_sessions),
            acp::on_receive_request!(),
        )
        .on_receive_request(
            req_handler!(load_session::handle_load_session),
            acp::on_receive_request!(),
        )
        .on_receive_request(
            req_handler!(set_session_config_option::handle_set_session_config_option),
            acp::on_receive_request!(),
        )
        .on_receive_request(
            req_handler!(set_session_mode::handle_set_session_mode),
            acp::on_receive_request!(),
        )
        .on_receive_notification(
            notif_handler!(cancel::handle_cancel),
            acp::on_receive_notification!(),
        );

    // Feature-gated handlers, each rebinding `builder`.
    #[cfg(feature = "unstable-session-close")]
    let builder = builder.on_receive_request(
        req_handler!(close_session::handle_close_session),
        acp::on_receive_request!(),
    );
    #[cfg(feature = "unstable-session-fork")]
    let builder = builder.on_receive_request(
        req_handler!(fork_session::handle_fork_session),
        acp::on_receive_request!(),
    );
    #[cfg(feature = "unstable-session-resume")]
    let builder = builder.on_receive_request(
        req_handler!(resume_session::handle_resume_session),
        acp::on_receive_request!(),
    );
    #[cfg(feature = "unstable-session-model")]
    let builder = builder.on_receive_request(
        req_handler!(set_session_model::handle_set_session_model),
        acp::on_receive_request!(),
    );
    #[cfg(feature = "unstable-logout")]
    let builder = builder.on_receive_request(
        req_handler!(logout::handle_logout),
        acp::on_receive_request!(),
    );

    // Catch-all dispatch for anything not matched above, then run the
    // connection to completion.
    builder
        .on_receive_dispatch(
            {
                let s = Arc::clone(&state);
                move |msg, cx| {
                    let s = Arc::clone(&s);
                    async move { dispatch::handle_dispatch(msg, cx, s).await }
                }
            },
            acp::on_receive_dispatch!(),
        )
        .connect_to(transport)
        .await
}
2827
/// Stamps `message_id` onto chunk-style session updates (agent message, user
/// message, agent thought). All other update variants — and a `None` id —
/// pass through untouched.
#[cfg(feature = "unstable-message-id")]
fn apply_message_id_to_chunk(
    update: acp::schema::SessionUpdate,
    message_id: Option<&str>,
) -> acp::schema::SessionUpdate {
    use acp::schema::SessionUpdate as U;
    match (update, message_id) {
        (U::AgentMessageChunk(c), Some(id)) => U::AgentMessageChunk(c.message_id(id.to_owned())),
        (U::UserMessageChunk(c), Some(id)) => U::UserMessageChunk(c.message_id(id.to_owned())),
        (U::AgentThoughtChunk(c), Some(id)) => U::AgentThoughtChunk(c.message_id(id.to_owned())),
        (other, _) => other,
    }
}
2851
// Compile-time check (never executed) that the agent's core types are
// `Send + Sync`, so they can safely cross thread/task boundaries.
const _: () = {
    #[allow(clippy::used_underscore_items)]
    fn assert_send_sync<T: Send + Sync>() {}
    // Referencing these monomorphizations forces the bounds to be verified at
    // compile time; the function itself is never called.
    fn check_send_sync() {
        assert_send_sync::<ZephAcpAgentState>();
        assert_send_sync::<crate::fs::AcpFileExecutor>();
        assert_send_sync::<crate::terminal::AcpShellExecutor>();
        assert_send_sync::<crate::permission::AcpPermissionGate>();
    }
    let _ = check_send_sync;
};
2864
2865#[cfg(any())] mod tests;
2867
#[cfg(all(test, feature = "unstable-message-id"))]
mod message_id_tests {
    use super::*;

    /// Builds an agent-message chunk update carrying `text`.
    fn agent_chunk(text: &str) -> acp::schema::SessionUpdate {
        acp::schema::SessionUpdate::AgentMessageChunk(acp::schema::ContentChunk::new(
            text.to_owned().into(),
        ))
    }

    /// Builds a user-message chunk update carrying `text`.
    fn user_chunk(text: &str) -> acp::schema::SessionUpdate {
        acp::schema::SessionUpdate::UserMessageChunk(acp::schema::ContentChunk::new(
            text.to_owned().into(),
        ))
    }

    #[test]
    fn apply_sets_message_id_on_agent_chunk() {
        match apply_message_id_to_chunk(agent_chunk("hello"), Some("msg-001")) {
            acp::schema::SessionUpdate::AgentMessageChunk(chunk) => {
                assert_eq!(chunk.message_id, Some("msg-001".to_owned()));
            }
            _ => panic!("expected AgentMessageChunk"),
        }
    }

    #[test]
    fn apply_sets_message_id_on_user_chunk() {
        match apply_message_id_to_chunk(user_chunk("hi"), Some("msg-002")) {
            acp::schema::SessionUpdate::UserMessageChunk(chunk) => {
                assert_eq!(chunk.message_id, Some("msg-002".to_owned()));
            }
            _ => panic!("expected UserMessageChunk"),
        }
    }

    #[test]
    fn apply_none_message_id_is_noop() {
        match apply_message_id_to_chunk(agent_chunk("hello"), None) {
            acp::schema::SessionUpdate::AgentMessageChunk(chunk) => {
                assert_eq!(chunk.message_id, None);
            }
            _ => panic!("expected AgentMessageChunk"),
        }
    }
}