Skip to main content

harn_vm/
runtime_context.rs

1use std::cell::RefCell;
2use std::collections::BTreeMap;
3use std::rc::Rc;
4
5use crate::value::{VmError, VmValue};
6
7#[derive(Clone, Debug)]
8pub struct RuntimeContext {
9    pub task_id: String,
10    pub parent_task_id: Option<String>,
11    pub root_task_id: String,
12    pub task_name: Option<String>,
13    pub task_group_id: Option<String>,
14    pub scope_id: Option<String>,
15    pub values: BTreeMap<String, VmValue>,
16}
17
18impl RuntimeContext {
19    pub fn root() -> Self {
20        Self {
21            task_id: "task_root".to_string(),
22            parent_task_id: None,
23            root_task_id: "task_root".to_string(),
24            task_name: Some("root".to_string()),
25            task_group_id: None,
26            scope_id: None,
27            values: BTreeMap::new(),
28        }
29    }
30
31    pub fn child_task(
32        &self,
33        task_id: impl Into<String>,
34        task_name: impl Into<String>,
35        task_group_id: Option<String>,
36    ) -> Self {
37        Self {
38            task_id: task_id.into(),
39            parent_task_id: Some(self.task_id.clone()),
40            root_task_id: self.root_task_id.clone(),
41            task_name: Some(task_name.into()),
42            task_group_id,
43            scope_id: self.scope_id.clone(),
44            values: self.values.clone(),
45        }
46    }
47}
48
49impl Default for RuntimeContext {
50    fn default() -> Self {
51        Self::root()
52    }
53}
54
/// Optional identifiers layered on top of the task context.
///
/// Every field defaults to `None`, meaning "this overlay does not set the
/// field". When overlays are stacked, a field left as `None` does not hide a
/// value set by an overlay installed earlier.
#[derive(Clone, Debug, Default)]
pub struct RuntimeContextOverlay {
    /// Workflow identifier reported as `workflow_id`.
    pub workflow_id: Option<String>,
    /// Run identifier reported as `run_id`.
    pub run_id: Option<String>,
    /// Stage identifier reported as `stage_id`.
    pub stage_id: Option<String>,
    /// Worker identifier reported as `worker_id`.
    pub worker_id: Option<String>,
}
62
63thread_local! {
64    static RUNTIME_CONTEXT_OVERLAY_STACK: RefCell<Vec<RuntimeContextOverlay>> =
65        const { RefCell::new(Vec::new()) };
66}
67
/// Guard returned by [`install_runtime_context_overlay`].
///
/// Dropping the guard pops the top entry of the current thread's overlay
/// stack. Because the pop removes whatever is on top, guards should be
/// dropped in the reverse order of installation and on the thread that
/// installed them.
///
/// The guard must be bound to a named variable for as long as the overlay
/// should stay active: discarding it (or binding it to `_`) drops it at once
/// and removes the overlay immediately.
#[must_use = "the overlay is removed as soon as the guard is dropped; bind it to a variable"]
pub struct RuntimeContextOverlayGuard;
69
70pub fn install_runtime_context_overlay(
71    overlay: RuntimeContextOverlay,
72) -> RuntimeContextOverlayGuard {
73    RUNTIME_CONTEXT_OVERLAY_STACK.with(|stack| stack.borrow_mut().push(overlay));
74    RuntimeContextOverlayGuard
75}
76
77impl Drop for RuntimeContextOverlayGuard {
78    fn drop(&mut self) {
79        RUNTIME_CONTEXT_OVERLAY_STACK.with(|stack| {
80            stack.borrow_mut().pop();
81        });
82    }
83}
84
85fn current_overlay() -> RuntimeContextOverlay {
86    RUNTIME_CONTEXT_OVERLAY_STACK.with(|stack| {
87        let mut merged = RuntimeContextOverlay::default();
88        for overlay in stack.borrow().iter() {
89            if overlay.workflow_id.is_some() {
90                merged.workflow_id = overlay.workflow_id.clone();
91            }
92            if overlay.run_id.is_some() {
93                merged.run_id = overlay.run_id.clone();
94            }
95            if overlay.stage_id.is_some() {
96                merged.stage_id = overlay.stage_id.clone();
97            }
98            if overlay.worker_id.is_some() {
99                merged.worker_id = overlay.worker_id.clone();
100            }
101        }
102        merged
103    })
104}
105
106pub fn register_runtime_context_builtins(vm: &mut crate::vm::Vm) {
107    for name in [
108        "runtime_context",
109        "task_current",
110        "runtime_context_values",
111        "runtime_context_get",
112        "runtime_context_set",
113        "runtime_context_clear",
114    ] {
115        vm.register_builtin(name, move |_args, _out| {
116            Err(VmError::Runtime(format!(
117                "{name}: internal runtime context builtin was not intercepted"
118            )))
119        });
120    }
121}
122
123pub(crate) fn dispatch_runtime_context_builtin(
124    vm: &mut crate::vm::Vm,
125    name: &str,
126    args: &[VmValue],
127) -> Option<Result<VmValue, VmError>> {
128    match name {
129        "runtime_context" | "task_current" => Some(Ok(runtime_context_value(vm))),
130        "runtime_context_values" => Some(Ok(VmValue::Dict(Rc::new(
131            vm.runtime_context.values.clone(),
132        )))),
133        "runtime_context_get" => Some(runtime_context_get(vm, args)),
134        "runtime_context_set" => Some(runtime_context_set(vm, args)),
135        "runtime_context_clear" => Some(runtime_context_clear(vm, args)),
136        _ => None,
137    }
138}
139
140fn runtime_context_get(vm: &crate::vm::Vm, args: &[VmValue]) -> Result<VmValue, VmError> {
141    let key = require_key(args, "runtime_context_get")?;
142    Ok(vm
143        .runtime_context
144        .values
145        .get(&key)
146        .cloned()
147        .or_else(|| args.get(1).cloned())
148        .unwrap_or(VmValue::Nil))
149}
150
151fn runtime_context_set(vm: &mut crate::vm::Vm, args: &[VmValue]) -> Result<VmValue, VmError> {
152    let key = require_key(args, "runtime_context_set")?;
153    let value = args.get(1).cloned().unwrap_or(VmValue::Nil);
154    Ok(vm
155        .runtime_context
156        .values
157        .insert(key, value)
158        .unwrap_or(VmValue::Nil))
159}
160
161fn runtime_context_clear(vm: &mut crate::vm::Vm, args: &[VmValue]) -> Result<VmValue, VmError> {
162    let key = require_key(args, "runtime_context_clear")?;
163    Ok(vm
164        .runtime_context
165        .values
166        .remove(&key)
167        .unwrap_or(VmValue::Nil))
168}
169
170fn require_key(args: &[VmValue], builtin: &str) -> Result<String, VmError> {
171    match args.first() {
172        Some(VmValue::String(value)) => Ok(value.to_string()),
173        _ => Err(VmError::Runtime(format!(
174            "{builtin}: first argument must be a string key"
175        ))),
176    }
177}
178
179pub(crate) fn runtime_context_value(vm: &crate::vm::Vm) -> VmValue {
180    let overlay = current_overlay();
181    let mutation = crate::orchestration::current_mutation_session();
182    let dispatch = crate::triggers::dispatcher::current_dispatch_context();
183    let trace_context = crate::stdlib::tracing::current_trace_context();
184    let agent_session_id = crate::agent_sessions::current_session_id();
185    let agent_ancestry = agent_session_id
186        .as_deref()
187        .and_then(crate::agent_sessions::ancestry);
188    let cancelled = vm
189        .cancel_token
190        .as_ref()
191        .is_some_and(|token| token.load(std::sync::atomic::Ordering::SeqCst));
192
193    let workflow_id = overlay.workflow_id;
194    let run_id = overlay
195        .run_id
196        .or_else(|| mutation.as_ref().and_then(|session| session.run_id.clone()));
197    let stage_id = overlay.stage_id;
198    let worker_id = overlay.worker_id.or_else(|| {
199        mutation
200            .as_ref()
201            .and_then(|session| session.worker_id.clone())
202    });
203
204    let mut values = BTreeMap::new();
205    insert_string(
206        &mut values,
207        "task_id",
208        Some(vm.runtime_context.task_id.clone()),
209    );
210    insert_string(
211        &mut values,
212        "parent_task_id",
213        vm.runtime_context.parent_task_id.clone(),
214    );
215    insert_string(
216        &mut values,
217        "root_task_id",
218        Some(vm.runtime_context.root_task_id.clone()),
219    );
220    insert_string(
221        &mut values,
222        "task_name",
223        vm.runtime_context.task_name.clone(),
224    );
225    insert_string(
226        &mut values,
227        "task_group_id",
228        vm.runtime_context.task_group_id.clone(),
229    );
230    insert_string(&mut values, "scope_id", vm.runtime_context.scope_id.clone());
231    insert_string(&mut values, "workflow_id", workflow_id);
232    insert_string(&mut values, "run_id", run_id);
233    insert_string(&mut values, "stage_id", stage_id);
234    insert_string(&mut values, "worker_id", worker_id);
235    insert_string(&mut values, "agent_session_id", agent_session_id.clone());
236    insert_string(
237        &mut values,
238        "parent_agent_session_id",
239        agent_ancestry
240            .as_ref()
241            .and_then(|ancestry| ancestry.parent_id.clone()),
242    );
243    insert_string(
244        &mut values,
245        "root_agent_session_id",
246        agent_ancestry
247            .as_ref()
248            .map(|ancestry| ancestry.root_id.clone()),
249    );
250    insert_string(&mut values, "agent_name", None);
251
252    if let Some(context) = dispatch {
253        insert_string(&mut values, "trigger_id", Some(context.binding_id.clone()));
254        insert_string(
255            &mut values,
256            "trigger_event_id",
257            Some(context.trigger_event.id.0.clone()),
258        );
259        insert_string(
260            &mut values,
261            "binding_key",
262            Some(format!(
263                "{}@{}",
264                context.binding_id, context.binding_version
265            )),
266        );
267        insert_string(
268            &mut values,
269            "tenant_id",
270            context.trigger_event.tenant_id.map(|tenant| tenant.0),
271        );
272        insert_string(
273            &mut values,
274            "provider",
275            Some(context.trigger_event.provider.0),
276        );
277        insert_string(
278            &mut values,
279            "trace_id",
280            Some(context.trigger_event.trace_id.0),
281        );
282    } else {
283        insert_string(&mut values, "trigger_id", None);
284        insert_string(&mut values, "trigger_event_id", None);
285        insert_string(&mut values, "binding_key", None);
286        insert_string(&mut values, "tenant_id", None);
287        insert_string(&mut values, "provider", None);
288        insert_string(
289            &mut values,
290            "trace_id",
291            trace_context.as_ref().map(|context| context.0.clone()),
292        );
293    }
294
295    insert_string(
296        &mut values,
297        "span_id",
298        trace_context
299            .as_ref()
300            .map(|context| context.1.clone())
301            .or_else(|| crate::tracing::current_span_id().map(|id| id.to_string())),
302    );
303    insert_string(&mut values, "scheduler_key", None);
304    insert_string(&mut values, "runner", None);
305    insert_string(&mut values, "capacity_class", None);
306    values.insert(
307        "context_values".to_string(),
308        VmValue::Dict(Rc::new(vm.runtime_context.values.clone())),
309    );
310    values.insert("cancelled".to_string(), VmValue::Bool(cancelled));
311    values.insert("debug".to_string(), debug_context_value(vm, cancelled));
312    VmValue::Dict(Rc::new(values))
313}
314
315fn debug_context_value(vm: &crate::vm::Vm, cancelled: bool) -> VmValue {
316    let mut debug = BTreeMap::new();
317    debug.insert("cancelled".to_string(), VmValue::Bool(cancelled));
318    debug.insert("waiting_reason".to_string(), VmValue::Nil);
319    debug.insert(
320        "active_task_ids".to_string(),
321        VmValue::List(Rc::new(
322            vm.spawned_tasks
323                .keys()
324                .map(|id| VmValue::String(Rc::from(id.as_str())))
325                .collect(),
326        )),
327    );
328    debug.insert(
329        "held_synchronization".to_string(),
330        VmValue::List(Rc::new(Vec::new())),
331    );
332    debug.insert(
333        "supervisors".to_string(),
334        crate::stdlib::supervisor::supervisor_debug_values(),
335    );
336    VmValue::Dict(Rc::new(debug))
337}
338
339fn insert_string(values: &mut BTreeMap<String, VmValue>, key: &str, value: Option<String>) {
340    values.insert(
341        key.to_string(),
342        value
343            .map(|value| VmValue::String(Rc::from(value)))
344            .unwrap_or(VmValue::Nil),
345    );
346}