1pub mod frame;
4
5use std::{collections::HashSet, fmt, iter::FusedIterator, mem, slice, sync::Arc, vec};
6
7use bytes::{Buf, BytesMut};
8use tracing::trace;
9
10pub use self::frame::Frame;
11use crate::{parser::ParsedComponent, MpdProtocolError};
12
13#[derive(Clone, PartialEq, Eq)]
18pub struct Response {
19 frames: Vec<Frame>,
21 error: Option<Error>,
23}
24
25impl Response {
26 pub(crate) fn empty() -> Self {
29 Self {
30 frames: vec![Frame::empty()],
31 error: None,
32 }
33 }
34
35 pub fn is_error(&self) -> bool {
40 self.error.is_some()
41 }
42
43 pub fn is_success(&self) -> bool {
45 !self.is_error()
46 }
47
48 pub fn successful_frames(&self) -> usize {
52 self.frames.len()
53 }
54
55 pub fn frames(&self) -> FramesRef<'_> {
60 FramesRef {
61 frames: self.frames.iter(),
62 error: self.error.as_ref(),
63 }
64 }
65
66 pub fn into_single_frame(self) -> Result<Frame, Error> {
70 self.into_iter().next().unwrap()
72 }
73
74 pub(crate) fn field_count(&self) -> usize {
75 self.frames.iter().map(Frame::fields_len).sum()
76 }
77}
78
79impl fmt::Debug for Response {
80 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
81 write!(f, "Response(")?;
82
83 f.debug_list()
84 .entries(&self.frames)
85 .entries(&self.error)
86 .finish()?;
87
88 write!(f, ")")
89 }
90}
91
92#[derive(Clone, Debug)]
94pub(crate) struct ResponseFieldCache(HashSet<Arc<str>, ahash::RandomState>);
95
96impl ResponseFieldCache {
97 pub(crate) fn new() -> ResponseFieldCache {
99 ResponseFieldCache(HashSet::default())
100 }
101
102 pub(crate) fn insert(&mut self, key: &str) -> Arc<str> {
104 if let Some(k) = self.0.get(key) {
105 Arc::clone(k)
106 } else {
107 let k = Arc::from(key);
108 self.0.insert(Arc::clone(&k));
109 k
110 }
111 }
112}
113
114#[derive(Debug)]
115pub(crate) struct ResponseBuilder<'a> {
116 field_cache: &'a mut ResponseFieldCache,
117 state: ResponseState,
118}
119
120#[derive(Clone, Debug, PartialEq, Eq)]
121enum ResponseState {
122 Initial,
123 InProgress {
124 current: Frame,
125 },
126 ListInProgress {
127 current: Frame,
128 completed_frames: Vec<Frame>,
129 },
130}
131
132impl<'a> ResponseBuilder<'a> {
133 pub(crate) fn new(field_cache: &'a mut ResponseFieldCache) -> Self {
134 Self {
135 field_cache,
136 state: ResponseState::Initial,
137 }
138 }
139
140 pub(crate) fn parse(
141 &mut self,
142 src: &mut BytesMut,
143 ) -> Result<Option<Response>, MpdProtocolError> {
144 while !src.is_empty() {
145 let (remaining, component) = match ParsedComponent::parse(src, self.field_cache) {
146 Err(e) if e.is_incomplete() => break,
147 Err(_) => return Err(MpdProtocolError::InvalidMessage),
148 Ok(p) => p,
149 };
150
151 let msg_end = src.len() - remaining.len();
152 let mut msg = src.split_to(msg_end);
153
154 match component {
155 ParsedComponent::Field { key, value } => self.field(key, value),
156 ParsedComponent::BinaryField { data_length } => {
157 msg.advance(msg.len() - (data_length + 1));
158 msg.truncate(data_length);
159 self.binary(msg);
160 }
161 ParsedComponent::Error(e) => return Ok(Some(self.error(e))),
162 ParsedComponent::EndOfFrame => self.finish_frame(),
163 ParsedComponent::EndOfResponse => return Ok(Some(self.finish())),
164 }
165 }
166
167 Ok(None)
168 }
169
170 pub(crate) fn is_frame_in_progress(&self) -> bool {
171 self.state != ResponseState::Initial
172 }
173
174 fn field(&mut self, key: Arc<str>, value: String) {
175 trace!(?key, ?value, "parsed field");
176 match &mut self.state {
177 ResponseState::Initial => {
178 let mut frame = Frame::empty();
179 frame.fields.push_field(key, value);
180 self.state = ResponseState::InProgress { current: frame };
181 }
182 ResponseState::InProgress { current }
183 | ResponseState::ListInProgress { current, .. } => {
184 current.fields.push_field(key, value);
185 }
186 }
187 }
188
189 fn binary(&mut self, binary: BytesMut) {
190 trace!(length = binary.len(), "parsed binary field");
191 match &mut self.state {
192 ResponseState::Initial => {
193 let mut frame = Frame::empty();
194 frame.binary = Some(binary);
195 self.state = ResponseState::InProgress { current: frame };
196 }
197 ResponseState::InProgress { current }
198 | ResponseState::ListInProgress { current, .. } => {
199 current.binary = Some(binary);
200 }
201 }
202 }
203
204 fn finish_frame(&mut self) {
205 trace!("finished command list frame");
206 let completed_frames = match mem::replace(&mut self.state, ResponseState::Initial) {
207 ResponseState::Initial => vec![Frame::empty()],
208 ResponseState::InProgress { current } => vec![current],
209 ResponseState::ListInProgress {
210 current,
211 mut completed_frames,
212 } => {
213 completed_frames.push(current);
214 completed_frames
215 }
216 };
217
218 self.state = ResponseState::ListInProgress {
219 current: Frame::empty(),
220 completed_frames,
221 };
222 }
223
224 fn finish(&mut self) -> Response {
225 trace!("finished response");
226 match mem::replace(&mut self.state, ResponseState::Initial) {
227 ResponseState::Initial => Response::empty(),
228 ResponseState::InProgress { current } => Response {
229 frames: vec![current],
230 error: None,
231 },
232 ResponseState::ListInProgress {
233 completed_frames, ..
234 } => Response {
235 frames: completed_frames,
236 error: None,
237 },
238 }
239 }
240
241 fn error(&mut self, error: Error) -> Response {
242 trace!(?error, "parsed error");
243 match mem::replace(&mut self.state, ResponseState::Initial) {
244 ResponseState::Initial | ResponseState::InProgress { .. } => Response {
245 frames: Vec::new(),
246 error: Some(error),
247 },
248 ResponseState::ListInProgress {
249 completed_frames, ..
250 } => Response {
251 frames: completed_frames,
252 error: Some(error),
253 },
254 }
255 }
256}
257
258#[derive(Clone, Debug)]
260pub struct FramesRef<'a> {
261 frames: slice::Iter<'a, Frame>,
262 error: Option<&'a Error>,
263}
264
265impl<'a> Iterator for FramesRef<'a> {
266 type Item = Result<&'a Frame, &'a Error>;
267
268 fn next(&mut self) -> Option<Self::Item> {
269 if let Some(frame) = self.frames.next() {
270 Some(Ok(frame))
271 } else {
272 self.error.take().map(Err)
273 }
274 }
275
276 fn size_hint(&self) -> (usize, Option<usize>) {
277 let len = self.frames.len() + if self.error.is_some() { 1 } else { 0 };
279
280 (len, Some(len))
281 }
282}
283
284impl<'a> DoubleEndedIterator for FramesRef<'a> {
285 fn next_back(&mut self) -> Option<Self::Item> {
286 if let Some(e) = self.error.take() {
287 Some(Err(e))
288 } else {
289 self.frames.next_back().map(Ok)
290 }
291 }
292}
293
294impl<'a> FusedIterator for FramesRef<'a> {}
295impl<'a> ExactSizeIterator for FramesRef<'a> {}
296
297impl<'a> IntoIterator for &'a Response {
298 type Item = Result<&'a Frame, &'a Error>;
299 type IntoIter = FramesRef<'a>;
300
301 fn into_iter(self) -> Self::IntoIter {
302 self.frames()
303 }
304}
305
306#[derive(Clone, Debug)]
309pub struct Frames {
310 frames: vec::IntoIter<Frame>,
311 error: Option<Error>,
312}
313
314impl Iterator for Frames {
315 type Item = Result<Frame, Error>;
316
317 fn next(&mut self) -> Option<Self::Item> {
318 if let Some(f) = self.frames.next() {
319 Some(Ok(f))
320 } else {
321 self.error.take().map(Err)
322 }
323 }
324
325 fn size_hint(&self) -> (usize, Option<usize>) {
326 let len = self.frames.len() + if self.error.is_some() { 1 } else { 0 };
328
329 (len, Some(len))
330 }
331}
332
333impl DoubleEndedIterator for Frames {
334 fn next_back(&mut self) -> Option<Self::Item> {
335 if let Some(e) = self.error.take() {
336 Some(Err(e))
337 } else {
338 self.frames.next_back().map(Ok)
339 }
340 }
341}
342
343impl FusedIterator for Frames {}
344impl ExactSizeIterator for Frames {}
345
346impl IntoIterator for Response {
347 type Item = Result<Frame, Error>;
348 type IntoIter = Frames;
349
350 fn into_iter(self) -> Self::IntoIter {
351 Frames {
352 frames: self.frames.into_iter(),
353 error: self.error,
354 }
355 }
356}
357
/// An error parsed from an `ACK` line, which terminates a response.
///
/// In this file's tests, the line `ACK [5@1] {} unknown command "foo"` is
/// parsed into `code: 5`, `command_index: 1`, `current_command: None` and
/// `message: "unknown command \"foo\""`.
#[derive(Debug, Clone, Default, Eq, PartialEq)]
pub struct Error {
    /// Numeric error code (the number in front of the `@`).
    pub code: u64,
    /// Index of the command the error refers to (the number after the `@`).
    pub command_index: u64,
    /// Name of the command that was being executed, if one was reported.
    pub current_command: Option<Box<str>>,
    /// Message describing the error.
    pub message: Box<str>,
}
372
#[cfg(test)]
mod test {
    use assert_matches::assert_matches;

