Skip to main content

snapcast_client/
stream.rs

1//! Time-synchronized PCM audio stream buffer.
2
3use std::collections::VecDeque;
4
5use snapcast_proto::SampleFormat;
6use snapcast_proto::types::Timeval;
7
8use crate::double_buffer::DoubleBuffer;
9
10/// A decoded PCM chunk with a server-time timestamp and a read cursor.
11#[derive(Debug, Clone)]
12pub struct PcmChunk {
13    /// Server-time timestamp of this chunk.
14    pub timestamp: Timeval,
15    /// Raw PCM sample data.
16    pub data: Vec<u8>,
17    /// Sample format (rate, bits, channels).
18    pub format: SampleFormat,
19    read_pos: usize,
20}
21
22impl PcmChunk {
23    /// Create a new PCM chunk.
24    pub fn new(timestamp: Timeval, data: Vec<u8>, format: SampleFormat) -> Self {
25        Self {
26            timestamp,
27            data,
28            format,
29            read_pos: 0,
30        }
31    }
32
33    /// Start time of this chunk in microseconds.
34    pub fn start_usec(&self) -> i64 {
35        self.timestamp.to_usec()
36    }
37
38    /// Duration of this chunk in microseconds.
39    pub fn duration_usec(&self) -> i64 {
40        if self.format.frame_size() == 0 || self.format.rate() == 0 {
41            return 0;
42        }
43        let frames = self.data.len() as i64 / self.format.frame_size() as i64;
44        frames * 1_000_000 / self.format.rate() as i64
45    }
46
47    /// Read up to `frames` frames into `output`, returning the number read.
48    pub fn read_frames(&mut self, output: &mut [u8], frames: u32) -> u32 {
49        let frame_size = self.format.frame_size() as usize;
50        let available_bytes = self.data.len() - self.read_pos;
51        let available_frames = available_bytes / frame_size;
52        let to_read = (frames as usize).min(available_frames);
53        let bytes = to_read * frame_size;
54        output[..bytes].copy_from_slice(&self.data[self.read_pos..self.read_pos + bytes]);
55        self.read_pos += bytes;
56        to_read as u32
57    }
58
59    /// Returns true if all data has been read.
60    pub fn is_end(&self) -> bool {
61        self.read_pos >= self.data.len()
62    }
63
64    /// Skip forward by `frames` frames.
65    pub fn seek(&mut self, frames: u32) {
66        let bytes = frames as usize * self.format.frame_size() as usize;
67        self.read_pos = (self.read_pos + bytes).min(self.data.len());
68    }
69}
70
71/// Correction threshold — soft sync starts when |short_median| > 100µs
72const CORRECTION_BEGIN_USEC: i64 = 100;
73/// Hard sync: |median| exceeds this (µs).
74const HARD_SYNC_MEDIAN_USEC: i64 = 2000;
75/// Hard sync: |short_median| exceeds this (µs).
76const HARD_SYNC_SHORT_MEDIAN_USEC: i64 = 5000;
77/// Hard sync: |mini_median| exceeds this (µs).
78const HARD_SYNC_MINI_MEDIAN_USEC: i64 = 50000;
79/// Hard sync: |age| exceeds this (µs).
80const HARD_SYNC_AGE_USEC: i64 = 500_000;
81/// Minimum |age| for hard sync re-trigger (µs).
82const HARD_SYNC_MIN_AGE_USEC: i64 = 500;
83/// Minimum |mini_median| for soft sync (µs).
84const SOFT_SYNC_MIN_USEC: i64 = 50;
85/// Maximum playback rate correction factor.
86const MAX_RATE_CORRECTION: f64 = 0.0005;
87/// Rate correction scaling factor.
88const RATE_CORRECTION_SCALE: f64 = 0.00005;
89/// DoubleBuffer capacity for mini (fast) drift detection.
90const MINI_BUFFER_SIZE: usize = 20;
91/// DoubleBuffer capacity for short-term drift detection.
92const SHORT_BUFFER_SIZE: usize = 100;
93/// DoubleBuffer capacity for long-term drift detection.
94const BUFFER_SIZE: usize = 500;
95/// Default buffer in milliseconds.
96const DEFAULT_BUFFER_MS: i64 = 1000;
97
98/// Time-synchronized PCM stream buffer.
99pub struct Stream {
100    format: SampleFormat,
101    chunks: VecDeque<PcmChunk>,
102    current: Option<PcmChunk>,
103    buffer_ms: i64,
104    hard_sync: bool,
105
106    // Drift detection
107    mini_buffer: DoubleBuffer,
108    short_buffer: DoubleBuffer,
109    buffer: DoubleBuffer,
110    median: i64,
111    short_median: i64,
112
113    // Soft sync
114    played_frames: u32,
115    correct_after_x_frames: i32,
116    frame_delta: i32,
117    read_buf: Vec<u8>,
118
119    // Stats
120    last_log_sec: i64,
121}
122
123impl Stream {
124    /// Create a new stream for the given sample format.
125    pub fn new(format: SampleFormat) -> Self {
126        Self {
127            format,
128            chunks: VecDeque::new(),
129            current: None,
130            buffer_ms: DEFAULT_BUFFER_MS,
131            hard_sync: true,
132            mini_buffer: DoubleBuffer::new(MINI_BUFFER_SIZE),
133            short_buffer: DoubleBuffer::new(SHORT_BUFFER_SIZE),
134            buffer: DoubleBuffer::new(BUFFER_SIZE),
135            median: 0,
136            short_median: 0,
137            played_frames: 0,
138            correct_after_x_frames: 0,
139            frame_delta: 0,
140            read_buf: Vec::new(),
141            last_log_sec: 0,
142        }
143    }
144
145    /// Returns the sample format.
146    pub fn format(&self) -> SampleFormat {
147        self.format
148    }
149
150    /// Set the target buffer size in milliseconds.
151    pub fn set_buffer_ms(&mut self, ms: i64) {
152        self.buffer_ms = ms;
153    }
154
155    /// Enqueue a decoded PCM chunk.
156    pub fn add_chunk(&mut self, chunk: PcmChunk) {
157        self.chunks.push_back(chunk);
158    }
159
160    /// Number of queued chunks.
161    pub fn chunk_count(&self) -> usize {
162        self.chunks.len()
163    }
164
165    /// Clear all queued chunks and reset sync state.
166    pub fn clear(&mut self) {
167        self.chunks.clear();
168        self.current = None;
169        self.hard_sync = true;
170    }
171
172    fn reset_buffers(&mut self) {
173        self.buffer.clear();
174        self.mini_buffer.clear();
175        self.short_buffer.clear();
176    }
177
178    fn update_buffers(&mut self, age: i64) {
179        self.buffer.add(age);
180        self.mini_buffer.add(age);
181        self.short_buffer.add(age);
182    }
183
184    fn set_real_sample_rate(&mut self, sample_rate: f64) {
185        let nominal = self.format.rate() as f64;
186        if (sample_rate - nominal).abs() < f64::EPSILON {
187            self.correct_after_x_frames = 0;
188        } else {
189            let ratio = nominal / sample_rate;
190            self.correct_after_x_frames = (ratio / (ratio - 1.0)).round() as i32;
191        }
192    }
193
194    /// Fill `output` with time-synchronized PCM data. Returns false if no data available.
195    pub fn get_player_chunk(
196        &mut self,
197        server_now_usec: i64,
198        output_buffer_dac_time_usec: i64,
199        output: &mut [u8],
200        frames: u32,
201    ) -> bool {
202        let needs_new = self.current.as_ref().is_none_or(|c| c.is_end());
203        if needs_new {
204            self.current = self.chunks.pop_front();
205        }
206        if self.current.is_none() {
207            return false;
208        }
209
210        // --- Hard sync: initial alignment ---
211        if self.hard_sync {
212            let chunk = self.current.as_ref().unwrap();
213            let req_duration_usec = (frames as i64 * 1_000_000) / self.format.rate() as i64;
214            let age_usec = server_now_usec - chunk.start_usec() - self.buffer_ms * 1000
215                + output_buffer_dac_time_usec;
216
217            if age_usec < -req_duration_usec {
218                self.get_silence(output, frames);
219                return true;
220            }
221
222            if age_usec > 0 {
223                self.current = None;
224                while let Some(mut c) = self.chunks.pop_front() {
225                    let a = server_now_usec - c.start_usec() - self.buffer_ms * 1000
226                        + output_buffer_dac_time_usec;
227                    if a > 0 && a < c.duration_usec() {
228                        let skip = (self.format.rate() as f64 * a as f64 / 1_000_000.0) as u32;
229                        c.seek(skip);
230                        self.current = Some(c);
231                        break;
232                    } else if a <= 0 {
233                        self.current = Some(c);
234                        break;
235                    }
236                }
237                if self.current.is_none() {
238                    return false;
239                }
240            }
241
242            let chunk = self.current.as_ref().unwrap();
243            let age_usec = server_now_usec - chunk.start_usec() - self.buffer_ms * 1000
244                + output_buffer_dac_time_usec;
245
246            if age_usec <= 0 {
247                let silent_frames =
248                    (self.format.rate() as f64 * (-age_usec) as f64 / 1_000_000.0) as u32;
249                let silent_frames = silent_frames.min(frames);
250                let frame_size = self.format.frame_size() as usize;
251
252                if silent_frames > 0 {
253                    output[..silent_frames as usize * frame_size].fill(0);
254                }
255                let remaining = frames - silent_frames;
256                if remaining > 0 {
257                    let offset = silent_frames as usize * frame_size;
258                    self.read_next(&mut output[offset..], remaining);
259                }
260                if silent_frames < frames {
261                    self.hard_sync = false;
262                    self.reset_buffers();
263                }
264                return true;
265            }
266            return false;
267        }
268
269        // --- Normal playback with drift correction ---
270
271        // Compute frames correction from current rate adjustment
272        let mut frames_correction: i32 = 0;
273        if self.correct_after_x_frames != 0 {
274            self.played_frames += frames;
275            if self.played_frames >= self.correct_after_x_frames.unsigned_abs() {
276                frames_correction = self.played_frames as i32 / self.correct_after_x_frames;
277                self.played_frames %= self.correct_after_x_frames.unsigned_abs();
278            }
279        }
280
281        // Read with correction (or plain read if correction == 0)
282        let chunk_start = match self.read_with_correction(output, frames, frames_correction) {
283            Some(ts) => ts,
284            None => return false,
285        };
286
287        let age_usec =
288            server_now_usec - chunk_start - self.buffer_ms * 1000 + output_buffer_dac_time_usec;
289
290        // Reset sample rate to nominal, soft sync may override below
291        self.set_real_sample_rate(self.format.rate() as f64);
292
293        // Hard sync re-trigger thresholds (matching C++)
294        if self.buffer.full()
295            && self.median.abs() > HARD_SYNC_MEDIAN_USEC
296            && age_usec.abs() > HARD_SYNC_MIN_AGE_USEC
297        {
298            tracing::info!(
299                median = self.median,
300                "Hard sync: buffer full, |median| > 2ms"
301            );
302            self.hard_sync = true;
303        } else if self.short_buffer.full()
304            && self.short_median.abs() > HARD_SYNC_SHORT_MEDIAN_USEC
305            && age_usec.abs() > HARD_SYNC_MIN_AGE_USEC
306        {
307            tracing::info!(
308                short_median = self.short_median,
309                "Hard sync: short buffer full, |short_median| > 5ms"
310            );
311            self.hard_sync = true;
312        } else if self.mini_buffer.full()
313            && self.mini_buffer.median_simple().abs() > HARD_SYNC_MINI_MEDIAN_USEC
314            && age_usec.abs() > HARD_SYNC_MIN_AGE_USEC
315        {
316            tracing::info!(
317                age_usec,
318                mini_median = self.mini_buffer.median_simple(),
319                "Hard sync: mini buffer full, |mini_median| > 50ms"
320            );
321            self.hard_sync = true;
322        } else if age_usec.abs() > HARD_SYNC_AGE_USEC {
323            tracing::info!(age_usec, "Hard sync: |age| > 500ms");
324            self.hard_sync = true;
325        } else if self.short_buffer.full() {
326            // Soft sync: adjust playback speed based on drift
327            let mini_median = self.mini_buffer.median_simple();
328            if self.short_median > CORRECTION_BEGIN_USEC
329                && mini_median > SOFT_SYNC_MIN_USEC
330                && age_usec > SOFT_SYNC_MIN_USEC
331            {
332                let rate = (self.short_median as f64 / 100.0) * RATE_CORRECTION_SCALE;
333                let rate = 1.0 - rate.min(MAX_RATE_CORRECTION);
334                self.set_real_sample_rate(self.format.rate() as f64 * rate);
335            } else if self.short_median < -CORRECTION_BEGIN_USEC
336                && mini_median < -SOFT_SYNC_MIN_USEC
337                && age_usec < -SOFT_SYNC_MIN_USEC
338            {
339                let rate = (-self.short_median as f64 / 100.0) * RATE_CORRECTION_SCALE;
340                let rate = 1.0 + rate.min(MAX_RATE_CORRECTION);
341                self.set_real_sample_rate(self.format.rate() as f64 * rate);
342            }
343        }
344
345        self.update_buffers(age_usec);
346
347        // Stats logging (once per second)
348        let now_sec = server_now_usec / 1_000_000;
349        if now_sec != self.last_log_sec {
350            self.last_log_sec = now_sec;
351            self.median = self.buffer.median_simple();
352            self.short_median = self.short_buffer.median_simple();
353            tracing::debug!(
354                target: "Stats",
355                "Chunk: {}\t{}\t{}\t{}\t{}\t{}\t{}",
356                age_usec,
357                self.mini_buffer.median_simple(),
358                self.short_median,
359                self.median,
360                self.buffer.len(),
361                output_buffer_dac_time_usec / 1000,
362                self.frame_delta,
363            );
364            self.frame_delta = 0;
365        }
366
367        age_usec.abs() < 500_000
368    }
369
370    /// Fill `output` with silence.
371    pub fn get_silence(&self, output: &mut [u8], frames: u32) {
372        let bytes = frames as usize * self.format.frame_size() as usize;
373        let len = bytes.min(output.len());
374        output[..len].fill(0);
375    }
376
377    /// Like [`get_player_chunk`](Self::get_player_chunk), but fills silence on failure.
378    pub fn get_player_chunk_or_silence(
379        &mut self,
380        server_now_usec: i64,
381        output_buffer_dac_time_usec: i64,
382        output: &mut [u8],
383        frames: u32,
384    ) -> bool {
385        let result =
386            self.get_player_chunk(server_now_usec, output_buffer_dac_time_usec, output, frames);
387        if !result {
388            self.get_silence(output, frames);
389        }
390        result
391    }
392
393    fn read_next(&mut self, output: &mut [u8], frames: u32) -> Option<i64> {
394        let chunk = self.current.as_mut()?;
395        // Adjusted timestamp: chunk start + already-consumed frames
396        let frame_size = self.format.frame_size() as usize;
397        let consumed_frames = chunk.read_pos / frame_size;
398        let ts =
399            chunk.start_usec() + consumed_frames as i64 * 1_000_000 / self.format.rate() as i64;
400        let mut read = 0u32;
401        while read < frames {
402            let offset = read as usize * frame_size;
403            let n = chunk.read_frames(&mut output[offset..], frames - read);
404            read += n;
405            if read < frames && chunk.is_end() {
406                match self.chunks.pop_front() {
407                    Some(next) => *chunk = next,
408                    None => break,
409                }
410            }
411        }
412        Some(ts)
413    }
414
415    fn read_with_correction(
416        &mut self,
417        output: &mut [u8],
418        frames: u32,
419        correction: i32,
420    ) -> Option<i64> {
421        if correction == 0 {
422            return self.read_next(output, frames);
423        }
424
425        // Clamp correction to avoid underflow
426        let correction = correction.max(-(frames as i32) + 1);
427
428        self.frame_delta -= correction;
429        let to_read = (frames as i32 + correction) as u32;
430        let frame_size = self.format.frame_size() as usize;
431
432        self.read_buf.resize(to_read as usize * frame_size, 0);
433        let mut read_buf = std::mem::take(&mut self.read_buf);
434        let ts = self.read_next(&mut read_buf, to_read);
435
436        let max = if correction < 0 {
437            frames as usize
438        } else {
439            to_read as usize
440        };
441        let slices = (correction.unsigned_abs() as usize + 1).min(max);
442        let slice_size = max / slices;
443
444        let mut pos = 0usize;
445        for n in 0..slices {
446            let size = if n + 1 == slices {
447                max - pos
448            } else {
449                slice_size
450            };
451
452            if correction < 0 {
453                let src_start = (pos - n) * frame_size;
454                let dst_start = pos * frame_size;
455                let len = size * frame_size;
456                output[dst_start..dst_start + len]
457                    .copy_from_slice(&read_buf[src_start..src_start + len]);
458            } else {
459                let src_start = pos * frame_size;
460                let dst_start = (pos - n) * frame_size;
461                let len = size * frame_size;
462                output[dst_start..dst_start + len]
463                    .copy_from_slice(&read_buf[src_start..src_start + len]);
464            }
465            pos += size;
466        }
467
468        self.read_buf = read_buf;
469        ts
470    }
471}
472
473#[cfg(test)]
474mod tests {
475    use super::*;
476
477    fn fmt() -> SampleFormat {
478        SampleFormat::new(48000, 16, 2)
479    }
480
481    fn make_chunk(sec: i32, usec: i32, frames: u32, format: SampleFormat) -> PcmChunk {
482        let bytes = frames as usize * format.frame_size() as usize;
483        let data: Vec<u8> = (0..bytes).map(|i| (i % 256) as u8).collect();
484        PcmChunk::new(Timeval { sec, usec }, data, format)
485    }
486
487    #[test]
488    fn pcm_chunk_duration() {
489        let f = fmt();
490        let chunk = make_chunk(0, 0, 480, f);
491        assert_eq!(chunk.duration_usec(), 10_000);
492    }
493
494    #[test]
495    fn pcm_chunk_read_frames() {
496        let f = fmt();
497        let mut chunk = make_chunk(0, 0, 100, f);
498        let mut buf = vec![0u8; 50 * f.frame_size() as usize];
499        let read = chunk.read_frames(&mut buf, 50);
500        assert_eq!(read, 50);
501        assert!(!chunk.is_end());
502        let read = chunk.read_frames(&mut buf, 50);
503        assert_eq!(read, 50);
504        assert!(chunk.is_end());
505    }
506
507    #[test]
508    fn pcm_chunk_seek() {
509        let f = fmt();
510        let mut chunk = make_chunk(0, 0, 100, f);
511        chunk.seek(90);
512        let mut buf = vec![0u8; 100 * f.frame_size() as usize];
513        let read = chunk.read_frames(&mut buf, 100);
514        assert_eq!(read, 10);
515    }
516
517    #[test]
518    fn stream_add_and_count() {
519        let f = fmt();
520        let mut stream = Stream::new(f);
521        assert_eq!(stream.chunk_count(), 0);
522        stream.add_chunk(make_chunk(100, 0, 480, f));
523        stream.add_chunk(make_chunk(100, 10_000, 480, f));
524        assert_eq!(stream.chunk_count(), 2);
525    }
526
527    #[test]
528    fn stream_clear() {
529        let f = fmt();
530        let mut stream = Stream::new(f);
531        stream.add_chunk(make_chunk(100, 0, 480, f));
532        stream.clear();
533        assert_eq!(stream.chunk_count(), 0);
534    }
535
536    #[test]
537    fn stream_silence_when_empty() {
538        let f = fmt();
539        let mut stream = Stream::new(f);
540        let mut buf = vec![0xFFu8; 480 * f.frame_size() as usize];
541        let result = stream.get_player_chunk(100_000_000, 0, &mut buf, 480);
542        assert!(!result);
543    }
544
545    #[test]
546    fn stream_hard_sync_plays_silence_when_too_early() {
547        let f = fmt();
548        let mut stream = Stream::new(f);
549        stream.set_buffer_ms(1000);
550        stream.add_chunk(make_chunk(100, 0, 4800, f));
551        let server_now = 100_000_000i64;
552        let mut buf = vec![0xFFu8; 480 * f.frame_size() as usize];
553        let result = stream.get_player_chunk(server_now, 0, &mut buf, 480);
554        assert!(result);
555        assert!(buf.iter().all(|&b| b == 0));
556    }
557
558    #[test]
559    fn stream_hard_sync_plays_data_when_aligned() {
560        let f = fmt();
561        let mut stream = Stream::new(f);
562        stream.set_buffer_ms(1000);
563        stream.add_chunk(make_chunk(99, 0, 4800, f));
564        let server_now = 100_000_000i64;
565        let mut buf = vec![0u8; 480 * f.frame_size() as usize];
566        let result = stream.get_player_chunk(server_now, 0, &mut buf, 480);
567        assert!(result);
568        assert!(buf.iter().any(|&b| b != 0));
569    }
570
571    #[test]
572    fn set_real_sample_rate_correction() {
573        let f = fmt();
574        let mut stream = Stream::new(f);
575        stream.set_real_sample_rate(48000.0);
576        assert_eq!(stream.correct_after_x_frames, 0);
577
578        stream.set_real_sample_rate(47999.0);
579        assert_ne!(stream.correct_after_x_frames, 0);
580    }
581
582    #[test]
583    fn read_with_correction_remove_one_frame() {
584        let f = fmt(); // 48000:16:2, frame_size=4
585        let mut stream = Stream::new(f);
586
587        let mut data = Vec::new();
588        for i in 0..10u16 {
589            data.extend_from_slice(&i.to_le_bytes());
590            data.extend_from_slice(&(i + 100).to_le_bytes());
591        }
592        stream.add_chunk(make_chunk(100, 0, 10, f));
593        stream.chunks.back_mut().unwrap().data = data;
594        stream.current = stream.chunks.pop_front();
595
596        let mut output = vec![0u8; 9 * f.frame_size() as usize];
597        let ts = stream.read_with_correction(&mut output, 9, 1);
598        assert!(ts.is_some());
599        assert_eq!(output.len(), 36);
600        for (i, chunk) in output.chunks(4).enumerate() {
601            let left = u16::from_le_bytes([chunk[0], chunk[1]]);
602            assert!(left <= 10, "frame {i}: left={left}");
603        }
604    }
605
606    #[test]
607    fn read_with_correction_zero_is_passthrough() {
608        let f = fmt();
609        let mut stream = Stream::new(f);
610        stream.add_chunk(make_chunk(100, 0, 100, f));
611        stream.current = stream.chunks.pop_front();
612
613        let mut out1 = vec![0u8; 50 * f.frame_size() as usize];
614        stream.read_with_correction(&mut out1, 50, 0);
615
616        stream.add_chunk(make_chunk(100, 0, 100, f));
617        stream.current = stream.chunks.pop_front();
618
619        let mut out2 = vec![0u8; 50 * f.frame_size() as usize];
620        stream.read_next(&mut out2, 50);
621
622        assert_eq!(out1, out2);
623    }
624}