Skip to main content

proteus_lib/playback/engine/
mod.rs

1//! Playback mixing engine and buffer coordination.
2
3use dasp_ring_buffer::Bounded;
4use rodio::buffer::SamplesBuffer;
5use std::collections::HashMap;
6use std::sync::atomic::AtomicBool;
7use std::sync::atomic::AtomicU64;
8use std::sync::{mpsc::Receiver, Arc, Condvar, Mutex};
9
10use crate::audio::buffer::{init_buffer_map, TrackBuffer};
11use crate::container::prot::Prot;
12
13mod mix;
14mod state;
15
16pub use state::{DspChainMetrics, PlaybackBufferSettings};
17
18use mix::{spawn_mix_thread, MixThreadArgs};
19
20/// Internal playback engine used by the high-level [`Player`].
21#[derive(Debug, Clone)]
22pub struct PlayerEngine {
23    pub finished_tracks: Arc<Mutex<Vec<u16>>>,
24    start_time: f64,
25    abort: Arc<AtomicBool>,
26    buffer_map: Arc<Mutex<HashMap<u16, TrackBuffer>>>,
27    buffer_notify: Arc<Condvar>,
28    effects_buffer: Arc<Mutex<Bounded<Vec<f32>>>>,
29    track_weights: Arc<Mutex<HashMap<u16, f32>>>,
30    effects_reset: Arc<AtomicU64>,
31    prot: Arc<Mutex<Prot>>,
32    buffer_settings: Arc<Mutex<PlaybackBufferSettings>>,
33    effects: Arc<Mutex<Vec<crate::dsp::effects::AudioEffect>>>,
34    dsp_metrics: Arc<Mutex<DspChainMetrics>>,
35}
36
37impl PlayerEngine {
38    /// Create a new engine for the given container and settings.
39    pub fn new(
40        prot: Arc<Mutex<Prot>>,
41        abort_option: Option<Arc<AtomicBool>>,
42        start_time: f64,
43        buffer_settings: Arc<Mutex<PlaybackBufferSettings>>,
44        effects: Arc<Mutex<Vec<crate::dsp::effects::AudioEffect>>>,
45        dsp_metrics: Arc<Mutex<DspChainMetrics>>,
46        effects_reset: Arc<AtomicU64>,
47    ) -> Self {
48        let buffer_map = init_buffer_map();
49        let buffer_notify = Arc::new(Condvar::new());
50        let track_weights = Arc::new(Mutex::new(HashMap::new()));
51        let finished_tracks: Arc<Mutex<Vec<u16>>> = Arc::new(Mutex::new(Vec::new()));
52        let abort = if abort_option.is_some() {
53            abort_option.unwrap()
54        } else {
55            Arc::new(AtomicBool::new(false))
56        };
57
58        let prot_unlocked = prot.lock().unwrap();
59        let start_buffer_ms = buffer_settings.lock().unwrap().start_buffer_ms;
60        let channels = prot_unlocked.info.channels as usize;
61        let start_samples = ((prot_unlocked.info.sample_rate as f32 * start_buffer_ms) / 1000.0)
62            as usize
63            * channels;
64        let buffer_size = (prot_unlocked.info.sample_rate as usize * 10).max(start_samples * 2);
65        let effects_buffer = Arc::new(Mutex::new(dasp_ring_buffer::Bounded::from(vec![
66            0.0;
67            buffer_size
68        ])));
69        drop(prot_unlocked);
70
71        Self {
72            finished_tracks,
73            start_time,
74            buffer_map,
75            buffer_notify,
76            effects_buffer,
77            track_weights,
78            effects_reset,
79            abort,
80            prot,
81            buffer_settings,
82            effects,
83            dsp_metrics,
84        }
85    }
86
87    /// Start the mixing loop and invoke `f` for each mixed chunk.
88    pub fn reception_loop(&mut self, f: &dyn Fn((SamplesBuffer, f64))) {
89        let prot = self.prot.lock().unwrap();
90        let keys = prot.get_keys();
91        drop(prot);
92        self.ready_buffer_map(&keys);
93        let receiver = self.get_receiver();
94
95        for (mixer, length_in_seconds) in receiver {
96            f((mixer, length_in_seconds));
97        }
98    }
99
100    /// Start mixing and return a receiver for `(buffer, duration)` chunks.
101    pub fn start_receiver(&mut self) -> Receiver<(SamplesBuffer, f64)> {
102        let prot = self.prot.lock().unwrap();
103        let keys = prot.get_keys();
104        drop(prot);
105        self.ready_buffer_map(&keys);
106        self.get_receiver()
107    }
108
109    fn get_receiver(&self) -> Receiver<(SamplesBuffer, f64)> {
110        let prot = self.prot.lock().unwrap();
111        let audio_info = prot.info.clone();
112        drop(prot);
113
114        spawn_mix_thread(MixThreadArgs {
115            audio_info,
116            buffer_map: self.buffer_map.clone(),
117            buffer_notify: self.buffer_notify.clone(),
118            effects_buffer: self.effects_buffer.clone(),
119            track_weights: self.track_weights.clone(),
120            effects_reset: self.effects_reset.clone(),
121            finished_tracks: self.finished_tracks.clone(),
122            prot: self.prot.clone(),
123            abort: self.abort.clone(),
124            start_time: self.start_time,
125            buffer_settings: self.buffer_settings.clone(),
126            effects: self.effects.clone(),
127            dsp_metrics: self.dsp_metrics.clone(),
128        })
129    }
130
131    /// Get the total duration (seconds) of the active selection.
132    pub fn get_duration(&self) -> f64 {
133        let prot = self.prot.lock().unwrap();
134        *prot.get_duration()
135    }
136
137    fn ready_buffer_map(&mut self, keys: &Vec<u32>) {
138        self.buffer_map = init_buffer_map();
139        self.track_weights.lock().unwrap().clear();
140
141        let prot = self.prot.lock().unwrap();
142        let sample_rate = prot.info.sample_rate;
143        let channels = prot.info.channels as usize;
144        let start_buffer_ms = self.buffer_settings.lock().unwrap().start_buffer_ms;
145        drop(prot);
146        let start_samples = ((sample_rate as f32 * start_buffer_ms) / 1000.0) as usize * channels;
147        let buffer_size = (sample_rate as usize * 10).max(start_samples * 2);
148
149        for key in keys {
150            let ring_buffer = Arc::new(Mutex::new(dasp_ring_buffer::Bounded::from(vec![
151                    0.0;
152                    buffer_size
153                ])));
154            self.buffer_map
155                .lock()
156                .unwrap()
157                .insert(*key as u16, ring_buffer);
158            self.track_weights.lock().unwrap().insert(*key as u16, 1.0);
159        }
160    }
161
162    /// Return true if all tracks have reported end-of-stream.
163    pub fn finished_buffering(&self) -> bool {
164        let finished_tracks = self.finished_tracks.lock().unwrap();
165        let prot = self.prot.lock().unwrap();
166        let keys = prot.get_keys();
167        drop(prot);
168
169        for key in keys {
170            if !finished_tracks.contains(&(key as u16)) {
171                return false;
172            }
173        }
174
175        true
176    }
177}