use std::cell::RefCell;
use std::path::{Component, PathBuf};
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use agent_client_protocol as acp;
use futures::StreamExt as _;
use tokio::sync::{mpsc, oneshot};
use zeph_core::channel::{ChannelMessage, LoopbackChannel, LoopbackHandle};
use zeph_core::text::truncate_to_chars;
use zeph_core::{LoopbackEvent, StopHint};
use zeph_llm::any::AnyProvider;
use zeph_llm::provider::LlmProvider as _;
use zeph_mcp::McpManager;
use zeph_mcp::manager::ServerEntry;
use zeph_memory::ConversationId;
use zeph_memory::store::SqliteStore;
use zeph_tools::is_private_ip;
use crate::fs::AcpFileExecutor;
use crate::lsp::DiagnosticsCache;
use crate::permission::AcpPermissionGate;
use crate::terminal::AcpShellExecutor;
use crate::transport::{ConnSlot, SharedAvailableModels};
pub type ProviderFactory = Arc<dyn Fn(&str) -> Option<AnyProvider> + Send + Sync>;
/// Per-session data handed to the spawned agent task.
pub struct SessionContext {
    /// ACP session identifier this agent task serves.
    pub session_id: acp::SessionId,
    /// Backing conversation for persistent history; `None` when no store is
    /// available or the conversation could not be created/resolved.
    pub conversation_id: Option<ConversationId>,
    /// Working directory the client associated with this session.
    pub working_dir: PathBuf,
}
/// Maximum accepted prompt payload size in bytes (1 MiB).
const MAX_PROMPT_BYTES: usize = 1_048_576;
/// Maximum accepted base64-encoded image size in a prompt (20 MiB).
const MAX_IMAGE_BASE64_BYTES: usize = 20 * 1_048_576;
/// Image MIME types accepted in prompt content.
const SUPPORTED_IMAGE_MIMES: &[&str] = &[
    "image/jpeg",
    "image/jpg",
    "image/png",
    "image/gif",
    "image/webp",
];
/// Bounded capacity of the per-session loopback channel between the ACP
/// layer and the agent loop.
const LOOPBACK_CHANNEL_CAPACITY: usize = 64;
/// Maximum size of content fetched for a `ResourceLink` (1 MiB).
const MAX_RESOURCE_BYTES: usize = 1_048_576;
/// Timeout applied to each filesystem / network step while resolving a
/// `ResourceLink`.
const RESOURCE_FETCH_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
/// Path components always rejected for `file://` resource links: system
/// pseudo-filesystems and common credential directories.
const BLOCKED_PATH_COMPONENTS: &[&str] = &["proc", "sys", "dev", ".ssh", ".gnupg", ".aws"];
async fn resolve_resource_link(
link: &acp::ResourceLink,
session_cwd: &std::path::Path,
) -> Result<String, crate::error::AcpError> {
let uri = &link.uri;
if let Some(path_str) = uri.strip_prefix("file://") {
let path = std::path::Path::new(path_str);
let meta = tokio::time::timeout(RESOURCE_FETCH_TIMEOUT, tokio::fs::metadata(path))
.await
.map_err(|_| {
crate::error::AcpError::ResourceLink(format!("file:// metadata timed out: {uri}"))
})?
.map_err(|e| {
crate::error::AcpError::ResourceLink(format!("file:// stat failed: {e}"))
})?;
if meta.len() > MAX_RESOURCE_BYTES as u64 {
return Err(crate::error::AcpError::ResourceLink(format!(
"file:// content exceeds size limit ({MAX_RESOURCE_BYTES} bytes): {uri}"
)));
}
let canonical = tokio::fs::canonicalize(path).await.map_err(|e| {
crate::error::AcpError::ResourceLink(format!("file:// resolution failed: {e}"))
})?;
if !canonical.starts_with(session_cwd) {
return Err(crate::error::AcpError::ResourceLink(format!(
"file:// path outside session working directory: {uri}"
)));
}
for component in canonical.components() {
if let Component::Normal(name) = component {
let name_str = name.to_string_lossy();
if BLOCKED_PATH_COMPONENTS
.iter()
.any(|blocked| name_str == *blocked)
{
return Err(crate::error::AcpError::ResourceLink(format!(
"file:// path blocked: {uri}"
)));
}
}
}
let bytes = tokio::time::timeout(RESOURCE_FETCH_TIMEOUT, tokio::fs::read(&canonical))
.await
.map_err(|_| {
crate::error::AcpError::ResourceLink(format!("file:// read timed out: {uri}"))
})?
.map_err(|e| {
crate::error::AcpError::ResourceLink(format!("file:// read failed: {e}"))
})?;
if bytes.contains(&0u8) {
return Err(crate::error::AcpError::ResourceLink(format!(
"binary file not supported as ResourceLink content: {uri}"
)));
}
String::from_utf8(bytes).map_err(|_| {
crate::error::AcpError::ResourceLink(format!(
"file:// content is not valid UTF-8: {uri}"
))
})
} else if uri.starts_with("http://") || uri.starts_with("https://") {
let client = reqwest::Client::builder()
.redirect(reqwest::redirect::Policy::none())
.timeout(RESOURCE_FETCH_TIMEOUT)
.build()
.map_err(|e| crate::error::AcpError::ResourceLink(format!("HTTP client error: {e}")))?;
let resp = client
.get(uri.as_str())
.header(reqwest::header::ACCEPT, "text/*")
.send()
.await
.map_err(|e| crate::error::AcpError::ResourceLink(format!("HTTP fetch failed: {e}")))?;
match resp.remote_addr() {
None => {
return Err(crate::error::AcpError::ResourceLink(format!(
"SSRF check failed: remote address unavailable for {uri}"
)));
}
Some(remote_addr) if is_private_ip(remote_addr.ip()) => {
return Err(crate::error::AcpError::ResourceLink(format!(
"SSRF blocked: {uri} resolved to private address {remote_addr}"
)));
}
Some(_) => {}
}
if !resp.status().is_success() {
return Err(crate::error::AcpError::ResourceLink(format!(
"HTTP fetch returned {}: {uri}",
resp.status()
)));
}
let content_type = resp
.headers()
.get(reqwest::header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.unwrap_or("");
if !content_type.is_empty() && !content_type.starts_with("text/") {
return Err(crate::error::AcpError::ResourceLink(format!(
"non-text MIME type rejected for ResourceLink: {content_type}"
)));
}
let mut body = resp.bytes_stream();
let mut buf = Vec::with_capacity(4096);
while let Some(chunk) = body.next().await {
let chunk = chunk.map_err(|e| {
crate::error::AcpError::ResourceLink(format!("HTTP read error: {e}"))
})?;
if buf.len() + chunk.len() > MAX_RESOURCE_BYTES {
buf.extend_from_slice(&chunk[..MAX_RESOURCE_BYTES.saturating_sub(buf.len())]);
break;
}
buf.extend_from_slice(&chunk);
}
String::from_utf8(buf).map_err(|_| {
crate::error::AcpError::ResourceLink(format!(
"HTTP response body is not valid UTF-8: {uri}"
))
})
} else {
Err(crate::error::AcpError::ResourceLink(format!(
"unsupported URI scheme in ResourceLink: {uri}"
)))
}
}
/// Client-mediated capabilities handed to a spawned agent task.
///
/// Built by `ZephAcpAgent::build_acp_context` when a client connection is
/// available; the whole context is `None` otherwise.
pub struct AcpContext {
    /// File read/write executor backed by the ACP client.
    pub file_executor: Option<AcpFileExecutor>,
    /// Shell/terminal executor backed by the ACP client.
    pub shell_executor: Option<AcpShellExecutor>,
    /// Gate that routes permission requests through the client.
    pub permission_gate: Option<AcpPermissionGate>,
    /// Notified to cancel the in-flight turn for this session.
    pub cancel_signal: std::sync::Arc<tokio::sync::Notify>,
    /// Per-session LLM provider override; `None` means use the default.
    pub provider_override: Arc<std::sync::RwLock<Option<AnyProvider>>>,
    /// Parent tool-use id for nested tool calls, when applicable.
    pub parent_tool_use_id: Option<String>,
    /// LSP bridge to the client; present only when the client advertises
    /// LSP support and LSP is enabled in config.
    pub lsp_provider: Option<crate::lsp::AcpLspProvider>,
    /// Shared cache of per-file LSP diagnostics.
    pub diagnostics_cache: Arc<std::sync::RwLock<DiagnosticsCache>>,
}
/// Factory that launches the per-session agent loop. It receives the
/// session's loopback channel, the optional client-mediated context, and the
/// session metadata, and returns the future to drive to completion.
pub type AgentSpawner = Arc<
    dyn Fn(
            LoopbackChannel,
            Option<AcpContext>,
            SessionContext,
        ) -> Pin<Box<dyn std::future::Future<Output = ()> + 'static>>
        + Send
        + Sync
        + 'static,
>;
/// Alias used by the HTTP transport; same shape as [`AgentSpawner`].
#[cfg(feature = "acp-http")]
pub type SendAgentSpawner = AgentSpawner;
/// Sender for session notifications; each notification carries a oneshot
/// that is resolved once the notification has been delivered (ack).
pub(crate) type NotifySender =
    mpsc::UnboundedSender<(acp::SessionNotification, oneshot::Sender<()>)>;
/// Book-keeping for one live ACP session.
pub(crate) struct SessionEntry {
    /// Sends user messages into the session's agent loop.
    pub(crate) input_tx: mpsc::Sender<ChannelMessage>,
    /// Receiver of agent loop events. Taken (`None`) while a prompt is in
    /// flight and restored afterwards; `Some` therefore means "idle".
    pub(crate) output_rx: RefCell<Option<mpsc::Receiver<LoopbackEvent>>>,
    /// Notified to cancel the session's in-flight turn.
    pub(crate) cancel_signal: std::sync::Arc<tokio::sync::Notify>,
    /// Last activity timestamp; used by the idle reaper and LRU eviction.
    pub(crate) last_active: std::cell::Cell<std::time::Instant>,
    /// Creation time, reported in session listings.
    pub(crate) created_at: chrono::DateTime<chrono::Utc>,
    /// Working directory the client associated with the session.
    pub(crate) working_dir: RefCell<Option<std::path::PathBuf>>,
    // Per-session LLM provider override shared with the agent task.
    provider_override: Arc<std::sync::RwLock<Option<AnyProvider>>>,
    // Currently selected model name.
    current_model: RefCell<String>,
    // Currently selected session mode.
    current_mode: RefCell<acp::SessionModeId>,
    // Whether the first prompt has completed (e.g. gates title generation).
    first_prompt_done: std::cell::Cell<bool>,
    // User-visible session title, once generated.
    title: RefCell<Option<String>>,
    // Whether "thinking" output is enabled for this session.
    thinking_enabled: std::cell::Cell<bool>,
    // Auto-approve level as a string (e.g. "suggest").
    auto_approve_level: RefCell<String>,
    /// Shell executor for this session, when a client connection existed.
    pub(crate) shell_executor: Option<AcpShellExecutor>,
}
/// Map of live sessions, shared single-threaded via `Rc<RefCell<…>>`.
type SessionMap = Rc<RefCell<std::collections::HashMap<acp::SessionId, SessionEntry>>>;
/// ACP agent: implements `acp::Agent` and owns all live session state.
pub struct ZephAcpAgent {
    // Outgoing session notifications (with per-notification ack).
    notify_tx: NotifySender,
    // Factory launching the per-session agent loop.
    spawner: AgentSpawner,
    /// Live sessions keyed by session id.
    pub(crate) sessions: SessionMap,
    // Slot holding the (optional) client connection.
    conn_slot: ConnSlot,
    // Agent implementation name reported in `initialize`.
    agent_name: String,
    // Agent version reported in `initialize`.
    agent_version: String,
    // Hard cap on concurrent sessions; LRU-evicts idle sessions at the cap.
    max_sessions: usize,
    // Idle timeout after which the reaper evicts a session.
    idle_timeout: std::time::Duration,
    /// Optional persistence for session events / conversations.
    pub(crate) store: Option<SqliteStore>,
    // Optional file persisting permission decisions.
    permission_file: Option<std::path::PathBuf>,
    // Capabilities the client reported in `initialize`.
    client_caps: RefCell<acp::ClientCapabilities>,
    // Factory for per-session provider overrides (model switching).
    provider_factory: Option<ProviderFactory>,
    // Shared list of model names available for selection.
    available_models: SharedAvailableModels,
    // MCP server manager, when MCP support is enabled.
    mcp_manager: Option<Arc<McpManager>>,
    // Project rule files advertised in new-session metadata.
    project_rules: Vec<std::path::PathBuf>,
    // Max characters for auto-generated session titles.
    title_max_chars: usize,
    // Max persisted sessions returned by `list_sessions`.
    max_history: usize,
    // LSP bridge configuration.
    lsp_config: zeph_core::config::AcpLspConfig,
    // Shared cache of per-file LSP diagnostics.
    diagnostics_cache: Arc<std::sync::RwLock<DiagnosticsCache>>,
}
impl ZephAcpAgent {
    /// Create an agent with default settings; optional collaborators are
    /// attached via the `with_*` builder methods.
    pub fn new(
        spawner: AgentSpawner,
        notify_tx: NotifySender,
        conn_slot: ConnSlot,
        max_sessions: usize,
        session_idle_timeout_secs: u64,
        permission_file: Option<std::path::PathBuf>,
    ) -> Self {
        let lsp_config = zeph_core::config::AcpLspConfig::default();
        let max_diag_files = lsp_config.max_diagnostic_files;
        Self {
            notify_tx,
            spawner,
            sessions: Rc::new(RefCell::new(std::collections::HashMap::new())),
            conn_slot,
            agent_name: "zeph".to_owned(),
            agent_version: env!("CARGO_PKG_VERSION").to_owned(),
            max_sessions,
            idle_timeout: std::time::Duration::from_secs(session_idle_timeout_secs),
            store: None,
            permission_file,
            client_caps: RefCell::new(acp::ClientCapabilities::default()),
            provider_factory: None,
            available_models: Arc::new(std::sync::RwLock::new(Vec::new())),
            mcp_manager: None,
            project_rules: Vec::new(),
            title_max_chars: 60,
            max_history: 100,
            lsp_config,
            diagnostics_cache: Arc::new(std::sync::RwLock::new(DiagnosticsCache::new(
                max_diag_files,
            ))),
        }
    }
    /// Replace the LSP configuration; re-creates the diagnostics cache so
    /// its file cap matches the new config.
    #[must_use]
    pub fn with_lsp_config(mut self, config: zeph_core::config::AcpLspConfig) -> Self {
        let max_files = config.max_diagnostic_files;
        self.lsp_config = config;
        self.diagnostics_cache = Arc::new(std::sync::RwLock::new(DiagnosticsCache::new(max_files)));
        self
    }
    /// Attach a persistence store for session events and conversations.
    #[must_use]
    pub fn with_store(mut self, store: SqliteStore) -> Self {
        self.store = Some(store);
        self
    }
    /// Override the agent name/version reported during `initialize`.
    #[must_use]
    pub fn with_agent_info(mut self, name: impl Into<String>, version: impl Into<String>) -> Self {
        self.agent_name = name.into();
        self.agent_version = version.into();
        self
    }
    /// Attach a provider factory and the shared list of available models,
    /// enabling per-session model switching.
    #[must_use]
    pub fn with_provider_factory(
        mut self,
        factory: ProviderFactory,
        available_models: SharedAvailableModels,
    ) -> Self {
        self.provider_factory = Some(factory);
        self.available_models = available_models;
        self
    }
    /// Snapshot the current available-model list (lock released on return;
    /// poisoning is ignored since the data is a plain Vec<String>).
    fn available_models_snapshot(&self) -> Vec<String> {
        self.available_models
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .clone()
    }
    /// First available model, or empty string when none are configured.
    fn initial_model(&self) -> String {
        self.available_models_snapshot()
            .into_iter()
            .next()
            .unwrap_or_default()
    }
    /// Attach the MCP server manager (also flips on MCP capabilities in
    /// `initialize`).
    #[must_use]
    pub fn with_mcp_manager(mut self, manager: Arc<McpManager>) -> Self {
        self.mcp_manager = Some(manager);
        self
    }
    /// Set the project rule files advertised in new-session metadata.
    #[must_use]
    pub fn with_project_rules(mut self, rules: Vec<std::path::PathBuf>) -> Self {
        self.project_rules = rules;
        self
    }
    /// Cap on auto-generated session title length, in characters.
    #[must_use]
    pub fn with_title_max_chars(mut self, max_chars: usize) -> Self {
        self.title_max_chars = max_chars;
        self
    }
    /// Cap on persisted sessions returned by `list_sessions`.
    #[must_use]
    pub fn with_max_history(mut self, max_history: usize) -> Self {
        self.max_history = max_history;
        self
    }
    /// Spawn the background task that evicts idle sessions.
    ///
    /// Every 60s it removes sessions that are idle (output receiver parked,
    /// i.e. no prompt in flight) and whose last activity exceeds the
    /// configured idle timeout, signalling their cancel notify.
    pub fn start_idle_reaper(&self) {
        let sessions = Rc::clone(&self.sessions);
        let idle_timeout = self.idle_timeout;
        tokio::task::spawn_local(async move {
            let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
            // First tick of a tokio interval completes immediately; consume
            // it so the loop waits a full period before the first sweep.
            interval.tick().await;
            loop {
                interval.tick().await;
                let now = std::time::Instant::now();
                // Collect ids first so the map borrow is released before
                // mutating the map below.
                let expired: Vec<acp::SessionId> = sessions
                    .borrow()
                    .iter()
                    .filter(|(_, e)| {
                        e.output_rx.borrow().is_some()
                            && now.duration_since(e.last_active.get()) > idle_timeout
                    })
                    .map(|(id, _)| id.clone())
                    .collect();
                for id in expired {
                    if let Some(entry) = sessions.borrow_mut().remove(&id) {
                        entry.cancel_signal.notify_one();
                        tracing::debug!(session_id = %id, "evicted idle ACP session (timeout)");
                    }
                }
            }
        });
    }
    /// Build the client-mediated context for a session: permission gate,
    /// file and shell executors, and (when supported) the LSP bridge.
    ///
    /// Returns `None` when no client connection is present. Each executor's
    /// background handler is spawned on the local task set.
    fn build_acp_context(
        &self,
        session_id: &acp::SessionId,
        cancel_signal: std::sync::Arc<tokio::sync::Notify>,
        provider_override: Arc<std::sync::RwLock<Option<AnyProvider>>>,
        cwd: PathBuf,
    ) -> Option<AcpContext> {
        let conn_guard = self.conn_slot.borrow();
        let conn = conn_guard.as_ref()?;
        let (perm_gate, perm_handler) =
            AcpPermissionGate::new(Rc::clone(conn), self.permission_file.clone());
        tokio::task::spawn_local(perm_handler);
        // Consult the capabilities the client reported at initialize.
        let caps = self.client_caps.borrow();
        let can_read = caps.fs.read_text_file;
        let can_write = caps.fs.write_text_file;
        // LSP bridge is built only when enabled in config AND the client
        // advertised an "lsp" entry in its capability metadata.
        let ide_supports_lsp =
            self.lsp_config.enabled && caps.meta.as_ref().is_some_and(|m| m.contains_key("lsp"));
        drop(caps);
        let (fs_exec, fs_handler) = AcpFileExecutor::new(
            Rc::clone(conn),
            session_id.clone(),
            can_read,
            can_write,
            cwd,
            Some(perm_gate.clone()),
        );
        tokio::task::spawn_local(fs_handler);
        let (shell_exec, shell_handler) = AcpShellExecutor::new(
            Rc::clone(conn),
            session_id.clone(),
            Some(perm_gate.clone()),
            120,
        );
        tokio::task::spawn_local(shell_handler);
        let lsp_provider = if ide_supports_lsp {
            let (provider, handler) = crate::lsp::AcpLspProvider::new(
                Rc::clone(conn),
                true,
                self.lsp_config.request_timeout_secs,
                self.lsp_config.max_references,
                self.lsp_config.max_workspace_symbols,
            );
            tokio::task::spawn_local(handler);
            Some(provider)
        } else {
            None
        };
        Some(AcpContext {
            file_executor: Some(fs_exec),
            shell_executor: Some(shell_exec),
            permission_gate: Some(perm_gate),
            cancel_signal,
            provider_override,
            parent_tool_use_id: None,
            lsp_provider,
            diagnostics_cache: Arc::clone(&self.diagnostics_cache),
        })
    }
    /// Send a session notification and wait for its delivery ack.
    ///
    /// # Errors
    /// Fails if the notification channel is closed or the ack is dropped.
    async fn send_notification(&self, notification: acp::SessionNotification) -> acp::Result<()> {
        let (tx, rx) = oneshot::channel();
        self.notify_tx
            .send((notification, tx))
            .map_err(|_| acp::Error::internal_error().data("notification channel closed"))?;
        rx.await
            .map_err(|_| acp::Error::internal_error().data("notification ack lost"))
    }
    /// Cache diagnostics pushed by the client via `lsp/publishDiagnostics`.
    /// Parse failures are logged and ignored.
    fn handle_lsp_publish_diagnostics(&self, params: &str) {
        #[derive(serde::Deserialize)]
        struct PublishDiagnosticsParams {
            uri: String,
            #[serde(default)]
            diagnostics: Vec<crate::lsp::LspDiagnostic>,
        }
        match serde_json::from_str::<PublishDiagnosticsParams>(params) {
            Ok(p) => {
                // Cap per-file diagnostics to the configured maximum.
                let max = self.lsp_config.max_diagnostics_per_file;
                let mut diags = p.diagnostics;
                diags.truncate(max);
                tracing::debug!(
                    uri = %p.uri,
                    count = diags.len(),
                    "lsp/publishDiagnostics: cached"
                );
                self.diagnostics_cache
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner)
                    .update(p.uri, diags);
            }
            Err(e) => {
                tracing::warn!(error = %e, "lsp/publishDiagnostics: failed to parse params");
            }
        }
    }
    /// On `lsp/didSave`, actively fetch fresh diagnostics for the saved file
    /// from the client (if `auto_diagnostics_on_save` is enabled) and cache
    /// them. All failures are logged, never propagated.
    async fn handle_lsp_did_save(&self, params: &str) {
        #[derive(serde::Deserialize)]
        struct DidSaveParams {
            uri: String,
        }
        use acp::Client as _;
        if !self.lsp_config.auto_diagnostics_on_save {
            return;
        }
        let uri = match serde_json::from_str::<DidSaveParams>(params) {
            Ok(p) => p.uri,
            Err(e) => {
                tracing::warn!(error = %e, "lsp/didSave: failed to parse params");
                return;
            }
        };
        // Clone the connection out of the slot so the borrow is not held
        // across the await below.
        let conn = {
            let guard = self.conn_slot.borrow();
            guard.as_ref().cloned()
        };
        let Some(conn) = conn else {
            return;
        };
        let params_json = serde_json::json!({ "uri": &uri });
        let raw = match serde_json::value::to_raw_value(&params_json) {
            Ok(r) => r,
            Err(e) => {
                tracing::warn!(error = %e, "lsp/didSave: failed to serialize params");
                return;
            }
        };
        let req = acp::ExtRequest::new("lsp/diagnostics", std::sync::Arc::from(raw));
        let timeout = std::time::Duration::from_secs(self.lsp_config.request_timeout_secs);
        match tokio::time::timeout(timeout, conn.ext_method(req)).await {
            Ok(Ok(resp)) => {
                match serde_json::from_str::<Vec<crate::lsp::LspDiagnostic>>(resp.0.get()) {
                    Ok(mut diags) => {
                        let max = self.lsp_config.max_diagnostics_per_file;
                        diags.truncate(max);
                        tracing::debug!(
                            uri = %uri,
                            count = diags.len(),
                            "lsp/didSave: fetched diagnostics"
                        );
                        self.diagnostics_cache
                            .write()
                            .unwrap_or_else(std::sync::PoisonError::into_inner)
                            .update(uri, diags);
                    }
                    Err(e) => {
                        tracing::warn!(error = %e, "lsp/didSave: failed to parse diagnostics response");
                    }
                }
            }
            Ok(Err(e)) => {
                tracing::warn!(error = %e, "lsp/didSave: diagnostics request failed");
            }
            Err(_) => {
                tracing::warn!(uri = %uri, "lsp/didSave: diagnostics request timed out");
            }
        }
    }
}
/// Parameters of the MCP server-removal extension request.
#[derive(serde::Deserialize)]
struct McpRemoveParams {
    // Identifier of the MCP server entry to remove.
    id: String,
}
/// Look up the conversation backing an ACP session, creating and recording
/// one for legacy sessions that predate conversation mapping.
///
/// Returns `None` (after logging a warning) when the lookup or creation
/// fails; such sessions simply run without persistent history.
async fn resolve_conversation_id(
    store: &zeph_memory::store::SqliteStore,
    session_id: &acp::SessionId,
) -> Option<ConversationId> {
    let sid = session_id.to_string();
    // Fast path: an existing mapping wins; a lookup failure is terminal.
    match store.get_acp_session_conversation_id(&sid).await {
        Ok(Some(existing)) => return Some(existing),
        Err(e) => {
            tracing::warn!(error = %e, "failed to look up conversation_id; session will have no persistent history");
            return None;
        }
        Ok(None) => {}
    }
    // Legacy session without a mapping: create a fresh conversation.
    let created = match store.create_conversation().await {
        Ok(cid) => cid,
        Err(e) => {
            tracing::warn!(error = %e, "failed to create conversation for legacy session; session will have no persistent history");
            return None;
        }
    };
    // Best-effort: record the mapping; the conversation is usable either way.
    if let Err(e) = store
        .set_acp_session_conversation_id(&sid, created)
        .await
    {
        tracing::warn!(error = %e, "failed to set conversation_id for legacy session");
    }
    Some(created)
}
#[async_trait::async_trait(?Send)]
impl acp::Agent for ZephAcpAgent {
/// Handle `initialize`: record the client's capabilities and advertise ours
/// (prompting with images/context, session loading, config options, and —
/// behind feature flags — session close/fork/resume and logout).
async fn initialize(
    &self,
    args: acp::InitializeRequest,
) -> acp::Result<acp::InitializeResponse> {
    tracing::debug!("ACP initialize");
    // Remembered for later capability checks (fs access, LSP bridge).
    *self.client_caps.borrow_mut() = args.client_capabilities;
    let title = format!("{} AI Agent", self.agent_name);
    let mut meta = serde_json::Map::new();
    meta.insert(
        "auth_hint".to_owned(),
        serde_json::json!("authentication required"),
    );
    let mut caps = acp::AgentCapabilities::new()
        .load_session(true)
        .prompt_capabilities(
            acp::PromptCapabilities::new()
                .image(true)
                .embedded_context(true),
        )
        .meta({
            let mut cap_meta = serde_json::Map::new();
            cap_meta.insert("config_options".to_owned(), serde_json::json!(true));
            cap_meta.insert("ext_methods".to_owned(), serde_json::json!(true));
            // Advertise the LSP bridge methods/notifications only when
            // enabled in config.
            if self.lsp_config.enabled {
                cap_meta.insert(
                    "lsp".to_owned(),
                    serde_json::json!({
                        "methods": crate::lsp::LSP_METHODS,
                        "notifications": crate::lsp::LSP_NOTIFICATIONS,
                    }),
                );
            }
            cap_meta
        });
    if self.mcp_manager.is_some() {
        caps = caps.mcp_capabilities(acp::McpCapabilities::new().http(true).sse(false));
    }
    // Feature-gated capability additions: each cfg block shadows `caps`
    // so the extra capabilities compile in only when the feature is on.
    #[cfg(any(
        feature = "unstable-session-close",
        feature = "unstable-session-fork",
        feature = "unstable-session-resume",
    ))]
    let caps = {
        let mut session_caps = acp::SessionCapabilities::new();
        session_caps = session_caps.list(acp::SessionListCapabilities::default());
        #[cfg(feature = "unstable-session-close")]
        {
            session_caps = session_caps.close(acp::SessionCloseCapabilities::default());
        }
        #[cfg(feature = "unstable-session-fork")]
        {
            session_caps = session_caps.fork(acp::SessionForkCapabilities::default());
        }
        #[cfg(feature = "unstable-session-resume")]
        {
            session_caps = session_caps.resume(acp::SessionResumeCapabilities::default());
        }
        caps.session_capabilities(session_caps)
    };
    #[cfg(feature = "unstable-logout")]
    let caps = caps
        .auth(acp::AgentAuthCapabilities::default().logout(acp::LogoutCapabilities::default()));
    Ok(acp::InitializeResponse::new(acp::ProtocolVersion::LATEST)
        .auth_methods(vec![acp::AuthMethod::Agent(acp::AuthMethodAgent::new(
            "zeph", "Zeph",
        ))])
        .agent_info(
            acp::Implementation::new(&self.agent_name, &self.agent_version).title(title),
        )
        .agent_capabilities(caps)
        .meta(meta))
}
/// Dispatch an extension method call.
///
/// Custom handlers registered in `crate::custom` take precedence; any
/// request they do not claim falls through to the MCP extension handler.
async fn ext_method(&self, args: acp::ExtRequest) -> acp::Result<acp::ExtResponse> {
    match crate::custom::dispatch(self, &args) {
        Some(handler) => handler.await,
        None => self.ext_method_mcp(&args).await,
    }
}
/// Handle extension notifications from the client.
///
/// Only the LSP bridge notifications are recognized
/// (`lsp/publishDiagnostics`, `lsp/didSave`); all others are ignored.
async fn ext_notification(&self, args: acp::ExtNotification) -> acp::Result<()> {
    tracing::debug!(method = %args.method, "received ext_notification");
    let method = args.method.as_ref();
    if method == "lsp/publishDiagnostics" {
        self.handle_lsp_publish_diagnostics(args.params.get());
    } else if method == "lsp/didSave" {
        self.handle_lsp_did_save(args.params.get()).await;
    }
    Ok(())
}
/// Handle `authenticate`: a no-op success — authentication is handled
/// outside the ACP handshake (see the `auth_hint` metadata in `initialize`).
async fn authenticate(
    &self,
    _args: acp::AuthenticateRequest,
) -> acp::Result<acp::AuthenticateResponse> {
    Ok(acp::AuthenticateResponse::default())
}
/// Handle `logout`: intentionally a no-op because credentials live in the
/// vault, not in per-connection state.
#[cfg(feature = "unstable-logout")]
async fn logout(&self, _args: acp::LogoutRequest) -> acp::Result<acp::LogoutResponse> {
    tracing::debug!("ACP logout (no-op: vault-based auth)");
    Ok(acp::LogoutResponse::default())
}
/// Create a new session: evict an idle session if at capacity, build the
/// loopback channel and client-mediated context, spawn the agent loop, and
/// report modes/config options (plus project rules) to the client.
async fn new_session(
    &self,
    args: acp::NewSessionRequest,
) -> acp::Result<acp::NewSessionResponse> {
    // At capacity: evict the least-recently-active *idle* session
    // (output_rx present means no prompt in flight). If every session is
    // busy, refuse the request.
    if self.sessions.borrow().len() >= self.max_sessions {
        let evict_id = {
            let sessions = self.sessions.borrow();
            sessions
                .iter()
                .filter(|(_, e)| e.output_rx.borrow().is_some())
                .min_by_key(|(_, e)| e.last_active.get())
                .map(|(id, _)| id.clone())
        };
        match evict_id {
            Some(id) => {
                if let Some(entry) = self.sessions.borrow_mut().remove(&id) {
                    entry.cancel_signal.notify_one();
                    tracing::debug!(session_id = %id, "evicted idle ACP session (LRU)");
                }
            }
            None => {
                return Err(acp::Error::internal_error().data("session limit reached"));
            }
        }
    }
    let session_id = acp::SessionId::new(uuid::Uuid::new_v4().to_string());
    tracing::debug!(%session_id, "new ACP session");
    let (channel, handle) = LoopbackChannel::pair(LOOPBACK_CHANNEL_CAPACITY);
    let cancel_signal = std::sync::Arc::clone(&handle.cancel_signal);
    // Shared provider override: entry and agent context both see model
    // switches made later.
    let provider_override: Arc<std::sync::RwLock<Option<AnyProvider>>> =
        Arc::new(std::sync::RwLock::new(None));
    let provider_override_for_ctx = Arc::clone(&provider_override);
    let session_cwd = args.cwd.clone();
    let acp_ctx = self.build_acp_context(
        &session_id,
        cancel_signal,
        provider_override_for_ctx,
        session_cwd.clone(),
    );
    let shell_executor = acp_ctx.as_ref().and_then(|c| c.shell_executor.clone());
    let initial_model = self.initial_model();
    let entry = Self::make_session_entry(
        handle,
        initial_model.clone(),
        session_cwd.clone(),
        shell_executor,
        provider_override,
    );
    // Register the entry before spawning so the session is addressable
    // as soon as the agent loop starts.
    self.sessions.borrow_mut().insert(session_id.clone(), entry);
    let conversation_id = self.create_session_conversation(&session_id).await;
    let session_ctx = SessionContext {
        session_id: session_id.clone(),
        conversation_id,
        working_dir: session_cwd.clone(),
    };
    let spawner = Arc::clone(&self.spawner);
    tokio::task::spawn_local(async move {
        (spawner)(channel, acp_ctx, session_ctx).await;
    });
    let available_models = self.available_models_snapshot();
    let config_options =
        build_config_options(&available_models, &initial_model, false, "suggest");
    let default_mode_id = acp::SessionModeId::new(DEFAULT_MODE_ID);
    let mut resp = acp::NewSessionResponse::new(session_id.clone())
        .modes(build_mode_state(&default_mode_id));
    if !config_options.is_empty() {
        resp = resp.config_options(config_options);
    }
    // Advertise configured project rule files (names only) in metadata.
    if !self.project_rules.is_empty() {
        let rules: Vec<serde_json::Value> = self
            .project_rules
            .iter()
            .filter_map(|p| p.file_name())
            .map(|n| serde_json::json!({"name": n.to_string_lossy()}))
            .collect();
        let mut meta = serde_json::Map::new();
        meta.insert("projectRules".to_owned(), serde_json::Value::Array(rules));
        resp = resp.meta(meta);
    }
    self.send_commands_update_nowait(session_id);
    Ok(resp)
}
/// Handle a prompt turn: collect content (resolving resources), intercept
/// ACP-native slash commands, forward the message to the agent loop, drain
/// its events into notifications, and map the outcome to a stop reason.
async fn prompt(&self, args: acp::PromptRequest) -> acp::Result<acp::PromptResponse> {
    tracing::debug!(session_id = %args.session_id, "ACP prompt");
    let session_cwd = self
        .sessions
        .borrow()
        .get(&args.session_id)
        .and_then(|e| e.working_dir.borrow().clone())
        .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
    let (text, attachments) = self
        .collect_prompt_content(&args.prompt, &session_cwd)
        .await?;
    // Slash commands handled natively by the ACP layer (not forwarded to
    // the agent loop).
    let trimmed_text = text.trim_start();
    if trimmed_text.starts_with('/') {
        let is_acp_native = trimmed_text == "/help"
            || trimmed_text.starts_with("/help ")
            || trimmed_text == "/mode"
            || trimmed_text.starts_with("/mode ")
            || trimmed_text == "/clear"
            || trimmed_text.starts_with("/review")
            || trimmed_text == "/model"
            || trimmed_text.starts_with("/model ");
        if is_acp_native {
            return self
                .handle_slash_command(&args.session_id, trimmed_text)
                .await;
        }
    }
    // Take the output receiver: its absence means a prompt is already in
    // flight for this session (single-turn-at-a-time invariant).
    let (input_tx, output_rx) = {
        let sessions = self.sessions.borrow();
        let entry = sessions
            .get(&args.session_id)
            .ok_or_else(|| acp::Error::internal_error().data("session not found"))?;
        let rx =
            entry.output_rx.borrow_mut().take().ok_or_else(|| {
                acp::Error::internal_error().data("prompt already in progress")
            })?;
        entry.last_active.set(std::time::Instant::now());
        (entry.input_tx.clone(), rx)
    };
    // Best-effort, fire-and-forget persistence of the user message.
    if let Some(ref store) = self.store {
        let sid = args.session_id.to_string();
        let payload = text.clone();
        let store = store.clone();
        tokio::task::spawn_local(async move {
            if let Err(e) = store.save_acp_event(&sid, "user_message", &payload).await {
                tracing::warn!(error = %e, "failed to persist user message");
            }
        });
    }
    input_tx
        .send(ChannelMessage {
            text: text.clone(),
            attachments,
        })
        .await
        .map_err(|_| acp::Error::internal_error().data("agent channel closed"))?;
    let cancel_signal = self
        .sessions
        .borrow()
        .get(&args.session_id)
        .map(|e| std::sync::Arc::clone(&e.cancel_signal));
    let (cancelled, stop_hint, rx) = self
        .drain_agent_events(&args.session_id, output_rx, cancel_signal)
        .await;
    // Restore the receiver so the session becomes idle again; skipped if
    // the session was evicted mid-turn (receiver is simply dropped).
    if let Some(entry) = self.sessions.borrow().get(&args.session_id) {
        *entry.output_rx.borrow_mut() = Some(rx);
    }
    let stop_reason = if cancelled {
        acp::StopReason::Cancelled
    } else {
        match stop_hint {
            Some(StopHint::MaxTokens) => acp::StopReason::MaxTokens,
            Some(StopHint::MaxTurnRequests) => acp::StopReason::MaxTurnRequests,
            None => acp::StopReason::EndTurn,
        }
    };
    if !cancelled {
        self.maybe_generate_session_title(&args.session_id, &text);
    }
    Ok(acp::PromptResponse::new(stop_reason))
}
/// Cancel the in-flight turn for a session, if any.
///
/// Signals the session's cancel notify; the agent loop observes it and
/// winds down the current turn. Unknown session ids are silently ignored.
async fn cancel(&self, args: acp::CancelNotification) -> acp::Result<()> {
    tracing::debug!(session_id = %args.session_id, "ACP cancel");
    let sessions = self.sessions.borrow();
    if let Some(entry) = sessions.get(&args.session_id) {
        entry.cancel_signal.notify_one();
    }
    Ok(())
}
/// Handle `session/close`: drop the session entry and signal its cancel
/// notify so the agent loop shuts down. Closing an unknown session is a
/// no-op success.
#[cfg(feature = "unstable-session-close")]
async fn close_session(
    &self,
    args: acp::CloseSessionRequest,
) -> acp::Result<acp::CloseSessionResponse> {
    tracing::debug!(session_id = %args.session_id, "ACP session closed");
    if let Some(entry) = self.sessions.borrow_mut().remove(&args.session_id) {
        entry.cancel_signal.notify_one();
    }
    Ok(acp::CloseSessionResponse::default())
}
/// Handle `session/load`: rehydrate a persisted session — verify it exists
/// in the store, respawn the agent loop with its conversation, and replay
/// its persisted events to the client. Already-live sessions return early.
async fn load_session(
    &self,
    args: acp::LoadSessionRequest,
) -> acp::Result<acp::LoadSessionResponse> {
    // Already live in memory: nothing to rebuild.
    if self.sessions.borrow().contains_key(&args.session_id) {
        return Ok(acp::LoadSessionResponse::new());
    }
    let Some(ref store) = self.store else {
        return Err(acp::Error::internal_error().data("session not found"));
    };
    let exists = store
        .acp_session_exists(&args.session_id.to_string())
        .await
        .map_err(|e| {
            tracing::warn!(error = %e, session_id = %args.session_id, "failed to check ACP session existence");
            acp::Error::internal_error().data("internal error")
        })?;
    if !exists {
        return Err(acp::Error::internal_error().data("session not found"));
    }
    let events = store
        .load_acp_events(&args.session_id.to_string())
        .await
        .map_err(|e| {
            tracing::warn!(error = %e, session_id = %args.session_id, "failed to load ACP session events");
            acp::Error::internal_error().data("internal error")
        })?;
    let session_cwd = args.cwd.clone();
    let conversation_id = resolve_conversation_id(store, &args.session_id).await;
    let (channel, handle) = LoopbackChannel::pair(LOOPBACK_CHANNEL_CAPACITY);
    let cancel_signal = std::sync::Arc::clone(&handle.cancel_signal);
    let provider_override: Arc<std::sync::RwLock<Option<AnyProvider>>> =
        Arc::new(std::sync::RwLock::new(None));
    let provider_override_for_ctx = Arc::clone(&provider_override);
    let acp_ctx = self.build_acp_context(
        &args.session_id,
        cancel_signal,
        provider_override_for_ctx,
        session_cwd.clone(),
    );
    let shell_executor = acp_ctx.as_ref().and_then(|c| c.shell_executor.clone());
    let initial_model = self.initial_model();
    let entry = Self::make_session_entry(
        handle,
        initial_model,
        session_cwd.clone(),
        shell_executor,
        provider_override,
    );
    self.sessions
        .borrow_mut()
        .insert(args.session_id.clone(), entry);
    let session_ctx = SessionContext {
        session_id: args.session_id.clone(),
        conversation_id,
        working_dir: session_cwd,
    };
    let spawner = Arc::clone(&self.spawner);
    tokio::task::spawn_local(async move {
        (spawner)(channel, acp_ctx, session_ctx).await;
    });
    // Replay persisted history to the client before responding.
    self.replay_session_events(&args.session_id, events).await;
    let default_mode_id = acp::SessionModeId::new(DEFAULT_MODE_ID);
    let load_resp = acp::LoadSessionResponse::new().modes(build_mode_state(&default_mode_id));
    self.send_commands_update_nowait(args.session_id);
    Ok(load_resp)
}
/// Handle `session/list`: merge live in-memory sessions (optionally
/// filtered by cwd) with persisted ones — live entries win on id clashes —
/// and return them sorted newest-first by `updated_at`.
async fn list_sessions(
    &self,
    args: acp::ListSessionsRequest,
) -> acp::Result<acp::ListSessionsResponse> {
    let mut result: std::collections::HashMap<String, acp::SessionInfo> = {
        let sessions = self.sessions.borrow();
        sessions
            .iter()
            .filter_map(|(session_id, entry)| {
                let working_dir = entry.working_dir.borrow().clone().unwrap_or_default();
                // Apply the optional cwd filter to live sessions only.
                if let Some(ref filter) = args.cwd
                    && &working_dir != filter
                {
                    return None;
                }
                let meta = model_meta(&entry.current_model.borrow());
                let mut info = acp::SessionInfo::new(session_id.clone(), working_dir)
                    .updated_at(entry.created_at.to_rfc3339())
                    .meta(meta);
                if let Some(ref t) = *entry.title.borrow() {
                    info = info.title(t.clone());
                }
                Some((session_id.to_string(), info))
            })
            .collect()
    };
    // Append persisted sessions not already represented by a live entry.
    if let Some(ref store) = self.store {
        match store.list_acp_sessions(self.max_history).await {
            Ok(persisted) => {
                for persisted_info in persisted {
                    let sid = acp::SessionId::new(&*persisted_info.id);
                    if result.contains_key(&persisted_info.id) {
                        continue;
                    }
                    // Persisted sessions have no recorded cwd here.
                    let info = acp::SessionInfo::new(sid, std::path::PathBuf::new())
                        .title(persisted_info.title)
                        .updated_at(persisted_info.updated_at);
                    result.insert(persisted_info.id, info);
                }
            }
            Err(e) => {
                tracing::warn!(error = %e, "failed to list persisted ACP sessions");
            }
        }
    }
    // RFC 3339 timestamps sort correctly as strings; descending = newest first.
    let mut sessions_vec: Vec<acp::SessionInfo> = result.into_values().collect();
    sessions_vec.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
    Ok(acp::ListSessionsResponse::new(sessions_vec))
}
/// Handle `session/fork`: verify the source session exists (live or
/// persisted), evict an idle session if at capacity, clone the conversation,
/// and spawn a fresh agent loop under a new session id.
#[cfg(feature = "unstable-session-fork")]
async fn fork_session(
    &self,
    args: acp::ForkSessionRequest,
) -> acp::Result<acp::ForkSessionResponse> {
    // The source must exist either in memory or in the store.
    let in_memory = self.sessions.borrow().contains_key(&args.session_id);
    let store = self.store.as_ref();
    if !in_memory {
        match store {
            None => return Err(acp::Error::internal_error().data("session not found")),
            Some(s) => {
                let exists = s
                    .acp_session_exists(&args.session_id.to_string())
                    .await
                    .map_err(|e| {
                        tracing::warn!(error = %e, "failed to check ACP session existence");
                        acp::Error::internal_error().data("internal error")
                    })?;
                if !exists {
                    return Err(acp::Error::internal_error().data("session not found"));
                }
            }
        }
    }
    // Same capacity/LRU-eviction policy as new_session.
    if self.sessions.borrow().len() >= self.max_sessions {
        let evict_id = {
            let sessions = self.sessions.borrow();
            sessions
                .iter()
                .filter(|(_, e)| e.output_rx.borrow().is_some())
                .min_by_key(|(_, e)| e.last_active.get())
                .map(|(id, _)| id.clone())
        };
        match evict_id {
            Some(id) => {
                if let Some(entry) = self.sessions.borrow_mut().remove(&id) {
                    entry.cancel_signal.notify_one();
                    tracing::debug!(session_id = %id, "evicted idle ACP session (LRU)");
                }
            }
            None => {
                return Err(acp::Error::internal_error().data("session limit reached"));
            }
        }
    }
    let new_id = acp::SessionId::new(uuid::Uuid::new_v4().to_string());
    tracing::debug!(
        source = %args.session_id,
        new = %new_id,
        "forking ACP session"
    );
    let new_conversation_id = self.fork_conversation(&args.session_id, &new_id).await?;
    let (channel, handle) = LoopbackChannel::pair(LOOPBACK_CHANNEL_CAPACITY);
    let cancel_signal = std::sync::Arc::clone(&handle.cancel_signal);
    let provider_override: Arc<std::sync::RwLock<Option<AnyProvider>>> =
        Arc::new(std::sync::RwLock::new(None));
    let provider_override_for_ctx = Arc::clone(&provider_override);
    let acp_ctx = self.build_acp_context(
        &new_id,
        cancel_signal,
        provider_override_for_ctx,
        args.cwd.clone(),
    );
    let shell_executor = acp_ctx.as_ref().and_then(|c| c.shell_executor.clone());
    let initial_model = self.initial_model();
    let entry = Self::make_session_entry(
        handle,
        initial_model.clone(),
        args.cwd.clone(),
        shell_executor,
        provider_override,
    );
    self.sessions.borrow_mut().insert(new_id.clone(), entry);
    let session_ctx = SessionContext {
        session_id: new_id.clone(),
        conversation_id: new_conversation_id,
        working_dir: args.cwd.clone(),
    };
    let spawner = Arc::clone(&self.spawner);
    tokio::task::spawn_local(async move {
        (spawner)(channel, acp_ctx, session_ctx).await;
    });
    let available_models = self.available_models_snapshot();
    let config_options =
        build_config_options(&available_models, &initial_model, false, "suggest");
    let default_mode_id = acp::SessionModeId::new(DEFAULT_MODE_ID);
    let mut resp =
        acp::ForkSessionResponse::new(new_id).modes(build_mode_state(&default_mode_id));
    if !config_options.is_empty() {
        resp = resp.config_options(config_options);
    }
    Ok(resp)
}
/// Handle `session/resume`: restart the agent loop for a persisted session
/// under its original id, without replaying events (unlike `load_session`).
/// Already-live sessions return early.
#[cfg(feature = "unstable-session-resume")]
async fn resume_session(
    &self,
    args: acp::ResumeSessionRequest,
) -> acp::Result<acp::ResumeSessionResponse> {
    if self.sessions.borrow().contains_key(&args.session_id) {
        return Ok(acp::ResumeSessionResponse::new());
    }
    let Some(ref store) = self.store else {
        return Err(acp::Error::internal_error().data("session not found"));
    };
    let exists = store
        .acp_session_exists(&args.session_id.to_string())
        .await
        .map_err(|e| {
            tracing::warn!(error = %e, session_id = %args.session_id, "failed to check ACP session existence");
            acp::Error::internal_error().data("internal error")
        })?;
    if !exists {
        return Err(acp::Error::internal_error().data("session not found"));
    }
    // Capacity/LRU-eviction policy; never evict the session being resumed.
    if self.sessions.borrow().len() >= self.max_sessions {
        let evict_id = {
            let sessions = self.sessions.borrow();
            sessions
                .iter()
                .filter(|(id, e)| *id != &args.session_id && e.output_rx.borrow().is_some())
                .min_by_key(|(_, e)| e.last_active.get())
                .map(|(id, _)| id.clone())
        };
        match evict_id {
            Some(id) => {
                if let Some(entry) = self.sessions.borrow_mut().remove(&id) {
                    entry.cancel_signal.notify_one();
                    tracing::debug!(session_id = %id, "evicted idle ACP session (LRU)");
                }
            }
            None => {
                return Err(acp::Error::internal_error().data("session limit reached"));
            }
        }
    }
    let conversation_id = resolve_conversation_id(store, &args.session_id).await;
    let (channel, handle) = LoopbackChannel::pair(LOOPBACK_CHANNEL_CAPACITY);
    let cancel_signal = std::sync::Arc::clone(&handle.cancel_signal);
    let provider_override: Arc<std::sync::RwLock<Option<AnyProvider>>> =
        Arc::new(std::sync::RwLock::new(None));
    let provider_override_for_ctx = Arc::clone(&provider_override);
    let acp_ctx = self.build_acp_context(
        &args.session_id,
        cancel_signal,
        provider_override_for_ctx,
        args.cwd.clone(),
    );
    let shell_executor = acp_ctx.as_ref().and_then(|c| c.shell_executor.clone());
    let initial_model = self.initial_model();
    let entry = Self::make_session_entry(
        handle,
        initial_model,
        args.cwd.clone(),
        shell_executor,
        provider_override,
    );
    self.sessions
        .borrow_mut()
        .insert(args.session_id.clone(), entry);
    let session_ctx = SessionContext {
        session_id: args.session_id.clone(),
        conversation_id,
        working_dir: args.cwd,
    };
    let spawner = Arc::clone(&self.spawner);
    tokio::task::spawn_local(async move {
        (spawner)(channel, acp_ctx, session_ctx).await;
    });
    Ok(acp::ResumeSessionResponse::new())
}
async fn set_session_config_option(
&self,
args: acp::SetSessionConfigOptionRequest,
) -> acp::Result<acp::SetSessionConfigOptionResponse> {
let config_id = args.config_id.0.clone();
let value: &str = &args.value.0;
let (current_model, thinking, auto_approve) = {
let sessions = self.sessions.borrow();
let entry = sessions
.get(&args.session_id)
.ok_or_else(|| acp::Error::invalid_request().data("session not found"))?;
self.apply_session_config(entry, config_id.as_ref(), value, &args.session_id)?;
(
entry.current_model.borrow().clone(),
entry.thinking_enabled.get(),
entry.auto_approve_level.borrow().clone(),
)
};
let config_options = build_config_options(
&self.available_models_snapshot(),
¤t_model,
thinking,
&auto_approve,
);
let changed_option = config_options.iter().find(|o| o.id.0 == config_id).cloned();
if let Some(option) = changed_option {
let update =
acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate::new(vec![option]));
let notification = acp::SessionNotification::new(args.session_id.clone(), update);
let (tx, _rx) = oneshot::channel();
if self.notify_tx.send((notification, tx)).is_err() {
tracing::warn!("failed to send ConfigOptionUpdate notification: channel closed");
}
if config_id.as_ref() == "model" {
let info_update = acp::SessionUpdate::SessionInfoUpdate(
acp::SessionInfoUpdate::new().meta(model_meta(¤t_model)),
);
let info_notification = acp::SessionNotification::new(args.session_id, info_update);
let (tx2, _rx2) = oneshot::channel();
if self.notify_tx.send((info_notification, tx2)).is_err() {
tracing::warn!("failed to send SessionInfoUpdate notification: channel closed");
}
}
}
Ok(acp::SetSessionConfigOptionResponse::new(config_options))
}
async fn set_session_mode(
&self,
args: acp::SetSessionModeRequest,
) -> acp::Result<acp::SetSessionModeResponse> {
let valid_ids: &[&str] = &["code", "architect", "ask"];
let mode_str = args.mode_id.0.as_ref();
if !valid_ids.contains(&mode_str) {
return Err(acp::Error::invalid_request().data(format!("unknown mode: {mode_str}")));
}
{
let sessions = self.sessions.borrow();
let entry = sessions
.get(&args.session_id)
.ok_or_else(|| acp::Error::invalid_request().data("session not found"))?;
*entry.current_mode.borrow_mut() = args.mode_id.clone();
}
tracing::debug!(
session_id = %args.session_id,
mode = %mode_str,
"ACP session mode switched"
);
let update = acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate::new(
args.mode_id.clone(),
));
let notification = acp::SessionNotification::new(args.session_id, update);
if let Err(e) = self.send_notification(notification).await {
tracing::warn!(error = %e, "failed to send current_mode_update");
}
Ok(acp::SetSessionModeResponse::new())
}
#[cfg(feature = "unstable-session-model")]
async fn set_session_model(
&self,
args: acp::SetSessionModelRequest,
) -> acp::Result<acp::SetSessionModelResponse> {
let model_id: &str = &args.model_id.0;
let Some(ref factory) = self.provider_factory else {
return Err(acp::Error::internal_error().data("model switching not configured"));
};
if !self
.available_models_snapshot()
.iter()
.any(|m| m == model_id)
{
return Err(acp::Error::invalid_request().data("model not in allowed list"));
}
let Some(new_provider) = factory(model_id) else {
return Err(acp::Error::invalid_request().data("unknown model"));
};
let sessions = self.sessions.borrow();
let entry = sessions
.get(&args.session_id)
.ok_or_else(|| acp::Error::internal_error().data("session not found"))?;
*entry
.provider_override
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner) = Some(new_provider);
model_id.clone_into(&mut entry.current_model.borrow_mut());
tracing::debug!(
session_id = %args.session_id,
model = %model_id,
"ACP session model switched via set_session_model"
);
let info_update = acp::SessionUpdate::SessionInfoUpdate(
acp::SessionInfoUpdate::new().meta(model_meta(model_id)),
);
let notification = acp::SessionNotification::new(args.session_id, info_update);
let (tx, _rx) = oneshot::channel();
if self.notify_tx.send((notification, tx)).is_err() {
tracing::warn!("failed to send SessionInfoUpdate notification: channel closed");
}
Ok(acp::SetSessionModelResponse::new())
}
}
impl ZephAcpAgent {
fn apply_session_config(
&self,
entry: &SessionEntry,
config_id: &str,
value: &str,
session_id: &acp::SessionId,
) -> acp::Result<()> {
match config_id {
"model" => {
let Some(ref factory) = self.provider_factory else {
return Err(acp::Error::internal_error().data("model switching not configured"));
};
let available_models = self.available_models_snapshot();
if !available_models.iter().any(|m| m == value) {
return Err(acp::Error::invalid_request().data("model not in allowed list"));
}
let Some(new_provider) = factory(value) else {
return Err(acp::Error::invalid_request().data("unknown model"));
};
*entry
.provider_override
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner) = Some(new_provider);
value.clone_into(&mut entry.current_model.borrow_mut());
tracing::debug!(session_id = %session_id, model = %value, "ACP model switched");
}
"thinking" => {
let enabled = match value {
"on" => true,
"off" => false,
_ => {
return Err(
acp::Error::invalid_request().data("thinking value must be on or off")
);
}
};
entry.thinking_enabled.set(enabled);
tracing::debug!(session_id = %session_id, thinking = %enabled, "ACP thinking toggled");
}
"auto_approve" => {
if !["suggest", "auto-edit", "full-auto"].contains(&value) {
return Err(acp::Error::invalid_request()
.data("auto_approve must be suggest, auto-edit, or full-auto"));
}
value.clone_into(&mut entry.auto_approve_level.borrow_mut());
tracing::debug!(session_id = %session_id, auto_approve = %value, "ACP auto-approve level changed");
}
_ => {
return Err(acp::Error::invalid_request().data("unknown config_id"));
}
}
Ok(())
}
    /// Handles a slash command extracted from the prompt (`/help`, `/model`,
    /// `/mode`, `/review`, `/clear`), sends the textual reply back to the
    /// client as an agent message chunk, and ends the turn.
    ///
    /// # Errors
    /// `invalid_request` for unknown commands/modes or a missing session.
    async fn handle_slash_command(
        &self,
        session_id: &acp::SessionId,
        text: &str,
    ) -> acp::Result<acp::PromptResponse> {
        // "/cmd rest-of-line": everything after the first space is the argument.
        let mut parts = text.splitn(2, ' ');
        let cmd = parts.next().unwrap_or("").trim();
        let arg = parts.next().unwrap_or("").trim();
        let reply = match cmd {
            "/help" => "Available commands:\n\
            /help — show this message\n\
            /model <id> — switch the active model\n\
            /mode <code|architect|ask> — switch session mode\n\
            /clear — clear session history\n\
            /compact — summarize and compact context\n\
            /review [path] — review recent changes (read-only)"
                .to_owned(),
            "/model" => self.handle_model_command(session_id, arg)?,
            // /review replies through its own path, so return directly.
            "/review" => {
                return self.handle_review_command(session_id, arg);
            }
            "/mode" => {
                let valid_ids: &[&str] = &["code", "architect", "ask"];
                if !valid_ids.contains(&arg) {
                    return Err(acp::Error::invalid_request().data(format!("unknown mode: {arg}")));
                }
                // Scoped borrow: released before the await on send_notification.
                {
                    let sessions = self.sessions.borrow();
                    let entry = sessions
                        .get(session_id)
                        .ok_or_else(|| acp::Error::invalid_request().data("session not found"))?;
                    *entry.current_mode.borrow_mut() = acp::SessionModeId::new(arg);
                }
                let update = acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate::new(
                    acp::SessionModeId::new(arg),
                ));
                let notification = acp::SessionNotification::new(session_id.clone(), update);
                if let Err(e) = self.send_notification(notification).await {
                    tracing::warn!(error = %e, "failed to send current_mode_update from /mode");
                }
                format!("Switched to mode: {arg}")
            }
            "/clear" => {
                // Best-effort: wipe and recreate the persisted history off-task.
                if let Some(ref store) = self.store {
                    let sid = session_id.to_string();
                    let store = store.clone();
                    tokio::task::spawn_local(async move {
                        if let Err(e) = store.delete_acp_session(&sid).await {
                            tracing::warn!(error = %e, "failed to clear session history");
                        }
                        if let Err(e) = store.create_acp_session(&sid).await {
                            tracing::warn!(error = %e, "failed to recreate session after clear");
                        }
                    });
                }
                // Forward "/clear" to the live agent loop as well (presumably
                // resets its in-memory context — handled on the channel side).
                let sessions = self.sessions.borrow();
                if let Some(entry) = sessions.get(session_id) {
                    let _ = entry.input_tx.try_send(ChannelMessage {
                        text: "/clear".to_owned(),
                        attachments: vec![],
                    });
                }
                "Session history cleared.".to_owned()
            }
            _ => {
                return Err(acp::Error::invalid_request().data(format!("unknown command: {cmd}")));
            }
        };
        // Echo the command result to the client as a normal agent message.
        let update =
            acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(reply.clone().into()));
        let notification = acp::SessionNotification::new(session_id.clone(), update);
        if let Err(e) = self.send_notification(notification).await {
            tracing::warn!(error = %e, "failed to send command reply");
        }
        Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
    }
