1use rodio::buffer::SamplesBuffer;
4use rodio::{OutputStreamBuilder, Sink};
5use std::cell::Cell;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::{mpsc::RecvTimeoutError, Arc, Mutex};
8use std::thread;
9use std::time::{Duration, Instant};
10
11use log::{error, info, warn};
12
13use crate::container::prot::{PathsTrack, Prot};
14use crate::diagnostics::reporter::{Report, Reporter};
15use crate::dsp::effects::convolution_reverb::{parse_impulse_response_string, ImpulseResponseSpec};
16use crate::playback::output_meter::OutputMeter;
17use crate::tools::timer;
18use crate::{
19 container::info::Info,
20 dsp::effects::AudioEffect,
21 playback::engine::{DspChainMetrics, PlaybackBufferSettings, PlayerEngine},
22};
23
/// Playback state shared between a `Player` handle and its playback thread.
///
/// The `...ing` variants are requests written by the handle; the playback
/// thread observes them in its polling closure and moves the state to the
/// corresponding settled variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PlayerState {
    /// NOTE(review): never assigned in this file — presumably an initial state; confirm.
    Init,
    /// Set by `Player::resume`; the playback thread resumes the sink, then sets `Playing`.
    Resuming,
    /// Set by the playback thread once the sink has been resumed.
    Playing,
    /// Set by `Player::pause`; the playback thread fades out, pauses the sink, then sets `Paused`.
    Pausing,
    /// Set by the playback thread once the sink has been paused.
    Paused,
    /// Set at the start of `Player::kill_current`, before the thread is aborted.
    Stopping,
    /// Set by `Player::kill_current` after the playback thread has exited; also the
    /// state a freshly constructed `Player` starts in.
    Stopped,
    /// NOTE(review): never assigned in this file — presumably "played to the end"; confirm.
    Finished,
}
36
/// Point-in-time copy of the reverb parameters returned by
/// `Player::get_reverb_settings`.
#[derive(Debug, Clone, Copy)]
pub struct ReverbSettingsSnapshot {
    /// Whether the reverb effect that was found is enabled (`false` when none exists).
    pub enabled: bool,
    /// Dry/wet mix of the reverb effect that was found (`0.0` when none exists).
    /// `Player::set_reverb_mix` clamps written values to `0.0..=1.0`.
    pub dry_wet: f32,
}
43
/// Refresh rate (Hz) passed to `OutputMeter::new` when a `Player` is constructed.
const OUTPUT_METER_REFRESH_HZ: f32 = 30.0;
/// Maximum number of attempts the playback thread makes to open the default output stream.
const OUTPUT_STREAM_OPEN_RETRIES: usize = 20;
/// Sleep (milliseconds) between failed attempts to open the default output stream.
const OUTPUT_STREAM_OPEN_RETRY_MS: u64 = 100;
47
/// Scope guard tying a shared "playback thread exists" flag to a value's lifetime.
///
/// The flag is raised when the guard is created and lowered when the guard is
/// dropped, so the flag is cleared on every exit path of the owning scope.
struct PlaybackThreadGuard {
    exists: Arc<AtomicBool>,
}

impl PlaybackThreadGuard {
    /// Raises `exists` and returns a guard that will lower it again on drop.
    fn new(exists: Arc<AtomicBool>) -> Self {
        let guard = Self { exists };
        guard.exists.store(true, Ordering::Relaxed);
        guard
    }
}

