1use rtrb::{Consumer, Producer};
9
10use crate::messages::{EcsCommand, RtEvent};
11
12pub struct RtHandles {
18 pub events: Consumer<RtEvent>,
21 pub commands: Producer<EcsCommand>,
24}
25
26impl std::fmt::Debug for RtHandles {
27 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28 f.debug_struct("RtHandles").finish_non_exhaustive()
29 }
30}
31
32pub(crate) struct RtSideQueues {
34 pub(crate) events: Producer<RtEvent>,
36 pub(crate) commands: Consumer<EcsCommand>,
38}
39
40pub(crate) fn create_queues(
50 event_capacity: usize,
51 command_capacity: usize,
52) -> (RtSideQueues, RtHandles) {
53 let (event_producer, event_consumer) = rtrb::RingBuffer::new(event_capacity);
54 let (command_producer, command_consumer) = rtrb::RingBuffer::new(command_capacity);
55
56 let rt_side = RtSideQueues {
57 events: event_producer,
58 commands: command_consumer,
59 };
60
61 let ecs_side = RtHandles {
62 events: event_consumer,
63 commands: command_producer,
64 };
65
66 (rt_side, ecs_side)
67}
68
69#[cfg(test)]
70mod tests {
71 use super::*;
72 use crate::messages::{MidiMessage, RtErrorCode, TransportEvent};
73 use pretty_assertions::assert_eq;
74
75 #[test]
76 fn test_queue_roundtrip_rt_event() {
77 let (mut rt_side, mut ecs_side) = create_queues(16, 16);
78
79 let tick = RtEvent::ClockTick {
80 subdivision: 0,
81 beat: 1,
82 tempo_bpm: 120.0,
83 timestamp_ns: 500_000,
84 };
85
86 rt_side.events.push(tick).expect("push should succeed");
87 let received = ecs_side.events.pop().expect("pop should succeed");
88 assert_eq!(received, tick);
89 }
90
91 #[test]
92 fn test_queue_roundtrip_ecs_command() {
93 let (mut rt_side, mut ecs_side) = create_queues(16, 16);
94
95 let cmd = EcsCommand::SetTempo { bpm: 140.0 };
96
97 ecs_side.commands.push(cmd).expect("push should succeed");
98 let received = rt_side.commands.pop().expect("pop should succeed");
99 assert_eq!(received, cmd);
100 }
101
102 #[test]
103 fn test_queue_full_returns_error() {
104 let (mut rt_side, _ecs_side) = create_queues(4, 4);
105
106 let tick = RtEvent::ClockTick {
107 subdivision: 0,
108 beat: 0,
109 tempo_bpm: 120.0,
110 timestamp_ns: 0,
111 };
112
113 for _ in 0..4 {
115 rt_side.events.push(tick).expect("push should succeed");
116 }
117
118 let result = rt_side.events.push(tick);
120 assert!(result.is_err(), "push to a full queue should return Err");
121 }
122
123 #[test]
124 fn test_queue_empty_returns_none() {
125 let (mut rt_side, mut ecs_side) = create_queues(16, 16);
126
127 let event_result = ecs_side.events.pop();
128 assert!(
129 event_result.is_err(),
130 "pop from empty event queue should return Err"
131 );
132
133 let command_result = rt_side.commands.pop();
134 assert!(
135 command_result.is_err(),
136 "pop from empty command queue should return Err"
137 );
138 }
139
140 #[test]
141 fn test_queue_cross_thread() {
142 let (mut rt_side, mut ecs_side) = create_queues(128, 16);
143
144 let handle = std::thread::spawn(move || {
145 for i in 0..100u64 {
146 let tick = RtEvent::ClockTick {
147 subdivision: 0,
148 beat: i,
149 tempo_bpm: 120.0,
150 timestamp_ns: i * 1_000,
151 };
152 while rt_side.events.push(tick).is_err() {
155 std::thread::yield_now();
156 }
157 }
158 });
159
160 let mut received = Vec::with_capacity(100);
161 while received.len() < 100 {
162 match ecs_side.events.pop() {
163 Ok(event) => received.push(event),
164 Err(_) => std::thread::sleep(std::time::Duration::from_micros(10)),
165 }
166 }
167
168 handle.join().expect("producer thread should not panic");
169
170 assert_eq!(received.len(), 100);
171 for (i, event) in received.iter().enumerate() {
172 let expected = RtEvent::ClockTick {
173 subdivision: 0,
174 beat: i as u64,
175 tempo_bpm: 120.0,
176 timestamp_ns: i as u64 * 1_000,
177 };
178 assert_eq!(*event, expected);
179 }
180 }
181
182 #[test]
183 fn test_queue_multiple_event_types() {
184 let (mut rt_side, mut ecs_side) = create_queues(16, 16);
185
186 let events = [
187 RtEvent::ClockTick {
188 subdivision: 12,
189 beat: 42,
190 tempo_bpm: 128.0,
191 timestamp_ns: 999,
192 },
193 RtEvent::Transport(TransportEvent::Start),
194 RtEvent::MidiInput {
195 input_port_index: 0,
196 timestamp_ns: 1_000_000,
197 message: MidiMessage::note_on(0, 60, 100),
198 },
199 RtEvent::SongPosition { position: 384 },
200 RtEvent::NonFatalError(RtErrorCode::QueueOverflow),
201 ];
202
203 for event in &events {
204 rt_side.events.push(*event).expect("push should succeed");
205 }
206
207 for expected in &events {
208 let received = ecs_side.events.pop().expect("pop should succeed");
209 assert_eq!(received, *expected);
210 }
211 }
212}