1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
use amqpr_codec::{Frame, FrameHeader, FramePayload, AmqpString};
use amqpr_codec::method::MethodPayload;
use amqpr_codec::method::exchange::{ExchangeClass, DeclareMethod};

use futures::sink::{Sink, Send};

use std::collections::HashMap;


pub type ExchangeDeclared<S> = Send<S>;


/// Declare exchange asynchronously.
/// That means we won't wait to receive `Declare-Ok` method after send `Declare` method.
pub fn declare_exchange<S>(
    channel_id: u16,
    socket: S,
    option: DeclareExchangeOption,
) -> ExchangeDeclared<S>
where
    S: Sink<SinkItem = Frame>,
{
    let declare = DeclareMethod {
        reserved1: 0,
        exchange: option.name,
        typ: name_of_type(option.typ),
        passive: option.is_passive,
        durable: option.is_durable,
        auto_delete: option.is_auto_delete,
        internal: option.is_internal,
        no_wait: false,
        arguments: HashMap::new(),
    };

    let frame = Frame {
        header: FrameHeader { channel: channel_id },
        payload: FramePayload::Method(MethodPayload::Exchange(ExchangeClass::Declare(declare))),
    };

    socket.send(frame)
}



#[derive(Debug, Clone)]
pub struct DeclareExchangeOption {
    pub name: AmqpString,
    pub typ: ExchangeType,
    pub is_passive: bool,
    pub is_durable: bool,
    pub is_auto_delete: bool,
    pub is_internal: bool,
}

#[derive(Debug, Clone)]
pub enum ExchangeType {
    Direct,
    Fanout,
    Topic,
    Headers,
}

fn name_of_type(typ: ExchangeType) -> AmqpString {
    match typ {
        ExchangeType::Direct => "direct".into(),
        ExchangeType::Fanout => "fanout".into(),
        ExchangeType::Topic => "topic".into(),
        ExchangeType::Headers => "headers".into(),
    }
}