Skip to main content

proteus_lib/playback/
player.rs

1//! High-level playback controller for the Proteus library.
2
3use rodio::buffer::SamplesBuffer;
4use rodio::{OutputStreamBuilder, Sink};
5use std::cell::Cell;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::{mpsc::RecvTimeoutError, Arc, Mutex};
8use std::thread;
9use std::time::{Duration, Instant};
10
11use log::{info, warn};
12
13use crate::container::prot::Prot;
14use crate::diagnostics::reporter::{Report, Reporter};
15use crate::dsp::effects::convolution_reverb::{parse_impulse_response_string, ImpulseResponseSpec};
16use crate::playback::output_meter::OutputMeter;
17use crate::tools::timer;
18use crate::{
19    container::info::Info,
20    dsp::effects::AudioEffect,
21    playback::engine::{DspChainMetrics, PlaybackBufferSettings, PlayerEngine},
22};
23
/// Coarse lifecycle state of a [`Player`].
///
/// The `-ing` variants are transitional requests: the playback thread observes
/// them and moves the state on once the requested action has been carried out.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PlayerState {
    /// Initial state value.
    Init,
    /// A resume was requested; the playback thread fades the sink in and then
    /// switches the state to [`PlayerState::Playing`].
    Resuming,
    /// Audio is playing.
    Playing,
    /// A pause was requested; the playback thread fades the sink out, pauses
    /// it and then switches the state to [`PlayerState::Paused`].
    Pausing,
    /// Playback is paused.
    Paused,
    /// A stop was requested.
    Stopping,
    /// Playback is stopped; this is the state a newly constructed player has.
    Stopped,
    /// Playback reached the end.
    Finished,
}
36
/// Point-in-time copy of the reverb settings, intended for UI consumers.
#[derive(Clone, Copy, Debug)]
pub struct ReverbSettingsSnapshot {
    /// Whether the reported reverb effect is enabled.
    pub enabled: bool,
    /// Wet/dry mix of the reported reverb effect.
    pub dry_wet: f32,
}
43
/// Refresh rate (Hz) passed to `OutputMeter::new` when a player is constructed.
const OUTPUT_METER_REFRESH_HZ: f32 = 30.0;
45
/// Primary playback controller.
///
/// `Player` owns the playback threads, buffering state, and runtime settings
/// such as volume and reverb configuration.
///
/// Most fields are `Arc`-wrapped so that clones of the player and the spawned
/// playback thread observe the same state.
#[derive(Clone)]
pub struct Player {
    /// Metadata for the loaded audio; supplies the channel count and sample rate.
    pub info: Info,
    /// Cleared every time a new playback thread is initialized.
    pub finished_tracks: Arc<Mutex<Vec<i32>>>,
    /// Current playback position in seconds, updated by the playback thread.
    pub ts: Arc<Mutex<f64>>,
    /// Current lifecycle state; the playback thread reacts to `Pausing`/`Resuming`.
    state: Arc<Mutex<PlayerState>>,
    /// Abort flag polled by the playback thread; replaced with a fresh flag per thread.
    abort: Arc<AtomicBool>,
    /// `true` while a playback thread is running; the thread clears it on exit.
    playback_thread_exists: Arc<AtomicBool>,
    /// Incremented for every playback thread; chunks from a stale id are not appended.
    playback_id: Arc<AtomicU64>,
    /// Total duration as reported by the engine once the playback thread starts.
    duration: Arc<Mutex<f64>>,
    /// The loaded container / file set.
    prot: Arc<Mutex<Prot>>,
    /// Set once the first chunk has been appended to the sink.
    audio_heard: Arc<AtomicBool>,
    /// Target sink volume (initialized to 0.8).
    volume: Arc<Mutex<f32>>,
    /// Output sink; the playback thread swaps in a sink connected to the output mixer.
    sink: Arc<Mutex<Sink>>,
    /// Optional diagnostics reporter (initialized to `None`).
    reporter: Option<Arc<Mutex<Reporter>>>,
    /// Buffering/startup tuning shared with the engine and playback thread.
    buffer_settings: Arc<Mutex<PlaybackBufferSettings>>,
    /// Active DSP effects chain.
    effects: Arc<Mutex<Vec<AudioEffect>>>,
    /// Latest DSP chain performance metrics.
    dsp_metrics: Arc<Mutex<DspChainMetrics>>,
    /// Counter bumped whenever an effects reset is requested.
    effects_reset: Arc<AtomicU64>,
    /// Per-channel output level meter.
    output_meter: Arc<Mutex<OutputMeter>>,
    /// Set once the engine's receive loop has ended.
    buffering_done: Arc<AtomicBool>,
    /// `now_ms()` timestamp of the most recently appended chunk.
    last_chunk_ms: Arc<AtomicU64>,
    /// `now_ms()` timestamp of the most recent playback-time update.
    last_time_update_ms: Arc<AtomicU64>,
    /// Impulse response spec set via `set_impulse_response_spec`, if any.
    impulse_response_override: Option<ImpulseResponseSpec>,
    /// Impulse response tail trim (dB) set via `set_impulse_response_tail_db`, if any.
    impulse_response_tail_override: Option<f32>,
}
76
77impl Player {
78    /// Create a new player for a single container path.
79    pub fn new(file_path: &String) -> Self {
80        let this = Self::new_from_path_or_paths(Some(file_path), None);
81        this
82    }
83
84    /// Create a new player for a set of standalone file paths.
85    pub fn new_from_file_paths(file_paths: &Vec<Vec<String>>) -> Self {
86        let this = Self::new_from_path_or_paths(None, Some(file_paths));
87        this
88    }
89
90    /// Create a player from either a container path or standalone file paths.
91    pub fn new_from_path_or_paths(path: Option<&String>, paths: Option<&Vec<Vec<String>>>) -> Self {
92        let (prot, info) = match path {
93            Some(path) => {
94                let prot = Arc::new(Mutex::new(Prot::new(path)));
95                let info = Info::new(path.clone());
96                (prot, info)
97            }
98            None => {
99                let prot = Arc::new(Mutex::new(Prot::new_from_file_paths(paths.unwrap())));
100                let locked_prot = prot.lock().unwrap();
101                let info = Info::new_from_file_paths(locked_prot.get_file_paths_dictionary());
102                drop(locked_prot);
103                (prot, info)
104            }
105        };
106
107        let (sink, _queue) = Sink::new();
108        let sink: Arc<Mutex<Sink>> = Arc::new(Mutex::new(sink));
109
110        let channels = info.channels as usize;
111        let sample_rate = info.sample_rate;
112        let effects = {
113            let prot_locked = prot.lock().unwrap();
114            match prot_locked.get_effects() {
115                Some(effects) => Arc::new(Mutex::new(effects)),
116                None => Arc::new(Mutex::new(vec![])),
117            }
118        };
119
120        let mut this = Self {
121            info,
122            finished_tracks: Arc::new(Mutex::new(Vec::new())),
123            state: Arc::new(Mutex::new(PlayerState::Stopped)),
124            abort: Arc::new(AtomicBool::new(false)),
125            ts: Arc::new(Mutex::new(0.0)),
126            playback_thread_exists: Arc::new(AtomicBool::new(true)),
127            playback_id: Arc::new(AtomicU64::new(0)),
128            duration: Arc::new(Mutex::new(0.0)),
129            audio_heard: Arc::new(AtomicBool::new(false)),
130            volume: Arc::new(Mutex::new(0.8)),
131            sink,
132            prot,
133            reporter: None,
134            buffer_settings: Arc::new(Mutex::new(PlaybackBufferSettings::new(20.0))),
135            effects,
136            dsp_metrics: Arc::new(Mutex::new(DspChainMetrics::default())),
137            effects_reset: Arc::new(AtomicU64::new(0)),
138            output_meter: Arc::new(Mutex::new(OutputMeter::new(
139                channels,
140                sample_rate,
141                OUTPUT_METER_REFRESH_HZ,
142            ))),
143            buffering_done: Arc::new(AtomicBool::new(false)),
144            last_chunk_ms: Arc::new(AtomicU64::new(0)),
145            last_time_update_ms: Arc::new(AtomicU64::new(0)),
146            impulse_response_override: None,
147            impulse_response_tail_override: None,
148        };
149
150        this.initialize_thread(None);
151
152        this
153    }
154
155    /// Override the impulse response used for convolution reverb.
156    pub fn set_impulse_response_spec(&mut self, spec: ImpulseResponseSpec) {
157        self.impulse_response_override = Some(spec.clone());
158        let mut prot = self.prot.lock().unwrap();
159        prot.set_impulse_response_spec(spec);
160        self.request_effects_reset();
161    }
162
163    /// Parse and apply an impulse response spec string.
164    pub fn set_impulse_response_from_string(&mut self, value: &str) {
165        if let Some(spec) = parse_impulse_response_string(value) {
166            self.set_impulse_response_spec(spec);
167        }
168    }
169
170    /// Override the impulse response tail trim (dB).
171    pub fn set_impulse_response_tail_db(&mut self, tail_db: f32) {
172        self.impulse_response_tail_override = Some(tail_db);
173        let mut prot = self.prot.lock().unwrap();
174        prot.set_impulse_response_tail_db(tail_db);
175        self.request_effects_reset();
176    }
177
178    /// Enable or disable convolution reverb.
179    pub fn set_reverb_enabled(&self, enabled: bool) {
180        let mut effects = self.effects.lock().unwrap();
181        if let Some(effect) = effects
182            .iter_mut()
183            .find_map(|effect| effect.as_convolution_reverb_mut())
184        {
185            effect.enabled = enabled;
186        }
187        if let Some(effect) = effects
188            .iter_mut()
189            .find_map(|effect| effect.as_delay_reverb_mut())
190        {
191            effect.enabled = enabled;
192        }
193    }
194
195    /// Set the reverb wet/dry mix (clamped to `[0.0, 1.0]`).
196    pub fn set_reverb_mix(&self, dry_wet: f32) {
197        let mut effects = self.effects.lock().unwrap();
198        if let Some(effect) = effects
199            .iter_mut()
200            .find_map(|effect| effect.as_convolution_reverb_mut())
201        {
202            effect.dry_wet = dry_wet.clamp(0.0, 1.0);
203        }
204        if let Some(effect) = effects
205            .iter_mut()
206            .find_map(|effect| effect.as_delay_reverb_mut())
207        {
208            effect.mix = dry_wet.clamp(0.0, 1.0);
209        }
210        if let Some(effect) = effects
211            .iter_mut()
212            .find_map(|effect| effect.as_diffusion_reverb_mut())
213        {
214            effect.mix = dry_wet.clamp(0.0, 1.0);
215        }
216    }
217
218    /// Retrieve the current reverb settings snapshot.
219    pub fn get_reverb_settings(&self) -> ReverbSettingsSnapshot {
220        let effects = self.effects.lock().unwrap();
221        if let Some(effect) = effects
222            .iter()
223            .find_map(|effect| effect.as_convolution_reverb())
224        {
225            return ReverbSettingsSnapshot {
226                enabled: effect.enabled,
227                dry_wet: effect.dry_wet,
228            };
229        }
230        if let Some(effect) = effects
231            .iter()
232            .find_map(|effect| effect.as_diffusion_reverb())
233        {
234            return ReverbSettingsSnapshot {
235                enabled: effect.enabled,
236                dry_wet: effect.mix,
237            };
238        }
239        if let Some(effect) = effects.iter().find_map(|effect| effect.as_delay_reverb()) {
240            return ReverbSettingsSnapshot {
241                enabled: effect.enabled,
242                dry_wet: effect.mix,
243            };
244        }
245        ReverbSettingsSnapshot {
246            enabled: false,
247            dry_wet: 0.0,
248        }
249    }
250
251    /// Snapshot the active effect chain names.
252    #[allow(deprecated)]
253    pub fn get_effect_names(&self) -> Vec<String> {
254        let effects = self.effects.lock().unwrap();
255        effects
256            .iter()
257            .map(|effect| match effect {
258                AudioEffect::DelayReverb(_) => "DelayReverb".to_string(),
259                AudioEffect::BasicReverb(_) => "DelayReverb".to_string(),
260                AudioEffect::DiffusionReverb(_) => "DiffusionReverb".to_string(),
261                AudioEffect::ConvolutionReverb(_) => "ConvolutionReverb".to_string(),
262                AudioEffect::LowPassFilter(_) => "LowPassFilter".to_string(),
263                AudioEffect::HighPassFilter(_) => "HighPassFilter".to_string(),
264                AudioEffect::Distortion(_) => "Distortion".to_string(),
265                AudioEffect::Compressor(_) => "Compressor".to_string(),
266                AudioEffect::Limiter(_) => "Limiter".to_string(),
267            })
268            .collect()
269    }
270
271    /// Replace the active DSP effects chain.
272    pub fn set_effects(&mut self, effects: Vec<AudioEffect>) {
273        {
274            let mut guard = self.effects.lock().unwrap();
275            println!("New Effects: {:?}", effects);
276            *guard = effects;
277        }
278        self.request_effects_reset();
279
280        // Seeking to the current time stamp refreshes the
281        // Sink so that the new effects are applied immediately.
282        if !self.thread_finished() {
283            let ts = self.get_time();
284            self.seek(ts);
285        }
286    }
287
288    /// Retrieve the latest DSP chain performance metrics.
289    pub fn get_dsp_metrics(&self) -> DspChainMetrics {
290        *self.dsp_metrics.lock().unwrap()
291    }
292
293    /// Retrieve the most recent per-channel peak levels.
294    pub fn get_levels(&self) -> Vec<f32> {
295        self.output_meter.lock().unwrap().levels()
296    }
297
298    /// Retrieve the most recent per-channel peak levels in dBFS.
299    pub fn get_levels_db(&self) -> Vec<f32> {
300        self.output_meter
301            .lock()
302            .unwrap()
303            .levels()
304            .into_iter()
305            .map(linear_to_dbfs)
306            .collect()
307    }
308
309    /// Retrieve the most recent per-channel average levels.
310    pub fn get_levels_avg(&self) -> Vec<f32> {
311        self.output_meter.lock().unwrap().averages()
312    }
313
314    /// Set the output meter refresh rate (frames per second).
315    pub fn set_output_meter_refresh_hz(&self, hz: f32) {
316        self.output_meter.lock().unwrap().set_refresh_hz(hz);
317    }
318
319    /// Debug helper returning thread alive, state, and audio heard flags.
320    pub fn debug_playback_state(&self) -> (bool, PlayerState, bool) {
321        (
322            self.playback_thread_exists.load(Ordering::SeqCst),
323            *self.state.lock().unwrap(),
324            self.audio_heard.load(Ordering::Relaxed),
325        )
326    }
327
328    /// Debug helper indicating whether buffering has completed.
329    pub fn debug_buffering_done(&self) -> bool {
330        self.buffering_done.load(Ordering::Relaxed)
331    }
332
333    /// Debug helper returning internal timing markers in milliseconds.
334    pub fn debug_timing_ms(&self) -> (u64, u64) {
335        (
336            self.last_chunk_ms.load(Ordering::Relaxed),
337            self.last_time_update_ms.load(Ordering::Relaxed),
338        )
339    }
340
341    /// Debug helper returning sink paused/empty flags and queued length.
342    pub fn debug_sink_state(&self) -> (bool, bool, usize) {
343        let sink = self.sink.lock().unwrap();
344        let paused = sink.is_paused();
345        let empty = sink.empty();
346        let len = sink.len();
347        (paused, empty, len)
348    }
349
350    fn request_effects_reset(&self) {
351        self.effects_reset.fetch_add(1, Ordering::SeqCst);
352    }
353
354    /// Configure the minimum buffered audio (ms) before playback starts.
355    pub fn set_start_buffer_ms(&self, start_buffer_ms: f32) {
356        let mut settings = self.buffer_settings.lock().unwrap();
357        settings.start_buffer_ms = start_buffer_ms.max(0.0);
358    }
359
360    /// Configure heuristic end-of-track threshold for containers (ms).
361    pub fn set_track_eos_ms(&self, track_eos_ms: f32) {
362        let mut settings = self.buffer_settings.lock().unwrap();
363        settings.track_eos_ms = track_eos_ms.max(0.0);
364    }
365
366    /// Configure minimum sink chunks queued before playback starts/resumes.
367    pub fn set_start_sink_chunks(&self, chunks: usize) {
368        let mut settings = self.buffer_settings.lock().unwrap();
369        settings.start_sink_chunks = chunks;
370    }
371
372    /// Configure the startup silence pre-roll (ms).
373    pub fn set_startup_silence_ms(&self, ms: f32) {
374        let mut settings = self.buffer_settings.lock().unwrap();
375        settings.startup_silence_ms = ms.max(0.0);
376    }
377
378    /// Configure the startup fade-in length (ms).
379    pub fn set_startup_fade_ms(&self, ms: f32) {
380        let mut settings = self.buffer_settings.lock().unwrap();
381        settings.startup_fade_ms = ms.max(0.0);
382    }
383
384    /// Configure the append jitter logging threshold (ms). 0 disables logging.
385    pub fn set_append_jitter_log_ms(&self, ms: f32) {
386        let mut settings = self.buffer_settings.lock().unwrap();
387        settings.append_jitter_log_ms = ms.max(0.0);
388    }
389
390    /// Enable or disable per-effect boundary discontinuity logging.
391    pub fn set_effect_boundary_log(&self, enabled: bool) {
392        let mut settings = self.buffer_settings.lock().unwrap();
393        settings.effect_boundary_log = enabled;
394    }
395
396    fn initialize_thread(&mut self, ts: Option<f64>) {
397        // Empty finished_tracks
398        let mut finished_tracks = self.finished_tracks.lock().unwrap();
399        finished_tracks.clear();
400        drop(finished_tracks);
401
402        // ===== Set play options ===== //
403        // Use a fresh abort flag per playback thread so old mixer threads stay stopped.
404        self.abort = Arc::new(AtomicBool::new(false));
405        self.playback_thread_exists.store(true, Ordering::SeqCst);
406        let playback_id = self.playback_id.fetch_add(1, Ordering::SeqCst) + 1;
407        self.buffering_done.store(false, Ordering::SeqCst);
408        let now_ms_value = now_ms();
409        self.last_chunk_ms.store(now_ms_value, Ordering::Relaxed);
410        self.last_time_update_ms
411            .store(now_ms_value, Ordering::Relaxed);
412
413        // ===== Clone variables ===== //
414        let play_state = self.state.clone();
415        let abort = self.abort.clone();
416        let playback_thread_exists = self.playback_thread_exists.clone();
417        let playback_id_atomic = self.playback_id.clone();
418        let time_passed = self.ts.clone();
419
420        let duration = self.duration.clone();
421        let prot = self.prot.clone();
422        let buffer_settings = self.buffer_settings.clone();
423        let buffer_settings_for_state = self.buffer_settings.clone();
424        let effects = self.effects.clone();
425        let dsp_metrics = self.dsp_metrics.clone();
426        let dsp_metrics_for_sink = self.dsp_metrics.clone();
427        let effects_reset = self.effects_reset.clone();
428        let output_meter = self.output_meter.clone();
429        let audio_info = self.info.clone();
430
431        let audio_heard = self.audio_heard.clone();
432        let volume = self.volume.clone();
433        let sink_mutex = self.sink.clone();
434        let buffer_done_thread_flag = self.buffering_done.clone();
435        let last_chunk_ms = self.last_chunk_ms.clone();
436        let last_time_update_ms = self.last_time_update_ms.clone();
437
438        audio_heard.store(false, Ordering::Relaxed);
439
440        {
441            let mut meter = self.output_meter.lock().unwrap();
442            meter.reset();
443        }
444
445        // ===== Start playback ===== //
446        thread::spawn(move || {
447            // ===================== //
448            // Set playback_thread_exists to true
449            // ===================== //
450            playback_thread_exists.store(true, Ordering::Relaxed);
451
452            // ===================== //
453            // Initialize engine & sink
454            // ===================== //
455            let start_time = match ts {
456                Some(ts) => ts,
457                None => 0.0,
458            };
459            let mut engine = PlayerEngine::new(
460                prot,
461                Some(abort.clone()),
462                start_time,
463                buffer_settings,
464                effects,
465                dsp_metrics,
466                effects_reset,
467            );
468            let _stream = OutputStreamBuilder::open_default_stream().unwrap();
469            let mixer = _stream.mixer().clone();
470
471            let mut sink = sink_mutex.lock().unwrap();
472            *sink = Sink::connect_new(&mixer);
473            sink.pause();
474            sink.set_volume(*volume.lock().unwrap());
475            drop(sink);
476
477            // ===================== //
478            // Set duration from engine
479            // ===================== //
480            let mut duration = duration.lock().unwrap();
481            *duration = engine.get_duration();
482            drop(duration);
483
484            // ===================== //
485            // Initialize chunk_lengths & time_passed
486            // ===================== //
487            let chunk_lengths = Arc::new(Mutex::new(Vec::new()));
488            let mut time_passed_unlocked = time_passed.lock().unwrap();
489            *time_passed_unlocked = start_time;
490            drop(time_passed_unlocked);
491
492            let pause_sink = |sink: &Sink, fade_length_out_seconds: f32| {
493                let timestamp = *time_passed.lock().unwrap();
494
495                let fade_increments = sink.volume() / (fade_length_out_seconds * 100.0);
496                // Fade out and pause sink
497                while sink.volume() > 0.0 && timestamp != start_time {
498                    sink.set_volume(sink.volume() - fade_increments);
499                    thread::sleep(Duration::from_millis(10));
500                }
501                sink.pause();
502            };
503
504            let resume_sink = |sink: &Sink, fade_length_in_seconds: f32| {
505                let volume = *volume.lock().unwrap();
506                if fade_length_in_seconds <= 0.0 {
507                    sink.play();
508                    sink.set_volume(volume);
509                    return;
510                }
511                let fade_increments = (volume - sink.volume()) / (fade_length_in_seconds * 100.0);
512                // Fade in and play sink
513                sink.play();
514                while sink.volume() < volume {
515                    sink.set_volume(sink.volume() + fade_increments);
516                    thread::sleep(Duration::from_millis(5));
517                }
518            };
519
520            // ===================== //
521            // Start sink with startup silence + fade in
522            // ===================== //
523            {
524                let startup_settings = buffer_settings_for_state.lock().unwrap();
525                let startup_silence_ms = startup_settings.startup_silence_ms;
526                drop(startup_settings);
527
528                let sample_rate = audio_info.sample_rate as u32;
529                let channels = audio_info.channels as u16;
530
531                if startup_silence_ms > 0.0 {
532                    let samples = ((startup_silence_ms / 1000.0) * sample_rate as f32).ceil()
533                        as usize
534                        * channels as usize;
535                    let silence = vec![0.0_f32; samples.max(1)];
536                    let silence_buffer = SamplesBuffer::new(channels, sample_rate, silence);
537                    let sink = sink_mutex.lock().unwrap();
538                    sink.append(silence_buffer);
539                    drop(sink);
540                }
541
542            }
543
544            // ===================== //
545            // Check if the player should be paused or not
546            // ===================== //
547            let startup_fade_pending = Cell::new(true);
548            let check_details = || {
549                if abort.load(Ordering::SeqCst) {
550                    let sink = sink_mutex.lock().unwrap();
551                    pause_sink(&sink, 0.1);
552                    sink.clear();
553                    drop(sink);
554
555                    return false;
556                }
557
558                let sink = sink_mutex.lock().unwrap();
559                let state = play_state.lock().unwrap().clone();
560                let start_sink_chunks = buffer_settings_for_state.lock().unwrap().start_sink_chunks;
561                if state == PlayerState::Resuming
562                    && start_sink_chunks > 0
563                    && sink.len() < start_sink_chunks
564                {
565                    // Keep paused until enough chunks are queued.
566                    sink.pause();
567                    drop(sink);
568                    return true;
569                }
570                if state == PlayerState::Pausing {
571                    pause_sink(&sink, 0.1);
572                    play_state.lock().unwrap().clone_from(&PlayerState::Paused);
573                }
574                if state == PlayerState::Resuming {
575                    let fade_length = if startup_fade_pending.replace(false) {
576                        let startup_fade_ms = buffer_settings_for_state.lock().unwrap().startup_fade_ms;
577                        (startup_fade_ms / 1000.0).max(0.0)
578                    } else {
579                        0.1
580                    };
581                    resume_sink(&sink, fade_length);
582                    play_state.lock().unwrap().clone_from(&PlayerState::Playing);
583                }
584                drop(sink);
585
586                true
587            };
588
589            // ===================== //
590            // Update chunk_lengths / time_passed
591            // ===================== //
592            let time_chunks_mutex = Arc::new(Mutex::new(start_time));
593            let timer_mut = Arc::new(Mutex::new(timer::Timer::new()));
594            let buffering_done = Arc::new(AtomicBool::new(false));
595            let buffering_done_flag = buffer_done_thread_flag.clone();
596            let final_duration = Arc::new(Mutex::new(None::<f64>));
597            let mut timer = timer_mut.lock().unwrap();
598            timer.start();
599            drop(timer);
600
601            let last_meter_time = Cell::new(0.0_f64);
602            let update_chunk_lengths = || {
603                if abort.load(Ordering::SeqCst) {
604                    return;
605                }
606
607                let mut chunk_lengths = chunk_lengths.lock().unwrap();
608                let mut time_passed_unlocked = time_passed.lock().unwrap();
609                let mut time_chunks_passed = time_chunks_mutex.lock().unwrap();
610                let mut timer = timer_mut.lock().unwrap();
611                last_time_update_ms.store(now_ms(), Ordering::Relaxed);
612                let sink = sink_mutex.lock().unwrap();
613                if !buffering_done.load(Ordering::Relaxed) {
614                    // Check how many chunks have been played (chunk_lengths.len() - sink.len())
615                    // since the last time this function was called and add that to time_passed.
616                    let chunks_played = chunk_lengths.len().saturating_sub(sink.len());
617
618                    for _ in 0..chunks_played {
619                        timer.reset();
620                        timer.start();
621                        *time_chunks_passed += chunk_lengths.remove(0);
622                    }
623                }
624
625                if sink.is_paused() {
626                    timer.pause();
627                } else {
628                    timer.un_pause();
629                }
630
631                let current_audio_time = *time_chunks_passed + timer.get_time().as_secs_f64();
632                let delta = (current_audio_time - last_meter_time.get()).max(0.0);
633                last_meter_time.set(current_audio_time);
634                {
635                    let mut meter = output_meter.lock().unwrap();
636                    meter.advance(delta);
637                }
638
639                *time_passed_unlocked = current_audio_time;
640
641                drop(sink);
642                drop(chunk_lengths);
643                drop(time_passed_unlocked);
644                drop(time_chunks_passed);
645                drop(timer);
646            };
647
648            // ===================== //
649            // Update sink for each chunk received from engine
650            // ===================== //
651            let append_timing = Arc::new(Mutex::new((Instant::now(), 0.0_f64, 0_u64, 0.0_f64)));
652            let update_sink = |(mixer, length_in_seconds): (SamplesBuffer, f64)| {
653                if playback_id_atomic.load(Ordering::SeqCst) != playback_id {
654                    return;
655                }
656                let (delay_ms, late) = {
657                    let mut timing = append_timing.lock().unwrap();
658                    let now = Instant::now();
659                    let delta_ms = now.duration_since(timing.0).as_secs_f64() * 1000.0;
660                    let chunk_ms = length_in_seconds * 1000.0;
661                    let late = delta_ms > (chunk_ms * 1.2) && chunk_ms > 0.0;
662                    if late {
663                        timing.2 = timing.2.saturating_add(1);
664                    }
665                    timing.1 = if timing.1 == 0.0 {
666                        delta_ms
667                    } else {
668                        (timing.1 * 0.9) + (delta_ms * 0.1)
669                    };
670                    timing.3 = timing.3.max(delta_ms);
671                    timing.0 = now;
672                    (delta_ms, late)
673                };
674                audio_heard.store(true, Ordering::Relaxed);
675                last_chunk_ms.store(now_ms(), Ordering::Relaxed);
676
677                {
678                    let mut meter = output_meter.lock().unwrap();
679                    meter.push_samples(&mixer);
680                }
681                {
682                    let mut metrics = dsp_metrics_for_sink.lock().unwrap();
683                    metrics.append_delay_ms = delay_ms;
684                    metrics.avg_append_delay_ms = {
685                        if metrics.avg_append_delay_ms == 0.0 {
686                            delay_ms
687                        } else {
688                            (metrics.avg_append_delay_ms * 0.9) + (delay_ms * 0.1)
689                        }
690                    };
691                    metrics.max_append_delay_ms = metrics.max_append_delay_ms.max(delay_ms);
692                    metrics.late_append_count = {
693                        let timing = append_timing.lock().unwrap();
694                        timing.2
695                    };
696                    metrics.late_append_active = late;
697                }
698
699                let sink = sink_mutex.lock().unwrap();
700                let append_jitter_log_ms = {
701                    let settings = buffer_settings_for_state.lock().unwrap();
702                    settings.append_jitter_log_ms
703                };
704                if append_jitter_log_ms > 0.0 && (late || delay_ms > append_jitter_log_ms as f64) {
705                    let expected_ms = length_in_seconds * 1000.0;
706                    log::info!(
707                        "append jitter: delta={:.2}ms expected={:.2}ms late={} threshold={:.2}ms sink_len={}",
708                        delay_ms,
709                        expected_ms,
710                        late,
711                        append_jitter_log_ms,
712                        sink.len()
713                    );
714                }
715                let mut chunk_lengths = chunk_lengths.lock().unwrap();
716
717                sink.append(mixer);
718
719                drop(sink);
720
721                chunk_lengths.push(length_in_seconds);
722                drop(chunk_lengths);
723
724                update_chunk_lengths();
725                check_details();
726            };
727
728            let receiver = engine.start_receiver();
729            loop {
730                match receiver.recv_timeout(Duration::from_millis(20)) {
731                    Ok(chunk) => {
732                        update_sink(chunk);
733                    }
734                    Err(RecvTimeoutError::Timeout) => {
735                        update_chunk_lengths();
736                        if !check_details() {
737                            break;
738                        }
739                    }
740                    Err(RecvTimeoutError::Disconnected) => break,
741                }
742            }
743            #[cfg(feature = "debug")]
744            log::info!("engine reception loop finished");
745
746            // From here on, all audio is buffered. Stop relying on sink.len()
747            // to advance time so the UI keeps updating while the last buffer plays.
748            buffering_done.store(true, Ordering::Relaxed);
749            buffering_done_flag.store(true, Ordering::Relaxed);
750            {
751                let mut final_duration = final_duration.lock().unwrap();
752                if final_duration.is_none() {
753                    let chunk_lengths = chunk_lengths.lock().unwrap();
754                    let time_chunks_passed = time_chunks_mutex.lock().unwrap();
755                    *final_duration = Some(*time_chunks_passed + chunk_lengths.iter().sum::<f64>());
756                }
757            }
758
759            // ===================== //
760            // Wait until all tracks are finished playing in sink
761            // ===================== //
762            #[cfg(feature = "debug")]
763            {
764                let sink = sink_mutex.lock().unwrap();
765                let paused = sink.is_paused();
766                let empty = sink.empty();
767                let sink_len = sink.len();
768                drop(sink);
769                let time_passed = *time_passed.lock().unwrap();
770                let final_duration = *final_duration.lock().unwrap();
771                log::info!(
772                    "Starting drain loop: paused={} empty={} sink_len={} time={:.3} final={:?}",
773                    paused,
774                    empty,
775                    sink_len,
776                    time_passed,
777                    final_duration
778                );
779            }
780
781            loop {
782                update_chunk_lengths();
783                if !check_details() {
784                    break;
785                }
786
787                let done = if engine.finished_buffering() {
788                    if let Some(final_duration) = *final_duration.lock().unwrap() {
789                        let time_passed = *time_passed.lock().unwrap();
790                        time_passed >= (final_duration - 0.001).max(0.0)
791                    } else {
792                        false
793                    }
794                } else {
795                    false
796                };
797                if done {
798                    break;
799                }
800
801                thread::sleep(Duration::from_millis(10));
802            }
803
804            #[cfg(feature = "debug")]
805            log::info!("Finished drain loop!");
806
807            // ===================== //
808            // Set playback_thread_exists to false
809            // ===================== //
810            playback_thread_exists.store(false, Ordering::Relaxed);
811        });
812    }
813
814    /// Start playback from a specific timestamp (seconds).
815    pub fn play_at(&mut self, ts: f64) {
816        let mut timestamp = self.ts.lock().unwrap();
817        *timestamp = ts;
818        drop(timestamp);
819
820        self.request_effects_reset();
821        self.kill_current();
822        // self.stop.store(false, Ordering::SeqCst);
823        self.initialize_thread(Some(ts));
824
825        self.resume();
826
827        self.wait_for_audio_heard(Duration::from_secs(5));
828    }
829
830    /// Start playback from the current timestamp.
831    pub fn play(&mut self) {
832        info!("Playing audio");
833        let thread_exists = self.playback_thread_exists.load(Ordering::SeqCst);
834        // self.stop.store(false, Ordering::SeqCst);
835
836        if !thread_exists {
837            self.initialize_thread(None);
838        }
839
840        self.resume();
841
842        self.wait_for_audio_heard(Duration::from_secs(5));
843    }
844
845    /// Pause playback.
846    pub fn pause(&self) {
847        self.state.lock().unwrap().clone_from(&PlayerState::Pausing);
848    }
849
850    /// Resume playback if paused.
851    pub fn resume(&self) {
852        self.state
853            .lock()
854            .unwrap()
855            .clone_from(&PlayerState::Resuming);
856    }
857
858    /// Stop the current playback thread without changing state.
859    pub fn kill_current(&self) {
860        self.state
861            .lock()
862            .unwrap()
863            .clone_from(&PlayerState::Stopping);
864        {
865            let sink = self.sink.lock().unwrap();
866            sink.stop();
867        }
868        self.abort.store(true, Ordering::SeqCst);
869
870        while !self.thread_finished() {
871            thread::sleep(Duration::from_millis(10));
872        }
873
874        self.state.lock().unwrap().clone_from(&PlayerState::Stopped);
875    }
876
877    /// Stop playback and reset timing state.
878    pub fn stop(&self) {
879        self.kill_current();
880        self.ts.lock().unwrap().clone_from(&0.0);
881    }
882
883    /// Return true if playback is currently active.
884    pub fn is_playing(&self) -> bool {
885        let state = self.state.lock().unwrap();
886        *state == PlayerState::Playing
887    }
888
889    /// Return true if playback is currently paused.
890    pub fn is_paused(&self) -> bool {
891        let state = self.state.lock().unwrap();
892        *state == PlayerState::Paused
893    }
894
895    /// Get the current playback time in seconds.
896    pub fn get_time(&self) -> f64 {
897        let ts = self.ts.lock().unwrap();
898        *ts
899    }
900
901    fn thread_finished(&self) -> bool {
902        let playback_thread_exists = self.playback_thread_exists.load(Ordering::SeqCst);
903        !playback_thread_exists
904    }
905
906    /// Return true if playback has reached the end.
907    pub fn is_finished(&self) -> bool {
908        self.thread_finished()
909        // let state = self.state.lock().unwrap();
910        // *state == PlayerState::Finished
911    }
912
913    /// Block the current thread until playback finishes.
914    pub fn sleep_until_end(&self) {
915        loop {
916            if self.thread_finished() {
917                break;
918            }
919            thread::sleep(Duration::from_millis(100));
920        }
921    }
922
923    /// Get the total duration (seconds) of the active selection.
924    pub fn get_duration(&self) -> f64 {
925        let duration = self.duration.lock().unwrap();
926        *duration
927    }
928
929    /// Seek to the given timestamp (seconds).
930    pub fn seek(&mut self, ts: f64) {
931        let mut timestamp = self.ts.lock().unwrap();
932        *timestamp = ts;
933        drop(timestamp);
934
935        self.request_effects_reset();
936        let state = self.state.lock().unwrap().clone();
937
938        self.kill_current();
939        self.state.lock().unwrap().clone_from(&state);
940        self.initialize_thread(Some(ts));
941
942        if state == PlayerState::Playing {
943            self.resume();
944        }
945    }
946
947    /// Refresh active track selections from the underlying container.
948    pub fn refresh_tracks(&mut self) {
949        let mut prot = self.prot.lock().unwrap();
950        prot.refresh_tracks();
951        if let Some(spec) = self.impulse_response_override.clone() {
952            prot.set_impulse_response_spec(spec);
953        }
954        if let Some(tail_db) = self.impulse_response_tail_override {
955            prot.set_impulse_response_tail_db(tail_db);
956        }
957        drop(prot);
958
959        self.request_effects_reset();
960        // If stopped, return
961        if self.thread_finished() {
962            return;
963        }
964
965        // Kill current thread and start
966        // new thread at the current timestamp
967        let ts = self.get_time();
968        self.seek(ts);
969
970        // If previously playing, resume
971        if self.is_playing() {
972            self.resume();
973        }
974
975        self.wait_for_audio_heard(Duration::from_secs(5));
976    }
977
978    fn wait_for_audio_heard(&self, timeout: Duration) -> bool {
979        let start = Instant::now();
980        loop {
981            if self.audio_heard.load(Ordering::Relaxed) {
982                return true;
983            }
984            if self.thread_finished() {
985                warn!("playback thread ended before audio was heard");
986                return false;
987            }
988            if start.elapsed() >= timeout {
989                warn!("timed out waiting for audio to start");
990                return false;
991            }
992            thread::sleep(Duration::from_millis(10));
993        }
994    }
995
996    /// Shuffle track selections and restart playback.
997    pub fn shuffle(&mut self) {
998        self.refresh_tracks();
999    }
1000
1001    /// Set the playback volume (0.0-1.0).
1002    pub fn set_volume(&mut self, new_volume: f32) {
1003        let sink = self.sink.lock().unwrap();
1004        sink.set_volume(new_volume);
1005        drop(sink);
1006
1007        let mut volume = self.volume.lock().unwrap();
1008        *volume = new_volume;
1009        drop(volume);
1010    }
1011
1012    /// Get the current playback volume.
1013    pub fn get_volume(&self) -> f32 {
1014        *self.volume.lock().unwrap()
1015    }
1016
1017    /// Get the track identifiers used for display.
1018    pub fn get_ids(&self) -> Vec<String> {
1019        let prot = self.prot.lock().unwrap();
1020
1021        return prot.get_ids();
1022    }
1023
1024    /// Enable periodic reporting of playback status for UI consumers.
1025    pub fn set_reporting(
1026        &mut self,
1027        reporting: Arc<Mutex<dyn Fn(Report) + Send>>,
1028        reporting_interval: Duration,
1029    ) {
1030        if self.reporter.is_some() {
1031            self.reporter.as_ref().unwrap().lock().unwrap().stop();
1032        }
1033
1034        let reporter = Arc::new(Mutex::new(Reporter::new(
1035            Arc::new(Mutex::new(self.clone())),
1036            reporting,
1037            reporting_interval,
1038        )));
1039
1040        reporter.lock().unwrap().start();
1041
1042        self.reporter = Some(reporter);
1043    }
1044}
1045
/// Convert a linear amplitude to decibels via `20 * log10(value)`.
///
/// Zero and negative inputs map to negative infinity. NaN fails the `<= 0.0`
/// guard and therefore propagates through the logarithm as NaN.
fn linear_to_dbfs(value: f32) -> f32 {
    // Guard clause: the logarithm is only taken for strictly positive (or NaN) input.
    if value <= 0.0 {
        return f32::NEG_INFINITY;
    }

    let decades = value.log10();
    20.0 * decades
}
1053
/// Return the wall-clock time as whole milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    use std::time::SystemTime;

    match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        // `as_millis` returns `u128`; the cast keeps the low 64 bits.
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}