cognee_observability/
init.rs1use crate::TelemetryInitError;
9use crate::guard::TelemetryGuard;
10use crate::settings::SettingsView;
11use std::sync::OnceLock;
12use tracing::Subscriber;
13use tracing_subscriber::{layer::Layer, registry::LookupSpan};
14
/// Process-wide marker that is set by `init_telemetry` right after it registers
/// its tracer provider as the global one; `already_instrumented` reads it.
static OUR_PROVIDER_INSTALLED: OnceLock<()> = OnceLock::new();
25
/// Type-erased `tracing` layer for a subscriber `S`.
///
/// Boxing lets `init_telemetry` hand back a single return type whether it
/// built a no-op layer or an OpenTelemetry-backed one.
pub type BoxedTelemetryLayer<S> = Box<dyn Layer<S> + Send + Sync + 'static>;
31
32pub fn is_tracing_enabled(settings: &dyn SettingsView) -> bool {
39 settings.tracing_enabled() || !settings.otlp_endpoint().is_empty()
40}
41
42pub fn already_instrumented() -> bool {
62 OUR_PROVIDER_INSTALLED.get().is_some()
63}
64
65fn noop_layer<S>() -> BoxedTelemetryLayer<S>
66where
67 S: Subscriber + for<'span> LookupSpan<'span>,
68{
69 Box::new(tracing_subscriber::layer::Identity::new())
70}
71
72pub fn init_telemetry<S>(
91 settings: &dyn SettingsView,
92) -> Result<(BoxedTelemetryLayer<S>, TelemetryGuard), TelemetryInitError>
93where
94 S: Subscriber + for<'span> LookupSpan<'span> + Send + Sync + 'static,
95{
96 if !is_tracing_enabled(settings) {
97 return Ok((noop_layer::<S>(), TelemetryGuard::noop()));
98 }
99
100 #[cfg(not(feature = "telemetry"))]
101 {
102 let _ = settings.service_name();
104 tracing::warn!(
105 target: "cognee.observability",
106 "tracing requested but cognee-observability was built without `telemetry` feature; spans stay local"
107 );
108 Ok((noop_layer::<S>(), TelemetryGuard::noop()))
109 }
110
111 #[cfg(feature = "telemetry")]
112 {
113 if already_instrumented() {
114 let tracer = opentelemetry::global::tracer("cognee");
118 let layer = tracing_opentelemetry::layer().with_tracer(tracer);
119 return Ok((Box::new(layer), TelemetryGuard::noop()));
120 }
121
122 let provider = telemetry_real::build_provider(settings)?;
123
124 opentelemetry::global::set_tracer_provider(provider.clone());
125 let _ = OUR_PROVIDER_INSTALLED.set(());
129
130 use opentelemetry::InstrumentationScope;
133 use opentelemetry::trace::TracerProvider as _;
134 let scope = InstrumentationScope::builder("cognee")
135 .with_version(env!("CARGO_PKG_VERSION"))
136 .build();
137 let tracer = provider.tracer_with_scope(scope);
138 let layer = tracing_opentelemetry::layer().with_tracer(tracer);
139
140 Ok((Box::new(layer), TelemetryGuard::from_provider(provider)))
141 }
142}
143
/// Real OpenTelemetry wiring; only compiled when the `telemetry` feature is on.
#[cfg(feature = "telemetry")]
mod telemetry_real {
    use super::SettingsView;
    use crate::TelemetryInitError;

    /// Assembles a tracer provider from `settings`: resource, OTLP exporter,
    /// span processor mode and (optionally) a sampler.
    ///
    /// # Errors
    ///
    /// Propagates errors from building the exporter, from an unknown span
    /// processor mode, and from an unknown or badly parameterised sampler.
    pub(super) fn build_provider(
        settings: &dyn SettingsView,
    ) -> Result<opentelemetry_sdk::trace::SdkTracerProvider, TelemetryInitError> {
        let resource = build_resource(settings.service_name());
        let exporter = build_exporter(settings)?;

        let builder =
            opentelemetry_sdk::trace::SdkTracerProvider::builder().with_resource(resource);
        let builder = install_exporter_on_builder(builder, exporter, settings.span_processor())?;
        let builder = apply_sampler(builder, settings)?;

        Ok(builder.build())
    }

