1pub mod auth;
49pub mod batch;
50pub mod builder;
51pub mod compat;
52pub mod compression;
53pub mod crypto;
54pub mod dedup;
55pub mod embed;
56pub mod event;
57pub mod extension_api;
58pub mod extensions;
59pub mod lazy;
60pub mod middleware;
61pub mod migration;
62pub mod negotiation;
63pub mod pool;
64pub mod priority_queue;
65pub mod result;
66pub mod retry;
67pub mod routing;
68pub mod security;
69pub mod serializer;
70pub mod utils;
71pub mod workflow;
72pub mod zerocopy;
73
74use chrono::{DateTime, Utc};
75use serde::{Deserialize, Serialize};
76use std::collections::HashMap;
77use std::fmt;
78use uuid::Uuid;
79
80pub(crate) const CONTENT_TYPE_JSON: &str = "application/json";
82#[cfg(feature = "msgpack")]
83pub(crate) const CONTENT_TYPE_MSGPACK: &str = "application/x-msgpack";
84#[cfg(feature = "binary")]
85pub(crate) const CONTENT_TYPE_BINARY: &str = "application/octet-stream";
86
87pub(crate) const ENCODING_UTF8: &str = "utf-8";
89pub(crate) const ENCODING_BINARY: &str = "binary";
90
91pub(crate) const DEFAULT_LANG: &str = "rust";
93
94#[derive(Debug, Clone, PartialEq, Eq, Hash)]
115pub enum ValidationError {
116 EmptyTaskName,
118 RetryLimitExceeded { retries: u32, max: u32 },
120 EtaAfterExpiration,
122 InvalidDeliveryMode { mode: u8 },
124 InvalidPriority { priority: u8 },
126 EmptyContentType,
128 EmptyBody,
130 BodyTooLarge { size: usize, max: usize },
132}
133
134impl fmt::Display for ValidationError {
135 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
136 match self {
137 ValidationError::EmptyTaskName => write!(f, "Task name cannot be empty"),
138 ValidationError::RetryLimitExceeded { retries, max } => {
139 write!(f, "Retries ({}) cannot exceed {}", retries, max)
140 }
141 ValidationError::EtaAfterExpiration => {
142 write!(f, "ETA cannot be after expiration time")
143 }
144 ValidationError::InvalidDeliveryMode { mode } => {
145 write!(
146 f,
147 "Invalid delivery mode ({}): must be 1 (non-persistent) or 2 (persistent)",
148 mode
149 )
150 }
151 ValidationError::InvalidPriority { priority } => {
152 write!(
153 f,
154 "Invalid priority ({}): must be between 0 and 9",
155 priority
156 )
157 }
158 ValidationError::EmptyContentType => write!(f, "Content type cannot be empty"),
159 ValidationError::EmptyBody => write!(f, "Message body cannot be empty"),
160 ValidationError::BodyTooLarge { size, max } => {
161 write!(
162 f,
163 "Message body too large: {} bytes (max {} bytes)",
164 size, max
165 )
166 }
167 }
168 }
169}
170
171impl std::error::Error for ValidationError {}
172
173#[derive(
175 Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize,
176)]
177pub enum ProtocolVersion {
178 #[default]
180 V2,
181 V5,
183}
184
185impl std::fmt::Display for ProtocolVersion {
186 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
187 match self {
188 ProtocolVersion::V2 => write!(f, "v2"),
189 ProtocolVersion::V5 => write!(f, "v5"),
190 }
191 }
192}
193
194impl std::str::FromStr for ProtocolVersion {
195 type Err = String;
196
197 fn from_str(s: &str) -> Result<Self, Self::Err> {
198 match s.to_lowercase().as_str() {
199 "v2" | "2" => Ok(ProtocolVersion::V2),
200 "v5" | "5" => Ok(ProtocolVersion::V5),
201 _ => Err(format!("Invalid protocol version: {}", s)),
202 }
203 }
204}
205
206impl ProtocolVersion {
207 #[inline]
209 pub const fn is_v2(self) -> bool {
210 matches!(self, ProtocolVersion::V2)
211 }
212
213 #[inline]
215 pub const fn is_v5(self) -> bool {
216 matches!(self, ProtocolVersion::V5)
217 }
218
219 #[inline]
221 pub const fn as_u8(self) -> u8 {
222 match self {
223 ProtocolVersion::V2 => 2,
224 ProtocolVersion::V5 => 5,
225 }
226 }
227
228 #[inline]
230 pub const fn as_number_str(self) -> &'static str {
231 match self {
232 ProtocolVersion::V2 => "2",
233 ProtocolVersion::V5 => "5",
234 }
235 }
236}
237
238#[derive(Debug, Clone, Default, PartialEq, Eq, Hash, Serialize, Deserialize)]
240pub enum ContentType {
241 #[default]
243 Json,
244 #[cfg(feature = "msgpack")]
246 MessagePack,
247 #[cfg(feature = "binary")]
249 Binary,
250 Custom(String),
252}
253
254impl ContentType {
255 #[inline]
256 pub fn as_str(&self) -> &str {
257 match self {
258 ContentType::Json => CONTENT_TYPE_JSON,
259 #[cfg(feature = "msgpack")]
260 ContentType::MessagePack => CONTENT_TYPE_MSGPACK,
261 #[cfg(feature = "binary")]
262 ContentType::Binary => CONTENT_TYPE_BINARY,
263 ContentType::Custom(s) => s,
264 }
265 }
266}
267
268impl std::fmt::Display for ContentType {
269 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270 write!(f, "{}", self.as_str())
271 }
272}
273
274impl std::str::FromStr for ContentType {
275 type Err = String;
276
277 fn from_str(s: &str) -> Result<Self, Self::Err> {
278 match s {
279 CONTENT_TYPE_JSON => Ok(ContentType::Json),
280 #[cfg(feature = "msgpack")]
281 CONTENT_TYPE_MSGPACK => Ok(ContentType::MessagePack),
282 #[cfg(feature = "binary")]
283 CONTENT_TYPE_BINARY => Ok(ContentType::Binary),
284 other => Ok(ContentType::Custom(other.to_string())),
285 }
286 }
287}
288
289impl From<&str> for ContentType {
290 fn from(s: &str) -> Self {
291 match s {
292 CONTENT_TYPE_JSON => ContentType::Json,
293 #[cfg(feature = "msgpack")]
294 CONTENT_TYPE_MSGPACK => ContentType::MessagePack,
295 #[cfg(feature = "binary")]
296 CONTENT_TYPE_BINARY => ContentType::Binary,
297 other => ContentType::Custom(other.to_string()),
298 }
299 }
300}
301
302impl AsRef<str> for ContentType {
303 fn as_ref(&self) -> &str {
304 self.as_str()
305 }
306}
307
308#[derive(Debug, Clone, Default, PartialEq, Eq, Hash, Serialize, Deserialize)]
310pub enum ContentEncoding {
311 #[default]
313 Utf8,
314 Binary,
316 Custom(String),
318}
319
320impl ContentEncoding {
321 #[inline]
322 pub fn as_str(&self) -> &str {
323 match self {
324 ContentEncoding::Utf8 => ENCODING_UTF8,
325 ContentEncoding::Binary => ENCODING_BINARY,
326 ContentEncoding::Custom(s) => s,
327 }
328 }
329}
330
331impl std::fmt::Display for ContentEncoding {
332 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
333 write!(f, "{}", self.as_str())
334 }
335}
336
337impl std::str::FromStr for ContentEncoding {
338 type Err = String;
339
340 fn from_str(s: &str) -> Result<Self, Self::Err> {
341 match s {
342 ENCODING_UTF8 => Ok(ContentEncoding::Utf8),
343 ENCODING_BINARY => Ok(ContentEncoding::Binary),
344 other => Ok(ContentEncoding::Custom(other.to_string())),
345 }
346 }
347}
348
349impl From<&str> for ContentEncoding {
350 fn from(s: &str) -> Self {
351 match s {
352 ENCODING_UTF8 => ContentEncoding::Utf8,
353 ENCODING_BINARY => ContentEncoding::Binary,
354 other => ContentEncoding::Custom(other.to_string()),
355 }
356 }
357}
358
359impl AsRef<str> for ContentEncoding {
360 fn as_ref(&self) -> &str {
361 self.as_str()
362 }
363}
364
365#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
367pub struct MessageHeaders {
368 pub task: String,
370
371 pub id: Uuid,
373
374 #[serde(default = "default_lang")]
376 pub lang: String,
377
378 #[serde(skip_serializing_if = "Option::is_none")]
380 pub root_id: Option<Uuid>,
381
382 #[serde(skip_serializing_if = "Option::is_none")]
384 pub parent_id: Option<Uuid>,
385
386 #[serde(skip_serializing_if = "Option::is_none")]
388 pub group: Option<Uuid>,
389
390 #[serde(skip_serializing_if = "Option::is_none")]
392 pub retries: Option<u32>,
393
394 #[serde(skip_serializing_if = "Option::is_none")]
396 pub eta: Option<DateTime<Utc>>,
397
398 #[serde(skip_serializing_if = "Option::is_none")]
400 pub expires: Option<DateTime<Utc>>,
401
402 #[serde(flatten)]
404 pub extra: HashMap<String, serde_json::Value>,
405}
406
407fn default_lang() -> String {
408 DEFAULT_LANG.to_string()
409}
410
411impl MessageHeaders {
412 pub fn new(task: String, id: Uuid) -> Self {
413 Self {
414 task,
415 id,
416 lang: default_lang(),
417 root_id: None,
418 parent_id: None,
419 group: None,
420 retries: None,
421 eta: None,
422 expires: None,
423 extra: HashMap::new(),
424 }
425 }
426
427 #[must_use]
429 pub fn with_lang(mut self, lang: String) -> Self {
430 self.lang = lang;
431 self
432 }
433
434 #[must_use]
436 pub fn with_root_id(mut self, root_id: Uuid) -> Self {
437 self.root_id = Some(root_id);
438 self
439 }
440
441 #[must_use]
443 pub fn with_parent_id(mut self, parent_id: Uuid) -> Self {
444 self.parent_id = Some(parent_id);
445 self
446 }
447
448 #[must_use]
450 pub fn with_group(mut self, group: Uuid) -> Self {
451 self.group = Some(group);
452 self
453 }
454
455 #[must_use]
457 pub fn with_retries(mut self, retries: u32) -> Self {
458 self.retries = Some(retries);
459 self
460 }
461
462 #[must_use]
464 pub fn with_eta(mut self, eta: DateTime<Utc>) -> Self {
465 self.eta = Some(eta);
466 self
467 }
468
469 #[must_use]
471 pub fn with_expires(mut self, expires: DateTime<Utc>) -> Self {
472 self.expires = Some(expires);
473 self
474 }
475
476 pub fn validate(&self) -> Result<(), ValidationError> {
478 if self.task.is_empty() {
479 return Err(ValidationError::EmptyTaskName);
480 }
481
482 if let Some(retries) = self.retries {
483 if retries > 1000 {
484 return Err(ValidationError::RetryLimitExceeded { retries, max: 1000 });
485 }
486 }
487
488 if let (Some(eta), Some(expires)) = (self.eta, self.expires) {
490 if eta > expires {
491 return Err(ValidationError::EtaAfterExpiration);
492 }
493 }
494
495 Ok(())
496 }
497}
498
499#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
501pub struct MessageProperties {
502 #[serde(skip_serializing_if = "Option::is_none")]
504 pub correlation_id: Option<String>,
505
506 #[serde(skip_serializing_if = "Option::is_none")]
508 pub reply_to: Option<String>,
509
510 #[serde(default = "default_delivery_mode")]
512 pub delivery_mode: u8,
513
514 #[serde(skip_serializing_if = "Option::is_none")]
516 pub priority: Option<u8>,
517}
518
519const fn default_delivery_mode() -> u8 {
520 2 }
522
523impl Default for MessageProperties {
524 fn default() -> Self {
525 Self {
526 correlation_id: None,
527 reply_to: None,
528 delivery_mode: default_delivery_mode(),
529 priority: None,
530 }
531 }
532}
533
534impl MessageProperties {
535 pub fn new() -> Self {
537 Self::default()
538 }
539
540 #[must_use]
542 pub fn with_correlation_id(mut self, correlation_id: String) -> Self {
543 self.correlation_id = Some(correlation_id);
544 self
545 }
546
547 #[must_use]
549 pub fn with_reply_to(mut self, reply_to: String) -> Self {
550 self.reply_to = Some(reply_to);
551 self
552 }
553
554 #[must_use]
556 pub fn with_delivery_mode(mut self, delivery_mode: u8) -> Self {
557 self.delivery_mode = delivery_mode;
558 self
559 }
560
561 #[must_use]
563 pub fn with_priority(mut self, priority: u8) -> Self {
564 self.priority = Some(priority);
565 self
566 }
567
568 pub fn validate(&self) -> Result<(), ValidationError> {
570 if self.delivery_mode != 1 && self.delivery_mode != 2 {
571 return Err(ValidationError::InvalidDeliveryMode {
572 mode: self.delivery_mode,
573 });
574 }
575
576 if let Some(priority) = self.priority {
577 if priority > 9 {
578 return Err(ValidationError::InvalidPriority { priority });
579 }
580 }
581
582 Ok(())
583 }
584}
585
586#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
588pub struct Message {
589 pub headers: MessageHeaders,
591
592 pub properties: MessageProperties,
594
595 #[serde(with = "serde_bytes_opt")]
597 pub body: Vec<u8>,
598
599 #[serde(rename = "content-type")]
601 pub content_type: String,
602
603 #[serde(rename = "content-encoding")]
605 pub content_encoding: String,
606}
607
608mod serde_bytes_opt {
610 use base64::Engine;
611 use serde::de::Error;
612 use serde::{Deserialize, Deserializer, Serializer};
613
614 pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
615 where
616 S: Serializer,
617 {
618 let encoded = base64::engine::general_purpose::STANDARD.encode(bytes);
620 serializer.serialize_str(&encoded)
621 }
622
623 pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
624 where
625 D: Deserializer<'de>,
626 {
627 let s = String::deserialize(deserializer)?;
628 base64::engine::general_purpose::STANDARD
629 .decode(&s)
630 .map_err(Error::custom)
631 }
632}
633
634impl Message {
635 pub fn new(task: String, id: Uuid, body: Vec<u8>) -> Self {
637 Self {
638 headers: MessageHeaders::new(task, id),
639 properties: MessageProperties::default(),
640 body,
641 content_type: CONTENT_TYPE_JSON.to_string(),
642 content_encoding: ENCODING_UTF8.to_string(),
643 }
644 }
645
646 #[must_use]
648 pub fn with_priority(mut self, priority: u8) -> Self {
649 self.properties.priority = Some(priority);
650 self
651 }
652
653 #[must_use]
655 pub fn with_parent(mut self, parent_id: Uuid) -> Self {
656 self.headers.parent_id = Some(parent_id);
657 self
658 }
659
660 #[must_use]
662 pub fn with_root(mut self, root_id: Uuid) -> Self {
663 self.headers.root_id = Some(root_id);
664 self
665 }
666
667 #[must_use]
669 pub fn with_group(mut self, group: Uuid) -> Self {
670 self.headers.group = Some(group);
671 self
672 }
673
674 #[must_use]
676 pub fn with_eta(mut self, eta: DateTime<Utc>) -> Self {
677 self.headers.eta = Some(eta);
678 self
679 }
680
681 #[must_use]
683 pub fn with_expires(mut self, expires: DateTime<Utc>) -> Self {
684 self.headers.expires = Some(expires);
685 self
686 }
687
688 #[must_use]
690 pub fn with_retries(mut self, retries: u32) -> Self {
691 self.headers.retries = Some(retries);
692 self
693 }
694
695 #[must_use]
697 pub fn with_correlation_id(mut self, correlation_id: String) -> Self {
698 self.properties.correlation_id = Some(correlation_id);
699 self
700 }
701
702 #[must_use]
704 pub fn with_reply_to(mut self, reply_to: String) -> Self {
705 self.properties.reply_to = Some(reply_to);
706 self
707 }
708
709 #[must_use]
711 pub fn with_delivery_mode(mut self, mode: u8) -> Self {
712 self.properties.delivery_mode = mode;
713 self
714 }
715
716 pub fn validate(&self) -> Result<(), ValidationError> {
724 self.headers.validate()?;
726
727 self.properties.validate()?;
729
730 if self.content_type.is_empty() {
732 return Err(ValidationError::EmptyContentType);
733 }
734
735 if self.body.is_empty() {
737 return Err(ValidationError::EmptyBody);
738 }
739
740 if self.body.len() > 10_485_760 {
741 return Err(ValidationError::BodyTooLarge {
743 size: self.body.len(),
744 max: 10_485_760,
745 });
746 }
747
748 Ok(())
749 }
750
751 pub fn validate_with_limit(&self, max_body_bytes: usize) -> Result<(), ValidationError> {
753 self.headers.validate()?;
754 self.properties.validate()?;
755
756 if self.content_type.is_empty() {
757 return Err(ValidationError::EmptyContentType);
758 }
759
760 if self.body.is_empty() {
761 return Err(ValidationError::EmptyBody);
762 }
763
764 if self.body.len() > max_body_bytes {
765 return Err(ValidationError::BodyTooLarge {
766 size: self.body.len(),
767 max: max_body_bytes,
768 });
769 }
770
771 Ok(())
772 }
773
774 #[inline(always)]
776 pub fn has_eta(&self) -> bool {
777 self.headers.eta.is_some()
778 }
779
780 #[inline(always)]
782 pub fn has_expires(&self) -> bool {
783 self.headers.expires.is_some()
784 }
785
786 #[inline(always)]
788 pub fn has_group(&self) -> bool {
789 self.headers.group.is_some()
790 }
791
792 #[inline(always)]
794 pub fn has_parent(&self) -> bool {
795 self.headers.parent_id.is_some()
796 }
797
798 #[inline(always)]
800 pub fn has_root(&self) -> bool {
801 self.headers.root_id.is_some()
802 }
803
804 #[inline(always)]
806 pub fn is_persistent(&self) -> bool {
807 self.properties.delivery_mode == 2
808 }
809
810 #[inline(always)]
812 pub fn task_id(&self) -> uuid::Uuid {
813 self.headers.id
814 }
815
816 #[inline(always)]
818 pub fn task_name(&self) -> &str {
819 &self.headers.task
820 }
821
822 #[inline(always)]
824 pub fn content_type_str(&self) -> &str {
825 &self.content_type
826 }
827
828 #[inline(always)]
830 pub fn content_encoding_str(&self) -> &str {
831 &self.content_encoding
832 }
833
834 #[inline(always)]
836 pub fn body_size(&self) -> usize {
837 self.body.len()
838 }
839
840 #[inline(always)]
842 pub fn has_empty_body(&self) -> bool {
843 self.body.is_empty()
844 }
845
846 #[inline(always)]
848 pub fn retry_count(&self) -> u32 {
849 self.headers.retries.unwrap_or(0)
850 }
851
852 #[inline(always)]
854 pub fn priority(&self) -> Option<u8> {
855 self.properties.priority
856 }
857
858 #[inline(always)]
860 pub fn has_correlation_id(&self) -> bool {
861 self.properties.correlation_id.is_some()
862 }
863
864 #[inline]
866 pub fn correlation_id(&self) -> Option<&str> {
867 self.properties.correlation_id.as_deref()
868 }
869
870 #[inline]
872 pub fn reply_to(&self) -> Option<&str> {
873 self.properties.reply_to.as_deref()
874 }
875
876 #[inline(always)]
878 pub fn is_workflow_message(&self) -> bool {
879 self.has_parent() || self.has_root() || self.has_group()
880 }
881
882 #[must_use]
884 pub fn with_new_id(&self) -> Self {
885 let mut cloned = self.clone();
886 cloned.headers.id = Uuid::new_v4();
887 cloned
888 }
889
890 pub fn to_builder(&self) -> crate::builder::MessageBuilder {
895 let mut builder = crate::builder::MessageBuilder::new(&self.headers.task);
896
897 builder = builder.id(self.headers.id);
899
900 if let Some(priority) = self.properties.priority {
902 builder = builder.priority(priority);
903 }
904 if let Some(parent_id) = self.headers.parent_id {
905 builder = builder.parent(parent_id);
906 }
907 if let Some(root_id) = self.headers.root_id {
908 builder = builder.root(root_id);
909 }
910 if let Some(group) = self.headers.group {
911 builder = builder.group(group);
912 }
913 if let Some(eta) = self.headers.eta {
914 builder = builder.eta(eta);
915 }
916 if let Some(expires) = self.headers.expires {
917 builder = builder.expires(expires);
918 }
919
920 builder
921 }
922
923 #[inline]
925 pub fn is_ready_for_execution(&self) -> bool {
926 match self.headers.eta {
927 None => true,
928 Some(eta) => chrono::Utc::now() >= eta,
929 }
930 }
931
932 #[inline]
934 pub fn is_not_expired(&self) -> bool {
935 match self.headers.expires {
936 None => true,
937 Some(expires) => chrono::Utc::now() < expires,
938 }
939 }
940
941 #[inline]
943 pub fn should_process(&self) -> bool {
944 self.is_ready_for_execution() && self.is_not_expired()
945 }
946
947 #[must_use]
961 pub fn with_eta_delay(mut self, delay: chrono::Duration) -> Self {
962 self.headers.eta = Some(chrono::Utc::now() + delay);
963 self
964 }
965
966 #[must_use]
980 pub fn with_expires_in(mut self, duration: chrono::Duration) -> Self {
981 self.headers.expires = Some(chrono::Utc::now() + duration);
982 self
983 }
984
985 #[inline]
987 pub fn time_until_eta(&self) -> Option<chrono::Duration> {
988 self.headers.eta.and_then(|eta| {
989 let now = chrono::Utc::now();
990 if eta > now {
991 Some(eta - now)
992 } else {
993 None
994 }
995 })
996 }
997
998 #[inline]
1000 pub fn time_until_expiration(&self) -> Option<chrono::Duration> {
1001 self.headers.expires.and_then(|expires| {
1002 let now = chrono::Utc::now();
1003 if expires > now {
1004 Some(expires - now)
1005 } else {
1006 None
1007 }
1008 })
1009 }
1010
1011 pub fn increment_retry(&mut self) -> u32 {
1013 let new_count = self.headers.retries.unwrap_or(0) + 1;
1014 self.headers.retries = Some(new_count);
1015 new_count
1016 }
1017}
1018
1019#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
1021pub struct TaskArgs {
1022 #[serde(default)]
1024 pub args: Vec<serde_json::Value>,
1025
1026 #[serde(default)]
1028 pub kwargs: HashMap<String, serde_json::Value>,
1029}
1030
1031impl TaskArgs {
1032 pub fn new() -> Self {
1034 Self::default()
1035 }
1036
1037 #[must_use]
1039 pub fn with_args(mut self, args: Vec<serde_json::Value>) -> Self {
1040 self.args = args;
1041 self
1042 }
1043
1044 #[must_use]
1046 pub fn with_kwargs(mut self, kwargs: HashMap<String, serde_json::Value>) -> Self {
1047 self.kwargs = kwargs;
1048 self
1049 }
1050
1051 pub fn add_arg(&mut self, arg: serde_json::Value) {
1053 self.args.push(arg);
1054 }
1055
1056 pub fn add_kwarg(&mut self, key: String, value: serde_json::Value) {
1058 self.kwargs.insert(key, value);
1059 }
1060
1061 #[inline(always)]
1063 pub fn is_empty(&self) -> bool {
1064 self.args.is_empty() && self.kwargs.is_empty()
1065 }
1066
1067 #[inline(always)]
1069 pub fn len(&self) -> usize {
1070 self.args.len() + self.kwargs.len()
1071 }
1072
1073 #[inline(always)]
1075 pub fn has_args(&self) -> bool {
1076 !self.args.is_empty()
1077 }
1078
1079 #[inline(always)]
1081 pub fn has_kwargs(&self) -> bool {
1082 !self.kwargs.is_empty()
1083 }
1084
1085 pub fn clear(&mut self) {
1087 self.args.clear();
1088 self.kwargs.clear();
1089 }
1090
1091 #[inline]
1093 pub fn get_arg(&self, index: usize) -> Option<&serde_json::Value> {
1094 self.args.get(index)
1095 }
1096
1097 #[inline]
1099 pub fn get_kwarg(&self, key: &str) -> Option<&serde_json::Value> {
1100 self.kwargs.get(key)
1101 }
1102
1103 pub fn from_json(json: &str) -> Result<Self, serde_json::Error> {
1105 serde_json::from_str(json)
1106 }
1107
1108 pub fn to_json(&self) -> Result<String, serde_json::Error> {
1110 serde_json::to_string(self)
1111 }
1112
1113 pub fn to_json_pretty(&self) -> Result<String, serde_json::Error> {
1115 serde_json::to_string_pretty(self)
1116 }
1117}
1118
1119impl std::ops::Index<usize> for TaskArgs {
1121 type Output = serde_json::Value;
1122
1123 #[inline]
1124 fn index(&self, index: usize) -> &Self::Output {
1125 &self.args[index]
1126 }
1127}
1128
1129impl std::ops::IndexMut<usize> for TaskArgs {
1131 #[inline]
1132 fn index_mut(&mut self, index: usize) -> &mut Self::Output {
1133 &mut self.args[index]
1134 }
1135}
1136
1137impl std::ops::Index<&str> for TaskArgs {
1139 type Output = serde_json::Value;
1140
1141 #[inline]
1142 fn index(&self, key: &str) -> &Self::Output {
1143 &self.kwargs[key]
1144 }
1145}
1146
1147impl IntoIterator for TaskArgs {
1149 type Item = serde_json::Value;
1150 type IntoIter = std::vec::IntoIter<serde_json::Value>;
1151
1152 fn into_iter(self) -> Self::IntoIter {
1153 self.args.into_iter()
1154 }
1155}
1156
1157impl<'a> IntoIterator for &'a TaskArgs {
1159 type Item = &'a serde_json::Value;
1160 type IntoIter = std::slice::Iter<'a, serde_json::Value>;
1161
1162 fn into_iter(self) -> Self::IntoIter {
1163 self.args.iter()
1164 }
1165}
1166
1167impl Extend<serde_json::Value> for TaskArgs {
1169 fn extend<T: IntoIterator<Item = serde_json::Value>>(&mut self, iter: T) {
1170 self.args.extend(iter);
1171 }
1172}
1173
1174impl Extend<(String, serde_json::Value)> for TaskArgs {
1176 fn extend<T: IntoIterator<Item = (String, serde_json::Value)>>(&mut self, iter: T) {
1177 self.kwargs.extend(iter);
1178 }
1179}
1180
1181impl FromIterator<serde_json::Value> for TaskArgs {
1183 fn from_iter<T: IntoIterator<Item = serde_json::Value>>(iter: T) -> Self {
1184 Self {
1185 args: iter.into_iter().collect(),
1186 kwargs: HashMap::new(),
1187 }
1188 }
1189}
1190
1191#[cfg(test)]
1192mod tests {
1193 use super::*;
1194
1195 #[test]
1196 fn test_message_creation() {
1197 let task_id = Uuid::new_v4();
1198 let body = serde_json::to_vec(&TaskArgs::new()).unwrap();
1199 let msg = Message::new("tasks.add".to_string(), task_id, body);
1200
1201 assert_eq!(msg.headers.task, "tasks.add");
1202 assert_eq!(msg.headers.id, task_id);
1203 assert_eq!(msg.headers.lang, "rust");
1204 assert_eq!(msg.content_type, "application/json");
1205 }
1206
1207 #[test]
1208 fn test_message_with_priority() {
1209 let task_id = Uuid::new_v4();
1210 let body = vec![];
1211 let msg = Message::new("tasks.test".to_string(), task_id, body).with_priority(9);
1212
1213 assert_eq!(msg.properties.priority, Some(9));
1214 }
1215
1216 #[test]
1217 fn test_task_args() {
1218 let args = TaskArgs::new().with_args(vec![serde_json::json!(1), serde_json::json!(2)]);
1219
1220 assert_eq!(args.args.len(), 2);
1221 assert_eq!(args.kwargs.len(), 0);
1222 }
1223
1224 #[test]
1225 fn test_protocol_version_default() {
1226 let version = ProtocolVersion::default();
1227 assert_eq!(version, ProtocolVersion::V2);
1228 }
1229
1230 #[test]
1231 fn test_protocol_version_display() {
1232 assert_eq!(ProtocolVersion::V2.to_string(), "v2");
1233 assert_eq!(ProtocolVersion::V5.to_string(), "v5");
1234 }
1235
1236 #[test]
1237 fn test_content_type_as_str() {
1238 assert_eq!(ContentType::Json.as_str(), "application/json");
1239 assert_eq!(
1240 ContentType::Custom("text/plain".to_string()).as_str(),
1241 "text/plain"
1242 );
1243 }
1244
1245 #[test]
1246 fn test_content_type_default() {
1247 let ct = ContentType::default();
1248 assert_eq!(ct, ContentType::Json);
1249 }
1250
1251 #[test]
1252 fn test_content_type_display() {
1253 assert_eq!(ContentType::Json.to_string(), "application/json");
1254 assert_eq!(
1255 ContentType::Custom("text/xml".to_string()).to_string(),
1256 "text/xml"
1257 );
1258 }
1259
1260 #[test]
1261 fn test_content_encoding_as_str() {
1262 assert_eq!(ContentEncoding::Utf8.as_str(), "utf-8");
1263 assert_eq!(ContentEncoding::Binary.as_str(), "binary");
1264 assert_eq!(ContentEncoding::Custom("gzip".to_string()).as_str(), "gzip");
1265 }
1266
1267 #[test]
1268 fn test_content_encoding_default() {
1269 let ce = ContentEncoding::default();
1270 assert_eq!(ce, ContentEncoding::Utf8);
1271 }
1272
1273 #[test]
1274 fn test_content_encoding_display() {
1275 assert_eq!(ContentEncoding::Utf8.to_string(), "utf-8");
1276 assert_eq!(ContentEncoding::Binary.to_string(), "binary");
1277 }
1278
1279 #[test]
1280 fn test_message_headers_validate_empty_task() {
1281 let headers = MessageHeaders::new("".to_string(), Uuid::new_v4());
1282 let result = headers.validate();
1283 assert!(result.is_err());
1284 assert_eq!(result.unwrap_err(), ValidationError::EmptyTaskName);
1285 }
1286
1287 #[test]
1288 fn test_message_headers_validate_retries_limit() {
1289 let mut headers = MessageHeaders::new("test".to_string(), Uuid::new_v4());
1290 headers.retries = Some(1001);
1291 let result = headers.validate();
1292 assert!(result.is_err());
1293 assert_eq!(
1294 result.unwrap_err(),
1295 ValidationError::RetryLimitExceeded {
1296 retries: 1001,
1297 max: 1000
1298 }
1299 );
1300 }
1301
1302 #[test]
1303 fn test_message_headers_validate_eta_expires() {
1304 let mut headers = MessageHeaders::new("test".to_string(), Uuid::new_v4());
1305 headers.eta = Some(Utc::now() + chrono::Duration::hours(2));
1306 headers.expires = Some(Utc::now() + chrono::Duration::hours(1));
1307 let result = headers.validate();
1308 assert!(result.is_err());
1309 assert_eq!(result.unwrap_err(), ValidationError::EtaAfterExpiration);
1310 }
1311
1312 #[test]
1313 fn test_message_properties_validate_delivery_mode() {
1314 let props = MessageProperties {
1315 delivery_mode: 3,
1316 ..MessageProperties::default()
1317 };
1318 let result = props.validate();
1319 assert!(result.is_err());
1320 assert_eq!(
1321 result.unwrap_err(),
1322 ValidationError::InvalidDeliveryMode { mode: 3 }
1323 );
1324 }
1325
1326 #[test]
1327 fn test_message_properties_validate_priority() {
1328 let props = MessageProperties {
1329 delivery_mode: 2, priority: Some(10),
1331 ..MessageProperties::default()
1332 };
1333 let result = props.validate();
1334 assert!(result.is_err());
1335 assert_eq!(
1336 result.unwrap_err(),
1337 ValidationError::InvalidPriority { priority: 10 }
1338 );
1339 }
1340
1341 #[test]
1342 fn test_message_predicates() {
1343 let task_id = Uuid::new_v4();
1344 let parent_id = Uuid::new_v4();
1345 let root_id = Uuid::new_v4();
1346 let group_id = Uuid::new_v4();
1347
1348 let mut msg = Message::new("test".to_string(), task_id, vec![1, 2, 3])
1349 .with_parent(parent_id)
1350 .with_root(root_id)
1351 .with_group(group_id)
1352 .with_eta(Utc::now() + chrono::Duration::hours(1))
1353 .with_expires(Utc::now() + chrono::Duration::days(1));
1354
1355 msg.properties.delivery_mode = 2;
1357
1358 assert!(msg.has_parent());
1359 assert!(msg.has_root());
1360 assert!(msg.has_group());
1361 assert!(msg.has_eta());
1362 assert!(msg.has_expires());
1363 assert!(msg.is_persistent());
1364 }
1365
1366 #[test]
1367 fn test_message_accessors() {
1368 let task_id = Uuid::new_v4();
1369 let msg = Message::new("my_task".to_string(), task_id, vec![1, 2, 3]);
1370
1371 assert_eq!(msg.task_id(), task_id);
1372 assert_eq!(msg.task_name(), "my_task");
1373 }
1374
1375 #[test]
1376 fn test_task_args_add_methods() {
1377 let mut args = TaskArgs::new();
1378 assert!(args.is_empty());
1379
1380 args.add_arg(serde_json::json!(1));
1381 args.add_arg(serde_json::json!(2));
1382 assert_eq!(args.len(), 2);
1383 assert!(args.has_args());
1384 assert!(!args.has_kwargs());
1385
1386 args.add_kwarg("key1".to_string(), serde_json::json!("value1"));
1387 assert_eq!(args.len(), 3);
1388 assert!(args.has_kwargs());
1389 }
1390
1391 #[test]
1392 fn test_task_args_get_methods() {
1393 let mut args = TaskArgs::new();
1394 args.add_arg(serde_json::json!(42));
1395 args.add_kwarg("name".to_string(), serde_json::json!("test"));
1396
1397 assert_eq!(args.get_arg(0), Some(&serde_json::json!(42)));
1398 assert_eq!(args.get_arg(1), None);
1399 assert_eq!(args.get_kwarg("name"), Some(&serde_json::json!("test")));
1400 assert_eq!(args.get_kwarg("missing"), None);
1401 }
1402
1403 #[test]
1404 fn test_task_args_clear() {
1405 let mut args = TaskArgs::new()
1406 .with_args(vec![serde_json::json!(1)])
1407 .with_kwargs({
1408 let mut map = HashMap::new();
1409 map.insert("key".to_string(), serde_json::json!("value"));
1410 map
1411 });
1412
1413 assert!(!args.is_empty());
1414 args.clear();
1415 assert!(args.is_empty());
1416 assert_eq!(args.len(), 0);
1417 }
1418
1419 #[test]
1420 fn test_task_args_partial_eq() {
1421 let args1 = TaskArgs::new().with_args(vec![serde_json::json!(1), serde_json::json!(2)]);
1422 let args2 = TaskArgs::new().with_args(vec![serde_json::json!(1), serde_json::json!(2)]);
1423 let args3 = TaskArgs::new().with_args(vec![serde_json::json!(1)]);
1424
1425 assert_eq!(args1, args2);
1426 assert_ne!(args1, args3);
1427 }
1428
1429 #[test]
1430 fn test_message_body_methods() {
1431 let msg = Message::new("test".to_string(), Uuid::new_v4(), vec![1, 2, 3, 4, 5]);
1432 assert_eq!(msg.body_size(), 5);
1433 assert!(!msg.has_empty_body());
1434
1435 let empty_msg = Message::new("test".to_string(), Uuid::new_v4(), vec![]);
1436 assert_eq!(empty_msg.body_size(), 0);
1437 assert!(empty_msg.has_empty_body());
1438 }
1439
1440 #[test]
1441 fn test_message_content_accessors() {
1442 let msg = Message::new("test".to_string(), Uuid::new_v4(), vec![1]);
1443 assert_eq!(msg.content_type_str(), "application/json");
1444 assert_eq!(msg.content_encoding_str(), "utf-8");
1445 }
1446
1447 #[test]
1448 fn test_message_retry_and_priority() {
1449 let mut msg = Message::new("test".to_string(), Uuid::new_v4(), vec![1]);
1450 assert_eq!(msg.retry_count(), 0);
1451 assert_eq!(msg.priority(), None);
1452
1453 msg.headers.retries = Some(3);
1454 msg.properties.priority = Some(5);
1455 assert_eq!(msg.retry_count(), 3);
1456 assert_eq!(msg.priority(), Some(5));
1457 }
1458
1459 #[test]
1460 fn test_message_correlation_and_reply() {
1461 let mut msg = Message::new("test".to_string(), Uuid::new_v4(), vec![1]);
1462 assert!(!msg.has_correlation_id());
1463 assert_eq!(msg.correlation_id(), None);
1464 assert_eq!(msg.reply_to(), None);
1465
1466 msg.properties.correlation_id = Some("corr-123".to_string());
1467 msg.properties.reply_to = Some("reply-queue".to_string());
1468 assert!(msg.has_correlation_id());
1469 assert_eq!(msg.correlation_id(), Some("corr-123"));
1470 assert_eq!(msg.reply_to(), Some("reply-queue"));
1471 }
1472
1473 #[test]
1474 fn test_message_workflow_check() {
1475 let msg = Message::new("test".to_string(), Uuid::new_v4(), vec![1]);
1476 assert!(!msg.is_workflow_message());
1477
1478 let workflow_msg = msg.with_parent(Uuid::new_v4());
1479 assert!(workflow_msg.is_workflow_message());
1480 }
1481
1482 #[test]
1483 fn test_message_with_new_id() {
1484 let task_id = Uuid::new_v4();
1485 let msg = Message::new("test".to_string(), task_id, vec![1, 2, 3]);
1486 let new_msg = msg.with_new_id();
1487
1488 assert_ne!(msg.task_id(), new_msg.task_id());
1489 assert_eq!(msg.task_name(), new_msg.task_name());
1490 assert_eq!(msg.body, new_msg.body);
1491 }
1492
1493 #[test]
1494 fn test_message_to_builder() {
1495 let task_id = Uuid::new_v4();
1496 let parent_id = Uuid::new_v4();
1497 let msg = Message::new("test.task".to_string(), task_id, vec![1, 2, 3])
1498 .with_priority(5)
1499 .with_parent(parent_id);
1500
1501 let builder = msg.to_builder();
1502 let rebuilt = builder.args(vec![serde_json::json!(1)]).build().unwrap();
1504
1505 assert_eq!(rebuilt.task_id(), msg.task_id());
1506 assert_eq!(rebuilt.task_name(), msg.task_name());
1507 assert_eq!(rebuilt.priority(), msg.priority());
1508 assert_eq!(rebuilt.headers.parent_id, msg.headers.parent_id);
1509 }
1510
1511 #[test]
1512 fn test_protocol_version_from_str() {
1513 use std::str::FromStr;
1514 assert_eq!(
1515 ProtocolVersion::from_str("v2").unwrap(),
1516 ProtocolVersion::V2
1517 );
1518 assert_eq!(
1519 ProtocolVersion::from_str("V2").unwrap(),
1520 ProtocolVersion::V2
1521 );
1522 assert_eq!(ProtocolVersion::from_str("2").unwrap(), ProtocolVersion::V2);
1523 assert_eq!(
1524 ProtocolVersion::from_str("v5").unwrap(),
1525 ProtocolVersion::V5
1526 );
1527 assert_eq!(
1528 ProtocolVersion::from_str("V5").unwrap(),
1529 ProtocolVersion::V5
1530 );
1531 assert_eq!(ProtocolVersion::from_str("5").unwrap(), ProtocolVersion::V5);
1532 assert!(ProtocolVersion::from_str("v3").is_err());
1533 assert!(ProtocolVersion::from_str("invalid").is_err());
1534 }
1535
1536 #[test]
1537 fn test_protocol_version_ordering() {
1538 assert!(ProtocolVersion::V2 < ProtocolVersion::V5);
1539 assert!(ProtocolVersion::V5 > ProtocolVersion::V2);
1540 assert_eq!(ProtocolVersion::V2, ProtocolVersion::V2);
1541 }
1542
1543 #[test]
1544 fn test_content_type_from_str() {
1545 use std::str::FromStr;
1546 assert_eq!(
1547 ContentType::from_str("application/json").unwrap(),
1548 ContentType::Json
1549 );
1550 assert_eq!(
1551 ContentType::from_str("text/plain").unwrap(),
1552 ContentType::Custom("text/plain".to_string())
1553 );
1554 }
1555
1556 #[test]
1557 fn test_content_encoding_from_str() {
1558 use std::str::FromStr;
1559 assert_eq!(
1560 ContentEncoding::from_str("utf-8").unwrap(),
1561 ContentEncoding::Utf8
1562 );
1563 assert_eq!(
1564 ContentEncoding::from_str("binary").unwrap(),
1565 ContentEncoding::Binary
1566 );
1567 assert_eq!(
1568 ContentEncoding::from_str("gzip").unwrap(),
1569 ContentEncoding::Custom("gzip".to_string())
1570 );
1571 }
1572
1573 #[test]
1574 fn test_message_headers_equality() {
1575 let id = Uuid::new_v4();
1576 let headers1 = MessageHeaders::new("tasks.add".to_string(), id);
1577 let headers2 = MessageHeaders::new("tasks.add".to_string(), id);
1578 let headers3 = MessageHeaders::new("tasks.sub".to_string(), id);
1579
1580 assert_eq!(headers1, headers2);
1581 assert_ne!(headers1, headers3);
1582 }
1583
1584 #[test]
1585 fn test_message_properties_equality() {
1586 let props1 = MessageProperties::default();
1587 let props2 = MessageProperties::default();
1588 let props3 = MessageProperties {
1589 priority: Some(5),
1590 ..Default::default()
1591 };
1592
1593 assert_eq!(props1, props2);
1594 assert_ne!(props1, props3);
1595 }
1596
1597 #[test]
1598 fn test_message_equality() {
1599 let id = Uuid::new_v4();
1600 let body = vec![1, 2, 3];
1601 let msg1 = Message::new("tasks.add".to_string(), id, body.clone());
1602 let msg2 = Message::new("tasks.add".to_string(), id, body.clone());
1603 let msg3 = Message::new("tasks.add".to_string(), id, vec![4, 5, 6]);
1604
1605 assert_eq!(msg1, msg2);
1606 assert_ne!(msg1, msg3);
1607 }
1608
1609 #[test]
1610 fn test_message_equality_with_options() {
1611 let id = Uuid::new_v4();
1612 let parent_id = Uuid::new_v4();
1613 let body = vec![1, 2, 3];
1614
1615 let msg1 = Message::new("tasks.add".to_string(), id, body.clone())
1616 .with_priority(5)
1617 .with_parent(parent_id);
1618 let msg2 = Message::new("tasks.add".to_string(), id, body.clone())
1619 .with_priority(5)
1620 .with_parent(parent_id);
1621 let msg3 = Message::new("tasks.add".to_string(), id, body.clone())
1622 .with_priority(3)
1623 .with_parent(parent_id);
1624
1625 assert_eq!(msg1, msg2);
1626 assert_ne!(msg1, msg3);
1627 }
1628
1629 #[test]
1630 fn test_task_args_equality() {
1631 let args1 = TaskArgs::new().with_args(vec![serde_json::json!(1), serde_json::json!(2)]);
1632 let args2 = TaskArgs::new().with_args(vec![serde_json::json!(1), serde_json::json!(2)]);
1633 let args3 = TaskArgs::new().with_args(vec![serde_json::json!(3), serde_json::json!(4)]);
1634
1635 assert_eq!(args1, args2);
1636 assert_ne!(args1, args3);
1637 }
1638
1639 #[test]
1640 fn test_task_args_equality_with_kwargs() {
1641 let mut kwargs1 = std::collections::HashMap::new();
1642 kwargs1.insert("key".to_string(), serde_json::json!("value"));
1643
1644 let mut kwargs2 = std::collections::HashMap::new();
1645 kwargs2.insert("key".to_string(), serde_json::json!("value"));
1646
1647 let args1 = TaskArgs::new().with_kwargs(kwargs1);
1648 let args2 = TaskArgs::new().with_kwargs(kwargs2);
1649 let args3 = TaskArgs::new();
1650
1651 assert_eq!(args1, args2);
1652 assert_ne!(args1, args3);
1653 }
1654
1655 #[test]
1656 fn test_content_type_from_str_trait() {
1657 let ct1: ContentType = "application/json".into();
1658 let ct2: ContentType = "text/plain".into();
1659
1660 assert_eq!(ct1, ContentType::Json);
1661 assert_eq!(ct2, ContentType::Custom("text/plain".to_string()));
1662 }
1663
1664 #[test]
1665 fn test_content_encoding_from_str_trait() {
1666 let ce1: ContentEncoding = "utf-8".into();
1667 let ce2: ContentEncoding = "binary".into();
1668 let ce3: ContentEncoding = "gzip".into();
1669
1670 assert_eq!(ce1, ContentEncoding::Utf8);
1671 assert_eq!(ce2, ContentEncoding::Binary);
1672 assert_eq!(ce3, ContentEncoding::Custom("gzip".to_string()));
1673 }
1674
1675 #[test]
1676 fn test_content_type_as_ref() {
1677 let ct = ContentType::Json;
1678 let s: &str = ct.as_ref();
1679 assert_eq!(s, "application/json");
1680
1681 let ct_custom = ContentType::Custom("text/plain".to_string());
1682 let s_custom: &str = ct_custom.as_ref();
1683 assert_eq!(s_custom, "text/plain");
1684 }
1685
1686 #[test]
1687 fn test_content_encoding_as_ref() {
1688 let ce = ContentEncoding::Utf8;
1689 let s: &str = ce.as_ref();
1690 assert_eq!(s, "utf-8");
1691
1692 let ce_binary = ContentEncoding::Binary;
1693 let s_binary: &str = ce_binary.as_ref();
1694 assert_eq!(s_binary, "binary");
1695 }
1696
1697 #[test]
1698 fn test_content_type_hash() {
1699 use std::collections::HashSet;
1700
1701 let mut set = HashSet::new();
1702 set.insert(ContentType::Json);
1703 set.insert(ContentType::Json); #[cfg(feature = "msgpack")]
1705 set.insert(ContentType::MessagePack);
1706 set.insert(ContentType::Custom("text/plain".to_string()));
1707
1708 #[cfg(feature = "msgpack")]
1709 assert_eq!(set.len(), 3);
1710 #[cfg(not(feature = "msgpack"))]
1711 assert_eq!(set.len(), 2);
1712
1713 assert!(set.contains(&ContentType::Json));
1714 }
1715
1716 #[test]
1717 fn test_content_encoding_hash() {
1718 use std::collections::HashSet;
1719
1720 let mut set = HashSet::new();
1721 set.insert(ContentEncoding::Utf8);
1722 set.insert(ContentEncoding::Utf8); set.insert(ContentEncoding::Binary);
1724 set.insert(ContentEncoding::Custom("base64".to_string()));
1725
1726 assert_eq!(set.len(), 3);
1727 assert!(set.contains(&ContentEncoding::Utf8));
1728 assert!(set.contains(&ContentEncoding::Binary));
1729 }
1730
1731 #[test]
1732 fn test_message_with_retries() {
1733 let msg = Message::new("test".to_string(), Uuid::new_v4(), vec![1]).with_retries(5);
1734
1735 assert_eq!(msg.headers.retries, Some(5));
1736 assert_eq!(msg.retry_count(), 5);
1737 }
1738
1739 #[test]
1740 fn test_message_with_correlation_id() {
1741 let msg = Message::new("test".to_string(), Uuid::new_v4(), vec![1])
1742 .with_correlation_id("corr-123".to_string());
1743
1744 assert_eq!(msg.properties.correlation_id, Some("corr-123".to_string()));
1745 assert_eq!(msg.correlation_id(), Some("corr-123"));
1746 }
1747
1748 #[test]
1749 fn test_message_with_reply_to() {
1750 let msg = Message::new("test".to_string(), Uuid::new_v4(), vec![1])
1751 .with_reply_to("reply-queue".to_string());
1752
1753 assert_eq!(msg.properties.reply_to, Some("reply-queue".to_string()));
1754 assert_eq!(msg.reply_to(), Some("reply-queue"));
1755 }
1756
1757 #[test]
1758 fn test_message_with_delivery_mode() {
1759 let msg = Message::new("test".to_string(), Uuid::new_v4(), vec![1]).with_delivery_mode(1);
1760
1761 assert_eq!(msg.properties.delivery_mode, 1);
1762 assert!(!msg.is_persistent());
1763
1764 let persistent_msg =
1765 Message::new("test".to_string(), Uuid::new_v4(), vec![1]).with_delivery_mode(2);
1766
1767 assert_eq!(persistent_msg.properties.delivery_mode, 2);
1768 assert!(persistent_msg.is_persistent());
1769 }
1770
1771 #[test]
1772 fn test_message_builder_chaining() {
1773 let parent_id = Uuid::new_v4();
1774 let msg = Message::new("test.task".to_string(), Uuid::new_v4(), vec![1, 2, 3])
1775 .with_priority(7)
1776 .with_retries(3)
1777 .with_correlation_id("corr-456".to_string())
1778 .with_reply_to("result-queue".to_string())
1779 .with_parent(parent_id)
1780 .with_delivery_mode(1);
1781
1782 assert_eq!(msg.priority(), Some(7));
1783 assert_eq!(msg.retry_count(), 3);
1784 assert_eq!(msg.correlation_id(), Some("corr-456"));
1785 assert_eq!(msg.reply_to(), Some("result-queue"));
1786 assert_eq!(msg.headers.parent_id, Some(parent_id));
1787 assert_eq!(msg.properties.delivery_mode, 1);
1788 assert!(!msg.is_persistent());
1789 }
1790
1791 #[test]
1792 fn test_protocol_version_is_v2() {
1793 assert!(ProtocolVersion::V2.is_v2());
1794 assert!(!ProtocolVersion::V5.is_v2());
1795 }
1796
1797 #[test]
1798 fn test_protocol_version_is_v5() {
1799 assert!(ProtocolVersion::V5.is_v5());
1800 assert!(!ProtocolVersion::V2.is_v5());
1801 }
1802
1803 #[test]
1804 fn test_protocol_version_as_u8() {
1805 assert_eq!(ProtocolVersion::V2.as_u8(), 2);
1806 assert_eq!(ProtocolVersion::V5.as_u8(), 5);
1807 }
1808
1809 #[test]
1810 fn test_protocol_version_as_number_str() {
1811 assert_eq!(ProtocolVersion::V2.as_number_str(), "2");
1812 assert_eq!(ProtocolVersion::V5.as_number_str(), "5");
1813 }
1814
1815 #[test]
1816 fn test_task_args_from_json() {
1817 let json = r#"{"args":[1,2,3],"kwargs":{"key":"value"}}"#;
1818 let args = TaskArgs::from_json(json).unwrap();
1819
1820 assert_eq!(args.args.len(), 3);
1821 assert_eq!(args.kwargs.len(), 1);
1822 assert_eq!(args.get_kwarg("key"), Some(&serde_json::json!("value")));
1823 }
1824
1825 #[test]
1826 fn test_task_args_to_json() {
1827 let mut args = TaskArgs::new();
1828 args.add_arg(serde_json::json!(1));
1829 args.add_arg(serde_json::json!(2));
1830 args.add_kwarg("key".to_string(), serde_json::json!("value"));
1831
1832 let json = args.to_json().unwrap();
1833 assert!(json.contains("\"args\""));
1834 assert!(json.contains("\"kwargs\""));
1835 assert!(json.contains("\"key\""));
1836 }
1837
1838 #[test]
1839 fn test_task_args_to_json_pretty() {
1840 let args = TaskArgs::new()
1841 .with_args(vec![serde_json::json!(1)])
1842 .with_kwargs({
1843 let mut map = std::collections::HashMap::new();
1844 map.insert("test".to_string(), serde_json::json!("value"));
1845 map
1846 });
1847
1848 let json_pretty = args.to_json_pretty().unwrap();
1849 assert!(json_pretty.contains('\n')); }
1851
1852 #[test]
1853 fn test_message_is_ready_for_execution() {
1854 let msg1 = Message::new("test".to_string(), Uuid::new_v4(), vec![1, 2, 3]);
1855 assert!(msg1.is_ready_for_execution()); let msg2 = Message::new("test".to_string(), Uuid::new_v4(), vec![1, 2, 3])
1858 .with_eta(chrono::Utc::now() - chrono::Duration::hours(1));
1859 assert!(msg2.is_ready_for_execution()); let msg3 = Message::new("test".to_string(), Uuid::new_v4(), vec![1, 2, 3])
1862 .with_eta(chrono::Utc::now() + chrono::Duration::hours(1));
1863 assert!(!msg3.is_ready_for_execution()); }
1865
1866 #[test]
1867 fn test_message_is_not_expired() {
1868 let msg1 = Message::new("test".to_string(), Uuid::new_v4(), vec![1, 2, 3]);
1869 assert!(msg1.is_not_expired()); let msg2 = Message::new("test".to_string(), Uuid::new_v4(), vec![1, 2, 3])
1872 .with_expires(chrono::Utc::now() + chrono::Duration::hours(1));
1873 assert!(msg2.is_not_expired()); let msg3 = Message::new("test".to_string(), Uuid::new_v4(), vec![1, 2, 3])
1876 .with_expires(chrono::Utc::now() - chrono::Duration::hours(1));
1877 assert!(!msg3.is_not_expired()); }
1879
1880 #[test]
1881 fn test_message_should_process() {
1882 let msg1 = Message::new("test".to_string(), Uuid::new_v4(), vec![1, 2, 3]);
1884 assert!(msg1.should_process());
1885
1886 let msg2 = Message::new("test".to_string(), Uuid::new_v4(), vec![1, 2, 3])
1888 .with_eta(chrono::Utc::now() + chrono::Duration::hours(1));
1889 assert!(!msg2.should_process());
1890
1891 let msg3 = Message::new("test".to_string(), Uuid::new_v4(), vec![1, 2, 3])
1893 .with_expires(chrono::Utc::now() - chrono::Duration::hours(1));
1894 assert!(!msg3.should_process());
1895
1896 let msg4 = Message::new("test".to_string(), Uuid::new_v4(), vec![1, 2, 3])
1898 .with_eta(chrono::Utc::now() - chrono::Duration::hours(2))
1899 .with_expires(chrono::Utc::now() - chrono::Duration::hours(1));
1900 assert!(!msg4.should_process());
1901 }
1902
1903 #[test]
1904 fn test_message_headers_builder() {
1905 let task_id = Uuid::new_v4();
1906 let root_id = Uuid::new_v4();
1907 let parent_id = Uuid::new_v4();
1908 let group_id = Uuid::new_v4();
1909
1910 let headers = MessageHeaders::new("task".to_string(), task_id)
1911 .with_lang("python".to_string())
1912 .with_root_id(root_id)
1913 .with_parent_id(parent_id)
1914 .with_group(group_id)
1915 .with_retries(3)
1916 .with_eta(chrono::Utc::now() + chrono::Duration::minutes(5))
1917 .with_expires(chrono::Utc::now() + chrono::Duration::hours(1));
1918
1919 assert_eq!(headers.lang, "python");
1920 assert_eq!(headers.root_id, Some(root_id));
1921 assert_eq!(headers.parent_id, Some(parent_id));
1922 assert_eq!(headers.group, Some(group_id));
1923 assert_eq!(headers.retries, Some(3));
1924 assert!(headers.eta.is_some());
1925 assert!(headers.expires.is_some());
1926 }
1927
1928 #[test]
1929 fn test_message_properties_builder() {
1930 let props = MessageProperties::new()
1931 .with_correlation_id("corr-123".to_string())
1932 .with_reply_to("reply.queue".to_string())
1933 .with_delivery_mode(1)
1934 .with_priority(5);
1935
1936 assert_eq!(props.correlation_id, Some("corr-123".to_string()));
1937 assert_eq!(props.reply_to, Some("reply.queue".to_string()));
1938 assert_eq!(props.delivery_mode, 1);
1939 assert_eq!(props.priority, Some(5));
1940 }
1941
1942 #[test]
1943 fn test_message_with_eta_delay() {
1944 let before = chrono::Utc::now();
1945 let msg = Message::new("task".to_string(), Uuid::new_v4(), vec![])
1946 .with_eta_delay(chrono::Duration::minutes(10));
1947 let after = chrono::Utc::now();
1948
1949 assert!(msg.has_eta());
1950 let eta = msg.headers.eta.unwrap();
1951 assert!(eta > before + chrono::Duration::minutes(9));
1953 assert!(eta < after + chrono::Duration::minutes(11));
1954 }
1955
1956 #[test]
1957 fn test_message_with_expires_in() {
1958 let before = chrono::Utc::now();
1959 let msg = Message::new("task".to_string(), Uuid::new_v4(), vec![])
1960 .with_expires_in(chrono::Duration::hours(2));
1961 let after = chrono::Utc::now();
1962
1963 assert!(msg.has_expires());
1964 let expires = msg.headers.expires.unwrap();
1965 assert!(expires > before + chrono::Duration::hours(2) - chrono::Duration::seconds(1));
1967 assert!(expires < after + chrono::Duration::hours(2) + chrono::Duration::seconds(1));
1968 }
1969
1970 #[test]
1971 fn test_message_time_until_eta() {
1972 let msg1 = Message::new("task".to_string(), Uuid::new_v4(), vec![]);
1974 assert!(msg1.time_until_eta().is_none());
1975
1976 let msg2 = Message::new("task".to_string(), Uuid::new_v4(), vec![])
1978 .with_eta(chrono::Utc::now() + chrono::Duration::minutes(30));
1979 let time_left = msg2.time_until_eta();
1980 assert!(time_left.is_some());
1981 assert!(time_left.unwrap() > chrono::Duration::minutes(29));
1982 assert!(time_left.unwrap() < chrono::Duration::minutes(31));
1983
1984 let msg3 = Message::new("task".to_string(), Uuid::new_v4(), vec![])
1986 .with_eta(chrono::Utc::now() - chrono::Duration::minutes(30));
1987 assert!(msg3.time_until_eta().is_none());
1988 }
1989
1990 #[test]
1991 fn test_message_time_until_expiration() {
1992 let msg1 = Message::new("task".to_string(), Uuid::new_v4(), vec![]);
1994 assert!(msg1.time_until_expiration().is_none());
1995
1996 let msg2 = Message::new("task".to_string(), Uuid::new_v4(), vec![])
1998 .with_expires(chrono::Utc::now() + chrono::Duration::hours(1));
1999 let time_left = msg2.time_until_expiration();
2000 assert!(time_left.is_some());
2001 assert!(time_left.unwrap() > chrono::Duration::minutes(59));
2002 assert!(time_left.unwrap() < chrono::Duration::minutes(61));
2003
2004 let msg3 = Message::new("task".to_string(), Uuid::new_v4(), vec![])
2006 .with_expires(chrono::Utc::now() - chrono::Duration::hours(1));
2007 assert!(msg3.time_until_expiration().is_none());
2008 }
2009
2010 #[test]
2011 fn test_message_increment_retry() {
2012 let mut msg = Message::new("task".to_string(), Uuid::new_v4(), vec![]);
2013
2014 assert_eq!(msg.retry_count(), 0);
2016
2017 let count1 = msg.increment_retry();
2019 assert_eq!(count1, 1);
2020 assert_eq!(msg.retry_count(), 1);
2021
2022 let count2 = msg.increment_retry();
2024 assert_eq!(count2, 2);
2025 assert_eq!(msg.retry_count(), 2);
2026 }
2027
2028 #[test]
2029 fn test_task_args_index_usize() {
2030 let args = TaskArgs::new().with_args(vec![
2031 serde_json::json!(1),
2032 serde_json::json!("hello"),
2033 serde_json::json!(true),
2034 ]);
2035
2036 assert_eq!(args[0], serde_json::json!(1));
2038 assert_eq!(args[1], serde_json::json!("hello"));
2039 assert_eq!(args[2], serde_json::json!(true));
2040 }
2041
2042 #[test]
2043 fn test_task_args_index_mut_usize() {
2044 let mut args = TaskArgs::new().with_args(vec![serde_json::json!(1), serde_json::json!(2)]);
2045
2046 args[0] = serde_json::json!(100);
2048 args[1] = serde_json::json!(200);
2049
2050 assert_eq!(args[0], serde_json::json!(100));
2051 assert_eq!(args[1], serde_json::json!(200));
2052 }
2053
2054 #[test]
2055 fn test_task_args_index_str() {
2056 let mut kwargs = HashMap::new();
2057 kwargs.insert("name".to_string(), serde_json::json!("Alice"));
2058 kwargs.insert("age".to_string(), serde_json::json!(30));
2059
2060 let args = TaskArgs::new().with_kwargs(kwargs);
2061
2062 assert_eq!(args["name"], serde_json::json!("Alice"));
2064 assert_eq!(args["age"], serde_json::json!(30));
2065 }
2066
2067 #[test]
2068 #[should_panic(expected = "no entry found for key")]
2069 fn test_task_args_index_str_panic() {
2070 let args = TaskArgs::new();
2071 let _ = &args["nonexistent"]; }
2073
2074 #[test]
2075 fn test_task_args_into_iterator() {
2076 let args = TaskArgs::new().with_args(vec![
2077 serde_json::json!(1),
2078 serde_json::json!(2),
2079 serde_json::json!(3),
2080 ]);
2081
2082 let values: Vec<_> = args.into_iter().collect();
2084 assert_eq!(values.len(), 3);
2085 assert_eq!(values[0], serde_json::json!(1));
2086 assert_eq!(values[1], serde_json::json!(2));
2087 assert_eq!(values[2], serde_json::json!(3));
2088 }
2089
2090 #[test]
2091 fn test_task_args_into_iterator_ref() {
2092 let args = TaskArgs::new().with_args(vec![serde_json::json!(10), serde_json::json!(20)]);
2093
2094 let sum: i64 = (&args).into_iter().filter_map(|v| v.as_i64()).sum();
2096
2097 assert_eq!(sum, 30);
2098 assert_eq!(args.args.len(), 2);
2100 }
2101
2102 #[test]
2103 fn test_task_args_extend() {
2104 let mut args = TaskArgs::new().with_args(vec![serde_json::json!(1)]);
2105
2106 args.extend(vec![serde_json::json!(2), serde_json::json!(3)]);
2108
2109 assert_eq!(args.args.len(), 3);
2110 assert_eq!(args[0], serde_json::json!(1));
2111 assert_eq!(args[1], serde_json::json!(2));
2112 assert_eq!(args[2], serde_json::json!(3));
2113 }
2114
2115 #[test]
2116 fn test_task_args_extend_kwargs() {
2117 let mut args = TaskArgs::new();
2118
2119 args.extend(vec![
2121 ("key1".to_string(), serde_json::json!("value1")),
2122 ("key2".to_string(), serde_json::json!(42)),
2123 ]);
2124
2125 assert_eq!(args.kwargs.len(), 2);
2126 assert_eq!(args["key1"], serde_json::json!("value1"));
2127 assert_eq!(args["key2"], serde_json::json!(42));
2128 }
2129
2130 #[test]
2131 fn test_task_args_from_iterator() {
2132 let args: TaskArgs = vec![
2134 serde_json::json!(1),
2135 serde_json::json!("hello"),
2136 serde_json::json!(true),
2137 ]
2138 .into_iter()
2139 .collect();
2140
2141 assert_eq!(args.args.len(), 3);
2142 assert_eq!(args.kwargs.len(), 0);
2143 assert_eq!(args[0], serde_json::json!(1));
2144 assert_eq!(args[1], serde_json::json!("hello"));
2145 assert_eq!(args[2], serde_json::json!(true));
2146 }
2147
2148 #[test]
2149 fn test_task_args_from_iterator_range() {
2150 let args: TaskArgs = (1..=5).map(|i| serde_json::json!(i)).collect();
2152
2153 assert_eq!(args.args.len(), 5);
2154 assert_eq!(args[0], serde_json::json!(1));
2155 assert_eq!(args[4], serde_json::json!(5));
2156 }
2157
2158 #[test]
2159 fn test_task_args_iterator_chain() {
2160 let args1: TaskArgs = vec![serde_json::json!(1), serde_json::json!(2)]
2162 .into_iter()
2163 .collect();
2164
2165 let mut args2 = TaskArgs::new();
2166 args2.extend(vec![serde_json::json!(3), serde_json::json!(4)]);
2167
2168 args2.extend(args1);
2170
2171 assert_eq!(args2.args.len(), 4);
2172 assert_eq!(args2[0], serde_json::json!(3));
2173 assert_eq!(args2[3], serde_json::json!(2));
2174 }
2175}