Crate lapin_async [] [src]

lapin-async

this library is meant for use in an event loop. The library exposes, through the Connection struct, a state machine you can drive through IO you manage.

Typically, your code would own the socket and buffers, and regularly pass the input and output buffers to the state machine so it receives messages and serializes new ones to send. You can then query the current state and see if it received new messages for the consumers.

Creating a new connection

Set up an AMQP 0.9.1 compliant server. For the purpose of this documentation, we'll assume it is listening on 127.0.0.1:5672.

Create the client socket and some buffers to move data:

extern crate lapin_async as lapin;

use std::net::TcpStream;
use lapin::connection::*;
use lapin::buffer::Buffer;

fn main() {
  let mut stream = TcpStream::connect("127.0.0.1:5672").unwrap();
  stream.set_nonblocking(true);

  let capacity = 8192;
  let mut send_buffer    = Buffer::with_capacity(capacity as usize);
  let mut receive_buffer = Buffer::with_capacity(capacity as usize);

}

Now, we can create the Connection object:

let mut conn: Connection = Connection::new();
  loop {
    match conn.run(&mut stream, &mut send_buffer, &mut receive_buffer) {
      Err(e) => panic!("could not connect: {:?}", e),
        Ok(ConnectionState::Connected) => break,
        Ok(state) => println!("now at state {:?}, continue", state),
      }
    thread::sleep(time::Duration::from_millis(100));
  }
  println!("CONNECTED");

The run method will repeatedly call 3 other methods:

  • read_from_stream with the input buffer. This method is only there as a helper to get data from the network into a Buffer struct, you might want to handle reading yourself
  • parse will parse the data you just gathered, update the state, and return the current state and how much bytes were consumed
  • then you must call write_to_stream in case the state machine must send messages to the server. It returns how much data was consumed and the current state

Calling parse and write_to_stream is how you handle the plumbing for the state machine. Your code should then react to the state changes returned by those functions, or to the specific state of channels and consumers.

In the current case, we wait until the state machine gets to the ConnectionState::Connected state.

Creating a channel

let channel_id: u16 = conn.create_channel();
conn.channel_open(channel_a, "".to_string()).expect("channel_open");

// update state here until:
assert!(conn.check_state(channel_id, ChannelState::Connected).unwrap_or(false));

Creating a queue

//create the "hello" queue
let request_id: u16 = conn.queue_declare(channel_id, 0, "hello".to_string(), false, false, false, false, false, HashMap::new()).unwrap();

// update state here until:
assert!(conn.is_finished(request_id));

Publishing a message

conn.basic_publish(channel_id, 0, "".to_string(), "hello".to_string(), false, false).expect("basic_publish");
let payload = b"Hello world!";
conn.send_content_frames(channel_a, 60, payload);

// update state

Creating a Consumer

//create the "hello" queue
let request_id: u16 = conn.basic_consume(channel_id, 0, "hello".to_string(), "my_consumer".to_string(), false, true, false, false, HashMap::new()).expect("basic_consume");

// update state here until:
assert!(conn.is_finished(request_id));

// get the next message
if let Ok(message) = conn.next_message(channel_id, "hello", "my_consumer") {
 // handle message
}

Reexports

pub use format::*;

Modules

api
buffer
channel
connection
error
format
generated
io
queue