1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
use amqpr_codec::{Frame, FrameHeader, FramePayload, AmqpString};
use amqpr_codec::method::MethodPayload;
use amqpr_codec::method::queue::{QueueClass, DeclareMethod};
pub use amqpr_codec::method::queue::DeclareOkMethod as DeclareResult;

use futures::{Future, Stream, Sink, Poll, Async};
use futures::sink::Send;

use std::collections::HashMap;

use common::Should;
use errors::*;


/// Declare a queue synchronously.
/// That means we will wait to receive `Declare-Ok` method after send `Declare` method.
pub fn declare_queue<S, E>(
    channel_id: u16,
    socket: S,
    option: DeclareQueueOption,
) -> QueueDeclared<S, E>
where
    S: Stream<Item = Frame, Error = E> + Sink<SinkItem = Frame, SinkError = E>,
    E: From<Error>,
{
    let declare = DeclareMethod {
        reserved1: 0,
        queue: option.name,
        passive: option.is_passive,
        durable: option.is_durable,
        exclusive: option.is_exclusive,
        auto_delete: option.is_auto_delete,
        no_wait: false,
        arguments: HashMap::new(),
    };

    let frame = Frame {
        header: FrameHeader { channel: channel_id },
        payload: FramePayload::Method(MethodPayload::Queue(QueueClass::Declare(declare))),
    };

    QueueDeclared::Sending(socket.send(frame))
}



pub enum QueueDeclared<S, E>
where
    S: Stream<Item = Frame, Error = E> + Sink<SinkItem = Frame, SinkError = E>,
    E: From<Error>,
{
    Sending(Send<S>),
    Receiveing(Should<S>),
}


impl<S, E> Future for QueueDeclared<S, E>
where
    S: Stream<Item = Frame, Error = E>
        + Sink<SinkItem = Frame, SinkError = E>,
    E: From<Error>,
{
    type Item = (DeclareResult, S);
    type Error = E;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        use self::QueueDeclared::*;

        let state = match self {
            &mut Sending(ref mut sending) => {
                let socket = try_ready!(sending.poll());
                Receiveing(Should::new(socket))
            }
            &mut Receiveing(ref mut socket) => {
                let frame = try_stream_ready!(socket.as_mut().poll());
                let dec_ok = match frame.method().and_then(|m| m.queue()).and_then(
                    |c| c.declare_ok(),
                ) {
                    Some(dec_ok) => dec_ok.clone(),
                    None => {
                        return Err(E::from(Error::from(ErrorKind::UnexpectedFrame(
                            "DeclareOk".into(),
                            frame.clone(),
                        ))))
                    }
                };
                debug!("Receive declare-ok response");

                return Ok(Async::Ready((dec_ok, socket.take())));
            }
        };

        *self = state;

        self.poll()
    }
}


#[derive(Clone, Debug)]
pub struct DeclareQueueOption {
    pub name: AmqpString,
    pub is_passive: bool,
    pub is_durable: bool,
    pub is_exclusive: bool,
    pub is_auto_delete: bool,
}