1use std::collections::VecDeque;
4
5use snapcast_proto::SampleFormat;
6use snapcast_proto::types::Timeval;
7
8use crate::double_buffer::DoubleBuffer;
9
10#[derive(Debug, Clone)]
12pub struct PcmChunk {
13 pub timestamp: Timeval,
15 pub data: Vec<u8>,
17 pub format: SampleFormat,
19 read_pos: usize,
20}
21
22impl PcmChunk {
23 pub fn new(timestamp: Timeval, data: Vec<u8>, format: SampleFormat) -> Self {
25 Self {
26 timestamp,
27 data,
28 format,
29 read_pos: 0,
30 }
31 }
32
33 pub fn start_usec(&self) -> i64 {
35 self.timestamp.to_usec()
36 }
37
38 pub fn duration_usec(&self) -> i64 {
40 if self.format.frame_size() == 0 || self.format.rate() == 0 {
41 return 0;
42 }
43 let frames = self.data.len() as i64 / self.format.frame_size() as i64;
44 frames * 1_000_000 / self.format.rate() as i64
45 }
46
47 pub fn read_frames(&mut self, output: &mut [u8], frames: u32) -> u32 {
49 let frame_size = self.format.frame_size() as usize;
50 let available_bytes = self.data.len() - self.read_pos;
51 let available_frames = available_bytes / frame_size;
52 let to_read = (frames as usize).min(available_frames);
53 let bytes = to_read * frame_size;
54 output[..bytes].copy_from_slice(&self.data[self.read_pos..self.read_pos + bytes]);
55 self.read_pos += bytes;
56 to_read as u32
57 }
58
59 pub fn is_end(&self) -> bool {
61 self.read_pos >= self.data.len()
62 }
63
64 pub fn seek(&mut self, frames: u32) {
66 let bytes = frames as usize * self.format.frame_size() as usize;
67 self.read_pos = (self.read_pos + bytes).min(self.data.len());
68 }
69}
70
// Tuning constants for `Stream::get_player_chunk`. All `*_USEC` values are
// compared against chunk "age" values or medians of them, in microseconds.

