// metalmq_codec/frame/connection.rs

use super::{AMQPFieldValue, AMQPFrame, FieldTable, MethodFrameArgs};
2
3#[derive(Debug, Default)]
4pub struct ConnectionStartArgs {
5    pub version_major: u8,
6    pub version_minor: u8,
7    pub capabilities: Option<FieldTable>,
8    pub properties: Option<FieldTable>,
9    pub mechanisms: String,
10    pub locales: String,
11}
12
13#[derive(Debug, Default)]
14pub struct ConnectionStartOkArgs {
15    pub capabilities: Option<FieldTable>,
16    pub properties: Option<FieldTable>,
17    pub mechanism: String,
18    pub response: String,
19    pub locale: String,
20}
21
/// Arguments carried by a `Connection.Tune` method frame.
///
/// `connection_tune()` in this module sends 2047 / 131_072 / 60 for the three
/// fields; `Default` yields all zeroes.
#[derive(Debug, Default)]
pub struct ConnectionTuneArgs {
    /// Proposed `channel_max` value.
    pub channel_max: u16,
    /// Proposed `frame_max` value.
    pub frame_max: u32,
    /// Proposed `heartbeat` value.
    pub heartbeat: u16,
}
28
/// Arguments carried by a `Connection.TuneOk` method frame.
///
/// `connection_tune_ok()` in this module sends 2047 / 131_072 / 60 for the
/// three fields; `Default` yields all zeroes.
#[derive(Debug, Default)]
pub struct ConnectionTuneOkArgs {
    /// Accepted `channel_max` value.
    pub channel_max: u16,
    /// Accepted `frame_max` value.
    pub frame_max: u32,
    /// Accepted `heartbeat` value.
    pub heartbeat: u16,
}
35
/// Arguments carried by a `Connection.Open` method frame.
///
/// `Default` yields an empty virtual host and `insist == false`.
#[derive(Debug, Default)]
pub struct ConnectionOpenArgs {
    /// Name of the virtual host to open.
    pub virtual_host: String,
    /// Value of the `insist` flag.
    pub insist: bool,
}
41
42impl ConnectionStartArgs {
43    pub fn new() -> Self {
44        let mut capabilities = FieldTable::new();
45
46        capabilities.insert("publisher_confirms".into(), AMQPFieldValue::Bool(true));
47        capabilities.insert("exchange_exchange_bindings".into(), AMQPFieldValue::Bool(true));
48        capabilities.insert("basic.nack".into(), AMQPFieldValue::Bool(true));
49        capabilities.insert("consumer_cancel_notify".into(), AMQPFieldValue::Bool(true));
50        capabilities.insert("connection.blocked".into(), AMQPFieldValue::Bool(true));
51        capabilities.insert("consumer_priorities".into(), AMQPFieldValue::Bool(true));
52        capabilities.insert("authentication_failure_close".into(), AMQPFieldValue::Bool(true));
53        capabilities.insert("per_consumer_qos".into(), AMQPFieldValue::Bool(true));
54        capabilities.insert("direct_reply_to".into(), AMQPFieldValue::Bool(true));
55
56        let mut server_properties = FieldTable::new();
57
58        server_properties.insert(
59            "capabilities".into(),
60            AMQPFieldValue::FieldTable(Box::new(capabilities)),
61        );
62        server_properties.insert("product".into(), AMQPFieldValue::LongString("MetalMQ server".into()));
63        server_properties.insert("version".into(), AMQPFieldValue::LongString("0.1.0".into()));
64
65        Self {
66            version_major: 0,
67            version_minor: 9,
68            capabilities: None,
69            properties: Some(server_properties),
70            mechanisms: "PLAIN".into(),
71            locales: "en_US".into(),
72        }
73    }
74
75    pub fn frame(self) -> AMQPFrame {
76        AMQPFrame::Method(0, super::CONNECTION_START, MethodFrameArgs::ConnectionStart(self))
77    }
78}
79
80impl ConnectionStartOkArgs {
81    pub fn new(username: &str, password: &str) -> Self {
82        let mut caps = FieldTable::new();
83
84        caps.insert("authentication_failure_close".to_string(), AMQPFieldValue::Bool(true));
85
86        let mut client_properties = FieldTable::new();
87
88        client_properties.insert("product".into(), AMQPFieldValue::LongString("metalmq-client".into()));
89        client_properties.insert("platform".into(), AMQPFieldValue::LongString("Rust".into()));
90        client_properties.insert("capabilities".into(), AMQPFieldValue::FieldTable(Box::new(caps)));
91        // TODO get the version from the build vars or an external file
92        client_properties.insert("version".into(), AMQPFieldValue::LongString("0.1.0".into()));
93
94        let mut auth = vec![0x00];
95        auth.extend_from_slice(username.as_bytes());
96        auth.push(0x00);
97        auth.extend_from_slice(password.as_bytes());
98
99        let auth_string = String::from_utf8(auth).unwrap();
100
101        Self {
102            capabilities: None,
103            properties: Some(client_properties),
104            mechanism: "PLAIN".into(),
105            response: auth_string,
106            locale: "en_US".into(),
107        }
108    }
109
110    pub fn frame(self) -> AMQPFrame {
111        AMQPFrame::Method(0, super::CONNECTION_START_OK, MethodFrameArgs::ConnectionStartOk(self))
112    }
113}
114
115impl ConnectionOpenArgs {
116    pub fn virtual_host(mut self, virtual_host: &str) -> Self {
117        self.virtual_host = virtual_host.to_string();
118        self
119    }
120
121    pub fn frame(self) -> AMQPFrame {
122        AMQPFrame::Method(0, super::CONNECTION_OPEN, super::MethodFrameArgs::ConnectionOpen(self))
123    }
124}
125
/// Arguments carried by a `Connection.Close` method frame.
///
/// `Default` yields zero codes and an empty text.
#[derive(Debug, Default)]
pub struct ConnectionCloseArgs {
    /// Numeric close code.
    pub code: u16,
    /// Close text accompanying `code`.
    pub text: String,
    /// Class id; `connection_close()` takes it from `split_class_method`.
    pub class_id: u16,
    /// Method id; `connection_close()` takes it from `split_class_method`.
    pub method_id: u16,
}
133
134pub fn connection_tune() -> AMQPFrame {
135    AMQPFrame::Method(
136        0,
137        super::CONNECTION_TUNE,
138        MethodFrameArgs::ConnectionTune(ConnectionTuneArgs {
139            channel_max: 2047,
140            frame_max: 131_072,
141            heartbeat: 60,
142        }),
143    )
144}
145
146pub fn connection_tune_ok() -> AMQPFrame {
147    AMQPFrame::Method(
148        0,
149        super::CONNECTION_TUNE_OK,
150        MethodFrameArgs::ConnectionTuneOk(ConnectionTuneOkArgs {
151            channel_max: 2047,
152            frame_max: 131_072,
153            heartbeat: 60,
154        }),
155    )
156}
157
158pub fn connection_open_ok() -> AMQPFrame {
159    AMQPFrame::Method(0, super::CONNECTION_OPEN_OK, MethodFrameArgs::ConnectionOpenOk)
160}
161
162pub fn connection_close(code: u16, text: &str, class_method: u32) -> AMQPFrame {
163    let (class_id, method_id) = super::split_class_method(class_method);
164
165    AMQPFrame::Method(
166        0,
167        super::CONNECTION_CLOSE,
168        MethodFrameArgs::ConnectionClose(ConnectionCloseArgs {
169            code,
170            text: text.into(),
171            class_id,
172            method_id,
173        }),
174    )
175}
176
177pub fn connection_close_ok() -> AMQPFrame {
178    AMQPFrame::Method(0, super::CONNECTION_CLOSE_OK, MethodFrameArgs::ConnectionCloseOk)
179}