lapin
This project follows the AMQP 0.9.1 specification, targeting RabbitMQ in particular.
The main access point is the Channel, which contains the individual
AMQP methods. Per the AMQP specification, one TCP Connection can contain
multiple channels.
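To illustrate how several channels can share one TCP connection, here is a minimal, standard-library-only sketch of AMQP 0.9.1 framing: each frame carries a type octet, a 16-bit channel number, a 32-bit payload size, the payload, and a frame-end octet (0xCE), so a reader can route interleaved frames by channel. The `Frame` type and the `encode`/`decode` helpers are hypothetical names invented for this illustration; they are not part of lapin's API, and lapin's actual framing code may be organised quite differently.

```rust
/// Frame-end marker defined by AMQP 0.9.1.
const FRAME_END: u8 = 0xCE;

/// Simplified AMQP 0.9.1 frame (hypothetical type, not lapin's).
#[derive(Debug, PartialEq)]
struct Frame {
    /// 1 = method, 2 = content header, 3 = content body, 8 = heartbeat.
    kind: u8,
    /// Channel the frame belongs to; 0 is reserved for connection-level frames.
    channel: u16,
    payload: Vec<u8>,
}

/// Serializes a frame as: type (1) | channel (2) | size (4) | payload | frame-end (1).
fn encode(frame: &Frame) -> Vec<u8> {
    let mut out = Vec::with_capacity(frame.payload.len() + 8);
    out.push(frame.kind);
    out.extend_from_slice(&frame.channel.to_be_bytes());
    out.extend_from_slice(&(frame.payload.len() as u32).to_be_bytes());
    out.extend_from_slice(&frame.payload);
    out.push(FRAME_END);
    out
}

/// Decodes one frame from the front of `buf`.
/// Returns the frame and the number of bytes consumed, or `None` if the
/// buffer is incomplete or the frame-end marker is missing.
fn decode(buf: &[u8]) -> Option<(Frame, usize)> {
    if buf.len() < 7 {
        return None;
    }
    let kind = buf[0];
    let channel = u16::from_be_bytes([buf[1], buf[2]]);
    let size = u32::from_be_bytes([buf[3], buf[4], buf[5], buf[6]]) as usize;
    let end = 7usize.checked_add(size)?;
    if buf.len() <= end || buf[end] != FRAME_END {
        return None;
    }
    let frame = Frame { kind, channel, payload: buf[7..end].to_vec() };
    Some((frame, end + 1))
}

fn main() {
    // Two body frames for different channels, written to the same byte stream.
    let a = Frame { kind: 3, channel: 1, payload: b"from channel 1".to_vec() };
    let b = Frame { kind: 3, channel: 2, payload: b"from channel 2".to_vec() };
    let mut stream = encode(&a);
    stream.extend(encode(&b));

    // The reader recovers each frame and can dispatch it by channel number.
    let mut offset = 0;
    while let Some((frame, used)) = decode(&stream[offset..]) {
        println!(
            "channel {} -> {}",
            frame.channel,
            String::from_utf8_lossy(&frame.payload)
        );
        offset += used;
    }
}
```

Because the channel number travels in every frame header, opening another channel costs no extra socket; the example in this page relies on that when it declares the queue on one channel and consumes on another.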
Feature switches
codegen: generate code instead of using the pregenerated one
native-tls (default): enable amqps support through native-tls
openssl: enable amqps support through openssl (preferred over native-tls when set)
rustls: enable amqps support through rustls (preferred over openssl when set, uses rustls-native-certs by default)
rustls-native-certs: same as rustls, but ensures rustls-native-certs is still used even if the default for rustls changes
rustls-webpki-roots-certs: same as rustls, but using webpki-roots instead of rustls-native-certs
Example
use futures_util::stream::StreamExt;
use lapin::{
    options::*, publisher_confirm::Confirmation, types::FieldTable, BasicProperties, Connection,
    ConnectionProperties, Result,
};
use log::info;

fn main() -> Result<()> {
    if std::env::var("RUST_LOG").is_err() {
        std::env::set_var("RUST_LOG", "info");
    }

    env_logger::init();

    let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());

    async_global_executor::block_on(async {
        let conn = Connection::connect(
            &addr,
            ConnectionProperties::default().with_default_executor(8),
        )
        .await?;

        info!("CONNECTED");

        let channel_a = conn.create_channel().await?;
        let channel_b = conn.create_channel().await?;

        let queue = channel_a
            .queue_declare(
                "hello",
                QueueDeclareOptions::default(),
                FieldTable::default(),
            )
            .await?;

        info!("Declared queue {:?}", queue);

        let mut consumer = channel_b
            .basic_consume(
                "hello",
                "my_consumer",
                BasicConsumeOptions::default(),
                FieldTable::default(),
            )
            .await?;

        async_global_executor::spawn(async move {
            info!("will consume");
            while let Some(delivery) = consumer.next().await {
                let (_, delivery) = delivery.expect("error in consumer");
                delivery
                    .ack(BasicAckOptions::default())
                    .await
                    .expect("ack");
            }
        }).detach();

        let payload = b"Hello world!";

        loop {
            let confirm = channel_a
                .basic_publish(
                    "",
                    "hello",
                    BasicPublishOptions::default(),
                    payload.to_vec(),
                    BasicProperties::default(),
                )
                .await?
                .await?;
            assert_eq!(confirm, Confirmation::NotRequested);
        }
    })
}
Re-exports
Modules
Utility to handle SASL authentication with AMQP server
The AMQ Protocol implementation (Generated)
Structs
Continuously consumes messages from a Queue.
Enums
The type of error that can be returned in this crate.
Traits
Trait providing a method to connect to an AMQP server
Type Definitions
Type alias for AMQP BasicProperties
A std Result with a lapin::Error error type