use std::collections::BTreeMap;
use std::sync::Arc;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use typed_builder::TypedBuilder;
use uuid::Uuid;
use crate::api::llm::LlmAttributes;
use crate::api::scope::{HandleAttributes, ScopeAttributes, ScopeType};
use crate::api::tool::ToolAttributes;
use crate::codec::request::AnnotatedLlmRequest;
use crate::codec::response::AnnotatedLlmResponse;
use crate::json::Json;
/// ATOF format version written into `BaseEvent::atof_version` by default when an event is
/// built with `BaseEvent::builder()`.
pub const ATOF_VERSION: &str = "0.1";
/// Name/version pair identifying a schema.
///
/// Attached to events through `BaseEvent::data_schema`; presumably it describes the shape of
/// `BaseEvent::data` (confirm against producers). Builder setters accept anything
/// `Into<String>`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)]
#[builder(field_defaults(setter(into)))]
pub struct DataSchema {
/// Schema name.
pub name: String,
/// Schema version string (free-form; not parsed here).
pub version: String,
}
/// Open-ended category label of an event (e.g. `"llm"`, `"tool"`).
///
/// Serialized transparently as its bare string value. Well-known labels have dedicated
/// constructors; any other label can be wrapped with `EventCategory::new`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct EventCategory(String);
impl EventCategory {
    /// Well-known category `"agent"`.
    pub fn agent() -> Self {
        Self::new("agent")
    }

    /// Well-known category `"function"`.
    pub fn function() -> Self {
        Self::new("function")
    }

    /// Well-known category `"llm"`.
    pub fn llm() -> Self {
        Self::new("llm")
    }

    /// Well-known category `"tool"`.
    pub fn tool() -> Self {
        Self::new("tool")
    }

    /// Well-known category `"retriever"`.
    pub fn retriever() -> Self {
        Self::new("retriever")
    }

    /// Well-known category `"embedder"`.
    pub fn embedder() -> Self {
        Self::new("embedder")
    }

    /// Well-known category `"reranker"`.
    pub fn reranker() -> Self {
        Self::new("reranker")
    }

    /// Well-known category `"guardrail"`.
    pub fn guardrail() -> Self {
        Self::new("guardrail")
    }

    /// Well-known category `"evaluator"`.
    pub fn evaluator() -> Self {
        Self::new("evaluator")
    }

    /// Well-known category `"custom"`.
    pub fn custom() -> Self {
        Self::new("custom")
    }

    /// Well-known category `"unknown"`.
    pub fn unknown() -> Self {
        Self::new("unknown")
    }

    /// Wraps an arbitrary label; no validation is performed.
    pub fn new(value: impl Into<String>) -> Self {
        let label: String = value.into();
        Self(label)
    }

    /// Borrows the raw label.
    pub fn as_str(&self) -> &str {
        &self.0
    }

