Skip to main content

snapcast_client/
stream.rs

1//! Time-synchronized PCM audio stream buffer.
2
3use std::collections::VecDeque;
4
5use snapcast_proto::SampleFormat;
6use snapcast_proto::types::Timeval;
7
8use crate::double_buffer::DoubleBuffer;
9
10/// A decoded PCM chunk with a server-time timestamp and a read cursor.
11#[derive(Debug, Clone)]
12pub struct PcmChunk {
13    /// Server-time timestamp of this chunk.
14    pub timestamp: Timeval,
15    /// Raw PCM sample data.
16    pub data: Vec<u8>,
17    /// Sample format (rate, bits, channels).
18    pub format: SampleFormat,
19    read_pos: usize,
20}
21
22impl PcmChunk {
23    /// Create a new PCM chunk.
24    pub fn new(timestamp: Timeval, data: Vec<u8>, format: SampleFormat) -> Self {
25        Self {
26            timestamp,
27            data,
28            format,
29            read_pos: 0,
30        }
31    }
32
33    /// Start time of this chunk in microseconds.
34    pub fn start_usec(&self) -> i64 {
35        self.timestamp.to_usec()
36    }
37
38    /// Duration of this chunk in microseconds.
39    pub fn duration_usec(&self) -> i64 {
40        if self.format.frame_size() == 0 || self.format.rate() == 0 {
41            return 0;
42        }
43        let frames = self.data.len() as i64 / self.format.frame_size() as i64;
44        frames * 1_000_000 / self.format.rate() as i64
45    }
46
47    /// Read up to `frames` frames into `output`, returning the number read.
48    pub fn read_frames(&mut self, output: &mut [u8], frames: u32) -> u32 {
49        let frame_size = self.format.frame_size() as usize;
50        let available_bytes = self.data.len() - self.read_pos;
51        let available_frames = available_bytes / frame_size;
52        let to_read = (frames as usize).min(available_frames);
53        let bytes = to_read * frame_size;
54        output[..bytes].copy_from_slice(&self.data[self.read_pos..self.read_pos + bytes]);
55        self.read_pos += bytes;
56        to_read as u32
57    }
58
59    /// Returns true if all data has been read.
60    pub fn is_end(&self) -> bool {
61        self.read_pos >= self.data.len()
62    }
63
64    /// Skip forward by `frames` frames.
65    pub fn seek(&mut self, frames: u32) {
66        let bytes = frames as usize * self.format.frame_size() as usize;
67        self.read_pos = (self.read_pos + bytes).min(self.data.len());
68    }
69}
70
/// Soft sync kicks in once |short_median| is above this many microseconds.
const CORRECTION_BEGIN_USEC: i64 = 100;
/// Hard sync threshold for |median| of the long buffer (µs).
const HARD_SYNC_MEDIAN_USEC: i64 = 2_000;
/// Hard sync threshold for |short_median| (µs).
const HARD_SYNC_SHORT_MEDIAN_USEC: i64 = 5_000;
/// Hard sync threshold for |mini_median| (µs).
const HARD_SYNC_MINI_MEDIAN_USEC: i64 = 50_000;
/// Hard sync threshold for the instantaneous |age| (µs).
const HARD_SYNC_AGE_USEC: i64 = 500_000;
/// The median-based hard sync triggers additionally require |age| above this (µs).
const HARD_SYNC_MIN_AGE_USEC: i64 = 500;
/// Soft sync additionally requires |mini_median| and |age| above this (µs).
const SOFT_SYNC_MIN_USEC: i64 = 50;
/// Upper bound on the playback rate correction factor.
const MAX_RATE_CORRECTION: f64 = 0.0005;
/// Scaling applied to the drift when deriving the rate correction.
const RATE_CORRECTION_SCALE: f64 = 0.00005;
/// Capacity of the mini (fast) drift-detection `DoubleBuffer`.
const MINI_BUFFER_SIZE: usize = 20;
/// Capacity of the short-term drift-detection `DoubleBuffer`.
const SHORT_BUFFER_SIZE: usize = 100;
/// Capacity of the long-term drift-detection `DoubleBuffer`.
const BUFFER_SIZE: usize = 500;
/// Default target buffer, in milliseconds.
const DEFAULT_BUFFER_MS: i64 = 1_000;
97
98/// Time-synchronized PCM stream buffer.
99pub struct Stream {
100    format: SampleFormat,
101    chunks: VecDeque<PcmChunk>,
102    current: Option<PcmChunk>,
103    buffer_ms: i64,
104    hard_sync: bool,
105
106    // Drift detection
107    mini_buffer: DoubleBuffer,
108    short_buffer: DoubleBuffer,
109    buffer: DoubleBuffer,
110    median: i64,
111    short_median: i64,
112
113    // Soft sync
114    played_frames: u32,
115    correct_after_x_frames: i32,
116    frame_delta: i32,
117    read_buf: Vec<u8>,
118
119    // Stats
120    last_log_sec: i64,
121}
122
123impl Stream {
124    /// Create a new stream for the given sample format.
125    pub fn new(format: SampleFormat) -> Self {
126        Self {
127            format,
128            chunks: VecDeque::new(),
129            current: None,
130            buffer_ms: DEFAULT_BUFFER_MS,
131            hard_sync: true,
132            mini_buffer: DoubleBuffer::new(MINI_BUFFER_SIZE),
133            short_buffer: DoubleBuffer::new(SHORT_BUFFER_SIZE),
134            buffer: DoubleBuffer::new(BUFFER_SIZE),
135            median: 0,
136            short_median: 0,
137            played_frames: 0,
138            correct_after_x_frames: 0,
139            frame_delta: 0,
140            read_buf: Vec::new(),
141            last_log_sec: 0,
142        }
143    }
144
145    /// Returns the sample format.
146    pub fn format(&self) -> SampleFormat {
147        self.format
148    }
149
150    /// Set the target buffer size in milliseconds.
151    pub fn set_buffer_ms(&mut self, ms: i64) {
152        self.buffer_ms = ms;
153    }
154
155    /// Enqueue a decoded PCM chunk.
156    pub fn add_chunk(&mut self, chunk: PcmChunk) {
157        self.chunks.push_back(chunk);
158    }
159
160    /// Number of queued chunks.
161    pub fn chunk_count(&self) -> usize {
162        self.chunks.len()
163    }
164
165    /// Clear all queued chunks and reset sync state.
166    pub fn clear(&mut self) {
167        self.chunks.clear();
168        self.current = None;
169        self.hard_sync = true;
170    }
171
172    fn reset_buffers(&mut self) {
173        self.buffer.clear();
174        self.mini_buffer.clear();
175        self.short_buffer.clear();
176    }
177
178    fn update_buffers(&mut self, age: i64) {
179        self.buffer.add(age);
180        self.mini_buffer.add(age);
181        self.short_buffer.add(age);
182    }
183
184    fn set_real_sample_rate(&mut self, sample_rate: f64) {
185        let nominal = self.format.rate() as f64;
186        if (sample_rate - nominal).abs() < f64::EPSILON {
187            self.correct_after_x_frames = 0;
188        } else {
189            let ratio = nominal / sample_rate;
190            self.correct_after_x_frames = (ratio / (ratio - 1.0)).round() as i32;
191        }
192    }
193
194    /// Fill `output` with time-synchronized PCM data. Returns false if no data available.
195    pub fn get_player_chunk(
196        &mut self,
197        server_now_usec: i64,
198        output_buffer_dac_time_usec: i64,
199        output: &mut [u8],
200        frames: u32,
201    ) -> bool {
202        let needs_new = self.current.as_ref().is_none_or(|c| c.is_end());
203        if needs_new {
204            self.current = self.chunks.pop_front();
205        }
206        if self.current.is_none() {
207            return false;
208        }
209
210        // --- Hard sync: initial alignment ---
211        if self.hard_sync {
212            let chunk = self.current.as_ref().unwrap();
213            let req_duration_usec = (frames as i64 * 1_000_000) / self.format.rate() as i64;
214            let age_usec = server_now_usec - chunk.start_usec() - self.buffer_ms * 1000
215                + output_buffer_dac_time_usec;
216
217            if age_usec < -req_duration_usec {
218                self.get_silence(output, frames);
219                return true;
220            }
221
222            if age_usec > 0 {
223                self.current = None;
224                while let Some(mut c) = self.chunks.pop_front() {
225                    let a = server_now_usec - c.start_usec() - self.buffer_ms * 1000
226                        + output_buffer_dac_time_usec;
227                    if a > 0 && a < c.duration_usec() {
228                        let skip = (self.format.rate() as f64 * a as f64 / 1_000_000.0) as u32;
229                        c.seek(skip);
230                        self.current = Some(c);
231                        break;
232                    } else if a <= 0 {
233                        self.current = Some(c);
234                        break;
235                    }
236                }
237                if self.current.is_none() {
238                    return false;
239                }
240            }
241
242            let chunk = self.current.as_ref().unwrap();
243            let age_usec = server_now_usec - chunk.start_usec() - self.buffer_ms * 1000
244                + output_buffer_dac_time_usec;
245
246            if age_usec <= 0 {
247                let silent_frames =
248                    (self.format.rate() as f64 * (-age_usec) as f64 / 1_000_000.0) as u32;
249                let silent_frames = silent_frames.min(frames);
250                let frame_size = self.format.frame_size() as usize;
251
252                if silent_frames > 0 {
253                    output[..silent_frames as usize * frame_size].fill(0);
254                }
255                let remaining = frames - silent_frames;
256                if remaining > 0 {
257                    let offset = silent_frames as usize * frame_size;
258                    self.read_next(&mut output[offset..], remaining);
259                }
260                if silent_frames < frames {
261                    self.hard_sync = false;
262                    self.reset_buffers();
263                }
264                return true;
265            }
266            return false;
267        }
268
269        // --- Normal playback with drift correction ---
270
271        // Compute frames correction from current rate adjustment
272        let mut frames_correction: i32 = 0;
273        if self.correct_after_x_frames != 0 {
274            self.played_frames += frames;
275            if self.played_frames >= self.correct_after_x_frames.unsigned_abs() {
276                frames_correction = self.played_frames as i32 / self.correct_after_x_frames;
277                self.played_frames %= self.correct_after_x_frames.unsigned_abs();
278            }
279        }
280
281        // Read with correction (or plain read if correction == 0)
282        let chunk_start = match self.read_with_correction(output, frames, frames_correction) {
283            Some(ts) => ts,
284            None => return false,
285        };
286
287        let age_usec =
288            server_now_usec - chunk_start - self.buffer_ms * 1000 + output_buffer_dac_time_usec;
289
290        // Reset sample rate to nominal, soft sync may override below
291        self.set_real_sample_rate(self.format.rate() as f64);
292
293        // Hard sync re-trigger thresholds (matching C++)
294        if self.buffer.full()
295            && self.median.abs() > HARD_SYNC_MEDIAN_USEC
296            && age_usec.abs() > HARD_SYNC_MIN_AGE_USEC
297        {
298            tracing::info!(
299                median = self.median,
300                "Hard sync: buffer full, |median| > 2ms"
301            );
302            self.hard_sync = true;
303        } else if self.short_buffer.full()
304            && self.short_median.abs() > HARD_SYNC_SHORT_MEDIAN_USEC
305            && age_usec.abs() > HARD_SYNC_MIN_AGE_USEC
306        {
307            tracing::info!(
308                short_median = self.short_median,
309                "Hard sync: short buffer full, |short_median| > 5ms"
310            );
311            self.hard_sync = true;
312        } else if self.mini_buffer.full()
313            && self.mini_buffer.median_simple().abs() > HARD_SYNC_MINI_MEDIAN_USEC
314            && age_usec.abs() > HARD_SYNC_MIN_AGE_USEC
315        {
316            tracing::info!("Hard sync: mini buffer full, |mini_median| > 50ms");
317            self.hard_sync = true;
318        } else if age_usec.abs() > HARD_SYNC_AGE_USEC {
319            tracing::info!(age_usec, "Hard sync: |age| > 500ms");
320            self.hard_sync = true;
321        } else if self.short_buffer.full() {
322            // Soft sync: adjust playback speed based on drift
323            let mini_median = self.mini_buffer.median_simple();
324            if self.short_median > CORRECTION_BEGIN_USEC
325                && mini_median > SOFT_SYNC_MIN_USEC
326                && age_usec > SOFT_SYNC_MIN_USEC
327            {
328                let rate = (self.short_median as f64 / 100.0) * RATE_CORRECTION_SCALE;
329                let rate = 1.0 - rate.min(MAX_RATE_CORRECTION);
330                self.set_real_sample_rate(self.format.rate() as f64 * rate);
331            } else if self.short_median < -CORRECTION_BEGIN_USEC
332                && mini_median < -SOFT_SYNC_MIN_USEC
333                && age_usec < -SOFT_SYNC_MIN_USEC
334            {
335                let rate = (-self.short_median as f64 / 100.0) * RATE_CORRECTION_SCALE;
336                let rate = 1.0 + rate.min(MAX_RATE_CORRECTION);
337                self.set_real_sample_rate(self.format.rate() as f64 * rate);
338            }
339        }
340
341        self.update_buffers(age_usec);
342
343        // Stats logging (once per second)
344        let now_sec = server_now_usec / 1_000_000;
345        if now_sec != self.last_log_sec {
346            self.last_log_sec = now_sec;
347            self.median = self.buffer.median_simple();
348            self.short_median = self.short_buffer.median_simple();
349            tracing::debug!(
350                target: "Stats",
351                "Chunk: {}\t{}\t{}\t{}\t{}\t{}\t{}",
352                age_usec,
353                self.mini_buffer.median_simple(),
354                self.short_median,
355                self.median,
356                self.buffer.len(),
357                output_buffer_dac_time_usec / 1000,
358                self.frame_delta,
359            );
360            self.frame_delta = 0;
361        }
362
363        age_usec.abs() < 500_000
364    }
365
366    /// Fill `output` with silence.
367    pub fn get_silence(&self, output: &mut [u8], frames: u32) {
368        let bytes = frames as usize * self.format.frame_size() as usize;
369        output[..bytes].fill(0);
370    }
371
372    /// Like [`get_player_chunk`](Self::get_player_chunk), but fills silence on failure.
373    pub fn get_player_chunk_or_silence(
374        &mut self,
375        server_now_usec: i64,
376        output_buffer_dac_time_usec: i64,
377        output: &mut [u8],
378        frames: u32,
379    ) -> bool {
380        let result =
381            self.get_player_chunk(server_now_usec, output_buffer_dac_time_usec, output, frames);
382        if !result {
383            self.get_silence(output, frames);
384        }
385        result
386    }
387
388    fn read_next(&mut self, output: &mut [u8], frames: u32) -> Option<i64> {
389        let chunk = self.current.as_mut()?;
390        // Adjusted timestamp: chunk start + already-consumed frames
391        let frame_size = self.format.frame_size() as usize;
392        let consumed_frames = chunk.read_pos / frame_size;
393        let ts =
394            chunk.start_usec() + consumed_frames as i64 * 1_000_000 / self.format.rate() as i64;
395        let mut read = 0u32;
396        while read < frames {
397            let offset = read as usize * frame_size;
398            let n = chunk.read_frames(&mut output[offset..], frames - read);
399            read += n;
400            if read < frames && chunk.is_end() {
401                match self.chunks.pop_front() {
402                    Some(next) => *chunk = next,
403                    None => break,
404                }
405            }
406        }
407        Some(ts)
408    }
409
410    fn read_with_correction(
411        &mut self,
412        output: &mut [u8],
413        frames: u32,
414        correction: i32,
415    ) -> Option<i64> {
416        if correction == 0 {
417            return self.read_next(output, frames);
418        }
419
420        // Clamp correction to avoid underflow
421        let correction = correction.max(-(frames as i32) + 1);
422
423        self.frame_delta -= correction;
424        let to_read = (frames as i32 + correction) as u32;
425        let frame_size = self.format.frame_size() as usize;
426
427        self.read_buf.resize(to_read as usize * frame_size, 0);
428        let mut read_buf = std::mem::take(&mut self.read_buf);
429        let ts = self.read_next(&mut read_buf, to_read);
430
431        let max = if correction < 0 {
432            frames as usize
433        } else {
434            to_read as usize
435        };
436        let slices = (correction.unsigned_abs() as usize + 1).min(max);
437        let slice_size = max / slices;
438
439        let mut pos = 0usize;
440        for n in 0..slices {
441            let size = if n + 1 == slices {
442                max - pos
443            } else {
444                slice_size
445            };
446
447            if correction < 0 {
448                let src_start = (pos - n) * frame_size;
449                let dst_start = pos * frame_size;
450                let len = size * frame_size;
451                output[dst_start..dst_start + len]
452                    .copy_from_slice(&read_buf[src_start..src_start + len]);
453            } else {
454                let src_start = pos * frame_size;
455                let dst_start = (pos - n) * frame_size;
456                let len = size * frame_size;
457                output[dst_start..dst_start + len]
458                    .copy_from_slice(&read_buf[src_start..src_start + len]);
459            }
460            pos += size;
461        }
462
463        self.read_buf = read_buf;
464        ts
465    }
466}
467
#[cfg(test)]
mod tests {
    use super::*;

