use async_net::{AsyncToSocketAddrs, TcpStream};
use futures_lite::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use itertools::Itertools;
use std::str::FromStr;
use std::{io, net::SocketAddr};
use crate::client::respmap::RespMap;
use crate::{
client::responses::{self, MixedResponse},
client::Filter,
Stats, Status, Subsystem, Track,
};
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Invalid command or arguments")]
CommandError { msg: String },
#[error("The server closed the connection")]
Disconnected,
#[error(transparent)]
IOError(#[from] io::Error),
#[error("Invalid reply to command")]
ResponseError { reply: String, errmsg: String },
#[error("invalid value error")]
ValueError { msg: String },
}
pub struct MpdClient {
bufreader: BufReader<TcpStream>,
version: String,
server_address: SocketAddr,
}
impl MpdClient {
pub async fn new<A: AsyncToSocketAddrs>(addr: A) -> Result<Self, Error> {
let stream = TcpStream::connect(addr).await?;
let server_address = stream.peer_addr()?;
let bufreader = BufReader::new(stream);
let mut s = Self {
bufreader,
version: String::new(),
server_address,
};
s.read_version().await?;
Ok(s)
}
pub async fn reconnect(&mut self) -> Result<(), Error> {
let stream = TcpStream::connect(self.server_address).await?;
let bufreader = BufReader::new(stream);
self.bufreader = bufreader;
self.read_version().await?;
Ok(())
}
async fn read_version(&mut self) -> Result<(), Error> {
self.version = self.read_resp_line().await?;
log::debug!("Connected: {}", self.version);
Ok(())
}
pub async fn stats(&mut self) -> Result<Stats, Error> {
self.cmd("stats").await?;
let lines = self.read_resp().await?;
let map = RespMap::from_string(lines);
Ok(map.into())
}
pub async fn status(&mut self) -> Result<Status, Error> {
self.cmd("status").await?;
let lines = self.read_resp().await?;
let map = RespMap::from_string(lines);
Ok(map.into())
}
pub async fn update(&mut self, path: Option<&str>) -> Result<i32, Error> {
self.cmd(Cmd::new("update", path)).await?;
let r = self.read_resp_line().await?;
let db_version = match r.split(": ").next_tuple() {
Some(("updating_db", dbv)) => dbv.parse().unwrap_or(0),
_ => 0,
};
Ok(db_version)
}
pub async fn rescan(&mut self, path: Option<&str>) -> Result<i32, Error> {
self.cmd(Cmd::new("rescan", path)).await?;
let r = self.read_resp_line().await?;
let db_version = match r.split(": ").next_tuple() {
Some(("updating_db", dbv)) => dbv.parse().unwrap_or(0),
_ => 0,
};
Ok(db_version)
}
pub async fn idle(&mut self) -> Result<Subsystem, Error> {
self.cmd("idle").await?;
let resp = self.read_resp().await?;
let mut lines = resp.lines();
let line = lines.next().unwrap_or_default();
for unexpected_line in lines {
log::warn!("More than one line in idle response: {}", unexpected_line);
}
if let Some((k, v)) = line.splitn(2, ": ").next_tuple() {
if k != "changed" {
log::warn!("k not changed");
return Err(Error::CommandError {
msg: "".to_string(),
});
}
let subsystem = Subsystem::from_str(v)?;
return Ok(subsystem);
}
Err(Error::CommandError {
msg: "".to_string(),
})
}
pub async fn noidle(&mut self) -> Result<(), Error> {
self.okcmd("noidle").await
}
pub async fn setvol(&mut self, volume: u32) -> Result<(), Error> {
self.okcmd(Cmd::new("setvol", Some(volume))).await
}
pub async fn repeat(&mut self, repeat: bool) -> Result<(), Error> {
let repeat: i32 = repeat.into();
self.okcmd(Cmd::new("repeat", Some(repeat))).await
}
pub async fn random(&mut self, random: bool) -> Result<(), Error> {
let random: i32 = random.into();
self.okcmd(Cmd::new("random", Some(random))).await
}
pub async fn consume(&mut self, consume: bool) -> Result<(), Error> {
let consume: i32 = consume.into();
self.okcmd(Cmd::new("consume", Some(consume))).await
}
pub async fn play(&mut self) -> Result<(), Error> {
self.play_pause(true).await
}
pub async fn playid(&mut self, id: u32) -> Result<(), Error> {
self.okcmd(Cmd::new("playid", Some(id))).await?;
Ok(())
}
pub async fn pause(&mut self) -> Result<(), Error> {
self.play_pause(false).await
}
pub async fn play_pause(&mut self, play: bool) -> Result<(), Error> {
let play: i32 = (!play).into();
self.okcmd(Cmd::new("pause", Some(play))).await
}
pub async fn next(&mut self) -> Result<(), Error> {
self.okcmd("next").await
}
pub async fn prev(&mut self) -> Result<(), Error> {
self.okcmd("prev").await
}
pub async fn stop(&mut self) -> Result<(), Error> {
self.okcmd("stop").await
}
pub async fn listall(&mut self, path: Option<String>) -> Result<Vec<String>, Error> {
self.cmd(Cmd::new("listall", path)).await?;
Ok(self
.read_resp()
.await?
.lines()
.filter_map(|line| {
if line.starts_with("file: ") {
Some(line[6..].to_string())
} else {
None
}
})
.collect())
}
pub async fn listallinfo(&mut self, path: Option<&str>) -> Result<Vec<MixedResponse>, Error> {
self.cmd(Cmd::new("listallinfo", path)).await?;
Ok(responses::mixed_stream(&mut self.bufreader).await?)
}
pub async fn queue_add(&mut self, path: &str) -> Result<(), Error> {
self.okcmd(Cmd::new("add", Some(path))).await
}
pub async fn queue_clear(&mut self) -> Result<(), Error> {
self.okcmd("clear").await
}
pub async fn queue(&mut self) -> Result<Vec<Track>, Error> {
self.cmd("playlistinfo").await?;
Ok(responses::tracks(&mut self.bufreader).await?)
}
pub async fn search(&mut self, filter: &Filter) -> Result<Vec<Track>, Error> {
self.cmd(Cmd::new("search", filter.to_query())).await?;
let tracks = responses::tracks(&mut self.bufreader).await?;
Ok(tracks)
}
async fn cmd(&mut self, cmd: impl Into<Cmd>) -> io::Result<()> {
let r = cmd.into().to_string();
self.bufreader.get_mut().write_all(r.as_bytes()).await?;
Ok(())
}
async fn okcmd(&mut self, cmd: impl Into<Cmd>) -> Result<(), Error> {
let r = cmd.into().to_string();
log::debug!("cmd: {}", r);
self.bufreader.get_mut().write_all(r.as_bytes()).await?;
self.read_ok_resp().await?;
Ok(())
}
async fn read_resp(&mut self) -> Result<String, Error> {
let mut v = Vec::new();
loop {
let mut line = String::new();
if self.bufreader.read_line(&mut line).await? == 0 {
return Err(Error::Disconnected);
}
let line = line.trim();
if line == "OK" {
break;
}
if line.starts_with("ACK ") {
log::trace!("Cmd error: {}", line);
return Err(Error::CommandError { msg: line.into() });
}
v.push(line.to_string())
}
Ok(v.join("\n"))
}
async fn read_resp_line(&mut self) -> Result<String, Error> {
let mut line = String::new();
self.bufreader.read_line(&mut line).await?;
Ok(line.trim().to_string())
}
async fn read_ok_resp(&mut self) -> Result<(), Error> {
let mut line = String::new();
self.bufreader.read_line(&mut line).await?;
if &line != "OK\n" {
return Err(Error::ResponseError {
reply: line.to_string(),
errmsg: "Expected OK".to_string(),
});
}
Ok(())
}
}
struct Cmd {
cmd: &'static str,
arg: Option<String>,
}
impl Cmd {
fn new<T: ToString>(cmd: &'static str, arg: Option<T>) -> Self {
Self {
cmd,
arg: arg.map(|a| a.to_string()),
}
}
fn to_string(&self) -> String {
if let Some(arg) = &self.arg {
format!("{} \"{}\"\n", self.cmd, arg.to_string())
} else {
format!("{}\n", self.cmd)
}
}
}
impl From<&'static str> for Cmd {
fn from(cmd: &'static str) -> Self {
Self { cmd, arg: None }
}
}