Skip to main content

sim_lib_stream_combinators/
bridge.rs

1use std::{collections::VecDeque, sync::Mutex};
2
3use sim_kernel::{Error, Ref, Result, Symbol};
4use sim_lib_stream_core::{
5    ClockDomain, DomainBridgeDescriptor, PcmPacket, StreamItem, StreamMetadata, StreamPacket,
6};
7
8use crate::{Stream, StreamNode};
9
10/// Resamples a PCM stream from `input_hz` to `output_hz`.
11///
12/// Each PCM packet is rate-converted by nearest-source-frame interleaving;
13/// non-PCM packets pass through unchanged. Errors if either rate is zero.
14pub fn resample_pcm(source: Stream, input_hz: u32, output_hz: u32) -> Result<Stream> {
15    let descriptor = DomainBridgeDescriptor::resampler(input_hz, output_hz)?;
16    let metadata = source.metadata().clone();
17    Ok(Stream::new(ResamplePcmNode {
18        source,
19        metadata,
20        input_hz,
21        output_hz,
22        _descriptor: descriptor,
23    }))
24}
25
26/// Reorders packets by `clock` tick, tolerating up to `max_late_packets`.
27///
28/// The buffer drains the source, sorts packets by their tick index on `clock`
29/// (stable on ties), and replays them in order. With `max_late_packets` of `0`
30/// any out-of-order packet is dropped rather than reordered.
31pub fn jitter_buffer(source: Stream, clock: Symbol, max_late_packets: u32) -> Stream {
32    let descriptor = DomainBridgeDescriptor::jitter_buffer(max_late_packets);
33    let metadata = source.metadata().clone();
34    Stream::new(JitterBufferNode {
35        source,
36        metadata,
37        clock,
38        max_late_packets,
39        queue: Mutex::new(None),
40        _descriptor: descriptor,
41    })
42}
43
44/// Records a `frames`-frame latency-compensation delay over the stream.
45///
46/// The packets pass through untouched; the descriptor carries the declared
47/// delay so downstream clock alignment can account for it.
48pub fn latency_comp_delay(source: Stream, frames: u64) -> Stream {
49    let descriptor = DomainBridgeDescriptor::latency_comp_delay(frames);
50    let metadata = source.metadata().clone();
51    Stream::new(PassthroughBridgeNode {
52        source,
53        metadata,
54        _descriptor: descriptor,
55    })
56}
57
58/// Bridges an event stream into the control clock domain as a rate gate.
59///
60/// The source clock domain is read from its metadata (defaulting to
61/// [`ClockDomain::Control`](sim_lib_stream_core::ClockDomain) when unknown) and
62/// validated into a gate descriptor; packets pass through unchanged.
63pub fn event_rate_gate(source: Stream) -> Result<Stream> {
64    let input_domain =
65        ClockDomain::from_symbol(source.metadata().clock()).unwrap_or(ClockDomain::Control);
66    let descriptor = DomainBridgeDescriptor::event_rate_gate(input_domain)?;
67    let metadata = source.metadata().clone();
68    Ok(Stream::new(PassthroughBridgeNode {
69        source,
70        metadata,
71        _descriptor: descriptor,
72    }))
73}
74
75struct ResamplePcmNode {
76    source: Stream,
77    metadata: StreamMetadata,
78    input_hz: u32,
79    output_hz: u32,
80    _descriptor: DomainBridgeDescriptor,
81}
82
83impl StreamNode for ResamplePcmNode {
84    fn metadata(&self) -> &StreamMetadata {
85        &self.metadata
86    }
87
88    fn next_packet(&self) -> Result<Option<StreamItem>> {
89        let Some(item) = self.source.next_packet()? else {
90            return Ok(None);
91        };
92        let StreamPacket::Pcm(packet) = item.packet() else {
93            return Ok(Some(item));
94        };
95        let packet = resample_packet(packet, self.input_hz, self.output_hz)?;
96        StreamItem::with_ticks(StreamPacket::Pcm(packet), item.ticks().to_vec()).map(Some)
97    }
98
99    fn is_done(&self) -> Result<bool> {
100        self.source.is_done()
101    }
102}
103
104struct JitterBufferNode {
105    source: Stream,
106    metadata: StreamMetadata,
107    clock: Symbol,
108    max_late_packets: u32,
109    queue: Mutex<Option<VecDeque<StreamItem>>>,
110    _descriptor: DomainBridgeDescriptor,
111}
112
113impl StreamNode for JitterBufferNode {
114    fn metadata(&self) -> &StreamMetadata {
115        &self.metadata
116    }
117
118    fn next_packet(&self) -> Result<Option<StreamItem>> {
119        let mut queue = self
120            .queue
121            .lock()
122            .map_err(|_| Error::PoisonedLock("jitter-buffer queue"))?;
123        if queue.is_none() {
124            *queue = Some(VecDeque::from(load_jitter_buffer(
125                &self.source,
126                &self.clock,
127                self.max_late_packets,
128            )?));
129        }
130        Ok(queue.as_mut().and_then(VecDeque::pop_front))
131    }
132
133    fn is_done(&self) -> Result<bool> {
134        let queue = self
135            .queue
136            .lock()
137            .map_err(|_| Error::PoisonedLock("jitter-buffer queue"))?;
138        Ok(queue.as_ref().is_some_and(VecDeque::is_empty) || self.source.is_done()?)
139    }
140}
141
142struct PassthroughBridgeNode {
143    source: Stream,
144    metadata: StreamMetadata,
145    _descriptor: DomainBridgeDescriptor,
146}
147
148impl StreamNode for PassthroughBridgeNode {
149    fn metadata(&self) -> &StreamMetadata {
150        &self.metadata
151    }
152
153    fn next_packet(&self) -> Result<Option<StreamItem>> {
154        self.source.next_packet()
155    }
156
157    fn is_done(&self) -> Result<bool> {
158        self.source.is_done()
159    }
160}
161
162fn resample_packet(packet: &PcmPacket, input_hz: u32, output_hz: u32) -> Result<PcmPacket> {
163    if input_hz == 0 || output_hz == 0 {
164        return Err(Error::Eval("PCM resample rates must be nonzero".to_owned()));
165    }
166    let output_frames = resampled_frame_count(packet.frames(), input_hz, output_hz);
167    match packet.sample_format() {
168        sim_lib_stream_core::PcmSampleFormat::I16 => PcmPacket::i16(
169            packet.channels(),
170            output_frames,
171            resample_interleaved(
172                packet.samples_i16(),
173                packet.channels(),
174                output_frames,
175                |v| v,
176            ),
177        ),
178        sim_lib_stream_core::PcmSampleFormat::F32 => PcmPacket::f32(
179            packet.channels(),
180            output_frames,
181            resample_interleaved(
182                packet.samples_f32(),
183                packet.channels(),
184                output_frames,
185                |v| v,
186            ),
187        ),
188    }
189}
190
/// Computes how many frames `input_frames` become when going from `input_hz` to `output_hz`.
///
/// The result is `input_frames * output_hz / input_hz`, rounded by adding
/// `input_hz / 2` before the integer division, and never less than one frame.
/// The arithmetic is done in `u128` so the product cannot overflow or
/// saturate before the division; a result too large for `usize` clamps to
/// `usize::MAX`.
///
/// # Panics
///
/// Panics (division by zero) if `input_hz` is zero; callers must reject zero
/// rates beforehand.
fn resampled_frame_count(input_frames: usize, input_hz: u32, output_hz: u32) -> usize {
    // `usize -> u128` is always lossless; there is no `From` impl for it, hence `as`.
    let scaled = input_frames as u128 * u128::from(output_hz) + u128::from(input_hz / 2);
    let frames = scaled / u128::from(input_hz);
    usize::try_from(frames).unwrap_or(usize::MAX).max(1)
}
198
/// Produces `output_frames` interleaved frames by picking whole frames out of `samples`.
///
/// `samples` is read as consecutive frames of `channels` samples each; a
/// trailing partial frame is ignored. Output frame `i` copies source frame
/// `i * input_frames / output_frames` (integer division, clamped to the last
/// source frame), so frames are repeated when upsampling and skipped when
/// downsampling. Every copied sample is passed through `copy`.
///
/// Returns an empty vector when `channels` is zero or `samples` holds no
/// complete frame, since there is nothing to pick from.
fn resample_interleaved<T: Copy>(
    samples: &[T],
    channels: usize,
    output_frames: usize,
    copy: impl Fn(T) -> T,
) -> Vec<T> {
    // Guard the division below and the frame indexing in the loop.
    if channels == 0 {
        return Vec::new();
    }
    let input_frames = samples.len() / channels;
    if input_frames == 0 {
        return Vec::new();
    }
    let last_frame = input_frames - 1;
    let mut out = Vec::with_capacity(output_frames * channels);
    for frame in 0..output_frames {
        let source_frame = (frame.saturating_mul(input_frames) / output_frames).min(last_frame);
        let start = source_frame * channels;
        out.extend(
            samples[start..start + channels]
                .iter()
                .map(|&sample| copy(sample)),
        );
    }
    out
}
216
217fn load_jitter_buffer(
218    source: &Stream,
219    clock: &Symbol,
220    max_late_packets: u32,
221) -> Result<Vec<StreamItem>> {
222    let mut highest_key = None;
223    let mut indexed = Vec::new();
224    let mut ordinal = 0usize;
225    while let Some(item) = source.next_packet()? {
226        let key = tick_key(&item, clock);
227        let late = highest_key
228            .as_ref()
229            .zip(key.as_ref())
230            .is_some_and(|(highest, key)| key < highest);
231        if late && max_late_packets == 0 {
232            continue;
233        }
234        if key
235            .as_ref()
236            .zip(highest_key.as_ref())
237            .is_some_and(|(key, highest)| key > highest)
238            || highest_key.is_none()
239        {
240            highest_key = key.clone();
241        }
242        indexed.push((ordinal, item));
243        ordinal = ordinal.saturating_add(1);
244    }
245    indexed.sort_by(|(left_index, left), (right_index, right)| {
246        match (tick_key(left, clock), tick_key(right, clock)) {
247            (Some(left), Some(right)) => left.cmp(&right).then(left_index.cmp(right_index)),
248            _ => left_index.cmp(right_index),
249        }
250    });
251    Ok(indexed.into_iter().map(|(_, item)| item).collect())
252}
253
254fn tick_key(item: &StreamItem, clock: &Symbol) -> Option<Ref> {
255    item.ticks()
256        .iter()
257        .find(|tick| &tick.clock == clock)
258        .map(|tick| tick.index.clone())
259}