greentic_telemetry/
init.rs1use anyhow::{Result, anyhow};
2use once_cell::sync::OnceCell;
3#[cfg(feature = "otlp")]
4use opentelemetry::global;
5#[cfg(feature = "otlp")]
6use opentelemetry_otlp::{
7 MetricExporter, SpanExporter, WithExportConfig, WithHttpConfig, WithTonicConfig,
8};
9#[cfg(feature = "otlp")]
10use opentelemetry_sdk::{
11 metrics::SdkMeterProvider,
12 propagation::TraceContextPropagator,
13 resource::Resource,
14 trace::{BatchSpanProcessor, Sampler, SdkTracerProvider},
15};
16#[cfg(feature = "otlp")]
17use std::collections::HashMap;
18#[cfg(feature = "dev")]
19use std::io::IsTerminal;
20#[cfg(feature = "dev")]
21use tracing_appender::rolling;
22#[cfg(any(feature = "dev", feature = "prod-json", feature = "otlp"))]
23use tracing_subscriber::EnvFilter;
24#[cfg(feature = "otlp")]
25use tracing_subscriber::Registry;
26#[cfg(any(feature = "dev", feature = "prod-json"))]
27use tracing_subscriber::fmt;
28#[cfg(any(feature = "dev", feature = "prod-json", feature = "otlp"))]
29use tracing_subscriber::prelude::*;
30
31#[cfg(feature = "otlp")]
32use crate::export::{Compression, Sampling};
33use crate::export::{ExportConfig, ExportMode};
34use crate::redaction;
35#[cfg(any(feature = "dev", feature = "prod-json"))]
36use crate::redaction::RedactingFormatFields;
37
/// Set once one of the public `init_telemetry*` entry points in this file has
/// completed successfully; later calls see it and return early.
38static INITED: OnceCell<()> = OnceCell::new();
/// Tracer provider installed by the OTLP setup functions in this file; kept so
/// that `shutdown` can call `shutdown()` on it.
39#[cfg(feature = "otlp")]
40static TRACER_PROVIDER: OnceCell<SdkTracerProvider> = OnceCell::new();
/// Meter provider installed by the OTLP setup functions in this file; kept so
/// that `shutdown` can call `shutdown()` on it.
41#[cfg(feature = "otlp")]
42static METER_PROVIDER: OnceCell<SdkMeterProvider> = OnceCell::new();
/// Set at the end of `install_otlp_from_export` so that a second call to that
/// function returns without building the OTLP pipeline again.
43#[cfg(feature = "otlp")]
44static INIT_GUARD: OnceCell<()> = OnceCell::new();
45
/// Caller-supplied settings for telemetry initialisation.
46#[derive(Clone, Debug)]
47pub struct TelemetryConfig {
    /// Name of the running service. In this file it is used as the OTLP
    /// resource service name and, with the `dev` feature, as the stem of the
    /// rolling log file written under `.dev-logs/`.
48 pub service_name: String,
50}
51
52fn init_fmt_layers(_cfg: &TelemetryConfig) -> Result<()> {
53 #[cfg(any(feature = "dev", feature = "prod-json"))]
54 let filter = EnvFilter::try_from_default_env()
55 .or_else(|_| EnvFilter::try_new("info"))
56 .unwrap();
57
58 #[cfg(feature = "dev")]
59 {
60 let cfg = _cfg;
61 let filter = filter.clone();
62 let file_appender = rolling::daily(".dev-logs", format!("{}.log", cfg.service_name));
63 let (nb, _guard) = tracing_appender::non_blocking(file_appender);
64 let stdout_is_tty = std::io::stdout().is_terminal();
65
66 let layer_stdout = fmt::layer()
67 .with_target(true)
68 .fmt_fields(RedactingFormatFields)
69 .pretty()
70 .with_ansi(stdout_is_tty);
71 let layer_file = fmt::layer()
72 .with_writer(nb)
73 .with_ansi(false)
74 .fmt_fields(RedactingFormatFields)
75 .json();
76
77 let _ = tracing_subscriber::registry()
78 .with(filter)
79 .with(layer_stdout)
80 .with(layer_file)
81 .try_init();
82 }
83
84 #[cfg(all(not(feature = "dev"), feature = "prod-json"))]
85 {
86 let filter = filter;
87 let layer_json = fmt::layer()
88 .with_target(true)
89 .with_span_list(true)
90 .fmt_fields(RedactingFormatFields);
91 let _ = tracing_subscriber::registry()
92 .with(filter)
93 .with(layer_json)
94 .try_init();
95 }
96
97 #[cfg(feature = "dev-console")]
98 {
99 if std::env::var_os("TOKIO_CONSOLE").is_some()
100 && std::panic::catch_unwind(console_subscriber::init).is_err()
101 {
102 tracing::warn!(
103 "dev-console feature enabled but tokio_unstable not set; skipping console subscriber init"
104 );
105 }
106 }
107
108 Ok(())
109}
110
111pub fn init_telemetry(cfg: TelemetryConfig) -> Result<()> {
112 redaction::init_from_env();
113
114 if INITED.get().is_some() {
115 return Ok(());
116 }
117
118 init_fmt_layers(&cfg)?;
119
120 configure_otlp(&cfg.service_name)?;
121
122 let _ = INITED.set(());
123 Ok(())
124}
125
/// Registers the W3C trace-context propagator and, when
/// `OTEL_EXPORTER_OTLP_ENDPOINT` is set to a readable value, installs the OTLP
/// pipeline for `service_name` against that endpoint.
///
/// # Errors
///
/// Propagates any error returned by `install_otlp`.
#[cfg(feature = "otlp")]
fn configure_otlp(service_name: &str) -> Result<()> {
    global::set_text_map_propagator(TraceContextPropagator::new());

    // No endpoint configured (or not valid Unicode): nothing to export to.
    let Ok(endpoint) = std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT") else {
        return Ok(());
    };

    let resource = Resource::builder()
        .with_service_name(service_name.to_owned())
        .build();
    install_otlp(&endpoint, resource)
}
139
140#[cfg(not(feature = "otlp"))]
141fn configure_otlp(service_name: &str) -> Result<()> {
142 if std::env::var_os("OTEL_EXPORTER_OTLP_ENDPOINT").is_some() {
143 tracing::warn!(
144 service = %service_name,
145 "otlp feature disabled; ignoring OTEL_EXPORTER_OTLP_ENDPOINT"
146 );
147 }
148 Ok(())
149}
150
/// Builds gRPC (tonic) OTLP span and metric exporters for `endpoint`, then installs
/// tracer and meter providers carrying `resource` as the global providers and
/// records them in `TRACER_PROVIDER` / `METER_PROVIDER` for `shutdown`.
///
/// Both exporters are built *before* anything is installed.  Building is the only
/// fallible step, so a failure can no longer leave a tracer provider installed
/// globally (and pinned in `TRACER_PROVIDER`) without its metric counterpart.
///
/// # Errors
///
/// Returns the exporter build error if either exporter cannot be constructed.
#[cfg(feature = "otlp")]
fn install_otlp(endpoint: &str, resource: Resource) -> Result<()> {
    // Fallible part: construct both exporters up front.
    let span_exporter = redaction::wrap_span_exporter(
        SpanExporter::builder()
            .with_tonic()
            .with_endpoint(endpoint.to_string())
            .build()?,
    );
    let metric_exporter = MetricExporter::builder()
        .with_tonic()
        .with_endpoint(endpoint.to_string())
        .build()?;

    // Infallible part: wire the providers and publish them.
    let span_processor = BatchSpanProcessor::builder(span_exporter).build();
    let tracer_provider = SdkTracerProvider::builder()
        .with_resource(resource.clone())
        .with_span_processor(span_processor)
        .build();
    global::set_tracer_provider(tracer_provider.clone());
    let _ = TRACER_PROVIDER.set(tracer_provider);

    let meter_provider = SdkMeterProvider::builder()
        .with_resource(resource)
        .with_periodic_exporter(metric_exporter)
        .build();
    global::set_meter_provider(meter_provider.clone());
    let _ = METER_PROVIDER.set(meter_provider);

    Ok(())
}
177
/// Shuts down the tracer provider and then the meter provider recorded during
/// OTLP setup, if any.  Errors reported by either shutdown are deliberately
/// ignored; calling this before any provider was installed does nothing.
#[cfg(feature = "otlp")]
pub fn shutdown() {
    let _ = TRACER_PROVIDER.get().map(|tracer| tracer.shutdown());
    let _ = METER_PROVIDER.get().map(|meter| meter.shutdown());
}
187
/// No-op counterpart of the OTLP `shutdown`, compiled when the `otlp` feature is
/// disabled so callers can invoke `shutdown()` unconditionally.
188#[cfg(not(feature = "otlp"))]
189pub fn shutdown() {}
190
191#[cfg(feature = "otlp")]
/// Serialises `headers` into the `key1=value1,key2=value2` form used by the
/// `OTEL_EXPORTER_OTLP_*HEADERS` environment variables.
///
/// * Entries whose key is empty or whitespace-only are skipped.
/// * Entries are emitted in sorted key order so the result does not depend on
///   `HashMap` iteration order.
/// * Values are percent-encoded wherever a raw byte would break the list syntax
///   or be mangled by URL-decoding on the reading side (whitespace, control and
///   non-ASCII bytes, `"`, `%`, `,`, `;`, `\`).  Other printable ASCII, such as
///   the `=`, `+` and `/` commonly found in tokens, is left untouched.
///
/// Returns `None` when no entry remains, so callers can leave the environment alone.
fn serialize_headers(headers: &HashMap<String, String>) -> Option<String> {
    /// Percent-encodes the bytes of `raw` that are not safe inside a header list value.
    fn encode_value(raw: &str) -> String {
        const HEX: &[u8; 16] = b"0123456789ABCDEF";
        let mut encoded = String::with_capacity(raw.len());
        for byte in raw.bytes() {
            let safe = matches!(byte, 0x21..=0x7e)
                && !matches!(byte, b'"' | b'%' | b',' | b';' | b'\\');
            if safe {
                encoded.push(char::from(byte));
            } else {
                encoded.push('%');
                encoded.push(char::from(HEX[usize::from(byte >> 4)]));
                encoded.push(char::from(HEX[usize::from(byte & 0x0f)]));
            }
        }
        encoded
    }

    let mut entries: Vec<(&str, &str)> = headers
        .iter()
        .filter(|(key, _)| !key.trim().is_empty())
        .map(|(key, value)| (key.as_str(), value.as_str()))
        .collect();
    if entries.is_empty() {
        return None;
    }
    entries.sort_unstable();

    let serialized = entries
        .iter()
        .map(|(key, value)| format!("{key}={}", encode_value(value)))
        .collect::<Vec<_>>()
        .join(",");
    Some(serialized)
}
209
/// Installs the OTLP trace and metric pipeline described by `export` and a
/// `tracing` subscriber that forwards spans to it.
///
/// * Transport: HTTP when `export.mode` is `ExportMode::OtlpHttp`, gRPC (tonic)
///   otherwise.  Without an explicit endpoint the default is
///   `http://localhost:4318` for HTTP and `http://localhost:4317` for gRPC.
/// * Headers: passed to the HTTP builders directly; for gRPC they are published
///   through the `OTEL_EXPORTER_OTLP_*HEADERS` environment variables before the
///   exporters are built.
/// * Sampling: a `TraceIdRatio` in `[0.0, 1.0)` becomes a ratio sampler,
///   `AlwaysOff` disables sampling, everything else samples every trace.
///
/// Both exporters are built *before* any provider is installed.  Building is the
/// only fallible step, so a failure can no longer leave a tracer provider
/// installed globally (and pinned in `TRACER_PROVIDER`) without its metric
/// counterpart.
///
/// The function is a no-op once `INIT_GUARD` has been set by an earlier
/// successful call.
///
/// # Errors
///
/// Returns an error carrying the exporter's message when either exporter cannot
/// be built.
#[cfg(feature = "otlp")]
fn install_otlp_from_export(cfg: TelemetryConfig, export: ExportConfig) -> Result<()> {
    if INIT_GUARD.get().is_some() {
        return Ok(());
    }

    let use_http = matches!(export.mode, ExportMode::OtlpHttp);
    let endpoint = export.endpoint.unwrap_or_else(|| {
        if use_http {
            "http://localhost:4318".into()
        } else {
            "http://localhost:4317".into()
        }
    });

    if !use_http {
        if let Some(serialized) = serialize_headers(&export.headers) {
            // SAFETY: `set_var` is only sound while no other thread is reading or
            // writing the process environment.  Nothing in this function enforces
            // that, so telemetry is expected to be initialised before worker
            // threads are spawned.  NOTE(review): confirm this holds for callers.
            unsafe {
                std::env::set_var("OTEL_EXPORTER_OTLP_HEADERS", &serialized);
                std::env::set_var("OTEL_EXPORTER_OTLP_TRACES_HEADERS", &serialized);
                std::env::set_var("OTEL_EXPORTER_OTLP_METRICS_HEADERS", &serialized);
            }
        }
    }

    // Fallible part: construct both exporters up front.
    let span_exporter = if use_http {
        let mut builder = SpanExporter::builder()
            .with_http()
            .with_endpoint(endpoint.clone());
        if !export.headers.is_empty() {
            builder = builder.with_headers(export.headers.clone());
        }
        if let Some(compression) = export.compression {
            builder = builder.with_compression(map_compression(compression));
        }
        builder.build().map_err(|e| anyhow!(e.to_string()))?
    } else {
        let mut builder = SpanExporter::builder()
            .with_tonic()
            .with_endpoint(endpoint.clone());
        if let Some(compression) = export.compression {
            builder = builder.with_compression(map_compression(compression));
        }
        builder.build().map_err(|e| anyhow!(e.to_string()))?
    };
    let span_exporter = redaction::wrap_span_exporter(span_exporter);

    let metric_exporter = if use_http {
        let mut builder = MetricExporter::builder()
            .with_http()
            .with_endpoint(endpoint);
        if !export.headers.is_empty() {
            builder = builder.with_headers(export.headers.clone());
        }
        if let Some(compression) = export.compression {
            builder = builder.with_compression(map_compression(compression));
        }
        builder.build().map_err(|e| anyhow!(e.to_string()))?
    } else {
        let mut builder = MetricExporter::builder()
            .with_tonic()
            .with_endpoint(endpoint);
        if let Some(compression) = export.compression {
            builder = builder.with_compression(map_compression(compression));
        }
        builder.build().map_err(|e| anyhow!(e.to_string()))?
    };

    // Infallible part: wire the providers and publish them.
    let sampler = match export.sampling {
        Sampling::TraceIdRatio(ratio) if (0.0..1.0).contains(&ratio) => {
            Sampler::TraceIdRatioBased(ratio)
        }
        Sampling::AlwaysOff => Sampler::AlwaysOff,
        _ => Sampler::AlwaysOn,
    };

    let resource = Resource::builder()
        .with_service_name(cfg.service_name)
        .build();

    let tracer_provider = SdkTracerProvider::builder()
        .with_batch_exporter(span_exporter)
        .with_sampler(sampler)
        .with_resource(resource.clone())
        .build();
    global::set_tracer_provider(tracer_provider.clone());
    let _ = TRACER_PROVIDER.set(tracer_provider);

    let meter_provider = SdkMeterProvider::builder()
        .with_resource(resource)
        .with_periodic_exporter(metric_exporter)
        .build();
    global::set_meter_provider(meter_provider.clone());
    let _ = METER_PROVIDER.set(meter_provider);

    // Bridge `tracing` spans into the freshly installed tracer.  Best effort: an
    // already-installed global subscriber is left in place.
    let tracer = global::tracer("greentic-telemetry");
    let subscriber = Registry::default()
        .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
        .with(tracing_opentelemetry::layer().with_tracer(tracer));
    let _ = subscriber.try_init();

    let _ = INIT_GUARD.set(());

    Ok(())
}
/// Translates this crate's `Compression` setting into the equivalent
/// `opentelemetry_otlp::Compression` value expected by the exporter builders.
#[cfg(feature = "otlp")]
fn map_compression(c: Compression) -> opentelemetry_otlp::Compression {
    use opentelemetry_otlp::Compression as OtlpCompression;

    // Kept exhaustive on purpose so a new local variant forces an update here.
    match c {
        Compression::Gzip => OtlpCompression::Gzip,
    }
}
318
319pub fn init_telemetry_auto(cfg: TelemetryConfig) -> Result<()> {
321 let export = ExportConfig::from_env()?;
322 init_telemetry_from_config(cfg, export)
323}
324
325pub fn init_telemetry_from_config(cfg: TelemetryConfig, export: ExportConfig) -> Result<()> {
327 redaction::init_from_env();
328
329 if INITED.get().is_some() {
330 return Ok(());
331 }
332
333 match export.mode {
334 ExportMode::JsonStdout => init_fmt_layers(&cfg)?,
335 ExportMode::OtlpGrpc | ExportMode::OtlpHttp => {
336 #[cfg(feature = "otlp")]
337 {
338 install_otlp_from_export(cfg, export)?
339 }
340 #[cfg(not(feature = "otlp"))]
341 {
342 return Err(anyhow!(
343 "otlp feature disabled; cannot install OTLP exporter from config"
344 ));
345 }
346 }
347 }
348
349 let _ = INITED.set(());
350 Ok(())
351}