dataflow_rs/engine/task_context.rs
//! # Task context
//!
//! Wraps the per-call state passed to every `AsyncFunctionHandler::execute`
//! call: the message under processing, a handle to the shared datalogic
//! engine, and an audit-trail accumulator. Exposes typed helpers so handlers
//! don't have to reach into `crate::engine::utils::{get,set}_nested_value`
//! or hand-build `Change` entries.
//!
//! Custom handlers should treat `TaskContext` as their *only* mutation surface
//! for `message.context`: the `set` family records a `Change` automatically
//! when `message.capture_changes` is true, keeping the audit trail in sync
//! with the data without per-handler boilerplate.
13
14use crate::engine::error::ErrorInfo;
15use crate::engine::message::{Change, Message};
16use crate::engine::utils::{get_nested_value, set_nested_value};
17use datalogic_rs::Engine as DatalogicEngine;
18use datavalue::OwnedDataValue;
19use serde_json::Value as JsonValue;
20use std::sync::Arc;
21
22/// Per-call execution context handed to `AsyncFunctionHandler::execute`.
23///
24/// Borrows the message and datalogic engine for the duration of the handler
25/// call; collects `Change` entries that the workflow executor folds into the
26/// audit trail when the handler returns. Drop semantics are trivial — there
27/// is nothing to flush; the executor extracts the buffered changes via
28/// `into_changes()`.
29pub struct TaskContext<'a> {
30 message: &'a mut Message,
31 datalogic: &'a Arc<DatalogicEngine>,
32 /// Changes accumulated through the `set*` family. Only populated when
33 /// `message.capture_changes` is true; otherwise pushes are no-ops to
34 /// keep the bulk-pipeline fast path allocation-free.
35 changes: Vec<Change>,
36}
37
38impl<'a> TaskContext<'a> {
39 /// Construct a new context. Mostly engine-internal — handlers receive a
40 /// pre-built `&mut TaskContext` from the executor — but exposed `pub` so
41 /// tests and benchmarks can drive `AsyncFunctionHandler::execute`
42 /// directly without going through `Engine::process_message`.
43 pub fn new(message: &'a mut Message, datalogic: &'a Arc<DatalogicEngine>) -> Self {
44 Self {
45 message,
46 datalogic,
47 changes: Vec::new(),
48 }
49 }
50
51 /// Borrow the message under processing. Use this when you need to inspect
52 /// the message id, payload, or audit trail; for reading and mutating the
53 /// `data` / `metadata` / `temp_data` context, prefer the typed helpers on
54 /// `TaskContext` itself.
55 #[inline]
56 pub fn message(&self) -> &Message {
57 self.message
58 }
59
60 /// Mutable access to the message. Prefer the typed helpers (`set`,
61 /// `add_error`) over poking at `message.context` directly — direct
62 /// mutations bypass the audit trail.
63 #[inline]
64 pub fn message_mut(&mut self) -> &mut Message {
65 self.message
66 }
67
68 /// Shared datalogic engine, in case the handler needs to evaluate ad-hoc
69 /// JSONLogic. Most handlers can ignore this argument.
70 #[inline]
71 pub fn datalogic(&self) -> &Arc<DatalogicEngine> {
72 self.datalogic
73 }
74
75 /// Read-only view of `data`. Returns `&OwnedDataValue::Null` if missing
76 /// (mirrors the `Index` fallback semantics of `serde_json::Value`).
77 #[inline]
78 pub fn data(&self) -> &OwnedDataValue {
79 self.message.data()
80 }
81
82 /// Read-only view of `metadata`.
83 #[inline]
84 pub fn metadata(&self) -> &OwnedDataValue {
85 self.message.metadata()
86 }
87
88 /// Read-only view of `temp_data`.
89 #[inline]
90 pub fn temp_data(&self) -> &OwnedDataValue {
91 self.message.temp_data()
92 }
93
94 /// Look up a value by dot-path against the full context tree (rooted at
95 /// the unified `{data, metadata, temp_data}` object). Returns `None` if
96 /// the path doesn't resolve.
97 ///
98 /// Use the same path syntax as JSONLogic: `"data.user.name"`,
99 /// `"temp_data.items.0"`, `"metadata.progress.status_code"`.
100 #[inline]
101 pub fn get(&self, path: &str) -> Option<&OwnedDataValue> {
102 get_nested_value(&self.message.context, path)
103 }
104
105 /// Set a value at a dot-path on the context. Records a `Change` on the
106 /// audit trail when `message.capture_changes` is true; otherwise the
107 /// write happens but no audit entry is buffered.
108 ///
109 /// Intermediate objects/arrays are created on demand; see
110 /// [`crate::engine::utils::set_nested_value`] for the exact semantics
111 /// (numeric segments → arrays, `#` prefix → escaped object key, etc.).
112 pub fn set(&mut self, path: &str, value: OwnedDataValue) {
113 if self.message.capture_changes {
114 let old_value = get_nested_value(&self.message.context, path)
115 .cloned()
116 .unwrap_or(OwnedDataValue::Null);
117 let new_value = value.clone();
118 self.changes.push(Change {
119 path: Arc::from(path),
120 old_value,
121 new_value,
122 });
123 }
124 set_nested_value(&mut self.message.context, path, value);
125 }
126
127 /// Same as [`Self::set`] but accepts a `serde_json::Value` (bridges
128 /// through `OwnedDataValue::from`). Convenience for handlers that
129 /// already speak `serde_json::Value`.
130 #[inline]
131 pub fn set_json(&mut self, path: &str, value: &JsonValue) {
132 self.set(path, OwnedDataValue::from(value));
133 }
134
135 /// Append an error to `message.errors`. Convenience for
136 /// `ctx.message_mut().add_error(...)`.
137 #[inline]
138 pub fn add_error(&mut self, error: ErrorInfo) {
139 self.message.add_error(error);
140 }
141
142 /// Drain the accumulated changes. The workflow executor calls this after
143 /// the handler returns to fold them into the audit trail; tests and
144 /// benchmarks driving the trait directly can use it to inspect what the
145 /// handler buffered.
146 #[inline]
147 pub fn into_changes(self) -> Vec<Change> {
148 self.changes
149 }
150}