#![cfg(feature = "acp-http")]
use axum::Json;
use axum::extract::{Path, State};
use axum::http::StatusCode;
use axum::response::IntoResponse;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use zeph_memory::store::AcpSessionInfo;
use crate::transport::http::AcpHttpState;
/// Lifecycle state reported for a session in API responses.
///
/// Values are (de)serialized in `snake_case`, e.g. `"running"`.
/// `resolve_status` in this file only ever yields `Running` or `Idle`, and
/// `create_session_handler` always reports `Idle`; nothing in this file
/// constructs `Stopped` or `Error`.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SessionStatus {
/// A connection for the session is registered in the HTTP state.
Running,
/// The session has no registered connection.
Idle,
// NOTE(review): never produced in this file — presumably set by other
// modules or reserved for future use; confirm.
Stopped,
// NOTE(review): never produced in this file — confirm where it is emitted.
Error,
}
/// JSON response body describing a single session.
///
/// Built by `SessionInfo::from_store` from a stored `AcpSessionInfo` row plus
/// the status and working directory supplied by the calling handler.
#[derive(Debug, Serialize)]
pub struct SessionInfo {
/// Session identifier, copied from the stored row.
pub id: String,
/// Optional title, copied from the stored row.
pub title: Option<String>,
/// Creation timestamp string, copied verbatim from the stored row
/// (format is defined by the store, not visible here).
pub created_at: String,
/// Last-update timestamp string, copied verbatim from the stored row.
pub updated_at: String,
/// Message count, copied from the stored row.
pub message_count: i64,
/// Status chosen by the handler that builds the response.
pub status: SessionStatus,
/// Working directory echoed back by the create handler; the get and update
/// handlers in this file always pass `None`.
pub working_dir: Option<PathBuf>,
}
impl SessionInfo {
fn from_store(
info: AcpSessionInfo,
status: SessionStatus,
working_dir: Option<PathBuf>,
) -> Self {
Self {
id: info.id,
title: info.title,
created_at: info.created_at,
updated_at: info.updated_at,
message_count: info.message_count,
status,
working_dir,
}
}
}
/// JSON request body accepted by `create_session_handler`.
///
/// Every field is optional, so `{}` is a valid body.
#[derive(Debug, Deserialize)]
pub struct CreateSessionRequest {
/// Requested working directory. When present it is checked by
/// `validate_working_dir` and echoed back in the response.
pub working_dir: Option<PathBuf>,
/// Accepted in the body but not read by any handler in this file.
// NOTE(review): presumably consumed elsewhere or reserved — confirm.
pub model: Option<String>,
/// Accepted in the body but not read by any handler in this file.
// NOTE(review): presumably consumed elsewhere or reserved — confirm.
pub mode: Option<String>,
}
/// JSON request body accepted by `update_session_handler`.
#[derive(Debug, Deserialize)]
pub struct UpdateSessionRequest {
/// New title for the session. When absent, the handler changes nothing and
/// only verifies that the session exists. Titles longer than the configured
/// `title_max_chars` (counted in `char`s) are rejected by the handler.
pub title: Option<String>,
}
/// Derives a session's status from the connection table in `state`.
///
/// Returns [`SessionStatus::Running`] when `id` has an entry in
/// `state.connections`, and [`SessionStatus::Idle`] otherwise.
fn resolve_status(state: &AcpHttpState, id: &str) -> SessionStatus {
    let has_connection = state.connections.contains_key(id);
    if !has_connection {
        return SessionStatus::Idle;
    }
    SessionStatus::Running
}
async fn validate_working_dir(
state: &AcpHttpState,
working_dir: Option<&PathBuf>,
) -> Result<(), StatusCode> {
let Some(dir) = working_dir else {
return Ok(());
};
let allowlist = &state.server_config.additional_directories;
if allowlist.is_empty() {
tracing::warn!(
path = %dir.display(),
"POST /sessions rejected: working_dir provided but additional_directories allowlist is empty"
);
return Err(StatusCode::FORBIDDEN);
}
let canonical = tokio::fs::canonicalize(dir).await.map_err(|e| {
tracing::warn!(path = %dir.display(), error = %e, "cannot canonicalize working_dir");
StatusCode::FORBIDDEN
})?;
let allowed = allowlist
.iter()
.any(|entry| canonical.starts_with(entry.as_path()));
if !allowed {
tracing::warn!(
path = %canonical.display(),
"POST /sessions rejected: working_dir not in additional_directories allowlist"
);
return Err(StatusCode::FORBIDDEN);
}
Ok(())
}
pub async fn create_session_handler(
State(state): State<AcpHttpState>,
Json(req): Json<CreateSessionRequest>,
) -> Result<impl IntoResponse, StatusCode> {
if !state.ready.load(std::sync::atomic::Ordering::Acquire) {
return Err(StatusCode::SERVICE_UNAVAILABLE);
}
validate_working_dir(&state, req.working_dir.as_ref()).await?;
let store = state
.store
.as_ref()
.ok_or(StatusCode::SERVICE_UNAVAILABLE)?;
let session_id = uuid::Uuid::new_v4().to_string();
store.create_acp_session(&session_id).await.map_err(|e| {
tracing::warn!(error = %e, "failed to create ACP session");
StatusCode::INTERNAL_SERVER_ERROR
})?;
let info = store.get_acp_session_info(&session_id).await.map_err(|e| {
tracing::warn!(error = %e, "failed to retrieve created ACP session");
StatusCode::INTERNAL_SERVER_ERROR
})?;
let Some(info) = info else {
tracing::warn!(session_id, "created ACP session not found immediately");
return Err(StatusCode::INTERNAL_SERVER_ERROR);
};
let body = SessionInfo::from_store(info, SessionStatus::Idle, req.working_dir);
Ok((StatusCode::CREATED, Json(body)))
}
pub async fn get_session_handler(
State(state): State<AcpHttpState>,
Path(session_id): Path<String>,
) -> Result<impl IntoResponse, StatusCode> {
if !state.ready.load(std::sync::atomic::Ordering::Acquire) {
return Err(StatusCode::SERVICE_UNAVAILABLE);
}
uuid::Uuid::parse_str(&session_id).map_err(|_| StatusCode::BAD_REQUEST)?;
let store = state
.store
.as_ref()
.ok_or(StatusCode::SERVICE_UNAVAILABLE)?;
let info = store
.get_acp_session_info(&session_id)
.await
.map_err(|e| {
tracing::warn!(error = %e, "failed to get ACP session info");
StatusCode::INTERNAL_SERVER_ERROR
})?
.ok_or(StatusCode::NOT_FOUND)?;
let status = resolve_status(&state, &session_id);
Ok(Json(SessionInfo::from_store(info, status, None)))
}
pub async fn update_session_handler(
State(state): State<AcpHttpState>,
Path(session_id): Path<String>,
Json(req): Json<UpdateSessionRequest>,
) -> Result<impl IntoResponse, StatusCode> {
if !state.ready.load(std::sync::atomic::Ordering::Acquire) {
return Err(StatusCode::SERVICE_UNAVAILABLE);
}
uuid::Uuid::parse_str(&session_id).map_err(|_| StatusCode::BAD_REQUEST)?;
let store = state
.store
.as_ref()
.ok_or(StatusCode::SERVICE_UNAVAILABLE)?;
if let Some(ref title) = req.title
&& title.chars().count() > state.server_config.title_max_chars
{
return Err(StatusCode::BAD_REQUEST);
}
let found = if let Some(title) = req.title {
store
.update_session_title_checked(&session_id, &title)
.await
.map_err(|e| {
tracing::warn!(error = %e, "failed to update ACP session title");
StatusCode::INTERNAL_SERVER_ERROR
})?
} else {
store.acp_session_exists(&session_id).await.map_err(|e| {
tracing::warn!(error = %e, "failed to check ACP session existence");
StatusCode::INTERNAL_SERVER_ERROR
})?
};
if !found {
return Err(StatusCode::NOT_FOUND);
}
let info = store
.get_acp_session_info(&session_id)
.await
.map_err(|e| {
tracing::warn!(error = %e, "failed to fetch updated ACP session info");
StatusCode::INTERNAL_SERVER_ERROR
})?
.ok_or(StatusCode::NOT_FOUND)?;
let status = resolve_status(&state, &session_id);
Ok(Json(SessionInfo::from_store(info, status, None)))
}
pub async fn delete_session_handler(
State(state): State<AcpHttpState>,
Path(session_id): Path<String>,
) -> Result<impl IntoResponse, StatusCode> {
if !state.ready.load(std::sync::atomic::Ordering::Acquire) {
return Err(StatusCode::SERVICE_UNAVAILABLE);
}
uuid::Uuid::parse_str(&session_id).map_err(|_| StatusCode::BAD_REQUEST)?;
let store = state
.store
.as_ref()
.ok_or(StatusCode::SERVICE_UNAVAILABLE)?;
let deleted = store
.delete_acp_session_checked(&session_id)
.await
.map_err(|e| {
tracing::warn!(error = %e, "failed to delete ACP session");
StatusCode::INTERNAL_SERVER_ERROR
})?;
if !deleted {
return Err(StatusCode::NOT_FOUND);
}
state.connections.remove(&session_id);
Ok(StatusCode::NO_CONTENT)
}
#[cfg(test)]
mod tests {
    //! Router-level tests for the session CRUD handlers.
    //!
    //! Requests are driven through an in-process `Router` with
    //! `tower::ServiceExt::oneshot`; tests that need persistence use an
    //! in-memory `SqliteStore`.

