use async_trait::async_trait;
use regex::Regex;
use snafu::{Backtrace, IntoError, OptionExt, ResultExt, prelude::*};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::net::{TcpStream, ToSocketAddrs, UnixStream};
use tracing::{debug, info};
use lazy_static::lazy_static;
use std::{
collections::HashMap,
convert::TryFrom,
fmt,
marker::{Send, Unpin},
path::{Path, PathBuf},
str::FromStr,
};
/// Names the client operation an error is attributed to.
///
/// The error variants in this file carry one of these in their `op` field so
/// that the rendered message says which request went wrong.
#[derive(Debug)]
#[non_exhaustive]
pub enum Operation {
    Connect,
    Status,
    GetSticker,
    SetSticker,
    SendToPlaylist,
    SendMessage,
    Update,
    GetStoredPlaylists,
    RspToUris,
    GetStickers,
    GetAllSongs,
    Add,
    Idle,
    GetMessages,
}

/// Renders an operation as its variant name.
impl std::fmt::Display for Operation {
    // The fallback arm is unreachable while every variant is listed; the
    // `allow` keeps it from producing a warning.
    #[allow(unreachable_patterns)]
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let label = match self {
            Self::Connect => "Connect",
            Self::Status => "Status",
            Self::GetSticker => "GetSticker",
            Self::SetSticker => "SetSticker",
            Self::SendToPlaylist => "SendToPlaylist",
            Self::SendMessage => "SendMessage",
            Self::Update => "Update",
            Self::GetStoredPlaylists => "GetStoredPlaylists",
            Self::RspToUris => "RspToUris",
            Self::GetStickers => "GetStickers",
            Self::GetAllSongs => "GetAllSongs",
            Self::Add => "Add",
            Self::Idle => "Idle",
            Self::GetMessages => "GetMessages",
            _ => "Unknown client operation",
        };
        f.write_str(label)
    }
}
#[derive(Debug, Snafu)]
#[non_exhaustive]
pub enum Error {
#[snafu(display("Protocol error ({}): {}", op, msg))]
Protocol {
op: Operation,
msg: String,
backtrace: Backtrace,
},
#[snafu(display("Protocol errror ({}): {}", op, source))]
ProtocolConv {
op: Operation,
source: Box<dyn std::error::Error>,
backtrace: Backtrace,
},
#[snafu(display("I/O error: {}", source))]
Io {
source: std::io::Error,
backtrace: Backtrace,
},
#[snafu(display("Encoding error: {}", source))]
Encoding {
buf: Vec<u8>,
source: std::string::FromUtf8Error,
backtrace: Backtrace,
},
#[snafu(display("While converting sticker ``{}'': {}", sticker, source))]
StickerConversion {
sticker: String,
source: Box<dyn std::error::Error>,
backtrace: Backtrace,
},
#[snafu(display("``{}'' is not a recognized Idle subsystem", text))]
IdleSubSystem { text: String, backtrace: Backtrace },
}
/// Shorthand for a `std::result::Result` whose error type is this module's `Error`.
pub type Result<T> = std::result::Result<T, Error>;
/// The song a player is positioned on, together with its playback progress.
#[derive(Clone, Debug)]
pub struct CurrentSong {
    /// Identifier of the song.
    pub songid: u64,
    /// Path of the song.
    pub file: std::path::PathBuf,
    /// How much of the song has been played (presumably seconds — confirm against the server).
    pub elapsed: f64,
    /// Total length of the song, in the same unit as `elapsed`.
    pub duration: f64,
}

impl CurrentSong {
    /// Bundle the four values into a `CurrentSong`; no validation is performed.
    fn new(songid: u64, file: std::path::PathBuf, elapsed: f64, duration: f64) -> CurrentSong {
        Self {
            songid,
            file,
            elapsed,
            duration,
        }
    }

    /// Fraction of the song that has been played: `elapsed / duration`.
    ///
    /// Despite the name the value is a plain ratio (not multiplied by 100). A
    /// zero `duration` follows ordinary floating-point division rules.
    pub fn played_pct(&self) -> f64 {
        let Self {
            elapsed, duration, ..
        } = self;
        *elapsed / *duration
    }
}
/// State of the player: playing or paused on a particular song, or stopped.
#[derive(Clone, Debug)]
pub enum PlayerStatus {
    Play(CurrentSong),
    Pause(CurrentSong),
    Stopped,
}

