1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
//! lapin-async
//!
//! This library is meant for use in an event loop. The library exposes, through the
//! [Connection struct](https://docs.rs/lapin-async/0.1.0/lapin_async/connection/struct.Connection.html),
//! a state machine you can drive through IO you manage.
//!
//! Typically, your code would own the socket and buffers, and regularly pass the
//! input and output buffers to the state machine so it receives messages and
//! serializes new ones to send. You can then query the current state and see
//! if it received new messages for the consumers.
//!
//! ## Example
//!
//! ```rust,no_run
//! use env_logger;
//! use lapin_async as lapin;
//! use log::info;
//!
//! use crate::lapin::{
//!     BasicProperties, Channel, Connection, ConnectionProperties, ConsumerSubscriber,
//!     message::Delivery,
//!     options::*,
//!     types::FieldTable,
//! };
//!
//! #[derive(Clone, Debug)]
//! struct Subscriber {
//!     channel: Channel,
//! }
//!
//! impl ConsumerSubscriber for Subscriber {
//!     fn new_delivery(&self, delivery: Delivery) {
//!         self.channel.basic_ack(delivery.delivery_tag, BasicAckOptions::default()).as_error().expect("basic_ack");
//!     }
//!     fn drop_prefetched_messages(&self) {}
//!     fn cancel(&self) {}
//! }
//!
//! fn main() {
//!     env_logger::init();
//!
//!     let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
//!     let conn = Connection::connect(&addr, ConnectionProperties::default()).wait().expect("connection error");
//!
//!     info!("CONNECTED");
//!
//!     let channel_a = conn.create_channel().wait().expect("create_channel");
//!     let channel_b = conn.create_channel().wait().expect("create_channel");
//!
//!     channel_a.queue_declare("hello", QueueDeclareOptions::default(), FieldTable::default()).wait().expect("queue_declare");
//!     let queue = channel_b.queue_declare("hello", QueueDeclareOptions::default(), FieldTable::default()).wait().expect("queue_declare");
//!
//!     info!("will consume");
//!     channel_b.basic_consume(&queue, "my_consumer", BasicConsumeOptions::default(), FieldTable::default(), Box::new(Subscriber { channel: channel_b.clone() })).wait().expect("basic_consume");
//!
//!     let payload = b"Hello world!";
//!
//!     loop {
//!         channel_a.basic_publish("", "hello", BasicPublishOptions::default(), payload.to_vec(), BasicProperties::default()).wait().expect("basic_publish");
//!     }
//! }
//! ```
pub use ;
pub use ;
pub use ;
pub use Configuration;
pub use ;
pub use ConnectionProperties;
pub use ;
pub use ConsumerSubscriber;
pub use ;
pub use Queue;