mecomp_tui/
lib.rs

1use std::sync::Arc;
2
3use mecomp_core::{
4    rpc::MusicPlayerClient,
5    udp::{Event, Listener, Message},
6};
7use state::action::{Action, PopupAction};
8use tarpc::context::Context;
9use termination::Interrupted;
10use tokio::sync::{broadcast, mpsc};
11use ui::widgets::popups::PopupType;
12
13pub mod state;
14pub mod termination;
15#[cfg(test)]
16mod test_utils;
17pub mod ui;
18
19#[derive(Debug)]
20pub struct Subscriber;
21
22impl Subscriber {
23    /// Main loop for the subscriber.
24    ///
25    /// # Errors
26    ///
27    /// Returns an error if the main loop cannot be started, or if an error occurs while handling a message.
28    pub async fn main_loop(
29        &self,
30        daemon: Arc<MusicPlayerClient>,
31        action_tx: mpsc::UnboundedSender<Action>,
32        mut interrupt_rx: broadcast::Receiver<Interrupted>,
33    ) -> anyhow::Result<Interrupted> {
34        let mut listener = Listener::new().await?;
35        daemon
36            .register_listener(Context::current(), listener.local_addr()?)
37            .await?;
38
39        #[allow(clippy::redundant_pub_crate)]
40        let result = loop {
41            tokio::select! {
42                Ok(message) = listener.recv() => {
43                   self.handle_message(&action_tx, message)?;
44                }
45                Ok(interrupted) = interrupt_rx.recv() => {
46                    break interrupted;
47                }
48            }
49        };
50
51        Ok(result)
52    }
53
54    /// Handle a message received from the UDP socket.
55    ///
56    /// # Errors
57    ///
58    /// Returns an error if the message cannot be handled.
59    pub fn handle_message(
60        &self,
61        action_tx: &mpsc::UnboundedSender<Action>,
62        message: Message,
63    ) -> anyhow::Result<()> {
64        match message {
65            Message::Event(event) => {
66                let notification = match event {
67                    Event::DaemonShutdown => {
68                        action_tx.send(Action::General(state::action::GeneralAction::Exit))?;
69                        return Ok(()); // exit early
70                    }
71                    Event::LibraryRescanFinished => "Library rescan finished",
72                    Event::LibraryAnalysisFinished => "Library analysis finished",
73                    Event::LibraryReclusterFinished => "Library recluster finished",
74                };
75                action_tx.send(Action::Library(state::action::LibraryAction::Update))?;
76
77                action_tx.send(Action::Popup(PopupAction::Open(PopupType::Notification(
78                    notification.into(),
79                ))))?;
80            }
81            Message::StateChange(state_change) => {
82                action_tx.send(Action::Audio(state::action::AudioAction::StateChange(
83                    state_change,
84                )))?;
85            }
86        }
87
88        Ok(())
89    }
90}
91
92#[cfg(test)]
93mod subscriber_tests {
94    use super::*;
95    use mecomp_core::{audio::AudioKernelSender, config::Settings};
96    use mecomp_daemon::init_test_client_server;
97    use mecomp_storage::{
98        db::schemas::{
99            album::Album, analysis::Analysis, artist::Artist, collection::Collection,
100            playlist::Playlist, song::Song,
101        },
102        test_utils::{arb_analysis_features, init_test_database},
103    };
104    use rstest::{fixture, rstest};
105    use surrealdb::{RecordId, Surreal, engine::local::Db};
106    use tarpc::context::Context;
107    use tempfile::tempdir;
108    use termination::create_termination;
109    use test_utils::item_id;
110    use tokio::sync::oneshot;
111
112    /// Create a test database with a simple state
113    async fn db_with_state() -> Arc<Surreal<Db>> {
114        let db = Arc::new(init_test_database().await.unwrap());
115
116        let album_id = RecordId::from_table_key("album", item_id());
117        let analysis_id = RecordId::from_table_key("analysis", item_id());
118        let artist_id = RecordId::from_table_key("artist", item_id());
119        let collection_id = RecordId::from_table_key("collection", item_id());
120        let playlist_id = RecordId::from_table_key("playlist", item_id());
121        let song_id = RecordId::from_table_key("song", item_id());
122
123        // create a song, artist, album, collection, and playlist
124        let song = Song {
125            id: song_id.clone(),
126            title: "Test Song".into(),
127            artist: "Test Artist".to_string().into(),
128            album_artist: "Test Artist".to_string().into(),
129            album: "Test Album".into(),
130            genre: "Test Genre".to_string().into(),
131            runtime: std::time::Duration::from_secs(180),
132            track: Some(0),
133            disc: Some(0),
134            release_year: Some(2021),
135            extension: "mp3".into(),
136            path: "test.mp3".into(),
137        };
138        let analysis = Analysis {
139            id: analysis_id.clone(),
140            features: arb_analysis_features()(),
141        };
142        let artist = Artist {
143            id: artist_id.clone(),
144            name: song.artist[0].clone(),
145            runtime: song.runtime,
146            album_count: 1,
147            song_count: 1,
148        };
149        let album = Album {
150            id: album_id.clone(),
151            title: song.album.clone(),
152            artist: song.artist.clone(),
153            release: song.release_year,
154            runtime: song.runtime,
155            song_count: 1,
156            discs: 1,
157            genre: song.genre.clone(),
158        };
159        let collection = Collection {
160            id: collection_id.clone(),
161            name: "Collection 0".into(),
162            runtime: song.runtime,
163            song_count: 1,
164        };
165        let playlist = Playlist {
166            id: playlist_id.clone(),
167            name: "Test Playlist".into(),
168            runtime: song.runtime,
169            song_count: 1,
170        };
171
172        // insert the items into the database
173        Song::create(&db, song).await.unwrap();
174        Analysis::create(&db, song_id.clone(), analysis)
175            .await
176            .unwrap();
177        Artist::create(&db, artist).await.unwrap();
178        Album::create(&db, album).await.unwrap();
179        Collection::create(&db, collection).await.unwrap();
180        Playlist::create(&db, playlist).await.unwrap();
181
182        // add relationships between the items
183        Album::add_songs(&db, album_id.clone(), vec![song_id.clone()])
184            .await
185            .unwrap();
186        Artist::add_album(&db, artist_id.clone(), album_id)
187            .await
188            .unwrap();
189        Artist::add_songs(&db, artist_id.clone(), vec![song_id.clone()])
190            .await
191            .unwrap();
192        Collection::add_songs(&db, collection_id, vec![song_id.clone()])
193            .await
194            .unwrap();
195        Playlist::add_songs(&db, playlist_id, vec![song_id.clone()])
196            .await
197            .unwrap();
198
199        db
200    }
201
202    #[fixture]
203    async fn daemon() -> MusicPlayerClient {
204        let music_dir = Arc::new(tempdir().unwrap());
205
206        let db = db_with_state().await;
207        let mut settings = Settings::default();
208        settings.daemon.library_paths = vec![music_dir.path().to_path_buf()].into_boxed_slice();
209        let settings = Arc::new(settings);
210        let (tx, _) = std::sync::mpsc::channel();
211        let audio_kernel = AudioKernelSender::start(tx);
212
213        init_test_client_server(db, settings, audio_kernel)
214            .await
215            .unwrap()
216    }
217
218    #[rstest]
219    #[case(Message::Event(Event::LibraryRescanFinished), "Library rescan finished".into())]
220    #[case(Message::Event(Event::LibraryAnalysisFinished), "Library analysis finished".into())]
221    #[case(Message::Event(Event::LibraryReclusterFinished), "Library recluster finished".into())]
222    #[tokio::test]
223    async fn test_handle_message(#[case] message: Message, #[case] expected: String) {
224        let (tx, mut rx) = mpsc::unbounded_channel();
225        let subscriber = Subscriber;
226
227        subscriber.handle_message(&tx, message).unwrap();
228
229        let action = rx.recv().await.unwrap();
230
231        assert_eq!(
232            action,
233            Action::Library(state::action::LibraryAction::Update)
234        );
235
236        let action = rx.recv().await.unwrap();
237
238        assert_eq!(
239            action,
240            Action::Popup(PopupAction::Open(PopupType::Notification(expected.into())))
241        );
242    }
243
244    #[rstest]
245    #[tokio::test]
246    async fn test_connect(#[future] daemon: MusicPlayerClient) {
247        let daemon = Arc::new(daemon.await);
248
249        let (mut terminator, interrupt_rx) = create_termination();
250        let (action_tx, mut action_rx) = mpsc::unbounded_channel();
251
252        terminator.terminate(Interrupted::UserInt).unwrap();
253
254        let (tx, rx) = oneshot::channel();
255
256        let daemon_ = daemon.clone();
257
258        tokio::spawn(async move {
259            let interrupted = Subscriber
260                .main_loop(daemon_, action_tx, interrupt_rx.resubscribe())
261                .await
262                .unwrap();
263
264            tx.send(interrupted).unwrap();
265        });
266
267        tokio::time::sleep(tokio::time::Duration::from_millis(250)).await;
268
269        daemon
270            .library_rescan(Context::current())
271            .await
272            .unwrap()
273            .unwrap();
274
275        let action = action_rx.recv().await.unwrap();
276
277        assert_eq!(
278            action,
279            Action::Library(state::action::LibraryAction::Update)
280        );
281
282        let action = action_rx.recv().await.unwrap();
283
284        assert_eq!(
285            action,
286            Action::Popup(PopupAction::Open(PopupType::Notification(
287                "Library rescan finished".into()
288            )))
289        );
290
291        // kill the application
292        terminator.terminate(Interrupted::UserInt).unwrap();
293        assert_eq!(rx.await.unwrap(), Interrupted::UserInt);
294    }
295}