1use std::collections::VecDeque;
4
5use snapcast_proto::SampleFormat;
6use snapcast_proto::types::Timeval;
7
8use crate::double_buffer::DoubleBuffer;
9
10#[derive(Debug, Clone)]
12pub struct PcmChunk {
13 pub timestamp: Timeval,
15 pub data: Vec<u8>,
17 pub format: SampleFormat,
19 read_pos: usize,
20}
21
22impl PcmChunk {
23 pub fn new(timestamp: Timeval, data: Vec<u8>, format: SampleFormat) -> Self {
25 Self {
26 timestamp,
27 data,
28 format,
29 read_pos: 0,
30 }
31 }
32
33 pub fn start_usec(&self) -> i64 {
35 self.timestamp.to_usec()
36 }
37
38 pub fn duration_usec(&self) -> i64 {
40 if self.format.frame_size() == 0 || self.format.rate() == 0 {
41 return 0;
42 }
43 let frames = self.data.len() as i64 / self.format.frame_size() as i64;
44 frames * 1_000_000 / self.format.rate() as i64
45 }
46
47 pub fn read_frames(&mut self, output: &mut [u8], frames: u32) -> u32 {
49 let frame_size = self.format.frame_size() as usize;
50 let available_bytes = self.data.len() - self.read_pos;
51 let available_frames = available_bytes / frame_size;
52 let to_read = (frames as usize).min(available_frames);
53 let bytes = to_read * frame_size;
54 output[..bytes].copy_from_slice(&self.data[self.read_pos..self.read_pos + bytes]);
55 self.read_pos += bytes;
56 to_read as u32
57 }
58
59 pub fn is_end(&self) -> bool {
61 self.read_pos >= self.data.len()
62 }
63
64 pub fn seek(&mut self, frames: u32) {
66 let bytes = frames as usize * self.format.frame_size() as usize;
67 self.read_pos = (self.read_pos + bytes).min(self.data.len());
68 }
69}
70
71const CORRECTION_BEGIN_USEC: i64 = 100;
73const HARD_SYNC_MEDIAN_USEC: i64 = 2000;
75const HARD_SYNC_SHORT_MEDIAN_USEC: i64 = 5000;
77const HARD_SYNC_MINI_MEDIAN_USEC: i64 = 50000;
79const HARD_SYNC_AGE_USEC: i64 = 500_000;
81const HARD_SYNC_MIN_AGE_USEC: i64 = 500;
83const SOFT_SYNC_MIN_USEC: i64 = 50;
85const MAX_RATE_CORRECTION: f64 = 0.0005;
87const RATE_CORRECTION_SCALE: f64 = 0.00005;
89const MINI_BUFFER_SIZE: usize = 20;
91const SHORT_BUFFER_SIZE: usize = 100;
93const BUFFER_SIZE: usize = 500;
95const DEFAULT_BUFFER_MS: i64 = 1000;
97
98pub struct Stream {
100 format: SampleFormat,
101 chunks: VecDeque<PcmChunk>,
102 current: Option<PcmChunk>,
103 buffer_ms: i64,
104 hard_sync: bool,
105
106 mini_buffer: DoubleBuffer,
108 short_buffer: DoubleBuffer,
109 buffer: DoubleBuffer,
110 median: i64,
111 short_median: i64,
112
113 played_frames: u32,
115 correct_after_x_frames: i32,
116 frame_delta: i32,
117 read_buf: Vec<u8>,
118
119 last_log_sec: i64,
121}
122
123impl Stream {
124 pub fn new(format: SampleFormat) -> Self {
126 Self {
127 format,
128 chunks: VecDeque::new(),
129 current: None,
130 buffer_ms: DEFAULT_BUFFER_MS,
131 hard_sync: true,
132 mini_buffer: DoubleBuffer::new(MINI_BUFFER_SIZE),
133 short_buffer: DoubleBuffer::new(SHORT_BUFFER_SIZE),
134 buffer: DoubleBuffer::new(BUFFER_SIZE),
135 median: 0,
136 short_median: 0,
137 played_frames: 0,
138 correct_after_x_frames: 0,
139 frame_delta: 0,
140 read_buf: Vec::new(),
141 last_log_sec: 0,
142 }
143 }
144
145 pub fn format(&self) -> SampleFormat {
147 self.format
148 }
149
150 pub fn set_buffer_ms(&mut self, ms: i64) {
152 self.buffer_ms = ms;
153 }
154
155 pub fn add_chunk(&mut self, chunk: PcmChunk) {
157 self.chunks.push_back(chunk);
158 }
159
160 pub fn chunk_count(&self) -> usize {
162 self.chunks.len()
163 }
164
165 pub fn clear(&mut self) {
167 self.chunks.clear();
168 self.current = None;
169 self.hard_sync = true;
170 }
171
172 fn reset_buffers(&mut self) {
173 self.buffer.clear();
174 self.mini_buffer.clear();
175 self.short_buffer.clear();
176 }
177
178 fn update_buffers(&mut self, age: i64) {
179 self.buffer.add(age);
180 self.mini_buffer.add(age);
181 self.short_buffer.add(age);
182 }
183
184 fn set_real_sample_rate(&mut self, sample_rate: f64) {
185 let nominal = self.format.rate() as f64;
186 if (sample_rate - nominal).abs() < f64::EPSILON {
187 self.correct_after_x_frames = 0;
188 } else {
189 let ratio = nominal / sample_rate;
190 self.correct_after_x_frames = (ratio / (ratio - 1.0)).round() as i32;
191 }
192 }
193
194 pub fn get_player_chunk(
196 &mut self,
197 server_now_usec: i64,
198 output_buffer_dac_time_usec: i64,
199 output: &mut [u8],
200 frames: u32,
201 ) -> bool {
202 let needs_new = self.current.as_ref().is_none_or(|c| c.is_end());
203 if needs_new {
204 self.current = self.chunks.pop_front();
205 }
206 if self.current.is_none() {
207 return false;
208 }
209
210 if self.hard_sync {
212 let chunk = self.current.as_ref().unwrap();
213 let req_duration_usec = (frames as i64 * 1_000_000) / self.format.rate() as i64;
214 let age_usec = server_now_usec - chunk.start_usec() - self.buffer_ms * 1000
215 + output_buffer_dac_time_usec;
216
217 if age_usec < -req_duration_usec {
218 self.get_silence(output, frames);
219 return true;
220 }
221
222 if age_usec > 0 {
223 self.current = None;
224 while let Some(mut c) = self.chunks.pop_front() {
225 let a = server_now_usec - c.start_usec() - self.buffer_ms * 1000
226 + output_buffer_dac_time_usec;
227 if a > 0 && a < c.duration_usec() {
228 let skip = (self.format.rate() as f64 * a as f64 / 1_000_000.0) as u32;
229 c.seek(skip);
230 self.current = Some(c);
231 break;
232 } else if a <= 0 {
233 self.current = Some(c);
234 break;
235 }
236 }
237 if self.current.is_none() {
238 return false;
239 }
240 }
241
242 let chunk = self.current.as_ref().unwrap();
243 let age_usec = server_now_usec - chunk.start_usec() - self.buffer_ms * 1000
244 + output_buffer_dac_time_usec;
245
246 if age_usec <= 0 {
247 let silent_frames =
248 (self.format.rate() as f64 * (-age_usec) as f64 / 1_000_000.0) as u32;
249 let silent_frames = silent_frames.min(frames);
250 let frame_size = self.format.frame_size() as usize;
251
252 if silent_frames > 0 {
253 output[..silent_frames as usize * frame_size].fill(0);
254 }
255 let remaining = frames - silent_frames;
256 if remaining > 0 {
257 let offset = silent_frames as usize * frame_size;
258 self.read_next(&mut output[offset..], remaining);
259 }
260 if silent_frames < frames {
261 self.hard_sync = false;
262 self.reset_buffers();
263 }
264 return true;
265 }
266 return false;
267 }
268
269 let mut frames_correction: i32 = 0;
273 if self.correct_after_x_frames != 0 {
274 self.played_frames += frames;
275 if self.played_frames >= self.correct_after_x_frames.unsigned_abs() {
276 frames_correction = self.played_frames as i32 / self.correct_after_x_frames;
277 self.played_frames %= self.correct_after_x_frames.unsigned_abs();
278 }
279 }
280
281 let chunk_start = match self.read_with_correction(output, frames, frames_correction) {
283 Some(ts) => ts,
284 None => return false,
285 };
286
287 let age_usec =
288 server_now_usec - chunk_start - self.buffer_ms * 1000 + output_buffer_dac_time_usec;
289
290 self.set_real_sample_rate(self.format.rate() as f64);
292
293 if self.buffer.full()
295 && self.median.abs() > HARD_SYNC_MEDIAN_USEC
296 && age_usec.abs() > HARD_SYNC_MIN_AGE_USEC
297 {
298 tracing::info!(
299 median = self.median,
300 "Hard sync: buffer full, |median| > 2ms"
301 );
302 self.hard_sync = true;
303 } else if self.short_buffer.full()
304 && self.short_median.abs() > HARD_SYNC_SHORT_MEDIAN_USEC
305 && age_usec.abs() > HARD_SYNC_MIN_AGE_USEC
306 {
307 tracing::info!(
308 short_median = self.short_median,
309 "Hard sync: short buffer full, |short_median| > 5ms"
310 );
311 self.hard_sync = true;
312 } else if self.mini_buffer.full()
313 && self.mini_buffer.median_simple().abs() > HARD_SYNC_MINI_MEDIAN_USEC
314 && age_usec.abs() > HARD_SYNC_MIN_AGE_USEC
315 {
316 tracing::info!("Hard sync: mini buffer full, |mini_median| > 50ms");
317 self.hard_sync = true;
318 } else if age_usec.abs() > HARD_SYNC_AGE_USEC {
319 tracing::info!(age_usec, "Hard sync: |age| > 500ms");
320 self.hard_sync = true;
321 } else if self.short_buffer.full() {
322 let mini_median = self.mini_buffer.median_simple();
324 if self.short_median > CORRECTION_BEGIN_USEC
325 && mini_median > SOFT_SYNC_MIN_USEC
326 && age_usec > SOFT_SYNC_MIN_USEC
327 {
328 let rate = (self.short_median as f64 / 100.0) * RATE_CORRECTION_SCALE;
329 let rate = 1.0 - rate.min(MAX_RATE_CORRECTION);
330 self.set_real_sample_rate(self.format.rate() as f64 * rate);
331 } else if self.short_median < -CORRECTION_BEGIN_USEC
332 && mini_median < -SOFT_SYNC_MIN_USEC
333 && age_usec < -SOFT_SYNC_MIN_USEC
334 {
335 let rate = (-self.short_median as f64 / 100.0) * RATE_CORRECTION_SCALE;
336 let rate = 1.0 + rate.min(MAX_RATE_CORRECTION);
337 self.set_real_sample_rate(self.format.rate() as f64 * rate);
338 }
339 }
340
341 self.update_buffers(age_usec);
342
343 let now_sec = server_now_usec / 1_000_000;
345 if now_sec != self.last_log_sec {
346 self.last_log_sec = now_sec;
347 self.median = self.buffer.median_simple();
348 self.short_median = self.short_buffer.median_simple();
349 tracing::debug!(
350 target: "Stats",
351 "Chunk: {}\t{}\t{}\t{}\t{}\t{}\t{}",
352 age_usec,
353 self.mini_buffer.median_simple(),
354 self.short_median,
355 self.median,
356 self.buffer.len(),
357 output_buffer_dac_time_usec / 1000,
358 self.frame_delta,
359 );
360 self.frame_delta = 0;
361 }
362
363 age_usec.abs() < 500_000
364 }
365
366 pub fn get_silence(&self, output: &mut [u8], frames: u32) {
368 let bytes = frames as usize * self.format.frame_size() as usize;
369 output[..bytes].fill(0);
370 }
371
372 pub fn get_player_chunk_or_silence(
374 &mut self,
375 server_now_usec: i64,
376 output_buffer_dac_time_usec: i64,
377 output: &mut [u8],
378 frames: u32,
379 ) -> bool {
380 let result =
381 self.get_player_chunk(server_now_usec, output_buffer_dac_time_usec, output, frames);
382 if !result {
383 self.get_silence(output, frames);
384 }
385 result
386 }
387
388 fn read_next(&mut self, output: &mut [u8], frames: u32) -> Option<i64> {
389 let chunk = self.current.as_mut()?;
390 let frame_size = self.format.frame_size() as usize;
392 let consumed_frames = chunk.read_pos / frame_size;
393 let ts =
394 chunk.start_usec() + consumed_frames as i64 * 1_000_000 / self.format.rate() as i64;
395 let mut read = 0u32;
396 while read < frames {
397 let offset = read as usize * frame_size;
398 let n = chunk.read_frames(&mut output[offset..], frames - read);
399 read += n;
400 if read < frames && chunk.is_end() {
401 match self.chunks.pop_front() {
402 Some(next) => *chunk = next,
403 None => break,
404 }
405 }
406 }
407 Some(ts)
408 }
409
410 fn read_with_correction(
411 &mut self,
412 output: &mut [u8],
413 frames: u32,
414 correction: i32,
415 ) -> Option<i64> {
416 if correction == 0 {
417 return self.read_next(output, frames);
418 }
419
420 let correction = correction.max(-(frames as i32) + 1);
422
423 self.frame_delta -= correction;
424 let to_read = (frames as i32 + correction) as u32;
425 let frame_size = self.format.frame_size() as usize;
426
427 self.read_buf.resize(to_read as usize * frame_size, 0);
428 let mut read_buf = std::mem::take(&mut self.read_buf);
429 let ts = self.read_next(&mut read_buf, to_read);
430
431 let max = if correction < 0 {
432 frames as usize
433 } else {
434 to_read as usize
435 };
436 let slices = (correction.unsigned_abs() as usize + 1).min(max);
437 let slice_size = max / slices;
438
439 let mut pos = 0usize;
440 for n in 0..slices {
441 let size = if n + 1 == slices {
442 max - pos
443 } else {
444 slice_size
445 };
446
447 if correction < 0 {
448 let src_start = (pos - n) * frame_size;
449 let dst_start = pos * frame_size;
450 let len = size * frame_size;
451 output[dst_start..dst_start + len]
452 .copy_from_slice(&read_buf[src_start..src_start + len]);
453 } else {
454 let src_start = pos * frame_size;
455 let dst_start = (pos - n) * frame_size;
456 let len = size * frame_size;
457 output[dst_start..dst_start + len]
458 .copy_from_slice(&read_buf[src_start..src_start + len]);
459 }
460 pos += size;
461 }
462
463 self.read_buf = read_buf;
464 ts
465 }
466}
467
468#[cfg(test)]
469mod tests {
470 use super::*;
471
472 fn fmt() -> SampleFormat {
473 SampleFormat::new(48000, 16, 2)
474 }
475
476 fn make_chunk(sec: i32, usec: i32, frames: u32, format: SampleFormat) -> PcmChunk {
477 let bytes = frames as usize * format.frame_size() as usize;
478 let data: Vec<u8> = (0..bytes).map(|i| (i % 256) as u8).collect();
479 PcmChunk::new(Timeval { sec, usec }, data, format)
480 }
481
482 #[test]
483 fn pcm_chunk_duration() {
484 let f = fmt();
485 let chunk = make_chunk(0, 0, 480, f);
486 assert_eq!(chunk.duration_usec(), 10_000);
487 }
488
489 #[test]
490 fn pcm_chunk_read_frames() {
491 let f = fmt();
492 let mut chunk = make_chunk(0, 0, 100, f);
493 let mut buf = vec![0u8; 50 * f.frame_size() as usize];
494 let read = chunk.read_frames(&mut buf, 50);
495 assert_eq!(read, 50);
496 assert!(!chunk.is_end());
497 let read = chunk.read_frames(&mut buf, 50);
498 assert_eq!(read, 50);
499 assert!(chunk.is_end());
500 }
501
502 #[test]
503 fn pcm_chunk_seek() {
504 let f = fmt();
505 let mut chunk = make_chunk(0, 0, 100, f);
506 chunk.seek(90);
507 let mut buf = vec![0u8; 100 * f.frame_size() as usize];
508 let read = chunk.read_frames(&mut buf, 100);
509 assert_eq!(read, 10);
510 }
511
512 #[test]
513 fn stream_add_and_count() {
514 let f = fmt();
515 let mut stream = Stream::new(f);
516 assert_eq!(stream.chunk_count(), 0);
517 stream.add_chunk(make_chunk(100, 0, 480, f));
518 stream.add_chunk(make_chunk(100, 10_000, 480, f));
519 assert_eq!(stream.chunk_count(), 2);
520 }
521
522 #[test]
523 fn stream_clear() {
524 let f = fmt();
525 let mut stream = Stream::new(f);
526 stream.add_chunk(make_chunk(100, 0, 480, f));
527 stream.clear();
528 assert_eq!(stream.chunk_count(), 0);
529 }
530
531 #[test]
532 fn stream_silence_when_empty() {
533 let f = fmt();
534 let mut stream = Stream::new(f);
535 let mut buf = vec![0xFFu8; 480 * f.frame_size() as usize];
536 let result = stream.get_player_chunk(100_000_000, 0, &mut buf, 480);
537 assert!(!result);
538 }
539
540 #[test]
541 fn stream_hard_sync_plays_silence_when_too_early() {
542 let f = fmt();
543 let mut stream = Stream::new(f);
544 stream.set_buffer_ms(1000);
545 stream.add_chunk(make_chunk(100, 0, 4800, f));
546 let server_now = 100_000_000i64;
547 let mut buf = vec![0xFFu8; 480 * f.frame_size() as usize];
548 let result = stream.get_player_chunk(server_now, 0, &mut buf, 480);
549 assert!(result);
550 assert!(buf.iter().all(|&b| b == 0));
551 }
552
553 #[test]
554 fn stream_hard_sync_plays_data_when_aligned() {
555 let f = fmt();
556 let mut stream = Stream::new(f);
557 stream.set_buffer_ms(1000);
558 stream.add_chunk(make_chunk(99, 0, 4800, f));
559 let server_now = 100_000_000i64;
560 let mut buf = vec![0u8; 480 * f.frame_size() as usize];
561 let result = stream.get_player_chunk(server_now, 0, &mut buf, 480);
562 assert!(result);
563 assert!(buf.iter().any(|&b| b != 0));
564 }
565
566 #[test]
567 fn set_real_sample_rate_correction() {
568 let f = fmt();
569 let mut stream = Stream::new(f);
570 stream.set_real_sample_rate(48000.0);
571 assert_eq!(stream.correct_after_x_frames, 0);
572
573 stream.set_real_sample_rate(47999.0);
574 assert_ne!(stream.correct_after_x_frames, 0);
575 }
576
577 #[test]
578 fn read_with_correction_remove_one_frame() {
579 let f = fmt(); let mut stream = Stream::new(f);
581
582 let mut data = Vec::new();
583 for i in 0..10u16 {
584 data.extend_from_slice(&i.to_le_bytes());
585 data.extend_from_slice(&(i + 100).to_le_bytes());
586 }
587 stream.add_chunk(make_chunk(100, 0, 10, f));
588 stream.chunks.back_mut().unwrap().data = data;
589 stream.current = stream.chunks.pop_front();
590
591 let mut output = vec![0u8; 9 * f.frame_size() as usize];
592 let ts = stream.read_with_correction(&mut output, 9, 1);
593 assert!(ts.is_some());
594 assert_eq!(output.len(), 36);
595 for (i, chunk) in output.chunks(4).enumerate() {
596 let left = u16::from_le_bytes([chunk[0], chunk[1]]);
597 assert!(left <= 10, "frame {i}: left={left}");
598 }
599 }
600
601 #[test]
602 fn read_with_correction_zero_is_passthrough() {
603 let f = fmt();
604 let mut stream = Stream::new(f);
605 stream.add_chunk(make_chunk(100, 0, 100, f));
606 stream.current = stream.chunks.pop_front();
607
608 let mut out1 = vec![0u8; 50 * f.frame_size() as usize];
609 stream.read_with_correction(&mut out1, 50, 0);
610
611 stream.add_chunk(make_chunk(100, 0, 100, f));
612 stream.current = stream.chunks.pop_front();
613
614 let mut out2 = vec![0u8; 50 * f.frame_size() as usize];
615 stream.read_next(&mut out2, 50);
616
617 assert_eq!(out1, out2);
618 }
619}