mecomp_tui/state/
library.rs1use std::sync::Arc;
6
7use tokio::sync::{
8 broadcast,
9 mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
10};
11
12use mecomp_core::{rpc::MusicPlayerClient, state::library::LibraryFull};
13
14use crate::termination::Interrupted;
15
16use super::action::LibraryAction;
17
18#[derive(Debug, Clone)]
20#[allow(clippy::module_name_repetitions)]
21pub struct LibraryState {
22 state_tx: UnboundedSender<LibraryFull>,
23}
24
25impl LibraryState {
26 #[must_use]
28 pub fn new() -> (Self, UnboundedReceiver<LibraryFull>) {
29 let (state_tx, state_rx) = unbounded_channel::<LibraryFull>();
30
31 (Self { state_tx }, state_rx)
32 }
33
34 pub async fn main_loop(
42 &self,
43 daemon: Arc<MusicPlayerClient>,
44 mut action_rx: UnboundedReceiver<LibraryAction>,
45 mut interrupt_rx: broadcast::Receiver<Interrupted>,
46 ) -> anyhow::Result<Interrupted> {
47 let mut state = get_library(daemon.clone()).await?;
48
49 self.state_tx.send(state.clone())?;
51
52 let result = loop {
53 tokio::select! {
54 Some(action) = action_rx.recv() => {
57 match action {
58 LibraryAction::Rescan => {
59 rescan_library(daemon.clone()).await?;
60 }
61 LibraryAction::Update => {
62 state = get_library(daemon.clone()).await?;
63 self.state_tx.send(state.clone())?;
64 }
65 LibraryAction::Analyze => {
66 analyze_library(daemon.clone()).await?;
67 }
68 LibraryAction::Recluster => {
69 recluster_library(daemon.clone()).await?;
70 }
71 LibraryAction::CreatePlaylist(name) => {
72 daemon.playlist_get_or_create(tarpc::context::current(), name).await??;
73 state = get_library(daemon.clone()).await?;
74 self.state_tx.send(state.clone())?;
75 }
76 LibraryAction::RemovePlaylist(id) => {
77 debug_assert_eq!(
78 id.tb,
79 mecomp_storage::db::schemas::playlist::TABLE_NAME
80 );
81 daemon.playlist_remove(tarpc::context::current(), id).await??;
82 state = get_library(daemon.clone()).await?;
83 self.state_tx.send(state.clone())?;
84 }
85 LibraryAction::RenamePlaylist(id, name) => {
86 debug_assert_eq!(
87 id.tb,
88 mecomp_storage::db::schemas::playlist::TABLE_NAME
89 );
90 daemon.playlist_rename(tarpc::context::current(), id, name).await??;
91 state = get_library(daemon.clone()).await?;
92 self.state_tx.send(state.clone())?;
93 }
94 LibraryAction::RemoveSongsFromPlaylist(playlist, songs) => {
95 debug_assert_eq!(
96 playlist.tb,
97 mecomp_storage::db::schemas::playlist::TABLE_NAME
98 );
99 debug_assert!(songs.iter().all(|s| s.tb == mecomp_storage::db::schemas::song::TABLE_NAME));
100 daemon.playlist_remove_songs(tarpc::context::current(), playlist, songs).await??;
101 }
102 LibraryAction::AddThingsToPlaylist(playlist, things) => {
103 debug_assert_eq!(
104 playlist.tb,
105 mecomp_storage::db::schemas::playlist::TABLE_NAME
106 );
107 daemon.playlist_add_list(tarpc::context::current(), playlist, things).await??;
108 }
109 LibraryAction::CreatePlaylistAndAddThings(name, things) => {
110 let playlist = daemon.playlist_get_or_create(tarpc::context::current(), name).await??;
111 daemon.playlist_add_list(tarpc::context::current(), playlist, things).await??;
112 state = get_library(daemon.clone()).await?;
113 self.state_tx.send(state.clone())?;
114 }
115 LibraryAction::CreateDynamicPlaylist(name, query) => {
116 daemon.dynamic_playlist_create(tarpc::context::current(), name, query).await??;
117 state = get_library(daemon.clone()).await?;
118 self.state_tx.send(state.clone())?;
119 }
120 LibraryAction::RemoveDynamicPlaylist(id) => {
121 debug_assert_eq!(
122 id.tb,
123 mecomp_storage::db::schemas::dynamic::TABLE_NAME
124 );
125 daemon.dynamic_playlist_remove(tarpc::context::current(), id).await??;
126 state = get_library(daemon.clone()).await?;
127 self.state_tx.send(state.clone())?;
128 }
129 LibraryAction::UpdateDynamicPlaylist(id, changes) => {
130 debug_assert_eq!(
131 id.tb,
132 mecomp_storage::db::schemas::dynamic::TABLE_NAME
133 );
134 daemon.dynamic_playlist_update(tarpc::context::current(), id, changes).await??;
135 state = get_library(daemon.clone()).await?;
136 self.state_tx.send(state.clone())?;
137 }
138 }
139 },
140 Ok(interrupted) = interrupt_rx.recv() => {
142 break interrupted;
143 }
144 }
145 };
146
147 Ok(result)
148 }
149}
150
151async fn get_library(daemon: Arc<MusicPlayerClient>) -> anyhow::Result<LibraryFull> {
152 let ctx = tarpc::context::current();
153 Ok(daemon.library_full(ctx).await??)
154}
155
156async fn rescan_library(daemon: Arc<MusicPlayerClient>) -> anyhow::Result<()> {
158 let ctx = tarpc::context::current();
159
160 daemon.library_rescan(ctx).await??;
161
162 Ok(())
163}
164
165async fn analyze_library(daemon: Arc<MusicPlayerClient>) -> anyhow::Result<()> {
167 let ctx = tarpc::context::current();
168
169 daemon.library_analyze(ctx, false).await??;
170
171 Ok(())
172}
173
174async fn recluster_library(daemon: Arc<MusicPlayerClient>) -> anyhow::Result<()> {
176 let ctx = tarpc::context::current();
177
178 daemon.library_recluster(ctx).await??;
179
180 Ok(())
181}