Skip to main content

nemo_flow/api/
scope.rs

1// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::api::event::{BaseEvent, MarkEvent};
5use crate::api::runtime::NemoFlowContextState;
6use crate::api::runtime::global_context;
7use crate::api::runtime::{
8    current_scope_stack, task_scope_push, task_scope_remove, task_scope_top,
9};
10use crate::api::shared::{ensure_runtime_owner, resolve_parent_uuid, snapshot_event_subscribers};
11use crate::error::{FlowError, Result};
12use crate::json::Json;
13use bitflags::bitflags;
14use chrono::{DateTime, Utc};
15use serde::{Deserialize, Serialize};
16use typed_builder::TypedBuilder;
17use uuid::Uuid;
18
19use crate::api::llm::LlmAttributes;
20use crate::api::tool::ToolAttributes;
21
22bitflags! {
23    /// Bitflags that modify scope behavior and observability.
24    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
25    pub struct ScopeAttributes: u32 {
26        /// Marks the scope as running in parallel with sibling work.
27        const PARALLEL    = 0b01;
28        /// Marks the scope as safe to move across execution contexts.
29        const RELOCATABLE = 0b10;
30    }
31}
32
33/// Semantic category attached to a scope lifecycle span.
34#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
35#[serde(rename_all = "lowercase")]
36pub enum ScopeType {
37    /// A top-level agent or workflow scope.
38    Agent,
39    /// A generic function or application step.
40    Function,
41    /// A tool lifecycle scope.
42    Tool,
43    /// An LLM lifecycle scope.
44    Llm,
45    /// A retrieval step such as document search.
46    Retriever,
47    /// An embedding generation step.
48    Embedder,
49    /// A reranking step.
50    Reranker,
51    /// A guardrail or validation step.
52    Guardrail,
53    /// An evaluation or scoring step.
54    Evaluator,
55    /// A caller-defined custom scope category.
56    Custom,
57    /// A fallback for unknown or unsupported scope categories.
58    Unknown,
59}
60
61impl ScopeType {
62    /// Return the stable lowercase string form used for encoded scope types.
63    pub const fn as_str(self) -> &'static str {
64        match self {
65            Self::Agent => "agent",
66            Self::Function => "function",
67            Self::Tool => "tool",
68            Self::Llm => "llm",
69            Self::Retriever => "retriever",
70            Self::Embedder => "embedder",
71            Self::Reranker => "reranker",
72            Self::Guardrail => "guardrail",
73            Self::Evaluator => "evaluator",
74            Self::Custom => "custom",
75            Self::Unknown => "unknown",
76        }
77    }
78}
79
80/// Attribute bitflags attached to a concrete handle kind.
81#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
82pub enum HandleAttributes {
83    /// Scope-specific attributes.
84    Scope(ScopeAttributes),
85    /// Tool-specific attributes.
86    Tool(ToolAttributes),
87    /// LLM-specific attributes.
88    Llm(LlmAttributes),
89}
90
91/// Runtime-owned handle identifying an active or completed scope.
92#[derive(Debug, Clone, Serialize, Deserialize, TypedBuilder)]
93#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
94pub struct ScopeHandle {
95    /// Unique scope identifier.
96    #[builder(default = Uuid::now_v7())]
97    pub uuid: Uuid,
98    /// Timestamp captured when the scope handle was created.
99    #[builder(default = Utc::now())]
100    pub started_at: DateTime<Utc>,
101    /// Semantic category of the scope.
102    pub scope_type: ScopeType,
103    /// Human-readable scope name.
104    #[builder(setter(into))]
105    pub name: String,
106    /// Optional application payload stored on the handle.
107    #[builder(default)]
108    pub data: Option<Json>,
109    /// Optional metadata attached to the scope.
110    #[builder(default)]
111    pub metadata: Option<Json>,
112    /// Scope behavior flags.
113    #[builder(default = ScopeAttributes::empty())]
114    pub attributes: ScopeAttributes,
115    /// UUID of the parent scope, if any.
116    #[builder(default)]
117    pub parent_uuid: Option<Uuid>,
118}
119
120/// Builder parameters for [`push_scope`].
121#[derive(TypedBuilder)]
122#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
123pub struct PushScopeParams<'a> {
124    /// Human-readable scope name recorded on emitted lifecycle events.
125    pub name: &'a str,
126    /// Semantic category for the new scope.
127    pub scope_type: ScopeType,
128    /// Optional explicit parent scope.
129    #[builder(default)]
130    pub parent: Option<&'a ScopeHandle>,
131    /// Scope attribute bitflags applied to the new scope.
132    #[builder(default = ScopeAttributes::empty())]
133    pub attributes: ScopeAttributes,
134    /// Optional application payload stored on the scope handle.
135    #[builder(default)]
136    pub data: Option<Json>,
137    /// Optional JSON metadata recorded on the emitted start event.
138    #[builder(default)]
139    pub metadata: Option<Json>,
140    /// Optional JSON payload exported as the scope start event data.
141    #[builder(default)]
142    pub input: Option<Json>,
143    /// Optional timestamp recorded on the emitted start event.
144    #[builder(default)]
145    pub timestamp: Option<DateTime<Utc>>,
146}
147
148/// Builder parameters for [`NemoFlowContextState::create_scope_handle`].
149#[derive(Debug, Clone, TypedBuilder)]
150#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
151pub struct CreateScopeHandleParams<'a> {
152    /// Human-readable scope name.
153    pub name: &'a str,
154    /// Optional parent scope UUID.
155    #[builder(default)]
156    pub parent_uuid: Option<Uuid>,
157    /// Semantic category of the scope.
158    pub scope_type: ScopeType,
159    /// Scope attribute bitflags.
160    #[builder(default = ScopeAttributes::empty())]
161    pub attributes: ScopeAttributes,
162    /// Optional application payload stored on the handle.
163    #[builder(default)]
164    pub data: Option<Json>,
165    /// Optional metadata stored on the handle.
166    #[builder(default)]
167    pub metadata: Option<Json>,
168    /// Optional timestamp captured as the handle start time and reused by the
169    /// emitted start event. When omitted, the current UTC time is used.
170    #[builder(default)]
171    pub timestamp: Option<DateTime<Utc>>,
172}
173
174/// Builder parameters for [`NemoFlowContextState::build_scope_end_event`].
175#[derive(Debug, Clone, TypedBuilder)]
176#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
177pub struct EndScopeHandleParams<'a> {
178    /// Scope handle to serialize into the emitted end event.
179    pub handle: &'a ScopeHandle,
180    /// Optional JSON payload exported as the semantic scope output.
181    #[builder(default)]
182    pub data: Option<Json>,
183    /// Optional timestamp recorded on the emitted end event. When omitted, the
184    /// runtime records the current UTC time, or one microsecond after the
185    /// handle start time if the current time is not later.
186    #[builder(default)]
187    pub timestamp: Option<DateTime<Utc>>,
188}
189
190/// Builder parameters for [`pop_scope`].
191#[derive(TypedBuilder)]
192#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
193pub struct PopScopeParams<'a> {
194    /// UUID of the scope that should be popped.
195    pub handle_uuid: &'a Uuid,
196    /// Optional JSON payload exported as the semantic scope output.
197    #[builder(default)]
198    pub output: Option<Json>,
199    /// Optional timestamp recorded on the emitted end event. When omitted, the
200    /// runtime records the current UTC time, or one microsecond after the
201    /// handle start time if the current time is not later.
202    #[builder(default)]
203    pub timestamp: Option<DateTime<Utc>>,
204}
205
206/// Builder parameters for [`event`].
207#[derive(TypedBuilder)]
208#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
209pub struct EmitMarkEventParams<'a> {
210    /// Event name to emit.
211    pub name: &'a str,
212    /// Optional explicit parent scope.
213    #[builder(default)]
214    pub parent: Option<&'a ScopeHandle>,
215    /// Optional JSON payload recorded as the mark data.
216    #[builder(default)]
217    pub data: Option<Json>,
218    /// Optional JSON metadata recorded on the emitted event.
219    #[builder(default)]
220    pub metadata: Option<Json>,
221    /// Optional timestamp recorded on the emitted mark event. When omitted, the
222    /// current UTC time is used.
223    #[builder(default)]
224    pub timestamp: Option<DateTime<Utc>>,
225}
226
227/// Return the current scope at the top of the active stack.
228///
229/// This reads the task-local or thread-local scope stack without mutating it
230/// and returns a clone of the current top-most [`ScopeHandle`].
231///
232/// # Returns
233/// A [`Result`] containing the current [`ScopeHandle`] when the runtime owner
234/// check succeeds.
235///
236/// # Errors
237/// Returns an error when the current binding has not initialized the shared
238/// runtime ownership correctly.
239pub fn get_handle() -> Result<ScopeHandle> {
240    ensure_runtime_owner()?;
241    Ok(task_scope_top())
242}
243
244/// Push a new scope onto the active scope stack.
245///
246/// This creates a new [`ScopeHandle`], emits a scope-start event to global and
247/// scope-local subscribers, and makes the new scope the current top of stack.
248///
249/// # Parameters
250/// - `name`: Human-readable scope name recorded on emitted lifecycle events.
251/// - `scope_type`: Semantic category for the new scope.
252/// - `parent`: Optional explicit parent scope. When `None`, the current top of
253///   stack is used as the parent.
254/// - `attributes`: Bitflags that modify scope behavior and observability.
255/// - `data`: Optional application payload stored on the returned handle.
256/// - `metadata`: Optional JSON metadata recorded on the emitted start event.
257/// - `input`: Optional JSON payload exported as the ATOF data payload.
258/// - `timestamp`: Optional timestamp recorded as the handle start time and on
259///   the emitted start event. When `None`, the current UTC time is used.
260///
261/// # Returns
262/// A [`Result`] containing the newly created [`ScopeHandle`].
263///
264/// # Errors
265/// Returns an error when the runtime owner check fails or when internal state
266/// cannot be read safely.
267///
268/// # Notes
269/// Scope-local subscribers attached to ancestor scopes observe the emitted
270/// start event before the function returns.
271pub fn push_scope(params: PushScopeParams<'_>) -> Result<ScopeHandle> {
272    ensure_runtime_owner()?;
273    let parent_uuid = resolve_parent_uuid(params.parent);
274    let (handle, event, subscribers) = {
275        let scope_stack = current_scope_stack();
276        let scope_guard = scope_stack.read().expect("scope stack lock poisoned");
277        let scope_subscribers = scope_guard.collect_scope_local_subscribers();
278        let subscribers = snapshot_event_subscribers(scope_subscribers)?;
279        let context = global_context();
280        let state = context
281            .read()
282            .map_err(|error| FlowError::Internal(error.to_string()))?;
283        let handle_params = CreateScopeHandleParams::builder()
284            .name(params.name)
285            .parent_uuid_opt(parent_uuid)
286            .scope_type(params.scope_type)
287            .attributes(params.attributes)
288            .data_opt(params.data)
289            .metadata_opt(params.metadata)
290            .timestamp_opt(params.timestamp)
291            .build();
292        let handle = state.create_scope_handle(handle_params);
293        let event = state.build_scope_start_event(&handle, params.input);
294        (handle, event, subscribers)
295    };
296    task_scope_push(handle.clone());
297    NemoFlowContextState::emit_event(&event, &subscribers);
298    Ok(handle)
299}
300
301/// Pop the current scope from the active scope stack.
302///
303/// This emits a scope-end event for the target scope and removes any
304/// scope-local registrations owned by that scope.
305///
306/// # Parameters
307/// - `handle_uuid`: UUID of the scope that should be popped.
308/// - `output`: Optional JSON payload exported as the semantic scope output.
309/// - `timestamp`: Optional timestamp recorded on the emitted end event. When
310///   `None`, the runtime uses the current UTC time, or one microsecond after
311///   the handle start time if the current time is not later.
312///
313/// # Returns
314/// A [`Result`] that is `Ok(())` when the scope was popped successfully.
315///
316/// # Errors
317/// Returns [`FlowError::InvalidArgument`] when the target scope exists but is
318/// not the current top of stack, and [`FlowError::NotFound`] when the UUID is
319/// unknown to the active stack.
320///
321/// # Notes
322/// The implicit root scope cannot be removed.
323pub fn pop_scope(params: PopScopeParams<'_>) -> Result<()> {
324    ensure_runtime_owner()?;
325    let scope_stack = current_scope_stack();
326    let (scope, event, subscribers) = {
327        let scope_guard = scope_stack.read().expect("scope stack lock poisoned");
328        let top = scope_guard.top();
329        if top.uuid != *params.handle_uuid {
330            if scope_guard.find(params.handle_uuid).is_some() {
331                return Err(FlowError::InvalidArgument(
332                    "scope handle is not at the top of the stack".into(),
333                ));
334            }
335            return Err(FlowError::NotFound("scope handle not found".into()));
336        }
337        let scope_subscribers = scope_guard.collect_scope_local_subscribers();
338        let subscribers = snapshot_event_subscribers(scope_subscribers)?;
339        let scope = top.clone();
340        let context = global_context();
341        let state = context
342            .read()
343            .map_err(|error| FlowError::Internal(error.to_string()))?;
344        let event = state.build_scope_end_event(
345            EndScopeHandleParams::builder()
346                .handle(&scope)
347                .data_opt(params.output)
348                .timestamp_opt(params.timestamp)
349                .build(),
350        );
351        (scope, event, subscribers)
352    };
353    let removed = task_scope_remove(params.handle_uuid)?;
354    debug_assert_eq!(removed.uuid, scope.uuid);
355    NemoFlowContextState::emit_event(&event, &subscribers);
356    Ok(())
357}
358
359/// Emit a standalone mark event under the current or provided scope.
360///
361/// This creates a point-in-time lifecycle event without pushing or popping a
362/// new scope.
363///
364/// # Parameters
365/// - `name`: Event name to emit.
366/// - `parent`: Optional explicit parent scope. When `None`, the current top of
367///   stack is used.
368/// - `data`: Optional JSON payload recorded on the emitted event.
369/// - `metadata`: Optional JSON metadata recorded on the emitted event.
370/// - `timestamp`: Optional timestamp recorded on the emitted mark event. When
371///   `None`, the current UTC time is used.
372///
373/// # Returns
374/// A [`Result`] that is `Ok(())` after the event has been emitted.
375///
376/// # Errors
377/// Returns an error when the runtime owner check fails or when internal state
378/// cannot be read safely.
379///
380/// # Notes
381/// Scope-local subscribers attached to ancestor scopes observe the emitted
382/// mark event just like scope, tool, and LLM lifecycle events.
383pub fn event(params: EmitMarkEventParams<'_>) -> Result<()> {
384    ensure_runtime_owner()?;
385    let parent_uuid = resolve_parent_uuid(params.parent);
386    let (event, subscribers) = {
387        let scope_stack = current_scope_stack();
388        let scope_guard = scope_stack.read().expect("scope stack lock poisoned");
389        let scope_subscribers = scope_guard.collect_scope_local_subscribers();
390        let subscribers = snapshot_event_subscribers(scope_subscribers)?;
391        let context = global_context();
392        let state = context
393            .read()
394            .map_err(|error| FlowError::Internal(error.to_string()))?;
395        let event = state.create_event(MarkEvent::new(
396            BaseEvent::builder()
397                .name(params.name)
398                .parent_uuid_opt(parent_uuid)
399                .timestamp(params.timestamp.unwrap_or_else(Utc::now))
400                .data_opt(params.data)
401                .metadata_opt(params.metadata)
402                .build(),
403            None,
404            None,
405        ));
406        (event, subscribers)
407    };
408    NemoFlowContextState::emit_event(&event, &subscribers);
409    Ok(())
410}