// selene_daemon/player/thread.rs

use std::{
    sync::{
        Arc, Mutex,
        atomic::{AtomicU32, Ordering},
        mpsc::{Sender, TryRecvError, channel},
    },
    thread,
};

use lunar_lib::{database::Database, debug, error};
use selene_core::database::LibraryDb;

use crate::{
    SHUTDOWN,
    decoder::{DecoderEvent, DecoderHandle},
    event_handler::EventHandler,
    player::{Player, PlayerError, PlayerRequest, playback::DeviceConfig},
    playlist::{AtomicPlaybackStatus, PlaybackStatus, Playlist},
    wait,
};

// Thread start impls
23impl Player {
24    fn new() -> Result<(Sender<PlayerRequest>, Self), PlayerError> {
25        let (player_tx, rx) = channel();
26
27        let event_tx = EventHandler::open(player_tx.clone())?;
28
29        let volume = Arc::new(AtomicU32::new(1.0_f32.to_bits()));
30        let playback_state = Arc::new(AtomicPlaybackStatus::new(PlaybackStatus::Stopped));
31        let playlist = Playlist::new(event_tx.clone());
32        let device_config = Arc::new(Mutex::new(DeviceConfig::default_config()?));
33
34        let decoder_tx = DecoderHandle::open(
35            player_tx.clone(),
36            event_tx.clone(),
37            device_config.clone(),
38            playback_state.clone(),
39            volume.clone(),
40        )?;
41
42        Ok((
43            player_tx,
44            Self {
45                rx,
46                decoder_tx,
47
48                event_tx,
49
50                playlist,
51                device_config,
52                playback_state,
53
54                volume,
55            },
56        ))
57    }
58
59    /// Opens a new [`Player`] and runs it on a new thread
60    pub fn open() -> Result<Sender<PlayerRequest>, PlayerError> {
61        let (player_tx, player) = Self::new()?;
62
63        thread::spawn(move || {
64            if let Err(err) = player.run() {
65                error!("Engine failed with error: {err}");
66            }
67            SHUTDOWN.store(true, Ordering::Relaxed);
68        });
69
70        Ok(player_tx)
71    }
72
73    /// Opens a new [`Player`] and runs it on the current thread
74    pub fn start() -> Result<(), PlayerError> {
75        let (_, player) = Self::new()?;
76        player.run()?;
77        SHUTDOWN.store(true, Ordering::Relaxed);
78        Ok(())
79    }
80
81    fn run(mut self) -> Result<(), PlayerError> {
82        debug!("Engine thread started");
83
84        loop {
85            if SHUTDOWN.load(Ordering::Relaxed) {
86                return Ok(());
87            }
88
89            loop {
90                match self.rx.try_recv() {
91                    Ok(PlayerRequest::Command(command)) => self.run_command(*command)?,
92                    Ok(PlayerRequest::DecoderEvent(DecoderEvent::PreloadConsumed)) => {
93                        let db = LibraryDb::open().unwrap();
94                        self.playlist.pop_next(&db)?;
95                        if let Some(peek_next) = self.playlist.peek_next(&db)? {
96                            self.preload(peek_next)?;
97                        }
98                    }
99                    Err(TryRecvError::Empty) => break,
100                    Err(TryRecvError::Disconnected) => {
101                        return Ok(());
102                    }
103                }
104            }
105
106            wait();
107            continue;
108        }
109    }
110}