pub mod capabilities;
pub mod config;
pub mod error;
pub mod handler;
pub mod permission;
pub mod streamer;
pub mod transport;
#[cfg(test)]
pub(crate) mod test_helpers;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use tracing::info;
pub use capabilities::{AgentCapabilities, CapabilitiesBuilder};
pub use config::{AcpServerConfig, AcpServerConfigBuilder, TransportConfig};
pub use error::{AcpServerError, ErrorResponse};
pub use handler::AcpSessionHandler;
pub use permission::{PermissionBridge, PermissionOutcome};
pub use streamer::{ResponseStreamer, SessionNotification};
pub use transport::{HttpTransport, StdioTransport, Transport};
pub struct AcpServerHandle {
shutdown_token: CancellationToken,
join_handle: tokio::task::JoinHandle<Result<(), AcpServerError>>,
}
impl AcpServerHandle {
pub fn shutdown(&self) {
self.shutdown_token.cancel();
}
pub async fn wait(self) -> Result<(), AcpServerError> {
self.join_handle
.await
.map_err(|e| AcpServerError::Internal(format!("server task panicked: {e}")))?
}
}
pub struct AcpServer;
impl AcpServer {
pub async fn run(config: AcpServerConfig) -> Result<AcpServerHandle, AcpServerError> {
let shutdown_token = CancellationToken::new();
let handler = Arc::new(AcpSessionHandler::new(&config, shutdown_token.clone())?);
let transport: Box<dyn Transport> = match &config.transport {
TransportConfig::Stdio => Box::new(StdioTransport::new(&config)),
TransportConfig::Http { bind_address, port } => {
Box::new(HttpTransport::new(bind_address.clone(), *port))
}
};
let shutdown_timeout = config.shutdown_timeout;
let serve_shutdown = shutdown_token.clone();
let handler_for_drain = handler.clone();
let join_handle = tokio::spawn(async move {
info!("ACP server starting");
let result = transport.serve(handler.clone(), serve_shutdown).await;
handler_for_drain.drain_sessions(shutdown_timeout).await;
info!("ACP server stopped");
result
});
Ok(AcpServerHandle { shutdown_token, join_handle })
}
}