1use std::collections::BTreeMap;
17use std::sync::Arc;
18
19use chrono::{DateTime, Utc};
20use serde::{Deserialize, Serialize};
21use typed_builder::TypedBuilder;
22use uuid::Uuid;
23
24use crate::api::llm::LlmAttributes;
25use crate::api::scope::{HandleAttributes, ScopeAttributes, ScopeType};
26use crate::api::tool::ToolAttributes;
27use crate::codec::request::AnnotatedLlmRequest;
28use crate::codec::response::AnnotatedLlmResponse;
29use crate::json::Json;
30
/// ATOF version string; used as the builder default for [`BaseEvent::atof_version`].
pub const ATOF_VERSION: &str = "0.1";
33
/// Name/version pair identifying a data schema.
///
/// Carried optionally in [`BaseEvent::data_schema`]. Presumably it describes the shape of
/// [`BaseEvent::data`] — confirm against the format specification.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)]
// Every builder setter accepts `impl Into<FieldType>`.
#[builder(field_defaults(setter(into)))]
pub struct DataSchema {
    /// Schema name.
    pub name: String,
    /// Schema version.
    pub version: String,
}
43
/// String-backed event category label.
///
/// `#[serde(transparent)]` makes the value (de)serialize as the bare inner string, so any
/// string is accepted on input, not only the labels that have a dedicated constructor.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct EventCategory(String);
51
52impl EventCategory {
53 pub fn agent() -> Self {
55 Self("agent".into())
56 }
57
58 pub fn function() -> Self {
60 Self("function".into())
61 }
62
63 pub fn llm() -> Self {
65 Self("llm".into())
66 }
67
68 pub fn tool() -> Self {
70 Self("tool".into())
71 }
72
73 pub fn retriever() -> Self {
75 Self("retriever".into())
76 }
77
78 pub fn embedder() -> Self {
80 Self("embedder".into())
81 }
82
83 pub fn reranker() -> Self {
85 Self("reranker".into())
86 }
87
88 pub fn guardrail() -> Self {
90 Self("guardrail".into())
91 }
92
93 pub fn evaluator() -> Self {
95 Self("evaluator".into())
96 }
97
98 pub fn custom() -> Self {
100 Self("custom".into())
101 }
102
103 pub fn unknown() -> Self {
105 Self("unknown".into())
106 }
107
108 pub fn new(value: impl Into<String>) -> Self {
110 Self(value.into())
111 }
112
113 pub fn as_str(&self) -> &str {
115 self.0.as_str()
116 }
117
118 pub fn to_scope_type(&self) -> ScopeType {
121 match self.as_str() {
122 "agent" => ScopeType::Agent,
123 "function" => ScopeType::Function,
124 "tool" => ScopeType::Tool,
125 "llm" => ScopeType::Llm,
126 "retriever" => ScopeType::Retriever,
127 "embedder" => ScopeType::Embedder,
128 "reranker" => ScopeType::Reranker,
129 "guardrail" => ScopeType::Guardrail,
130 "evaluator" => ScopeType::Evaluator,
131 "custom" => ScopeType::Custom,
132 _ => ScopeType::Unknown,
133 }
134 }
135}
136
137impl From<ScopeType> for EventCategory {
138 fn from(value: ScopeType) -> Self {
139 match value {
140 ScopeType::Agent => Self::agent(),
141 ScopeType::Function => Self::function(),
142 ScopeType::Tool => Self::tool(),
143 ScopeType::Llm => Self::llm(),
144 ScopeType::Retriever => Self::retriever(),
145 ScopeType::Embedder => Self::embedder(),
146 ScopeType::Reranker => Self::reranker(),
147 ScopeType::Guardrail => Self::guardrail(),
148 ScopeType::Evaluator => Self::evaluator(),
149 ScopeType::Custom => Self::custom(),
150 ScopeType::Unknown => Self::unknown(),
151 }
152 }
153}
154
155impl From<&EventCategory> for ScopeType {
156 fn from(value: &EventCategory) -> Self {
157 value.to_scope_type()
158 }
159}
160
/// Distinguishes the two kinds of scope event.
///
/// Serialized in lowercase, i.e. as `"start"` / `"end"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ScopeCategory {
    /// The event marks the start of a scope.
    Start,
    /// The event marks the end of a scope.
    End,
}
170
/// Optional, category-specific details attached to an event.
///
/// Every field has a builder default, so `CategoryProfile::builder().build()` is valid.
/// The `strip_option(..., fallback_suffix = "_opt")` builder option is typed-builder's way of
/// letting `Option` fields be set from the inner value, with `*_opt` setters taking the
/// `Option` itself — see the typed-builder documentation for the exact rules.
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize, TypedBuilder)]
#[builder(field_defaults(setter(into, strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
pub struct CategoryProfile {
    /// Model name; left out of the serialized form when `None`.
    #[builder(default)]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub model_name: Option<String>,