    use std::pin::Pin;
    use std::sync::Arc;
    use std::sync::atomic::AtomicU64;

    use axum::Router;
    use axum::body::Body;
    use axum::http::{Request, StatusCode};
    use axum::routing::{delete, get, patch, post};
    use tokio::sync::{Mutex, broadcast};
    use tower::ServiceExt as _;
    use zeph_core::channel::LoopbackChannel;

    use super::*;
    use crate::agent::{AcpContext, SendAgentSpawner, SessionContext};
    use crate::transport::http::{AcpHttpState, ConnectionHandle};
    use crate::transport::{AcpServerConfig, SharedAvailableModels};

    /// Empty shared model list for the test configuration.
    fn shared_models() -> SharedAvailableModels {
        Arc::new(parking_lot::RwLock::new(vec![]))
    }

    /// Spawner whose agent future completes immediately without doing work.
    fn noop_spawner() -> SendAgentSpawner {
        Arc::new(
            |_ch: LoopbackChannel, _ctx: Option<AcpContext>, _sess: SessionContext| {
                Box::pin(async {})
                    as Pin<Box<dyn std::future::Future<Output = ()> + Send + 'static>>
            },
        )
    }

    /// Baseline server configuration; fields not listed keep their defaults.
    fn base_config() -> AcpServerConfig {
        AcpServerConfig {
            agent_name: "test".into(),
            agent_version: "0.0.1".into(),
            max_sessions: 4,
            session_idle_timeout_secs: 1800,
            available_models: shared_models(),
            ..AcpServerConfig::default()
        }
    }

