1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
use amqpr_codec::{Frame, FrameHeader, FramePayload};
use amqpr_codec::method::MethodPayload;
use amqpr_codec::method::basic::{BasicClass, ConsumeMethod};

use futures::{Future, Stream, Sink, Poll, Async};

use std::collections::HashMap;
use std::borrow::Borrow;

use common::{send_and_receive, SendAndReceive};
use errors::*;

/// Builds a `Basic` class `Consume` method frame for `channel_id` and hands it,
/// together with both halves of the connection, to `send_and_receive`.
///
/// The method is always built with `no_wait: false` and an empty argument table;
/// `option.is_no_wait` is not read by this function.
///
/// The returned [`ConsumeStarted`] future gives `income` and `outcome` back once
/// the underlying `send_and_receive` process completes. The predicate passed to
/// that process accepts a frame only when it carries a `consume_ok` method
/// (presumably the process reads `income` until the predicate matches; confirm
/// in `common`).
pub fn start_consume_wait<In, Out, E>(
    income: In,
    outcome: Out,
    channel_id: u16,
    option: StartConsumeOption,
) -> ConsumeStarted<In, Out>
where
    In: Stream<Error = E>,
    In::Item: Borrow<Frame>,
    Out: Sink<SinkItem = Frame, SinkError = E>,
    E: From<Error>,
{
    // True only for a frame whose method payload is a `Basic` class `consume_ok`.
    fn is_consume_ok(frame: &Frame) -> bool {
        frame
            .method()
            .and_then(|class| class.basic())
            .and_then(|basic| basic.consume_ok())
            .is_some()
    }

    let request = Frame {
        header: FrameHeader { channel: channel_id },
        payload: FramePayload::Method(MethodPayload::Basic(BasicClass::Consume(
            ConsumeMethod {
                reserved1: 0,
                queue: option.queue,
                consumer_tag: option.consumer_tag,
                no_local: option.is_no_local,
                no_ack: option.is_no_ack,
                exclusive: option.is_exclusive,
                // Hard-coded: this entry point always asks for a reply to wait on.
                no_wait: false,
                arguments: HashMap::new(),
            },
        ))),
    };

    // `ConsumeStarted` stores the predicate as a plain fn pointer, so pin the
    // type here instead of letting it be inferred as the fn item type.
    let matcher: fn(&Frame) -> bool = is_consume_ok;

    ConsumeStarted {
        process: send_and_receive(request, income, outcome, matcher),
    }
}

/// Settings copied into the `Consume` method built by [`start_consume_wait`].
#[derive(Debug, Clone)]
pub struct StartConsumeOption {
    /// Copied into the method's `queue` field.
    pub queue: String,
    /// Copied into the method's `consumer_tag` field.
    pub consumer_tag: String,
    /// Copied into the method's `no_local` field.
    pub is_no_local: bool,
    /// Copied into the method's `no_ack` field.
    pub is_no_ack: bool,
    /// Copied into the method's `exclusive` field.
    pub is_exclusive: bool,
    /// Not read by [`start_consume_wait`], which always sends `no_wait: false`.
    pub is_no_wait: bool,
}

/// Future returned by [`start_consume_wait`].
///
/// Wraps the `send_and_receive` process and, on completion, yields the
/// `(income, outcome)` pair so the caller can keep using the connection.
pub struct ConsumeStarted<In, Out>
where
    Out: Sink,
{
    process: SendAndReceive<In, Out, fn(&Frame) -> bool>,
}

impl<In, Out, E> Future for ConsumeStarted<In, Out>
where
    In: Stream<Error = E>,
    In::Item: Borrow<Frame>,
    Out: Sink<SinkItem = Frame, SinkError = E>,
    E: From<Error>,
{
    type Item = (In, Out);
    type Error = E;

    /// Drives the inner process; when it is ready, drops the matched reply and
    /// returns the stream and sink.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        // NOTE(review): the macro receives `self.process` itself rather than
        // `self.process.poll()`. Confirm which `try_ready!` is in scope for this
        // crate; the stock futures 0.1 macro expects a `Poll` value.
        let (_reply, income, outcome) = try_ready!(self.process);
        Ok(Async::Ready((income, outcome)))
    }
}