1use rodio::buffer::SamplesBuffer;
4use rodio::{OutputStreamBuilder, Sink};
5use std::cell::Cell;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::{mpsc::RecvTimeoutError, Arc, Mutex};
8use std::thread;
9use std::time::{Duration, Instant};
10
11use log::{error, info, warn};
12
13use crate::container::prot::{PathsTrack, Prot};
14use crate::diagnostics::reporter::{Report, Reporter};
15use crate::dsp::effects::convolution_reverb::{parse_impulse_response_string, ImpulseResponseSpec};
16use crate::playback::output_meter::OutputMeter;
17use crate::tools::timer;
18use crate::{
19 container::info::Info,
20 dsp::effects::AudioEffect,
21 playback::engine::{DspChainMetrics, PlaybackBufferSettings, PlayerEngine},
22};
23
/// Playback lifecycle state shared between the `Player` API and its playback thread.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PlayerState {
    // NOTE(review): not assigned by any code visible in this file.
    Init,
    /// Requested by `Player::resume`; the playback thread fades the sink in and
    /// then switches the state to `Playing`.
    Resuming,
    /// Set by the playback thread once a resume has been carried out.
    Playing,
    /// Requested by `Player::pause`; the playback thread fades the sink out and
    /// then switches the state to `Paused`.
    Pausing,
    /// Set by the playback thread once a pause has been carried out.
    Paused,
    /// Set by `Player::kill_current` while it waits for the playback thread to exit.
    Stopping,
    /// Set by `Player::kill_current` after the playback thread has exited; also the
    /// initial state of a newly constructed player.
    Stopped,
    // NOTE(review): not assigned by any code visible in this file.
    Finished,
}
36
/// Point-in-time copy of the reverb parameters returned by `Player::get_reverb_settings`.
#[derive(Debug, Clone, Copy)]
pub struct ReverbSettingsSnapshot {
    /// Whether the reverb effect that was found is enabled (`false` when none was found).
    pub enabled: bool,
    /// Dry/wet mix of the reverb effect that was found (`0.0` when none was found).
    pub dry_wet: f32,
}
43
// Refresh rate passed to `OutputMeter::new` when a player is constructed.
const OUTPUT_METER_REFRESH_HZ: f32 = 30.0;
// Maximum number of attempts to open the default output stream in the playback thread.
const OUTPUT_STREAM_OPEN_RETRIES: usize = 20;
// Sleep between failed attempts to open the default output stream, in milliseconds.
const OUTPUT_STREAM_OPEN_RETRY_MS: u64 = 100;
47
/// Scope guard that mirrors the lifetime of a playback thread into a shared flag.
///
/// The flag is set to `true` when the guard is created and back to `false` when
/// the guard is dropped, including on early returns and panics inside the thread.
struct PlaybackThreadGuard {
    exists: Arc<AtomicBool>,
}

impl PlaybackThreadGuard {
    /// Marks the thread as alive and returns the guard that will clear the flag.
    fn new(exists: Arc<AtomicBool>) -> Self {
        // SeqCst matches every other load/store of this flag in the file.
        exists.store(true, Ordering::SeqCst);
        Self { exists }
    }
}

