1#![allow(clippy::needless_continue)]
2
3pub mod interfaces;
4
5use std::time::Duration;
6
7use anyhow::{Context as _, Result, anyhow};
8
9use mecomp_core::{
10 state::{Percent, RepeatMode, StateAudio, Status},
11 udp::{Event, Listener, Message, StateChange},
12};
13use mecomp_prost::{MusicPlayerClient, RegisterListenerRequest};
14use mecomp_storage::db::schemas::song::SongBrief;
15use mpris_server::{
16 LoopStatus, Metadata, PlaybackStatus, Property, Server, Signal, Time, TrackId,
17 zbus::{Error as ZbusError, zvariant::ObjectPath},
18};
19use tokio::sync::RwLock;
20
21#[derive(Debug)]
22pub struct Mpris {
23 pub daemon: MusicPlayerClient,
24 pub port: u16,
25 pub state: RwLock<StateAudio>,
26}
27
28impl Mpris {
29 #[must_use]
31 pub fn new_with_daemon(daemon: MusicPlayerClient) -> Self {
32 Self {
33 daemon,
34 port: 0,
35 state: RwLock::new(StateAudio::default()),
36 }
37 }
38
39 pub async fn update_state(&self) -> Result<()> {
45 let new_state = self
46 .daemon
47 .clone()
48 .state_audio(())
49 .await
50 .context("Failed to get state from daemon")?
51 .into_inner()
52 .state
53 .ok_or_else(|| anyhow!("Failed to get state from daemon"))?
54 .into();
55
56 *self.state.write().await = new_state;
57
58 Ok(())
59 }
60
61 pub async fn start_server(self, bus_name_suffix: &str) -> Result<Server<Self>, ZbusError> {
69 Server::new(bus_name_suffix, self).await
70 }
71}
72
73#[derive(Debug)]
74pub enum MessageOutcomes {
75 Nothing,
76 Signal(Signal),
77 Properties(Vec<Property>),
78 Quit,
79}
80
81pub const TICK_RATE: Duration = Duration::from_millis(100);
83
84#[derive(Debug)]
85pub struct Subscriber;
86
87impl Subscriber {
88 pub async fn main_loop(
94 &self,
95 server: &Server<Mpris>,
96 ) -> anyhow::Result<()> {
98 let mut listener = Listener::new().await?;
99
100 server
101 .imp()
102 .daemon
103 .clone()
104 .register_listener(RegisterListenerRequest::new(listener.local_addr()?))
105 .await?;
106
107 let mut ticker = tokio::time::interval(TICK_RATE);
108 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
109
110 #[allow(clippy::redundant_pub_crate)]
111 loop {
112 let mut state = server.imp().state.write().await;
113
114 tokio::select! {
115 Ok(message) = listener.recv() => {
116 match self
117 .handle_message(message, &mut state, server.imp().daemon.clone())
118 .await?
119 {
120 MessageOutcomes::Nothing => continue,
121 MessageOutcomes::Signal(signal) => server.emit(signal).await?,
122 MessageOutcomes::Properties(items) => server.properties_changed(items).await?,
123 MessageOutcomes::Quit => break,
124 }
125 }
126 _ = ticker.tick() => {
127 if state.paused() {
128 continue;
129 }
130 if let Some(runtime) = &mut state.runtime {
131 runtime.seek_position += TICK_RATE;
132 runtime.seek_percent = Percent::new(runtime.seek_position.as_secs_f32() / runtime.duration.as_secs_f32() * 100.0);
133 }
134 }
135 }
136 }
137
138 Ok(())
139 }
140
141 pub async fn handle_message(
153 &self,
154 message: Message,
155 state: &mut StateAudio,
156 daemon: MusicPlayerClient,
157 ) -> anyhow::Result<MessageOutcomes> {
158 log::info!("Received event: {message:?}");
159 match message {
160 Message::Event(
161 Event::LibraryAnalysisFinished
162 | Event::LibraryReclusterFinished
163 | Event::LibraryRescanFinished,
164 ) => Ok(MessageOutcomes::Nothing),
165 Message::Event(Event::DaemonShutdown) => Ok(MessageOutcomes::Quit),
166 Message::StateChange(StateChange::Muted) => {
167 state.muted = true;
168 Ok(MessageOutcomes::Properties(vec![Property::Volume(0.0)]))
169 }
170 Message::StateChange(StateChange::Unmuted) => {
171 state.muted = false;
172 Ok(MessageOutcomes::Properties(vec![Property::Volume(
173 state.volume.into(),
174 )]))
175 }
176 Message::StateChange(StateChange::VolumeChanged(new_volume)) => {
177 state.volume = new_volume;
178 Ok(MessageOutcomes::Properties(vec![Property::Volume(
179 new_volume.into(),
180 )]))
181 }
182 Message::StateChange(StateChange::TrackChanged(_) | StateChange::QueueChanged) => {
184 *state = daemon
186 .clone()
187 .state_audio(())
188 .await
189 .context("Failed to get state from daemon")?
190 .into_inner()
191 .state
192 .ok_or_else(|| anyhow!("Failed to get state from daemon"))?
193 .into();
194
195 let metadata = metadata_from_opt_song(state.current_song.as_ref());
196 Ok(MessageOutcomes::Properties(vec![Property::Metadata(
197 metadata,
198 )]))
199 }
200 Message::StateChange(StateChange::RepeatModeChanged(new_mode)) => {
201 state.repeat_mode = new_mode;
202 Ok(MessageOutcomes::Properties(vec![Property::LoopStatus(
203 match new_mode {
204 RepeatMode::None => LoopStatus::None,
205 RepeatMode::One => LoopStatus::Track,
206 RepeatMode::All => LoopStatus::Playlist,
207 },
208 )]))
209 }
210 Message::StateChange(StateChange::Seeked(position)) => {
211 if let Some(runtime) = &mut state.runtime {
212 runtime.seek_position = position;
213 runtime.seek_percent = Percent::new(
214 position.as_secs_f32() / runtime.duration.as_secs_f32() * 100.0,
215 );
216 }
217 Ok(MessageOutcomes::Signal(Signal::Seeked {
218 position: Time::from_micros(
219 i64::try_from(position.as_micros()).unwrap_or(i64::MAX),
220 ),
221 }))
222 }
223 Message::StateChange(StateChange::StatusChanged(status)) => {
224 state.status = status;
225 Ok(MessageOutcomes::Properties(vec![Property::PlaybackStatus(
226 match status {
227 Status::Stopped => PlaybackStatus::Stopped,
228 Status::Paused => PlaybackStatus::Paused,
229 Status::Playing => PlaybackStatus::Playing,
230 },
231 )]))
232 }
233 }
234 }
235}
236
237#[must_use]
238pub fn metadata_from_opt_song(song: Option<&SongBrief>) -> Metadata {
239 song.map_or_else(
240 || Metadata::builder().trackid(TrackId::NO_TRACK).build(),
241 |song| {
242 Metadata::builder()
243 .trackid(object_path_from_thing(&song.id.clone().into()))
244 .length(Time::from_micros(
245 i64::try_from(song.runtime.as_micros()).unwrap_or(i64::MAX),
246 ))
247 .artist(song.artist.as_slice())
248 .album(&song.album)
249 .title(&song.title)
250 .build()
251 },
252 )
253}
254
255fn object_path_from_thing(thing: &mecomp_storage::db::schemas::RecordId) -> ObjectPath<'_> {
256 ObjectPath::try_from(format!("/mecomp/{}/{}", thing.tb, thing.id))
257 .unwrap_or_else(|e| panic!("Failed to convert {thing} to ObjectPath: {e}"))
258}
259
260#[cfg(test)]
261mod subscriber_tests {
262 use std::{num::NonZero, sync::Arc};
263
264 use super::*;
265 use mecomp_core::{audio::AudioKernelSender, config::Settings};
266 use mecomp_daemon::init_test_client_server;
267 use mecomp_storage::{
268 db::schemas::song::Song,
269 test_utils::{arb_song_case, init_test_database_with_state},
270 };
271 use mpris_server::Metadata;
272 use pretty_assertions::assert_str_eq;
273 use rstest::rstest;
274 use tempfile::TempDir;
275
276 #[rstest]
277 #[case::nothing(
278 Message::Event(Event::LibraryAnalysisFinished),
279 MessageOutcomes::Nothing
280 )]
281 #[case::nothing(
282 Message::Event(Event::LibraryReclusterFinished),
283 MessageOutcomes::Nothing
284 )]
285 #[case::nothing(Message::Event(Event::LibraryRescanFinished), MessageOutcomes::Nothing)]
286 #[case::quit(Message::Event(Event::DaemonShutdown), MessageOutcomes::Quit)]
287 #[case::muted(Message::StateChange(StateChange::Muted), MessageOutcomes::Properties(vec![Property::Volume(0.0)]))]
288 #[case::unmuted(Message::StateChange(StateChange::Unmuted), MessageOutcomes::Properties(vec![Property::Volume(1.0)]))]
289 #[case::volume_changed(Message::StateChange(StateChange::VolumeChanged(0.75)), MessageOutcomes::Properties(vec![Property::Volume(0.75)]))]
290 #[case::track_changed(Message::StateChange(StateChange::TrackChanged(None)), MessageOutcomes::Properties(vec![Property::Metadata(Metadata::builder().trackid(TrackId::NO_TRACK).build())]))]
291 #[case::track_changed(Message::StateChange(StateChange::TrackChanged(Some(Song::generate_id().into()))), MessageOutcomes::Properties(vec![Property::Metadata(Metadata::builder().trackid(TrackId::NO_TRACK).build())]))]
292 #[case::repeat_mode_changed(Message::StateChange(StateChange::RepeatModeChanged(RepeatMode::One)), MessageOutcomes::Properties(vec![Property::LoopStatus(LoopStatus::Track)]))]
293 #[case::seeked(Message::StateChange(StateChange::Seeked(Duration::from_secs(10))), MessageOutcomes::Signal(Signal::Seeked { position: Time::from_micros(10_000_000) }))]
294 #[case::status_changed(Message::StateChange(StateChange::StatusChanged(Status::Playing)), MessageOutcomes::Properties(vec![Property::PlaybackStatus(PlaybackStatus::Playing)]))]
295 #[case::status_changed(Message::StateChange(StateChange::StatusChanged(Status::Paused)), MessageOutcomes::Properties(vec![Property::PlaybackStatus(PlaybackStatus::Paused)]))]
296 #[case::status_changed(Message::StateChange(StateChange::StatusChanged(Status::Stopped)), MessageOutcomes::Properties(vec![Property::PlaybackStatus(PlaybackStatus::Stopped)]))]
297 #[tokio::test]
298 async fn test_handle_message(#[case] message: Message, #[case] expected: MessageOutcomes) {
299 let tempdir = TempDir::new().unwrap();
300
301 let db = init_test_database_with_state(
302 NonZero::new(4).unwrap(),
303 |i| (arb_song_case()(), i > 1, i > 2),
304 None,
305 &tempdir,
306 )
307 .await;
308
309 let settings = Arc::new(Settings::default());
310
311 let (event_tx, _) = std::sync::mpsc::channel();
312
313 let audio_kernel = AudioKernelSender::start(event_tx);
314
315 let daemon = init_test_client_server(db, settings, audio_kernel.clone())
316 .await
317 .unwrap();
318
319 let state = &mut StateAudio::default();
320
321 let actual = Subscriber
322 .handle_message(message, state, daemon.clone())
323 .await
324 .unwrap();
325
326 assert_str_eq!(format!("{actual:?}"), format!("{expected:?}"));
328 }
329}
330
331#[cfg(test)]
332pub mod test_utils {
333 use std::{num::NonZero, sync::Arc};
334
335 use super::*;
336 use mecomp_core::{audio::AudioKernelSender, config::Settings};
337 use mecomp_daemon::init_test_client_server;
338 use mecomp_storage::test_utils::{arb_song_case, init_test_database_with_state};
339 use rstest::fixture;
340 use surrealdb::{Surreal, engine::local::Db};
341 use tempfile::TempDir;
342
343 async fn db(tempdir: &TempDir) -> Arc<Surreal<Db>> {
345 init_test_database_with_state(
346 NonZero::new(4).unwrap(),
347 |i| (arb_song_case()(), i > 1, i > 2),
348 None,
349 tempdir,
350 )
351 .await
352 }
353
354 #[fixture]
355 pub async fn fixtures() -> (
356 Mpris,
357 std::sync::mpsc::Receiver<StateChange>,
358 TempDir,
359 Arc<AudioKernelSender>,
360 ) {
361 let tempdir = TempDir::new().unwrap();
362
363 let db = db(&tempdir).await;
364
365 let settings = Arc::new(Settings::default());
366
367 let (event_tx, event_rx) = std::sync::mpsc::channel();
368
369 let audio_kernel = AudioKernelSender::start(event_tx);
370
371 let daemon = init_test_client_server(db, settings, audio_kernel.clone())
372 .await
373 .unwrap();
374
375 let mpris = Mpris::new_with_daemon(daemon);
376
377 (mpris, event_rx, tempdir, audio_kernel)
378 }
379}