// mpd-utils 0.2.1
//
// Utilities for working with MPD servers, built on top of `mpd_client` and `tokio`.
// Documentation
use crate::error::{Error, Result};
use crate::persistent_client::PersistentClient;
use mpd_client::client::{CommandError, ConnectionEvent};
use mpd_client::responses::{PlayState, SongInQueue, Status};
use mpd_client::Client;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast::error::RecvError;

/// A group of [`PersistentClient`]s, one per configured host,
/// exposed through a single interface.
pub struct MultiHostClient {
    /// One client per host passed to `MultiHostClient::new`, in the same order.
    clients: Vec<PersistentClient>,
}

impl MultiHostClient {
    pub fn new(hosts: Vec<String>, retry_interval: Duration) -> Self {
        let hosts = hosts
            .into_iter()
            .map(|host| PersistentClient::new(host, retry_interval))
            .collect();

        Self { clients: hosts }
    }

    /// Initialises each of the clients.
    pub fn init(&self) {
        for client in &self.clients {
            client.init();
        }
    }

    /// Waits until any of the clients
    /// make a valid connection to their host.
    pub async fn wait_for_any_client(&self) -> Arc<Client> {
        let waits = self
            .clients
            .iter()
            .map(|client| Box::pin(client.wait_for_client()));
        futures::future::select_all(waits).await.0
    }

    /// Waits until all of the clients
    /// make a valid connection to their host.
    pub async fn wait_for_all_clients(&self) -> Vec<Arc<Client>> {
        let waits = self.clients.iter().map(|client| client.wait_for_client());
        futures::future::join_all(waits).await
    }

    /// Attempts to find the current most relevant client.
    /// This checks for, in order:
    ///
    /// - A currently playing client
    /// - A paused client (ie has items in the playlist)
    /// - A connected client
    async fn get_current_client(
        &self,
    ) -> std::result::Result<Option<&PersistentClient>, CommandError> {
        self.wait_for_any_client().await;

        let connected_clients = self
            .clients
            .iter()
            .filter(|client| client.is_connected())
            .collect::<Vec<_>>();

        if connected_clients.is_empty() {
            Ok(None)
        } else {
            let player_states = connected_clients.iter().map(|&client| async move {
                client.status().await.map(|status| (client, status.state))
            });

            let player_states = futures::future::join_all(player_states)
                .await
                .into_iter()
                .collect::<std::result::Result<Vec<_>, _>>();

            player_states.map(|player_states| {
                player_states
                    .iter()
                    .find(|(_, state)| state == &PlayState::Playing)
                    .or_else(|| {
                        player_states
                            .iter()
                            .find(|(_, state)| state == &PlayState::Paused)
                    })
                    .or_else(|| {
                        player_states
                            .iter()
                            .find(|(_, state)| state == &PlayState::Stopped)
                    })
                    .map(|(client, _)| *client)
            })
        }
    }

    /// Runs the provided callback as soon as a connected client is available,
    /// using the most relevant client (see `get_current_client`).
    pub async fn with_client<F, Fut, T>(&self, f: F) -> Result<T>
    where
        F: FnOnce(Arc<Client>) -> Fut,
        Fut: Future<Output = T>,
    {
        let client = self.get_current_client().await;

        match client {
            Ok(Some(client)) => Ok(client.with_client(f).await),
            Ok(None) => Err(Error::NoHostConnectedError),
            Err(err) => Err(Error::CommandError(err)),
        }
    }

    /// Receives on all clients, returning an event from the first one to respond.
    pub async fn recv(&mut self) -> std::result::Result<Arc<ConnectionEvent>, RecvError> {
        let waits = self
            .clients
            .iter_mut()
            .map(|client| Box::pin(client.recv()));
        futures::future::select_all(waits).await.0
    }

    /// Runs the `status` command on the MPD server.
    pub async fn status(&self) -> Result<Status> {
        let client = self.get_current_client().await;
        match client {
            Ok(Some(client)) => client.status().await.map_err(Error::CommandError),
            Ok(None) => Err(Error::NoHostConnectedError),
            Err(err) => Err(Error::CommandError(err)),
        }
    }

    /// Runs the `currentsong` command on the MPD server.
    pub async fn current_song(&self) -> Result<Option<SongInQueue>> {
        match self.get_current_client().await {
            Ok(Some(client)) => client.current_song().await.map_err(Error::CommandError),
            Ok(None) => Err(Error::NoHostConnectedError),
            Err(err) => Err(Error::CommandError(err)),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Manual smoke test: connects to two MPD hosts, waits for both,
    /// then prints whichever client is selected as the current one.
    ///
    /// Ignored by default because `wait_for_all_clients` never completes
    /// unless both hosts are reachable.
    #[tokio::test]
    #[ignore = "requires reachable MPD servers at localhost:6600 and chloe:6600"]
    async fn test() {
        let client = MultiHostClient::new(
            // `str::into_string` only exists for `Box<str>`; `to_string` is the
            // `&str` -> `String` conversion.
            vec!["localhost:6600".to_string(), "chloe:6600".to_string()],
            Duration::from_secs(5),
        );

        client.init();
        client.wait_for_all_clients().await;

        let current_client = client.get_current_client().await;
        println!("{current_client:?}");
    }
}