1use std::collections::VecDeque;
4
5use snapcast_proto::SampleFormat;
6use snapcast_proto::types::Timeval;
7
8use crate::double_buffer::DoubleBuffer;
9
10#[derive(Debug, Clone)]
12pub struct PcmChunk {
13 pub timestamp: Timeval,
15 pub data: Vec<u8>,
17 pub format: SampleFormat,
19 read_pos: usize,
20}
21
22impl PcmChunk {
23 pub fn new(timestamp: Timeval, data: Vec<u8>, format: SampleFormat) -> Self {
25 Self {
26 timestamp,
27 data,
28 format,
29 read_pos: 0,
30 }
31 }
32
33 pub fn start_usec(&self) -> i64 {
35 self.timestamp.to_usec()
36 }
37
38 pub fn duration_usec(&self) -> i64 {
40 if self.format.frame_size() == 0 || self.format.rate() == 0 {
41 return 0;
42 }
43 let frames = self.data.len() as i64 / self.format.frame_size() as i64;
44 frames * 1_000_000 / self.format.rate() as i64
45 }
46
47 pub fn read_frames(&mut self, output: &mut [u8], frames: u32) -> u32 {
49 let frame_size = self.format.frame_size() as usize;
50 let available_bytes = self.data.len() - self.read_pos;
51 let available_frames = available_bytes / frame_size;
52 let to_read = (frames as usize).min(available_frames);
53 let bytes = to_read * frame_size;
54 output[..bytes].copy_from_slice(&self.data[self.read_pos..self.read_pos + bytes]);
55 self.read_pos += bytes;
56 to_read as u32
57 }
58
59 pub fn is_end(&self) -> bool {
61 self.read_pos >= self.data.len()
62 }
63
64 pub fn seek(&mut self, frames: u32) {
66 let bytes = frames as usize * self.format.frame_size() as usize;
67 self.read_pos = (self.read_pos + bytes).min(self.data.len());
68 }
69}
70
71const CORRECTION_BEGIN_USEC: i64 = 100;
73const HARD_SYNC_MEDIAN_USEC: i64 = 2000;
75const HARD_SYNC_SHORT_MEDIAN_USEC: i64 = 5000;
77const HARD_SYNC_MINI_MEDIAN_USEC: i64 = 50000;
79const HARD_SYNC_AGE_USEC: i64 = 500_000;
81const HARD_SYNC_MIN_AGE_USEC: i64 = 500;
83const SOFT_SYNC_MIN_USEC: i64 = 50;
85const MAX_RATE_CORRECTION: f64 = 0.0005;
87const RATE_CORRECTION_SCALE: f64 = 0.00005;
89const MINI_BUFFER_SIZE: usize = 20;
91const SHORT_BUFFER_SIZE: usize = 100;
93const BUFFER_SIZE: usize = 500;
95const DEFAULT_BUFFER_MS: i64 = 1000;
97
98pub struct Stream {
129 format: SampleFormat,
131 chunks: VecDeque<PcmChunk>,
133 current: Option<PcmChunk>,
135 buffer_ms: i64,
137 hard_sync: bool,
139
140 mini_buffer: DoubleBuffer,
142 short_buffer: DoubleBuffer,
143 buffer: DoubleBuffer,
144 median: i64,
146 short_median: i64,
148
149 played_frames: u32,
152 correct_after_x_frames: i32,
154 frame_delta: i32,
156 read_buf: Vec<u8>,
158
159 last_log_sec: i64,
161}
162
163impl Stream {
164 pub fn new(format: SampleFormat) -> Self {
166 Self {
167 format,
168 chunks: VecDeque::new(),
169 current: None,
170 buffer_ms: DEFAULT_BUFFER_MS,
171 hard_sync: true,
172 mini_buffer: DoubleBuffer::new(MINI_BUFFER_SIZE),
173 short_buffer: DoubleBuffer::new(SHORT_BUFFER_SIZE),
174 buffer: DoubleBuffer::new(BUFFER_SIZE),
175 median: 0,
176 short_median: 0,
177 played_frames: 0,
178 correct_after_x_frames: 0,
179 frame_delta: 0,
180 read_buf: Vec::new(),
181 last_log_sec: 0,
182 }
183 }
184
185 pub fn format(&self) -> SampleFormat {
187 self.format
188 }
189
190 pub fn set_buffer_ms(&mut self, ms: i64) {
192 self.buffer_ms = ms;
193 }
194
195 pub fn add_chunk(&mut self, chunk: PcmChunk) {
197 self.chunks.push_back(chunk);
198 }
199
200 pub fn chunk_count(&self) -> usize {
202 self.chunks.len()
203 }
204
205 pub fn clear(&mut self) {
207 self.chunks.clear();
208 self.current = None;
209 self.hard_sync = true;
210 }
211
212 fn reset_buffers(&mut self) {
213 self.buffer.clear();
214 self.mini_buffer.clear();
215 self.short_buffer.clear();
216 }
217
218 fn update_buffers(&mut self, age: i64) {
219 self.buffer.add(age);
220 self.mini_buffer.add(age);
221 self.short_buffer.add(age);
222 }
223
224 fn set_real_sample_rate(&mut self, sample_rate: f64) {
225 let nominal = self.format.rate() as f64;
226 if (sample_rate - nominal).abs() < f64::EPSILON {
227 self.correct_after_x_frames = 0;
228 } else {
229 let ratio = nominal / sample_rate;
230 self.correct_after_x_frames = (ratio / (ratio - 1.0)).round() as i32;
231 }
232 }
233
234 pub fn get_player_chunk(
236 &mut self,
237 server_now_usec: i64,
238 output_buffer_dac_time_usec: i64,
239 output: &mut [u8],
240 frames: u32,
241 ) -> bool {
242 let needs_new = self.current.as_ref().is_none_or(|c| c.is_end());
243 if needs_new {
244 self.current = self.chunks.pop_front();
245 }
246 if self.current.is_none() {
247 return false;
248 }
249
250 if self.hard_sync {
252 let chunk = self.current.as_ref().unwrap();
253 let req_duration_usec = (frames as i64 * 1_000_000) / self.format.rate() as i64;
254 let age_usec = server_now_usec - chunk.start_usec() - self.buffer_ms * 1000
255 + output_buffer_dac_time_usec;
256
257 if age_usec < -req_duration_usec {
258 self.get_silence(output, frames);
259 return true;
260 }
261
262 if age_usec > 0 {
263 self.current = None;
264 while let Some(mut c) = self.chunks.pop_front() {
265 let a = server_now_usec - c.start_usec() - self.buffer_ms * 1000
266 + output_buffer_dac_time_usec;
267 if a > 0 && a < c.duration_usec() {
268 let skip = (self.format.rate() as f64 * a as f64 / 1_000_000.0) as u32;
269 c.seek(skip);
270 self.current = Some(c);
271 break;
272 } else if a <= 0 {
273 self.current = Some(c);
274 break;
275 }
276 }
277 if self.current.is_none() {
278 return false;
279 }
280 }
281
282 let chunk = self.current.as_ref().unwrap();
283 let age_usec = server_now_usec - chunk.start_usec() - self.buffer_ms * 1000
284 + output_buffer_dac_time_usec;
285
286 if age_usec <= 0 {
287 let silent_frames =
288 (self.format.rate() as f64 * (-age_usec) as f64 / 1_000_000.0) as u32;
289 let silent_frames = silent_frames.min(frames);
290 let frame_size = self.format.frame_size() as usize;
291
292 if silent_frames > 0 {
293 output[..silent_frames as usize * frame_size].fill(0);
294 }
295 let remaining = frames - silent_frames;
296 if remaining > 0 {
297 let offset = silent_frames as usize * frame_size;
298 self.read_next(&mut output[offset..], remaining);
299 }
300 if silent_frames < frames {
301 self.hard_sync = false;
302 self.reset_buffers();
303 }
304 return true;
305 }
306 return false;
307 }
308
309 let mut frames_correction: i32 = 0;
313 if self.correct_after_x_frames != 0 {
314 self.played_frames += frames;
315 if self.played_frames >= self.correct_after_x_frames.unsigned_abs() {
316 frames_correction = self.played_frames as i32 / self.correct_after_x_frames;
317 self.played_frames %= self.correct_after_x_frames.unsigned_abs();
318 }
319 }
320
321 let chunk_start = match self.read_with_correction(output, frames, frames_correction) {
323 Some(ts) => ts,
324 None => return false,
325 };
326
327 let age_usec =
328 server_now_usec - chunk_start - self.buffer_ms * 1000 + output_buffer_dac_time_usec;
329
330 self.set_real_sample_rate(self.format.rate() as f64);
332
333 if self.buffer.full()
335 && self.median.abs() > HARD_SYNC_MEDIAN_USEC
336 && age_usec.abs() > HARD_SYNC_MIN_AGE_USEC
337 {
338 tracing::info!(
339 median = self.median,
340 "Hard sync: buffer full, |median| > 2ms"
341 );
342 self.hard_sync = true;
343 } else if self.short_buffer.full()
344 && self.short_median.abs() > HARD_SYNC_SHORT_MEDIAN_USEC
345 && age_usec.abs() > HARD_SYNC_MIN_AGE_USEC
346 {
347 tracing::info!(
348 short_median = self.short_median,
349 "Hard sync: short buffer full, |short_median| > 5ms"
350 );
351 self.hard_sync = true;
352 } else if self.mini_buffer.full()
353 && self.mini_buffer.median_simple().abs() > HARD_SYNC_MINI_MEDIAN_USEC
354 && age_usec.abs() > HARD_SYNC_MIN_AGE_USEC
355 {
356 tracing::info!(
357 age_usec,
358 mini_median = self.mini_buffer.median_simple(),
359 "Hard sync: mini buffer full, |mini_median| > 50ms"
360 );
361 self.hard_sync = true;
362 } else if age_usec.abs() > HARD_SYNC_AGE_USEC {
363 tracing::info!(age_usec, "Hard sync: |age| > 500ms");
364 self.hard_sync = true;
365 } else if self.short_buffer.full() {
366 let mini_median = self.mini_buffer.median_simple();
368 if self.short_median > CORRECTION_BEGIN_USEC
369 && mini_median > SOFT_SYNC_MIN_USEC
370 && age_usec > SOFT_SYNC_MIN_USEC
371 {
372 let rate = (self.short_median as f64 / 100.0) * RATE_CORRECTION_SCALE;
373 let rate = 1.0 - rate.min(MAX_RATE_CORRECTION);
374 self.set_real_sample_rate(self.format.rate() as f64 * rate);
375 } else if self.short_median < -CORRECTION_BEGIN_USEC
376 && mini_median < -SOFT_SYNC_MIN_USEC
377 && age_usec < -SOFT_SYNC_MIN_USEC
378 {
379 let rate = (-self.short_median as f64 / 100.0) * RATE_CORRECTION_SCALE;
380 let rate = 1.0 + rate.min(MAX_RATE_CORRECTION);
381 self.set_real_sample_rate(self.format.rate() as f64 * rate);
382 }
383 }
384
385 self.update_buffers(age_usec);
386
387 let now_sec = server_now_usec / 1_000_000;
389 if now_sec != self.last_log_sec {
390 self.last_log_sec = now_sec;
391 self.median = self.buffer.median_simple();
392 self.short_median = self.short_buffer.median_simple();
393 tracing::debug!(
394 target: "Stats",
395 "Chunk: {}\t{}\t{}\t{}\t{}\t{}\t{}",
396 age_usec,
397 self.mini_buffer.median_simple(),
398 self.short_median,
399 self.median,
400 self.buffer.len(),
401 output_buffer_dac_time_usec / 1000,
402 self.frame_delta,
403 );
404 self.frame_delta = 0;
405 }
406
407 age_usec.abs() < 500_000
408 }
409
410 pub fn get_silence(&self, output: &mut [u8], frames: u32) {
412 let bytes = frames as usize * self.format.frame_size() as usize;
413 let len = bytes.min(output.len());
414 output[..len].fill(0);
415 }
416
417 pub fn get_player_chunk_or_silence(
419 &mut self,
420 server_now_usec: i64,
421 output_buffer_dac_time_usec: i64,
422 output: &mut [u8],
423 frames: u32,
424 ) -> bool {
425 let result =
426 self.get_player_chunk(server_now_usec, output_buffer_dac_time_usec, output, frames);
427 if !result {
428 self.get_silence(output, frames);
429 }
430 result
431 }
432
433 fn read_next(&mut self, output: &mut [u8], frames: u32) -> Option<i64> {
434 let chunk = self.current.as_mut()?;
435 let frame_size = self.format.frame_size() as usize;
437 let consumed_frames = chunk.read_pos / frame_size;
438 let ts =
439 chunk.start_usec() + consumed_frames as i64 * 1_000_000 / self.format.rate() as i64;
440 let mut read = 0u32;
441 while read < frames {
442 let offset = read as usize * frame_size;
443 let n = chunk.read_frames(&mut output[offset..], frames - read);
444 read += n;
445 if read < frames && chunk.is_end() {
446 match self.chunks.pop_front() {
447 Some(next) => *chunk = next,
448 None => break,
449 }
450 }
451 }
452 Some(ts)
453 }
454
455 fn read_with_correction(
456 &mut self,
457 output: &mut [u8],
458 frames: u32,
459 correction: i32,
460 ) -> Option<i64> {
461 if correction == 0 {
462 return self.read_next(output, frames);
463 }
464
465 let correction = correction.max(-(frames as i32) + 1);
467
468 self.frame_delta -= correction;
469 let to_read = (frames as i32 + correction) as u32;
470 let frame_size = self.format.frame_size() as usize;
471
472 self.read_buf.resize(to_read as usize * frame_size, 0);
473 let mut read_buf = std::mem::take(&mut self.read_buf);
474 let ts = self.read_next(&mut read_buf, to_read);
475
476 let max = if correction < 0 {
477 frames as usize
478 } else {
479 to_read as usize
480 };
481 let slices = (correction.unsigned_abs() as usize + 1).min(max);
482 let slice_size = max / slices;
483
484 let mut pos = 0usize;
485 for n in 0..slices {
486 let size = if n + 1 == slices {
487 max - pos
488 } else {
489 slice_size
490 };
491
492 if correction < 0 {
493 let src_start = (pos - n) * frame_size;
494 let dst_start = pos * frame_size;
495 let len = size * frame_size;
496 output[dst_start..dst_start + len]
497 .copy_from_slice(&read_buf[src_start..src_start + len]);
498 } else {
499 let src_start = pos * frame_size;
500 let dst_start = (pos - n) * frame_size;
501 let len = size * frame_size;
502 output[dst_start..dst_start + len]
503 .copy_from_slice(&read_buf[src_start..src_start + len]);
504 }
505 pos += size;
506 }
507
508 self.read_buf = read_buf;
509 ts
510 }
511}
512
513#[cfg(test)]
514mod tests {
515 use super::*;
516
517 fn fmt() -> SampleFormat {
518 SampleFormat::new(48000, 16, 2)
519 }
520
521 fn make_chunk(sec: i32, usec: i32, frames: u32, format: SampleFormat) -> PcmChunk {
522 let bytes = frames as usize * format.frame_size() as usize;
523 let data: Vec<u8> = (0..bytes).map(|i| (i % 256) as u8).collect();
524 PcmChunk::new(Timeval { sec, usec }, data, format)
525 }
526
527 #[test]
528 fn pcm_chunk_duration() {
529 let f = fmt();
530 let chunk = make_chunk(0, 0, 480, f);
531 assert_eq!(chunk.duration_usec(), 10_000);
532 }
533
534 #[test]
535 fn pcm_chunk_read_frames() {
536 let f = fmt();
537 let mut chunk = make_chunk(0, 0, 100, f);
538 let mut buf = vec![0u8; 50 * f.frame_size() as usize];
539 let read = chunk.read_frames(&mut buf, 50);
540 assert_eq!(read, 50);
541 assert!(!chunk.is_end());
542 let read = chunk.read_frames(&mut buf, 50);
543 assert_eq!(read, 50);
544 assert!(chunk.is_end());
545 }
546
547 #[test]
548 fn pcm_chunk_seek() {
549 let f = fmt();
550 let mut chunk = make_chunk(0, 0, 100, f);
551 chunk.seek(90);
552 let mut buf = vec![0u8; 100 * f.frame_size() as usize];
553 let read = chunk.read_frames(&mut buf, 100);
554 assert_eq!(read, 10);
555 }
556
557 #[test]
558 fn stream_add_and_count() {
559 let f = fmt();
560 let mut stream = Stream::new(f);
561 assert_eq!(stream.chunk_count(), 0);
562 stream.add_chunk(make_chunk(100, 0, 480, f));
563 stream.add_chunk(make_chunk(100, 10_000, 480, f));
564 assert_eq!(stream.chunk_count(), 2);
565 }
566
567 #[test]
568 fn stream_clear() {
569 let f = fmt();
570 let mut stream = Stream::new(f);
571 stream.add_chunk(make_chunk(100, 0, 480, f));
572 stream.clear();
573 assert_eq!(stream.chunk_count(), 0);
574 }
575
576 #[test]
577 fn stream_silence_when_empty() {
578 let f = fmt();
579 let mut stream = Stream::new(f);
580 let mut buf = vec![0xFFu8; 480 * f.frame_size() as usize];
581 let result = stream.get_player_chunk(100_000_000, 0, &mut buf, 480);
582 assert!(!result);
583 }
584
585 #[test]
586 fn stream_hard_sync_plays_silence_when_too_early() {
587 let f = fmt();
588 let mut stream = Stream::new(f);
589 stream.set_buffer_ms(1000);
590 stream.add_chunk(make_chunk(100, 0, 4800, f));
591 let server_now = 100_000_000i64;
592 let mut buf = vec![0xFFu8; 480 * f.frame_size() as usize];
593 let result = stream.get_player_chunk(server_now, 0, &mut buf, 480);
594 assert!(result);
595 assert!(buf.iter().all(|&b| b == 0));
596 }
597
598 #[test]
599 fn stream_hard_sync_plays_data_when_aligned() {
600 let f = fmt();
601 let mut stream = Stream::new(f);
602 stream.set_buffer_ms(1000);
603 stream.add_chunk(make_chunk(99, 0, 4800, f));
604 let server_now = 100_000_000i64;
605 let mut buf = vec![0u8; 480 * f.frame_size() as usize];
606 let result = stream.get_player_chunk(server_now, 0, &mut buf, 480);
607 assert!(result);
608 assert!(buf.iter().any(|&b| b != 0));
609 }
610
611 #[test]
612 fn set_real_sample_rate_correction() {
613 let f = fmt();
614 let mut stream = Stream::new(f);
615 stream.set_real_sample_rate(48000.0);
616 assert_eq!(stream.correct_after_x_frames, 0);
617
618 stream.set_real_sample_rate(47999.0);
619 assert_ne!(stream.correct_after_x_frames, 0);
620 }
621
622 #[test]
623 fn read_with_correction_remove_one_frame() {
624 let f = fmt(); let mut stream = Stream::new(f);
626
627 let mut data = Vec::new();
628 for i in 0..10u16 {
629 data.extend_from_slice(&i.to_le_bytes());
630 data.extend_from_slice(&(i + 100).to_le_bytes());
631 }
632 stream.add_chunk(make_chunk(100, 0, 10, f));
633 stream.chunks.back_mut().unwrap().data = data;
634 stream.current = stream.chunks.pop_front();
635
636 let mut output = vec![0u8; 9 * f.frame_size() as usize];
637 let ts = stream.read_with_correction(&mut output, 9, 1);
638 assert!(ts.is_some());
639 assert_eq!(output.len(), 36);
640 for (i, chunk) in output.chunks(4).enumerate() {
641 let left = u16::from_le_bytes([chunk[0], chunk[1]]);
642 assert!(left <= 10, "frame {i}: left={left}");
643 }
644 }
645
646 #[test]
647 fn read_with_correction_zero_is_passthrough() {
648 let f = fmt();
649 let mut stream = Stream::new(f);
650 stream.add_chunk(make_chunk(100, 0, 100, f));
651 stream.current = stream.chunks.pop_front();
652
653 let mut out1 = vec![0u8; 50 * f.frame_size() as usize];
654 stream.read_with_correction(&mut out1, 50, 0);
655
656 stream.add_chunk(make_chunk(100, 0, 100, f));
657 stream.current = stream.chunks.pop_front();
658
659 let mut out2 = vec![0u8; 50 * f.frame_size() as usize];
660 stream.read_next(&mut out2, 50);
661
662 assert_eq!(out1, out2);
663 }
664}