amqpr_api/queue/
declare.rs

1use amqpr_codec::{Frame, FrameHeader, FramePayload, AmqpString};
2use amqpr_codec::method::MethodPayload;
3use amqpr_codec::method::queue::{QueueClass, DeclareMethod};
4pub use amqpr_codec::method::queue::DeclareOkMethod as DeclareResult;
5
6use futures::{Future, Stream, Sink, Poll, Async};
7use futures::sink::Send;
8
9use std::collections::HashMap;
10
11use common::Should;
12use errors::*;
13
14
15/// Declare a queue synchronously.
16/// That means we will wait to receive `Declare-Ok` method after send `Declare` method.
17pub fn declare_queue<S, E>(
18    channel_id: u16,
19    socket: S,
20    option: DeclareQueueOption,
21) -> QueueDeclared<S, E>
22where
23    S: Stream<Item = Frame, Error = E> + Sink<SinkItem = Frame, SinkError = E>,
24    E: From<Error>,
25{
26    let declare = DeclareMethod {
27        reserved1: 0,
28        queue: option.name,
29        passive: option.is_passive,
30        durable: option.is_durable,
31        exclusive: option.is_exclusive,
32        auto_delete: option.is_auto_delete,
33        no_wait: false,
34        arguments: HashMap::new(),
35    };
36
37    let frame = Frame {
38        header: FrameHeader { channel: channel_id },
39        payload: FramePayload::Method(MethodPayload::Queue(QueueClass::Declare(declare))),
40    };
41
42    QueueDeclared::Sending(socket.send(frame))
43}
44
45
46
47pub enum QueueDeclared<S, E>
48where
49    S: Stream<Item = Frame, Error = E> + Sink<SinkItem = Frame, SinkError = E>,
50    E: From<Error>,
51{
52    Sending(Send<S>),
53    Receiveing(Should<S>),
54}
55
56
57impl<S, E> Future for QueueDeclared<S, E>
58where
59    S: Stream<Item = Frame, Error = E>
60        + Sink<SinkItem = Frame, SinkError = E>,
61    E: From<Error>,
62{
63    type Item = (DeclareResult, S);
64    type Error = E;
65
66    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
67        use self::QueueDeclared::*;
68
69        let state = match self {
70            &mut Sending(ref mut sending) => {
71                let socket = try_ready!(sending.poll());
72                Receiveing(Should::new(socket))
73            }
74            &mut Receiveing(ref mut socket) => {
75                let frame = try_stream_ready!(socket.as_mut().poll());
76                let dec_ok = match frame.method().and_then(|m| m.queue()).and_then(
77                    |c| c.declare_ok(),
78                ) {
79                    Some(dec_ok) => dec_ok.clone(),
80                    None => {
81                        return Err(E::from(Error::from(ErrorKind::UnexpectedFrame(
82                            "DeclareOk".into(),
83                            frame.clone(),
84                        ))))
85                    }
86                };
87                debug!("Receive declare-ok response");
88
89                return Ok(Async::Ready((dec_ok, socket.take())));
90            }
91        };
92
93        *self = state;
94
95        self.poll()
96    }
97}
98
99
100#[derive(Clone, Debug)]
101pub struct DeclareQueueOption {
102    pub name: AmqpString,
103    pub is_passive: bool,
104    pub is_durable: bool,
105    pub is_exclusive: bool,
106    pub is_auto_delete: bool,
107}