celers_protocol/
lazy.rs

1//! Lazy deserialization for improved performance
2//!
3//! This module provides lazy deserialization capabilities where message bodies
4//! and other large data structures are only deserialized when actually accessed.
5//! This is particularly useful for:
6//! - Large message bodies that may not always be needed
7//! - Message routing/filtering scenarios where only headers are needed
8//! - High-throughput scenarios where deserialization overhead matters
9//!
10//! # Examples
11//!
12//! ```
13//! use celers_protocol::lazy::LazyMessage;
14//! use uuid::Uuid;
15//!
16//! let task_id = Uuid::new_v4();
17//! let json = format!(r#"{{"headers":{{"task":"tasks.add","id":"{}","lang":"rust"}},"properties":{{"delivery_mode":2}},"body":"e30=","content-type":"application/json","content-encoding":"utf-8"}}"#, task_id);
18//!
19//! // Parse only the headers initially
20//! let msg = LazyMessage::from_json(json.as_bytes()).unwrap();
21//!
22//! // Headers are immediately available
23//! assert_eq!(msg.task_name(), "tasks.add");
24//!
25//! // Body is only deserialized when accessed
26//! let body = msg.body().unwrap();
27//! ```
28
29use serde::Deserialize;
30use std::sync::{Arc, RwLock};
31use uuid::Uuid;
32
/// Failures reported by the lazy deserialization types in this module
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LazyError {
    /// Decoding or parsing of deferred data failed; holds the underlying error text
    DeserializationFailed(String),
    /// The message envelope could not be parsed as JSON; holds the parser's error text
    InvalidJson(String),
    /// The message body is not available
    BodyNotAvailable,
}

impl std::fmt::Display for LazyError {
    /// Write a one-line description, followed by the carried detail text when
    /// the variant has one.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::BodyNotAvailable => f.write_str("Message body not available"),
            Self::InvalidJson(detail) => write!(f, "Invalid JSON: {detail}"),
            Self::DeserializationFailed(detail) => write!(f, "Deserialization failed: {detail}"),
        }
    }
}