    /// Maps the label to its [`ScopeType`].
    ///
    /// Only the ten named labels (`"agent"` through `"custom"`) are recognized, by exact,
    /// case-sensitive comparison; every other label, `"unknown"` included, yields
    /// [`ScopeType::Unknown`].
    pub fn to_scope_type(&self) -> ScopeType {
        let label = self.as_str();
        match label {
            "agent" => ScopeType::Agent,
            "function" => ScopeType::Function,
            "tool" => ScopeType::Tool,
            "llm" => ScopeType::Llm,
            "retriever" => ScopeType::Retriever,
            "embedder" => ScopeType::Embedder,
            "reranker" => ScopeType::Reranker,
            "guardrail" => ScopeType::Guardrail,
            "evaluator" => ScopeType::Evaluator,
            "custom" => ScopeType::Custom,
            _ => ScopeType::Unknown,
        }
    }
}
impl From<ScopeType> for EventCategory {
    /// Converts a [`ScopeType`] into the well-known category label of the same name.
    ///
    /// Every variant round-trips through [`EventCategory::to_scope_type`].
    fn from(value: ScopeType) -> Self {
        let label = match value {
            ScopeType::Agent => "agent",
            ScopeType::Function => "function",
            ScopeType::Tool => "tool",
            ScopeType::Llm => "llm",
            ScopeType::Retriever => "retriever",
            ScopeType::Embedder => "embedder",
            ScopeType::Reranker => "reranker",
            ScopeType::Guardrail => "guardrail",
            ScopeType::Evaluator => "evaluator",
            ScopeType::Custom => "custom",
            ScopeType::Unknown => "unknown",
        };
        Self::new(label)
    }
}
impl From<&EventCategory> for ScopeType {
fn from(value: &EventCategory) -> Self {
value.to_scope_type()
}
}
/// Which boundary of a scope a `ScopeEvent` records. Serialized as `"start"` / `"end"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ScopeCategory {
/// The scope is opening.
Start,
/// The scope is closing.
End,
}
/// Category-specific details attached to an event.
///
/// Every field is unset/empty by default (`Default` is derived), and unset fields are omitted
/// when serializing. For `Option` fields the plain builder setter takes the inner value; the
/// `<field>_opt` setter takes the `Option` itself.
///
/// NOTE(review): `is_wire_empty` does not look at `annotated_request` / `annotated_response`,
/// yet both are serialized when `Some`. Confirm whether they are meant to reach the wire.
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize, TypedBuilder)]
#[builder(field_defaults(setter(into, strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
pub struct CategoryProfile {
/// Model name, if known.
#[builder(default)]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub model_name: Option<String>,
/// Tool-call identifier, if known.
#[builder(default)]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tool_call_id: Option<String>,
/// Free-form subtype label, if any.
#[builder(default)]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub subtype: Option<String>,
/// Any further keys. Flattened: keys not matching another field land here on deserialize,
/// and entries are written inline beside the named fields on serialize.
#[builder(default)]
#[serde(flatten)]
pub extra: BTreeMap<String, Json>,
/// Annotated LLM request, held in an `Arc` so cloning the profile does not copy it.
#[builder(default)]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub annotated_request: Option<Arc<AnnotatedLlmRequest>>,
/// Annotated LLM response, held in an `Arc` so cloning the profile does not copy it.
#[builder(default)]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub annotated_response: Option<Arc<AnnotatedLlmResponse>>,
}
impl CategoryProfile {
    /// Returns `true` when `model_name`, `tool_call_id` and `subtype` are all unset and `extra`
    /// holds no entries.
    ///
    /// `annotated_request` and `annotated_response` are not consulted. NOTE(review): both are
    /// serialized when `Some`, so a profile holding only those is reported empty here yet does
    /// not serialize to an empty object. Confirm this is intended.
    pub fn is_wire_empty(&self) -> bool {
        // Exhaustive destructure: adding a field to the struct forces a decision here.
        let Self {
            model_name,
            tool_call_id,
            subtype,
            extra,
            annotated_request: _,
            annotated_response: _,
        } = self;
        let any_scalar_set = model_name.is_some() || tool_call_id.is_some() || subtype.is_some();
        !any_scalar_set && extra.is_empty()
    }
}
/// Fields shared by every event; flattened into both `ScopeEvent` and `MarkEvent`.
///
/// Builder: only `name` is required. The defaults are `atof_version = ATOF_VERSION`,
/// `uuid = Uuid::now_v7()`, `timestamp = Utc::now()`, and `None` for the optional fields.
/// Setters accept `impl Into<_>`. For `Option` fields the plain setter takes the inner value,
/// and a `<field>_opt` setter takes the `Option` itself.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, TypedBuilder)]
#[builder(field_defaults(setter(into, strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
pub struct BaseEvent {
/// Version of the event format.
#[builder(default = ATOF_VERSION.to_string())]
pub atof_version: String,
/// Optional parent identifier. Always serialized, even when `None` (no `skip_serializing_if`).
#[builder(default)]
pub parent_uuid: Option<Uuid>,
/// Identifier of this event; the builder default is a time-ordered UUIDv7.
#[builder(default = Uuid::now_v7())]
pub uuid: Uuid,
/// When the event occurred. Written as an RFC 3339 string. Reading also accepts an integer
/// count of microseconds since the Unix epoch (see the `timestamp` module).
#[builder(default = Utc::now())]
#[serde(with = "timestamp")]
pub timestamp: DateTime<Utc>,
/// Event name (the only field the builder requires).
pub name: String,
/// Optional payload; `Event::input` / `Event::output` expose it for scope start / end events.
#[builder(default)]
pub data: Option<Json>,
/// Optional schema descriptor, presumably for `data` (confirm).
#[builder(default)]
pub data_schema: Option<DataSchema>,
/// Optional metadata as arbitrary `Json`.
#[builder(default)]
pub metadata: Option<Json>,
}
/// A scope boundary event; `scope_category` says whether it opens or closes the scope.
///
/// On the wire, the `BaseEvent` fields are flattened to the top level next to these fields.
/// `ScopeEvent::new` sorts and deduplicates `attributes`.
///
/// NOTE(review): the generated builder skips `base` and always uses
/// `BaseEvent::builder().name("").build()` (empty name, fresh uuid/timestamp). It also stores
/// `attributes` exactly as given, without the canonicalization that `new` applies.
/// Deserialization likewise keeps `attributes` as received. Confirm that builder callers
/// overwrite `base` and pass canonical attributes.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, TypedBuilder)]
#[builder(field_defaults(setter(into, strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
pub struct ScopeEvent {
/// Shared event fields (flattened when serialized).
#[serde(flatten)]
#[builder(setter(skip), default = BaseEvent::builder().name("").build())]
pub base: BaseEvent,
/// Start or end of the scope.
pub scope_category: ScopeCategory,
/// Attribute names such as `"parallel"` or `"streaming"`; the field must be present when
/// deserializing (no serde default).
#[builder(default)]
pub attributes: Vec<String>,
/// Category of the scope.
pub category: EventCategory,
/// Optional category-specific details.
#[builder(default)]
pub category_profile: Option<CategoryProfile>,
}
impl ScopeEvent {
    /// Assembles a scope event from its parts.
    ///
    /// `attributes` is canonicalized (sorted, duplicates removed) before it is stored, so two
    /// events built from the same attribute set in any order compare and serialize identically.
    pub fn new(
        base: BaseEvent,
        scope_category: ScopeCategory,
        attributes: Vec<String>,
        category: EventCategory,
        category_profile: Option<CategoryProfile>,
    ) -> Self {
        let attributes = canonicalize_attributes(attributes);
        Self {
            base,
            scope_category,
            attributes,
            category,
            category_profile,
        }
    }
}
/// A standalone mark event. Unlike `ScopeEvent`, it has no `scope_category` or `attributes`,
/// and its `category` is optional.
///
/// NOTE(review): the generated builder skips `base` and always uses
/// `BaseEvent::builder().name("").build()`. Use `MarkEvent::new` or overwrite `base` to set it.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, TypedBuilder)]
#[builder(field_defaults(setter(into, strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
pub struct MarkEvent {
/// Shared event fields (flattened when serialized).
#[serde(flatten)]
#[builder(setter(skip), default = BaseEvent::builder().name("").build())]
pub base: BaseEvent,
/// Optional category of the mark.
#[builder(default)]
pub category: Option<EventCategory>,
/// Optional category-specific details.
#[builder(default)]
pub category_profile: Option<CategoryProfile>,
}
impl MarkEvent {
pub fn new(
base: BaseEvent,
category: Option<EventCategory>,
category_profile: Option<CategoryProfile>,
) -> Self {
Self {
base,
category,
category_profile,
}
}
}
/// Any ATOF event.
///
/// Serialized as an internally tagged object. A `kind` field holds `"scope"` or `"mark"`, and
/// the variant's fields (including its flattened `BaseEvent` fields) sit alongside it.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "lowercase")]
pub enum Event {
/// A scope start/end event.
Scope(ScopeEvent),
/// A standalone mark event.
Mark(MarkEvent),
}
impl Event {
pub fn kind(&self) -> &'static str {
match self {
Self::Scope(_) => "scope",
Self::Mark(_) => "mark",
}
}
pub fn scope_category(&self) -> Option<ScopeCategory> {
match self {
Self::Scope(event) => Some(event.scope_category),
Self::Mark(_) => None,
}
}
pub fn category(&self) -> Option<&EventCategory> {
match self {
Self::Scope(event) => Some(&event.category),
Self::Mark(event) => event.category.as_ref(),
}
}
pub fn category_profile(&self) -> Option<&CategoryProfile> {
match self {
Self::Scope(event) => event.category_profile.as_ref(),
Self::Mark(event) => event.category_profile.as_ref(),
}
}
pub fn category_profile_mut(&mut self) -> Option<&mut CategoryProfile> {
match self {
Self::Scope(event) => event.category_profile.as_mut(),
Self::Mark(event) => event.category_profile.as_mut(),
}
}
pub fn parent_uuid(&self) -> Option<Uuid> {
self.base().parent_uuid
}
pub fn uuid(&self) -> Uuid {
self.base().uuid
}
pub fn timestamp(&self) -> &DateTime<Utc> {
&self.base().timestamp
}
pub fn name(&self) -> &str {
self.base().name.as_str()
}
pub fn data(&self) -> Option<&Json> {
self.base().data.as_ref()
}
pub fn data_schema(&self) -> Option<&DataSchema> {
self.base().data_schema.as_ref()
}
pub fn metadata(&self) -> Option<&Json> {
self.base().metadata.as_ref()
}
pub fn attributes(&self) -> Option<&[String]> {
match self {
Self::Scope(event) => Some(event.attributes.as_slice()),
Self::Mark(_) => None,
}
}
pub fn scope_type(&self) -> Option<ScopeType> {
self.category().map(EventCategory::to_scope_type)
}
pub fn input(&self) -> Option<&Json> {
match self {
Self::Scope(event) if event.scope_category == ScopeCategory::Start => {
event.base.data.as_ref()
}
_ => None,
}
}
pub fn output(&self) -> Option<&Json> {
match self {
Self::Scope(event) if event.scope_category == ScopeCategory::End => {
event.base.data.as_ref()
}
_ => None,
}
}
pub fn model_name(&self) -> Option<&str> {
self.category_profile()
.and_then(|profile| profile.model_name.as_deref())
}
pub fn tool_call_id(&self) -> Option<&str> {
self.category_profile()
.and_then(|profile| profile.tool_call_id.as_deref())
}
pub fn annotated_request(&self) -> Option<&Arc<AnnotatedLlmRequest>> {
self.category_profile()
.and_then(|profile| profile.annotated_request.as_ref())
}
pub fn annotated_response(&self) -> Option<&Arc<AnnotatedLlmResponse>> {
self.category_profile()
.and_then(|profile| profile.annotated_response.as_ref())
}
pub fn is_scope_start(&self) -> bool {
matches!(
self,
Self::Scope(ScopeEvent {
scope_category: ScopeCategory::Start,
..
})
)
}
pub fn is_scope_end(&self) -> bool {
matches!(
self,
Self::Scope(ScopeEvent {
scope_category: ScopeCategory::End,
..
})
)
}
fn base(&self) -> &BaseEvent {
match self {
Self::Scope(event) => &event.base,
Self::Mark(event) => &event.base,
}
}
}
/// Converts a handle's attribute flags into their string names.
///
/// Dispatches on the handle kind to the matching `*_attributes_to_strings` helper.
pub fn attributes_from_handle(attributes: HandleAttributes) -> Vec<String> {
    match attributes {
        HandleAttributes::Scope(flags) => scope_attributes_to_strings(flags),
        HandleAttributes::Tool(flags) => tool_attributes_to_strings(flags),
        HandleAttributes::Llm(flags) => llm_attributes_to_strings(flags),
    }
}
/// Names of the scope flags set in `attributes`, in the fixed order `parallel`, `relocatable`.
///
/// Only these two flags are recognized; any other bits are ignored.
pub fn scope_attributes_to_strings(attributes: ScopeAttributes) -> Vec<String> {
    let named_flags = [
        (ScopeAttributes::PARALLEL, "parallel"),
        (ScopeAttributes::RELOCATABLE, "relocatable"),
    ];
    named_flags
        .into_iter()
        .filter_map(|(flag, name)| attributes.contains(flag).then(|| name.to_string()))
        .collect()
}
/// Names of the tool flags set in `attributes`; currently only `remote` is recognized.
///
/// Any other bits are ignored.
pub fn tool_attributes_to_strings(attributes: ToolAttributes) -> Vec<String> {
    let named_flags = [(ToolAttributes::REMOTE, "remote")];
    named_flags
        .into_iter()
        .filter_map(|(flag, name)| attributes.contains(flag).then(|| name.to_string()))
        .collect()
}
/// Names of the LLM flags set in `attributes`, in the fixed order `stateful`, `streaming`.
///
/// Only these two flags are recognized; any other bits are ignored.
pub fn llm_attributes_to_strings(attributes: LlmAttributes) -> Vec<String> {
    let named_flags = [
        (LlmAttributes::STATEFUL, "stateful"),
        (LlmAttributes::STREAMING, "streaming"),
    ];
    named_flags
        .into_iter()
        .filter_map(|(flag, name)| attributes.contains(flag).then(|| name.to_string()))
        .collect()
}
/// Puts attribute names into canonical form: ascending byte order with duplicates removed.
///
/// An unstable sort is sufficient because equal `String`s are indistinguishable, so the
/// result is identical to a stable sort.
fn canonicalize_attributes(attributes: Vec<String>) -> Vec<String> {
    let mut canonical = attributes;
    canonical.sort_unstable();
    // `dedup` only collapses adjacent duplicates, which sorting guarantees.
    canonical.dedup();
    canonical
}
mod timestamp {
    //! Serde adapter for `BaseEvent::timestamp`.
    //!
    //! Serializes as an RFC 3339 string (`DateTime::to_rfc3339`). Deserializes from either an
    //! RFC 3339 string (converted to UTC) or an integer count of microseconds since the Unix
    //! epoch; floating-point numbers are rejected. Relies on `deserialize_any`, so it only works
    //! with self-describing formats such as JSON.
    use chrono::{DateTime, Utc};
    use serde::{
        Deserializer, Serializer,
        de::{self, Visitor},
    };
    use std::fmt;

