pub mod frame;
use std::{collections::HashSet, fmt, iter::FusedIterator, mem, slice, sync::Arc, vec};
use bytes::{Buf, BytesMut};
use tracing::trace;
pub use self::frame::Frame;
use crate::{parser::ParsedComponent, MpdProtocolError};
/// Complete response to a command or command list: the frames that completed successfully,
/// optionally followed by the error that ended the response.
#[derive(Clone, PartialEq, Eq)]
pub struct Response {
/// Successfully completed frames, in the order they were received.
frames: Vec<Frame>,
/// Error that terminated the response, if there was one.
error: Option<Error>,
}
impl Response {
    /// Builds a successful response consisting of exactly one empty frame.
    pub(crate) fn empty() -> Self {
        Self {
            frames: vec![Frame::empty()],
            error: None,
        }
    }

    /// Returns `true` if the response ended with an error.
    pub fn is_error(&self) -> bool {
        self.error.is_some()
    }

    /// Returns `true` if the response did not end with an error.
    pub fn is_success(&self) -> bool {
        !self.is_error()
    }

    /// Returns the number of frames that completed successfully.
    ///
    /// The error, if there is one, is not counted.
    pub fn successful_frames(&self) -> usize {
        self.frames.len()
    }

    /// Returns a borrowing iterator over the frames, followed by the error if there is one.
    pub fn frames(&self) -> FramesRef<'_> {
        FramesRef {
            frames: self.frames.iter(),
            error: self.error.as_ref(),
        }
    }

    /// Treats the response as a single frame or error, discarding everything after the
    /// first item.
    ///
    /// # Panics
    ///
    /// Panics if the response contains neither a frame nor an error. Every constructor in
    /// this module yields at least one of the two, so hitting this indicates a bug.
    pub fn into_single_frame(self) -> Result<Frame, Error> {
        self.into_iter()
            .next()
            .expect("a response always contains at least one frame or an error")
    }

    /// Returns the sum of `Frame::fields_len` over all frames.
    pub(crate) fn field_count(&self) -> usize {
        self.frames.iter().map(Frame::fields_len).sum()
    }
}
impl fmt::Debug for Response {
    /// Formats the response as `Response([...])`, listing every frame followed by the
    /// error, if there is one.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Response(")?;

        let mut list = f.debug_list();
        list.entries(self.frames.iter());
        // An `Option` iterates over zero or one item, so the error shows up as a trailing
        // list entry only when it is present.
        list.entries(self.error.iter());
        list.finish()?;

