mecomp_tui/
lib.rs

1use mecomp_core::udp::{Event, Listener, Message};
2use mecomp_prost::{MusicPlayerClient, RegisterListenerRequest};
3use state::action::{Action, PopupAction};
4use termination::Interrupted;
5use tokio::sync::{broadcast, mpsc};
6use ui::widgets::popups::PopupType;
7
8pub mod state;
9pub mod termination;
10#[cfg(test)]
11mod test_utils;
12pub mod ui;
13
14#[derive(Debug)]
15pub struct Subscriber;
16
17impl Subscriber {
18    /// Main loop for the subscriber.
19    ///
20    /// # Errors
21    ///
22    /// Returns an error if the main loop cannot be started, or if an error occurs while handling a message.
23    pub async fn main_loop(
24        &self,
25        mut daemon: MusicPlayerClient,
26        action_tx: mpsc::UnboundedSender<Action>,
27        mut interrupt_rx: broadcast::Receiver<Interrupted>,
28    ) -> anyhow::Result<Interrupted> {
29        let mut listener = Listener::new().await?;
30        daemon
31            .register_listener(RegisterListenerRequest::new(listener.local_addr()?))
32            .await?;
33
34        #[allow(clippy::redundant_pub_crate)]
35        let result = loop {
36            tokio::select! {
37                Ok(message) = listener.recv() => {
38                   self.handle_message(&action_tx, message)?;
39                }
40                Ok(interrupted) = interrupt_rx.recv() => {
41                    break interrupted;
42                }
43            }
44        };
45
46        Ok(result)
47    }
48
49    /// Handle a message received from the UDP socket.
50    ///
51    /// # Errors
52    ///
53    /// Returns an error if the message cannot be handled.
54    pub fn handle_message(
55        &self,
56        action_tx: &mpsc::UnboundedSender<Action>,
57        message: Message,
58    ) -> anyhow::Result<()> {
59        match message {
60            Message::Event(event) => {
61                let notification = match event {
62                    Event::DaemonShutdown => {
63                        action_tx.send(Action::General(state::action::GeneralAction::Exit))?;
64                        return Ok(()); // exit early
65                    }
66                    Event::LibraryRescanFinished => "Library rescan finished",
67                    Event::LibraryAnalysisFinished => "Library analysis finished",
68                    Event::LibraryReclusterFinished => "Library recluster finished",
69                };
70                action_tx.send(Action::Library(state::action::LibraryAction::Update))?;
71
72                action_tx.send(Action::Popup(PopupAction::Open(PopupType::Notification(
73                    notification.into(),
74                ))))?;
75            }
76            Message::StateChange(state_change) => {
77                action_tx.send(Action::Audio(state::action::AudioAction::StateChange(
78                    state_change,
79                )))?;
80            }
81        }
82
83        Ok(())
84    }
85}
86
87#[cfg(test)]
88mod subscriber_tests {
89    use std::sync::Arc;
90
91    use super::*;
92    use mecomp_core::{audio::AudioKernelSender, config::Settings};
93    use mecomp_daemon::init_test_client_server;
94    use mecomp_storage::{
95        db::schemas::{
96            album::Album, analysis::Analysis, artist::Artist, collection::Collection,
97            playlist::Playlist, song::Song,
98        },
99        test_utils::{arb_analysis_features, init_test_database},
100    };
101    use rstest::{fixture, rstest};
102    use surrealdb::{RecordId, Surreal, engine::local::Db};
103    use tempfile::tempdir;
104    use termination::create_termination;
105    use test_utils::item_id;
106    use tokio::sync::oneshot;
107
108    /// Create a test database with a simple state
109    async fn db_with_state() -> Arc<Surreal<Db>> {
110        let db = Arc::new(init_test_database().await.unwrap());
111
112        let album_id = RecordId::from_table_key("album", item_id().to_string());
113        let analysis_id = RecordId::from_table_key("analysis", item_id().to_string());
114        let artist_id = RecordId::from_table_key("artist", item_id().to_string());
115        let collection_id = RecordId::from_table_key("collection", item_id().to_string());
116        let playlist_id = RecordId::from_table_key("playlist", item_id().to_string());
117        let song_id = RecordId::from_table_key("song", item_id().to_string());
118
119        // create a song, artist, album, collection, and playlist
120        let song = Song {
121            id: song_id.clone(),
122            title: "Test Song".into(),
123            artist: "Test Artist".to_string().into(),
124            album_artist: "Test Artist".to_string().into(),
125            album: "Test Album".into(),
126            genre: "Test Genre".to_string().into(),
127            runtime: std::time::Duration::from_secs(180),
128            track: Some(0),
129            disc: Some(0),
130            release_year: Some(2021),
131            extension: "mp3".into(),
132            path: "test.mp3".into(),
133        };
134        let analysis = Analysis {
135            id: analysis_id.clone(),
136            features: arb_analysis_features()(),
137        };
138        let artist = Artist {
139            id: artist_id.clone(),
140            name: song.artist[0].clone(),
141            runtime: song.runtime,
142            album_count: 1,
143            song_count: 1,
144        };
145        let album = Album {
146            id: album_id.clone(),
147            title: song.album.clone(),
148            artist: song.artist.clone(),
149            release: song.release_year,
150            runtime: song.runtime,
151            song_count: 1,
152            discs: 1,
153            genre: song.genre.clone(),
154        };
155        let collection = Collection {
156            id: collection_id.clone(),
157            name: "Collection 0".into(),
158            runtime: song.runtime,
159            song_count: 1,
160        };
161        let playlist = Playlist {
162            id: playlist_id.clone(),
163            name: "Test Playlist".into(),
164            runtime: song.runtime,
165            song_count: 1,
166        };
167
168        // insert the items into the database
169        Song::create(&db, song).await.unwrap();
170        Analysis::create(&db, song_id.clone(), analysis)
171            .await
172            .unwrap();
173        Artist::create(&db, artist).await.unwrap();
174        Album::create(&db, album).await.unwrap();
175        Collection::create(&db, collection).await.unwrap();
176        Playlist::create(&db, playlist).await.unwrap();
177
178        // add relationships between the items
179        Album::add_songs(&db, album_id.clone(), vec![song_id.clone()])
180            .await
181            .unwrap();
182        Artist::add_album(&db, artist_id.clone(), album_id)
183            .await
184            .unwrap();
185        Artist::add_songs(&db, artist_id.clone(), vec![song_id.clone()])
186            .await
187            .unwrap();
188        Collection::add_songs(&db, collection_id, vec![song_id.clone()])
189            .await
190            .unwrap();
191        Playlist::add_songs(&db, playlist_id, vec![song_id.clone()])
192            .await
193            .unwrap();
194
195        db
196    }
197
198    #[fixture]
199    async fn daemon() -> MusicPlayerClient {
200        let music_dir = Arc::new(tempdir().unwrap());
201
202        let db = db_with_state().await;
203        let mut settings = Settings::default();
204        settings.daemon.library_paths = vec![music_dir.path().to_path_buf()].into_boxed_slice();
205        let settings = Arc::new(settings);
206        let (tx, _) = std::sync::mpsc::channel();
207        let audio_kernel = AudioKernelSender::start(tx);
208
209        init_test_client_server(db, settings, audio_kernel)
210            .await
211            .unwrap()
212    }
213
214    #[rstest]
215    #[case(Message::Event(Event::LibraryRescanFinished), "Library rescan finished".into())]
216    #[case(Message::Event(Event::LibraryAnalysisFinished), "Library analysis finished".into())]
217    #[case(Message::Event(Event::LibraryReclusterFinished), "Library recluster finished".into())]
218    #[tokio::test]
219    async fn test_handle_message(#[case] message: Message, #[case] expected: String) {
220        let (tx, mut rx) = mpsc::unbounded_channel();
221        let subscriber = Subscriber;
222
223        subscriber.handle_message(&tx, message).unwrap();
224
225        let action = rx.recv().await.unwrap();
226
227        assert_eq!(
228            action,
229            Action::Library(state::action::LibraryAction::Update)
230        );
231
232        let action = rx.recv().await.unwrap();
233
234        assert_eq!(
235            action,
236            Action::Popup(PopupAction::Open(PopupType::Notification(expected.into())))
237        );
238    }
239
240    #[rstest]
241    #[tokio::test]
242    async fn test_connect(#[future] daemon: MusicPlayerClient) {
243        let mut daemon = daemon.await;
244
245        let (mut terminator, interrupt_rx) = create_termination();
246        let (action_tx, mut action_rx) = mpsc::unbounded_channel();
247
248        terminator.terminate(Interrupted::UserInt).unwrap();
249
250        let (tx, rx) = oneshot::channel();
251
252        let daemon_ = daemon.clone();
253
254        tokio::spawn(async move {
255            let interrupted = Subscriber
256                .main_loop(daemon_, action_tx, interrupt_rx.resubscribe())
257                .await
258                .unwrap();
259
260            tx.send(interrupted).unwrap();
261        });
262
263        tokio::time::sleep(tokio::time::Duration::from_millis(250)).await;
264
265        daemon.library_rescan(()).await.unwrap();
266
267        let action = action_rx.recv().await.unwrap();
268
269        assert_eq!(
270            action,
271            Action::Library(state::action::LibraryAction::Update)
272        );
273
274        let action = action_rx.recv().await.unwrap();
275
276        assert_eq!(
277            action,
278            Action::Popup(PopupAction::Open(PopupType::Notification(
279                "Library rescan finished".into()
280            )))
281        );
282
283        // kill the application
284        terminator.terminate(Interrupted::UserInt).unwrap();
285        assert_eq!(rx.await.unwrap(), Interrupted::UserInt);
286    }
287}