Skip to main content

nemo_flow/api/runtime/
scope_stack.rs

1// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Scope stack storage and propagation helpers.
5//!
6//! The runtime tracks the current scope hierarchy through a shared
7//! [`ScopeStack`] stored in task-local or thread-local state. Advanced callers
8//! can use this module to inspect the active scope chain, attach scope-local
9//! middleware, or propagate scope context into worker threads.
10
11use std::cell::RefCell;
12use std::sync::{Arc, RwLock};
13
14use uuid::Uuid;
15
16use crate::api::runtime::callbacks::EventSubscriberFn;
17use crate::api::scope::{ScopeHandle, ScopeType};
18use crate::context::registries::ScopeLocalRegistries;
19use crate::error::{FlowError, Result};
20use crate::registry::SortedRegistry;
21
22/// Mutable stack of active scopes plus their scope-local registries.
23///
24/// The stack always contains an implicit root scope. Additional scopes are
25/// pushed as the public API opens lifecycle spans and removed when those spans
26/// close.
27pub struct ScopeStack {
28    stack: Vec<ScopeHandle>,
29    scope_registries: std::collections::HashMap<Uuid, ScopeLocalRegistries>,
30}
31
32impl ScopeStack {
33    /// Create a new scope stack containing only the implicit root scope.
34    ///
35    /// # Returns
36    /// A [`ScopeStack`] initialized with a single root scope and no
37    /// scope-local registries.
38    pub fn new() -> Self {
39        let root = ScopeHandle::builder()
40            .name("root")
41            .scope_type(ScopeType::Agent)
42            .build();
43        Self {
44            stack: vec![root],
45            scope_registries: std::collections::HashMap::new(),
46        }
47    }
48
49    /// Push a scope handle onto the top of the stack.
50    ///
51    /// # Parameters
52    /// - `handle`: Scope handle to make the new top-most active scope.
53    pub fn push(&mut self, handle: ScopeHandle) {
54        self.stack.push(handle);
55    }
56
57    /// Return the current top-most scope handle.
58    ///
59    /// # Returns
60    /// A shared reference to the active scope at the top of the stack.
61    ///
62    /// # Notes
63    /// This function never returns `None` because the implicit root scope is
64    /// always present.
65    pub fn top(&self) -> &ScopeHandle {
66        self.stack
67            .last()
68            .expect("scope stack should never be empty")
69    }
70
71    /// Return the current top-most scope handle mutably.
72    ///
73    /// # Returns
74    /// A mutable reference to the active scope at the top of the stack.
75    pub fn top_mut(&mut self) -> &mut ScopeHandle {
76        self.stack
77            .last_mut()
78            .expect("scope stack should never be empty")
79    }
80
81    /// Return the UUID of the implicit root scope.
82    ///
83    /// # Returns
84    /// The stable UUID of the root scope stored at the bottom of the stack.
85    pub fn root_uuid(&self) -> Uuid {
86        self.stack
87            .first()
88            .expect("scope stack should never be empty")
89            .uuid
90    }
91
92    /// Return the full ordered stack of scope handles.
93    ///
94    /// # Returns
95    /// A slice of scopes ordered from root to the current top-most scope.
96    pub fn scopes(&self) -> &[ScopeHandle] {
97        &self.stack
98    }
99
100    /// Find a scope handle by UUID.
101    ///
102    /// # Parameters
103    /// - `uuid`: UUID of the scope to search for.
104    ///
105    /// # Returns
106    /// `Some(&ScopeHandle)` when the scope is active on this stack and `None`
107    /// otherwise.
108    pub fn find(&self, uuid: &Uuid) -> Option<&ScopeHandle> {
109        self.stack.iter().find(|handle| handle.uuid == *uuid)
110    }
111
112    /// Remove the current top scope if it matches `uuid`.
113    ///
114    /// # Parameters
115    /// - `uuid`: UUID of the scope expected to be at the top of the stack.
116    ///
117    /// # Returns
118    /// A [`Result`] containing the removed [`ScopeHandle`].
119    ///
120    /// # Errors
121    /// Returns [`FlowError::InvalidArgument`] when the scope exists but is not
122    /// the current top of the stack or when the caller attempts to remove the
123    /// implicit root scope. Returns [`FlowError::NotFound`] when the UUID is
124    /// not present on the stack.
125    pub fn remove(&mut self, uuid: &Uuid) -> Result<ScopeHandle> {
126        let top = self
127            .stack
128            .last()
129            .expect("scope stack should never be empty");
130        if top.uuid == *uuid {
131            if self.stack.len() == 1 {
132                return Err(FlowError::InvalidArgument(
133                    "root scope cannot be removed".into(),
134                ));
135            }
136            self.scope_registries.remove(uuid);
137            return Ok(self
138                .stack
139                .pop()
140                .expect("scope stack should contain a removable top scope"));
141        }
142
143        if self.stack.iter().any(|handle| handle.uuid == *uuid) {
144            return Err(FlowError::InvalidArgument(
145                "scope handle is not at the top of the stack".into(),
146            ));
147        }
148
149        Err(FlowError::NotFound("scope handle not found".into()))
150    }
151
152    /// Get or create the scope-local registries for an active scope.
153    ///
154    /// # Parameters
155    /// - `uuid`: UUID of an active scope on this stack.
156    ///
157    /// # Returns
158    /// `Some(&mut ScopeLocalRegistries)` when the scope is active and `None`
159    /// otherwise.
160    ///
161    /// # Notes
162    /// When the scope is active but has no registries yet, this function
163    /// creates an empty scope-local registry set first.
164    pub fn local_registries_mut(&mut self, uuid: &Uuid) -> Option<&mut ScopeLocalRegistries> {
165        if !self.stack.iter().any(|handle| handle.uuid == *uuid) {
166            return None;
167        }
168        Some(self.scope_registries.entry(*uuid).or_default())
169    }
170
171    /// Collect one registry field from every active scope that owns it.
172    ///
173    /// # Parameters
174    /// - `field`: Projection function selecting the registry field to collect
175    ///   from each scope-local registry.
176    ///
177    /// # Returns
178    /// A vector of registry references ordered from root toward the current
179    /// top-most scope.
180    pub fn collect_scope_local_registries<'a, T>(
181        &'a self,
182        field: impl Fn(&'a ScopeLocalRegistries) -> &'a SortedRegistry<T>,
183    ) -> Vec<&'a SortedRegistry<T>> {
184        self.stack
185            .iter()
186            .filter_map(|handle| self.scope_registries.get(&handle.uuid))
187            .map(field)
188            .collect()
189    }
190
191    /// Collect all scope-local subscribers visible from the active stack.
192    ///
193    /// # Returns
194    /// A vector of subscribers collected from each active scope that owns
195    /// scope-local registries.
196    pub fn collect_scope_local_subscribers(&self) -> Vec<EventSubscriberFn> {
197        self.stack
198            .iter()
199            .filter_map(|handle| self.scope_registries.get(&handle.uuid))
200            .flat_map(|registries| registries.event_subscribers.values().cloned())
201            .collect()
202    }
203
204    /// Return the scope-local registries for `uuid` without creating them.
205    ///
206    /// # Parameters
207    /// - `uuid`: UUID of the scope whose registries should be borrowed.
208    ///
209    /// # Returns
210    /// `Some(&ScopeLocalRegistries)` when registries already exist for that
211    /// scope and `None` otherwise.
212    pub fn scope_registries_get(&self, uuid: &Uuid) -> Option<&ScopeLocalRegistries> {
213        self.scope_registries.get(uuid)
214    }
215}
216
217impl std::fmt::Debug for ScopeStack {
218    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
219        f.debug_struct("ScopeStack")
220            .field("stack", &self.stack)
221            .field("scope_registries_count", &self.scope_registries.len())
222            .finish()
223    }
224}
225
226impl Default for ScopeStack {
227    fn default() -> Self {
228        Self::new()
229    }
230}
231
232/// Shared handle type for the runtime scope stack.
233///
234/// The runtime stores the active [`ScopeStack`] behind an [`Arc`] and [`RwLock`]
235/// so bindings can propagate it across execution contexts while still allowing
236/// concurrent readers.
237pub type ScopeStackHandle = Arc<RwLock<ScopeStack>>;
238
239/// Captured thread-local scope stack binding.
240///
241/// This preserves both the visible scope stack handle and whether it was
242/// explicitly installed on the current thread.
243#[derive(Clone)]
244pub struct ThreadScopeStackBinding {
245    stack: ScopeStackHandle,
246    explicit: bool,
247}
248
249/// Create a new scope stack handle with an implicit root scope.
250///
251/// The returned handle wraps a freshly initialized [`ScopeStack`] inside an
252/// [`Arc`] and [`RwLock`] so it can be shared across async tasks or threads.
253///
254/// # Returns
255/// A new [`ScopeStackHandle`] containing exactly one implicit root scope.
256///
257/// # Notes
258/// The root scope is always present and cannot be removed.
259pub fn create_scope_stack() -> ScopeStackHandle {
260    Arc::new(RwLock::new(ScopeStack::new()))
261}
262
263tokio::task_local! {
264    /// Task-local scope stack handle used by async execution contexts.
265    pub static TASK_SCOPE_STACK: ScopeStackHandle;
266}
267
268thread_local! {
269    /// Thread-local fallback scope stack for non-task contexts.
270    static THREAD_SCOPE_STACK: RefCell<ScopeStackHandle> = RefCell::new(create_scope_stack());
271    /// Whether the current thread explicitly owns a scope stack.
272    static THREAD_SCOPE_STACK_EXPLICIT: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };
273}
274
275/// Return the scope stack visible to the current execution context.
276///
277/// This resolves task-local scope state first and otherwise falls back to the
278/// current thread-local scope stack handle.
279///
280/// # Returns
281/// The active [`ScopeStackHandle`] for the current async task or thread.
282///
283/// # Notes
284/// When no explicit thread-local stack has been installed yet, the default
285/// per-thread root-only stack is returned.
286pub fn current_scope_stack() -> ScopeStackHandle {
287    TASK_SCOPE_STACK
288        .try_with(|stack| stack.clone())
289        .unwrap_or_else(|_| THREAD_SCOPE_STACK.with(|stack| stack.borrow().clone()))
290}
291
292/// Install an explicit scope stack for the current thread.
293///
294/// This replaces the thread-local scope stack handle and marks the current
295/// thread as explicitly scope-aware for later propagation checks.
296///
297/// # Parameters
298/// - `handle`: Scope stack handle to install for the current thread.
299///
300/// # Returns
301/// `()`.
302///
303/// # Notes
304/// Use this when propagating an existing scope stack into worker threads.
305pub fn set_thread_scope_stack(handle: ScopeStackHandle) {
306    THREAD_SCOPE_STACK.with(|stack| *stack.borrow_mut() = handle);
307    THREAD_SCOPE_STACK_EXPLICIT.with(|flag| flag.set(true));
308}
309
310/// Capture the current thread-local scope stack binding.
311///
312/// This is intended for foreign runtimes that temporarily bind a scope stack to
313/// an OS thread and need to restore the exact previous state before releasing
314/// that thread back to their scheduler.
315pub fn capture_thread_scope_stack() -> ThreadScopeStackBinding {
316    let stack = THREAD_SCOPE_STACK.with(|stack| stack.borrow().clone());
317    let explicit = THREAD_SCOPE_STACK_EXPLICIT.with(|flag| flag.get());
318    ThreadScopeStackBinding { stack, explicit }
319}
320
321/// Restore a previously captured thread-local scope stack binding.
322pub fn restore_thread_scope_stack(binding: ThreadScopeStackBinding) {
323    THREAD_SCOPE_STACK.with(|stack| *stack.borrow_mut() = binding.stack);
324    THREAD_SCOPE_STACK_EXPLICIT.with(|flag| flag.set(binding.explicit));
325}
326
327/// Synchronize the thread-local scope stack without marking it explicit.
328///
329/// This updates the thread-local slot used by native runtime code while
330/// preserving whether the thread was explicitly marked as owning a scope stack.
331///
332/// # Parameters
333/// - `handle`: Scope stack handle to synchronize into thread-local storage.
334///
335/// # Returns
336/// `()`.
337///
338/// # Notes
339/// Python bindings use this to mirror `ContextVar` state into Rust without
340/// forcing `scope_stack_active()` to become `true` for the thread.
341pub fn sync_thread_scope_stack(handle: ScopeStackHandle) {
342    THREAD_SCOPE_STACK.with(|stack| *stack.borrow_mut() = handle);
343}
344
345/// Report whether the current context has an explicitly active scope stack.
346///
347/// This checks task-local state first and otherwise falls back to the
348/// thread-local explicit flag.
349///
350/// # Returns
351/// `true` when the current async task or thread already owns an active scope
352/// stack and `false` otherwise.
353///
354/// # Notes
355/// A synchronized thread-local stack does not count as explicit unless it was
356/// installed through [`set_thread_scope_stack`].
357pub fn scope_stack_active() -> bool {
358    TASK_SCOPE_STACK
359        .try_with(|_| true)
360        .unwrap_or_else(|_| THREAD_SCOPE_STACK_EXPLICIT.with(|flag| flag.get()))
361}
362
363/// Capture the current scope stack handle for use in another thread.
364///
365/// This returns the handle currently visible to the caller so it can be passed
366/// into [`set_thread_scope_stack`] elsewhere.
367///
368/// # Returns
369/// A [`Result`] containing the active [`ScopeStackHandle`].
370///
371/// # Errors
372/// Returns an error when the current context does not yet own an active scope
373/// stack.
374///
375/// # Notes
376/// The returned handle is shared; it does not clone the underlying stack.
377pub fn propagate_scope_to_thread() -> Result<ScopeStackHandle> {
378    if !scope_stack_active() {
379        return Err(FlowError::Internal(
380            "no active scope stack in current context; call create_scope_stack() and set_thread_scope_stack() first"
381                .into(),
382        ));
383    }
384    Ok(current_scope_stack())
385}
386
387/// Clone the current top-most scope handle from the active stack.
388///
389/// # Returns
390/// A cloned [`ScopeHandle`] representing the current active scope.
391pub fn task_scope_top() -> ScopeHandle {
392    let stack = current_scope_stack();
393    let guard = stack.read().expect("scope stack lock poisoned");
394    guard.top().clone()
395}
396
397/// Push a scope handle onto the active stack.
398///
399/// # Parameters
400/// - `handle`: Scope handle to push onto the current execution context's stack.
401pub fn task_scope_push(handle: ScopeHandle) {
402    let stack = current_scope_stack();
403    let mut guard = stack.write().expect("scope stack lock poisoned");
404    guard.push(handle);
405}
406
407/// Remove a scope handle from the active stack.
408///
409/// # Parameters
410/// - `uuid`: UUID of the scope expected to be at the top of the active stack.
411///
412/// # Returns
413/// A [`Result`] containing the removed [`ScopeHandle`].
414///
415/// # Errors
416/// Propagates the same errors returned by [`ScopeStack::remove`].
417pub fn task_scope_remove(uuid: &Uuid) -> Result<ScopeHandle> {
418    let stack = current_scope_stack();
419    let mut guard = stack.write().expect("scope stack lock poisoned");
420    guard.remove(uuid)
421}