celers_protocol/
lib.rs

1//! Celery Protocol v2/v5 implementation
2//!
3//! This crate provides the core protocol definitions for Celery message format,
4//! ensuring compatibility with Python Celery workers.
5//!
6//! # Protocol Compatibility
7//!
8//! - Celery Protocol v2 (Celery 4.x+)
9//! - Celery Protocol v5 (Celery 5.x+)
10//!
11//! # Message Format
12//!
13//! Messages consist of:
14//! - **Headers**: Task metadata (task name, ID, parent/root IDs, etc.)
15//! - **Properties**: AMQP properties (`correlation_id`, `reply_to`, `delivery_mode`)
16//! - **Body**: Serialized task arguments
17//! - **Content-Type**: Serialization format ("application/json", "application/x-msgpack")
18//! - **Content-Encoding**: Encoding format ("utf-8", "binary")
19//!
20//! # Modules
21//!
22//! - [`compat`] - Python Celery compatibility verification
23//! - [`serializer`] - Pluggable serialization framework
24//! - [`result`] - Task result message format
25//! - [`event`] - Celery event message format
26//! - [`compression`] - Message body compression
27//! - [`embed`] - Embedded body format (args, kwargs, embed)
28//! - [`negotiation`] - Protocol version negotiation
29//! - [`security`] - Security utilities and content-type whitelist
30//! - [`builder`] - Fluent message builder API
31//! - [`auth`] - Message authentication and signing (HMAC)
32//! - [`crypto`] - Message encryption (AES-256-GCM)
33//! - [`extensions`] - Message extensions and utility helpers
34//! - [`migration`] - Protocol version migration helpers
35//! - [`middleware`] - Message transformation middleware
36//! - [`zerocopy`] - Zero-copy deserialization for performance
37//! - [`lazy`] - Lazy deserialization for large messages
38//! - [`pool`] - Message pooling for memory efficiency
39//! - [`extension_api`] - Custom protocol extensions API
40//! - [`utils`] - Message utility helpers
41//! - [`batch`] - Batch message processing utilities
42//! - [`routing`] - Message routing helpers
43//! - [`retry`] - Retry strategy utilities
44//! - [`dedup`] - Message deduplication utilities
45//! - [`priority_queue`] - Priority-based message queues
46//! - [`workflow`] - Workflow and task chain utilities
47
48pub mod auth;
49pub mod batch;
50pub mod builder;
51pub mod compat;
52pub mod compression;
53pub mod crypto;
54pub mod dedup;
55pub mod embed;
56pub mod event;
57pub mod extension_api;
58pub mod extensions;
59pub mod lazy;
60pub mod middleware;
61pub mod migration;
62pub mod negotiation;
63pub mod pool;
64pub mod priority_queue;
65pub mod result;
66pub mod retry;
67pub mod routing;
68pub mod security;
69pub mod serializer;
70pub mod utils;
71pub mod workflow;
72pub mod zerocopy;
73
74use chrono::{DateTime, Utc};
75use serde::{Deserialize, Serialize};
76use std::collections::HashMap;
77use std::fmt;
78use uuid::Uuid;
79
/// Content-type string for JSON-serialized bodies.
pub(crate) const CONTENT_TYPE_JSON: &str = "application/json";
/// Content-type string for MessagePack bodies (only with the `msgpack` feature).
#[cfg(feature = "msgpack")]
pub(crate) const CONTENT_TYPE_MSGPACK: &str = "application/x-msgpack";
/// Content-type string for raw binary bodies (only with the `binary` feature).
#[cfg(feature = "binary")]
pub(crate) const CONTENT_TYPE_BINARY: &str = "application/octet-stream";

/// Content-encoding string for UTF-8.
pub(crate) const ENCODING_UTF8: &str = "utf-8";
/// Content-encoding string for binary.
pub(crate) const ENCODING_BINARY: &str = "binary";

/// Language tag used when none is supplied.
pub(crate) const DEFAULT_LANG: &str = "rust";
93
/// Structured reasons why a Celery protocol message failed validation.
///
/// Each variant has a human-readable rendering through [`fmt::Display`] and
/// the type implements [`std::error::Error`], so it can be propagated with `?`.
///
/// # Examples
///
/// ```
/// use celers_protocol::{Message, ValidationError};
/// use uuid::Uuid;
///
/// // Create a message with an empty task name
/// let msg = Message::new("".to_string(), Uuid::new_v4(), vec![1, 2, 3]);
///
/// // Validation will fail with a structured error
/// match msg.validate() {
///     Ok(_) => panic!("Should have failed"),
///     Err(ValidationError::EmptyTaskName) => {
///         println!("Task name cannot be empty");
///     }
///     Err(e) => panic!("Unexpected error: {}", e),
/// }
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ValidationError {
    /// The task name is an empty string.
    EmptyTaskName,
    /// The retry count (`retries`) is greater than the allowed maximum (`max`).
    RetryLimitExceeded { retries: u32, max: u32 },
    /// The ETA lies after the expiration time.
    EtaAfterExpiration,
    /// The delivery mode is neither 1 nor 2.
    InvalidDeliveryMode { mode: u8 },
    /// The priority is outside 0-9.
    InvalidPriority { priority: u8 },
    /// The content type is an empty string.
    EmptyContentType,
    /// The message body contains no bytes.
    EmptyBody,
    /// The message body (`size` bytes) is larger than the limit (`max` bytes).
    BodyTooLarge { size: usize, max: usize },
}

impl fmt::Display for ValidationError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Every payload field is `Copy`, so matching on `*self` binds plain values.
        match *self {
            Self::EmptyTaskName => f.write_str("Task name cannot be empty"),
            Self::RetryLimitExceeded { retries, max } => {
                write!(f, "Retries ({}) cannot exceed {}", retries, max)
            }
            Self::EtaAfterExpiration => f.write_str("ETA cannot be after expiration time"),
            Self::InvalidDeliveryMode { mode } => write!(
                f,
                "Invalid delivery mode ({}): must be 1 (non-persistent) or 2 (persistent)",
                mode
            ),
            Self::InvalidPriority { priority } => write!(
                f,
                "Invalid priority ({}): must be between 0 and 9",
                priority
            ),
            Self::EmptyContentType => f.write_str("Content type cannot be empty"),
            Self::EmptyBody => f.write_str("Message body cannot be empty"),
            Self::BodyTooLarge { size, max } => write!(
                f,
                "Message body too large: {} bytes (max {} bytes)",
                size, max
            ),
        }
    }
}

