Skip to main content

rustvello_core/
context.rs

1//! Execution context for tasks running inside a rustvello runner.
2//!
3//! Provides `tokio::task_local!` context so that a running task can discover
4//! its own invocation identity, workflow membership, and the runner that
5//! is executing it.  This mirrors pynenc's `context.py` module.
6//!
7//! # Usage
8//!
9//! The [`TaskRunner`] sets both contexts before executing a task:
10//!
11//! ```rust,ignore
12//! use rustvello_core::context::*;
13//!
14//! INVOCATION_CTX.scope(inv_ctx, RUNNER_CTX.scope(run_ctx, async {
15//!     // inside here, get_invocation_context() returns Some(...)
16//!     let ctx = get_invocation_context().unwrap();
17//! })).await;
18//! ```
19//!
20//! When a task calls `app.call()` inside its body, the app layer reads
21//! the current `InvocationContext` to determine parent/workflow inheritance.
22
23use std::sync::Arc;
24
25use rustvello_proto::identifiers::{InvocationId, RunnerId, TaskId};
26use rustvello_proto::invocation::WorkflowIdentity;
27use serde::{Deserialize, Serialize};
28
/// Return a numeric identifier for the current OS thread.
///
/// `ThreadId::as_u64()` is still nightly-only (`thread_id_value` feature), so
/// the number is recovered from the `Debug` rendering, which currently looks
/// like `ThreadId(N)`. If that rendering ever stops yielding a parsable `u64`,
/// this returns 0 rather than panicking.
fn current_thread_id() -> u64 {
    let id = std::thread::current().id();
    let rendered = format!("{id:?}");
    // Peel the wrapper off "ThreadId(N)" so only the digits remain.
    let digits = rendered
        .trim_start_matches("ThreadId(")
        .trim_end_matches(')');
    match digits.parse::<u64>() {
        Ok(value) => value,
        Err(_) => 0,
    }
}
44
45// ---------------------------------------------------------------------------
46// InvocationContext — set per-invocation by the runner
47// ---------------------------------------------------------------------------
48
/// Context for the currently executing invocation.
///
/// Stored in a `tokio::task_local!` variable so any code running inside the
/// task's future can retrieve it without passing references through the
/// call stack. A copy may also be placed in a `std::thread_local!` fallback
/// for code that runs outside that future (e.g. inside `spawn_blocking`).
#[derive(Debug, Clone)]
pub struct InvocationContext {
    /// The invocation being executed.
    pub invocation_id: InvocationId,
    /// The task that is being executed.
    pub task_id: TaskId,
    /// The workflow this invocation belongs to.
    pub workflow: WorkflowIdentity,
    /// The parent invocation that spawned this one (`None` for top-level).
    pub parent_invocation_id: Option<InvocationId>,
    /// The current retry attempt number (0 for the first attempt).
    pub num_retries: u32,
}
67
68// ---------------------------------------------------------------------------
69// RunnerContext — set per-runner by the runner's main loop
70// ---------------------------------------------------------------------------
71
72/// Context identifying the runner that is executing the current task.
73///
74/// Mirrors pynenc's `RunnerContext` with hierarchical parent chain,
75/// process/host metadata, and JSON serialization for monitoring.
76#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct RunnerContext {
78    /// The runner's unique identifier.
79    pub runner_id: RunnerId,
80    /// The class/type name of the runner (e.g. "PersistentTokioRunner", "ExternalRunner").
81    ///
82    /// Set at runner creation time so monitoring and recovery can distinguish
83    /// runner types without introspecting the `app_id`.
84    pub runner_cls: Arc<str>,
85    /// The application identifier (shared via `Arc` to avoid per-invocation clones).
86    #[serde(
87        serialize_with = "serialize_arc_str",
88        deserialize_with = "deserialize_arc_str"
89    )]
90    pub app_id: Arc<str>,
91    /// Process ID of the runner.
92    pub pid: u32,
93    /// Hostname where the runner is executing.
94    pub hostname: String,
95    /// Thread ID (or tokio task ID) of the current execution.
96    pub thread_id: u64,
97    /// Optional parent context (for hierarchical runner relationships).
98    pub parent_ctx: Option<Box<RunnerContext>>,
99}
100
101fn serialize_arc_str<S: serde::Serializer>(v: &Arc<str>, s: S) -> Result<S::Ok, S::Error> {
102    s.serialize_str(v)
103}
104
105fn deserialize_arc_str<'de, D: serde::Deserializer<'de>>(d: D) -> Result<Arc<str>, D::Error> {
106    let s = String::deserialize(d)?;
107    Ok(Arc::from(s.as_str()))
108}
109
110impl RunnerContext {
111    /// Create a new `RunnerContext` capturing current process/host metadata.
112    pub fn new(runner_id: RunnerId, app_id: Arc<str>, runner_cls: impl Into<Arc<str>>) -> Self {
113        Self {
114            runner_id,
115            runner_cls: runner_cls.into(),
116            app_id,
117            pid: std::process::id(),
118            hostname: Self::get_hostname(),
119            thread_id: current_thread_id(),
120            parent_ctx: None,
121        }
122    }
123
124    /// Create a child context with this context as the parent.
125    ///
126    /// The child inherits the parent's `runner_cls` — use this for worker tasks
127    /// that run under the same runner type as the parent.
128    pub fn new_child(&self, runner_id: RunnerId) -> Self {
129        Self {
130            runner_id,
131            runner_cls: Arc::clone(&self.runner_cls),
132            app_id: Arc::clone(&self.app_id),
133            pid: std::process::id(),
134            hostname: self.hostname.clone(),
135            thread_id: current_thread_id(),
136            parent_ctx: Some(Box::new(self.clone())),
137        }
138    }
139
140    /// Get the root runner_id by traversing up the parent chain.
141    pub fn root_runner_id(&self) -> &RunnerId {
142        match &self.parent_ctx {
143            Some(parent) => parent.root_runner_id(),
144            None => &self.runner_id,
145        }
146    }
147
148    /// Create an external runner context (hostname-pid identity).
149    ///
150    /// Used when code runs outside any runner (scripts, CLI, tests).
151    /// Matches pynenc's `ExternalRunner.get_default_external_runner_context()`.
152    pub fn external() -> Self {
153        let hostname = Self::get_hostname();
154        let pid = std::process::id();
155        let runner_id = RunnerId::from_string(format!("{hostname}-{pid}"));
156        Self {
157            runner_id,
158            runner_cls: Arc::from("ExternalRunner"),
159            app_id: Arc::from("external"),
160            pid,
161            hostname,
162            thread_id: current_thread_id(),
163            parent_ctx: None,
164        }
165    }
166
167    pub(crate) fn get_hostname() -> String {
168        hostname::get().map_or_else(
169            |_| "unknown".to_string(),
170            |h| h.to_string_lossy().into_owned(),
171        )
172    }
173}
174
175// ---------------------------------------------------------------------------
176// task_local storage
177// ---------------------------------------------------------------------------
178
tokio::task_local! {
    /// The invocation context for the currently running task.
    ///
    /// Only visible inside a future wrapped with `INVOCATION_CTX.scope(..)`;
    /// it does not cross `spawn_blocking` boundaries.
    pub static INVOCATION_CTX: InvocationContext;
    /// The runner context for the current execution environment.
    ///
    /// Only visible inside a future wrapped with `RUNNER_CTX.scope(..)`;
    /// it does not cross `spawn_blocking` boundaries.
    pub static RUNNER_CTX: RunnerContext;
}
185
186// ---------------------------------------------------------------------------
187// Thread-local fallbacks for spawn_blocking / rayon
188// ---------------------------------------------------------------------------
189
// `tokio::task_local!` does NOT cross `spawn_blocking` boundaries.
// To ensure child-task submissions from blocking tasks still capture the
// parent worker's runner identity, we mirror pynenc's `threading.local()`
// approach: a `std::thread_local!` that each runner sets on the thread that
// actually runs the blocking code (i.e. inside the `spawn_blocking` closure,
// because a value set on the calling async thread is not visible on the
// blocking-pool thread) and clears again once that code has finished.
//
// NOTE(review): blocking-pool threads are reused, so a value that is never
// cleared (e.g. the blocking code panicked before the clear call) stays
// visible to later work scheduled on the same thread.
std::thread_local! {
    /// Fallback runner context for code running outside a `RUNNER_CTX` scope.
    static THREAD_RUNNER_CTX: std::cell::RefCell<Option<RunnerContext>> =
        const { std::cell::RefCell::new(None) };
    /// Fallback invocation context for code running outside an `INVOCATION_CTX` scope.
    static THREAD_INVOCATION_CTX: std::cell::RefCell<Option<InvocationContext>> =
        const { std::cell::RefCell::new(None) };
}
201
202/// Set the thread-local runner context (for use before `spawn_blocking`).
203pub fn set_thread_runner_context(ctx: RunnerContext) {
204    THREAD_RUNNER_CTX.with(|cell| {
205        *cell.borrow_mut() = Some(ctx);
206    });
207}
208
209/// Clear the thread-local runner context.
210pub fn clear_thread_runner_context() {
211    THREAD_RUNNER_CTX.with(|cell| {
212        *cell.borrow_mut() = None;
213    });
214}
215
216/// Set the thread-local invocation context (for use before `spawn_blocking`).
217pub fn set_thread_invocation_context(ctx: InvocationContext) {
218    THREAD_INVOCATION_CTX.with(|cell| {
219        *cell.borrow_mut() = Some(ctx);
220    });
221}
222
223/// Clear the thread-local invocation context.
224pub fn clear_thread_invocation_context() {
225    THREAD_INVOCATION_CTX.with(|cell| {
226        *cell.borrow_mut() = None;
227    });
228}
229
230/// Get the current invocation context, if running inside a task.
231///
232/// Resolution order:
233/// 1. tokio `INVOCATION_CTX` task_local (async task execution path)
234/// 2. `std::thread_local` fallback (spawn_blocking / rayon path)
235///
236/// Returns `None` when called outside a runner-managed task execution
237/// (e.g. from a test or from top-level application code).
238pub fn get_invocation_context() -> Option<InvocationContext> {
239    // 1. Try tokio task-local
240    if let Ok(ctx) = INVOCATION_CTX.try_with(Clone::clone) {
241        return Some(ctx);
242    }
243    // 2. Try thread-local fallback (spawn_blocking / rayon)
244    THREAD_INVOCATION_CTX.with(|cell| cell.borrow().clone())
245}
246
247/// Access the current invocation context by reference, avoiding a clone
248/// when the tokio task-local is available.
249///
250/// Returns `None` when called outside a runner-managed task execution.
251pub fn with_invocation_context<F, R>(f: F) -> Option<R>
252where
253    F: FnOnce(&InvocationContext) -> R,
254{
255    get_invocation_context().as_ref().map(f)
256}
257
258/// Get the current runner context, if set.
259pub fn get_runner_context() -> Option<RunnerContext> {
260    RUNNER_CTX.try_with(Clone::clone).ok()
261}
262
263/// Access the current runner context by reference, avoiding a clone.
264pub fn with_runner_context<F, R>(f: F) -> Option<R>
265where
266    F: FnOnce(&RunnerContext) -> R,
267{
268    RUNNER_CTX.try_with(f).ok()
269}
270
271/// Get the runner ID for the current execution context.
272///
273/// Mirrors pynenc's `get_or_create_runner_context()` — **never returns None**.
274///
275/// Resolution order:
276/// 1. tokio `RUNNER_CTX` task_local (set by runner during async task execution)
277/// 2. `std::thread_local` fallback (set for `spawn_blocking` tasks)
278/// 3. External runner identity: `"{hostname}-{pid}"` (matches pynenc's `ExternalRunner`)
279pub fn get_or_create_runner_id() -> RunnerId {
280    // 1. Try tokio task-local (async task execution path)
281    if let Some(rid) = with_runner_context(|ctx| ctx.runner_id.clone()) {
282        return rid;
283    }
284
285    // 2. Try thread-local fallback (spawn_blocking path)
286    if let Some(rid) =
287        THREAD_RUNNER_CTX.with(|cell| cell.borrow().as_ref().map(|ctx| ctx.runner_id.clone()))
288    {
289        return rid;
290    }
291
292    // 3. External runner identity (top-level submission from non-runner code)
293    external_runner_id()
294}
295
296/// Get the full runner context for the current execution.
297///
298/// Same resolution as [`get_or_create_runner_id`] but returns the full context.
299pub fn get_or_create_runner_context() -> RunnerContext {
300    // 1. Try tokio task-local
301    if let Ok(ctx) = RUNNER_CTX.try_with(Clone::clone) {
302        return ctx;
303    }
304
305    // 2. Try thread-local fallback
306    if let Some(ctx) = THREAD_RUNNER_CTX.with(|cell| cell.borrow().clone()) {
307        return ctx;
308    }
309
310    // 3. External runner context
311    RunnerContext::external()
312}
313
314/// Build a stable external runner ID: `"{hostname}-{pid}"`.
315///
316/// Matches pynenc's `ExternalRunner` which uses hostname-pid since external
317/// processes are not managed by the framework.
318fn external_runner_id() -> RunnerId {
319    let hostname = RunnerContext::get_hostname();
320    let pid = std::process::id();
321    RunnerId::from_string(format!("{hostname}-{pid}"))
322}
323
#[cfg(test)]
mod tests {
    use super::*;

