Skip to main content

cognee_observability/
init.rs

1//! Public entry point for OTEL bring-up.
2//!
3//! [`init_telemetry`] returns `(BoxedTelemetryLayer, TelemetryGuard)`.
//! When tracing is disabled, or the build does not include the
//! `telemetry` feature, it returns a noop layer plus a noop guard, so
//! call sites never need cfg-gating. It returns an error only when
//! tracing is enabled and the OTLP configuration is rejected or the
//! exporter fails to build.
7
8use crate::TelemetryInitError;
9use crate::guard::TelemetryGuard;
10use crate::settings::SettingsView;
11use std::sync::OnceLock;
12use tracing::Subscriber;
13use tracing_subscriber::{layer::Layer, registry::LookupSpan};
14
/// One-shot, process-wide flag: set after `init_telemetry` has handed a
/// provider built by *this crate* to
/// `opentelemetry::global::set_tracer_provider`.
///
/// Why an explicit flag instead of inspecting the global provider: the
/// earlier approach searched the Debug output of
/// `opentelemetry::global::tracer_provider()` for the substring `"Noop"`.
/// At OTEL 0.31 the `GlobalTracerProvider` wrapper prints the opaque
/// string `"GlobalTracerProvider"` whatever provider it wraps, so the
/// substring never matched, the check reported "already installed" on a
/// fresh process, and telemetry was never brought up.
static OUR_PROVIDER_INSTALLED: OnceLock<()> = OnceLock::new();
25
26/// Type-erased layer compatible with any `tracing` registry that supports
27/// `LookupSpan`. Boxing is what lets the disabled and enabled paths return
28/// the same shape — the caller composes it onto a subscriber via
29/// `.with(layer)` without seeing the underlying generic parameters.
30pub type BoxedTelemetryLayer<S> = Box<dyn Layer<S> + Send + Sync + 'static>;
31
32/// Python-parity check: should we initialize and emit OTEL spans?
33///
34/// Returns `true` when the operator has explicitly opted in via
35/// `COGNEE_TRACING_ENABLED`, *or* implicitly opted in by setting
36/// `OTEL_EXPORTER_OTLP_ENDPOINT` (Decision 2 in
37/// `01-otel-otlp-export.md` — implicit activation).
38pub fn is_tracing_enabled(settings: &dyn SettingsView) -> bool {
39    settings.tracing_enabled() || !settings.otlp_endpoint().is_empty()
40}
41
42/// Has *this crate* already installed an OTEL tracer provider in the
43/// current process?
44///
45/// Returns `true` once `init_telemetry` has successfully installed an
46/// SDK provider via `opentelemetry::global::set_tracer_provider`, and
47/// `false` otherwise. The semantic shifted at OTEL 0.31 from the older
48/// "has SOMETHING installed OTEL?" host-detection heuristic to "have we
49/// installed it?". The OTEL 0.31 API does not expose a stable way to
50/// distinguish the default noop provider from an SDK provider installed
51/// by an external auto-instrumentation tool (Datadog, Dash0, etc.), so
52/// we instead track our own installation site explicitly.
53///
54/// The legitimate use of this predicate is **idempotency**: a second
55/// `init_telemetry` call within the same process must not overwrite the
56/// provider we already installed; it returns a bridged tracing layer
57/// plus a noop guard so the original guard's flush-on-drop behaviour is
58/// preserved.
59///
60/// Without `telemetry` this can never be true (no OTEL deps to look at).
61pub fn already_instrumented() -> bool {
62    OUR_PROVIDER_INSTALLED.get().is_some()
63}
64
65fn noop_layer<S>() -> BoxedTelemetryLayer<S>
66where
67    S: Subscriber + for<'span> LookupSpan<'span>,
68{
69    Box::new(tracing_subscriber::layer::Identity::new())
70}
71
72/// Build the OTEL `tracing` layer and an RAII guard.
73///
74/// On success returns `(layer, guard)`. The layer must be added to the
75/// subscriber via `.with(layer)`. The guard must be held until the
76/// process is ready to exit; dropping it flushes pending spans.
77///
78/// # Sampler precedence
79///
80/// When `Settings.otel_traces_sampler` is set, it overrides the
81/// `OTEL_TRACES_SAMPLER` env var. When it is empty, the OpenTelemetry
82/// SDK's internal env-var reader picks up `OTEL_TRACES_SAMPLER` /
83/// `OTEL_TRACES_SAMPLER_ARG` directly.
84///
85/// # Errors
86///
87/// Returns [`TelemetryInitError`] when sampler/protocol/processor names
88/// are unrecognised or when the OTLP exporter fails to build. Callers
89/// are free to log-and-continue: this function never panics.
90pub fn init_telemetry<S>(
91    settings: &dyn SettingsView,
92) -> Result<(BoxedTelemetryLayer<S>, TelemetryGuard), TelemetryInitError>
93where
94    S: Subscriber + for<'span> LookupSpan<'span> + Send + Sync + 'static,
95{
96    if !is_tracing_enabled(settings) {
97        return Ok((noop_layer::<S>(), TelemetryGuard::noop()));
98    }
99
100    #[cfg(not(feature = "telemetry"))]
101    {
102        // Reference settings to silence unused-warning on the noop path.
103        let _ = settings.service_name();
104        tracing::warn!(
105            target: "cognee.observability",
106            "tracing requested but cognee-observability was built without `telemetry` feature; spans stay local"
107        );
108        Ok((noop_layer::<S>(), TelemetryGuard::noop()))
109    }
110
111    #[cfg(feature = "telemetry")]
112    {
113        if already_instrumented() {
114            // External tool installed a provider already — bridge to the
115            // global tracer instead of installing our own (Decision 9
116            // safety: never overwrite an externally-set provider).
117            let tracer = opentelemetry::global::tracer("cognee");
118            let layer = tracing_opentelemetry::layer().with_tracer(tracer);
119            return Ok((Box::new(layer), TelemetryGuard::noop()));
120        }
121
122        let provider = telemetry_real::build_provider(settings)?;
123
124        opentelemetry::global::set_tracer_provider(provider.clone());
125        // Mark our installation so that subsequent `init_telemetry`
126        // calls in the same process bridge to the existing provider
127        // rather than overwriting it.
128        let _ = OUR_PROVIDER_INSTALLED.set(());
129
130        // 0.31 removed `tracer_builder("cognee")` in favour of building
131        // an `InstrumentationScope` and passing it to `tracer_with_scope`.
132        use opentelemetry::InstrumentationScope;
133        use opentelemetry::trace::TracerProvider as _;
134        let scope = InstrumentationScope::builder("cognee")
135            .with_version(env!("CARGO_PKG_VERSION"))
136            .build();
137        let tracer = provider.tracer_with_scope(scope);
138        let layer = tracing_opentelemetry::layer().with_tracer(tracer);
139
140        Ok((Box::new(layer), TelemetryGuard::from_provider(provider)))
141    }
142}
143
/// Real OTEL bring-up: resource, OTLP exporter, span processor and
/// sampler, assembled into an `SdkTracerProvider`. Only compiled with the
/// `telemetry` feature.
///
/// Enum-like settings (protocol, span processor, sampler name) are
/// matched after trimming surrounding whitespace and ASCII-lowercasing,
/// so values such as `GRPC` or `AlwaysOn ` are accepted. Error values
/// always carry the setting exactly as it was supplied.
#[cfg(feature = "telemetry")]
mod telemetry_real {
    use super::SettingsView;
    use crate::TelemetryInitError;

