rustvello_core/context.rs
1//! Execution context for tasks running inside a rustvello runner.
2//!
3//! Provides `tokio::task_local!` context so that a running task can discover
4//! its own invocation identity, workflow membership, and the runner that
5//! is executing it. This mirrors pynenc's `context.py` module.
6//!
7//! # Usage
8//!
9//! The [`TaskRunner`] sets both contexts before executing a task:
10//!
11//! ```rust,ignore
12//! use rustvello_core::context::*;
13//!
14//! INVOCATION_CTX.scope(inv_ctx, RUNNER_CTX.scope(run_ctx, async {
15//! // inside here, get_invocation_context() returns Some(...)
16//! let ctx = get_invocation_context().unwrap();
17//! })).await;
18//! ```
19//!
20//! When a task calls `app.call()` inside its body, the app layer reads
21//! the current `InvocationContext` to determine parent/workflow inheritance.
22
23use std::sync::Arc;
24
25use rustvello_proto::identifiers::{InvocationId, RunnerId, TaskId};
26use rustvello_proto::invocation::WorkflowIdentity;
27use serde::{Deserialize, Serialize};
28
/// Returns the current OS thread's identifier as a plain `u64`.
///
/// `ThreadId::as_u64()` is gated behind the nightly-only `thread_id_value`
/// feature, so the number is recovered from the `Debug` rendering of
/// [`std::thread::ThreadId`], which looks like `ThreadId(N)`. If that
/// rendering ever stops yielding a parseable integer, the function returns
/// `0` rather than panicking.
fn current_thread_id() -> u64 {
    let rendered = format!("{:?}", std::thread::current().id());
    // Peel the `ThreadId(` wrapper and the closing `)` so only `N` remains.
    let without_prefix = rendered.trim_start_matches("ThreadId(");
    let digits = without_prefix.trim_end_matches(')');
    digits.parse::<u64>().unwrap_or(0)
}
44
45// ---------------------------------------------------------------------------
46// InvocationContext — set per-invocation by the runner
47// ---------------------------------------------------------------------------
48
49/// Context for the currently executing invocation.
50///
51/// Stored in a `tokio::task_local!` variable so any code running inside the
52/// task's future can retrieve it without passing references through the
53/// call stack.
54#[derive(Debug, Clone)]
55pub struct InvocationContext {
56 /// The invocation being executed.
57 pub invocation_id: InvocationId,
58 /// The task that is being executed.
59 pub task_id: TaskId,
60 /// The workflow this invocation belongs to.
61 pub workflow: WorkflowIdentity,
62 /// The parent invocation that spawned this one (None for top-level).
63 pub parent_invocation_id: Option<InvocationId>,
64 /// The current retry attempt number (0 for first attempt).
65 pub num_retries: u32,
66}
67
68// ---------------------------------------------------------------------------
69// RunnerContext — set per-runner by the runner's main loop
70// ---------------------------------------------------------------------------
71
72/// Context identifying the runner that is executing the current task.
73///
74/// Mirrors pynenc's `RunnerContext` with hierarchical parent chain,
75/// process/host metadata, and JSON serialization for monitoring.
76#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct RunnerContext {
78 /// The runner's unique identifier.
79 pub runner_id: RunnerId,
80 /// The class/type name of the runner (e.g. "PersistentTokioRunner", "ExternalRunner").
81 ///
82 /// Set at runner creation time so monitoring and recovery can distinguish
83 /// runner types without introspecting the `app_id`.
84 pub runner_cls: Arc<str>,
85 /// The application identifier (shared via `Arc` to avoid per-invocation clones).
86 #[serde(
87 serialize_with = "serialize_arc_str",
88 deserialize_with = "deserialize_arc_str"
89 )]
90 pub app_id: Arc<str>,
91 /// Process ID of the runner.
92 pub pid: u32,
93 /// Hostname where the runner is executing.
94 pub hostname: String,
95 /// Thread ID (or tokio task ID) of the current execution.
96 pub thread_id: u64,
97 /// Optional parent context (for hierarchical runner relationships).
98 pub parent_ctx: Option<Box<RunnerContext>>,
99}
100
101fn serialize_arc_str<S: serde::Serializer>(v: &Arc<str>, s: S) -> Result<S::Ok, S::Error> {
102 s.serialize_str(v)
103}
104
105fn deserialize_arc_str<'de, D: serde::Deserializer<'de>>(d: D) -> Result<Arc<str>, D::Error> {
106 let s = String::deserialize(d)?;
107 Ok(Arc::from(s.as_str()))
108}
109
110impl RunnerContext {
111 /// Create a new `RunnerContext` capturing current process/host metadata.
112 pub fn new(runner_id: RunnerId, app_id: Arc<str>, runner_cls: impl Into<Arc<str>>) -> Self {
113 Self {
114 runner_id,
115 runner_cls: runner_cls.into(),
116 app_id,
117 pid: std::process::id(),
118 hostname: Self::get_hostname(),
119 thread_id: current_thread_id(),
120 parent_ctx: None,
121 }
122 }
123
124 /// Create a child context with this context as the parent.
125 ///
126 /// The child inherits the parent's `runner_cls` — use this for worker tasks
127 /// that run under the same runner type as the parent.
128 pub fn new_child(&self, runner_id: RunnerId) -> Self {
129 Self {
130 runner_id,
131 runner_cls: Arc::clone(&self.runner_cls),
132 app_id: Arc::clone(&self.app_id),
133 pid: std::process::id(),
134 hostname: self.hostname.clone(),
135 thread_id: current_thread_id(),
136 parent_ctx: Some(Box::new(self.clone())),
137 }
138 }
139
140 /// Get the root runner_id by traversing up the parent chain.
141 pub fn root_runner_id(&self) -> &RunnerId {
142 match &self.parent_ctx {
143 Some(parent) => parent.root_runner_id(),
144 None => &self.runner_id,
145 }
146 }
147
148 /// Create an external runner context (hostname-pid identity).
149 ///
150 /// Used when code runs outside any runner (scripts, CLI, tests).
151 /// Matches pynenc's `ExternalRunner.get_default_external_runner_context()`.
152 pub fn external() -> Self {
153 let hostname = Self::get_hostname();
154 let pid = std::process::id();
155 let runner_id = RunnerId::from_string(format!("{hostname}-{pid}"));
156 Self {
157 runner_id,
158 runner_cls: Arc::from("ExternalRunner"),
159 app_id: Arc::from("external"),
160 pid,
161 hostname,
162 thread_id: current_thread_id(),
163 parent_ctx: None,
164 }
165 }
166
167 pub(crate) fn get_hostname() -> String {
168 hostname::get().map_or_else(
169 |_| "unknown".to_string(),
170 |h| h.to_string_lossy().into_owned(),
171 )
172 }
173}
174
175// ---------------------------------------------------------------------------
176// task_local storage
177// ---------------------------------------------------------------------------
178
179tokio::task_local! {
180 /// The invocation context for the currently running task.
181 pub static INVOCATION_CTX: InvocationContext;
182 /// The runner context for the current execution environment.
183 pub static RUNNER_CTX: RunnerContext;
184}
185
186// ---------------------------------------------------------------------------
187// Thread-local fallbacks for spawn_blocking / rayon
188// ---------------------------------------------------------------------------
189
190// `tokio::task_local!` does NOT cross `spawn_blocking` boundaries.
191// To ensure child-task submissions from blocking tasks still capture the
192// parent worker's runner identity, we mirror pynenc's `threading.local()`
193// approach: a `std::thread_local!` that is set by each runner before
194// entering `spawn_blocking` and is cleared afterwards.
195std::thread_local! {
196 static THREAD_RUNNER_CTX: std::cell::RefCell<Option<RunnerContext>> =
197 const { std::cell::RefCell::new(None) };
198 static THREAD_INVOCATION_CTX: std::cell::RefCell<Option<InvocationContext>> =
199 const { std::cell::RefCell::new(None) };
200}
201
202/// Set the thread-local runner context (for use before `spawn_blocking`).
203pub fn set_thread_runner_context(ctx: RunnerContext) {
204 THREAD_RUNNER_CTX.with(|cell| {
205 *cell.borrow_mut() = Some(ctx);
206 });
207}
208
209/// Clear the thread-local runner context.
210pub fn clear_thread_runner_context() {
211 THREAD_RUNNER_CTX.with(|cell| {
212 *cell.borrow_mut() = None;
213 });
214}
215
216/// Set the thread-local invocation context (for use before `spawn_blocking`).
217pub fn set_thread_invocation_context(ctx: InvocationContext) {
218 THREAD_INVOCATION_CTX.with(|cell| {
219 *cell.borrow_mut() = Some(ctx);
220 });
221}
222
223/// Clear the thread-local invocation context.
224pub fn clear_thread_invocation_context() {
225 THREAD_INVOCATION_CTX.with(|cell| {
226 *cell.borrow_mut() = None;
227 });
228}
229
230/// Get the current invocation context, if running inside a task.
231///
232/// Resolution order:
233/// 1. tokio `INVOCATION_CTX` task_local (async task execution path)
234/// 2. `std::thread_local` fallback (spawn_blocking / rayon path)
235///
236/// Returns `None` when called outside a runner-managed task execution
237/// (e.g. from a test or from top-level application code).
238pub fn get_invocation_context() -> Option<InvocationContext> {
239 // 1. Try tokio task-local
240 if let Ok(ctx) = INVOCATION_CTX.try_with(Clone::clone) {
241 return Some(ctx);
242 }
243 // 2. Try thread-local fallback (spawn_blocking / rayon)
244 THREAD_INVOCATION_CTX.with(|cell| cell.borrow().clone())
245}
246
247/// Access the current invocation context by reference, avoiding a clone
248/// when the tokio task-local is available.
249///
250/// Returns `None` when called outside a runner-managed task execution.
251pub fn with_invocation_context<F, R>(f: F) -> Option<R>
252where
253 F: FnOnce(&InvocationContext) -> R,
254{
255 get_invocation_context().as_ref().map(f)
256}
257
258/// Get the current runner context, if set.
259pub fn get_runner_context() -> Option<RunnerContext> {
260 RUNNER_CTX.try_with(Clone::clone).ok()
261}
262
263/// Access the current runner context by reference, avoiding a clone.
264pub fn with_runner_context<F, R>(f: F) -> Option<R>
265where
266 F: FnOnce(&RunnerContext) -> R,
267{
268 RUNNER_CTX.try_with(f).ok()
269}
270
271/// Get the runner ID for the current execution context.
272///
273/// Mirrors pynenc's `get_or_create_runner_context()` — **never returns None**.
274///
275/// Resolution order:
276/// 1. tokio `RUNNER_CTX` task_local (set by runner during async task execution)
277/// 2. `std::thread_local` fallback (set for `spawn_blocking` tasks)
278/// 3. External runner identity: `"{hostname}-{pid}"` (matches pynenc's `ExternalRunner`)
279pub fn get_or_create_runner_id() -> RunnerId {
280 // 1. Try tokio task-local (async task execution path)
281 if let Some(rid) = with_runner_context(|ctx| ctx.runner_id.clone()) {
282 return rid;
283 }
284
285 // 2. Try thread-local fallback (spawn_blocking path)
286 if let Some(rid) =
287 THREAD_RUNNER_CTX.with(|cell| cell.borrow().as_ref().map(|ctx| ctx.runner_id.clone()))
288 {
289 return rid;
290 }
291
292 // 3. External runner identity (top-level submission from non-runner code)
293 external_runner_id()
294}
295
296/// Get the full runner context for the current execution.
297///
298/// Same resolution as [`get_or_create_runner_id`] but returns the full context.
299pub fn get_or_create_runner_context() -> RunnerContext {
300 // 1. Try tokio task-local
301 if let Ok(ctx) = RUNNER_CTX.try_with(Clone::clone) {
302 return ctx;
303 }
304
305 // 2. Try thread-local fallback
306 if let Some(ctx) = THREAD_RUNNER_CTX.with(|cell| cell.borrow().clone()) {
307 return ctx;
308 }
309
310 // 3. External runner context
311 RunnerContext::external()
312}
313
314/// Build a stable external runner ID: `"{hostname}-{pid}"`.
315///
316/// Matches pynenc's `ExternalRunner` which uses hostname-pid since external
317/// processes are not managed by the framework.
318fn external_runner_id() -> RunnerId {
319 let hostname = RunnerContext::get_hostname();
320 let pid = std::process::id();
321 RunnerId::from_string(format!("{hostname}-{pid}"))
322}
323
#[cfg(test)]
mod tests {
    use super::*;

