1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#![warn(rust_2018_idioms)]

//! lapin-futures
//!
//! This library offers a futures-0.1 based API over the lapin library.
//! It leverages the futures-0.1 library, so you can use it
//! with tokio, futures-cpupool or any other executor.
//!
//! ## Publishing a message
//!
//! ```rust,no_run
//! use env_logger;
//! use failure::Error;
//! use futures::future;
//! use futures::future::Future;
//! use lapin_futures as lapin;
//! use crate::lapin::{BasicProperties, Client, ConnectionProperties};
//! use crate::lapin::options::{BasicPublishOptions, QueueDeclareOptions};
//! use crate::lapin::types::FieldTable;
//! use log::info;
//! use tokio;
//! use tokio::runtime::Runtime;
//!
//! fn main() {
//!   env_logger::init();
//!
//!   let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
//!
//!   Runtime::new().unwrap().block_on_all(
//!    Client::connect(&addr, ConnectionProperties::default()).map_err(Error::from).and_then(|client| {
//!       // create_channel returns a future that is resolved
//!       // once the channel is successfully created
//!       client.create_channel().map_err(Error::from)
//!     }).and_then(|mut channel| {
//!       let id = channel.id();
//!       info!("created channel with id: {}", id);
//!
//!       // we using a "move" closure to reuse the channel
//!       // once the queue is declared. We could also clone
//!       // the channel
//!       channel.queue_declare("hello", QueueDeclareOptions::default(), FieldTable::default()).and_then(move |_| {
//!         info!("channel {} declared queue {}", id, "hello");
//!
//!         channel.basic_publish("", "hello", b"hello from tokio".to_vec(), BasicPublishOptions::default(), BasicProperties::default())
//!       }).map_err(Error::from)
//!     })
//!   ).expect("runtime failure");
//! }
//! ```
//!
//! ## Creating a consumer
//!
//! ```rust,no_run
//! use env_logger;
//! use failure::Error;
//! use futures::{future, Future, Stream};
//! use lapin_futures as lapin;
//! use crate::lapin::{BasicProperties, Client, ConnectionProperties};
//! use crate::lapin::options::{BasicConsumeOptions, QueueDeclareOptions};
//! use crate::lapin::types::FieldTable;
//! use log::{debug, info};
//! use tokio;
//! use tokio::runtime::Runtime;
//!
//! fn main() {
//!   env_logger::init();
//!
//!   let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
//!
//!   Runtime::new().unwrap().block_on_all(
//!    Client::connect(&addr, ConnectionProperties::default()).map_err(Error::from).and_then(|client| {
//!       // create_channel returns a future that is resolved
//!       // once the channel is successfully created
//!       client.create_channel().map_err(Error::from)
//!     }).and_then(|mut channel| {
//!       let id = channel.id();
//!       info!("created channel with id: {}", id);
//!
//!       let mut ch = channel.clone();
//!       channel.queue_declare("hello", QueueDeclareOptions::default(), FieldTable::default()).and_then(move |queue| {
//!         info!("channel {} declared queue {}", id, "hello");
//!
//!         // basic_consume returns a future of a message
//!         // stream. Any time a message arrives for this consumer,
//!         // the for_each method would be called
//!         channel.basic_consume(&queue, "my_consumer", BasicConsumeOptions::default(), FieldTable::default())
//!       }).and_then(|stream| {
//!         info!("got consumer stream");
//!
//!         stream.for_each(move |message| {
//!           debug!("got message: {:?}", message);
//!           info!("decoded message: {:?}", std::str::from_utf8(&message.data).unwrap());
//!           ch.basic_ack(message.delivery_tag, false)
//!         })
//!       }).map_err(Error::from)
//!     })
//!   ).expect("runtime failure");
//! }
//! ```

pub use lapin::{
  auth, message, options, protocol, tcp, types, uri,
  BasicProperties, Configuration, ConnectionProperties, ConsumerDelegate, Error, ErrorKind, Queue,
};

pub use channel::Channel;
pub use client::{Client, ClientFuture, Connect};
pub use confirmation::ConfirmationFuture;
pub use consumer::Consumer;

mod channel;
mod client;
mod confirmation;
mod consumer;