mecomp_mpris/
lib.rs

1#![allow(clippy::needless_continue)]
2
3pub mod interfaces;
4
5use std::time::Duration;
6
7use anyhow::{Context as _, Result, anyhow};
8
9use mecomp_core::{
10    rpc::{MusicPlayerClient, init_client},
11    state::{Percent, RepeatMode, StateAudio, Status},
12    udp::{Event, Listener, Message, StateChange},
13};
14use mecomp_storage::db::schemas::song::SongBrief;
15use mpris_server::{
16    LoopStatus, Metadata, PlaybackStatus, Property, Server, Signal, Time, TrackId,
17    zbus::{Error as ZbusError, zvariant::ObjectPath},
18};
19use tarpc::context::Context;
20use tokio::sync::{RwLock, RwLockReadGuard};
21
22pub struct Mpris {
23    daemon: RwLock<Option<MusicPlayerClient>>,
24    pub port: u16,
25    pub state: RwLock<StateAudio>,
26}
27
28impl Mpris {
29    /// Create a new Mpris instance pending a connection to a daemon.
30    #[must_use]
31    pub fn new(port: u16) -> Self {
32        Self {
33            daemon: RwLock::new(None),
34            port,
35            state: RwLock::new(StateAudio::default()),
36        }
37    }
38
39    /// Give access to the inner Daemon client (checks if the daemon is connected first).
40    pub async fn daemon(&self) -> RwLockReadGuard<Option<MusicPlayerClient>> {
41        let mut maybedaemon = self.daemon.write().await;
42        if let Some(daemon) = maybedaemon.as_ref() {
43            let context = Context::current();
44            if daemon.ping(context).await.is_ok() {
45                return maybedaemon.downgrade();
46            }
47        }
48
49        // if we get here, either the daemon is not connected, or it's not responding
50        *maybedaemon = None;
51        log::info!("Lost connection to daemon, shutting down");
52        // spawn a new thread to kill the server after some delay
53        #[cfg(not(test))] // we don't want to exit the process in tests
54        std::thread::spawn(|| {
55            std::thread::sleep(Duration::from_secs(5));
56            std::process::exit(0);
57        });
58        // if let Err(e) = self.connect_with_retry().await {
59        //     log::error!("Failed to reconnect to daemon: {}", e);
60        // } else {
61        //     log::info!("Reconnected to daemon");
62        // }
63        maybedaemon.downgrade()
64    }
65
66    /// Create a new Mpris instance with a daemon already connected.
67    #[must_use]
68    pub fn new_with_daemon(daemon: MusicPlayerClient) -> Self {
69        Self {
70            daemon: RwLock::new(Some(daemon)),
71            port: 0,
72            state: RwLock::new(StateAudio::default()),
73        }
74    }
75
76    /// Connect to the daemon.
77    ///
78    /// # Errors
79    ///
80    /// Returns an error if the daemon cannot be connected.
81    pub async fn connect(&self) -> Result<()> {
82        if self.daemon.read().await.is_some() {
83            return Ok(());
84        }
85
86        let daemon = init_client(self.port).await.context(format!(
87            "Failed to connect to daemon on port: {}",
88            self.port
89        ))?;
90
91        *self.state.write().await = daemon
92            .state_audio(Context::current())
93            .await
94            .context(
95                "Failed to get initial state from daemon, please ensure the daemon is running",
96            )?
97            .ok_or_else(|| anyhow!("Failed to get initial state from daemon"))?;
98        *self.daemon.write().await = Some(daemon);
99
100        Ok(())
101    }
102
103    /// Connect to the daemon if not already connected.
104    ///
105    /// # Errors
106    ///
107    /// Returns an error if the daemon cannot be connected after 5 retries.
108    pub async fn connect_with_retry(&self) -> Result<()> {
109        const MAX_RETRIES: u8 = 5;
110        const BASE_DELAY: Duration = Duration::from_secs(1);
111
112        let mut retries = 0;
113
114        while retries < MAX_RETRIES {
115            if let Err(e) = self.connect().await {
116                retries += 1;
117                log::warn!("Failed to connect to daemon: {e}");
118                tokio::time::sleep(BASE_DELAY * u32::from(retries)).await;
119            } else {
120                return Ok(());
121            }
122        }
123
124        Err(anyhow!(
125            "Failed to connect to daemon on port {} after {} retries",
126            self.port,
127            MAX_RETRIES
128        ))
129    }
130
131    /// Start the Mpris server.
132    ///
133    /// Consumes self, but you can get back a reference to it by calling `imp()` on the returned `Server`.
134    ///
135    /// # Errors
136    ///
137    /// Returns an error if the server cannot be started.
138    pub async fn start_server(self, bus_name_suffix: &str) -> Result<Server<Self>, ZbusError> {
139        Server::new(bus_name_suffix, self).await
140    }
141}
142
143#[derive(Debug)]
144pub enum MessageOutcomes {
145    Nothing,
146    Signal(Signal),
147    Properties(Vec<Property>),
148    Quit,
149}
150
151/// Should be the same as the tick rate used by other clients (e.g. the TUI).
152pub const TICK_RATE: Duration = Duration::from_millis(100);
153
154#[derive(Debug)]
155pub struct Subscriber;
156
157impl Subscriber {
158    /// Main loop for the UDP subscriber.
159    ///
160    /// # Errors
161    ///
162    /// Returns an error if the main loop cannot be started, or if an error occurs while handling a message.
163    pub async fn main_loop(
164        &self,
165        server: &Server<Mpris>,
166        // kill: tokio::sync::broadcast::Receiver<()>,
167    ) -> anyhow::Result<()> {
168        let mut listener = Listener::new().await?;
169
170        let maybe_daemon = server.imp().daemon().await;
171        if let Some(daemon) = maybe_daemon.as_ref() {
172            daemon
173                .register_listener(Context::current(), listener.local_addr()?)
174                .await?;
175        } else {
176            return Err(anyhow!("Daemon not connected"));
177        }
178        drop(maybe_daemon);
179
180        let mut ticker = tokio::time::interval(TICK_RATE);
181        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
182
183        #[allow(clippy::redundant_pub_crate)]
184        loop {
185            let daemon = server.imp().daemon().await;
186            let mut state = server.imp().state.write().await;
187
188            tokio::select! {
189                Ok(message) = listener.recv() => {
190                    match self
191                        .handle_message(message, &mut state, daemon.as_ref())
192                        .await?
193                    {
194                        MessageOutcomes::Nothing => continue,
195                        MessageOutcomes::Signal(signal) => server.emit(signal).await?,
196                        MessageOutcomes::Properties(items) => server.properties_changed(items).await?,
197                        MessageOutcomes::Quit => break,
198                    }
199                }
200                _ = ticker.tick() => {
201                    if let Some(runtime) = &mut state.runtime {
202                        runtime.seek_position += TICK_RATE;
203                        runtime.seek_percent = Percent::new(runtime.seek_position.as_secs_f32() / runtime.duration.as_secs_f32() * 100.0);
204                    }
205                }
206            }
207        }
208
209        Ok(())
210    }
211
212    /// Handle a message received from the UDP socket.
213    ///
214    /// # Returns
215    ///
216    /// Either nothing, a signal, a list of changed properties, or a notification to quit.
217    ///
218    /// # Errors
219    ///
220    /// Returns an error if the message cannot be handled.
221    pub async fn handle_message(
222        &self,
223        message: Message,
224        state: &mut StateAudio,
225        daemon: Option<&MusicPlayerClient>,
226    ) -> anyhow::Result<MessageOutcomes> {
227        log::info!("Received event: {message:?}");
228        match message {
229            Message::Event(
230                Event::LibraryAnalysisFinished
231                | Event::LibraryReclusterFinished
232                | Event::LibraryRescanFinished,
233            ) => Ok(MessageOutcomes::Nothing),
234            Message::Event(Event::DaemonShutdown) => Ok(MessageOutcomes::Quit),
235            Message::StateChange(StateChange::Muted) => {
236                state.muted = true;
237                Ok(MessageOutcomes::Properties(vec![Property::Volume(0.0)]))
238            }
239            Message::StateChange(StateChange::Unmuted) => {
240                state.muted = false;
241                Ok(MessageOutcomes::Properties(vec![Property::Volume(
242                    state.volume.into(),
243                )]))
244            }
245            Message::StateChange(StateChange::VolumeChanged(new_volume)) => {
246                state.volume = new_volume;
247                Ok(MessageOutcomes::Properties(vec![Property::Volume(
248                    new_volume.into(),
249                )]))
250            }
251            // generally speaking, a lot can change when a track is changed, therefore we update the entire internal state (even if we only emit the new metadata)
252            Message::StateChange(StateChange::TrackChanged(_)) => {
253                let context = Context::current();
254                // we'll need to update the internal state with the new song (and it's duration info and such)
255                if let Some(daemon) = daemon {
256                    *state = daemon
257                        .state_audio(context)
258                        .await
259                        .context("Failed to get state from daemon")?
260                        .ok_or_else(|| anyhow!("Failed to get state from daemon"))?;
261                } else {
262                    state.current_song = None;
263                    state.runtime = None;
264                }
265
266                let metadata = metadata_from_opt_song(state.current_song.as_ref());
267                Ok(MessageOutcomes::Properties(vec![Property::Metadata(
268                    metadata,
269                )]))
270            }
271            Message::StateChange(StateChange::RepeatModeChanged(new_mode)) => {
272                state.repeat_mode = new_mode;
273                Ok(MessageOutcomes::Properties(vec![Property::LoopStatus(
274                    match new_mode {
275                        RepeatMode::None => LoopStatus::None,
276                        RepeatMode::One => LoopStatus::Track,
277                        RepeatMode::All => LoopStatus::Playlist,
278                    },
279                )]))
280            }
281            Message::StateChange(StateChange::Seeked(position)) => {
282                if let Some(runtime) = &mut state.runtime {
283                    runtime.seek_position = position;
284                    runtime.seek_percent = Percent::new(
285                        position.as_secs_f32() / runtime.duration.as_secs_f32() * 100.0,
286                    );
287                }
288                Ok(MessageOutcomes::Signal(Signal::Seeked {
289                    position: Time::from_micros(
290                        i64::try_from(position.as_micros()).unwrap_or(i64::MAX),
291                    ),
292                }))
293            }
294            Message::StateChange(StateChange::StatusChanged(status)) => {
295                state.status = status;
296                Ok(MessageOutcomes::Properties(vec![Property::PlaybackStatus(
297                    match status {
298                        Status::Stopped => PlaybackStatus::Stopped,
299                        Status::Paused => PlaybackStatus::Paused,
300                        Status::Playing => PlaybackStatus::Playing,
301                    },
302                )]))
303            }
304        }
305    }
306}
307
308#[must_use]
309pub fn metadata_from_opt_song(song: Option<&SongBrief>) -> Metadata {
310    song.map_or_else(
311        || Metadata::builder().trackid(TrackId::NO_TRACK).build(),
312        |song| {
313            Metadata::builder()
314                .trackid(object_path_from_thing(&song.id.clone().into()))
315                .length(Time::from_micros(
316                    i64::try_from(song.runtime.as_micros()).unwrap_or(i64::MAX),
317                ))
318                .artist(song.artist.iter().map(ToString::to_string))
319                .album(song.album.to_string())
320                .title(song.title.to_string())
321                .build()
322        },
323    )
324}
325
326fn object_path_from_thing(thing: &mecomp_storage::db::schemas::RecordId) -> ObjectPath {
327    ObjectPath::try_from(format!("/mecomp/{}/{}", thing.tb, thing.id))
328        .unwrap_or_else(|e| panic!("Failed to convert {thing} to ObjectPath: {e}"))
329}
330
331#[cfg(test)]
332mod subscriber_tests {
333    use std::{num::NonZero, sync::Arc};
334
335    use super::*;
336    use mecomp_core::{audio::AudioKernelSender, config::Settings};
337    use mecomp_daemon::init_test_client_server;
338    use mecomp_storage::{
339        db::schemas::song::Song,
340        test_utils::{arb_song_case, init_test_database_with_state},
341    };
342    use mpris_server::Metadata;
343    use pretty_assertions::assert_str_eq;
344    use rstest::rstest;
345    use tempfile::TempDir;
346
347    #[rstest]
348    #[case::nothing(
349        Message::Event(Event::LibraryAnalysisFinished),
350        MessageOutcomes::Nothing
351    )]
352    #[case::nothing(
353        Message::Event(Event::LibraryReclusterFinished),
354        MessageOutcomes::Nothing
355    )]
356    #[case::nothing(Message::Event(Event::LibraryRescanFinished), MessageOutcomes::Nothing)]
357    #[case::quit(Message::Event(Event::DaemonShutdown), MessageOutcomes::Quit)]
358    #[case::muted(Message::StateChange(StateChange::Muted), MessageOutcomes::Properties(vec![Property::Volume(0.0)]))]
359    #[case::unmuted(Message::StateChange(StateChange::Unmuted), MessageOutcomes::Properties(vec![Property::Volume(1.0)]))]
360    #[case::volume_changed(Message::StateChange(StateChange::VolumeChanged(0.75)), MessageOutcomes::Properties(vec![Property::Volume(0.75)]))]
361    #[case::track_changed(Message::StateChange(StateChange::TrackChanged(None)), MessageOutcomes::Properties(vec![Property::Metadata(Metadata::builder().trackid(TrackId::NO_TRACK).build())]))]
362    #[case::track_changed(Message::StateChange(StateChange::TrackChanged(Some(Song::generate_id().into()))), MessageOutcomes::Properties(vec![Property::Metadata(Metadata::builder().trackid(TrackId::NO_TRACK).build())]))]
363    #[case::repeat_mode_changed(Message::StateChange(StateChange::RepeatModeChanged(RepeatMode::One)), MessageOutcomes::Properties(vec![Property::LoopStatus(LoopStatus::Track)]))]
364    #[case::seeked(Message::StateChange(StateChange::Seeked(Duration::from_secs(10))), MessageOutcomes::Signal(Signal::Seeked { position: Time::from_micros(10_000_000) }))]
365    #[case::status_changed(Message::StateChange(StateChange::StatusChanged(Status::Playing)), MessageOutcomes::Properties(vec![Property::PlaybackStatus(PlaybackStatus::Playing)]))]
366    #[case::status_changed(Message::StateChange(StateChange::StatusChanged(Status::Paused)), MessageOutcomes::Properties(vec![Property::PlaybackStatus(PlaybackStatus::Paused)]))]
367    #[case::status_changed(Message::StateChange(StateChange::StatusChanged(Status::Stopped)), MessageOutcomes::Properties(vec![Property::PlaybackStatus(PlaybackStatus::Stopped)]))]
368    #[tokio::test]
369    async fn test_handle_message(#[case] message: Message, #[case] expected: MessageOutcomes) {
370        let tempdir = TempDir::new().unwrap();
371
372        let db = init_test_database_with_state(
373            NonZero::new(4).unwrap(),
374            |i| (arb_song_case()(), i > 1, i > 2),
375            None,
376            &tempdir,
377        )
378        .await;
379
380        let settings = Arc::new(Settings::default());
381
382        let (event_tx, _) = std::sync::mpsc::channel();
383
384        let audio_kernel = AudioKernelSender::start(event_tx);
385
386        let daemon = init_test_client_server(db, settings, audio_kernel.clone())
387            .await
388            .unwrap();
389
390        let state = &mut StateAudio::default();
391
392        let actual = Subscriber
393            .handle_message(message, state, Some(&daemon))
394            .await
395            .unwrap();
396
397        // Since the structs from mpris_server don't implement PartialEq, we can't compare them directly, so instead we compare the Debug representations
398        assert_str_eq!(format!("{actual:?}"), format!("{expected:?}"));
399    }
400}
401
402#[cfg(test)]
403pub mod test_utils {
404    use std::{num::NonZero, sync::Arc};
405
406    use super::*;
407    use mecomp_core::{audio::AudioKernelSender, config::Settings};
408    use mecomp_daemon::init_test_client_server;
409    use mecomp_storage::test_utils::{arb_song_case, init_test_database_with_state};
410    use rstest::fixture;
411    use surrealdb::{Surreal, engine::local::Db};
412    use tempfile::TempDir;
413
414    // Create a database with some songs, a playlist, and a collection
415    async fn db(tempdir: &TempDir) -> Arc<Surreal<Db>> {
416        init_test_database_with_state(
417            NonZero::new(4).unwrap(),
418            |i| (arb_song_case()(), i > 1, i > 2),
419            None,
420            tempdir,
421        )
422        .await
423    }
424
425    #[fixture]
426    pub async fn fixtures() -> (
427        Mpris,
428        std::sync::mpsc::Receiver<StateChange>,
429        TempDir,
430        Arc<AudioKernelSender>,
431    ) {
432        let tempdir = TempDir::new().unwrap();
433
434        let db = db(&tempdir).await;
435
436        let settings = Arc::new(Settings::default());
437
438        let (event_tx, event_rx) = std::sync::mpsc::channel();
439
440        let audio_kernel = AudioKernelSender::start(event_tx);
441
442        let daemon = init_test_client_server(db, settings, audio_kernel.clone())
443            .await
444            .unwrap();
445
446        let mpris = Mpris::new_with_daemon(daemon);
447
448        (mpris, event_rx, tempdir, audio_kernel)
449    }
450}