        f.write_str(")")
    }
}
/// Set of interned field keys, so that repeated keys can share a single `Arc<str>`
/// allocation instead of being allocated again for every field.
#[derive(Clone, Debug)]
pub(crate) struct ResponseFieldCache(HashSet<Arc<str>, ahash::RandomState>);
impl ResponseFieldCache {
pub(crate) fn new() -> ResponseFieldCache {
ResponseFieldCache(HashSet::default())
}
pub(crate) fn insert(&mut self, key: &str) -> Arc<str> {
if let Some(k) = self.0.get(key) {
Arc::clone(k)
} else {
let k = Arc::from(key);
self.0.insert(Arc::clone(&k));
k
}
}
}
/// Incremental parser state that assembles a `Response` from the bytes fed to `parse`.
#[derive(Debug)]
pub(crate) struct ResponseBuilder<'a> {
/// Cache used to intern field keys while parsing.
field_cache: &'a mut ResponseFieldCache,
/// What has been accumulated for the response currently being parsed.
state: ResponseState,
}
/// Progress of the response currently being assembled by `ResponseBuilder`.
#[derive(Clone, Debug, PartialEq, Eq)]
enum ResponseState {
/// Nothing has been accumulated since the builder was created or last produced a response.
Initial,
/// A single frame has received at least one field or binary payload, and no
/// end-of-frame marker has been seen yet.
InProgress {
/// The frame being filled.
current: Frame,
},
/// At least one end-of-frame marker has been seen, i.e. a command list is being answered.
ListInProgress {
/// The frame being filled for the command currently being answered.
current: Frame,
/// Frames already terminated by an end-of-frame marker, in order.
completed_frames: Vec<Frame>,
},
}
impl<'a> ResponseBuilder<'a> {
pub(crate) fn new(field_cache: &'a mut ResponseFieldCache) -> Self {
Self {
field_cache,
state: ResponseState::Initial,
}
}
pub(crate) fn parse(
&mut self,
src: &mut BytesMut,
) -> Result<Option<Response>, MpdProtocolError> {
while !src.is_empty() {
let (remaining, component) = match ParsedComponent::parse(src, self.field_cache) {
Err(e) if e.is_incomplete() => break,
Err(_) => return Err(MpdProtocolError::InvalidMessage),
Ok(p) => p,
};
let msg_end = src.len() - remaining.len();
let mut msg = src.split_to(msg_end);
match component {
ParsedComponent::Field { key, value } => self.field(key, value),
ParsedComponent::BinaryField { data_length } => {
msg.advance(msg.len() - (data_length + 1));
msg.truncate(data_length);
self.binary(msg);
}
ParsedComponent::Error(e) => return Ok(Some(self.error(e))),
ParsedComponent::EndOfFrame => self.finish_frame(),
ParsedComponent::EndOfResponse => return Ok(Some(self.finish())),
}
}
Ok(None)
}
pub(crate) fn is_frame_in_progress(&self) -> bool {
self.state != ResponseState::Initial
}
fn field(&mut self, key: Arc<str>, value: String) {
trace!(?key, ?value, "parsed field");
match &mut self.state {
ResponseState::Initial => {
let mut frame = Frame::empty();
frame.fields.push_field(key, value);
self.state = ResponseState::InProgress { current: frame };
}
ResponseState::InProgress { current }
| ResponseState::ListInProgress { current, .. } => {
current.fields.push_field(key, value);
}
}
}
fn binary(&mut self, binary: BytesMut) {
trace!(length = binary.len(), "parsed binary field");
match &mut self.state {
ResponseState::Initial => {
let mut frame = Frame::empty();
frame.binary = Some(binary);
self.state = ResponseState::InProgress { current: frame };
}
ResponseState::InProgress { current }
| ResponseState::ListInProgress { current, .. } => {
current.binary = Some(binary);
}
}
}
fn finish_frame(&mut self) {
trace!("finished command list frame");
let completed_frames = match mem::replace(&mut self.state, ResponseState::Initial) {
ResponseState::Initial => vec![Frame::empty()],
ResponseState::InProgress { current } => vec![current],
ResponseState::ListInProgress {
current,
mut completed_frames,
} => {
completed_frames.push(current);
completed_frames
}
};
self.state = ResponseState::ListInProgress {
current: Frame::empty(),
completed_frames,
};
}
fn finish(&mut self) -> Response {
trace!("finished response");
match mem::replace(&mut self.state, ResponseState::Initial) {
ResponseState::Initial => Response::empty(),
ResponseState::InProgress { current } => Response {
frames: vec![current],
error: None,
},
ResponseState::ListInProgress {
completed_frames, ..
} => Response {
frames: completed_frames,
error: None,
},
}
}
fn error(&mut self, error: Error) -> Response {
trace!(?error, "parsed error");
match mem::replace(&mut self.state, ResponseState::Initial) {
ResponseState::Initial | ResponseState::InProgress { .. } => Response {
frames: Vec::new(),
error: Some(error),
},
ResponseState::ListInProgress {
completed_frames, ..
} => Response {
frames: completed_frames,
error: Some(error),
},
}
}
}
/// Borrowing iterator over a `Response`: yields each frame as `Ok`, then the error (if
/// any) as a final `Err`.
#[derive(Clone, Debug)]
pub struct FramesRef<'a> {
/// Frames that have not been yielded yet.
frames: slice::Iter<'a, Frame>,
/// The error, until it has been yielded.
error: Option<&'a Error>,
}
impl<'a> Iterator for FramesRef<'a> {
    type Item = Result<&'a Frame, &'a Error>;

    /// Yields the next frame as `Ok`; once the frames are exhausted, yields the error (if
    /// any) exactly once as `Err`.
    fn next(&mut self) -> Option<Self::Item> {
        match self.frames.next() {
            Some(frame) => Some(Ok(frame)),
            None => self.error.take().map(Err),
        }
    }