    /// Mounts the four session handlers on the routes under test.
    fn build_router(state: AcpHttpState) -> Router {
        Router::new()
            .route("/sessions", post(create_session_handler))
            .route("/sessions/{id}", get(get_session_handler))
            .route("/sessions/{id}", patch(update_session_handler))
            .route("/sessions/{id}", delete(delete_session_handler))
            .with_state(state)
    }

    /// Ready state with no store attached.
    fn state_no_store() -> AcpHttpState {
        AcpHttpState::new(noop_spawner(), base_config()).with_ready(true)
    }

    /// Ready state backed by a fresh in-memory store, using `cfg`.
    async fn state_with_store_and_config(cfg: AcpServerConfig) -> AcpHttpState {
        let store = zeph_memory::store::SqliteStore::new(":memory:")
            .await
            .expect("SqliteStore::new");
        AcpHttpState::new(noop_spawner(), cfg)
            .with_store(store)
            .with_ready(true)
    }

    /// Ready state backed by a fresh in-memory store and the base config.
    async fn state_with_store() -> AcpHttpState {
        state_with_store_and_config(base_config()).await
    }

    /// Like [`state_with_store`], but with a custom title length limit.
    async fn state_with_store_and_limit(title_max_chars: usize) -> AcpHttpState {
        let mut cfg = base_config();
        cfg.title_max_chars = title_max_chars;
        state_with_store_and_config(cfg).await
    }

