mecomp_tui/
lib.rs

1use std::sync::Arc;
2
3use mecomp_core::{
4    rpc::MusicPlayerClient,
5    udp::{Event, Listener, Message},
6};
7use state::action::{Action, PopupAction};
8use tarpc::context::Context;
9use termination::Interrupted;
10use tokio::sync::{broadcast, mpsc};
11use ui::widgets::popups::PopupType;
12
13pub mod state;
14pub mod termination;
15#[cfg(test)]
16mod test_utils;
17pub mod ui;
18
19#[derive(Debug)]
20pub struct Subscriber;
21
22impl Subscriber {
23    /// Main loop for the subscriber.
24    ///
25    /// # Errors
26    ///
27    /// Returns an error if the main loop cannot be started, or if an error occurs while handling a message.
28    pub async fn main_loop(
29        &self,
30        daemon: Arc<MusicPlayerClient>,
31        action_tx: mpsc::UnboundedSender<Action>,
32        mut interrupt_rx: broadcast::Receiver<Interrupted>,
33    ) -> anyhow::Result<Interrupted> {
34        let mut listener = Listener::new().await?;
35        daemon
36            .register_listener(Context::current(), listener.local_addr()?)
37            .await?;
38
39        #[allow(clippy::redundant_pub_crate)]
40        let result = loop {
41            tokio::select! {
42                Ok(message) = listener.recv() => {
43                   self.handle_message(&action_tx, message)?;
44                }
45                Ok(interrupted) = interrupt_rx.recv() => {
46                    break interrupted;
47                }
48            }
49        };
50
51        Ok(result)
52    }
53
54    /// Handle a message received from the UDP socket.
55    ///
56    /// # Errors
57    ///
58    /// Returns an error if the message cannot be handled.
59    pub fn handle_message(
60        &self,
61        action_tx: &mpsc::UnboundedSender<Action>,
62        message: Message,
63    ) -> anyhow::Result<()> {
64        match message {
65            Message::Event(event) => {
66                let notification = match event {
67                    Event::DaemonShutdown => {
68                        action_tx.send(Action::General(state::action::GeneralAction::Exit))?;
69                        return Ok(()); // exit early
70                    }
71                    Event::LibraryRescanFinished => "Library rescan finished",
72                    Event::LibraryAnalysisFinished => "Library analysis finished",
73                    Event::LibraryReclusterFinished => "Library recluster finished",
74                };
75                action_tx.send(Action::Library(state::action::LibraryAction::Update))?;
76
77                action_tx.send(Action::Popup(PopupAction::Open(PopupType::Notification(
78                    notification.into(),
79                ))))?;
80            }
81            Message::StateChange(state_change) => {
82                action_tx.send(Action::Audio(state::action::AudioAction::StateChange(
83                    state_change,
84                )))?;
85            }
86        }
87
88        Ok(())
89    }
90}
91
92#[cfg(test)]
93mod subscriber_tests {
94    use super::*;
95    use mecomp_core::{audio::AudioKernelSender, config::Settings};
96    use mecomp_daemon::init_test_client_server;
97    use mecomp_storage::{
98        db::schemas::{
99            album::Album, analysis::Analysis, artist::Artist, collection::Collection,
100            playlist::Playlist, song::Song,
101        },
102        test_utils::{arb_analysis_features, init_test_database},
103    };
104    use one_or_many::OneOrMany;
105    use rstest::{fixture, rstest};
106    use surrealdb::{engine::local::Db, RecordId, Surreal};
107    use tarpc::context::Context;
108    use tempfile::tempdir;
109    use termination::create_termination;
110    use test_utils::item_id;
111    use tokio::sync::oneshot;
112
113    /// Create a test database with a simple state
114    async fn db_with_state() -> Arc<Surreal<Db>> {
115        let db = Arc::new(init_test_database().await.unwrap());
116
117        let album_id = RecordId::from_table_key("album", item_id());
118        let analysis_id = RecordId::from_table_key("analysis", item_id());
119        let artist_id = RecordId::from_table_key("artist", item_id());
120        let collection_id = RecordId::from_table_key("collection", item_id());
121        let playlist_id = RecordId::from_table_key("playlist", item_id());
122        let song_id = RecordId::from_table_key("song", item_id());
123
124        // create a song, artist, album, collection, and playlist
125        let song = Song {
126            id: song_id.clone(),
127            title: "Test Song".into(),
128            artist: OneOrMany::One("Test Artist".into()),
129            album_artist: OneOrMany::One("Test Artist".into()),
130            album: "Test Album".into(),
131            genre: OneOrMany::One("Test Genre".into()),
132            runtime: std::time::Duration::from_secs(180),
133            track: Some(0),
134            disc: Some(0),
135            release_year: Some(2021),
136            extension: "mp3".into(),
137            path: "test.mp3".into(),
138        };
139        let analysis = Analysis {
140            id: analysis_id.clone(),
141            features: arb_analysis_features()(),
142        };
143        let artist = Artist {
144            id: artist_id.clone(),
145            name: song.artist[0].clone(),
146            runtime: song.runtime,
147            album_count: 1,
148            song_count: 1,
149        };
150        let album = Album {
151            id: album_id.clone(),
152            title: song.album.clone(),
153            artist: song.artist.clone(),
154            release: song.release_year,
155            runtime: song.runtime,
156            song_count: 1,
157            discs: 1,
158            genre: song.genre.clone(),
159        };
160        let collection = Collection {
161            id: collection_id.clone(),
162            name: "Collection 0".into(),
163            runtime: song.runtime,
164            song_count: 1,
165        };
166        let playlist = Playlist {
167            id: playlist_id.clone(),
168            name: "Test Playlist".into(),
169            runtime: song.runtime,
170            song_count: 1,
171        };
172
173        // insert the items into the database
174        Song::create(&db, song).await.unwrap();
175        Analysis::create(&db, song_id.clone(), analysis)
176            .await
177            .unwrap();
178        Artist::create(&db, artist).await.unwrap();
179        Album::create(&db, album).await.unwrap();
180        Collection::create(&db, collection).await.unwrap();
181        Playlist::create(&db, playlist).await.unwrap();
182
183        // add relationships between the items
184        Album::add_songs(&db, album_id.clone(), vec![song_id.clone()])
185            .await
186            .unwrap();
187        Artist::add_album(&db, artist_id.clone(), album_id)
188            .await
189            .unwrap();
190        Artist::add_songs(&db, artist_id.clone(), vec![song_id.clone()])
191            .await
192            .unwrap();
193        Collection::add_songs(&db, collection_id, vec![song_id.clone()])
194            .await
195            .unwrap();
196        Playlist::add_songs(&db, playlist_id, vec![song_id.clone()])
197            .await
198            .unwrap();
199
200        db
201    }
202
203    #[fixture]
204    async fn daemon() -> MusicPlayerClient {
205        let music_dir = Arc::new(tempdir().unwrap());
206
207        let db = db_with_state().await;
208        let mut settings = Settings::default();
209        settings.daemon.library_paths = vec![music_dir.path().to_path_buf()].into_boxed_slice();
210        let settings = Arc::new(settings);
211        let (tx, _) = std::sync::mpsc::channel();
212        let audio_kernel = AudioKernelSender::start(tx);
213
214        init_test_client_server(db, settings, audio_kernel)
215            .await
216            .unwrap()
217    }
218
219    #[rstest]
220    #[case(Message::Event(Event::LibraryRescanFinished), "Library rescan finished".into())]
221    #[case(Message::Event(Event::LibraryAnalysisFinished), "Library analysis finished".into())]
222    #[case(Message::Event(Event::LibraryReclusterFinished), "Library recluster finished".into())]
223    #[tokio::test]
224    async fn test_handle_message(#[case] message: Message, #[case] expected: String) {
225        let (tx, mut rx) = mpsc::unbounded_channel();
226        let subscriber = Subscriber;
227
228        subscriber.handle_message(&tx, message).unwrap();
229
230        let action = rx.recv().await.unwrap();
231
232        assert_eq!(
233            action,
234            Action::Library(state::action::LibraryAction::Update)
235        );
236
237        let action = rx.recv().await.unwrap();
238
239        assert_eq!(
240            action,
241            Action::Popup(PopupAction::Open(PopupType::Notification(expected.into())))
242        );
243    }
244
245    #[rstest]
246    #[tokio::test]
247    async fn test_connect(#[future] daemon: MusicPlayerClient) {
248        let daemon = Arc::new(daemon.await);
249
250        let (mut terminator, interrupt_rx) = create_termination();
251        let (action_tx, mut action_rx) = mpsc::unbounded_channel();
252
253        terminator.terminate(Interrupted::UserInt).unwrap();
254
255        let (tx, rx) = oneshot::channel();
256
257        let daemon_ = daemon.clone();
258
259        tokio::spawn(async move {
260            let interrupted = Subscriber
261                .main_loop(daemon_, action_tx, interrupt_rx.resubscribe())
262                .await
263                .unwrap();
264
265            tx.send(interrupted).unwrap();
266        });
267
268        tokio::time::sleep(tokio::time::Duration::from_millis(250)).await;
269
270        daemon
271            .library_rescan(Context::current())
272            .await
273            .unwrap()
274            .unwrap();
275
276        let action = action_rx.recv().await.unwrap();
277
278        assert_eq!(
279            action,
280            Action::Library(state::action::LibraryAction::Update)
281        );
282
283        let action = action_rx.recv().await.unwrap();
284
285        assert_eq!(
286            action,
287            Action::Popup(PopupAction::Open(PopupType::Notification(
288                "Library rescan finished".into()
289            )))
290        );
291
292        // kill the application
293        terminator.terminate(Interrupted::UserInt).unwrap();
294        assert_eq!(rx.await.unwrap(), Interrupted::UserInt);
295    }
296}