    /// Builds the resource attached to every span: service name, this crate's
    /// version, and the deployment environment taken from the `ENV` variable
    /// (falling back to `"development"` when it cannot be read).
    fn build_resource(service_name: &str) -> opentelemetry_sdk::Resource {
        use opentelemetry::KeyValue;
        use opentelemetry_semantic_conventions::resource as semconv;

        let deployment_env = match std::env::var("ENV") {
            Ok(value) => value,
            Err(_) => String::from("development"),
        };

        let attributes = [
            KeyValue::new(semconv::SERVICE_NAME, service_name.to_string()),
            KeyValue::new(semconv::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
            KeyValue::new("deployment.environment.name", deployment_env),
        ];

        opentelemetry_sdk::Resource::builder()
            .with_attributes(attributes)
            .build()
    }

    /// Builds the OTLP span exporter for the configured protocol.
    ///
    /// * `"grpc"` or an empty protocol: tonic exporter; configured headers are
    ///   converted to gRPC metadata, and any header whose name or value is not
    ///   valid is skipped with a warning (only the header name is logged).
    /// * `"http/protobuf"` or `"http"`: HTTP exporter using binary protobuf,
    ///   with the configured headers passed through as-is.
    ///
    /// # Errors
    ///
    /// `UnknownProtocol` for any other protocol string, `ExporterBuild` when
    /// the underlying exporter builder fails.
    fn build_exporter(
        settings: &dyn SettingsView,
    ) -> Result<opentelemetry_otlp::SpanExporter, TelemetryInitError> {
        use opentelemetry_otlp::{
            Protocol, SpanExporter, WithExportConfig, WithHttpConfig, WithTonicConfig,
        };

        let endpoint = settings.otlp_endpoint();
        let headers = crate::headers::parse_otlp_headers(settings.otlp_headers());
        let protocol = settings.otlp_protocol();

        let built = match protocol {
            "grpc" | "" => {
                let mut accepted = http::HeaderMap::new();
                for (key, raw_value) in &headers {
                    let parsed = (
                        http::header::HeaderName::try_from(key.as_str()),
                        http::header::HeaderValue::try_from(raw_value.as_str()),
                    );
                    let (Ok(name), Ok(value)) = parsed else {
                        tracing::warn!(
                            target: "cognee.observability",
                            header = %key,
                            "OTLP gRPC metadata header rejected (invalid name or value)"
                        );
                        continue;
                    };
                    accepted.insert(name, value);
                }

                SpanExporter::builder()
                    .with_tonic()
                    .with_endpoint(endpoint)
                    .with_metadata(tonic::metadata::MetadataMap::from_headers(accepted))
                    .build()
            }
            "http/protobuf" | "http" => SpanExporter::builder()
                .with_http()
                .with_endpoint(endpoint)
                .with_protocol(Protocol::HttpBinary)
                .with_headers(headers.into_iter().collect())
                .build(),
            other => return Err(TelemetryInitError::UnknownProtocol(other.to_string())),
        };

        built.map_err(TelemetryInitError::ExporterBuild)
    }

    /// Attaches `exporter` to `builder` using the requested span processor:
    /// `"batch"` (also the choice for an empty string) or `"simple"`.
    ///
    /// # Errors
    ///
    /// `UnknownSpanProcessor` for any other mode string.
    fn install_exporter_on_builder(
        builder: opentelemetry_sdk::trace::TracerProviderBuilder,
        exporter: opentelemetry_otlp::SpanExporter,
        mode: &str,
    ) -> Result<opentelemetry_sdk::trace::TracerProviderBuilder, TelemetryInitError> {
        let configured = match mode {
            "batch" | "" => builder.with_batch_exporter(exporter),
            "simple" => builder.with_simple_exporter(exporter),
            unknown => {
                return Err(TelemetryInitError::UnknownSpanProcessor(
                    unknown.to_string(),
                ));
            }
        };
        Ok(configured)
    }

    /// Applies the sampler named in `settings` to `builder`.
    ///
    /// An empty sampler name leaves the builder untouched. The ratio-based
    /// samplers read their ratio from the sampler argument setting.
    ///
    /// # Errors
    ///
    /// `UnknownSampler` for an unrecognised name, plus any error from
    /// `parse_ratio` for the ratio-based samplers.
    fn apply_sampler(
        builder: opentelemetry_sdk::trace::TracerProviderBuilder,
        settings: &dyn SettingsView,
    ) -> Result<opentelemetry_sdk::trace::TracerProviderBuilder, TelemetryInitError> {
        use opentelemetry_sdk::trace::Sampler;

        let sampler_name = settings.traces_sampler();
        if sampler_name.is_empty() {
            return Ok(builder);
        }

        let ratio_arg = settings.traces_sampler_arg();
        // Wraps a root sampler so that the parent span's decision is honoured.
        let parent_based = |root: Sampler| Sampler::ParentBased(Box::new(root));

        let chosen = match sampler_name {
            "always_on" => Sampler::AlwaysOn,
            "always_off" => Sampler::AlwaysOff,
            "traceidratio" => Sampler::TraceIdRatioBased(parse_ratio(ratio_arg)?),
            "parentbased_always_on" => parent_based(Sampler::AlwaysOn),
            "parentbased_always_off" => parent_based(Sampler::AlwaysOff),
            "parentbased_traceidratio" => {
                parent_based(Sampler::TraceIdRatioBased(parse_ratio(ratio_arg)?))
            }
            other => return Err(TelemetryInitError::UnknownSampler(other.to_string())),
        };

        Ok(builder.with_sampler(chosen))
    }

    /// Parses a sampling ratio and checks that it lies in `0.0..=1.0`.
    ///
    /// # Errors
    ///
    /// `SamplerArgRequired` when `arg` is empty; `InvalidSamplerArg` when it
    /// is not a float or falls outside the accepted range.
    fn parse_ratio(arg: &str) -> Result<f64, TelemetryInitError> {
        if arg.is_empty() {
            return Err(TelemetryInitError::SamplerArgRequired);
        }
        match arg.parse::<f64>() {
            Ok(ratio) if (0.0..=1.0).contains(&ratio) => Ok(ratio),
            _ => Err(TelemetryInitError::InvalidSamplerArg(arg.to_string())),
        }
    }
}
289
#[cfg(test)]
#[allow(
    clippy::expect_used,
    clippy::unwrap_used,
    reason = "test code — panics are acceptable failures"
)]
mod tests {
    use super::*;
    use crate::settings::{EnvSettingsView, SettingsView};
    use tracing_subscriber::Registry;
    use tracing_subscriber::layer::SubscriberExt;

