// limen_core/edge/spsc_priority2.rs
//! Two-lane priority wrapper for SPSC queues (feature: `priority_lanes`).
//!
//! This composes two underlying SPSC queues (hi/lo) that store the same
//! `MessageToken` handles and routes `try_push` by inspecting the message
//! header's QoS class. `try_pop` always prefers the high-priority lane when
//! available.

8use crate::edge::{Edge, EdgeOccupancy, EnqueueResult};
9use crate::errors::QueueError;
10use crate::policy::EdgePolicy;
11use crate::prelude::BatchView;
12use crate::types::MessageToken;
13use crate::types::QoSClass;
14
15extern crate alloc;
16
17/// Two-lane priority queue. The lanes store `MessageToken` values and the
18/// priority decision is made using the provided `HeaderStore`.
19pub struct Priority2<QHi, QLo>
20where
21    QHi: Edge,
22    QLo: Edge,
23{
24    hi: QHi,
25    lo: QLo,
26}
27
28impl<QHi, QLo> Priority2<QHi, QLo>
29where
30    QHi: Edge,
31    QLo: Edge,
32{
33    /// Build a two-lane priority queue from hi/lo queues.
34    pub fn new(hi: QHi, lo: QLo) -> Self {
35        Self { hi, lo }
36    }
37}
38
// Edge implementation: strict two-lane priority. Pushes are routed by the
// message's QoS class; every read-side operation (pop/peek/batch) consults the
// hi lane first and falls back to lo only when hi reports `Empty`.
impl<QHi, QLo> Edge for Priority2<QHi, QLo>
where
    QHi: Edge,
    QLo: Edge,
{
    // Route `token` to a lane based on its header's QoS class, then delegate
    // the actual admission decision to that lane's `try_push`.
    fn try_push<H: crate::prelude::HeaderStore>(
        &mut self,
        token: MessageToken,
        policy: &EdgePolicy,
        headers: &H,
    ) -> EnqueueResult {
        // Resolve QoS via HeaderStore; default to BestEffort if header missing.
        let qos = headers
            .peek_header(token)
            .map(|h| *h.qos())
            .unwrap_or(QoSClass::BestEffort);

        // Only LatencyCritical goes to hi; everything else (BestEffort,
        // Background, ...) shares the lo lane.
        match qos {
            QoSClass::LatencyCritical => self.hi.try_push(token, policy, headers),
            _ => self.lo.try_push(token, policy, headers),
        }
    }

    // Pop the next token, preferring the hi lane. Only `Empty` triggers the
    // fallback to lo; any other error from hi is propagated unchanged.
    fn try_pop<H: crate::prelude::HeaderStore>(
        &mut self,
        headers: &H,
    ) -> Result<MessageToken, QueueError> {
        match self.hi.try_pop(headers) {
            Ok(tok) => Ok(tok),
            Err(QueueError::Empty) => self.lo.try_pop(headers),
            Err(e) => Err(e),
        }
    }

    // Combined occupancy: items and bytes are the sums over both lanes; the
    // watermark is recomputed from the aggregate, not taken from either lane.
    fn occupancy(&self, policy: &EdgePolicy) -> EdgeOccupancy {
        let hi = self.hi.occupancy(policy);
        let lo = self.lo.occupancy(policy);
        let items = hi.items() + lo.items();
        let bytes = hi.bytes() + lo.bytes();
        let watermark = policy.watermark(items, bytes);
        EdgeOccupancy::new(items, bytes, watermark)
    }

    // Empty only when both lanes are empty.
    fn is_empty(&self) -> bool {
        self.hi.is_empty() && self.lo.is_empty()
    }

    // Non-destructive peek at the next token `try_pop` would return.
    fn try_peek(&self) -> Result<MessageToken, QueueError> {
        match self.hi.try_peek() {
            Ok(tok) => Ok(tok),
            Err(QueueError::Empty) => self.lo.try_peek(),
            Err(e) => Err(e),
        }
    }

    // NOTE(review): the same `index` is forwarded to lo when hi reports
    // `Empty`, so the index is lane-local rather than an offset into the
    // combined hi+lo view (which would need `index - hi_len`, unavailable here
    // without a policy to query occupancy). Presumably intentional — confirm
    // against the Edge contract for composite queues.
    fn try_peek_at(&self, index: usize) -> Result<MessageToken, QueueError> {
        match self.hi.try_peek_at(index) {
            Ok(tok) => Ok(tok),
            Err(QueueError::Empty) => self.lo.try_peek_at(index),
            Err(e) => Err(e),
        }
    }

    // Batch pop: a batch never mixes lanes — it is served entirely from hi
    // when hi yields one, and entirely from lo only when hi is `Empty`.
    fn try_pop_batch<H: crate::prelude::HeaderStore>(
        &mut self,
        policy: &crate::policy::BatchingPolicy,
        headers: &H,
    ) -> Result<BatchView<'_, MessageToken>, QueueError> {
        match self.hi.try_pop_batch(policy, headers) {
            Ok(batch) => Ok(batch),
            Err(QueueError::Empty) => self.lo.try_pop_batch(policy, headers),
            Err(e) => Err(e),
        }
    }
}
114
#[cfg(test)]
mod tests {
    use super::*;

