1use rodio::buffer::SamplesBuffer;
4use rodio::{OutputStreamBuilder, Sink};
5use std::cell::Cell;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::{mpsc::RecvTimeoutError, Arc, Mutex};
8use std::thread;
9use std::time::{Duration, Instant};
10
11use log::{error, info, warn};
12
13use crate::container::prot::{PathsTrack, Prot};
14use crate::diagnostics::reporter::{Report, Reporter};
15use crate::dsp::effects::convolution_reverb::{parse_impulse_response_string, ImpulseResponseSpec};
16use crate::playback::output_meter::OutputMeter;
17use crate::tools::timer;
18use crate::{
19 container::info::Info,
20 dsp::effects::AudioEffect,
21 playback::engine::{DspChainMetrics, PlaybackBufferSettings, PlayerEngine},
22};
23
/// Lifecycle state of a [`Player`], shared between the control API and the
/// playback thread.
///
/// `Resuming`, `Pausing` and `Stopping` are *requests* written by the control
/// methods; the playback thread acts on `Resuming`/`Pausing` and then stores
/// the settled state (`Playing`/`Paused`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PlayerState {
    /// Not assigned anywhere in this file.
    Init,
    /// Playback was requested; the playback thread will start the sink and
    /// then switch to `Playing`.
    Resuming,
    /// The sink has been resumed by the playback thread.
    Playing,
    /// A pause was requested; the playback thread will fade out and then
    /// switch to `Paused`.
    Pausing,
    /// The sink has been paused by the playback thread.
    Paused,
    /// Set by `kill_current` while it waits for the playback thread to exit.
    Stopping,
    /// Initial state of a new player, and the state after `kill_current`.
    Stopped,
    /// Not assigned anywhere in this file.
    Finished,
}
36
/// Copy of the reverb parameters returned by `Player::get_reverb_settings`.
#[derive(Debug, Clone, Copy)]
pub struct ReverbSettingsSnapshot {
    /// Whether the reported reverb effect is enabled (`false` when the chain
    /// contains no reverb).
    pub enabled: bool,
    /// Dry/wet mix of the reported reverb (`0.0` when the chain contains no
    /// reverb). `Player::set_reverb_mix` clamps the values it writes to `0.0..=1.0`.
    pub dry_wet: f32,
}
43
/// Refresh rate handed to `OutputMeter::new` when a player is constructed.
const OUTPUT_METER_REFRESH_HZ: f32 = 30.0;
/// Maximum number of attempts to open the default output stream before the
/// playback thread gives up.
const OUTPUT_STREAM_OPEN_RETRIES: usize = 20;
/// Pause between two attempts to open the default output stream, in milliseconds.
const OUTPUT_STREAM_OPEN_RETRY_MS: u64 = 100;
47
/// Scope guard that marks the playback thread as alive.
///
/// Constructing the guard sets the shared flag to `true`; dropping it sets the
/// flag back to `false`, including on early returns from the thread closure.
struct PlaybackThreadGuard {
    /// Flag observed by the control API to find out whether the thread still runs.
    exists: Arc<AtomicBool>,
}

impl PlaybackThreadGuard {
    /// Raises `exists` and returns a guard that lowers it again when dropped.
    fn new(exists: Arc<AtomicBool>) -> Self {
        // Every other access to this flag in the file uses `SeqCst`; the
        // stores use the same ordering so the flag has one consistent
        // ordering instead of mixing `Relaxed` writes with `SeqCst` reads.
        exists.store(true, Ordering::SeqCst);
        Self { exists }
    }
}

