mecomp_mpris/
lib.rs

1#![allow(clippy::needless_continue)]
2
3pub mod interfaces;
4
5use std::time::Duration;
6
7use anyhow::{Context as _, Result, anyhow};
8
9use mecomp_core::{
10    state::{Percent, RepeatMode, StateAudio, Status},
11    udp::{Event, Listener, Message, StateChange},
12};
13use mecomp_prost::{MusicPlayerClient, RegisterListenerRequest};
14use mecomp_storage::db::schemas::song::SongBrief;
15use mpris_server::{
16    LoopStatus, Metadata, PlaybackStatus, Property, Server, Signal, Time, TrackId,
17    zbus::{Error as ZbusError, zvariant::ObjectPath},
18};
19use tokio::sync::RwLock;
20
21#[derive(Debug)]
22pub struct Mpris {
23    pub daemon: MusicPlayerClient,
24    pub port: u16,
25    pub state: RwLock<StateAudio>,
26}
27
28impl Mpris {
29    /// Create a new Mpris instance with a daemon already connected.
30    #[must_use]
31    pub fn new_with_daemon(daemon: MusicPlayerClient) -> Self {
32        Self {
33            daemon,
34            port: 0,
35            state: RwLock::new(StateAudio::default()),
36        }
37    }
38
39    /// Update the state from the daemon.
40    ///
41    /// # Errors
42    ///
43    /// Returns an error if the state cannot be retrieved from the daemon.
44    pub async fn update_state(&self) -> Result<()> {
45        let new_state = self
46            .daemon
47            .clone()
48            .state_audio(())
49            .await
50            .context("Failed to get state from daemon")?
51            .into_inner()
52            .state
53            .ok_or_else(|| anyhow!("Failed to get state from daemon"))?
54            .into();
55
56        *self.state.write().await = new_state;
57
58        Ok(())
59    }
60
61    /// Start the Mpris server.
62    ///
63    /// Consumes self, but you can get back a reference to it by calling `imp()` on the returned `Server`.
64    ///
65    /// # Errors
66    ///
67    /// Returns an error if the server cannot be started.
68    pub async fn start_server(self, bus_name_suffix: &str) -> Result<Server<Self>, ZbusError> {
69        Server::new(bus_name_suffix, self).await
70    }
71}
72
73#[derive(Debug)]
74pub enum MessageOutcomes {
75    Nothing,
76    Signal(Signal),
77    Properties(Vec<Property>),
78    Quit,
79}
80
/// Interval between ticks of the subscriber loop (100 milliseconds).
///
/// Should be the same as the tick rate used by other clients (e.g. the TUI).
pub const TICK_RATE: Duration = Duration::from_millis(100);
83
/// Field-less handle whose methods receive the daemon's UDP messages and turn
/// them into MPRIS signals and property updates.
#[derive(Debug)]
pub struct Subscriber;
86
87impl Subscriber {
88    /// Main loop for the UDP subscriber.
89    ///
90    /// # Errors
91    ///
92    /// Returns an error if the main loop cannot be started, or if an error occurs while handling a message.
93    pub async fn main_loop(
94        &self,
95        server: &Server<Mpris>,
96        // kill: tokio::sync::broadcast::Receiver<()>,
97    ) -> anyhow::Result<()> {
98        let mut listener = Listener::new().await?;
99
100        server
101            .imp()
102            .daemon
103            .clone()
104            .register_listener(RegisterListenerRequest::new(listener.local_addr()?))
105            .await?;
106
107        let mut ticker = tokio::time::interval(TICK_RATE);
108        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
109
110        #[allow(clippy::redundant_pub_crate)]
111        loop {
112            let mut state = server.imp().state.write().await;
113
114            tokio::select! {
115                Ok(message) = listener.recv() => {
116                    match self
117                        .handle_message(message, &mut state, server.imp().daemon.clone())
118                        .await?
119                    {
120                        MessageOutcomes::Nothing => continue,
121                        MessageOutcomes::Signal(signal) => server.emit(signal).await?,
122                        MessageOutcomes::Properties(items) => server.properties_changed(items).await?,
123                        MessageOutcomes::Quit => break,
124                    }
125                }
126                _ = ticker.tick() => {
127                    if state.paused() {
128                        continue;
129                    }
130                    if let Some(runtime) = &mut state.runtime {
131                        runtime.seek_position += TICK_RATE;
132                        runtime.seek_percent = Percent::new(runtime.seek_position.as_secs_f32() / runtime.duration.as_secs_f32() * 100.0);
133                    }
134                }
135            }
136        }
137
138        Ok(())
139    }
140
141    /// Handle a message received from the UDP socket.
142    ///
143    /// Takes a closure that it can use to get a reference to the daemon client when needed.
144    ///
145    /// # Returns
146    ///
147    /// Either nothing, a signal, a list of changed properties, or a notification to quit.
148    ///
149    /// # Errors
150    ///
151    /// Returns an error if the message cannot be handled.
152    pub async fn handle_message(
153        &self,
154        message: Message,
155        state: &mut StateAudio,
156        daemon: MusicPlayerClient,
157    ) -> anyhow::Result<MessageOutcomes> {
158        log::info!("Received event: {message:?}");
159        match message {
160            Message::Event(
161                Event::LibraryAnalysisFinished
162                | Event::LibraryReclusterFinished
163                | Event::LibraryRescanFinished,
164            ) => Ok(MessageOutcomes::Nothing),
165            Message::Event(Event::DaemonShutdown) => Ok(MessageOutcomes::Quit),
166            Message::StateChange(StateChange::Muted) => {
167                state.muted = true;
168                Ok(MessageOutcomes::Properties(vec![Property::Volume(0.0)]))
169            }
170            Message::StateChange(StateChange::Unmuted) => {
171                state.muted = false;
172                Ok(MessageOutcomes::Properties(vec![Property::Volume(
173                    state.volume.into(),
174                )]))
175            }
176            Message::StateChange(StateChange::VolumeChanged(new_volume)) => {
177                state.volume = new_volume;
178                Ok(MessageOutcomes::Properties(vec![Property::Volume(
179                    new_volume.into(),
180                )]))
181            }
182            // generally speaking, a lot can change when a track is changed, therefore we update the entire internal state (even if we only emit the new metadata)
183            Message::StateChange(StateChange::TrackChanged(_) | StateChange::QueueChanged) => {
184                // we'll need to update the internal state with the new song (and it's duration info and such)
185                *state = daemon
186                    .clone()
187                    .state_audio(())
188                    .await
189                    .context("Failed to get state from daemon")?
190                    .into_inner()
191                    .state
192                    .ok_or_else(|| anyhow!("Failed to get state from daemon"))?
193                    .into();
194
195                let metadata = metadata_from_opt_song(state.current_song.as_ref());
196                Ok(MessageOutcomes::Properties(vec![Property::Metadata(
197                    metadata,
198                )]))
199            }
200            Message::StateChange(StateChange::RepeatModeChanged(new_mode)) => {
201                state.repeat_mode = new_mode;
202                Ok(MessageOutcomes::Properties(vec![Property::LoopStatus(
203                    match new_mode {
204                        RepeatMode::None => LoopStatus::None,
205                        RepeatMode::One => LoopStatus::Track,
206                        RepeatMode::All => LoopStatus::Playlist,
207                    },
208                )]))
209            }
210            Message::StateChange(StateChange::Seeked(position)) => {
211                if let Some(runtime) = &mut state.runtime {
212                    runtime.seek_position = position;
213                    runtime.seek_percent = Percent::new(
214                        position.as_secs_f32() / runtime.duration.as_secs_f32() * 100.0,
215                    );
216                }
217                Ok(MessageOutcomes::Signal(Signal::Seeked {
218                    position: Time::from_micros(
219                        i64::try_from(position.as_micros()).unwrap_or(i64::MAX),
220                    ),
221                }))
222            }
223            Message::StateChange(StateChange::StatusChanged(status)) => {
224                state.status = status;
225                Ok(MessageOutcomes::Properties(vec![Property::PlaybackStatus(
226                    match status {
227                        Status::Stopped => PlaybackStatus::Stopped,
228                        Status::Paused => PlaybackStatus::Paused,
229                        Status::Playing => PlaybackStatus::Playing,
230                    },
231                )]))
232            }
233        }
234    }
235}
236
237#[must_use]
238pub fn metadata_from_opt_song(song: Option<&SongBrief>) -> Metadata {
239    song.map_or_else(
240        || Metadata::builder().trackid(TrackId::NO_TRACK).build(),
241        |song| {
242            Metadata::builder()
243                .trackid(object_path_from_thing(&song.id.clone().into()))
244                .length(Time::from_micros(
245                    i64::try_from(song.runtime.as_micros()).unwrap_or(i64::MAX),
246                ))
247                .artist(song.artist.as_slice())
248                .album(&song.album)
249                .title(&song.title)
250                .build()
251        },
252    )
253}
254
255fn object_path_from_thing(thing: &mecomp_storage::db::schemas::RecordId) -> ObjectPath<'_> {
256    ObjectPath::try_from(format!("/mecomp/{}/{}", thing.tb, thing.id))
257        .unwrap_or_else(|e| panic!("Failed to convert {thing} to ObjectPath: {e}"))
258}
259
#[cfg(test)]
mod subscriber_tests {
    use std::{num::NonZero, sync::Arc};

