use crate::{
message::{BasicGetMessage, BasicReturnMessage},
options::*,
types::{Boolean, FieldTable, LongUInt, ShortUInt},
BasicProperties, ConfirmationFuture, Consumer, Error, ExchangeKind, Queue, Result,
};
use futures::Future;
use lapin::{Channel as InnerChannel, Connection};
#[derive(Clone)]
pub struct Channel {
inner: InnerChannel,
}
impl Channel {
pub fn create(conn: &Connection) -> impl Future<Item = Self, Error = Error> {
let confirmation: ConfirmationFuture<InnerChannel> = conn.create_channel().into();
confirmation.map(|inner| Channel { inner })
}
pub fn id(&self) -> u16 {
self.inner.id()
}
pub fn access_request(
&self,
realm: &str,
options: AccessRequestOptions,
) -> ConfirmationFuture<()> {
self.inner.access_request(realm, options).into()
}
pub fn exchange_declare(
&self,
name: &str,
exchange_type: ExchangeKind,
options: ExchangeDeclareOptions,
arguments: FieldTable,
) -> ConfirmationFuture<()> {
self.inner
.exchange_declare(name, exchange_type, options, arguments)
.into()
}
pub fn exchange_delete(
&self,
name: &str,
options: ExchangeDeleteOptions,
) -> ConfirmationFuture<()> {
self.inner.exchange_delete(name, options).into()
}
pub fn exchange_bind(
&self,
destination: &str,
source: &str,
routing_key: &str,
options: ExchangeBindOptions,
arguments: FieldTable,
) -> ConfirmationFuture<()> {
self.inner
.exchange_bind(destination, source, routing_key, options, arguments)
.into()
}
pub fn exchange_unbind(
&self,
destination: &str,
source: &str,
routing_key: &str,
options: ExchangeUnbindOptions,
arguments: FieldTable,
) -> ConfirmationFuture<()> {
self.inner
.exchange_unbind(destination, source, routing_key, options, arguments)
.into()
}
pub fn queue_declare(
&self,
name: &str,
options: QueueDeclareOptions,
arguments: FieldTable,
) -> ConfirmationFuture<Queue> {
self.inner.queue_declare(name, options, arguments).into()
}
pub fn queue_bind(
&self,
name: &str,
exchange: &str,
routing_key: &str,
options: QueueBindOptions,
arguments: FieldTable,
) -> ConfirmationFuture<()> {
self.inner
.queue_bind(name, exchange, routing_key, options, arguments)
.into()
}
pub fn queue_unbind(
&self,
name: &str,
exchange: &str,
routing_key: &str,
arguments: FieldTable,
) -> ConfirmationFuture<()> {
self.inner
.queue_unbind(name, exchange, routing_key, arguments)
.into()
}
pub fn confirm_select(&self, options: ConfirmSelectOptions) -> ConfirmationFuture<()> {
self.inner.confirm_select(options).into()
}
pub fn basic_qos(
&self,
prefetch_count: ShortUInt,
options: BasicQosOptions,
) -> ConfirmationFuture<()> {
self.inner.basic_qos(prefetch_count, options).into()
}
pub fn basic_publish(
&self,
exchange: &str,
routing_key: &str,
payload: Vec<u8>,
options: BasicPublishOptions,
properties: BasicProperties,
) -> ConfirmationFuture<()> {
self.inner
.basic_publish(exchange, routing_key, options, payload, properties)
.into()
}
pub fn basic_consume(
&self,
queue: &Queue,
consumer_tag: &str,
options: BasicConsumeOptions,
arguments: FieldTable,
) -> impl Future<Item = Consumer, Error = Error> {
let confirmation: ConfirmationFuture<lapin::Consumer> = self
.inner
.basic_consume(queue, consumer_tag, options, arguments)
.into();
confirmation.map(Consumer)
}
pub fn basic_cancel(
&self,
consumer_tag: &str,
options: BasicCancelOptions,
) -> ConfirmationFuture<()> {
self.inner.basic_cancel(consumer_tag, options).into()
}
pub fn basic_recover(&self, options: BasicRecoverOptions) -> ConfirmationFuture<()> {
self.inner.basic_recover(options).into()
}
pub fn basic_recover_async(&self, options: BasicRecoverAsyncOptions) -> ConfirmationFuture<()> {
self.inner.basic_recover_async(options).into()
}
pub fn basic_ack(&self, delivery_tag: u64, multiple: bool) -> ConfirmationFuture<()> {
self.inner
.basic_ack(delivery_tag, BasicAckOptions { multiple })
.into()
}
pub fn basic_nack(
&self,
delivery_tag: u64,
multiple: bool,
requeue: bool,
) -> ConfirmationFuture<()> {
self.inner
.basic_nack(delivery_tag, BasicNackOptions { multiple, requeue })
.into()
}
pub fn basic_reject(
&self,
delivery_tag: u64,
options: BasicRejectOptions,
) -> ConfirmationFuture<()> {
self.inner.basic_reject(delivery_tag, options).into()
}
pub fn basic_get(
&self,
queue: &str,
options: BasicGetOptions,
) -> ConfirmationFuture<Option<BasicGetMessage>> {
self.inner.basic_get(queue, options).into()
}
pub fn queue_purge(
&self,
queue_name: &str,
options: QueuePurgeOptions,
) -> ConfirmationFuture<LongUInt> {
self.inner.queue_purge(queue_name, options).into()
}
pub fn queue_delete(
&self,
queue_name: &str,
options: QueueDeleteOptions,
) -> ConfirmationFuture<LongUInt> {
self.inner.queue_delete(queue_name, options).into()
}
pub fn close(&self, code: u16, message: &str) -> ConfirmationFuture<()> {
self.inner.close(code, message).into()
}
pub fn channel_flow(&self, options: ChannelFlowOptions) -> ConfirmationFuture<Boolean> {
self.inner.channel_flow(options).into()
}
pub fn tx_select(&self) -> ConfirmationFuture<()> {
self.inner.tx_select().into()
}
pub fn tx_commit(&self) -> ConfirmationFuture<()> {
self.inner.tx_commit().into()
}
pub fn tx_rollback(&self) -> ConfirmationFuture<()> {
self.inner.tx_rollback().into()
}
pub fn wait_for_confirms(&self) -> ConfirmationFuture<Vec<BasicReturnMessage>, Result<()>> {
self.inner.wait_for_confirms().into()
}
}