nemo-flow 0.1.0

Core Rust SDK for NeMo Flow observability, scope management, and runtime instrumentation.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use crate::api::event::{BaseEvent, MarkEvent};
use crate::api::runtime::NemoFlowContextState;
use crate::api::runtime::global_context;
use crate::api::runtime::{
    current_scope_stack, task_scope_push, task_scope_remove, task_scope_top,
};
use crate::api::shared::{ensure_runtime_owner, resolve_parent_uuid, snapshot_event_subscribers};
use crate::error::{FlowError, Result};
use crate::json::Json;
use bitflags::bitflags;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use typed_builder::TypedBuilder;
use uuid::Uuid;

use crate::api::llm::LlmAttributes;
use crate::api::tool::ToolAttributes;

bitflags! {
    /// Bitflags that modify scope behavior and observability.
    ///
    /// The flags are backed by a `u32` and can be combined. The builders in
    /// this module default to `ScopeAttributes::empty()`, i.e. no flag set.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
    pub struct ScopeAttributes: u32 {
        /// Marks the scope as running in parallel with sibling work (bit 0).
        const PARALLEL    = 0b01;
        /// Marks the scope as safe to move across execution contexts (bit 1).
        const RELOCATABLE = 0b10;
    }
}

/// Semantic category attached to a scope lifecycle span.
///
/// Serde encodes each variant as its lowercase name (`rename_all =
/// "lowercase"`), which is the same string [`ScopeType::as_str`] returns.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ScopeType {
    /// A top-level agent or workflow scope. Encoded as `"agent"`.
    Agent,
    /// A generic function or application step. Encoded as `"function"`.
    Function,
    /// A tool lifecycle scope. Encoded as `"tool"`.
    Tool,
    /// An LLM lifecycle scope. Encoded as `"llm"`.
    Llm,
    /// A retrieval step such as document search. Encoded as `"retriever"`.
    Retriever,
    /// An embedding generation step. Encoded as `"embedder"`.
    Embedder,
    /// A reranking step. Encoded as `"reranker"`.
    Reranker,
    /// A guardrail or validation step. Encoded as `"guardrail"`.
    Guardrail,
    /// An evaluation or scoring step. Encoded as `"evaluator"`.
    Evaluator,
    /// A caller-defined custom scope category. Encoded as `"custom"`.
    Custom,
    /// A fallback for unknown or unsupported scope categories. Encoded as
    /// `"unknown"`.
    Unknown,
}

impl ScopeType {
    /// Return the stable lowercase string form used for encoded scope types.
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Agent => "agent",
            Self::Function => "function",
            Self::Tool => "tool",
            Self::Llm => "llm",
            Self::Retriever => "retriever",
            Self::Embedder => "embedder",
            Self::Reranker => "reranker",
            Self::Guardrail => "guardrail",
            Self::Evaluator => "evaluator",
            Self::Custom => "custom",
            Self::Unknown => "unknown",
        }
    }
}

/// Attribute bitflags attached to a concrete handle kind.
///
/// Each variant wraps the flag type belonging to one handle kind:
/// [`ScopeAttributes`] is declared in this module, while `ToolAttributes` and
/// `LlmAttributes` are imported from the `tool` and `llm` API modules.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum HandleAttributes {
    /// Scope-specific attributes.
    Scope(ScopeAttributes),
    /// Tool-specific attributes.
    Tool(ToolAttributes),
    /// LLM-specific attributes.
    Llm(LlmAttributes),
}

/// Runtime-owned handle identifying an active or completed scope.
///
/// Built through the generated `ScopeHandle::builder()`. Only `scope_type`
/// and `name` are mandatory; every other field has a builder default.
///
/// Because of the `strip_option` field default, each `Option<T>` field gets
/// two setters: `field(value)` taking a bare `T`, and `field_opt(option)`
/// taking the `Option<T>` directly.
#[derive(Debug, Clone, Serialize, Deserialize, TypedBuilder)]
#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
pub struct ScopeHandle {
    /// Unique scope identifier. Defaults to a freshly generated
    /// `Uuid::now_v7()` when not set explicitly.
    #[builder(default = Uuid::now_v7())]
    pub uuid: Uuid,
    /// Timestamp captured when the scope handle was created. Defaults to
    /// `Utc::now()` when not set explicitly.
    #[builder(default = Utc::now())]
    pub started_at: DateTime<Utc>,
    /// Semantic category of the scope. Required by the builder.
    pub scope_type: ScopeType,
    /// Human-readable scope name. Required by the builder; the setter accepts
    /// any value convertible into a `String`.
    #[builder(setter(into))]
    pub name: String,
    /// Optional application payload stored on the handle. Defaults to `None`.
    #[builder(default)]
    pub data: Option<Json>,
    /// Optional metadata attached to the scope. Defaults to `None`.
    #[builder(default)]
    pub metadata: Option<Json>,
    /// Scope behavior flags. Defaults to the empty flag set.
    #[builder(default = ScopeAttributes::empty())]
    pub attributes: ScopeAttributes,
    /// UUID of the parent scope, if any. Defaults to `None`.
    #[builder(default)]
    pub parent_uuid: Option<Uuid>,
}

