1use tokio::io::BufReader;
2use tokio::net::UnixStream;
3
4use crate::bridge::events::PromptResult;
5use crate::client::permissions::PermissionMode;
6use crate::error::{AcpCliError, Result};
7use crate::output::OutputRenderer;
8
9use super::ipc::{connect_ipc, recv_message, send_message};
10use super::messages::{QueueRequest, QueueResponse};
11
12pub struct QueueClient {
15 stream: UnixStream,
16 reader: BufReader<UnixStream>,
17}
18
19impl QueueClient {
20 pub async fn connect(session_key: &str) -> std::io::Result<Self> {
22 let stream = connect_ipc(session_key).await?;
23 let std_stream = stream.into_std()?;
27 let reader_std = std_stream.try_clone()?;
28 let write_stream = UnixStream::from_std(std_stream)?;
29 let read_stream = UnixStream::from_std(reader_std)?;
30
31 Ok(Self {
32 stream: write_stream,
33 reader: BufReader::new(read_stream),
34 })
35 }
36
37 pub async fn prompt(
41 &mut self,
42 messages: Vec<String>,
43 renderer: &mut dyn OutputRenderer,
44 _permission_mode: &PermissionMode,
45 ) -> Result<PromptResult> {
46 let reply_id = generate_reply_id();
48
49 let request = QueueRequest::Prompt {
51 messages,
52 reply_id: reply_id.clone(),
53 };
54 send_message(&mut self.stream, &request)
55 .await
56 .map_err(|e| AcpCliError::Connection(format!("failed to send prompt: {e}")))?;
57
58 loop {
60 let response: Option<QueueResponse> = recv_message(&mut self.reader)
61 .await
62 .map_err(|e| AcpCliError::Connection(format!("failed to read response: {e}")))?;
63
64 match response {
65 Some(QueueResponse::Queued {
66 position,
67 reply_id: _,
68 }) => {
69 renderer.session_info(&format!("Queued at position {position}"));
70 }
71 Some(QueueResponse::Event { kind, data }) => match kind.as_str() {
72 "text_chunk" => renderer.text_chunk(&data),
73 "tool_use" => renderer.tool_status(&data),
74 _ => {
75 renderer.session_info(&format!("event({kind}): {data}"));
77 }
78 },
79 Some(QueueResponse::PromptResult {
80 content,
81 stop_reason,
82 reply_id: _,
83 }) => {
84 return Ok(PromptResult {
85 content,
86 stop_reason,
87 });
88 }
89 Some(QueueResponse::StatusResponse { state, queue_depth }) => {
90 renderer.session_info(&format!("status: {state}, depth: {queue_depth}"));
91 }
92 Some(QueueResponse::Error { message }) => {
93 return Err(AcpCliError::Agent(message));
94 }
95 Some(QueueResponse::Ok) => {
96 }
98 None => {
99 return Err(AcpCliError::Connection(
101 "queue owner disconnected".to_string(),
102 ));
103 }
104 }
105 }
106 }
107
108 pub async fn enqueue_only(&mut self, messages: Vec<String>) -> Result<usize> {
112 let reply_id = generate_reply_id();
113
114 let request = QueueRequest::Prompt {
115 messages,
116 reply_id: reply_id.clone(),
117 };
118 send_message(&mut self.stream, &request)
119 .await
120 .map_err(|e| AcpCliError::Connection(format!("failed to send prompt: {e}")))?;
121
122 loop {
124 let response: Option<QueueResponse> = recv_message(&mut self.reader)
125 .await
126 .map_err(|e| AcpCliError::Connection(format!("failed to read response: {e}")))?;
127
128 match response {
129 Some(QueueResponse::Queued { position, .. }) => {
130 return Ok(position);
131 }
132 Some(QueueResponse::Error { message }) => {
133 return Err(AcpCliError::Agent(message));
134 }
135 None => {
136 return Err(AcpCliError::Connection(
137 "queue owner disconnected before acknowledging prompt".to_string(),
138 ));
139 }
140 _ => continue,
142 }
143 }
144 }
145
146 pub async fn set_mode(&mut self, mode: &str) -> Result<()> {
148 let request = QueueRequest::SetMode {
149 mode: mode.to_string(),
150 };
151 send_message(&mut self.stream, &request)
152 .await
153 .map_err(|e| AcpCliError::Connection(format!("failed to send set-mode: {e}")))?;
154
155 let response: Option<QueueResponse> = recv_message(&mut self.reader)
156 .await
157 .map_err(|e| AcpCliError::Connection(format!("failed to read response: {e}")))?;
158
159 match response {
160 Some(QueueResponse::Ok) => {
161 println!("Mode set to: {mode}");
162 Ok(())
163 }
164 Some(QueueResponse::Error { message }) => Err(AcpCliError::Agent(message)),
165 _ => Err(AcpCliError::Connection(
166 "unexpected response to set-mode request".to_string(),
167 )),
168 }
169 }
170
171 pub async fn set_config(&mut self, key: &str, value: &str) -> Result<()> {
173 let request = QueueRequest::SetConfig {
174 key: key.to_string(),
175 value: value.to_string(),
176 };
177 send_message(&mut self.stream, &request)
178 .await
179 .map_err(|e| AcpCliError::Connection(format!("failed to send set-config: {e}")))?;
180
181 let response: Option<QueueResponse> = recv_message(&mut self.reader)
182 .await
183 .map_err(|e| AcpCliError::Connection(format!("failed to read response: {e}")))?;
184
185 match response {
186 Some(QueueResponse::Ok) => {
187 println!("Config set: {key} = {value}");
188 Ok(())
189 }
190 Some(QueueResponse::Error { message }) => Err(AcpCliError::Agent(message)),
191 _ => Err(AcpCliError::Connection(
192 "unexpected response to set-config request".to_string(),
193 )),
194 }
195 }
196
197 pub async fn cancel(&mut self) -> std::io::Result<()> {
199 send_message(&mut self.stream, &QueueRequest::Cancel).await
200 }
201
202 pub async fn status(&mut self) -> std::io::Result<String> {
204 send_message(&mut self.stream, &QueueRequest::Status).await?;
205
206 let response: Option<QueueResponse> = recv_message(&mut self.reader).await?;
207 match response {
208 Some(QueueResponse::StatusResponse { state, queue_depth }) => {
209 Ok(format!("state: {state}, queue_depth: {queue_depth}"))
210 }
211 Some(QueueResponse::Error { message }) => {
212 Err(std::io::Error::other(format!("status error: {message}")))
213 }
214 _ => Err(std::io::Error::new(
215 std::io::ErrorKind::UnexpectedEof,
216 "unexpected response to status request",
217 )),
218 }
219 }
220}
221
/// Build the identifier attached to a prompt request.
///
/// The id has the form `<nanos>-<pid>-<seq>`, each part in lowercase
/// hexadecimal:
///
/// * `nanos` - nanoseconds since the Unix epoch (0 if the system clock reports
///   a time before the epoch),
/// * `pid` - the current process id,
/// * `seq` - a per-process counter incremented on every call.
///
/// The counter is what makes ids distinct within one process: the clock alone
/// can return the same value for two calls made in quick succession, and
/// always yields 0 when the clock is set before the epoch.
fn generate_reply_id() -> String {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static NEXT_SEQ: AtomicUsize = AtomicUsize::new(0);

    let timestamp = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();
    let pid = std::process::id();
    // Relaxed suffices: only the atomicity of the increment matters, no other
    // memory is synchronised through this counter.
    let seq = NEXT_SEQ.fetch_add(1, Ordering::Relaxed);

    format!("{timestamp:x}-{pid:x}-{seq:x}")
}