1use std::collections::VecDeque;
4
5use snapcast_proto::SampleFormat;
6use snapcast_proto::types::Timeval;
7
8use crate::double_buffer::DoubleBuffer;
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq)]
12pub enum SampleEncoding {
13 PcmInt,
15 Float32,
17}
18
19#[derive(Debug, Clone)]
21pub struct PcmChunk {
22 pub timestamp: Timeval,
24 pub data: Vec<u8>,
26 pub format: SampleFormat,
28 pub encoding: SampleEncoding,
30 read_pos: usize,
31}
32
33impl PcmChunk {
34 pub fn new(timestamp: Timeval, data: Vec<u8>, format: SampleFormat) -> Self {
36 Self::new_with_encoding(timestamp, data, format, SampleEncoding::PcmInt)
37 }
38
39 pub fn new_with_encoding(
41 timestamp: Timeval,
42 data: Vec<u8>,
43 format: SampleFormat,
44 encoding: SampleEncoding,
45 ) -> Self {
46 Self {
47 timestamp,
48 data,
49 format,
50 encoding,
51 read_pos: 0,
52 }
53 }
54
55 pub fn start_usec(&self) -> i64 {
57 self.timestamp.to_usec()
58 }
59
60 pub fn duration_usec(&self) -> i64 {
62 if self.format.frame_size() == 0 || self.format.rate() == 0 {
63 return 0;
64 }
65 let frames = self.data.len() as i64 / self.format.frame_size() as i64;
66 frames * 1_000_000 / self.format.rate() as i64
67 }
68
69 pub fn read_frames(&mut self, output: &mut [u8], frames: u32) -> u32 {
71 let frame_size = self.format.frame_size() as usize;
72 if frame_size == 0 {
73 return 0;
74 }
75 let available_bytes = self.data.len() - self.read_pos;
76 let available_frames = available_bytes / frame_size;
77 let to_read = (frames as usize).min(available_frames);
78 let bytes = to_read * frame_size;
79 output[..bytes].copy_from_slice(&self.data[self.read_pos..self.read_pos + bytes]);
80 self.read_pos += bytes;
81 to_read as u32
82 }
83
84 pub fn is_end(&self) -> bool {
86 self.read_pos >= self.data.len()
87 }
88
89 pub fn seek(&mut self, frames: u32) {
91 let bytes = frames as usize * self.format.frame_size() as usize;
92 self.read_pos = (self.read_pos + bytes).min(self.data.len());
93 }
94}
95
96const CORRECTION_BEGIN_USEC: i64 = 100;
98const HARD_SYNC_MEDIAN_USEC: i64 = 2000;
100const HARD_SYNC_SHORT_MEDIAN_USEC: i64 = 5000;
102const HARD_SYNC_MINI_MEDIAN_USEC: i64 = 50000;
104const HARD_SYNC_AGE_USEC: i64 = 500_000;
106const HARD_SYNC_MIN_AGE_USEC: i64 = 500;
108const SOFT_SYNC_MIN_USEC: i64 = 50;
110const MAX_RATE_CORRECTION: f64 = 0.0005;
112const RATE_CORRECTION_SCALE: f64 = 0.00005;
114const MINI_BUFFER_SIZE: usize = 20;
116const SHORT_BUFFER_SIZE: usize = 100;
118const BUFFER_SIZE: usize = 500;
120const DEFAULT_BUFFER_MS: i64 = 1000;
122
123pub struct Stream {
154 format: SampleFormat,
156 encoding: SampleEncoding,
158 chunks: VecDeque<PcmChunk>,
160 current: Option<PcmChunk>,
162 buffer_ms: i64,
164 hard_sync: bool,
166
167 mini_buffer: DoubleBuffer,
169 short_buffer: DoubleBuffer,
170 buffer: DoubleBuffer,
171 median: i64,
173 short_median: i64,
175
176 played_frames: u32,
179 correct_after_x_frames: i32,
181 frame_delta: i32,
183 read_buf: Vec<u8>,
185
186 last_log_sec: i64,
188}
189
190impl Stream {
191 pub fn new(format: SampleFormat) -> Self {
193 Self::with_encoding(format, SampleEncoding::PcmInt)
194 }
195
196 pub fn with_encoding(format: SampleFormat, encoding: SampleEncoding) -> Self {
198 Self {
199 format,
200 encoding,
201 chunks: VecDeque::new(),
202 current: None,
203 buffer_ms: DEFAULT_BUFFER_MS,
204 hard_sync: true,
205 mini_buffer: DoubleBuffer::new(MINI_BUFFER_SIZE),
206 short_buffer: DoubleBuffer::new(SHORT_BUFFER_SIZE),
207 buffer: DoubleBuffer::new(BUFFER_SIZE),
208 median: 0,
209 short_median: 0,
210 played_frames: 0,
211 correct_after_x_frames: 0,
212 frame_delta: 0,
213 read_buf: Vec::new(),
214 last_log_sec: 0,
215 }
216 }
217
218 pub fn format(&self) -> SampleFormat {
220 self.format
221 }
222
223 pub fn encoding(&self) -> SampleEncoding {
225 self.encoding
226 }
227
228 pub fn set_buffer_ms(&mut self, ms: i64) {
230 self.buffer_ms = ms;
231 }
232
233 pub fn add_chunk(&mut self, chunk: PcmChunk) {
235 self.chunks.push_back(chunk);
236 }
237
238 pub fn chunk_count(&self) -> usize {
240 self.chunks.len()
241 }
242
243 pub fn clear(&mut self) {
245 self.chunks.clear();
246 self.current = None;
247 self.hard_sync = true;
248 }
249
250 fn reset_buffers(&mut self) {
251 self.buffer.clear();
252 self.mini_buffer.clear();
253 self.short_buffer.clear();
254 }
255
256 fn update_buffers(&mut self, age: i64) {
257 self.buffer.add(age);
258 self.mini_buffer.add(age);
259 self.short_buffer.add(age);
260 }
261
262 fn set_real_sample_rate(&mut self, sample_rate: f64) {
263 let nominal = self.format.rate() as f64;
264 if (sample_rate - nominal).abs() < f64::EPSILON {
265 self.correct_after_x_frames = 0;
266 } else {
267 let ratio = nominal / sample_rate;
268 self.correct_after_x_frames = (ratio / (ratio - 1.0)).round() as i32;
269 }
270 }
271
272 pub fn get_player_chunk(
274 &mut self,
275 server_now_usec: i64,
276 output_buffer_dac_time_usec: i64,
277 output: &mut [u8],
278 frames: u32,
279 ) -> bool {
280 let needs_new = self.current.as_ref().is_none_or(|c| c.is_end());
281 if needs_new {
282 self.current = self.chunks.pop_front();
283 }
284 if self.current.is_none() {
285 return false;
286 }
287
288 if self.hard_sync {
290 let chunk = self.current.as_ref().unwrap();
291 let req_duration_usec = (frames as i64 * 1_000_000) / self.format.rate() as i64;
292 let age_usec = server_now_usec - chunk.start_usec() - self.buffer_ms * 1000
293 + output_buffer_dac_time_usec;
294
295 if age_usec < -req_duration_usec {
296 self.get_silence(output, frames);
297 return true;
298 }
299
300 if age_usec > 0 {
301 self.current = None;
302 while let Some(mut c) = self.chunks.pop_front() {
303 let a = server_now_usec - c.start_usec() - self.buffer_ms * 1000
304 + output_buffer_dac_time_usec;
305 if a > 0 && a < c.duration_usec() {
306 let skip = (self.format.rate() as f64 * a as f64 / 1_000_000.0) as u32;
307 c.seek(skip);
308 self.current = Some(c);
309 break;
310 } else if a <= 0 {
311 self.current = Some(c);
312 break;
313 }
314 }
315 if self.current.is_none() {
316 return false;
317 }
318 }
319
320 let chunk = self.current.as_ref().unwrap();
321 let age_usec = server_now_usec - chunk.start_usec() - self.buffer_ms * 1000
322 + output_buffer_dac_time_usec;
323
324 if age_usec <= 0 {
325 let silent_frames =
326 (self.format.rate() as f64 * (-age_usec) as f64 / 1_000_000.0) as u32;
327 let silent_frames = silent_frames.min(frames);
328 let frame_size = self.format.frame_size() as usize;
329
330 if silent_frames > 0 {
331 output[..silent_frames as usize * frame_size].fill(0);
332 }
333 let remaining = frames - silent_frames;
334 if remaining > 0 {
335 let offset = silent_frames as usize * frame_size;
336 self.read_next(&mut output[offset..], remaining);
337 }
338 if silent_frames < frames {
339 self.hard_sync = false;
340 self.reset_buffers();
341 }
342 return true;
343 }
344 return false;
345 }
346
347 let mut frames_correction: i32 = 0;
351 if self.correct_after_x_frames != 0 {
352 self.played_frames += frames;
353 if self.played_frames >= self.correct_after_x_frames.unsigned_abs() {
354 frames_correction = self.played_frames as i32 / self.correct_after_x_frames;
355 self.played_frames %= self.correct_after_x_frames.unsigned_abs();
356 }
357 }
358
359 let chunk_start = match self.read_with_correction(output, frames, frames_correction) {
361 Some(ts) => ts,
362 None => return false,
363 };
364
365 let age_usec =
366 server_now_usec - chunk_start - self.buffer_ms * 1000 + output_buffer_dac_time_usec;
367
368 self.set_real_sample_rate(self.format.rate() as f64);
370
371 if self.buffer.full()
373 && self.median.abs() > HARD_SYNC_MEDIAN_USEC
374 && age_usec.abs() > HARD_SYNC_MIN_AGE_USEC
375 {
376 tracing::info!(
377 median = self.median,
378 "Hard sync: buffer full, |median| > 2ms"
379 );
380 self.hard_sync = true;
381 } else if self.short_buffer.full()
382 && self.short_median.abs() > HARD_SYNC_SHORT_MEDIAN_USEC
383 && age_usec.abs() > HARD_SYNC_MIN_AGE_USEC
384 {
385 tracing::info!(
386 short_median = self.short_median,
387 "Hard sync: short buffer full, |short_median| > 5ms"
388 );
389 self.hard_sync = true;
390 } else if self.mini_buffer.full()
391 && self.mini_buffer.median_simple().abs() > HARD_SYNC_MINI_MEDIAN_USEC
392 && age_usec.abs() > HARD_SYNC_MIN_AGE_USEC
393 {
394 tracing::info!(
395 age_usec,
396 mini_median = self.mini_buffer.median_simple(),
397 "Hard sync: mini buffer full, |mini_median| > 50ms"
398 );
399 self.hard_sync = true;
400 } else if age_usec.abs() > HARD_SYNC_AGE_USEC {
401 tracing::info!(age_usec, "Hard sync: |age| > 500ms");
402 self.hard_sync = true;
403 } else if self.short_buffer.full() {
404 let mini_median = self.mini_buffer.median_simple();
406 if self.short_median > CORRECTION_BEGIN_USEC
407 && mini_median > SOFT_SYNC_MIN_USEC
408 && age_usec > SOFT_SYNC_MIN_USEC
409 {
410 let rate = (self.short_median as f64 / 100.0) * RATE_CORRECTION_SCALE;
411 let rate = 1.0 - rate.min(MAX_RATE_CORRECTION);
412 self.set_real_sample_rate(self.format.rate() as f64 * rate);
413 } else if self.short_median < -CORRECTION_BEGIN_USEC
414 && mini_median < -SOFT_SYNC_MIN_USEC
415 && age_usec < -SOFT_SYNC_MIN_USEC
416 {
417 let rate = (-self.short_median as f64 / 100.0) * RATE_CORRECTION_SCALE;
418 let rate = 1.0 + rate.min(MAX_RATE_CORRECTION);
419 self.set_real_sample_rate(self.format.rate() as f64 * rate);
420 }
421 }
422
423 self.update_buffers(age_usec);
424
425 let now_sec = server_now_usec / 1_000_000;
427 if now_sec != self.last_log_sec {
428 self.last_log_sec = now_sec;
429 self.median = self.buffer.median_simple();
430 self.short_median = self.short_buffer.median_simple();
431 tracing::debug!(
432 target: "Stats",
433 "Chunk: {}\t{}\t{}\t{}\t{}\t{}\t{}",
434 age_usec,
435 self.mini_buffer.median_simple(),
436 self.short_median,
437 self.median,
438 self.buffer.len(),
439 output_buffer_dac_time_usec / 1000,
440 self.frame_delta,
441 );
442 self.frame_delta = 0;
443 }
444
445 age_usec.abs() < 500_000
446 }
447
448 pub fn get_silence(&self, output: &mut [u8], frames: u32) {
450 let bytes = frames as usize * self.format.frame_size() as usize;
451 let len = bytes.min(output.len());
452 output[..len].fill(0);
453 }
454
455 pub fn get_player_chunk_or_silence(
457 &mut self,
458 server_now_usec: i64,
459 output_buffer_dac_time_usec: i64,
460 output: &mut [u8],
461 frames: u32,
462 ) -> bool {
463 let result =
464 self.get_player_chunk(server_now_usec, output_buffer_dac_time_usec, output, frames);
465 if !result {
466 self.get_silence(output, frames);
467 }
468 result
469 }
470
471 fn read_next(&mut self, output: &mut [u8], frames: u32) -> Option<i64> {
472 let chunk = self.current.as_mut()?;
473 let frame_size = self.format.frame_size() as usize;
475 let consumed_frames = chunk.read_pos / frame_size;
476 let ts =
477 chunk.start_usec() + consumed_frames as i64 * 1_000_000 / self.format.rate() as i64;
478 let mut read = 0u32;
479 while read < frames {
480 let offset = read as usize * frame_size;
481 let n = chunk.read_frames(&mut output[offset..], frames - read);
482 read += n;
483 if read < frames && chunk.is_end() {
484 match self.chunks.pop_front() {
485 Some(next) => *chunk = next,
486 None => break,
487 }
488 }
489 }
490 Some(ts)
491 }
492
493 fn read_with_correction(
494 &mut self,
495 output: &mut [u8],
496 frames: u32,
497 correction: i32,
498 ) -> Option<i64> {
499 if correction == 0 {
500 return self.read_next(output, frames);
501 }
502
503 let correction = correction.max(-(frames as i32) + 1);
505
506 self.frame_delta -= correction;
507 let to_read = (frames as i32 + correction) as u32;
508 let frame_size = self.format.frame_size() as usize;
509
510 self.read_buf.resize(to_read as usize * frame_size, 0);
511 let mut read_buf = std::mem::take(&mut self.read_buf);
512 let ts = self.read_next(&mut read_buf, to_read);
513
514 let max = if correction < 0 {
515 frames as usize
516 } else {
517 to_read as usize
518 };
519 let slices = (correction.unsigned_abs() as usize + 1).min(max);
520 let slice_size = max / slices;
521
522 let mut pos = 0usize;
523 for n in 0..slices {
524 let size = if n + 1 == slices {
525 max - pos
526 } else {
527 slice_size
528 };
529
530 if correction < 0 {
531 let src_start = (pos - n) * frame_size;
532 let dst_start = pos * frame_size;
533 let len = size * frame_size;
534 output[dst_start..dst_start + len]
535 .copy_from_slice(&read_buf[src_start..src_start + len]);
536 } else {
537 let src_start = pos * frame_size;
538 let dst_start = (pos - n) * frame_size;
539 let len = size * frame_size;
540 output[dst_start..dst_start + len]
541 .copy_from_slice(&read_buf[src_start..src_start + len]);
542 }
543 pos += size;
544 }
545
546 self.read_buf = read_buf;
547 ts
548 }
549}
550
551#[cfg(test)]
552mod tests {
553 use super::*;
554
555 fn fmt() -> SampleFormat {
556 SampleFormat::new(48000, 16, 2)
557 }
558
559 fn make_chunk(sec: i32, usec: i32, frames: u32, format: SampleFormat) -> PcmChunk {
560 let bytes = frames as usize * format.frame_size() as usize;
561 let data: Vec<u8> = (0..bytes).map(|i| (i % 256) as u8).collect();
562 PcmChunk::new(Timeval { sec, usec }, data, format)
563 }
564
565 #[test]
566 fn pcm_chunk_duration() {
567 let f = fmt();
568 let chunk = make_chunk(0, 0, 480, f);
569 assert_eq!(chunk.duration_usec(), 10_000);
570 }
571
572 #[test]
573 fn pcm_chunk_read_frames() {
574 let f = fmt();
575 let mut chunk = make_chunk(0, 0, 100, f);
576 let mut buf = vec![0u8; 50 * f.frame_size() as usize];
577 let read = chunk.read_frames(&mut buf, 50);
578 assert_eq!(read, 50);
579 assert!(!chunk.is_end());
580 let read = chunk.read_frames(&mut buf, 50);
581 assert_eq!(read, 50);
582 assert!(chunk.is_end());
583 }
584
585 #[test]
586 fn pcm_chunk_seek() {
587 let f = fmt();
588 let mut chunk = make_chunk(0, 0, 100, f);
589 chunk.seek(90);
590 let mut buf = vec![0u8; 100 * f.frame_size() as usize];
591 let read = chunk.read_frames(&mut buf, 100);
592 assert_eq!(read, 10);
593 }
594
595 #[test]
596 fn stream_add_and_count() {
597 let f = fmt();
598 let mut stream = Stream::new(f);
599 assert_eq!(stream.chunk_count(), 0);
600 stream.add_chunk(make_chunk(100, 0, 480, f));
601 stream.add_chunk(make_chunk(100, 10_000, 480, f));
602 assert_eq!(stream.chunk_count(), 2);
603 }
604
605 #[test]
606 fn stream_clear() {
607 let f = fmt();
608 let mut stream = Stream::new(f);
609 stream.add_chunk(make_chunk(100, 0, 480, f));
610 stream.clear();
611 assert_eq!(stream.chunk_count(), 0);
612 }
613
614 #[test]
615 fn stream_silence_when_empty() {
616 let f = fmt();
617 let mut stream = Stream::new(f);
618 let mut buf = vec![0xFFu8; 480 * f.frame_size() as usize];
619 let result = stream.get_player_chunk(100_000_000, 0, &mut buf, 480);
620 assert!(!result);
621 }
622
623 #[test]
624 fn stream_hard_sync_plays_silence_when_too_early() {
625 let f = fmt();
626 let mut stream = Stream::new(f);
627 stream.set_buffer_ms(1000);
628 stream.add_chunk(make_chunk(100, 0, 4800, f));
629 let server_now = 100_000_000i64;
630 let mut buf = vec![0xFFu8; 480 * f.frame_size() as usize];
631 let result = stream.get_player_chunk(server_now, 0, &mut buf, 480);
632 assert!(result);
633 assert!(buf.iter().all(|&b| b == 0));
634 }
635
636 #[test]
637 fn stream_hard_sync_plays_data_when_aligned() {
638 let f = fmt();
639 let mut stream = Stream::new(f);
640 stream.set_buffer_ms(1000);
641 stream.add_chunk(make_chunk(99, 0, 4800, f));
642 let server_now = 100_000_000i64;
643 let mut buf = vec![0u8; 480 * f.frame_size() as usize];
644 let result = stream.get_player_chunk(server_now, 0, &mut buf, 480);
645 assert!(result);
646 assert!(buf.iter().any(|&b| b != 0));
647 }
648
649 #[test]
650 fn set_real_sample_rate_correction() {
651 let f = fmt();
652 let mut stream = Stream::new(f);
653 stream.set_real_sample_rate(48000.0);
654 assert_eq!(stream.correct_after_x_frames, 0);
655
656 stream.set_real_sample_rate(47999.0);
657 assert_ne!(stream.correct_after_x_frames, 0);
658 }
659
660 #[test]
661 fn read_with_correction_remove_one_frame() {
662 let f = fmt(); let mut stream = Stream::new(f);
664
665 let mut data = Vec::new();
666 for i in 0..10u16 {
667 data.extend_from_slice(&i.to_le_bytes());
668 data.extend_from_slice(&(i + 100).to_le_bytes());
669 }
670 stream.add_chunk(make_chunk(100, 0, 10, f));
671 stream.chunks.back_mut().unwrap().data = data;
672 stream.current = stream.chunks.pop_front();
673
674 let mut output = vec![0u8; 9 * f.frame_size() as usize];
675 let ts = stream.read_with_correction(&mut output, 9, 1);
676 assert!(ts.is_some());
677 assert_eq!(output.len(), 36);
678 for (i, chunk) in output.chunks(4).enumerate() {
679 let left = u16::from_le_bytes([chunk[0], chunk[1]]);
680 assert!(left <= 10, "frame {i}: left={left}");
681 }
682 }
683
684 #[test]
685 fn read_with_correction_zero_is_passthrough() {
686 let f = fmt();
687 let mut stream = Stream::new(f);
688 stream.add_chunk(make_chunk(100, 0, 100, f));
689 stream.current = stream.chunks.pop_front();
690
691 let mut out1 = vec![0u8; 50 * f.frame_size() as usize];
692 stream.read_with_correction(&mut out1, 50, 0);
693
694 stream.add_chunk(make_chunk(100, 0, 100, f));
695 stream.current = stream.chunks.pop_front();
696
697 let mut out2 = vec![0u8; 50 * f.frame_size() as usize];
698 stream.read_next(&mut out2, 50);
699
700 assert_eq!(out1, out2);
701 }
702}