/// Soft rate correction is only considered once `|short_median|` exceeds this.
const CORRECTION_BEGIN_USEC: i64 = 100;
/// Hard sync is requested when the long buffer is full and `|median|` exceeds this (2 ms).
const HARD_SYNC_MEDIAN_USEC: i64 = 2_000;
/// Hard sync is requested when the short buffer is full and `|short_median|` exceeds this (5 ms).
const HARD_SYNC_SHORT_MEDIAN_USEC: i64 = 5_000;
/// Hard sync is requested when the mini buffer is full and its median magnitude exceeds this (50 ms).
const HARD_SYNC_MINI_MEDIAN_USEC: i64 = 50_000;
/// Hard sync is requested whenever a single `|age|` exceeds this (500 ms).
const HARD_SYNC_AGE_USEC: i64 = 500_000;
/// The median-based hard-sync triggers additionally require `|age|` above this.
const HARD_SYNC_MIN_AGE_USEC: i64 = 500;
/// Soft correction requires both the mini median and the current age to be
/// beyond this value, on the same side as the short median.
const SOFT_SYNC_MIN_USEC: i64 = 50;
/// Upper bound on the fractional sample-rate change applied by soft correction.
const MAX_RATE_CORRECTION: f64 = 0.0005;
/// Fractional rate change applied per 100 µs of short-median drift.
const RATE_CORRECTION_SCALE: f64 = 0.00005;
/// Size passed to `DoubleBuffer::new` for the mini statistics buffer.
const MINI_BUFFER_SIZE: usize = 20;
/// Size passed to `DoubleBuffer::new` for the short statistics buffer.
const SHORT_BUFFER_SIZE: usize = 100;
/// Size passed to `DoubleBuffer::new` for the long statistics buffer.
const BUFFER_SIZE: usize = 500;
/// Initial value of `Stream::buffer_ms`, in milliseconds.
const DEFAULT_BUFFER_MS: i64 = 1_000;
97
98pub struct Stream {
100 format: SampleFormat,
101 chunks: VecDeque<PcmChunk>,
102 current: Option<PcmChunk>,
103 buffer_ms: i64,
104 hard_sync: bool,
105
106 mini_buffer: DoubleBuffer,
108 short_buffer: DoubleBuffer,
109 buffer: DoubleBuffer,
110 median: i64,
111 short_median: i64,
112
113 played_frames: u32,
115 correct_after_x_frames: i32,
116 frame_delta: i32,
117 read_buf: Vec<u8>,
118
119 last_log_sec: i64,
121}
122
123impl Stream {
124 pub fn new(format: SampleFormat) -> Self {
126 Self {
127 format,
128 chunks: VecDeque::new(),
129 current: None,
130 buffer_ms: DEFAULT_BUFFER_MS,
131 hard_sync: true,
132 mini_buffer: DoubleBuffer::new(MINI_BUFFER_SIZE),
133 short_buffer: DoubleBuffer::new(SHORT_BUFFER_SIZE),
134 buffer: DoubleBuffer::new(BUFFER_SIZE),
135 median: 0,
136 short_median: 0,
137 played_frames: 0,
138 correct_after_x_frames: 0,
139 frame_delta: 0,
140 read_buf: Vec::new(),
141 last_log_sec: 0,
142 }
143 }
144
145 pub fn format(&self) -> SampleFormat {
147 self.format
148 }
149
150 pub fn set_buffer_ms(&mut self, ms: i64) {
152 self.buffer_ms = ms;
153 }
154
155 pub fn add_chunk(&mut self, chunk: PcmChunk) {
157 self.chunks.push_back(chunk);
158 }
159
160 pub fn chunk_count(&self) -> usize {
162 self.chunks.len()
163 }
164
165 pub fn clear(&mut self) {
167 self.chunks.clear();
168 self.current = None;
169 self.hard_sync = true;
170 }
171
172 fn reset_buffers(&mut self) {
173 self.buffer.clear();
174 self.mini_buffer.clear();
175 self.short_buffer.clear();
176 }
177
178 fn update_buffers(&mut self, age: i64) {
179 self.buffer.add(age);
180 self.mini_buffer.add(age);
181 self.short_buffer.add(age);
182 }
183
184 fn set_real_sample_rate(&mut self, sample_rate: f64) {
185 let nominal = self.format.rate() as f64;
186 if (sample_rate - nominal).abs() < f64::EPSILON {
187 self.correct_after_x_frames = 0;
188 } else {
189 let ratio = nominal / sample_rate;
190 self.correct_after_x_frames = (ratio / (ratio - 1.0)).round() as i32;
191 }
192 }
193
194 pub fn get_player_chunk(
196 &mut self,
197 server_now_usec: i64,
198 output_buffer_dac_time_usec: i64,
199 output: &mut [u8],
200 frames: u32,
201 ) -> bool {
202 let needs_new = self.current.as_ref().is_none_or(|c| c.is_end());
203 if needs_new {
204 self.current = self.chunks.pop_front();
205 }
206 if self.current.is_none() {
207 return false;
208 }
209
210 if self.hard_sync {
212 let chunk = self.current.as_ref().unwrap();
213 let req_duration_usec = (frames as i64 * 1_000_000) / self.format.rate() as i64;
214 let age_usec = server_now_usec - chunk.start_usec() - self.buffer_ms * 1000
215 + output_buffer_dac_time_usec;
216
217 if age_usec < -req_duration_usec {
218 self.get_silence(output, frames);
219 return true;
220 }
221
222 if age_usec > 0 {
223 self.current = None;
224 while let Some(mut c) = self.chunks.pop_front() {
225 let a = server_now_usec - c.start_usec() - self.buffer_ms * 1000
226 + output_buffer_dac_time_usec;
227 if a > 0 && a < c.duration_usec() {
228 let skip = (self.format.rate() as f64 * a as f64 / 1_000_000.0) as u32;
229 c.seek(skip);
230 self.current = Some(c);
231 break;
232 } else if a <= 0 {
233 self.current = Some(c);
234 break;
235 }
236 }
237 if self.current.is_none() {
238 return false;
239 }
240 }
241
242 let chunk = self.current.as_ref().unwrap();
243 let age_usec = server_now_usec - chunk.start_usec() - self.buffer_ms * 1000
244 + output_buffer_dac_time_usec;
245
246 if age_usec <= 0 {
247 let silent_frames =
248 (self.format.rate() as f64 * (-age_usec) as f64 / 1_000_000.0) as u32;
249 let silent_frames = silent_frames.min(frames);
250 let frame_size = self.format.frame_size() as usize;
251
252 if silent_frames > 0 {
253 output[..silent_frames as usize * frame_size].fill(0);
254 }
255 let remaining = frames - silent_frames;
256 if remaining > 0 {
257 let offset = silent_frames as usize * frame_size;
258 self.read_next(&mut output[offset..], remaining);
259 }
260 if silent_frames < frames {
261 self.hard_sync = false;
262 self.reset_buffers();
263 }
264 return true;
265 }
266 return false;
267 }
268
269 let mut frames_correction: i32 = 0;
273 if self.correct_after_x_frames != 0 {
274 self.played_frames += frames;
275 if self.played_frames >= self.correct_after_x_frames.unsigned_abs() {
276 frames_correction = self.played_frames as i32 / self.correct_after_x_frames;
277 self.played_frames %= self.correct_after_x_frames.unsigned_abs();
278 }
279 }
280
281 let chunk_start = match self.read_with_correction(output, frames, frames_correction) {
283 Some(ts) => ts,
284 None => return false,
285 };
286
287 let age_usec =
288 server_now_usec - chunk_start - self.buffer_ms * 1000 + output_buffer_dac_time_usec;
289
290 self.set_real_sample_rate(self.format.rate() as f64);
292
293 if self.buffer.full()
295 && self.median.abs() > HARD_SYNC_MEDIAN_USEC
296 && age_usec.abs() > HARD_SYNC_MIN_AGE_USEC
297 {
298 tracing::info!(
299 median = self.median,
300 "Hard sync: buffer full, |median| > 2ms"
301 );
302 self.hard_sync = true;
303 } else if self.short_buffer.full()
304 && self.short_median.abs() > HARD_SYNC_SHORT_MEDIAN_USEC
305 && age_usec.abs() > HARD_SYNC_MIN_AGE_USEC
306 {
307 tracing::info!(
308 short_median = self.short_median,
309 "Hard sync: short buffer full, |short_median| > 5ms"
310 );
311 self.hard_sync = true;
312 } else if self.mini_buffer.full()
313 && self.mini_buffer.median_simple().abs() > HARD_SYNC_MINI_MEDIAN_USEC
314 && age_usec.abs() > HARD_SYNC_MIN_AGE_USEC
315 {
316 tracing::info!("Hard sync: mini buffer full, |mini_median| > 50ms");
317 self.hard_sync = true;
318 } else if age_usec.abs() > HARD_SYNC_AGE_USEC {
319 tracing::info!(age_usec, "Hard sync: |age| > 500ms");
320 self.hard_sync = true;
321 } else if self.short_buffer.full() {
322 let mini_median = self.mini_buffer.median_simple();
324 if self.short_median > CORRECTION_BEGIN_USEC
325 && mini_median > SOFT_SYNC_MIN_USEC
326 && age_usec > SOFT_SYNC_MIN_USEC
327 {
328 let rate = (self.short_median as f64 / 100.0) * RATE_CORRECTION_SCALE;
329 let rate = 1.0 - rate.min(MAX_RATE_CORRECTION);
330 self.set_real_sample_rate(self.format.rate() as f64 * rate);
331 } else if self.short_median < -CORRECTION_BEGIN_USEC
332 && mini_median < -SOFT_SYNC_MIN_USEC
333 && age_usec < -SOFT_SYNC_MIN_USEC
334 {
335 let rate = (-self.short_median as f64 / 100.0) * RATE_CORRECTION_SCALE;
336 let rate = 1.0 + rate.min(MAX_RATE_CORRECTION);
337 self.set_real_sample_rate(self.format.rate() as f64 * rate);
338 }
339 }
340
341 self.update_buffers(age_usec);
342
343 let now_sec = server_now_usec / 1_000_000;
345 if now_sec != self.last_log_sec {
346 self.last_log_sec = now_sec;
347 self.median = self.buffer.median_simple();
348 self.short_median = self.short_buffer.median_simple();
349 tracing::debug!(
350 target: "Stats",
351 "Chunk: {}\t{}\t{}\t{}\t{}\t{}\t{}",
352 age_usec,
353 self.mini_buffer.median_simple(),
354 self.short_median,
355 self.median,
356 self.buffer.len(),
357 output_buffer_dac_time_usec / 1000,
358 self.frame_delta,
359 );
360 self.frame_delta = 0;
361 }
362
363 age_usec.abs() < 500_000
364 }
365
366 pub fn get_silence(&self, output: &mut [u8], frames: u32) {
368 let bytes = frames as usize * self.format.frame_size() as usize;
369 let len = bytes.min(output.len());
370 output[..len].fill(0);
371 }
372
373 pub fn get_player_chunk_or_silence(
375 &mut self,
376 server_now_usec: i64,
377 output_buffer_dac_time_usec: i64,
378 output: &mut [u8],
379 frames: u32,
380 ) -> bool {
381 let result =
382 self.get_player_chunk(server_now_usec, output_buffer_dac_time_usec, output, frames);
383 if !result {
384 self.get_silence(output, frames);
385 }
386 result
387 }
388
389 fn read_next(&mut self, output: &mut [u8], frames: u32) -> Option<i64> {
390 let chunk = self.current.as_mut()?;
391 let frame_size = self.format.frame_size() as usize;
393 let consumed_frames = chunk.read_pos / frame_size;
394 let ts =
395 chunk.start_usec() + consumed_frames as i64 * 1_000_000 / self.format.rate() as i64;
396 let mut read = 0u32;
397 while read < frames {
398 let offset = read as usize * frame_size;
399 let n = chunk.read_frames(&mut output[offset..], frames - read);
400 read += n;
401 if read < frames && chunk.is_end() {
402 match self.chunks.pop_front() {
403 Some(next) => *chunk = next,
404 None => break,
405 }
406 }
407 }
408 Some(ts)
409 }
410
411 fn read_with_correction(
412 &mut self,
413 output: &mut [u8],
414 frames: u32,
415 correction: i32,
416 ) -> Option<i64> {
417 if correction == 0 {
418 return self.read_next(output, frames);
419 }
420
421 let correction = correction.max(-(frames as i32) + 1);
423
424 self.frame_delta -= correction;
425 let to_read = (frames as i32 + correction) as u32;
426 let frame_size = self.format.frame_size() as usize;
427
428 self.read_buf.resize(to_read as usize * frame_size, 0);
429 let mut read_buf = std::mem::take(&mut self.read_buf);
430 let ts = self.read_next(&mut read_buf, to_read);
431
432 let max = if correction < 0 {
433 frames as usize
434 } else {
435 to_read as usize
436 };
437 let slices = (correction.unsigned_abs() as usize + 1).min(max);
438 let slice_size = max / slices;
439
440 let mut pos = 0usize;
441 for n in 0..slices {
442 let size = if n + 1 == slices {
443 max - pos
444 } else {
445 slice_size
446 };
447
448 if correction < 0 {
449 let src_start = (pos - n) * frame_size;
450 let dst_start = pos * frame_size;
451 let len = size * frame_size;
452 output[dst_start..dst_start + len]
453 .copy_from_slice(&read_buf[src_start..src_start + len]);
454 } else {
455 let src_start = pos * frame_size;
456 let dst_start = (pos - n) * frame_size;
457 let len = size * frame_size;
458 output[dst_start..dst_start + len]
459 .copy_from_slice(&read_buf[src_start..src_start + len]);
460 }
461 pos += size;
462 }
463
464 self.read_buf = read_buf;
465 ts
466 }
467}
468
#[cfg(test)]
mod tests {
    use super::*;

