Skip to main content

proteus_lib/playback/
player.rs

1//! High-level playback controller for the Proteus library.
2
3use rodio::buffer::SamplesBuffer;
4use rodio::{OutputStreamBuilder, Sink};
5use std::cell::Cell;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::{mpsc::RecvTimeoutError, Arc, Mutex};
8use std::thread;
9use std::time::{Duration, Instant};
10
11use log::{error, info, warn};
12
13use crate::container::prot::{PathsTrack, Prot};
14use crate::diagnostics::reporter::{Report, Reporter};
15use crate::dsp::effects::convolution_reverb::{parse_impulse_response_string, ImpulseResponseSpec};
16use crate::playback::output_meter::OutputMeter;
17use crate::tools::timer;
18use crate::{
19    container::info::Info,
20    dsp::effects::AudioEffect,
21    playback::engine::{DspChainMetrics, PlaybackBufferSettings, PlayerEngine},
22};
23
/// Lifecycle state of a [`Player`].
///
/// The playback thread completes two transitions on its own: a `Pausing`
/// request ends in `Paused`, and a `Resuming` request ends in `Playing`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PlayerState {
    Init,
    /// Resume requested; the playback thread switches this to `Playing`
    /// once the sink has been resumed.
    Resuming,
    Playing,
    /// Pause requested; the playback thread switches this to `Paused`
    /// once the sink has been paused.
    Pausing,
    Paused,
    Stopping,
    /// State a freshly constructed player starts in.
    Stopped,
    Finished,
}
36
/// Point-in-time copy of the reverb settings, intended for UI consumers.
///
/// `dry_wet` carries the wet/dry mix of whichever reverb effect the snapshot
/// was taken from.
#[derive(Clone, Copy, Debug)]
pub struct ReverbSettingsSnapshot {
    /// Whether the reverb effect is enabled.
    pub enabled: bool,
    /// Wet/dry mix of the reverb effect.
    pub dry_wet: f32,
}
43
/// Refresh rate handed to `OutputMeter::new` when a player is constructed.
const OUTPUT_METER_REFRESH_HZ: f32 = 30.0;
/// Maximum number of attempts to open the default output stream before the
/// playback thread gives up.
const OUTPUT_STREAM_OPEN_RETRIES: usize = 20;
/// Sleep between failed attempts to open the default output stream (ms).
const OUTPUT_STREAM_OPEN_RETRY_MS: u64 = 100;
47
/// Scope guard that mirrors its own lifetime into a shared flag.
///
/// The flag is set to `true` when the guard is created and back to `false`
/// when the guard is dropped.
struct PlaybackThreadGuard {
    exists: Arc<AtomicBool>,
}

impl PlaybackThreadGuard {
    /// Take ownership of `exists` and mark it `true`.
    fn new(exists: Arc<AtomicBool>) -> Self {
        let guard = Self { exists };
        guard.exists.store(true, Ordering::Relaxed);
        guard
    }
}

