amqpr_api/basic/
deliver.rs

1use amqpr_codec::Frame;
2use amqpr_codec::content_header::ContentHeaderPayload;
3use amqpr_codec::content_body::ContentBodyPayload;
4use amqpr_codec::frame::method::basic::DeliverMethod;
5
6use futures::{Future, Stream, Poll, Async};
7
8use common::Should;
9use errors::*;
10
11
12/// Get `DeliveredItem` from given stream.
13/// `DeliveredItem` consists of three things; `DeliverMethod`, `ContentHeaderPayload` and
14/// `ContentBodyPayload`.
15///
16/// # Notice
17/// Maybe it is useful to use `subscribe_stream`. That function returns `Stream` of
18/// `DeliveredItem`.
19///
20/// # Error
21/// `Delivered` future might be `Error` when `stream: S` yields unexpected frame.
22pub fn get_delivered<S>(stream: S) -> Delivered<S>
23where
24    S: Stream<Item = Frame>,
25    S::Error: From<Error>,
26{
27    Delivered::ReceivingDeliverMethod(Should::new(stream))
28}
29
30
31/// The value in `Future` being returned by `get_delivered` function.
32#[derive(Debug, Clone)]
33pub struct DeliveredItem {
34    pub meta: DeliverMethod,
35    pub header: ContentHeaderPayload,
36    pub body: ContentBodyPayload,
37}
38
39
// Delivered enum {{{
41pub enum Delivered<S> {
42    ReceivingDeliverMethod(Should<S>),
43    ReceivingContentHeader(Should<S>, Should<DeliverMethod>),
44    ReceivingContentBody(Should<S>, Should<(DeliverMethod, ContentHeaderPayload)>),
45}
46
47
48impl<S> Future for Delivered<S>
49where
50    S: Stream<Item = Frame>,
51    S::Error: From<Error>,
52{
53    type Item = (DeliveredItem, S);
54    type Error = S::Error;
55
56    fn poll(&mut self) -> Poll<(DeliveredItem, S), S::Error> {
57
58        use self::Delivered::*;
59        *self = match self {
60
61            &mut ReceivingDeliverMethod(ref mut socket) => {
62                let frame = try_stream_ready!(socket.as_mut().poll());
63
64                let is_deliver = frame.method().and_then(|m| m.basic()).and_then(
65                    |c| c.deliver(),
66                );
67                let deliver = match is_deliver {
68                    Some(del) => del.clone(),
69                    None => {
70                        return Err(S::Error::from(Error::from(
71                            ErrorKind::UnexpectedFrame("Deliver".into(), frame.clone()),
72                        )))
73                    }
74                };
75                info!("Deliver method is received : {:?}", deliver);
76                ReceivingContentHeader(Should::new(socket.take()), Should::new(deliver))
77            }
78
79            &mut ReceivingContentHeader(ref mut socket, ref mut deliver) => {
80                let frame = try_stream_ready!(socket.as_mut().poll());
81                let header = match frame.content_header() {
82                    Some(ch) => ch.clone(),
83                    None => {
84                        return Err(S::Error::from(Error::from(
85                            ErrorKind::UnexpectedFrame("Deliver".into(), frame.clone()),
86                        )))
87                    }
88                };
89                info!("Content header is received : {:?}", header);
90
91                ReceivingContentBody(
92                    Should::new(socket.take()),
93                    Should::new((deliver.take(), header)),
94                )
95            }
96
97            &mut ReceivingContentBody(ref mut socket, ref mut piece) => {
98                let frame = try_stream_ready!(socket.as_mut().poll());
99                let body = match frame.content_body() {
100                    Some(cb) => cb.clone(),
101                    None => {
102                        return Err(S::Error::from(Error::from(
103                            ErrorKind::UnexpectedFrame("Deliver".into(), frame.clone()),
104                        )))
105                    }
106                };
107
108                info!("Content body is received : {:?}", body);
109                let (meta, header) = piece.take();
110                let item = DeliveredItem {
111                    meta: meta,
112                    header: header,
113                    body: body,
114                };
115                return Ok(Async::Ready((item, socket.take())));
116            }
117        };
118
119        self.poll()
120    }
121}
122// }}}