amqpr_api/basic/
publish.rs1use amqpr_codec::{Frame, FrameHeader, FramePayload, AmqpString};
2use amqpr_codec::content_body::ContentBodyPayload;
3use amqpr_codec::content_header::{ContentHeaderPayload, Properties};
4use amqpr_codec::method::MethodPayload;
5use amqpr_codec::method::basic::{BasicClass, PublishMethod};
6
7use bytes::Bytes;
8
9use futures::{Future, Sink, Poll, Async};
10use futures::sink::Send;
11
12use common::Should;
13
14
15pub fn publish<S>(channel_id: u16, socket: S, item: PublishItem) -> Published<S>
19where
20 S: Sink<SinkItem = Frame>,
21{
22 let (meta, header, body) = (item.meta, item.header, item.body);
23
24 let declare = PublishMethod {
25 reserved1: 0,
26 exchange: meta.exchange,
27 routing_key: meta.routing_key,
28 mandatory: meta.is_mandatory,
29 immediate: meta.is_immediate,
30 };
31
32 let frame = Frame {
33 header: FrameHeader { channel: channel_id },
34 payload: FramePayload::Method(MethodPayload::Basic(BasicClass::Publish(declare))),
35 };
36
37 debug!("Sending publish method : {:?}", frame);
38
39 Published {
40 state: SendingContentState::SendingPublishMethod(
41 socket.send(frame),
42 Should::new(header),
43 Should::new(body),
44 ),
45 channel_id: channel_id,
46 }
47}
48
49
50
51#[derive(Clone, Debug)]
53pub struct PublishOption {
54 pub exchange: AmqpString,
55 pub routing_key: AmqpString,
56 pub is_mandatory: bool,
57 pub is_immediate: bool,
58}
59
60
61#[derive(Clone, Debug)]
62pub struct PublishItem {
63 pub meta: PublishOption,
64 pub header: Properties,
65 pub body: Bytes,
66}
67
68
69
70pub struct Published<S>
72where
73 S: Sink<SinkItem = Frame>,
74{
75 state: SendingContentState<S>,
76 channel_id: u16,
77}
78
79pub enum SendingContentState<S>
80where
81 S: Sink<SinkItem = Frame>,
82{
83 SendingPublishMethod(Send<S>, Should<Properties>, Should<Bytes>),
84 SendingContentHeader(Send<S>, Should<Bytes>),
85 SendingContentBody(Send<S>),
86}
87
88
89impl<S> Future for Published<S>
90where
91 S: Sink<SinkItem = Frame>,
92{
93 type Item = S;
94 type Error = S::SinkError;
95
96 fn poll(&mut self) -> Poll<S, S::SinkError> {
97
98 use self::SendingContentState::*;
99 self.state = match &mut self.state {
100 &mut SendingPublishMethod(ref mut sending, ref mut properties, ref mut bytes) => {
101 let socket = try_ready!(sending.poll());
102 let header = ContentHeaderPayload {
103 class_id: 60,
104 body_size: bytes.as_ref().len() as u64,
105 properties: properties.take(),
106 };
107 let frame = Frame {
108 header: FrameHeader { channel: self.channel_id },
109 payload: FramePayload::ContentHeader(header),
110 };
111 debug!("Sent publish method");
112 SendingContentHeader(socket.send(frame), bytes.clone())
113 }
114
115 &mut SendingContentHeader(ref mut sending, ref mut bytes) => {
116 let socket = try_ready!(sending.poll());
117 let frame = {
118 let payload = ContentBodyPayload { bytes: bytes.take() };
119 Frame {
120 header: FrameHeader { channel: self.channel_id },
121 payload: FramePayload::ContentBody(payload),
122 }
123 };
124 debug!("Sent content header");
125 SendingContentBody(socket.send(frame))
126 }
127
128 &mut SendingContentBody(ref mut sending) => {
129 let socket = try_ready!(sending.poll());
130 debug!("Sent content body");
131 return Ok(Async::Ready(socket));
132 }
133 };
134
135 self.poll()
136 }
137}
138