1use serde::Deserialize;
30use std::sync::{Arc, RwLock};
31use uuid::Uuid;
32
/// Errors reported by the lazy message, body, and task-argument wrappers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LazyError {
    /// Decoding or parsing of lazily held bytes failed; carries the underlying error text.
    DeserializationFailed(String),
    /// The input could not be parsed as JSON; carries the parser's error text.
    InvalidJson(String),
    /// The message body is not available.
    BodyNotAvailable,
}

impl std::fmt::Display for LazyError {
    /// Writes a short human-readable description, including the detail text
    /// for the variants that carry one.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::BodyNotAvailable => f.write_str("Message body not available"),
            Self::InvalidJson(detail) => write!(f, "Invalid JSON: {}", detail),
            Self::DeserializationFailed(detail) => {
                write!(f, "Deserialization failed: {}", detail)
            }
        }
    }
}

impl std::error::Error for LazyError {}
55
56#[derive(Debug, Clone)]
61pub struct LazyBody {
62 raw: Vec<u8>,
64 cached: Arc<RwLock<Option<Vec<u8>>>>,
66}
67
68impl LazyBody {
69 pub fn new(raw: Vec<u8>) -> Self {
71 Self {
72 raw,
73 cached: Arc::new(RwLock::new(None)),
74 }
75 }
76
77 pub fn raw_bytes(&self) -> &[u8] {
79 &self.raw
80 }
81
82 #[inline]
84 pub fn size(&self) -> usize {
85 self.raw.len()
86 }
87
88 pub fn deserialize(&self) -> Result<Vec<u8>, LazyError> {
90 {
92 let cached = self.cached.read().expect("lock should not be poisoned");
93 if let Some(body) = cached.as_ref() {
94 return Ok(body.clone());
95 }
96 }
97
98 let decoded = base64::Engine::decode(&base64::engine::general_purpose::STANDARD, &self.raw)
100 .map_err(|e| LazyError::DeserializationFailed(e.to_string()))?;
101
102 let mut cached = self.cached.write().expect("lock should not be poisoned");
103 *cached = Some(decoded.clone());
104
105 Ok(decoded)
106 }
107
108 pub fn is_cached(&self) -> bool {
110 self.cached
111 .read()
112 .expect("lock should not be poisoned")
113 .is_some()
114 }
115}
116
117#[derive(Debug, Clone)]
122pub struct LazyMessage {
123 pub headers: crate::MessageHeaders,
125
126 pub properties: crate::MessageProperties,
128
129 body: LazyBody,
131
132 pub content_type: String,
134
135 pub content_encoding: String,
137}
138
139impl LazyMessage {
140 pub fn from_json(data: &[u8]) -> Result<Self, LazyError> {
145 #[derive(Deserialize)]
146 struct LazyMessageHelper {
147 headers: crate::MessageHeaders,
148 properties: crate::MessageProperties,
149 #[serde(with = "serde_bytes_helper")]
150 body: Vec<u8>,
151 #[serde(rename = "content-type")]
152 content_type: String,
153 #[serde(rename = "content-encoding")]
154 content_encoding: String,
155 }
156
157 let helper: LazyMessageHelper =
158 serde_json::from_slice(data).map_err(|e| LazyError::InvalidJson(e.to_string()))?;
159
160 Ok(Self {
161 headers: helper.headers,
162 properties: helper.properties,
163 body: LazyBody::new(helper.body),
164 content_type: helper.content_type,
165 content_encoding: helper.content_encoding,
166 })
167 }
168
169 pub fn task_id(&self) -> Uuid {
171 self.headers.id
172 }
173
174 pub fn task_name(&self) -> &str {
176 &self.headers.task
177 }
178
179 pub fn body_size(&self) -> usize {
181 self.body.size()
182 }
183
184 pub fn is_body_cached(&self) -> bool {
186 self.body.is_cached()
187 }
188
189 pub fn raw_body(&self) -> &[u8] {
191 self.body.raw_bytes()
192 }
193
194 pub fn body(&self) -> Result<Vec<u8>, LazyError> {
196 self.body.deserialize()
197 }
198
199 pub fn has_eta(&self) -> bool {
201 self.headers.eta.is_some()
202 }
203
204 pub fn has_expires(&self) -> bool {
206 self.headers.expires.is_some()
207 }
208
209 pub fn has_parent(&self) -> bool {
211 self.headers.parent_id.is_some()
212 }
213
214 pub fn has_root(&self) -> bool {
216 self.headers.root_id.is_some()
217 }
218
219 pub fn has_group(&self) -> bool {
221 self.headers.group.is_some()
222 }
223
224 pub fn into_message(self) -> Result<crate::Message, LazyError> {
226 Ok(crate::Message {
227 headers: self.headers,
228 properties: self.properties,
229 body: self.body.deserialize()?,
230 content_type: self.content_type,
231 content_encoding: self.content_encoding,
232 })
233 }
234}
235
236mod serde_bytes_helper {
238 use serde::{Deserialize, Deserializer};
239
240 pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
241 where
242 D: Deserializer<'de>,
243 {
244 let s = String::deserialize(deserializer)?;
245 Ok(s.into_bytes())
246 }
247}
248
249#[derive(Debug, Clone)]
253pub struct LazyTaskArgs {
254 raw: Vec<u8>,
256 cached: Arc<RwLock<Option<crate::TaskArgs>>>,
258}
259
260impl LazyTaskArgs {
261 pub fn new(raw: Vec<u8>) -> Self {
263 Self {
264 raw,
265 cached: Arc::new(RwLock::new(None)),
266 }
267 }
268
269 pub fn raw_bytes(&self) -> &[u8] {
271 &self.raw
272 }
273
274 pub fn parse(&self) -> Result<crate::TaskArgs, LazyError> {
276 {
278 let cached = self.cached.read().expect("lock should not be poisoned");
279 if let Some(args) = cached.as_ref() {
280 return Ok(args.clone());
281 }
282 }
283
284 let args: crate::TaskArgs = serde_json::from_slice(&self.raw)
286 .map_err(|e| LazyError::DeserializationFailed(e.to_string()))?;
287
288 let mut cached = self.cached.write().expect("lock should not be poisoned");
289 *cached = Some(args.clone());
290
291 Ok(args)
292 }
293
294 pub fn is_cached(&self) -> bool {
296 self.cached
297 .read()
298 .expect("lock should not be poisoned")
299 .is_some()
300 }
301}
302
#[cfg(test)]
mod tests {
    use super::*;

