use serde::Deserialize;
use std::sync::{Arc, RwLock};
use uuid::Uuid;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LazyError {
DeserializationFailed(String),
InvalidJson(String),
BodyNotAvailable,
}
impl std::fmt::Display for LazyError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
LazyError::DeserializationFailed(msg) => write!(f, "Deserialization failed: {}", msg),
LazyError::InvalidJson(msg) => write!(f, "Invalid JSON: {}", msg),
LazyError::BodyNotAvailable => write!(f, "Message body not available"),
}
}
}
impl std::error::Error for LazyError {}
#[derive(Debug, Clone)]
pub struct LazyBody {
raw: Vec<u8>,
cached: Arc<RwLock<Option<Vec<u8>>>>,
}
impl LazyBody {
pub fn new(raw: Vec<u8>) -> Self {
Self {
raw,
cached: Arc::new(RwLock::new(None)),
}
}
pub fn raw_bytes(&self) -> &[u8] {
&self.raw
}
#[inline]
pub fn size(&self) -> usize {
self.raw.len()
}
pub fn deserialize(&self) -> Result<Vec<u8>, LazyError> {
{
let cached = self.cached.read().expect("lock should not be poisoned");
if let Some(body) = cached.as_ref() {
return Ok(body.clone());
}
}
let decoded = base64::Engine::decode(&base64::engine::general_purpose::STANDARD, &self.raw)
.map_err(|e| LazyError::DeserializationFailed(e.to_string()))?;
let mut cached = self.cached.write().expect("lock should not be poisoned");
*cached = Some(decoded.clone());
Ok(decoded)
}
pub fn is_cached(&self) -> bool {
self.cached
.read()
.expect("lock should not be poisoned")
.is_some()
}
}
#[derive(Debug, Clone)]
pub struct LazyMessage {
pub headers: crate::MessageHeaders,
pub properties: crate::MessageProperties,
body: LazyBody,
pub content_type: String,
pub content_encoding: String,
}
impl LazyMessage {
pub fn from_json(data: &[u8]) -> Result<Self, LazyError> {
#[derive(Deserialize)]
struct LazyMessageHelper {
headers: crate::MessageHeaders,
properties: crate::MessageProperties,
#[serde(with = "serde_bytes_helper")]
body: Vec<u8>,
#[serde(rename = "content-type")]
content_type: String,
#[serde(rename = "content-encoding")]
content_encoding: String,
}
let helper: LazyMessageHelper =
serde_json::from_slice(data).map_err(|e| LazyError::InvalidJson(e.to_string()))?;
Ok(Self {
headers: helper.headers,
properties: helper.properties,
body: LazyBody::new(helper.body),
content_type: helper.content_type,
content_encoding: helper.content_encoding,
})
}
pub fn task_id(&self) -> Uuid {
self.headers.id
}
pub fn task_name(&self) -> &str {
&self.headers.task
}
pub fn body_size(&self) -> usize {
self.body.size()
}
pub fn is_body_cached(&self) -> bool {
self.body.is_cached()
}
pub fn raw_body(&self) -> &[u8] {
self.body.raw_bytes()
}
pub fn body(&self) -> Result<Vec<u8>, LazyError> {
self.body.deserialize()
}
pub fn has_eta(&self) -> bool {
self.headers.eta.is_some()
}
pub fn has_expires(&self) -> bool {
self.headers.expires.is_some()
}
pub fn has_parent(&self) -> bool {
self.headers.parent_id.is_some()
}
pub fn has_root(&self) -> bool {
self.headers.root_id.is_some()
}
pub fn has_group(&self) -> bool {
self.headers.group.is_some()
}
pub fn into_message(self) -> Result<crate::Message, LazyError> {
Ok(crate::Message {
headers: self.headers,
properties: self.properties,
body: self.body.deserialize()?,
content_type: self.content_type,
content_encoding: self.content_encoding,
})
}
}
mod serde_bytes_helper {
use serde::{Deserialize, Deserializer};
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
Ok(s.into_bytes())
}
}
#[derive(Debug, Clone)]
pub struct LazyTaskArgs {
raw: Vec<u8>,
cached: Arc<RwLock<Option<crate::TaskArgs>>>,
}
impl LazyTaskArgs {
pub fn new(raw: Vec<u8>) -> Self {
Self {
raw,
cached: Arc::new(RwLock::new(None)),
}
}
pub fn raw_bytes(&self) -> &[u8] {
&self.raw
}
pub fn parse(&self) -> Result<crate::TaskArgs, LazyError> {
{
let cached = self.cached.read().expect("lock should not be poisoned");
if let Some(args) = cached.as_ref() {
return Ok(args.clone());
}
}
let args: crate::TaskArgs = serde_json::from_slice(&self.raw)
.map_err(|e| LazyError::DeserializationFailed(e.to_string()))?;
let mut cached = self.cached.write().expect("lock should not be poisoned");
*cached = Some(args.clone());
Ok(args)
}
pub fn is_cached(&self) -> bool {
self.cached
.read()
.expect("lock should not be poisoned")
.is_some()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_lazy_body() {
let raw = b"dGVzdCBkYXRh"; let body = LazyBody::new(raw.to_vec());
assert_eq!(body.size(), raw.len());
assert!(!body.is_cached());
let decoded = body.deserialize().unwrap();
assert_eq!(decoded, b"test data");
assert!(body.is_cached());
}
#[test]
fn test_lazy_message_from_json() {
let task_id = Uuid::new_v4();
let json = format!(
r#"{{"headers":{{"task":"tasks.add","id":"{}","lang":"rust"}},"properties":{{"delivery_mode":2}},"body":"e30=","content-type":"application/json","content-encoding":"utf-8"}}"#,
task_id
);
let msg = LazyMessage::from_json(json.as_bytes()).unwrap();
assert_eq!(msg.task_name(), "tasks.add");
assert_eq!(msg.task_id(), task_id);
assert!(!msg.is_body_cached());
let _body = msg.body().unwrap();
assert!(msg.is_body_cached());
}
#[test]
fn test_lazy_message_predicates() {
let task_id = Uuid::new_v4();
let json = format!(
r#"{{"headers":{{"task":"tasks.test","id":"{}","lang":"rust","eta":"2024-12-31T23:59:59Z"}},"properties":{{"delivery_mode":2}},"body":"e30=","content-type":"application/json","content-encoding":"utf-8"}}"#,
task_id
);
let msg = LazyMessage::from_json(json.as_bytes()).unwrap();
assert!(msg.has_eta());
assert!(!msg.has_expires());
assert!(!msg.has_parent());
}
#[test]
fn test_lazy_message_body_size() {
let task_id = Uuid::new_v4();
let json = format!(
r#"{{"headers":{{"task":"tasks.test","id":"{}","lang":"rust"}},"properties":{{"delivery_mode":2}},"body":"dGVzdA==","content-type":"application/json","content-encoding":"utf-8"}}"#,
task_id
);
let msg = LazyMessage::from_json(json.as_bytes()).unwrap();
assert!(msg.body_size() > 0);
}
#[test]
fn test_lazy_task_args() {
let json = r#"{"args":[1,2,3],"kwargs":{"key":"value"}}"#;
let lazy_args = LazyTaskArgs::new(json.as_bytes().to_vec());
assert!(!lazy_args.is_cached());
let args = lazy_args.parse().unwrap();
assert_eq!(args.args.len(), 3);
assert_eq!(args.kwargs.get("key").unwrap(), "value");
assert!(lazy_args.is_cached());
}
#[test]
fn test_lazy_error_display() {
let err = LazyError::DeserializationFailed("test error".to_string());
assert_eq!(err.to_string(), "Deserialization failed: test error");
let err = LazyError::InvalidJson("bad json".to_string());
assert_eq!(err.to_string(), "Invalid JSON: bad json");
let err = LazyError::BodyNotAvailable;
assert_eq!(err.to_string(), "Message body not available");
}
#[test]
fn test_lazy_message_into_message() {
let task_id = Uuid::new_v4();
let json = format!(
r#"{{"headers":{{"task":"tasks.test","id":"{}","lang":"rust"}},"properties":{{"delivery_mode":2}},"body":"dGVzdA==","content-type":"application/json","content-encoding":"utf-8"}}"#,
task_id
);
let lazy_msg = LazyMessage::from_json(json.as_bytes()).unwrap();
let msg = lazy_msg.into_message().unwrap();
assert_eq!(msg.headers.task, "tasks.test");
assert_eq!(msg.headers.id, task_id);
}
}