Skip to main content

mecomp_tui/
lib.rs

1use mecomp_core::udp::{Event, Listener, Message};
2use mecomp_prost::{MusicPlayerClient, RegisterListenerRequest};
3use state::action::{Action, PopupAction};
4use termination::Interrupted;
5use tokio::sync::{broadcast, mpsc};
6use ui::widgets::popups::PopupType;
7
8pub mod state;
9pub mod termination;
10#[cfg(test)]
11mod test_utils;
12pub mod ui;
13
14#[derive(Debug)]
15pub struct Subscriber;
16
17impl Subscriber {
18    /// Main loop for the subscriber.
19    ///
20    /// # Errors
21    ///
22    /// Returns an error if the main loop cannot be started, or if an error occurs while handling a message.
23    pub async fn main_loop(
24        &self,
25        mut daemon: MusicPlayerClient,
26        action_tx: mpsc::UnboundedSender<Action>,
27        mut interrupt_rx: broadcast::Receiver<Interrupted>,
28    ) -> anyhow::Result<Interrupted> {
29        let mut listener = Listener::new().await?;
30        daemon
31            .register_listener(RegisterListenerRequest::new(listener.local_addr()?))
32            .await?;
33
34        #[allow(clippy::redundant_pub_crate)]
35        let result = loop {
36            tokio::select! {
37                Ok(message) = listener.recv() => {
38                   self.handle_message(&action_tx, message)?;
39                }
40                Ok(interrupted) = interrupt_rx.recv() => {
41                    break interrupted;
42                }
43            }
44        };
45
46        Ok(result)
47    }
48
49    /// Handle a message received from the UDP socket.
50    ///
51    /// # Errors
52    ///
53    /// Returns an error if the message cannot be handled.
54    pub fn handle_message(
55        &self,
56        action_tx: &mpsc::UnboundedSender<Action>,
57        message: Message,
58    ) -> anyhow::Result<()> {
59        match message {
60            Message::Event(event) => {
61                let notification = match event {
62                    Event::DaemonShutdown => {
63                        action_tx.send(Action::General(state::action::GeneralAction::Exit))?;
64                        return Ok(()); // exit early
65                    }
66                    Event::LibraryRescanFinished => "Library rescan finished",
67                    Event::LibraryAnalysisFinished => "Library analysis finished",
68                    Event::LibraryReclusterFinished => "Library recluster finished",
69                };
70                action_tx.send(Action::Library(state::action::LibraryAction::Update))?;
71
72                action_tx.send(Action::Popup(PopupAction::Open(PopupType::Notification(
73                    notification.into(),
74                ))))?;
75            }
76            Message::StateChange(state_change) => {
77                action_tx.send(Action::Audio(state::action::AudioAction::StateChange(
78                    state_change,
79                )))?;
80            }
81        }
82
83        Ok(())
84    }
85}
86
87#[cfg(test)]
88mod subscriber_tests {
89    use std::sync::Arc;
90
91    use super::*;
92    use mecomp_core::{audio::AudioKernelSender, config::Settings};
93    use mecomp_daemon::init_test_client_server;
94    use mecomp_storage::{
95        db::schemas::{
96            album::Album, analysis::Analysis, artist::Artist, collection::Collection,
97            playlist::Playlist, song::Song,
98        },
99        test_utils::{arb_feature_array, init_test_database},
100    };
101    use rstest::{fixture, rstest};
102    use surrealdb::{RecordId, Surreal, engine::local::Db};
103    use tempfile::tempdir;
104    use termination::create_termination;
105    use test_utils::item_id;
106    use tokio::sync::oneshot;
107
108    /// Create a test database with a simple state
109    async fn db_with_state() -> Arc<Surreal<Db>> {
110        let db = Arc::new(init_test_database().await.unwrap());
111
112        let album_id = RecordId::from_table_key("album", item_id().to_string());
113        let analysis_id = RecordId::from_table_key("analysis", item_id().to_string());
114        let artist_id = RecordId::from_table_key("artist", item_id().to_string());
115        let collection_id = RecordId::from_table_key("collection", item_id().to_string());
116        let playlist_id = RecordId::from_table_key("playlist", item_id().to_string());
117        let song_id = RecordId::from_table_key("song", item_id().to_string());
118
119        // create a song, artist, album, collection, and playlist
120        let song = Song {
121            id: song_id.clone(),
122            title: "Test Song".into(),
123            artist: "Test Artist".to_string().into(),
124            album_artist: "Test Artist".to_string().into(),
125            album: "Test Album".into(),
126            genre: "Test Genre".to_string().into(),
127            runtime: std::time::Duration::from_secs(180),
128            track: Some(0),
129            disc: Some(0),
130            release_year: Some(2021),
131            extension: "mp3".into(),
132            path: "test.mp3".into(),
133        };
134        let analysis = Analysis {
135            id: analysis_id.clone(),
136            features: arb_feature_array()(),
137            embedding: arb_feature_array()(),
138        };
139        let artist = Artist {
140            id: artist_id.clone(),
141            name: song.artist[0].clone(),
142            runtime: song.runtime,
143            album_count: 1,
144            song_count: 1,
145        };
146        let album = Album {
147            id: album_id.clone(),
148            title: song.album.clone(),
149            artist: song.artist.clone(),
150            release: song.release_year,
151            runtime: song.runtime,
152            song_count: 1,
153            discs: 1,
154            genre: song.genre.clone(),
155        };
156        let collection = Collection {
157            id: collection_id.clone(),
158            name: "Collection 0".into(),
159            runtime: song.runtime,
160            song_count: 1,
161        };
162        let playlist = Playlist {
163            id: playlist_id.clone(),
164            name: "Test Playlist".into(),
165            runtime: song.runtime,
166            song_count: 1,
167        };
168
169        // insert the items into the database
170        Song::create(&db, song).await.unwrap();
171        Analysis::create(&db, song_id.clone(), analysis)
172            .await
173            .unwrap();
174        Artist::create(&db, artist).await.unwrap();
175        Album::create(&db, album).await.unwrap();
176        Collection::create(&db, collection).await.unwrap();
177        Playlist::create(&db, playlist).await.unwrap();
178
179        // add relationships between the items
180        Album::add_song(&db, album_id.clone(), song_id.clone())
181            .await
182            .unwrap();
183        Artist::add_album(&db, artist_id.clone(), album_id)
184            .await
185            .unwrap();
186        Artist::add_song(&db, artist_id.clone(), song_id.clone())
187            .await
188            .unwrap();
189        Collection::add_songs(&db, collection_id, vec![song_id.clone()])
190            .await
191            .unwrap();
192        Playlist::add_songs(&db, playlist_id, vec![song_id.clone()])
193            .await
194            .unwrap();
195
196        db
197    }
198
199    #[fixture]
200    async fn daemon() -> MusicPlayerClient {
201        let music_dir = Arc::new(tempdir().unwrap());
202
203        let db = db_with_state().await;
204        let mut settings = Settings::default();
205        settings.daemon.library_paths = vec![music_dir.path().to_path_buf()].into_boxed_slice();
206        let settings = Arc::new(settings);
207        let (tx, _) = std::sync::mpsc::channel();
208        let audio_kernel = AudioKernelSender::start(tx);
209
210        init_test_client_server(db, settings, audio_kernel)
211            .await
212            .unwrap()
213    }
214
215    #[rstest]
216    #[case(Message::Event(Event::LibraryRescanFinished), "Library rescan finished".into())]
217    #[case(Message::Event(Event::LibraryAnalysisFinished), "Library analysis finished".into())]
218    #[case(Message::Event(Event::LibraryReclusterFinished), "Library recluster finished".into())]
219    #[tokio::test]
220    async fn test_handle_message(#[case] message: Message, #[case] expected: String) {
221        let (tx, mut rx) = mpsc::unbounded_channel();
222        let subscriber = Subscriber;
223
224        subscriber.handle_message(&tx, message).unwrap();
225
226        let action = rx.recv().await.unwrap();
227
228        assert_eq!(
229            action,
230            Action::Library(state::action::LibraryAction::Update)
231        );
232
233        let action = rx.recv().await.unwrap();
234
235        assert_eq!(
236            action,
237            Action::Popup(PopupAction::Open(PopupType::Notification(expected.into())))
238        );
239    }
240
241    #[rstest]
242    #[tokio::test]
243    async fn test_connect(#[future] daemon: MusicPlayerClient) {
244        let mut daemon = daemon.await;
245
246        let (mut terminator, interrupt_rx) = create_termination();
247        let (action_tx, mut action_rx) = mpsc::unbounded_channel();
248
249        terminator.terminate(Interrupted::UserInt).unwrap();
250
251        let (tx, rx) = oneshot::channel();
252
253        let daemon_ = daemon.clone();
254
255        tokio::spawn(async move {
256            let interrupted = Subscriber
257                .main_loop(daemon_, action_tx, interrupt_rx.resubscribe())
258                .await
259                .unwrap();
260
261            tx.send(interrupted).unwrap();
262        });
263
264        tokio::time::sleep(tokio::time::Duration::from_millis(250)).await;
265
266        daemon.library_rescan(()).await.unwrap();
267
268        let action = action_rx.recv().await.unwrap();
269
270        assert_eq!(
271            action,
272            Action::Library(state::action::LibraryAction::Update)
273        );
274
275        let action = action_rx.recv().await.unwrap();
276
277        assert_eq!(
278            action,
279            Action::Popup(PopupAction::Open(PopupType::Notification(
280                "Library rescan finished".into()
281            )))
282        );
283
284        // kill the application
285        terminator.terminate(Interrupted::UserInt).unwrap();
286        assert_eq!(rx.await.unwrap(), Interrupted::UserInt);
287    }
288}