/// Builder parameters for [`push_scope`].
///
/// `name` and `scope_type` are mandatory; all other fields have builder
/// defaults. Each `Option<T>` field has a `field(value)` setter taking a bare
/// `T` and a `field_opt(option)` setter taking the `Option<T>` directly.
#[derive(TypedBuilder)]
#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
pub struct PushScopeParams<'a> {
    /// Human-readable scope name recorded on emitted lifecycle events.
    pub name: &'a str,
    /// Semantic category for the new scope.
    pub scope_type: ScopeType,
    /// Optional explicit parent scope. Defaults to `None`, in which case
    /// [`push_scope`] lets `resolve_parent_uuid` choose the parent.
    #[builder(default)]
    pub parent: Option<&'a ScopeHandle>,
    /// Scope attribute bitflags applied to the new scope. Defaults to the
    /// empty flag set.
    #[builder(default = ScopeAttributes::empty())]
    pub attributes: ScopeAttributes,
    /// Optional application payload stored on the scope handle.
    #[builder(default)]
    pub data: Option<Json>,
    /// Optional JSON metadata recorded on the emitted start event.
    #[builder(default)]
    pub metadata: Option<Json>,
    /// Optional JSON payload exported as the scope start event data. Passed
    /// to `build_scope_start_event` rather than stored on the handle.
    #[builder(default)]
    pub input: Option<Json>,
    /// Optional timestamp recorded on the emitted start event.
    #[builder(default)]
    pub timestamp: Option<DateTime<Utc>>,
}

/// Builder parameters for [`NemoFlowContextState::create_scope_handle`].
///
/// `name` and `scope_type` are mandatory; all other fields have builder
/// defaults. Each `Option<T>` field has a `field(value)` setter taking a bare
/// `T` and a `field_opt(option)` setter taking the `Option<T>` directly.
#[derive(Debug, Clone, TypedBuilder)]
#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
pub struct CreateScopeHandleParams<'a> {
    /// Human-readable scope name.
    pub name: &'a str,
    /// Optional parent scope UUID. Defaults to `None`.
    #[builder(default)]
    pub parent_uuid: Option<Uuid>,
    /// Semantic category of the scope.
    pub scope_type: ScopeType,
    /// Scope attribute bitflags. Defaults to the empty flag set.
    #[builder(default = ScopeAttributes::empty())]
    pub attributes: ScopeAttributes,
    /// Optional application payload stored on the handle.
    #[builder(default)]
    pub data: Option<Json>,
    /// Optional metadata stored on the handle.
    #[builder(default)]
    pub metadata: Option<Json>,
    /// Optional timestamp captured as the handle start time and reused by the
    /// emitted start event. When omitted, the current UTC time is used.
    #[builder(default)]
    pub timestamp: Option<DateTime<Utc>>,
}

/// Builder parameters for [`NemoFlowContextState::build_scope_end_event`].
///
/// `handle` is mandatory; the optional fields default to `None`. Each
/// `Option<T>` field has a `field(value)` setter taking a bare `T` and a
/// `field_opt(option)` setter taking the `Option<T>` directly.
#[derive(Debug, Clone, TypedBuilder)]
#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
pub struct EndScopeHandleParams<'a> {
    /// Scope handle to serialize into the emitted end event.
    pub handle: &'a ScopeHandle,
    /// Optional JSON payload exported as the semantic scope output.
    #[builder(default)]
    pub data: Option<Json>,
    /// Optional timestamp recorded on the emitted end event. When omitted, the
    /// runtime records the current UTC time, or one microsecond after the
    /// handle start time if the current time is not later.
    #[builder(default)]
    pub timestamp: Option<DateTime<Utc>>,
}

