Skip to main content

selene_daemon/
decoder.rs

1use std::{
2    sync::{
3        Arc, Mutex,
4        atomic::{AtomicU32, Ordering},
5        mpsc::{Receiver, Sender, TryRecvError, channel},
6    },
7    thread,
8};
9
10use lunar_lib::error;
11use ringbuf::traits::Observer;
12use selene_core::library::track::ResolvedTrack;
13
14use crate::{
15    PlayerEvent, SHUTDOWN,
16    event_handler::EventTx,
17    player::{
18        OpenedDecoder, PlayerError, PlayerRequest, PlayerTx,
19        playback::{CpalHandle, DeviceConfig},
20    },
21    playlist::{AtomicPlaybackStatus, PlaybackStatus},
22    wait,
23};
24
25pub struct DecoderHandle {
26    rx: Receiver<DecoderCommand>,
27    player_tx: Sender<PlayerRequest>,
28    event_tx: Sender<PlayerEvent>,
29
30    cpal_handle: Option<CpalHandle>,
31    device_config: Arc<Mutex<DeviceConfig>>,
32
33    current_decoder: Option<OpenedDecoder>,
34    preload_decoder: Option<OpenedDecoder>,
35
36    playback_state: Arc<AtomicPlaybackStatus>,
37    volume: Arc<AtomicU32>,
38}
39
40impl DecoderHandle {
41    pub(crate) fn open(
42        player_tx: Sender<PlayerRequest>,
43        event_tx: Sender<PlayerEvent>,
44        device_config: Arc<Mutex<DeviceConfig>>,
45        playback_state: Arc<AtomicPlaybackStatus>,
46        volume: Arc<AtomicU32>,
47    ) -> Result<Sender<DecoderCommand>, PlayerError> {
48        let (tx, rx) = channel();
49
50        let handle = Self {
51            rx,
52            player_tx,
53            event_tx,
54
55            cpal_handle: None,
56            device_config,
57
58            current_decoder: None,
59            preload_decoder: None,
60
61            playback_state,
62            volume,
63        };
64
65        thread::spawn(move || {
66            if let Err(err) = handle.run() {
67                error!("DecoderHandle failed with error: {err}");
68            }
69            SHUTDOWN.store(true, Ordering::Relaxed);
70        });
71
72        Ok(tx)
73    }
74
75    fn run(mut self) -> Result<(), PlayerError> {
76        loop {
77            if SHUTDOWN.load(Ordering::Relaxed) {
78                return Ok(());
79            }
80
81            loop {
82                match self.rx.try_recv() {
83                    Ok(command) => self.run_command(command)?,
84                    Err(TryRecvError::Empty) => break,
85                    Err(TryRecvError::Disconnected) => return Ok(()),
86                }
87            }
88
89            if !matches!(
90                self.playback_state.load(Ordering::Relaxed),
91                PlaybackStatus::Playing
92            ) {
93                wait();
94                continue;
95            }
96
97            let volume = f32::from_bits(self.volume.load(Ordering::Relaxed));
98            if let Some(ref mut cpal_handle) = self.cpal_handle
99                && let Some(finished_consuming) = cpal_handle.consume_packet(volume)
100                && !finished_consuming
101            {
102                wait();
103                continue;
104            }
105
106            if self.current_decoder.is_none() {
107                if let Some(ref cpal_handle) = self.cpal_handle
108                    && !cpal_handle.audio_buf.is_empty()
109                {
110                    wait();
111                    continue;
112                }
113
114                self.cpal_handle = None;
115                self.playback_state
116                    .store(PlaybackStatus::Stopped, Ordering::SeqCst);
117                self.event_tx.event(PlayerEvent::PlaybackStopped);
118                continue;
119            }
120
121            // Use the current CpalHandle, or open a new one if one doesnt exist
122            self.ensure_cpal_handle()?;
123
124            if self.current_decoder.as_ref().unwrap().at_eof {
125                if let Some(preload) = self.preload_decoder.take() {
126                    self.player_tx
127                        .decoder_event(DecoderEvent::PreloadConsumed)?;
128
129                    let currently_playing = Box::new((*preload.decoded_from).clone());
130                    self.current_decoder = Some(preload);
131
132                    self.event_tx
133                        .event(PlayerEvent::CurrentlyPlayingChanged { currently_playing });
134                } else {
135                    self.current_decoder = None;
136                    continue;
137                }
138            }
139
140            let Some(decoder) = self.current_decoder.as_mut() else {
141                panic!("Decoder was already validated")
142            };
143
144            if decoder.started() {
145                self.player_tx.decoder_event(DecoderEvent::NowPlaying {
146                    track: decoder.decoded_from.clone(),
147                })?
148            };
149
150            let packet = decoder.decode_next_packet()?;
151            self.cpal_handle.as_mut().unwrap().pending_packet = packet;
152
153            let decoded_time = decoder.decoded_time();
154            if (decoded_time >= (decoder.decoded_from.track.container().stream().duration() / 2.0)
155                || decoded_time >= 240.0)
156                && !decoder.sent_scrobble
157            {
158                decoder.sent_scrobble = true;
159                self.player_tx.decoder_event(DecoderEvent::Scrobble {
160                    track: decoder.decoded_from.clone(),
161                    start_time: decoder.start_time().unwrap(),
162                })?
163            }
164        }
165    }
166
167    fn run_command(&mut self, command: DecoderCommand) -> Result<(), PlayerError> {
168        match command {
169            DecoderCommand::Load { decoder } => {
170                let currently_playing = decoder.decoded_from.clone();
171                self.current_decoder = Some(*decoder);
172                self.event_tx.event(PlayerEvent::CurrentlyPlayingChanged {
173                    currently_playing: Box::new(Arc::unwrap_or_clone(currently_playing)),
174                });
175            }
176            DecoderCommand::Preload { decoder } => self.preload_decoder = Some(*decoder),
177            DecoderCommand::LoadAndPreload { load, preload } => {
178                let currently_playing = load.decoded_from.clone();
179                self.current_decoder = Some(*load);
180                self.preload_decoder = Some(*preload);
181                self.event_tx.event(PlayerEvent::CurrentlyPlayingChanged {
182                    currently_playing: Box::new(Arc::unwrap_or_clone(currently_playing)),
183                });
184            }
185            DecoderCommand::SetPlaying(playing) => {
186                if let Some(ref decoder) = self.current_decoder {
187                    let new_state = if playing {
188                        PlaybackStatus::Playing
189                    } else {
190                        PlaybackStatus::Paused
191                    };
192
193                    if self.playback_state.load(Ordering::SeqCst) == new_state {
194                        return Ok(());
195                    }
196
197                    self.playback_state.store(new_state, Ordering::SeqCst);
198
199                    self.event_tx.event(PlayerEvent::PlaybackIsPlayingChanged {
200                        is_playing: playing,
201                        changed_at: decoder.time(),
202                    });
203                }
204            }
205            DecoderCommand::TogglePlaying { callback } => {
206                let current_state = self.playback_state.load(Ordering::SeqCst);
207                if let Some(ref decoder) = self.current_decoder {
208                    let new_state = match current_state {
209                        PlaybackStatus::Playing => PlaybackStatus::Paused,
210                        PlaybackStatus::Paused | PlaybackStatus::Stopped => PlaybackStatus::Playing,
211                    };
212
213                    self.playback_state.store(new_state, Ordering::SeqCst);
214                    callback.send(new_state)?;
215
216                    self.event_tx.event(PlayerEvent::PlaybackIsPlayingChanged {
217                        is_playing: matches!(new_state, PlaybackStatus::Playing),
218                        changed_at: decoder.time(),
219                    });
220                } else {
221                    callback.send(PlaybackStatus::Stopped)?;
222                }
223            }
224            DecoderCommand::Seek {
225                time,
226                increment,
227                callback,
228            } => {
229                if let Some(ref mut opened_decoder) = self.current_decoder {
230                    let time = opened_decoder.seek(time, increment)?;
231                    self.event_tx.event(PlayerEvent::SeekOccured { time });
232                    callback.send(Some(time))?;
233                } else {
234                    callback.send(None)?;
235                }
236            }
237            DecoderCommand::GetTime { callback } => {
238                if let Some(ref opened_decoder) = self.current_decoder {
239                    callback.send(Some(opened_decoder.time()))?;
240                } else {
241                    callback.send(None)?;
242                }
243            }
244            DecoderCommand::GetPlaying { callback } => {
245                if let Some(ref opened_decoder) = self.current_decoder {
246                    callback.send(Some(opened_decoder.decoded_from.clone()))?;
247                } else {
248                    callback.send(None)?;
249                }
250            }
251            DecoderCommand::Stop => {
252                self.current_decoder = None;
253                self.preload_decoder = None;
254                self.playback_state
255                    .store(PlaybackStatus::Stopped, Ordering::SeqCst);
256                self.event_tx.event(PlayerEvent::PlaybackStopped);
257            }
258            DecoderCommand::Skip => {
259                if let Some(take) = self.preload_decoder.take() {
260                    self.player_tx
261                        .decoder_event(DecoderEvent::PreloadConsumed)?;
262                    let currently_playing = take.decoded_from.clone();
263                    self.current_decoder = Some(take);
264                    self.event_tx.event(PlayerEvent::CurrentlyPlayingChanged {
265                        currently_playing: Box::new(Arc::unwrap_or_clone(currently_playing)),
266                    });
267                } else {
268                    self.current_decoder = None;
269                }
270            }
271        }
272        Ok(())
273    }
274}
275
276impl DecoderHandle {
277    /// Returns a mutable reference to the current [`CpalHandle`], opening one of one does not exist
278    ///
279    /// # Errors
280    ///
281    /// Errors if the [`CpalHandle`] fails to open
282    pub fn ensure_cpal_handle(&mut self) -> Result<(), PlayerError> {
283        if self.cpal_handle.is_none() {
284            self.cpal_handle = Some(CpalHandle::open(&*self.device_config.lock()?)?);
285        }
286
287        Ok(())
288    }
289}
290
291pub enum DecoderEvent {
292    PreloadConsumed,
293    Scrobble {
294        track: Arc<ResolvedTrack>,
295        start_time: u64,
296    },
297    NowPlaying {
298        track: Arc<ResolvedTrack>,
299    },
300}
301
302pub enum DecoderCommand {
303    Load {
304        decoder: Box<OpenedDecoder>,
305    },
306    Preload {
307        decoder: Box<OpenedDecoder>,
308    },
309    LoadAndPreload {
310        load: Box<OpenedDecoder>,
311        preload: Box<OpenedDecoder>,
312    },
313    SetPlaying(bool),
314    TogglePlaying {
315        callback: Sender<PlaybackStatus>,
316    },
317    Seek {
318        time: f64,
319        increment: bool,
320        callback: Sender<Option<f64>>,
321    },
322    GetTime {
323        callback: Sender<Option<f64>>,
324    },
325    GetPlaying {
326        callback: Sender<Option<Arc<ResolvedTrack>>>,
327    },
328    Skip,
329    Stop,
330}
331
332pub trait DecoderTx {
333    fn load(&self, decoder: OpenedDecoder) -> Result<(), PlayerError>;
334    fn preload(&self, decoder: OpenedDecoder) -> Result<(), PlayerError>;
335    fn load_and_preload(
336        &self,
337        load: OpenedDecoder,
338        preload: OpenedDecoder,
339    ) -> Result<(), PlayerError>;
340    fn set_playing(&self, is_playing: bool) -> Result<(), PlayerError>;
341    fn toggle_playing(&self) -> Result<PlaybackStatus, PlayerError>;
342    fn seek(&self, time: f64, increment: bool) -> Result<Option<f64>, PlayerError>;
343    fn get_time(&self) -> Result<Option<f64>, PlayerError>;
344    fn get_playing(&self) -> Result<Option<Arc<ResolvedTrack>>, PlayerError>;
345    fn stop(&self) -> Result<(), PlayerError>;
346    fn skip(&self) -> Result<(), PlayerError>;
347}
348
349impl DecoderTx for Sender<DecoderCommand> {
350    fn load(&self, decoder: OpenedDecoder) -> Result<(), PlayerError> {
351        self.send(DecoderCommand::Load {
352            decoder: Box::new(decoder),
353        })?;
354        Ok(())
355    }
356
357    fn preload(&self, decoder: OpenedDecoder) -> Result<(), PlayerError> {
358        self.send(DecoderCommand::Preload {
359            decoder: Box::new(decoder),
360        })?;
361        Ok(())
362    }
363
364    fn load_and_preload(
365        &self,
366        load: OpenedDecoder,
367        preload: OpenedDecoder,
368    ) -> Result<(), PlayerError> {
369        self.send(DecoderCommand::LoadAndPreload {
370            load: Box::new(load),
371            preload: Box::new(preload),
372        })?;
373        Ok(())
374    }
375
376    fn set_playing(&self, is_playing: bool) -> Result<(), PlayerError> {
377        self.send(DecoderCommand::SetPlaying(is_playing))?;
378        Ok(())
379    }
380
381    fn toggle_playing(&self) -> Result<PlaybackStatus, PlayerError> {
382        let (tx, rx) = channel();
383        self.send(DecoderCommand::TogglePlaying { callback: tx })?;
384        Ok(rx.recv()?)
385    }
386
387    fn seek(&self, time: f64, increment: bool) -> Result<Option<f64>, PlayerError> {
388        let (tx, rx) = channel();
389        self.send(DecoderCommand::Seek {
390            time,
391            increment,
392            callback: tx,
393        })?;
394        Ok(rx.recv()?)
395    }
396
397    fn get_time(&self) -> Result<Option<f64>, PlayerError> {
398        let (tx, rx) = channel();
399        self.send(DecoderCommand::GetTime { callback: tx })?;
400        Ok(rx.recv()?)
401    }
402
403    fn get_playing(&self) -> Result<Option<Arc<ResolvedTrack>>, PlayerError> {
404        let (tx, rx) = channel();
405        self.send(DecoderCommand::GetPlaying { callback: tx })?;
406        Ok(rx.recv()?)
407    }
408
409    fn stop(&self) -> Result<(), PlayerError> {
410        self.send(DecoderCommand::Stop)?;
411        Ok(())
412    }
413
414    fn skip(&self) -> Result<(), PlayerError> {
415        self.send(DecoderCommand::Skip)?;
416        Ok(())
417    }
418}