use std::sync::Arc;
use agent_client_protocol as acp;
use futures::{AsyncWrite, AsyncWriteExt as _};
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
use zeph_memory::store::SqliteStore;
use crate::agent::{AgentSpawner, ZephAcpAgentState, run_agent};
use crate::error::AcpError;
use crate::transport::{AcpServerConfig, ReadyNotification};
/// Emits a `zeph/ready` JSON-RPC 2.0 notification on `writer`.
///
/// The notification parameters carry the `version` and `pid` from `ready`,
/// plus `log_file` when one is set. The frame is serialized as a single JSON
/// line, followed by a `\n` terminator, and the writer is flushed afterwards.
///
/// # Errors
///
/// Returns [`AcpError::Transport`] (carrying the underlying error's message)
/// if serialization, either write, or the flush fails.
async fn write_ready_notification<W>(
    writer: &mut W,
    ready: &ReadyNotification,
) -> Result<(), AcpError>
where
    W: AsyncWrite + Unpin,
{
    // Every failure in this function is reported the same way: as a transport
    // error holding the source error's display text.
    fn transport_err<E: std::fmt::Display>(err: E) -> AcpError {
        AcpError::Transport(err.to_string())
    }

    let mut params = serde_json::Map::new();
    params.insert(
        "version".into(),
        serde_json::Value::String(ready.version.clone()),
    );
    params.insert("pid".into(), serde_json::Value::from(ready.pid));
    // `log_file` is omitted from the params entirely when it is not set.
    if let Some(path) = ready.log_file.as_ref() {
        params.insert("log_file".into(), serde_json::Value::String(path.clone()));
    }

    let notification = serde_json::json!({
        "jsonrpc": "2.0",
        "method": "zeph/ready",
        "params": params,
    });

    let encoded = serde_json::to_vec(&notification).map_err(transport_err)?;
    writer.write_all(&encoded).await.map_err(transport_err)?;
    writer.write_all(b"\n").await.map_err(transport_err)?;
    writer.flush().await.map_err(transport_err)
}
pub(crate) async fn build_agent_state(
spawner: AgentSpawner,
server_config: AcpServerConfig,
) -> Arc<ZephAcpAgentState> {
let mut agent = ZephAcpAgentState::new(
spawner,
server_config.max_sessions,
server_config.session_idle_timeout_secs,
server_config.permission_file,
)
.with_agent_info(server_config.agent_name, server_config.agent_version)
.with_title_max_chars(server_config.title_max_chars)
.with_max_history(server_config.max_history);
if let Some(ref path) = server_config.sqlite_path {
match SqliteStore::new(path).await {
Ok(store) => agent = agent.with_store(store),
Err(e) => tracing::warn!(error = %e, "failed to open ACP SQLite store"),
}
}
if let Some(factory) = server_config.provider_factory {
agent = agent.with_provider_factory(factory, server_config.available_models);
}
if let Some(manager) = server_config.mcp_manager {
agent = agent.with_mcp_manager(manager);
}
if !server_config.project_rules.is_empty() {
agent = agent.with_project_rules(server_config.project_rules);
}
if !server_config.additional_directories.is_empty() {
agent = agent.with_additional_directories(server_config.additional_directories);
}
if !server_config.auth_methods.is_empty() {
agent = agent.with_auth_methods(server_config.auth_methods);
}
agent = agent.with_message_ids_enabled(server_config.message_ids_enabled);
let state = Arc::new(agent);
state.start_idle_reaper();
state
}
/// Serves the ACP agent over the process's stdin/stdout.
///
/// If `server_config.ready_notification` is set, a `zeph/ready` notification
/// is written to stdout and an "ACP server ready" info event is logged before
/// the agent state is built. The agent is then run inside a
/// [`tokio::task::LocalSet`] until it finishes.
///
/// # Errors
///
/// Returns [`AcpError`] if writing the ready notification fails, or
/// [`AcpError::Transport`] (carrying the error's message) if the agent run
/// ends with an error.
pub async fn serve_stdio(
    spawner: AgentSpawner,
    server_config: AcpServerConfig,
) -> Result<(), AcpError> {
    let mut outbound = tokio::io::stdout().compat_write();

    if let Some(notice) = &server_config.ready_notification {
        write_ready_notification(&mut outbound, notice).await?;

        // Logged only after the notification has actually been written.
        let log_target = notice.log_file.as_deref().unwrap_or("<disabled>");
        tracing::info!(
            transport = "stdio",
            pid = notice.pid,
            version = %notice.version,
            log_file = log_target,
            "ACP server ready"
        );
    }

    let state = build_agent_state(spawner, server_config).await;

    let local = tokio::task::LocalSet::new();
    let streams = acp::ByteStreams::new(outbound, tokio::io::stdin().compat());
    local
        .run_until(run_agent(state, streams))
        .await
        .map_err(|e| AcpError::Transport(e.to_string()))
}
/// Serves the ACP agent over a caller-supplied `writer` / `reader` pair.
///
/// Builds the agent state from `spawner` and `server_config`, then runs the
/// agent over the given byte streams inside a [`tokio::task::LocalSet`] until
/// it finishes. Unlike [`serve_stdio`], this function does not write a ready
/// notification itself.
///
/// # Errors
///
/// Returns [`AcpError::Transport`] (carrying the error's message) if the
/// agent run ends with an error.
pub async fn serve_connection<W, R>(
    spawner: AgentSpawner,
    server_config: AcpServerConfig,
    writer: W,
    reader: R,
) -> Result<(), AcpError>
where
    W: futures::AsyncWrite + Unpin + Send + 'static,
    R: futures::AsyncRead + Unpin + Send + 'static,
{
    let state = build_agent_state(spawner, server_config).await;

    let local = tokio::task::LocalSet::new();
    let streams = acp::ByteStreams::new(writer, reader);
    local
        .run_until(run_agent(state, streams))
        .await
        .map_err(|e| AcpError::Transport(e.to_string()))
}