    use crate::edge::bench::TestSpscRingBuf;
    use crate::memory::manager::MemoryManager;
    use crate::memory::static_manager::StaticMemoryManager;
    use crate::message::{Message, MessageHeader};
    use crate::policy::{AdmissionPolicy, BatchingPolicy, EdgePolicy, OverBudgetAction, QueueCaps};
    use crate::prelude::{create_test_tensor_filled_with, HeaderStore as _, TestTensor};
    use crate::types::{QoSClass, Ticks};

    // Default policy for most tests: caps of 8 items (soft 6), no byte caps.
    const POLICY: EdgePolicy = EdgePolicy::new(
        QueueCaps::new(8, 6, None, None),
        AdmissionPolicy::DropNewest,
        OverBudgetAction::Drop,
    );

    // Slot count for the backing StaticMemoryManager in every test.
    const MGR_DEPTH: usize = 32;

    // Build a test message whose creation tick doubles as its identity in the
    // assertions below, with the given QoS class controlling lane routing.
    fn make_msg(tick: u64, qos: QoSClass) -> Message<TestTensor> {
        let mut h = MessageHeader::empty();
        h.set_creation_tick(Ticks::new(tick));
        h.set_qos(qos);
        Message::new(h, create_test_tensor_filled_with(0))
    }

    // Store `msg` in the manager and return its token; panics on failure
    // (tests never exceed MGR_DEPTH).
    fn store(
        mgr: &mut StaticMemoryManager<TestTensor, MGR_DEPTH>,
        msg: Message<TestTensor>,
    ) -> MessageToken {
        mgr.store(msg).expect("memory manager store failed")
    }

    // --- 1) Run the full Edge contract suite against Priority2 ---
    //
    // The contract tests push messages with default QoS (BestEffort), so all
    // traffic routes to the lo lane. This validates that Priority2 satisfies
    // the Edge contract for the common single-lane path.
    crate::run_edge_contract_tests!(priority2_edge_contract, || {
        let hi = TestSpscRingBuf::<16>::new();
        let lo = TestSpscRingBuf::<16>::new();
        Priority2::new(hi, lo)
    });

    // --- 2) Priority-specific behaviour tests ---

