1use std::collections::VecDeque;
4
5use snapcast_proto::SampleFormat;
6use snapcast_proto::types::Timeval;
7
8use crate::double_buffer::DoubleBuffer;
9
10#[derive(Debug, Clone)]
12pub struct PcmChunk {
13 pub timestamp: Timeval,
15 pub data: Vec<u8>,
17 pub format: SampleFormat,
19 read_pos: usize,
20}
21
22impl PcmChunk {
23 pub fn new(timestamp: Timeval, data: Vec<u8>, format: SampleFormat) -> Self {
25 Self {
26 timestamp,
27 data,
28 format,
29 read_pos: 0,
30 }
31 }
32
33 pub fn start_usec(&self) -> i64 {
35 self.timestamp.to_usec()
36 }
37
38 pub fn duration_usec(&self) -> i64 {
40 if self.format.frame_size() == 0 || self.format.rate() == 0 {
41 return 0;
42 }
43 let frames = self.data.len() as i64 / self.format.frame_size() as i64;
44 frames * 1_000_000 / self.format.rate() as i64
45 }
46
47 pub fn read_frames(&mut self, output: &mut [u8], frames: u32) -> u32 {
49 let frame_size = self.format.frame_size() as usize;
50 let available_bytes = self.data.len() - self.read_pos;
51 let available_frames = available_bytes / frame_size;
52 let to_read = (frames as usize).min(available_frames);
53 let bytes = to_read * frame_size;
54 output[..bytes].copy_from_slice(&self.data[self.read_pos..self.read_pos + bytes]);
55 self.read_pos += bytes;
56 to_read as u32
57 }
58
59 pub fn is_end(&self) -> bool {
61 self.read_pos >= self.data.len()
62 }
63
64 pub fn seek(&mut self, frames: u32) {
66 let bytes = frames as usize * self.format.frame_size() as usize;
67 self.read_pos = (self.read_pos + bytes).min(self.data.len());
68 }
69}
70
// --- Soft-sync (rate correction) thresholds, in microseconds ---------------

/// `|short_median|` must exceed this before any rate correction is applied.
const CORRECTION_BEGIN_USEC: i64 = 100;

// --- Hard-sync thresholds, in microseconds ---------------------------------

/// Hard sync when the long buffer is full and `|median|` exceeds this (2 ms).
const HARD_SYNC_MEDIAN_USEC: i64 = 2_000;

/// Hard sync when the short buffer is full and `|short_median|` exceeds this (5 ms).
const HARD_SYNC_SHORT_MEDIAN_USEC: i64 = 5_000;

/// Hard sync when the mini buffer is full and its median magnitude exceeds this (50 ms).
const HARD_SYNC_MINI_MEDIAN_USEC: i64 = 50_000;

/// Hard sync unconditionally when the current `|age|` exceeds this (500 ms).
const HARD_SYNC_AGE_USEC: i64 = 500_000;

/// The median-based hard-sync triggers additionally require `|age|` above this.
const HARD_SYNC_MIN_AGE_USEC: i64 = 500;

/// Rate correction additionally requires the mini median and the current age
/// to be beyond this value, on the same side as the short median.
const SOFT_SYNC_MIN_USEC: i64 = 50;

// --- Rate correction factors -----------------------------------------------

/// Upper bound for the relative sample-rate change applied by soft sync.
const MAX_RATE_CORRECTION: f64 = 0.0005;

/// Relative sample-rate change applied per 100 µs of short median.
const RATE_CORRECTION_SCALE: f64 = 0.00005;

// --- Age history sizes (number of samples) ---------------------------------

/// Capacity of the mini age buffer.
const MINI_BUFFER_SIZE: usize = 20;

/// Capacity of the short age buffer.
const SHORT_BUFFER_SIZE: usize = 100;

/// Capacity of the long age buffer.
const BUFFER_SIZE: usize = 500;

