1use crate::interceptor::{
42 AfterResponseContext, BeforeRequestContext, ErrorContext, Interceptor, StreamChunkContext,
43 StreamEndContext,
44};
45use crate::Result;
46use opentelemetry::{
47 trace::{SpanKind, Tracer},
48 KeyValue,
49};
50use opentelemetry_langfuse::LangfuseContext;
51use opentelemetry_semantic_conventions::attribute::{
52 GEN_AI_OPERATION_NAME, GEN_AI_REQUEST_MAX_TOKENS, GEN_AI_REQUEST_MODEL,
53 GEN_AI_REQUEST_TEMPERATURE, GEN_AI_RESPONSE_ID, GEN_AI_SYSTEM, GEN_AI_USAGE_INPUT_TOKENS,
54 GEN_AI_USAGE_OUTPUT_TOKENS,
55};
56use serde_json::Value;
57use std::sync::{Arc, Mutex};
58use tracing::{debug, error, info};
59
/// Per-request interceptor state: holds the span opened in `before_request`
/// until a terminal hook (`after_response`, `on_stream_end`, or `on_error`)
/// takes ownership of it and ends it.
pub struct LangfuseState<S = opentelemetry::global::BoxedSpan> {
    // `Mutex` because hooks only receive shared references to the state;
    // `Option` so a terminal hook can `take()` the span exactly once.
    pub(crate) span: Mutex<Option<S>>,
}
72
73impl<S> Default for LangfuseState<S> {
74 fn default() -> Self {
75 Self {
76 span: Mutex::new(None),
77 }
78 }
79}
80
/// Configuration for the Langfuse interceptor.
#[derive(Debug, Clone)]
pub struct LangfuseConfig {
    // When true, the interceptor logs span lifecycle events via `tracing`.
    pub debug: bool,
}
87
88impl Default for LangfuseConfig {
89 fn default() -> Self {
90 Self {
91 debug: std::env::var("LANGFUSE_DEBUG")
92 .unwrap_or_else(|_| "false".to_string())
93 .parse()
94 .unwrap_or(false),
95 }
96 }
97}
98
99impl LangfuseConfig {
100 pub fn new() -> Self {
102 Self::default()
103 }
104
105 #[must_use]
107 pub fn with_debug(mut self, debug: bool) -> Self {
108 self.debug = debug;
109 self
110 }
111}
112
/// Interceptor that records each request as an OpenTelemetry client span,
/// enriched with GenAI semantic-convention attributes and Langfuse context
/// (session, user, tags, metadata).
pub struct LangfuseInterceptor<T: Tracer + Send + Sync> {
    // Controls debug logging of the interceptor's own activity.
    config: LangfuseConfig,
    // Tracer used to start one span per intercepted request.
    tracer: Arc<T>,
    // Shared Langfuse context merged into every span's attributes.
    context: Arc<LangfuseContext>,
}
125
126impl<T: Tracer + Send + Sync> LangfuseInterceptor<T>
127where
128 T::Span: Send + Sync + 'static,
129{
130 pub fn new(tracer: T, config: LangfuseConfig) -> Self {
167 if config.debug {
168 info!("Langfuse interceptor initialized");
169 }
170
171 Self {
172 config,
173 tracer: Arc::new(tracer),
174 context: Arc::new(LangfuseContext::new()),
175 }
176 }
177
178 pub fn set_session_id(&self, session_id: impl Into<String>) {
180 self.context.set_session_id(session_id);
181 }
182
183 pub fn set_user_id(&self, user_id: impl Into<String>) {
185 self.context.set_user_id(user_id);
186 }
187
188 pub fn add_tags(&self, tags: Vec<String>) {
190 self.context.add_tags(tags);
191 }
192
193 pub fn add_tag(&self, tag: impl Into<String>) {
195 self.context.add_tag(tag);
196 }
197
198 pub fn set_metadata(&self, metadata: serde_json::Value) {
200 self.context.set_metadata(metadata);
201 }
202
203 pub fn clear_context(&self) {
205 self.context.clear();
206 }
207
208 pub fn context(&self) -> &Arc<LangfuseContext> {
210 &self.context
211 }
212
213 fn extract_request_params(request_json: &str) -> serde_json::Result<Value> {
215 serde_json::from_str(request_json)
216 }
217}
218
#[async_trait::async_trait]
impl<T: Tracer + Send + Sync> Interceptor<LangfuseState<T::Span>> for LangfuseInterceptor<T>
where
    T::Span: Send + Sync + 'static,
{
    /// Starts a client span for the outgoing request, attaching GenAI and
    /// Langfuse-context attributes, and parks it in the per-request state.
    async fn before_request(
        &self,
        ctx: &mut BeforeRequestContext<'_, LangfuseState<T::Span>>,
    ) -> Result<()> {
        let tracer = self.tracer.as_ref();

        // Base GenAI attributes; the provider name is hard-coded to "openai".
        let mut attributes = vec![
            KeyValue::new(GEN_AI_SYSTEM, "openai"),
            KeyValue::new(GEN_AI_OPERATION_NAME, ctx.operation.to_string()),
            KeyValue::new(GEN_AI_REQUEST_MODEL, ctx.model.to_string()),
        ];

        // Merge session/user/tags/metadata collected on the shared context.
        attributes.extend(self.context.get_attributes());

        // Best-effort enrichment from the request body: a payload that is not
        // valid JSON is silently skipped instead of failing the request.
        if let Ok(params) = Self::extract_request_params(ctx.request_json) {
            if let Some(temperature) = params
                .get("temperature")
                .and_then(serde_json::Value::as_f64)
            {
                attributes.push(KeyValue::new(GEN_AI_REQUEST_TEMPERATURE, temperature));
            }
            if let Some(max_tokens) = params.get("max_tokens").and_then(serde_json::Value::as_i64) {
                attributes.push(KeyValue::new(GEN_AI_REQUEST_MAX_TOKENS, max_tokens));
            }

            // Record each chat message as indexed gen_ai.prompt.{i}.* pairs.
            // NOTE(review): non-string `content` (e.g. multimodal arrays) is
            // recorded as "" — confirm that is the intended behavior.
            if let Some(messages) = params.get("messages").and_then(serde_json::Value::as_array) {
                for (i, message) in messages.iter().enumerate() {
                    if let Some(obj) = message.as_object() {
                        let role = obj
                            .get("role")
                            .and_then(serde_json::Value::as_str)
                            .unwrap_or("unknown")
                            .to_string();
                        let content = obj
                            .get("content")
                            .and_then(serde_json::Value::as_str)
                            .unwrap_or("")
                            .to_string();

                        attributes.push(KeyValue::new(format!("gen_ai.prompt.{i}.role"), role));
                        attributes
                            .push(KeyValue::new(format!("gen_ai.prompt.{i}.content"), content));
                    }
                }
            }
        }

        let span = tracer
            .span_builder(ctx.operation.to_string())
            .with_kind(SpanKind::Client)
            .with_attributes(attributes)
            .start(tracer);

        // Park the span in shared state; a terminal hook will `take()` it.
        *ctx.state.span.lock().unwrap() = Some(span);

        if self.config.debug {
            debug!("Started Langfuse span for operation: {}", ctx.operation);
        }

        Ok(())
    }

    /// Ends the span for a non-streaming request, adding duration, token
    /// usage, and completion attributes extracted from the response body.
    async fn after_response(
        &self,
        ctx: &AfterResponseContext<'_, LangfuseState<T::Span>>,
    ) -> Result<()> {
        use opentelemetry::trace::Span;

        // `take()` guarantees the span is ended at most once; a missing span
        // (e.g. `before_request` never ran) is benign and not an error.
        let Some(mut span) = ctx.state.span.lock().unwrap().take() else {
            if self.config.debug {
                debug!("No span found in state for operation: {}", ctx.operation);
            }
            return Ok(());
        };

        #[allow(clippy::cast_possible_truncation)]
        {
            // u128 millis -> i64: truncation only matters for implausibly
            // long durations.
            span.set_attribute(KeyValue::new(
                "duration_ms",
                ctx.duration.as_millis() as i64,
            ));
        }

        if let Some(input_tokens) = ctx.input_tokens {
            span.set_attribute(KeyValue::new(GEN_AI_USAGE_INPUT_TOKENS, input_tokens));
        }
        if let Some(output_tokens) = ctx.output_tokens {
            span.set_attribute(KeyValue::new(GEN_AI_USAGE_OUTPUT_TOKENS, output_tokens));
        }

        // Despite its name, `extract_request_params` is just a JSON parse and
        // is reused here for the *response* payload.
        if let Ok(response) = Self::extract_request_params(ctx.response_json) {
            if let Some(id) = response.get("id").and_then(serde_json::Value::as_str) {
                span.set_attribute(KeyValue::new(GEN_AI_RESPONSE_ID, id.to_string()));
            }

            // Mirror each choice's message as gen_ai.completion.{i}.* pairs.
            if let Some(choices) = response
                .get("choices")
                .and_then(serde_json::Value::as_array)
            {
                for (i, choice) in choices.iter().enumerate() {
                    if let Some(message) = choice.get("message") {
                        if let Some(role) = message.get("role").and_then(serde_json::Value::as_str)
                        {
                            span.set_attribute(KeyValue::new(
                                format!("gen_ai.completion.{i}.role"),
                                role.to_string(),
                            ));
                        }
                        if let Some(content) =
                            message.get("content").and_then(serde_json::Value::as_str)
                        {
                            span.set_attribute(KeyValue::new(
                                format!("gen_ai.completion.{i}.content"),
                                content.to_string(),
                            ));
                        }
                    }
                }
            }
        }

        span.end();

        if self.config.debug {
            debug!("Completed Langfuse span for operation: {}", ctx.operation);
        }

        Ok(())
    }

    /// Per-chunk hook is deliberately a no-op; aggregate stream statistics
    /// are recorded once in `on_stream_end`.
    async fn on_stream_chunk(
        &self,
        _ctx: &StreamChunkContext<'_, LangfuseState<T::Span>>,
    ) -> Result<()> {
        Ok(())
    }

    /// Ends the span for a streaming request with chunk count, duration, and
    /// token usage.
    async fn on_stream_end(
        &self,
        ctx: &StreamEndContext<'_, LangfuseState<T::Span>>,
    ) -> Result<()> {
        use opentelemetry::trace::Span;

        let Some(mut span) = ctx.state.span.lock().unwrap().take() else {
            if self.config.debug {
                debug!(
                    "No span found in state for stream operation: {}",
                    ctx.operation
                );
            }
            return Ok(());
        };

        #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
        {
            span.set_attribute(KeyValue::new(
                "stream.total_chunks",
                ctx.total_chunks as i64,
            ));
            span.set_attribute(KeyValue::new(
                "stream.duration_ms",
                ctx.duration.as_millis() as i64,
            ));
        }

        if let Some(input_tokens) = ctx.input_tokens {
            span.set_attribute(KeyValue::new(GEN_AI_USAGE_INPUT_TOKENS, input_tokens));
        }
        if let Some(output_tokens) = ctx.output_tokens {
            span.set_attribute(KeyValue::new(GEN_AI_USAGE_OUTPUT_TOKENS, output_tokens));
        }

        span.end();

        if self.config.debug {
            info!(
                "Completed streaming span for operation: {} with {} chunks",
                ctx.operation, ctx.total_chunks
            );
        }

        Ok(())
    }

    /// Marks the span as errored and ends it. Infallible by design so that
    /// telemetry failures never mask the original request error.
    async fn on_error(&self, ctx: &ErrorContext<'_, LangfuseState<T::Span>>) {
        use opentelemetry::trace::{Span, Status};

        // Unlike the other hooks, state is optional here: the error may have
        // occurred before any per-request state was created.
        let Some(state) = ctx.state else {
            if self.config.debug {
                debug!(
                    "No state available for error in operation: {}",
                    ctx.operation
                );
            }
            return;
        };

        let Some(mut span) = state.span.lock().unwrap().take() else {
            if self.config.debug {
                debug!(
                    "No span found in state for error in operation: {}",
                    ctx.operation
                );
            }
            return;
        };

        span.set_status(Status::error(ctx.error.to_string()));

        span.set_attribute(KeyValue::new("error.type", format!("{:?}", ctx.error)));
        span.set_attribute(KeyValue::new("error.message", ctx.error.to_string()));

        if let Some(model) = ctx.model {
            span.set_attribute(KeyValue::new(GEN_AI_REQUEST_MODEL, model.to_string()));
        }

        span.end();

        if self.config.debug {
            error!(
                "Recorded error for operation {}: {}",
                ctx.operation, ctx.error
            );
        }
    }
}
474
475#[async_trait::async_trait]
477impl<T: Tracer + Send + Sync> Interceptor<LangfuseState<T::Span>> for Arc<LangfuseInterceptor<T>>
478where
479 T::Span: Send + Sync + 'static,
480{
481 async fn before_request(
482 &self,
483 ctx: &mut BeforeRequestContext<'_, LangfuseState<T::Span>>,
484 ) -> Result<()> {
485 (**self).before_request(ctx).await
486 }
487
488 async fn after_response(
489 &self,
490 ctx: &AfterResponseContext<'_, LangfuseState<T::Span>>,
491 ) -> Result<()> {
492 (**self).after_response(ctx).await
493 }
494
495 async fn on_stream_chunk(
496 &self,
497 ctx: &StreamChunkContext<'_, LangfuseState<T::Span>>,
498 ) -> Result<()> {
499 (**self).on_stream_chunk(ctx).await
500 }
501
502 async fn on_stream_end(
503 &self,
504 ctx: &StreamEndContext<'_, LangfuseState<T::Span>>,
505 ) -> Result<()> {
506 (**self).on_stream_end(ctx).await
507 }
508
509 async fn on_error(&self, ctx: &ErrorContext<'_, LangfuseState<T::Span>>) {
510 (**self).on_error(ctx).await;
511 }
512}
513
#[cfg(test)]
mod tests {
    use super::*;
    use opentelemetry::trace::noop::NoopTracer;

    #[test]
    fn test_config_from_env() {
        // NOTE(review): env vars are process-global, so this test can race
        // with other tests that read LANGFUSE_DEBUG when the harness runs
        // tests in parallel — consider `--test-threads=1` or a lock.
        std::env::set_var("LANGFUSE_DEBUG", "true");
        assert!(LangfuseConfig::default().debug);

        // An unparsable value falls back to `false` rather than panicking.
        std::env::set_var("LANGFUSE_DEBUG", "not-a-bool");
        assert!(!LangfuseConfig::default().debug);

        // A missing variable also defaults to `false`.
        std::env::remove_var("LANGFUSE_DEBUG");
        assert!(!LangfuseConfig::default().debug);
    }

    #[test]
    fn test_with_debug_overrides_default() {
        // The builder setter wins regardless of the environment-derived value.
        let config = LangfuseConfig::new().with_debug(true);
        assert!(config.debug);
        assert!(!config.with_debug(false).debug);
    }

    #[test]
    fn test_interceptor_creation() {
        // Smoke test: the interceptor can be built over a no-op tracer.
        let tracer = NoopTracer::new();
        let config = LangfuseConfig::new().with_debug(true);
        let _interceptor = LangfuseInterceptor::new(tracer, config);
    }
}