use std::io::IsTerminal;
use std::sync::Arc;
use sacp::link::AgentToClient;
use sacp::schema::{
CancelNotification, InitializeRequest, LoadSessionRequest, NewSessionRequest, PromptRequest,
SetSessionModeRequest,
};
use sacp::{ByteStreams, JrConnectionCx, MessageCx};
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use super::core::ClaudeAcpAgent;
use super::handlers;
use crate::cli::Cli;
use crate::types::AgentError;
#[cfg(feature = "otel")]
use opentelemetry::global;
#[cfg(feature = "otel")]
use opentelemetry::trace::TracerProvider;
#[cfg(feature = "otel")]
use opentelemetry_otlp::WithExportConfig;
#[cfg(feature = "otel")]
use opentelemetry_sdk::trace::SdkTracerProvider;
/// Process-wide slot for the OpenTelemetry tracer provider.
///
/// The logging initialisers in this file store the provider here after a successful
/// `init_otel` call, and `shutdown_otel` reads it back to shut the provider down.
#[cfg(feature = "otel")]
static OTEL_PROVIDER: std::sync::OnceLock<SdkTracerProvider> = std::sync::OnceLock::new();
/// Shuts down the OpenTelemetry tracer provider, if one was stored in `OTEL_PROVIDER`.
///
/// Does nothing when no provider has been stored. A failed shutdown is reported with
/// `eprintln!` on stderr; the start and the successful completion are logged through
/// `tracing`.
#[cfg(feature = "otel")]
pub fn shutdown_otel() {
    // Nothing to do when tracing export was never initialised.
    let Some(provider) = OTEL_PROVIDER.get() else {
        return;
    };

    tracing::info!("Shutting down OpenTelemetry provider...");
    match provider.shutdown() {
        Ok(_) => tracing::info!("OpenTelemetry provider shutdown complete"),
        Err(err) => eprintln!("Failed to shutdown OpenTelemetry provider: {:?}", err),
    }
}
/// No-op variant of `shutdown_otel`, compiled when the `otel` feature is disabled so that
/// callers can invoke it unconditionally.
#[cfg(not(feature = "otel"))]
pub fn shutdown_otel() {}
/// Builds an OTLP tracer provider and registers it as the global tracer provider.
///
/// The span exporter uses the tonic transport and sends to `endpoint`; the provider's
/// resource carries `service_name` as its service name.
///
/// # Errors
///
/// Returns an error when the span exporter cannot be built.
#[cfg(feature = "otel")]
fn init_otel(endpoint: &str, service_name: &str) -> anyhow::Result<SdkTracerProvider> {
    use opentelemetry_sdk::Resource;

    let span_exporter = opentelemetry_otlp::SpanExporter::builder()
        .with_tonic()
        .with_endpoint(endpoint)
        .build()?;

    // Attach the batch exporter first, then the resource, as a two-step builder.
    let provider_builder = SdkTracerProvider::builder().with_batch_exporter(span_exporter);
    let resource = Resource::builder()
        .with_service_name(service_name.to_owned())
        .build();
    let tracer_provider = provider_builder.with_resource(resource).build();

    // The global registry gets a clone; the caller keeps the returned provider.
    global::set_tracer_provider(tracer_provider.clone());
    Ok(tracer_provider)
}
/// Chooses the tracing filter for this process.
///
/// A non-empty `RUST_LOG` environment variable is used verbatim as the filter
/// specification. Otherwise the filter is built from the default environment with the
/// level from `cli.log_level()` added as a directive.
fn build_env_filter(cli: &Cli) -> tracing_subscriber::EnvFilter {
    match std::env::var("RUST_LOG") {
        Ok(directives) if !directives.is_empty() => {
            tracing_subscriber::EnvFilter::new(directives)
        }
        // Unset, unreadable or empty: fall back to the CLI-selected level.
        _ => {
            let level = cli.log_level();
            tracing_subscriber::EnvFilter::from_default_env().add_directive(level.into())
        }
    }
}
/// Installs the global tracing subscriber with log output written to a file.
///
/// The file at `cli.log_path()` is created with `File::create` (an existing file is
/// truncated), after creating its parent directory if needed. The chosen path is
/// announced on stderr. With the `otel` feature compiled in and
/// `cli.is_otel_enabled()` true, an OpenTelemetry layer is added and the provider is
/// stored in `OTEL_PROVIDER`.
///
/// # Errors
///
/// Returns an error when the directory or file cannot be created, when OpenTelemetry is
/// enabled without a configured endpoint, or when `init_otel` fails.
///
/// # Panics
///
/// `init()` panics if a global subscriber has already been installed.
fn init_logging_to_file(cli: &Cli) -> anyhow::Result<()> {
    let filter = build_env_filter(cli);
    let log_path = cli.log_path();
    if let Some(parent) = log_path.parent() {
        std::fs::create_dir_all(parent)?;
    }
    let file = std::fs::File::create(&log_path)?;
    eprintln!("Diagnostic mode: logging to {}", log_path.display());
    // The file is wrapped in a mutex to serve as the layer's writer; ANSI colour codes are
    // disabled for the file output.
    let fmt_layer = tracing_subscriber::fmt::layer()
        .with_writer(std::sync::Mutex::new(file))
        .with_ansi(false);
    #[cfg(feature = "otel")]
    {
        if cli.is_otel_enabled() {
            // Report a missing endpoint as an error instead of panicking on `unwrap()`.
            let endpoint = cli.otel_endpoint.as_ref().ok_or_else(|| {
                anyhow::anyhow!("OpenTelemetry is enabled but no OTLP endpoint is configured")
            })?;
            let service_name = &cli.otel_service_name;
            eprintln!(
                "OpenTelemetry enabled: endpoint={}, service={}",
                endpoint, service_name
            );
            let provider = init_otel(endpoint, service_name)?;
            let tracer = provider.tracer("claude-code-acp-rs");
            let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
            // Keep the provider for `shutdown_otel`; the result of `set` is deliberately
            // discarded (it only fails when a provider is already stored).
            drop(OTEL_PROVIDER.set(provider));
            tracing_subscriber::registry()
                .with(filter)
                .with(fmt_layer)
                .with(otel_layer)
                .init();
        } else {
            tracing_subscriber::registry()
                .with(filter)
                .with(fmt_layer)
                .init();
        }
    }
    #[cfg(not(feature = "otel"))]
    {
        tracing_subscriber::registry()
            .with(filter)
            .with(fmt_layer)
            .init();
    }
    Ok(())
}
/// Installs the global tracing subscriber with log output written to stderr.
///
/// ANSI colour codes are disabled. With the `otel` feature compiled in and
/// `cli.is_otel_enabled()` true, an OpenTelemetry layer is added and the provider is
/// stored in `OTEL_PROVIDER`.
///
/// # Errors
///
/// Returns an error when OpenTelemetry is enabled without a configured endpoint, or when
/// `init_otel` fails. These cases previously panicked; they are now propagated, matching
/// `init_logging_to_file`.
///
/// # Panics
///
/// `init()` panics if a global subscriber has already been installed.
fn init_logging_to_stderr(cli: &Cli) -> anyhow::Result<()> {
    let filter = build_env_filter(cli);
    let fmt_layer = tracing_subscriber::fmt::layer()
        .with_writer(std::io::stderr)
        .with_ansi(false);
    #[cfg(feature = "otel")]
    {
        if cli.is_otel_enabled() {
            // Report a missing endpoint as an error instead of panicking on `unwrap()`.
            let endpoint = cli.otel_endpoint.as_ref().ok_or_else(|| {
                anyhow::anyhow!("OpenTelemetry is enabled but no OTLP endpoint is configured")
            })?;
            let service_name = &cli.otel_service_name;
            eprintln!(
                "OpenTelemetry enabled: endpoint={}, service={}",
                endpoint, service_name
            );
            // Keep the original failure message as context on the propagated error.
            let provider = init_otel(endpoint, service_name)
                .map_err(|e| e.context("Failed to init OpenTelemetry"))?;
            let tracer = provider.tracer("claude-code-acp-rs");
            let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
            // Keep the provider for `shutdown_otel`; the result of `set` is deliberately
            // discarded (it only fails when a provider is already stored).
            drop(OTEL_PROVIDER.set(provider));
            tracing_subscriber::registry()
                .with(filter)
                .with(fmt_layer)
                .with(otel_layer)
                .init();
        } else {
            tracing_subscriber::registry()
                .with(filter)
                .with(fmt_layer)
                .init();
        }
    }
    #[cfg(not(feature = "otel"))]
    {
        tracing_subscriber::registry()
            .with(filter)
            .with(fmt_layer)
            .init();
    }
    Ok(())
}

