1use super::{AMQPFieldValue, AMQPFrame, FieldTable, MethodFrameArgs};
2
3#[derive(Debug, Default)]
4pub struct ConnectionStartArgs {
5 pub version_major: u8,
6 pub version_minor: u8,
7 pub capabilities: Option<FieldTable>,
8 pub properties: Option<FieldTable>,
9 pub mechanisms: String,
10 pub locales: String,
11}
12
13#[derive(Debug, Default)]
14pub struct ConnectionStartOkArgs {
15 pub capabilities: Option<FieldTable>,
16 pub properties: Option<FieldTable>,
17 pub mechanism: String,
18 pub response: String,
19 pub locale: String,
20}
21
22#[derive(Debug, Default)]
23pub struct ConnectionTuneArgs {
24 pub channel_max: u16,
25 pub frame_max: u32,
26 pub heartbeat: u16,
27}
28
29#[derive(Debug, Default)]
30pub struct ConnectionTuneOkArgs {
31 pub channel_max: u16,
32 pub frame_max: u32,
33 pub heartbeat: u16,
34}
35
36#[derive(Debug, Default)]
37pub struct ConnectionOpenArgs {
38 pub virtual_host: String,
39 pub insist: bool,
40}
41
42impl ConnectionStartArgs {
43 pub fn new() -> Self {
44 let mut capabilities = FieldTable::new();
45
46 capabilities.insert("publisher_confirms".into(), AMQPFieldValue::Bool(true));
47 capabilities.insert("exchange_exchange_bindings".into(), AMQPFieldValue::Bool(true));
48 capabilities.insert("basic.nack".into(), AMQPFieldValue::Bool(true));
49 capabilities.insert("consumer_cancel_notify".into(), AMQPFieldValue::Bool(true));
50 capabilities.insert("connection.blocked".into(), AMQPFieldValue::Bool(true));
51 capabilities.insert("consumer_priorities".into(), AMQPFieldValue::Bool(true));
52 capabilities.insert("authentication_failure_close".into(), AMQPFieldValue::Bool(true));
53 capabilities.insert("per_consumer_qos".into(), AMQPFieldValue::Bool(true));
54 capabilities.insert("direct_reply_to".into(), AMQPFieldValue::Bool(true));
55
56 let mut server_properties = FieldTable::new();
57
58 server_properties.insert(
59 "capabilities".into(),
60 AMQPFieldValue::FieldTable(Box::new(capabilities)),
61 );
62 server_properties.insert("product".into(), AMQPFieldValue::LongString("MetalMQ server".into()));
63 server_properties.insert("version".into(), AMQPFieldValue::LongString("0.1.0".into()));
64
65 Self {
66 version_major: 0,
67 version_minor: 9,
68 capabilities: None,
69 properties: Some(server_properties),
70 mechanisms: "PLAIN".into(),
71 locales: "en_US".into(),
72 }
73 }
74
75 pub fn frame(self) -> AMQPFrame {
76 AMQPFrame::Method(0, super::CONNECTION_START, MethodFrameArgs::ConnectionStart(self))
77 }
78}
79
80impl ConnectionStartOkArgs {
81 pub fn new(username: &str, password: &str) -> Self {
82 let mut caps = FieldTable::new();
83
84 caps.insert("authentication_failure_close".to_string(), AMQPFieldValue::Bool(true));
85
86 let mut client_properties = FieldTable::new();
87
88 client_properties.insert("product".into(), AMQPFieldValue::LongString("metalmq-client".into()));
89 client_properties.insert("platform".into(), AMQPFieldValue::LongString("Rust".into()));
90 client_properties.insert("capabilities".into(), AMQPFieldValue::FieldTable(Box::new(caps)));
91 client_properties.insert("version".into(), AMQPFieldValue::LongString("0.1.0".into()));
93
94 let mut auth = vec![0x00];
95 auth.extend_from_slice(username.as_bytes());
96 auth.push(0x00);
97 auth.extend_from_slice(password.as_bytes());
98
99 let auth_string = String::from_utf8(auth).unwrap();
100
101 Self {
102 capabilities: None,
103 properties: Some(client_properties),
104 mechanism: "PLAIN".into(),
105 response: auth_string,
106 locale: "en_US".into(),
107 }
108 }
109
110 pub fn frame(self) -> AMQPFrame {
111 AMQPFrame::Method(0, super::CONNECTION_START_OK, MethodFrameArgs::ConnectionStartOk(self))
112 }
113}
114
115impl ConnectionOpenArgs {
116 pub fn virtual_host(mut self, virtual_host: &str) -> Self {
117 self.virtual_host = virtual_host.to_string();
118 self
119 }
120
121 pub fn frame(self) -> AMQPFrame {
122 AMQPFrame::Method(0, super::CONNECTION_OPEN, super::MethodFrameArgs::ConnectionOpen(self))
123 }
124}
125
126#[derive(Debug, Default)]
127pub struct ConnectionCloseArgs {
128 pub code: u16,
129 pub text: String,
130 pub class_id: u16,
131 pub method_id: u16,
132}
133
134pub fn connection_tune() -> AMQPFrame {
135 AMQPFrame::Method(
136 0,
137 super::CONNECTION_TUNE,
138 MethodFrameArgs::ConnectionTune(ConnectionTuneArgs {
139 channel_max: 2047,
140 frame_max: 131_072,
141 heartbeat: 60,
142 }),
143 )
144}
145
146pub fn connection_tune_ok() -> AMQPFrame {
147 AMQPFrame::Method(
148 0,
149 super::CONNECTION_TUNE_OK,
150 MethodFrameArgs::ConnectionTuneOk(ConnectionTuneOkArgs {
151 channel_max: 2047,
152 frame_max: 131_072,
153 heartbeat: 60,
154 }),
155 )
156}
157
158pub fn connection_open_ok() -> AMQPFrame {
159 AMQPFrame::Method(0, super::CONNECTION_OPEN_OK, MethodFrameArgs::ConnectionOpenOk)
160}
161
162pub fn connection_close(code: u16, text: &str, class_method: u32) -> AMQPFrame {
163 let (class_id, method_id) = super::split_class_method(class_method);
164
165 AMQPFrame::Method(
166 0,
167 super::CONNECTION_CLOSE,
168 MethodFrameArgs::ConnectionClose(ConnectionCloseArgs {
169 code,
170 text: text.into(),
171 class_id,
172 method_id,
173 }),
174 )
175}
176
177pub fn connection_close_ok() -> AMQPFrame {
178 AMQPFrame::Method(0, super::CONNECTION_CLOSE_OK, MethodFrameArgs::ConnectionCloseOk)
179}