Skip to main content

acp_cli/queue/
client.rs

1use tokio::io::BufReader;
2use tokio::net::UnixStream;
3
4use crate::bridge::events::PromptResult;
5use crate::client::permissions::PermissionMode;
6use crate::error::{AcpCliError, Result};
7use crate::output::OutputRenderer;
8
9use super::ipc::{connect_ipc, recv_message, send_message};
10use super::messages::{QueueRequest, QueueResponse};
11
12/// A client that connects to an existing queue owner via Unix socket and
13/// enqueues prompts for processing.
14pub struct QueueClient {
15    stream: UnixStream,
16    reader: BufReader<UnixStream>,
17}
18
19impl QueueClient {
20    /// Connect to the queue owner's Unix socket for the given session key.
21    pub async fn connect(session_key: &str) -> std::io::Result<Self> {
22        let stream = connect_ipc(session_key).await?;
23        // Split the stream: we need an owned reader half for BufReader and keep
24        // the original stream for writing. Since Unix sockets are full-duplex,
25        // we clone the underlying fd via `into_std` + `try_clone` + `from_std`.
26        let std_stream = stream.into_std()?;
27        let reader_std = std_stream.try_clone()?;
28        let write_stream = UnixStream::from_std(std_stream)?;
29        let read_stream = UnixStream::from_std(reader_std)?;
30
31        Ok(Self {
32            stream: write_stream,
33            reader: BufReader::new(read_stream),
34        })
35    }
36
37    /// Send a prompt to the queue owner and stream response events back
38    /// through the renderer. Returns the final `PromptResult` when the owner
39    /// finishes processing.
40    pub async fn prompt(
41        &mut self,
42        messages: Vec<String>,
43        renderer: &mut dyn OutputRenderer,
44        _permission_mode: &PermissionMode,
45    ) -> Result<PromptResult> {
46        // Generate a unique reply ID for correlation.
47        let reply_id = generate_reply_id();
48
49        // Send the prompt request to the queue owner.
50        let request = QueueRequest::Prompt {
51            messages,
52            reply_id: reply_id.clone(),
53        };
54        send_message(&mut self.stream, &request)
55            .await
56            .map_err(|e| AcpCliError::Connection(format!("failed to send prompt: {e}")))?;
57
58        // Loop receiving responses until we get a PromptResult or Error.
59        loop {
60            let response: Option<QueueResponse> = recv_message(&mut self.reader)
61                .await
62                .map_err(|e| AcpCliError::Connection(format!("failed to read response: {e}")))?;
63
64            match response {
65                Some(QueueResponse::Queued {
66                    position,
67                    reply_id: _,
68                }) => {
69                    renderer.session_info(&format!("Queued at position {position}"));
70                }
71                Some(QueueResponse::Event { kind, data }) => match kind.as_str() {
72                    "text_chunk" => renderer.text_chunk(&data),
73                    "tool_use" => renderer.tool_status(&data),
74                    "tool_result" => {
75                        if let Some((name, is_read, output)) = parse_tool_result_data(&data) {
76                            renderer.tool_result(name, output, is_read);
77                        }
78                    }
79                    "prompt_done" => {
80                        // Informational — PromptResult handles termination.
81                    }
82                    _ => {
83                        // Unknown event kind — log as info and continue.
84                        renderer.session_info(&format!("event({kind}): {data}"));
85                    }
86                },
87                Some(QueueResponse::PromptResult {
88                    content,
89                    stop_reason,
90                    reply_id: _,
91                }) => {
92                    return Ok(PromptResult {
93                        content,
94                        stop_reason,
95                    });
96                }
97                Some(QueueResponse::StatusResponse { state, queue_depth }) => {
98                    renderer.session_info(&format!("status: {state}, depth: {queue_depth}"));
99                }
100                Some(QueueResponse::Error { message }) => {
101                    return Err(AcpCliError::Agent(message));
102                }
103                Some(QueueResponse::Ok) => {
104                    // Acknowledgement for non-prompt commands; skip.
105                }
106                None => {
107                    // Stream closed unexpectedly.
108                    return Err(AcpCliError::Connection(
109                        "queue owner disconnected".to_string(),
110                    ));
111                }
112            }
113        }
114    }
115
116    /// Send a prompt and wait only for the `Queued` acknowledgement, returning
117    /// the queue position. This is used by `--no-wait` mode so the CLI can exit
118    /// immediately after confirming the prompt was accepted.
119    pub async fn enqueue_only(&mut self, messages: Vec<String>) -> Result<usize> {
120        let reply_id = generate_reply_id();
121
122        let request = QueueRequest::Prompt {
123            messages,
124            reply_id: reply_id.clone(),
125        };
126        send_message(&mut self.stream, &request)
127            .await
128            .map_err(|e| AcpCliError::Connection(format!("failed to send prompt: {e}")))?;
129
130        // Wait only for the Queued response, then return immediately.
131        loop {
132            let response: Option<QueueResponse> = recv_message(&mut self.reader)
133                .await
134                .map_err(|e| AcpCliError::Connection(format!("failed to read response: {e}")))?;
135
136            match response {
137                Some(QueueResponse::Queued { position, .. }) => {
138                    return Ok(position);
139                }
140                Some(QueueResponse::Error { message }) => {
141                    return Err(AcpCliError::Agent(message));
142                }
143                None => {
144                    return Err(AcpCliError::Connection(
145                        "queue owner disconnected before acknowledging prompt".to_string(),
146                    ));
147                }
148                // Skip any other messages that arrive before the Queued ack.
149                _ => continue,
150            }
151        }
152    }
153
154    /// Send a set-mode request to the queue owner.
155    pub async fn set_mode(&mut self, mode: &str) -> Result<()> {
156        let request = QueueRequest::SetMode {
157            mode: mode.to_string(),
158        };
159        send_message(&mut self.stream, &request)
160            .await
161            .map_err(|e| AcpCliError::Connection(format!("failed to send set-mode: {e}")))?;
162
163        let response: Option<QueueResponse> = recv_message(&mut self.reader)
164            .await
165            .map_err(|e| AcpCliError::Connection(format!("failed to read response: {e}")))?;
166
167        match response {
168            Some(QueueResponse::Ok) => {
169                println!("Mode set to: {mode}");
170                Ok(())
171            }
172            Some(QueueResponse::Error { message }) => Err(AcpCliError::Agent(message)),
173            _ => Err(AcpCliError::Connection(
174                "unexpected response to set-mode request".to_string(),
175            )),
176        }
177    }
178
179    /// Send a set-config request to the queue owner.
180    pub async fn set_config(&mut self, key: &str, value: &str) -> Result<()> {
181        let request = QueueRequest::SetConfig {
182            key: key.to_string(),
183            value: value.to_string(),
184        };
185        send_message(&mut self.stream, &request)
186            .await
187            .map_err(|e| AcpCliError::Connection(format!("failed to send set-config: {e}")))?;
188
189        let response: Option<QueueResponse> = recv_message(&mut self.reader)
190            .await
191            .map_err(|e| AcpCliError::Connection(format!("failed to read response: {e}")))?;
192
193        match response {
194            Some(QueueResponse::Ok) => {
195                println!("Config set: {key} = {value}");
196                Ok(())
197            }
198            Some(QueueResponse::Error { message }) => Err(AcpCliError::Agent(message)),
199            _ => Err(AcpCliError::Connection(
200                "unexpected response to set-config request".to_string(),
201            )),
202        }
203    }
204
205    /// Send a cancel request to the queue owner.
206    pub async fn cancel(&mut self) -> std::io::Result<()> {
207        send_message(&mut self.stream, &QueueRequest::Cancel).await
208    }
209
210    /// Request status from the queue owner.
211    pub async fn status(&mut self) -> std::io::Result<String> {
212        send_message(&mut self.stream, &QueueRequest::Status).await?;
213
214        let response: Option<QueueResponse> = recv_message(&mut self.reader).await?;
215        match response {
216            Some(QueueResponse::StatusResponse { state, queue_depth }) => {
217                Ok(format!("state: {state}, queue_depth: {queue_depth}"))
218            }
219            Some(QueueResponse::Error { message }) => {
220                Err(std::io::Error::other(format!("status error: {message}")))
221            }
222            _ => Err(std::io::Error::new(
223                std::io::ErrorKind::UnexpectedEof,
224                "unexpected response to status request",
225            )),
226        }
227    }
228}
229
230/// Parse a `tool_result` IPC data string into `(name, is_read, output)`.
231/// Format: `"name\x00{0|1}\x00output"` where `1` means `is_read`.
232fn parse_tool_result_data(data: &str) -> Option<(&str, bool, &str)> {
233    let mut parts = data.splitn(3, '\x00');
234    let name = parts.next()?;
235    let is_read = parts.next()? == "1";
236    let output = parts.next()?;
237    Some((name, is_read, output))
238}
239
240/// Generate a random reply ID using timestamp and random bytes.
241///
242/// Uses a simple hex-encoded format without requiring the `uuid` crate.
243fn generate_reply_id() -> String {
244    use std::time::{SystemTime, UNIX_EPOCH};
245
246    let timestamp = SystemTime::now()
247        .duration_since(UNIX_EPOCH)
248        .unwrap_or_default()
249        .as_nanos();
250
251    // Mix timestamp with process id and a simple counter for uniqueness.
252    let pid = std::process::id();
253    format!("{timestamp:x}-{pid:x}")
254}
255
256#[cfg(test)]
257mod tests {
258    use super::parse_tool_result_data;
259
260    #[test]
261    fn parse_tool_result_is_read_true() {
262        let (name, is_read, output) =
263            parse_tool_result_data("Read File\x001\x00file content here").unwrap();
264        assert_eq!(name, "Read File");
265        assert!(is_read);
266        assert_eq!(output, "file content here");
267    }
268
269    #[test]
270    fn parse_tool_result_is_read_false() {
271        let (name, is_read, output) =
272            parse_tool_result_data("Bash\x000\x00command output").unwrap();
273        assert_eq!(name, "Bash");
274        assert!(!is_read);
275        assert_eq!(output, "command output");
276    }
277
278    #[test]
279    fn parse_tool_result_output_may_contain_null_bytes() {
280        // Output itself may contain \x00; splitn(3) must not split beyond the third field.
281        let (name, is_read, output) =
282            parse_tool_result_data("Read File\x001\x00line1\x00line2").unwrap();
283        assert_eq!(name, "Read File");
284        assert!(is_read);
285        assert_eq!(output, "line1\x00line2");
286    }
287
288    #[test]
289    fn parse_tool_result_malformed_returns_none() {
290        assert!(parse_tool_result_data("no-nulls-here").is_none());
291        assert!(parse_tool_result_data("only\x00one").is_none());
292    }
293}