Crate lapin_async

lapin-async

This library is meant for use in an event loop. Through the Connection struct, it exposes a state machine that you drive with IO you manage yourself.

Typically, your code owns the socket and the buffers, and regularly passes the input and output buffers to the state machine so that it can parse received messages and serialize new ones to send. You can then query the current state to see whether new messages have arrived for the consumers.
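The pattern described above, where the caller owns the buffers and the state machine only parses and serializes, can be illustrated with a toy sketch. It does not use lapin_async's API: `Machine`, `State`, `parse`, `serialize`, `next_message` and the newline-delimited "protocol" are all hypothetical names invented for this illustration, and a real AMQP connection has many more states.

```rust
use std::collections::VecDeque;

/// Hypothetical connection states (not lapin_async's `ConnectionState`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    Initial,
    Connected,
}

/// Hypothetical state machine that never touches a socket itself.
struct Machine {
    state: State,
    outbox: VecDeque<Vec<u8>>, // serialized frames waiting to be written
    inbox: VecDeque<Vec<u8>>,  // received messages waiting for consumers
}

impl Machine {
    fn new() -> Self {
        let mut outbox = VecDeque::new();
        // Toy handshake: the first frame to send is a greeting.
        outbox.push_back(b"HELLO\n".to_vec());
        Machine { state: State::Initial, outbox, inbox: VecDeque::new() }
    }

    /// Consume every complete newline-terminated frame in `input`.
    /// Returns the number of bytes consumed; a trailing partial frame is left alone.
    fn parse(&mut self, input: &[u8]) -> usize {
        let mut consumed = 0;
        while let Some(pos) = input[consumed..].iter().position(|&b| b == b'\n') {
            let frame = &input[consumed..consumed + pos];
            match self.state {
                State::Initial if frame == &b"OK"[..] => self.state = State::Connected,
                State::Connected => self.inbox.push_back(frame.to_vec()),
                _ => {} // unexpected frames are ignored in this toy protocol
            }
            consumed += pos + 1;
        }
        consumed
    }

    /// Append all pending frames to `output`; returns the number of bytes written.
    fn serialize(&mut self, output: &mut Vec<u8>) -> usize {
        let mut written = 0;
        while let Some(frame) = self.outbox.pop_front() {
            written += frame.len();
            output.extend_from_slice(&frame);
        }
        written
    }

    /// Pop the next received message, if any.
    fn next_message(&mut self) -> Option<Vec<u8>> {
        self.inbox.pop_front()
    }
}

fn main() {
    let mut machine = Machine::new();

    // The caller owns the output buffer; real code would write it to its socket.
    let mut send_buf = Vec::new();
    machine.serialize(&mut send_buf);
    assert_eq!(send_buf, b"HELLO\n".to_vec());

    // Pretend the socket yielded these bytes; the last frame is incomplete.
    let mut recv_buf: Vec<u8> = b"OK\nfirst\nsec".to_vec();
    let consumed = machine.parse(&recv_buf);
    recv_buf.drain(..consumed); // keep the partial frame for the next read

    // Query the state and drain whatever arrived for consumers.
    assert_eq!(machine.state, State::Connected);
    while let Some(msg) = machine.next_message() {
        println!("received: {}", String::from_utf8_lossy(&msg));
    }
}
```

Because the machine only sees byte slices, the same code can be driven from a blocking socket, a poll-based event loop, or a test that feeds it canned bytes, which is the main appeal of this design.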

Example

use env_logger;
use lapin_async as lapin;
use log::info;

use crate::lapin::{
  BasicProperties, Channel, Connection, ConnectionProperties, ConsumerSubscriber,
  message::Delivery,
  options::*,
  types::FieldTable,
};

#[derive(Clone,Debug)]
struct Subscriber {
  channel: Channel,
}

impl ConsumerSubscriber for Subscriber {
  // Called for each message delivered to this consumer; acknowledge it right away.
  fn new_delivery(&self, delivery: Delivery) {
    self.channel.basic_ack(delivery.delivery_tag, BasicAckOptions::default()).as_error().expect("basic_ack");
  }
  // Nothing to clean up in this example.
  fn drop_prefetched_messages(&self) {}
  fn cancel(&self) {}
}

fn main() {
  env_logger::init();

  let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
  let conn = Connection::connect(&addr, ConnectionProperties::default()).wait().expect("connection error");

  info!("CONNECTED");

  let channel_a = conn.create_channel().wait().expect("create_channel");
  let channel_b = conn.create_channel().wait().expect("create_channel");

  channel_a.queue_declare("hello", QueueDeclareOptions::default(), FieldTable::default()).wait().expect("queue_declare");
  let queue = channel_b.queue_declare("hello", QueueDeclareOptions::default(), FieldTable::default()).wait().expect("queue_declare");

  info!("will consume");
  channel_b.basic_consume(&queue, "my_consumer", BasicConsumeOptions::default(), FieldTable::default(), Box::new(Subscriber { channel: channel_b.clone() })).wait().expect("basic_consume");

  let payload = b"Hello world!";

  // Publish to the "hello" queue through the default exchange, forever.
  loop {
    channel_a.basic_publish("", "hello", BasicPublishOptions::default(), payload.to_vec(), BasicProperties::default()).wait().expect("basic_publish");
  }
}

Re-exports

pub use amq_protocol::tcp;
pub use amq_protocol::types;
pub use amq_protocol::uri;

Modules

auth

Utility to handle SASL authentication with an AMQP server

confirmation (Deprecated)
message (Deprecated)
options
protocol

The AMQ Protocol implementation (Generated)

Structs

Channel (Deprecated)
ChannelStatus (Deprecated)
Configuration (Deprecated)
Connection (Deprecated)
ConnectionProperties (Deprecated)
ConnectionStatus (Deprecated)
Error (Deprecated)

The type of error that can be returned in this crate.

Queue (Deprecated)

Enums

ChannelState (Deprecated)
ConnectionState (Deprecated)
ErrorKind (Deprecated)

The different kinds of errors that can be reported.

Traits

Connect (Deprecated)

Trait providing a method to connect to an AMQP server

ConsumerSubscriber (Deprecated)

Type Definitions

BasicProperties

Type alias for AMQP BasicProperties