Skip to main content

acp_cli/queue/
client.rs

1use tokio::io::BufReader;
2use tokio::net::UnixStream;
3
4use crate::bridge::events::PromptResult;
5use crate::client::permissions::PermissionMode;
6use crate::error::{AcpCliError, Result};
7use crate::output::OutputRenderer;
8
9use super::ipc::{connect_ipc, recv_message, send_message};
10use super::messages::{QueueRequest, QueueResponse};
11
12/// A client that connects to an existing queue owner via Unix socket and
13/// enqueues prompts for processing.
14pub struct QueueClient {
15    stream: UnixStream,
16    reader: BufReader<UnixStream>,
17}
18
19impl QueueClient {
20    /// Connect to the queue owner's Unix socket for the given session key.
21    pub async fn connect(session_key: &str) -> std::io::Result<Self> {
22        let stream = connect_ipc(session_key).await?;
23        // Split the stream: we need an owned reader half for BufReader and keep
24        // the original stream for writing. Since Unix sockets are full-duplex,
25        // we clone the underlying fd via `into_std` + `try_clone` + `from_std`.
26        let std_stream = stream.into_std()?;
27        let reader_std = std_stream.try_clone()?;
28        let write_stream = UnixStream::from_std(std_stream)?;
29        let read_stream = UnixStream::from_std(reader_std)?;
30
31        Ok(Self {
32            stream: write_stream,
33            reader: BufReader::new(read_stream),
34        })
35    }
36
37    /// Send a prompt to the queue owner and stream response events back
38    /// through the renderer. Returns the final `PromptResult` when the owner
39    /// finishes processing.
40    pub async fn prompt(
41        &mut self,
42        messages: Vec<String>,
43        renderer: &mut dyn OutputRenderer,
44        _permission_mode: &PermissionMode,
45    ) -> Result<PromptResult> {
46        // Generate a unique reply ID for correlation.
47        let reply_id = generate_reply_id();
48
49        // Send the prompt request to the queue owner.
50        let request = QueueRequest::Prompt {
51            messages,
52            reply_id: reply_id.clone(),
53        };
54        send_message(&mut self.stream, &request)
55            .await
56            .map_err(|e| AcpCliError::Connection(format!("failed to send prompt: {e}")))?;
57
58        // Loop receiving responses until we get a PromptResult or Error.
59        loop {
60            let response: Option<QueueResponse> = recv_message(&mut self.reader)
61                .await
62                .map_err(|e| AcpCliError::Connection(format!("failed to read response: {e}")))?;
63
64            match response {
65                Some(QueueResponse::Queued {
66                    position,
67                    reply_id: _,
68                }) => {
69                    renderer.session_info(&format!("Queued at position {position}"));
70                }
71                Some(QueueResponse::Event { kind, data }) => match kind.as_str() {
72                    "text_chunk" => renderer.text_chunk(&data),
73                    "tool_use" => renderer.tool_status(&data),
74                    _ => {
75                        // Unknown event kind — log as info and continue.
76                        renderer.session_info(&format!("event({kind}): {data}"));
77                    }
78                },
79                Some(QueueResponse::PromptResult {
80                    content,
81                    stop_reason,
82                    reply_id: _,
83                }) => {
84                    return Ok(PromptResult {
85                        content,
86                        stop_reason,
87                    });
88                }
89                Some(QueueResponse::StatusResponse { state, queue_depth }) => {
90                    renderer.session_info(&format!("status: {state}, depth: {queue_depth}"));
91                }
92                Some(QueueResponse::Error { message }) => {
93                    return Err(AcpCliError::Agent(message));
94                }
95                Some(QueueResponse::Ok) => {
96                    // Acknowledgement for non-prompt commands; skip.
97                }
98                None => {
99                    // Stream closed unexpectedly.
100                    return Err(AcpCliError::Connection(
101                        "queue owner disconnected".to_string(),
102                    ));
103                }
104            }
105        }
106    }
107
108    /// Send a prompt and wait only for the `Queued` acknowledgement, returning
109    /// the queue position. This is used by `--no-wait` mode so the CLI can exit
110    /// immediately after confirming the prompt was accepted.
111    pub async fn enqueue_only(&mut self, messages: Vec<String>) -> Result<usize> {
112        let reply_id = generate_reply_id();
113
114        let request = QueueRequest::Prompt {
115            messages,
116            reply_id: reply_id.clone(),
117        };
118        send_message(&mut self.stream, &request)
119            .await
120            .map_err(|e| AcpCliError::Connection(format!("failed to send prompt: {e}")))?;
121
122        // Wait only for the Queued response, then return immediately.
123        loop {
124            let response: Option<QueueResponse> = recv_message(&mut self.reader)
125                .await
126                .map_err(|e| AcpCliError::Connection(format!("failed to read response: {e}")))?;
127
128            match response {
129                Some(QueueResponse::Queued { position, .. }) => {
130                    return Ok(position);
131                }
132                Some(QueueResponse::Error { message }) => {
133                    return Err(AcpCliError::Agent(message));
134                }
135                None => {
136                    return Err(AcpCliError::Connection(
137                        "queue owner disconnected before acknowledging prompt".to_string(),
138                    ));
139                }
140                // Skip any other messages that arrive before the Queued ack.
141                _ => continue,
142            }
143        }
144    }
145
146    /// Send a set-mode request to the queue owner.
147    pub async fn set_mode(&mut self, mode: &str) -> Result<()> {
148        let request = QueueRequest::SetMode {
149            mode: mode.to_string(),
150        };
151        send_message(&mut self.stream, &request)
152            .await
153            .map_err(|e| AcpCliError::Connection(format!("failed to send set-mode: {e}")))?;
154
155        let response: Option<QueueResponse> = recv_message(&mut self.reader)
156            .await
157            .map_err(|e| AcpCliError::Connection(format!("failed to read response: {e}")))?;
158
159        match response {
160            Some(QueueResponse::Ok) => {
161                println!("Mode set to: {mode}");
162                Ok(())
163            }
164            Some(QueueResponse::Error { message }) => Err(AcpCliError::Agent(message)),
165            _ => Err(AcpCliError::Connection(
166                "unexpected response to set-mode request".to_string(),
167            )),
168        }
169    }
170
171    /// Send a set-config request to the queue owner.
172    pub async fn set_config(&mut self, key: &str, value: &str) -> Result<()> {
173        let request = QueueRequest::SetConfig {
174            key: key.to_string(),
175            value: value.to_string(),
176        };
177        send_message(&mut self.stream, &request)
178            .await
179            .map_err(|e| AcpCliError::Connection(format!("failed to send set-config: {e}")))?;
180
181        let response: Option<QueueResponse> = recv_message(&mut self.reader)
182            .await
183            .map_err(|e| AcpCliError::Connection(format!("failed to read response: {e}")))?;
184
185        match response {
186            Some(QueueResponse::Ok) => {
187                println!("Config set: {key} = {value}");
188                Ok(())
189            }
190            Some(QueueResponse::Error { message }) => Err(AcpCliError::Agent(message)),
191            _ => Err(AcpCliError::Connection(
192                "unexpected response to set-config request".to_string(),
193            )),
194        }
195    }
196
197    /// Send a cancel request to the queue owner.
198    pub async fn cancel(&mut self) -> std::io::Result<()> {
199        send_message(&mut self.stream, &QueueRequest::Cancel).await
200    }
201
202    /// Request status from the queue owner.
203    pub async fn status(&mut self) -> std::io::Result<String> {
204        send_message(&mut self.stream, &QueueRequest::Status).await?;
205
206        let response: Option<QueueResponse> = recv_message(&mut self.reader).await?;
207        match response {
208            Some(QueueResponse::StatusResponse { state, queue_depth }) => {
209                Ok(format!("state: {state}, queue_depth: {queue_depth}"))
210            }
211            Some(QueueResponse::Error { message }) => {
212                Err(std::io::Error::other(format!("status error: {message}")))
213            }
214            _ => Err(std::io::Error::new(
215                std::io::ErrorKind::UnexpectedEof,
216                "unexpected response to status request",
217            )),
218        }
219    }
220}
221
222/// Generate a random reply ID using timestamp and random bytes.
223///
224/// Uses a simple hex-encoded format without requiring the `uuid` crate.
225fn generate_reply_id() -> String {
226    use std::time::{SystemTime, UNIX_EPOCH};
227
228    let timestamp = SystemTime::now()
229        .duration_since(UNIX_EPOCH)
230        .unwrap_or_default()
231        .as_nanos();
232
233    // Mix timestamp with process id and a simple counter for uniqueness.
234    let pid = std::process::id();
235    format!("{timestamp:x}-{pid:x}")
236}