Skip to main content

selene_daemon/player/
thread.rs

1use std::{
2    cell::LazyCell,
3    sync::{
4        Arc, Mutex,
5        atomic::{AtomicBool, AtomicU32, Ordering},
6        mpsc::{Sender, TryRecvError, channel},
7    },
8    time::Instant,
9};
10
11use lunar_lib::{database::DbHandle, debug};
12use selene_core::database::LibraryDb;
13
14use crate::{
15    LoopMode, SHUTDOWN, STATE_CHANGED,
16    config::daemon_config,
17    decoder::{DecoderEvent, DecoderHandle},
18    event_handler::EventHandler,
19    player::{
20        AtomicPlaybackStatus, PlaybackStatus, Player, PlayerError, PlayerRequest,
21        playback::DeviceConfig, state::PlayerState,
22    },
23    playlist::{PlayingTrack, Playlist},
24    wait,
25};
26
27// Thread start impls
28impl Player {
29    fn new() -> Result<(Sender<PlayerRequest>, Self), PlayerError> {
30        let state = PlayerState::load();
31
32        let (player_tx, rx) = channel();
33
34        let event_tx = EventHandler::open(player_tx.clone())?;
35
36        state.trigger_events(&event_tx);
37
38        let volume = Arc::new(AtomicU32::new(state.volume.to_bits()));
39        let playback_state = Arc::new(AtomicPlaybackStatus::new(PlaybackStatus::Stopped));
40        let looping = Arc::new(AtomicBool::new(matches!(
41            state.loop_mode,
42            LoopMode::RepeatTrack
43        )));
44
45        let playlist = Playlist::new(event_tx.clone(), looping.clone(), &state)?;
46
47        let device_config = Arc::new(Mutex::new(DeviceConfig::default_config()?));
48
49        let decoder_tx = DecoderHandle::open(
50            player_tx.clone(),
51            event_tx.clone(),
52            device_config.clone(),
53            playback_state.clone(),
54            volume.clone(),
55            looping,
56        )?;
57
58        Ok((
59            player_tx,
60            Self {
61                rx,
62                decoder_tx,
63
64                event_tx,
65
66                playlist,
67                device_config,
68                playback_state,
69
70                volume,
71            },
72        ))
73    }
74
75    /// Opens a new [`Player`] and runs it on the current thread
76    pub fn start() -> Result<(), PlayerError> {
77        let (_, player) = Self::new()?;
78        player.run()?;
79        SHUTDOWN.store(true, Ordering::Relaxed);
80        Ok(())
81    }
82
83    fn run(mut self) -> Result<(), PlayerError> {
84        debug!("Engine thread started");
85
86        self.replace_decoders(false)?;
87
88        let mut last_state_save = Instant::now();
89
90        loop {
91            if SHUTDOWN.load(Ordering::Relaxed) {
92                return Ok(());
93            }
94
95            {
96                let db = LazyCell::new(|| DbHandle::<LibraryDb>::open().unwrap());
97                loop {
98                    match self.rx.try_recv() {
99                        Ok(PlayerRequest::Command(command)) => self.run_command(*command, &db)?,
100                        Ok(PlayerRequest::DecoderEvent(DecoderEvent::PreloadConsumed)) => {
101                            self.playlist.pop_next(&db)?;
102                            self.try_preload(&db)?;
103                        }
104                        Ok(PlayerRequest::DecoderEvent(DecoderEvent::Scrobble {
105                            track,
106                            start_time,
107                        })) => {
108                            scrobble(track, start_time);
109                        }
110                        Ok(PlayerRequest::DecoderEvent(DecoderEvent::NowPlaying { track })) => {
111                            now_playing(track);
112                        }
113                        Err(TryRecvError::Empty) => break,
114                        Err(TryRecvError::Disconnected) => {
115                            return Ok(());
116                        }
117                    }
118                }
119            }
120
121            if STATE_CHANGED.load(Ordering::Relaxed)
122                && last_state_save.elapsed().as_secs_f64() > 5.0
123            {
124                last_state_save = Instant::now();
125                let _ = PlayerState::save(&self);
126                STATE_CHANGED.store(false, Ordering::Release);
127            }
128
129            wait();
130            continue;
131        }
132    }
133}
134
135#[allow(unused_variables)]
136fn scrobble(track: PlayingTrack, start_time: u64) {
137    if !daemon_config().main.scrobbling {
138        return;
139    }
140
141    #[cfg(feature = "lastfm")]
142    {
143        use lunar_lib::error;
144        use selene_core::scrobbling::lastfm::last_fm_client;
145
146        if let Some(last_fm_client) = last_fm_client() {
147            tokio::runtime::Handle::current().spawn(async move {
148                if let Err(err) = last_fm_client
149                    .scrobble(&track.source, start_time, true)
150                    .await
151                {
152                    error!("LastFM scrobble error: {err}");
153                }
154            });
155        }
156    }
157}
158
159#[allow(unused_variables)]
160fn now_playing(track: PlayingTrack) {
161    if !daemon_config().main.scrobbling {
162        return;
163    }
164
165    #[cfg(feature = "lastfm")]
166    {
167        use lunar_lib::error;
168        use selene_core::scrobbling::lastfm::last_fm_client;
169
170        if let Some(last_fm_client) = last_fm_client() {
171            tokio::runtime::Handle::current().spawn(async move {
172                if let Err(err) = last_fm_client.now_playing(&track.source).await {
173                    error!("LastFM now_playing error: {err}");
174                }
175            });
176        }
177    }
178}