An async AMQP 0-9-1 client library targeting RabbitMQ.
§Core concepts
- Connection: a single TCP socket to the broker. One process typically creates one connection and reuses it throughout its lifetime.
- Channel: a lightweight virtual connection multiplexed over a Connection. All AMQP operations (declaring queues, publishing, consuming, …) are performed through channels. Open as many as you need; they are cheap.
- Consumer: an async Stream of message::Delivery values obtained by calling Channel::basic_consume. Each delivery must be explicitly acknowledged once processed.
- PublisherConfirm: a future returned by Channel::basic_publish that resolves to a Confirmation once the broker has acknowledged the message (requires Channel::confirm_select).
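To make "multiplexed" concrete: AMQP 0-9-1 tags every frame on the wire with a 16-bit channel number, so frames belonging to many channels can share one socket. The sketch below is std-only and is not lapin's implementation; `encode_frame` and `decode_frames` are hypothetical helpers written for illustration, and the layout (type, channel, size, payload, end octet) follows the AMQP 0-9-1 general frame format.

```rust
/// Frame type for content body frames in AMQP 0-9-1.
const FRAME_BODY: u8 = 3;
/// Every AMQP 0-9-1 frame is terminated by this octet.
const FRAME_END: u8 = 0xCE;

/// Hypothetical helper: serialize one frame for the given channel number.
/// Layout: type (1 byte) | channel (2 bytes, big-endian) |
/// payload size (4 bytes, big-endian) | payload | frame-end (1 byte).
fn encode_frame(frame_type: u8, channel: u16, payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(payload.len() + 8);
    out.push(frame_type);
    out.extend_from_slice(&channel.to_be_bytes());
    out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    out.extend_from_slice(payload);
    out.push(FRAME_END);
    out
}

/// Hypothetical helper: split a byte stream read from one socket into
/// (channel, payload) pairs. Returns None if the stream is truncated or a
/// frame is not terminated by FRAME_END.
fn decode_frames(mut wire: &[u8]) -> Option<Vec<(u16, Vec<u8>)>> {
    let mut frames = Vec::new();
    while !wire.is_empty() {
        // The smallest possible frame is 8 bytes: 7-byte header + end octet.
        if wire.len() < 8 {
            return None;
        }
        let channel = u16::from_be_bytes([wire[1], wire[2]]);
        let size = u32::from_be_bytes([wire[3], wire[4], wire[5], wire[6]]) as usize;
        // Index of the frame-end octet for this frame.
        let end = 7usize.checked_add(size)?;
        if wire.len() <= end || wire[end] != FRAME_END {
            return None;
        }
        frames.push((channel, wire[7..end].to_vec()));
        wire = &wire[end + 1..];
    }
    Some(frames)
}
```

Concatenating frames encoded for channels 1 and 2 and passing the result to `decode_frames` returns each payload tagged with its channel number. Under this scheme a new channel needs only a new number, not a new socket, which is why channels are cheap compared with connections.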
§Quick start
use async_rs::traits::Executor;
use futures_lite::stream::StreamExt;
use lapin::{
    options::*, types::FieldTable, BasicProperties, Connection,
    ConnectionProperties, Result,
};
fn main() -> Result<()> {
    // Fall back to a local broker and the default vhost ("/", encoded as %2f).
    let addr = std::env::var("AMQP_ADDR")
        .unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
    let runtime = lapin::runtime::default_runtime()?;
    runtime.clone().block_on(async move {
        let conn = Connection::connect(&addr, ConnectionProperties::default()).await?;
        let channel = conn.create_channel().await?;
        // Declare the queue before publishing to it or consuming from it.
        channel
            .queue_declare("hello".into(), QueueDeclareOptions::durable(), FieldTable::default())
            .await?;
        // Publish through the default exchange. The first await yields the
        // PublisherConfirm; the second resolves it.
        channel
            .basic_publish(
                "".into(),
                "hello".into(),
                BasicPublishOptions::default(),
                b"Hello, world!",
                BasicProperties::default(),
            )
            .await?
            .await?;
        let mut consumer = channel
            .basic_consume(
                "hello".into(),
                "my_consumer".into(),
                BasicConsumeOptions::default(),
                FieldTable::default(),
            )
            .await?;
        // Acknowledge each delivery once it has been processed.
        while let Some(delivery) = consumer.next().await {
            let delivery = delivery?;
            delivery.ack(BasicAckOptions::default()).await?;
        }
        Ok(())
    })
}
§Automatic connection recovery
Enable recovery in ConnectionProperties to automatically reconnect and
replay topology (exchanges, queues, bindings, consumers) after a network
failure:
use lapin::ConnectionProperties;
let props = ConnectionProperties::default().enable_auto_recover();
// then pass `props` to Connection::connect(…)
After catching an error from a channel operation, call
Channel::wait_for_recovery to wait until the connection has been
re-established:
channel.wait_for_recovery(error).await?;
§Feature flags
§Async runtime (pick exactly one)
| Flag | Notes |
|---|---|
| tokio (default) | Requires a Tokio runtime |
| smol | Uses the smol executor |
| async-global-executor | Uses async-global-executor |
§TLS backend (pick at most one; rustls is the default)
| Flag | Notes |
|---|---|
| rustls (default) | TLS via rustls |
| native-tls | TLS via the platform’s native library |
| openssl | TLS via OpenSSL |
§Rustls certificate store (only when rustls is active)
| Flag | Notes |
|---|---|
| rustls-platform-verifier (default) | Uses the platform trust store |
| rustls-native-certs | Loads native root certificates |
| rustls-webpki-roots-certs | Uses the webpki bundled root set |
§Rustls crypto provider (at least one must be enabled)
| Flag | Notes |
|---|---|
| rustls--aws_lc_rs (default) | Uses aws-lc-rs |
| rustls--ring | Uses ring (more portable) |
§Miscellaneous
| Flag | Notes |
|---|---|
| hickory-dns | Use hickory-dns for name resolution |
| codegen | Force code regeneration at build time |
| verbose-errors | More detailed AMQP parser error messages |
Re-exports§
Modules§
- auth
- Authentication providers and helpers for connecting to RabbitMQ.
- message
- AMQP message types delivered to consumers.
- options
- Options structs for every AMQP method that accepts flag arguments.
- protocol
- Code-generated AMQP 0-9-1 method and property types derived from the RabbitMQ spec.
- runtime
- Runtime selection and helpers.
Structs§
- Acker
- Handle for acknowledging, negatively acknowledging, or rejecting a delivery.
- Channel
- Main entry point for most AMQP operations.
- ChannelStatus
- Shared, cheaply cloneable view of a channel’s current state.
- Configuration
- The negotiated connection parameters returned by the AMQP handshake.
- Connection
- A TCP connection to the AMQP server.
- ConnectionBuilder
- Builder for Connection that collects URI, properties, and TLS config before connecting.
- ConnectionProperties
- Configuration for establishing a Connection to the broker.
- ConnectionStatus
- Shared, cheaply cloneable view of a connection’s current state.
- Consumer
- Continuously consumes messages from a Queue.
- Error
- The error that can be returned in this crate.
- PublisherConfirm
- A future that resolves to the broker’s acknowledgement of a published message.
- Queue
- Information about an AMQP queue as returned by the server.
Enums§
- AsyncTcpStream
- Wrapper around plain or TLS async TCP streams.
- ChannelState
- The lifecycle state of an AMQP channel.
- Confirmation
- The broker’s response to a published message.
- ConnectionState
- The lifecycle state of an AMQP connection.
- ErrorKind
- The type of error that can be returned in this crate.
- Event
- A connection-level event delivered via Connection::events_listener.
- ExchangeKind
- The routing algorithm used by an AMQP exchange.
Traits§
- Connect
- Extension trait that lets URI types open a Connection directly.
- ConsumerDelegate
- Callback-based alternative to polling a Consumer stream.
Type Aliases§
- BasicProperties
- Type alias for AMQP BasicProperties
- DefaultConnectionBuilder
- A ConnectionBuilder using the crate’s default async runtime.
- Result
- A std Result with a lapin::Error error type