amqpr_api/basic/
consume.rs1use amqpr_codec::{Frame, FrameHeader, FramePayload};
2use amqpr_codec::args::AmqpString;
3use amqpr_codec::method::MethodPayload;
4use amqpr_codec::method::basic::{BasicClass, ConsumeMethod};
5
6use futures::sink::{Sink, Send};
7
8use std::collections::HashMap;
9
10use errors::*;
11
12
13pub type ConsumeStarted<S> = Send<S>;
14
15
16pub fn start_consume<S>(channel_id: u16, socket: S, option: StartConsumeOption) -> ConsumeStarted<S>
21where
22 S: Sink<SinkItem = Frame>,
23 S::SinkError: From<Error>,
24{
25 let consume = ConsumeMethod {
26 reserved1: 0,
27 queue: option.queue,
28 consumer_tag: option.consumer_tag,
29 no_local: option.is_no_local,
30 no_ack: option.is_no_ack,
31 exclusive: option.is_exclusive,
32 no_wait: true,
33 arguments: HashMap::new(),
34 };
35
36 let frame = Frame {
37 header: FrameHeader { channel: channel_id },
38 payload: FramePayload::Method(MethodPayload::Basic(BasicClass::Consume(consume))),
39 };
40
41 socket.send(frame)
42}
43
44
45#[derive(Debug, Clone)]
46pub struct StartConsumeOption {
47 pub queue: AmqpString,
48 pub consumer_tag: AmqpString,
49 pub is_no_local: bool,
50 pub is_no_ack: bool,
51 pub is_exclusive: bool,
52}