mecomp_mpris/
lib.rs

1pub mod interfaces;
2
3use std::time::Duration;
4
5use anyhow::{anyhow, Context as _, Result};
6
7use mecomp_core::{
8    rpc::{init_client, MusicPlayerClient},
9    state::{Percent, RepeatMode, StateAudio, Status},
10    udp::{Event, Listener, Message, StateChange},
11};
12use mecomp_storage::db::schemas::song::Song;
13use mpris_server::{
14    zbus::{zvariant::ObjectPath, Error as ZbusError},
15    LoopStatus, Metadata, PlaybackStatus, Property, Server, Signal, Time, TrackId,
16};
17use tarpc::context::Context;
18use tokio::sync::{RwLock, RwLockReadGuard};
19
20pub struct Mpris {
21    daemon: RwLock<Option<MusicPlayerClient>>,
22    pub port: u16,
23    pub state: RwLock<StateAudio>,
24}
25
26impl Mpris {
27    /// Create a new Mpris instance pending a connection to a daemon.
28    #[must_use]
29    pub fn new(port: u16) -> Self {
30        Self {
31            daemon: RwLock::new(None),
32            port,
33            state: RwLock::new(StateAudio::default()),
34        }
35    }
36
37    /// Give access to the inner Daemon client (checks if the daemon is connected first).
38    pub async fn daemon(&self) -> RwLockReadGuard<Option<MusicPlayerClient>> {
39        let mut maybedaemon = self.daemon.write().await;
40        if let Some(daemon) = maybedaemon.as_ref() {
41            let context = Context::current();
42            if daemon.ping(context).await.is_ok() {
43                return maybedaemon.downgrade();
44            }
45        }
46
47        // if we get here, either the daemon is not connected, or it's not responding
48        *maybedaemon = None;
49        log::info!("Lost connection to daemon, shutting down");
50        // spawn a new thread to kill the server after some delay
51        #[cfg(not(test))] // we don't want to exit the process in tests
52        std::thread::spawn(|| {
53            std::thread::sleep(Duration::from_secs(5));
54            std::process::exit(0);
55        });
56        // if let Err(e) = self.connect_with_retry().await {
57        //     log::error!("Failed to reconnect to daemon: {}", e);
58        // } else {
59        //     log::info!("Reconnected to daemon");
60        // }
61        maybedaemon.downgrade()
62    }
63
64    /// Create a new Mpris instance with a daemon already connected.
65    #[must_use]
66    pub fn new_with_daemon(daemon: MusicPlayerClient) -> Self {
67        Self {
68            daemon: RwLock::new(Some(daemon)),
69            port: 0,
70            state: RwLock::new(StateAudio::default()),
71        }
72    }
73
74    /// Connect to the daemon.
75    ///
76    /// # Errors
77    ///
78    /// Returns an error if the daemon cannot be connected.
79    pub async fn connect(&self) -> Result<()> {
80        if self.daemon.read().await.is_some() {
81            return Ok(());
82        }
83
84        let daemon = init_client(self.port).await.context(format!(
85            "Failed to connect to daemon on port: {}",
86            self.port
87        ))?;
88
89        *self.state.write().await = daemon
90            .state_audio(Context::current())
91            .await
92            .context(
93                "Failed to get initial state from daemon, please ensure the daemon is running",
94            )?
95            .ok_or_else(|| anyhow!("Failed to get initial state from daemon"))?;
96        *self.daemon.write().await = Some(daemon);
97
98        Ok(())
99    }
100
101    /// Connect to the daemon if not already connected.
102    ///
103    /// # Errors
104    ///
105    /// Returns an error if the daemon cannot be connected after 5 retries.
106    pub async fn connect_with_retry(&self) -> Result<()> {
107        const MAX_RETRIES: u8 = 5;
108        const BASE_DELAY: Duration = Duration::from_secs(1);
109
110        let mut retries = 0;
111
112        while retries < MAX_RETRIES {
113            match self.connect().await {
114                Ok(()) => return Ok(()),
115                Err(e) => {
116                    retries += 1;
117                    log::warn!("Failed to connect to daemon: {}", e);
118                    tokio::time::sleep(BASE_DELAY * u32::from(retries)).await;
119                }
120            }
121        }
122
123        Err(anyhow!(
124            "Failed to connect to daemon on port {} after {} retries",
125            self.port,
126            MAX_RETRIES
127        ))
128    }
129
130    /// Start the Mpris server.
131    ///
132    /// Consumes self, but you can get back a reference to it by calling `imp()` on the returned `Server`.
133    ///
134    /// # Errors
135    ///
136    /// Returns an error if the server cannot be started.
137    pub async fn start_server(self, bus_name_suffix: &str) -> Result<Server<Self>, ZbusError> {
138        Server::new(bus_name_suffix, self).await
139    }
140}
141
142#[derive(Debug)]
143pub enum MessageOutcomes {
144    Nothing,
145    Signal(Signal),
146    Properties(Vec<Property>),
147    Quit,
148}
149
150/// Should be the same as the tick rate used by other clients (e.g. the TUI).
151pub const TICK_RATE: Duration = Duration::from_millis(100);
152
153#[derive(Debug)]
154pub struct Subscriber;
155
156impl Subscriber {
157    /// Main loop for the UDP subscriber.
158    ///
159    /// # Errors
160    ///
161    /// Returns an error if the main loop cannot be started, or if an error occurs while handling a message.
162    pub async fn main_loop(
163        &self,
164        server: &Server<Mpris>,
165        // kill: tokio::sync::broadcast::Receiver<()>,
166    ) -> anyhow::Result<()> {
167        let mut listener = Listener::new().await?;
168
169        let maybe_daemon = server.imp().daemon().await;
170        if let Some(daemon) = maybe_daemon.as_ref() {
171            daemon
172                .register_listener(Context::current(), listener.local_addr()?)
173                .await?;
174        } else {
175            return Err(anyhow!("Daemon not connected"));
176        }
177        drop(maybe_daemon);
178
179        let mut ticker = tokio::time::interval(TICK_RATE);
180
181        #[allow(clippy::redundant_pub_crate)]
182        loop {
183            let daemon = server.imp().daemon().await;
184            let mut state = server.imp().state.write().await;
185
186            tokio::select! {
187                Ok(message) = listener.recv() => {
188                    match self
189                        .handle_message(message, &mut state, daemon.as_ref())
190                        .await?
191                    {
192                        MessageOutcomes::Nothing => continue,
193                        MessageOutcomes::Signal(signal) => server.emit(signal).await?,
194                        MessageOutcomes::Properties(items) => server.properties_changed(items).await?,
195                        MessageOutcomes::Quit => break,
196                    }
197                }
198                _ = ticker.tick() => {
199                    if let Some(runtime) = &mut state.runtime {
200                        runtime.seek_position += TICK_RATE;
201                        runtime.seek_percent = Percent::new(runtime.seek_position.as_secs_f32() / runtime.duration.as_secs_f32() * 100.0);
202                    }
203                }
204            }
205        }
206
207        Ok(())
208    }
209
210    /// Handle a message received from the UDP socket.
211    ///
212    /// # Returns
213    ///
214    /// Either nothing, a signal, a list of changed properties, or a notification to quit.
215    ///
216    /// # Errors
217    ///
218    /// Returns an error if the message cannot be handled.
219    pub async fn handle_message(
220        &self,
221        message: Message,
222        state: &mut StateAudio,
223        daemon: Option<&MusicPlayerClient>,
224    ) -> anyhow::Result<MessageOutcomes> {
225        log::info!("Received event: {:?}", message);
226        match message {
227            Message::Event(
228                Event::LibraryAnalysisFinished
229                | Event::LibraryReclusterFinished
230                | Event::LibraryRescanFinished,
231            ) => Ok(MessageOutcomes::Nothing),
232            Message::Event(Event::DaemonShutdown) => Ok(MessageOutcomes::Quit),
233            Message::StateChange(StateChange::Muted) => {
234                state.muted = true;
235                Ok(MessageOutcomes::Properties(vec![Property::Volume(0.0)]))
236            }
237            Message::StateChange(StateChange::Unmuted) => {
238                state.muted = false;
239                Ok(MessageOutcomes::Properties(vec![Property::Volume(
240                    state.volume.into(),
241                )]))
242            }
243            Message::StateChange(StateChange::VolumeChanged(new_volume)) => {
244                state.volume = new_volume;
245                Ok(MessageOutcomes::Properties(vec![Property::Volume(
246                    new_volume.into(),
247                )]))
248            }
249            // generally speaking, a lot can change when a track is changed, therefore we update the entire internal state (even if we only emit the new metadata)
250            Message::StateChange(StateChange::TrackChanged(_)) => {
251                let context = Context::current();
252                // we'll need to update the internal state with the new song (and it's duration info and such)
253                if let Some(daemon) = daemon {
254                    *state = daemon
255                        .state_audio(context)
256                        .await
257                        .context("Failed to get state from daemon")?
258                        .ok_or_else(|| anyhow!("Failed to get state from daemon"))?;
259                } else {
260                    state.current_song = None;
261                    state.runtime = None;
262                }
263
264                let metadata = metadata_from_opt_song(state.current_song.as_ref());
265                Ok(MessageOutcomes::Properties(vec![Property::Metadata(
266                    metadata,
267                )]))
268            }
269            Message::StateChange(StateChange::RepeatModeChanged(new_mode)) => {
270                state.repeat_mode = new_mode;
271                Ok(MessageOutcomes::Properties(vec![Property::LoopStatus(
272                    match new_mode {
273                        RepeatMode::None => LoopStatus::None,
274                        RepeatMode::One => LoopStatus::Track,
275                        RepeatMode::All => LoopStatus::Playlist,
276                    },
277                )]))
278            }
279            Message::StateChange(StateChange::Seeked(position)) => {
280                if let Some(runtime) = &mut state.runtime {
281                    runtime.seek_position = position;
282                    runtime.seek_percent = Percent::new(
283                        position.as_secs_f32() / runtime.duration.as_secs_f32() * 100.0,
284                    );
285                }
286                Ok(MessageOutcomes::Signal(Signal::Seeked {
287                    position: Time::from_micros(
288                        i64::try_from(position.as_micros()).unwrap_or(i64::MAX),
289                    ),
290                }))
291            }
292            Message::StateChange(StateChange::StatusChanged(status)) => {
293                state.status = status;
294                Ok(MessageOutcomes::Properties(vec![Property::PlaybackStatus(
295                    match status {
296                        Status::Stopped => PlaybackStatus::Stopped,
297                        Status::Paused => PlaybackStatus::Paused,
298                        Status::Playing => PlaybackStatus::Playing,
299                    },
300                )]))
301            }
302        }
303    }
304}
305
306#[must_use]
307pub fn metadata_from_opt_song(song: Option<&Song>) -> Metadata {
308    song.map_or_else(
309        || Metadata::builder().trackid(TrackId::NO_TRACK).build(),
310        |song| {
311            Metadata::builder()
312                .trackid(object_path_from_thing(&song.id.clone().into()))
313                .length(Time::from_micros(
314                    i64::try_from(song.runtime.as_micros()).unwrap_or(i64::MAX),
315                ))
316                .artist(song.artist.iter().map(ToString::to_string))
317                .album(song.album.to_string())
318                .title(song.title.to_string())
319                .build()
320        },
321    )
322}
323
324fn object_path_from_thing(thing: &mecomp_storage::db::schemas::Thing) -> ObjectPath {
325    ObjectPath::try_from(format!("/mecomp/{}/{}", thing.tb, thing.id))
326        .unwrap_or_else(|e| panic!("Failed to convert {thing} to ObjectPath: {e}"))
327}
328
329#[cfg(test)]
330mod subscriber_tests {
331    use std::{num::NonZero, sync::Arc};
332
333    use super::*;
334    use mecomp_core::{audio::AudioKernelSender, config::Settings};
335    use mecomp_daemon::init_test_client_server;
336    use mecomp_storage::{
337        db::schemas::song::Song,
338        test_utils::{arb_song_case, init_test_database_with_state},
339    };
340    use mpris_server::Metadata;
341    use pretty_assertions::assert_str_eq;
342    use rstest::rstest;
343    use tempfile::TempDir;
344
345    #[rstest]
346    #[case::nothing(
347        Message::Event(Event::LibraryAnalysisFinished),
348        MessageOutcomes::Nothing
349    )]
350    #[case::nothing(
351        Message::Event(Event::LibraryReclusterFinished),
352        MessageOutcomes::Nothing
353    )]
354    #[case::nothing(Message::Event(Event::LibraryRescanFinished), MessageOutcomes::Nothing)]
355    #[case::quit(Message::Event(Event::DaemonShutdown), MessageOutcomes::Quit)]
356    #[case::muted(Message::StateChange(StateChange::Muted), MessageOutcomes::Properties(vec![Property::Volume(0.0)]))]
357    #[case::unmuted(Message::StateChange(StateChange::Unmuted), MessageOutcomes::Properties(vec![Property::Volume(1.0)]))]
358    #[case::volume_changed(Message::StateChange(StateChange::VolumeChanged(0.75)), MessageOutcomes::Properties(vec![Property::Volume(0.75)]))]
359    #[case::track_changed(Message::StateChange(StateChange::TrackChanged(None)), MessageOutcomes::Properties(vec![Property::Metadata(Metadata::builder().trackid(TrackId::NO_TRACK).build())]))]
360    #[case::track_changed(Message::StateChange(StateChange::TrackChanged(Some(Song::generate_id().into()))), MessageOutcomes::Properties(vec![Property::Metadata(Metadata::builder().trackid(TrackId::NO_TRACK).build())]))]
361    #[case::repeat_mode_changed(Message::StateChange(StateChange::RepeatModeChanged(RepeatMode::One)), MessageOutcomes::Properties(vec![Property::LoopStatus(LoopStatus::Track)]))]
362    #[case::seeked(Message::StateChange(StateChange::Seeked(Duration::from_secs(10))), MessageOutcomes::Signal(Signal::Seeked { position: Time::from_micros(10_000_000) }))]
363    #[case::status_changed(Message::StateChange(StateChange::StatusChanged(Status::Playing)), MessageOutcomes::Properties(vec![Property::PlaybackStatus(PlaybackStatus::Playing)]))]
364    #[case::status_changed(Message::StateChange(StateChange::StatusChanged(Status::Paused)), MessageOutcomes::Properties(vec![Property::PlaybackStatus(PlaybackStatus::Paused)]))]
365    #[case::status_changed(Message::StateChange(StateChange::StatusChanged(Status::Stopped)), MessageOutcomes::Properties(vec![Property::PlaybackStatus(PlaybackStatus::Stopped)]))]
366    #[tokio::test]
367    async fn test_handle_message(#[case] message: Message, #[case] expected: MessageOutcomes) {
368        let tempdir = TempDir::new().unwrap();
369
370        let db = init_test_database_with_state(
371            NonZero::new(4).unwrap(),
372            |i| (arb_song_case()(), i > 1, i > 2),
373            None,
374            &tempdir,
375        )
376        .await;
377
378        let settings = Arc::new(Settings::default());
379
380        let (event_tx, _) = std::sync::mpsc::channel();
381
382        let audio_kernel = AudioKernelSender::start(event_tx);
383
384        let daemon = init_test_client_server(db, settings, audio_kernel.clone())
385            .await
386            .unwrap();
387
388        let state = &mut StateAudio::default();
389
390        let actual = Subscriber
391            .handle_message(message, state, Some(&daemon))
392            .await
393            .unwrap();
394
395        // Since the structs from mpris_server don't implement PartialEq, we can't compare them directly, so instead we compare the Debug representations
396        assert_str_eq!(format!("{actual:?}"), format!("{expected:?}"));
397    }
398}
399
400#[cfg(test)]
401pub mod test_utils {
402    use std::{num::NonZero, sync::Arc};
403
404    use super::*;
405    use mecomp_core::{audio::AudioKernelSender, config::Settings};
406    use mecomp_daemon::init_test_client_server;
407    use mecomp_storage::test_utils::{arb_song_case, init_test_database_with_state};
408    use rstest::fixture;
409    use surrealdb::{engine::local::Db, Surreal};
410    use tempfile::TempDir;
411
412    // Create a database with some songs, a playlist, and a collection
413    async fn db(tempdir: &TempDir) -> Arc<Surreal<Db>> {
414        init_test_database_with_state(
415            NonZero::new(4).unwrap(),
416            |i| (arb_song_case()(), i > 1, i > 2),
417            None,
418            tempdir,
419        )
420        .await
421    }
422
423    #[fixture]
424    pub async fn fixtures() -> (
425        Mpris,
426        std::sync::mpsc::Receiver<StateChange>,
427        TempDir,
428        Arc<AudioKernelSender>,
429    ) {
430        let tempdir = TempDir::new().unwrap();
431
432        let db = db(&tempdir).await;
433
434        let settings = Arc::new(Settings::default());
435
436        let (event_tx, event_rx) = std::sync::mpsc::channel();
437
438        let audio_kernel = AudioKernelSender::start(event_tx);
439
440        let daemon = init_test_client_server(db, settings, audio_kernel.clone())
441            .await
442            .unwrap();
443
444        let mpris = Mpris::new_with_daemon(daemon);
445
446        (mpris, event_rx, tempdir, audio_kernel)
447    }
448}