1use crate::edge::{Edge, EdgeOccupancy, EnqueueResult};
9use crate::errors::QueueError;
10use crate::policy::EdgePolicy;
11use crate::prelude::BatchView;
12use crate::types::MessageToken;
13use crate::types::QoSClass;
14
15extern crate alloc;
16
17pub struct Priority2<QHi, QLo>
20where
21 QHi: Edge,
22 QLo: Edge,
23{
24 hi: QHi,
25 lo: QLo,
26}
27
28impl<QHi, QLo> Priority2<QHi, QLo>
29where
30 QHi: Edge,
31 QLo: Edge,
32{
33 pub fn new(hi: QHi, lo: QLo) -> Self {
35 Self { hi, lo }
36 }
37}
38
39impl<QHi, QLo> Edge for Priority2<QHi, QLo>
40where
41 QHi: Edge,
42 QLo: Edge,
43{
44 fn try_push<H: crate::prelude::HeaderStore>(
45 &mut self,
46 token: MessageToken,
47 policy: &EdgePolicy,
48 headers: &H,
49 ) -> EnqueueResult {
50 let qos = headers
52 .peek_header(token)
53 .map(|h| *h.qos())
54 .unwrap_or(QoSClass::BestEffort);
55
56 match qos {
57 QoSClass::LatencyCritical => self.hi.try_push(token, policy, headers),
58 _ => self.lo.try_push(token, policy, headers),
59 }
60 }
61
62 fn try_pop<H: crate::prelude::HeaderStore>(
63 &mut self,
64 headers: &H,
65 ) -> Result<MessageToken, QueueError> {
66 match self.hi.try_pop(headers) {
67 Ok(tok) => Ok(tok),
68 Err(QueueError::Empty) => self.lo.try_pop(headers),
69 Err(e) => Err(e),
70 }
71 }
72
73 fn occupancy(&self, policy: &EdgePolicy) -> EdgeOccupancy {
74 let hi = self.hi.occupancy(policy);
75 let lo = self.lo.occupancy(policy);
76 let items = hi.items() + lo.items();
77 let bytes = hi.bytes() + lo.bytes();
78 let watermark = policy.watermark(items, bytes);
79 EdgeOccupancy::new(items, bytes, watermark)
80 }
81
82 fn is_empty(&self) -> bool {
83 self.hi.is_empty() && self.lo.is_empty()
84 }
85
86 fn try_peek(&self) -> Result<MessageToken, QueueError> {
87 match self.hi.try_peek() {
88 Ok(tok) => Ok(tok),
89 Err(QueueError::Empty) => self.lo.try_peek(),
90 Err(e) => Err(e),
91 }
92 }
93
94 fn try_peek_at(&self, index: usize) -> Result<MessageToken, QueueError> {
95 match self.hi.try_peek_at(index) {
96 Ok(tok) => Ok(tok),
97 Err(QueueError::Empty) => self.lo.try_peek_at(index),
98 Err(e) => Err(e),
99 }
100 }
101
102 fn try_pop_batch<H: crate::prelude::HeaderStore>(
103 &mut self,
104 policy: &crate::policy::BatchingPolicy,
105 headers: &H,
106 ) -> Result<BatchView<'_, MessageToken>, QueueError> {
107 match self.hi.try_pop_batch(policy, headers) {
108 Ok(batch) => Ok(batch),
109 Err(QueueError::Empty) => self.lo.try_pop_batch(policy, headers),
110 Err(e) => Err(e),
111 }
112 }
113}
114
#[cfg(test)]
mod tests {
    use super::*;

    use crate::edge::bench::TestSpscRingBuf;
    use crate::memory::manager::MemoryManager;
    use crate::memory::static_manager::StaticMemoryManager;
    use crate::message::{Message, MessageHeader};
    use crate::policy::{AdmissionPolicy, BatchingPolicy, EdgePolicy, OverBudgetAction, QueueCaps};
    use crate::prelude::{create_test_tensor_filled_with, HeaderStore as _, TestTensor};
    use crate::types::{QoSClass, Ticks};

    /// Edge policy shared by every test that does not build its own.
    const POLICY: EdgePolicy = EdgePolicy::new(
        QueueCaps::new(8, 6, None, None),
        AdmissionPolicy::DropNewest,
        OverBudgetAction::Drop,
    );

