Skip to main content

proteus_lib/playback/
player.rs

1//! High-level playback controller for the Proteus library.
2
3use rodio::buffer::SamplesBuffer;
4use rodio::{OutputStreamBuilder, Sink};
5use std::cell::Cell;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::{mpsc::RecvTimeoutError, Arc, Mutex};
8use std::thread;
9use std::time::{Duration, Instant};
10
11use log::{error, info, warn};
12
13use crate::container::prot::{PathsTrack, Prot};
14use crate::diagnostics::reporter::{Report, Reporter};
15use crate::dsp::effects::convolution_reverb::{parse_impulse_response_string, ImpulseResponseSpec};
16use crate::playback::output_meter::OutputMeter;
17use crate::tools::timer;
18use crate::{
19    container::info::Info,
20    dsp::effects::AudioEffect,
21    playback::engine::{DspChainMetrics, PlaybackBufferSettings, PlayerEngine},
22};
23
/// High-level playback state for the player.
///
/// `Resuming` and `Pausing` are transitional requests: the playback thread
/// observes them, performs the fade, and then stores the settled state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PlayerState {
    Init,
    /// Resume requested; the playback thread fades the sink in and then
    /// switches the state to `Playing`.
    Resuming,
    Playing,
    /// Pause requested; the playback thread fades the sink out and then
    /// switches the state to `Paused`.
    Pausing,
    Paused,
    Stopping,
    /// State a freshly constructed `Player` is created with.
    Stopped,
    Finished,
}
36
/// Snapshot of convolution reverb settings for UI consumers.
///
/// The snapshot is a plain value copy; it implements `PartialEq` so callers
/// can cheaply detect whether the settings changed between two polls.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct ReverbSettingsSnapshot {
    /// Whether the reverb effect is currently enabled.
    pub enabled: bool,
    /// Wet/dry mix of the reverb effect.
    pub dry_wet: f32,
}
43
/// Refresh rate handed to the `OutputMeter` when a player is constructed.
const OUTPUT_METER_REFRESH_HZ: f32 = 30.0;
/// Maximum number of attempts to open the default output stream before the
/// playback thread gives up.
const OUTPUT_STREAM_OPEN_RETRIES: usize = 20;
/// Sleep between failed attempts to open the default output stream (ms).
const OUTPUT_STREAM_OPEN_RETRY_MS: u64 = 100;
47
/// RAII marker for the lifetime of a playback thread.
///
/// Constructing the guard raises the shared flag; dropping it (on normal
/// exit, early return, or unwinding) lowers the flag again.
struct PlaybackThreadGuard {
    exists: Arc<AtomicBool>,
}

impl PlaybackThreadGuard {
    /// Mark the playback thread as alive and return the guard that will
    /// clear the flag when dropped.
    fn new(exists: Arc<AtomicBool>) -> Self {
        // SeqCst matches the ordering used by every other access to this flag.
        exists.store(true, Ordering::SeqCst);
        Self { exists }
    }
}

