1use std::collections::BTreeMap;
19use std::sync::Arc;
20
21use chrono::{DateTime, Utc};
22use serde::{Deserialize, Serialize};
23use typed_builder::TypedBuilder;
24use uuid::Uuid;
25
26use crate::api::llm::LlmAttributes;
27use crate::api::scope::{HandleAttributes, ScopeAttributes, ScopeType};
28use crate::api::tool::ToolAttributes;
29use crate::codec::request::AnnotatedLlmRequest;
30use crate::codec::response::AnnotatedLlmResponse;
31use crate::json::Json;
32
/// ATOF version string.
///
/// Used as the builder default for [`BaseEvent::atof_version`].
pub const ATOF_VERSION: &str = "0.1";
35
/// Name/version pair describing a data schema.
///
/// Carried optionally in [`BaseEvent::data_schema`]. The generated builder
/// accepts any `Into<String>` value for both fields.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)]
#[builder(field_defaults(setter(into)))]
pub struct DataSchema {
    /// Schema name.
    pub name: String,
    /// Schema version.
    pub version: String,
}
45
/// String-backed event category.
///
/// Because of `#[serde(transparent)]` the value is serialized and
/// deserialized as the bare inner string. Any string is accepted; see
/// [`EventCategory::to_scope_type`] for how strings map onto [`ScopeType`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct EventCategory(String);
53
54impl EventCategory {
55 pub fn agent() -> Self {
57 Self("agent".into())
58 }
59
60 pub fn function() -> Self {
62 Self("function".into())
63 }
64
65 pub fn llm() -> Self {
67 Self("llm".into())
68 }
69
70 pub fn tool() -> Self {
72 Self("tool".into())
73 }
74
75 pub fn retriever() -> Self {
77 Self("retriever".into())
78 }
79
80 pub fn embedder() -> Self {
82 Self("embedder".into())
83 }
84
85 pub fn reranker() -> Self {
87 Self("reranker".into())
88 }
89
90 pub fn guardrail() -> Self {
92 Self("guardrail".into())
93 }
94
95 pub fn evaluator() -> Self {
97 Self("evaluator".into())
98 }
99
100 pub fn custom() -> Self {
102 Self("custom".into())
103 }
104
105 pub fn unknown() -> Self {
107 Self("unknown".into())
108 }
109
110 pub fn new(value: impl Into<String>) -> Self {
112 Self(value.into())
113 }
114
115 pub fn as_str(&self) -> &str {
117 self.0.as_str()
118 }
119
120 pub fn to_scope_type(&self) -> ScopeType {
123 match self.as_str() {
124 "agent" => ScopeType::Agent,
125 "function" => ScopeType::Function,
126 "tool" => ScopeType::Tool,
127 "llm" => ScopeType::Llm,
128 "retriever" => ScopeType::Retriever,
129 "embedder" => ScopeType::Embedder,
130 "reranker" => ScopeType::Reranker,
131 "guardrail" => ScopeType::Guardrail,
132 "evaluator" => ScopeType::Evaluator,
133 "custom" => ScopeType::Custom,
134 _ => ScopeType::Unknown,
135 }
136 }
137}
138
139impl From<ScopeType> for EventCategory {
140 fn from(value: ScopeType) -> Self {
141 match value {
142 ScopeType::Agent => Self::agent(),
143 ScopeType::Function => Self::function(),
144 ScopeType::Tool => Self::tool(),
145 ScopeType::Llm => Self::llm(),
146 ScopeType::Retriever => Self::retriever(),
147 ScopeType::Embedder => Self::embedder(),
148 ScopeType::Reranker => Self::reranker(),
149 ScopeType::Guardrail => Self::guardrail(),
150 ScopeType::Evaluator => Self::evaluator(),
151 ScopeType::Custom => Self::custom(),
152 ScopeType::Unknown => Self::unknown(),
153 }
154 }
155}
156
157impl From<&EventCategory> for ScopeType {
158 fn from(value: &EventCategory) -> Self {
159 value.to_scope_type()
160 }
161}
162
/// Distinguishes the two kinds of scope event.
///
/// Serialized in lowercase, i.e. as `"start"` and `"end"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ScopeCategory {
    /// A scope start; [`Event::input`] exposes the payload of such events.
    Start,
    /// A scope end; [`Event::output`] exposes the payload of such events.
    End,
}
172
/// Optional, category-specific details attached to an event.
///
/// Every field defaults to empty in the builder. Builder setters accept
/// `Into` conversions; per the `strip_option` setting, optional fields take
/// the inner value directly, with `_opt`-suffixed fallback setters for
/// passing an `Option`.
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize, TypedBuilder)]
#[builder(field_defaults(setter(into, strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
pub struct CategoryProfile {
    /// Model name; omitted from serialized output when `None`.
    #[builder(default)]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub model_name: Option<String>,

