celers_protocol/
zerocopy.rs

1//! Zero-copy deserialization for performance optimization
2//!
3//! This module provides zero-copy deserialization capabilities using `Cow` (Copy-on-Write)
4//! to avoid unnecessary data copying during deserialization. This is particularly useful
5//! for large message bodies where avoiding copies can significantly improve performance.
6//!
7//! # Examples
8//!
9//! ```
10//! use celers_protocol::zerocopy::{MessageRef, TaskArgsRef};
11//! use uuid::Uuid;
12//!
13//! // Create a zero-copy message reference
14//! let task_id = Uuid::new_v4();
15//! let body = b"{\"args\":[1,2],\"kwargs\":{}}";
16//! let msg = MessageRef::new("tasks.add", task_id, body);
17//!
18//! assert_eq!(msg.task_name(), "tasks.add");
19//! assert_eq!(msg.task_id(), task_id);
20//! ```
21
22use chrono::{DateTime, Utc};
23use serde::{Deserialize, Serialize};
24use std::borrow::Cow;
25use std::collections::HashMap;
26use uuid::Uuid;
27
28/// Zero-copy message headers
29///
30/// Uses `Cow<'a, str>` to avoid unnecessary string allocations when deserializing.
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct MessageHeadersRef<'a> {
33    /// Task name (zero-copy reference)
34    #[serde(borrow)]
35    pub task: Cow<'a, str>,
36
37    /// Task ID
38    pub id: Uuid,
39
40    /// Programming language
41    #[serde(borrow, default = "default_lang_cow")]
42    pub lang: Cow<'a, str>,
43
44    /// Root task ID
45    #[serde(skip_serializing_if = "Option::is_none")]
46    pub root_id: Option<Uuid>,
47
48    /// Parent task ID
49    #[serde(skip_serializing_if = "Option::is_none")]
50    pub parent_id: Option<Uuid>,
51
52    /// Group ID
53    #[serde(skip_serializing_if = "Option::is_none")]
54    pub group: Option<Uuid>,
55
56    /// Maximum retries
57    #[serde(skip_serializing_if = "Option::is_none")]
58    pub retries: Option<u32>,
59
60    /// ETA for delayed tasks
61    #[serde(skip_serializing_if = "Option::is_none")]
62    pub eta: Option<DateTime<Utc>>,
63
64    /// Task expiration timestamp
65    #[serde(skip_serializing_if = "Option::is_none")]
66    pub expires: Option<DateTime<Utc>>,
67
68    /// Additional custom headers
69    #[serde(flatten)]
70    pub extra: HashMap<Cow<'a, str>, serde_json::Value>,
71}
72
73fn default_lang_cow() -> Cow<'static, str> {
74    Cow::Borrowed("rust")
75}
76
77/// Zero-copy message properties
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct MessagePropertiesRef<'a> {
80    /// Correlation ID
81    #[serde(borrow, skip_serializing_if = "Option::is_none")]
82    pub correlation_id: Option<Cow<'a, str>>,
83
84    /// Reply-to queue
85    #[serde(borrow, skip_serializing_if = "Option::is_none")]
86    pub reply_to: Option<Cow<'a, str>>,
87
88    /// Delivery mode (1 = non-persistent, 2 = persistent)
89    #[serde(default = "default_delivery_mode")]
90    pub delivery_mode: u8,
91
92    /// Priority (0-9)
93    #[serde(skip_serializing_if = "Option::is_none")]
94    pub priority: Option<u8>,
95}
96
97fn default_delivery_mode() -> u8 {
98    2
99}
100
101impl Default for MessagePropertiesRef<'_> {
102    fn default() -> Self {
103        Self {
104            correlation_id: None,
105            reply_to: None,
106            delivery_mode: default_delivery_mode(),
107            priority: None,
108        }
109    }
110}
111
112/// Zero-copy Celery message
113///
114/// Uses borrowed data where possible to avoid unnecessary allocations.
115/// This is particularly useful when deserializing messages from a buffer
116/// that will remain valid for the lifetime of the message.
117#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct MessageRef<'a> {
119    /// Message headers (zero-copy)
120    #[serde(borrow)]
121    pub headers: MessageHeadersRef<'a>,
122
123    /// Message properties (zero-copy)
124    #[serde(borrow)]
125    pub properties: MessagePropertiesRef<'a>,
126
127    /// Message body (zero-copy reference to raw bytes)
128    #[serde(borrow)]
129    pub body: Cow<'a, [u8]>,
130
131    /// Content type (zero-copy)
132    #[serde(borrow, rename = "content-type")]
133    pub content_type: Cow<'a, str>,
134
135    /// Content encoding (zero-copy)
136    #[serde(borrow, rename = "content-encoding")]
137    pub content_encoding: Cow<'a, str>,
138}
139
140impl<'a> MessageRef<'a> {
141    /// Create a new zero-copy message reference
142    pub fn new(task: &'a str, id: Uuid, body: &'a [u8]) -> Self {
143        Self {
144            headers: MessageHeadersRef {
145                task: Cow::Borrowed(task),
146                id,
147                lang: default_lang_cow(),
148                root_id: None,
149                parent_id: None,
150                group: None,
151                retries: None,
152                eta: None,
153                expires: None,
154                extra: HashMap::new(),
155            },
156            properties: MessagePropertiesRef::default(),
157            body: Cow::Borrowed(body),
158            content_type: Cow::Borrowed("application/json"),
159            content_encoding: Cow::Borrowed("utf-8"),
160        }
161    }
162
163    /// Get the task ID
164    pub fn task_id(&self) -> Uuid {
165        self.headers.id
166    }
167
168    /// Get the task name
169    pub fn task_name(&self) -> &str {
170        &self.headers.task
171    }
172
173    /// Get body as slice
174    pub fn body_slice(&self) -> &[u8] {
175        &self.body
176    }
177
178    /// Check if message has ETA
179    pub fn has_eta(&self) -> bool {
180        self.headers.eta.is_some()
181    }
182
183    /// Check if message has expiration
184    pub fn has_expires(&self) -> bool {
185        self.headers.expires.is_some()
186    }
187
188    /// Check if message has parent
189    pub fn has_parent(&self) -> bool {
190        self.headers.parent_id.is_some()
191    }
192
193    /// Check if message has root
194    pub fn has_root(&self) -> bool {
195        self.headers.root_id.is_some()
196    }
197
198    /// Check if message has group
199    pub fn has_group(&self) -> bool {
200        self.headers.group.is_some()
201    }
202
203    /// Convert to owned message
204    pub fn into_owned(self) -> crate::Message {
205        crate::Message {
206            headers: crate::MessageHeaders {
207                task: self.headers.task.into_owned(),
208                id: self.headers.id,
209                lang: self.headers.lang.into_owned(),
210                root_id: self.headers.root_id,
211                parent_id: self.headers.parent_id,
212                group: self.headers.group,
213                retries: self.headers.retries,
214                eta: self.headers.eta,
215                expires: self.headers.expires,
216                extra: self
217                    .headers
218                    .extra
219                    .into_iter()
220                    .map(|(k, v)| (k.into_owned(), v))
221                    .collect(),
222            },
223            properties: crate::MessageProperties {
224                correlation_id: self.properties.correlation_id.map(|s| s.into_owned()),
225                reply_to: self.properties.reply_to.map(|s| s.into_owned()),
226                delivery_mode: self.properties.delivery_mode,
227                priority: self.properties.priority,
228            },
229            body: self.body.into_owned(),
230            content_type: self.content_type.into_owned(),
231            content_encoding: self.content_encoding.into_owned(),
232        }
233    }
234}
235
236/// Zero-copy task arguments
237///
238/// Provides zero-copy access to task arguments when deserializing.
239#[derive(Debug, Clone, Serialize, Deserialize)]
240pub struct TaskArgsRef<'a> {
241    /// Positional arguments
242    #[serde(default)]
243    pub args: Vec<serde_json::Value>,
244
245    /// Keyword arguments (zero-copy keys)
246    #[serde(borrow, default)]
247    pub kwargs: HashMap<Cow<'a, str>, serde_json::Value>,
248}
249
250impl<'a> TaskArgsRef<'a> {
251    /// Create new task arguments reference
252    pub fn new() -> Self {
253        Self {
254            args: Vec::new(),
255            kwargs: HashMap::new(),
256        }
257    }
258
259    /// Convert to owned task arguments
260    pub fn into_owned(self) -> crate::TaskArgs {
261        crate::TaskArgs {
262            args: self.args,
263            kwargs: self
264                .kwargs
265                .into_iter()
266                .map(|(k, v)| (k.into_owned(), v))
267                .collect(),
268        }
269    }
270}
271
272impl<'a> Default for TaskArgsRef<'a> {
273    fn default() -> Self {
274        Self::new()
275    }
276}
277
278#[cfg(test)]
279mod tests {
280    use super::*;
281
282    #[test]
283    fn test_message_ref_new() {
284        let task_id = Uuid::new_v4();
285        let body = b"test body";
286        let msg = MessageRef::new("tasks.test", task_id, body);
287
288        assert_eq!(msg.task_name(), "tasks.test");
289        assert_eq!(msg.task_id(), task_id);
290        assert_eq!(msg.body_slice(), body);
291    }
292
293    #[test]
294    fn test_message_ref_predicates() {
295        let task_id = Uuid::new_v4();
296        let body = b"{}";
297        let mut msg = MessageRef::new("tasks.test", task_id, body);
298
299        assert!(!msg.has_eta());
300        assert!(!msg.has_expires());
301        assert!(!msg.has_parent());
302        assert!(!msg.has_root());
303        assert!(!msg.has_group());
304
305        msg.headers.eta = Some(Utc::now());
306        msg.headers.parent_id = Some(Uuid::new_v4());
307
308        assert!(msg.has_eta());
309        assert!(msg.has_parent());
310    }
311
312    #[test]
313    fn test_message_ref_to_owned() {
314        let task_id = Uuid::new_v4();
315        let body = b"test";
316        let msg_ref = MessageRef::new("tasks.test", task_id, body);
317
318        let msg = msg_ref.into_owned();
319
320        assert_eq!(msg.headers.task, "tasks.test");
321        assert_eq!(msg.headers.id, task_id);
322        assert_eq!(msg.body, body);
323    }
324
325    #[test]
326    fn test_task_args_ref() {
327        let args = TaskArgsRef::new();
328        assert_eq!(args.args.len(), 0);
329        assert_eq!(args.kwargs.len(), 0);
330    }
331
332    #[test]
333    fn test_task_args_ref_to_owned() {
334        let mut args_ref = TaskArgsRef::new();
335        args_ref
336            .kwargs
337            .insert(Cow::Borrowed("key"), serde_json::json!("value"));
338
339        let args = args_ref.into_owned();
340        assert_eq!(args.kwargs.get("key").unwrap(), "value");
341    }
342
343    #[test]
344    fn test_zero_copy_deserialization() {
345        let json = r#"{"headers":{"task":"tasks.add","id":"550e8400-e29b-41d4-a716-446655440000","lang":"rust"},"properties":{"delivery_mode":2},"body":"dGVzdA==","content-type":"application/json","content-encoding":"utf-8"}"#;
346
347        let msg: MessageRef = serde_json::from_str(json).unwrap();
348        assert_eq!(msg.task_name(), "tasks.add");
349        assert_eq!(msg.content_type, "application/json");
350    }
351}