mecomp_mpris/
lib.rs

1#![allow(clippy::needless_continue)]
2
3pub mod interfaces;
4
5use std::time::Duration;
6
7use anyhow::{Context as _, Result, anyhow};
8
9use mecomp_core::{
10    rpc::{MusicPlayerClient, init_client},
11    state::{Percent, RepeatMode, StateAudio, Status},
12    udp::{Event, Listener, Message, StateChange},
13};
14use mecomp_storage::db::schemas::song::Song;
15use mpris_server::{
16    LoopStatus, Metadata, PlaybackStatus, Property, Server, Signal, Time, TrackId,
17    zbus::{Error as ZbusError, zvariant::ObjectPath},
18};
19use tarpc::context::Context;
20use tokio::sync::{RwLock, RwLockReadGuard};
21
22pub struct Mpris {
23    daemon: RwLock<Option<MusicPlayerClient>>,
24    pub port: u16,
25    pub state: RwLock<StateAudio>,
26}
27
28impl Mpris {
29    /// Create a new Mpris instance pending a connection to a daemon.
30    #[must_use]
31    pub fn new(port: u16) -> Self {
32        Self {
33            daemon: RwLock::new(None),
34            port,
35            state: RwLock::new(StateAudio::default()),
36        }
37    }
38
39    /// Give access to the inner Daemon client (checks if the daemon is connected first).
40    pub async fn daemon(&self) -> RwLockReadGuard<Option<MusicPlayerClient>> {
41        let mut maybedaemon = self.daemon.write().await;
42        if let Some(daemon) = maybedaemon.as_ref() {
43            let context = Context::current();
44            if daemon.ping(context).await.is_ok() {
45                return maybedaemon.downgrade();
46            }
47        }
48
49        // if we get here, either the daemon is not connected, or it's not responding
50        *maybedaemon = None;
51        log::info!("Lost connection to daemon, shutting down");
52        // spawn a new thread to kill the server after some delay
53        #[cfg(not(test))] // we don't want to exit the process in tests
54        std::thread::spawn(|| {
55            std::thread::sleep(Duration::from_secs(5));
56            std::process::exit(0);
57        });
58        // if let Err(e) = self.connect_with_retry().await {
59        //     log::error!("Failed to reconnect to daemon: {}", e);
60        // } else {
61        //     log::info!("Reconnected to daemon");
62        // }
63        maybedaemon.downgrade()
64    }
65
66    /// Create a new Mpris instance with a daemon already connected.
67    #[must_use]
68    pub fn new_with_daemon(daemon: MusicPlayerClient) -> Self {
69        Self {
70            daemon: RwLock::new(Some(daemon)),
71            port: 0,
72            state: RwLock::new(StateAudio::default()),
73        }
74    }
75
76    /// Connect to the daemon.
77    ///
78    /// # Errors
79    ///
80    /// Returns an error if the daemon cannot be connected.
81    pub async fn connect(&self) -> Result<()> {
82        if self.daemon.read().await.is_some() {
83            return Ok(());
84        }
85
86        let daemon = init_client(self.port).await.context(format!(
87            "Failed to connect to daemon on port: {}",
88            self.port
89        ))?;
90
91        *self.state.write().await = daemon
92            .state_audio(Context::current())
93            .await
94            .context(
95                "Failed to get initial state from daemon, please ensure the daemon is running",
96            )?
97            .ok_or_else(|| anyhow!("Failed to get initial state from daemon"))?;
98        *self.daemon.write().await = Some(daemon);
99
100        Ok(())
101    }
102
103    /// Connect to the daemon if not already connected.
104    ///
105    /// # Errors
106    ///
107    /// Returns an error if the daemon cannot be connected after 5 retries.
108    pub async fn connect_with_retry(&self) -> Result<()> {
109        const MAX_RETRIES: u8 = 5;
110        const BASE_DELAY: Duration = Duration::from_secs(1);
111
112        let mut retries = 0;
113
114        while retries < MAX_RETRIES {
115            if let Err(e) = self.connect().await {
116                retries += 1;
117                log::warn!("Failed to connect to daemon: {e}");
118                tokio::time::sleep(BASE_DELAY * u32::from(retries)).await;
119            } else {
120                return Ok(());
121            }
122        }
123
124        Err(anyhow!(
125            "Failed to connect to daemon on port {} after {} retries",
126            self.port,
127            MAX_RETRIES
128        ))
129    }
130
131    /// Start the Mpris server.
132    ///
133    /// Consumes self, but you can get back a reference to it by calling `imp()` on the returned `Server`.
134    ///
135    /// # Errors
136    ///
137    /// Returns an error if the server cannot be started.
138    pub async fn start_server(self, bus_name_suffix: &str) -> Result<Server<Self>, ZbusError> {
139        Server::new(bus_name_suffix, self).await
140    }
141}
142
143#[derive(Debug)]
144pub enum MessageOutcomes {
145    Nothing,
146    Signal(Signal),
147    Properties(Vec<Property>),
148    Quit,
149}
150
/// Period of the subscriber's ticker, and the amount added to the cached seek
/// position whenever a tick advances it.
///
/// Keep this equal to the tick rate used by other clients (e.g. the TUI).
pub const TICK_RATE: Duration = Duration::from_millis(100);
153
154#[derive(Debug)]
155pub struct Subscriber;
156
157impl Subscriber {
158    /// Main loop for the UDP subscriber.
159    ///
160    /// # Errors
161    ///
162    /// Returns an error if the main loop cannot be started, or if an error occurs while handling a message.
163    pub async fn main_loop(
164        &self,
165        server: &Server<Mpris>,
166        // kill: tokio::sync::broadcast::Receiver<()>,
167    ) -> anyhow::Result<()> {
168        let mut listener = Listener::new().await?;
169
170        let maybe_daemon = server.imp().daemon().await;
171        if let Some(daemon) = maybe_daemon.as_ref() {
172            daemon
173                .register_listener(Context::current(), listener.local_addr()?)
174                .await?;
175        } else {
176            return Err(anyhow!("Daemon not connected"));
177        }
178        drop(maybe_daemon);
179
180        let mut ticker = tokio::time::interval(TICK_RATE);
181
182        #[allow(clippy::redundant_pub_crate)]
183        loop {
184            let daemon = server.imp().daemon().await;
185            let mut state = server.imp().state.write().await;
186
187            tokio::select! {
188                Ok(message) = listener.recv() => {
189                    match self
190                        .handle_message(message, &mut state, daemon.as_ref())
191                        .await?
192                    {
193                        MessageOutcomes::Nothing => continue,
194                        MessageOutcomes::Signal(signal) => server.emit(signal).await?,
195                        MessageOutcomes::Properties(items) => server.properties_changed(items).await?,
196                        MessageOutcomes::Quit => break,
197                    }
198                }
199                _ = ticker.tick() => {
200                    if let Some(runtime) = &mut state.runtime {
201                        runtime.seek_position += TICK_RATE;
202                        runtime.seek_percent = Percent::new(runtime.seek_position.as_secs_f32() / runtime.duration.as_secs_f32() * 100.0);
203                    }
204                }
205            }
206        }
207
208        Ok(())
209    }
210
211    /// Handle a message received from the UDP socket.
212    ///
213    /// # Returns
214    ///
215    /// Either nothing, a signal, a list of changed properties, or a notification to quit.
216    ///
217    /// # Errors
218    ///
219    /// Returns an error if the message cannot be handled.
220    pub async fn handle_message(
221        &self,
222        message: Message,
223        state: &mut StateAudio,
224        daemon: Option<&MusicPlayerClient>,
225    ) -> anyhow::Result<MessageOutcomes> {
226        log::info!("Received event: {message:?}");
227        match message {
228            Message::Event(
229                Event::LibraryAnalysisFinished
230                | Event::LibraryReclusterFinished
231                | Event::LibraryRescanFinished,
232            ) => Ok(MessageOutcomes::Nothing),
233            Message::Event(Event::DaemonShutdown) => Ok(MessageOutcomes::Quit),
234            Message::StateChange(StateChange::Muted) => {
235                state.muted = true;
236                Ok(MessageOutcomes::Properties(vec![Property::Volume(0.0)]))
237            }
238            Message::StateChange(StateChange::Unmuted) => {
239                state.muted = false;
240                Ok(MessageOutcomes::Properties(vec![Property::Volume(
241                    state.volume.into(),
242                )]))
243            }
244            Message::StateChange(StateChange::VolumeChanged(new_volume)) => {
245                state.volume = new_volume;
246                Ok(MessageOutcomes::Properties(vec![Property::Volume(
247                    new_volume.into(),
248                )]))
249            }
250            // generally speaking, a lot can change when a track is changed, therefore we update the entire internal state (even if we only emit the new metadata)
251            Message::StateChange(StateChange::TrackChanged(_)) => {
252                let context = Context::current();
253                // we'll need to update the internal state with the new song (and it's duration info and such)
254                if let Some(daemon) = daemon {
255                    *state = daemon
256                        .state_audio(context)
257                        .await
258                        .context("Failed to get state from daemon")?
259                        .ok_or_else(|| anyhow!("Failed to get state from daemon"))?;
260                } else {
261                    state.current_song = None;
262                    state.runtime = None;
263                }
264
265                let metadata = metadata_from_opt_song(state.current_song.as_ref());
266                Ok(MessageOutcomes::Properties(vec![Property::Metadata(
267                    metadata,
268                )]))
269            }
270            Message::StateChange(StateChange::RepeatModeChanged(new_mode)) => {
271                state.repeat_mode = new_mode;
272                Ok(MessageOutcomes::Properties(vec![Property::LoopStatus(
273                    match new_mode {
274                        RepeatMode::None => LoopStatus::None,
275                        RepeatMode::One => LoopStatus::Track,
276                        RepeatMode::All => LoopStatus::Playlist,
277                    },
278                )]))
279            }
280            Message::StateChange(StateChange::Seeked(position)) => {
281                if let Some(runtime) = &mut state.runtime {
282                    runtime.seek_position = position;
283                    runtime.seek_percent = Percent::new(
284                        position.as_secs_f32() / runtime.duration.as_secs_f32() * 100.0,
285                    );
286                }
287                Ok(MessageOutcomes::Signal(Signal::Seeked {
288                    position: Time::from_micros(
289                        i64::try_from(position.as_micros()).unwrap_or(i64::MAX),
290                    ),
291                }))
292            }
293            Message::StateChange(StateChange::StatusChanged(status)) => {
294                state.status = status;
295                Ok(MessageOutcomes::Properties(vec![Property::PlaybackStatus(
296                    match status {
297                        Status::Stopped => PlaybackStatus::Stopped,
298                        Status::Paused => PlaybackStatus::Paused,
299                        Status::Playing => PlaybackStatus::Playing,
300                    },
301                )]))
302            }
303        }
304    }
305}
306
307#[must_use]
308pub fn metadata_from_opt_song(song: Option<&Song>) -> Metadata {
309    song.map_or_else(
310        || Metadata::builder().trackid(TrackId::NO_TRACK).build(),
311        |song| {
312            Metadata::builder()
313                .trackid(object_path_from_thing(&song.id.clone().into()))
314                .length(Time::from_micros(
315                    i64::try_from(song.runtime.as_micros()).unwrap_or(i64::MAX),
316                ))
317                .artist(song.artist.iter().map(ToString::to_string))
318                .album(song.album.to_string())
319                .title(song.title.to_string())
320                .build()
321        },
322    )
323}
324
325fn object_path_from_thing(thing: &mecomp_storage::db::schemas::RecordId) -> ObjectPath {
326    ObjectPath::try_from(format!("/mecomp/{}/{}", thing.tb, thing.id))
327        .unwrap_or_else(|e| panic!("Failed to convert {thing} to ObjectPath: {e}"))
328}
329
#[cfg(test)]
mod subscriber_tests {
    use std::{num::NonZero, sync::Arc};

