pub mod options {
use super::*;
#[derive(Clone, Debug, Default, PartialEq)]
pub struct ConnectionOpenOptions {
pub insist: Boolean,
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct ChannelFlowOptions {
pub active: Boolean,
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct ChannelFlowOkOptions {
pub active: Boolean,
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct AccessRequestOptions {
pub exclusive: Boolean,
pub passive: Boolean,
pub active: Boolean,
pub write: Boolean,
pub read: Boolean,
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct ExchangeDeclareOptions {
pub passive: Boolean,
pub durable: Boolean,
pub auto_delete: Boolean,
pub internal: Boolean,
pub nowait: Boolean,
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct ExchangeDeleteOptions {
pub if_unused: Boolean,
pub nowait: Boolean,
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct ExchangeBindOptions {
pub nowait: Boolean,
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct ExchangeUnbindOptions {
pub nowait: Boolean,
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct QueueDeclareOptions {
pub passive: Boolean,
pub durable: Boolean,
pub exclusive: Boolean,
pub auto_delete: Boolean,
pub nowait: Boolean,
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct QueueBindOptions {
pub nowait: Boolean,
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct QueuePurgeOptions {
pub nowait: Boolean,
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct QueueDeleteOptions {
pub if_unused: Boolean,
pub if_empty: Boolean,
pub nowait: Boolean,
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct BasicQosOptions {
pub global: Boolean,
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct BasicConsumeOptions {
pub no_local: Boolean,
pub no_ack: Boolean,
pub exclusive: Boolean,
pub nowait: Boolean,
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct BasicCancelOptions {
pub nowait: Boolean,
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct BasicPublishOptions {
pub mandatory: Boolean,
pub immediate: Boolean,
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct BasicDeliverOptions {
pub redelivered: Boolean,
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct BasicGetOptions {
pub no_ack: Boolean,
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct BasicGetOkOptions {
pub redelivered: Boolean,
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct BasicAckOptions {
pub multiple: Boolean,
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct BasicRejectOptions {
pub requeue: Boolean,
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct BasicRecoverAsyncOptions {
pub requeue: Boolean,
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct BasicRecoverOptions {
pub requeue: Boolean,
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct BasicNackOptions {
pub multiple: Boolean,
pub requeue: Boolean,
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct ConfirmSelectOptions {
pub nowait: Boolean,
}
}
use options::*;
#[derive(Debug)]
pub enum Reply {
AwaitingChannelOpenOk(RequestId),
AwaitingChannelOpenOkOk(RequestId),
AwaitingChannelFlowOk(RequestId),
AwaitingChannelFlowOkOk(RequestId),
AwaitingChannelCloseOk(RequestId),
AwaitingChannelCloseOkOk(RequestId),
AwaitingAccessRequestOk(RequestId),
AwaitingAccessRequestOkOk(RequestId),
AwaitingExchangeDeclareOk(RequestId),
AwaitingExchangeDeclareOkOk(RequestId),
AwaitingExchangeDeleteOk(RequestId),
AwaitingExchangeDeleteOkOk(RequestId),
AwaitingExchangeBindOk(RequestId),
AwaitingExchangeBindOkOk(RequestId),
AwaitingExchangeUnbindOk(RequestId),
AwaitingExchangeUnbindOkOk(RequestId),
AwaitingQueueDeclareOk(RequestId),
AwaitingQueueDeclareOkOk(RequestId),
AwaitingQueueBindOk(RequestId),
AwaitingQueueBindOkOk(RequestId),
AwaitingQueuePurgeOk(RequestId),
AwaitingQueuePurgeOkOk(RequestId),
AwaitingQueueDeleteOk(RequestId, String),
AwaitingQueueDeleteOkOk(RequestId, String),
AwaitingQueueUnbindOk(RequestId),
AwaitingQueueUnbindOkOk(RequestId),
AwaitingBasicQosOk(RequestId),
AwaitingBasicQosOkOk(RequestId),
AwaitingBasicConsumeOk(RequestId, String, bool, bool, bool, bool, Box<dyn ConsumerSubscriber>),
AwaitingBasicConsumeOkOk(RequestId, String, bool, bool, bool, bool, Box<dyn ConsumerSubscriber>),
AwaitingBasicCancelOk(RequestId),
AwaitingBasicCancelOkOk(RequestId),
AwaitingPublishConfirm,
AwaitingBasicReturnOk(RequestId),
AwaitingBasicDeliverOk(RequestId),
AwaitingBasicGetOk(RequestId, String),
AwaitingBasicGetOkOk(RequestId, String),
AwaitingBasicGetEmptyOk(RequestId),
AwaitingBasicAckOk(RequestId),
AwaitingBasicRejectOk(RequestId),
AwaitingBasicRecoverAsyncOk(RequestId),
AwaitingBasicRecoverOk(RequestId),
AwaitingBasicRecoverOkOk(RequestId),
AwaitingBasicNackOk(RequestId),
AwaitingTxSelectOk(RequestId),
AwaitingTxSelectOkOk(RequestId),
AwaitingTxCommitOk(RequestId),
AwaitingTxCommitOkOk(RequestId),
AwaitingTxRollbackOk(RequestId),
AwaitingTxRollbackOkOk(RequestId),
AwaitingConfirmSelectOk(RequestId),
AwaitingConfirmSelectOkOk(RequestId),
}
impl Channel {
pub(crate) fn receive_method(&self, method: AMQPClass) -> Result<(), Error> {
match method {
AMQPClass::Channel(protocol::channel::AMQPMethod::OpenOk(m)) => self.receive_channel_open_ok(m),
AMQPClass::Channel(protocol::channel::AMQPMethod::Flow(m)) => self.receive_channel_flow(m),
AMQPClass::Channel(protocol::channel::AMQPMethod::FlowOk(m)) => self.receive_channel_flow_ok(m),
AMQPClass::Channel(protocol::channel::AMQPMethod::Close(m)) => self.receive_channel_close(m),
AMQPClass::Channel(protocol::channel::AMQPMethod::CloseOk(m)) => self.receive_channel_close_ok(m),
AMQPClass::Access(protocol::access::AMQPMethod::RequestOk(m)) => self.receive_access_request_ok(m),
AMQPClass::Exchange(protocol::exchange::AMQPMethod::DeclareOk(m)) => self.receive_exchange_declare_ok(m),
AMQPClass::Exchange(protocol::exchange::AMQPMethod::DeleteOk(m)) => self.receive_exchange_delete_ok(m),
AMQPClass::Exchange(protocol::exchange::AMQPMethod::BindOk(m)) => self.receive_exchange_bind_ok(m),
AMQPClass::Exchange(protocol::exchange::AMQPMethod::UnbindOk(m)) => self.receive_exchange_unbind_ok(m),
AMQPClass::Queue(protocol::queue::AMQPMethod::DeclareOk(m)) => self.receive_queue_declare_ok(m),
AMQPClass::Queue(protocol::queue::AMQPMethod::BindOk(m)) => self.receive_queue_bind_ok(m),
AMQPClass::Queue(protocol::queue::AMQPMethod::PurgeOk(m)) => self.receive_queue_purge_ok(m),
AMQPClass::Queue(protocol::queue::AMQPMethod::DeleteOk(m)) => self.receive_queue_delete_ok(m),
AMQPClass::Queue(protocol::queue::AMQPMethod::UnbindOk(m)) => self.receive_queue_unbind_ok(m),
AMQPClass::Basic(protocol::basic::AMQPMethod::QosOk(m)) => self.receive_basic_qos_ok(m),
AMQPClass::Basic(protocol::basic::AMQPMethod::ConsumeOk(m)) => self.receive_basic_consume_ok(m),
AMQPClass::Basic(protocol::basic::AMQPMethod::Cancel(m)) => self.receive_basic_cancel(m),
AMQPClass::Basic(protocol::basic::AMQPMethod::CancelOk(m)) => self.receive_basic_cancel_ok(m),
AMQPClass::Basic(protocol::basic::AMQPMethod::Deliver(m)) => self.receive_basic_deliver(m),
AMQPClass::Basic(protocol::basic::AMQPMethod::GetOk(m)) => self.receive_basic_get_ok(m),
AMQPClass::Basic(protocol::basic::AMQPMethod::GetEmpty(m)) => self.receive_basic_get_empty(m),
AMQPClass::Basic(protocol::basic::AMQPMethod::Ack(m)) => self.receive_basic_ack(m),
AMQPClass::Basic(protocol::basic::AMQPMethod::RecoverOk(m)) => self.receive_basic_recover_ok(m),
AMQPClass::Basic(protocol::basic::AMQPMethod::Nack(m)) => self.receive_basic_nack(m),
AMQPClass::Tx(protocol::tx::AMQPMethod::SelectOk(m)) => self.receive_tx_select_ok(m),
AMQPClass::Tx(protocol::tx::AMQPMethod::CommitOk(m)) => self.receive_tx_commit_ok(m),
AMQPClass::Tx(protocol::tx::AMQPMethod::RollbackOk(m)) => self.receive_tx_rollback_ok(m),
AMQPClass::Confirm(protocol::confirm::AMQPMethod::SelectOk(m)) => self.receive_confirm_select_ok(m),
m => {
error!("the client should not receive this method: {:?}", m);
Err(ErrorKind::InvalidMethod(m).into())
}
}
}
pub fn channel_open(&self) -> Result<Option<RequestId>, Error> {
if !self.status.is_initializing() {
return Err(ErrorKind::NotConnected.into());
}
let method = AMQPClass::Channel(protocol::channel::AMQPMethod::Open (protocol::channel::Open {
}));
self.send_method_frame(method);
Ok(
{
let request_id = self.request_id.next();
self.replies.register_pending(self.id, Reply::AwaitingChannelOpenOk(request_id));
Some(request_id)
}
)
}
pub fn channel_open_ok(&self) -> Result<Option<RequestId>, Error> {
if !self.status.is_initializing() {
return Err(ErrorKind::NotConnected.into());
}
let method = AMQPClass::Channel(protocol::channel::AMQPMethod::OpenOk (protocol::channel::OpenOk {
}));
self.send_method_frame(method);
Ok(None)
}
fn receive_channel_open_ok(&self, method: protocol::channel::OpenOk) -> Result<(), Error> {
if !self.status.is_initializing() {
return Err(ErrorKind::NotConnected.into());
}
match self.replies.next() {
Some(Reply::AwaitingChannelOpenOk(request_id)) => {
self.requests.finish(request_id, true);
self.on_channel_open_ok_received(method)
},
_ => {
self.set_error()?;
Err(ErrorKind::UnexpectedReply.into())
},
}
}
pub fn channel_flow(&self, options: ChannelFlowOptions) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let ChannelFlowOptions {
active,
} = options;
let method = AMQPClass::Channel(protocol::channel::AMQPMethod::Flow (protocol::channel::Flow {
active,
}));
self.send_method_frame(method);
Ok(
{
let request_id = self.request_id.next();
self.replies.register_pending(self.id, Reply::AwaitingChannelFlowOk(request_id));
Some(request_id)
}
)
}
fn receive_channel_flow(&self, method: protocol::channel::Flow) -> Result<(), Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
self.on_channel_flow_received(method)
}
pub fn channel_flow_ok(&self, options: ChannelFlowOkOptions) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let ChannelFlowOkOptions {
active,
} = options;
let method = AMQPClass::Channel(protocol::channel::AMQPMethod::FlowOk (protocol::channel::FlowOk {
active,
}));
self.send_method_frame(method);
Ok(None)
}
fn receive_channel_flow_ok(&self, method: protocol::channel::FlowOk) -> Result<(), Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
match self.replies.next() {
Some(Reply::AwaitingChannelFlowOk(request_id)) => {
self.requests.finish(request_id, true);
self.on_channel_flow_ok_received(method)
},
_ => {
self.set_error()?;
Err(ErrorKind::UnexpectedReply.into())
},
}
}
pub fn channel_close(&self, reply_code: ShortUInt, reply_text: &str, class_id: ShortUInt, method_id: ShortUInt) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let method = AMQPClass::Channel(protocol::channel::AMQPMethod::Close (protocol::channel::Close {
reply_code: reply_code,
reply_text: reply_text.to_string(),
class_id: class_id,
method_id: method_id,
}));
self.send_method_frame(method);
Ok(
{
let request_id = self.request_id.next();
self.replies.register_pending(self.id, Reply::AwaitingChannelCloseOk(request_id));
Some(request_id)
}
)
}
fn receive_channel_close(&self, method: protocol::channel::Close) -> Result<(), Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
self.on_channel_close_received(method)
}
pub fn channel_close_ok(&self) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let method = AMQPClass::Channel(protocol::channel::AMQPMethod::CloseOk (protocol::channel::CloseOk {
}));
self.send_method_frame(method);
Ok(None)
}
fn receive_channel_close_ok(&self, _: protocol::channel::CloseOk) -> Result<(), Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
match self.replies.next() {
Some(Reply::AwaitingChannelCloseOk(request_id)) => {
self.requests.finish(request_id, true);
self.on_channel_close_ok_received()
},
_ => {
self.set_error()?;
Err(ErrorKind::UnexpectedReply.into())
},
}
}
pub fn access_request(&self, realm: &str, options: AccessRequestOptions) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let AccessRequestOptions {
exclusive,
passive,
active,
write,
read,
} = options;
let method = AMQPClass::Access(protocol::access::AMQPMethod::Request (protocol::access::Request {
realm: realm.to_string(),
exclusive,
passive,
active,
write,
read,
}));
self.send_method_frame(method);
Ok(
{
let request_id = self.request_id.next();
self.replies.register_pending(self.id, Reply::AwaitingAccessRequestOk(request_id));
Some(request_id)
}
)
}
pub fn access_request_ok(&self) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let method = AMQPClass::Access(protocol::access::AMQPMethod::RequestOk (protocol::access::RequestOk {
}));
self.send_method_frame(method);
Ok(None)
}
fn receive_access_request_ok(&self, method: protocol::access::RequestOk) -> Result<(), Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
match self.replies.next() {
Some(Reply::AwaitingAccessRequestOk(request_id)) => {
self.requests.finish(request_id, true);
self.on_access_request_ok_received(method)
},
_ => {
self.set_error()?;
Err(ErrorKind::UnexpectedReply.into())
},
}
}
pub fn exchange_declare(&self, exchange: &str, kind: &str, options: ExchangeDeclareOptions, arguments: FieldTable) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let ExchangeDeclareOptions {
passive,
durable,
auto_delete,
internal,
nowait,
} = options;
let method = AMQPClass::Exchange(protocol::exchange::AMQPMethod::Declare (protocol::exchange::Declare {
exchange: exchange.to_string(),
kind: kind.to_string(),
passive,
durable,
auto_delete,
internal,
nowait,
arguments: arguments,
}));
self.send_method_frame(method);
Ok(
if nowait {
None
} else {
let request_id = self.request_id.next();
self.replies.register_pending(self.id, Reply::AwaitingExchangeDeclareOk(request_id));
Some(request_id)
}
)
}
pub fn exchange_declare_ok(&self) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let method = AMQPClass::Exchange(protocol::exchange::AMQPMethod::DeclareOk (protocol::exchange::DeclareOk {
}));
self.send_method_frame(method);
Ok(None)
}
fn receive_exchange_declare_ok(&self, _: protocol::exchange::DeclareOk) -> Result<(), Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
match self.replies.next() {
Some(Reply::AwaitingExchangeDeclareOk(request_id)) => {
self.requests.finish(request_id, true);
Ok(())
},
_ => {
self.set_error()?;
Err(ErrorKind::UnexpectedReply.into())
},
}
}
pub fn exchange_delete(&self, exchange: &str, options: ExchangeDeleteOptions) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let ExchangeDeleteOptions {
if_unused,
nowait,
} = options;
let method = AMQPClass::Exchange(protocol::exchange::AMQPMethod::Delete (protocol::exchange::Delete {
exchange: exchange.to_string(),
if_unused,
nowait,
}));
self.send_method_frame(method);
Ok(
if nowait {
None
} else {
let request_id = self.request_id.next();
self.replies.register_pending(self.id, Reply::AwaitingExchangeDeleteOk(request_id));
Some(request_id)
}
)
}
pub fn exchange_delete_ok(&self) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let method = AMQPClass::Exchange(protocol::exchange::AMQPMethod::DeleteOk (protocol::exchange::DeleteOk {
}));
self.send_method_frame(method);
Ok(None)
}
fn receive_exchange_delete_ok(&self, _: protocol::exchange::DeleteOk) -> Result<(), Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
match self.replies.next() {
Some(Reply::AwaitingExchangeDeleteOk(request_id)) => {
self.requests.finish(request_id, true);
Ok(())
},
_ => {
self.set_error()?;
Err(ErrorKind::UnexpectedReply.into())
},
}
}
pub fn exchange_bind(&self, destination: &str, source: &str, routing_key: &str, options: ExchangeBindOptions, arguments: FieldTable) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let ExchangeBindOptions {
nowait,
} = options;
let method = AMQPClass::Exchange(protocol::exchange::AMQPMethod::Bind (protocol::exchange::Bind {
destination: destination.to_string(),
source: source.to_string(),
routing_key: routing_key.to_string(),
nowait,
arguments: arguments,
}));
self.send_method_frame(method);
Ok(
if nowait {
None
} else {
let request_id = self.request_id.next();
self.replies.register_pending(self.id, Reply::AwaitingExchangeBindOk(request_id));
Some(request_id)
}
)
}
pub fn exchange_bind_ok(&self) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let method = AMQPClass::Exchange(protocol::exchange::AMQPMethod::BindOk (protocol::exchange::BindOk {
}));
self.send_method_frame(method);
Ok(None)
}
fn receive_exchange_bind_ok(&self, _: protocol::exchange::BindOk) -> Result<(), Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
match self.replies.next() {
Some(Reply::AwaitingExchangeBindOk(request_id)) => {
self.requests.finish(request_id, true);
Ok(())
},
_ => {
self.set_error()?;
Err(ErrorKind::UnexpectedReply.into())
},
}
}
pub fn exchange_unbind(&self, destination: &str, source: &str, routing_key: &str, options: ExchangeUnbindOptions, arguments: FieldTable) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let ExchangeUnbindOptions {
nowait,
} = options;
let method = AMQPClass::Exchange(protocol::exchange::AMQPMethod::Unbind (protocol::exchange::Unbind {
destination: destination.to_string(),
source: source.to_string(),
routing_key: routing_key.to_string(),
nowait,
arguments: arguments,
}));
self.send_method_frame(method);
Ok(
if nowait {
None
} else {
let request_id = self.request_id.next();
self.replies.register_pending(self.id, Reply::AwaitingExchangeUnbindOk(request_id));
Some(request_id)
}
)
}
pub fn exchange_unbind_ok(&self) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let method = AMQPClass::Exchange(protocol::exchange::AMQPMethod::UnbindOk (protocol::exchange::UnbindOk {
}));
self.send_method_frame(method);
Ok(None)
}
fn receive_exchange_unbind_ok(&self, _: protocol::exchange::UnbindOk) -> Result<(), Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
match self.replies.next() {
Some(Reply::AwaitingExchangeUnbindOk(request_id)) => {
self.requests.finish(request_id, true);
Ok(())
},
_ => {
self.set_error()?;
Err(ErrorKind::UnexpectedReply.into())
},
}
}
pub fn queue_declare(&self, queue: &str, options: QueueDeclareOptions, arguments: FieldTable) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let QueueDeclareOptions {
passive,
durable,
exclusive,
auto_delete,
nowait,
} = options;
let method = AMQPClass::Queue(protocol::queue::AMQPMethod::Declare (protocol::queue::Declare {
queue: queue.to_string(),
passive,
durable,
exclusive,
auto_delete,
nowait,
arguments: arguments,
}));
self.send_method_frame(method);
Ok(
if nowait {
None
} else {
let request_id = self.request_id.next();
self.replies.register_pending(self.id, Reply::AwaitingQueueDeclareOk(request_id));
Some(request_id)
}
)
}
pub fn queue_declare_ok(&self, queue: &str, message_count: LongUInt, consumer_count: LongUInt) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let method = AMQPClass::Queue(protocol::queue::AMQPMethod::DeclareOk (protocol::queue::DeclareOk {
queue: queue.to_string(),
message_count: message_count,
consumer_count: consumer_count,
}));
self.send_method_frame(method);
Ok(None)
}
fn receive_queue_declare_ok(&self, method: protocol::queue::DeclareOk) -> Result<(), Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
match self.replies.next() {
Some(Reply::AwaitingQueueDeclareOk(request_id)) => {
self.requests.finish(request_id, true);
self.on_queue_declare_ok_received(method, request_id)
},
_ => {
self.set_error()?;
Err(ErrorKind::UnexpectedReply.into())
},
}
}
pub fn queue_bind(&self, queue: &str, exchange: &str, routing_key: &str, options: QueueBindOptions, arguments: FieldTable) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let QueueBindOptions {
nowait,
} = options;
let method = AMQPClass::Queue(protocol::queue::AMQPMethod::Bind (protocol::queue::Bind {
queue: queue.to_string(),
exchange: exchange.to_string(),
routing_key: routing_key.to_string(),
nowait,
arguments: arguments,
}));
self.send_method_frame(method);
Ok(
if nowait {
None
} else {
let request_id = self.request_id.next();
self.replies.register_pending(self.id, Reply::AwaitingQueueBindOk(request_id));
Some(request_id)
}
)
}
pub fn queue_bind_ok(&self) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let method = AMQPClass::Queue(protocol::queue::AMQPMethod::BindOk (protocol::queue::BindOk {
}));
self.send_method_frame(method);
Ok(None)
}
fn receive_queue_bind_ok(&self, _: protocol::queue::BindOk) -> Result<(), Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
match self.replies.next() {
Some(Reply::AwaitingQueueBindOk(request_id)) => {
self.requests.finish(request_id, true);
Ok(())
},
_ => {
self.set_error()?;
Err(ErrorKind::UnexpectedReply.into())
},
}
}
pub fn queue_purge(&self, queue: &str, options: QueuePurgeOptions) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let QueuePurgeOptions {
nowait,
} = options;
let method = AMQPClass::Queue(protocol::queue::AMQPMethod::Purge (protocol::queue::Purge {
queue: queue.to_string(),
nowait,
}));
self.send_method_frame(method);
Ok(
if nowait {
None
} else {
let request_id = self.request_id.next();
self.replies.register_pending(self.id, Reply::AwaitingQueuePurgeOk(request_id));
Some(request_id)
}
)
}
pub fn queue_purge_ok(&self, message_count: LongUInt) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let method = AMQPClass::Queue(protocol::queue::AMQPMethod::PurgeOk (protocol::queue::PurgeOk {
message_count: message_count,
}));
self.send_method_frame(method);
Ok(None)
}
fn receive_queue_purge_ok(&self, method: protocol::queue::PurgeOk) -> Result<(), Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
match self.replies.next() {
Some(Reply::AwaitingQueuePurgeOk(request_id)) => {
self.requests.finish(request_id, true);
self.on_queue_purge_ok_received(method)
},
_ => {
self.set_error()?;
Err(ErrorKind::UnexpectedReply.into())
},
}
}
pub fn queue_delete(&self, queue: &str, options: QueueDeleteOptions) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let QueueDeleteOptions {
if_unused,
if_empty,
nowait,
} = options;
let method = AMQPClass::Queue(protocol::queue::AMQPMethod::Delete (protocol::queue::Delete {
queue: queue.to_string(),
if_unused,
if_empty,
nowait,
}));
self.send_method_frame(method);
Ok(
if nowait {
None
} else {
let request_id = self.request_id.next();
self.replies.register_pending(self.id, Reply::AwaitingQueueDeleteOk(request_id, queue.to_string()));
Some(request_id)
}
)
}
pub fn queue_delete_ok(&self, message_count: LongUInt) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let method = AMQPClass::Queue(protocol::queue::AMQPMethod::DeleteOk (protocol::queue::DeleteOk {
message_count: message_count,
}));
self.send_method_frame(method);
Ok(None)
}
fn receive_queue_delete_ok(&self, method: protocol::queue::DeleteOk) -> Result<(), Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
match self.replies.next() {
Some(Reply::AwaitingQueueDeleteOk(request_id, queue)) => {
self.requests.finish(request_id, true);
self.on_queue_delete_ok_received(method, queue)
},
_ => {
self.set_error()?;
Err(ErrorKind::UnexpectedReply.into())
},
}
}
pub fn queue_unbind(&self, queue: &str, exchange: &str, routing_key: &str, arguments: FieldTable) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let method = AMQPClass::Queue(protocol::queue::AMQPMethod::Unbind (protocol::queue::Unbind {
queue: queue.to_string(),
exchange: exchange.to_string(),
routing_key: routing_key.to_string(),
arguments: arguments,
}));
self.send_method_frame(method);
Ok(
{
let request_id = self.request_id.next();
self.replies.register_pending(self.id, Reply::AwaitingQueueUnbindOk(request_id));
Some(request_id)
}
)
}
pub fn queue_unbind_ok(&self) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let method = AMQPClass::Queue(protocol::queue::AMQPMethod::UnbindOk (protocol::queue::UnbindOk {
}));
self.send_method_frame(method);
Ok(None)
}
fn receive_queue_unbind_ok(&self, _: protocol::queue::UnbindOk) -> Result<(), Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
match self.replies.next() {
Some(Reply::AwaitingQueueUnbindOk(request_id)) => {
self.requests.finish(request_id, true);
Ok(())
},
_ => {
self.set_error()?;
Err(ErrorKind::UnexpectedReply.into())
},
}
}
pub fn basic_qos(&self, prefetch_count: ShortUInt, options: BasicQosOptions) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let BasicQosOptions {
global,
} = options;
let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Qos (protocol::basic::Qos {
prefetch_count: prefetch_count,
global,
}));
self.send_method_frame(method);
Ok(
{
let request_id = self.request_id.next();
self.replies.register_pending(self.id, Reply::AwaitingBasicQosOk(request_id));
Some(request_id)
}
)
}
pub fn basic_qos_ok(&self) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let method = AMQPClass::Basic(protocol::basic::AMQPMethod::QosOk (protocol::basic::QosOk {
}));
self.send_method_frame(method);
Ok(None)
}
fn receive_basic_qos_ok(&self, _: protocol::basic::QosOk) -> Result<(), Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
match self.replies.next() {
Some(Reply::AwaitingBasicQosOk(request_id)) => {
self.requests.finish(request_id, true);
Ok(())
},
_ => {
self.set_error()?;
Err(ErrorKind::UnexpectedReply.into())
},
}
}
pub fn basic_consume(&self, queue: &str, consumer_tag: &str, options: BasicConsumeOptions, arguments: FieldTable, subscriber: Box<dyn ConsumerSubscriber>) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let BasicConsumeOptions {
no_local,
no_ack,
exclusive,
nowait,
} = options;
let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Consume (protocol::basic::Consume {
queue: queue.to_string(),
consumer_tag: consumer_tag.to_string(),
no_local,
no_ack,
exclusive,
nowait,
arguments: arguments,
}));
self.send_method_frame(method);
Ok(
if nowait {
None
} else {
let request_id = self.request_id.next();
self.replies.register_pending(self.id, Reply::AwaitingBasicConsumeOk(request_id, queue.to_string(), no_local, no_ack, exclusive, nowait, subscriber));
Some(request_id)
}
)
}
pub fn basic_consume_ok(&self, consumer_tag: &str) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let method = AMQPClass::Basic(protocol::basic::AMQPMethod::ConsumeOk (protocol::basic::ConsumeOk {
consumer_tag: consumer_tag.to_string(),
}));
self.send_method_frame(method);
Ok(None)
}
fn receive_basic_consume_ok(&self, method: protocol::basic::ConsumeOk) -> Result<(), Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
match self.replies.next() {
Some(Reply::AwaitingBasicConsumeOk(request_id, queue, no_local, no_ack, exclusive, nowait, subscriber)) => {
self.requests.finish(request_id, true);
self.on_basic_consume_ok_received(method, request_id, queue, no_local, no_ack, exclusive, nowait, subscriber)
},
_ => {
self.set_error()?;
Err(ErrorKind::UnexpectedReply.into())
},
}
}
pub fn basic_cancel(&self, consumer_tag: &str, options: BasicCancelOptions) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let BasicCancelOptions {
nowait,
} = options;
let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Cancel (protocol::basic::Cancel {
consumer_tag: consumer_tag.to_string(),
nowait,
}));
self.send_method_frame(method);
Ok(
if nowait {
None
} else {
let request_id = self.request_id.next();
self.replies.register_pending(self.id, Reply::AwaitingBasicCancelOk(request_id));
Some(request_id)
}
)
}
fn receive_basic_cancel(&self, method: protocol::basic::Cancel) -> Result<(), Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
self.on_basic_cancel_received(method)
}
pub fn basic_cancel_ok(&self, consumer_tag: &str) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let method = AMQPClass::Basic(protocol::basic::AMQPMethod::CancelOk (protocol::basic::CancelOk {
consumer_tag: consumer_tag.to_string(),
}));
self.send_method_frame(method);
Ok(None)
}
fn receive_basic_cancel_ok(&self, method: protocol::basic::CancelOk) -> Result<(), Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
match self.replies.next() {
Some(Reply::AwaitingBasicCancelOk(request_id)) => {
self.requests.finish(request_id, true);
self.on_basic_cancel_ok_received(method)
},
_ => {
self.set_error()?;
Err(ErrorKind::UnexpectedReply.into())
},
}
}
pub fn basic_publish(&self, exchange: &str, routing_key: &str, options: BasicPublishOptions, payload: Vec<u8>, properties: BasicProperties) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let BasicPublishOptions {
mandatory,
immediate,
} = options;
let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Publish (protocol::basic::Publish {
exchange: exchange.to_string(),
routing_key: routing_key.to_string(),
mandatory,
immediate,
}));
self.send_method_frame(method);
self.on_basic_publish_sent(60, payload, properties);
Ok(None)
}
pub fn basic_return(&self, reply_code: ShortUInt, reply_text: &str, exchange: &str, routing_key: &str) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Return (protocol::basic::Return {
reply_code: reply_code,
reply_text: reply_text.to_string(),
exchange: exchange.to_string(),
routing_key: routing_key.to_string(),
}));
self.send_method_frame(method);
Ok(None)
}
pub fn basic_deliver(&self, consumer_tag: &str, delivery_tag: LongLongUInt, options: BasicDeliverOptions, exchange: &str, routing_key: &str) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let BasicDeliverOptions {
redelivered,
} = options;
let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Deliver (protocol::basic::Deliver {
consumer_tag: consumer_tag.to_string(),
delivery_tag: delivery_tag,
redelivered,
exchange: exchange.to_string(),
routing_key: routing_key.to_string(),
}));
self.send_method_frame(method);
Ok(None)
}
fn receive_basic_deliver(&self, method: protocol::basic::Deliver) -> Result<(), Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
self.on_basic_deliver_received(method)
}
pub fn basic_get(&self, queue: &str, options: BasicGetOptions) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let BasicGetOptions {
no_ack,
} = options;
let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Get (protocol::basic::Get {
queue: queue.to_string(),
no_ack,
}));
self.send_method_frame(method);
Ok(
{
let request_id = self.request_id.next();
self.replies.register_pending(self.id, Reply::AwaitingBasicGetOk(request_id, queue.to_string()));
Some(request_id)
}
)
}
pub fn basic_get_ok(&self, delivery_tag: LongLongUInt, options: BasicGetOkOptions, exchange: &str, routing_key: &str, message_count: LongUInt) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let BasicGetOkOptions {
redelivered,
} = options;
let method = AMQPClass::Basic(protocol::basic::AMQPMethod::GetOk (protocol::basic::GetOk {
delivery_tag: delivery_tag,
redelivered,
exchange: exchange.to_string(),
routing_key: routing_key.to_string(),
message_count: message_count,
}));
self.send_method_frame(method);
Ok(None)
}
fn receive_basic_get_ok(&self, method: protocol::basic::GetOk) -> Result<(), Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
match self.replies.next() {
Some(Reply::AwaitingBasicGetOk(request_id, queue)) => {
self.requests.finish(request_id, true);
self.on_basic_get_ok_received(method, queue)
},
_ => {
self.set_error()?;
Err(ErrorKind::UnexpectedReply.into())
},
}
}
pub fn basic_get_empty(&self) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let method = AMQPClass::Basic(protocol::basic::AMQPMethod::GetEmpty (protocol::basic::GetEmpty {
}));
self.send_method_frame(method);
Ok(None)
}
fn receive_basic_get_empty(&self, method: protocol::basic::GetEmpty) -> Result<(), Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
self.on_basic_get_empty_received(method)
}
pub fn basic_ack(&self, delivery_tag: LongLongUInt, options: BasicAckOptions) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let BasicAckOptions {
multiple,
} = options;
let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Ack (protocol::basic::Ack {
delivery_tag: delivery_tag,
multiple,
}));
self.send_method_frame(method);
self.on_basic_ack_sent(multiple, delivery_tag);
Ok(None)
}
fn receive_basic_ack(&self, method: protocol::basic::Ack) -> Result<(), Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
self.on_basic_ack_received(method)
}
pub fn basic_reject(&self, delivery_tag: LongLongUInt, options: BasicRejectOptions) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let BasicRejectOptions {
requeue,
} = options;
let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Reject (protocol::basic::Reject {
delivery_tag: delivery_tag,
requeue,
}));
self.send_method_frame(method);
Ok(None)
}
pub fn basic_recover_async(&self, options: BasicRecoverAsyncOptions) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let BasicRecoverAsyncOptions {
requeue,
} = options;
let method = AMQPClass::Basic(protocol::basic::AMQPMethod::RecoverAsync (protocol::basic::RecoverAsync {
requeue,
}));
self.send_method_frame(method);
self.on_basic_recover_async_sent();
Ok(None)
}
pub fn basic_recover(&self, options: BasicRecoverOptions) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let BasicRecoverOptions {
requeue,
} = options;
let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Recover (protocol::basic::Recover {
requeue,
}));
self.send_method_frame(method);
Ok(
{
let request_id = self.request_id.next();
self.replies.register_pending(self.id, Reply::AwaitingBasicRecoverOk(request_id));
Some(request_id)
}
)
}
pub fn basic_recover_ok(&self) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let method = AMQPClass::Basic(protocol::basic::AMQPMethod::RecoverOk (protocol::basic::RecoverOk {
}));
self.send_method_frame(method);
Ok(None)
}
fn receive_basic_recover_ok(&self, _: protocol::basic::RecoverOk) -> Result<(), Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
match self.replies.next() {
Some(Reply::AwaitingBasicRecoverOk(request_id)) => {
self.requests.finish(request_id, true);
self.on_basic_recover_ok_received()
},
_ => {
self.set_error()?;
Err(ErrorKind::UnexpectedReply.into())
},
}
}
pub fn basic_nack(&self, delivery_tag: LongLongUInt, options: BasicNackOptions) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let BasicNackOptions {
multiple,
requeue,
} = options;
let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Nack (protocol::basic::Nack {
delivery_tag: delivery_tag,
multiple,
requeue,
}));
self.send_method_frame(method);
self.on_basic_nack_sent(multiple, delivery_tag);
Ok(None)
}
fn receive_basic_nack(&self, method: protocol::basic::Nack) -> Result<(), Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
self.on_basic_nack_received(method)
}
pub fn tx_select(&self) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let method = AMQPClass::Tx(protocol::tx::AMQPMethod::Select (protocol::tx::Select {
}));
self.send_method_frame(method);
Ok(
{
let request_id = self.request_id.next();
self.replies.register_pending(self.id, Reply::AwaitingTxSelectOk(request_id));
Some(request_id)
}
)
}
pub fn tx_select_ok(&self) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let method = AMQPClass::Tx(protocol::tx::AMQPMethod::SelectOk (protocol::tx::SelectOk {
}));
self.send_method_frame(method);
Ok(None)
}
fn receive_tx_select_ok(&self, _: protocol::tx::SelectOk) -> Result<(), Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
match self.replies.next() {
Some(Reply::AwaitingTxSelectOk(request_id)) => {
self.requests.finish(request_id, true);
Ok(())
},
_ => {
self.set_error()?;
Err(ErrorKind::UnexpectedReply.into())
},
}
}
pub fn tx_commit(&self) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let method = AMQPClass::Tx(protocol::tx::AMQPMethod::Commit (protocol::tx::Commit {
}));
self.send_method_frame(method);
Ok(
{
let request_id = self.request_id.next();
self.replies.register_pending(self.id, Reply::AwaitingTxCommitOk(request_id));
Some(request_id)
}
)
}
pub fn tx_commit_ok(&self) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let method = AMQPClass::Tx(protocol::tx::AMQPMethod::CommitOk (protocol::tx::CommitOk {
}));
self.send_method_frame(method);
Ok(None)
}
fn receive_tx_commit_ok(&self, _: protocol::tx::CommitOk) -> Result<(), Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
match self.replies.next() {
Some(Reply::AwaitingTxCommitOk(request_id)) => {
self.requests.finish(request_id, true);
Ok(())
},
_ => {
self.set_error()?;
Err(ErrorKind::UnexpectedReply.into())
},
}
}
pub fn tx_rollback(&self) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let method = AMQPClass::Tx(protocol::tx::AMQPMethod::Rollback (protocol::tx::Rollback {
}));
self.send_method_frame(method);
Ok(
{
let request_id = self.request_id.next();
self.replies.register_pending(self.id, Reply::AwaitingTxRollbackOk(request_id));
Some(request_id)
}
)
}
pub fn tx_rollback_ok(&self) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let method = AMQPClass::Tx(protocol::tx::AMQPMethod::RollbackOk (protocol::tx::RollbackOk {
}));
self.send_method_frame(method);
Ok(None)
}
fn receive_tx_rollback_ok(&self, _: protocol::tx::RollbackOk) -> Result<(), Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
match self.replies.next() {
Some(Reply::AwaitingTxRollbackOk(request_id)) => {
self.requests.finish(request_id, true);
Ok(())
},
_ => {
self.set_error()?;
Err(ErrorKind::UnexpectedReply.into())
},
}
}
pub fn confirm_select(&self, options: ConfirmSelectOptions) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let ConfirmSelectOptions {
nowait,
} = options;
let method = AMQPClass::Confirm(protocol::confirm::AMQPMethod::Select (protocol::confirm::Select {
nowait,
}));
self.send_method_frame(method);
Ok(
if nowait {
None
} else {
let request_id = self.request_id.next();
self.replies.register_pending(self.id, Reply::AwaitingConfirmSelectOk(request_id));
Some(request_id)
}
)
}
pub fn confirm_select_ok(&self) -> Result<Option<RequestId>, Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
let method = AMQPClass::Confirm(protocol::confirm::AMQPMethod::SelectOk (protocol::confirm::SelectOk {
}));
self.send_method_frame(method);
Ok(None)
}
fn receive_confirm_select_ok(&self, _: protocol::confirm::SelectOk) -> Result<(), Error> {
if !self.status.is_connected() {
return Err(ErrorKind::NotConnected.into());
}
match self.replies.next() {
Some(Reply::AwaitingConfirmSelectOk(request_id)) => {
self.requests.finish(request_id, true);
self.on_confirm_select_ok_received()
},
_ => {
self.set_error()?;
Err(ErrorKind::UnexpectedReply.into())
},
}
}
}