Crate lapin

An async AMQP 0-9-1 client library targeting RabbitMQ.

§Core concepts

Connection — a single TCP socket to the broker. One process typically creates one connection and reuses it throughout its lifetime.

Channel — a lightweight virtual connection multiplexed over a Connection. All AMQP operations (declaring queues, publishing, consuming, …) are performed through channels. Open as many as you need; they are cheap.

Consumer — an async Stream of message::Delivery values obtained by calling Channel::basic_consume. Each delivery must be explicitly acknowledged once processed.

PublisherConfirm — a future returned by Channel::basic_publish that resolves to a Confirmation once the broker has acknowledged the message (requires Channel::confirm_select).
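
To make the multiplexing idea concrete: in AMQP 0-9-1 every frame on the wire carries a 2-byte channel number, which is how many channels can share one connection. The sketch below is not lapin's implementation; `encode_frame` and `decode_frame` are hypothetical helpers, and the payloads are placeholder bytes rather than real encoded methods.

```rust
/// Frame type octet for method frames, as defined by AMQP 0-9-1.
const FRAME_METHOD: u8 = 1;
/// Every AMQP 0-9-1 frame ends with this octet.
const FRAME_END: u8 = 0xCE;

/// Hypothetical helper: wrap `payload` in a frame addressed to `channel`.
/// Layout: type (1 byte), channel (2 bytes, big-endian),
/// payload size (4 bytes, big-endian), payload, frame-end octet.
fn encode_frame(frame_type: u8, channel: u16, payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(7 + payload.len() + 1);
    out.push(frame_type);
    out.extend_from_slice(&channel.to_be_bytes());
    out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    out.extend_from_slice(payload);
    out.push(FRAME_END);
    out
}

/// Hypothetical helper: read one frame from the front of `buf`.
/// Returns (frame type, channel id, payload, remaining bytes), or `None`
/// if the buffer is incomplete or the frame-end octet is missing.
fn decode_frame(buf: &[u8]) -> Option<(u8, u16, &[u8], &[u8])> {
    if buf.len() < 8 {
        return None;
    }
    let frame_type = buf[0];
    let channel = u16::from_be_bytes([buf[1], buf[2]]);
    let size = u32::from_be_bytes([buf[3], buf[4], buf[5], buf[6]]) as usize;
    let end = 7usize.checked_add(size)?;
    if buf.len() <= end || buf[end] != FRAME_END {
        return None;
    }
    Some((frame_type, channel, &buf[7..end], &buf[end + 1..]))
}
```

Frames for different channels can be written back to back into the same byte stream; the receiver routes each decoded frame by its channel id.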

§Quick start

use async_rs::traits::Executor;
use futures_lite::stream::StreamExt;
use lapin::{
    options::*, types::FieldTable, BasicProperties, Connection,
    ConnectionProperties, Result,
};

fn main() -> Result<()> {
    // Broker address; falls back to a local broker and the default vhost ("/", encoded as %2f).
    let addr = std::env::var("AMQP_ADDR")
        .unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
    let runtime = lapin::runtime::default_runtime()?;

    runtime.clone().block_on(async move {
        // Open one connection, then a channel for all AMQP operations.
        let conn = Connection::connect(&addr, ConnectionProperties::default()).await?;

        let channel = conn.create_channel().await?;

        // Declare a durable queue named "hello".
        channel
            .queue_declare("hello".into(), QueueDeclareOptions::durable(), FieldTable::default())
            .await?;

        // Publish to the default exchange ("") with routing key "hello".
        // The first await yields a PublisherConfirm; the second awaits that confirm.
        channel
            .basic_publish(
                "".into(),
                "hello".into(),
                BasicPublishOptions::default(),
                b"Hello, world!",
                BasicProperties::default(),
            )
            .await?
            .await?;

        // Start consuming from "hello" with the consumer tag "my_consumer".
        let mut consumer = channel
            .basic_consume(
                "hello".into(),
                "my_consumer".into(),
                BasicConsumeOptions::default(),
                FieldTable::default(),
            )
            .await?;

        // Acknowledge each delivery once it has been processed.
        while let Some(delivery) = consumer.next().await {
            let delivery = delivery?;
            delivery.ack(BasicAckOptions::default()).await?;
        }
        Ok(())
    })
}
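
The fallback address in the quick start ends in `/%2f`. In an AMQP URI the path segment is the percent-encoded virtual host, so `%2f` selects the vhost named `/`. The following is a minimal, std-only sketch of that decoding; `percent_decode` and `vhost_from_uri` are hypothetical helpers written for illustration, not part of lapin (which re-exports its own URI types via `amq_protocol::uri`).

```rust
/// Hypothetical helper: decode `%XX` escapes in `s`.
/// Returns `None` on a truncated or non-hex escape, or invalid UTF-8.
fn percent_decode(s: &str) -> Option<String> {
    let bytes = s.as_bytes();
    let mut out = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        if bytes[i] == b'%' {
            let hex = s.get(i + 1..i + 3)?;
            if !hex.bytes().all(|b| b.is_ascii_hexdigit()) {
                return None;
            }
            out.push(u8::from_str_radix(hex, 16).ok()?);
            i += 3;
        } else {
            out.push(bytes[i]);
            i += 1;
        }
    }
    String::from_utf8(out).ok()
}

/// Hypothetical helper: extract the virtual host from an `amqp://` or
/// `amqps://` URI. Returns `None` when the URI has no path component.
fn vhost_from_uri(uri: &str) -> Option<String> {
    let rest = uri
        .strip_prefix("amqp://")
        .or_else(|| uri.strip_prefix("amqps://"))?;
    // Drop any query string before looking for the path.
    let rest = rest.split('?').next()?;
    let (_authority, path) = rest.split_once('/')?;
    percent_decode(path)
}
```

A vhost whose name contains `/` must therefore be written with `%2f` in the URI; a literal extra slash would not be read as part of the name.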

§Automatic connection recovery

Enable recovery in ConnectionProperties to automatically reconnect and replay topology (exchanges, queues, bindings, consumers) after a network failure:

use lapin::ConnectionProperties;

let props = ConnectionProperties::default().enable_auto_recover();
// then pass `props` to Connection::connect(…)

After catching an error from a channel operation, pass that error to Channel::wait_for_recovery and await the result; the call completes once the connection has been re-established:

channel.wait_for_recovery(error).await?;

§Feature flags

§Async runtime (pick exactly one)

Flag                     Notes
tokio (default)          Requires a Tokio runtime
smol                     Uses the smol executor
async-global-executor    Uses async-global-executor

§TLS backend (pick at most one; rustls is the default)

Flag                Notes
rustls (default)    TLS via rustls
native-tls          TLS via the platform’s native library
openssl             TLS via OpenSSL

§Rustls certificate store (only when rustls is active)

Flag                                  Notes
rustls-platform-verifier (default)    Uses the platform trust store
rustls-native-certs                   Loads native root certificates
rustls-webpki-roots-certs             Uses the webpki bundled root set

§Rustls crypto provider (at least one must be enabled)

Flag                           Notes
rustls--aws_lc_rs (default)    Uses aws-lc-rs
rustls--ring                   Uses ring (more portable)
§Miscellaneous

Flag              Notes
hickory-dns       Use hickory-dns for name resolution
codegen           Force code regeneration at build time
verbose-errors    More detailed AMQP parser error messages
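
The selection rules stated in the headings above (exactly one runtime, at most one TLS backend, at least one rustls crypto provider) can be written down as a small check. This is only an illustrative sketch: `check_features` is a hypothetical function, not something lapin or Cargo provides, and it assumes the crypto-provider rule applies only when `rustls` is enabled.

```rust
const RUNTIMES: [&str; 3] = ["tokio", "smol", "async-global-executor"];
const TLS_BACKENDS: [&str; 3] = ["rustls", "native-tls", "openssl"];
const CRYPTO_PROVIDERS: [&str; 2] = ["rustls--aws_lc_rs", "rustls--ring"];

/// Count how many flags from `group` appear in `enabled`.
fn count_enabled(enabled: &[&str], group: &[&str]) -> usize {
    group.iter().filter(|flag| enabled.contains(flag)).count()
}

/// Hypothetical check of a feature list against the documented rules.
fn check_features(enabled: &[&str]) -> Result<(), String> {
    let runtimes = count_enabled(enabled, &RUNTIMES);
    if runtimes != 1 {
        return Err(format!("expected exactly one async runtime, found {runtimes}"));
    }
    if count_enabled(enabled, &TLS_BACKENDS) > 1 {
        return Err("at most one TLS backend may be enabled".to_string());
    }
    // Assumption: a crypto provider is only required when rustls is the backend.
    if enabled.contains(&"rustls") && count_enabled(enabled, &CRYPTO_PROVIDERS) == 0 {
        return Err("rustls needs at least one crypto provider".to_string());
    }
    Ok(())
}
```

For instance, `["smol", "native-tls"]` passes this check, while `["tokio", "smol"]` is rejected for naming two runtimes.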

§Re-exports

pub use amq_protocol::tcp;
pub use amq_protocol::types;
pub use amq_protocol::uri;

§Modules

auth
Authentication providers and helpers for connecting to RabbitMQ.
message
AMQP message types delivered to consumers.
options
Options structs for every AMQP method that accepts flag arguments.
protocol
Code-generated AMQP 0-9-1 method and property types derived from the RabbitMQ spec.
runtime
Runtime selection and helpers.

§Structs

Acker
Handle for acknowledging, negatively acknowledging, or rejecting a delivery.
Channel
Main entry point for most AMQP operations.
ChannelStatus
Shared, cheaply cloneable view of a channel’s current state.
Configuration
The negotiated connection parameters returned by the AMQP handshake.
Connection
A TCP connection to the AMQP server.
ConnectionBuilder
Builder for Connection that collects URI, properties, and TLS config before connecting.
ConnectionProperties
Configuration for establishing a Connection to the broker.
ConnectionStatus
Shared, cheaply cloneable view of a connection’s current state.
Consumer
Continuously consumes messages from a Queue.
Error
The error type returned by this crate.
PublisherConfirm
A future that resolves to the broker’s acknowledgement of a published message.
Queue
Information about an AMQP queue as returned by the server.

§Enums

AsyncTcpStream
Wrapper around plain or TLS async TCP streams.
ChannelState
The lifecycle state of an AMQP channel.
Confirmation
The broker’s response to a published message.
ConnectionState
The lifecycle state of an AMQP connection.
ErrorKind
The kind of error that can be returned by this crate.
Event
A connection-level event delivered via Connection::events_listener.
ExchangeKind
The routing algorithm used by an AMQP exchange.

§Traits

Connect
Extension trait that lets URI types open a Connection directly.
ConsumerDelegate
Callback-based alternative to polling a Consumer stream.

§Type Aliases

BasicProperties
Type alias for AMQP BasicProperties.
DefaultConnectionBuilder
A ConnectionBuilder using the crate’s default async runtime.
Result
A std Result with lapin::Error as its error type.