adk_acp/session.rs
1//! Persistent ACP session with connection reuse.
2//!
3//! Unlike [`prompt_agent`](crate::prompt_agent) which spawns a fresh process per call,
4//! [`AcpSession`] keeps the agent process alive across multiple prompts — preserving
5//! context, reducing latency, and enabling session-based workflows.
6//!
7//! # Example
8//!
9//! ```rust,ignore
10//! use adk_acp::{AcpSession, AcpAgentConfig, PermissionPolicy};
11//! use std::sync::Arc;
12//!
13//! let config = AcpAgentConfig::new("kiro-cli acp --trust-all-tools")
14//! .working_dir("/path/to/project");
15//!
16//! let mut session = AcpSession::start(config, Arc::new(PermissionPolicy::AutoApprove)).await?;
17//!
18//! // First prompt — Kiro reads the project structure
19//! let r1 = session.prompt("List the files in src/").await?;
20//! println!("{}", r1.text);
21//!
22//! // Second prompt — Kiro already has context from the first
23//! let r2 = session.prompt("Now explain what main.rs does").await?;
24//! println!("{}", r2.text);
25//!
26//! // Clean shutdown
27//! session.close().await?;
28//! ```
29
30use std::path::PathBuf;
31use std::str::FromStr;
32use std::sync::Arc;
33use std::time::{Duration, Instant};
34
35use agent_client_protocol::schema::{
36 InitializeRequest, ProtocolVersion, RequestPermissionOutcome, RequestPermissionRequest,
37 RequestPermissionResponse, SelectedPermissionOutcome,
38};
39use agent_client_protocol::{Agent, Client, ConnectionTo};
40use agent_client_protocol_tokio::AcpAgent;
41use tokio::sync::Mutex;
42use tracing::{debug, info, warn};
43
44use crate::connection::AcpAgentConfig;
45use crate::error::{AcpError, Result};
46use crate::permissions::{
47 PermissionDecision, PermissionOption, PermissionPolicy, PermissionRequest,
48};
49
50/// Result of a prompt sent to a persistent session.
51#[derive(Debug, Clone)]
52pub struct PromptResult {
53 /// The text response from the agent.
54 pub text: String,
55 /// Wall-clock duration of this prompt.
56 pub duration: Duration,
57 /// Number of prompts sent in this session so far (including this one).
58 pub prompt_count: u32,
59}
60
61/// A persistent connection to an ACP agent with session reuse.
62///
63/// The agent process stays alive between prompts, preserving conversation
64/// context and reducing spawn overhead. Use this when you need multiple
65/// interactions with the same agent in sequence.
66pub struct AcpSession {
67 config: AcpAgentConfig,
68 #[allow(dead_code)]
69 policy: Arc<PermissionPolicy>,
70 prompt_count: u32,
71 started_at: Instant,
72 /// Inner state — None after close()
73 inner: Option<SessionInner>,
74}
75
76/// Holds the actual connection state.
77/// We use a channel-based approach: the ACP connection runs in a background task,
78/// and we send prompts to it via channels.
79struct SessionInner {
80 prompt_tx: tokio::sync::mpsc::Sender<SessionCommand>,
81 result_rx: Arc<Mutex<tokio::sync::mpsc::Receiver<SessionResult>>>,
82}
83
84enum SessionCommand {
85 Prompt(String),
86 Close,
87}
88
89enum SessionResult {
90 Response(String),
91 Error(String),
92 Closed,
93}
94
95impl AcpSession {
96 /// Start a new persistent session with an ACP agent.
97 ///
98 /// Spawns the agent process, performs the ACP handshake, and creates a session.
99 /// The connection stays alive until [`close()`](Self::close) is called or the
100 /// session is dropped.
101 pub async fn start(config: AcpAgentConfig, policy: Arc<PermissionPolicy>) -> Result<Self> {
102 info!(command = %config.command, cwd = %config.working_dir.display(), "starting persistent ACP session");
103
104 let agent = AcpAgent::from_str(&config.command).map_err(|e| {
105 AcpError::InvalidConfig(format!("invalid command '{}': {e}", config.command))
106 })?;
107
108 let (prompt_tx, mut prompt_rx) = tokio::sync::mpsc::channel::<SessionCommand>(1);
109 let (result_tx, result_rx) = tokio::sync::mpsc::channel::<SessionResult>(1);
110
111 let working_dir = config.working_dir.clone();
112 let policy_clone = policy.clone();
113
114 // Spawn the ACP connection in a background task
115 tokio::spawn(async move {
116 let result_tx_err = result_tx.clone();
117 let outcome = Client
118 .builder()
119 .on_receive_request(
120 {
121 let policy = policy_clone.clone();
122 async move |request: RequestPermissionRequest,
123 responder,
124 _cx: ConnectionTo<Agent>| {
125 let title = request
126 .options
127 .first()
128 .map(|o| o.name.to_string())
129 .unwrap_or_else(|| "Unknown operation".to_string());
130
131 let perm_request = PermissionRequest {
132 title: title.clone(),
133 options: request
134 .options
135 .iter()
136 .map(|o| PermissionOption {
137 id: o.option_id.to_string(),
138 name: o.name.to_string(),
139 })
140 .collect(),
141 };
142
143 let decision = policy.decide(&perm_request);
144 match &decision {
145 PermissionDecision::Allow(option_id) => {
146 debug!(title = %title, "ACP permission granted");
147 responder.respond(RequestPermissionResponse::new(
148 RequestPermissionOutcome::Selected(
149 SelectedPermissionOutcome::new(option_id.clone()),
150 ),
151 ))
152 }
153 PermissionDecision::Deny => {
154 warn!(title = %title, "ACP permission DENIED");
155 responder.respond(RequestPermissionResponse::new(
156 RequestPermissionOutcome::Cancelled,
157 ))
158 }
159 }
160 }
161 },
162 agent_client_protocol::on_receive_request!(),
163 )
164 .connect_with(agent, |connection: ConnectionTo<Agent>| async move {
165 // Initialize
166 connection
167 .send_request(InitializeRequest::new(ProtocolVersion::V1))
168 .block_task()
169 .await?;
170
171 // Create session and enter the prompt loop
172 connection
173 .build_session(&working_dir)
174 .block_task()
175 .run_until(async |mut session| {
176 // Process commands from the main task
177 while let Some(cmd) = prompt_rx.recv().await {
178 match cmd {
179 SessionCommand::Prompt(text) => {
180 match session.send_prompt(&text) {
181 Ok(()) => match session.read_to_string().await {
182 Ok(response) => {
183 let _ = result_tx
184 .send(SessionResult::Response(response))
185 .await;
186 }
187 Err(e) => {
188 let _ = result_tx
189 .send(SessionResult::Error(e.to_string()))
190 .await;
191 }
192 },
193 Err(e) => {
194 let _ = result_tx
195 .send(SessionResult::Error(e.to_string()))
196 .await;
197 }
198 }
199 }
200 SessionCommand::Close => {
201 let _ = result_tx.send(SessionResult::Closed).await;
202 break;
203 }
204 }
205 }
206 Ok(())
207 })
208 .await?;
209
210 Ok(())
211 })
212 .await;
213
214 if let Err(e) = outcome {
215 warn!(error = %e, "ACP session background task ended with error");
216 let _ = result_tx_err.send(SessionResult::Error(e.to_string())).await;
217 }
218 });
219
220 Ok(Self {
221 config,
222 policy,
223 prompt_count: 0,
224 started_at: Instant::now(),
225 inner: Some(SessionInner { prompt_tx, result_rx: Arc::new(Mutex::new(result_rx)) }),
226 })
227 }
228
229 /// Send a prompt to the agent within the existing session.
230 ///
231 /// The agent retains context from previous prompts in this session,
232 /// so you don't need to re-explain project structure or repeat instructions.
233 pub async fn prompt(&mut self, text: &str) -> Result<PromptResult> {
234 let inner = self
235 .inner
236 .as_ref()
237 .ok_or_else(|| AcpError::ConnectionLost("session already closed".into()))?;
238
239 let start = Instant::now();
240 self.prompt_count += 1;
241
242 debug!(
243 prompt_count = self.prompt_count,
244 prompt_len = text.len(),
245 "sending prompt to persistent session"
246 );
247
248 inner
249 .prompt_tx
250 .send(SessionCommand::Prompt(text.to_string()))
251 .await
252 .map_err(|_| AcpError::ConnectionLost("agent process exited".into()))?;
253
254 let mut rx = inner.result_rx.lock().await;
255 match rx.recv().await {
256 Some(SessionResult::Response(text)) => Ok(PromptResult {
257 text,
258 duration: start.elapsed(),
259 prompt_count: self.prompt_count,
260 }),
261 Some(SessionResult::Error(e)) => Err(AcpError::Protocol(e)),
262 Some(SessionResult::Closed) => Err(AcpError::ConnectionLost("session closed".into())),
263 None => Err(AcpError::ConnectionLost("agent process exited".into())),
264 }
265 }
266
267 /// Close the session and terminate the agent process.
268 pub async fn close(&mut self) -> Result<()> {
269 if let Some(inner) = self.inner.take() {
270 let _ = inner.prompt_tx.send(SessionCommand::Close).await;
271 info!(
272 prompt_count = self.prompt_count,
273 uptime = ?self.started_at.elapsed(),
274 "ACP session closed"
275 );
276 }
277 Ok(())
278 }
279
280 /// Number of prompts sent in this session.
281 pub fn prompt_count(&self) -> u32 {
282 self.prompt_count
283 }
284
285 /// How long this session has been alive.
286 pub fn uptime(&self) -> Duration {
287 self.started_at.elapsed()
288 }
289
290 /// Whether the session is still connected.
291 pub fn is_active(&self) -> bool {
292 self.inner.is_some()
293 }
294
295 /// Get the working directory for this session.
296 pub fn working_dir(&self) -> &PathBuf {
297 &self.config.working_dir
298 }
299}
300
301impl Drop for AcpSession {
302 fn drop(&mut self) {
303 if self.inner.is_some() {
304 warn!("AcpSession dropped without explicit close — agent process may linger");
305 }
306 }
307}