nemo_flow/api/runtime/scope_stack.rs
1// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Scope stack storage and propagation helpers.
5//!
6//! The runtime tracks the current scope hierarchy through a shared
7//! [`ScopeStack`] stored in task-local or thread-local state. Advanced callers
8//! can use this module to inspect the active scope chain, attach scope-local
9//! middleware, or propagate scope context into worker threads.
10
11use std::cell::RefCell;
12use std::sync::{Arc, RwLock};
13
14use uuid::Uuid;
15
16use crate::api::runtime::callbacks::EventSubscriberFn;
17use crate::api::scope::{ScopeHandle, ScopeType};
18use crate::context::registries::ScopeLocalRegistries;
19use crate::error::{FlowError, Result};
20use crate::registry::SortedRegistry;
21
22/// Mutable stack of active scopes plus their scope-local registries.
23///
24/// The stack always contains an implicit root scope. Additional scopes are
25/// pushed as the public API opens lifecycle spans and removed when those spans
26/// close.
27pub struct ScopeStack {
28 stack: Vec<ScopeHandle>,
29 scope_registries: std::collections::HashMap<Uuid, ScopeLocalRegistries>,
30}
31
32impl ScopeStack {
33 /// Create a new scope stack containing only the implicit root scope.
34 ///
35 /// # Returns
36 /// A [`ScopeStack`] initialized with a single root scope and no
37 /// scope-local registries.
38 pub fn new() -> Self {
39 let root = ScopeHandle::builder()
40 .name("root")
41 .scope_type(ScopeType::Agent)
42 .build();
43 Self {
44 stack: vec![root],
45 scope_registries: std::collections::HashMap::new(),
46 }
47 }
48
49 /// Push a scope handle onto the top of the stack.
50 ///
51 /// # Parameters
52 /// - `handle`: Scope handle to make the new top-most active scope.
53 pub fn push(&mut self, handle: ScopeHandle) {
54 self.stack.push(handle);
55 }
56
57 /// Return the current top-most scope handle.
58 ///
59 /// # Returns
60 /// A shared reference to the active scope at the top of the stack.
61 ///
62 /// # Notes
63 /// This function never returns `None` because the implicit root scope is
64 /// always present.
65 pub fn top(&self) -> &ScopeHandle {
66 self.stack
67 .last()
68 .expect("scope stack should never be empty")
69 }
70
71 /// Return the current top-most scope handle mutably.
72 ///
73 /// # Returns
74 /// A mutable reference to the active scope at the top of the stack.
75 pub fn top_mut(&mut self) -> &mut ScopeHandle {
76 self.stack
77 .last_mut()
78 .expect("scope stack should never be empty")
79 }
80
81 /// Return the UUID of the implicit root scope.
82 ///
83 /// # Returns
84 /// The stable UUID of the root scope stored at the bottom of the stack.
85 pub fn root_uuid(&self) -> Uuid {
86 self.stack
87 .first()
88 .expect("scope stack should never be empty")
89 .uuid
90 }
91
92 /// Return the full ordered stack of scope handles.
93 ///
94 /// # Returns
95 /// A slice of scopes ordered from root to the current top-most scope.
96 pub fn scopes(&self) -> &[ScopeHandle] {
97 &self.stack
98 }
99
100 /// Find a scope handle by UUID.
101 ///
102 /// # Parameters
103 /// - `uuid`: UUID of the scope to search for.
104 ///
105 /// # Returns
106 /// `Some(&ScopeHandle)` when the scope is active on this stack and `None`
107 /// otherwise.
108 pub fn find(&self, uuid: &Uuid) -> Option<&ScopeHandle> {
109 self.stack.iter().find(|handle| handle.uuid == *uuid)
110 }
111
112 /// Remove the current top scope if it matches `uuid`.
113 ///
114 /// # Parameters
115 /// - `uuid`: UUID of the scope expected to be at the top of the stack.
116 ///
117 /// # Returns
118 /// A [`Result`] containing the removed [`ScopeHandle`].
119 ///
120 /// # Errors
121 /// Returns [`FlowError::InvalidArgument`] when the scope exists but is not
122 /// the current top of the stack or when the caller attempts to remove the
123 /// implicit root scope. Returns [`FlowError::NotFound`] when the UUID is
124 /// not present on the stack.
125 pub fn remove(&mut self, uuid: &Uuid) -> Result<ScopeHandle> {
126 let top = self
127 .stack
128 .last()
129 .expect("scope stack should never be empty");
130 if top.uuid == *uuid {
131 if self.stack.len() == 1 {
132 return Err(FlowError::InvalidArgument(
133 "root scope cannot be removed".into(),
134 ));
135 }
136 self.scope_registries.remove(uuid);
137 return Ok(self
138 .stack
139 .pop()
140 .expect("scope stack should contain a removable top scope"));
141 }
142
143 if self.stack.iter().any(|handle| handle.uuid == *uuid) {
144 return Err(FlowError::InvalidArgument(
145 "scope handle is not at the top of the stack".into(),
146 ));
147 }
148
149 Err(FlowError::NotFound("scope handle not found".into()))
150 }
151
152 /// Get or create the scope-local registries for an active scope.
153 ///
154 /// # Parameters
155 /// - `uuid`: UUID of an active scope on this stack.
156 ///
157 /// # Returns
158 /// `Some(&mut ScopeLocalRegistries)` when the scope is active and `None`
159 /// otherwise.
160 ///
161 /// # Notes
162 /// When the scope is active but has no registries yet, this function
163 /// creates an empty scope-local registry set first.
164 pub fn local_registries_mut(&mut self, uuid: &Uuid) -> Option<&mut ScopeLocalRegistries> {
165 if !self.stack.iter().any(|handle| handle.uuid == *uuid) {
166 return None;
167 }
168 Some(self.scope_registries.entry(*uuid).or_default())
169 }
170
171 /// Collect one registry field from every active scope that owns it.
172 ///
173 /// # Parameters
174 /// - `field`: Projection function selecting the registry field to collect
175 /// from each scope-local registry.
176 ///
177 /// # Returns
178 /// A vector of registry references ordered from root toward the current
179 /// top-most scope.
180 pub fn collect_scope_local_registries<'a, T>(
181 &'a self,
182 field: impl Fn(&'a ScopeLocalRegistries) -> &'a SortedRegistry<T>,
183 ) -> Vec<&'a SortedRegistry<T>> {
184 self.stack
185 .iter()
186 .filter_map(|handle| self.scope_registries.get(&handle.uuid))
187 .map(field)
188 .collect()
189 }
190
191 /// Collect all scope-local subscribers visible from the active stack.
192 ///
193 /// # Returns
194 /// A vector of subscribers collected from each active scope that owns
195 /// scope-local registries.
196 pub fn collect_scope_local_subscribers(&self) -> Vec<EventSubscriberFn> {
197 self.stack
198 .iter()
199 .filter_map(|handle| self.scope_registries.get(&handle.uuid))
200 .flat_map(|registries| registries.event_subscribers.values().cloned())
201 .collect()
202 }
203
204 /// Return the scope-local registries for `uuid` without creating them.
205 ///
206 /// # Parameters
207 /// - `uuid`: UUID of the scope whose registries should be borrowed.
208 ///
209 /// # Returns
210 /// `Some(&ScopeLocalRegistries)` when registries already exist for that
211 /// scope and `None` otherwise.
212 pub fn scope_registries_get(&self, uuid: &Uuid) -> Option<&ScopeLocalRegistries> {
213 self.scope_registries.get(uuid)
214 }
215}
216
217impl std::fmt::Debug for ScopeStack {
218 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
219 f.debug_struct("ScopeStack")
220 .field("stack", &self.stack)
221 .field("scope_registries_count", &self.scope_registries.len())
222 .finish()
223 }
224}
225
226impl Default for ScopeStack {
227 fn default() -> Self {
228 Self::new()
229 }
230}
231
232/// Shared handle type for the runtime scope stack.
233///
234/// The runtime stores the active [`ScopeStack`] behind an [`Arc`] and [`RwLock`]
235/// so bindings can propagate it across execution contexts while still allowing
236/// concurrent readers.
237pub type ScopeStackHandle = Arc<RwLock<ScopeStack>>;
238
239/// Create a new scope stack handle with an implicit root scope.
240///
241/// The returned handle wraps a freshly initialized [`ScopeStack`] inside an
242/// [`Arc`] and [`RwLock`] so it can be shared across async tasks or threads.
243///
244/// # Returns
245/// A new [`ScopeStackHandle`] containing exactly one implicit root scope.
246///
247/// # Notes
248/// The root scope is always present and cannot be removed.
249pub fn create_scope_stack() -> ScopeStackHandle {
250 Arc::new(RwLock::new(ScopeStack::new()))
251}
252
253tokio::task_local! {
254 /// Task-local scope stack handle used by async execution contexts.
255 pub static TASK_SCOPE_STACK: ScopeStackHandle;
256}
257
258thread_local! {
259 /// Thread-local fallback scope stack for non-task contexts.
260 static THREAD_SCOPE_STACK: RefCell<ScopeStackHandle> = RefCell::new(create_scope_stack());
261 /// Whether the current thread explicitly owns a scope stack.
262 static THREAD_SCOPE_STACK_EXPLICIT: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };
263}
264
265/// Return the scope stack visible to the current execution context.
266///
267/// This resolves task-local scope state first and otherwise falls back to the
268/// current thread-local scope stack handle.
269///
270/// # Returns
271/// The active [`ScopeStackHandle`] for the current async task or thread.
272///
273/// # Notes
274/// When no explicit thread-local stack has been installed yet, the default
275/// per-thread root-only stack is returned.
276pub fn current_scope_stack() -> ScopeStackHandle {
277 TASK_SCOPE_STACK
278 .try_with(|stack| stack.clone())
279 .unwrap_or_else(|_| THREAD_SCOPE_STACK.with(|stack| stack.borrow().clone()))
280}
281
282/// Install an explicit scope stack for the current thread.
283///
284/// This replaces the thread-local scope stack handle and marks the current
285/// thread as explicitly scope-aware for later propagation checks.
286///
287/// # Parameters
288/// - `handle`: Scope stack handle to install for the current thread.
289///
290/// # Returns
291/// `()`.
292///
293/// # Notes
294/// Use this when propagating an existing scope stack into worker threads.
295pub fn set_thread_scope_stack(handle: ScopeStackHandle) {
296 THREAD_SCOPE_STACK.with(|stack| *stack.borrow_mut() = handle);
297 THREAD_SCOPE_STACK_EXPLICIT.with(|flag| flag.set(true));
298}
299
300/// Synchronize the thread-local scope stack without marking it explicit.
301///
302/// This updates the thread-local slot used by native runtime code while
303/// preserving whether the thread was explicitly marked as owning a scope stack.
304///
305/// # Parameters
306/// - `handle`: Scope stack handle to synchronize into thread-local storage.
307///
308/// # Returns
309/// `()`.
310///
311/// # Notes
312/// Python bindings use this to mirror `ContextVar` state into Rust without
313/// forcing `scope_stack_active()` to become `true` for the thread.
314pub fn sync_thread_scope_stack(handle: ScopeStackHandle) {
315 THREAD_SCOPE_STACK.with(|stack| *stack.borrow_mut() = handle);
316}
317
318/// Report whether the current context has an explicitly active scope stack.
319///
320/// This checks task-local state first and otherwise falls back to the
321/// thread-local explicit flag.
322///
323/// # Returns
324/// `true` when the current async task or thread already owns an active scope
325/// stack and `false` otherwise.
326///
327/// # Notes
328/// A synchronized thread-local stack does not count as explicit unless it was
329/// installed through [`set_thread_scope_stack`].
330pub fn scope_stack_active() -> bool {
331 TASK_SCOPE_STACK
332 .try_with(|_| true)
333 .unwrap_or_else(|_| THREAD_SCOPE_STACK_EXPLICIT.with(|flag| flag.get()))
334}
335
336/// Capture the current scope stack handle for use in another thread.
337///
338/// This returns the handle currently visible to the caller so it can be passed
339/// into [`set_thread_scope_stack`] elsewhere.
340///
341/// # Returns
342/// A [`Result`] containing the active [`ScopeStackHandle`].
343///
344/// # Errors
345/// Returns an error when the current context does not yet own an active scope
346/// stack.
347///
348/// # Notes
349/// The returned handle is shared; it does not clone the underlying stack.
350pub fn propagate_scope_to_thread() -> Result<ScopeStackHandle> {
351 if !scope_stack_active() {
352 return Err(FlowError::Internal(
353 "no active scope stack in current context; call create_scope_stack() and set_thread_scope_stack() first"
354 .into(),
355 ));
356 }
357 Ok(current_scope_stack())
358}
359
360/// Clone the current top-most scope handle from the active stack.
361///
362/// # Returns
363/// A cloned [`ScopeHandle`] representing the current active scope.
364pub fn task_scope_top() -> ScopeHandle {
365 let stack = current_scope_stack();
366 let guard = stack.read().expect("scope stack lock poisoned");
367 guard.top().clone()
368}
369
370/// Push a scope handle onto the active stack.
371///
372/// # Parameters
373/// - `handle`: Scope handle to push onto the current execution context's stack.
374pub fn task_scope_push(handle: ScopeHandle) {
375 let stack = current_scope_stack();
376 let mut guard = stack.write().expect("scope stack lock poisoned");
377 guard.push(handle);
378}
379
380/// Remove a scope handle from the active stack.
381///
382/// # Parameters
383/// - `uuid`: UUID of the scope expected to be at the top of the active stack.
384///
385/// # Returns
386/// A [`Result`] containing the removed [`ScopeHandle`].
387///
388/// # Errors
389/// Propagates the same errors returned by [`ScopeStack::remove`].
390pub fn task_scope_remove(uuid: &Uuid) -> Result<ScopeHandle> {
391 let stack = current_scope_stack();
392 let mut guard = stack.write().expect("scope stack lock poisoned");
393 guard.remove(uuid)
394}