1use std::collections::VecDeque;
2use std::time::Duration;
3
4use tokio::io::BufReader;
5use tokio::net::UnixListener;
6use tokio::net::UnixStream;
7
8use crate::bridge::AcpBridge;
9use crate::bridge::events::BridgeEvent;
10use crate::error::Result;
11use crate::queue::ipc::{recv_message, send_message};
12use crate::queue::lease::LeaseFile;
13use crate::queue::messages::{QueueRequest, QueueResponse};
14
15struct PendingPrompt {
17 messages: Vec<String>,
18 reply_id: String,
19 client: UnixStream,
20}
21
22pub struct QueueOwner {
25 bridge: AcpBridge,
26 listener: UnixListener,
27 session_key: String,
28 ttl_secs: u64,
29}
30
31impl QueueOwner {
32 pub async fn new(
37 bridge: AcpBridge,
38 listener: UnixListener,
39 session_key: &str,
40 ttl_secs: u64,
41 ) -> Result<Self> {
42 Ok(Self {
43 bridge,
44 listener,
45 session_key: session_key.to_string(),
46 ttl_secs,
47 })
48 }
49
50 pub async fn run(mut self) -> Result<()> {
58 let mut queue: VecDeque<PendingPrompt> = VecDeque::new();
59 let mut active_client: Option<UnixStream> = None;
60 let mut active_reply_id: Option<String> = None;
61 let mut prompt_reply: Option<
62 tokio::sync::oneshot::Receiver<Result<crate::bridge::events::PromptResult>>,
63 > = None;
64
65 let heartbeat_interval = Duration::from_secs(5);
66 let mut heartbeat_timer = tokio::time::interval(heartbeat_interval);
67 heartbeat_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
68 heartbeat_timer.tick().await;
70
71 let idle_duration = Duration::from_secs(self.ttl_secs);
72 let idle_deadline = tokio::time::sleep(idle_duration);
73 tokio::pin!(idle_deadline);
74
75 loop {
76 tokio::select! {
77 accept_result = self.listener.accept() => {
79 match accept_result {
80 Ok((stream, _addr)) => {
81 idle_deadline.as_mut().reset(tokio::time::Instant::now() + idle_duration);
83 self.handle_client(stream, &mut queue, &mut active_client, &mut active_reply_id, &mut prompt_reply).await;
84 }
85 Err(e) => {
86 eprintln!("[queue-owner] accept error: {e}");
87 }
88 }
89 }
90
91 event = self.bridge.evt_rx.recv() => {
93 match event {
94 Some(evt) => {
95 self.forward_event(&evt, &mut active_client).await;
96 }
97 None => {
98 eprintln!("[queue-owner] bridge closed, shutting down");
100 break;
101 }
102 }
103 }
104
105 result = async {
107 match prompt_reply.as_mut() {
108 Some(rx) => rx.await,
109 None => std::future::pending().await,
110 }
111 } => {
112 prompt_reply = None;
113 let (content, stop_reason) = match result {
114 Ok(Ok(pr)) => (pr.content, pr.stop_reason),
115 Ok(Err(e)) => {
116 if let Some(client) = active_client.as_mut() {
117 let _ = send_message(client, &QueueResponse::Error {
118 message: e.to_string(),
119 }).await;
120 }
121 (String::new(), "error".to_string())
122 }
123 Err(_) => {
124 if let Some(client) = active_client.as_mut() {
125 let _ = send_message(client, &QueueResponse::Error {
126 message: "bridge reply dropped".to_string(),
127 }).await;
128 }
129 (String::new(), "error".to_string())
130 }
131 };
132
133 if let Some(client) = active_client.as_mut() {
135 let reply_id = active_reply_id.take().unwrap_or_default();
136 let _ = send_message(client, &QueueResponse::PromptResult {
137 reply_id,
138 content,
139 stop_reason,
140 }).await;
141 }
142 active_client = None;
143 active_reply_id = None;
144
145 self.dispatch_next(&mut queue, &mut active_client, &mut active_reply_id, &mut prompt_reply).await;
147
148 if queue.is_empty() && active_client.is_none() {
150 idle_deadline.as_mut().reset(tokio::time::Instant::now() + idle_duration);
151 }
152 }
153
154 _ = heartbeat_timer.tick() => {
156 if let Err(e) = LeaseFile::update_heartbeat(&self.session_key) {
157 eprintln!("[queue-owner] heartbeat error: {e}");
158 }
159 }
160
161 _ = &mut idle_deadline => {
163 if queue.is_empty() && active_client.is_none() {
164 eprintln!("[queue-owner] idle timeout ({} s), shutting down", self.ttl_secs);
165 break;
166 }
167 idle_deadline.as_mut().reset(tokio::time::Instant::now() + idle_duration);
169 }
170 }
171 }
172
173 self.shutdown().await;
175 Ok(())
176 }
177
178 async fn handle_client(
180 &self,
181 stream: UnixStream,
182 queue: &mut VecDeque<PendingPrompt>,
183 active_client: &mut Option<UnixStream>,
184 active_reply_id: &mut Option<String>,
185 prompt_reply: &mut Option<
186 tokio::sync::oneshot::Receiver<Result<crate::bridge::events::PromptResult>>,
187 >,
188 ) {
189 let std_stream = match stream.into_std() {
192 Ok(s) => s,
193 Err(e) => {
194 eprintln!("[queue-owner] failed to convert stream: {e}");
195 return;
196 }
197 };
198 let read_std = match std_stream.try_clone() {
199 Ok(s) => s,
200 Err(e) => {
201 eprintln!("[queue-owner] failed to clone stream: {e}");
202 return;
203 }
204 };
205 let mut write_stream = match UnixStream::from_std(std_stream) {
206 Ok(s) => s,
207 Err(e) => {
208 eprintln!("[queue-owner] failed to convert write stream: {e}");
209 return;
210 }
211 };
212 let read_stream = match UnixStream::from_std(read_std) {
213 Ok(s) => s,
214 Err(e) => {
215 eprintln!("[queue-owner] failed to convert read stream: {e}");
216 return;
217 }
218 };
219
220 let mut reader = BufReader::new(read_stream);
221 let request: Option<QueueRequest> = match recv_message(&mut reader).await {
222 Ok(msg) => msg,
223 Err(e) => {
224 eprintln!("[queue-owner] failed to read request: {e}");
225 return;
226 }
227 };
228
229 match request {
230 Some(QueueRequest::Prompt { messages, reply_id }) => {
231 if active_client.is_some() {
232 let position = queue.len() + 1;
234 let _ = send_message(
235 &mut write_stream,
236 &QueueResponse::Queued {
237 reply_id: reply_id.clone(),
238 position,
239 },
240 )
241 .await;
242 queue.push_back(PendingPrompt {
243 messages,
244 reply_id,
245 client: write_stream,
246 });
247 } else {
248 self.start_prompt(
250 messages,
251 &reply_id,
252 write_stream,
253 active_client,
254 active_reply_id,
255 prompt_reply,
256 )
257 .await;
258 }
259 }
260 Some(QueueRequest::Cancel) => {
261 let _ = self.bridge.cancel().await;
262 let _ = send_message(
263 &mut write_stream,
264 &QueueResponse::StatusResponse {
265 state: "cancel_requested".to_string(),
266 queue_depth: queue.len(),
267 },
268 )
269 .await;
270 }
271 Some(QueueRequest::Status) => {
272 let state = if active_client.is_some() {
273 "busy"
274 } else {
275 "idle"
276 };
277 let _ = send_message(
278 &mut write_stream,
279 &QueueResponse::StatusResponse {
280 state: state.to_string(),
281 queue_depth: queue.len(),
282 },
283 )
284 .await;
285 }
286 Some(QueueRequest::SetMode { mode }) => match self.bridge.set_mode(mode).await {
287 Ok(()) => {
288 let _ = send_message(&mut write_stream, &QueueResponse::Ok).await;
289 }
290 Err(e) => {
291 let _ = send_message(
292 &mut write_stream,
293 &QueueResponse::Error {
294 message: e.to_string(),
295 },
296 )
297 .await;
298 }
299 },
300 Some(QueueRequest::SetConfig { key, value }) => {
301 match self.bridge.set_config(key, value).await {
302 Ok(()) => {
303 let _ = send_message(&mut write_stream, &QueueResponse::Ok).await;
304 }
305 Err(e) => {
306 let _ = send_message(
307 &mut write_stream,
308 &QueueResponse::Error {
309 message: e.to_string(),
310 },
311 )
312 .await;
313 }
314 }
315 }
316 None => {
317 }
319 }
320 }
321
322 async fn start_prompt(
324 &self,
325 messages: Vec<String>,
326 reply_id: &str,
327 client: UnixStream,
328 active_client: &mut Option<UnixStream>,
329 active_reply_id: &mut Option<String>,
330 prompt_reply: &mut Option<
331 tokio::sync::oneshot::Receiver<Result<crate::bridge::events::PromptResult>>,
332 >,
333 ) {
334 match self.bridge.send_prompt(messages).await {
335 Ok(rx) => {
336 *active_client = Some(client);
337 *active_reply_id = Some(reply_id.to_string());
338 *prompt_reply = Some(rx);
339 }
340 Err(e) => {
341 let mut c = client;
342 let _ = send_message(
343 &mut c,
344 &QueueResponse::Error {
345 message: e.to_string(),
346 },
347 )
348 .await;
349 }
350 }
351 }
352
353 async fn dispatch_next(
355 &self,
356 queue: &mut VecDeque<PendingPrompt>,
357 active_client: &mut Option<UnixStream>,
358 active_reply_id: &mut Option<String>,
359 prompt_reply: &mut Option<
360 tokio::sync::oneshot::Receiver<Result<crate::bridge::events::PromptResult>>,
361 >,
362 ) {
363 if let Some(pending) = queue.pop_front() {
364 self.start_prompt(
365 pending.messages,
366 &pending.reply_id,
367 pending.client,
368 active_client,
369 active_reply_id,
370 prompt_reply,
371 )
372 .await;
373 }
374 }
375
376 async fn forward_event(&self, event: &BridgeEvent, active_client: &mut Option<UnixStream>) {
378 let response = match event {
379 BridgeEvent::TextChunk { text } => Some(QueueResponse::Event {
380 kind: "text_chunk".to_string(),
381 data: text.clone(),
382 }),
383 BridgeEvent::ToolUse { name } => Some(QueueResponse::Event {
384 kind: "tool_use".to_string(),
385 data: name.clone(),
386 }),
387 BridgeEvent::PromptDone { stop_reason } => Some(QueueResponse::Event {
388 kind: "prompt_done".to_string(),
389 data: stop_reason.clone(),
390 }),
391 BridgeEvent::Error { message } => Some(QueueResponse::Event {
392 kind: "error".to_string(),
393 data: message.clone(),
394 }),
395 BridgeEvent::SessionCreated { session_id } => Some(QueueResponse::Event {
396 kind: "session_created".to_string(),
397 data: session_id.clone(),
398 }),
399 BridgeEvent::AgentExited { code } => Some(QueueResponse::Event {
400 kind: "agent_exited".to_string(),
401 data: code.map(|c| c.to_string()).unwrap_or_default(),
402 }),
403 BridgeEvent::PermissionRequest { .. } => {
404 None
407 }
408 };
409
410 if let Some(resp) = response
411 && let Some(client) = active_client.as_mut()
412 {
413 let _ = send_message(client, &resp).await;
414 }
415 }
416
417 async fn shutdown(self) {
419 LeaseFile::remove(&self.session_key);
420 crate::queue::ipc::cleanup_socket(&self.session_key);
421 let _ = self.bridge.shutdown().await;
422 }
423}