/// Builder parameters for [`pop_scope`].
///
/// `handle_uuid` is mandatory; the optional fields default to `None`. Each
/// `Option<T>` field has a `field(value)` setter taking a bare `T` and a
/// `field_opt(option)` setter taking the `Option<T>` directly.
#[derive(TypedBuilder)]
#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
pub struct PopScopeParams<'a> {
    /// UUID of the scope that should be popped. [`pop_scope`] requires it to
    /// match the scope currently on top of the stack.
    pub handle_uuid: &'a Uuid,
    /// Optional JSON payload exported as the semantic scope output.
    #[builder(default)]
    pub output: Option<Json>,
    /// Optional timestamp recorded on the emitted end event. When omitted, the
    /// runtime records the current UTC time, or one microsecond after the
    /// handle start time if the current time is not later.
    #[builder(default)]
    pub timestamp: Option<DateTime<Utc>>,
}

/// Builder parameters for [`event`].
///
/// `name` is mandatory; the optional fields default to `None`. Each
/// `Option<T>` field has a `field(value)` setter taking a bare `T` and a
/// `field_opt(option)` setter taking the `Option<T>` directly.
#[derive(TypedBuilder)]
#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
pub struct EmitMarkEventParams<'a> {
    /// Event name to emit.
    pub name: &'a str,
    /// Optional explicit parent scope. Defaults to `None`, in which case
    /// [`event`] lets `resolve_parent_uuid` choose the parent.
    #[builder(default)]
    pub parent: Option<&'a ScopeHandle>,
    /// Optional JSON payload recorded as the mark data.
    #[builder(default)]
    pub data: Option<Json>,
    /// Optional JSON metadata recorded on the emitted event.
    #[builder(default)]
    pub metadata: Option<Json>,
    /// Optional timestamp recorded on the emitted mark event. When omitted, the
    /// current UTC time is used.
    #[builder(default)]
    pub timestamp: Option<DateTime<Utc>>,
}

/// Fetch the scope that currently sits on top of the active scope stack.
///
/// The stack is only inspected, never modified: the handle produced by
/// `task_scope_top` is handed back to the caller as an owned [`ScopeHandle`].
///
/// # Returns
/// `Ok` with the top-most [`ScopeHandle`] once the runtime owner check has
/// passed.
///
/// # Errors
/// Propagates the error reported by `ensure_runtime_owner` when the runtime
/// ownership check fails.
pub fn get_handle() -> Result<ScopeHandle> {
    ensure_runtime_owner()?;
    let top: ScopeHandle = task_scope_top();
    Ok(top)
}

/// Push a new scope onto the active scope stack.
///
/// This creates a new [`ScopeHandle`], emits a scope-start event to global and
/// scope-local subscribers, and makes the new scope the current top of stack.
///
/// # Parameters
/// - `name`: Human-readable scope name recorded on emitted lifecycle events.
/// - `scope_type`: Semantic category for the new scope.
/// - `parent`: Optional explicit parent scope. When `None`, the current top of
///   stack is used as the parent.
/// - `attributes`: Bitflags that modify scope behavior and observability.
/// - `data`: Optional application payload stored on the returned handle.
/// - `metadata`: Optional JSON metadata recorded on the emitted start event.
/// - `input`: Optional JSON payload exported as the ATOF data payload.
/// - `timestamp`: Optional timestamp recorded as the handle start time and on
///   the emitted start event. When `None`, the current UTC time is used.
///
/// # Returns
/// A [`Result`] containing the newly created [`ScopeHandle`].
///
/// # Errors
/// Returns an error when the runtime owner check fails or when internal state
/// cannot be read safely.
///
/// # Notes
/// Scope-local subscribers attached to ancestor scopes observe the emitted
/// start event before the function returns.
pub fn push_scope(params: PushScopeParams<'_>) -> Result<ScopeHandle> {
    ensure_runtime_owner()?;
    let parent_uuid = resolve_parent_uuid(params.parent);
    let (handle, event, subscribers) = {
        let scope_stack = current_scope_stack();
        let scope_guard = scope_stack.read().expect("scope stack lock poisoned");
        let scope_subscribers = scope_guard.collect_scope_local_subscribers();
        let subscribers = snapshot_event_subscribers(scope_subscribers)?;
        let context = global_context();
        let state = context
            .read()
            .map_err(|error| FlowError::Internal(error.to_string()))?;
        let handle_params = CreateScopeHandleParams::builder()
            .name(params.name)
            .parent_uuid_opt(parent_uuid)
            .scope_type(params.scope_type)
            .attributes(params.attributes)
            .data_opt(params.data)
            .metadata_opt(params.metadata)
            .timestamp_opt(params.timestamp)
            .build();
        let handle = state.create_scope_handle(handle_params);
        let event = state.build_scope_start_event(&handle, params.input);
        (handle, event, subscribers)
    };
    task_scope_push(handle.clone());
    NemoFlowContextState::emit_event(&event, &subscribers);
    Ok(handle)
}

