1use std::sync::Arc;
2
3use mecomp_core::{
4 rpc::MusicPlayerClient,
5 udp::{Event, Listener, Message},
6};
7use state::action::{Action, PopupAction};
8use tarpc::context::Context;
9use termination::Interrupted;
10use tokio::sync::{broadcast, mpsc};
11use ui::widgets::popups::PopupType;
12
13pub mod state;
14pub mod termination;
15#[cfg(test)]
16mod test_utils;
17pub mod ui;
18
19#[derive(Debug)]
20pub struct Subscriber;
21
22impl Subscriber {
23 pub async fn main_loop(
29 &self,
30 daemon: Arc<MusicPlayerClient>,
31 action_tx: mpsc::UnboundedSender<Action>,
32 mut interrupt_rx: broadcast::Receiver<Interrupted>,
33 ) -> anyhow::Result<Interrupted> {
34 let mut listener = Listener::new().await?;
35 daemon
36 .register_listener(Context::current(), listener.local_addr()?)
37 .await?;
38
39 #[allow(clippy::redundant_pub_crate)]
40 let result = loop {
41 tokio::select! {
42 Ok(message) = listener.recv() => {
43 self.handle_message(&action_tx, message)?;
44 }
45 Ok(interrupted) = interrupt_rx.recv() => {
46 break interrupted;
47 }
48 }
49 };
50
51 Ok(result)
52 }
53
54 pub fn handle_message(
60 &self,
61 action_tx: &mpsc::UnboundedSender<Action>,
62 message: Message,
63 ) -> anyhow::Result<()> {
64 match message {
65 Message::Event(event) => {
66 let notification = match event {
67 Event::DaemonShutdown => {
68 action_tx.send(Action::General(state::action::GeneralAction::Exit))?;
69 return Ok(()); }
71 Event::LibraryRescanFinished => "Library rescan finished",
72 Event::LibraryAnalysisFinished => "Library analysis finished",
73 Event::LibraryReclusterFinished => "Library recluster finished",
74 };
75 action_tx.send(Action::Library(state::action::LibraryAction::Update))?;
76
77 action_tx.send(Action::Popup(PopupAction::Open(PopupType::Notification(
78 notification.into(),
79 ))))?;
80 }
81 Message::StateChange(state_change) => {
82 action_tx.send(Action::Audio(state::action::AudioAction::StateChange(
83 state_change,
84 )))?;
85 }
86 }
87
88 Ok(())
89 }
90}
91
#[cfg(test)]
mod subscriber_tests {
    use super::*;
    use mecomp_core::{audio::AudioKernelSender, config::Settings};
    use mecomp_daemon::init_test_client_server;
    use mecomp_storage::{
        db::schemas::{
            album::Album, analysis::Analysis, artist::Artist, collection::Collection,
            playlist::Playlist, song::Song,
        },
        test_utils::{arb_analysis_features, init_test_database},
    };
    use one_or_many::OneOrMany;
    use rstest::{fixture, rstest};
    use surrealdb::{RecordId, Surreal, engine::local::Db};
    use tarpc::context::Context;
    use tempfile::tempdir;
    use termination::create_termination;
    use test_utils::item_id;
    use tokio::sync::oneshot;

    /// Builds a test database holding one song, analysis, artist, album, collection and
    /// playlist, with the song attached to each of the containers.
    async fn db_with_state() -> Arc<Surreal<Db>> {
        let db = Arc::new(init_test_database().await.unwrap());

        let album_id = RecordId::from_table_key("album", item_id());
        let analysis_id = RecordId::from_table_key("analysis", item_id());
        let artist_id = RecordId::from_table_key("artist", item_id());
        let collection_id = RecordId::from_table_key("collection", item_id());
        let playlist_id = RecordId::from_table_key("playlist", item_id());
        let song_id = RecordId::from_table_key("song", item_id());

        // The records themselves; the other records reuse the song's metadata.
        let song = Song {
            id: song_id.clone(),
            title: "Test Song".into(),
            artist: OneOrMany::One("Test Artist".into()),
            album_artist: OneOrMany::One("Test Artist".into()),
            album: "Test Album".into(),
            genre: OneOrMany::One("Test Genre".into()),
            runtime: std::time::Duration::from_secs(180),
            track: Some(0),
            disc: Some(0),
            release_year: Some(2021),
            extension: "mp3".into(),
            path: "test.mp3".into(),
        };
        let analysis = Analysis {
            id: analysis_id.clone(),
            features: arb_analysis_features()(),
        };
        let artist = Artist {
            id: artist_id.clone(),
            name: song.artist[0].clone(),
            runtime: song.runtime,
            album_count: 1,
            song_count: 1,
        };
        let album = Album {
            id: album_id.clone(),
            title: song.album.clone(),
            artist: song.artist.clone(),
            release: song.release_year,
            runtime: song.runtime,
            song_count: 1,
            discs: 1,
            genre: song.genre.clone(),
        };
        let collection = Collection {
            id: collection_id.clone(),
            name: "Collection 0".into(),
            runtime: song.runtime,
            song_count: 1,
        };
        let playlist = Playlist {
            id: playlist_id.clone(),
            name: "Test Playlist".into(),
            runtime: song.runtime,
            song_count: 1,
        };

        // Insert the records.
        Song::create(&db, song).await.unwrap();
        Analysis::create(&db, song_id.clone(), analysis)
            .await
            .unwrap();
        Artist::create(&db, artist).await.unwrap();
        Album::create(&db, album).await.unwrap();
        Collection::create(&db, collection).await.unwrap();
        Playlist::create(&db, playlist).await.unwrap();

        // Link the song (and the album) to their containers.
        Album::add_songs(&db, album_id.clone(), vec![song_id.clone()])
            .await
            .unwrap();
        Artist::add_album(&db, artist_id.clone(), album_id)
            .await
            .unwrap();
        Artist::add_songs(&db, artist_id.clone(), vec![song_id.clone()])
            .await
            .unwrap();
        Collection::add_songs(&db, collection_id, vec![song_id.clone()])
            .await
            .unwrap();
        Playlist::add_songs(&db, playlist_id, vec![song_id.clone()])
            .await
            .unwrap();

        db
    }

