lapin 4.7.2

AMQP client library
Documentation
use async_rs::{Runtime, TokioRuntime, traits::*};
use futures_lite::stream::StreamExt;
use lapin::{
    BasicProperties, Channel, Confirmation, Connection, ConnectionProperties, Result, options::*,
    types::FieldTable,
};
use tracing::info;

async fn rabbit_stuff(addr: String, runtime: TokioRuntime) -> Result<()> {
    let conn = Connection::connect_with_runtime(
        &addr,
        ConnectionProperties::default().enable_auto_recover(),
        runtime.clone(),
    )
    .await?;

    info!("CONNECTED");

    let channel_a = conn.create_channel().await?;
    let channel_b = conn.create_channel().await?;

    let queue = channel_a
        .queue_declare(
            "hello".into(),
            QueueDeclareOptions::durable(),
            FieldTable::default(),
        )
        .await?;

    info!(?queue, "Declared queue");

    let mut consumer = channel_b
        .basic_consume(
            "hello".into(),
            "my_consumer".into(),
            BasicConsumeOptions::default(),
            FieldTable::default(),
        )
        .await?;
    runtime.spawn(async move {
        info!("will consume");
        while let Some(delivery) = consumer.next().await {
            let delivery = match delivery {
                Ok(delivery) => delivery,
                Err(err) => {
                    channel_b.wait_for_recovery(err).await?;
                    continue;
                }
            };
            delivery.ack(BasicAckOptions::default()).await?;
        }
        Ok::<(), lapin::Error>(())
    });

    let payload = b"Hello world!";

    loop {
        match publish(&channel_a, payload).await {
            Ok(confirm) => assert_eq!(confirm, Confirmation::NotRequested),
            Err(err) => channel_a.wait_for_recovery(err).await?,
        }
    }
}

async fn publish(channel: &Channel, payload: &'static [u8]) -> Result<Confirmation> {
    channel
        .basic_publish(
            "".into(),
            "hello".into(),
            BasicPublishOptions::default(),
            payload,
            BasicProperties::default(),
        )
        .await?
        .await
}

fn main() -> Result<()> {
    if std::env::var("RUST_LOG").is_err() {
        unsafe { std::env::set_var("RUST_LOG", "info") };
    }

    tracing_subscriber::fmt::init();

    let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
    let runtime = Runtime::tokio()?;

    runtime.clone().block_on(rabbit_stuff(addr, runtime))
}