1#![allow(clippy::needless_continue)]
2
3pub mod interfaces;
4
5use std::time::Duration;
6
7use anyhow::{anyhow, Context as _, Result};
8
9use mecomp_core::{
10 rpc::{init_client, MusicPlayerClient},
11 state::{Percent, RepeatMode, StateAudio, Status},
12 udp::{Event, Listener, Message, StateChange},
13};
14use mecomp_storage::db::schemas::song::Song;
15use mpris_server::{
16 zbus::{zvariant::ObjectPath, Error as ZbusError},
17 LoopStatus, Metadata, PlaybackStatus, Property, Server, Signal, Time, TrackId,
18};
19use tarpc::context::Context;
20use tokio::sync::{RwLock, RwLockReadGuard};
21
22pub struct Mpris {
23 daemon: RwLock<Option<MusicPlayerClient>>,
24 pub port: u16,
25 pub state: RwLock<StateAudio>,
26}
27
28impl Mpris {
29 #[must_use]
31 pub fn new(port: u16) -> Self {
32 Self {
33 daemon: RwLock::new(None),
34 port,
35 state: RwLock::new(StateAudio::default()),
36 }
37 }
38
39 pub async fn daemon(&self) -> RwLockReadGuard<Option<MusicPlayerClient>> {
41 let mut maybedaemon = self.daemon.write().await;
42 if let Some(daemon) = maybedaemon.as_ref() {
43 let context = Context::current();
44 if daemon.ping(context).await.is_ok() {
45 return maybedaemon.downgrade();
46 }
47 }
48
49 *maybedaemon = None;
51 log::info!("Lost connection to daemon, shutting down");
52 #[cfg(not(test))] std::thread::spawn(|| {
55 std::thread::sleep(Duration::from_secs(5));
56 std::process::exit(0);
57 });
58 maybedaemon.downgrade()
64 }
65
66 #[must_use]
68 pub fn new_with_daemon(daemon: MusicPlayerClient) -> Self {
69 Self {
70 daemon: RwLock::new(Some(daemon)),
71 port: 0,
72 state: RwLock::new(StateAudio::default()),
73 }
74 }
75
76 pub async fn connect(&self) -> Result<()> {
82 if self.daemon.read().await.is_some() {
83 return Ok(());
84 }
85
86 let daemon = init_client(self.port).await.context(format!(
87 "Failed to connect to daemon on port: {}",
88 self.port
89 ))?;
90
91 *self.state.write().await = daemon
92 .state_audio(Context::current())
93 .await
94 .context(
95 "Failed to get initial state from daemon, please ensure the daemon is running",
96 )?
97 .ok_or_else(|| anyhow!("Failed to get initial state from daemon"))?;
98 *self.daemon.write().await = Some(daemon);
99
100 Ok(())
101 }
102
103 pub async fn connect_with_retry(&self) -> Result<()> {
109 const MAX_RETRIES: u8 = 5;
110 const BASE_DELAY: Duration = Duration::from_secs(1);
111
112 let mut retries = 0;
113
114 while retries < MAX_RETRIES {
115 match self.connect().await {
116 Ok(()) => return Ok(()),
117 Err(e) => {
118 retries += 1;
119 log::warn!("Failed to connect to daemon: {e}");
120 tokio::time::sleep(BASE_DELAY * u32::from(retries)).await;
121 }
122 }
123 }
124
125 Err(anyhow!(
126 "Failed to connect to daemon on port {} after {} retries",
127 self.port,
128 MAX_RETRIES
129 ))
130 }
131
132 pub async fn start_server(self, bus_name_suffix: &str) -> Result<Server<Self>, ZbusError> {
140 Server::new(bus_name_suffix, self).await
141 }
142}
143
144#[derive(Debug)]
145pub enum MessageOutcomes {
146 Nothing,
147 Signal(Signal),
148 Properties(Vec<Property>),
149 Quit,
150}
151
/// Interval of the ticker in `Subscriber::main_loop`, and the amount by which
/// the cached seek position is advanced on each tick.
pub const TICK_RATE: Duration = Duration::from_millis(100);
154
/// Stateless handler that turns daemon messages into MPRIS signals and
/// property updates.
#[derive(Debug)]
pub struct Subscriber;
157
158impl Subscriber {
159 pub async fn main_loop(
165 &self,
166 server: &Server<Mpris>,
167 ) -> anyhow::Result<()> {
169 let mut listener = Listener::new().await?;
170
171 let maybe_daemon = server.imp().daemon().await;
172 if let Some(daemon) = maybe_daemon.as_ref() {
173 daemon
174 .register_listener(Context::current(), listener.local_addr()?)
175 .await?;
176 } else {
177 return Err(anyhow!("Daemon not connected"));
178 }
179 drop(maybe_daemon);
180
181 let mut ticker = tokio::time::interval(TICK_RATE);
182
183 #[allow(clippy::redundant_pub_crate)]
184 loop {
185 let daemon = server.imp().daemon().await;
186 let mut state = server.imp().state.write().await;
187
188 tokio::select! {
189 Ok(message) = listener.recv() => {
190 match self
191 .handle_message(message, &mut state, daemon.as_ref())
192 .await?
193 {
194 MessageOutcomes::Nothing => continue,
195 MessageOutcomes::Signal(signal) => server.emit(signal).await?,
196 MessageOutcomes::Properties(items) => server.properties_changed(items).await?,
197 MessageOutcomes::Quit => break,
198 }
199 }
200 _ = ticker.tick() => {
201 if let Some(runtime) = &mut state.runtime {
202 runtime.seek_position += TICK_RATE;
203 runtime.seek_percent = Percent::new(runtime.seek_position.as_secs_f32() / runtime.duration.as_secs_f32() * 100.0);
204 }
205 }
206 }
207 }
208
209 Ok(())
210 }
211
212 pub async fn handle_message(
222 &self,
223 message: Message,
224 state: &mut StateAudio,
225 daemon: Option<&MusicPlayerClient>,
226 ) -> anyhow::Result<MessageOutcomes> {
227 log::info!("Received event: {message:?}");
228 match message {
229 Message::Event(
230 Event::LibraryAnalysisFinished
231 | Event::LibraryReclusterFinished
232 | Event::LibraryRescanFinished,
233 ) => Ok(MessageOutcomes::Nothing),
234 Message::Event(Event::DaemonShutdown) => Ok(MessageOutcomes::Quit),
235 Message::StateChange(StateChange::Muted) => {
236 state.muted = true;
237 Ok(MessageOutcomes::Properties(vec![Property::Volume(0.0)]))
238 }
239 Message::StateChange(StateChange::Unmuted) => {
240 state.muted = false;
241 Ok(MessageOutcomes::Properties(vec![Property::Volume(
242 state.volume.into(),
243 )]))
244 }
245 Message::StateChange(StateChange::VolumeChanged(new_volume)) => {
246 state.volume = new_volume;
247 Ok(MessageOutcomes::Properties(vec![Property::Volume(
248 new_volume.into(),
249 )]))
250 }
251 Message::StateChange(StateChange::TrackChanged(_)) => {
253 let context = Context::current();
254 if let Some(daemon) = daemon {
256 *state = daemon
257 .state_audio(context)
258 .await
259 .context("Failed to get state from daemon")?
260 .ok_or_else(|| anyhow!("Failed to get state from daemon"))?;
261 } else {
262 state.current_song = None;
263 state.runtime = None;
264 }
265
266 let metadata = metadata_from_opt_song(state.current_song.as_ref());
267 Ok(MessageOutcomes::Properties(vec![Property::Metadata(
268 metadata,
269 )]))
270 }
271 Message::StateChange(StateChange::RepeatModeChanged(new_mode)) => {
272 state.repeat_mode = new_mode;
273 Ok(MessageOutcomes::Properties(vec![Property::LoopStatus(
274 match new_mode {
275 RepeatMode::None => LoopStatus::None,
276 RepeatMode::One => LoopStatus::Track,
277 RepeatMode::All => LoopStatus::Playlist,
278 },
279 )]))
280 }
281 Message::StateChange(StateChange::Seeked(position)) => {
282 if let Some(runtime) = &mut state.runtime {
283 runtime.seek_position = position;
284 runtime.seek_percent = Percent::new(
285 position.as_secs_f32() / runtime.duration.as_secs_f32() * 100.0,
286 );
287 }
288 Ok(MessageOutcomes::Signal(Signal::Seeked {
289 position: Time::from_micros(
290 i64::try_from(position.as_micros()).unwrap_or(i64::MAX),
291 ),
292 }))
293 }
294 Message::StateChange(StateChange::StatusChanged(status)) => {
295 state.status = status;
296 Ok(MessageOutcomes::Properties(vec![Property::PlaybackStatus(
297 match status {
298 Status::Stopped => PlaybackStatus::Stopped,
299 Status::Paused => PlaybackStatus::Paused,
300 Status::Playing => PlaybackStatus::Playing,
301 },
302 )]))
303 }
304 }
305 }
306}
307
308#[must_use]
309pub fn metadata_from_opt_song(song: Option<&Song>) -> Metadata {
310 song.map_or_else(
311 || Metadata::builder().trackid(TrackId::NO_TRACK).build(),
312 |song| {
313 Metadata::builder()
314 .trackid(object_path_from_thing(&song.id.clone().into()))
315 .length(Time::from_micros(
316 i64::try_from(song.runtime.as_micros()).unwrap_or(i64::MAX),
317 ))
318 .artist(song.artist.iter().map(ToString::to_string))
319 .album(song.album.to_string())
320 .title(song.title.to_string())
321 .build()
322 },
323 )
324}
325
326fn object_path_from_thing(thing: &mecomp_storage::db::schemas::RecordId) -> ObjectPath {
327 ObjectPath::try_from(format!("/mecomp/{}/{}", thing.tb, thing.id))
328 .unwrap_or_else(|e| panic!("Failed to convert {thing} to ObjectPath: {e}"))
329}
330
/// Tests for `Subscriber::handle_message`, run against an in-process test
/// daemon.
#[cfg(test)]
mod subscriber_tests {
    use std::{num::NonZero, sync::Arc};