    /// 48 kHz, 16 bit, stereo: 4 bytes per frame.
    fn fmt() -> SampleFormat {
        SampleFormat::new(48000, 16, 2)
    }

    /// Build a chunk of `frames` frames whose bytes follow the pattern `i % 256`.
    fn make_chunk(sec: i32, usec: i32, frames: u32, format: SampleFormat) -> PcmChunk {
        let bytes = frames as usize * format.frame_size() as usize;
        let data: Vec<u8> = (0..bytes).map(|i| (i % 256) as u8).collect();
        PcmChunk::new(Timeval { sec, usec }, data, format)
    }

    #[test]
    fn pcm_chunk_duration() {
        let f = fmt();
        let chunk = make_chunk(0, 0, 480, f);
        assert_eq!(chunk.duration_usec(), 10_000);
    }

    #[test]
    fn pcm_chunk_read_frames() {
        let f = fmt();
        let mut chunk = make_chunk(0, 0, 100, f);
        let mut buf = vec![0u8; 50 * f.frame_size() as usize];
        let read = chunk.read_frames(&mut buf, 50);
        assert_eq!(read, 50);
        assert!(!chunk.is_end());
        let read = chunk.read_frames(&mut buf, 50);
        assert_eq!(read, 50);
        assert!(chunk.is_end());
        // Nothing is left once the end has been reached.
        assert_eq!(chunk.read_frames(&mut buf, 50), 0);
    }

