1use std::sync::Arc;
2
3use mecomp_core::{
4 rpc::MusicPlayerClient,
5 udp::{Event, Listener, Message},
6};
7use state::action::{Action, PopupAction};
8use tarpc::context::Context;
9use termination::Interrupted;
10use tokio::sync::{broadcast, mpsc};
11use ui::widgets::popups::PopupType;
12
13pub mod state;
14pub mod termination;
15#[cfg(test)]
16mod test_utils;
17pub mod ui;
18
/// Stateless handle whose methods receive messages pushed by the daemon and
/// translate them into UI [`Action`]s.
///
/// The type carries no data, so it is free to copy, compare and default-construct.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct Subscriber;
21
22impl Subscriber {
23 pub async fn main_loop(
29 &self,
30 daemon: Arc<MusicPlayerClient>,
31 action_tx: mpsc::UnboundedSender<Action>,
32 mut interrupt_rx: broadcast::Receiver<Interrupted>,
33 ) -> anyhow::Result<Interrupted> {
34 let mut listener = Listener::new().await?;
35 daemon
36 .register_listener(Context::current(), listener.local_addr()?)
37 .await?;
38
39 #[allow(clippy::redundant_pub_crate)]
40 let result = loop {
41 tokio::select! {
42 Ok(message) = listener.recv() => {
43 self.handle_message(&action_tx, message)?;
44 }
45 Ok(interrupted) = interrupt_rx.recv() => {
46 break interrupted;
47 }
48 }
49 };
50
51 Ok(result)
52 }
53
54 pub fn handle_message(
60 &self,
61 action_tx: &mpsc::UnboundedSender<Action>,
62 message: Message,
63 ) -> anyhow::Result<()> {
64 match message {
65 Message::Event(event) => {
66 let notification = match event {
67 Event::DaemonShutdown => {
68 action_tx.send(Action::General(state::action::GeneralAction::Exit))?;
69 return Ok(()); }
71 Event::LibraryRescanFinished => "Library rescan finished",
72 Event::LibraryAnalysisFinished => "Library analysis finished",
73 Event::LibraryReclusterFinished => "Library recluster finished",
74 };
75 action_tx.send(Action::Library(state::action::LibraryAction::Update))?;
76
77 action_tx.send(Action::Popup(PopupAction::Open(PopupType::Notification(
78 notification.into(),
79 ))))?;
80 }
81 Message::StateChange(state_change) => {
82 action_tx.send(Action::Audio(state::action::AudioAction::StateChange(
83 state_change,
84 )))?;
85 }
86 }
87
88 Ok(())
89 }
90}
91
#[cfg(test)]
mod subscriber_tests {
    use super::*;
    use mecomp_core::{audio::AudioKernelSender, config::Settings};
    use mecomp_daemon::init_test_client_server;
    use mecomp_storage::{
        db::schemas::{
            album::Album, analysis::Analysis, artist::Artist, collection::Collection,
            playlist::Playlist, song::Song,
        },
        test_utils::{arb_analysis_features, init_test_database},
    };
    use rstest::{fixture, rstest};
    use surrealdb::{RecordId, Surreal, engine::local::Db};
    use tarpc::context::Context;
    use tempfile::tempdir;
    use termination::create_termination;
    use test_utils::item_id;
    use tokio::sync::oneshot;

    /// Builds a test database holding one song, analysis, artist, album,
    /// collection and playlist, with the song attached to each container.
    async fn db_with_state() -> Arc<Surreal<Db>> {
        let db = Arc::new(init_test_database().await.unwrap());

        // One record id per table, all built from `item_id()`.
        let album_id = RecordId::from_table_key("album", item_id());
        let analysis_id = RecordId::from_table_key("analysis", item_id());
        let artist_id = RecordId::from_table_key("artist", item_id());
        let collection_id = RecordId::from_table_key("collection", item_id());
        let playlist_id = RecordId::from_table_key("playlist", item_id());
        let song_id = RecordId::from_table_key("song", item_id());

        // The song is the source of truth; the other records copy its fields.
        let song = Song {
            id: song_id.clone(),
            title: "Test Song".into(),
            artist: "Test Artist".to_string().into(),
            album_artist: "Test Artist".to_string().into(),
            album: "Test Album".into(),
            genre: "Test Genre".to_string().into(),
            runtime: std::time::Duration::from_secs(180),
            track: Some(0),
            disc: Some(0),
            release_year: Some(2021),
            extension: "mp3".into(),
            path: "test.mp3".into(),
        };
        let analysis = Analysis {
            id: analysis_id.clone(),
            features: arb_analysis_features()(),
        };
        let artist = Artist {
            id: artist_id.clone(),
            name: song.artist[0].clone(),
            runtime: song.runtime,
            album_count: 1,
            song_count: 1,
        };
        let album = Album {
            id: album_id.clone(),
            title: song.album.clone(),
            artist: song.artist.clone(),
            release: song.release_year,
            runtime: song.runtime,
            song_count: 1,
            discs: 1,
            genre: song.genre.clone(),
        };
        let collection = Collection {
            id: collection_id.clone(),
            name: "Collection 0".into(),
            runtime: song.runtime,
            song_count: 1,
        };
        let playlist = Playlist {
            id: playlist_id.clone(),
            name: "Test Playlist".into(),
            runtime: song.runtime,
            song_count: 1,
        };

        // Insert the records.
        Song::create(&db, song).await.unwrap();
        Analysis::create(&db, song_id.clone(), analysis)
            .await
            .unwrap();
        Artist::create(&db, artist).await.unwrap();
        Album::create(&db, album).await.unwrap();
        Collection::create(&db, collection).await.unwrap();
        Playlist::create(&db, playlist).await.unwrap();

        // Link the song (and the album) to their containers.
        Album::add_songs(&db, album_id.clone(), vec![song_id.clone()])
            .await
            .unwrap();
        Artist::add_album(&db, artist_id.clone(), album_id)
            .await
            .unwrap();
        Artist::add_songs(&db, artist_id.clone(), vec![song_id.clone()])
            .await
            .unwrap();
        Collection::add_songs(&db, collection_id, vec![song_id.clone()])
            .await
            .unwrap();
        Playlist::add_songs(&db, playlist_id, vec![song_id.clone()])
            .await
            .unwrap();

        db
    }

