Skip to main content

brainwires_agents/
wait_queue.rs

1//! Wait queue implementation for resource coordination
2//!
3//! Manages agents waiting for locked resources with priority ordering
4//! and notification when resources become available.
5
6use serde::{Deserialize, Serialize};
7use std::collections::{HashMap, VecDeque};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10use tokio::sync::{RwLock, broadcast, oneshot};
11
12/// Priority-ordered wait queue for resource locks
13pub struct WaitQueue {
14    /// Queues indexed by resource key (e.g., "build:/path/to/project")
15    queues: RwLock<HashMap<String, VecDeque<WaitEntry>>>,
16    /// Historical wait times for estimation (resource_key -> durations)
17    wait_history: RwLock<HashMap<String, Vec<Duration>>>,
18    /// Notification broadcaster for queue events
19    event_sender: broadcast::Sender<WaitQueueEvent>,
20    /// Maximum history entries to keep per resource
21    max_history_entries: usize,
22}
23
24/// Entry in the wait queue
25#[derive(Debug)]
26pub struct WaitEntry {
27    /// Agent waiting for the resource
28    pub agent_id: String,
29    /// Priority (0 = highest, higher numbers = lower priority)
30    pub priority: u8,
31    /// When the agent registered in the queue
32    pub registered_at: Instant,
33    /// Whether to automatically acquire when reaching front
34    pub auto_acquire: bool,
35    /// Channel to notify when agent reaches front of queue
36    notify_sender: Option<oneshot::Sender<()>>,
37}
38
39/// Handle returned when registering in wait queue
40pub struct WaitQueueHandle {
41    /// Receiver that fires when agent reaches front of queue
42    pub ready: oneshot::Receiver<()>,
43    /// Initial position in queue (0 = front)
44    pub initial_position: usize,
45    /// Resource being waited for
46    pub resource_key: String,
47    /// Agent ID
48    pub agent_id: String,
49    /// Reference to wait queue for cancellation
50    wait_queue: Arc<WaitQueue>,
51}
52
53impl WaitQueueHandle {
54    /// Cancel waiting and remove from queue
55    pub async fn cancel(self) -> bool {
56        self.wait_queue
57            .cancel(&self.resource_key, &self.agent_id)
58            .await
59    }
60}
61
62/// Events emitted by the wait queue
63#[derive(Debug, Clone, Serialize, Deserialize)]
64#[serde(tag = "type", rename_all = "snake_case")]
65pub enum WaitQueueEvent {
66    /// Agent registered in queue.
67    Registered {
68        /// Agent that registered.
69        agent_id: String,
70        /// Resource being waited for.
71        resource_key: String,
72        /// Initial position in the queue.
73        position: usize,
74        /// Agent's priority level.
75        priority: u8,
76    },
77    /// Agent's position changed (due to higher priority agent joining).
78    PositionChanged {
79        /// Affected agent identifier.
80        agent_id: String,
81        /// Resource being waited for.
82        resource_key: String,
83        /// Previous position in queue.
84        old_position: usize,
85        /// New position in queue.
86        new_position: usize,
87    },
88    /// Agent reached front of queue and can acquire.
89    Ready {
90        /// Agent that is ready.
91        agent_id: String,
92        /// Resource now available.
93        resource_key: String,
94        /// Time spent waiting in milliseconds.
95        wait_duration_ms: u64,
96    },
97    /// Agent was removed from queue (cancelled or resource acquired).
98    Removed {
99        /// Agent that was removed.
100        agent_id: String,
101        /// Resource that was being waited for.
102        resource_key: String,
103        /// Why the agent was removed.
104        reason: RemovalReason,
105    },
106    /// Queue became empty for a resource.
107    QueueEmpty {
108        /// Resource whose queue is now empty.
109        resource_key: String,
110    },
111}
112
113/// Reason for removal from queue
114#[derive(Debug, Clone, Serialize, Deserialize)]
115#[serde(rename_all = "snake_case")]
116pub enum RemovalReason {
117    /// Agent cancelled the wait
118    Cancelled,
119    /// Agent acquired the resource
120    Acquired,
121    /// Agent timed out (if timeout was set)
122    Timeout,
123    /// Resource became unavailable
124    ResourceUnavailable,
125}
126
127/// Status of a wait queue for a resource
128#[derive(Debug, Clone, Serialize, Deserialize)]
129pub struct QueueStatus {
130    /// Resource identifier.
131    pub resource_key: String,
132    /// Number of agents waiting.
133    pub queue_length: usize,
134    /// Details about each waiter.
135    pub waiters: Vec<WaiterInfo>,
136    /// Estimated wait time in milliseconds.
137    pub estimated_wait_ms: Option<u64>,
138}
139
140/// Information about a waiter in the queue
141#[derive(Debug, Clone, Serialize, Deserialize)]
142pub struct WaiterInfo {
143    /// Agent identifier.
144    pub agent_id: String,
145    /// Current position in queue (0 = front).
146    pub position: usize,
147    /// Priority level (lower = higher priority).
148    pub priority: u8,
149    /// Seconds since the agent started waiting.
150    pub waiting_since_secs: u64,
151    /// Whether to auto-acquire when reaching front.
152    pub auto_acquire: bool,
153}
154
155impl WaitQueue {
156    /// Create a new wait queue
157    pub fn new() -> Arc<Self> {
158        Self::with_max_history(100)
159    }
160
161    /// Create a new wait queue with custom max history entries
162    pub fn with_max_history(max_history_entries: usize) -> Arc<Self> {
163        let (event_sender, _) = broadcast::channel(256);
164        Arc::new(Self {
165            queues: RwLock::new(HashMap::new()),
166            wait_history: RwLock::new(HashMap::new()),
167            event_sender,
168            max_history_entries,
169        })
170    }
171
172    /// Subscribe to queue events
173    pub fn subscribe(&self) -> broadcast::Receiver<WaitQueueEvent> {
174        self.event_sender.subscribe()
175    }
176
177    /// Register interest in a resource
178    ///
179    /// Returns a handle with a receiver that fires when the agent reaches
180    /// the front of the queue.
181    pub async fn register(
182        self: &Arc<Self>,
183        resource_key: &str,
184        agent_id: &str,
185        priority: u8,
186        auto_acquire: bool,
187    ) -> WaitQueueHandle {
188        let (notify_sender, notify_receiver) = oneshot::channel();
189
190        let entry = WaitEntry {
191            agent_id: agent_id.to_string(),
192            priority,
193            registered_at: Instant::now(),
194            auto_acquire,
195            notify_sender: Some(notify_sender),
196        };
197
198        let position = {
199            let mut queues = self.queues.write().await;
200            let queue = queues.entry(resource_key.to_string()).or_default();
201
202            // Find insertion position based on priority (lower number = higher priority)
203            let insert_pos = queue
204                .iter()
205                .position(|e| e.priority > priority)
206                .unwrap_or(queue.len());
207
208            queue.insert(insert_pos, entry);
209
210            // Notify agents whose position changed
211            for (i, e) in queue.iter().enumerate().skip(insert_pos + 1) {
212                let _ = self.event_sender.send(WaitQueueEvent::PositionChanged {
213                    agent_id: e.agent_id.clone(),
214                    resource_key: resource_key.to_string(),
215                    old_position: i - 1,
216                    new_position: i,
217                });
218            }
219
220            insert_pos
221        };
222
223        let _ = self.event_sender.send(WaitQueueEvent::Registered {
224            agent_id: agent_id.to_string(),
225            resource_key: resource_key.to_string(),
226            position,
227            priority,
228        });
229
230        WaitQueueHandle {
231            ready: notify_receiver,
232            initial_position: position,
233            resource_key: resource_key.to_string(),
234            agent_id: agent_id.to_string(),
235            wait_queue: Arc::clone(self),
236        }
237    }
238
239    /// Remove an agent from the queue
240    pub async fn cancel(&self, resource_key: &str, agent_id: &str) -> bool {
241        let mut queues = self.queues.write().await;
242
243        if let Some(queue) = queues.get_mut(resource_key)
244            && let Some(pos) = queue.iter().position(|e| e.agent_id == agent_id)
245        {
246            queue.remove(pos);
247
248            // Notify agents whose position changed
249            for (i, e) in queue.iter().enumerate().skip(pos) {
250                let _ = self.event_sender.send(WaitQueueEvent::PositionChanged {
251                    agent_id: e.agent_id.clone(),
252                    resource_key: resource_key.to_string(),
253                    old_position: i + 1,
254                    new_position: i,
255                });
256            }
257
258            let _ = self.event_sender.send(WaitQueueEvent::Removed {
259                agent_id: agent_id.to_string(),
260                resource_key: resource_key.to_string(),
261                reason: RemovalReason::Cancelled,
262            });
263
264            if queue.is_empty() {
265                queues.remove(resource_key);
266                let _ = self.event_sender.send(WaitQueueEvent::QueueEmpty {
267                    resource_key: resource_key.to_string(),
268                });
269            }
270
271            return true;
272        }
273        false
274    }
275
276    /// Notify that a resource was released
277    ///
278    /// Returns the agent_id of the next waiter (if any) who should acquire.
279    pub async fn notify_released(&self, resource_key: &str) -> Option<String> {
280        let mut queues = self.queues.write().await;
281
282        if let Some(queue) = queues.get_mut(resource_key)
283            && let Some(mut entry) = queue.pop_front()
284        {
285            let wait_duration = entry.registered_at.elapsed();
286
287            // Record wait time for estimation
288            {
289                let mut history = self.wait_history.write().await;
290                let times = history.entry(resource_key.to_string()).or_default();
291                times.push(wait_duration);
292                if times.len() > self.max_history_entries {
293                    times.remove(0);
294                }
295            }
296
297            // Notify the waiter
298            if let Some(sender) = entry.notify_sender.take() {
299                let _ = sender.send(());
300            }
301
302            let agent_id = entry.agent_id.clone();
303
304            let _ = self.event_sender.send(WaitQueueEvent::Ready {
305                agent_id: agent_id.clone(),
306                resource_key: resource_key.to_string(),
307                wait_duration_ms: wait_duration.as_millis() as u64,
308            });
309
310            // Update positions for remaining waiters
311            for (i, e) in queue.iter().enumerate() {
312                let _ = self.event_sender.send(WaitQueueEvent::PositionChanged {
313                    agent_id: e.agent_id.clone(),
314                    resource_key: resource_key.to_string(),
315                    old_position: i + 1,
316                    new_position: i,
317                });
318            }
319
320            if queue.is_empty() {
321                queues.remove(resource_key);
322                let _ = self.event_sender.send(WaitQueueEvent::QueueEmpty {
323                    resource_key: resource_key.to_string(),
324                });
325            }
326
327            return Some(agent_id);
328        }
329        None
330    }
331
332    /// Get queue length for a resource
333    pub async fn queue_length(&self, resource_key: &str) -> usize {
334        let queues = self.queues.read().await;
335        queues.get(resource_key).map_or(0, |q| q.len())
336    }
337
338    /// Get position of agent in queue (0 = front)
339    pub async fn position(&self, resource_key: &str, agent_id: &str) -> Option<usize> {
340        let queues = self.queues.read().await;
341        queues
342            .get(resource_key)
343            .and_then(|q| q.iter().position(|e| e.agent_id == agent_id))
344    }
345
346    /// Estimate wait time based on historical data
347    pub async fn estimate_wait(&self, resource_key: &str) -> Option<Duration> {
348        let history = self.wait_history.read().await;
349        if let Some(times) = history.get(resource_key) {
350            if times.is_empty() {
351                return None;
352            }
353            // Return average wait time
354            let total: Duration = times.iter().sum();
355            Some(total / times.len() as u32)
356        } else {
357            None
358        }
359    }
360
361    /// Estimate wait time for a specific position
362    pub async fn estimate_wait_at_position(
363        &self,
364        resource_key: &str,
365        position: usize,
366    ) -> Option<Duration> {
367        let base_estimate = self.estimate_wait(resource_key).await?;
368        Some(base_estimate * (position as u32 + 1))
369    }
370
371    /// Get detailed status of a queue
372    pub async fn get_queue_status(&self, resource_key: &str) -> Option<QueueStatus> {
373        let queues = self.queues.read().await;
374        let queue = queues.get(resource_key)?;
375
376        let waiters: Vec<WaiterInfo> = queue
377            .iter()
378            .enumerate()
379            .map(|(i, e)| WaiterInfo {
380                agent_id: e.agent_id.clone(),
381                position: i,
382                priority: e.priority,
383                waiting_since_secs: e.registered_at.elapsed().as_secs(),
384                auto_acquire: e.auto_acquire,
385            })
386            .collect();
387
388        let estimated_wait_ms = self
389            .estimate_wait(resource_key)
390            .await
391            .map(|d| d.as_millis() as u64);
392
393        Some(QueueStatus {
394            resource_key: resource_key.to_string(),
395            queue_length: queue.len(),
396            waiters,
397            estimated_wait_ms,
398        })
399    }
400
401    /// Get all active queues
402    pub async fn list_queues(&self) -> Vec<String> {
403        let queues = self.queues.read().await;
404        queues.keys().cloned().collect()
405    }
406
407    /// Check if an agent is waiting for any resource
408    pub async fn is_waiting(&self, agent_id: &str) -> bool {
409        let queues = self.queues.read().await;
410        queues
411            .values()
412            .any(|q| q.iter().any(|e| e.agent_id == agent_id))
413    }
414
415    /// Get all resources an agent is waiting for
416    pub async fn waiting_for(&self, agent_id: &str) -> Vec<String> {
417        let queues = self.queues.read().await;
418        queues
419            .iter()
420            .filter(|(_, q)| q.iter().any(|e| e.agent_id == agent_id))
421            .map(|(k, _)| k.clone())
422            .collect()
423    }
424
425    /// Record a completed wait time (for external tracking)
426    pub async fn record_wait_time(&self, resource_key: &str, duration: Duration) {
427        let mut history = self.wait_history.write().await;
428        let times = history.entry(resource_key.to_string()).or_default();
429        times.push(duration);
430        if times.len() > self.max_history_entries {
431            times.remove(0);
432        }
433    }
434
435    /// Get the next waiter without removing them (peek)
436    pub async fn peek_next(&self, resource_key: &str) -> Option<WaiterInfo> {
437        let queues = self.queues.read().await;
438        queues.get(resource_key).and_then(|q| {
439            q.front().map(|e| WaiterInfo {
440                agent_id: e.agent_id.clone(),
441                position: 0,
442                priority: e.priority,
443                waiting_since_secs: e.registered_at.elapsed().as_secs(),
444                auto_acquire: e.auto_acquire,
445            })
446        })
447    }
448
449    /// Check if agent should auto-acquire (is at front and has auto_acquire set)
450    pub async fn should_auto_acquire(&self, resource_key: &str, agent_id: &str) -> bool {
451        let queues = self.queues.read().await;
452        if let Some(queue) = queues.get(resource_key)
453            && let Some(front) = queue.front()
454        {
455            return front.agent_id == agent_id && front.auto_acquire;
456        }
457        false
458    }
459}
460
461impl Default for WaitQueue {
462    fn default() -> Self {
463        let (event_sender, _) = broadcast::channel(256);
464        Self {
465            queues: RwLock::new(HashMap::new()),
466            wait_history: RwLock::new(HashMap::new()),
467            event_sender,
468            max_history_entries: 100,
469        }
470    }
471}
472
/// Build the lock key for an operation category and its scope.
///
/// The key has the shape `"<operation_type>:<scope>"`; for example
/// `resource_key("build", "/proj")` yields `"build:/proj"`.
pub fn resource_key(operation_type: &str, scope: &str) -> String {
    let mut key = String::with_capacity(operation_type.len() + 1 + scope.len());
    key.push_str(operation_type);
    key.push(':');
    key.push_str(scope);
    key
}