    /// 48 kHz, 16-bit, 2-channel format used by all tests.
    fn fmt() -> SampleFormat {
        SampleFormat::new(48000, 16, 2)
    }

    /// Builds a chunk of `frames` frames whose bytes count up modulo 256.
    fn make_chunk(sec: i32, usec: i32, frames: u32, format: SampleFormat) -> PcmChunk {
        let bytes = frames as usize * format.frame_size() as usize;
        let data: Vec<u8> = (0..bytes).map(|i| (i % 256) as u8).collect();
        PcmChunk::new(Timeval { sec, usec }, data, format)
    }

    /// Builds a stream whose current chunk holds `frames` frames with left
    /// sample `i` and right sample `i + 100`, so every frame is identifiable.
    fn stream_with_numbered_frames(frames: u16, f: SampleFormat) -> Stream {
        let mut data = Vec::new();
        for i in 0..frames {
            data.extend_from_slice(&i.to_le_bytes());
            data.extend_from_slice(&(i + 100).to_le_bytes());
        }
        let mut stream = Stream::new(f);
        stream.current = Some(PcmChunk::new(Timeval { sec: 100, usec: 0 }, data, f));
        stream
    }

    /// Extracts the left-channel sample of every frame in `output`.
    fn left_samples(output: &[u8], f: SampleFormat) -> Vec<u16> {
        output
            .chunks_exact(f.frame_size() as usize)
            .map(|frame| u16::from_le_bytes([frame[0], frame[1]]))
            .collect()
    }