/// Buffer length, in milliseconds, a new `Stream` starts with.
const DEFAULT_BUFFER_MS: i64 = 1000;
97
98pub struct Stream {
100 format: SampleFormat,
101 chunks: VecDeque<PcmChunk>,
102 current: Option<PcmChunk>,
103 buffer_ms: i64,
104 hard_sync: bool,
105
106 mini_buffer: DoubleBuffer,
108 short_buffer: DoubleBuffer,
109 buffer: DoubleBuffer,
110 median: i64,
111 short_median: i64,
112
113 played_frames: u32,
115 correct_after_x_frames: i32,
116 frame_delta: i32,
117 read_buf: Vec<u8>,
118
119 last_log_sec: i64,
121}
122
123impl Stream {
124 pub fn new(format: SampleFormat) -> Self {
126 Self {
127 format,
128 chunks: VecDeque::new(),
129 current: None,
130 buffer_ms: DEFAULT_BUFFER_MS,
131 hard_sync: true,
132 mini_buffer: DoubleBuffer::new(MINI_BUFFER_SIZE),
133 short_buffer: DoubleBuffer::new(SHORT_BUFFER_SIZE),
134 buffer: DoubleBuffer::new(BUFFER_SIZE),
135 median: 0,
136 short_median: 0,
137 played_frames: 0,
138 correct_after_x_frames: 0,
139 frame_delta: 0,
140 read_buf: Vec::new(),
141 last_log_sec: 0,
142 }
143 }
144
145 pub fn format(&self) -> SampleFormat {
147 self.format
148 }
149
150 pub fn set_buffer_ms(&mut self, ms: i64) {
152 self.buffer_ms = ms;
153 }
154
155 pub fn add_chunk(&mut self, chunk: PcmChunk) {
157 self.chunks.push_back(chunk);
158 }
159
160 pub fn chunk_count(&self) -> usize {
162 self.chunks.len()
163 }
164
165 pub fn clear(&mut self) {
167 self.chunks.clear();
168 self.current = None;
169 self.hard_sync = true;
170 }
171
172 fn reset_buffers(&mut self) {
173 self.buffer.clear();
174 self.mini_buffer.clear();
175 self.short_buffer.clear();
176 }
177
178 fn update_buffers(&mut self, age: i64) {
179 self.buffer.add(age);
180 self.mini_buffer.add(age);
181 self.short_buffer.add(age);
182 }
183
184 fn set_real_sample_rate(&mut self, sample_rate: f64) {
185 let nominal = self.format.rate() as f64;
186 if (sample_rate - nominal).abs() < f64::EPSILON {
187 self.correct_after_x_frames = 0;
188 } else {
189 let ratio = nominal / sample_rate;
190 self.correct_after_x_frames = (ratio / (ratio - 1.0)).round() as i32;
191 }
192 }
193
194 pub fn get_player_chunk(
196 &mut self,
197 server_now_usec: i64,
198 output_buffer_dac_time_usec: i64,
199 output: &mut [u8],
200 frames: u32,
201 ) -> bool {
202 let needs_new = self.current.as_ref().is_none_or(|c| c.is_end());
203 if needs_new {
204 self.current = self.chunks.pop_front();
205 }
206 if self.current.is_none() {
207 return false;
208 }
209
210 if self.hard_sync {
212 let chunk = self.current.as_ref().unwrap();
213 let req_duration_usec = (frames as i64 * 1_000_000) / self.format.rate() as i64;
214 let age_usec = server_now_usec - chunk.start_usec() - self.buffer_ms * 1000
215 + output_buffer_dac_time_usec;
216
217 if age_usec < -req_duration_usec {
218 self.get_silence(output, frames);
219 return true;
220 }
221
222 if age_usec > 0 {
223 self.current = None;
224 while let Some(mut c) = self.chunks.pop_front() {
225 let a = server_now_usec - c.start_usec() - self.buffer_ms * 1000
226 + output_buffer_dac_time_usec;
227 if a > 0 && a < c.duration_usec() {
228 let skip = (self.format.rate() as f64 * a as f64 / 1_000_000.0) as u32;
229 c.seek(skip);
230 self.current = Some(c);
231 break;
232 } else if a <= 0 {
233 self.current = Some(c);
234 break;
235 }
236 }
237 if self.current.is_none() {
238 return false;
239 }
240 }
241
242 let chunk = self.current.as_ref().unwrap();
243 let age_usec = server_now_usec - chunk.start_usec() - self.buffer_ms * 1000
244 + output_buffer_dac_time_usec;
245
246 if age_usec <= 0 {
247 let silent_frames =
248 (self.format.rate() as f64 * (-age_usec) as f64 / 1_000_000.0) as u32;
249 let silent_frames = silent_frames.min(frames);
250 let frame_size = self.format.frame_size() as usize;
251
252 if silent_frames > 0 {
253 output[..silent_frames as usize * frame_size].fill(0);
254 }
255 let remaining = frames - silent_frames;
256 if remaining > 0 {
257 let offset = silent_frames as usize * frame_size;
258 self.read_next(&mut output[offset..], remaining);
259 }
260 if silent_frames < frames {
261 self.hard_sync = false;
262 self.reset_buffers();
263 }
264 return true;
265 }
266 return false;
267 }
268
269 let mut frames_correction: i32 = 0;
273 if self.correct_after_x_frames != 0 {
274 self.played_frames += frames;
275 if self.played_frames >= self.correct_after_x_frames.unsigned_abs() {
276 frames_correction = self.played_frames as i32 / self.correct_after_x_frames;
277 self.played_frames %= self.correct_after_x_frames.unsigned_abs();
278 }
279 }
280
281 let chunk_start = match self.read_with_correction(output, frames, frames_correction) {
283 Some(ts) => ts,
284 None => return false,
285 };
286
287 let age_usec =
288 server_now_usec - chunk_start - self.buffer_ms * 1000 + output_buffer_dac_time_usec;
289
290 self.set_real_sample_rate(self.format.rate() as f64);
292
293 if self.buffer.full()
295 && self.median.abs() > HARD_SYNC_MEDIAN_USEC
296 && age_usec.abs() > HARD_SYNC_MIN_AGE_USEC
297 {
298 tracing::info!(
299 median = self.median,
300 "Hard sync: buffer full, |median| > 2ms"
301 );
302 self.hard_sync = true;
303 } else if self.short_buffer.full()
304 && self.short_median.abs() > HARD_SYNC_SHORT_MEDIAN_USEC
305 && age_usec.abs() > HARD_SYNC_MIN_AGE_USEC
306 {
307 tracing::info!(
308 short_median = self.short_median,
309 "Hard sync: short buffer full, |short_median| > 5ms"
310 );
311 self.hard_sync = true;
312 } else if self.mini_buffer.full()
313 && self.mini_buffer.median_simple().abs() > HARD_SYNC_MINI_MEDIAN_USEC
314 && age_usec.abs() > HARD_SYNC_MIN_AGE_USEC
315 {
316 tracing::info!(
317 age_usec,
318 mini_median = self.mini_buffer.median_simple(),
319 "Hard sync: mini buffer full, |mini_median| > 50ms"
320 );
321 self.hard_sync = true;
322 } else if age_usec.abs() > HARD_SYNC_AGE_USEC {
323 tracing::info!(age_usec, "Hard sync: |age| > 500ms");
324 self.hard_sync = true;
325 } else if self.short_buffer.full() {
326 let mini_median = self.mini_buffer.median_simple();
328 if self.short_median > CORRECTION_BEGIN_USEC
329 && mini_median > SOFT_SYNC_MIN_USEC
330 && age_usec > SOFT_SYNC_MIN_USEC
331 {
332 let rate = (self.short_median as f64 / 100.0) * RATE_CORRECTION_SCALE;
333 let rate = 1.0 - rate.min(MAX_RATE_CORRECTION);
334 self.set_real_sample_rate(self.format.rate() as f64 * rate);
335 } else if self.short_median < -CORRECTION_BEGIN_USEC
336 && mini_median < -SOFT_SYNC_MIN_USEC
337 && age_usec < -SOFT_SYNC_MIN_USEC
338 {
339 let rate = (-self.short_median as f64 / 100.0) * RATE_CORRECTION_SCALE;
340 let rate = 1.0 + rate.min(MAX_RATE_CORRECTION);
341 self.set_real_sample_rate(self.format.rate() as f64 * rate);
342 }
343 }
344
345 self.update_buffers(age_usec);
346
347 let now_sec = server_now_usec / 1_000_000;
349 if now_sec != self.last_log_sec {
350 self.last_log_sec = now_sec;
351 self.median = self.buffer.median_simple();
352 self.short_median = self.short_buffer.median_simple();
353 tracing::debug!(
354 target: "Stats",
355 "Chunk: {}\t{}\t{}\t{}\t{}\t{}\t{}",
356 age_usec,
357 self.mini_buffer.median_simple(),
358 self.short_median,
359 self.median,
360 self.buffer.len(),
361 output_buffer_dac_time_usec / 1000,
362 self.frame_delta,
363 );
364 self.frame_delta = 0;
365 }
366
367 age_usec.abs() < 500_000
368 }
369
370 pub fn get_silence(&self, output: &mut [u8], frames: u32) {
372 let bytes = frames as usize * self.format.frame_size() as usize;
373 let len = bytes.min(output.len());
374 output[..len].fill(0);
375 }
376
377 pub fn get_player_chunk_or_silence(
379 &mut self,
380 server_now_usec: i64,
381 output_buffer_dac_time_usec: i64,
382 output: &mut [u8],
383 frames: u32,
384 ) -> bool {
385 let result =
386 self.get_player_chunk(server_now_usec, output_buffer_dac_time_usec, output, frames);
387 if !result {
388 self.get_silence(output, frames);
389 }
390 result
391 }
392
393 fn read_next(&mut self, output: &mut [u8], frames: u32) -> Option<i64> {
394 let chunk = self.current.as_mut()?;
395 let frame_size = self.format.frame_size() as usize;
397 let consumed_frames = chunk.read_pos / frame_size;
398 let ts =
399 chunk.start_usec() + consumed_frames as i64 * 1_000_000 / self.format.rate() as i64;
400 let mut read = 0u32;
401 while read < frames {
402 let offset = read as usize * frame_size;
403 let n = chunk.read_frames(&mut output[offset..], frames - read);
404 read += n;
405 if read < frames && chunk.is_end() {
406 match self.chunks.pop_front() {
407 Some(next) => *chunk = next,
408 None => break,
409 }
410 }
411 }
412 Some(ts)
413 }
414
415 fn read_with_correction(
416 &mut self,
417 output: &mut [u8],
418 frames: u32,
419 correction: i32,
420 ) -> Option<i64> {
421 if correction == 0 {
422 return self.read_next(output, frames);
423 }
424
425 let correction = correction.max(-(frames as i32) + 1);
427
428 self.frame_delta -= correction;
429 let to_read = (frames as i32 + correction) as u32;
430 let frame_size = self.format.frame_size() as usize;
431
432 self.read_buf.resize(to_read as usize * frame_size, 0);
433 let mut read_buf = std::mem::take(&mut self.read_buf);
434 let ts = self.read_next(&mut read_buf, to_read);
435
436 let max = if correction < 0 {
437 frames as usize
438 } else {
439 to_read as usize
440 };
441 let slices = (correction.unsigned_abs() as usize + 1).min(max);
442 let slice_size = max / slices;
443
444 let mut pos = 0usize;
445 for n in 0..slices {
446 let size = if n + 1 == slices {
447 max - pos
448 } else {
449 slice_size
450 };
451
452 if correction < 0 {
453 let src_start = (pos - n) * frame_size;
454 let dst_start = pos * frame_size;
455 let len = size * frame_size;
456 output[dst_start..dst_start + len]
457 .copy_from_slice(&read_buf[src_start..src_start + len]);
458 } else {
459 let src_start = pos * frame_size;
460 let dst_start = (pos - n) * frame_size;
461 let len = size * frame_size;
462 output[dst_start..dst_start + len]
463 .copy_from_slice(&read_buf[src_start..src_start + len]);
464 }
465 pos += size;
466 }
467
468 self.read_buf = read_buf;
469 ts
470 }
471}
472
#[cfg(test)]
mod tests {
    use super::*;

