#[cfg(feature = "acp-http")]
use std::sync::Arc;
#[cfg(feature = "acp-http")]
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
#[cfg(feature = "acp-http")]
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
#[cfg(feature = "acp-http")]
use axum::extract::State;
#[cfg(feature = "acp-http")]
use axum::http::{HeaderMap, StatusCode};
#[cfg(feature = "acp-http")]
use axum::response::IntoResponse;
#[cfg(feature = "acp-http")]
use axum::response::sse::{Event, KeepAlive, Sse};
#[cfg(feature = "acp-http")]
use dashmap::DashMap;
#[cfg(feature = "acp-http")]
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, DuplexStream};
#[cfg(feature = "acp-http")]
use tokio::sync::{Mutex, broadcast};
#[cfg(feature = "acp-http")]
use axum::Json;
#[cfg(feature = "acp-http")]
use axum::extract::Path;
#[cfg(feature = "acp-http")]
use serde::Serialize;
#[cfg(feature = "acp-http")]
use zeph_memory::store::{AcpSessionInfo, SqliteStore};
#[cfg(feature = "acp-http")]
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
#[cfg(feature = "acp-http")]
use crate::agent::SendAgentSpawner;
#[cfg(feature = "acp-http")]
use crate::transport::AcpServerConfig;
/// Buffer size, in bytes (64 KiB), passed to each `tokio::io::duplex` call in
/// `spawn_agent_connection`.
#[cfg(feature = "acp-http")]
const BRIDGE_BUFFER_SIZE: usize = 64 * 1024;
#[cfg(feature = "acp-http")]
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time earlier than the epoch.
fn now_secs() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |since_epoch| since_epoch.as_secs())
}
/// Per-session handle stored in `AcpHttpState::connections`.
///
/// Bundles the stream used to send data to the agent, the broadcast channel
/// on which the agent's output lines are published, and idle-tracking state.
#[cfg(feature = "acp-http")]
pub(crate) struct ConnectionHandle {
/// Stream that `post_handler` writes request bodies (plus a trailing newline) to.
pub(crate) writer: Arc<Mutex<DuplexStream>>,
/// Broadcast sender; handlers call `subscribe()` on it to receive agent output lines.
pub(crate) output_tx: broadcast::Sender<String>,
/// Time of the last `touch()`, as seconds since the Unix epoch.
pub(crate) last_activity: AtomicU64,
/// Number of idle seconds after which `is_expired()` reports `true`.
pub(crate) idle_timeout_secs: u64,
}
#[cfg(feature = "acp-http")]
impl ConnectionHandle {
    /// Reports whether more than `idle_timeout_secs` seconds have elapsed
    /// since the last recorded activity.
    fn is_expired(&self) -> bool {
        let last_seen = self.last_activity.load(Ordering::Relaxed);
        // Saturating: a clock that moved backwards counts as zero idle time.
        let idle_for = now_secs().saturating_sub(last_seen);
        idle_for > self.idle_timeout_secs
    }

