Skip to main content

harn_vm/
runtime_context.rs

1use std::cell::RefCell;
2
3use crate::value::{VmError, VmValue};
4
5#[derive(Clone, Debug)]
6pub struct RuntimeContext {
7    pub task_id: String,
8    pub parent_task_id: Option<String>,
9    pub root_task_id: String,
10    pub task_name: Option<String>,
11    pub task_group_id: Option<String>,
12    pub scope_id: Option<String>,
13    pub values: crate::value::DictMap,
14}
15
16impl RuntimeContext {
17    pub fn root() -> Self {
18        Self {
19            task_id: "task_root".to_string(),
20            parent_task_id: None,
21            root_task_id: "task_root".to_string(),
22            task_name: Some("root".to_string()),
23            task_group_id: None,
24            scope_id: None,
25            values: crate::value::DictMap::new(),
26        }
27    }
28
29    pub fn child_task(
30        &self,
31        task_id: impl Into<String>,
32        task_name: impl Into<String>,
33        task_group_id: Option<String>,
34    ) -> Self {
35        Self {
36            task_id: task_id.into(),
37            parent_task_id: Some(self.task_id.clone()),
38            root_task_id: self.root_task_id.clone(),
39            task_name: Some(task_name.into()),
40            task_group_id,
41            scope_id: self.scope_id.clone(),
42            values: self.values.clone(),
43        }
44    }
45}
46
47impl Default for RuntimeContext {
48    fn default() -> Self {
49        Self::root()
50    }
51}
52
/// Optional identifiers that can be layered over the runtime context for the
/// current thread via `install_runtime_context_overlay`.
///
/// A field left as `None` does not override a value supplied by an overlay
/// installed earlier; only `Some` fields take effect when overlays are merged.
#[derive(Debug, Clone, Default)]
pub struct RuntimeContextOverlay {
    /// Reported as `workflow_id` in the runtime context dict.
    pub workflow_id: Option<String>,
    /// Reported as `run_id` in the runtime context dict.
    pub run_id: Option<String>,
    /// Reported as `stage_id` in the runtime context dict.
    pub stage_id: Option<String>,
    /// Reported as `worker_id` in the runtime context dict.
    pub worker_id: Option<String>,
}
60
61thread_local! {
62    static RUNTIME_CONTEXT_OVERLAY_STACK: RefCell<Vec<RuntimeContextOverlay>> =
63        const { RefCell::new(Vec::new()) };
64}
65
/// Guard returned by `install_runtime_context_overlay`.
///
/// Dropping the guard pops the top entry of the current thread's overlay
/// stack, so it must be kept alive for as long as the overlay should apply.
/// Writing `install_runtime_context_overlay(x);` without binding the result
/// drops the guard at the end of the statement and removes the overlay again
/// immediately; `#[must_use]` makes the compiler warn about that mistake.
#[must_use = "the overlay is removed as soon as this guard is dropped; bind it to a variable"]
pub struct RuntimeContextOverlayGuard;
67
68pub fn install_runtime_context_overlay(
69    overlay: RuntimeContextOverlay,
70) -> RuntimeContextOverlayGuard {
71    RUNTIME_CONTEXT_OVERLAY_STACK.with(|stack| stack.borrow_mut().push(overlay));
72    RuntimeContextOverlayGuard
73}
74
75impl Drop for RuntimeContextOverlayGuard {
76    fn drop(&mut self) {
77        RUNTIME_CONTEXT_OVERLAY_STACK.with(|stack| {
78            stack.borrow_mut().pop();
79        });
80    }
81}
82
83fn current_overlay() -> RuntimeContextOverlay {
84    RUNTIME_CONTEXT_OVERLAY_STACK.with(|stack| {
85        let mut merged = RuntimeContextOverlay::default();
86        for overlay in stack.borrow().iter() {
87            if overlay.workflow_id.is_some() {
88                merged.workflow_id = overlay.workflow_id.clone();
89            }
90            if overlay.run_id.is_some() {
91                merged.run_id = overlay.run_id.clone();
92            }
93            if overlay.stage_id.is_some() {
94                merged.stage_id = overlay.stage_id.clone();
95            }
96            if overlay.worker_id.is_some() {
97                merged.worker_id = overlay.worker_id.clone();
98            }
99        }
100        merged
101    })
102}
103
104pub fn register_runtime_context_builtins(vm: &mut crate::vm::Vm) {
105    for name in [
106        "runtime_context",
107        "task_current",
108        "runtime_context_values",
109        "runtime_context_get",
110        "runtime_context_set",
111        "runtime_context_clear",
112    ] {
113        vm.register_builtin(name, move |_args, _out| {
114            Err(VmError::Runtime(format!(
115                "{name}: internal runtime context builtin was not intercepted"
116            )))
117        });
118    }
119}
120
121pub(crate) fn dispatch_runtime_context_builtin(
122    vm: &mut crate::vm::Vm,
123    name: &str,
124    args: &[VmValue],
125) -> Option<Result<VmValue, VmError>> {
126    match name {
127        "runtime_context" | "task_current" => Some(Ok(runtime_context_value(vm))),
128        "runtime_context_values" => Some(Ok(VmValue::dict(vm.runtime_context.values.clone()))),
129        "runtime_context_get" => Some(runtime_context_get(vm, args)),
130        "runtime_context_set" => Some(runtime_context_set(vm, args)),
131        "runtime_context_clear" => Some(runtime_context_clear(vm, args)),
132        _ => None,
133    }
134}
135
136fn runtime_context_get(vm: &crate::vm::Vm, args: &[VmValue]) -> Result<VmValue, VmError> {
137    let key = require_key(args, "runtime_context_get")?;
138    Ok(vm
139        .runtime_context
140        .values
141        .get(&key)
142        .cloned()
143        .or_else(|| args.get(1).cloned())
144        .unwrap_or(VmValue::Nil))
145}
146
147fn runtime_context_set(vm: &mut crate::vm::Vm, args: &[VmValue]) -> Result<VmValue, VmError> {
148    let key = require_key(args, "runtime_context_set")?;
149    let value = args.get(1).cloned().unwrap_or(VmValue::Nil);
150    Ok(vm
151        .runtime_context
152        .values
153        .insert(key, value)
154        .unwrap_or(VmValue::Nil))
155}
156
157fn runtime_context_clear(vm: &mut crate::vm::Vm, args: &[VmValue]) -> Result<VmValue, VmError> {
158    let key = require_key(args, "runtime_context_clear")?;
159    Ok(vm
160        .runtime_context
161        .values
162        .remove(&key)
163        .unwrap_or(VmValue::Nil))
164}
165
166fn require_key(args: &[VmValue], builtin: &str) -> Result<String, VmError> {
167    match args.first() {
168        Some(VmValue::String(value)) => Ok(value.to_string()),
169        _ => Err(VmError::Runtime(format!(
170            "{builtin}: first argument must be a string key"
171        ))),
172    }
173}
174
175pub(crate) fn runtime_context_value(vm: &crate::vm::Vm) -> VmValue {
176    let overlay = current_overlay();
177    let mutation = crate::orchestration::current_mutation_session();
178    let dispatch = crate::triggers::dispatcher::current_dispatch_context();
179    let trace_context = crate::stdlib::tracing::current_trace_context();
180    let agent_session_id = crate::agent_sessions::current_session_id();
181    let agent_ancestry = agent_session_id
182        .as_deref()
183        .and_then(crate::agent_sessions::ancestry);
184    let cancelled = vm
185        .cancel_token
186        .as_ref()
187        .is_some_and(|token| token.load(std::sync::atomic::Ordering::SeqCst));
188
189    let workflow_id = overlay.workflow_id;
190    let run_id = overlay
191        .run_id
192        .or_else(|| mutation.as_ref().and_then(|session| session.run_id.clone()));
193    let stage_id = overlay.stage_id;
194    let worker_id = overlay.worker_id.or_else(|| {
195        mutation
196            .as_ref()
197            .and_then(|session| session.worker_id.clone())
198    });
199
200    let mut values = crate::value::DictMap::new();
201    insert_string(
202        &mut values,
203        "task_id",
204        Some(vm.runtime_context.task_id.clone()),
205    );
206    insert_string(
207        &mut values,
208        "parent_task_id",
209        vm.runtime_context.parent_task_id.clone(),
210    );
211    insert_string(
212        &mut values,
213        "root_task_id",
214        Some(vm.runtime_context.root_task_id.clone()),
215    );
216    insert_string(
217        &mut values,
218        "task_name",
219        vm.runtime_context.task_name.clone(),
220    );
221    insert_string(
222        &mut values,
223        "task_group_id",
224        vm.runtime_context.task_group_id.clone(),
225    );
226    insert_string(&mut values, "scope_id", vm.runtime_context.scope_id.clone());
227    insert_string(&mut values, "workflow_id", workflow_id);
228    insert_string(&mut values, "run_id", run_id);
229    insert_string(&mut values, "stage_id", stage_id);
230    insert_string(&mut values, "worker_id", worker_id);
231    insert_string(&mut values, "agent_session_id", agent_session_id);
232    insert_string(
233        &mut values,
234        "parent_agent_session_id",
235        agent_ancestry
236            .as_ref()
237            .and_then(|ancestry| ancestry.parent_id.clone()),
238    );
239    insert_string(
240        &mut values,
241        "root_agent_session_id",
242        agent_ancestry
243            .as_ref()
244            .map(|ancestry| ancestry.root_id.clone()),
245    );
246    insert_string(&mut values, "agent_name", None);
247
248    if let Some(context) = dispatch {
249        insert_string(&mut values, "trigger_id", Some(context.binding_id.clone()));
250        insert_string(
251            &mut values,
252            "trigger_event_id",
253            Some(context.trigger_event.id.0.clone()),
254        );
255        insert_string(
256            &mut values,
257            "binding_key",
258            Some(format!(
259                "{}@{}",
260                context.binding_id, context.binding_version
261            )),
262        );
263        insert_string(
264            &mut values,
265            "tenant_id",
266            context.trigger_event.tenant_id.map(|tenant| tenant.0),
267        );
268        insert_string(
269            &mut values,
270            "provider",
271            Some(context.trigger_event.provider.0),
272        );
273        insert_string(
274            &mut values,
275            "trace_id",
276            Some(context.trigger_event.trace_id.0),
277        );
278    } else {
279        insert_string(&mut values, "trigger_id", None);
280        insert_string(&mut values, "trigger_event_id", None);
281        insert_string(&mut values, "binding_key", None);
282        // Outside a trigger dispatch, fall back to the ambient
283        // `enter_tenant` scope hosts install (today: harn-serve binds
284        // it from the authenticated principal). Keeps
285        // `runtime_context.tenant_id` consistent across trigger and
286        // non-trigger code paths.
287        insert_string(
288            &mut values,
289            "tenant_id",
290            crate::harness_tenant::current_tenant_id().map(|tenant| tenant.0),
291        );
292        insert_string(&mut values, "provider", None);
293        insert_string(
294            &mut values,
295            "trace_id",
296            trace_context.as_ref().map(|context| context.0.clone()),
297        );
298    }
299
300    insert_string(
301        &mut values,
302        "span_id",
303        trace_context
304            .as_ref()
305            .map(|context| context.1.clone())
306            .or_else(|| crate::tracing::current_span_id().map(|id| id.to_string())),
307    );
308    insert_string(&mut values, "scheduler_key", None);
309    insert_string(&mut values, "runner", None);
310    insert_string(&mut values, "capacity_class", None);
311    values.insert(
312        "context_values".to_string(),
313        VmValue::dict(vm.runtime_context.values.clone()),
314    );
315    values.insert("cancelled".to_string(), VmValue::Bool(cancelled));
316    values.insert("debug".to_string(), debug_context_value(vm, cancelled));
317    VmValue::dict(values)
318}
319
320fn debug_context_value(vm: &crate::vm::Vm, cancelled: bool) -> VmValue {
321    let mut debug = crate::value::DictMap::new();
322    debug.insert("cancelled".to_string(), VmValue::Bool(cancelled));
323    debug.insert("waiting_reason".to_string(), VmValue::Nil);
324    debug.insert(
325        "active_task_ids".to_string(),
326        VmValue::List(std::sync::Arc::new(
327            vm.spawned_tasks
328                .keys()
329                .map(|id| VmValue::String(std::sync::Arc::from(id.as_str())))
330                .collect(),
331        )),
332    );
333    debug.insert(
334        "held_synchronization".to_string(),
335        VmValue::List(std::sync::Arc::new(Vec::new())),
336    );
337    debug.insert(
338        "supervisors".to_string(),
339        crate::stdlib::supervisor::supervisor_debug_values(),
340    );
341    VmValue::dict(debug)
342}
343
344fn insert_string(values: &mut crate::value::DictMap, key: &str, value: Option<String>) {
345    values.insert(
346        key.to_string(),
347        value
348            .map(|value| VmValue::String(std::sync::Arc::from(value)))
349            .unwrap_or(VmValue::Nil),
350    );
351}