Skip to main content

acp_cli/queue/
client.rs

1use tokio::io::BufReader;
2use tokio::net::UnixStream;
3
4use crate::bridge::events::PromptResult;
5use crate::client::permissions::PermissionMode;
6use crate::error::{AcpCliError, Result};
7use crate::output::OutputRenderer;
8
9use super::ipc::{connect_ipc, recv_message, send_message};
10use super::messages::{QueueRequest, QueueResponse};
11
/// A client that connects to an existing queue owner via Unix socket and
/// enqueues prompts for processing.
///
/// The two fields are separate handles onto the same connection (see
/// `connect`): requests are written through `stream`, responses are read
/// through `reader`.
pub struct QueueClient {
    /// Handle used for sending requests to the queue owner.
    stream: UnixStream,
    /// Buffered handle used for receiving responses from the queue owner.
    reader: BufReader<UnixStream>,
}
18
19impl QueueClient {
20    /// Connect to the queue owner's Unix socket for the given session key.
21    pub async fn connect(session_key: &str) -> std::io::Result<Self> {
22        let stream = connect_ipc(session_key).await?;
23        // Split the stream: we need an owned reader half for BufReader and keep
24        // the original stream for writing. Since Unix sockets are full-duplex,
25        // we clone the underlying fd via `into_std` + `try_clone` + `from_std`.
26        let std_stream = stream.into_std()?;
27        let reader_std = std_stream.try_clone()?;
28        let write_stream = UnixStream::from_std(std_stream)?;
29        let read_stream = UnixStream::from_std(reader_std)?;
30
31        Ok(Self {
32            stream: write_stream,
33            reader: BufReader::new(read_stream),
34        })
35    }
36
37    /// Send a prompt to the queue owner and stream response events back
38    /// through the renderer. Returns the final `PromptResult` when the owner
39    /// finishes processing.
40    pub async fn prompt(
41        &mut self,
42        messages: Vec<String>,
43        renderer: &mut dyn OutputRenderer,
44        _permission_mode: &PermissionMode,
45    ) -> Result<PromptResult> {
46        // Generate a unique reply ID for correlation.
47        let reply_id = generate_reply_id();
48
49        // Send the prompt request to the queue owner.
50        let request = QueueRequest::Prompt {
51            messages,
52            reply_id: reply_id.clone(),
53        };
54        send_message(&mut self.stream, &request)
55            .await
56            .map_err(|e| AcpCliError::Connection(format!("failed to send prompt: {e}")))?;
57
58        // Loop receiving responses until we get a PromptResult or Error.
59        loop {
60            let response: Option<QueueResponse> = recv_message(&mut self.reader)
61                .await
62                .map_err(|e| AcpCliError::Connection(format!("failed to read response: {e}")))?;
63
64            match response {
65                Some(QueueResponse::Queued {
66                    position,
67                    reply_id: _,
68                }) => {
69                    renderer.session_info(&format!("Queued at position {position}"));
70                }
71                Some(QueueResponse::Event { kind, data }) => match kind.as_str() {
72                    "text_chunk" => renderer.text_chunk(&data),
73                    "tool_use" => renderer.tool_status(&data),
74                    "tool_result" => {
75                        if let Some((name, output)) = data.split_once('\x00') {
76                            renderer.tool_result(name, output);
77                        }
78                    }
79                    _ => {
80                        // Unknown event kind — log as info and continue.
81                        renderer.session_info(&format!("event({kind}): {data}"));
82                    }
83                },
84                Some(QueueResponse::PromptResult {
85                    content,
86                    stop_reason,
87                    reply_id: _,
88                }) => {
89                    return Ok(PromptResult {
90                        content,
91                        stop_reason,
92                    });
93                }
94                Some(QueueResponse::StatusResponse { state, queue_depth }) => {
95                    renderer.session_info(&format!("status: {state}, depth: {queue_depth}"));
96                }
97                Some(QueueResponse::Error { message }) => {
98                    return Err(AcpCliError::Agent(message));
99                }
100                Some(QueueResponse::Ok) => {
101                    // Acknowledgement for non-prompt commands; skip.
102                }
103                None => {
104                    // Stream closed unexpectedly.
105                    return Err(AcpCliError::Connection(
106                        "queue owner disconnected".to_string(),
107                    ));
108                }
109            }
110        }
111    }
112
113    /// Send a prompt and wait only for the `Queued` acknowledgement, returning
114    /// the queue position. This is used by `--no-wait` mode so the CLI can exit
115    /// immediately after confirming the prompt was accepted.
116    pub async fn enqueue_only(&mut self, messages: Vec<String>) -> Result<usize> {
117        let reply_id = generate_reply_id();
118
119        let request = QueueRequest::Prompt {
120            messages,
121            reply_id: reply_id.clone(),
122        };
123        send_message(&mut self.stream, &request)
124            .await
125            .map_err(|e| AcpCliError::Connection(format!("failed to send prompt: {e}")))?;
126
127        // Wait only for the Queued response, then return immediately.
128        loop {
129            let response: Option<QueueResponse> = recv_message(&mut self.reader)
130                .await
131                .map_err(|e| AcpCliError::Connection(format!("failed to read response: {e}")))?;
132
133            match response {
134                Some(QueueResponse::Queued { position, .. }) => {
135                    return Ok(position);
136                }
137                Some(QueueResponse::Error { message }) => {
138                    return Err(AcpCliError::Agent(message));
139                }
140                None => {
141                    return Err(AcpCliError::Connection(
142                        "queue owner disconnected before acknowledging prompt".to_string(),
143                    ));
144                }
145                // Skip any other messages that arrive before the Queued ack.
146                _ => continue,
147            }
148        }
149    }
150
151    /// Send a set-mode request to the queue owner.
152    pub async fn set_mode(&mut self, mode: &str) -> Result<()> {
153        let request = QueueRequest::SetMode {
154            mode: mode.to_string(),
155        };
156        send_message(&mut self.stream, &request)
157            .await
158            .map_err(|e| AcpCliError::Connection(format!("failed to send set-mode: {e}")))?;
159
160        let response: Option<QueueResponse> = recv_message(&mut self.reader)
161            .await
162            .map_err(|e| AcpCliError::Connection(format!("failed to read response: {e}")))?;
163
164        match response {
165            Some(QueueResponse::Ok) => {
166                println!("Mode set to: {mode}");
167                Ok(())
168            }
169            Some(QueueResponse::Error { message }) => Err(AcpCliError::Agent(message)),
170            _ => Err(AcpCliError::Connection(
171                "unexpected response to set-mode request".to_string(),
172            )),
173        }
174    }
175
176    /// Send a set-config request to the queue owner.
177    pub async fn set_config(&mut self, key: &str, value: &str) -> Result<()> {
178        let request = QueueRequest::SetConfig {
179            key: key.to_string(),
180            value: value.to_string(),
181        };
182        send_message(&mut self.stream, &request)
183            .await
184            .map_err(|e| AcpCliError::Connection(format!("failed to send set-config: {e}")))?;
185
186        let response: Option<QueueResponse> = recv_message(&mut self.reader)
187            .await
188            .map_err(|e| AcpCliError::Connection(format!("failed to read response: {e}")))?;
189
190        match response {
191            Some(QueueResponse::Ok) => {
192                println!("Config set: {key} = {value}");
193                Ok(())
194            }
195            Some(QueueResponse::Error { message }) => Err(AcpCliError::Agent(message)),
196            _ => Err(AcpCliError::Connection(
197                "unexpected response to set-config request".to_string(),
198            )),
199        }
200    }
201
202    /// Send a cancel request to the queue owner.
203    pub async fn cancel(&mut self) -> std::io::Result<()> {
204        send_message(&mut self.stream, &QueueRequest::Cancel).await
205    }
206
207    /// Request status from the queue owner.
208    pub async fn status(&mut self) -> std::io::Result<String> {
209        send_message(&mut self.stream, &QueueRequest::Status).await?;
210
211        let response: Option<QueueResponse> = recv_message(&mut self.reader).await?;
212        match response {
213            Some(QueueResponse::StatusResponse { state, queue_depth }) => {
214                Ok(format!("state: {state}, queue_depth: {queue_depth}"))
215            }
216            Some(QueueResponse::Error { message }) => {
217                Err(std::io::Error::other(format!("status error: {message}")))
218            }
219            _ => Err(std::io::Error::new(
220                std::io::ErrorKind::UnexpectedEof,
221                "unexpected response to status request",
222            )),
223        }
224    }
225}
226
/// Generate a reply ID that is unique within this process.
///
/// The ID consists of three lowercase hex fields joined by `-`: nanoseconds
/// since the Unix epoch, the process ID, and a per-process sequence number.
/// The sequence number guarantees that two calls in the same process never
/// return the same ID, even if the clock does not advance between them.
///
/// Uses a simple hex-encoded format without requiring the `uuid` crate.
fn generate_reply_id() -> String {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Monotonic per-process counter; each call takes the next value.
    static NEXT_SEQUENCE: AtomicUsize = AtomicUsize::new(0);

    // A clock set before the epoch falls back to a zero timestamp; the
    // sequence number still keeps IDs distinct in that case.
    let timestamp = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();
    let pid = std::process::id();
    // Relaxed is sufficient: only the atomicity of the increment matters,
    // no other memory is synchronized through this counter.
    let sequence = NEXT_SEQUENCE.fetch_add(1, Ordering::Relaxed);

    format!("{timestamp:x}-{pid:x}-{sequence:x}")
}