    /// Canonical form used to match enum-like settings: surrounding
    /// whitespace removed and ASCII letters lowercased.
    fn normalized(raw: &str) -> String {
        raw.trim().to_ascii_lowercase()
    }

    /// Assemble the tracer provider from `settings`.
    ///
    /// Steps, in order: build the resource from the service name, build
    /// the OTLP span exporter, attach the exporter through the configured
    /// span processor, then apply the configured sampler.
    ///
    /// # Errors
    ///
    /// Propagates any [`TelemetryInitError`] raised by the exporter,
    /// span-processor or sampler step.
    pub(super) fn build_provider(
        settings: &dyn SettingsView,
    ) -> Result<opentelemetry_sdk::trace::SdkTracerProvider, TelemetryInitError> {
        use opentelemetry_sdk::trace::SdkTracerProvider;

        let resource = build_resource(settings.service_name());
        let exporter = build_exporter(settings)?;

        let builder = SdkTracerProvider::builder().with_resource(resource);
        let builder = install_exporter_on_builder(builder, exporter, settings.span_processor())?;
        let builder = apply_sampler(builder, settings)?;

        Ok(builder.build())
    }

    /// Build the OTEL resource: service name, service version (this
    /// crate's `CARGO_PKG_VERSION`) and deployment environment (the `ENV`
    /// environment variable, `"development"` when it cannot be read).
    fn build_resource(service_name: &str) -> opentelemetry_sdk::Resource {
        use opentelemetry::KeyValue;
        use opentelemetry_sdk::Resource;
        use opentelemetry_semantic_conventions::resource as semres;

        let env = std::env::var("ENV").unwrap_or_else(|_| "development".to_string());

        // `deployment.environment.name` is gated behind the
        // `semconv_experimental` feature in 0.31; spell it out as a
        // literal so we don't have to enable that feature workspace-wide.
        Resource::builder()
            .with_attributes([
                KeyValue::new(semres::SERVICE_NAME, service_name.to_string()),
                KeyValue::new(semres::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
                KeyValue::new("deployment.environment.name", env),
            ])
            .build()
    }

    /// Build the OTLP span exporter for the configured protocol.
    ///
    /// Accepted protocols (case-insensitive, surrounding whitespace
    /// ignored): `grpc` or empty for gRPC, `http/protobuf` or `http` for
    /// HTTP with binary protobuf. The configured endpoint and the parsed
    /// OTLP headers are applied in both cases.
    ///
    /// # Errors
    ///
    /// * [`TelemetryInitError::UnknownProtocol`] (carrying the value as
    ///   supplied) for any other protocol.
    /// * [`TelemetryInitError::ExporterBuild`] when the exporter builder
    ///   fails.
    fn build_exporter(
        settings: &dyn SettingsView,
    ) -> Result<opentelemetry_otlp::SpanExporter, TelemetryInitError> {
        use opentelemetry_otlp::{
            Protocol, SpanExporter, WithExportConfig, WithHttpConfig, WithTonicConfig,
        };

        let endpoint = settings.otlp_endpoint();
        let headers = crate::headers::parse_otlp_headers(settings.otlp_headers());
        let protocol = settings.otlp_protocol();

        match normalized(protocol).as_str() {
            "grpc" | "" => {
                // gRPC takes headers as a tonic MetadataMap, which is
                // built from an `http::HeaderMap` to avoid pulling in
                // tonic's `MetadataKey`/`MetadataValue` types directly.
                let mut http_headers = http::HeaderMap::new();
                for (k, v) in &headers {
                    match (
                        http::header::HeaderName::try_from(k.as_str()),
                        http::header::HeaderValue::try_from(v.as_str()),
                    ) {
                        (Ok(name), Ok(value)) => {
                            http_headers.insert(name, value);
                        }
                        _ => {
                            // Best effort: an unusable header is logged
                            // and skipped rather than failing the init.
                            tracing::warn!(
                                target: "cognee.observability",
                                header = %k,
                                "OTLP gRPC metadata header rejected (invalid name or value)"
                            );
                        }
                    }
                }
                let metadata = tonic::metadata::MetadataMap::from_headers(http_headers);
                SpanExporter::builder()
                    .with_tonic()
                    .with_endpoint(endpoint)
                    .with_metadata(metadata)
                    .build()
                    .map_err(TelemetryInitError::ExporterBuild)
            }
            "http/protobuf" | "http" => SpanExporter::builder()
                .with_http()
                .with_endpoint(endpoint)
                .with_protocol(Protocol::HttpBinary)
                .with_headers(headers.into_iter().collect())
                .build()
                .map_err(TelemetryInitError::ExporterBuild),
            // Report the value exactly as configured, not the normalised form.
            _ => Err(TelemetryInitError::UnknownProtocol(protocol.to_string())),
        }
    }

    /// Attach `exporter` to `builder` through the span processor named by
    /// `mode`: `batch` or empty selects the batch exporter, `simple` the
    /// simple exporter (case-insensitive, surrounding whitespace ignored).
    ///
    /// # Errors
    ///
    /// [`TelemetryInitError::UnknownSpanProcessor`] (carrying `mode` as
    /// supplied) for any other value.
    fn install_exporter_on_builder(
        builder: opentelemetry_sdk::trace::TracerProviderBuilder,
        exporter: opentelemetry_otlp::SpanExporter,
        mode: &str,
    ) -> Result<opentelemetry_sdk::trace::TracerProviderBuilder, TelemetryInitError> {
        match normalized(mode).as_str() {
            "batch" | "" => Ok(builder.with_batch_exporter(exporter)),
            "simple" => Ok(builder.with_simple_exporter(exporter)),
            _ => Err(TelemetryInitError::UnknownSpanProcessor(mode.to_string())),
        }
    }

    /// Apply the sampler selected in `settings` to `builder`.
    ///
    /// An empty (or blank) sampler name leaves the builder untouched.
    /// Otherwise the name is matched case-insensitively against
    /// `always_on`, `always_off`, `traceidratio` and their `parentbased_`
    /// variants; the ratio-based samplers read their ratio from the
    /// sampler argument.
    ///
    /// # Errors
    ///
    /// * [`TelemetryInitError::UnknownSampler`] (carrying the name as
    ///   supplied) for an unrecognised name.
    /// * Any error from [`parse_ratio`] for the ratio-based samplers.
    fn apply_sampler(
        builder: opentelemetry_sdk::trace::TracerProviderBuilder,
        settings: &dyn SettingsView,
    ) -> Result<opentelemetry_sdk::trace::TracerProviderBuilder, TelemetryInitError> {
        use opentelemetry_sdk::trace::Sampler;

        let raw_name = settings.traces_sampler();
        let name = normalized(raw_name);
        if name.is_empty() {
            // Empty means: defer to the SDK's internal OTEL_TRACES_SAMPLER reader.
            return Ok(builder);
        }

        let arg = settings.traces_sampler_arg();
        let sampler = match name.as_str() {
            "always_on" => Sampler::AlwaysOn,
            "always_off" => Sampler::AlwaysOff,
            "traceidratio" => Sampler::TraceIdRatioBased(parse_ratio(arg)?),
            "parentbased_always_on" => Sampler::ParentBased(Box::new(Sampler::AlwaysOn)),
            "parentbased_always_off" => Sampler::ParentBased(Box::new(Sampler::AlwaysOff)),
            "parentbased_traceidratio" => {
                Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(parse_ratio(arg)?)))
            }
            _ => return Err(TelemetryInitError::UnknownSampler(raw_name.to_string())),
        };
        Ok(builder.with_sampler(sampler))
    }

    /// Parse a sampling ratio in the closed range `0.0..=1.0`.
    ///
    /// Surrounding whitespace is ignored when parsing, so ` 0.25` is
    /// accepted. `NaN` and infinities fall outside the range and are
    /// rejected.
    ///
    /// # Errors
    ///
    /// * [`TelemetryInitError::SamplerArgRequired`] when `arg` is empty.
    /// * [`TelemetryInitError::InvalidSamplerArg`] (carrying `arg` as
    ///   supplied) when it is not a number or lies outside the range.
    fn parse_ratio(arg: &str) -> Result<f64, TelemetryInitError> {
        if arg.is_empty() {
            return Err(TelemetryInitError::SamplerArgRequired);
        }
        match arg.trim().parse::<f64>() {
            Ok(ratio) if (0.0..=1.0).contains(&ratio) => Ok(ratio),
            Ok(_) | Err(_) => Err(TelemetryInitError::InvalidSamplerArg(arg.to_string())),
        }
    }
}
289
#[cfg(test)]
#[allow(
    clippy::expect_used,
    clippy::unwrap_used,
    reason = "test code — panics are acceptable failures"
)]
mod tests {
    use super::*;
    use crate::settings::{EnvSettingsView, SettingsView};
    use tracing_subscriber::Registry;
    use tracing_subscriber::layer::SubscriberExt;