    /// Tool call identifier; left out of the serialized form when `None`.
    #[builder(default)]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tool_call_id: Option<String>,

    /// Free-form subtype label; left out of the serialized form when `None`.
    #[builder(default)]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub subtype: Option<String>,

    /// Additional key/value pairs.
    ///
    /// Because of `#[serde(flatten)]` the entries are written inline next to the named fields,
    /// and on input this map receives the keys that no named field claimed.
    #[builder(default)]
    #[serde(flatten)]
    pub extra: BTreeMap<String, Json>,

    /// Shared annotated LLM request; left out of the serialized form when `None`.
    /// Not consulted by `is_wire_empty`.
    #[builder(default)]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub annotated_request: Option<Arc<AnnotatedLlmRequest>>,

    /// Shared annotated LLM response; left out of the serialized form when `None`.
    /// Not consulted by `is_wire_empty`.
    #[builder(default)]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub annotated_response: Option<Arc<AnnotatedLlmResponse>>,
}
208
209impl CategoryProfile {
210 pub fn is_wire_empty(&self) -> bool {
212 self.model_name.is_none()
213 && self.tool_call_id.is_none()
214 && self.subtype.is_none()
215 && self.extra.is_empty()
216 }
217}
218
/// Fields shared by every event kind.
///
/// [`ScopeEvent`] and [`MarkEvent`] embed this struct with `#[serde(flatten)]`, so these
/// fields sit at the same level as the kind-specific ones in the serialized form.
/// In the builder only `name` is mandatory; all other fields have defaults.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, TypedBuilder)]
#[builder(field_defaults(setter(into, strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
pub struct BaseEvent {
    /// Format version; the builder defaults it to [`ATOF_VERSION`].
    #[builder(default = ATOF_VERSION.to_string())]
    pub atof_version: String,
    /// Optional UUID of a parent; what it refers to is defined outside this file.
    #[builder(default)]
    pub parent_uuid: Option<Uuid>,
    /// Event UUID; the builder defaults it to a fresh `Uuid::now_v7()`.
    #[builder(default = Uuid::now_v7())]
    pub uuid: Uuid,
    /// Event time; the builder defaults it to `Utc::now()`.
    ///
    /// (De)serialized through the private `timestamp` module: written as an RFC 3339 string,
    /// read from either an RFC 3339 string or an integer count of epoch microseconds.
    #[builder(default = Utc::now())]
    #[serde(with = "timestamp")]
    pub timestamp: DateTime<Utc>,
    /// Event name (required by the builder).
    pub name: String,
    /// Optional payload. [`Event::input`] / [`Event::output`] expose it for scope-start and
    /// scope-end events respectively.
    #[builder(default)]
    pub data: Option<Json>,
    /// Optional schema reference; presumably describes `data` — confirm.
    #[builder(default)]
    pub data_schema: Option<DataSchema>,
    /// Optional free-form metadata.
    #[builder(default)]
    pub metadata: Option<Json>,
}
248
/// Event marking the start or end of a scope.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, TypedBuilder)]
#[builder(field_defaults(setter(into, strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
pub struct ScopeEvent {
    /// Shared event fields, flattened into the same level as the fields below.
    ///
    /// The generated builder has no setter for this field (`setter(skip)`); a builder-made
    /// event starts with a default `BaseEvent` whose `name` is the empty string.
    #[serde(flatten)]
    #[builder(setter(skip), default = BaseEvent::builder().name("").build())]
    pub base: BaseEvent,
    /// Whether this is a start or an end event.
    pub scope_category: ScopeCategory,
    /// Attribute labels. `ScopeEvent::new` sorts and de-duplicates them; the builder defaults
    /// to an empty list and, as far as this file shows, stores a supplied list unchanged.
    #[builder(default)]
    pub attributes: Vec<String>,
    /// Category of the scope.
    pub category: EventCategory,
    /// Optional category-specific details.
    #[builder(default)]
    pub category_profile: Option<CategoryProfile>,
}
268
269impl ScopeEvent {
270 pub fn new(
272 base: BaseEvent,
273 scope_category: ScopeCategory,
274 attributes: Vec<String>,
275 category: EventCategory,
276 category_profile: Option<CategoryProfile>,
277 ) -> Self {
278 Self {
279 base,
280 scope_category,
281 attributes: canonicalize_attributes(attributes),
282 category,
283 category_profile,
284 }
285 }
286}
287
/// Point-in-time event that is not tied to a scope start or end.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, TypedBuilder)]
#[builder(field_defaults(setter(into, strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
pub struct MarkEvent {
    /// Shared event fields, flattened into the same level as the fields below.
    ///
    /// The generated builder has no setter for this field (`setter(skip)`); a builder-made
    /// event starts with a default `BaseEvent` whose `name` is the empty string.
    #[serde(flatten)]
    #[builder(setter(skip), default = BaseEvent::builder().name("").build())]
    pub base: BaseEvent,
    /// Optional category (unlike `ScopeEvent`, where the category is mandatory).
    #[builder(default)]
    pub category: Option<EventCategory>,
    /// Optional category-specific details.
    #[builder(default)]
    pub category_profile: Option<CategoryProfile>,
}
303
304impl MarkEvent {
305 pub fn new(
307 base: BaseEvent,
308 category: Option<EventCategory>,
309 category_profile: Option<CategoryProfile>,
310 ) -> Self {
311 Self {
312 base,
313 category,
314 category_profile,
315 }
316 }
317}
318
/// An event of either kind.
///
/// Internally tagged for serde: the serialized form carries a `kind` field whose value is
/// the lowercase variant name (`"scope"` or `"mark"`) next to the variant's own fields.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "lowercase")]
pub enum Event {
    /// A scope start/end event.
    Scope(ScopeEvent),
    /// A mark event.
    Mark(MarkEvent),
}
328
329impl Event {
330 pub fn kind(&self) -> &'static str {
332 match self {
333 Self::Scope(_) => "scope",
334 Self::Mark(_) => "mark",
335 }
336 }
337
338 pub fn scope_category(&self) -> Option<ScopeCategory> {
340 match self {
341 Self::Scope(event) => Some(event.scope_category),
342 Self::Mark(_) => None,
343 }
344 }
345
346 pub fn category(&self) -> Option<&EventCategory> {
348 match self {
349 Self::Scope(event) => Some(&event.category),
350 Self::Mark(event) => event.category.as_ref(),
351 }
352 }
353
354 pub fn category_profile(&self) -> Option<&CategoryProfile> {
356 match self {
357 Self::Scope(event) => event.category_profile.as_ref(),
358 Self::Mark(event) => event.category_profile.as_ref(),
359 }
360 }
361
362 pub fn category_profile_mut(&mut self) -> Option<&mut CategoryProfile> {
364 match self {
365 Self::Scope(event) => event.category_profile.as_mut(),
366 Self::Mark(event) => event.category_profile.as_mut(),
367 }
368 }
369
370 pub fn parent_uuid(&self) -> Option<Uuid> {
372 self.base().parent_uuid
373 }
374
375 pub fn uuid(&self) -> Uuid {
377 self.base().uuid
378 }
379
380 pub fn timestamp(&self) -> &DateTime<Utc> {
382 &self.base().timestamp
383 }
384
385 pub fn name(&self) -> &str {
387 self.base().name.as_str()
388 }
389
390 pub fn data(&self) -> Option<&Json> {
392 self.base().data.as_ref()
393 }
394
395 pub fn data_schema(&self) -> Option<&DataSchema> {
397 self.base().data_schema.as_ref()
398 }
399
400 pub fn metadata(&self) -> Option<&Json> {
402 self.base().metadata.as_ref()
403 }
404
405 pub fn attributes(&self) -> Option<&[String]> {
407 match self {
408 Self::Scope(event) => Some(event.attributes.as_slice()),
409 Self::Mark(_) => None,
410 }
411 }
412
413 pub fn scope_type(&self) -> Option<ScopeType> {
415 self.category().map(EventCategory::to_scope_type)
416 }
417
418 pub fn input(&self) -> Option<&Json> {
420 match self {
421 Self::Scope(event) if event.scope_category == ScopeCategory::Start => {
422 event.base.data.as_ref()
423 }
424 _ => None,
425 }
426 }
427
428 pub fn output(&self) -> Option<&Json> {
430 match self {
431 Self::Scope(event) if event.scope_category == ScopeCategory::End => {
432 event.base.data.as_ref()
433 }
434 _ => None,
435 }
436 }
437
438 pub fn model_name(&self) -> Option<&str> {
440 self.category_profile()
441 .and_then(|profile| profile.model_name.as_deref())
442 }
443
444 pub fn tool_call_id(&self) -> Option<&str> {
446 self.category_profile()
447 .and_then(|profile| profile.tool_call_id.as_deref())
448 }
449
450 pub fn annotated_request(&self) -> Option<&Arc<AnnotatedLlmRequest>> {
452 self.category_profile()
453 .and_then(|profile| profile.annotated_request.as_ref())
454 }
455
456 pub fn annotated_response(&self) -> Option<&Arc<AnnotatedLlmResponse>> {
458 self.category_profile()
459 .and_then(|profile| profile.annotated_response.as_ref())
460 }
461
462 pub fn is_scope_start(&self) -> bool {
464 matches!(
465 self,
466 Self::Scope(ScopeEvent {
467 scope_category: ScopeCategory::Start,
468 ..
469 })
470 )
471 }
472
473 pub fn is_scope_end(&self) -> bool {
475 matches!(
476 self,
477 Self::Scope(ScopeEvent {
478 scope_category: ScopeCategory::End,
479 ..
480 })
481 )
482 }
483
484 fn base(&self) -> &BaseEvent {
485 match self {
486 Self::Scope(event) => &event.base,
487 Self::Mark(event) => &event.base,
488 }
489 }
490}
491
492pub fn attributes_from_handle(attributes: HandleAttributes) -> Vec<String> {
494 match attributes {
495 HandleAttributes::Scope(attributes) => scope_attributes_to_strings(attributes),
496 HandleAttributes::Tool(attributes) => tool_attributes_to_strings(attributes),
497 HandleAttributes::Llm(attributes) => llm_attributes_to_strings(attributes),
498 }
499}
500
501pub fn scope_attributes_to_strings(attributes: ScopeAttributes) -> Vec<String> {
503 let mut values = Vec::new();
504 if attributes.contains(ScopeAttributes::PARALLEL) {
505 values.push("parallel".to_string());
506 }
507 if attributes.contains(ScopeAttributes::RELOCATABLE) {
508 values.push("relocatable".to_string());
509 }
510 values
511}
512
513pub fn tool_attributes_to_strings(attributes: ToolAttributes) -> Vec<String> {
515 let mut values = Vec::new();
516 if attributes.contains(ToolAttributes::REMOTE) {
517 values.push("remote".to_string());
518 }
519 values
520}
521
522pub fn llm_attributes_to_strings(attributes: LlmAttributes) -> Vec<String> {
524 let mut values = Vec::new();
525 if attributes.contains(LlmAttributes::STATEFUL) {
526 values.push("stateful".to_string());
527 }
528 if attributes.contains(LlmAttributes::STREAMING) {
529 values.push("streaming".to_string());
530 }
531 values
532}
533
/// Returns the attribute labels in ascending order with duplicates removed.
fn canonicalize_attributes(attributes: Vec<String>) -> Vec<String> {
    let mut canonical = attributes;
    // Equal strings are indistinguishable, so an unstable sort gives the same result.
    canonical.sort_unstable();
    // After sorting, duplicates are adjacent and `dedup` removes them all.
    canonical.dedup();
    canonical
}
539
540mod timestamp {
541 use chrono::{DateTime, Utc};
542 use serde::{
543 Deserializer, Serializer,
544 de::{self, Visitor},
545 };
546 use std::fmt;
547
548 pub fn serialize<S>(value: &DateTime<Utc>, serializer: S) -> Result<S::Ok, S::Error>
549 where
550 S: Serializer,
551 {
552 serializer.serialize_str(&value.to_rfc3339())
553 }
554
555 pub fn deserialize<'de, D>(deserializer: D) -> Result<DateTime<Utc>, D::Error>
556 where
557 D: Deserializer<'de>,
558 {
559 deserializer.deserialize_any(TimestampVisitor)
560 }
561
562 struct TimestampVisitor;
563
564 impl<'de> Visitor<'de> for TimestampVisitor {
565 type Value = DateTime<Utc>;
566
567 fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
568 formatter.write_str("an RFC 3339 timestamp string or epoch microseconds integer")
569 }
570
571 fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
572 where
573 E: de::Error,
574 {
575 DateTime::parse_from_rfc3339(value)
576 .map(|timestamp| timestamp.with_timezone(&Utc))
577 .map_err(E::custom)
578 }
579
580 fn visit_i64<E>(self, value: i64) -> Result<Self::Value, E>
581 where
582 E: de::Error,
583 {
584 DateTime::<Utc>::from_timestamp_micros(value)
585 .ok_or_else(|| E::custom("epoch microseconds value is out of range"))
586 }
587
588 fn visit_u64<E>(self, value: u64) -> Result<Self::Value, E>
589 where
590 E: de::Error,
591 {
592 let value = i64::try_from(value)
593 .map_err(|_| E::custom("epoch microseconds value is out of range"))?;
594 self.visit_i64(value)
595 }
596 }
597}