1use std::str::FromStr;
8use std::sync::Arc;
9
10use agent_client_protocol::schema::{
11 ContentBlock, InitializeRequest, ProtocolVersion, RequestPermissionOutcome,
12 RequestPermissionRequest, RequestPermissionResponse, SelectedPermissionOutcome,
13 SessionNotification, SessionUpdate,
14};
15use agent_client_protocol::{Agent, Client, ConnectionTo};
16use agent_client_protocol_tokio::AcpAgent;
17use tokio::sync::mpsc;
18use tracing::{info, warn};
19
20use crate::connection::AcpAgentConfig;
21use crate::error::{AcpError, Result};
22use crate::permissions::{
23 PermissionDecision, PermissionOption, PermissionPolicy, PermissionRequest,
24};
25use crate::status::{AgentStatus, StatusTracker};
26
27#[derive(Debug, Clone)]
29pub enum OutputChunk {
30 Text(String),
32 Thought(String),
34 ToolCall {
36 title: String,
38 },
39 ToolCallComplete {
41 title: String,
43 },
44 PermissionRequested {
46 title: String,
48 approved: bool,
50 },
51 Done,
53 Error(String),
55}
56
57pub type OutputStream = mpsc::Receiver<OutputChunk>;
77
78pub async fn stream_prompt(
83 config: &AcpAgentConfig,
84 prompt: &str,
85 policy: Arc<PermissionPolicy>,
86 status: StatusTracker,
87) -> Result<OutputStream> {
88 info!(command = %config.command, "starting streaming ACP prompt");
89
90 let agent = AcpAgent::from_str(&config.command).map_err(|e| {
91 AcpError::InvalidConfig(format!("invalid command '{}': {e}", config.command))
92 })?;
93
94 let (chunk_tx, chunk_rx) = mpsc::channel::<OutputChunk>(64);
95 let prompt_text = prompt.to_string();
96 let working_dir = config.working_dir.clone();
97
98 status.set(AgentStatus::Starting);
99
100 tokio::spawn(async move {
101 let chunk_tx_err = chunk_tx.clone();
102 let status_inner = status.clone();
103 let policy_clone = policy.clone();
104 let chunk_tx_perm = chunk_tx.clone();
105
106 let outcome = Client
107 .builder()
108 .on_receive_notification(
109 {
110 let tx = chunk_tx.clone();
111 async move |notif: SessionNotification, _cx: ConnectionTo<Agent>| {
112 match notif.update {
113 SessionUpdate::AgentMessageChunk(chunk) => {
114 if let ContentBlock::Text(text_content) = chunk.content {
115 let _ = tx
116 .send(OutputChunk::Text(text_content.text.to_string()))
117 .await;
118 }
119 }
120 SessionUpdate::AgentThoughtChunk(chunk) => {
121 if let ContentBlock::Text(text_content) = chunk.content {
122 let _ = tx
123 .send(OutputChunk::Thought(text_content.text.to_string()))
124 .await;
125 }
126 }
127 SessionUpdate::ToolCall(tool_call) => {
128 let _ = tx
129 .send(OutputChunk::ToolCall {
130 title: tool_call.title.to_string(),
131 })
132 .await;
133 }
134 _ => {}
135 }
136 Ok(())
137 }
138 },
139 agent_client_protocol::on_receive_notification!(),
140 )
141 .on_receive_request(
142 {
143 let status = status_inner.clone();
144 async move |request: RequestPermissionRequest,
145 responder,
146 _cx: ConnectionTo<Agent>| {
147 status.set(AgentStatus::WaitingPermission);
148
149 let title = request
150 .options
151 .first()
152 .map(|o| o.name.to_string())
153 .unwrap_or_else(|| "Unknown".to_string());
154
155 let perm_request = PermissionRequest {
156 title: title.clone(),
157 options: request
158 .options
159 .iter()
160 .map(|o| PermissionOption {
161 id: o.option_id.to_string(),
162 name: o.name.to_string(),
163 })
164 .collect(),
165 };
166
167 let decision = policy_clone.decide(&perm_request);
168 let approved = matches!(decision, PermissionDecision::Allow(_));
169
170 let _ = chunk_tx_perm
171 .send(OutputChunk::PermissionRequested {
172 title: title.clone(),
173 approved,
174 })
175 .await;
176
177 status.set(AgentStatus::Running);
178
179 match decision {
180 PermissionDecision::Allow(id) => responder.respond(
181 RequestPermissionResponse::new(RequestPermissionOutcome::Selected(
182 SelectedPermissionOutcome::new(id),
183 )),
184 ),
185 PermissionDecision::Deny => responder.respond(
186 RequestPermissionResponse::new(RequestPermissionOutcome::Cancelled),
187 ),
188 }
189 }
190 },
191 agent_client_protocol::on_receive_request!(),
192 )
193 .connect_with(agent, {
194 let status = status_inner.clone();
195 let tx = chunk_tx.clone();
196 |connection: ConnectionTo<Agent>| async move {
197 status.set(AgentStatus::Starting);
198
199 connection
200 .send_request(InitializeRequest::new(ProtocolVersion::V1))
201 .block_task()
202 .await?;
203
204 status.set(AgentStatus::Running);
205
206 connection
207 .build_session(&working_dir)
208 .block_task()
209 .run_until(async |mut session| {
210 session.send_prompt(&prompt_text)?;
211 let _ = session.read_to_string().await?;
213 let _ = tx.send(OutputChunk::Done).await;
214 Ok(())
215 })
216 .await?;
217
218 status.set(AgentStatus::Idle);
219 Ok(())
220 }
221 })
222 .await;
223
224 if let Err(e) = outcome {
225 warn!(error = %e, "streaming ACP session ended with error");
226 let _ = chunk_tx_err.send(OutputChunk::Error(e.to_string())).await;
227 }
228
229 status_inner.set(AgentStatus::Stopped);
230 });
231
232 Ok(chunk_rx)
233}