1use serde::{Deserialize, Serialize};
7use std::collections::{HashMap, VecDeque};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10use tokio::sync::{RwLock, broadcast, oneshot};
11
12pub struct WaitQueue {
14 queues: RwLock<HashMap<String, VecDeque<WaitEntry>>>,
16 wait_history: RwLock<HashMap<String, Vec<Duration>>>,
18 event_sender: broadcast::Sender<WaitQueueEvent>,
20 max_history_entries: usize,
22}
23
24#[derive(Debug)]
26pub struct WaitEntry {
27 pub agent_id: String,
29 pub priority: u8,
31 pub registered_at: Instant,
33 pub auto_acquire: bool,
35 notify_sender: Option<oneshot::Sender<()>>,
37}
38
39pub struct WaitQueueHandle {
41 pub ready: oneshot::Receiver<()>,
43 pub initial_position: usize,
45 pub resource_key: String,
47 pub agent_id: String,
49 wait_queue: Arc<WaitQueue>,
51}
52
53impl WaitQueueHandle {
54 pub async fn cancel(self) -> bool {
56 self.wait_queue
57 .cancel(&self.resource_key, &self.agent_id)
58 .await
59 }
60}
61
62#[derive(Debug, Clone, Serialize, Deserialize)]
64#[serde(tag = "type", rename_all = "snake_case")]
65pub enum WaitQueueEvent {
66 Registered {
68 agent_id: String,
70 resource_key: String,
72 position: usize,
74 priority: u8,
76 },
77 PositionChanged {
79 agent_id: String,
81 resource_key: String,
83 old_position: usize,
85 new_position: usize,
87 },
88 Ready {
90 agent_id: String,
92 resource_key: String,
94 wait_duration_ms: u64,
96 },
97 Removed {
99 agent_id: String,
101 resource_key: String,
103 reason: RemovalReason,
105 },
106 QueueEmpty {
108 resource_key: String,
110 },
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize)]
115#[serde(rename_all = "snake_case")]
116pub enum RemovalReason {
117 Cancelled,
119 Acquired,
121 Timeout,
123 ResourceUnavailable,
125}
126
127#[derive(Debug, Clone, Serialize, Deserialize)]
129pub struct QueueStatus {
130 pub resource_key: String,
132 pub queue_length: usize,
134 pub waiters: Vec<WaiterInfo>,
136 pub estimated_wait_ms: Option<u64>,
138}
139
140#[derive(Debug, Clone, Serialize, Deserialize)]
142pub struct WaiterInfo {
143 pub agent_id: String,
145 pub position: usize,
147 pub priority: u8,
149 pub waiting_since_secs: u64,
151 pub auto_acquire: bool,
153}
154
155impl WaitQueue {
156 pub fn new() -> Arc<Self> {
158 Self::with_max_history(100)
159 }
160
161 pub fn with_max_history(max_history_entries: usize) -> Arc<Self> {
163 let (event_sender, _) = broadcast::channel(256);
164 Arc::new(Self {
165 queues: RwLock::new(HashMap::new()),
166 wait_history: RwLock::new(HashMap::new()),
167 event_sender,
168 max_history_entries,
169 })
170 }
171
172 pub fn subscribe(&self) -> broadcast::Receiver<WaitQueueEvent> {
174 self.event_sender.subscribe()
175 }
176
177 pub async fn register(
182 self: &Arc<Self>,
183 resource_key: &str,
184 agent_id: &str,
185 priority: u8,
186 auto_acquire: bool,
187 ) -> WaitQueueHandle {
188 let (notify_sender, notify_receiver) = oneshot::channel();
189
190 let entry = WaitEntry {
191 agent_id: agent_id.to_string(),
192 priority,
193 registered_at: Instant::now(),
194 auto_acquire,
195 notify_sender: Some(notify_sender),
196 };
197
198 let position = {
199 let mut queues = self.queues.write().await;
200 let queue = queues.entry(resource_key.to_string()).or_default();
201
202 let insert_pos = queue
204 .iter()
205 .position(|e| e.priority > priority)
206 .unwrap_or(queue.len());
207
208 queue.insert(insert_pos, entry);
209
210 for (i, e) in queue.iter().enumerate().skip(insert_pos + 1) {
212 let _ = self.event_sender.send(WaitQueueEvent::PositionChanged {
213 agent_id: e.agent_id.clone(),
214 resource_key: resource_key.to_string(),
215 old_position: i - 1,
216 new_position: i,
217 });
218 }
219
220 insert_pos
221 };
222
223 let _ = self.event_sender.send(WaitQueueEvent::Registered {
224 agent_id: agent_id.to_string(),
225 resource_key: resource_key.to_string(),
226 position,
227 priority,
228 });
229
230 WaitQueueHandle {
231 ready: notify_receiver,
232 initial_position: position,
233 resource_key: resource_key.to_string(),
234 agent_id: agent_id.to_string(),
235 wait_queue: Arc::clone(self),
236 }
237 }
238
239 pub async fn cancel(&self, resource_key: &str, agent_id: &str) -> bool {
241 let mut queues = self.queues.write().await;
242
243 if let Some(queue) = queues.get_mut(resource_key)
244 && let Some(pos) = queue.iter().position(|e| e.agent_id == agent_id)
245 {
246 queue.remove(pos);
247
248 for (i, e) in queue.iter().enumerate().skip(pos) {
250 let _ = self.event_sender.send(WaitQueueEvent::PositionChanged {
251 agent_id: e.agent_id.clone(),
252 resource_key: resource_key.to_string(),
253 old_position: i + 1,
254 new_position: i,
255 });
256 }
257
258 let _ = self.event_sender.send(WaitQueueEvent::Removed {
259 agent_id: agent_id.to_string(),
260 resource_key: resource_key.to_string(),
261 reason: RemovalReason::Cancelled,
262 });
263
264 if queue.is_empty() {
265 queues.remove(resource_key);
266 let _ = self.event_sender.send(WaitQueueEvent::QueueEmpty {
267 resource_key: resource_key.to_string(),
268 });
269 }
270
271 return true;
272 }
273 false
274 }
275
276 pub async fn notify_released(&self, resource_key: &str) -> Option<String> {
280 let mut queues = self.queues.write().await;
281
282 if let Some(queue) = queues.get_mut(resource_key)
283 && let Some(mut entry) = queue.pop_front()
284 {
285 let wait_duration = entry.registered_at.elapsed();
286
287 {
289 let mut history = self.wait_history.write().await;
290 let times = history.entry(resource_key.to_string()).or_default();
291 times.push(wait_duration);
292 if times.len() > self.max_history_entries {
293 times.remove(0);
294 }
295 }
296
297 if let Some(sender) = entry.notify_sender.take() {
299 let _ = sender.send(());
300 }
301
302 let agent_id = entry.agent_id.clone();
303
304 let _ = self.event_sender.send(WaitQueueEvent::Ready {
305 agent_id: agent_id.clone(),
306 resource_key: resource_key.to_string(),
307 wait_duration_ms: wait_duration.as_millis() as u64,
308 });
309
310 for (i, e) in queue.iter().enumerate() {
312 let _ = self.event_sender.send(WaitQueueEvent::PositionChanged {
313 agent_id: e.agent_id.clone(),
314 resource_key: resource_key.to_string(),
315 old_position: i + 1,
316 new_position: i,
317 });
318 }
319
320 if queue.is_empty() {
321 queues.remove(resource_key);
322 let _ = self.event_sender.send(WaitQueueEvent::QueueEmpty {
323 resource_key: resource_key.to_string(),
324 });
325 }
326
327 return Some(agent_id);
328 }
329 None
330 }
331
332 pub async fn queue_length(&self, resource_key: &str) -> usize {
334 let queues = self.queues.read().await;
335 queues.get(resource_key).map_or(0, |q| q.len())
336 }
337
338 pub async fn position(&self, resource_key: &str, agent_id: &str) -> Option<usize> {
340 let queues = self.queues.read().await;
341 queues
342 .get(resource_key)
343 .and_then(|q| q.iter().position(|e| e.agent_id == agent_id))
344 }
345
346 pub async fn estimate_wait(&self, resource_key: &str) -> Option<Duration> {
348 let history = self.wait_history.read().await;
349 if let Some(times) = history.get(resource_key) {
350 if times.is_empty() {
351 return None;
352 }
353 let total: Duration = times.iter().sum();
355 Some(total / times.len() as u32)
356 } else {
357 None
358 }
359 }
360
361 pub async fn estimate_wait_at_position(
363 &self,
364 resource_key: &str,
365 position: usize,
366 ) -> Option<Duration> {
367 let base_estimate = self.estimate_wait(resource_key).await?;
368 Some(base_estimate * (position as u32 + 1))
369 }
370
371 pub async fn get_queue_status(&self, resource_key: &str) -> Option<QueueStatus> {
373 let queues = self.queues.read().await;
374 let queue = queues.get(resource_key)?;
375
376 let waiters: Vec<WaiterInfo> = queue
377 .iter()
378 .enumerate()
379 .map(|(i, e)| WaiterInfo {
380 agent_id: e.agent_id.clone(),
381 position: i,
382 priority: e.priority,
383 waiting_since_secs: e.registered_at.elapsed().as_secs(),
384 auto_acquire: e.auto_acquire,
385 })
386 .collect();
387
388 let estimated_wait_ms = self
389 .estimate_wait(resource_key)
390 .await
391 .map(|d| d.as_millis() as u64);
392
393 Some(QueueStatus {
394 resource_key: resource_key.to_string(),
395 queue_length: queue.len(),
396 waiters,
397 estimated_wait_ms,
398 })
399 }
400
401 pub async fn list_queues(&self) -> Vec<String> {
403 let queues = self.queues.read().await;
404 queues.keys().cloned().collect()
405 }
406
407 pub async fn is_waiting(&self, agent_id: &str) -> bool {
409 let queues = self.queues.read().await;
410 queues
411 .values()
412 .any(|q| q.iter().any(|e| e.agent_id == agent_id))
413 }
414
415 pub async fn waiting_for(&self, agent_id: &str) -> Vec<String> {
417 let queues = self.queues.read().await;
418 queues
419 .iter()
420 .filter(|(_, q)| q.iter().any(|e| e.agent_id == agent_id))
421 .map(|(k, _)| k.clone())
422 .collect()
423 }
424
425 pub async fn record_wait_time(&self, resource_key: &str, duration: Duration) {
427 let mut history = self.wait_history.write().await;
428 let times = history.entry(resource_key.to_string()).or_default();
429 times.push(duration);
430 if times.len() > self.max_history_entries {
431 times.remove(0);
432 }
433 }
434
435 pub async fn peek_next(&self, resource_key: &str) -> Option<WaiterInfo> {
437 let queues = self.queues.read().await;
438 queues.get(resource_key).and_then(|q| {
439 q.front().map(|e| WaiterInfo {
440 agent_id: e.agent_id.clone(),
441 position: 0,
442 priority: e.priority,
443 waiting_since_secs: e.registered_at.elapsed().as_secs(),
444 auto_acquire: e.auto_acquire,
445 })
446 })
447 }
448
449 pub async fn should_auto_acquire(&self, resource_key: &str, agent_id: &str) -> bool {
451 let queues = self.queues.read().await;
452 if let Some(queue) = queues.get(resource_key)
453 && let Some(front) = queue.front()
454 {
455 return front.agent_id == agent_id && front.auto_acquire;
456 }
457 false
458 }
459}
460
461impl Default for WaitQueue {
462 fn default() -> Self {
463 let (event_sender, _) = broadcast::channel(256);
464 Self {
465 queues: RwLock::new(HashMap::new()),
466 wait_history: RwLock::new(HashMap::new()),
467 event_sender,
468 max_history_entries: 100,
469 }
470 }
471}
472
473pub fn resource_key(operation_type: &str, scope: &str) -> String {
475 format!("{}:{}", operation_type, scope)
476}
477
478pub fn file_resource_key(path: &std::path::Path) -> String {
480 format!("file:{}", path.display())
481}
482
483#[cfg(test)]
484mod tests {
485 use super::*;
486
487 #[tokio::test]
488 async fn test_register_and_position() {
489 let queue = WaitQueue::new();
490
491 let handle1 = queue.register("build:/project", "agent-1", 5, false).await;
492 let handle2 = queue.register("build:/project", "agent-2", 5, false).await;
493
494 assert_eq!(handle1.initial_position, 0);
495 assert_eq!(handle2.initial_position, 1);
496
497 assert_eq!(queue.position("build:/project", "agent-1").await, Some(0));
498 assert_eq!(queue.position("build:/project", "agent-2").await, Some(1));
499 assert_eq!(queue.queue_length("build:/project").await, 2);
500 }
501
502 #[tokio::test]
503 async fn test_priority_ordering() {
504 let queue = WaitQueue::new();
505
506 let _handle1 = queue.register("build:/project", "agent-1", 5, false).await;
508 let handle2 = queue.register("build:/project", "agent-2", 1, false).await; let _handle3 = queue.register("build:/project", "agent-3", 10, false).await; assert_eq!(handle2.initial_position, 0);
513 assert_eq!(queue.position("build:/project", "agent-2").await, Some(0));
514 assert_eq!(queue.position("build:/project", "agent-1").await, Some(1));
515 assert_eq!(queue.position("build:/project", "agent-3").await, Some(2));
516 }
517
518 #[tokio::test]
519 async fn test_cancel() {
520 let queue = WaitQueue::new();
521
522 let _handle1 = queue.register("build:/project", "agent-1", 5, false).await;
523 let _handle2 = queue.register("build:/project", "agent-2", 5, false).await;
524
525 assert!(queue.cancel("build:/project", "agent-1").await);
526 assert_eq!(queue.position("build:/project", "agent-1").await, None);
527 assert_eq!(queue.position("build:/project", "agent-2").await, Some(0));
528 assert_eq!(queue.queue_length("build:/project").await, 1);
529 }
530
531 #[tokio::test]
532 async fn test_notify_released() {
533 let queue = WaitQueue::new();
534
535 let _handle1 = queue.register("build:/project", "agent-1", 5, false).await;
536 let _handle2 = queue.register("build:/project", "agent-2", 5, false).await;
537
538 let next = queue.notify_released("build:/project").await;
540 assert_eq!(next, Some("agent-1".to_string()));
541
542 assert_eq!(queue.position("build:/project", "agent-2").await, Some(0));
547 assert_eq!(queue.queue_length("build:/project").await, 1);
548 }
549
550 #[tokio::test]
551 async fn test_empty_queue_cleanup() {
552 let queue = WaitQueue::new();
553
554 let _handle = queue.register("build:/project", "agent-1", 5, false).await;
555 assert!(queue.cancel("build:/project", "agent-1").await);
556
557 assert_eq!(queue.queue_length("build:/project").await, 0);
559 assert!(queue.list_queues().await.is_empty());
560 }
561
562 #[tokio::test]
563 async fn test_wait_time_estimation() {
564 let queue = WaitQueue::new();
565
566 queue
568 .record_wait_time("build:/project", Duration::from_secs(10))
569 .await;
570 queue
571 .record_wait_time("build:/project", Duration::from_secs(20))
572 .await;
573 queue
574 .record_wait_time("build:/project", Duration::from_secs(30))
575 .await;
576
577 let estimate = queue.estimate_wait("build:/project").await.unwrap();
578 assert_eq!(estimate, Duration::from_secs(20)); }
580
581 #[tokio::test]
582 async fn test_is_waiting() {
583 let queue = WaitQueue::new();
584
585 let _handle = queue.register("build:/project", "agent-1", 5, false).await;
586
587 assert!(queue.is_waiting("agent-1").await);
588 assert!(!queue.is_waiting("agent-2").await);
589 }
590
591 #[tokio::test]
592 async fn test_waiting_for() {
593 let queue = WaitQueue::new();
594
595 let _handle1 = queue.register("build:/project1", "agent-1", 5, false).await;
596 let _handle2 = queue.register("build:/project2", "agent-1", 5, false).await;
597
598 let waiting = queue.waiting_for("agent-1").await;
599 assert_eq!(waiting.len(), 2);
600 assert!(waiting.contains(&"build:/project1".to_string()));
601 assert!(waiting.contains(&"build:/project2".to_string()));
602 }
603
604 #[tokio::test]
605 async fn test_peek_next() {
606 let queue = WaitQueue::new();
607
608 let _handle = queue.register("build:/project", "agent-1", 5, true).await;
609
610 let next = queue.peek_next("build:/project").await.unwrap();
611 assert_eq!(next.agent_id, "agent-1");
612 assert_eq!(next.priority, 5);
613 assert!(next.auto_acquire);
614
615 assert_eq!(queue.queue_length("build:/project").await, 1);
617 }
618
619 #[tokio::test]
620 async fn test_should_auto_acquire() {
621 let queue = WaitQueue::new();
622
623 let _handle1 = queue.register("build:/project", "agent-1", 5, true).await;
624 let _handle2 = queue.register("build:/project", "agent-2", 5, false).await;
625
626 assert!(queue.should_auto_acquire("build:/project", "agent-1").await);
627 assert!(!queue.should_auto_acquire("build:/project", "agent-2").await);
628 }
629
630 #[tokio::test]
631 async fn test_queue_status() {
632 let queue = WaitQueue::new();
633
634 let _handle1 = queue.register("build:/project", "agent-1", 5, false).await;
635 let _handle2 = queue.register("build:/project", "agent-2", 3, true).await;
636
637 let status = queue.get_queue_status("build:/project").await.unwrap();
638 assert_eq!(status.queue_length, 2);
639 assert_eq!(status.waiters.len(), 2);
640
641 assert_eq!(status.waiters[0].agent_id, "agent-2");
643 assert_eq!(status.waiters[1].agent_id, "agent-1");
644 }
645
646 #[tokio::test]
647 async fn test_event_subscription() {
648 let queue = WaitQueue::new();
649 let mut receiver = queue.subscribe();
650
651 let _handle = queue.register("build:/project", "agent-1", 5, false).await;
652
653 let event = receiver.try_recv().unwrap();
655 match event {
656 WaitQueueEvent::Registered { agent_id, .. } => {
657 assert_eq!(agent_id, "agent-1");
658 }
659 _ => panic!("Expected Registered event"),
660 }
661 }
662}