    #[test]
    fn routes_latency_critical_to_hi_and_others_to_lo() {
        let mut mgr: StaticMemoryManager<TestTensor, MGR_DEPTH> = StaticMemoryManager::new();
        let mut q = Priority2::new(TestSpscRingBuf::<16>::new(), TestSpscRingBuf::<16>::new());

        let t_hi = store(&mut mgr, make_msg(1, QoSClass::LatencyCritical));
        let t_lo = store(&mut mgr, make_msg(2, QoSClass::BestEffort));

        assert_eq!(q.try_push(t_hi, &POLICY, &mgr), EnqueueResult::Enqueued);
        assert_eq!(q.try_push(t_lo, &POLICY, &mgr), EnqueueResult::Enqueued);

        // `try_pop` should return hi first (tick=1), then lo (tick=2)
        let a = q.try_pop(&mgr).expect("pop hi");
        let b = q.try_pop(&mgr).expect("pop lo");

        let ha = mgr.peek_header(a).expect("ha");
        let hb = mgr.peek_header(b).expect("hb");
        assert_eq!(*ha.creation_tick(), Ticks::new(1));
        assert_eq!(*hb.creation_tick(), Ticks::new(2));
    }

    #[test]
    fn peek_prefers_hi_when_both_non_empty() {
        let mut mgr: StaticMemoryManager<TestTensor, MGR_DEPTH> = StaticMemoryManager::new();
        let mut q = Priority2::new(TestSpscRingBuf::<16>::new(), TestSpscRingBuf::<16>::new());

        // Push lo first, then hi — peek should still return hi.
        let t_lo = store(&mut mgr, make_msg(10, QoSClass::BestEffort));
        let t_hi = store(&mut mgr, make_msg(20, QoSClass::LatencyCritical));

        assert_eq!(q.try_push(t_lo, &POLICY, &mgr), EnqueueResult::Enqueued);
        assert_eq!(q.try_push(t_hi, &POLICY, &mgr), EnqueueResult::Enqueued);

        let peek_tok = q.try_peek().expect("peek");
        let ph = mgr.peek_header(peek_tok).expect("peek header");
        assert_eq!(*ph.creation_tick(), Ticks::new(20));

        // Ensure peek did not remove it.
        let popped = q.try_pop(&mgr).expect("pop");
        assert_eq!(popped, peek_tok);
        let popped_h = mgr.peek_header(popped).expect("popped header");
        assert_eq!(*popped_h.creation_tick(), Ticks::new(20));
    }

    #[test]
    fn pop_batch_prefers_hi_when_non_empty() {
        let mut mgr: StaticMemoryManager<TestTensor, MGR_DEPTH> = StaticMemoryManager::new();
        let mut q = Priority2::new(TestSpscRingBuf::<16>::new(), TestSpscRingBuf::<16>::new());

        // lo: ticks 1,2,3
        let mut lo_tokens = [MessageToken::INVALID; 3];
        for (i, t) in (1u64..=3u64).enumerate() {
            lo_tokens[i] = store(&mut mgr, make_msg(t, QoSClass::BestEffort));
            assert_eq!(
                q.try_push(lo_tokens[i], &POLICY, &mgr),
                EnqueueResult::Enqueued
            );
        }

        // hi: ticks 100,101
        let mut hi_tokens = [MessageToken::INVALID; 2];
        for (i, t) in (100u64..=101u64).enumerate() {
            hi_tokens[i] = store(&mut mgr, make_msg(t, QoSClass::LatencyCritical));
            assert_eq!(
                q.try_push(hi_tokens[i], &POLICY, &mgr),
                EnqueueResult::Enqueued
            );
        }

        let batch_policy = BatchingPolicy::fixed(4);

        // Should batch from hi lane first (only 2 items available there).
        // The batch must NOT spill into lo even though the policy allows 4.
        let batch = q.try_pop_batch(&batch_policy, &mgr).expect("batch");
        assert_eq!(batch.len(), 2);
        let mut iter = batch.iter();
        let a = iter.next().expect("batch[0]");
        let b = iter.next().expect("batch[1]");
        assert_eq!(
            *mgr.peek_header(*a).unwrap().creation_tick(),
            Ticks::new(100)
        );
        assert_eq!(
            *mgr.peek_header(*b).unwrap().creation_tick(),
            Ticks::new(101)
        );
        assert!(iter.next().is_none());

        // Remaining should be lo items in order.
        let ra = q.try_pop(&mgr).expect("lo-1");
        let rb = q.try_pop(&mgr).expect("lo-2");
        let rc = q.try_pop(&mgr).expect("lo-3");
        assert_eq!(*mgr.peek_header(ra).unwrap().creation_tick(), Ticks::new(1));
        assert_eq!(*mgr.peek_header(rb).unwrap().creation_tick(), Ticks::new(2));
        assert_eq!(*mgr.peek_header(rc).unwrap().creation_tick(), Ticks::new(3));
    }