    /// Sends a single request through `app` and returns the response.
    ///
    /// With `json = Some(..)` the request carries that payload and a
    /// `content-type: application/json` header; with `None` the body is empty
    /// and no content type is set.
    async fn send(
        app: Router,
        method: &str,
        uri: &str,
        json: Option<String>,
    ) -> axum::response::Response {
        let builder = Request::builder().method(method).uri(uri);
        let request = match json {
            Some(payload) => builder
                .header("content-type", "application/json")
                .body(Body::from(payload)),
            None => builder.body(Body::empty()),
        }
        .unwrap();
        app.oneshot(request).await.unwrap()
    }

    /// URI of the session resource for `id`.
    fn session_uri(id: impl std::fmt::Display) -> String {
        format!("/sessions/{id}")
    }

    /// An empty JSON object payload.
    fn empty_json() -> Option<String> {
        Some("{}".to_owned())
    }

    #[tokio::test]
    async fn create_session_returns_503_without_store() {
        let app = build_router(state_no_store());
        let resp = send(app, "POST", "/sessions", empty_json()).await;
        assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE);
    }

    #[tokio::test]
    async fn get_session_returns_503_without_store() {
        let app = build_router(state_no_store());
        let id = uuid::Uuid::new_v4();
        let resp = send(app, "GET", &session_uri(id), None).await;
        assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE);
    }

    #[tokio::test]
    async fn get_session_rejects_invalid_uuid() {
        let app = build_router(state_no_store());
        let resp = send(app, "GET", "/sessions/not-a-uuid", None).await;
        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
    }

    #[tokio::test]
    async fn patch_session_rejects_invalid_uuid() {
        let app = build_router(state_no_store());
        let resp = send(app, "PATCH", "/sessions/not-a-uuid", empty_json()).await;
        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
    }

    #[tokio::test]
    async fn delete_session_rejects_invalid_uuid() {
        let app = build_router(state_no_store());
        let resp = send(app, "DELETE", "/sessions/not-a-uuid", None).await;
        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
    }

    #[tokio::test]
    async fn create_session_rejects_working_dir_when_allowlist_empty() {
        let app = build_router(state_no_store());
        let body = serde_json::json!({ "working_dir": "/tmp/test" });
        let resp = send(app, "POST", "/sessions", Some(body.to_string())).await;
        assert_eq!(resp.status(), StatusCode::FORBIDDEN);
    }

    #[tokio::test]
    async fn create_session_accepts_request_without_working_dir_when_allowlist_empty() {
        let app = build_router(state_no_store());
        let resp = send(app, "POST", "/sessions", empty_json()).await;
        // "Accepts" here means the allowlist check lets the request through:
        // the handler proceeds to the store lookup and fails with 503 (no
        // store configured) instead of the 403 produced by the allowlist.
        assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE);
    }

    #[tokio::test]
    async fn session_status_running_when_connection_exists() {
        let state = state_no_store();
        let (_, writer) = tokio::io::duplex(64);
        let (tx, _) = broadcast::channel(4);
        let session_id = uuid::Uuid::new_v4().to_string();
        let handle = Arc::new(ConnectionHandle {
            writer: Arc::new(Mutex::new(writer)),
            output_tx: tx,
            last_activity: AtomicU64::new(0),
            idle_timeout_secs: 1800,
        });
        state.connections.insert(session_id.clone(), handle);
        assert_eq!(resolve_status(&state, &session_id), SessionStatus::Running);
    }

    #[tokio::test]
    async fn session_status_idle_when_no_connection() {
        let state = state_no_store();
        let session_id = uuid::Uuid::new_v4().to_string();
        assert_eq!(resolve_status(&state, &session_id), SessionStatus::Idle);
    }

    #[tokio::test]
    async fn patch_title_over_limit_returns_400() {
        let limit = 10;
        let app = build_router(state_with_store_and_limit(limit).await);
        let id = uuid::Uuid::new_v4();
        let title = "a".repeat(limit + 1);
        let body = serde_json::json!({ "title": title });
        let resp = send(app, "PATCH", &session_uri(id), Some(body.to_string())).await;
        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
    }

    #[tokio::test]
    async fn patch_title_exactly_at_limit_returns_404_for_missing_session() {
        let limit = 10;
        let app = build_router(state_with_store_and_limit(limit).await);
        let id = uuid::Uuid::new_v4();
        let title = "a".repeat(limit);
        let body = serde_json::json!({ "title": title });
        let resp = send(app, "PATCH", &session_uri(id), Some(body.to_string())).await;
        // The title passes the length check, so the only failure left is the
        // missing session.
        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
    }

    #[tokio::test]
    async fn delete_nonexistent_session_returns_404() {
        let app = build_router(state_with_store().await);
        let id = uuid::Uuid::new_v4();
        let resp = send(app, "DELETE", &session_uri(id), None).await;
        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
    }

    #[tokio::test]
    async fn patch_nonexistent_session_returns_404() {
        let app = build_router(state_with_store().await);
        let id = uuid::Uuid::new_v4();
        let body = serde_json::json!({ "title": "new name" });
        let resp = send(app, "PATCH", &session_uri(id), Some(body.to_string())).await;
        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
    }

    #[tokio::test]
    async fn get_nonexistent_session_returns_404() {
        let app = build_router(state_with_store().await);
        let id = uuid::Uuid::new_v4();
        let resp = send(app, "GET", &session_uri(id), None).await;
        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
    }

    #[tokio::test]
    async fn create_session_returns_201_with_store() {
        let app = build_router(state_with_store().await);
        let resp = send(app, "POST", "/sessions", empty_json()).await;
        assert_eq!(resp.status(), StatusCode::CREATED);
        let bytes = axum::body::to_bytes(resp.into_body(), 4096).await.unwrap();
        let info: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(info["status"], "idle");
    }

    #[tokio::test]
    async fn patch_existing_session_returns_200_with_updated_title() {
        let state = state_with_store().await;
        let session_id = uuid::Uuid::new_v4().to_string();
        state
            .store
            .as_ref()
            .unwrap()
            .create_acp_session(&session_id)
            .await
            .unwrap();
        let app = build_router(state);
        let body = serde_json::json!({ "title": "renamed" });
        let resp = send(
            app,
            "PATCH",
            &session_uri(&session_id),
            Some(body.to_string()),
        )
        .await;
        assert_eq!(resp.status(), StatusCode::OK);
        let bytes = axum::body::to_bytes(resp.into_body(), 4096).await.unwrap();
        let info: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(info["title"], "renamed");
    }

    #[tokio::test]
    async fn delete_existing_session_returns_204() {
        let state = state_with_store().await;
        let session_id = uuid::Uuid::new_v4().to_string();
        state
            .store
            .as_ref()
            .unwrap()
            .create_acp_session(&session_id)
            .await
            .unwrap();
        let app = build_router(state);
        let resp = send(app, "DELETE", &session_uri(&session_id), None).await;
        assert_eq!(resp.status(), StatusCode::NO_CONTENT);
    }
}