1use std::{path::PathBuf, time::Duration};
4
5use chrono::SecondsFormat;
6use eyre::{DefaultHandler, EyreHandler, bail};
7use tracing::Subscriber;
8use tracing_appender::rolling::RollingFileAppender;
9use tracing_opentelemetry::OpenTelemetryLayer;
10use tracing_subscriber::{
11 EnvFilter, Layer, Registry,
12 layer::SubscriberExt,
13 registry,
14 reload::{self, Handle},
15};
16
17#[cfg(feature = "log_throttling")]
18use tracing_throttle::TracingRateLimitLayer;
19
20#[cfg(feature = "error_aggregation")]
21use {crate::libs::log::error_aggregation::*, std::sync::Arc};
22
23#[deprecated(
24 since = "1.3.0",
25 note = "This code is not used in current projects and should probably not be used going forward"
26)]
27pub mod legacy;
28
29#[cfg(feature = "error_aggregation")]
30pub mod error_aggregation;
31pub mod level_filter;
32pub mod otel;
33
34pub use level_filter::*;
35pub use otel::{OtelConfig, OtelGuards};
36
37pub use tracing_appender::rolling::Rotation as LogRotation;
39
40pub use tracing_appender::non_blocking::WorkerGuard;
41
/// Top-level configuration consumed by [`setup_logging`].
#[derive(Debug)]
pub struct LoggingConfig {
    /// Initial log level; converted into the global `EnvFilter` via `build_env_filter`.
    pub level: LogLevel,
    /// File sink settings. When `None`, no file layer is created and only stdout is used.
    pub file_config: Option<FileLoggingConfig>,
    /// OpenTelemetry settings handed to `otel::build_otel_layer`.
    pub otel_config: OtelConfig,
    /// Settings handed to `get_error_aggregation` to build the error aggregation layer.
    #[cfg(feature = "error_aggregation")]
    pub error_aggregation: ErrorAggregationConfig,
    /// Log throttling settings. When `None`, `LogThrottlingConfig::default()` is used.
    #[cfg(feature = "log_throttling")]
    pub throttling_config: Option<LogThrottlingConfig>,
}
55
/// Settings for the rolling file sink.
#[derive(Debug, Clone)]
pub struct FileLoggingConfig {
    /// Location passed to the rolling file appender's `build` call; log files are created
    /// inside it.
    pub path: PathBuf,
    /// File name prefix. When `None`, the current UTC time formatted as RFC 3339 (second
    /// precision) is used. The suffix is always `log`.
    pub file_prefix: Option<String>,
    /// Rotation policy. When `None`, `LogRotation::NEVER` is used.
    pub rotation: Option<LogRotation>,
}
65
/// Settings for the `tracing_throttle` rate limiting layer.
#[derive(Debug, Default, Clone)]
pub struct LogThrottlingConfig {
    /// Interval between suppression summaries. Active emission is enabled only when this is
    /// `Some`; the interval handed to the builder falls back to five minutes otherwise.
    pub summary_emission_interval: Option<Duration>,
    /// When `Some`, a background Tokio task logs rate limiting metrics at this interval.
    pub metrics_emission_interval: Option<Duration>,
    /// Field names passed to the builder's `with_excluded_fields`; empty when `None`.
    pub excluded_fields: Option<Vec<String>>,
    /// Additional targets passed to the builder's `with_exempt_targets`, appended after the
    /// always-present `tracing_throttle::infrastructure::layer` target.
    pub exemptions: Option<Vec<String>>,
}
90
/// Everything [`setup_logging`] hands back to the caller after installing the subscriber.
pub struct LogSetupReturn {
    /// Handle used to change the global log level at runtime.
    pub reload_handle: LogReloadHandle,
    /// Worker guards of the non-blocking writers: `(stdout, file)`. The file guard is `None`
    /// when no file sink was configured.
    // NOTE(review): presumably these must be kept alive for buffered logs to be flushed;
    // confirm against the tracing_appender documentation.
    pub log_guards: (WorkerGuard, Option<WorkerGuard>),
    /// Guards produced by `otel::build_otel_layer`, if any.
    pub otel_guards: Option<OtelGuards>,
    /// Container returned by `get_error_aggregation`.
    #[cfg(feature = "error_aggregation")]
    pub errors_container: Arc<ErrorAggregationContainer>,
    /// Clone of the rate limiting layer built during setup.
    #[cfg(feature = "log_throttling")]
    pub log_throttling_handle: TracingRateLimitLayer,
}
103
/// Intermediate result of `build_logging_subscriber`: the fully assembled (but not yet
/// installed) subscriber together with the handles and guards that belong to it.
struct LoggingSubscriberParts {
    /// Type-erased subscriber, ready to be installed globally or as a scoped default.
    subscriber: Box<dyn Subscriber + Send + Sync + 'static>,
    /// Handle used to change the global log level at runtime.
    reload_handle: LogReloadHandle,
    /// Worker guards of the non-blocking writers: `(stdout, file)`.
    log_guards: (WorkerGuard, Option<WorkerGuard>),
    /// Guards produced by `otel::build_otel_layer`, if any.
    otel_guards: Option<OtelGuards>,
    /// Container returned by `get_error_aggregation`.
    #[cfg(feature = "error_aggregation")]
    errors_container: Arc<ErrorAggregationContainer>,
    /// Clone of the rate limiting layer built during setup.
    #[cfg(feature = "log_throttling")]
    log_throttling_handle: TracingRateLimitLayer,
}
116
117fn build_logging_subscriber(config: LoggingConfig) -> eyre::Result<LoggingSubscriberParts> {
120 let global_env_filter = build_env_filter(config.level)?;
122 let (reloadable_global_filter, global_reload_handle) = reload::Layer::new(global_env_filter);
123
124 let (non_blocking_stdout, stdout_guard) = tracing_appender::non_blocking(std::io::stdout());
125
126 let stdout_layer = tracing_subscriber::fmt::layer()
127 .with_thread_names(true)
128 .with_line_number(true)
129 .with_writer(non_blocking_stdout);
130
131 #[cfg(feature = "error_aggregation")]
132 let error_aggregation_config = config.error_aggregation.clone();
133
134 #[cfg(feature = "log_throttling")]
135 let throttling_config = config.throttling_config.clone().unwrap_or_default();
136
137 #[cfg(feature = "log_throttling")]
138 let mut exemptions = vec!["tracing_throttle::infrastructure::layer".to_string()]; #[cfg(feature = "log_throttling")]
140 exemptions.extend(throttling_config.exemptions.unwrap_or_default());
141
142 #[cfg(feature = "log_throttling")]
143 let rate_limit_filter = TracingRateLimitLayer::builder()
144 .with_excluded_fields(throttling_config.excluded_fields.unwrap_or_default())
145 .with_exempt_targets(exemptions)
146 .with_active_emission(throttling_config.summary_emission_interval.is_some())
147 .with_summary_interval(
148 throttling_config
149 .summary_emission_interval
150 .unwrap_or(Duration::from_secs(5 * 60)),
151 )
152 .build()
153 .expect("Error building tracing rate limit layer");
154
155 #[cfg(feature = "log_throttling")]
156 if let Some(metrics_duration) = throttling_config.metrics_emission_interval {
157 let metrics = rate_limit_filter.metrics().clone();
158
159 tokio::spawn(async move {
161 loop {
162 tokio::time::sleep(metrics_duration).await;
163
164 let snapshot = metrics.snapshot();
165 tracing::info!(
166 events_allowed = snapshot.events_allowed,
167 events_suppressed = snapshot.events_suppressed,
168 suppression_rate = format!("{:.1}%", snapshot.suppression_rate() * 100.0),
169 "Rate limiting metrics"
170 );
171 }
172 });
173 }
174
175 #[cfg(feature = "log_throttling")]
176 let log_throttling_handle = rate_limit_filter.clone();
177
178 let (file_layer, file_guard) = match config.file_config {
179 None => (None, None),
180 Some(file_config) => {
181 let appender = RollingFileAppender::builder()
182 .rotation(file_config.rotation.unwrap_or(LogRotation::NEVER))
183 .filename_prefix(file_config.file_prefix.unwrap_or_else(|| {
184 chrono::Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true)
185 })) .filename_suffix("log")
187 .build(file_config.path)?;
188
189 let (non_blocking_appender, guard) = tracing_appender::non_blocking(appender);
190
191 let base_layer = tracing_subscriber::fmt::layer()
192 .with_thread_names(true)
193 .with_line_number(true)
194 .with_ansi(false)
195 .with_writer(non_blocking_appender);
196
197 #[cfg(feature = "log_throttling")]
198 let file_layer = base_layer.with_filter(rate_limit_filter);
199
200 #[cfg(not(feature = "log_throttling"))]
201 let file_layer = base_layer;
202
203 (Some(file_layer), Some(guard))
204 }
205 };
206
207 let reload_handle = LogReloadHandle(global_reload_handle);
208
209 let otel_result = otel::build_otel_layer(&config.otel_config);
211 let otel_guards = otel_result.guards;
212 let otel_tracer = otel_result.tracer;
213
214 let subscriber = registry().with(reloadable_global_filter);
218
219 let sinks = stdout_layer.and_then(file_layer);
221
222 #[cfg(feature = "error_aggregation")]
224 let (sinks, errors_container) = {
225 use crate::libs::log::error_aggregation::get_error_aggregation;
226 let (container, error_layer) = get_error_aggregation(error_aggregation_config);
227 (sinks.and_then(error_layer), container)
228 };
229
230 let subscriber = subscriber.with(sinks);
232
233 let subscriber: Box<dyn Subscriber + Send + Sync + 'static> = match (otel_tracer, &otel_guards)
235 {
236 (Some(tracer), Some(guards)) => {
237 let trace_layer = OpenTelemetryLayer::new(tracer);
238 let log_layer = opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(
239 &guards.logger_provider,
240 );
241 Box::new(subscriber.with(trace_layer).with(log_layer))
242 }
243 _ => Box::new(subscriber),
244 };
245
246 Ok(LoggingSubscriberParts {
247 subscriber,
248 reload_handle,
249 log_guards: (stdout_guard, file_guard),
250 otel_guards,
251 #[cfg(feature = "error_aggregation")]
252 errors_container,
253 #[cfg(feature = "log_throttling")]
254 log_throttling_handle,
255 })
256}
257
258pub fn setup_logging(config: LoggingConfig) -> eyre::Result<LogSetupReturn> {
275 use tracing_subscriber::util::SubscriberInitExt;
276
277 let parts = build_logging_subscriber(config)?;
278
279 parts.subscriber.init();
280
281 Ok(LogSetupReturn {
282 reload_handle: parts.reload_handle,
283 log_guards: parts.log_guards,
284 otel_guards: parts.otel_guards,
285 #[cfg(feature = "error_aggregation")]
286 errors_container: parts.errors_container,
287 #[cfg(feature = "log_throttling")]
288 log_throttling_handle: parts.log_throttling_handle,
289 })
290}
291
/// Reload handle type for the global `EnvFilter` layered directly on the `Registry`.
pub type GlobalLogReloadHandle = Handle<EnvFilter, Registry>;

