use crate::TelemetryInitError;
use crate::guard::TelemetryGuard;
use crate::settings::SettingsView;
use std::sync::OnceLock;
use tracing::Subscriber;
use tracing_subscriber::{layer::Layer, registry::LookupSpan};
/// Process-wide marker recording that `init_telemetry` has registered its own tracer
/// provider as the global OpenTelemetry provider.
///
/// It is set right after `opentelemetry::global::set_tracer_provider` is called and is
/// read through `already_instrumented`.
static OUR_PROVIDER_INSTALLED: OnceLock<()> = OnceLock::new();
/// Type-erased, heap-allocated `tracing_subscriber` layer for a subscriber of type `S`.
///
/// This is the layer type returned by `init_telemetry`, whether the layer is the no-op
/// one or an OpenTelemetry-backed one.
pub type BoxedTelemetryLayer<S> = Box<dyn Layer<S> + Send + Sync + 'static>;
/// Reports whether tracing should be set up for the given settings.
///
/// Tracing counts as enabled when the explicit `tracing_enabled` flag is set, or when
/// the configured OTLP endpoint is a non-empty string. The endpoint is only consulted
/// when the flag is off.
pub fn is_tracing_enabled(settings: &dyn SettingsView) -> bool {
    if settings.tracing_enabled() {
        return true;
    }
    let endpoint = settings.otlp_endpoint();
    !endpoint.is_empty()
}
/// Returns `true` once the `OUR_PROVIDER_INSTALLED` marker has been set, i.e. after an
/// earlier `init_telemetry` call registered its tracer provider globally.
pub fn already_instrumented() -> bool {
    matches!(OUR_PROVIDER_INSTALLED.get(), Some(_))
}
/// Produces the layer used when telemetry export is not active: a boxed
/// `tracing_subscriber::layer::Identity`.
fn noop_layer<S>() -> BoxedTelemetryLayer<S>
where
    S: Subscriber + for<'span> LookupSpan<'span>,
{
    let identity = tracing_subscriber::layer::Identity::new();
    Box::new(identity)
}
/// Builds the telemetry layer for a `tracing` subscriber of type `S`, together with the
/// guard that accompanies it.
///
/// Outcomes, in the order they are checked:
/// - tracing is not enabled according to `is_tracing_enabled`: a no-op layer and a no-op
///   guard are returned;
/// - the crate was compiled without the `telemetry` feature: a warning is logged and the
///   same no-op pair is returned;
/// - a provider was already installed by an earlier call (`already_instrumented`): a new
///   layer is created on the global `"cognee"` tracer and paired with a no-op guard;
/// - otherwise: a provider is built from `settings`, registered as the global tracer
///   provider, and the returned guard is created from that provider.
///
/// # Errors
///
/// Propagates the `TelemetryInitError` produced while building the provider.
pub fn init_telemetry<S>(
    settings: &dyn SettingsView,
) -> Result<(BoxedTelemetryLayer<S>, TelemetryGuard), TelemetryInitError>
where
    S: Subscriber + for<'span> LookupSpan<'span> + Send + Sync + 'static,
{
    let tracing_requested = is_tracing_enabled(settings);
    if !tracing_requested {
        return Ok((noop_layer::<S>(), TelemetryGuard::noop()));
    }

    // Built without exporter support: tell the operator and fall back to the no-op pair.
    #[cfg(not(feature = "telemetry"))]
    {
        let _ = settings.service_name();
        tracing::warn!(
            target: "cognee.observability",
            "tracing requested but cognee-observability was built without `telemetry` feature; spans stay local"
        );
        Ok((noop_layer::<S>(), TelemetryGuard::noop()))
    }

    #[cfg(feature = "telemetry")]
    {
        use opentelemetry::InstrumentationScope;
        use opentelemetry::trace::TracerProvider as _;

        // A provider from an earlier call is already registered globally: reuse it and
        // hand back a guard that owns nothing.
        if already_instrumented() {
            let global_tracer = opentelemetry::global::tracer("cognee");
            let otel_layer = tracing_opentelemetry::layer().with_tracer(global_tracer);
            return Ok((Box::new(otel_layer), TelemetryGuard::noop()));
        }

        // First initialisation: build the provider, register it, then set the marker.
        let provider = telemetry_real::build_provider(settings)?;
        opentelemetry::global::set_tracer_provider(provider.clone());
        let _ = OUR_PROVIDER_INSTALLED.set(());

        let scope = InstrumentationScope::builder("cognee")
            .with_version(env!("CARGO_PKG_VERSION"))
            .build();
        let otel_layer =
            tracing_opentelemetry::layer().with_tracer(provider.tracer_with_scope(scope));
        Ok((
            Box::new(otel_layer),
            TelemetryGuard::from_provider(provider),
        ))
    }
}
/// OpenTelemetry provider construction, compiled only when the `telemetry` feature is on.
#[cfg(feature = "telemetry")]
mod telemetry_real {
    use super::SettingsView;
    use crate::TelemetryInitError;
    use opentelemetry_sdk::trace::{Sampler, SdkTracerProvider, TracerProviderBuilder};

