celers_protocol/
compat.rs1use crate::Message;
7use base64::Engine;
8use serde_json::json;
9use uuid::Uuid;
10
11pub fn verify_message_format(msg: &Message) -> Result<(), String> {
13 let json_str = serde_json::to_string(msg).map_err(|e| format!("Serialization error: {}", e))?;
15
16 let value: serde_json::Value =
17 serde_json::from_str(&json_str).map_err(|e| format!("Parse error: {}", e))?;
18
19 if value.get("headers").is_none() {
21 return Err("Missing 'headers' field".to_string());
22 }
23
24 if value.get("properties").is_none() {
25 return Err("Missing 'properties' field".to_string());
26 }
27
28 if value.get("body").is_none() {
29 return Err("Missing 'body' field".to_string());
30 }
31
32 if value.get("content-type").is_none() {
33 return Err("Missing 'content-type' field".to_string());
34 }
35
36 if value.get("content-encoding").is_none() {
37 return Err("Missing 'content-encoding' field".to_string());
38 }
39
40 let headers = value.get("headers").expect("headers field should exist");
42 if headers.get("task").is_none() {
43 return Err("Missing 'headers.task' field".to_string());
44 }
45
46 if headers.get("id").is_none() {
47 return Err("Missing 'headers.id' field".to_string());
48 }
49
50 if headers.get("lang").is_none() {
51 return Err("Missing 'headers.lang' field".to_string());
52 }
53
54 Ok(())
55}
56
57pub fn create_python_celery_message(
59 task_name: &str,
60 task_id: Uuid,
61 args: Vec<serde_json::Value>,
62 kwargs: serde_json::Value,
63) -> serde_json::Value {
64 json!({
66 "headers": {
67 "task": task_name,
68 "id": task_id.to_string(),
69 "lang": "py",
70 "root_id": null,
71 "parent_id": null,
72 "group": null
73 },
74 "properties": {
75 "correlation_id": task_id.to_string(),
76 "reply_to": null,
77 "delivery_mode": 2,
78 "priority": null
79 },
80 "content-type": "application/json",
81 "content-encoding": "utf-8",
82 "body": base64::engine::general_purpose::STANDARD.encode(
83 serde_json::to_vec(&json!([args, kwargs, {}])).expect("serialization should not fail")
84 )
85 })
86}
87
88pub fn parse_python_message(json_value: serde_json::Value) -> Result<Message, String> {
90 serde_json::from_value(json_value).map_err(|e| format!("Parse error: {}", e))
91}
92
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{ContentEncoding, ContentType};
    use chrono::Utc;

    /// Serializes `msg` to a JSON string and parses it back into a generic
    /// value so a test can inspect the emitted representation.
    fn wire_json(msg: &Message) -> serde_json::Value {
        let text = serde_json::to_string(msg).expect("Should serialize");
        serde_json::from_str(&text).unwrap()
    }

    /// A freshly constructed message passes the format check.
    #[test]
    fn test_celers_message_format_compatibility() {
        let id = Uuid::new_v4();
        let payload = serde_json::to_vec(&json!([[1, 2], {}, {}])).unwrap();
        let message = Message::new("tasks.add".to_string(), id, payload);

        verify_message_format(&message).expect("Message format should be compatible");
    }

    /// A message built by `create_python_celery_message` parses into `Message`.
    #[test]
    fn test_parse_python_celery_message() {
        let id = Uuid::new_v4();
        let positional = vec![json!(4), json!(5)];
        let wire = create_python_celery_message("tasks.multiply", id, positional, json!({}));

        let parsed = parse_python_message(wire).expect("Should parse Python message");

        assert_eq!(parsed.headers.task, "tasks.multiply");
        assert_eq!(parsed.headers.id, id);
        assert_eq!(parsed.headers.lang, "py");
        assert_eq!(parsed.content_type, "application/json");
    }

    /// Serializing and then deserializing keeps task, id, body and content type.
    #[test]
    fn test_round_trip_serialization() {
        let id = Uuid::new_v4();
        let payload = serde_json::to_vec(&json!([[10, 20], {"debug": true}, {}])).unwrap();
        let original = Message::new("tasks.process".to_string(), id, payload);

        let text = serde_json::to_string(&original).expect("Should serialize");
        let restored: Message = serde_json::from_str(&text).expect("Should deserialize");

        assert_eq!(original.headers.task, restored.headers.task);
        assert_eq!(original.headers.id, restored.headers.id);
        assert_eq!(original.body, restored.body);
        assert_eq!(original.content_type, restored.content_type);
    }

    /// Parent, root, group and priority appear in the serialized JSON.
    #[test]
    fn test_message_with_workflow_fields() {
        let id = Uuid::new_v4();
        let parent = Uuid::new_v4();
        let root = Uuid::new_v4();
        let group = Uuid::new_v4();
        let payload = serde_json::to_vec(&json!([[], {}, {}])).unwrap();

        let message = Message::new("tasks.chord_callback".to_string(), id, payload)
            .with_parent(parent)
            .with_root(root)
            .with_group(group)
            .with_priority(5);

        verify_message_format(&message).expect("Should be compatible");

        let wire = wire_json(&message);
        assert_eq!(wire["headers"]["parent_id"], json!(parent.to_string()));
        assert_eq!(wire["headers"]["root_id"], json!(root.to_string()));
        assert_eq!(wire["headers"]["group"], json!(group.to_string()));
        assert_eq!(wire["properties"]["priority"], json!(5));
    }

    /// `eta` and `expires` are emitted as JSON strings in the headers.
    #[test]
    fn test_message_with_eta_and_expires() {
        let id = Uuid::new_v4();
        let run_at = Utc::now() + chrono::Duration::hours(2);
        let deadline = Utc::now() + chrono::Duration::days(1);
        let payload = serde_json::to_vec(&json!([[], {}, {}])).unwrap();

        let message = Message::new("tasks.scheduled".to_string(), id, payload)
            .with_eta(run_at)
            .with_expires(deadline);

        verify_message_format(&message).expect("Should be compatible");

        let wire = wire_json(&message);
        assert!(wire["headers"]["eta"].is_string());
        assert!(wire["headers"]["expires"].is_string());
    }

    /// The body is emitted as a base64 string that decodes to the raw bytes.
    #[test]
    fn test_body_base64_encoding() {
        let id = Uuid::new_v4();
        let raw_body = b"test data";
        let message = Message::new("tasks.test".to_string(), id, raw_body.to_vec());

        let wire = wire_json(&message);
        assert!(wire["body"].is_string());

        let body_text = wire["body"].as_str().unwrap();
        let round_tripped = base64::engine::general_purpose::STANDARD
            .decode(body_text)
            .expect("Should decode");
        assert_eq!(round_tripped, raw_body);
    }

    /// Content-type variants map to the expected MIME strings.
    #[test]
    fn test_content_type_values() {
        assert_eq!(ContentType::Json.as_str(), "application/json");
        #[cfg(feature = "msgpack")]
        assert_eq!(ContentType::MessagePack.as_str(), "application/x-msgpack");
        #[cfg(feature = "binary")]
        assert_eq!(ContentType::Binary.as_str(), "application/octet-stream");
    }

    /// Content-encoding variants map to the expected strings.
    #[test]
    fn test_content_encoding_values() {
        assert_eq!(ContentEncoding::Utf8.as_str(), "utf-8");
        assert_eq!(ContentEncoding::Binary.as_str(), "binary");
    }

    /// New messages have delivery mode 2, both in memory and when serialized.
    #[test]
    fn test_delivery_mode_persistent() {
        let id = Uuid::new_v4();
        let empty_body = Vec::new();
        let message = Message::new("tasks.test".to_string(), id, empty_body);

        assert_eq!(message.properties.delivery_mode, 2);

        let text = serde_json::to_string(&message).unwrap();
        let wire: serde_json::Value = serde_json::from_str(&text).unwrap();

        assert_eq!(wire["properties"]["delivery_mode"], json!(2));
    }
}