nemo_flow/api/runtime/scope_stack.rs
1// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Scope stack storage and propagation helpers.
5//!
6//! The runtime tracks the current scope hierarchy through a shared
7//! [`ScopeStack`] stored in task-local or thread-local state. Advanced callers
8//! can use this module to inspect the active scope chain, attach scope-local
9//! middleware, or propagate scope context into worker threads.
10
11use std::cell::RefCell;
12use std::sync::{Arc, RwLock};
13
14use uuid::Uuid;
15
16use crate::api::runtime::callbacks::EventSubscriberFn;
17use crate::api::scope::{ScopeHandle, ScopeType};
18use crate::context::registries::ScopeLocalRegistries;
19use crate::error::{FlowError, Result};
20use crate::registry::SortedRegistry;
21
/// Mutable stack of active scopes plus their scope-local registries.
///
/// The stack always contains an implicit root scope. Additional scopes are
/// pushed as the public API opens lifecycle spans and removed when those spans
/// close.
pub struct ScopeStack {
    /// Active scopes ordered from the implicit root (first element) to the
    /// current top-most scope (last element).
    stack: Vec<ScopeHandle>,
    /// Scope-local registries keyed by scope UUID. An entry is created on
    /// demand by `local_registries_mut` and discarded when the owning scope is
    /// removed from the stack.
    scope_registries: std::collections::HashMap<Uuid, ScopeLocalRegistries>,
}
31
32impl ScopeStack {
33 /// Create a new scope stack containing only the implicit root scope.
34 ///
35 /// # Returns
36 /// A [`ScopeStack`] initialized with a single root scope and no
37 /// scope-local registries.
38 pub fn new() -> Self {
39 let root = ScopeHandle::builder()
40 .name("root")
41 .scope_type(ScopeType::Agent)
42 .build();
43 Self {
44 stack: vec![root],
45 scope_registries: std::collections::HashMap::new(),
46 }
47 }
48
49 /// Push a scope handle onto the top of the stack.
50 ///
51 /// # Parameters
52 /// - `handle`: Scope handle to make the new top-most active scope.
53 pub fn push(&mut self, handle: ScopeHandle) {
54 self.stack.push(handle);
55 }
56
57 /// Return the current top-most scope handle.
58 ///
59 /// # Returns
60 /// A shared reference to the active scope at the top of the stack.
61 ///
62 /// # Notes
63 /// This function never returns `None` because the implicit root scope is
64 /// always present.
65 pub fn top(&self) -> &ScopeHandle {
66 self.stack
67 .last()
68 .expect("scope stack should never be empty")
69 }
70
71 /// Return the current top-most scope handle mutably.
72 ///
73 /// # Returns
74 /// A mutable reference to the active scope at the top of the stack.
75 pub fn top_mut(&mut self) -> &mut ScopeHandle {
76 self.stack
77 .last_mut()
78 .expect("scope stack should never be empty")
79 }
80
81 /// Return the UUID of the implicit root scope.
82 ///
83 /// # Returns
84 /// The stable UUID of the root scope stored at the bottom of the stack.
85 pub fn root_uuid(&self) -> Uuid {
86 self.stack
87 .first()
88 .expect("scope stack should never be empty")
89 .uuid
90 }
91
92 /// Return the full ordered stack of scope handles.
93 ///
94 /// # Returns
95 /// A slice of scopes ordered from root to the current top-most scope.
96 pub fn scopes(&self) -> &[ScopeHandle] {
97 &self.stack
98 }
99
100 /// Find a scope handle by UUID.
101 ///
102 /// # Parameters
103 /// - `uuid`: UUID of the scope to search for.
104 ///
105 /// # Returns
106 /// `Some(&ScopeHandle)` when the scope is active on this stack and `None`
107 /// otherwise.
108 pub fn find(&self, uuid: &Uuid) -> Option<&ScopeHandle> {
109 self.stack.iter().find(|handle| handle.uuid == *uuid)
110 }
111
112 /// Remove the current top scope if it matches `uuid`.
113 ///
114 /// # Parameters
115 /// - `uuid`: UUID of the scope expected to be at the top of the stack.
116 ///
117 /// # Returns
118 /// A [`Result`] containing the removed [`ScopeHandle`].
119 ///
120 /// # Errors
121 /// Returns [`FlowError::InvalidArgument`] when the scope exists but is not
122 /// the current top of the stack or when the caller attempts to remove the
123 /// implicit root scope. Returns [`FlowError::NotFound`] when the UUID is
124 /// not present on the stack.
125 pub fn remove(&mut self, uuid: &Uuid) -> Result<ScopeHandle> {
126 let top = self
127 .stack
128 .last()
129 .expect("scope stack should never be empty");
130 if top.uuid == *uuid {
131 if self.stack.len() == 1 {
132 return Err(FlowError::InvalidArgument(
133 "root scope cannot be removed".into(),
134 ));
135 }
136 self.scope_registries.remove(uuid);
137 return Ok(self
138 .stack
139 .pop()
140 .expect("scope stack should contain a removable top scope"));
141 }
142
143 if self.stack.iter().any(|handle| handle.uuid == *uuid) {
144 return Err(FlowError::InvalidArgument(
145 "scope handle is not at the top of the stack".into(),
146 ));
147 }
148
149 Err(FlowError::NotFound("scope handle not found".into()))
150 }
151
152 /// Get or create the scope-local registries for an active scope.
153 ///
154 /// # Parameters
155 /// - `uuid`: UUID of an active scope on this stack.
156 ///
157 /// # Returns
158 /// `Some(&mut ScopeLocalRegistries)` when the scope is active and `None`
159 /// otherwise.
160 ///
161 /// # Notes
162 /// When the scope is active but has no registries yet, this function
163 /// creates an empty scope-local registry set first.
164 pub fn local_registries_mut(&mut self, uuid: &Uuid) -> Option<&mut ScopeLocalRegistries> {
165 if !self.stack.iter().any(|handle| handle.uuid == *uuid) {
166 return None;
167 }
168 Some(self.scope_registries.entry(*uuid).or_default())
169 }
170
171 /// Collect one registry field from every active scope that owns it.
172 ///
173 /// # Parameters
174 /// - `field`: Projection function selecting the registry field to collect
175 /// from each scope-local registry.
176 ///
177 /// # Returns
178 /// A vector of registry references ordered from root toward the current
179 /// top-most scope.
180 pub fn collect_scope_local_registries<'a, T>(
181 &'a self,
182 field: impl Fn(&'a ScopeLocalRegistries) -> &'a SortedRegistry<T>,
183 ) -> Vec<&'a SortedRegistry<T>> {
184 self.stack
185 .iter()
186 .filter_map(|handle| self.scope_registries.get(&handle.uuid))
187 .map(field)
188 .collect()
189 }
190
191 /// Collect all scope-local subscribers visible from the active stack.
192 ///
193 /// # Returns
194 /// A vector of subscribers collected from each active scope that owns
195 /// scope-local registries.
196 pub fn collect_scope_local_subscribers(&self) -> Vec<EventSubscriberFn> {
197 self.stack
198 .iter()
199 .filter_map(|handle| self.scope_registries.get(&handle.uuid))
200 .flat_map(|registries| registries.event_subscribers.values().cloned())
201 .collect()
202 }
203
204 /// Return the scope-local registries for `uuid` without creating them.
205 ///
206 /// # Parameters
207 /// - `uuid`: UUID of the scope whose registries should be borrowed.
208 ///
209 /// # Returns
210 /// `Some(&ScopeLocalRegistries)` when registries already exist for that
211 /// scope and `None` otherwise.
212 pub fn scope_registries_get(&self, uuid: &Uuid) -> Option<&ScopeLocalRegistries> {
213 self.scope_registries.get(uuid)
214 }
215}
216
217impl std::fmt::Debug for ScopeStack {
218 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
219 f.debug_struct("ScopeStack")
220 .field("stack", &self.stack)
221 .field("scope_registries_count", &self.scope_registries.len())
222 .finish()
223 }
224}
225
226impl Default for ScopeStack {
227 fn default() -> Self {
228 Self::new()
229 }
230}
231
/// Shared handle type for the runtime scope stack.
///
/// The runtime stores the active [`ScopeStack`] behind an [`Arc`] and [`RwLock`]
/// so bindings can propagate it across execution contexts while still allowing
/// concurrent readers. Cloning the handle shares the same underlying stack; it
/// does not copy the stack itself.
pub type ScopeStackHandle = Arc<RwLock<ScopeStack>>;
238
239/// Captured thread-local scope stack binding.
240///
241/// This preserves both the visible scope stack handle and whether it was
242/// explicitly installed on the current thread.
243#[derive(Clone)]
244pub struct ThreadScopeStackBinding {
245 stack: ScopeStackHandle,
246 explicit: bool,
247}
248
249/// Create a new scope stack handle with an implicit root scope.
250///
251/// The returned handle wraps a freshly initialized [`ScopeStack`] inside an
252/// [`Arc`] and [`RwLock`] so it can be shared across async tasks or threads.
253///
254/// # Returns
255/// A new [`ScopeStackHandle`] containing exactly one implicit root scope.
256///
257/// # Notes
258/// The root scope is always present and cannot be removed.
259pub fn create_scope_stack() -> ScopeStackHandle {
260 Arc::new(RwLock::new(ScopeStack::new()))
261}
262
tokio::task_local! {
    /// Task-local scope stack handle used by async execution contexts.
    ///
    /// When a value is set for the running task, it takes precedence over the
    /// thread-local fallback in `current_scope_stack` and makes
    /// `scope_stack_active` report `true`.
    pub static TASK_SCOPE_STACK: ScopeStackHandle;
}
267
thread_local! {
    /// Thread-local fallback scope stack for non-task contexts.
    ///
    /// Each thread starts with its own fresh root-only stack from
    /// `create_scope_stack()`.
    static THREAD_SCOPE_STACK: RefCell<ScopeStackHandle> = RefCell::new(create_scope_stack());
    /// Whether the current thread explicitly owns a scope stack.
    ///
    /// Starts as `false`; set to `true` by `set_thread_scope_stack` and
    /// overwritten with the captured value by `restore_thread_scope_stack`.
    static THREAD_SCOPE_STACK_EXPLICIT: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };
}
274
275/// Return the scope stack visible to the current execution context.
276///
277/// This resolves task-local scope state first and otherwise falls back to the
278/// current thread-local scope stack handle.
279///
280/// # Returns
281/// The active [`ScopeStackHandle`] for the current async task or thread.
282///
283/// # Notes
284/// When no explicit thread-local stack has been installed yet, the default
285/// per-thread root-only stack is returned.
286pub fn current_scope_stack() -> ScopeStackHandle {
287 TASK_SCOPE_STACK
288 .try_with(|stack| stack.clone())
289 .unwrap_or_else(|_| THREAD_SCOPE_STACK.with(|stack| stack.borrow().clone()))
290}
291
292/// Install an explicit scope stack for the current thread.
293///
294/// This replaces the thread-local scope stack handle and marks the current
295/// thread as explicitly scope-aware for later propagation checks.
296///
297/// # Parameters
298/// - `handle`: Scope stack handle to install for the current thread.
299///
300/// # Returns
301/// `()`.
302///
303/// # Notes
304/// Use this when propagating an existing scope stack into worker threads.
305pub fn set_thread_scope_stack(handle: ScopeStackHandle) {
306 THREAD_SCOPE_STACK.with(|stack| *stack.borrow_mut() = handle);
307 THREAD_SCOPE_STACK_EXPLICIT.with(|flag| flag.set(true));
308}
309
310/// Capture the current thread-local scope stack binding.
311///
312/// This is intended for foreign runtimes that temporarily bind a scope stack to
313/// an OS thread and need to restore the exact previous state before releasing
314/// that thread back to their scheduler.
315pub fn capture_thread_scope_stack() -> ThreadScopeStackBinding {
316 let stack = THREAD_SCOPE_STACK.with(|stack| stack.borrow().clone());
317 let explicit = THREAD_SCOPE_STACK_EXPLICIT.with(|flag| flag.get());
318 ThreadScopeStackBinding { stack, explicit }
319}
320
321/// Restore a previously captured thread-local scope stack binding.
322pub fn restore_thread_scope_stack(binding: ThreadScopeStackBinding) {
323 THREAD_SCOPE_STACK.with(|stack| *stack.borrow_mut() = binding.stack);
324 THREAD_SCOPE_STACK_EXPLICIT.with(|flag| flag.set(binding.explicit));
325}
326
327/// Synchronize the thread-local scope stack without marking it explicit.
328///
329/// This updates the thread-local slot used by native runtime code while
330/// preserving whether the thread was explicitly marked as owning a scope stack.
331///
332/// # Parameters
333/// - `handle`: Scope stack handle to synchronize into thread-local storage.
334///
335/// # Returns
336/// `()`.
337///
338/// # Notes
339/// Python bindings use this to mirror `ContextVar` state into Rust without
340/// forcing `scope_stack_active()` to become `true` for the thread.
341pub fn sync_thread_scope_stack(handle: ScopeStackHandle) {
342 THREAD_SCOPE_STACK.with(|stack| *stack.borrow_mut() = handle);
343}
344
345/// Report whether the current context has an explicitly active scope stack.
346///
347/// This checks task-local state first and otherwise falls back to the
348/// thread-local explicit flag.
349///
350/// # Returns
351/// `true` when the current async task or thread already owns an active scope
352/// stack and `false` otherwise.
353///
354/// # Notes
355/// A synchronized thread-local stack does not count as explicit unless it was
356/// installed through [`set_thread_scope_stack`].
357pub fn scope_stack_active() -> bool {
358 TASK_SCOPE_STACK
359 .try_with(|_| true)
360 .unwrap_or_else(|_| THREAD_SCOPE_STACK_EXPLICIT.with(|flag| flag.get()))
361}
362
363/// Capture the current scope stack handle for use in another thread.
364///
365/// This returns the handle currently visible to the caller so it can be passed
366/// into [`set_thread_scope_stack`] elsewhere.
367///
368/// # Returns
369/// A [`Result`] containing the active [`ScopeStackHandle`].
370///
371/// # Errors
372/// Returns an error when the current context does not yet own an active scope
373/// stack.
374///
375/// # Notes
376/// The returned handle is shared; it does not clone the underlying stack.
377pub fn propagate_scope_to_thread() -> Result<ScopeStackHandle> {
378 if !scope_stack_active() {
379 return Err(FlowError::Internal(
380 "no active scope stack in current context; call create_scope_stack() and set_thread_scope_stack() first"
381 .into(),
382 ));
383 }
384 Ok(current_scope_stack())
385}
386
387/// Clone the current top-most scope handle from the active stack.
388///
389/// # Returns
390/// A cloned [`ScopeHandle`] representing the current active scope.
391pub fn task_scope_top() -> ScopeHandle {
392 let stack = current_scope_stack();
393 let guard = stack.read().expect("scope stack lock poisoned");
394 guard.top().clone()
395}
396
397/// Push a scope handle onto the active stack.
398///
399/// # Parameters
400/// - `handle`: Scope handle to push onto the current execution context's stack.
401pub fn task_scope_push(handle: ScopeHandle) {
402 let stack = current_scope_stack();
403 let mut guard = stack.write().expect("scope stack lock poisoned");
404 guard.push(handle);
405}
406
407/// Remove a scope handle from the active stack.
408///
409/// # Parameters
410/// - `uuid`: UUID of the scope expected to be at the top of the active stack.
411///
412/// # Returns
413/// A [`Result`] containing the removed [`ScopeHandle`].
414///
415/// # Errors
416/// Propagates the same errors returned by [`ScopeStack::remove`].
417pub fn task_scope_remove(uuid: &Uuid) -> Result<ScopeHandle> {
418 let stack = current_scope_stack();
419 let mut guard = stack.write().expect("scope stack lock poisoned");
420 guard.remove(uuid)
421}