    /// Records "now" as the most recent activity on this connection.
    fn touch(&self) {
        let now = now_secs();
        self.last_activity.store(now, Ordering::Relaxed);
    }
}
/// Serializable summary of one ACP session, as returned by `list_sessions_handler`.
///
/// Every field is copied verbatim from the corresponding `AcpSessionInfo` field.
#[cfg(feature = "acp-http")]
#[derive(Serialize)]
pub struct SessionSummary {
/// Session identifier.
pub id: String,
/// Optional session title.
pub title: Option<String>,
/// Creation timestamp, passed through as the string the store provides.
pub created_at: String,
/// Last-update timestamp, passed through as the string the store provides.
pub updated_at: String,
/// Message count reported by the store for this session.
pub message_count: i64,
}
#[cfg(feature = "acp-http")]
impl From<AcpSessionInfo> for SessionSummary {
    /// Moves the summary fields out of a stored session record.
    fn from(info: AcpSessionInfo) -> Self {
        let AcpSessionInfo {
            id,
            title,
            created_at,
            updated_at,
            message_count,
            ..
        } = info;
        Self {
            id,
            title,
            created_at,
            updated_at,
            message_count,
        }
    }
}
/// Serializable form of one stored session event, as returned by
/// `session_messages_handler`.
#[cfg(feature = "acp-http")]
#[derive(Serialize)]
pub struct SessionEventDto {
/// Event type string, copied from the stored event.
pub event_type: String,
/// Event payload string, copied from the stored event without interpretation.
pub payload: String,
/// Creation timestamp string, copied from the stored event.
pub created_at: String,
}
/// JSON body produced by `health_handler`.
#[cfg(feature = "acp-http")]
#[derive(Serialize)]
pub struct HealthStatus {
/// `"ok"` once the server is marked ready, `"starting"` before that.
pub status: &'static str,
/// Copy of `AcpServerConfig::agent_version`.
pub version: String,
/// Whole seconds elapsed since the `AcpHttpState` was constructed.
pub uptime_secs: u64,
}
/// Shared state handed to the HTTP handlers in this file.
///
/// All shared parts are behind `Arc`, so clones observe the same connections,
/// counters and readiness flag.
#[cfg(feature = "acp-http")]
#[derive(Clone)]
pub struct AcpHttpState {
/// Live connections keyed by session id (a UUID string).
pub(crate) connections: Arc<DashMap<String, Arc<ConnectionHandle>>>,
/// Spawner handed to each new agent connection.
pub spawner: SendAgentSpawner,
/// Server configuration (session limit, idle timeout, agent version, ...).
pub server_config: Arc<AcpServerConfig>,
/// Count of slots reserved via `try_reserve_ws_slot`
/// (presumably WebSocket sessions; confirm against the caller).
pub(crate) active_ws: Arc<AtomicUsize>,
/// Optional persistent store; the session-history handlers answer 503 when it is `None`.
pub store: Option<Arc<SqliteStore>>,
/// Instant captured at construction; used to compute the reported uptime.
pub(crate) started_at: Instant,
/// Readiness flag; handlers answer 503 while it is `false`.
pub(crate) ready: Arc<AtomicBool>,
}
#[cfg(feature = "acp-http")]
impl AcpHttpState {
    /// Creates state with no connections, no store, and the readiness flag cleared.
    pub fn new(spawner: SendAgentSpawner, server_config: AcpServerConfig) -> Self {
        let connections = Arc::new(DashMap::new());
        let active_ws = Arc::new(AtomicUsize::new(0));
        let ready = Arc::new(AtomicBool::new(false));
        Self {
            connections,
            spawner,
            server_config: Arc::new(server_config),
            active_ws,
            store: None,
            started_at: Instant::now(),
            ready,
        }
    }

    /// Attaches a persistent store used by the session-history handlers.
    #[must_use]
    pub fn with_store(mut self, store: SqliteStore) -> Self {
        let shared = Arc::new(store);
        self.store = Some(shared);
        self
    }

    /// Sets the readiness flag to `ready` and returns the state.
    #[must_use]
    pub fn with_ready(self, ready: bool) -> Self {
        let flag = &self.ready;
        flag.store(ready, Ordering::Release);
        self
    }

    /// Marks the server as ready so handlers stop answering 503.
    pub fn mark_ready(&self) {
        let flag = &self.ready;
        flag.store(true, Ordering::Release);
    }

    /// Atomically claims one slot if fewer than `max_sessions` are in use.
    ///
    /// Returns `true` when a slot was reserved, `false` when the limit is reached.
    pub(crate) fn try_reserve_ws_slot(&self) -> bool {
        let limit = self.server_config.max_sessions;
        self.active_ws
            .fetch_update(Ordering::AcqRel, Ordering::Relaxed, |in_use| {
                // Lazily computed so the increment only happens below the limit.
                (in_use < limit).then(|| in_use + 1)
            })
            .is_ok()
    }

    /// Returns a slot previously claimed with `try_reserve_ws_slot`.
    pub(crate) fn release_ws_slot(&self) {
        let counter = &self.active_ws;
        counter.fetch_sub(1, Ordering::AcqRel);
    }