    #[test]
    fn pcm_chunk_seek() {
        let f = fmt();
        let mut chunk = make_chunk(0, 0, 100, f);
        chunk.seek(90);
        let mut buf = vec![0u8; 100 * f.frame_size() as usize];
        let read = chunk.read_frames(&mut buf, 100);
        assert_eq!(read, 10);
        assert!(chunk.is_end());
    }

    #[test]
    fn stream_add_and_count() {
        let f = fmt();
        let mut stream = Stream::new(f);
        assert_eq!(stream.chunk_count(), 0);
        stream.add_chunk(make_chunk(100, 0, 480, f));
        stream.add_chunk(make_chunk(100, 10_000, 480, f));
        assert_eq!(stream.chunk_count(), 2);
    }

    #[test]
    fn stream_clear() {
        let f = fmt();
        let mut stream = Stream::new(f);
        stream.add_chunk(make_chunk(100, 0, 480, f));
        stream.clear();
        assert_eq!(stream.chunk_count(), 0);
    }

    #[test]
    fn stream_silence_when_empty() {
        let f = fmt();
        let mut stream = Stream::new(f);
        let mut buf = vec![0xFFu8; 480 * f.frame_size() as usize];
        let result = stream.get_player_chunk(100_000_000, 0, &mut buf, 480);
        assert!(!result);

        // The `_or_silence` variant reports the same failure but zeroes the buffer.
        let result = stream.get_player_chunk_or_silence(100_000_000, 0, &mut buf, 480);
        assert!(!result);
        assert!(buf.iter().all(|&b| b == 0));
    }