    #[test]
    fn pcm_chunk_duration() {
        let f = fmt();
        let chunk = make_chunk(0, 0, 480, f);
        assert_eq!(chunk.duration_usec(), 10_000);
    }

    #[test]
    fn pcm_chunk_read_frames() {
        let f = fmt();
        let mut chunk = make_chunk(0, 0, 100, f);
        let mut buf = vec![0u8; 50 * f.frame_size() as usize];
        let read = chunk.read_frames(&mut buf, 50);
        assert_eq!(read, 50);
        assert!(!chunk.is_end());
        let read = chunk.read_frames(&mut buf, 50);
        assert_eq!(read, 50);
        assert!(chunk.is_end());
    }

    #[test]
    fn pcm_chunk_seek() {
        let f = fmt();
        let mut chunk = make_chunk(0, 0, 100, f);
        chunk.seek(90);
        let mut buf = vec![0u8; 100 * f.frame_size() as usize];
        let read = chunk.read_frames(&mut buf, 100);
        assert_eq!(read, 10);
    }

    #[test]
    fn stream_add_and_count() {
        let f = fmt();
        let mut stream = Stream::new(f);
        assert_eq!(stream.chunk_count(), 0);
        stream.add_chunk(make_chunk(100, 0, 480, f));
        stream.add_chunk(make_chunk(100, 10_000, 480, f));
        assert_eq!(stream.chunk_count(), 2);
    }

