1#![allow(clippy::needless_continue)]
2
3pub mod interfaces;
4
5use std::time::Duration;
6
7use anyhow::{Context as _, Result, anyhow};
8
9use mecomp_core::{
10 rpc::{MusicPlayerClient, init_client},
11 state::{Percent, RepeatMode, StateAudio, Status},
12 udp::{Event, Listener, Message, StateChange},
13};
14use mecomp_storage::db::schemas::song::Song;
15use mpris_server::{
16 LoopStatus, Metadata, PlaybackStatus, Property, Server, Signal, Time, TrackId,
17 zbus::{Error as ZbusError, zvariant::ObjectPath},
18};
19use tarpc::context::Context;
20use tokio::sync::{RwLock, RwLockReadGuard};
21
22pub struct Mpris {
23 daemon: RwLock<Option<MusicPlayerClient>>,
24 pub port: u16,
25 pub state: RwLock<StateAudio>,
26}
27
28impl Mpris {
29 #[must_use]
31 pub fn new(port: u16) -> Self {
32 Self {
33 daemon: RwLock::new(None),
34 port,
35 state: RwLock::new(StateAudio::default()),
36 }
37 }
38
39 pub async fn daemon(&self) -> RwLockReadGuard<Option<MusicPlayerClient>> {
41 let mut maybedaemon = self.daemon.write().await;
42 if let Some(daemon) = maybedaemon.as_ref() {
43 let context = Context::current();
44 if daemon.ping(context).await.is_ok() {
45 return maybedaemon.downgrade();
46 }
47 }
48
49 *maybedaemon = None;
51 log::info!("Lost connection to daemon, shutting down");
52 #[cfg(not(test))] std::thread::spawn(|| {
55 std::thread::sleep(Duration::from_secs(5));
56 std::process::exit(0);
57 });
58 maybedaemon.downgrade()
64 }
65
66 #[must_use]
68 pub fn new_with_daemon(daemon: MusicPlayerClient) -> Self {
69 Self {
70 daemon: RwLock::new(Some(daemon)),
71 port: 0,
72 state: RwLock::new(StateAudio::default()),
73 }
74 }
75
76 pub async fn connect(&self) -> Result<()> {
82 if self.daemon.read().await.is_some() {
83 return Ok(());
84 }
85
86 let daemon = init_client(self.port).await.context(format!(
87 "Failed to connect to daemon on port: {}",
88 self.port
89 ))?;
90
91 *self.state.write().await = daemon
92 .state_audio(Context::current())
93 .await
94 .context(
95 "Failed to get initial state from daemon, please ensure the daemon is running",
96 )?
97 .ok_or_else(|| anyhow!("Failed to get initial state from daemon"))?;
98 *self.daemon.write().await = Some(daemon);
99
100 Ok(())
101 }
102
103 pub async fn connect_with_retry(&self) -> Result<()> {
109 const MAX_RETRIES: u8 = 5;
110 const BASE_DELAY: Duration = Duration::from_secs(1);
111
112 let mut retries = 0;
113
114 while retries < MAX_RETRIES {
115 if let Err(e) = self.connect().await {
116 retries += 1;
117 log::warn!("Failed to connect to daemon: {e}");
118 tokio::time::sleep(BASE_DELAY * u32::from(retries)).await;
119 } else {
120 return Ok(());
121 }
122 }
123
124 Err(anyhow!(
125 "Failed to connect to daemon on port {} after {} retries",
126 self.port,
127 MAX_RETRIES
128 ))
129 }
130
131 pub async fn start_server(self, bus_name_suffix: &str) -> Result<Server<Self>, ZbusError> {
139 Server::new(bus_name_suffix, self).await
140 }
141}
142
143#[derive(Debug)]
144pub enum MessageOutcomes {
145 Nothing,
146 Signal(Signal),
147 Properties(Vec<Property>),
148 Quit,
149}
150
/// Interval between two ticks of the subscriber loop; the cached seek position
/// is advanced by this amount per tick.
pub const TICK_RATE: Duration = Duration::from_millis(100);
153
154#[derive(Debug)]
155pub struct Subscriber;
156
157impl Subscriber {
158 pub async fn main_loop(
164 &self,
165 server: &Server<Mpris>,
166 ) -> anyhow::Result<()> {
168 let mut listener = Listener::new().await?;
169
170 let maybe_daemon = server.imp().daemon().await;
171 if let Some(daemon) = maybe_daemon.as_ref() {
172 daemon
173 .register_listener(Context::current(), listener.local_addr()?)
174 .await?;
175 } else {
176 return Err(anyhow!("Daemon not connected"));
177 }
178 drop(maybe_daemon);
179
180 let mut ticker = tokio::time::interval(TICK_RATE);
181
182 #[allow(clippy::redundant_pub_crate)]
183 loop {
184 let daemon = server.imp().daemon().await;
185 let mut state = server.imp().state.write().await;
186
187 tokio::select! {
188 Ok(message) = listener.recv() => {
189 match self
190 .handle_message(message, &mut state, daemon.as_ref())
191 .await?
192 {
193 MessageOutcomes::Nothing => continue,
194 MessageOutcomes::Signal(signal) => server.emit(signal).await?,
195 MessageOutcomes::Properties(items) => server.properties_changed(items).await?,
196 MessageOutcomes::Quit => break,
197 }
198 }
199 _ = ticker.tick() => {
200 if let Some(runtime) = &mut state.runtime {
201 runtime.seek_position += TICK_RATE;
202 runtime.seek_percent = Percent::new(runtime.seek_position.as_secs_f32() / runtime.duration.as_secs_f32() * 100.0);
203 }
204 }
205 }
206 }
207
208 Ok(())
209 }
210
211 pub async fn handle_message(
221 &self,
222 message: Message,
223 state: &mut StateAudio,
224 daemon: Option<&MusicPlayerClient>,
225 ) -> anyhow::Result<MessageOutcomes> {
226 log::info!("Received event: {message:?}");
227 match message {
228 Message::Event(
229 Event::LibraryAnalysisFinished
230 | Event::LibraryReclusterFinished
231 | Event::LibraryRescanFinished,
232 ) => Ok(MessageOutcomes::Nothing),
233 Message::Event(Event::DaemonShutdown) => Ok(MessageOutcomes::Quit),
234 Message::StateChange(StateChange::Muted) => {
235 state.muted = true;
236 Ok(MessageOutcomes::Properties(vec![Property::Volume(0.0)]))
237 }
238 Message::StateChange(StateChange::Unmuted) => {
239 state.muted = false;
240 Ok(MessageOutcomes::Properties(vec![Property::Volume(
241 state.volume.into(),
242 )]))
243 }
244 Message::StateChange(StateChange::VolumeChanged(new_volume)) => {
245 state.volume = new_volume;
246 Ok(MessageOutcomes::Properties(vec![Property::Volume(
247 new_volume.into(),
248 )]))
249 }
250 Message::StateChange(StateChange::TrackChanged(_)) => {
252 let context = Context::current();
253 if let Some(daemon) = daemon {
255 *state = daemon
256 .state_audio(context)
257 .await
258 .context("Failed to get state from daemon")?
259 .ok_or_else(|| anyhow!("Failed to get state from daemon"))?;
260 } else {
261 state.current_song = None;
262 state.runtime = None;
263 }
264
265 let metadata = metadata_from_opt_song(state.current_song.as_ref());
266 Ok(MessageOutcomes::Properties(vec![Property::Metadata(
267 metadata,
268 )]))
269 }
270 Message::StateChange(StateChange::RepeatModeChanged(new_mode)) => {
271 state.repeat_mode = new_mode;
272 Ok(MessageOutcomes::Properties(vec![Property::LoopStatus(
273 match new_mode {
274 RepeatMode::None => LoopStatus::None,
275 RepeatMode::One => LoopStatus::Track,
276 RepeatMode::All => LoopStatus::Playlist,
277 },
278 )]))
279 }
280 Message::StateChange(StateChange::Seeked(position)) => {
281 if let Some(runtime) = &mut state.runtime {
282 runtime.seek_position = position;
283 runtime.seek_percent = Percent::new(
284 position.as_secs_f32() / runtime.duration.as_secs_f32() * 100.0,
285 );
286 }
287 Ok(MessageOutcomes::Signal(Signal::Seeked {
288 position: Time::from_micros(
289 i64::try_from(position.as_micros()).unwrap_or(i64::MAX),
290 ),
291 }))
292 }
293 Message::StateChange(StateChange::StatusChanged(status)) => {
294 state.status = status;
295 Ok(MessageOutcomes::Properties(vec![Property::PlaybackStatus(
296 match status {
297 Status::Stopped => PlaybackStatus::Stopped,
298 Status::Paused => PlaybackStatus::Paused,
299 Status::Playing => PlaybackStatus::Playing,
300 },
301 )]))
302 }
303 }
304 }
305}
306
307#[must_use]
308pub fn metadata_from_opt_song(song: Option<&Song>) -> Metadata {
309 song.map_or_else(
310 || Metadata::builder().trackid(TrackId::NO_TRACK).build(),
311 |song| {
312 Metadata::builder()
313 .trackid(object_path_from_thing(&song.id.clone().into()))
314 .length(Time::from_micros(
315 i64::try_from(song.runtime.as_micros()).unwrap_or(i64::MAX),
316 ))
317 .artist(song.artist.iter().map(ToString::to_string))
318 .album(song.album.to_string())
319 .title(song.title.to_string())
320 .build()
321 },
322 )
323}
324
325fn object_path_from_thing(thing: &mecomp_storage::db::schemas::RecordId) -> ObjectPath {
326 ObjectPath::try_from(format!("/mecomp/{}/{}", thing.tb, thing.id))
327 .unwrap_or_else(|e| panic!("Failed to convert {thing} to ObjectPath: {e}"))
328}
329
#[cfg(test)]
mod subscriber_tests {
    use std::{num::NonZero, sync::Arc};

