sim_lib_stream_combinators/
bridge.rs1use std::{collections::VecDeque, sync::Mutex};
2
3use sim_kernel::{Error, Ref, Result, Symbol};
4use sim_lib_stream_core::{
5 ClockDomain, DomainBridgeDescriptor, PcmPacket, StreamItem, StreamMetadata, StreamPacket,
6};
7
8use crate::{Stream, StreamNode};
9
10pub fn resample_pcm(source: Stream, input_hz: u32, output_hz: u32) -> Result<Stream> {
15 let descriptor = DomainBridgeDescriptor::resampler(input_hz, output_hz)?;
16 let metadata = source.metadata().clone();
17 Ok(Stream::new(ResamplePcmNode {
18 source,
19 metadata,
20 input_hz,
21 output_hz,
22 _descriptor: descriptor,
23 }))
24}
25
26pub fn jitter_buffer(source: Stream, clock: Symbol, max_late_packets: u32) -> Stream {
32 let descriptor = DomainBridgeDescriptor::jitter_buffer(max_late_packets);
33 let metadata = source.metadata().clone();
34 Stream::new(JitterBufferNode {
35 source,
36 metadata,
37 clock,
38 max_late_packets,
39 queue: Mutex::new(None),
40 _descriptor: descriptor,
41 })
42}
43
44pub fn latency_comp_delay(source: Stream, frames: u64) -> Stream {
49 let descriptor = DomainBridgeDescriptor::latency_comp_delay(frames);
50 let metadata = source.metadata().clone();
51 Stream::new(PassthroughBridgeNode {
52 source,
53 metadata,
54 _descriptor: descriptor,
55 })
56}
57
58pub fn event_rate_gate(source: Stream) -> Result<Stream> {
64 let input_domain =
65 ClockDomain::from_symbol(source.metadata().clock()).unwrap_or(ClockDomain::Control);
66 let descriptor = DomainBridgeDescriptor::event_rate_gate(input_domain)?;
67 let metadata = source.metadata().clone();
68 Ok(Stream::new(PassthroughBridgeNode {
69 source,
70 metadata,
71 _descriptor: descriptor,
72 }))
73}
74
75struct ResamplePcmNode {
76 source: Stream,
77 metadata: StreamMetadata,
78 input_hz: u32,
79 output_hz: u32,
80 _descriptor: DomainBridgeDescriptor,
81}
82
83impl StreamNode for ResamplePcmNode {
84 fn metadata(&self) -> &StreamMetadata {
85 &self.metadata
86 }
87
88 fn next_packet(&self) -> Result<Option<StreamItem>> {
89 let Some(item) = self.source.next_packet()? else {
90 return Ok(None);
91 };
92 let StreamPacket::Pcm(packet) = item.packet() else {
93 return Ok(Some(item));
94 };
95 let packet = resample_packet(packet, self.input_hz, self.output_hz)?;
96 StreamItem::with_ticks(StreamPacket::Pcm(packet), item.ticks().to_vec()).map(Some)
97 }
98
99 fn is_done(&self) -> Result<bool> {
100 self.source.is_done()
101 }
102}
103
104struct JitterBufferNode {
105 source: Stream,
106 metadata: StreamMetadata,
107 clock: Symbol,
108 max_late_packets: u32,
109 queue: Mutex<Option<VecDeque<StreamItem>>>,
110 _descriptor: DomainBridgeDescriptor,
111}
112
113impl StreamNode for JitterBufferNode {
114 fn metadata(&self) -> &StreamMetadata {
115 &self.metadata
116 }
117
118 fn next_packet(&self) -> Result<Option<StreamItem>> {
119 let mut queue = self
120 .queue
121 .lock()
122 .map_err(|_| Error::PoisonedLock("jitter-buffer queue"))?;
123 if queue.is_none() {
124 *queue = Some(VecDeque::from(load_jitter_buffer(
125 &self.source,
126 &self.clock,
127 self.max_late_packets,
128 )?));
129 }
130 Ok(queue.as_mut().and_then(VecDeque::pop_front))
131 }
132
133 fn is_done(&self) -> Result<bool> {
134 let queue = self
135 .queue
136 .lock()
137 .map_err(|_| Error::PoisonedLock("jitter-buffer queue"))?;
138 Ok(queue.as_ref().is_some_and(VecDeque::is_empty) || self.source.is_done()?)
139 }
140}
141
142struct PassthroughBridgeNode {
143 source: Stream,
144 metadata: StreamMetadata,
145 _descriptor: DomainBridgeDescriptor,
146}
147
148impl StreamNode for PassthroughBridgeNode {
149 fn metadata(&self) -> &StreamMetadata {
150 &self.metadata
151 }
152
153 fn next_packet(&self) -> Result<Option<StreamItem>> {
154 self.source.next_packet()
155 }
156
157 fn is_done(&self) -> Result<bool> {
158 self.source.is_done()
159 }
160}
161
162fn resample_packet(packet: &PcmPacket, input_hz: u32, output_hz: u32) -> Result<PcmPacket> {
163 if input_hz == 0 || output_hz == 0 {
164 return Err(Error::Eval("PCM resample rates must be nonzero".to_owned()));
165 }
166 let output_frames = resampled_frame_count(packet.frames(), input_hz, output_hz);
167 match packet.sample_format() {
168 sim_lib_stream_core::PcmSampleFormat::I16 => PcmPacket::i16(
169 packet.channels(),
170 output_frames,
171 resample_interleaved(
172 packet.samples_i16(),
173 packet.channels(),
174 output_frames,
175 |v| v,
176 ),
177 ),
178 sim_lib_stream_core::PcmSampleFormat::F32 => PcmPacket::f32(
179 packet.channels(),
180 output_frames,
181 resample_interleaved(
182 packet.samples_f32(),
183 packet.channels(),
184 output_frames,
185 |v| v,
186 ),
187 ),
188 }
189}
190
/// Computes how many output frames `input_frames` become when converting from
/// `input_hz` to `output_hz`.
///
/// The scaled count is rounded by adding `input_hz / 2` before the integer division, and
/// the result is never less than 1. Intermediate arithmetic saturates in `u64`.
///
/// `input_hz` must be nonzero; it is used as the divisor.
fn resampled_frame_count(input_frames: usize, input_hz: u32, output_hz: u32) -> usize {
    let divisor = u64::from(input_hz);
    let rounding = u64::from(input_hz / 2);
    let scaled = (input_frames as u64).saturating_mul(u64::from(output_hz));
    let rounded = scaled.saturating_add(rounding) / divisor;
    if rounded == 0 {
        1
    } else {
        rounded as usize
    }
}
198
/// Resamples interleaved audio to `output_frames` frames by nearest-earlier-frame lookup.
///
/// `samples` holds `channels` values per frame; any trailing partial frame is ignored.
/// Output frame `f` copies every channel of input frame
/// `f * input_frames / output_frames`, passing each value through `copy`.
///
/// Returns an empty vector when there is nothing to read from or nothing to produce
/// (`channels == 0`, no complete input frame, or `output_frames == 0`) instead of
/// dividing by zero or indexing past the end of `samples`.
fn resample_interleaved<T: Copy>(
    samples: &[T],
    channels: usize,
    output_frames: usize,
    copy: impl Fn(T) -> T,
) -> Vec<T> {
    if channels == 0 {
        return Vec::new();
    }
    let input_frames = samples.len() / channels;
    if input_frames == 0 || output_frames == 0 {
        return Vec::new();
    }
    let mut out = Vec::with_capacity(output_frames.saturating_mul(channels));
    for frame in 0..output_frames {
        // Widen to u128 so the product cannot overflow. Because `frame < output_frames`,
        // the quotient is always below `input_frames` and therefore fits in `usize`.
        let source_frame =
            (frame as u128 * input_frames as u128 / output_frames as u128) as usize;
        let start = source_frame * channels;
        out.extend(samples[start..start + channels].iter().map(|&sample| copy(sample)));
    }
    out
}
216
217fn load_jitter_buffer(
218 source: &Stream,
219 clock: &Symbol,
220 max_late_packets: u32,
221) -> Result<Vec<StreamItem>> {
222 let mut highest_key = None;
223 let mut indexed = Vec::new();
224 let mut ordinal = 0usize;
225 while let Some(item) = source.next_packet()? {
226 let key = tick_key(&item, clock);
227 let late = highest_key
228 .as_ref()
229 .zip(key.as_ref())
230 .is_some_and(|(highest, key)| key < highest);
231 if late && max_late_packets == 0 {
232 continue;
233 }
234 if key
235 .as_ref()
236 .zip(highest_key.as_ref())
237 .is_some_and(|(key, highest)| key > highest)
238 || highest_key.is_none()
239 {
240 highest_key = key.clone();
241 }
242 indexed.push((ordinal, item));
243 ordinal = ordinal.saturating_add(1);
244 }
245 indexed.sort_by(|(left_index, left), (right_index, right)| {
246 match (tick_key(left, clock), tick_key(right, clock)) {
247 (Some(left), Some(right)) => left.cmp(&right).then(left_index.cmp(right_index)),
248 _ => left_index.cmp(right_index),
249 }
250 });
251 Ok(indexed.into_iter().map(|(_, item)| item).collect())
252}
253
254fn tick_key(item: &StreamItem, clock: &Symbol) -> Option<Ref> {
255 item.ticks()
256 .iter()
257 .find(|tick| &tick.clock == clock)
258 .map(|tick| tick.index.clone())
259}