use langfuse_core::config::{LangfuseConfig, SpanFilterFn};
use langfuse_core::error::LangfuseError;
use opentelemetry::Context;
use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig, WithHttpConfig};
use opentelemetry_sdk::error::OTelSdkResult;
use opentelemetry_sdk::trace::{
BatchConfigBuilder, BatchSpanProcessor, SdkTracerProvider, SpanData, SpanProcessor,
};
use std::collections::HashMap;
use std::time::Duration;
pub struct LangfuseTracing {
provider: SdkTracerProvider,
}
impl std::fmt::Debug for LangfuseTracing {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LangfuseTracing")
.field("provider", &"<SdkTracerProvider>")
.finish()
}
}
impl LangfuseTracing {
pub fn builder(config: &LangfuseConfig) -> LangfuseTracingBuilder {
LangfuseTracingBuilder {
config: config.clone(),
}
}
pub fn provider(&self) -> &SdkTracerProvider {
&self.provider
}
pub fn shutdown(&self) -> Result<(), opentelemetry_sdk::error::OTelSdkError> {
self.provider.shutdown()
}
}
pub struct LangfuseTracingBuilder {
config: LangfuseConfig,
}
impl std::fmt::Debug for LangfuseTracingBuilder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LangfuseTracingBuilder")
.field("config", &self.config)
.finish()
}
}
impl LangfuseTracingBuilder {
pub fn build(self) -> Result<LangfuseTracing, LangfuseError> {
let mut headers = HashMap::new();
headers.insert("Authorization".to_string(), self.config.basic_auth_header());
if let Some(ref extra) = self.config.additional_headers {
for (k, v) in extra {
headers.insert(k.clone(), v.clone());
}
}
let exporter = SpanExporter::builder()
.with_http()
.with_endpoint(format!(
"{}/api/public/otel/v1/traces",
self.config.base_url.trim_end_matches('/')
))
.with_protocol(Protocol::HttpBinary)
.with_timeout(self.config.timeout)
.with_headers(headers)
.build()
.map_err(|e| LangfuseError::Otel(e.to_string()))?;
let batch_config = BatchConfigBuilder::default()
.with_max_queue_size(self.config.flush_at * 4)
.with_scheduled_delay(self.config.flush_interval)
.with_max_export_batch_size(self.config.flush_at)
.build();
let processor = BatchSpanProcessor::builder(exporter)
.with_batch_config(batch_config)
.build();
let span_filter = self.config.should_export_span.clone();
let service_name =
std::env::var("OTEL_SERVICE_NAME").unwrap_or_else(|_| "unknown_service".to_string());
let mut attrs = vec![
opentelemetry::KeyValue::new("sdk.name", "langfuse-rs"),
opentelemetry::KeyValue::new("sdk.version", env!("CARGO_PKG_VERSION")),
opentelemetry::KeyValue::new("service.name", service_name),
];
if let Some(ref env) = self.config.environment {
attrs.push(opentelemetry::KeyValue::new(
"deployment.environment",
env.clone(),
));
}
if let Some(ref rel) = self.config.release {
attrs.push(opentelemetry::KeyValue::new("service.version", rel.clone()));
}
let resource = opentelemetry_sdk::Resource::builder()
.with_attributes(attrs)
.build();
let mut builder = SdkTracerProvider::builder().with_resource(resource);
if let Some(filter) = span_filter {
builder = builder.with_span_processor(FilteringSpanProcessor::new(processor, filter));
} else {
builder = builder.with_span_processor(processor);
}
let provider = builder.build();
Ok(LangfuseTracing { provider })
}
}
struct FilteringSpanProcessor {
inner: BatchSpanProcessor,
filter: SpanFilterFn,
}
impl std::fmt::Debug for FilteringSpanProcessor {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FilteringSpanProcessor")
.field("inner", &self.inner)
.field("filter", &"<filter fn>")
.finish()
}
}
impl FilteringSpanProcessor {
fn new(inner: BatchSpanProcessor, filter: SpanFilterFn) -> Self {
Self { inner, filter }
}
}
impl SpanProcessor for FilteringSpanProcessor {
fn on_start(&self, span: &mut opentelemetry_sdk::trace::Span, cx: &Context) {
self.inner.on_start(span, cx);
}
fn on_end(&self, span: SpanData) {
if (self.filter)(&span) {
self.inner.on_end(span);
}
}
fn force_flush(&self) -> OTelSdkResult {
self.inner.force_flush()
}
fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
self.inner.shutdown_with_timeout(timeout)
}
}