Skip to main content

oximedia_core/
event_queue.rs

1//! Priority-aware media event queue.
2//!
3//! Provides an in-memory queue of [`MediaEvent`] items ordered by
4//! [`EventPriority`], suitable for coordinating asynchronous media pipeline
5//! stages without an external message broker.
6
7#![allow(dead_code)]
8
9use std::cmp::Ordering;
10use std::collections::BinaryHeap;
11
12/// Scheduling priority for a media event.
13#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
14pub enum EventPriority {
15    /// Low-priority background work (e.g. index updates).
16    Low = 0,
17    /// Normal pipeline processing.
18    Normal = 1,
19    /// High-priority control signals (e.g. EOS, flush).
20    High = 2,
21    /// Critical errors that must be handled immediately.
22    Critical = 3,
23}
24
25impl EventPriority {
26    /// Returns a numeric value for this priority level (higher = more urgent).
27    #[must_use]
28    pub fn value(self) -> u8 {
29        self as u8
30    }
31}
32
33/// A media pipeline event with an associated payload string and priority.
34#[derive(Debug, Clone, Eq, PartialEq)]
35pub struct MediaEvent {
36    /// Human-readable event kind (e.g. `"frame.ready"`, `"eos"`).
37    pub kind: String,
38    /// Optional payload associated with this event.
39    pub payload: Option<String>,
40    /// Scheduling priority.
41    pub priority: EventPriority,
42    /// Sequence number for FIFO ordering among same-priority events.
43    pub(crate) seq: u64,
44}
45
46impl MediaEvent {
47    /// Create a new [`MediaEvent`].
48    #[must_use]
49    pub fn new(kind: impl Into<String>, priority: EventPriority) -> Self {
50        Self {
51            kind: kind.into(),
52            payload: None,
53            priority,
54            seq: 0,
55        }
56    }
57
58    /// Attach a payload to this event, returning `self` for chaining.
59    #[must_use]
60    pub fn with_payload(mut self, payload: impl Into<String>) -> Self {
61        self.payload = Some(payload.into());
62        self
63    }
64
65    /// Returns `true` if this event has [`EventPriority::High`] or
66    /// [`EventPriority::Critical`] priority.
67    #[must_use]
68    pub fn is_high_priority(&self) -> bool {
69        matches!(self.priority, EventPriority::High | EventPriority::Critical)
70    }
71}
72
73// BinaryHeap is a max-heap; we want higher EventPriority::value() to come out
74// first, then lower seq (FIFO within the same priority).
75impl Ord for MediaEvent {
76    fn cmp(&self, other: &Self) -> Ordering {
77        self.priority
78            .cmp(&other.priority)
79            .then_with(|| other.seq.cmp(&self.seq)) // lower seq = older = first
80    }
81}
82
83impl PartialOrd for MediaEvent {
84    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
85        Some(self.cmp(other))
86    }
87}
88
89/// A bounded, priority-ordered queue of [`MediaEvent`]s.
90#[derive(Debug)]
91pub struct EventQueue {
92    heap: BinaryHeap<MediaEvent>,
93    capacity: usize,
94    next_seq: u64,
95}
96
97impl EventQueue {
98    /// Create a new [`EventQueue`] with the given maximum `capacity`.
99    #[must_use]
100    pub fn new(capacity: usize) -> Self {
101        Self {
102            heap: BinaryHeap::new(),
103            capacity,
104            next_seq: 0,
105        }
106    }
107
108    /// Push an event onto the queue.
109    ///
110    /// Returns `false` if the queue is full (event is discarded).
111    pub fn push(&mut self, mut event: MediaEvent) -> bool {
112        if self.heap.len() >= self.capacity {
113            return false;
114        }
115        event.seq = self.next_seq;
116        self.next_seq += 1;
117        self.heap.push(event);
118        true
119    }
120
121    /// Pop the highest-priority event from the queue.
122    ///
123    /// Returns `None` if the queue is empty.
124    pub fn pop(&mut self) -> Option<MediaEvent> {
125        self.heap.pop()
126    }
127
128    /// Drain all events with [`EventPriority::High`] or higher into a `Vec`.
129    ///
130    /// The remaining events stay in the queue.
131    pub fn drain_high_priority(&mut self) -> Vec<MediaEvent> {
132        let mut high: Vec<MediaEvent> = Vec::new();
133        let mut rest: Vec<MediaEvent> = Vec::new();
134
135        while let Some(ev) = self.heap.pop() {
136            if ev.is_high_priority() {
137                high.push(ev);
138            } else {
139                rest.push(ev);
140            }
141        }
142        for ev in rest {
143            self.heap.push(ev);
144        }
145        high
146    }
147
148    /// Returns the number of events currently in the queue.
149    #[must_use]
150    pub fn len(&self) -> usize {
151        self.heap.len()
152    }
153
154    /// Returns `true` if the queue contains no events.
155    #[must_use]
156    pub fn is_empty(&self) -> bool {
157        self.heap.is_empty()
158    }
159
160    /// Returns the configured capacity.
161    #[must_use]
162    pub fn capacity(&self) -> usize {
163        self.capacity
164    }
165}
166
167#[cfg(test)]
168mod tests {
169    use super::*;
170
171    fn make_event(kind: &str, prio: EventPriority) -> MediaEvent {
172        MediaEvent::new(kind, prio)
173    }
174
175    #[test]
176    fn test_priority_value_ordering() {
177        assert!(EventPriority::Critical.value() > EventPriority::High.value());
178        assert!(EventPriority::High.value() > EventPriority::Normal.value());
179        assert!(EventPriority::Normal.value() > EventPriority::Low.value());
180    }
181
182    #[test]
183    fn test_event_is_high_priority() {
184        assert!(make_event("flush", EventPriority::High).is_high_priority());
185        assert!(make_event("err", EventPriority::Critical).is_high_priority());
186        assert!(!make_event("frame", EventPriority::Normal).is_high_priority());
187        assert!(!make_event("bg", EventPriority::Low).is_high_priority());
188    }
189
190    #[test]
191    fn test_event_with_payload() {
192        let ev = MediaEvent::new("test", EventPriority::Normal).with_payload("data");
193        assert_eq!(ev.payload.as_deref(), Some("data"));
194    }
195
196    #[test]
197    fn test_queue_push_pop_single() {
198        let mut q = EventQueue::new(8);
199        let ev = make_event("eos", EventPriority::High);
200        assert!(q.push(ev));
201        assert_eq!(q.len(), 1);
202        let popped = q.pop().expect("pop should return item");
203        assert_eq!(popped.kind, "eos");
204        assert!(q.is_empty());
205    }
206
207    #[test]
208    fn test_queue_priority_ordering() {
209        let mut q = EventQueue::new(8);
210        q.push(make_event("low", EventPriority::Low));
211        q.push(make_event("high", EventPriority::High));
212        q.push(make_event("normal", EventPriority::Normal));
213
214        assert_eq!(q.pop().expect("pop should return item").kind, "high");
215        assert_eq!(q.pop().expect("pop should return item").kind, "normal");
216        assert_eq!(q.pop().expect("pop should return item").kind, "low");
217    }
218
219    #[test]
220    fn test_queue_fifo_same_priority() {
221        let mut q = EventQueue::new(8);
222        q.push(make_event("first", EventPriority::Normal));
223        q.push(make_event("second", EventPriority::Normal));
224        q.push(make_event("third", EventPriority::Normal));
225
226        assert_eq!(q.pop().expect("pop should return item").kind, "first");
227        assert_eq!(q.pop().expect("pop should return item").kind, "second");
228        assert_eq!(q.pop().expect("pop should return item").kind, "third");
229    }
230
231    #[test]
232    fn test_queue_capacity_limit() {
233        let mut q = EventQueue::new(2);
234        assert!(q.push(make_event("a", EventPriority::Low)));
235        assert!(q.push(make_event("b", EventPriority::Low)));
236        assert!(!q.push(make_event("c", EventPriority::Low))); // rejected
237        assert_eq!(q.len(), 2);
238    }
239
240    #[test]
241    fn test_queue_drain_high_priority() {
242        let mut q = EventQueue::new(8);
243        q.push(make_event("low1", EventPriority::Low));
244        q.push(make_event("high1", EventPriority::High));
245        q.push(make_event("normal1", EventPriority::Normal));
246        q.push(make_event("crit1", EventPriority::Critical));
247
248        let high = q.drain_high_priority();
249        assert_eq!(high.len(), 2);
250        assert_eq!(q.len(), 2); // low1 + normal1 remain
251    }
252
253    #[test]
254    fn test_queue_empty_pop() {
255        let mut q = EventQueue::new(4);
256        assert!(q.pop().is_none());
257    }
258
259    #[test]
260    fn test_queue_capacity_accessor() {
261        let q = EventQueue::new(16);
262        assert_eq!(q.capacity(), 16);
263    }
264
265    #[test]
266    fn test_event_kind_stored() {
267        let ev = MediaEvent::new("frame.ready", EventPriority::Normal);
268        assert_eq!(ev.kind, "frame.ready");
269    }
270
271    #[test]
272    fn test_drain_high_priority_empty() {
273        let mut q = EventQueue::new(8);
274        q.push(make_event("low", EventPriority::Low));
275        let high = q.drain_high_priority();
276        assert!(high.is_empty());
277        assert_eq!(q.len(), 1);
278    }
279}