Skip to main content

snapcast_client/
stream.rs

1//! Time-synchronized PCM audio stream buffer.
2
3use std::collections::VecDeque;
4
5use snapcast_proto::SampleFormat;
6use snapcast_proto::types::Timeval;
7
8use crate::double_buffer::DoubleBuffer;
9
10/// A decoded PCM chunk with a server-time timestamp and a read cursor.
11#[derive(Debug, Clone)]
12pub struct PcmChunk {
13    /// Server-time timestamp of this chunk.
14    pub timestamp: Timeval,
15    /// Raw PCM sample data.
16    pub data: Vec<u8>,
17    /// Sample format (rate, bits, channels).
18    pub format: SampleFormat,
19    read_pos: usize,
20}
21
22impl PcmChunk {
23    /// Create a new PCM chunk.
24    pub fn new(timestamp: Timeval, data: Vec<u8>, format: SampleFormat) -> Self {
25        Self {
26            timestamp,
27            data,
28            format,
29            read_pos: 0,
30        }
31    }
32
33    /// Start time of this chunk in microseconds.
34    pub fn start_usec(&self) -> i64 {
35        self.timestamp.to_usec()
36    }
37
38    /// Duration of this chunk in microseconds.
39    pub fn duration_usec(&self) -> i64 {
40        if self.format.frame_size() == 0 || self.format.rate() == 0 {
41            return 0;
42        }
43        let frames = self.data.len() as i64 / self.format.frame_size() as i64;
44        frames * 1_000_000 / self.format.rate() as i64
45    }
46
47    /// Read up to `frames` frames into `output`, returning the number read.
48    pub fn read_frames(&mut self, output: &mut [u8], frames: u32) -> u32 {
49        let frame_size = self.format.frame_size() as usize;
50        let available_bytes = self.data.len() - self.read_pos;
51        let available_frames = available_bytes / frame_size;
52        let to_read = (frames as usize).min(available_frames);
53        let bytes = to_read * frame_size;
54        output[..bytes].copy_from_slice(&self.data[self.read_pos..self.read_pos + bytes]);
55        self.read_pos += bytes;
56        to_read as u32
57    }
58
59    /// Returns true if all data has been read.
60    pub fn is_end(&self) -> bool {
61        self.read_pos >= self.data.len()
62    }
63
64    /// Skip forward by `frames` frames.
65    pub fn seek(&mut self, frames: u32) {
66        let bytes = frames as usize * self.format.frame_size() as usize;
67        self.read_pos = (self.read_pos + bytes).min(self.data.len());
68    }
69}
70
71/// Correction threshold — soft sync starts when |short_median| > 100µs
72const CORRECTION_BEGIN_USEC: i64 = 100;
73/// Hard sync: |median| exceeds this (µs).
74const HARD_SYNC_MEDIAN_USEC: i64 = 2000;
75/// Hard sync: |short_median| exceeds this (µs).
76const HARD_SYNC_SHORT_MEDIAN_USEC: i64 = 5000;
77/// Hard sync: |mini_median| exceeds this (µs).
78const HARD_SYNC_MINI_MEDIAN_USEC: i64 = 50000;
79/// Hard sync: |age| exceeds this (µs).
80const HARD_SYNC_AGE_USEC: i64 = 500_000;
81/// Minimum |age| for hard sync re-trigger (µs).
82const HARD_SYNC_MIN_AGE_USEC: i64 = 500;
83/// Minimum |mini_median| for soft sync (µs).
84const SOFT_SYNC_MIN_USEC: i64 = 50;
85/// Maximum playback rate correction factor.
86const MAX_RATE_CORRECTION: f64 = 0.0005;
87/// Rate correction scaling factor.
88const RATE_CORRECTION_SCALE: f64 = 0.00005;
89/// DoubleBuffer capacity for mini (fast) drift detection.
90const MINI_BUFFER_SIZE: usize = 20;
91/// DoubleBuffer capacity for short-term drift detection.
92const SHORT_BUFFER_SIZE: usize = 100;
93/// DoubleBuffer capacity for long-term drift detection.
94const BUFFER_SIZE: usize = 500;
95/// Default buffer in milliseconds.
96const DEFAULT_BUFFER_MS: i64 = 1000;
97
98/// Time-synchronized PCM stream buffer.
99pub struct Stream {
100    format: SampleFormat,
101    chunks: VecDeque<PcmChunk>,
102    current: Option<PcmChunk>,
103    buffer_ms: i64,
104    hard_sync: bool,
105
106    // Drift detection
107    mini_buffer: DoubleBuffer,
108    short_buffer: DoubleBuffer,
109    buffer: DoubleBuffer,
110    median: i64,
111    short_median: i64,
112
113    // Soft sync
114    played_frames: u32,
115    correct_after_x_frames: i32,
116    frame_delta: i32,
117    read_buf: Vec<u8>,
118
119    // Stats
120    last_log_sec: i64,
121}
122
123impl Stream {
124    /// Create a new stream for the given sample format.
125    pub fn new(format: SampleFormat) -> Self {
126        Self {
127            format,
128            chunks: VecDeque::new(),
129            current: None,
130            buffer_ms: DEFAULT_BUFFER_MS,
131            hard_sync: true,
132            mini_buffer: DoubleBuffer::new(MINI_BUFFER_SIZE),
133            short_buffer: DoubleBuffer::new(SHORT_BUFFER_SIZE),
134            buffer: DoubleBuffer::new(BUFFER_SIZE),
135            median: 0,
136            short_median: 0,
137            played_frames: 0,
138            correct_after_x_frames: 0,
139            frame_delta: 0,
140            read_buf: Vec::new(),
141            last_log_sec: 0,
142        }
143    }
144
145    /// Returns the sample format.
146    pub fn format(&self) -> SampleFormat {
147        self.format
148    }
149
150    /// Set the target buffer size in milliseconds.
151    pub fn set_buffer_ms(&mut self, ms: i64) {
152        self.buffer_ms = ms;
153    }
154
155    /// Enqueue a decoded PCM chunk.
156    pub fn add_chunk(&mut self, chunk: PcmChunk) {
157        self.chunks.push_back(chunk);
158    }
159
160    /// Number of queued chunks.
161    pub fn chunk_count(&self) -> usize {
162        self.chunks.len()
163    }
164
165    /// Clear all queued chunks and reset sync state.
166    pub fn clear(&mut self) {
167        self.chunks.clear();
168        self.current = None;
169        self.hard_sync = true;
170    }
171
172    fn reset_buffers(&mut self) {
173        self.buffer.clear();
174        self.mini_buffer.clear();
175        self.short_buffer.clear();
176    }
177
178    fn update_buffers(&mut self, age: i64) {
179        self.buffer.add(age);
180        self.mini_buffer.add(age);
181        self.short_buffer.add(age);
182    }
183
184    fn set_real_sample_rate(&mut self, sample_rate: f64) {
185        let nominal = self.format.rate() as f64;
186        if (sample_rate - nominal).abs() < f64::EPSILON {
187            self.correct_after_x_frames = 0;
188        } else {
189            let ratio = nominal / sample_rate;
190            self.correct_after_x_frames = (ratio / (ratio - 1.0)).round() as i32;
191        }
192    }
193
194    /// Fill `output` with time-synchronized PCM data. Returns false if no data available.
195    pub fn get_player_chunk(
196        &mut self,
197        server_now_usec: i64,
198        output_buffer_dac_time_usec: i64,
199        output: &mut [u8],
200        frames: u32,
201    ) -> bool {
202        let needs_new = self.current.as_ref().is_none_or(|c| c.is_end());
203        if needs_new {
204            self.current = self.chunks.pop_front();
205        }
206        if self.current.is_none() {
207            return false;
208        }
209
210        // --- Hard sync: initial alignment ---
211        if self.hard_sync {
212            let chunk = self.current.as_ref().unwrap();
213            let req_duration_usec = (frames as i64 * 1_000_000) / self.format.rate() as i64;
214            let age_usec = server_now_usec - chunk.start_usec() - self.buffer_ms * 1000
215                + output_buffer_dac_time_usec;
216
217            if age_usec < -req_duration_usec {
218                self.get_silence(output, frames);
219                return true;
220            }
221
222            if age_usec > 0 {
223                self.current = None;
224                while let Some(mut c) = self.chunks.pop_front() {
225                    let a = server_now_usec - c.start_usec() - self.buffer_ms * 1000
226                        + output_buffer_dac_time_usec;
227                    if a > 0 && a < c.duration_usec() {
228                        let skip = (self.format.rate() as f64 * a as f64 / 1_000_000.0) as u32;
229                        c.seek(skip);
230                        self.current = Some(c);
231                        break;
232                    } else if a <= 0 {
233                        self.current = Some(c);
234                        break;
235                    }
236                }
237                if self.current.is_none() {
238                    return false;
239                }
240            }
241
242            let chunk = self.current.as_ref().unwrap();
243            let age_usec = server_now_usec - chunk.start_usec() - self.buffer_ms * 1000
244                + output_buffer_dac_time_usec;
245
246            if age_usec <= 0 {
247                let silent_frames =
248                    (self.format.rate() as f64 * (-age_usec) as f64 / 1_000_000.0) as u32;
249                let silent_frames = silent_frames.min(frames);
250                let frame_size = self.format.frame_size() as usize;
251
252                if silent_frames > 0 {
253                    output[..silent_frames as usize * frame_size].fill(0);
254                }
255                let remaining = frames - silent_frames;
256                if remaining > 0 {
257                    let offset = silent_frames as usize * frame_size;
258                    self.read_next(&mut output[offset..], remaining);
259                }
260                if silent_frames < frames {
261                    self.hard_sync = false;
262                    self.reset_buffers();
263                }
264                return true;
265            }
266            return false;
267        }
268
269        // --- Normal playback with drift correction ---
270
271        // Compute frames correction from current rate adjustment
272        let mut frames_correction: i32 = 0;
273        if self.correct_after_x_frames != 0 {
274            self.played_frames += frames;
275            if self.played_frames >= self.correct_after_x_frames.unsigned_abs() {
276                frames_correction = self.played_frames as i32 / self.correct_after_x_frames;
277                self.played_frames %= self.correct_after_x_frames.unsigned_abs();
278            }
279        }
280
281        // Read with correction (or plain read if correction == 0)
282        let chunk_start = match self.read_with_correction(output, frames, frames_correction) {
283            Some(ts) => ts,
284            None => return false,
285        };
286
287        let age_usec =
288            server_now_usec - chunk_start - self.buffer_ms * 1000 + output_buffer_dac_time_usec;
289
290        // Reset sample rate to nominal, soft sync may override below
291        self.set_real_sample_rate(self.format.rate() as f64);
292
293        // Hard sync re-trigger thresholds (matching C++)
294        if self.buffer.full()
295            && self.median.abs() > HARD_SYNC_MEDIAN_USEC
296            && age_usec.abs() > HARD_SYNC_MIN_AGE_USEC
297        {
298            tracing::info!(
299                median = self.median,
300                "Hard sync: buffer full, |median| > 2ms"
301            );
302            self.hard_sync = true;
303        } else if self.short_buffer.full()
304            && self.short_median.abs() > HARD_SYNC_SHORT_MEDIAN_USEC
305            && age_usec.abs() > HARD_SYNC_MIN_AGE_USEC
306        {
307            tracing::info!(
308                short_median = self.short_median,
309                "Hard sync: short buffer full, |short_median| > 5ms"
310            );
311            self.hard_sync = true;
312        } else if self.mini_buffer.full()
313            && self.mini_buffer.median_simple().abs() > HARD_SYNC_MINI_MEDIAN_USEC
314            && age_usec.abs() > HARD_SYNC_MIN_AGE_USEC
315        {
316            tracing::info!("Hard sync: mini buffer full, |mini_median| > 50ms");
317            self.hard_sync = true;
318        } else if age_usec.abs() > HARD_SYNC_AGE_USEC {
319            tracing::info!(age_usec, "Hard sync: |age| > 500ms");
320            self.hard_sync = true;
321        } else if self.short_buffer.full() {
322            // Soft sync: adjust playback speed based on drift
323            let mini_median = self.mini_buffer.median_simple();
324            if self.short_median > CORRECTION_BEGIN_USEC
325                && mini_median > SOFT_SYNC_MIN_USEC
326                && age_usec > SOFT_SYNC_MIN_USEC
327            {
328                let rate = (self.short_median as f64 / 100.0) * RATE_CORRECTION_SCALE;
329                let rate = 1.0 - rate.min(MAX_RATE_CORRECTION);
330                self.set_real_sample_rate(self.format.rate() as f64 * rate);
331            } else if self.short_median < -CORRECTION_BEGIN_USEC
332                && mini_median < -SOFT_SYNC_MIN_USEC
333                && age_usec < -SOFT_SYNC_MIN_USEC
334            {
335                let rate = (-self.short_median as f64 / 100.0) * RATE_CORRECTION_SCALE;
336                let rate = 1.0 + rate.min(MAX_RATE_CORRECTION);
337                self.set_real_sample_rate(self.format.rate() as f64 * rate);
338            }
339        }
340
341        self.update_buffers(age_usec);
342
343        // Stats logging (once per second)
344        let now_sec = server_now_usec / 1_000_000;
345        if now_sec != self.last_log_sec {
346            self.last_log_sec = now_sec;
347            self.median = self.buffer.median_simple();
348            self.short_median = self.short_buffer.median_simple();
349            tracing::debug!(
350                target: "Stats",
351                "Chunk: {}\t{}\t{}\t{}\t{}\t{}\t{}",
352                age_usec,
353                self.mini_buffer.median_simple(),
354                self.short_median,
355                self.median,
356                self.buffer.len(),
357                output_buffer_dac_time_usec / 1000,
358                self.frame_delta,
359            );
360            self.frame_delta = 0;
361        }
362
363        age_usec.abs() < 500_000
364    }
365
366    /// Fill `output` with silence.
367    pub fn get_silence(&self, output: &mut [u8], frames: u32) {
368        let bytes = frames as usize * self.format.frame_size() as usize;
369        let len = bytes.min(output.len());
370        output[..len].fill(0);
371    }
372
373    /// Like [`get_player_chunk`](Self::get_player_chunk), but fills silence on failure.
374    pub fn get_player_chunk_or_silence(
375        &mut self,
376        server_now_usec: i64,
377        output_buffer_dac_time_usec: i64,
378        output: &mut [u8],
379        frames: u32,
380    ) -> bool {
381        let result =
382            self.get_player_chunk(server_now_usec, output_buffer_dac_time_usec, output, frames);
383        if !result {
384            self.get_silence(output, frames);
385        }
386        result
387    }
388
389    fn read_next(&mut self, output: &mut [u8], frames: u32) -> Option<i64> {
390        let chunk = self.current.as_mut()?;
391        // Adjusted timestamp: chunk start + already-consumed frames
392        let frame_size = self.format.frame_size() as usize;
393        let consumed_frames = chunk.read_pos / frame_size;
394        let ts =
395            chunk.start_usec() + consumed_frames as i64 * 1_000_000 / self.format.rate() as i64;
396        let mut read = 0u32;
397        while read < frames {
398            let offset = read as usize * frame_size;
399            let n = chunk.read_frames(&mut output[offset..], frames - read);
400            read += n;
401            if read < frames && chunk.is_end() {
402                match self.chunks.pop_front() {
403                    Some(next) => *chunk = next,
404                    None => break,
405                }
406            }
407        }
408        Some(ts)
409    }
410
411    fn read_with_correction(
412        &mut self,
413        output: &mut [u8],
414        frames: u32,
415        correction: i32,
416    ) -> Option<i64> {
417        if correction == 0 {
418            return self.read_next(output, frames);
419        }
420
421        // Clamp correction to avoid underflow
422        let correction = correction.max(-(frames as i32) + 1);
423
424        self.frame_delta -= correction;
425        let to_read = (frames as i32 + correction) as u32;
426        let frame_size = self.format.frame_size() as usize;
427
428        self.read_buf.resize(to_read as usize * frame_size, 0);
429        let mut read_buf = std::mem::take(&mut self.read_buf);
430        let ts = self.read_next(&mut read_buf, to_read);
431
432        let max = if correction < 0 {
433            frames as usize
434        } else {
435            to_read as usize
436        };
437        let slices = (correction.unsigned_abs() as usize + 1).min(max);
438        let slice_size = max / slices;
439
440        let mut pos = 0usize;
441        for n in 0..slices {
442            let size = if n + 1 == slices {
443                max - pos
444            } else {
445                slice_size
446            };
447
448            if correction < 0 {
449                let src_start = (pos - n) * frame_size;
450                let dst_start = pos * frame_size;
451                let len = size * frame_size;
452                output[dst_start..dst_start + len]
453                    .copy_from_slice(&read_buf[src_start..src_start + len]);
454            } else {
455                let src_start = pos * frame_size;
456                let dst_start = (pos - n) * frame_size;
457                let len = size * frame_size;
458                output[dst_start..dst_start + len]
459                    .copy_from_slice(&read_buf[src_start..src_start + len]);
460            }
461            pos += size;
462        }
463
464        self.read_buf = read_buf;
465        ts
466    }
467}
468
469#[cfg(test)]
470mod tests {
471    use super::*;
472
473    fn fmt() -> SampleFormat {
474        SampleFormat::new(48000, 16, 2)
475    }
476
477    fn make_chunk(sec: i32, usec: i32, frames: u32, format: SampleFormat) -> PcmChunk {
478        let bytes = frames as usize * format.frame_size() as usize;
479        let data: Vec<u8> = (0..bytes).map(|i| (i % 256) as u8).collect();
480        PcmChunk::new(Timeval { sec, usec }, data, format)
481    }
482
483    #[test]
484    fn pcm_chunk_duration() {
485        let f = fmt();
486        let chunk = make_chunk(0, 0, 480, f);
487        assert_eq!(chunk.duration_usec(), 10_000);
488    }
489
490    #[test]
491    fn pcm_chunk_read_frames() {
492        let f = fmt();
493        let mut chunk = make_chunk(0, 0, 100, f);
494        let mut buf = vec![0u8; 50 * f.frame_size() as usize];
495        let read = chunk.read_frames(&mut buf, 50);
496        assert_eq!(read, 50);
497        assert!(!chunk.is_end());
498        let read = chunk.read_frames(&mut buf, 50);
499        assert_eq!(read, 50);
500        assert!(chunk.is_end());
501    }
502
503    #[test]
504    fn pcm_chunk_seek() {
505        let f = fmt();
506        let mut chunk = make_chunk(0, 0, 100, f);
507        chunk.seek(90);
508        let mut buf = vec![0u8; 100 * f.frame_size() as usize];
509        let read = chunk.read_frames(&mut buf, 100);
510        assert_eq!(read, 10);
511    }
512
513    #[test]
514    fn stream_add_and_count() {
515        let f = fmt();
516        let mut stream = Stream::new(f);
517        assert_eq!(stream.chunk_count(), 0);
518        stream.add_chunk(make_chunk(100, 0, 480, f));
519        stream.add_chunk(make_chunk(100, 10_000, 480, f));
520        assert_eq!(stream.chunk_count(), 2);
521    }
522
523    #[test]
524    fn stream_clear() {
525        let f = fmt();
526        let mut stream = Stream::new(f);
527        stream.add_chunk(make_chunk(100, 0, 480, f));
528        stream.clear();
529        assert_eq!(stream.chunk_count(), 0);
530    }
531
532    #[test]
533    fn stream_silence_when_empty() {
534        let f = fmt();
535        let mut stream = Stream::new(f);
536        let mut buf = vec![0xFFu8; 480 * f.frame_size() as usize];
537        let result = stream.get_player_chunk(100_000_000, 0, &mut buf, 480);
538        assert!(!result);
539    }
540
541    #[test]
542    fn stream_hard_sync_plays_silence_when_too_early() {
543        let f = fmt();
544        let mut stream = Stream::new(f);
545        stream.set_buffer_ms(1000);
546        stream.add_chunk(make_chunk(100, 0, 4800, f));
547        let server_now = 100_000_000i64;
548        let mut buf = vec![0xFFu8; 480 * f.frame_size() as usize];
549        let result = stream.get_player_chunk(server_now, 0, &mut buf, 480);
550        assert!(result);
551        assert!(buf.iter().all(|&b| b == 0));
552    }
553
554    #[test]
555    fn stream_hard_sync_plays_data_when_aligned() {
556        let f = fmt();
557        let mut stream = Stream::new(f);
558        stream.set_buffer_ms(1000);
559        stream.add_chunk(make_chunk(99, 0, 4800, f));
560        let server_now = 100_000_000i64;
561        let mut buf = vec![0u8; 480 * f.frame_size() as usize];
562        let result = stream.get_player_chunk(server_now, 0, &mut buf, 480);
563        assert!(result);
564        assert!(buf.iter().any(|&b| b != 0));
565    }
566
567    #[test]
568    fn set_real_sample_rate_correction() {
569        let f = fmt();
570        let mut stream = Stream::new(f);
571        stream.set_real_sample_rate(48000.0);
572        assert_eq!(stream.correct_after_x_frames, 0);
573
574        stream.set_real_sample_rate(47999.0);
575        assert_ne!(stream.correct_after_x_frames, 0);
576    }
577
578    #[test]
579    fn read_with_correction_remove_one_frame() {
580        let f = fmt(); // 48000:16:2, frame_size=4
581        let mut stream = Stream::new(f);
582
583        let mut data = Vec::new();
584        for i in 0..10u16 {
585            data.extend_from_slice(&i.to_le_bytes());
586            data.extend_from_slice(&(i + 100).to_le_bytes());
587        }
588        stream.add_chunk(make_chunk(100, 0, 10, f));
589        stream.chunks.back_mut().unwrap().data = data;
590        stream.current = stream.chunks.pop_front();
591
592        let mut output = vec![0u8; 9 * f.frame_size() as usize];
593        let ts = stream.read_with_correction(&mut output, 9, 1);
594        assert!(ts.is_some());
595        assert_eq!(output.len(), 36);
596        for (i, chunk) in output.chunks(4).enumerate() {
597            let left = u16::from_le_bytes([chunk[0], chunk[1]]);
598            assert!(left <= 10, "frame {i}: left={left}");
599        }
600    }
601
602    #[test]
603    fn read_with_correction_zero_is_passthrough() {
604        let f = fmt();
605        let mut stream = Stream::new(f);
606        stream.add_chunk(make_chunk(100, 0, 100, f));
607        stream.current = stream.chunks.pop_front();
608
609        let mut out1 = vec![0u8; 50 * f.frame_size() as usize];
610        stream.read_with_correction(&mut out1, 50, 0);
611
612        stream.add_chunk(make_chunk(100, 0, 100, f));
613        stream.current = stream.chunks.pop_front();
614
615        let mut out2 = vec![0u8; 50 * f.frame_size() as usize];
616        stream.read_next(&mut out2, 50);
617
618        assert_eq!(out1, out2);
619    }
620}