lapin
This project follows the AMQP 0.9.1 specification and primarily targets RabbitMQ.
The main access point is the Channel, which exposes the individual
AMQP methods. Per the AMQP specification, one TCP Connection can carry
multiple channels.
§Feature switches
- codegen: generate code instead of using the pregenerated one
- native-tls: enable amqps support through native-tls (preferred over rustls when set)
- openssl: enable amqps support through openssl (preferred over rustls when set)
- rustls (default): enable amqps support through rustls (uses rustls-native-certs by default)
- rustls-native-certs: same as rustls, but ensures rustls-native-certs is still used even if the default for rustls changes
- rustls-webpki-roots-certs: same as rustls, but using webpki-roots instead of rustls-native-certs
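As a rough illustration of how these switches are selected, a dependency entry in Cargo.toml might look like the fragment below. The feature name comes from the list above; the version requirement is a placeholder and is not taken from this page. Disabling default features is optional here, since native-tls is described as taking precedence over rustls when set.

```toml
[dependencies]
# Placeholder version requirement: pin the lapin release you actually use.
# Turn off the default `rustls` backend and enable amqps through native-tls.
lapin = { version = "*", default-features = false, features = ["native-tls"] }
```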
§Example
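The example in this section falls back to the address `amqp://127.0.0.1:5672/%2f`. The trailing `%2f` is the percent-encoded form of `/`, the name of RabbitMQ's default virtual host. The std-only sketch below shows that decoding step. `percent_decode` and `vhost_from_uri` are hypothetical helpers written for illustration; they are not part of lapin, and lapin's own URI handling may differ.

```rust
/// Hypothetical helper (not part of lapin): decode `%XX` escapes in a string.
/// Returns None on a malformed escape or if the result is not valid UTF-8.
fn percent_decode(input: &str) -> Option<String> {
    let bytes = input.as_bytes();
    let mut out = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        if bytes[i] == b'%' {
            // An escape needs exactly two hex digits after the '%'.
            let hex = input.get(i + 1..i + 3)?;
            if !hex.bytes().all(|b| b.is_ascii_hexdigit()) {
                return None;
            }
            out.push(u8::from_str_radix(hex, 16).ok()?);
            i += 3;
        } else {
            out.push(bytes[i]);
            i += 1;
        }
    }
    String::from_utf8(out).ok()
}

/// Hypothetical helper (not part of lapin): extract the decoded virtual host
/// from an AMQP URI. Returns None when the URI has no path component.
fn vhost_from_uri(uri: &str) -> Option<String> {
    // Drop the scheme ("amqp" or "amqps").
    let (_scheme, rest) = uri.split_once("://")?;
    // Everything after the first '/' is the still-encoded vhost.
    let (_authority, path) = rest.split_once('/')?;
    // Ignore any query string such as "?heartbeat=10".
    let encoded = path.split('?').next().unwrap_or(path);
    percent_decode(encoded)
}

fn main() {
    let addr = "amqp://127.0.0.1:5672/%2f";
    println!("{:?}", vhost_from_uri(addr));
}
```

Writing the vhost as a literal `/` would leave the path empty after the separator, which is why the default vhost has to be escaped in the address.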
use async_rs::traits::*;
use futures_lite::stream::StreamExt;
use lapin::{
    options::*, Confirmation, types::FieldTable, BasicProperties, Connection,
    ConnectionProperties, Result,
};
use tracing::info;

fn main() -> Result<()> {
    if std::env::var("RUST_LOG").is_err() {
        unsafe { std::env::set_var("RUST_LOG", "info") };
    }

    tracing_subscriber::fmt::init();

    let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
    let runtime = lapin::runtime::default_runtime()?;

    runtime.clone().block_on(async move {
        let conn = Connection::connect_with_runtime(
            &addr,
            ConnectionProperties::default(),
            runtime.clone(),
        )
        .await?;

        info!("CONNECTED");

        let channel_a = conn.create_channel().await?;
        let channel_b = conn.create_channel().await?;

        let queue = channel_a
            .queue_declare(
                "hello".into(),
                QueueDeclareOptions::default(),
                FieldTable::default(),
            )
            .await?;

        info!(?queue, "Declared queue");

        let mut consumer = channel_b
            .basic_consume(
                "hello".into(),
                "my_consumer".into(),
                BasicConsumeOptions::default(),
                FieldTable::default(),
            )
            .await?;

        runtime.spawn(async move {
            info!("will consume");
            while let Some(delivery) = consumer.next().await {
                let delivery = delivery.expect("error in consumer");
                delivery
                    .ack(BasicAckOptions::default())
                    .await
                    .expect("ack");
            }
        });

        let payload = b"Hello world!";

        loop {
            let confirm = channel_a
                .basic_publish(
                    "".into(),
                    "hello".into(),
                    BasicPublishOptions::default(),
                    payload,
                    BasicProperties::default(),
                )
                .await?
                .await?;
            assert_eq!(confirm, Confirmation::NotRequested);
        }
    })
}
Re-exports§
Modules§
Structs§
- Acker
- Channel: Main entry point for most AMQP operations.
- ChannelStatus
- Configuration
- Connection: A TCP connection to the AMQP server.
- ConnectionProperties
- ConnectionStatus
- Consumer: Continuously consumes messages from a Queue.
- Error: The error that can be returned in this crate.
- Queue
Enums§
- AsyncTcpStream: Wrapper around plain or TLS async TCP streams
- ChannelState
- Confirmation
- ConnectionState
- ErrorKind: The type of error that can be returned in this crate.
- Event: An event happening on the connection
- ExchangeKind
Traits§
- Connect: Trait providing a method to connect to an AMQP server
- ConsumerDelegate
Type Aliases§
- BasicProperties: Type alias for AMQP BasicProperties
- Result: A std Result with a lapin::Error error type