dataflow_rs/engine/message.rs
1use crate::engine::error::{DataflowError, ErrorInfo};
2use chrono::{DateTime, Utc};
3use datavalue::OwnedDataValue;
4use serde::{Deserialize, Serialize};
5use serde_json::Value as JsonValue;
6use std::sync::Arc;
7use uuid::Uuid;
8
9/// A message flowing through the dataflow engine.
10///
11/// Construct via [`Message::builder`] for the full API, or use the shortcuts
12/// [`Message::new`] (already-owned `Arc<OwnedDataValue>` payload — the perf
13/// path) and [`Message::from_value`] (bridges from `serde_json::Value`).
14///
15/// `context` is held as an [`OwnedDataValue`] tree (not `serde_json::Value`)
16/// so the JSONLogic evaluator can borrow it into its arena via
17/// `OwnedDataValue::to_arena` with a single deep walk in, and project the
18/// result back via `DataValue::to_owned` with a single deep walk out — no
19/// `serde_json::Value` round-trip in the hot path. The on-the-wire JSON
20/// shape is preserved by datavalue's native `Serialize` / `Deserialize`
21/// impls.
22///
23/// Every other field is encapsulated — read via `id()`, `payload()`,
24/// `audit_trail()`, `errors()`, `capture_changes()`; mutate `errors` via
25/// [`Message::add_error`]; mutate `context` via [`crate::TaskContext::set`].
26/// Direct mutation of `audit_trail` is engine-internal.
27#[derive(Debug, Clone)]
28pub struct Message {
29 pub(crate) id: String,
30 pub(crate) payload: Arc<OwnedDataValue>,
31 /// Unified context containing `data`, `metadata`, and `temp_data` keys.
32 /// Always an `OwnedDataValue::Object`; the engine populates the three
33 /// top-level keys at construction. Public for read access (tests do
34 /// `message.context["data"]["x"]` lookups); inside handlers prefer
35 /// [`crate::TaskContext::set`] which records audit-trail changes.
36 pub context: OwnedDataValue,
37 pub(crate) audit_trail: Vec<AuditTrail>,
38 /// Errors that occurred during message processing. Read via
39 /// `errors()`, append via `add_error()`.
40 pub(crate) errors: Vec<ErrorInfo>,
41 /// When `true` (default), built-in functions emit per-write `Change`
42 /// entries into `audit_trail`, capturing `old_value` and `new_value` deep
43 /// clones. When `false`, `AuditTrail` entries are still recorded
44 /// (workflow_id, task_id, status, timestamp) but `changes` is empty —
45 /// the bulk-pipeline fast path. UI debug consumers should leave this at
46 /// `true`. Wire shape is unchanged either way.
47 pub(crate) capture_changes: bool,
48}
49
50// Custom Serialize: stable wire format ({id, payload, context, audit_trail, errors}).
51// `capture_changes` is an in-memory hint only — never serialized.
52impl Serialize for Message {
53 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
54 where
55 S: serde::Serializer,
56 {
57 use serde::ser::SerializeStruct;
58 let mut state = serializer.serialize_struct("Message", 5)?;
59 state.serialize_field("id", &self.id)?;
60 state.serialize_field("payload", &self.payload)?;
61 state.serialize_field("context", &self.context)?;
62 state.serialize_field("audit_trail", &self.audit_trail)?;
63 state.serialize_field("errors", &self.errors)?;
64 state.end()
65 }
66}
67
68// Custom Deserialize: mirrors the Serialize shape; no cache field to seed.
69// `capture_changes` defaults to `true` for back-compat.
70impl<'de> Deserialize<'de> for Message {
71 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
72 where
73 D: serde::Deserializer<'de>,
74 {
75 #[derive(Deserialize)]
76 struct MessageData {
77 id: String,
78 payload: Arc<OwnedDataValue>,
79 context: OwnedDataValue,
80 audit_trail: Vec<AuditTrail>,
81 errors: Vec<ErrorInfo>,
82 }
83
84 let data = MessageData::deserialize(deserializer)?;
85 Ok(Message {
86 id: data.id,
87 payload: data.payload,
88 context: data.context,
89 audit_trail: data.audit_trail,
90 errors: data.errors,
91 capture_changes: true,
92 })
93 }
94}
95
96impl Message {
97 /// Start building a message. The recommended constructor — chains
98 /// `.id(...)`, `.payload(...)` / `.payload_json(...)`, and
99 /// `.capture_changes(...)` calls, then `.build()`.
100 pub fn builder() -> MessageBuilder {
101 MessageBuilder::new()
102 }
103
104 /// Construct a message from an already-owned payload `Arc`. The perf
105 /// path: zero `serde_json::Value` walk, one Arc refcount bump per
106 /// message. Use this from a hot loop with a payload `Arc` shared across
107 /// messages (e.g. a benchmark harness or an HTTP handler that receives
108 /// already-parsed payloads).
109 pub fn new(payload: Arc<OwnedDataValue>) -> Self {
110 Self {
111 // UUID v7: ms-precision timestamp in the high bits, random tail.
112 // Time-ordered and sortable — better for databases/logs than v4
113 // (random-only) and the same `rng` backend cost.
114 id: Uuid::now_v7().to_string(),
115 payload,
116 context: empty_context(),
117 audit_trail: vec![],
118 errors: vec![],
119 capture_changes: true,
120 }
121 }
122
123 /// Construct a message from a `serde_json::Value` payload. Convenience
124 /// for code that already speaks serde_json; goes through the
125 /// `OwnedDataValue::from(&Value)` bridge (one deep walk).
126 pub fn from_value(payload: &JsonValue) -> Self {
127 Self::new(Arc::new(OwnedDataValue::from(payload)))
128 }
129
130 /// Construct a message from a JSON payload string. Parses with
131 /// `serde_json` and bridges into `OwnedDataValue`. Returns
132 /// `DataflowError::Deserialization` on parse failure.
133 pub fn from_json_str(payload: &str) -> crate::engine::error::Result<Self> {
134 let value: JsonValue = serde_json::from_str(payload).map_err(DataflowError::from_serde)?;
135 Ok(Self::from_value(&value))
136 }
137
138 /// Add an error to the message
139 pub fn add_error(&mut self, error: ErrorInfo) {
140 self.errors.push(error);
141 }
142
143 /// Check if message has errors
144 pub fn has_errors(&self) -> bool {
145 !self.errors.is_empty()
146 }
147
148 /// Message id (UUID v7 string by default; caller-supplied if set via
149 /// [`MessageBuilder::id`]).
150 #[inline]
151 pub fn id(&self) -> &str {
152 &self.id
153 }
154
155 /// Original payload as the engine received it. Immutable for the
156 /// lifetime of the message — the engine reads it through this Arc and
157 /// copies into `context` only as needed by handlers.
158 #[inline]
159 pub fn payload(&self) -> &OwnedDataValue {
160 &self.payload
161 }
162
163 /// The shared payload `Arc` itself. Useful when forwarding the same
164 /// payload to multiple messages without recloning the underlying
165 /// `OwnedDataValue` tree.
166 #[inline]
167 pub fn payload_arc(&self) -> &Arc<OwnedDataValue> {
168 &self.payload
169 }
170
171 /// Audit-trail entries recorded by the engine, one per task that ran
172 /// (skipped tasks are absent unless `Trace` mode is on).
173 #[inline]
174 pub fn audit_trail(&self) -> &[AuditTrail] {
175 &self.audit_trail
176 }
177
178 /// Errors collected while processing — both validation failures and
179 /// task errors that the workflow swallowed via `continue_on_error`.
180 #[inline]
181 pub fn errors(&self) -> &[ErrorInfo] {
182 &self.errors
183 }
184
185 /// Whether per-write `Change` capture is on. When `false`, audit-trail
186 /// entries are still emitted but their `changes` lists are empty —
187 /// the bulk-pipeline fast path.
188 #[inline]
189 pub fn capture_changes(&self) -> bool {
190 self.capture_changes
191 }
192
193 /// Get a reference to the `data` field in context. Returns
194 /// `&OwnedDataValue::Null` if missing (matches `serde_json::Value`'s
195 /// `Index` fallback semantics).
196 pub fn data(&self) -> &OwnedDataValue {
197 &self.context["data"]
198 }
199
200 /// Get a reference to the `metadata` field in context.
201 pub fn metadata(&self) -> &OwnedDataValue {
202 &self.context["metadata"]
203 }
204
205 /// Get a reference to the `temp_data` field in context.
206 pub fn temp_data(&self) -> &OwnedDataValue {
207 &self.context["temp_data"]
208 }
209}
210
211/// Builder for [`Message`]. Collapses the historical
212/// `new` / `with_id` / `from_value` / `without_change_capture` four-way
213/// constructor split into a single fluent shape.
214///
215/// ```
216/// use dataflow_rs::Message;
217/// use serde_json::json;
218///
219/// // Minimal: serde_json payload, default UUID id, capture on.
220/// let m = Message::builder()
221/// .payload_json(&json!({"order": {"total": 1500}}))
222/// .build();
223/// assert!(m.id().len() > 0);
224/// assert!(m.capture_changes());
225/// ```
226#[must_use = "MessageBuilder must be `.build()` to produce a Message"]
227#[derive(Default)]
228pub struct MessageBuilder {
229 id: Option<String>,
230 payload: Option<Arc<OwnedDataValue>>,
231 capture_changes: Option<bool>,
232}
233
234impl MessageBuilder {
235 /// Create an empty builder. Equivalent to [`MessageBuilder::default`].
236 pub fn new() -> Self {
237 Self::default()
238 }
239
240 /// Caller-supplied id (typically a correlation id from upstream).
241 /// Defaults to a freshly-generated UUID v7.
242 pub fn id(mut self, id: impl Into<String>) -> Self {
243 self.id = Some(id.into());
244 self
245 }
246
247 /// Already-owned payload `Arc` — zero serde_json walk, refcount-only
248 /// share. Mutually exclusive with [`Self::payload_json`]; whichever is
249 /// called last wins.
250 pub fn payload(mut self, payload: Arc<OwnedDataValue>) -> Self {
251 self.payload = Some(payload);
252 self
253 }
254
255 /// Construct the payload from a `serde_json::Value`. Goes through the
256 /// `OwnedDataValue::from(&Value)` bridge (one deep walk).
257 pub fn payload_json(mut self, payload: &JsonValue) -> Self {
258 self.payload = Some(Arc::new(OwnedDataValue::from(payload)));
259 self
260 }
261
262 /// When `false`, built-in functions skip per-write `Change` capture —
263 /// audit-trail entries are still recorded but their `changes` list is
264 /// empty. Defaults to `true`.
265 pub fn capture_changes(mut self, on: bool) -> Self {
266 self.capture_changes = Some(on);
267 self
268 }
269
270 /// Finalize. Defaults: id = UUID v7, payload = `OwnedDataValue::Null`,
271 /// capture_changes = `true`.
272 pub fn build(self) -> Message {
273 Message {
274 id: self.id.unwrap_or_else(|| Uuid::now_v7().to_string()),
275 payload: self
276 .payload
277 .unwrap_or_else(|| Arc::new(OwnedDataValue::Null)),
278 context: empty_context(),
279 audit_trail: vec![],
280 errors: vec![],
281 capture_changes: self.capture_changes.unwrap_or(true),
282 }
283 }
284}
285
286/// Build the canonical empty context shape used by `Message::new` and
287/// `MessageBuilder::build`.
288fn empty_context() -> OwnedDataValue {
289 OwnedDataValue::Object(vec![
290 ("data".to_string(), OwnedDataValue::Object(Vec::new())),
291 ("metadata".to_string(), OwnedDataValue::Object(Vec::new())),
292 ("temp_data".to_string(), OwnedDataValue::Object(Vec::new())),
293 ])
294}
295
296#[derive(Serialize, Deserialize, Debug, Clone)]
297pub struct AuditTrail {
298 pub workflow_id: Arc<str>,
299 pub task_id: Arc<str>,
300 pub timestamp: DateTime<Utc>,
301 pub changes: Vec<Change>,
302 pub status: usize,
303}
304
305/// A single recorded mutation in the audit trail.
306///
307/// `old_value` and `new_value` are owned `OwnedDataValue`s rather than
308/// `Arc<OwnedDataValue>` — eliminates one heap allocation per Change on the
309/// hot path. External consumers that need to share a `Change` across threads
310/// can wrap it themselves; in-process pipelines (audit-on map mappings) don't
311/// pay the Arc cost they were never going to use.
312#[derive(Serialize, Deserialize, Debug, Clone)]
313pub struct Change {
314 pub path: Arc<str>,
315 pub old_value: OwnedDataValue,
316 pub new_value: OwnedDataValue,
317}
318
#[cfg(test)]
mod tests {
    use super::*;

    /// A well-formed JSON string becomes the message payload unchanged.
    #[test]
    fn from_json_str_parses_valid_payload() {
        let raw = r#"{"order": {"total": 42}}"#;
        let message = Message::from_json_str(raw).expect("valid JSON should parse");

        // Round-trip the payload back to serde_json to compare structurally.
        let round_tripped = serde_json::to_value(message.payload()).unwrap();
        let expected = serde_json::json!({"order": {"total": 42}});
        assert_eq!(round_tripped, expected);
    }

    /// Malformed JSON surfaces as a `Deserialization` error, not a panic.
    #[test]
    fn from_json_str_rejects_malformed_payload() {
        let outcome = Message::from_json_str("{ not json");
        let err = outcome.expect_err("malformed input should fail");
        assert!(matches!(err, DataflowError::Deserialization(_)));
    }
}
336}