nemo_flow/api/scope.rs
1// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::api::event::{BaseEvent, MarkEvent};
5use crate::api::runtime::NemoFlowContextState;
6use crate::api::runtime::global_context;
7use crate::api::runtime::{
8 current_scope_stack, task_scope_push, task_scope_remove, task_scope_top,
9};
10use crate::api::shared::{ensure_runtime_owner, resolve_parent_uuid, snapshot_event_subscribers};
11use crate::error::{FlowError, Result};
12use crate::json::Json;
13use bitflags::bitflags;
14use chrono::{DateTime, Utc};
15use serde::{Deserialize, Serialize};
16use typed_builder::TypedBuilder;
17use uuid::Uuid;
18
19use crate::api::llm::LlmAttributes;
20use crate::api::tool::ToolAttributes;
21
22bitflags! {
23 /// Bitflags that modify scope behavior and observability.
24 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
25 pub struct ScopeAttributes: u32 {
26 /// Marks the scope as running in parallel with sibling work.
27 const PARALLEL = 0b01;
28 /// Marks the scope as safe to move across execution contexts.
29 const RELOCATABLE = 0b10;
30 }
31}
32
33/// Semantic category attached to a scope lifecycle span.
34#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
35#[serde(rename_all = "lowercase")]
36pub enum ScopeType {
37 /// A top-level agent or workflow scope.
38 Agent,
39 /// A generic function or application step.
40 Function,
41 /// A tool lifecycle scope.
42 Tool,
43 /// An LLM lifecycle scope.
44 Llm,
45 /// A retrieval step such as document search.
46 Retriever,
47 /// An embedding generation step.
48 Embedder,
49 /// A reranking step.
50 Reranker,
51 /// A guardrail or validation step.
52 Guardrail,
53 /// An evaluation or scoring step.
54 Evaluator,
55 /// A caller-defined custom scope category.
56 Custom,
57 /// A fallback for unknown or unsupported scope categories.
58 Unknown,
59}
60
61impl ScopeType {
62 /// Return the stable lowercase string form used for encoded scope types.
63 pub const fn as_str(self) -> &'static str {
64 match self {
65 Self::Agent => "agent",
66 Self::Function => "function",
67 Self::Tool => "tool",
68 Self::Llm => "llm",
69 Self::Retriever => "retriever",
70 Self::Embedder => "embedder",
71 Self::Reranker => "reranker",
72 Self::Guardrail => "guardrail",
73 Self::Evaluator => "evaluator",
74 Self::Custom => "custom",
75 Self::Unknown => "unknown",
76 }
77 }
78}
79
80/// Attribute bitflags attached to a concrete handle kind.
81#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
82pub enum HandleAttributes {
83 /// Scope-specific attributes.
84 Scope(ScopeAttributes),
85 /// Tool-specific attributes.
86 Tool(ToolAttributes),
87 /// LLM-specific attributes.
88 Llm(LlmAttributes),
89}
90
91/// Runtime-owned handle identifying an active or completed scope.
92#[derive(Debug, Clone, Serialize, Deserialize, TypedBuilder)]
93#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
94pub struct ScopeHandle {
95 /// Unique scope identifier.
96 #[builder(default = Uuid::now_v7())]
97 pub uuid: Uuid,
98 /// Timestamp captured when the scope handle was created.
99 #[builder(default = Utc::now())]
100 pub started_at: DateTime<Utc>,
101 /// Semantic category of the scope.
102 pub scope_type: ScopeType,
103 /// Human-readable scope name.
104 #[builder(setter(into))]
105 pub name: String,
106 /// Optional application payload stored on the handle.
107 #[builder(default)]
108 pub data: Option<Json>,
109 /// Optional metadata attached to the scope.
110 #[builder(default)]
111 pub metadata: Option<Json>,
112 /// Scope behavior flags.
113 #[builder(default = ScopeAttributes::empty())]
114 pub attributes: ScopeAttributes,
115 /// UUID of the parent scope, if any.
116 #[builder(default)]
117 pub parent_uuid: Option<Uuid>,
118}
119
120/// Builder parameters for [`push_scope`].
121#[derive(TypedBuilder)]
122#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
123pub struct PushScopeParams<'a> {
124 /// Human-readable scope name recorded on emitted lifecycle events.
125 pub name: &'a str,
126 /// Semantic category for the new scope.
127 pub scope_type: ScopeType,
128 /// Optional explicit parent scope.
129 #[builder(default)]
130 pub parent: Option<&'a ScopeHandle>,
131 /// Scope attribute bitflags applied to the new scope.
132 #[builder(default = ScopeAttributes::empty())]
133 pub attributes: ScopeAttributes,
134 /// Optional application payload stored on the scope handle.
135 #[builder(default)]
136 pub data: Option<Json>,
137 /// Optional JSON metadata recorded on the emitted start event.
138 #[builder(default)]
139 pub metadata: Option<Json>,
140 /// Optional JSON payload exported as the scope start event data.
141 #[builder(default)]
142 pub input: Option<Json>,
143 /// Optional timestamp recorded on the emitted start event.
144 #[builder(default)]
145 pub timestamp: Option<DateTime<Utc>>,
146}
147
148/// Builder parameters for [`NemoFlowContextState::create_scope_handle`].
149#[derive(Debug, Clone, TypedBuilder)]
150#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
151pub struct CreateScopeHandleParams<'a> {
152 /// Human-readable scope name.
153 pub name: &'a str,
154 /// Optional parent scope UUID.
155 #[builder(default)]
156 pub parent_uuid: Option<Uuid>,
157 /// Semantic category of the scope.
158 pub scope_type: ScopeType,
159 /// Scope attribute bitflags.
160 #[builder(default = ScopeAttributes::empty())]
161 pub attributes: ScopeAttributes,
162 /// Optional application payload stored on the handle.
163 #[builder(default)]
164 pub data: Option<Json>,
165 /// Optional metadata stored on the handle.
166 #[builder(default)]
167 pub metadata: Option<Json>,
168 /// Optional timestamp captured as the handle start time and reused by the
169 /// emitted start event. When omitted, the current UTC time is used.
170 #[builder(default)]
171 pub timestamp: Option<DateTime<Utc>>,
172}
173
174/// Builder parameters for [`NemoFlowContextState::build_scope_end_event`].
175#[derive(Debug, Clone, TypedBuilder)]
176#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
177pub struct EndScopeHandleParams<'a> {
178 /// Scope handle to serialize into the emitted end event.
179 pub handle: &'a ScopeHandle,
180 /// Optional JSON payload exported as the semantic scope output.
181 #[builder(default)]
182 pub data: Option<Json>,
183 /// Optional timestamp recorded on the emitted end event. When omitted, the
184 /// runtime records the current UTC time, or one microsecond after the
185 /// handle start time if the current time is not later.
186 #[builder(default)]
187 pub timestamp: Option<DateTime<Utc>>,
188}
189
190/// Builder parameters for [`pop_scope`].
191#[derive(TypedBuilder)]
192#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
193pub struct PopScopeParams<'a> {
194 /// UUID of the scope that should be popped.
195 pub handle_uuid: &'a Uuid,
196 /// Optional JSON payload exported as the semantic scope output.
197 #[builder(default)]
198 pub output: Option<Json>,
199 /// Optional timestamp recorded on the emitted end event. When omitted, the
200 /// runtime records the current UTC time, or one microsecond after the
201 /// handle start time if the current time is not later.
202 #[builder(default)]
203 pub timestamp: Option<DateTime<Utc>>,
204}
205
206/// Builder parameters for [`event`].
207#[derive(TypedBuilder)]
208#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
209pub struct EmitMarkEventParams<'a> {
210 /// Event name to emit.
211 pub name: &'a str,
212 /// Optional explicit parent scope.
213 #[builder(default)]
214 pub parent: Option<&'a ScopeHandle>,
215 /// Optional JSON payload recorded as the mark data.
216 #[builder(default)]
217 pub data: Option<Json>,
218 /// Optional JSON metadata recorded on the emitted event.
219 #[builder(default)]
220 pub metadata: Option<Json>,
221 /// Optional timestamp recorded on the emitted mark event. When omitted, the
222 /// current UTC time is used.
223 #[builder(default)]
224 pub timestamp: Option<DateTime<Utc>>,
225}
226
227/// Return the current scope at the top of the active stack.
228///
229/// This reads the task-local or thread-local scope stack without mutating it
230/// and returns a clone of the current top-most [`ScopeHandle`].
231///
232/// # Returns
233/// A [`Result`] containing the current [`ScopeHandle`] when the runtime owner
234/// check succeeds.
235///
236/// # Errors
237/// Returns an error when the current binding has not initialized the shared
238/// runtime ownership correctly.
239pub fn get_handle() -> Result<ScopeHandle> {
240 ensure_runtime_owner()?;
241 Ok(task_scope_top())
242}
243
244/// Push a new scope onto the active scope stack.
245///
246/// This creates a new [`ScopeHandle`], emits a scope-start event to global and
247/// scope-local subscribers, and makes the new scope the current top of stack.
248///
249/// # Parameters
250/// - `name`: Human-readable scope name recorded on emitted lifecycle events.
251/// - `scope_type`: Semantic category for the new scope.
252/// - `parent`: Optional explicit parent scope. When `None`, the current top of
253/// stack is used as the parent.
254/// - `attributes`: Bitflags that modify scope behavior and observability.
255/// - `data`: Optional application payload stored on the returned handle.
256/// - `metadata`: Optional JSON metadata recorded on the emitted start event.
257/// - `input`: Optional JSON payload exported as the ATOF data payload.
258/// - `timestamp`: Optional timestamp recorded as the handle start time and on
259/// the emitted start event. When `None`, the current UTC time is used.
260///
261/// # Returns
262/// A [`Result`] containing the newly created [`ScopeHandle`].
263///
264/// # Errors
265/// Returns an error when the runtime owner check fails or when internal state
266/// cannot be read safely.
267///
268/// # Notes
269/// Scope-local subscribers attached to ancestor scopes observe the emitted
270/// start event before the function returns.
271pub fn push_scope(params: PushScopeParams<'_>) -> Result<ScopeHandle> {
272 ensure_runtime_owner()?;
273 let parent_uuid = resolve_parent_uuid(params.parent);
274 let (handle, event, subscribers) = {
275 let scope_stack = current_scope_stack();
276 let scope_guard = scope_stack.read().expect("scope stack lock poisoned");
277 let scope_subscribers = scope_guard.collect_scope_local_subscribers();
278 let subscribers = snapshot_event_subscribers(scope_subscribers)?;
279 let context = global_context();
280 let state = context
281 .read()
282 .map_err(|error| FlowError::Internal(error.to_string()))?;
283 let handle_params = CreateScopeHandleParams::builder()
284 .name(params.name)
285 .parent_uuid_opt(parent_uuid)
286 .scope_type(params.scope_type)
287 .attributes(params.attributes)
288 .data_opt(params.data)
289 .metadata_opt(params.metadata)
290 .timestamp_opt(params.timestamp)
291 .build();
292 let handle = state.create_scope_handle(handle_params);
293 let event = state.build_scope_start_event(&handle, params.input);
294 (handle, event, subscribers)
295 };
296 task_scope_push(handle.clone());
297 NemoFlowContextState::emit_event(&event, &subscribers);
298 Ok(handle)
299}
300
301/// Pop the current scope from the active scope stack.
302///
303/// This emits a scope-end event for the target scope and removes any
304/// scope-local registrations owned by that scope.
305///
306/// # Parameters
307/// - `handle_uuid`: UUID of the scope that should be popped.
308/// - `output`: Optional JSON payload exported as the semantic scope output.
309/// - `timestamp`: Optional timestamp recorded on the emitted end event. When
310/// `None`, the runtime uses the current UTC time, or one microsecond after
311/// the handle start time if the current time is not later.
312///
313/// # Returns
314/// A [`Result`] that is `Ok(())` when the scope was popped successfully.
315///
316/// # Errors
317/// Returns [`FlowError::InvalidArgument`] when the target scope exists but is
318/// not the current top of stack, and [`FlowError::NotFound`] when the UUID is
319/// unknown to the active stack.
320///
321/// # Notes
322/// The implicit root scope cannot be removed.
323pub fn pop_scope(params: PopScopeParams<'_>) -> Result<()> {
324 ensure_runtime_owner()?;
325 let scope_stack = current_scope_stack();
326 let (scope, event, subscribers) = {
327 let scope_guard = scope_stack.read().expect("scope stack lock poisoned");
328 let top = scope_guard.top();
329 if top.uuid != *params.handle_uuid {
330 if scope_guard.find(params.handle_uuid).is_some() {
331 return Err(FlowError::InvalidArgument(
332 "scope handle is not at the top of the stack".into(),
333 ));
334 }
335 return Err(FlowError::NotFound("scope handle not found".into()));
336 }
337 let scope_subscribers = scope_guard.collect_scope_local_subscribers();
338 let subscribers = snapshot_event_subscribers(scope_subscribers)?;
339 let scope = top.clone();
340 let context = global_context();
341 let state = context
342 .read()
343 .map_err(|error| FlowError::Internal(error.to_string()))?;
344 let event = state.build_scope_end_event(
345 EndScopeHandleParams::builder()
346 .handle(&scope)
347 .data_opt(params.output)
348 .timestamp_opt(params.timestamp)
349 .build(),
350 );
351 (scope, event, subscribers)
352 };
353 let removed = task_scope_remove(params.handle_uuid)?;
354 debug_assert_eq!(removed.uuid, scope.uuid);
355 NemoFlowContextState::emit_event(&event, &subscribers);
356 Ok(())
357}
358
359/// Emit a standalone mark event under the current or provided scope.
360///
361/// This creates a point-in-time lifecycle event without pushing or popping a
362/// new scope.
363///
364/// # Parameters
365/// - `name`: Event name to emit.
366/// - `parent`: Optional explicit parent scope. When `None`, the current top of
367/// stack is used.
368/// - `data`: Optional JSON payload recorded on the emitted event.
369/// - `metadata`: Optional JSON metadata recorded on the emitted event.
370/// - `timestamp`: Optional timestamp recorded on the emitted mark event. When
371/// `None`, the current UTC time is used.
372///
373/// # Returns
374/// A [`Result`] that is `Ok(())` after the event has been emitted.
375///
376/// # Errors
377/// Returns an error when the runtime owner check fails or when internal state
378/// cannot be read safely.
379///
380/// # Notes
381/// Scope-local subscribers attached to ancestor scopes observe the emitted
382/// mark event just like scope, tool, and LLM lifecycle events.
383pub fn event(params: EmitMarkEventParams<'_>) -> Result<()> {
384 ensure_runtime_owner()?;
385 let parent_uuid = resolve_parent_uuid(params.parent);
386 let (event, subscribers) = {
387 let scope_stack = current_scope_stack();
388 let scope_guard = scope_stack.read().expect("scope stack lock poisoned");
389 let scope_subscribers = scope_guard.collect_scope_local_subscribers();
390 let subscribers = snapshot_event_subscribers(scope_subscribers)?;
391 let context = global_context();
392 let state = context
393 .read()
394 .map_err(|error| FlowError::Internal(error.to_string()))?;
395 let event = state.create_event(MarkEvent::new(
396 BaseEvent::builder()
397 .name(params.name)
398 .parent_uuid_opt(parent_uuid)
399 .timestamp(params.timestamp.unwrap_or_else(Utc::now))
400 .data_opt(params.data)
401 .metadata_opt(params.metadata)
402 .build(),
403 None,
404 None,
405 ));
406 (event, subscribers)
407 };
408 NemoFlowContextState::emit_event(&event, &subscribers);
409 Ok(())
410}