Skip to main content

proteus_lib/playback/
player.rs

1//! High-level playback controller for the Proteus library.
2
3use rodio::buffer::SamplesBuffer;
4use rodio::{OutputStreamBuilder, Sink};
5use std::cell::Cell;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::{mpsc::RecvTimeoutError, Arc, Mutex};
8use std::thread;
9use std::time::{Duration, Instant};
10
11use log::{error, info, warn};
12
13use crate::container::prot::{PathsTrack, Prot};
14use crate::diagnostics::reporter::{Report, Reporter};
15use crate::dsp::effects::convolution_reverb::{parse_impulse_response_string, ImpulseResponseSpec};
16use crate::playback::output_meter::OutputMeter;
17use crate::tools::timer;
18use crate::{
19    container::info::Info,
20    dsp::effects::AudioEffect,
21    playback::engine::{DspChainMetrics, PlaybackBufferSettings, PlayerEngine},
22};
23
/// Coarse lifecycle state of a `Player`.
///
/// `Resuming` and `Pausing` are transitional requests: the playback thread
/// observes them and moves the state on to `Playing` / `Paused` once the sink
/// has actually been resumed or paused. A freshly constructed player starts
/// in `Stopped`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PlayerState {
    Init,
    /// A resume was requested; becomes `Playing` once the sink is playing.
    Resuming,
    Playing,
    /// A pause was requested; becomes `Paused` once the sink has faded out.
    Pausing,
    Paused,
    Stopping,
    Stopped,
    Finished,
}
36
/// Point-in-time copy of the reverb settings, intended for UI consumers.
///
/// Filled from the first convolution, diffusion, or delay reverb found in the
/// effect chain (see `Player::get_reverb_settings`).
#[derive(Clone, Copy, Debug)]
pub struct ReverbSettingsSnapshot {
    /// Whether the reverb effect is currently enabled.
    pub enabled: bool,
    /// Dry/wet mix of the reverb effect.
    pub dry_wet: f32,
}
43
/// Refresh rate handed to the output meter when a player is constructed.
const OUTPUT_METER_REFRESH_HZ: f32 = 30.0;
/// Maximum number of attempts to open the default output stream.
const OUTPUT_STREAM_OPEN_RETRIES: usize = 20;
/// Pause between two attempts to open the default output stream (ms).
const OUTPUT_STREAM_OPEN_RETRY_MS: u64 = 100;
47
/// Scope guard that mirrors the lifetime of a playback thread in a shared flag.
///
/// The flag is raised on construction and lowered when the guard is dropped,
/// so it is cleared on every exit path of the scope that owns the guard.
struct PlaybackThreadGuard {
    exists: Arc<AtomicBool>,
}

impl PlaybackThreadGuard {
    /// Take ownership of `exists` and mark it as `true`.
    fn new(exists: Arc<AtomicBool>) -> Self {
        let guard = Self { exists };
        guard.exists.store(true, Ordering::Relaxed);
        guard
    }
}