    use super::*;

    /// Builds a frame holding `fields` (in the given order) and the optional
    /// `binary` payload.
    fn frame<const N: usize>(fields: [(&str, &str); N], binary: Option<&[u8]>) -> Frame {
        let mut out = Frame::empty();

        for &(k, v) in &fields {
            out.fields.push_field(k.into(), v.into());
        }

        out.binary = binary.map(BytesMut::from);

        out
    }

    /// The owning iterator yields all frames, then the error, and keeps its
    /// size hint exact along the way.
    #[test]
    fn owned_frames_iter() {
        let r = Response {
            frames: vec![Frame::empty(), Frame::empty(), Frame::empty()],
            error: Some(Error::default()),
        };

        let mut iter = r.into_iter();

        assert_eq!((4, Some(4)), iter.size_hint());
        assert_eq!(Some(Ok(Frame::empty())), iter.next());

        assert_eq!((3, Some(3)), iter.size_hint());
        assert_eq!(Some(Ok(Frame::empty())), iter.next());

        assert_eq!((2, Some(2)), iter.size_hint());
        assert_eq!(Some(Ok(Frame::empty())), iter.next());

        assert_eq!((1, Some(1)), iter.size_hint());
        assert_eq!(Some(Err(Error::default())), iter.next());

        assert_eq!((0, Some(0)), iter.size_hint());
    }