    /// 48 kHz, 16 bit, stereo: 4 bytes per frame.
    fn fmt() -> SampleFormat {
        SampleFormat::new(48000, 16, 2)
    }

    /// Chunk of `frames` frames filled with a repeating 0..=255 byte pattern.
    fn make_chunk(sec: i32, usec: i32, frames: u32, format: SampleFormat) -> PcmChunk {
        let bytes = frames as usize * format.frame_size() as usize;
        let data: Vec<u8> = (0..bytes).map(|i| (i % 256) as u8).collect();
        PcmChunk::new(Timeval { sec, usec }, data, format)
    }

    /// Chunk whose frame `i` holds `i` in the left and `i + 100` in the right channel.
    fn numbered_chunk(frames: u16, format: SampleFormat) -> PcmChunk {
        let mut data = Vec::new();
        for i in 0..frames {
            data.extend_from_slice(&i.to_le_bytes());
            data.extend_from_slice(&(i + 100).to_le_bytes());
        }
        PcmChunk::new(Timeval { sec: 100, usec: 0 }, data, format)
    }

    /// Left-channel values of a buffer of 16 bit stereo frames.
    fn left_channel(buf: &[u8]) -> Vec<u16> {
        buf.chunks(4)
            .map(|frame| u16::from_le_bytes([frame[0], frame[1]]))
            .collect()
    }

