Crate lapin_async

lapin-async

This library is meant for use in an event loop. Through the Connection struct, it exposes a state machine that you drive with IO you manage yourself.

Typically, your code would own the socket and buffers, and regularly pass the input and output buffers to the state machine so it receives messages and serializes new ones to send. You can then query the current state and see if it received new messages for the consumers.

Example

use env_logger;
use lapin_async as lapin;
use log::info;

use crate::lapin::{
  Connect as _,
  channel::{BasicProperties, Channel},
  channel::options::*,
  connection::Connection,
  connection_properties::ConnectionProperties,
  consumer::ConsumerSubscriber,
  credentials::Credentials,
  message::Delivery,
  types::FieldTable,
};

/// Consumer-side handler used by this example.
///
/// It keeps a handle to the channel so that deliveries can be acknowledged
/// from inside the `ConsumerSubscriber` callbacks.
#[derive(Clone, Debug)]
struct Subscriber {
  /// Channel on which received deliveries are acknowledged.
  channel: Channel,
}

impl ConsumerSubscriber for Subscriber {
  /// Acknowledges the received delivery on the stored channel, using the
  /// delivery's tag and default ack options.
  ///
  /// Panics with the message `basic_ack` if the acknowledgement yields an
  /// error result.
  fn new_delivery(&self, delivery: Delivery) {
    let tag = delivery.delivery_tag;
    self
      .channel
      .basic_ack(tag, BasicAckOptions::default())
      .into_result()
      .expect("basic_ack");
  }

  /// Intentionally a no-op in this example.
  fn drop_prefetched_messages(&self) {}

  /// Intentionally a no-op in this example.
  fn cancel(&self) {}
}

/// Example entry point: connects to an AMQP broker, opens two channels,
/// declares the `hello` queue on both, registers a consumer on the second
/// channel and then publishes to the queue from the first channel forever.
///
/// The broker address is read from the `AMQP_ADDR` environment variable and
/// falls back to `amqp://127.0.0.1:5672/%2f` when the variable cannot be read.
/// Every step panics on failure, which is acceptable for a demo program.
fn main() {
  env_logger::init();

  // Resolve the broker URI, falling back to a local default.
  let amqp_uri = match std::env::var("AMQP_ADDR") {
    Ok(uri) => uri,
    Err(_) => "amqp://127.0.0.1:5672/%2f".into(),
  };
  let connection = amqp_uri
    .connect(Credentials::default(), ConnectionProperties::default())
    .wait()
    .expect("connection error");

  info!("CONNECTED");

  let publisher = connection.create_channel().unwrap();
  let consumer = connection.create_channel().unwrap();

  let queue = "hello";

  // Publishing side: open the channel and declare the queue.
  publisher.channel_open().wait().expect("channel_open");
  publisher
    .queue_declare(queue, QueueDeclareOptions::default(), FieldTable::default())
    .wait()
    .expect("queue_declare");

  // Consuming side: same setup on its own channel.
  consumer.channel_open().wait().expect("channel_open");
  consumer
    .queue_declare(queue, QueueDeclareOptions::default(), FieldTable::default())
    .wait()
    .expect("queue_declare");

  info!("will consume");
  consumer
    .basic_consume(
      queue,
      "my_consumer",
      BasicConsumeOptions::default(),
      FieldTable::default(),
      // The subscriber gets its own handle to the consuming channel so it can ack.
      Box::new(Subscriber { channel: consumer.clone() }),
    )
    .wait()
    .expect("basic_consume");

  let body = b"Hello world!";

  // Publish the same message endlessly via the default ("") exchange.
  loop {
    publisher
      .basic_publish(
        "",
        queue,
        BasicPublishOptions::default(),
        body.to_vec(),
        BasicProperties::default(),
      )
      .wait()
      .expect("basic_publish");
  }
}

Re-exports

pub use connection::Connect;

Modules

acknowledgement
buffer
channel
channel_status
channels
configuration
confirmation
connection
connection_properties
connection_status
consumer
credentials
error
frames
id_sequence
io_loop
message
queue
queues
registration
returned_messages
tcp
types
uri
wait