1#![allow(clippy::needless_continue)]
2
3pub mod interfaces;
4
5use std::time::Duration;
6
7use anyhow::{Context as _, Result, anyhow};
8
9use mecomp_core::{
10 rpc::{MusicPlayerClient, init_client},
11 state::{Percent, RepeatMode, StateAudio, Status},
12 udp::{Event, Listener, Message, StateChange},
13};
14use mecomp_storage::db::schemas::song::SongBrief;
15use mpris_server::{
16 LoopStatus, Metadata, PlaybackStatus, Property, Server, Signal, Time, TrackId,
17 zbus::{Error as ZbusError, zvariant::ObjectPath},
18};
19use tarpc::context::Context;
20use tokio::sync::{RwLock, RwLockReadGuard};
21
/// Bridge object pairing an optional daemon RPC client with a locally cached audio state.
pub struct Mpris {
    /// RPC client for the daemon; `None` while no connection is established.
    daemon: RwLock<Option<MusicPlayerClient>>,
    /// Port handed to `init_client` when `connect` opens a connection to the daemon.
    pub port: u16,
    /// Locally cached copy of the daemon's audio state.
    pub state: RwLock<StateAudio>,
}
27
28impl Mpris {
29 #[must_use]
31 pub fn new(port: u16) -> Self {
32 Self {
33 daemon: RwLock::new(None),
34 port,
35 state: RwLock::new(StateAudio::default()),
36 }
37 }
38
39 pub async fn daemon(&self) -> RwLockReadGuard<'_, Option<MusicPlayerClient>> {
41 let mut maybedaemon = self.daemon.write().await;
42 if let Some(daemon) = maybedaemon.as_ref() {
43 let context = Context::current();
44 if daemon.ping(context).await.is_ok() {
45 return maybedaemon.downgrade();
46 }
47 }
48
49 *maybedaemon = None;
51 log::info!("Lost connection to daemon, shutting down");
52 #[cfg(not(test))] std::thread::spawn(|| {
55 std::thread::sleep(Duration::from_secs(5));
56 std::process::exit(0);
57 });
58 maybedaemon.downgrade()
64 }
65
66 #[must_use]
68 pub fn new_with_daemon(daemon: MusicPlayerClient) -> Self {
69 Self {
70 daemon: RwLock::new(Some(daemon)),
71 port: 0,
72 state: RwLock::new(StateAudio::default()),
73 }
74 }
75
76 pub async fn connect(&self) -> Result<()> {
82 if self.daemon.read().await.is_some() {
83 return Ok(());
84 }
85
86 let daemon = init_client(self.port).await.context(format!(
87 "Failed to connect to daemon on port: {}",
88 self.port
89 ))?;
90
91 *self.state.write().await = daemon
92 .state_audio(Context::current())
93 .await
94 .context(
95 "Failed to get initial state from daemon, please ensure the daemon is running",
96 )?
97 .ok_or_else(|| anyhow!("Failed to get initial state from daemon"))?;
98 *self.daemon.write().await = Some(daemon);
99
100 Ok(())
101 }
102
103 pub async fn connect_with_retry(&self) -> Result<()> {
109 const MAX_RETRIES: u8 = 5;
110 const BASE_DELAY: Duration = Duration::from_secs(1);
111
112 let mut retries = 0;
113
114 while retries < MAX_RETRIES {
115 if let Err(e) = self.connect().await {
116 retries += 1;
117 log::warn!("Failed to connect to daemon: {e}");
118 tokio::time::sleep(BASE_DELAY * u32::from(retries)).await;
119 } else {
120 return Ok(());
121 }
122 }
123
124 Err(anyhow!(
125 "Failed to connect to daemon on port {} after {} retries",
126 self.port,
127 MAX_RETRIES
128 ))
129 }
130
131 pub async fn start_server(self, bus_name_suffix: &str) -> Result<Server<Self>, ZbusError> {
139 Server::new(bus_name_suffix, self).await
140 }
141}
142
/// What the subscriber loop should do after a message has been handled by
/// `Subscriber::handle_message`.
#[derive(Debug)]
pub enum MessageOutcomes {
    /// Nothing has to be sent; the loop just continues.
    Nothing,
    /// The contained signal is emitted through the MPRIS server.
    Signal(Signal),
    /// The contained properties are reported as changed through the MPRIS server.
    Properties(Vec<Property>),
    /// The subscriber loop stops.
    Quit,
}
150
/// Period of the subscriber's ticker; on every tick while not paused, the cached seek
/// position is advanced by this amount.
pub const TICK_RATE: Duration = Duration::from_millis(100);
153
/// Stateless handle whose methods receive daemon messages and translate them into MPRIS
/// signals and property updates.
#[derive(Debug)]
pub struct Subscriber;
156
157impl Subscriber {
158 pub async fn main_loop(
164 &self,
165 server: &Server<Mpris>,
166 ) -> anyhow::Result<()> {
168 let mut listener = Listener::new().await?;
169
170 let maybe_daemon = server.imp().daemon().await;
171 if let Some(daemon) = maybe_daemon.as_ref() {
172 daemon
173 .register_listener(Context::current(), listener.local_addr()?)
174 .await?;
175 } else {
176 return Err(anyhow!("Daemon not connected"));
177 }
178 drop(maybe_daemon);
179
180 let mut ticker = tokio::time::interval(TICK_RATE);
181 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
182
183 #[allow(clippy::redundant_pub_crate)]
184 loop {
185 let mut state = server.imp().state.write().await;
186
187 tokio::select! {
188 Ok(message) = listener.recv() => {
189 match self
190 .handle_message(message, &mut state, || async { server.imp().daemon().await.clone() })
191 .await?
192 {
193 MessageOutcomes::Nothing => continue,
194 MessageOutcomes::Signal(signal) => server.emit(signal).await?,
195 MessageOutcomes::Properties(items) => server.properties_changed(items).await?,
196 MessageOutcomes::Quit => break,
197 }
198 }
199 _ = ticker.tick() => {
200 if state.paused() {
201 continue;
202 }
203 if let Some(runtime) = &mut state.runtime {
204 runtime.seek_position += TICK_RATE;
205 runtime.seek_percent = Percent::new(runtime.seek_position.as_secs_f32() / runtime.duration.as_secs_f32() * 100.0);
206 }
207 }
208 }
209 }
210
211 Ok(())
212 }
213
214 pub async fn handle_message<D>(
226 &self,
227 message: Message,
228 state: &mut StateAudio,
229 get_daemon: D,
230 ) -> anyhow::Result<MessageOutcomes>
231 where
232 D: AsyncFnOnce() -> Option<MusicPlayerClient>,
233 {
234 log::info!("Received event: {message:?}");
235 match message {
236 Message::Event(
237 Event::LibraryAnalysisFinished
238 | Event::LibraryReclusterFinished
239 | Event::LibraryRescanFinished,
240 ) => Ok(MessageOutcomes::Nothing),
241 Message::Event(Event::DaemonShutdown) => Ok(MessageOutcomes::Quit),
242 Message::StateChange(StateChange::Muted) => {
243 state.muted = true;
244 Ok(MessageOutcomes::Properties(vec![Property::Volume(0.0)]))
245 }
246 Message::StateChange(StateChange::Unmuted) => {
247 state.muted = false;
248 Ok(MessageOutcomes::Properties(vec![Property::Volume(
249 state.volume.into(),
250 )]))
251 }
252 Message::StateChange(StateChange::VolumeChanged(new_volume)) => {
253 state.volume = new_volume;
254 Ok(MessageOutcomes::Properties(vec![Property::Volume(
255 new_volume.into(),
256 )]))
257 }
258 Message::StateChange(StateChange::TrackChanged(_) | StateChange::QueueChanged) => {
260 let context = Context::current();
261 if let Some(daemon) = get_daemon().await.as_ref() {
263 *state = daemon
264 .state_audio(context)
265 .await
266 .context("Failed to get state from daemon")?
267 .ok_or_else(|| anyhow!("Failed to get state from daemon"))?;
268 } else {
269 state.current_song = None;
270 state.runtime = None;
271 }
272
273 let metadata = metadata_from_opt_song(state.current_song.as_ref());
274 Ok(MessageOutcomes::Properties(vec![Property::Metadata(
275 metadata,
276 )]))
277 }
278 Message::StateChange(StateChange::RepeatModeChanged(new_mode)) => {
279 state.repeat_mode = new_mode;
280 Ok(MessageOutcomes::Properties(vec![Property::LoopStatus(
281 match new_mode {
282 RepeatMode::None => LoopStatus::None,
283 RepeatMode::One => LoopStatus::Track,
284 RepeatMode::All => LoopStatus::Playlist,
285 },
286 )]))
287 }
288 Message::StateChange(StateChange::Seeked(position)) => {
289 if let Some(runtime) = &mut state.runtime {
290 runtime.seek_position = position;
291 runtime.seek_percent = Percent::new(
292 position.as_secs_f32() / runtime.duration.as_secs_f32() * 100.0,
293 );
294 }
295 Ok(MessageOutcomes::Signal(Signal::Seeked {
296 position: Time::from_micros(
297 i64::try_from(position.as_micros()).unwrap_or(i64::MAX),
298 ),
299 }))
300 }
301 Message::StateChange(StateChange::StatusChanged(status)) => {
302 state.status = status;
303 Ok(MessageOutcomes::Properties(vec![Property::PlaybackStatus(
304 match status {
305 Status::Stopped => PlaybackStatus::Stopped,
306 Status::Paused => PlaybackStatus::Paused,
307 Status::Playing => PlaybackStatus::Playing,
308 },
309 )]))
310 }
311 }
312 }
313}
314
315#[must_use]
316pub fn metadata_from_opt_song(song: Option<&SongBrief>) -> Metadata {
317 song.map_or_else(
318 || Metadata::builder().trackid(TrackId::NO_TRACK).build(),
319 |song| {
320 Metadata::builder()
321 .trackid(object_path_from_thing(&song.id.clone().into()))
322 .length(Time::from_micros(
323 i64::try_from(song.runtime.as_micros()).unwrap_or(i64::MAX),
324 ))
325 .artist(song.artist.iter().map(ToString::to_string))
326 .album(song.album.to_string())
327 .title(song.title.to_string())
328 .build()
329 },
330 )
331}
332
333fn object_path_from_thing(thing: &mecomp_storage::db::schemas::RecordId) -> ObjectPath<'_> {
334 ObjectPath::try_from(format!("/mecomp/{}/{}", thing.tb, thing.id))
335 .unwrap_or_else(|e| panic!("Failed to convert {thing} to ObjectPath: {e}"))
336}
337
#[cfg(test)]
mod subscriber_tests {
    use std::{num::NonZero, sync::Arc};

