mpd_utils/
multi_host_client.rs1use crate::error::{Error, Result};
2use crate::persistent_client::PersistentClient;
3use mpd_client::client::{CommandError, ConnectionEvent};
4use mpd_client::responses::{PlayState, SongInQueue, Status};
5use mpd_client::Client;
6use std::future::Future;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::broadcast::error::RecvError;
10
11pub struct MultiHostClient {
12 clients: Vec<PersistentClient>,
13}
14
15impl MultiHostClient {
16 pub fn new(hosts: Vec<String>, retry_interval: Duration) -> Self {
17 let hosts = hosts
18 .into_iter()
19 .map(|host| PersistentClient::new(host, retry_interval))
20 .collect();
21
22 Self { clients: hosts }
23 }
24
25 pub fn init(&self) {
27 for client in &self.clients {
28 client.init();
29 }
30 }
31
32 pub async fn wait_for_any_client(&self) -> Arc<Client> {
35 let waits = self
36 .clients
37 .iter()
38 .map(|client| Box::pin(client.wait_for_client()));
39 futures::future::select_all(waits).await.0
40 }
41
42 pub async fn wait_for_all_clients(&self) -> Vec<Arc<Client>> {
45 let waits = self.clients.iter().map(|client| client.wait_for_client());
46 futures::future::join_all(waits).await
47 }
48
49 async fn get_current_client(
56 &self,
57 ) -> std::result::Result<Option<&PersistentClient>, CommandError> {
58 self.wait_for_any_client().await;
59
60 let connected_clients = self
61 .clients
62 .iter()
63 .filter(|client| client.is_connected())
64 .collect::<Vec<_>>();
65
66 if connected_clients.is_empty() {
67 Ok(None)
68 } else {
69 let player_states = connected_clients.iter().map(|&client| async move {
70 client.status().await.map(|status| (client, status.state))
71 });
72
73 let player_states = futures::future::join_all(player_states)
74 .await
75 .into_iter()
76 .collect::<std::result::Result<Vec<_>, _>>();
77
78 player_states.map(|player_states| {
79 player_states
80 .iter()
81 .find(|(_, state)| state == &PlayState::Playing)
82 .or_else(|| {
83 player_states
84 .iter()
85 .find(|(_, state)| state == &PlayState::Paused)
86 })
87 .or_else(|| {
88 player_states
89 .iter()
90 .find(|(_, state)| state == &PlayState::Stopped)
91 })
92 .map(|(client, _)| *client)
93 })
94 }
95 }
96
97 pub async fn with_client<F, Fut, T>(&self, f: F) -> Result<T>
100 where
101 F: FnOnce(Arc<Client>) -> Fut,
102 Fut: Future<Output = T>,
103 {
104 let client = self.get_current_client().await;
105
106 match client {
107 Ok(Some(client)) => Ok(client.with_client(f).await),
108 Ok(None) => Err(Error::NoHostConnectedError),
109 Err(err) => Err(Error::CommandError(err)),
110 }
111 }
112
113 pub async fn recv(&mut self) -> std::result::Result<Arc<ConnectionEvent>, RecvError> {
115 let waits = self
116 .clients
117 .iter_mut()
118 .map(|client| Box::pin(client.recv()));
119 futures::future::select_all(waits).await.0
120 }
121
122 pub async fn status(&self) -> Result<Status> {
124 let client = self.get_current_client().await;
125 match client {
126 Ok(Some(client)) => client.status().await.map_err(Error::CommandError),
127 Ok(None) => Err(Error::NoHostConnectedError),
128 Err(err) => Err(Error::CommandError(err)),
129 }
130 }
131
132 pub async fn current_song(&self) -> Result<Option<SongInQueue>> {
134 match self.get_current_client().await {
135 Ok(Some(client)) => client.current_song().await.map_err(Error::CommandError),
136 Ok(None) => Err(Error::NoHostConnectedError),
137 Err(err) => Err(Error::CommandError(err)),
138 }
139 }
140}
141
142#[cfg(test)]
143mod tests {
144 use super::*;
145
146 #[tokio::test]
147 async fn test() {
148 let client = MultiHostClient::new(
149 vec!["localhost:6600".into_string(), "chloe:6600".into_string()],
150 Duration::from_secs(5),
151 );
152
153 client.init();
154 client.wait_for_all_clients().await;
155
156 let current_client = client.get_current_client().await;
157 println!("{current_client:?}");
158 }
159}