Skip to main content

acp_cli/queue/
owner.rs

1use std::collections::VecDeque;
2use std::time::Duration;
3
4use tokio::io::BufReader;
5use tokio::net::UnixListener;
6use tokio::net::UnixStream;
7
8use crate::bridge::AcpBridge;
9use crate::bridge::events::BridgeEvent;
10use crate::error::Result;
11use crate::queue::ipc::{recv_message, send_message};
12use crate::queue::lease::LeaseFile;
13use crate::queue::messages::{QueueRequest, QueueResponse};
14
15/// A queued prompt waiting to be executed.
16struct PendingPrompt {
17    messages: Vec<String>,
18    reply_id: String,
19    client: UnixStream,
20}
21
22/// The queue owner process — owns the ACP bridge and multiplexes incoming
23/// prompts from IPC clients, executing them sequentially.
24pub struct QueueOwner {
25    bridge: AcpBridge,
26    listener: UnixListener,
27    session_key: String,
28    ttl_secs: u64,
29}
30
31impl QueueOwner {
32    /// Create a new queue owner.
33    ///
34    /// The caller must have already written the lease file and started the IPC
35    /// server (`UnixListener`).
36    pub async fn new(
37        bridge: AcpBridge,
38        listener: UnixListener,
39        session_key: &str,
40        ttl_secs: u64,
41    ) -> Result<Self> {
42        Ok(Self {
43            bridge,
44            listener,
45            session_key: session_key.to_string(),
46            ttl_secs,
47        })
48    }
49
50    /// Run the owner event loop.
51    ///
52    /// Uses `tokio::select!` to concurrently handle:
53    /// 1. New client connections from the Unix socket.
54    /// 2. Bridge events (forwarded to the active prompt's client).
55    /// 3. Heartbeat timer (updates lease file every 5 seconds).
56    /// 4. Idle timeout (shuts down after `ttl_secs` with an empty queue).
57    pub async fn run(mut self) -> Result<()> {
58        let mut queue: VecDeque<PendingPrompt> = VecDeque::new();
59        let mut active_client: Option<UnixStream> = None;
60        let mut active_reply_id: Option<String> = None;
61        let mut prompt_reply: Option<
62            tokio::sync::oneshot::Receiver<Result<crate::bridge::events::PromptResult>>,
63        > = None;
64
65        let heartbeat_interval = Duration::from_secs(5);
66        let mut heartbeat_timer = tokio::time::interval(heartbeat_interval);
67        heartbeat_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
68        // Consume the first immediate tick.
69        heartbeat_timer.tick().await;
70
71        let idle_duration = Duration::from_secs(self.ttl_secs);
72        let idle_deadline = tokio::time::sleep(idle_duration);
73        tokio::pin!(idle_deadline);
74
75        loop {
76            tokio::select! {
77                // --- 1. Accept new client connections ---
78                accept_result = self.listener.accept() => {
79                    match accept_result {
80                        Ok((stream, _addr)) => {
81                            // Reset idle timer on any connection.
82                            idle_deadline.as_mut().reset(tokio::time::Instant::now() + idle_duration);
83                            self.handle_client(stream, &mut queue, &mut active_client, &mut active_reply_id, &mut prompt_reply).await;
84                        }
85                        Err(e) => {
86                            eprintln!("[queue-owner] accept error: {e}");
87                        }
88                    }
89                }
90
91                // --- 2. Bridge events (forward to active client) ---
92                event = self.bridge.evt_rx.recv() => {
93                    match event {
94                        Some(evt) => {
95                            self.forward_event(&evt, &mut active_client).await;
96                        }
97                        None => {
98                            // Bridge channel closed — agent died.
99                            eprintln!("[queue-owner] bridge closed, shutting down");
100                            break;
101                        }
102                    }
103                }
104
105                // --- 3. Prompt completion ---
106                result = async {
107                    match prompt_reply.as_mut() {
108                        Some(rx) => rx.await,
109                        None => std::future::pending().await,
110                    }
111                } => {
112                    prompt_reply = None;
113                    let (content, stop_reason) = match result {
114                        Ok(Ok(pr)) => (pr.content, pr.stop_reason),
115                        Ok(Err(e)) => {
116                            if let Some(client) = active_client.as_mut() {
117                                let _ = send_message(client, &QueueResponse::Error {
118                                    message: e.to_string(),
119                                }).await;
120                            }
121                            (String::new(), "error".to_string())
122                        }
123                        Err(_) => {
124                            if let Some(client) = active_client.as_mut() {
125                                let _ = send_message(client, &QueueResponse::Error {
126                                    message: "bridge reply dropped".to_string(),
127                                }).await;
128                            }
129                            (String::new(), "error".to_string())
130                        }
131                    };
132
133                    // Send PromptResult to the active client.
134                    if let Some(client) = active_client.as_mut() {
135                        let reply_id = active_reply_id.take().unwrap_or_default();
136                        let _ = send_message(client, &QueueResponse::PromptResult {
137                            reply_id,
138                            content,
139                            stop_reason,
140                        }).await;
141                    }
142                    active_client = None;
143                    active_reply_id = None;
144
145                    // Process the next prompt in the queue.
146                    self.dispatch_next(&mut queue, &mut active_client, &mut active_reply_id, &mut prompt_reply).await;
147
148                    // If queue is empty after dispatch, reset idle timer.
149                    if queue.is_empty() && active_client.is_none() {
150                        idle_deadline.as_mut().reset(tokio::time::Instant::now() + idle_duration);
151                    }
152                }
153
154                // --- 4. Heartbeat timer ---
155                _ = heartbeat_timer.tick() => {
156                    if let Err(e) = LeaseFile::update_heartbeat(&self.session_key) {
157                        eprintln!("[queue-owner] heartbeat error: {e}");
158                    }
159                }
160
161                // --- 5. Idle timeout ---
162                _ = &mut idle_deadline => {
163                    if queue.is_empty() && active_client.is_none() {
164                        eprintln!("[queue-owner] idle timeout ({} s), shutting down", self.ttl_secs);
165                        break;
166                    }
167                    // Still busy — reset and try again later.
168                    idle_deadline.as_mut().reset(tokio::time::Instant::now() + idle_duration);
169                }
170            }
171        }
172
173        // Cleanup
174        self.shutdown().await;
175        Ok(())
176    }
177
178    /// Handle a newly connected client — read one request and act on it.
179    async fn handle_client(
180        &self,
181        stream: UnixStream,
182        queue: &mut VecDeque<PendingPrompt>,
183        active_client: &mut Option<UnixStream>,
184        active_reply_id: &mut Option<String>,
185        prompt_reply: &mut Option<
186            tokio::sync::oneshot::Receiver<Result<crate::bridge::events::PromptResult>>,
187        >,
188    ) {
189        // We need to read a request from the stream. Clone the stream fd for
190        // the BufReader while keeping the original for sending responses.
191        let std_stream = match stream.into_std() {
192            Ok(s) => s,
193            Err(e) => {
194                eprintln!("[queue-owner] failed to convert stream: {e}");
195                return;
196            }
197        };
198        let read_std = match std_stream.try_clone() {
199            Ok(s) => s,
200            Err(e) => {
201                eprintln!("[queue-owner] failed to clone stream: {e}");
202                return;
203            }
204        };
205        let mut write_stream = match UnixStream::from_std(std_stream) {
206            Ok(s) => s,
207            Err(e) => {
208                eprintln!("[queue-owner] failed to convert write stream: {e}");
209                return;
210            }
211        };
212        let read_stream = match UnixStream::from_std(read_std) {
213            Ok(s) => s,
214            Err(e) => {
215                eprintln!("[queue-owner] failed to convert read stream: {e}");
216                return;
217            }
218        };
219
220        let mut reader = BufReader::new(read_stream);
221        let request: Option<QueueRequest> = match recv_message(&mut reader).await {
222            Ok(msg) => msg,
223            Err(e) => {
224                eprintln!("[queue-owner] failed to read request: {e}");
225                return;
226            }
227        };
228
229        match request {
230            Some(QueueRequest::Prompt { messages, reply_id }) => {
231                if active_client.is_some() {
232                    // Already processing a prompt — enqueue.
233                    let position = queue.len() + 1;
234                    let _ = send_message(
235                        &mut write_stream,
236                        &QueueResponse::Queued {
237                            reply_id: reply_id.clone(),
238                            position,
239                        },
240                    )
241                    .await;
242                    queue.push_back(PendingPrompt {
243                        messages,
244                        reply_id,
245                        client: write_stream,
246                    });
247                } else {
248                    // No active prompt — execute immediately.
249                    self.start_prompt(
250                        messages,
251                        &reply_id,
252                        write_stream,
253                        active_client,
254                        active_reply_id,
255                        prompt_reply,
256                    )
257                    .await;
258                }
259            }
260            Some(QueueRequest::Cancel) => {
261                let _ = self.bridge.cancel().await;
262                let _ = send_message(
263                    &mut write_stream,
264                    &QueueResponse::StatusResponse {
265                        state: "cancel_requested".to_string(),
266                        queue_depth: queue.len(),
267                    },
268                )
269                .await;
270            }
271            Some(QueueRequest::Status) => {
272                let state = if active_client.is_some() {
273                    "busy"
274                } else {
275                    "idle"
276                };
277                let _ = send_message(
278                    &mut write_stream,
279                    &QueueResponse::StatusResponse {
280                        state: state.to_string(),
281                        queue_depth: queue.len(),
282                    },
283                )
284                .await;
285            }
286            Some(QueueRequest::SetMode { mode }) => match self.bridge.set_mode(mode).await {
287                Ok(()) => {
288                    let _ = send_message(&mut write_stream, &QueueResponse::Ok).await;
289                }
290                Err(e) => {
291                    let _ = send_message(
292                        &mut write_stream,
293                        &QueueResponse::Error {
294                            message: e.to_string(),
295                        },
296                    )
297                    .await;
298                }
299            },
300            Some(QueueRequest::SetConfig { key, value }) => {
301                match self.bridge.set_config(key, value).await {
302                    Ok(()) => {
303                        let _ = send_message(&mut write_stream, &QueueResponse::Ok).await;
304                    }
305                    Err(e) => {
306                        let _ = send_message(
307                            &mut write_stream,
308                            &QueueResponse::Error {
309                                message: e.to_string(),
310                            },
311                        )
312                        .await;
313                    }
314                }
315            }
316            None => {
317                // Client disconnected before sending a request.
318            }
319        }
320    }
321
322    /// Start executing a prompt on the bridge.
323    async fn start_prompt(
324        &self,
325        messages: Vec<String>,
326        reply_id: &str,
327        client: UnixStream,
328        active_client: &mut Option<UnixStream>,
329        active_reply_id: &mut Option<String>,
330        prompt_reply: &mut Option<
331            tokio::sync::oneshot::Receiver<Result<crate::bridge::events::PromptResult>>,
332        >,
333    ) {
334        match self.bridge.send_prompt(messages).await {
335            Ok(rx) => {
336                *active_client = Some(client);
337                *active_reply_id = Some(reply_id.to_string());
338                *prompt_reply = Some(rx);
339            }
340            Err(e) => {
341                let mut c = client;
342                let _ = send_message(
343                    &mut c,
344                    &QueueResponse::Error {
345                        message: e.to_string(),
346                    },
347                )
348                .await;
349            }
350        }
351    }
352
353    /// Dispatch the next queued prompt (if any).
354    async fn dispatch_next(
355        &self,
356        queue: &mut VecDeque<PendingPrompt>,
357        active_client: &mut Option<UnixStream>,
358        active_reply_id: &mut Option<String>,
359        prompt_reply: &mut Option<
360            tokio::sync::oneshot::Receiver<Result<crate::bridge::events::PromptResult>>,
361        >,
362    ) {
363        if let Some(pending) = queue.pop_front() {
364            self.start_prompt(
365                pending.messages,
366                &pending.reply_id,
367                pending.client,
368                active_client,
369                active_reply_id,
370                prompt_reply,
371            )
372            .await;
373        }
374    }
375
376    /// Forward a bridge event to the active client as a `QueueResponse::Event`.
377    async fn forward_event(&self, event: &BridgeEvent, active_client: &mut Option<UnixStream>) {
378        let response = match event {
379            BridgeEvent::TextChunk { text } => Some(QueueResponse::Event {
380                kind: "text_chunk".to_string(),
381                data: text.clone(),
382            }),
383            BridgeEvent::ToolUse { name } => Some(QueueResponse::Event {
384                kind: "tool_use".to_string(),
385                data: name.clone(),
386            }),
387            BridgeEvent::PromptDone { stop_reason } => Some(QueueResponse::Event {
388                kind: "prompt_done".to_string(),
389                data: stop_reason.clone(),
390            }),
391            BridgeEvent::Error { message } => Some(QueueResponse::Event {
392                kind: "error".to_string(),
393                data: message.clone(),
394            }),
395            BridgeEvent::SessionCreated { session_id } => Some(QueueResponse::Event {
396                kind: "session_created".to_string(),
397                data: session_id.clone(),
398            }),
399            BridgeEvent::AgentExited { code } => Some(QueueResponse::Event {
400                kind: "agent_exited".to_string(),
401                data: code.map(|c| c.to_string()).unwrap_or_default(),
402            }),
403            BridgeEvent::ToolResult {
404                name,
405                output,
406                is_read,
407            } => Some(QueueResponse::Event {
408                kind: "tool_result".to_string(),
409                // Encoded as "name\x00{0|1}\x00output" where 1 = is_read.
410                data: format!("{name}\x00{}\x00{output}", u8::from(*is_read)),
411            }),
412            BridgeEvent::PermissionRequest { .. } => {
413                // Permission requests are handled by the owner process itself,
414                // not forwarded to IPC clients.
415                None
416            }
417        };
418
419        if let Some(resp) = response
420            && let Some(client) = active_client.as_mut()
421        {
422            let _ = send_message(client, &resp).await;
423        }
424    }
425
426    /// Shut down the owner: clean up lease, socket, and bridge.
427    async fn shutdown(self) {
428        LeaseFile::remove(&self.session_key);
429        crate::queue::ipc::cleanup_socket(&self.session_key);
430        let _ = self.bridge.shutdown().await;
431    }
432}