dataflow_rs/engine/
message.rs

1use crate::engine::error::ErrorInfo;
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use serde_json::{Value, json};
5use std::sync::Arc;
6use uuid::Uuid;
7
8#[derive(Debug, Clone)]
9pub struct Message {
10    pub id: String,
11    pub payload: Arc<Value>,
12    /// Unified context containing data, metadata, and temp_data
13    pub context: Value,
14    pub audit_trail: Vec<AuditTrail>,
15    /// Errors that occurred during message processing
16    pub errors: Vec<ErrorInfo>,
17    /// Cached Arc of the context to avoid repeated cloning
18    /// This is invalidated (set to None) whenever context is modified
19    context_arc_cache: Option<Arc<Value>>,
20}
21
22// Custom Serialize implementation to exclude context_arc_cache
23impl Serialize for Message {
24    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
25    where
26        S: serde::Serializer,
27    {
28        use serde::ser::SerializeStruct;
29        let mut state = serializer.serialize_struct("Message", 5)?;
30        state.serialize_field("id", &self.id)?;
31        state.serialize_field("payload", &self.payload)?;
32        state.serialize_field("context", &self.context)?;
33        state.serialize_field("audit_trail", &self.audit_trail)?;
34        state.serialize_field("errors", &self.errors)?;
35        state.end()
36    }
37}
38
39// Custom Deserialize implementation to initialize context_arc_cache as None
40impl<'de> Deserialize<'de> for Message {
41    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
42    where
43        D: serde::Deserializer<'de>,
44    {
45        #[derive(Deserialize)]
46        struct MessageData {
47            id: String,
48            payload: Arc<Value>,
49            context: Value,
50            audit_trail: Vec<AuditTrail>,
51            errors: Vec<ErrorInfo>,
52        }
53
54        let data = MessageData::deserialize(deserializer)?;
55        Ok(Message {
56            id: data.id,
57            payload: data.payload,
58            context: data.context,
59            audit_trail: data.audit_trail,
60            errors: data.errors,
61            context_arc_cache: None,
62        })
63    }
64}
65
66impl Message {
67    pub fn new(payload: Arc<Value>) -> Self {
68        Self {
69            id: Uuid::new_v4().to_string(),
70            payload,
71            context: json!({
72                "data": {},
73                "metadata": {},
74                "temp_data": {}
75            }),
76            audit_trail: vec![],
77            errors: vec![],
78            context_arc_cache: None,
79        }
80    }
81
82    /// Get or create an Arc reference to the context
83    /// This method returns a cached Arc if available, or creates and caches a new one
84    pub fn get_context_arc(&mut self) -> Arc<Value> {
85        if let Some(ref arc) = self.context_arc_cache {
86            Arc::clone(arc)
87        } else {
88            let arc = Arc::new(self.context.clone());
89            self.context_arc_cache = Some(Arc::clone(&arc));
90            arc
91        }
92    }
93
94    /// Invalidate the cached context Arc
95    /// Call this whenever the context is modified
96    pub fn invalidate_context_cache(&mut self) {
97        self.context_arc_cache = None;
98    }
99
100    /// Convenience method for creating a message from a Value reference
101    /// Note: This clones the entire Value. Use from_arc() to avoid cloning when possible.
102    pub fn from_value(payload: &Value) -> Self {
103        Self::new(Arc::new(payload.clone()))
104    }
105
106    /// Create a message from an Arc<Value> directly, avoiding cloning
107    pub fn from_arc(payload: Arc<Value>) -> Self {
108        Self::new(payload)
109    }
110
111    /// Add an error to the message
112    pub fn add_error(&mut self, error: ErrorInfo) {
113        self.errors.push(error);
114    }
115
116    /// Check if message has errors
117    pub fn has_errors(&self) -> bool {
118        !self.errors.is_empty()
119    }
120
121    /// Get a reference to the data field in context
122    pub fn data(&self) -> &Value {
123        &self.context["data"]
124    }
125
126    /// Get a mutable reference to the data field in context
127    pub fn data_mut(&mut self) -> &mut Value {
128        &mut self.context["data"]
129    }
130
131    /// Get a reference to the metadata field in context
132    pub fn metadata(&self) -> &Value {
133        &self.context["metadata"]
134    }
135
136    /// Get a mutable reference to the metadata field in context
137    pub fn metadata_mut(&mut self) -> &mut Value {
138        &mut self.context["metadata"]
139    }
140
141    /// Get a reference to the temp_data field in context
142    pub fn temp_data(&self) -> &Value {
143        &self.context["temp_data"]
144    }
145
146    /// Get a mutable reference to the temp_data field in context
147    pub fn temp_data_mut(&mut self) -> &mut Value {
148        &mut self.context["temp_data"]
149    }
150}
151
152#[derive(Serialize, Deserialize, Debug, Clone)]
153pub struct AuditTrail {
154    pub workflow_id: Arc<str>,
155    pub task_id: Arc<str>,
156    pub timestamp: DateTime<Utc>,
157    pub changes: Vec<Change>,
158    pub status: usize,
159}
160
161#[derive(Serialize, Deserialize, Debug, Clone)]
162pub struct Change {
163    pub path: Arc<str>,
164    pub old_value: Arc<Value>,
165    pub new_value: Arc<Value>,
166}