1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
use amqpr_codec::{Frame, FrameHeader, FramePayload, AmqpString};
use amqpr_codec::method::MethodPayload;
use amqpr_codec::method::queue::{QueueClass, DeclareMethod};
pub use amqpr_codec::method::queue::DeclareOkMethod as DeclareResult;
use futures::{Future, Stream, Sink, Poll, Async};
use futures::sink::Send;
use std::collections::HashMap;
use common::Should;
use errors::*;
pub fn declare_queue<S, E>(
channel_id: u16,
socket: S,
option: DeclareQueueOption,
) -> QueueDeclared<S, E>
where
S: Stream<Item = Frame, Error = E> + Sink<SinkItem = Frame, SinkError = E>,
E: From<Error>,
{
let declare = DeclareMethod {
reserved1: 0,
queue: option.name,
passive: option.is_passive,
durable: option.is_durable,
exclusive: option.is_exclusive,
auto_delete: option.is_auto_delete,
no_wait: false,
arguments: HashMap::new(),
};
let frame = Frame {
header: FrameHeader { channel: channel_id },
payload: FramePayload::Method(MethodPayload::Queue(QueueClass::Declare(declare))),
};
QueueDeclared::Sending(socket.send(frame))
}
pub enum QueueDeclared<S, E>
where
S: Stream<Item = Frame, Error = E> + Sink<SinkItem = Frame, SinkError = E>,
E: From<Error>,
{
Sending(Send<S>),
Receiveing(Should<S>),
}
impl<S, E> Future for QueueDeclared<S, E>
where
S: Stream<Item = Frame, Error = E>
+ Sink<SinkItem = Frame, SinkError = E>,
E: From<Error>,
{
type Item = (DeclareResult, S);
type Error = E;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
use self::QueueDeclared::*;
let state = match self {
&mut Sending(ref mut sending) => {
let socket = try_ready!(sending.poll());
Receiveing(Should::new(socket))
}
&mut Receiveing(ref mut socket) => {
let frame = try_stream_ready!(socket.as_mut().poll());
let dec_ok = match frame.method().and_then(|m| m.queue()).and_then(
|c| c.declare_ok(),
) {
Some(dec_ok) => dec_ok.clone(),
None => {
return Err(E::from(Error::from(ErrorKind::UnexpectedFrame(
"DeclareOk".into(),
frame.clone(),
))))
}
};
debug!("Receive declare-ok response");
return Ok(Async::Ready((dec_ok, socket.take())));
}
};
*self = state;
self.poll()
}
}
#[derive(Clone, Debug)]
pub struct DeclareQueueOption {
pub name: AmqpString,
pub is_passive: bool,
pub is_durable: bool,
pub is_exclusive: bool,
pub is_auto_delete: bool,
}