/// Build the lock key for a filesystem path: `"file:"` followed by the
/// path's display form.
pub fn file_resource_key(path: &std::path::Path) -> String {
    resource_key("file", &path.display().to_string())
}
482
#[cfg(test)]
mod tests {
    //! Unit tests for `WaitQueue`: ordering, cancellation, release
    //! signalling, history-based estimates, and event publication.

    use super::*;

    #[tokio::test]
    async fn test_register_and_position() {
        let queue = WaitQueue::new();

        let handle1 = queue.register("build:/project", "agent-1", 5, false).await;
        let handle2 = queue.register("build:/project", "agent-2", 5, false).await;

        assert_eq!(handle1.initial_position, 0);
        assert_eq!(handle2.initial_position, 1);

        assert_eq!(queue.position("build:/project", "agent-1").await, Some(0));
        assert_eq!(queue.position("build:/project", "agent-2").await, Some(1));
        assert_eq!(queue.queue_length("build:/project").await, 2);
    }

    #[tokio::test]
    async fn test_priority_ordering() {
        let queue = WaitQueue::new();

        // Register with different priorities
        let _handle1 = queue.register("build:/project", "agent-1", 5, false).await;
        let handle2 = queue.register("build:/project", "agent-2", 1, false).await; // Higher priority
        let _handle3 = queue.register("build:/project", "agent-3", 10, false).await; // Lower priority

        // agent-2 should be at front (priority 1)
        assert_eq!(handle2.initial_position, 0);
        assert_eq!(queue.position("build:/project", "agent-2").await, Some(0));
        assert_eq!(queue.position("build:/project", "agent-1").await, Some(1));
        assert_eq!(queue.position("build:/project", "agent-3").await, Some(2));
    }

