Skip to main content

harn_vm/
runtime_context.rs

1use std::cell::RefCell;
2
3use crate::value::{VmError, VmValue};
4
/// Task identity and user-settable values held by the VM as `vm.runtime_context`.
#[derive(Clone, Debug)]
pub struct RuntimeContext {
    /// Identifier of this task (`"task_root"` for the context built by `root()`).
    pub task_id: String,
    /// `task_id` of the context this one was derived from via `child_task`;
    /// `None` for the root context.
    pub parent_task_id: Option<String>,
    /// Root task identifier; `child_task` copies it unchanged from the parent.
    pub root_task_id: String,
    /// Task name (`"root"` for the root context, caller-supplied for children).
    pub task_name: Option<String>,
    /// Task group passed to `child_task`; it is not inherited from the parent.
    pub task_group_id: Option<String>,
    /// Scope identifier; `child_task` copies it from the parent.
    pub scope_id: Option<String>,
    /// Key/value entries read and written by the `runtime_context_get`,
    /// `runtime_context_set` and `runtime_context_clear` builtins. A child
    /// context starts with a clone of its parent's entries.
    pub values: crate::value::DictMap,
}
15
16impl RuntimeContext {
17    pub fn root() -> Self {
18        Self {
19            task_id: "task_root".to_string(),
20            parent_task_id: None,
21            root_task_id: "task_root".to_string(),
22            task_name: Some("root".to_string()),
23            task_group_id: None,
24            scope_id: None,
25            values: crate::value::DictMap::new(),
26        }
27    }
28
29    pub fn child_task(
30        &self,
31        task_id: impl Into<String>,
32        task_name: impl Into<String>,
33        task_group_id: Option<String>,
34    ) -> Self {
35        Self {
36            task_id: task_id.into(),
37            parent_task_id: Some(self.task_id.clone()),
38            root_task_id: self.root_task_id.clone(),
39            task_name: Some(task_name.into()),
40            task_group_id,
41            scope_id: self.scope_id.clone(),
42            values: self.values.clone(),
43        }
44    }
45}
46
47impl Default for RuntimeContext {
48    fn default() -> Self {
49        Self::root()
50    }
51}
52
/// One layer of optional identifiers pushed onto the thread-local overlay
/// stack by `install_runtime_context_overlay`.
///
/// When layers are merged (`current_overlay`), a field that is `None` leaves
/// the value from earlier layers in place; a `Some` replaces it.
#[derive(Clone, Debug, Default)]
pub struct RuntimeContextOverlay {
    /// Reported as `workflow_id` in the runtime-context dict.
    pub workflow_id: Option<String>,
    /// Reported as `run_id`; when no layer sets it, the current mutation
    /// session's `run_id` is used instead.
    pub run_id: Option<String>,
    /// Reported as `stage_id` in the runtime-context dict.
    pub stage_id: Option<String>,
    /// Reported as `worker_id`; when no layer sets it, the current mutation
    /// session's `worker_id` is used instead.
    pub worker_id: Option<String>,
}
60
thread_local! {
    // Per-thread stack of installed overlays. Entries are pushed by
    // `install_runtime_context_overlay`, popped when a
    // `RuntimeContextOverlayGuard` is dropped, and the whole stack can be
    // exchanged via `swap_runtime_context_overlay_stack`.
    static RUNTIME_CONTEXT_OVERLAY_STACK: RefCell<Vec<RuntimeContextOverlay>> =
        const { RefCell::new(Vec::new()) };
}
65
/// Guard returned by `install_runtime_context_overlay`.
///
/// Its `Drop` impl pops the top entry of the thread-local overlay stack, so
/// the overlay stays installed only for as long as the guard is kept alive.
/// Marked `#[must_use]` because discarding the guard pops the overlay
/// right away, which silently turns the install into a no-op.
#[must_use = "dropping the guard immediately pops the overlay that was just installed"]
pub struct RuntimeContextOverlayGuard;
67
68pub fn install_runtime_context_overlay(
69    overlay: RuntimeContextOverlay,
70) -> RuntimeContextOverlayGuard {
71    RUNTIME_CONTEXT_OVERLAY_STACK.with(|stack| stack.borrow_mut().push(overlay));
72    RuntimeContextOverlayGuard
73}
74
75/// Per-task ambient-scope swap of the runtime-context overlay stack. See
76/// `orchestration::ambient_scope`: the overlay attributes events (worker_id,
77/// run_id) to the running task, so it must follow that task across `.await`
78/// rather than leak to cooperatively-scheduled siblings.
79pub(crate) fn swap_runtime_context_overlay_stack(
80    next: Vec<RuntimeContextOverlay>,
81) -> Vec<RuntimeContextOverlay> {
82    RUNTIME_CONTEXT_OVERLAY_STACK.with(|stack| std::mem::replace(&mut *stack.borrow_mut(), next))
83}
84
85impl Drop for RuntimeContextOverlayGuard {
86    fn drop(&mut self) {
87        RUNTIME_CONTEXT_OVERLAY_STACK.with(|stack| {
88            stack.borrow_mut().pop();
89        });
90    }
91}
92
93fn current_overlay() -> RuntimeContextOverlay {
94    RUNTIME_CONTEXT_OVERLAY_STACK.with(|stack| {
95        let mut merged = RuntimeContextOverlay::default();
96        for overlay in stack.borrow().iter() {
97            if overlay.workflow_id.is_some() {
98                merged.workflow_id = overlay.workflow_id.clone();
99            }
100            if overlay.run_id.is_some() {
101                merged.run_id = overlay.run_id.clone();
102            }
103            if overlay.stage_id.is_some() {
104                merged.stage_id = overlay.stage_id.clone();
105            }
106            if overlay.worker_id.is_some() {
107                merged.worker_id = overlay.worker_id.clone();
108            }
109        }
110        merged
111    })
112}
113
114pub fn register_runtime_context_builtins(vm: &mut crate::vm::Vm) {
115    for name in [
116        "runtime_context",
117        "task_current",
118        "runtime_context_values",
119        "runtime_context_get",
120        "runtime_context_set",
121        "runtime_context_clear",
122    ] {
123        vm.register_builtin(name, move |_args, _out| {
124            Err(VmError::Runtime(format!(
125                "{name}: internal runtime context builtin was not intercepted"
126            )))
127        });
128    }
129}
130
131pub(crate) fn dispatch_runtime_context_builtin(
132    vm: &mut crate::vm::Vm,
133    name: &str,
134    args: &[VmValue],
135) -> Option<Result<VmValue, VmError>> {
136    match name {
137        "runtime_context" | "task_current" => Some(Ok(runtime_context_value(vm))),
138        "runtime_context_values" => Some(Ok(VmValue::dict(vm.runtime_context.values.clone()))),
139        "runtime_context_get" => Some(runtime_context_get(vm, args)),
140        "runtime_context_set" => Some(runtime_context_set(vm, args)),
141        "runtime_context_clear" => Some(runtime_context_clear(vm, args)),
142        _ => None,
143    }
144}
145
146fn runtime_context_get(vm: &crate::vm::Vm, args: &[VmValue]) -> Result<VmValue, VmError> {
147    let key = require_key(args, "runtime_context_get")?;
148    Ok(vm
149        .runtime_context
150        .values
151        .get(key.as_str())
152        .cloned()
153        .or_else(|| args.get(1).cloned())
154        .unwrap_or(VmValue::Nil))
155}
156
157fn runtime_context_set(vm: &mut crate::vm::Vm, args: &[VmValue]) -> Result<VmValue, VmError> {
158    let key = require_key(args, "runtime_context_set")?;
159    let value = args.get(1).cloned().unwrap_or(VmValue::Nil);
160    Ok(vm
161        .runtime_context
162        .values
163        .insert(crate::value::intern_key(&key), value)
164        .unwrap_or(VmValue::Nil))
165}
166
167fn runtime_context_clear(vm: &mut crate::vm::Vm, args: &[VmValue]) -> Result<VmValue, VmError> {
168    let key = require_key(args, "runtime_context_clear")?;
169    Ok(vm
170        .runtime_context
171        .values
172        .remove(key.as_str())
173        .unwrap_or(VmValue::Nil))
174}
175
176fn require_key(args: &[VmValue], builtin: &str) -> Result<String, VmError> {
177    match args.first() {
178        Some(VmValue::String(value)) => Ok(value.to_string()),
179        _ => Err(VmError::Runtime(format!(
180            "{builtin}: first argument must be a string key"
181        ))),
182    }
183}
184
185pub(crate) fn runtime_context_value(vm: &crate::vm::Vm) -> VmValue {
186    let overlay = current_overlay();
187    let mutation = crate::orchestration::current_mutation_session();
188    let dispatch = crate::triggers::dispatcher::current_dispatch_context();
189    let trace_context = crate::stdlib::tracing::current_trace_context();
190    let agent_session_id = crate::agent_sessions::current_session_id();
191    let agent_ancestry = agent_session_id
192        .as_deref()
193        .and_then(crate::agent_sessions::ancestry);
194    let cancelled = vm
195        .cancel_token
196        .as_ref()
197        .is_some_and(|token| token.load(std::sync::atomic::Ordering::SeqCst));
198
199    let workflow_id = overlay.workflow_id;
200    let run_id = overlay
201        .run_id
202        .or_else(|| mutation.as_ref().and_then(|session| session.run_id.clone()));
203    let stage_id = overlay.stage_id;
204    let worker_id = overlay.worker_id.or_else(|| {
205        mutation
206            .as_ref()
207            .and_then(|session| session.worker_id.clone())
208    });
209
210    let mut values = crate::value::DictMap::new();
211    insert_string(
212        &mut values,
213        "task_id",
214        Some(vm.runtime_context.task_id.clone()),
215    );
216    insert_string(
217        &mut values,
218        "parent_task_id",
219        vm.runtime_context.parent_task_id.clone(),
220    );
221    insert_string(
222        &mut values,
223        "root_task_id",
224        Some(vm.runtime_context.root_task_id.clone()),
225    );
226    insert_string(
227        &mut values,
228        "task_name",
229        vm.runtime_context.task_name.clone(),
230    );
231    insert_string(
232        &mut values,
233        "task_group_id",
234        vm.runtime_context.task_group_id.clone(),
235    );
236    insert_string(&mut values, "scope_id", vm.runtime_context.scope_id.clone());
237    insert_string(&mut values, "workflow_id", workflow_id);
238    insert_string(&mut values, "run_id", run_id);
239    insert_string(&mut values, "stage_id", stage_id);
240    insert_string(&mut values, "worker_id", worker_id);
241    insert_string(&mut values, "agent_session_id", agent_session_id);
242    insert_string(
243        &mut values,
244        "parent_agent_session_id",
245        agent_ancestry
246            .as_ref()
247            .and_then(|ancestry| ancestry.parent_id.clone()),
248    );
249    insert_string(
250        &mut values,
251        "root_agent_session_id",
252        agent_ancestry
253            .as_ref()
254            .map(|ancestry| ancestry.root_id.clone()),
255    );
256    insert_string(&mut values, "agent_name", None);
257
258    if let Some(context) = dispatch {
259        insert_string(&mut values, "trigger_id", Some(context.binding_id.clone()));
260        insert_string(
261            &mut values,
262            "trigger_event_id",
263            Some(context.trigger_event.id.0.clone()),
264        );
265        insert_string(
266            &mut values,
267            "binding_key",
268            Some(format!(
269                "{}@{}",
270                context.binding_id, context.binding_version
271            )),
272        );
273        insert_string(
274            &mut values,
275            "tenant_id",
276            context.trigger_event.tenant_id.map(|tenant| tenant.0),
277        );
278        insert_string(
279            &mut values,
280            "provider",
281            Some(context.trigger_event.provider.0),
282        );
283        insert_string(
284            &mut values,
285            "trace_id",
286            Some(context.trigger_event.trace_id.0),
287        );
288    } else {
289        insert_string(&mut values, "trigger_id", None);
290        insert_string(&mut values, "trigger_event_id", None);
291        insert_string(&mut values, "binding_key", None);
292        // Outside a trigger dispatch, fall back to the ambient
293        // `enter_tenant` scope hosts install (today: harn-serve binds
294        // it from the authenticated principal). Keeps
295        // `runtime_context.tenant_id` consistent across trigger and
296        // non-trigger code paths.
297        insert_string(
298            &mut values,
299            "tenant_id",
300            crate::harness_tenant::current_tenant_id().map(|tenant| tenant.0),
301        );
302        insert_string(&mut values, "provider", None);
303        insert_string(
304            &mut values,
305            "trace_id",
306            trace_context.as_ref().map(|context| context.0.clone()),
307        );
308    }
309
310    insert_string(
311        &mut values,
312        "span_id",
313        trace_context
314            .as_ref()
315            .map(|context| context.1.clone())
316            .or_else(|| crate::tracing::current_span_id().map(|id| id.to_string())),
317    );
318    insert_string(&mut values, "scheduler_key", None);
319    insert_string(&mut values, "runner", None);
320    insert_string(&mut values, "capacity_class", None);
321    values.insert(
322        crate::value::intern_key("context_values"),
323        VmValue::dict(vm.runtime_context.values.clone()),
324    );
325    values.insert(
326        crate::value::intern_key("cancelled"),
327        VmValue::Bool(cancelled),
328    );
329    values.insert(
330        crate::value::intern_key("debug"),
331        debug_context_value(vm, cancelled),
332    );
333    VmValue::dict(values)
334}
335
336fn debug_context_value(vm: &crate::vm::Vm, cancelled: bool) -> VmValue {
337    let mut debug = crate::value::DictMap::new();
338    debug.insert(
339        crate::value::intern_key("cancelled"),
340        VmValue::Bool(cancelled),
341    );
342    debug.insert(crate::value::intern_key("waiting_reason"), VmValue::Nil);
343    debug.insert(
344        crate::value::intern_key("active_task_ids"),
345        VmValue::List(std::sync::Arc::new(
346            vm.spawned_tasks
347                .keys()
348                .map(|id| VmValue::String(arcstr::ArcStr::from(id.as_str())))
349                .collect(),
350        )),
351    );
352    debug.insert(
353        crate::value::intern_key("held_synchronization"),
354        VmValue::List(std::sync::Arc::new(Vec::new())),
355    );
356    debug.insert(
357        crate::value::intern_key("supervisors"),
358        crate::stdlib::supervisor::supervisor_debug_values(),
359    );
360    VmValue::dict(debug)
361}
362
363fn insert_string(values: &mut crate::value::DictMap, key: &str, value: Option<String>) {
364    values.insert(
365        crate::value::intern_key(key),
366        value
367            .map(|value| VmValue::String(arcstr::ArcStr::from(value)))
368            .unwrap_or(VmValue::Nil),
369    );
370}