1use tokio::io::BufReader;
2use tokio::net::UnixStream;
3
4use crate::bridge::events::PromptResult;
5use crate::client::permissions::PermissionMode;
6use crate::error::{AcpCliError, Result};
7use crate::output::OutputRenderer;
8
9use super::ipc::{connect_ipc, recv_message, send_message};
10use super::messages::{QueueRequest, QueueResponse};
11
12pub struct QueueClient {
15 stream: UnixStream,
16 reader: BufReader<UnixStream>,
17}
18
19impl QueueClient {
20 pub async fn connect(session_key: &str) -> std::io::Result<Self> {
22 let stream = connect_ipc(session_key).await?;
23 let std_stream = stream.into_std()?;
27 let reader_std = std_stream.try_clone()?;
28 let write_stream = UnixStream::from_std(std_stream)?;
29 let read_stream = UnixStream::from_std(reader_std)?;
30
31 Ok(Self {
32 stream: write_stream,
33 reader: BufReader::new(read_stream),
34 })
35 }
36
37 pub async fn prompt(
41 &mut self,
42 messages: Vec<String>,
43 renderer: &mut dyn OutputRenderer,
44 _permission_mode: &PermissionMode,
45 ) -> Result<PromptResult> {
46 let reply_id = generate_reply_id();
48
49 let request = QueueRequest::Prompt {
51 messages,
52 reply_id: reply_id.clone(),
53 };
54 send_message(&mut self.stream, &request)
55 .await
56 .map_err(|e| AcpCliError::Connection(format!("failed to send prompt: {e}")))?;
57
58 loop {
60 let response: Option<QueueResponse> = recv_message(&mut self.reader)
61 .await
62 .map_err(|e| AcpCliError::Connection(format!("failed to read response: {e}")))?;
63
64 match response {
65 Some(QueueResponse::Queued {
66 position,
67 reply_id: _,
68 }) => {
69 renderer.session_info(&format!("Queued at position {position}"));
70 }
71 Some(QueueResponse::Event { kind, data }) => match kind.as_str() {
72 "text_chunk" => renderer.text_chunk(&data),
73 "tool_use" => renderer.tool_status(&data),
74 "tool_result" => {
75 if let Some((name, is_read, output)) = parse_tool_result_data(&data) {
76 renderer.tool_result(name, output, is_read);
77 }
78 }
79 "prompt_done" => {
80 }
82 _ => {
83 renderer.session_info(&format!("event({kind}): {data}"));
85 }
86 },
87 Some(QueueResponse::PromptResult {
88 content,
89 stop_reason,
90 reply_id: _,
91 }) => {
92 return Ok(PromptResult {
93 content,
94 stop_reason,
95 });
96 }
97 Some(QueueResponse::StatusResponse { state, queue_depth }) => {
98 renderer.session_info(&format!("status: {state}, depth: {queue_depth}"));
99 }
100 Some(QueueResponse::Error { message }) => {
101 return Err(AcpCliError::Agent(message));
102 }
103 Some(QueueResponse::Ok) => {
104 }
106 None => {
107 return Err(AcpCliError::Connection(
109 "queue owner disconnected".to_string(),
110 ));
111 }
112 }
113 }
114 }
115
116 pub async fn enqueue_only(&mut self, messages: Vec<String>) -> Result<usize> {
120 let reply_id = generate_reply_id();
121
122 let request = QueueRequest::Prompt {
123 messages,
124 reply_id: reply_id.clone(),
125 };
126 send_message(&mut self.stream, &request)
127 .await
128 .map_err(|e| AcpCliError::Connection(format!("failed to send prompt: {e}")))?;
129
130 loop {
132 let response: Option<QueueResponse> = recv_message(&mut self.reader)
133 .await
134 .map_err(|e| AcpCliError::Connection(format!("failed to read response: {e}")))?;
135
136 match response {
137 Some(QueueResponse::Queued { position, .. }) => {
138 return Ok(position);
139 }
140 Some(QueueResponse::Error { message }) => {
141 return Err(AcpCliError::Agent(message));
142 }
143 None => {
144 return Err(AcpCliError::Connection(
145 "queue owner disconnected before acknowledging prompt".to_string(),
146 ));
147 }
148 _ => continue,
150 }
151 }
152 }
153
154 pub async fn set_mode(&mut self, mode: &str) -> Result<()> {
156 let request = QueueRequest::SetMode {
157 mode: mode.to_string(),
158 };
159 send_message(&mut self.stream, &request)
160 .await
161 .map_err(|e| AcpCliError::Connection(format!("failed to send set-mode: {e}")))?;
162
163 let response: Option<QueueResponse> = recv_message(&mut self.reader)
164 .await
165 .map_err(|e| AcpCliError::Connection(format!("failed to read response: {e}")))?;
166
167 match response {
168 Some(QueueResponse::Ok) => {
169 println!("Mode set to: {mode}");
170 Ok(())
171 }
172 Some(QueueResponse::Error { message }) => Err(AcpCliError::Agent(message)),
173 _ => Err(AcpCliError::Connection(
174 "unexpected response to set-mode request".to_string(),
175 )),
176 }
177 }
178
179 pub async fn set_config(&mut self, key: &str, value: &str) -> Result<()> {
181 let request = QueueRequest::SetConfig {
182 key: key.to_string(),
183 value: value.to_string(),
184 };
185 send_message(&mut self.stream, &request)
186 .await
187 .map_err(|e| AcpCliError::Connection(format!("failed to send set-config: {e}")))?;
188
189 let response: Option<QueueResponse> = recv_message(&mut self.reader)
190 .await
191 .map_err(|e| AcpCliError::Connection(format!("failed to read response: {e}")))?;
192
193 match response {
194 Some(QueueResponse::Ok) => {
195 println!("Config set: {key} = {value}");
196 Ok(())
197 }
198 Some(QueueResponse::Error { message }) => Err(AcpCliError::Agent(message)),
199 _ => Err(AcpCliError::Connection(
200 "unexpected response to set-config request".to_string(),
201 )),
202 }
203 }
204
205 pub async fn cancel(&mut self) -> std::io::Result<()> {
207 send_message(&mut self.stream, &QueueRequest::Cancel).await
208 }
209
210 pub async fn status(&mut self) -> std::io::Result<String> {
212 send_message(&mut self.stream, &QueueRequest::Status).await?;
213
214 let response: Option<QueueResponse> = recv_message(&mut self.reader).await?;
215 match response {
216 Some(QueueResponse::StatusResponse { state, queue_depth }) => {
217 Ok(format!("state: {state}, queue_depth: {queue_depth}"))
218 }
219 Some(QueueResponse::Error { message }) => {
220 Err(std::io::Error::other(format!("status error: {message}")))
221 }
222 _ => Err(std::io::Error::new(
223 std::io::ErrorKind::UnexpectedEof,
224 "unexpected response to status request",
225 )),
226 }
227 }
228}
229
230fn parse_tool_result_data(data: &str) -> Option<(&str, bool, &str)> {
233 let mut parts = data.splitn(3, '\x00');
234 let name = parts.next()?;
235 let is_read = parts.next()? == "1";
236 let output = parts.next()?;
237 Some((name, is_read, output))
238}
239
240fn generate_reply_id() -> String {
244 use std::time::{SystemTime, UNIX_EPOCH};
245
246 let timestamp = SystemTime::now()
247 .duration_since(UNIX_EPOCH)
248 .unwrap_or_default()
249 .as_nanos();
250
251 let pid = std::process::id();
253 format!("{timestamp:x}-{pid:x}")
254}
255
256#[cfg(test)]
257mod tests {
258 use super::parse_tool_result_data;
259
260 #[test]
261 fn parse_tool_result_is_read_true() {
262 let (name, is_read, output) =
263 parse_tool_result_data("Read File\x001\x00file content here").unwrap();
264 assert_eq!(name, "Read File");
265 assert!(is_read);
266 assert_eq!(output, "file content here");
267 }
268
269 #[test]
270 fn parse_tool_result_is_read_false() {
271 let (name, is_read, output) =
272 parse_tool_result_data("Bash\x000\x00command output").unwrap();
273 assert_eq!(name, "Bash");
274 assert!(!is_read);
275 assert_eq!(output, "command output");
276 }
277
278 #[test]
279 fn parse_tool_result_output_may_contain_null_bytes() {
280 let (name, is_read, output) =
282 parse_tool_result_data("Read File\x001\x00line1\x00line2").unwrap();
283 assert_eq!(name, "Read File");
284 assert!(is_read);
285 assert_eq!(output, "line1\x00line2");
286 }
287
288 #[test]
289 fn parse_tool_result_malformed_returns_none() {
290 assert!(parse_tool_result_data("no-nulls-here").is_none());
291 assert!(parse_tool_result_data("only\x00one").is_none());
292 }
293}