1use std::cell::RefCell;
2use std::collections::BTreeMap;
3
4use crate::value::{VmError, VmValue};
5
6#[derive(Clone, Debug)]
7pub struct RuntimeContext {
8 pub task_id: String,
9 pub parent_task_id: Option<String>,
10 pub root_task_id: String,
11 pub task_name: Option<String>,
12 pub task_group_id: Option<String>,
13 pub scope_id: Option<String>,
14 pub values: BTreeMap<String, VmValue>,
15}
16
17impl RuntimeContext {
18 pub fn root() -> Self {
19 Self {
20 task_id: "task_root".to_string(),
21 parent_task_id: None,
22 root_task_id: "task_root".to_string(),
23 task_name: Some("root".to_string()),
24 task_group_id: None,
25 scope_id: None,
26 values: BTreeMap::new(),
27 }
28 }
29
30 pub fn child_task(
31 &self,
32 task_id: impl Into<String>,
33 task_name: impl Into<String>,
34 task_group_id: Option<String>,
35 ) -> Self {
36 Self {
37 task_id: task_id.into(),
38 parent_task_id: Some(self.task_id.clone()),
39 root_task_id: self.root_task_id.clone(),
40 task_name: Some(task_name.into()),
41 task_group_id,
42 scope_id: self.scope_id.clone(),
43 values: self.values.clone(),
44 }
45 }
46}
47
48impl Default for RuntimeContext {
49 fn default() -> Self {
50 Self::root()
51 }
52}
53
54#[derive(Clone, Debug, Default)]
55pub struct RuntimeContextOverlay {
56 pub workflow_id: Option<String>,
57 pub run_id: Option<String>,
58 pub stage_id: Option<String>,
59 pub worker_id: Option<String>,
60}
61
62thread_local! {
63 static RUNTIME_CONTEXT_OVERLAY_STACK: RefCell<Vec<RuntimeContextOverlay>> =
64 const { RefCell::new(Vec::new()) };
65}
66
67pub struct RuntimeContextOverlayGuard;
68
69pub fn install_runtime_context_overlay(
70 overlay: RuntimeContextOverlay,
71) -> RuntimeContextOverlayGuard {
72 RUNTIME_CONTEXT_OVERLAY_STACK.with(|stack| stack.borrow_mut().push(overlay));
73 RuntimeContextOverlayGuard
74}
75
76impl Drop for RuntimeContextOverlayGuard {
77 fn drop(&mut self) {
78 RUNTIME_CONTEXT_OVERLAY_STACK.with(|stack| {
79 stack.borrow_mut().pop();
80 });
81 }
82}
83
84fn current_overlay() -> RuntimeContextOverlay {
85 RUNTIME_CONTEXT_OVERLAY_STACK.with(|stack| {
86 let mut merged = RuntimeContextOverlay::default();
87 for overlay in stack.borrow().iter() {
88 if overlay.workflow_id.is_some() {
89 merged.workflow_id = overlay.workflow_id.clone();
90 }
91 if overlay.run_id.is_some() {
92 merged.run_id = overlay.run_id.clone();
93 }
94 if overlay.stage_id.is_some() {
95 merged.stage_id = overlay.stage_id.clone();
96 }
97 if overlay.worker_id.is_some() {
98 merged.worker_id = overlay.worker_id.clone();
99 }
100 }
101 merged
102 })
103}
104
105pub fn register_runtime_context_builtins(vm: &mut crate::vm::Vm) {
106 for name in [
107 "runtime_context",
108 "task_current",
109 "runtime_context_values",
110 "runtime_context_get",
111 "runtime_context_set",
112 "runtime_context_clear",
113 ] {
114 vm.register_builtin(name, move |_args, _out| {
115 Err(VmError::Runtime(format!(
116 "{name}: internal runtime context builtin was not intercepted"
117 )))
118 });
119 }
120}
121
122pub(crate) fn dispatch_runtime_context_builtin(
123 vm: &mut crate::vm::Vm,
124 name: &str,
125 args: &[VmValue],
126) -> Option<Result<VmValue, VmError>> {
127 match name {
128 "runtime_context" | "task_current" => Some(Ok(runtime_context_value(vm))),
129 "runtime_context_values" => Some(Ok(VmValue::Dict(std::sync::Arc::new(
130 vm.runtime_context.values.clone(),
131 )))),
132 "runtime_context_get" => Some(runtime_context_get(vm, args)),
133 "runtime_context_set" => Some(runtime_context_set(vm, args)),
134 "runtime_context_clear" => Some(runtime_context_clear(vm, args)),
135 _ => None,
136 }
137}
138
139fn runtime_context_get(vm: &crate::vm::Vm, args: &[VmValue]) -> Result<VmValue, VmError> {
140 let key = require_key(args, "runtime_context_get")?;
141 Ok(vm
142 .runtime_context
143 .values
144 .get(&key)
145 .cloned()
146 .or_else(|| args.get(1).cloned())
147 .unwrap_or(VmValue::Nil))
148}
149
150fn runtime_context_set(vm: &mut crate::vm::Vm, args: &[VmValue]) -> Result<VmValue, VmError> {
151 let key = require_key(args, "runtime_context_set")?;
152 let value = args.get(1).cloned().unwrap_or(VmValue::Nil);
153 Ok(vm
154 .runtime_context
155 .values
156 .insert(key, value)
157 .unwrap_or(VmValue::Nil))
158}
159
160fn runtime_context_clear(vm: &mut crate::vm::Vm, args: &[VmValue]) -> Result<VmValue, VmError> {
161 let key = require_key(args, "runtime_context_clear")?;
162 Ok(vm
163 .runtime_context
164 .values
165 .remove(&key)
166 .unwrap_or(VmValue::Nil))
167}
168
169fn require_key(args: &[VmValue], builtin: &str) -> Result<String, VmError> {
170 match args.first() {
171 Some(VmValue::String(value)) => Ok(value.to_string()),
172 _ => Err(VmError::Runtime(format!(
173 "{builtin}: first argument must be a string key"
174 ))),
175 }
176}
177
178pub(crate) fn runtime_context_value(vm: &crate::vm::Vm) -> VmValue {
179 let overlay = current_overlay();
180 let mutation = crate::orchestration::current_mutation_session();
181 let dispatch = crate::triggers::dispatcher::current_dispatch_context();
182 let trace_context = crate::stdlib::tracing::current_trace_context();
183 let agent_session_id = crate::agent_sessions::current_session_id();
184 let agent_ancestry = agent_session_id
185 .as_deref()
186 .and_then(crate::agent_sessions::ancestry);
187 let cancelled = vm
188 .cancel_token
189 .as_ref()
190 .is_some_and(|token| token.load(std::sync::atomic::Ordering::SeqCst));
191
192 let workflow_id = overlay.workflow_id;
193 let run_id = overlay
194 .run_id
195 .or_else(|| mutation.as_ref().and_then(|session| session.run_id.clone()));
196 let stage_id = overlay.stage_id;
197 let worker_id = overlay.worker_id.or_else(|| {
198 mutation
199 .as_ref()
200 .and_then(|session| session.worker_id.clone())
201 });
202
203 let mut values = BTreeMap::new();
204 insert_string(
205 &mut values,
206 "task_id",
207 Some(vm.runtime_context.task_id.clone()),
208 );
209 insert_string(
210 &mut values,
211 "parent_task_id",
212 vm.runtime_context.parent_task_id.clone(),
213 );
214 insert_string(
215 &mut values,
216 "root_task_id",
217 Some(vm.runtime_context.root_task_id.clone()),
218 );
219 insert_string(
220 &mut values,
221 "task_name",
222 vm.runtime_context.task_name.clone(),
223 );
224 insert_string(
225 &mut values,
226 "task_group_id",
227 vm.runtime_context.task_group_id.clone(),
228 );
229 insert_string(&mut values, "scope_id", vm.runtime_context.scope_id.clone());
230 insert_string(&mut values, "workflow_id", workflow_id);
231 insert_string(&mut values, "run_id", run_id);
232 insert_string(&mut values, "stage_id", stage_id);
233 insert_string(&mut values, "worker_id", worker_id);
234 insert_string(&mut values, "agent_session_id", agent_session_id);
235 insert_string(
236 &mut values,
237 "parent_agent_session_id",
238 agent_ancestry
239 .as_ref()
240 .and_then(|ancestry| ancestry.parent_id.clone()),
241 );
242 insert_string(
243 &mut values,
244 "root_agent_session_id",
245 agent_ancestry
246 .as_ref()
247 .map(|ancestry| ancestry.root_id.clone()),
248 );
249 insert_string(&mut values, "agent_name", None);
250
251 if let Some(context) = dispatch {
252 insert_string(&mut values, "trigger_id", Some(context.binding_id.clone()));
253 insert_string(
254 &mut values,
255 "trigger_event_id",
256 Some(context.trigger_event.id.0.clone()),
257 );
258 insert_string(
259 &mut values,
260 "binding_key",
261 Some(format!(
262 "{}@{}",
263 context.binding_id, context.binding_version
264 )),
265 );
266 insert_string(
267 &mut values,
268 "tenant_id",
269 context.trigger_event.tenant_id.map(|tenant| tenant.0),
270 );
271 insert_string(
272 &mut values,
273 "provider",
274 Some(context.trigger_event.provider.0),
275 );
276 insert_string(
277 &mut values,
278 "trace_id",
279 Some(context.trigger_event.trace_id.0),
280 );
281 } else {
282 insert_string(&mut values, "trigger_id", None);
283 insert_string(&mut values, "trigger_event_id", None);
284 insert_string(&mut values, "binding_key", None);
285 insert_string(
291 &mut values,
292 "tenant_id",
293 crate::harness_tenant::current_tenant_id().map(|tenant| tenant.0),
294 );
295 insert_string(&mut values, "provider", None);
296 insert_string(
297 &mut values,
298 "trace_id",
299 trace_context.as_ref().map(|context| context.0.clone()),
300 );
301 }
302
303 insert_string(
304 &mut values,
305 "span_id",
306 trace_context
307 .as_ref()
308 .map(|context| context.1.clone())
309 .or_else(|| crate::tracing::current_span_id().map(|id| id.to_string())),
310 );
311 insert_string(&mut values, "scheduler_key", None);
312 insert_string(&mut values, "runner", None);
313 insert_string(&mut values, "capacity_class", None);
314 values.insert(
315 "context_values".to_string(),
316 VmValue::Dict(std::sync::Arc::new(vm.runtime_context.values.clone())),
317 );
318 values.insert("cancelled".to_string(), VmValue::Bool(cancelled));
319 values.insert("debug".to_string(), debug_context_value(vm, cancelled));
320 VmValue::Dict(std::sync::Arc::new(values))
321}
322
323fn debug_context_value(vm: &crate::vm::Vm, cancelled: bool) -> VmValue {
324 let mut debug = BTreeMap::new();
325 debug.insert("cancelled".to_string(), VmValue::Bool(cancelled));
326 debug.insert("waiting_reason".to_string(), VmValue::Nil);
327 debug.insert(
328 "active_task_ids".to_string(),
329 VmValue::List(std::sync::Arc::new(
330 vm.spawned_tasks
331 .keys()
332 .map(|id| VmValue::String(std::sync::Arc::from(id.as_str())))
333 .collect(),
334 )),
335 );
336 debug.insert(
337 "held_synchronization".to_string(),
338 VmValue::List(std::sync::Arc::new(Vec::new())),
339 );
340 debug.insert(
341 "supervisors".to_string(),
342 crate::stdlib::supervisor::supervisor_debug_values(),
343 );
344 VmValue::Dict(std::sync::Arc::new(debug))
345}
346
347fn insert_string(values: &mut BTreeMap<String, VmValue>, key: &str, value: Option<String>) {
348 values.insert(
349 key.to_string(),
350 value
351 .map(|value| VmValue::String(std::sync::Arc::from(value)))
352 .unwrap_or(VmValue::Nil),
353 );
354}