Skip to main content

nemo_flow/api/
event.rs

1// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use std::collections::BTreeMap;
17use std::sync::Arc;
18
19use chrono::{DateTime, Utc};
20use serde::{Deserialize, Serialize};
21use typed_builder::TypedBuilder;
22use uuid::Uuid;
23
24use crate::api::llm::LlmAttributes;
25use crate::api::scope::{HandleAttributes, ScopeAttributes, ScopeType};
26use crate::api::tool::ToolAttributes;
27use crate::codec::request::AnnotatedLlmRequest;
28use crate::codec::response::AnnotatedLlmResponse;
29use crate::json::Json;
30
31/// ATOF protocol version emitted by this runtime.
32pub const ATOF_VERSION: &str = "0.1";
33
34/// Identifier for the schema that describes an event's opaque `data` payload.
35#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)]
36#[builder(field_defaults(setter(into)))]
37pub struct DataSchema {
38    /// Schema name.
39    pub name: String,
40    /// Schema version.
41    pub version: String,
42}
43
44/// Semantic category carried by ATOF `category`.
45///
46/// This is intentionally string-backed so consumers can preserve category
47/// values from newer producers without failing deserialization.
48#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
49#[serde(transparent)]
50pub struct EventCategory(String);
51
52impl EventCategory {
53    /// Top-level agent or workflow scope.
54    pub fn agent() -> Self {
55        Self("agent".into())
56    }
57
58    /// Generic function or application step.
59    pub fn function() -> Self {
60        Self("function".into())
61    }
62
63    /// LLM call.
64    pub fn llm() -> Self {
65        Self("llm".into())
66    }
67
68    /// Tool invocation.
69    pub fn tool() -> Self {
70        Self("tool".into())
71    }
72
73    /// Retrieval step.
74    pub fn retriever() -> Self {
75        Self("retriever".into())
76    }
77
78    /// Embedding-generation step.
79    pub fn embedder() -> Self {
80        Self("embedder".into())
81    }
82
83    /// Result reranking step.
84    pub fn reranker() -> Self {
85        Self("reranker".into())
86    }
87
88    /// Guardrail or validation step.
89    pub fn guardrail() -> Self {
90        Self("guardrail".into())
91    }
92
93    /// Evaluation or scoring step.
94    pub fn evaluator() -> Self {
95        Self("evaluator".into())
96    }
97
98    /// Vendor-defined custom category.
99    pub fn custom() -> Self {
100        Self("custom".into())
101    }
102
103    /// Unknown or unclassified work.
104    pub fn unknown() -> Self {
105        Self("unknown".into())
106    }
107
108    /// Create a category from an arbitrary producer-provided string.
109    pub fn new(value: impl Into<String>) -> Self {
110        Self(value.into())
111    }
112
113    /// Return the string form serialized on the wire.
114    pub fn as_str(&self) -> &str {
115        self.0.as_str()
116    }
117
118    /// Convert this category to the closest legacy scope type for internal
119    /// adapters that still need span-kind classification.
120    pub fn to_scope_type(&self) -> ScopeType {
121        match self.as_str() {
122            "agent" => ScopeType::Agent,
123            "function" => ScopeType::Function,
124            "tool" => ScopeType::Tool,
125            "llm" => ScopeType::Llm,
126            "retriever" => ScopeType::Retriever,
127            "embedder" => ScopeType::Embedder,
128            "reranker" => ScopeType::Reranker,
129            "guardrail" => ScopeType::Guardrail,
130            "evaluator" => ScopeType::Evaluator,
131            "custom" => ScopeType::Custom,
132            _ => ScopeType::Unknown,
133        }
134    }
135}
136
137impl From<ScopeType> for EventCategory {
138    fn from(value: ScopeType) -> Self {
139        match value {
140            ScopeType::Agent => Self::agent(),
141            ScopeType::Function => Self::function(),
142            ScopeType::Tool => Self::tool(),
143            ScopeType::Llm => Self::llm(),
144            ScopeType::Retriever => Self::retriever(),
145            ScopeType::Embedder => Self::embedder(),
146            ScopeType::Reranker => Self::reranker(),
147            ScopeType::Guardrail => Self::guardrail(),
148            ScopeType::Evaluator => Self::evaluator(),
149            ScopeType::Custom => Self::custom(),
150            ScopeType::Unknown => Self::unknown(),
151        }
152    }
153}
154
155impl From<&EventCategory> for ScopeType {
156    fn from(value: &EventCategory) -> Self {
157        value.to_scope_type()
158    }
159}
160
161/// ATOF lifecycle phase for a scope event.
162#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
163#[serde(rename_all = "lowercase")]
164pub enum ScopeCategory {
165    /// Scope was entered.
166    Start,
167    /// Scope was exited.
168    End,
169}
170
171/// Category-specific profile data.
172///
173/// Unknown wire keys are preserved in `extra`. LLM annotations are runtime-only
174/// enrichment used by internal adaptive/ATIF logic and are never serialized.
175#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize, TypedBuilder)]
176#[builder(field_defaults(setter(into, strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
177pub struct CategoryProfile {
178    /// Normalized model identifier for LLM events.
179    #[builder(default)]
180    #[serde(default, skip_serializing_if = "Option::is_none")]
181    pub model_name: Option<String>,
182
183    /// LLM-provider correlation ID for Tool events.
184    #[builder(default)]
185    #[serde(default, skip_serializing_if = "Option::is_none")]
186    pub tool_call_id: Option<String>,
187
188    /// Vendor subtype required when `category == "custom"`.
189    #[builder(default)]
190    #[serde(default, skip_serializing_if = "Option::is_none")]
191    pub subtype: Option<String>,
192
193    /// Unknown category-profile keys preserved from newer producers.
194    #[builder(default)]
195    #[serde(flatten)]
196    pub extra: BTreeMap<String, Json>,
197
198    /// Normalized request annotation for LLM start events.
199    #[builder(default)]
200    #[serde(default, skip_serializing_if = "Option::is_none")]
201    pub annotated_request: Option<Arc<AnnotatedLlmRequest>>,
202
203    /// Normalized response annotation for LLM end events.
204    #[builder(default)]
205    #[serde(default, skip_serializing_if = "Option::is_none")]
206    pub annotated_response: Option<Arc<AnnotatedLlmResponse>>,
207}
208
209impl CategoryProfile {
210    /// Return true when the profile has no wire-serialized fields.
211    pub fn is_wire_empty(&self) -> bool {
212        self.model_name.is_none()
213            && self.tool_call_id.is_none()
214            && self.subtype.is_none()
215            && self.extra.is_empty()
216    }
217}
218
219/// Shared event metadata carried by every ATOF event.
220#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, TypedBuilder)]
221#[builder(field_defaults(setter(into, strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
222pub struct BaseEvent {
223    /// ATOF protocol version.
224    #[builder(default = ATOF_VERSION.to_string())]
225    pub atof_version: String,
226    /// UUID of the parent scope, if any.
227    #[builder(default)]
228    pub parent_uuid: Option<Uuid>,
229    /// Unique identifier for the event or span.
230    #[builder(default = Uuid::now_v7())]
231    pub uuid: Uuid,
232    /// Event timestamp in UTC.
233    #[builder(default = Utc::now())]
234    #[serde(with = "timestamp")]
235    pub timestamp: DateTime<Utc>,
236    /// Human-readable event name.
237    pub name: String,
238    /// Application-defined payload.
239    #[builder(default)]
240    pub data: Option<Json>,
241    /// Optional schema identifier for `data`.
242    #[builder(default)]
243    pub data_schema: Option<DataSchema>,
244    /// Optional tracing/correlation metadata.
245    #[builder(default)]
246    pub metadata: Option<Json>,
247}
248
249/// ATOF scope lifecycle event.
250#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, TypedBuilder)]
251#[builder(field_defaults(setter(into, strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
252pub struct ScopeEvent {
253    /// Shared ATOF envelope.
254    #[serde(flatten)]
255    #[builder(setter(skip), default = BaseEvent::builder().name("").build())]
256    pub base: BaseEvent,
257    /// Scope lifecycle phase.
258    pub scope_category: ScopeCategory,
259    /// Canonical lowercase behavioral flags.
260    #[builder(default)]
261    pub attributes: Vec<String>,
262    /// Semantic category of work.
263    pub category: EventCategory,
264    /// Category-specific typed fields.
265    #[builder(default)]
266    pub category_profile: Option<CategoryProfile>,
267}
268
269impl ScopeEvent {
270    /// Construct a scope event from a base envelope and ATOF-specific fields.
271    pub fn new(
272        base: BaseEvent,
273        scope_category: ScopeCategory,
274        attributes: Vec<String>,
275        category: EventCategory,
276        category_profile: Option<CategoryProfile>,
277    ) -> Self {
278        Self {
279            base,
280            scope_category,
281            attributes: canonicalize_attributes(attributes),
282            category,
283            category_profile,
284        }
285    }
286}
287
288/// ATOF point-in-time mark event.
289#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, TypedBuilder)]
290#[builder(field_defaults(setter(into, strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
291pub struct MarkEvent {
292    /// Shared ATOF envelope.
293    #[serde(flatten)]
294    #[builder(setter(skip), default = BaseEvent::builder().name("").build())]
295    pub base: BaseEvent,
296    /// Optional semantic category for the checkpoint.
297    #[builder(default)]
298    pub category: Option<EventCategory>,
299    /// Optional category-specific typed fields.
300    #[builder(default)]
301    pub category_profile: Option<CategoryProfile>,
302}
303
304impl MarkEvent {
305    /// Construct a mark event from a base envelope and optional category data.
306    pub fn new(
307        base: BaseEvent,
308        category: Option<EventCategory>,
309        category_profile: Option<CategoryProfile>,
310    ) -> Self {
311        Self {
312            base,
313            category,
314            category_profile,
315        }
316    }
317}
318
319/// Tagged union covering the two ATOF event kinds emitted by the runtime.
320#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
321#[serde(tag = "kind", rename_all = "lowercase")]
322pub enum Event {
323    /// Scope lifecycle event.
324    Scope(ScopeEvent),
325    /// Point-in-time checkpoint event.
326    Mark(MarkEvent),
327}
328
329impl Event {
330    /// Return the ATOF event kind.
331    pub fn kind(&self) -> &'static str {
332        match self {
333            Self::Scope(_) => "scope",
334            Self::Mark(_) => "mark",
335        }
336    }
337
338    /// Return the lifecycle phase for scope events.
339    pub fn scope_category(&self) -> Option<ScopeCategory> {
340        match self {
341            Self::Scope(event) => Some(event.scope_category),
342            Self::Mark(_) => None,
343        }
344    }
345
346    /// Return the semantic category if present.
347    pub fn category(&self) -> Option<&EventCategory> {
348        match self {
349            Self::Scope(event) => Some(&event.category),
350            Self::Mark(event) => event.category.as_ref(),
351        }
352    }
353
354    /// Return the category-specific profile if present.
355    pub fn category_profile(&self) -> Option<&CategoryProfile> {
356        match self {
357            Self::Scope(event) => event.category_profile.as_ref(),
358            Self::Mark(event) => event.category_profile.as_ref(),
359        }
360    }
361
362    /// Return the mutable category-specific profile if present.
363    pub fn category_profile_mut(&mut self) -> Option<&mut CategoryProfile> {
364        match self {
365            Self::Scope(event) => event.category_profile.as_mut(),
366            Self::Mark(event) => event.category_profile.as_mut(),
367        }
368    }
369
370    /// Return the parent scope UUID, if the event is nested under a scope.
371    pub fn parent_uuid(&self) -> Option<Uuid> {
372        self.base().parent_uuid
373    }
374
375    /// Return the unique event or span UUID.
376    pub fn uuid(&self) -> Uuid {
377        self.base().uuid
378    }
379
380    /// Return the event timestamp.
381    pub fn timestamp(&self) -> &DateTime<Utc> {
382        &self.base().timestamp
383    }
384
385    /// Return the human-readable event name.
386    pub fn name(&self) -> &str {
387        self.base().name.as_str()
388    }
389
390    /// Return the optional application payload attached to the event.
391    pub fn data(&self) -> Option<&Json> {
392        self.base().data.as_ref()
393    }
394
395    /// Return the optional data schema.
396    pub fn data_schema(&self) -> Option<&DataSchema> {
397        self.base().data_schema.as_ref()
398    }
399
400    /// Return the optional metadata attached to the event.
401    pub fn metadata(&self) -> Option<&Json> {
402        self.base().metadata.as_ref()
403    }
404
405    /// Return attributes for scope events.
406    pub fn attributes(&self) -> Option<&[String]> {
407        match self {
408            Self::Scope(event) => Some(event.attributes.as_slice()),
409            Self::Mark(_) => None,
410        }
411    }
412
413    /// Return the semantic scope category for scope events.
414    pub fn scope_type(&self) -> Option<ScopeType> {
415        self.category().map(EventCategory::to_scope_type)
416    }
417
418    /// Return the semantic input payload for start events.
419    pub fn input(&self) -> Option<&Json> {
420        match self {
421            Self::Scope(event) if event.scope_category == ScopeCategory::Start => {
422                event.base.data.as_ref()
423            }
424            _ => None,
425        }
426    }
427
428    /// Return the semantic output payload for end events.
429    pub fn output(&self) -> Option<&Json> {
430        match self {
431            Self::Scope(event) if event.scope_category == ScopeCategory::End => {
432                event.base.data.as_ref()
433            }
434            _ => None,
435        }
436    }
437
438    /// Return the normalized model name for LLM events.
439    pub fn model_name(&self) -> Option<&str> {
440        self.category_profile()
441            .and_then(|profile| profile.model_name.as_deref())
442    }
443
444    /// Return the provider-specific tool-call correlation identifier.
445    pub fn tool_call_id(&self) -> Option<&str> {
446        self.category_profile()
447            .and_then(|profile| profile.tool_call_id.as_deref())
448    }
449
450    /// Return the runtime-only annotated LLM request.
451    pub fn annotated_request(&self) -> Option<&Arc<AnnotatedLlmRequest>> {
452        self.category_profile()
453            .and_then(|profile| profile.annotated_request.as_ref())
454    }
455
456    /// Return the runtime-only annotated LLM response.
457    pub fn annotated_response(&self) -> Option<&Arc<AnnotatedLlmResponse>> {
458        self.category_profile()
459            .and_then(|profile| profile.annotated_response.as_ref())
460    }
461
462    /// Return true for scope-start events.
463    pub fn is_scope_start(&self) -> bool {
464        matches!(
465            self,
466            Self::Scope(ScopeEvent {
467                scope_category: ScopeCategory::Start,
468                ..
469            })
470        )
471    }
472
473    /// Return true for scope-end events.
474    pub fn is_scope_end(&self) -> bool {
475        matches!(
476            self,
477            Self::Scope(ScopeEvent {
478                scope_category: ScopeCategory::End,
479                ..
480            })
481        )
482    }
483
484    fn base(&self) -> &BaseEvent {
485        match self {
486            Self::Scope(event) => &event.base,
487            Self::Mark(event) => &event.base,
488        }
489    }
490}
491
492/// Convert handle bitflags into ATOF attributes.
493pub fn attributes_from_handle(attributes: HandleAttributes) -> Vec<String> {
494    match attributes {
495        HandleAttributes::Scope(attributes) => scope_attributes_to_strings(attributes),
496        HandleAttributes::Tool(attributes) => tool_attributes_to_strings(attributes),
497        HandleAttributes::Llm(attributes) => llm_attributes_to_strings(attributes),
498    }
499}
500
501/// Convert scope bitflags into ATOF attributes.
502pub fn scope_attributes_to_strings(attributes: ScopeAttributes) -> Vec<String> {
503    let mut values = Vec::new();
504    if attributes.contains(ScopeAttributes::PARALLEL) {
505        values.push("parallel".to_string());
506    }
507    if attributes.contains(ScopeAttributes::RELOCATABLE) {
508        values.push("relocatable".to_string());
509    }
510    values
511}
512
513/// Convert tool bitflags into ATOF attributes.
514pub fn tool_attributes_to_strings(attributes: ToolAttributes) -> Vec<String> {
515    let mut values = Vec::new();
516    if attributes.contains(ToolAttributes::REMOTE) {
517        values.push("remote".to_string());
518    }
519    values
520}
521
522/// Convert LLM bitflags into ATOF attributes.
523pub fn llm_attributes_to_strings(attributes: LlmAttributes) -> Vec<String> {
524    let mut values = Vec::new();
525    if attributes.contains(LlmAttributes::STATEFUL) {
526        values.push("stateful".to_string());
527    }
528    if attributes.contains(LlmAttributes::STREAMING) {
529        values.push("streaming".to_string());
530    }
531    values
532}
533
534fn canonicalize_attributes(mut attributes: Vec<String>) -> Vec<String> {
535    attributes.sort();
536    attributes.dedup();
537    attributes
538}
539
540mod timestamp {
541    use chrono::{DateTime, Utc};
542    use serde::{
543        Deserializer, Serializer,
544        de::{self, Visitor},
545    };
546    use std::fmt;
547
548    pub fn serialize<S>(value: &DateTime<Utc>, serializer: S) -> Result<S::Ok, S::Error>
549    where
550        S: Serializer,
551    {
552        serializer.serialize_str(&value.to_rfc3339())
553    }
554
555    pub fn deserialize<'de, D>(deserializer: D) -> Result<DateTime<Utc>, D::Error>
556    where
557        D: Deserializer<'de>,
558    {
559        deserializer.deserialize_any(TimestampVisitor)
560    }
561
562    struct TimestampVisitor;
563
564    impl<'de> Visitor<'de> for TimestampVisitor {
565        type Value = DateTime<Utc>;
566
567        fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
568            formatter.write_str("an RFC 3339 timestamp string or epoch microseconds integer")
569        }
570
571        fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
572        where
573            E: de::Error,
574        {
575            DateTime::parse_from_rfc3339(value)
576                .map(|timestamp| timestamp.with_timezone(&Utc))
577                .map_err(E::custom)
578        }
579
580        fn visit_i64<E>(self, value: i64) -> Result<Self::Value, E>
581        where
582            E: de::Error,
583        {
584            DateTime::<Utc>::from_timestamp_micros(value)
585                .ok_or_else(|| E::custom("epoch microseconds value is out of range"))
586        }
587
588        fn visit_u64<E>(self, value: u64) -> Result<Self::Value, E>
589        where
590            E: de::Error,
591        {
592            let value = i64::try_from(value)
593                .map_err(|_| E::custom("epoch microseconds value is out of range"))?;
594            self.visit_i64(value)
595        }
596    }
597}