Skip to main content

nemo_flow/api/runtime/
scope_stack.rs

// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! Scope stack storage and propagation helpers.
//!
//! The runtime tracks the current scope hierarchy through a shared
//! [`ScopeStack`] stored in task-local or thread-local state. Advanced callers
//! can use this module to inspect the active scope chain, attach scope-local
//! middleware, or propagate scope context into worker threads.
10
11use std::cell::RefCell;
12use std::sync::{Arc, RwLock};
13
14use uuid::Uuid;
15
16use crate::api::runtime::callbacks::EventSubscriberFn;
17use crate::api::scope::{ScopeHandle, ScopeType};
18use crate::context::registries::ScopeLocalRegistries;
19use crate::error::{FlowError, Result};
20use crate::registry::SortedRegistry;
21
22/// Mutable stack of active scopes plus their scope-local registries.
23///
24/// The stack always contains an implicit root scope. Additional scopes are
25/// pushed as the public API opens lifecycle spans and removed when those spans
26/// close.
27pub struct ScopeStack {
28    stack: Vec<ScopeHandle>,
29    scope_registries: std::collections::HashMap<Uuid, ScopeLocalRegistries>,
30}
31
32impl ScopeStack {
33    /// Create a new scope stack containing only the implicit root scope.
34    ///
35    /// # Returns
36    /// A [`ScopeStack`] initialized with a single root scope and no
37    /// scope-local registries.
38    pub fn new() -> Self {
39        let root = ScopeHandle::builder()
40            .name("root")
41            .scope_type(ScopeType::Agent)
42            .build();
43        Self {
44            stack: vec![root],
45            scope_registries: std::collections::HashMap::new(),
46        }
47    }
48
49    /// Push a scope handle onto the top of the stack.
50    ///
51    /// # Parameters
52    /// - `handle`: Scope handle to make the new top-most active scope.
53    pub fn push(&mut self, handle: ScopeHandle) {
54        self.stack.push(handle);
55    }
56
57    /// Return the current top-most scope handle.
58    ///
59    /// # Returns
60    /// A shared reference to the active scope at the top of the stack.
61    ///
62    /// # Notes
63    /// This function never returns `None` because the implicit root scope is
64    /// always present.
65    pub fn top(&self) -> &ScopeHandle {
66        self.stack
67            .last()
68            .expect("scope stack should never be empty")
69    }
70
71    /// Return the current top-most scope handle mutably.
72    ///
73    /// # Returns
74    /// A mutable reference to the active scope at the top of the stack.
75    pub fn top_mut(&mut self) -> &mut ScopeHandle {
76        self.stack
77            .last_mut()
78            .expect("scope stack should never be empty")
79    }
80
81    /// Return the UUID of the implicit root scope.
82    ///
83    /// # Returns
84    /// The stable UUID of the root scope stored at the bottom of the stack.
85    pub fn root_uuid(&self) -> Uuid {
86        self.stack
87            .first()
88            .expect("scope stack should never be empty")
89            .uuid
90    }
91
92    /// Return the full ordered stack of scope handles.
93    ///
94    /// # Returns
95    /// A slice of scopes ordered from root to the current top-most scope.
96    pub fn scopes(&self) -> &[ScopeHandle] {
97        &self.stack
98    }
99
100    /// Find a scope handle by UUID.
101    ///
102    /// # Parameters
103    /// - `uuid`: UUID of the scope to search for.
104    ///
105    /// # Returns
106    /// `Some(&ScopeHandle)` when the scope is active on this stack and `None`
107    /// otherwise.
108    pub fn find(&self, uuid: &Uuid) -> Option<&ScopeHandle> {
109        self.stack.iter().find(|handle| handle.uuid == *uuid)
110    }
111
112    /// Remove the current top scope if it matches `uuid`.
113    ///
114    /// # Parameters
115    /// - `uuid`: UUID of the scope expected to be at the top of the stack.
116    ///
117    /// # Returns
118    /// A [`Result`] containing the removed [`ScopeHandle`].
119    ///
120    /// # Errors
121    /// Returns [`FlowError::InvalidArgument`] when the scope exists but is not
122    /// the current top of the stack or when the caller attempts to remove the
123    /// implicit root scope. Returns [`FlowError::NotFound`] when the UUID is
124    /// not present on the stack.
125    pub fn remove(&mut self, uuid: &Uuid) -> Result<ScopeHandle> {
126        let top = self
127            .stack
128            .last()
129            .expect("scope stack should never be empty");
130        if top.uuid == *uuid {
131            if self.stack.len() == 1 {
132                return Err(FlowError::InvalidArgument(
133                    "root scope cannot be removed".into(),
134                ));
135            }
136            self.scope_registries.remove(uuid);
137            return Ok(self
138                .stack
139                .pop()
140                .expect("scope stack should contain a removable top scope"));
141        }
142
143        if self.stack.iter().any(|handle| handle.uuid == *uuid) {
144            return Err(FlowError::InvalidArgument(
145                "scope handle is not at the top of the stack".into(),
146            ));
147        }
148
149        Err(FlowError::NotFound("scope handle not found".into()))
150    }
151
152    /// Get or create the scope-local registries for an active scope.
153    ///
154    /// # Parameters
155    /// - `uuid`: UUID of an active scope on this stack.
156    ///
157    /// # Returns
158    /// `Some(&mut ScopeLocalRegistries)` when the scope is active and `None`
159    /// otherwise.
160    ///
161    /// # Notes
162    /// When the scope is active but has no registries yet, this function
163    /// creates an empty scope-local registry set first.
164    pub fn local_registries_mut(&mut self, uuid: &Uuid) -> Option<&mut ScopeLocalRegistries> {
165        if !self.stack.iter().any(|handle| handle.uuid == *uuid) {
166            return None;
167        }
168        Some(self.scope_registries.entry(*uuid).or_default())
169    }
170
171    /// Collect one registry field from every active scope that owns it.
172    ///
173    /// # Parameters
174    /// - `field`: Projection function selecting the registry field to collect
175    ///   from each scope-local registry.
176    ///
177    /// # Returns
178    /// A vector of registry references ordered from root toward the current
179    /// top-most scope.
180    pub fn collect_scope_local_registries<'a, T>(
181        &'a self,
182        field: impl Fn(&'a ScopeLocalRegistries) -> &'a SortedRegistry<T>,
183    ) -> Vec<&'a SortedRegistry<T>> {
184        self.stack
185            .iter()
186            .filter_map(|handle| self.scope_registries.get(&handle.uuid))
187            .map(field)
188            .collect()
189    }
190
191    /// Collect all scope-local subscribers visible from the active stack.
192    ///
193    /// # Returns
194    /// A vector of subscribers collected from each active scope that owns
195    /// scope-local registries.
196    pub fn collect_scope_local_subscribers(&self) -> Vec<EventSubscriberFn> {
197        self.stack
198            .iter()
199            .filter_map(|handle| self.scope_registries.get(&handle.uuid))
200            .flat_map(|registries| registries.event_subscribers.values().cloned())
201            .collect()
202    }
203
204    /// Return the scope-local registries for `uuid` without creating them.
205    ///
206    /// # Parameters
207    /// - `uuid`: UUID of the scope whose registries should be borrowed.
208    ///
209    /// # Returns
210    /// `Some(&ScopeLocalRegistries)` when registries already exist for that
211    /// scope and `None` otherwise.
212    pub fn scope_registries_get(&self, uuid: &Uuid) -> Option<&ScopeLocalRegistries> {
213        self.scope_registries.get(uuid)
214    }
215}
216
217impl std::fmt::Debug for ScopeStack {
218    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
219        f.debug_struct("ScopeStack")
220            .field("stack", &self.stack)
221            .field("scope_registries_count", &self.scope_registries.len())
222            .finish()
223    }
224}
225
226impl Default for ScopeStack {
227    fn default() -> Self {
228        Self::new()
229    }
230}
231
232/// Shared handle type for the runtime scope stack.
233///
234/// The runtime stores the active [`ScopeStack`] behind an [`Arc`] and [`RwLock`]
235/// so bindings can propagate it across execution contexts while still allowing
236/// concurrent readers.
237pub type ScopeStackHandle = Arc<RwLock<ScopeStack>>;
238
239/// Create a new scope stack handle with an implicit root scope.
240///
241/// The returned handle wraps a freshly initialized [`ScopeStack`] inside an
242/// [`Arc`] and [`RwLock`] so it can be shared across async tasks or threads.
243///
244/// # Returns
245/// A new [`ScopeStackHandle`] containing exactly one implicit root scope.
246///
247/// # Notes
248/// The root scope is always present and cannot be removed.
249pub fn create_scope_stack() -> ScopeStackHandle {
250    Arc::new(RwLock::new(ScopeStack::new()))
251}
252
253tokio::task_local! {
254    /// Task-local scope stack handle used by async execution contexts.
255    pub static TASK_SCOPE_STACK: ScopeStackHandle;
256}
257
258thread_local! {
259    /// Thread-local fallback scope stack for non-task contexts.
260    static THREAD_SCOPE_STACK: RefCell<ScopeStackHandle> = RefCell::new(create_scope_stack());
261    /// Whether the current thread explicitly owns a scope stack.
262    static THREAD_SCOPE_STACK_EXPLICIT: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };
263}
264
265/// Return the scope stack visible to the current execution context.
266///
267/// This resolves task-local scope state first and otherwise falls back to the
268/// current thread-local scope stack handle.
269///
270/// # Returns
271/// The active [`ScopeStackHandle`] for the current async task or thread.
272///
273/// # Notes
274/// When no explicit thread-local stack has been installed yet, the default
275/// per-thread root-only stack is returned.
276pub fn current_scope_stack() -> ScopeStackHandle {
277    TASK_SCOPE_STACK
278        .try_with(|stack| stack.clone())
279        .unwrap_or_else(|_| THREAD_SCOPE_STACK.with(|stack| stack.borrow().clone()))
280}
281
282/// Install an explicit scope stack for the current thread.
283///
284/// This replaces the thread-local scope stack handle and marks the current
285/// thread as explicitly scope-aware for later propagation checks.
286///
287/// # Parameters
288/// - `handle`: Scope stack handle to install for the current thread.
289///
290/// # Returns
291/// `()`.
292///
293/// # Notes
294/// Use this when propagating an existing scope stack into worker threads.
295pub fn set_thread_scope_stack(handle: ScopeStackHandle) {
296    THREAD_SCOPE_STACK.with(|stack| *stack.borrow_mut() = handle);
297    THREAD_SCOPE_STACK_EXPLICIT.with(|flag| flag.set(true));
298}
299
300/// Synchronize the thread-local scope stack without marking it explicit.
301///
302/// This updates the thread-local slot used by native runtime code while
303/// preserving whether the thread was explicitly marked as owning a scope stack.
304///
305/// # Parameters
306/// - `handle`: Scope stack handle to synchronize into thread-local storage.
307///
308/// # Returns
309/// `()`.
310///
311/// # Notes
312/// Python bindings use this to mirror `ContextVar` state into Rust without
313/// forcing `scope_stack_active()` to become `true` for the thread.
314pub fn sync_thread_scope_stack(handle: ScopeStackHandle) {
315    THREAD_SCOPE_STACK.with(|stack| *stack.borrow_mut() = handle);
316}
317
318/// Report whether the current context has an explicitly active scope stack.
319///
320/// This checks task-local state first and otherwise falls back to the
321/// thread-local explicit flag.
322///
323/// # Returns
324/// `true` when the current async task or thread already owns an active scope
325/// stack and `false` otherwise.
326///
327/// # Notes
328/// A synchronized thread-local stack does not count as explicit unless it was
329/// installed through [`set_thread_scope_stack`].
330pub fn scope_stack_active() -> bool {
331    TASK_SCOPE_STACK
332        .try_with(|_| true)
333        .unwrap_or_else(|_| THREAD_SCOPE_STACK_EXPLICIT.with(|flag| flag.get()))
334}
335
336/// Capture the current scope stack handle for use in another thread.
337///
338/// This returns the handle currently visible to the caller so it can be passed
339/// into [`set_thread_scope_stack`] elsewhere.
340///
341/// # Returns
342/// A [`Result`] containing the active [`ScopeStackHandle`].
343///
344/// # Errors
345/// Returns an error when the current context does not yet own an active scope
346/// stack.
347///
348/// # Notes
349/// The returned handle is shared; it does not clone the underlying stack.
350pub fn propagate_scope_to_thread() -> Result<ScopeStackHandle> {
351    if !scope_stack_active() {
352        return Err(FlowError::Internal(
353            "no active scope stack in current context; call create_scope_stack() and set_thread_scope_stack() first"
354                .into(),
355        ));
356    }
357    Ok(current_scope_stack())
358}
359
360/// Clone the current top-most scope handle from the active stack.
361///
362/// # Returns
363/// A cloned [`ScopeHandle`] representing the current active scope.
364pub fn task_scope_top() -> ScopeHandle {
365    let stack = current_scope_stack();
366    let guard = stack.read().expect("scope stack lock poisoned");
367    guard.top().clone()
368}
369
370/// Push a scope handle onto the active stack.
371///
372/// # Parameters
373/// - `handle`: Scope handle to push onto the current execution context's stack.
374pub fn task_scope_push(handle: ScopeHandle) {
375    let stack = current_scope_stack();
376    let mut guard = stack.write().expect("scope stack lock poisoned");
377    guard.push(handle);
378}
379
380/// Remove a scope handle from the active stack.
381///
382/// # Parameters
383/// - `uuid`: UUID of the scope expected to be at the top of the active stack.
384///
385/// # Returns
386/// A [`Result`] containing the removed [`ScopeHandle`].
387///
388/// # Errors
389/// Propagates the same errors returned by [`ScopeStack::remove`].
390pub fn task_scope_remove(uuid: &Uuid) -> Result<ScopeHandle> {
391    let stack = current_scope_stack();
392    let mut guard = stack.write().expect("scope stack lock poisoned");
393    guard.remove(uuid)
394}