    /// Exact number of remaining items: the remaining frames plus one for a pending error.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.frames.len() + usize::from(self.error.is_some());
        (remaining, Some(remaining))
    }
}
impl<'a> DoubleEndedIterator for FramesRef<'a> {
    /// Yields items from the back: the error comes first (it is the last item of the
    /// sequence), then the frames in reverse order.
    fn next_back(&mut self) -> Option<Self::Item> {
        match self.error.take() {
            Some(error) => Some(Err(error)),
            None => self.frames.next_back().map(Ok),
        }
    }
}
// Once the frames are exhausted and the error has been taken, `next` keeps returning `None`.
impl<'a> FusedIterator for FramesRef<'a> {}
// `size_hint` reports an exact length: the remaining frames plus one for a pending error.
impl<'a> ExactSizeIterator for FramesRef<'a> {}
impl<'a> IntoIterator for &'a Response {
type Item = Result<&'a Frame, &'a Error>;
type IntoIter = FramesRef<'a>;
fn into_iter(self) -> Self::IntoIter {
self.frames()
}
}
/// Owning iterator over a `Response`: yields each frame as `Ok`, then the error (if any)
/// as a final `Err`.
#[derive(Clone, Debug)]
pub struct Frames {
/// Frames that have not been yielded yet.
frames: vec::IntoIter<Frame>,
/// The error, until it has been yielded.
error: Option<Error>,
}
impl Iterator for Frames {
    type Item = Result<Frame, Error>;

    /// Yields the next frame as `Ok`; once the frames are exhausted, yields the error (if
    /// any) exactly once as `Err`.
    fn next(&mut self) -> Option<Self::Item> {
        match self.frames.next() {
            Some(frame) => Some(Ok(frame)),
            None => self.error.take().map(Err),
        }
    }