impl std::error::Error for ValidationError {}
172
173/// Protocol version
174#[derive(
175    Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize,
176)]
177pub enum ProtocolVersion {
178    /// Protocol version 2 (Celery 4.x+)
179    #[default]
180    V2,
181    /// Protocol version 5 (Celery 5.x+)
182    V5,
183}
184
185impl std::fmt::Display for ProtocolVersion {
186    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
187        match self {
188            ProtocolVersion::V2 => write!(f, "v2"),
189            ProtocolVersion::V5 => write!(f, "v5"),
190        }
191    }
192}
193
194impl std::str::FromStr for ProtocolVersion {
195    type Err = String;
196
197    fn from_str(s: &str) -> Result<Self, Self::Err> {
198        match s.to_lowercase().as_str() {
199            "v2" | "2" => Ok(ProtocolVersion::V2),
200            "v5" | "5" => Ok(ProtocolVersion::V5),
201            _ => Err(format!("Invalid protocol version: {}", s)),
202        }
203    }
204}
205
206impl ProtocolVersion {
207    /// Check if this is protocol version 2
208    #[inline]
209    pub const fn is_v2(self) -> bool {
210        matches!(self, ProtocolVersion::V2)
211    }
212
213    /// Check if this is protocol version 5
214    #[inline]
215    pub const fn is_v5(self) -> bool {
216        matches!(self, ProtocolVersion::V5)
217    }
218
219    /// Get the version number as u8
220    #[inline]
221    pub const fn as_u8(self) -> u8 {
222        match self {
223            ProtocolVersion::V2 => 2,
224            ProtocolVersion::V5 => 5,
225        }
226    }
227
228    /// Get the version number as a static string
229    #[inline]
230    pub const fn as_number_str(self) -> &'static str {
231        match self {
232            ProtocolVersion::V2 => "2",
233            ProtocolVersion::V5 => "5",
234        }
235    }
236}
237
238/// Content type for serialization
239#[derive(Debug, Clone, Default, PartialEq, Eq, Hash, Serialize, Deserialize)]
240pub enum ContentType {
241    /// JSON serialization
242    #[default]
243    Json,
244    /// MessagePack serialization
245    #[cfg(feature = "msgpack")]
246    MessagePack,
247    /// Binary serialization
248    #[cfg(feature = "binary")]
249    Binary,
250    /// Custom content type
251    Custom(String),
252}
253
254impl ContentType {
255    #[inline]
256    pub fn as_str(&self) -> &str {
257        match self {
258            ContentType::Json => CONTENT_TYPE_JSON,
259            #[cfg(feature = "msgpack")]
260            ContentType::MessagePack => CONTENT_TYPE_MSGPACK,
261            #[cfg(feature = "binary")]
262            ContentType::Binary => CONTENT_TYPE_BINARY,
263            ContentType::Custom(s) => s,
264        }
265    }
266}
267
268impl std::fmt::Display for ContentType {
269    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270        write!(f, "{}", self.as_str())
271    }
272}
273
274impl std::str::FromStr for ContentType {
275    type Err = String;
276
277    fn from_str(s: &str) -> Result<Self, Self::Err> {
278        match s {
279            CONTENT_TYPE_JSON => Ok(ContentType::Json),
280            #[cfg(feature = "msgpack")]
281            CONTENT_TYPE_MSGPACK => Ok(ContentType::MessagePack),
282            #[cfg(feature = "binary")]
283            CONTENT_TYPE_BINARY => Ok(ContentType::Binary),
284            other => Ok(ContentType::Custom(other.to_string())),
285        }
286    }
287}
288
289impl From<&str> for ContentType {
290    fn from(s: &str) -> Self {
291        match s {
292            CONTENT_TYPE_JSON => ContentType::Json,
293            #[cfg(feature = "msgpack")]
294            CONTENT_TYPE_MSGPACK => ContentType::MessagePack,
295            #[cfg(feature = "binary")]
296            CONTENT_TYPE_BINARY => ContentType::Binary,
297            other => ContentType::Custom(other.to_string()),
298        }
299    }
300}
301
302impl AsRef<str> for ContentType {
303    fn as_ref(&self) -> &str {
304        self.as_str()
305    }
306}
307
308/// Content encoding
309#[derive(Debug, Clone, Default, PartialEq, Eq, Hash, Serialize, Deserialize)]
310pub enum ContentEncoding {
311    /// UTF-8 encoding
312    #[default]
313    Utf8,
314    /// Binary encoding
315    Binary,
316    /// Custom encoding
317    Custom(String),
318}
319
320impl ContentEncoding {
321    #[inline]
322    pub fn as_str(&self) -> &str {
323        match self {
324            ContentEncoding::Utf8 => ENCODING_UTF8,
325            ContentEncoding::Binary => ENCODING_BINARY,
326            ContentEncoding::Custom(s) => s,
327        }
328    }
329}
330
331impl std::fmt::Display for ContentEncoding {
332    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
333        write!(f, "{}", self.as_str())
334    }
335}
336
337impl std::str::FromStr for ContentEncoding {
338    type Err = String;
339
340    fn from_str(s: &str) -> Result<Self, Self::Err> {
341        match s {
342            ENCODING_UTF8 => Ok(ContentEncoding::Utf8),
343            ENCODING_BINARY => Ok(ContentEncoding::Binary),
344            other => Ok(ContentEncoding::Custom(other.to_string())),
345        }
346    }
347}
348
349impl From<&str> for ContentEncoding {
350    fn from(s: &str) -> Self {
351        match s {
352            ENCODING_UTF8 => ContentEncoding::Utf8,
353            ENCODING_BINARY => ContentEncoding::Binary,
354            other => ContentEncoding::Custom(other.to_string()),
355        }
356    }
357}
358
359impl AsRef<str> for ContentEncoding {
360    fn as_ref(&self) -> &str {
361        self.as_str()
362    }
363}
364
365/// Message headers (Celery protocol)
366#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
367pub struct MessageHeaders {
368    /// Task name (e.g., "tasks.add")
369    pub task: String,
370
371    /// Task ID (UUID)
372    pub id: Uuid,
373
374    /// Programming language ("rust", "py")
375    #[serde(default = "default_lang")]
376    pub lang: String,
377
378    /// Root task ID (for workflow tracking)
379    #[serde(skip_serializing_if = "Option::is_none")]
380    pub root_id: Option<Uuid>,
381
382    /// Parent task ID (for nested tasks)
383    #[serde(skip_serializing_if = "Option::is_none")]
384    pub parent_id: Option<Uuid>,
385
386    /// Group ID (for grouped tasks)
387    #[serde(skip_serializing_if = "Option::is_none")]
388    pub group: Option<Uuid>,
389
390    /// Maximum retries
391    #[serde(skip_serializing_if = "Option::is_none")]
392    pub retries: Option<u32>,
393
394    /// ETA (Estimated Time of Arrival) for delayed tasks
395    #[serde(skip_serializing_if = "Option::is_none")]
396    pub eta: Option<DateTime<Utc>>,
397
398    /// Task expiration timestamp
399    #[serde(skip_serializing_if = "Option::is_none")]
400    pub expires: Option<DateTime<Utc>>,
401
402    /// Additional custom headers
403    #[serde(flatten)]
404    pub extra: HashMap<String, serde_json::Value>,
405}
406
407fn default_lang() -> String {
408    DEFAULT_LANG.to_string()
409}
410
411impl MessageHeaders {
412    pub fn new(task: String, id: Uuid) -> Self {
413        Self {
414            task,
415            id,
416            lang: default_lang(),
417            root_id: None,
418            parent_id: None,
419            group: None,
420            retries: None,
421            eta: None,
422            expires: None,
423            extra: HashMap::new(),
424        }
425    }
426
427    /// Set the language field (builder pattern)
428    #[must_use]
429    pub fn with_lang(mut self, lang: String) -> Self {
430        self.lang = lang;
431        self
432    }
433
434    /// Set the root ID field (builder pattern)
435    #[must_use]
436    pub fn with_root_id(mut self, root_id: Uuid) -> Self {
437        self.root_id = Some(root_id);
438        self
439    }
440
441    /// Set the parent ID field (builder pattern)
442    #[must_use]
443    pub fn with_parent_id(mut self, parent_id: Uuid) -> Self {
444        self.parent_id = Some(parent_id);
445        self
446    }
447
448    /// Set the group field (builder pattern)
449    #[must_use]
450    pub fn with_group(mut self, group: Uuid) -> Self {
451        self.group = Some(group);
452        self
453    }
454
455    /// Set the retries field (builder pattern)
456    #[must_use]
457    pub fn with_retries(mut self, retries: u32) -> Self {
458        self.retries = Some(retries);
459        self
460    }
461
462    /// Set the ETA field (builder pattern)
463    #[must_use]
464    pub fn with_eta(mut self, eta: DateTime<Utc>) -> Self {
465        self.eta = Some(eta);
466        self
467    }
468
469    /// Set the expires field (builder pattern)
470    #[must_use]
471    pub fn with_expires(mut self, expires: DateTime<Utc>) -> Self {
472        self.expires = Some(expires);
473        self
474    }
475
476    /// Validate message headers
477    pub fn validate(&self) -> Result<(), ValidationError> {
478        if self.task.is_empty() {
479            return Err(ValidationError::EmptyTaskName);
480        }
481
482        if let Some(retries) = self.retries {
483            if retries > 1000 {
484                return Err(ValidationError::RetryLimitExceeded { retries, max: 1000 });
485            }
486        }
487
488        // Validate ETA and expiration relationship
489        if let (Some(eta), Some(expires)) = (self.eta, self.expires) {
490            if eta > expires {
491                return Err(ValidationError::EtaAfterExpiration);
492            }
493        }
494
495        Ok(())
496    }
497}
498
499/// Message properties (AMQP-like)
500#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
501pub struct MessageProperties {
502    /// Correlation ID for RPC-style calls
503    #[serde(skip_serializing_if = "Option::is_none")]
504    pub correlation_id: Option<String>,
505
506    /// Reply-to queue for results
507    #[serde(skip_serializing_if = "Option::is_none")]
508    pub reply_to: Option<String>,
509
510    /// Delivery mode (1 = non-persistent, 2 = persistent)
511    #[serde(default = "default_delivery_mode")]
512    pub delivery_mode: u8,
513
514    /// Priority (0-9, higher = more priority)
515    #[serde(skip_serializing_if = "Option::is_none")]
516    pub priority: Option<u8>,
517}
518
/// Returns the default delivery mode: 2, i.e. persistent.
const fn default_delivery_mode() -> u8 {
    const PERSISTENT: u8 = 2;
    PERSISTENT
}
522
523impl Default for MessageProperties {
524    fn default() -> Self {
525        Self {
526            correlation_id: None,
527            reply_to: None,
528            delivery_mode: default_delivery_mode(),
529            priority: None,
530        }
531    }
532}
533
534impl MessageProperties {
535    /// Create new MessageProperties with default values
536    pub fn new() -> Self {
537        Self::default()
538    }
539
540    /// Set correlation ID (builder pattern)
541    #[must_use]
542    pub fn with_correlation_id(mut self, correlation_id: String) -> Self {
543        self.correlation_id = Some(correlation_id);
544        self
545    }
546
547    /// Set reply-to queue (builder pattern)
548    #[must_use]
549    pub fn with_reply_to(mut self, reply_to: String) -> Self {
550        self.reply_to = Some(reply_to);
551        self
552    }
553
554    /// Set delivery mode (builder pattern)
555    #[must_use]
556    pub fn with_delivery_mode(mut self, delivery_mode: u8) -> Self {
557        self.delivery_mode = delivery_mode;
558        self
559    }
560
561    /// Set priority (builder pattern)
562    #[must_use]
563    pub fn with_priority(mut self, priority: u8) -> Self {
564        self.priority = Some(priority);
565        self
566    }
567
568    /// Validate message properties
569    pub fn validate(&self) -> Result<(), ValidationError> {
570        if self.delivery_mode != 1 && self.delivery_mode != 2 {
571            return Err(ValidationError::InvalidDeliveryMode {
572                mode: self.delivery_mode,
573            });
574        }
575
576        if let Some(priority) = self.priority {
577            if priority > 9 {
578                return Err(ValidationError::InvalidPriority { priority });
579            }
580        }
581
582        Ok(())
583    }
584}
585
586/// Complete Celery message
587#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
588pub struct Message {
589    /// Message headers
590    pub headers: MessageHeaders,
591
592    /// Message properties
593    pub properties: MessageProperties,
594
595    /// Serialized body (task arguments)
596    #[serde(with = "serde_bytes_opt")]
597    pub body: Vec<u8>,
598
599    /// Content type
600    #[serde(rename = "content-type")]
601    pub content_type: String,
602
603    /// Content encoding
604    #[serde(rename = "content-encoding")]
605    pub content_encoding: String,
606}
607
608// Custom serde module for optional byte arrays
609mod serde_bytes_opt {
610    use base64::Engine;
611    use serde::de::Error;
612    use serde::{Deserialize, Deserializer, Serializer};
613
614    pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
615    where
616        S: Serializer,
617    {
618        // Serialize as base64 string for JSON compatibility
619        let encoded = base64::engine::general_purpose::STANDARD.encode(bytes);
620        serializer.serialize_str(&encoded)
621    }
622
623    pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
624    where
625        D: Deserializer<'de>,
626    {
627        let s = String::deserialize(deserializer)?;
628        base64::engine::general_purpose::STANDARD
629            .decode(&s)
630            .map_err(Error::custom)
631    }
632}
633
634impl Message {
635    /// Create a new message with JSON body
636    pub fn new(task: String, id: Uuid, body: Vec<u8>) -> Self {
637        Self {
638            headers: MessageHeaders::new(task, id),
639            properties: MessageProperties::default(),
640            body,
641            content_type: CONTENT_TYPE_JSON.to_string(),
642            content_encoding: ENCODING_UTF8.to_string(),
643        }
644    }
645
646    /// Set priority (0-9)
647    #[must_use]
648    pub fn with_priority(mut self, priority: u8) -> Self {
649        self.properties.priority = Some(priority);
650        self
651    }
652
653    /// Set parent task ID
654    #[must_use]
655    pub fn with_parent(mut self, parent_id: Uuid) -> Self {
656        self.headers.parent_id = Some(parent_id);
657        self
658    }
659
660    /// Set root task ID
661    #[must_use]
662    pub fn with_root(mut self, root_id: Uuid) -> Self {
663        self.headers.root_id = Some(root_id);
664        self
665    }
666
667    /// Set group ID
668    #[must_use]
669    pub fn with_group(mut self, group: Uuid) -> Self {
670        self.headers.group = Some(group);
671        self
672    }
673
674    /// Set ETA (delayed execution)
675    #[must_use]
676    pub fn with_eta(mut self, eta: DateTime<Utc>) -> Self {
677        self.headers.eta = Some(eta);
678        self
679    }
680
681    /// Set expiration
682    #[must_use]
683    pub fn with_expires(mut self, expires: DateTime<Utc>) -> Self {
684        self.headers.expires = Some(expires);
685        self
686    }
687
688    /// Set retry count
689    #[must_use]
690    pub fn with_retries(mut self, retries: u32) -> Self {
691        self.headers.retries = Some(retries);
692        self
693    }
694
695    /// Set correlation ID (for RPC-style calls)
696    #[must_use]
697    pub fn with_correlation_id(mut self, correlation_id: String) -> Self {
698        self.properties.correlation_id = Some(correlation_id);
699        self
700    }
701
702    /// Set reply-to queue (for results)
703    #[must_use]
704    pub fn with_reply_to(mut self, reply_to: String) -> Self {
705        self.properties.reply_to = Some(reply_to);
706        self
707    }
708
709    /// Set delivery mode (1 = non-persistent, 2 = persistent)
710    #[must_use]
711    pub fn with_delivery_mode(mut self, mode: u8) -> Self {
712        self.properties.delivery_mode = mode;
713        self
714    }
715
716    /// Validate the complete message
717    ///
718    /// Validates:
719    /// - Headers (task name, retries, eta/expires)
720    /// - Properties (delivery mode, priority)
721    /// - Content type format
722    /// - Body size
723    pub fn validate(&self) -> Result<(), ValidationError> {
724        // Validate headers
725        self.headers.validate()?;
726
727        // Validate properties
728        self.properties.validate()?;
729
730        // Validate content type
731        if self.content_type.is_empty() {
732            return Err(ValidationError::EmptyContentType);
733        }
734
735        // Validate body
736        if self.body.is_empty() {
737            return Err(ValidationError::EmptyBody);
738        }
739
740        if self.body.len() > 10_485_760 {
741            // 10MB limit
742            return Err(ValidationError::BodyTooLarge {
743                size: self.body.len(),
744                max: 10_485_760,
745            });
746        }
747
748        Ok(())
749    }
750
751    /// Validate with custom body size limit
752    pub fn validate_with_limit(&self, max_body_bytes: usize) -> Result<(), ValidationError> {
753        self.headers.validate()?;
754        self.properties.validate()?;
755
756        if self.content_type.is_empty() {
757            return Err(ValidationError::EmptyContentType);
758        }
759
760        if self.body.is_empty() {
761            return Err(ValidationError::EmptyBody);
762        }
763
764        if self.body.len() > max_body_bytes {
765            return Err(ValidationError::BodyTooLarge {
766                size: self.body.len(),
767                max: max_body_bytes,
768            });
769        }
770
771        Ok(())
772    }
773
774    /// Check if the message has an ETA (delayed execution)
775    #[inline(always)]
776    pub fn has_eta(&self) -> bool {
777        self.headers.eta.is_some()
778    }
779
780    /// Check if the message has an expiration time
781    #[inline(always)]
782    pub fn has_expires(&self) -> bool {
783        self.headers.expires.is_some()
784    }
785
786    /// Check if the message is part of a group
787    #[inline(always)]
788    pub fn has_group(&self) -> bool {
789        self.headers.group.is_some()
790    }
791
792    /// Check if the message has a parent task
793    #[inline(always)]
794    pub fn has_parent(&self) -> bool {
795        self.headers.parent_id.is_some()
796    }
797
798    /// Check if the message has a root task
799    #[inline(always)]
800    pub fn has_root(&self) -> bool {
801        self.headers.root_id.is_some()
802    }
803
804    /// Check if the message is persistent
805    #[inline(always)]
806    pub fn is_persistent(&self) -> bool {
807        self.properties.delivery_mode == 2
808    }
809
810    /// Get the task ID
811    #[inline(always)]
812    pub fn task_id(&self) -> uuid::Uuid {
813        self.headers.id
814    }
815
816    /// Get the task name
817    #[inline(always)]
818    pub fn task_name(&self) -> &str {
819        &self.headers.task
820    }
821
822    /// Get the content type as a string slice
823    #[inline(always)]
824    pub fn content_type_str(&self) -> &str {
825        &self.content_type
826    }
827
828    /// Get the content encoding as a string slice
829    #[inline(always)]
830    pub fn content_encoding_str(&self) -> &str {
831        &self.content_encoding
832    }
833
834    /// Get the message body size in bytes
835    #[inline(always)]
836    pub fn body_size(&self) -> usize {
837        self.body.len()
838    }
839
840    /// Check if the message body is empty
841    #[inline(always)]
842    pub fn has_empty_body(&self) -> bool {
843        self.body.is_empty()
844    }
845
846    /// Get the retry count (0 if not set)
847    #[inline(always)]
848    pub fn retry_count(&self) -> u32 {
849        self.headers.retries.unwrap_or(0)
850    }
851
852    /// Get the priority (None if not set)
853    #[inline(always)]
854    pub fn priority(&self) -> Option<u8> {
855        self.properties.priority
856    }
857
858    /// Check if message has a correlation ID
859    #[inline(always)]
860    pub fn has_correlation_id(&self) -> bool {
861        self.properties.correlation_id.is_some()
862    }
863
864    /// Get the correlation ID
865    #[inline]
866    pub fn correlation_id(&self) -> Option<&str> {
867        self.properties.correlation_id.as_deref()
868    }
869
870    /// Get the reply-to queue
871    #[inline]
872    pub fn reply_to(&self) -> Option<&str> {
873        self.properties.reply_to.as_deref()
874    }
875
876    /// Check if this is a workflow message (has parent, root, or group)
877    #[inline(always)]
878    pub fn is_workflow_message(&self) -> bool {
879        self.has_parent() || self.has_root() || self.has_group()
880    }
881
882    /// Clone the message with a new task ID
883    #[must_use]
884    pub fn with_new_id(&self) -> Self {
885        let mut cloned = self.clone();
886        cloned.headers.id = Uuid::new_v4();
887        cloned
888    }
889
890    /// Create a builder from this message (for modification)
891    ///
892    /// Note: This creates a new builder with the message's metadata.
893    /// The body (args/kwargs) must be set separately on the builder.
894    pub fn to_builder(&self) -> crate::builder::MessageBuilder {
895        let mut builder = crate::builder::MessageBuilder::new(&self.headers.task);
896
897        // Set basic properties
898        builder = builder.id(self.headers.id);
899
900        // Set optional fields
901        if let Some(priority) = self.properties.priority {
902            builder = builder.priority(priority);
903        }
904        if let Some(parent_id) = self.headers.parent_id {
905            builder = builder.parent(parent_id);
906        }
907        if let Some(root_id) = self.headers.root_id {
908            builder = builder.root(root_id);
909        }
910        if let Some(group) = self.headers.group {
911            builder = builder.group(group);
912        }
913        if let Some(eta) = self.headers.eta {
914            builder = builder.eta(eta);
915        }
916        if let Some(expires) = self.headers.expires {
917            builder = builder.expires(expires);
918        }
919
920        builder
921    }
922
923    /// Check if the message is ready for immediate execution (not delayed)
924    #[inline]
925    pub fn is_ready_for_execution(&self) -> bool {
926        match self.headers.eta {
927            None => true,
928            Some(eta) => chrono::Utc::now() >= eta,
929        }
930    }
931
932    /// Check if the message has not expired yet
933    #[inline]
934    pub fn is_not_expired(&self) -> bool {
935        match self.headers.expires {
936            None => true,
937            Some(expires) => chrono::Utc::now() < expires,
938        }
939    }
940
941    /// Check if the message should be processed (not expired and ready for execution)
942    #[inline]
943    pub fn should_process(&self) -> bool {
944        self.is_ready_for_execution() && self.is_not_expired()
945    }
946
947    /// Set ETA to now + duration (builder pattern)
948    ///
949    /// # Examples
950    ///
951    /// ```
952    /// use celers_protocol::Message;
953    /// use uuid::Uuid;
954    /// use chrono::Duration;
955    ///
956    /// let msg = Message::new("task".to_string(), Uuid::new_v4(), vec![])
957    ///     .with_eta_delay(Duration::minutes(5));
958    /// assert!(msg.has_eta());
959    /// ```
960    #[must_use]
961    pub fn with_eta_delay(mut self, delay: chrono::Duration) -> Self {
962        self.headers.eta = Some(chrono::Utc::now() + delay);
963        self
964    }
965
966    /// Set expiration to now + duration (builder pattern)
967    ///
968    /// # Examples
969    ///
970    /// ```
971    /// use celers_protocol::Message;
972    /// use uuid::Uuid;
973    /// use chrono::Duration;
974    ///
975    /// let msg = Message::new("task".to_string(), Uuid::new_v4(), vec![])
976    ///     .with_expires_in(Duration::hours(1));
977    /// assert!(msg.has_expires());
978    /// ```
979    #[must_use]
980    pub fn with_expires_in(mut self, duration: chrono::Duration) -> Self {
981        self.headers.expires = Some(chrono::Utc::now() + duration);
982        self
983    }
984
985    /// Get the time remaining until ETA (None if no ETA or already past)
986    #[inline]
987    pub fn time_until_eta(&self) -> Option<chrono::Duration> {
988        self.headers.eta.and_then(|eta| {
989            let now = chrono::Utc::now();
990            if eta > now {
991                Some(eta - now)
992            } else {
993                None
994            }
995        })
996    }
997
998    /// Get the time remaining until expiration (None if no expiration or already expired)
999    #[inline]
1000    pub fn time_until_expiration(&self) -> Option<chrono::Duration> {
1001        self.headers.expires.and_then(|expires| {
1002            let now = chrono::Utc::now();
1003            if expires > now {
1004                Some(expires - now)
1005            } else {
1006                None
1007            }
1008        })
1009    }
1010
1011    /// Increment the retry count (returns new count)
1012    pub fn increment_retry(&mut self) -> u32 {
1013        let new_count = self.headers.retries.unwrap_or(0) + 1;
1014        self.headers.retries = Some(new_count);
1015        new_count
1016    }
1017}
1018
1019/// Task arguments (args, kwargs)
1020#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
1021pub struct TaskArgs {
1022    /// Positional arguments
1023    #[serde(default)]
1024    pub args: Vec<serde_json::Value>,
1025
1026    /// Keyword arguments
1027    #[serde(default)]
1028    pub kwargs: HashMap<String, serde_json::Value>,
1029}
1030
1031impl TaskArgs {
1032    /// Create a new empty TaskArgs
1033    pub fn new() -> Self {
1034        Self::default()
1035    }
1036
1037    /// Set all positional arguments at once (builder pattern)
1038    #[must_use]
1039    pub fn with_args(mut self, args: Vec<serde_json::Value>) -> Self {
1040        self.args = args;
1041        self
1042    }
1043
1044    /// Set all keyword arguments at once (builder pattern)
1045    #[must_use]
1046    pub fn with_kwargs(mut self, kwargs: HashMap<String, serde_json::Value>) -> Self {
1047        self.kwargs = kwargs;
1048        self
1049    }
1050
1051    /// Add a single positional argument
1052    pub fn add_arg(&mut self, arg: serde_json::Value) {
1053        self.args.push(arg);
1054    }
1055
1056    /// Add a single keyword argument
1057    pub fn add_kwarg(&mut self, key: String, value: serde_json::Value) {
1058        self.kwargs.insert(key, value);
1059    }
1060
1061    /// Check if both args and kwargs are empty
1062    #[inline(always)]
1063    pub fn is_empty(&self) -> bool {
1064        self.args.is_empty() && self.kwargs.is_empty()
1065    }
1066
1067    /// Get the total number of arguments (positional + keyword)
1068    #[inline(always)]
1069    pub fn len(&self) -> usize {
1070        self.args.len() + self.kwargs.len()
1071    }
1072
1073    /// Check if there are any positional arguments
1074    #[inline(always)]
1075    pub fn has_args(&self) -> bool {
1076        !self.args.is_empty()
1077    }
1078
1079    /// Check if there are any keyword arguments
1080    #[inline(always)]
1081    pub fn has_kwargs(&self) -> bool {
1082        !self.kwargs.is_empty()
1083    }
1084
1085    /// Clear all arguments
1086    pub fn clear(&mut self) {
1087        self.args.clear();
1088        self.kwargs.clear();
1089    }
1090
1091    /// Get a positional argument by index
1092    #[inline]
1093    pub fn get_arg(&self, index: usize) -> Option<&serde_json::Value> {
1094        self.args.get(index)
1095    }
1096
1097    /// Get a keyword argument by key
1098    #[inline]
1099    pub fn get_kwarg(&self, key: &str) -> Option<&serde_json::Value> {
1100        self.kwargs.get(key)
1101    }
1102
1103    /// Create TaskArgs from a JSON string
1104    pub fn from_json(json: &str) -> Result<Self, serde_json::Error> {
1105        serde_json::from_str(json)
1106    }
1107
1108    /// Convert TaskArgs to a JSON string
1109    pub fn to_json(&self) -> Result<String, serde_json::Error> {
1110        serde_json::to_string(self)
1111    }
1112
1113    /// Convert TaskArgs to pretty-printed JSON
1114    pub fn to_json_pretty(&self) -> Result<String, serde_json::Error> {
1115        serde_json::to_string_pretty(self)
1116    }
1117}
1118
1119// Index trait for accessing positional arguments by index
1120impl std::ops::Index<usize> for TaskArgs {
1121    type Output = serde_json::Value;
1122
1123    #[inline]
1124    fn index(&self, index: usize) -> &Self::Output {
1125        &self.args[index]
1126    }
1127}
1128
1129// IndexMut trait for mutating positional arguments by index
1130impl std::ops::IndexMut<usize> for TaskArgs {
1131    #[inline]
1132    fn index_mut(&mut self, index: usize) -> &mut Self::Output {
1133        &mut self.args[index]
1134    }
1135}
1136
1137// Index trait for accessing keyword arguments by string key
1138impl std::ops::Index<&str> for TaskArgs {
1139    type Output = serde_json::Value;
1140
1141    #[inline]
1142    fn index(&self, key: &str) -> &Self::Output {
1143        &self.kwargs[key]
1144    }
1145}
1146
1147// IntoIterator for TaskArgs - iterates over positional args
1148impl IntoIterator for TaskArgs {
1149    type Item = serde_json::Value;
1150    type IntoIter = std::vec::IntoIter<serde_json::Value>;
1151
1152    fn into_iter(self) -> Self::IntoIter {
1153        self.args.into_iter()
1154    }
1155}
1156
1157// IntoIterator for &TaskArgs - iterates over positional args by reference
1158impl<'a> IntoIterator for &'a TaskArgs {
1159    type Item = &'a serde_json::Value;
1160    type IntoIter = std::slice::Iter<'a, serde_json::Value>;
1161
1162    fn into_iter(self) -> Self::IntoIter {
1163        self.args.iter()
1164    }
1165}
1166
1167// Extend trait for TaskArgs - extend with more positional arguments
1168impl Extend<serde_json::Value> for TaskArgs {
1169    fn extend<T: IntoIterator<Item = serde_json::Value>>(&mut self, iter: T) {
1170        self.args.extend(iter);
1171    }
1172}
1173
1174// Extend trait for TaskArgs with key-value pairs for kwargs
1175impl Extend<(String, serde_json::Value)> for TaskArgs {
1176    fn extend<T: IntoIterator<Item = (String, serde_json::Value)>>(&mut self, iter: T) {
1177        self.kwargs.extend(iter);
1178    }
1179}
1180
1181// FromIterator for TaskArgs - build from iterator of positional args
1182impl FromIterator<serde_json::Value> for TaskArgs {
1183    fn from_iter<T: IntoIterator<Item = serde_json::Value>>(iter: T) -> Self {
1184        Self {
1185            args: iter.into_iter().collect(),
1186            kwargs: HashMap::new(),
1187        }
1188    }
1189}
1190
1191#[cfg(test)]
1192mod tests {
1193    use super::*;
1194
1195    #[test]
1196    fn test_message_creation() {
1197        let task_id = Uuid::new_v4();
1198        let body = serde_json::to_vec(&TaskArgs::new()).unwrap();
1199        let msg = Message::new("tasks.add".to_string(), task_id, body);
1200
1201        assert_eq!(msg.headers.task, "tasks.add");
1202        assert_eq!(msg.headers.id, task_id);
1203        assert_eq!(msg.headers.lang, "rust");
1204        assert_eq!(msg.content_type, "application/json");
1205    }
1206
1207    #[test]
1208    fn test_message_with_priority() {
1209        let task_id = Uuid::new_v4();
1210        let body = vec![];
1211        let msg = Message::new("tasks.test".to_string(), task_id, body).with_priority(9);
1212
1213        assert_eq!(msg.properties.priority, Some(9));
1214    }
1215
1216    #[test]
1217    fn test_task_args() {
1218        let args = TaskArgs::new().with_args(vec![serde_json::json!(1), serde_json::json!(2)]);
1219
1220        assert_eq!(args.args.len(), 2);
1221        assert_eq!(args.kwargs.len(), 0);
1222    }
1223
1224    #[test]
1225    fn test_protocol_version_default() {
1226        let version = ProtocolVersion::default();
1227        assert_eq!(version, ProtocolVersion::V2);
1228    }
1229
1230    #[test]
1231    fn test_protocol_version_display() {
1232        assert_eq!(ProtocolVersion::V2.to_string(), "v2");
1233        assert_eq!(ProtocolVersion::V5.to_string(), "v5");
1234    }
1235
1236    #[test]
1237    fn test_content_type_as_str() {
1238        assert_eq!(ContentType::Json.as_str(), "application/json");
1239        assert_eq!(
1240            ContentType::Custom("text/plain".to_string()).as_str(),
1241            "text/plain"
1242        );
1243    }
1244
1245    #[test]
1246    fn test_content_type_default() {
1247        let ct = ContentType::default();
1248        assert_eq!(ct, ContentType::Json);
1249    }
1250
1251    #[test]
1252    fn test_content_type_display() {
1253        assert_eq!(ContentType::Json.to_string(), "application/json");
1254        assert_eq!(
1255            ContentType::Custom("text/xml".to_string()).to_string(),
1256            "text/xml"
1257        );
1258    }
1259
1260    #[test]
1261    fn test_content_encoding_as_str() {
1262        assert_eq!(ContentEncoding::Utf8.as_str(), "utf-8");
1263        assert_eq!(ContentEncoding::Binary.as_str(), "binary");
1264        assert_eq!(ContentEncoding::Custom("gzip".to_string()).as_str(), "gzip");
1265    }
1266
1267    #[test]
1268    fn test_content_encoding_default() {
1269        let ce = ContentEncoding::default();
1270        assert_eq!(ce, ContentEncoding::Utf8);
1271    }
1272
1273    #[test]
1274    fn test_content_encoding_display() {
1275        assert_eq!(ContentEncoding::Utf8.to_string(), "utf-8");
1276        assert_eq!(ContentEncoding::Binary.to_string(), "binary");
1277    }
1278
1279    #[test]
1280    fn test_message_headers_validate_empty_task() {
1281        let headers = MessageHeaders::new("".to_string(), Uuid::new_v4());
1282        let result = headers.validate();
1283        assert!(result.is_err());
1284        assert_eq!(result.unwrap_err(), ValidationError::EmptyTaskName);
1285    }
1286
1287    #[test]
1288    fn test_message_headers_validate_retries_limit() {
1289        let mut headers = MessageHeaders::new("test".to_string(), Uuid::new_v4());
1290        headers.retries = Some(1001);
1291        let result = headers.validate();
1292        assert!(result.is_err());
1293        assert_eq!(
1294            result.unwrap_err(),
1295            ValidationError::RetryLimitExceeded {
1296                retries: 1001,
1297                max: 1000
1298            }
1299        );
1300    }
1301
1302    #[test]
1303    fn test_message_headers_validate_eta_expires() {
1304        let mut headers = MessageHeaders::new("test".to_string(), Uuid::new_v4());
1305        headers.eta = Some(Utc::now() + chrono::Duration::hours(2));
1306        headers.expires = Some(Utc::now() + chrono::Duration::hours(1));
1307        let result = headers.validate();
1308        assert!(result.is_err());
1309        assert_eq!(result.unwrap_err(), ValidationError::EtaAfterExpiration);
1310    }
1311
1312    #[test]
1313    fn test_message_properties_validate_delivery_mode() {
1314        let props = MessageProperties {
1315            delivery_mode: 3,
1316            ..MessageProperties::default()
1317        };
1318        let result = props.validate();
1319        assert!(result.is_err());
1320        assert_eq!(
1321            result.unwrap_err(),
1322            ValidationError::InvalidDeliveryMode { mode: 3 }
1323        );
1324    }
1325
1326    #[test]
1327    fn test_message_properties_validate_priority() {
1328        let props = MessageProperties {
1329            delivery_mode: 2, // Set valid delivery mode
1330            priority: Some(10),
1331            ..MessageProperties::default()
1332        };
1333        let result = props.validate();
1334        assert!(result.is_err());
1335        assert_eq!(
1336            result.unwrap_err(),
1337            ValidationError::InvalidPriority { priority: 10 }
1338        );
1339    }
1340
1341    #[test]
1342    fn test_message_predicates() {
1343        let task_id = Uuid::new_v4();
1344        let parent_id = Uuid::new_v4();
1345        let root_id = Uuid::new_v4();
1346        let group_id = Uuid::new_v4();
1347
1348        let mut msg = Message::new("test".to_string(), task_id, vec![1, 2, 3])
1349            .with_parent(parent_id)
1350            .with_root(root_id)
1351            .with_group(group_id)
1352            .with_eta(Utc::now() + chrono::Duration::hours(1))
1353            .with_expires(Utc::now() + chrono::Duration::days(1));
1354
1355        // Set delivery_mode to 2 for persistence check
1356        msg.properties.delivery_mode = 2;
1357
1358        assert!(msg.has_parent());
1359        assert!(msg.has_root());
1360        assert!(msg.has_group());
1361        assert!(msg.has_eta());
1362        assert!(msg.has_expires());
1363        assert!(msg.is_persistent());
1364    }
1365
1366    #[test]
1367    fn test_message_accessors() {
1368        let task_id = Uuid::new_v4();
1369        let msg = Message::new("my_task".to_string(), task_id, vec![1, 2, 3]);
1370
1371        assert_eq!(msg.task_id(), task_id);
1372        assert_eq!(msg.task_name(), "my_task");
1373    }
1374
1375    #[test]
1376    fn test_task_args_add_methods() {
1377        let mut args = TaskArgs::new();
1378        assert!(args.is_empty());
1379
1380        args.add_arg(serde_json::json!(1));
1381        args.add_arg(serde_json::json!(2));
1382        assert_eq!(args.len(), 2);
1383        assert!(args.has_args());
1384        assert!(!args.has_kwargs());
1385
1386        args.add_kwarg("key1".to_string(), serde_json::json!("value1"));
1387        assert_eq!(args.len(), 3);
1388        assert!(args.has_kwargs());
1389    }
1390
1391    #[test]
1392    fn test_task_args_get_methods() {
1393        let mut args = TaskArgs::new();
1394        args.add_arg(serde_json::json!(42));
1395        args.add_kwarg("name".to_string(), serde_json::json!("test"));
1396
1397        assert_eq!(args.get_arg(0), Some(&serde_json::json!(42)));
1398        assert_eq!(args.get_arg(1), None);
1399        assert_eq!(args.get_kwarg("name"), Some(&serde_json::json!("test")));
1400        assert_eq!(args.get_kwarg("missing"), None);
1401    }
1402
1403    #[test]
1404    fn test_task_args_clear() {
1405        let mut args = TaskArgs::new()
1406            .with_args(vec![serde_json::json!(1)])
1407            .with_kwargs({
1408                let mut map = HashMap::new();
1409                map.insert("key".to_string(), serde_json::json!("value"));
1410                map
1411            });
1412
1413        assert!(!args.is_empty());
1414        args.clear();
1415        assert!(args.is_empty());
1416        assert_eq!(args.len(), 0);
1417    }
1418
1419    #[test]
1420    fn test_task_args_partial_eq() {
1421        let args1 = TaskArgs::new().with_args(vec![serde_json::json!(1), serde_json::json!(2)]);
1422        let args2 = TaskArgs::new().with_args(vec![serde_json::json!(1), serde_json::json!(2)]);
1423        let args3 = TaskArgs::new().with_args(vec![serde_json::json!(1)]);
1424
1425        assert_eq!(args1, args2);
1426        assert_ne!(args1, args3);
1427    }
1428
1429    #[test]
1430    fn test_message_body_methods() {
1431        let msg = Message::new("test".to_string(), Uuid::new_v4(), vec![1, 2, 3, 4, 5]);
1432        assert_eq!(msg.body_size(), 5);
1433        assert!(!msg.has_empty_body());
1434
1435        let empty_msg = Message::new("test".to_string(), Uuid::new_v4(), vec![]);
1436        assert_eq!(empty_msg.body_size(), 0);
1437        assert!(empty_msg.has_empty_body());
1438    }
1439
1440    #[test]
1441    fn test_message_content_accessors() {
1442        let msg = Message::new("test".to_string(), Uuid::new_v4(), vec![1]);
1443        assert_eq!(msg.content_type_str(), "application/json");
1444        assert_eq!(msg.content_encoding_str(), "utf-8");
1445    }
1446
1447    #[test]
1448    fn test_message_retry_and_priority() {
1449        let mut msg = Message::new("test".to_string(), Uuid::new_v4(), vec![1]);
1450        assert_eq!(msg.retry_count(), 0);
1451        assert_eq!(msg.priority(), None);
1452
1453        msg.headers.retries = Some(3);
1454        msg.properties.priority = Some(5);
1455        assert_eq!(msg.retry_count(), 3);
1456        assert_eq!(msg.priority(), Some(5));
1457    }
1458
1459    #[test]
1460    fn test_message_correlation_and_reply() {
1461        let mut msg = Message::new("test".to_string(), Uuid::new_v4(), vec![1]);
1462        assert!(!msg.has_correlation_id());
1463        assert_eq!(msg.correlation_id(), None);
1464        assert_eq!(msg.reply_to(), None);
1465
1466        msg.properties.correlation_id = Some("corr-123".to_string());
1467        msg.properties.reply_to = Some("reply-queue".to_string());
1468        assert!(msg.has_correlation_id());
1469        assert_eq!(msg.correlation_id(), Some("corr-123"));
1470        assert_eq!(msg.reply_to(), Some("reply-queue"));
1471    }
1472
1473    #[test]
1474    fn test_message_workflow_check() {
1475        let msg = Message::new("test".to_string(), Uuid::new_v4(), vec![1]);
1476        assert!(!msg.is_workflow_message());
1477
1478        let workflow_msg = msg.with_parent(Uuid::new_v4());
1479        assert!(workflow_msg.is_workflow_message());
1480    }
1481
1482    #[test]
1483    fn test_message_with_new_id() {
1484        let task_id = Uuid::new_v4();
1485        let msg = Message::new("test".to_string(), task_id, vec![1, 2, 3]);
1486        let new_msg = msg.with_new_id();
1487
1488        assert_ne!(msg.task_id(), new_msg.task_id());
1489        assert_eq!(msg.task_name(), new_msg.task_name());
1490        assert_eq!(msg.body, new_msg.body);
1491    }
1492
1493    #[test]
1494    fn test_message_to_builder() {
1495        let task_id = Uuid::new_v4();
1496        let parent_id = Uuid::new_v4();
1497        let msg = Message::new("test.task".to_string(), task_id, vec![1, 2, 3])
1498            .with_priority(5)
1499            .with_parent(parent_id);
1500
1501        let builder = msg.to_builder();
1502        // Need to add args since builder doesn't copy body automatically
1503        let rebuilt = builder.args(vec![serde_json::json!(1)]).build().unwrap();
1504
1505        assert_eq!(rebuilt.task_id(), msg.task_id());
1506        assert_eq!(rebuilt.task_name(), msg.task_name());
1507        assert_eq!(rebuilt.priority(), msg.priority());
1508        assert_eq!(rebuilt.headers.parent_id, msg.headers.parent_id);
1509    }
1510
1511    #[test]
1512    fn test_protocol_version_from_str() {
1513        use std::str::FromStr;
1514        assert_eq!(
1515            ProtocolVersion::from_str("v2").unwrap(),
1516            ProtocolVersion::V2
1517        );
1518        assert_eq!(
1519            ProtocolVersion::from_str("V2").unwrap(),
1520            ProtocolVersion::V2
1521        );
1522        assert_eq!(ProtocolVersion::from_str("2").unwrap(), ProtocolVersion::V2);
1523        assert_eq!(
1524            ProtocolVersion::from_str("v5").unwrap(),
1525            ProtocolVersion::V5
1526        );
1527        assert_eq!(
1528            ProtocolVersion::from_str("V5").unwrap(),
1529            ProtocolVersion::V5
1530        );
1531        assert_eq!(ProtocolVersion::from_str("5").unwrap(), ProtocolVersion::V5);
1532        assert!(ProtocolVersion::from_str("v3").is_err());
1533        assert!(ProtocolVersion::from_str("invalid").is_err());
1534    }
1535
1536    #[test]
1537    fn test_protocol_version_ordering() {
1538        assert!(ProtocolVersion::V2 < ProtocolVersion::V5);
1539        assert!(ProtocolVersion::V5 > ProtocolVersion::V2);
1540        assert_eq!(ProtocolVersion::V2, ProtocolVersion::V2);
1541    }
1542
1543    #[test]
1544    fn test_content_type_from_str() {
1545        use std::str::FromStr;
1546        assert_eq!(
1547            ContentType::from_str("application/json").unwrap(),
1548            ContentType::Json
1549        );
1550        assert_eq!(
1551            ContentType::from_str("text/plain").unwrap(),
1552            ContentType::Custom("text/plain".to_string())
1553        );
1554    }
1555
1556    #[test]
1557    fn test_content_encoding_from_str() {
1558        use std::str::FromStr;
1559        assert_eq!(
1560            ContentEncoding::from_str("utf-8").unwrap(),
1561            ContentEncoding::Utf8
1562        );
1563        assert_eq!(
1564            ContentEncoding::from_str("binary").unwrap(),
1565            ContentEncoding::Binary
1566        );
1567        assert_eq!(
1568            ContentEncoding::from_str("gzip").unwrap(),
1569            ContentEncoding::Custom("gzip".to_string())
1570        );
1571    }
1572
1573    #[test]
1574    fn test_message_headers_equality() {
1575        let id = Uuid::new_v4();
1576        let headers1 = MessageHeaders::new("tasks.add".to_string(), id);
1577        let headers2 = MessageHeaders::new("tasks.add".to_string(), id);
1578        let headers3 = MessageHeaders::new("tasks.sub".to_string(), id);
1579
1580        assert_eq!(headers1, headers2);
1581        assert_ne!(headers1, headers3);
1582    }
1583
1584    #[test]
1585    fn test_message_properties_equality() {
1586        let props1 = MessageProperties::default();
1587        let props2 = MessageProperties::default();
1588        let props3 = MessageProperties {
1589            priority: Some(5),
1590            ..Default::default()
1591        };
1592
1593        assert_eq!(props1, props2);
1594        assert_ne!(props1, props3);
1595    }
1596
1597    #[test]
1598    fn test_message_equality() {
1599        let id = Uuid::new_v4();
1600        let body = vec![1, 2, 3];
1601        let msg1 = Message::new("tasks.add".to_string(), id, body.clone());
1602        let msg2 = Message::new("tasks.add".to_string(), id, body.clone());
1603        let msg3 = Message::new("tasks.add".to_string(), id, vec![4, 5, 6]);
1604
1605        assert_eq!(msg1, msg2);
1606        assert_ne!(msg1, msg3);
1607    }
1608
1609    #[test]
1610    fn test_message_equality_with_options() {
1611        let id = Uuid::new_v4();
1612        let parent_id = Uuid::new_v4();
1613        let body = vec![1, 2, 3];
1614
1615        let msg1 = Message::new("tasks.add".to_string(), id, body.clone())
1616            .with_priority(5)
1617            .with_parent(parent_id);
1618        let msg2 = Message::new("tasks.add".to_string(), id, body.clone())
1619            .with_priority(5)
1620            .with_parent(parent_id);
1621        let msg3 = Message::new("tasks.add".to_string(), id, body.clone())
1622            .with_priority(3)
1623            .with_parent(parent_id);
1624
1625        assert_eq!(msg1, msg2);
1626        assert_ne!(msg1, msg3);
1627    }
1628
1629    #[test]
1630    fn test_task_args_equality() {
1631        let args1 = TaskArgs::new().with_args(vec![serde_json::json!(1), serde_json::json!(2)]);
1632        let args2 = TaskArgs::new().with_args(vec![serde_json::json!(1), serde_json::json!(2)]);
1633        let args3 = TaskArgs::new().with_args(vec![serde_json::json!(3), serde_json::json!(4)]);
1634
1635        assert_eq!(args1, args2);
1636        assert_ne!(args1, args3);
1637    }
1638
1639    #[test]
1640    fn test_task_args_equality_with_kwargs() {
1641        let mut kwargs1 = std::collections::HashMap::new();
1642        kwargs1.insert("key".to_string(), serde_json::json!("value"));
1643
1644        let mut kwargs2 = std::collections::HashMap::new();
1645        kwargs2.insert("key".to_string(), serde_json::json!("value"));
1646
1647        let args1 = TaskArgs::new().with_kwargs(kwargs1);
1648        let args2 = TaskArgs::new().with_kwargs(kwargs2);
1649        let args3 = TaskArgs::new();
1650
1651        assert_eq!(args1, args2);
1652        assert_ne!(args1, args3);
1653    }
1654
1655    #[test]
1656    fn test_content_type_from_str_trait() {
1657        let ct1: ContentType = "application/json".into();
1658        let ct2: ContentType = "text/plain".into();
1659
1660        assert_eq!(ct1, ContentType::Json);
1661        assert_eq!(ct2, ContentType::Custom("text/plain".to_string()));
1662    }
1663
1664    #[test]
1665    fn test_content_encoding_from_str_trait() {
1666        let ce1: ContentEncoding = "utf-8".into();
1667        let ce2: ContentEncoding = "binary".into();
1668        let ce3: ContentEncoding = "gzip".into();
1669
1670        assert_eq!(ce1, ContentEncoding::Utf8);
1671        assert_eq!(ce2, ContentEncoding::Binary);
1672        assert_eq!(ce3, ContentEncoding::Custom("gzip".to_string()));
1673    }
1674
1675    #[test]
1676    fn test_content_type_as_ref() {
1677        let ct = ContentType::Json;
1678        let s: &str = ct.as_ref();
1679        assert_eq!(s, "application/json");
1680
1681        let ct_custom = ContentType::Custom("text/plain".to_string());
1682        let s_custom: &str = ct_custom.as_ref();
1683        assert_eq!(s_custom, "text/plain");
1684    }
1685
1686    #[test]
1687    fn test_content_encoding_as_ref() {
1688        let ce = ContentEncoding::Utf8;
1689        let s: &str = ce.as_ref();
1690        assert_eq!(s, "utf-8");
1691
1692        let ce_binary = ContentEncoding::Binary;
1693        let s_binary: &str = ce_binary.as_ref();
1694        assert_eq!(s_binary, "binary");
1695    }
1696
1697    #[test]
1698    fn test_content_type_hash() {
1699        use std::collections::HashSet;
1700
1701        let mut set = HashSet::new();
1702        set.insert(ContentType::Json);
1703        set.insert(ContentType::Json); // Duplicate
1704        #[cfg(feature = "msgpack")]
1705        set.insert(ContentType::MessagePack);
1706        set.insert(ContentType::Custom("text/plain".to_string()));
1707
1708        #[cfg(feature = "msgpack")]
1709        assert_eq!(set.len(), 3);
1710        #[cfg(not(feature = "msgpack"))]
1711        assert_eq!(set.len(), 2);
1712
1713        assert!(set.contains(&ContentType::Json));
1714    }
1715
1716    #[test]
1717    fn test_content_encoding_hash() {
1718        use std::collections::HashSet;
1719
1720        let mut set = HashSet::new();
1721        set.insert(ContentEncoding::Utf8);
1722        set.insert(ContentEncoding::Utf8); // Duplicate
1723        set.insert(ContentEncoding::Binary);
1724        set.insert(ContentEncoding::Custom("base64".to_string()));
1725
1726        assert_eq!(set.len(), 3);
1727        assert!(set.contains(&ContentEncoding::Utf8));
1728        assert!(set.contains(&ContentEncoding::Binary));
1729    }
1730
1731    #[test]
1732    fn test_message_with_retries() {
1733        let msg = Message::new("test".to_string(), Uuid::new_v4(), vec![1]).with_retries(5);
1734
1735        assert_eq!(msg.headers.retries, Some(5));
1736        assert_eq!(msg.retry_count(), 5);
1737    }
1738
1739    #[test]
1740    fn test_message_with_correlation_id() {
1741        let msg = Message::new("test".to_string(), Uuid::new_v4(), vec![1])
1742            .with_correlation_id("corr-123".to_string());
1743
1744        assert_eq!(msg.properties.correlation_id, Some("corr-123".to_string()));
1745        assert_eq!(msg.correlation_id(), Some("corr-123"));
1746    }
1747
1748    #[test]
1749    fn test_message_with_reply_to() {
1750        let msg = Message::new("test".to_string(), Uuid::new_v4(), vec![1])
1751            .with_reply_to("reply-queue".to_string());
1752
1753        assert_eq!(msg.properties.reply_to, Some("reply-queue".to_string()));
1754        assert_eq!(msg.reply_to(), Some("reply-queue"));
1755    }
1756
1757    #[test]
1758    fn test_message_with_delivery_mode() {
1759        let msg = Message::new("test".to_string(), Uuid::new_v4(), vec![1]).with_delivery_mode(1);
1760
1761        assert_eq!(msg.properties.delivery_mode, 1);
1762        assert!(!msg.is_persistent());
1763
1764        let persistent_msg =
1765            Message::new("test".to_string(), Uuid::new_v4(), vec![1]).with_delivery_mode(2);
1766
1767        assert_eq!(persistent_msg.properties.delivery_mode, 2);
1768        assert!(persistent_msg.is_persistent());
1769    }
1770
1771    #[test]
1772    fn test_message_builder_chaining() {
1773        let parent_id = Uuid::new_v4();
1774        let msg = Message::new("test.task".to_string(), Uuid::new_v4(), vec![1, 2, 3])
1775            .with_priority(7)
1776            .with_retries(3)
1777            .with_correlation_id("corr-456".to_string())
1778            .with_reply_to("result-queue".to_string())
1779            .with_parent(parent_id)
1780            .with_delivery_mode(1);
1781
1782        assert_eq!(msg.priority(), Some(7));
1783        assert_eq!(msg.retry_count(), 3);
1784        assert_eq!(msg.correlation_id(), Some("corr-456"));
1785        assert_eq!(msg.reply_to(), Some("result-queue"));
1786        assert_eq!(msg.headers.parent_id, Some(parent_id));
1787        assert_eq!(msg.properties.delivery_mode, 1);
1788        assert!(!msg.is_persistent());
1789    }
1790
1791    #[test]
1792    fn test_protocol_version_is_v2() {
1793        assert!(ProtocolVersion::V2.is_v2());
1794        assert!(!ProtocolVersion::V5.is_v2());
1795    }
1796
1797    #[test]
1798    fn test_protocol_version_is_v5() {
1799        assert!(ProtocolVersion::V5.is_v5());
1800        assert!(!ProtocolVersion::V2.is_v5());
1801    }
1802
1803    #[test]
1804    fn test_protocol_version_as_u8() {
1805        assert_eq!(ProtocolVersion::V2.as_u8(), 2);
1806        assert_eq!(ProtocolVersion::V5.as_u8(), 5);
1807    }
1808
1809    #[test]
1810    fn test_protocol_version_as_number_str() {
1811        assert_eq!(ProtocolVersion::V2.as_number_str(), "2");
1812        assert_eq!(ProtocolVersion::V5.as_number_str(), "5");
1813    }
1814
1815    #[test]
1816    fn test_task_args_from_json() {
1817        let json = r#"{"args":[1,2,3],"kwargs":{"key":"value"}}"#;
1818        let args = TaskArgs::from_json(json).unwrap();
1819
1820        assert_eq!(args.args.len(), 3);
1821        assert_eq!(args.kwargs.len(), 1);
1822        assert_eq!(args.get_kwarg("key"), Some(&serde_json::json!("value")));
1823    }
1824
1825    #[test]
1826    fn test_task_args_to_json() {
1827        let mut args = TaskArgs::new();
1828        args.add_arg(serde_json::json!(1));
1829        args.add_arg(serde_json::json!(2));
1830        args.add_kwarg("key".to_string(), serde_json::json!("value"));
1831
1832        let json = args.to_json().unwrap();
1833        assert!(json.contains("\"args\""));
1834        assert!(json.contains("\"kwargs\""));
1835        assert!(json.contains("\"key\""));
1836    }
1837
1838    #[test]
1839    fn test_task_args_to_json_pretty() {
1840        let args = TaskArgs::new()
1841            .with_args(vec![serde_json::json!(1)])
1842            .with_kwargs({
1843                let mut map = std::collections::HashMap::new();
1844                map.insert("test".to_string(), serde_json::json!("value"));
1845                map
1846            });
1847
1848        let json_pretty = args.to_json_pretty().unwrap();
1849        assert!(json_pretty.contains('\n')); // Should have newlines
1850    }
1851
1852    #[test]
1853    fn test_message_is_ready_for_execution() {
1854        let msg1 = Message::new("test".to_string(), Uuid::new_v4(), vec![1, 2, 3]);
1855        assert!(msg1.is_ready_for_execution()); // No ETA
1856
1857        let msg2 = Message::new("test".to_string(), Uuid::new_v4(), vec![1, 2, 3])
1858            .with_eta(chrono::Utc::now() - chrono::Duration::hours(1));
1859        assert!(msg2.is_ready_for_execution()); // Past ETA
1860
1861        let msg3 = Message::new("test".to_string(), Uuid::new_v4(), vec![1, 2, 3])
1862            .with_eta(chrono::Utc::now() + chrono::Duration::hours(1));
1863        assert!(!msg3.is_ready_for_execution()); // Future ETA
1864    }
1865
1866    #[test]
1867    fn test_message_is_not_expired() {
1868        let msg1 = Message::new("test".to_string(), Uuid::new_v4(), vec![1, 2, 3]);
1869        assert!(msg1.is_not_expired()); // No expiration
1870
1871        let msg2 = Message::new("test".to_string(), Uuid::new_v4(), vec![1, 2, 3])
1872            .with_expires(chrono::Utc::now() + chrono::Duration::hours(1));
1873        assert!(msg2.is_not_expired()); // Future expiration
1874
1875        let msg3 = Message::new("test".to_string(), Uuid::new_v4(), vec![1, 2, 3])
1876            .with_expires(chrono::Utc::now() - chrono::Duration::hours(1));
1877        assert!(!msg3.is_not_expired()); // Past expiration
1878    }
1879
1880    #[test]
1881    fn test_message_should_process() {
1882        // Ready and not expired
1883        let msg1 = Message::new("test".to_string(), Uuid::new_v4(), vec![1, 2, 3]);
1884        assert!(msg1.should_process());
1885
1886        // Not ready (future ETA)
1887        let msg2 = Message::new("test".to_string(), Uuid::new_v4(), vec![1, 2, 3])
1888            .with_eta(chrono::Utc::now() + chrono::Duration::hours(1));
1889        assert!(!msg2.should_process());
1890
1891        // Expired
1892        let msg3 = Message::new("test".to_string(), Uuid::new_v4(), vec![1, 2, 3])
1893            .with_expires(chrono::Utc::now() - chrono::Duration::hours(1));
1894        assert!(!msg3.should_process());
1895
1896        // Ready but expired
1897        let msg4 = Message::new("test".to_string(), Uuid::new_v4(), vec![1, 2, 3])
1898            .with_eta(chrono::Utc::now() - chrono::Duration::hours(2))
1899            .with_expires(chrono::Utc::now() - chrono::Duration::hours(1));
1900        assert!(!msg4.should_process());
1901    }
1902
1903    #[test]
1904    fn test_message_headers_builder() {
1905        let task_id = Uuid::new_v4();
1906        let root_id = Uuid::new_v4();
1907        let parent_id = Uuid::new_v4();
1908        let group_id = Uuid::new_v4();
1909
1910        let headers = MessageHeaders::new("task".to_string(), task_id)
1911            .with_lang("python".to_string())
1912            .with_root_id(root_id)
1913            .with_parent_id(parent_id)
1914            .with_group(group_id)
1915            .with_retries(3)
1916            .with_eta(chrono::Utc::now() + chrono::Duration::minutes(5))
1917            .with_expires(chrono::Utc::now() + chrono::Duration::hours(1));
1918
1919        assert_eq!(headers.lang, "python");
1920        assert_eq!(headers.root_id, Some(root_id));
1921        assert_eq!(headers.parent_id, Some(parent_id));
1922        assert_eq!(headers.group, Some(group_id));
1923        assert_eq!(headers.retries, Some(3));
1924        assert!(headers.eta.is_some());
1925        assert!(headers.expires.is_some());
1926    }
1927
1928    #[test]
1929    fn test_message_properties_builder() {
1930        let props = MessageProperties::new()
1931            .with_correlation_id("corr-123".to_string())
1932            .with_reply_to("reply.queue".to_string())
1933            .with_delivery_mode(1)
1934            .with_priority(5);
1935
1936        assert_eq!(props.correlation_id, Some("corr-123".to_string()));
1937        assert_eq!(props.reply_to, Some("reply.queue".to_string()));
1938        assert_eq!(props.delivery_mode, 1);
1939        assert_eq!(props.priority, Some(5));
1940    }
1941
1942    #[test]
1943    fn test_message_with_eta_delay() {
1944        let before = chrono::Utc::now();
1945        let msg = Message::new("task".to_string(), Uuid::new_v4(), vec![])
1946            .with_eta_delay(chrono::Duration::minutes(10));
1947        let after = chrono::Utc::now();
1948
1949        assert!(msg.has_eta());
1950        let eta = msg.headers.eta.unwrap();
1951        // ETA should be roughly 10 minutes from now
1952        assert!(eta > before + chrono::Duration::minutes(9));
1953        assert!(eta < after + chrono::Duration::minutes(11));
1954    }
1955
1956    #[test]
1957    fn test_message_with_expires_in() {
1958        let before = chrono::Utc::now();
1959        let msg = Message::new("task".to_string(), Uuid::new_v4(), vec![])
1960            .with_expires_in(chrono::Duration::hours(2));
1961        let after = chrono::Utc::now();
1962
1963        assert!(msg.has_expires());
1964        let expires = msg.headers.expires.unwrap();
1965        // Expiration should be roughly 2 hours from now
1966        assert!(expires > before + chrono::Duration::hours(2) - chrono::Duration::seconds(1));
1967        assert!(expires < after + chrono::Duration::hours(2) + chrono::Duration::seconds(1));
1968    }
1969
1970    #[test]
1971    fn test_message_time_until_eta() {
1972        // No ETA
1973        let msg1 = Message::new("task".to_string(), Uuid::new_v4(), vec![]);
1974        assert!(msg1.time_until_eta().is_none());
1975
1976        // Future ETA
1977        let msg2 = Message::new("task".to_string(), Uuid::new_v4(), vec![])
1978            .with_eta(chrono::Utc::now() + chrono::Duration::minutes(30));
1979        let time_left = msg2.time_until_eta();
1980        assert!(time_left.is_some());
1981        assert!(time_left.unwrap() > chrono::Duration::minutes(29));
1982        assert!(time_left.unwrap() < chrono::Duration::minutes(31));
1983
1984        // Past ETA
1985        let msg3 = Message::new("task".to_string(), Uuid::new_v4(), vec![])
1986            .with_eta(chrono::Utc::now() - chrono::Duration::minutes(30));
1987        assert!(msg3.time_until_eta().is_none());
1988    }
1989
1990    #[test]
1991    fn test_message_time_until_expiration() {
1992        // No expiration
1993        let msg1 = Message::new("task".to_string(), Uuid::new_v4(), vec![]);
1994        assert!(msg1.time_until_expiration().is_none());
1995
1996        // Future expiration
1997        let msg2 = Message::new("task".to_string(), Uuid::new_v4(), vec![])
1998            .with_expires(chrono::Utc::now() + chrono::Duration::hours(1));
1999        let time_left = msg2.time_until_expiration();
2000        assert!(time_left.is_some());
2001        assert!(time_left.unwrap() > chrono::Duration::minutes(59));
2002        assert!(time_left.unwrap() < chrono::Duration::minutes(61));
2003
2004        // Past expiration
2005        let msg3 = Message::new("task".to_string(), Uuid::new_v4(), vec![])
2006            .with_expires(chrono::Utc::now() - chrono::Duration::hours(1));
2007        assert!(msg3.time_until_expiration().is_none());
2008    }
2009
2010    #[test]
2011    fn test_message_increment_retry() {
2012        let mut msg = Message::new("task".to_string(), Uuid::new_v4(), vec![]);
2013
2014        // Initial retry count is 0
2015        assert_eq!(msg.retry_count(), 0);
2016
2017        // Increment to 1
2018        let count1 = msg.increment_retry();
2019        assert_eq!(count1, 1);
2020        assert_eq!(msg.retry_count(), 1);
2021
2022        // Increment to 2
2023        let count2 = msg.increment_retry();
2024        assert_eq!(count2, 2);
2025        assert_eq!(msg.retry_count(), 2);
2026    }
2027
2028    #[test]
2029    fn test_task_args_index_usize() {
2030        let args = TaskArgs::new().with_args(vec![
2031            serde_json::json!(1),
2032            serde_json::json!("hello"),
2033            serde_json::json!(true),
2034        ]);
2035
2036        // Test Index trait
2037        assert_eq!(args[0], serde_json::json!(1));
2038        assert_eq!(args[1], serde_json::json!("hello"));
2039        assert_eq!(args[2], serde_json::json!(true));
2040    }
2041
2042    #[test]
2043    fn test_task_args_index_mut_usize() {
2044        let mut args = TaskArgs::new().with_args(vec![serde_json::json!(1), serde_json::json!(2)]);
2045
2046        // Test IndexMut trait
2047        args[0] = serde_json::json!(100);
2048        args[1] = serde_json::json!(200);
2049
2050        assert_eq!(args[0], serde_json::json!(100));
2051        assert_eq!(args[1], serde_json::json!(200));
2052    }
2053
2054    #[test]
2055    fn test_task_args_index_str() {
2056        let mut kwargs = HashMap::new();
2057        kwargs.insert("name".to_string(), serde_json::json!("Alice"));
2058        kwargs.insert("age".to_string(), serde_json::json!(30));
2059
2060        let args = TaskArgs::new().with_kwargs(kwargs);
2061
2062        // Test Index trait with string keys
2063        assert_eq!(args["name"], serde_json::json!("Alice"));
2064        assert_eq!(args["age"], serde_json::json!(30));
2065    }
2066
2067    #[test]
2068    #[should_panic(expected = "no entry found for key")]
2069    fn test_task_args_index_str_panic() {
2070        let args = TaskArgs::new();
2071        let _ = &args["nonexistent"]; // Should panic
2072    }
2073
2074    #[test]
2075    fn test_task_args_into_iterator() {
2076        let args = TaskArgs::new().with_args(vec![
2077            serde_json::json!(1),
2078            serde_json::json!(2),
2079            serde_json::json!(3),
2080        ]);
2081
2082        // Test IntoIterator
2083        let values: Vec<_> = args.into_iter().collect();
2084        assert_eq!(values.len(), 3);
2085        assert_eq!(values[0], serde_json::json!(1));
2086        assert_eq!(values[1], serde_json::json!(2));
2087        assert_eq!(values[2], serde_json::json!(3));
2088    }
2089
2090    #[test]
2091    fn test_task_args_into_iterator_ref() {
2092        let args = TaskArgs::new().with_args(vec![serde_json::json!(10), serde_json::json!(20)]);
2093
2094        // Test IntoIterator for &TaskArgs
2095        let sum: i64 = (&args).into_iter().filter_map(|v| v.as_i64()).sum();
2096
2097        assert_eq!(sum, 30);
2098        // args is still usable
2099        assert_eq!(args.args.len(), 2);
2100    }
2101
2102    #[test]
2103    fn test_task_args_extend() {
2104        let mut args = TaskArgs::new().with_args(vec![serde_json::json!(1)]);
2105
2106        // Test Extend trait with positional args
2107        args.extend(vec![serde_json::json!(2), serde_json::json!(3)]);
2108
2109        assert_eq!(args.args.len(), 3);
2110        assert_eq!(args[0], serde_json::json!(1));
2111        assert_eq!(args[1], serde_json::json!(2));
2112        assert_eq!(args[2], serde_json::json!(3));
2113    }
2114
2115    #[test]
2116    fn test_task_args_extend_kwargs() {
2117        let mut args = TaskArgs::new();
2118
2119        // Test Extend trait with key-value pairs
2120        args.extend(vec![
2121            ("key1".to_string(), serde_json::json!("value1")),
2122            ("key2".to_string(), serde_json::json!(42)),
2123        ]);
2124
2125        assert_eq!(args.kwargs.len(), 2);
2126        assert_eq!(args["key1"], serde_json::json!("value1"));
2127        assert_eq!(args["key2"], serde_json::json!(42));
2128    }
2129
2130    #[test]
2131    fn test_task_args_from_iterator() {
2132        // Test FromIterator trait
2133        let args: TaskArgs = vec![
2134            serde_json::json!(1),
2135            serde_json::json!("hello"),
2136            serde_json::json!(true),
2137        ]
2138        .into_iter()
2139        .collect();
2140
2141        assert_eq!(args.args.len(), 3);
2142        assert_eq!(args.kwargs.len(), 0);
2143        assert_eq!(args[0], serde_json::json!(1));
2144        assert_eq!(args[1], serde_json::json!("hello"));
2145        assert_eq!(args[2], serde_json::json!(true));
2146    }
2147
2148    #[test]
2149    fn test_task_args_from_iterator_range() {
2150        // Build TaskArgs from range using FromIterator
2151        let args: TaskArgs = (1..=5).map(|i| serde_json::json!(i)).collect();
2152
2153        assert_eq!(args.args.len(), 5);
2154        assert_eq!(args[0], serde_json::json!(1));
2155        assert_eq!(args[4], serde_json::json!(5));
2156    }
2157
2158    #[test]
2159    fn test_task_args_iterator_chain() {
2160        // Test combining traits: FromIterator, IntoIterator, Extend
2161        let args1: TaskArgs = vec![serde_json::json!(1), serde_json::json!(2)]
2162            .into_iter()
2163            .collect();
2164
2165        let mut args2 = TaskArgs::new();
2166        args2.extend(vec![serde_json::json!(3), serde_json::json!(4)]);
2167
2168        // Extend args2 with args1's values
2169        args2.extend(args1);
2170
2171        assert_eq!(args2.args.len(), 4);
2172        assert_eq!(args2[0], serde_json::json!(3));
2173        assert_eq!(args2[3], serde_json::json!(2));
2174    }
2175}