    use super::*;
    use mecomp_core::{audio::AudioKernelSender, config::Settings};
    use mecomp_daemon::init_test_client_server;
    use mecomp_storage::{
        db::schemas::song::Song,
        test_utils::{arb_song_case, init_test_database_with_state},
    };
    use mpris_server::Metadata;
    use pretty_assertions::assert_str_eq;
    use rstest::rstest;
    use tempfile::TempDir;

    /// Feeds a single message to `Subscriber::handle_message`, starting from a
    /// default state and a freshly started test daemon, and checks the outcome.
    #[rstest]
    #[case::nothing(
        Message::Event(Event::LibraryAnalysisFinished),
        MessageOutcomes::Nothing
    )]
    #[case::nothing(
        Message::Event(Event::LibraryReclusterFinished),
        MessageOutcomes::Nothing
    )]
    #[case::nothing(Message::Event(Event::LibraryRescanFinished), MessageOutcomes::Nothing)]
    #[case::quit(Message::Event(Event::DaemonShutdown), MessageOutcomes::Quit)]
    #[case::muted(Message::StateChange(StateChange::Muted), MessageOutcomes::Properties(vec![Property::Volume(0.0)]))]
    #[case::unmuted(Message::StateChange(StateChange::Unmuted), MessageOutcomes::Properties(vec![Property::Volume(1.0)]))]
    #[case::volume_changed(Message::StateChange(StateChange::VolumeChanged(0.75)), MessageOutcomes::Properties(vec![Property::Volume(0.75)]))]
    #[case::track_changed(Message::StateChange(StateChange::TrackChanged(None)), MessageOutcomes::Properties(vec![Property::Metadata(Metadata::builder().trackid(TrackId::NO_TRACK).build())]))]
    #[case::track_changed(Message::StateChange(StateChange::TrackChanged(Some(Song::generate_id().into()))), MessageOutcomes::Properties(vec![Property::Metadata(Metadata::builder().trackid(TrackId::NO_TRACK).build())]))]
    #[case::repeat_mode_changed(Message::StateChange(StateChange::RepeatModeChanged(RepeatMode::One)), MessageOutcomes::Properties(vec![Property::LoopStatus(LoopStatus::Track)]))]
    #[case::seeked(Message::StateChange(StateChange::Seeked(Duration::from_secs(10))), MessageOutcomes::Signal(Signal::Seeked { position: Time::from_micros(10_000_000) }))]
    #[case::status_changed(Message::StateChange(StateChange::StatusChanged(Status::Playing)), MessageOutcomes::Properties(vec![Property::PlaybackStatus(PlaybackStatus::Playing)]))]
    #[case::status_changed(Message::StateChange(StateChange::StatusChanged(Status::Paused)), MessageOutcomes::Properties(vec![Property::PlaybackStatus(PlaybackStatus::Paused)]))]
    #[case::status_changed(Message::StateChange(StateChange::StatusChanged(Status::Stopped)), MessageOutcomes::Properties(vec![Property::PlaybackStatus(PlaybackStatus::Stopped)]))]
    #[tokio::test]
    async fn test_handle_message(#[case] message: Message, #[case] expected: MessageOutcomes) {
        // test daemon backed by a throwaway database
        let tempdir = TempDir::new().unwrap();
        let song_count = NonZero::new(4).unwrap();
        let db = init_test_database_with_state(
            song_count,
            |i| (arb_song_case()(), i > 1, i > 2),
            None,
            &tempdir,
        )
        .await;

        let (event_tx, _) = std::sync::mpsc::channel();
        let audio_kernel = AudioKernelSender::start(event_tx);

        let daemon = init_test_client_server(
            db,
            Arc::new(Settings::default()),
            audio_kernel.clone(),
        )
        .await
        .unwrap();

        // every case starts from the default state
        let mut state = StateAudio::default();
        let actual = Subscriber
            .handle_message(message, &mut state, daemon.clone())
            .await
            .unwrap();

        // Since the structs from mpris_server don't implement PartialEq, we can't compare them directly, so instead we compare the Debug representations
        let actual_repr = format!("{actual:?}");
        let expected_repr = format!("{expected:?}");
        assert_str_eq!(actual_repr, expected_repr);
    }
}
330
#[cfg(test)]
pub mod test_utils {
    use std::{num::NonZero, sync::Arc};

