amqpr_api/exchange/
declare.rs

1use amqpr_codec::{Frame, FrameHeader, FramePayload, AmqpString};
2use amqpr_codec::method::MethodPayload;
3use amqpr_codec::method::exchange::{ExchangeClass, DeclareMethod};
4
5use futures::sink::{Sink, Send};
6
7use std::collections::HashMap;
8
9
10pub type ExchangeDeclared<S> = Send<S>;
11
12
13/// Declare exchange asynchronously.
14/// That means we won't wait to receive `Declare-Ok` method after send `Declare` method.
15pub fn declare_exchange<S>(
16    channel_id: u16,
17    socket: S,
18    option: DeclareExchangeOption,
19) -> ExchangeDeclared<S>
20where
21    S: Sink<SinkItem = Frame>,
22{
23    let declare = DeclareMethod {
24        reserved1: 0,
25        exchange: option.name,
26        typ: name_of_type(option.typ),
27        passive: option.is_passive,
28        durable: option.is_durable,
29        auto_delete: option.is_auto_delete,
30        internal: option.is_internal,
31        no_wait: false,
32        arguments: HashMap::new(),
33    };
34
35    let frame = Frame {
36        header: FrameHeader { channel: channel_id },
37        payload: FramePayload::Method(MethodPayload::Exchange(ExchangeClass::Declare(declare))),
38    };
39
40    socket.send(frame)
41}
42
43
44
45#[derive(Debug, Clone)]
46pub struct DeclareExchangeOption {
47    pub name: AmqpString,
48    pub typ: ExchangeType,
49    pub is_passive: bool,
50    pub is_durable: bool,
51    pub is_auto_delete: bool,
52    pub is_internal: bool,
53}
54
55#[derive(Debug, Clone)]
56pub enum ExchangeType {
57    Direct,
58    Fanout,
59    Topic,
60    Headers,
61}
62
63fn name_of_type(typ: ExchangeType) -> AmqpString {
64    match typ {
65        ExchangeType::Direct => "direct".into(),
66        ExchangeType::Fanout => "fanout".into(),
67        ExchangeType::Topic => "topic".into(),
68        ExchangeType::Headers => "headers".into(),
69    }
70}