1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
extern crate tokio_core;
extern crate tokio_io;
#[macro_use]
extern crate futures;
extern crate bytes;
#[macro_use]
extern crate log;
#[macro_use]
extern crate error_chain;
extern crate amqpr_codec;
/// Extracts the next item from the result of polling a stream, returning early from the
/// enclosing function in every other case.
///
/// `$poll_result` is expected to be a `Result<Async<Option<T>>, E>`:
///
/// * `Err(e)` returns `Err(e.into())`.
/// * `Ok(Async::NotReady)` returns `Ok(Async::NotReady)`.
/// * `Ok(Async::Ready(None))` (the stream ended) returns an error built from
///   `ErrorKind::UnexpectedConnectionClose`.
/// * `Ok(Async::Ready(Some(item)))` makes the macro evaluate to `item`.
macro_rules! try_stream_ready {
    ($poll_result: expr) => {
        match $poll_result {
            // Failure paths first: each of these leaves the enclosing function.
            Err(cause) => return Err(cause.into()),
            Ok(::futures::Async::NotReady) => return Ok(::futures::Async::NotReady),
            Ok(::futures::Async::Ready(None)) => {
                return Err(
                    ::errors::Error::from(::errors::ErrorKind::UnexpectedConnectionClose).into(),
                )
            }
            // Success path: hand the item back to the caller of the macro.
            Ok(::futures::Async::Ready(Some(item))) => item,
        }
    };
}
pub mod channel;
pub mod exchange;
pub mod queue;
pub mod basic;
pub mod subscribe_stream;
pub mod publish_sink;
pub mod handshake;
pub mod errors;
pub(crate) mod common;
pub use handshake::start_handshake;
pub use channel::open_channel;
pub use exchange::declare_exchange;
pub use queue::{declare_queue, bind_queue};
pub use basic::{publish, get_delivered, start_consume};
pub use subscribe_stream::subscribe_stream;
pub use publish_sink::publish_sink;
use futures::{Async, AsyncSink, Stream, Sink, Poll, StartSend};
use errors::Error;
use amqpr_codec::Frame;
/// The underlying transport: a `tokio_core` `TcpStream` wrapped in `tokio_io`'s `Framed`
/// adapter using `amqpr_codec::Codec`.
type RawSocket = tokio_io::codec::Framed<tokio_core::net::TcpStream, amqpr_codec::Codec>;
/// Newtype around `RawSocket`.
///
/// Its `Stream` and `Sink` implementations forward to the wrapped transport and convert the
/// transport's error into this crate's `Error`.
pub struct AmqpSocket(RawSocket);
impl Stream for AmqpSocket {
type Item = Frame;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.0.poll().map_err(|io_err| Error::from(io_err))
}
}
impl Sink for AmqpSocket {
type SinkItem = Frame;
type SinkError = Error;
fn start_send(&mut self, item: Frame) -> StartSend<Frame, Error> {
self.0.start_send(item).map_err(|io_err| Error::from(io_err))
}
fn poll_complete(&mut self) -> Poll<(), Error> {
self.0.poll_complete().map_err(|io_err| Error::from(io_err))
}
fn close(&mut self) -> Poll<(), Error> {
self.0.close().map_err(|io_err| Error::from(io_err))
}
}
/// Pairs a `Stream` (field `0`) with a `Sink` (field `1`) in a single value.
///
/// The `Stream` implementation of `InOut` delegates to field `0` and the `Sink`
/// implementation delegates to field `1`.
pub struct InOut<In, Out>(pub In, pub Out)
where
    In: Stream,
    Out: Sink;
impl<In: Stream, Out: Sink> Stream for InOut<In, Out> {
type Item = In::Item;
type Error = In::Error;
fn poll(&mut self) -> Result<Async<Option<In::Item>>, In::Error> {
self.0.poll()
}
}
/// Writing side of `InOut`: every operation is delegated unchanged to the inner sink
/// (field `1`).
impl<In, Out> Sink for InOut<In, Out>
where
    In: Stream,
    Out: Sink,
{
    type SinkItem = Out::SinkItem;
    type SinkError = Out::SinkError;

    /// Passes `item` to the inner sink's `start_send`.
    fn start_send(
        &mut self,
        item: Self::SinkItem,
    ) -> Result<AsyncSink<Self::SinkItem>, Self::SinkError> {
        let outbound = &mut self.1;
        outbound.start_send(item)
    }

    /// Delegates to the inner sink's `poll_complete`.
    fn poll_complete(&mut self) -> Result<Async<()>, Self::SinkError> {
        let outbound = &mut self.1;
        outbound.poll_complete()
    }

    /// Delegates to the inner sink's `close`.
    fn close(&mut self) -> Result<Async<()>, Self::SinkError> {
        let outbound = &mut self.1;
        outbound.close()
    }
}