Skip to main content

foxtive_worker/
message_properties.rs

1use serde::{Deserialize, Serialize};
2
3/// Message properties containing backend-specific metadata.
4///
5/// This struct provides a standardized way to capture message properties
6/// from different backends (RabbitMQ, Redis Streams, etc.) for use in
7/// microservices architectures where you need to track message origin,
8/// routing, and other metadata.
9#[derive(Debug, Clone, Serialize, Deserialize, Default)]
10pub struct MessageProperties {
11    /// Content type of the message payload (e.g., "application/json")
12    #[serde(skip_serializing_if = "Option::is_none")]
13    pub content_type: Option<String>,
14
15    /// Content encoding (e.g., "utf-8", "gzip")
16    #[serde(skip_serializing_if = "Option::is_none")]
17    pub content_encoding: Option<String>,
18
19    /// Message priority (if supported by backend)
20    #[serde(skip_serializing_if = "Option::is_none")]
21    pub priority: Option<u8>,
22
23    /// Time-to-live or expiration duration in milliseconds
24    #[serde(skip_serializing_if = "Option::is_none")]
25    pub expiration: Option<u64>,
26
27    /// Message type identifier for routing/dispatching
28    #[serde(skip_serializing_if = "Option::is_none")]
29    pub message_type: Option<String>,
30
31    /// User ID associated with the message
32    #[serde(skip_serializing_if = "Option::is_none")]
33    pub user_id: Option<String>,
34
35    /// Application ID that sent the message
36    #[serde(skip_serializing_if = "Option::is_none")]
37    pub app_id: Option<String>,
38
39    /// Cluster ID (for federated messaging systems)
40    #[serde(skip_serializing_if = "Option::is_none")]
41    pub cluster_id: Option<String>,
42
43    /// Reply-to address for response messages
44    #[serde(skip_serializing_if = "Option::is_none")]
45    pub reply_to: Option<String>,
46
47    /// Custom headers/properties specific to the backend
48    /// Key-value pairs for additional metadata
49    #[serde(skip_serializing_if = "Option::is_none")]
50    pub headers: Option<std::collections::HashMap<String, String>>,
51}
52
53impl MessageProperties {
54    /// Create empty message properties
55    pub fn new() -> Self {
56        Self::default()
57    }
58
59    /// Set content type
60    pub fn with_content_type(mut self, content_type: impl Into<String>) -> Self {
61        self.content_type = Some(content_type.into());
62        self
63    }
64
65    /// Set content encoding
66    pub fn with_content_encoding(mut self, encoding: impl Into<String>) -> Self {
67        self.content_encoding = Some(encoding.into());
68        self
69    }
70
71    /// Set message priority
72    pub fn with_priority(mut self, priority: u8) -> Self {
73        self.priority = Some(priority);
74        self
75    }
76
77    /// Set expiration/TTL in milliseconds
78    pub fn with_expiration(mut self, expiration_ms: u64) -> Self {
79        self.expiration = Some(expiration_ms);
80        self
81    }
82
83    /// Set message type
84    pub fn with_message_type(mut self, message_type: impl Into<String>) -> Self {
85        self.message_type = Some(message_type.into());
86        self
87    }
88
89    /// Set user ID
90    pub fn with_user_id(mut self, user_id: impl Into<String>) -> Self {
91        self.user_id = Some(user_id.into());
92        self
93    }
94
95    /// Set application ID
96    pub fn with_app_id(mut self, app_id: impl Into<String>) -> Self {
97        self.app_id = Some(app_id.into());
98        self
99    }
100
101    /// Set cluster ID
102    pub fn with_cluster_id(mut self, cluster_id: impl Into<String>) -> Self {
103        self.cluster_id = Some(cluster_id.into());
104        self
105    }
106
107    /// Set reply-to address
108    pub fn with_reply_to(mut self, reply_to: impl Into<String>) -> Self {
109        self.reply_to = Some(reply_to.into());
110        self
111    }
112
113    /// Add a custom header
114    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
115        if self.headers.is_none() {
116            self.headers = Some(std::collections::HashMap::new());
117        }
118        if let Some(headers) = &mut self.headers {
119            headers.insert(key.into(), value.into());
120        }
121        self
122    }
123}
124
125#[cfg(test)]
126mod tests {
127    use super::*;
128    use crate::MessageMetadata;
129
130    #[tokio::test]
131    async fn test_message_properties_builder() {
132        let props = MessageProperties::new()
133            .with_content_type("application/json")
134            .with_content_encoding("utf-8")
135            .with_priority(5)
136            .with_message_type("user.created")
137            .with_app_id("user-service")
138            .with_header("service_name", "user-service")
139            .with_header("correlation_id", "abc-123");
140
141        assert_eq!(props.content_type, Some("application/json".to_string()));
142        assert_eq!(props.content_encoding, Some("utf-8".to_string()));
143        assert_eq!(props.priority, Some(5));
144        assert_eq!(props.message_type, Some("user.created".to_string()));
145        assert_eq!(props.app_id, Some("user-service".to_string()));
146
147        let headers = props.headers.unwrap();
148        assert_eq!(
149            headers.get("service_name"),
150            Some(&"user-service".to_string())
151        );
152        assert_eq!(headers.get("correlation_id"), Some(&"abc-123".to_string()));
153    }
154
155    #[tokio::test]
156    async fn test_metadata_with_properties() {
157        let props = MessageProperties::new()
158            .with_content_type("application/json")
159            .with_app_id("test-service");
160
161        let metadata = MessageMetadata::new("test-queue")
162            .with_correlation_id("corr-456")
163            .with_properties(props);
164
165        assert_eq!(metadata.correlation_id, Some("corr-456".to_string()));
166        assert!(metadata.properties.is_some());
167
168        let props = metadata.properties.unwrap();
169        assert_eq!(props.content_type, Some("application/json".to_string()));
170        assert_eq!(props.app_id, Some("test-service".to_string()));
171    }
172
173    #[tokio::test]
174    async fn test_empty_properties() {
175        let props = MessageProperties::new();
176        assert!(props.content_type.is_none());
177        assert!(props.content_encoding.is_none());
178        assert!(props.priority.is_none());
179        assert!(props.expiration.is_none());
180        assert!(props.message_type.is_none());
181        assert!(props.user_id.is_none());
182        assert!(props.app_id.is_none());
183        assert!(props.cluster_id.is_none());
184        assert!(props.reply_to.is_none());
185        assert!(props.headers.is_none());
186    }
187
188    #[tokio::test]
189    async fn test_multiple_headers() {
190        let props = MessageProperties::new()
191            .with_header("key1", "value1")
192            .with_header("key2", "value2")
193            .with_header("key3", "value3");
194
195        let headers = props.headers.unwrap();
196        assert_eq!(headers.len(), 3);
197        assert_eq!(headers.get("key1"), Some(&"value1".to_string()));
198        assert_eq!(headers.get("key2"), Some(&"value2".to_string()));
199        assert_eq!(headers.get("key3"), Some(&"value3".to_string()));
200    }
201
202    #[tokio::test]
203    async fn test_header_overwrite() {
204        let props = MessageProperties::new()
205            .with_header("key", "value1")
206            .with_header("key", "value2");
207
208        let headers = props.headers.unwrap();
209        assert_eq!(headers.len(), 1);
210        assert_eq!(headers.get("key"), Some(&"value2".to_string()));
211    }
212
213    #[tokio::test]
214    async fn test_all_standard_fields() {
215        let props = MessageProperties::new()
216            .with_content_type("text/plain")
217            .with_content_encoding("gzip")
218            .with_priority(10)
219            .with_expiration(60000)
220            .with_message_type("order.created")
221            .with_user_id("user-123")
222            .with_app_id("order-service")
223            .with_cluster_id("cluster-east")
224            .with_reply_to("reply-queue");
225
226        assert_eq!(props.content_type, Some("text/plain".to_string()));
227        assert_eq!(props.content_encoding, Some("gzip".to_string()));
228        assert_eq!(props.priority, Some(10));
229        assert_eq!(props.expiration, Some(60000));
230        assert_eq!(props.message_type, Some("order.created".to_string()));
231        assert_eq!(props.user_id, Some("user-123".to_string()));
232        assert_eq!(props.app_id, Some("order-service".to_string()));
233        assert_eq!(props.cluster_id, Some("cluster-east".to_string()));
234        assert_eq!(props.reply_to, Some("reply-queue".to_string()));
235    }
236
237    #[tokio::test]
238    async fn test_serialization_deserialization() {
239        let props = MessageProperties::new()
240            .with_content_type("application/json")
241            .with_app_id("test-service")
242            .with_header("correlation_id", "abc-123");
243
244        // Serialize to JSON
245        let json = serde_json::to_string(&props).unwrap();
246
247        // Deserialize back
248        let deserialized: MessageProperties = serde_json::from_str(&json).unwrap();
249
250        assert_eq!(deserialized.content_type, props.content_type);
251        assert_eq!(deserialized.app_id, props.app_id);
252        assert_eq!(deserialized.headers, props.headers);
253    }
254
255    #[tokio::test]
256    async fn test_serialization_skips_none_fields() {
257        let props = MessageProperties::new().with_app_id("test");
258        let json = serde_json::to_string(&props).unwrap();
259
260        // Verify that None fields are not in JSON
261        assert!(json.contains("app_id"));
262        assert!(!json.contains("content_type"));
263        assert!(!json.contains("priority"));
264    }
265
266    #[tokio::test]
267    async fn test_clone_properties() {
268        let props = MessageProperties::new()
269            .with_content_type("application/json")
270            .with_header("key", "value");
271
272        let cloned = props.clone();
273        assert_eq!(cloned.content_type, props.content_type);
274        assert_eq!(cloned.headers, props.headers);
275    }
276
277    #[tokio::test]
278    async fn test_priority_bounds() {
279        // Test minimum priority
280        let props_min = MessageProperties::new().with_priority(0);
281        assert_eq!(props_min.priority, Some(0));
282
283        // Test maximum priority (u8 max)
284        let props_max = MessageProperties::new().with_priority(255);
285        assert_eq!(props_max.priority, Some(255));
286    }
287
288    #[tokio::test]
289    async fn test_expiration_values() {
290        // Test zero expiration
291        let props_zero = MessageProperties::new().with_expiration(0);
292        assert_eq!(props_zero.expiration, Some(0));
293
294        // Test large expiration (24 hours in ms)
295        let props_large = MessageProperties::new().with_expiration(86400000);
296        assert_eq!(props_large.expiration, Some(86400000));
297    }
298
299    #[tokio::test]
300    async fn test_unicode_in_headers() {
301        let props = MessageProperties::new()
302            .with_header("unicode_key", "日本語")
303            .with_header("emoji", "🚀");
304
305        let headers = props.headers.unwrap();
306        assert_eq!(headers.get("unicode_key"), Some(&"日本語".to_string()));
307        assert_eq!(headers.get("emoji"), Some(&"🚀".to_string()));
308    }
309
310    #[tokio::test]
311    async fn test_empty_strings() {
312        let props = MessageProperties::new()
313            .with_content_type("")
314            .with_app_id("")
315            .with_header("empty_key", "");
316
317        assert_eq!(props.content_type, Some("".to_string()));
318        assert_eq!(props.app_id, Some("".to_string()));
319
320        let headers = props.headers.unwrap();
321        assert_eq!(headers.get("empty_key"), Some(&"".to_string()));
322    }
323
324    #[tokio::test]
325    async fn test_chaining_order_independence() {
326        // Builder should work regardless of chaining order
327        let props1 = MessageProperties::new()
328            .with_content_type("json")
329            .with_app_id("app1");
330
331        let props2 = MessageProperties::new()
332            .with_app_id("app1")
333            .with_content_type("json");
334
335        assert_eq!(props1.content_type, props2.content_type);
336        assert_eq!(props1.app_id, props2.app_id);
337    }
338}