    #[test]
    fn stream_hard_sync_plays_silence_when_too_early() {
        let f = fmt();
        let mut stream = Stream::new(f);
        stream.set_buffer_ms(1000);
        stream.add_chunk(make_chunk(100, 0, 4800, f));
        let server_now = 100_000_000i64;
        let mut buf = vec![0xFFu8; 480 * f.frame_size() as usize];
        let result = stream.get_player_chunk(server_now, 0, &mut buf, 480);
        assert!(result);
        assert!(buf.iter().all(|&b| b == 0));
    }

    #[test]
    fn stream_hard_sync_plays_data_when_aligned() {
        let f = fmt();
        let mut stream = Stream::new(f);
        stream.set_buffer_ms(1000);
        stream.add_chunk(make_chunk(99, 0, 4800, f));
        let server_now = 100_000_000i64;
        let mut buf = vec![0u8; 480 * f.frame_size() as usize];
        let result = stream.get_player_chunk(server_now, 0, &mut buf, 480);
        assert!(result);
        assert!(buf.iter().any(|&b| b != 0));
    }

    #[test]
    fn set_real_sample_rate_correction() {
        let f = fmt();
        let mut stream = Stream::new(f);
        stream.set_real_sample_rate(48000.0);
        assert_eq!(stream.correct_after_x_frames, 0);

        // One sample per second too slow: correct once every 48000 frames.
        stream.set_real_sample_rate(47999.0);
        assert_eq!(stream.correct_after_x_frames, 48000);

        // One sample per second too fast: same interval, opposite sign.
        stream.set_real_sample_rate(48001.0);
        assert_eq!(stream.correct_after_x_frames, -48000);
    }

