celers_protocol/
message.rs1use chrono::{DateTime, Utc};
4use uuid::Uuid;
5
6use crate::{
7 Message, MessageHeaders, MessageProperties, ValidationError, CONTENT_TYPE_JSON, ENCODING_UTF8,
8};
9
10impl Message {
11 pub fn new(task: String, id: Uuid, body: Vec<u8>) -> Self {
13 Self {
14 headers: MessageHeaders::new(task, id),
15 properties: MessageProperties::default(),
16 body,
17 content_type: CONTENT_TYPE_JSON.to_string(),
18 content_encoding: ENCODING_UTF8.to_string(),
19 }
20 }
21
22 #[must_use]
24 pub fn with_priority(mut self, priority: u8) -> Self {
25 self.properties.priority = Some(priority);
26 self
27 }
28
29 #[must_use]
31 pub fn with_parent(mut self, parent_id: Uuid) -> Self {
32 self.headers.parent_id = Some(parent_id);
33 self
34 }
35
36 #[must_use]
38 pub fn with_root(mut self, root_id: Uuid) -> Self {
39 self.headers.root_id = Some(root_id);
40 self
41 }
42
43 #[must_use]
45 pub fn with_group(mut self, group: Uuid) -> Self {
46 self.headers.group = Some(group);
47 self
48 }
49
50 #[must_use]
52 pub fn with_eta(mut self, eta: DateTime<Utc>) -> Self {
53 self.headers.eta = Some(eta);
54 self
55 }
56
57 #[must_use]
59 pub fn with_expires(mut self, expires: DateTime<Utc>) -> Self {
60 self.headers.expires = Some(expires);
61 self
62 }
63
64 #[must_use]
66 pub fn with_retries(mut self, retries: u32) -> Self {
67 self.headers.retries = Some(retries);
68 self
69 }
70
71 #[must_use]
73 pub fn with_correlation_id(mut self, correlation_id: String) -> Self {
74 self.properties.correlation_id = Some(correlation_id);
75 self
76 }
77
78 #[must_use]
80 pub fn with_reply_to(mut self, reply_to: String) -> Self {
81 self.properties.reply_to = Some(reply_to);
82 self
83 }
84
85 #[must_use]
87 pub fn with_delivery_mode(mut self, mode: u8) -> Self {
88 self.properties.delivery_mode = mode;
89 self
90 }
91
92 pub fn validate(&self) -> Result<(), ValidationError> {
100 self.headers.validate()?;
102
103 self.properties.validate()?;
105
106 if self.content_type.is_empty() {
108 return Err(ValidationError::EmptyContentType);
109 }
110
111 if self.body.is_empty() {
113 return Err(ValidationError::EmptyBody);
114 }
115
116 if self.body.len() > 10_485_760 {
117 return Err(ValidationError::BodyTooLarge {
119 size: self.body.len(),
120 max: 10_485_760,
121 });
122 }
123
124 Ok(())
125 }
126
127 pub fn validate_with_limit(&self, max_body_bytes: usize) -> Result<(), ValidationError> {
129 self.headers.validate()?;
130 self.properties.validate()?;
131
132 if self.content_type.is_empty() {
133 return Err(ValidationError::EmptyContentType);
134 }
135
136 if self.body.is_empty() {
137 return Err(ValidationError::EmptyBody);
138 }
139
140 if self.body.len() > max_body_bytes {
141 return Err(ValidationError::BodyTooLarge {
142 size: self.body.len(),
143 max: max_body_bytes,
144 });
145 }
146
147 Ok(())
148 }
149
150 #[inline(always)]
152 pub fn has_eta(&self) -> bool {
153 self.headers.eta.is_some()
154 }
155
156 #[inline(always)]
158 pub fn has_expires(&self) -> bool {
159 self.headers.expires.is_some()
160 }
161
162 #[inline(always)]
164 pub fn has_group(&self) -> bool {
165 self.headers.group.is_some()
166 }
167
168 #[inline(always)]
170 pub fn has_parent(&self) -> bool {
171 self.headers.parent_id.is_some()
172 }
173
174 #[inline(always)]
176 pub fn has_root(&self) -> bool {
177 self.headers.root_id.is_some()
178 }
179
180 #[inline(always)]
182 pub fn is_persistent(&self) -> bool {
183 self.properties.delivery_mode == 2
184 }
185
186 #[inline(always)]
188 pub fn task_id(&self) -> uuid::Uuid {
189 self.headers.id
190 }
191
192 #[inline(always)]
194 pub fn task_name(&self) -> &str {
195 &self.headers.task
196 }
197
198 #[inline(always)]
200 pub fn content_type_str(&self) -> &str {
201 &self.content_type
202 }
203
204 #[inline(always)]
206 pub fn content_encoding_str(&self) -> &str {
207 &self.content_encoding
208 }
209
210 #[inline(always)]
212 pub fn body_size(&self) -> usize {
213 self.body.len()
214 }
215
216 #[inline(always)]
218 pub fn has_empty_body(&self) -> bool {
219 self.body.is_empty()
220 }
221
222 #[inline(always)]
224 pub fn retry_count(&self) -> u32 {
225 self.headers.retries.unwrap_or(0)
226 }
227
228 #[inline(always)]
230 pub fn priority(&self) -> Option<u8> {
231 self.properties.priority
232 }
233
234 #[inline(always)]
236 pub fn has_correlation_id(&self) -> bool {
237 self.properties.correlation_id.is_some()
238 }
239
240 #[inline]
242 pub fn correlation_id(&self) -> Option<&str> {
243 self.properties.correlation_id.as_deref()
244 }
245
246 #[inline]
248 pub fn reply_to(&self) -> Option<&str> {
249 self.properties.reply_to.as_deref()
250 }
251
252 #[inline(always)]
254 pub fn is_workflow_message(&self) -> bool {
255 self.has_parent() || self.has_root() || self.has_group()
256 }
257
258 #[must_use]
260 pub fn with_new_id(&self) -> Self {
261 let mut cloned = self.clone();
262 cloned.headers.id = Uuid::new_v4();
263 cloned
264 }
265
266 pub fn to_builder(&self) -> crate::builder::MessageBuilder {
271 let mut builder = crate::builder::MessageBuilder::new(&self.headers.task);
272
273 builder = builder.id(self.headers.id);
275
276 if let Some(priority) = self.properties.priority {
278 builder = builder.priority(priority);
279 }
280 if let Some(parent_id) = self.headers.parent_id {
281 builder = builder.parent(parent_id);
282 }
283 if let Some(root_id) = self.headers.root_id {
284 builder = builder.root(root_id);
285 }
286 if let Some(group) = self.headers.group {
287 builder = builder.group(group);
288 }
289 if let Some(eta) = self.headers.eta {
290 builder = builder.eta(eta);
291 }
292 if let Some(expires) = self.headers.expires {
293 builder = builder.expires(expires);
294 }
295
296 builder
297 }
298
299 #[inline]
301 pub fn is_ready_for_execution(&self) -> bool {
302 match self.headers.eta {
303 None => true,
304 Some(eta) => chrono::Utc::now() >= eta,
305 }
306 }
307
308 #[inline]
310 pub fn is_not_expired(&self) -> bool {
311 match self.headers.expires {
312 None => true,
313 Some(expires) => chrono::Utc::now() < expires,
314 }
315 }
316
317 #[inline]
319 pub fn should_process(&self) -> bool {
320 self.is_ready_for_execution() && self.is_not_expired()
321 }
322
323 #[must_use]
337 pub fn with_eta_delay(mut self, delay: chrono::Duration) -> Self {
338 self.headers.eta = Some(chrono::Utc::now() + delay);
339 self
340 }
341
342 #[must_use]
356 pub fn with_expires_in(mut self, duration: chrono::Duration) -> Self {
357 self.headers.expires = Some(chrono::Utc::now() + duration);
358 self
359 }
360
361 #[inline]
363 pub fn time_until_eta(&self) -> Option<chrono::Duration> {
364 self.headers.eta.and_then(|eta| {
365 let now = chrono::Utc::now();
366 if eta > now {
367 Some(eta - now)
368 } else {
369 None
370 }
371 })
372 }
373
374 #[inline]
376 pub fn time_until_expiration(&self) -> Option<chrono::Duration> {
377 self.headers.expires.and_then(|expires| {
378 let now = chrono::Utc::now();
379 if expires > now {
380 Some(expires - now)
381 } else {
382 None
383 }
384 })
385 }
386
387 pub fn increment_retry(&mut self) -> u32 {
389 let new_count = self.headers.retries.unwrap_or(0) + 1;
390 self.headers.retries = Some(new_count);
391 new_count
392 }
393}