impl Drop for PlaybackThreadGuard {
    /// Marks the thread as gone.
    fn drop(&mut self) {
        // SeqCst matches the loads performed by code that polls for thread exit.
        self.exists.store(false, Ordering::SeqCst);
    }
}
64
/// Audio player handle. All runtime state lives behind `Arc`s, so clones share
/// the same state (except the `abort` handle, which `initialize_thread` replaces
/// on the instance it is called on).
#[derive(Clone)]
pub struct Player {
    /// Audio info for the loaded input; `channels` and `sample_rate` are read from it.
    pub info: Info,
    /// Cleared at the start of every playback run.
    pub finished_tracks: Arc<Mutex<Vec<i32>>>,
    /// Current playback position; written by the playback thread and by
    /// `seek`, `play_at` and `stop`.
    pub ts: Arc<Mutex<f64>>,
    /// Requested/actual playback state, polled by the playback thread.
    state: Arc<Mutex<PlayerState>>,
    /// Set by `kill_current` to ask the current playback thread to stop.
    abort: Arc<AtomicBool>,
    /// `true` while a playback thread is alive (maintained by `PlaybackThreadGuard`).
    playback_thread_exists: Arc<AtomicBool>,
    /// Incremented for every playback run; a thread drops chunks once the value
    /// no longer matches the id it was started with.
    playback_id: Arc<AtomicU64>,
    /// Duration reported by the engine for the current run.
    duration: Arc<Mutex<f64>>,
    /// Container handle passed to the engine.
    prot: Arc<Mutex<Prot>>,
    /// Set once the playback thread has appended its first chunk to the sink.
    audio_heard: Arc<AtomicBool>,
    /// Target output volume (initialised to `0.8`).
    volume: Arc<Mutex<f32>>,
    /// Output sink; replaced by each playback thread once the output stream is open.
    sink: Arc<Mutex<Sink>>,
    /// Optional periodic reporter installed by `set_reporting`.
    reporter: Option<Arc<Mutex<Reporter>>>,
    /// Buffering and fade tuning shared with the engine and the playback thread.
    buffer_settings: Arc<Mutex<PlaybackBufferSettings>>,
    /// Active effect chain, shared with the engine.
    effects: Arc<Mutex<Vec<AudioEffect>>>,
    /// DSP and append-timing metrics, shared with the engine.
    dsp_metrics: Arc<Mutex<DspChainMetrics>>,
    /// Counter bumped by `request_effects_reset`; shared with the engine.
    effects_reset: Arc<AtomicU64>,
    /// Output level meter fed with every chunk appended to the sink.
    output_meter: Arc<Mutex<OutputMeter>>,
    /// Set once the playback thread's chunk-receiving loop has ended.
    buffering_done: Arc<AtomicBool>,
    /// Wall-clock time (ms since the Unix epoch) of the last chunk append.
    last_chunk_ms: Arc<AtomicU64>,
    /// Wall-clock time (ms since the Unix epoch) of the last playback-time update.
    last_time_update_ms: Arc<AtomicU64>,
    /// One-shot fade-in length (ms) taken by the playback thread's first resume.
    next_resume_fade_ms: Arc<Mutex<Option<f32>>>,
    /// Impulse response spec re-applied after `refresh_tracks`.
    impulse_response_override: Option<ImpulseResponseSpec>,
    /// Impulse response tail value re-applied after `refresh_tracks`.
    impulse_response_tail_override: Option<f32>,
}
96
97impl Player {
98 pub fn new(file_path: &String) -> Self {
100 let this = Self::new_from_path_or_paths(Some(file_path), None);
101 this
102 }
103
104 pub fn new_from_file_paths(file_paths: Vec<PathsTrack>) -> Self {
106 let this = Self::new_from_path_or_paths(None, Some(file_paths));
107 this
108 }
109
110 pub fn new_from_file_paths_legacy(file_paths: Vec<Vec<String>>) -> Self {
112 let this = Self::new_from_path_or_paths(
113 None,
114 Some(
115 file_paths
116 .into_iter()
117 .map(|paths| PathsTrack::new_from_file_paths(paths))
118 .collect(),
119 ),
120 );
121 this
122 }
123
124 pub fn new_from_path_or_paths(path: Option<&String>, paths: Option<Vec<PathsTrack>>) -> Self {
126 let (prot, info) = match path {
127 Some(path) => {
128 let prot = Arc::new(Mutex::new(Prot::new(path)));
129 let info = Info::new(path.clone());
130 (prot, info)
131 }
132 None => {
133 let prot = Arc::new(Mutex::new(Prot::new_from_file_paths(paths.unwrap())));
134 let locked_prot = prot.lock().unwrap();
135 let info = Info::new_from_file_paths(locked_prot.get_file_paths_dictionary());
136 drop(locked_prot);
137 (prot, info)
138 }
139 };
140
141 let (sink, _queue) = Sink::new();
142 let sink: Arc<Mutex<Sink>> = Arc::new(Mutex::new(sink));
143
144 let channels = info.channels as usize;
145 let sample_rate = info.sample_rate;
146 let effects = {
147 let prot_locked = prot.lock().unwrap();
148 match prot_locked.get_effects() {
149 Some(effects) => Arc::new(Mutex::new(effects)),
150 None => Arc::new(Mutex::new(vec![])),
151 }
152 };
153
154 let mut this = Self {
155 info,
156 finished_tracks: Arc::new(Mutex::new(Vec::new())),
157 state: Arc::new(Mutex::new(PlayerState::Stopped)),
158 abort: Arc::new(AtomicBool::new(false)),
159 ts: Arc::new(Mutex::new(0.0)),
160 playback_thread_exists: Arc::new(AtomicBool::new(true)),
161 playback_id: Arc::new(AtomicU64::new(0)),
162 duration: Arc::new(Mutex::new(0.0)),
163 audio_heard: Arc::new(AtomicBool::new(false)),
164 volume: Arc::new(Mutex::new(0.8)),
165 sink,
166 prot,
167 reporter: None,
168 buffer_settings: Arc::new(Mutex::new(PlaybackBufferSettings::new(20.0))),
169 effects,
170 dsp_metrics: Arc::new(Mutex::new(DspChainMetrics::default())),
171 effects_reset: Arc::new(AtomicU64::new(0)),
172 output_meter: Arc::new(Mutex::new(OutputMeter::new(
173 channels,
174 sample_rate,
175 OUTPUT_METER_REFRESH_HZ,
176 ))),
177 buffering_done: Arc::new(AtomicBool::new(false)),
178 last_chunk_ms: Arc::new(AtomicU64::new(0)),
179 last_time_update_ms: Arc::new(AtomicU64::new(0)),
180 next_resume_fade_ms: Arc::new(Mutex::new(None)),
181 impulse_response_override: None,
182 impulse_response_tail_override: None,
183 };
184
185 this.initialize_thread(None);
186
187 this
188 }
189
190 pub fn set_impulse_response_spec(&mut self, spec: ImpulseResponseSpec) {
192 self.impulse_response_override = Some(spec.clone());
193 let mut prot = self.prot.lock().unwrap();
194 prot.set_impulse_response_spec(spec);
195 self.request_effects_reset();
196 }
197
198 pub fn set_impulse_response_from_string(&mut self, value: &str) {
200 if let Some(spec) = parse_impulse_response_string(value) {
201 self.set_impulse_response_spec(spec);
202 }
203 }
204
205 pub fn set_impulse_response_tail_db(&mut self, tail_db: f32) {
207 self.impulse_response_tail_override = Some(tail_db);
208 let mut prot = self.prot.lock().unwrap();
209 prot.set_impulse_response_tail_db(tail_db);
210 self.request_effects_reset();
211 }
212
213 pub fn set_reverb_enabled(&self, enabled: bool) {
215 let mut effects = self.effects.lock().unwrap();
216 if let Some(effect) = effects
217 .iter_mut()
218 .find_map(|effect| effect.as_convolution_reverb_mut())
219 {
220 effect.enabled = enabled;
221 }
222 if let Some(effect) = effects
223 .iter_mut()
224 .find_map(|effect| effect.as_delay_reverb_mut())
225 {
226 effect.enabled = enabled;
227 }
228 }
229
230 pub fn set_reverb_mix(&self, dry_wet: f32) {
232 let mut effects = self.effects.lock().unwrap();
233 if let Some(effect) = effects
234 .iter_mut()
235 .find_map(|effect| effect.as_convolution_reverb_mut())
236 {
237 effect.dry_wet = dry_wet.clamp(0.0, 1.0);
238 }
239 if let Some(effect) = effects
240 .iter_mut()
241 .find_map(|effect| effect.as_delay_reverb_mut())
242 {
243 effect.mix = dry_wet.clamp(0.0, 1.0);
244 }
245 if let Some(effect) = effects
246 .iter_mut()
247 .find_map(|effect| effect.as_diffusion_reverb_mut())
248 {
249 effect.mix = dry_wet.clamp(0.0, 1.0);
250 }
251 }
252
253 pub fn get_reverb_settings(&self) -> ReverbSettingsSnapshot {
255 let effects = self.effects.lock().unwrap();
256 if let Some(effect) = effects
257 .iter()
258 .find_map(|effect| effect.as_convolution_reverb())
259 {
260 return ReverbSettingsSnapshot {
261 enabled: effect.enabled,
262 dry_wet: effect.dry_wet,
263 };
264 }
265 if let Some(effect) = effects
266 .iter()
267 .find_map(|effect| effect.as_diffusion_reverb())
268 {
269 return ReverbSettingsSnapshot {
270 enabled: effect.enabled,
271 dry_wet: effect.mix,
272 };
273 }
274 if let Some(effect) = effects.iter().find_map(|effect| effect.as_delay_reverb()) {
275 return ReverbSettingsSnapshot {
276 enabled: effect.enabled,
277 dry_wet: effect.mix,
278 };
279 }
280 ReverbSettingsSnapshot {
281 enabled: false,
282 dry_wet: 0.0,
283 }
284 }
285
286 #[allow(deprecated)]
288 pub fn get_effect_names(&self) -> Vec<String> {
289 let effects = self.effects.lock().unwrap();
290 effects
291 .iter()
292 .map(|effect| match effect {
293 AudioEffect::DelayReverb(_) => "DelayReverb".to_string(),
294 AudioEffect::BasicReverb(_) => "DelayReverb".to_string(),
295 AudioEffect::DiffusionReverb(_) => "DiffusionReverb".to_string(),
296 AudioEffect::ConvolutionReverb(_) => "ConvolutionReverb".to_string(),
297 AudioEffect::LowPassFilter(_) => "LowPassFilter".to_string(),
298 AudioEffect::HighPassFilter(_) => "HighPassFilter".to_string(),
299 AudioEffect::Distortion(_) => "Distortion".to_string(),
300 AudioEffect::Gain(_) => "Gain".to_string(),
301 AudioEffect::Compressor(_) => "Compressor".to_string(),
302 AudioEffect::Limiter(_) => "Limiter".to_string(),
303 })
304 .collect()
305 }
306
307 pub fn set_effects(&mut self, effects: Vec<AudioEffect>) {
309 {
310 let mut guard = self.effects.lock().unwrap();
311 println!("New Effects: {:?}", effects);
312 *guard = effects;
313 }
314 self.request_effects_reset();
315
316 if !self.thread_finished() {
319 let ts = self.get_time();
320 self.seek(ts);
321 }
322 }
323
324 pub fn get_dsp_metrics(&self) -> DspChainMetrics {
326 *self.dsp_metrics.lock().unwrap()
327 }
328
329 pub fn get_levels(&self) -> Vec<f32> {
331 self.output_meter.lock().unwrap().levels()
332 }
333
334 pub fn get_levels_db(&self) -> Vec<f32> {
336 self.output_meter
337 .lock()
338 .unwrap()
339 .levels()
340 .into_iter()
341 .map(linear_to_dbfs)
342 .collect()
343 }
344
345 pub fn get_levels_avg(&self) -> Vec<f32> {
347 self.output_meter.lock().unwrap().averages()
348 }
349
350 pub fn set_output_meter_refresh_hz(&self, hz: f32) {
352 self.output_meter.lock().unwrap().set_refresh_hz(hz);
353 }
354
355 pub fn debug_playback_state(&self) -> (bool, PlayerState, bool) {
357 (
358 self.playback_thread_exists.load(Ordering::SeqCst),
359 *self.state.lock().unwrap(),
360 self.audio_heard.load(Ordering::Relaxed),
361 )
362 }
363
364 pub fn debug_buffering_done(&self) -> bool {
366 self.buffering_done.load(Ordering::Relaxed)
367 }
368
369 pub fn debug_timing_ms(&self) -> (u64, u64) {
371 (
372 self.last_chunk_ms.load(Ordering::Relaxed),
373 self.last_time_update_ms.load(Ordering::Relaxed),
374 )
375 }
376
377 pub fn debug_sink_state(&self) -> (bool, bool, usize) {
379 let sink = self.sink.lock().unwrap();
380 let paused = sink.is_paused();
381 let empty = sink.empty();
382 let len = sink.len();
383 (paused, empty, len)
384 }
385
386 fn request_effects_reset(&self) {
387 self.effects_reset.fetch_add(1, Ordering::SeqCst);
388 }
389
390 pub fn set_start_buffer_ms(&self, start_buffer_ms: f32) {
392 let mut settings = self.buffer_settings.lock().unwrap();
393 settings.start_buffer_ms = start_buffer_ms.max(0.0);
394 }
395
396 pub fn set_track_eos_ms(&self, track_eos_ms: f32) {
398 let mut settings = self.buffer_settings.lock().unwrap();
399 settings.track_eos_ms = track_eos_ms.max(0.0);
400 }
401
402 pub fn set_start_sink_chunks(&self, chunks: usize) {
404 let mut settings = self.buffer_settings.lock().unwrap();
405 settings.start_sink_chunks = chunks;
406 }
407
408 pub fn set_max_sink_chunks(&self, chunks: usize) {
412 let mut settings = self.buffer_settings.lock().unwrap();
413 settings.max_sink_chunks = chunks;
414 }
415
416 pub fn set_startup_silence_ms(&self, ms: f32) {
418 let mut settings = self.buffer_settings.lock().unwrap();
419 settings.startup_silence_ms = ms.max(0.0);
420 }
421
422 pub fn set_startup_fade_ms(&self, ms: f32) {
424 let mut settings = self.buffer_settings.lock().unwrap();
425 settings.startup_fade_ms = ms.max(0.0);
426 }
427
428 pub fn set_seek_fade_out_ms(&self, ms: f32) {
430 let mut settings = self.buffer_settings.lock().unwrap();
431 settings.seek_fade_out_ms = ms.max(0.0);
432 }
433
434 pub fn set_seek_fade_in_ms(&self, ms: f32) {
436 let mut settings = self.buffer_settings.lock().unwrap();
437 settings.seek_fade_in_ms = ms.max(0.0);
438 }
439
440 pub fn set_append_jitter_log_ms(&self, ms: f32) {
442 let mut settings = self.buffer_settings.lock().unwrap();
443 settings.append_jitter_log_ms = ms.max(0.0);
444 }
445
446 pub fn set_effect_boundary_log(&self, enabled: bool) {
448 let mut settings = self.buffer_settings.lock().unwrap();
449 settings.effect_boundary_log = enabled;
450 }
451
    /// Resets per-run shared state and spawns the playback thread.
    ///
    /// `ts` is the start position handed to the engine (`None` starts at `0.0`).
    /// The spawned thread:
    /// 1. opens the default output stream, retrying up to
    ///    `OUTPUT_STREAM_OPEN_RETRIES` times,
    /// 2. replaces the shared sink with a paused one connected to that stream,
    /// 3. forwards chunks from the engine's receiver into the sink while
    ///    tracking the playback position,
    /// 4. once the receiver loop ends, keeps polling until the computed final
    ///    duration is reached or the run is aborted.
    ///
    /// Callers in this file either verify that no playback thread is alive or
    /// call `kill_current` before invoking this.
    fn initialize_thread(&mut self, ts: Option<f64>) {
        let mut finished_tracks = self.finished_tracks.lock().unwrap();
        finished_tracks.clear();
        drop(finished_tracks);

        // A brand-new abort flag is installed for this run; the thread spawned
        // below captures a clone of it.
        self.abort = Arc::new(AtomicBool::new(false));
        // Marked alive before spawning so pollers never see "no thread" in between.
        self.playback_thread_exists.store(true, Ordering::SeqCst);
        // Id of this run; `update_sink` compares it against the shared counter.
        let playback_id = self.playback_id.fetch_add(1, Ordering::SeqCst) + 1;
        self.buffering_done.store(false, Ordering::SeqCst);
        let now_ms_value = now_ms();
        self.last_chunk_ms.store(now_ms_value, Ordering::Relaxed);
        self.last_time_update_ms
            .store(now_ms_value, Ordering::Relaxed);

        // Clone every shared handle that the thread closure needs to own.
        let play_state = self.state.clone();
        let abort = self.abort.clone();
        let playback_thread_exists = self.playback_thread_exists.clone();
        let playback_id_atomic = self.playback_id.clone();
        let time_passed = self.ts.clone();

        let duration = self.duration.clone();
        let prot = self.prot.clone();
        let buffer_settings = self.buffer_settings.clone();
        let buffer_settings_for_state = self.buffer_settings.clone();
        let effects = self.effects.clone();
        let dsp_metrics = self.dsp_metrics.clone();
        let dsp_metrics_for_sink = self.dsp_metrics.clone();
        let effects_reset = self.effects_reset.clone();
        let output_meter = self.output_meter.clone();
        let audio_info = self.info.clone();
        let next_resume_fade_ms = self.next_resume_fade_ms.clone();

        let audio_heard = self.audio_heard.clone();
        let volume = self.volume.clone();
        let sink_mutex = self.sink.clone();
        let buffer_done_thread_flag = self.buffering_done.clone();
        let last_chunk_ms = self.last_chunk_ms.clone();
        let last_time_update_ms = self.last_time_update_ms.clone();

        audio_heard.store(false, Ordering::Relaxed);

        {
            let mut meter = self.output_meter.lock().unwrap();
            meter.reset();
        }

        thread::spawn(move || {
            // Clears `playback_thread_exists` on every exit path of this closure.
            let _thread_guard = PlaybackThreadGuard::new(playback_thread_exists.clone());

            let start_time = match ts {
                Some(ts) => ts,
                None => 0.0,
            };
            let mut engine = PlayerEngine::new(
                prot,
                Some(abort.clone()),
                start_time,
                buffer_settings,
                effects,
                dsp_metrics,
                effects_reset,
            );
            // Open the default output stream, sleeping between failed attempts.
            let mut stream = None;
            for attempt in 1..=OUTPUT_STREAM_OPEN_RETRIES {
                match OutputStreamBuilder::open_default_stream() {
                    Ok(s) => {
                        stream = Some(s);
                        break;
                    }
                    Err(err) => {
                        if attempt == OUTPUT_STREAM_OPEN_RETRIES {
                            error!(
                                "failed to open default output stream after {} attempts: {}",
                                OUTPUT_STREAM_OPEN_RETRIES, err
                            );
                            // Giving up: the guard drop marks the thread as finished.
                            return;
                        }
                        warn!(
                            "open_default_stream attempt {}/{} failed: {}",
                            attempt, OUTPUT_STREAM_OPEN_RETRIES, err
                        );
                        thread::sleep(Duration::from_millis(OUTPUT_STREAM_OPEN_RETRY_MS));
                    }
                }
            }
            let stream = stream.expect("stream should exist after successful retry loop");
            let mixer = stream.mixer().clone();

            // Swap in a sink connected to the new stream; it starts paused at the
            // configured volume.
            let mut sink = sink_mutex.lock().unwrap();
            *sink = Sink::connect_new(&mixer);
            sink.pause();
            sink.set_volume(*volume.lock().unwrap());
            drop(sink);

            let mut duration = duration.lock().unwrap();
            *duration = engine.get_duration();
            drop(duration);

            // Lengths (seconds) of chunks appended to the sink and not yet
            // accounted as played.
            let chunk_lengths = Arc::new(Mutex::new(Vec::new()));
            let mut time_passed_unlocked = time_passed.lock().unwrap();
            *time_passed_unlocked = start_time;
            drop(time_passed_unlocked);

            // Fades the sink volume down in 10 ms steps, then pauses it.
            // `timestamp` is read once, so the `timestamp != start_time` test is
            // constant for the whole fade: the fade is skipped entirely when the
            // position still equals `start_time`.
            let pause_sink = |sink: &Sink, fade_length_out_seconds: f32| {
                let timestamp = *time_passed.lock().unwrap();

                let fade_increments = sink.volume() / (fade_length_out_seconds * 100.0);
                while sink.volume() > 0.0 && timestamp != start_time {
                    sink.set_volume(sink.volume() - fade_increments);
                    thread::sleep(Duration::from_millis(10));
                }
                sink.pause();
            };

            // Starts the sink and ramps its volume up to the configured volume in
            // 5 ms steps; a non-positive fade length jumps straight to it.
            let resume_sink = |sink: &Sink, fade_length_in_seconds: f32| {
                let volume = *volume.lock().unwrap();
                if fade_length_in_seconds <= 0.0 {
                    sink.play();
                    sink.set_volume(volume);
                    return;
                }
                let fade_increments = (volume - sink.volume()) / (fade_length_in_seconds * 100.0);
                sink.play();
                while sink.volume() < volume {
                    sink.set_volume(sink.volume() + fade_increments);
                    thread::sleep(Duration::from_millis(5));
                }
            };

            {
                // Optionally queue a block of silence ahead of the first real chunk.
                let startup_settings = buffer_settings_for_state.lock().unwrap();
                let startup_silence_ms = startup_settings.startup_silence_ms;
                drop(startup_settings);

                let sample_rate = audio_info.sample_rate as u32;
                let channels = audio_info.channels as u16;

                if startup_silence_ms > 0.0 {
                    let samples = ((startup_silence_ms / 1000.0) * sample_rate as f32).ceil()
                        as usize
                        * channels as usize;
                    let silence = vec![0.0_f32; samples.max(1)];
                    let silence_buffer = SamplesBuffer::new(channels, sample_rate, silence);
                    let sink = sink_mutex.lock().unwrap();
                    sink.append(silence_buffer);
                    drop(sink);
                }
            }

            // True until the first resume handled by this thread; that resume uses
            // the one-shot/startup fade length instead of the fixed 0.1 s.
            let startup_fade_pending = Cell::new(true);
            // Applies abort and pause/resume requests. Returns `false` when the run
            // was aborted (the sink is then paused and cleared), `true` otherwise.
            let check_details = || {
                if abort.load(Ordering::SeqCst) {
                    let sink = sink_mutex.lock().unwrap();
                    pause_sink(&sink, 0.1);
                    sink.clear();
                    drop(sink);

                    return false;
                }

                let sink = sink_mutex.lock().unwrap();
                let state = play_state.lock().unwrap().clone();
                let start_sink_chunks = buffer_settings_for_state.lock().unwrap().start_sink_chunks;
                // Hold back a resume until enough chunks are queued in the sink.
                // NOTE(review): if fewer than `start_sink_chunks` chunks are ever
                // queued, this branch appears to keep the sink paused — confirm.
                if state == PlayerState::Resuming
                    && start_sink_chunks > 0
                    && sink.len() < start_sink_chunks
                {
                    sink.pause();
                    drop(sink);
                    return true;
                }
                if state == PlayerState::Pausing {
                    pause_sink(&sink, 0.1);
                    play_state.lock().unwrap().clone_from(&PlayerState::Paused);
                }
                if state == PlayerState::Resuming {
                    let fade_length = if startup_fade_pending.replace(false) {
                        // First resume: prefer the one-shot override (taken, so it
                        // is consumed), else the configured startup fade.
                        if let Some(ms) = next_resume_fade_ms.lock().unwrap().take() {
                            (ms / 1000.0).max(0.0)
                        } else {
                            let startup_fade_ms =
                                buffer_settings_for_state.lock().unwrap().startup_fade_ms;
                            (startup_fade_ms / 1000.0).max(0.0)
                        }
                    } else {
                        0.1
                    };
                    resume_sink(&sink, fade_length);
                    play_state.lock().unwrap().clone_from(&PlayerState::Playing);
                }
                drop(sink);

                true
            };

            // Playback position accounted for by fully played chunks.
            let time_chunks_mutex = Arc::new(Mutex::new(start_time));
            // Timer whose reading is added on top of `time_chunks_mutex`.
            let timer_mut = Arc::new(Mutex::new(timer::Timer::new()));
            // Thread-local "receive loop finished" flag; `buffering_done_flag` is
            // the copy shared with the `Player`.
            let buffering_done = Arc::new(AtomicBool::new(false));
            let buffering_done_flag = buffer_done_thread_flag.clone();
            let final_duration = Arc::new(Mutex::new(None::<f64>));
            let mut timer = timer_mut.lock().unwrap();
            timer.start();
            drop(timer);

            let last_meter_time = Cell::new(0.0_f64);
            // Recomputes the playback position, publishes it to `time_passed`, and
            // advances the output meter by the elapsed audio time. Does nothing
            // once the run has been aborted.
            let update_chunk_lengths = || {
                if abort.load(Ordering::SeqCst) {
                    return;
                }

                let mut chunk_lengths = chunk_lengths.lock().unwrap();
                let mut time_passed_unlocked = time_passed.lock().unwrap();
                let mut time_chunks_passed = time_chunks_mutex.lock().unwrap();
                let mut timer = timer_mut.lock().unwrap();
                last_time_update_ms.store(now_ms(), Ordering::Relaxed);
                let sink = sink_mutex.lock().unwrap();
                if !buffering_done.load(Ordering::Relaxed) {
                    // Tracked chunks the sink no longer reports as queued are
                    // treated as played.
                    let chunks_played = chunk_lengths.len().saturating_sub(sink.len());

                    for _ in 0..chunks_played {
                        timer.reset();
                        timer.start();
                        *time_chunks_passed += chunk_lengths.remove(0);
                    }
                }

                // Keep the timer's pause state in step with the sink's.
                if sink.is_paused() {
                    timer.pause();
                } else {
                    timer.un_pause();
                }

                let current_audio_time = *time_chunks_passed + timer.get_time().as_secs_f64();
                // Clamped so the meter is never advanced by a negative amount.
                let delta = (current_audio_time - last_meter_time.get()).max(0.0);
                last_meter_time.set(current_audio_time);
                {
                    let mut meter = output_meter.lock().unwrap();
                    meter.advance(delta);
                }

                *time_passed_unlocked = current_audio_time;

                drop(sink);
                drop(chunk_lengths);
                drop(time_passed_unlocked);
                drop(time_chunks_passed);
                drop(timer);
            };

            // Append bookkeeping: (time of last append, smoothed delta ms,
            // late-append count, max delta ms).
            let append_timing = Arc::new(Mutex::new((Instant::now(), 0.0_f64, 0_u64, 0.0_f64)));
            // Appends one chunk to the sink. The `mixer` parameter is the chunk's
            // sample buffer and shadows the stream mixer handle declared above.
            let update_sink = |(mixer, length_in_seconds): (SamplesBuffer, f64)| {
                // Drop the chunk if a newer playback run has been started.
                if playback_id_atomic.load(Ordering::SeqCst) != playback_id {
                    return;
                }
                let max_sink_chunks = {
                    let settings = buffer_settings_for_state.lock().unwrap();
                    settings.max_sink_chunks
                };
                if max_sink_chunks > 0 {
                    // Wait until the sink has room, while still updating the
                    // position and honouring abort/pause/resume requests.
                    loop {
                        if abort.load(Ordering::SeqCst) {
                            return;
                        }
                        if playback_id_atomic.load(Ordering::SeqCst) != playback_id {
                            return;
                        }
                        let sink_len = {
                            let sink = sink_mutex.lock().unwrap();
                            sink.len()
                        };
                        if sink_len < max_sink_chunks {
                            break;
                        }
                        update_chunk_lengths();
                        if !check_details() {
                            return;
                        }
                        thread::sleep(Duration::from_millis(5));
                    }
                }
                let (delay_ms, late) = {
                    let mut timing = append_timing.lock().unwrap();
                    let now = Instant::now();
                    let delta_ms = now.duration_since(timing.0).as_secs_f64() * 1000.0;
                    let chunk_ms = length_in_seconds * 1000.0;
                    // "Late" means the gap since the previous append exceeded
                    // 1.2x this chunk's duration.
                    let late = delta_ms > (chunk_ms * 1.2) && chunk_ms > 0.0;
                    if late {
                        timing.2 = timing.2.saturating_add(1);
                    }
                    // Smoothed average weighted 0.9 old / 0.1 new, seeded with the
                    // first sample.
                    timing.1 = if timing.1 == 0.0 {
                        delta_ms
                    } else {
                        (timing.1 * 0.9) + (delta_ms * 0.1)
                    };
                    timing.3 = timing.3.max(delta_ms);
                    timing.0 = now;
                    (delta_ms, late)
                };
                // Set just before the chunk is appended below;
                // `wait_for_audio_heard` polls this flag.
                audio_heard.store(true, Ordering::Relaxed);
                last_chunk_ms.store(now_ms(), Ordering::Relaxed);

                {
                    let mut meter = output_meter.lock().unwrap();
                    meter.push_samples(&mixer);
                }
                {
                    let mut metrics = dsp_metrics_for_sink.lock().unwrap();
                    metrics.append_delay_ms = delay_ms;
                    metrics.avg_append_delay_ms = {
                        if metrics.avg_append_delay_ms == 0.0 {
                            delay_ms
                        } else {
                            (metrics.avg_append_delay_ms * 0.9) + (delay_ms * 0.1)
                        }
                    };
                    metrics.max_append_delay_ms = metrics.max_append_delay_ms.max(delay_ms);
                    metrics.late_append_count = {
                        let timing = append_timing.lock().unwrap();
                        timing.2
                    };
                    metrics.late_append_active = late;
                }

                let sink = sink_mutex.lock().unwrap();
                let append_jitter_log_ms = {
                    let settings = buffer_settings_for_state.lock().unwrap();
                    settings.append_jitter_log_ms
                };
                if append_jitter_log_ms > 0.0 && (late || delay_ms > append_jitter_log_ms as f64) {
                    let expected_ms = length_in_seconds * 1000.0;
                    log::info!(
                        "append jitter: delta={:.2}ms expected={:.2}ms late={} threshold={:.2}ms sink_len={}",
                        delay_ms,
                        expected_ms,
                        late,
                        append_jitter_log_ms,
                        sink.len()
                    );
                }
                let mut chunk_lengths = chunk_lengths.lock().unwrap();

                sink.append(mixer);

                drop(sink);

                chunk_lengths.push(length_in_seconds);
                drop(chunk_lengths);

                update_chunk_lengths();
                check_details();
            };

            // Receive loop: forward chunks to the sink; on a 20 ms timeout just
            // refresh the position and state. Ends on abort or when the engine's
            // channel disconnects.
            let receiver = engine.start_receiver();
            loop {
                match receiver.recv_timeout(Duration::from_millis(20)) {
                    Ok(chunk) => {
                        update_sink(chunk);
                    }
                    Err(RecvTimeoutError::Timeout) => {
                        update_chunk_lengths();
                        if !check_details() {
                            break;
                        }
                    }
                    Err(RecvTimeoutError::Disconnected) => break,
                }
            }
            #[cfg(feature = "debug")]
            log::info!("engine reception loop finished");

            // From here on `update_chunk_lengths` no longer consumes
            // `chunk_lengths`; the position advances by the timer alone.
            buffering_done.store(true, Ordering::Relaxed);
            buffering_done_flag.store(true, Ordering::Relaxed);
            {
                // Final duration = time already played + everything still queued.
                let mut final_duration = final_duration.lock().unwrap();
                if final_duration.is_none() {
                    let chunk_lengths = chunk_lengths.lock().unwrap();
                    let time_chunks_passed = time_chunks_mutex.lock().unwrap();
                    *final_duration = Some(*time_chunks_passed + chunk_lengths.iter().sum::<f64>());
                }
            }

            #[cfg(feature = "debug")]
            {
                let sink = sink_mutex.lock().unwrap();
                let paused = sink.is_paused();
                let empty = sink.empty();
                let sink_len = sink.len();
                drop(sink);
                let time_passed = *time_passed.lock().unwrap();
                let final_duration = *final_duration.lock().unwrap();
                log::info!(
                    "Starting drain loop: paused={} empty={} sink_len={} time={:.3} final={:?}",
                    paused,
                    empty,
                    sink_len,
                    time_passed,
                    final_duration
                );
            }

            // Drain loop: keep updating the position until the run is aborted, or
            // the engine reports buffering finished and the position has reached
            // the final duration (with a 1 ms tolerance).
            loop {
                update_chunk_lengths();
                if !check_details() {
                    break;
                }

                let done = if engine.finished_buffering() {
                    if let Some(final_duration) = *final_duration.lock().unwrap() {
                        let time_passed = *time_passed.lock().unwrap();
                        time_passed >= (final_duration - 0.001).max(0.0)
                    } else {
                        false
                    }
                } else {
                    false
                };
                if done {
                    break;
                }

                thread::sleep(Duration::from_millis(10));
            }

            #[cfg(feature = "debug")]
            log::info!("Finished drain loop!");

            // Explicit drop: marks the playback thread as finished.
            drop(_thread_guard);
        });
    }
923
924 pub fn play_at(&mut self, ts: f64) {
926 let mut timestamp = self.ts.lock().unwrap();
927 *timestamp = ts;
928 drop(timestamp);
929
930 self.request_effects_reset();
931 self.kill_current();
932 self.initialize_thread(Some(ts));
934
935 self.resume();
936
937 self.wait_for_audio_heard(Duration::from_secs(5));
938 }
939
940 pub fn play(&mut self) {
942 info!("Playing audio");
943 let thread_exists = self.playback_thread_exists.load(Ordering::SeqCst);
944 if !thread_exists {
947 self.initialize_thread(None);
948 }
949
950 self.resume();
951
952 self.wait_for_audio_heard(Duration::from_secs(5));
953 }
954
955 pub fn pause(&self) {
957 self.state.lock().unwrap().clone_from(&PlayerState::Pausing);
958 }
959
960 pub fn resume(&self) {
962 self.state
963 .lock()
964 .unwrap()
965 .clone_from(&PlayerState::Resuming);
966 }
967
968 pub fn kill_current(&self) {
970 self.state
971 .lock()
972 .unwrap()
973 .clone_from(&PlayerState::Stopping);
974 {
975 let sink = self.sink.lock().unwrap();
976 sink.stop();
977 }
978 self.abort.store(true, Ordering::SeqCst);
979
980 while !self.thread_finished() {
981 thread::sleep(Duration::from_millis(10));
982 }
983
984 self.state.lock().unwrap().clone_from(&PlayerState::Stopped);
985 }
986
987 pub fn stop(&self) {
989 self.kill_current();
990 self.ts.lock().unwrap().clone_from(&0.0);
991 }
992
993 pub fn is_playing(&self) -> bool {
995 let state = self.state.lock().unwrap();
996 *state == PlayerState::Playing
997 }
998
999 pub fn is_paused(&self) -> bool {
1001 let state = self.state.lock().unwrap();
1002 *state == PlayerState::Paused
1003 }
1004
1005 pub fn get_time(&self) -> f64 {
1007 let ts = self.ts.lock().unwrap();
1008 *ts
1009 }
1010
1011 fn thread_finished(&self) -> bool {
1012 let playback_thread_exists = self.playback_thread_exists.load(Ordering::SeqCst);
1013 !playback_thread_exists
1014 }
1015
1016 pub fn is_finished(&self) -> bool {
1018 self.thread_finished()
1019 }
1022
1023 pub fn sleep_until_end(&self) {
1025 loop {
1026 if self.thread_finished() {
1027 break;
1028 }
1029 thread::sleep(Duration::from_millis(100));
1030 }
1031 }
1032
1033 pub fn get_duration(&self) -> f64 {
1035 let duration = self.duration.lock().unwrap();
1036 *duration
1037 }
1038
1039 pub fn seek(&mut self, ts: f64) {
1041 let mut timestamp = self.ts.lock().unwrap();
1042 *timestamp = ts;
1043 drop(timestamp);
1044
1045 let state = self.state.lock().unwrap().clone();
1046 let (seek_fade_out_ms, seek_fade_in_ms) = {
1047 let settings = self.buffer_settings.lock().unwrap();
1048 (settings.seek_fade_out_ms, settings.seek_fade_in_ms)
1049 };
1050 if matches!(state, PlayerState::Playing | PlayerState::Resuming) && seek_fade_out_ms > 0.0 {
1051 self.fade_current_sink_out(seek_fade_out_ms);
1052 }
1053 self.request_effects_reset();
1054
1055 self.kill_current();
1056 self.state.lock().unwrap().clone_from(&state);
1057 self.initialize_thread(Some(ts));
1058
1059 if matches!(state, PlayerState::Playing | PlayerState::Resuming) {
1060 *self.next_resume_fade_ms.lock().unwrap() = Some(seek_fade_in_ms);
1061 self.resume();
1062 }
1063 }
1064
1065 fn fade_current_sink_out(&self, fade_ms: f32) {
1067 let steps = ((fade_ms / 5.0).ceil() as u32).max(1);
1068 let step_ms = (fade_ms / steps as f32).max(1.0) as u64;
1069 let sink = self.sink.lock().unwrap();
1070 let start_volume = sink.volume().max(0.0);
1071 if start_volume <= 0.0 {
1072 return;
1073 }
1074 for step in 1..=steps {
1075 let t = step as f32 / steps as f32;
1076 let gain = start_volume * (1.0 - t);
1077 sink.set_volume(gain.max(0.0));
1078 thread::sleep(Duration::from_millis(step_ms));
1079 }
1080 }
1081
1082 pub fn refresh_tracks(&mut self) {
1084 let mut prot = self.prot.lock().unwrap();
1085 prot.refresh_tracks();
1086 if let Some(spec) = self.impulse_response_override.clone() {
1087 prot.set_impulse_response_spec(spec);
1088 }
1089 if let Some(tail_db) = self.impulse_response_tail_override {
1090 prot.set_impulse_response_tail_db(tail_db);
1091 }
1092 drop(prot);
1093
1094 self.request_effects_reset();
1095 if self.thread_finished() {
1097 return;
1098 }
1099
1100 let ts = self.get_time();
1103 self.seek(ts);
1104
1105 if self.is_playing() {
1107 self.resume();
1108 }
1109
1110 self.wait_for_audio_heard(Duration::from_secs(5));
1111 }
1112
1113 fn wait_for_audio_heard(&self, timeout: Duration) -> bool {
1114 let start = Instant::now();
1115 loop {
1116 if self.audio_heard.load(Ordering::Relaxed) {
1117 return true;
1118 }
1119 if self.thread_finished() {
1120 warn!("playback thread ended before audio was heard");
1121 return false;
1122 }
1123 if start.elapsed() >= timeout {
1124 warn!("timed out waiting for audio to start");
1125 return false;
1126 }
1127 thread::sleep(Duration::from_millis(10));
1128 }
1129 }
1130
1131 pub fn shuffle(&mut self) {
1133 self.refresh_tracks();
1134 }
1135
1136 pub fn set_volume(&mut self, new_volume: f32) {
1138 let sink = self.sink.lock().unwrap();
1139 sink.set_volume(new_volume);
1140 drop(sink);
1141
1142 let mut volume = self.volume.lock().unwrap();
1143 *volume = new_volume;
1144 drop(volume);
1145 }
1146
1147 pub fn get_volume(&self) -> f32 {
1149 *self.volume.lock().unwrap()
1150 }
1151
1152 pub fn get_ids(&self) -> Vec<String> {
1154 let prot = self.prot.lock().unwrap();
1155
1156 return prot.get_ids();
1157 }
1158
1159 pub fn get_shuffle_schedule(&self) -> Vec<(f64, Vec<String>)> {
1163 let prot = self.prot.lock().unwrap();
1164 prot.get_shuffle_schedule()
1165 }
1166
1167 pub fn set_reporting(
1169 &mut self,
1170 reporting: Arc<Mutex<dyn Fn(Report) + Send>>,
1171 reporting_interval: Duration,
1172 ) {
1173 if self.reporter.is_some() {
1174 self.reporter.as_ref().unwrap().lock().unwrap().stop();
1175 }
1176
1177 let reporter = Arc::new(Mutex::new(Reporter::new(
1178 Arc::new(Mutex::new(self.clone())),
1179 reporting,
1180 reporting_interval,
1181 )));
1182
1183 reporter.lock().unwrap().start();
1184
1185 self.reporter = Some(reporter);
1186 }
1187}
1188
/// Converts a linear amplitude to dBFS (`20 * log10(value)`).
///
/// Zero and negative inputs map to negative infinity; NaN propagates as NaN.
fn linear_to_dbfs(value: f32) -> f32 {
    match value {
        silent if silent <= 0.0 => f32::NEG_INFINITY,
        amplitude => 20.0 * amplitude.log10(),
    }
}
1196
/// Returns the wall-clock time in milliseconds since the Unix epoch, or `0` if
/// the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}