celers_protocol/
compat.rs

1//! Python Celery Protocol Compatibility Tests
2//!
3//! This module provides verification that CeleRS messages are wire-compatible
4//! with Python Celery's protocol v2/v5 format.
5
6use crate::Message;
7use base64::Engine;
8use serde_json::json;
9use uuid::Uuid;
10
11/// Verify that a CeleRS message serializes to Celery-compatible JSON
12pub fn verify_message_format(msg: &Message) -> Result<(), String> {
13    // Serialize to JSON
14    let json_str = serde_json::to_string(msg).map_err(|e| format!("Serialization error: {}", e))?;
15
16    let value: serde_json::Value =
17        serde_json::from_str(&json_str).map_err(|e| format!("Parse error: {}", e))?;
18
19    // Verify required fields exist
20    if value.get("headers").is_none() {
21        return Err("Missing 'headers' field".to_string());
22    }
23
24    if value.get("properties").is_none() {
25        return Err("Missing 'properties' field".to_string());
26    }
27
28    if value.get("body").is_none() {
29        return Err("Missing 'body' field".to_string());
30    }
31
32    if value.get("content-type").is_none() {
33        return Err("Missing 'content-type' field".to_string());
34    }
35
36    if value.get("content-encoding").is_none() {
37        return Err("Missing 'content-encoding' field".to_string());
38    }
39
40    // Verify headers structure
41    let headers = value.get("headers").expect("headers field should exist");
42    if headers.get("task").is_none() {
43        return Err("Missing 'headers.task' field".to_string());
44    }
45
46    if headers.get("id").is_none() {
47        return Err("Missing 'headers.id' field".to_string());
48    }
49
50    if headers.get("lang").is_none() {
51        return Err("Missing 'headers.lang' field".to_string());
52    }
53
54    Ok(())
55}
56
57/// Create a Python Celery-compatible message (for testing deserialization)
58pub fn create_python_celery_message(
59    task_name: &str,
60    task_id: Uuid,
61    args: Vec<serde_json::Value>,
62    kwargs: serde_json::Value,
63) -> serde_json::Value {
64    // This is the exact format Python Celery uses for Protocol v2
65    json!({
66        "headers": {
67            "task": task_name,
68            "id": task_id.to_string(),
69            "lang": "py",
70            "root_id": null,
71            "parent_id": null,
72            "group": null
73        },
74        "properties": {
75            "correlation_id": task_id.to_string(),
76            "reply_to": null,
77            "delivery_mode": 2,
78            "priority": null
79        },
80        "content-type": "application/json",
81        "content-encoding": "utf-8",
82        "body": base64::engine::general_purpose::STANDARD.encode(
83            serde_json::to_vec(&json!([args, kwargs, {}])).expect("serialization should not fail")
84        )
85    })
86}
87
88/// Parse a Python Celery message into CeleRS Message
89pub fn parse_python_message(json_value: serde_json::Value) -> Result<Message, String> {
90    serde_json::from_value(json_value).map_err(|e| format!("Parse error: {}", e))
91}
92
93#[cfg(test)]
94mod tests {
95    use super::*;
96    use crate::{ContentEncoding, ContentType};
97    use chrono::Utc;
98
99    #[test]
100    fn test_celers_message_format_compatibility() {
101        let task_id = Uuid::new_v4();
102        let body = serde_json::to_vec(&json!([[1, 2], {}, {}])).unwrap();
103
104        let msg = Message::new("tasks.add".to_string(), task_id, body);
105
106        // Verify it produces valid Celery format
107        verify_message_format(&msg).expect("Message format should be compatible");
108    }
109
110    #[test]
111    fn test_parse_python_celery_message() {
112        let task_id = Uuid::new_v4();
113        let python_msg = create_python_celery_message(
114            "tasks.multiply",
115            task_id,
116            vec![json!(4), json!(5)],
117            json!({}),
118        );
119
120        // Should parse without errors
121        let msg = parse_python_message(python_msg).expect("Should parse Python message");
122
123        assert_eq!(msg.headers.task, "tasks.multiply");
124        assert_eq!(msg.headers.id, task_id);
125        assert_eq!(msg.headers.lang, "py");
126        assert_eq!(msg.content_type, "application/json");
127    }
128
129    #[test]
130    fn test_round_trip_serialization() {
131        let task_id = Uuid::new_v4();
132        let body = serde_json::to_vec(&json!([[10, 20], {"debug": true}, {}])).unwrap();
133
134        let msg1 = Message::new("tasks.process".to_string(), task_id, body.clone());
135
136        // Serialize to JSON
137        let json_str = serde_json::to_string(&msg1).expect("Should serialize");
138
139        // Deserialize back
140        let msg2: Message = serde_json::from_str(&json_str).expect("Should deserialize");
141
142        // Verify fields match
143        assert_eq!(msg1.headers.task, msg2.headers.task);
144        assert_eq!(msg1.headers.id, msg2.headers.id);
145        assert_eq!(msg1.body, msg2.body);
146        assert_eq!(msg1.content_type, msg2.content_type);
147    }
148
149    #[test]
150    fn test_message_with_workflow_fields() {
151        let task_id = Uuid::new_v4();
152        let parent_id = Uuid::new_v4();
153        let root_id = Uuid::new_v4();
154        let group_id = Uuid::new_v4();
155
156        let body = serde_json::to_vec(&json!([[], {}, {}])).unwrap();
157
158        let msg = Message::new("tasks.chord_callback".to_string(), task_id, body)
159            .with_parent(parent_id)
160            .with_root(root_id)
161            .with_group(group_id)
162            .with_priority(5);
163
164        // Verify format
165        verify_message_format(&msg).expect("Should be compatible");
166
167        // Serialize and check JSON structure
168        let json_str = serde_json::to_string(&msg).expect("Should serialize");
169        let value: serde_json::Value = serde_json::from_str(&json_str).unwrap();
170
171        assert_eq!(value["headers"]["parent_id"], json!(parent_id.to_string()));
172        assert_eq!(value["headers"]["root_id"], json!(root_id.to_string()));
173        assert_eq!(value["headers"]["group"], json!(group_id.to_string()));
174        assert_eq!(value["properties"]["priority"], json!(5));
175    }
176
177    #[test]
178    fn test_message_with_eta_and_expires() {
179        let task_id = Uuid::new_v4();
180        let eta = Utc::now() + chrono::Duration::hours(2);
181        let expires = Utc::now() + chrono::Duration::days(1);
182
183        let body = serde_json::to_vec(&json!([[], {}, {}])).unwrap();
184
185        let msg = Message::new("tasks.scheduled".to_string(), task_id, body)
186            .with_eta(eta)
187            .with_expires(expires);
188
189        verify_message_format(&msg).expect("Should be compatible");
190
191        // Serialize and verify timestamp format
192        let json_str = serde_json::to_string(&msg).expect("Should serialize");
193        let value: serde_json::Value = serde_json::from_str(&json_str).unwrap();
194
195        // Celery uses ISO 8601 format for timestamps
196        assert!(value["headers"]["eta"].is_string());
197        assert!(value["headers"]["expires"].is_string());
198    }
199
200    #[test]
201    fn test_body_base64_encoding() {
202        let task_id = Uuid::new_v4();
203        let raw_body = b"test data";
204
205        let msg = Message::new("tasks.test".to_string(), task_id, raw_body.to_vec());
206
207        let json_str = serde_json::to_string(&msg).expect("Should serialize");
208        let value: serde_json::Value = serde_json::from_str(&json_str).unwrap();
209
210        // Body should be base64-encoded string
211        assert!(value["body"].is_string());
212
213        // Decode and verify
214        let encoded = value["body"].as_str().unwrap();
215        let decoded = base64::engine::general_purpose::STANDARD
216            .decode(encoded)
217            .expect("Should decode");
218        assert_eq!(decoded, raw_body);
219    }
220
221    #[test]
222    fn test_content_type_values() {
223        assert_eq!(ContentType::Json.as_str(), "application/json");
224        #[cfg(feature = "msgpack")]
225        assert_eq!(ContentType::MessagePack.as_str(), "application/x-msgpack");
226        #[cfg(feature = "binary")]
227        assert_eq!(ContentType::Binary.as_str(), "application/octet-stream");
228    }
229
230    #[test]
231    fn test_content_encoding_values() {
232        assert_eq!(ContentEncoding::Utf8.as_str(), "utf-8");
233        assert_eq!(ContentEncoding::Binary.as_str(), "binary");
234    }
235
236    #[test]
237    fn test_delivery_mode_persistent() {
238        let task_id = Uuid::new_v4();
239        let body = vec![];
240
241        let msg = Message::new("tasks.test".to_string(), task_id, body);
242
243        // Default should be persistent (delivery_mode = 2)
244        assert_eq!(msg.properties.delivery_mode, 2);
245
246        let json_str = serde_json::to_string(&msg).unwrap();
247        let value: serde_json::Value = serde_json::from_str(&json_str).unwrap();
248
249        assert_eq!(value["properties"]["delivery_mode"], json!(2));
250    }
251}