amqpr_api/channel/
open.rs1use amqpr_codec::{Frame, FrameHeader, FramePayload};
2use amqpr_codec::method::MethodPayload;
3use amqpr_codec::method::channel::{ChannelClass, OpenMethod};
4
5use futures::{Future, Stream, Sink, Poll, Async};
6use futures::sink::Send;
7
8use common::Should;
9use errors::*;
10
11
12pub fn open_channel<S, E>(channel_id: u16, socket: S) -> ChannelOpened<S>
17where
18 S: Stream<Item = Frame, Error = E> + Sink<SinkItem = Frame, SinkError = E>,
19 E: From<Error>,
20{
21 let open = OpenMethod { reserved1: "".into() };
22 let frame = Frame {
23 header: FrameHeader { channel: channel_id },
24 payload: FramePayload::Method(MethodPayload::Channel(ChannelClass::Open(open))),
25 };
26
27 ChannelOpened::Sending(socket.send(frame))
28}
29
30
31pub enum ChannelOpened<S>
32where
33 S: Sink,
34{
35 Sending(Send<S>),
36 Receiving(Should<S>),
37}
38
39
40impl<S, E> Future for ChannelOpened<S>
41where
42 S: Stream<Item = Frame, Error = E>
43 + Sink<SinkItem = Frame, SinkError = E>,
44 E: From<Error>,
45{
46 type Item = S;
47 type Error = E;
48
49 fn poll(&mut self) -> Poll<S, S::Error> {
50 use self::ChannelOpened::*;
51
52 *self = match self {
53 &mut Sending(ref mut sending) => {
54 let socket = try_ready!(sending.poll());
55 Receiving(Should::new(socket))
56 }
57 &mut Receiving(ref mut socket) => {
58 let frame = try_stream_ready!(socket.as_mut().poll());
59 match frame.method().and_then(|m| m.channel()).and_then(
60 |c| c.open_ok(),
61 ) {
62 Some(_open_ok) => return Ok(Async::Ready(socket.take())),
63 None => {
64 return Err(E::from(Error::from(
65 ErrorKind::UnexpectedFrame("OpenOk".into(), frame.clone()),
66 )))
67 }
68 }
69 }
70 };
71
72 self.poll()
73 }
74}