1use tokio::io::BufReader;
2use tokio::net::UnixStream;
3
4use crate::bridge::events::PromptResult;
5use crate::client::permissions::PermissionMode;
6use crate::error::{AcpCliError, Result};
7use crate::output::OutputRenderer;
8
9use super::ipc::{connect_ipc, recv_message, send_message};
10use super::messages::{QueueRequest, QueueResponse};
11
12pub struct QueueClient {
15 stream: UnixStream,
16 reader: BufReader<UnixStream>,
17}
18
19impl QueueClient {
20 pub async fn connect(session_key: &str) -> std::io::Result<Self> {
22 let stream = connect_ipc(session_key).await?;
23 let std_stream = stream.into_std()?;
27 let reader_std = std_stream.try_clone()?;
28 let write_stream = UnixStream::from_std(std_stream)?;
29 let read_stream = UnixStream::from_std(reader_std)?;
30
31 Ok(Self {
32 stream: write_stream,
33 reader: BufReader::new(read_stream),
34 })
35 }
36
37 pub async fn prompt(
41 &mut self,
42 messages: Vec<String>,
43 renderer: &mut dyn OutputRenderer,
44 _permission_mode: &PermissionMode,
45 ) -> Result<PromptResult> {
46 let reply_id = generate_reply_id();
48
49 let request = QueueRequest::Prompt {
51 messages,
52 reply_id: reply_id.clone(),
53 };
54 send_message(&mut self.stream, &request)
55 .await
56 .map_err(|e| AcpCliError::Connection(format!("failed to send prompt: {e}")))?;
57
58 loop {
60 let response: Option<QueueResponse> = recv_message(&mut self.reader)
61 .await
62 .map_err(|e| AcpCliError::Connection(format!("failed to read response: {e}")))?;
63
64 match response {
65 Some(QueueResponse::Queued {
66 position,
67 reply_id: _,
68 }) => {
69 renderer.session_info(&format!("Queued at position {position}"));
70 }
71 Some(QueueResponse::Event { kind, data }) => match kind.as_str() {
72 "text_chunk" => renderer.text_chunk(&data),
73 "tool_use" => renderer.tool_status(&data),
74 "tool_result" => {
75 if let Some((name, output)) = data.split_once('\x00') {
76 renderer.tool_result(name, output);
77 }
78 }
79 _ => {
80 renderer.session_info(&format!("event({kind}): {data}"));
82 }
83 },
84 Some(QueueResponse::PromptResult {
85 content,
86 stop_reason,
87 reply_id: _,
88 }) => {
89 return Ok(PromptResult {
90 content,
91 stop_reason,
92 });
93 }
94 Some(QueueResponse::StatusResponse { state, queue_depth }) => {
95 renderer.session_info(&format!("status: {state}, depth: {queue_depth}"));
96 }
97 Some(QueueResponse::Error { message }) => {
98 return Err(AcpCliError::Agent(message));
99 }
100 Some(QueueResponse::Ok) => {
101 }
103 None => {
104 return Err(AcpCliError::Connection(
106 "queue owner disconnected".to_string(),
107 ));
108 }
109 }
110 }
111 }
112
113 pub async fn enqueue_only(&mut self, messages: Vec<String>) -> Result<usize> {
117 let reply_id = generate_reply_id();
118
119 let request = QueueRequest::Prompt {
120 messages,
121 reply_id: reply_id.clone(),
122 };
123 send_message(&mut self.stream, &request)
124 .await
125 .map_err(|e| AcpCliError::Connection(format!("failed to send prompt: {e}")))?;
126
127 loop {
129 let response: Option<QueueResponse> = recv_message(&mut self.reader)
130 .await
131 .map_err(|e| AcpCliError::Connection(format!("failed to read response: {e}")))?;
132
133 match response {
134 Some(QueueResponse::Queued { position, .. }) => {
135 return Ok(position);
136 }
137 Some(QueueResponse::Error { message }) => {
138 return Err(AcpCliError::Agent(message));
139 }
140 None => {
141 return Err(AcpCliError::Connection(
142 "queue owner disconnected before acknowledging prompt".to_string(),
143 ));
144 }
145 _ => continue,
147 }
148 }
149 }
150
151 pub async fn set_mode(&mut self, mode: &str) -> Result<()> {
153 let request = QueueRequest::SetMode {
154 mode: mode.to_string(),
155 };
156 send_message(&mut self.stream, &request)
157 .await
158 .map_err(|e| AcpCliError::Connection(format!("failed to send set-mode: {e}")))?;
159
160 let response: Option<QueueResponse> = recv_message(&mut self.reader)
161 .await
162 .map_err(|e| AcpCliError::Connection(format!("failed to read response: {e}")))?;
163
164 match response {
165 Some(QueueResponse::Ok) => {
166 println!("Mode set to: {mode}");
167 Ok(())
168 }
169 Some(QueueResponse::Error { message }) => Err(AcpCliError::Agent(message)),
170 _ => Err(AcpCliError::Connection(
171 "unexpected response to set-mode request".to_string(),
172 )),
173 }
174 }
175
176 pub async fn set_config(&mut self, key: &str, value: &str) -> Result<()> {
178 let request = QueueRequest::SetConfig {
179 key: key.to_string(),
180 value: value.to_string(),
181 };
182 send_message(&mut self.stream, &request)
183 .await
184 .map_err(|e| AcpCliError::Connection(format!("failed to send set-config: {e}")))?;
185
186 let response: Option<QueueResponse> = recv_message(&mut self.reader)
187 .await
188 .map_err(|e| AcpCliError::Connection(format!("failed to read response: {e}")))?;
189
190 match response {
191 Some(QueueResponse::Ok) => {
192 println!("Config set: {key} = {value}");
193 Ok(())
194 }
195 Some(QueueResponse::Error { message }) => Err(AcpCliError::Agent(message)),
196 _ => Err(AcpCliError::Connection(
197 "unexpected response to set-config request".to_string(),
198 )),
199 }
200 }
201
202 pub async fn cancel(&mut self) -> std::io::Result<()> {
204 send_message(&mut self.stream, &QueueRequest::Cancel).await
205 }
206
207 pub async fn status(&mut self) -> std::io::Result<String> {
209 send_message(&mut self.stream, &QueueRequest::Status).await?;
210
211 let response: Option<QueueResponse> = recv_message(&mut self.reader).await?;
212 match response {
213 Some(QueueResponse::StatusResponse { state, queue_depth }) => {
214 Ok(format!("state: {state}, queue_depth: {queue_depth}"))
215 }
216 Some(QueueResponse::Error { message }) => {
217 Err(std::io::Error::other(format!("status error: {message}")))
218 }
219 _ => Err(std::io::Error::new(
220 std::io::ErrorKind::UnexpectedEof,
221 "unexpected response to status request",
222 )),
223 }
224 }
225}
226
/// Builds a reply id that is unique within this process.
///
/// The id has the form `<nanos>-<pid>-<sequence>`, each part in lowercase hex:
/// nanoseconds since the Unix epoch (0 if the clock reads earlier than the
/// epoch), the process id, and a per-process call counter.
fn generate_reply_id() -> String {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Timestamp and pid alone can repeat when two ids are requested within one
    // clock tick, or when the clock fallback yields 0. The counter keeps ids
    // from one process distinct regardless of clock resolution.
    static NEXT_SEQUENCE: AtomicUsize = AtomicUsize::new(0);

    let timestamp = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();
    let pid = std::process::id();
    // Relaxed is enough: only the atomicity of the increment matters, not its
    // ordering against other memory operations.
    let sequence = NEXT_SEQUENCE.fetch_add(1, Ordering::Relaxed);

    format!("{timestamp:x}-{pid:x}-{sequence:x}")
}