    /// Error text for integers that do not map to a representable `DateTime<Utc>`.
    const OUT_OF_RANGE: &str = "epoch microseconds value is out of range";

    pub fn serialize<S>(value: &DateTime<Utc>, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let text = value.to_rfc3339();
        serializer.serialize_str(&text)
    }

    pub fn deserialize<'de, D>(deserializer: D) -> Result<DateTime<Utc>, D::Error>
    where
        D: Deserializer<'de>,
    {
        deserializer.deserialize_any(TimestampVisitor)
    }

    /// Accepts RFC 3339 strings and signed/unsigned epoch-microsecond integers.
    struct TimestampVisitor;

    impl<'de> Visitor<'de> for TimestampVisitor {
        type Value = DateTime<Utc>;

        fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
            formatter.write_str("an RFC 3339 timestamp string or epoch microseconds integer")
        }

        fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            match DateTime::parse_from_rfc3339(value) {
                Ok(parsed) => Ok(parsed.with_timezone(&Utc)),
                Err(err) => Err(E::custom(err)),
            }
        }

        fn visit_i64<E>(self, value: i64) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            micros_to_utc(value)
        }

        fn visit_u64<E>(self, value: u64) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            // Values above i64::MAX cannot be epoch micros for chrono; reject them up front.
            let Ok(signed) = i64::try_from(value) else {
                return Err(E::custom(OUT_OF_RANGE));
            };
            micros_to_utc(signed)
        }
    }

    /// Converts epoch microseconds to UTC, failing with `OUT_OF_RANGE` when chrono cannot
    /// represent the instant.
    fn micros_to_utc<E: de::Error>(micros: i64) -> Result<DateTime<Utc>, E> {
        DateTime::<Utc>::from_timestamp_micros(micros).ok_or_else(|| E::custom(OUT_OF_RANGE))
    }
}