    /// Top-level invocation `inv-1` (no parent, zero retries) used as the
    /// baseline fixture.
    fn sample_invocation_ctx() -> InvocationContext {
        let invocation_id = InvocationId::from_string("inv-1");
        let task_id = TaskId::new("mod", "my_task");
        let workflow = WorkflowIdentity::root(invocation_id.clone(), task_id.clone());
        InvocationContext {
            invocation_id,
            task_id,
            workflow,
            parent_invocation_id: None,
            num_retries: 0,
        }
    }

    /// Runner fixture: `runner-1` of app `test-app`, class `TestRunner`.
    fn sample_runner_ctx() -> RunnerContext {
        let runner_id = RunnerId::from_string("runner-1");
        let app_id: Arc<str> = Arc::from("test-app");
        RunnerContext::new(runner_id, app_id, "TestRunner")
    }

    /// Invocation ID visible through `get_invocation_context()` right now.
    fn visible_invocation_id() -> InvocationId {
        get_invocation_context().unwrap().invocation_id
    }

    #[tokio::test]
    async fn context_not_set_outside_scope() {
        let invocation = get_invocation_context();
        let runner = get_runner_context();
        assert!(invocation.is_none());
        assert!(runner.is_none());
    }

    #[tokio::test]
    async fn invocation_context_set_get() {
        let expected = sample_invocation_ctx();
        let scoped = expected.clone();
        INVOCATION_CTX
            .scope(scoped, async move {
                let seen = get_invocation_context().unwrap();
                assert_eq!(seen.invocation_id, expected.invocation_id);
                assert_eq!(seen.task_id, expected.task_id);
                assert!(seen.parent_invocation_id.is_none());
            })
            .await;
    }

