1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use amqpr_codec::{Frame, FrameHeader, FramePayload};
use amqpr_codec::content_body::ContentBodyPayload;
use amqpr_codec::content_header::ContentHeaderPayload;
use amqpr_codec::method::MethodPayload;
use amqpr_codec::method::basic::{BasicClass, PublishMethod};
use bytes::Bytes;
use futures::{Future, Sink, Poll, Async};
use futures::sink::Send;
use common::Should;
pub fn publish<S>(sink: S, channel_id: u16, bytes: Bytes, option: PublishOption) -> Published<S>
where
S: Sink<SinkItem = Frame>,
{
let declare = PublishMethod {
reserved1: 0,
exchange: option.exchange,
routing_key: option.routing_key,
mandatory: option.is_mandatory,
immediate: option.is_immediate,
};
let frame = Frame {
header: FrameHeader { channel: channel_id },
payload: FramePayload::Method(MethodPayload::Basic(BasicClass::Publish(declare))),
};
debug!("Sending publish method : {:?}", frame);
Published {
state: SendingContentState::SendingPublishMethod(sink.send(frame)),
bytes: Should::new(bytes),
channel_id: channel_id,
}
}
/// Options for a single publish.
///
/// `publish` copies each field into the matching field of the outgoing
/// `PublishMethod`.
#[derive(Clone, Debug)]
pub struct PublishOption {
/// Copied into the method's `exchange` field.
pub exchange: String,
/// Copied into the method's `routing_key` field.
pub routing_key: String,
/// Copied into the method's `mandatory` field.
pub is_mandatory: bool,
/// Copied into the method's `immediate` field.
pub is_immediate: bool,
}
/// Future returned by `publish`.
///
/// Polling it sends the publish method frame, then a content header frame,
/// then a content body frame, and finally yields the sink `S` back.
pub struct Published<S>
where
S: Sink<SinkItem = Frame>,
{
// The send currently in flight, tagged with which frame it carries.
state: SendingContentState<S>,
// Message body: its length goes into the content header, and it is taken out
// of the `Should` wrapper when the content body frame is built.
bytes: Should<Bytes>,
// Channel number written into the header of every frame.
channel_id: u16,
}
/// Progress of a `Published` future.
///
/// Each variant wraps the in-flight `Send` for one frame; `Published::poll`
/// moves through the variants in the order they are declared.
pub enum SendingContentState<S>
where
S: Sink<SinkItem = Frame>,
{
/// The publish method frame is being sent.
SendingPublishMethod(Send<S>),
/// The content header frame is being sent.
SendingContentHeader(Send<S>),
/// The content body frame is being sent; when it completes the future is done.
SendingContentBody(Send<S>),
}
impl<S> Future for Published<S>
where
S: Sink<SinkItem = Frame>,
{
type Item = S;
type Error = S::SinkError;
fn poll(&mut self) -> Poll<S, S::SinkError> {
use self::SendingContentState::*;
self.state = match &mut self.state {
&mut SendingPublishMethod(ref mut sending) => {
let socket = try_ready!(sending);
let frame = Frame {
header: FrameHeader { channel: self.channel_id },
payload: FramePayload::ContentHeader(ContentHeaderPayload {
class_id: 60,
body_size: self.bytes.as_ref().len() as u64,
property_flags: 1,
}),
};
debug!("Sending content header : {:?}", frame);
SendingContentHeader(socket.send(frame))
}
&mut SendingContentHeader(ref mut sending) => {
let socket = try_ready!(sending);
let frame = {
let payload = ContentBodyPayload { bytes: self.bytes.take() };
Frame {
header: FrameHeader { channel: self.channel_id },
payload: FramePayload::ContentBody(payload),
}
};
debug!("Sending content body : {:?}", frame);
SendingContentBody(socket.send(frame))
}
&mut SendingContentBody(ref mut sending) => {
return Ok(Async::Ready(try_ready!(sending)));
}
};
self.poll()
}
}