    use super::*;
    use mecomp_core::{audio::AudioKernelSender, config::Settings};
    use mecomp_daemon::init_test_client_server;
    use mecomp_storage::{
        db::schemas::song::Song,
        test_utils::{arb_song_case, init_test_database_with_state},
    };
    use mpris_server::Metadata;
    use pretty_assertions::assert_str_eq;
    use rstest::rstest;
    use tempfile::TempDir;

    /// Feeds a single message to `Subscriber::handle_message`, starting from a
    /// default state with a test daemon attached, and compares the `Debug`
    /// rendering of the outcome with the expected one.
    #[rstest]
    #[case::nothing(
        Message::Event(Event::LibraryAnalysisFinished),
        MessageOutcomes::Nothing
    )]
    #[case::nothing(
        Message::Event(Event::LibraryReclusterFinished),
        MessageOutcomes::Nothing
    )]
    #[case::nothing(
        Message::Event(Event::LibraryRescanFinished),
        MessageOutcomes::Nothing
    )]
    #[case::quit(
        Message::Event(Event::DaemonShutdown),
        MessageOutcomes::Quit
    )]
    #[case::muted(
        Message::StateChange(StateChange::Muted),
        MessageOutcomes::Properties(vec![Property::Volume(0.0)])
    )]
    #[case::unmuted(
        Message::StateChange(StateChange::Unmuted),
        MessageOutcomes::Properties(vec![Property::Volume(1.0)])
    )]
    #[case::volume_changed(
        Message::StateChange(StateChange::VolumeChanged(0.75)),
        MessageOutcomes::Properties(vec![Property::Volume(0.75)])
    )]
    #[case::track_changed(
        Message::StateChange(StateChange::TrackChanged(None)),
        MessageOutcomes::Properties(vec![Property::Metadata(Metadata::builder().trackid(TrackId::NO_TRACK).build())])
    )]
    #[case::track_changed(
        Message::StateChange(StateChange::TrackChanged(Some(Song::generate_id().into()))),
        MessageOutcomes::Properties(vec![Property::Metadata(Metadata::builder().trackid(TrackId::NO_TRACK).build())])
    )]
    #[case::repeat_mode_changed(
        Message::StateChange(StateChange::RepeatModeChanged(RepeatMode::One)),
        MessageOutcomes::Properties(vec![Property::LoopStatus(LoopStatus::Track)])
    )]
    #[case::seeked(
        Message::StateChange(StateChange::Seeked(Duration::from_secs(10))),
        MessageOutcomes::Signal(Signal::Seeked { position: Time::from_micros(10_000_000) })
    )]
    #[case::status_changed(
        Message::StateChange(StateChange::StatusChanged(Status::Playing)),
        MessageOutcomes::Properties(vec![Property::PlaybackStatus(PlaybackStatus::Playing)])
    )]
    #[case::status_changed(
        Message::StateChange(StateChange::StatusChanged(Status::Paused)),
        MessageOutcomes::Properties(vec![Property::PlaybackStatus(PlaybackStatus::Paused)])
    )]
    #[case::status_changed(
        Message::StateChange(StateChange::StatusChanged(Status::Stopped)),
        MessageOutcomes::Properties(vec![Property::PlaybackStatus(PlaybackStatus::Stopped)])
    )]
    #[tokio::test]
    async fn test_handle_message(#[case] message: Message, #[case] expected: MessageOutcomes) {
        // Database seeded with four generated songs, stored under a temporary directory.
        let tempdir = TempDir::new().unwrap();
        let song_count = NonZero::new(4).unwrap();
        let db = init_test_database_with_state(
            song_count,
            |i| (arb_song_case()(), i > 1, i > 2),
            None,
            &tempdir,
        )
        .await;

        // Test daemon wired to a fresh audio kernel.
        let settings = Arc::new(Settings::default());
        let (event_tx, _) = std::sync::mpsc::channel();
        let audio_kernel = AudioKernelSender::start(event_tx);
        let daemon = init_test_client_server(db, settings, audio_kernel.clone())
            .await
            .unwrap();

        let mut state = StateAudio::default();
        let outcome = Subscriber
            .handle_message(message, &mut state, Some(&daemon))
            .await
            .unwrap();

        // The outcomes are compared through their `Debug` rendering.
        let actual = format!("{outcome:?}");
        let wanted = format!("{expected:?}");
        assert_str_eq!(actual, wanted);
    }
}
400
#[cfg(test)]
pub mod test_utils {
    use std::{num::NonZero, sync::Arc};