    /// Drops the connection registered under `id`, if any.
    pub(crate) fn remove_connection(&self, id: &str) {
        let sessions = &self.connections;
        sessions.remove(id);
    }

    /// Spawns a background task that, on a one-minute interval, removes every
    /// connection whose idle timeout has expired.
    ///
    /// The task loops forever and keeps its own reference to the connection map.
    pub fn start_reaper(&self) {
        let sessions = Arc::clone(&self.connections);
        let sweep = async move {
            let mut ticker = tokio::time::interval(Duration::from_mins(1));
            loop {
                ticker.tick().await;
                sessions.retain(|_, conn| !conn.is_expired());
            }
        };
        tokio::spawn(sweep);
    }
}
/// Health endpoint: reports readiness, agent version and uptime as JSON.
///
/// Responds `200` with status `"ok"` once the server is ready, otherwise
/// `503` with status `"starting"`.
#[cfg(feature = "acp-http")]
pub async fn health_handler(State(state): State<AcpHttpState>) -> impl IntoResponse {
    let (code, label) = if state.ready.load(Ordering::Acquire) {
        (StatusCode::OK, "ok")
    } else {
        (StatusCode::SERVICE_UNAVAILABLE, "starting")
    };
    let version = state.server_config.agent_version.clone();
    let uptime_secs = state.started_at.elapsed().as_secs();
    let payload = HealthStatus {
        status: label,
        version,
        uptime_secs,
    };
    (code, Json(payload))
}
/// Starts an agent on a dedicated OS thread and returns the client-side ends
/// of the two in-memory pipes connected to it.
///
/// The thread runs a current-thread tokio runtime with a `LocalSet` and drives
/// `serve_connection` until it finishes; a failure is logged, not returned.
///
/// Returns `(reader, writer)`: read agent output from the first stream and
/// write agent input to the second.
#[cfg(feature = "acp-http")]
pub(crate) fn spawn_agent_connection(
    spawner: crate::agent::SendAgentSpawner,
    server_config: AcpServerConfig,
) -> (DuplexStream, DuplexStream) {
    // One pipe per direction: client -> agent and agent -> client.
    let (client_w, agent_r) = tokio::io::duplex(BRIDGE_BUFFER_SIZE);
    let (agent_w, client_r) = tokio::io::duplex(BRIDGE_BUFFER_SIZE);

    let run_agent = move || {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("tokio current-thread runtime for ACP agent");
        let local = tokio::task::LocalSet::new();

        let serve = async move {
            let outbound = agent_w.compat_write();
            let inbound = agent_r.compat();
            let outcome =
                crate::transport::stdio::serve_connection(spawner, server_config, outbound, inbound)
                    .await;
            if let Err(e) = outcome {
                tracing::error!("ACP agent connection error: {e}");
            }
        };

        runtime.block_on(local.run_until(serve));
    };
    std::thread::spawn(run_agent);

    (client_r, client_w)
}
/// Spawns a new agent connection, registers it under a fresh UUID session id,
/// and returns the id together with its handle.
///
/// A background task forwards each line read from the agent to the handle's
/// broadcast channel until the agent stream ends or a read fails.
///
/// # Errors
///
/// Returns `503 Service Unavailable` when the number of registered
/// connections has already reached `max_sessions`.
#[cfg(feature = "acp-http")]
pub(crate) fn create_connection(
    state: &AcpHttpState,
) -> Result<(String, Arc<ConnectionHandle>), StatusCode> {
    let config = &state.server_config;
    if state.connections.len() >= config.max_sessions {
        return Err(StatusCode::SERVICE_UNAVAILABLE);
    }

    let (agent_out, agent_in) =
        spawn_agent_connection(state.spawner.clone(), AcpServerConfig::clone(config));

    let (output_tx, _) = broadcast::channel(256);
    let pump_tx = output_tx.clone();
    tokio::spawn(async move {
        let mut lines = BufReader::new(agent_out).lines();
        loop {
            match lines.next_line().await {
                Ok(Some(line)) => {
                    // A send error only means nobody is subscribed right now.
                    let _ = pump_tx.send(line);
                }
                Ok(None) | Err(_) => break,
            }
        }
    });

    let session_id = uuid::Uuid::new_v4().to_string();
    let handle = Arc::new(ConnectionHandle {
        writer: Arc::new(Mutex::new(agent_in)),
        output_tx,
        last_activity: AtomicU64::new(now_secs()),
        idle_timeout_secs: config.session_idle_timeout_secs,
    });
    let registered = Arc::clone(&handle);
    state.connections.insert(session_id.clone(), registered);

    Ok((session_id, handle))
}
/// Handles a `POST`: forwards the request body to an agent connection and
/// streams the agent's output lines back as Server-Sent Events.
///
/// If the request carries an `acp-session-id` header, the existing connection
/// with that id is used; otherwise a new connection is created. The session id
/// is returned in the `acp-session-id` response header.
///
/// # Errors
///
/// * `503 Service Unavailable`: the server is not ready yet, or a new
///   connection was needed and the session limit is reached.
/// * `400 Bad Request`: the `acp-session-id` header is not a valid UUID.
/// * `404 Not Found`: no connection is registered under the given id.
/// * `500 Internal Server Error`: writing to the agent failed, or the
///   session id could not be turned into a header value.
#[cfg(feature = "acp-http")]
pub async fn post_handler(
    State(state): State<AcpHttpState>,
    headers: HeaderMap,
    body: String,
) -> Result<impl IntoResponse, StatusCode> {
    if !state.ready.load(Ordering::Acquire) {
        return Err(StatusCode::SERVICE_UNAVAILABLE);
    }

    let (session_id, handle) =
        if let Some(id) = headers.get("acp-session-id").and_then(|v| v.to_str().ok()) {
            uuid::Uuid::parse_str(id).map_err(|_| StatusCode::BAD_REQUEST)?;
            let handle = state
                .connections
                .get(id)
                .map(|r| Arc::clone(&*r))
                .ok_or(StatusCode::NOT_FOUND)?;
            (id.to_owned(), handle)
        } else {
            create_connection(&state)?
        };

    // Subscribe BEFORE forwarding the request. A broadcast receiver only sees
    // values sent after it subscribed, so subscribing after the write would
    // lose any output the agent produces in between (and on a fresh session
    // there would be no receiver at all, so the line would be dropped).
    let mut rx = handle.output_tx.subscribe();

    {
        // Scope the lock so it is released before the response is built.
        let mut w = handle.writer.lock().await;
        w.write_all(body.as_bytes())
            .await
            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
        // The agent side reads newline-delimited messages.
        w.write_all(b"\n")
            .await
            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    }
    handle.touch();

    // The stream ends as soon as `recv` reports an error.
    let stream = async_stream::stream! {
        while let Ok(line) = rx.recv().await {
            yield Ok::<_, std::convert::Infallible>(
                Event::default().event("message").data(line)
            );
        }
    };
    let sse = Sse::new(stream).keep_alive(
        KeepAlive::new()
            .interval(Duration::from_secs(15))
            .text("ping"),
    );

    let mut response = sse.into_response();
    response.headers_mut().insert(
        "acp-session-id",
        session_id
            .parse()
            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?,
    );
    Ok(response)
}
/// Handles a `GET`: attaches an SSE stream to an existing session so the
/// client receives the agent's output lines without sending a request.
///
/// # Errors
///
/// * `503 Service Unavailable`: the server is not ready yet.
/// * `400 Bad Request`: the `acp-session-id` header is missing, not valid
///   header text, or not a UUID.
/// * `404 Not Found`: no connection is registered under the given id.
#[cfg(feature = "acp-http")]
pub async fn get_handler(
    State(state): State<AcpHttpState>,
    headers: HeaderMap,
) -> Result<impl IntoResponse, StatusCode> {
    if !state.ready.load(Ordering::Acquire) {
        return Err(StatusCode::SERVICE_UNAVAILABLE);
    }

    let Some(id) = headers
        .get("acp-session-id")
        .and_then(|value| value.to_str().ok())
    else {
        return Err(StatusCode::BAD_REQUEST);
    };
    if uuid::Uuid::parse_str(id).is_err() {
        return Err(StatusCode::BAD_REQUEST);
    }

    let Some(handle) = state
        .connections
        .get(id)
        .map(|entry| Arc::clone(entry.value()))
    else {
        return Err(StatusCode::NOT_FOUND);
    };

    let mut rx = handle.output_tx.subscribe();
    // The stream ends as soon as `recv` reports an error.
    let events = async_stream::stream! {
        loop {
            match rx.recv().await {
                Ok(line) => {
                    yield Ok::<_, std::convert::Infallible>(
                        Event::default().event("message").data(line)
                    );
                }
                Err(_) => break,
            }
        }
    };

    let keep_alive = KeepAlive::new()
        .interval(Duration::from_secs(15))
        .text("ping");
    Ok(Sse::new(events).keep_alive(keep_alive))
}
/// Lists stored ACP sessions as a JSON array of `SessionSummary`.
///
/// At most `max_history` sessions are requested from the store.
///
/// # Errors
///
/// * `503 Service Unavailable`: the server is not ready, or no store is configured.
/// * `500 Internal Server Error`: the store query failed (the error is logged).
#[cfg(feature = "acp-http")]
pub async fn list_sessions_handler(
    State(state): State<AcpHttpState>,
) -> Result<impl IntoResponse, StatusCode> {
    if !state.ready.load(Ordering::Acquire) {
        return Err(StatusCode::SERVICE_UNAVAILABLE);
    }
    let Some(store) = state.store.as_ref() else {
        return Err(StatusCode::SERVICE_UNAVAILABLE);
    };

    let limit = state.server_config.max_history;
    let sessions = match store.list_acp_sessions(limit).await {
        Ok(rows) => rows,
        Err(err) => {
            tracing::warn!(error = %err, "failed to list ACP sessions");
            return Err(StatusCode::INTERNAL_SERVER_ERROR);
        }
    };

    let summaries: Vec<SessionSummary> = sessions.into_iter().map(Into::into).collect();
    Ok(Json(summaries))
}
/// Returns the stored events of one session as a JSON array of `SessionEventDto`.
///
/// # Errors
///
/// * `503 Service Unavailable`: the server is not ready, or no store is configured.
/// * `400 Bad Request`: the path segment is not a valid UUID.
/// * `404 Not Found`: the store has no session with that id.
/// * `500 Internal Server Error`: a store query failed (the error is logged).
#[cfg(feature = "acp-http")]
pub async fn session_messages_handler(
    State(state): State<AcpHttpState>,
    Path(session_id): Path<String>,
) -> Result<impl IntoResponse, StatusCode> {
    if !state.ready.load(Ordering::Acquire) {
        return Err(StatusCode::SERVICE_UNAVAILABLE);
    }
    // Validate the id before touching the store.
    if uuid::Uuid::parse_str(&session_id).is_err() {
        return Err(StatusCode::BAD_REQUEST);
    }
    let Some(store) = state.store.as_ref() else {
        return Err(StatusCode::SERVICE_UNAVAILABLE);
    };

    let known = match store.acp_session_exists(&session_id).await {
        Ok(found) => found,
        Err(err) => {
            tracing::warn!(error = %err, "failed to check ACP session existence");
            return Err(StatusCode::INTERNAL_SERVER_ERROR);
        }
    };
    if !known {
        return Err(StatusCode::NOT_FOUND);
    }

    let events = match store.load_acp_events(&session_id).await {
        Ok(rows) => rows,
        Err(err) => {
            tracing::warn!(error = %err, "failed to load ACP session events");
            return Err(StatusCode::INTERNAL_SERVER_ERROR);
        }
    };

    let dtos: Vec<SessionEventDto> = events
        .into_iter()
        .map(|event| SessionEventDto {
            event_type: event.event_type,
            payload: event.payload,
            created_at: event.created_at,
        })
        .collect();
    Ok(Json(dtos))
}