use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use agent_client_protocol::schema::{
InitializeRequest, ProtocolVersion, RequestPermissionOutcome, RequestPermissionRequest,
RequestPermissionResponse, SelectedPermissionOutcome,
};
use agent_client_protocol::{Agent, Client, ConnectionTo};
use agent_client_protocol_tokio::AcpAgent;
use tokio::sync::Mutex;
use tracing::{debug, info, warn};
use crate::connection::AcpAgentConfig;
use crate::error::{AcpError, Result};
use crate::permissions::{
PermissionDecision, PermissionOption, PermissionPolicy, PermissionRequest,
};
#[derive(Debug, Clone)]
pub struct PromptResult {
pub text: String,
pub duration: Duration,
pub prompt_count: u32,
}
pub struct AcpSession {
config: AcpAgentConfig,
#[allow(dead_code)]
policy: Arc<PermissionPolicy>,
prompt_count: u32,
started_at: Instant,
inner: Option<SessionInner>,
}
struct SessionInner {
prompt_tx: tokio::sync::mpsc::Sender<SessionCommand>,
result_rx: Arc<Mutex<tokio::sync::mpsc::Receiver<SessionResult>>>,
}
enum SessionCommand {
Prompt(String),
Close,
}
enum SessionResult {
Response(String),
Error(String),
Closed,
}
impl AcpSession {
pub async fn start(config: AcpAgentConfig, policy: Arc<PermissionPolicy>) -> Result<Self> {
info!(command = %config.command, cwd = %config.working_dir.display(), "starting persistent ACP session");
let agent = AcpAgent::from_str(&config.command).map_err(|e| {
AcpError::InvalidConfig(format!("invalid command '{}': {e}", config.command))
})?;
let (prompt_tx, mut prompt_rx) = tokio::sync::mpsc::channel::<SessionCommand>(1);
let (result_tx, result_rx) = tokio::sync::mpsc::channel::<SessionResult>(1);
let working_dir = config.working_dir.clone();
let policy_clone = policy.clone();
tokio::spawn(async move {
let result_tx_err = result_tx.clone();
let outcome = Client
.builder()
.on_receive_request(
{
let policy = policy_clone.clone();
async move |request: RequestPermissionRequest,
responder,
_cx: ConnectionTo<Agent>| {
let title = request
.options
.first()
.map(|o| o.name.to_string())
.unwrap_or_else(|| "Unknown operation".to_string());
let perm_request = PermissionRequest {
title: title.clone(),
options: request
.options
.iter()
.map(|o| PermissionOption {
id: o.option_id.to_string(),
name: o.name.to_string(),
})
.collect(),
};
let decision = policy.decide(&perm_request);
match &decision {
PermissionDecision::Allow(option_id) => {
debug!(title = %title, "ACP permission granted");
responder.respond(RequestPermissionResponse::new(
RequestPermissionOutcome::Selected(
SelectedPermissionOutcome::new(option_id.clone()),
),
))
}
PermissionDecision::Deny => {
warn!(title = %title, "ACP permission DENIED");
responder.respond(RequestPermissionResponse::new(
RequestPermissionOutcome::Cancelled,
))
}
}
}
},
agent_client_protocol::on_receive_request!(),
)
.connect_with(agent, |connection: ConnectionTo<Agent>| async move {
connection
.send_request(InitializeRequest::new(ProtocolVersion::V1))
.block_task()
.await?;
connection
.build_session(&working_dir)
.block_task()
.run_until(async |mut session| {
while let Some(cmd) = prompt_rx.recv().await {
match cmd {
SessionCommand::Prompt(text) => {
match session.send_prompt(&text) {
Ok(()) => match session.read_to_string().await {
Ok(response) => {
let _ = result_tx
.send(SessionResult::Response(response))
.await;
}
Err(e) => {
let _ = result_tx
.send(SessionResult::Error(e.to_string()))
.await;
}
},
Err(e) => {
let _ = result_tx
.send(SessionResult::Error(e.to_string()))
.await;
}
}
}
SessionCommand::Close => {
let _ = result_tx.send(SessionResult::Closed).await;
break;
}
}
}
Ok(())
})
.await?;
Ok(())
})
.await;
if let Err(e) = outcome {
warn!(error = %e, "ACP session background task ended with error");
let _ = result_tx_err.send(SessionResult::Error(e.to_string())).await;
}
});
Ok(Self {
config,
policy,
prompt_count: 0,
started_at: Instant::now(),
inner: Some(SessionInner { prompt_tx, result_rx: Arc::new(Mutex::new(result_rx)) }),
})
}
pub async fn prompt(&mut self, text: &str) -> Result<PromptResult> {
let inner = self
.inner
.as_ref()
.ok_or_else(|| AcpError::ConnectionLost("session already closed".into()))?;
let start = Instant::now();
self.prompt_count += 1;
debug!(
prompt_count = self.prompt_count,
prompt_len = text.len(),
"sending prompt to persistent session"
);
inner
.prompt_tx
.send(SessionCommand::Prompt(text.to_string()))
.await
.map_err(|_| AcpError::ConnectionLost("agent process exited".into()))?;
let mut rx = inner.result_rx.lock().await;
match rx.recv().await {
Some(SessionResult::Response(text)) => Ok(PromptResult {
text,
duration: start.elapsed(),
prompt_count: self.prompt_count,
}),
Some(SessionResult::Error(e)) => Err(AcpError::Protocol(e)),
Some(SessionResult::Closed) => Err(AcpError::ConnectionLost("session closed".into())),
None => Err(AcpError::ConnectionLost("agent process exited".into())),
}
}
pub async fn close(&mut self) -> Result<()> {
if let Some(inner) = self.inner.take() {
let _ = inner.prompt_tx.send(SessionCommand::Close).await;
info!(
prompt_count = self.prompt_count,
uptime = ?self.started_at.elapsed(),
"ACP session closed"
);
}
Ok(())
}
pub fn prompt_count(&self) -> u32 {
self.prompt_count
}
pub fn uptime(&self) -> Duration {
self.started_at.elapsed()
}
pub fn is_active(&self) -> bool {
self.inner.is_some()
}
pub fn working_dir(&self) -> &PathBuf {
&self.config.working_dir
}
}
impl Drop for AcpSession {
fn drop(&mut self) {
if self.inner.is_some() {
warn!("AcpSession dropped without explicit close — agent process may linger");
}
}
}