impl Drop for PlaybackThreadGuard {
    fn drop(&mut self) {
        self.exists.store(false, Ordering::SeqCst);
    }
}
64
/// Primary playback controller.
///
/// `Player` owns the playback threads, buffering state, and runtime settings
/// such as volume and reverb configuration.
///
/// Most fields are `Arc`-wrapped, so a cloned `Player` shares that state with
/// the original.
#[derive(Clone)]
pub struct Player {
    /// Container metadata; channel count and sample rate are read from here.
    pub info: Info,
    /// Cleared every time a new playback thread is initialized.
    pub finished_tracks: Arc<Mutex<Vec<i32>>>,
    /// Current playback position in seconds, written by the playback thread.
    pub ts: Arc<Mutex<f64>>,
    /// Current high-level playback state.
    state: Arc<Mutex<PlayerState>>,
    /// Abort flag polled by the playback thread; replaced with a fresh flag
    /// for each new playback thread.
    abort: Arc<AtomicBool>,
    /// Raised while a playback thread is running, lowered when it exits.
    playback_thread_exists: Arc<AtomicBool>,
    /// Incremented for each playback thread; chunks from a thread whose id
    /// no longer matches are discarded.
    playback_id: Arc<AtomicU64>,
    /// Duration reported by the engine once the playback thread starts.
    duration: Arc<Mutex<f64>>,
    /// The container (or standalone file set) being played.
    prot: Arc<Mutex<Prot>>,
    /// Set once the playback thread has processed at least one audio chunk.
    audio_heard: Arc<AtomicBool>,
    /// Target sink volume (initialized to 0.8).
    volume: Arc<Mutex<f32>>,
    /// Output sink; replaced by the playback thread once a stream is open.
    sink: Arc<Mutex<Sink>>,
    /// Optional diagnostics reporter (unset at construction).
    reporter: Option<Arc<Mutex<Reporter>>>,
    /// Buffering, fade, and logging settings shared with the playback thread.
    buffer_settings: Arc<Mutex<PlaybackBufferSettings>>,
    /// Active DSP effect chain.
    effects: Arc<Mutex<Vec<AudioEffect>>>,
    /// Latest DSP chain performance metrics.
    dsp_metrics: Arc<Mutex<DspChainMetrics>>,
    /// Counter bumped whenever the effect chain should be reset.
    effects_reset: Arc<AtomicU64>,
    /// Per-channel output level meter.
    output_meter: Arc<Mutex<OutputMeter>>,
    /// Buffering-complete flag; cleared when a playback thread is initialized.
    buffering_done: Arc<AtomicBool>,
    /// `now_ms()` timestamp of the last chunk handled by the playback thread.
    last_chunk_ms: Arc<AtomicU64>,
    /// `now_ms()` timestamp of the last playback-time update.
    last_time_update_ms: Arc<AtomicU64>,
    /// One-shot fade length (ms) consumed by the first resume of a playback
    /// thread; when unset the startup fade setting is used instead.
    next_resume_fade_ms: Arc<Mutex<Option<f32>>>,
    /// Impulse response spec most recently set through the player, if any.
    impulse_response_override: Option<ImpulseResponseSpec>,
    /// Impulse response tail trim (dB) most recently set through the player.
    impulse_response_tail_override: Option<f32>,
}
96
97impl Player {
98    /// Create a new player for a single container path.
99    pub fn new(file_path: &String) -> Self {
100        let this = Self::new_from_path_or_paths(Some(file_path), None);
101        this
102    }
103
104    /// Create a new player for a set of standalone file paths.
105    pub fn new_from_file_paths(file_paths: Vec<PathsTrack>) -> Self {
106        let this = Self::new_from_path_or_paths(None, Some(file_paths));
107        this
108    }
109
110    /// Create a new player for a set of standalone file paths.
111    pub fn new_from_file_paths_legacy(file_paths: Vec<Vec<String>>) -> Self {
112        let this = Self::new_from_path_or_paths(
113            None,
114            Some(
115                file_paths
116                    .into_iter()
117                    .map(|paths| PathsTrack::new_from_file_paths(paths))
118                    .collect(),
119            ),
120        );
121        this
122    }
123
124    /// Create a player from either a container path or standalone file paths.
125    pub fn new_from_path_or_paths(path: Option<&String>, paths: Option<Vec<PathsTrack>>) -> Self {
126        let (prot, info) = match path {
127            Some(path) => {
128                let prot = Arc::new(Mutex::new(Prot::new(path)));
129                let info = Info::new(path.clone());
130                (prot, info)
131            }
132            None => {
133                let prot = Arc::new(Mutex::new(Prot::new_from_file_paths(paths.unwrap())));
134                let locked_prot = prot.lock().unwrap();
135                let info = Info::new_from_file_paths(locked_prot.get_file_paths_dictionary());
136                drop(locked_prot);
137                (prot, info)
138            }
139        };
140
141        let (sink, _queue) = Sink::new();
142        let sink: Arc<Mutex<Sink>> = Arc::new(Mutex::new(sink));
143
144        let channels = info.channels as usize;
145        let sample_rate = info.sample_rate;
146        let effects = {
147            let prot_locked = prot.lock().unwrap();
148            match prot_locked.get_effects() {
149                Some(effects) => Arc::new(Mutex::new(effects)),
150                None => Arc::new(Mutex::new(vec![])),
151            }
152        };
153
154        let mut this = Self {
155            info,
156            finished_tracks: Arc::new(Mutex::new(Vec::new())),
157            state: Arc::new(Mutex::new(PlayerState::Stopped)),
158            abort: Arc::new(AtomicBool::new(false)),
159            ts: Arc::new(Mutex::new(0.0)),
160            playback_thread_exists: Arc::new(AtomicBool::new(true)),
161            playback_id: Arc::new(AtomicU64::new(0)),
162            duration: Arc::new(Mutex::new(0.0)),
163            audio_heard: Arc::new(AtomicBool::new(false)),
164            volume: Arc::new(Mutex::new(0.8)),
165            sink,
166            prot,
167            reporter: None,
168            buffer_settings: Arc::new(Mutex::new(PlaybackBufferSettings::new(20.0))),
169            effects,
170            dsp_metrics: Arc::new(Mutex::new(DspChainMetrics::default())),
171            effects_reset: Arc::new(AtomicU64::new(0)),
172            output_meter: Arc::new(Mutex::new(OutputMeter::new(
173                channels,
174                sample_rate,
175                OUTPUT_METER_REFRESH_HZ,
176            ))),
177            buffering_done: Arc::new(AtomicBool::new(false)),
178            last_chunk_ms: Arc::new(AtomicU64::new(0)),
179            last_time_update_ms: Arc::new(AtomicU64::new(0)),
180            next_resume_fade_ms: Arc::new(Mutex::new(None)),
181            impulse_response_override: None,
182            impulse_response_tail_override: None,
183        };
184
185        this.initialize_thread(None);
186
187        this
188    }
189
190    /// Override the impulse response used for convolution reverb.
191    pub fn set_impulse_response_spec(&mut self, spec: ImpulseResponseSpec) {
192        self.impulse_response_override = Some(spec.clone());
193        let mut prot = self.prot.lock().unwrap();
194        prot.set_impulse_response_spec(spec);
195        self.request_effects_reset();
196    }
197
198    /// Parse and apply an impulse response spec string.
199    pub fn set_impulse_response_from_string(&mut self, value: &str) {
200        if let Some(spec) = parse_impulse_response_string(value) {
201            self.set_impulse_response_spec(spec);
202        }
203    }
204
205    /// Override the impulse response tail trim (dB).
206    pub fn set_impulse_response_tail_db(&mut self, tail_db: f32) {
207        self.impulse_response_tail_override = Some(tail_db);
208        let mut prot = self.prot.lock().unwrap();
209        prot.set_impulse_response_tail_db(tail_db);
210        self.request_effects_reset();
211    }
212
213    /// Enable or disable convolution reverb.
214    pub fn set_reverb_enabled(&self, enabled: bool) {
215        let mut effects = self.effects.lock().unwrap();
216        if let Some(effect) = effects
217            .iter_mut()
218            .find_map(|effect| effect.as_convolution_reverb_mut())
219        {
220            effect.enabled = enabled;
221        }
222        if let Some(effect) = effects
223            .iter_mut()
224            .find_map(|effect| effect.as_delay_reverb_mut())
225        {
226            effect.enabled = enabled;
227        }
228    }
229
230    /// Set the reverb wet/dry mix (clamped to `[0.0, 1.0]`).
231    pub fn set_reverb_mix(&self, dry_wet: f32) {
232        let mut effects = self.effects.lock().unwrap();
233        if let Some(effect) = effects
234            .iter_mut()
235            .find_map(|effect| effect.as_convolution_reverb_mut())
236        {
237            effect.dry_wet = dry_wet.clamp(0.0, 1.0);
238        }
239        if let Some(effect) = effects
240            .iter_mut()
241            .find_map(|effect| effect.as_delay_reverb_mut())
242        {
243            effect.mix = dry_wet.clamp(0.0, 1.0);
244        }
245        if let Some(effect) = effects
246            .iter_mut()
247            .find_map(|effect| effect.as_diffusion_reverb_mut())
248        {
249            effect.mix = dry_wet.clamp(0.0, 1.0);
250        }
251    }
252
253    /// Retrieve the current reverb settings snapshot.
254    pub fn get_reverb_settings(&self) -> ReverbSettingsSnapshot {
255        let effects = self.effects.lock().unwrap();
256        if let Some(effect) = effects
257            .iter()
258            .find_map(|effect| effect.as_convolution_reverb())
259        {
260            return ReverbSettingsSnapshot {
261                enabled: effect.enabled,
262                dry_wet: effect.dry_wet,
263            };
264        }
265        if let Some(effect) = effects
266            .iter()
267            .find_map(|effect| effect.as_diffusion_reverb())
268        {
269            return ReverbSettingsSnapshot {
270                enabled: effect.enabled,
271                dry_wet: effect.mix,
272            };
273        }
274        if let Some(effect) = effects.iter().find_map(|effect| effect.as_delay_reverb()) {
275            return ReverbSettingsSnapshot {
276                enabled: effect.enabled,
277                dry_wet: effect.mix,
278            };
279        }
280        ReverbSettingsSnapshot {
281            enabled: false,
282            dry_wet: 0.0,
283        }
284    }
285
286    /// Snapshot the active effect chain names.
287    #[allow(deprecated)]
288    pub fn get_effect_names(&self) -> Vec<String> {
289        let effects = self.effects.lock().unwrap();
290        effects
291            .iter()
292            .map(|effect| match effect {
293                AudioEffect::DelayReverb(_) => "DelayReverb".to_string(),
294                AudioEffect::BasicReverb(_) => "DelayReverb".to_string(),
295                AudioEffect::DiffusionReverb(_) => "DiffusionReverb".to_string(),
296                AudioEffect::ConvolutionReverb(_) => "ConvolutionReverb".to_string(),
297                AudioEffect::LowPassFilter(_) => "LowPassFilter".to_string(),
298                AudioEffect::HighPassFilter(_) => "HighPassFilter".to_string(),
299                AudioEffect::Distortion(_) => "Distortion".to_string(),
300                AudioEffect::Gain(_) => "Gain".to_string(),
301                AudioEffect::Compressor(_) => "Compressor".to_string(),
302                AudioEffect::Limiter(_) => "Limiter".to_string(),
303            })
304            .collect()
305    }
306
307    /// Replace the active DSP effects chain.
308    pub fn set_effects(&mut self, effects: Vec<AudioEffect>) {
309        {
310            let mut guard = self.effects.lock().unwrap();
311            println!("New Effects: {:?}", effects);
312            *guard = effects;
313        }
314        self.request_effects_reset();
315
316        // Seeking to the current time stamp refreshes the
317        // Sink so that the new effects are applied immediately.
318        if !self.thread_finished() {
319            let ts = self.get_time();
320            self.seek(ts);
321        }
322    }
323
324    /// Retrieve the latest DSP chain performance metrics.
325    pub fn get_dsp_metrics(&self) -> DspChainMetrics {
326        *self.dsp_metrics.lock().unwrap()
327    }
328
    /// Retrieve the most recent per-channel peak levels.
    ///
    /// Locks the output meter for the duration of the call and returns
    /// whatever `OutputMeter::levels` reports.
    pub fn get_levels(&self) -> Vec<f32> {
        self.output_meter.lock().unwrap().levels()
    }
333
334    /// Retrieve the most recent per-channel peak levels in dBFS.
335    pub fn get_levels_db(&self) -> Vec<f32> {
336        self.output_meter
337            .lock()
338            .unwrap()
339            .levels()
340            .into_iter()
341            .map(linear_to_dbfs)
342            .collect()
343    }
344
    /// Retrieve the most recent per-channel average levels.
    ///
    /// Locks the output meter for the duration of the call and returns
    /// whatever `OutputMeter::averages` reports.
    pub fn get_levels_avg(&self) -> Vec<f32> {
        self.output_meter.lock().unwrap().averages()
    }
349
    /// Set the output meter refresh rate (frames per second).
    ///
    /// The value is forwarded unchanged to `OutputMeter::set_refresh_hz`.
    pub fn set_output_meter_refresh_hz(&self, hz: f32) {
        self.output_meter.lock().unwrap().set_refresh_hz(hz);
    }
354
355    /// Debug helper returning thread alive, state, and audio heard flags.
356    pub fn debug_playback_state(&self) -> (bool, PlayerState, bool) {
357        (
358            self.playback_thread_exists.load(Ordering::SeqCst),
359            *self.state.lock().unwrap(),
360            self.audio_heard.load(Ordering::Relaxed),
361        )
362    }
363
    /// Debug helper indicating whether buffering has completed.
    ///
    /// Reads the shared `buffering_done` flag, which is cleared whenever a
    /// new playback thread is initialized.
    pub fn debug_buffering_done(&self) -> bool {
        self.buffering_done.load(Ordering::Relaxed)
    }
368
369    /// Debug helper returning internal timing markers in milliseconds.
370    pub fn debug_timing_ms(&self) -> (u64, u64) {
371        (
372            self.last_chunk_ms.load(Ordering::Relaxed),
373            self.last_time_update_ms.load(Ordering::Relaxed),
374        )
375    }
376
377    /// Debug helper returning sink paused/empty flags and queued length.
378    pub fn debug_sink_state(&self) -> (bool, bool, usize) {
379        let sink = self.sink.lock().unwrap();
380        let paused = sink.is_paused();
381        let empty = sink.empty();
382        let len = sink.len();
383        (paused, empty, len)
384    }
385
    /// Bump the shared effects-reset counter.
    ///
    /// The counter is handed to the playback engine when a playback thread
    /// starts; presumably the engine resets effect state when it sees the
    /// value change — confirm against `PlayerEngine`.
    fn request_effects_reset(&self) {
        self.effects_reset.fetch_add(1, Ordering::SeqCst);
    }
389
390    /// Configure the minimum buffered audio (ms) before playback starts.
391    pub fn set_start_buffer_ms(&self, start_buffer_ms: f32) {
392        let mut settings = self.buffer_settings.lock().unwrap();
393        settings.start_buffer_ms = start_buffer_ms.max(0.0);
394    }
395
396    /// Configure heuristic end-of-track threshold for containers (ms).
397    pub fn set_track_eos_ms(&self, track_eos_ms: f32) {
398        let mut settings = self.buffer_settings.lock().unwrap();
399        settings.track_eos_ms = track_eos_ms.max(0.0);
400    }
401
402    /// Configure minimum sink chunks queued before playback starts/resumes.
403    pub fn set_start_sink_chunks(&self, chunks: usize) {
404        let mut settings = self.buffer_settings.lock().unwrap();
405        settings.start_sink_chunks = chunks;
406    }
407
408    /// Configure the maximum sink chunks queued before producer backpressure.
409    ///
410    /// Set to `0` to disable this guard.
411    pub fn set_max_sink_chunks(&self, chunks: usize) {
412        let mut settings = self.buffer_settings.lock().unwrap();
413        settings.max_sink_chunks = chunks;
414    }
415
416    /// Configure the startup silence pre-roll (ms).
417    pub fn set_startup_silence_ms(&self, ms: f32) {
418        let mut settings = self.buffer_settings.lock().unwrap();
419        settings.startup_silence_ms = ms.max(0.0);
420    }
421
422    /// Configure the startup fade-in length (ms).
423    pub fn set_startup_fade_ms(&self, ms: f32) {
424        let mut settings = self.buffer_settings.lock().unwrap();
425        settings.startup_fade_ms = ms.max(0.0);
426    }
427
428    /// Configure seek fade-out length (ms) before restarting playback.
429    pub fn set_seek_fade_out_ms(&self, ms: f32) {
430        let mut settings = self.buffer_settings.lock().unwrap();
431        settings.seek_fade_out_ms = ms.max(0.0);
432    }
433
434    /// Configure seek fade-in length (ms) after restarting playback.
435    pub fn set_seek_fade_in_ms(&self, ms: f32) {
436        let mut settings = self.buffer_settings.lock().unwrap();
437        settings.seek_fade_in_ms = ms.max(0.0);
438    }
439
440    /// Configure the append jitter logging threshold (ms). 0 disables logging.
441    pub fn set_append_jitter_log_ms(&self, ms: f32) {
442        let mut settings = self.buffer_settings.lock().unwrap();
443        settings.append_jitter_log_ms = ms.max(0.0);
444    }
445
446    /// Enable or disable per-effect boundary discontinuity logging.
447    pub fn set_effect_boundary_log(&self, enabled: bool) {
448        let mut settings = self.buffer_settings.lock().unwrap();
449        settings.effect_boundary_log = enabled;
450    }
451
452    fn initialize_thread(&mut self, ts: Option<f64>) {
453        // Empty finished_tracks
454        let mut finished_tracks = self.finished_tracks.lock().unwrap();
455        finished_tracks.clear();
456        drop(finished_tracks);
457
458        // ===== Set play options ===== //
459        // Use a fresh abort flag per playback thread so old mixer threads stay stopped.
460        self.abort = Arc::new(AtomicBool::new(false));
461        self.playback_thread_exists.store(true, Ordering::SeqCst);
462        let playback_id = self.playback_id.fetch_add(1, Ordering::SeqCst) + 1;
463        self.buffering_done.store(false, Ordering::SeqCst);
464        let now_ms_value = now_ms();
465        self.last_chunk_ms.store(now_ms_value, Ordering::Relaxed);
466        self.last_time_update_ms
467            .store(now_ms_value, Ordering::Relaxed);
468
469        // ===== Clone variables ===== //
470        let play_state = self.state.clone();
471        let abort = self.abort.clone();
472        let playback_thread_exists = self.playback_thread_exists.clone();
473        let playback_id_atomic = self.playback_id.clone();
474        let time_passed = self.ts.clone();
475
476        let duration = self.duration.clone();
477        let prot = self.prot.clone();
478        let buffer_settings = self.buffer_settings.clone();
479        let buffer_settings_for_state = self.buffer_settings.clone();
480        let effects = self.effects.clone();
481        let dsp_metrics = self.dsp_metrics.clone();
482        let dsp_metrics_for_sink = self.dsp_metrics.clone();
483        let effects_reset = self.effects_reset.clone();
484        let output_meter = self.output_meter.clone();
485        let audio_info = self.info.clone();
486        let next_resume_fade_ms = self.next_resume_fade_ms.clone();
487
488        let audio_heard = self.audio_heard.clone();
489        let volume = self.volume.clone();
490        let sink_mutex = self.sink.clone();
491        let buffer_done_thread_flag = self.buffering_done.clone();
492        let last_chunk_ms = self.last_chunk_ms.clone();
493        let last_time_update_ms = self.last_time_update_ms.clone();
494
495        audio_heard.store(false, Ordering::Relaxed);
496
497        {
498            let mut meter = self.output_meter.lock().unwrap();
499            meter.reset();
500        }
501
502        // ===== Start playback ===== //
503        thread::spawn(move || {
504            // ===================== //
505            // Set playback_thread_exists to true
506            // ===================== //
507            let _thread_guard = PlaybackThreadGuard::new(playback_thread_exists.clone());
508
509            // ===================== //
510            // Initialize engine & sink
511            // ===================== //
512            let start_time = match ts {
513                Some(ts) => ts,
514                None => 0.0,
515            };
516            let mut engine = PlayerEngine::new(
517                prot,
518                Some(abort.clone()),
519                start_time,
520                buffer_settings,
521                effects,
522                dsp_metrics,
523                effects_reset,
524            );
525            let mut stream = None;
526            for attempt in 1..=OUTPUT_STREAM_OPEN_RETRIES {
527                match OutputStreamBuilder::open_default_stream() {
528                    Ok(s) => {
529                        stream = Some(s);
530                        break;
531                    }
532                    Err(err) => {
533                        if attempt == OUTPUT_STREAM_OPEN_RETRIES {
534                            error!(
535                                "failed to open default output stream after {} attempts: {}",
536                                OUTPUT_STREAM_OPEN_RETRIES, err
537                            );
538                            return;
539                        }
540                        warn!(
541                            "open_default_stream attempt {}/{} failed: {}",
542                            attempt, OUTPUT_STREAM_OPEN_RETRIES, err
543                        );
544                        thread::sleep(Duration::from_millis(OUTPUT_STREAM_OPEN_RETRY_MS));
545                    }
546                }
547            }
548            let stream = stream.expect("stream should exist after successful retry loop");
549            let mixer = stream.mixer().clone();
550
551            let mut sink = sink_mutex.lock().unwrap();
552            *sink = Sink::connect_new(&mixer);
553            sink.pause();
554            sink.set_volume(*volume.lock().unwrap());
555            drop(sink);
556
557            // ===================== //
558            // Set duration from engine
559            // ===================== //
560            let mut duration = duration.lock().unwrap();
561            *duration = engine.get_duration();
562            drop(duration);
563
564            // ===================== //
565            // Initialize chunk_lengths & time_passed
566            // ===================== //
567            let chunk_lengths = Arc::new(Mutex::new(Vec::new()));
568            let mut time_passed_unlocked = time_passed.lock().unwrap();
569            *time_passed_unlocked = start_time;
570            drop(time_passed_unlocked);
571
572            let pause_sink = |sink: &Sink, fade_length_out_seconds: f32| {
573                let timestamp = *time_passed.lock().unwrap();
574
575                let fade_increments = sink.volume() / (fade_length_out_seconds * 100.0);
576                // Fade out and pause sink
577                while sink.volume() > 0.0 && timestamp != start_time {
578                    sink.set_volume(sink.volume() - fade_increments);
579                    thread::sleep(Duration::from_millis(10));
580                }
581                sink.pause();
582            };
583
584            let resume_sink = |sink: &Sink, fade_length_in_seconds: f32| {
585                let volume = *volume.lock().unwrap();
586                if fade_length_in_seconds <= 0.0 {
587                    sink.play();
588                    sink.set_volume(volume);
589                    return;
590                }
591                let fade_increments = (volume - sink.volume()) / (fade_length_in_seconds * 100.0);
592                // Fade in and play sink
593                sink.play();
594                while sink.volume() < volume {
595                    sink.set_volume(sink.volume() + fade_increments);
596                    thread::sleep(Duration::from_millis(5));
597                }
598            };
599
600            // ===================== //
601            // Start sink with startup silence + fade in
602            // ===================== //
603            {
604                let startup_settings = buffer_settings_for_state.lock().unwrap();
605                let startup_silence_ms = startup_settings.startup_silence_ms;
606                drop(startup_settings);
607
608                let sample_rate = audio_info.sample_rate as u32;
609                let channels = audio_info.channels as u16;
610
611                if startup_silence_ms > 0.0 {
612                    let samples = ((startup_silence_ms / 1000.0) * sample_rate as f32).ceil()
613                        as usize
614                        * channels as usize;
615                    let silence = vec![0.0_f32; samples.max(1)];
616                    let silence_buffer = SamplesBuffer::new(channels, sample_rate, silence);
617                    let sink = sink_mutex.lock().unwrap();
618                    sink.append(silence_buffer);
619                    drop(sink);
620                }
621            }
622
623            // ===================== //
624            // Check if the player should be paused or not
625            // ===================== //
626            let startup_fade_pending = Cell::new(true);
627            let check_details = || {
628                if abort.load(Ordering::SeqCst) {
629                    let sink = sink_mutex.lock().unwrap();
630                    pause_sink(&sink, 0.1);
631                    sink.clear();
632                    drop(sink);
633
634                    return false;
635                }
636
637                let sink = sink_mutex.lock().unwrap();
638                let state = play_state.lock().unwrap().clone();
639                let start_sink_chunks = buffer_settings_for_state.lock().unwrap().start_sink_chunks;
640                if state == PlayerState::Resuming
641                    && start_sink_chunks > 0
642                    && sink.len() < start_sink_chunks
643                {
644                    // Keep paused until enough chunks are queued.
645                    sink.pause();
646                    drop(sink);
647                    return true;
648                }
649                if state == PlayerState::Pausing {
650                    pause_sink(&sink, 0.1);
651                    play_state.lock().unwrap().clone_from(&PlayerState::Paused);
652                }
653                if state == PlayerState::Resuming {
654                    let fade_length = if startup_fade_pending.replace(false) {
655                        if let Some(ms) = next_resume_fade_ms.lock().unwrap().take() {
656                            (ms / 1000.0).max(0.0)
657                        } else {
658                            let startup_fade_ms =
659                                buffer_settings_for_state.lock().unwrap().startup_fade_ms;
660                            (startup_fade_ms / 1000.0).max(0.0)
661                        }
662                    } else {
663                        0.1
664                    };
665                    resume_sink(&sink, fade_length);
666                    play_state.lock().unwrap().clone_from(&PlayerState::Playing);
667                }
668                drop(sink);
669
670                true
671            };
672
673            // ===================== //
674            // Update chunk_lengths / time_passed
675            // ===================== //
676            let time_chunks_mutex = Arc::new(Mutex::new(start_time));
677            let timer_mut = Arc::new(Mutex::new(timer::Timer::new()));
678            let buffering_done = Arc::new(AtomicBool::new(false));
679            let buffering_done_flag = buffer_done_thread_flag.clone();
680            let final_duration = Arc::new(Mutex::new(None::<f64>));
681            let mut timer = timer_mut.lock().unwrap();
682            timer.start();
683            drop(timer);
684
685            let last_meter_time = Cell::new(0.0_f64);
686            let update_chunk_lengths = || {
687                if abort.load(Ordering::SeqCst) {
688                    return;
689                }
690
691                let mut chunk_lengths = chunk_lengths.lock().unwrap();
692                let mut time_passed_unlocked = time_passed.lock().unwrap();
693                let mut time_chunks_passed = time_chunks_mutex.lock().unwrap();
694                let mut timer = timer_mut.lock().unwrap();
695                last_time_update_ms.store(now_ms(), Ordering::Relaxed);
696                let sink = sink_mutex.lock().unwrap();
697                if !buffering_done.load(Ordering::Relaxed) {
698                    // Check how many chunks have been played (chunk_lengths.len() - sink.len())
699                    // since the last time this function was called and add that to time_passed.
700                    let chunks_played = chunk_lengths.len().saturating_sub(sink.len());
701
702                    for _ in 0..chunks_played {
703                        timer.reset();
704                        timer.start();
705                        *time_chunks_passed += chunk_lengths.remove(0);
706                    }
707                }
708
709                if sink.is_paused() {
710                    timer.pause();
711                } else {
712                    timer.un_pause();
713                }
714
715                let current_audio_time = *time_chunks_passed + timer.get_time().as_secs_f64();
716                let delta = (current_audio_time - last_meter_time.get()).max(0.0);
717                last_meter_time.set(current_audio_time);
718                {
719                    let mut meter = output_meter.lock().unwrap();
720                    meter.advance(delta);
721                }
722
723                *time_passed_unlocked = current_audio_time;
724
725                drop(sink);
726                drop(chunk_lengths);
727                drop(time_passed_unlocked);
728                drop(time_chunks_passed);
729                drop(timer);
730            };
731
732            // ===================== //
733            // Update sink for each chunk received from engine
734            // ===================== //
735            let append_timing = Arc::new(Mutex::new((Instant::now(), 0.0_f64, 0_u64, 0.0_f64)));
736            let update_sink = |(mixer, length_in_seconds): (SamplesBuffer, f64)| {
737                if playback_id_atomic.load(Ordering::SeqCst) != playback_id {
738                    return;
739                }
740                let max_sink_chunks = {
741                    let settings = buffer_settings_for_state.lock().unwrap();
742                    settings.max_sink_chunks
743                };
744                if max_sink_chunks > 0 {
745                    loop {
746                        if abort.load(Ordering::SeqCst) {
747                            return;
748                        }
749                        if playback_id_atomic.load(Ordering::SeqCst) != playback_id {
750                            return;
751                        }
752                        let sink_len = {
753                            let sink = sink_mutex.lock().unwrap();
754                            sink.len()
755                        };
756                        if sink_len < max_sink_chunks {
757                            break;
758                        }
759                        update_chunk_lengths();
760                        if !check_details() {
761                            return;
762                        }
763                        thread::sleep(Duration::from_millis(5));
764                    }
765                }
766                let (delay_ms, late) = {
767                    let mut timing = append_timing.lock().unwrap();
768                    let now = Instant::now();
769                    let delta_ms = now.duration_since(timing.0).as_secs_f64() * 1000.0;
770                    let chunk_ms = length_in_seconds * 1000.0;
771                    let late = delta_ms > (chunk_ms * 1.2) && chunk_ms > 0.0;
772                    if late {
773                        timing.2 = timing.2.saturating_add(1);
774                    }
775                    timing.1 = if timing.1 == 0.0 {
776                        delta_ms
777                    } else {
778                        (timing.1 * 0.9) + (delta_ms * 0.1)
779                    };
780                    timing.3 = timing.3.max(delta_ms);
781                    timing.0 = now;
782                    (delta_ms, late)
783                };
784                audio_heard.store(true, Ordering::Relaxed);
785                last_chunk_ms.store(now_ms(), Ordering::Relaxed);
786
787                {
788                    let mut meter = output_meter.lock().unwrap();
789                    meter.push_samples(&mixer);
790                }
791                {
792                    let mut metrics = dsp_metrics_for_sink.lock().unwrap();
793                    metrics.append_delay_ms = delay_ms;
794                    metrics.avg_append_delay_ms = {
795                        if metrics.avg_append_delay_ms == 0.0 {
796                            delay_ms
797                        } else {
798                            (metrics.avg_append_delay_ms * 0.9) + (delay_ms * 0.1)
799                        }
800                    };
801                    metrics.max_append_delay_ms = metrics.max_append_delay_ms.max(delay_ms);
802                    metrics.late_append_count = {
803                        let timing = append_timing.lock().unwrap();
804                        timing.2
805                    };
806                    metrics.late_append_active = late;
807                }
808
809                let sink = sink_mutex.lock().unwrap();
810                let append_jitter_log_ms = {
811                    let settings = buffer_settings_for_state.lock().unwrap();
812                    settings.append_jitter_log_ms
813                };
814                if append_jitter_log_ms > 0.0 && (late || delay_ms > append_jitter_log_ms as f64) {
815                    let expected_ms = length_in_seconds * 1000.0;
816                    log::info!(
817                        "append jitter: delta={:.2}ms expected={:.2}ms late={} threshold={:.2}ms sink_len={}",
818                        delay_ms,
819                        expected_ms,
820                        late,
821                        append_jitter_log_ms,
822                        sink.len()
823                    );
824                }
825                let mut chunk_lengths = chunk_lengths.lock().unwrap();
826
827                sink.append(mixer);
828
829                drop(sink);
830
831                chunk_lengths.push(length_in_seconds);
832                drop(chunk_lengths);
833
834                update_chunk_lengths();
835                check_details();
836            };
837
838            let receiver = engine.start_receiver();
839            loop {
840                match receiver.recv_timeout(Duration::from_millis(20)) {
841                    Ok(chunk) => {
842                        update_sink(chunk);
843                    }
844                    Err(RecvTimeoutError::Timeout) => {
845                        update_chunk_lengths();
846                        if !check_details() {
847                            break;
848                        }
849                    }
850                    Err(RecvTimeoutError::Disconnected) => break,
851                }
852            }
853            #[cfg(feature = "debug")]
854            log::info!("engine reception loop finished");
855
856            // From here on, all audio is buffered. Stop relying on sink.len()
857            // to advance time so the UI keeps updating while the last buffer plays.
858            buffering_done.store(true, Ordering::Relaxed);
859            buffering_done_flag.store(true, Ordering::Relaxed);
860            {
861                let mut final_duration = final_duration.lock().unwrap();
862                if final_duration.is_none() {
863                    let chunk_lengths = chunk_lengths.lock().unwrap();
864                    let time_chunks_passed = time_chunks_mutex.lock().unwrap();
865                    *final_duration = Some(*time_chunks_passed + chunk_lengths.iter().sum::<f64>());
866                }
867            }
868
869            // ===================== //
870            // Wait until all tracks are finished playing in sink
871            // ===================== //
872            #[cfg(feature = "debug")]
873            {
874                let sink = sink_mutex.lock().unwrap();
875                let paused = sink.is_paused();
876                let empty = sink.empty();
877                let sink_len = sink.len();
878                drop(sink);
879                let time_passed = *time_passed.lock().unwrap();
880                let final_duration = *final_duration.lock().unwrap();
881                log::info!(
882                    "Starting drain loop: paused={} empty={} sink_len={} time={:.3} final={:?}",
883                    paused,
884                    empty,
885                    sink_len,
886                    time_passed,
887                    final_duration
888                );
889            }
890
891            loop {
892                update_chunk_lengths();
893                if !check_details() {
894                    break;
895                }
896
897                let done = if engine.finished_buffering() {
898                    if let Some(final_duration) = *final_duration.lock().unwrap() {
899                        let time_passed = *time_passed.lock().unwrap();
900                        time_passed >= (final_duration - 0.001).max(0.0)
901                    } else {
902                        false
903                    }
904                } else {
905                    false
906                };
907                if done {
908                    break;
909                }
910
911                thread::sleep(Duration::from_millis(10));
912            }
913
914            #[cfg(feature = "debug")]
915            log::info!("Finished drain loop!");
916
917            // ===================== //
918            // Set playback_thread_exists to false
919            // ===================== //
920            drop(_thread_guard);
921        });
922    }
923
924    /// Start playback from a specific timestamp (seconds).
925    pub fn play_at(&mut self, ts: f64) {
926        let mut timestamp = self.ts.lock().unwrap();
927        *timestamp = ts;
928        drop(timestamp);
929
930        self.request_effects_reset();
931        self.kill_current();
932        // self.stop.store(false, Ordering::SeqCst);
933        self.initialize_thread(Some(ts));
934
935        self.resume();
936
937        self.wait_for_audio_heard(Duration::from_secs(5));
938    }
939
940    /// Start playback from the current timestamp.
941    pub fn play(&mut self) {
942        info!("Playing audio");
943        let thread_exists = self.playback_thread_exists.load(Ordering::SeqCst);
944        // self.stop.store(false, Ordering::SeqCst);
945
946        if !thread_exists {
947            self.initialize_thread(None);
948        }
949
950        self.resume();
951
952        self.wait_for_audio_heard(Duration::from_secs(5));
953    }
954
955    /// Pause playback.
956    pub fn pause(&self) {
957        self.state.lock().unwrap().clone_from(&PlayerState::Pausing);
958    }
959
960    /// Resume playback if paused.
961    pub fn resume(&self) {
962        self.state
963            .lock()
964            .unwrap()
965            .clone_from(&PlayerState::Resuming);
966    }
967
968    /// Stop the current playback thread without changing state.
969    pub fn kill_current(&self) {
970        self.state
971            .lock()
972            .unwrap()
973            .clone_from(&PlayerState::Stopping);
974        {
975            let sink = self.sink.lock().unwrap();
976            sink.stop();
977        }
978        self.abort.store(true, Ordering::SeqCst);
979
980        while !self.thread_finished() {
981            thread::sleep(Duration::from_millis(10));
982        }
983
984        self.state.lock().unwrap().clone_from(&PlayerState::Stopped);
985    }
986
987    /// Stop playback and reset timing state.
988    pub fn stop(&self) {
989        self.kill_current();
990        self.ts.lock().unwrap().clone_from(&0.0);
991    }
992
993    /// Return true if playback is currently active.
994    pub fn is_playing(&self) -> bool {
995        let state = self.state.lock().unwrap();
996        *state == PlayerState::Playing
997    }
998
999    /// Return true if playback is currently paused.
1000    pub fn is_paused(&self) -> bool {
1001        let state = self.state.lock().unwrap();
1002        *state == PlayerState::Paused
1003    }
1004
1005    /// Get the current playback time in seconds.
1006    pub fn get_time(&self) -> f64 {
1007        let ts = self.ts.lock().unwrap();
1008        *ts
1009    }
1010
1011    fn thread_finished(&self) -> bool {
1012        let playback_thread_exists = self.playback_thread_exists.load(Ordering::SeqCst);
1013        !playback_thread_exists
1014    }
1015
1016    /// Return true if playback has reached the end.
1017    pub fn is_finished(&self) -> bool {
1018        self.thread_finished()
1019        // let state = self.state.lock().unwrap();
1020        // *state == PlayerState::Finished
1021    }
1022
1023    /// Block the current thread until playback finishes.
1024    pub fn sleep_until_end(&self) {
1025        loop {
1026            if self.thread_finished() {
1027                break;
1028            }
1029            thread::sleep(Duration::from_millis(100));
1030        }
1031    }
1032
1033    /// Get the total duration (seconds) of the active selection.
1034    pub fn get_duration(&self) -> f64 {
1035        let duration = self.duration.lock().unwrap();
1036        *duration
1037    }
1038
1039    /// Seek to the given timestamp (seconds).
1040    pub fn seek(&mut self, ts: f64) {
1041        let mut timestamp = self.ts.lock().unwrap();
1042        *timestamp = ts;
1043        drop(timestamp);
1044
1045        let state = self.state.lock().unwrap().clone();
1046        let (seek_fade_out_ms, seek_fade_in_ms) = {
1047            let settings = self.buffer_settings.lock().unwrap();
1048            (settings.seek_fade_out_ms, settings.seek_fade_in_ms)
1049        };
1050        if matches!(state, PlayerState::Playing | PlayerState::Resuming) && seek_fade_out_ms > 0.0 {
1051            self.fade_current_sink_out(seek_fade_out_ms);
1052        }
1053        self.request_effects_reset();
1054
1055        self.kill_current();
1056        self.state.lock().unwrap().clone_from(&state);
1057        self.initialize_thread(Some(ts));
1058
1059        if matches!(state, PlayerState::Playing | PlayerState::Resuming) {
1060            *self.next_resume_fade_ms.lock().unwrap() = Some(seek_fade_in_ms);
1061            self.resume();
1062        }
1063    }
1064
1065    /// Apply a short linear fade-out to the current sink before disruptive ops.
1066    fn fade_current_sink_out(&self, fade_ms: f32) {
1067        let steps = ((fade_ms / 5.0).ceil() as u32).max(1);
1068        let step_ms = (fade_ms / steps as f32).max(1.0) as u64;
1069        let sink = self.sink.lock().unwrap();
1070        let start_volume = sink.volume().max(0.0);
1071        if start_volume <= 0.0 {
1072            return;
1073        }
1074        for step in 1..=steps {
1075            let t = step as f32 / steps as f32;
1076            let gain = start_volume * (1.0 - t);
1077            sink.set_volume(gain.max(0.0));
1078            thread::sleep(Duration::from_millis(step_ms));
1079        }
1080    }
1081
1082    /// Refresh active track selections from the underlying container.
1083    pub fn refresh_tracks(&mut self) {
1084        let mut prot = self.prot.lock().unwrap();
1085        prot.refresh_tracks();
1086        if let Some(spec) = self.impulse_response_override.clone() {
1087            prot.set_impulse_response_spec(spec);
1088        }
1089        if let Some(tail_db) = self.impulse_response_tail_override {
1090            prot.set_impulse_response_tail_db(tail_db);
1091        }
1092        drop(prot);
1093
1094        self.request_effects_reset();
1095        // If stopped, return
1096        if self.thread_finished() {
1097            return;
1098        }
1099
1100        // Kill current thread and start
1101        // new thread at the current timestamp
1102        let ts = self.get_time();
1103        self.seek(ts);
1104
1105        // If previously playing, resume
1106        if self.is_playing() {
1107            self.resume();
1108        }
1109
1110        self.wait_for_audio_heard(Duration::from_secs(5));
1111    }
1112
1113    fn wait_for_audio_heard(&self, timeout: Duration) -> bool {
1114        let start = Instant::now();
1115        loop {
1116            if self.audio_heard.load(Ordering::Relaxed) {
1117                return true;
1118            }
1119            if self.thread_finished() {
1120                warn!("playback thread ended before audio was heard");
1121                return false;
1122            }
1123            if start.elapsed() >= timeout {
1124                warn!("timed out waiting for audio to start");
1125                return false;
1126            }
1127            thread::sleep(Duration::from_millis(10));
1128        }
1129    }
1130
1131    /// Shuffle track selections and restart playback.
1132    pub fn shuffle(&mut self) {
1133        self.refresh_tracks();
1134    }
1135
1136    /// Set the playback volume (0.0-1.0).
1137    pub fn set_volume(&mut self, new_volume: f32) {
1138        let sink = self.sink.lock().unwrap();
1139        sink.set_volume(new_volume);
1140        drop(sink);
1141
1142        let mut volume = self.volume.lock().unwrap();
1143        *volume = new_volume;
1144        drop(volume);
1145    }
1146
1147    /// Get the current playback volume.
1148    pub fn get_volume(&self) -> f32 {
1149        *self.volume.lock().unwrap()
1150    }
1151
1152    /// Get the track identifiers used for display.
1153    pub fn get_ids(&self) -> Vec<String> {
1154        let prot = self.prot.lock().unwrap();
1155
1156        return prot.get_ids();
1157    }
1158
1159    /// Get the full timestamped shuffle schedule used by playback.
1160    ///
1161    /// Each entry is `(time_seconds, selected_ids_or_paths)`.
1162    pub fn get_shuffle_schedule(&self) -> Vec<(f64, Vec<String>)> {
1163        let prot = self.prot.lock().unwrap();
1164        prot.get_shuffle_schedule()
1165    }
1166
1167    /// Enable periodic reporting of playback status for UI consumers.
1168    pub fn set_reporting(
1169        &mut self,
1170        reporting: Arc<Mutex<dyn Fn(Report) + Send>>,
1171        reporting_interval: Duration,
1172    ) {
1173        if self.reporter.is_some() {
1174            self.reporter.as_ref().unwrap().lock().unwrap().stop();
1175        }
1176
1177        let reporter = Arc::new(Mutex::new(Reporter::new(
1178            Arc::new(Mutex::new(self.clone())),
1179            reporting,
1180            reporting_interval,
1181        )));
1182
1183        reporter.lock().unwrap().start();
1184
1185        self.reporter = Some(reporter);
1186    }
1187}
1188
/// Convert a linear amplitude to decibels relative to full scale.
///
/// Zero and negative inputs map to negative infinity; any other input yields
/// `20 * log10(value)`, so a NaN input yields NaN.
fn linear_to_dbfs(value: f32) -> f32 {
    match value {
        level if level <= 0.0 => f32::NEG_INFINITY,
        level => 20.0 * level.log10(),
    }
}
1196
/// Return the wall-clock time as milliseconds since the Unix epoch.
///
/// Yields 0 if the system clock reads earlier than the epoch.
fn now_ms() -> u64 {
    use std::time::SystemTime;

    match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}