Skip to main content

proteus_lib/playback/
player.rs

1//! High-level playback controller for the Proteus library.
2
3use rodio::buffer::SamplesBuffer;
4use rodio::{OutputStreamBuilder, Sink};
5use std::cell::Cell;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::{mpsc::RecvTimeoutError, Arc, Mutex};
8use std::thread;
9use std::time::{Duration, Instant};
10
11use log::{info, warn};
12
13use crate::container::prot::Prot;
14use crate::diagnostics::reporter::{Report, Reporter};
15use crate::dsp::effects::convolution_reverb::{parse_impulse_response_string, ImpulseResponseSpec};
16use crate::playback::output_meter::OutputMeter;
17use crate::tools::timer;
18use crate::{
19    container::info::Info,
20    dsp::effects::AudioEffect,
21    playback::engine::{DspChainMetrics, PlaybackBufferSettings, PlayerEngine},
22};
23
/// High-level playback state for the player.
///
/// `Pausing` and `Resuming` are transitional requests: the playback thread
/// fades the sink out or in and then stores `Paused` or `Playing`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PlayerState {
    Init,
    /// Resume requested; the playback thread fades the sink in and then
    /// stores `Playing`.
    Resuming,
    /// Stored by the playback thread after a `Resuming` fade-in completes.
    Playing,
    /// Pause requested; the playback thread fades the sink out and then
    /// stores `Paused`.
    Pausing,
    /// Stored by the playback thread after a `Pausing` fade-out completes.
    Paused,
    Stopping,
    /// State a newly constructed `Player` starts in.
    Stopped,
    Finished,
}
36
/// Snapshot of reverb settings for UI consumers.
///
/// Built by `Player::get_reverb_settings` from the first convolution,
/// diffusion, or delay reverb found in the effect chain (checked in that order).
#[derive(Debug, Clone, Copy)]
pub struct ReverbSettingsSnapshot {
    /// Whether the reported reverb effect is enabled.
    pub enabled: bool,
    /// Wet/dry mix value of the reported reverb effect.
    pub dry_wet: f32,
}
43
/// Refresh rate (Hz) passed to `OutputMeter::new` when a player is constructed.
const OUTPUT_METER_REFRESH_HZ: f32 = 30.0;
45
/// Primary playback controller.
///
/// `Player` owns the playback threads, buffering state, and runtime settings
/// such as volume and reverb configuration.
///
/// Most fields are `Arc`-wrapped so that clones of the player and the
/// playback thread observe the same state.
#[derive(Clone)]
pub struct Player {
    /// Audio metadata built from the container path or the standalone file
    /// paths; its `channels` and `sample_rate` configure the sink and meter.
    pub info: Info,
    /// Cleared every time a playback thread is initialised.
    pub finished_tracks: Arc<Mutex<Vec<i32>>>,
    /// Current playback position in seconds, updated by the playback thread.
    pub ts: Arc<Mutex<f64>>,
    /// Shared playback state; starts as `PlayerState::Stopped`.
    state: Arc<Mutex<PlayerState>>,
    /// Abort flag handed to the engine and polled by the playback thread.
    /// A fresh flag is created for every playback thread.
    abort: Arc<AtomicBool>,
    /// `true` while a playback thread is running; the thread stores `false`
    /// when it reaches its end.
    playback_thread_exists: Arc<AtomicBool>,
    /// Incremented for every playback thread; a thread ignores chunks once
    /// this no longer matches the id it was started with.
    playback_id: Arc<AtomicU64>,
    /// Set from `engine.get_duration()` when a playback thread starts.
    duration: Arc<Mutex<f64>>,
    /// Source container / file set handed to the engine.
    prot: Arc<Mutex<Prot>>,
    /// Reset when a playback thread is initialised; set once a chunk has been
    /// handed to the sink.
    audio_heard: Arc<AtomicBool>,
    /// Target sink volume; also the level the fade-in ramps up to.
    volume: Arc<Mutex<f32>>,
    /// rodio sink; replaced with one connected to the output stream each time
    /// a playback thread starts.
    sink: Arc<Mutex<Sink>>,
    /// Optional diagnostics reporter; initialised to `None` by the constructor.
    reporter: Option<Arc<Mutex<Reporter>>>,
    /// Buffering and startup tunables shared with the engine.
    buffer_settings: Arc<Mutex<PlaybackBufferSettings>>,
    /// Active DSP effect chain shared with the engine.
    effects: Arc<Mutex<Vec<AudioEffect>>>,
    /// Latest DSP chain metrics, including append-delay statistics written by
    /// the playback thread.
    dsp_metrics: Arc<Mutex<DspChainMetrics>>,
    /// Counter incremented whenever an effects reset is requested.
    effects_reset: Arc<AtomicU64>,
    /// Output level meter fed with every chunk appended to the sink.
    output_meter: Arc<Mutex<OutputMeter>>,
    /// Set once the playback thread has stopped receiving chunks from the engine.
    buffering_done: Arc<AtomicBool>,
    /// `now_ms()` value recorded when the last chunk was appended.
    last_chunk_ms: Arc<AtomicU64>,
    /// `now_ms()` value recorded at the last playback-time update.
    last_time_update_ms: Arc<AtomicU64>,
    /// Impulse response set through `set_impulse_response_spec`, if any.
    impulse_response_override: Option<ImpulseResponseSpec>,
    /// Impulse response tail trim (dB) set through
    /// `set_impulse_response_tail_db`, if any.
    impulse_response_tail_override: Option<f32>,
}
76
77impl Player {
78    /// Create a new player for a single container path.
79    pub fn new(file_path: &String) -> Self {
80        let this = Self::new_from_path_or_paths(Some(file_path), None);
81        this
82    }
83
84    /// Create a new player for a set of standalone file paths.
85    pub fn new_from_file_paths(file_paths: &Vec<Vec<String>>) -> Self {
86        let this = Self::new_from_path_or_paths(None, Some(file_paths));
87        this
88    }
89
90    /// Create a player from either a container path or standalone file paths.
91    pub fn new_from_path_or_paths(path: Option<&String>, paths: Option<&Vec<Vec<String>>>) -> Self {
92        let (prot, info) = match path {
93            Some(path) => {
94                let prot = Arc::new(Mutex::new(Prot::new(path)));
95                let info = Info::new(path.clone());
96                (prot, info)
97            }
98            None => {
99                let prot = Arc::new(Mutex::new(Prot::new_from_file_paths(paths.unwrap())));
100                let locked_prot = prot.lock().unwrap();
101                let info = Info::new_from_file_paths(locked_prot.get_file_paths_dictionary());
102                drop(locked_prot);
103                (prot, info)
104            }
105        };
106
107        let (sink, _queue) = Sink::new();
108        let sink: Arc<Mutex<Sink>> = Arc::new(Mutex::new(sink));
109
110        let channels = info.channels as usize;
111        let sample_rate = info.sample_rate;
112        let effects = {
113            let prot_locked = prot.lock().unwrap();
114            match prot_locked.get_effects() {
115                Some(effects) => Arc::new(Mutex::new(effects)),
116                None => Arc::new(Mutex::new(vec![])),
117            }
118        };
119
120        let mut this = Self {
121            info,
122            finished_tracks: Arc::new(Mutex::new(Vec::new())),
123            state: Arc::new(Mutex::new(PlayerState::Stopped)),
124            abort: Arc::new(AtomicBool::new(false)),
125            ts: Arc::new(Mutex::new(0.0)),
126            playback_thread_exists: Arc::new(AtomicBool::new(true)),
127            playback_id: Arc::new(AtomicU64::new(0)),
128            duration: Arc::new(Mutex::new(0.0)),
129            audio_heard: Arc::new(AtomicBool::new(false)),
130            volume: Arc::new(Mutex::new(0.8)),
131            sink,
132            prot,
133            reporter: None,
134            buffer_settings: Arc::new(Mutex::new(PlaybackBufferSettings::new(20.0))),
135            effects,
136            dsp_metrics: Arc::new(Mutex::new(DspChainMetrics::default())),
137            effects_reset: Arc::new(AtomicU64::new(0)),
138            output_meter: Arc::new(Mutex::new(OutputMeter::new(
139                channels,
140                sample_rate,
141                OUTPUT_METER_REFRESH_HZ,
142            ))),
143            buffering_done: Arc::new(AtomicBool::new(false)),
144            last_chunk_ms: Arc::new(AtomicU64::new(0)),
145            last_time_update_ms: Arc::new(AtomicU64::new(0)),
146            impulse_response_override: None,
147            impulse_response_tail_override: None,
148        };
149
150        this.initialize_thread(None);
151
152        this
153    }
154
155    /// Override the impulse response used for convolution reverb.
156    pub fn set_impulse_response_spec(&mut self, spec: ImpulseResponseSpec) {
157        self.impulse_response_override = Some(spec.clone());
158        let mut prot = self.prot.lock().unwrap();
159        prot.set_impulse_response_spec(spec);
160        self.request_effects_reset();
161    }
162
163    /// Parse and apply an impulse response spec string.
164    pub fn set_impulse_response_from_string(&mut self, value: &str) {
165        if let Some(spec) = parse_impulse_response_string(value) {
166            self.set_impulse_response_spec(spec);
167        }
168    }
169
170    /// Override the impulse response tail trim (dB).
171    pub fn set_impulse_response_tail_db(&mut self, tail_db: f32) {
172        self.impulse_response_tail_override = Some(tail_db);
173        let mut prot = self.prot.lock().unwrap();
174        prot.set_impulse_response_tail_db(tail_db);
175        self.request_effects_reset();
176    }
177
178    /// Enable or disable convolution reverb.
179    pub fn set_reverb_enabled(&self, enabled: bool) {
180        let mut effects = self.effects.lock().unwrap();
181        if let Some(effect) = effects
182            .iter_mut()
183            .find_map(|effect| effect.as_convolution_reverb_mut())
184        {
185            effect.enabled = enabled;
186        }
187        if let Some(effect) = effects
188            .iter_mut()
189            .find_map(|effect| effect.as_delay_reverb_mut())
190        {
191            effect.enabled = enabled;
192        }
193    }
194
195    /// Set the reverb wet/dry mix (clamped to `[0.0, 1.0]`).
196    pub fn set_reverb_mix(&self, dry_wet: f32) {
197        let mut effects = self.effects.lock().unwrap();
198        if let Some(effect) = effects
199            .iter_mut()
200            .find_map(|effect| effect.as_convolution_reverb_mut())
201        {
202            effect.dry_wet = dry_wet.clamp(0.0, 1.0);
203        }
204        if let Some(effect) = effects
205            .iter_mut()
206            .find_map(|effect| effect.as_delay_reverb_mut())
207        {
208            effect.mix = dry_wet.clamp(0.0, 1.0);
209        }
210        if let Some(effect) = effects
211            .iter_mut()
212            .find_map(|effect| effect.as_diffusion_reverb_mut())
213        {
214            effect.mix = dry_wet.clamp(0.0, 1.0);
215        }
216    }
217
218    /// Retrieve the current reverb settings snapshot.
219    pub fn get_reverb_settings(&self) -> ReverbSettingsSnapshot {
220        let effects = self.effects.lock().unwrap();
221        if let Some(effect) = effects
222            .iter()
223            .find_map(|effect| effect.as_convolution_reverb())
224        {
225            return ReverbSettingsSnapshot {
226                enabled: effect.enabled,
227                dry_wet: effect.dry_wet,
228            };
229        }
230        if let Some(effect) = effects
231            .iter()
232            .find_map(|effect| effect.as_diffusion_reverb())
233        {
234            return ReverbSettingsSnapshot {
235                enabled: effect.enabled,
236                dry_wet: effect.mix,
237            };
238        }
239        if let Some(effect) = effects.iter().find_map(|effect| effect.as_delay_reverb()) {
240            return ReverbSettingsSnapshot {
241                enabled: effect.enabled,
242                dry_wet: effect.mix,
243            };
244        }
245        ReverbSettingsSnapshot {
246            enabled: false,
247            dry_wet: 0.0,
248        }
249    }
250
251    /// Snapshot the active effect chain names.
252    pub fn get_effect_names(&self) -> Vec<String> {
253        let effects = self.effects.lock().unwrap();
254        effects
255            .iter()
256            .map(|effect| match effect {
257                AudioEffect::DelayReverb(_) => "DelayReverb".to_string(),
258                AudioEffect::BasicReverb(_) => "DelayReverb".to_string(),
259                AudioEffect::DiffusionReverb(_) => "DiffusionReverb".to_string(),
260                AudioEffect::ConvolutionReverb(_) => "ConvolutionReverb".to_string(),
261                AudioEffect::LowPassFilter(_) => "LowPassFilter".to_string(),
262                AudioEffect::HighPassFilter(_) => "HighPassFilter".to_string(),
263                AudioEffect::Distortion(_) => "Distortion".to_string(),
264                AudioEffect::Compressor(_) => "Compressor".to_string(),
265                AudioEffect::Limiter(_) => "Limiter".to_string(),
266            })
267            .collect()
268    }
269
270    /// Replace the active DSP effects chain.
271    pub fn set_effects(&mut self, effects: Vec<AudioEffect>) {
272        {
273            let mut guard = self.effects.lock().unwrap();
274            println!("New Effects: {:?}", effects);
275            *guard = effects;
276        }
277        self.request_effects_reset();
278
279        // Seeking to the current time stamp refreshes the
280        // Sink so that the new effects are applied immediately.
281        if !self.thread_finished() {
282            let ts = self.get_time();
283            self.seek(ts);
284        }
285    }
286
287    /// Retrieve the latest DSP chain performance metrics.
288    pub fn get_dsp_metrics(&self) -> DspChainMetrics {
289        *self.dsp_metrics.lock().unwrap()
290    }
291
292    /// Retrieve the most recent per-channel peak levels.
293    pub fn get_levels(&self) -> Vec<f32> {
294        self.output_meter.lock().unwrap().levels()
295    }
296
297    /// Retrieve the most recent per-channel peak levels in dBFS.
298    pub fn get_levels_db(&self) -> Vec<f32> {
299        self.output_meter
300            .lock()
301            .unwrap()
302            .levels()
303            .into_iter()
304            .map(linear_to_dbfs)
305            .collect()
306    }
307
308    /// Retrieve the most recent per-channel average levels.
309    pub fn get_levels_avg(&self) -> Vec<f32> {
310        self.output_meter.lock().unwrap().averages()
311    }
312
313    /// Set the output meter refresh rate (frames per second).
314    pub fn set_output_meter_refresh_hz(&self, hz: f32) {
315        self.output_meter.lock().unwrap().set_refresh_hz(hz);
316    }
317
318    /// Debug helper returning thread alive, state, and audio heard flags.
319    pub fn debug_playback_state(&self) -> (bool, PlayerState, bool) {
320        (
321            self.playback_thread_exists.load(Ordering::SeqCst),
322            *self.state.lock().unwrap(),
323            self.audio_heard.load(Ordering::Relaxed),
324        )
325    }
326
327    /// Debug helper indicating whether buffering has completed.
328    pub fn debug_buffering_done(&self) -> bool {
329        self.buffering_done.load(Ordering::Relaxed)
330    }
331
332    /// Debug helper returning internal timing markers in milliseconds.
333    pub fn debug_timing_ms(&self) -> (u64, u64) {
334        (
335            self.last_chunk_ms.load(Ordering::Relaxed),
336            self.last_time_update_ms.load(Ordering::Relaxed),
337        )
338    }
339
340    /// Debug helper returning sink paused/empty flags and queued length.
341    pub fn debug_sink_state(&self) -> (bool, bool, usize) {
342        let sink = self.sink.lock().unwrap();
343        let paused = sink.is_paused();
344        let empty = sink.empty();
345        let len = sink.len();
346        (paused, empty, len)
347    }
348
349    fn request_effects_reset(&self) {
350        self.effects_reset.fetch_add(1, Ordering::SeqCst);
351    }
352
353    /// Configure the minimum buffered audio (ms) before playback starts.
354    pub fn set_start_buffer_ms(&self, start_buffer_ms: f32) {
355        let mut settings = self.buffer_settings.lock().unwrap();
356        settings.start_buffer_ms = start_buffer_ms.max(0.0);
357    }
358
359    /// Configure heuristic end-of-track threshold for containers (ms).
360    pub fn set_track_eos_ms(&self, track_eos_ms: f32) {
361        let mut settings = self.buffer_settings.lock().unwrap();
362        settings.track_eos_ms = track_eos_ms.max(0.0);
363    }
364
365    /// Configure minimum sink chunks queued before playback starts/resumes.
366    pub fn set_start_sink_chunks(&self, chunks: usize) {
367        let mut settings = self.buffer_settings.lock().unwrap();
368        settings.start_sink_chunks = chunks;
369    }
370
371    /// Configure the startup silence pre-roll (ms).
372    pub fn set_startup_silence_ms(&self, ms: f32) {
373        let mut settings = self.buffer_settings.lock().unwrap();
374        settings.startup_silence_ms = ms.max(0.0);
375    }
376
377    /// Configure the startup fade-in length (ms).
378    pub fn set_startup_fade_ms(&self, ms: f32) {
379        let mut settings = self.buffer_settings.lock().unwrap();
380        settings.startup_fade_ms = ms.max(0.0);
381    }
382
383    /// Configure the append jitter logging threshold (ms). 0 disables logging.
384    pub fn set_append_jitter_log_ms(&self, ms: f32) {
385        let mut settings = self.buffer_settings.lock().unwrap();
386        settings.append_jitter_log_ms = ms.max(0.0);
387    }
388
389    /// Enable or disable per-effect boundary discontinuity logging.
390    pub fn set_effect_boundary_log(&self, enabled: bool) {
391        let mut settings = self.buffer_settings.lock().unwrap();
392        settings.effect_boundary_log = enabled;
393    }
394
    /// Reset per-playback state and spawn the playback thread.
    ///
    /// `ts` is the start position in seconds (`None` starts at `0.0`).
    ///
    /// Before spawning, this clears `finished_tracks`, installs a fresh abort
    /// flag, bumps `playback_id`, clears the buffering/audio-heard flags,
    /// refreshes the timing markers, and resets the output meter.
    ///
    /// The spawned thread creates the engine and the output stream, replaces
    /// the shared sink, appends every chunk received from the engine to the
    /// sink while tracking playback time, and finally waits for the queued
    /// audio to play out before clearing `playback_thread_exists`.
    fn initialize_thread(&mut self, ts: Option<f64>) {
        // Empty finished_tracks
        let mut finished_tracks = self.finished_tracks.lock().unwrap();
        finished_tracks.clear();
        drop(finished_tracks);

        // ===== Set play options ===== //
        // Use a fresh abort flag per playback thread so old mixer threads stay stopped.
        self.abort = Arc::new(AtomicBool::new(false));
        self.playback_thread_exists.store(true, Ordering::SeqCst);
        // Id of this playback; the thread compares it against the shared
        // counter to detect that a newer playback has been started.
        let playback_id = self.playback_id.fetch_add(1, Ordering::SeqCst) + 1;
        self.buffering_done.store(false, Ordering::SeqCst);
        let now_ms_value = now_ms();
        self.last_chunk_ms.store(now_ms_value, Ordering::Relaxed);
        self.last_time_update_ms
            .store(now_ms_value, Ordering::Relaxed);

        // ===== Clone variables ===== //
        // Handles moved into the playback thread below.
        let play_state = self.state.clone();
        let abort = self.abort.clone();
        let playback_thread_exists = self.playback_thread_exists.clone();
        let playback_id_atomic = self.playback_id.clone();
        let time_passed = self.ts.clone();

        let duration = self.duration.clone();
        let prot = self.prot.clone();
        let buffer_settings = self.buffer_settings.clone();
        let buffer_settings_for_state = self.buffer_settings.clone();
        let effects = self.effects.clone();
        let dsp_metrics = self.dsp_metrics.clone();
        let dsp_metrics_for_sink = self.dsp_metrics.clone();
        let effects_reset = self.effects_reset.clone();
        let output_meter = self.output_meter.clone();
        let audio_info = self.info.clone();

        let audio_heard = self.audio_heard.clone();
        let volume = self.volume.clone();
        let sink_mutex = self.sink.clone();
        let buffer_done_thread_flag = self.buffering_done.clone();
        let last_chunk_ms = self.last_chunk_ms.clone();
        let last_time_update_ms = self.last_time_update_ms.clone();

        audio_heard.store(false, Ordering::Relaxed);

        {
            let mut meter = self.output_meter.lock().unwrap();
            meter.reset();
        }

        // ===== Start playback ===== //
        thread::spawn(move || {
            // ===================== //
            // Set playback_thread_exists to true
            // ===================== //
            playback_thread_exists.store(true, Ordering::Relaxed);

            // ===================== //
            // Initialize engine & sink
            // ===================== //
            let start_time = match ts {
                Some(ts) => ts,
                None => 0.0,
            };
            let mut engine = PlayerEngine::new(
                prot,
                Some(abort.clone()),
                start_time,
                buffer_settings,
                effects,
                dsp_metrics,
                effects_reset,
            );
            // NOTE(review): if the default output stream cannot be opened this
            // unwrap panics inside the thread, and `playback_thread_exists` is
            // never set back to false.
            let _stream = OutputStreamBuilder::open_default_stream().unwrap();
            let mixer = _stream.mixer().clone();

            // Replace the shared sink with one connected to the new stream;
            // it starts paused at the configured volume.
            let mut sink = sink_mutex.lock().unwrap();
            *sink = Sink::connect_new(&mixer);
            sink.pause();
            sink.set_volume(*volume.lock().unwrap());
            drop(sink);

            // ===================== //
            // Set duration from engine
            // ===================== //
            let mut duration = duration.lock().unwrap();
            *duration = engine.get_duration();
            drop(duration);

            // ===================== //
            // Initialize chunk_lengths & time_passed
            // ===================== //
            // Lengths (seconds) of the chunks appended to the sink that have
            // not yet been counted as played.
            let chunk_lengths = Arc::new(Mutex::new(Vec::new()));
            let mut time_passed_unlocked = time_passed.lock().unwrap();
            *time_passed_unlocked = start_time;
            drop(time_passed_unlocked);

            // Fade the sink volume down in 10 ms steps, then pause it. The
            // timestamp is sampled once, so when playback is still at
            // `start_time` the fade loop is skipped and the sink is paused
            // immediately.
            let pause_sink = |sink: &Sink, fade_length_out_seconds: f32| {
                let timestamp = *time_passed.lock().unwrap();

                let fade_increments = sink.volume() / (fade_length_out_seconds * 100.0);
                // Fade out and pause sink
                while sink.volume() > 0.0 && timestamp != start_time {
                    sink.set_volume(sink.volume() - fade_increments);
                    thread::sleep(Duration::from_millis(10));
                }
                sink.pause();
            };

            // Start the sink and ramp its volume up to the configured volume.
            // NOTE(review): the step size assumes 100 steps per second, but the
            // loop sleeps 5 ms per step, so the fade lasts roughly half of
            // `fade_length_in_seconds`.
            let resume_sink = |sink: &Sink, fade_length_in_seconds: f32| {
                let volume = *volume.lock().unwrap();
                let fade_increments = (volume - sink.volume()) / (fade_length_in_seconds * 100.0);
                // Fade in and play sink
                sink.play();
                while sink.volume() < volume {
                    sink.set_volume(sink.volume() + fade_increments);
                    thread::sleep(Duration::from_millis(5));
                }
            };

            // ===================== //
            // Start sink with startup silence + fade in
            // ===================== //
            {
                let startup_settings = buffer_settings_for_state.lock().unwrap();
                let startup_silence_ms = startup_settings.startup_silence_ms;
                let startup_fade_ms = startup_settings.startup_fade_ms;
                drop(startup_settings);

                let sample_rate = audio_info.sample_rate as u32;
                let channels = audio_info.channels as u16;

                if startup_silence_ms > 0.0 {
                    // Frames of silence (rounded up) times the channel count;
                    // at least one sample is always queued.
                    let samples = ((startup_silence_ms / 1000.0) * sample_rate as f32).ceil()
                        as usize
                        * channels as usize;
                    let silence = vec![0.0_f32; samples.max(1)];
                    let silence_buffer = SamplesBuffer::new(channels, sample_rate, silence);
                    let sink = sink_mutex.lock().unwrap();
                    sink.append(silence_buffer);
                    drop(sink);
                }

                if startup_fade_ms > 0.0 {
                    // Only start the sink here when no pause is pending or active.
                    let state = play_state.lock().unwrap().clone();
                    if state != PlayerState::Paused && state != PlayerState::Pausing {
                        resume_sink(
                            &sink_mutex.lock().unwrap(),
                            (startup_fade_ms / 1000.0).max(0.0),
                        );
                    }
                }
            }

            // ===================== //
            // Check if the player should be paused or not
            // ===================== //
            // Returns `false` when the abort flag is set (after fading out and
            // clearing the sink) and `true` otherwise. Also carries out pending
            // `Pausing` / `Resuming` requests.
            let check_details = || {
                if abort.load(Ordering::SeqCst) {
                    let sink = sink_mutex.lock().unwrap();
                    pause_sink(&sink, 0.1);
                    sink.clear();
                    drop(sink);

                    return false;
                }

                let sink = sink_mutex.lock().unwrap();
                let state = play_state.lock().unwrap().clone();
                let start_sink_chunks = buffer_settings_for_state.lock().unwrap().start_sink_chunks;
                if state == PlayerState::Resuming
                    && start_sink_chunks > 0
                    && sink.len() < start_sink_chunks
                {
                    // Keep paused until enough chunks are queued.
                    sink.pause();
                    drop(sink);
                    return true;
                }
                if state == PlayerState::Pausing {
                    pause_sink(&sink, 0.1);
                    play_state.lock().unwrap().clone_from(&PlayerState::Paused);
                }
                if state == PlayerState::Resuming {
                    resume_sink(&sink, 0.1);
                    play_state.lock().unwrap().clone_from(&PlayerState::Playing);
                }
                drop(sink);

                true
            };

            // ===================== //
            // Update chunk_lengths / time_passed
            // ===================== //
            // Total length (seconds) of the chunks already counted as played,
            // starting from `start_time`.
            let time_chunks_mutex = Arc::new(Mutex::new(start_time));
            let timer_mut = Arc::new(Mutex::new(timer::Timer::new()));
            // Thread-local flag; `buffering_done_flag` is the copy visible on the player.
            let buffering_done = Arc::new(AtomicBool::new(false));
            let buffering_done_flag = buffer_done_thread_flag.clone();
            let final_duration = Arc::new(Mutex::new(None::<f64>));
            let mut timer = timer_mut.lock().unwrap();
            timer.start();
            drop(timer);

            // Audio time at the previous update, used to advance the output meter.
            let last_meter_time = Cell::new(0.0_f64);
            // Recompute the playback position: played chunk time plus the
            // timer's elapsed time. Does nothing once the abort flag is set.
            let update_chunk_lengths = || {
                if abort.load(Ordering::SeqCst) {
                    return;
                }

                let mut chunk_lengths = chunk_lengths.lock().unwrap();
                let mut time_passed_unlocked = time_passed.lock().unwrap();
                let mut time_chunks_passed = time_chunks_mutex.lock().unwrap();
                let mut timer = timer_mut.lock().unwrap();
                last_time_update_ms.store(now_ms(), Ordering::Relaxed);
                let sink = sink_mutex.lock().unwrap();
                if !buffering_done.load(Ordering::Relaxed) {
                    // Check how many chunks have been played (chunk_lengths.len() - sink.len())
                    // since the last time this function was called and add that to time_passed.
                    let chunks_played = chunk_lengths.len().saturating_sub(sink.len());

                    for _ in 0..chunks_played {
                        timer.reset();
                        timer.start();
                        *time_chunks_passed += chunk_lengths.remove(0);
                    }
                }

                // Keep the timer's paused state in step with the sink.
                if sink.is_paused() {
                    timer.pause();
                } else {
                    timer.un_pause();
                }

                let current_audio_time = *time_chunks_passed + timer.get_time().as_secs_f64();
                // Never advance the meter by a negative amount.
                let delta = (current_audio_time - last_meter_time.get()).max(0.0);
                last_meter_time.set(current_audio_time);
                {
                    let mut meter = output_meter.lock().unwrap();
                    meter.advance(delta);
                }

                *time_passed_unlocked = current_audio_time;

                drop(sink);
                drop(chunk_lengths);
                drop(time_passed_unlocked);
                drop(time_chunks_passed);
                drop(timer);
            };

            // ===================== //
            // Update sink for each chunk received from engine
            // ===================== //
            // Tuple fields: (time of last append, smoothed delay in ms,
            // late append count, maximum delay in ms).
            let append_timing = Arc::new(Mutex::new((Instant::now(), 0.0_f64, 0_u64, 0.0_f64)));
            let update_sink = |(mixer, length_in_seconds): (SamplesBuffer, f64)| {
                // Drop chunks that belong to a playback that has been superseded.
                if playback_id_atomic.load(Ordering::SeqCst) != playback_id {
                    return;
                }
                let (delay_ms, late) = {
                    let mut timing = append_timing.lock().unwrap();
                    let now = Instant::now();
                    let delta_ms = now.duration_since(timing.0).as_secs_f64() * 1000.0;
                    let chunk_ms = length_in_seconds * 1000.0;
                    // An append is late when more than 1.2x the chunk's own
                    // length has passed since the previous append.
                    let late = delta_ms > (chunk_ms * 1.2) && chunk_ms > 0.0;
                    if late {
                        timing.2 = timing.2.saturating_add(1);
                    }
                    // Smoothed delay: 90% previous value, 10% new sample.
                    timing.1 = if timing.1 == 0.0 {
                        delta_ms
                    } else {
                        (timing.1 * 0.9) + (delta_ms * 0.1)
                    };
                    timing.3 = timing.3.max(delta_ms);
                    timing.0 = now;
                    (delta_ms, late)
                };
                audio_heard.store(true, Ordering::Relaxed);
                last_chunk_ms.store(now_ms(), Ordering::Relaxed);

                {
                    let mut meter = output_meter.lock().unwrap();
                    meter.push_samples(&mixer);
                }
                {
                    let mut metrics = dsp_metrics_for_sink.lock().unwrap();
                    metrics.append_delay_ms = delay_ms;
                    metrics.avg_append_delay_ms = {
                        if metrics.avg_append_delay_ms == 0.0 {
                            delay_ms
                        } else {
                            (metrics.avg_append_delay_ms * 0.9) + (delay_ms * 0.1)
                        }
                    };
                    metrics.max_append_delay_ms = metrics.max_append_delay_ms.max(delay_ms);
                    metrics.late_append_count = {
                        let timing = append_timing.lock().unwrap();
                        timing.2
                    };
                    metrics.late_append_active = late;
                }

                let sink = sink_mutex.lock().unwrap();
                let append_jitter_log_ms = {
                    let settings = buffer_settings_for_state.lock().unwrap();
                    settings.append_jitter_log_ms
                };
                if append_jitter_log_ms > 0.0 && (late || delay_ms > append_jitter_log_ms as f64) {
                    let expected_ms = length_in_seconds * 1000.0;
                    log::info!(
                        "append jitter: delta={:.2}ms expected={:.2}ms late={} threshold={:.2}ms sink_len={}",
                        delay_ms,
                        expected_ms,
                        late,
                        append_jitter_log_ms,
                        sink.len()
                    );
                }
                let mut chunk_lengths = chunk_lengths.lock().unwrap();

                // NOTE(review): `total_time` is computed but never read.
                let total_time = chunk_lengths.iter().sum::<f64>();

                sink.append(mixer);

                drop(sink);

                chunk_lengths.push(length_in_seconds);
                drop(chunk_lengths);

                update_chunk_lengths();
                // The abort result is ignored here; the receive loop below
                // checks it on the timeout path.
                check_details();
            };

            // Receive chunks from the engine until its channel disconnects or
            // an abort is observed on a timeout.
            let receiver = engine.start_receiver();
            loop {
                match receiver.recv_timeout(Duration::from_millis(20)) {
                    Ok(chunk) => {
                        update_sink(chunk);
                    }
                    Err(RecvTimeoutError::Timeout) => {
                        update_chunk_lengths();
                        if !check_details() {
                            break;
                        }
                    }
                    Err(RecvTimeoutError::Disconnected) => break,
                }
            }
            #[cfg(feature = "debug")]
            log::info!("engine reception loop finished");

            // From here on, all audio is buffered. Stop relying on sink.len()
            // to advance time so the UI keeps updating while the last buffer plays.
            buffering_done.store(true, Ordering::Relaxed);
            buffering_done_flag.store(true, Ordering::Relaxed);
            {
                // Expected end position: time already counted as played plus
                // everything still queued.
                let mut final_duration = final_duration.lock().unwrap();
                if final_duration.is_none() {
                    let chunk_lengths = chunk_lengths.lock().unwrap();
                    let time_chunks_passed = time_chunks_mutex.lock().unwrap();
                    *final_duration = Some(*time_chunks_passed + chunk_lengths.iter().sum::<f64>());
                }
            }

            // ===================== //
            // Wait until all tracks are finished playing in sink
            // ===================== //
            #[cfg(feature = "debug")]
            {
                let sink = sink_mutex.lock().unwrap();
                let paused = sink.is_paused();
                let empty = sink.empty();
                let sink_len = sink.len();
                drop(sink);
                let time_passed = *time_passed.lock().unwrap();
                let final_duration = *final_duration.lock().unwrap();
                log::info!(
                    "Starting drain loop: paused={} empty={} sink_len={} time={:.3} final={:?}",
                    paused,
                    empty,
                    sink_len,
                    time_passed,
                    final_duration
                );
            }

            // Drain loop: poll every 10 ms until an abort is seen, or until the
            // engine reports that buffering has finished and the playback
            // position has reached `final_duration` (within 1 ms).
            loop {
                update_chunk_lengths();
                if !check_details() {
                    break;
                }

                let done = if engine.finished_buffering() {
                    if let Some(final_duration) = *final_duration.lock().unwrap() {
                        let time_passed = *time_passed.lock().unwrap();
                        time_passed >= (final_duration - 0.001).max(0.0)
                    } else {
                        false
                    }
                } else {
                    false
                };
                if done {
                    break;
                }

                thread::sleep(Duration::from_millis(10));
            }

            #[cfg(feature = "debug")]
            log::info!("Finished drain loop!");

            // ===================== //
            // Set playback_thread_exists to false
            // ===================== //
            playback_thread_exists.store(false, Ordering::Relaxed);
        });
    }
812
813    /// Start playback from a specific timestamp (seconds).
814    pub fn play_at(&mut self, ts: f64) {
815        let mut timestamp = self.ts.lock().unwrap();
816        *timestamp = ts;
817        drop(timestamp);
818
819        self.request_effects_reset();
820        self.kill_current();
821        // self.stop.store(false, Ordering::SeqCst);
822        self.initialize_thread(Some(ts));
823
824        self.resume();
825
826        self.wait_for_audio_heard(Duration::from_secs(5));
827    }
828
829    /// Start playback from the current timestamp.
830    pub fn play(&mut self) {
831        info!("Playing audio");
832        let thread_exists = self.playback_thread_exists.load(Ordering::SeqCst);
833        // self.stop.store(false, Ordering::SeqCst);
834
835        if !thread_exists {
836            self.initialize_thread(None);
837        }
838
839        self.resume();
840
841        self.wait_for_audio_heard(Duration::from_secs(5));
842    }
843
844    /// Pause playback.
845    pub fn pause(&self) {
846        self.state.lock().unwrap().clone_from(&PlayerState::Pausing);
847    }
848
849    /// Resume playback if paused.
850    pub fn resume(&self) {
851        self.state
852            .lock()
853            .unwrap()
854            .clone_from(&PlayerState::Resuming);
855    }
856
857    /// Stop the current playback thread without changing state.
858    pub fn kill_current(&self) {
859        self.state
860            .lock()
861            .unwrap()
862            .clone_from(&PlayerState::Stopping);
863        {
864            let sink = self.sink.lock().unwrap();
865            sink.stop();
866        }
867        self.abort.store(true, Ordering::SeqCst);
868
869        while !self.thread_finished() {
870            thread::sleep(Duration::from_millis(10));
871        }
872
873        self.state.lock().unwrap().clone_from(&PlayerState::Stopped);
874    }
875
876    /// Stop playback and reset timing state.
877    pub fn stop(&self) {
878        self.kill_current();
879        self.ts.lock().unwrap().clone_from(&0.0);
880    }
881
882    /// Return true if playback is currently active.
883    pub fn is_playing(&self) -> bool {
884        let state = self.state.lock().unwrap();
885        *state == PlayerState::Playing
886    }
887
888    /// Return true if playback is currently paused.
889    pub fn is_paused(&self) -> bool {
890        let state = self.state.lock().unwrap();
891        *state == PlayerState::Paused
892    }
893
894    /// Get the current playback time in seconds.
895    pub fn get_time(&self) -> f64 {
896        let ts = self.ts.lock().unwrap();
897        *ts
898    }
899
900    fn thread_finished(&self) -> bool {
901        let playback_thread_exists = self.playback_thread_exists.load(Ordering::SeqCst);
902        !playback_thread_exists
903    }
904
905    /// Return true if playback has reached the end.
906    pub fn is_finished(&self) -> bool {
907        self.thread_finished()
908        // let state = self.state.lock().unwrap();
909        // *state == PlayerState::Finished
910    }
911
912    /// Block the current thread until playback finishes.
913    pub fn sleep_until_end(&self) {
914        loop {
915            if self.thread_finished() {
916                break;
917            }
918            thread::sleep(Duration::from_millis(100));
919        }
920    }
921
922    /// Get the total duration (seconds) of the active selection.
923    pub fn get_duration(&self) -> f64 {
924        let duration = self.duration.lock().unwrap();
925        *duration
926    }
927
928    /// Seek to the given timestamp (seconds).
929    pub fn seek(&mut self, ts: f64) {
930        let mut timestamp = self.ts.lock().unwrap();
931        *timestamp = ts;
932        drop(timestamp);
933
934        self.request_effects_reset();
935        let state = self.state.lock().unwrap().clone();
936
937        self.kill_current();
938        self.state.lock().unwrap().clone_from(&state);
939        self.initialize_thread(Some(ts));
940
941        if state == PlayerState::Playing {
942            self.resume();
943        }
944    }
945
946    /// Refresh active track selections from the underlying container.
947    pub fn refresh_tracks(&mut self) {
948        let mut prot = self.prot.lock().unwrap();
949        prot.refresh_tracks();
950        if let Some(spec) = self.impulse_response_override.clone() {
951            prot.set_impulse_response_spec(spec);
952        }
953        if let Some(tail_db) = self.impulse_response_tail_override {
954            prot.set_impulse_response_tail_db(tail_db);
955        }
956        drop(prot);
957
958        self.request_effects_reset();
959        // If stopped, return
960        if self.thread_finished() {
961            return;
962        }
963
964        // Kill current thread and start
965        // new thread at the current timestamp
966        let ts = self.get_time();
967        self.seek(ts);
968
969        // If previously playing, resume
970        if self.is_playing() {
971            self.resume();
972        }
973
974        self.wait_for_audio_heard(Duration::from_secs(5));
975    }
976
977    fn wait_for_audio_heard(&self, timeout: Duration) -> bool {
978        let start = Instant::now();
979        loop {
980            if self.audio_heard.load(Ordering::Relaxed) {
981                return true;
982            }
983            if self.thread_finished() {
984                warn!("playback thread ended before audio was heard");
985                return false;
986            }
987            if start.elapsed() >= timeout {
988                warn!("timed out waiting for audio to start");
989                return false;
990            }
991            thread::sleep(Duration::from_millis(10));
992        }
993    }
994
995    /// Shuffle track selections and restart playback.
996    pub fn shuffle(&mut self) {
997        self.refresh_tracks();
998    }
999
1000    /// Set the playback volume (0.0-1.0).
1001    pub fn set_volume(&mut self, new_volume: f32) {
1002        let sink = self.sink.lock().unwrap();
1003        sink.set_volume(new_volume);
1004        drop(sink);
1005
1006        let mut volume = self.volume.lock().unwrap();
1007        *volume = new_volume;
1008        drop(volume);
1009    }
1010
1011    /// Get the current playback volume.
1012    pub fn get_volume(&self) -> f32 {
1013        *self.volume.lock().unwrap()
1014    }
1015
1016    /// Get the track identifiers used for display.
1017    pub fn get_ids(&self) -> Vec<String> {
1018        let prot = self.prot.lock().unwrap();
1019
1020        return prot.get_ids();
1021    }
1022
1023    /// Enable periodic reporting of playback status for UI consumers.
1024    pub fn set_reporting(
1025        &mut self,
1026        reporting: Arc<Mutex<dyn Fn(Report) + Send>>,
1027        reporting_interval: Duration,
1028    ) {
1029        if self.reporter.is_some() {
1030            self.reporter.as_ref().unwrap().lock().unwrap().stop();
1031        }
1032
1033        let reporter = Arc::new(Mutex::new(Reporter::new(
1034            Arc::new(Mutex::new(self.clone())),
1035            reporting,
1036            reporting_interval,
1037        )));
1038
1039        reporter.lock().unwrap().start();
1040
1041        self.reporter = Some(reporter);
1042    }
1043}
1044
/// Convert a linear amplitude to decibels relative to full scale.
///
/// Zero and negative inputs map to negative infinity; positive inputs map to
/// `20 * log10(value)`. A NaN input yields NaN.
fn linear_to_dbfs(value: f32) -> f32 {
    // Guard clause: log10 is undefined at or below zero.
    if value <= 0.0 {
        return f32::NEG_INFINITY;
    }
    20.0 * value.log10()
}
1052
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    use std::time::SystemTime;

    match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}