    #[test]
    fn occupancy_is_sum_of_lanes() {
        let mut mgr: StaticMemoryManager<TestTensor, MGR_DEPTH> = StaticMemoryManager::new();
        let mut q = Priority2::new(TestSpscRingBuf::<16>::new(), TestSpscRingBuf::<16>::new());

        let t1 = store(&mut mgr, make_msg(1, QoSClass::LatencyCritical));
        let t2 = store(&mut mgr, make_msg(2, QoSClass::BestEffort));

        assert_eq!(q.try_push(t1, &POLICY, &mgr), EnqueueResult::Enqueued);
        assert_eq!(q.try_push(t2, &POLICY, &mgr), EnqueueResult::Enqueued);

        let occ = q.occupancy(&POLICY);
        assert_eq!(*occ.items(), 2usize);

        // bytes should reflect stored payload sizes for the shared test tensor payload.
        assert!(*occ.bytes() > 0usize);
    }

    #[test]
    fn is_empty_reflects_both_lanes() {
        let mut mgr: StaticMemoryManager<TestTensor, MGR_DEPTH> = StaticMemoryManager::new();
        let mut q = Priority2::new(TestSpscRingBuf::<16>::new(), TestSpscRingBuf::<16>::new());

        assert!(q.is_empty());

        // Push to hi lane only.
        let t_hi = store(&mut mgr, make_msg(1, QoSClass::LatencyCritical));
        assert_eq!(q.try_push(t_hi, &POLICY, &mgr), EnqueueResult::Enqueued);
        assert!(!q.is_empty());

        // Drain hi.
        let _ = q.try_pop(&mgr).expect("pop hi");
        assert!(q.is_empty());

        // Push to lo lane only.
        let t_lo = store(&mut mgr, make_msg(2, QoSClass::BestEffort));
        assert_eq!(q.try_push(t_lo, &POLICY, &mgr), EnqueueResult::Enqueued);
        assert!(!q.is_empty());

        // Drain lo.
        let _ = q.try_pop(&mgr).expect("pop lo");
        assert!(q.is_empty());
    }

    #[test]
    fn background_qos_routes_to_lo() {
        let mut mgr: StaticMemoryManager<TestTensor, MGR_DEPTH> = StaticMemoryManager::new();
        let mut q = Priority2::new(TestSpscRingBuf::<16>::new(), TestSpscRingBuf::<16>::new());

        let t_bg = store(&mut mgr, make_msg(1, QoSClass::Background));
        let t_be = store(&mut mgr, make_msg(2, QoSClass::BestEffort));

        assert_eq!(q.try_push(t_bg, &POLICY, &mgr), EnqueueResult::Enqueued);
        assert_eq!(q.try_push(t_be, &POLICY, &mgr), EnqueueResult::Enqueued);

        // Both should be in lo lane; hi is empty.
        // Pop order should be FIFO within lo: tick 1, then tick 2.
        let a = q.try_pop(&mgr).expect("pop bg");
        let b = q.try_pop(&mgr).expect("pop be");
        assert_eq!(*mgr.peek_header(a).unwrap().creation_tick(), Ticks::new(1));
        assert_eq!(*mgr.peek_header(b).unwrap().creation_tick(), Ticks::new(2));
        assert!(q.is_empty());
    }

