Skip to main content

oxurack_rt/
queues.rs

1//! Lock-free SPSC queues for RT-thread / ECS-world communication.
2//!
3//! Uses `rtrb` (real-time ring buffer) to provide bounded, lock-free,
4//! single-producer / single-consumer queues. The RT thread produces
5//! [`RtEvent`]s and consumes [`EcsCommand`]s; the ECS world does the
6//! reverse.
7
8use rtrb::{Consumer, Producer};
9
10use crate::messages::{EcsCommand, RtEvent};
11
12/// The ECS-side handles for communicating with the RT thread.
13///
14/// Returned by [`Runtime::start()`](crate::Runtime::start).
15/// The ECS world uses these to receive events from the RT thread and
16/// send commands to it.
17pub struct RtHandles {
18    /// Receives events produced by the RT thread (clock ticks, MIDI
19    /// input, transport changes, errors).
20    pub events: Consumer<RtEvent>,
21    /// Sends commands to the RT thread (MIDI output, tempo changes,
22    /// transport, shutdown).
23    pub commands: Producer<EcsCommand>,
24}
25
26impl std::fmt::Debug for RtHandles {
27    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28        f.debug_struct("RtHandles").finish_non_exhaustive()
29    }
30}
31
32/// The RT-thread-side handles (not exposed publicly).
33pub(crate) struct RtSideQueues {
34    /// Produces events for the ECS world to consume.
35    pub(crate) events: Producer<RtEvent>,
36    /// Consumes commands sent by the ECS world.
37    pub(crate) commands: Consumer<EcsCommand>,
38}
39
40/// Creates a matched pair of lock-free queue handles.
41///
42/// Returns the RT-side handles (for the spawned thread) and the
43/// ECS-side handles (for the caller).
44///
45/// # Arguments
46///
47/// * `event_capacity` - Maximum number of buffered RT-to-ECS events.
48/// * `command_capacity` - Maximum number of buffered ECS-to-RT commands.
49pub(crate) fn create_queues(
50    event_capacity: usize,
51    command_capacity: usize,
52) -> (RtSideQueues, RtHandles) {
53    let (event_producer, event_consumer) = rtrb::RingBuffer::new(event_capacity);
54    let (command_producer, command_consumer) = rtrb::RingBuffer::new(command_capacity);
55
56    let rt_side = RtSideQueues {
57        events: event_producer,
58        commands: command_consumer,
59    };
60
61    let ecs_side = RtHandles {
62        events: event_consumer,
63        commands: command_producer,
64    };
65
66    (rt_side, ecs_side)
67}
68
69#[cfg(test)]
70mod tests {
71    use super::*;
72    use crate::messages::{MidiMessage, RtErrorCode, TransportEvent};
73    use pretty_assertions::assert_eq;
74
75    #[test]
76    fn test_queue_roundtrip_rt_event() {
77        let (mut rt_side, mut ecs_side) = create_queues(16, 16);
78
79        let tick = RtEvent::ClockTick {
80            subdivision: 0,
81            beat: 1,
82            tempo_bpm: 120.0,
83            timestamp_ns: 500_000,
84        };
85
86        rt_side.events.push(tick).expect("push should succeed");
87        let received = ecs_side.events.pop().expect("pop should succeed");
88        assert_eq!(received, tick);
89    }
90
91    #[test]
92    fn test_queue_roundtrip_ecs_command() {
93        let (mut rt_side, mut ecs_side) = create_queues(16, 16);
94
95        let cmd = EcsCommand::SetTempo { bpm: 140.0 };
96
97        ecs_side.commands.push(cmd).expect("push should succeed");
98        let received = rt_side.commands.pop().expect("pop should succeed");
99        assert_eq!(received, cmd);
100    }
101
102    #[test]
103    fn test_queue_full_returns_error() {
104        let (mut rt_side, _ecs_side) = create_queues(4, 4);
105
106        let tick = RtEvent::ClockTick {
107            subdivision: 0,
108            beat: 0,
109            tempo_bpm: 120.0,
110            timestamp_ns: 0,
111        };
112
113        // Fill the queue to capacity.
114        for _ in 0..4 {
115            rt_side.events.push(tick).expect("push should succeed");
116        }
117
118        // The 5th push must fail (queue is full).
119        let result = rt_side.events.push(tick);
120        assert!(result.is_err(), "push to a full queue should return Err");
121    }
122
123    #[test]
124    fn test_queue_empty_returns_none() {
125        let (mut rt_side, mut ecs_side) = create_queues(16, 16);
126
127        let event_result = ecs_side.events.pop();
128        assert!(
129            event_result.is_err(),
130            "pop from empty event queue should return Err"
131        );
132
133        let command_result = rt_side.commands.pop();
134        assert!(
135            command_result.is_err(),
136            "pop from empty command queue should return Err"
137        );
138    }
139
140    #[test]
141    fn test_queue_cross_thread() {
142        let (mut rt_side, mut ecs_side) = create_queues(128, 16);
143
144        let handle = std::thread::spawn(move || {
145            for i in 0..100u64 {
146                let tick = RtEvent::ClockTick {
147                    subdivision: 0,
148                    beat: i,
149                    tempo_bpm: 120.0,
150                    timestamp_ns: i * 1_000,
151                };
152                // Spin until the push succeeds (queue should never be
153                // full with capacity 128, but this is defensive).
154                while rt_side.events.push(tick).is_err() {
155                    std::thread::yield_now();
156                }
157            }
158        });
159
160        let mut received = Vec::with_capacity(100);
161        while received.len() < 100 {
162            match ecs_side.events.pop() {
163                Ok(event) => received.push(event),
164                Err(_) => std::thread::sleep(std::time::Duration::from_micros(10)),
165            }
166        }
167
168        handle.join().expect("producer thread should not panic");
169
170        assert_eq!(received.len(), 100);
171        for (i, event) in received.iter().enumerate() {
172            let expected = RtEvent::ClockTick {
173                subdivision: 0,
174                beat: i as u64,
175                tempo_bpm: 120.0,
176                timestamp_ns: i as u64 * 1_000,
177            };
178            assert_eq!(*event, expected);
179        }
180    }
181
182    #[test]
183    fn test_queue_multiple_event_types() {
184        let (mut rt_side, mut ecs_side) = create_queues(16, 16);
185
186        let events = [
187            RtEvent::ClockTick {
188                subdivision: 12,
189                beat: 42,
190                tempo_bpm: 128.0,
191                timestamp_ns: 999,
192            },
193            RtEvent::Transport(TransportEvent::Start),
194            RtEvent::MidiInput {
195                input_port_index: 0,
196                timestamp_ns: 1_000_000,
197                message: MidiMessage::note_on(0, 60, 100),
198            },
199            RtEvent::SongPosition { position: 384 },
200            RtEvent::NonFatalError(RtErrorCode::QueueOverflow),
201        ];
202
203        for event in &events {
204            rt_side.events.push(*event).expect("push should succeed");
205        }
206
207        for expected in &events {
208            let received = ecs_side.events.pop().expect("pop should succeed");
209            assert_eq!(received, *expected);
210        }
211    }
212}