mecomp_tui/state/
library.rs

1//! The library state store.
2//!
//! Updates every minute, or when the user requests a rescan, adds/removes/updates a playlist, or reclusters collections.
4
5use std::sync::Arc;
6
7use tokio::sync::{
8    broadcast,
9    mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
10};
11
12use mecomp_core::{
13    errors::SerializableLibraryError, rpc::MusicPlayerClient, state::library::LibraryBrief,
14};
15use mecomp_storage::db::schemas;
16
17use crate::termination::Interrupted;
18
19use super::action::LibraryAction;
20
21/// The library state store.
22#[derive(Debug, Clone)]
23#[allow(clippy::module_name_repetitions)]
24pub struct LibraryState {
25    state_tx: UnboundedSender<LibraryBrief>,
26}
27
28impl LibraryState {
29    /// create a new library state store, and return the receiver for listening to state updates.
30    #[must_use]
31    pub fn new() -> (Self, UnboundedReceiver<LibraryBrief>) {
32        let (state_tx, state_rx) = unbounded_channel::<LibraryBrief>();
33
34        (Self { state_tx }, state_rx)
35    }
36
37    /// a loop that updates the library state every tick.
38    ///
39    /// # Errors
40    ///
41    /// Fails if the state cannot be sent
42    /// or if the daemon client can't connect to the server
43    /// or if the daemon returns an error
44    pub async fn main_loop(
45        &self,
46        daemon: Arc<MusicPlayerClient>,
47        mut action_rx: UnboundedReceiver<LibraryAction>,
48        mut interrupt_rx: broadcast::Receiver<Interrupted>,
49    ) -> anyhow::Result<Interrupted> {
50        // the initial state once
51        let state = get_library(daemon.clone()).await?;
52        self.state_tx.send(state)?;
53
54        loop {
55            tokio::select! {
56                // Handle the actions coming from the UI
57                // and process them to do async operations
58                Some(action) = action_rx.recv() => {
59                    handle_action(&self.state_tx, daemon.clone(),action).await?;
60                },
61                // Catch and handle interrupt signal to gracefully shutdown
62                Ok(interrupted) = interrupt_rx.recv() => {
63                    break Ok(interrupted);
64                }
65            }
66        }
67    }
68}
69async fn handle_action(
70    state_tx: &UnboundedSender<LibraryBrief>,
71    daemon: Arc<MusicPlayerClient>,
72    action: LibraryAction,
73) -> anyhow::Result<()> {
74    let mut update = false;
75    let mut flag_update = || update = true;
76    let current_context = tarpc::context::current;
77
78    match action {
79        LibraryAction::Rescan => rescan_library(daemon.clone()).await?,
80        LibraryAction::Update => flag_update(),
81        LibraryAction::Analyze => analyze_library(daemon.clone()).await?,
82        LibraryAction::Recluster => recluster_library(daemon.clone()).await?,
83        LibraryAction::CreatePlaylist(name) => daemon
84            .playlist_get_or_create(current_context(), name)
85            .await?
86            .map(|_| flag_update())?,
87        LibraryAction::RemovePlaylist(id) => {
88            assert_eq!(id.tb, schemas::playlist::TABLE_NAME);
89            daemon
90                .playlist_remove(current_context(), id)
91                .await?
92                .map(|()| flag_update())?;
93        }
94        LibraryAction::RenamePlaylist(id, name) => {
95            assert_eq!(id.tb, schemas::playlist::TABLE_NAME);
96            daemon
97                .playlist_rename(current_context(), id, name)
98                .await?
99                .map(|_| flag_update())?;
100        }
101        LibraryAction::RemoveSongsFromPlaylist(playlist, songs) => {
102            assert_eq!(playlist.tb, schemas::playlist::TABLE_NAME);
103            assert!(songs.iter().all(|s| s.tb == schemas::song::TABLE_NAME));
104            daemon
105                .playlist_remove_songs(current_context(), playlist, songs)
106                .await??;
107        }
108        LibraryAction::AddThingsToPlaylist(playlist, things) => {
109            assert_eq!(playlist.tb, schemas::playlist::TABLE_NAME);
110            daemon
111                .playlist_add_list(current_context(), playlist, things)
112                .await??;
113        }
114        LibraryAction::CreatePlaylistAndAddThings(name, things) => {
115            let playlist = daemon
116                .playlist_get_or_create(current_context(), name)
117                .await??;
118            daemon
119                .playlist_add_list(current_context(), playlist, things)
120                .await?
121                .map(|()| flag_update())?;
122        }
123        LibraryAction::CreateDynamicPlaylist(name, query) => daemon
124            .dynamic_playlist_create(current_context(), name, query)
125            .await?
126            .map(|_| flag_update())?,
127        LibraryAction::RemoveDynamicPlaylist(id) => {
128            assert_eq!(id.tb, schemas::dynamic::TABLE_NAME);
129            daemon
130                .dynamic_playlist_remove(current_context(), id)
131                .await?
132                .map(|()| flag_update())?;
133        }
134        LibraryAction::UpdateDynamicPlaylist(id, changes) => {
135            assert_eq!(id.tb, schemas::dynamic::TABLE_NAME);
136            daemon
137                .dynamic_playlist_update(current_context(), id, changes)
138                .await?
139                .map(|_| flag_update())?;
140        }
141    }
142
143    if update {
144        let state = get_library(daemon).await?;
145        state_tx.send(state)?;
146    }
147
148    Ok(())
149}
150
151async fn get_library(daemon: Arc<MusicPlayerClient>) -> anyhow::Result<LibraryBrief> {
152    let ctx = tarpc::context::current();
153    Ok(daemon.library_brief(ctx).await??)
154}
155
156/// initiate a rescan and wait until it's done
157async fn rescan_library(daemon: Arc<MusicPlayerClient>) -> anyhow::Result<()> {
158    let ctx = tarpc::context::current();
159
160    // don't error out is a rescan is in progress
161    match daemon.library_rescan(ctx).await? {
162        Ok(()) | Err(SerializableLibraryError::RescanInProgress) => Ok(()),
163        Err(e) => Err(e.into()),
164    }
165}
166
167/// initiate an analysis and wait until it's done
168async fn analyze_library(daemon: Arc<MusicPlayerClient>) -> anyhow::Result<()> {
169    let ctx = tarpc::context::current();
170
171    // don't error out if an analysis is in progress
172    match daemon.library_analyze(ctx, false).await? {
173        Ok(()) | Err(SerializableLibraryError::AnalysisInProgress) => Ok(()),
174        Err(e) => Err(e.into()),
175    }
176}
177
178/// initiate a recluster and wait until it's done
179async fn recluster_library(daemon: Arc<MusicPlayerClient>) -> anyhow::Result<()> {
180    let ctx = tarpc::context::current();
181
182    match daemon.library_recluster(ctx).await? {
183        Ok(()) | Err(SerializableLibraryError::ReclusterInProgress) => Ok(()),
184        Err(e) => Err(e.into()),
185    }
186}