1use std::cell::RefCell;
2use std::collections::BTreeMap;
3use std::rc::Rc;
4
5use crate::value::{VmError, VmValue};
6
/// Task-scoped runtime state: the identity of the current task, its place in
/// the task tree, and a key/value store of context values.
#[derive(Clone, Debug)]
pub struct RuntimeContext {
    /// Identifier of this task (`"task_root"` for the context built by `root`).
    pub task_id: String,
    /// Identifier of the task this context was derived from via `child_task`;
    /// `None` for the context built by `root`.
    pub parent_task_id: Option<String>,
    /// Identifier of the root task; `child_task` copies it unchanged.
    pub root_task_id: String,
    /// Optional task name (`"root"` for the context built by `root`).
    pub task_name: Option<String>,
    /// Optional task group id; supplied by the caller of `child_task`.
    pub task_group_id: Option<String>,
    /// Optional scope id; `child_task` copies it from the parent context.
    pub scope_id: Option<String>,
    /// Key/value entries read and written by the `runtime_context_get`,
    /// `runtime_context_set` and `runtime_context_clear` builtins;
    /// `child_task` clones the parent's map into the child.
    pub values: BTreeMap<String, VmValue>,
}
17
18impl RuntimeContext {
19 pub fn root() -> Self {
20 Self {
21 task_id: "task_root".to_string(),
22 parent_task_id: None,
23 root_task_id: "task_root".to_string(),
24 task_name: Some("root".to_string()),
25 task_group_id: None,
26 scope_id: None,
27 values: BTreeMap::new(),
28 }
29 }
30
31 pub fn child_task(
32 &self,
33 task_id: impl Into<String>,
34 task_name: impl Into<String>,
35 task_group_id: Option<String>,
36 ) -> Self {
37 Self {
38 task_id: task_id.into(),
39 parent_task_id: Some(self.task_id.clone()),
40 root_task_id: self.root_task_id.clone(),
41 task_name: Some(task_name.into()),
42 task_group_id,
43 scope_id: self.scope_id.clone(),
44 values: self.values.clone(),
45 }
46 }
47}
48
49impl Default for RuntimeContext {
50 fn default() -> Self {
51 Self::root()
52 }
53}
54
/// Optional identifiers that can be layered over the runtime context for the
/// current thread via `install_runtime_context_overlay`.
///
/// When overlays are merged by `current_overlay`, a `None` field leaves the
/// value from earlier-installed overlays in place, while a `Some` field
/// replaces it.
#[derive(Clone, Debug, Default)]
pub struct RuntimeContextOverlay {
    /// Reported as `workflow_id` in the runtime context dictionary.
    pub workflow_id: Option<String>,
    /// Reported as `run_id`; takes precedence over the mutation session's run id.
    pub run_id: Option<String>,
    /// Reported as `stage_id` in the runtime context dictionary.
    pub stage_id: Option<String>,
    /// Reported as `worker_id`; takes precedence over the mutation session's worker id.
    pub worker_id: Option<String>,
}
62
// Per-thread stack of installed overlays. `install_runtime_context_overlay`
// pushes onto it, dropping a `RuntimeContextOverlayGuard` pops the top entry,
// and `current_overlay` merges the entries from first to last.
thread_local! {
    static RUNTIME_CONTEXT_OVERLAY_STACK: RefCell<Vec<RuntimeContextOverlay>> =
        const { RefCell::new(Vec::new()) };
}
67
/// Scope guard handed out by `install_runtime_context_overlay`.
///
/// Dropping the guard pops the top entry of the current thread's overlay
/// stack, so it must be kept alive (bound to a named variable) for as long as
/// the overlay should stay in effect. Because the drop pops whatever entry is
/// on top, guards should be dropped in the reverse order of installation.
#[must_use = "dropping the guard immediately pops the overlay that was just installed"]
pub struct RuntimeContextOverlayGuard;
69
70pub fn install_runtime_context_overlay(
71 overlay: RuntimeContextOverlay,
72) -> RuntimeContextOverlayGuard {
73 RUNTIME_CONTEXT_OVERLAY_STACK.with(|stack| stack.borrow_mut().push(overlay));
74 RuntimeContextOverlayGuard
75}
76
77impl Drop for RuntimeContextOverlayGuard {
78 fn drop(&mut self) {
79 RUNTIME_CONTEXT_OVERLAY_STACK.with(|stack| {
80 stack.borrow_mut().pop();
81 });
82 }
83}
84
85fn current_overlay() -> RuntimeContextOverlay {
86 RUNTIME_CONTEXT_OVERLAY_STACK.with(|stack| {
87 let mut merged = RuntimeContextOverlay::default();
88 for overlay in stack.borrow().iter() {
89 if overlay.workflow_id.is_some() {
90 merged.workflow_id = overlay.workflow_id.clone();
91 }
92 if overlay.run_id.is_some() {
93 merged.run_id = overlay.run_id.clone();
94 }
95 if overlay.stage_id.is_some() {
96 merged.stage_id = overlay.stage_id.clone();
97 }
98 if overlay.worker_id.is_some() {
99 merged.worker_id = overlay.worker_id.clone();
100 }
101 }
102 merged
103 })
104}
105
106pub fn register_runtime_context_builtins(vm: &mut crate::vm::Vm) {
107 for name in [
108 "runtime_context",
109 "task_current",
110 "runtime_context_values",
111 "runtime_context_get",
112 "runtime_context_set",
113 "runtime_context_clear",
114 ] {
115 vm.register_builtin(name, move |_args, _out| {
116 Err(VmError::Runtime(format!(
117 "{name}: internal runtime context builtin was not intercepted"
118 )))
119 });
120 }
121}
122
123pub(crate) fn dispatch_runtime_context_builtin(
124 vm: &mut crate::vm::Vm,
125 name: &str,
126 args: &[VmValue],
127) -> Option<Result<VmValue, VmError>> {
128 match name {
129 "runtime_context" | "task_current" => Some(Ok(runtime_context_value(vm))),
130 "runtime_context_values" => Some(Ok(VmValue::Dict(Rc::new(
131 vm.runtime_context.values.clone(),
132 )))),
133 "runtime_context_get" => Some(runtime_context_get(vm, args)),
134 "runtime_context_set" => Some(runtime_context_set(vm, args)),
135 "runtime_context_clear" => Some(runtime_context_clear(vm, args)),
136 _ => None,
137 }
138}
139
140fn runtime_context_get(vm: &crate::vm::Vm, args: &[VmValue]) -> Result<VmValue, VmError> {
141 let key = require_key(args, "runtime_context_get")?;
142 Ok(vm
143 .runtime_context
144 .values
145 .get(&key)
146 .cloned()
147 .or_else(|| args.get(1).cloned())
148 .unwrap_or(VmValue::Nil))
149}
150
151fn runtime_context_set(vm: &mut crate::vm::Vm, args: &[VmValue]) -> Result<VmValue, VmError> {
152 let key = require_key(args, "runtime_context_set")?;
153 let value = args.get(1).cloned().unwrap_or(VmValue::Nil);
154 Ok(vm
155 .runtime_context
156 .values
157 .insert(key, value)
158 .unwrap_or(VmValue::Nil))
159}
160
161fn runtime_context_clear(vm: &mut crate::vm::Vm, args: &[VmValue]) -> Result<VmValue, VmError> {
162 let key = require_key(args, "runtime_context_clear")?;
163 Ok(vm
164 .runtime_context
165 .values
166 .remove(&key)
167 .unwrap_or(VmValue::Nil))
168}
169
170fn require_key(args: &[VmValue], builtin: &str) -> Result<String, VmError> {
171 match args.first() {
172 Some(VmValue::String(value)) => Ok(value.to_string()),
173 _ => Err(VmError::Runtime(format!(
174 "{builtin}: first argument must be a string key"
175 ))),
176 }
177}
178
179pub(crate) fn runtime_context_value(vm: &crate::vm::Vm) -> VmValue {
180 let overlay = current_overlay();
181 let mutation = crate::orchestration::current_mutation_session();
182 let dispatch = crate::triggers::dispatcher::current_dispatch_context();
183 let trace_context = crate::stdlib::tracing::current_trace_context();
184 let agent_session_id = crate::agent_sessions::current_session_id();
185 let agent_ancestry = agent_session_id
186 .as_deref()
187 .and_then(crate::agent_sessions::ancestry);
188 let cancelled = vm
189 .cancel_token
190 .as_ref()
191 .is_some_and(|token| token.load(std::sync::atomic::Ordering::SeqCst));
192
193 let workflow_id = overlay.workflow_id;
194 let run_id = overlay
195 .run_id
196 .or_else(|| mutation.as_ref().and_then(|session| session.run_id.clone()));
197 let stage_id = overlay.stage_id;
198 let worker_id = overlay.worker_id.or_else(|| {
199 mutation
200 .as_ref()
201 .and_then(|session| session.worker_id.clone())
202 });
203
204 let mut values = BTreeMap::new();
205 insert_string(
206 &mut values,
207 "task_id",
208 Some(vm.runtime_context.task_id.clone()),
209 );
210 insert_string(
211 &mut values,
212 "parent_task_id",
213 vm.runtime_context.parent_task_id.clone(),
214 );
215 insert_string(
216 &mut values,
217 "root_task_id",
218 Some(vm.runtime_context.root_task_id.clone()),
219 );
220 insert_string(
221 &mut values,
222 "task_name",
223 vm.runtime_context.task_name.clone(),
224 );
225 insert_string(
226 &mut values,
227 "task_group_id",
228 vm.runtime_context.task_group_id.clone(),
229 );
230 insert_string(&mut values, "scope_id", vm.runtime_context.scope_id.clone());
231 insert_string(&mut values, "workflow_id", workflow_id);
232 insert_string(&mut values, "run_id", run_id);
233 insert_string(&mut values, "stage_id", stage_id);
234 insert_string(&mut values, "worker_id", worker_id);
235 insert_string(&mut values, "agent_session_id", agent_session_id.clone());
236 insert_string(
237 &mut values,
238 "parent_agent_session_id",
239 agent_ancestry
240 .as_ref()
241 .and_then(|ancestry| ancestry.parent_id.clone()),
242 );
243 insert_string(
244 &mut values,
245 "root_agent_session_id",
246 agent_ancestry
247 .as_ref()
248 .map(|ancestry| ancestry.root_id.clone()),
249 );
250 insert_string(&mut values, "agent_name", None);
251
252 if let Some(context) = dispatch {
253 insert_string(&mut values, "trigger_id", Some(context.binding_id.clone()));
254 insert_string(
255 &mut values,
256 "trigger_event_id",
257 Some(context.trigger_event.id.0.clone()),
258 );
259 insert_string(
260 &mut values,
261 "binding_key",
262 Some(format!(
263 "{}@{}",
264 context.binding_id, context.binding_version
265 )),
266 );
267 insert_string(
268 &mut values,
269 "tenant_id",
270 context.trigger_event.tenant_id.map(|tenant| tenant.0),
271 );
272 insert_string(
273 &mut values,
274 "provider",
275 Some(context.trigger_event.provider.0),
276 );
277 insert_string(
278 &mut values,
279 "trace_id",
280 Some(context.trigger_event.trace_id.0),
281 );
282 } else {
283 insert_string(&mut values, "trigger_id", None);
284 insert_string(&mut values, "trigger_event_id", None);
285 insert_string(&mut values, "binding_key", None);
286 insert_string(&mut values, "tenant_id", None);
287 insert_string(&mut values, "provider", None);
288 insert_string(
289 &mut values,
290 "trace_id",
291 trace_context.as_ref().map(|context| context.0.clone()),
292 );
293 }
294
295 insert_string(
296 &mut values,
297 "span_id",
298 trace_context
299 .as_ref()
300 .map(|context| context.1.clone())
301 .or_else(|| crate::tracing::current_span_id().map(|id| id.to_string())),
302 );
303 insert_string(&mut values, "scheduler_key", None);
304 insert_string(&mut values, "runner", None);
305 insert_string(&mut values, "capacity_class", None);
306 values.insert(
307 "context_values".to_string(),
308 VmValue::Dict(Rc::new(vm.runtime_context.values.clone())),
309 );
310 values.insert("cancelled".to_string(), VmValue::Bool(cancelled));
311 values.insert("debug".to_string(), debug_context_value(vm, cancelled));
312 VmValue::Dict(Rc::new(values))
313}
314
315fn debug_context_value(vm: &crate::vm::Vm, cancelled: bool) -> VmValue {
316 let mut debug = BTreeMap::new();
317 debug.insert("cancelled".to_string(), VmValue::Bool(cancelled));
318 debug.insert("waiting_reason".to_string(), VmValue::Nil);
319 debug.insert(
320 "active_task_ids".to_string(),
321 VmValue::List(Rc::new(
322 vm.spawned_tasks
323 .keys()
324 .map(|id| VmValue::String(Rc::from(id.as_str())))
325 .collect(),
326 )),
327 );
328 debug.insert(
329 "held_synchronization".to_string(),
330 VmValue::List(Rc::new(Vec::new())),
331 );
332 VmValue::Dict(Rc::new(debug))
333}
334
335fn insert_string(values: &mut BTreeMap<String, VmValue>, key: &str, value: Option<String>) {
336 values.insert(
337 key.to_string(),
338 value
339 .map(|value| VmValue::String(Rc::from(value)))
340 .unwrap_or(VmValue::Nil),
341 );
342}