Crate lapin

lapin

This project follows the AMQP 0.9.1 specification and targets RabbitMQ in particular.

The main access point is the Channel, which exposes the individual AMQP methods. Per the AMQP specification, one TCP connection can carry multiple channels.

Feature switches

  • codegen: generate the code instead of using the pregenerated version
  • native-tls (default): enable amqps support through native-tls
  • openssl: enable amqps support through openssl (preferred over native-tls when set)
  • rustls: enable amqps support through rustls (preferred over openssl when set, uses rustls-native-certs by default)
  • rustls-native-certs: same as rustls, but ensures that rustls-native-certs is still used even if the default for rustls changes
  • rustls-webpki-roots-certs: same as rustls, but uses webpki-roots instead of rustls-native-certs
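
The precedence described in the list above (rustls over openssl, openssl over native-tls) can be modelled as a small decision function. This is only an illustrative sketch: `TlsBackend` and `effective_tls_backend` are hypothetical names, not part of lapin, and the `None` case assumes that no amqps support is available when none of the TLS features is enabled.

```rust
/// TLS backends named in the feature list (hypothetical enum, not a lapin type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TlsBackend {
    NativeTls,
    OpenSsl,
    Rustls,
}

/// Hypothetical helper: given which TLS features are enabled, return the
/// backend that would take precedence according to the feature list.
pub fn effective_tls_backend(native_tls: bool, openssl: bool, rustls: bool) -> Option<TlsBackend> {
    if rustls {
        // rustls is preferred over openssl when set
        Some(TlsBackend::Rustls)
    } else if openssl {
        // openssl is preferred over native-tls when set
        Some(TlsBackend::OpenSsl)
    } else if native_tls {
        // native-tls is the default backend
        Some(TlsBackend::NativeTls)
    } else {
        // Assumption: with no TLS feature enabled there is no amqps support
        None
    }
}
```

For instance, enabling both the default native-tls feature and openssl would select `TlsBackend::OpenSsl` in this model, and adding rustls on top would select `TlsBackend::Rustls`.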

Example

use futures_lite::stream::StreamExt;
use lapin::{
    options::*, publisher_confirm::Confirmation, types::FieldTable, BasicProperties, Connection,
    ConnectionProperties, Result,
};
use tracing::info;

fn main() -> Result<()> {
    if std::env::var("RUST_LOG").is_err() {
        std::env::set_var("RUST_LOG", "info");
    }

    tracing_subscriber::fmt::init();

    let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());

    async_global_executor::block_on(async {
        let conn = Connection::connect(
            &addr,
            ConnectionProperties::default(),
        )
        .await?;

        info!("CONNECTED");

        let channel_a = conn.create_channel().await?;
        let channel_b = conn.create_channel().await?;

        let queue = channel_a
            .queue_declare(
                "hello",
                QueueDeclareOptions::default(),
                FieldTable::default(),
            )
            .await?;

        info!(?queue, "Declared queue");

        let mut consumer = channel_b
            .basic_consume(
                "hello",
                "my_consumer",
                BasicConsumeOptions::default(),
                FieldTable::default(),
            )
            .await?;
        async_global_executor::spawn(async move {
            info!("will consume");
            while let Some(delivery) = consumer.next().await {
                let delivery = delivery.expect("error in consumer");
                delivery
                    .ack(BasicAckOptions::default())
                    .await
                    .expect("ack");
            }
        }).detach();

        let payload = b"Hello world!";

        loop {
            let confirm = channel_a
                .basic_publish(
                    "",
                    "hello",
                    BasicPublishOptions::default(),
                    payload,
                    BasicProperties::default(),
                )
                .await?
                .await?;
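            // Publisher confirms were never enabled on channel_a (no confirm_select call),
            // so every publish resolves to Confirmation::NotRequested.
            assert_eq!(confirm, Confirmation::NotRequested);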
        }
    })
}
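
The default address in the example ends in `%2f`, which is the percent-encoded form of the default vhost `/`. The sketch below shows one way such a URI could be assembled using only the standard library. `encode_vhost` and `amqp_uri` are hypothetical helpers, not lapin functions, and the sketch emits uppercase hex digits (`%2F`); percent-encoded hex digits are case-insensitive, so this is equivalent to the `%2f` used above.

```rust
/// Hypothetical helper: percent-encode a vhost name for use in the path of
/// an AMQP URI. RFC 3986 unreserved characters pass through unchanged;
/// every other byte is written as %XX.
pub fn encode_vhost(vhost: &str) -> String {
    let mut out = String::with_capacity(vhost.len());
    for byte in vhost.bytes() {
        match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                out.push(byte as char);
            }
            _ => out.push_str(&format!("%{:02X}", byte)),
        }
    }
    out
}

/// Hypothetical helper: build an unauthenticated `amqp://` URI for the
/// given host, port, and vhost.
pub fn amqp_uri(host: &str, port: u16, vhost: &str) -> String {
    format!("amqp://{}:{}/{}", host, port, encode_vhost(vhost))
}
```

With these helpers, `amqp_uri("127.0.0.1", 5672, "/")` yields `amqp://127.0.0.1:5672/%2F`, which addresses the same vhost as the default used in the example.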

Re-exports

pub use amq_protocol::tcp;
pub use amq_protocol::types;
pub use amq_protocol::uri;

Modules

Utility to handle SASL authentication with an AMQP server

The AMQ Protocol implementation (Generated)

Structs

Main entry point for most AMQP operations.

A TCP connection to the AMQP server.

Continuously consumes messages from a queue.

Enums

The type of error that can be returned in this crate.

Wrapper around plain or TLS TCP streams

Traits

Trait providing a method to connect to an AMQP server

Type Definitions

Type alias for AMQP BasicProperties

A std Result with a lapin::Error error type