1use std::{path::PathBuf, time::Duration};
4
5use chrono::SecondsFormat;
6use eyre::{DefaultHandler, EyreHandler, bail};
7use tracing::Subscriber;
8use tracing_appender::rolling::RollingFileAppender;
9use tracing_opentelemetry::OpenTelemetryLayer;
10use tracing_subscriber::{
11 EnvFilter, Layer, Registry,
12 layer::SubscriberExt,
13 registry,
14 reload::{self, Handle},
15};
16
17#[cfg(feature = "log_throttling")]
18use tracing_throttle::TracingRateLimitLayer;
19
20#[cfg(feature = "error_aggregation")]
21use {crate::libs::log::error_aggregation::*, std::sync::Arc};
22
23#[deprecated(
24 since = "1.3.0",
25 note = "This code is not used in current projects and should probably not be used going forward"
26)]
27pub mod legacy;
28
29#[cfg(feature = "error_aggregation")]
30pub mod error_aggregation;
31pub mod level_filter;
32pub mod otel;
33
34pub use level_filter::*;
35pub use otel::{OtelConfig, OtelGuards, OtelProtocol};
36
37pub use tracing_appender::rolling::Rotation as LogRotation;
39
40pub use tracing_appender::non_blocking::WorkerGuard;
41
/// Top-level configuration consumed by [`setup_logging`].
#[derive(Debug)]
pub struct LoggingConfig {
    /// Level used to build the global, runtime-reloadable [`EnvFilter`].
    pub level: LogLevel,
    /// Optional rolling-file sink; `None` disables file output (stdout output is always on).
    pub file_config: Option<FileLoggingConfig>,
    /// OpenTelemetry export settings.
    pub otel_config: OtelConfig,
    /// Settings handed to the error aggregation layer.
    #[cfg(feature = "error_aggregation")]
    pub error_aggregation: ErrorAggregationConfig,
    /// Rate-limiting settings for the file sink; `None` falls back to
    /// [`LogThrottlingConfig::default`].
    #[cfg(feature = "log_throttling")]
    pub throttling_config: Option<LogThrottlingConfig>,
}
55
/// Settings for the rolling log-file sink (files are written with a `log` suffix).
#[derive(Debug, Clone)]
pub struct FileLoggingConfig {
    /// Directory the rolling file appender writes into.
    pub path: PathBuf,
    /// File name prefix; `None` uses a UTC timestamp taken when logging is set up.
    pub file_prefix: Option<String>,
    /// Rotation policy; `None` means [`LogRotation::NEVER`].
    pub rotation: Option<LogRotation>,
}
65
/// Rate-limiting settings applied to the file sink when the `log_throttling` feature is on.
#[derive(Debug, Default, Clone)]
pub struct LogThrottlingConfig {
    /// Interval for periodic suppression summaries; `None` disables active summary emission.
    pub summary_emission_interval: Option<Duration>,
    /// When set, a background Tokio task logs allowed/suppressed event counts at this interval.
    pub metrics_emission_interval: Option<Duration>,
    /// Field names passed to the rate limiter's `with_excluded_fields`.
    /// NOTE(review): presumably ignored when deciding whether events are duplicates — confirm
    /// against the `tracing_throttle` documentation.
    pub excluded_fields: Option<Vec<String>>,
    /// Extra targets exempt from throttling, on top of the throttle layer's own target.
    pub exemptions: Option<Vec<String>>,
}
90
/// Handles returned by [`setup_logging`]; keep this value alive for as long as logging is needed.
pub struct LogSetupReturn {
    /// Handle for changing the global log level at runtime.
    pub reload_handle: LogReloadHandle,
    /// Non-blocking writer guards as `(stdout, file)`; the file guard exists only when file
    /// logging is configured. Dropping them shuts down the corresponding background writers.
    pub log_guards: (WorkerGuard, Option<WorkerGuard>),
    /// Guards returned by OpenTelemetry setup (`None` when OTel is disabled).
    pub otel_guards: Option<OtelGuards>,
    /// Shared container fed by the error aggregation layer.
    #[cfg(feature = "error_aggregation")]
    pub errors_container: Arc<ErrorAggregationContainer>,
    /// Clone of the rate-limit layer applied to the file sink (e.g. to read its metrics).
    #[cfg(feature = "log_throttling")]
    pub log_throttling_handle: TracingRateLimitLayer,
}
103
/// Everything produced by [`build_logging_subscriber`], before the subscriber is installed.
struct LoggingSubscriberParts {
    /// Fully assembled subscriber, not yet installed anywhere.
    subscriber: Box<dyn Subscriber + Send + Sync + 'static>,
    /// Handle for reloading the global level filter.
    reload_handle: LogReloadHandle,
    /// Writer guards as `(stdout, file)`.
    log_guards: (WorkerGuard, Option<WorkerGuard>),
    /// Guards returned by OpenTelemetry setup, if any.
    otel_guards: Option<OtelGuards>,
    /// Shared container fed by the error aggregation layer.
    #[cfg(feature = "error_aggregation")]
    errors_container: Arc<ErrorAggregationContainer>,
    /// Clone of the rate-limit layer applied to the file sink.
    #[cfg(feature = "log_throttling")]
    log_throttling_handle: TracingRateLimitLayer,
}
116
117fn build_logging_subscriber(config: LoggingConfig) -> eyre::Result<LoggingSubscriberParts> {
120 let global_env_filter = build_env_filter(config.level)?;
122 let (reloadable_global_filter, global_reload_handle) = reload::Layer::new(global_env_filter);
123
124 let (non_blocking_stdout, stdout_guard) = tracing_appender::non_blocking(std::io::stdout());
125
126 let stdout_layer = tracing_subscriber::fmt::layer()
127 .with_thread_names(true)
128 .with_line_number(true)
129 .with_writer(non_blocking_stdout);
130
131 #[cfg(feature = "error_aggregation")]
132 let error_aggregation_config = config.error_aggregation.clone();
133
134 #[cfg(feature = "log_throttling")]
135 let throttling_config = config.throttling_config.clone().unwrap_or_default();
136
137 #[cfg(feature = "log_throttling")]
138 let mut exemptions = vec!["tracing_throttle::infrastructure::layer".to_string()]; #[cfg(feature = "log_throttling")]
140 exemptions.extend(throttling_config.exemptions.unwrap_or_default());
141
142 #[cfg(feature = "log_throttling")]
143 let rate_limit_filter = TracingRateLimitLayer::builder()
144 .with_excluded_fields(throttling_config.excluded_fields.unwrap_or_default())
145 .with_exempt_targets(exemptions)
146 .with_active_emission(throttling_config.summary_emission_interval.is_some())
147 .with_summary_interval(
148 throttling_config
149 .summary_emission_interval
150 .unwrap_or(Duration::from_secs(5 * 60)),
151 )
152 .build()
153 .expect("Error building tracing rate limit layer");
154
155 #[cfg(feature = "log_throttling")]
156 if let Some(metrics_duration) = throttling_config.metrics_emission_interval {
157 let metrics = rate_limit_filter.metrics().clone();
158
159 tokio::spawn(async move {
161 loop {
162 tokio::time::sleep(metrics_duration).await;
163
164 let snapshot = metrics.snapshot();
165 tracing::info!(
166 events_allowed = snapshot.events_allowed,
167 events_suppressed = snapshot.events_suppressed,
168 suppression_rate = format!("{:.1}%", snapshot.suppression_rate() * 100.0),
169 "Rate limiting metrics"
170 );
171 }
172 });
173 }
174
175 #[cfg(feature = "log_throttling")]
176 let log_throttling_handle = rate_limit_filter.clone();
177
178 let (file_layer, file_guard) = match config.file_config {
179 None => (None, None),
180 Some(file_config) => {
181 let appender = RollingFileAppender::builder()
182 .rotation(file_config.rotation.unwrap_or(LogRotation::NEVER))
183 .filename_prefix(file_config.file_prefix.unwrap_or_else(|| {
184 chrono::Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true)
185 })) .filename_suffix("log")
187 .build(file_config.path)?;
188
189 let (non_blocking_appender, guard) = tracing_appender::non_blocking(appender);
190
191 let base_layer = tracing_subscriber::fmt::layer()
192 .with_thread_names(true)
193 .with_line_number(true)
194 .with_ansi(false)
195 .with_writer(non_blocking_appender);
196
197 #[cfg(feature = "log_throttling")]
198 let file_layer = base_layer.with_filter(rate_limit_filter);
199
200 #[cfg(not(feature = "log_throttling"))]
201 let file_layer = base_layer;
202
203 (Some(file_layer), Some(guard))
204 }
205 };
206
207 let reload_handle = LogReloadHandle(global_reload_handle);
208
209 let otel_result = otel::build_otel_layer(&config.otel_config);
211 let otel_guards = otel_result.guards;
212 let otel_tracer = otel_result.tracer;
213
214 let subscriber = registry().with(reloadable_global_filter);
218
219 let sinks = stdout_layer.and_then(file_layer);
221
222 #[cfg(feature = "error_aggregation")]
224 let (sinks, errors_container) = {
225 use crate::libs::log::error_aggregation::get_error_aggregation;
226 let (container, error_layer) = get_error_aggregation(error_aggregation_config);
227 (sinks.and_then(error_layer), container)
228 };
229
230 let subscriber = subscriber.with(sinks);
232
233 let subscriber: Box<dyn Subscriber + Send + Sync + 'static> = match (otel_tracer, &otel_guards) {
235 (Some(tracer), Some(guards)) => {
236 let trace_layer = OpenTelemetryLayer::new(tracer);
237 let log_layer = opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(&guards.logger_provider);
238 Box::new(subscriber.with(trace_layer).with(log_layer))
239 }
240 _ => Box::new(subscriber),
241 };
242
243 Ok(LoggingSubscriberParts {
244 subscriber,
245 reload_handle,
246 log_guards: (stdout_guard, file_guard),
247 otel_guards,
248 #[cfg(feature = "error_aggregation")]
249 errors_container,
250 #[cfg(feature = "log_throttling")]
251 log_throttling_handle,
252 })
253}
254
255pub fn setup_logging(config: LoggingConfig) -> eyre::Result<LogSetupReturn> {
272 use tracing_subscriber::util::SubscriberInitExt;
273
274 let parts = build_logging_subscriber(config)?;
275
276 parts.subscriber.init();
277
278 Ok(LogSetupReturn {
279 reload_handle: parts.reload_handle,
280 log_guards: parts.log_guards,
281 otel_guards: parts.otel_guards,
282 #[cfg(feature = "error_aggregation")]
283 errors_container: parts.errors_container,
284 #[cfg(feature = "log_throttling")]
285 log_throttling_handle: parts.log_throttling_handle,
286 })
287}
288
/// Reload handle for the global [`EnvFilter`] layered directly on top of the [`Registry`].
pub type GlobalLogReloadHandle = Handle<EnvFilter, Registry>;
291
/// Cloneable handle for changing the global log level after setup
/// (see [`LogReloadHandle::set_log_level`]).
#[derive(Clone)]
pub struct LogReloadHandle(GlobalLogReloadHandle);
295
296impl std::fmt::Debug for LogReloadHandle {
297 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
298 f.debug_struct("LogReloadHandle")
299 .field("inner", &"Handle<EnvFilter, Registry>")
300 .finish()
301 }
302}
303
304impl LogReloadHandle {
305 pub fn set_log_level(&self, level: LogLevel) -> eyre::Result<()> {
308 match build_env_filter(level) {
309 Ok(filter) => match self.0.modify(|env_filter| *env_filter = filter) {
310 Ok(_) => Ok(()),
311 Err(error) => {
312 tracing::error!(
313 ?error,
314 "Error setting new global log filter. Ignoring reload attempt"
315 );
316 bail!("Error setting new global log filter: {error}. Ignoring reload attempt")
317 }
318 },
319 Err(error) => {
320 tracing::error!(
321 ?error,
322 "Error building new filter from given log level. Ignoring reload attempt"
323 );
324 bail!(
325 "Error building new filter from given log level: {error}. Ignoring reload attempt"
326 )
327 }
328 }
329 }
330}
331
/// An [`EyreHandler`] that delegates formatting to a wrapped handler and also remembers the
/// caller location passed to `track_caller`.
pub struct CustomEyreHandler {
    /// Handler that all formatting (and `track_caller`) is forwarded to.
    default_handler: Box<dyn EyreHandler>,
    /// Location recorded by the most recent `track_caller` call; `None` before any call.
    location: Option<&'static std::panic::Location<'static>>,
}
339
340impl CustomEyreHandler {
341 pub fn default_with_location_saving(
342 error: &(dyn std::error::Error + 'static),
343 ) -> Box<dyn EyreHandler> {
344 Box::new(Self {
345 default_handler: DefaultHandler::default_with(error),
346 location: None,
347 })
348 }
349
350 pub fn get_location(&self) -> &Option<&'static std::panic::Location<'static>> {
351 &self.location
352 }
353}
354
355impl EyreHandler for CustomEyreHandler {
356 fn display(
357 &self,
358 error: &(dyn std::error::Error + 'static),
359 f: &mut core::fmt::Formatter<'_>,
360 ) -> core::fmt::Result {
361 self.default_handler.display(error, f)
362 }
363
364 fn debug(
365 &self,
366 error: &(dyn std::error::Error + 'static),
367 f: &mut core::fmt::Formatter<'_>,
368 ) -> core::fmt::Result {
369 self.default_handler.debug(error, f)
370 }
371
372 fn track_caller(&mut self, location: &'static std::panic::Location<'static>) {
373 self.location = Some(location); self.default_handler.track_caller(location);
375 }
376}
377
#[cfg(test)]
/// Test-only counterpart of [`LogSetupReturn`]. Its subscriber is installed as the thread-local
/// default and is uninstalled when this value is dropped.
pub struct LogSetupReturnTest {
    /// Keeps the thread-local default subscriber installed while alive.
    _guard: tracing::subscriber::DefaultGuard,
    /// Handle for changing the global log level during a test.
    reload_handle: LogReloadHandle,
    // Held only so the writer guards live as long as this struct.
    #[allow(dead_code)]
    log_guards: (WorkerGuard, Option<WorkerGuard>),
    // Read by some tests; kept alive for the lifetime of this struct.
    #[allow(dead_code)]
    otel_guards: Option<OtelGuards>,
    // Held so the error aggregation container outlives the test subscriber.
    #[cfg(feature = "error_aggregation")]
    #[allow(dead_code)]
    errors_container: Arc<ErrorAggregationContainer>,
}
390
/// Test variant of [`setup_logging`]: builds the same stack but installs it only as the
/// thread-local default subscriber.
///
/// # Errors
///
/// Propagates any error from building the subscriber.
#[cfg(test)]
pub fn setup_logging_test(config: LoggingConfig) -> eyre::Result<LogSetupReturnTest> {
    let LoggingSubscriberParts {
        subscriber,
        reload_handle,
        log_guards,
        otel_guards,
        #[cfg(feature = "error_aggregation")]
        errors_container,
        ..
    } = build_logging_subscriber(config)?;

    // Thread-local: only events emitted on this thread see the subscriber while the guard lives.
    let default_guard = tracing::subscriber::set_default(subscriber);

    Ok(LogSetupReturnTest {
        _guard: default_guard,
        reload_handle,
        log_guards,
        otel_guards,
        #[cfg(feature = "error_aggregation")]
        errors_container,
    })
}
409
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tracing::{debug, error, info};

    #[cfg(feature = "error_aggregation")]
    fn default_error_aggregation_config() -> ErrorAggregationConfig {
        ErrorAggregationConfig {
            limit: 100,
            normalize: true,
        }
    }

    /// Gives the non-blocking writers time to drain queued events.
    ///
    /// This is a blocking sleep. The writers drain on their own worker threads, so stalling the
    /// test runtime thread does not prevent output from reaching the file.
    fn wait_for_writers() {
        std::thread::sleep(std::time::Duration::from_millis(100));
    }

    /// Reads the only file in `dir`, failing the test unless exactly one log file exists.
    fn read_single_log_file(dir: &std::path::Path) -> String {
        let log_files = fs::read_dir(dir)
            .unwrap()
            .collect::<std::io::Result<Vec<_>>>()
            .unwrap();
        assert_eq!(log_files.len(), 1, "Expected exactly one log file");
        fs::read_to_string(log_files[0].path()).unwrap()
    }

    #[tokio::test]
    async fn test_basic_logging_stdout() {
        let config = LoggingConfig {
            level: LogLevel::Info,
            file_config: None,
            otel_config: OtelConfig::default(),
            #[cfg(feature = "error_aggregation")]
            error_aggregation: default_error_aggregation_config(),
            #[cfg(feature = "log_throttling")]
            throttling_config: Some(LogThrottlingConfig::default()),
        };

        // Bound to a named variable so the subscriber stays installed for the events below.
        let setup = setup_logging_test(config);
        assert!(setup.is_ok(), "Failed to setup logging");

        info!("Test info message");
        error!("Test error message");
    }

    #[tokio::test]
    async fn test_file_logging_comprehensive() {
        let temp_dir = tempfile::tempdir().unwrap();

        let config = LoggingConfig {
            level: LogLevel::Info,
            file_config: Some(FileLoggingConfig {
                path: temp_dir.path().to_path_buf(),
                file_prefix: Some("test".to_string()),
                rotation: None,
            }),
            otel_config: OtelConfig::default(),
            #[cfg(feature = "error_aggregation")]
            error_aggregation: default_error_aggregation_config(),
            #[cfg(feature = "log_throttling")]
            throttling_config: Some(LogThrottlingConfig::default()),
        };

        let log_setup = setup_logging_test(config).unwrap();

        info!("info_message_before_reload");
        debug!("debug_message_before_reload");
        error!("error_message");

        wait_for_writers();

        let result = log_setup.reload_handle.set_log_level(LogLevel::Debug);
        assert!(result.is_ok());

        info!("info_message_after_reload");
        debug!("debug_message_after_reload");

        wait_for_writers();
        drop(log_setup);
        wait_for_writers();

        let log_contents = read_single_log_file(temp_dir.path());

        assert!(log_contents.contains("info_message_before_reload"));
        assert!(
            !log_contents.contains("debug_message_before_reload"),
            "Debug should not appear before reload"
        );
        assert!(log_contents.contains("error_message"));

        assert!(log_contents.contains("info_message_after_reload"));
        assert!(
            log_contents.contains("debug_message_after_reload"),
            "Debug should appear after reload to DEBUG level"
        );
    }

    #[tokio::test]
    async fn test_otel_disabled_returns_none_guards() {
        let config = LoggingConfig {
            level: LogLevel::Info,
            file_config: None,
            otel_config: OtelConfig::default(),
            #[cfg(feature = "error_aggregation")]
            error_aggregation: default_error_aggregation_config(),
            #[cfg(feature = "log_throttling")]
            throttling_config: None,
        };

        let result = setup_logging_test(config);
        assert!(result.is_ok(), "Setup should succeed with OTel disabled");

        let guard = result.unwrap();
        assert!(
            guard.otel_guards.is_none(),
            "otel_guards should be None when OTel is disabled"
        );
    }

    #[tokio::test]
    async fn test_otel_graceful_degradation_unreachable_endpoint() {
        let config = LoggingConfig {
            level: LogLevel::Info,
            file_config: None,
            otel_config: OtelConfig {
                enabled: true,
                endpoint: Some("http://nonexistent.invalid:4317".to_string()),
                ..OtelConfig::default()
            },
            #[cfg(feature = "error_aggregation")]
            error_aggregation: default_error_aggregation_config(),
            #[cfg(feature = "log_throttling")]
            throttling_config: None,
        };

        let result = setup_logging_test(config);
        assert!(
            result.is_ok(),
            "Setup should succeed even with unreachable OTel endpoint"
        );

        let guard = result.unwrap();
        assert!(
            guard.otel_guards.is_some(),
            "otel_guards should be Some even with unreachable endpoint (SDK initializes synchronously)"
        );
    }

    #[tokio::test]
    async fn test_file_logging_with_otel_attempted() {
        let temp_dir = tempfile::tempdir().unwrap();

        let config = LoggingConfig {
            level: LogLevel::Info,
            file_config: Some(FileLoggingConfig {
                path: temp_dir.path().to_path_buf(),
                file_prefix: Some("otel_test".to_string()),
                rotation: None,
            }),
            otel_config: OtelConfig {
                enabled: true,
                endpoint: Some("http://localhost:4317".to_string()),
                ..OtelConfig::default()
            },
            #[cfg(feature = "error_aggregation")]
            error_aggregation: default_error_aggregation_config(),
            #[cfg(feature = "log_throttling")]
            throttling_config: None,
        };

        let log_setup = setup_logging_test(config).unwrap();

        info!("file_log_with_otel_attempted");
        error!("error_log_with_otel_attempted");

        wait_for_writers();
        drop(log_setup);
        wait_for_writers();

        let log_contents = read_single_log_file(temp_dir.path());
        assert!(log_contents.contains("file_log_with_otel_attempted"));
        assert!(log_contents.contains("error_log_with_otel_attempted"));
    }

    #[tokio::test]
    async fn test_log_level_reload_with_otel_enabled() {
        let temp_dir = tempfile::tempdir().unwrap();

        let config = LoggingConfig {
            level: LogLevel::Info,
            file_config: Some(FileLoggingConfig {
                path: temp_dir.path().to_path_buf(),
                file_prefix: Some("reload_test".to_string()),
                rotation: None,
            }),
            otel_config: OtelConfig {
                enabled: true,
                endpoint: Some("http://localhost:4317".to_string()),
                ..OtelConfig::default()
            },
            #[cfg(feature = "error_aggregation")]
            error_aggregation: default_error_aggregation_config(),
            #[cfg(feature = "log_throttling")]
            throttling_config: None,
        };

        let log_setup = setup_logging_test(config).unwrap();

        info!("info_before_reload");
        debug!("debug_before_reload");
        wait_for_writers();

        let result = log_setup.reload_handle.set_log_level(LogLevel::Debug);
        assert!(
            result.is_ok(),
            "Log level reload should succeed with OTel enabled"
        );

        info!("info_after_reload");
        debug!("debug_after_reload");

        wait_for_writers();
        drop(log_setup);
        wait_for_writers();

        // Previously indexed the directory listing without checking it held exactly one file.
        let log_contents = read_single_log_file(temp_dir.path());

        assert!(log_contents.contains("info_before_reload"));
        assert!(
            !log_contents.contains("debug_before_reload"),
            "Debug should not appear before reload"
        );
        assert!(log_contents.contains("info_after_reload"));
        assert!(
            log_contents.contains("debug_after_reload"),
            "Debug should appear after reload"
        );
    }

    #[tokio::test]
    async fn test_otel_config_fields() {
        use std::collections::HashMap;

        let config = OtelConfig {
            enabled: true,
            service_name: Some("test-service".to_string()),
            endpoint: Some("http://collector:4317".to_string()),
            protocol: otel::OtelProtocol::HttpProtobuf,
            headers: {
                let mut h = HashMap::new();
                h.insert("x-api-key".to_string(), "secret-key".to_string());
                h
            },
        };

        assert!(config.enabled);
        assert_eq!(config.service_name, Some("test-service".to_string()));
        assert_eq!(config.endpoint, Some("http://collector:4317".to_string()));
        assert!(matches!(config.protocol, otel::OtelProtocol::HttpProtobuf));
        assert_eq!(
            config.headers.get("x-api-key"),
            Some(&"secret-key".to_string())
        );

        let default_config = OtelConfig::default();
        assert!(!default_config.enabled);
        assert!(default_config.service_name.is_none());
        assert!(default_config.endpoint.is_none());
        assert!(matches!(default_config.protocol, otel::OtelProtocol::Grpc));
        assert!(default_config.headers.is_empty());
    }

    #[tokio::test]
    async fn test_otel_enabled_no_endpoint_uses_fallback() {
        let config = LoggingConfig {
            level: LogLevel::Info,
            file_config: None,
            otel_config: OtelConfig {
                enabled: true,
                endpoint: None,
                ..OtelConfig::default()
            },
            #[cfg(feature = "error_aggregation")]
            error_aggregation: default_error_aggregation_config(),
            #[cfg(feature = "log_throttling")]
            throttling_config: None,
        };

        let result = setup_logging_test(config);
        assert!(
            result.is_ok(),
            "Setup should succeed with OTel enabled but no endpoint"
        );
        // Whether OTel guards are produced here depends on `otel::build_otel_layer`'s fallback
        // behaviour; only successful setup is asserted.
    }
}