impl PlayerStatus {
    /// Borrow the song carried by `Play` or `Pause`; `None` when stopped.
    pub fn current_song(&self) -> Option<&CurrentSong> {
        match self {
            Self::Stopped => None,
            Self::Play(song) => Some(song),
            Self::Pause(song) => Some(song),
        }
    }
}
/// A channel over which one textual request is sent and one textual response
/// is received. Implemented in this file by `MpdConnection` and by the test
/// mock.
#[async_trait]
pub trait RequestResponse {
/// Send `msg` and return the response text.
async fn req(&mut self, msg: &str) -> Result<String>;
/// Like `req`, with `hint` as a size hint for the response; `MpdConnection`
/// uses it as the initial capacity of its receive buffer.
async fn req_w_hint(&mut self, msg: &str, hint: usize) -> Result<String>;
}
#[cfg(test)]
pub mod test_mock {
    use super::*;

    /// Scripted `RequestResponse`: replays a fixed list of (request, response)
    /// pairs and panics when a request differs from the script.
    pub struct Mock {
        inmsgs: Vec<String>,
        outmsgs: Vec<String>,
    }

    impl Mock {
        /// Build a mock from `convo`, a list of (expected request, canned response).
        pub fn new(convo: &[(&str, &str)]) -> Mock {
            // Both stacks are stored back-to-front so that `pop()` hands the
            // exchanges out in the order they were written.
            let mut inmsgs = Vec::with_capacity(convo.len());
            let mut outmsgs = Vec::with_capacity(convo.len());
            for (request, response) in convo.iter().rev() {
                inmsgs.push(request.to_string());
                outmsgs.push(response.to_string());
            }
            Mock { inmsgs, outmsgs }
        }
    }

    #[async_trait]
    impl RequestResponse for Mock {
        async fn req(&mut self, msg: &str) -> Result<String> {
            self.req_w_hint(msg, 512).await
        }

        /// Check `msg` against the next scripted request and hand back the
        /// matching response; the hint is ignored.
        async fn req_w_hint(&mut self, msg: &str, _hint: usize) -> Result<String> {
            let expected = self.inmsgs.pop().unwrap();
            assert_eq!(msg, expected);
            let reply = self.outmsgs.pop().unwrap();
            Ok(reply)
        }
    }

    /// The mock replays its script in order.
    #[tokio::test]
    async fn mock_smoke_test() {
        let script = [("ping", "pong"), ("from", "to")];
        let mut mock = Mock::new(&script);
        let first = mock.req("ping").await.unwrap();
        assert_eq!(first, "pong");
        let second = mock.req("from").await.unwrap();
        assert_eq!(second, "to");
    }

