[][src]Struct mobc_lapin::RMQConnectionManager

pub struct RMQConnectionManager { /* fields omitted */ }

A mobc::Manager for lapin::Connections.

Example

use mobc::Pool;
use mobc_lapin::RMQConnectionManager;
use tokio_amqp::*;
use futures::StreamExt;
use std::time::Duration;
use lapin::{
    options::*, types::FieldTable, BasicProperties, publisher_confirm::Confirmation,
    ConnectionProperties,
};

// Message body (13 bytes) that every sender task publishes.
const PAYLOAD: &[u8;13] = b"Hello, World!";
// Queue name passed to `queue_declare`, `basic_publish` and `basic_consume` below.
const QUEUE_NAME: &str = "test";

#[tokio::main]
async fn main() {
    let addr = "amqp://rmq:rmq@127.0.0.1:5672/%2f";
    let manager =RMQConnectionManager::new(addr.to_owned(), ConnectionProperties::default().with_tokio());
    let pool = Pool::<RMQConnectionManager>::builder()
        .max_open(5)
        .build(manager);

    let conn = pool.get().await.unwrap();
    let channel = conn.create_channel().await.unwrap();
    let _ = channel
        .queue_declare(
            QUEUE_NAME,
            QueueDeclareOptions::default(),
            FieldTable::default(),
        )
        .await.unwrap();

    // send messages to the queue
    println!("spawning senders...");
    for i in 0..50 {
        let send_pool = pool.clone();
        let send_props = BasicProperties::default().with_kind(format!("Sender: {}", i).into());
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_millis(200));
            loop {
                interval.tick().await;
                let send_conn = send_pool.get().await.unwrap();
                let send_channel = send_conn.create_channel().await.unwrap();
                let confirm = send_channel
                    .basic_publish(
                        "",
                        QUEUE_NAME,
                        BasicPublishOptions::default(),
                        PAYLOAD.to_vec(),
                        send_props.clone(),
                    )
                    .await.unwrap()
                    .await.unwrap();
                assert_eq!(confirm, Confirmation::NotRequested);
            }

        });
    }

    // listen for incoming messages from the queue
    let mut consumer = channel
        .basic_consume(
            QUEUE_NAME,
            "my_consumer",
            BasicConsumeOptions::default(),
            FieldTable::default(),
        )
        .await.unwrap();

    println!("listening to messages...");
    while let Some(delivery) = consumer.next().await {
        let (channel, delivery) = delivery.expect("error in consumer");
        println!("incoming message from: {:?}", delivery.properties.kind());
        channel
            .basic_ack(delivery.delivery_tag, BasicAckOptions::default())
            .await
            .expect("ack");
        }
}

Implementations

impl RMQConnectionManager[src]

pub fn new(addr: String, connection_properties: ConnectionProperties) -> Self[src]

Trait Implementations

impl Clone for RMQConnectionManager[src]

impl Debug for RMQConnectionManager[src]

impl Manager for RMQConnectionManager[src]

type Connection = Connection

The connection type this manager deals with.

type Error = LapinError

The error type returned by Connections.

Auto Trait Implementations

Blanket Implementations

impl<T> Any for T where
    T: 'static + ?Sized
[src]

impl<T> Borrow<T> for T where
    T: ?Sized
[src]

impl<T> BorrowMut<T> for T where
    T: ?Sized
[src]

impl<T> Conv for T

impl<T> Conv for T

impl<T> FmtForward for T

impl<T> From<T> for T[src]

impl<T> Instrument for T[src]

impl<T, U> Into<U> for T where
    U: From<T>, 
[src]

impl<T> Pipe for T where
    T: ?Sized

impl<T> Pipe for T

impl<T> PipeAsRef for T

impl<T> PipeBorrow for T

impl<T> PipeDeref for T

impl<T> PipeRef for T

impl<T> Tap for T

impl<T> Tap for T

impl<T, U> TapAsRef<U> for T where
    U: ?Sized

impl<T, U> TapBorrow<U> for T where
    U: ?Sized

impl<T> TapDeref for T

impl<T> ToOwned for T where
    T: Clone
[src]

type Owned = T

The resulting type after obtaining ownership.

impl<T> TryConv for T

impl<T> TryConv for T

impl<T, U> TryFrom<U> for T where
    U: Into<T>, 
[src]

type Error = Infallible

The type returned in the event of a conversion error.

impl<T, U> TryInto<U> for T where
    U: TryFrom<T>, 
[src]

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.