    #[test]
    fn lane_specific_admission_interactions_smoke_test() {
        let mut mgr: StaticMemoryManager<TestTensor, MGR_DEPTH> = StaticMemoryManager::new();
        let mut q = Priority2::new(TestSpscRingBuf::<2>::new(), TestSpscRingBuf::<2>::new());

        // Tight caps so the 2-slot hi lane saturates after two pushes.
        let small_caps_policy = EdgePolicy::new(
            QueueCaps::new(2, 1, None, None),
            AdmissionPolicy::DropOldest,
            OverBudgetAction::Drop,
        );

        let t1 = store(&mut mgr, make_msg(1, QoSClass::LatencyCritical));
        let t2 = store(&mut mgr, make_msg(2, QoSClass::LatencyCritical));

        assert_eq!(
            q.try_push(t1, &small_caps_policy, &mgr),
            EnqueueResult::Enqueued
        );
        // At soft threshold (1 item, soft=1) but not at hard cap (max=2).
        // try_push does NOT evict internally — returns Enqueued.
        assert_eq!(
            q.try_push(t2, &small_caps_policy, &mgr),
            EnqueueResult::Enqueued
        );

        // Queue full (2 items). Next push is Rejected without pre-eviction.
        let t3 = store(&mut mgr, make_msg(3, QoSClass::LatencyCritical));
        assert_eq!(
            q.try_push(t3, &small_caps_policy, &mgr),
            EnqueueResult::Rejected
        );

        // Pre-evict oldest (t1), then push t3 succeeds.
        let evicted = q.try_pop(&mgr).expect("pre-evict");
        assert_eq!(evicted, t1);
        let _ = mgr.free(evicted);
        assert_eq!(
            q.try_push(t3, &small_caps_policy, &mgr),
            EnqueueResult::Enqueued
        );

        // t2 and t3 remain; t1 was evicted.
        let p1 = q.try_pop(&mgr).expect("pop t2");
        assert_eq!(*mgr.peek_header(p1).unwrap().creation_tick(), Ticks::new(2));
        let p2 = q.try_pop(&mgr).expect("pop t3");
        assert_eq!(*mgr.peek_header(p2).unwrap().creation_tick(), Ticks::new(3));
        assert!(q.is_empty());
    }

    #[test]
    fn hi_drains_before_lo_across_multiple_pops() {
        let mut mgr: StaticMemoryManager<TestTensor, MGR_DEPTH> = StaticMemoryManager::new();
        let mut q = Priority2::new(TestSpscRingBuf::<16>::new(), TestSpscRingBuf::<16>::new());

        // Interleave pushes: lo, hi, lo, hi
        let t_lo1 = store(&mut mgr, make_msg(1, QoSClass::BestEffort));
        let t_hi1 = store(&mut mgr, make_msg(2, QoSClass::LatencyCritical));
        let t_lo2 = store(&mut mgr, make_msg(3, QoSClass::BestEffort));
        let t_hi2 = store(&mut mgr, make_msg(4, QoSClass::LatencyCritical));

        assert_eq!(q.try_push(t_lo1, &POLICY, &mgr), EnqueueResult::Enqueued);
        assert_eq!(q.try_push(t_hi1, &POLICY, &mgr), EnqueueResult::Enqueued);
        assert_eq!(q.try_push(t_lo2, &POLICY, &mgr), EnqueueResult::Enqueued);
        assert_eq!(q.try_push(t_hi2, &POLICY, &mgr), EnqueueResult::Enqueued);

        // Pop order: hi lane FIFO first (tick 2, 4), then lo lane FIFO (tick 1, 3).
        let p1 = q.try_pop(&mgr).expect("pop 1");
        let p2 = q.try_pop(&mgr).expect("pop 2");
        let p3 = q.try_pop(&mgr).expect("pop 3");
        let p4 = q.try_pop(&mgr).expect("pop 4");

        assert_eq!(*mgr.peek_header(p1).unwrap().creation_tick(), Ticks::new(2));
        assert_eq!(*mgr.peek_header(p2).unwrap().creation_tick(), Ticks::new(4));
        assert_eq!(*mgr.peek_header(p3).unwrap().creation_tick(), Ticks::new(1));
        assert_eq!(*mgr.peek_header(p4).unwrap().creation_tick(), Ticks::new(3));
        assert!(q.is_empty());
    }
}
399}