metalmq_client/
message.rs

1use std::collections::HashMap;
2
3use metalmq_codec::frame::{AMQPFieldValue, ContentBodyFrame, ContentHeaderFrame, HeaderPropertyFlags};
4
5use crate::ChannelNumber;
6
7/// A message sent to the server or received from the server.
8#[derive(Debug, Default)]
9pub struct Content {
10    pub channel: ChannelNumber,
11    pub body: Vec<u8>,
12    pub properties: MessageProperties,
13}
14
15#[derive(Debug, Default)]
16pub struct MessageProperties {
17    pub content_type: Option<String>,
18    pub content_encoding: Option<String>,
19    pub headers: HashMap<String, String>,
20    /// 1 - non-persistent, 2 - persistent
21    pub delivery_mode: Option<u8>,
22    pub priority: Option<u8>,
23    pub correlation_id: Option<String>,
24    pub reply_to: Option<String>,
25    pub expiration: Option<String>,
26    pub message_id: Option<String>,
27    pub timestamp: Option<u64>,
28    pub message_type: Option<String>,
29    pub user_id: Option<String>,
30    pub app_id: Option<String>,
31}
32
33/// A delivered message.
34///
35/// With the `consumer_tag` and `delivery_tag` a client can send back acknowledgements to the
36/// server, saying that the message was successfully arrived.
37#[derive(Debug, Default)]
38pub struct DeliveredMessage {
39    pub message: Content,
40    pub consumer_tag: String,
41    pub delivery_tag: u64,
42    pub redelivered: bool,
43    pub exchange: String,
44    pub routing_key: String,
45}
46
47/// A message returned to the client.
48#[derive(Debug, Default)]
49pub struct ReturnedMessage {
50    pub message: Content,
51    // TODO use enums here
52    pub reply_code: u16,
53    pub reply_text: String,
54    pub exchange: String,
55    pub routing_key: String,
56}
57
58/// A message published by the client.
59#[derive(Debug, Default)]
60pub struct PublishedMessage {
61    pub message: Content,
62    pub mandatory: bool,
63    pub immediate: bool,
64}
65
66/// Internally it is comfortable to handle delivered or returned message in the same variable.
67#[derive(Debug)]
68pub(crate) enum Message {
69    Delivered(DeliveredMessage),
70    Returned(ReturnedMessage),
71}
72
73pub(crate) fn to_content_frames(message: Content) -> (ContentHeaderFrame, ContentBodyFrame) {
74    let mut headers = HashMap::new();
75
76    for (k, v) in message.properties.headers {
77        headers.insert(k, AMQPFieldValue::LongString(v));
78    }
79
80    // FIXME set header property flags according to the values
81    let header = ContentHeaderFrame {
82        channel: message.channel,
83        class_id: 0,
84        weight: 0,
85        body_size: message.body.len() as u64,
86        prop_flags: HeaderPropertyFlags::default(),
87        cluster_id: None,
88        app_id: message.properties.app_id,
89        user_id: message.properties.user_id,
90        message_type: message.properties.message_type,
91        timestamp: message.properties.timestamp,
92        message_id: message.properties.message_id,
93        expiration: message.properties.expiration,
94        reply_to: message.properties.reply_to,
95        correlation_id: message.properties.correlation_id,
96        priority: message.properties.priority,
97        delivery_mode: message.properties.delivery_mode,
98        headers: Some(headers),
99        content_encoding: message.properties.content_encoding,
100        content_type: message.properties.content_type,
101    };
102
103    let body = ContentBodyFrame {
104        channel: message.channel,
105        body: message.body,
106    };
107
108    (header, body)
109}
110
111impl From<ContentHeaderFrame> for MessageProperties {
112    fn from(value: ContentHeaderFrame) -> Self {
113        MessageProperties::default()
114    }
115}
116
117impl From<&str> for PublishedMessage {
118    fn from(value: &str) -> Self {
119        Self {
120            message: Content {
121                channel: 0u16,
122                body: value.as_bytes().to_vec(),
123                properties: MessageProperties::default(),
124            },
125            ..Default::default()
126        }
127    }
128}
129
130impl PublishedMessage {
131    pub fn str(mut self, value: &str) -> Self {
132        self.message.body = value.as_bytes().to_vec();
133        self
134    }
135
136    /// Condition for mandatory publishing. Mandatory messages are failed if the exchange doesn't have
137    /// bound queue or if the routing keys are not matched.
138    pub fn mandatory(mut self, value: bool) -> Self {
139        self.mandatory = value;
140        self
141    }
142
143    /// Condition for immediate publishing. Immediate messages are received by a server successfully if
144    /// they managed to be sent to a consumer immediately.
145    pub fn immediate(mut self, value: bool) -> Self {
146        self.immediate = value;
147        self
148    }
149}