dataflow_rs/engine/
message.rs1use crate::engine::error::ErrorInfo;
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use serde_json::{Value, json};
5use std::sync::Arc;
6use uuid::Uuid;
7
8#[derive(Debug, Clone)]
9pub struct Message {
10 pub id: String,
11 pub payload: Arc<Value>,
12 pub context: Value,
14 pub audit_trail: Vec<AuditTrail>,
15 pub errors: Vec<ErrorInfo>,
17 context_arc_cache: Option<Arc<Value>>,
20}
21
22impl Serialize for Message {
24 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
25 where
26 S: serde::Serializer,
27 {
28 use serde::ser::SerializeStruct;
29 let mut state = serializer.serialize_struct("Message", 5)?;
30 state.serialize_field("id", &self.id)?;
31 state.serialize_field("payload", &self.payload)?;
32 state.serialize_field("context", &self.context)?;
33 state.serialize_field("audit_trail", &self.audit_trail)?;
34 state.serialize_field("errors", &self.errors)?;
35 state.end()
36 }
37}
38
39impl<'de> Deserialize<'de> for Message {
41 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
42 where
43 D: serde::Deserializer<'de>,
44 {
45 #[derive(Deserialize)]
46 struct MessageData {
47 id: String,
48 payload: Arc<Value>,
49 context: Value,
50 audit_trail: Vec<AuditTrail>,
51 errors: Vec<ErrorInfo>,
52 }
53
54 let data = MessageData::deserialize(deserializer)?;
55 Ok(Message {
56 id: data.id,
57 payload: data.payload,
58 context: data.context,
59 audit_trail: data.audit_trail,
60 errors: data.errors,
61 context_arc_cache: None,
62 })
63 }
64}
65
66impl Message {
67 pub fn new(payload: Arc<Value>) -> Self {
68 Self {
69 id: Uuid::new_v4().to_string(),
70 payload,
71 context: json!({
72 "data": {},
73 "metadata": {},
74 "temp_data": {}
75 }),
76 audit_trail: vec![],
77 errors: vec![],
78 context_arc_cache: None,
79 }
80 }
81
82 pub fn get_context_arc(&mut self) -> Arc<Value> {
85 if let Some(ref arc) = self.context_arc_cache {
86 Arc::clone(arc)
87 } else {
88 let arc = Arc::new(self.context.clone());
89 self.context_arc_cache = Some(Arc::clone(&arc));
90 arc
91 }
92 }
93
94 pub fn invalidate_context_cache(&mut self) {
97 self.context_arc_cache = None;
98 }
99
100 pub fn from_value(payload: &Value) -> Self {
103 Self::new(Arc::new(payload.clone()))
104 }
105
106 pub fn from_arc(payload: Arc<Value>) -> Self {
108 Self::new(payload)
109 }
110
111 pub fn add_error(&mut self, error: ErrorInfo) {
113 self.errors.push(error);
114 }
115
116 pub fn has_errors(&self) -> bool {
118 !self.errors.is_empty()
119 }
120
121 pub fn data(&self) -> &Value {
123 &self.context["data"]
124 }
125
126 pub fn data_mut(&mut self) -> &mut Value {
128 &mut self.context["data"]
129 }
130
131 pub fn metadata(&self) -> &Value {
133 &self.context["metadata"]
134 }
135
136 pub fn metadata_mut(&mut self) -> &mut Value {
138 &mut self.context["metadata"]
139 }
140
141 pub fn temp_data(&self) -> &Value {
143 &self.context["temp_data"]
144 }
145
146 pub fn temp_data_mut(&mut self) -> &mut Value {
148 &mut self.context["temp_data"]
149 }
150}
151
152#[derive(Serialize, Deserialize, Debug, Clone)]
153pub struct AuditTrail {
154 pub workflow_id: Arc<str>,
155 pub task_id: Arc<str>,
156 pub timestamp: DateTime<Utc>,
157 pub changes: Vec<Change>,
158 pub status: usize,
159}
160
161#[derive(Serialize, Deserialize, Debug, Clone)]
162pub struct Change {
163 pub path: Arc<str>,
164 pub old_value: Arc<Value>,
165 pub new_value: Arc<Value>,
166}