impl Drop for PlaybackThreadGuard {
    /// Clear the flag when the guard goes out of scope.
    fn drop(&mut self) {
        let flag: &AtomicBool = &self.exists;
        flag.store(false, Ordering::Relaxed);
    }
}
64
/// Primary playback controller.
///
/// `Player` owns the playback threads, buffering state, and runtime settings
/// such as volume and reverb configuration.
///
/// The type derives `Clone`. Fields wrapped in `Arc` are shared between
/// clones; `info` and the impulse-response overrides are copied per clone.
#[derive(Clone)]
pub struct Player {
    /// Metadata for the loaded audio; `channels` and `sample_rate` are read
    /// from it when the output meter and startup silence are built.
    pub info: Info,
    /// Cleared every time a playback thread is initialized.
    pub finished_tracks: Arc<Mutex<Vec<i32>>>,
    /// Current playback position in seconds, written by the playback thread.
    pub ts: Arc<Mutex<f64>>,
    /// Current [`PlayerState`]; starts as `Stopped`.
    state: Arc<Mutex<PlayerState>>,
    /// Abort flag handed to the engine and polled by the playback thread.
    /// A fresh flag is installed for each playback thread.
    abort: Arc<AtomicBool>,
    /// `true` while a playback thread is alive (maintained by
    /// `PlaybackThreadGuard`).
    playback_thread_exists: Arc<AtomicBool>,
    /// Incremented for each playback thread; chunks arriving for a stale id
    /// are ignored.
    playback_id: Arc<AtomicU64>,
    /// Duration reported by the engine when the playback thread starts.
    duration: Arc<Mutex<f64>>,
    /// The loaded container / file set.
    prot: Arc<Mutex<Prot>>,
    /// Reset to `false` when a playback thread is initialized and set to
    /// `true` once a chunk has been appended to the sink.
    audio_heard: Arc<AtomicBool>,
    /// Target sink volume (initially `0.8`).
    volume: Arc<Mutex<f32>>,
    /// Output sink; replaced by one connected to the output mixer when the
    /// playback thread starts.
    sink: Arc<Mutex<Sink>>,
    /// `None` at construction. NOTE(review): usage is not visible in this
    /// part of the file.
    reporter: Option<Arc<Mutex<Reporter>>>,
    /// Buffering / startup tuning shared with the playback thread.
    buffer_settings: Arc<Mutex<PlaybackBufferSettings>>,
    /// Active DSP effect chain.
    effects: Arc<Mutex<Vec<AudioEffect>>>,
    /// Latest DSP chain metrics, including append-delay statistics.
    dsp_metrics: Arc<Mutex<DspChainMetrics>>,
    /// Counter bumped by `request_effects_reset` and shared with the engine.
    effects_reset: Arc<AtomicU64>,
    /// Output level meter fed with every chunk appended to the sink.
    output_meter: Arc<Mutex<OutputMeter>>,
    /// Set to `true` once the engine reception loop has finished.
    buffering_done: Arc<AtomicBool>,
    /// `now_ms()` timestamp of the most recent chunk append.
    last_chunk_ms: Arc<AtomicU64>,
    /// `now_ms()` timestamp of the most recent playback-time update.
    last_time_update_ms: Arc<AtomicU64>,
    /// Last impulse response spec set through `set_impulse_response_spec`.
    impulse_response_override: Option<ImpulseResponseSpec>,
    /// Last tail trim (dB) set through `set_impulse_response_tail_db`.
    impulse_response_tail_override: Option<f32>,
}
95
96impl Player {
97    /// Create a new player for a single container path.
98    pub fn new(file_path: &String) -> Self {
99        let this = Self::new_from_path_or_paths(Some(file_path), None);
100        this
101    }
102
103    /// Create a new player for a set of standalone file paths.
104    pub fn new_from_file_paths(file_paths: Vec<PathsTrack>) -> Self {
105        let this = Self::new_from_path_or_paths(None, Some(file_paths));
106        this
107    }
108
109    /// Create a new player for a set of standalone file paths.
110    pub fn new_from_file_paths_legacy(file_paths: Vec<Vec<String>>) -> Self {
111        let this = Self::new_from_path_or_paths(
112            None,
113            Some(
114                file_paths
115                    .into_iter()
116                    .map(|paths| PathsTrack::new_from_file_paths(paths))
117                    .collect(),
118            ),
119        );
120        this
121    }
122
123    /// Create a player from either a container path or standalone file paths.
124    pub fn new_from_path_or_paths(path: Option<&String>, paths: Option<Vec<PathsTrack>>) -> Self {
125        let (prot, info) = match path {
126            Some(path) => {
127                let prot = Arc::new(Mutex::new(Prot::new(path)));
128                let info = Info::new(path.clone());
129                (prot, info)
130            }
131            None => {
132                let prot = Arc::new(Mutex::new(Prot::new_from_file_paths(paths.unwrap())));
133                let locked_prot = prot.lock().unwrap();
134                let info = Info::new_from_file_paths(locked_prot.get_file_paths_dictionary());
135                drop(locked_prot);
136                (prot, info)
137            }
138        };
139
140        let (sink, _queue) = Sink::new();
141        let sink: Arc<Mutex<Sink>> = Arc::new(Mutex::new(sink));
142
143        let channels = info.channels as usize;
144        let sample_rate = info.sample_rate;
145        let effects = {
146            let prot_locked = prot.lock().unwrap();
147            match prot_locked.get_effects() {
148                Some(effects) => Arc::new(Mutex::new(effects)),
149                None => Arc::new(Mutex::new(vec![])),
150            }
151        };
152
153        let mut this = Self {
154            info,
155            finished_tracks: Arc::new(Mutex::new(Vec::new())),
156            state: Arc::new(Mutex::new(PlayerState::Stopped)),
157            abort: Arc::new(AtomicBool::new(false)),
158            ts: Arc::new(Mutex::new(0.0)),
159            playback_thread_exists: Arc::new(AtomicBool::new(true)),
160            playback_id: Arc::new(AtomicU64::new(0)),
161            duration: Arc::new(Mutex::new(0.0)),
162            audio_heard: Arc::new(AtomicBool::new(false)),
163            volume: Arc::new(Mutex::new(0.8)),
164            sink,
165            prot,
166            reporter: None,
167            buffer_settings: Arc::new(Mutex::new(PlaybackBufferSettings::new(20.0))),
168            effects,
169            dsp_metrics: Arc::new(Mutex::new(DspChainMetrics::default())),
170            effects_reset: Arc::new(AtomicU64::new(0)),
171            output_meter: Arc::new(Mutex::new(OutputMeter::new(
172                channels,
173                sample_rate,
174                OUTPUT_METER_REFRESH_HZ,
175            ))),
176            buffering_done: Arc::new(AtomicBool::new(false)),
177            last_chunk_ms: Arc::new(AtomicU64::new(0)),
178            last_time_update_ms: Arc::new(AtomicU64::new(0)),
179            impulse_response_override: None,
180            impulse_response_tail_override: None,
181        };
182
183        this.initialize_thread(None);
184
185        this
186    }
187
188    /// Override the impulse response used for convolution reverb.
189    pub fn set_impulse_response_spec(&mut self, spec: ImpulseResponseSpec) {
190        self.impulse_response_override = Some(spec.clone());
191        let mut prot = self.prot.lock().unwrap();
192        prot.set_impulse_response_spec(spec);
193        self.request_effects_reset();
194    }
195
196    /// Parse and apply an impulse response spec string.
197    pub fn set_impulse_response_from_string(&mut self, value: &str) {
198        if let Some(spec) = parse_impulse_response_string(value) {
199            self.set_impulse_response_spec(spec);
200        }
201    }
202
203    /// Override the impulse response tail trim (dB).
204    pub fn set_impulse_response_tail_db(&mut self, tail_db: f32) {
205        self.impulse_response_tail_override = Some(tail_db);
206        let mut prot = self.prot.lock().unwrap();
207        prot.set_impulse_response_tail_db(tail_db);
208        self.request_effects_reset();
209    }
210
211    /// Enable or disable convolution reverb.
212    pub fn set_reverb_enabled(&self, enabled: bool) {
213        let mut effects = self.effects.lock().unwrap();
214        if let Some(effect) = effects
215            .iter_mut()
216            .find_map(|effect| effect.as_convolution_reverb_mut())
217        {
218            effect.enabled = enabled;
219        }
220        if let Some(effect) = effects
221            .iter_mut()
222            .find_map(|effect| effect.as_delay_reverb_mut())
223        {
224            effect.enabled = enabled;
225        }
226    }
227
228    /// Set the reverb wet/dry mix (clamped to `[0.0, 1.0]`).
229    pub fn set_reverb_mix(&self, dry_wet: f32) {
230        let mut effects = self.effects.lock().unwrap();
231        if let Some(effect) = effects
232            .iter_mut()
233            .find_map(|effect| effect.as_convolution_reverb_mut())
234        {
235            effect.dry_wet = dry_wet.clamp(0.0, 1.0);
236        }
237        if let Some(effect) = effects
238            .iter_mut()
239            .find_map(|effect| effect.as_delay_reverb_mut())
240        {
241            effect.mix = dry_wet.clamp(0.0, 1.0);
242        }
243        if let Some(effect) = effects
244            .iter_mut()
245            .find_map(|effect| effect.as_diffusion_reverb_mut())
246        {
247            effect.mix = dry_wet.clamp(0.0, 1.0);
248        }
249    }
250
251    /// Retrieve the current reverb settings snapshot.
252    pub fn get_reverb_settings(&self) -> ReverbSettingsSnapshot {
253        let effects = self.effects.lock().unwrap();
254        if let Some(effect) = effects
255            .iter()
256            .find_map(|effect| effect.as_convolution_reverb())
257        {
258            return ReverbSettingsSnapshot {
259                enabled: effect.enabled,
260                dry_wet: effect.dry_wet,
261            };
262        }
263        if let Some(effect) = effects
264            .iter()
265            .find_map(|effect| effect.as_diffusion_reverb())
266        {
267            return ReverbSettingsSnapshot {
268                enabled: effect.enabled,
269                dry_wet: effect.mix,
270            };
271        }
272        if let Some(effect) = effects.iter().find_map(|effect| effect.as_delay_reverb()) {
273            return ReverbSettingsSnapshot {
274                enabled: effect.enabled,
275                dry_wet: effect.mix,
276            };
277        }
278        ReverbSettingsSnapshot {
279            enabled: false,
280            dry_wet: 0.0,
281        }
282    }
283
284    /// Snapshot the active effect chain names.
285    #[allow(deprecated)]
286    pub fn get_effect_names(&self) -> Vec<String> {
287        let effects = self.effects.lock().unwrap();
288        effects
289            .iter()
290            .map(|effect| match effect {
291                AudioEffect::DelayReverb(_) => "DelayReverb".to_string(),
292                AudioEffect::BasicReverb(_) => "DelayReverb".to_string(),
293                AudioEffect::DiffusionReverb(_) => "DiffusionReverb".to_string(),
294                AudioEffect::ConvolutionReverb(_) => "ConvolutionReverb".to_string(),
295                AudioEffect::LowPassFilter(_) => "LowPassFilter".to_string(),
296                AudioEffect::HighPassFilter(_) => "HighPassFilter".to_string(),
297                AudioEffect::Distortion(_) => "Distortion".to_string(),
298                AudioEffect::Gain(_) => "Gain".to_string(),
299                AudioEffect::Compressor(_) => "Compressor".to_string(),
300                AudioEffect::Limiter(_) => "Limiter".to_string(),
301            })
302            .collect()
303    }
304
305    /// Replace the active DSP effects chain.
306    pub fn set_effects(&mut self, effects: Vec<AudioEffect>) {
307        {
308            let mut guard = self.effects.lock().unwrap();
309            println!("New Effects: {:?}", effects);
310            *guard = effects;
311        }
312        self.request_effects_reset();
313
314        // Seeking to the current time stamp refreshes the
315        // Sink so that the new effects are applied immediately.
316        if !self.thread_finished() {
317            let ts = self.get_time();
318            self.seek(ts);
319        }
320    }
321
322    /// Retrieve the latest DSP chain performance metrics.
323    pub fn get_dsp_metrics(&self) -> DspChainMetrics {
324        *self.dsp_metrics.lock().unwrap()
325    }
326
327    /// Retrieve the most recent per-channel peak levels.
328    pub fn get_levels(&self) -> Vec<f32> {
329        self.output_meter.lock().unwrap().levels()
330    }
331
332    /// Retrieve the most recent per-channel peak levels in dBFS.
333    pub fn get_levels_db(&self) -> Vec<f32> {
334        self.output_meter
335            .lock()
336            .unwrap()
337            .levels()
338            .into_iter()
339            .map(linear_to_dbfs)
340            .collect()
341    }
342
343    /// Retrieve the most recent per-channel average levels.
344    pub fn get_levels_avg(&self) -> Vec<f32> {
345        self.output_meter.lock().unwrap().averages()
346    }
347
348    /// Set the output meter refresh rate (frames per second).
349    pub fn set_output_meter_refresh_hz(&self, hz: f32) {
350        self.output_meter.lock().unwrap().set_refresh_hz(hz);
351    }
352
353    /// Debug helper returning thread alive, state, and audio heard flags.
354    pub fn debug_playback_state(&self) -> (bool, PlayerState, bool) {
355        (
356            self.playback_thread_exists.load(Ordering::SeqCst),
357            *self.state.lock().unwrap(),
358            self.audio_heard.load(Ordering::Relaxed),
359        )
360    }
361
362    /// Debug helper indicating whether buffering has completed.
363    pub fn debug_buffering_done(&self) -> bool {
364        self.buffering_done.load(Ordering::Relaxed)
365    }
366
367    /// Debug helper returning internal timing markers in milliseconds.
368    pub fn debug_timing_ms(&self) -> (u64, u64) {
369        (
370            self.last_chunk_ms.load(Ordering::Relaxed),
371            self.last_time_update_ms.load(Ordering::Relaxed),
372        )
373    }
374
375    /// Debug helper returning sink paused/empty flags and queued length.
376    pub fn debug_sink_state(&self) -> (bool, bool, usize) {
377        let sink = self.sink.lock().unwrap();
378        let paused = sink.is_paused();
379        let empty = sink.empty();
380        let len = sink.len();
381        (paused, empty, len)
382    }
383
384    fn request_effects_reset(&self) {
385        self.effects_reset.fetch_add(1, Ordering::SeqCst);
386    }
387
388    /// Configure the minimum buffered audio (ms) before playback starts.
389    pub fn set_start_buffer_ms(&self, start_buffer_ms: f32) {
390        let mut settings = self.buffer_settings.lock().unwrap();
391        settings.start_buffer_ms = start_buffer_ms.max(0.0);
392    }
393
394    /// Configure heuristic end-of-track threshold for containers (ms).
395    pub fn set_track_eos_ms(&self, track_eos_ms: f32) {
396        let mut settings = self.buffer_settings.lock().unwrap();
397        settings.track_eos_ms = track_eos_ms.max(0.0);
398    }
399
400    /// Configure minimum sink chunks queued before playback starts/resumes.
401    pub fn set_start_sink_chunks(&self, chunks: usize) {
402        let mut settings = self.buffer_settings.lock().unwrap();
403        settings.start_sink_chunks = chunks;
404    }
405
406    /// Configure the startup silence pre-roll (ms).
407    pub fn set_startup_silence_ms(&self, ms: f32) {
408        let mut settings = self.buffer_settings.lock().unwrap();
409        settings.startup_silence_ms = ms.max(0.0);
410    }
411
412    /// Configure the startup fade-in length (ms).
413    pub fn set_startup_fade_ms(&self, ms: f32) {
414        let mut settings = self.buffer_settings.lock().unwrap();
415        settings.startup_fade_ms = ms.max(0.0);
416    }
417
418    /// Configure the append jitter logging threshold (ms). 0 disables logging.
419    pub fn set_append_jitter_log_ms(&self, ms: f32) {
420        let mut settings = self.buffer_settings.lock().unwrap();
421        settings.append_jitter_log_ms = ms.max(0.0);
422    }
423
424    /// Enable or disable per-effect boundary discontinuity logging.
425    pub fn set_effect_boundary_log(&self, enabled: bool) {
426        let mut settings = self.buffer_settings.lock().unwrap();
427        settings.effect_boundary_log = enabled;
428    }
429
430    fn initialize_thread(&mut self, ts: Option<f64>) {
431        // Empty finished_tracks
432        let mut finished_tracks = self.finished_tracks.lock().unwrap();
433        finished_tracks.clear();
434        drop(finished_tracks);
435
436        // ===== Set play options ===== //
437        // Use a fresh abort flag per playback thread so old mixer threads stay stopped.
438        self.abort = Arc::new(AtomicBool::new(false));
439        self.playback_thread_exists.store(true, Ordering::SeqCst);
440        let playback_id = self.playback_id.fetch_add(1, Ordering::SeqCst) + 1;
441        self.buffering_done.store(false, Ordering::SeqCst);
442        let now_ms_value = now_ms();
443        self.last_chunk_ms.store(now_ms_value, Ordering::Relaxed);
444        self.last_time_update_ms
445            .store(now_ms_value, Ordering::Relaxed);
446
447        // ===== Clone variables ===== //
448        let play_state = self.state.clone();
449        let abort = self.abort.clone();
450        let playback_thread_exists = self.playback_thread_exists.clone();
451        let playback_id_atomic = self.playback_id.clone();
452        let time_passed = self.ts.clone();
453
454        let duration = self.duration.clone();
455        let prot = self.prot.clone();
456        let buffer_settings = self.buffer_settings.clone();
457        let buffer_settings_for_state = self.buffer_settings.clone();
458        let effects = self.effects.clone();
459        let dsp_metrics = self.dsp_metrics.clone();
460        let dsp_metrics_for_sink = self.dsp_metrics.clone();
461        let effects_reset = self.effects_reset.clone();
462        let output_meter = self.output_meter.clone();
463        let audio_info = self.info.clone();
464
465        let audio_heard = self.audio_heard.clone();
466        let volume = self.volume.clone();
467        let sink_mutex = self.sink.clone();
468        let buffer_done_thread_flag = self.buffering_done.clone();
469        let last_chunk_ms = self.last_chunk_ms.clone();
470        let last_time_update_ms = self.last_time_update_ms.clone();
471
472        audio_heard.store(false, Ordering::Relaxed);
473
474        {
475            let mut meter = self.output_meter.lock().unwrap();
476            meter.reset();
477        }
478
479        // ===== Start playback ===== //
480        thread::spawn(move || {
481            // ===================== //
482            // Set playback_thread_exists to true
483            // ===================== //
484            let _thread_guard = PlaybackThreadGuard::new(playback_thread_exists.clone());
485
486            // ===================== //
487            // Initialize engine & sink
488            // ===================== //
489            let start_time = match ts {
490                Some(ts) => ts,
491                None => 0.0,
492            };
493            let mut engine = PlayerEngine::new(
494                prot,
495                Some(abort.clone()),
496                start_time,
497                buffer_settings,
498                effects,
499                dsp_metrics,
500                effects_reset,
501            );
502            let mut stream = None;
503            for attempt in 1..=OUTPUT_STREAM_OPEN_RETRIES {
504                match OutputStreamBuilder::open_default_stream() {
505                    Ok(s) => {
506                        stream = Some(s);
507                        break;
508                    }
509                    Err(err) => {
510                        if attempt == OUTPUT_STREAM_OPEN_RETRIES {
511                            error!(
512                                "failed to open default output stream after {} attempts: {}",
513                                OUTPUT_STREAM_OPEN_RETRIES, err
514                            );
515                            return;
516                        }
517                        warn!(
518                            "open_default_stream attempt {}/{} failed: {}",
519                            attempt, OUTPUT_STREAM_OPEN_RETRIES, err
520                        );
521                        thread::sleep(Duration::from_millis(OUTPUT_STREAM_OPEN_RETRY_MS));
522                    }
523                }
524            }
525            let stream = stream.expect("stream should exist after successful retry loop");
526            let mixer = stream.mixer().clone();
527
528            let mut sink = sink_mutex.lock().unwrap();
529            *sink = Sink::connect_new(&mixer);
530            sink.pause();
531            sink.set_volume(*volume.lock().unwrap());
532            drop(sink);
533
534            // ===================== //
535            // Set duration from engine
536            // ===================== //
537            let mut duration = duration.lock().unwrap();
538            *duration = engine.get_duration();
539            drop(duration);
540
541            // ===================== //
542            // Initialize chunk_lengths & time_passed
543            // ===================== //
544            let chunk_lengths = Arc::new(Mutex::new(Vec::new()));
545            let mut time_passed_unlocked = time_passed.lock().unwrap();
546            *time_passed_unlocked = start_time;
547            drop(time_passed_unlocked);
548
549            let pause_sink = |sink: &Sink, fade_length_out_seconds: f32| {
550                let timestamp = *time_passed.lock().unwrap();
551
552                let fade_increments = sink.volume() / (fade_length_out_seconds * 100.0);
553                // Fade out and pause sink
554                while sink.volume() > 0.0 && timestamp != start_time {
555                    sink.set_volume(sink.volume() - fade_increments);
556                    thread::sleep(Duration::from_millis(10));
557                }
558                sink.pause();
559            };
560
561            let resume_sink = |sink: &Sink, fade_length_in_seconds: f32| {
562                let volume = *volume.lock().unwrap();
563                if fade_length_in_seconds <= 0.0 {
564                    sink.play();
565                    sink.set_volume(volume);
566                    return;
567                }
568                let fade_increments = (volume - sink.volume()) / (fade_length_in_seconds * 100.0);
569                // Fade in and play sink
570                sink.play();
571                while sink.volume() < volume {
572                    sink.set_volume(sink.volume() + fade_increments);
573                    thread::sleep(Duration::from_millis(5));
574                }
575            };
576
577            // ===================== //
578            // Start sink with startup silence + fade in
579            // ===================== //
580            {
581                let startup_settings = buffer_settings_for_state.lock().unwrap();
582                let startup_silence_ms = startup_settings.startup_silence_ms;
583                drop(startup_settings);
584
585                let sample_rate = audio_info.sample_rate as u32;
586                let channels = audio_info.channels as u16;
587
588                if startup_silence_ms > 0.0 {
589                    let samples = ((startup_silence_ms / 1000.0) * sample_rate as f32).ceil()
590                        as usize
591                        * channels as usize;
592                    let silence = vec![0.0_f32; samples.max(1)];
593                    let silence_buffer = SamplesBuffer::new(channels, sample_rate, silence);
594                    let sink = sink_mutex.lock().unwrap();
595                    sink.append(silence_buffer);
596                    drop(sink);
597                }
598            }
599
600            // ===================== //
601            // Check if the player should be paused or not
602            // ===================== //
603            let startup_fade_pending = Cell::new(true);
604            let check_details = || {
605                if abort.load(Ordering::SeqCst) {
606                    let sink = sink_mutex.lock().unwrap();
607                    pause_sink(&sink, 0.1);
608                    sink.clear();
609                    drop(sink);
610
611                    return false;
612                }
613
614                let sink = sink_mutex.lock().unwrap();
615                let state = play_state.lock().unwrap().clone();
616                let start_sink_chunks = buffer_settings_for_state.lock().unwrap().start_sink_chunks;
617                if state == PlayerState::Resuming
618                    && start_sink_chunks > 0
619                    && sink.len() < start_sink_chunks
620                {
621                    // Keep paused until enough chunks are queued.
622                    sink.pause();
623                    drop(sink);
624                    return true;
625                }
626                if state == PlayerState::Pausing {
627                    pause_sink(&sink, 0.1);
628                    play_state.lock().unwrap().clone_from(&PlayerState::Paused);
629                }
630                if state == PlayerState::Resuming {
631                    let fade_length = if startup_fade_pending.replace(false) {
632                        let startup_fade_ms =
633                            buffer_settings_for_state.lock().unwrap().startup_fade_ms;
634                        (startup_fade_ms / 1000.0).max(0.0)
635                    } else {
636                        0.1
637                    };
638                    resume_sink(&sink, fade_length);
639                    play_state.lock().unwrap().clone_from(&PlayerState::Playing);
640                }
641                drop(sink);
642
643                true
644            };
645
646            // ===================== //
647            // Update chunk_lengths / time_passed
648            // ===================== //
649            let time_chunks_mutex = Arc::new(Mutex::new(start_time));
650            let timer_mut = Arc::new(Mutex::new(timer::Timer::new()));
651            let buffering_done = Arc::new(AtomicBool::new(false));
652            let buffering_done_flag = buffer_done_thread_flag.clone();
653            let final_duration = Arc::new(Mutex::new(None::<f64>));
654            let mut timer = timer_mut.lock().unwrap();
655            timer.start();
656            drop(timer);
657
658            let last_meter_time = Cell::new(0.0_f64);
659            let update_chunk_lengths = || {
660                if abort.load(Ordering::SeqCst) {
661                    return;
662                }
663
664                let mut chunk_lengths = chunk_lengths.lock().unwrap();
665                let mut time_passed_unlocked = time_passed.lock().unwrap();
666                let mut time_chunks_passed = time_chunks_mutex.lock().unwrap();
667                let mut timer = timer_mut.lock().unwrap();
668                last_time_update_ms.store(now_ms(), Ordering::Relaxed);
669                let sink = sink_mutex.lock().unwrap();
670                if !buffering_done.load(Ordering::Relaxed) {
671                    // Check how many chunks have been played (chunk_lengths.len() - sink.len())
672                    // since the last time this function was called and add that to time_passed.
673                    let chunks_played = chunk_lengths.len().saturating_sub(sink.len());
674
675                    for _ in 0..chunks_played {
676                        timer.reset();
677                        timer.start();
678                        *time_chunks_passed += chunk_lengths.remove(0);
679                    }
680                }
681
682                if sink.is_paused() {
683                    timer.pause();
684                } else {
685                    timer.un_pause();
686                }
687
688                let current_audio_time = *time_chunks_passed + timer.get_time().as_secs_f64();
689                let delta = (current_audio_time - last_meter_time.get()).max(0.0);
690                last_meter_time.set(current_audio_time);
691                {
692                    let mut meter = output_meter.lock().unwrap();
693                    meter.advance(delta);
694                }
695
696                *time_passed_unlocked = current_audio_time;
697
698                drop(sink);
699                drop(chunk_lengths);
700                drop(time_passed_unlocked);
701                drop(time_chunks_passed);
702                drop(timer);
703            };
704
705            // ===================== //
706            // Update sink for each chunk received from engine
707            // ===================== //
708            let append_timing = Arc::new(Mutex::new((Instant::now(), 0.0_f64, 0_u64, 0.0_f64)));
709            let update_sink = |(mixer, length_in_seconds): (SamplesBuffer, f64)| {
710                if playback_id_atomic.load(Ordering::SeqCst) != playback_id {
711                    return;
712                }
713                let (delay_ms, late) = {
714                    let mut timing = append_timing.lock().unwrap();
715                    let now = Instant::now();
716                    let delta_ms = now.duration_since(timing.0).as_secs_f64() * 1000.0;
717                    let chunk_ms = length_in_seconds * 1000.0;
718                    let late = delta_ms > (chunk_ms * 1.2) && chunk_ms > 0.0;
719                    if late {
720                        timing.2 = timing.2.saturating_add(1);
721                    }
722                    timing.1 = if timing.1 == 0.0 {
723                        delta_ms
724                    } else {
725                        (timing.1 * 0.9) + (delta_ms * 0.1)
726                    };
727                    timing.3 = timing.3.max(delta_ms);
728                    timing.0 = now;
729                    (delta_ms, late)
730                };
731                audio_heard.store(true, Ordering::Relaxed);
732                last_chunk_ms.store(now_ms(), Ordering::Relaxed);
733
734                {
735                    let mut meter = output_meter.lock().unwrap();
736                    meter.push_samples(&mixer);
737                }
738                {
739                    let mut metrics = dsp_metrics_for_sink.lock().unwrap();
740                    metrics.append_delay_ms = delay_ms;
741                    metrics.avg_append_delay_ms = {
742                        if metrics.avg_append_delay_ms == 0.0 {
743                            delay_ms
744                        } else {
745                            (metrics.avg_append_delay_ms * 0.9) + (delay_ms * 0.1)
746                        }
747                    };
748                    metrics.max_append_delay_ms = metrics.max_append_delay_ms.max(delay_ms);
749                    metrics.late_append_count = {
750                        let timing = append_timing.lock().unwrap();
751                        timing.2
752                    };
753                    metrics.late_append_active = late;
754                }
755
756                let sink = sink_mutex.lock().unwrap();
757                let append_jitter_log_ms = {
758                    let settings = buffer_settings_for_state.lock().unwrap();
759                    settings.append_jitter_log_ms
760                };
761                if append_jitter_log_ms > 0.0 && (late || delay_ms > append_jitter_log_ms as f64) {
762                    let expected_ms = length_in_seconds * 1000.0;
763                    log::info!(
764                        "append jitter: delta={:.2}ms expected={:.2}ms late={} threshold={:.2}ms sink_len={}",
765                        delay_ms,
766                        expected_ms,
767                        late,
768                        append_jitter_log_ms,
769                        sink.len()
770                    );
771                }
772                let mut chunk_lengths = chunk_lengths.lock().unwrap();
773
774                sink.append(mixer);
775
776                drop(sink);
777
778                chunk_lengths.push(length_in_seconds);
779                drop(chunk_lengths);
780
781                update_chunk_lengths();
782                check_details();
783            };
784
785            let receiver = engine.start_receiver();
786            loop {
787                match receiver.recv_timeout(Duration::from_millis(20)) {
788                    Ok(chunk) => {
789                        update_sink(chunk);
790                    }
791                    Err(RecvTimeoutError::Timeout) => {
792                        update_chunk_lengths();
793                        if !check_details() {
794                            break;
795                        }
796                    }
797                    Err(RecvTimeoutError::Disconnected) => break,
798                }
799            }
800            #[cfg(feature = "debug")]
801            log::info!("engine reception loop finished");
802
803            // From here on, all audio is buffered. Stop relying on sink.len()
804            // to advance time so the UI keeps updating while the last buffer plays.
805            buffering_done.store(true, Ordering::Relaxed);
806            buffering_done_flag.store(true, Ordering::Relaxed);
807            {
808                let mut final_duration = final_duration.lock().unwrap();
809                if final_duration.is_none() {
810                    let chunk_lengths = chunk_lengths.lock().unwrap();
811                    let time_chunks_passed = time_chunks_mutex.lock().unwrap();
812                    *final_duration = Some(*time_chunks_passed + chunk_lengths.iter().sum::<f64>());
813                }
814            }
815
816            // ===================== //
817            // Wait until all tracks are finished playing in sink
818            // ===================== //
819            #[cfg(feature = "debug")]
820            {
821                let sink = sink_mutex.lock().unwrap();
822                let paused = sink.is_paused();
823                let empty = sink.empty();
824                let sink_len = sink.len();
825                drop(sink);
826                let time_passed = *time_passed.lock().unwrap();
827                let final_duration = *final_duration.lock().unwrap();
828                log::info!(
829                    "Starting drain loop: paused={} empty={} sink_len={} time={:.3} final={:?}",
830                    paused,
831                    empty,
832                    sink_len,
833                    time_passed,
834                    final_duration
835                );
836            }
837
838            loop {
839                update_chunk_lengths();
840                if !check_details() {
841                    break;
842                }
843
844                let done = if engine.finished_buffering() {
845                    if let Some(final_duration) = *final_duration.lock().unwrap() {
846                        let time_passed = *time_passed.lock().unwrap();
847                        time_passed >= (final_duration - 0.001).max(0.0)
848                    } else {
849                        false
850                    }
851                } else {
852                    false
853                };
854                if done {
855                    break;
856                }
857
858                thread::sleep(Duration::from_millis(10));
859            }
860
861            #[cfg(feature = "debug")]
862            log::info!("Finished drain loop!");
863
864            // ===================== //
865            // Set playback_thread_exists to false
866            // ===================== //
867            drop(_thread_guard);
868        });
869    }
870
871    /// Start playback from a specific timestamp (seconds).
872    pub fn play_at(&mut self, ts: f64) {
873        let mut timestamp = self.ts.lock().unwrap();
874        *timestamp = ts;
875        drop(timestamp);
876
877        self.request_effects_reset();
878        self.kill_current();
879        // self.stop.store(false, Ordering::SeqCst);
880        self.initialize_thread(Some(ts));
881
882        self.resume();
883
884        self.wait_for_audio_heard(Duration::from_secs(5));
885    }
886
887    /// Start playback from the current timestamp.
888    pub fn play(&mut self) {
889        info!("Playing audio");
890        let thread_exists = self.playback_thread_exists.load(Ordering::SeqCst);
891        // self.stop.store(false, Ordering::SeqCst);
892
893        if !thread_exists {
894            self.initialize_thread(None);
895        }
896
897        self.resume();
898
899        self.wait_for_audio_heard(Duration::from_secs(5));
900    }
901
902    /// Pause playback.
903    pub fn pause(&self) {
904        self.state.lock().unwrap().clone_from(&PlayerState::Pausing);
905    }
906
907    /// Resume playback if paused.
908    pub fn resume(&self) {
909        self.state
910            .lock()
911            .unwrap()
912            .clone_from(&PlayerState::Resuming);
913    }
914
915    /// Stop the current playback thread without changing state.
916    pub fn kill_current(&self) {
917        self.state
918            .lock()
919            .unwrap()
920            .clone_from(&PlayerState::Stopping);
921        {
922            let sink = self.sink.lock().unwrap();
923            sink.stop();
924        }
925        self.abort.store(true, Ordering::SeqCst);
926
927        while !self.thread_finished() {
928            thread::sleep(Duration::from_millis(10));
929        }
930
931        self.state.lock().unwrap().clone_from(&PlayerState::Stopped);
932    }
933
934    /// Stop playback and reset timing state.
935    pub fn stop(&self) {
936        self.kill_current();
937        self.ts.lock().unwrap().clone_from(&0.0);
938    }
939
940    /// Return true if playback is currently active.
941    pub fn is_playing(&self) -> bool {
942        let state = self.state.lock().unwrap();
943        *state == PlayerState::Playing
944    }
945
946    /// Return true if playback is currently paused.
947    pub fn is_paused(&self) -> bool {
948        let state = self.state.lock().unwrap();
949        *state == PlayerState::Paused
950    }
951
952    /// Get the current playback time in seconds.
953    pub fn get_time(&self) -> f64 {
954        let ts = self.ts.lock().unwrap();
955        *ts
956    }
957
958    fn thread_finished(&self) -> bool {
959        let playback_thread_exists = self.playback_thread_exists.load(Ordering::SeqCst);
960        !playback_thread_exists
961    }
962
963    /// Return true if playback has reached the end.
964    pub fn is_finished(&self) -> bool {
965        self.thread_finished()
966        // let state = self.state.lock().unwrap();
967        // *state == PlayerState::Finished
968    }
969
970    /// Block the current thread until playback finishes.
971    pub fn sleep_until_end(&self) {
972        loop {
973            if self.thread_finished() {
974                break;
975            }
976            thread::sleep(Duration::from_millis(100));
977        }
978    }
979
980    /// Get the total duration (seconds) of the active selection.
981    pub fn get_duration(&self) -> f64 {
982        let duration = self.duration.lock().unwrap();
983        *duration
984    }
985
986    /// Seek to the given timestamp (seconds).
987    pub fn seek(&mut self, ts: f64) {
988        let mut timestamp = self.ts.lock().unwrap();
989        *timestamp = ts;
990        drop(timestamp);
991
992        self.request_effects_reset();
993        let state = self.state.lock().unwrap().clone();
994
995        self.kill_current();
996        self.state.lock().unwrap().clone_from(&state);
997        self.initialize_thread(Some(ts));
998
999        if state == PlayerState::Playing {
1000            self.resume();
1001        }
1002    }
1003
1004    /// Refresh active track selections from the underlying container.
1005    pub fn refresh_tracks(&mut self) {
1006        let mut prot = self.prot.lock().unwrap();
1007        prot.refresh_tracks();
1008        if let Some(spec) = self.impulse_response_override.clone() {
1009            prot.set_impulse_response_spec(spec);
1010        }
1011        if let Some(tail_db) = self.impulse_response_tail_override {
1012            prot.set_impulse_response_tail_db(tail_db);
1013        }
1014        drop(prot);
1015
1016        self.request_effects_reset();
1017        // If stopped, return
1018        if self.thread_finished() {
1019            return;
1020        }
1021
1022        // Kill current thread and start
1023        // new thread at the current timestamp
1024        let ts = self.get_time();
1025        self.seek(ts);
1026
1027        // If previously playing, resume
1028        if self.is_playing() {
1029            self.resume();
1030        }
1031
1032        self.wait_for_audio_heard(Duration::from_secs(5));
1033    }
1034
1035    fn wait_for_audio_heard(&self, timeout: Duration) -> bool {
1036        let start = Instant::now();
1037        loop {
1038            if self.audio_heard.load(Ordering::Relaxed) {
1039                return true;
1040            }
1041            if self.thread_finished() {
1042                warn!("playback thread ended before audio was heard");
1043                return false;
1044            }
1045            if start.elapsed() >= timeout {
1046                warn!("timed out waiting for audio to start");
1047                return false;
1048            }
1049            thread::sleep(Duration::from_millis(10));
1050        }
1051    }
1052
1053    /// Shuffle track selections and restart playback.
1054    pub fn shuffle(&mut self) {
1055        self.refresh_tracks();
1056    }
1057
1058    /// Set the playback volume (0.0-1.0).
1059    pub fn set_volume(&mut self, new_volume: f32) {
1060        let sink = self.sink.lock().unwrap();
1061        sink.set_volume(new_volume);
1062        drop(sink);
1063
1064        let mut volume = self.volume.lock().unwrap();
1065        *volume = new_volume;
1066        drop(volume);
1067    }
1068
1069    /// Get the current playback volume.
1070    pub fn get_volume(&self) -> f32 {
1071        *self.volume.lock().unwrap()
1072    }
1073
1074    /// Get the track identifiers used for display.
1075    pub fn get_ids(&self) -> Vec<String> {
1076        let prot = self.prot.lock().unwrap();
1077
1078        return prot.get_ids();
1079    }
1080
1081    /// Enable periodic reporting of playback status for UI consumers.
1082    pub fn set_reporting(
1083        &mut self,
1084        reporting: Arc<Mutex<dyn Fn(Report) + Send>>,
1085        reporting_interval: Duration,
1086    ) {
1087        if self.reporter.is_some() {
1088            self.reporter.as_ref().unwrap().lock().unwrap().stop();
1089        }
1090
1091        let reporter = Arc::new(Mutex::new(Reporter::new(
1092            Arc::new(Mutex::new(self.clone())),
1093            reporting,
1094            reporting_interval,
1095        )));
1096
1097        reporter.lock().unwrap().start();
1098
1099        self.reporter = Some(reporter);
1100    }
1101}
1102
/// Convert a linear amplitude to decibels relative to full scale.
///
/// Zero and negative inputs map to negative infinity; every other input
/// yields `20 * log10(value)`.
fn linear_to_dbfs(value: f32) -> f32 {
    // Guard first: log10 is undefined at or below zero.
    if value <= 0.0 {
        return f32::NEG_INFINITY;
    }

    let decades = value.log10();
    20.0 * decades
}
1110
/// Return the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}