    #[tokio::test]
    async fn test_cancel() {
        let queue = WaitQueue::new();

        let _handle1 = queue.register("build:/project", "agent-1", 5, false).await;
        let _handle2 = queue.register("build:/project", "agent-2", 5, false).await;

        assert!(queue.cancel("build:/project", "agent-1").await);
        assert_eq!(queue.position("build:/project", "agent-1").await, None);
        assert_eq!(queue.position("build:/project", "agent-2").await, Some(0));
        assert_eq!(queue.queue_length("build:/project").await, 1);
    }

    #[tokio::test]
    async fn test_cancel_unknown_agent() {
        let queue = WaitQueue::new();

        // No queue exists for the resource at all
        assert!(!queue.cancel("build:/project", "agent-1").await);

        let _handle = queue.register("build:/project", "agent-1", 5, false).await;

        // Queue exists, but this agent is not in it
        assert!(!queue.cancel("build:/project", "agent-2").await);
        assert_eq!(queue.queue_length("build:/project").await, 1);
    }

    #[tokio::test]
    async fn test_handle_cancel() {
        let queue = WaitQueue::new();

        let handle = queue.register("build:/project", "agent-1", 5, false).await;
        assert!(handle.cancel().await);
        assert!(!queue.is_waiting("agent-1").await);
        assert!(queue.list_queues().await.is_empty());
    }

