amqpr_api/basic/
consume.rs

1use amqpr_codec::{Frame, FrameHeader, FramePayload};
2use amqpr_codec::args::AmqpString;
3use amqpr_codec::method::MethodPayload;
4use amqpr_codec::method::basic::{BasicClass, ConsumeMethod};
5
6use futures::sink::{Sink, Send};
7
8use std::collections::HashMap;
9
10use errors::*;
11
12
13pub type ConsumeStarted<S> = Send<S>;
14
15
16/// Send `Consume` message to AMQP server.
17///
18/// # Notice
19/// A message being sent by this function is `no-wait` mode.
20pub fn start_consume<S>(channel_id: u16, socket: S, option: StartConsumeOption) -> ConsumeStarted<S>
21where
22    S: Sink<SinkItem = Frame>,
23    S::SinkError: From<Error>,
24{
25    let consume = ConsumeMethod {
26        reserved1: 0,
27        queue: option.queue,
28        consumer_tag: option.consumer_tag,
29        no_local: option.is_no_local,
30        no_ack: option.is_no_ack,
31        exclusive: option.is_exclusive,
32        no_wait: true,
33        arguments: HashMap::new(),
34    };
35
36    let frame = Frame {
37        header: FrameHeader { channel: channel_id },
38        payload: FramePayload::Method(MethodPayload::Basic(BasicClass::Consume(consume))),
39    };
40
41    socket.send(frame)
42}
43
44
45#[derive(Debug, Clone)]
46pub struct StartConsumeOption {
47    pub queue: AmqpString,
48    pub consumer_tag: AmqpString,
49    pub is_no_local: bool,
50    pub is_no_ack: bool,
51    pub is_exclusive: bool,
52}