1extern crate tokio_core;
7extern crate tokio_io;
8#[macro_use]
9extern crate futures;
10extern crate bytes;
11#[macro_use]
12extern crate log;
13#[macro_use]
14extern crate error_chain;
15
16extern crate amqpr_codec;
17
18
19macro_rules! try_stream_ready {
20 ($polled: expr) => {
21 match $polled {
22 Ok(::futures::Async::Ready(Some(frame))) => frame,
23 Ok(::futures::Async::Ready(None)) =>
24 return Err(::errors::Error::from(::errors::ErrorKind::UnexpectedConnectionClose).into()),
25 Ok(::futures::Async::NotReady) => return Ok(::futures::Async::NotReady),
26 Err(e) => return Err(e.into()),
27 }
28 }
29}
30
31
32pub mod channel;
33pub mod exchange;
34pub mod queue;
35pub mod basic;
36pub mod subscribe_stream;
37pub mod publish_sink;
38
39pub mod handshake;
40pub mod errors;
41pub(crate) mod common;
42
43
44pub use handshake::start_handshake;
45pub use channel::open_channel;
46pub use exchange::declare_exchange;
47pub use queue::{declare_queue, bind_queue};
48pub use basic::{publish, get_delivered, start_consume};
49pub use subscribe_stream::subscribe_stream;
50pub use publish_sink::publish_sink;
51
52use futures::{Async, AsyncSink, Stream, Sink, Poll, StartSend};
53use errors::Error;
54use amqpr_codec::Frame;
55
56
57type RawSocket = tokio_io::codec::Framed<tokio_core::net::TcpStream, amqpr_codec::Codec>;
58
59pub struct AmqpSocket(RawSocket);
60
61impl Stream for AmqpSocket {
62 type Item = Frame;
63 type Error = Error;
64
65 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
66 self.0.poll().map_err(|io_err| Error::from(io_err))
67 }
68}
69
70impl Sink for AmqpSocket {
71 type SinkItem = Frame;
72 type SinkError = Error;
73
74 fn start_send(&mut self, item: Frame) -> StartSend<Frame, Error> {
75 self.0.start_send(item).map_err(|io_err| Error::from(io_err))
76 }
77
78 fn poll_complete(&mut self) -> Poll<(), Error> {
79 self.0.poll_complete().map_err(|io_err| Error::from(io_err))
80 }
81
82 fn close(&mut self) -> Poll<(), Error> {
83 self.0.close().map_err(|io_err| Error::from(io_err))
84 }
85}
86
87
88pub struct InOut<In: Stream, Out: Sink>(pub In, pub Out);
91
92impl<In: Stream, Out: Sink> Stream for InOut<In, Out> {
93 type Item = In::Item;
94 type Error = In::Error;
95
96 fn poll(&mut self) -> Result<Async<Option<In::Item>>, In::Error> {
97 self.0.poll()
98 }
99}
100
101
102impl<In: Stream, Out: Sink> Sink for InOut<In, Out> {
103 type SinkItem = Out::SinkItem;
104 type SinkError = Out::SinkError;
105
106 fn start_send(
107 &mut self,
108 item: Out::SinkItem,
109 ) -> Result<AsyncSink<Out::SinkItem>, Out::SinkError> {
110 self.1.start_send(item)
111 }
112
113 fn poll_complete(&mut self) -> Result<Async<()>, Out::SinkError> {
114 self.1.poll_complete()
115 }
116
117 fn close(&mut self) -> Result<Async<()>, Out::SinkError> {
118 self.1.close()
119 }
120}