    /// Const-generic size argument of the memory manager used by the tests.
    const MGR_DEPTH: usize = 32;

    /// Memory manager type every test stores its messages in.
    type TestMgr = StaticMemoryManager<TestTensor, MGR_DEPTH>;

    /// Builds a message whose header carries `tick` as creation tick and `qos`
    /// as QoS class; the payload is a test tensor filled with zeros.
    fn make_msg(tick: u64, qos: QoSClass) -> Message<TestTensor> {
        let mut header = MessageHeader::empty();
        header.set_creation_tick(Ticks::new(tick));
        header.set_qos(qos);
        let payload = create_test_tensor_filled_with(0);
        Message::new(header, payload)
    }

    /// Stores `msg` in `mgr` and returns its token, panicking if the store fails.
    fn store(mgr: &mut TestMgr, msg: Message<TestTensor>) -> MessageToken {
        mgr.store(msg).expect("memory manager store failed")
    }

    /// Asserts that the header stored for `token` carries creation tick `expected`.
    #[track_caller]
    fn assert_tick(mgr: &TestMgr, token: MessageToken, expected: u64) {
        assert_eq!(
            *mgr.peek_header(token).unwrap().creation_tick(),
            Ticks::new(expected)
        );
    }

    // Runs the shared edge contract test-suite against a fresh two-lane queue.
    crate::run_edge_contract_tests!(priority2_edge_contract, || {
        Priority2::new(TestSpscRingBuf::<16>::new(), TestSpscRingBuf::<16>::new())
    });

    /// A latency-critical token is popped before a best-effort one pushed after it.
    #[test]
    fn routes_latency_critical_to_hi_and_others_to_lo() {
        let mut mgr = TestMgr::new();
        let mut queue = Priority2::new(TestSpscRingBuf::<16>::new(), TestSpscRingBuf::<16>::new());

        let critical = store(&mut mgr, make_msg(1, QoSClass::LatencyCritical));
        let best_effort = store(&mut mgr, make_msg(2, QoSClass::BestEffort));

        assert_eq!(
            queue.try_push(critical, &POLICY, &mgr),
            EnqueueResult::Enqueued
        );
        assert_eq!(
            queue.try_push(best_effort, &POLICY, &mgr),
            EnqueueResult::Enqueued
        );

        let first = queue.try_pop(&mgr).expect("pop hi");
        let second = queue.try_pop(&mgr).expect("pop lo");

        let first_header = mgr.peek_header(first).expect("ha");
        let second_header = mgr.peek_header(second).expect("hb");
        assert_eq!(*first_header.creation_tick(), Ticks::new(1));
        assert_eq!(*second_header.creation_tick(), Ticks::new(2));
    }

    /// With both lanes populated, peek and pop agree on the high-lane token even
    /// though the low-lane token was pushed first.
    #[test]
    fn peek_prefers_hi_when_both_non_empty() {
        let mut mgr = TestMgr::new();
        let mut queue = Priority2::new(TestSpscRingBuf::<16>::new(), TestSpscRingBuf::<16>::new());

        let best_effort = store(&mut mgr, make_msg(10, QoSClass::BestEffort));
        let critical = store(&mut mgr, make_msg(20, QoSClass::LatencyCritical));

        assert_eq!(
            queue.try_push(best_effort, &POLICY, &mgr),
            EnqueueResult::Enqueued
        );
        assert_eq!(
            queue.try_push(critical, &POLICY, &mgr),
            EnqueueResult::Enqueued
        );

        let peeked = queue.try_peek().expect("peek");
        let peeked_header = mgr.peek_header(peeked).expect("peek header");
        assert_eq!(*peeked_header.creation_tick(), Ticks::new(20));

        let popped = queue.try_pop(&mgr).expect("pop");
        assert_eq!(popped, peeked);
        let popped_header = mgr.peek_header(popped).expect("popped header");
        assert_eq!(*popped_header.creation_tick(), Ticks::new(20));
    }