    use super::*;
    use mecomp_core::{audio::AudioKernelSender, config::Settings};
    use mecomp_daemon::init_test_client_server;
    use mecomp_storage::{
        db::schemas::song::Song,
        test_utils::{arb_song_case, init_test_database_with_state},
    };
    use mpris_server::Metadata;
    use pretty_assertions::assert_str_eq;
    use rstest::rstest;
    use tempfile::TempDir;

    // Feeds one message into `handle_message`, starting from the default state and a
    // test daemon, and checks the returned outcome.
    #[rstest]
    #[case::nothing(
        Message::Event(Event::LibraryAnalysisFinished),
        MessageOutcomes::Nothing
    )]
    #[case::nothing(
        Message::Event(Event::LibraryReclusterFinished),
        MessageOutcomes::Nothing
    )]
    #[case::nothing(Message::Event(Event::LibraryRescanFinished), MessageOutcomes::Nothing)]
    #[case::quit(Message::Event(Event::DaemonShutdown), MessageOutcomes::Quit)]
    #[case::muted(Message::StateChange(StateChange::Muted), MessageOutcomes::Properties(vec![Property::Volume(0.0)]))]
    #[case::unmuted(Message::StateChange(StateChange::Unmuted), MessageOutcomes::Properties(vec![Property::Volume(1.0)]))]
    #[case::volume_changed(Message::StateChange(StateChange::VolumeChanged(0.75)), MessageOutcomes::Properties(vec![Property::Volume(0.75)]))]
    #[case::track_changed(Message::StateChange(StateChange::TrackChanged(None)), MessageOutcomes::Properties(vec![Property::Metadata(Metadata::builder().trackid(TrackId::NO_TRACK).build())]))]
    #[case::track_changed(Message::StateChange(StateChange::TrackChanged(Some(Song::generate_id().into()))), MessageOutcomes::Properties(vec![Property::Metadata(Metadata::builder().trackid(TrackId::NO_TRACK).build())]))]
    #[case::repeat_mode_changed(Message::StateChange(StateChange::RepeatModeChanged(RepeatMode::One)), MessageOutcomes::Properties(vec![Property::LoopStatus(LoopStatus::Track)]))]
    #[case::seeked(Message::StateChange(StateChange::Seeked(Duration::from_secs(10))), MessageOutcomes::Signal(Signal::Seeked { position: Time::from_micros(10_000_000) }))]
    #[case::status_changed(Message::StateChange(StateChange::StatusChanged(Status::Playing)), MessageOutcomes::Properties(vec![Property::PlaybackStatus(PlaybackStatus::Playing)]))]
    #[case::status_changed(Message::StateChange(StateChange::StatusChanged(Status::Paused)), MessageOutcomes::Properties(vec![Property::PlaybackStatus(PlaybackStatus::Paused)]))]
    #[case::status_changed(Message::StateChange(StateChange::StatusChanged(Status::Stopped)), MessageOutcomes::Properties(vec![Property::PlaybackStatus(PlaybackStatus::Stopped)]))]
    #[tokio::test]
    async fn test_handle_message(#[case] message: Message, #[case] expected: MessageOutcomes) {
        // backing database for the test daemon
        let tempdir = TempDir::new().unwrap();
        let database = init_test_database_with_state(
            NonZero::new(4).unwrap(),
            |i| (arb_song_case()(), i > 1, i > 2),
            None,
            &tempdir,
        )
        .await;

        // test daemon (the receiving end of the event channel is not needed here)
        let (event_tx, _) = std::sync::mpsc::channel();
        let audio_kernel = AudioKernelSender::start(event_tx);
        let config = Arc::new(Settings::default());
        let daemon = init_test_client_server(database, config, audio_kernel.clone())
            .await
            .unwrap();

        let mut state = StateAudio::default();
        let outcome = Subscriber
            .handle_message(message, &mut state, Some(&daemon))
            .await
            .unwrap();

        // Since the structs from mpris_server don't implement PartialEq, we can't compare them directly, so instead we compare the Debug representations
        let actual_repr = format!("{outcome:?}");
        let expected_repr = format!("{expected:?}");
        assert_str_eq!(actual_repr, expected_repr);
    }
}
400
#[cfg(test)]
pub mod test_utils {
    use std::{num::NonZero, sync::Arc};

