1use chrono::{DateTime, Utc};
23use serde::{Deserialize, Serialize};
24use std::borrow::Cow;
25use std::collections::HashMap;
26use uuid::Uuid;
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct MessageHeadersRef<'a> {
33 #[serde(borrow)]
35 pub task: Cow<'a, str>,
36
37 pub id: Uuid,
39
40 #[serde(borrow, default = "default_lang_cow")]
42 pub lang: Cow<'a, str>,
43
44 #[serde(skip_serializing_if = "Option::is_none")]
46 pub root_id: Option<Uuid>,
47
48 #[serde(skip_serializing_if = "Option::is_none")]
50 pub parent_id: Option<Uuid>,
51
52 #[serde(skip_serializing_if = "Option::is_none")]
54 pub group: Option<Uuid>,
55
56 #[serde(skip_serializing_if = "Option::is_none")]
58 pub retries: Option<u32>,
59
60 #[serde(skip_serializing_if = "Option::is_none")]
62 pub eta: Option<DateTime<Utc>>,
63
64 #[serde(skip_serializing_if = "Option::is_none")]
66 pub expires: Option<DateTime<Utc>>,
67
68 #[serde(flatten)]
70 pub extra: HashMap<Cow<'a, str>, serde_json::Value>,
71}
72
/// Serde default for [`MessageHeadersRef::lang`], also used by
/// [`MessageRef::new`].
///
/// Returns the static string `"rust"` as a borrowed [`Cow`], so no
/// allocation takes place.
fn default_lang_cow() -> Cow<'static, str> {
    Cow::Borrowed("rust")
}
76
77#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct MessagePropertiesRef<'a> {
80 #[serde(borrow, skip_serializing_if = "Option::is_none")]
82 pub correlation_id: Option<Cow<'a, str>>,
83
84 #[serde(borrow, skip_serializing_if = "Option::is_none")]
86 pub reply_to: Option<Cow<'a, str>>,
87
88 #[serde(default = "default_delivery_mode")]
90 pub delivery_mode: u8,
91
92 #[serde(skip_serializing_if = "Option::is_none")]
94 pub priority: Option<u8>,
95}
96
/// Serde default for [`MessagePropertiesRef::delivery_mode`]; also used by
/// that type's [`Default`] implementation.
// NOTE(review): 2 presumably means "persistent" in AMQP terms — confirm.
fn default_delivery_mode() -> u8 {
    2
}
100
101impl Default for MessagePropertiesRef<'_> {
102 fn default() -> Self {
103 Self {
104 correlation_id: None,
105 reply_to: None,
106 delivery_mode: default_delivery_mode(),
107 priority: None,
108 }
109 }
110}
111
112#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct MessageRef<'a> {
119 #[serde(borrow)]
121 pub headers: MessageHeadersRef<'a>,
122
123 #[serde(borrow)]
125 pub properties: MessagePropertiesRef<'a>,
126
127 #[serde(borrow)]
129 pub body: Cow<'a, [u8]>,
130
131 #[serde(borrow, rename = "content-type")]
133 pub content_type: Cow<'a, str>,
134
135 #[serde(borrow, rename = "content-encoding")]
137 pub content_encoding: Cow<'a, str>,
138}
139
140impl<'a> MessageRef<'a> {
141 pub fn new(task: &'a str, id: Uuid, body: &'a [u8]) -> Self {
143 Self {
144 headers: MessageHeadersRef {
145 task: Cow::Borrowed(task),
146 id,
147 lang: default_lang_cow(),
148 root_id: None,
149 parent_id: None,
150 group: None,
151 retries: None,
152 eta: None,
153 expires: None,
154 extra: HashMap::new(),
155 },
156 properties: MessagePropertiesRef::default(),
157 body: Cow::Borrowed(body),
158 content_type: Cow::Borrowed("application/json"),
159 content_encoding: Cow::Borrowed("utf-8"),
160 }
161 }
162
163 pub fn task_id(&self) -> Uuid {
165 self.headers.id
166 }
167
168 pub fn task_name(&self) -> &str {
170 &self.headers.task
171 }
172
173 pub fn body_slice(&self) -> &[u8] {
175 &self.body
176 }
177
178 pub fn has_eta(&self) -> bool {
180 self.headers.eta.is_some()
181 }
182
183 pub fn has_expires(&self) -> bool {
185 self.headers.expires.is_some()
186 }
187
188 pub fn has_parent(&self) -> bool {
190 self.headers.parent_id.is_some()
191 }
192
193 pub fn has_root(&self) -> bool {
195 self.headers.root_id.is_some()
196 }
197
198 pub fn has_group(&self) -> bool {
200 self.headers.group.is_some()
201 }
202
203 pub fn into_owned(self) -> crate::Message {
205 crate::Message {
206 headers: crate::MessageHeaders {
207 task: self.headers.task.into_owned(),
208 id: self.headers.id,
209 lang: self.headers.lang.into_owned(),
210 root_id: self.headers.root_id,
211 parent_id: self.headers.parent_id,
212 group: self.headers.group,
213 retries: self.headers.retries,
214 eta: self.headers.eta,
215 expires: self.headers.expires,
216 extra: self
217 .headers
218 .extra
219 .into_iter()
220 .map(|(k, v)| (k.into_owned(), v))
221 .collect(),
222 },
223 properties: crate::MessageProperties {
224 correlation_id: self.properties.correlation_id.map(|s| s.into_owned()),
225 reply_to: self.properties.reply_to.map(|s| s.into_owned()),
226 delivery_mode: self.properties.delivery_mode,
227 priority: self.properties.priority,
228 },
229 body: self.body.into_owned(),
230 content_type: self.content_type.into_owned(),
231 content_encoding: self.content_encoding.into_owned(),
232 }
233 }
234}
235
236#[derive(Debug, Clone, Serialize, Deserialize)]
240pub struct TaskArgsRef<'a> {
241 #[serde(default)]
243 pub args: Vec<serde_json::Value>,
244
245 #[serde(borrow, default)]
247 pub kwargs: HashMap<Cow<'a, str>, serde_json::Value>,
248}
249
250impl<'a> TaskArgsRef<'a> {
251 pub fn new() -> Self {
253 Self {
254 args: Vec::new(),
255 kwargs: HashMap::new(),
256 }
257 }
258
259 pub fn into_owned(self) -> crate::TaskArgs {
261 crate::TaskArgs {
262 args: self.args,
263 kwargs: self
264 .kwargs
265 .into_iter()
266 .map(|(k, v)| (k.into_owned(), v))
267 .collect(),
268 }
269 }
270}
271
272impl<'a> Default for TaskArgsRef<'a> {
273 fn default() -> Self {
274 Self::new()
275 }
276}
277
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` stores the task name, id and body it was given.
    #[test]
    fn test_message_ref_new() {
        let task_id = Uuid::new_v4();
        let body = b"test body";
        let msg = MessageRef::new("tasks.test", task_id, body);

        assert_eq!(msg.task_name(), "tasks.test");
        assert_eq!(msg.task_id(), task_id);
        assert_eq!(msg.body_slice(), body);
    }

    /// The `has_*` predicates follow the corresponding optional headers.
    #[test]
    fn test_message_ref_predicates() {
        let task_id = Uuid::new_v4();
        let body = b"{}";
        let mut msg = MessageRef::new("tasks.test", task_id, body);

        assert!(!msg.has_eta());
        assert!(!msg.has_expires());
        assert!(!msg.has_parent());
        assert!(!msg.has_root());
        assert!(!msg.has_group());

        msg.headers.eta = Some(Utc::now());
        msg.headers.parent_id = Some(Uuid::new_v4());

        assert!(msg.has_eta());
        assert!(msg.has_parent());
    }

    /// `into_owned` carries the task name, id and body over unchanged.
    #[test]
    fn test_message_ref_to_owned() {
        let task_id = Uuid::new_v4();
        let body = b"test";
        let msg_ref = MessageRef::new("tasks.test", task_id, body);

        let msg = msg_ref.into_owned();

        assert_eq!(msg.headers.task, "tasks.test");
        assert_eq!(msg.headers.id, task_id);
        assert_eq!(msg.body, body);
    }

    /// A freshly created argument set is empty.
    #[test]
    fn test_task_args_ref() {
        let args = TaskArgsRef::new();
        assert_eq!(args.args.len(), 0);
        assert_eq!(args.kwargs.len(), 0);
    }

    /// `into_owned` keeps keyword arguments reachable under an owned key.
    #[test]
    fn test_task_args_ref_to_owned() {
        let mut args_ref = TaskArgsRef::new();
        args_ref
            .kwargs
            .insert(Cow::Borrowed("key"), serde_json::json!("value"));

        let args = args_ref.into_owned();
        assert_eq!(args.kwargs.get("key").unwrap(), "value");
    }

    /// Deserializing from a `&str` yields the expected values and borrows
    /// directly-annotated string fields from the input instead of copying.
    #[test]
    fn test_zero_copy_deserialization() {
        let json = r#"{"headers":{"task":"tasks.add","id":"550e8400-e29b-41d4-a716-446655440000","lang":"rust"},"properties":{"delivery_mode":2},"body":"dGVzdA==","content-type":"application/json","content-encoding":"utf-8"}"#;

        let msg: MessageRef = serde_json::from_str(json).unwrap();
        assert_eq!(msg.task_name(), "tasks.add");
        assert_eq!(msg.content_type, "application/json");

        // The point of this test: escape-free strings must come back as
        // borrows of `json`, not as fresh allocations.
        assert!(matches!(msg.headers.task, Cow::Borrowed(_)));
        assert!(matches!(msg.content_type, Cow::Borrowed(_)));
    }
}