1use mecomp_core::udp::{Event, Listener, Message};
2use mecomp_prost::{MusicPlayerClient, RegisterListenerRequest};
3use state::action::{Action, PopupAction};
4use termination::Interrupted;
5use tokio::sync::{broadcast, mpsc};
6use ui::widgets::popups::PopupType;
7
8pub mod state;
9pub mod termination;
10#[cfg(test)]
11mod test_utils;
12pub mod ui;
13
/// Stateless handle for the daemon-message subscriber.
///
/// The methods implemented on this type receive [`Message`]s and turn them into
/// [`Action`]s that are sent over an action channel.
#[derive(Debug)]
pub struct Subscriber;
16
17impl Subscriber {
18 pub async fn main_loop(
24 &self,
25 mut daemon: MusicPlayerClient,
26 action_tx: mpsc::UnboundedSender<Action>,
27 mut interrupt_rx: broadcast::Receiver<Interrupted>,
28 ) -> anyhow::Result<Interrupted> {
29 let mut listener = Listener::new().await?;
30 daemon
31 .register_listener(RegisterListenerRequest::new(listener.local_addr()?))
32 .await?;
33
34 #[allow(clippy::redundant_pub_crate)]
35 let result = loop {
36 tokio::select! {
37 Ok(message) = listener.recv() => {
38 self.handle_message(&action_tx, message)?;
39 }
40 Ok(interrupted) = interrupt_rx.recv() => {
41 break interrupted;
42 }
43 }
44 };
45
46 Ok(result)
47 }
48
49 pub fn handle_message(
55 &self,
56 action_tx: &mpsc::UnboundedSender<Action>,
57 message: Message,
58 ) -> anyhow::Result<()> {
59 match message {
60 Message::Event(event) => {
61 let notification = match event {
62 Event::DaemonShutdown => {
63 action_tx.send(Action::General(state::action::GeneralAction::Exit))?;
64 return Ok(()); }
66 Event::LibraryRescanFinished => "Library rescan finished",
67 Event::LibraryAnalysisFinished => "Library analysis finished",
68 Event::LibraryReclusterFinished => "Library recluster finished",
69 };
70 action_tx.send(Action::Library(state::action::LibraryAction::Update))?;
71
72 action_tx.send(Action::Popup(PopupAction::Open(PopupType::Notification(
73 notification.into(),
74 ))))?;
75 }
76 Message::StateChange(state_change) => {
77 action_tx.send(Action::Audio(state::action::AudioAction::StateChange(
78 state_change,
79 )))?;
80 }
81 }
82
83 Ok(())
84 }
85}
86
87#[cfg(test)]
88mod subscriber_tests {
89 use std::sync::Arc;
90
91 use super::*;
92 use mecomp_core::{audio::AudioKernelSender, config::Settings};
93 use mecomp_daemon::init_test_client_server;
94 use mecomp_storage::{
95 db::schemas::{
96 album::Album, analysis::Analysis, artist::Artist, collection::Collection,
97 playlist::Playlist, song::Song,
98 },
99 test_utils::{arb_feature_array, init_test_database},
100 };
101 use rstest::{fixture, rstest};
102 use surrealdb::{RecordId, Surreal, engine::local::Db};
103 use tempfile::tempdir;
104 use termination::create_termination;
105 use test_utils::item_id;
106 use tokio::sync::oneshot;
107
108 async fn db_with_state() -> Arc<Surreal<Db>> {
110 let db = Arc::new(init_test_database().await.unwrap());
111
112 let album_id = RecordId::from_table_key("album", item_id().to_string());
113 let analysis_id = RecordId::from_table_key("analysis", item_id().to_string());
114 let artist_id = RecordId::from_table_key("artist", item_id().to_string());
115 let collection_id = RecordId::from_table_key("collection", item_id().to_string());
116 let playlist_id = RecordId::from_table_key("playlist", item_id().to_string());
117 let song_id = RecordId::from_table_key("song", item_id().to_string());
118
119 let song = Song {
121 id: song_id.clone(),
122 title: "Test Song".into(),
123 artist: "Test Artist".to_string().into(),
124 album_artist: "Test Artist".to_string().into(),
125 album: "Test Album".into(),
126 genre: "Test Genre".to_string().into(),
127 runtime: std::time::Duration::from_secs(180),
128 track: Some(0),
129 disc: Some(0),
130 release_year: Some(2021),
131 extension: "mp3".into(),
132 path: "test.mp3".into(),
133 };
134 let analysis = Analysis {
135 id: analysis_id.clone(),
136 features: arb_feature_array()(),
137 embedding: arb_feature_array()(),
138 };
139 let artist = Artist {
140 id: artist_id.clone(),
141 name: song.artist[0].clone(),
142 runtime: song.runtime,
143 album_count: 1,
144 song_count: 1,
145 };
146 let album = Album {
147 id: album_id.clone(),
148 title: song.album.clone(),
149 artist: song.artist.clone(),
150 release: song.release_year,
151 runtime: song.runtime,
152 song_count: 1,
153 discs: 1,
154 genre: song.genre.clone(),
155 };
156 let collection = Collection {
157 id: collection_id.clone(),
158 name: "Collection 0".into(),
159 runtime: song.runtime,
160 song_count: 1,
161 };
162 let playlist = Playlist {
163 id: playlist_id.clone(),
164 name: "Test Playlist".into(),
165 runtime: song.runtime,
166 song_count: 1,
167 };
168
169 Song::create(&db, song).await.unwrap();
171 Analysis::create(&db, song_id.clone(), analysis)
172 .await
173 .unwrap();
174 Artist::create(&db, artist).await.unwrap();
175 Album::create(&db, album).await.unwrap();
176 Collection::create(&db, collection).await.unwrap();
177 Playlist::create(&db, playlist).await.unwrap();
178
179 Album::add_song(&db, album_id.clone(), song_id.clone())
181 .await
182 .unwrap();
183 Artist::add_album(&db, artist_id.clone(), album_id)
184 .await
185 .unwrap();
186 Artist::add_song(&db, artist_id.clone(), song_id.clone())
187 .await
188 .unwrap();
189 Collection::add_songs(&db, collection_id, vec![song_id.clone()])
190 .await
191 .unwrap();
192 Playlist::add_songs(&db, playlist_id, vec![song_id.clone()])
193 .await
194 .unwrap();
195
196 db
197 }
198
199 #[fixture]
200 async fn daemon() -> MusicPlayerClient {
201 let music_dir = Arc::new(tempdir().unwrap());
202
203 let db = db_with_state().await;
204 let mut settings = Settings::default();
205 settings.daemon.library_paths = vec![music_dir.path().to_path_buf()].into_boxed_slice();
206 let settings = Arc::new(settings);
207 let (tx, _) = std::sync::mpsc::channel();
208 let audio_kernel = AudioKernelSender::start(tx);
209
210 init_test_client_server(db, settings, audio_kernel)
211 .await
212 .unwrap()
213 }
214
215 #[rstest]
216 #[case(Message::Event(Event::LibraryRescanFinished), "Library rescan finished".into())]
217 #[case(Message::Event(Event::LibraryAnalysisFinished), "Library analysis finished".into())]
218 #[case(Message::Event(Event::LibraryReclusterFinished), "Library recluster finished".into())]
219 #[tokio::test]
220 async fn test_handle_message(#[case] message: Message, #[case] expected: String) {
221 let (tx, mut rx) = mpsc::unbounded_channel();
222 let subscriber = Subscriber;
223
224 subscriber.handle_message(&tx, message).unwrap();
225
226 let action = rx.recv().await.unwrap();
227
228 assert_eq!(
229 action,
230 Action::Library(state::action::LibraryAction::Update)
231 );
232
233 let action = rx.recv().await.unwrap();
234
235 assert_eq!(
236 action,
237 Action::Popup(PopupAction::Open(PopupType::Notification(expected.into())))
238 );
239 }
240
241 #[rstest]
242 #[tokio::test]
243 async fn test_connect(#[future] daemon: MusicPlayerClient) {
244 let mut daemon = daemon.await;
245
246 let (mut terminator, interrupt_rx) = create_termination();
247 let (action_tx, mut action_rx) = mpsc::unbounded_channel();
248
249 terminator.terminate(Interrupted::UserInt).unwrap();
250
251 let (tx, rx) = oneshot::channel();
252
253 let daemon_ = daemon.clone();
254
255 tokio::spawn(async move {
256 let interrupted = Subscriber
257 .main_loop(daemon_, action_tx, interrupt_rx.resubscribe())
258 .await
259 .unwrap();
260
261 tx.send(interrupted).unwrap();
262 });
263
264 tokio::time::sleep(tokio::time::Duration::from_millis(250)).await;
265
266 daemon.library_rescan(()).await.unwrap();
267
268 let action = action_rx.recv().await.unwrap();
269
270 assert_eq!(
271 action,
272 Action::Library(state::action::LibraryAction::Update)
273 );
274
275 let action = action_rx.recv().await.unwrap();
276
277 assert_eq!(
278 action,
279 Action::Popup(PopupAction::Open(PopupType::Notification(
280 "Library rescan finished".into()
281 )))
282 );
283
284 terminator.terminate(Interrupted::UserInt).unwrap();
286 assert_eq!(rx.await.unwrap(), Interrupted::UserInt);
287 }
288}