    /// Assembles a tracer provider from `settings`: resource, OTLP exporter, span
    /// processor mode and (optionally) sampler, applied in that order.
    ///
    /// # Errors
    ///
    /// Returns the first `TelemetryInitError` raised by exporter construction, span
    /// processor selection or sampler selection.
    pub(super) fn build_provider(
        settings: &dyn SettingsView,
    ) -> Result<SdkTracerProvider, TelemetryInitError> {
        let resource = build_resource(settings.service_name());
        let exporter = build_exporter(settings)?;

        let with_resource = SdkTracerProvider::builder().with_resource(resource);
        let with_exporter =
            install_exporter_on_builder(with_resource, exporter, settings.span_processor())?;
        let configured = apply_sampler(with_exporter, settings)?;
        Ok(configured.build())
    }

    /// Creates the resource attached to the provider: service name, this crate's
    /// version, and the deployment environment read from the `ENV` environment variable
    /// (falling back to `"development"` when it cannot be read).
    fn build_resource(service_name: &str) -> opentelemetry_sdk::Resource {
        use opentelemetry::KeyValue;
        use opentelemetry_sdk::Resource;
        use opentelemetry_semantic_conventions::resource as semres;

        let environment = match std::env::var("ENV") {
            Ok(value) => value,
            Err(_) => String::from("development"),
        };
        let attributes = [
            KeyValue::new(semres::SERVICE_NAME, service_name.to_string()),
            KeyValue::new(semres::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
            KeyValue::new("deployment.environment.name", environment),
        ];
        Resource::builder().with_attributes(attributes).build()
    }

    /// Builds the OTLP span exporter selected by `settings.otlp_protocol()`.
    ///
    /// - `"grpc"` (or an empty value): tonic exporter; the parsed OTLP headers are sent
    ///   as metadata, and any header whose name or value is not valid is skipped with a
    ///   warning.
    /// - `"http/protobuf"` or `"http"`: HTTP exporter using the binary protocol, with the
    ///   parsed headers passed through.
    ///
    /// # Errors
    ///
    /// `TelemetryInitError::UnknownProtocol` for any other protocol string, and
    /// `TelemetryInitError::ExporterBuild` when the exporter builder fails.
    fn build_exporter(
        settings: &dyn SettingsView,
    ) -> Result<opentelemetry_otlp::SpanExporter, TelemetryInitError> {
        use opentelemetry_otlp::{
            Protocol, SpanExporter, WithExportConfig, WithHttpConfig, WithTonicConfig,
        };

        let endpoint = settings.otlp_endpoint();
        let headers = crate::headers::parse_otlp_headers(settings.otlp_headers());

        match settings.otlp_protocol() {
            "grpc" | "" => {
                // Translate the parsed headers into an HTTP header map first; entries
                // that fail either conversion are reported and left out.
                let mut accepted = http::HeaderMap::new();
                for (key, raw_value) in &headers {
                    let name = http::header::HeaderName::try_from(key.as_str());
                    let value = http::header::HeaderValue::try_from(raw_value.as_str());
                    if let (Ok(name), Ok(value)) = (name, value) {
                        accepted.insert(name, value);
                    } else {
                        tracing::warn!(
                            target: "cognee.observability",
                            header = %key,
                            "OTLP gRPC metadata header rejected (invalid name or value)"
                        );
                    }
                }
                let metadata = tonic::metadata::MetadataMap::from_headers(accepted);

                let built = SpanExporter::builder()
                    .with_tonic()
                    .with_endpoint(endpoint)
                    .with_metadata(metadata)
                    .build();
                built.map_err(TelemetryInitError::ExporterBuild)
            }
            "http/protobuf" | "http" => {
                let built = SpanExporter::builder()
                    .with_http()
                    .with_endpoint(endpoint)
                    .with_protocol(Protocol::HttpBinary)
                    .with_headers(headers.into_iter().collect())
                    .build();
                built.map_err(TelemetryInitError::ExporterBuild)
            }
            other => Err(TelemetryInitError::UnknownProtocol(other.to_string())),
        }
    }

    /// Attaches `exporter` to `builder` using the requested span processor mode:
    /// `"batch"` (or an empty value) or `"simple"`.
    ///
    /// # Errors
    ///
    /// `TelemetryInitError::UnknownSpanProcessor` for any other mode string.
    fn install_exporter_on_builder(
        builder: opentelemetry_sdk::trace::TracerProviderBuilder,
        exporter: opentelemetry_otlp::SpanExporter,
        mode: &str,
    ) -> Result<opentelemetry_sdk::trace::TracerProviderBuilder, TelemetryInitError> {
        let installed = match mode {
            "batch" | "" => builder.with_batch_exporter(exporter),
            "simple" => builder.with_simple_exporter(exporter),
            other => {
                return Err(TelemetryInitError::UnknownSpanProcessor(other.to_string()));
            }
        };
        Ok(installed)
    }

    /// Applies the sampler named by `settings.traces_sampler()` to `builder`.
    ///
    /// An empty name leaves the builder untouched. Recognised names are `always_on`,
    /// `always_off` and `traceidratio`, each optionally prefixed with `parentbased_`.
    /// The ratio variants read their ratio from `settings.traces_sampler_arg()`.
    ///
    /// # Errors
    ///
    /// `TelemetryInitError::UnknownSampler` for an unrecognised name, or the error from
    /// `parse_ratio` when a ratio sampler has a missing or invalid argument.
    fn apply_sampler(
        builder: TracerProviderBuilder,
        settings: &dyn SettingsView,
    ) -> Result<TracerProviderBuilder, TelemetryInitError> {
        let name = settings.traces_sampler();
        if name.is_empty() {
            return Ok(builder);
        }
        let arg = settings.traces_sampler_arg();

        // Split off a single `parentbased_` prefix; what remains must name a root sampler.
        let (parent_based, root_name) = match name.strip_prefix("parentbased_") {
            Some(rest) => (true, rest),
            None => (false, name),
        };
        let root = match root_name {
            "always_on" => Sampler::AlwaysOn,
            "always_off" => Sampler::AlwaysOff,
            "traceidratio" => Sampler::TraceIdRatioBased(parse_ratio(arg)?),
            _ => return Err(TelemetryInitError::UnknownSampler(name.to_string())),
        };
        let sampler = if parent_based {
            Sampler::ParentBased(Box::new(root))
        } else {
            root
        };
        Ok(builder.with_sampler(sampler))
    }

    /// Parses a sampling ratio and checks that it lies within `0.0..=1.0`.
    ///
    /// # Errors
    ///
    /// `TelemetryInitError::SamplerArgRequired` when `arg` is empty, and
    /// `TelemetryInitError::InvalidSamplerArg` when it is not a number or falls outside
    /// the accepted range.
    fn parse_ratio(arg: &str) -> Result<f64, TelemetryInitError> {
        if arg.is_empty() {
            return Err(TelemetryInitError::SamplerArgRequired);
        }
        match arg.parse::<f64>() {
            Ok(ratio) if (0.0..=1.0).contains(&ratio) => Ok(ratio),
            _ => Err(TelemetryInitError::InvalidSamplerArg(arg.to_string())),
        }
    }
}
#[cfg(test)]
#[allow(
    clippy::expect_used,
    clippy::unwrap_used,
    reason = "test code — panics are acceptable failures"
)]
mod tests {
    use super::*;
    use crate::settings::{EnvSettingsView, SettingsView};
    use tracing_subscriber::Registry;
    use tracing_subscriber::layer::SubscriberExt;