/// Pop the current scope from the active scope stack.
///
/// This emits a scope-end event for the target scope and removes any
/// scope-local registrations owned by that scope.
///
/// # Parameters
/// - `handle_uuid`: UUID of the scope that should be popped.
/// - `output`: Optional JSON payload exported as the semantic scope output.
/// - `timestamp`: Optional timestamp recorded on the emitted end event. When
///   `None`, the runtime uses the current UTC time, or one microsecond after
///   the handle start time if the current time is not later.
///
/// # Returns
/// A [`Result`] that is `Ok(())` when the scope was popped successfully.
///
/// # Errors
/// Returns [`FlowError::InvalidArgument`] when the target scope exists but is
/// not the current top of stack, and [`FlowError::NotFound`] when the UUID is
/// unknown to the active stack.
///
/// # Notes
/// The implicit root scope cannot be removed.
pub fn pop_scope(params: PopScopeParams<'_>) -> Result<()> {
    ensure_runtime_owner()?;
    let scope_stack = current_scope_stack();
    let (scope, event, subscribers) = {
        let scope_guard = scope_stack.read().expect("scope stack lock poisoned");
        let top = scope_guard.top();
        if top.uuid != *params.handle_uuid {
            if scope_guard.find(params.handle_uuid).is_some() {
                return Err(FlowError::InvalidArgument(
                    "scope handle is not at the top of the stack".into(),
                ));
            }
            return Err(FlowError::NotFound("scope handle not found".into()));
        }
        let scope_subscribers = scope_guard.collect_scope_local_subscribers();
        let subscribers = snapshot_event_subscribers(scope_subscribers)?;
        let scope = top.clone();
        let context = global_context();
        let state = context
            .read()
            .map_err(|error| FlowError::Internal(error.to_string()))?;
        let event = state.build_scope_end_event(
            EndScopeHandleParams::builder()
                .handle(&scope)
                .data_opt(params.output)
                .timestamp_opt(params.timestamp)
                .build(),
        );
        (scope, event, subscribers)
    };
    let removed = task_scope_remove(params.handle_uuid)?;
    debug_assert_eq!(removed.uuid, scope.uuid);
    NemoFlowContextState::emit_event(&event, &subscribers);
    Ok(())
}

/// Emit a standalone mark event under the current or provided scope.
///
/// This creates a point-in-time lifecycle event without pushing or popping a
/// new scope.
///
/// # Parameters
/// - `name`: Event name to emit.
/// - `parent`: Optional explicit parent scope. When `None`, the current top of
///   stack is used.
/// - `data`: Optional JSON payload recorded on the emitted event.
/// - `metadata`: Optional JSON metadata recorded on the emitted event.
/// - `timestamp`: Optional timestamp recorded on the emitted mark event. When
///   `None`, the current UTC time is used.
///
/// # Returns
/// A [`Result`] that is `Ok(())` after the event has been emitted.
///
/// # Errors
/// Returns an error when the runtime owner check fails or when internal state
/// cannot be read safely.
///
/// # Notes
/// Scope-local subscribers attached to ancestor scopes observe the emitted
/// mark event just like scope, tool, and LLM lifecycle events.
pub fn event(params: EmitMarkEventParams<'_>) -> Result<()> {
    ensure_runtime_owner()?;
    let parent_uuid = resolve_parent_uuid(params.parent);
    let (event, subscribers) = {
        let scope_stack = current_scope_stack();
        let scope_guard = scope_stack.read().expect("scope stack lock poisoned");
        let scope_subscribers = scope_guard.collect_scope_local_subscribers();
        let subscribers = snapshot_event_subscribers(scope_subscribers)?;
        let context = global_context();
        let state = context
            .read()
            .map_err(|error| FlowError::Internal(error.to_string()))?;
        let event = state.create_event(MarkEvent::new(
            BaseEvent::builder()
                .name(params.name)
                .parent_uuid_opt(parent_uuid)
                .timestamp(params.timestamp.unwrap_or_else(Utc::now))
                .data_opt(params.data)
                .metadata_opt(params.metadata)
                .build(),
            None,
            None,
        ));
        (event, subscribers)
    };
    NemoFlowContextState::emit_event(&event, &subscribers);
    Ok(())
}