    fn sample_invocation_ctx() -> InvocationContext {
        let inv_id = InvocationId::from_string("inv-1");
        let task_id = TaskId::new("mod", "my_task");
        InvocationContext {
            invocation_id: inv_id.clone(),
            task_id: task_id.clone(),
            workflow: WorkflowIdentity::root(inv_id, task_id),
            parent_invocation_id: None,
            num_retries: 0,
        }
    }

    fn sample_runner_ctx() -> RunnerContext {
        RunnerContext::new(
            RunnerId::from_string("runner-1"),
            Arc::from("test-app"),
            "TestRunner",
        )
    }

    #[tokio::test]
    async fn context_not_set_outside_scope() {
        assert!(get_invocation_context().is_none());
        assert!(get_runner_context().is_none());
    }

    #[tokio::test]
    async fn invocation_context_set_get() {
        let ctx = sample_invocation_ctx();
        INVOCATION_CTX
            .scope(ctx.clone(), async {
                let got = get_invocation_context().unwrap();
                assert_eq!(got.invocation_id, ctx.invocation_id);
                assert_eq!(got.task_id, ctx.task_id);
                assert!(got.parent_invocation_id.is_none());
            })
            .await;
    }

    #[tokio::test]
    async fn runner_context_set_get() {
        let ctx = sample_runner_ctx();
        RUNNER_CTX
            .scope(ctx, async {
                let got = get_runner_context().unwrap();
                assert_eq!(got.runner_id, RunnerId::from_string("runner-1"));
                assert_eq!(&*got.app_id, "test-app");
            })
            .await;
    }