    /// A request beyond the end of the script panics.
    #[tokio::test]
    #[should_panic]
    async fn mock_negative_test() {
        let script = [("ping", "pong")];
        let mut mock = Mock::new(&script);
        let first = mock.req("ping").await.unwrap();
        assert_eq!(first, "pong");
        let _should_panic = mock.req("not there!").await.unwrap();
    }
}
/// A connection to the server over any async byte stream `T`.
pub struct MpdConnection<T: AsyncRead + AsyncWrite + Send + Unpin> {
// Stream that requests are written to and responses are read from.
sock: T,
// Version text taken from the server greeting; stored but not read in this file.
_protocol_ver: String,
}
#[async_trait]
impl<T> RequestResponse for MpdConnection<T>
where
T: AsyncRead + AsyncWrite + Send + Unpin,
{
async fn req(&mut self, msg: &str) -> Result<String> {
self.req_w_hint(msg, 512).await
}
async fn req_w_hint(&mut self, msg: &str, hint: usize) -> Result<String> {
self.sock
.write_all(format!("{}\n", msg).as_bytes())
.await
.context(IoSnafu)?;
let mut buf = Vec::with_capacity(hint);
let mut cb = 0; let mut more = true; while more {
cb += self.sock.read_buf(&mut buf).await.context(IoSnafu)?;
if cb > 2 && char::from(buf[cb - 1]) == '\n' {
let mut idx = cb - 2;
while idx > 0 {
if char::from(buf[idx]) == '\n' {
idx += 1;
break;
}
idx -= 1;
}
if (idx + 2 < cb && char::from(buf[idx]) == 'O' && char::from(buf[idx + 1]) == 'K')
|| (idx + 3 < cb
&& char::from(buf[idx]) == 'A'
&& char::from(buf[idx + 1]) == 'C'
&& char::from(buf[idx + 2]) == 'K')
{
more = false;
}
}
}
String::from_utf8(buf.clone()).context(EncodingSnafu { buf })
}
}
async fn parse_connect_rsp<T>(sock: &mut T) -> Result<String>
where
T: AsyncReadExt + AsyncWriteExt + Send + Unpin,
{
let mut buf = Vec::with_capacity(32);
let _cb = sock.read_buf(&mut buf).await.context(IoSnafu)?;
let text = String::from_utf8(buf.clone()).context(EncodingSnafu { buf })?;
ensure!(
text.starts_with("OK MPD "),
ProtocolSnafu {
op: Operation::Connect,
msg: text.trim()
}
);
info!("Connected {}.", text[7..].trim());
Ok(text[7..].trim().to_string())
}
impl MpdConnection<TcpStream> {
    /// Connect over TCP to `addr`, consume the server greeting and return the
    /// connection as a boxed `RequestResponse`.
    pub async fn new<A: ToSocketAddrs>(addr: A) -> Result<Box<dyn RequestResponse>> {
        let mut sock = TcpStream::connect(addr).await.context(IoSnafu)?;
        let _protocol_ver = parse_connect_rsp(&mut sock).await?;
        let conn = MpdConnection::<TcpStream> {
            sock,
            _protocol_ver,
        };
        Ok(Box::new(conn))
    }
}
impl MpdConnection<UnixStream> {
    /// Connect to the Unix socket at `pth`, consume the server greeting and
    /// return the connection as a boxed `RequestResponse`.
    pub async fn new<P: AsRef<Path>>(pth: P) -> Result<Box<dyn RequestResponse>> {
        let mut sock = UnixStream::connect(pth).await.context(IoSnafu)?;
        let _protocol_ver = parse_connect_rsp(&mut sock).await?;
        let conn = MpdConnection::<UnixStream> {
            sock,
            _protocol_ver,
        };
        Ok(Box::new(conn))
    }
}
/// Quote `text` for use as one argument of a command line.
///
/// Text without a space, tab, single quote or double quote is returned
/// unchanged. Anything else is wrapped in double quotes, with every `"` and
/// `\` preceded by a backslash.
///
/// The empty string is returned as `""`: left bare, an empty argument would
/// simply vanish from the command line.
pub fn quote(text: &str) -> String {
    let needs_quotes = text.is_empty() || text.contains(&[' ', '\t', '\'', '"'][..]);
    if !needs_quotes {
        return text.to_string();
    }

    let mut quoted = String::with_capacity(text.len() + 2);
    quoted.push('"');
    for c in text.chars() {
        if c == '"' || c == '\\' {
            quoted.push('\\');
        }
        quoted.push(c);
    }
    quoted.push('"');
    quoted
}
/// Client for issuing commands to the server over a `RequestResponse` channel.
pub struct Client {
// Channel every command is sent through; boxed so tests can substitute a mock.
stream: Box<dyn RequestResponse>,
}
// Patterns used by `Client::status` to pick single fields out of a response.
// `(?m)` makes `^` and `$` match at line boundaries, so each pattern matches
// one whole line; capture group 1 is the field's value.
lazy_static! {
// `state: ` line; only `play`, `pause` and `stop` are accepted.
static ref RE_STATE: regex::Regex = Regex::new(r"(?m)^state: (play|pause|stop)$").unwrap();
// `songid: ` line; decimal digits only.
static ref RE_SONGID: regex::Regex = Regex::new(r"(?m)^songid: ([0-9]+)$").unwrap();
// `elapsed: ` line; digits and dots.
static ref RE_ELAPSED: regex::Regex = Regex::new(r"(?m)^elapsed: ([.0-9]+)$").unwrap();
// `file: ` line; the rest of the line is captured.
static ref RE_FILE: regex::Regex = Regex::new(r"(?m)^file: (.*)$").unwrap();
// `duration: ` line; the rest of the line is captured and parsed by the caller.
static ref RE_DURATION: regex::Regex = Regex::new(r"(?m)^duration: (.*)$").unwrap();
}
impl Client {
    /// Connect over TCP to `addrs` and wrap the connection in a `Client`.
    pub async fn connect<A: ToSocketAddrs>(addrs: A) -> Result<Client> {
        let conn = MpdConnection::<TcpStream>::new(addrs).await?;
        Self::new(conn)
    }

    /// Connect to the Unix socket at `pth` and wrap the connection in a `Client`.
    pub async fn open<P: AsRef<Path>>(pth: P) -> Result<Client> {
        let conn = MpdConnection::<UnixStream>::new(pth).await?;
        Self::new(conn)
    }