    #[test]
    fn stream_clear() {
        let f = fmt();
        let mut stream = Stream::new(f);
        stream.add_chunk(make_chunk(100, 0, 480, f));
        stream.clear();
        assert_eq!(stream.chunk_count(), 0);
    }

    #[test]
    fn stream_silence_when_empty() {
        let f = fmt();
        let mut stream = Stream::new(f);
        let mut buf = vec![0xFFu8; 480 * f.frame_size() as usize];
        let result = stream.get_player_chunk(100_000_000, 0, &mut buf, 480);
        assert!(!result);
    }

    #[test]
    fn stream_hard_sync_plays_silence_when_too_early() {
        // Chunk starts at t = 100 s and the buffer is 1 s, so at
        // server time 100 s the chunk is still one second early.
        let f = fmt();
        let mut stream = Stream::new(f);
        stream.set_buffer_ms(1000);
        stream.add_chunk(make_chunk(100, 0, 4800, f));
        let server_now = 100_000_000i64;
        let mut buf = vec![0xFFu8; 480 * f.frame_size() as usize];
        let result = stream.get_player_chunk(server_now, 0, &mut buf, 480);
        assert!(result);
        assert!(buf.iter().all(|&b| b == 0));
    }

    #[test]
    fn stream_hard_sync_plays_data_when_aligned() {
        // Chunk starts at t = 99 s and the buffer is 1 s, so it is due
        // exactly at server time 100 s.
        let f = fmt();
        let mut stream = Stream::new(f);
        stream.set_buffer_ms(1000);
        stream.add_chunk(make_chunk(99, 0, 4800, f));
        let server_now = 100_000_000i64;
        let mut buf = vec![0u8; 480 * f.frame_size() as usize];
        let result = stream.get_player_chunk(server_now, 0, &mut buf, 480);
        assert!(result);
        assert!(buf.iter().any(|&b| b != 0));
    }

