use amqpr_codec::{Frame, FrameHeader, FramePayload, AmqpString};
use amqpr_codec::content_body::ContentBodyPayload;
use amqpr_codec::content_header::{ContentHeaderPayload, Properties};
use amqpr_codec::method::MethodPayload;
use amqpr_codec::method::basic::{BasicClass, PublishMethod};
use bytes::Bytes;
use futures::{Future, Sink, Poll, Async};
use futures::sink::Send;
use common::Should;
pub fn publish<S>(channel_id: u16, socket: S, item: PublishItem) -> Published<S>
where
S: Sink<SinkItem = Frame>,
{
let (meta, header, body) = (item.meta, item.header, item.body);
let declare = PublishMethod {
reserved1: 0,
exchange: meta.exchange,
routing_key: meta.routing_key,
mandatory: meta.is_mandatory,
immediate: meta.is_immediate,
};
let frame = Frame {
header: FrameHeader { channel: channel_id },
payload: FramePayload::Method(MethodPayload::Basic(BasicClass::Publish(declare))),
};
debug!("Sending publish method : {:?}", frame);
Published {
state: SendingContentState::SendingPublishMethod(
socket.send(frame),
Should::new(header),
Should::new(body),
),
channel_id: channel_id,
}
}
#[derive(Clone, Debug)]
pub struct PublishOption {
pub exchange: AmqpString,
pub routing_key: AmqpString,
pub is_mandatory: bool,
pub is_immediate: bool,
}
#[derive(Clone, Debug)]
pub struct PublishItem {
pub meta: PublishOption,
pub header: Properties,
pub body: Bytes,
}
pub struct Published<S>
where
S: Sink<SinkItem = Frame>,
{
state: SendingContentState<S>,
channel_id: u16,
}
pub enum SendingContentState<S>
where
S: Sink<SinkItem = Frame>,
{
SendingPublishMethod(Send<S>, Should<Properties>, Should<Bytes>),
SendingContentHeader(Send<S>, Should<Bytes>),
SendingContentBody(Send<S>),
}
impl<S> Future for Published<S>
where
S: Sink<SinkItem = Frame>,
{
type Item = S;
type Error = S::SinkError;
fn poll(&mut self) -> Poll<S, S::SinkError> {
use self::SendingContentState::*;
self.state = match &mut self.state {
&mut SendingPublishMethod(ref mut sending, ref mut properties, ref mut bytes) => {
let socket = try_ready!(sending.poll());
let header = ContentHeaderPayload {
class_id: 60,
body_size: bytes.as_ref().len() as u64,
properties: properties.take(),
};
let frame = Frame {
header: FrameHeader { channel: self.channel_id },
payload: FramePayload::ContentHeader(header),
};
debug!("Sent publish method");
SendingContentHeader(socket.send(frame), bytes.clone())
}
&mut SendingContentHeader(ref mut sending, ref mut bytes) => {
let socket = try_ready!(sending.poll());
let frame = {
let payload = ContentBodyPayload { bytes: bytes.take() };
Frame {
header: FrameHeader { channel: self.channel_id },
payload: FramePayload::ContentBody(payload),
}
};
debug!("Sent content header");
SendingContentBody(socket.send(frame))
}
&mut SendingContentBody(ref mut sending) => {
let socket = try_ready!(sending.poll());
debug!("Sent content body");
return Ok(Async::Ready(socket));
}
};
self.poll()
}
}