Skip to main content

symbi_runtime/scheduler/
priority_queue.rs

1//! Priority queue implementation for agent scheduling.
2//!
3//! Uses a BinaryHeap for O(log n) push/pop with a HashMap for O(1)
4//! membership checks. The index tracks presence only — not heap positions,
5//! which are unstable across operations.
6
7use std::collections::BinaryHeap;
8use std::collections::HashMap;
9
10use crate::types::AgentId;
11
12/// Priority queue for scheduled tasks.
13///
14/// Provides O(log n) push/pop, O(1) contains, and O(n) remove-by-id.
15/// The remove-by-id cost is acceptable because it's infrequent relative
16/// to push/pop in the scheduler hot path.
17#[derive(Debug)]
18pub struct PriorityQueue<T> {
19    heap: BinaryHeap<T>,
20    /// Tracks which agent IDs are in the queue (presence only, not position).
21    members: HashMap<AgentId, ()>,
22}
23
24impl<T> PriorityQueue<T>
25where
26    T: Ord + Clone + HasAgentId,
27{
28    /// Create a new priority queue.
29    pub fn new() -> Self {
30        Self {
31            heap: BinaryHeap::new(),
32            members: HashMap::new(),
33        }
34    }
35
36    /// Create a new priority queue with pre-allocated capacity.
37    pub fn with_capacity(capacity: usize) -> Self {
38        Self {
39            heap: BinaryHeap::with_capacity(capacity),
40            members: HashMap::with_capacity(capacity),
41        }
42    }
43
44    /// Add an item to the queue. O(log n).
45    pub fn push(&mut self, item: T) {
46        let agent_id = item.agent_id();
47        self.members.insert(agent_id, ());
48        self.heap.push(item);
49    }
50
51    /// Remove and return the highest priority item. O(log n).
52    pub fn pop(&mut self) -> Option<T> {
53        let item = self.heap.pop()?;
54        self.members.remove(&item.agent_id());
55        Some(item)
56    }
57
58    /// Remove a specific item by agent ID. O(n) — acceptable for
59    /// infrequent cancellations.
60    pub fn remove(&mut self, agent_id: &AgentId) -> Option<T> {
61        self.members.remove(agent_id)?;
62
63        // Drain heap, extract target, rebuild.
64        let items: Vec<T> = self.heap.drain().collect();
65        let mut removed = None;
66
67        // Rebuild heap excluding the target. Pre-allocate to avoid
68        // repeated growth.
69        let mut remaining = Vec::with_capacity(items.len() - 1);
70        for item in items {
71            if removed.is_none() && &item.agent_id() == agent_id {
72                removed = Some(item);
73            } else {
74                remaining.push(item);
75            }
76        }
77        self.heap = remaining.into_iter().collect();
78
79        removed
80    }
81
82    /// Check if the queue contains an agent. O(1).
83    pub fn contains(&self, agent_id: &AgentId) -> bool {
84        self.members.contains_key(agent_id)
85    }
86
87    /// Find an item by agent ID. O(n) — use sparingly.
88    pub fn find(&self, agent_id: &AgentId) -> Option<&T> {
89        if !self.members.contains_key(agent_id) {
90            return None;
91        }
92        self.heap.iter().find(|item| &item.agent_id() == agent_id)
93    }
94
95    /// Get the number of items in the queue.
96    pub fn len(&self) -> usize {
97        self.heap.len()
98    }
99
100    /// Check if the queue is empty.
101    pub fn is_empty(&self) -> bool {
102        self.heap.is_empty()
103    }
104
105    /// Peek at the highest priority item without removing it. O(1).
106    pub fn peek(&self) -> Option<&T> {
107        self.heap.peek()
108    }
109
110    /// Clear all items from the queue.
111    pub fn clear(&mut self) {
112        self.heap.clear();
113        self.members.clear();
114    }
115
116    /// Get all items as a vector (for debugging/monitoring).
117    pub fn to_vec(&self) -> Vec<T> {
118        self.heap.iter().cloned().collect()
119    }
120}
121
122impl<T> Default for PriorityQueue<T>
123where
124    T: Ord + Clone + HasAgentId,
125{
126    fn default() -> Self {
127        Self::new()
128    }
129}
130
131/// Trait for items that have an agent ID.
132pub trait HasAgentId {
133    fn agent_id(&self) -> AgentId;
134}
135
136impl HasAgentId for super::ScheduledTask {
137    fn agent_id(&self) -> AgentId {
138        self.agent_id
139    }
140}
141
142#[cfg(test)]
143mod tests {
144    use super::*;
145    use crate::scheduler::ScheduledTask;
146    use crate::types::{
147        AgentConfig, AgentId, ExecutionMode, Priority, ResourceLimits, SecurityTier,
148    };
149    use std::collections::HashMap;
150
151    fn create_test_task(priority: Priority) -> ScheduledTask {
152        let agent_id = AgentId::new();
153        let config = AgentConfig {
154            id: agent_id,
155            name: "test".to_string(),
156            dsl_source: "test".to_string(),
157            execution_mode: ExecutionMode::Ephemeral,
158            security_tier: SecurityTier::Tier1,
159            resource_limits: ResourceLimits::default(),
160            capabilities: vec![],
161            policies: vec![],
162            metadata: HashMap::new(),
163            priority,
164        };
165        ScheduledTask::new(config)
166    }
167
168    #[test]
169    fn test_priority_queue_ordering() {
170        let mut queue = PriorityQueue::new();
171
172        let low_task = create_test_task(Priority::Low);
173        let high_task = create_test_task(Priority::High);
174        let normal_task = create_test_task(Priority::Normal);
175
176        queue.push(low_task);
177        queue.push(high_task);
178        queue.push(normal_task);
179
180        assert_eq!(queue.pop().unwrap().priority, Priority::High);
181        assert_eq!(queue.pop().unwrap().priority, Priority::Normal);
182        assert_eq!(queue.pop().unwrap().priority, Priority::Low);
183    }
184
185    #[test]
186    fn test_priority_queue_remove() {
187        let mut queue = PriorityQueue::new();
188
189        let task1 = create_test_task(Priority::High);
190        let task2 = create_test_task(Priority::Normal);
191        let task3 = create_test_task(Priority::Low);
192
193        let agent_id2 = task2.agent_id;
194
195        queue.push(task1);
196        queue.push(task2);
197        queue.push(task3);
198
199        assert_eq!(queue.len(), 3);
200        assert!(queue.contains(&agent_id2));
201
202        let removed = queue.remove(&agent_id2);
203        assert!(removed.is_some());
204        assert_eq!(removed.unwrap().agent_id, agent_id2);
205        assert_eq!(queue.len(), 2);
206        assert!(!queue.contains(&agent_id2));
207    }
208
209    #[test]
210    fn test_pop_maintains_membership() {
211        let mut queue = PriorityQueue::new();
212        let task = create_test_task(Priority::High);
213        let id = task.agent_id;
214
215        queue.push(task);
216        assert!(queue.contains(&id));
217
218        queue.pop();
219        assert!(!queue.contains(&id));
220    }
221
222    #[test]
223    fn test_remove_nonexistent() {
224        let mut queue: PriorityQueue<ScheduledTask> = PriorityQueue::new();
225        let fake_id = AgentId::new();
226        assert!(queue.remove(&fake_id).is_none());
227    }
228}