    #[test]
    fn read_with_correction_remove_one_frame() {
        let f = fmt(); // 48000:16:2, frame_size=4
        let mut stream = Stream::new(f);

        // Frame i carries left = i and right = i + 100.
        let mut data = Vec::new();
        for i in 0..10u16 {
            data.extend_from_slice(&i.to_le_bytes());
            data.extend_from_slice(&(i + 100).to_le_bytes());
        }
        stream.add_chunk(make_chunk(100, 0, 10, f));
        stream.chunks.back_mut().unwrap().data = data;
        stream.current = stream.chunks.pop_front();

        let mut output = vec![0u8; 9 * f.frame_size() as usize];
        let ts = stream.read_with_correction(&mut output, 9, 1);
        assert!(ts.is_some());
        assert_eq!(output.len(), 36);

        // 10 frames are read as two slices of 5; the second slice is written
        // one frame early, so input frame 4 is the one that gets dropped.
        let lefts: Vec<u16> = output
            .chunks_exact(4)
            .map(|frame| u16::from_le_bytes([frame[0], frame[1]]))
            .collect();
        assert_eq!(lefts, [0u16, 1, 2, 3, 5, 6, 7, 8, 9]);

        // Frames stay intact: the right channel still matches the left one.
        for (i, frame) in output.chunks_exact(4).enumerate() {
            let left = u16::from_le_bytes([frame[0], frame[1]]);
            let right = u16::from_le_bytes([frame[2], frame[3]]);
            assert_eq!(right, left + 100, "frame {i}: left={left} right={right}");
        }

        // One frame was removed.
        assert_eq!(stream.frame_delta, -1);
    }

    #[test]
    fn read_with_correction_zero_is_passthrough() {
        let f = fmt();
        let mut stream = Stream::new(f);
        stream.add_chunk(make_chunk(100, 0, 100, f));
        stream.current = stream.chunks.pop_front();

        let mut out1 = vec![0u8; 50 * f.frame_size() as usize];
        stream.read_with_correction(&mut out1, 50, 0);

        stream.add_chunk(make_chunk(100, 0, 100, f));
        stream.current = stream.chunks.pop_front();

        let mut out2 = vec![0u8; 50 * f.frame_size() as usize];
        stream.read_next(&mut out2, 50);

        assert_eq!(out1, out2);
    }
}