use crate::errors::*;
use crate::{Confirm, ConfirmPayload, Return};
use amq_protocol::frame::AMQPFrame;
use amq_protocol::protocol::basic::AMQPMethod as AmqpBasic;
use amq_protocol::protocol::basic::CancelOk;
use amq_protocol::protocol::channel::AMQPMethod as AmqpChannel;
use amq_protocol::protocol::channel::CloseOk as ChannelCloseOk;
use amq_protocol::protocol::confirm::AMQPMethod as AmqpConfirm;
use amq_protocol::protocol::connection::AMQPMethod as AmqpConnection;
use amq_protocol::protocol::connection::Close as ConnectionClose;
use amq_protocol::protocol::connection::CloseOk as ConnectionCloseOk;
use amq_protocol::protocol::exchange::AMQPMethod as AmqpExchange;
use amq_protocol::protocol::queue::AMQPMethod as AmqpQueue;
use amq_protocol::protocol::{AMQPClass, AMQPHardError};
use crossbeam_channel::{Sender, TrySendError};
use log::{debug, error, trace, warn};
use snafu::OptionExt;
use std::collections::hash_map::Entry;
use super::content_collector::CollectorResult;
use super::{
Channel0Slot, ChannelMessage, ChannelSlot, ConnectionBlockedNotification, ConsumerMessage,
Inner,
};
/// Connection-level state consulted and updated by `ConnectionState::process` for every
/// incoming frame.
// The lint is silenced instead of boxing a variant; presumably `Steady(Channel0Slot)` is the
// oversized one - NOTE(review): confirm before changing.
#[allow(clippy::large_enum_variant)]
pub(super) enum ConnectionState {
/// Normal operation; the only state in which `process` dispatches incoming frames.
Steady(Channel0Slot),
/// The server sent `connection.close` on channel 0; holds that close method. `process`
/// rejects any further frame with a frame-unexpected error.
ServerClosing(ConnectionClose),
/// Entered by `client_exception` after it queues a `connection.close` to the server.
/// `process` silently ignores every frame while in this state.
ClientException,
/// The server's `connection.close-ok` was received on channel 0. `process` rejects any
/// further frame with a frame-unexpected error.
ClientClosed,
}
/// Detaches and returns the slot registered for `channel_id`.
///
/// # Errors
///
/// Fails with the "received frame with bogus channel id" error when no slot is registered
/// under `channel_id`.
fn slot_remove(inner: &mut Inner, channel_id: u16) -> Result<ChannelSlot> {
    match inner.chan_slots.remove(channel_id) {
        Some(removed) => Ok(removed),
        None => ReceivedFrameWithBogusChannelIdSnafu { channel_id }.fail(),
    }
}
/// Looks up the slot registered for `channel_id` and returns a shared reference to it.
///
/// # Errors
///
/// Fails with the "received frame with bogus channel id" error when no slot is registered
/// under `channel_id`.
fn slot_get(inner: &mut Inner, channel_id: u16) -> Result<&ChannelSlot> {
    match inner.chan_slots.get(channel_id) {
        Some(found) => Ok(found),
        None => ReceivedFrameWithBogusChannelIdSnafu { channel_id }.fail(),
    }
}
/// Looks up the slot registered for `channel_id` and returns an exclusive reference to it.
///
/// # Errors
///
/// Fails with the "received frame with bogus channel id" error when no slot is registered
/// under `channel_id`.
fn slot_get_mut(inner: &mut Inner, channel_id: u16) -> Result<&mut ChannelSlot> {
    match inner.chan_slots.get_mut(channel_id) {
        Some(found) => Ok(found),
        None => ReceivedFrameWithBogusChannelIdSnafu { channel_id }.fail(),
    }
}
/// Hands `item` to `tx` without blocking.
///
/// Both failure modes are logged as internal errors before being reported.
///
/// # Errors
///
/// * the frame-unexpected error when the channel is full;
/// * the event-loop-client-dropped error when the receiving side is gone.
fn send<T: Send + Sync + 'static>(tx: &Sender<T>, item: T) -> Result<()> {
    if let Err(failure) = tx.try_send(item) {
        return match failure {
            TrySendError::Full(_) => {
                error!("internal error - bounded channel is unexpectedly full");
                FrameUnexpectedSnafu.fail()
            }
            TrySendError::Disconnected(_) => {
                error!("internal error - channel client dropped without being disconnected");
                EventLoopClientDroppedSnafu.fail()
            }
        };
    }
    Ok(())
}
/// Best-effort delivery of returned data to the channel's return handler.
///
/// When the handler's channel is full or disconnected the handler is unregistered. In that
/// case, and when no handler is registered at all, the data is logged at warn level and
/// dropped. This function never fails.
fn try_send_return(slot: &mut ChannelSlot, return_: Return) {
    let undelivered = match &slot.return_handler {
        None => return_,
        Some(handler) => match handler.try_send(return_) {
            Ok(()) => return,
            Err(TrySendError::Full(rejected)) | Err(TrySendError::Disconnected(rejected)) => {
                // The handler cannot keep up or has gone away; stop offering it data.
                slot.return_handler = None;
                rejected
            }
        },
    };
    warn!("discarding returned data {:?}", undelivered);
}
/// Best-effort delivery of a publisher confirm (ack or nack) to the channel's confirm handler.
///
/// When the handler's channel is full or disconnected the handler is unregistered. In that
/// case, and when no handler is registered at all, the confirm is logged at warn level and
/// dropped. This function never fails.
fn try_send_confirm(slot: &mut ChannelSlot, confirm: Confirm) {
    let confirm = if let Some(tx) = &slot.pub_confirm_handler {
        match tx.try_send(confirm) {
            Ok(()) => return,
            Err(TrySendError::Full(confirm)) | Err(TrySendError::Disconnected(confirm)) => {
                // The handler cannot keep up or has gone away; stop offering it confirms.
                slot.pub_confirm_handler = None;
                confirm
            }
        }
    } else {
        confirm
    };
    // Previously logged as "returned data", which made dropped confirms indistinguishable
    // from dropped `basic.return` payloads in the logs.
    warn!("discarding publisher confirm {:?}", confirm);
}
/// Best-effort delivery of a blocked/unblocked notification to the registered listener.
///
/// Does nothing when no listener is registered. If the non-blocking send fails for any
/// reason the listener is unregistered and the notification is dropped without logging.
fn try_send_blocked(slot: &mut Channel0Slot, note: ConnectionBlockedNotification) {
    let delivery_failed = match &slot.blocked_tx {
        Some(listener) => listener.try_send(note).is_err(),
        None => false,
    };
    if delivery_failed {
        slot.blocked_tx = None;
    }
}
impl ConnectionState {
fn client_exception(
&mut self,
inner: &mut Inner,
reply_code: AMQPHardError,
reply_text: String,
) -> Result<()> {
error!("{} - closing connection", reply_text);
let close = ConnectionClose {
reply_code: reply_code.get_id(),
reply_text,
class_id: 0,
method_id: 0,
};
inner.push_method(0, AmqpConnection::Close(close));
inner.seal_writes();
*self = ConnectionState::ClientException;
Ok(())
}
pub(super) fn process(&mut self, inner: &mut Inner, frame: AMQPFrame) -> Result<()> {
let ch0_slot = match self {
ConnectionState::Steady(ch0_slot) => ch0_slot,
ConnectionState::ClientException => return Ok(()),
ConnectionState::ServerClosing(_) | ConnectionState::ClientClosed => {
return FrameUnexpectedSnafu.fail();
}
};
match frame {
AMQPFrame::Heartbeat(0) => {
debug!("received heartbeat");
}
AMQPFrame::ProtocolHeader | AMQPFrame::Heartbeat(_) => return FrameUnexpectedSnafu.fail(),
AMQPFrame::Method(0, AMQPClass::Connection(AmqpConnection::Close(close))) => {
inner.push_method(0, AmqpConnection::CloseOk(ConnectionCloseOk {}));
inner.seal_writes();
let reply_code = close.reply_code;
let message = close.reply_text.clone();
let make_err = || Error::ServerClosedConnection {
code: reply_code,
message: message.clone(),
};
*self = ConnectionState::ServerClosing(close);
for (_, mut slot) in inner.chan_slots.drain() {
send(&slot.tx, Err(make_err()))?;
for (_, tx) in slot.consumers.drain() {
send(&tx, ConsumerMessage::ServerClosedConnection(make_err()))?;
}
}
}
AMQPFrame::Method(0, AMQPClass::Connection(AmqpConnection::CloseOk(close_ok))) => {
ch0_slot
.common
.tx
.send(Ok(ChannelMessage::Method(AMQPClass::Connection(
AmqpConnection::CloseOk(close_ok),
))))
.map_err(|_| Error::EventLoopClientDropped)?;
*self = ConnectionState::ClientClosed;
for (_, mut slot) in inner.chan_slots.drain() {
send(&slot.tx, Err(Error::ClientClosedConnection))?;
for (_, tx) in slot.consumers.drain() {
send(&tx, ConsumerMessage::ClientClosedConnection)?;
}
}
}
AMQPFrame::Method(0, AMQPClass::Connection(AmqpConnection::Blocked(blocked))) => {
warn!("server has blocked connection; reason = {}", blocked.reason);
let note = ConnectionBlockedNotification::Blocked(blocked.reason);
try_send_blocked(ch0_slot, note);
}
AMQPFrame::Method(0, AMQPClass::Connection(AmqpConnection::Unblocked(_))) => {
warn!("server has unblocked connection");
let note = ConnectionBlockedNotification::Unblocked;
try_send_blocked(ch0_slot, note);
}
AMQPFrame::Method(0, other) => {
let text = format!("do not know how to handle channel 0 method {:?}", other);
self.client_exception(inner, AMQPHardError::NOTIMPLEMENTED, text)?;
}
AMQPFrame::Header(0, _, _) | AMQPFrame::Body(0, _) => {
let text = format!("received illegal channel 0 frame {:?}", frame);
self.client_exception(inner, AMQPHardError::NOTALLOWED, text)?;
}
AMQPFrame::Method(n, AMQPClass::Channel(AmqpChannel::Close(close))) => {
warn!("server closing channel {}: {:?}", n, close);
let mut slot = slot_remove(inner, n)?;
let make_err = || Error::ServerClosedChannel {
channel_id: n,
code: close.reply_code,
message: close.reply_text.clone(),
};
send(&slot.tx, Err(make_err()))?;
for (_, tx) in slot.consumers.drain() {
send(&tx, ConsumerMessage::ServerClosedChannel(make_err()))?;
}
inner.push_method(n, AmqpChannel::CloseOk(ChannelCloseOk {}));
}
AMQPFrame::Method(n, AMQPClass::Channel(AmqpChannel::CloseOk(close_ok))) => {
if let Ok(mut slot) = slot_remove(inner, n) {
send(
&slot.tx,
Ok(ChannelMessage::Method(AMQPClass::Channel(
AmqpChannel::CloseOk(close_ok),
))),
)?;
for (_, tx) in slot.consumers.drain() {
send(&tx, ConsumerMessage::ClientClosedChannel)?;
}
}
}
AMQPFrame::Method(n, AMQPClass::Basic(AmqpBasic::ConsumeOk(consume_ok))) => {
let consumer_tag = consume_ok.consumer_tag;
let slot = slot_get_mut(inner, n)?;
match slot.consumers.entry(consumer_tag.clone()) {
Entry::Occupied(_) => {
return DuplicateConsumerTagSnafu {
channel_id: n,
consumer_tag,
}
.fail();
}
Entry::Vacant(entry) => {
let (tx, rx) = crossbeam_channel::unbounded();
entry.insert(tx);
send(&slot.tx, Ok(ChannelMessage::ConsumeOk(consumer_tag, rx)))?;
}
}
}
AMQPFrame::Method(n, AMQPClass::Basic(AmqpBasic::Cancel(cancel))) => {
let consumer_tag = cancel.consumer_tag;
let slot = slot_get_mut(inner, n)?;
if let Some(tx) = slot.consumers.remove(&consumer_tag) {
send(&tx, ConsumerMessage::ServerCancelled)?;
}
if !cancel.nowait {
inner.push_method(n, AmqpBasic::CancelOk(CancelOk { consumer_tag }));
}
}
AMQPFrame::Method(n, AMQPClass::Basic(AmqpBasic::CancelOk(cancel_ok))) => {
let slot = slot_get_mut(inner, n)?;
let consumer = slot.consumers.remove(&cancel_ok.consumer_tag);
send(
&slot.tx,
Ok(ChannelMessage::Method(AMQPClass::Basic(
AmqpBasic::CancelOk(cancel_ok),
))),
)?;
if let Some(tx) = consumer {
send(&tx, ConsumerMessage::ClientCancelled)?;
}
}
AMQPFrame::Method(n, AMQPClass::Basic(AmqpBasic::Deliver(deliver))) => {
let slot = slot_get_mut(inner, n)?;
slot.collector.collect_deliver(deliver)?;
}
AMQPFrame::Method(n, AMQPClass::Basic(AmqpBasic::Return(return_))) => {
let slot = slot_get_mut(inner, n)?;
slot.collector.collect_return(return_)?;
}
AMQPFrame::Method(n, AMQPClass::Basic(AmqpBasic::GetOk(get_ok))) => {
let slot = slot_get_mut(inner, n)?;
slot.collector.collect_get(get_ok)?;
}
AMQPFrame::Method(n, AMQPClass::Basic(AmqpBasic::GetEmpty(_))) => {
let slot = slot_get(inner, n)?;
send(&slot.tx, Ok(ChannelMessage::GetOk(Box::new(None))))?;
}
AMQPFrame::Method(n, AMQPClass::Basic(AmqpBasic::Ack(ack))) => {
let slot = slot_get_mut(inner, n)?;
let confirm = ConfirmPayload {
delivery_tag: ack.delivery_tag,
multiple: ack.multiple,
};
try_send_confirm(slot, Confirm::Ack(confirm));
}
AMQPFrame::Method(n, AMQPClass::Basic(AmqpBasic::Nack(nack))) => {
let slot = slot_get_mut(inner, n)?;
let confirm = ConfirmPayload {
delivery_tag: nack.delivery_tag,
multiple: nack.multiple,
};
try_send_confirm(slot, Confirm::Nack(confirm));
}
AMQPFrame::Method(n, method @ AMQPClass::Basic(AmqpBasic::QosOk(_)))
| AMQPFrame::Method(n, method @ AMQPClass::Basic(AmqpBasic::RecoverOk(_)))
| AMQPFrame::Method(n, method @ AMQPClass::Channel(AmqpChannel::OpenOk(_)))
| AMQPFrame::Method(n, method @ AMQPClass::Confirm(AmqpConfirm::SelectOk(_)))
| AMQPFrame::Method(n, method @ AMQPClass::Exchange(AmqpExchange::DeclareOk(_)))
| AMQPFrame::Method(n, method @ AMQPClass::Exchange(AmqpExchange::DeleteOk(_)))
| AMQPFrame::Method(n, method @ AMQPClass::Exchange(AmqpExchange::BindOk(_)))
| AMQPFrame::Method(n, method @ AMQPClass::Exchange(AmqpExchange::UnbindOk(_)))
| AMQPFrame::Method(n, method @ AMQPClass::Queue(AmqpQueue::DeclareOk(_)))
| AMQPFrame::Method(n, method @ AMQPClass::Queue(AmqpQueue::DeleteOk(_)))
| AMQPFrame::Method(n, method @ AMQPClass::Queue(AmqpQueue::BindOk(_)))
| AMQPFrame::Method(n, method @ AMQPClass::Queue(AmqpQueue::PurgeOk(_)))
| AMQPFrame::Method(n, method @ AMQPClass::Queue(AmqpQueue::UnbindOk(_))) => {
let slot = slot_get(inner, n)?;
trace!(
"trying to send method to client for channel {}: {:?}",
n,
method
);
send(&slot.tx, Ok(ChannelMessage::Method(method)))?;
}
AMQPFrame::Method(n, method @ AMQPClass::Access(_))
| AMQPFrame::Method(n, method @ AMQPClass::Channel(AmqpChannel::Flow(_)))
| AMQPFrame::Method(n, method @ AMQPClass::Channel(AmqpChannel::FlowOk(_)))
| AMQPFrame::Method(n, method @ AMQPClass::Tx(_)) => {
let text = format!(
"do not know how to handle channel {} method {:?}",
n, method
);
self.client_exception(inner, AMQPHardError::NOTIMPLEMENTED, text)?;
}
AMQPFrame::Method(n, method @ AMQPClass::Basic(AmqpBasic::Qos(_)))
| AMQPFrame::Method(n, method @ AMQPClass::Basic(AmqpBasic::Consume(_)))
| AMQPFrame::Method(n, method @ AMQPClass::Basic(AmqpBasic::Get(_)))
| AMQPFrame::Method(n, method @ AMQPClass::Basic(AmqpBasic::Publish(_)))
| AMQPFrame::Method(n, method @ AMQPClass::Basic(AmqpBasic::Recover(_)))
| AMQPFrame::Method(n, method @ AMQPClass::Basic(AmqpBasic::RecoverAsync(_)))
| AMQPFrame::Method(n, method @ AMQPClass::Basic(AmqpBasic::Reject(_)))
| AMQPFrame::Method(n, method @ AMQPClass::Channel(AmqpChannel::Open(_)))
| AMQPFrame::Method(n, method @ AMQPClass::Confirm(AmqpConfirm::Select(_)))
| AMQPFrame::Method(n, method @ AMQPClass::Connection(_))
| AMQPFrame::Method(n, method @ AMQPClass::Exchange(AmqpExchange::Declare(_)))
| AMQPFrame::Method(n, method @ AMQPClass::Exchange(AmqpExchange::Delete(_)))
| AMQPFrame::Method(n, method @ AMQPClass::Exchange(AmqpExchange::Bind(_)))
| AMQPFrame::Method(n, method @ AMQPClass::Exchange(AmqpExchange::Unbind(_)))
| AMQPFrame::Method(n, method @ AMQPClass::Queue(AmqpQueue::Declare(_)))
| AMQPFrame::Method(n, method @ AMQPClass::Queue(AmqpQueue::Delete(_)))
| AMQPFrame::Method(n, method @ AMQPClass::Queue(AmqpQueue::Bind(_)))
| AMQPFrame::Method(n, method @ AMQPClass::Queue(AmqpQueue::Purge(_)))
| AMQPFrame::Method(n, method @ AMQPClass::Queue(AmqpQueue::Unbind(_))) => {
let text = format!("illegal channel {} method {:?}", n, method);
self.client_exception(inner, AMQPHardError::NOTALLOWED, text)?;
}
AMQPFrame::Header(n, _, header) => {
let slot = slot_get_mut(inner, n)?;
if let Some(collected) = slot.collector.collect_header(*header)? {
match collected {
CollectorResult::Delivery((consumer_tag, delivery)) => {
let tx =
slot.consumers
.get(&consumer_tag)
.context(UnknownConsumerTagSnafu {
channel_id: n,
consumer_tag,
})?;
send(tx, ConsumerMessage::Delivery(delivery))?;
}
CollectorResult::Return(return_) => {
try_send_return(slot, return_);
}
CollectorResult::Get(get) => {
send(&slot.tx, Ok(ChannelMessage::GetOk(Box::new(Some(get)))))?;
}
}
}
}
AMQPFrame::Body(n, body) => {
let slot = slot_get_mut(inner, n)?;
if let Some(collected) = slot.collector.collect_body(body)? {
match collected {
CollectorResult::Delivery((consumer_tag, delivery)) => {
let tx =
slot.consumers
.get(&consumer_tag)
.context(UnknownConsumerTagSnafu {
channel_id: n,
consumer_tag,
})?;
send(tx, ConsumerMessage::Delivery(delivery))?;
}
CollectorResult::Return(return_) => {
try_send_return(slot, return_);
}
CollectorResult::Get(get) => {
send(&slot.tx, Ok(ChannelMessage::GetOk(Box::new(Some(get)))))?;
}
}
}
}
}
Ok(())
}
}