celers_protocol/
builder.rs

1//! Fluent message builder API
2//!
3//! This module provides a builder pattern for constructing Celery protocol
4//! messages with a clean, fluent API.
5//!
6//! # Example
7//!
8//! ```
9//! use celers_protocol::builder::MessageBuilder;
10//! use serde_json::json;
11//!
12//! let message = MessageBuilder::new("tasks.add")
13//!     .args(vec![json!(1), json!(2)])
14//!     .priority(5)
15//!     .queue("high-priority")
16//!     .build()
17//!     .unwrap();
18//!
19//! assert_eq!(message.task_name(), "tasks.add");
20//! ```
21
22use crate::embed::{CallbackSignature, EmbedOptions, EmbeddedBody};
23use crate::{ContentEncoding, ContentType, Message, MessageHeaders, MessageProperties};
24use chrono::{DateTime, Duration, Utc};
25use serde_json::Value;
26use std::collections::HashMap;
27use uuid::Uuid;
28
/// Errors that can be reported while turning a [`MessageBuilder`] into a message.
///
/// The type is comparable (`PartialEq`/`Eq`) so callers and tests can assert on the
/// exact error that was returned.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BuilderError {
    /// Task name is required.
    ///
    /// No code in this module currently constructs this variant.
    MissingTaskName,
    /// Serialization failed; the payload is the underlying error rendered as text.
    SerializationError(String),
    /// Validation failed; the payload is the underlying error rendered as text.
    ValidationError(String),
}

impl std::fmt::Display for BuilderError {
    /// Writes a short human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::MissingTaskName => f.write_str("Task name is required"),
            Self::SerializationError(detail) => write!(f, "Serialization error: {}", detail),
            Self::ValidationError(detail) => write!(f, "Validation error: {}", detail),
        }
    }
}

