1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
use lapin_async::api::RequestId;
use std::default::Default;
use std::io::{self,Error,ErrorKind};
use futures::{Async,Future};
use futures::future;
use tokio_io::{AsyncRead,AsyncWrite};
use std::sync::{Arc,Mutex};
use transport::*;
use channel::Channel;
/// Client side of a connection, holding the transport it was built on.
///
/// The transport lives behind an `Arc<Mutex<..>>`; `create_channel` hands a
/// clone of that `Arc` to every `Channel` it builds, so the client and its
/// channels all operate on the same `AMQPTransport`.
#[derive(Clone)]
pub struct Client<T> {
// Shared, mutex-protected transport; locked whenever frames are sent or handled.
transport: Arc<Mutex<AMQPTransport<T>>>,
}
/// Settings handed to `Client::connect`, which forwards them to
/// `AMQPTransport::connect`.
///
/// `ConnectionOptions::default()` yields `guest` / `guest` on vhost `/` with a
/// heartbeat value of `60`.
#[derive(Clone,Debug,PartialEq)]
pub struct ConnectionOptions {
/// User name (`"guest"` by default).
pub username: String,
/// Password (`"guest"` by default).
pub password: String,
/// Virtual host (`"/"` by default).
pub vhost: String,
/// Heartbeat setting (`60` by default).
/// NOTE(review): presumably an interval in seconds -- confirm against the transport.
pub heartbeat: u16,
}
impl Default for ConnectionOptions {
fn default() -> ConnectionOptions {
ConnectionOptions {
username: "guest".to_string(),
password: "guest".to_string(),
vhost: "/".to_string(),
heartbeat: 60,
}
}
}
impl<T: AsyncRead+AsyncWrite+'static> Client<T> {
    /// Connects over `stream` and resolves to a `Client`.
    ///
    /// The stream is framed with `AMQPCodec` and passed, together with
    /// `options`, to `AMQPTransport::connect`. The transport it yields is
    /// wrapped in an `Arc<Mutex<..>>` and stored in the returned client.
    pub fn connect(stream: T, options: &ConnectionOptions) -> Box<Future<Item = Client<T>, Error = io::Error>> {
        let handshake = AMQPTransport::connect(stream.framed(AMQPCodec), options);
        Box::new(handshake.map(|transport| {
            debug!("got client service");
            Client {
                transport: Arc::new(Mutex::new(transport)),
            }
        }))
    }

    /// Opens a new channel and resolves to it once the open request is
    /// reported as finished.
    ///
    /// # Errors
    ///
    /// The returned future fails with `ErrorKind::ConnectionAborted` when the
    /// transport mutex cannot be locked or when `channel_open` returns an
    /// error.
    pub fn create_channel(&self) -> Box<Future<Item = Channel<T>, Error = io::Error>> {
        // Guard clause: without the transport lock nothing can be sent.
        let mut transport = match self.transport.lock() {
            Ok(guard) => guard,
            Err(_) => return Box::new(future::err(
                Error::new(ErrorKind::ConnectionAborted, "could not create channel".to_string())
            )),
        };

        let channel_id: u16 = transport.conn.create_channel();
        let request_id = match transport.conn.channel_open(channel_id, String::new()) {
            Ok(id) => id,
            Err(e) => return Box::new(future::err(
                Error::new(ErrorKind::ConnectionAborted, format!("could not create channel: {:?}", e))
            )),
        };

        trace!("request id: {}", request_id);
        transport.send_and_handle_frames();

        // One handle is moved into the waiting future, the other ends up in
        // the `Channel` so it keeps using this client's transport.
        let channel_transport = self.transport.clone();
        let answered = wait_for_answer(channel_transport.clone(), request_id);
        Box::new(answered.map(move |_| Channel {
            id: channel_id,
            transport: channel_transport,
        }))
    }

    /// Opens a channel via `create_channel`, then calls `confirm_select` on
    /// it and resolves to the channel once that call's future completes.
    pub fn create_confirm_channel(&self) -> Box<Future<Item = Channel<T>, Error = io::Error>> {
        let opened = self.create_channel();
        Box::new(opened.and_then(|channel| {
            // Keep a copy to hand back after `confirm_select` has finished.
            let confirmed = channel.clone();
            channel.confirm_select().map(move |_| confirmed)
        }))
    }
}
pub fn wait_for_answer<T: AsyncRead+AsyncWrite+'static>(transport: Arc<Mutex<AMQPTransport<T>>>, request_id: RequestId) -> Box<Future<Item = (), Error = io::Error>> {
Box::new(future::poll_fn(move || {
let connected = if let Ok(mut tr) = transport.try_lock() {
tr.handle_frames();
if ! tr.conn.is_finished(request_id) {
tr.handle_frames();
tr.conn.is_finished(request_id)
} else {
true
}
} else {
return Ok(Async::NotReady);
};
if connected {
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
}))
}