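//! trace_weft_server/storage/analytics.rs
//!
//! ClickHouse-backed analytics storage: flattens `SpanRecord`s into
//! `ClickHouseSpan` rows and inserts them in batches into `spans_buffer`.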

use clickhouse::{Client, Row};
use serde::{Deserialize, Serialize};
use trace_weft_core::SpanRecord;

5#[derive(Row, Serialize, Deserialize)]
6pub struct ClickHouseSpan {
7    pub trace_id: String,
8    pub span_id: String,
9    pub parent_span_id: String,
10    pub run_id: String,
11    pub span_kind: String,
12    pub name: String,
13    pub start_time: i64,
14    pub end_time: i64,
15    pub status: String,
16    pub latency_ms: i64,
17    pub input_tokens: i64,
18    pub output_tokens: i64,
19    pub total_tokens: i64,
20    pub model_provider: String,
21    pub model_name: String,
22}
23
24impl From<&SpanRecord> for ClickHouseSpan {
25    fn from(span: &SpanRecord) -> Self {
26        let input_tokens = span
27            .token_usage
28            .as_ref()
29            .map(|t| t.input as i64)
30            .unwrap_or(0);
31        let output_tokens = span
32            .token_usage
33            .as_ref()
34            .map(|t| t.output as i64)
35            .unwrap_or(0);
36
37        Self {
38            trace_id: span.trace_id.0.to_string(),
39            span_id: span.span_id.0.to_string(),
40            parent_span_id: span
41                .parent_span_id
42                .map(|id| id.0.to_string())
43                .unwrap_or_default(),
44            run_id: span.run_id.0.to_string(),
45            span_kind: format!("{:?}", span.span_kind),
46            name: span.name.clone(),
47            start_time: span.start_time as i64,
48            end_time: span.end_time.unwrap_or(span.start_time) as i64,
49            status: format!("{:?}", span.status),
50            latency_ms: span.latency_ms.unwrap_or(0) as i64,
51            input_tokens,
52            output_tokens,
53            total_tokens: input_tokens + output_tokens,
54            model_provider: span.model_provider.clone().unwrap_or_default(),
55            model_name: span.model_name.clone().unwrap_or_default(),
56        }
57    }
58}
59
60pub struct ClickHouseAnalytics {
61    client: Client,
62}
63
64impl ClickHouseAnalytics {
65    pub fn new(url: &str, user: &str, password: &str, database: &str) -> Self {
66        let client = Client::default()
67            .with_url(url)
68            .with_user(user)
69            .with_password(password)
70            .with_database(database);
71
72        Self { client }
73    }
74
75    pub async fn ingest_batch(&self, spans: &[SpanRecord]) -> anyhow::Result<()> {
76        let mut insert = self.client.insert("spans_buffer")?;
77        for span in spans {
78            let ch_span: ClickHouseSpan = span.into();
79            insert.write(&ch_span).await?;
80        }
81        insert.end().await?;
82        Ok(())
83    }
84}