    #[tokio::test]
    async fn test_notify_released() {
        let queue = WaitQueue::new();

        let handle1 = queue.register("build:/project", "agent-1", 5, false).await;
        let mut handle2 = queue.register("build:/project", "agent-2", 5, false).await;

        // Notify release - should return agent-1
        let next = queue.notify_released("build:/project").await;
        assert_eq!(next, Some("agent-1".to_string()));

        // agent-1's signal was already sent, so awaiting it completes at once;
        // agent-2 has not been signalled yet.
        assert!(handle1.ready.await.is_ok());
        assert!(handle2.ready.try_recv().is_err());

        // agent-2 should now be at front
        assert_eq!(queue.position("build:/project", "agent-2").await, Some(0));
        assert_eq!(queue.queue_length("build:/project").await, 1);
    }

    #[tokio::test]
    async fn test_notify_released_without_waiters() {
        let queue = WaitQueue::new();
        assert_eq!(queue.notify_released("build:/project").await, None);
    }

    #[tokio::test]
    async fn test_empty_queue_cleanup() {
        let queue = WaitQueue::new();

        let _handle = queue.register("build:/project", "agent-1", 5, false).await;
        assert!(queue.cancel("build:/project", "agent-1").await);

        // Queue should be removed when empty
        assert_eq!(queue.queue_length("build:/project").await, 0);
        assert!(queue.list_queues().await.is_empty());
    }

