Skip to main content

adk_acp/
session.rs

1//! Persistent ACP session with connection reuse.
2//!
3//! Unlike [`prompt_agent`](crate::prompt_agent) which spawns a fresh process per call,
4//! [`AcpSession`] keeps the agent process alive across multiple prompts — preserving
5//! context, reducing latency, and enabling session-based workflows.
6//!
7//! # Example
8//!
9//! ```rust,ignore
10//! use adk_acp::{AcpSession, AcpAgentConfig, PermissionPolicy};
11//! use std::sync::Arc;
12//!
13//! let config = AcpAgentConfig::new("kiro-cli acp --trust-all-tools")
14//!     .working_dir("/path/to/project");
15//!
16//! let mut session = AcpSession::start(config, Arc::new(PermissionPolicy::AutoApprove)).await?;
17//!
18//! // First prompt — Kiro reads the project structure
19//! let r1 = session.prompt("List the files in src/").await?;
20//! println!("{}", r1.text);
21//!
22//! // Second prompt — Kiro already has context from the first
23//! let r2 = session.prompt("Now explain what main.rs does").await?;
24//! println!("{}", r2.text);
25//!
26//! // Clean shutdown
27//! session.close().await?;
28//! ```
29
30use std::path::PathBuf;
31use std::str::FromStr;
32use std::sync::Arc;
33use std::time::{Duration, Instant};
34
35use agent_client_protocol::schema::{
36    InitializeRequest, ProtocolVersion, RequestPermissionOutcome, RequestPermissionRequest,
37    RequestPermissionResponse, SelectedPermissionOutcome,
38};
39use agent_client_protocol::{Agent, Client, ConnectionTo};
40use agent_client_protocol_tokio::AcpAgent;
41use tokio::sync::Mutex;
42use tracing::{debug, info, warn};
43
44use crate::connection::AcpAgentConfig;
45use crate::error::{AcpError, Result};
46use crate::permissions::{
47    PermissionDecision, PermissionOption, PermissionPolicy, PermissionRequest,
48};
49
/// Outcome of a single prompt sent through an [`AcpSession`].
///
/// Derives `PartialEq`/`Eq` so results can be compared directly (for example
/// in tests with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PromptResult {
    /// The text response from the agent.
    pub text: String,
    /// Wall-clock time spent on this prompt, measured by the caller side of
    /// the session from just before the prompt is sent until the reply arrives.
    pub duration: Duration,
    /// Number of prompts sent in this session so far (including this one).
    pub prompt_count: u32,
}
60
61/// A persistent connection to an ACP agent with session reuse.
62///
63/// The agent process stays alive between prompts, preserving conversation
64/// context and reducing spawn overhead. Use this when you need multiple
65/// interactions with the same agent in sequence.
66pub struct AcpSession {
67    config: AcpAgentConfig,
68    #[allow(dead_code)]
69    policy: Arc<PermissionPolicy>,
70    prompt_count: u32,
71    started_at: Instant,
72    /// Inner state — None after close()
73    inner: Option<SessionInner>,
74}
75
76/// Holds the actual connection state.
77/// We use a channel-based approach: the ACP connection runs in a background task,
78/// and we send prompts to it via channels.
79struct SessionInner {
80    prompt_tx: tokio::sync::mpsc::Sender<SessionCommand>,
81    result_rx: Arc<Mutex<tokio::sync::mpsc::Receiver<SessionResult>>>,
82}
83
/// Instruction sent from the session handle to the background connection task.
enum SessionCommand {
    /// Forward this prompt text to the agent and report the reply.
    Prompt(String),
    /// Leave the prompt loop and shut the session down.
    Close,
}
88
/// Report sent from the background connection task back to the session handle.
enum SessionResult {
    /// The agent's full text reply to a prompt.
    Response(String),
    /// A failure, carried as its display string.
    Error(String),
    /// Acknowledgement that a close command was processed.
    Closed,
}
94
95impl AcpSession {
96    /// Start a new persistent session with an ACP agent.
97    ///
98    /// Spawns the agent process, performs the ACP handshake, and creates a session.
99    /// The connection stays alive until [`close()`](Self::close) is called or the
100    /// session is dropped.
101    pub async fn start(config: AcpAgentConfig, policy: Arc<PermissionPolicy>) -> Result<Self> {
102        info!(command = %config.command, cwd = %config.working_dir.display(), "starting persistent ACP session");
103
104        let agent = AcpAgent::from_str(&config.command).map_err(|e| {
105            AcpError::InvalidConfig(format!("invalid command '{}': {e}", config.command))
106        })?;
107
108        let (prompt_tx, mut prompt_rx) = tokio::sync::mpsc::channel::<SessionCommand>(1);
109        let (result_tx, result_rx) = tokio::sync::mpsc::channel::<SessionResult>(1);
110
111        let working_dir = config.working_dir.clone();
112        let policy_clone = policy.clone();
113
114        // Spawn the ACP connection in a background task
115        tokio::spawn(async move {
116            let result_tx_err = result_tx.clone();
117            let outcome = Client
118                .builder()
119                .on_receive_request(
120                    {
121                        let policy = policy_clone.clone();
122                        async move |request: RequestPermissionRequest,
123                                    responder,
124                                    _cx: ConnectionTo<Agent>| {
125                            let title = request
126                                .options
127                                .first()
128                                .map(|o| o.name.to_string())
129                                .unwrap_or_else(|| "Unknown operation".to_string());
130
131                            let perm_request = PermissionRequest {
132                                title: title.clone(),
133                                options: request
134                                    .options
135                                    .iter()
136                                    .map(|o| PermissionOption {
137                                        id: o.option_id.to_string(),
138                                        name: o.name.to_string(),
139                                    })
140                                    .collect(),
141                            };
142
143                            let decision = policy.decide(&perm_request);
144                            match &decision {
145                                PermissionDecision::Allow(option_id) => {
146                                    debug!(title = %title, "ACP permission granted");
147                                    responder.respond(RequestPermissionResponse::new(
148                                        RequestPermissionOutcome::Selected(
149                                            SelectedPermissionOutcome::new(option_id.clone()),
150                                        ),
151                                    ))
152                                }
153                                PermissionDecision::Deny => {
154                                    warn!(title = %title, "ACP permission DENIED");
155                                    responder.respond(RequestPermissionResponse::new(
156                                        RequestPermissionOutcome::Cancelled,
157                                    ))
158                                }
159                            }
160                        }
161                    },
162                    agent_client_protocol::on_receive_request!(),
163                )
164                .connect_with(agent, |connection: ConnectionTo<Agent>| async move {
165                    // Initialize
166                    connection
167                        .send_request(InitializeRequest::new(ProtocolVersion::V1))
168                        .block_task()
169                        .await?;
170
171                    // Create session and enter the prompt loop
172                    connection
173                        .build_session(&working_dir)
174                        .block_task()
175                        .run_until(async |mut session| {
176                            // Process commands from the main task
177                            while let Some(cmd) = prompt_rx.recv().await {
178                                match cmd {
179                                    SessionCommand::Prompt(text) => {
180                                        match session.send_prompt(&text) {
181                                            Ok(()) => match session.read_to_string().await {
182                                                Ok(response) => {
183                                                    let _ = result_tx
184                                                        .send(SessionResult::Response(response))
185                                                        .await;
186                                                }
187                                                Err(e) => {
188                                                    let _ = result_tx
189                                                        .send(SessionResult::Error(e.to_string()))
190                                                        .await;
191                                                }
192                                            },
193                                            Err(e) => {
194                                                let _ = result_tx
195                                                    .send(SessionResult::Error(e.to_string()))
196                                                    .await;
197                                            }
198                                        }
199                                    }
200                                    SessionCommand::Close => {
201                                        let _ = result_tx.send(SessionResult::Closed).await;
202                                        break;
203                                    }
204                                }
205                            }
206                            Ok(())
207                        })
208                        .await?;
209
210                    Ok(())
211                })
212                .await;
213
214            if let Err(e) = outcome {
215                warn!(error = %e, "ACP session background task ended with error");
216                let _ = result_tx_err.send(SessionResult::Error(e.to_string())).await;
217            }
218        });
219
220        Ok(Self {
221            config,
222            policy,
223            prompt_count: 0,
224            started_at: Instant::now(),
225            inner: Some(SessionInner { prompt_tx, result_rx: Arc::new(Mutex::new(result_rx)) }),
226        })
227    }
228
229    /// Send a prompt to the agent within the existing session.
230    ///
231    /// The agent retains context from previous prompts in this session,
232    /// so you don't need to re-explain project structure or repeat instructions.
233    pub async fn prompt(&mut self, text: &str) -> Result<PromptResult> {
234        let inner = self
235            .inner
236            .as_ref()
237            .ok_or_else(|| AcpError::ConnectionLost("session already closed".into()))?;
238
239        let start = Instant::now();
240        self.prompt_count += 1;
241
242        debug!(
243            prompt_count = self.prompt_count,
244            prompt_len = text.len(),
245            "sending prompt to persistent session"
246        );
247
248        inner
249            .prompt_tx
250            .send(SessionCommand::Prompt(text.to_string()))
251            .await
252            .map_err(|_| AcpError::ConnectionLost("agent process exited".into()))?;
253
254        let mut rx = inner.result_rx.lock().await;
255        match rx.recv().await {
256            Some(SessionResult::Response(text)) => Ok(PromptResult {
257                text,
258                duration: start.elapsed(),
259                prompt_count: self.prompt_count,
260            }),
261            Some(SessionResult::Error(e)) => Err(AcpError::Protocol(e)),
262            Some(SessionResult::Closed) => Err(AcpError::ConnectionLost("session closed".into())),
263            None => Err(AcpError::ConnectionLost("agent process exited".into())),
264        }
265    }
266
267    /// Close the session and terminate the agent process.
268    pub async fn close(&mut self) -> Result<()> {
269        if let Some(inner) = self.inner.take() {
270            let _ = inner.prompt_tx.send(SessionCommand::Close).await;
271            info!(
272                prompt_count = self.prompt_count,
273                uptime = ?self.started_at.elapsed(),
274                "ACP session closed"
275            );
276        }
277        Ok(())
278    }
279
280    /// Number of prompts sent in this session.
281    pub fn prompt_count(&self) -> u32 {
282        self.prompt_count
283    }
284
285    /// How long this session has been alive.
286    pub fn uptime(&self) -> Duration {
287        self.started_at.elapsed()
288    }
289
290    /// Whether the session is still connected.
291    pub fn is_active(&self) -> bool {
292        self.inner.is_some()
293    }
294
295    /// Get the working directory for this session.
296    pub fn working_dir(&self) -> &PathBuf {
297        &self.config.working_dir
298    }
299}
300
301impl Drop for AcpSession {
302    fn drop(&mut self) {
303        if self.inner.is_some() {
304            warn!("AcpSession dropped without explicit close — agent process may linger");
305        }
306    }
307}