    use super::*;
    use mecomp_core::{audio::AudioKernelSender, config::Settings};
    use mecomp_daemon::init_test_client_server;
    use mecomp_storage::{
        db::schemas::song::Song,
        test_utils::{arb_song_case, init_test_database_with_state},
    };
    use mpris_server::Metadata;
    use pretty_assertions::assert_str_eq;
    use rstest::rstest;
    use tempfile::TempDir;

    /// Feeds one message through `handle_message`, starting from a default
    /// `StateAudio`, and compares the `Debug` rendering of the outcome with the
    /// expected one.
    #[rstest]
    #[case::nothing(
        Message::Event(Event::LibraryAnalysisFinished),
        MessageOutcomes::Nothing
    )]
    #[case::nothing(
        Message::Event(Event::LibraryReclusterFinished),
        MessageOutcomes::Nothing
    )]
    #[case::nothing(
        Message::Event(Event::LibraryRescanFinished),
        MessageOutcomes::Nothing
    )]
    #[case::quit(
        Message::Event(Event::DaemonShutdown),
        MessageOutcomes::Quit
    )]
    #[case::muted(
        Message::StateChange(StateChange::Muted),
        MessageOutcomes::Properties(vec![Property::Volume(0.0)])
    )]
    #[case::unmuted(
        Message::StateChange(StateChange::Unmuted),
        MessageOutcomes::Properties(vec![Property::Volume(1.0)])
    )]
    #[case::volume_changed(
        Message::StateChange(StateChange::VolumeChanged(0.75)),
        MessageOutcomes::Properties(vec![Property::Volume(0.75)])
    )]
    #[case::track_changed(
        Message::StateChange(StateChange::TrackChanged(None)),
        MessageOutcomes::Properties(vec![Property::Metadata(
            Metadata::builder().trackid(TrackId::NO_TRACK).build()
        )])
    )]
    #[case::track_changed(
        Message::StateChange(StateChange::TrackChanged(Some(Song::generate_id().into()))),
        MessageOutcomes::Properties(vec![Property::Metadata(
            Metadata::builder().trackid(TrackId::NO_TRACK).build()
        )])
    )]
    #[case::repeat_mode_changed(
        Message::StateChange(StateChange::RepeatModeChanged(RepeatMode::One)),
        MessageOutcomes::Properties(vec![Property::LoopStatus(LoopStatus::Track)])
    )]
    #[case::seeked(
        Message::StateChange(StateChange::Seeked(Duration::from_secs(10))),
        MessageOutcomes::Signal(Signal::Seeked { position: Time::from_micros(10_000_000) })
    )]
    #[case::status_changed(
        Message::StateChange(StateChange::StatusChanged(Status::Playing)),
        MessageOutcomes::Properties(vec![Property::PlaybackStatus(PlaybackStatus::Playing)])
    )]
    #[case::status_changed(
        Message::StateChange(StateChange::StatusChanged(Status::Paused)),
        MessageOutcomes::Properties(vec![Property::PlaybackStatus(PlaybackStatus::Paused)])
    )]
    #[case::status_changed(
        Message::StateChange(StateChange::StatusChanged(Status::Stopped)),
        MessageOutcomes::Properties(vec![Property::PlaybackStatus(PlaybackStatus::Stopped)])
    )]
    #[tokio::test]
    async fn test_handle_message(#[case] message: Message, #[case] expected: MessageOutcomes) {
        let tempdir = TempDir::new().unwrap();

        let database = init_test_database_with_state(
            NonZero::new(4).unwrap(),
            |i| (arb_song_case()(), i > 1, i > 2),
            None,
            &tempdir,
        )
        .await;

        let settings = Arc::new(Settings::default());

        // The receiving end is dropped right away; events are not inspected.
        let (event_tx, _) = std::sync::mpsc::channel();
        let audio_kernel = AudioKernelSender::start(event_tx);

        let client = init_test_client_server(database, settings, audio_kernel.clone())
            .await
            .unwrap();

        let mut state = StateAudio::default();

        let actual = Subscriber
            .handle_message(message, &mut state, Some(&client))
            .await
            .unwrap();

        // Outcomes are compared through their `Debug` output.
        assert_str_eq!(format!("{actual:?}"), format!("{expected:?}"));
    }
}
401
/// Shared test fixtures for this crate.
#[cfg(test)]
pub mod test_utils {
    use std::{num::NonZero, sync::Arc};

