1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
#![warn(rust_2018_idioms)] //! lapin-async //! //! this library is meant for use in an event loop. The library exposes, through the //! [Connection struct](https://docs.rs/lapin-async/0.1.0/lapin_async/connection/struct.Connection.html), //! a state machine you can drive through IO you manage. //! //! Typically, your code would own the socket and buffers, and regularly pass the //! input and output buffers to the state machine so it receives messages and //! serializes new ones to send. You can then query the current state and see //! if it received new messages for the consumers. //! //! ## Example //! //! ```rust,no_run //! use env_logger; //! use lapin_async as lapin; //! use crate::lapin::buffer::Buffer; //! use crate::lapin::connection::*; //! use crate::lapin::consumer::ConsumerSubscriber; //! use crate::lapin::channel::BasicProperties; //! use crate::lapin::channel::options::*; //! use crate::lapin::channel_status::ChannelState; //! use crate::lapin::message::Delivery; //! use crate::lapin::types::FieldTable; //! //! use std::{net::TcpStream, thread, time}; //! //! fn main() { //! env_logger::init(); //! //! /* Open TCP connection */ //! let mut stream = TcpStream::connect("127.0.0.1:5672").unwrap(); //! stream.set_nonblocking(true).unwrap(); //! //! /* Configure AMQP connection */ //! let capacity = 8192; //! let mut send_buffer = Buffer::with_capacity(capacity as usize); //! let mut receive_buffer = Buffer::with_capacity(capacity as usize); //! let mut conn: Connection = Connection::new(); //! conn.configuration.set_frame_max(capacity); //! //! /* Connect tp RabbitMQ server */ //! assert_eq!(conn.connect(ConnectionProperties::default()).unwrap(), ConnectionState::Connecting(ConnectingState::SentProtocolHeader(ConnectionProperties::default()))); //! loop { //! match conn.run(&mut stream, &mut send_buffer, &mut receive_buffer) { //! Err(e) => panic!("could not connect: {:?}", e), //! Ok(ConnectionState::Connected) => break, //! Ok(state) => println!("now at state {:?}, continue", state), //! } //! thread::sleep(time::Duration::from_millis(100)); //! } //! println!("CONNECTED"); //! //! /* Adapt our buffer after negocation with the server */ //! let frame_max = conn.configuration.frame_max(); //! if frame_max > capacity { //! send_buffer.grow(frame_max as usize); //! receive_buffer.grow(frame_max as usize); //! } //! //! /* Create and open a channel */ //! let channel = conn.channels.create().unwrap(); //! channel.channel_open().expect("channel_open"); //! conn.run(&mut stream, &mut send_buffer, &mut receive_buffer).unwrap(); //! thread::sleep(time::Duration::from_millis(100)); //! conn.run(&mut stream, &mut send_buffer, &mut receive_buffer).unwrap(); //! assert!(channel.status.state() == ChannelState::Connected); //! //! /* Declaire the "hellp" queue */ //! let request_id = channel.queue_declare("hello", QueueDeclareOptions::default(), FieldTable::default()).unwrap(); //! conn.run(&mut stream, &mut send_buffer, &mut receive_buffer).unwrap(); //! thread::sleep(time::Duration::from_millis(100)); //! conn.run(&mut stream, &mut send_buffer, &mut receive_buffer).unwrap(); //! assert!(channel.requests.was_successful(request_id.unwrap()).unwrap_or(false)); //! //! /* Publish "Hellow world!" to the "hello" queue */ //! let payload = b"Hello world!"; //! channel.basic_publish("", "hello", BasicPublishOptions::default(), payload.to_vec(), BasicProperties::default()).expect("basic_publish"); //! conn.run(&mut stream, &mut send_buffer, &mut receive_buffer).unwrap(); //! thread::sleep(time::Duration::from_millis(100)); //! conn.run(&mut stream, &mut send_buffer, &mut receive_buffer).unwrap(); //! //! /* Consumer the messages from the "hello" queue using an instance of Subscriber */ //! let request_id = channel.basic_consume("hello", "my_consumer", BasicConsumeOptions::default(), FieldTable::default(), Box::new(Subscriber)).expect("basic_consume"); //! conn.run(&mut stream, &mut send_buffer, &mut receive_buffer).unwrap(); //! thread::sleep(time::Duration::from_millis(100)); //! conn.run(&mut stream, &mut send_buffer, &mut receive_buffer).unwrap(); //! assert!(channel.requests.was_successful(request_id.unwrap()).unwrap_or(false)); //! } //! //! #[derive(Debug)] //! struct Subscriber; //! //! impl ConsumerSubscriber for Subscriber { //! fn new_delivery(&self, _delivery: Delivery) { //! // handle message //! } //! fn drop_prefetched_messages(&self) {} //! fn cancel(&self) {} //! } //! ``` pub mod acknowledgement; pub mod buffer; pub mod channel; pub mod channel_status; pub mod channels; pub mod connection; pub mod configuration; pub mod consumer; pub mod error; pub mod generated_names; pub mod id_sequence; pub mod io; pub mod message; pub mod queue; pub mod queues; pub mod replies; pub mod requests; pub mod types; pub mod uri;