1use crate::embed::{CallbackSignature, EmbedOptions, EmbeddedBody};
23use crate::{ContentEncoding, ContentType, Message, MessageHeaders, MessageProperties};
24use chrono::{DateTime, Duration, Utc};
25use serde_json::Value;
26use std::collections::HashMap;
27use uuid::Uuid;
28
/// Errors that the message builder API can report.
///
/// The type derives `PartialEq`/`Eq` so callers and tests can compare errors directly
/// (for example with `assert_eq!`); every payload is a plain `String`, so the comparison
/// is structural.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BuilderError {
    /// No task name was supplied.
    MissingTaskName,
    /// Encoding the message body failed; the payload is the underlying error's text.
    SerializationError(String),
    /// The message or one of its settings was rejected; the payload describes why.
    ValidationError(String),
}
39
40impl std::fmt::Display for BuilderError {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 match self {
43 BuilderError::MissingTaskName => write!(f, "Task name is required"),
44 BuilderError::SerializationError(msg) => write!(f, "Serialization error: {}", msg),
45 BuilderError::ValidationError(msg) => write!(f, "Validation error: {}", msg),
46 }
47 }
48}
49
50impl std::error::Error for BuilderError {}
51
52impl From<crate::ValidationError> for BuilderError {
53 fn from(err: crate::ValidationError) -> Self {
54 BuilderError::ValidationError(err.to_string())
55 }
56}
57
58pub type BuilderResult<T> = Result<T, BuilderError>;
60
61#[derive(Debug, Clone)]
63pub struct MessageBuilder {
64 task: String,
66 task_id: Option<Uuid>,
68 args: Vec<Value>,
70 kwargs: HashMap<String, Value>,
72 priority: Option<u8>,
74 queue: Option<String>,
76 routing_key: Option<String>,
78 eta: Option<DateTime<Utc>>,
80 countdown: Option<i64>,
82 expires: Option<DateTime<Utc>>,
84 max_retries: Option<u32>,
86 retries: Option<u32>,
88 parent_id: Option<Uuid>,
90 root_id: Option<Uuid>,
92 group_id: Option<Uuid>,
94 callbacks: Vec<CallbackSignature>,
96 errbacks: Vec<CallbackSignature>,
98 chain: Vec<CallbackSignature>,
100 chord: Option<CallbackSignature>,
102 content_type: ContentType,
104 persistent: bool,
106 reply_to: Option<String>,
108 extra_headers: HashMap<String, Value>,
110}
111
112impl MessageBuilder {
113 pub fn new(task: impl Into<String>) -> Self {
115 Self {
116 task: task.into(),
117 task_id: None,
118 args: Vec::new(),
119 kwargs: HashMap::new(),
120 priority: None,
121 queue: None,
122 routing_key: None,
123 eta: None,
124 countdown: None,
125 expires: None,
126 max_retries: None,
127 retries: None,
128 parent_id: None,
129 root_id: None,
130 group_id: None,
131 callbacks: Vec::new(),
132 errbacks: Vec::new(),
133 chain: Vec::new(),
134 chord: None,
135 content_type: ContentType::Json,
136 persistent: true,
137 reply_to: None,
138 extra_headers: HashMap::new(),
139 }
140 }
141
142 #[must_use]
144 pub fn id(mut self, id: Uuid) -> Self {
145 self.task_id = Some(id);
146 self
147 }
148
149 #[must_use]
151 pub fn args(mut self, args: Vec<Value>) -> Self {
152 self.args = args;
153 self
154 }
155
156 #[must_use]
158 pub fn arg(mut self, arg: Value) -> Self {
159 self.args.push(arg);
160 self
161 }
162
163 #[must_use]
165 pub fn kwargs(mut self, kwargs: HashMap<String, Value>) -> Self {
166 self.kwargs = kwargs;
167 self
168 }
169
170 #[must_use]
172 pub fn kwarg(mut self, key: impl Into<String>, value: Value) -> Self {
173 self.kwargs.insert(key.into(), value);
174 self
175 }
176
177 #[must_use]
179 pub fn priority(mut self, priority: u8) -> Self {
180 self.priority = Some(priority.min(9));
181 self
182 }
183
184 #[must_use]
186 pub fn queue(mut self, queue: impl Into<String>) -> Self {
187 self.queue = Some(queue.into());
188 self
189 }
190
191 #[must_use]
193 pub fn routing_key(mut self, key: impl Into<String>) -> Self {
194 self.routing_key = Some(key.into());
195 self
196 }
197
198 #[must_use]
200 pub fn eta(mut self, eta: DateTime<Utc>) -> Self {
201 self.eta = Some(eta);
202 self.countdown = None; self
204 }
205
206 #[must_use]
208 pub fn countdown(mut self, seconds: i64) -> Self {
209 self.countdown = Some(seconds);
210 self.eta = None; self
212 }
213
214 #[must_use]
216 pub fn expires(mut self, expires: DateTime<Utc>) -> Self {
217 self.expires = Some(expires);
218 self
219 }
220
221 #[must_use]
223 pub fn expires_in(mut self, duration: Duration) -> Self {
224 self.expires = Some(Utc::now() + duration);
225 self
226 }
227
228 #[must_use]
230 pub fn max_retries(mut self, max: u32) -> Self {
231 self.max_retries = Some(max);
232 self
233 }
234
235 #[must_use]
237 pub fn retries(mut self, count: u32) -> Self {
238 self.retries = Some(count);
239 self
240 }
241
242 #[must_use]
244 pub fn parent(mut self, parent_id: Uuid) -> Self {
245 self.parent_id = Some(parent_id);
246 self
247 }
248
249 #[must_use]
251 pub fn root(mut self, root_id: Uuid) -> Self {
252 self.root_id = Some(root_id);
253 self
254 }
255
256 #[must_use]
258 pub fn group(mut self, group_id: Uuid) -> Self {
259 self.group_id = Some(group_id);
260 self
261 }
262
263 #[must_use]
265 pub fn link(mut self, task: impl Into<String>) -> Self {
266 self.callbacks.push(CallbackSignature::new(task));
267 self
268 }
269
270 #[must_use]
272 pub fn link_signature(mut self, callback: CallbackSignature) -> Self {
273 self.callbacks.push(callback);
274 self
275 }
276
277 #[must_use]
279 pub fn link_error(mut self, task: impl Into<String>) -> Self {
280 self.errbacks.push(CallbackSignature::new(task));
281 self
282 }
283
284 #[must_use]
286 pub fn link_error_signature(mut self, errback: CallbackSignature) -> Self {
287 self.errbacks.push(errback);
288 self
289 }
290
291 #[must_use]
293 pub fn chain_task(mut self, task: impl Into<String>) -> Self {
294 self.chain.push(CallbackSignature::new(task));
295 self
296 }
297
298 #[must_use]
300 pub fn chord(mut self, callback: impl Into<String>) -> Self {
301 self.chord = Some(CallbackSignature::new(callback));
302 self
303 }
304
305 #[must_use]
307 pub fn content_type(mut self, ct: ContentType) -> Self {
308 self.content_type = ct;
309 self
310 }
311
312 #[must_use]
314 pub fn persistent(mut self, persistent: bool) -> Self {
315 self.persistent = persistent;
316 self
317 }
318
319 #[must_use]
321 pub fn reply_to(mut self, queue: impl Into<String>) -> Self {
322 self.reply_to = Some(queue.into());
323 self
324 }
325
326 #[must_use]
328 pub fn header(mut self, key: impl Into<String>, value: Value) -> Self {
329 self.extra_headers.insert(key.into(), value);
330 self
331 }
332
333 pub fn build(self) -> BuilderResult<Message> {
335 let task_id = self.task_id.unwrap_or_else(Uuid::new_v4);
337
338 let eta = match (self.eta, self.countdown) {
340 (Some(eta), _) => Some(eta),
341 (None, Some(seconds)) => Some(Utc::now() + Duration::seconds(seconds)),
342 _ => None,
343 };
344
345 let mut embed = EmbedOptions::new();
347 for cb in self.callbacks {
348 embed = embed.with_callback(cb);
349 }
350 for eb in self.errbacks {
351 embed = embed.with_errback(eb);
352 }
353 for chain_task in self.chain {
354 embed = embed.with_chain_task(chain_task);
355 }
356 if let Some(chord) = self.chord {
357 embed = embed.with_chord(chord);
358 }
359 if let Some(group_id) = self.group_id {
360 embed = embed.with_group(group_id);
361 }
362 if let Some(parent_id) = self.parent_id {
363 embed = embed.with_parent(parent_id);
364 }
365 if let Some(root_id) = self.root_id {
366 embed = embed.with_root(root_id);
367 }
368
369 let embedded_body = EmbeddedBody::new()
371 .with_args(self.args)
372 .with_kwargs(self.kwargs)
373 .with_embed(embed);
374
375 let body = embedded_body
377 .encode()
378 .map_err(|e| BuilderError::SerializationError(e.to_string()))?;
379
380 let mut headers = MessageHeaders::new(self.task.clone(), task_id);
382 headers.eta = eta;
383 headers.expires = self.expires;
384 headers.retries = self.retries;
385 headers.parent_id = self.parent_id;
386 headers.root_id = self.root_id;
387 headers.group = self.group_id;
388
389 for (key, value) in self.extra_headers {
391 headers.extra.insert(key, value);
392 }
393
394 let properties = MessageProperties {
396 priority: self.priority,
397 delivery_mode: if self.persistent { 2 } else { 1 },
398 correlation_id: Some(task_id.to_string()),
399 reply_to: self.reply_to,
400 };
401
402 let message = Message {
404 headers,
405 properties,
406 body,
407 content_type: self.content_type.as_str().to_string(),
408 content_encoding: ContentEncoding::Utf8.as_str().to_string(),
409 };
410
411 Ok(message)
412 }
413
414 pub fn build_validated(self) -> BuilderResult<Message> {
416 let message = self.build()?;
417 message.validate().map_err(BuilderError::from)?;
418 Ok(message)
419 }
420}
421
422pub fn task(name: impl Into<String>) -> MessageBuilder {
424 MessageBuilder::new(name)
425}
426
427pub fn task_with_args(name: impl Into<String>, args: Vec<Value>) -> MessageBuilder {
429 MessageBuilder::new(name).args(args)
430}
431
432pub fn delayed_task(name: impl Into<String>, countdown_seconds: i64) -> MessageBuilder {
434 MessageBuilder::new(name).countdown(countdown_seconds)
435}
436
437pub fn scheduled_task(name: impl Into<String>, eta: DateTime<Utc>) -> MessageBuilder {
439 MessageBuilder::new(name).eta(eta)
440}
441
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Positional arguments alone produce a named message with a non-empty body.
    #[test]
    fn test_basic_message_builder() {
        let positional = vec![json!(1), json!(2)];
        let built = MessageBuilder::new("tasks.add")
            .args(positional)
            .build()
            .unwrap();

        assert_eq!(built.task_name(), "tasks.add");
        assert!(!built.body.is_empty());
    }

    /// An explicitly supplied id is the id reported by the built message.
    #[test]
    fn test_message_builder_with_id() {
        let wanted = Uuid::new_v4();
        let builder = MessageBuilder::new("tasks.test").id(wanted);
        let built = builder.build().unwrap();

        assert_eq!(built.task_id(), wanted);
    }

    /// A priority within range is stored unchanged.
    #[test]
    fn test_message_builder_with_priority() {
        let builder = MessageBuilder::new("tasks.test").priority(9);
        let built = builder.build().unwrap();

        assert_eq!(built.properties.priority, Some(9));
    }

    /// A priority above the limit is clamped to 9.
    #[test]
    fn test_message_builder_with_priority_capped() {
        let builder = MessageBuilder::new("tasks.test").priority(100);
        let built = builder.build().unwrap();

        assert_eq!(built.properties.priority, Some(9));
    }

    /// A countdown results in a message that carries an ETA.
    #[test]
    fn test_message_builder_with_countdown() {
        let builder = MessageBuilder::new("tasks.test").countdown(60);
        let built = builder.build().unwrap();

        assert!(built.has_eta());
    }

    /// An absolute ETA is carried through to the headers unchanged.
    #[test]
    fn test_message_builder_with_eta() {
        let when = Utc::now() + Duration::hours(1);
        let builder = MessageBuilder::new("tasks.test").eta(when);
        let built = builder.build().unwrap();

        assert!(built.has_eta());
        assert_eq!(built.headers.eta, Some(when));
    }

    /// An absolute expiry results in a message that reports an expiry.
    #[test]
    fn test_message_builder_with_expires() {
        let deadline = Utc::now() + Duration::days(1);
        let builder = MessageBuilder::new("tasks.test").expires(deadline);
        let built = builder.build().unwrap();

        assert!(built.has_expires());
    }

    /// A relative expiry results in a message that reports an expiry.
    #[test]
    fn test_message_builder_with_expires_in() {
        let builder = MessageBuilder::new("tasks.test").expires_in(Duration::hours(2));
        let built = builder.build().unwrap();

        assert!(built.has_expires());
    }

    /// Bulk and single keyword arguments can be combined.
    #[test]
    fn test_message_builder_with_kwargs() {
        let initial: HashMap<String, Value> = HashMap::from([("x".to_string(), json!(10))]);

        let built = MessageBuilder::new("tasks.test")
            .kwargs(initial)
            .kwarg("y", json!(20))
            .build()
            .unwrap();

        assert!(!built.body.is_empty());
    }

    /// Callbacks and error callbacks can be attached by task name.
    #[test]
    fn test_message_builder_with_link() {
        let builder = MessageBuilder::new("tasks.first")
            .link("tasks.second")
            .link_error("tasks.on_error");
        let built = builder.build().unwrap();

        assert!(!built.body.is_empty());
    }

    /// Several chain entries can be attached.
    #[test]
    fn test_message_builder_with_chain() {
        let builder = MessageBuilder::new("tasks.step1")
            .chain_task("tasks.step2")
            .chain_task("tasks.step3");
        let built = builder.build().unwrap();

        assert!(!built.body.is_empty());
    }

    /// Parent, root and group ids all reach the headers.
    #[test]
    fn test_message_builder_with_workflow_ids() {
        let (parent, root, group) = (Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4());

        let built = MessageBuilder::new("tasks.test")
            .parent(parent)
            .root(root)
            .group(group)
            .build()
            .unwrap();

        assert_eq!(built.headers.parent_id, Some(parent));
        assert_eq!(built.headers.root_id, Some(root));
        assert_eq!(built.headers.group, Some(group));
    }

    /// Disabling persistence selects delivery mode 1.
    #[test]
    fn test_message_builder_non_persistent() {
        let builder = MessageBuilder::new("tasks.test").persistent(false);
        let built = builder.build().unwrap();

        assert_eq!(built.properties.delivery_mode, 1);
    }

    /// The reply-to queue is stored in the message properties.
    #[test]
    fn test_message_builder_with_reply_to() {
        let builder = MessageBuilder::new("tasks.test").reply_to("results-queue");
        let built = builder.build().unwrap();

        let expected = Some("results-queue".to_string());
        assert_eq!(built.properties.reply_to, expected);
    }

    /// Extra headers end up in the headers' `extra` map.
    #[test]
    fn test_message_builder_with_extra_header() {
        let builder = MessageBuilder::new("tasks.test").header("custom", json!("value"));
        let built = builder.build().unwrap();

        assert_eq!(built.headers.extra.get("custom"), Some(&json!("value")));
    }

    /// The `task` helper behaves like `MessageBuilder::new`.
    #[test]
    fn test_task_helper() {
        let builder = task("tasks.add").arg(json!(1)).arg(json!(2));
        let built = builder.build().unwrap();

        assert_eq!(built.task_name(), "tasks.add");
    }

    /// The `task_with_args` helper builds a message for the named task.
    #[test]
    fn test_task_with_args_helper() {
        let builder = task_with_args("tasks.add", vec![json!(1), json!(2)]);
        let built = builder.build().unwrap();

        assert_eq!(built.task_name(), "tasks.add");
    }

    /// The `delayed_task` helper yields a message with an ETA.
    #[test]
    fn test_delayed_task_helper() {
        let builder = delayed_task("tasks.later", 300);
        let built = builder.build().unwrap();

        assert!(built.has_eta());
    }

    /// The `scheduled_task` helper yields a message with an ETA.
    #[test]
    fn test_scheduled_task_helper() {
        let when = Utc::now() + Duration::hours(1);
        let builder = scheduled_task("tasks.scheduled", when);
        let built = builder.build().unwrap();

        assert!(built.has_eta());
    }

    /// `build_validated` succeeds for a simple, well-formed task.
    #[test]
    fn test_build_validated() {
        let builder = MessageBuilder::new("tasks.test").args(vec![json!(1)]);
        let built = builder.build_validated().unwrap();

        assert_eq!(built.task_name(), "tasks.test");
    }

    /// Each error variant renders its fixed text or embeds its detail string.
    #[test]
    fn test_builder_error_display() {
        let missing = BuilderError::MissingTaskName.to_string();
        assert_eq!(missing, "Task name is required");

        let serialization = BuilderError::SerializationError("test".to_string()).to_string();
        assert!(serialization.contains("test"));

        let validation = BuilderError::ValidationError("invalid".to_string()).to_string();
        assert!(validation.contains("invalid"));
    }
}