1use rodio::buffer::SamplesBuffer;
4use rodio::{OutputStreamBuilder, Sink};
5use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
6use std::sync::{mpsc::RecvTimeoutError, Arc, Mutex};
7use std::thread;
8use std::time::{Duration, Instant};
9
10use log::{info, warn};
11
12use crate::audio::samples::clone_samples_buffer;
13use crate::container::prot::{parse_impulse_response_string, ImpulseResponseSpec, Prot};
14use crate::diagnostics::reporter::{Report, Reporter};
15use crate::tools::timer;
16use crate::{
17 container::info::Info,
18 playback::engine::{PlaybackBufferSettings, PlayerEngine, ReverbMetrics, ReverbSettings},
19};
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq)]
23pub enum PlayerState {
24 Init,
25 Resuming,
26 Playing,
27 Pausing,
28 Paused,
29 Stopping,
30 Stopped,
31 Finished,
32}
33
34#[derive(Clone)]
39pub struct Player {
40 pub info: Info,
41 pub finished_tracks: Arc<Mutex<Vec<i32>>>,
42 pub ts: Arc<Mutex<f64>>,
43 state: Arc<Mutex<PlayerState>>,
44 abort: Arc<AtomicBool>,
45 playback_thread_exists: Arc<AtomicBool>,
46 playback_id: Arc<AtomicU64>,
47 duration: Arc<Mutex<f64>>,
48 prot: Arc<Mutex<Prot>>,
49 audio_heard: Arc<AtomicBool>,
50 volume: Arc<Mutex<f32>>,
51 sink: Arc<Mutex<Sink>>,
52 audition_source: Arc<Mutex<Option<SamplesBuffer>>>,
53 reporter: Option<Arc<Mutex<Reporter>>>,
54 buffer_settings: Arc<Mutex<PlaybackBufferSettings>>,
55 reverb_settings: Arc<Mutex<ReverbSettings>>,
56 reverb_metrics: Arc<Mutex<ReverbMetrics>>,
57 reverb_reset: Arc<AtomicU64>,
58 buffering_done: Arc<AtomicBool>,
59 last_chunk_ms: Arc<AtomicU64>,
60 last_time_update_ms: Arc<AtomicU64>,
61 impulse_response_override: Option<ImpulseResponseSpec>,
62 impulse_response_tail_override: Option<f32>,
63}
64
65impl Player {
66 pub fn new(file_path: &String) -> Self {
68 let this = Self::new_from_path_or_paths(Some(file_path), None);
69 this
70 }
71
72 pub fn new_from_file_paths(file_paths: &Vec<Vec<String>>) -> Self {
74 let this = Self::new_from_path_or_paths(None, Some(file_paths));
75 this
76 }
77
78 pub fn new_from_path_or_paths(path: Option<&String>, paths: Option<&Vec<Vec<String>>>) -> Self {
80 let (prot, info) = match path {
81 Some(path) => {
82 let prot = Arc::new(Mutex::new(Prot::new(path)));
83 let info = Info::new(path.clone());
84 (prot, info)
85 }
86 None => {
87 let prot = Arc::new(Mutex::new(Prot::new_from_file_paths(paths.unwrap())));
88 let locked_prot = prot.lock().unwrap();
89 let info = Info::new_from_file_paths(locked_prot.get_file_paths_dictionary());
90 drop(locked_prot);
91 (prot, info)
92 }
93 };
94
95 let (sink, _queue) = Sink::new();
96 let sink: Arc<Mutex<Sink>> = Arc::new(Mutex::new(sink));
97
98 let mut this = Self {
99 info,
100 finished_tracks: Arc::new(Mutex::new(Vec::new())),
101 state: Arc::new(Mutex::new(PlayerState::Stopped)),
102 abort: Arc::new(AtomicBool::new(false)),
103 ts: Arc::new(Mutex::new(0.0)),
104 playback_thread_exists: Arc::new(AtomicBool::new(true)),
105 playback_id: Arc::new(AtomicU64::new(0)),
106 duration: Arc::new(Mutex::new(0.0)),
107 audio_heard: Arc::new(AtomicBool::new(false)),
108 volume: Arc::new(Mutex::new(0.8)),
109 sink,
110 audition_source: Arc::new(Mutex::new(None)),
111 prot,
112 reporter: None,
113 buffer_settings: Arc::new(Mutex::new(PlaybackBufferSettings::new(20.0))),
114 reverb_settings: Arc::new(Mutex::new(ReverbSettings::new(0.000001))),
115 reverb_metrics: Arc::new(Mutex::new(ReverbMetrics::default())),
116 reverb_reset: Arc::new(AtomicU64::new(0)),
117 buffering_done: Arc::new(AtomicBool::new(false)),
118 last_chunk_ms: Arc::new(AtomicU64::new(0)),
119 last_time_update_ms: Arc::new(AtomicU64::new(0)),
120 impulse_response_override: None,
121 impulse_response_tail_override: None,
122 };
123
124 this.initialize_thread(None);
125
126 this
127 }
128
129 pub fn set_impulse_response_spec(&mut self, spec: ImpulseResponseSpec) {
131 self.impulse_response_override = Some(spec.clone());
132 let mut prot = self.prot.lock().unwrap();
133 prot.set_impulse_response_spec(spec);
134 self.request_reverb_reset();
135 }
136
137 pub fn set_impulse_response_from_string(&mut self, value: &str) {
139 if let Some(spec) = parse_impulse_response_string(value) {
140 self.set_impulse_response_spec(spec);
141 }
142 }
143
144 pub fn set_impulse_response_tail_db(&mut self, tail_db: f32) {
146 self.impulse_response_tail_override = Some(tail_db);
147 let mut prot = self.prot.lock().unwrap();
148 prot.set_impulse_response_tail_db(tail_db);
149 self.request_reverb_reset();
150 }
151
152 pub fn set_reverb_enabled(&self, enabled: bool) {
154 let mut settings = self.reverb_settings.lock().unwrap();
155 settings.enabled = enabled;
156 settings.reset_pending = true;
157 }
158
159 pub fn set_reverb_mix(&self, dry_wet: f32) {
161 let mut settings = self.reverb_settings.lock().unwrap();
162 settings.dry_wet = dry_wet.clamp(0.0, 1.0);
163 settings.reset_pending = true;
164 }
165
166 pub fn get_reverb_settings(&self) -> ReverbSettings {
168 *self.reverb_settings.lock().unwrap()
169 }
170
171 pub fn get_reverb_metrics(&self) -> ReverbMetrics {
173 *self.reverb_metrics.lock().unwrap()
174 }
175
176 pub fn debug_playback_state(&self) -> (bool, PlayerState, bool) {
178 (
179 self.playback_thread_exists.load(Ordering::SeqCst),
180 *self.state.lock().unwrap(),
181 self.audio_heard.load(Ordering::Relaxed),
182 )
183 }
184
185 pub fn debug_buffering_done(&self) -> bool {
187 self.buffering_done.load(Ordering::Relaxed)
188 }
189
190 pub fn debug_timing_ms(&self) -> (u64, u64) {
192 (
193 self.last_chunk_ms.load(Ordering::Relaxed),
194 self.last_time_update_ms.load(Ordering::Relaxed),
195 )
196 }
197
198 pub fn debug_sink_state(&self) -> (bool, bool, usize) {
200 let sink = self.sink.lock().unwrap();
201 let paused = sink.is_paused();
202 let empty = sink.empty();
203 let len = sink.len();
204 (paused, empty, len)
205 }
206
207 fn request_reverb_reset(&self) {
208 self.reverb_reset.fetch_add(1, Ordering::SeqCst);
209 }
210
211 pub fn set_start_buffer_ms(&self, start_buffer_ms: f32) {
213 let mut settings = self.buffer_settings.lock().unwrap();
214 settings.start_buffer_ms = start_buffer_ms.max(0.0);
215 }
216
217 pub fn set_track_eos_ms(&self, track_eos_ms: f32) {
219 let mut settings = self.buffer_settings.lock().unwrap();
220 settings.track_eos_ms = track_eos_ms.max(0.0);
221 }
222
223 fn audition(&self, length: Duration) {
224 let audition_source_mutex = self.audition_source.clone();
225
226 thread::spawn(move || {
228 while audition_source_mutex.lock().unwrap().is_none() {
230 thread::sleep(Duration::from_millis(10));
231 }
232
233 let audition_source_option = audition_source_mutex.lock().unwrap().take();
234 let audition_source = audition_source_option.unwrap();
235
236 let _audition_stream = OutputStreamBuilder::open_default_stream().unwrap();
237 let audition_sink = Sink::connect_new(_audition_stream.mixer());
238 audition_sink.pause();
239 audition_sink.set_volume(0.8);
240 audition_sink.append(audition_source);
241 audition_sink.play();
242 thread::sleep(length);
243 audition_sink.pause();
244 });
245 }
246
247 fn initialize_thread(&mut self, ts: Option<f64>) {
248 let mut finished_tracks = self.finished_tracks.lock().unwrap();
250 finished_tracks.clear();
251 drop(finished_tracks);
252
253 self.abort.store(false, Ordering::SeqCst);
255 self.playback_thread_exists.store(true, Ordering::SeqCst);
256 let playback_id = self.playback_id.fetch_add(1, Ordering::SeqCst) + 1;
257 self.buffering_done.store(false, Ordering::SeqCst);
258 let now_ms_value = now_ms();
259 self.last_chunk_ms.store(now_ms_value, Ordering::Relaxed);
260 self.last_time_update_ms
261 .store(now_ms_value, Ordering::Relaxed);
262
263 let play_state = self.state.clone();
265 let abort = self.abort.clone();
266 let playback_thread_exists = self.playback_thread_exists.clone();
267 let playback_id_atomic = self.playback_id.clone();
268 let time_passed = self.ts.clone();
269
270 let duration = self.duration.clone();
271 let prot = self.prot.clone();
272 let buffer_settings = self.buffer_settings.clone();
273 let reverb_settings = self.reverb_settings.clone();
274 let reverb_metrics = self.reverb_metrics.clone();
275 let reverb_reset = self.reverb_reset.clone();
276
277 let audio_heard = self.audio_heard.clone();
278 let volume = self.volume.clone();
279 let sink_mutex = self.sink.clone();
280 let audition_source_mutex = self.audition_source.clone();
281 let channels = 1.0 * self.info.channels as f64;
282 let buffer_done_thread_flag = self.buffering_done.clone();
283 let last_chunk_ms = self.last_chunk_ms.clone();
284 let last_time_update_ms = self.last_time_update_ms.clone();
285
286 audio_heard.store(false, Ordering::Relaxed);
287
288 let mut audition_source = audition_source_mutex.lock().unwrap();
290 *audition_source = None;
291 drop(audition_source);
292
293 thread::spawn(move || {
295 playback_thread_exists.store(true, Ordering::Relaxed);
299
300 let start_time = match ts {
304 Some(ts) => ts,
305 None => 0.0,
306 };
307 let mut engine = PlayerEngine::new(
308 prot,
309 Some(abort.clone()),
310 start_time,
311 buffer_settings,
312 reverb_settings,
313 reverb_metrics,
314 reverb_reset,
315 );
316 let _stream = OutputStreamBuilder::open_default_stream().unwrap();
317 let mixer = _stream.mixer().clone();
318
319 let mut sink = sink_mutex.lock().unwrap();
320 *sink = Sink::connect_new(&mixer);
321 sink.pause();
322 sink.set_volume(*volume.lock().unwrap());
323 drop(sink);
324
325 let mut duration = duration.lock().unwrap();
329 *duration = engine.get_duration();
330 drop(duration);
331
332 let chunk_lengths = Arc::new(Mutex::new(Vec::new()));
336 let mut time_passed_unlocked = time_passed.lock().unwrap();
337 *time_passed_unlocked = start_time;
338 drop(time_passed_unlocked);
339
340 let pause_sink = |sink: &Sink, fade_length_out_seconds: f32| {
341 let timestamp = *time_passed.lock().unwrap();
342
343 let fade_increments = sink.volume() / (fade_length_out_seconds * 100.0);
344 while sink.volume() > 0.0 && timestamp != start_time {
346 sink.set_volume(sink.volume() - fade_increments);
347 thread::sleep(Duration::from_millis(10));
348 }
349 sink.pause();
350 };
351
352 let resume_sink = |sink: &Sink, fade_length_in_seconds: f32| {
353 let volume = *volume.lock().unwrap();
354 let fade_increments = (volume - sink.volume()) / (fade_length_in_seconds * 100.0);
355 sink.play();
357 while sink.volume() < volume {
358 sink.set_volume(sink.volume() + fade_increments);
359 thread::sleep(Duration::from_millis(5));
360 }
361 };
362
363 let check_details = || {
372 if abort.load(Ordering::SeqCst) {
373 let sink = sink_mutex.lock().unwrap();
374 pause_sink(&sink, 0.1);
375 sink.clear();
376 drop(sink);
377
378 return false;
379 }
380
381 let sink = sink_mutex.lock().unwrap();
382 let state = play_state.lock().unwrap().clone();
383 if state == PlayerState::Pausing {
384 pause_sink(&sink, 0.1);
385 play_state.lock().unwrap().clone_from(&PlayerState::Paused);
386 }
387 if state == PlayerState::Resuming {
388 resume_sink(&sink, 0.1);
389 play_state.lock().unwrap().clone_from(&PlayerState::Playing);
390 }
391 drop(sink);
392
393 true
394 };
395
396 let time_chunks_mutex = Arc::new(Mutex::new(start_time));
400 let timer_mut = Arc::new(Mutex::new(timer::Timer::new()));
401 let buffering_done = Arc::new(AtomicBool::new(false));
402 let buffering_done_flag = buffer_done_thread_flag.clone();
403 let final_duration = Arc::new(Mutex::new(None::<f64>));
404 let mut timer = timer_mut.lock().unwrap();
405 timer.start();
406 drop(timer);
407
408 let update_chunk_lengths = || {
409 if abort.load(Ordering::SeqCst) {
410 return;
411 }
412
413 let mut chunk_lengths = chunk_lengths.lock().unwrap();
414 let mut time_passed_unlocked = time_passed.lock().unwrap();
415 let mut time_chunks_passed = time_chunks_mutex.lock().unwrap();
416 let mut timer = timer_mut.lock().unwrap();
417 last_time_update_ms.store(now_ms(), Ordering::Relaxed);
418 let sink = sink_mutex.lock().unwrap();
419 if !buffering_done.load(Ordering::Relaxed) {
420 let chunks_played = chunk_lengths.len().saturating_sub(sink.len());
423
424 for _ in 0..chunks_played {
425 timer.reset();
426 timer.start();
427 *time_chunks_passed += chunk_lengths.remove(0);
428 }
429 }
430
431 if sink.is_paused() {
432 timer.pause();
433 } else {
434 timer.un_pause();
435 }
436
437 *time_passed_unlocked = *time_chunks_passed + timer.get_time().as_secs_f64();
438
439 drop(sink);
440 drop(chunk_lengths);
441 drop(time_passed_unlocked);
442 drop(time_chunks_passed);
443 drop(timer);
444 };
445
446 let update_sink = |(mixer, length_in_seconds): (SamplesBuffer, f64)| {
450 if playback_id_atomic.load(Ordering::SeqCst) != playback_id {
451 return;
452 }
453 audio_heard.store(true, Ordering::Relaxed);
454 last_chunk_ms.store(now_ms(), Ordering::Relaxed);
455
456 let mut audition_source = audition_source_mutex.lock().unwrap();
457 let sink = sink_mutex.lock().unwrap();
458 let mut chunk_lengths = chunk_lengths.lock().unwrap();
459
460 let total_time = chunk_lengths.iter().sum::<f64>();
461
462 if audition_source.is_none() {
464 let (mixer_clone, mixer) = clone_samples_buffer(mixer);
465 *audition_source = Some(mixer_clone);
466 drop(audition_source);
467 sink.append(mixer);
468 } else {
469 sink.append(mixer);
470 }
471 drop(sink);
472
473 chunk_lengths.push(length_in_seconds);
474 drop(chunk_lengths);
475
476 update_chunk_lengths();
477 check_details();
478 };
479
480 let receiver = engine.start_receiver();
481 loop {
482 match receiver.recv_timeout(Duration::from_millis(20)) {
483 Ok(chunk) => {
484 update_sink(chunk);
485 }
486 Err(RecvTimeoutError::Timeout) => {
487 update_chunk_lengths();
488 if !check_details() {
489 break;
490 }
491 }
492 Err(RecvTimeoutError::Disconnected) => break,
493 }
494 }
495 #[cfg(feature = "debug")]
496 log::info!("engine reception loop finished");
497
498 buffering_done.store(true, Ordering::Relaxed);
501 buffering_done_flag.store(true, Ordering::Relaxed);
502 {
503 let mut final_duration = final_duration.lock().unwrap();
504 if final_duration.is_none() {
505 let chunk_lengths = chunk_lengths.lock().unwrap();
506 let time_chunks_passed = time_chunks_mutex.lock().unwrap();
507 *final_duration = Some(*time_chunks_passed + chunk_lengths.iter().sum::<f64>());
508 }
509 }
510
511 #[cfg(feature = "debug")]
515 {
516 let sink = sink_mutex.lock().unwrap();
517 let paused = sink.is_paused();
518 let empty = sink.empty();
519 let sink_len = sink.len();
520 drop(sink);
521 let time_passed = *time_passed.lock().unwrap();
522 let final_duration = *final_duration.lock().unwrap();
523 log::info!(
524 "Starting drain loop: paused={} empty={} sink_len={} time={:.3} final={:?}",
525 paused,
526 empty,
527 sink_len,
528 time_passed,
529 final_duration
530 );
531 }
532
533 loop {
534 update_chunk_lengths();
535 if !check_details() {
536 break;
537 }
538
539 let done = if engine.finished_buffering() {
540 if let Some(final_duration) = *final_duration.lock().unwrap() {
541 let time_passed = *time_passed.lock().unwrap();
542 time_passed >= (final_duration - 0.001).max(0.0)
543 } else {
544 false
545 }
546 } else {
547 false
548 };
549 if done {
550 break;
551 }
552
553 thread::sleep(Duration::from_millis(10));
554 }
555
556 #[cfg(feature = "debug")]
557 log::info!("Finished drain loop!");
558
559 playback_thread_exists.store(false, Ordering::Relaxed);
563 });
564 }
565
566 pub fn play_at(&mut self, ts: f64) {
568 let mut timestamp = self.ts.lock().unwrap();
569 *timestamp = ts;
570 drop(timestamp);
571
572 self.request_reverb_reset();
573 self.kill_current();
574 self.initialize_thread(Some(ts));
576
577 self.resume();
578
579 self.wait_for_audio_heard(Duration::from_secs(5));
580 }
581
582 pub fn play(&mut self) {
584 info!("Playing audio");
585 let thread_exists = self.playback_thread_exists.load(Ordering::SeqCst);
586 if !thread_exists {
589 self.initialize_thread(None);
590 }
591
592 self.resume();
593
594 self.wait_for_audio_heard(Duration::from_secs(5));
595 }
596
597 pub fn pause(&self) {
599 self.state.lock().unwrap().clone_from(&PlayerState::Pausing);
600 }
601
602 pub fn resume(&self) {
604 self.state
605 .lock()
606 .unwrap()
607 .clone_from(&PlayerState::Resuming);
608 }
609
610 pub fn kill_current(&self) {
612 self.state
613 .lock()
614 .unwrap()
615 .clone_from(&PlayerState::Stopping);
616 {
617 let sink = self.sink.lock().unwrap();
618 sink.stop();
619 }
620 self.abort.store(true, Ordering::SeqCst);
621
622 while !self.thread_finished() {
623 thread::sleep(Duration::from_millis(10));
624 }
625
626 self.state.lock().unwrap().clone_from(&PlayerState::Stopped);
627 }
628
629 pub fn stop(&self) {
631 self.kill_current();
632 self.ts.lock().unwrap().clone_from(&0.0);
633 }
634
635 pub fn is_playing(&self) -> bool {
637 let state = self.state.lock().unwrap();
638 *state == PlayerState::Playing
639 }
640
641 pub fn is_paused(&self) -> bool {
643 let state = self.state.lock().unwrap();
644 *state == PlayerState::Paused
645 }
646
647 pub fn get_time(&self) -> f64 {
649 let ts = self.ts.lock().unwrap();
650 *ts
651 }
652
653 fn thread_finished(&self) -> bool {
654 let playback_thread_exists = self.playback_thread_exists.load(Ordering::SeqCst);
655 !playback_thread_exists
656 }
657
658 pub fn is_finished(&self) -> bool {
660 self.thread_finished()
661 }
664
665 pub fn sleep_until_end(&self) {
667 loop {
668 if self.thread_finished() {
669 break;
670 }
671 thread::sleep(Duration::from_millis(100));
672 }
673 }
674
675 pub fn get_duration(&self) -> f64 {
677 let duration = self.duration.lock().unwrap();
678 *duration
679 }
680
681 pub fn seek(&mut self, ts: f64) {
683 let mut timestamp = self.ts.lock().unwrap();
684 *timestamp = ts;
685 drop(timestamp);
686
687 self.request_reverb_reset();
688 let state = self.state.lock().unwrap().clone();
689
690 self.kill_current();
691 self.state.lock().unwrap().clone_from(&state);
692 self.initialize_thread(Some(ts));
693
694 match state {
695 PlayerState::Playing => self.resume(),
696 PlayerState::Paused => {
697 self.audition(Duration::from_millis(100));
698 }
699 _ => {}
700 }
701 }
702
703 pub fn refresh_tracks(&mut self) {
705 let mut prot = self.prot.lock().unwrap();
706 prot.refresh_tracks();
707 if let Some(spec) = self.impulse_response_override.clone() {
708 prot.set_impulse_response_spec(spec);
709 }
710 if let Some(tail_db) = self.impulse_response_tail_override {
711 prot.set_impulse_response_tail_db(tail_db);
712 }
713 drop(prot);
714
715 self.request_reverb_reset();
716 if self.thread_finished() {
718 return;
719 }
720
721 let ts = self.get_time();
724 self.seek(ts);
725
726 if self.is_playing() {
728 self.resume();
729 }
730
731 self.wait_for_audio_heard(Duration::from_secs(5));
732 }
733
734 fn wait_for_audio_heard(&self, timeout: Duration) -> bool {
735 let start = Instant::now();
736 loop {
737 if self.audio_heard.load(Ordering::Relaxed) {
738 return true;
739 }
740 if self.thread_finished() {
741 warn!("playback thread ended before audio was heard");
742 return false;
743 }
744 if start.elapsed() >= timeout {
745 warn!("timed out waiting for audio to start");
746 return false;
747 }
748 thread::sleep(Duration::from_millis(10));
749 }
750 }
751
752 pub fn shuffle(&mut self) {
754 self.refresh_tracks();
755 }
756
757 pub fn set_volume(&mut self, new_volume: f32) {
759 let sink = self.sink.lock().unwrap();
760 sink.set_volume(new_volume);
761 drop(sink);
762
763 let mut volume = self.volume.lock().unwrap();
764 *volume = new_volume;
765 drop(volume);
766 }
767
768 pub fn get_volume(&self) -> f32 {
770 *self.volume.lock().unwrap()
771 }
772
773 pub fn get_ids(&self) -> Vec<String> {
775 let prot = self.prot.lock().unwrap();
776
777 return prot.get_ids();
778 }
779
780 pub fn set_reporting(
782 &mut self,
783 reporting: Arc<Mutex<dyn Fn(Report) + Send>>,
784 reporting_interval: Duration,
785 ) {
786 if self.reporter.is_some() {
787 self.reporter.as_ref().unwrap().lock().unwrap().stop();
788 }
789
790 let reporter = Arc::new(Mutex::new(Reporter::new(
791 Arc::new(Mutex::new(self.clone())),
792 reporting,
793 reporting_interval,
794 )));
795
796 reporter.lock().unwrap().start();
797
798 self.reporter = Some(reporter);
799 }
800}
801
802fn now_ms() -> u64 {
803 use std::time::SystemTime;
804 SystemTime::now()
805 .duration_since(SystemTime::UNIX_EPOCH)
806 .map(|d| d.as_millis() as u64)
807 .unwrap_or(0)
808}