    /// Fixture: a client connected to a test daemon that is backed by [`db_with_state`] and
    /// whose only library path is a fresh temporary directory.
    #[fixture]
    async fn daemon() -> MusicPlayerClient {
        let music_dir = Arc::new(tempdir().unwrap());

        let db = db_with_state().await;
        let mut settings = Settings::default();
        settings.daemon.library_paths = vec![music_dir.path().to_path_buf()].into_boxed_slice();
        let settings = Arc::new(settings);
        let (tx, _) = std::sync::mpsc::channel();
        let audio_kernel = AudioKernelSender::start(tx);

        init_test_client_server(db, settings, audio_kernel)
            .await
            .unwrap()
    }

    /// Each "library ... finished" event must produce a library update followed by a
    /// notification popup with the matching text.
    #[rstest]
    #[case(Message::Event(Event::LibraryRescanFinished), "Library rescan finished".into())]
    #[case(Message::Event(Event::LibraryAnalysisFinished), "Library analysis finished".into())]
    #[case(Message::Event(Event::LibraryReclusterFinished), "Library recluster finished".into())]
    #[tokio::test]
    async fn test_handle_message(#[case] message: Message, #[case] expected: String) {
        let (tx, mut rx) = mpsc::unbounded_channel();
        let subscriber = Subscriber;

        subscriber.handle_message(&tx, message).unwrap();

        let action = rx.recv().await.unwrap();

        assert_eq!(
            action,
            Action::Library(state::action::LibraryAction::Update)
        );

        let action = rx.recv().await.unwrap();

        assert_eq!(
            action,
            Action::Popup(PopupAction::Open(PopupType::Notification(expected.into())))
        );
    }

    /// A daemon shutdown must produce exactly one action, the exit request, with no library
    /// update or popup after it.
    #[tokio::test]
    async fn test_handle_daemon_shutdown() {
        let (tx, mut rx) = mpsc::unbounded_channel();

        Subscriber
            .handle_message(&tx, Message::Event(Event::DaemonShutdown))
            .unwrap();

        let action = rx.recv().await.unwrap();

        assert_eq!(
            action,
            Action::General(state::action::GeneralAction::Exit)
        );

        // `tx` is still alive, so an error here means the queue is empty.
        assert!(rx.try_recv().is_err());
    }

    /// End to end: a running subscriber relays the daemon's "rescan finished" event as actions
    /// and returns the interrupt that stops it.
    #[rstest]
    #[tokio::test]
    async fn test_connect(#[future] daemon: MusicPlayerClient) {
        let daemon = Arc::new(daemon.await);

        let (mut terminator, interrupt_rx) = create_termination();
        let (action_tx, mut action_rx) = mpsc::unbounded_channel();

        // NOTE(review): sent before the subscriber resubscribes below; presumably this checks
        // that an earlier interrupt is not observed by the new receiver - confirm.
        terminator.terminate(Interrupted::UserInt).unwrap();

        let (tx, rx) = oneshot::channel();

        let daemon_ = daemon.clone();

        tokio::spawn(async move {
            let interrupted = Subscriber
                .main_loop(daemon_, action_tx, interrupt_rx.resubscribe())
                .await
                .unwrap();

            tx.send(interrupted).unwrap();
        });

        // Give the spawned subscriber time to register its listener with the daemon.
        tokio::time::sleep(tokio::time::Duration::from_millis(250)).await;

        daemon
            .library_rescan(Context::current())
            .await
            .unwrap()
            .unwrap();

        let action = action_rx.recv().await.unwrap();

        assert_eq!(
            action,
            Action::Library(state::action::LibraryAction::Update)
        );

        let action = action_rx.recv().await.unwrap();

        assert_eq!(
            action,
            Action::Popup(PopupAction::Open(PopupType::Notification(
                "Library rescan finished".into()
            )))
        );

        // Stop the subscriber and check that it reports the interrupt it was given.
        terminator.terminate(Interrupted::UserInt).unwrap();
        assert_eq!(rx.await.unwrap(), Interrupted::UserInt);
    }
}