1use std::cell::RefCell;
2
3use crate::value::{VmError, VmValue};
4
5#[derive(Clone, Debug)]
6pub struct RuntimeContext {
7 pub task_id: String,
8 pub parent_task_id: Option<String>,
9 pub root_task_id: String,
10 pub task_name: Option<String>,
11 pub task_group_id: Option<String>,
12 pub scope_id: Option<String>,
13 pub values: crate::value::DictMap,
14}
15
16impl RuntimeContext {
17 pub fn root() -> Self {
18 Self {
19 task_id: "task_root".to_string(),
20 parent_task_id: None,
21 root_task_id: "task_root".to_string(),
22 task_name: Some("root".to_string()),
23 task_group_id: None,
24 scope_id: None,
25 values: crate::value::DictMap::new(),
26 }
27 }
28
29 pub fn child_task(
30 &self,
31 task_id: impl Into<String>,
32 task_name: impl Into<String>,
33 task_group_id: Option<String>,
34 ) -> Self {
35 Self {
36 task_id: task_id.into(),
37 parent_task_id: Some(self.task_id.clone()),
38 root_task_id: self.root_task_id.clone(),
39 task_name: Some(task_name.into()),
40 task_group_id,
41 scope_id: self.scope_id.clone(),
42 values: self.values.clone(),
43 }
44 }
45}
46
47impl Default for RuntimeContext {
48 fn default() -> Self {
49 Self::root()
50 }
51}
52
53#[derive(Clone, Debug, Default)]
54pub struct RuntimeContextOverlay {
55 pub workflow_id: Option<String>,
56 pub run_id: Option<String>,
57 pub stage_id: Option<String>,
58 pub worker_id: Option<String>,
59}
60
61thread_local! {
62 static RUNTIME_CONTEXT_OVERLAY_STACK: RefCell<Vec<RuntimeContextOverlay>> =
63 const { RefCell::new(Vec::new()) };
64}
65
66pub struct RuntimeContextOverlayGuard;
67
68pub fn install_runtime_context_overlay(
69 overlay: RuntimeContextOverlay,
70) -> RuntimeContextOverlayGuard {
71 RUNTIME_CONTEXT_OVERLAY_STACK.with(|stack| stack.borrow_mut().push(overlay));
72 RuntimeContextOverlayGuard
73}
74
75impl Drop for RuntimeContextOverlayGuard {
76 fn drop(&mut self) {
77 RUNTIME_CONTEXT_OVERLAY_STACK.with(|stack| {
78 stack.borrow_mut().pop();
79 });
80 }
81}
82
83fn current_overlay() -> RuntimeContextOverlay {
84 RUNTIME_CONTEXT_OVERLAY_STACK.with(|stack| {
85 let mut merged = RuntimeContextOverlay::default();
86 for overlay in stack.borrow().iter() {
87 if overlay.workflow_id.is_some() {
88 merged.workflow_id = overlay.workflow_id.clone();
89 }
90 if overlay.run_id.is_some() {
91 merged.run_id = overlay.run_id.clone();
92 }
93 if overlay.stage_id.is_some() {
94 merged.stage_id = overlay.stage_id.clone();
95 }
96 if overlay.worker_id.is_some() {
97 merged.worker_id = overlay.worker_id.clone();
98 }
99 }
100 merged
101 })
102}
103
104pub fn register_runtime_context_builtins(vm: &mut crate::vm::Vm) {
105 for name in [
106 "runtime_context",
107 "task_current",
108 "runtime_context_values",
109 "runtime_context_get",
110 "runtime_context_set",
111 "runtime_context_clear",
112 ] {
113 vm.register_builtin(name, move |_args, _out| {
114 Err(VmError::Runtime(format!(
115 "{name}: internal runtime context builtin was not intercepted"
116 )))
117 });
118 }
119}
120
121pub(crate) fn dispatch_runtime_context_builtin(
122 vm: &mut crate::vm::Vm,
123 name: &str,
124 args: &[VmValue],
125) -> Option<Result<VmValue, VmError>> {
126 match name {
127 "runtime_context" | "task_current" => Some(Ok(runtime_context_value(vm))),
128 "runtime_context_values" => Some(Ok(VmValue::dict(vm.runtime_context.values.clone()))),
129 "runtime_context_get" => Some(runtime_context_get(vm, args)),
130 "runtime_context_set" => Some(runtime_context_set(vm, args)),
131 "runtime_context_clear" => Some(runtime_context_clear(vm, args)),
132 _ => None,
133 }
134}
135
136fn runtime_context_get(vm: &crate::vm::Vm, args: &[VmValue]) -> Result<VmValue, VmError> {
137 let key = require_key(args, "runtime_context_get")?;
138 Ok(vm
139 .runtime_context
140 .values
141 .get(&key)
142 .cloned()
143 .or_else(|| args.get(1).cloned())
144 .unwrap_or(VmValue::Nil))
145}
146
147fn runtime_context_set(vm: &mut crate::vm::Vm, args: &[VmValue]) -> Result<VmValue, VmError> {
148 let key = require_key(args, "runtime_context_set")?;
149 let value = args.get(1).cloned().unwrap_or(VmValue::Nil);
150 Ok(vm
151 .runtime_context
152 .values
153 .insert(key, value)
154 .unwrap_or(VmValue::Nil))
155}
156
157fn runtime_context_clear(vm: &mut crate::vm::Vm, args: &[VmValue]) -> Result<VmValue, VmError> {
158 let key = require_key(args, "runtime_context_clear")?;
159 Ok(vm
160 .runtime_context
161 .values
162 .remove(&key)
163 .unwrap_or(VmValue::Nil))
164}
165
166fn require_key(args: &[VmValue], builtin: &str) -> Result<String, VmError> {
167 match args.first() {
168 Some(VmValue::String(value)) => Ok(value.to_string()),
169 _ => Err(VmError::Runtime(format!(
170 "{builtin}: first argument must be a string key"
171 ))),
172 }
173}
174
175pub(crate) fn runtime_context_value(vm: &crate::vm::Vm) -> VmValue {
176 let overlay = current_overlay();
177 let mutation = crate::orchestration::current_mutation_session();
178 let dispatch = crate::triggers::dispatcher::current_dispatch_context();
179 let trace_context = crate::stdlib::tracing::current_trace_context();
180 let agent_session_id = crate::agent_sessions::current_session_id();
181 let agent_ancestry = agent_session_id
182 .as_deref()
183 .and_then(crate::agent_sessions::ancestry);
184 let cancelled = vm
185 .cancel_token
186 .as_ref()
187 .is_some_and(|token| token.load(std::sync::atomic::Ordering::SeqCst));
188
189 let workflow_id = overlay.workflow_id;
190 let run_id = overlay
191 .run_id
192 .or_else(|| mutation.as_ref().and_then(|session| session.run_id.clone()));
193 let stage_id = overlay.stage_id;
194 let worker_id = overlay.worker_id.or_else(|| {
195 mutation
196 .as_ref()
197 .and_then(|session| session.worker_id.clone())
198 });
199
200 let mut values = crate::value::DictMap::new();
201 insert_string(
202 &mut values,
203 "task_id",
204 Some(vm.runtime_context.task_id.clone()),
205 );
206 insert_string(
207 &mut values,
208 "parent_task_id",
209 vm.runtime_context.parent_task_id.clone(),
210 );
211 insert_string(
212 &mut values,
213 "root_task_id",
214 Some(vm.runtime_context.root_task_id.clone()),
215 );
216 insert_string(
217 &mut values,
218 "task_name",
219 vm.runtime_context.task_name.clone(),
220 );
221 insert_string(
222 &mut values,
223 "task_group_id",
224 vm.runtime_context.task_group_id.clone(),
225 );
226 insert_string(&mut values, "scope_id", vm.runtime_context.scope_id.clone());
227 insert_string(&mut values, "workflow_id", workflow_id);
228 insert_string(&mut values, "run_id", run_id);
229 insert_string(&mut values, "stage_id", stage_id);
230 insert_string(&mut values, "worker_id", worker_id);
231 insert_string(&mut values, "agent_session_id", agent_session_id);
232 insert_string(
233 &mut values,
234 "parent_agent_session_id",
235 agent_ancestry
236 .as_ref()
237 .and_then(|ancestry| ancestry.parent_id.clone()),
238 );
239 insert_string(
240 &mut values,
241 "root_agent_session_id",
242 agent_ancestry
243 .as_ref()
244 .map(|ancestry| ancestry.root_id.clone()),
245 );
246 insert_string(&mut values, "agent_name", None);
247
248 if let Some(context) = dispatch {
249 insert_string(&mut values, "trigger_id", Some(context.binding_id.clone()));
250 insert_string(
251 &mut values,
252 "trigger_event_id",
253 Some(context.trigger_event.id.0.clone()),
254 );
255 insert_string(
256 &mut values,
257 "binding_key",
258 Some(format!(
259 "{}@{}",
260 context.binding_id, context.binding_version
261 )),
262 );
263 insert_string(
264 &mut values,
265 "tenant_id",
266 context.trigger_event.tenant_id.map(|tenant| tenant.0),
267 );
268 insert_string(
269 &mut values,
270 "provider",
271 Some(context.trigger_event.provider.0),
272 );
273 insert_string(
274 &mut values,
275 "trace_id",
276 Some(context.trigger_event.trace_id.0),
277 );
278 } else {
279 insert_string(&mut values, "trigger_id", None);
280 insert_string(&mut values, "trigger_event_id", None);
281 insert_string(&mut values, "binding_key", None);
282 insert_string(
288 &mut values,
289 "tenant_id",
290 crate::harness_tenant::current_tenant_id().map(|tenant| tenant.0),
291 );
292 insert_string(&mut values, "provider", None);
293 insert_string(
294 &mut values,
295 "trace_id",
296 trace_context.as_ref().map(|context| context.0.clone()),
297 );
298 }
299
300 insert_string(
301 &mut values,
302 "span_id",
303 trace_context
304 .as_ref()
305 .map(|context| context.1.clone())
306 .or_else(|| crate::tracing::current_span_id().map(|id| id.to_string())),
307 );
308 insert_string(&mut values, "scheduler_key", None);
309 insert_string(&mut values, "runner", None);
310 insert_string(&mut values, "capacity_class", None);
311 values.insert(
312 "context_values".to_string(),
313 VmValue::dict(vm.runtime_context.values.clone()),
314 );
315 values.insert("cancelled".to_string(), VmValue::Bool(cancelled));
316 values.insert("debug".to_string(), debug_context_value(vm, cancelled));
317 VmValue::dict(values)
318}
319
320fn debug_context_value(vm: &crate::vm::Vm, cancelled: bool) -> VmValue {
321 let mut debug = crate::value::DictMap::new();
322 debug.insert("cancelled".to_string(), VmValue::Bool(cancelled));
323 debug.insert("waiting_reason".to_string(), VmValue::Nil);
324 debug.insert(
325 "active_task_ids".to_string(),
326 VmValue::List(std::sync::Arc::new(
327 vm.spawned_tasks
328 .keys()
329 .map(|id| VmValue::String(std::sync::Arc::from(id.as_str())))
330 .collect(),
331 )),
332 );
333 debug.insert(
334 "held_synchronization".to_string(),
335 VmValue::List(std::sync::Arc::new(Vec::new())),
336 );
337 debug.insert(
338 "supervisors".to_string(),
339 crate::stdlib::supervisor::supervisor_debug_values(),
340 );
341 VmValue::dict(debug)
342}
343
344fn insert_string(values: &mut crate::value::DictMap, key: &str, value: Option<String>) {
345 values.insert(
346 key.to_string(),
347 value
348 .map(|value| VmValue::String(std::sync::Arc::from(value)))
349 .unwrap_or(VmValue::Nil),
350 );
351}