// No source error is chained; the detail is already flattened into a `String`.
impl std::error::Error for LazyError {}
55
/// Message body whose decoding is postponed until it is first requested
///
/// Keeps the bytes exactly as received next to a lock-guarded slot for the
/// decoded result. The slot sits behind an `Arc`, so cloning a `LazyBody`
/// yields a value that shares the same cache.
#[derive(Debug, Clone)]
pub struct LazyBody {
    /// Body bytes as received, before any decoding
    raw: Vec<u8>,
    /// Decoded bytes once they have been produced; `None` until then
    cached: Arc<RwLock<Option<Vec<u8>>>>,
}
67
68impl LazyBody {
69    /// Create a new lazy body from raw bytes
70    pub fn new(raw: Vec<u8>) -> Self {
71        Self {
72            raw,
73            cached: Arc::new(RwLock::new(None)),
74        }
75    }
76
77    /// Get the raw bytes without deserialization
78    pub fn raw_bytes(&self) -> &[u8] {
79        &self.raw
80    }
81
82    /// Get the body size in bytes
83    #[inline]
84    pub fn size(&self) -> usize {
85        self.raw.len()
86    }
87
88    /// Deserialize the body (base64 decode)
89    pub fn deserialize(&self) -> Result<Vec<u8>, LazyError> {
90        // Check cache first
91        {
92            let cached = self.cached.read().expect("lock should not be poisoned");
93            if let Some(body) = cached.as_ref() {
94                return Ok(body.clone());
95            }
96        }
97
98        // Deserialize and cache
99        let decoded = base64::Engine::decode(&base64::engine::general_purpose::STANDARD, &self.raw)
100            .map_err(|e| LazyError::DeserializationFailed(e.to_string()))?;
101
102        let mut cached = self.cached.write().expect("lock should not be poisoned");
103        *cached = Some(decoded.clone());
104
105        Ok(decoded)
106    }
107
108    /// Check if body is already deserialized
109    pub fn is_cached(&self) -> bool {
110        self.cached
111            .read()
112            .expect("lock should not be poisoned")
113            .is_some()
114    }
115}
116
117/// Lazy-deserialized message
118///
119/// Headers and properties are eagerly deserialized (they're small),
120/// but the body is lazily deserialized only when accessed.
121#[derive(Debug, Clone)]
122pub struct LazyMessage {
123    /// Message headers (eagerly deserialized)
124    pub headers: crate::MessageHeaders,
125
126    /// Message properties (eagerly deserialized)
127    pub properties: crate::MessageProperties,
128
129    /// Lazy body
130    body: LazyBody,
131
132    /// Content type
133    pub content_type: String,
134
135    /// Content encoding
136    pub content_encoding: String,
137}
138
139impl LazyMessage {
140    /// Create a lazy message from JSON bytes
141    ///
142    /// This performs minimal deserialization - only headers and properties
143    /// are parsed. The body remains in raw form until accessed.
144    pub fn from_json(data: &[u8]) -> Result<Self, LazyError> {
145        #[derive(Deserialize)]
146        struct LazyMessageHelper {
147            headers: crate::MessageHeaders,
148            properties: crate::MessageProperties,
149            #[serde(with = "serde_bytes_helper")]
150            body: Vec<u8>,
151            #[serde(rename = "content-type")]
152            content_type: String,
153            #[serde(rename = "content-encoding")]
154            content_encoding: String,
155        }
156
157        let helper: LazyMessageHelper =
158            serde_json::from_slice(data).map_err(|e| LazyError::InvalidJson(e.to_string()))?;
159
160        Ok(Self {
161            headers: helper.headers,
162            properties: helper.properties,
163            body: LazyBody::new(helper.body),
164            content_type: helper.content_type,
165            content_encoding: helper.content_encoding,
166        })
167    }
168
169    /// Get the task ID
170    pub fn task_id(&self) -> Uuid {
171        self.headers.id
172    }
173
174    /// Get the task name
175    pub fn task_name(&self) -> &str {
176        &self.headers.task
177    }
178
179    /// Get the body size without deserializing
180    pub fn body_size(&self) -> usize {
181        self.body.size()
182    }
183
184    /// Check if body is already deserialized
185    pub fn is_body_cached(&self) -> bool {
186        self.body.is_cached()
187    }
188
189    /// Get raw body bytes without deserializing
190    pub fn raw_body(&self) -> &[u8] {
191        self.body.raw_bytes()
192    }
193
194    /// Get deserialized body (triggers deserialization if not cached)
195    pub fn body(&self) -> Result<Vec<u8>, LazyError> {
196        self.body.deserialize()
197    }
198
199    /// Check if message has ETA
200    pub fn has_eta(&self) -> bool {
201        self.headers.eta.is_some()
202    }
203
204    /// Check if message has expiration
205    pub fn has_expires(&self) -> bool {
206        self.headers.expires.is_some()
207    }
208
209    /// Check if message has parent
210    pub fn has_parent(&self) -> bool {
211        self.headers.parent_id.is_some()
212    }
213
214    /// Check if message has root
215    pub fn has_root(&self) -> bool {
216        self.headers.root_id.is_some()
217    }
218
219    /// Check if message has group
220    pub fn has_group(&self) -> bool {
221        self.headers.group.is_some()
222    }
223
224    /// Convert to fully deserialized message
225    pub fn into_message(self) -> Result<crate::Message, LazyError> {
226        Ok(crate::Message {
227            headers: self.headers,
228            properties: self.properties,
229            body: self.body.deserialize()?,
230            content_type: self.content_type,
231            content_encoding: self.content_encoding,
232        })
233    }
234}
235
236// Helper for deserializing body as raw bytes
237mod serde_bytes_helper {
238    use serde::{Deserialize, Deserializer};
239
240    pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
241    where
242        D: Deserializer<'de>,
243    {
244        let s = String::deserialize(deserializer)?;
245        Ok(s.into_bytes())
246    }
247}
248
249/// Lazy task arguments
250///
251/// Defers parsing of task arguments until they're actually needed.
252#[derive(Debug, Clone)]
253pub struct LazyTaskArgs {
254    /// Raw JSON bytes
255    raw: Vec<u8>,
256    /// Cached parsed arguments
257    cached: Arc<RwLock<Option<crate::TaskArgs>>>,
258}
259
260impl LazyTaskArgs {
261    /// Create lazy task arguments from raw JSON
262    pub fn new(raw: Vec<u8>) -> Self {
263        Self {
264            raw,
265            cached: Arc::new(RwLock::new(None)),
266        }
267    }
268
269    /// Get raw bytes
270    pub fn raw_bytes(&self) -> &[u8] {
271        &self.raw
272    }
273
274    /// Parse task arguments (cached)
275    pub fn parse(&self) -> Result<crate::TaskArgs, LazyError> {
276        // Check cache
277        {
278            let cached = self.cached.read().expect("lock should not be poisoned");
279            if let Some(args) = cached.as_ref() {
280                return Ok(args.clone());
281            }
282        }
283
284        // Parse and cache
285        let args: crate::TaskArgs = serde_json::from_slice(&self.raw)
286            .map_err(|e| LazyError::DeserializationFailed(e.to_string()))?;
287
288        let mut cached = self.cached.write().expect("lock should not be poisoned");
289        *cached = Some(args.clone());
290
291        Ok(args)
292    }
293
294    /// Check if arguments are cached
295    pub fn is_cached(&self) -> bool {
296        self.cached
297            .read()
298            .expect("lock should not be poisoned")
299            .is_some()
300    }
301}
302
#[cfg(test)]
mod tests {
    use super::*;

    /// Build the JSON envelope shared by the message tests.
    ///
    /// `extra_headers` is spliced verbatim after the `lang` header, so it must
    /// be empty or start with a comma. `body` is the already base64-encoded
    /// payload.
    fn message_json(task: &str, id: Uuid, extra_headers: &str, body: &str) -> String {
        format!(
            r#"{{"headers":{{"task":"{}","id":"{}","lang":"rust"{}}},"properties":{{"delivery_mode":2}},"body":"{}","content-type":"application/json","content-encoding":"utf-8"}}"#,
            task, id, extra_headers, body
        )
    }