    #[tokio::test]
    async fn runner_context_set_get() {
        let expected_id = RunnerId::from_string("runner-1");
        RUNNER_CTX
            .scope(sample_runner_ctx(), async move {
                let seen = get_runner_context().unwrap();
                assert_eq!(seen.runner_id, expected_id);
                assert_eq!(&*seen.app_id, "test-app");
            })
            .await;
    }

    #[tokio::test]
    async fn nested_invocation_scopes() {
        let parent = sample_invocation_ctx();
        let child = InvocationContext {
            invocation_id: InvocationId::from_string("inv-inner"),
            task_id: TaskId::new("mod", "inner_task"),
            workflow: parent.workflow.clone(),
            parent_invocation_id: Some(parent.invocation_id.clone()),
            num_retries: 0,
        };

        INVOCATION_CTX
            .scope(parent, async move {
                // The enclosing scope is what we see first.
                assert_eq!(visible_invocation_id().as_str(), "inv-1");

                // A nested scope shadows it for its own duration...
                INVOCATION_CTX
                    .scope(child, async {
                        let seen = get_invocation_context().unwrap();
                        assert_eq!(seen.invocation_id.as_str(), "inv-inner");
                        let seen_parent = seen.parent_invocation_id.as_ref().unwrap();
                        assert_eq!(seen_parent.as_str(), "inv-1");
                    })
                    .await;

                // ...and the enclosing value is back once it finishes.
                assert_eq!(visible_invocation_id().as_str(), "inv-1");
            })
            .await;
    }

    #[tokio::test]
    async fn both_contexts_together() {
        let invocation = sample_invocation_ctx();
        let runner = sample_runner_ctx();

        let inner = RUNNER_CTX.scope(runner, async {
            assert!(get_invocation_context().is_some());
            assert!(get_runner_context().is_some());
        });
        INVOCATION_CTX.scope(invocation, inner).await;

        // Once both scopes have ended nothing is visible any more.
        assert!(get_invocation_context().is_none());
        assert!(get_runner_context().is_none());
    }
}