    #[tokio::test]
    async fn nested_invocation_scopes() {
        let outer = sample_invocation_ctx();
        let inner = InvocationContext {
            invocation_id: InvocationId::from_string("inv-inner"),
            task_id: TaskId::new("mod", "inner_task"),
            workflow: outer.workflow.clone(),
            parent_invocation_id: Some(outer.invocation_id.clone()),
            num_retries: 0,
        };

        INVOCATION_CTX
            .scope(outer.clone(), async {
                // Outer context visible
                assert_eq!(
                    get_invocation_context().unwrap().invocation_id.as_str(),
                    "inv-1"
                );

                // Inner scope overrides
                INVOCATION_CTX
                    .scope(inner, async {
                        let ctx = get_invocation_context().unwrap();
                        assert_eq!(ctx.invocation_id.as_str(), "inv-inner");
                        assert_eq!(ctx.parent_invocation_id.as_ref().unwrap().as_str(), "inv-1");
                    })
                    .await;

                // Outer context restored
                assert_eq!(
                    get_invocation_context().unwrap().invocation_id.as_str(),
                    "inv-1"
                );
            })
            .await;
    }

    #[tokio::test]
    async fn both_contexts_together() {
        let inv_ctx = sample_invocation_ctx();
        let run_ctx = sample_runner_ctx();

        INVOCATION_CTX
            .scope(
                inv_ctx,
                RUNNER_CTX.scope(run_ctx, async {
                    assert!(get_invocation_context().is_some());
                    assert!(get_runner_context().is_some());
                }),
            )
            .await;

        // Outside both scopes
        assert!(get_invocation_context().is_none());
        assert!(get_runner_context().is_none());
    }

