mpd_utils/
multi_host_client.rs

1use crate::error::{Error, Result};
2use crate::persistent_client::PersistentClient;
3use mpd_client::client::{CommandError, ConnectionEvent};
4use mpd_client::responses::{PlayState, SongInQueue, Status};
5use mpd_client::Client;
6use std::future::Future;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::broadcast::error::RecvError;
10
/// A client that talks to several MPD hosts at once.
///
/// Holds one [`PersistentClient`] per configured host and forwards commands
/// to whichever of them is currently the most relevant.
pub struct MultiHostClient {
    /// One persistent client per host, in the order the hosts were supplied.
    clients: Vec<PersistentClient>,
}
14
15impl MultiHostClient {
16    pub fn new(hosts: Vec<String>, retry_interval: Duration) -> Self {
17        let hosts = hosts
18            .into_iter()
19            .map(|host| PersistentClient::new(host, retry_interval))
20            .collect();
21
22        Self { clients: hosts }
23    }
24
25    /// Initialises each of the clients.
26    pub fn init(&self) {
27        for client in &self.clients {
28            client.init();
29        }
30    }
31
32    /// Waits until any of the clients
33    /// make a valid connection to their host.
34    pub async fn wait_for_any_client(&self) -> Arc<Client> {
35        let waits = self
36            .clients
37            .iter()
38            .map(|client| Box::pin(client.wait_for_client()));
39        futures::future::select_all(waits).await.0
40    }
41
42    /// Waits until all of the clients
43    /// make a valid connection to their host.
44    pub async fn wait_for_all_clients(&self) -> Vec<Arc<Client>> {
45        let waits = self.clients.iter().map(|client| client.wait_for_client());
46        futures::future::join_all(waits).await
47    }
48
49    /// Attempts to find the current most relevant client.
50    /// This checks for, in order:
51    ///
52    /// - A currently playing client
53    /// - A paused client (ie has items in the playlist)
54    /// - A connected client
55    async fn get_current_client(
56        &self,
57    ) -> std::result::Result<Option<&PersistentClient>, CommandError> {
58        self.wait_for_any_client().await;
59
60        let connected_clients = self
61            .clients
62            .iter()
63            .filter(|client| client.is_connected())
64            .collect::<Vec<_>>();
65
66        if connected_clients.is_empty() {
67            Ok(None)
68        } else {
69            let player_states = connected_clients.iter().map(|&client| async move {
70                client.status().await.map(|status| (client, status.state))
71            });
72
73            let player_states = futures::future::join_all(player_states)
74                .await
75                .into_iter()
76                .collect::<std::result::Result<Vec<_>, _>>();
77
78            player_states.map(|player_states| {
79                player_states
80                    .iter()
81                    .find(|(_, state)| state == &PlayState::Playing)
82                    .or_else(|| {
83                        player_states
84                            .iter()
85                            .find(|(_, state)| state == &PlayState::Paused)
86                    })
87                    .or_else(|| {
88                        player_states
89                            .iter()
90                            .find(|(_, state)| state == &PlayState::Stopped)
91                    })
92                    .map(|(client, _)| *client)
93            })
94        }
95    }
96
97    /// Runs the provided callback as soon as a connected client is available,
98    /// using the most relevant client (see `get_current_client`).
99    pub async fn with_client<F, Fut, T>(&self, f: F) -> Result<T>
100    where
101        F: FnOnce(Arc<Client>) -> Fut,
102        Fut: Future<Output = T>,
103    {
104        let client = self.get_current_client().await;
105
106        match client {
107            Ok(Some(client)) => Ok(client.with_client(f).await),
108            Ok(None) => Err(Error::NoHostConnectedError),
109            Err(err) => Err(Error::CommandError(err)),
110        }
111    }
112
113    /// Receives on all clients, returning an event from the first one to respond.
114    pub async fn recv(&mut self) -> std::result::Result<Arc<ConnectionEvent>, RecvError> {
115        let waits = self
116            .clients
117            .iter_mut()
118            .map(|client| Box::pin(client.recv()));
119        futures::future::select_all(waits).await.0
120    }
121
122    /// Runs the `status` command on the MPD server.
123    pub async fn status(&self) -> Result<Status> {
124        let client = self.get_current_client().await;
125        match client {
126            Ok(Some(client)) => client.status().await.map_err(Error::CommandError),
127            Ok(None) => Err(Error::NoHostConnectedError),
128            Err(err) => Err(Error::CommandError(err)),
129        }
130    }
131
132    /// Runs the `currentsong` command on the MPD server.
133    pub async fn current_song(&self) -> Result<Option<SongInQueue>> {
134        match self.get_current_client().await {
135            Ok(Some(client)) => client.current_song().await.map_err(Error::CommandError),
136            Ok(None) => Err(Error::NoHostConnectedError),
137            Err(err) => Err(Error::CommandError(err)),
138        }
139    }
140}
141
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: connects to two hosts and prints the selected client.
    ///
    /// NOTE(review): this appears to need live MPD servers at both addresses,
    /// since it waits for every client to connect — confirm before relying on
    /// it in CI.
    #[tokio::test]
    async fn test() {
        // `&str` has no `into_string` method (`str::into_string` takes
        // `Box<str>`), so build the owned host names with `to_string`.
        let client = MultiHostClient::new(
            vec!["localhost:6600".to_string(), "chloe:6600".to_string()],
            Duration::from_secs(5),
        );

        client.init();
        client.wait_for_all_clients().await;

        let current_client = client.get_current_client().await;
        println!("{current_client:?}");
    }
}