//! Langfuse interceptor for OpenTelemetry-based LLM observability.
//!
//! This interceptor automatically instruments `OpenAI` API calls with OpenTelemetry spans.
//! You must configure the OpenTelemetry tracer with a Langfuse exporter separately.
//!
//! # Usage
//!
//! ```no_run
//! # use openai_ergonomic::{Builder, Client};
//! # use openai_ergonomic::langfuse_interceptor::{LangfuseInterceptor, LangfuseConfig};
//! # use opentelemetry_langfuse::ExporterBuilder;
//! # use opentelemetry_sdk::runtime::Tokio;
//! # use opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor;
//! # use opentelemetry_sdk::trace::SdkTracerProvider;
//! # use opentelemetry::global;
//! # use opentelemetry::trace::TracerProvider;
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! // 1. Build the Langfuse exporter.
//! let exporter = ExporterBuilder::from_env()?.build()?;
//!
//! // 2. Create the tracer provider.
//! let provider = SdkTracerProvider::builder()
//!     .with_span_processor(BatchSpanProcessor::builder(exporter, Tokio).build())
//!     .build();
//!
//! global::set_tracer_provider(provider.clone());
//!
//! // 3. Create the interceptor with the tracer.
//! let tracer = provider.tracer("openai-ergonomic");
//! let interceptor = LangfuseInterceptor::new(tracer, LangfuseConfig::new());
//! let client = Client::from_env()?
//!     .with_interceptor(Box::new(interceptor))
//!     .build();
//!
//! // Traces are automatically sent to Langfuse.
//! let response = client.send_chat(client.chat_simple("Hello!")).await?;
//! # Ok(())
//! # }
//! ```

use crate::interceptor::{
    AfterResponseContext, BeforeRequestContext, ErrorContext, Interceptor, StreamChunkContext,
    StreamEndContext,
};
use crate::Result;
use opentelemetry::{
    trace::{SpanKind, Tracer},
    KeyValue,
};
use opentelemetry_langfuse::LangfuseContext;
use opentelemetry_semantic_conventions::attribute::{
    GEN_AI_OPERATION_NAME, GEN_AI_REQUEST_MAX_TOKENS, GEN_AI_REQUEST_MODEL,
    GEN_AI_REQUEST_TEMPERATURE, GEN_AI_RESPONSE_ID, GEN_AI_SYSTEM, GEN_AI_USAGE_INPUT_TOKENS,
    GEN_AI_USAGE_OUTPUT_TOKENS,
};
use serde_json::Value;
use std::sync::{Arc, Mutex};
use tracing::{debug, error, info};

/// State managed by the Langfuse interceptor across the request lifecycle.
///
/// This state is passed through the interceptor hooks to maintain
/// span information without requiring global state.
///
/// Uses a `Mutex` for interior mutability to stay thread-safe in async contexts.
///
/// The generic type `S` is the span type produced by the tracer.
pub struct LangfuseState<S = opentelemetry::global::BoxedSpan> {
    /// The active span for this request (wrapped in a `Mutex` for thread-safe interior mutability).
    pub(crate) span: Mutex<Option<S>>,
}

impl<S> Default for LangfuseState<S> {
    fn default() -> Self {
        Self {
            span: Mutex::new(None),
        }
    }
}

/// Configuration for the Langfuse interceptor.
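///
/// The `debug` flag defaults to the `LANGFUSE_DEBUG` environment variable
/// (parsed as a boolean, falling back to `false`). A minimal sketch of
/// overriding it explicitly:
///
/// ```
/// use openai_ergonomic::langfuse_interceptor::LangfuseConfig;
///
/// // Override the env-derived default.
/// let config = LangfuseConfig::new().with_debug(true);
/// assert!(config.debug);
/// ```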
#[derive(Debug, Clone)]
pub struct LangfuseConfig {
    /// Enable debug logging.
    pub debug: bool,
}

impl Default for LangfuseConfig {
    fn default() -> Self {
        Self {
            debug: std::env::var("LANGFUSE_DEBUG")
                .unwrap_or_else(|_| "false".to_string())
                .parse()
                .unwrap_or(false),
        }
    }
}

impl LangfuseConfig {
    /// Create a new configuration.
    pub fn new() -> Self {
        Self::default()
    }

    /// Enable debug logging.
    #[must_use]
    pub fn with_debug(mut self, debug: bool) -> Self {
        self.debug = debug;
        self
    }
}

/// Langfuse interceptor for OpenTelemetry-based observability.
///
/// This interceptor automatically creates spans for API calls. Spans are
/// carried from `before_request` to `after_response` in the per-request
/// [`LangfuseState`], so no user code changes are required.
///
/// The tracer must be configured externally; this interceptor only instruments API calls.
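///
/// Session, user, tag, and metadata attribution can be set at runtime through
/// the methods below. A minimal sketch (the `NoopTracer` and the attribute
/// values are placeholders):
///
/// ```
/// use openai_ergonomic::langfuse_interceptor::{LangfuseInterceptor, LangfuseConfig};
/// use opentelemetry::trace::noop::NoopTracer;
///
/// let interceptor = LangfuseInterceptor::new(NoopTracer::new(), LangfuseConfig::new());
/// interceptor.set_session_id("session-123");
/// interceptor.set_user_id("user-456");
/// interceptor.add_tags(vec!["production".to_string(), "chat".to_string()]);
/// interceptor.set_metadata(serde_json::json!({"env": "demo"}));
///
/// // `Arc<LangfuseInterceptor<T>>` also implements `Interceptor`,
/// // so the interceptor can be shared.
/// let _shared = std::sync::Arc::new(interceptor);
/// ```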
pub struct LangfuseInterceptor<T: Tracer + Send + Sync> {
    config: LangfuseConfig,
    tracer: Arc<T>,
    context: Arc<LangfuseContext>,
}

impl<T: Tracer + Send + Sync> LangfuseInterceptor<T>
where
    T::Span: Send + Sync + 'static,
{
    /// Create a new Langfuse interceptor with the given tracer.
    ///
    /// The tracer should be configured to export to Langfuse using
    /// `opentelemetry_langfuse::ExporterBuilder`.
    ///
    /// # Example
    ///
    /// ```no_run
    /// use opentelemetry::global;
    /// use opentelemetry::trace::TracerProvider;
    /// use opentelemetry_langfuse::ExporterBuilder;
    /// use opentelemetry_sdk::runtime::Tokio;
    /// use opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor;
    /// use opentelemetry_sdk::trace::SdkTracerProvider;
    ///
    /// # async fn setup() -> Result<(), Box<dyn std::error::Error>> {
    /// // Build the exporter.
    /// let exporter = ExporterBuilder::from_env()?.build()?;
    ///
    /// // Create a tracer provider with a batch processor.
    /// let provider = SdkTracerProvider::builder()
    ///     .with_span_processor(BatchSpanProcessor::builder(exporter, Tokio).build())
    ///     .build();
    ///
    /// // Set it as the global provider.
    /// global::set_tracer_provider(provider.clone());
    ///
    /// // Get a tracer for the interceptor.
    /// let tracer = provider.tracer("openai-ergonomic");
    ///
    /// // Create the interceptor with the tracer.
    /// use openai_ergonomic::langfuse_interceptor::{LangfuseInterceptor, LangfuseConfig};
    /// let interceptor = LangfuseInterceptor::new(tracer, LangfuseConfig::new());
    /// # Ok(())
    /// # }
    /// ```
    pub fn new(tracer: T, config: LangfuseConfig) -> Self {
        if config.debug {
            info!("Langfuse interceptor initialized");
        }

        Self {
            config,
            tracer: Arc::new(tracer),
            context: Arc::new(LangfuseContext::new()),
        }
    }

    /// Set the session ID for traces created by this interceptor.
    pub fn set_session_id(&self, session_id: impl Into<String>) {
        self.context.set_session_id(session_id);
    }

    /// Set the user ID for traces created by this interceptor.
    pub fn set_user_id(&self, user_id: impl Into<String>) {
        self.context.set_user_id(user_id);
    }

    /// Add tags to traces created by this interceptor.
    pub fn add_tags(&self, tags: Vec<String>) {
        self.context.add_tags(tags);
    }

    /// Add a single tag to traces created by this interceptor.
    pub fn add_tag(&self, tag: impl Into<String>) {
        self.context.add_tag(tag);
    }

    /// Set metadata for traces created by this interceptor.
    pub fn set_metadata(&self, metadata: serde_json::Value) {
        self.context.set_metadata(metadata);
    }

    /// Clear all context attributes.
    pub fn clear_context(&self) {
        self.context.clear();
    }

    /// Get a reference to the Langfuse context.
    pub fn context(&self) -> &Arc<LangfuseContext> {
        &self.context
    }

    /// Parse a JSON payload into a `Value` (used for both request and response bodies).
    fn extract_request_params(request_json: &str) -> serde_json::Result<Value> {
        serde_json::from_str(request_json)
    }
}

#[async_trait::async_trait]
impl<T: Tracer + Send + Sync> Interceptor<LangfuseState<T::Span>> for LangfuseInterceptor<T>
where
    T::Span: Send + Sync + 'static,
{
    async fn before_request(
        &self,
        ctx: &mut BeforeRequestContext<'_, LangfuseState<T::Span>>,
    ) -> Result<()> {
        let tracer = self.tracer.as_ref();

        // Build the initial attributes
        let mut attributes = vec![
            KeyValue::new(GEN_AI_SYSTEM, "openai"),
            KeyValue::new(GEN_AI_OPERATION_NAME, ctx.operation.to_string()),
            KeyValue::new(GEN_AI_REQUEST_MODEL, ctx.model.to_string()),
        ];

        // Add Langfuse context attributes from this interceptor's context
        attributes.extend(self.context.get_attributes());

        // Parse the request JSON and add relevant attributes
        if let Ok(params) = Self::extract_request_params(ctx.request_json) {
            if let Some(temperature) = params
                .get("temperature")
                .and_then(serde_json::Value::as_f64)
            {
                attributes.push(KeyValue::new(GEN_AI_REQUEST_TEMPERATURE, temperature));
            }
            if let Some(max_tokens) = params.get("max_tokens").and_then(serde_json::Value::as_i64) {
                attributes.push(KeyValue::new(GEN_AI_REQUEST_MAX_TOKENS, max_tokens));
            }

            // Add messages as gen_ai.prompt attributes
            if let Some(messages) = params.get("messages").and_then(serde_json::Value::as_array) {
                for (i, message) in messages.iter().enumerate() {
                    if let Some(obj) = message.as_object() {
                        let role = obj
                            .get("role")
                            .and_then(serde_json::Value::as_str)
                            .unwrap_or("unknown")
                            .to_string();
                        let content = obj
                            .get("content")
                            .and_then(serde_json::Value::as_str)
                            .unwrap_or("")
                            .to_string();

                        attributes.push(KeyValue::new(format!("gen_ai.prompt.{i}.role"), role));
                        attributes
                            .push(KeyValue::new(format!("gen_ai.prompt.{i}.content"), content));
                    }
                }
            }
        }

        // Create the span
        let span = tracer
            .span_builder(ctx.operation.to_string())
            .with_kind(SpanKind::Client)
            .with_attributes(attributes)
            .start(tracer);

        // Store the span directly in state
        *ctx.state.span.lock().unwrap() = Some(span);

        if self.config.debug {
            debug!("Started Langfuse span for operation: {}", ctx.operation);
        }

        Ok(())
    }

    async fn after_response(
        &self,
        ctx: &AfterResponseContext<'_, LangfuseState<T::Span>>,
    ) -> Result<()> {
        use opentelemetry::trace::Span;

        // Take the span from state so we can end it
        let Some(mut span) = ctx.state.span.lock().unwrap().take() else {
            if self.config.debug {
                debug!("No span found in state for operation: {}", ctx.operation);
            }
            return Ok(());
        };

        // Add response attributes to the span
        #[allow(clippy::cast_possible_truncation)]
        {
            span.set_attribute(KeyValue::new(
                "duration_ms",
                ctx.duration.as_millis() as i64,
            ));
        }

        // Add usage metrics if available
        if let Some(input_tokens) = ctx.input_tokens {
            span.set_attribute(KeyValue::new(GEN_AI_USAGE_INPUT_TOKENS, input_tokens));
        }
        if let Some(output_tokens) = ctx.output_tokens {
            span.set_attribute(KeyValue::new(GEN_AI_USAGE_OUTPUT_TOKENS, output_tokens));
        }

        // Parse the response and add completion content
        if let Ok(response) = Self::extract_request_params(ctx.response_json) {
            // Add the response ID if available
            if let Some(id) = response.get("id").and_then(serde_json::Value::as_str) {
                span.set_attribute(KeyValue::new(GEN_AI_RESPONSE_ID, id.to_string()));
            }

            // Add completion content
            if let Some(choices) = response
                .get("choices")
                .and_then(serde_json::Value::as_array)
            {
                for (i, choice) in choices.iter().enumerate() {
                    if let Some(message) = choice.get("message") {
                        if let Some(role) = message.get("role").and_then(serde_json::Value::as_str)
                        {
                            span.set_attribute(KeyValue::new(
                                format!("gen_ai.completion.{i}.role"),
                                role.to_string(),
                            ));
                        }
                        if let Some(content) =
                            message.get("content").and_then(serde_json::Value::as_str)
                        {
                            span.set_attribute(KeyValue::new(
                                format!("gen_ai.completion.{i}.content"),
                                content.to_string(),
                            ));
                        }
                    }
                }
            }
        }

        // End the span
        span.end();

        if self.config.debug {
            debug!("Completed Langfuse span for operation: {}", ctx.operation);
        }

        Ok(())
    }

    async fn on_stream_chunk(
        &self,
        _ctx: &StreamChunkContext<'_, LangfuseState<T::Span>>,
    ) -> Result<()> {
        // Stream chunks could add attributes to the current span if needed;
        // for now they simply pass through.
        Ok(())
    }

    async fn on_stream_end(
        &self,
        ctx: &StreamEndContext<'_, LangfuseState<T::Span>>,
    ) -> Result<()> {
        use opentelemetry::trace::Span;

        // Take the span from state so we can end it
        let Some(mut span) = ctx.state.span.lock().unwrap().take() else {
            if self.config.debug {
                debug!(
                    "No span found in state for stream operation: {}",
                    ctx.operation
                );
            }
            return Ok(());
        };

        // Add the final streaming attributes
        #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
        {
            span.set_attribute(KeyValue::new(
                "stream.total_chunks",
                ctx.total_chunks as i64,
            ));
            span.set_attribute(KeyValue::new(
                "stream.duration_ms",
                ctx.duration.as_millis() as i64,
            ));
        }

        if let Some(input_tokens) = ctx.input_tokens {
            span.set_attribute(KeyValue::new(GEN_AI_USAGE_INPUT_TOKENS, input_tokens));
        }
        if let Some(output_tokens) = ctx.output_tokens {
            span.set_attribute(KeyValue::new(GEN_AI_USAGE_OUTPUT_TOKENS, output_tokens));
        }

        // End the span
        span.end();

        if self.config.debug {
            info!(
                "Completed streaming span for operation: {} with {} chunks",
                ctx.operation, ctx.total_chunks
            );
        }

        Ok(())
    }

    async fn on_error(&self, ctx: &ErrorContext<'_, LangfuseState<T::Span>>) {
        use opentelemetry::trace::{Span, Status};

        // Take the span from state if available
        let Some(state) = ctx.state else {
            if self.config.debug {
                debug!(
                    "No state available for error in operation: {}",
                    ctx.operation
                );
            }
            return;
        };

        let Some(mut span) = state.span.lock().unwrap().take() else {
            if self.config.debug {
                debug!(
                    "No span found in state for error in operation: {}",
                    ctx.operation
                );
            }
            return;
        };

        // Set the span status to error
        span.set_status(Status::error(ctx.error.to_string()));

        // Add error attributes to the span
        span.set_attribute(KeyValue::new("error.type", format!("{:?}", ctx.error)));
        span.set_attribute(KeyValue::new("error.message", ctx.error.to_string()));

        if let Some(model) = ctx.model {
            span.set_attribute(KeyValue::new(GEN_AI_REQUEST_MODEL, model.to_string()));
        }

        // End the span
        span.end();

        if self.config.debug {
            error!(
                "Recorded error for operation {}: {}",
                ctx.operation, ctx.error
            );
        }
    }
}

// Implement Interceptor for Arc<LangfuseInterceptor<T>> to allow sharing the interceptor
#[async_trait::async_trait]
impl<T: Tracer + Send + Sync> Interceptor<LangfuseState<T::Span>> for Arc<LangfuseInterceptor<T>>
where
    T::Span: Send + Sync + 'static,
{
    async fn before_request(
        &self,
        ctx: &mut BeforeRequestContext<'_, LangfuseState<T::Span>>,
    ) -> Result<()> {
        (**self).before_request(ctx).await
    }

    async fn after_response(
        &self,
        ctx: &AfterResponseContext<'_, LangfuseState<T::Span>>,
    ) -> Result<()> {
        (**self).after_response(ctx).await
    }

    async fn on_stream_chunk(
        &self,
        ctx: &StreamChunkContext<'_, LangfuseState<T::Span>>,
    ) -> Result<()> {
        (**self).on_stream_chunk(ctx).await
    }

    async fn on_stream_end(
        &self,
        ctx: &StreamEndContext<'_, LangfuseState<T::Span>>,
    ) -> Result<()> {
        (**self).on_stream_end(ctx).await
    }

    async fn on_error(&self, ctx: &ErrorContext<'_, LangfuseState<T::Span>>) {
        (**self).on_error(ctx).await;
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use opentelemetry::trace::noop::NoopTracer;

    #[test]
    fn test_config_from_env() {
        std::env::set_var("LANGFUSE_DEBUG", "true");

        let config = LangfuseConfig::default();
        assert!(config.debug);

        // Cleanup
        std::env::remove_var("LANGFUSE_DEBUG");
    }

    #[test]
    fn test_interceptor_creation() {
        let tracer = NoopTracer::new();
        let config = LangfuseConfig::new().with_debug(true);
        let _interceptor = LangfuseInterceptor::new(tracer, config);
        // No assertion needed; just verify it compiles and constructs.
    }
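
    // A small sanity check: a freshly-created state carries no span.
    #[test]
    fn test_state_default_is_empty() {
        let state: LangfuseState = LangfuseState::default();
        assert!(state.span.lock().unwrap().is_none());
    }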
}