    /// The borrowing iterator yields all frames, then the error, and keeps
    /// its size hint exact along the way.
    #[test]
    fn borrowed_frames_iter() {
        let r = Response {
            frames: vec![Frame::empty(), Frame::empty(), Frame::empty()],
            error: Some(Error::default()),
        };

        let mut iter = r.frames();

        assert_eq!((4, Some(4)), iter.size_hint());
        assert_eq!(Some(Ok(&Frame::empty())), iter.next());

        assert_eq!((3, Some(3)), iter.size_hint());
        assert_eq!(Some(Ok(&Frame::empty())), iter.next());

        assert_eq!((2, Some(2)), iter.size_hint());
        assert_eq!(Some(Ok(&Frame::empty())), iter.next());

        assert_eq!((1, Some(1)), iter.size_hint());
        assert_eq!(Some(Err(&Error::default())), iter.next());

        assert_eq!((0, Some(0)), iter.size_hint());
    }

    /// A single-frame response whose terminator arrives in a later chunk.
    #[test]
    fn simple_response() {
        let mut io = BytesMut::from("foo: bar\nOK");

        let mut field_cache = ResponseFieldCache::new();
        let mut builder = ResponseBuilder::new(&mut field_cache);
        assert_eq!(builder.state, ResponseState::Initial);

        // The field is consumed, the unterminated "OK" stays in the buffer.
        assert_matches!(builder.parse(&mut io), Ok(None));
        assert_eq!(
            builder.state,
            ResponseState::InProgress {
                current: frame([("foo", "bar")], None)
            }
        );
        assert_eq!(io, "OK");

        // Parsing again without new data changes nothing.
        assert_matches!(builder.parse(&mut io), Ok(None));
        assert_eq!(
            builder.state,
            ResponseState::InProgress {
                current: frame([("foo", "bar")], None)
            }
        );
        assert_eq!(io, "OK");

        io.extend_from_slice(b"\n");

        // The newline completes the terminator and with it the response.
        assert_eq!(
            builder.parse(&mut io).unwrap(),
            Some(Response {
                frames: vec![frame([("foo", "bar")], None)],
                error: None
            })
        );
        assert_eq!(builder.state, ResponseState::Initial);
        assert_eq!(io, "");
    }

