juncture_telemetry/
langfuse.rs1use tracing::debug;
7
8use crate::models::{Observation, Trace};
9
/// Connection settings for a Langfuse server.
///
/// `Debug` is implemented by hand so that `secret_key` is never written to
/// logs or panic messages; the other fields are printed as usual.
#[derive(Clone)]
pub struct LangfuseConfig {
    /// Public API key, sent as the HTTP basic-auth user name.
    pub public_key: String,
    /// Secret API key, sent as the HTTP basic-auth password.
    pub secret_key: String,
    /// Base URL of the Langfuse server; `/api/public/ingestion` is appended to it.
    pub base_url: String,
}

impl std::fmt::Debug for LangfuseConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LangfuseConfig")
            .field("public_key", &self.public_key)
            // The secret must not leak through `{:?}` formatting.
            .field("secret_key", &"<redacted>")
            .field("base_url", &self.base_url)
            .finish()
    }
}
20
21#[derive(Clone, Debug)]
26pub struct LangfuseExporter {
27 config: LangfuseConfig,
28 client: reqwest::Client,
29}
30
31impl LangfuseExporter {
32 #[must_use]
34 pub fn new(config: LangfuseConfig) -> Self {
35 Self {
36 config,
37 client: reqwest::Client::new(),
38 }
39 }
40
41 pub async fn export(
47 &self,
48 trace: &Trace,
49 observations: &[Observation],
50 ) -> Result<(), LangfuseExportError> {
51 let now = chrono::Utc::now().to_rfc3339();
52 let mut batch = Vec::new();
53 batch.push(Self::build_trace_item(trace, &now));
54 for obs in observations {
55 batch.push(Self::build_obs_item(obs, &now));
56 }
57 self.send_batch(&batch).await
58 }
59
60 fn build_trace_item(trace: &Trace, now: &str) -> serde_json::Value {
61 serde_json::json!({
62 "id": format!("juncture-trace-{}", trace.id),
63 "type": "trace-create",
64 "timestamp": now,
65 "body": {
66 "id": trace.id.to_string(),
67 "name": trace.name,
68 "sessionId": trace.session_id,
69 "userId": trace.user_id,
70 "tags": trace.tags,
71 "metadata": trace.metadata,
72 "input": trace.input,
73 "output": trace.output,
74 }
75 })
76 }
77
78 fn build_obs_item(obs: &Observation, now: &str) -> serde_json::Value {
79 let obs_type = match obs.observation_type {
80 crate::models::ObservationType::Generation => "generation-create",
81 _ => "span-create",
82 };
83
84 let mut body = serde_json::json!({
85 "id": obs.id.to_string(),
86 "traceId": obs.trace_id.to_string(),
87 "name": obs.name,
88 "startTime": obs.start_time.to_rfc3339(),
89 "endTime": obs.end_time.map(|t| t.to_rfc3339()),
90 "input": obs.input,
91 "output": obs.output,
92 "metadata": obs.metadata,
93 "level": obs.level.as_str(),
94 });
95
96 if let Some(ref model) = obs.model {
97 body["model"] = serde_json::Value::String(model.clone());
98 }
99 if let Some(ref usage) = obs.usage {
100 body["usageDetails"] = serde_json::json!({
101 "input": usage.input_tokens,
102 "output": usage.output_tokens,
103 "total": usage.total_tokens,
104 });
105 }
106 if let Some(cost) = obs.cost {
107 body["costDetails"] = serde_json::json!({"total": cost});
108 }
109 if let Some(parent_id) = obs.parent_observation_id {
110 body["parentObservationId"] = serde_json::Value::String(parent_id.to_string());
111 }
112
113 serde_json::json!({
114 "id": format!("juncture-obs-{}", obs.id),
115 "type": obs_type,
116 "timestamp": now,
117 "body": body,
118 })
119 }
120
121 async fn send_batch(&self, batch: &[serde_json::Value]) -> Result<(), LangfuseExportError> {
122 let resp = self
123 .client
124 .post(format!("{}/api/public/ingestion", self.config.base_url))
125 .basic_auth(&self.config.public_key, Some(&self.config.secret_key))
126 .json(&serde_json::json!({"batch": batch}))
127 .send()
128 .await
129 .map_err(|e| LangfuseExportError::Network(e.to_string()))?;
130
131 let status = resp.status();
132 let body: serde_json::Value = resp
133 .json()
134 .await
135 .map_err(|e| LangfuseExportError::Network(e.to_string()))?;
136
137 if !status.is_success() {
138 return Err(LangfuseExportError::Http(status.as_u16(), body.to_string()));
139 }
140
141 let error_count = body["errors"].as_array().map_or(0, Vec::len);
142 if error_count > 0 {
143 let msgs: Vec<String> = body["errors"].as_array().map_or_else(Vec::new, |arr| {
144 arr.iter()
145 .filter_map(|e| e["message"].as_str().map(String::from))
146 .collect()
147 });
148 return Err(LangfuseExportError::Langfuse(msgs.join("; ")));
149 }
150
151 debug!("langfuse export: {} items sent", batch.len());
152 Ok(())
153 }
154}
155
156#[derive(Debug, thiserror::Error)]
158pub enum LangfuseExportError {
159 #[error("network error: {0}")]
161 Network(String),
162 #[error("HTTP {0}: {1}")]
164 Http(u16, String),
165 #[error("langfuse errors: {0}")]
167 Langfuse(String),
168}