1use rodio::buffer::SamplesBuffer;
4use rodio::{OutputStreamBuilder, Sink};
5use std::cell::Cell;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::{mpsc::RecvTimeoutError, Arc, Mutex};
8use std::thread;
9use std::time::{Duration, Instant};
10
11use log::{info, warn};
12
13use crate::container::prot::Prot;
14use crate::diagnostics::reporter::{Report, Reporter};
15use crate::dsp::effects::convolution_reverb::{parse_impulse_response_string, ImpulseResponseSpec};
16use crate::playback::output_meter::OutputMeter;
17use crate::tools::timer;
18use crate::{
19 container::info::Info,
20 dsp::effects::AudioEffect,
21 playback::engine::{DspChainMetrics, PlaybackBufferSettings, PlayerEngine},
22};
23
/// Lifecycle state of a `Player`, shared between the control API and the
/// playback thread.
///
/// The `Resuming`, `Pausing` and `Stopping` variants are *requests* written by
/// the control methods; the settled variants (`Playing`, `Paused`, `Stopped`)
/// are written once the request has been carried out.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PlayerState {
    /// Not assigned anywhere in this file.
    /// NOTE(review): presumably reserved for a not-yet-started player — confirm.
    Init,
    /// Playback was requested; the playback thread starts the sink and then
    /// moves the state to `Playing`.
    Resuming,
    /// The sink has been started by the playback thread.
    Playing,
    /// A pause was requested; the playback thread fades out, pauses the sink
    /// and then moves the state to `Paused`.
    Pausing,
    /// The sink has been paused by the playback thread.
    Paused,
    /// Written by `kill_current` while it waits for the playback thread to exit.
    Stopping,
    /// Initial state of a new player, and the state after `kill_current` returns.
    Stopped,
    /// Not assigned anywhere in this file.
    /// NOTE(review): presumably marks natural end of playback — confirm.
    Finished,
}
36
/// Point-in-time copy of the reverb parameters returned by
/// `Player::get_reverb_settings`.
#[derive(Clone, Copy, Debug)]
pub struct ReverbSettingsSnapshot {
    /// Whether the reported reverb effect is enabled.
    pub enabled: bool,
    /// Dry/wet mix of the reported reverb effect. The setter in this file
    /// clamps it to `0.0..=1.0`.
    pub dry_wet: f32,
}
43
/// Refresh rate, in Hz, handed to `OutputMeter::new` when a player is created.
const OUTPUT_METER_REFRESH_HZ: f32 = 30.0;
45
46#[derive(Clone)]
51pub struct Player {
52 pub info: Info,
53 pub finished_tracks: Arc<Mutex<Vec<i32>>>,
54 pub ts: Arc<Mutex<f64>>,
55 state: Arc<Mutex<PlayerState>>,
56 abort: Arc<AtomicBool>,
57 playback_thread_exists: Arc<AtomicBool>,
58 playback_id: Arc<AtomicU64>,
59 duration: Arc<Mutex<f64>>,
60 prot: Arc<Mutex<Prot>>,
61 audio_heard: Arc<AtomicBool>,
62 volume: Arc<Mutex<f32>>,
63 sink: Arc<Mutex<Sink>>,
64 reporter: Option<Arc<Mutex<Reporter>>>,
65 buffer_settings: Arc<Mutex<PlaybackBufferSettings>>,
66 effects: Arc<Mutex<Vec<AudioEffect>>>,
67 dsp_metrics: Arc<Mutex<DspChainMetrics>>,
68 effects_reset: Arc<AtomicU64>,
69 output_meter: Arc<Mutex<OutputMeter>>,
70 buffering_done: Arc<AtomicBool>,
71 last_chunk_ms: Arc<AtomicU64>,
72 last_time_update_ms: Arc<AtomicU64>,
73 impulse_response_override: Option<ImpulseResponseSpec>,
74 impulse_response_tail_override: Option<f32>,
75}
76
77impl Player {
78 pub fn new(file_path: &String) -> Self {
80 let this = Self::new_from_path_or_paths(Some(file_path), None);
81 this
82 }
83
84 pub fn new_from_file_paths(file_paths: &Vec<Vec<String>>) -> Self {
86 let this = Self::new_from_path_or_paths(None, Some(file_paths));
87 this
88 }
89
90 pub fn new_from_path_or_paths(path: Option<&String>, paths: Option<&Vec<Vec<String>>>) -> Self {
92 let (prot, info) = match path {
93 Some(path) => {
94 let prot = Arc::new(Mutex::new(Prot::new(path)));
95 let info = Info::new(path.clone());
96 (prot, info)
97 }
98 None => {
99 let prot = Arc::new(Mutex::new(Prot::new_from_file_paths(paths.unwrap())));
100 let locked_prot = prot.lock().unwrap();
101 let info = Info::new_from_file_paths(locked_prot.get_file_paths_dictionary());
102 drop(locked_prot);
103 (prot, info)
104 }
105 };
106
107 let (sink, _queue) = Sink::new();
108 let sink: Arc<Mutex<Sink>> = Arc::new(Mutex::new(sink));
109
110 let channels = info.channels as usize;
111 let sample_rate = info.sample_rate;
112 let effects = {
113 let prot_locked = prot.lock().unwrap();
114 match prot_locked.get_effects() {
115 Some(effects) => Arc::new(Mutex::new(effects)),
116 None => Arc::new(Mutex::new(vec![])),
117 }
118 };
119
120 let mut this = Self {
121 info,
122 finished_tracks: Arc::new(Mutex::new(Vec::new())),
123 state: Arc::new(Mutex::new(PlayerState::Stopped)),
124 abort: Arc::new(AtomicBool::new(false)),
125 ts: Arc::new(Mutex::new(0.0)),
126 playback_thread_exists: Arc::new(AtomicBool::new(true)),
127 playback_id: Arc::new(AtomicU64::new(0)),
128 duration: Arc::new(Mutex::new(0.0)),
129 audio_heard: Arc::new(AtomicBool::new(false)),
130 volume: Arc::new(Mutex::new(0.8)),
131 sink,
132 prot,
133 reporter: None,
134 buffer_settings: Arc::new(Mutex::new(PlaybackBufferSettings::new(20.0))),
135 effects,
136 dsp_metrics: Arc::new(Mutex::new(DspChainMetrics::default())),
137 effects_reset: Arc::new(AtomicU64::new(0)),
138 output_meter: Arc::new(Mutex::new(OutputMeter::new(
139 channels,
140 sample_rate,
141 OUTPUT_METER_REFRESH_HZ,
142 ))),
143 buffering_done: Arc::new(AtomicBool::new(false)),
144 last_chunk_ms: Arc::new(AtomicU64::new(0)),
145 last_time_update_ms: Arc::new(AtomicU64::new(0)),
146 impulse_response_override: None,
147 impulse_response_tail_override: None,
148 };
149
150 this.initialize_thread(None);
151
152 this
153 }
154
155 pub fn set_impulse_response_spec(&mut self, spec: ImpulseResponseSpec) {
157 self.impulse_response_override = Some(spec.clone());
158 let mut prot = self.prot.lock().unwrap();
159 prot.set_impulse_response_spec(spec);
160 self.request_effects_reset();
161 }
162
163 pub fn set_impulse_response_from_string(&mut self, value: &str) {
165 if let Some(spec) = parse_impulse_response_string(value) {
166 self.set_impulse_response_spec(spec);
167 }
168 }
169
170 pub fn set_impulse_response_tail_db(&mut self, tail_db: f32) {
172 self.impulse_response_tail_override = Some(tail_db);
173 let mut prot = self.prot.lock().unwrap();
174 prot.set_impulse_response_tail_db(tail_db);
175 self.request_effects_reset();
176 }
177
178 pub fn set_reverb_enabled(&self, enabled: bool) {
180 let mut effects = self.effects.lock().unwrap();
181 if let Some(effect) = effects
182 .iter_mut()
183 .find_map(|effect| effect.as_convolution_reverb_mut())
184 {
185 effect.enabled = enabled;
186 }
187 if let Some(effect) = effects
188 .iter_mut()
189 .find_map(|effect| effect.as_delay_reverb_mut())
190 {
191 effect.enabled = enabled;
192 }
193 }
194
195 pub fn set_reverb_mix(&self, dry_wet: f32) {
197 let mut effects = self.effects.lock().unwrap();
198 if let Some(effect) = effects
199 .iter_mut()
200 .find_map(|effect| effect.as_convolution_reverb_mut())
201 {
202 effect.dry_wet = dry_wet.clamp(0.0, 1.0);
203 }
204 if let Some(effect) = effects
205 .iter_mut()
206 .find_map(|effect| effect.as_delay_reverb_mut())
207 {
208 effect.mix = dry_wet.clamp(0.0, 1.0);
209 }
210 if let Some(effect) = effects
211 .iter_mut()
212 .find_map(|effect| effect.as_diffusion_reverb_mut())
213 {
214 effect.mix = dry_wet.clamp(0.0, 1.0);
215 }
216 }
217
218 pub fn get_reverb_settings(&self) -> ReverbSettingsSnapshot {
220 let effects = self.effects.lock().unwrap();
221 if let Some(effect) = effects
222 .iter()
223 .find_map(|effect| effect.as_convolution_reverb())
224 {
225 return ReverbSettingsSnapshot {
226 enabled: effect.enabled,
227 dry_wet: effect.dry_wet,
228 };
229 }
230 if let Some(effect) = effects
231 .iter()
232 .find_map(|effect| effect.as_diffusion_reverb())
233 {
234 return ReverbSettingsSnapshot {
235 enabled: effect.enabled,
236 dry_wet: effect.mix,
237 };
238 }
239 if let Some(effect) = effects.iter().find_map(|effect| effect.as_delay_reverb()) {
240 return ReverbSettingsSnapshot {
241 enabled: effect.enabled,
242 dry_wet: effect.mix,
243 };
244 }
245 ReverbSettingsSnapshot {
246 enabled: false,
247 dry_wet: 0.0,
248 }
249 }
250
251 #[allow(deprecated)]
253 pub fn get_effect_names(&self) -> Vec<String> {
254 let effects = self.effects.lock().unwrap();
255 effects
256 .iter()
257 .map(|effect| match effect {
258 AudioEffect::DelayReverb(_) => "DelayReverb".to_string(),
259 AudioEffect::BasicReverb(_) => "DelayReverb".to_string(),
260 AudioEffect::DiffusionReverb(_) => "DiffusionReverb".to_string(),
261 AudioEffect::ConvolutionReverb(_) => "ConvolutionReverb".to_string(),
262 AudioEffect::LowPassFilter(_) => "LowPassFilter".to_string(),
263 AudioEffect::HighPassFilter(_) => "HighPassFilter".to_string(),
264 AudioEffect::Distortion(_) => "Distortion".to_string(),
265 AudioEffect::Compressor(_) => "Compressor".to_string(),
266 AudioEffect::Limiter(_) => "Limiter".to_string(),
267 })
268 .collect()
269 }
270
271 pub fn set_effects(&mut self, effects: Vec<AudioEffect>) {
273 {
274 let mut guard = self.effects.lock().unwrap();
275 println!("New Effects: {:?}", effects);
276 *guard = effects;
277 }
278 self.request_effects_reset();
279
280 if !self.thread_finished() {
283 let ts = self.get_time();
284 self.seek(ts);
285 }
286 }
287
288 pub fn get_dsp_metrics(&self) -> DspChainMetrics {
290 *self.dsp_metrics.lock().unwrap()
291 }
292
293 pub fn get_levels(&self) -> Vec<f32> {
295 self.output_meter.lock().unwrap().levels()
296 }
297
298 pub fn get_levels_db(&self) -> Vec<f32> {
300 self.output_meter
301 .lock()
302 .unwrap()
303 .levels()
304 .into_iter()
305 .map(linear_to_dbfs)
306 .collect()
307 }
308
309 pub fn get_levels_avg(&self) -> Vec<f32> {
311 self.output_meter.lock().unwrap().averages()
312 }
313
314 pub fn set_output_meter_refresh_hz(&self, hz: f32) {
316 self.output_meter.lock().unwrap().set_refresh_hz(hz);
317 }
318
319 pub fn debug_playback_state(&self) -> (bool, PlayerState, bool) {
321 (
322 self.playback_thread_exists.load(Ordering::SeqCst),
323 *self.state.lock().unwrap(),
324 self.audio_heard.load(Ordering::Relaxed),
325 )
326 }
327
328 pub fn debug_buffering_done(&self) -> bool {
330 self.buffering_done.load(Ordering::Relaxed)
331 }
332
333 pub fn debug_timing_ms(&self) -> (u64, u64) {
335 (
336 self.last_chunk_ms.load(Ordering::Relaxed),
337 self.last_time_update_ms.load(Ordering::Relaxed),
338 )
339 }
340
341 pub fn debug_sink_state(&self) -> (bool, bool, usize) {
343 let sink = self.sink.lock().unwrap();
344 let paused = sink.is_paused();
345 let empty = sink.empty();
346 let len = sink.len();
347 (paused, empty, len)
348 }
349
350 fn request_effects_reset(&self) {
351 self.effects_reset.fetch_add(1, Ordering::SeqCst);
352 }
353
354 pub fn set_start_buffer_ms(&self, start_buffer_ms: f32) {
356 let mut settings = self.buffer_settings.lock().unwrap();
357 settings.start_buffer_ms = start_buffer_ms.max(0.0);
358 }
359
360 pub fn set_track_eos_ms(&self, track_eos_ms: f32) {
362 let mut settings = self.buffer_settings.lock().unwrap();
363 settings.track_eos_ms = track_eos_ms.max(0.0);
364 }
365
366 pub fn set_start_sink_chunks(&self, chunks: usize) {
368 let mut settings = self.buffer_settings.lock().unwrap();
369 settings.start_sink_chunks = chunks;
370 }
371
372 pub fn set_startup_silence_ms(&self, ms: f32) {
374 let mut settings = self.buffer_settings.lock().unwrap();
375 settings.startup_silence_ms = ms.max(0.0);
376 }
377
378 pub fn set_startup_fade_ms(&self, ms: f32) {
380 let mut settings = self.buffer_settings.lock().unwrap();
381 settings.startup_fade_ms = ms.max(0.0);
382 }
383
384 pub fn set_append_jitter_log_ms(&self, ms: f32) {
386 let mut settings = self.buffer_settings.lock().unwrap();
387 settings.append_jitter_log_ms = ms.max(0.0);
388 }
389
390 pub fn set_effect_boundary_log(&self, enabled: bool) {
392 let mut settings = self.buffer_settings.lock().unwrap();
393 settings.effect_boundary_log = enabled;
394 }
395
396 fn initialize_thread(&mut self, ts: Option<f64>) {
397 let mut finished_tracks = self.finished_tracks.lock().unwrap();
399 finished_tracks.clear();
400 drop(finished_tracks);
401
402 self.abort = Arc::new(AtomicBool::new(false));
405 self.playback_thread_exists.store(true, Ordering::SeqCst);
406 let playback_id = self.playback_id.fetch_add(1, Ordering::SeqCst) + 1;
407 self.buffering_done.store(false, Ordering::SeqCst);
408 let now_ms_value = now_ms();
409 self.last_chunk_ms.store(now_ms_value, Ordering::Relaxed);
410 self.last_time_update_ms
411 .store(now_ms_value, Ordering::Relaxed);
412
413 let play_state = self.state.clone();
415 let abort = self.abort.clone();
416 let playback_thread_exists = self.playback_thread_exists.clone();
417 let playback_id_atomic = self.playback_id.clone();
418 let time_passed = self.ts.clone();
419
420 let duration = self.duration.clone();
421 let prot = self.prot.clone();
422 let buffer_settings = self.buffer_settings.clone();
423 let buffer_settings_for_state = self.buffer_settings.clone();
424 let effects = self.effects.clone();
425 let dsp_metrics = self.dsp_metrics.clone();
426 let dsp_metrics_for_sink = self.dsp_metrics.clone();
427 let effects_reset = self.effects_reset.clone();
428 let output_meter = self.output_meter.clone();
429 let audio_info = self.info.clone();
430
431 let audio_heard = self.audio_heard.clone();
432 let volume = self.volume.clone();
433 let sink_mutex = self.sink.clone();
434 let buffer_done_thread_flag = self.buffering_done.clone();
435 let last_chunk_ms = self.last_chunk_ms.clone();
436 let last_time_update_ms = self.last_time_update_ms.clone();
437
438 audio_heard.store(false, Ordering::Relaxed);
439
440 {
441 let mut meter = self.output_meter.lock().unwrap();
442 meter.reset();
443 }
444
445 thread::spawn(move || {
447 playback_thread_exists.store(true, Ordering::Relaxed);
451
452 let start_time = match ts {
456 Some(ts) => ts,
457 None => 0.0,
458 };
459 let mut engine = PlayerEngine::new(
460 prot,
461 Some(abort.clone()),
462 start_time,
463 buffer_settings,
464 effects,
465 dsp_metrics,
466 effects_reset,
467 );
468 let _stream = OutputStreamBuilder::open_default_stream().unwrap();
469 let mixer = _stream.mixer().clone();
470
471 let mut sink = sink_mutex.lock().unwrap();
472 *sink = Sink::connect_new(&mixer);
473 sink.pause();
474 sink.set_volume(*volume.lock().unwrap());
475 drop(sink);
476
477 let mut duration = duration.lock().unwrap();
481 *duration = engine.get_duration();
482 drop(duration);
483
484 let chunk_lengths = Arc::new(Mutex::new(Vec::new()));
488 let mut time_passed_unlocked = time_passed.lock().unwrap();
489 *time_passed_unlocked = start_time;
490 drop(time_passed_unlocked);
491
492 let pause_sink = |sink: &Sink, fade_length_out_seconds: f32| {
493 let timestamp = *time_passed.lock().unwrap();
494
495 let fade_increments = sink.volume() / (fade_length_out_seconds * 100.0);
496 while sink.volume() > 0.0 && timestamp != start_time {
498 sink.set_volume(sink.volume() - fade_increments);
499 thread::sleep(Duration::from_millis(10));
500 }
501 sink.pause();
502 };
503
504 let resume_sink = |sink: &Sink, fade_length_in_seconds: f32| {
505 let volume = *volume.lock().unwrap();
506 if fade_length_in_seconds <= 0.0 {
507 sink.play();
508 sink.set_volume(volume);
509 return;
510 }
511 let fade_increments = (volume - sink.volume()) / (fade_length_in_seconds * 100.0);
512 sink.play();
514 while sink.volume() < volume {
515 sink.set_volume(sink.volume() + fade_increments);
516 thread::sleep(Duration::from_millis(5));
517 }
518 };
519
520 {
524 let startup_settings = buffer_settings_for_state.lock().unwrap();
525 let startup_silence_ms = startup_settings.startup_silence_ms;
526 drop(startup_settings);
527
528 let sample_rate = audio_info.sample_rate as u32;
529 let channels = audio_info.channels as u16;
530
531 if startup_silence_ms > 0.0 {
532 let samples = ((startup_silence_ms / 1000.0) * sample_rate as f32).ceil()
533 as usize
534 * channels as usize;
535 let silence = vec![0.0_f32; samples.max(1)];
536 let silence_buffer = SamplesBuffer::new(channels, sample_rate, silence);
537 let sink = sink_mutex.lock().unwrap();
538 sink.append(silence_buffer);
539 drop(sink);
540 }
541
542 }
543
544 let startup_fade_pending = Cell::new(true);
548 let check_details = || {
549 if abort.load(Ordering::SeqCst) {
550 let sink = sink_mutex.lock().unwrap();
551 pause_sink(&sink, 0.1);
552 sink.clear();
553 drop(sink);
554
555 return false;
556 }
557
558 let sink = sink_mutex.lock().unwrap();
559 let state = play_state.lock().unwrap().clone();
560 let start_sink_chunks = buffer_settings_for_state.lock().unwrap().start_sink_chunks;
561 if state == PlayerState::Resuming
562 && start_sink_chunks > 0
563 && sink.len() < start_sink_chunks
564 {
565 sink.pause();
567 drop(sink);
568 return true;
569 }
570 if state == PlayerState::Pausing {
571 pause_sink(&sink, 0.1);
572 play_state.lock().unwrap().clone_from(&PlayerState::Paused);
573 }
574 if state == PlayerState::Resuming {
575 let fade_length = if startup_fade_pending.replace(false) {
576 let startup_fade_ms = buffer_settings_for_state.lock().unwrap().startup_fade_ms;
577 (startup_fade_ms / 1000.0).max(0.0)
578 } else {
579 0.1
580 };
581 resume_sink(&sink, fade_length);
582 play_state.lock().unwrap().clone_from(&PlayerState::Playing);
583 }
584 drop(sink);
585
586 true
587 };
588
589 let time_chunks_mutex = Arc::new(Mutex::new(start_time));
593 let timer_mut = Arc::new(Mutex::new(timer::Timer::new()));
594 let buffering_done = Arc::new(AtomicBool::new(false));
595 let buffering_done_flag = buffer_done_thread_flag.clone();
596 let final_duration = Arc::new(Mutex::new(None::<f64>));
597 let mut timer = timer_mut.lock().unwrap();
598 timer.start();
599 drop(timer);
600
601 let last_meter_time = Cell::new(0.0_f64);
602 let update_chunk_lengths = || {
603 if abort.load(Ordering::SeqCst) {
604 return;
605 }
606
607 let mut chunk_lengths = chunk_lengths.lock().unwrap();
608 let mut time_passed_unlocked = time_passed.lock().unwrap();
609 let mut time_chunks_passed = time_chunks_mutex.lock().unwrap();
610 let mut timer = timer_mut.lock().unwrap();
611 last_time_update_ms.store(now_ms(), Ordering::Relaxed);
612 let sink = sink_mutex.lock().unwrap();
613 if !buffering_done.load(Ordering::Relaxed) {
614 let chunks_played = chunk_lengths.len().saturating_sub(sink.len());
617
618 for _ in 0..chunks_played {
619 timer.reset();
620 timer.start();
621 *time_chunks_passed += chunk_lengths.remove(0);
622 }
623 }
624
625 if sink.is_paused() {
626 timer.pause();
627 } else {
628 timer.un_pause();
629 }
630
631 let current_audio_time = *time_chunks_passed + timer.get_time().as_secs_f64();
632 let delta = (current_audio_time - last_meter_time.get()).max(0.0);
633 last_meter_time.set(current_audio_time);
634 {
635 let mut meter = output_meter.lock().unwrap();
636 meter.advance(delta);
637 }
638
639 *time_passed_unlocked = current_audio_time;
640
641 drop(sink);
642 drop(chunk_lengths);
643 drop(time_passed_unlocked);
644 drop(time_chunks_passed);
645 drop(timer);
646 };
647
648 let append_timing = Arc::new(Mutex::new((Instant::now(), 0.0_f64, 0_u64, 0.0_f64)));
652 let update_sink = |(mixer, length_in_seconds): (SamplesBuffer, f64)| {
653 if playback_id_atomic.load(Ordering::SeqCst) != playback_id {
654 return;
655 }
656 let (delay_ms, late) = {
657 let mut timing = append_timing.lock().unwrap();
658 let now = Instant::now();
659 let delta_ms = now.duration_since(timing.0).as_secs_f64() * 1000.0;
660 let chunk_ms = length_in_seconds * 1000.0;
661 let late = delta_ms > (chunk_ms * 1.2) && chunk_ms > 0.0;
662 if late {
663 timing.2 = timing.2.saturating_add(1);
664 }
665 timing.1 = if timing.1 == 0.0 {
666 delta_ms
667 } else {
668 (timing.1 * 0.9) + (delta_ms * 0.1)
669 };
670 timing.3 = timing.3.max(delta_ms);
671 timing.0 = now;
672 (delta_ms, late)
673 };
674 audio_heard.store(true, Ordering::Relaxed);
675 last_chunk_ms.store(now_ms(), Ordering::Relaxed);
676
677 {
678 let mut meter = output_meter.lock().unwrap();
679 meter.push_samples(&mixer);
680 }
681 {
682 let mut metrics = dsp_metrics_for_sink.lock().unwrap();
683 metrics.append_delay_ms = delay_ms;
684 metrics.avg_append_delay_ms = {
685 if metrics.avg_append_delay_ms == 0.0 {
686 delay_ms
687 } else {
688 (metrics.avg_append_delay_ms * 0.9) + (delay_ms * 0.1)
689 }
690 };
691 metrics.max_append_delay_ms = metrics.max_append_delay_ms.max(delay_ms);
692 metrics.late_append_count = {
693 let timing = append_timing.lock().unwrap();
694 timing.2
695 };
696 metrics.late_append_active = late;
697 }
698
699 let sink = sink_mutex.lock().unwrap();
700 let append_jitter_log_ms = {
701 let settings = buffer_settings_for_state.lock().unwrap();
702 settings.append_jitter_log_ms
703 };
704 if append_jitter_log_ms > 0.0 && (late || delay_ms > append_jitter_log_ms as f64) {
705 let expected_ms = length_in_seconds * 1000.0;
706 log::info!(
707 "append jitter: delta={:.2}ms expected={:.2}ms late={} threshold={:.2}ms sink_len={}",
708 delay_ms,
709 expected_ms,
710 late,
711 append_jitter_log_ms,
712 sink.len()
713 );
714 }
715 let mut chunk_lengths = chunk_lengths.lock().unwrap();
716
717 sink.append(mixer);
718
719 drop(sink);
720
721 chunk_lengths.push(length_in_seconds);
722 drop(chunk_lengths);
723
724 update_chunk_lengths();
725 check_details();
726 };
727
728 let receiver = engine.start_receiver();
729 loop {
730 match receiver.recv_timeout(Duration::from_millis(20)) {
731 Ok(chunk) => {
732 update_sink(chunk);
733 }
734 Err(RecvTimeoutError::Timeout) => {
735 update_chunk_lengths();
736 if !check_details() {
737 break;
738 }
739 }
740 Err(RecvTimeoutError::Disconnected) => break,
741 }
742 }
743 #[cfg(feature = "debug")]
744 log::info!("engine reception loop finished");
745
746 buffering_done.store(true, Ordering::Relaxed);
749 buffering_done_flag.store(true, Ordering::Relaxed);
750 {
751 let mut final_duration = final_duration.lock().unwrap();
752 if final_duration.is_none() {
753 let chunk_lengths = chunk_lengths.lock().unwrap();
754 let time_chunks_passed = time_chunks_mutex.lock().unwrap();
755 *final_duration = Some(*time_chunks_passed + chunk_lengths.iter().sum::<f64>());
756 }
757 }
758
759 #[cfg(feature = "debug")]
763 {
764 let sink = sink_mutex.lock().unwrap();
765 let paused = sink.is_paused();
766 let empty = sink.empty();
767 let sink_len = sink.len();
768 drop(sink);
769 let time_passed = *time_passed.lock().unwrap();
770 let final_duration = *final_duration.lock().unwrap();
771 log::info!(
772 "Starting drain loop: paused={} empty={} sink_len={} time={:.3} final={:?}",
773 paused,
774 empty,
775 sink_len,
776 time_passed,
777 final_duration
778 );
779 }
780
781 loop {
782 update_chunk_lengths();
783 if !check_details() {
784 break;
785 }
786
787 let done = if engine.finished_buffering() {
788 if let Some(final_duration) = *final_duration.lock().unwrap() {
789 let time_passed = *time_passed.lock().unwrap();
790 time_passed >= (final_duration - 0.001).max(0.0)
791 } else {
792 false
793 }
794 } else {
795 false
796 };
797 if done {
798 break;
799 }
800
801 thread::sleep(Duration::from_millis(10));
802 }
803
804 #[cfg(feature = "debug")]
805 log::info!("Finished drain loop!");
806
807 playback_thread_exists.store(false, Ordering::Relaxed);
811 });
812 }
813
814 pub fn play_at(&mut self, ts: f64) {
816 let mut timestamp = self.ts.lock().unwrap();
817 *timestamp = ts;
818 drop(timestamp);
819
820 self.request_effects_reset();
821 self.kill_current();
822 self.initialize_thread(Some(ts));
824
825 self.resume();
826
827 self.wait_for_audio_heard(Duration::from_secs(5));
828 }
829
830 pub fn play(&mut self) {
832 info!("Playing audio");
833 let thread_exists = self.playback_thread_exists.load(Ordering::SeqCst);
834 if !thread_exists {
837 self.initialize_thread(None);
838 }
839
840 self.resume();
841
842 self.wait_for_audio_heard(Duration::from_secs(5));
843 }
844
845 pub fn pause(&self) {
847 self.state.lock().unwrap().clone_from(&PlayerState::Pausing);
848 }
849
850 pub fn resume(&self) {
852 self.state
853 .lock()
854 .unwrap()
855 .clone_from(&PlayerState::Resuming);
856 }
857
858 pub fn kill_current(&self) {
860 self.state
861 .lock()
862 .unwrap()
863 .clone_from(&PlayerState::Stopping);
864 {
865 let sink = self.sink.lock().unwrap();
866 sink.stop();
867 }
868 self.abort.store(true, Ordering::SeqCst);
869
870 while !self.thread_finished() {
871 thread::sleep(Duration::from_millis(10));
872 }
873
874 self.state.lock().unwrap().clone_from(&PlayerState::Stopped);
875 }
876
877 pub fn stop(&self) {
879 self.kill_current();
880 self.ts.lock().unwrap().clone_from(&0.0);
881 }
882
883 pub fn is_playing(&self) -> bool {
885 let state = self.state.lock().unwrap();
886 *state == PlayerState::Playing
887 }
888
889 pub fn is_paused(&self) -> bool {
891 let state = self.state.lock().unwrap();
892 *state == PlayerState::Paused
893 }
894
895 pub fn get_time(&self) -> f64 {
897 let ts = self.ts.lock().unwrap();
898 *ts
899 }
900
901 fn thread_finished(&self) -> bool {
902 let playback_thread_exists = self.playback_thread_exists.load(Ordering::SeqCst);
903 !playback_thread_exists
904 }
905
906 pub fn is_finished(&self) -> bool {
908 self.thread_finished()
909 }
912
913 pub fn sleep_until_end(&self) {
915 loop {
916 if self.thread_finished() {
917 break;
918 }
919 thread::sleep(Duration::from_millis(100));
920 }
921 }
922
923 pub fn get_duration(&self) -> f64 {
925 let duration = self.duration.lock().unwrap();
926 *duration
927 }
928
929 pub fn seek(&mut self, ts: f64) {
931 let mut timestamp = self.ts.lock().unwrap();
932 *timestamp = ts;
933 drop(timestamp);
934
935 self.request_effects_reset();
936 let state = self.state.lock().unwrap().clone();
937
938 self.kill_current();
939 self.state.lock().unwrap().clone_from(&state);
940 self.initialize_thread(Some(ts));
941
942 if state == PlayerState::Playing {
943 self.resume();
944 }
945 }
946
947 pub fn refresh_tracks(&mut self) {
949 let mut prot = self.prot.lock().unwrap();
950 prot.refresh_tracks();
951 if let Some(spec) = self.impulse_response_override.clone() {
952 prot.set_impulse_response_spec(spec);
953 }
954 if let Some(tail_db) = self.impulse_response_tail_override {
955 prot.set_impulse_response_tail_db(tail_db);
956 }
957 drop(prot);
958
959 self.request_effects_reset();
960 if self.thread_finished() {
962 return;
963 }
964
965 let ts = self.get_time();
968 self.seek(ts);
969
970 if self.is_playing() {
972 self.resume();
973 }
974
975 self.wait_for_audio_heard(Duration::from_secs(5));
976 }
977
978 fn wait_for_audio_heard(&self, timeout: Duration) -> bool {
979 let start = Instant::now();
980 loop {
981 if self.audio_heard.load(Ordering::Relaxed) {
982 return true;
983 }
984 if self.thread_finished() {
985 warn!("playback thread ended before audio was heard");
986 return false;
987 }
988 if start.elapsed() >= timeout {
989 warn!("timed out waiting for audio to start");
990 return false;
991 }
992 thread::sleep(Duration::from_millis(10));
993 }
994 }
995
996 pub fn shuffle(&mut self) {
998 self.refresh_tracks();
999 }
1000
1001 pub fn set_volume(&mut self, new_volume: f32) {
1003 let sink = self.sink.lock().unwrap();
1004 sink.set_volume(new_volume);
1005 drop(sink);
1006
1007 let mut volume = self.volume.lock().unwrap();
1008 *volume = new_volume;
1009 drop(volume);
1010 }
1011
1012 pub fn get_volume(&self) -> f32 {
1014 *self.volume.lock().unwrap()
1015 }
1016
1017 pub fn get_ids(&self) -> Vec<String> {
1019 let prot = self.prot.lock().unwrap();
1020
1021 return prot.get_ids();
1022 }
1023
1024 pub fn set_reporting(
1026 &mut self,
1027 reporting: Arc<Mutex<dyn Fn(Report) + Send>>,
1028 reporting_interval: Duration,
1029 ) {
1030 if self.reporter.is_some() {
1031 self.reporter.as_ref().unwrap().lock().unwrap().stop();
1032 }
1033
1034 let reporter = Arc::new(Mutex::new(Reporter::new(
1035 Arc::new(Mutex::new(self.clone())),
1036 reporting,
1037 reporting_interval,
1038 )));
1039
1040 reporter.lock().unwrap().start();
1041
1042 self.reporter = Some(reporter);
1043 }
1044}
1045
/// Converts a linear amplitude to decibels relative to full scale.
///
/// Zero and negative inputs map to negative infinity; `1.0` maps to `0.0` dB.
/// A NaN input yields NaN.
fn linear_to_dbfs(value: f32) -> f32 {
    match value {
        silent if silent <= 0.0 => f32::NEG_INFINITY,
        amplitude => 20.0 * amplitude.log10(),
    }
}
1053
/// Returns the wall-clock time in milliseconds since the Unix epoch, or `0`
/// if the system clock reads earlier than the epoch.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}