amqpr_api/basic/
publish.rs

1use amqpr_codec::{Frame, FrameHeader, FramePayload, AmqpString};
2use amqpr_codec::content_body::ContentBodyPayload;
3use amqpr_codec::content_header::{ContentHeaderPayload, Properties};
4use amqpr_codec::method::MethodPayload;
5use amqpr_codec::method::basic::{BasicClass, PublishMethod};
6
7use bytes::Bytes;
8
9use futures::{Future, Sink, Poll, Async};
10use futures::sink::Send;
11
12use common::Should;
13
14
15/// Publish an item to AMQP server.
16/// If you want to publish a lot number of items, please consider to use `publish_sink` function.
17/// Returned item is `Future` which will be completed when finish to send.
18pub fn publish<S>(channel_id: u16, socket: S, item: PublishItem) -> Published<S>
19where
20    S: Sink<SinkItem = Frame>,
21{
22    let (meta, header, body) = (item.meta, item.header, item.body);
23
24    let declare = PublishMethod {
25        reserved1: 0,
26        exchange: meta.exchange,
27        routing_key: meta.routing_key,
28        mandatory: meta.is_mandatory,
29        immediate: meta.is_immediate,
30    };
31
32    let frame = Frame {
33        header: FrameHeader { channel: channel_id },
34        payload: FramePayload::Method(MethodPayload::Basic(BasicClass::Publish(declare))),
35    };
36
37    debug!("Sending publish method : {:?}", frame);
38
39    Published {
40        state: SendingContentState::SendingPublishMethod(
41            socket.send(frame),
42            Should::new(header),
43            Should::new(body),
44        ),
45        channel_id: channel_id,
46    }
47}
48
49
50
51/// A meta option of `Publish` message on AMQP.
52#[derive(Clone, Debug)]
53pub struct PublishOption {
54    pub exchange: AmqpString,
55    pub routing_key: AmqpString,
56    pub is_mandatory: bool,
57    pub is_immediate: bool,
58}
59
60
61#[derive(Clone, Debug)]
62pub struct PublishItem {
63    pub meta: PublishOption,
64    pub header: Properties,
65    pub body: Bytes,
66}
67
68
69
70// Published struct {{{
71pub struct Published<S>
72where
73    S: Sink<SinkItem = Frame>,
74{
75    state: SendingContentState<S>,
76    channel_id: u16,
77}
78
79pub enum SendingContentState<S>
80where
81    S: Sink<SinkItem = Frame>,
82{
83    SendingPublishMethod(Send<S>, Should<Properties>, Should<Bytes>),
84    SendingContentHeader(Send<S>, Should<Bytes>),
85    SendingContentBody(Send<S>),
86}
87
88
89impl<S> Future for Published<S>
90where
91    S: Sink<SinkItem = Frame>,
92{
93    type Item = S;
94    type Error = S::SinkError;
95
96    fn poll(&mut self) -> Poll<S, S::SinkError> {
97
98        use self::SendingContentState::*;
99        self.state = match &mut self.state {
100            &mut SendingPublishMethod(ref mut sending, ref mut properties, ref mut bytes) => {
101                let socket = try_ready!(sending.poll());
102                let header = ContentHeaderPayload {
103                    class_id: 60,
104                    body_size: bytes.as_ref().len() as u64,
105                    properties: properties.take(),
106                };
107                let frame = Frame {
108                    header: FrameHeader { channel: self.channel_id },
109                    payload: FramePayload::ContentHeader(header),
110                };
111                debug!("Sent publish method");
112                SendingContentHeader(socket.send(frame), bytes.clone())
113            }
114
115            &mut SendingContentHeader(ref mut sending, ref mut bytes) => {
116                let socket = try_ready!(sending.poll());
117                let frame = {
118                    let payload = ContentBodyPayload { bytes: bytes.take() };
119                    Frame {
120                        header: FrameHeader { channel: self.channel_id },
121                        payload: FramePayload::ContentBody(payload),
122                    }
123                };
124                debug!("Sent content header");
125                SendingContentBody(socket.send(frame))
126            }
127
128            &mut SendingContentBody(ref mut sending) => {
129                let socket = try_ready!(sending.poll());
130                debug!("Sent content body");
131                return Ok(Async::Ready(socket));
132            }
133        };
134
135        self.poll()
136    }
137}
138// }}}