Skip to main content

selene_daemon/
decoder.rs

1use std::{
2    sync::{
3        Arc, Mutex,
4        atomic::{AtomicU32, Ordering},
5        mpsc::{Receiver, Sender, TryRecvError, channel},
6    },
7    thread,
8};
9
10use lunar_lib::error;
11use ringbuf::traits::Observer;
12
13use crate::{
14    PlayerEvent, SHUTDOWN,
15    event_handler::EventTx,
16    player::{
17        OpenedDecoder, PlayerError, PlayerRequest, PlayerTx,
18        playback::{CpalHandle, DeviceConfig},
19    },
20    playlist::{AtomicPlaybackStatus, PlaybackStatus, ResolvedTrack},
21    wait,
22};
23
24pub struct DecoderHandle {
25    rx: Receiver<DecoderCommand>,
26    player_tx: Sender<PlayerRequest>,
27    event_tx: Sender<PlayerEvent>,
28
29    cpal_handle: Option<CpalHandle>,
30    device_config: Arc<Mutex<DeviceConfig>>,
31
32    current_decoder: Option<OpenedDecoder>,
33    preload_decoder: Option<OpenedDecoder>,
34
35    playback_state: Arc<AtomicPlaybackStatus>,
36    volume: Arc<AtomicU32>,
37}
38
39impl DecoderHandle {
40    pub(crate) fn open(
41        player_tx: Sender<PlayerRequest>,
42        event_tx: Sender<PlayerEvent>,
43        device_config: Arc<Mutex<DeviceConfig>>,
44        playback_state: Arc<AtomicPlaybackStatus>,
45        volume: Arc<AtomicU32>,
46    ) -> Result<Sender<DecoderCommand>, PlayerError> {
47        let (tx, rx) = channel();
48
49        let handle = Self {
50            rx,
51            player_tx,
52            event_tx,
53
54            cpal_handle: None,
55            device_config,
56
57            current_decoder: None,
58            preload_decoder: None,
59
60            playback_state,
61            volume,
62        };
63
64        thread::spawn(move || {
65            if let Err(err) = handle.run() {
66                error!("DecoderHandle failed with error: {err}");
67            }
68            SHUTDOWN.store(true, Ordering::Relaxed);
69        });
70
71        Ok(tx)
72    }
73
74    fn run(mut self) -> Result<(), PlayerError> {
75        loop {
76            if SHUTDOWN.load(Ordering::Relaxed) {
77                return Ok(());
78            }
79
80            loop {
81                match self.rx.try_recv() {
82                    Ok(command) => self.run_command(command)?,
83                    Err(TryRecvError::Empty) => break,
84                    Err(TryRecvError::Disconnected) => return Ok(()),
85                }
86            }
87
88            if !matches!(
89                self.playback_state.load(Ordering::Relaxed),
90                PlaybackStatus::Playing
91            ) {
92                wait();
93                continue;
94            }
95
96            let volume = f32::from_bits(self.volume.load(Ordering::Relaxed));
97            if let Some(ref mut cpal_handle) = self.cpal_handle
98                && let Some(finished_consuming) = cpal_handle.consume_packet(volume)
99                && !finished_consuming
100            {
101                wait();
102                continue;
103            }
104
105            if self.current_decoder.is_none() {
106                if let Some(ref cpal_handle) = self.cpal_handle
107                    && !cpal_handle.audio_buf.is_empty()
108                {
109                    wait();
110                    continue;
111                }
112
113                self.cpal_handle = None;
114                self.playback_state
115                    .store(PlaybackStatus::Stopped, Ordering::SeqCst);
116                self.event_tx.event(PlayerEvent::PlaybackStopped);
117                continue;
118            }
119
120            // Use the current CpalHandle, or open a new one if one doesnt exist
121            self.ensure_cpal_handle()?;
122
123            if self.current_decoder.as_ref().unwrap().at_eof {
124                if let Some(take) = self.preload_decoder.take() {
125                    self.player_tx
126                        .decoder_event(DecoderEvent::PreloadConsumed)?;
127                    let currently_playing = take.decoded_from.clone();
128                    self.current_decoder = Some(take);
129                    self.event_tx.event(PlayerEvent::CurrentlyPlayingChanged {
130                        currently_playing: Box::new(currently_playing),
131                    });
132                } else {
133                    self.current_decoder = None;
134                    continue;
135                }
136            }
137
138            let packet = self
139                .current_decoder
140                .as_mut()
141                .unwrap()
142                .decode_next_packet()?;
143            self.cpal_handle.as_mut().unwrap().pending_packet = packet;
144        }
145    }
146
147    fn run_command(&mut self, command: DecoderCommand) -> Result<(), PlayerError> {
148        match command {
149            DecoderCommand::Load { decoder } => {
150                let currently_playing = decoder.decoded_from.clone();
151                self.current_decoder = Some(*decoder);
152                self.event_tx.event(PlayerEvent::CurrentlyPlayingChanged {
153                    currently_playing: Box::new(currently_playing),
154                });
155            }
156            DecoderCommand::Preload { decoder } => self.preload_decoder = Some(*decoder),
157            DecoderCommand::LoadAndPreload { load, preload } => {
158                let currently_playing = load.decoded_from.clone();
159                self.current_decoder = Some(*load);
160                self.preload_decoder = Some(*preload);
161                self.event_tx.event(PlayerEvent::CurrentlyPlayingChanged {
162                    currently_playing: Box::new(currently_playing),
163                });
164            }
165            DecoderCommand::SetPlaying(playing) => {
166                if let Some(ref decoder) = self.current_decoder {
167                    let new_state = if playing {
168                        PlaybackStatus::Playing
169                    } else {
170                        PlaybackStatus::Paused
171                    };
172
173                    if self.playback_state.load(Ordering::SeqCst) == new_state {
174                        return Ok(());
175                    }
176
177                    self.playback_state.store(new_state, Ordering::SeqCst);
178
179                    self.event_tx.event(PlayerEvent::PlaybackIsPlayingChanged {
180                        is_playing: playing,
181                        changed_at: decoder.time(),
182                    });
183                }
184            }
185            DecoderCommand::TogglePlaying { callback } => {
186                let current_state = self.playback_state.load(Ordering::SeqCst);
187                if let Some(ref decoder) = self.current_decoder {
188                    let new_state = match current_state {
189                        PlaybackStatus::Playing => PlaybackStatus::Paused,
190                        PlaybackStatus::Paused | PlaybackStatus::Stopped => PlaybackStatus::Playing,
191                    };
192
193                    self.playback_state.store(new_state, Ordering::SeqCst);
194                    callback.send(new_state)?;
195
196                    self.event_tx.event(PlayerEvent::PlaybackIsPlayingChanged {
197                        is_playing: matches!(new_state, PlaybackStatus::Playing),
198                        changed_at: decoder.time(),
199                    });
200                } else {
201                    callback.send(PlaybackStatus::Stopped)?;
202                }
203            }
204            DecoderCommand::Seek {
205                time,
206                increment,
207                callback,
208            } => {
209                if let Some(ref mut opened_decoder) = self.current_decoder {
210                    let time = opened_decoder.seek(time, increment)?;
211                    self.event_tx.event(PlayerEvent::SeekOccured { time });
212                    callback.send(Some(time))?;
213                } else {
214                    callback.send(None)?;
215                }
216            }
217            DecoderCommand::GetTime { callback } => {
218                if let Some(ref opened_decoder) = self.current_decoder {
219                    callback.send(Some(opened_decoder.time()))?;
220                } else {
221                    callback.send(None)?;
222                }
223            }
224            DecoderCommand::GetPlaying { callback } => {
225                if let Some(ref opened_decoder) = self.current_decoder {
226                    callback.send(Some(opened_decoder.decoded_from.clone()))?;
227                } else {
228                    callback.send(None)?;
229                }
230            }
231            DecoderCommand::Stop => {
232                self.current_decoder = None;
233                self.preload_decoder = None;
234                self.playback_state
235                    .store(PlaybackStatus::Stopped, Ordering::SeqCst);
236                self.event_tx.event(PlayerEvent::PlaybackStopped);
237            }
238            DecoderCommand::Skip => {
239                if let Some(take) = self.preload_decoder.take() {
240                    self.player_tx
241                        .decoder_event(DecoderEvent::PreloadConsumed)?;
242                    let currently_playing = take.decoded_from.clone();
243                    self.current_decoder = Some(take);
244                    self.event_tx.event(PlayerEvent::CurrentlyPlayingChanged {
245                        currently_playing: Box::new(currently_playing),
246                    });
247                } else {
248                    self.current_decoder = None;
249                }
250            }
251        }
252        Ok(())
253    }
254}
255
256impl DecoderHandle {
257    /// Returns a mutable reference to the current [`CpalHandle`], opening one of one does not exist
258    ///
259    /// # Errors
260    ///
261    /// Errors if the [`CpalHandle`] fails to open
262    pub fn ensure_cpal_handle(&mut self) -> Result<(), PlayerError> {
263        if self.cpal_handle.is_none() {
264            self.cpal_handle = Some(CpalHandle::open(&*self.device_config.lock()?)?);
265        }
266
267        Ok(())
268    }
269}
270
271pub enum DecoderEvent {
272    PreloadConsumed,
273}
274
275pub enum DecoderCommand {
276    Load {
277        decoder: Box<OpenedDecoder>,
278    },
279    Preload {
280        decoder: Box<OpenedDecoder>,
281    },
282    LoadAndPreload {
283        load: Box<OpenedDecoder>,
284        preload: Box<OpenedDecoder>,
285    },
286    SetPlaying(bool),
287    TogglePlaying {
288        callback: Sender<PlaybackStatus>,
289    },
290    Seek {
291        time: f64,
292        increment: bool,
293        callback: Sender<Option<f64>>,
294    },
295    GetTime {
296        callback: Sender<Option<f64>>,
297    },
298    GetPlaying {
299        callback: Sender<Option<ResolvedTrack>>,
300    },
301    Skip,
302    Stop,
303}
304
305pub trait DecoderTx {
306    fn load(&self, decoder: OpenedDecoder) -> Result<(), PlayerError>;
307    fn preload(&self, decoder: OpenedDecoder) -> Result<(), PlayerError>;
308    fn load_and_preload(
309        &self,
310        load: OpenedDecoder,
311        preload: OpenedDecoder,
312    ) -> Result<(), PlayerError>;
313    fn set_playing(&self, is_playing: bool) -> Result<(), PlayerError>;
314    fn toggle_playing(&self) -> Result<PlaybackStatus, PlayerError>;
315    fn seek(&self, time: f64, increment: bool) -> Result<Option<f64>, PlayerError>;
316    fn get_time(&self) -> Result<Option<f64>, PlayerError>;
317    fn get_playing(&self) -> Result<Option<ResolvedTrack>, PlayerError>;
318    fn stop(&self) -> Result<(), PlayerError>;
319    fn skip(&self) -> Result<(), PlayerError>;
320}
321
322impl DecoderTx for Sender<DecoderCommand> {
323    fn load(&self, decoder: OpenedDecoder) -> Result<(), PlayerError> {
324        self.send(DecoderCommand::Load {
325            decoder: Box::new(decoder),
326        })?;
327        Ok(())
328    }
329
330    fn preload(&self, decoder: OpenedDecoder) -> Result<(), PlayerError> {
331        self.send(DecoderCommand::Preload {
332            decoder: Box::new(decoder),
333        })?;
334        Ok(())
335    }
336
337    fn load_and_preload(
338        &self,
339        load: OpenedDecoder,
340        preload: OpenedDecoder,
341    ) -> Result<(), PlayerError> {
342        self.send(DecoderCommand::LoadAndPreload {
343            load: Box::new(load),
344            preload: Box::new(preload),
345        })?;
346        Ok(())
347    }
348
349    fn set_playing(&self, is_playing: bool) -> Result<(), PlayerError> {
350        self.send(DecoderCommand::SetPlaying(is_playing))?;
351        Ok(())
352    }
353
354    fn toggle_playing(&self) -> Result<PlaybackStatus, PlayerError> {
355        let (tx, rx) = channel();
356        self.send(DecoderCommand::TogglePlaying { callback: tx })?;
357        Ok(rx.recv()?)
358    }
359
360    fn seek(&self, time: f64, increment: bool) -> Result<Option<f64>, PlayerError> {
361        let (tx, rx) = channel();
362        self.send(DecoderCommand::Seek {
363            time,
364            increment,
365            callback: tx,
366        })?;
367        Ok(rx.recv()?)
368    }
369
370    fn get_time(&self) -> Result<Option<f64>, PlayerError> {
371        let (tx, rx) = channel();
372        self.send(DecoderCommand::GetTime { callback: tx })?;
373        Ok(rx.recv()?)
374    }
375
376    fn get_playing(&self) -> Result<Option<ResolvedTrack>, PlayerError> {
377        let (tx, rx) = channel();
378        self.send(DecoderCommand::GetPlaying { callback: tx })?;
379        Ok(rx.recv()?)
380    }
381
382    fn stop(&self) -> Result<(), PlayerError> {
383        self.send(DecoderCommand::Stop)?;
384        Ok(())
385    }
386
387    fn skip(&self) -> Result<(), PlayerError> {
388        self.send(DecoderCommand::Skip)?;
389        Ok(())
390    }
391}