use opentelemetry::global;
use opentelemetry_http::HttpClient;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::trace::SdkTracerProvider;
use std::sync::OnceLock;
use tracing::Span;
use tracing_opentelemetry::OpenTelemetrySpanExt;
#[derive(Debug, Clone)]
struct ReqwestHttpClient(reqwest::Client);
#[async_trait::async_trait]
impl HttpClient for ReqwestHttpClient {
async fn send_bytes(
&self,
request: http::Request<bytes::Bytes>,
) -> Result<http::Response<bytes::Bytes>, opentelemetry_http::HttpError> {
let mut response = self
.0
.execute(request.try_into()?)
.await?
.error_for_status()?;
let status = response.status();
let headers = std::mem::take(response.headers_mut());
let body = response.bytes().await?;
let mut http_response = http::Response::builder().status(status).body(body)?;
*http_response.headers_mut() = headers;
Ok(http_response)
}
}
pub(crate) static TRACER_PROVIDER: OnceLock<SdkTracerProvider> = OnceLock::new();
pub fn init_telemetry(endpoint: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_telemetry_with_headers(endpoint, vec![])
}
pub fn init_telemetry_with_headers(
endpoint: &str,
headers: Vec<(String, String)>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_otlp::{WithExportConfig, WithHttpConfig};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;
if TRACER_PROVIDER.get().is_some() {
return Ok(());
}
let mut http_builder = opentelemetry_otlp::SpanExporter::builder()
.with_http()
.with_http_client(ReqwestHttpClient(reqwest::Client::new()))
.with_endpoint(endpoint);
if !headers.is_empty() {
let header_map: std::collections::HashMap<String, String> =
headers.iter().cloned().collect();
http_builder = http_builder.with_headers(header_map);
}
let exporter = http_builder.build()?;
let provider = TRACER_PROVIDER.get_or_init(|| {
SdkTracerProvider::builder()
.with_batch_exporter(exporter)
.build()
});
global::set_tracer_provider(provider.clone());
global::set_text_map_propagator(TraceContextPropagator::new());
let tracer = provider.tracer("hyperinfer-client");
let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
let subscriber_init = tracing_subscriber::registry()
.with(filter)
.with(otel_layer)
.try_init();
if let Err(e) = subscriber_init {
return Err(e.into());
}
Ok(())
}
fn is_insecure_langfuse_url(host: &str) -> bool {
if std::env::var("ALLOW_INSECURE_LANGFUSE_HTTP").is_ok() {
return false;
}
let Ok(url) = reqwest::Url::parse(host) else {
return true;
};
if url.scheme() != "http" {
return false;
}
let host_str = url.host_str().unwrap_or("");
!matches!(host_str, "localhost" | "127.0.0.1" | "::1" | "[::1]")
}
pub fn init_langfuse_telemetry(
public_key: &str,
secret_key: &str,
langfuse_host: Option<&str>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let host = langfuse_host.unwrap_or("https://cloud.langfuse.com");
if is_insecure_langfuse_url(host) {
return Err(
"Langfuse OTLP endpoint uses Basic Authentication, which requires HTTPS. \
Use an https:// URL, or set ALLOW_INSECURE_LANGFUSE_HTTP=1 to override."
.into(),
);
}
let endpoint = format!("{}/api/public/otel/v1/traces", host);
use base64::Engine as _;
let credentials =
base64::engine::general_purpose::STANDARD.encode(format!("{}:{}", public_key, secret_key));
let auth_header = format!("Basic {}", credentials);
init_telemetry_with_headers(&endpoint, vec![("Authorization".to_string(), auth_header)])
}
pub fn shutdown_telemetry() {
if let Some(provider) = TRACER_PROVIDER.get() {
let _ = provider.shutdown();
}
}
pub fn set_gen_ai_attributes(span: &Span, system: &str, model: &str, operation: &str) {
span.set_attribute("gen_ai.provider.name", system.to_owned());
span.set_attribute("gen_ai.request.model", model.to_owned());
span.set_attribute("gen_ai.operation.name", operation.to_owned());
}
pub fn set_gen_ai_usage(span: &Span, input_tokens: u32, output_tokens: u32) {
span.set_attribute("gen_ai.usage.input_tokens", input_tokens as i64);
span.set_attribute("gen_ai.usage.output_tokens", output_tokens as i64);
}
pub fn set_gen_ai_response(span: &Span, response_id: &str, finish_reason: &str) {
span.set_attribute("gen_ai.response.id", response_id.to_owned());
span.set_attribute("gen_ai.response.finish_reasons", finish_reason.to_owned());
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_set_gen_ai_attributes_no_panic() {
let span = tracing::info_span!("test_span");
let _guard = span.enter();
set_gen_ai_attributes(&tracing::Span::current(), "openai", "gpt-4", "chat");
}
#[test]
fn test_set_gen_ai_usage_no_panic() {
let span = tracing::info_span!("test_span");
let _guard = span.enter();
set_gen_ai_usage(&tracing::Span::current(), 100, 50);
}
#[test]
fn test_set_gen_ai_response_no_panic() {
let span = tracing::info_span!("test_span");
let _guard = span.enter();
set_gen_ai_response(&tracing::Span::current(), "resp-123", "stop");
}
#[test]
fn test_langfuse_basic_auth_encoding() {
use base64::Engine as _;
let public_key = "pk-lf-test";
let secret_key = "sk-lf-test";
let expected = format!(
"Basic {}",
base64::engine::general_purpose::STANDARD
.encode(format!("{}:{}", public_key, secret_key))
);
let stripped = expected.strip_prefix("Basic ").unwrap();
let decoded = String::from_utf8(
base64::engine::general_purpose::STANDARD
.decode(stripped)
.unwrap(),
)
.unwrap();
assert_eq!(decoded, "pk-lf-test:sk-lf-test");
}
#[test]
fn test_tracer_provider_get_or_init_is_idempotent() {
use std::sync::OnceLock;
static TEST_PROVIDER: OnceLock<u32> = OnceLock::new();
let v1 = TEST_PROVIDER.get_or_init(|| 42);
assert_eq!(*v1, 42);
let v2 = TEST_PROVIDER.get_or_init(|| panic!("should not be called"));
assert!(
std::ptr::eq(v1, v2),
"get_or_init should return same instance"
);
}
#[test]
fn test_init_telemetry_with_headers_build_error() {
let endpoint = "http://\0invalid";
let res = init_telemetry_with_headers(endpoint, vec![]);
assert!(res.is_err());
}
#[test]
fn test_is_insecure_langfuse_url() {
std::env::remove_var("ALLOW_INSECURE_LANGFUSE_HTTP");
assert!(!super::is_insecure_langfuse_url("https://example.com"));
assert!(!super::is_insecure_langfuse_url(
"https://cloud.langfuse.com"
));
assert!(super::is_insecure_langfuse_url("http://example.com"));
assert!(super::is_insecure_langfuse_url("http://evil-localhost.com"));
assert!(super::is_insecure_langfuse_url("http://127.0.0.1.evil.com"));
assert!(!super::is_insecure_langfuse_url("http://localhost:8080"));
assert!(!super::is_insecure_langfuse_url("http://127.0.0.1:8080"));
assert!(!super::is_insecure_langfuse_url("http://[::1]:8080"));
assert!(super::is_insecure_langfuse_url("not-a-url"));
std::env::set_var("ALLOW_INSECURE_LANGFUSE_HTTP", "1");
assert!(!super::is_insecure_langfuse_url("http://example.com"));
std::env::remove_var("ALLOW_INSECURE_LANGFUSE_HTTP");
}
}