Skip to main content

proteus_lib/playback/
player.rs

1//! High-level playback controller for the Proteus library.
2
3use rodio::buffer::SamplesBuffer;
4use rodio::{OutputStreamBuilder, Sink};
5use std::cell::Cell;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::{mpsc::RecvTimeoutError, Arc, Mutex};
8use std::thread;
9use std::time::{Duration, Instant};
10
11use log::{info, warn};
12
13use crate::audio::samples::clone_samples_buffer;
14use crate::container::prot::Prot;
15use crate::diagnostics::reporter::{Report, Reporter};
16use crate::dsp::effects::convolution_reverb::{parse_impulse_response_string, ImpulseResponseSpec};
17use crate::playback::output_meter::OutputMeter;
18use crate::tools::timer;
19use crate::{
20    container::info::Info,
21    dsp::effects::AudioEffect,
22    playback::engine::{DspChainMetrics, PlaybackBufferSettings, PlayerEngine},
23};
24
25/// High-level playback state for the player.
26#[derive(Debug, Clone, Copy, PartialEq, Eq)]
27pub enum PlayerState {
28    Init,
29    Resuming,
30    Playing,
31    Pausing,
32    Paused,
33    Stopping,
34    Stopped,
35    Finished,
36}
37
38/// Snapshot of convolution reverb settings for UI consumers.
39#[derive(Debug, Clone, Copy)]
40pub struct ReverbSettingsSnapshot {
41    pub enabled: bool,
42    pub dry_wet: f32,
43}
44
45const OUTPUT_METER_REFRESH_HZ: f32 = 30.0;
46
47/// Primary playback controller.
48///
49/// `Player` owns the playback threads, buffering state, and runtime settings
50/// such as volume and reverb configuration.
51#[derive(Clone)]
52pub struct Player {
53    pub info: Info,
54    pub finished_tracks: Arc<Mutex<Vec<i32>>>,
55    pub ts: Arc<Mutex<f64>>,
56    state: Arc<Mutex<PlayerState>>,
57    abort: Arc<AtomicBool>,
58    playback_thread_exists: Arc<AtomicBool>,
59    playback_id: Arc<AtomicU64>,
60    duration: Arc<Mutex<f64>>,
61    prot: Arc<Mutex<Prot>>,
62    audio_heard: Arc<AtomicBool>,
63    volume: Arc<Mutex<f32>>,
64    sink: Arc<Mutex<Sink>>,
65    audition_source: Arc<Mutex<Option<SamplesBuffer>>>,
66    reporter: Option<Arc<Mutex<Reporter>>>,
67    buffer_settings: Arc<Mutex<PlaybackBufferSettings>>,
68    effects: Arc<Mutex<Vec<AudioEffect>>>,
69    dsp_metrics: Arc<Mutex<DspChainMetrics>>,
70    effects_reset: Arc<AtomicU64>,
71    output_meter: Arc<Mutex<OutputMeter>>,
72    buffering_done: Arc<AtomicBool>,
73    last_chunk_ms: Arc<AtomicU64>,
74    last_time_update_ms: Arc<AtomicU64>,
75    impulse_response_override: Option<ImpulseResponseSpec>,
76    impulse_response_tail_override: Option<f32>,
77}
78
79impl Player {
80    /// Create a new player for a single container path.
81    pub fn new(file_path: &String) -> Self {
82        let this = Self::new_from_path_or_paths(Some(file_path), None);
83        this
84    }
85
86    /// Create a new player for a set of standalone file paths.
87    pub fn new_from_file_paths(file_paths: &Vec<Vec<String>>) -> Self {
88        let this = Self::new_from_path_or_paths(None, Some(file_paths));
89        this
90    }
91
92    /// Create a player from either a container path or standalone file paths.
93    pub fn new_from_path_or_paths(path: Option<&String>, paths: Option<&Vec<Vec<String>>>) -> Self {
94        let (prot, info) = match path {
95            Some(path) => {
96                let prot = Arc::new(Mutex::new(Prot::new(path)));
97                let info = Info::new(path.clone());
98                (prot, info)
99            }
100            None => {
101                let prot = Arc::new(Mutex::new(Prot::new_from_file_paths(paths.unwrap())));
102                let locked_prot = prot.lock().unwrap();
103                let info = Info::new_from_file_paths(locked_prot.get_file_paths_dictionary());
104                drop(locked_prot);
105                (prot, info)
106            }
107        };
108
109        let (sink, _queue) = Sink::new();
110        let sink: Arc<Mutex<Sink>> = Arc::new(Mutex::new(sink));
111
112        let channels = info.channels as usize;
113        let sample_rate = info.sample_rate;
114        let effects = {
115            let prot_locked = prot.lock().unwrap();
116            match prot_locked.get_effects() {
117                Some(effects) => Arc::new(Mutex::new(effects)),
118                None => Arc::new(Mutex::new(vec![])),
119            }
120        };
121
122        let mut this = Self {
123            info,
124            finished_tracks: Arc::new(Mutex::new(Vec::new())),
125            state: Arc::new(Mutex::new(PlayerState::Stopped)),
126            abort: Arc::new(AtomicBool::new(false)),
127            ts: Arc::new(Mutex::new(0.0)),
128            playback_thread_exists: Arc::new(AtomicBool::new(true)),
129            playback_id: Arc::new(AtomicU64::new(0)),
130            duration: Arc::new(Mutex::new(0.0)),
131            audio_heard: Arc::new(AtomicBool::new(false)),
132            volume: Arc::new(Mutex::new(0.8)),
133            sink,
134            audition_source: Arc::new(Mutex::new(None)),
135            prot,
136            reporter: None,
137            buffer_settings: Arc::new(Mutex::new(PlaybackBufferSettings::new(20.0))),
138            effects,
139            dsp_metrics: Arc::new(Mutex::new(DspChainMetrics::default())),
140            effects_reset: Arc::new(AtomicU64::new(0)),
141            output_meter: Arc::new(Mutex::new(OutputMeter::new(
142                channels,
143                sample_rate,
144                OUTPUT_METER_REFRESH_HZ,
145            ))),
146            buffering_done: Arc::new(AtomicBool::new(false)),
147            last_chunk_ms: Arc::new(AtomicU64::new(0)),
148            last_time_update_ms: Arc::new(AtomicU64::new(0)),
149            impulse_response_override: None,
150            impulse_response_tail_override: None,
151        };
152
153        this.initialize_thread(None);
154
155        this
156    }
157
158    /// Override the impulse response used for convolution reverb.
159    pub fn set_impulse_response_spec(&mut self, spec: ImpulseResponseSpec) {
160        self.impulse_response_override = Some(spec.clone());
161        let mut prot = self.prot.lock().unwrap();
162        prot.set_impulse_response_spec(spec);
163        self.request_effects_reset();
164    }
165
166    /// Parse and apply an impulse response spec string.
167    pub fn set_impulse_response_from_string(&mut self, value: &str) {
168        if let Some(spec) = parse_impulse_response_string(value) {
169            self.set_impulse_response_spec(spec);
170        }
171    }
172
173    /// Override the impulse response tail trim (dB).
174    pub fn set_impulse_response_tail_db(&mut self, tail_db: f32) {
175        self.impulse_response_tail_override = Some(tail_db);
176        let mut prot = self.prot.lock().unwrap();
177        prot.set_impulse_response_tail_db(tail_db);
178        self.request_effects_reset();
179    }
180
181    /// Enable or disable convolution reverb.
182    pub fn set_reverb_enabled(&self, enabled: bool) {
183        let mut effects = self.effects.lock().unwrap();
184        if let Some(effect) = effects
185            .iter_mut()
186            .find_map(|effect| effect.as_convolution_reverb_mut())
187        {
188            effect.enabled = enabled;
189        }
190        if let Some(effect) = effects
191            .iter_mut()
192            .find_map(|effect| effect.as_delay_reverb_mut())
193        {
194            effect.enabled = enabled;
195        }
196    }
197
198    /// Set the reverb wet/dry mix (clamped to `[0.0, 1.0]`).
199    pub fn set_reverb_mix(&self, dry_wet: f32) {
200        let mut effects = self.effects.lock().unwrap();
201        if let Some(effect) = effects
202            .iter_mut()
203            .find_map(|effect| effect.as_convolution_reverb_mut())
204        {
205            effect.dry_wet = dry_wet.clamp(0.0, 1.0);
206        }
207        if let Some(effect) = effects
208            .iter_mut()
209            .find_map(|effect| effect.as_delay_reverb_mut())
210        {
211            effect.mix = dry_wet.clamp(0.0, 1.0);
212        }
213        if let Some(effect) = effects
214            .iter_mut()
215            .find_map(|effect| effect.as_diffusion_reverb_mut())
216        {
217            effect.mix = dry_wet.clamp(0.0, 1.0);
218        }
219    }
220
221    /// Retrieve the current reverb settings snapshot.
222    pub fn get_reverb_settings(&self) -> ReverbSettingsSnapshot {
223        let effects = self.effects.lock().unwrap();
224        if let Some(effect) = effects
225            .iter()
226            .find_map(|effect| effect.as_convolution_reverb())
227        {
228            return ReverbSettingsSnapshot {
229                enabled: effect.enabled,
230                dry_wet: effect.dry_wet,
231            };
232        }
233        if let Some(effect) = effects
234            .iter()
235            .find_map(|effect| effect.as_diffusion_reverb())
236        {
237            return ReverbSettingsSnapshot {
238                enabled: effect.enabled,
239                dry_wet: effect.mix,
240            };
241        }
242        if let Some(effect) = effects.iter().find_map(|effect| effect.as_delay_reverb()) {
243            return ReverbSettingsSnapshot {
244                enabled: effect.enabled,
245                dry_wet: effect.mix,
246            };
247        }
248        ReverbSettingsSnapshot {
249            enabled: false,
250            dry_wet: 0.0,
251        }
252    }
253
254    /// Snapshot the active effect chain names.
255    pub fn get_effect_names(&self) -> Vec<String> {
256        let effects = self.effects.lock().unwrap();
257        effects
258            .iter()
259            .map(|effect| match effect {
260                AudioEffect::DelayReverb(_) => "DelayReverb".to_string(),
261                AudioEffect::BasicReverb(_) => "DelayReverb".to_string(),
262                AudioEffect::DiffusionReverb(_) => "DiffusionReverb".to_string(),
263                AudioEffect::ConvolutionReverb(_) => "ConvolutionReverb".to_string(),
264                AudioEffect::LowPassFilter(_) => "LowPassFilter".to_string(),
265                AudioEffect::HighPassFilter(_) => "HighPassFilter".to_string(),
266                AudioEffect::Distortion(_) => "Distortion".to_string(),
267                AudioEffect::Compressor(_) => "Compressor".to_string(),
268                AudioEffect::Limiter(_) => "Limiter".to_string(),
269            })
270            .collect()
271    }
272
273    /// Replace the active DSP effects chain.
274    pub fn set_effects(&mut self, effects: Vec<AudioEffect>) {
275        {
276            let mut guard = self.effects.lock().unwrap();
277            println!("New Effects: {:?}", effects);
278            *guard = effects;
279        }
280        self.request_effects_reset();
281
282        // Seeking to the current time stamp refreshes the
283        // Sink so that the new effects are applied immediately.
284        if !self.thread_finished() {
285            let ts = self.get_time();
286            self.seek(ts);
287        }
288    }
289
290    /// Retrieve the latest DSP chain performance metrics.
291    pub fn get_dsp_metrics(&self) -> DspChainMetrics {
292        *self.dsp_metrics.lock().unwrap()
293    }
294
295    /// Retrieve the most recent per-channel peak levels.
296    pub fn get_levels(&self) -> Vec<f32> {
297        self.output_meter.lock().unwrap().levels()
298    }
299
300    /// Retrieve the most recent per-channel peak levels in dBFS.
301    pub fn get_levels_db(&self) -> Vec<f32> {
302        self.output_meter
303            .lock()
304            .unwrap()
305            .levels()
306            .into_iter()
307            .map(linear_to_dbfs)
308            .collect()
309    }
310
311    /// Retrieve the most recent per-channel average levels.
312    pub fn get_levels_avg(&self) -> Vec<f32> {
313        self.output_meter.lock().unwrap().averages()
314    }
315
316    /// Set the output meter refresh rate (frames per second).
317    pub fn set_output_meter_refresh_hz(&self, hz: f32) {
318        self.output_meter.lock().unwrap().set_refresh_hz(hz);
319    }
320
321    /// Debug helper returning thread alive, state, and audio heard flags.
322    pub fn debug_playback_state(&self) -> (bool, PlayerState, bool) {
323        (
324            self.playback_thread_exists.load(Ordering::SeqCst),
325            *self.state.lock().unwrap(),
326            self.audio_heard.load(Ordering::Relaxed),
327        )
328    }
329
330    /// Debug helper indicating whether buffering has completed.
331    pub fn debug_buffering_done(&self) -> bool {
332        self.buffering_done.load(Ordering::Relaxed)
333    }
334
335    /// Debug helper returning internal timing markers in milliseconds.
336    pub fn debug_timing_ms(&self) -> (u64, u64) {
337        (
338            self.last_chunk_ms.load(Ordering::Relaxed),
339            self.last_time_update_ms.load(Ordering::Relaxed),
340        )
341    }
342
343    /// Debug helper returning sink paused/empty flags and queued length.
344    pub fn debug_sink_state(&self) -> (bool, bool, usize) {
345        let sink = self.sink.lock().unwrap();
346        let paused = sink.is_paused();
347        let empty = sink.empty();
348        let len = sink.len();
349        (paused, empty, len)
350    }
351
352    fn request_effects_reset(&self) {
353        self.effects_reset.fetch_add(1, Ordering::SeqCst);
354    }
355
356    /// Configure the minimum buffered audio (ms) before playback starts.
357    pub fn set_start_buffer_ms(&self, start_buffer_ms: f32) {
358        let mut settings = self.buffer_settings.lock().unwrap();
359        settings.start_buffer_ms = start_buffer_ms.max(0.0);
360    }
361
362    /// Configure heuristic end-of-track threshold for containers (ms).
363    pub fn set_track_eos_ms(&self, track_eos_ms: f32) {
364        let mut settings = self.buffer_settings.lock().unwrap();
365        settings.track_eos_ms = track_eos_ms.max(0.0);
366    }
367
368    /// Configure minimum sink chunks queued before playback starts/resumes.
369    pub fn set_start_sink_chunks(&self, chunks: usize) {
370        let mut settings = self.buffer_settings.lock().unwrap();
371        settings.start_sink_chunks = chunks;
372    }
373
374    /// Configure the startup silence pre-roll (ms).
375    pub fn set_startup_silence_ms(&self, ms: f32) {
376        let mut settings = self.buffer_settings.lock().unwrap();
377        settings.startup_silence_ms = ms.max(0.0);
378    }
379
380    /// Configure the startup fade-in length (ms).
381    pub fn set_startup_fade_ms(&self, ms: f32) {
382        let mut settings = self.buffer_settings.lock().unwrap();
383        settings.startup_fade_ms = ms.max(0.0);
384    }
385
386    /// Configure the append jitter logging threshold (ms). 0 disables logging.
387    pub fn set_append_jitter_log_ms(&self, ms: f32) {
388        let mut settings = self.buffer_settings.lock().unwrap();
389        settings.append_jitter_log_ms = ms.max(0.0);
390    }
391
392    /// Enable or disable per-effect boundary discontinuity logging.
393    pub fn set_effect_boundary_log(&self, enabled: bool) {
394        let mut settings = self.buffer_settings.lock().unwrap();
395        settings.effect_boundary_log = enabled;
396    }
397
398    fn audition(&self, length: Duration) {
399        let audition_source_mutex = self.audition_source.clone();
400
401        // Create new thread to audition
402        thread::spawn(move || {
403            // Wait until audition source is ready
404            while audition_source_mutex.lock().unwrap().is_none() {
405                thread::sleep(Duration::from_millis(10));
406            }
407
408            let audition_source_option = audition_source_mutex.lock().unwrap().take();
409            let audition_source = audition_source_option.unwrap();
410
411            let _audition_stream = OutputStreamBuilder::open_default_stream().unwrap();
412            let audition_sink = Sink::connect_new(_audition_stream.mixer());
413            audition_sink.pause();
414            audition_sink.set_volume(0.8);
415            audition_sink.append(audition_source);
416            audition_sink.play();
417            thread::sleep(length);
418            audition_sink.pause();
419        });
420    }
421
422    fn initialize_thread(&mut self, ts: Option<f64>) {
423        // Empty finished_tracks
424        let mut finished_tracks = self.finished_tracks.lock().unwrap();
425        finished_tracks.clear();
426        drop(finished_tracks);
427
428        // ===== Set play options ===== //
429        // Use a fresh abort flag per playback thread so old mixer threads stay stopped.
430        self.abort = Arc::new(AtomicBool::new(false));
431        self.playback_thread_exists.store(true, Ordering::SeqCst);
432        let playback_id = self.playback_id.fetch_add(1, Ordering::SeqCst) + 1;
433        self.buffering_done.store(false, Ordering::SeqCst);
434        let now_ms_value = now_ms();
435        self.last_chunk_ms.store(now_ms_value, Ordering::Relaxed);
436        self.last_time_update_ms
437            .store(now_ms_value, Ordering::Relaxed);
438
439        // ===== Clone variables ===== //
440        let play_state = self.state.clone();
441        let abort = self.abort.clone();
442        let playback_thread_exists = self.playback_thread_exists.clone();
443        let playback_id_atomic = self.playback_id.clone();
444        let time_passed = self.ts.clone();
445
446        let duration = self.duration.clone();
447        let prot = self.prot.clone();
448        let buffer_settings = self.buffer_settings.clone();
449        let buffer_settings_for_state = self.buffer_settings.clone();
450        let effects = self.effects.clone();
451        let dsp_metrics = self.dsp_metrics.clone();
452        let dsp_metrics_for_sink = self.dsp_metrics.clone();
453        let effects_reset = self.effects_reset.clone();
454        let output_meter = self.output_meter.clone();
455        let audio_info = self.info.clone();
456
457        let audio_heard = self.audio_heard.clone();
458        let volume = self.volume.clone();
459        let sink_mutex = self.sink.clone();
460        let audition_source_mutex = self.audition_source.clone();
461        let buffer_done_thread_flag = self.buffering_done.clone();
462        let last_chunk_ms = self.last_chunk_ms.clone();
463        let last_time_update_ms = self.last_time_update_ms.clone();
464
465        audio_heard.store(false, Ordering::Relaxed);
466
467        // clear audition source
468        let mut audition_source = audition_source_mutex.lock().unwrap();
469        *audition_source = None;
470        drop(audition_source);
471
472        {
473            let mut meter = self.output_meter.lock().unwrap();
474            meter.reset();
475        }
476
477        // ===== Start playback ===== //
478        thread::spawn(move || {
479            // ===================== //
480            // Set playback_thread_exists to true
481            // ===================== //
482            playback_thread_exists.store(true, Ordering::Relaxed);
483
484            // ===================== //
485            // Initialize engine & sink
486            // ===================== //
487            let start_time = match ts {
488                Some(ts) => ts,
489                None => 0.0,
490            };
491            let mut engine = PlayerEngine::new(
492                prot,
493                Some(abort.clone()),
494                start_time,
495                buffer_settings,
496                effects,
497                dsp_metrics,
498                effects_reset,
499            );
500            let _stream = OutputStreamBuilder::open_default_stream().unwrap();
501            let mixer = _stream.mixer().clone();
502
503            let mut sink = sink_mutex.lock().unwrap();
504            *sink = Sink::connect_new(&mixer);
505            sink.pause();
506            sink.set_volume(*volume.lock().unwrap());
507            drop(sink);
508
509            // ===================== //
510            // Set duration from engine
511            // ===================== //
512            let mut duration = duration.lock().unwrap();
513            *duration = engine.get_duration();
514            drop(duration);
515
516            // ===================== //
517            // Initialize chunk_lengths & time_passed
518            // ===================== //
519            let chunk_lengths = Arc::new(Mutex::new(Vec::new()));
520            let mut time_passed_unlocked = time_passed.lock().unwrap();
521            *time_passed_unlocked = start_time;
522            drop(time_passed_unlocked);
523
524            let pause_sink = |sink: &Sink, fade_length_out_seconds: f32| {
525                let timestamp = *time_passed.lock().unwrap();
526
527                let fade_increments = sink.volume() / (fade_length_out_seconds * 100.0);
528                // Fade out and pause sink
529                while sink.volume() > 0.0 && timestamp != start_time {
530                    sink.set_volume(sink.volume() - fade_increments);
531                    thread::sleep(Duration::from_millis(10));
532                }
533                sink.pause();
534            };
535
536            let resume_sink = |sink: &Sink, fade_length_in_seconds: f32| {
537                let volume = *volume.lock().unwrap();
538                let fade_increments = (volume - sink.volume()) / (fade_length_in_seconds * 100.0);
539                // Fade in and play sink
540                sink.play();
541                while sink.volume() < volume {
542                    sink.set_volume(sink.volume() + fade_increments);
543                    thread::sleep(Duration::from_millis(5));
544                }
545            };
546
547            // ===================== //
548            // Start sink with startup silence + fade in
549            // ===================== //
550            {
551                let startup_settings = buffer_settings_for_state.lock().unwrap();
552                let startup_silence_ms = startup_settings.startup_silence_ms;
553                let startup_fade_ms = startup_settings.startup_fade_ms;
554                drop(startup_settings);
555
556                let sample_rate = audio_info.sample_rate as u32;
557                let channels = audio_info.channels as u16;
558
559                if startup_silence_ms > 0.0 {
560                    let samples = ((startup_silence_ms / 1000.0) * sample_rate as f32).ceil()
561                        as usize
562                        * channels as usize;
563                    let silence = vec![0.0_f32; samples.max(1)];
564                    let silence_buffer = SamplesBuffer::new(channels, sample_rate, silence);
565                    let sink = sink_mutex.lock().unwrap();
566                    sink.append(silence_buffer);
567                    drop(sink);
568                }
569
570                if startup_fade_ms > 0.0 {
571                    resume_sink(
572                        &sink_mutex.lock().unwrap(),
573                        (startup_fade_ms / 1000.0).max(0.0),
574                    );
575                }
576            }
577
578            // ===================== //
579            // Check if the player should be paused or not
580            // ===================== //
581            let check_details = || {
582                if abort.load(Ordering::SeqCst) {
583                    let sink = sink_mutex.lock().unwrap();
584                    pause_sink(&sink, 0.1);
585                    sink.clear();
586                    drop(sink);
587
588                    return false;
589                }
590
591                let sink = sink_mutex.lock().unwrap();
592                let state = play_state.lock().unwrap().clone();
593                let start_sink_chunks = buffer_settings_for_state.lock().unwrap().start_sink_chunks;
594                if state == PlayerState::Resuming
595                    && start_sink_chunks > 0
596                    && sink.len() < start_sink_chunks
597                {
598                    // Keep paused until enough chunks are queued.
599                    sink.pause();
600                    drop(sink);
601                    return true;
602                }
603                if state == PlayerState::Pausing {
604                    pause_sink(&sink, 0.1);
605                    play_state.lock().unwrap().clone_from(&PlayerState::Paused);
606                }
607                if state == PlayerState::Resuming {
608                    resume_sink(&sink, 0.1);
609                    play_state.lock().unwrap().clone_from(&PlayerState::Playing);
610                }
611                drop(sink);
612
613                true
614            };
615
616            // ===================== //
617            // Update chunk_lengths / time_passed
618            // ===================== //
619            let time_chunks_mutex = Arc::new(Mutex::new(start_time));
620            let timer_mut = Arc::new(Mutex::new(timer::Timer::new()));
621            let buffering_done = Arc::new(AtomicBool::new(false));
622            let buffering_done_flag = buffer_done_thread_flag.clone();
623            let final_duration = Arc::new(Mutex::new(None::<f64>));
624            let mut timer = timer_mut.lock().unwrap();
625            timer.start();
626            drop(timer);
627
628            let last_meter_time = Cell::new(0.0_f64);
629            let update_chunk_lengths = || {
630                if abort.load(Ordering::SeqCst) {
631                    return;
632                }
633
634                let mut chunk_lengths = chunk_lengths.lock().unwrap();
635                let mut time_passed_unlocked = time_passed.lock().unwrap();
636                let mut time_chunks_passed = time_chunks_mutex.lock().unwrap();
637                let mut timer = timer_mut.lock().unwrap();
638                last_time_update_ms.store(now_ms(), Ordering::Relaxed);
639                let sink = sink_mutex.lock().unwrap();
640                if !buffering_done.load(Ordering::Relaxed) {
641                    // Check how many chunks have been played (chunk_lengths.len() - sink.len())
642                    // since the last time this function was called and add that to time_passed.
643                    let chunks_played = chunk_lengths.len().saturating_sub(sink.len());
644
645                    for _ in 0..chunks_played {
646                        timer.reset();
647                        timer.start();
648                        *time_chunks_passed += chunk_lengths.remove(0);
649                    }
650                }
651
652                if sink.is_paused() {
653                    timer.pause();
654                } else {
655                    timer.un_pause();
656                }
657
658                let current_audio_time = *time_chunks_passed + timer.get_time().as_secs_f64();
659                let delta = (current_audio_time - last_meter_time.get()).max(0.0);
660                last_meter_time.set(current_audio_time);
661                {
662                    let mut meter = output_meter.lock().unwrap();
663                    meter.advance(delta);
664                }
665
666                *time_passed_unlocked = current_audio_time;
667
668                drop(sink);
669                drop(chunk_lengths);
670                drop(time_passed_unlocked);
671                drop(time_chunks_passed);
672                drop(timer);
673            };
674
675            // ===================== //
676            // Update sink for each chunk received from engine
677            // ===================== //
678            let append_timing = Arc::new(Mutex::new((Instant::now(), 0.0_f64, 0_u64, 0.0_f64)));
679            let update_sink = |(mixer, length_in_seconds): (SamplesBuffer, f64)| {
680                if playback_id_atomic.load(Ordering::SeqCst) != playback_id {
681                    return;
682                }
683                let (delay_ms, late) = {
684                    let mut timing = append_timing.lock().unwrap();
685                    let now = Instant::now();
686                    let delta_ms = now.duration_since(timing.0).as_secs_f64() * 1000.0;
687                    let chunk_ms = length_in_seconds * 1000.0;
688                    let late = delta_ms > (chunk_ms * 1.2) && chunk_ms > 0.0;
689                    if late {
690                        timing.2 = timing.2.saturating_add(1);
691                    }
692                    timing.1 = if timing.1 == 0.0 {
693                        delta_ms
694                    } else {
695                        (timing.1 * 0.9) + (delta_ms * 0.1)
696                    };
697                    timing.3 = timing.3.max(delta_ms);
698                    timing.0 = now;
699                    (delta_ms, late)
700                };
701                audio_heard.store(true, Ordering::Relaxed);
702                last_chunk_ms.store(now_ms(), Ordering::Relaxed);
703
704                {
705                    let mut meter = output_meter.lock().unwrap();
706                    meter.push_samples(&mixer);
707                }
708                {
709                    let mut metrics = dsp_metrics_for_sink.lock().unwrap();
710                    metrics.append_delay_ms = delay_ms;
711                    metrics.avg_append_delay_ms = {
712                        if metrics.avg_append_delay_ms == 0.0 {
713                            delay_ms
714                        } else {
715                            (metrics.avg_append_delay_ms * 0.9) + (delay_ms * 0.1)
716                        }
717                    };
718                    metrics.max_append_delay_ms = metrics.max_append_delay_ms.max(delay_ms);
719                    metrics.late_append_count = {
720                        let timing = append_timing.lock().unwrap();
721                        timing.2
722                    };
723                    metrics.late_append_active = late;
724                }
725
726                let mut audition_source = audition_source_mutex.lock().unwrap();
727                let sink = sink_mutex.lock().unwrap();
728                let append_jitter_log_ms = {
729                    let settings = buffer_settings_for_state.lock().unwrap();
730                    settings.append_jitter_log_ms
731                };
732                if append_jitter_log_ms > 0.0 && (late || delay_ms > append_jitter_log_ms as f64) {
733                    let expected_ms = length_in_seconds * 1000.0;
734                    log::info!(
735                        "append jitter: delta={:.2}ms expected={:.2}ms late={} threshold={:.2}ms sink_len={}",
736                        delay_ms,
737                        expected_ms,
738                        late,
739                        append_jitter_log_ms,
740                        sink.len()
741                    );
742                }
743                let mut chunk_lengths = chunk_lengths.lock().unwrap();
744
745                let total_time = chunk_lengths.iter().sum::<f64>();
746
747                // If total_time is less than 0.2 seconds, audition the chunk
748                if audition_source.is_none() {
749                    let (mixer_clone, mixer) = clone_samples_buffer(mixer);
750                    *audition_source = Some(mixer_clone);
751                    drop(audition_source);
752                    sink.append(mixer);
753                } else {
754                    sink.append(mixer);
755                }
756
757                drop(sink);
758
759                chunk_lengths.push(length_in_seconds);
760                drop(chunk_lengths);
761
762                update_chunk_lengths();
763                check_details();
764            };
765
766            let receiver = engine.start_receiver();
767            loop {
768                match receiver.recv_timeout(Duration::from_millis(20)) {
769                    Ok(chunk) => {
770                        update_sink(chunk);
771                    }
772                    Err(RecvTimeoutError::Timeout) => {
773                        update_chunk_lengths();
774                        if !check_details() {
775                            break;
776                        }
777                    }
778                    Err(RecvTimeoutError::Disconnected) => break,
779                }
780            }
781            #[cfg(feature = "debug")]
782            log::info!("engine reception loop finished");
783
784            // From here on, all audio is buffered. Stop relying on sink.len()
785            // to advance time so the UI keeps updating while the last buffer plays.
786            buffering_done.store(true, Ordering::Relaxed);
787            buffering_done_flag.store(true, Ordering::Relaxed);
788            {
789                let mut final_duration = final_duration.lock().unwrap();
790                if final_duration.is_none() {
791                    let chunk_lengths = chunk_lengths.lock().unwrap();
792                    let time_chunks_passed = time_chunks_mutex.lock().unwrap();
793                    *final_duration = Some(*time_chunks_passed + chunk_lengths.iter().sum::<f64>());
794                }
795            }
796
797            // ===================== //
798            // Wait until all tracks are finished playing in sink
799            // ===================== //
800            #[cfg(feature = "debug")]
801            {
802                let sink = sink_mutex.lock().unwrap();
803                let paused = sink.is_paused();
804                let empty = sink.empty();
805                let sink_len = sink.len();
806                drop(sink);
807                let time_passed = *time_passed.lock().unwrap();
808                let final_duration = *final_duration.lock().unwrap();
809                log::info!(
810                    "Starting drain loop: paused={} empty={} sink_len={} time={:.3} final={:?}",
811                    paused,
812                    empty,
813                    sink_len,
814                    time_passed,
815                    final_duration
816                );
817            }
818
819            loop {
820                update_chunk_lengths();
821                if !check_details() {
822                    break;
823                }
824
825                let done = if engine.finished_buffering() {
826                    if let Some(final_duration) = *final_duration.lock().unwrap() {
827                        let time_passed = *time_passed.lock().unwrap();
828                        time_passed >= (final_duration - 0.001).max(0.0)
829                    } else {
830                        false
831                    }
832                } else {
833                    false
834                };
835                if done {
836                    break;
837                }
838
839                thread::sleep(Duration::from_millis(10));
840            }
841
842            #[cfg(feature = "debug")]
843            log::info!("Finished drain loop!");
844
845            // ===================== //
846            // Set playback_thread_exists to false
847            // ===================== //
848            playback_thread_exists.store(false, Ordering::Relaxed);
849        });
850    }
851
852    /// Start playback from a specific timestamp (seconds).
853    pub fn play_at(&mut self, ts: f64) {
854        let mut timestamp = self.ts.lock().unwrap();
855        *timestamp = ts;
856        drop(timestamp);
857
858        self.request_effects_reset();
859        self.kill_current();
860        // self.stop.store(false, Ordering::SeqCst);
861        self.initialize_thread(Some(ts));
862
863        self.resume();
864
865        self.wait_for_audio_heard(Duration::from_secs(5));
866    }
867
868    /// Start playback from the current timestamp.
869    pub fn play(&mut self) {
870        info!("Playing audio");
871        let thread_exists = self.playback_thread_exists.load(Ordering::SeqCst);
872        // self.stop.store(false, Ordering::SeqCst);
873
874        if !thread_exists {
875            self.initialize_thread(None);
876        }
877
878        self.resume();
879
880        self.wait_for_audio_heard(Duration::from_secs(5));
881    }
882
883    /// Pause playback.
884    pub fn pause(&self) {
885        self.state.lock().unwrap().clone_from(&PlayerState::Pausing);
886    }
887
888    /// Resume playback if paused.
889    pub fn resume(&self) {
890        self.state
891            .lock()
892            .unwrap()
893            .clone_from(&PlayerState::Resuming);
894    }
895
896    /// Stop the current playback thread without changing state.
897    pub fn kill_current(&self) {
898        self.state
899            .lock()
900            .unwrap()
901            .clone_from(&PlayerState::Stopping);
902        {
903            let sink = self.sink.lock().unwrap();
904            sink.stop();
905        }
906        self.abort.store(true, Ordering::SeqCst);
907
908        while !self.thread_finished() {
909            thread::sleep(Duration::from_millis(10));
910        }
911
912        self.state.lock().unwrap().clone_from(&PlayerState::Stopped);
913    }
914
915    /// Stop playback and reset timing state.
916    pub fn stop(&self) {
917        self.kill_current();
918        self.ts.lock().unwrap().clone_from(&0.0);
919    }
920
921    /// Return true if playback is currently active.
922    pub fn is_playing(&self) -> bool {
923        let state = self.state.lock().unwrap();
924        *state == PlayerState::Playing
925    }
926
927    /// Return true if playback is currently paused.
928    pub fn is_paused(&self) -> bool {
929        let state = self.state.lock().unwrap();
930        *state == PlayerState::Paused
931    }
932
933    /// Get the current playback time in seconds.
934    pub fn get_time(&self) -> f64 {
935        let ts = self.ts.lock().unwrap();
936        *ts
937    }
938
939    fn thread_finished(&self) -> bool {
940        let playback_thread_exists = self.playback_thread_exists.load(Ordering::SeqCst);
941        !playback_thread_exists
942    }
943
944    /// Return true if playback has reached the end.
945    pub fn is_finished(&self) -> bool {
946        self.thread_finished()
947        // let state = self.state.lock().unwrap();
948        // *state == PlayerState::Finished
949    }
950
951    /// Block the current thread until playback finishes.
952    pub fn sleep_until_end(&self) {
953        loop {
954            if self.thread_finished() {
955                break;
956            }
957            thread::sleep(Duration::from_millis(100));
958        }
959    }
960
961    /// Get the total duration (seconds) of the active selection.
962    pub fn get_duration(&self) -> f64 {
963        let duration = self.duration.lock().unwrap();
964        *duration
965    }
966
967    /// Seek to the given timestamp (seconds).
968    pub fn seek(&mut self, ts: f64) {
969        let mut timestamp = self.ts.lock().unwrap();
970        *timestamp = ts;
971        drop(timestamp);
972
973        self.request_effects_reset();
974        let state = self.state.lock().unwrap().clone();
975
976        self.kill_current();
977        self.state.lock().unwrap().clone_from(&state);
978        self.initialize_thread(Some(ts));
979
980        match state {
981            PlayerState::Playing => self.resume(),
982            PlayerState::Paused => {
983                self.audition(Duration::from_millis(100));
984            }
985            _ => {}
986        }
987    }
988
989    /// Refresh active track selections from the underlying container.
990    pub fn refresh_tracks(&mut self) {
991        let mut prot = self.prot.lock().unwrap();
992        prot.refresh_tracks();
993        if let Some(spec) = self.impulse_response_override.clone() {
994            prot.set_impulse_response_spec(spec);
995        }
996        if let Some(tail_db) = self.impulse_response_tail_override {
997            prot.set_impulse_response_tail_db(tail_db);
998        }
999        drop(prot);
1000
1001        self.request_effects_reset();
1002        // If stopped, return
1003        if self.thread_finished() {
1004            return;
1005        }
1006
1007        // Kill current thread and start
1008        // new thread at the current timestamp
1009        let ts = self.get_time();
1010        self.seek(ts);
1011
1012        // If previously playing, resume
1013        if self.is_playing() {
1014            self.resume();
1015        }
1016
1017        self.wait_for_audio_heard(Duration::from_secs(5));
1018    }
1019
1020    fn wait_for_audio_heard(&self, timeout: Duration) -> bool {
1021        let start = Instant::now();
1022        loop {
1023            if self.audio_heard.load(Ordering::Relaxed) {
1024                return true;
1025            }
1026            if self.thread_finished() {
1027                warn!("playback thread ended before audio was heard");
1028                return false;
1029            }
1030            if start.elapsed() >= timeout {
1031                warn!("timed out waiting for audio to start");
1032                return false;
1033            }
1034            thread::sleep(Duration::from_millis(10));
1035        }
1036    }
1037
1038    /// Shuffle track selections and restart playback.
1039    pub fn shuffle(&mut self) {
1040        self.refresh_tracks();
1041    }
1042
1043    /// Set the playback volume (0.0-1.0).
1044    pub fn set_volume(&mut self, new_volume: f32) {
1045        let sink = self.sink.lock().unwrap();
1046        sink.set_volume(new_volume);
1047        drop(sink);
1048
1049        let mut volume = self.volume.lock().unwrap();
1050        *volume = new_volume;
1051        drop(volume);
1052    }
1053
1054    /// Get the current playback volume.
1055    pub fn get_volume(&self) -> f32 {
1056        *self.volume.lock().unwrap()
1057    }
1058
1059    /// Get the track identifiers used for display.
1060    pub fn get_ids(&self) -> Vec<String> {
1061        let prot = self.prot.lock().unwrap();
1062
1063        return prot.get_ids();
1064    }
1065
1066    /// Enable periodic reporting of playback status for UI consumers.
1067    pub fn set_reporting(
1068        &mut self,
1069        reporting: Arc<Mutex<dyn Fn(Report) + Send>>,
1070        reporting_interval: Duration,
1071    ) {
1072        if self.reporter.is_some() {
1073            self.reporter.as_ref().unwrap().lock().unwrap().stop();
1074        }
1075
1076        let reporter = Arc::new(Mutex::new(Reporter::new(
1077            Arc::new(Mutex::new(self.clone())),
1078            reporting,
1079            reporting_interval,
1080        )));
1081
1082        reporter.lock().unwrap().start();
1083
1084        self.reporter = Some(reporter);
1085    }
1086}
1087
1088fn linear_to_dbfs(value: f32) -> f32 {
1089    if value <= 0.0 {
1090        f32::NEG_INFINITY
1091    } else {
1092        20.0 * value.log10()
1093    }
1094}
1095
1096fn now_ms() -> u64 {
1097    use std::time::SystemTime;
1098    SystemTime::now()
1099        .duration_since(SystemTime::UNIX_EPOCH)
1100        .map(|d| d.as_millis() as u64)
1101        .unwrap_or(0)
1102}