    #[test]
    fn thread_local_invocation_fallback() {
        set_thread_invocation_context(sample_invocation_ctx());
        let got = get_invocation_context();
        let via_ref = with_invocation_context(|ctx| ctx.invocation_id.as_str().to_string());
        // Clear before asserting so a failed assertion cannot leave the value
        // behind on this thread.
        clear_thread_invocation_context();

        assert_eq!(got.unwrap().invocation_id.as_str(), "inv-1");
        assert_eq!(via_ref.as_deref(), Some("inv-1"));
        assert!(get_invocation_context().is_none());
    }

    #[test]
    fn thread_local_runner_fallback_and_external_identity() {
        let expected_external = RunnerContext::external().runner_id;
        assert_eq!(get_or_create_runner_id(), expected_external);

        set_thread_runner_context(sample_runner_ctx());
        let rid = get_or_create_runner_id();
        let ctx = get_or_create_runner_context();
        // Clear before asserting so a failed assertion cannot leave the value
        // behind on this thread.
        clear_thread_runner_context();

        assert_eq!(rid, RunnerId::from_string("runner-1"));
        assert_eq!(&*ctx.runner_cls, "TestRunner");
        assert_eq!(get_or_create_runner_id(), expected_external);
    }

    #[test]
    fn child_context_chain() {
        let root = sample_runner_ctx();
        let child = root.new_child(RunnerId::from_string("runner-2"));
        let grandchild = child.new_child(RunnerId::from_string("runner-3"));

        assert_eq!(grandchild.runner_id, RunnerId::from_string("runner-3"));
        assert_eq!(grandchild.root_runner_id(), &RunnerId::from_string("runner-1"));
        assert_eq!(root.root_runner_id(), &root.runner_id);
        assert_eq!(&*grandchild.runner_cls, "TestRunner");
        assert_eq!(&*grandchild.app_id, "test-app");
        assert_eq!(
            grandchild.parent_ctx.as_ref().map(|p| &p.runner_id),
            Some(&RunnerId::from_string("runner-2"))
        );
    }

    #[test]
    fn external_context_metadata() {
        let ctx = RunnerContext::external();
        assert_eq!(&*ctx.runner_cls, "ExternalRunner");
        assert_eq!(&*ctx.app_id, "external");
        assert_eq!(ctx.pid, std::process::id());
        assert!(ctx.parent_ctx.is_none());
        assert_eq!(
            ctx.runner_id,
            RunnerId::from_string(format!("{}-{}", ctx.hostname, ctx.pid))
        );
    }
}