1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#![warn(rust_2018_idioms)]

//! lapin-async
//!
//! this library is meant for use in an event loop. The library exposes, through the
//! [Connection struct](https://docs.rs/lapin-async/0.1.0/lapin_async/connection/struct.Connection.html),
//! a state machine you can drive through IO you manage.
//!
//! Typically, your code would own the socket and buffers, and regularly pass the
//! input and output buffers to the state machine so it receives messages and
//! serializes new ones to send. You can then query the current state and see
//! if it received new messages for the consumers.
//!
//! ## Example
//!
//! ```rust,no_run
//! use env_logger;
//! use lapin_async as lapin;
//! use crate::lapin::buffer::Buffer;
//! use crate::lapin::connection::*;
//! use crate::lapin::consumer::ConsumerSubscriber;
//! use crate::lapin::channel::BasicProperties;
//! use crate::lapin::channel::options::*;
//! use crate::lapin::channel_status::ChannelState;
//! use crate::lapin::message::Delivery;
//! use crate::lapin::types::FieldTable;
//!
//! use std::{net::TcpStream, thread, time};
//!
//! fn main() {
//!   env_logger::init();
//!
//!   /* Open TCP connection */
//!   let mut stream = TcpStream::connect("127.0.0.1:5672").unwrap();
//!   stream.set_nonblocking(true).unwrap();
//!
//!   /* Configure AMQP connection */
//!   let capacity = 8192;
//!   let mut send_buffer    = Buffer::with_capacity(capacity as usize);
//!   let mut receive_buffer = Buffer::with_capacity(capacity as usize);
//!   let mut conn: Connection = Connection::new();
//!   conn.configuration.set_frame_max(capacity);
//!
//!   /* Connect tp RabbitMQ server */
//!   assert_eq!(conn.connect(ConnectionProperties::default()).unwrap(), ConnectionState::Connecting(ConnectingState::SentProtocolHeader(ConnectionProperties::default())));
//!   loop {
//!     match conn.run(&mut stream, &mut send_buffer, &mut receive_buffer) {
//!       Err(e) => panic!("could not connect: {:?}", e),
//!       Ok(ConnectionState::Connected) => break,
//!       Ok(state) => println!("now at state {:?}, continue", state),
//!     }
//!     thread::sleep(time::Duration::from_millis(100));
//!   }
//!   println!("CONNECTED");
//!
//!   /* Adapt our buffer after negocation with the server */
//!   let frame_max = conn.configuration.frame_max();
//!   if frame_max > capacity {
//!     send_buffer.grow(frame_max as usize);
//!     receive_buffer.grow(frame_max as usize);
//!   }
//!
//!   /* Create and open a channel */
//!   let channel = conn.channels.create().unwrap();
//!   channel.channel_open().expect("channel_open");
//!   conn.run(&mut stream, &mut send_buffer, &mut receive_buffer).unwrap();
//!   thread::sleep(time::Duration::from_millis(100));
//!   conn.run(&mut stream, &mut send_buffer, &mut receive_buffer).unwrap();
//!   assert!(channel.status.state() == ChannelState::Connected);
//!
//!   /* Declaire the "hellp" queue */
//!   let request_id = channel.queue_declare("hello", QueueDeclareOptions::default(), FieldTable::default()).unwrap();
//!   conn.run(&mut stream, &mut send_buffer, &mut receive_buffer).unwrap();
//!   thread::sleep(time::Duration::from_millis(100));
//!   conn.run(&mut stream, &mut send_buffer, &mut receive_buffer).unwrap();
//!   assert!(channel.requests.was_successful(request_id.unwrap()).unwrap_or(false));
//!
//!   /* Publish "Hellow world!" to the "hello" queue */
//!   let payload = b"Hello world!";
//!   channel.basic_publish("", "hello", BasicPublishOptions::default(), payload.to_vec(), BasicProperties::default()).expect("basic_publish");
//!   conn.run(&mut stream, &mut send_buffer, &mut receive_buffer).unwrap();
//!   thread::sleep(time::Duration::from_millis(100));
//!   conn.run(&mut stream, &mut send_buffer, &mut receive_buffer).unwrap();
//!
//!   /* Consumer the messages from the "hello" queue using an instance of Subscriber */
//!   let request_id = channel.basic_consume("hello", "my_consumer", BasicConsumeOptions::default(), FieldTable::default(), Box::new(Subscriber)).expect("basic_consume");
//!   conn.run(&mut stream, &mut send_buffer, &mut receive_buffer).unwrap();
//!   thread::sleep(time::Duration::from_millis(100));
//!   conn.run(&mut stream, &mut send_buffer, &mut receive_buffer).unwrap();
//!   assert!(channel.requests.was_successful(request_id.unwrap()).unwrap_or(false));
//! }
//!
//! #[derive(Debug)]
//! struct Subscriber;
//!
//! impl ConsumerSubscriber for Subscriber {
//!   fn new_delivery(&self, _delivery: Delivery) {
//!     // handle message
//!   }
//!   fn drop_prefetched_messages(&self) {}
//!   fn cancel(&self) {}
//! }
//! ```

pub mod acknowledgement;
pub mod buffer;
pub mod channel;
pub mod channel_status;
pub mod channels;
pub mod connection;
pub mod configuration;
pub mod consumer;
pub mod error;
pub mod generated_names;
pub mod id_sequence;
pub mod io;
pub mod message;
pub mod queue;
pub mod queues;
pub mod replies;
pub mod requests;
pub mod types;
pub mod uri;