    /// A batch pop returns only the high-lane tokens and leaves the low lane intact.
    #[test]
    fn pop_batch_prefers_hi_when_non_empty() {
        let mut mgr = TestMgr::new();
        let mut queue = Priority2::new(TestSpscRingBuf::<16>::new(), TestSpscRingBuf::<16>::new());

        // Three best-effort messages (ticks 1..=3) are pushed first.
        let mut lo_tokens = [MessageToken::INVALID; 3];
        for (slot, tick) in lo_tokens.iter_mut().zip(1u64..=3u64) {
            *slot = store(&mut mgr, make_msg(tick, QoSClass::BestEffort));
            assert_eq!(
                queue.try_push(*slot, &POLICY, &mgr),
                EnqueueResult::Enqueued
            );
        }

        // Two latency-critical messages (ticks 100..=101) follow.
        let mut hi_tokens = [MessageToken::INVALID; 2];
        for (slot, tick) in hi_tokens.iter_mut().zip(100u64..=101u64) {
            *slot = store(&mut mgr, make_msg(tick, QoSClass::LatencyCritical));
            assert_eq!(
                queue.try_push(*slot, &POLICY, &mgr),
                EnqueueResult::Enqueued
            );
        }

        let batching = BatchingPolicy::fixed(4);

        // The batch holds exactly the two latency-critical tokens, in push order.
        let batch = queue.try_pop_batch(&batching, &mgr).expect("batch");
        assert_eq!(batch.len(), 2);
        let mut batch_tokens = batch.iter();
        let first = batch_tokens.next().expect("batch[0]");
        let second = batch_tokens.next().expect("batch[1]");
        assert_tick(&mgr, *first, 100);
        assert_tick(&mgr, *second, 101);
        assert!(batch_tokens.next().is_none());

        // The best-effort tokens are still queued and come out in push order.
        let lo_first = queue.try_pop(&mgr).expect("lo-1");
        let lo_second = queue.try_pop(&mgr).expect("lo-2");
        let lo_third = queue.try_pop(&mgr).expect("lo-3");
        assert_tick(&mgr, lo_first, 1);
        assert_tick(&mgr, lo_second, 2);
        assert_tick(&mgr, lo_third, 3);
    }

    /// One token in each lane is reported as two items with a non-zero byte count.
    #[test]
    fn occupancy_is_sum_of_lanes() {
        let mut mgr = TestMgr::new();
        let mut queue = Priority2::new(TestSpscRingBuf::<16>::new(), TestSpscRingBuf::<16>::new());

        let critical = store(&mut mgr, make_msg(1, QoSClass::LatencyCritical));
        let best_effort = store(&mut mgr, make_msg(2, QoSClass::BestEffort));

        assert_eq!(
            queue.try_push(critical, &POLICY, &mgr),
            EnqueueResult::Enqueued
        );
        assert_eq!(
            queue.try_push(best_effort, &POLICY, &mgr),
            EnqueueResult::Enqueued
        );

        let occupancy = queue.occupancy(&POLICY);
        assert_eq!(*occupancy.items(), 2usize);

        assert!(*occupancy.bytes() > 0usize);
    }

    /// `is_empty` turns false when either lane holds a token and true again once
    /// that token has been popped.
    #[test]
    fn is_empty_reflects_both_lanes() {
        let mut mgr = TestMgr::new();
        let mut queue = Priority2::new(TestSpscRingBuf::<16>::new(), TestSpscRingBuf::<16>::new());

        assert!(queue.is_empty());

        // Only the high lane is populated.
        let critical = store(&mut mgr, make_msg(1, QoSClass::LatencyCritical));
        assert_eq!(
            queue.try_push(critical, &POLICY, &mgr),
            EnqueueResult::Enqueued
        );
        assert!(!queue.is_empty());

        let _ = queue.try_pop(&mgr).expect("pop hi");
        assert!(queue.is_empty());

        // Only the low lane is populated.
        let best_effort = store(&mut mgr, make_msg(2, QoSClass::BestEffort));
        assert_eq!(
            queue.try_push(best_effort, &POLICY, &mgr),
            EnqueueResult::Enqueued
        );
        assert!(!queue.is_empty());

        let _ = queue.try_pop(&mgr).expect("pop lo");
        assert!(queue.is_empty());
    }

