Skip to main content

adk_acp/
streaming.rs

1//! Streaming output from ACP agent sessions.
2//!
3//! Instead of collecting the full response into a string, streaming mode
4//! yields chunks as they arrive from the agent — enabling real-time display
5//! and lower time-to-first-token.
6
7use std::str::FromStr;
8use std::sync::Arc;
9
10use agent_client_protocol::schema::{
11    ContentBlock, InitializeRequest, ProtocolVersion, RequestPermissionOutcome,
12    RequestPermissionRequest, RequestPermissionResponse, SelectedPermissionOutcome,
13    SessionNotification, SessionUpdate,
14};
15use agent_client_protocol::{Agent, Client, ConnectionTo};
16use agent_client_protocol_tokio::AcpAgent;
17use tokio::sync::mpsc;
18use tracing::{info, warn};
19
20use crate::connection::AcpAgentConfig;
21use crate::error::{AcpError, Result};
22use crate::permissions::{
23    PermissionDecision, PermissionOption, PermissionPolicy, PermissionRequest,
24};
25use crate::status::{AgentStatus, StatusTracker};
26
/// A chunk of output from the ACP agent.
///
/// Chunks compare by value (`PartialEq` / `Eq`), so consumers and tests can
/// check an observed chunk against an expected one with `==` or `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OutputChunk {
    /// A text chunk from the agent's response.
    Text(String),
    /// The agent is thinking (internal reasoning, not shown to user by default).
    Thought(String),
    /// A tool call was initiated (e.g., "Creating file app.rs").
    ToolCall {
        /// Human-readable title of the operation.
        title: String,
    },
    /// A tool call completed.
    ToolCallComplete {
        /// Human-readable title of the operation that completed.
        title: String,
    },
    /// The agent requested permission (informational — decision already made by policy).
    PermissionRequested {
        /// What the agent wanted to do.
        title: String,
        /// Whether it was approved.
        approved: bool,
    },
    /// The agent finished responding.
    Done,
    /// An error occurred; the payload is the error's display text.
    Error(String),
}
56
/// A streaming receiver for ACP agent output.
///
/// This is a Tokio `mpsc::Receiver` that yields [`OutputChunk`]s as they
/// arrive from the agent. `recv()` returns `None` once every sender has been
/// dropped, so a consumer can stop either on [`OutputChunk::Done`] /
/// [`OutputChunk::Error`] or when the stream is exhausted.
///
/// # Example
///
/// ```rust,ignore
/// use adk_acp::streaming::{stream_prompt, OutputChunk};
///
/// let mut stream = stream_prompt(&config, "Write a hello world", policy, status).await?;
/// while let Some(chunk) = stream.recv().await {
///     match chunk {
///         OutputChunk::Text(t) => print!("{t}"),
///         OutputChunk::ToolCall { title } => println!("\n[tool] {title}"),
///         OutputChunk::Error(e) => {
///             eprintln!("\n[error] {e}");
///             break;
///         }
///         OutputChunk::Done => break,
///         _ => {}
///     }
/// }
/// ```
pub type OutputStream = mpsc::Receiver<OutputChunk>;
77
78/// Send a prompt and stream the response chunks.
79///
80/// Returns a receiver that yields [`OutputChunk`]s as they arrive.
81/// The agent process is terminated when the stream completes.
82pub async fn stream_prompt(
83    config: &AcpAgentConfig,
84    prompt: &str,
85    policy: Arc<PermissionPolicy>,
86    status: StatusTracker,
87) -> Result<OutputStream> {
88    info!(command = %config.command, "starting streaming ACP prompt");
89
90    let agent = AcpAgent::from_str(&config.command).map_err(|e| {
91        AcpError::InvalidConfig(format!("invalid command '{}': {e}", config.command))
92    })?;
93
94    let (chunk_tx, chunk_rx) = mpsc::channel::<OutputChunk>(64);
95    let prompt_text = prompt.to_string();
96    let working_dir = config.working_dir.clone();
97
98    status.set(AgentStatus::Starting);
99
100    tokio::spawn(async move {
101        let chunk_tx_err = chunk_tx.clone();
102        let status_inner = status.clone();
103        let policy_clone = policy.clone();
104        let chunk_tx_perm = chunk_tx.clone();
105
106        let outcome = Client
107            .builder()
108            .on_receive_notification(
109                {
110                    let tx = chunk_tx.clone();
111                    async move |notif: SessionNotification, _cx: ConnectionTo<Agent>| {
112                        match notif.update {
113                            SessionUpdate::AgentMessageChunk(chunk) => {
114                                if let ContentBlock::Text(text_content) = chunk.content {
115                                    let _ = tx
116                                        .send(OutputChunk::Text(text_content.text.to_string()))
117                                        .await;
118                                }
119                            }
120                            SessionUpdate::AgentThoughtChunk(chunk) => {
121                                if let ContentBlock::Text(text_content) = chunk.content {
122                                    let _ = tx
123                                        .send(OutputChunk::Thought(text_content.text.to_string()))
124                                        .await;
125                                }
126                            }
127                            SessionUpdate::ToolCall(tool_call) => {
128                                let _ = tx
129                                    .send(OutputChunk::ToolCall {
130                                        title: tool_call.title.to_string(),
131                                    })
132                                    .await;
133                            }
134                            _ => {}
135                        }
136                        Ok(())
137                    }
138                },
139                agent_client_protocol::on_receive_notification!(),
140            )
141            .on_receive_request(
142                {
143                    let status = status_inner.clone();
144                    async move |request: RequestPermissionRequest,
145                                responder,
146                                _cx: ConnectionTo<Agent>| {
147                        status.set(AgentStatus::WaitingPermission);
148
149                        let title = request
150                            .options
151                            .first()
152                            .map(|o| o.name.to_string())
153                            .unwrap_or_else(|| "Unknown".to_string());
154
155                        let perm_request = PermissionRequest {
156                            title: title.clone(),
157                            options: request
158                                .options
159                                .iter()
160                                .map(|o| PermissionOption {
161                                    id: o.option_id.to_string(),
162                                    name: o.name.to_string(),
163                                })
164                                .collect(),
165                        };
166
167                        let decision = policy_clone.decide(&perm_request);
168                        let approved = matches!(decision, PermissionDecision::Allow(_));
169
170                        let _ = chunk_tx_perm
171                            .send(OutputChunk::PermissionRequested {
172                                title: title.clone(),
173                                approved,
174                            })
175                            .await;
176
177                        status.set(AgentStatus::Running);
178
179                        match decision {
180                            PermissionDecision::Allow(id) => responder.respond(
181                                RequestPermissionResponse::new(RequestPermissionOutcome::Selected(
182                                    SelectedPermissionOutcome::new(id),
183                                )),
184                            ),
185                            PermissionDecision::Deny => responder.respond(
186                                RequestPermissionResponse::new(RequestPermissionOutcome::Cancelled),
187                            ),
188                        }
189                    }
190                },
191                agent_client_protocol::on_receive_request!(),
192            )
193            .connect_with(agent, {
194                let status = status_inner.clone();
195                let tx = chunk_tx.clone();
196                |connection: ConnectionTo<Agent>| async move {
197                    status.set(AgentStatus::Starting);
198
199                    connection
200                        .send_request(InitializeRequest::new(ProtocolVersion::V1))
201                        .block_task()
202                        .await?;
203
204                    status.set(AgentStatus::Running);
205
206                    connection
207                        .build_session(&working_dir)
208                        .block_task()
209                        .run_until(async |mut session| {
210                            session.send_prompt(&prompt_text)?;
211                            // read_to_string collects internally; notifications stream via callback
212                            let _ = session.read_to_string().await?;
213                            let _ = tx.send(OutputChunk::Done).await;
214                            Ok(())
215                        })
216                        .await?;
217
218                    status.set(AgentStatus::Idle);
219                    Ok(())
220                }
221            })
222            .await;
223
224        if let Err(e) = outcome {
225            warn!(error = %e, "streaming ACP session ended with error");
226            let _ = chunk_tx_err.send(OutputChunk::Error(e.to_string())).await;
227        }
228
229        status_inner.set(AgentStatus::Stopped);
230    });
231
232    Ok(chunk_rx)
233}