//! Tracing decorator for LLM providers.
//!
//! [`TracingProvider`] wraps an [`LLMProvider`] and records a span with
//! OpenTelemetry GenAI attributes around every completion, chat, and
//! streaming call.
use async_trait::async_trait;
use futures::stream::BoxStream;
use tracing::{info_span, Instrument};
use crate::error::Result;
use crate::providers::genai_events;
use crate::traits::{
ChatMessage, CompletionOptions, LLMProvider, LLMResponse, StreamChunk, ToolChoice,
ToolDefinition,
};
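/// Span attribute names for GenAI telemetry, following the OpenTelemetry
/// `gen_ai.*` semantic conventions where they exist.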
pub mod genai_attrs {
pub const OPERATION_NAME: &str = "gen_ai.operation.name";
pub const SYSTEM: &str = "gen_ai.system";
pub const REQUEST_MODEL: &str = "gen_ai.request.model";
pub const REQUEST_MAX_TOKENS: &str = "gen_ai.request.max_tokens";
pub const REQUEST_TEMPERATURE: &str = "gen_ai.request.temperature";
pub const REQUEST_TOP_P: &str = "gen_ai.request.top_p";
pub const RESPONSE_MODEL: &str = "gen_ai.response.model";
pub const USAGE_INPUT_TOKENS: &str = "gen_ai.usage.input_tokens";
pub const USAGE_OUTPUT_TOKENS: &str = "gen_ai.usage.output_tokens";
pub const RESPONSE_FINISH_REASONS: &str = "gen_ai.response.finish_reasons";
pub const USAGE_REASONING_TOKENS: &str = "gen_ai.usage.reasoning_tokens";
pub const REASONING_CONTENT: &str = "gen_ai.reasoning.content";
}
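/// Returns true when prompt and completion content should be captured on
/// spans, controlled by the `EDGECODE_CAPTURE_CONTENT` environment variable
/// (accepted values: `"true"` or `"1"`). Defaults to false so message content
/// is never exported unless explicitly opted in.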
fn should_capture_content() -> bool {
std::env::var("EDGECODE_CAPTURE_CONTENT")
.map(|v| v == "true" || v == "1")
.unwrap_or(false)
}
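/// Decorator that wraps any [`LLMProvider`] and instruments each call with a
/// span carrying GenAI attributes (model, token usage, finish reasons, and,
/// when enabled, prompt/completion content).
///
/// ```ignore
/// // Minimal usage sketch; `SomeProvider` stands in for any concrete
/// // LLMProvider implementation.
/// let traced = TracingProvider::new(SomeProvider::new(config)?);
/// let response = traced.complete("Hello").await?;
/// ```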
pub struct TracingProvider<P: LLMProvider> {
inner: P,
}
impl<P: LLMProvider> TracingProvider<P> {
pub fn new(inner: P) -> Self {
Self { inner }
}
pub fn inner(&self) -> &P {
&self.inner
}
pub fn into_inner(self) -> P {
self.inner
}
}
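/// Maximum number of bytes of reasoning/thinking content recorded on a span.
const MAX_REASONING_BYTES: usize = 10 * 1024;
/// Truncates reasoning content to [`MAX_REASONING_BYTES`] on a UTF-8 character
/// boundary so only a bounded prefix is recorded as a span attribute.
fn truncate_reasoning(thinking: &str) -> String {
    if thinking.len() > MAX_REASONING_BYTES {
        // Find the end of the last character that starts before the limit so
        // the slice never splits a multi-byte character.
        let truncate_at = thinking
            .char_indices()
            .take_while(|(idx, _)| *idx < MAX_REASONING_BYTES)
            .last()
            .map(|(idx, c)| idx + c.len_utf8())
            .unwrap_or(0);
        format!("{}...[truncated]", &thinking[..truncate_at])
    } else {
        thinking.to_string()
    }
}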
#[async_trait]
impl<P: LLMProvider> LLMProvider for TracingProvider<P> {
fn name(&self) -> &str {
self.inner.name()
}
fn model(&self) -> &str {
self.inner.model()
}
fn max_context_length(&self) -> usize {
self.inner.max_context_length()
}
async fn complete(&self, prompt: &str) -> Result<LLMResponse> {
        let span = info_span!(
            "gen_ai.complete",
            { genai_attrs::OPERATION_NAME } = "complete",
            { genai_attrs::SYSTEM } = self.inner.name(),
            { genai_attrs::REQUEST_MODEL } = self.inner.model(),
            prompt_length = prompt.len(),
            // Response fields must be declared (as Empty) up front; recording
            // an undeclared field on a span is silently ignored by tracing.
            { genai_attrs::RESPONSE_MODEL } = tracing::field::Empty,
            { genai_attrs::USAGE_INPUT_TOKENS } = tracing::field::Empty,
            { genai_attrs::USAGE_OUTPUT_TOKENS } = tracing::field::Empty,
            { genai_attrs::RESPONSE_FINISH_REASONS } = tracing::field::Empty,
            "gen_ai.prompt" = tracing::field::Empty,
            "gen_ai.completion.content" = tracing::field::Empty,
            "langfuse.observation.input" = tracing::field::Empty,
            "langfuse.observation.output" = tracing::field::Empty,
        );
if should_capture_content() {
span.record("gen_ai.prompt", prompt);
span.record("langfuse.observation.input", prompt);
}
let response = self.inner.complete(prompt).instrument(span.clone()).await?;
span.record(genai_attrs::RESPONSE_MODEL, &response.model);
span.record(
genai_attrs::USAGE_INPUT_TOKENS,
response.prompt_tokens as i64,
);
span.record(
genai_attrs::USAGE_OUTPUT_TOKENS,
response.completion_tokens as i64,
);
if let Some(reason) = &response.finish_reason {
span.record(genai_attrs::RESPONSE_FINISH_REASONS, reason.as_str());
}
if should_capture_content() {
span.record("gen_ai.completion.content", response.content.as_str());
span.record("langfuse.observation.output", response.content.as_str());
}
Ok(response)
}
async fn complete_with_options(
&self,
prompt: &str,
options: &CompletionOptions,
) -> Result<LLMResponse> {
        let span = info_span!(
            "gen_ai.complete",
            { genai_attrs::OPERATION_NAME } = "complete",
            { genai_attrs::SYSTEM } = self.inner.name(),
            { genai_attrs::REQUEST_MODEL } = self.inner.model(),
            { genai_attrs::REQUEST_MAX_TOKENS } = options.max_tokens.map(|t| t as i64),
            { genai_attrs::REQUEST_TEMPERATURE } = options.temperature.map(|t| t as f64),
            { genai_attrs::REQUEST_TOP_P } = options.top_p.map(|t| t as f64),
            prompt_length = prompt.len(),
            // Declare response fields as Empty so the post-call `record`s are
            // not dropped.
            { genai_attrs::RESPONSE_MODEL } = tracing::field::Empty,
            { genai_attrs::USAGE_INPUT_TOKENS } = tracing::field::Empty,
            { genai_attrs::USAGE_OUTPUT_TOKENS } = tracing::field::Empty,
            { genai_attrs::RESPONSE_FINISH_REASONS } = tracing::field::Empty,
            "gen_ai.prompt" = tracing::field::Empty,
            "gen_ai.completion.content" = tracing::field::Empty,
            "langfuse.observation.input" = tracing::field::Empty,
            "langfuse.observation.output" = tracing::field::Empty,
        );
if should_capture_content() {
span.record("gen_ai.prompt", prompt);
span.record("langfuse.observation.input", prompt);
}
let response = self
.inner
.complete_with_options(prompt, options)
.instrument(span.clone())
.await?;
span.record(genai_attrs::RESPONSE_MODEL, &response.model);
span.record(
genai_attrs::USAGE_INPUT_TOKENS,
response.prompt_tokens as i64,
);
span.record(
genai_attrs::USAGE_OUTPUT_TOKENS,
response.completion_tokens as i64,
);
if let Some(reason) = &response.finish_reason {
span.record(genai_attrs::RESPONSE_FINISH_REASONS, reason.as_str());
}
if should_capture_content() {
span.record("gen_ai.completion.content", response.content.as_str());
span.record("langfuse.observation.output", response.content.as_str());
}
let input_messages = vec![ChatMessage {
role: crate::traits::ChatRole::User,
content: prompt.to_string(),
name: None,
tool_calls: None,
tool_call_id: None,
cache_control: None,
images: None,
}];
let output_messages = vec![ChatMessage {
role: crate::traits::ChatRole::Assistant,
content: response.content.clone(),
name: None,
tool_calls: None,
tool_call_id: None,
cache_control: None,
images: None,
}];
        // Emit the inference event from inside the span's context.
        span.in_scope(|| {
            genai_events::emit_inference_event(
                &input_messages,
                &output_messages,
                &response,
                Some(options),
            );
        });
Ok(response)
}
async fn chat(
&self,
messages: &[ChatMessage],
options: Option<&CompletionOptions>,
) -> Result<LLMResponse> {
let max_tokens = options.and_then(|o| o.max_tokens).map(|t| t as i64);
let temperature = options.and_then(|o| o.temperature).map(|t| t as f64);
let top_p = options.and_then(|o| o.top_p).map(|t| t as f64);
let span = info_span!(
"gen_ai.chat",
{ genai_attrs::OPERATION_NAME } = "chat",
{ genai_attrs::SYSTEM } = self.inner.name(),
{ genai_attrs::REQUEST_MODEL } = self.inner.model(),
{ genai_attrs::REQUEST_MAX_TOKENS } = max_tokens,
{ genai_attrs::REQUEST_TEMPERATURE } = temperature,
{ genai_attrs::REQUEST_TOP_P } = top_p,
message_count = messages.len(),
{ genai_attrs::RESPONSE_MODEL } = tracing::field::Empty,
{ genai_attrs::USAGE_INPUT_TOKENS } = tracing::field::Empty,
{ genai_attrs::USAGE_OUTPUT_TOKENS } = tracing::field::Empty,
{ genai_attrs::USAGE_REASONING_TOKENS } = tracing::field::Empty,
{ genai_attrs::RESPONSE_FINISH_REASONS } = tracing::field::Empty,
"gen_ai.prompt" = tracing::field::Empty,
"gen_ai.completion.content" = tracing::field::Empty,
{ genai_attrs::REASONING_CONTENT } = tracing::field::Empty,
"langfuse.observation.input" = tracing::field::Empty,
"langfuse.observation.output" = tracing::field::Empty,
);
if should_capture_content() {
if let Ok(messages_json) = serde_json::to_string(messages) {
span.record("gen_ai.prompt", messages_json.as_str());
span.record("langfuse.observation.input", messages_json.as_str());
}
}
let response = self
.inner
.chat(messages, options)
.instrument(span.clone())
.await?;
span.record(genai_attrs::RESPONSE_MODEL, &response.model);
span.record(
genai_attrs::USAGE_INPUT_TOKENS,
response.prompt_tokens as i64,
);
span.record(
genai_attrs::USAGE_OUTPUT_TOKENS,
response.completion_tokens as i64,
);
if let Some(thinking_tokens) = response.thinking_tokens {
span.record(genai_attrs::USAGE_REASONING_TOKENS, thinking_tokens as i64);
}
if let Some(reason) = &response.finish_reason {
span.record(genai_attrs::RESPONSE_FINISH_REASONS, reason.as_str());
}
if should_capture_content() {
span.record("gen_ai.completion.content", response.content.as_str());
span.record("langfuse.observation.output", response.content.as_str());
            if let Some(thinking) = &response.thinking_content {
                span.record(
                    genai_attrs::REASONING_CONTENT,
                    truncate_reasoning(thinking).as_str(),
                );
            }
}
let output_messages = vec![ChatMessage {
role: crate::traits::ChatRole::Assistant,
content: response.content.clone(),
name: None,
tool_calls: None,
tool_call_id: None,
cache_control: None,
images: None,
}];
        // Emit the inference event from inside the span's context.
        span.in_scope(|| {
            genai_events::emit_inference_event(messages, &output_messages, &response, options);
        });
tracing::info!(
target: "gen_ai.usage",
input_tokens = response.prompt_tokens,
output_tokens = response.completion_tokens,
model = %response.model,
"LLM chat completion"
);
Ok(response)
}
async fn chat_with_tools(
&self,
messages: &[ChatMessage],
tools: &[ToolDefinition],
tool_choice: Option<ToolChoice>,
options: Option<&CompletionOptions>,
) -> Result<LLMResponse> {
let max_tokens = options.and_then(|o| o.max_tokens).map(|t| t as i64);
let temperature = options.and_then(|o| o.temperature).map(|t| t as f64);
let span = info_span!(
"gen_ai.chat_with_tools",
{ genai_attrs::OPERATION_NAME } = "chat_with_tools",
{ genai_attrs::SYSTEM } = self.inner.name(),
{ genai_attrs::REQUEST_MODEL } = self.inner.model(),
{ genai_attrs::REQUEST_MAX_TOKENS } = max_tokens,
{ genai_attrs::REQUEST_TEMPERATURE } = temperature,
message_count = messages.len(),
tool_count = tools.len(),
{ genai_attrs::RESPONSE_MODEL } = tracing::field::Empty,
{ genai_attrs::USAGE_INPUT_TOKENS } = tracing::field::Empty,
{ genai_attrs::USAGE_OUTPUT_TOKENS } = tracing::field::Empty,
{ genai_attrs::USAGE_REASONING_TOKENS } = tracing::field::Empty,
{ genai_attrs::RESPONSE_FINISH_REASONS } = tracing::field::Empty,
"gen_ai.prompt" = tracing::field::Empty,
"gen_ai.completion.content" = tracing::field::Empty,
{ genai_attrs::REASONING_CONTENT } = tracing::field::Empty,
"langfuse.observation.input" = tracing::field::Empty,
"langfuse.observation.output" = tracing::field::Empty,
"langfuse.observation.type" = "generation",
);
if should_capture_content() {
if let Ok(messages_json) = serde_json::to_string(messages) {
span.record("gen_ai.prompt", messages_json.as_str());
span.record("langfuse.observation.input", messages_json.as_str());
}
}
let response = self
.inner
.chat_with_tools(messages, tools, tool_choice, options)
.instrument(span.clone())
.await?;
span.record(genai_attrs::RESPONSE_MODEL, &response.model);
span.record(
genai_attrs::USAGE_INPUT_TOKENS,
response.prompt_tokens as i64,
);
span.record(
genai_attrs::USAGE_OUTPUT_TOKENS,
response.completion_tokens as i64,
);
if let Some(thinking_tokens) = response.thinking_tokens {
span.record(genai_attrs::USAGE_REASONING_TOKENS, thinking_tokens as i64);
}
if let Some(reason) = &response.finish_reason {
span.record(genai_attrs::RESPONSE_FINISH_REASONS, reason.as_str());
}
if should_capture_content() {
let output_content = if !response.tool_calls.is_empty() {
serde_json::to_string(&response.tool_calls)
.unwrap_or_else(|_| format!("{} tool call(s)", response.tool_calls.len()))
} else {
response.content.clone()
};
span.record("gen_ai.completion.content", output_content.as_str());
span.record("langfuse.observation.output", output_content.as_str());
            if let Some(thinking) = &response.thinking_content {
                span.record(
                    genai_attrs::REASONING_CONTENT,
                    truncate_reasoning(thinking).as_str(),
                );
            }
}
tracing::info!(
target: "gen_ai.usage",
input_tokens = response.prompt_tokens,
output_tokens = response.completion_tokens,
model = %response.model,
tool_calls = response.tool_calls.len(),
"LLM chat with tools completion"
);
Ok(response)
}
async fn stream(&self, prompt: &str) -> Result<BoxStream<'static, Result<String>>> {
let span = info_span!(
"gen_ai.stream",
{ genai_attrs::OPERATION_NAME } = "stream",
{ genai_attrs::SYSTEM } = self.inner.name(),
{ genai_attrs::REQUEST_MODEL } = self.inner.model(),
prompt_length = prompt.len(),
"gen_ai.prompt" = tracing::field::Empty,
);
if should_capture_content() {
span.record("gen_ai.prompt", prompt);
}
self.inner.stream(prompt).instrument(span).await
}
async fn chat_with_tools_stream(
&self,
messages: &[ChatMessage],
tools: &[ToolDefinition],
tool_choice: Option<ToolChoice>,
options: Option<&CompletionOptions>,
) -> Result<BoxStream<'static, Result<StreamChunk>>> {
let max_tokens = options.and_then(|o| o.max_tokens).map(|t| t as i64);
let temperature = options.and_then(|o| o.temperature).map(|t| t as f64);
let span = info_span!(
"gen_ai.stream_with_tools",
{ genai_attrs::OPERATION_NAME } = "stream_with_tools",
{ genai_attrs::SYSTEM } = self.inner.name(),
{ genai_attrs::REQUEST_MODEL } = self.inner.model(),
{ genai_attrs::REQUEST_MAX_TOKENS } = max_tokens,
{ genai_attrs::REQUEST_TEMPERATURE } = temperature,
message_count = messages.len(),
tool_count = tools.len(),
"gen_ai.prompt" = tracing::field::Empty,
"langfuse.observation.input" = tracing::field::Empty,
"langfuse.observation.type" = "generation",
);
if should_capture_content() {
if let Ok(messages_json) = serde_json::to_string(messages) {
span.record("gen_ai.prompt", messages_json.as_str());
span.record("langfuse.observation.input", messages_json.as_str());
}
}
tracing::info!(
span_id = ?span.id(),
span_name = "gen_ai.stream_with_tools",
"Created LLM streaming span"
);
self.inner
.chat_with_tools_stream(messages, tools, tool_choice, options)
.instrument(span)
.await
}
fn supports_streaming(&self) -> bool {
self.inner.supports_streaming()
}
fn supports_tool_streaming(&self) -> bool {
self.inner.supports_tool_streaming()
}
fn supports_json_mode(&self) -> bool {
self.inner.supports_json_mode()
}
fn supports_function_calling(&self) -> bool {
self.inner.supports_function_calling()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::providers::mock::MockProvider;
#[test]
fn test_tracing_provider_delegates_name() {
let mock = MockProvider::new();
let traced = TracingProvider::new(mock);
assert_eq!(traced.name(), "mock");
}
#[test]
fn test_tracing_provider_delegates_model() {
let mock = MockProvider::new();
let traced = TracingProvider::new(mock);
assert_eq!(traced.model(), "mock-model");
}
#[test]
fn test_tracing_provider_delegates_max_context_length() {
let mock = MockProvider::new();
let traced = TracingProvider::new(mock);
assert_eq!(traced.max_context_length(), 4096);
}
#[test]
fn test_into_inner() {
let mock = MockProvider::new();
let traced = TracingProvider::new(mock);
let inner = traced.into_inner();
assert_eq!(inner.name(), "mock");
}
#[tokio::test]
async fn test_complete_creates_span() {
let mock = MockProvider::new();
let traced = TracingProvider::new(mock);
let response = traced.complete("Hello").await.unwrap();
assert_eq!(response.content, "Mock response");
}
#[tokio::test]
async fn test_chat_creates_span() {
let mock = MockProvider::new();
let traced = TracingProvider::new(mock);
let messages = vec![ChatMessage::user("Hello")];
let response = traced.chat(&messages, None).await.unwrap();
assert_eq!(response.content, "Mock response");
}
#[tokio::test]
async fn test_complete_with_options_creates_span() {
let mock = MockProvider::new();
let traced = TracingProvider::new(mock);
let options = CompletionOptions {
max_tokens: Some(100),
temperature: Some(0.7),
top_p: Some(0.9),
..Default::default()
};
let response = traced
.complete_with_options("Prompt", &options)
.await
.unwrap();
assert_eq!(response.content, "Mock response");
}
#[test]
fn test_inner_accessor() {
let mock = MockProvider::new();
let traced = TracingProvider::new(mock);
assert_eq!(traced.inner().name(), "mock");
}
#[test]
fn test_supports_streaming_delegation() {
let mock = MockProvider::new();
let traced = TracingProvider::new(mock);
assert!(!traced.supports_streaming());
}
#[test]
fn test_supports_json_mode_delegation() {
let mock = MockProvider::new();
let traced = TracingProvider::new(mock);
assert!(!traced.supports_json_mode());
}
#[test]
fn test_supports_function_calling_delegation() {
let mock = MockProvider::new();
let traced = TracingProvider::new(mock);
assert!(!traced.supports_function_calling());
}
#[test]
fn test_supports_tool_streaming_delegation() {
let mock = MockProvider::new();
let traced = TracingProvider::new(mock);
assert!(!traced.supports_tool_streaming());
}
#[test]
fn test_genai_attrs_constants() {
assert_eq!(genai_attrs::OPERATION_NAME, "gen_ai.operation.name");
assert_eq!(genai_attrs::SYSTEM, "gen_ai.system");
assert_eq!(genai_attrs::REQUEST_MODEL, "gen_ai.request.model");
assert_eq!(genai_attrs::REQUEST_MAX_TOKENS, "gen_ai.request.max_tokens");
assert_eq!(
genai_attrs::REQUEST_TEMPERATURE,
"gen_ai.request.temperature"
);
assert_eq!(genai_attrs::REQUEST_TOP_P, "gen_ai.request.top_p");
assert_eq!(genai_attrs::RESPONSE_MODEL, "gen_ai.response.model");
assert_eq!(genai_attrs::USAGE_INPUT_TOKENS, "gen_ai.usage.input_tokens");
assert_eq!(
genai_attrs::USAGE_OUTPUT_TOKENS,
"gen_ai.usage.output_tokens"
);
assert_eq!(
genai_attrs::RESPONSE_FINISH_REASONS,
"gen_ai.response.finish_reasons"
);
assert_eq!(
genai_attrs::USAGE_REASONING_TOKENS,
"gen_ai.usage.reasoning_tokens"
);
assert_eq!(genai_attrs::REASONING_CONTENT, "gen_ai.reasoning.content");
}
#[test]
fn test_should_capture_content_default_false() {
std::env::remove_var("EDGECODE_CAPTURE_CONTENT");
assert!(!should_capture_content());
}
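    // Sanity checks for the reasoning-content truncation helper.
    #[test]
    fn test_truncate_reasoning_keeps_short_content() {
        assert_eq!(truncate_reasoning("short"), "short");
    }
    #[test]
    fn test_truncate_reasoning_truncates_long_content() {
        let long = "a".repeat(20_000);
        let truncated = truncate_reasoning(&long);
        assert!(truncated.len() < long.len());
        assert!(truncated.ends_with("...[truncated]"));
    }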
#[tokio::test]
async fn test_chat_with_options() {
let mock = MockProvider::new();
let traced = TracingProvider::new(mock);
let messages = vec![ChatMessage::user("Hello")];
let options = CompletionOptions {
max_tokens: Some(50),
temperature: Some(0.5),
..Default::default()
};
let response = traced.chat(&messages, Some(&options)).await.unwrap();
assert_eq!(response.content, "Mock response");
}
#[tokio::test]
async fn test_chat_with_tools_delegation() {
use crate::providers::mock::MockAgentProvider;
let mock = MockAgentProvider::new();
mock.add_response("tool response").await;
let traced = TracingProvider::new(mock);
let messages = vec![ChatMessage::user("use tools")];
let response = traced
.chat_with_tools(&messages, &[], None, None)
.await
.unwrap();
assert_eq!(response.content, "tool response");
}
#[tokio::test]
async fn test_stream_delegation() {
use futures::StreamExt;
let mock = MockProvider::new();
mock.add_response("streamed").await;
let traced = TracingProvider::new(mock);
let mut stream = traced.stream("prompt").await.unwrap();
let chunk = stream.next().await.unwrap().unwrap();
assert_eq!(chunk, "streamed");
}
#[tokio::test]
async fn test_complete_with_queued_response() {
let mock = MockProvider::new();
mock.add_response("custom traced").await;
let traced = TracingProvider::new(mock);
let response = traced.complete("Hi").await.unwrap();
assert_eq!(response.content, "custom traced");
}
}