lapin_futures/lib.rs
1#![warn(rust_2018_idioms)]
2
3//! lapin-futures
4//!
5//! This library offers a futures-0.1 based API over the lapin library.
6//! It leverages the futures-0.1 library, so you can use it
7//! with tokio, futures-cpupool or any other executor.
8//!
9//! ## Publishing a message
10//!
11//! ```rust,no_run
12//! use futures::future::Future;
13//! use lapin_futures as lapin;
14//! use crate::lapin::{BasicProperties, Client, ConnectionProperties};
15//! use crate::lapin::options::{BasicPublishOptions, QueueDeclareOptions};
16//! use crate::lapin::types::FieldTable;
17//! use log::info;
18//!
19//! fn main() {
20//! env_logger::init();
21//!
22//! let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
23//!
24//! futures::executor::spawn(
25//! Client::connect(&addr, ConnectionProperties::default()).and_then(|client| {
26//! // create_channel returns a future that is resolved
27//! // once the channel is successfully created
28//! client.create_channel()
29//! }).and_then(|channel| {
30//! let id = channel.id();
31//! info!("created channel with id: {}", id);
32//!
33//! // we are using a "move" closure to reuse the channel
34//! // once the queue is declared. We could also clone
35//! // the channel
36//! channel.queue_declare("hello", QueueDeclareOptions::default(), FieldTable::default()).and_then(move |_| {
37//! info!("channel {} declared queue {}", id, "hello");
38//!
39//! channel.basic_publish("", "hello", b"hello from tokio".to_vec(), BasicPublishOptions::default(), BasicProperties::default())
40//! })
41//! })
42//! ).wait_future().expect("runtime failure");
43//! }
44//! ```
45//!
46//! ## Creating a consumer
47//!
48//! ```rust,no_run
49//! use futures::{Future, Stream};
50//! use lapin_futures as lapin;
51//! use crate::lapin::{Client, ConnectionProperties};
52//! use crate::lapin::options::{BasicConsumeOptions, QueueDeclareOptions};
53//! use crate::lapin::types::FieldTable;
54//! use log::{debug, info};
55//!
56//! fn main() {
57//! env_logger::init();
58//!
59//! let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
60//!
61//! futures::executor::spawn(
62//! Client::connect(&addr, ConnectionProperties::default()).and_then(|client| {
63//! // create_channel returns a future that is resolved
64//! // once the channel is successfully created
65//! client.create_channel()
66//! }).and_then(|channel| {
67//! let id = channel.id();
68//! info!("created channel with id: {}", id);
69//!
70//! let ch = channel.clone();
71//! channel.queue_declare("hello", QueueDeclareOptions::default(), FieldTable::default()).and_then(move |queue| {
72//! info!("channel {} declared queue {:?}", id, queue);
73//!
74//! // basic_consume returns a future that resolves to a stream
75//! // of messages. Each time a message arrives for this consumer,
76//! // the closure passed to for_each is called
77//! channel.basic_consume("hello", "my_consumer", BasicConsumeOptions::default(), FieldTable::default())
78//! }).and_then(|stream| {
79//! info!("got consumer stream");
80//!
81//! stream.for_each(move |message| {
82//! debug!("got message: {:?}", message);
83//! info!("decoded message: {:?}", std::str::from_utf8(&message.data).unwrap());
84//! ch.basic_ack(message.delivery_tag, false)
85//! })
86//! })
87//! })
88//! ).wait_future().expect("runtime failure");
89//! }
90//! ```
91
92#[deprecated(note = "use lapin instead")]
93pub use lapin::{
94 auth, message, options, protocol, tcp, types, uri, BasicProperties, Configuration,
95 ConnectionProperties, ConsumerDelegate, Error, ExchangeKind, Queue, Result,
96};
97
98#[deprecated(note = "use lapin instead")]
99pub use channel::Channel;
100#[deprecated(note = "use lapin instead")]
101pub use client::{Client, ClientFuture, Connect};
102#[deprecated(note = "use lapin instead")]
103pub use confirmation::ConfirmationFuture;
104#[deprecated(note = "use lapin instead")]
105pub use consumer::Consumer;
106
107mod channel;
108mod client;
109mod confirmation;
110mod consumer;