    /// Fixture: a client connected to a test daemon backed by `db_with_state`,
    /// with a temporary directory configured as the only library path.
    #[fixture]
    async fn daemon() -> MusicPlayerClient {
        // NOTE(review): `music_dir` is dropped when this fixture returns, so the
        // directory presumably no longer exists once the test body runs —
        // confirm that is intended.
        let music_dir = Arc::new(tempdir().unwrap());

        let db = db_with_state().await;
        let mut settings = Settings::default();
        settings.daemon.library_paths = vec![music_dir.path().to_path_buf()].into_boxed_slice();
        let settings = Arc::new(settings);
        // The receiving half is discarded; these tests never read from it.
        let (tx, _) = std::sync::mpsc::channel();
        let audio_kernel = AudioKernelSender::start(tx);

        init_test_client_server(db, settings, audio_kernel)
            .await
            .unwrap()
    }

    /// Every "library ... finished" event yields a library update followed by
    /// a notification popup with the matching text.
    #[rstest]
    #[case(Message::Event(Event::LibraryRescanFinished), "Library rescan finished".into())]
    #[case(Message::Event(Event::LibraryAnalysisFinished), "Library analysis finished".into())]
    #[case(Message::Event(Event::LibraryReclusterFinished), "Library recluster finished".into())]
    #[tokio::test]
    async fn test_handle_message(#[case] message: Message, #[case] expected: String) {
        let (tx, mut rx) = mpsc::unbounded_channel();
        let subscriber = Subscriber;

        subscriber.handle_message(&tx, message).unwrap();

        let action = rx.recv().await.unwrap();

        assert_eq!(
            action,
            Action::Library(state::action::LibraryAction::Update)
        );

        let action = rx.recv().await.unwrap();

        assert_eq!(
            action,
            Action::Popup(PopupAction::Open(PopupType::Notification(expected.into())))
        );
    }

    /// A daemon shutdown yields exactly one action, the general `Exit`, with no
    /// library update or popup after it.
    #[test]
    fn test_handle_message_daemon_shutdown() {
        let (tx, mut rx) = mpsc::unbounded_channel();

        Subscriber
            .handle_message(&tx, Message::Event(Event::DaemonShutdown))
            .unwrap();

        assert_eq!(
            rx.try_recv().unwrap(),
            Action::General(state::action::GeneralAction::Exit)
        );
        // `tx` is still alive, so an error here means the queue is empty.
        assert!(rx.try_recv().is_err());
    }

    /// End to end: the main loop registers with the daemon, relays the
    /// rescan-finished event as actions, and stops on an interrupt.
    #[rstest]
    #[tokio::test]
    async fn test_connect(#[future] daemon: MusicPlayerClient) {
        let daemon = Arc::new(daemon.await);

        let (mut terminator, interrupt_rx) = create_termination();
        let (action_tx, mut action_rx) = mpsc::unbounded_channel();

        // Sent before the loop's receiver is created by `resubscribe()` below.
        terminator.terminate(Interrupted::UserInt).unwrap();

        let (tx, rx) = oneshot::channel();

        let daemon_ = daemon.clone();

        tokio::spawn(async move {
            let interrupted = Subscriber
                .main_loop(daemon_, action_tx, interrupt_rx.resubscribe())
                .await
                .unwrap();

            tx.send(interrupted).unwrap();
        });

        // Give the spawned loop time to register its listener with the daemon.
        tokio::time::sleep(tokio::time::Duration::from_millis(250)).await;

        daemon
            .library_rescan(Context::current())
            .await
            .unwrap()
            .unwrap();

        let action = action_rx.recv().await.unwrap();

        assert_eq!(
            action,
            Action::Library(state::action::LibraryAction::Update)
        );

        let action = action_rx.recv().await.unwrap();

        assert_eq!(
            action,
            Action::Popup(PopupAction::Open(PopupType::Notification(
                "Library rescan finished".into()
            )))
        );

        // Stop the loop and check that it reports the interrupt it received.
        terminator.terminate(Interrupted::UserInt).unwrap();
        assert_eq!(rx.await.unwrap(), Interrupted::UserInt);
    }
}