    /// In-memory `SettingsView`: the two inputs of `is_tracing_enabled`
    /// are configurable, everything else is a fixed value.
    struct StaticSettings {
        tracing_enabled: bool,
        otlp_endpoint: String,
    }

    impl SettingsView for StaticSettings {
        fn tracing_enabled(&self) -> bool {
            self.tracing_enabled
        }

        fn service_name(&self) -> &str {
            "cognee-test"
        }

        fn otlp_endpoint(&self) -> &str {
            self.otlp_endpoint.as_str()
        }

        fn otlp_headers(&self) -> &str {
            ""
        }

        fn otlp_protocol(&self) -> &str {
            "grpc"
        }

        fn span_processor(&self) -> &str {
            "batch"
        }

        fn traces_sampler(&self) -> &str {
            ""
        }

        fn traces_sampler_arg(&self) -> &str {
            ""
        }
    }

    /// With default settings the call succeeds, the guard holds no
    /// provider, and the returned layer composes onto a `Registry`.
    #[test]
    fn init_telemetry_noop_when_tracing_disabled() {
        let defaults = EnvSettingsView::default();

        let outcome = init_telemetry::<Registry>(&defaults);
        assert!(outcome.is_ok());

        let (layer, guard) = outcome.expect("init_telemetry returned Ok above");
        assert!(!guard.has_provider());

        // Type-level check: the boxed layer must attach to a plain registry.
        let _subscriber = Registry::default().with(layer);
    }

    /// Full truth table of (explicit flag, endpoint) -> enabled.
    #[test]
    fn is_tracing_enabled_python_parity() {
        const TRUTH_TABLE: [(bool, &str, bool); 4] = [
            (false, "", false),
            (false, "http://example:4317", true),
            (true, "", true),
            (true, "http://example:4317", true),
        ];

        for &(flag, endpoint, expected) in &TRUTH_TABLE {
            let view = StaticSettings {
                tracing_enabled: flag,
                otlp_endpoint: endpoint.to_owned(),
            };
            let actual = is_tracing_enabled(&view);
            assert_eq!(
                actual, expected,
                "is_tracing_enabled(flag={flag}, endpoint={endpoint:?}) should be {expected}"
            );
        }
    }
}
362                "is_tracing_enabled(flag={flag}, endpoint={endpoint:?}) should be {expected}"
363            );
364        }
365    }
366}