    use super::*;
    use mecomp_core::{audio::AudioKernelSender, config::Settings};
    use mecomp_daemon::init_test_client_server;
    use mecomp_storage::{
        db::schemas::song::Song,
        test_utils::{arb_song_case, init_test_database_with_state},
    };
    use mpris_server::Metadata;
    use pretty_assertions::assert_str_eq;
    use rstest::rstest;
    use tempfile::TempDir;

    /// Feeds one message into `Subscriber::handle_message`, starting from a default state
    /// and a test daemon, and compares the debug rendering of the outcome with the
    /// expectation.
    #[rstest]
    // Library events produce nothing.
    #[case::nothing(
        Message::Event(Event::LibraryAnalysisFinished),
        MessageOutcomes::Nothing
    )]
    #[case::nothing(
        Message::Event(Event::LibraryReclusterFinished),
        MessageOutcomes::Nothing
    )]
    #[case::nothing(
        Message::Event(Event::LibraryRescanFinished),
        MessageOutcomes::Nothing
    )]
    // Daemon shutdown stops the subscriber.
    #[case::quit(
        Message::Event(Event::DaemonShutdown),
        MessageOutcomes::Quit
    )]
    // Volume related changes.
    #[case::muted(
        Message::StateChange(StateChange::Muted),
        MessageOutcomes::Properties(vec![Property::Volume(0.0)])
    )]
    #[case::unmuted(
        Message::StateChange(StateChange::Unmuted),
        MessageOutcomes::Properties(vec![Property::Volume(1.0)])
    )]
    #[case::volume_changed(
        Message::StateChange(StateChange::VolumeChanged(0.75)),
        MessageOutcomes::Properties(vec![Property::Volume(0.75)])
    )]
    // Track changes: nothing is playing on the test daemon, so no track is reported.
    #[case::track_changed(
        Message::StateChange(StateChange::TrackChanged(None)),
        MessageOutcomes::Properties(vec![Property::Metadata(
            Metadata::builder().trackid(TrackId::NO_TRACK).build()
        )])
    )]
    #[case::track_changed(
        Message::StateChange(StateChange::TrackChanged(Some(Song::generate_id().into()))),
        MessageOutcomes::Properties(vec![Property::Metadata(
            Metadata::builder().trackid(TrackId::NO_TRACK).build()
        )])
    )]
    #[case::repeat_mode_changed(
        Message::StateChange(StateChange::RepeatModeChanged(RepeatMode::One)),
        MessageOutcomes::Properties(vec![Property::LoopStatus(LoopStatus::Track)])
    )]
    #[case::seeked(
        Message::StateChange(StateChange::Seeked(Duration::from_secs(10))),
        MessageOutcomes::Signal(Signal::Seeked { position: Time::from_micros(10_000_000) })
    )]
    // Playback status changes.
    #[case::status_changed(
        Message::StateChange(StateChange::StatusChanged(Status::Playing)),
        MessageOutcomes::Properties(vec![Property::PlaybackStatus(PlaybackStatus::Playing)])
    )]
    #[case::status_changed(
        Message::StateChange(StateChange::StatusChanged(Status::Paused)),
        MessageOutcomes::Properties(vec![Property::PlaybackStatus(PlaybackStatus::Paused)])
    )]
    #[case::status_changed(
        Message::StateChange(StateChange::StatusChanged(Status::Stopped)),
        MessageOutcomes::Properties(vec![Property::PlaybackStatus(PlaybackStatus::Stopped)])
    )]
    #[tokio::test]
    async fn test_handle_message(#[case] message: Message, #[case] expected: MessageOutcomes) {
        // Set up a database and a test daemon backed by it.
        let tempdir = TempDir::new().unwrap();
        let song_count = NonZero::new(4).unwrap();
        let db = init_test_database_with_state(
            song_count,
            |i| (arb_song_case()(), i > 1, i > 2),
            None,
            &tempdir,
        )
        .await;
        let settings = Arc::new(Settings::default());
        let (event_tx, _) = std::sync::mpsc::channel();
        let audio_kernel = AudioKernelSender::start(event_tx);
        let daemon = init_test_client_server(db, settings, audio_kernel.clone())
            .await
            .unwrap();

        // Handle the message starting from a default state.
        let mut state = StateAudio::default();
        let actual = Subscriber
            .handle_message(message, &mut state, || async { Some(daemon) })
            .await
            .unwrap();

        // The outcomes are compared through their debug output.
        let actual_debug = format!("{actual:?}");
        let expected_debug = format!("{expected:?}");
        assert_str_eq!(actual_debug, expected_debug);
    }
}
408
#[cfg(test)]
pub mod test_utils {
    use std::{num::NonZero, sync::Arc};

