Skip to main content

selene_daemon/
decoder.rs

1use std::{
2    sync::{
3        Arc, Mutex,
4        atomic::{AtomicBool, AtomicU32, Ordering},
5        mpsc::{Receiver, Sender, TryRecvError, channel},
6    },
7    thread,
8};
9
10use lunar_lib::error;
11use ringbuf::traits::Observer;
12
13use crate::{
14    SHUTDOWN,
15    event_handler::EventTx,
16    player::{
17        AtomicPlaybackStatus, OpenedDecoder, PlaybackStatus, PlayerError, PlayerEvent,
18        PlayerRequest, PlayerTx,
19        playback::{CpalHandle, DeviceConfig},
20    },
21    playlist::PlayingTrack,
22    wait,
23};
24
25pub struct DecoderHandle {
26    rx: Receiver<DecoderCommand>,
27    player_tx: Sender<PlayerRequest>,
28    event_tx: Sender<PlayerEvent>,
29
30    cpal_handle: Option<CpalHandle>,
31    device_config: Arc<Mutex<DeviceConfig>>,
32
33    current_decoder: Option<OpenedDecoder>,
34    preload_decoder: Option<OpenedDecoder>,
35
36    playback_state: Arc<AtomicPlaybackStatus>,
37    looping: Arc<AtomicBool>,
38    volume: Arc<AtomicU32>,
39}
40
41impl DecoderHandle {
42    pub(crate) fn open(
43        player_tx: Sender<PlayerRequest>,
44        event_tx: Sender<PlayerEvent>,
45        device_config: Arc<Mutex<DeviceConfig>>,
46        playback_state: Arc<AtomicPlaybackStatus>,
47        volume: Arc<AtomicU32>,
48        looping: Arc<AtomicBool>,
49    ) -> Result<Sender<DecoderCommand>, PlayerError> {
50        let (tx, rx) = channel();
51
52        let handle = Self {
53            rx,
54            player_tx,
55            event_tx,
56
57            cpal_handle: None,
58            device_config,
59
60            current_decoder: None,
61            preload_decoder: None,
62
63            playback_state,
64            looping,
65            volume,
66        };
67
68        thread::spawn(move || {
69            if let Err(err) = handle.run() {
70                error!("DecoderHandle failed with error: {err}");
71            }
72            SHUTDOWN.store(true, Ordering::Relaxed);
73        });
74
75        Ok(tx)
76    }
77
78    fn run(mut self) -> Result<(), PlayerError> {
79        loop {
80            if SHUTDOWN.load(Ordering::Relaxed) {
81                return Ok(());
82            }
83
84            loop {
85                match self.rx.try_recv() {
86                    Ok(command) => self.run_command(command)?,
87                    Err(TryRecvError::Empty) => break,
88                    Err(TryRecvError::Disconnected) => return Ok(()),
89                }
90            }
91
92            if !matches!(
93                self.playback_state.load(Ordering::Relaxed),
94                PlaybackStatus::Playing
95            ) {
96                wait();
97                continue;
98            }
99
100            if let Some(cpal_handle) = self.cpal_handle.as_mut()
101                && let Some(finished_consuming) = cpal_handle.consume_packet()
102                && !finished_consuming
103            {
104                wait();
105                continue;
106            }
107
108            let Some(mut decoder) = self.current_decoder.take() else {
109                if let Some(ref cpal_handle) = self.cpal_handle
110                    && !cpal_handle.audio_buf.is_empty()
111                {
112                    wait();
113                    continue;
114                }
115
116                self.cpal_handle = None;
117                self.playback_state
118                    .store(PlaybackStatus::Stopped, Ordering::SeqCst);
119                self.event_tx.event(PlayerEvent::PlaybackStopped);
120                continue;
121            };
122
123            // Use the current CpalHandle, or open a new one if one doesnt exist
124            let mut cpal_handle = self.get_cpal_handle()?;
125
126            if decoder.at_eof {
127                if self.looping.load(Ordering::Relaxed) {
128                    decoder.seek(0.0, false)?;
129                    decoder.started_at = None;
130                    decoder.sent_scrobble = false;
131                    decoder.decoded_frames = 0;
132
133                    self.event_tx.event(PlayerEvent::CurrentlyPlayingChanged {
134                        currently_playing: decoder.decoded_from.clone(),
135                    });
136
137                    self.current_decoder = Some(decoder);
138                    self.cpal_handle = Some(cpal_handle);
139                    continue;
140                }
141
142                if let Some(preload) = self.preload_decoder.take() {
143                    self.player_tx
144                        .decoder_event(DecoderEvent::PreloadConsumed)?;
145
146                    let currently_playing = preload.decoded_from.clone();
147
148                    decoder = preload;
149
150                    self.event_tx
151                        .event(PlayerEvent::CurrentlyPlayingChanged { currently_playing });
152                } else {
153                    self.cpal_handle = Some(cpal_handle);
154                    continue;
155                }
156            }
157
158            if decoder.started() {
159                self.player_tx.decoder_event(DecoderEvent::NowPlaying {
160                    track: decoder.decoded_from.clone(),
161                })?;
162            }
163
164            let packet = decoder.decode_next_packet()?;
165            cpal_handle.pending_packet = packet;
166
167            let decoded_time = decoder.decoded_time();
168            if (decoded_time >= (decoder.decoded_from.source.container().stream().duration() / 2.0)
169                || decoded_time >= 240.0)
170                && !decoder.sent_scrobble
171            {
172                decoder.sent_scrobble = true;
173                self.player_tx.decoder_event(DecoderEvent::Scrobble {
174                    track: decoder.decoded_from.clone(),
175                    start_time: decoder.start_time().unwrap(),
176                })?;
177            }
178
179            self.cpal_handle = Some(cpal_handle);
180            self.current_decoder = Some(decoder);
181        }
182    }
183
184    fn run_command(&mut self, command: DecoderCommand) -> Result<(), PlayerError> {
185        match command {
186            DecoderCommand::Load { decoder } => {
187                let currently_playing = decoder.decoded_from.clone();
188                self.current_decoder = Some(*decoder);
189                self.event_tx
190                    .event(PlayerEvent::CurrentlyPlayingChanged { currently_playing });
191            }
192            DecoderCommand::Preload { decoder } => self.preload_decoder = Some(*decoder),
193            DecoderCommand::LoadAndPreload { load, preload } => {
194                let currently_playing = load.decoded_from.clone();
195                self.current_decoder = Some(*load);
196                self.preload_decoder = Some(*preload);
197                self.event_tx
198                    .event(PlayerEvent::CurrentlyPlayingChanged { currently_playing });
199            }
200            DecoderCommand::Unpreload => {
201                self.preload_decoder = None;
202            }
203            DecoderCommand::SetPlaying(playing) => {
204                if let Some(ref decoder) = self.current_decoder {
205                    let new_state = if playing {
206                        PlaybackStatus::Playing
207                    } else {
208                        PlaybackStatus::Paused
209                    };
210
211                    if self.playback_state.load(Ordering::SeqCst) == new_state {
212                        return Ok(());
213                    }
214
215                    self.playback_state.store(new_state, Ordering::SeqCst);
216
217                    self.event_tx.event(PlayerEvent::PlaybackIsPlayingChanged {
218                        is_playing: playing,
219                        changed_at: decoder.time(),
220                    });
221                }
222            }
223            DecoderCommand::TogglePlaying { callback } => {
224                let current_state = self.playback_state.load(Ordering::SeqCst);
225                if let Some(ref decoder) = self.current_decoder {
226                    let new_state = match current_state {
227                        PlaybackStatus::Playing => PlaybackStatus::Paused,
228                        PlaybackStatus::Paused | PlaybackStatus::Stopped => PlaybackStatus::Playing,
229                    };
230
231                    self.playback_state.store(new_state, Ordering::SeqCst);
232                    callback.send(new_state)?;
233
234                    self.event_tx.event(PlayerEvent::PlaybackIsPlayingChanged {
235                        is_playing: matches!(new_state, PlaybackStatus::Playing),
236                        changed_at: decoder.time(),
237                    });
238                } else {
239                    callback.send(PlaybackStatus::Stopped)?;
240                }
241            }
242            DecoderCommand::Seek {
243                time,
244                increment,
245                callback,
246            } => {
247                if let Some(ref mut opened_decoder) = self.current_decoder {
248                    let time = opened_decoder.seek(time, increment)?;
249                    self.event_tx.event(PlayerEvent::SeekOccured { time });
250                    callback.send(Some(time))?;
251                } else {
252                    callback.send(None)?;
253                }
254            }
255            DecoderCommand::GetTime { callback } => {
256                if let Some(ref opened_decoder) = self.current_decoder {
257                    callback.send(Some(opened_decoder.time()))?;
258                } else {
259                    callback.send(None)?;
260                }
261            }
262            DecoderCommand::GetPlaying { callback } => {
263                if let Some(ref opened_decoder) = self.current_decoder {
264                    callback.send(Some(opened_decoder.decoded_from.clone()))?;
265                } else {
266                    callback.send(None)?;
267                }
268            }
269            DecoderCommand::Stop => {
270                self.current_decoder = None;
271                self.preload_decoder = None;
272                self.playback_state
273                    .store(PlaybackStatus::Stopped, Ordering::SeqCst);
274                self.event_tx.event(PlayerEvent::PlaybackStopped);
275            }
276            DecoderCommand::Skip => {
277                if let Some(take) = self.preload_decoder.take() {
278                    self.player_tx
279                        .decoder_event(DecoderEvent::PreloadConsumed)?;
280                    let currently_playing = take.decoded_from.clone();
281                    self.current_decoder = Some(take);
282                    self.event_tx
283                        .event(PlayerEvent::CurrentlyPlayingChanged { currently_playing });
284                } else {
285                    self.current_decoder = None;
286                }
287            }
288        }
289        Ok(())
290    }
291}
292
293impl DecoderHandle {
294    /// Returns the current cpal handle, opening one of one does not exist
295    ///
296    /// # Errors
297    ///
298    /// Errors if the [`CpalHandle`] fails to open
299    pub fn get_cpal_handle(&mut self) -> Result<CpalHandle, PlayerError> {
300        if let Some(current) = self.cpal_handle.take() {
301            Ok(current)
302        } else {
303            CpalHandle::open(&*self.device_config.lock()?, self.volume.clone())
304        }
305    }
306}
307
308pub enum DecoderEvent {
309    PreloadConsumed,
310    Scrobble {
311        track: PlayingTrack,
312        start_time: u64,
313    },
314    NowPlaying {
315        track: PlayingTrack,
316    },
317}
318
319pub enum DecoderCommand {
320    Load {
321        decoder: Box<OpenedDecoder>,
322    },
323    Preload {
324        decoder: Box<OpenedDecoder>,
325    },
326    Unpreload,
327    LoadAndPreload {
328        load: Box<OpenedDecoder>,
329        preload: Box<OpenedDecoder>,
330    },
331    SetPlaying(bool),
332    TogglePlaying {
333        callback: Sender<PlaybackStatus>,
334    },
335    Seek {
336        time: f64,
337        increment: bool,
338        callback: Sender<Option<f64>>,
339    },
340    GetTime {
341        callback: Sender<Option<f64>>,
342    },
343    GetPlaying {
344        callback: Sender<Option<PlayingTrack>>,
345    },
346    Skip,
347    Stop,
348}
349
350pub trait DecoderTx {
351    fn load(&self, decoder: OpenedDecoder) -> Result<(), PlayerError>;
352    fn preload(&self, decoder: OpenedDecoder) -> Result<(), PlayerError>;
353    fn load_and_preload(
354        &self,
355        load: OpenedDecoder,
356        preload: OpenedDecoder,
357    ) -> Result<(), PlayerError>;
358    fn unpreload(&self) -> Result<(), PlayerError>;
359    fn set_playing(&self, is_playing: bool) -> Result<(), PlayerError>;
360    fn toggle_playing(&self) -> Result<PlaybackStatus, PlayerError>;
361    fn seek(&self, time: f64, increment: bool) -> Result<Option<f64>, PlayerError>;
362    fn get_time(&self) -> Result<Option<f64>, PlayerError>;
363    fn get_playing(&self) -> Result<Option<PlayingTrack>, PlayerError>;
364    fn stop(&self) -> Result<(), PlayerError>;
365    fn skip(&self) -> Result<(), PlayerError>;
366}
367
368impl DecoderTx for Sender<DecoderCommand> {
369    fn load(&self, decoder: OpenedDecoder) -> Result<(), PlayerError> {
370        self.send(DecoderCommand::Load {
371            decoder: Box::new(decoder),
372        })?;
373        Ok(())
374    }
375
376    fn preload(&self, decoder: OpenedDecoder) -> Result<(), PlayerError> {
377        self.send(DecoderCommand::Preload {
378            decoder: Box::new(decoder),
379        })?;
380        Ok(())
381    }
382
383    fn load_and_preload(
384        &self,
385        load: OpenedDecoder,
386        preload: OpenedDecoder,
387    ) -> Result<(), PlayerError> {
388        self.send(DecoderCommand::LoadAndPreload {
389            load: Box::new(load),
390            preload: Box::new(preload),
391        })?;
392        Ok(())
393    }
394
395    fn unpreload(&self) -> Result<(), PlayerError> {
396        self.send(DecoderCommand::Unpreload)?;
397        Ok(())
398    }
399
400    fn set_playing(&self, is_playing: bool) -> Result<(), PlayerError> {
401        self.send(DecoderCommand::SetPlaying(is_playing))?;
402        Ok(())
403    }
404
405    fn toggle_playing(&self) -> Result<PlaybackStatus, PlayerError> {
406        let (tx, rx) = channel();
407        self.send(DecoderCommand::TogglePlaying { callback: tx })?;
408        Ok(rx.recv()?)
409    }
410
411    fn seek(&self, time: f64, increment: bool) -> Result<Option<f64>, PlayerError> {
412        let (tx, rx) = channel();
413        self.send(DecoderCommand::Seek {
414            time,
415            increment,
416            callback: tx,
417        })?;
418        Ok(rx.recv()?)
419    }
420
421    fn get_time(&self) -> Result<Option<f64>, PlayerError> {
422        let (tx, rx) = channel();
423        self.send(DecoderCommand::GetTime { callback: tx })?;
424        Ok(rx.recv()?)
425    }
426
427    fn get_playing(&self) -> Result<Option<PlayingTrack>, PlayerError> {
428        let (tx, rx) = channel();
429        self.send(DecoderCommand::GetPlaying { callback: tx })?;
430        Ok(rx.recv()?)
431    }
432
433    fn stop(&self) -> Result<(), PlayerError> {
434        self.send(DecoderCommand::Stop)?;
435        Ok(())
436    }
437
438    fn skip(&self) -> Result<(), PlayerError> {
439        self.send(DecoderCommand::Skip)?;
440        Ok(())
441    }
442}