/// Cloneable wrapper around [`GlobalLogReloadHandle`] used to change the global log level
/// at runtime.
#[derive(Clone)]
pub struct LogReloadHandle(GlobalLogReloadHandle);
298
299impl std::fmt::Debug for LogReloadHandle {
300 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
301 f.debug_struct("LogReloadHandle")
302 .field("inner", &"Handle<EnvFilter, Registry>")
303 .finish()
304 }
305}
306
307impl LogReloadHandle {
308 pub fn set_log_level(&self, level: LogLevel) -> eyre::Result<()> {
311 match build_env_filter(level) {
312 Ok(filter) => match self.0.modify(|env_filter| *env_filter = filter) {
313 Ok(_) => Ok(()),
314 Err(error) => {
315 tracing::error!(
316 ?error,
317 "Error setting new global log filter. Ignoring reload attempt"
318 );
319 bail!("Error setting new global log filter: {error}. Ignoring reload attempt")
320 }
321 },
322 Err(error) => {
323 tracing::error!(
324 ?error,
325 "Error building new filter from given log level. Ignoring reload attempt"
326 );
327 bail!(
328 "Error building new filter from given log level: {error}. Ignoring reload attempt"
329 )
330 }
331 }
332 }
333}
334
/// `eyre` handler that delegates all formatting to the default handler while remembering
/// the location reported through `track_caller`.
pub struct CustomEyreHandler {
    /// Handler that performs the actual `display`/`debug` formatting.
    default_handler: Box<dyn EyreHandler>,
    /// Last location passed to `track_caller`; `None` until it has been called.
    location: Option<&'static std::panic::Location<'static>>,
}
342
343impl CustomEyreHandler {
344 pub fn default_with_location_saving(
345 error: &(dyn std::error::Error + 'static),
346 ) -> Box<dyn EyreHandler> {
347 Box::new(Self {
348 default_handler: DefaultHandler::default_with(error),
349 location: None,
350 })
351 }
352
353 pub fn get_location(&self) -> &Option<&'static std::panic::Location<'static>> {
354 &self.location
355 }
356}
357
358impl EyreHandler for CustomEyreHandler {
359 fn display(
360 &self,
361 error: &(dyn std::error::Error + 'static),
362 f: &mut core::fmt::Formatter<'_>,
363 ) -> core::fmt::Result {
364 self.default_handler.display(error, f)
365 }
366
367 fn debug(
368 &self,
369 error: &(dyn std::error::Error + 'static),
370 f: &mut core::fmt::Formatter<'_>,
371 ) -> core::fmt::Result {
372 self.default_handler.debug(error, f)
373 }
374
375 fn track_caller(&mut self, location: &'static std::panic::Location<'static>) {
376 self.location = Some(location); self.default_handler.track_caller(location);
378 }
379}
380
/// Test-only counterpart of `LogSetupReturn`, returned by `setup_logging_test`.
#[cfg(test)]
pub struct LogSetupReturnTest {
    /// Guard returned by `tracing::subscriber::set_default`; held so the scoped default
    /// subscriber stays registered for as long as this value lives.
    _guard: tracing::subscriber::DefaultGuard,
    /// Handle used to change the log level during a test.
    reload_handle: LogReloadHandle,
    /// Worker guards of the non-blocking writers: `(stdout, file)`.
    #[allow(dead_code)]
    log_guards: (WorkerGuard, Option<WorkerGuard>),
    /// Guards produced by `otel::build_otel_layer`, if any.
    #[allow(dead_code)]
    otel_guards: Option<OtelGuards>,
    /// Container returned by `get_error_aggregation`.
    #[cfg(feature = "error_aggregation")]
    #[allow(dead_code)]
    errors_container: Arc<ErrorAggregationContainer>,
}
393
/// Test-only variant of `setup_logging`: builds the same subscriber but registers it with
/// `tracing::subscriber::set_default` instead of installing it globally.
///
/// # Errors
///
/// Returns an error when the subscriber cannot be built.
#[cfg(test)]
pub fn setup_logging_test(config: LoggingConfig) -> eyre::Result<LogSetupReturnTest> {
    let LoggingSubscriberParts {
        subscriber,
        reload_handle,
        log_guards,
        otel_guards,
        #[cfg(feature = "error_aggregation")]
        errors_container,
        ..
    } = build_logging_subscriber(config)?;

    let _guard = tracing::subscriber::set_default(subscriber);

    Ok(LogSetupReturnTest {
        _guard,
        reload_handle,
        log_guards,
        otel_guards,
        #[cfg(feature = "error_aggregation")]
        errors_container,
    })
}
412
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tracing::{debug, error, info};

    /// Error aggregation settings shared by every test configuration.
    #[cfg(feature = "error_aggregation")]
    fn default_error_aggregation_config() -> ErrorAggregationConfig {
        ErrorAggregationConfig {
            limit: 100,
            normalize: true,
        }
    }

    /// Blocks briefly to give the non-blocking writer time to process queued log lines.
    fn wait_for_log_flush() {
        std::thread::sleep(std::time::Duration::from_millis(100));
    }

    /// Asserts that `dir` contains exactly one entry and returns that file's contents.
    fn read_single_log_file(dir: &std::path::Path) -> String {
        let log_files: Vec<_> = fs::read_dir(dir).unwrap().collect();
        assert_eq!(log_files.len(), 1, "Expected exactly one log file");

        let log_file = log_files[0].as_ref().unwrap();
        fs::read_to_string(log_file.path()).unwrap()
    }

    /// Setup with stdout only succeeds and logging through it does not panic.
    #[tokio::test]
    async fn test_basic_logging_stdout() {
        let config = LoggingConfig {
            level: LogLevel::Info,
            file_config: None,
            otel_config: OtelConfig::default(),
            #[cfg(feature = "error_aggregation")]
            error_aggregation: default_error_aggregation_config(),
            #[cfg(feature = "log_throttling")]
            throttling_config: Some(LogThrottlingConfig::default()),
        };

        // Kept alive until the end of the test so the subscriber stays registered.
        let setup = setup_logging_test(config);
        assert!(setup.is_ok(), "Failed to setup logging");

        info!("Test info message");
        error!("Test error message");
    }

    /// File logging honours the initial level and picks up a level reload.
    #[tokio::test]
    async fn test_file_logging_comprehensive() {
        let temp_dir = tempfile::tempdir().unwrap();

        let config = LoggingConfig {
            level: LogLevel::Info,
            file_config: Some(FileLoggingConfig {
                path: temp_dir.path().to_path_buf(),
                file_prefix: Some("test".to_string()),
                rotation: None,
            }),
            otel_config: OtelConfig::default(),
            #[cfg(feature = "error_aggregation")]
            error_aggregation: default_error_aggregation_config(),
            #[cfg(feature = "log_throttling")]
            throttling_config: Some(LogThrottlingConfig::default()),
        };

        let log_setup = setup_logging_test(config).unwrap();

        info!("info_message_before_reload");
        debug!("debug_message_before_reload");
        error!("error_message");

        wait_for_log_flush();

        let result = log_setup.reload_handle.set_log_level(LogLevel::Debug);
        assert!(result.is_ok());

        info!("info_message_after_reload");
        debug!("debug_message_after_reload");

        wait_for_log_flush();
        drop(log_setup);
        wait_for_log_flush();

        let log_contents = read_single_log_file(temp_dir.path());

        assert!(log_contents.contains("info_message_before_reload"));
        assert!(
            !log_contents.contains("debug_message_before_reload"),
            "Debug should not appear before reload"
        );
        assert!(log_contents.contains("error_message"));

        assert!(log_contents.contains("info_message_after_reload"));

        assert!(
            log_contents.contains("debug_message_after_reload"),
            "Debug should appear after reload to DEBUG level"
        );
    }

    /// With the default (disabled) OTel configuration no OTel guards are returned.
    #[tokio::test]
    async fn test_otel_disabled_returns_none_guards() {
        let config = LoggingConfig {
            level: LogLevel::Info,
            file_config: None,
            otel_config: OtelConfig::default(),
            #[cfg(feature = "error_aggregation")]
            error_aggregation: default_error_aggregation_config(),
            #[cfg(feature = "log_throttling")]
            throttling_config: None,
        };

        let result = setup_logging_test(config);
        assert!(result.is_ok(), "Setup should succeed with OTel disabled");

        let guard = result.unwrap();
        assert!(
            guard.otel_guards.is_none(),
            "otel_guards should be None when OTel is disabled"
        );
    }

    /// An unreachable OTel endpoint must not make setup fail.
    #[tokio::test]
    async fn test_otel_graceful_degradation_unreachable_endpoint() {
        let config = LoggingConfig {
            level: LogLevel::Info,
            file_config: None,
            otel_config: OtelConfig {
                enabled: true,
                endpoint: Some("http://nonexistent.invalid:4317".to_string()),
                ..OtelConfig::default()
            },
            #[cfg(feature = "error_aggregation")]
            error_aggregation: default_error_aggregation_config(),
            #[cfg(feature = "log_throttling")]
            throttling_config: None,
        };

        let result = setup_logging_test(config);
        assert!(
            result.is_ok(),
            "Setup should succeed even with unreachable OTel endpoint"
        );

        let guard = result.unwrap();
        assert!(
            guard.otel_guards.is_some(),
            "otel_guards should be Some even with unreachable endpoint (SDK initializes synchronously)"
        );
    }

    /// File logging keeps working when OTel is enabled alongside it.
    #[tokio::test]
    async fn test_file_logging_with_otel_attempted() {
        let temp_dir = tempfile::tempdir().unwrap();

        let config = LoggingConfig {
            level: LogLevel::Info,
            file_config: Some(FileLoggingConfig {
                path: temp_dir.path().to_path_buf(),
                file_prefix: Some("otel_test".to_string()),
                rotation: None,
            }),
            otel_config: OtelConfig {
                enabled: true,
                endpoint: Some("http://localhost:4317".to_string()),
                ..OtelConfig::default()
            },
            #[cfg(feature = "error_aggregation")]
            error_aggregation: default_error_aggregation_config(),
            #[cfg(feature = "log_throttling")]
            throttling_config: None,
        };

        let log_setup = setup_logging_test(config).unwrap();

        info!("file_log_with_otel_attempted");
        error!("error_log_with_otel_attempted");

        wait_for_log_flush();
        drop(log_setup);
        wait_for_log_flush();

        let log_contents = read_single_log_file(temp_dir.path());
        assert!(log_contents.contains("file_log_with_otel_attempted"));
        assert!(log_contents.contains("error_log_with_otel_attempted"));
    }

    /// Level reload still works when OTel layers are part of the subscriber.
    #[tokio::test]
    async fn test_log_level_reload_with_otel_enabled() {
        let temp_dir = tempfile::tempdir().unwrap();

        let config = LoggingConfig {
            level: LogLevel::Info,
            file_config: Some(FileLoggingConfig {
                path: temp_dir.path().to_path_buf(),
                file_prefix: Some("reload_test".to_string()),
                rotation: None,
            }),
            otel_config: OtelConfig {
                enabled: true,
                endpoint: Some("http://localhost:4317".to_string()),
                ..OtelConfig::default()
            },
            #[cfg(feature = "error_aggregation")]
            error_aggregation: default_error_aggregation_config(),
            #[cfg(feature = "log_throttling")]
            throttling_config: None,
        };

        let log_setup = setup_logging_test(config).unwrap();

        info!("info_before_reload");
        debug!("debug_before_reload");
        wait_for_log_flush();

        let result = log_setup.reload_handle.set_log_level(LogLevel::Debug);
        assert!(
            result.is_ok(),
            "Log level reload should succeed with OTel enabled"
        );

        info!("info_after_reload");
        debug!("debug_after_reload");

        wait_for_log_flush();
        drop(log_setup);
        wait_for_log_flush();

        // Asserts the file count first, so a missing file fails with a clear message
        // rather than an index-out-of-bounds panic.
        let log_contents = read_single_log_file(temp_dir.path());

        assert!(log_contents.contains("info_before_reload"));
        assert!(
            !log_contents.contains("debug_before_reload"),
            "Debug should not appear before reload"
        );
        assert!(log_contents.contains("info_after_reload"));
        assert!(
            log_contents.contains("debug_after_reload"),
            "Debug should appear after reload"
        );
    }

    /// `OtelConfig` stores the values it is given and defaults to a disabled, empty state.
    #[tokio::test]
    async fn test_otel_config_fields() {
        use std::collections::HashMap;

        let config = OtelConfig {
            enabled: true,
            service_name: Some("test-service".to_string()),
            endpoint: Some("http://collector:4317".to_string()),
            headers: {
                let mut h = HashMap::new();
                h.insert("x-api-key".to_string(), "secret-key".to_string());
                h
            },
        };

        assert!(config.enabled);
        assert_eq!(config.service_name, Some("test-service".to_string()));
        assert_eq!(config.endpoint, Some("http://collector:4317".to_string()));
        assert_eq!(
            config.headers.get("x-api-key"),
            Some(&"secret-key".to_string())
        );

        let default_config = OtelConfig::default();
        assert!(!default_config.enabled);
        assert!(default_config.service_name.is_none());
        assert!(default_config.endpoint.is_none());
        assert!(default_config.headers.is_empty());
    }

    /// Enabling OTel without an endpoint must not make setup fail.
    #[tokio::test]
    async fn test_otel_enabled_no_endpoint_uses_fallback() {
        let config = LoggingConfig {
            level: LogLevel::Info,
            file_config: None,
            otel_config: OtelConfig {
                enabled: true,
                endpoint: None,
                ..OtelConfig::default()
            },
            #[cfg(feature = "error_aggregation")]
            error_aggregation: default_error_aggregation_config(),
            #[cfg(feature = "log_throttling")]
            throttling_config: None,
        };

        let result = setup_logging_test(config);
        assert!(
            result.is_ok(),
            "Setup should succeed with OTel enabled but no endpoint"
        );

        // Whether guards are present is deliberately not asserted here.
        let guard = result.unwrap();
        let _ = guard.otel_guards;
    }
}