    /// Right-channel values of a buffer of 16 bit stereo frames.
    fn right_channel(buf: &[u8]) -> Vec<u16> {
        buf.chunks(4)
            .map(|frame| u16::from_le_bytes([frame[2], frame[3]]))
            .collect()
    }

    #[test]
    fn pcm_chunk_duration() {
        let f = fmt();
        let chunk = make_chunk(0, 0, 480, f);
        assert_eq!(chunk.duration_usec(), 10_000);
    }

    #[test]
    fn pcm_chunk_read_frames() {
        let f = fmt();
        let mut chunk = make_chunk(0, 0, 100, f);
        let mut buf = vec![0u8; 50 * f.frame_size() as usize];
        let read = chunk.read_frames(&mut buf, 50);
        assert_eq!(read, 50);
        assert!(!chunk.is_end());
        let read = chunk.read_frames(&mut buf, 50);
        assert_eq!(read, 50);
        assert!(chunk.is_end());
    }

    #[test]
    fn pcm_chunk_seek() {
        let f = fmt();
        let mut chunk = make_chunk(0, 0, 100, f);
        chunk.seek(90);
        let mut buf = vec![0u8; 100 * f.frame_size() as usize];
        let read = chunk.read_frames(&mut buf, 100);
        assert_eq!(read, 10);
    }