    /// A body starts uncached, decodes from base64, and is cached afterwards.
    #[test]
    fn test_lazy_body() {
        let encoded = b"dGVzdCBkYXRh";
        let lazy = LazyBody::new(encoded.to_vec());

        assert_eq!(lazy.size(), encoded.len());
        assert!(!lazy.is_cached());

        let plain = lazy.deserialize().unwrap();
        assert_eq!(plain, b"test data");
        assert!(lazy.is_cached());
    }

    /// Parsing exposes the header accessors and leaves the body undecoded
    /// until `body()` is called.
    #[test]
    fn test_lazy_message_from_json() {
        let expected_id = Uuid::new_v4();
        let payload = format!(
            r#"{{"headers":{{"task":"tasks.add","id":"{}","lang":"rust"}},"properties":{{"delivery_mode":2}},"body":"e30=","content-type":"application/json","content-encoding":"utf-8"}}"#,
            expected_id
        );

        let lazy = LazyMessage::from_json(payload.as_bytes()).unwrap();

        assert_eq!(lazy.task_name(), "tasks.add");
        assert_eq!(lazy.task_id(), expected_id);
        assert!(!lazy.is_body_cached());

        let _decoded = lazy.body().unwrap();
        assert!(lazy.is_body_cached());
    }

    /// The `has_*` predicates reflect which optional headers were supplied.
    #[test]
    fn test_lazy_message_predicates() {
        let expected_id = Uuid::new_v4();
        let payload = format!(
            r#"{{"headers":{{"task":"tasks.test","id":"{}","lang":"rust","eta":"2024-12-31T23:59:59Z"}},"properties":{{"delivery_mode":2}},"body":"e30=","content-type":"application/json","content-encoding":"utf-8"}}"#,
            expected_id
        );

        let lazy = LazyMessage::from_json(payload.as_bytes()).unwrap();

        assert!(lazy.has_eta());
        assert!(!lazy.has_expires());
        assert!(!lazy.has_parent());
    }

    /// A non-empty body reports a non-zero size.
    #[test]
    fn test_lazy_message_body_size() {
        let expected_id = Uuid::new_v4();
        let payload = format!(
            r#"{{"headers":{{"task":"tasks.test","id":"{}","lang":"rust"}},"properties":{{"delivery_mode":2}},"body":"dGVzdA==","content-type":"application/json","content-encoding":"utf-8"}}"#,
            expected_id
        );

        let lazy = LazyMessage::from_json(payload.as_bytes()).unwrap();
        assert!(lazy.body_size() > 0);
    }

    /// Task arguments start uncached, parse from JSON, and are cached afterwards.
    #[test]
    fn test_lazy_task_args() {
        let source = r#"{"args":[1,2,3],"kwargs":{"key":"value"}}"#;
        let lazy = LazyTaskArgs::new(source.as_bytes().to_vec());

        assert!(!lazy.is_cached());

        let parsed = lazy.parse().unwrap();
        assert_eq!(parsed.args.len(), 3);
        assert_eq!(parsed.kwargs.get("key").unwrap(), "value");

        assert!(lazy.is_cached());
    }

    /// Each error variant renders its expected display text.
    #[test]
    fn test_lazy_error_display() {
        let failed = LazyError::DeserializationFailed("test error".to_string());
        assert_eq!(failed.to_string(), "Deserialization failed: test error");

        let invalid = LazyError::InvalidJson("bad json".to_string());
        assert_eq!(invalid.to_string(), "Invalid JSON: bad json");

        let missing = LazyError::BodyNotAvailable;
        assert_eq!(missing.to_string(), "Message body not available");
    }

    /// Converting into a full message keeps the parsed headers.
    #[test]
    fn test_lazy_message_into_message() {
        let expected_id = Uuid::new_v4();
        let payload = format!(
            r#"{{"headers":{{"task":"tasks.test","id":"{}","lang":"rust"}},"properties":{{"delivery_mode":2}},"body":"dGVzdA==","content-type":"application/json","content-encoding":"utf-8"}}"#,
            expected_id
        );

        let lazy = LazyMessage::from_json(payload.as_bytes()).unwrap();
        let full = lazy.into_message().unwrap();

        assert_eq!(full.headers.task, "tasks.test");
        assert_eq!(full.headers.id, expected_id);
    }
}