amqpr_api/
subscribe_stream.rs

//! Convenience module for subscribing to items delivered by an AMQP server.
2
3use futures::{Future, Stream, Sink, Poll, Async};
4
5use amqpr_codec::Frame;
6
7use basic::consume::{start_consume, ConsumeStarted};
8use basic::deliver::{get_delivered, Delivered, DeliveredItem};
9use errors::Error;
10
11pub use basic::consume::StartConsumeOption;
12
13
14
15/// Returns `SubscribeStream` which is `Stream` of `DeliveredItem`.
16/// Basically, this function send `Consume` message to AMQO server first, and then
17/// wait for each items. This uses `get_delivered` method internally. If you want to know more detail
18/// please have a look at `get_delivered` function document.
19///
20/// We skips an item being not considered as subscribe item such as `Heartbeat` or `Error`.
21/// So we recommend that one local channel has only one subscribe stream.
22///
23/// # Notice
24/// Here is the list of default options we set in `Consume` method.
25///
26/// - no-local: false
27/// - no-ack: true
28/// - exclusive: true
29/// - no-wait: true
30pub fn subscribe_stream<S, E>(
31    ch_id: u16,
32    socket: S,
33    option: StartConsumeOption,
34) -> SubscribeStream<S, E>
35where
36    S: Stream<Item = Frame, Error = E> + Sink<SinkItem = Frame, SinkError = E>,
37    E: From<Error>,
38{
39    let consume_started = start_consume(ch_id, socket, option);
40    let sub_stream = SubscribeStream::SendingConsumeMethod(consume_started);
41
42    sub_stream
43}
44
45
46
47
48
49
50/// Stream of subscribed item from AMQP server.
51/// This stream is based on `no_ack` consume because of performance.
52/// But that may cause decreasing of reliability.
53/// If you want reliability rather than performance, you should use `subscribe_stream_ack`
54/// function (after we implement that...).
55pub enum SubscribeStream<S, E>
56where
57    S: Stream<Item = Frame, Error = E> + Sink<SinkItem = Frame, SinkError = E>,
58{
59    SendingConsumeMethod(ConsumeStarted<S>),
60    ReceivingDeliverd(Delivered<S>),
61}
62
63
64impl<S, E> Stream for SubscribeStream<S, E>
65where
66    S: Stream<Item = Frame, Error = E>
67        + Sink<SinkItem = Frame, SinkError = E>,
68    E: From<Error>,
69{
70    type Item = DeliveredItem;
71    type Error = E;
72
73    fn poll(&mut self) -> Poll<Option<DeliveredItem>, Self::Error> {
74        use self::SubscribeStream::*;
75
76        let (item, socket) = match self {
77            &mut SendingConsumeMethod(ref mut fut) => {
78                let socket = try_ready!(fut.poll());
79                (None, socket)
80            }
81            &mut ReceivingDeliverd(ref mut del) => {
82                let (item, socket) = try_ready!(del.poll());
83                (Some(item), socket)
84            }
85        };
86
87        // Start to receive new delivered item.
88        let delivered = get_delivered(socket);
89        *self = ReceivingDeliverd(delivered);
90
91        match item {
92            Some(bytes) => Ok(Async::Ready(Some(bytes))),
93            None => self.poll(),
94        }
95    }
96}