    /// Wrap an already-established channel; as written this never fails.
    pub fn new(stream: Box<dyn RequestResponse>) -> Result<Client> {
        let client = Client { stream };
        Ok(client)
    }
}
impl Client {
pub async fn status(&mut self) -> Result<PlayerStatus> {
let text = self.stream.req("status").await?;
let proto = || -> Error {
ProtocolSnafu {
op: Operation::Status,
msg: text.to_owned(),
}
.build()
};
let state = RE_STATE
.captures(&text)
.ok_or_else(proto)?
.get(1)
.ok_or_else(proto)?
.as_str();
match state {
"stop" => Ok(PlayerStatus::Stopped),
"play" | "pause" => {
let songid = RE_SONGID
.captures(&text)
.ok_or_else(proto)?
.get(1)
.ok_or_else(proto)?
.as_str()
.parse::<u64>()
.map_err(|err| {
ProtocolConvSnafu {
op: Operation::Status,
}
.into_error(Box::new(err))
})?;
let elapsed = RE_ELAPSED
.captures(&text)
.ok_or_else(proto)?
.get(1)
.ok_or_else(proto)?
.as_str()
.parse::<f64>()
.map_err(|err| {
ProtocolConvSnafu {
op: Operation::Status,
}
.into_error(Box::new(err))
})?;
let text = self.stream.req(&format!("playlistid {}", songid)).await?;
let file = RE_FILE
.captures(&text)
.ok_or_else(proto)?
.get(1)
.ok_or_else(proto)?
.as_str();
let duration = RE_DURATION
.captures(&text)
.ok_or_else(proto)?
.get(1)
.ok_or_else(proto)?
.as_str()
.parse::<f64>()
.map_err(|err| {
ProtocolConvSnafu {
op: Operation::Status,
}
.into_error(Box::new(err))
})?;
let curr = CurrentSong::new(songid, PathBuf::from(file), elapsed, duration);
if state == "play" {
Ok(PlayerStatus::Play(curr))
} else {
Ok(PlayerStatus::Pause(curr))
}
}
_ => ProtocolSnafu {
op: Operation::Status,
msg: state.to_owned(),
}
.fail(),
}
}
/// Fetch sticker `sticker_name` of song `file` and parse it as `T`.
///
/// Returns `Ok(None)` when the response starts with `ACK [50@0]`. Any other
/// response that does not start with `sticker: <name>=` is an
/// `Error::Protocol` (carrying the request text); a value that does not
/// parse as `T` is an `Error::StickerConversion`.
pub async fn get_sticker<T: FromStr>(
    &mut self,
    file: &str,
    sticker_name: &str,
) -> Result<Option<T>>
where
    <T as FromStr>::Err: std::error::Error + Sync + Send + 'static,
{
    let msg = format!("sticker get song {} {}", quote(file), quote(sticker_name));
    let text = self.stream.req(&msg).await?;
    debug!("Sent message `{}'; got `{}'", &msg, &text);

    let prefix = format!("sticker: {}=", sticker_name);
    let rest = match text.strip_prefix(prefix.as_str()) {
        Some(rest) => rest,
        None => {
            ensure!(
                text.starts_with("ACK [50@0]"),
                ProtocolSnafu {
                    op: Operation::GetSticker,
                    msg,
                }
            );
            return Ok(None);
        }
    };

    // The value runs up to the end of the first line.
    let raw = rest.split('\n').next().context(ProtocolSnafu {
        op: Operation::GetSticker,
        msg,
    })?;
    let value = T::from_str(raw).map_err(|err| {
        StickerConversionSnafu {
            sticker: sticker_name.to_owned(),
        }
        .into_error(Box::new(err))
    })?;
    Ok(Some(value))
}
pub async fn set_sticker<T: std::fmt::Display>(
&mut self,
file: &str,
sticker_name: &str,
sticker_value: &T,
) -> Result<()> {
let value_as_str = format!("{}", sticker_value);
let msg = format!(
"sticker set song {} {} {}",
quote(file),
quote(sticker_name),
quote(&value_as_str)
);
let text = self.stream.req(&msg).await?;
debug!("Sent `{}'; got `{}'", &msg, &text);
ensure!(
text.starts_with("OK"),
ProtocolSnafu {
op: Operation::SetSticker,
msg: msg
}
);
Ok(())
}
pub async fn send_to_playlist(&mut self, file: &str, pl: &str) -> Result<()> {
let msg = format!("playlistadd {} {}", quote(pl), quote(file));
let text = self.stream.req(&msg).await?;
debug!("Sent `{}'; got `{}'.", &msg, &text);
ensure!(
text.starts_with("OK"),
ProtocolSnafu {
op: Operation::SendToPlaylist,
msg
}
);
Ok(())
}
pub async fn send_message(&mut self, chan: &str, msg: &str) -> Result<()> {
let msg = format!("sendmessage {} {}", chan, quote(msg));
let text = self.stream.req(&msg).await?;
debug!("Sent `{}'; got `{}'.", &msg, &text);
ensure!(
text.starts_with("OK"),
ProtocolSnafu {
op: Operation::SendMessage,
msg: text
}
);
Ok(())
}
pub async fn update(&mut self, uri: &str) -> Result<u64> {
let msg = format!("update \"{}\"", uri);
let text = self.stream.req(&msg).await?;
debug!("Sent `{}'; got `{}'.", &msg, &text);
let prefix = "updating_db: ";
ensure!(
text.starts_with(prefix),
ProtocolSnafu {
op: Operation::Update,
msg: &text
}
);
text[prefix.len()..].split('\n').collect::<Vec<&str>>()[0]
.to_string()
.parse::<u64>()
.map_err(|err| {
ProtocolConvSnafu {
op: Operation::Update,
}
.into_error(Box::new(err))
})
}
pub async fn get_stored_playlists(&mut self) -> Result<std::vec::Vec<String>> {
let text = self.stream.req("listplaylists").await?;
debug!("Sent listplaylists; got `{}'.", &text);
ensure!(
!text.starts_with("ACK"),
ProtocolSnafu {
op: Operation::GetStoredPlaylists,
msg: text
}
);
Ok(text
.lines()
.filter_map(|x| {
if x.starts_with("playlist: ") {
Some(String::from(&x[10..]))
} else {
None
}
})
.collect::<Vec<String>>())
}
fn search_rsp_to_uris(&self, text: &str) -> Result<std::vec::Vec<String>> {
ensure!(
!text.starts_with("ACK"),
ProtocolSnafu {
op: Operation::RspToUris,
msg: text.to_owned()
}
);
Ok(text
.lines()
.filter_map(|x| {
if x.starts_with("file: ") {
Some(String::from(&x[6..]))
} else {
None
}
})
.collect::<Vec<String>>())
}
/// Run a search with the two-term filter `(<cond> <val>)` and return the
/// matching URIs.
///
/// `case` selects the command: `find` when true, `search` when false.
pub async fn find1(
    &mut self,
    cond: &str,
    val: &str,
    case: bool,
) -> Result<std::vec::Vec<String>> {
    let verb = if case { "find" } else { "search" };
    let filter = format!("({} {})", cond, val);
    let cmd = format!("{} {}", verb, quote(&filter));
    let text = self.stream.req(&cmd).await?;
    self.search_rsp_to_uris(&text)
}
/// Run a search with the three-term filter `(<attr> <op> <val>)` and return
/// the matching URIs.
///
/// `case` selects the command: `find` when true, `search` when false.
pub async fn find2(
    &mut self,
    attr: &str,
    op: &str,
    val: &str,
    case: bool,
) -> Result<std::vec::Vec<String>> {
    let verb = if case { "find" } else { "search" };
    let filter = format!("({} {} {})", attr, op, val);
    let cmd = format!("{} {}", verb, quote(&filter));
    debug!("find2 sending ``{}''", cmd);
    let text = self.stream.req(&cmd).await?;
    self.search_rsp_to_uris(&text)
}
pub async fn get_stickers(&mut self, sticker: &str) -> Result<HashMap<String, String>> {
let text = self
.stream
.req(&format!("sticker find song \"\" {}", sticker))
.await?;
ensure!(
!text.starts_with("ACK"),
ProtocolSnafu {
op: Operation::GetStickers,
msg: text,
}
);
let mut m = HashMap::new();
let mut lines = text.lines();
loop {
let file = lines.next().context(ProtocolSnafu {
op: Operation::GetStickers,
msg: text.to_owned(),
})?;
if "OK" == file {
break;
}
let val = lines.next().context(ProtocolSnafu {
op: Operation::GetStickers,
msg: text.to_owned(),
})?;
m.insert(
String::from(&file[6..]),
String::from(&val[10 + sticker.len()..]),
);
}
Ok(m)
}
pub async fn get_all_songs(&mut self) -> Result<std::vec::Vec<String>> {
let text = self.stream.req("find \"(base '')\"").await?;
ensure!(
!text.starts_with("ACK"),
ProtocolSnafu {
op: Operation::GetAllSongs,
msg: text,
}
);
Ok(text
.lines()
.filter_map(|x| {
if x.starts_with("file: ") {
Some(String::from(&x[6..]))
} else {
None
}
})
.collect::<Vec<String>>())
}
pub async fn add(&mut self, uri: &str) -> Result<()> {
let msg = format!("add {}", quote(uri));
let text = self.stream.req(&msg).await?;
debug!("Sent `{}'; got `{}'.", &msg, &text);
ensure!(
text.starts_with("OK"),
ProtocolSnafu {
op: Operation::Add,
msg: &text
}
);
Ok(())
}
}
#[cfg(test)]
mod client_tests {
use super::test_mock::Mock;
use super::*;
/// One scripted sticker lookup through a `Client` backed by the mock.
#[tokio::test]
async fn client_smoke_test() {
    let script = [(
        "sticker get song foo.mp3 stick",
        "sticker: stick=splat\nOK\n",
    )];
    let mut cli = Client::new(Box::new(Mock::new(&script))).unwrap();
    let found: Option<String> = cli.get_sticker("foo.mp3", "stick").await.unwrap();
    assert_eq!(found.unwrap(), "splat");
}
// Exercises `Client::status` against three scripted scenarios: a playing
// song (which triggers a follow-up `playlistid` request), a stopped player,
// and a response whose state is not recognised.
#[tokio::test]
async fn test_status() {
let mock = Box::new(Mock::new(&[
(
"status",
"volume: -1
repeat: 0
random: 0
single: 0
consume: 0
playlist: 3
playlistlength: 87
mixrampdb: 0.000000
state: play
song: 14
songid: 15
time: 141:250
bitrate: 128
audio: 44100:24:2
nextsong: 15
nextsongid: 16
elapsed: 140.585
OK",
),
(
"playlistid 15",
"file: U-Z/U2 - Who's Gonna RIDE Your WILD HORSES.mp3
Last-Modified: 2004-12-24T19:26:13Z
Artist: U2
Title: Who's Gonna RIDE Your WILD HOR
Genre: Pop
Time: 316
Pos: 41
Id: 42
duration: 249.994
OK",
),
(
"status",
"volume: -1
repeat: 0
random: 0
single: 0
consume: 0
playlist: 84
playlistlength: 27
mixrampdb: 0.000000
state: stop
OK",
),
(
"status",
"volume: -1
repeat: 0
state: no-idea!?",
),
]));
let mut cli = Client::new(mock).unwrap();
// Scenario 1: playing; fields come from both scripted responses.
let stat = cli.status().await.unwrap();
match stat {
PlayerStatus::Play(curr) => {
assert_eq!(curr.songid, 15);
assert_eq!(
curr.file.to_str().unwrap(),
"U-Z/U2 - Who's Gonna RIDE Your WILD HORSES.mp3"
);
assert_eq!(curr.elapsed, 140.585);
assert_eq!(curr.duration, 249.994);
}
_ => panic!(),
}
// Scenario 2: stopped; no follow-up request is scripted or expected.
let stat = cli.status().await.unwrap();
match stat {
PlayerStatus::Stopped => (),
_ => panic!(),
}
// Scenario 3: unrecognised state must surface as an error.
let stat = cli.status().await;
match stat {
Err(_) => (),
Ok(_) => panic!(),
}
}
/// `Client::get_sticker` against four scripted replies: a value, the
/// "no such sticker" ACK, an empty reply, and a file name that needs quoting.
#[tokio::test]
async fn test_get_sticker() {
    let mock = Box::new(Mock::new(&[
        (
            "sticker get song foo.mp3 stick",
            "sticker: stick=2\nOK\n",
        ),
        (
            "sticker get song foo.mp3 stick",
            "ACK [50@0] {sticker} no such sticker\n",
        ),
        (
            "sticker get song foo.mp3 stick",
            "",
        ),
        (
            "sticker get song \"filename_with\\\"doublequotes\\\".flac\" unwoundstack.com:playcount",
            "sticker: unwoundstack.com:playcount=11\nOK\n",
        ),
    ]));
    let mut cli = Client::new(mock).unwrap();

    // A present sticker yields its value.
    let val = cli
        .get_sticker::<String>("foo.mp3", "stick")
        .await
        .unwrap()
        .unwrap();
    assert_eq!(val, "2");

    // A missing sticker yields `None`. The result used to be computed and
    // thrown away; it is now asserted.
    let missing = cli
        .get_sticker::<String>("foo.mp3", "stick")
        .await
        .unwrap();
    assert!(missing.is_none());

    // An empty reply is a protocol error.
    let _err = cli
        .get_sticker::<String>("foo.mp3", "stick")
        .await
        .unwrap_err();

    // Double quotes in the file name are escaped in the request.
    let val = cli
        .get_sticker::<String>(
            "filename_with\"doublequotes\".flac",
            "unwoundstack.com:playcount",
        )
        .await
        .unwrap()
        .unwrap();
    assert_eq!(val, "11");
}
/// `Client::set_sticker` succeeds on `OK` and fails on an ACK or on nonsense.
#[tokio::test]
async fn test_set_sticker() {
    let request = "sticker set song foo.mp3 stick 2";
    let mock = Mock::new(&[
        (request, "OK\n"),
        (request, "ACK [50@0] {sticker} some error"),
        (request, "this makes no sense as a response"),
    ]);
    let mut cli = Client::new(Box::new(mock)).unwrap();
    assert!(cli.set_sticker("foo.mp3", "stick", &"2").await.is_ok());
    assert!(cli.set_sticker("foo.mp3", "stick", &"2").await.is_err());
    assert!(cli.set_sticker("foo.mp3", "stick", &"2").await.is_err());
}
/// `Client::send_to_playlist` succeeds on `OK` and fails on an ACK.
#[tokio::test]
async fn test_send_to_playlist() {
    let request = "playlistadd foo.m3u foo.mp3";
    let mock = Mock::new(&[
        (request, "OK\n"),
        (request, "ACK [101@0] {playlist} some error\n"),
    ]);
    let mut cli = Client::new(Box::new(mock)).unwrap();
    assert!(cli.send_to_playlist("foo.mp3", "foo.m3u").await.is_ok());
    assert!(cli.send_to_playlist("foo.mp3", "foo.m3u").await.is_err());
}
/// `Client::update` succeeds on an `updating_db` reply and fails on an ACK
/// or on nonsense.
#[tokio::test]
async fn test_update() {
    let request = "update \"foo.mp3\"";
    let mock = Mock::new(&[
        (request, "updating_db: 2\nOK\n"),
        (request, "ACK [50@0] {update} blahblahblah"),
        (request, "this makes no sense as a response"),
    ]);
    let mut cli = Client::new(Box::new(mock)).unwrap();
    assert!(cli.update("foo.mp3").await.is_ok());
    assert!(cli.update("foo.mp3").await.is_err());
    assert!(cli.update("foo.mp3").await.is_err());
}
// Exercises `Client::get_stored_playlists`: names are taken from the
// `playlist: ` lines (other lines are skipped), and an ACK reply is an error.
#[tokio::test]
async fn test_get_stored_playlists() {
let mock = Box::new(Mock::new(&[
(
"listplaylists",
"playlist: saturday-afternoons-in-santa-cruz
Last-Modified: 2020-03-13T17:20:16Z
playlist: gaelic-punk
Last-Modified: 2020-05-24T00:36:02Z
playlist: morning-coffee
Last-Modified: 2020-03-13T17:20:16Z
OK
",
),
("listplaylists", "ACK [1@0] {listplaylists} blahblahblah"),
]));
let mut cli = Client::new(mock).unwrap();
// Names come back in response order.
let val = cli.get_stored_playlists().await.unwrap();
assert_eq!(
val,
vec![
String::from("saturday-afternoons-in-santa-cruz"),
String::from("gaelic-punk"),
String::from("morning-coffee")
]
);
// The ACK reply must be reported as an error.
let _val = cli.get_stored_playlists().await.unwrap_err();
}
}
/// The subsystems `IdleClient::idle` waits on and can report as changed.
#[non_exhaustive]
#[derive(Debug, PartialEq, Eq)]
pub enum IdleSubSystem {
/// Parsed from the text `player` (case-insensitively).
Player,
/// Parsed from the text `message` (case-insensitively).
Message,
}
/// Parse a subsystem name, ignoring letter case.
impl TryFrom<&str> for IdleSubSystem {
    type Error = Error;