    /// Background and best-effort tokens come back in push order.
    #[test]
    fn background_qos_routes_to_lo() {
        let mut mgr = TestMgr::new();
        let mut queue = Priority2::new(TestSpscRingBuf::<16>::new(), TestSpscRingBuf::<16>::new());

        let background = store(&mut mgr, make_msg(1, QoSClass::Background));
        let best_effort = store(&mut mgr, make_msg(2, QoSClass::BestEffort));

        assert_eq!(
            queue.try_push(background, &POLICY, &mgr),
            EnqueueResult::Enqueued
        );
        assert_eq!(
            queue.try_push(best_effort, &POLICY, &mgr),
            EnqueueResult::Enqueued
        );

        let first = queue.try_pop(&mgr).expect("pop bg");
        let second = queue.try_pop(&mgr).expect("pop be");
        assert_tick(&mgr, first, 1);
        assert_tick(&mgr, second, 2);
        assert!(queue.is_empty());
    }

    /// Smoke test with two-slot lanes and a small-caps `DropOldest` policy: a
    /// third latency-critical push is rejected until the caller pops one token.
    #[test]
    fn lane_specific_admission_interactions_smoke_test() {
        let mut mgr = TestMgr::new();
        let mut queue = Priority2::new(TestSpscRingBuf::<2>::new(), TestSpscRingBuf::<2>::new());

        let tight_policy = EdgePolicy::new(
            QueueCaps::new(2, 1, None, None),
            AdmissionPolicy::DropOldest,
            OverBudgetAction::Drop,
        );

        let first = store(&mut mgr, make_msg(1, QoSClass::LatencyCritical));
        let second = store(&mut mgr, make_msg(2, QoSClass::LatencyCritical));

        assert_eq!(
            queue.try_push(first, &tight_policy, &mgr),
            EnqueueResult::Enqueued
        );
        assert_eq!(
            queue.try_push(second, &tight_policy, &mgr),
            EnqueueResult::Enqueued
        );

        // With two tokens queued the third push is expected to be rejected.
        let third = store(&mut mgr, make_msg(3, QoSClass::LatencyCritical));
        assert_eq!(
            queue.try_push(third, &tight_policy, &mgr),
            EnqueueResult::Rejected
        );

        // Popping and freeing the oldest token lets the same push succeed.
        let evicted = queue.try_pop(&mgr).expect("pre-evict");
        assert_eq!(evicted, first);
        let _ = mgr.free(evicted);
        assert_eq!(
            queue.try_push(third, &tight_policy, &mgr),
            EnqueueResult::Enqueued
        );

        let remaining_a = queue.try_pop(&mgr).expect("pop t2");
        assert_tick(&mgr, remaining_a, 2);
        let remaining_b = queue.try_pop(&mgr).expect("pop t3");
        assert_tick(&mgr, remaining_b, 3);
        assert!(queue.is_empty());
    }

    /// With interleaved pushes, every latency-critical token is popped before any
    /// best-effort token, and each lane keeps its own push order.
    #[test]
    fn hi_drains_before_lo_across_multiple_pops() {
        let mut mgr = TestMgr::new();
        let mut queue = Priority2::new(TestSpscRingBuf::<16>::new(), TestSpscRingBuf::<16>::new());

        let lo_first = store(&mut mgr, make_msg(1, QoSClass::BestEffort));
        let hi_first = store(&mut mgr, make_msg(2, QoSClass::LatencyCritical));
        let lo_second = store(&mut mgr, make_msg(3, QoSClass::BestEffort));
        let hi_second = store(&mut mgr, make_msg(4, QoSClass::LatencyCritical));

        // Push in the interleaved order lo, hi, lo, hi.
        for token in [lo_first, hi_first, lo_second, hi_second] {
            assert_eq!(
                queue.try_push(token, &POLICY, &mgr),
                EnqueueResult::Enqueued
            );
        }

        let popped_1 = queue.try_pop(&mgr).expect("pop 1");
        let popped_2 = queue.try_pop(&mgr).expect("pop 2");
        let popped_3 = queue.try_pop(&mgr).expect("pop 3");
        let popped_4 = queue.try_pop(&mgr).expect("pop 4");

        assert_tick(&mgr, popped_1, 2);
        assert_tick(&mgr, popped_2, 4);
        assert_tick(&mgr, popped_3, 1);
        assert_tick(&mgr, popped_4, 3);
        assert!(queue.is_empty());
    }
}