    #[test]
    fn stream_add_and_count() {
        let f = fmt();
        let mut stream = Stream::new(f);
        assert_eq!(stream.chunk_count(), 0);
        stream.add_chunk(make_chunk(100, 0, 480, f));
        stream.add_chunk(make_chunk(100, 10_000, 480, f));
        assert_eq!(stream.chunk_count(), 2);
    }

    #[test]
    fn stream_clear() {
        let f = fmt();
        let mut stream = Stream::new(f);
        stream.add_chunk(make_chunk(100, 0, 480, f));
        stream.clear();
        assert_eq!(stream.chunk_count(), 0);
    }

    #[test]
    fn stream_silence_when_empty() {
        let f = fmt();
        let mut stream = Stream::new(f);
        let mut buf = vec![0xFFu8; 480 * f.frame_size() as usize];
        let result = stream.get_player_chunk(100_000_000, 0, &mut buf, 480);
        assert!(!result);
    }

    #[test]
    fn stream_or_silence_zeroes_output_when_empty() {
        let f = fmt();
        let mut stream = Stream::new(f);
        let mut buf = vec![0xFFu8; 480 * f.frame_size() as usize];
        let result = stream.get_player_chunk_or_silence(100_000_000, 0, &mut buf, 480);
        assert!(!result);
        assert!(buf.iter().all(|&b| b == 0));
    }