impl Drop for PlaybackThreadGuard {
    /// Lowers the flag to signal that the owning thread is gone.
    fn drop(&mut self) {
        let flag: &AtomicBool = &self.exists;
        flag.store(false, Ordering::Relaxed);
    }
}
64
/// Handle controlling audio playback on a background thread.
///
/// Cloning is shallow for every `Arc` field: clones share state, sink, effects
/// and timestamps with the original. Note that `abort` is replaced with a new
/// `Arc` each time a playback thread is started, so a clone taken earlier keeps
/// pointing at the previous flag.
#[derive(Clone)]
pub struct Player {
    /// Container metadata; `channels` and `sample_rate` are read from it.
    pub info: Info,
    /// Cleared whenever a playback thread is started; not otherwise written in this file.
    pub finished_tracks: Arc<Mutex<Vec<i32>>>,
    /// Current playback position in seconds, updated by the playback thread and
    /// overwritten by `play_at`, `seek` and `stop`.
    pub ts: Arc<Mutex<f64>>,
    /// Requested / current playback state.
    state: Arc<Mutex<PlayerState>>,
    /// Abort flag of the current playback thread; set by `kill_current`.
    abort: Arc<AtomicBool>,
    /// `true` while a playback thread is alive (cleared by `PlaybackThreadGuard` on drop).
    playback_thread_exists: Arc<AtomicBool>,
    /// Incremented for every started playback thread; chunks arriving for a stale id are ignored.
    playback_id: Arc<AtomicU64>,
    /// Value of `PlayerEngine::get_duration`, stored when the thread starts.
    /// NOTE(review): presumably seconds — confirm against the engine.
    duration: Arc<Mutex<f64>>,
    /// The opened container, shared with the engine.
    prot: Arc<Mutex<Prot>>,
    /// Set once the first chunk of a playback thread has been appended to the sink.
    audio_heard: Arc<AtomicBool>,
    /// Target sink volume; fades ramp towards this value.
    volume: Arc<Mutex<f32>>,
    /// Output sink; replaced by the playback thread once the output stream is open.
    sink: Arc<Mutex<Sink>>,
    /// Optional periodic reporter installed by `set_reporting`.
    reporter: Option<Arc<Mutex<Reporter>>>,
    /// Buffering / startup tuning shared with the engine and the playback thread.
    buffer_settings: Arc<Mutex<PlaybackBufferSettings>>,
    /// Active effect chain shared with the engine.
    effects: Arc<Mutex<Vec<AudioEffect>>>,
    /// Latest DSP / append-timing metrics.
    dsp_metrics: Arc<Mutex<DspChainMetrics>>,
    /// Counter bumped by `request_effects_reset`; handed to the engine.
    effects_reset: Arc<AtomicU64>,
    /// Level meter fed with every chunk appended to the sink.
    output_meter: Arc<Mutex<OutputMeter>>,
    /// Set when the playback thread has stopped receiving chunks from the engine.
    buffering_done: Arc<AtomicBool>,
    /// Wall-clock time (ms since the Unix epoch) of the last appended chunk.
    last_chunk_ms: Arc<AtomicU64>,
    /// Wall-clock time (ms since the Unix epoch) of the last playback-position update.
    last_time_update_ms: Arc<AtomicU64>,
    /// Impulse-response spec re-applied after `refresh_tracks`.
    impulse_response_override: Option<ImpulseResponseSpec>,
    /// Impulse-response tail value re-applied after `refresh_tracks`.
    impulse_response_tail_override: Option<f32>,
}
95
96impl Player {
97 pub fn new(file_path: &String) -> Self {
99 let this = Self::new_from_path_or_paths(Some(file_path), None);
100 this
101 }
102
103 pub fn new_from_file_paths(file_paths: Vec<PathsTrack>) -> Self {
105 let this = Self::new_from_path_or_paths(None, Some(file_paths));
106 this
107 }
108
109 pub fn new_from_file_paths_legacy(file_paths: Vec<Vec<String>>) -> Self {
111 let this = Self::new_from_path_or_paths(
112 None,
113 Some(
114 file_paths
115 .into_iter()
116 .map(|paths| PathsTrack::new_from_file_paths(paths))
117 .collect(),
118 ),
119 );
120 this
121 }
122
123 pub fn new_from_path_or_paths(path: Option<&String>, paths: Option<Vec<PathsTrack>>) -> Self {
125 let (prot, info) = match path {
126 Some(path) => {
127 let prot = Arc::new(Mutex::new(Prot::new(path)));
128 let info = Info::new(path.clone());
129 (prot, info)
130 }
131 None => {
132 let prot = Arc::new(Mutex::new(Prot::new_from_file_paths(paths.unwrap())));
133 let locked_prot = prot.lock().unwrap();
134 let info = Info::new_from_file_paths(locked_prot.get_file_paths_dictionary());
135 drop(locked_prot);
136 (prot, info)
137 }
138 };
139
140 let (sink, _queue) = Sink::new();
141 let sink: Arc<Mutex<Sink>> = Arc::new(Mutex::new(sink));
142
143 let channels = info.channels as usize;
144 let sample_rate = info.sample_rate;
145 let effects = {
146 let prot_locked = prot.lock().unwrap();
147 match prot_locked.get_effects() {
148 Some(effects) => Arc::new(Mutex::new(effects)),
149 None => Arc::new(Mutex::new(vec![])),
150 }
151 };
152
153 let mut this = Self {
154 info,
155 finished_tracks: Arc::new(Mutex::new(Vec::new())),
156 state: Arc::new(Mutex::new(PlayerState::Stopped)),
157 abort: Arc::new(AtomicBool::new(false)),
158 ts: Arc::new(Mutex::new(0.0)),
159 playback_thread_exists: Arc::new(AtomicBool::new(true)),
160 playback_id: Arc::new(AtomicU64::new(0)),
161 duration: Arc::new(Mutex::new(0.0)),
162 audio_heard: Arc::new(AtomicBool::new(false)),
163 volume: Arc::new(Mutex::new(0.8)),
164 sink,
165 prot,
166 reporter: None,
167 buffer_settings: Arc::new(Mutex::new(PlaybackBufferSettings::new(20.0))),
168 effects,
169 dsp_metrics: Arc::new(Mutex::new(DspChainMetrics::default())),
170 effects_reset: Arc::new(AtomicU64::new(0)),
171 output_meter: Arc::new(Mutex::new(OutputMeter::new(
172 channels,
173 sample_rate,
174 OUTPUT_METER_REFRESH_HZ,
175 ))),
176 buffering_done: Arc::new(AtomicBool::new(false)),
177 last_chunk_ms: Arc::new(AtomicU64::new(0)),
178 last_time_update_ms: Arc::new(AtomicU64::new(0)),
179 impulse_response_override: None,
180 impulse_response_tail_override: None,
181 };
182
183 this.initialize_thread(None);
184
185 this
186 }
187
188 pub fn set_impulse_response_spec(&mut self, spec: ImpulseResponseSpec) {
190 self.impulse_response_override = Some(spec.clone());
191 let mut prot = self.prot.lock().unwrap();
192 prot.set_impulse_response_spec(spec);
193 self.request_effects_reset();
194 }
195
196 pub fn set_impulse_response_from_string(&mut self, value: &str) {
198 if let Some(spec) = parse_impulse_response_string(value) {
199 self.set_impulse_response_spec(spec);
200 }
201 }
202
203 pub fn set_impulse_response_tail_db(&mut self, tail_db: f32) {
205 self.impulse_response_tail_override = Some(tail_db);
206 let mut prot = self.prot.lock().unwrap();
207 prot.set_impulse_response_tail_db(tail_db);
208 self.request_effects_reset();
209 }
210
211 pub fn set_reverb_enabled(&self, enabled: bool) {
213 let mut effects = self.effects.lock().unwrap();
214 if let Some(effect) = effects
215 .iter_mut()
216 .find_map(|effect| effect.as_convolution_reverb_mut())
217 {
218 effect.enabled = enabled;
219 }
220 if let Some(effect) = effects
221 .iter_mut()
222 .find_map(|effect| effect.as_delay_reverb_mut())
223 {
224 effect.enabled = enabled;
225 }
226 }
227
228 pub fn set_reverb_mix(&self, dry_wet: f32) {
230 let mut effects = self.effects.lock().unwrap();
231 if let Some(effect) = effects
232 .iter_mut()
233 .find_map(|effect| effect.as_convolution_reverb_mut())
234 {
235 effect.dry_wet = dry_wet.clamp(0.0, 1.0);
236 }
237 if let Some(effect) = effects
238 .iter_mut()
239 .find_map(|effect| effect.as_delay_reverb_mut())
240 {
241 effect.mix = dry_wet.clamp(0.0, 1.0);
242 }
243 if let Some(effect) = effects
244 .iter_mut()
245 .find_map(|effect| effect.as_diffusion_reverb_mut())
246 {
247 effect.mix = dry_wet.clamp(0.0, 1.0);
248 }
249 }
250
251 pub fn get_reverb_settings(&self) -> ReverbSettingsSnapshot {
253 let effects = self.effects.lock().unwrap();
254 if let Some(effect) = effects
255 .iter()
256 .find_map(|effect| effect.as_convolution_reverb())
257 {
258 return ReverbSettingsSnapshot {
259 enabled: effect.enabled,
260 dry_wet: effect.dry_wet,
261 };
262 }
263 if let Some(effect) = effects
264 .iter()
265 .find_map(|effect| effect.as_diffusion_reverb())
266 {
267 return ReverbSettingsSnapshot {
268 enabled: effect.enabled,
269 dry_wet: effect.mix,
270 };
271 }
272 if let Some(effect) = effects.iter().find_map(|effect| effect.as_delay_reverb()) {
273 return ReverbSettingsSnapshot {
274 enabled: effect.enabled,
275 dry_wet: effect.mix,
276 };
277 }
278 ReverbSettingsSnapshot {
279 enabled: false,
280 dry_wet: 0.0,
281 }
282 }
283
284 #[allow(deprecated)]
286 pub fn get_effect_names(&self) -> Vec<String> {
287 let effects = self.effects.lock().unwrap();
288 effects
289 .iter()
290 .map(|effect| match effect {
291 AudioEffect::DelayReverb(_) => "DelayReverb".to_string(),
292 AudioEffect::BasicReverb(_) => "DelayReverb".to_string(),
293 AudioEffect::DiffusionReverb(_) => "DiffusionReverb".to_string(),
294 AudioEffect::ConvolutionReverb(_) => "ConvolutionReverb".to_string(),
295 AudioEffect::LowPassFilter(_) => "LowPassFilter".to_string(),
296 AudioEffect::HighPassFilter(_) => "HighPassFilter".to_string(),
297 AudioEffect::Distortion(_) => "Distortion".to_string(),
298 AudioEffect::Gain(_) => "Gain".to_string(),
299 AudioEffect::Compressor(_) => "Compressor".to_string(),
300 AudioEffect::Limiter(_) => "Limiter".to_string(),
301 })
302 .collect()
303 }
304
305 pub fn set_effects(&mut self, effects: Vec<AudioEffect>) {
307 {
308 let mut guard = self.effects.lock().unwrap();
309 println!("New Effects: {:?}", effects);
310 *guard = effects;
311 }
312 self.request_effects_reset();
313
314 if !self.thread_finished() {
317 let ts = self.get_time();
318 self.seek(ts);
319 }
320 }
321
322 pub fn get_dsp_metrics(&self) -> DspChainMetrics {
324 *self.dsp_metrics.lock().unwrap()
325 }
326
327 pub fn get_levels(&self) -> Vec<f32> {
329 self.output_meter.lock().unwrap().levels()
330 }
331
332 pub fn get_levels_db(&self) -> Vec<f32> {
334 self.output_meter
335 .lock()
336 .unwrap()
337 .levels()
338 .into_iter()
339 .map(linear_to_dbfs)
340 .collect()
341 }
342
343 pub fn get_levels_avg(&self) -> Vec<f32> {
345 self.output_meter.lock().unwrap().averages()
346 }
347
348 pub fn set_output_meter_refresh_hz(&self, hz: f32) {
350 self.output_meter.lock().unwrap().set_refresh_hz(hz);
351 }
352
353 pub fn debug_playback_state(&self) -> (bool, PlayerState, bool) {
355 (
356 self.playback_thread_exists.load(Ordering::SeqCst),
357 *self.state.lock().unwrap(),
358 self.audio_heard.load(Ordering::Relaxed),
359 )
360 }
361
362 pub fn debug_buffering_done(&self) -> bool {
364 self.buffering_done.load(Ordering::Relaxed)
365 }
366
367 pub fn debug_timing_ms(&self) -> (u64, u64) {
369 (
370 self.last_chunk_ms.load(Ordering::Relaxed),
371 self.last_time_update_ms.load(Ordering::Relaxed),
372 )
373 }
374
375 pub fn debug_sink_state(&self) -> (bool, bool, usize) {
377 let sink = self.sink.lock().unwrap();
378 let paused = sink.is_paused();
379 let empty = sink.empty();
380 let len = sink.len();
381 (paused, empty, len)
382 }
383
384 fn request_effects_reset(&self) {
385 self.effects_reset.fetch_add(1, Ordering::SeqCst);
386 }
387
388 pub fn set_start_buffer_ms(&self, start_buffer_ms: f32) {
390 let mut settings = self.buffer_settings.lock().unwrap();
391 settings.start_buffer_ms = start_buffer_ms.max(0.0);
392 }
393
394 pub fn set_track_eos_ms(&self, track_eos_ms: f32) {
396 let mut settings = self.buffer_settings.lock().unwrap();
397 settings.track_eos_ms = track_eos_ms.max(0.0);
398 }
399
400 pub fn set_start_sink_chunks(&self, chunks: usize) {
402 let mut settings = self.buffer_settings.lock().unwrap();
403 settings.start_sink_chunks = chunks;
404 }
405
406 pub fn set_startup_silence_ms(&self, ms: f32) {
408 let mut settings = self.buffer_settings.lock().unwrap();
409 settings.startup_silence_ms = ms.max(0.0);
410 }
411
412 pub fn set_startup_fade_ms(&self, ms: f32) {
414 let mut settings = self.buffer_settings.lock().unwrap();
415 settings.startup_fade_ms = ms.max(0.0);
416 }
417
418 pub fn set_append_jitter_log_ms(&self, ms: f32) {
420 let mut settings = self.buffer_settings.lock().unwrap();
421 settings.append_jitter_log_ms = ms.max(0.0);
422 }
423
424 pub fn set_effect_boundary_log(&self, enabled: bool) {
426 let mut settings = self.buffer_settings.lock().unwrap();
427 settings.effect_boundary_log = enabled;
428 }
429
    /// Starts a new playback thread beginning at `ts` seconds (`0.0` when `None`).
    ///
    /// The thread opens the default output stream (with retries), replaces the
    /// shared sink with one connected to that stream (initially paused), then
    /// receives chunks from a `PlayerEngine` and appends them to the sink while
    /// tracking the playback position and reacting to state requests
    /// (`Pausing`, `Resuming`) and to the abort flag. Once the engine stops
    /// delivering chunks it keeps polling until the queued audio has played out.
    ///
    /// Callers are expected to have stopped any previous thread first
    /// (`kill_current`); this function itself does not wait for one.
    fn initialize_thread(&mut self, ts: Option<f64>) {
        let mut finished_tracks = self.finished_tracks.lock().unwrap();
        finished_tracks.clear();
        drop(finished_tracks);

        // A brand-new abort flag per thread: anything still holding the old
        // `Arc` keeps seeing the old (possibly set) flag.
        self.abort = Arc::new(AtomicBool::new(false));
        // Mark the thread as existing before it is spawned so `thread_finished`
        // cannot report `true` in the gap before the guard is created.
        self.playback_thread_exists.store(true, Ordering::SeqCst);
        // Id of this playback generation; used below to ignore stale chunks.
        let playback_id = self.playback_id.fetch_add(1, Ordering::SeqCst) + 1;
        self.buffering_done.store(false, Ordering::SeqCst);
        let now_ms_value = now_ms();
        self.last_chunk_ms.store(now_ms_value, Ordering::Relaxed);
        self.last_time_update_ms
            .store(now_ms_value, Ordering::Relaxed);

        // Clone every shared handle the thread needs; the closure takes ownership.
        let play_state = self.state.clone();
        let abort = self.abort.clone();
        let playback_thread_exists = self.playback_thread_exists.clone();
        let playback_id_atomic = self.playback_id.clone();
        let time_passed = self.ts.clone();

        let duration = self.duration.clone();
        let prot = self.prot.clone();
        let buffer_settings = self.buffer_settings.clone();
        let buffer_settings_for_state = self.buffer_settings.clone();
        let effects = self.effects.clone();
        let dsp_metrics = self.dsp_metrics.clone();
        let dsp_metrics_for_sink = self.dsp_metrics.clone();
        let effects_reset = self.effects_reset.clone();
        let output_meter = self.output_meter.clone();
        let audio_info = self.info.clone();

        let audio_heard = self.audio_heard.clone();
        let volume = self.volume.clone();
        let sink_mutex = self.sink.clone();
        let buffer_done_thread_flag = self.buffering_done.clone();
        let last_chunk_ms = self.last_chunk_ms.clone();
        let last_time_update_ms = self.last_time_update_ms.clone();

        // Nothing has been heard yet for this playback generation.
        audio_heard.store(false, Ordering::Relaxed);

        {
            let mut meter = self.output_meter.lock().unwrap();
            meter.reset();
        }

        thread::spawn(move || {
            // Clears `playback_thread_exists` on every exit path of this closure,
            // including the early `return` below.
            let _thread_guard = PlaybackThreadGuard::new(playback_thread_exists.clone());

            let start_time = match ts {
                Some(ts) => ts,
                None => 0.0,
            };
            let mut engine = PlayerEngine::new(
                prot,
                Some(abort.clone()),
                start_time,
                buffer_settings,
                effects,
                dsp_metrics,
                effects_reset,
            );
            // Opening the default output stream can fail transiently; retry a
            // bounded number of times and give up (ending the thread) afterwards.
            let mut stream = None;
            for attempt in 1..=OUTPUT_STREAM_OPEN_RETRIES {
                match OutputStreamBuilder::open_default_stream() {
                    Ok(s) => {
                        stream = Some(s);
                        break;
                    }
                    Err(err) => {
                        if attempt == OUTPUT_STREAM_OPEN_RETRIES {
                            error!(
                                "failed to open default output stream after {} attempts: {}",
                                OUTPUT_STREAM_OPEN_RETRIES, err
                            );
                            return;
                        }
                        warn!(
                            "open_default_stream attempt {}/{} failed: {}",
                            attempt, OUTPUT_STREAM_OPEN_RETRIES, err
                        );
                        thread::sleep(Duration::from_millis(OUTPUT_STREAM_OPEN_RETRY_MS));
                    }
                }
            }
            let stream = stream.expect("stream should exist after successful retry loop");
            let mixer = stream.mixer().clone();

            // Swap the shared sink for one connected to the new stream. It starts
            // paused; playback begins only when the state becomes `Resuming`.
            let mut sink = sink_mutex.lock().unwrap();
            *sink = Sink::connect_new(&mixer);
            sink.pause();
            sink.set_volume(*volume.lock().unwrap());
            drop(sink);

            let mut duration = duration.lock().unwrap();
            *duration = engine.get_duration();
            drop(duration);

            // Durations (seconds) of chunks appended to the sink that have not
            // yet been accounted for in the playback position.
            let chunk_lengths = Arc::new(Mutex::new(Vec::new()));
            let mut time_passed_unlocked = time_passed.lock().unwrap();
            *time_passed_unlocked = start_time;
            drop(time_passed_unlocked);

            // Fades the sink volume down in 10 ms steps, then pauses it. The
            // timestamp is sampled once, so the fade is skipped entirely when
            // the playback position still equals `start_time`.
            let pause_sink = |sink: &Sink, fade_length_out_seconds: f32| {
                let timestamp = *time_passed.lock().unwrap();

                let fade_increments = sink.volume() / (fade_length_out_seconds * 100.0);
                while sink.volume() > 0.0 && timestamp != start_time {
                    sink.set_volume(sink.volume() - fade_increments);
                    thread::sleep(Duration::from_millis(10));
                }
                sink.pause();
            };

            // Starts the sink and ramps its volume up to the configured target.
            // A non-positive fade length jumps straight to the target volume.
            // NOTE(review): steps are sized for 100 per second but the loop sleeps
            // 5 ms per step, so the ramp nominally lasts about half of
            // `fade_length_in_seconds` — confirm whether that is intended.
            let resume_sink = |sink: &Sink, fade_length_in_seconds: f32| {
                let volume = *volume.lock().unwrap();
                if fade_length_in_seconds <= 0.0 {
                    sink.play();
                    sink.set_volume(volume);
                    return;
                }
                let fade_increments = (volume - sink.volume()) / (fade_length_in_seconds * 100.0);
                sink.play();
                while sink.volume() < volume {
                    sink.set_volume(sink.volume() + fade_increments);
                    thread::sleep(Duration::from_millis(5));
                }
            };

            {
                // Optionally queue a block of silence ahead of the first real chunk.
                let startup_settings = buffer_settings_for_state.lock().unwrap();
                let startup_silence_ms = startup_settings.startup_silence_ms;
                drop(startup_settings);

                let sample_rate = audio_info.sample_rate as u32;
                let channels = audio_info.channels as u16;

                if startup_silence_ms > 0.0 {
                    // Frames (rounded up) times channels; at least one sample.
                    let samples = ((startup_silence_ms / 1000.0) * sample_rate as f32).ceil()
                        as usize
                        * channels as usize;
                    let silence = vec![0.0_f32; samples.max(1)];
                    let silence_buffer = SamplesBuffer::new(channels, sample_rate, silence);
                    let sink = sink_mutex.lock().unwrap();
                    sink.append(silence_buffer);
                    drop(sink);
                }
            }

            // `true` until the first resume of this thread, which uses the
            // configured startup fade instead of the default 0.1 s fade.
            let startup_fade_pending = Cell::new(true);
            // Applies pending state requests to the sink. Returns `false` only
            // when the abort flag is set (after pausing and clearing the sink),
            // telling the caller to leave its loop.
            let check_details = || {
                if abort.load(Ordering::SeqCst) {
                    let sink = sink_mutex.lock().unwrap();
                    pause_sink(&sink, 0.1);
                    sink.clear();
                    drop(sink);

                    return false;
                }

                let sink = sink_mutex.lock().unwrap();
                let state = play_state.lock().unwrap().clone();
                let start_sink_chunks = buffer_settings_for_state.lock().unwrap().start_sink_chunks;
                // Hold back a requested resume until enough chunks are queued.
                if state == PlayerState::Resuming
                    && start_sink_chunks > 0
                    && sink.len() < start_sink_chunks
                {
                    sink.pause();
                    drop(sink);
                    return true;
                }
                if state == PlayerState::Pausing {
                    pause_sink(&sink, 0.1);
                    play_state.lock().unwrap().clone_from(&PlayerState::Paused);
                }
                if state == PlayerState::Resuming {
                    let fade_length = if startup_fade_pending.replace(false) {
                        let startup_fade_ms =
                            buffer_settings_for_state.lock().unwrap().startup_fade_ms;
                        (startup_fade_ms / 1000.0).max(0.0)
                    } else {
                        0.1
                    };
                    resume_sink(&sink, fade_length);
                    play_state.lock().unwrap().clone_from(&PlayerState::Playing);
                }
                drop(sink);

                true
            };

            // Playback position is tracked as: total duration of chunks that have
            // fully left the sink (`time_chunks_mutex`) plus a timer that runs
            // while the sink is not paused and restarts whenever a chunk completes.
            let time_chunks_mutex = Arc::new(Mutex::new(start_time));
            let timer_mut = Arc::new(Mutex::new(timer::Timer::new()));
            // Thread-local "no more chunks will arrive" flag; the shared copy
            // (`buffering_done_flag`) is what `debug_buffering_done` reads.
            let buffering_done = Arc::new(AtomicBool::new(false));
            let buffering_done_flag = buffer_done_thread_flag.clone();
            let final_duration = Arc::new(Mutex::new(None::<f64>));
            let mut timer = timer_mut.lock().unwrap();
            timer.start();
            drop(timer);

            // Last position reported to the output meter, used to compute deltas.
            let last_meter_time = Cell::new(0.0_f64);
            // Recomputes the playback position, publishes it to `ts`, and
            // advances the output meter by the elapsed audio time. Does nothing
            // once the abort flag is set.
            let update_chunk_lengths = || {
                if abort.load(Ordering::SeqCst) {
                    return;
                }

                let mut chunk_lengths = chunk_lengths.lock().unwrap();
                let mut time_passed_unlocked = time_passed.lock().unwrap();
                let mut time_chunks_passed = time_chunks_mutex.lock().unwrap();
                let mut timer = timer_mut.lock().unwrap();
                last_time_update_ms.store(now_ms(), Ordering::Relaxed);
                let sink = sink_mutex.lock().unwrap();
                // While chunks are still arriving, chunks that are tracked but no
                // longer in the sink queue are counted as played. After buffering
                // is done only the timer advances the position.
                if !buffering_done.load(Ordering::Relaxed) {
                    let chunks_played = chunk_lengths.len().saturating_sub(sink.len());

                    for _ in 0..chunks_played {
                        timer.reset();
                        timer.start();
                        *time_chunks_passed += chunk_lengths.remove(0);
                    }
                }

                if sink.is_paused() {
                    timer.pause();
                } else {
                    timer.un_pause();
                }

                let current_audio_time = *time_chunks_passed + timer.get_time().as_secs_f64();
                // Never feed the meter a negative delta (position can step back
                // when the timer is reset).
                let delta = (current_audio_time - last_meter_time.get()).max(0.0);
                last_meter_time.set(current_audio_time);
                {
                    let mut meter = output_meter.lock().unwrap();
                    meter.advance(delta);
                }

                *time_passed_unlocked = current_audio_time;

                drop(sink);
                drop(chunk_lengths);
                drop(time_passed_unlocked);
                drop(time_chunks_passed);
                drop(timer);
            };

            // Append timing: (time of last append, smoothed gap in ms,
            // number of late appends, largest gap in ms).
            let append_timing = Arc::new(Mutex::new((Instant::now(), 0.0_f64, 0_u64, 0.0_f64)));
            // Handles one chunk from the engine: records timing metrics, feeds
            // the meter, appends the samples to the sink and refreshes position
            // and state. Chunks are dropped if a newer playback id exists.
            let update_sink = |(mixer, length_in_seconds): (SamplesBuffer, f64)| {
                if playback_id_atomic.load(Ordering::SeqCst) != playback_id {
                    return;
                }
                let (delay_ms, late) = {
                    let mut timing = append_timing.lock().unwrap();
                    let now = Instant::now();
                    let delta_ms = now.duration_since(timing.0).as_secs_f64() * 1000.0;
                    let chunk_ms = length_in_seconds * 1000.0;
                    // "Late" = the gap since the previous append exceeds 120% of
                    // this chunk's own duration.
                    let late = delta_ms > (chunk_ms * 1.2) && chunk_ms > 0.0;
                    if late {
                        timing.2 = timing.2.saturating_add(1);
                    }
                    // Exponential moving average (0.9 old / 0.1 new); the first
                    // sample seeds the average.
                    timing.1 = if timing.1 == 0.0 {
                        delta_ms
                    } else {
                        (timing.1 * 0.9) + (delta_ms * 0.1)
                    };
                    timing.3 = timing.3.max(delta_ms);
                    timing.0 = now;
                    (delta_ms, late)
                };
                // Set as soon as a chunk is appended, regardless of whether the
                // sink is currently paused.
                audio_heard.store(true, Ordering::Relaxed);
                last_chunk_ms.store(now_ms(), Ordering::Relaxed);

                {
                    let mut meter = output_meter.lock().unwrap();
                    meter.push_samples(&mixer);
                }
                {
                    let mut metrics = dsp_metrics_for_sink.lock().unwrap();
                    metrics.append_delay_ms = delay_ms;
                    metrics.avg_append_delay_ms = {
                        if metrics.avg_append_delay_ms == 0.0 {
                            delay_ms
                        } else {
                            (metrics.avg_append_delay_ms * 0.9) + (delay_ms * 0.1)
                        }
                    };
                    metrics.max_append_delay_ms = metrics.max_append_delay_ms.max(delay_ms);
                    metrics.late_append_count = {
                        let timing = append_timing.lock().unwrap();
                        timing.2
                    };
                    metrics.late_append_active = late;
                }

                let sink = sink_mutex.lock().unwrap();
                let append_jitter_log_ms = {
                    let settings = buffer_settings_for_state.lock().unwrap();
                    settings.append_jitter_log_ms
                };
                // Jitter logging is opt-in (threshold > 0).
                if append_jitter_log_ms > 0.0 && (late || delay_ms > append_jitter_log_ms as f64) {
                    let expected_ms = length_in_seconds * 1000.0;
                    log::info!(
                        "append jitter: delta={:.2}ms expected={:.2}ms late={} threshold={:.2}ms sink_len={}",
                        delay_ms,
                        expected_ms,
                        late,
                        append_jitter_log_ms,
                        sink.len()
                    );
                }
                let mut chunk_lengths = chunk_lengths.lock().unwrap();

                sink.append(mixer);

                drop(sink);

                chunk_lengths.push(length_in_seconds);
                drop(chunk_lengths);

                update_chunk_lengths();
                // The return value (abort) is ignored here; abort is acted on by
                // the receive loop's timeout branch and by the drain loop.
                check_details();
            };

            // Receive loop: append chunks as they arrive; on a 20 ms timeout
            // refresh position/state and leave if aborted. Ends when the engine's
            // channel disconnects.
            let receiver = engine.start_receiver();
            loop {
                match receiver.recv_timeout(Duration::from_millis(20)) {
                    Ok(chunk) => {
                        update_sink(chunk);
                    }
                    Err(RecvTimeoutError::Timeout) => {
                        update_chunk_lengths();
                        if !check_details() {
                            break;
                        }
                    }
                    Err(RecvTimeoutError::Disconnected) => break,
                }
            }
            #[cfg(feature = "debug")]
            log::info!("engine reception loop finished");

            buffering_done.store(true, Ordering::Relaxed);
            buffering_done_flag.store(true, Ordering::Relaxed);
            {
                // Expected end position: what has already played plus everything
                // still queued.
                let mut final_duration = final_duration.lock().unwrap();
                if final_duration.is_none() {
                    let chunk_lengths = chunk_lengths.lock().unwrap();
                    let time_chunks_passed = time_chunks_mutex.lock().unwrap();
                    *final_duration = Some(*time_chunks_passed + chunk_lengths.iter().sum::<f64>());
                }
            }

            #[cfg(feature = "debug")]
            {
                let sink = sink_mutex.lock().unwrap();
                let paused = sink.is_paused();
                let empty = sink.empty();
                let sink_len = sink.len();
                drop(sink);
                let time_passed = *time_passed.lock().unwrap();
                let final_duration = *final_duration.lock().unwrap();
                log::info!(
                    "Starting drain loop: paused={} empty={} sink_len={} time={:.3} final={:?}",
                    paused,
                    empty,
                    sink_len,
                    time_passed,
                    final_duration
                );
            }

            // Drain loop: keep servicing pause/resume/abort every 10 ms until the
            // playback position reaches the expected end (within 1 ms) and the
            // engine reports that buffering has finished.
            loop {
                update_chunk_lengths();
                if !check_details() {
                    break;
                }

                let done = if engine.finished_buffering() {
                    if let Some(final_duration) = *final_duration.lock().unwrap() {
                        let time_passed = *time_passed.lock().unwrap();
                        time_passed >= (final_duration - 0.001).max(0.0)
                    } else {
                        false
                    }
                } else {
                    false
                };
                if done {
                    break;
                }

                thread::sleep(Duration::from_millis(10));
            }

            #[cfg(feature = "debug")]
            log::info!("Finished drain loop!");

            // Explicitly signal that the playback thread is gone.
            drop(_thread_guard);
        });
    }
870
871 pub fn play_at(&mut self, ts: f64) {
873 let mut timestamp = self.ts.lock().unwrap();
874 *timestamp = ts;
875 drop(timestamp);
876
877 self.request_effects_reset();
878 self.kill_current();
879 self.initialize_thread(Some(ts));
881
882 self.resume();
883
884 self.wait_for_audio_heard(Duration::from_secs(5));
885 }
886
887 pub fn play(&mut self) {
889 info!("Playing audio");
890 let thread_exists = self.playback_thread_exists.load(Ordering::SeqCst);
891 if !thread_exists {
894 self.initialize_thread(None);
895 }
896
897 self.resume();
898
899 self.wait_for_audio_heard(Duration::from_secs(5));
900 }
901
902 pub fn pause(&self) {
904 self.state.lock().unwrap().clone_from(&PlayerState::Pausing);
905 }
906
907 pub fn resume(&self) {
909 self.state
910 .lock()
911 .unwrap()
912 .clone_from(&PlayerState::Resuming);
913 }
914
915 pub fn kill_current(&self) {
917 self.state
918 .lock()
919 .unwrap()
920 .clone_from(&PlayerState::Stopping);
921 {
922 let sink = self.sink.lock().unwrap();
923 sink.stop();
924 }
925 self.abort.store(true, Ordering::SeqCst);
926
927 while !self.thread_finished() {
928 thread::sleep(Duration::from_millis(10));
929 }
930
931 self.state.lock().unwrap().clone_from(&PlayerState::Stopped);
932 }
933
934 pub fn stop(&self) {
936 self.kill_current();
937 self.ts.lock().unwrap().clone_from(&0.0);
938 }
939
940 pub fn is_playing(&self) -> bool {
942 let state = self.state.lock().unwrap();
943 *state == PlayerState::Playing
944 }
945
946 pub fn is_paused(&self) -> bool {
948 let state = self.state.lock().unwrap();
949 *state == PlayerState::Paused
950 }
951
952 pub fn get_time(&self) -> f64 {
954 let ts = self.ts.lock().unwrap();
955 *ts
956 }
957
958 fn thread_finished(&self) -> bool {
959 let playback_thread_exists = self.playback_thread_exists.load(Ordering::SeqCst);
960 !playback_thread_exists
961 }
962
963 pub fn is_finished(&self) -> bool {
965 self.thread_finished()
966 }
969
970 pub fn sleep_until_end(&self) {
972 loop {
973 if self.thread_finished() {
974 break;
975 }
976 thread::sleep(Duration::from_millis(100));
977 }
978 }
979
980 pub fn get_duration(&self) -> f64 {
982 let duration = self.duration.lock().unwrap();
983 *duration
984 }
985
986 pub fn seek(&mut self, ts: f64) {
988 let mut timestamp = self.ts.lock().unwrap();
989 *timestamp = ts;
990 drop(timestamp);
991
992 self.request_effects_reset();
993 let state = self.state.lock().unwrap().clone();
994
995 self.kill_current();
996 self.state.lock().unwrap().clone_from(&state);
997 self.initialize_thread(Some(ts));
998
999 if state == PlayerState::Playing {
1000 self.resume();
1001 }
1002 }
1003
1004 pub fn refresh_tracks(&mut self) {
1006 let mut prot = self.prot.lock().unwrap();
1007 prot.refresh_tracks();
1008 if let Some(spec) = self.impulse_response_override.clone() {
1009 prot.set_impulse_response_spec(spec);
1010 }
1011 if let Some(tail_db) = self.impulse_response_tail_override {
1012 prot.set_impulse_response_tail_db(tail_db);
1013 }
1014 drop(prot);
1015
1016 self.request_effects_reset();
1017 if self.thread_finished() {
1019 return;
1020 }
1021
1022 let ts = self.get_time();
1025 self.seek(ts);
1026
1027 if self.is_playing() {
1029 self.resume();
1030 }
1031
1032 self.wait_for_audio_heard(Duration::from_secs(5));
1033 }
1034
1035 fn wait_for_audio_heard(&self, timeout: Duration) -> bool {
1036 let start = Instant::now();
1037 loop {
1038 if self.audio_heard.load(Ordering::Relaxed) {
1039 return true;
1040 }
1041 if self.thread_finished() {
1042 warn!("playback thread ended before audio was heard");
1043 return false;
1044 }
1045 if start.elapsed() >= timeout {
1046 warn!("timed out waiting for audio to start");
1047 return false;
1048 }
1049 thread::sleep(Duration::from_millis(10));
1050 }
1051 }
1052
1053 pub fn shuffle(&mut self) {
1055 self.refresh_tracks();
1056 }
1057
1058 pub fn set_volume(&mut self, new_volume: f32) {
1060 let sink = self.sink.lock().unwrap();
1061 sink.set_volume(new_volume);
1062 drop(sink);
1063
1064 let mut volume = self.volume.lock().unwrap();
1065 *volume = new_volume;
1066 drop(volume);
1067 }
1068
1069 pub fn get_volume(&self) -> f32 {
1071 *self.volume.lock().unwrap()
1072 }
1073
1074 pub fn get_ids(&self) -> Vec<String> {
1076 let prot = self.prot.lock().unwrap();
1077
1078 return prot.get_ids();
1079 }
1080
1081 pub fn set_reporting(
1083 &mut self,
1084 reporting: Arc<Mutex<dyn Fn(Report) + Send>>,
1085 reporting_interval: Duration,
1086 ) {
1087 if self.reporter.is_some() {
1088 self.reporter.as_ref().unwrap().lock().unwrap().stop();
1089 }
1090
1091 let reporter = Arc::new(Mutex::new(Reporter::new(
1092 Arc::new(Mutex::new(self.clone())),
1093 reporting,
1094 reporting_interval,
1095 )));
1096
1097 reporter.lock().unwrap().start();
1098
1099 self.reporter = Some(reporter);
1100 }
1101}
1102
/// Converts a linear amplitude to dBFS (`20 * log10(value)`).
///
/// Zero and negative amplitudes map to negative infinity; NaN propagates.
fn linear_to_dbfs(value: f32) -> f32 {
    let is_silent = value <= 0.0;
    if is_silent {
        return f32::NEG_INFINITY;
    }
    value.log10() * 20.0
}
1110
/// Returns the wall-clock time in milliseconds since the Unix epoch, or `0`
/// if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    use std::time::SystemTime;
    match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}