use std::collections::BTreeMap;
use std::sync::Arc;
use async_trait::async_trait;
use base64::Engine as _;
use serde::Serialize;
use serde_json::{json, Value};
use tracing::warn;
use uuid::Uuid;
use crate::core::tracing::events::{EventKind, TraceEvent};
use crate::core::tracing::pipeline::{ExportMetadata, Exporter};
use crate::telemetry::backends::langfuse::config::LangfuseConfig;
/// Exports collected trace events to Langfuse through its batched ingestion API.
pub struct LangfuseExporter {
    /// Connection settings: host, key pair, and the `enabled` switch.
    config: LangfuseConfig,
    /// HTTP client reused for every export call.
    client: reqwest::Client,
    /// Tags attached to every trace, placed ahead of any per-export tags.
    pub tags: Vec<String>,
    /// Trace name used when the export metadata does not supply one.
    pub workflow_name: String,
}
impl LangfuseExporter {
pub fn new(config: LangfuseConfig) -> Self {
Self {
config,
tags: Vec::new(),
workflow_name: "operonx".into(),
client: reqwest::Client::new(),
}
}
pub fn with_tags(mut self, tags: Vec<String>) -> Self {
self.tags = tags;
self
}
pub fn with_workflow_name(mut self, name: impl Into<String>) -> Self {
self.workflow_name = name.into();
self
}
fn auth_header(&self) -> String {
let creds = format!("{}:{}", self.config.public_key, self.config.secret_key);
base64::engine::general_purpose::STANDARD.encode(creds)
}
fn ingestion_url(&self) -> String {
let host = self.config.host.trim_end_matches('/');
format!("{}/api/public/ingestion", host)
}
fn build_batch(
&self,
events: Vec<TraceEvent>,
request_id: &str,
metadata: &ExportMetadata,
) -> Vec<BatchItem> {
let workflow_name = metadata
.workflow_name
.clone()
.unwrap_or_else(|| self.workflow_name.clone());
let mut tags = self.tags.clone();
tags.extend(metadata.tags.iter().cloned());
let mut items: Vec<BatchItem> = Vec::new();
items.push(BatchItem::new(
"trace-create",
json!({
"id": request_id,
"name": workflow_name,
"userId": metadata.user_id,
"sessionId": metadata.session_id,
"tags": tags,
"timestamp": events
.first()
.map(|e| e.timestamp.to_rfc3339_opts(chrono::SecondsFormat::Millis, true)),
}),
));
let records = gather_op_records(events);
for r in records.into_values() {
let span_id = r.span_id.clone();
let parent = parent_observation_id(&r.ctx, &records_map(&r));
let item_type = if r.has_llm_usage {
"generation-create"
} else {
"span-create"
};
let mut body = json!({
"id": span_id,
"traceId": request_id,
"parentObservationId": parent,
"name": r.op_name,
"startTime": r.start_time,
"endTime": r.end_time,
"input": r.inputs,
"output": r.outputs,
"metadata": r.metadata,
"level": if r.status == "error" { "ERROR" } else { "DEFAULT" },
"statusMessage": r.status_message,
});
if r.has_llm_usage {
if let Value::Object(ref mut map) = body {
map.insert("model".into(), Value::String(r.llm_model.clone()));
map.insert(
"usage".into(),
json!({
"promptTokens": r.prompt_tokens,
"completionTokens": r.completion_tokens,
"totalTokens": r.total_tokens,
}),
);
}
}
items.push(BatchItem::new(item_type, body));
}
items
}
}
#[async_trait]
impl Exporter for LangfuseExporter {
fn name(&self) -> &'static str {
"LangfuseExporter"
}
async fn export(&self, events: Vec<TraceEvent>, request_id: String, metadata: ExportMetadata) {
if !self.config.enabled || events.is_empty() {
return;
}
let batch = self.build_batch(events, &request_id, &metadata);
if batch.is_empty() {
return;
}
let body = json!({"batch": batch});
let url = self.ingestion_url();
let resp = self
.client
.post(&url)
.header("Authorization", format!("Basic {}", self.auth_header()))
.header("Content-Type", "application/json")
.json(&body)
.send()
.await;
match resp {
Ok(r) if r.status().is_success() => {}
Ok(r) => {
let status = r.status();
let text = r.text().await.unwrap_or_default();
warn!(
"LangfuseExporter: ingest failed status={} body={}",
status,
truncate(&text, 200)
);
}
Err(e) => warn!("LangfuseExporter: HTTP error: {}", e),
}
}
}
/// One envelope in a Langfuse ingestion batch, serialized with the keys
/// `id`, `timestamp`, `type`, `body` in that order.
#[derive(Clone, Debug, Serialize)]
struct BatchItem {
    /// Id of this envelope. It is generated independently of any id inside `body`.
    id: String,
    /// Envelope creation time as an RFC 3339 string.
    timestamp: String,
    /// Envelope kind, e.g. `trace-create`, `span-create`, `generation-create`.
    #[serde(rename = "type")]
    item_type: String,
    /// Kind-specific payload.
    body: Value,
}
impl BatchItem {
    /// Wraps `body` in an ingestion envelope of kind `item_type`.
    ///
    /// The envelope gets a fresh v4 UUID and the current UTC time, formatted as
    /// RFC 3339 with millisecond precision.
    fn new(item_type: &str, body: Value) -> Self {
        let id = Uuid::new_v4().to_string();
        let timestamp = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
        Self {
            id,
            timestamp,
            item_type: item_type.to_owned(),
            body,
        }
    }
}
/// Everything gathered from the event stream about one `(op_name, ctx)` pair.
#[derive(Clone, Debug)]
struct OpRecord {
    /// Observation id sent to Langfuse; a fresh UUID per record.
    span_id: String,
    op_name: String,
    /// Context path of the op; its prefixes are used to find the parent observation.
    ctx: Vec<String>,
    /// RFC 3339 time of the `OpStart` event, if one was seen.
    start_time: Option<String>,
    /// RFC 3339 time of the `OpEnd` event, if one was seen.
    end_time: Option<String>,
    inputs: Value,
    outputs: Value,
    /// JSON object holding annotations plus yield bookkeeping.
    metadata: Value,
    /// Status string reported by `OpEnd`; starts as `"ok"`.
    status: String,
    status_message: Option<String>,
    /// Set once an `LlmUsage` event is seen; selects `generation-create`.
    has_llm_usage: bool,
    llm_model: String,
    prompt_tokens: u64,
    completion_tokens: u64,
    total_tokens: u64,
    /// Number of `OpYield` events seen.
    yield_count: u32,
}
impl OpRecord {
    /// Creates a blank record for `op_name` at `ctx` with a freshly generated span id.
    ///
    /// All other fields start in their "nothing seen yet" state:
    /// - no timestamps;
    /// - `Null` inputs and outputs;
    /// - an empty metadata object;
    /// - status `"ok"`;
    /// - zeroed LLM usage and yield count.
    fn empty(op_name: String, ctx: Vec<String>) -> Self {
        let span_id = Uuid::new_v4().to_string();
        Self {
            span_id,
            op_name,
            ctx,
            start_time: None,
            end_time: None,
            inputs: Value::Null,
            outputs: Value::Null,
            metadata: Value::Object(Default::default()),
            status: String::from("ok"),
            status_message: None,
            has_llm_usage: false,
            llm_model: String::default(),
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            yield_count: 0,
        }
    }
}
/// Folds a flat event stream into one [`OpRecord`] per `(op_name, ctx)` pair.
///
/// Events without an `op_name` are ignored. Per event kind:
/// - `OpStart`: records the start time and `payload["inputs"]`.
/// - `OpEnd`: records the end time, `payload["outputs"]` and `payload["status"]`.
///   When the status is `"error"`, the `payload["error"]` value becomes the status
///   message. A string is used verbatim; any other non-null value becomes its
///   JSON text.
/// - `OpYield`: bumps the yield counter and mirrors it into metadata, together
///   with the latest `payload["yielded"]` value.
/// - `Annotation`: stores `payload["value"]` under `payload["key"]` in metadata.
///   Annotations without a string key are ignored.
/// - `LlmUsage`: marks the record as an LLM generation and stores the model and
///   token counts. A later usage event for the same op overwrites an earlier one.
///
/// Repeated executions of the same op in the same `ctx` fold into one record.
fn gather_op_records(events: Vec<TraceEvent>) -> BTreeMap<(String, Vec<String>), OpRecord> {
    let mut records: BTreeMap<(String, Vec<String>), OpRecord> = BTreeMap::new();
    for e in events {
        let Some(op_name) = e.op_name.clone() else {
            continue;
        };
        let rec = records
            .entry((op_name, e.ctx.clone()))
            .or_insert_with_key(|(name, ctx)| OpRecord::empty(name.clone(), ctx.clone()));

        // Small read-only accessors over this event's timestamp and payload.
        let stamp = || e.timestamp.to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
        let field = |k: &str| e.payload.get(k).cloned().unwrap_or(Value::Null);
        let str_field = |k: &str| e.payload.get(k).and_then(Value::as_str);
        let u64_field = |k: &str| e.payload.get(k).and_then(Value::as_u64).unwrap_or(0);

        match e.kind {
            EventKind::OpStart => {
                rec.start_time = Some(stamp());
                rec.inputs = field("inputs");
            }
            EventKind::OpEnd => {
                rec.end_time = Some(stamp());
                rec.outputs = field("outputs");
                if let Some(status) = str_field("status") {
                    rec.status = status.to_owned();
                    if status == "error" {
                        // Structured (non-string) errors are kept as JSON text
                        // rather than being dropped.
                        rec.status_message = e
                            .payload
                            .get("error")
                            .filter(|v| !v.is_null())
                            .map(|v| v.as_str().map_or_else(|| v.to_string(), String::from));
                    }
                }
            }
            EventKind::OpYield => {
                rec.yield_count += 1;
                let count = rec.yield_count;
                if let Value::Object(map) = &mut rec.metadata {
                    map.insert("yield_count".into(), json!(count));
                    if let Some(y) = e.payload.get("yielded") {
                        map.insert("last_yielded".into(), y.clone());
                    }
                }
            }
            EventKind::Annotation => {
                // Without a key there is no meaningful slot for the value, so
                // skip it rather than storing it under "".
                if let Some(key) = str_field("key") {
                    if let Value::Object(map) = &mut rec.metadata {
                        map.insert(key.to_owned(), field("value"));
                    }
                }
            }
            EventKind::LlmUsage => {
                rec.has_llm_usage = true;
                rec.llm_model = str_field("model").unwrap_or("").to_owned();
                rec.prompt_tokens = u64_field("prompt_tokens");
                rec.completion_tokens = u64_field("completion_tokens");
                rec.total_tokens = u64_field("total_tokens");
            }
            _ => {}
        }
    }
    records
}
fn parent_observation_id(
ctx: &[String],
all_records: &BTreeMap<(String, Vec<String>), OpRecord>,
) -> Option<String> {
if ctx.len() <= 1 {
return None;
}
for prefix_len in (1..ctx.len()).rev() {
let prefix: Vec<String> = ctx[..prefix_len].to_vec();
for ((_op, rec_ctx), rec) in all_records.iter() {
if rec_ctx == &prefix {
return Some(rec.span_id.clone());
}
}
}
None
}
/// Placeholder that always returns an empty record map.
///
/// `_anchor` is ignored entirely, so any parent lookup run against this result
/// resolves to `None` and the observation is exported without a parent.
/// Parent resolution should use the full map produced by `gather_op_records`
/// instead.
// Allowed dead so the file builds warning-free whether or not anything still calls it.
#[allow(dead_code)]
fn records_map(_anchor: &OpRecord) -> BTreeMap<(String, Vec<String>), OpRecord> {
    BTreeMap::new()
}
/// Shortens `s` to at most `n` bytes for logging, appending `...` when cut.
///
/// The cut point moves back to the nearest UTF-8 character boundary at or
/// below `n`. Slicing in the middle of a multi-byte character would panic,
/// which matters here because response bodies are arbitrary text.
fn truncate(s: &str, n: usize) -> String {
    if s.len() <= n {
        return s.to_owned();
    }
    // Byte 0 is always a char boundary, so this search always succeeds.
    let end = (0..=n).rev().find(|&i| s.is_char_boundary(i)).unwrap_or(0);
    format!("{}...", &s[..end])
}
// NOTE(review): never called. Nothing else in this file references `Arc`, so
// this helper appears to exist only to keep the file-level `Arc` import in use,
// presumably for a planned shared-client design. Remove it together with the
// import once that is settled.
#[allow(dead_code)]
fn _arc_kept_for_shared_client() -> Arc<()> {
    Arc::default()
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::tracing::events::EventKind;
    use chrono::Utc;

    /// Minimal enabled config pointing at a dummy host; `build_batch` never sends anything.
    fn test_config() -> LangfuseConfig {
        LangfuseConfig {
            host: "https://lf.example".into(),
            public_key: "pk".into(),
            secret_key: "sk".into(),
            enabled: true,
            no_proxy: None,
            sample_rate: 1.0,
            trace_filter: None,
        }
    }

    /// Builds a JSON object payload from `(key, value)` pairs.
    fn obj(pairs: &[(&str, Value)]) -> serde_json::Map<String, Value> {
        pairs
            .iter()
            .map(|(k, v)| (k.to_string(), v.clone()))
            .collect()
    }

    /// Builds a `TraceEvent` for request `r1`, stamped with the current time.
    fn make_ev(
        seq: u64,
        kind: EventKind,
        op_name: &str,
        ctx: &[&str],
        payload: serde_json::Map<String, Value>,
    ) -> TraceEvent {
        TraceEvent {
            event_id: format!("e{seq}"),
            request_id: "r1".into(),
            kind,
            op_name: Some(op_name.into()),
            ctx: ctx.iter().map(|s| s.to_string()).collect(),
            timestamp: Utc::now(),
            seq,
            payload: payload.into_iter().collect::<BTreeMap<_, _>>(),
        }
    }

    #[test]
    fn build_batch_emits_trace_then_spans() {
        let exporter = LangfuseExporter::new(test_config());
        let start = obj(&[("inputs", json!({"x": 1}))]);
        let end = obj(&[
            ("outputs", json!({"y": 2})),
            ("status", json!("ok")),
            ("duration_ms", json!(12.5)),
        ]);
        let events = vec![
            make_ev(0, EventKind::OpStart, "main.op", &["main"], start),
            make_ev(1, EventKind::OpEnd, "main.op", &["main"], end),
        ];

        let batch = exporter.build_batch(events, "r1", &ExportMetadata::default());

        assert_eq!(batch.len(), 2);
        assert_eq!(batch[0].item_type, "trace-create");
        assert_eq!(batch[1].item_type, "span-create");
        let body = batch[1].body.as_object().unwrap();
        assert_eq!(body.get("name").and_then(|v| v.as_str()), Some("main.op"));
        assert_eq!(body.get("input"), Some(&json!({"x": 1})));
        assert_eq!(body.get("output"), Some(&json!({"y": 2})));
    }

    #[test]
    fn build_batch_emits_generation_create_when_llm_usage_present() {
        let exporter = LangfuseExporter::new(test_config());
        let start = obj(&[("inputs", json!({"messages": []}))]);
        let end = obj(&[("outputs", json!({"content": "hi"})), ("status", json!("ok"))]);
        let usage = obj(&[
            ("model", json!("gpt-4o")),
            ("prompt_tokens", json!(10)),
            ("completion_tokens", json!(20)),
            ("total_tokens", json!(30)),
        ]);
        let events = vec![
            make_ev(0, EventKind::OpStart, "main.llm", &["main"], start),
            make_ev(1, EventKind::LlmUsage, "main.llm", &["main"], usage),
            make_ev(2, EventKind::OpEnd, "main.llm", &["main"], end),
        ];

        let batch = exporter.build_batch(events, "r1", &ExportMetadata::default());

        assert_eq!(batch.len(), 2);
        assert_eq!(batch[1].item_type, "generation-create");
        let body = batch[1].body.as_object().unwrap();
        assert_eq!(body.get("model").and_then(|v| v.as_str()), Some("gpt-4o"));
        let usage = body.get("usage").unwrap();
        assert_eq!(usage.get("totalTokens").and_then(|v| v.as_u64()), Some(30));
    }
}