1pub mod interfaces;
2
3use std::time::Duration;
4
5use anyhow::{anyhow, Context as _, Result};
6
7use mecomp_core::{
8 rpc::{init_client, MusicPlayerClient},
9 state::{Percent, RepeatMode, StateAudio, Status},
10 udp::{Event, Listener, Message, StateChange},
11};
12use mecomp_storage::db::schemas::song::Song;
13use mpris_server::{
14 zbus::{zvariant::ObjectPath, Error as ZbusError},
15 LoopStatus, Metadata, PlaybackStatus, Property, Server, Signal, Time, TrackId,
16};
17use tarpc::context::Context;
18use tokio::sync::{RwLock, RwLockReadGuard};
19
20pub struct Mpris {
21 daemon: RwLock<Option<MusicPlayerClient>>,
22 pub port: u16,
23 pub state: RwLock<StateAudio>,
24}
25
26impl Mpris {
27 #[must_use]
29 pub fn new(port: u16) -> Self {
30 Self {
31 daemon: RwLock::new(None),
32 port,
33 state: RwLock::new(StateAudio::default()),
34 }
35 }
36
37 pub async fn daemon(&self) -> RwLockReadGuard<Option<MusicPlayerClient>> {
39 let mut maybedaemon = self.daemon.write().await;
40 if let Some(daemon) = maybedaemon.as_ref() {
41 let context = Context::current();
42 if daemon.ping(context).await.is_ok() {
43 return maybedaemon.downgrade();
44 }
45 }
46
47 *maybedaemon = None;
49 log::info!("Lost connection to daemon, shutting down");
50 #[cfg(not(test))] std::thread::spawn(|| {
53 std::thread::sleep(Duration::from_secs(5));
54 std::process::exit(0);
55 });
56 maybedaemon.downgrade()
62 }
63
64 #[must_use]
66 pub fn new_with_daemon(daemon: MusicPlayerClient) -> Self {
67 Self {
68 daemon: RwLock::new(Some(daemon)),
69 port: 0,
70 state: RwLock::new(StateAudio::default()),
71 }
72 }
73
74 pub async fn connect(&self) -> Result<()> {
80 if self.daemon.read().await.is_some() {
81 return Ok(());
82 }
83
84 let daemon = init_client(self.port).await.context(format!(
85 "Failed to connect to daemon on port: {}",
86 self.port
87 ))?;
88
89 *self.state.write().await = daemon
90 .state_audio(Context::current())
91 .await
92 .context(
93 "Failed to get initial state from daemon, please ensure the daemon is running",
94 )?
95 .ok_or_else(|| anyhow!("Failed to get initial state from daemon"))?;
96 *self.daemon.write().await = Some(daemon);
97
98 Ok(())
99 }
100
101 pub async fn connect_with_retry(&self) -> Result<()> {
107 const MAX_RETRIES: u8 = 5;
108 const BASE_DELAY: Duration = Duration::from_secs(1);
109
110 let mut retries = 0;
111
112 while retries < MAX_RETRIES {
113 match self.connect().await {
114 Ok(()) => return Ok(()),
115 Err(e) => {
116 retries += 1;
117 log::warn!("Failed to connect to daemon: {}", e);
118 tokio::time::sleep(BASE_DELAY * u32::from(retries)).await;
119 }
120 }
121 }
122
123 Err(anyhow!(
124 "Failed to connect to daemon on port {} after {} retries",
125 self.port,
126 MAX_RETRIES
127 ))
128 }
129
130 pub async fn start_server(self, bus_name_suffix: &str) -> Result<Server<Self>, ZbusError> {
138 Server::new(bus_name_suffix, self).await
139 }
140}
141
142#[derive(Debug)]
143pub enum MessageOutcomes {
144 Nothing,
145 Signal(Signal),
146 Properties(Vec<Property>),
147 Quit,
148}
149
150pub const TICK_RATE: Duration = Duration::from_millis(100);
152
153#[derive(Debug)]
154pub struct Subscriber;
155
156impl Subscriber {
157 pub async fn main_loop(
163 &self,
164 server: &Server<Mpris>,
165 ) -> anyhow::Result<()> {
167 let mut listener = Listener::new().await?;
168
169 let maybe_daemon = server.imp().daemon().await;
170 if let Some(daemon) = maybe_daemon.as_ref() {
171 daemon
172 .register_listener(Context::current(), listener.local_addr()?)
173 .await?;
174 } else {
175 return Err(anyhow!("Daemon not connected"));
176 }
177 drop(maybe_daemon);
178
179 let mut ticker = tokio::time::interval(TICK_RATE);
180
181 #[allow(clippy::redundant_pub_crate)]
182 loop {
183 let daemon = server.imp().daemon().await;
184 let mut state = server.imp().state.write().await;
185
186 tokio::select! {
187 Ok(message) = listener.recv() => {
188 match self
189 .handle_message(message, &mut state, daemon.as_ref())
190 .await?
191 {
192 MessageOutcomes::Nothing => continue,
193 MessageOutcomes::Signal(signal) => server.emit(signal).await?,
194 MessageOutcomes::Properties(items) => server.properties_changed(items).await?,
195 MessageOutcomes::Quit => break,
196 }
197 }
198 _ = ticker.tick() => {
199 if let Some(runtime) = &mut state.runtime {
200 runtime.seek_position += TICK_RATE;
201 runtime.seek_percent = Percent::new(runtime.seek_position.as_secs_f32() / runtime.duration.as_secs_f32() * 100.0);
202 }
203 }
204 }
205 }
206
207 Ok(())
208 }
209
210 pub async fn handle_message(
220 &self,
221 message: Message,
222 state: &mut StateAudio,
223 daemon: Option<&MusicPlayerClient>,
224 ) -> anyhow::Result<MessageOutcomes> {
225 log::info!("Received event: {:?}", message);
226 match message {
227 Message::Event(
228 Event::LibraryAnalysisFinished
229 | Event::LibraryReclusterFinished
230 | Event::LibraryRescanFinished,
231 ) => Ok(MessageOutcomes::Nothing),
232 Message::Event(Event::DaemonShutdown) => Ok(MessageOutcomes::Quit),
233 Message::StateChange(StateChange::Muted) => {
234 state.muted = true;
235 Ok(MessageOutcomes::Properties(vec![Property::Volume(0.0)]))
236 }
237 Message::StateChange(StateChange::Unmuted) => {
238 state.muted = false;
239 Ok(MessageOutcomes::Properties(vec![Property::Volume(
240 state.volume.into(),
241 )]))
242 }
243 Message::StateChange(StateChange::VolumeChanged(new_volume)) => {
244 state.volume = new_volume;
245 Ok(MessageOutcomes::Properties(vec![Property::Volume(
246 new_volume.into(),
247 )]))
248 }
249 Message::StateChange(StateChange::TrackChanged(_)) => {
251 let context = Context::current();
252 if let Some(daemon) = daemon {
254 *state = daemon
255 .state_audio(context)
256 .await
257 .context("Failed to get state from daemon")?
258 .ok_or_else(|| anyhow!("Failed to get state from daemon"))?;
259 } else {
260 state.current_song = None;
261 state.runtime = None;
262 }
263
264 let metadata = metadata_from_opt_song(state.current_song.as_ref());
265 Ok(MessageOutcomes::Properties(vec![Property::Metadata(
266 metadata,
267 )]))
268 }
269 Message::StateChange(StateChange::RepeatModeChanged(new_mode)) => {
270 state.repeat_mode = new_mode;
271 Ok(MessageOutcomes::Properties(vec![Property::LoopStatus(
272 match new_mode {
273 RepeatMode::None => LoopStatus::None,
274 RepeatMode::One => LoopStatus::Track,
275 RepeatMode::All => LoopStatus::Playlist,
276 },
277 )]))
278 }
279 Message::StateChange(StateChange::Seeked(position)) => {
280 if let Some(runtime) = &mut state.runtime {
281 runtime.seek_position = position;
282 runtime.seek_percent = Percent::new(
283 position.as_secs_f32() / runtime.duration.as_secs_f32() * 100.0,
284 );
285 }
286 Ok(MessageOutcomes::Signal(Signal::Seeked {
287 position: Time::from_micros(
288 i64::try_from(position.as_micros()).unwrap_or(i64::MAX),
289 ),
290 }))
291 }
292 Message::StateChange(StateChange::StatusChanged(status)) => {
293 state.status = status;
294 Ok(MessageOutcomes::Properties(vec![Property::PlaybackStatus(
295 match status {
296 Status::Stopped => PlaybackStatus::Stopped,
297 Status::Paused => PlaybackStatus::Paused,
298 Status::Playing => PlaybackStatus::Playing,
299 },
300 )]))
301 }
302 }
303 }
304}
305
306#[must_use]
307pub fn metadata_from_opt_song(song: Option<&Song>) -> Metadata {
308 song.map_or_else(
309 || Metadata::builder().trackid(TrackId::NO_TRACK).build(),
310 |song| {
311 Metadata::builder()
312 .trackid(object_path_from_thing(&song.id.clone().into()))
313 .length(Time::from_micros(
314 i64::try_from(song.runtime.as_micros()).unwrap_or(i64::MAX),
315 ))
316 .artist(song.artist.iter().map(ToString::to_string))
317 .album(song.album.to_string())
318 .title(song.title.to_string())
319 .build()
320 },
321 )
322}
323
324fn object_path_from_thing(thing: &mecomp_storage::db::schemas::Thing) -> ObjectPath {
325 ObjectPath::try_from(format!("/mecomp/{}/{}", thing.tb, thing.id))
326 .unwrap_or_else(|e| panic!("Failed to convert {thing} to ObjectPath: {e}"))
327}
328
329#[cfg(test)]
330mod subscriber_tests {
331 use std::{num::NonZero, sync::Arc};
332
333 use super::*;
334 use mecomp_core::{audio::AudioKernelSender, config::Settings};
335 use mecomp_daemon::init_test_client_server;
336 use mecomp_storage::{
337 db::schemas::song::Song,
338 test_utils::{arb_song_case, init_test_database_with_state},
339 };
340 use mpris_server::Metadata;
341 use pretty_assertions::assert_str_eq;
342 use rstest::rstest;
343 use tempfile::TempDir;
344
345 #[rstest]
346 #[case::nothing(
347 Message::Event(Event::LibraryAnalysisFinished),
348 MessageOutcomes::Nothing
349 )]
350 #[case::nothing(
351 Message::Event(Event::LibraryReclusterFinished),
352 MessageOutcomes::Nothing
353 )]
354 #[case::nothing(Message::Event(Event::LibraryRescanFinished), MessageOutcomes::Nothing)]
355 #[case::quit(Message::Event(Event::DaemonShutdown), MessageOutcomes::Quit)]
356 #[case::muted(Message::StateChange(StateChange::Muted), MessageOutcomes::Properties(vec![Property::Volume(0.0)]))]
357 #[case::unmuted(Message::StateChange(StateChange::Unmuted), MessageOutcomes::Properties(vec![Property::Volume(1.0)]))]
358 #[case::volume_changed(Message::StateChange(StateChange::VolumeChanged(0.75)), MessageOutcomes::Properties(vec![Property::Volume(0.75)]))]
359 #[case::track_changed(Message::StateChange(StateChange::TrackChanged(None)), MessageOutcomes::Properties(vec![Property::Metadata(Metadata::builder().trackid(TrackId::NO_TRACK).build())]))]
360 #[case::track_changed(Message::StateChange(StateChange::TrackChanged(Some(Song::generate_id().into()))), MessageOutcomes::Properties(vec![Property::Metadata(Metadata::builder().trackid(TrackId::NO_TRACK).build())]))]
361 #[case::repeat_mode_changed(Message::StateChange(StateChange::RepeatModeChanged(RepeatMode::One)), MessageOutcomes::Properties(vec![Property::LoopStatus(LoopStatus::Track)]))]
362 #[case::seeked(Message::StateChange(StateChange::Seeked(Duration::from_secs(10))), MessageOutcomes::Signal(Signal::Seeked { position: Time::from_micros(10_000_000) }))]
363 #[case::status_changed(Message::StateChange(StateChange::StatusChanged(Status::Playing)), MessageOutcomes::Properties(vec![Property::PlaybackStatus(PlaybackStatus::Playing)]))]
364 #[case::status_changed(Message::StateChange(StateChange::StatusChanged(Status::Paused)), MessageOutcomes::Properties(vec![Property::PlaybackStatus(PlaybackStatus::Paused)]))]
365 #[case::status_changed(Message::StateChange(StateChange::StatusChanged(Status::Stopped)), MessageOutcomes::Properties(vec![Property::PlaybackStatus(PlaybackStatus::Stopped)]))]
366 #[tokio::test]
367 async fn test_handle_message(#[case] message: Message, #[case] expected: MessageOutcomes) {
368 let tempdir = TempDir::new().unwrap();
369
370 let db = init_test_database_with_state(
371 NonZero::new(4).unwrap(),
372 |i| (arb_song_case()(), i > 1, i > 2),
373 None,
374 &tempdir,
375 )
376 .await;
377
378 let settings = Arc::new(Settings::default());
379
380 let (event_tx, _) = std::sync::mpsc::channel();
381
382 let audio_kernel = AudioKernelSender::start(event_tx);
383
384 let daemon = init_test_client_server(db, settings, audio_kernel.clone())
385 .await
386 .unwrap();
387
388 let state = &mut StateAudio::default();
389
390 let actual = Subscriber
391 .handle_message(message, state, Some(&daemon))
392 .await
393 .unwrap();
394
395 assert_str_eq!(format!("{actual:?}"), format!("{expected:?}"));
397 }
398}
399
400#[cfg(test)]
401pub mod test_utils {
402 use std::{num::NonZero, sync::Arc};
403
404 use super::*;
405 use mecomp_core::{audio::AudioKernelSender, config::Settings};
406 use mecomp_daemon::init_test_client_server;
407 use mecomp_storage::test_utils::{arb_song_case, init_test_database_with_state};
408 use rstest::fixture;
409 use surrealdb::{engine::local::Db, Surreal};
410 use tempfile::TempDir;
411
412 async fn db(tempdir: &TempDir) -> Arc<Surreal<Db>> {
414 init_test_database_with_state(
415 NonZero::new(4).unwrap(),
416 |i| (arb_song_case()(), i > 1, i > 2),
417 None,
418 tempdir,
419 )
420 .await
421 }
422
423 #[fixture]
424 pub async fn fixtures() -> (
425 Mpris,
426 std::sync::mpsc::Receiver<StateChange>,
427 TempDir,
428 Arc<AudioKernelSender>,
429 ) {
430 let tempdir = TempDir::new().unwrap();
431
432 let db = db(&tempdir).await;
433
434 let settings = Arc::new(Settings::default());
435
436 let (event_tx, event_rx) = std::sync::mpsc::channel();
437
438 let audio_kernel = AudioKernelSender::start(event_tx);
439
440 let daemon = init_test_client_server(db, settings, audio_kernel.clone())
441 .await
442 .unwrap();
443
444 let mpris = Mpris::new_with_daemon(daemon);
445
446 (mpris, event_rx, tempdir, audio_kernel)
447 }
448}