    /// Tool call identifier; omitted from serialized output when `None`.
    #[builder(default)]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tool_call_id: Option<String>,

    /// Subtype label; omitted from serialized output when `None`.
    #[builder(default)]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub subtype: Option<String>,

    /// Additional key/value pairs, flattened into the same serialized object
    /// as the named fields.
    #[builder(default)]
    #[serde(flatten)]
    pub extra: BTreeMap<String, Json>,

    /// Shared annotated LLM request; omitted from serialized output when
    /// `None`.
    #[builder(default)]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub annotated_request: Option<Arc<AnnotatedLlmRequest>>,

    /// Shared annotated LLM response; omitted from serialized output when
    /// `None`.
    #[builder(default)]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub annotated_response: Option<Arc<AnnotatedLlmResponse>>,
}
211
212impl CategoryProfile {
213 pub fn is_wire_empty(&self) -> bool {
215 self.model_name.is_none()
216 && self.tool_call_id.is_none()
217 && self.subtype.is_none()
218 && self.extra.is_empty()
219 }
220}
221
/// Fields shared by every event kind.
///
/// Embedded with `#[serde(flatten)]` in [`ScopeEvent`] and [`MarkEvent`].
/// In the builder only `name` is required; every other field has a default.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, TypedBuilder)]
#[builder(field_defaults(setter(into, strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
pub struct BaseEvent {
    /// Format version; the builder defaults it to [`ATOF_VERSION`].
    #[builder(default = ATOF_VERSION.to_string())]
    pub atof_version: String,
    /// UUID of the parent, if any; the builder defaults it to `None`.
    #[builder(default)]
    pub parent_uuid: Option<Uuid>,
    /// UUID of this event; the builder defaults it to `Uuid::now_v7()`.
    #[builder(default = Uuid::now_v7())]
    pub uuid: Uuid,
    /// Event time; the builder defaults it to `Utc::now()`.
    ///
    /// Serialized as an RFC 3339 string. Deserialization accepts either an
    /// RFC 3339 string or an integer of epoch microseconds (see the
    /// `timestamp` module).
    #[builder(default = Utc::now())]
    #[serde(with = "timestamp")]
    pub timestamp: DateTime<Utc>,
    /// Event name (required by the builder).
    pub name: String,
    /// Optional JSON payload.
    #[builder(default)]
    pub data: Option<Json>,
    /// Optional schema descriptor for `data`.
    #[builder(default)]
    pub data_schema: Option<DataSchema>,
    /// Optional JSON metadata.
    #[builder(default)]
    pub metadata: Option<Json>,
}
251
/// A scope start or end event.
///
/// [`ScopeEvent::new`] sorts and de-duplicates `attributes`; values supplied
/// through the builder or through deserialization are stored as given.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, TypedBuilder)]
#[builder(field_defaults(setter(into, strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
pub struct ScopeEvent {
    /// Shared event fields, flattened into the same serialized object.
    ///
    /// The builder exposes no setter for this field; it fills in a
    /// [`BaseEvent`] built with an empty name and otherwise default fields.
    #[serde(flatten)]
    #[builder(setter(skip), default = BaseEvent::builder().name("").build())]
    pub base: BaseEvent,
    /// Whether this is a start or an end event.
    pub scope_category: ScopeCategory,
    /// String attribute labels; the builder defaults this to an empty list.
    #[builder(default)]
    pub attributes: Vec<String>,
    /// Category of the scope.
    pub category: EventCategory,
    /// Optional category-specific details.
    #[builder(default)]
    pub category_profile: Option<CategoryProfile>,
}
271
272impl ScopeEvent {
273 pub fn new(
275 base: BaseEvent,
276 scope_category: ScopeCategory,
277 attributes: Vec<String>,
278 category: EventCategory,
279 category_profile: Option<CategoryProfile>,
280 ) -> Self {
281 Self {
282 base,
283 scope_category,
284 attributes: canonicalize_attributes(attributes),
285 category,
286 category_profile,
287 }
288 }
289}
290
/// A mark event: the shared base fields plus an optional category and
/// profile.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, TypedBuilder)]
#[builder(field_defaults(setter(into, strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
pub struct MarkEvent {
    /// Shared event fields, flattened into the same serialized object.
    ///
    /// The builder exposes no setter for this field; it fills in a
    /// [`BaseEvent`] built with an empty name and otherwise default fields.
    #[serde(flatten)]
    #[builder(setter(skip), default = BaseEvent::builder().name("").build())]
    pub base: BaseEvent,
    /// Optional category; the builder defaults it to `None`.
    #[builder(default)]
    pub category: Option<EventCategory>,
    /// Optional category-specific details; the builder defaults it to `None`.
    #[builder(default)]
    pub category_profile: Option<CategoryProfile>,
}
306
307impl MarkEvent {
308 pub fn new(
310 base: BaseEvent,
311 category: Option<EventCategory>,
312 category_profile: Option<CategoryProfile>,
313 ) -> Self {
314 Self {
315 base,
316 category,
317 category_profile,
318 }
319 }
320}
321
/// An event of either kind.
///
/// Serialized as an internally tagged object: the `kind` field holds
/// `"scope"` or `"mark"` alongside the variant's own fields.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "lowercase")]
pub enum Event {
    /// A scope start or end event.
    Scope(ScopeEvent),
    /// A mark event.
    Mark(MarkEvent),
}
331
332impl Event {
333 pub fn kind(&self) -> &'static str {
335 match self {
336 Self::Scope(_) => "scope",
337 Self::Mark(_) => "mark",
338 }
339 }
340
341 pub fn scope_category(&self) -> Option<ScopeCategory> {
343 match self {
344 Self::Scope(event) => Some(event.scope_category),
345 Self::Mark(_) => None,
346 }
347 }
348
349 pub fn category(&self) -> Option<&EventCategory> {
351 match self {
352 Self::Scope(event) => Some(&event.category),
353 Self::Mark(event) => event.category.as_ref(),
354 }
355 }
356
357 pub fn category_profile(&self) -> Option<&CategoryProfile> {
359 match self {
360 Self::Scope(event) => event.category_profile.as_ref(),
361 Self::Mark(event) => event.category_profile.as_ref(),
362 }
363 }
364
365 pub fn category_profile_mut(&mut self) -> Option<&mut CategoryProfile> {
367 match self {
368 Self::Scope(event) => event.category_profile.as_mut(),
369 Self::Mark(event) => event.category_profile.as_mut(),
370 }
371 }
372
373 pub fn parent_uuid(&self) -> Option<Uuid> {
375 self.base().parent_uuid
376 }
377
378 pub fn uuid(&self) -> Uuid {
380 self.base().uuid
381 }
382
383 pub fn timestamp(&self) -> &DateTime<Utc> {
385 &self.base().timestamp
386 }
387
388 pub fn name(&self) -> &str {
390 self.base().name.as_str()
391 }
392
393 pub fn data(&self) -> Option<&Json> {
395 self.base().data.as_ref()
396 }
397
398 pub fn data_schema(&self) -> Option<&DataSchema> {
400 self.base().data_schema.as_ref()
401 }
402
403 pub fn metadata(&self) -> Option<&Json> {
405 self.base().metadata.as_ref()
406 }
407
408 pub fn attributes(&self) -> Option<&[String]> {
410 match self {
411 Self::Scope(event) => Some(event.attributes.as_slice()),
412 Self::Mark(_) => None,
413 }
414 }
415
416 pub fn scope_type(&self) -> Option<ScopeType> {
418 self.category().map(EventCategory::to_scope_type)
419 }
420
421 pub fn input(&self) -> Option<&Json> {
423 match self {
424 Self::Scope(event) if event.scope_category == ScopeCategory::Start => {
425 event.base.data.as_ref()
426 }
427 _ => None,
428 }
429 }
430
431 pub fn output(&self) -> Option<&Json> {
433 match self {
434 Self::Scope(event) if event.scope_category == ScopeCategory::End => {
435 event.base.data.as_ref()
436 }
437 _ => None,
438 }
439 }
440
441 pub fn model_name(&self) -> Option<&str> {
443 self.category_profile()
444 .and_then(|profile| profile.model_name.as_deref())
445 }
446
447 pub fn tool_call_id(&self) -> Option<&str> {
449 self.category_profile()
450 .and_then(|profile| profile.tool_call_id.as_deref())
451 }
452
453 pub fn annotated_request(&self) -> Option<&Arc<AnnotatedLlmRequest>> {
455 self.category_profile()
456 .and_then(|profile| profile.annotated_request.as_ref())
457 }
458
459 pub fn annotated_response(&self) -> Option<&Arc<AnnotatedLlmResponse>> {
461 self.category_profile()
462 .and_then(|profile| profile.annotated_response.as_ref())
463 }
464
465 pub fn is_scope_start(&self) -> bool {
467 matches!(
468 self,
469 Self::Scope(ScopeEvent {
470 scope_category: ScopeCategory::Start,
471 ..
472 })
473 )
474 }
475
476 pub fn is_scope_end(&self) -> bool {
478 matches!(
479 self,
480 Self::Scope(ScopeEvent {
481 scope_category: ScopeCategory::End,
482 ..
483 })
484 )
485 }
486
487 fn base(&self) -> &BaseEvent {
488 match self {
489 Self::Scope(event) => &event.base,
490 Self::Mark(event) => &event.base,
491 }
492 }
493}
494
495pub fn attributes_from_handle(attributes: HandleAttributes) -> Vec<String> {
497 match attributes {
498 HandleAttributes::Scope(attributes) => scope_attributes_to_strings(attributes),
499 HandleAttributes::Tool(attributes) => tool_attributes_to_strings(attributes),
500 HandleAttributes::Llm(attributes) => llm_attributes_to_strings(attributes),
501 }
502}
503
504pub fn scope_attributes_to_strings(attributes: ScopeAttributes) -> Vec<String> {
506 let mut values = Vec::new();
507 if attributes.contains(ScopeAttributes::PARALLEL) {
508 values.push("parallel".to_string());
509 }
510 if attributes.contains(ScopeAttributes::RELOCATABLE) {
511 values.push("relocatable".to_string());
512 }
513 values
514}
515
516pub fn tool_attributes_to_strings(attributes: ToolAttributes) -> Vec<String> {
518 let mut values = Vec::new();
519 if attributes.contains(ToolAttributes::REMOTE) {
520 values.push("remote".to_string());
521 }
522 values
523}
524
525pub fn llm_attributes_to_strings(attributes: LlmAttributes) -> Vec<String> {
527 let mut values = Vec::new();
528 if attributes.contains(LlmAttributes::STATEFUL) {
529 values.push("stateful".to_string());
530 }
531 if attributes.contains(LlmAttributes::STREAMING) {
532 values.push("streaming".to_string());
533 }
534 values
535}
536
/// Returns the attribute labels sorted ascending with duplicates removed.
fn canonicalize_attributes(attributes: Vec<String>) -> Vec<String> {
    let mut canonical = attributes;
    // Equal strings are indistinguishable, so an unstable sort produces the
    // same sequence as a stable one.
    canonical.sort_unstable();
    // After sorting, duplicates are adjacent, which is what `dedup` removes.
    canonical.dedup();
    canonical
}
542
543mod timestamp {
544 use chrono::{DateTime, Utc};
545 use serde::{
546 Deserializer, Serializer,
547 de::{self, Visitor},
548 };
549 use std::fmt;
550
551 pub fn serialize<S>(value: &DateTime<Utc>, serializer: S) -> Result<S::Ok, S::Error>
552 where
553 S: Serializer,
554 {
555 serializer.serialize_str(&value.to_rfc3339())
556 }
557
558 pub fn deserialize<'de, D>(deserializer: D) -> Result<DateTime<Utc>, D::Error>
559 where
560 D: Deserializer<'de>,
561 {
562 deserializer.deserialize_any(TimestampVisitor)
563 }
564
565 struct TimestampVisitor;
566
567 impl<'de> Visitor<'de> for TimestampVisitor {
568 type Value = DateTime<Utc>;
569
570 fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
571 formatter.write_str("an RFC 3339 timestamp string or epoch microseconds integer")
572 }
573
574 fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
575 where
576 E: de::Error,
577 {
578 DateTime::parse_from_rfc3339(value)
579 .map(|timestamp| timestamp.with_timezone(&Utc))
580 .map_err(E::custom)
581 }
582
583 fn visit_i64<E>(self, value: i64) -> Result<Self::Value, E>
584 where
585 E: de::Error,
586 {
587 DateTime::<Utc>::from_timestamp_micros(value)
588 .ok_or_else(|| E::custom("epoch microseconds value is out of range"))
589 }
590
591 fn visit_u64<E>(self, value: u64) -> Result<Self::Value, E>
592 where
593 E: de::Error,
594 {
595 let value = i64::try_from(value)
596 .map_err(|_| E::custom("epoch microseconds value is out of range"))?;
597 self.visit_i64(value)
598 }
599 }
600}