use crate::api::event::{BaseEvent, MarkEvent};
use crate::api::runtime::NemoFlowContextState;
use crate::api::runtime::global_context;
use crate::api::runtime::{
current_scope_stack, task_scope_push, task_scope_remove, task_scope_top,
};
use crate::api::shared::{ensure_runtime_owner, resolve_parent_uuid, snapshot_event_subscribers};
use crate::error::{FlowError, Result};
use crate::json::Json;
use bitflags::bitflags;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use typed_builder::TypedBuilder;
use uuid::Uuid;
use crate::api::llm::LlmAttributes;
use crate::api::tool::ToolAttributes;
bitflags! {
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ScopeAttributes: u32 {
const PARALLEL = 0b01;
const RELOCATABLE = 0b10;
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ScopeType {
Agent,
Function,
Tool,
Llm,
Retriever,
Embedder,
Reranker,
Guardrail,
Evaluator,
Custom,
Unknown,
}
impl ScopeType {
pub const fn as_str(self) -> &'static str {
match self {
Self::Agent => "agent",
Self::Function => "function",
Self::Tool => "tool",
Self::Llm => "llm",
Self::Retriever => "retriever",
Self::Embedder => "embedder",
Self::Reranker => "reranker",
Self::Guardrail => "guardrail",
Self::Evaluator => "evaluator",
Self::Custom => "custom",
Self::Unknown => "unknown",
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum HandleAttributes {
Scope(ScopeAttributes),
Tool(ToolAttributes),
Llm(LlmAttributes),
}
#[derive(Debug, Clone, Serialize, Deserialize, TypedBuilder)]
#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
pub struct ScopeHandle {
#[builder(default = Uuid::now_v7())]
pub uuid: Uuid,
#[builder(default = Utc::now())]
pub started_at: DateTime<Utc>,
pub scope_type: ScopeType,
#[builder(setter(into))]
pub name: String,
#[builder(default)]
pub data: Option<Json>,
#[builder(default)]
pub metadata: Option<Json>,
#[builder(default = ScopeAttributes::empty())]
pub attributes: ScopeAttributes,
#[builder(default)]
pub parent_uuid: Option<Uuid>,
}
#[derive(TypedBuilder)]
#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
pub struct PushScopeParams<'a> {
pub name: &'a str,
pub scope_type: ScopeType,
#[builder(default)]
pub parent: Option<&'a ScopeHandle>,
#[builder(default = ScopeAttributes::empty())]
pub attributes: ScopeAttributes,
#[builder(default)]
pub data: Option<Json>,
#[builder(default)]
pub metadata: Option<Json>,
#[builder(default)]
pub input: Option<Json>,
#[builder(default)]
pub timestamp: Option<DateTime<Utc>>,
}
#[derive(Debug, Clone, TypedBuilder)]
#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
pub struct CreateScopeHandleParams<'a> {
pub name: &'a str,
#[builder(default)]
pub parent_uuid: Option<Uuid>,
pub scope_type: ScopeType,
#[builder(default = ScopeAttributes::empty())]
pub attributes: ScopeAttributes,
#[builder(default)]
pub data: Option<Json>,
#[builder(default)]
pub metadata: Option<Json>,
#[builder(default)]
pub timestamp: Option<DateTime<Utc>>,
}
#[derive(Debug, Clone, TypedBuilder)]
#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
pub struct EndScopeHandleParams<'a> {
pub handle: &'a ScopeHandle,
#[builder(default)]
pub data: Option<Json>,
#[builder(default)]
pub timestamp: Option<DateTime<Utc>>,
}
#[derive(TypedBuilder)]
#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
pub struct PopScopeParams<'a> {
pub handle_uuid: &'a Uuid,
#[builder(default)]
pub output: Option<Json>,
#[builder(default)]
pub timestamp: Option<DateTime<Utc>>,
}
#[derive(TypedBuilder)]
#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
pub struct EmitMarkEventParams<'a> {
pub name: &'a str,
#[builder(default)]
pub parent: Option<&'a ScopeHandle>,
#[builder(default)]
pub data: Option<Json>,
#[builder(default)]
pub metadata: Option<Json>,
#[builder(default)]
pub timestamp: Option<DateTime<Utc>>,
}
pub fn get_handle() -> Result<ScopeHandle> {
ensure_runtime_owner()?;
Ok(task_scope_top())
}
pub fn push_scope(params: PushScopeParams<'_>) -> Result<ScopeHandle> {
ensure_runtime_owner()?;
let parent_uuid = resolve_parent_uuid(params.parent);
let (handle, event, subscribers) = {
let scope_stack = current_scope_stack();
let scope_guard = scope_stack.read().expect("scope stack lock poisoned");
let scope_subscribers = scope_guard.collect_scope_local_subscribers();
let subscribers = snapshot_event_subscribers(scope_subscribers)?;
let context = global_context();
let state = context
.read()
.map_err(|error| FlowError::Internal(error.to_string()))?;
let handle_params = CreateScopeHandleParams::builder()
.name(params.name)
.parent_uuid_opt(parent_uuid)
.scope_type(params.scope_type)
.attributes(params.attributes)
.data_opt(params.data)
.metadata_opt(params.metadata)
.timestamp_opt(params.timestamp)
.build();
let handle = state.create_scope_handle(handle_params);
let event = state.build_scope_start_event(&handle, params.input);
(handle, event, subscribers)
};
task_scope_push(handle.clone());
NemoFlowContextState::emit_event(&event, &subscribers);
Ok(handle)
}
pub fn pop_scope(params: PopScopeParams<'_>) -> Result<()> {
ensure_runtime_owner()?;
let scope_stack = current_scope_stack();
let (scope, event, subscribers) = {
let scope_guard = scope_stack.read().expect("scope stack lock poisoned");
let top = scope_guard.top();
if top.uuid != *params.handle_uuid {
if scope_guard.find(params.handle_uuid).is_some() {
return Err(FlowError::InvalidArgument(
"scope handle is not at the top of the stack".into(),
));
}
return Err(FlowError::NotFound("scope handle not found".into()));
}
let scope_subscribers = scope_guard.collect_scope_local_subscribers();
let subscribers = snapshot_event_subscribers(scope_subscribers)?;
let scope = top.clone();
let context = global_context();
let state = context
.read()
.map_err(|error| FlowError::Internal(error.to_string()))?;
let event = state.build_scope_end_event(
EndScopeHandleParams::builder()
.handle(&scope)
.data_opt(params.output)
.timestamp_opt(params.timestamp)
.build(),
);
(scope, event, subscribers)
};
let removed = task_scope_remove(params.handle_uuid)?;
debug_assert_eq!(removed.uuid, scope.uuid);
NemoFlowContextState::emit_event(&event, &subscribers);
Ok(())
}
pub fn event(params: EmitMarkEventParams<'_>) -> Result<()> {
ensure_runtime_owner()?;
let parent_uuid = resolve_parent_uuid(params.parent);
let (event, subscribers) = {
let scope_stack = current_scope_stack();
let scope_guard = scope_stack.read().expect("scope stack lock poisoned");
let scope_subscribers = scope_guard.collect_scope_local_subscribers();
let subscribers = snapshot_event_subscribers(scope_subscribers)?;
let context = global_context();
let state = context
.read()
.map_err(|error| FlowError::Internal(error.to_string()))?;
let event = state.create_event(MarkEvent::new(
BaseEvent::builder()
.name(params.name)
.parent_uuid_opt(parent_uuid)
.timestamp(params.timestamp.unwrap_or_else(Utc::now))
.data_opt(params.data)
.metadata_opt(params.metadata)
.build(),
None,
None,
));
(event, subscribers)
};
NemoFlowContextState::emit_event(&event, &subscribers);
Ok(())
}