impl Drop for PlaybackThreadGuard {
    /// Lowers the flag so callers polling it see that the thread is gone.
    fn drop(&mut self) {
        self.exists.store(false, Ordering::SeqCst);
    }
}
64
/// Handle controlling audio playback of a container or a set of files.
///
/// Most fields are `Arc`s, so a clone of a `Player` is another handle onto the
/// same shared state.
// NOTE(review): `initialize_thread` replaces `abort` with a new `Arc`, so
// clones made earlier keep pointing at the previous flag — confirm this is intended.
#[derive(Clone)]
pub struct Player {
    /// Metadata of the loaded audio; `channels` and `sample_rate` are read from it.
    pub info: Info,
    /// Cleared whenever a playback thread is initialized; not written elsewhere
    /// in this file (presumably filled by the engine — TODO confirm).
    pub finished_tracks: Arc<Mutex<Vec<i32>>>,
    /// Current playback position in seconds, updated by the playback thread.
    pub ts: Arc<Mutex<f64>>,
    /// Current [`PlayerState`], shared with the playback thread.
    state: Arc<Mutex<PlayerState>>,
    /// Set by `kill_current` to make the playback thread stop.
    abort: Arc<AtomicBool>,
    /// `true` while a playback thread is alive (see `PlaybackThreadGuard`).
    playback_thread_exists: Arc<AtomicBool>,
    /// Incremented for every playback thread; a thread whose id is no longer
    /// current stops appending chunks to the sink.
    playback_id: Arc<AtomicU64>,
    /// Result of `PlayerEngine::get_duration`, stored when a playback thread starts.
    duration: Arc<Mutex<f64>>,
    /// The container being played.
    prot: Arc<Mutex<Prot>>,
    /// Set once the playback thread has appended its first chunk to the sink.
    audio_heard: Arc<AtomicBool>,
    /// Target sink volume (initially `0.8`).
    volume: Arc<Mutex<f32>>,
    /// rodio sink; replaced with one connected to the output mixer by each
    /// playback thread.
    sink: Arc<Mutex<Sink>>,
    /// Reporter installed through `set_reporting`, if any.
    reporter: Option<Arc<Mutex<Reporter>>>,
    /// Buffering and startup tuning shared with the playback thread and engine.
    buffer_settings: Arc<Mutex<PlaybackBufferSettings>>,
    /// Effect chain handed to the engine.
    effects: Arc<Mutex<Vec<AudioEffect>>>,
    /// DSP and append-timing metrics.
    dsp_metrics: Arc<Mutex<DspChainMetrics>>,
    /// Counter bumped by `request_effects_reset`; handed to the engine.
    effects_reset: Arc<AtomicU64>,
    /// Output level meter fed with every chunk appended to the sink.
    output_meter: Arc<Mutex<OutputMeter>>,
    /// Set when the playback thread's chunk reception loop has ended.
    buffering_done: Arc<AtomicBool>,
    /// Unix time in milliseconds of the most recent chunk append.
    last_chunk_ms: Arc<AtomicU64>,
    /// Unix time in milliseconds of the most recent playback-position update.
    last_time_update_ms: Arc<AtomicU64>,
    /// Impulse response set by the caller; re-applied after `refresh_tracks`.
    impulse_response_override: Option<ImpulseResponseSpec>,
    /// Impulse response tail value set by the caller; re-applied after
    /// `refresh_tracks`.
    impulse_response_tail_override: Option<f32>,
}
95
96impl Player {
97 pub fn new(file_path: &String) -> Self {
99 let this = Self::new_from_path_or_paths(Some(file_path), None);
100 this
101 }
102
103 pub fn new_from_file_paths(file_paths: Vec<PathsTrack>) -> Self {
105 let this = Self::new_from_path_or_paths(None, Some(file_paths));
106 this
107 }
108
109 pub fn new_from_file_paths_legacy(file_paths: Vec<Vec<String>>) -> Self {
111 let this = Self::new_from_path_or_paths(
112 None,
113 Some(
114 file_paths
115 .into_iter()
116 .map(|paths| PathsTrack::new_from_file_paths(paths))
117 .collect(),
118 ),
119 );
120 this
121 }
122
123 pub fn new_from_path_or_paths(path: Option<&String>, paths: Option<Vec<PathsTrack>>) -> Self {
125 let (prot, info) = match path {
126 Some(path) => {
127 let prot = Arc::new(Mutex::new(Prot::new(path)));
128 let info = Info::new(path.clone());
129 (prot, info)
130 }
131 None => {
132 let prot = Arc::new(Mutex::new(Prot::new_from_file_paths(paths.unwrap())));
133 let locked_prot = prot.lock().unwrap();
134 let info = Info::new_from_file_paths(locked_prot.get_file_paths_dictionary());
135 drop(locked_prot);
136 (prot, info)
137 }
138 };
139
140 let (sink, _queue) = Sink::new();
141 let sink: Arc<Mutex<Sink>> = Arc::new(Mutex::new(sink));
142
143 let channels = info.channels as usize;
144 let sample_rate = info.sample_rate;
145 let effects = {
146 let prot_locked = prot.lock().unwrap();
147 match prot_locked.get_effects() {
148 Some(effects) => Arc::new(Mutex::new(effects)),
149 None => Arc::new(Mutex::new(vec![])),
150 }
151 };
152
153 let mut this = Self {
154 info,
155 finished_tracks: Arc::new(Mutex::new(Vec::new())),
156 state: Arc::new(Mutex::new(PlayerState::Stopped)),
157 abort: Arc::new(AtomicBool::new(false)),
158 ts: Arc::new(Mutex::new(0.0)),
159 playback_thread_exists: Arc::new(AtomicBool::new(true)),
160 playback_id: Arc::new(AtomicU64::new(0)),
161 duration: Arc::new(Mutex::new(0.0)),
162 audio_heard: Arc::new(AtomicBool::new(false)),
163 volume: Arc::new(Mutex::new(0.8)),
164 sink,
165 prot,
166 reporter: None,
167 buffer_settings: Arc::new(Mutex::new(PlaybackBufferSettings::new(20.0))),
168 effects,
169 dsp_metrics: Arc::new(Mutex::new(DspChainMetrics::default())),
170 effects_reset: Arc::new(AtomicU64::new(0)),
171 output_meter: Arc::new(Mutex::new(OutputMeter::new(
172 channels,
173 sample_rate,
174 OUTPUT_METER_REFRESH_HZ,
175 ))),
176 buffering_done: Arc::new(AtomicBool::new(false)),
177 last_chunk_ms: Arc::new(AtomicU64::new(0)),
178 last_time_update_ms: Arc::new(AtomicU64::new(0)),
179 impulse_response_override: None,
180 impulse_response_tail_override: None,
181 };
182
183 this.initialize_thread(None);
184
185 this
186 }
187
188 pub fn set_impulse_response_spec(&mut self, spec: ImpulseResponseSpec) {
190 self.impulse_response_override = Some(spec.clone());
191 let mut prot = self.prot.lock().unwrap();
192 prot.set_impulse_response_spec(spec);
193 self.request_effects_reset();
194 }
195
196 pub fn set_impulse_response_from_string(&mut self, value: &str) {
198 if let Some(spec) = parse_impulse_response_string(value) {
199 self.set_impulse_response_spec(spec);
200 }
201 }
202
203 pub fn set_impulse_response_tail_db(&mut self, tail_db: f32) {
205 self.impulse_response_tail_override = Some(tail_db);
206 let mut prot = self.prot.lock().unwrap();
207 prot.set_impulse_response_tail_db(tail_db);
208 self.request_effects_reset();
209 }
210
211 pub fn set_reverb_enabled(&self, enabled: bool) {
213 let mut effects = self.effects.lock().unwrap();
214 if let Some(effect) = effects
215 .iter_mut()
216 .find_map(|effect| effect.as_convolution_reverb_mut())
217 {
218 effect.enabled = enabled;
219 }
220 if let Some(effect) = effects
221 .iter_mut()
222 .find_map(|effect| effect.as_delay_reverb_mut())
223 {
224 effect.enabled = enabled;
225 }
226 }
227
228 pub fn set_reverb_mix(&self, dry_wet: f32) {
230 let mut effects = self.effects.lock().unwrap();
231 if let Some(effect) = effects
232 .iter_mut()
233 .find_map(|effect| effect.as_convolution_reverb_mut())
234 {
235 effect.dry_wet = dry_wet.clamp(0.0, 1.0);
236 }
237 if let Some(effect) = effects
238 .iter_mut()
239 .find_map(|effect| effect.as_delay_reverb_mut())
240 {
241 effect.mix = dry_wet.clamp(0.0, 1.0);
242 }
243 if let Some(effect) = effects
244 .iter_mut()
245 .find_map(|effect| effect.as_diffusion_reverb_mut())
246 {
247 effect.mix = dry_wet.clamp(0.0, 1.0);
248 }
249 }
250
251 pub fn get_reverb_settings(&self) -> ReverbSettingsSnapshot {
253 let effects = self.effects.lock().unwrap();
254 if let Some(effect) = effects
255 .iter()
256 .find_map(|effect| effect.as_convolution_reverb())
257 {
258 return ReverbSettingsSnapshot {
259 enabled: effect.enabled,
260 dry_wet: effect.dry_wet,
261 };
262 }
263 if let Some(effect) = effects
264 .iter()
265 .find_map(|effect| effect.as_diffusion_reverb())
266 {
267 return ReverbSettingsSnapshot {
268 enabled: effect.enabled,
269 dry_wet: effect.mix,
270 };
271 }
272 if let Some(effect) = effects.iter().find_map(|effect| effect.as_delay_reverb()) {
273 return ReverbSettingsSnapshot {
274 enabled: effect.enabled,
275 dry_wet: effect.mix,
276 };
277 }
278 ReverbSettingsSnapshot {
279 enabled: false,
280 dry_wet: 0.0,
281 }
282 }
283
284 #[allow(deprecated)]
286 pub fn get_effect_names(&self) -> Vec<String> {
287 let effects = self.effects.lock().unwrap();
288 effects
289 .iter()
290 .map(|effect| match effect {
291 AudioEffect::DelayReverb(_) => "DelayReverb".to_string(),
292 AudioEffect::BasicReverb(_) => "DelayReverb".to_string(),
293 AudioEffect::DiffusionReverb(_) => "DiffusionReverb".to_string(),
294 AudioEffect::ConvolutionReverb(_) => "ConvolutionReverb".to_string(),
295 AudioEffect::LowPassFilter(_) => "LowPassFilter".to_string(),
296 AudioEffect::HighPassFilter(_) => "HighPassFilter".to_string(),
297 AudioEffect::Distortion(_) => "Distortion".to_string(),
298 AudioEffect::Gain(_) => "Gain".to_string(),
299 AudioEffect::Compressor(_) => "Compressor".to_string(),
300 AudioEffect::Limiter(_) => "Limiter".to_string(),
301 })
302 .collect()
303 }
304
305 pub fn set_effects(&mut self, effects: Vec<AudioEffect>) {
307 {
308 let mut guard = self.effects.lock().unwrap();
309 println!("New Effects: {:?}", effects);
310 *guard = effects;
311 }
312 self.request_effects_reset();
313
314 if !self.thread_finished() {
317 let ts = self.get_time();
318 self.seek(ts);
319 }
320 }
321
322 pub fn get_dsp_metrics(&self) -> DspChainMetrics {
324 *self.dsp_metrics.lock().unwrap()
325 }
326
327 pub fn get_levels(&self) -> Vec<f32> {
329 self.output_meter.lock().unwrap().levels()
330 }
331
332 pub fn get_levels_db(&self) -> Vec<f32> {
334 self.output_meter
335 .lock()
336 .unwrap()
337 .levels()
338 .into_iter()
339 .map(linear_to_dbfs)
340 .collect()
341 }
342
343 pub fn get_levels_avg(&self) -> Vec<f32> {
345 self.output_meter.lock().unwrap().averages()
346 }
347
348 pub fn set_output_meter_refresh_hz(&self, hz: f32) {
350 self.output_meter.lock().unwrap().set_refresh_hz(hz);
351 }
352
353 pub fn debug_playback_state(&self) -> (bool, PlayerState, bool) {
355 (
356 self.playback_thread_exists.load(Ordering::SeqCst),
357 *self.state.lock().unwrap(),
358 self.audio_heard.load(Ordering::Relaxed),
359 )
360 }
361
362 pub fn debug_buffering_done(&self) -> bool {
364 self.buffering_done.load(Ordering::Relaxed)
365 }
366
367 pub fn debug_timing_ms(&self) -> (u64, u64) {
369 (
370 self.last_chunk_ms.load(Ordering::Relaxed),
371 self.last_time_update_ms.load(Ordering::Relaxed),
372 )
373 }
374
375 pub fn debug_sink_state(&self) -> (bool, bool, usize) {
377 let sink = self.sink.lock().unwrap();
378 let paused = sink.is_paused();
379 let empty = sink.empty();
380 let len = sink.len();
381 (paused, empty, len)
382 }
383
384 fn request_effects_reset(&self) {
385 self.effects_reset.fetch_add(1, Ordering::SeqCst);
386 }
387
388 pub fn set_start_buffer_ms(&self, start_buffer_ms: f32) {
390 let mut settings = self.buffer_settings.lock().unwrap();
391 settings.start_buffer_ms = start_buffer_ms.max(0.0);
392 }
393
394 pub fn set_track_eos_ms(&self, track_eos_ms: f32) {
396 let mut settings = self.buffer_settings.lock().unwrap();
397 settings.track_eos_ms = track_eos_ms.max(0.0);
398 }
399
400 pub fn set_start_sink_chunks(&self, chunks: usize) {
402 let mut settings = self.buffer_settings.lock().unwrap();
403 settings.start_sink_chunks = chunks;
404 }
405
406 pub fn set_max_sink_chunks(&self, chunks: usize) {
410 let mut settings = self.buffer_settings.lock().unwrap();
411 settings.max_sink_chunks = chunks;
412 }
413
414 pub fn set_startup_silence_ms(&self, ms: f32) {
416 let mut settings = self.buffer_settings.lock().unwrap();
417 settings.startup_silence_ms = ms.max(0.0);
418 }
419
420 pub fn set_startup_fade_ms(&self, ms: f32) {
422 let mut settings = self.buffer_settings.lock().unwrap();
423 settings.startup_fade_ms = ms.max(0.0);
424 }
425
426 pub fn set_append_jitter_log_ms(&self, ms: f32) {
428 let mut settings = self.buffer_settings.lock().unwrap();
429 settings.append_jitter_log_ms = ms.max(0.0);
430 }
431
432 pub fn set_effect_boundary_log(&self, enabled: bool) {
434 let mut settings = self.buffer_settings.lock().unwrap();
435 settings.effect_boundary_log = enabled;
436 }
437
    /// Resets per-playback state and spawns a new playback thread.
    ///
    /// `ts` is the position, in seconds, at which playback starts (`None`
    /// means `0.0`). The spawned thread opens the default output stream,
    /// replaces the shared sink with one connected to it (initially paused),
    /// pulls chunks from a `PlayerEngine`, appends them to the sink, and keeps
    /// the shared playback position, meter, metrics and state up to date until
    /// playback drains or the abort flag is raised.
    ///
    /// This method does not stop a previously running thread; callers use
    /// `kill_current` for that first.
    fn initialize_thread(&mut self, ts: Option<f64>) {
        let mut finished_tracks = self.finished_tracks.lock().unwrap();
        finished_tracks.clear();
        drop(finished_tracks);

        // A fresh abort flag per thread, so an abort raised for a previous
        // thread cannot stop this one.
        self.abort = Arc::new(AtomicBool::new(false));
        self.playback_thread_exists.store(true, Ordering::SeqCst);
        // Id of this playback run; used below to detect that a newer run has started.
        let playback_id = self.playback_id.fetch_add(1, Ordering::SeqCst) + 1;
        self.buffering_done.store(false, Ordering::SeqCst);
        let now_ms_value = now_ms();
        self.last_chunk_ms.store(now_ms_value, Ordering::Relaxed);
        self.last_time_update_ms
            .store(now_ms_value, Ordering::Relaxed);

        // Handles moved into the thread closure.
        let play_state = self.state.clone();
        let abort = self.abort.clone();
        let playback_thread_exists = self.playback_thread_exists.clone();
        let playback_id_atomic = self.playback_id.clone();
        let time_passed = self.ts.clone();

        let duration = self.duration.clone();
        let prot = self.prot.clone();
        let buffer_settings = self.buffer_settings.clone();
        let buffer_settings_for_state = self.buffer_settings.clone();
        let effects = self.effects.clone();
        let dsp_metrics = self.dsp_metrics.clone();
        let dsp_metrics_for_sink = self.dsp_metrics.clone();
        let effects_reset = self.effects_reset.clone();
        let output_meter = self.output_meter.clone();
        let audio_info = self.info.clone();

        let audio_heard = self.audio_heard.clone();
        let volume = self.volume.clone();
        let sink_mutex = self.sink.clone();
        let buffer_done_thread_flag = self.buffering_done.clone();
        let last_chunk_ms = self.last_chunk_ms.clone();
        let last_time_update_ms = self.last_time_update_ms.clone();

        audio_heard.store(false, Ordering::Relaxed);

        {
            let mut meter = self.output_meter.lock().unwrap();
            meter.reset();
        }

        thread::spawn(move || {
            // Clears `playback_thread_exists` when dropped, including on the
            // early return below.
            let _thread_guard = PlaybackThreadGuard::new(playback_thread_exists.clone());

            let start_time = match ts {
                Some(ts) => ts,
                None => 0.0,
            };
            let mut engine = PlayerEngine::new(
                prot,
                Some(abort.clone()),
                start_time,
                buffer_settings,
                effects,
                dsp_metrics,
                effects_reset,
            );
            // Opening the output device can fail; retry a bounded number of
            // times with a fixed pause before giving up.
            let mut stream = None;
            for attempt in 1..=OUTPUT_STREAM_OPEN_RETRIES {
                match OutputStreamBuilder::open_default_stream() {
                    Ok(s) => {
                        stream = Some(s);
                        break;
                    }
                    Err(err) => {
                        if attempt == OUTPUT_STREAM_OPEN_RETRIES {
                            error!(
                                "failed to open default output stream after {} attempts: {}",
                                OUTPUT_STREAM_OPEN_RETRIES, err
                            );
                            return;
                        }
                        warn!(
                            "open_default_stream attempt {}/{} failed: {}",
                            attempt, OUTPUT_STREAM_OPEN_RETRIES, err
                        );
                        thread::sleep(Duration::from_millis(OUTPUT_STREAM_OPEN_RETRY_MS));
                    }
                }
            }
            let stream = stream.expect("stream should exist after successful retry loop");
            let mixer = stream.mixer().clone();

            // Replace the shared sink with one attached to the new stream. It
            // starts paused; playback begins once the state becomes `Resuming`.
            let mut sink = sink_mutex.lock().unwrap();
            *sink = Sink::connect_new(&mixer);
            sink.pause();
            sink.set_volume(*volume.lock().unwrap());
            drop(sink);

            let mut duration = duration.lock().unwrap();
            *duration = engine.get_duration();
            drop(duration);

            // Lengths (seconds) of the chunks appended to the sink that have
            // not yet been counted as played.
            let chunk_lengths = Arc::new(Mutex::new(Vec::new()));
            let mut time_passed_unlocked = time_passed.lock().unwrap();
            *time_passed_unlocked = start_time;
            drop(time_passed_unlocked);

            // Lowers the sink volume in steps every 10 ms and then pauses the
            // sink. The position is sampled once, so when it still equals
            // `start_time` the fade loop is skipped and the sink pauses at once.
            let pause_sink = |sink: &Sink, fade_length_out_seconds: f32| {
                let timestamp = *time_passed.lock().unwrap();

                let fade_increments = sink.volume() / (fade_length_out_seconds * 100.0);
                while sink.volume() > 0.0 && timestamp != start_time {
                    sink.set_volume(sink.volume() - fade_increments);
                    thread::sleep(Duration::from_millis(10));
                }
                sink.pause();
            };

            // Starts the sink and raises its volume in steps every 5 ms until
            // the configured volume is reached. A non-positive fade length
            // starts playback at the configured volume immediately.
            let resume_sink = |sink: &Sink, fade_length_in_seconds: f32| {
                let volume = *volume.lock().unwrap();
                if fade_length_in_seconds <= 0.0 {
                    sink.play();
                    sink.set_volume(volume);
                    return;
                }
                let fade_increments = (volume - sink.volume()) / (fade_length_in_seconds * 100.0);
                sink.play();
                while sink.volume() < volume {
                    sink.set_volume(sink.volume() + fade_increments);
                    thread::sleep(Duration::from_millis(5));
                }
            };

            {
                // Optionally queue a block of silence ahead of the first chunk.
                let startup_settings = buffer_settings_for_state.lock().unwrap();
                let startup_silence_ms = startup_settings.startup_silence_ms;
                drop(startup_settings);

                let sample_rate = audio_info.sample_rate as u32;
                let channels = audio_info.channels as u16;

                if startup_silence_ms > 0.0 {
                    let samples = ((startup_silence_ms / 1000.0) * sample_rate as f32).ceil()
                        as usize
                        * channels as usize;
                    let silence = vec![0.0_f32; samples.max(1)];
                    let silence_buffer = SamplesBuffer::new(channels, sample_rate, silence);
                    let sink = sink_mutex.lock().unwrap();
                    sink.append(silence_buffer);
                    drop(sink);
                }
            }

            // `true` until the first resume, which uses the startup fade length.
            let startup_fade_pending = Cell::new(true);
            // Applies abort and pause/resume requests to the sink. Returns
            // `false` when the thread should stop (abort raised), `true` otherwise.
            let check_details = || {
                if abort.load(Ordering::SeqCst) {
                    let sink = sink_mutex.lock().unwrap();
                    pause_sink(&sink, 0.1);
                    sink.clear();
                    drop(sink);

                    return false;
                }

                let sink = sink_mutex.lock().unwrap();
                let state = play_state.lock().unwrap().clone();
                let start_sink_chunks = buffer_settings_for_state.lock().unwrap().start_sink_chunks;
                // Hold back a requested resume until enough chunks are queued.
                if state == PlayerState::Resuming
                    && start_sink_chunks > 0
                    && sink.len() < start_sink_chunks
                {
                    sink.pause();
                    drop(sink);
                    return true;
                }
                if state == PlayerState::Pausing {
                    pause_sink(&sink, 0.1);
                    play_state.lock().unwrap().clone_from(&PlayerState::Paused);
                }
                if state == PlayerState::Resuming {
                    let fade_length = if startup_fade_pending.replace(false) {
                        let startup_fade_ms =
                            buffer_settings_for_state.lock().unwrap().startup_fade_ms;
                        (startup_fade_ms / 1000.0).max(0.0)
                    } else {
                        0.1
                    };
                    resume_sink(&sink, fade_length);
                    play_state.lock().unwrap().clone_from(&PlayerState::Playing);
                }
                drop(sink);

                true
            };

            // Total length of the chunks already counted as played, offset by
            // the start position.
            let time_chunks_mutex = Arc::new(Mutex::new(start_time));
            // Measures the time elapsed since the last chunk was counted as played.
            let timer_mut = Arc::new(Mutex::new(timer::Timer::new()));
            let buffering_done = Arc::new(AtomicBool::new(false));
            let buffering_done_flag = buffer_done_thread_flag.clone();
            let final_duration = Arc::new(Mutex::new(None::<f64>));
            let mut timer = timer_mut.lock().unwrap();
            timer.start();
            drop(timer);

            let last_meter_time = Cell::new(0.0_f64);
            // Recomputes the playback position from the number of chunks that
            // have left the sink plus the timer, publishes it to the shared
            // timestamp, and advances the output meter by the elapsed time.
            let update_chunk_lengths = || {
                if abort.load(Ordering::SeqCst) {
                    return;
                }

                let mut chunk_lengths = chunk_lengths.lock().unwrap();
                let mut time_passed_unlocked = time_passed.lock().unwrap();
                let mut time_chunks_passed = time_chunks_mutex.lock().unwrap();
                let mut timer = timer_mut.lock().unwrap();
                last_time_update_ms.store(now_ms(), Ordering::Relaxed);
                let sink = sink_mutex.lock().unwrap();
                if !buffering_done.load(Ordering::Relaxed) {
                    // Chunks that were recorded but are no longer queued in
                    // the sink are treated as played.
                    let chunks_played = chunk_lengths.len().saturating_sub(sink.len());

                    for _ in 0..chunks_played {
                        timer.reset();
                        timer.start();
                        *time_chunks_passed += chunk_lengths.remove(0);
                    }
                }

                // Keep the timer in step with the sink's pause state.
                if sink.is_paused() {
                    timer.pause();
                } else {
                    timer.un_pause();
                }

                let current_audio_time = *time_chunks_passed + timer.get_time().as_secs_f64();
                let delta = (current_audio_time - last_meter_time.get()).max(0.0);
                last_meter_time.set(current_audio_time);
                {
                    let mut meter = output_meter.lock().unwrap();
                    meter.advance(delta);
                }

                *time_passed_unlocked = current_audio_time;

                drop(sink);
                drop(chunk_lengths);
                drop(time_passed_unlocked);
                drop(time_chunks_passed);
                drop(timer);
            };

            // (time of last append, smoothed delay ms, late-append count, max delay ms)
            let append_timing = Arc::new(Mutex::new((Instant::now(), 0.0_f64, 0_u64, 0.0_f64)));
            // Appends one engine chunk to the sink, after waiting for room when
            // a sink chunk limit is configured, and records timing metrics.
            let update_sink = |(mixer, length_in_seconds): (SamplesBuffer, f64)| {
                // A newer playback run has started; drop this chunk.
                if playback_id_atomic.load(Ordering::SeqCst) != playback_id {
                    return;
                }
                let max_sink_chunks = {
                    let settings = buffer_settings_for_state.lock().unwrap();
                    settings.max_sink_chunks
                };
                if max_sink_chunks > 0 {
                    // Wait until the sink has room, still servicing position
                    // updates and pause/resume/abort requests while waiting.
                    loop {
                        if abort.load(Ordering::SeqCst) {
                            return;
                        }
                        if playback_id_atomic.load(Ordering::SeqCst) != playback_id {
                            return;
                        }
                        let sink_len = {
                            let sink = sink_mutex.lock().unwrap();
                            sink.len()
                        };
                        if sink_len < max_sink_chunks {
                            break;
                        }
                        update_chunk_lengths();
                        if !check_details() {
                            return;
                        }
                        thread::sleep(Duration::from_millis(5));
                    }
                }
                // An append counts as late when the gap since the previous
                // append exceeds 1.2 times the chunk's own length.
                let (delay_ms, late) = {
                    let mut timing = append_timing.lock().unwrap();
                    let now = Instant::now();
                    let delta_ms = now.duration_since(timing.0).as_secs_f64() * 1000.0;
                    let chunk_ms = length_in_seconds * 1000.0;
                    let late = delta_ms > (chunk_ms * 1.2) && chunk_ms > 0.0;
                    if late {
                        timing.2 = timing.2.saturating_add(1);
                    }
                    timing.1 = if timing.1 == 0.0 {
                        delta_ms
                    } else {
                        (timing.1 * 0.9) + (delta_ms * 0.1)
                    };
                    timing.3 = timing.3.max(delta_ms);
                    timing.0 = now;
                    (delta_ms, late)
                };
                audio_heard.store(true, Ordering::Relaxed);
                last_chunk_ms.store(now_ms(), Ordering::Relaxed);

                {
                    let mut meter = output_meter.lock().unwrap();
                    meter.push_samples(&mixer);
                }
                {
                    let mut metrics = dsp_metrics_for_sink.lock().unwrap();
                    metrics.append_delay_ms = delay_ms;
                    metrics.avg_append_delay_ms = {
                        if metrics.avg_append_delay_ms == 0.0 {
                            delay_ms
                        } else {
                            (metrics.avg_append_delay_ms * 0.9) + (delay_ms * 0.1)
                        }
                    };
                    metrics.max_append_delay_ms = metrics.max_append_delay_ms.max(delay_ms);
                    metrics.late_append_count = {
                        let timing = append_timing.lock().unwrap();
                        timing.2
                    };
                    metrics.late_append_active = late;
                }

                let sink = sink_mutex.lock().unwrap();
                let append_jitter_log_ms = {
                    let settings = buffer_settings_for_state.lock().unwrap();
                    settings.append_jitter_log_ms
                };
                if append_jitter_log_ms > 0.0 && (late || delay_ms > append_jitter_log_ms as f64) {
                    let expected_ms = length_in_seconds * 1000.0;
                    log::info!(
                        "append jitter: delta={:.2}ms expected={:.2}ms late={} threshold={:.2}ms sink_len={}",
                        delay_ms,
                        expected_ms,
                        late,
                        append_jitter_log_ms,
                        sink.len()
                    );
                }
                let mut chunk_lengths = chunk_lengths.lock().unwrap();

                sink.append(mixer);

                drop(sink);

                chunk_lengths.push(length_in_seconds);
                drop(chunk_lengths);

                update_chunk_lengths();
                check_details();
            };

            // Reception loop: append chunks as they arrive; on a 20 ms timeout
            // just refresh the position and service state requests.
            let receiver = engine.start_receiver();
            loop {
                match receiver.recv_timeout(Duration::from_millis(20)) {
                    Ok(chunk) => {
                        update_sink(chunk);
                    }
                    Err(RecvTimeoutError::Timeout) => {
                        update_chunk_lengths();
                        if !check_details() {
                            break;
                        }
                    }
                    Err(RecvTimeoutError::Disconnected) => break,
                }
            }
            #[cfg(feature = "debug")]
            log::info!("engine reception loop finished");

            buffering_done.store(true, Ordering::Relaxed);
            buffering_done_flag.store(true, Ordering::Relaxed);
            {
                // Position at which playback is complete: everything already
                // counted as played plus the chunks still outstanding.
                let mut final_duration = final_duration.lock().unwrap();
                if final_duration.is_none() {
                    let chunk_lengths = chunk_lengths.lock().unwrap();
                    let time_chunks_passed = time_chunks_mutex.lock().unwrap();
                    *final_duration = Some(*time_chunks_passed + chunk_lengths.iter().sum::<f64>());
                }
            }

            #[cfg(feature = "debug")]
            {
                let sink = sink_mutex.lock().unwrap();
                let paused = sink.is_paused();
                let empty = sink.empty();
                let sink_len = sink.len();
                drop(sink);
                let time_passed = *time_passed.lock().unwrap();
                let final_duration = *final_duration.lock().unwrap();
                log::info!(
                    "Starting drain loop: paused={} empty={} sink_len={} time={:.3} final={:?}",
                    paused,
                    empty,
                    sink_len,
                    time_passed,
                    final_duration
                );
            }

            // Drain loop: keep the position and state serviced until the
            // position reaches the final duration (within 1 ms) or the thread
            // is aborted.
            loop {
                update_chunk_lengths();
                if !check_details() {
                    break;
                }

                let done = if engine.finished_buffering() {
                    if let Some(final_duration) = *final_duration.lock().unwrap() {
                        let time_passed = *time_passed.lock().unwrap();
                        time_passed >= (final_duration - 0.001).max(0.0)
                    } else {
                        false
                    }
                } else {
                    false
                };
                if done {
                    break;
                }

                thread::sleep(Duration::from_millis(10));
            }

            #[cfg(feature = "debug")]
            log::info!("Finished drain loop!");

            // Mark the thread as finished before the closure's remaining
            // locals are dropped.
            drop(_thread_guard);
        });
    }
904
905 pub fn play_at(&mut self, ts: f64) {
907 let mut timestamp = self.ts.lock().unwrap();
908 *timestamp = ts;
909 drop(timestamp);
910
911 self.request_effects_reset();
912 self.kill_current();
913 self.initialize_thread(Some(ts));
915
916 self.resume();
917
918 self.wait_for_audio_heard(Duration::from_secs(5));
919 }
920
921 pub fn play(&mut self) {
923 info!("Playing audio");
924 let thread_exists = self.playback_thread_exists.load(Ordering::SeqCst);
925 if !thread_exists {
928 self.initialize_thread(None);
929 }
930
931 self.resume();
932
933 self.wait_for_audio_heard(Duration::from_secs(5));
934 }
935
936 pub fn pause(&self) {
938 self.state.lock().unwrap().clone_from(&PlayerState::Pausing);
939 }
940
941 pub fn resume(&self) {
943 self.state
944 .lock()
945 .unwrap()
946 .clone_from(&PlayerState::Resuming);
947 }
948
949 pub fn kill_current(&self) {
951 self.state
952 .lock()
953 .unwrap()
954 .clone_from(&PlayerState::Stopping);
955 {
956 let sink = self.sink.lock().unwrap();
957 sink.stop();
958 }
959 self.abort.store(true, Ordering::SeqCst);
960
961 while !self.thread_finished() {
962 thread::sleep(Duration::from_millis(10));
963 }
964
965 self.state.lock().unwrap().clone_from(&PlayerState::Stopped);
966 }
967
968 pub fn stop(&self) {
970 self.kill_current();
971 self.ts.lock().unwrap().clone_from(&0.0);
972 }
973
974 pub fn is_playing(&self) -> bool {
976 let state = self.state.lock().unwrap();
977 *state == PlayerState::Playing
978 }
979
980 pub fn is_paused(&self) -> bool {
982 let state = self.state.lock().unwrap();
983 *state == PlayerState::Paused
984 }
985
986 pub fn get_time(&self) -> f64 {
988 let ts = self.ts.lock().unwrap();
989 *ts
990 }
991
992 fn thread_finished(&self) -> bool {
993 let playback_thread_exists = self.playback_thread_exists.load(Ordering::SeqCst);
994 !playback_thread_exists
995 }
996
997 pub fn is_finished(&self) -> bool {
999 self.thread_finished()
1000 }
1003
1004 pub fn sleep_until_end(&self) {
1006 loop {
1007 if self.thread_finished() {
1008 break;
1009 }
1010 thread::sleep(Duration::from_millis(100));
1011 }
1012 }
1013
1014 pub fn get_duration(&self) -> f64 {
1016 let duration = self.duration.lock().unwrap();
1017 *duration
1018 }
1019
1020 pub fn seek(&mut self, ts: f64) {
1022 let mut timestamp = self.ts.lock().unwrap();
1023 *timestamp = ts;
1024 drop(timestamp);
1025
1026 self.request_effects_reset();
1027 let state = self.state.lock().unwrap().clone();
1028
1029 self.kill_current();
1030 self.state.lock().unwrap().clone_from(&state);
1031 self.initialize_thread(Some(ts));
1032
1033 if state == PlayerState::Playing {
1034 self.resume();
1035 }
1036 }
1037
1038 pub fn refresh_tracks(&mut self) {
1040 let mut prot = self.prot.lock().unwrap();
1041 prot.refresh_tracks();
1042 if let Some(spec) = self.impulse_response_override.clone() {
1043 prot.set_impulse_response_spec(spec);
1044 }
1045 if let Some(tail_db) = self.impulse_response_tail_override {
1046 prot.set_impulse_response_tail_db(tail_db);
1047 }
1048 drop(prot);
1049
1050 self.request_effects_reset();
1051 if self.thread_finished() {
1053 return;
1054 }
1055
1056 let ts = self.get_time();
1059 self.seek(ts);
1060
1061 if self.is_playing() {
1063 self.resume();
1064 }
1065
1066 self.wait_for_audio_heard(Duration::from_secs(5));
1067 }
1068
1069 fn wait_for_audio_heard(&self, timeout: Duration) -> bool {
1070 let start = Instant::now();
1071 loop {
1072 if self.audio_heard.load(Ordering::Relaxed) {
1073 return true;
1074 }
1075 if self.thread_finished() {
1076 warn!("playback thread ended before audio was heard");
1077 return false;
1078 }
1079 if start.elapsed() >= timeout {
1080 warn!("timed out waiting for audio to start");
1081 return false;
1082 }
1083 thread::sleep(Duration::from_millis(10));
1084 }
1085 }
1086
1087 pub fn shuffle(&mut self) {
1089 self.refresh_tracks();
1090 }
1091
1092 pub fn set_volume(&mut self, new_volume: f32) {
1094 let sink = self.sink.lock().unwrap();
1095 sink.set_volume(new_volume);
1096 drop(sink);
1097
1098 let mut volume = self.volume.lock().unwrap();
1099 *volume = new_volume;
1100 drop(volume);
1101 }
1102
1103 pub fn get_volume(&self) -> f32 {
1105 *self.volume.lock().unwrap()
1106 }
1107
1108 pub fn get_ids(&self) -> Vec<String> {
1110 let prot = self.prot.lock().unwrap();
1111
1112 return prot.get_ids();
1113 }
1114
1115 pub fn get_shuffle_schedule(&self) -> Vec<(f64, Vec<String>)> {
1119 let prot = self.prot.lock().unwrap();
1120 prot.get_shuffle_schedule()
1121 }
1122
1123 pub fn set_reporting(
1125 &mut self,
1126 reporting: Arc<Mutex<dyn Fn(Report) + Send>>,
1127 reporting_interval: Duration,
1128 ) {
1129 if self.reporter.is_some() {
1130 self.reporter.as_ref().unwrap().lock().unwrap().stop();
1131 }
1132
1133 let reporter = Arc::new(Mutex::new(Reporter::new(
1134 Arc::new(Mutex::new(self.clone())),
1135 reporting,
1136 reporting_interval,
1137 )));
1138
1139 reporter.lock().unwrap().start();
1140
1141 self.reporter = Some(reporter);
1142 }
1143}
1144
/// Converts a linear amplitude to decibels relative to full scale.
///
/// Zero and negative inputs map to negative infinity; positive inputs map to
/// `20 * log10(value)`.
fn linear_to_dbfs(value: f32) -> f32 {
    if value <= 0.0 {
        return f32::NEG_INFINITY;
    }
    value.log10() * 20.0
}
1152
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    use std::time::SystemTime;
    match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}