Skip to main content

celers_protocol/
message.rs

//! Message implementation for Celery protocol messages.
2
3use chrono::{DateTime, Utc};
4use uuid::Uuid;
5
6use crate::{
7    Message, MessageHeaders, MessageProperties, ValidationError, CONTENT_TYPE_JSON, ENCODING_UTF8,
8};
9
10impl Message {
11    /// Create a new message with JSON body
12    pub fn new(task: String, id: Uuid, body: Vec<u8>) -> Self {
13        Self {
14            headers: MessageHeaders::new(task, id),
15            properties: MessageProperties::default(),
16            body,
17            content_type: CONTENT_TYPE_JSON.to_string(),
18            content_encoding: ENCODING_UTF8.to_string(),
19        }
20    }
21
22    /// Set priority (0-9)
23    #[must_use]
24    pub fn with_priority(mut self, priority: u8) -> Self {
25        self.properties.priority = Some(priority);
26        self
27    }
28
29    /// Set parent task ID
30    #[must_use]
31    pub fn with_parent(mut self, parent_id: Uuid) -> Self {
32        self.headers.parent_id = Some(parent_id);
33        self
34    }
35
36    /// Set root task ID
37    #[must_use]
38    pub fn with_root(mut self, root_id: Uuid) -> Self {
39        self.headers.root_id = Some(root_id);
40        self
41    }
42
43    /// Set group ID
44    #[must_use]
45    pub fn with_group(mut self, group: Uuid) -> Self {
46        self.headers.group = Some(group);
47        self
48    }
49
50    /// Set ETA (delayed execution)
51    #[must_use]
52    pub fn with_eta(mut self, eta: DateTime<Utc>) -> Self {
53        self.headers.eta = Some(eta);
54        self
55    }
56
57    /// Set expiration
58    #[must_use]
59    pub fn with_expires(mut self, expires: DateTime<Utc>) -> Self {
60        self.headers.expires = Some(expires);
61        self
62    }
63
64    /// Set retry count
65    #[must_use]
66    pub fn with_retries(mut self, retries: u32) -> Self {
67        self.headers.retries = Some(retries);
68        self
69    }
70
71    /// Set correlation ID (for RPC-style calls)
72    #[must_use]
73    pub fn with_correlation_id(mut self, correlation_id: String) -> Self {
74        self.properties.correlation_id = Some(correlation_id);
75        self
76    }
77
78    /// Set reply-to queue (for results)
79    #[must_use]
80    pub fn with_reply_to(mut self, reply_to: String) -> Self {
81        self.properties.reply_to = Some(reply_to);
82        self
83    }
84
85    /// Set delivery mode (1 = non-persistent, 2 = persistent)
86    #[must_use]
87    pub fn with_delivery_mode(mut self, mode: u8) -> Self {
88        self.properties.delivery_mode = mode;
89        self
90    }
91
92    /// Validate the complete message
93    ///
94    /// Validates:
95    /// - Headers (task name, retries, eta/expires)
96    /// - Properties (delivery mode, priority)
97    /// - Content type format
98    /// - Body size
99    pub fn validate(&self) -> Result<(), ValidationError> {
100        // Validate headers
101        self.headers.validate()?;
102
103        // Validate properties
104        self.properties.validate()?;
105
106        // Validate content type
107        if self.content_type.is_empty() {
108            return Err(ValidationError::EmptyContentType);
109        }
110
111        // Validate body
112        if self.body.is_empty() {
113            return Err(ValidationError::EmptyBody);
114        }
115
116        if self.body.len() > 10_485_760 {
117            // 10MB limit
118            return Err(ValidationError::BodyTooLarge {
119                size: self.body.len(),
120                max: 10_485_760,
121            });
122        }
123
124        Ok(())
125    }
126
127    /// Validate with custom body size limit
128    pub fn validate_with_limit(&self, max_body_bytes: usize) -> Result<(), ValidationError> {
129        self.headers.validate()?;
130        self.properties.validate()?;
131
132        if self.content_type.is_empty() {
133            return Err(ValidationError::EmptyContentType);
134        }
135
136        if self.body.is_empty() {
137            return Err(ValidationError::EmptyBody);
138        }
139
140        if self.body.len() > max_body_bytes {
141            return Err(ValidationError::BodyTooLarge {
142                size: self.body.len(),
143                max: max_body_bytes,
144            });
145        }
146
147        Ok(())
148    }
149
150    /// Check if the message has an ETA (delayed execution)
151    #[inline(always)]
152    pub fn has_eta(&self) -> bool {
153        self.headers.eta.is_some()
154    }
155
156    /// Check if the message has an expiration time
157    #[inline(always)]
158    pub fn has_expires(&self) -> bool {
159        self.headers.expires.is_some()
160    }
161
162    /// Check if the message is part of a group
163    #[inline(always)]
164    pub fn has_group(&self) -> bool {
165        self.headers.group.is_some()
166    }
167
168    /// Check if the message has a parent task
169    #[inline(always)]
170    pub fn has_parent(&self) -> bool {
171        self.headers.parent_id.is_some()
172    }
173
174    /// Check if the message has a root task
175    #[inline(always)]
176    pub fn has_root(&self) -> bool {
177        self.headers.root_id.is_some()
178    }
179
180    /// Check if the message is persistent
181    #[inline(always)]
182    pub fn is_persistent(&self) -> bool {
183        self.properties.delivery_mode == 2
184    }
185
186    /// Get the task ID
187    #[inline(always)]
188    pub fn task_id(&self) -> uuid::Uuid {
189        self.headers.id
190    }
191
192    /// Get the task name
193    #[inline(always)]
194    pub fn task_name(&self) -> &str {
195        &self.headers.task
196    }
197
198    /// Get the content type as a string slice
199    #[inline(always)]
200    pub fn content_type_str(&self) -> &str {
201        &self.content_type
202    }
203
204    /// Get the content encoding as a string slice
205    #[inline(always)]
206    pub fn content_encoding_str(&self) -> &str {
207        &self.content_encoding
208    }
209
210    /// Get the message body size in bytes
211    #[inline(always)]
212    pub fn body_size(&self) -> usize {
213        self.body.len()
214    }
215
216    /// Check if the message body is empty
217    #[inline(always)]
218    pub fn has_empty_body(&self) -> bool {
219        self.body.is_empty()
220    }
221
222    /// Get the retry count (0 if not set)
223    #[inline(always)]
224    pub fn retry_count(&self) -> u32 {
225        self.headers.retries.unwrap_or(0)
226    }
227
228    /// Get the priority (None if not set)
229    #[inline(always)]
230    pub fn priority(&self) -> Option<u8> {
231        self.properties.priority
232    }
233
234    /// Check if message has a correlation ID
235    #[inline(always)]
236    pub fn has_correlation_id(&self) -> bool {
237        self.properties.correlation_id.is_some()
238    }
239
240    /// Get the correlation ID
241    #[inline]
242    pub fn correlation_id(&self) -> Option<&str> {
243        self.properties.correlation_id.as_deref()
244    }
245
246    /// Get the reply-to queue
247    #[inline]
248    pub fn reply_to(&self) -> Option<&str> {
249        self.properties.reply_to.as_deref()
250    }
251
252    /// Check if this is a workflow message (has parent, root, or group)
253    #[inline(always)]
254    pub fn is_workflow_message(&self) -> bool {
255        self.has_parent() || self.has_root() || self.has_group()
256    }
257
258    /// Clone the message with a new task ID
259    #[must_use]
260    pub fn with_new_id(&self) -> Self {
261        let mut cloned = self.clone();
262        cloned.headers.id = Uuid::new_v4();
263        cloned
264    }
265
266    /// Create a builder from this message (for modification)
267    ///
268    /// Note: This creates a new builder with the message's metadata.
269    /// The body (args/kwargs) must be set separately on the builder.
270    pub fn to_builder(&self) -> crate::builder::MessageBuilder {
271        let mut builder = crate::builder::MessageBuilder::new(&self.headers.task);
272
273        // Set basic properties
274        builder = builder.id(self.headers.id);
275
276        // Set optional fields
277        if let Some(priority) = self.properties.priority {
278            builder = builder.priority(priority);
279        }
280        if let Some(parent_id) = self.headers.parent_id {
281            builder = builder.parent(parent_id);
282        }
283        if let Some(root_id) = self.headers.root_id {
284            builder = builder.root(root_id);
285        }
286        if let Some(group) = self.headers.group {
287            builder = builder.group(group);
288        }
289        if let Some(eta) = self.headers.eta {
290            builder = builder.eta(eta);
291        }
292        if let Some(expires) = self.headers.expires {
293            builder = builder.expires(expires);
294        }
295
296        builder
297    }
298
299    /// Check if the message is ready for immediate execution (not delayed)
300    #[inline]
301    pub fn is_ready_for_execution(&self) -> bool {
302        match self.headers.eta {
303            None => true,
304            Some(eta) => chrono::Utc::now() >= eta,
305        }
306    }
307
308    /// Check if the message has not expired yet
309    #[inline]
310    pub fn is_not_expired(&self) -> bool {
311        match self.headers.expires {
312            None => true,
313            Some(expires) => chrono::Utc::now() < expires,
314        }
315    }
316
317    /// Check if the message should be processed (not expired and ready for execution)
318    #[inline]
319    pub fn should_process(&self) -> bool {
320        self.is_ready_for_execution() && self.is_not_expired()
321    }
322
323    /// Set ETA to now + duration (builder pattern)
324    ///
325    /// # Examples
326    ///
327    /// ```
328    /// use celers_protocol::Message;
329    /// use uuid::Uuid;
330    /// use chrono::Duration;
331    ///
332    /// let msg = Message::new("task".to_string(), Uuid::new_v4(), vec![])
333    ///     .with_eta_delay(Duration::minutes(5));
334    /// assert!(msg.has_eta());
335    /// ```
336    #[must_use]
337    pub fn with_eta_delay(mut self, delay: chrono::Duration) -> Self {
338        self.headers.eta = Some(chrono::Utc::now() + delay);
339        self
340    }
341
342    /// Set expiration to now + duration (builder pattern)
343    ///
344    /// # Examples
345    ///
346    /// ```
347    /// use celers_protocol::Message;
348    /// use uuid::Uuid;
349    /// use chrono::Duration;
350    ///
351    /// let msg = Message::new("task".to_string(), Uuid::new_v4(), vec![])
352    ///     .with_expires_in(Duration::hours(1));
353    /// assert!(msg.has_expires());
354    /// ```
355    #[must_use]
356    pub fn with_expires_in(mut self, duration: chrono::Duration) -> Self {
357        self.headers.expires = Some(chrono::Utc::now() + duration);
358        self
359    }
360
361    /// Get the time remaining until ETA (None if no ETA or already past)
362    #[inline]
363    pub fn time_until_eta(&self) -> Option<chrono::Duration> {
364        self.headers.eta.and_then(|eta| {
365            let now = chrono::Utc::now();
366            if eta > now {
367                Some(eta - now)
368            } else {
369                None
370            }
371        })
372    }
373
374    /// Get the time remaining until expiration (None if no expiration or already expired)
375    #[inline]
376    pub fn time_until_expiration(&self) -> Option<chrono::Duration> {
377        self.headers.expires.and_then(|expires| {
378            let now = chrono::Utc::now();
379            if expires > now {
380                Some(expires - now)
381            } else {
382                None
383            }
384        })
385    }
386
387    /// Increment the retry count (returns new count)
388    pub fn increment_retry(&mut self) -> u32 {
389        let new_count = self.headers.retries.unwrap_or(0) + 1;
390        self.headers.retries = Some(new_count);
391        new_count
392    }
393}