trace-weft-server 0.2.0

Axum API and query layer for TraceWeft (SQLite, Postgres, auth, OTLP ingest)
Documentation
use clickhouse::{Client, Row};
use serde::{Deserialize, Serialize};
use trace_weft_core::SpanRecord;

#[derive(Row, Serialize, Deserialize)]
pub struct ClickHouseSpan {
    pub trace_id: String,
    pub span_id: String,
    pub parent_span_id: String,
    pub run_id: String,
    pub span_kind: String,
    pub name: String,
    pub start_time: i64,
    pub end_time: i64,
    pub status: String,
    pub latency_ms: i64,
    pub input_tokens: i64,
    pub output_tokens: i64,
    pub total_tokens: i64,
    pub model_provider: String,
    pub model_name: String,
}

impl From<&SpanRecord> for ClickHouseSpan {
    fn from(span: &SpanRecord) -> Self {
        let input_tokens = span
            .token_usage
            .as_ref()
            .map(|t| t.input as i64)
            .unwrap_or(0);
        let output_tokens = span
            .token_usage
            .as_ref()
            .map(|t| t.output as i64)
            .unwrap_or(0);

        Self {
            trace_id: span.trace_id.0.to_string(),
            span_id: span.span_id.0.to_string(),
            parent_span_id: span
                .parent_span_id
                .map(|id| id.0.to_string())
                .unwrap_or_default(),
            run_id: span.run_id.0.to_string(),
            span_kind: format!("{:?}", span.span_kind),
            name: span.name.clone(),
            start_time: span.start_time as i64,
            end_time: span.end_time.unwrap_or(span.start_time) as i64,
            status: format!("{:?}", span.status),
            latency_ms: span.latency_ms.unwrap_or(0) as i64,
            input_tokens,
            output_tokens,
            total_tokens: input_tokens + output_tokens,
            model_provider: span.model_provider.clone().unwrap_or_default(),
            model_name: span.model_name.clone().unwrap_or_default(),
        }
    }
}

pub struct ClickHouseAnalytics {
    client: Client,
}

impl ClickHouseAnalytics {
    pub fn new(url: &str, user: &str, password: &str, database: &str) -> Self {
        let client = Client::default()
            .with_url(url)
            .with_user(user)
            .with_password(password)
            .with_database(database);

        Self { client }
    }

    pub async fn ingest_batch(&self, spans: &[SpanRecord]) -> anyhow::Result<()> {
        let mut insert = self.client.insert("spans_buffer")?;
        for span in spans {
            let ch_span: ClickHouseSpan = span.into();
            insert.write(&ch_span).await?;
        }
        insert.end().await?;
        Ok(())
    }
}