    #[test]
    fn set_real_sample_rate_correction() {
        let f = fmt();
        let mut stream = Stream::new(f);
        stream.set_real_sample_rate(48000.0);
        assert_eq!(stream.correct_after_x_frames, 0);

        stream.set_real_sample_rate(47999.0);
        assert_ne!(stream.correct_after_x_frames, 0);
    }

    #[test]
    fn read_with_correction_remove_one_frame() {
        let f = fmt();
        let mut stream = stream_with_numbered_frames(10, f);

        // 9 output frames with correction +1: 10 source frames are read and
        // copied as two 5-frame slices, the second shifted back by one frame,
        // so source frame 4 is overwritten and dropped.
        let mut output = vec![0u8; 9 * f.frame_size() as usize];
        let ts = stream.read_with_correction(&mut output, 9, 1);
        assert!(ts.is_some());
        assert_eq!(output.len(), 36);
        assert_eq!(left_samples(&output, f), vec![0, 1, 2, 3, 5, 6, 7, 8, 9]);
        assert_eq!(stream.frame_delta, -1);
    }

    #[test]
    fn read_with_correction_insert_one_frame() {
        let f = fmt();
        let mut stream = stream_with_numbered_frames(10, f);

        // 10 output frames with correction -1: 9 source frames are read and
        // the second slice starts one frame early, so source frame 4 is
        // played twice.
        let mut output = vec![0u8; 10 * f.frame_size() as usize];
        let ts = stream.read_with_correction(&mut output, 10, -1);
        assert!(ts.is_some());
        assert_eq!(
            left_samples(&output, f),
            vec![0, 1, 2, 3, 4, 4, 5, 6, 7, 8]
        );
        assert_eq!(stream.frame_delta, 1);
    }

    #[test]
    fn read_with_correction_zero_is_passthrough() {
        let f = fmt();
        let mut stream = Stream::new(f);
        stream.add_chunk(make_chunk(100, 0, 100, f));
        stream.current = stream.chunks.pop_front();

        let mut out1 = vec![0u8; 50 * f.frame_size() as usize];
        stream.read_with_correction(&mut out1, 50, 0);

        stream.add_chunk(make_chunk(100, 0, 100, f));
        stream.current = stream.chunks.pop_front();

        let mut out2 = vec![0u8; 50 * f.frame_size() as usize];
        stream.read_next(&mut out2, 50);

        assert_eq!(out1, out2);
    }
}