1use std::cell::RefCell;
2
3use crate::value::{VmError, VmValue};
4
5#[derive(Clone, Debug)]
6pub struct RuntimeContext {
7 pub task_id: String,
8 pub parent_task_id: Option<String>,
9 pub root_task_id: String,
10 pub task_name: Option<String>,
11 pub task_group_id: Option<String>,
12 pub scope_id: Option<String>,
13 pub values: crate::value::DictMap,
14}
15
16impl RuntimeContext {
17 pub fn root() -> Self {
18 Self {
19 task_id: "task_root".to_string(),
20 parent_task_id: None,
21 root_task_id: "task_root".to_string(),
22 task_name: Some("root".to_string()),
23 task_group_id: None,
24 scope_id: None,
25 values: crate::value::DictMap::new(),
26 }
27 }
28
29 pub fn child_task(
30 &self,
31 task_id: impl Into<String>,
32 task_name: impl Into<String>,
33 task_group_id: Option<String>,
34 ) -> Self {
35 Self {
36 task_id: task_id.into(),
37 parent_task_id: Some(self.task_id.clone()),
38 root_task_id: self.root_task_id.clone(),
39 task_name: Some(task_name.into()),
40 task_group_id,
41 scope_id: self.scope_id.clone(),
42 values: self.values.clone(),
43 }
44 }
45}
46
47impl Default for RuntimeContext {
48 fn default() -> Self {
49 Self::root()
50 }
51}
52
/// Optional identifiers layered on top of the task context.
///
/// Overlays are pushed onto a per-thread stack; when the stack is merged, a
/// field set to `Some` in a later overlay replaces the value from earlier
/// overlays, while `None` leaves the earlier value in place.
#[derive(Clone, Debug, Default)]
pub struct RuntimeContextOverlay {
    /// Workflow identifier, if this overlay sets one.
    pub workflow_id: Option<String>,
    /// Run identifier, if this overlay sets one.
    pub run_id: Option<String>,
    /// Stage identifier, if this overlay sets one.
    pub stage_id: Option<String>,
    /// Worker identifier, if this overlay sets one.
    pub worker_id: Option<String>,
}
60
61thread_local! {
62 static RUNTIME_CONTEXT_OVERLAY_STACK: RefCell<Vec<RuntimeContextOverlay>> =
63 const { RefCell::new(Vec::new()) };
64}
65
/// Guard returned by `install_runtime_context_overlay`.
///
/// Dropping the guard pops the top entry of the current thread's overlay
/// stack, so it must be kept alive for as long as the overlay should apply.
/// Binding it to `_` (or ignoring it) drops it immediately and undoes the
/// install.
#[must_use = "dropping the guard immediately pops the overlay that was just installed"]
pub struct RuntimeContextOverlayGuard;
67
68pub fn install_runtime_context_overlay(
69 overlay: RuntimeContextOverlay,
70) -> RuntimeContextOverlayGuard {
71 RUNTIME_CONTEXT_OVERLAY_STACK.with(|stack| stack.borrow_mut().push(overlay));
72 RuntimeContextOverlayGuard
73}
74
75pub(crate) fn swap_runtime_context_overlay_stack(
80 next: Vec<RuntimeContextOverlay>,
81) -> Vec<RuntimeContextOverlay> {
82 RUNTIME_CONTEXT_OVERLAY_STACK.with(|stack| std::mem::replace(&mut *stack.borrow_mut(), next))
83}
84
85impl Drop for RuntimeContextOverlayGuard {
86 fn drop(&mut self) {
87 RUNTIME_CONTEXT_OVERLAY_STACK.with(|stack| {
88 stack.borrow_mut().pop();
89 });
90 }
91}
92
93fn current_overlay() -> RuntimeContextOverlay {
94 RUNTIME_CONTEXT_OVERLAY_STACK.with(|stack| {
95 let mut merged = RuntimeContextOverlay::default();
96 for overlay in stack.borrow().iter() {
97 if overlay.workflow_id.is_some() {
98 merged.workflow_id = overlay.workflow_id.clone();
99 }
100 if overlay.run_id.is_some() {
101 merged.run_id = overlay.run_id.clone();
102 }
103 if overlay.stage_id.is_some() {
104 merged.stage_id = overlay.stage_id.clone();
105 }
106 if overlay.worker_id.is_some() {
107 merged.worker_id = overlay.worker_id.clone();
108 }
109 }
110 merged
111 })
112}
113
114pub fn register_runtime_context_builtins(vm: &mut crate::vm::Vm) {
115 for name in [
116 "runtime_context",
117 "task_current",
118 "runtime_context_values",
119 "runtime_context_get",
120 "runtime_context_set",
121 "runtime_context_clear",
122 ] {
123 vm.register_builtin(name, move |_args, _out| {
124 Err(VmError::Runtime(format!(
125 "{name}: internal runtime context builtin was not intercepted"
126 )))
127 });
128 }
129}
130
131pub(crate) fn dispatch_runtime_context_builtin(
132 vm: &mut crate::vm::Vm,
133 name: &str,
134 args: &[VmValue],
135) -> Option<Result<VmValue, VmError>> {
136 match name {
137 "runtime_context" | "task_current" => Some(Ok(runtime_context_value(vm))),
138 "runtime_context_values" => Some(Ok(VmValue::dict(vm.runtime_context.values.clone()))),
139 "runtime_context_get" => Some(runtime_context_get(vm, args)),
140 "runtime_context_set" => Some(runtime_context_set(vm, args)),
141 "runtime_context_clear" => Some(runtime_context_clear(vm, args)),
142 _ => None,
143 }
144}
145
146fn runtime_context_get(vm: &crate::vm::Vm, args: &[VmValue]) -> Result<VmValue, VmError> {
147 let key = require_key(args, "runtime_context_get")?;
148 Ok(vm
149 .runtime_context
150 .values
151 .get(key.as_str())
152 .cloned()
153 .or_else(|| args.get(1).cloned())
154 .unwrap_or(VmValue::Nil))
155}
156
157fn runtime_context_set(vm: &mut crate::vm::Vm, args: &[VmValue]) -> Result<VmValue, VmError> {
158 let key = require_key(args, "runtime_context_set")?;
159 let value = args.get(1).cloned().unwrap_or(VmValue::Nil);
160 Ok(vm
161 .runtime_context
162 .values
163 .insert(crate::value::intern_key(&key), value)
164 .unwrap_or(VmValue::Nil))
165}
166
167fn runtime_context_clear(vm: &mut crate::vm::Vm, args: &[VmValue]) -> Result<VmValue, VmError> {
168 let key = require_key(args, "runtime_context_clear")?;
169 Ok(vm
170 .runtime_context
171 .values
172 .remove(key.as_str())
173 .unwrap_or(VmValue::Nil))
174}
175
176fn require_key(args: &[VmValue], builtin: &str) -> Result<String, VmError> {
177 match args.first() {
178 Some(VmValue::String(value)) => Ok(value.to_string()),
179 _ => Err(VmError::Runtime(format!(
180 "{builtin}: first argument must be a string key"
181 ))),
182 }
183}
184
185pub(crate) fn runtime_context_value(vm: &crate::vm::Vm) -> VmValue {
186 let overlay = current_overlay();
187 let mutation = crate::orchestration::current_mutation_session();
188 let dispatch = crate::triggers::dispatcher::current_dispatch_context();
189 let trace_context = crate::stdlib::tracing::current_trace_context();
190 let agent_session_id = crate::agent_sessions::current_session_id();
191 let agent_ancestry = agent_session_id
192 .as_deref()
193 .and_then(crate::agent_sessions::ancestry);
194 let cancelled = vm
195 .cancel_token
196 .as_ref()
197 .is_some_and(|token| token.load(std::sync::atomic::Ordering::SeqCst));
198
199 let workflow_id = overlay.workflow_id;
200 let run_id = overlay
201 .run_id
202 .or_else(|| mutation.as_ref().and_then(|session| session.run_id.clone()));
203 let stage_id = overlay.stage_id;
204 let worker_id = overlay.worker_id.or_else(|| {
205 mutation
206 .as_ref()
207 .and_then(|session| session.worker_id.clone())
208 });
209
210 let mut values = crate::value::DictMap::new();
211 insert_string(
212 &mut values,
213 "task_id",
214 Some(vm.runtime_context.task_id.clone()),
215 );
216 insert_string(
217 &mut values,
218 "parent_task_id",
219 vm.runtime_context.parent_task_id.clone(),
220 );
221 insert_string(
222 &mut values,
223 "root_task_id",
224 Some(vm.runtime_context.root_task_id.clone()),
225 );
226 insert_string(
227 &mut values,
228 "task_name",
229 vm.runtime_context.task_name.clone(),
230 );
231 insert_string(
232 &mut values,
233 "task_group_id",
234 vm.runtime_context.task_group_id.clone(),
235 );
236 insert_string(&mut values, "scope_id", vm.runtime_context.scope_id.clone());
237 insert_string(&mut values, "workflow_id", workflow_id);
238 insert_string(&mut values, "run_id", run_id);
239 insert_string(&mut values, "stage_id", stage_id);
240 insert_string(&mut values, "worker_id", worker_id);
241 insert_string(&mut values, "agent_session_id", agent_session_id);
242 insert_string(
243 &mut values,
244 "parent_agent_session_id",
245 agent_ancestry
246 .as_ref()
247 .and_then(|ancestry| ancestry.parent_id.clone()),
248 );
249 insert_string(
250 &mut values,
251 "root_agent_session_id",
252 agent_ancestry
253 .as_ref()
254 .map(|ancestry| ancestry.root_id.clone()),
255 );
256 insert_string(&mut values, "agent_name", None);
257
258 if let Some(context) = dispatch {
259 insert_string(&mut values, "trigger_id", Some(context.binding_id.clone()));
260 insert_string(
261 &mut values,
262 "trigger_event_id",
263 Some(context.trigger_event.id.0.clone()),
264 );
265 insert_string(
266 &mut values,
267 "binding_key",
268 Some(format!(
269 "{}@{}",
270 context.binding_id, context.binding_version
271 )),
272 );
273 insert_string(
274 &mut values,
275 "tenant_id",
276 context.trigger_event.tenant_id.map(|tenant| tenant.0),
277 );
278 insert_string(
279 &mut values,
280 "provider",
281 Some(context.trigger_event.provider.0),
282 );
283 insert_string(
284 &mut values,
285 "trace_id",
286 Some(context.trigger_event.trace_id.0),
287 );
288 } else {
289 insert_string(&mut values, "trigger_id", None);
290 insert_string(&mut values, "trigger_event_id", None);
291 insert_string(&mut values, "binding_key", None);
292 insert_string(
298 &mut values,
299 "tenant_id",
300 crate::harness_tenant::current_tenant_id().map(|tenant| tenant.0),
301 );
302 insert_string(&mut values, "provider", None);
303 insert_string(
304 &mut values,
305 "trace_id",
306 trace_context.as_ref().map(|context| context.0.clone()),
307 );
308 }
309
310 insert_string(
311 &mut values,
312 "span_id",
313 trace_context
314 .as_ref()
315 .map(|context| context.1.clone())
316 .or_else(|| crate::tracing::current_span_id().map(|id| id.to_string())),
317 );
318 insert_string(&mut values, "scheduler_key", None);
319 insert_string(&mut values, "runner", None);
320 insert_string(&mut values, "capacity_class", None);
321 values.insert(
322 crate::value::intern_key("context_values"),
323 VmValue::dict(vm.runtime_context.values.clone()),
324 );
325 values.insert(
326 crate::value::intern_key("cancelled"),
327 VmValue::Bool(cancelled),
328 );
329 values.insert(
330 crate::value::intern_key("debug"),
331 debug_context_value(vm, cancelled),
332 );
333 VmValue::dict(values)
334}
335
336fn debug_context_value(vm: &crate::vm::Vm, cancelled: bool) -> VmValue {
337 let mut debug = crate::value::DictMap::new();
338 debug.insert(
339 crate::value::intern_key("cancelled"),
340 VmValue::Bool(cancelled),
341 );
342 debug.insert(crate::value::intern_key("waiting_reason"), VmValue::Nil);
343 debug.insert(
344 crate::value::intern_key("active_task_ids"),
345 VmValue::List(std::sync::Arc::new(
346 vm.spawned_tasks
347 .keys()
348 .map(|id| VmValue::String(arcstr::ArcStr::from(id.as_str())))
349 .collect(),
350 )),
351 );
352 debug.insert(
353 crate::value::intern_key("held_synchronization"),
354 VmValue::List(std::sync::Arc::new(Vec::new())),
355 );
356 debug.insert(
357 crate::value::intern_key("supervisors"),
358 crate::stdlib::supervisor::supervisor_debug_values(),
359 );
360 VmValue::dict(debug)
361}
362
363fn insert_string(values: &mut crate::value::DictMap, key: &str, value: Option<String>) {
364 values.insert(
365 crate::value::intern_key(key),
366 value
367 .map(|value| VmValue::String(arcstr::ArcStr::from(value)))
368 .unwrap_or(VmValue::Nil),
369 );
370}