Skip to main content

snapcast_client/
stream.rs

1//! Time-synchronized PCM audio stream buffer.
2
3use std::collections::VecDeque;
4
5use snapcast_proto::SampleFormat;
6use snapcast_proto::types::Timeval;
7
8use crate::double_buffer::DoubleBuffer;
9
10/// In-memory representation of samples stored in a [`PcmChunk`].
11#[derive(Debug, Clone, Copy, PartialEq, Eq)]
12pub enum SampleEncoding {
13    /// Signed little-endian integer PCM.
14    PcmInt,
15    /// Little-endian IEEE-754 `f32` samples.
16    Float32,
17}
18
19/// A decoded PCM chunk with a server-time timestamp and a read cursor.
20#[derive(Debug, Clone)]
21pub struct PcmChunk {
22    /// Server-time timestamp of this chunk.
23    pub timestamp: Timeval,
24    /// Raw PCM sample data.
25    pub data: Vec<u8>,
26    /// Sample format (rate, bits, channels).
27    pub format: SampleFormat,
28    /// Encoding of the sample bytes.
29    pub encoding: SampleEncoding,
30    read_pos: usize,
31}
32
33impl PcmChunk {
34    /// Create a new PCM chunk.
35    pub fn new(timestamp: Timeval, data: Vec<u8>, format: SampleFormat) -> Self {
36        Self::new_with_encoding(timestamp, data, format, SampleEncoding::PcmInt)
37    }
38
39    /// Create a new chunk with an explicit sample encoding.
40    pub fn new_with_encoding(
41        timestamp: Timeval,
42        data: Vec<u8>,
43        format: SampleFormat,
44        encoding: SampleEncoding,
45    ) -> Self {
46        Self {
47            timestamp,
48            data,
49            format,
50            encoding,
51            read_pos: 0,
52        }
53    }
54
55    /// Start time of this chunk in microseconds.
56    pub fn start_usec(&self) -> i64 {
57        self.timestamp.to_usec()
58    }
59
60    /// Duration of this chunk in microseconds.
61    pub fn duration_usec(&self) -> i64 {
62        if self.format.frame_size() == 0 || self.format.rate() == 0 {
63            return 0;
64        }
65        let frames = self.data.len() as i64 / self.format.frame_size() as i64;
66        frames * 1_000_000 / self.format.rate() as i64
67    }
68
69    /// Read up to `frames` frames into `output`, returning the number read.
70    pub fn read_frames(&mut self, output: &mut [u8], frames: u32) -> u32 {
71        let frame_size = self.format.frame_size() as usize;
72        if frame_size == 0 {
73            return 0;
74        }
75        let available_bytes = self.data.len() - self.read_pos;
76        let available_frames = available_bytes / frame_size;
77        let to_read = (frames as usize).min(available_frames);
78        let bytes = to_read * frame_size;
79        output[..bytes].copy_from_slice(&self.data[self.read_pos..self.read_pos + bytes]);
80        self.read_pos += bytes;
81        to_read as u32
82    }
83
84    /// Returns true if all data has been read.
85    pub fn is_end(&self) -> bool {
86        self.read_pos >= self.data.len()
87    }
88
89    /// Skip forward by `frames` frames.
90    pub fn seek(&mut self, frames: u32) {
91        let bytes = frames as usize * self.format.frame_size() as usize;
92        self.read_pos = (self.read_pos + bytes).min(self.data.len());
93    }
94}
95
96/// Correction threshold — soft sync starts when |short_median| > 100µs
97const CORRECTION_BEGIN_USEC: i64 = 100;
98/// Hard sync: |median| exceeds this (µs).
99const HARD_SYNC_MEDIAN_USEC: i64 = 2000;
100/// Hard sync: |short_median| exceeds this (µs).
101const HARD_SYNC_SHORT_MEDIAN_USEC: i64 = 5000;
102/// Hard sync: |mini_median| exceeds this (µs).
103const HARD_SYNC_MINI_MEDIAN_USEC: i64 = 50000;
104/// Hard sync: |age| exceeds this (µs).
105const HARD_SYNC_AGE_USEC: i64 = 500_000;
106/// Minimum |age| for hard sync re-trigger (µs).
107const HARD_SYNC_MIN_AGE_USEC: i64 = 500;
108/// Minimum |mini_median| for soft sync (µs).
109const SOFT_SYNC_MIN_USEC: i64 = 50;
110/// Maximum playback rate correction factor.
111const MAX_RATE_CORRECTION: f64 = 0.0005;
112/// Rate correction scaling factor.
113const RATE_CORRECTION_SCALE: f64 = 0.00005;
114/// DoubleBuffer capacity for mini (fast) drift detection.
115const MINI_BUFFER_SIZE: usize = 20;
116/// DoubleBuffer capacity for short-term drift detection.
117const SHORT_BUFFER_SIZE: usize = 100;
118/// DoubleBuffer capacity for long-term drift detection.
119const BUFFER_SIZE: usize = 500;
120/// Default buffer in milliseconds.
121const DEFAULT_BUFFER_MS: i64 = 1000;
122
123/// Time-synchronized PCM stream buffer.
124///
125/// The `Stream` is responsible for buffering decoded PCM chunks and delivering them to the
126/// audio player in a way that remains synchronized with the server's time. It implements
127/// the same synchronization strategy as the original C++ Snapcast client.
128///
129/// ### Synchronization Strategy
130///
131/// There are two main modes of synchronization:
132///
133/// 1. **Hard Sync**: Used when the client is far out of sync (> 50ms) or just starting.
134///    In this mode, the stream skips forward in the buffer or inserts silence to reach
135///    the desired target time exactly.
136/// 2. **Soft Sync**: Used for fine-tuning when the drift is small (typically < 10ms).
137///    Instead of jumping, the stream subtly adjusts the playback rate (e.g., by 0.05%)
138///    by adding or removing single samples at regular intervals. This is inaudible to
139///    most listeners.
140///
141/// ### Drift Detection
142///
143/// Synchronization is based on "age", which is the difference between when a sample
144/// *should* have been played (server time) and when it *is* being played (now).
145///
146/// The stream maintains three `DoubleBuffer` instances to track drift over different timescales:
147/// - **Mini Buffer** (20 samples): Fast reaction to sudden network or system jitter.
148/// - **Short Buffer** (100 samples): Used for calculating soft sync rate corrections.
149/// - **Long Buffer** (500 samples): Long-term stability tracking and hard sync re-triggering.
150///
151/// The median value of these buffers is used to filter out outliers and ensure stable
152/// synchronization even in unstable network conditions.
153pub struct Stream {
154    /// Nominal format of the incoming PCM data.
155    format: SampleFormat,
156    /// Encoding of samples stored in chunks.
157    encoding: SampleEncoding,
158    /// Queue of pending PCM chunks.
159    chunks: VecDeque<PcmChunk>,
160    /// The chunk currently being read from.
161    current: Option<PcmChunk>,
162    /// Target buffer size in milliseconds.
163    buffer_ms: i64,
164    /// Whether we are currently in hard sync mode.
165    hard_sync: bool,
166
167    // Drift detection buffers
168    mini_buffer: DoubleBuffer,
169    short_buffer: DoubleBuffer,
170    buffer: DoubleBuffer,
171    /// Long-term median drift in microseconds.
172    median: i64,
173    /// Short-term median drift in microseconds.
174    short_median: i64,
175
176    // Soft sync (rate correction) state
177    /// Number of frames played at the current (corrected) rate.
178    played_frames: u32,
179    /// How many frames to play before adding/removing a single sample (0 if no correction).
180    correct_after_x_frames: i32,
181    /// Cumulative difference in frames caused by rate correction (for logging).
182    frame_delta: i32,
183    /// Internal buffer used for sample insertion/removal.
184    read_buf: Vec<u8>,
185
186    /// Last time (in server seconds) that stats were logged.
187    last_log_sec: i64,
188}
189
190impl Stream {
191    /// Create a new stream for the given sample format.
192    pub fn new(format: SampleFormat) -> Self {
193        Self::with_encoding(format, SampleEncoding::PcmInt)
194    }
195
196    /// Create a new stream for the given sample format and sample encoding.
197    pub fn with_encoding(format: SampleFormat, encoding: SampleEncoding) -> Self {
198        Self {
199            format,
200            encoding,
201            chunks: VecDeque::new(),
202            current: None,
203            buffer_ms: DEFAULT_BUFFER_MS,
204            hard_sync: true,
205            mini_buffer: DoubleBuffer::new(MINI_BUFFER_SIZE),
206            short_buffer: DoubleBuffer::new(SHORT_BUFFER_SIZE),
207            buffer: DoubleBuffer::new(BUFFER_SIZE),
208            median: 0,
209            short_median: 0,
210            played_frames: 0,
211            correct_after_x_frames: 0,
212            frame_delta: 0,
213            read_buf: Vec::new(),
214            last_log_sec: 0,
215        }
216    }
217
218    /// Returns the sample format.
219    pub fn format(&self) -> SampleFormat {
220        self.format
221    }
222
223    /// Returns the sample byte encoding.
224    pub fn encoding(&self) -> SampleEncoding {
225        self.encoding
226    }
227
228    /// Set the target buffer size in milliseconds.
229    pub fn set_buffer_ms(&mut self, ms: i64) {
230        self.buffer_ms = ms;
231    }
232
233    /// Enqueue a decoded PCM chunk.
234    pub fn add_chunk(&mut self, chunk: PcmChunk) {
235        self.chunks.push_back(chunk);
236    }
237
238    /// Number of queued chunks.
239    pub fn chunk_count(&self) -> usize {
240        self.chunks.len()
241    }
242
243    /// Clear all queued chunks and reset sync state.
244    pub fn clear(&mut self) {
245        self.chunks.clear();
246        self.current = None;
247        self.hard_sync = true;
248    }
249
250    fn reset_buffers(&mut self) {
251        self.buffer.clear();
252        self.mini_buffer.clear();
253        self.short_buffer.clear();
254    }
255
256    fn update_buffers(&mut self, age: i64) {
257        self.buffer.add(age);
258        self.mini_buffer.add(age);
259        self.short_buffer.add(age);
260    }
261
262    fn set_real_sample_rate(&mut self, sample_rate: f64) {
263        let nominal = self.format.rate() as f64;
264        if (sample_rate - nominal).abs() < f64::EPSILON {
265            self.correct_after_x_frames = 0;
266        } else {
267            let ratio = nominal / sample_rate;
268            self.correct_after_x_frames = (ratio / (ratio - 1.0)).round() as i32;
269        }
270    }
271
272    /// Fill `output` with time-synchronized PCM data. Returns false if no data available.
273    pub fn get_player_chunk(
274        &mut self,
275        server_now_usec: i64,
276        output_buffer_dac_time_usec: i64,
277        output: &mut [u8],
278        frames: u32,
279    ) -> bool {
280        let needs_new = self.current.as_ref().is_none_or(|c| c.is_end());
281        if needs_new {
282            self.current = self.chunks.pop_front();
283        }
284        if self.current.is_none() {
285            return false;
286        }
287
288        // --- Hard sync: initial alignment ---
289        if self.hard_sync {
290            let chunk = self.current.as_ref().unwrap();
291            let req_duration_usec = (frames as i64 * 1_000_000) / self.format.rate() as i64;
292            let age_usec = server_now_usec - chunk.start_usec() - self.buffer_ms * 1000
293                + output_buffer_dac_time_usec;
294
295            if age_usec < -req_duration_usec {
296                self.get_silence(output, frames);
297                return true;
298            }
299
300            if age_usec > 0 {
301                self.current = None;
302                while let Some(mut c) = self.chunks.pop_front() {
303                    let a = server_now_usec - c.start_usec() - self.buffer_ms * 1000
304                        + output_buffer_dac_time_usec;
305                    if a > 0 && a < c.duration_usec() {
306                        let skip = (self.format.rate() as f64 * a as f64 / 1_000_000.0) as u32;
307                        c.seek(skip);
308                        self.current = Some(c);
309                        break;
310                    } else if a <= 0 {
311                        self.current = Some(c);
312                        break;
313                    }
314                }
315                if self.current.is_none() {
316                    return false;
317                }
318            }
319
320            let chunk = self.current.as_ref().unwrap();
321            let age_usec = server_now_usec - chunk.start_usec() - self.buffer_ms * 1000
322                + output_buffer_dac_time_usec;
323
324            if age_usec <= 0 {
325                let silent_frames =
326                    (self.format.rate() as f64 * (-age_usec) as f64 / 1_000_000.0) as u32;
327                let silent_frames = silent_frames.min(frames);
328                let frame_size = self.format.frame_size() as usize;
329
330                if silent_frames > 0 {
331                    output[..silent_frames as usize * frame_size].fill(0);
332                }
333                let remaining = frames - silent_frames;
334                if remaining > 0 {
335                    let offset = silent_frames as usize * frame_size;
336                    self.read_next(&mut output[offset..], remaining);
337                }
338                if silent_frames < frames {
339                    self.hard_sync = false;
340                    self.reset_buffers();
341                }
342                return true;
343            }
344            return false;
345        }
346
347        // --- Normal playback with drift correction ---
348
349        // Compute frames correction from current rate adjustment
350        let mut frames_correction: i32 = 0;
351        if self.correct_after_x_frames != 0 {
352            self.played_frames += frames;
353            if self.played_frames >= self.correct_after_x_frames.unsigned_abs() {
354                frames_correction = self.played_frames as i32 / self.correct_after_x_frames;
355                self.played_frames %= self.correct_after_x_frames.unsigned_abs();
356            }
357        }
358
359        // Read with correction (or plain read if correction == 0)
360        let chunk_start = match self.read_with_correction(output, frames, frames_correction) {
361            Some(ts) => ts,
362            None => return false,
363        };
364
365        let age_usec =
366            server_now_usec - chunk_start - self.buffer_ms * 1000 + output_buffer_dac_time_usec;
367
368        // Reset sample rate to nominal, soft sync may override below
369        self.set_real_sample_rate(self.format.rate() as f64);
370
371        // Hard sync re-trigger thresholds (matching C++)
372        if self.buffer.full()
373            && self.median.abs() > HARD_SYNC_MEDIAN_USEC
374            && age_usec.abs() > HARD_SYNC_MIN_AGE_USEC
375        {
376            tracing::info!(
377                median = self.median,
378                "Hard sync: buffer full, |median| > 2ms"
379            );
380            self.hard_sync = true;
381        } else if self.short_buffer.full()
382            && self.short_median.abs() > HARD_SYNC_SHORT_MEDIAN_USEC
383            && age_usec.abs() > HARD_SYNC_MIN_AGE_USEC
384        {
385            tracing::info!(
386                short_median = self.short_median,
387                "Hard sync: short buffer full, |short_median| > 5ms"
388            );
389            self.hard_sync = true;
390        } else if self.mini_buffer.full()
391            && self.mini_buffer.median_simple().abs() > HARD_SYNC_MINI_MEDIAN_USEC
392            && age_usec.abs() > HARD_SYNC_MIN_AGE_USEC
393        {
394            tracing::info!(
395                age_usec,
396                mini_median = self.mini_buffer.median_simple(),
397                "Hard sync: mini buffer full, |mini_median| > 50ms"
398            );
399            self.hard_sync = true;
400        } else if age_usec.abs() > HARD_SYNC_AGE_USEC {
401            tracing::info!(age_usec, "Hard sync: |age| > 500ms");
402            self.hard_sync = true;
403        } else if self.short_buffer.full() {
404            // Soft sync: adjust playback speed based on drift
405            let mini_median = self.mini_buffer.median_simple();
406            if self.short_median > CORRECTION_BEGIN_USEC
407                && mini_median > SOFT_SYNC_MIN_USEC
408                && age_usec > SOFT_SYNC_MIN_USEC
409            {
410                let rate = (self.short_median as f64 / 100.0) * RATE_CORRECTION_SCALE;
411                let rate = 1.0 - rate.min(MAX_RATE_CORRECTION);
412                self.set_real_sample_rate(self.format.rate() as f64 * rate);
413            } else if self.short_median < -CORRECTION_BEGIN_USEC
414                && mini_median < -SOFT_SYNC_MIN_USEC
415                && age_usec < -SOFT_SYNC_MIN_USEC
416            {
417                let rate = (-self.short_median as f64 / 100.0) * RATE_CORRECTION_SCALE;
418                let rate = 1.0 + rate.min(MAX_RATE_CORRECTION);
419                self.set_real_sample_rate(self.format.rate() as f64 * rate);
420            }
421        }
422
423        self.update_buffers(age_usec);
424
425        // Stats logging (once per second)
426        let now_sec = server_now_usec / 1_000_000;
427        if now_sec != self.last_log_sec {
428            self.last_log_sec = now_sec;
429            self.median = self.buffer.median_simple();
430            self.short_median = self.short_buffer.median_simple();
431            tracing::debug!(
432                target: "Stats",
433                "Chunk: {}\t{}\t{}\t{}\t{}\t{}\t{}",
434                age_usec,
435                self.mini_buffer.median_simple(),
436                self.short_median,
437                self.median,
438                self.buffer.len(),
439                output_buffer_dac_time_usec / 1000,
440                self.frame_delta,
441            );
442            self.frame_delta = 0;
443        }
444
445        age_usec.abs() < 500_000
446    }
447
448    /// Fill `output` with silence.
449    pub fn get_silence(&self, output: &mut [u8], frames: u32) {
450        let bytes = frames as usize * self.format.frame_size() as usize;
451        let len = bytes.min(output.len());
452        output[..len].fill(0);
453    }
454
455    /// Like [`get_player_chunk`](Self::get_player_chunk), but fills silence on failure.
456    pub fn get_player_chunk_or_silence(
457        &mut self,
458        server_now_usec: i64,
459        output_buffer_dac_time_usec: i64,
460        output: &mut [u8],
461        frames: u32,
462    ) -> bool {
463        let result =
464            self.get_player_chunk(server_now_usec, output_buffer_dac_time_usec, output, frames);
465        if !result {
466            self.get_silence(output, frames);
467        }
468        result
469    }
470
471    fn read_next(&mut self, output: &mut [u8], frames: u32) -> Option<i64> {
472        let chunk = self.current.as_mut()?;
473        // Adjusted timestamp: chunk start + already-consumed frames
474        let frame_size = self.format.frame_size() as usize;
475        let consumed_frames = chunk.read_pos / frame_size;
476        let ts =
477            chunk.start_usec() + consumed_frames as i64 * 1_000_000 / self.format.rate() as i64;
478        let mut read = 0u32;
479        while read < frames {
480            let offset = read as usize * frame_size;
481            let n = chunk.read_frames(&mut output[offset..], frames - read);
482            read += n;
483            if read < frames && chunk.is_end() {
484                match self.chunks.pop_front() {
485                    Some(next) => *chunk = next,
486                    None => break,
487                }
488            }
489        }
490        Some(ts)
491    }
492
493    fn read_with_correction(
494        &mut self,
495        output: &mut [u8],
496        frames: u32,
497        correction: i32,
498    ) -> Option<i64> {
499        if correction == 0 {
500            return self.read_next(output, frames);
501        }
502
503        // Clamp correction to avoid underflow
504        let correction = correction.max(-(frames as i32) + 1);
505
506        self.frame_delta -= correction;
507        let to_read = (frames as i32 + correction) as u32;
508        let frame_size = self.format.frame_size() as usize;
509
510        self.read_buf.resize(to_read as usize * frame_size, 0);
511        let mut read_buf = std::mem::take(&mut self.read_buf);
512        let ts = self.read_next(&mut read_buf, to_read);
513
514        let max = if correction < 0 {
515            frames as usize
516        } else {
517            to_read as usize
518        };
519        let slices = (correction.unsigned_abs() as usize + 1).min(max);
520        let slice_size = max / slices;
521
522        let mut pos = 0usize;
523        for n in 0..slices {
524            let size = if n + 1 == slices {
525                max - pos
526            } else {
527                slice_size
528            };
529
530            if correction < 0 {
531                let src_start = (pos - n) * frame_size;
532                let dst_start = pos * frame_size;
533                let len = size * frame_size;
534                output[dst_start..dst_start + len]
535                    .copy_from_slice(&read_buf[src_start..src_start + len]);
536            } else {
537                let src_start = pos * frame_size;
538                let dst_start = (pos - n) * frame_size;
539                let len = size * frame_size;
540                output[dst_start..dst_start + len]
541                    .copy_from_slice(&read_buf[src_start..src_start + len]);
542            }
543            pos += size;
544        }
545
546        self.read_buf = read_buf;
547        ts
548    }
549}
550
551#[cfg(test)]
552mod tests {
553    use super::*;
554
555    fn fmt() -> SampleFormat {
556        SampleFormat::new(48000, 16, 2)
557    }
558
559    fn make_chunk(sec: i32, usec: i32, frames: u32, format: SampleFormat) -> PcmChunk {
560        let bytes = frames as usize * format.frame_size() as usize;
561        let data: Vec<u8> = (0..bytes).map(|i| (i % 256) as u8).collect();
562        PcmChunk::new(Timeval { sec, usec }, data, format)
563    }
564
565    #[test]
566    fn pcm_chunk_duration() {
567        let f = fmt();
568        let chunk = make_chunk(0, 0, 480, f);
569        assert_eq!(chunk.duration_usec(), 10_000);
570    }
571
572    #[test]
573    fn pcm_chunk_read_frames() {
574        let f = fmt();
575        let mut chunk = make_chunk(0, 0, 100, f);
576        let mut buf = vec![0u8; 50 * f.frame_size() as usize];
577        let read = chunk.read_frames(&mut buf, 50);
578        assert_eq!(read, 50);
579        assert!(!chunk.is_end());
580        let read = chunk.read_frames(&mut buf, 50);
581        assert_eq!(read, 50);
582        assert!(chunk.is_end());
583    }
584
585    #[test]
586    fn pcm_chunk_seek() {
587        let f = fmt();
588        let mut chunk = make_chunk(0, 0, 100, f);
589        chunk.seek(90);
590        let mut buf = vec![0u8; 100 * f.frame_size() as usize];
591        let read = chunk.read_frames(&mut buf, 100);
592        assert_eq!(read, 10);
593    }
594
595    #[test]
596    fn stream_add_and_count() {
597        let f = fmt();
598        let mut stream = Stream::new(f);
599        assert_eq!(stream.chunk_count(), 0);
600        stream.add_chunk(make_chunk(100, 0, 480, f));
601        stream.add_chunk(make_chunk(100, 10_000, 480, f));
602        assert_eq!(stream.chunk_count(), 2);
603    }
604
605    #[test]
606    fn stream_clear() {
607        let f = fmt();
608        let mut stream = Stream::new(f);
609        stream.add_chunk(make_chunk(100, 0, 480, f));
610        stream.clear();
611        assert_eq!(stream.chunk_count(), 0);
612    }
613
614    #[test]
615    fn stream_silence_when_empty() {
616        let f = fmt();
617        let mut stream = Stream::new(f);
618        let mut buf = vec![0xFFu8; 480 * f.frame_size() as usize];
619        let result = stream.get_player_chunk(100_000_000, 0, &mut buf, 480);
620        assert!(!result);
621    }
622
623    #[test]
624    fn stream_hard_sync_plays_silence_when_too_early() {
625        let f = fmt();
626        let mut stream = Stream::new(f);
627        stream.set_buffer_ms(1000);
628        stream.add_chunk(make_chunk(100, 0, 4800, f));
629        let server_now = 100_000_000i64;
630        let mut buf = vec![0xFFu8; 480 * f.frame_size() as usize];
631        let result = stream.get_player_chunk(server_now, 0, &mut buf, 480);
632        assert!(result);
633        assert!(buf.iter().all(|&b| b == 0));
634    }
635
636    #[test]
637    fn stream_hard_sync_plays_data_when_aligned() {
638        let f = fmt();
639        let mut stream = Stream::new(f);
640        stream.set_buffer_ms(1000);
641        stream.add_chunk(make_chunk(99, 0, 4800, f));
642        let server_now = 100_000_000i64;
643        let mut buf = vec![0u8; 480 * f.frame_size() as usize];
644        let result = stream.get_player_chunk(server_now, 0, &mut buf, 480);
645        assert!(result);
646        assert!(buf.iter().any(|&b| b != 0));
647    }
648
649    #[test]
650    fn set_real_sample_rate_correction() {
651        let f = fmt();
652        let mut stream = Stream::new(f);
653        stream.set_real_sample_rate(48000.0);
654        assert_eq!(stream.correct_after_x_frames, 0);
655
656        stream.set_real_sample_rate(47999.0);
657        assert_ne!(stream.correct_after_x_frames, 0);
658    }
659
660    #[test]
661    fn read_with_correction_remove_one_frame() {
662        let f = fmt(); // 48000:16:2, frame_size=4
663        let mut stream = Stream::new(f);
664
665        let mut data = Vec::new();
666        for i in 0..10u16 {
667            data.extend_from_slice(&i.to_le_bytes());
668            data.extend_from_slice(&(i + 100).to_le_bytes());
669        }
670        stream.add_chunk(make_chunk(100, 0, 10, f));
671        stream.chunks.back_mut().unwrap().data = data;
672        stream.current = stream.chunks.pop_front();
673
674        let mut output = vec![0u8; 9 * f.frame_size() as usize];
675        let ts = stream.read_with_correction(&mut output, 9, 1);
676        assert!(ts.is_some());
677        assert_eq!(output.len(), 36);
678        for (i, chunk) in output.chunks(4).enumerate() {
679            let left = u16::from_le_bytes([chunk[0], chunk[1]]);
680            assert!(left <= 10, "frame {i}: left={left}");
681        }
682    }
683
684    #[test]
685    fn read_with_correction_zero_is_passthrough() {
686        let f = fmt();
687        let mut stream = Stream::new(f);
688        stream.add_chunk(make_chunk(100, 0, 100, f));
689        stream.current = stream.chunks.pop_front();
690
691        let mut out1 = vec![0u8; 50 * f.frame_size() as usize];
692        stream.read_with_correction(&mut out1, 50, 0);
693
694        stream.add_chunk(make_chunk(100, 0, 100, f));
695        stream.current = stream.chunks.pop_front();
696
697        let mut out2 = vec![0u8; 50 * f.frame_size() as usize];
698        stream.read_next(&mut out2, 50);
699
700        assert_eq!(out1, out2);
701    }
702}