[−][src]Struct mobc_lapin::RMQConnectionManager
A mobc::Manager for lapin::Connections.
Example
use mobc::Pool; use mobc_lapin::RMQConnectionManager; use tokio_amqp::*; use futures::StreamExt; use std::time::Duration; use lapin::{ options::*, types::FieldTable, BasicProperties, publisher_confirm::Confirmation, ConnectionProperties, }; const PAYLOAD: &[u8;13] = b"Hello, World!"; const QUEUE_NAME: &str = "test"; #[tokio::main] async fn main() { let addr = "amqp://rmq:rmq@127.0.0.1:5672/%2f"; let manager =RMQConnectionManager::new(addr.to_owned(), ConnectionProperties::default().with_tokio()); let pool = Pool::<RMQConnectionManager>::builder() .max_open(5) .build(manager); let conn = pool.get().await.unwrap(); let channel = conn.create_channel().await.unwrap(); let _ = channel .queue_declare( QUEUE_NAME, QueueDeclareOptions::default(), FieldTable::default(), ) .await.unwrap(); // send messages to the queue println!("spawning senders..."); for i in 0..50 { let send_pool = pool.clone(); let send_props = BasicProperties::default().with_kind(format!("Sender: {}", i).into()); tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_millis(200)); loop { interval.tick().await; let send_conn = send_pool.get().await.unwrap(); let send_channel = send_conn.create_channel().await.unwrap(); let confirm = send_channel .basic_publish( "", QUEUE_NAME, BasicPublishOptions::default(), PAYLOAD.to_vec(), send_props.clone(), ) .await.unwrap() .await.unwrap(); assert_eq!(confirm, Confirmation::NotRequested); } }); } // listen for incoming messages from the queue let mut consumer = channel .basic_consume( QUEUE_NAME, "my_consumer", BasicConsumeOptions::default(), FieldTable::default(), ) .await.unwrap(); println!("listening to messages..."); while let Some(delivery) = consumer.next().await { let (channel, delivery) = delivery.expect("error in consumer"); println!("incoming message from: {:?}", delivery.properties.kind()); channel .basic_ack(delivery.delivery_tag, BasicAckOptions::default()) .await .expect("ack"); } }
Implementations
impl RMQConnectionManager[src]
pub fn new(addr: String, connection_properties: ConnectionProperties) -> Self[src]
Trait Implementations
impl Clone for RMQConnectionManager[src]
fn clone(&self) -> RMQConnectionManager[src]
fn clone_from(&mut self, source: &Self)1.0.0[src]
impl Debug for RMQConnectionManager[src]
impl Manager for RMQConnectionManager[src]
type Connection = Connection
The connection type this manager deals with.
type Error = LapinError
The error type returned by Connections.
fn connect<'life0, 'async_trait>(
&'life0 self
) -> Pin<Box<dyn Future<Output = Result<Self::Connection, Self::Error>> + Send + 'async_trait>> where
'life0: 'async_trait,
Self: 'async_trait, [src]
&'life0 self
) -> Pin<Box<dyn Future<Output = Result<Self::Connection, Self::Error>> + Send + 'async_trait>> where
'life0: 'async_trait,
Self: 'async_trait,
fn check<'life0, 'async_trait>(
&'life0 self,
conn: Self::Connection
) -> Pin<Box<dyn Future<Output = Result<Self::Connection, Self::Error>> + Send + 'async_trait>> where
'life0: 'async_trait,
Self: 'async_trait, [src]
&'life0 self,
conn: Self::Connection
) -> Pin<Box<dyn Future<Output = Result<Self::Connection, Self::Error>> + Send + 'async_trait>> where
'life0: 'async_trait,
Self: 'async_trait,
fn spawn_task<T>(&self, task: T) where
T: Future + Send + 'static,
<T as Future>::Output: Send,
<T as Future>::Output: 'static, [src]
T: Future + Send + 'static,
<T as Future>::Output: Send,
<T as Future>::Output: 'static,
fn validate(&self, _conn: &mut Self::Connection) -> bool[src]
Auto Trait Implementations
impl !RefUnwindSafe for RMQConnectionManager
impl Send for RMQConnectionManager
impl Sync for RMQConnectionManager
impl Unpin for RMQConnectionManager
impl !UnwindSafe for RMQConnectionManager
Blanket Implementations
impl<T> Any for T where
T: 'static + ?Sized, [src]
T: 'static + ?Sized,
impl<T> Borrow<T> for T where
T: ?Sized, [src]
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized, [src]
T: ?Sized,
fn borrow_mut(&mut self) -> &mut T[src]
impl<T> From<T> for T[src]
impl<T> Instrument for T[src]
fn instrument(self, span: Span) -> Instrumented<Self>[src]
fn in_current_span(self) -> Instrumented<Self>[src]
impl<T, U> Into<U> for T where
U: From<T>, [src]
U: From<T>,
impl<T> ToOwned for T where
T: Clone, [src]
T: Clone,
type Owned = T
The resulting type after obtaining ownership.
fn to_owned(&self) -> T[src]
fn clone_into(&self, target: &mut T)[src]
impl<T, U> TryFrom<U> for T where
U: Into<T>, [src]
U: Into<T>,
type Error = Infallible
The type returned in the event of a conversion error.
fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>[src]
impl<T, U> TryInto<U> for T where
U: TryFrom<T>, [src]
U: TryFrom<T>,