metalmq_client/
message.rs1use std::collections::HashMap;
2
3use metalmq_codec::frame::{AMQPFieldValue, ContentBodyFrame, ContentHeaderFrame, HeaderPropertyFlags};
4
5use crate::ChannelNumber;
6
7#[derive(Debug, Default)]
9pub struct Content {
10 pub channel: ChannelNumber,
11 pub body: Vec<u8>,
12 pub properties: MessageProperties,
13}
14
15#[derive(Debug, Default)]
16pub struct MessageProperties {
17 pub content_type: Option<String>,
18 pub content_encoding: Option<String>,
19 pub headers: HashMap<String, String>,
20 pub delivery_mode: Option<u8>,
22 pub priority: Option<u8>,
23 pub correlation_id: Option<String>,
24 pub reply_to: Option<String>,
25 pub expiration: Option<String>,
26 pub message_id: Option<String>,
27 pub timestamp: Option<u64>,
28 pub message_type: Option<String>,
29 pub user_id: Option<String>,
30 pub app_id: Option<String>,
31}
32
33#[derive(Debug, Default)]
38pub struct DeliveredMessage {
39 pub message: Content,
40 pub consumer_tag: String,
41 pub delivery_tag: u64,
42 pub redelivered: bool,
43 pub exchange: String,
44 pub routing_key: String,
45}
46
47#[derive(Debug, Default)]
49pub struct ReturnedMessage {
50 pub message: Content,
51 pub reply_code: u16,
53 pub reply_text: String,
54 pub exchange: String,
55 pub routing_key: String,
56}
57
58#[derive(Debug, Default)]
60pub struct PublishedMessage {
61 pub message: Content,
62 pub mandatory: bool,
63 pub immediate: bool,
64}
65
66#[derive(Debug)]
68pub(crate) enum Message {
69 Delivered(DeliveredMessage),
70 Returned(ReturnedMessage),
71}
72
73pub(crate) fn to_content_frames(message: Content) -> (ContentHeaderFrame, ContentBodyFrame) {
74 let mut headers = HashMap::new();
75
76 for (k, v) in message.properties.headers {
77 headers.insert(k, AMQPFieldValue::LongString(v));
78 }
79
80 let header = ContentHeaderFrame {
82 channel: message.channel,
83 class_id: 0,
84 weight: 0,
85 body_size: message.body.len() as u64,
86 prop_flags: HeaderPropertyFlags::default(),
87 cluster_id: None,
88 app_id: message.properties.app_id,
89 user_id: message.properties.user_id,
90 message_type: message.properties.message_type,
91 timestamp: message.properties.timestamp,
92 message_id: message.properties.message_id,
93 expiration: message.properties.expiration,
94 reply_to: message.properties.reply_to,
95 correlation_id: message.properties.correlation_id,
96 priority: message.properties.priority,
97 delivery_mode: message.properties.delivery_mode,
98 headers: Some(headers),
99 content_encoding: message.properties.content_encoding,
100 content_type: message.properties.content_type,
101 };
102
103 let body = ContentBodyFrame {
104 channel: message.channel,
105 body: message.body,
106 };
107
108 (header, body)
109}
110
111impl From<ContentHeaderFrame> for MessageProperties {
112 fn from(value: ContentHeaderFrame) -> Self {
113 MessageProperties::default()
114 }
115}
116
117impl From<&str> for PublishedMessage {
118 fn from(value: &str) -> Self {
119 Self {
120 message: Content {
121 channel: 0u16,
122 body: value.as_bytes().to_vec(),
123 properties: MessageProperties::default(),
124 },
125 ..Default::default()
126 }
127 }
128}
129
130impl PublishedMessage {
131 pub fn str(mut self, value: &str) -> Self {
132 self.message.body = value.as_bytes().to_vec();
133 self
134 }
135
136 pub fn mandatory(mut self, value: bool) -> Self {
139 self.mandatory = value;
140 self
141 }
142
143 pub fn immediate(mut self, value: bool) -> Self {
146 self.immediate = value;
147 self
148 }
149}