Skip to main content

haagenti_network/
priority.rs

1//! Priority queue for fragment fetching
2
3use haagenti_fragments::FragmentId;
4use serde::{Deserialize, Serialize};
5use std::cmp::Ordering;
6use std::collections::BinaryHeap;
7use std::sync::{Arc, Mutex};
8
9/// Priority level for fragment loading
10#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
11pub enum Priority {
12    /// Critical - needed immediately (blocking inference)
13    Critical = 0,
14    /// High - needed soon (next few steps)
15    High = 1,
16    /// Normal - standard priority
17    #[default]
18    Normal = 2,
19    /// Low - prefetch/background
20    Low = 3,
21    /// Idle - load when nothing else to do
22    Idle = 4,
23}
24
25impl Priority {
26    /// Get numeric priority (lower = higher priority)
27    pub fn as_u8(&self) -> u8 {
28        *self as u8
29    }
30
31    /// Create from numeric priority
32    pub fn from_u8(val: u8) -> Self {
33        match val {
34            0 => Priority::Critical,
35            1 => Priority::High,
36            2 => Priority::Normal,
37            3 => Priority::Low,
38            _ => Priority::Idle,
39        }
40    }
41}
42
43impl PartialOrd for Priority {
44    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
45        Some(self.cmp(other))
46    }
47}
48
49impl Ord for Priority {
50    fn cmp(&self, other: &Self) -> Ordering {
51        // Lower value = higher priority, so reverse the comparison
52        other.as_u8().cmp(&self.as_u8())
53    }
54}
55
56/// A fragment with associated priority
57#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct PrioritizedFragment {
59    /// Fragment ID
60    pub fragment_id: FragmentId,
61    /// Priority level
62    pub priority: Priority,
63    /// Importance score (0.0 - 1.0, from ML model)
64    pub importance: f32,
65    /// Size in bytes (for bandwidth planning)
66    pub size: usize,
67    /// Deadline (if any)
68    pub deadline_ms: Option<u64>,
69    /// Creation timestamp
70    pub created_at: u64,
71}
72
73impl PrioritizedFragment {
74    /// Create a new prioritized fragment
75    pub fn new(fragment_id: FragmentId, priority: Priority) -> Self {
76        Self {
77            fragment_id,
78            priority,
79            importance: 0.5,
80            size: 0,
81            deadline_ms: None,
82            created_at: std::time::SystemTime::now()
83                .duration_since(std::time::UNIX_EPOCH)
84                .unwrap()
85                .as_millis() as u64,
86        }
87    }
88
89    /// Set importance score
90    pub fn with_importance(mut self, importance: f32) -> Self {
91        self.importance = importance.clamp(0.0, 1.0);
92        self
93    }
94
95    /// Set size
96    pub fn with_size(mut self, size: usize) -> Self {
97        self.size = size;
98        self
99    }
100
101    /// Set deadline
102    pub fn with_deadline(mut self, deadline_ms: u64) -> Self {
103        self.deadline_ms = Some(deadline_ms);
104        self
105    }
106
107    /// Compute effective priority score (lower = higher priority)
108    pub fn effective_priority(&self) -> f64 {
109        let base = self.priority.as_u8() as f64;
110        let importance_boost = (1.0 - self.importance as f64) * 0.5;
111
112        // Deadline urgency
113        let deadline_boost = if let Some(deadline) = self.deadline_ms {
114            let now = std::time::SystemTime::now()
115                .duration_since(std::time::UNIX_EPOCH)
116                .unwrap()
117                .as_millis() as u64;
118
119            if deadline <= now {
120                -1.0 // Past deadline, highest priority
121            } else {
122                let remaining = (deadline - now) as f64;
123                let urgency = 1.0 - (remaining / 10000.0).min(1.0); // 10s window
124                -urgency * 0.5
125            }
126        } else {
127            0.0
128        };
129
130        base + importance_boost + deadline_boost
131    }
132}
133
134impl PartialEq for PrioritizedFragment {
135    fn eq(&self, other: &Self) -> bool {
136        self.fragment_id == other.fragment_id
137    }
138}
139
140impl Eq for PrioritizedFragment {}
141
142impl PartialOrd for PrioritizedFragment {
143    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
144        Some(self.cmp(other))
145    }
146}
147
148impl Ord for PrioritizedFragment {
149    fn cmp(&self, other: &Self) -> Ordering {
150        // Lower effective priority = should come first
151        // BinaryHeap is max-heap, so we reverse
152        other
153            .effective_priority()
154            .partial_cmp(&self.effective_priority())
155            .unwrap_or(Ordering::Equal)
156    }
157}
158
159/// Thread-safe priority queue for fragments
160pub struct PriorityQueue {
161    heap: Arc<Mutex<BinaryHeap<PrioritizedFragment>>>,
162    pending: Arc<Mutex<std::collections::HashSet<FragmentId>>>,
163}
164
165impl PriorityQueue {
166    /// Create a new priority queue
167    pub fn new() -> Self {
168        Self {
169            heap: Arc::new(Mutex::new(BinaryHeap::new())),
170            pending: Arc::new(Mutex::new(std::collections::HashSet::new())),
171        }
172    }
173
174    /// Push a fragment onto the queue
175    pub fn push(&self, fragment: PrioritizedFragment) {
176        let mut pending = self.pending.lock().unwrap();
177        if pending.contains(&fragment.fragment_id) {
178            return; // Already queued
179        }
180        pending.insert(fragment.fragment_id);
181
182        let mut heap = self.heap.lock().unwrap();
183        heap.push(fragment);
184    }
185
186    /// Pop the highest priority fragment
187    pub fn pop(&self) -> Option<PrioritizedFragment> {
188        let mut heap = self.heap.lock().unwrap();
189        let fragment = heap.pop()?;
190
191        let mut pending = self.pending.lock().unwrap();
192        pending.remove(&fragment.fragment_id);
193
194        Some(fragment)
195    }
196
197    /// Peek at the highest priority fragment
198    pub fn peek(&self) -> Option<PrioritizedFragment> {
199        let heap = self.heap.lock().unwrap();
200        heap.peek().cloned()
201    }
202
203    /// Get queue length
204    pub fn len(&self) -> usize {
205        self.heap.lock().unwrap().len()
206    }
207
208    /// Check if queue is empty
209    pub fn is_empty(&self) -> bool {
210        self.heap.lock().unwrap().is_empty()
211    }
212
213    /// Check if a fragment is already queued
214    pub fn contains(&self, fragment_id: &FragmentId) -> bool {
215        self.pending.lock().unwrap().contains(fragment_id)
216    }
217
218    /// Clear the queue
219    pub fn clear(&self) {
220        self.heap.lock().unwrap().clear();
221        self.pending.lock().unwrap().clear();
222    }
223
224    /// Update priority of a fragment
225    pub fn update_priority(&self, fragment_id: &FragmentId, new_priority: Priority) {
226        let mut heap = self.heap.lock().unwrap();
227
228        // Remove and re-add with new priority
229        let items: Vec<_> = heap.drain().collect();
230        for mut item in items {
231            if item.fragment_id == *fragment_id {
232                item.priority = new_priority;
233            }
234            heap.push(item);
235        }
236    }
237
238    /// Get all fragments at or above a priority level
239    pub fn get_by_priority(&self, min_priority: Priority) -> Vec<PrioritizedFragment> {
240        let heap = self.heap.lock().unwrap();
241        heap.iter()
242            .filter(|f| f.priority <= min_priority)
243            .cloned()
244            .collect()
245    }
246}
247
248impl Default for PriorityQueue {
249    fn default() -> Self {
250        Self::new()
251    }
252}
253
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a fragment whose id is sixteen copies of `tag`.
    fn fragment(tag: u8, priority: Priority) -> PrioritizedFragment {
        PrioritizedFragment::new(FragmentId::new([tag; 16]), priority)
    }

    /// Fragments come out most-urgent-first regardless of insertion order.
    #[test]
    fn test_priority_ordering() {
        let queue = PriorityQueue::new();

        let inserted = [
            (1, Priority::Low),
            (2, Priority::High),
            (3, Priority::Critical),
        ];
        for (tag, priority) in inserted {
            queue.push(fragment(tag, priority));
        }

        for expected in [Priority::Critical, Priority::High, Priority::Low] {
            let popped = queue.pop().unwrap();
            assert_eq!(popped.priority, expected);
        }
    }

    /// Within one priority level the more important fragment wins.
    #[test]
    fn test_importance_affects_priority() {
        let queue = PriorityQueue::new();

        let less_important = fragment(1, Priority::Normal).with_importance(0.2);
        let more_important = fragment(2, Priority::Normal).with_importance(0.9);
        let expected_first = more_important.fragment_id;

        queue.push(less_important);
        queue.push(more_important);

        // Higher importance should come first
        let first = queue.pop().unwrap();
        assert_eq!(first.fragment_id, expected_first);
    }
}