    use super::*;
    use mecomp_core::{audio::AudioKernelSender, config::Settings};
    use mecomp_daemon::init_test_client_server;
    use mecomp_storage::test_utils::{arb_song_case, init_test_database_with_state};
    use rstest::fixture;
    use surrealdb::{Surreal, engine::local::Db};
    use tempfile::TempDir;

    /// Creates a test database under `tempdir`, seeded with four generated songs.
    async fn seeded_database(tempdir: &TempDir) -> Arc<Surreal<Db>> {
        let song_count = NonZero::new(4).unwrap();
        init_test_database_with_state(
            song_count,
            |i| (arb_song_case()(), i > 1, i > 2),
            None,
            tempdir,
        )
        .await
    }

    /// Builds an [`Mpris`] connected to a test daemon.
    ///
    /// Returns, in order: the `Mpris` instance, the receiving end of the
    /// channel handed to the audio kernel, the temporary directory backing
    /// the database, and the audio kernel handle.
    #[fixture]
    pub async fn fixtures() -> (
        Mpris,
        std::sync::mpsc::Receiver<StateChange>,
        TempDir,
        Arc<AudioKernelSender>,
    ) {
        let tempdir = TempDir::new().unwrap();
        let database = seeded_database(&tempdir).await;
        let settings = Arc::new(Settings::default());

        let (event_tx, event_rx) = std::sync::mpsc::channel();
        let audio_kernel = AudioKernelSender::start(event_tx);

        let client = init_test_client_server(database, settings, audio_kernel.clone())
            .await
            .unwrap();

        (
            Mpris::new_with_daemon(client),
            event_rx,
            tempdir,
            audio_kernel,
        )
    }
}
449}