Skip to main content

harn_vm/
runtime_context.rs

1use std::cell::RefCell;
2use std::collections::BTreeMap;
3
4use crate::value::{VmError, VmValue};
5
6#[derive(Clone, Debug)]
7pub struct RuntimeContext {
8    pub task_id: String,
9    pub parent_task_id: Option<String>,
10    pub root_task_id: String,
11    pub task_name: Option<String>,
12    pub task_group_id: Option<String>,
13    pub scope_id: Option<String>,
14    pub values: BTreeMap<String, VmValue>,
15}
16
17impl RuntimeContext {
18    pub fn root() -> Self {
19        Self {
20            task_id: "task_root".to_string(),
21            parent_task_id: None,
22            root_task_id: "task_root".to_string(),
23            task_name: Some("root".to_string()),
24            task_group_id: None,
25            scope_id: None,
26            values: BTreeMap::new(),
27        }
28    }
29
30    pub fn child_task(
31        &self,
32        task_id: impl Into<String>,
33        task_name: impl Into<String>,
34        task_group_id: Option<String>,
35    ) -> Self {
36        Self {
37            task_id: task_id.into(),
38            parent_task_id: Some(self.task_id.clone()),
39            root_task_id: self.root_task_id.clone(),
40            task_name: Some(task_name.into()),
41            task_group_id,
42            scope_id: self.scope_id.clone(),
43            values: self.values.clone(),
44        }
45    }
46}
47
48impl Default for RuntimeContext {
49    fn default() -> Self {
50        Self::root()
51    }
52}
53
54#[derive(Clone, Debug, Default)]
55pub struct RuntimeContextOverlay {
56    pub workflow_id: Option<String>,
57    pub run_id: Option<String>,
58    pub stage_id: Option<String>,
59    pub worker_id: Option<String>,
60}
61
62thread_local! {
63    static RUNTIME_CONTEXT_OVERLAY_STACK: RefCell<Vec<RuntimeContextOverlay>> =
64        const { RefCell::new(Vec::new()) };
65}
66
67pub struct RuntimeContextOverlayGuard;
68
69pub fn install_runtime_context_overlay(
70    overlay: RuntimeContextOverlay,
71) -> RuntimeContextOverlayGuard {
72    RUNTIME_CONTEXT_OVERLAY_STACK.with(|stack| stack.borrow_mut().push(overlay));
73    RuntimeContextOverlayGuard
74}
75
76impl Drop for RuntimeContextOverlayGuard {
77    fn drop(&mut self) {
78        RUNTIME_CONTEXT_OVERLAY_STACK.with(|stack| {
79            stack.borrow_mut().pop();
80        });
81    }
82}
83
84fn current_overlay() -> RuntimeContextOverlay {
85    RUNTIME_CONTEXT_OVERLAY_STACK.with(|stack| {
86        let mut merged = RuntimeContextOverlay::default();
87        for overlay in stack.borrow().iter() {
88            if overlay.workflow_id.is_some() {
89                merged.workflow_id = overlay.workflow_id.clone();
90            }
91            if overlay.run_id.is_some() {
92                merged.run_id = overlay.run_id.clone();
93            }
94            if overlay.stage_id.is_some() {
95                merged.stage_id = overlay.stage_id.clone();
96            }
97            if overlay.worker_id.is_some() {
98                merged.worker_id = overlay.worker_id.clone();
99            }
100        }
101        merged
102    })
103}
104
105pub fn register_runtime_context_builtins(vm: &mut crate::vm::Vm) {
106    for name in [
107        "runtime_context",
108        "task_current",
109        "runtime_context_values",
110        "runtime_context_get",
111        "runtime_context_set",
112        "runtime_context_clear",
113    ] {
114        vm.register_builtin(name, move |_args, _out| {
115            Err(VmError::Runtime(format!(
116                "{name}: internal runtime context builtin was not intercepted"
117            )))
118        });
119    }
120}
121
122pub(crate) fn dispatch_runtime_context_builtin(
123    vm: &mut crate::vm::Vm,
124    name: &str,
125    args: &[VmValue],
126) -> Option<Result<VmValue, VmError>> {
127    match name {
128        "runtime_context" | "task_current" => Some(Ok(runtime_context_value(vm))),
129        "runtime_context_values" => Some(Ok(VmValue::Dict(std::sync::Arc::new(
130            vm.runtime_context.values.clone(),
131        )))),
132        "runtime_context_get" => Some(runtime_context_get(vm, args)),
133        "runtime_context_set" => Some(runtime_context_set(vm, args)),
134        "runtime_context_clear" => Some(runtime_context_clear(vm, args)),
135        _ => None,
136    }
137}
138
139fn runtime_context_get(vm: &crate::vm::Vm, args: &[VmValue]) -> Result<VmValue, VmError> {
140    let key = require_key(args, "runtime_context_get")?;
141    Ok(vm
142        .runtime_context
143        .values
144        .get(&key)
145        .cloned()
146        .or_else(|| args.get(1).cloned())
147        .unwrap_or(VmValue::Nil))
148}
149
150fn runtime_context_set(vm: &mut crate::vm::Vm, args: &[VmValue]) -> Result<VmValue, VmError> {
151    let key = require_key(args, "runtime_context_set")?;
152    let value = args.get(1).cloned().unwrap_or(VmValue::Nil);
153    Ok(vm
154        .runtime_context
155        .values
156        .insert(key, value)
157        .unwrap_or(VmValue::Nil))
158}
159
160fn runtime_context_clear(vm: &mut crate::vm::Vm, args: &[VmValue]) -> Result<VmValue, VmError> {
161    let key = require_key(args, "runtime_context_clear")?;
162    Ok(vm
163        .runtime_context
164        .values
165        .remove(&key)
166        .unwrap_or(VmValue::Nil))
167}
168
169fn require_key(args: &[VmValue], builtin: &str) -> Result<String, VmError> {
170    match args.first() {
171        Some(VmValue::String(value)) => Ok(value.to_string()),
172        _ => Err(VmError::Runtime(format!(
173            "{builtin}: first argument must be a string key"
174        ))),
175    }
176}
177
178pub(crate) fn runtime_context_value(vm: &crate::vm::Vm) -> VmValue {
179    let overlay = current_overlay();
180    let mutation = crate::orchestration::current_mutation_session();
181    let dispatch = crate::triggers::dispatcher::current_dispatch_context();
182    let trace_context = crate::stdlib::tracing::current_trace_context();
183    let agent_session_id = crate::agent_sessions::current_session_id();
184    let agent_ancestry = agent_session_id
185        .as_deref()
186        .and_then(crate::agent_sessions::ancestry);
187    let cancelled = vm
188        .cancel_token
189        .as_ref()
190        .is_some_and(|token| token.load(std::sync::atomic::Ordering::SeqCst));
191
192    let workflow_id = overlay.workflow_id;
193    let run_id = overlay
194        .run_id
195        .or_else(|| mutation.as_ref().and_then(|session| session.run_id.clone()));
196    let stage_id = overlay.stage_id;
197    let worker_id = overlay.worker_id.or_else(|| {
198        mutation
199            .as_ref()
200            .and_then(|session| session.worker_id.clone())
201    });
202
203    let mut values = BTreeMap::new();
204    insert_string(
205        &mut values,
206        "task_id",
207        Some(vm.runtime_context.task_id.clone()),
208    );
209    insert_string(
210        &mut values,
211        "parent_task_id",
212        vm.runtime_context.parent_task_id.clone(),
213    );
214    insert_string(
215        &mut values,
216        "root_task_id",
217        Some(vm.runtime_context.root_task_id.clone()),
218    );
219    insert_string(
220        &mut values,
221        "task_name",
222        vm.runtime_context.task_name.clone(),
223    );
224    insert_string(
225        &mut values,
226        "task_group_id",
227        vm.runtime_context.task_group_id.clone(),
228    );
229    insert_string(&mut values, "scope_id", vm.runtime_context.scope_id.clone());
230    insert_string(&mut values, "workflow_id", workflow_id);
231    insert_string(&mut values, "run_id", run_id);
232    insert_string(&mut values, "stage_id", stage_id);
233    insert_string(&mut values, "worker_id", worker_id);
234    insert_string(&mut values, "agent_session_id", agent_session_id);
235    insert_string(
236        &mut values,
237        "parent_agent_session_id",
238        agent_ancestry
239            .as_ref()
240            .and_then(|ancestry| ancestry.parent_id.clone()),
241    );
242    insert_string(
243        &mut values,
244        "root_agent_session_id",
245        agent_ancestry
246            .as_ref()
247            .map(|ancestry| ancestry.root_id.clone()),
248    );
249    insert_string(&mut values, "agent_name", None);
250
251    if let Some(context) = dispatch {
252        insert_string(&mut values, "trigger_id", Some(context.binding_id.clone()));
253        insert_string(
254            &mut values,
255            "trigger_event_id",
256            Some(context.trigger_event.id.0.clone()),
257        );
258        insert_string(
259            &mut values,
260            "binding_key",
261            Some(format!(
262                "{}@{}",
263                context.binding_id, context.binding_version
264            )),
265        );
266        insert_string(
267            &mut values,
268            "tenant_id",
269            context.trigger_event.tenant_id.map(|tenant| tenant.0),
270        );
271        insert_string(
272            &mut values,
273            "provider",
274            Some(context.trigger_event.provider.0),
275        );
276        insert_string(
277            &mut values,
278            "trace_id",
279            Some(context.trigger_event.trace_id.0),
280        );
281    } else {
282        insert_string(&mut values, "trigger_id", None);
283        insert_string(&mut values, "trigger_event_id", None);
284        insert_string(&mut values, "binding_key", None);
285        // Outside a trigger dispatch, fall back to the ambient
286        // `enter_tenant` scope hosts install (today: harn-serve binds
287        // it from the authenticated principal). Keeps
288        // `runtime_context.tenant_id` consistent across trigger and
289        // non-trigger code paths.
290        insert_string(
291            &mut values,
292            "tenant_id",
293            crate::harness_tenant::current_tenant_id().map(|tenant| tenant.0),
294        );
295        insert_string(&mut values, "provider", None);
296        insert_string(
297            &mut values,
298            "trace_id",
299            trace_context.as_ref().map(|context| context.0.clone()),
300        );
301    }
302
303    insert_string(
304        &mut values,
305        "span_id",
306        trace_context
307            .as_ref()
308            .map(|context| context.1.clone())
309            .or_else(|| crate::tracing::current_span_id().map(|id| id.to_string())),
310    );
311    insert_string(&mut values, "scheduler_key", None);
312    insert_string(&mut values, "runner", None);
313    insert_string(&mut values, "capacity_class", None);
314    values.insert(
315        "context_values".to_string(),
316        VmValue::Dict(std::sync::Arc::new(vm.runtime_context.values.clone())),
317    );
318    values.insert("cancelled".to_string(), VmValue::Bool(cancelled));
319    values.insert("debug".to_string(), debug_context_value(vm, cancelled));
320    VmValue::Dict(std::sync::Arc::new(values))
321}
322
323fn debug_context_value(vm: &crate::vm::Vm, cancelled: bool) -> VmValue {
324    let mut debug = BTreeMap::new();
325    debug.insert("cancelled".to_string(), VmValue::Bool(cancelled));
326    debug.insert("waiting_reason".to_string(), VmValue::Nil);
327    debug.insert(
328        "active_task_ids".to_string(),
329        VmValue::List(std::sync::Arc::new(
330            vm.spawned_tasks
331                .keys()
332                .map(|id| VmValue::String(std::sync::Arc::from(id.as_str())))
333                .collect(),
334        )),
335    );
336    debug.insert(
337        "held_synchronization".to_string(),
338        VmValue::List(std::sync::Arc::new(Vec::new())),
339    );
340    debug.insert(
341        "supervisors".to_string(),
342        crate::stdlib::supervisor::supervisor_debug_values(),
343    );
344    VmValue::Dict(std::sync::Arc::new(debug))
345}
346
347fn insert_string(values: &mut BTreeMap<String, VmValue>, key: &str, value: Option<String>) {
348    values.insert(
349        key.to_string(),
350        value
351            .map(|value| VmValue::String(std::sync::Arc::from(value)))
352            .unwrap_or(VmValue::Nil),
353    );
354}