Skip to main content

deepstrike_core/signals/
queue.rs

1use std::cmp::Ordering;
2use std::collections::BinaryHeap;
3
4use crate::types::signal::{RuntimeSignal, Urgency};
5
6/// Wrapper for priority ordering: higher urgency first, then older timestamp first.
7struct PrioritizedSignal {
8    urgency: Urgency,
9    timestamp_ms: u64,
10    signal: RuntimeSignal,
11}
12
13impl PartialEq for PrioritizedSignal {
14    fn eq(&self, other: &Self) -> bool {
15        self.signal.id == other.signal.id
16    }
17}
18impl Eq for PrioritizedSignal {}
19
20impl PartialOrd for PrioritizedSignal {
21    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
22        Some(self.cmp(other))
23    }
24}
25
26impl Ord for PrioritizedSignal {
27    fn cmp(&self, other: &Self) -> Ordering {
28        self.urgency
29            .cmp(&other.urgency)
30            .then_with(|| other.timestamp_ms.cmp(&self.timestamp_ms))
31            .then_with(|| self.signal.id.cmp(&other.signal.id))
32    }
33}
34
35/// Priority queue for runtime signals. Internal to the signals module.
36pub(super) struct SignalQueue {
37    heap: BinaryHeap<PrioritizedSignal>,
38    max_size: usize,
39}
40
41impl SignalQueue {
42    pub(super) fn new(max_size: usize) -> Self {
43        Self {
44            heap: BinaryHeap::new(),
45            max_size,
46        }
47    }
48
49    /// Returns false if the queue is full (signal is dropped).
50    pub(super) fn push(&mut self, signal: RuntimeSignal) -> bool {
51        if self.heap.len() >= self.max_size {
52            return false;
53        }
54        let urgency = signal.urgency;
55        let timestamp_ms = signal.timestamp_ms;
56        self.heap.push(PrioritizedSignal {
57            urgency,
58            timestamp_ms,
59            signal,
60        });
61        true
62    }
63
64    pub(super) fn pop(&mut self) -> Option<RuntimeSignal> {
65        self.heap.pop().map(|ps| ps.signal)
66    }
67
68    /// Pop the highest-priority signal visible to `recipient`: broadcasts
69    /// (`signal.recipient == None`) plus those addressed to `recipient`.
70    /// Non-matching entries are retained (re-pushed) so heap ordering is preserved.
71    /// `recipient == None` ⇒ no filtering (drains anything; back-compat with `pop`).
72    pub(super) fn pop_for(&mut self, recipient: Option<&str>) -> Option<RuntimeSignal> {
73        let Some(recipient) = recipient else {
74            return self.pop();
75        };
76        let mut skipped: Vec<PrioritizedSignal> = Vec::new();
77        let mut found = None;
78        while let Some(ps) = self.heap.pop() {
79            let visible = ps
80                .signal
81                .recipient
82                .as_deref()
83                .is_none_or(|r| r == recipient);
84            if visible {
85                found = Some(ps.signal);
86                break;
87            }
88            skipped.push(ps);
89        }
90        for ps in skipped {
91            self.heap.push(ps);
92        }
93        found
94    }
95
96    pub(super) fn len(&self) -> usize {
97        self.heap.len()
98    }
99
100    #[allow(dead_code)]
101    pub(super) fn is_empty(&self) -> bool {
102        self.heap.is_empty()
103    }
104}
105
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::signal::{SignalSource, SignalType};

    /// Signals come out in descending urgency regardless of insertion order.
    #[test]
    fn higher_urgency_dequeued_first() {
        let low = RuntimeSignal::new(SignalSource::Cron, SignalType::Event, Urgency::Low, "low")
            .with_timestamp(1);
        let critical = RuntimeSignal::new(
            SignalSource::Gateway,
            SignalType::Alert,
            Urgency::Critical,
            "crit",
        )
        .with_timestamp(2);
        let normal = RuntimeSignal::new(
            SignalSource::Cron,
            SignalType::Event,
            Urgency::Normal,
            "norm",
        )
        .with_timestamp(3);

        let mut queue = SignalQueue::new(10);
        for signal in [low, critical, normal] {
            queue.push(signal);
        }

        for expected in [Urgency::Critical, Urgency::Normal, Urgency::Low] {
            assert_eq!(queue.pop().unwrap().urgency, expected);
        }
    }

    /// A queue of capacity one accepts the first push and rejects the second.
    #[test]
    fn respects_max_size() {
        let mut queue = SignalQueue::new(1);

        let accepted = queue.push(
            RuntimeSignal::new(SignalSource::Cron, SignalType::Event, Urgency::Low, "a")
                .with_timestamp(1),
        );
        assert!(accepted);

        let rejected = queue.push(
            RuntimeSignal::new(SignalSource::Cron, SignalType::Event, Urgency::Low, "b")
                .with_timestamp(2),
        );
        assert!(!rejected);
    }

    /// With equal urgency, the signal carrying the smaller timestamp is popped first.
    #[test]
    fn same_urgency_older_first() {
        let newer = RuntimeSignal::new(
            SignalSource::Cron,
            SignalType::Event,
            Urgency::Normal,
            "newer",
        )
        .with_timestamp(100);
        let older = RuntimeSignal::new(
            SignalSource::Cron,
            SignalType::Event,
            Urgency::Normal,
            "older",
        )
        .with_timestamp(1);

        let mut queue = SignalQueue::new(10);
        queue.push(newer);
        queue.push(older);

        let first = queue.pop().unwrap();
        assert_eq!(first.summary.as_str(), "older");
    }
}