Skip to main content

nemo_flow/api/
event.rs

1// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Event types for Agent Trajectory Observability Format (ATOF) runtime events.
17
18use std::collections::BTreeMap;
19use std::sync::Arc;
20
21use chrono::{DateTime, Utc};
22use serde::{Deserialize, Serialize};
23use typed_builder::TypedBuilder;
24use uuid::Uuid;
25
26use crate::api::llm::LlmAttributes;
27use crate::api::scope::{HandleAttributes, ScopeAttributes, ScopeType};
28use crate::api::tool::ToolAttributes;
29use crate::codec::request::AnnotatedLlmRequest;
30use crate::codec::response::AnnotatedLlmResponse;
31use crate::json::Json;
32
/// Version string of the Agent Trajectory Observability Format (ATOF) protocol
/// that this runtime stamps on the events it emits.
pub const ATOF_VERSION: &str = "0.1";
35
36/// Identifier for the schema that describes an event's opaque `data` payload.
37#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)]
38#[builder(field_defaults(setter(into)))]
39pub struct DataSchema {
40    /// Schema name.
41    pub name: String,
42    /// Schema version.
43    pub version: String,
44}
45
46/// Semantic category carried by ATOF `category`.
47///
48/// This is intentionally string-backed so consumers can preserve category
49/// values from newer producers without failing deserialization.
50#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
51#[serde(transparent)]
52pub struct EventCategory(String);
53
54impl EventCategory {
55    /// Top-level agent or workflow scope.
56    pub fn agent() -> Self {
57        Self("agent".into())
58    }
59
60    /// Generic function or application step.
61    pub fn function() -> Self {
62        Self("function".into())
63    }
64
65    /// LLM call.
66    pub fn llm() -> Self {
67        Self("llm".into())
68    }
69
70    /// Tool invocation.
71    pub fn tool() -> Self {
72        Self("tool".into())
73    }
74
75    /// Retrieval step.
76    pub fn retriever() -> Self {
77        Self("retriever".into())
78    }
79
80    /// Embedding-generation step.
81    pub fn embedder() -> Self {
82        Self("embedder".into())
83    }
84
85    /// Result reranking step.
86    pub fn reranker() -> Self {
87        Self("reranker".into())
88    }
89
90    /// Guardrail or validation step.
91    pub fn guardrail() -> Self {
92        Self("guardrail".into())
93    }
94
95    /// Evaluation or scoring step.
96    pub fn evaluator() -> Self {
97        Self("evaluator".into())
98    }
99
100    /// Vendor-defined custom category.
101    pub fn custom() -> Self {
102        Self("custom".into())
103    }
104
105    /// Unknown or unclassified work.
106    pub fn unknown() -> Self {
107        Self("unknown".into())
108    }
109
110    /// Create a category from an arbitrary producer-provided string.
111    pub fn new(value: impl Into<String>) -> Self {
112        Self(value.into())
113    }
114
115    /// Return the string form serialized on the wire.
116    pub fn as_str(&self) -> &str {
117        self.0.as_str()
118    }
119
120    /// Convert this category to the closest legacy scope type for internal
121    /// adapters that still need span-kind classification.
122    pub fn to_scope_type(&self) -> ScopeType {
123        match self.as_str() {
124            "agent" => ScopeType::Agent,
125            "function" => ScopeType::Function,
126            "tool" => ScopeType::Tool,
127            "llm" => ScopeType::Llm,
128            "retriever" => ScopeType::Retriever,
129            "embedder" => ScopeType::Embedder,
130            "reranker" => ScopeType::Reranker,
131            "guardrail" => ScopeType::Guardrail,
132            "evaluator" => ScopeType::Evaluator,
133            "custom" => ScopeType::Custom,
134            _ => ScopeType::Unknown,
135        }
136    }
137}
138
139impl From<ScopeType> for EventCategory {
140    fn from(value: ScopeType) -> Self {
141        match value {
142            ScopeType::Agent => Self::agent(),
143            ScopeType::Function => Self::function(),
144            ScopeType::Tool => Self::tool(),
145            ScopeType::Llm => Self::llm(),
146            ScopeType::Retriever => Self::retriever(),
147            ScopeType::Embedder => Self::embedder(),
148            ScopeType::Reranker => Self::reranker(),
149            ScopeType::Guardrail => Self::guardrail(),
150            ScopeType::Evaluator => Self::evaluator(),
151            ScopeType::Custom => Self::custom(),
152            ScopeType::Unknown => Self::unknown(),
153        }
154    }
155}
156
157impl From<&EventCategory> for ScopeType {
158    fn from(value: &EventCategory) -> Self {
159        value.to_scope_type()
160    }
161}
162
163/// Agent Trajectory Observability Format (ATOF) lifecycle phase for a scope event.
164#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
165#[serde(rename_all = "lowercase")]
166pub enum ScopeCategory {
167    /// Scope was entered.
168    Start,
169    /// Scope was exited.
170    End,
171}
172
173/// Category-specific profile data.
174///
175/// Unknown wire keys are preserved in `extra`. LLM annotations are runtime-only
176/// enrichment used by internal adaptive and Agent Trajectory Interchange Format
177/// (ATIF) logic and are never serialized.
178#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize, TypedBuilder)]
179#[builder(field_defaults(setter(into, strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
180pub struct CategoryProfile {
181    /// Normalized model identifier for LLM events.
182    #[builder(default)]
183    #[serde(default, skip_serializing_if = "Option::is_none")]
184    pub model_name: Option<String>,
185
186    /// LLM-provider correlation ID for Tool events.
187    #[builder(default)]
188    #[serde(default, skip_serializing_if = "Option::is_none")]
189    pub tool_call_id: Option<String>,
190
191    /// Vendor subtype required when `category == "custom"`.
192    #[builder(default)]
193    #[serde(default, skip_serializing_if = "Option::is_none")]
194    pub subtype: Option<String>,
195
196    /// Unknown category-profile keys preserved from newer producers.
197    #[builder(default)]
198    #[serde(flatten)]
199    pub extra: BTreeMap<String, Json>,
200
201    /// Normalized request annotation for LLM start events.
202    #[builder(default)]
203    #[serde(default, skip_serializing_if = "Option::is_none")]
204    pub annotated_request: Option<Arc<AnnotatedLlmRequest>>,
205
206    /// Normalized response annotation for LLM end events.
207    #[builder(default)]
208    #[serde(default, skip_serializing_if = "Option::is_none")]
209    pub annotated_response: Option<Arc<AnnotatedLlmResponse>>,
210}
211
212impl CategoryProfile {
213    /// Return true when the profile has no wire-serialized fields.
214    pub fn is_wire_empty(&self) -> bool {
215        self.model_name.is_none()
216            && self.tool_call_id.is_none()
217            && self.subtype.is_none()
218            && self.extra.is_empty()
219    }
220}
221
222/// Shared event metadata carried by every ATOF event.
223#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, TypedBuilder)]
224#[builder(field_defaults(setter(into, strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
225pub struct BaseEvent {
226    /// ATOF protocol version.
227    #[builder(default = ATOF_VERSION.to_string())]
228    pub atof_version: String,
229    /// UUID of the parent scope, if any.
230    #[builder(default)]
231    pub parent_uuid: Option<Uuid>,
232    /// Unique identifier for the event or span.
233    #[builder(default = Uuid::now_v7())]
234    pub uuid: Uuid,
235    /// Event timestamp in UTC.
236    #[builder(default = Utc::now())]
237    #[serde(with = "timestamp")]
238    pub timestamp: DateTime<Utc>,
239    /// Human-readable event name.
240    pub name: String,
241    /// Application-defined payload.
242    #[builder(default)]
243    pub data: Option<Json>,
244    /// Optional schema identifier for `data`.
245    #[builder(default)]
246    pub data_schema: Option<DataSchema>,
247    /// Optional tracing/correlation metadata.
248    #[builder(default)]
249    pub metadata: Option<Json>,
250}
251
252/// ATOF scope lifecycle event.
253#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, TypedBuilder)]
254#[builder(field_defaults(setter(into, strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
255pub struct ScopeEvent {
256    /// Shared ATOF envelope.
257    #[serde(flatten)]
258    #[builder(setter(skip), default = BaseEvent::builder().name("").build())]
259    pub base: BaseEvent,
260    /// Scope lifecycle phase.
261    pub scope_category: ScopeCategory,
262    /// Canonical lowercase behavioral flags.
263    #[builder(default)]
264    pub attributes: Vec<String>,
265    /// Semantic category of work.
266    pub category: EventCategory,
267    /// Category-specific typed fields.
268    #[builder(default)]
269    pub category_profile: Option<CategoryProfile>,
270}
271
272impl ScopeEvent {
273    /// Construct a scope event from a base envelope and ATOF-specific fields.
274    pub fn new(
275        base: BaseEvent,
276        scope_category: ScopeCategory,
277        attributes: Vec<String>,
278        category: EventCategory,
279        category_profile: Option<CategoryProfile>,
280    ) -> Self {
281        Self {
282            base,
283            scope_category,
284            attributes: canonicalize_attributes(attributes),
285            category,
286            category_profile,
287        }
288    }
289}
290
291/// ATOF point-in-time mark event.
292#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, TypedBuilder)]
293#[builder(field_defaults(setter(into, strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
294pub struct MarkEvent {
295    /// Shared ATOF envelope.
296    #[serde(flatten)]
297    #[builder(setter(skip), default = BaseEvent::builder().name("").build())]
298    pub base: BaseEvent,
299    /// Optional semantic category for the checkpoint.
300    #[builder(default)]
301    pub category: Option<EventCategory>,
302    /// Optional category-specific typed fields.
303    #[builder(default)]
304    pub category_profile: Option<CategoryProfile>,
305}
306
307impl MarkEvent {
308    /// Construct a mark event from a base envelope and optional category data.
309    pub fn new(
310        base: BaseEvent,
311        category: Option<EventCategory>,
312        category_profile: Option<CategoryProfile>,
313    ) -> Self {
314        Self {
315            base,
316            category,
317            category_profile,
318        }
319    }
320}
321
322/// Tagged union covering the two ATOF event kinds emitted by the runtime.
323#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
324#[serde(tag = "kind", rename_all = "lowercase")]
325pub enum Event {
326    /// Scope lifecycle event.
327    Scope(ScopeEvent),
328    /// Point-in-time checkpoint event.
329    Mark(MarkEvent),
330}
331
332impl Event {
333    /// Return the ATOF event kind.
334    pub fn kind(&self) -> &'static str {
335        match self {
336            Self::Scope(_) => "scope",
337            Self::Mark(_) => "mark",
338        }
339    }
340
341    /// Return the lifecycle phase for scope events.
342    pub fn scope_category(&self) -> Option<ScopeCategory> {
343        match self {
344            Self::Scope(event) => Some(event.scope_category),
345            Self::Mark(_) => None,
346        }
347    }
348
349    /// Return the semantic category if present.
350    pub fn category(&self) -> Option<&EventCategory> {
351        match self {
352            Self::Scope(event) => Some(&event.category),
353            Self::Mark(event) => event.category.as_ref(),
354        }
355    }
356
357    /// Return the category-specific profile if present.
358    pub fn category_profile(&self) -> Option<&CategoryProfile> {
359        match self {
360            Self::Scope(event) => event.category_profile.as_ref(),
361            Self::Mark(event) => event.category_profile.as_ref(),
362        }
363    }
364
365    /// Return the mutable category-specific profile if present.
366    pub fn category_profile_mut(&mut self) -> Option<&mut CategoryProfile> {
367        match self {
368            Self::Scope(event) => event.category_profile.as_mut(),
369            Self::Mark(event) => event.category_profile.as_mut(),
370        }
371    }
372
373    /// Return the parent scope UUID, if the event is nested under a scope.
374    pub fn parent_uuid(&self) -> Option<Uuid> {
375        self.base().parent_uuid
376    }
377
378    /// Return the unique event or span UUID.
379    pub fn uuid(&self) -> Uuid {
380        self.base().uuid
381    }
382
383    /// Return the event timestamp.
384    pub fn timestamp(&self) -> &DateTime<Utc> {
385        &self.base().timestamp
386    }
387
388    /// Return the human-readable event name.
389    pub fn name(&self) -> &str {
390        self.base().name.as_str()
391    }
392
393    /// Return the optional application payload attached to the event.
394    pub fn data(&self) -> Option<&Json> {
395        self.base().data.as_ref()
396    }
397
398    /// Return the optional data schema.
399    pub fn data_schema(&self) -> Option<&DataSchema> {
400        self.base().data_schema.as_ref()
401    }
402
403    /// Return the optional metadata attached to the event.
404    pub fn metadata(&self) -> Option<&Json> {
405        self.base().metadata.as_ref()
406    }
407
408    /// Return attributes for scope events.
409    pub fn attributes(&self) -> Option<&[String]> {
410        match self {
411            Self::Scope(event) => Some(event.attributes.as_slice()),
412            Self::Mark(_) => None,
413        }
414    }
415
416    /// Return the semantic scope category for scope events.
417    pub fn scope_type(&self) -> Option<ScopeType> {
418        self.category().map(EventCategory::to_scope_type)
419    }
420
421    /// Return the semantic input payload for start events.
422    pub fn input(&self) -> Option<&Json> {
423        match self {
424            Self::Scope(event) if event.scope_category == ScopeCategory::Start => {
425                event.base.data.as_ref()
426            }
427            _ => None,
428        }
429    }
430
431    /// Return the semantic output payload for end events.
432    pub fn output(&self) -> Option<&Json> {
433        match self {
434            Self::Scope(event) if event.scope_category == ScopeCategory::End => {
435                event.base.data.as_ref()
436            }
437            _ => None,
438        }
439    }
440
441    /// Return the normalized model name for LLM events.
442    pub fn model_name(&self) -> Option<&str> {
443        self.category_profile()
444            .and_then(|profile| profile.model_name.as_deref())
445    }
446
447    /// Return the provider-specific tool-call correlation identifier.
448    pub fn tool_call_id(&self) -> Option<&str> {
449        self.category_profile()
450            .and_then(|profile| profile.tool_call_id.as_deref())
451    }
452
453    /// Return the runtime-only annotated LLM request.
454    pub fn annotated_request(&self) -> Option<&Arc<AnnotatedLlmRequest>> {
455        self.category_profile()
456            .and_then(|profile| profile.annotated_request.as_ref())
457    }
458
459    /// Return the runtime-only annotated LLM response.
460    pub fn annotated_response(&self) -> Option<&Arc<AnnotatedLlmResponse>> {
461        self.category_profile()
462            .and_then(|profile| profile.annotated_response.as_ref())
463    }
464
465    /// Return true for scope-start events.
466    pub fn is_scope_start(&self) -> bool {
467        matches!(
468            self,
469            Self::Scope(ScopeEvent {
470                scope_category: ScopeCategory::Start,
471                ..
472            })
473        )
474    }
475
476    /// Return true for scope-end events.
477    pub fn is_scope_end(&self) -> bool {
478        matches!(
479            self,
480            Self::Scope(ScopeEvent {
481                scope_category: ScopeCategory::End,
482                ..
483            })
484        )
485    }
486
487    fn base(&self) -> &BaseEvent {
488        match self {
489            Self::Scope(event) => &event.base,
490            Self::Mark(event) => &event.base,
491        }
492    }
493}
494
495/// Convert handle bitflags into ATOF attributes.
496pub fn attributes_from_handle(attributes: HandleAttributes) -> Vec<String> {
497    match attributes {
498        HandleAttributes::Scope(attributes) => scope_attributes_to_strings(attributes),
499        HandleAttributes::Tool(attributes) => tool_attributes_to_strings(attributes),
500        HandleAttributes::Llm(attributes) => llm_attributes_to_strings(attributes),
501    }
502}
503
504/// Convert scope bitflags into ATOF attributes.
505pub fn scope_attributes_to_strings(attributes: ScopeAttributes) -> Vec<String> {
506    let mut values = Vec::new();
507    if attributes.contains(ScopeAttributes::PARALLEL) {
508        values.push("parallel".to_string());
509    }
510    if attributes.contains(ScopeAttributes::RELOCATABLE) {
511        values.push("relocatable".to_string());
512    }
513    values
514}
515
516/// Convert tool bitflags into ATOF attributes.
517pub fn tool_attributes_to_strings(attributes: ToolAttributes) -> Vec<String> {
518    let mut values = Vec::new();
519    if attributes.contains(ToolAttributes::REMOTE) {
520        values.push("remote".to_string());
521    }
522    values
523}
524
525/// Convert LLM bitflags into ATOF attributes.
526pub fn llm_attributes_to_strings(attributes: LlmAttributes) -> Vec<String> {
527    let mut values = Vec::new();
528    if attributes.contains(LlmAttributes::STATEFUL) {
529        values.push("stateful".to_string());
530    }
531    if attributes.contains(LlmAttributes::STREAMING) {
532        values.push("streaming".to_string());
533    }
534    values
535}
536
/// Put an attribute list into canonical form: sorted ascending with
/// duplicates removed.
///
/// Takes ownership of the list and returns it after reordering in place. The
/// strings themselves are not modified (no case folding or trimming).
fn canonicalize_attributes(mut attributes: Vec<String>) -> Vec<String> {
    // Equal strings are indistinguishable, so stability buys nothing here;
    // the unstable sort gives the same result without a scratch allocation.
    attributes.sort_unstable();
    // After sorting, duplicates are adjacent, which is what `dedup` needs.
    attributes.dedup();
    attributes
}
542
543mod timestamp {
544    use chrono::{DateTime, Utc};
545    use serde::{
546        Deserializer, Serializer,
547        de::{self, Visitor},
548    };
549    use std::fmt;
550
551    pub fn serialize<S>(value: &DateTime<Utc>, serializer: S) -> Result<S::Ok, S::Error>
552    where
553        S: Serializer,
554    {
555        serializer.serialize_str(&value.to_rfc3339())
556    }
557
558    pub fn deserialize<'de, D>(deserializer: D) -> Result<DateTime<Utc>, D::Error>
559    where
560        D: Deserializer<'de>,
561    {
562        deserializer.deserialize_any(TimestampVisitor)
563    }
564
565    struct TimestampVisitor;
566
567    impl<'de> Visitor<'de> for TimestampVisitor {
568        type Value = DateTime<Utc>;
569
570        fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
571            formatter.write_str("an RFC 3339 timestamp string or epoch microseconds integer")
572        }
573
574        fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
575        where
576            E: de::Error,
577        {
578            DateTime::parse_from_rfc3339(value)
579                .map(|timestamp| timestamp.with_timezone(&Utc))
580                .map_err(E::custom)
581        }
582
583        fn visit_i64<E>(self, value: i64) -> Result<Self::Value, E>
584        where
585            E: de::Error,
586        {
587            DateTime::<Utc>::from_timestamp_micros(value)
588                .ok_or_else(|| E::custom("epoch microseconds value is out of range"))
589        }
590
591        fn visit_u64<E>(self, value: u64) -> Result<Self::Value, E>
592        where
593            E: de::Error,
594        {
595            let value = i64::try_from(value)
596                .map_err(|_| E::custom("epoch microseconds value is out of range"))?;
597            self.visit_i64(value)
598        }
599    }
600}