    /// `SettingsView` fixture: only the tracing flag and the OTLP endpoint vary; every
    /// other getter returns a fixed value.
    struct StaticSettings {
        tracing_enabled: bool,
        otlp_endpoint: String,
    }

    impl SettingsView for StaticSettings {
        fn tracing_enabled(&self) -> bool {
            self.tracing_enabled
        }

        fn service_name(&self) -> &str {
            "cognee-test"
        }

        fn otlp_endpoint(&self) -> &str {
            self.otlp_endpoint.as_str()
        }

        fn otlp_headers(&self) -> &str {
            ""
        }

        fn otlp_protocol(&self) -> &str {
            "grpc"
        }

        fn span_processor(&self) -> &str {
            "batch"
        }

        fn traces_sampler(&self) -> &str {
            ""
        }

        fn traces_sampler_arg(&self) -> &str {
            ""
        }
    }

    /// With default environment-backed settings, initialisation succeeds, the guard
    /// holds no provider, and the returned layer composes onto a `Registry`.
    #[test]
    fn init_telemetry_noop_when_tracing_disabled() {
        let default_settings = EnvSettingsView::default();
        let outcome = init_telemetry::<Registry>(&default_settings);
        assert!(outcome.is_ok());

        let (layer, guard) = outcome.expect("init_telemetry returned Ok above");
        assert!(!guard.has_provider());
        let _subscriber = Registry::default().with(layer);
    }

    /// Truth table for `is_tracing_enabled`: the flag or a non-empty endpoint is enough.
    #[test]
    fn is_tracing_enabled_python_parity() {
        let table: [(bool, &str, bool); 4] = [
            (false, "", false),
            (false, "http://example:4317", true),
            (true, "", true),
            (true, "http://example:4317", true),
        ];
        for (flag, endpoint, expected) in table {
            let view = StaticSettings {
                tracing_enabled: flag,
                otlp_endpoint: endpoint.to_owned(),
            };
            let actual = is_tracing_enabled(&view);
            assert_eq!(
                actual, expected,
                "is_tracing_enabled(flag={flag}, endpoint={endpoint:?}) should be {expected}"
            );
        }
    }
}