1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
use amqpr_codec::{Frame, FrameHeader, FramePayload, AmqpString};
use amqpr_codec::method::MethodPayload;
use amqpr_codec::method::exchange::{ExchangeClass, DeclareMethod};
use futures::sink::{Sink, Send};
use std::collections::HashMap;
/// Future returned by `declare_exchange`.
///
/// This is an alias for the `futures::sink::Send` future that `Sink::send`
/// produces for the sink type `S`.
pub type ExchangeDeclared<S> = Send<S>;
pub fn declare_exchange<S>(
channel_id: u16,
socket: S,
option: DeclareExchangeOption,
) -> ExchangeDeclared<S>
where
S: Sink<SinkItem = Frame>,
{
let declare = DeclareMethod {
reserved1: 0,
exchange: option.name,
typ: name_of_type(option.typ),
passive: option.is_passive,
durable: option.is_durable,
auto_delete: option.is_auto_delete,
internal: option.is_internal,
no_wait: false,
arguments: HashMap::new(),
};
let frame = Frame {
header: FrameHeader { channel: channel_id },
payload: FramePayload::Method(MethodPayload::Exchange(ExchangeClass::Declare(declare))),
};
socket.send(frame)
}
/// Options consumed by `declare_exchange` when building the
/// `exchange.declare` method.
#[derive(Clone, Debug)]
pub struct DeclareExchangeOption {
    /// Exchange name; forwarded as the method's `exchange` field.
    pub name: AmqpString,
    /// Exchange type; converted to its wire name for the method's `typ` field.
    pub typ: ExchangeType,
    /// Forwarded as the method's `passive` flag.
    pub is_passive: bool,
    /// Forwarded as the method's `durable` flag.
    pub is_durable: bool,
    /// Forwarded as the method's `auto_delete` flag.
    pub is_auto_delete: bool,
    /// Forwarded as the method's `internal` flag.
    pub is_internal: bool,
}
/// Kind of exchange to declare.
///
/// Each variant is sent on the wire as its lowercase name
/// (`"direct"`, `"fanout"`, `"topic"`, `"headers"`).
///
/// The type is comparable and hashable so callers can test it with `==`
/// or use it as a map/set key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ExchangeType {
    /// Sent as `"direct"`.
    Direct,
    /// Sent as `"fanout"`.
    Fanout,
    /// Sent as `"topic"`.
    Topic,
    /// Sent as `"headers"`.
    Headers,
}
/// Maps an `ExchangeType` to the lowercase name placed in the declare
/// method's `typ` field.
fn name_of_type(typ: ExchangeType) -> AmqpString {
    // Pick the static label first, then convert once at the end.
    let label: &'static str = match typ {
        ExchangeType::Direct => "direct",
        ExchangeType::Fanout => "fanout",
        ExchangeType::Topic => "topic",
        ExchangeType::Headers => "headers",
    };
    label.into()
}