    use super::*;
    use mecomp_core::{audio::AudioKernelSender, config::Settings};
    use mecomp_daemon::init_test_client_server;
    use mecomp_storage::test_utils::{arb_song_case, init_test_database_with_state};
    use rstest::fixture;
    use surrealdb::{Surreal, engine::local::Db};
    use tempfile::TempDir;

    // Create a database with some songs, a playlist, and a collection
    async fn db(tempdir: &TempDir) -> Arc<Surreal<Db>> {
        let song_count = NonZero::new(4).unwrap();
        init_test_database_with_state(
            song_count,
            |i| (arb_song_case()(), i > 1, i > 2),
            None,
            tempdir,
        )
        .await
    }

    /// Shared fixture: an `Mpris` wired to a test daemon, together with the receiving
    /// end of the audio kernel's event channel, the temporary directory backing the
    /// database, and the audio kernel handle.
    #[fixture]
    pub async fn fixtures() -> (
        Mpris,
        std::sync::mpsc::Receiver<StateChange>,
        TempDir,
        Arc<AudioKernelSender>,
    ) {
        let tempdir = TempDir::new().unwrap();
        let database = db(&tempdir).await;

        let (event_tx, event_rx) = std::sync::mpsc::channel();
        let audio_kernel = AudioKernelSender::start(event_tx);

        let config = Arc::new(Settings::default());
        let client = init_test_client_server(database, config, audio_kernel.clone())
            .await
            .unwrap();

        (
            Mpris::new_with_daemon(client),
            event_rx,
            tempdir,
            audio_kernel,
        )
    }
}
449}