mecomp_tui/state/
library.rs

1//! The library state store.
2//!
3//! Updates every minute, or when the user requests a rescan, ands/removes/updates a playlist, or reclusters collections.
4
5use std::sync::Arc;
6
7use tokio::sync::{
8    broadcast,
9    mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
10};
11
12use mecomp_core::{rpc::MusicPlayerClient, state::library::LibraryBrief};
13use mecomp_storage::db::schemas;
14
15use crate::termination::Interrupted;
16
17use super::action::LibraryAction;
18
19/// The library state store.
20#[derive(Debug, Clone)]
21#[allow(clippy::module_name_repetitions)]
22pub struct LibraryState {
23    state_tx: UnboundedSender<LibraryBrief>,
24}
25
26impl LibraryState {
27    /// create a new library state store, and return the receiver for listening to state updates.
28    #[must_use]
29    pub fn new() -> (Self, UnboundedReceiver<LibraryBrief>) {
30        let (state_tx, state_rx) = unbounded_channel::<LibraryBrief>();
31
32        (Self { state_tx }, state_rx)
33    }
34
35    /// a loop that updates the library state every tick.
36    ///
37    /// # Errors
38    ///
39    /// Fails if the state cannot be sent
40    /// or if the daemon client can't connect to the server
41    /// or if the daemon returns an error
42    pub async fn main_loop(
43        &self,
44        daemon: Arc<MusicPlayerClient>,
45        mut action_rx: UnboundedReceiver<LibraryAction>,
46        mut interrupt_rx: broadcast::Receiver<Interrupted>,
47    ) -> anyhow::Result<Interrupted> {
48        let mut state = get_library(daemon.clone()).await?;
49
50        // the initial state once
51        self.state_tx.send(state.clone())?;
52
53        loop {
54            tokio::select! {
55                // Handle the actions coming from the UI
56                // and process them to do async operations
57                Some(action) = action_rx.recv() => {
58                    handle_action(&mut state, &self.state_tx, daemon.clone(),action).await?;
59                },
60                // Catch and handle interrupt signal to gracefully shutdown
61                Ok(interrupted) = interrupt_rx.recv() => {
62                    break Ok(interrupted);
63                }
64            }
65        }
66    }
67}
68async fn handle_action(
69    state: &mut LibraryBrief,
70    state_tx: &UnboundedSender<LibraryBrief>,
71    daemon: Arc<MusicPlayerClient>,
72    action: LibraryAction,
73) -> anyhow::Result<()> {
74    let mut update = false;
75    let mut flag_update = || update = true;
76    let current_context = tarpc::context::current;
77
78    match action {
79        LibraryAction::Rescan => rescan_library(daemon.clone()).await?,
80        LibraryAction::Update => flag_update(),
81        LibraryAction::Analyze => analyze_library(daemon.clone()).await?,
82        LibraryAction::Recluster => recluster_library(daemon.clone()).await?,
83        LibraryAction::CreatePlaylist(name) => daemon
84            .playlist_get_or_create(current_context(), name)
85            .await?
86            .map(|_| flag_update())?,
87        LibraryAction::RemovePlaylist(id) => {
88            assert_eq!(id.tb, schemas::playlist::TABLE_NAME);
89            daemon
90                .playlist_remove(current_context(), id)
91                .await?
92                .map(|()| flag_update())?;
93        }
94        LibraryAction::RenamePlaylist(id, name) => {
95            assert_eq!(id.tb, schemas::playlist::TABLE_NAME);
96            daemon
97                .playlist_rename(current_context(), id, name)
98                .await?
99                .map(|_| flag_update())?;
100        }
101        LibraryAction::RemoveSongsFromPlaylist(playlist, songs) => {
102            assert_eq!(playlist.tb, schemas::playlist::TABLE_NAME);
103            assert!(songs.iter().all(|s| s.tb == schemas::song::TABLE_NAME));
104            daemon
105                .playlist_remove_songs(current_context(), playlist, songs)
106                .await??;
107        }
108        LibraryAction::AddThingsToPlaylist(playlist, things) => {
109            assert_eq!(playlist.tb, schemas::playlist::TABLE_NAME);
110            daemon
111                .playlist_add_list(current_context(), playlist, things)
112                .await??;
113        }
114        LibraryAction::CreatePlaylistAndAddThings(name, things) => {
115            let playlist = daemon
116                .playlist_get_or_create(current_context(), name)
117                .await??;
118            daemon
119                .playlist_add_list(current_context(), playlist, things)
120                .await?
121                .map(|()| flag_update())?;
122        }
123        LibraryAction::CreateDynamicPlaylist(name, query) => daemon
124            .dynamic_playlist_create(current_context(), name, query)
125            .await?
126            .map(|_| flag_update())?,
127        LibraryAction::RemoveDynamicPlaylist(id) => {
128            assert_eq!(id.tb, schemas::dynamic::TABLE_NAME);
129            daemon
130                .dynamic_playlist_remove(current_context(), id)
131                .await?
132                .map(|()| flag_update())?;
133        }
134        LibraryAction::UpdateDynamicPlaylist(id, changes) => {
135            assert_eq!(id.tb, schemas::dynamic::TABLE_NAME);
136            daemon
137                .dynamic_playlist_update(current_context(), id, changes)
138                .await?
139                .map(|_| flag_update())?;
140        }
141    }
142
143    if update {
144        *state = get_library(daemon).await?;
145        state_tx.send(state.clone())?;
146    }
147
148    Ok(())
149}
150
151async fn get_library(daemon: Arc<MusicPlayerClient>) -> anyhow::Result<LibraryBrief> {
152    let ctx = tarpc::context::current();
153    Ok(daemon.library_brief(ctx).await??)
154}
155
156/// initiate a rescan and wait until it's done
157async fn rescan_library(daemon: Arc<MusicPlayerClient>) -> anyhow::Result<()> {
158    let ctx = tarpc::context::current();
159
160    daemon.library_rescan(ctx).await??;
161
162    Ok(())
163}
164
165/// initiate an analysis and wait until it's done
166async fn analyze_library(daemon: Arc<MusicPlayerClient>) -> anyhow::Result<()> {
167    let ctx = tarpc::context::current();
168
169    daemon.library_analyze(ctx, false).await??;
170
171    Ok(())
172}
173
174/// initiate a recluster and wait until it's done
175async fn recluster_library(daemon: Arc<MusicPlayerClient>) -> anyhow::Result<()> {
176    let ctx = tarpc::context::current();
177
178    daemon.library_recluster(ctx).await??;
179
180    Ok(())
181}