    /// `player` and `message` (in any letter case) map to the corresponding
    /// variant; anything else is an `Error::IdleSubSystem` carrying the
    /// original text.
    fn try_from(text: &str) -> std::result::Result<Self, Self::Error> {
        match text.to_lowercase().as_str() {
            "player" => Ok(IdleSubSystem::Player),
            "message" => Ok(IdleSubSystem::Message),
            _ => IdleSubSystemSnafu {
                text: String::from(text),
            }
            .fail(),
        }
    }
}
/// Renders a subsystem as its variant name.
impl fmt::Display for IdleSubSystem {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let label = match self {
            IdleSubSystem::Player => "Player",
            IdleSubSystem::Message => "Message",
        };
        f.write_str(label)
    }
}
/// Client for subscribing to channels, waiting on `idle`, and reading messages.
pub struct IdleClient {
// Channel every command is sent through; boxed so tests can substitute a mock.
conn: Box<dyn RequestResponse>,
}
impl IdleClient {
/// Connect over TCP to `addr` and wrap the connection in an `IdleClient`.
pub async fn connect<A: ToSocketAddrs>(addr: A) -> Result<IdleClient> {
    let conn = MpdConnection::<TcpStream>::new(addr).await?;
    Self::new(conn)
}
/// Connect to the Unix socket at `pth` and wrap the connection in an `IdleClient`.
pub async fn open<P: AsRef<Path>>(pth: P) -> Result<IdleClient> {
    let conn = MpdConnection::<UnixStream>::new(pth).await?;
    Self::new(conn)
}
/// Wrap an already-established channel; as written this never fails.
pub fn new(stream: Box<dyn RequestResponse>) -> Result<IdleClient> {
    let client = IdleClient { conn: stream };
    Ok(client)
}
pub async fn subscribe(&mut self, chan: &str) -> Result<()> {
let text = self.conn.req(&format!("subscribe {}", chan)).await?;
debug!("Sent subscribe message for {}; got `{}'.", chan, text);
ensure!(
text.starts_with("OK"),
ProtocolSnafu {
op: Operation::Connect,
msg: &text
}
);
debug!("Subscribed to {}.", chan);
Ok(())
}
/// Send `idle player message` and return the subsystem named in the reply.
///
/// The reply must consist of a `changed: <subsystem>` line immediately
/// followed by a line starting with `OK`; anything else is an
/// `Error::Protocol`. An unknown subsystem name is an `Error::IdleSubSystem`.
pub async fn idle(&mut self) -> Result<IdleSubSystem> {
    let text = self.conn.req("idle player message").await?;
    debug!("Sent idle message; got `{}'.", text);

    let body = match text.strip_prefix("changed: ") {
        Some(body) => body,
        None => {
            return ProtocolSnafu {
                op: Operation::Idle,
                msg: &text,
            }
            .fail()
        }
    };
    // Split at the first newline: the subsystem name, then the remainder.
    let (name, rest) = body.split_once('\n').context(ProtocolSnafu {
        op: Operation::Idle,
        msg: text.to_owned(),
    })?;
    let result = IdleSubSystem::try_from(name)?;
    if !rest.starts_with("OK") {
        return ProtocolSnafu {
            op: Operation::Idle,
            msg: rest,
        }
        .fail();
    }
    Ok(result)
}
/// Read the pending messages via `readmessages`, grouped by channel.
///
/// The response is read line by line: a `channel: <name>` line opens a
/// group, each following `message: <text>` line adds to it, and a line
/// reading `OK` ends the response. A channel that appears more than once has
/// its messages concatenated in order of appearance.
///
/// A response consisting only of `OK` (nothing queued) yields an empty map.
///
/// # Errors
///
/// `Error::Protocol` when the first line is neither `OK` nor a `channel: `
/// line, when an unrecognised line follows, or when anything follows `OK`.
pub async fn get_messages(&mut self) -> Result<HashMap<String, Vec<String>>> {
    let text = self.conn.req("readmessages").await?;
    debug!("Sent readmessages; got `{}'.", text);

    enum State {
        Init,
        Running,
        Finished,
    }

    let mut m: HashMap<String, Vec<String>> = HashMap::new();
    let mut state = State::Init;
    let mut chan = String::new();
    let mut msgs: Vec<String> = Vec::new();

    for line in text.lines() {
        match state {
            State::Init => {
                if line == "OK" {
                    // No messages pending: the result stays empty.
                    state = State::Finished;
                } else {
                    let name = line.strip_prefix("channel: ").context(ProtocolSnafu {
                        op: Operation::GetMessages,
                        msg: line,
                    })?;
                    chan = name.to_owned();
                    state = State::Running;
                }
            }
            State::Running => {
                if let Some(body) = line.strip_prefix("message: ") {
                    msgs.push(body.to_owned());
                } else if let Some(name) = line.strip_prefix("channel: ") {
                    // Flush what was collected for the previous channel.
                    // `append` empties `msgs`, leaving it ready for the next one.
                    let prev = std::mem::replace(&mut chan, name.to_owned());
                    m.entry(prev).or_default().append(&mut msgs);
                } else if line == "OK" {
                    let prev = std::mem::take(&mut chan);
                    m.entry(prev).or_default().append(&mut msgs);
                    state = State::Finished;
                } else {
                    return ProtocolSnafu {
                        op: Operation::GetMessages,
                        msg: text.as_str(),
                    }
                    .fail();
                }
            }
            State::Finished => {
                // Nothing may follow the terminating `OK`.
                return ProtocolSnafu {
                    op: Operation::GetMessages,
                    msg: line,
                }
                .fail();
            }
        }
    }
    Ok(m)
}
}
#[cfg(test)]
mod idle_client_tests {
use super::test_mock::Mock;
use super::*;
// Exercises `IdleClient::get_messages` with two distinct channels: two
// messages on `ratings`, one on `send-to-playlist`.
#[tokio::test]
async fn test_get_messages() {
let mock = Box::new(Mock::new(&[(
"readmessages",
"channel: ratings
message: 255
message: 128
channel: send-to-playlist
message: foo.m3u
OK
",
)]));
let mut cli = IdleClient::new(mock).unwrap();
let hm = cli.get_messages().await.unwrap();
let val = hm.get("ratings").unwrap();
assert_eq!(val.len(), 2);
let val = hm.get("send-to-playlist").unwrap();
assert!(val.len() == 1);
}
// The same channel appears twice in one response; messages from both
// occurrences must end up under the single `playcounts` key.
#[tokio::test]
async fn test_issue_1() {
let mock = Box::new(Mock::new(&[(
"readmessages",
"channel: playcounts
message: a
channel: playcounts
message: b
OK
",
)]));
let mut cli = IdleClient::new(mock).unwrap();
let hm = cli.get_messages().await.unwrap();
let val = hm.get("playcounts").unwrap();
assert_eq!(val.len(), 2);
}
}