/// Initialises logging according to the CLI options.
///
/// Diagnostic mode (`cli.is_diagnostic()`) logs to a file via `init_logging_to_file`;
/// otherwise logs go to stderr via `init_logging_to_stderr`.
///
/// # Errors
///
/// Propagates any error returned by the selected initialiser.
fn init_logging(cli: &Cli) -> anyhow::Result<()> {
    if cli.is_diagnostic() {
        init_logging_to_file(cli)
    } else {
        init_logging_to_stderr(cli)
    }
}
/// Entry point that initialises logging from the CLI options and then runs the ACP server.
///
/// Startup details (version, pid, diagnostic log path, OpenTelemetry endpoint, time spent
/// initialising) are logged inside an `agent_startup` span. A ready trace is emitted
/// before the server runs and a shutdown trace after it returns.
///
/// # Errors
///
/// Returns an error when logging initialisation fails or when the server ends with an
/// error.
pub async fn run_acp_with_cli(cli: &Cli) -> anyhow::Result<()> {
    let startup_time = std::time::Instant::now();
    init_logging(cli)?;

    // All startup events are emitted while the `agent_startup` span is entered.
    tracing::info_span!(
        "agent_startup",
        version = %env!("CARGO_PKG_VERSION"),
        pid = %std::process::id(),
        diagnostic = %cli.is_diagnostic(),
        otel_enabled = %cli.otel_endpoint.is_some(),
    )
    .in_scope(|| {
        tracing::info!("========== Claude Code ACP Agent Starting ==========");
        tracing::info!(
            version = %env!("CARGO_PKG_VERSION"),
            pid = %std::process::id(),
            "Agent process info"
        );

        if cli.is_diagnostic() {
            tracing::info!(
                log_path = %cli.log_path().display(),
                "Diagnostic mode enabled"
            );
        }

        if let Some(otel_endpoint) = &cli.otel_endpoint {
            tracing::info!(
                otel_endpoint = %otel_endpoint,
                "OpenTelemetry tracing enabled"
            );
        }

        let init_elapsed = startup_time.elapsed();
        tracing::info!(
            init_elapsed_ms = init_elapsed.as_millis(),
            "Logging initialized"
        );
    });

    emit_agent_ready_trace(startup_time.elapsed()).await;

    // The server future is boxed before being awaited.
    let outcome: anyhow::Result<()> = Box::pin(run_acp_server()).await.map_err(Into::into);

    // The shutdown trace is emitted whether the server succeeded or failed.
    emit_agent_shutdown_trace(startup_time.elapsed()).await;
    outcome
}
/// Emits an `agent_ready` span containing a single info event that records how long
/// startup took, then sleeps for 10 milliseconds.
#[tracing::instrument(name = "agent_ready", skip_all, fields(
    startup_ms = %startup_duration.as_millis(),
    version = %env!("CARGO_PKG_VERSION"),
    pid = %std::process::id(),
))]
async fn emit_agent_ready_trace(startup_duration: std::time::Duration) {
    let startup_ms = startup_duration.as_millis();
    tracing::info!(startup_ms, "Agent ready and waiting for ACP messages");

    // NOTE(review): the short pause presumably gives the span a chance to be exported
    // before the server starts — confirm the intent.
    let pause = std::time::Duration::from_millis(10);
    tokio::time::sleep(pause).await;
}
/// Emits an `agent_shutdown` span containing a single info event that records the total
/// uptime in seconds and in milliseconds.
#[tracing::instrument(name = "agent_shutdown", skip_all, fields(
    uptime_secs = %total_uptime.as_secs(),
    uptime_ms = %total_uptime.as_millis(),
))]
async fn emit_agent_shutdown_trace(total_uptime: std::time::Duration) {
    let uptime_secs = total_uptime.as_secs();
    let uptime_ms = total_uptime.as_millis();
    tracing::info!(
        uptime_secs,
        uptime_ms,
        "========== Agent Shutdown Complete =========="
    );
}
/// Runs the ACP server with a default logging setup and no CLI options.
///
/// Installs a stderr `fmt` subscriber whose filter is the default environment filter plus
/// an `INFO` level directive, then runs the server.
///
/// # Errors
///
/// Returns whatever error the server ends with.
pub async fn run_acp() -> Result<(), sacp::Error> {
    let env_filter = tracing_subscriber::EnvFilter::from_default_env()
        .add_directive(tracing::Level::INFO.into());

    tracing_subscriber::fmt()
        .with_env_filter(env_filter)
        .with_writer(std::io::stderr)
        .init();

    // The server future is boxed before being awaited.
    let server = Box::pin(run_acp_server());
    server.await
}
/// Runs the ACP agent over stdin/stdout until `serve` returns.
///
/// Logs a startup banner, creates a `ClaudeAcpAgent`, and registers one handler per typed
/// request (`InitializeRequest`, `NewSessionRequest`, `LoadSessionRequest`,
/// `PromptRequest`, `SetSessionModeRequest`), one for `CancelNotification`, and a
/// string-dispatched handler for `session/set_model`, `session/fork`, `session/resume`
/// and `session/list`. Each handler runs inside its own tracing span.
///
/// Returns the result of `serve`; the error or the graceful shutdown is logged together
/// with the server uptime before returning.
#[tracing::instrument(name = "acp_server_main")]
#[allow(clippy::large_futures)]
async fn run_acp_server() -> Result<(), sacp::Error> {
let server_start_time = std::time::Instant::now();
let is_tty = std::io::stdin().is_terminal();
// Random identifier generated per run; it is only used in the banner below.
let agent_session_id = uuid::Uuid::new_v4();
let start_time = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f");
tracing::info!(
"================================================================================"
);
tracing::info!("  Claude Code ACP Agent - Session Start");
tracing::info!(
"--------------------------------------------------------------------------------"
);
tracing::info!("  Version:    {}", env!("CARGO_PKG_VERSION"));
tracing::info!("  Start Time: {}", start_time);
tracing::info!("  Session ID: {}", agent_session_id);
tracing::info!("  PID:        {}", std::process::id());
tracing::info!(
"  TTY Mode:   {}",
if is_tty { "interactive" } else { "subprocess" }
);
tracing::info!(
"================================================================================"
);
tracing::debug!(
rust_log = ?std::env::var("RUST_LOG").ok(),
cwd = ?std::env::current_dir().ok(),
"Environment configuration"
);
// When stdin is a terminal, print usage hints directly to stderr; otherwise only log.
if is_tty {
eprintln!("Claude Code ACP Agent is running in interactive mode.");
eprintln!("This agent communicates via ACP protocol over stdin/stdout.");
eprintln!("To use with an editor, configure it to run this binary.");
eprintln!("Waiting for ACP protocol messages on stdin...");
eprintln!("(Press Ctrl+C to exit)");
} else {
tracing::info!("Waiting for ACP protocol messages on stdin...");
}
// Create the agent and take clones of its config, sessions and prompt manager; the
// handler closures below capture further clones of these.
let agent_create_start = std::time::Instant::now();
let agent = ClaudeAcpAgent::new();
let config = Arc::new(agent.config().clone());
let sessions = agent.sessions().clone();
let prompt_manager = agent.prompt_manager().clone();
let agent_create_elapsed = agent_create_start.elapsed();
// Only the presence of base URL / API key / model is logged, not their values.
tracing::info!(
agent_name = %agent.name(),
elapsed_ms = agent_create_elapsed.as_millis(),
has_base_url = config.base_url.is_some(),
has_api_key = config.api_key.is_some(),
has_model = config.model.is_some(),
"Agent created"
);
tracing::debug!("Building ACP handler chain");
AgentToClient::builder()
.name(agent.name())
// initialize: answered directly from `handlers::handle_initialize`.
.on_receive_request(
{
let config = config.clone();
async move |request: InitializeRequest, request_cx, _connection_cx| {
let protocol_version = format!("{:?}", request.protocol_version);
let span = tracing::info_span!(
"handle_initialize",
protocol_version = %protocol_version,
);
async {
tracing::info!(
"Received initialize request (protocol version: {})",
protocol_version
);
let response = handlers::handle_initialize(request, &config);
tracing::debug!("Sending initialize response");
request_cx.respond(response)
}
.instrument(span)
.await
}
},
sacp::on_receive_request!(),
)
// session/new: handler errors are returned to the client as internal errors.
.on_receive_request(
{
let config = config.clone();
let sessions = sessions.clone();
async move |request: NewSessionRequest, request_cx, connection_cx| {
let cwd = request.cwd.display().to_string();
let span = tracing::info_span!(
"handle_session_new",
cwd = %cwd,
mcp_server_count = request.mcp_servers.len(),
);
async {
tracing::debug!("Received session/new request");
match handlers::handle_new_session(request, &config, &sessions, connection_cx).await {
Ok(response) => request_cx.respond(response),
Err(e) => request_cx
.respond_with_error(sacp::util::internal_error(e.to_string())),
}
}
.instrument(span)
.await
}
},
sacp::on_receive_request!(),
)
// session/load: handler errors are returned to the client as internal errors.
.on_receive_request(
{
let config = config.clone();
let sessions = sessions.clone();
async move |request: LoadSessionRequest, request_cx, _connection_cx| {
let session_id = request.session_id.0.clone();
let span = tracing::info_span!(
"handle_session_load",
session_id = %session_id,
);
async {
tracing::debug!("Received session/load request for session {}", session_id);
match handlers::handle_load_session(request, &config, &sessions) {
Ok(response) => request_cx.respond(response),
Err(e) => request_cx
.respond_with_error(sacp::util::internal_error(e.to_string())),
}
}
.instrument(span)
.await
}
},
sacp::on_receive_request!(),
)
// session/prompt: the prompt is handled in a spawned task, and the response is sent from
// that task once the prompt result arrives.
.on_receive_request(
{
let config = config.clone();
let sessions = sessions.clone();
let prompt_manager = prompt_manager.clone();
async move |request: PromptRequest, request_cx, connection_cx| {
let session_id = request.session_id.0.clone();
let prompt_len = request.prompt.len();
let span = tracing::info_span!(
"handle_session_prompt",
session_id = %session_id,
prompt_blocks = prompt_len,
);
tracing::debug!(
"Received session/prompt request for session {}, spawning handler",
session_id
);
let session_id_str = session_id.to_string();
// Ask the prompt manager to cancel any prompt already tracked for this session
// before starting the new one.
prompt_manager.cancel_session_prompt(&session_id_str).await;
let cancel_token = CancellationToken::new();
// Fresh clones are taken on each call because the values are moved into the spawned task.
let config = config.clone();
let sessions = sessions.clone();
let prompt_manager = prompt_manager.clone();
// NOTE(review): `connection_cx.spawn` presumably lets this handler return without
// waiting for the prompt to finish — confirm against the sacp documentation.
connection_cx.spawn({
let connection_cx = connection_cx.clone();
async move {
async {
tracing::debug!(
"Starting prompt handling for session {}",
session_id_str
);
// The prompt itself runs in a separate tokio task; its result is delivered back
// over a oneshot channel.
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
let handle = tokio::spawn({
let cancel_token = cancel_token.clone();
let session_id_for_log = session_id_str.clone();
let connection_cx_inner = connection_cx.clone();
async move {
tracing::debug!(
session_id = %session_id_for_log,
"Prompt task started"
);
let result = handlers::handle_prompt(
request,
&config,
&sessions,
connection_cx_inner,
cancel_token,
)
.await;
if result_tx.send(result).is_err() {
tracing::warn!("Failed to send prompt result - receiver was dropped (task may have panicked)");
}
tracing::debug!(
session_id = %session_id_for_log,
"Prompt task completed"
);
}
});
// Hand the task handle and cancel token to the prompt manager, keyed by session id.
let prompt_id = prompt_manager.register_prompt(
session_id_str.clone(),
handle,
cancel_token,
);
let result = result_rx.await;
let result = match result {
Ok(Ok(response)) => Ok(response),
Ok(Err(e)) => Err(e),
// The sender was dropped without sending a result.
Err(_) => {
Err(AgentError::Internal(
"Prompt task failed unexpectedly".to_string(),
))
}
};
// Mark the prompt as complete before responding to the client.
prompt_manager.complete_prompt(&session_id_str, &prompt_id);
match result {
Ok(response) => request_cx.respond(response),
Err(e) => {
tracing::error!("Prompt error: {}", e);
request_cx.respond_with_error(sacp::util::internal_error(
e.to_string(),
))
}
}
}
.instrument(span)
.await
}
})
}
},
sacp::on_receive_request!(),
)
// session/setMode: handler errors are returned to the client as internal errors.
.on_receive_request(
{
let sessions = sessions.clone();
async move |request: SetSessionModeRequest, request_cx, connection_cx| {
let session_id = request.session_id.0.clone();
let mode_id = request.mode_id.0.clone();
let span = tracing::info_span!(
"handle_session_setMode",
session_id = %session_id,
mode_id = %mode_id,
);
async {
tracing::debug!("Received session/setMode request");
match handlers::handle_set_mode(request, &sessions, connection_cx).await {
Ok(response) => request_cx.respond(response),
Err(e) => request_cx
.respond_with_error(sacp::util::internal_error(e.to_string())),
}
}
.instrument(span)
.await
}
},
sacp::on_receive_request!(),
)
// session/cancel notification: a handler failure is only logged; the closure always
// returns Ok(()).
.on_receive_notification(
{
let sessions = sessions.clone();
async move |notification: CancelNotification, _connection_cx| {
let session_id = notification.session_id.0.clone();
let span = tracing::info_span!(
"handle_session_cancel",
session_id = %session_id,
);
async {
tracing::debug!(
"Received session/cancel notification for session {}",
session_id
);
if let Err(e) = handlers::handle_cancel(&session_id, &sessions).await {
tracing::error!("Cancel error: {}", e);
}
Ok(())
}
.instrument(span)
.await
}
},
sacp::on_receive_notification!(),
)
// Untyped message handler: dispatches on the method name string.
// NOTE(review): presumably this only sees messages the typed handlers above did not
// claim — confirm against the sacp handler-chain semantics.
.on_receive_message(
{
// This handler clones the config from the agent directly rather than the `Arc` above.
let config = agent.config().clone();
let sessions = agent.sessions().clone();
async move |message: MessageCx, connection_cx: JrConnectionCx<AgentToClient>| {
let method = message.method().to_string();
let span = tracing::info_span!(
"handle_message",
method = ?method,
);
async {
// Each known method deserializes the raw params into its request type, calls the
// handler, and serializes the response back to JSON.
match method.as_str() {
"session/set_model" => {
dispatch_unstable_request(message, |params| async {
let request: agent_client_protocol_schema::SetSessionModelRequest =
serde_json::from_value(params)?;
let response = handlers::handle_set_session_model(request, &sessions).await
.map_err(|e| anyhow::anyhow!("{}", e))?;
Ok(serde_json::to_value(response)?)
}).await
}
"session/fork" => {
let cx = connection_cx.clone();
dispatch_unstable_request(message, |params| async {
let request: agent_client_protocol_schema::ForkSessionRequest =
serde_json::from_value(params)?;
let response = handlers::handle_fork_session(request, &config, &sessions, cx)
.map_err(|e| anyhow::anyhow!("{}", e))?;
Ok(serde_json::to_value(response)?)
}).await
}
"session/resume" => {
let cx = connection_cx.clone();
dispatch_unstable_request(message, |params| async {
let request: agent_client_protocol_schema::ResumeSessionRequest =
serde_json::from_value(params)?;
let response = handlers::handle_resume_session(request, &config, &sessions, cx)
.map_err(|e| anyhow::anyhow!("{}", e))?;
Ok(serde_json::to_value(response)?)
}).await
}
"session/list" => {
dispatch_unstable_request(message, |params| async move {
let request: agent_client_protocol_schema::ListSessionsRequest =
serde_json::from_value(params)?;
let response = handlers::handle_list_sessions(request)
.map_err(|e| anyhow::anyhow!("{}", e))?;
Ok(serde_json::to_value(response)?)
}).await
}
// Any other method is logged and answered with an "Unknown method" internal error.
_ => {
tracing::warn!("Received unknown message: {:?}", method);
message.respond_with_error(
sacp::util::internal_error("Unknown method"),
connection_cx,
)
}
}
}
.instrument(span)
.await
}
},
sacp::on_receive_message!(),
)
// Serve the protocol with stdout as the outgoing stream and stdin as the incoming one.
.serve(ByteStreams::new(
tokio::io::stdout().compat_write(),
tokio::io::stdin().compat(),
))
.await
// On failure: log the error with the uptime, then pass the error through unchanged.
.map_err(|e| {
let uptime = server_start_time.elapsed();
tracing::error!(
error = %e,
uptime_ms = uptime.as_millis(),
"ACP server error"
);
e
})
// On success only (`Result::inspect`): log the graceful shutdown with the uptime.
.inspect(|_result| {
let uptime = server_start_time.elapsed();
tracing::info!(
uptime_secs = uptime.as_secs(),
uptime_ms = uptime.as_millis(),
"ACP server shutting down gracefully"
);
})
}
async fn dispatch_unstable_request<Fut>(
message: MessageCx,
handler: impl FnOnce(serde_json::Value) -> Fut,
) -> Result<(), sacp::Error>
where
Fut: std::future::Future<Output = Result<serde_json::Value, anyhow::Error>>,
{
match message {
MessageCx::Request(untyped, request_cx) => match handler(untyped.params.clone()).await {
Ok(response_value) => request_cx.respond(response_value),
Err(e) => {
tracing::error!(error = %e, "Unstable handler error");
request_cx.respond_with_error(sacp::util::internal_error(e.to_string()))
}
},
MessageCx::Notification(_) => {
tracing::warn!("Received notification for request-only method");
Ok(())
}
}
}