amqpr_api/
lib.rs

//! amqpr-api is an AMQP client API library.
//! You can talk to an AMQP server via the channel controllers provided by this crate.
//! There are two kinds of channel controllers: GlobalChannelController and LocalChannelController.
4//!
5
6extern crate tokio_core;
7extern crate tokio_io;
8#[macro_use]
9extern crate futures;
10extern crate bytes;
11#[macro_use]
12extern crate log;
13#[macro_use]
14extern crate error_chain;
15
16extern crate amqpr_codec;
17
18
/// Unwraps the next item of a polled stream, for use inside a `poll`-style function.
///
/// Given the result of polling a stream, this expands to a `match` that:
///
/// * evaluates to the item when the stream yielded `Ready(Some(item))`;
/// * returns `Ok(Async::NotReady)` from the enclosing function when the stream is not ready;
/// * returns an `UnexpectedConnectionClose` error (converted with `.into()`) from the enclosing
///   function when the stream has ended;
/// * returns the polling error (converted with `.into()`) from the enclosing function otherwise.
macro_rules! try_stream_ready {
    ($poll_result: expr) => {
        match $poll_result {
            // Propagate failures, converting into the caller's error type.
            Err(cause) => return Err(cause.into()),
            // Nothing available yet: bubble the pending state up to the caller.
            Ok(::futures::Async::NotReady) => return Ok(::futures::Async::NotReady),
            // End of stream is treated as the peer closing the connection unexpectedly.
            Ok(::futures::Async::Ready(None)) => {
                return Err(
                    ::errors::Error::from(::errors::ErrorKind::UnexpectedConnectionClose).into(),
                )
            }
            Ok(::futures::Async::Ready(Some(item))) => item,
        }
    };
}
30
31
32pub mod channel;
33pub mod exchange;
34pub mod queue;
35pub mod basic;
36pub mod subscribe_stream;
37pub mod publish_sink;
38
39pub mod handshake;
40pub mod errors;
41pub(crate) mod common;
42
43
44pub use handshake::start_handshake;
45pub use channel::open_channel;
46pub use exchange::declare_exchange;
47pub use queue::{declare_queue, bind_queue};
48pub use basic::{publish, get_delivered, start_consume};
49pub use subscribe_stream::subscribe_stream;
50pub use publish_sink::publish_sink;
51
52use futures::{Async, AsyncSink, Stream, Sink, Poll, StartSend};
53use errors::Error;
54use amqpr_codec::Frame;
55
56
/// The underlying transport: a `tokio_core` `TcpStream` wrapped in a `tokio_io` `Framed`
/// adapter that uses `amqpr_codec::Codec`.
type RawSocket = tokio_io::codec::Framed<tokio_core::net::TcpStream, amqpr_codec::Codec>;

/// Newtype around `RawSocket`.
///
/// Its `Stream` and `Sink` implementations exchange `Frame` values and convert the errors of
/// the wrapped socket into this crate's `Error` type.
pub struct AmqpSocket(RawSocket);
60
61impl Stream for AmqpSocket {
62    type Item = Frame;
63    type Error = Error;
64
65    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
66        self.0.poll().map_err(|io_err| Error::from(io_err))
67    }
68}
69
70impl Sink for AmqpSocket {
71    type SinkItem = Frame;
72    type SinkError = Error;
73
74    fn start_send(&mut self, item: Frame) -> StartSend<Frame, Error> {
75        self.0.start_send(item).map_err(|io_err| Error::from(io_err))
76    }
77
78    fn poll_complete(&mut self) -> Poll<(), Error> {
79        self.0.poll_complete().map_err(|io_err| Error::from(io_err))
80    }
81
82    fn close(&mut self) -> Poll<(), Error> {
83        self.0.close().map_err(|io_err| Error::from(io_err))
84    }
85}
86
87
/// Pairs a separate `Stream` half and `Sink` half into a single value.
///
/// This is useful when a function requires `S: Stream + Sink` but your socket has been
/// separated into a `Stream` and a `Sink`. Field `0` is the inbound half and field `1` is the
/// outbound half: the `Stream` implementation delegates to field `0` and the `Sink`
/// implementation delegates to field `1`.
///
/// The struct itself carries no trait bounds; the `Stream` and `Sink` implementations are the
/// ones that require `In: Stream` and `Out: Sink`.
pub struct InOut<In, Out>(pub In, pub Out);
91
92impl<In: Stream, Out: Sink> Stream for InOut<In, Out> {
93    type Item = In::Item;
94    type Error = In::Error;
95
96    fn poll(&mut self) -> Result<Async<Option<In::Item>>, In::Error> {
97        self.0.poll()
98    }
99}
100
101
102impl<In: Stream, Out: Sink> Sink for InOut<In, Out> {
103    type SinkItem = Out::SinkItem;
104    type SinkError = Out::SinkError;
105
106    fn start_send(
107        &mut self,
108        item: Out::SinkItem,
109    ) -> Result<AsyncSink<Out::SinkItem>, Out::SinkError> {
110        self.1.start_send(item)
111    }
112
113    fn poll_complete(&mut self) -> Result<Async<()>, Out::SinkError> {
114        self.1.poll_complete()
115    }
116
117    fn close(&mut self) -> Result<Async<()>, Out::SinkError> {
118        self.1.close()
119    }
120}