1use mecomp_core::udp::{Event, Listener, Message};
2use mecomp_prost::{MusicPlayerClient, RegisterListenerRequest};
3use state::action::{Action, PopupAction};
4use termination::Interrupted;
5use tokio::sync::{broadcast, mpsc};
6use ui::widgets::popups::PopupType;
7
8pub mod state;
9pub mod termination;
10#[cfg(test)]
11mod test_utils;
12pub mod ui;
13
/// Receives daemon messages through a `mecomp_core::udp::Listener` and turns them into
/// [`Action`]s for the rest of the application.
///
/// The type has no fields; all behavior lives in [`Subscriber::main_loop`] and
/// [`Subscriber::handle_message`].
#[derive(Debug)]
pub struct Subscriber;
16
17impl Subscriber {
18 pub async fn main_loop(
24 &self,
25 mut daemon: MusicPlayerClient,
26 action_tx: mpsc::UnboundedSender<Action>,
27 mut interrupt_rx: broadcast::Receiver<Interrupted>,
28 ) -> anyhow::Result<Interrupted> {
29 let mut listener = Listener::new().await?;
30 daemon
31 .register_listener(RegisterListenerRequest::new(listener.local_addr()?))
32 .await?;
33
34 #[allow(clippy::redundant_pub_crate)]
35 let result = loop {
36 tokio::select! {
37 Ok(message) = listener.recv() => {
38 self.handle_message(&action_tx, message)?;
39 }
40 Ok(interrupted) = interrupt_rx.recv() => {
41 break interrupted;
42 }
43 }
44 };
45
46 Ok(result)
47 }
48
49 pub fn handle_message(
55 &self,
56 action_tx: &mpsc::UnboundedSender<Action>,
57 message: Message,
58 ) -> anyhow::Result<()> {
59 match message {
60 Message::Event(event) => {
61 let notification = match event {
62 Event::DaemonShutdown => {
63 action_tx.send(Action::General(state::action::GeneralAction::Exit))?;
64 return Ok(()); }
66 Event::LibraryRescanFinished => "Library rescan finished",
67 Event::LibraryAnalysisFinished => "Library analysis finished",
68 Event::LibraryReclusterFinished => "Library recluster finished",
69 };
70 action_tx.send(Action::Library(state::action::LibraryAction::Update))?;
71
72 action_tx.send(Action::Popup(PopupAction::Open(PopupType::Notification(
73 notification.into(),
74 ))))?;
75 }
76 Message::StateChange(state_change) => {
77 action_tx.send(Action::Audio(state::action::AudioAction::StateChange(
78 state_change,
79 )))?;
80 }
81 }
82
83 Ok(())
84 }
85}
86
87#[cfg(test)]
88mod subscriber_tests {
89 use std::sync::Arc;
90
91 use super::*;
92 use mecomp_core::{audio::AudioKernelSender, config::Settings};
93 use mecomp_daemon::init_test_client_server;
94 use mecomp_storage::{
95 db::schemas::{
96 album::Album, analysis::Analysis, artist::Artist, collection::Collection,
97 playlist::Playlist, song::Song,
98 },
99 test_utils::{arb_analysis_features, init_test_database},
100 };
101 use rstest::{fixture, rstest};
102 use surrealdb::{RecordId, Surreal, engine::local::Db};
103 use tempfile::tempdir;
104 use termination::create_termination;
105 use test_utils::item_id;
106 use tokio::sync::oneshot;
107
108 async fn db_with_state() -> Arc<Surreal<Db>> {
110 let db = Arc::new(init_test_database().await.unwrap());
111
112 let album_id = RecordId::from_table_key("album", item_id().to_string());
113 let analysis_id = RecordId::from_table_key("analysis", item_id().to_string());
114 let artist_id = RecordId::from_table_key("artist", item_id().to_string());
115 let collection_id = RecordId::from_table_key("collection", item_id().to_string());
116 let playlist_id = RecordId::from_table_key("playlist", item_id().to_string());
117 let song_id = RecordId::from_table_key("song", item_id().to_string());
118
119 let song = Song {
121 id: song_id.clone(),
122 title: "Test Song".into(),
123 artist: "Test Artist".to_string().into(),
124 album_artist: "Test Artist".to_string().into(),
125 album: "Test Album".into(),
126 genre: "Test Genre".to_string().into(),
127 runtime: std::time::Duration::from_secs(180),
128 track: Some(0),
129 disc: Some(0),
130 release_year: Some(2021),
131 extension: "mp3".into(),
132 path: "test.mp3".into(),
133 };
134 let analysis = Analysis {
135 id: analysis_id.clone(),
136 features: arb_analysis_features()(),
137 };
138 let artist = Artist {
139 id: artist_id.clone(),
140 name: song.artist[0].clone(),
141 runtime: song.runtime,
142 album_count: 1,
143 song_count: 1,
144 };
145 let album = Album {
146 id: album_id.clone(),
147 title: song.album.clone(),
148 artist: song.artist.clone(),
149 release: song.release_year,
150 runtime: song.runtime,
151 song_count: 1,
152 discs: 1,
153 genre: song.genre.clone(),
154 };
155 let collection = Collection {
156 id: collection_id.clone(),
157 name: "Collection 0".into(),
158 runtime: song.runtime,
159 song_count: 1,
160 };
161 let playlist = Playlist {
162 id: playlist_id.clone(),
163 name: "Test Playlist".into(),
164 runtime: song.runtime,
165 song_count: 1,
166 };
167
168 Song::create(&db, song).await.unwrap();
170 Analysis::create(&db, song_id.clone(), analysis)
171 .await
172 .unwrap();
173 Artist::create(&db, artist).await.unwrap();
174 Album::create(&db, album).await.unwrap();
175 Collection::create(&db, collection).await.unwrap();
176 Playlist::create(&db, playlist).await.unwrap();
177
178 Album::add_songs(&db, album_id.clone(), vec![song_id.clone()])
180 .await
181 .unwrap();
182 Artist::add_album(&db, artist_id.clone(), album_id)
183 .await
184 .unwrap();
185 Artist::add_songs(&db, artist_id.clone(), vec![song_id.clone()])
186 .await
187 .unwrap();
188 Collection::add_songs(&db, collection_id, vec![song_id.clone()])
189 .await
190 .unwrap();
191 Playlist::add_songs(&db, playlist_id, vec![song_id.clone()])
192 .await
193 .unwrap();
194
195 db
196 }
197
198 #[fixture]
199 async fn daemon() -> MusicPlayerClient {
200 let music_dir = Arc::new(tempdir().unwrap());
201
202 let db = db_with_state().await;
203 let mut settings = Settings::default();
204 settings.daemon.library_paths = vec![music_dir.path().to_path_buf()].into_boxed_slice();
205 let settings = Arc::new(settings);
206 let (tx, _) = std::sync::mpsc::channel();
207 let audio_kernel = AudioKernelSender::start(tx);
208
209 init_test_client_server(db, settings, audio_kernel)
210 .await
211 .unwrap()
212 }
213
214 #[rstest]
215 #[case(Message::Event(Event::LibraryRescanFinished), "Library rescan finished".into())]
216 #[case(Message::Event(Event::LibraryAnalysisFinished), "Library analysis finished".into())]
217 #[case(Message::Event(Event::LibraryReclusterFinished), "Library recluster finished".into())]
218 #[tokio::test]
219 async fn test_handle_message(#[case] message: Message, #[case] expected: String) {
220 let (tx, mut rx) = mpsc::unbounded_channel();
221 let subscriber = Subscriber;
222
223 subscriber.handle_message(&tx, message).unwrap();
224
225 let action = rx.recv().await.unwrap();
226
227 assert_eq!(
228 action,
229 Action::Library(state::action::LibraryAction::Update)
230 );
231
232 let action = rx.recv().await.unwrap();
233
234 assert_eq!(
235 action,
236 Action::Popup(PopupAction::Open(PopupType::Notification(expected.into())))
237 );
238 }
239
240 #[rstest]
241 #[tokio::test]
242 async fn test_connect(#[future] daemon: MusicPlayerClient) {
243 let mut daemon = daemon.await;
244
245 let (mut terminator, interrupt_rx) = create_termination();
246 let (action_tx, mut action_rx) = mpsc::unbounded_channel();
247
248 terminator.terminate(Interrupted::UserInt).unwrap();
249
250 let (tx, rx) = oneshot::channel();
251
252 let daemon_ = daemon.clone();
253
254 tokio::spawn(async move {
255 let interrupted = Subscriber
256 .main_loop(daemon_, action_tx, interrupt_rx.resubscribe())
257 .await
258 .unwrap();
259
260 tx.send(interrupted).unwrap();
261 });
262
263 tokio::time::sleep(tokio::time::Duration::from_millis(250)).await;
264
265 daemon.library_rescan(()).await.unwrap();
266
267 let action = action_rx.recv().await.unwrap();
268
269 assert_eq!(
270 action,
271 Action::Library(state::action::LibraryAction::Update)
272 );
273
274 let action = action_rx.recv().await.unwrap();
275
276 assert_eq!(
277 action,
278 Action::Popup(PopupAction::Open(PopupType::Notification(
279 "Library rescan finished".into()
280 )))
281 );
282
283 terminator.terminate(Interrupted::UserInt).unwrap();
285 assert_eq!(rx.await.unwrap(), Interrupted::UserInt);
286 }
287}