    use super::*;
    use mecomp_core::{audio::AudioKernelSender, config::Settings};
    use mecomp_daemon::init_test_client_server;
    use mecomp_storage::test_utils::{arb_song_case, init_test_database_with_state};
    use rstest::fixture;
    use surrealdb::{Surreal, engine::local::Db};
    use tempfile::TempDir;

    /// Initialises a test database with state inside `tempdir`.
    async fn db(tempdir: &TempDir) -> Arc<Surreal<Db>> {
        let song_count = NonZero::new(4).unwrap();
        init_test_database_with_state(
            song_count,
            |i| (arb_song_case()(), i > 1, i > 2),
            None,
            tempdir,
        )
        .await
    }

    /// Fixture providing an [`Mpris`] connected to a test daemon.
    ///
    /// Besides the `Mpris` instance, the receiving end of the audio kernel's event channel,
    /// the temporary directory and the audio kernel handle are returned so that the caller
    /// can use them and keep them alive.
    #[fixture]
    pub async fn fixtures() -> (
        Mpris,
        std::sync::mpsc::Receiver<StateChange>,
        TempDir,
        Arc<AudioKernelSender>,
    ) {
        let tempdir = TempDir::new().unwrap();
        let database = db(&tempdir).await;
        let settings = Arc::new(Settings::default());

        // The audio kernel reports its state changes through this channel.
        let (event_tx, event_rx) = std::sync::mpsc::channel();
        let audio_kernel = AudioKernelSender::start(event_tx);

        let daemon = init_test_client_server(database, settings, audio_kernel.clone())
            .await
            .unwrap();

        (
            Mpris::new_with_daemon(daemon),
            event_rx,
            tempdir,
            audio_kernel,
        )
    }
}