Skip to main content

juncture_telemetry/
langfuse.rs

1//! Langfuse cloud exporter.
2//!
3//! Sends telemetry data to Langfuse cloud via the REST ingestion API.
4//! Runs asynchronously in the background, non-blocking to the hot path.
5
6use tracing::debug;
7
8use crate::models::{Observation, Trace};
9
/// Configuration for Langfuse cloud export.
///
/// The key pair is sent as HTTP Basic Auth credentials (public key as the
/// user name, secret key as the password).
///
/// `Debug` is implemented by hand rather than derived so that the secret key
/// is never written to logs or panic messages when the configuration (or a
/// type that embeds it) is formatted with `{:?}`.
#[derive(Clone)]
pub struct LangfuseConfig {
    /// Langfuse public key.
    pub public_key: String,
    /// Langfuse secret key. Redacted in `Debug` output.
    pub secret_key: String,
    /// Langfuse API base URL (e.g., `https://cloud.langfuse.com`).
    pub base_url: String,
}

impl std::fmt::Debug for LangfuseConfig {
    /// Formats the configuration like a derived `Debug` would, but replaces
    /// the secret key with a fixed placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LangfuseConfig")
            .field("public_key", &self.public_key)
            .field("secret_key", &"<redacted>")
            .field("base_url", &self.base_url)
            .finish()
    }
}
20
21/// Exports telemetry data to Langfuse cloud via REST API.
22///
23/// Uses the `/api/public/ingestion` endpoint with Basic Auth.
24/// Supports `trace-create`, `generation-create`, and `span-create` types.
25#[derive(Clone, Debug)]
26pub struct LangfuseExporter {
27    config: LangfuseConfig,
28    client: reqwest::Client,
29}
30
31impl LangfuseExporter {
32    /// Create a new Langfuse exporter.
33    #[must_use]
34    pub fn new(config: LangfuseConfig) -> Self {
35        Self {
36            config,
37            client: reqwest::Client::new(),
38        }
39    }
40
41    /// Export a trace and its observations to Langfuse cloud.
42    ///
43    /// # Errors
44    ///
45    /// Returns an error if the HTTP request fails or Langfuse returns errors.
46    pub async fn export(
47        &self,
48        trace: &Trace,
49        observations: &[Observation],
50    ) -> Result<(), LangfuseExportError> {
51        let now = chrono::Utc::now().to_rfc3339();
52        let mut batch = Vec::new();
53        batch.push(Self::build_trace_item(trace, &now));
54        for obs in observations {
55            batch.push(Self::build_obs_item(obs, &now));
56        }
57        self.send_batch(&batch).await
58    }
59
60    fn build_trace_item(trace: &Trace, now: &str) -> serde_json::Value {
61        serde_json::json!({
62            "id": format!("juncture-trace-{}", trace.id),
63            "type": "trace-create",
64            "timestamp": now,
65            "body": {
66                "id": trace.id.to_string(),
67                "name": trace.name,
68                "sessionId": trace.session_id,
69                "userId": trace.user_id,
70                "tags": trace.tags,
71                "metadata": trace.metadata,
72                "input": trace.input,
73                "output": trace.output,
74            }
75        })
76    }
77
78    fn build_obs_item(obs: &Observation, now: &str) -> serde_json::Value {
79        let obs_type = match obs.observation_type {
80            crate::models::ObservationType::Generation => "generation-create",
81            _ => "span-create",
82        };
83
84        let mut body = serde_json::json!({
85            "id": obs.id.to_string(),
86            "traceId": obs.trace_id.to_string(),
87            "name": obs.name,
88            "startTime": obs.start_time.to_rfc3339(),
89            "endTime": obs.end_time.map(|t| t.to_rfc3339()),
90            "input": obs.input,
91            "output": obs.output,
92            "metadata": obs.metadata,
93            "level": obs.level.as_str(),
94        });
95
96        if let Some(ref model) = obs.model {
97            body["model"] = serde_json::Value::String(model.clone());
98        }
99        if let Some(ref usage) = obs.usage {
100            body["usageDetails"] = serde_json::json!({
101                "input": usage.input_tokens,
102                "output": usage.output_tokens,
103                "total": usage.total_tokens,
104            });
105        }
106        if let Some(cost) = obs.cost {
107            body["costDetails"] = serde_json::json!({"total": cost});
108        }
109        if let Some(parent_id) = obs.parent_observation_id {
110            body["parentObservationId"] = serde_json::Value::String(parent_id.to_string());
111        }
112
113        serde_json::json!({
114            "id": format!("juncture-obs-{}", obs.id),
115            "type": obs_type,
116            "timestamp": now,
117            "body": body,
118        })
119    }
120
121    async fn send_batch(&self, batch: &[serde_json::Value]) -> Result<(), LangfuseExportError> {
122        let resp = self
123            .client
124            .post(format!("{}/api/public/ingestion", self.config.base_url))
125            .basic_auth(&self.config.public_key, Some(&self.config.secret_key))
126            .json(&serde_json::json!({"batch": batch}))
127            .send()
128            .await
129            .map_err(|e| LangfuseExportError::Network(e.to_string()))?;
130
131        let status = resp.status();
132        let body: serde_json::Value = resp
133            .json()
134            .await
135            .map_err(|e| LangfuseExportError::Network(e.to_string()))?;
136
137        if !status.is_success() {
138            return Err(LangfuseExportError::Http(status.as_u16(), body.to_string()));
139        }
140
141        let error_count = body["errors"].as_array().map_or(0, Vec::len);
142        if error_count > 0 {
143            let msgs: Vec<String> = body["errors"].as_array().map_or_else(Vec::new, |arr| {
144                arr.iter()
145                    .filter_map(|e| e["message"].as_str().map(String::from))
146                    .collect()
147            });
148            return Err(LangfuseExportError::Langfuse(msgs.join("; ")));
149        }
150
151        debug!("langfuse export: {} items sent", batch.len());
152        Ok(())
153    }
154}
155
156/// Errors from Langfuse cloud export.
157#[derive(Debug, thiserror::Error)]
158pub enum LangfuseExportError {
159    /// Network error.
160    #[error("network error: {0}")]
161    Network(String),
162    /// HTTP error status.
163    #[error("HTTP {0}: {1}")]
164    Http(u16, String),
165    /// Langfuse API returned errors.
166    #[error("langfuse errors: {0}")]
167    Langfuse(String),
168}