// No `source()` override: the wrapped errors are stored as strings only.
impl std::error::Error for BuilderError {}
51
52impl From<crate::ValidationError> for BuilderError {
53    fn from(err: crate::ValidationError) -> Self {
54        BuilderError::ValidationError(err.to_string())
55    }
56}
57
58/// Result type for message building
59pub type BuilderResult<T> = Result<T, BuilderError>;
60
61/// Fluent builder for creating Celery messages
62#[derive(Debug, Clone)]
63pub struct MessageBuilder {
64    /// Task name
65    task: String,
66    /// Task ID (auto-generated if not set)
67    task_id: Option<Uuid>,
68    /// Positional arguments
69    args: Vec<Value>,
70    /// Keyword arguments
71    kwargs: HashMap<String, Value>,
72    /// Task priority (0-9)
73    priority: Option<u8>,
74    /// Queue name
75    queue: Option<String>,
76    /// Routing key
77    routing_key: Option<String>,
78    /// ETA (scheduled execution time)
79    eta: Option<DateTime<Utc>>,
80    /// Countdown (delay in seconds)
81    countdown: Option<i64>,
82    /// Expiration time
83    expires: Option<DateTime<Utc>>,
84    /// Maximum retries
85    max_retries: Option<u32>,
86    /// Current retry count
87    retries: Option<u32>,
88    /// Parent task ID
89    parent_id: Option<Uuid>,
90    /// Root task ID
91    root_id: Option<Uuid>,
92    /// Group ID
93    group_id: Option<Uuid>,
94    /// Callbacks (on success)
95    callbacks: Vec<CallbackSignature>,
96    /// Errbacks (on failure)
97    errbacks: Vec<CallbackSignature>,
98    /// Chain of tasks
99    chain: Vec<CallbackSignature>,
100    /// Chord callback
101    chord: Option<CallbackSignature>,
102    /// Content type
103    content_type: ContentType,
104    /// Delivery mode (persistent by default)
105    persistent: bool,
106    /// Reply-to queue
107    reply_to: Option<String>,
108    /// Extra headers
109    extra_headers: HashMap<String, Value>,
110}
111
112impl MessageBuilder {
113    /// Create a new message builder with the given task name
114    pub fn new(task: impl Into<String>) -> Self {
115        Self {
116            task: task.into(),
117            task_id: None,
118            args: Vec::new(),
119            kwargs: HashMap::new(),
120            priority: None,
121            queue: None,
122            routing_key: None,
123            eta: None,
124            countdown: None,
125            expires: None,
126            max_retries: None,
127            retries: None,
128            parent_id: None,
129            root_id: None,
130            group_id: None,
131            callbacks: Vec::new(),
132            errbacks: Vec::new(),
133            chain: Vec::new(),
134            chord: None,
135            content_type: ContentType::Json,
136            persistent: true,
137            reply_to: None,
138            extra_headers: HashMap::new(),
139        }
140    }
141
142    /// Set the task ID
143    #[must_use]
144    pub fn id(mut self, id: Uuid) -> Self {
145        self.task_id = Some(id);
146        self
147    }
148
149    /// Set positional arguments
150    #[must_use]
151    pub fn args(mut self, args: Vec<Value>) -> Self {
152        self.args = args;
153        self
154    }
155
156    /// Add a positional argument
157    #[must_use]
158    pub fn arg(mut self, arg: Value) -> Self {
159        self.args.push(arg);
160        self
161    }
162
163    /// Set keyword arguments
164    #[must_use]
165    pub fn kwargs(mut self, kwargs: HashMap<String, Value>) -> Self {
166        self.kwargs = kwargs;
167        self
168    }
169
170    /// Add a keyword argument
171    #[must_use]
172    pub fn kwarg(mut self, key: impl Into<String>, value: Value) -> Self {
173        self.kwargs.insert(key.into(), value);
174        self
175    }
176
177    /// Set task priority (0-9, higher = more urgent)
178    #[must_use]
179    pub fn priority(mut self, priority: u8) -> Self {
180        self.priority = Some(priority.min(9));
181        self
182    }
183
184    /// Set the queue name
185    #[must_use]
186    pub fn queue(mut self, queue: impl Into<String>) -> Self {
187        self.queue = Some(queue.into());
188        self
189    }
190
191    /// Set the routing key
192    #[must_use]
193    pub fn routing_key(mut self, key: impl Into<String>) -> Self {
194        self.routing_key = Some(key.into());
195        self
196    }
197
198    /// Set ETA (scheduled execution time)
199    #[must_use]
200    pub fn eta(mut self, eta: DateTime<Utc>) -> Self {
201        self.eta = Some(eta);
202        self.countdown = None; // ETA takes precedence
203        self
204    }
205
206    /// Set countdown (delay in seconds)
207    #[must_use]
208    pub fn countdown(mut self, seconds: i64) -> Self {
209        self.countdown = Some(seconds);
210        self.eta = None; // Countdown takes precedence
211        self
212    }
213
214    /// Set expiration time
215    #[must_use]
216    pub fn expires(mut self, expires: DateTime<Utc>) -> Self {
217        self.expires = Some(expires);
218        self
219    }
220
221    /// Set expiration as duration from now
222    #[must_use]
223    pub fn expires_in(mut self, duration: Duration) -> Self {
224        self.expires = Some(Utc::now() + duration);
225        self
226    }
227
228    /// Set maximum retries
229    #[must_use]
230    pub fn max_retries(mut self, max: u32) -> Self {
231        self.max_retries = Some(max);
232        self
233    }
234
235    /// Set current retry count
236    #[must_use]
237    pub fn retries(mut self, count: u32) -> Self {
238        self.retries = Some(count);
239        self
240    }
241
242    /// Set parent task ID
243    #[must_use]
244    pub fn parent(mut self, parent_id: Uuid) -> Self {
245        self.parent_id = Some(parent_id);
246        self
247    }
248
249    /// Set root task ID
250    #[must_use]
251    pub fn root(mut self, root_id: Uuid) -> Self {
252        self.root_id = Some(root_id);
253        self
254    }
255
256    /// Set group ID
257    #[must_use]
258    pub fn group(mut self, group_id: Uuid) -> Self {
259        self.group_id = Some(group_id);
260        self
261    }
262
263    /// Add a success callback (link)
264    #[must_use]
265    pub fn link(mut self, task: impl Into<String>) -> Self {
266        self.callbacks.push(CallbackSignature::new(task));
267        self
268    }
269
270    /// Add a success callback with full signature
271    #[must_use]
272    pub fn link_signature(mut self, callback: CallbackSignature) -> Self {
273        self.callbacks.push(callback);
274        self
275    }
276
277    /// Add an error callback (errback)
278    #[must_use]
279    pub fn link_error(mut self, task: impl Into<String>) -> Self {
280        self.errbacks.push(CallbackSignature::new(task));
281        self
282    }
283
284    /// Add an error callback with full signature
285    #[must_use]
286    pub fn link_error_signature(mut self, errback: CallbackSignature) -> Self {
287        self.errbacks.push(errback);
288        self
289    }
290
291    /// Add a chain task
292    #[must_use]
293    pub fn chain_task(mut self, task: impl Into<String>) -> Self {
294        self.chain.push(CallbackSignature::new(task));
295        self
296    }
297
298    /// Set chord callback
299    #[must_use]
300    pub fn chord(mut self, callback: impl Into<String>) -> Self {
301        self.chord = Some(CallbackSignature::new(callback));
302        self
303    }
304
305    /// Set content type
306    #[must_use]
307    pub fn content_type(mut self, ct: ContentType) -> Self {
308        self.content_type = ct;
309        self
310    }
311
312    /// Set message persistence
313    #[must_use]
314    pub fn persistent(mut self, persistent: bool) -> Self {
315        self.persistent = persistent;
316        self
317    }
318
319    /// Set reply-to queue
320    #[must_use]
321    pub fn reply_to(mut self, queue: impl Into<String>) -> Self {
322        self.reply_to = Some(queue.into());
323        self
324    }
325
326    /// Add extra header
327    #[must_use]
328    pub fn header(mut self, key: impl Into<String>, value: Value) -> Self {
329        self.extra_headers.insert(key.into(), value);
330        self
331    }
332
333    /// Build the message
334    pub fn build(self) -> BuilderResult<Message> {
335        // Generate task ID if not set
336        let task_id = self.task_id.unwrap_or_else(Uuid::new_v4);
337
338        // Calculate ETA from countdown if set
339        let eta = match (self.eta, self.countdown) {
340            (Some(eta), _) => Some(eta),
341            (None, Some(seconds)) => Some(Utc::now() + Duration::seconds(seconds)),
342            _ => None,
343        };
344
345        // Build embed options
346        let mut embed = EmbedOptions::new();
347        for cb in self.callbacks {
348            embed = embed.with_callback(cb);
349        }
350        for eb in self.errbacks {
351            embed = embed.with_errback(eb);
352        }
353        for chain_task in self.chain {
354            embed = embed.with_chain_task(chain_task);
355        }
356        if let Some(chord) = self.chord {
357            embed = embed.with_chord(chord);
358        }
359        if let Some(group_id) = self.group_id {
360            embed = embed.with_group(group_id);
361        }
362        if let Some(parent_id) = self.parent_id {
363            embed = embed.with_parent(parent_id);
364        }
365        if let Some(root_id) = self.root_id {
366            embed = embed.with_root(root_id);
367        }
368
369        // Build embedded body
370        let embedded_body = EmbeddedBody::new()
371            .with_args(self.args)
372            .with_kwargs(self.kwargs)
373            .with_embed(embed);
374
375        // Serialize body
376        let body = embedded_body
377            .encode()
378            .map_err(|e| BuilderError::SerializationError(e.to_string()))?;
379
380        // Build headers
381        let mut headers = MessageHeaders::new(self.task.clone(), task_id);
382        headers.eta = eta;
383        headers.expires = self.expires;
384        headers.retries = self.retries;
385        headers.parent_id = self.parent_id;
386        headers.root_id = self.root_id;
387        headers.group = self.group_id;
388
389        // Add extra headers
390        for (key, value) in self.extra_headers {
391            headers.extra.insert(key, value);
392        }
393
394        // Build properties
395        let properties = MessageProperties {
396            priority: self.priority,
397            delivery_mode: if self.persistent { 2 } else { 1 },
398            correlation_id: Some(task_id.to_string()),
399            reply_to: self.reply_to,
400        };
401
402        // Build message
403        let message = Message {
404            headers,
405            properties,
406            body,
407            content_type: self.content_type.as_str().to_string(),
408            content_encoding: ContentEncoding::Utf8.as_str().to_string(),
409        };
410
411        Ok(message)
412    }
413
414    /// Build and validate the message
415    pub fn build_validated(self) -> BuilderResult<Message> {
416        let message = self.build()?;
417        message.validate().map_err(BuilderError::from)?;
418        Ok(message)
419    }
420}
421
422/// Create a simple task message
423pub fn task(name: impl Into<String>) -> MessageBuilder {
424    MessageBuilder::new(name)
425}
426
427/// Create a task message with args
428pub fn task_with_args(name: impl Into<String>, args: Vec<Value>) -> MessageBuilder {
429    MessageBuilder::new(name).args(args)
430}
431
432/// Create a delayed task message
433pub fn delayed_task(name: impl Into<String>, countdown_seconds: i64) -> MessageBuilder {
434    MessageBuilder::new(name).countdown(countdown_seconds)
435}
436
437/// Create a scheduled task message
438pub fn scheduled_task(name: impl Into<String>, eta: DateTime<Utc>) -> MessageBuilder {
439    MessageBuilder::new(name).eta(eta)
440}
441
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // A name plus positional args is enough to produce a named, non-empty message.
    #[test]
    fn test_basic_message_builder() {
        let builder = MessageBuilder::new("tasks.add").args(vec![json!(1), json!(2)]);
        let msg = builder.build().unwrap();

        assert_eq!(msg.task_name(), "tasks.add");
        assert!(!msg.body.is_empty());
    }

    // An explicit ID is carried through to the built message.
    #[test]
    fn test_message_builder_with_id() {
        let wanted = Uuid::new_v4();
        let builder = MessageBuilder::new("tasks.test").id(wanted);
        let msg = builder.build().unwrap();

        assert_eq!(msg.task_id(), wanted);
    }

    // An in-range priority is stored unchanged.
    #[test]
    fn test_message_builder_with_priority() {
        let builder = MessageBuilder::new("tasks.test").priority(9);
        let msg = builder.build().unwrap();

        assert_eq!(msg.properties.priority, Some(9));
    }

    // An out-of-range priority is clamped to 9.
    #[test]
    fn test_message_builder_with_priority_capped() {
        let builder = MessageBuilder::new("tasks.test").priority(100);
        let msg = builder.build().unwrap();

        assert_eq!(msg.properties.priority, Some(9));
    }

    // A countdown results in a message that carries an ETA.
    #[test]
    fn test_message_builder_with_countdown() {
        let builder = MessageBuilder::new("tasks.test").countdown(60);
        let msg = builder.build().unwrap();

        assert!(msg.has_eta());
    }

    // An explicit ETA is copied into the headers verbatim.
    #[test]
    fn test_message_builder_with_eta() {
        let when = Utc::now() + Duration::hours(1);
        let builder = MessageBuilder::new("tasks.test").eta(when);
        let msg = builder.build().unwrap();

        assert!(msg.has_eta());
        assert_eq!(msg.headers.eta, Some(when));
    }

    // An absolute expiry shows up on the message.
    #[test]
    fn test_message_builder_with_expires() {
        let deadline = Utc::now() + Duration::days(1);
        let builder = MessageBuilder::new("tasks.test").expires(deadline);
        let msg = builder.build().unwrap();

        assert!(msg.has_expires());
    }

    // A relative expiry shows up on the message.
    #[test]
    fn test_message_builder_with_expires_in() {
        let builder = MessageBuilder::new("tasks.test").expires_in(Duration::hours(2));
        let msg = builder.build().unwrap();

        assert!(msg.has_expires());
    }

    // Bulk and single keyword arguments can be combined.
    #[test]
    fn test_message_builder_with_kwargs() {
        let mut initial = HashMap::new();
        initial.insert("x".to_string(), json!(10));

        let builder = MessageBuilder::new("tasks.test")
            .kwargs(initial)
            .kwarg("y", json!(20));
        let msg = builder.build().unwrap();

        assert!(!msg.body.is_empty());
    }

    // Success and error callbacks can be attached by name.
    #[test]
    fn test_message_builder_with_link() {
        let builder = MessageBuilder::new("tasks.first")
            .link("tasks.second")
            .link_error("tasks.on_error");
        let msg = builder.build().unwrap();

        assert!(!msg.body.is_empty());
    }

    // Several chain tasks can be appended.
    #[test]
    fn test_message_builder_with_chain() {
        let builder = MessageBuilder::new("tasks.step1")
            .chain_task("tasks.step2")
            .chain_task("tasks.step3");
        let msg = builder.build().unwrap();

        assert!(!msg.body.is_empty());
    }

    // Parent, root and group IDs are copied into the headers.
    #[test]
    fn test_message_builder_with_workflow_ids() {
        let parent = Uuid::new_v4();
        let root = Uuid::new_v4();
        let group = Uuid::new_v4();

        let builder = MessageBuilder::new("tasks.test")
            .parent(parent)
            .root(root)
            .group(group);
        let msg = builder.build().unwrap();

        assert_eq!(msg.headers.parent_id, Some(parent));
        assert_eq!(msg.headers.root_id, Some(root));
        assert_eq!(msg.headers.group, Some(group));
    }

    // Turning persistence off selects delivery mode 1.
    #[test]
    fn test_message_builder_non_persistent() {
        let builder = MessageBuilder::new("tasks.test").persistent(false);
        let msg = builder.build().unwrap();

        assert_eq!(msg.properties.delivery_mode, 1);
    }

    // The reply-to queue is stored in the properties.
    #[test]
    fn test_message_builder_with_reply_to() {
        let builder = MessageBuilder::new("tasks.test").reply_to("results-queue");
        let msg = builder.build().unwrap();

        assert_eq!(
            msg.properties.reply_to,
            Some("results-queue".to_string())
        );
    }

    // Extra headers land in the headers' `extra` map.
    #[test]
    fn test_message_builder_with_extra_header() {
        let builder = MessageBuilder::new("tasks.test").header("custom", json!("value"));
        let msg = builder.build().unwrap();

        assert_eq!(msg.headers.extra.get("custom"), Some(&json!("value")));
    }

    // `task` behaves like `MessageBuilder::new`.
    #[test]
    fn test_task_helper() {
        let builder = task("tasks.add").arg(json!(1)).arg(json!(2));
        let msg = builder.build().unwrap();

        assert_eq!(msg.task_name(), "tasks.add");
    }

    // `task_with_args` keeps the task name.
    #[test]
    fn test_task_with_args_helper() {
        let builder = task_with_args("tasks.add", vec![json!(1), json!(2)]);
        let msg = builder.build().unwrap();

        assert_eq!(msg.task_name(), "tasks.add");
    }

    // `delayed_task` produces a message with an ETA.
    #[test]
    fn test_delayed_task_helper() {
        let builder = delayed_task("tasks.later", 300);
        let msg = builder.build().unwrap();

        assert!(msg.has_eta());
    }

    // `scheduled_task` produces a message with an ETA.
    #[test]
    fn test_scheduled_task_helper() {
        let when = Utc::now() + Duration::hours(1);
        let builder = scheduled_task("tasks.scheduled", when);
        let msg = builder.build().unwrap();

        assert!(msg.has_eta());
    }

    // A well-formed message passes validation.
    #[test]
    fn test_build_validated() {
        let builder = MessageBuilder::new("tasks.test").args(vec![json!(1)]);
        let msg = builder.build_validated().unwrap();

        assert_eq!(msg.task_name(), "tasks.test");
    }

    // Each error variant renders its message or payload.
    #[test]
    fn test_builder_error_display() {
        let missing = BuilderError::MissingTaskName;
        assert_eq!(missing.to_string(), "Task name is required");

        let serialization = BuilderError::SerializationError("test".to_string());
        assert!(serialization.to_string().contains("test"));

        let validation = BuilderError::ValidationError("invalid".to_string());
        assert!(validation.to_string().contains("invalid"));
    }
}