    #[tokio::test]
    async fn test_wait_time_estimation() {
        let queue = WaitQueue::new();

        // Record some wait times
        queue
            .record_wait_time("build:/project", Duration::from_secs(10))
            .await;
        queue
            .record_wait_time("build:/project", Duration::from_secs(20))
            .await;
        queue
            .record_wait_time("build:/project", Duration::from_secs(30))
            .await;

        let estimate = queue.estimate_wait("build:/project").await.unwrap();
        assert_eq!(estimate, Duration::from_secs(20)); // Average of 10, 20, 30
    }

    #[tokio::test]
    async fn test_wait_history_is_bounded() {
        let queue = WaitQueue::with_max_history(2);

        queue
            .record_wait_time("build:/project", Duration::from_secs(1))
            .await;
        queue
            .record_wait_time("build:/project", Duration::from_secs(2))
            .await;
        queue
            .record_wait_time("build:/project", Duration::from_secs(30))
            .await;

        // The oldest sample (1s) was evicted: average of 2s and 30s
        assert_eq!(
            queue.estimate_wait("build:/project").await,
            Some(Duration::from_secs(16))
        );
    }

    #[tokio::test]
    async fn test_estimate_wait_at_position() {
        let queue = WaitQueue::new();
        queue
            .record_wait_time("build:/project", Duration::from_secs(10))
            .await;

        assert_eq!(
            queue.estimate_wait_at_position("build:/project", 0).await,
            Some(Duration::from_secs(10))
        );
        assert_eq!(
            queue.estimate_wait_at_position("build:/project", 2).await,
            Some(Duration::from_secs(30))
        );
        assert_eq!(queue.estimate_wait_at_position("build:/other", 0).await, None);
    }

