use crate::error_from;
use async_trait::async_trait;
use boolinator::Boolinator;
use log::{debug, info};
use regex::Regex;
use snafu::{Backtrace, GenerateBacktrace, OptionExt, Snafu};
use tokio::{
net::{TcpStream, ToSocketAddrs, UnixStream},
prelude::*,
};
use std::{
collections::HashMap,
convert::TryFrom,
fmt,
marker::{Send, Unpin},
path::{Path, PathBuf},
str::FromStr,
};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("{}", cause))]
Other {
#[snafu(source(true))]
cause: Box<dyn std::error::Error>,
#[snafu(backtrace(true))]
back: Backtrace,
},
#[snafu(display("Failed to add URI: `{}'", text))]
Add {
text: String,
#[snafu(backtrace(true))]
back: Backtrace,
},
#[snafu(display("Failed to connect to the mpd server: `{}'", text))]
Connect {
text: String,
#[snafu(backtrace(true))]
back: Backtrace,
},
#[snafu(display("Search failed: {}", text))]
Find {
text: String,
#[snafu(backtrace(true))]
back: Backtrace,
},
#[snafu(display("Failed to fetch all song URIs: {}", text))]
GetAllSongs {
text: String,
#[snafu(backtrace(true))]
back: Backtrace,
},
#[snafu(display("Failed to read messages: `{}'", text))]
GetMessages {
text: String,
#[snafu(backtrace(true))]
back: Backtrace,
},
#[snafu(display("Failed to find stickers: `{}'", text))]
GetStickers {
text: String,
#[snafu(backtrace(true))]
back: Backtrace,
},
#[snafu(display("Failed to set sticker: `{}'", text))]
StickerSet {
text: String,
#[snafu(backtrace(true))]
back: Backtrace,
},
#[snafu(display("Failed to get sticker: `{}'", text))]
StickerGet {
text: String,
#[snafu(backtrace(true))]
back: Backtrace,
},
#[snafu(display("Failed to send message: `{}'", text))]
SendMessage {
text: String,
#[snafu(backtrace(true))]
back: Backtrace,
},
#[snafu(display("Failed to subscribe to our subsystems of interest: `{}'", text))]
Subscribe {
text: String,
#[snafu(backtrace(true))]
back: Backtrace,
},
#[snafu(display("Failed to idle: `{}'", text))]
Idle {
text: String,
#[snafu(backtrace(true))]
back: Backtrace,
},
#[snafu(display(
"Uknown subsystem `{}'returned in respones to the `idle' command. This is \
likely a bug; please consider reporting it to sp1ff@pobox.com",
text
))]
IdleSubsystem {
text: String,
#[snafu(backtrace(true))]
back: Backtrace,
},
#[snafu(display("Couldn't parse play state from {}", text))]
PlayState {
text: String,
#[snafu(backtrace(true))]
back: Backtrace,
},
#[snafu(display("Couldn't parse songid from {}", text))]
SongId {
text: String,
#[snafu(backtrace(true))]
back: Backtrace,
},
#[snafu(display("Couldn't parse elapsed from {}", text))]
Elapsed {
text: String,
#[snafu(backtrace(true))]
back: Backtrace,
},
#[snafu(display("Couldn't parse song file from {}", text))]
SongFile {
text: String,
#[snafu(backtrace(true))]
back: Backtrace,
},
#[snafu(display("Couldn't parse elapsed from {}", text))]
Duration {
text: String,
#[snafu(backtrace(true))]
back: Backtrace,
},
#[snafu(display("Unknown player state {}", state))]
UnknownState {
state: String,
#[snafu(backtrace(true))]
back: Backtrace,
},
#[snafu(display("Failed to update DB: `{}'", text))]
UpdateDB {
text: String,
#[snafu(backtrace(true))]
back: Backtrace,
},
#[snafu(display("Failed to list stored playlists: `{}'", text))]
GetStoredPlaylists {
text: String,
#[snafu(backtrace(true))]
back: Backtrace,
},
#[snafu(display("Failed to convert sticker {}: {}", sticker, error))]
BadStickerConversion {
sticker: String,
error: String,
#[snafu(backtrace(true))]
back: Backtrace,
},
}
error_from!(crate::commands::Error);
error_from!(regex::Error);
error_from!(std::io::Error);
error_from!(std::num::ParseFloatError);
error_from!(std::num::ParseIntError);
error_from!(std::string::FromUtf8Error);
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Clone, Debug)]
pub struct CurrentSong {
pub songid: u64,
pub file: std::path::PathBuf,
pub elapsed: f64,
pub duration: f64,
}
impl CurrentSong {
fn new(songid: u64, file: std::path::PathBuf, elapsed: f64, duration: f64) -> CurrentSong {
CurrentSong {
songid: songid,
file: file,
elapsed: elapsed,
duration: duration,
}
}
pub fn played_pct(&self) -> f64 {
self.elapsed / self.duration
}
}
#[derive(Clone, Debug)]
pub enum PlayerStatus {
Play(CurrentSong),
Pause(CurrentSong),
Stopped,
}
impl PlayerStatus {
pub fn current_song(&self) -> Option<&CurrentSong> {
match self {
PlayerStatus::Play(curr) | PlayerStatus::Pause(curr) => Some(curr),
PlayerStatus::Stopped => None,
}
}
}
#[async_trait]
pub trait RequestResponse {
async fn req(&mut self, msg: &str) -> Result<String>;
async fn req_w_hint(&mut self, msg: &str, hint: usize) -> Result<String>;
}
#[cfg(test)]
pub mod test_mock {
use super::*;
pub struct Mock {
inmsgs: Vec<String>,
outmsgs: Vec<String>,
}
impl Mock {
pub fn new(convo: &[(&str, &str)]) -> Mock {
let (left, right): (Vec<&str>, Vec<&str>) = convo.iter().copied().rev().unzip();
Mock {
inmsgs: left.iter().map(|x| x.to_string()).collect(),
outmsgs: right.iter().map(|x| x.to_string()).collect(),
}
}
}
#[async_trait]
impl RequestResponse for Mock {
async fn req(&mut self, msg: &str) -> Result<String> {
self.req_w_hint(msg, 512).await
}
async fn req_w_hint(&mut self, msg: &str, _hint: usize) -> Result<String> {
assert_eq!(msg, self.inmsgs.pop().unwrap());
Ok(self.outmsgs.pop().unwrap())
}
}
#[tokio::test]
async fn mock_smoke_test() {
let mut mock = Mock::new(&[("ping", "pong"), ("from", "to")]);
assert_eq!(mock.req("ping").await.unwrap(), "pong");
assert_eq!(mock.req("from").await.unwrap(), "to");
}
#[tokio::test]
#[should_panic]
async fn mock_negative_test() {
let mut mock = Mock::new(&[("ping", "pong")]);
assert_eq!(mock.req("ping").await.unwrap(), "pong");
let _should_panic = mock.req("not there!").await.unwrap();
}
}
pub struct MpdConnection<T: AsyncRead + AsyncWrite + Send + Unpin> {
sock: T,
_proto_ver: String,
}
#[async_trait]
impl<T> RequestResponse for MpdConnection<T>
where
T: AsyncRead + AsyncWrite + Send + Unpin,
{
async fn req(&mut self, msg: &str) -> Result<String> {
self.req_w_hint(msg, 512).await
}
async fn req_w_hint(&mut self, msg: &str, hint: usize) -> Result<String> {
self.sock.write_all(format!("{}\n", msg).as_bytes()).await?;
let mut buf = Vec::with_capacity(hint);
let mut cb = 0; let mut more = true; while more {
cb += self.sock.read_buf(&mut buf).await?;
if cb > 2 && char::from(buf[cb - 1]) == '\n' {
let mut idx = cb - 2;
while idx > 0 {
if char::from(buf[idx]) == '\n' {
idx += 1;
break;
}
idx -= 1;
}
if (idx + 2 < cb && char::from(buf[idx]) == 'O' && char::from(buf[idx + 1]) == 'K')
|| (idx + 3 < cb
&& char::from(buf[idx]) == 'A'
&& char::from(buf[idx + 1]) == 'C'
&& char::from(buf[idx + 2]) == 'K')
{
more = false;
}
}
}
Ok(String::from_utf8(buf)?)
}
}
async fn parse_connect_rsp<T>(sock: &mut T) -> Result<String>
where
T: AsyncRead + AsyncWrite + Send + Unpin,
{
let mut buf = Vec::with_capacity(32);
let _cb = sock.read_buf(&mut buf).await?;
let text = String::from_utf8(buf)?;
text.starts_with("OK MPD ").as_option().context(Connect {
text: text.trim().to_string(),
})?;
info!("Connected {}.", text[7..].trim());
Ok(text[7..].trim().to_string())
}
impl MpdConnection<TcpStream> {
pub async fn new<A: ToSocketAddrs>(addr: A) -> Result<Box<dyn RequestResponse>> {
let mut sock = TcpStream::connect(addr).await?;
let proto_ver = parse_connect_rsp(&mut sock).await?;
Ok(Box::new(MpdConnection::<TcpStream> {
sock: sock,
_proto_ver: proto_ver,
}))
}
}
impl MpdConnection<UnixStream> {
pub async fn new<P: AsRef<Path>>(pth: P) -> Result<Box<dyn RequestResponse>> {
let mut sock = UnixStream::connect(pth).await?;
let proto_ver = parse_connect_rsp(&mut sock).await?;
Ok(Box::new(MpdConnection::<UnixStream> {
sock: sock,
_proto_ver: proto_ver,
}))
}
}
pub fn quote(text: &str) -> String {
if text.contains(&[' ', '\t', '\'', '"'][..]) {
let mut s = String::from("\"");
for c in text.chars() {
if c == '"' || c == '\\' {
s.push('\\');
}
s.push(c);
}
s.push('"');
s
} else {
text.to_string()
}
}
pub struct Client {
stream: Box<dyn RequestResponse>,
re_state: Regex,
re_songid: Regex,
re_elapsed: Regex,
re_file: Regex,
re_duration: Regex,
}
const RE_STATE: &str = r"(?m)^state: (play|pause|stop)$";
const RE_SONGID: &str = r"(?m)^songid: ([0-9]+)$";
const RE_ELAPSED: &str = r"(?m)^elapsed: ([.0-9]+)$";
const RE_FILE: &str = r"(?m)^file: (.*)$";
const RE_DURATION: &str = r"(?m)^duration: (.*)$";
impl Client {
pub async fn connect<A: ToSocketAddrs>(addrs: A) -> Result<Client> {
Self::new(MpdConnection::<TcpStream>::new(addrs).await?)
}
pub async fn open<P: AsRef<Path>>(pth: P) -> Result<Client> {
Self::new(MpdConnection::<UnixStream>::new(pth).await?)
}
pub fn new(stream: Box<dyn RequestResponse>) -> Result<Client> {
Ok(Client {
stream: stream,
re_state: Regex::new(&RE_STATE)?,
re_songid: Regex::new(RE_SONGID)?,
re_elapsed: Regex::new(RE_ELAPSED)?,
re_file: Regex::new(RE_FILE)?,
re_duration: Regex::new(RE_DURATION)?,
})
}
}
fn cap<C1, C2>(re: ®ex::Regex, text: &str, ctx1: C1, ctx2: C2) -> Result<String>
where
C1: snafu::IntoError<Error, Source = snafu::NoneError>,
C2: snafu::IntoError<Error, Source = snafu::NoneError>,
{
Ok(re
.captures(text)
.context(ctx1)?
.get(1)
.context(ctx2)?
.as_str()
.to_string())
}
impl Client {
pub async fn status(&mut self) -> Result<PlayerStatus> {
let text = self.stream.req("status").await?;
let state = cap(
&self.re_state,
&text,
PlayState {
text: text.to_string(),
},
PlayState {
text: text.to_string(),
},
)?;
match state.as_str() {
"stop" => Ok(PlayerStatus::Stopped),
"play" | "pause" => {
let songid = cap(
&self.re_songid,
&text,
SongId {
text: text.to_string(),
},
SongId {
text: text.to_string(),
},
)?
.parse::<u64>()?;
let elapsed = cap(
&self.re_elapsed,
&text,
Elapsed {
text: text.to_string(),
},
Elapsed {
text: text.to_string(),
},
)?
.parse::<f64>()?;
let text = self.stream.req(&format!("playlistid {}", songid)).await?;
let file = cap(
&self.re_file,
&text,
SongFile {
text: text.to_string(),
},
SongFile {
text: text.to_string(),
},
)?;
let duration = cap(
&self.re_duration,
&text,
Duration {
text: text.to_string(),
},
Duration {
text: text.to_string(),
},
)?
.parse::<f64>()?;
let curr = CurrentSong::new(songid, PathBuf::from(file), elapsed, duration);
if state == "play" {
Ok(PlayerStatus::Play(curr))
} else {
Ok(PlayerStatus::Pause(curr))
}
}
_ => Err(Error::UnknownState {
state: state.to_string(),
back: Backtrace::generate(),
}),
}
}
pub async fn get_sticker<T: FromStr>(
&mut self,
file: &str,
sticker_name: &str,
) -> Result<Option<T>>
where
<T as FromStr>::Err: std::error::Error,
{
let msg = format!("sticker get song \"{}\" \"{}\"", file, sticker_name);
let text = self.stream.req(&msg).await?;
debug!("Sent message `{}'; got `{}'", &msg, &text);
let prefix = format!("sticker: {}=", sticker_name);
if text.starts_with(&prefix) {
let s = text[prefix.len()..]
.split('\n')
.next()
.context(StickerGet { text: text.clone() })?;
let r = T::from_str(s);
match r {
Ok(t) => Ok(Some(t)),
Err(e) => Err(Error::BadStickerConversion {
sticker: String::from(sticker_name),
error: format!("{}", e),
back: Backtrace::generate(),
}),
}
} else if text.starts_with("ACK ") && text.ends_with("no such sticker\n") {
Ok(None)
} else {
Err(Error::StickerGet {
text: text.to_string(),
back: Backtrace::generate(),
})
}
}
pub async fn set_sticker<T: std::fmt::Display>(
&mut self,
file: &str,
sticker_name: &str,
sticker_value: &T,
) -> Result<()> {
let msg = format!(
"sticker set song \"{}\" \"{}\" \"{}\"",
file, sticker_name, sticker_value
);
let text = self.stream.req(&msg).await?;
debug!("Sent `{}'; got `{}'", &msg, &text);
text.starts_with("OK").as_option().context(StickerSet {
text: text.to_string(),
})?;
Ok(())
}
pub async fn send_to_playlist(&mut self, file: &str, pl: &str) -> Result<()> {
let msg = format!("playlistadd \"{}\" \"{}\"", pl, file);
let text = self.stream.req(&msg).await?;
debug!("Sent `{}'; got `{}'.", &msg, &text);
text.starts_with("OK").as_option().context(StickerSet {
text: text.to_string(),
})?;
Ok(())
}
pub async fn send_message(&mut self, chan: &str, msg: &str) -> Result<()> {
let msg = format!("sendmessage {} {}", chan, quote(msg));
let text = self.stream.req(&msg).await?;
debug!("Sent `{}'; got `{}'.", &msg, &text);
text.starts_with("OK").as_option().context(SendMessage {
text: text.to_string(),
})?;
Ok(())
}
pub async fn update(&mut self, uri: &str) -> Result<u64> {
let msg = format!("update \"{}\"", uri);
let text = self.stream.req(&msg).await?;
debug!("Sent `{}'; got `{}'.", &msg, &text);
let prefix = "updating_db: ";
if text.starts_with(prefix) {
Ok(text[prefix.len()..].split('\n').collect::<Vec<&str>>()[0]
.to_string()
.parse::<u64>()?)
} else {
Err(Error::UpdateDB {
text: text.to_string(),
back: Backtrace::generate(),
})
}
}
pub async fn get_stored_playlists(&mut self) -> Result<std::vec::Vec<String>> {
let text = self.stream.req("listplaylists").await?;
debug!("Sent listplaylists; got `{}'.", &text);
if text.starts_with("ACK") {
Err(Error::GetStoredPlaylists {
text: text.to_string(),
back: Backtrace::generate(),
})
} else {
Ok(text
.lines()
.filter_map(|x| {
if x.starts_with("playlist: ") {
Some(String::from(&x[10..]))
} else {
None
}
})
.collect::<Vec<String>>())
}
}
fn search_rsp_to_uris(&self, text: &str) -> Result<std::vec::Vec<String>> {
if text.starts_with("ACK") {
Err(Error::Find {
text: text.to_string(),
back: Backtrace::generate(),
})
} else {
Ok(text
.lines()
.filter_map(|x| {
if x.starts_with("file: ") {
Some(String::from(&x[6..]))
} else {
None
}
})
.collect::<Vec<String>>())
}
}
pub async fn find1(
&mut self,
cond: &str,
val: &str,
case: bool,
) -> Result<std::vec::Vec<String>> {
let cmd = format!(
"{} {}",
if case { "find" } else { "search" },
quote(&format!("({} {})", cond, val))
);
let text = self.stream.req(&cmd).await?;
self.search_rsp_to_uris(&text)
}
pub async fn find2(
&mut self,
attr: &str,
op: &str,
val: &str,
case: bool,
) -> Result<std::vec::Vec<String>> {
let cmd = format!(
"{} {}",
if case { "find" } else { "search" },
quote(&format!("({} {} {})", attr, op, val))
);
debug!("find2 sending ``{}''", cmd);
let text = self.stream.req(&cmd).await?;
self.search_rsp_to_uris(&text)
}
pub async fn get_stickers(&mut self, sticker: &str) -> Result<HashMap<String, String>> {
let text = self
.stream
.req(&format!("sticker find song \"\" {}", sticker))
.await?;
if text.starts_with("ACK") {
Err(Error::GetStickers {
text: text.to_string(),
back: Backtrace::generate(),
})
} else {
let mut m = HashMap::new();
let mut lines = text.lines();
loop {
let file = lines.next().context(GetStickers { text: text.clone() })?;
if "OK" == file {
break;
}
let val = lines.next().context(GetStickers { text: text.clone() })?;
m.insert(
String::from(&file[6..]),
String::from(&val[10 + sticker.len()..]),
);
}
Ok(m)
}
}
pub async fn get_all_songs(&mut self) -> Result<std::vec::Vec<String>> {
let text = self.stream.req("find \"(base '')\"").await?;
if text.starts_with("ACK") {
Err(Error::GetAllSongs {
text: text.to_string(),
back: Backtrace::generate(),
})
} else {
Ok(text
.lines()
.filter_map(|x| {
if x.starts_with("file: ") {
Some(String::from(&x[6..]))
} else {
None
}
})
.collect::<Vec<String>>())
}
}
pub async fn add(&mut self, uri: &str) -> Result<()> {
let msg = format!("add {}", quote(uri));
let text = self.stream.req(&msg).await?;
debug!("Sent `{}'; got `{}'.", &msg, &text);
text.starts_with("OK").as_option().context(Add {
text: text.to_string(),
})?;
Ok(())
}
}
#[cfg(test)]
mod client_tests {
use super::test_mock::Mock;
use super::*;
#[tokio::test]
async fn client_smoke_test() {
let mock = Box::new(Mock::new(&[(
"sticker get song \"foo.mp3\" \"stick\"",
"sticker: stick=splat\nOK\n",
)]));
let mut cli = Client::new(mock).unwrap();
let val = cli
.get_sticker::<String>("foo.mp3", "stick")
.await
.unwrap()
.unwrap();
assert_eq!(val, "splat");
}
#[tokio::test]
async fn test_status() {
let mock = Box::new(Mock::new(&[
(
"status",
"volume: -1
repeat: 0
random: 0
single: 0
consume: 0
playlist: 3
playlistlength: 87
mixrampdb: 0.000000
state: play
song: 14
songid: 15
time: 141:250
bitrate: 128
audio: 44100:24:2
nextsong: 15
nextsongid: 16
elapsed: 140.585
OK",
),
(
"playlistid 15",
"file: U-Z/U2 - Who's Gonna RIDE Your WILD HORSES.mp3
Last-Modified: 2004-12-24T19:26:13Z
Artist: U2
Title: Who's Gonna RIDE Your WILD HOR
Genre: Pop
Time: 316
Pos: 41
Id: 42
duration: 249.994
OK",
),
(
"status",
"volume: -1
repeat: 0
random: 0
single: 0
consume: 0
playlist: 84
playlistlength: 27
mixrampdb: 0.000000
state: stop
OK",
),
(
"status",
"volume: -1
repeat: 0
state: no-idea!?",
),
]));
let mut cli = Client::new(mock).unwrap();
let stat = cli.status().await.unwrap();
match stat {
PlayerStatus::Play(curr) => {
assert_eq!(curr.songid, 15);
assert_eq!(
curr.file.to_str().unwrap(),
"U-Z/U2 - Who's Gonna RIDE Your WILD HORSES.mp3"
);
assert_eq!(curr.elapsed, 140.585);
assert_eq!(curr.duration, 249.994);
}
_ => panic!(),
}
let stat = cli.status().await.unwrap();
match stat {
PlayerStatus::Stopped => (),
_ => panic!(),
}
let stat = cli.status().await;
match stat {
Err(_) => (),
Ok(_) => panic!(),
}
}
#[tokio::test]
async fn test_get_sticker() {
let mock = Box::new(Mock::new(&[
(
"sticker get song \"foo.mp3\" \"stick\"",
"sticker: stick=2\nOK\n",
),
(
"sticker get song \"foo.mp3\" \"stick\"",
"ACK [50@0] {sticker} no such sticker\n",
),
(
"sticker get song \"foo.mp3\" \"stick\"",
"",
),
]));
let mut cli = Client::new(mock).unwrap();
let val = cli
.get_sticker::<String>("foo.mp3", "stick")
.await
.unwrap()
.unwrap();
assert_eq!(val, "2");
let _val = cli
.get_sticker::<String>("foo.mp3", "stick")
.await
.unwrap()
.is_none();
let _val = cli
.get_sticker::<String>("foo.mp3", "stick")
.await
.unwrap_err();
}
#[tokio::test]
async fn test_set_sticker() {
let mock = Box::new(Mock::new(&[
("sticker set song \"foo.mp3\" \"stick\" \"2\"", "OK\n"),
(
"sticker set song \"foo.mp3\" \"stick\" \"2\"",
"ACK [50@0] {sticker} some error",
),
(
"sticker set song \"foo.mp3\" \"stick\" \"2\"",
"this makes no sense as a response",
),
]));
let mut cli = Client::new(mock).unwrap();
let _val = cli.set_sticker("foo.mp3", "stick", &"2").await.unwrap();
let _val = cli.set_sticker("foo.mp3", "stick", &"2").await.unwrap_err();
let _val = cli.set_sticker("foo.mp3", "stick", &"2").await.unwrap_err();
}
#[tokio::test]
async fn test_send_to_playlist() {
let mock = Box::new(Mock::new(&[
("playlistadd \"foo.m3u\" \"foo.mp3\"", "OK\n"),
(
"playlistadd \"foo.m3u\" \"foo.mp3\"",
"ACK [101@0] {playlist} some error\n",
),
]));
let mut cli = Client::new(mock).unwrap();
let _val = cli.send_to_playlist("foo.mp3", "foo.m3u").await.unwrap();
let _val = cli
.send_to_playlist("foo.mp3", "foo.m3u")
.await
.unwrap_err();
}
#[tokio::test]
async fn test_update() {
let mock = Box::new(Mock::new(&[
("update \"foo.mp3\"", "updating_db: 2\nOK\n"),
("update \"foo.mp3\"", "ACK [50@0] {update} blahblahblah"),
("update \"foo.mp3\"", "this makes no sense as a response"),
]));
let mut cli = Client::new(mock).unwrap();
let _val = cli.update("foo.mp3").await.unwrap();
let _val = cli.update("foo.mp3").await.unwrap_err();
let _val = cli.update("foo.mp3").await.unwrap_err();
}
#[tokio::test]
async fn test_get_stored_playlists() {
let mock = Box::new(Mock::new(&[
(
"listplaylists",
"playlist: saturday-afternoons-in-santa-cruz
Last-Modified: 2020-03-13T17:20:16Z
playlist: gaelic-punk
Last-Modified: 2020-05-24T00:36:02Z
playlist: morning-coffee
Last-Modified: 2020-03-13T17:20:16Z
OK
",
),
("listplaylists", "ACK [1@0] {listplaylists} blahblahblah"),
]));
let mut cli = Client::new(mock).unwrap();
let val = cli.get_stored_playlists().await.unwrap();
assert_eq!(
val,
vec![
String::from("saturday-afternoons-in-santa-cruz"),
String::from("gaelic-punk"),
String::from("morning-coffee")
]
);
let _val = cli.get_stored_playlists().await.unwrap_err();
}
}
#[derive(Debug, PartialEq, Eq)]
pub enum IdleSubSystem {
Player,
Message,
}
impl TryFrom<&str> for IdleSubSystem {
type Error = Error;
fn try_from(text: &str) -> std::result::Result<Self, Self::Error> {
let x = text.to_lowercase();
if x == "player" {
Ok(IdleSubSystem::Player)
} else if x == "message" {
Ok(IdleSubSystem::Message)
} else {
Err(Error::IdleSubsystem {
text: String::from(text),
back: Backtrace::generate(),
})
}
}
}
impl fmt::Display for IdleSubSystem {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
IdleSubSystem::Player => write!(f, "Player"),
IdleSubSystem::Message => write!(f, "Message"),
}
}
}
pub struct IdleClient {
conn: Box<dyn RequestResponse>,
}
impl IdleClient {
pub async fn connect<A: ToSocketAddrs>(addr: A) -> Result<IdleClient> {
Self::new(MpdConnection::<TcpStream>::new(addr).await?)
}
pub async fn open<P: AsRef<Path>>(pth: P) -> Result<IdleClient> {
Self::new(MpdConnection::<UnixStream>::new(pth).await?)
}
pub fn new(stream: Box<dyn RequestResponse>) -> Result<IdleClient> {
Ok(IdleClient { conn: stream })
}
pub async fn subscribe(&mut self, chan: &str) -> Result<()> {
let text = self.conn.req(&format!("subscribe {}", chan)).await?;
debug!("Sent subscribe message for {}; got `{}'.", chan, text);
text.starts_with("OK").as_option().context(Subscribe {
text: String::from(text),
})?;
debug!("Subscribed to {}.", chan);
Ok(())
}
pub async fn idle(&mut self) -> Result<IdleSubSystem> {
let text = self.conn.req("idle player message").await?;
debug!("Sent idle message; got `{}'.", text);
text.starts_with("changed: ").as_option().context(Idle {
text: String::from(&text),
})?;
let idx = text.find('\n').context(Idle {
text: String::from(&text),
})?;
let result = IdleSubSystem::try_from(&text[9..idx])?;
let text = text[idx + 1..].to_string();
text.starts_with("OK").as_option().context(Idle {
text: String::from(text),
})?;
Ok(result)
}
pub async fn get_messages(&mut self) -> Result<HashMap<String, Vec<String>>> {
let text = self.conn.req("readmessages").await?;
debug!("Sent readmessages; got `{}'.", text);
let mut m: HashMap<String, Vec<String>> = HashMap::new();
enum State {
Init,
Running,
Finished,
}
let mut state = State::Init;
let mut chan = String::new();
let mut msgs: Vec<String> = Vec::new();
for line in text.lines() {
match state {
State::Init => {
line.starts_with("channel: ")
.as_option()
.context(GetMessages {
text: String::from(line),
})?;
chan = String::from(&line[9..]);
state = State::Running;
}
State::Running => {
if line.starts_with("message: ") {
msgs.push(String::from(&line[9..]));
} else if line.starts_with("channel: ") {
match m.get_mut(&chan) {
Some(v) => v.append(&mut msgs),
None => {
m.insert(chan.clone(), msgs.clone());
}
}
chan = String::from(&line[9..]);
msgs = Vec::new();
} else if line == "OK" {
match m.get_mut(&chan) {
Some(v) => v.append(&mut msgs),
None => {
m.insert(chan.clone(), msgs.clone());
}
}
state = State::Finished;
} else {
return Err(Error::GetMessages {
text: String::from(line),
back: Backtrace::generate(),
});
}
}
State::Finished => {
return Err(Error::GetMessages {
text: String::from(line),
back: Backtrace::generate(),
});
}
}
}
Ok(m)
}
}
#[cfg(test)]
mod idle_client_tests {
use super::test_mock::Mock;
use super::*;
#[tokio::test]
async fn test_get_messages() {
let mock = Box::new(Mock::new(&[(
"readmessages",
"channel: ratings
message: 255
message: 128
channel: send-to-playlist
message: foo.m3u
OK
",
)]));
let mut cli = IdleClient::new(mock).unwrap();
let hm = cli.get_messages().await.unwrap();
let val = hm.get("ratings").unwrap();
assert_eq!(val.len(), 2);
let val = hm.get("send-to-playlist").unwrap();
assert!(val.len() == 1);
}
#[tokio::test]
async fn test_issue_1() {
let mock = Box::new(Mock::new(&[(
"readmessages",
"channel: playcounts
message: a
channel: playcounts
message: b
OK
",
)]));
let mut cli = IdleClient::new(mock).unwrap();
let hm = cli.get_messages().await.unwrap();
let val = hm.get("playcounts").unwrap();
assert_eq!(val.len(), 2);
}
}