impl Drop for PlaybackThreadGuard {
    /// Mark the shared flag as `false` again.
    fn drop(&mut self) {
        let flag: &AtomicBool = &self.exists;
        flag.store(false, Ordering::Relaxed);
    }
}
64
65/// Primary playback controller.
66///
67/// `Player` owns the playback threads, buffering state, and runtime settings
68/// such as volume and reverb configuration.
69#[derive(Clone)]
70pub struct Player {
71    pub info: Info,
72    pub finished_tracks: Arc<Mutex<Vec<i32>>>,
73    pub ts: Arc<Mutex<f64>>,
74    state: Arc<Mutex<PlayerState>>,
75    abort: Arc<AtomicBool>,
76    playback_thread_exists: Arc<AtomicBool>,
77    playback_id: Arc<AtomicU64>,
78    duration: Arc<Mutex<f64>>,
79    prot: Arc<Mutex<Prot>>,
80    audio_heard: Arc<AtomicBool>,
81    volume: Arc<Mutex<f32>>,
82    sink: Arc<Mutex<Sink>>,
83    reporter: Option<Arc<Mutex<Reporter>>>,
84    buffer_settings: Arc<Mutex<PlaybackBufferSettings>>,
85    effects: Arc<Mutex<Vec<AudioEffect>>>,
86    dsp_metrics: Arc<Mutex<DspChainMetrics>>,
87    effects_reset: Arc<AtomicU64>,
88    output_meter: Arc<Mutex<OutputMeter>>,
89    buffering_done: Arc<AtomicBool>,
90    last_chunk_ms: Arc<AtomicU64>,
91    last_time_update_ms: Arc<AtomicU64>,
92    impulse_response_override: Option<ImpulseResponseSpec>,
93    impulse_response_tail_override: Option<f32>,
94}
95
96impl Player {
97    /// Create a new player for a single container path.
98    pub fn new(file_path: &String) -> Self {
99        let this = Self::new_from_path_or_paths(Some(file_path), None);
100        this
101    }
102
103    /// Create a new player for a set of standalone file paths.
104    pub fn new_from_file_paths(file_paths: Vec<PathsTrack>) -> Self {
105        let this = Self::new_from_path_or_paths(None, Some(file_paths));
106        this
107    }
108
109    /// Create a new player for a set of standalone file paths.
110    pub fn new_from_file_paths_legacy(file_paths: Vec<Vec<String>>) -> Self {
111        let this = Self::new_from_path_or_paths(
112            None,
113            Some(
114                file_paths
115                    .into_iter()
116                    .map(|paths| PathsTrack::new_from_file_paths(paths))
117                    .collect(),
118            ),
119        );
120        this
121    }
122
123    /// Create a player from either a container path or standalone file paths.
124    pub fn new_from_path_or_paths(path: Option<&String>, paths: Option<Vec<PathsTrack>>) -> Self {
125        let (prot, info) = match path {
126            Some(path) => {
127                let prot = Arc::new(Mutex::new(Prot::new(path)));
128                let info = Info::new(path.clone());
129                (prot, info)
130            }
131            None => {
132                let prot = Arc::new(Mutex::new(Prot::new_from_file_paths(paths.unwrap())));
133                let locked_prot = prot.lock().unwrap();
134                let info = Info::new_from_file_paths(locked_prot.get_file_paths_dictionary());
135                drop(locked_prot);
136                (prot, info)
137            }
138        };
139
140        let (sink, _queue) = Sink::new();
141        let sink: Arc<Mutex<Sink>> = Arc::new(Mutex::new(sink));
142
143        let channels = info.channels as usize;
144        let sample_rate = info.sample_rate;
145        let effects = {
146            let prot_locked = prot.lock().unwrap();
147            match prot_locked.get_effects() {
148                Some(effects) => Arc::new(Mutex::new(effects)),
149                None => Arc::new(Mutex::new(vec![])),
150            }
151        };
152
153        let mut this = Self {
154            info,
155            finished_tracks: Arc::new(Mutex::new(Vec::new())),
156            state: Arc::new(Mutex::new(PlayerState::Stopped)),
157            abort: Arc::new(AtomicBool::new(false)),
158            ts: Arc::new(Mutex::new(0.0)),
159            playback_thread_exists: Arc::new(AtomicBool::new(true)),
160            playback_id: Arc::new(AtomicU64::new(0)),
161            duration: Arc::new(Mutex::new(0.0)),
162            audio_heard: Arc::new(AtomicBool::new(false)),
163            volume: Arc::new(Mutex::new(0.8)),
164            sink,
165            prot,
166            reporter: None,
167            buffer_settings: Arc::new(Mutex::new(PlaybackBufferSettings::new(20.0))),
168            effects,
169            dsp_metrics: Arc::new(Mutex::new(DspChainMetrics::default())),
170            effects_reset: Arc::new(AtomicU64::new(0)),
171            output_meter: Arc::new(Mutex::new(OutputMeter::new(
172                channels,
173                sample_rate,
174                OUTPUT_METER_REFRESH_HZ,
175            ))),
176            buffering_done: Arc::new(AtomicBool::new(false)),
177            last_chunk_ms: Arc::new(AtomicU64::new(0)),
178            last_time_update_ms: Arc::new(AtomicU64::new(0)),
179            impulse_response_override: None,
180            impulse_response_tail_override: None,
181        };
182
183        this.initialize_thread(None);
184
185        this
186    }
187
188    /// Override the impulse response used for convolution reverb.
189    pub fn set_impulse_response_spec(&mut self, spec: ImpulseResponseSpec) {
190        self.impulse_response_override = Some(spec.clone());
191        let mut prot = self.prot.lock().unwrap();
192        prot.set_impulse_response_spec(spec);
193        self.request_effects_reset();
194    }
195
196    /// Parse and apply an impulse response spec string.
197    pub fn set_impulse_response_from_string(&mut self, value: &str) {
198        if let Some(spec) = parse_impulse_response_string(value) {
199            self.set_impulse_response_spec(spec);
200        }
201    }
202
203    /// Override the impulse response tail trim (dB).
204    pub fn set_impulse_response_tail_db(&mut self, tail_db: f32) {
205        self.impulse_response_tail_override = Some(tail_db);
206        let mut prot = self.prot.lock().unwrap();
207        prot.set_impulse_response_tail_db(tail_db);
208        self.request_effects_reset();
209    }
210
211    /// Enable or disable convolution reverb.
212    pub fn set_reverb_enabled(&self, enabled: bool) {
213        let mut effects = self.effects.lock().unwrap();
214        if let Some(effect) = effects
215            .iter_mut()
216            .find_map(|effect| effect.as_convolution_reverb_mut())
217        {
218            effect.enabled = enabled;
219        }
220        if let Some(effect) = effects
221            .iter_mut()
222            .find_map(|effect| effect.as_delay_reverb_mut())
223        {
224            effect.enabled = enabled;
225        }
226    }
227
228    /// Set the reverb wet/dry mix (clamped to `[0.0, 1.0]`).
229    pub fn set_reverb_mix(&self, dry_wet: f32) {
230        let mut effects = self.effects.lock().unwrap();
231        if let Some(effect) = effects
232            .iter_mut()
233            .find_map(|effect| effect.as_convolution_reverb_mut())
234        {
235            effect.dry_wet = dry_wet.clamp(0.0, 1.0);
236        }
237        if let Some(effect) = effects
238            .iter_mut()
239            .find_map(|effect| effect.as_delay_reverb_mut())
240        {
241            effect.mix = dry_wet.clamp(0.0, 1.0);
242        }
243        if let Some(effect) = effects
244            .iter_mut()
245            .find_map(|effect| effect.as_diffusion_reverb_mut())
246        {
247            effect.mix = dry_wet.clamp(0.0, 1.0);
248        }
249    }
250
251    /// Retrieve the current reverb settings snapshot.
252    pub fn get_reverb_settings(&self) -> ReverbSettingsSnapshot {
253        let effects = self.effects.lock().unwrap();
254        if let Some(effect) = effects
255            .iter()
256            .find_map(|effect| effect.as_convolution_reverb())
257        {
258            return ReverbSettingsSnapshot {
259                enabled: effect.enabled,
260                dry_wet: effect.dry_wet,
261            };
262        }
263        if let Some(effect) = effects
264            .iter()
265            .find_map(|effect| effect.as_diffusion_reverb())
266        {
267            return ReverbSettingsSnapshot {
268                enabled: effect.enabled,
269                dry_wet: effect.mix,
270            };
271        }
272        if let Some(effect) = effects.iter().find_map(|effect| effect.as_delay_reverb()) {
273            return ReverbSettingsSnapshot {
274                enabled: effect.enabled,
275                dry_wet: effect.mix,
276            };
277        }
278        ReverbSettingsSnapshot {
279            enabled: false,
280            dry_wet: 0.0,
281        }
282    }
283
284    /// Snapshot the active effect chain names.
285    #[allow(deprecated)]
286    pub fn get_effect_names(&self) -> Vec<String> {
287        let effects = self.effects.lock().unwrap();
288        effects
289            .iter()
290            .map(|effect| match effect {
291                AudioEffect::DelayReverb(_) => "DelayReverb".to_string(),
292                AudioEffect::BasicReverb(_) => "DelayReverb".to_string(),
293                AudioEffect::DiffusionReverb(_) => "DiffusionReverb".to_string(),
294                AudioEffect::ConvolutionReverb(_) => "ConvolutionReverb".to_string(),
295                AudioEffect::LowPassFilter(_) => "LowPassFilter".to_string(),
296                AudioEffect::HighPassFilter(_) => "HighPassFilter".to_string(),
297                AudioEffect::Distortion(_) => "Distortion".to_string(),
298                AudioEffect::Gain(_) => "Gain".to_string(),
299                AudioEffect::Compressor(_) => "Compressor".to_string(),
300                AudioEffect::Limiter(_) => "Limiter".to_string(),
301            })
302            .collect()
303    }
304
305    /// Replace the active DSP effects chain.
306    pub fn set_effects(&mut self, effects: Vec<AudioEffect>) {
307        {
308            let mut guard = self.effects.lock().unwrap();
309            println!("New Effects: {:?}", effects);
310            *guard = effects;
311        }
312        self.request_effects_reset();
313
314        // Seeking to the current time stamp refreshes the
315        // Sink so that the new effects are applied immediately.
316        if !self.thread_finished() {
317            let ts = self.get_time();
318            self.seek(ts);
319        }
320    }
321
322    /// Retrieve the latest DSP chain performance metrics.
323    pub fn get_dsp_metrics(&self) -> DspChainMetrics {
324        *self.dsp_metrics.lock().unwrap()
325    }
326
327    /// Retrieve the most recent per-channel peak levels.
328    pub fn get_levels(&self) -> Vec<f32> {
329        self.output_meter.lock().unwrap().levels()
330    }
331
332    /// Retrieve the most recent per-channel peak levels in dBFS.
333    pub fn get_levels_db(&self) -> Vec<f32> {
334        self.output_meter
335            .lock()
336            .unwrap()
337            .levels()
338            .into_iter()
339            .map(linear_to_dbfs)
340            .collect()
341    }
342
343    /// Retrieve the most recent per-channel average levels.
344    pub fn get_levels_avg(&self) -> Vec<f32> {
345        self.output_meter.lock().unwrap().averages()
346    }
347
348    /// Set the output meter refresh rate (frames per second).
349    pub fn set_output_meter_refresh_hz(&self, hz: f32) {
350        self.output_meter.lock().unwrap().set_refresh_hz(hz);
351    }
352
353    /// Debug helper returning thread alive, state, and audio heard flags.
354    pub fn debug_playback_state(&self) -> (bool, PlayerState, bool) {
355        (
356            self.playback_thread_exists.load(Ordering::SeqCst),
357            *self.state.lock().unwrap(),
358            self.audio_heard.load(Ordering::Relaxed),
359        )
360    }
361
362    /// Debug helper indicating whether buffering has completed.
363    pub fn debug_buffering_done(&self) -> bool {
364        self.buffering_done.load(Ordering::Relaxed)
365    }
366
367    /// Debug helper returning internal timing markers in milliseconds.
368    pub fn debug_timing_ms(&self) -> (u64, u64) {
369        (
370            self.last_chunk_ms.load(Ordering::Relaxed),
371            self.last_time_update_ms.load(Ordering::Relaxed),
372        )
373    }
374
375    /// Debug helper returning sink paused/empty flags and queued length.
376    pub fn debug_sink_state(&self) -> (bool, bool, usize) {
377        let sink = self.sink.lock().unwrap();
378        let paused = sink.is_paused();
379        let empty = sink.empty();
380        let len = sink.len();
381        (paused, empty, len)
382    }
383
384    fn request_effects_reset(&self) {
385        self.effects_reset.fetch_add(1, Ordering::SeqCst);
386    }
387
388    /// Configure the minimum buffered audio (ms) before playback starts.
389    pub fn set_start_buffer_ms(&self, start_buffer_ms: f32) {
390        let mut settings = self.buffer_settings.lock().unwrap();
391        settings.start_buffer_ms = start_buffer_ms.max(0.0);
392    }
393
394    /// Configure heuristic end-of-track threshold for containers (ms).
395    pub fn set_track_eos_ms(&self, track_eos_ms: f32) {
396        let mut settings = self.buffer_settings.lock().unwrap();
397        settings.track_eos_ms = track_eos_ms.max(0.0);
398    }
399
400    /// Configure minimum sink chunks queued before playback starts/resumes.
401    pub fn set_start_sink_chunks(&self, chunks: usize) {
402        let mut settings = self.buffer_settings.lock().unwrap();
403        settings.start_sink_chunks = chunks;
404    }
405
406    /// Configure the maximum sink chunks queued before producer backpressure.
407    ///
408    /// Set to `0` to disable this guard.
409    pub fn set_max_sink_chunks(&self, chunks: usize) {
410        let mut settings = self.buffer_settings.lock().unwrap();
411        settings.max_sink_chunks = chunks;
412    }
413
414    /// Configure the startup silence pre-roll (ms).
415    pub fn set_startup_silence_ms(&self, ms: f32) {
416        let mut settings = self.buffer_settings.lock().unwrap();
417        settings.startup_silence_ms = ms.max(0.0);
418    }
419
420    /// Configure the startup fade-in length (ms).
421    pub fn set_startup_fade_ms(&self, ms: f32) {
422        let mut settings = self.buffer_settings.lock().unwrap();
423        settings.startup_fade_ms = ms.max(0.0);
424    }
425
426    /// Configure the append jitter logging threshold (ms). 0 disables logging.
427    pub fn set_append_jitter_log_ms(&self, ms: f32) {
428        let mut settings = self.buffer_settings.lock().unwrap();
429        settings.append_jitter_log_ms = ms.max(0.0);
430    }
431
432    /// Enable or disable per-effect boundary discontinuity logging.
433    pub fn set_effect_boundary_log(&self, enabled: bool) {
434        let mut settings = self.buffer_settings.lock().unwrap();
435        settings.effect_boundary_log = enabled;
436    }
437
438    fn initialize_thread(&mut self, ts: Option<f64>) {
439        // Empty finished_tracks
440        let mut finished_tracks = self.finished_tracks.lock().unwrap();
441        finished_tracks.clear();
442        drop(finished_tracks);
443
444        // ===== Set play options ===== //
445        // Use a fresh abort flag per playback thread so old mixer threads stay stopped.
446        self.abort = Arc::new(AtomicBool::new(false));
447        self.playback_thread_exists.store(true, Ordering::SeqCst);
448        let playback_id = self.playback_id.fetch_add(1, Ordering::SeqCst) + 1;
449        self.buffering_done.store(false, Ordering::SeqCst);
450        let now_ms_value = now_ms();
451        self.last_chunk_ms.store(now_ms_value, Ordering::Relaxed);
452        self.last_time_update_ms
453            .store(now_ms_value, Ordering::Relaxed);
454
455        // ===== Clone variables ===== //
456        let play_state = self.state.clone();
457        let abort = self.abort.clone();
458        let playback_thread_exists = self.playback_thread_exists.clone();
459        let playback_id_atomic = self.playback_id.clone();
460        let time_passed = self.ts.clone();
461
462        let duration = self.duration.clone();
463        let prot = self.prot.clone();
464        let buffer_settings = self.buffer_settings.clone();
465        let buffer_settings_for_state = self.buffer_settings.clone();
466        let effects = self.effects.clone();
467        let dsp_metrics = self.dsp_metrics.clone();
468        let dsp_metrics_for_sink = self.dsp_metrics.clone();
469        let effects_reset = self.effects_reset.clone();
470        let output_meter = self.output_meter.clone();
471        let audio_info = self.info.clone();
472
473        let audio_heard = self.audio_heard.clone();
474        let volume = self.volume.clone();
475        let sink_mutex = self.sink.clone();
476        let buffer_done_thread_flag = self.buffering_done.clone();
477        let last_chunk_ms = self.last_chunk_ms.clone();
478        let last_time_update_ms = self.last_time_update_ms.clone();
479
480        audio_heard.store(false, Ordering::Relaxed);
481
482        {
483            let mut meter = self.output_meter.lock().unwrap();
484            meter.reset();
485        }
486
487        // ===== Start playback ===== //
488        thread::spawn(move || {
489            // ===================== //
490            // Set playback_thread_exists to true
491            // ===================== //
492            let _thread_guard = PlaybackThreadGuard::new(playback_thread_exists.clone());
493
494            // ===================== //
495            // Initialize engine & sink
496            // ===================== //
497            let start_time = match ts {
498                Some(ts) => ts,
499                None => 0.0,
500            };
501            let mut engine = PlayerEngine::new(
502                prot,
503                Some(abort.clone()),
504                start_time,
505                buffer_settings,
506                effects,
507                dsp_metrics,
508                effects_reset,
509            );
510            let mut stream = None;
511            for attempt in 1..=OUTPUT_STREAM_OPEN_RETRIES {
512                match OutputStreamBuilder::open_default_stream() {
513                    Ok(s) => {
514                        stream = Some(s);
515                        break;
516                    }
517                    Err(err) => {
518                        if attempt == OUTPUT_STREAM_OPEN_RETRIES {
519                            error!(
520                                "failed to open default output stream after {} attempts: {}",
521                                OUTPUT_STREAM_OPEN_RETRIES, err
522                            );
523                            return;
524                        }
525                        warn!(
526                            "open_default_stream attempt {}/{} failed: {}",
527                            attempt, OUTPUT_STREAM_OPEN_RETRIES, err
528                        );
529                        thread::sleep(Duration::from_millis(OUTPUT_STREAM_OPEN_RETRY_MS));
530                    }
531                }
532            }
533            let stream = stream.expect("stream should exist after successful retry loop");
534            let mixer = stream.mixer().clone();
535
536            let mut sink = sink_mutex.lock().unwrap();
537            *sink = Sink::connect_new(&mixer);
538            sink.pause();
539            sink.set_volume(*volume.lock().unwrap());
540            drop(sink);
541
542            // ===================== //
543            // Set duration from engine
544            // ===================== //
545            let mut duration = duration.lock().unwrap();
546            *duration = engine.get_duration();
547            drop(duration);
548
549            // ===================== //
550            // Initialize chunk_lengths & time_passed
551            // ===================== //
552            let chunk_lengths = Arc::new(Mutex::new(Vec::new()));
553            let mut time_passed_unlocked = time_passed.lock().unwrap();
554            *time_passed_unlocked = start_time;
555            drop(time_passed_unlocked);
556
557            let pause_sink = |sink: &Sink, fade_length_out_seconds: f32| {
558                let timestamp = *time_passed.lock().unwrap();
559
560                let fade_increments = sink.volume() / (fade_length_out_seconds * 100.0);
561                // Fade out and pause sink
562                while sink.volume() > 0.0 && timestamp != start_time {
563                    sink.set_volume(sink.volume() - fade_increments);
564                    thread::sleep(Duration::from_millis(10));
565                }
566                sink.pause();
567            };
568
569            let resume_sink = |sink: &Sink, fade_length_in_seconds: f32| {
570                let volume = *volume.lock().unwrap();
571                if fade_length_in_seconds <= 0.0 {
572                    sink.play();
573                    sink.set_volume(volume);
574                    return;
575                }
576                let fade_increments = (volume - sink.volume()) / (fade_length_in_seconds * 100.0);
577                // Fade in and play sink
578                sink.play();
579                while sink.volume() < volume {
580                    sink.set_volume(sink.volume() + fade_increments);
581                    thread::sleep(Duration::from_millis(5));
582                }
583            };
584
585            // ===================== //
586            // Start sink with startup silence + fade in
587            // ===================== //
588            {
589                let startup_settings = buffer_settings_for_state.lock().unwrap();
590                let startup_silence_ms = startup_settings.startup_silence_ms;
591                drop(startup_settings);
592
593                let sample_rate = audio_info.sample_rate as u32;
594                let channels = audio_info.channels as u16;
595
596                if startup_silence_ms > 0.0 {
597                    let samples = ((startup_silence_ms / 1000.0) * sample_rate as f32).ceil()
598                        as usize
599                        * channels as usize;
600                    let silence = vec![0.0_f32; samples.max(1)];
601                    let silence_buffer = SamplesBuffer::new(channels, sample_rate, silence);
602                    let sink = sink_mutex.lock().unwrap();
603                    sink.append(silence_buffer);
604                    drop(sink);
605                }
606            }
607
608            // ===================== //
609            // Check if the player should be paused or not
610            // ===================== //
611            let startup_fade_pending = Cell::new(true);
612            let check_details = || {
613                if abort.load(Ordering::SeqCst) {
614                    let sink = sink_mutex.lock().unwrap();
615                    pause_sink(&sink, 0.1);
616                    sink.clear();
617                    drop(sink);
618
619                    return false;
620                }
621
622                let sink = sink_mutex.lock().unwrap();
623                let state = play_state.lock().unwrap().clone();
624                let start_sink_chunks = buffer_settings_for_state.lock().unwrap().start_sink_chunks;
625                if state == PlayerState::Resuming
626                    && start_sink_chunks > 0
627                    && sink.len() < start_sink_chunks
628                {
629                    // Keep paused until enough chunks are queued.
630                    sink.pause();
631                    drop(sink);
632                    return true;
633                }
634                if state == PlayerState::Pausing {
635                    pause_sink(&sink, 0.1);
636                    play_state.lock().unwrap().clone_from(&PlayerState::Paused);
637                }
638                if state == PlayerState::Resuming {
639                    let fade_length = if startup_fade_pending.replace(false) {
640                        let startup_fade_ms =
641                            buffer_settings_for_state.lock().unwrap().startup_fade_ms;
642                        (startup_fade_ms / 1000.0).max(0.0)
643                    } else {
644                        0.1
645                    };
646                    resume_sink(&sink, fade_length);
647                    play_state.lock().unwrap().clone_from(&PlayerState::Playing);
648                }
649                drop(sink);
650
651                true
652            };
653
654            // ===================== //
655            // Update chunk_lengths / time_passed
656            // ===================== //
657            let time_chunks_mutex = Arc::new(Mutex::new(start_time));
658            let timer_mut = Arc::new(Mutex::new(timer::Timer::new()));
659            let buffering_done = Arc::new(AtomicBool::new(false));
660            let buffering_done_flag = buffer_done_thread_flag.clone();
661            let final_duration = Arc::new(Mutex::new(None::<f64>));
662            let mut timer = timer_mut.lock().unwrap();
663            timer.start();
664            drop(timer);
665
666            let last_meter_time = Cell::new(0.0_f64);
667            let update_chunk_lengths = || {
668                if abort.load(Ordering::SeqCst) {
669                    return;
670                }
671
672                let mut chunk_lengths = chunk_lengths.lock().unwrap();
673                let mut time_passed_unlocked = time_passed.lock().unwrap();
674                let mut time_chunks_passed = time_chunks_mutex.lock().unwrap();
675                let mut timer = timer_mut.lock().unwrap();
676                last_time_update_ms.store(now_ms(), Ordering::Relaxed);
677                let sink = sink_mutex.lock().unwrap();
678                if !buffering_done.load(Ordering::Relaxed) {
679                    // Check how many chunks have been played (chunk_lengths.len() - sink.len())
680                    // since the last time this function was called and add that to time_passed.
681                    let chunks_played = chunk_lengths.len().saturating_sub(sink.len());
682
683                    for _ in 0..chunks_played {
684                        timer.reset();
685                        timer.start();
686                        *time_chunks_passed += chunk_lengths.remove(0);
687                    }
688                }
689
690                if sink.is_paused() {
691                    timer.pause();
692                } else {
693                    timer.un_pause();
694                }
695
696                let current_audio_time = *time_chunks_passed + timer.get_time().as_secs_f64();
697                let delta = (current_audio_time - last_meter_time.get()).max(0.0);
698                last_meter_time.set(current_audio_time);
699                {
700                    let mut meter = output_meter.lock().unwrap();
701                    meter.advance(delta);
702                }
703
704                *time_passed_unlocked = current_audio_time;
705
706                drop(sink);
707                drop(chunk_lengths);
708                drop(time_passed_unlocked);
709                drop(time_chunks_passed);
710                drop(timer);
711            };
712
713            // ===================== //
714            // Update sink for each chunk received from engine
715            // ===================== //
716            let append_timing = Arc::new(Mutex::new((Instant::now(), 0.0_f64, 0_u64, 0.0_f64)));
717            let update_sink = |(mixer, length_in_seconds): (SamplesBuffer, f64)| {
718                if playback_id_atomic.load(Ordering::SeqCst) != playback_id {
719                    return;
720                }
721                let max_sink_chunks = {
722                    let settings = buffer_settings_for_state.lock().unwrap();
723                    settings.max_sink_chunks
724                };
725                if max_sink_chunks > 0 {
726                    loop {
727                        if abort.load(Ordering::SeqCst) {
728                            return;
729                        }
730                        if playback_id_atomic.load(Ordering::SeqCst) != playback_id {
731                            return;
732                        }
733                        let sink_len = {
734                            let sink = sink_mutex.lock().unwrap();
735                            sink.len()
736                        };
737                        if sink_len < max_sink_chunks {
738                            break;
739                        }
740                        update_chunk_lengths();
741                        if !check_details() {
742                            return;
743                        }
744                        thread::sleep(Duration::from_millis(5));
745                    }
746                }
747                let (delay_ms, late) = {
748                    let mut timing = append_timing.lock().unwrap();
749                    let now = Instant::now();
750                    let delta_ms = now.duration_since(timing.0).as_secs_f64() * 1000.0;
751                    let chunk_ms = length_in_seconds * 1000.0;
752                    let late = delta_ms > (chunk_ms * 1.2) && chunk_ms > 0.0;
753                    if late {
754                        timing.2 = timing.2.saturating_add(1);
755                    }
756                    timing.1 = if timing.1 == 0.0 {
757                        delta_ms
758                    } else {
759                        (timing.1 * 0.9) + (delta_ms * 0.1)
760                    };
761                    timing.3 = timing.3.max(delta_ms);
762                    timing.0 = now;
763                    (delta_ms, late)
764                };
765                audio_heard.store(true, Ordering::Relaxed);
766                last_chunk_ms.store(now_ms(), Ordering::Relaxed);
767
768                {
769                    let mut meter = output_meter.lock().unwrap();
770                    meter.push_samples(&mixer);
771                }
772                {
773                    let mut metrics = dsp_metrics_for_sink.lock().unwrap();
774                    metrics.append_delay_ms = delay_ms;
775                    metrics.avg_append_delay_ms = {
776                        if metrics.avg_append_delay_ms == 0.0 {
777                            delay_ms
778                        } else {
779                            (metrics.avg_append_delay_ms * 0.9) + (delay_ms * 0.1)
780                        }
781                    };
782                    metrics.max_append_delay_ms = metrics.max_append_delay_ms.max(delay_ms);
783                    metrics.late_append_count = {
784                        let timing = append_timing.lock().unwrap();
785                        timing.2
786                    };
787                    metrics.late_append_active = late;
788                }
789
790                let sink = sink_mutex.lock().unwrap();
791                let append_jitter_log_ms = {
792                    let settings = buffer_settings_for_state.lock().unwrap();
793                    settings.append_jitter_log_ms
794                };
795                if append_jitter_log_ms > 0.0 && (late || delay_ms > append_jitter_log_ms as f64) {
796                    let expected_ms = length_in_seconds * 1000.0;
797                    log::info!(
798                        "append jitter: delta={:.2}ms expected={:.2}ms late={} threshold={:.2}ms sink_len={}",
799                        delay_ms,
800                        expected_ms,
801                        late,
802                        append_jitter_log_ms,
803                        sink.len()
804                    );
805                }
806                let mut chunk_lengths = chunk_lengths.lock().unwrap();
807
808                sink.append(mixer);
809
810                drop(sink);
811
812                chunk_lengths.push(length_in_seconds);
813                drop(chunk_lengths);
814
815                update_chunk_lengths();
816                check_details();
817            };
818
819            let receiver = engine.start_receiver();
820            loop {
821                match receiver.recv_timeout(Duration::from_millis(20)) {
822                    Ok(chunk) => {
823                        update_sink(chunk);
824                    }
825                    Err(RecvTimeoutError::Timeout) => {
826                        update_chunk_lengths();
827                        if !check_details() {
828                            break;
829                        }
830                    }
831                    Err(RecvTimeoutError::Disconnected) => break,
832                }
833            }
834            #[cfg(feature = "debug")]
835            log::info!("engine reception loop finished");
836
837            // From here on, all audio is buffered. Stop relying on sink.len()
838            // to advance time so the UI keeps updating while the last buffer plays.
839            buffering_done.store(true, Ordering::Relaxed);
840            buffering_done_flag.store(true, Ordering::Relaxed);
841            {
842                let mut final_duration = final_duration.lock().unwrap();
843                if final_duration.is_none() {
844                    let chunk_lengths = chunk_lengths.lock().unwrap();
845                    let time_chunks_passed = time_chunks_mutex.lock().unwrap();
846                    *final_duration = Some(*time_chunks_passed + chunk_lengths.iter().sum::<f64>());
847                }
848            }
849
850            // ===================== //
851            // Wait until all tracks are finished playing in sink
852            // ===================== //
853            #[cfg(feature = "debug")]
854            {
855                let sink = sink_mutex.lock().unwrap();
856                let paused = sink.is_paused();
857                let empty = sink.empty();
858                let sink_len = sink.len();
859                drop(sink);
860                let time_passed = *time_passed.lock().unwrap();
861                let final_duration = *final_duration.lock().unwrap();
862                log::info!(
863                    "Starting drain loop: paused={} empty={} sink_len={} time={:.3} final={:?}",
864                    paused,
865                    empty,
866                    sink_len,
867                    time_passed,
868                    final_duration
869                );
870            }
871
872            loop {
873                update_chunk_lengths();
874                if !check_details() {
875                    break;
876                }
877
878                let done = if engine.finished_buffering() {
879                    if let Some(final_duration) = *final_duration.lock().unwrap() {
880                        let time_passed = *time_passed.lock().unwrap();
881                        time_passed >= (final_duration - 0.001).max(0.0)
882                    } else {
883                        false
884                    }
885                } else {
886                    false
887                };
888                if done {
889                    break;
890                }
891
892                thread::sleep(Duration::from_millis(10));
893            }
894
895            #[cfg(feature = "debug")]
896            log::info!("Finished drain loop!");
897
898            // ===================== //
899            // Set playback_thread_exists to false
900            // ===================== //
901            drop(_thread_guard);
902        });
903    }
904
905    /// Start playback from a specific timestamp (seconds).
906    pub fn play_at(&mut self, ts: f64) {
907        let mut timestamp = self.ts.lock().unwrap();
908        *timestamp = ts;
909        drop(timestamp);
910
911        self.request_effects_reset();
912        self.kill_current();
913        // self.stop.store(false, Ordering::SeqCst);
914        self.initialize_thread(Some(ts));
915
916        self.resume();
917
918        self.wait_for_audio_heard(Duration::from_secs(5));
919    }
920
921    /// Start playback from the current timestamp.
922    pub fn play(&mut self) {
923        info!("Playing audio");
924        let thread_exists = self.playback_thread_exists.load(Ordering::SeqCst);
925        // self.stop.store(false, Ordering::SeqCst);
926
927        if !thread_exists {
928            self.initialize_thread(None);
929        }
930
931        self.resume();
932
933        self.wait_for_audio_heard(Duration::from_secs(5));
934    }
935
936    /// Pause playback.
937    pub fn pause(&self) {
938        self.state.lock().unwrap().clone_from(&PlayerState::Pausing);
939    }
940
941    /// Resume playback if paused.
942    pub fn resume(&self) {
943        self.state
944            .lock()
945            .unwrap()
946            .clone_from(&PlayerState::Resuming);
947    }
948
949    /// Stop the current playback thread without changing state.
950    pub fn kill_current(&self) {
951        self.state
952            .lock()
953            .unwrap()
954            .clone_from(&PlayerState::Stopping);
955        {
956            let sink = self.sink.lock().unwrap();
957            sink.stop();
958        }
959        self.abort.store(true, Ordering::SeqCst);
960
961        while !self.thread_finished() {
962            thread::sleep(Duration::from_millis(10));
963        }
964
965        self.state.lock().unwrap().clone_from(&PlayerState::Stopped);
966    }
967
968    /// Stop playback and reset timing state.
969    pub fn stop(&self) {
970        self.kill_current();
971        self.ts.lock().unwrap().clone_from(&0.0);
972    }
973
974    /// Return true if playback is currently active.
975    pub fn is_playing(&self) -> bool {
976        let state = self.state.lock().unwrap();
977        *state == PlayerState::Playing
978    }
979
980    /// Return true if playback is currently paused.
981    pub fn is_paused(&self) -> bool {
982        let state = self.state.lock().unwrap();
983        *state == PlayerState::Paused
984    }
985
986    /// Get the current playback time in seconds.
987    pub fn get_time(&self) -> f64 {
988        let ts = self.ts.lock().unwrap();
989        *ts
990    }
991
992    fn thread_finished(&self) -> bool {
993        let playback_thread_exists = self.playback_thread_exists.load(Ordering::SeqCst);
994        !playback_thread_exists
995    }
996
997    /// Return true if playback has reached the end.
998    pub fn is_finished(&self) -> bool {
999        self.thread_finished()
1000        // let state = self.state.lock().unwrap();
1001        // *state == PlayerState::Finished
1002    }
1003
1004    /// Block the current thread until playback finishes.
1005    pub fn sleep_until_end(&self) {
1006        loop {
1007            if self.thread_finished() {
1008                break;
1009            }
1010            thread::sleep(Duration::from_millis(100));
1011        }
1012    }
1013
1014    /// Get the total duration (seconds) of the active selection.
1015    pub fn get_duration(&self) -> f64 {
1016        let duration = self.duration.lock().unwrap();
1017        *duration
1018    }
1019
1020    /// Seek to the given timestamp (seconds).
1021    pub fn seek(&mut self, ts: f64) {
1022        let mut timestamp = self.ts.lock().unwrap();
1023        *timestamp = ts;
1024        drop(timestamp);
1025
1026        self.request_effects_reset();
1027        let state = self.state.lock().unwrap().clone();
1028
1029        self.kill_current();
1030        self.state.lock().unwrap().clone_from(&state);
1031        self.initialize_thread(Some(ts));
1032
1033        if state == PlayerState::Playing {
1034            self.resume();
1035        }
1036    }
1037
1038    /// Refresh active track selections from the underlying container.
1039    pub fn refresh_tracks(&mut self) {
1040        let mut prot = self.prot.lock().unwrap();
1041        prot.refresh_tracks();
1042        if let Some(spec) = self.impulse_response_override.clone() {
1043            prot.set_impulse_response_spec(spec);
1044        }
1045        if let Some(tail_db) = self.impulse_response_tail_override {
1046            prot.set_impulse_response_tail_db(tail_db);
1047        }
1048        drop(prot);
1049
1050        self.request_effects_reset();
1051        // If stopped, return
1052        if self.thread_finished() {
1053            return;
1054        }
1055
1056        // Kill current thread and start
1057        // new thread at the current timestamp
1058        let ts = self.get_time();
1059        self.seek(ts);
1060
1061        // If previously playing, resume
1062        if self.is_playing() {
1063            self.resume();
1064        }
1065
1066        self.wait_for_audio_heard(Duration::from_secs(5));
1067    }
1068
1069    fn wait_for_audio_heard(&self, timeout: Duration) -> bool {
1070        let start = Instant::now();
1071        loop {
1072            if self.audio_heard.load(Ordering::Relaxed) {
1073                return true;
1074            }
1075            if self.thread_finished() {
1076                warn!("playback thread ended before audio was heard");
1077                return false;
1078            }
1079            if start.elapsed() >= timeout {
1080                warn!("timed out waiting for audio to start");
1081                return false;
1082            }
1083            thread::sleep(Duration::from_millis(10));
1084        }
1085    }
1086
1087    /// Shuffle track selections and restart playback.
1088    pub fn shuffle(&mut self) {
1089        self.refresh_tracks();
1090    }
1091
1092    /// Set the playback volume (0.0-1.0).
1093    pub fn set_volume(&mut self, new_volume: f32) {
1094        let sink = self.sink.lock().unwrap();
1095        sink.set_volume(new_volume);
1096        drop(sink);
1097
1098        let mut volume = self.volume.lock().unwrap();
1099        *volume = new_volume;
1100        drop(volume);
1101    }
1102
1103    /// Get the current playback volume.
1104    pub fn get_volume(&self) -> f32 {
1105        *self.volume.lock().unwrap()
1106    }
1107
1108    /// Get the track identifiers used for display.
1109    pub fn get_ids(&self) -> Vec<String> {
1110        let prot = self.prot.lock().unwrap();
1111
1112        return prot.get_ids();
1113    }
1114
1115    /// Enable periodic reporting of playback status for UI consumers.
1116    pub fn set_reporting(
1117        &mut self,
1118        reporting: Arc<Mutex<dyn Fn(Report) + Send>>,
1119        reporting_interval: Duration,
1120    ) {
1121        if self.reporter.is_some() {
1122            self.reporter.as_ref().unwrap().lock().unwrap().stop();
1123        }
1124
1125        let reporter = Arc::new(Mutex::new(Reporter::new(
1126            Arc::new(Mutex::new(self.clone())),
1127            reporting,
1128            reporting_interval,
1129        )));
1130
1131        reporter.lock().unwrap().start();
1132
1133        self.reporter = Some(reporter);
1134    }
1135}
1136
/// Convert a linear amplitude to decibels as `20 * log10(value)`.
///
/// Zero and negative inputs map to negative infinity. NaN fails the `<= 0.0`
/// guard and therefore propagates through `log10`.
fn linear_to_dbfs(value: f32) -> f32 {
    if value <= 0.0 {
        return f32::NEG_INFINITY;
    }
    value.log10() * 20.0
}
1144
/// Return the wall-clock time as milliseconds since the Unix epoch.
///
/// Yields 0 when the system clock reads earlier than the epoch.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}