fn handle_review_command(
&self,
session_id: &acp::SessionId,
arg: &str,
) -> acp::Result<acp::PromptResponse> {
if !arg.is_empty() {
let valid = arg
.chars()
.all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '.' | '/' | ' ' | '-'));
if !valid || arg.len() > 512 {
return Err(acp::Error::invalid_request()
.data("invalid path argument: only alphanumeric, _, ., /, space, - allowed (max 512 chars)"));
}
}
let review_prompt = if arg.is_empty() {
"Review the recent changes in this workspace. Show a plain-text diff summary. \
Use only read_file and list_directory tools. Do not execute any commands or \
write any files."
.to_owned()
} else {
format!(
"Review the following file or path: {arg}. Show a plain-text diff summary. \
Use only read_file and list_directory tools. Do not execute any commands or \
write any files."
)
};
let sessions = self.sessions.borrow();
let entry = sessions
.get(session_id)
.ok_or_else(|| acp::Error::invalid_request().data("session not found"))?;
if entry
.input_tx
.try_send(ChannelMessage {
text: review_prompt,
attachments: vec![],
})
.is_err()
{
tracing::warn!(%session_id, "failed to forward /review to agent input");
}
drop(sessions);
Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
}
fn resolve_model_fuzzy(&self, query: &str) -> acp::Result<String> {
let available_models = self.available_models_snapshot();
if available_models.iter().any(|m| m == query) {
return Ok(query.to_owned());
}
let tokens: Vec<String> = query
.to_lowercase()
.split_whitespace()
.map(String::from)
.collect();
let candidates: Vec<&String> = available_models
.iter()
.filter(|m| {
let lower = m.to_lowercase();
tokens.iter().all(|t| lower.contains(t.as_str()))
})
.collect();
match candidates.len() {
0 => {
let models = available_models.join(", ");
Err(acp::Error::invalid_request()
.data(format!("no matching model found. Available: {models}")))
}
1 => Ok(candidates[0].clone()),
_ => {
let names: Vec<&str> = candidates.iter().map(|s| s.as_str()).collect();
Err(acp::Error::invalid_request()
.data(format!("ambiguous model, candidates: {}", names.join(", "))))
}
}
}
fn handle_model_command(&self, session_id: &acp::SessionId, arg: &str) -> acp::Result<String> {
let available_models = self.available_models_snapshot();
if arg.is_empty() {
let models = available_models.join(", ");
return Ok(format!("Available models: {models}"));
}
let Some(ref factory) = self.provider_factory else {
return Err(acp::Error::internal_error().data("model switching not configured"));
};
let resolved = self.resolve_model_fuzzy(arg)?;
let Some(new_provider) = factory(&resolved) else {
return Err(acp::Error::invalid_request().data("unknown model"));
};
let sessions = self.sessions.borrow();
let entry = sessions
.get(session_id)
.ok_or_else(|| acp::Error::internal_error().data("session not found"))?;
*entry
.provider_override
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner) = Some(new_provider);
resolved.clone_into(&mut entry.current_model.borrow_mut());
Ok(format!("Switched to model: {resolved}"))
}
async fn collect_prompt_content(
&self,
blocks: &[acp::ContentBlock],
session_cwd: &std::path::Path,
) -> acp::Result<(String, Vec<zeph_core::channel::Attachment>)> {
let mut text = String::new();
let mut attachments = Vec::new();
for block in blocks {
match block {
acp::ContentBlock::Text(t) => {
if !text.is_empty() {
text.push('\n');
}
text.push_str(&t.text);
}
acp::ContentBlock::Image(img) => {
if !SUPPORTED_IMAGE_MIMES.contains(&img.mime_type.as_str()) {
tracing::debug!(mime_type = %img.mime_type, "unsupported image MIME type in ACP prompt, skipping");
} else if img.data.len() > MAX_IMAGE_BASE64_BYTES {
tracing::warn!(
size = img.data.len(),
max = MAX_IMAGE_BASE64_BYTES,
"image base64 data exceeds size limit, skipping"
);
} else {
use base64::Engine as _;
match base64::engine::general_purpose::STANDARD.decode(&img.data) {
Ok(bytes) => {
attachments.push(zeph_core::channel::Attachment {
kind: zeph_core::channel::AttachmentKind::Image,
data: bytes,
filename: Some(format!(
"image.{}",
mime_to_ext(&img.mime_type)
)),
});
}
Err(e) => {
tracing::debug!(error = %e, "failed to decode image base64, skipping");
}
}
}
}
acp::ContentBlock::Resource(embedded) => {
if let acp::EmbeddedResourceResource::TextResourceContents(res) =
&embedded.resource
{
if !text.is_empty() {
text.push('\n');
}
if res
.mime_type
.as_deref()
.is_some_and(|m| m == DIAGNOSTICS_MIME_TYPE)
{
format_diagnostics_block(&res.text, &mut text);
} else if res.mime_type.is_some()
&& res.mime_type.as_deref() != Some("text/plain")
{
tracing::debug!(mime_type = ?res.mime_type, uri = %res.uri, "unknown resource mime type — skipping");
} else {
text.push_str("<resource name=\"");
text.push_str(&res.uri.replace('"', """));
text.push_str("\">");
text.push_str(&res.text);
text.push_str("</resource>");
}
}
}
acp::ContentBlock::Audio(_) => {
tracing::warn!("unsupported content block: Audio — skipping");
}
acp::ContentBlock::ResourceLink(link) => {
match resolve_resource_link(link, session_cwd).await {
Ok(content) => {
let escaped_uri = xml_escape(&link.uri);
let escaped_content = xml_escape(&content);
if !text.is_empty() {
text.push('\n');
}
text.push_str("<resource uri=\"");
text.push_str(&escaped_uri);
text.push_str("\">");
text.push_str(&escaped_content);
text.push_str("</resource>");
}
Err(e) => {
tracing::warn!(uri = %link.uri, error = %e, "ResourceLink resolution failed — skipping");
}
}
}
&_ => {
tracing::warn!("unsupported content block: unknown — skipping");
}
}
}
if text.len() > MAX_PROMPT_BYTES {
return Err(acp::Error::invalid_request().data("prompt too large"));
}
Ok((text, attachments))
}
    /// Pumps events from the agent loop's output channel into ACP session
    /// notifications until a `Flush` event, channel close, or (when provided)
    /// a cancellation signal fires.
    ///
    /// Returns `(cancelled, stop_hint, rx)` — the receiver is handed back so
    /// the caller can park it on the session entry for the next prompt.
    async fn drain_agent_events(
        &self,
        session_id: &acp::SessionId,
        output_rx: tokio::sync::mpsc::Receiver<LoopbackEvent>,
        cancel_signal: Option<std::sync::Arc<tokio::sync::Notify>>,
    ) -> (
        bool,
        Option<StopHint>,
        tokio::sync::mpsc::Receiver<LoopbackEvent>,
    ) {
        let mut rx = output_rx;
        let mut cancelled = false;
        let mut stop_hint: Option<StopHint> = None;
        loop {
            // `biased` makes cancellation win over a ready event, so a cancel
            // is observed promptly even when the channel stays busy.
            let event = if let Some(ref signal) = cancel_signal {
                tokio::select! {
                    biased;
                    () = signal.notified() => { cancelled = true; break; }
                    ev = rx.recv() => ev,
                }
            } else {
                rx.recv().await
            };
            let Some(event) = event else { break };
            // Stop hints are recorded but do not end the drain; the loop keeps
            // running until Flush/close so trailing output is not lost.
            if let LoopbackEvent::Stop(hint) = event {
                stop_hint = Some(hint);
                continue;
            }
            let is_flush = matches!(event, LoopbackEvent::Flush);
            // Remember which terminal to release once the updates are sent.
            let pending_terminal_release = if let LoopbackEvent::ToolOutput(ref data) = event {
                data.terminal_id.clone()
            } else {
                None
            };
            for update in loopback_event_to_updates(event) {
                // Persist each update off-task (best-effort) for replay/resume.
                if let Some(ref store) = self.store {
                    let sid = session_id.to_string();
                    let (event_type, payload) = session_update_to_event(&update);
                    let store = store.clone();
                    tokio::task::spawn_local(async move {
                        if let Err(e) = store.save_acp_event(&sid, event_type, &payload).await {
                            tracing::warn!(error = %e, "failed to persist session event");
                        }
                    });
                }
                let notification = acp::SessionNotification::new(session_id.clone(), update);
                if let Err(e) = self.send_notification(notification).await {
                    tracing::warn!(error = %e, "failed to send notification");
                    break;
                }
            }
            if let Some(terminal_id) = pending_terminal_release
                && let Some(entry) = self.sessions.borrow().get(session_id)
                && let Some(ref executor) = entry.shell_executor
            {
                executor.release_terminal(terminal_id);
            }
            // Flush marks the end of this prompt's output stream.
            if is_flush {
                break;
            }
        }
        (cancelled, stop_hint, rx)
    }
async fn fork_conversation(
&self,
source_id: &acp::SessionId,
new_id: &acp::SessionId,
) -> acp::Result<Option<ConversationId>> {
let Some(s) = &self.store else {
return Ok(None);
};
let source_events = s
.load_acp_events(&source_id.to_string())
.await
.map_err(|e| {
tracing::warn!(error = %e, "failed to load ACP session events for fork");
acp::Error::internal_error().data("internal error")
})?;
let new_id_str = new_id.to_string();
let pairs: Vec<(&str, &str)> = source_events
.iter()
.map(|ev| (ev.event_type.as_str(), ev.payload.as_str()))
.collect();
match s.create_conversation().await {
Ok(forked_cid) => {
let forked_from_cid = s
.get_acp_session_conversation_id(&source_id.to_string())
.await
.unwrap_or(None);
if let Err(e) = s
.create_acp_session_with_conversation(&new_id_str, forked_cid)
.await
{
tracing::warn!(error = %e, "failed to persist forked ACP session mapping");
}
if let Err(e) = s.import_acp_events(&new_id_str, &pairs).await {
tracing::warn!(error = %e, "failed to import events for forked session");
}
if let Some(src_cid) = forked_from_cid
&& let Err(e) = s.copy_conversation(src_cid, forked_cid).await
{
tracing::warn!(error = %e, "failed to copy conversation for forked session");
}
Ok(Some(forked_cid))
}
Err(e) => {
tracing::warn!(error = %e, "failed to create conversation for forked session; history will not be copied");
if let Err(e2) = s.create_acp_session(&new_id_str).await {
tracing::warn!(error = %e2, "failed to persist forked ACP session");
}
if let Err(e2) = s.import_acp_events(&new_id_str, &pairs).await {
tracing::warn!(error = %e2, "failed to import events for forked session");
}
Ok(None)
}
}
}
fn maybe_generate_session_title(&self, session_id: &acp::SessionId, user_text: &str) {
let should_generate = self
.sessions
.borrow()
.get(session_id)
.is_some_and(|e| !e.first_prompt_done.get());
if !should_generate {
return;
}
if let Some(entry) = self.sessions.borrow().get(session_id) {
entry.first_prompt_done.set(true);
}
let current_model = self
.sessions
.borrow()
.get(session_id)
.map(|entry| entry.current_model.borrow().clone())
.unwrap_or_default();
if let Some(ref factory) = self.provider_factory
&& !current_model.is_empty()
&& let Some(provider) = factory(¤t_model)
{
let user_text = user_text.to_owned();
let sid = session_id.clone();
let store = self.store.clone();
let notify_tx = self.notify_tx.clone();
let title_max_chars = self.title_max_chars;
let sessions_for_title = Rc::clone(&self.sessions);
tokio::task::spawn_local(async move {
let prompt = format!(
"Generate a concise 5-7 word title for a conversation that starts \
with: {user_text}\nRespond with only the title, no quotes."
);
let messages = vec![zeph_llm::provider::Message::from_legacy(
zeph_llm::provider::Role::User,
&prompt,
)];
let sid_prefix = &sid.to_string()[..8.min(sid.to_string().len())];
let fallback_title = format!("Session {sid_prefix}");
let title = match tokio::time::timeout(
std::time::Duration::from_secs(15),
provider.chat(&messages),
)
.await
{
Ok(Ok(t)) => truncate_to_chars(t.trim(), title_max_chars),
Ok(Err(e)) => {
tracing::debug!(error = %e, "title generation LLM call failed");
fallback_title
}
Err(_) => {
tracing::debug!("title generation timed out");
fallback_title
}
};
if let Some(ref store) = store {
let _ = store.update_session_title(&sid.to_string(), &title).await;
}
if let Some(e) = sessions_for_title.borrow().get(&sid) {
*e.title.borrow_mut() = Some(title.clone());
}
let update = acp::SessionUpdate::SessionInfoUpdate(
acp::SessionInfoUpdate::new().title(title),
);
let notification = acp::SessionNotification::new(sid, update);
let (tx, _rx) = oneshot::channel();
notify_tx.send((notification, tx)).ok();
});
}
}
fn make_session_entry(
handle: LoopbackHandle,
initial_model: String,
cwd: PathBuf,
shell_executor: Option<AcpShellExecutor>,
provider_override: Arc<std::sync::RwLock<Option<AnyProvider>>>,
) -> SessionEntry {
SessionEntry {
input_tx: handle.input_tx,
output_rx: RefCell::new(Some(handle.output_rx)),
cancel_signal: handle.cancel_signal,
last_active: std::cell::Cell::new(std::time::Instant::now()),
created_at: chrono::Utc::now(),
working_dir: RefCell::new(Some(cwd)),
provider_override,
current_model: RefCell::new(initial_model),
current_mode: RefCell::new(acp::SessionModeId::new(DEFAULT_MODE_ID)),
first_prompt_done: std::cell::Cell::new(false),
title: RefCell::new(None),
thinking_enabled: std::cell::Cell::new(false),
auto_approve_level: RefCell::new("suggest".to_owned()),
shell_executor,
}
}
    /// Re-emits persisted session events to the client as notifications so a
    /// resumed session can rebuild its transcript. Undecodable tool calls and
    /// unknown event types are skipped; sending stops on the first transport
    /// error.
    async fn replay_session_events(
        &self,
        session_id: &acp::SessionId,
        events: Vec<zeph_memory::store::AcpSessionEvent>,
    ) {
        for ev in events {
            let update = match ev.event_type.as_str() {
                "user_message" => {
                    acp::SessionUpdate::UserMessageChunk(acp::ContentChunk::new(ev.payload.into()))
                }
                "agent_message" => {
                    acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(ev.payload.into()))
                }
                "agent_thought" => {
                    acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(ev.payload.into()))
                }
                // Tool calls were persisted as serialized JSON payloads.
                "tool_call" => match serde_json::from_str::<acp::ToolCall>(&ev.payload) {
                    Ok(tc) => acp::SessionUpdate::ToolCall(tc),
                    Err(e) => {
                        tracing::warn!(error = %e, "failed to deserialize tool call event during replay");
                        continue;
                    }
                },
                other => {
                    tracing::debug!(
                        event_type = other,
                        "skipping unknown event type during replay"
                    );
                    continue;
                }
            };
            let notification = acp::SessionNotification::new(session_id.clone(), update);
            if let Err(e) = self.send_notification(notification).await {
                tracing::warn!(error = %e, "failed to replay notification");
                break;
            }
        }
    }
async fn create_session_conversation(
&self,
session_id: &acp::SessionId,
) -> Option<ConversationId> {
let store = self.store.as_ref()?;
let sid = session_id.to_string();
match store.create_conversation().await {
Ok(cid) => {
if let Err(e) = store.create_acp_session_with_conversation(&sid, cid).await {
tracing::warn!(error = %e, "failed to persist ACP session mapping; history may not survive restart");
}
Some(cid)
}
Err(e) => {
tracing::warn!(error = %e, "failed to create conversation for ACP session; session will have no persistent history");
if let Err(e2) = store.create_acp_session(&sid).await {
tracing::warn!(error = %e2, "failed to persist ACP session");
}
None
}
}
}
fn send_commands_update_nowait(&self, session_id: acp::SessionId) {
let cmds_update = acp::SessionUpdate::AvailableCommandsUpdate(
acp::AvailableCommandsUpdate::new(build_available_commands()),
);
let (tx, _rx) = oneshot::channel();
self.notify_tx
.send((acp::SessionNotification::new(session_id, cmds_update), tx))
.ok();
}
async fn ext_method_mcp(&self, args: &acp::ExtRequest) -> acp::Result<acp::ExtResponse> {
let method = args.method.as_ref();
match method {
"_agent/mcp/list" => {
let Some(ref manager) = self.mcp_manager else {
return Err(acp::Error::internal_error().data("MCP manager not configured"));
};
let servers = manager.list_servers().await;
let json = serde_json::to_string(&servers).map_err(|e| {
tracing::error!(error = %e, "failed to serialize MCP server list");
acp::Error::internal_error().data("internal error")
})?;
let raw: Box<serde_json::value::RawValue> =
serde_json::value::RawValue::from_string(json).map_err(|e| {
tracing::error!(error = %e, "failed to build MCP list response");
acp::Error::internal_error().data("internal error")
})?;
Ok(acp::ExtResponse::new(raw.into()))
}
"_agent/mcp/add" => {
let Some(ref manager) = self.mcp_manager else {
return Err(acp::Error::internal_error().data("MCP manager not configured"));
};
let entry: ServerEntry = serde_json::from_str(args.params.get())
.map_err(|e| acp::Error::invalid_request().data(e.to_string()))?;
let tools = manager.add_server(&entry).await.map_err(|e| {
tracing::error!(error = %e, "failed to add MCP server");
acp::Error::internal_error().data("internal error")
})?;
let json = serde_json::json!({ "added": entry.id, "tools": tools.len() });
let raw =
serde_json::value::RawValue::from_string(json.to_string()).map_err(|e| {
tracing::error!(error = %e, "failed to build MCP add response");
acp::Error::internal_error().data("internal error")
})?;
Ok(acp::ExtResponse::new(raw.into()))
}
"_agent/mcp/remove" => {
let Some(ref manager) = self.mcp_manager else {
return Err(acp::Error::internal_error().data("MCP manager not configured"));
};
let params: McpRemoveParams = serde_json::from_str(args.params.get())
.map_err(|e| acp::Error::invalid_request().data(e.to_string()))?;
manager.remove_server(¶ms.id).await.map_err(|e| {
tracing::error!(error = %e, "failed to remove MCP server");
acp::Error::internal_error().data("internal error")
})?;
let raw = serde_json::value::RawValue::from_string(
serde_json::json!({ "removed": params.id }).to_string(),
)
.map_err(|e| {
tracing::error!(error = %e, "failed to build MCP remove response");
acp::Error::internal_error().data("internal error")
})?;
Ok(acp::ExtResponse::new(raw.into()))
}
_ => Ok(acp::ExtResponse::new(
serde_json::value::RawValue::NULL.to_owned().into(),
)),
}
}
}
pub(super) mod helpers;
use helpers::{
DEFAULT_MODE_ID, DIAGNOSTICS_MIME_TYPE, build_available_commands, build_config_options,
build_mode_state, format_diagnostics_block, loopback_event_to_updates, mime_to_ext, model_meta,
session_update_to_event, xml_escape,
};
#[cfg(test)]
mod tests;