    #[test]
    fn test_lazy_body() {
        let raw = b"dGVzdCBkYXRh"; // base64 "test data"
        let body = LazyBody::new(raw.to_vec());

        assert_eq!(body.size(), raw.len());
        assert_eq!(body.raw_bytes(), raw);
        assert!(!body.is_cached());

        let decoded = body.deserialize().unwrap();
        assert_eq!(decoded, b"test data");
        assert!(body.is_cached());

        // A second call is served from the cache and yields the same bytes
        assert_eq!(body.deserialize().unwrap(), decoded);
    }

    #[test]
    fn test_lazy_body_invalid_base64() {
        let body = LazyBody::new(b"not base64!".to_vec());

        assert!(matches!(
            body.deserialize(),
            Err(LazyError::DeserializationFailed(_))
        ));
        // A failed decode must not populate the cache
        assert!(!body.is_cached());
    }

    #[test]
    fn test_lazy_message_from_json() {
        let task_id = Uuid::new_v4();
        let json = message_json("tasks.add", task_id, "", "e30=");

        let msg = LazyMessage::from_json(json.as_bytes()).unwrap();

        assert_eq!(msg.task_name(), "tasks.add");
        assert_eq!(msg.task_id(), task_id);
        assert_eq!(msg.raw_body(), b"e30=");
        assert!(!msg.is_body_cached());

        // "e30=" is base64 for "{}"
        let body = msg.body().unwrap();
        assert_eq!(body, b"{}");
        assert!(msg.is_body_cached());
    }

    #[test]
    fn test_lazy_message_from_invalid_json() {
        let result = LazyMessage::from_json(b"{not json");
        assert!(matches!(result, Err(LazyError::InvalidJson(_))));
    }

    #[test]
    fn test_lazy_message_predicates() {
        let task_id = Uuid::new_v4();
        let json = message_json(
            "tasks.test",
            task_id,
            r#","eta":"2024-12-31T23:59:59Z""#,
            "e30=",
        );

        let msg = LazyMessage::from_json(json.as_bytes()).unwrap();

        assert!(msg.has_eta());
        assert!(!msg.has_expires());
        assert!(!msg.has_parent());
        assert!(!msg.has_root());
        assert!(!msg.has_group());
    }

    #[test]
    fn test_lazy_message_body_size() {
        let task_id = Uuid::new_v4();
        let json = message_json("tasks.test", task_id, "", "dGVzdA==");

        let msg = LazyMessage::from_json(json.as_bytes()).unwrap();

        // Size is that of the raw, still-encoded body
        assert_eq!(msg.body_size(), "dGVzdA==".len());
        // Asking for the size must not trigger decoding
        assert!(!msg.is_body_cached());
    }

    #[test]
    fn test_lazy_task_args() {
        let json = r#"{"args":[1,2,3],"kwargs":{"key":"value"}}"#;
        let lazy_args = LazyTaskArgs::new(json.as_bytes().to_vec());

        assert_eq!(lazy_args.raw_bytes(), json.as_bytes());
        assert!(!lazy_args.is_cached());

        let args = lazy_args.parse().unwrap();
        assert_eq!(args.args.len(), 3);
        assert_eq!(args.kwargs.get("key").unwrap(), "value");

        assert!(lazy_args.is_cached());
    }

    #[test]
    fn test_lazy_task_args_invalid_json() {
        let lazy_args = LazyTaskArgs::new(b"not json".to_vec());

        assert!(matches!(
            lazy_args.parse(),
            Err(LazyError::DeserializationFailed(_))
        ));
        // A failed parse must not populate the cache
        assert!(!lazy_args.is_cached());
    }

    #[test]
    fn test_lazy_error_display() {
        let err = LazyError::DeserializationFailed("test error".to_string());
        assert_eq!(err.to_string(), "Deserialization failed: test error");

        let err = LazyError::InvalidJson("bad json".to_string());
        assert_eq!(err.to_string(), "Invalid JSON: bad json");

        let err = LazyError::BodyNotAvailable;
        assert_eq!(err.to_string(), "Message body not available");
    }

    #[test]
    fn test_lazy_message_into_message() {
        let task_id = Uuid::new_v4();
        let json = message_json("tasks.test", task_id, "", "dGVzdA==");

        let lazy_msg = LazyMessage::from_json(json.as_bytes()).unwrap();
        let msg = lazy_msg.into_message().unwrap();

        assert_eq!(msg.headers.task, "tasks.test");
        assert_eq!(msg.headers.id, task_id);
        // "dGVzdA==" is base64 for "test"
        assert_eq!(msg.body, b"test");
        assert_eq!(msg.content_type, "application/json");
        assert_eq!(msg.content_encoding, "utf-8");
    }
}