// amqpr_api/exchange/declare.rs
use amqpr_codec::{Frame, FrameHeader, FramePayload, AmqpString};
2use amqpr_codec::method::MethodPayload;
3use amqpr_codec::method::exchange::{ExchangeClass, DeclareMethod};
4
5use futures::sink::{Sink, Send};
6
7use std::collections::HashMap;
8
9
10pub type ExchangeDeclared<S> = Send<S>;
11
12
13pub fn declare_exchange<S>(
16 channel_id: u16,
17 socket: S,
18 option: DeclareExchangeOption,
19) -> ExchangeDeclared<S>
20where
21 S: Sink<SinkItem = Frame>,
22{
23 let declare = DeclareMethod {
24 reserved1: 0,
25 exchange: option.name,
26 typ: name_of_type(option.typ),
27 passive: option.is_passive,
28 durable: option.is_durable,
29 auto_delete: option.is_auto_delete,
30 internal: option.is_internal,
31 no_wait: false,
32 arguments: HashMap::new(),
33 };
34
35 let frame = Frame {
36 header: FrameHeader { channel: channel_id },
37 payload: FramePayload::Method(MethodPayload::Exchange(ExchangeClass::Declare(declare))),
38 };
39
40 socket.send(frame)
41}
42
43
44
45#[derive(Debug, Clone)]
46pub struct DeclareExchangeOption {
47 pub name: AmqpString,
48 pub typ: ExchangeType,
49 pub is_passive: bool,
50 pub is_durable: bool,
51 pub is_auto_delete: bool,
52 pub is_internal: bool,
53}
54
/// The kind of exchange to declare.
///
/// Each variant is transmitted as a lowercase type name in the declare
/// method (`"direct"`, `"fanout"`, `"topic"`, `"headers"`).
///
/// The enum is fieldless, so it is `Copy` and supports equality and hashing;
/// callers can compare values or use them as map keys without cloning.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ExchangeType {
    /// Transmitted as `"direct"`.
    Direct,
    /// Transmitted as `"fanout"`.
    Fanout,
    /// Transmitted as `"topic"`.
    Topic,
    /// Transmitted as `"headers"`.
    Headers,
}
62
63fn name_of_type(typ: ExchangeType) -> AmqpString {
64 match typ {
65 ExchangeType::Direct => "direct".into(),
66 ExchangeType::Fanout => "fanout".into(),
67 ExchangeType::Topic => "topic".into(),
68 ExchangeType::Headers => "headers".into(),
69 }
70}