    #[tokio::test]
    async fn test_is_waiting() {
        let queue = WaitQueue::new();

        let _handle = queue.register("build:/project", "agent-1", 5, false).await;

        assert!(queue.is_waiting("agent-1").await);
        assert!(!queue.is_waiting("agent-2").await);
    }

    #[tokio::test]
    async fn test_waiting_for() {
        let queue = WaitQueue::new();

        let _handle1 = queue.register("build:/project1", "agent-1", 5, false).await;
        let _handle2 = queue.register("build:/project2", "agent-1", 5, false).await;

        let waiting = queue.waiting_for("agent-1").await;
        assert_eq!(waiting.len(), 2);
        assert!(waiting.contains(&"build:/project1".to_string()));
        assert!(waiting.contains(&"build:/project2".to_string()));
    }

    #[tokio::test]
    async fn test_peek_next() {
        let queue = WaitQueue::new();

        let _handle = queue.register("build:/project", "agent-1", 5, true).await;

        let next = queue.peek_next("build:/project").await.unwrap();
        assert_eq!(next.agent_id, "agent-1");
        assert_eq!(next.priority, 5);
        assert!(next.auto_acquire);

        // Queue should still have the entry
        assert_eq!(queue.queue_length("build:/project").await, 1);
    }

    #[tokio::test]
    async fn test_should_auto_acquire() {
        let queue = WaitQueue::new();

        let _handle1 = queue.register("build:/project", "agent-1", 5, true).await;
        let _handle2 = queue.register("build:/project", "agent-2", 5, false).await;

        assert!(queue.should_auto_acquire("build:/project", "agent-1").await);
        assert!(!queue.should_auto_acquire("build:/project", "agent-2").await);
    }

    #[tokio::test]
    async fn test_queue_status() {
        let queue = WaitQueue::new();

        let _handle1 = queue.register("build:/project", "agent-1", 5, false).await;
        let _handle2 = queue.register("build:/project", "agent-2", 3, true).await;

        let status = queue.get_queue_status("build:/project").await.unwrap();
        assert_eq!(status.queue_length, 2);
        assert_eq!(status.waiters.len(), 2);

        // agent-2 should be first (priority 3 < 5)
        assert_eq!(status.waiters[0].agent_id, "agent-2");
        assert_eq!(status.waiters[1].agent_id, "agent-1");
    }

    #[tokio::test]
    async fn test_event_subscription() {
        let queue = WaitQueue::new();
        let mut receiver = queue.subscribe();

        let _handle = queue.register("build:/project", "agent-1", 5, false).await;

        // Should receive registered event
        let event = receiver.try_recv().unwrap();
        match event {
            WaitQueueEvent::Registered { agent_id, .. } => {
                assert_eq!(agent_id, "agent-1");
            }
            _ => panic!("Expected Registered event"),
        }
    }
}