1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
use amqpr_codec::{Frame, FrameHeader, FramePayload};
use amqpr_codec::method::MethodPayload;
use amqpr_codec::method::basic::{BasicClass, ConsumeMethod};

use futures::{Future, Stream, Sink, Poll, Async};

use std::collections::HashMap;
use std::borrow::Borrow;

use common::{send_and_receive, SendAndReceive};
use errors::*;


pub fn start_consume_wait<In, Out, E>(
    income: In,
    outcome: Out,
    channel_id: u16,
    option: StartConsumeOption,
) -> ConsumeStarted<In, Out>
where
    In: Stream<Error = E>,
    In::Item: Borrow<Frame>,
    Out: Sink<SinkItem = Frame, SinkError = E>,
    E: From<Error>,
{
    let consume = ConsumeMethod {
        reserved1: 0,
        queue: option.queue,
        consumer_tag: option.consumer_tag,
        no_local: option.is_no_local,
        no_ack: option.is_no_ack,
        exclusive: option.is_exclusive,
        no_wait: false,
        arguments: HashMap::new(),
    };

    let frame = Frame {
        header: FrameHeader { channel: channel_id },
        payload: FramePayload::Method(MethodPayload::Basic(BasicClass::Consume(consume))),
    };

    let find_consume_ok: fn(&Frame) -> bool = |frame| {
        frame
            .method()
            .and_then(|c| c.basic())
            .and_then(|m| m.consume_ok())
            .is_some()
    };

    let process = send_and_receive(frame, income, outcome, find_consume_ok);

    ConsumeStarted { process: process }
}


/// Caller-supplied settings for a `basic.consume` request.
///
/// `start_consume_wait` copies these values into the `ConsumeMethod` it sends.
#[derive(Debug, Clone)]
pub struct StartConsumeOption {
    /// Sent as the `queue` field of the consume method.
    pub queue: String,
    /// Sent as the `consumer_tag` field of the consume method.
    pub consumer_tag: String,
    /// Sent as the `no_local` flag of the consume method.
    pub is_no_local: bool,
    /// Sent as the `no_ack` flag of the consume method.
    pub is_no_ack: bool,
    /// Sent as the `exclusive` flag of the consume method.
    pub is_exclusive: bool,
    /// Not read by `start_consume_wait`, which always sends `no_wait: false`.
    // NOTE(review): presumably consumed by a non-waiting variant elsewhere — confirm.
    pub is_no_wait: bool,
}



/// Future returned by `start_consume_wait`.
///
/// When it completes it yields the `(In, Out)` pair so the caller can keep using
/// the stream and the sink.
// NOTE(review): the `Out: Sink` bound is presumably required by `SendAndReceive`'s
// own definition — confirm before trying to relax it.
pub struct ConsumeStarted<In, Out>
where
    Out: Sink,
{
    /// The in-flight exchange created by `send_and_receive`, using a plain
    /// function pointer as the frame predicate.
    process: SendAndReceive<In, Out, fn(&Frame) -> bool>,
}


/// Drives the wrapped exchange and, once it has finished, hands the stream and the
/// sink back to the caller. The matched reply itself is discarded.
impl<In, Out, E> Future for ConsumeStarted<In, Out>
where
    In: Stream<Error = E>,
    In::Item: Borrow<Frame>,
    Out: Sink<SinkItem = Frame, SinkError = E>,
    E: From<Error>,
{
    type Item = (In, Out);
    type Error = E;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        // NOTE(review): `try_ready!` is not defined in this file; it is presumably a
        // crate-wide macro that returns early while the inner future is not ready or
        // has failed — confirm that it accepts the future itself rather than the
        // result of `poll()`.
        let finished = try_ready!(self.process);

        // The first element is the reply that matched the predicate; only the two
        // connection halves are surfaced.
        let (_consume_ok, income, outcome) = finished;
        Ok(Async::Ready((income, outcome)))
    }
}