amqpr_api/channel/
open.rs

1use amqpr_codec::{Frame, FrameHeader, FramePayload};
2use amqpr_codec::method::MethodPayload;
3use amqpr_codec::method::channel::{ChannelClass, OpenMethod};
4
5use futures::{Future, Stream, Sink, Poll, Async};
6use futures::sink::Send;
7
8use common::Should;
9use errors::*;
10
11
12/// Open new channel with given channel id.
13///
14/// Returned future consists of `Frame` and `S` but for now, `Frame` is meanless
15/// because it has nothing.
16pub fn open_channel<S, E>(channel_id: u16, socket: S) -> ChannelOpened<S>
17where
18    S: Stream<Item = Frame, Error = E> + Sink<SinkItem = Frame, SinkError = E>,
19    E: From<Error>,
20{
21    let open = OpenMethod { reserved1: "".into() };
22    let frame = Frame {
23        header: FrameHeader { channel: channel_id },
24        payload: FramePayload::Method(MethodPayload::Channel(ChannelClass::Open(open))),
25    };
26
27    ChannelOpened::Sending(socket.send(frame))
28}
29
30
31pub enum ChannelOpened<S>
32where
33    S: Sink,
34{
35    Sending(Send<S>),
36    Receiving(Should<S>),
37}
38
39
40impl<S, E> Future for ChannelOpened<S>
41where
42    S: Stream<Item = Frame, Error = E>
43        + Sink<SinkItem = Frame, SinkError = E>,
44    E: From<Error>,
45{
46    type Item = S;
47    type Error = E;
48
49    fn poll(&mut self) -> Poll<S, S::Error> {
50        use self::ChannelOpened::*;
51
52        *self = match self {
53            &mut Sending(ref mut sending) => {
54                let socket = try_ready!(sending.poll());
55                Receiving(Should::new(socket))
56            }
57            &mut Receiving(ref mut socket) => {
58                let frame = try_stream_ready!(socket.as_mut().poll());
59                match frame.method().and_then(|m| m.channel()).and_then(
60                    |c| c.open_ok(),
61                ) {
62                    Some(_open_ok) => return Ok(Async::Ready(socket.take())),
63                    None => {
64                        return Err(E::from(Error::from(
65                            ErrorKind::UnexpectedFrame("OpenOk".into(), frame.clone()),
66                        )))
67                    }
68                }
69            }
70        };
71
72        self.poll()
73    }
74}