    /// Exact number of remaining items: the remaining frames plus one for a pending error.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.frames.len() + usize::from(self.error.is_some());
        (remaining, Some(remaining))
    }
}
impl DoubleEndedIterator for Frames {
    /// Yields items from the back: the error comes first (it is the last item of the
    /// sequence), then the frames in reverse order.
    fn next_back(&mut self) -> Option<Self::Item> {
        match self.error.take() {
            Some(error) => Some(Err(error)),
            None => self.frames.next_back().map(Ok),
        }
    }
}
// Once the frames are exhausted and the error has been taken, `next` keeps returning `None`.
impl FusedIterator for Frames {}
// `size_hint` reports an exact length: the remaining frames plus one for a pending error.
impl ExactSizeIterator for Frames {}
impl IntoIterator for Response {
type Item = Result<Frame, Error>;
type IntoIter = Frames;
fn into_iter(self) -> Self::IntoIter {
Frames {
frames: self.frames.into_iter(),
error: self.error,
}
}
}
/// Error that terminated a response.
///
/// The tests in this module parse the line `ACK [5@1] {} unknown command "foo"` into
/// `code: 5`, `command_index: 1`, `current_command: None` and
/// `message: "unknown command \"foo\""`.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct Error {
/// Numeric error code (the number before the `@` in the bracketed part).
pub code: u64,
/// The number after the `@` in the bracketed part.
// NOTE(review): presumably the index of the failing command within a command list — confirm.
pub command_index: u64,
/// Content of the braces; `None` when the braces are empty.
// NOTE(review): presumably the name of the command that failed — confirm.
pub current_command: Option<Box<str>>,
/// Message text following the braces.
pub message: Box<str>,
}
#[cfg(test)]
mod test {
use assert_matches::assert_matches;
use super::*;
/// Builds a frame holding the given key-value pairs (in order) and optional binary payload.
fn frame<const N: usize>(fields: [(&str, &str); N], binary: Option<&[u8]>) -> Frame {
    let mut built = Frame::empty();
    for (key, value) in fields.iter().copied() {
        built.fields.push_field(key.into(), value.into());
    }
    built.binary = binary.map(BytesMut::from);
    built
}
// The owning iterator yields all three frames, then the error, and reports an exact
// `size_hint` at every step.
#[test]
fn owned_frames_iter() {
let r = Response {
frames: vec![Frame::empty(), Frame::empty(), Frame::empty()],
error: Some(Error::default()),
};
let mut iter = r.into_iter();
// Three frames plus the trailing error.
assert_eq!((4, Some(4)), iter.size_hint());
assert_eq!(Some(Ok(Frame::empty())), iter.next());
assert_eq!((3, Some(3)), iter.size_hint());
assert_eq!(Some(Ok(Frame::empty())), iter.next());
assert_eq!((2, Some(2)), iter.size_hint());
assert_eq!(Some(Ok(Frame::empty())), iter.next());
assert_eq!((1, Some(1)), iter.size_hint());
// The error is the last item.
assert_eq!(Some(Err(Error::default())), iter.next());
assert_eq!((0, Some(0)), iter.size_hint());
}
// The borrowing iterator yields all three frames, then the error, and reports an exact
// `size_hint` at every step.
#[test]
fn borrowed_frames_iter() {
let r = Response {
frames: vec![Frame::empty(), Frame::empty(), Frame::empty()],
error: Some(Error::default()),
};
let mut iter = r.frames();
// Three frames plus the trailing error.
assert_eq!((4, Some(4)), iter.size_hint());
assert_eq!(Some(Ok(&Frame::empty())), iter.next());
assert_eq!((3, Some(3)), iter.size_hint());
assert_eq!(Some(Ok(&Frame::empty())), iter.next());
assert_eq!((2, Some(2)), iter.size_hint());
assert_eq!(Some(Ok(&Frame::empty())), iter.next());
assert_eq!((1, Some(1)), iter.size_hint());
// The error is the last item.
assert_eq!(Some(Err(&Error::default())), iter.next());
assert_eq!((0, Some(0)), iter.size_hint());
}
// A single-field response whose terminating `OK` line arrives without its newline at
// first: the builder must wait, leaving the partial line in the buffer.
#[test]
fn simple_response() {
let mut io = BytesMut::from("foo: bar\nOK");
let mut field_cache = ResponseFieldCache::new();
let mut builder = ResponseBuilder::new(&mut field_cache);
assert_eq!(builder.state, ResponseState::Initial);
// The field is consumed; the unterminated `OK` stays in the buffer.
assert_matches!(builder.parse(&mut io), Ok(None));
assert_eq!(
builder.state,
ResponseState::InProgress {
current: frame([("foo", "bar")], None)
}
);
assert_eq!(io, "OK");
// Parsing again without new data changes nothing.
assert_matches!(builder.parse(&mut io), Ok(None));
assert_eq!(
builder.state,
ResponseState::InProgress {
current: frame([("foo", "bar")], None)
}
);
assert_eq!(io, "OK");
// Completing the line finishes the response and resets the builder.
io.extend_from_slice(b"\n");
assert_eq!(
builder.parse(&mut io).unwrap(),
Some(Response {
frames: vec![frame([("foo", "bar")], None)],
error: None
})
);
assert_eq!(builder.state, ResponseState::Initial);
assert_eq!(io, "");
}
#[test]
fn response_with_binary() {
let mut io = BytesMut::from("foo: bar\nbinary: 6\nOK\n");
let mut field_cache = ResponseFieldCache::new();
let mut builder = ResponseBuilder::new(&mut field_cache);
assert_matches!(builder.parse(&mut io), Ok(None));
assert_eq!(
builder.state,
ResponseState::InProgress {
current: frame([("foo", "bar")], None)
}
);
assert_eq!(io, "binary: 6\nOK\n");
io.extend_from_slice(b"OK\n\n");
assert_matches!(builder.parse(&mut io), Ok(None));
assert_eq!(
builder.state,
ResponseState::InProgress {
current: frame([("foo", "bar")], Some(b"OK\nOK\n")),
}
);
assert_eq!(io, "");
io.extend_from_slice(b"OK\n");
assert_eq!(
builder.parse(&mut io).unwrap(),
Some(Response {
frames: vec![frame([("foo", "bar")], Some(b"OK\nOK\n"))],
error: None,
})
);
assert_eq!(builder.state, ResponseState::Initial);
}
// A bare `OK` line produces a response with a single empty frame.
#[test]
fn empty_response() {
let mut io = BytesMut::from("OK");
let mut field_cache = ResponseFieldCache::new();
let mut builder = ResponseBuilder::new(&mut field_cache);
// Without the newline the line is incomplete and the builder stays untouched.
assert_matches!(builder.parse(&mut io), Ok(None));
assert_eq!(builder.state, ResponseState::Initial);
io.extend_from_slice(b"\n");
assert_eq!(
builder.parse(&mut io).unwrap(),
Some(Response {
frames: vec![Frame::empty()],
error: None,
})
);
}
// An `ACK` line outside of a command list yields a response with no frames and the
// parsed error.
#[test]
fn error() {
let mut io = BytesMut::from("ACK [5@0] {} unknown command \"foo\"");
let mut field_cache = ResponseFieldCache::new();
let mut builder = ResponseBuilder::new(&mut field_cache);
// Without the newline the line is incomplete and the builder stays untouched.
assert_matches!(builder.parse(&mut io), Ok(None));
assert_eq!(builder.state, ResponseState::Initial);
io.extend_from_slice(b"\n");
assert_eq!(
builder.parse(&mut io).unwrap(),
Some(Response {
frames: vec![],
error: Some(Error {
code: 5,
command_index: 0,
current_command: None,
message: Box::from("unknown command \"foo\""),
}),
})
);
assert_eq!(builder.state, ResponseState::Initial);
}
// Two complete responses in one buffer: each `parse` call returns exactly one response
// and leaves the following bytes in the buffer.
#[test]
fn multiple_messages() {
let mut io = BytesMut::from("foo: bar\nOK\nhello: world\nOK\n");
let mut field_cache = ResponseFieldCache::new();
let mut builder = ResponseBuilder::new(&mut field_cache);
assert_eq!(
builder.parse(&mut io).unwrap(),
Some(Response {
frames: vec![frame([("foo", "bar")], None)],
error: None
})
);
// The second response has not been touched yet.
assert_eq!(io, "hello: world\nOK\n");
assert_eq!(
builder.parse(&mut io).unwrap(),
Some(Response {
frames: vec![frame([("hello", "world")], None)],
error: None
})
);
assert_eq!(io, "");
}
// A command list response: every `list_OK` line closes a frame, and the final `OK`
// returns all completed frames.
#[test]
fn command_list() {
let mut io = BytesMut::from("foo: bar\n");
let mut field_cache = ResponseFieldCache::new();
let mut builder = ResponseBuilder::new(&mut field_cache);
assert_matches!(builder.parse(&mut io), Ok(None));
assert_eq!(
builder.state,
ResponseState::InProgress {
current: frame([("foo", "bar")], None)
}
);
// The first `list_OK` moves the frame in progress to the completed list.
io.extend_from_slice(b"list_OK\n");
assert_matches!(builder.parse(&mut io), Ok(None));
assert_eq!(
builder.state,
ResponseState::ListInProgress {
current: Frame::empty(),
completed_frames: vec![frame([("foo", "bar")], None)],
}
);
// A `list_OK` with no preceding fields records an empty frame.
io.extend_from_slice(b"list_OK\n");
assert_matches!(builder.parse(&mut io), Ok(None));
assert_eq!(
builder.state,
ResponseState::ListInProgress {
current: Frame::empty(),
completed_frames: vec![frame([("foo", "bar")], None), Frame::empty()],
}
);
// The final `OK` returns only the completed frames.
io.extend_from_slice(b"OK\n");
assert_eq!(
builder.parse(&mut io).unwrap(),
Some(Response {
frames: vec![frame([("foo", "bar")], None), Frame::empty()],
error: None
})
);
assert_eq!(builder.state, ResponseState::Initial);
}
// An error in the middle of a command list keeps the frames completed so far and
// attaches the error.
#[test]
fn command_list_error() {
let mut io = BytesMut::from("list_OK\n");
let mut field_cache = ResponseFieldCache::new();
let mut builder = ResponseBuilder::new(&mut field_cache);
assert_matches!(builder.parse(&mut io), Ok(None));
assert_eq!(
builder.state,
ResponseState::ListInProgress {
current: Frame::empty(),
completed_frames: vec![Frame::empty()],
}
);
io.extend_from_slice(b"ACK [5@1] {} unknown command \"foo\"\n");
assert_eq!(
builder.parse(&mut io).unwrap(),
Some(Response {
frames: vec![Frame::empty()],
error: Some(Error {
code: 5,
command_index: 1,
current_command: None,
message: Box::from("unknown command \"foo\""),
}),
})
);
assert_eq!(builder.state, ResponseState::Initial);
}
// Two fields with the same key must share one interned `Arc<str>` allocation.
#[test]
fn key_interning() {
let mut io = BytesMut::from("foo: bar\nfoo: baz\nOK\n");
let mut field_cache = ResponseFieldCache::new();
let mut resp = ResponseBuilder::new(&mut field_cache)
.parse(&mut io)
.expect("incomplete")
.expect("invalid");
let mut fields = resp.frames.pop().unwrap().into_iter();
let (a, _) = fields.next().unwrap();
let (b, _) = fields.next().unwrap();
// Pointer equality, not just string equality: both keys are the same allocation.
assert!(Arc::ptr_eq(&a, &b));
}
}