use super::{ChannelMessage, ConnectionBlockedNotification, ConsumerMessage, IoLoopMessage};
use crate::errors::*;
use crate::serialize::{IntoAmqpClass, OutputBuffer, TryFromAmqpClass};
use crate::{AmqpProperties, Confirm, Error, Get, Return};
use amq_protocol::protocol::basic::AMQPMethod as AmqpBasic;
use amq_protocol::protocol::basic::Consume;
use amq_protocol::protocol::basic::Get as AmqpGet;
use amq_protocol::protocol::connection::AMQPMethod as AmqpConnection;
use amq_protocol::protocol::connection::Close as ConnectionClose;
use amq_protocol::protocol::connection::CloseOk as ConnectionCloseOk;
use crossbeam_channel::Receiver as CrossbeamReceiver;
use crossbeam_channel::Sender as CrossbeamSender;
use log::error;
use mio_extras::channel::SyncSender as MioSyncSender;
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::result::Result as StdResult;
/// Handle through which one channel exchanges messages with the I/O loop.
///
/// Requests are sent as `IoLoopMessage`s over `tx`; replies (or errors) are
/// read back from `rx`.
pub(super) struct IoLoopHandle {
/// Channel id passed to every `OutputBuffer::push_*` call made by this handle.
channel_id: u16,
/// Scratch buffer; each use debug-asserts that it is empty and then drains it.
buf: OutputBuffer,
/// Sender for messages addressed to the I/O loop.
tx: MioSyncSender<IoLoopMessage>,
/// Receiver for replies; each received item is itself a `Result`.
rx: CrossbeamReceiver<Result<ChannelMessage>>,
}
/// Debug output shows only the channel id; the buffer and the channel
/// endpoints are elided as `..`.
impl fmt::Debug for IoLoopHandle {
    fn fmt(&self, f: &mut fmt::Formatter) -> StdResult<(), fmt::Error> {
        let id = self.channel_id;
        write!(f, "IoLoopHandle {{ channel_id: {}, .. }}", id)
    }
}
impl IoLoopHandle {
pub(super) fn new(
channel_id: u16,
tx: MioSyncSender<IoLoopMessage>,
rx: CrossbeamReceiver<Result<ChannelMessage>>,
) -> IoLoopHandle {
IoLoopHandle {
channel_id,
buf: OutputBuffer::empty(),
tx,
rx,
}
}
#[inline]
pub(super) fn channel_id(&self) -> u16 {
self.channel_id
}
fn make_buf<M: IntoAmqpClass>(&mut self, method: M) -> OutputBuffer {
debug_assert!(self.buf.is_empty());
self.buf.push_method(self.channel_id, method);
self.buf.drain_into_new_buf()
}
pub(super) fn set_return_handler(
&mut self,
handler: Option<CrossbeamSender<Return>>,
) -> Result<()> {
self.send(IoLoopMessage::SetReturnHandler(handler))
}
pub(super) fn set_pub_confirm_handler(
&mut self,
handler: Option<CrossbeamSender<Confirm>>,
) -> Result<()> {
self.send(IoLoopMessage::SetPubConfirmHandler(handler))
}
pub(super) fn get(&mut self, get: AmqpGet) -> Result<Option<Get>> {
let buf = self.make_buf(AmqpBasic::Get(get));
self.send(IoLoopMessage::Send(buf))?;
match self.recv()? {
ChannelMessage::GetOk(get) => Ok(*get),
ChannelMessage::Method(_) | ChannelMessage::ConsumeOk(_, _) => FrameUnexpectedSnafu.fail(),
}
}
pub(super) fn consume(
&mut self,
consume: Consume,
) -> Result<(String, CrossbeamReceiver<ConsumerMessage>)> {
let buf = self.make_buf(AmqpBasic::Consume(consume));
self.send(IoLoopMessage::Send(buf))?;
match self.recv()? {
ChannelMessage::ConsumeOk(tag, rx) => Ok((tag, rx)),
ChannelMessage::Method(_) | ChannelMessage::GetOk(_) => FrameUnexpectedSnafu.fail(),
}
}
pub(super) fn call_connection_close(
&mut self,
close: ConnectionClose,
) -> Result<ConnectionCloseOk> {
let buf = self.make_buf(AmqpConnection::Close(close));
self.call_message(IoLoopMessage::ConnectionClose(buf))
}
pub(super) fn call<M: IntoAmqpClass, T: TryFromAmqpClass>(&mut self, method: M) -> Result<T> {
let buf = self.make_buf(method);
self.call_message(IoLoopMessage::Send(buf))
}
fn call_message<T: TryFromAmqpClass>(&mut self, message: IoLoopMessage) -> Result<T> {
self.send(message)?;
match self.recv()? {
ChannelMessage::Method(method) => T::try_from(method),
ChannelMessage::ConsumeOk(_, _) | ChannelMessage::GetOk(_) => FrameUnexpectedSnafu.fail(),
}
}
pub(super) fn call_nowait<M: IntoAmqpClass>(&mut self, method: M) -> Result<()> {
let buf = self.make_buf(method);
self.send(IoLoopMessage::Send(buf))
}
pub(super) fn send_content_header(
&mut self,
class_id: u16,
len: usize,
properties: &AmqpProperties,
) -> Result<()> {
debug_assert!(self.buf.is_empty());
self.buf
.push_content_header(self.channel_id, class_id, len, properties);
let buf = self.buf.drain_into_new_buf();
self.send(IoLoopMessage::Send(buf))
}
pub(super) fn send_content_body(&mut self, content: &[u8]) -> Result<()> {
debug_assert!(self.buf.is_empty());
self.buf.push_content_body(self.channel_id, content);
let buf = self.buf.drain_into_new_buf();
self.send(IoLoopMessage::Send(buf))
}
fn send(&mut self, message: IoLoopMessage) -> Result<()> {
self.tx
.send(message)
.map_err(|_| self.check_recv_for_error())
}
fn recv(&mut self) -> Result<ChannelMessage> {
self.rx.recv().map_err(|_| Error::EventLoopDropped)?
}
fn check_recv_for_error(&mut self) -> Error {
match self.recv() {
Ok(_) => {
error!("internal error - received unexpected frame after I/O thread disappeared");
Error::FrameUnexpected
}
Err(err) => err,
}
}
}
/// An `IoLoopHandle` bundled with extra channels used by `allocate_channel`
/// and `set_blocked_tx`.
///
/// Dereferences (mutably and immutably) to the wrapped `IoLoopHandle`.
///
/// NOTE(review): the `0` suffix presumably refers to AMQP channel 0 — confirm.
pub(super) struct IoLoopHandle0 {
/// The wrapped handle; also the `Deref`/`DerefMut` target.
common: IoLoopHandle,
/// Sends a `ConnectionBlockedNotification` sender to the other end
/// (used by `set_blocked_tx`).
set_blocked_tx: MioSyncSender<CrossbeamSender<ConnectionBlockedNotification>>,
/// Sends the optional channel id given to `allocate_channel`.
alloc_chan_req_tx: MioSyncSender<Option<u16>>,
/// Receives the reply read by `allocate_channel`: a new `IoLoopHandle` or an error.
alloc_chan_rep_rx: CrossbeamReceiver<Result<IoLoopHandle>>,
}
/// Debug output is a fixed placeholder; no field contents are printed.
impl fmt::Debug for IoLoopHandle0 {
    fn fmt(&self, f: &mut fmt::Formatter) -> StdResult<(), fmt::Error> {
        f.write_str("IoLoopHandle0 { .. }")
    }
}
impl IoLoopHandle0 {
pub(super) fn new(
common: IoLoopHandle,
set_blocked_tx: MioSyncSender<CrossbeamSender<ConnectionBlockedNotification>>,
alloc_chan_req_tx: MioSyncSender<Option<u16>>,
alloc_chan_rep_rx: CrossbeamReceiver<Result<IoLoopHandle>>,
) -> IoLoopHandle0 {
IoLoopHandle0 {
common,
set_blocked_tx,
alloc_chan_req_tx,
alloc_chan_rep_rx,
}
}
pub(super) fn allocate_channel(&mut self, channel_id: Option<u16>) -> Result<IoLoopHandle> {
self.alloc_chan_req_tx
.send(channel_id)
.map_err(|_| self.common.check_recv_for_error())?;
self.alloc_chan_rep_rx
.recv()
.map_err(|_| Error::EventLoopDropped)?
}
pub(super) fn set_blocked_tx(
&mut self,
tx: CrossbeamSender<ConnectionBlockedNotification>,
) -> Result<()> {
self.set_blocked_tx
.send(tx)
.map_err(|_| self.common.check_recv_for_error())
}
}
impl Deref for IoLoopHandle0 {
type Target = IoLoopHandle;
fn deref(&self) -> &IoLoopHandle {
&self.common
}
}
impl DerefMut for IoLoopHandle0 {
fn deref_mut(&mut self) -> &mut IoLoopHandle {
&mut self.common
}
}