1use rodio::buffer::SamplesBuffer;
4use rodio::{OutputStreamBuilder, Sink};
5use std::cell::Cell;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::{mpsc::RecvTimeoutError, Arc, Mutex};
8use std::thread;
9use std::time::{Duration, Instant};
10
11use log::{info, warn};
12
13use crate::audio::samples::clone_samples_buffer;
14use crate::container::prot::Prot;
15use crate::diagnostics::reporter::{Report, Reporter};
16use crate::dsp::effects::convolution_reverb::{parse_impulse_response_string, ImpulseResponseSpec};
17use crate::playback::output_meter::OutputMeter;
18use crate::tools::timer;
19use crate::{
20 container::info::Info,
21 dsp::effects::AudioEffect,
22 playback::engine::{DspChainMetrics, PlaybackBufferSettings, PlayerEngine},
23};
24
/// Lifecycle state of a [`Player`].
///
/// In this file the `-ing` variants act as requests: `pause()` stores
/// `Pausing`, `resume()` stores `Resuming`, and the playback thread moves them
/// on to `Paused` / `Playing` once it has faded the sink out or in.
/// `kill_current()` stores `Stopping` and then `Stopped` after the playback
/// thread has exited.
///
/// NOTE(review): `Init` and `Finished` are never assigned in the code visible
/// here — confirm whether other modules use them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PlayerState {
    Init,
    Resuming,
    Playing,
    Pausing,
    Paused,
    Stopping,
    Stopped,
    Finished,
}
37
/// Copy of the reverb parameters returned by `Player::get_reverb_settings`.
#[derive(Debug, Clone, Copy)]
pub struct ReverbSettingsSnapshot {
    /// Whether the reverb effect that was found is enabled.
    pub enabled: bool,
    /// Dry/wet mix of the reverb effect that was found (its `dry_wet` or `mix` field).
    pub dry_wet: f32,
}
44
/// Refresh rate, in Hz, handed to `OutputMeter::new` when a [`Player`] is constructed.
const OUTPUT_METER_REFRESH_HZ: f32 = 30.0;
46
/// Handle to an audio player backed by a background playback thread.
///
/// Almost every field is wrapped in an `Arc`, so a clone of `Player` shares
/// its state with the original and with the playback thread.
#[derive(Clone)]
pub struct Player {
    /// Container metadata; its `sample_rate` and `channels` size the output meter
    /// and the startup-silence buffer.
    pub info: Info,
    /// Cleared whenever a new playback thread is started.
    /// NOTE(review): nothing in this file pushes to it — presumably filled elsewhere.
    pub finished_tracks: Arc<Mutex<Vec<i32>>>,
    /// Current playback position in seconds; written by the playback thread and
    /// by `seek`, `play_at` and `stop`.
    pub ts: Arc<Mutex<f64>>,
    /// Requested/actual lifecycle state (see [`PlayerState`]).
    state: Arc<Mutex<PlayerState>>,
    /// Abort flag handed to the engine and polled by the playback thread.
    /// Replaced by a fresh flag each time a playback thread is started.
    abort: Arc<AtomicBool>,
    /// `true` while a playback thread is considered alive.
    playback_thread_exists: Arc<AtomicBool>,
    /// Incremented for every playback thread; chunks from a thread whose id is
    /// no longer current are ignored.
    playback_id: Arc<AtomicU64>,
    /// Duration reported by the engine when the playback thread starts.
    duration: Arc<Mutex<f64>>,
    /// The container being played.
    prot: Arc<Mutex<Prot>>,
    /// Set once the first chunk of the current playback thread has been queued.
    audio_heard: Arc<AtomicBool>,
    /// Target sink volume; new sinks and fade-ins use this value.
    volume: Arc<Mutex<f32>>,
    /// Sink currently receiving audio; replaced by each playback thread.
    sink: Arc<Mutex<Sink>>,
    /// Copy of the first chunk of the current playback thread, consumed by `audition`.
    audition_source: Arc<Mutex<Option<SamplesBuffer>>>,
    /// Reporter installed by `set_reporting`, if any.
    reporter: Option<Arc<Mutex<Reporter>>>,
    /// Buffering and startup tuning shared with the engine.
    buffer_settings: Arc<Mutex<PlaybackBufferSettings>>,
    /// Effect chain shared with the engine.
    effects: Arc<Mutex<Vec<AudioEffect>>>,
    /// DSP and append-timing metrics, updated by the playback thread.
    dsp_metrics: Arc<Mutex<DspChainMetrics>>,
    /// Counter bumped by `request_effects_reset`; shared with the engine.
    effects_reset: Arc<AtomicU64>,
    /// Output level meter fed with every queued chunk.
    output_meter: Arc<Mutex<OutputMeter>>,
    /// Set when the playback thread has stopped receiving chunks from the engine.
    buffering_done: Arc<AtomicBool>,
    /// Wall-clock time (ms since the Unix epoch) of the last queued chunk.
    last_chunk_ms: Arc<AtomicU64>,
    /// Wall-clock time (ms since the Unix epoch) of the last position update.
    last_time_update_ms: Arc<AtomicU64>,
    /// Impulse response set through the API; re-applied by `refresh_tracks`.
    impulse_response_override: Option<ImpulseResponseSpec>,
    /// Impulse response tail (dB) set through the API; re-applied by `refresh_tracks`.
    impulse_response_tail_override: Option<f32>,
}
78
79impl Player {
80 pub fn new(file_path: &String) -> Self {
82 let this = Self::new_from_path_or_paths(Some(file_path), None);
83 this
84 }
85
86 pub fn new_from_file_paths(file_paths: &Vec<Vec<String>>) -> Self {
88 let this = Self::new_from_path_or_paths(None, Some(file_paths));
89 this
90 }
91
92 pub fn new_from_path_or_paths(path: Option<&String>, paths: Option<&Vec<Vec<String>>>) -> Self {
94 let (prot, info) = match path {
95 Some(path) => {
96 let prot = Arc::new(Mutex::new(Prot::new(path)));
97 let info = Info::new(path.clone());
98 (prot, info)
99 }
100 None => {
101 let prot = Arc::new(Mutex::new(Prot::new_from_file_paths(paths.unwrap())));
102 let locked_prot = prot.lock().unwrap();
103 let info = Info::new_from_file_paths(locked_prot.get_file_paths_dictionary());
104 drop(locked_prot);
105 (prot, info)
106 }
107 };
108
109 let (sink, _queue) = Sink::new();
110 let sink: Arc<Mutex<Sink>> = Arc::new(Mutex::new(sink));
111
112 let channels = info.channels as usize;
113 let sample_rate = info.sample_rate;
114 let effects = {
115 let prot_locked = prot.lock().unwrap();
116 match prot_locked.get_effects() {
117 Some(effects) => Arc::new(Mutex::new(effects)),
118 None => Arc::new(Mutex::new(vec![])),
119 }
120 };
121
122 let mut this = Self {
123 info,
124 finished_tracks: Arc::new(Mutex::new(Vec::new())),
125 state: Arc::new(Mutex::new(PlayerState::Stopped)),
126 abort: Arc::new(AtomicBool::new(false)),
127 ts: Arc::new(Mutex::new(0.0)),
128 playback_thread_exists: Arc::new(AtomicBool::new(true)),
129 playback_id: Arc::new(AtomicU64::new(0)),
130 duration: Arc::new(Mutex::new(0.0)),
131 audio_heard: Arc::new(AtomicBool::new(false)),
132 volume: Arc::new(Mutex::new(0.8)),
133 sink,
134 audition_source: Arc::new(Mutex::new(None)),
135 prot,
136 reporter: None,
137 buffer_settings: Arc::new(Mutex::new(PlaybackBufferSettings::new(20.0))),
138 effects,
139 dsp_metrics: Arc::new(Mutex::new(DspChainMetrics::default())),
140 effects_reset: Arc::new(AtomicU64::new(0)),
141 output_meter: Arc::new(Mutex::new(OutputMeter::new(
142 channels,
143 sample_rate,
144 OUTPUT_METER_REFRESH_HZ,
145 ))),
146 buffering_done: Arc::new(AtomicBool::new(false)),
147 last_chunk_ms: Arc::new(AtomicU64::new(0)),
148 last_time_update_ms: Arc::new(AtomicU64::new(0)),
149 impulse_response_override: None,
150 impulse_response_tail_override: None,
151 };
152
153 this.initialize_thread(None);
154
155 this
156 }
157
158 pub fn set_impulse_response_spec(&mut self, spec: ImpulseResponseSpec) {
160 self.impulse_response_override = Some(spec.clone());
161 let mut prot = self.prot.lock().unwrap();
162 prot.set_impulse_response_spec(spec);
163 self.request_effects_reset();
164 }
165
166 pub fn set_impulse_response_from_string(&mut self, value: &str) {
168 if let Some(spec) = parse_impulse_response_string(value) {
169 self.set_impulse_response_spec(spec);
170 }
171 }
172
173 pub fn set_impulse_response_tail_db(&mut self, tail_db: f32) {
175 self.impulse_response_tail_override = Some(tail_db);
176 let mut prot = self.prot.lock().unwrap();
177 prot.set_impulse_response_tail_db(tail_db);
178 self.request_effects_reset();
179 }
180
181 pub fn set_reverb_enabled(&self, enabled: bool) {
183 let mut effects = self.effects.lock().unwrap();
184 if let Some(effect) = effects
185 .iter_mut()
186 .find_map(|effect| effect.as_convolution_reverb_mut())
187 {
188 effect.enabled = enabled;
189 }
190 if let Some(effect) = effects
191 .iter_mut()
192 .find_map(|effect| effect.as_delay_reverb_mut())
193 {
194 effect.enabled = enabled;
195 }
196 }
197
198 pub fn set_reverb_mix(&self, dry_wet: f32) {
200 let mut effects = self.effects.lock().unwrap();
201 if let Some(effect) = effects
202 .iter_mut()
203 .find_map(|effect| effect.as_convolution_reverb_mut())
204 {
205 effect.dry_wet = dry_wet.clamp(0.0, 1.0);
206 }
207 if let Some(effect) = effects
208 .iter_mut()
209 .find_map(|effect| effect.as_delay_reverb_mut())
210 {
211 effect.mix = dry_wet.clamp(0.0, 1.0);
212 }
213 if let Some(effect) = effects
214 .iter_mut()
215 .find_map(|effect| effect.as_diffusion_reverb_mut())
216 {
217 effect.mix = dry_wet.clamp(0.0, 1.0);
218 }
219 }
220
221 pub fn get_reverb_settings(&self) -> ReverbSettingsSnapshot {
223 let effects = self.effects.lock().unwrap();
224 if let Some(effect) = effects
225 .iter()
226 .find_map(|effect| effect.as_convolution_reverb())
227 {
228 return ReverbSettingsSnapshot {
229 enabled: effect.enabled,
230 dry_wet: effect.dry_wet,
231 };
232 }
233 if let Some(effect) = effects
234 .iter()
235 .find_map(|effect| effect.as_diffusion_reverb())
236 {
237 return ReverbSettingsSnapshot {
238 enabled: effect.enabled,
239 dry_wet: effect.mix,
240 };
241 }
242 if let Some(effect) = effects.iter().find_map(|effect| effect.as_delay_reverb()) {
243 return ReverbSettingsSnapshot {
244 enabled: effect.enabled,
245 dry_wet: effect.mix,
246 };
247 }
248 ReverbSettingsSnapshot {
249 enabled: false,
250 dry_wet: 0.0,
251 }
252 }
253
254 pub fn get_effect_names(&self) -> Vec<String> {
256 let effects = self.effects.lock().unwrap();
257 effects
258 .iter()
259 .map(|effect| match effect {
260 AudioEffect::DelayReverb(_) => "DelayReverb".to_string(),
261 AudioEffect::BasicReverb(_) => "DelayReverb".to_string(),
262 AudioEffect::DiffusionReverb(_) => "DiffusionReverb".to_string(),
263 AudioEffect::ConvolutionReverb(_) => "ConvolutionReverb".to_string(),
264 AudioEffect::LowPassFilter(_) => "LowPassFilter".to_string(),
265 AudioEffect::HighPassFilter(_) => "HighPassFilter".to_string(),
266 AudioEffect::Distortion(_) => "Distortion".to_string(),
267 AudioEffect::Compressor(_) => "Compressor".to_string(),
268 AudioEffect::Limiter(_) => "Limiter".to_string(),
269 })
270 .collect()
271 }
272
273 pub fn set_effects(&mut self, effects: Vec<AudioEffect>) {
275 {
276 let mut guard = self.effects.lock().unwrap();
277 println!("New Effects: {:?}", effects);
278 *guard = effects;
279 }
280 self.request_effects_reset();
281
282 if !self.thread_finished() {
285 let ts = self.get_time();
286 self.seek(ts);
287 }
288 }
289
290 pub fn get_dsp_metrics(&self) -> DspChainMetrics {
292 *self.dsp_metrics.lock().unwrap()
293 }
294
295 pub fn get_levels(&self) -> Vec<f32> {
297 self.output_meter.lock().unwrap().levels()
298 }
299
300 pub fn get_levels_db(&self) -> Vec<f32> {
302 self.output_meter
303 .lock()
304 .unwrap()
305 .levels()
306 .into_iter()
307 .map(linear_to_dbfs)
308 .collect()
309 }
310
311 pub fn get_levels_avg(&self) -> Vec<f32> {
313 self.output_meter.lock().unwrap().averages()
314 }
315
316 pub fn set_output_meter_refresh_hz(&self, hz: f32) {
318 self.output_meter.lock().unwrap().set_refresh_hz(hz);
319 }
320
321 pub fn debug_playback_state(&self) -> (bool, PlayerState, bool) {
323 (
324 self.playback_thread_exists.load(Ordering::SeqCst),
325 *self.state.lock().unwrap(),
326 self.audio_heard.load(Ordering::Relaxed),
327 )
328 }
329
330 pub fn debug_buffering_done(&self) -> bool {
332 self.buffering_done.load(Ordering::Relaxed)
333 }
334
335 pub fn debug_timing_ms(&self) -> (u64, u64) {
337 (
338 self.last_chunk_ms.load(Ordering::Relaxed),
339 self.last_time_update_ms.load(Ordering::Relaxed),
340 )
341 }
342
343 pub fn debug_sink_state(&self) -> (bool, bool, usize) {
345 let sink = self.sink.lock().unwrap();
346 let paused = sink.is_paused();
347 let empty = sink.empty();
348 let len = sink.len();
349 (paused, empty, len)
350 }
351
352 fn request_effects_reset(&self) {
353 self.effects_reset.fetch_add(1, Ordering::SeqCst);
354 }
355
356 pub fn set_start_buffer_ms(&self, start_buffer_ms: f32) {
358 let mut settings = self.buffer_settings.lock().unwrap();
359 settings.start_buffer_ms = start_buffer_ms.max(0.0);
360 }
361
362 pub fn set_track_eos_ms(&self, track_eos_ms: f32) {
364 let mut settings = self.buffer_settings.lock().unwrap();
365 settings.track_eos_ms = track_eos_ms.max(0.0);
366 }
367
368 pub fn set_start_sink_chunks(&self, chunks: usize) {
370 let mut settings = self.buffer_settings.lock().unwrap();
371 settings.start_sink_chunks = chunks;
372 }
373
374 pub fn set_startup_silence_ms(&self, ms: f32) {
376 let mut settings = self.buffer_settings.lock().unwrap();
377 settings.startup_silence_ms = ms.max(0.0);
378 }
379
380 pub fn set_startup_fade_ms(&self, ms: f32) {
382 let mut settings = self.buffer_settings.lock().unwrap();
383 settings.startup_fade_ms = ms.max(0.0);
384 }
385
386 pub fn set_append_jitter_log_ms(&self, ms: f32) {
388 let mut settings = self.buffer_settings.lock().unwrap();
389 settings.append_jitter_log_ms = ms.max(0.0);
390 }
391
392 pub fn set_effect_boundary_log(&self, enabled: bool) {
394 let mut settings = self.buffer_settings.lock().unwrap();
395 settings.effect_boundary_log = enabled;
396 }
397
398 fn audition(&self, length: Duration) {
399 let audition_source_mutex = self.audition_source.clone();
400
401 thread::spawn(move || {
403 while audition_source_mutex.lock().unwrap().is_none() {
405 thread::sleep(Duration::from_millis(10));
406 }
407
408 let audition_source_option = audition_source_mutex.lock().unwrap().take();
409 let audition_source = audition_source_option.unwrap();
410
411 let _audition_stream = OutputStreamBuilder::open_default_stream().unwrap();
412 let audition_sink = Sink::connect_new(_audition_stream.mixer());
413 audition_sink.pause();
414 audition_sink.set_volume(0.8);
415 audition_sink.append(audition_source);
416 audition_sink.play();
417 thread::sleep(length);
418 audition_sink.pause();
419 });
420 }
421
422 fn initialize_thread(&mut self, ts: Option<f64>) {
423 let mut finished_tracks = self.finished_tracks.lock().unwrap();
425 finished_tracks.clear();
426 drop(finished_tracks);
427
428 self.abort = Arc::new(AtomicBool::new(false));
431 self.playback_thread_exists.store(true, Ordering::SeqCst);
432 let playback_id = self.playback_id.fetch_add(1, Ordering::SeqCst) + 1;
433 self.buffering_done.store(false, Ordering::SeqCst);
434 let now_ms_value = now_ms();
435 self.last_chunk_ms.store(now_ms_value, Ordering::Relaxed);
436 self.last_time_update_ms
437 .store(now_ms_value, Ordering::Relaxed);
438
439 let play_state = self.state.clone();
441 let abort = self.abort.clone();
442 let playback_thread_exists = self.playback_thread_exists.clone();
443 let playback_id_atomic = self.playback_id.clone();
444 let time_passed = self.ts.clone();
445
446 let duration = self.duration.clone();
447 let prot = self.prot.clone();
448 let buffer_settings = self.buffer_settings.clone();
449 let buffer_settings_for_state = self.buffer_settings.clone();
450 let effects = self.effects.clone();
451 let dsp_metrics = self.dsp_metrics.clone();
452 let dsp_metrics_for_sink = self.dsp_metrics.clone();
453 let effects_reset = self.effects_reset.clone();
454 let output_meter = self.output_meter.clone();
455 let audio_info = self.info.clone();
456
457 let audio_heard = self.audio_heard.clone();
458 let volume = self.volume.clone();
459 let sink_mutex = self.sink.clone();
460 let audition_source_mutex = self.audition_source.clone();
461 let buffer_done_thread_flag = self.buffering_done.clone();
462 let last_chunk_ms = self.last_chunk_ms.clone();
463 let last_time_update_ms = self.last_time_update_ms.clone();
464
465 audio_heard.store(false, Ordering::Relaxed);
466
467 let mut audition_source = audition_source_mutex.lock().unwrap();
469 *audition_source = None;
470 drop(audition_source);
471
472 {
473 let mut meter = self.output_meter.lock().unwrap();
474 meter.reset();
475 }
476
477 thread::spawn(move || {
479 playback_thread_exists.store(true, Ordering::Relaxed);
483
484 let start_time = match ts {
488 Some(ts) => ts,
489 None => 0.0,
490 };
491 let mut engine = PlayerEngine::new(
492 prot,
493 Some(abort.clone()),
494 start_time,
495 buffer_settings,
496 effects,
497 dsp_metrics,
498 effects_reset,
499 );
500 let _stream = OutputStreamBuilder::open_default_stream().unwrap();
501 let mixer = _stream.mixer().clone();
502
503 let mut sink = sink_mutex.lock().unwrap();
504 *sink = Sink::connect_new(&mixer);
505 sink.pause();
506 sink.set_volume(*volume.lock().unwrap());
507 drop(sink);
508
509 let mut duration = duration.lock().unwrap();
513 *duration = engine.get_duration();
514 drop(duration);
515
516 let chunk_lengths = Arc::new(Mutex::new(Vec::new()));
520 let mut time_passed_unlocked = time_passed.lock().unwrap();
521 *time_passed_unlocked = start_time;
522 drop(time_passed_unlocked);
523
524 let pause_sink = |sink: &Sink, fade_length_out_seconds: f32| {
525 let timestamp = *time_passed.lock().unwrap();
526
527 let fade_increments = sink.volume() / (fade_length_out_seconds * 100.0);
528 while sink.volume() > 0.0 && timestamp != start_time {
530 sink.set_volume(sink.volume() - fade_increments);
531 thread::sleep(Duration::from_millis(10));
532 }
533 sink.pause();
534 };
535
536 let resume_sink = |sink: &Sink, fade_length_in_seconds: f32| {
537 let volume = *volume.lock().unwrap();
538 let fade_increments = (volume - sink.volume()) / (fade_length_in_seconds * 100.0);
539 sink.play();
541 while sink.volume() < volume {
542 sink.set_volume(sink.volume() + fade_increments);
543 thread::sleep(Duration::from_millis(5));
544 }
545 };
546
547 {
551 let startup_settings = buffer_settings_for_state.lock().unwrap();
552 let startup_silence_ms = startup_settings.startup_silence_ms;
553 let startup_fade_ms = startup_settings.startup_fade_ms;
554 drop(startup_settings);
555
556 let sample_rate = audio_info.sample_rate as u32;
557 let channels = audio_info.channels as u16;
558
559 if startup_silence_ms > 0.0 {
560 let samples = ((startup_silence_ms / 1000.0) * sample_rate as f32).ceil()
561 as usize
562 * channels as usize;
563 let silence = vec![0.0_f32; samples.max(1)];
564 let silence_buffer = SamplesBuffer::new(channels, sample_rate, silence);
565 let sink = sink_mutex.lock().unwrap();
566 sink.append(silence_buffer);
567 drop(sink);
568 }
569
570 if startup_fade_ms > 0.0 {
571 resume_sink(
572 &sink_mutex.lock().unwrap(),
573 (startup_fade_ms / 1000.0).max(0.0),
574 );
575 }
576 }
577
578 let check_details = || {
582 if abort.load(Ordering::SeqCst) {
583 let sink = sink_mutex.lock().unwrap();
584 pause_sink(&sink, 0.1);
585 sink.clear();
586 drop(sink);
587
588 return false;
589 }
590
591 let sink = sink_mutex.lock().unwrap();
592 let state = play_state.lock().unwrap().clone();
593 let start_sink_chunks = buffer_settings_for_state.lock().unwrap().start_sink_chunks;
594 if state == PlayerState::Resuming
595 && start_sink_chunks > 0
596 && sink.len() < start_sink_chunks
597 {
598 sink.pause();
600 drop(sink);
601 return true;
602 }
603 if state == PlayerState::Pausing {
604 pause_sink(&sink, 0.1);
605 play_state.lock().unwrap().clone_from(&PlayerState::Paused);
606 }
607 if state == PlayerState::Resuming {
608 resume_sink(&sink, 0.1);
609 play_state.lock().unwrap().clone_from(&PlayerState::Playing);
610 }
611 drop(sink);
612
613 true
614 };
615
616 let time_chunks_mutex = Arc::new(Mutex::new(start_time));
620 let timer_mut = Arc::new(Mutex::new(timer::Timer::new()));
621 let buffering_done = Arc::new(AtomicBool::new(false));
622 let buffering_done_flag = buffer_done_thread_flag.clone();
623 let final_duration = Arc::new(Mutex::new(None::<f64>));
624 let mut timer = timer_mut.lock().unwrap();
625 timer.start();
626 drop(timer);
627
628 let last_meter_time = Cell::new(0.0_f64);
629 let update_chunk_lengths = || {
630 if abort.load(Ordering::SeqCst) {
631 return;
632 }
633
634 let mut chunk_lengths = chunk_lengths.lock().unwrap();
635 let mut time_passed_unlocked = time_passed.lock().unwrap();
636 let mut time_chunks_passed = time_chunks_mutex.lock().unwrap();
637 let mut timer = timer_mut.lock().unwrap();
638 last_time_update_ms.store(now_ms(), Ordering::Relaxed);
639 let sink = sink_mutex.lock().unwrap();
640 if !buffering_done.load(Ordering::Relaxed) {
641 let chunks_played = chunk_lengths.len().saturating_sub(sink.len());
644
645 for _ in 0..chunks_played {
646 timer.reset();
647 timer.start();
648 *time_chunks_passed += chunk_lengths.remove(0);
649 }
650 }
651
652 if sink.is_paused() {
653 timer.pause();
654 } else {
655 timer.un_pause();
656 }
657
658 let current_audio_time = *time_chunks_passed + timer.get_time().as_secs_f64();
659 let delta = (current_audio_time - last_meter_time.get()).max(0.0);
660 last_meter_time.set(current_audio_time);
661 {
662 let mut meter = output_meter.lock().unwrap();
663 meter.advance(delta);
664 }
665
666 *time_passed_unlocked = current_audio_time;
667
668 drop(sink);
669 drop(chunk_lengths);
670 drop(time_passed_unlocked);
671 drop(time_chunks_passed);
672 drop(timer);
673 };
674
675 let append_timing = Arc::new(Mutex::new((Instant::now(), 0.0_f64, 0_u64, 0.0_f64)));
679 let update_sink = |(mixer, length_in_seconds): (SamplesBuffer, f64)| {
680 if playback_id_atomic.load(Ordering::SeqCst) != playback_id {
681 return;
682 }
683 let (delay_ms, late) = {
684 let mut timing = append_timing.lock().unwrap();
685 let now = Instant::now();
686 let delta_ms = now.duration_since(timing.0).as_secs_f64() * 1000.0;
687 let chunk_ms = length_in_seconds * 1000.0;
688 let late = delta_ms > (chunk_ms * 1.2) && chunk_ms > 0.0;
689 if late {
690 timing.2 = timing.2.saturating_add(1);
691 }
692 timing.1 = if timing.1 == 0.0 {
693 delta_ms
694 } else {
695 (timing.1 * 0.9) + (delta_ms * 0.1)
696 };
697 timing.3 = timing.3.max(delta_ms);
698 timing.0 = now;
699 (delta_ms, late)
700 };
701 audio_heard.store(true, Ordering::Relaxed);
702 last_chunk_ms.store(now_ms(), Ordering::Relaxed);
703
704 {
705 let mut meter = output_meter.lock().unwrap();
706 meter.push_samples(&mixer);
707 }
708 {
709 let mut metrics = dsp_metrics_for_sink.lock().unwrap();
710 metrics.append_delay_ms = delay_ms;
711 metrics.avg_append_delay_ms = {
712 if metrics.avg_append_delay_ms == 0.0 {
713 delay_ms
714 } else {
715 (metrics.avg_append_delay_ms * 0.9) + (delay_ms * 0.1)
716 }
717 };
718 metrics.max_append_delay_ms = metrics.max_append_delay_ms.max(delay_ms);
719 metrics.late_append_count = {
720 let timing = append_timing.lock().unwrap();
721 timing.2
722 };
723 metrics.late_append_active = late;
724 }
725
726 let mut audition_source = audition_source_mutex.lock().unwrap();
727 let sink = sink_mutex.lock().unwrap();
728 let append_jitter_log_ms = {
729 let settings = buffer_settings_for_state.lock().unwrap();
730 settings.append_jitter_log_ms
731 };
732 if append_jitter_log_ms > 0.0 && (late || delay_ms > append_jitter_log_ms as f64) {
733 let expected_ms = length_in_seconds * 1000.0;
734 log::info!(
735 "append jitter: delta={:.2}ms expected={:.2}ms late={} threshold={:.2}ms sink_len={}",
736 delay_ms,
737 expected_ms,
738 late,
739 append_jitter_log_ms,
740 sink.len()
741 );
742 }
743 let mut chunk_lengths = chunk_lengths.lock().unwrap();
744
745 let total_time = chunk_lengths.iter().sum::<f64>();
746
747 if audition_source.is_none() {
749 let (mixer_clone, mixer) = clone_samples_buffer(mixer);
750 *audition_source = Some(mixer_clone);
751 drop(audition_source);
752 sink.append(mixer);
753 } else {
754 sink.append(mixer);
755 }
756
757 drop(sink);
758
759 chunk_lengths.push(length_in_seconds);
760 drop(chunk_lengths);
761
762 update_chunk_lengths();
763 check_details();
764 };
765
766 let receiver = engine.start_receiver();
767 loop {
768 match receiver.recv_timeout(Duration::from_millis(20)) {
769 Ok(chunk) => {
770 update_sink(chunk);
771 }
772 Err(RecvTimeoutError::Timeout) => {
773 update_chunk_lengths();
774 if !check_details() {
775 break;
776 }
777 }
778 Err(RecvTimeoutError::Disconnected) => break,
779 }
780 }
781 #[cfg(feature = "debug")]
782 log::info!("engine reception loop finished");
783
784 buffering_done.store(true, Ordering::Relaxed);
787 buffering_done_flag.store(true, Ordering::Relaxed);
788 {
789 let mut final_duration = final_duration.lock().unwrap();
790 if final_duration.is_none() {
791 let chunk_lengths = chunk_lengths.lock().unwrap();
792 let time_chunks_passed = time_chunks_mutex.lock().unwrap();
793 *final_duration = Some(*time_chunks_passed + chunk_lengths.iter().sum::<f64>());
794 }
795 }
796
797 #[cfg(feature = "debug")]
801 {
802 let sink = sink_mutex.lock().unwrap();
803 let paused = sink.is_paused();
804 let empty = sink.empty();
805 let sink_len = sink.len();
806 drop(sink);
807 let time_passed = *time_passed.lock().unwrap();
808 let final_duration = *final_duration.lock().unwrap();
809 log::info!(
810 "Starting drain loop: paused={} empty={} sink_len={} time={:.3} final={:?}",
811 paused,
812 empty,
813 sink_len,
814 time_passed,
815 final_duration
816 );
817 }
818
819 loop {
820 update_chunk_lengths();
821 if !check_details() {
822 break;
823 }
824
825 let done = if engine.finished_buffering() {
826 if let Some(final_duration) = *final_duration.lock().unwrap() {
827 let time_passed = *time_passed.lock().unwrap();
828 time_passed >= (final_duration - 0.001).max(0.0)
829 } else {
830 false
831 }
832 } else {
833 false
834 };
835 if done {
836 break;
837 }
838
839 thread::sleep(Duration::from_millis(10));
840 }
841
842 #[cfg(feature = "debug")]
843 log::info!("Finished drain loop!");
844
845 playback_thread_exists.store(false, Ordering::Relaxed);
849 });
850 }
851
852 pub fn play_at(&mut self, ts: f64) {
854 let mut timestamp = self.ts.lock().unwrap();
855 *timestamp = ts;
856 drop(timestamp);
857
858 self.request_effects_reset();
859 self.kill_current();
860 self.initialize_thread(Some(ts));
862
863 self.resume();
864
865 self.wait_for_audio_heard(Duration::from_secs(5));
866 }
867
868 pub fn play(&mut self) {
870 info!("Playing audio");
871 let thread_exists = self.playback_thread_exists.load(Ordering::SeqCst);
872 if !thread_exists {
875 self.initialize_thread(None);
876 }
877
878 self.resume();
879
880 self.wait_for_audio_heard(Duration::from_secs(5));
881 }
882
883 pub fn pause(&self) {
885 self.state.lock().unwrap().clone_from(&PlayerState::Pausing);
886 }
887
888 pub fn resume(&self) {
890 self.state
891 .lock()
892 .unwrap()
893 .clone_from(&PlayerState::Resuming);
894 }
895
896 pub fn kill_current(&self) {
898 self.state
899 .lock()
900 .unwrap()
901 .clone_from(&PlayerState::Stopping);
902 {
903 let sink = self.sink.lock().unwrap();
904 sink.stop();
905 }
906 self.abort.store(true, Ordering::SeqCst);
907
908 while !self.thread_finished() {
909 thread::sleep(Duration::from_millis(10));
910 }
911
912 self.state.lock().unwrap().clone_from(&PlayerState::Stopped);
913 }
914
915 pub fn stop(&self) {
917 self.kill_current();
918 self.ts.lock().unwrap().clone_from(&0.0);
919 }
920
921 pub fn is_playing(&self) -> bool {
923 let state = self.state.lock().unwrap();
924 *state == PlayerState::Playing
925 }
926
927 pub fn is_paused(&self) -> bool {
929 let state = self.state.lock().unwrap();
930 *state == PlayerState::Paused
931 }
932
933 pub fn get_time(&self) -> f64 {
935 let ts = self.ts.lock().unwrap();
936 *ts
937 }
938
939 fn thread_finished(&self) -> bool {
940 let playback_thread_exists = self.playback_thread_exists.load(Ordering::SeqCst);
941 !playback_thread_exists
942 }
943
944 pub fn is_finished(&self) -> bool {
946 self.thread_finished()
947 }
950
951 pub fn sleep_until_end(&self) {
953 loop {
954 if self.thread_finished() {
955 break;
956 }
957 thread::sleep(Duration::from_millis(100));
958 }
959 }
960
961 pub fn get_duration(&self) -> f64 {
963 let duration = self.duration.lock().unwrap();
964 *duration
965 }
966
967 pub fn seek(&mut self, ts: f64) {
969 let mut timestamp = self.ts.lock().unwrap();
970 *timestamp = ts;
971 drop(timestamp);
972
973 self.request_effects_reset();
974 let state = self.state.lock().unwrap().clone();
975
976 self.kill_current();
977 self.state.lock().unwrap().clone_from(&state);
978 self.initialize_thread(Some(ts));
979
980 match state {
981 PlayerState::Playing => self.resume(),
982 PlayerState::Paused => {
983 self.audition(Duration::from_millis(100));
984 }
985 _ => {}
986 }
987 }
988
989 pub fn refresh_tracks(&mut self) {
991 let mut prot = self.prot.lock().unwrap();
992 prot.refresh_tracks();
993 if let Some(spec) = self.impulse_response_override.clone() {
994 prot.set_impulse_response_spec(spec);
995 }
996 if let Some(tail_db) = self.impulse_response_tail_override {
997 prot.set_impulse_response_tail_db(tail_db);
998 }
999 drop(prot);
1000
1001 self.request_effects_reset();
1002 if self.thread_finished() {
1004 return;
1005 }
1006
1007 let ts = self.get_time();
1010 self.seek(ts);
1011
1012 if self.is_playing() {
1014 self.resume();
1015 }
1016
1017 self.wait_for_audio_heard(Duration::from_secs(5));
1018 }
1019
1020 fn wait_for_audio_heard(&self, timeout: Duration) -> bool {
1021 let start = Instant::now();
1022 loop {
1023 if self.audio_heard.load(Ordering::Relaxed) {
1024 return true;
1025 }
1026 if self.thread_finished() {
1027 warn!("playback thread ended before audio was heard");
1028 return false;
1029 }
1030 if start.elapsed() >= timeout {
1031 warn!("timed out waiting for audio to start");
1032 return false;
1033 }
1034 thread::sleep(Duration::from_millis(10));
1035 }
1036 }
1037
1038 pub fn shuffle(&mut self) {
1040 self.refresh_tracks();
1041 }
1042
1043 pub fn set_volume(&mut self, new_volume: f32) {
1045 let sink = self.sink.lock().unwrap();
1046 sink.set_volume(new_volume);
1047 drop(sink);
1048
1049 let mut volume = self.volume.lock().unwrap();
1050 *volume = new_volume;
1051 drop(volume);
1052 }
1053
1054 pub fn get_volume(&self) -> f32 {
1056 *self.volume.lock().unwrap()
1057 }
1058
1059 pub fn get_ids(&self) -> Vec<String> {
1061 let prot = self.prot.lock().unwrap();
1062
1063 return prot.get_ids();
1064 }
1065
1066 pub fn set_reporting(
1068 &mut self,
1069 reporting: Arc<Mutex<dyn Fn(Report) + Send>>,
1070 reporting_interval: Duration,
1071 ) {
1072 if self.reporter.is_some() {
1073 self.reporter.as_ref().unwrap().lock().unwrap().stop();
1074 }
1075
1076 let reporter = Arc::new(Mutex::new(Reporter::new(
1077 Arc::new(Mutex::new(self.clone())),
1078 reporting,
1079 reporting_interval,
1080 )));
1081
1082 reporter.lock().unwrap().start();
1083
1084 self.reporter = Some(reporter);
1085 }
1086}
1087
/// Converts a linear amplitude to decibels relative to full scale
/// (`20 * log10(value)`).
///
/// Zero and negative inputs map to negative infinity; NaN propagates as NaN.
fn linear_to_dbfs(value: f32) -> f32 {
    if value <= 0.0 {
        return f32::NEG_INFINITY;
    }
    let exponent = value.log10();
    20.0 * exponent
}
1095
/// Returns the current wall-clock time in milliseconds since the Unix epoch,
/// or `0` when the system clock reads earlier than the epoch.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}