    /// A binary payload that itself looks like response terminators.
    #[test]
    fn response_with_binary() {
        let mut io = BytesMut::from("foo: bar\nbinary: 6\nOK\n");
        let mut field_cache = ResponseFieldCache::new();
        let mut builder = ResponseBuilder::new(&mut field_cache);

        // Only three of the six announced payload bytes are buffered, so the
        // binary field is left untouched in the buffer.
        assert_matches!(builder.parse(&mut io), Ok(None));
        assert_eq!(
            builder.state,
            ResponseState::InProgress {
                current: frame([("foo", "bar")], None)
            }
        );
        assert_eq!(io, "binary: 6\nOK\n");

        io.extend_from_slice(b"OK\n\n");

        // The payload and its trailing newline are now complete.
        assert_matches!(builder.parse(&mut io), Ok(None));
        assert_eq!(
            builder.state,
            ResponseState::InProgress {
                current: frame([("foo", "bar")], Some(b"OK\nOK\n")),
            }
        );
        assert_eq!(io, "");

        io.extend_from_slice(b"OK\n");
        assert_eq!(
            builder.parse(&mut io).unwrap(),
            Some(Response {
                frames: vec![frame([("foo", "bar")], Some(b"OK\nOK\n"))],
                error: None,
            })
        );
        assert_eq!(builder.state, ResponseState::Initial);
    }

    /// A bare terminator yields a response with one empty frame.
    #[test]
    fn empty_response() {
        let mut io = BytesMut::from("OK");
        let mut field_cache = ResponseFieldCache::new();
        let mut builder = ResponseBuilder::new(&mut field_cache);

        assert_matches!(builder.parse(&mut io), Ok(None));
        assert_eq!(builder.state, ResponseState::Initial);

        io.extend_from_slice(b"\n");

        assert_eq!(
            builder.parse(&mut io).unwrap(),
            Some(Response {
                frames: vec![Frame::empty()],
                error: None,
            })
        );
    }

    /// An error outside of a command list yields no frames.
    #[test]
    fn error() {
        let mut io = BytesMut::from("ACK [5@0] {} unknown command \"foo\"");
        let mut field_cache = ResponseFieldCache::new();
        let mut builder = ResponseBuilder::new(&mut field_cache);

        assert_matches!(builder.parse(&mut io), Ok(None));
        assert_eq!(builder.state, ResponseState::Initial);

        io.extend_from_slice(b"\n");

        assert_eq!(
            builder.parse(&mut io).unwrap(),
            Some(Response {
                frames: vec![],
                error: Some(Error {
                    code: 5,
                    command_index: 0,
                    current_command: None,
                    message: Box::from("unknown command \"foo\""),
                }),
            })
        );
        assert_eq!(builder.state, ResponseState::Initial);
    }

