
Crate lapin


lapin

This project implements the AMQP 0.9.1 specification and primarily targets RabbitMQ.

The main access point is the Channel, which exposes the individual AMQP methods. Per the AMQP specification, a single TCP connection can carry multiple channels.
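To make the idea of several channels sharing one connection concrete, here is a minimal sketch of the general frame layout from AMQP 0.9.1: every frame carries a 16-bit channel id, which lets the peer attribute interleaved frames to the right channel. The names `FrameType`, `encode_frame` and `channel_of` are hypothetical and exist only for this illustration; this is not lapin's internal implementation.

```rust
/// Frame-end octet that terminates every AMQP 0.9.1 frame.
const FRAME_END: u8 = 0xCE;

/// Frame type octets defined by AMQP 0.9.1.
/// (Illustrative enum, not a lapin type.)
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum FrameType {
    Method = 1,
    Header = 2,
    Body = 3,
    Heartbeat = 8,
}

/// Encode one frame: type (1 byte), channel (2 bytes, big-endian),
/// payload size (4 bytes, big-endian), payload, then the frame-end octet.
/// The cast assumes the payload fits in a u32; real payloads are much
/// smaller because they are bounded by the negotiated maximum frame size.
fn encode_frame(kind: FrameType, channel: u16, payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(payload.len() + 8);
    out.push(kind as u8);
    out.extend_from_slice(&channel.to_be_bytes());
    out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    out.extend_from_slice(payload);
    out.push(FRAME_END);
    out
}

/// Read the channel id back out of an encoded frame.
/// Returns None if the slice is too short to contain one.
fn channel_of(frame: &[u8]) -> Option<u16> {
    let bytes = frame.get(1..3)?;
    Some(u16::from_be_bytes([bytes[0], bytes[1]]))
}
```

Two frames built with the same type and payload but channel ids 1 and 2 differ only in bytes 1 and 2, which is all the receiver needs to demultiplex them.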

§Feature switches

  • codegen: generate the code instead of using the pregenerated version
  • native-tls: enable amqps support through native-tls (preferred over rustls when set)
  • openssl: enable amqps support through openssl (preferred over rustls when set)
  • rustls (default): enable amqps support through rustls (uses rustls-native-certs by default)
  • rustls-native-certs: same as rustls, but ensures rustls-native-certs is still used even if the default for rustls changes
  • rustls-webpki-roots-certs: same as rustls, but uses webpki-roots instead of rustls-native-certs

§Example

use async_rs::traits::*;
use futures_lite::stream::StreamExt;
use lapin::{
    options::*, Confirmation, types::FieldTable, BasicProperties, Connection,
    ConnectionProperties, Result,
};
use tracing::info;

fn main() -> Result<()> {
    if std::env::var("RUST_LOG").is_err() {
        unsafe { std::env::set_var("RUST_LOG", "info") };
    }

    tracing_subscriber::fmt::init();

    let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
    let runtime = lapin::runtime::default_runtime()?;

    runtime.clone().block_on(async move {
        let conn = Connection::connect_with_runtime(
            &addr,
            ConnectionProperties::default(),
            runtime.clone(),
        )
        .await?;

        info!("CONNECTED");

        let channel_a = conn.create_channel().await?;
        let channel_b = conn.create_channel().await?;

        let queue = channel_a
            .queue_declare(
                "hello".into(),
                QueueDeclareOptions::default(),
                FieldTable::default(),
            )
            .await?;

        info!(?queue, "Declared queue");

        let mut consumer = channel_b
            .basic_consume(
                "hello".into(),
                "my_consumer".into(),
                BasicConsumeOptions::default(),
                FieldTable::default(),
            )
            .await?;
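        // Consume in a background task so the publish loop below can run
        // concurrently on the other channel.
        runtime.spawn(async move {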
            info!("will consume");
            while let Some(delivery) = consumer.next().await {
                let delivery = delivery.expect("error in consumer");
                delivery
                    .ack(BasicAckOptions::default())
                    .await
                    .expect("ack");
            }
        });

        let payload = b"Hello world!";

        loop {
            let confirm = channel_a
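                .basic_publish(
                    // The empty name selects the default exchange, which routes
                    // by queue name, so the routing key below is the queue name.
                    "".into(),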
                    "hello".into(),
                    BasicPublishOptions::default(),
                    payload,
                    BasicProperties::default(),
                )
                .await?
                .await?;
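            // Publisher confirms were never enabled on this channel, so no
            // confirmation was requested from the broker.
            assert_eq!(confirm, Confirmation::NotRequested);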
        }
    })
}
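The default address in the example ends in `%2f`, which is the percent-encoded form of `/`, the name of RabbitMQ's default virtual host. The standard-library-only sketch below shows how that path segment maps to a vhost name. lapin parses URIs itself (see the `uri` re-export), so the helpers `percent_decode` and `vhost_of` are hypothetical and purely illustrative; they ignore details such as validating the authority part.

```rust
/// Decode percent-escapes such as `%2f` in a URI component.
/// Returns None on a malformed escape or if the result is not valid UTF-8.
fn percent_decode(input: &str) -> Option<String> {
    let bytes = input.as_bytes();
    let mut out = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        if bytes[i] == b'%' {
            // An escape is '%' followed by exactly two hex digits.
            let hex = input.get(i + 1..i + 3)?;
            if !hex.bytes().all(|b| b.is_ascii_hexdigit()) {
                return None;
            }
            out.push(u8::from_str_radix(hex, 16).ok()?);
            i += 3;
        } else {
            out.push(bytes[i]);
            i += 1;
        }
    }
    String::from_utf8(out).ok()
}

/// Extract the virtual host from an `amqp://` or `amqps://` URI.
/// Returns None when the scheme is different or the URI has no path,
/// in which case a client would fall back to its default vhost.
fn vhost_of(uri: &str) -> Option<String> {
    let rest = uri
        .strip_prefix("amqp://")
        .or_else(|| uri.strip_prefix("amqps://"))?;
    // Drop any query string such as `?heartbeat=10`.
    let rest = rest.split_once('?').map_or(rest, |(before, _)| before);
    // Everything after the first '/' following the authority is the vhost.
    let (_, vhost) = rest.split_once('/')?;
    percent_decode(vhost)
}
```

Under these assumptions, `vhost_of("amqp://127.0.0.1:5672/%2f")` yields `Some("/")`, while a URI ending in a bare `/` yields an empty vhost name rather than the default one.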

Re-exports§

pub use amq_protocol::tcp;
pub use amq_protocol::types;
pub use amq_protocol::uri;

Modules§

auth
message
options
protocol
The AMQ Protocol implementation (Generated)
runtime

Structs§

Acker
Channel
Main entry point for most AMQP operations.
ChannelStatus
Configuration
Connection
A TCP connection to the AMQP server.
ConnectionProperties
ConnectionStatus
Consumer
Continuously consumes messages from a Queue.
Error
The error that can be returned in this crate.
Queue

Enums§

AsyncTcpStream
Wrapper around plain or TLS async TCP streams
ChannelState
Confirmation
ConnectionState
ErrorKind
The type of error that can be returned in this crate.
Event
An event happening on the connection
ExchangeKind

Traits§

Connect
Trait providing a method to connect to an AMQP server
ConsumerDelegate

Type Aliases§

BasicProperties
Type alias for AMQP BasicProperties
Result
A std Result with a lapin::Error error type