amqpr_api/basic/
deliver.rs1use amqpr_codec::Frame;
2use amqpr_codec::content_header::ContentHeaderPayload;
3use amqpr_codec::content_body::ContentBodyPayload;
4use amqpr_codec::frame::method::basic::DeliverMethod;
5
6use futures::{Future, Stream, Poll, Async};
7
8use common::Should;
9use errors::*;
10
11
12pub fn get_delivered<S>(stream: S) -> Delivered<S>
23where
24 S: Stream<Item = Frame>,
25 S::Error: From<Error>,
26{
27 Delivered::ReceivingDeliverMethod(Should::new(stream))
28}
29
30
31#[derive(Debug, Clone)]
33pub struct DeliveredItem {
34 pub meta: DeliverMethod,
35 pub header: ContentHeaderPayload,
36 pub body: ContentBodyPayload,
37}
38
39
40pub enum Delivered<S> {
42 ReceivingDeliverMethod(Should<S>),
43 ReceivingContentHeader(Should<S>, Should<DeliverMethod>),
44 ReceivingContentBody(Should<S>, Should<(DeliverMethod, ContentHeaderPayload)>),
45}
46
47
48impl<S> Future for Delivered<S>
49where
50 S: Stream<Item = Frame>,
51 S::Error: From<Error>,
52{
53 type Item = (DeliveredItem, S);
54 type Error = S::Error;
55
56 fn poll(&mut self) -> Poll<(DeliveredItem, S), S::Error> {
57
58 use self::Delivered::*;
59 *self = match self {
60
61 &mut ReceivingDeliverMethod(ref mut socket) => {
62 let frame = try_stream_ready!(socket.as_mut().poll());
63
64 let is_deliver = frame.method().and_then(|m| m.basic()).and_then(
65 |c| c.deliver(),
66 );
67 let deliver = match is_deliver {
68 Some(del) => del.clone(),
69 None => {
70 return Err(S::Error::from(Error::from(
71 ErrorKind::UnexpectedFrame("Deliver".into(), frame.clone()),
72 )))
73 }
74 };
75 info!("Deliver method is received : {:?}", deliver);
76 ReceivingContentHeader(Should::new(socket.take()), Should::new(deliver))
77 }
78
79 &mut ReceivingContentHeader(ref mut socket, ref mut deliver) => {
80 let frame = try_stream_ready!(socket.as_mut().poll());
81 let header = match frame.content_header() {
82 Some(ch) => ch.clone(),
83 None => {
84 return Err(S::Error::from(Error::from(
85 ErrorKind::UnexpectedFrame("Deliver".into(), frame.clone()),
86 )))
87 }
88 };
89 info!("Content header is received : {:?}", header);
90
91 ReceivingContentBody(
92 Should::new(socket.take()),
93 Should::new((deliver.take(), header)),
94 )
95 }
96
97 &mut ReceivingContentBody(ref mut socket, ref mut piece) => {
98 let frame = try_stream_ready!(socket.as_mut().poll());
99 let body = match frame.content_body() {
100 Some(cb) => cb.clone(),
101 None => {
102 return Err(S::Error::from(Error::from(
103 ErrorKind::UnexpectedFrame("Deliver".into(), frame.clone()),
104 )))
105 }
106 };
107
108 info!("Content body is received : {:?}", body);
109 let (meta, header) = piece.take();
110 let item = DeliveredItem {
111 meta: meta,
112 header: header,
113 body: body,
114 };
115 return Ok(Async::Ready((item, socket.take())));
116 }
117 };
118
119 self.poll()
120 }
121}
122