Skip to main content

proteus_lib/playback/
player.rs

1//! High-level playback controller for the Proteus library.
2
3use rodio::buffer::SamplesBuffer;
4use rodio::{OutputStreamBuilder, Sink};
5use std::cell::Cell;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::{mpsc::RecvTimeoutError, Arc, Mutex};
8use std::thread;
9use std::time::{Duration, Instant};
10
11use log::{info, warn};
12
13use crate::audio::samples::clone_samples_buffer;
14use crate::container::prot::Prot;
15use crate::diagnostics::reporter::{Report, Reporter};
16use crate::dsp::effects::convolution_reverb::{parse_impulse_response_string, ImpulseResponseSpec};
17use crate::tools::timer;
18use crate::playback::output_meter::OutputMeter;
19use crate::{
20    container::info::Info,
21    dsp::effects::{AudioEffect, BasicReverbEffect, ConvolutionReverbEffect},
22    playback::engine::{DspChainMetrics, PlaybackBufferSettings, PlayerEngine},
23};
24
25/// High-level playback state for the player.
26#[derive(Debug, Clone, Copy, PartialEq, Eq)]
27pub enum PlayerState {
28    Init,
29    Resuming,
30    Playing,
31    Pausing,
32    Paused,
33    Stopping,
34    Stopped,
35    Finished,
36}
37
38/// Snapshot of convolution reverb settings for UI consumers.
39#[derive(Debug, Clone, Copy)]
40pub struct ReverbSettingsSnapshot {
41    pub enabled: bool,
42    pub dry_wet: f32,
43}
44
45const OUTPUT_METER_REFRESH_HZ: f32 = 30.0;
46
47/// Primary playback controller.
48///
49/// `Player` owns the playback threads, buffering state, and runtime settings
50/// such as volume and reverb configuration.
51#[derive(Clone)]
52pub struct Player {
53    pub info: Info,
54    pub finished_tracks: Arc<Mutex<Vec<i32>>>,
55    pub ts: Arc<Mutex<f64>>,
56    state: Arc<Mutex<PlayerState>>,
57    abort: Arc<AtomicBool>,
58    playback_thread_exists: Arc<AtomicBool>,
59    playback_id: Arc<AtomicU64>,
60    duration: Arc<Mutex<f64>>,
61    prot: Arc<Mutex<Prot>>,
62    audio_heard: Arc<AtomicBool>,
63    volume: Arc<Mutex<f32>>,
64    sink: Arc<Mutex<Sink>>,
65    audition_source: Arc<Mutex<Option<SamplesBuffer>>>,
66    reporter: Option<Arc<Mutex<Reporter>>>,
67    buffer_settings: Arc<Mutex<PlaybackBufferSettings>>,
68    effects: Arc<Mutex<Vec<AudioEffect>>>,
69    dsp_metrics: Arc<Mutex<DspChainMetrics>>,
70    effects_reset: Arc<AtomicU64>,
71    output_meter: Arc<Mutex<OutputMeter>>,
72    buffering_done: Arc<AtomicBool>,
73    last_chunk_ms: Arc<AtomicU64>,
74    last_time_update_ms: Arc<AtomicU64>,
75    impulse_response_override: Option<ImpulseResponseSpec>,
76    impulse_response_tail_override: Option<f32>,
77}
78
79impl Player {
80    /// Create a new player for a single container path.
81    pub fn new(file_path: &String) -> Self {
82        let this = Self::new_from_path_or_paths(Some(file_path), None);
83        this
84    }
85
86    /// Create a new player for a set of standalone file paths.
87    pub fn new_from_file_paths(file_paths: &Vec<Vec<String>>) -> Self {
88        let this = Self::new_from_path_or_paths(None, Some(file_paths));
89        this
90    }
91
92    /// Create a player from either a container path or standalone file paths.
93    pub fn new_from_path_or_paths(path: Option<&String>, paths: Option<&Vec<Vec<String>>>) -> Self {
94        let (prot, info) = match path {
95            Some(path) => {
96                let prot = Arc::new(Mutex::new(Prot::new(path)));
97                let info = Info::new(path.clone());
98                (prot, info)
99            }
100            None => {
101                let prot = Arc::new(Mutex::new(Prot::new_from_file_paths(paths.unwrap())));
102                let locked_prot = prot.lock().unwrap();
103                let info = Info::new_from_file_paths(locked_prot.get_file_paths_dictionary());
104                drop(locked_prot);
105                (prot, info)
106            }
107        };
108
109        let (sink, _queue) = Sink::new();
110        let sink: Arc<Mutex<Sink>> = Arc::new(Mutex::new(sink));
111
112        let channels = info.channels as usize;
113        let sample_rate = info.sample_rate;
114        let mut this = Self {
115            info,
116            finished_tracks: Arc::new(Mutex::new(Vec::new())),
117            state: Arc::new(Mutex::new(PlayerState::Stopped)),
118            abort: Arc::new(AtomicBool::new(false)),
119            ts: Arc::new(Mutex::new(0.0)),
120            playback_thread_exists: Arc::new(AtomicBool::new(true)),
121            playback_id: Arc::new(AtomicU64::new(0)),
122            duration: Arc::new(Mutex::new(0.0)),
123            audio_heard: Arc::new(AtomicBool::new(false)),
124            volume: Arc::new(Mutex::new(0.8)),
125            sink,
126            audition_source: Arc::new(Mutex::new(None)),
127            prot,
128            reporter: None,
129            buffer_settings: Arc::new(Mutex::new(PlaybackBufferSettings::new(20.0))),
130            effects: Arc::new(Mutex::new(vec![
131                AudioEffect::ConvolutionReverb(ConvolutionReverbEffect::new(0.000001)),
132                AudioEffect::BasicReverb(BasicReverbEffect::new(0.000001)),
133            ])),
134            dsp_metrics: Arc::new(Mutex::new(DspChainMetrics::default())),
135            effects_reset: Arc::new(AtomicU64::new(0)),
136            output_meter: Arc::new(Mutex::new(OutputMeter::new(
137                channels,
138                sample_rate,
139                OUTPUT_METER_REFRESH_HZ,
140            ))),
141            buffering_done: Arc::new(AtomicBool::new(false)),
142            last_chunk_ms: Arc::new(AtomicU64::new(0)),
143            last_time_update_ms: Arc::new(AtomicU64::new(0)),
144            impulse_response_override: None,
145            impulse_response_tail_override: None,
146        };
147
148        this.initialize_thread(None);
149
150        this
151    }
152
153    /// Override the impulse response used for convolution reverb.
154    pub fn set_impulse_response_spec(&mut self, spec: ImpulseResponseSpec) {
155        self.impulse_response_override = Some(spec.clone());
156        let mut prot = self.prot.lock().unwrap();
157        prot.set_impulse_response_spec(spec);
158        self.request_effects_reset();
159    }
160
161    /// Parse and apply an impulse response spec string.
162    pub fn set_impulse_response_from_string(&mut self, value: &str) {
163        if let Some(spec) = parse_impulse_response_string(value) {
164            self.set_impulse_response_spec(spec);
165        }
166    }
167
168    /// Override the impulse response tail trim (dB).
169    pub fn set_impulse_response_tail_db(&mut self, tail_db: f32) {
170        self.impulse_response_tail_override = Some(tail_db);
171        let mut prot = self.prot.lock().unwrap();
172        prot.set_impulse_response_tail_db(tail_db);
173        self.request_effects_reset();
174    }
175
176    /// Enable or disable convolution reverb.
177    pub fn set_reverb_enabled(&self, enabled: bool) {
178        let mut effects = self.effects.lock().unwrap();
179        if let Some(effect) = effects
180            .iter_mut()
181            .find_map(|effect| effect.as_convolution_reverb_mut())
182        {
183            effect.enabled = enabled;
184        }
185        if let Some(effect) = effects
186            .iter_mut()
187            .find_map(|effect| effect.as_basic_reverb_mut())
188        {
189            effect.enabled = enabled;
190        }
191    }
192
193    /// Set the reverb wet/dry mix (clamped to `[0.0, 1.0]`).
194    pub fn set_reverb_mix(&self, dry_wet: f32) {
195        let mut effects = self.effects.lock().unwrap();
196        if let Some(effect) = effects
197            .iter_mut()
198            .find_map(|effect| effect.as_convolution_reverb_mut())
199        {
200            effect.dry_wet = dry_wet.clamp(0.0, 1.0);
201        }
202        if let Some(effect) = effects
203            .iter_mut()
204            .find_map(|effect| effect.as_basic_reverb_mut())
205        {
206            effect.mix = dry_wet.clamp(0.0, 1.0);
207        }
208    }
209
210    /// Retrieve the current reverb settings snapshot.
211    pub fn get_reverb_settings(&self) -> ReverbSettingsSnapshot {
212        let effects = self.effects.lock().unwrap();
213        if let Some(effect) = effects
214            .iter()
215            .find_map(|effect| effect.as_convolution_reverb())
216        {
217            return ReverbSettingsSnapshot {
218                enabled: effect.enabled,
219                dry_wet: effect.dry_wet,
220            };
221        }
222        if let Some(effect) = effects.iter().find_map(|effect| effect.as_basic_reverb()) {
223            return ReverbSettingsSnapshot {
224                enabled: effect.enabled,
225                dry_wet: effect.mix,
226            };
227        }
228        ReverbSettingsSnapshot {
229            enabled: false,
230            dry_wet: 0.0,
231        }
232    }
233
234    /// Snapshot the active effect chain names.
235    pub fn get_effect_names(&self) -> Vec<String> {
236        let effects = self.effects.lock().unwrap();
237        effects
238            .iter()
239            .map(|effect| match effect {
240                AudioEffect::BasicReverb(_) => "BasicReverb".to_string(),
241                AudioEffect::ConvolutionReverb(_) => "ConvolutionReverb".to_string(),
242                AudioEffect::LowPassFilter(_) => "LowPassFilter".to_string(),
243                AudioEffect::HighPassFilter(_) => "HighPassFilter".to_string(),
244                AudioEffect::Distortion(_) => "Distortion".to_string(),
245                AudioEffect::Compressor(_) => "Compressor".to_string(),
246                AudioEffect::Limiter(_) => "Limiter".to_string(),
247            })
248            .collect()
249    }
250
251    /// Replace the active DSP effects chain.
252    pub fn set_effects(&self, effects: Vec<AudioEffect>) {
253        let mut guard = self.effects.lock().unwrap();
254        *guard = effects;
255        self.request_effects_reset();
256    }
257
258    /// Retrieve the latest DSP chain performance metrics.
259    pub fn get_dsp_metrics(&self) -> DspChainMetrics {
260        *self.dsp_metrics.lock().unwrap()
261    }
262
263    /// Retrieve the most recent per-channel peak levels.
264    pub fn get_levels(&self) -> Vec<f32> {
265        self.output_meter.lock().unwrap().levels()
266    }
267
268    /// Retrieve the most recent per-channel average levels.
269    pub fn get_levels_avg(&self) -> Vec<f32> {
270        self.output_meter.lock().unwrap().averages()
271    }
272
273    /// Set the output meter refresh rate (frames per second).
274    pub fn set_output_meter_refresh_hz(&self, hz: f32) {
275        self.output_meter.lock().unwrap().set_refresh_hz(hz);
276    }
277
278    /// Debug helper returning thread alive, state, and audio heard flags.
279    pub fn debug_playback_state(&self) -> (bool, PlayerState, bool) {
280        (
281            self.playback_thread_exists.load(Ordering::SeqCst),
282            *self.state.lock().unwrap(),
283            self.audio_heard.load(Ordering::Relaxed),
284        )
285    }
286
287    /// Debug helper indicating whether buffering has completed.
288    pub fn debug_buffering_done(&self) -> bool {
289        self.buffering_done.load(Ordering::Relaxed)
290    }
291
292    /// Debug helper returning internal timing markers in milliseconds.
293    pub fn debug_timing_ms(&self) -> (u64, u64) {
294        (
295            self.last_chunk_ms.load(Ordering::Relaxed),
296            self.last_time_update_ms.load(Ordering::Relaxed),
297        )
298    }
299
300    /// Debug helper returning sink paused/empty flags and queued length.
301    pub fn debug_sink_state(&self) -> (bool, bool, usize) {
302        let sink = self.sink.lock().unwrap();
303        let paused = sink.is_paused();
304        let empty = sink.empty();
305        let len = sink.len();
306        (paused, empty, len)
307    }
308
309    fn request_effects_reset(&self) {
310        self.effects_reset.fetch_add(1, Ordering::SeqCst);
311    }
312
313    /// Configure the minimum buffered audio (ms) before playback starts.
314    pub fn set_start_buffer_ms(&self, start_buffer_ms: f32) {
315        let mut settings = self.buffer_settings.lock().unwrap();
316        settings.start_buffer_ms = start_buffer_ms.max(0.0);
317    }
318
319    /// Configure heuristic end-of-track threshold for containers (ms).
320    pub fn set_track_eos_ms(&self, track_eos_ms: f32) {
321        let mut settings = self.buffer_settings.lock().unwrap();
322        settings.track_eos_ms = track_eos_ms.max(0.0);
323    }
324
325    fn audition(&self, length: Duration) {
326        let audition_source_mutex = self.audition_source.clone();
327
328        // Create new thread to audition
329        thread::spawn(move || {
330            // Wait until audition source is ready
331            while audition_source_mutex.lock().unwrap().is_none() {
332                thread::sleep(Duration::from_millis(10));
333            }
334
335            let audition_source_option = audition_source_mutex.lock().unwrap().take();
336            let audition_source = audition_source_option.unwrap();
337
338            let _audition_stream = OutputStreamBuilder::open_default_stream().unwrap();
339            let audition_sink = Sink::connect_new(_audition_stream.mixer());
340            audition_sink.pause();
341            audition_sink.set_volume(0.8);
342            audition_sink.append(audition_source);
343            audition_sink.play();
344            thread::sleep(length);
345            audition_sink.pause();
346        });
347    }
348
349    fn initialize_thread(&mut self, ts: Option<f64>) {
350        // Empty finished_tracks
351        let mut finished_tracks = self.finished_tracks.lock().unwrap();
352        finished_tracks.clear();
353        drop(finished_tracks);
354
355        // ===== Set play options ===== //
356        self.abort.store(false, Ordering::SeqCst);
357        self.playback_thread_exists.store(true, Ordering::SeqCst);
358        let playback_id = self.playback_id.fetch_add(1, Ordering::SeqCst) + 1;
359        self.buffering_done.store(false, Ordering::SeqCst);
360        let now_ms_value = now_ms();
361        self.last_chunk_ms.store(now_ms_value, Ordering::Relaxed);
362        self.last_time_update_ms
363            .store(now_ms_value, Ordering::Relaxed);
364
365        // ===== Clone variables ===== //
366        let play_state = self.state.clone();
367        let abort = self.abort.clone();
368        let playback_thread_exists = self.playback_thread_exists.clone();
369        let playback_id_atomic = self.playback_id.clone();
370        let time_passed = self.ts.clone();
371
372        let duration = self.duration.clone();
373        let prot = self.prot.clone();
374        let buffer_settings = self.buffer_settings.clone();
375        let effects = self.effects.clone();
376        let dsp_metrics = self.dsp_metrics.clone();
377        let effects_reset = self.effects_reset.clone();
378        let output_meter = self.output_meter.clone();
379
380        let audio_heard = self.audio_heard.clone();
381        let volume = self.volume.clone();
382        let sink_mutex = self.sink.clone();
383        let audition_source_mutex = self.audition_source.clone();
384        let buffer_done_thread_flag = self.buffering_done.clone();
385        let last_chunk_ms = self.last_chunk_ms.clone();
386        let last_time_update_ms = self.last_time_update_ms.clone();
387
388        audio_heard.store(false, Ordering::Relaxed);
389
390        // clear audition source
391        let mut audition_source = audition_source_mutex.lock().unwrap();
392        *audition_source = None;
393        drop(audition_source);
394
395        {
396            let mut meter = self.output_meter.lock().unwrap();
397            meter.reset();
398        }
399
400        // ===== Start playback ===== //
401        thread::spawn(move || {
402            // ===================== //
403            // Set playback_thread_exists to true
404            // ===================== //
405            playback_thread_exists.store(true, Ordering::Relaxed);
406
407            // ===================== //
408            // Initialize engine & sink
409            // ===================== //
410            let start_time = match ts {
411                Some(ts) => ts,
412                None => 0.0,
413            };
414            let mut engine = PlayerEngine::new(
415                prot,
416                Some(abort.clone()),
417                start_time,
418                buffer_settings,
419                effects,
420                dsp_metrics,
421                effects_reset,
422            );
423            let _stream = OutputStreamBuilder::open_default_stream().unwrap();
424            let mixer = _stream.mixer().clone();
425
426            let mut sink = sink_mutex.lock().unwrap();
427            *sink = Sink::connect_new(&mixer);
428            sink.pause();
429            sink.set_volume(*volume.lock().unwrap());
430            drop(sink);
431
432            // ===================== //
433            // Set duration from engine
434            // ===================== //
435            let mut duration = duration.lock().unwrap();
436            *duration = engine.get_duration();
437            drop(duration);
438
439            // ===================== //
440            // Initialize chunk_lengths & time_passed
441            // ===================== //
442            let chunk_lengths = Arc::new(Mutex::new(Vec::new()));
443            let mut time_passed_unlocked = time_passed.lock().unwrap();
444            *time_passed_unlocked = start_time;
445            drop(time_passed_unlocked);
446
447            let pause_sink = |sink: &Sink, fade_length_out_seconds: f32| {
448                let timestamp = *time_passed.lock().unwrap();
449
450                let fade_increments = sink.volume() / (fade_length_out_seconds * 100.0);
451                // Fade out and pause sink
452                while sink.volume() > 0.0 && timestamp != start_time {
453                    sink.set_volume(sink.volume() - fade_increments);
454                    thread::sleep(Duration::from_millis(10));
455                }
456                sink.pause();
457            };
458
459            let resume_sink = |sink: &Sink, fade_length_in_seconds: f32| {
460                let volume = *volume.lock().unwrap();
461                let fade_increments = (volume - sink.volume()) / (fade_length_in_seconds * 100.0);
462                // Fade in and play sink
463                sink.play();
464                while sink.volume() < volume {
465                    sink.set_volume(sink.volume() + fade_increments);
466                    thread::sleep(Duration::from_millis(5));
467                }
468            };
469
470            // ===================== //
471            // Start sink with fade in
472            // ===================== //
473            // resume_sink(&sink_mutex.lock().unwrap(), 0.1);
474
475            // ===================== //
476            // Check if the player should be paused or not
477            // ===================== //
478            let check_details = || {
479                if abort.load(Ordering::SeqCst) {
480                    let sink = sink_mutex.lock().unwrap();
481                    pause_sink(&sink, 0.1);
482                    sink.clear();
483                    drop(sink);
484
485                    return false;
486                }
487
488                let sink = sink_mutex.lock().unwrap();
489                let state = play_state.lock().unwrap().clone();
490                if state == PlayerState::Pausing {
491                    pause_sink(&sink, 0.1);
492                    play_state.lock().unwrap().clone_from(&PlayerState::Paused);
493                }
494                if state == PlayerState::Resuming {
495                    resume_sink(&sink, 0.1);
496                    play_state.lock().unwrap().clone_from(&PlayerState::Playing);
497                }
498                drop(sink);
499
500                true
501            };
502
503            // ===================== //
504            // Update chunk_lengths / time_passed
505            // ===================== //
506            let time_chunks_mutex = Arc::new(Mutex::new(start_time));
507            let timer_mut = Arc::new(Mutex::new(timer::Timer::new()));
508            let buffering_done = Arc::new(AtomicBool::new(false));
509            let buffering_done_flag = buffer_done_thread_flag.clone();
510            let final_duration = Arc::new(Mutex::new(None::<f64>));
511            let mut timer = timer_mut.lock().unwrap();
512            timer.start();
513            drop(timer);
514
515            let last_meter_time = Cell::new(0.0_f64);
516            let update_chunk_lengths = || {
517                if abort.load(Ordering::SeqCst) {
518                    return;
519                }
520
521                let mut chunk_lengths = chunk_lengths.lock().unwrap();
522                let mut time_passed_unlocked = time_passed.lock().unwrap();
523                let mut time_chunks_passed = time_chunks_mutex.lock().unwrap();
524                let mut timer = timer_mut.lock().unwrap();
525                last_time_update_ms.store(now_ms(), Ordering::Relaxed);
526                let sink = sink_mutex.lock().unwrap();
527                if !buffering_done.load(Ordering::Relaxed) {
528                    // Check how many chunks have been played (chunk_lengths.len() - sink.len())
529                    // since the last time this function was called and add that to time_passed.
530                    let chunks_played = chunk_lengths.len().saturating_sub(sink.len());
531
532                    for _ in 0..chunks_played {
533                        timer.reset();
534                        timer.start();
535                        *time_chunks_passed += chunk_lengths.remove(0);
536                    }
537                }
538
539                if sink.is_paused() {
540                    timer.pause();
541                } else {
542                    timer.un_pause();
543                }
544
545                let current_audio_time = *time_chunks_passed + timer.get_time().as_secs_f64();
546                let delta = (current_audio_time - last_meter_time.get()).max(0.0);
547                last_meter_time.set(current_audio_time);
548                {
549                    let mut meter = output_meter.lock().unwrap();
550                    meter.advance(delta);
551                }
552
553                *time_passed_unlocked = current_audio_time;
554
555                drop(sink);
556                drop(chunk_lengths);
557                drop(time_passed_unlocked);
558                drop(time_chunks_passed);
559                drop(timer);
560            };
561
562            // ===================== //
563            // Update sink for each chunk received from engine
564            // ===================== //
565            let update_sink = |(mixer, length_in_seconds): (SamplesBuffer, f64)| {
566                if playback_id_atomic.load(Ordering::SeqCst) != playback_id {
567                    return;
568                }
569                audio_heard.store(true, Ordering::Relaxed);
570                last_chunk_ms.store(now_ms(), Ordering::Relaxed);
571
572                {
573                    let mut meter = output_meter.lock().unwrap();
574                    meter.push_samples(&mixer);
575                }
576
577                let mut audition_source = audition_source_mutex.lock().unwrap();
578                let sink = sink_mutex.lock().unwrap();
579                let mut chunk_lengths = chunk_lengths.lock().unwrap();
580
581                let total_time = chunk_lengths.iter().sum::<f64>();
582
583                // If total_time is less than 0.2 seconds, audition the chunk
584                if audition_source.is_none() {
585                    let (mixer_clone, mixer) = clone_samples_buffer(mixer);
586                    *audition_source = Some(mixer_clone);
587                    drop(audition_source);
588                    sink.append(mixer);
589                } else {
590                    sink.append(mixer);
591                }
592
593                drop(sink);
594
595                chunk_lengths.push(length_in_seconds);
596                drop(chunk_lengths);
597
598                update_chunk_lengths();
599                check_details();
600            };
601
602            let receiver = engine.start_receiver();
603            loop {
604                match receiver.recv_timeout(Duration::from_millis(20)) {
605                    Ok(chunk) => {
606                        update_sink(chunk);
607                    }
608                    Err(RecvTimeoutError::Timeout) => {
609                        update_chunk_lengths();
610                        if !check_details() {
611                            break;
612                        }
613                    }
614                    Err(RecvTimeoutError::Disconnected) => break,
615                }
616            }
617            #[cfg(feature = "debug")]
618            log::info!("engine reception loop finished");
619
620            // From here on, all audio is buffered. Stop relying on sink.len()
621            // to advance time so the UI keeps updating while the last buffer plays.
622            buffering_done.store(true, Ordering::Relaxed);
623            buffering_done_flag.store(true, Ordering::Relaxed);
624            {
625                let mut final_duration = final_duration.lock().unwrap();
626                if final_duration.is_none() {
627                    let chunk_lengths = chunk_lengths.lock().unwrap();
628                    let time_chunks_passed = time_chunks_mutex.lock().unwrap();
629                    *final_duration = Some(*time_chunks_passed + chunk_lengths.iter().sum::<f64>());
630                }
631            }
632
633            // ===================== //
634            // Wait until all tracks are finished playing in sink
635            // ===================== //
636            #[cfg(feature = "debug")]
637            {
638                let sink = sink_mutex.lock().unwrap();
639                let paused = sink.is_paused();
640                let empty = sink.empty();
641                let sink_len = sink.len();
642                drop(sink);
643                let time_passed = *time_passed.lock().unwrap();
644                let final_duration = *final_duration.lock().unwrap();
645                log::info!(
646                    "Starting drain loop: paused={} empty={} sink_len={} time={:.3} final={:?}",
647                    paused,
648                    empty,
649                    sink_len,
650                    time_passed,
651                    final_duration
652                );
653            }
654
655            loop {
656                update_chunk_lengths();
657                if !check_details() {
658                    break;
659                }
660
661                let done = if engine.finished_buffering() {
662                    if let Some(final_duration) = *final_duration.lock().unwrap() {
663                        let time_passed = *time_passed.lock().unwrap();
664                        time_passed >= (final_duration - 0.001).max(0.0)
665                    } else {
666                        false
667                    }
668                } else {
669                    false
670                };
671                if done {
672                    break;
673                }
674
675                thread::sleep(Duration::from_millis(10));
676            }
677
678            #[cfg(feature = "debug")]
679            log::info!("Finished drain loop!");
680
681            // ===================== //
682            // Set playback_thread_exists to false
683            // ===================== //
684            playback_thread_exists.store(false, Ordering::Relaxed);
685        });
686    }
687
688    /// Start playback from a specific timestamp (seconds).
689    pub fn play_at(&mut self, ts: f64) {
690        let mut timestamp = self.ts.lock().unwrap();
691        *timestamp = ts;
692        drop(timestamp);
693
694        self.request_effects_reset();
695        self.kill_current();
696        // self.stop.store(false, Ordering::SeqCst);
697        self.initialize_thread(Some(ts));
698
699        self.resume();
700
701        self.wait_for_audio_heard(Duration::from_secs(5));
702    }
703
704    /// Start playback from the current timestamp.
705    pub fn play(&mut self) {
706        info!("Playing audio");
707        let thread_exists = self.playback_thread_exists.load(Ordering::SeqCst);
708        // self.stop.store(false, Ordering::SeqCst);
709
710        if !thread_exists {
711            self.initialize_thread(None);
712        }
713
714        self.resume();
715
716        self.wait_for_audio_heard(Duration::from_secs(5));
717    }
718
719    /// Pause playback.
720    pub fn pause(&self) {
721        self.state.lock().unwrap().clone_from(&PlayerState::Pausing);
722    }
723
724    /// Resume playback if paused.
725    pub fn resume(&self) {
726        self.state
727            .lock()
728            .unwrap()
729            .clone_from(&PlayerState::Resuming);
730    }
731
732    /// Stop the current playback thread without changing state.
733    pub fn kill_current(&self) {
734        self.state
735            .lock()
736            .unwrap()
737            .clone_from(&PlayerState::Stopping);
738        {
739            let sink = self.sink.lock().unwrap();
740            sink.stop();
741        }
742        self.abort.store(true, Ordering::SeqCst);
743
744        while !self.thread_finished() {
745            thread::sleep(Duration::from_millis(10));
746        }
747
748        self.state.lock().unwrap().clone_from(&PlayerState::Stopped);
749    }
750
751    /// Stop playback and reset timing state.
752    pub fn stop(&self) {
753        self.kill_current();
754        self.ts.lock().unwrap().clone_from(&0.0);
755    }
756
757    /// Return true if playback is currently active.
758    pub fn is_playing(&self) -> bool {
759        let state = self.state.lock().unwrap();
760        *state == PlayerState::Playing
761    }
762
763    /// Return true if playback is currently paused.
764    pub fn is_paused(&self) -> bool {
765        let state = self.state.lock().unwrap();
766        *state == PlayerState::Paused
767    }
768
769    /// Get the current playback time in seconds.
770    pub fn get_time(&self) -> f64 {
771        let ts = self.ts.lock().unwrap();
772        *ts
773    }
774
775    fn thread_finished(&self) -> bool {
776        let playback_thread_exists = self.playback_thread_exists.load(Ordering::SeqCst);
777        !playback_thread_exists
778    }
779
780    /// Return true if playback has reached the end.
781    pub fn is_finished(&self) -> bool {
782        self.thread_finished()
783        // let state = self.state.lock().unwrap();
784        // *state == PlayerState::Finished
785    }
786
787    /// Block the current thread until playback finishes.
788    pub fn sleep_until_end(&self) {
789        loop {
790            if self.thread_finished() {
791                break;
792            }
793            thread::sleep(Duration::from_millis(100));
794        }
795    }
796
797    /// Get the total duration (seconds) of the active selection.
798    pub fn get_duration(&self) -> f64 {
799        let duration = self.duration.lock().unwrap();
800        *duration
801    }
802
803    /// Seek to the given timestamp (seconds).
804    pub fn seek(&mut self, ts: f64) {
805        let mut timestamp = self.ts.lock().unwrap();
806        *timestamp = ts;
807        drop(timestamp);
808
809        self.request_effects_reset();
810        let state = self.state.lock().unwrap().clone();
811
812        self.kill_current();
813        self.state.lock().unwrap().clone_from(&state);
814        self.initialize_thread(Some(ts));
815
816        match state {
817            PlayerState::Playing => self.resume(),
818            PlayerState::Paused => {
819                self.audition(Duration::from_millis(100));
820            }
821            _ => {}
822        }
823    }
824
825    /// Refresh active track selections from the underlying container.
826    pub fn refresh_tracks(&mut self) {
827        let mut prot = self.prot.lock().unwrap();
828        prot.refresh_tracks();
829        if let Some(spec) = self.impulse_response_override.clone() {
830            prot.set_impulse_response_spec(spec);
831        }
832        if let Some(tail_db) = self.impulse_response_tail_override {
833            prot.set_impulse_response_tail_db(tail_db);
834        }
835        drop(prot);
836
837        self.request_effects_reset();
838        // If stopped, return
839        if self.thread_finished() {
840            return;
841        }
842
843        // Kill current thread and start
844        // new thread at the current timestamp
845        let ts = self.get_time();
846        self.seek(ts);
847
848        // If previously playing, resume
849        if self.is_playing() {
850            self.resume();
851        }
852
853        self.wait_for_audio_heard(Duration::from_secs(5));
854    }
855
856    fn wait_for_audio_heard(&self, timeout: Duration) -> bool {
857        let start = Instant::now();
858        loop {
859            if self.audio_heard.load(Ordering::Relaxed) {
860                return true;
861            }
862            if self.thread_finished() {
863                warn!("playback thread ended before audio was heard");
864                return false;
865            }
866            if start.elapsed() >= timeout {
867                warn!("timed out waiting for audio to start");
868                return false;
869            }
870            thread::sleep(Duration::from_millis(10));
871        }
872    }
873
874    /// Shuffle track selections and restart playback.
875    pub fn shuffle(&mut self) {
876        self.refresh_tracks();
877    }
878
879    /// Set the playback volume (0.0-1.0).
880    pub fn set_volume(&mut self, new_volume: f32) {
881        let sink = self.sink.lock().unwrap();
882        sink.set_volume(new_volume);
883        drop(sink);
884
885        let mut volume = self.volume.lock().unwrap();
886        *volume = new_volume;
887        drop(volume);
888    }
889
890    /// Get the current playback volume.
891    pub fn get_volume(&self) -> f32 {
892        *self.volume.lock().unwrap()
893    }
894
895    /// Get the track identifiers used for display.
896    pub fn get_ids(&self) -> Vec<String> {
897        let prot = self.prot.lock().unwrap();
898
899        return prot.get_ids();
900    }
901
902    /// Enable periodic reporting of playback status for UI consumers.
903    pub fn set_reporting(
904        &mut self,
905        reporting: Arc<Mutex<dyn Fn(Report) + Send>>,
906        reporting_interval: Duration,
907    ) {
908        if self.reporter.is_some() {
909            self.reporter.as_ref().unwrap().lock().unwrap().stop();
910        }
911
912        let reporter = Arc::new(Mutex::new(Reporter::new(
913            Arc::new(Mutex::new(self.clone())),
914            reporting,
915            reporting_interval,
916        )));
917
918        reporter.lock().unwrap().start();
919
920        self.reporter = Some(reporter);
921    }
922}
923
924fn now_ms() -> u64 {
925    use std::time::SystemTime;
926    SystemTime::now()
927        .duration_since(SystemTime::UNIX_EPOCH)
928        .map(|d| d.as_millis() as u64)
929        .unwrap_or(0)
930}