trace_weft_server/storage/
analytics.rs1use clickhouse::{Client, Row};
2use serde::{Deserialize, Serialize};
3use trace_weft_core::SpanRecord;
4
5#[derive(Row, Serialize, Deserialize)]
6pub struct ClickHouseSpan {
7 pub trace_id: String,
8 pub span_id: String,
9 pub parent_span_id: String,
10 pub run_id: String,
11 pub span_kind: String,
12 pub name: String,
13 pub start_time: i64,
14 pub end_time: i64,
15 pub status: String,
16 pub latency_ms: i64,
17 pub input_tokens: i64,
18 pub output_tokens: i64,
19 pub total_tokens: i64,
20 pub model_provider: String,
21 pub model_name: String,
22}
23
24impl From<&SpanRecord> for ClickHouseSpan {
25 fn from(span: &SpanRecord) -> Self {
26 let input_tokens = span
27 .token_usage
28 .as_ref()
29 .map(|t| t.input as i64)
30 .unwrap_or(0);
31 let output_tokens = span
32 .token_usage
33 .as_ref()
34 .map(|t| t.output as i64)
35 .unwrap_or(0);
36
37 Self {
38 trace_id: span.trace_id.0.to_string(),
39 span_id: span.span_id.0.to_string(),
40 parent_span_id: span
41 .parent_span_id
42 .map(|id| id.0.to_string())
43 .unwrap_or_default(),
44 run_id: span.run_id.0.to_string(),
45 span_kind: format!("{:?}", span.span_kind),
46 name: span.name.clone(),
47 start_time: span.start_time as i64,
48 end_time: span.end_time.unwrap_or(span.start_time) as i64,
49 status: format!("{:?}", span.status),
50 latency_ms: span.latency_ms.unwrap_or(0) as i64,
51 input_tokens,
52 output_tokens,
53 total_tokens: input_tokens + output_tokens,
54 model_provider: span.model_provider.clone().unwrap_or_default(),
55 model_name: span.model_name.clone().unwrap_or_default(),
56 }
57 }
58}
59
60pub struct ClickHouseAnalytics {
61 client: Client,
62}
63
64impl ClickHouseAnalytics {
65 pub fn new(url: &str, user: &str, password: &str, database: &str) -> Self {
66 let client = Client::default()
67 .with_url(url)
68 .with_user(user)
69 .with_password(password)
70 .with_database(database);
71
72 Self { client }
73 }
74
75 pub async fn ingest_batch(&self, spans: &[SpanRecord]) -> anyhow::Result<()> {
76 let mut insert = self.client.insert("spans_buffer")?;
77 for span in spans {
78 let ch_span: ClickHouseSpan = span.into();
79 insert.write(&ch_span).await?;
80 }
81 insert.end().await?;
82 Ok(())
83 }
84}