    /// Each call to `parse` returns at most one response and leaves the
    /// following one in the buffer.
    #[test]
    fn multiple_messages() {
        let mut io = BytesMut::from("foo: bar\nOK\nhello: world\nOK\n");
        let mut field_cache = ResponseFieldCache::new();
        let mut builder = ResponseBuilder::new(&mut field_cache);

        assert_eq!(
            builder.parse(&mut io).unwrap(),
            Some(Response {
                frames: vec![frame([("foo", "bar")], None)],
                error: None
            })
        );
        assert_eq!(io, "hello: world\nOK\n");

        assert_eq!(
            builder.parse(&mut io).unwrap(),
            Some(Response {
                frames: vec![frame([("hello", "world")], None)],
                error: None
            })
        );
        assert_eq!(io, "");
    }

    /// Frames separated by `list_OK` are collected into one response.
    #[test]
    fn command_list() {
        let mut io = BytesMut::from("foo: bar\n");
        let mut field_cache = ResponseFieldCache::new();
        let mut builder = ResponseBuilder::new(&mut field_cache);

        assert_matches!(builder.parse(&mut io), Ok(None));
        assert_eq!(
            builder.state,
            ResponseState::InProgress {
                current: frame([("foo", "bar")], None)
            }
        );

        io.extend_from_slice(b"list_OK\n");

        assert_matches!(builder.parse(&mut io), Ok(None));
        assert_eq!(
            builder.state,
            ResponseState::ListInProgress {
                current: Frame::empty(),
                completed_frames: vec![frame([("foo", "bar")], None)],
            }
        );

        io.extend_from_slice(b"list_OK\n");

        assert_matches!(builder.parse(&mut io), Ok(None));
        assert_eq!(
            builder.state,
            ResponseState::ListInProgress {
                current: Frame::empty(),
                completed_frames: vec![frame([("foo", "bar")], None), Frame::empty()],
            }
        );

        io.extend_from_slice(b"OK\n");

        assert_eq!(
            builder.parse(&mut io).unwrap(),
            Some(Response {
                frames: vec![frame([("foo", "bar")], None), Frame::empty()],
                error: None
            })
        );
        assert_eq!(builder.state, ResponseState::Initial);
    }

    /// An error inside a command list keeps the frames completed before it.
    #[test]
    fn command_list_error() {
        let mut io = BytesMut::from("list_OK\n");
        let mut field_cache = ResponseFieldCache::new();
        let mut builder = ResponseBuilder::new(&mut field_cache);

        assert_matches!(builder.parse(&mut io), Ok(None));
        assert_eq!(
            builder.state,
            ResponseState::ListInProgress {
                current: Frame::empty(),
                completed_frames: vec![Frame::empty()],
            }
        );

        io.extend_from_slice(b"ACK [5@1] {} unknown command \"foo\"\n");

        assert_eq!(
            builder.parse(&mut io).unwrap(),
            Some(Response {
                frames: vec![Frame::empty()],
                error: Some(Error {
                    code: 5,
                    command_index: 1,
                    current_command: None,
                    message: Box::from("unknown command \"foo\""),
                }),
            })
        );
        assert_eq!(builder.state, ResponseState::Initial);
    }

    /// Equal field keys share a single allocation.
    #[test]
    fn key_interning() {
        let mut io = BytesMut::from("foo: bar\nfoo: baz\nOK\n");

        let mut field_cache = ResponseFieldCache::new();
        // The outer `Result` fails on invalid input, the inner `Option` is
        // `None` while the response is still incomplete.
        let mut resp = ResponseBuilder::new(&mut field_cache)
            .parse(&mut io)
            .expect("invalid")
            .expect("incomplete");

        let mut fields = resp.frames.pop().unwrap().into_iter();

        let (a, _) = fields.next().unwrap();
        let (b, _) = fields.next().unwrap();

        assert!(Arc::ptr_eq(&a, &b));
    }
}