haagenti_network/
priority.rs1use haagenti_fragments::FragmentId;
4use serde::{Deserialize, Serialize};
5use std::cmp::Ordering;
6use std::collections::BinaryHeap;
7use std::sync::{Arc, Mutex};
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
11pub enum Priority {
12 Critical = 0,
14 High = 1,
16 #[default]
18 Normal = 2,
19 Low = 3,
21 Idle = 4,
23}
24
25impl Priority {
26 pub fn as_u8(&self) -> u8 {
28 *self as u8
29 }
30
31 pub fn from_u8(val: u8) -> Self {
33 match val {
34 0 => Priority::Critical,
35 1 => Priority::High,
36 2 => Priority::Normal,
37 3 => Priority::Low,
38 _ => Priority::Idle,
39 }
40 }
41}
42
43impl PartialOrd for Priority {
44 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
45 Some(self.cmp(other))
46 }
47}
48
49impl Ord for Priority {
50 fn cmp(&self, other: &Self) -> Ordering {
51 other.as_u8().cmp(&self.as_u8())
53 }
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct PrioritizedFragment {
59 pub fragment_id: FragmentId,
61 pub priority: Priority,
63 pub importance: f32,
65 pub size: usize,
67 pub deadline_ms: Option<u64>,
69 pub created_at: u64,
71}
72
73impl PrioritizedFragment {
74 pub fn new(fragment_id: FragmentId, priority: Priority) -> Self {
76 Self {
77 fragment_id,
78 priority,
79 importance: 0.5,
80 size: 0,
81 deadline_ms: None,
82 created_at: std::time::SystemTime::now()
83 .duration_since(std::time::UNIX_EPOCH)
84 .unwrap()
85 .as_millis() as u64,
86 }
87 }
88
89 pub fn with_importance(mut self, importance: f32) -> Self {
91 self.importance = importance.clamp(0.0, 1.0);
92 self
93 }
94
95 pub fn with_size(mut self, size: usize) -> Self {
97 self.size = size;
98 self
99 }
100
101 pub fn with_deadline(mut self, deadline_ms: u64) -> Self {
103 self.deadline_ms = Some(deadline_ms);
104 self
105 }
106
107 pub fn effective_priority(&self) -> f64 {
109 let base = self.priority.as_u8() as f64;
110 let importance_boost = (1.0 - self.importance as f64) * 0.5;
111
112 let deadline_boost = if let Some(deadline) = self.deadline_ms {
114 let now = std::time::SystemTime::now()
115 .duration_since(std::time::UNIX_EPOCH)
116 .unwrap()
117 .as_millis() as u64;
118
119 if deadline <= now {
120 -1.0 } else {
122 let remaining = (deadline - now) as f64;
123 let urgency = 1.0 - (remaining / 10000.0).min(1.0); -urgency * 0.5
125 }
126 } else {
127 0.0
128 };
129
130 base + importance_boost + deadline_boost
131 }
132}
133
134impl PartialEq for PrioritizedFragment {
135 fn eq(&self, other: &Self) -> bool {
136 self.fragment_id == other.fragment_id
137 }
138}
139
140impl Eq for PrioritizedFragment {}
141
142impl PartialOrd for PrioritizedFragment {
143 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
144 Some(self.cmp(other))
145 }
146}
147
148impl Ord for PrioritizedFragment {
149 fn cmp(&self, other: &Self) -> Ordering {
150 other
153 .effective_priority()
154 .partial_cmp(&self.effective_priority())
155 .unwrap_or(Ordering::Equal)
156 }
157}
158
159pub struct PriorityQueue {
161 heap: Arc<Mutex<BinaryHeap<PrioritizedFragment>>>,
162 pending: Arc<Mutex<std::collections::HashSet<FragmentId>>>,
163}
164
165impl PriorityQueue {
166 pub fn new() -> Self {
168 Self {
169 heap: Arc::new(Mutex::new(BinaryHeap::new())),
170 pending: Arc::new(Mutex::new(std::collections::HashSet::new())),
171 }
172 }
173
174 pub fn push(&self, fragment: PrioritizedFragment) {
176 let mut pending = self.pending.lock().unwrap();
177 if pending.contains(&fragment.fragment_id) {
178 return; }
180 pending.insert(fragment.fragment_id);
181
182 let mut heap = self.heap.lock().unwrap();
183 heap.push(fragment);
184 }
185
186 pub fn pop(&self) -> Option<PrioritizedFragment> {
188 let mut heap = self.heap.lock().unwrap();
189 let fragment = heap.pop()?;
190
191 let mut pending = self.pending.lock().unwrap();
192 pending.remove(&fragment.fragment_id);
193
194 Some(fragment)
195 }
196
197 pub fn peek(&self) -> Option<PrioritizedFragment> {
199 let heap = self.heap.lock().unwrap();
200 heap.peek().cloned()
201 }
202
203 pub fn len(&self) -> usize {
205 self.heap.lock().unwrap().len()
206 }
207
208 pub fn is_empty(&self) -> bool {
210 self.heap.lock().unwrap().is_empty()
211 }
212
213 pub fn contains(&self, fragment_id: &FragmentId) -> bool {
215 self.pending.lock().unwrap().contains(fragment_id)
216 }
217
218 pub fn clear(&self) {
220 self.heap.lock().unwrap().clear();
221 self.pending.lock().unwrap().clear();
222 }
223
224 pub fn update_priority(&self, fragment_id: &FragmentId, new_priority: Priority) {
226 let mut heap = self.heap.lock().unwrap();
227
228 let items: Vec<_> = heap.drain().collect();
230 for mut item in items {
231 if item.fragment_id == *fragment_id {
232 item.priority = new_priority;
233 }
234 heap.push(item);
235 }
236 }
237
238 pub fn get_by_priority(&self, min_priority: Priority) -> Vec<PrioritizedFragment> {
240 let heap = self.heap.lock().unwrap();
241 heap.iter()
242 .filter(|f| f.priority <= min_priority)
243 .cloned()
244 .collect()
245 }
246}
247
248impl Default for PriorityQueue {
249 fn default() -> Self {
250 Self::new()
251 }
252}
253
#[cfg(test)]
mod tests {
    use super::*;

    /// Fragments pushed in arbitrary order come back most-urgent first.
    #[test]
    fn test_priority_ordering() {
        let queue = PriorityQueue::new();

        let submitted = [
            PrioritizedFragment::new(FragmentId::new([1; 16]), Priority::Low),
            PrioritizedFragment::new(FragmentId::new([2; 16]), Priority::High),
            PrioritizedFragment::new(FragmentId::new([3; 16]), Priority::Critical),
        ];
        for fragment in submitted {
            queue.push(fragment);
        }

        for expected in [Priority::Critical, Priority::High, Priority::Low] {
            let popped = queue.pop().unwrap();
            assert_eq!(popped.priority, expected);
        }
    }

    /// Within one priority class, the more important fragment is served first.
    #[test]
    fn test_importance_affects_priority() {
        let queue = PriorityQueue::new();

        let less_important = PrioritizedFragment::new(FragmentId::new([1; 16]), Priority::Normal)
            .with_importance(0.2);
        let more_important = PrioritizedFragment::new(FragmentId::new([2; 16]), Priority::Normal)
            .with_importance(0.9);

        queue.push(less_important.clone());
        queue.push(more_important.clone());

        let served_first = queue.pop().unwrap();
        assert_eq!(served_first.fragment_id, more_important.fragment_id);
    }
}