    #[test]
    fn stream_hard_sync_plays_silence_when_too_early() {
        let f = fmt();
        let mut stream = Stream::new(f);
        stream.set_buffer_ms(1000);
        stream.add_chunk(make_chunk(100, 0, 4800, f));
        // The chunk is due one buffer length (1 s) after its timestamp.
        let server_now = 100_000_000i64;
        let mut buf = vec![0xFFu8; 480 * f.frame_size() as usize];
        let result = stream.get_player_chunk(server_now, 0, &mut buf, 480);
        assert!(result);
        assert!(buf.iter().all(|&b| b == 0));
    }

    #[test]
    fn stream_hard_sync_plays_data_when_aligned() {
        let f = fmt();
        let mut stream = Stream::new(f);
        stream.set_buffer_ms(1000);
        stream.add_chunk(make_chunk(99, 0, 4800, f));
        let server_now = 100_000_000i64;
        let mut buf = vec![0u8; 480 * f.frame_size() as usize];
        let result = stream.get_player_chunk(server_now, 0, &mut buf, 480);
        assert!(result);
        assert!(buf.iter().any(|&b| b != 0));
    }

    #[test]
    fn set_real_sample_rate_correction() {
        let f = fmt();
        let mut stream = Stream::new(f);
        stream.set_real_sample_rate(48000.0);
        assert_eq!(stream.correct_after_x_frames, 0);

        // Slightly slow playback: remove one frame every 48000 frames.
        stream.set_real_sample_rate(47999.0);
        assert_eq!(stream.correct_after_x_frames, 48000);

        // Slightly fast playback: insert one frame every 48000 frames.
        stream.set_real_sample_rate(48001.0);
        assert_eq!(stream.correct_after_x_frames, -48000);
    }

    #[test]
    fn read_with_correction_remove_one_frame() {
        let f = fmt();
        let mut stream = Stream::new(f);
        stream.current = Some(numbered_chunk(10, f));

        let mut output = vec![0u8; 9 * f.frame_size() as usize];
        let ts = stream.read_with_correction(&mut output, 9, 1);
        assert!(ts.is_some());

        // Ten frames are read in two slices of five; the second slice is
        // shifted back by one frame and overwrites frame 4.
        assert_eq!(left_channel(&output), vec![0, 1, 2, 3, 5, 6, 7, 8, 9]);
        assert_eq!(
            right_channel(&output),
            vec![100, 101, 102, 103, 105, 106, 107, 108, 109]
        );
    }

    #[test]
    fn read_with_correction_insert_one_frame() {
        let f = fmt();
        let mut stream = Stream::new(f);
        stream.current = Some(numbered_chunk(9, f));

        let mut output = vec![0u8; 10 * f.frame_size() as usize];
        let ts = stream.read_with_correction(&mut output, 10, -1);
        assert!(ts.is_some());

        // Nine frames are stretched to ten by repeating frame 4 at the slice
        // boundary.
        assert_eq!(left_channel(&output), vec![0, 1, 2, 3, 4, 4, 5, 6, 7, 8]);
    }

    #[test]
    fn read_with_correction_zero_is_passthrough() {
        let f = fmt();
        let mut stream = Stream::new(f);
        stream.add_chunk(make_chunk(100, 0, 100, f));
        stream.current = stream.chunks.pop_front();

        let mut out1 = vec![0u8; 50 * f.frame_size() as usize];
        stream.read_with_correction(&mut out1, 50, 0);

        stream.add_chunk(make_chunk(100, 0, 100, f));
        stream.current = stream.chunks.pop_front();

        let mut out2 = vec![0u8; 50 * f.frame_size() as usize];
        stream.read_next(&mut out2, 50);

        assert_eq!(out1, out2);
    }
}