1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
#![warn(rust_2018_idioms)] //! lapin-futures //! //! This library offers a futures-0.1 based API over the lapin library. //! It leverages the futures-0.1 library, so you can use it //! with tokio, futures-cpupool or any other executor. //! //! ## Publishing a message //! //! ```rust,no_run //! use env_logger; //! use failure::Error; //! use futures::future; //! use futures::future::Future; //! use lapin_futures as lapin; //! use crate::lapin::{BasicProperties, Client, ConnectionProperties}; //! use crate::lapin::options::{BasicPublishOptions, QueueDeclareOptions}; //! use crate::lapin::types::FieldTable; //! use log::info; //! use tokio; //! use tokio::runtime::Runtime; //! //! fn main() { //! env_logger::init(); //! //! let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into()); //! //! Runtime::new().unwrap().block_on_all( //! Client::connect(&addr, ConnectionProperties::default()).map_err(Error::from).and_then(|client| { //! // create_channel returns a future that is resolved //! // once the channel is successfully created //! client.create_channel().map_err(Error::from) //! }).and_then(|mut channel| { //! let id = channel.id(); //! info!("created channel with id: {}", id); //! //! // we using a "move" closure to reuse the channel //! // once the queue is declared. We could also clone //! // the channel //! channel.queue_declare("hello", QueueDeclareOptions::default(), FieldTable::default()).and_then(move |_| { //! info!("channel {} declared queue {}", id, "hello"); //! //! channel.basic_publish("", "hello", b"hello from tokio".to_vec(), BasicPublishOptions::default(), BasicProperties::default()) //! }).map_err(Error::from) //! }) //! ).expect("runtime failure"); //! } //! ``` //! //! ## Creating a consumer //! //! ```rust,no_run //! use env_logger; //! use failure::Error; //! use futures::{future, Future, Stream}; //! use lapin_futures as lapin; //! use crate::lapin::{BasicProperties, Client, ConnectionProperties}; //! use crate::lapin::options::{BasicConsumeOptions, QueueDeclareOptions}; //! use crate::lapin::types::FieldTable; //! use log::{debug, info}; //! use tokio; //! use tokio::runtime::Runtime; //! //! fn main() { //! env_logger::init(); //! //! let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into()); //! //! Runtime::new().unwrap().block_on_all( //! Client::connect(&addr, ConnectionProperties::default()).map_err(Error::from).and_then(|client| { //! // create_channel returns a future that is resolved //! // once the channel is successfully created //! client.create_channel().map_err(Error::from) //! }).and_then(|mut channel| { //! let id = channel.id(); //! info!("created channel with id: {}", id); //! //! let mut ch = channel.clone(); //! channel.queue_declare("hello", QueueDeclareOptions::default(), FieldTable::default()).and_then(move |queue| { //! info!("channel {} declared queue {}", id, "hello"); //! //! // basic_consume returns a future of a message //! // stream. Any time a message arrives for this consumer, //! // the for_each method would be called //! channel.basic_consume(&queue, "my_consumer", BasicConsumeOptions::default(), FieldTable::default()) //! }).and_then(|stream| { //! info!("got consumer stream"); //! //! stream.for_each(move |message| { //! debug!("got message: {:?}", message); //! info!("decoded message: {:?}", std::str::from_utf8(&message.data).unwrap()); //! ch.basic_ack(message.delivery_tag, false) //! }) //! }).map_err(Error::from) //! }) //! ).expect("runtime failure"); //! } //! ``` pub use lapin::{ auth, message, options, protocol, tcp, types, uri, BasicProperties, Configuration, ConnectionProperties, ConsumerDelegate, Error, ErrorKind, Queue, }; pub use channel::Channel; pub use client::{Client, ClientFuture, Connect}; pub use confirmation::ConfirmationFuture; pub use consumer::Consumer; mod channel; mod client; mod confirmation; mod consumer;