    /// In-memory settings where only the tracing flag and the OTLP endpoint
    /// vary; every other accessor returns a fixed value.
    struct StaticSettings {
        tracing_enabled: bool,
        otlp_endpoint: String,
    }

    impl SettingsView for StaticSettings {
        fn tracing_enabled(&self) -> bool {
            self.tracing_enabled
        }

        fn service_name(&self) -> &str {
            "cognee-test"
        }

        fn otlp_endpoint(&self) -> &str {
            self.otlp_endpoint.as_str()
        }

        fn otlp_headers(&self) -> &str {
            ""
        }

        fn otlp_protocol(&self) -> &str {
            "grpc"
        }

        fn span_processor(&self) -> &str {
            "batch"
        }

        fn traces_sampler(&self) -> &str {
            ""
        }

        fn traces_sampler_arg(&self) -> &str {
            ""
        }
    }

    /// With default settings, initialisation must succeed, yield a guard
    /// without a provider, and produce a layer that composes onto a registry.
    #[test]
    fn init_telemetry_noop_when_tracing_disabled() {
        let settings = EnvSettingsView::default();
        let outcome = init_telemetry::<Registry>(&settings);
        assert!(outcome.is_ok());

        let (layer, guard) = outcome.expect("init_telemetry returned Ok above");
        assert!(!guard.has_provider());

        let _subscriber = Registry::default().with(layer);
    }

    /// Truth table for `is_tracing_enabled`: either the flag or a non-empty
    /// endpoint is sufficient.
    #[test]
    fn is_tracing_enabled_python_parity() {
        // (tracing flag, OTLP endpoint, expected result)
        const CASES: [(bool, &str, bool); 4] = [
            (false, "", false),
            (false, "http://example:4317", true),
            (true, "", true),
            (true, "http://example:4317", true),
        ];

        for &(flag, endpoint, expected) in &CASES {
            let s = StaticSettings {
                tracing_enabled: flag,
                otlp_endpoint: endpoint.to_string(),
            };
            assert_eq!(
                is_tracing_enabled(&s),
                expected,
                "is_tracing_enabled(flag={flag}, endpoint={endpoint:?}) should be {expected}"
            );
        }
    }
}