Skip to main content

dataflow_rs/engine/
message.rs

1use crate::engine::error::{DataflowError, ErrorInfo};
2use chrono::{DateTime, Utc};
3use datavalue::OwnedDataValue;
4use serde::{Deserialize, Serialize};
5use serde_json::Value as JsonValue;
6use std::sync::Arc;
7use uuid::Uuid;
8
/// A message flowing through the dataflow engine.
///
/// Construct via [`Message::builder`] for the full API, or use the shortcuts
/// [`Message::new`] (already-owned `Arc<OwnedDataValue>` payload — the perf
/// path) and [`Message::from_value`] (bridges from `serde_json::Value`).
///
/// `context` is held as an [`OwnedDataValue`] tree (not `serde_json::Value`)
/// so the JSONLogic evaluator can borrow it into its arena via
/// `OwnedDataValue::to_arena` with a single deep walk in, and project the
/// result back via `DataValue::to_owned` with a single deep walk out — no
/// `serde_json::Value` round-trip in the hot path. The on-the-wire JSON
/// shape is preserved by datavalue's native `Serialize` / `Deserialize`
/// impls.
///
/// Every other field is encapsulated — read via `id()`, `payload()`,
/// `audit_trail()`, `errors()`, `capture_changes()`; mutate `errors` via
/// [`Message::add_error`]; mutate `context` via [`crate::TaskContext::set`].
/// Direct mutation of `audit_trail` is engine-internal.
#[derive(Debug, Clone)]
pub struct Message {
    /// Message identifier. A freshly generated UUID v7 string unless the
    /// caller supplied one through [`MessageBuilder::id`]. Read via
    /// [`Message::id`].
    pub(crate) id: String,
    /// Input payload. Held behind an `Arc` so one payload tree can back many
    /// messages without deep-cloning it. Read via [`Message::payload`] /
    /// [`Message::payload_arc`].
    pub(crate) payload: Arc<OwnedDataValue>,
    /// Unified context containing `data`, `metadata`, and `temp_data` keys.
    /// Always an `OwnedDataValue::Object`; the engine populates the three
    /// top-level keys at construction. Public for read access (tests do
    /// `message.context["data"]["x"]` lookups); inside handlers prefer
    /// [`crate::TaskContext::set`] which records audit-trail changes.
    ///
    /// NOTE(review): the `Deserialize` impl copies whatever `context` value
    /// the input carries, so the "always an Object with three keys" shape
    /// is only guaranteed for messages built via `new` / `builder`.
    pub context: OwnedDataValue,
    /// Audit entries appended by the engine as tasks run. Read via
    /// [`Message::audit_trail`]; mutation is engine-internal.
    pub(crate) audit_trail: Vec<AuditTrail>,
    /// Errors that occurred during message processing. Read via
    /// `errors()`, append via `add_error()`.
    pub(crate) errors: Vec<ErrorInfo>,
    /// When `true` (default), built-in functions emit per-write `Change`
    /// entries into `audit_trail`, capturing `old_value` and `new_value` deep
    /// clones. When `false`, `AuditTrail` entries are still recorded
    /// (workflow_id, task_id, status, timestamp) but `changes` is empty —
    /// the bulk-pipeline fast path. UI debug consumers should leave this at
    /// `true`. Wire shape is unchanged either way: this flag is never
    /// serialized, and deserialized messages always start with it `true`.
    pub(crate) capture_changes: bool,
}
49
50// Custom Serialize: stable wire format ({id, payload, context, audit_trail, errors}).
51// `capture_changes` is an in-memory hint only — never serialized.
52impl Serialize for Message {
53    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
54    where
55        S: serde::Serializer,
56    {
57        use serde::ser::SerializeStruct;
58        let mut state = serializer.serialize_struct("Message", 5)?;
59        state.serialize_field("id", &self.id)?;
60        state.serialize_field("payload", &self.payload)?;
61        state.serialize_field("context", &self.context)?;
62        state.serialize_field("audit_trail", &self.audit_trail)?;
63        state.serialize_field("errors", &self.errors)?;
64        state.end()
65    }
66}
67
68// Custom Deserialize: mirrors the Serialize shape; no cache field to seed.
69// `capture_changes` defaults to `true` for back-compat.
70impl<'de> Deserialize<'de> for Message {
71    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
72    where
73        D: serde::Deserializer<'de>,
74    {
75        #[derive(Deserialize)]
76        struct MessageData {
77            id: String,
78            payload: Arc<OwnedDataValue>,
79            context: OwnedDataValue,
80            audit_trail: Vec<AuditTrail>,
81            errors: Vec<ErrorInfo>,
82        }
83
84        let data = MessageData::deserialize(deserializer)?;
85        Ok(Message {
86            id: data.id,
87            payload: data.payload,
88            context: data.context,
89            audit_trail: data.audit_trail,
90            errors: data.errors,
91            capture_changes: true,
92        })
93    }
94}
95
96impl Message {
97    /// Start building a message. The recommended constructor — chains
98    /// `.id(...)`, `.payload(...)` / `.payload_json(...)`, and
99    /// `.capture_changes(...)` calls, then `.build()`.
100    pub fn builder() -> MessageBuilder {
101        MessageBuilder::new()
102    }
103
104    /// Construct a message from an already-owned payload `Arc`. The perf
105    /// path: zero `serde_json::Value` walk, one Arc refcount bump per
106    /// message. Use this from a hot loop with a payload `Arc` shared across
107    /// messages (e.g. a benchmark harness or an HTTP handler that receives
108    /// already-parsed payloads).
109    pub fn new(payload: Arc<OwnedDataValue>) -> Self {
110        Self {
111            // UUID v7: ms-precision timestamp in the high bits, random tail.
112            // Time-ordered and sortable — better for databases/logs than v4
113            // (random-only) and the same `rng` backend cost.
114            id: Uuid::now_v7().to_string(),
115            payload,
116            context: empty_context(),
117            audit_trail: vec![],
118            errors: vec![],
119            capture_changes: true,
120        }
121    }
122
123    /// Construct a message from a `serde_json::Value` payload. Convenience
124    /// for code that already speaks serde_json; goes through the
125    /// `OwnedDataValue::from(&Value)` bridge (one deep walk).
126    pub fn from_value(payload: &JsonValue) -> Self {
127        Self::new(Arc::new(OwnedDataValue::from(payload)))
128    }
129
130    /// Construct a message from a JSON payload string. Parses with
131    /// `serde_json` and bridges into `OwnedDataValue`. Returns
132    /// `DataflowError::Deserialization` on parse failure.
133    pub fn from_json_str(payload: &str) -> crate::engine::error::Result<Self> {
134        let value: JsonValue = serde_json::from_str(payload).map_err(DataflowError::from_serde)?;
135        Ok(Self::from_value(&value))
136    }
137
138    /// Add an error to the message
139    pub fn add_error(&mut self, error: ErrorInfo) {
140        self.errors.push(error);
141    }
142
143    /// Check if message has errors
144    pub fn has_errors(&self) -> bool {
145        !self.errors.is_empty()
146    }
147
148    /// Message id (UUID v7 string by default; caller-supplied if set via
149    /// [`MessageBuilder::id`]).
150    #[inline]
151    pub fn id(&self) -> &str {
152        &self.id
153    }
154
155    /// Original payload as the engine received it. Immutable for the
156    /// lifetime of the message — the engine reads it through this Arc and
157    /// copies into `context` only as needed by handlers.
158    #[inline]
159    pub fn payload(&self) -> &OwnedDataValue {
160        &self.payload
161    }
162
163    /// The shared payload `Arc` itself. Useful when forwarding the same
164    /// payload to multiple messages without recloning the underlying
165    /// `OwnedDataValue` tree.
166    #[inline]
167    pub fn payload_arc(&self) -> &Arc<OwnedDataValue> {
168        &self.payload
169    }
170
171    /// Audit-trail entries recorded by the engine, one per task that ran
172    /// (skipped tasks are absent unless `Trace` mode is on).
173    #[inline]
174    pub fn audit_trail(&self) -> &[AuditTrail] {
175        &self.audit_trail
176    }
177
178    /// Errors collected while processing — both validation failures and
179    /// task errors that the workflow swallowed via `continue_on_error`.
180    #[inline]
181    pub fn errors(&self) -> &[ErrorInfo] {
182        &self.errors
183    }
184
185    /// Whether per-write `Change` capture is on. When `false`, audit-trail
186    /// entries are still emitted but their `changes` lists are empty —
187    /// the bulk-pipeline fast path.
188    #[inline]
189    pub fn capture_changes(&self) -> bool {
190        self.capture_changes
191    }
192
193    /// Get a reference to the `data` field in context. Returns
194    /// `&OwnedDataValue::Null` if missing (matches `serde_json::Value`'s
195    /// `Index` fallback semantics).
196    pub fn data(&self) -> &OwnedDataValue {
197        &self.context["data"]
198    }
199
200    /// Get a reference to the `metadata` field in context.
201    pub fn metadata(&self) -> &OwnedDataValue {
202        &self.context["metadata"]
203    }
204
205    /// Get a reference to the `temp_data` field in context.
206    pub fn temp_data(&self) -> &OwnedDataValue {
207        &self.context["temp_data"]
208    }
209}
210
211/// Builder for [`Message`]. Collapses the historical
212/// `new` / `with_id` / `from_value` / `without_change_capture` four-way
213/// constructor split into a single fluent shape.
214///
215/// ```
216/// use dataflow_rs::Message;
217/// use serde_json::json;
218///
219/// // Minimal: serde_json payload, default UUID id, capture on.
220/// let m = Message::builder()
221///     .payload_json(&json!({"order": {"total": 1500}}))
222///     .build();
223/// assert!(m.id().len() > 0);
224/// assert!(m.capture_changes());
225/// ```
226#[must_use = "MessageBuilder must be `.build()` to produce a Message"]
227#[derive(Default)]
228pub struct MessageBuilder {
229    id: Option<String>,
230    payload: Option<Arc<OwnedDataValue>>,
231    capture_changes: Option<bool>,
232}
233
234impl MessageBuilder {
235    /// Create an empty builder. Equivalent to [`MessageBuilder::default`].
236    pub fn new() -> Self {
237        Self::default()
238    }
239
240    /// Caller-supplied id (typically a correlation id from upstream).
241    /// Defaults to a freshly-generated UUID v7.
242    pub fn id(mut self, id: impl Into<String>) -> Self {
243        self.id = Some(id.into());
244        self
245    }
246
247    /// Already-owned payload `Arc` — zero serde_json walk, refcount-only
248    /// share. Mutually exclusive with [`Self::payload_json`]; whichever is
249    /// called last wins.
250    pub fn payload(mut self, payload: Arc<OwnedDataValue>) -> Self {
251        self.payload = Some(payload);
252        self
253    }
254
255    /// Construct the payload from a `serde_json::Value`. Goes through the
256    /// `OwnedDataValue::from(&Value)` bridge (one deep walk).
257    pub fn payload_json(mut self, payload: &JsonValue) -> Self {
258        self.payload = Some(Arc::new(OwnedDataValue::from(payload)));
259        self
260    }
261
262    /// When `false`, built-in functions skip per-write `Change` capture —
263    /// audit-trail entries are still recorded but their `changes` list is
264    /// empty. Defaults to `true`.
265    pub fn capture_changes(mut self, on: bool) -> Self {
266        self.capture_changes = Some(on);
267        self
268    }
269
270    /// Finalize. Defaults: id = UUID v7, payload = `OwnedDataValue::Null`,
271    /// capture_changes = `true`.
272    pub fn build(self) -> Message {
273        Message {
274            id: self.id.unwrap_or_else(|| Uuid::now_v7().to_string()),
275            payload: self
276                .payload
277                .unwrap_or_else(|| Arc::new(OwnedDataValue::Null)),
278            context: empty_context(),
279            audit_trail: vec![],
280            errors: vec![],
281            capture_changes: self.capture_changes.unwrap_or(true),
282        }
283    }
284}
285
286/// Build the canonical empty context shape used by `Message::new` and
287/// `MessageBuilder::build`.
288fn empty_context() -> OwnedDataValue {
289    OwnedDataValue::Object(vec![
290        ("data".to_string(), OwnedDataValue::Object(Vec::new())),
291        ("metadata".to_string(), OwnedDataValue::Object(Vec::new())),
292        ("temp_data".to_string(), OwnedDataValue::Object(Vec::new())),
293    ])
294}
295
/// One audit record in [`Message::audit_trail`], appended by the engine.
///
/// `workflow_id`, `task_id`, `timestamp`, and `status` are always filled in.
/// `changes` stays empty when the message's `capture_changes` flag is off
/// (the bulk-pipeline fast path).
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct AuditTrail {
    /// Identifier of the workflow this entry belongs to.
    pub workflow_id: Arc<str>,
    /// Identifier of the task this entry records.
    pub task_id: Arc<str>,
    /// Entry timestamp, in UTC.
    pub timestamp: DateTime<Utc>,
    /// Per-write mutations recorded for this entry; empty when change
    /// capture is disabled on the message.
    pub changes: Vec<Change>,
    /// Status value recorded with the entry.
    /// NOTE(review): the meaning/encoding of this code is not defined in
    /// this file — confirm against the engine before relying on it.
    pub status: usize,
}
304
/// A single recorded mutation in the audit trail.
///
/// `old_value` and `new_value` are owned `OwnedDataValue`s rather than
/// `Arc<OwnedDataValue>` — eliminates one heap allocation per Change on the
/// hot path. External consumers that need to share a `Change` across threads
/// can wrap it themselves; in-process pipelines (audit-on map mappings) don't
/// pay the Arc cost they were never going to use.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Change {
    /// Location that was written.
    /// NOTE(review): the path syntax (e.g. dotted `data.x`) is decided by the
    /// writer, not here — confirm against the code that records changes.
    pub path: Arc<str>,
    /// Deep clone of the value at `path` before the write.
    /// NOTE(review): presumably `Null` when the path did not exist — confirm.
    pub old_value: OwnedDataValue,
    /// Deep clone of the value written at `path`.
    pub new_value: OwnedDataValue,
}
318
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn from_json_str_parses_valid_payload() {
        let msg =
            Message::from_json_str(r#"{"order": {"total": 42}}"#).expect("valid JSON should parse");
        let payload_json = serde_json::to_value(msg.payload()).unwrap();
        assert_eq!(payload_json, serde_json::json!({"order": {"total": 42}}));
    }

    #[test]
    fn from_json_str_rejects_malformed_payload() {
        let err = Message::from_json_str("{ not json").expect_err("malformed input should fail");
        assert!(matches!(err, DataflowError::Deserialization(_)));
    }

    /// `build()` with nothing set yields the defaults documented on
    /// `MessageBuilder::build`.
    #[test]
    fn builder_applies_documented_defaults() {
        let msg = Message::builder().build();
        assert!(!msg.id().is_empty());
        assert!(matches!(msg.payload(), OwnedDataValue::Null));
        assert!(msg.capture_changes());
        assert!(msg.audit_trail().is_empty());
        assert!(!msg.has_errors());
        assert_eq!(
            serde_json::to_value(&msg.context).unwrap(),
            serde_json::json!({"data": {}, "metadata": {}, "temp_data": {}})
        );
    }

    #[test]
    fn builder_overrides_are_applied() {
        let msg = Message::builder()
            .id("corr-42")
            .payload_json(&serde_json::json!({"kind": "order"}))
            .capture_changes(false)
            .build();
        assert_eq!(msg.id(), "corr-42");
        assert!(!msg.capture_changes());
        assert_eq!(
            serde_json::to_value(msg.payload()).unwrap(),
            serde_json::json!({"kind": "order"})
        );
    }

    /// `Message::new` shares the caller's payload `Arc` (no deep clone) and
    /// mints a distinct id per message.
    #[test]
    fn new_shares_payload_arc_and_mints_distinct_ids() {
        let shared = Arc::new(OwnedDataValue::Null);
        let first = Message::new(Arc::clone(&shared));
        let second = Message::new(Arc::clone(&shared));
        assert!(Arc::ptr_eq(first.payload_arc(), second.payload_arc()));
        assert_ne!(first.id(), second.id());
    }

    /// The custom serde impls keep `capture_changes` off the wire and
    /// restore it as `true` on the way back in.
    #[test]
    fn serde_round_trip_omits_capture_changes_and_restores_default() {
        let original = Message::builder()
            .id("round-trip")
            .payload_json(&serde_json::json!({"tags": ["a", "b"]}))
            .capture_changes(false)
            .build();

        let wire = serde_json::to_value(&original).unwrap();
        assert!(wire.get("capture_changes").is_none());

        let restored: Message = serde_json::from_value(wire).unwrap();
        assert_eq!(restored.id(), "round-trip");
        assert!(restored.capture_changes());
        assert_eq!(
            serde_json::to_value(restored.payload()).unwrap(),
            serde_json::json!({"tags": ["a", "b"]})
        );
    }
}