    use super::*;
    use mecomp_core::{audio::AudioKernelSender, config::Settings};
    use mecomp_daemon::init_test_client_server;
    use mecomp_storage::test_utils::{arb_song_case, init_test_database_with_state};
    use rstest::fixture;
    use surrealdb::{engine::local::Db, Surreal};
    use tempfile::TempDir;

    /// Creates a test database populated through
    /// `init_test_database_with_state`, using `tempdir` for its files.
    async fn db(tempdir: &TempDir) -> Arc<Surreal<Db>> {
        let song_count = NonZero::new(4).unwrap();
        init_test_database_with_state(
            song_count,
            |i| (arb_song_case()(), i > 1, i > 2),
            None,
            tempdir,
        )
        .await
    }

    /// Builds an `Mpris` wired to an in-process test daemon.
    ///
    /// Returns the `Mpris` instance, the receiver paired with the sender given
    /// to the audio kernel, the temporary directory and the audio kernel
    /// handle.
    #[fixture]
    pub async fn fixtures() -> (
        Mpris,
        std::sync::mpsc::Receiver<StateChange>,
        TempDir,
        Arc<AudioKernelSender>,
    ) {
        let tempdir = TempDir::new().unwrap();
        let database = db(&tempdir).await;
        let settings = Arc::new(Settings::default());

        let (event_tx, event_rx) = std::sync::mpsc::channel();
        let audio_kernel = AudioKernelSender::start(event_tx);

        let client = init_test_client_server(database, settings, audio_kernel.clone())
            .await
            .unwrap();

        (
            Mpris::new_with_daemon(client),
            event_rx,
            tempdir,
            audio_kernel,
        )
    }
}