Skip to main content

nemo_flow/api/
scope.rs

1// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::api::event::{BaseEvent, MarkEvent};
5use crate::api::runtime::NemoFlowContextState;
6use crate::api::runtime::global_context;
7use crate::api::runtime::{
8    current_scope_stack, task_scope_push, task_scope_remove, task_scope_top,
9};
10use crate::api::shared::{ensure_runtime_owner, resolve_parent_uuid, snapshot_event_subscribers};
11use crate::error::{FlowError, Result};
12use crate::json::Json;
13use bitflags::bitflags;
14use chrono::{DateTime, Utc};
15use serde::{Deserialize, Serialize};
16use typed_builder::TypedBuilder;
17use uuid::Uuid;
18
19use crate::api::llm::LlmAttributes;
20use crate::api::tool::ToolAttributes;
21
22bitflags! {
23    /// Bitflags that modify scope behavior and observability.
24    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
25    pub struct ScopeAttributes: u32 {
26        /// Marks the scope as running in parallel with sibling work.
27        const PARALLEL    = 0b01;
28        /// Marks the scope as safe to move across execution contexts.
29        const RELOCATABLE = 0b10;
30    }
31}
32
33/// Semantic category attached to a scope lifecycle span.
34#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
35#[serde(rename_all = "lowercase")]
36pub enum ScopeType {
37    /// A top-level agent or workflow scope.
38    Agent,
39    /// A generic function or application step.
40    Function,
41    /// A tool lifecycle scope.
42    Tool,
43    /// An LLM lifecycle scope.
44    Llm,
45    /// A retrieval step such as document search.
46    Retriever,
47    /// An embedding generation step.
48    Embedder,
49    /// A reranking step.
50    Reranker,
51    /// A guardrail or validation step.
52    Guardrail,
53    /// An evaluation or scoring step.
54    Evaluator,
55    /// A caller-defined custom scope category.
56    Custom,
57    /// A fallback for unknown or unsupported scope categories.
58    Unknown,
59}
60
61impl ScopeType {
62    /// Return the stable lowercase string form used for encoded scope types.
63    pub const fn as_str(self) -> &'static str {
64        match self {
65            Self::Agent => "agent",
66            Self::Function => "function",
67            Self::Tool => "tool",
68            Self::Llm => "llm",
69            Self::Retriever => "retriever",
70            Self::Embedder => "embedder",
71            Self::Reranker => "reranker",
72            Self::Guardrail => "guardrail",
73            Self::Evaluator => "evaluator",
74            Self::Custom => "custom",
75            Self::Unknown => "unknown",
76        }
77    }
78}
79
80/// Attribute bitflags attached to a concrete handle kind.
81#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
82pub enum HandleAttributes {
83    /// Scope-specific attributes.
84    Scope(ScopeAttributes),
85    /// Tool-specific attributes.
86    Tool(ToolAttributes),
87    /// LLM-specific attributes.
88    Llm(LlmAttributes),
89}
90
91/// Runtime-owned handle identifying an active or completed scope.
92#[derive(Debug, Clone, Serialize, Deserialize, TypedBuilder)]
93#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
94pub struct ScopeHandle {
95    /// Unique scope identifier.
96    #[builder(default = Uuid::now_v7())]
97    pub uuid: Uuid,
98    /// Timestamp captured when the scope handle was created.
99    #[builder(default = Utc::now())]
100    pub started_at: DateTime<Utc>,
101    /// Semantic category of the scope.
102    pub scope_type: ScopeType,
103    /// Human-readable scope name.
104    #[builder(setter(into))]
105    pub name: String,
106    /// Optional application payload stored on the handle.
107    #[builder(default)]
108    pub data: Option<Json>,
109    /// Optional metadata attached to the scope.
110    #[builder(default)]
111    pub metadata: Option<Json>,
112    /// Scope behavior flags.
113    #[builder(default = ScopeAttributes::empty())]
114    pub attributes: ScopeAttributes,
115    /// UUID of the parent scope, if any.
116    #[builder(default)]
117    pub parent_uuid: Option<Uuid>,
118}
119
120/// Builder parameters for [`push_scope`].
121#[derive(TypedBuilder)]
122#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
123pub struct PushScopeParams<'a> {
124    /// Human-readable scope name recorded on emitted lifecycle events.
125    pub name: &'a str,
126    /// Semantic category for the new scope.
127    pub scope_type: ScopeType,
128    /// Optional explicit parent scope.
129    #[builder(default)]
130    pub parent: Option<&'a ScopeHandle>,
131    /// Scope attribute bitflags applied to the new scope.
132    #[builder(default = ScopeAttributes::empty())]
133    pub attributes: ScopeAttributes,
134    /// Optional application payload stored on the scope handle.
135    #[builder(default)]
136    pub data: Option<Json>,
137    /// Optional JSON metadata recorded on the emitted start event.
138    #[builder(default)]
139    pub metadata: Option<Json>,
140    /// Optional JSON payload exported as the scope start event data.
141    #[builder(default)]
142    pub input: Option<Json>,
143    /// Optional timestamp recorded on the emitted start event.
144    #[builder(default)]
145    pub timestamp: Option<DateTime<Utc>>,
146}
147
148/// Builder parameters for [`NemoFlowContextState::create_scope_handle`].
149#[derive(Debug, Clone, TypedBuilder)]
150#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
151pub struct CreateScopeHandleParams<'a> {
152    /// Human-readable scope name.
153    pub name: &'a str,
154    /// Optional parent scope UUID.
155    #[builder(default)]
156    pub parent_uuid: Option<Uuid>,
157    /// Semantic category of the scope.
158    pub scope_type: ScopeType,
159    /// Scope attribute bitflags.
160    #[builder(default = ScopeAttributes::empty())]
161    pub attributes: ScopeAttributes,
162    /// Optional application payload stored on the handle.
163    #[builder(default)]
164    pub data: Option<Json>,
165    /// Optional metadata stored on the handle.
166    #[builder(default)]
167    pub metadata: Option<Json>,
168    /// Optional timestamp captured as the handle start time and reused by the
169    /// emitted start event. When omitted, the current UTC time is used.
170    #[builder(default)]
171    pub timestamp: Option<DateTime<Utc>>,
172}
173
174/// Builder parameters for [`NemoFlowContextState::build_scope_end_event`].
175#[derive(Debug, Clone, TypedBuilder)]
176#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
177pub struct EndScopeHandleParams<'a> {
178    /// Scope handle to serialize into the emitted end event.
179    pub handle: &'a ScopeHandle,
180    /// Optional JSON payload exported as the semantic scope output.
181    #[builder(default)]
182    pub data: Option<Json>,
183    /// Optional timestamp recorded on the emitted end event. When omitted, the
184    /// runtime records the current UTC time, or one microsecond after the
185    /// handle start time if the current time is not later.
186    #[builder(default)]
187    pub timestamp: Option<DateTime<Utc>>,
188}
189
190/// Builder parameters for [`pop_scope`].
191#[derive(TypedBuilder)]
192#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
193pub struct PopScopeParams<'a> {
194    /// UUID of the scope that should be popped.
195    pub handle_uuid: &'a Uuid,
196    /// Optional JSON payload exported as the semantic scope output.
197    #[builder(default)]
198    pub output: Option<Json>,
199    /// Optional timestamp recorded on the emitted end event. When omitted, the
200    /// runtime records the current UTC time, or one microsecond after the
201    /// handle start time if the current time is not later.
202    #[builder(default)]
203    pub timestamp: Option<DateTime<Utc>>,
204}
205
206/// Builder parameters for [`event`].
207#[derive(TypedBuilder)]
208#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
209pub struct EmitMarkEventParams<'a> {
210    /// Event name to emit.
211    pub name: &'a str,
212    /// Optional explicit parent scope.
213    #[builder(default)]
214    pub parent: Option<&'a ScopeHandle>,
215    /// Optional JSON payload recorded as the mark data.
216    #[builder(default)]
217    pub data: Option<Json>,
218    /// Optional JSON metadata recorded on the emitted event.
219    #[builder(default)]
220    pub metadata: Option<Json>,
221    /// Optional timestamp recorded on the emitted mark event. When omitted, the
222    /// current UTC time is used.
223    #[builder(default)]
224    pub timestamp: Option<DateTime<Utc>>,
225}
226
227/// Return the current scope at the top of the active stack.
228///
229/// This reads the task-local or thread-local scope stack without mutating it
230/// and returns a clone of the current top-most [`ScopeHandle`].
231///
232/// # Returns
233/// A [`Result`] containing the current [`ScopeHandle`] when the runtime owner
234/// check succeeds.
235///
236/// # Errors
237/// Returns an error when the current binding has not initialized the shared
238/// runtime ownership correctly.
239pub fn get_handle() -> Result<ScopeHandle> {
240    ensure_runtime_owner()?;
241    Ok(task_scope_top())
242}
243
244/// Push a new scope onto the active scope stack.
245///
246/// This creates a new [`ScopeHandle`], emits a scope-start event to global and
247/// scope-local subscribers, and makes the new scope the current top of stack.
248///
249/// # Parameters
250/// - `name`: Human-readable scope name recorded on emitted lifecycle events.
251/// - `scope_type`: Semantic category for the new scope.
252/// - `parent`: Optional explicit parent scope. When `None`, the current top of
253///   stack is used as the parent.
254/// - `attributes`: Bitflags that modify scope behavior and observability.
255/// - `data`: Optional application payload stored on the returned handle.
256/// - `metadata`: Optional JSON metadata recorded on the emitted start event.
257/// - `input`: Optional JSON payload exported as the Agent Trajectory
258///   Observability Format (ATOF) data payload.
259/// - `timestamp`: Optional timestamp recorded as the handle start time and on
260///   the emitted start event. When `None`, the current UTC time is used.
261///
262/// # Returns
263/// A [`Result`] containing the newly created [`ScopeHandle`].
264///
265/// # Errors
266/// Returns an error when the runtime owner check fails or when internal state
267/// cannot be read safely.
268///
269/// # Notes
270/// Scope-local subscribers attached to ancestor scopes observe the emitted
271/// start event before the function returns.
272pub fn push_scope(params: PushScopeParams<'_>) -> Result<ScopeHandle> {
273    ensure_runtime_owner()?;
274    let parent_uuid = resolve_parent_uuid(params.parent);
275    let (handle, event, subscribers) = {
276        let scope_stack = current_scope_stack();
277        let scope_guard = scope_stack.read().expect("scope stack lock poisoned");
278        let scope_subscribers = scope_guard.collect_scope_local_subscribers();
279        let subscribers = snapshot_event_subscribers(scope_subscribers)?;
280        let context = global_context();
281        let state = context
282            .read()
283            .map_err(|error| FlowError::Internal(error.to_string()))?;
284        let handle_params = CreateScopeHandleParams::builder()
285            .name(params.name)
286            .parent_uuid_opt(parent_uuid)
287            .scope_type(params.scope_type)
288            .attributes(params.attributes)
289            .data_opt(params.data)
290            .metadata_opt(params.metadata)
291            .timestamp_opt(params.timestamp)
292            .build();
293        let handle = state.create_scope_handle(handle_params);
294        let event = state.build_scope_start_event(&handle, params.input);
295        (handle, event, subscribers)
296    };
297    task_scope_push(handle.clone());
298    NemoFlowContextState::emit_event(&event, &subscribers);
299    Ok(handle)
300}
301
302/// Pop the current scope from the active scope stack.
303///
304/// This emits a scope-end event for the target scope and removes any
305/// scope-local registrations owned by that scope.
306///
307/// # Parameters
308/// - `handle_uuid`: UUID of the scope that should be popped.
309/// - `output`: Optional JSON payload exported as the semantic scope output.
310/// - `timestamp`: Optional timestamp recorded on the emitted end event. When
311///   `None`, the runtime uses the current UTC time, or one microsecond after
312///   the handle start time if the current time is not later.
313///
314/// # Returns
315/// A [`Result`] that is `Ok(())` when the scope was popped successfully.
316///
317/// # Errors
318/// Returns [`FlowError::InvalidArgument`] when the target scope exists but is
319/// not the current top of stack, and [`FlowError::NotFound`] when the UUID is
320/// unknown to the active stack.
321///
322/// # Notes
323/// The implicit root scope cannot be removed.
324pub fn pop_scope(params: PopScopeParams<'_>) -> Result<()> {
325    ensure_runtime_owner()?;
326    let scope_stack = current_scope_stack();
327    let (scope, event, subscribers) = {
328        let scope_guard = scope_stack.read().expect("scope stack lock poisoned");
329        let top = scope_guard.top();
330        if top.uuid != *params.handle_uuid {
331            if scope_guard.find(params.handle_uuid).is_some() {
332                return Err(FlowError::InvalidArgument(
333                    "scope handle is not at the top of the stack".into(),
334                ));
335            }
336            return Err(FlowError::NotFound("scope handle not found".into()));
337        }
338        let scope_subscribers = scope_guard.collect_scope_local_subscribers();
339        let subscribers = snapshot_event_subscribers(scope_subscribers)?;
340        let scope = top.clone();
341        let context = global_context();
342        let state = context
343            .read()
344            .map_err(|error| FlowError::Internal(error.to_string()))?;
345        let event = state.build_scope_end_event(
346            EndScopeHandleParams::builder()
347                .handle(&scope)
348                .data_opt(params.output)
349                .timestamp_opt(params.timestamp)
350                .build(),
351        );
352        (scope, event, subscribers)
353    };
354    let removed = task_scope_remove(params.handle_uuid)?;
355    debug_assert_eq!(removed.uuid, scope.uuid);
356    NemoFlowContextState::emit_event(&event, &subscribers);
357    Ok(())
358}
359
360/// Emit a standalone mark event under the current or provided scope.
361///
362/// This creates a point-in-time lifecycle event without pushing or popping a
363/// new scope.
364///
365/// # Parameters
366/// - `name`: Event name to emit.
367/// - `parent`: Optional explicit parent scope. When `None`, the current top of
368///   stack is used.
369/// - `data`: Optional JSON payload recorded on the emitted event.
370/// - `metadata`: Optional JSON metadata recorded on the emitted event.
371/// - `timestamp`: Optional timestamp recorded on the emitted mark event. When
372///   `None`, the current UTC time is used.
373///
374/// # Returns
375/// A [`Result`] that is `Ok(())` after the event has been emitted.
376///
377/// # Errors
378/// Returns an error when the runtime owner check fails or when internal state
379/// cannot be read safely.
380///
381/// # Notes
382/// Scope-local subscribers attached to ancestor scopes observe the emitted
383/// mark event just like scope, tool, and LLM lifecycle events.
384pub fn event(params: EmitMarkEventParams<'_>) -> Result<()> {
385    ensure_runtime_owner()?;
386    let parent_uuid = resolve_parent_uuid(params.parent);
387    let (event, subscribers) = {
388        let scope_stack = current_scope_stack();
389        let scope_guard = scope_stack.read().expect("scope stack lock poisoned");
390        let scope_subscribers = scope_guard.collect_scope_local_subscribers();
391        let subscribers = snapshot_event_subscribers(scope_subscribers)?;
392        let context = global_context();
393        let state = context
394            .read()
395            .map_err(|error| FlowError::Internal(error.to_string()))?;
396        let event = state.create_event(MarkEvent::new(
397            BaseEvent::builder()
398                .name(params.name)
399                .parent_uuid_opt(parent_uuid)
400                .timestamp(params.timestamp.unwrap_or_else(Utc::now))
401                .data_opt(params.data)
402                .metadata_opt(params.metadata)
403                .build(),
404            None,
405            None,
406        ));
407        (event, subscribers)
408    };
409    NemoFlowContextState::emit_event(&event, &subscribers);
410    Ok(())
411}