    use super::*;
    use mecomp_core::{audio::AudioKernelSender, config::Settings};
    use mecomp_daemon::init_test_client_server;
    use mecomp_storage::test_utils::{arb_song_case, init_test_database_with_state};
    use rstest::fixture;
    use surrealdb::{Surreal, engine::local::Db};
    use tempfile::TempDir;

    /// Create a database with some songs, a playlist, and a collection,
    /// using `tempdir` for the test data.
    async fn db(tempdir: &TempDir) -> Arc<Surreal<Db>> {
        let song_count = NonZero::new(4).unwrap();
        init_test_database_with_state(
            song_count,
            |i| (arb_song_case()(), i > 1, i > 2),
            None,
            tempdir,
        )
        .await
    }

    /// Shared fixture: an [`Mpris`] connected to a test daemon, together with
    /// the receiving end of the audio kernel's event channel, the temporary
    /// directory behind the database, and the audio kernel handle.
    #[fixture]
    pub async fn fixtures() -> (
        Mpris,
        std::sync::mpsc::Receiver<StateChange>,
        TempDir,
        Arc<AudioKernelSender>,
    ) {
        let tempdir = TempDir::new().unwrap();
        let database = db(&tempdir).await;

        let (event_tx, event_rx) = std::sync::mpsc::channel();
        let audio_kernel = AudioKernelSender::start(event_tx);

        let daemon = init_test_client_server(
            database,
            Arc::new(Settings::default()),
            audio_kernel.clone(),
        )
        .await
        .unwrap();

        (
            Mpris::new_with_daemon(daemon),
            event_rx,
            tempdir,
            audio_kernel,
        )
    }
}
379}