Skip to main content

dataflow_rs/engine/
task_context.rs

1//! # Task context
2//!
3//! Wraps the per-call state passed to every `AsyncFunctionHandler::execute`
4//! call: the message under processing, a handle to the shared datalogic
5//! engine, and an audit-trail accumulator. Exposes typed helpers so handlers
6//! don't have to reach into `crate::engine::utils::{get,set}_nested_value`
7//! or hand-build `Change` entries.
8//!
9//! Custom handlers should treat `TaskContext` as their *only* mutation surface
10//! for `message.context`: the `set` family records a `Change` automatically
11//! when `message.capture_changes` is true, keeping the audit trail in sync
12//! with the data without per-handler boilerplate.
13
14use crate::engine::error::ErrorInfo;
15use crate::engine::message::{Change, Message};
16use crate::engine::utils::{get_nested_value, set_nested_value};
17use datalogic_rs::Engine as DatalogicEngine;
18use datavalue::OwnedDataValue;
19use serde_json::Value as JsonValue;
20use std::sync::Arc;
21
22/// Per-call execution context handed to `AsyncFunctionHandler::execute`.
23///
24/// Borrows the message and datalogic engine for the duration of the handler
25/// call; collects `Change` entries that the workflow executor folds into the
26/// audit trail when the handler returns. Drop semantics are trivial — there
27/// is nothing to flush; the executor extracts the buffered changes via
28/// `into_changes()`.
29pub struct TaskContext<'a> {
30    message: &'a mut Message,
31    datalogic: &'a Arc<DatalogicEngine>,
32    /// Changes accumulated through the `set*` family. Only populated when
33    /// `message.capture_changes` is true; otherwise pushes are no-ops to
34    /// keep the bulk-pipeline fast path allocation-free.
35    changes: Vec<Change>,
36}
37
38impl<'a> TaskContext<'a> {
39    /// Construct a new context. Mostly engine-internal — handlers receive a
40    /// pre-built `&mut TaskContext` from the executor — but exposed `pub` so
41    /// tests and benchmarks can drive `AsyncFunctionHandler::execute`
42    /// directly without going through `Engine::process_message`.
43    pub fn new(message: &'a mut Message, datalogic: &'a Arc<DatalogicEngine>) -> Self {
44        Self {
45            message,
46            datalogic,
47            changes: Vec::new(),
48        }
49    }
50
51    /// Borrow the message under processing. Use this when you need to inspect
52    /// the message id, payload, or audit trail; for reading and mutating the
53    /// `data` / `metadata` / `temp_data` context, prefer the typed helpers on
54    /// `TaskContext` itself.
55    #[inline]
56    pub fn message(&self) -> &Message {
57        self.message
58    }
59
60    /// Mutable access to the message. Prefer the typed helpers (`set`,
61    /// `add_error`) over poking at `message.context` directly — direct
62    /// mutations bypass the audit trail.
63    #[inline]
64    pub fn message_mut(&mut self) -> &mut Message {
65        self.message
66    }
67
68    /// Shared datalogic engine, in case the handler needs to evaluate ad-hoc
69    /// JSONLogic. Most handlers can ignore this argument.
70    #[inline]
71    pub fn datalogic(&self) -> &Arc<DatalogicEngine> {
72        self.datalogic
73    }
74
75    /// Read-only view of `data`. Returns `&OwnedDataValue::Null` if missing
76    /// (mirrors the `Index` fallback semantics of `serde_json::Value`).
77    #[inline]
78    pub fn data(&self) -> &OwnedDataValue {
79        self.message.data()
80    }
81
82    /// Read-only view of `metadata`.
83    #[inline]
84    pub fn metadata(&self) -> &OwnedDataValue {
85        self.message.metadata()
86    }
87
88    /// Read-only view of `temp_data`.
89    #[inline]
90    pub fn temp_data(&self) -> &OwnedDataValue {
91        self.message.temp_data()
92    }
93
94    /// Look up a value by dot-path against the full context tree (rooted at
95    /// the unified `{data, metadata, temp_data}` object). Returns `None` if
96    /// the path doesn't resolve.
97    ///
98    /// Use the same path syntax as JSONLogic: `"data.user.name"`,
99    /// `"temp_data.items.0"`, `"metadata.progress.status_code"`.
100    #[inline]
101    pub fn get(&self, path: &str) -> Option<&OwnedDataValue> {
102        get_nested_value(&self.message.context, path)
103    }
104
105    /// Set a value at a dot-path on the context. Records a `Change` on the
106    /// audit trail when `message.capture_changes` is true; otherwise the
107    /// write happens but no audit entry is buffered.
108    ///
109    /// Intermediate objects/arrays are created on demand; see
110    /// [`crate::engine::utils::set_nested_value`] for the exact semantics
111    /// (numeric segments → arrays, `#` prefix → escaped object key, etc.).
112    pub fn set(&mut self, path: &str, value: OwnedDataValue) {
113        if self.message.capture_changes {
114            let old_value = get_nested_value(&self.message.context, path)
115                .cloned()
116                .unwrap_or(OwnedDataValue::Null);
117            let new_value = value.clone();
118            self.changes.push(Change {
119                path: Arc::from(path),
120                old_value,
121                new_value,
122            });
123        }
124        set_nested_value(&mut self.message.context, path, value);
125    }
126
127    /// Same as [`Self::set`] but accepts a `serde_json::Value` (bridges
128    /// through `OwnedDataValue::from`). Convenience for handlers that
129    /// already speak `serde_json::Value`.
130    #[inline]
131    pub fn set_json(&mut self, path: &str, value: &JsonValue) {
132        self.set(path, OwnedDataValue::from(value));
133    }
134
135    /// Append an error to `message.errors`. Convenience for
136    /// `ctx.message_mut().add_error(...)`.
137    #[inline]
138    pub fn add_error(&mut self, error: ErrorInfo) {
139        self.message.add_error(error);
140    }
141
142    /// Drain the accumulated changes. The workflow executor calls this after
143    /// the handler returns to fold them into the audit trail; tests and
144    /// benchmarks driving the trait directly can use it to inspect what the
145    /// handler buffered.
146    #[inline]
147    pub fn into_changes(self) -> Vec<Change> {
148        self.changes
149    }
150}