1use std::{path::PathBuf, time::Duration};
4
5use chrono::SecondsFormat;
6use eyre::{DefaultHandler, EyreHandler, bail};
7use tracing::Subscriber;
8use tracing_appender::rolling::RollingFileAppender;
9use tracing_subscriber::{
10 EnvFilter, Layer, Registry,
11 layer::SubscriberExt,
12 registry,
13 reload::{self, Handle},
14};
15
16#[cfg(feature = "log_throttling")]
17use tracing_throttle::TracingRateLimitLayer;
18
19#[cfg(feature = "error_aggregation")]
20use {crate::libs::log::error_aggregation::*, std::sync::Arc};
21
22#[deprecated(
23 since = "1.3.0",
24 note = "This code is not used in current projects and should probably not be used going forward"
25)]
26pub mod legacy;
27
28#[cfg(feature = "error_aggregation")]
29pub mod error_aggregation;
30pub mod level_filter;
31
32pub use level_filter::*;
33
34pub use tracing_appender::rolling::Rotation as LogRotation;
36
37pub use tracing_appender::non_blocking::WorkerGuard;
38
39#[derive(Debug)]
40pub struct LoggingConfig {
41 pub level: LogLevel,
42 pub file_config: Option<FileLoggingConfig>,
43 #[cfg(feature = "error_aggregation")]
44 pub error_aggregation: ErrorAggregationConfig,
45 #[cfg(feature = "log_throttling")]
46 pub throttling_config: Option<LogThrottlingConfig>,
47}
48
49#[derive(Debug, Clone)]
50pub struct FileLoggingConfig {
51 pub path: PathBuf,
52 pub file_prefix: Option<String>,
55 pub rotation: Option<LogRotation>,
57}
58
59#[derive(Debug, Default, Clone)]
60pub struct LogThrottlingConfig {
61 pub summary_emission_interval: Option<Duration>,
63 pub metrics_emission_interval: Option<Duration>,
65 pub excluded_fields: Option<Vec<String>>,
73 pub exemptions: Option<Vec<String>>,
82}
83
84pub struct LogSetupReturn {
85 pub reload_handle: LogReloadHandle,
86 pub log_guards: (WorkerGuard, Option<WorkerGuard>),
87 #[cfg(feature = "error_aggregation")]
88 pub errors_container: Arc<ErrorAggregationContainer>,
89 #[cfg(feature = "log_throttling")]
90 pub log_throttling_handle: TracingRateLimitLayer,
92}
93
94struct LoggingSubscriberParts {
96 subscriber: Box<dyn Subscriber + Send + Sync + 'static>,
97 reload_handle: LogReloadHandle,
98 log_guards: (WorkerGuard, Option<WorkerGuard>), #[cfg(feature = "error_aggregation")]
100 errors_container: Arc<ErrorAggregationContainer>,
101 #[cfg(feature = "log_throttling")]
102 log_throttling_handle: TracingRateLimitLayer,
103}
104
105fn build_logging_subscriber(config: LoggingConfig) -> eyre::Result<LoggingSubscriberParts> {
108 let global_env_filter = build_env_filter(config.level)?;
110 let (reloadable_global_filter, global_reload_handle) = reload::Layer::new(global_env_filter);
111
112 let (non_blocking_stdout, stdout_guard) = tracing_appender::non_blocking(std::io::stdout());
113
114 let stdout_layer = tracing_subscriber::fmt::layer()
115 .with_thread_names(true)
116 .with_line_number(true)
117 .with_writer(non_blocking_stdout);
118
119 #[cfg(feature = "error_aggregation")]
120 let error_aggregation_config = config.error_aggregation.clone();
121
122 #[cfg(feature = "log_throttling")]
123 let throttling_config = config.throttling_config.clone().unwrap_or_default();
124
125 #[cfg(feature = "log_throttling")]
126 let mut exemptions = vec!["tracing_throttle::infrastructure::layer".to_string()]; #[cfg(feature = "log_throttling")]
128 exemptions.extend(throttling_config.exemptions.unwrap_or_default());
129
130 #[cfg(feature = "log_throttling")]
131 let rate_limit_filter = TracingRateLimitLayer::builder()
132 .with_excluded_fields(throttling_config.excluded_fields.unwrap_or_default())
133 .with_exempt_targets(exemptions)
134 .with_active_emission(throttling_config.summary_emission_interval.is_some())
135 .with_summary_interval(
136 throttling_config
137 .summary_emission_interval
138 .unwrap_or(Duration::from_secs(5 * 60)),
139 )
140 .build()
141 .expect("Error building tracing rate limit layer");
142
143 #[cfg(feature = "log_throttling")]
144 if let Some(metrics_duration) = throttling_config.metrics_emission_interval {
145 let metrics = rate_limit_filter.metrics().clone();
146
147 tokio::spawn(async move {
149 loop {
150 tokio::time::sleep(metrics_duration).await;
151
152 let snapshot = metrics.snapshot();
153 tracing::info!(
154 events_allowed = snapshot.events_allowed,
155 events_suppressed = snapshot.events_suppressed,
156 suppression_rate = format!("{:.1}%", snapshot.suppression_rate() * 100.0),
157 "Rate limiting metrics"
158 );
159 }
160 });
161 }
162
163 #[cfg(feature = "log_throttling")]
164 let log_throttling_handle = rate_limit_filter.clone();
165
166 let (file_layer, file_guard) = match config.file_config {
167 None => (None, None),
168 Some(file_config) => {
169 let appender = RollingFileAppender::builder()
170 .rotation(file_config.rotation.unwrap_or(LogRotation::NEVER))
171 .filename_prefix(file_config.file_prefix.unwrap_or_else(|| {
172 chrono::Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true)
173 })) .filename_suffix("log")
175 .build(file_config.path)?;
176
177 let (non_blocking_appender, guard) = tracing_appender::non_blocking(appender);
178
179 let base_layer = tracing_subscriber::fmt::layer()
180 .with_thread_names(true)
181 .with_line_number(true)
182 .with_ansi(false)
183 .with_writer(non_blocking_appender);
184
185 #[cfg(feature = "log_throttling")]
186 let file_layer = base_layer.with_filter(rate_limit_filter);
187
188 #[cfg(not(feature = "log_throttling"))]
189 let file_layer = base_layer;
190
191 (Some(file_layer), Some(guard))
192 }
193 };
194
195 let reload_handle = LogReloadHandle(global_reload_handle);
196
197 #[cfg(feature = "error_aggregation")]
200 {
201 use crate::libs::log::error_aggregation::get_error_aggregation;
202
203 let (container, error_layer) = get_error_aggregation(error_aggregation_config);
204
205 let subscriber = registry()
206 .with(reloadable_global_filter) .with(stdout_layer.and_then(file_layer).and_then(error_layer));
208
209 #[cfg(feature = "log_throttling")]
210 return Ok(LoggingSubscriberParts {
211 subscriber: Box::new(subscriber),
212 reload_handle,
213 log_guards: (stdout_guard, file_guard),
214 errors_container: container,
215 log_throttling_handle,
216 });
217
218 #[cfg(not(feature = "log_throttling"))]
219 return Ok(LoggingSubscriberParts {
220 subscriber: Box::new(subscriber),
221 reload_handle,
222 log_guards: (stdout_guard, file_guard),
223 errors_container: container,
224 });
225 }
226
227 #[cfg(not(feature = "error_aggregation"))]
228 {
229 let subscriber = registry()
230 .with(reloadable_global_filter) .with(stdout_layer.and_then(file_layer));
232
233 #[cfg(feature = "log_throttling")]
234 return Ok(LoggingSubscriberParts {
235 subscriber: Box::new(subscriber),
236 reload_handle,
237 log_guards: (stdout_guard, file_guard),
238 log_throttling_handle,
239 });
240
241 #[cfg(not(feature = "log_throttling"))]
242 return Ok(LoggingSubscriberParts {
243 subscriber: Box::new(subscriber),
244 reload_handle,
245 log_guards: (stdout_guard, file_guard),
246 });
247 }
248}
249
250pub fn setup_logging(config: LoggingConfig) -> eyre::Result<LogSetupReturn> {
266 use tracing_subscriber::util::SubscriberInitExt;
267
268 let parts = build_logging_subscriber(config)?;
269
270 parts.subscriber.init();
271
272 #[cfg(feature = "error_aggregation")]
273 {
274 #[cfg(feature = "log_throttling")]
275 return Ok(LogSetupReturn {
276 reload_handle: parts.reload_handle,
277 log_guards: parts.log_guards,
278 errors_container: parts.errors_container,
279 log_throttling_handle: parts.log_throttling_handle,
280 });
281 #[cfg(not(feature = "log_throttling"))]
282 return Ok(LogSetupReturn {
283 reload_handle: parts.reload_handle,
284 log_guards: parts.log_guards,
285 errors_container: parts.errors_container,
286 });
287 }
288
289 #[cfg(not(feature = "error_aggregation"))]
290 {
291 #[cfg(feature = "log_throttling")]
292 return Ok(LogSetupReturn {
293 reload_handle: parts.reload_handle,
294 log_guards: parts.log_guards,
295 log_throttling_handle: parts.log_throttling_handle,
296 });
297 #[cfg(not(feature = "log_throttling"))]
298 return Ok(LogSetupReturn {
299 reload_handle: parts.reload_handle,
300 log_guards: parts.log_guards,
301 });
302 }
303}
304
305pub type GlobalLogReloadHandle = Handle<EnvFilter, Registry>;
307
308#[derive(Clone)]
310pub struct LogReloadHandle(GlobalLogReloadHandle);
311
312impl std::fmt::Debug for LogReloadHandle {
313 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
314 f.debug_struct("LogReloadHandle")
315 .field("inner", &"Handle<EnvFilter, Registry>")
316 .finish()
317 }
318}
319
320impl LogReloadHandle {
321 pub fn set_log_level(&self, level: LogLevel) -> eyre::Result<()> {
324 match build_env_filter(level) {
325 Ok(filter) => match self.0.modify(|env_filter| *env_filter = filter) {
326 Ok(_) => Ok(()),
327 Err(error) => {
328 tracing::error!(
329 ?error,
330 "Error setting new global log filter. Ignoring reload attempt"
331 );
332 bail!("Error setting new global log filter: {error}. Ignoring reload attempt")
333 }
334 },
335 Err(error) => {
336 tracing::error!(
337 ?error,
338 "Error building new filter from given log level. Ignoring reload attempt"
339 );
340 bail!(
341 "Error building new filter from given log level: {error}. Ignoring reload attempt"
342 )
343 }
344 }
345 }
346}
347
348pub struct CustomEyreHandler {
352 default_handler: Box<dyn EyreHandler>,
353 location: Option<&'static std::panic::Location<'static>>,
354}
355
356impl CustomEyreHandler {
357 pub fn default_with_location_saving(
358 error: &(dyn std::error::Error + 'static),
359 ) -> Box<dyn EyreHandler> {
360 Box::new(Self {
361 default_handler: DefaultHandler::default_with(error),
362 location: None,
363 })
364 }
365
366 pub fn get_location(&self) -> &Option<&'static std::panic::Location<'static>> {
367 &self.location
368 }
369}
370
371impl EyreHandler for CustomEyreHandler {
372 fn display(
373 &self,
374 error: &(dyn std::error::Error + 'static),
375 f: &mut core::fmt::Formatter<'_>,
376 ) -> core::fmt::Result {
377 self.default_handler.display(error, f)
378 }
379
380 fn debug(
381 &self,
382 error: &(dyn std::error::Error + 'static),
383 f: &mut core::fmt::Formatter<'_>,
384 ) -> core::fmt::Result {
385 self.default_handler.debug(error, f)
386 }
387
388 fn track_caller(&mut self, location: &'static std::panic::Location<'static>) {
389 self.location = Some(location); self.default_handler.track_caller(location);
391 }
392}
393
394#[cfg(test)]
395pub struct LogSetupReturnTest {
396 _guard: tracing::subscriber::DefaultGuard,
397 reload_handle: LogReloadHandle,
398 #[allow(dead_code)]
399 log_guards: (WorkerGuard, Option<WorkerGuard>),
400 #[cfg(feature = "error_aggregation")]
401 #[allow(dead_code)]
402 errors_container: Arc<ErrorAggregationContainer>,
403}
404
405#[cfg(test)]
408pub fn setup_logging_test(config: LoggingConfig) -> eyre::Result<LogSetupReturnTest> {
409 let parts = build_logging_subscriber(config)?;
410
411 let guard = tracing::subscriber::set_default(parts.subscriber);
413
414 #[cfg(feature = "error_aggregation")]
415 {
416 Ok(LogSetupReturnTest {
417 _guard: guard,
418 reload_handle: parts.reload_handle,
419 log_guards: parts.log_guards,
420 errors_container: parts.errors_container,
421 })
422 }
423
424 #[cfg(not(feature = "error_aggregation"))]
425 {
426 Ok(LogSetupReturnTest {
427 _guard: guard,
428 reload_handle: parts.reload_handle,
429 log_guards: parts.log_guards,
430 })
431 }
432}
433
434#[cfg(test)]
435mod tests {
436 use super::*;
437 use std::fs;
438 use tracing::{debug, error, info};
439
440 #[cfg(feature = "error_aggregation")]
441 fn default_error_aggregation_config() -> ErrorAggregationConfig {
442 ErrorAggregationConfig {
443 limit: 100,
444 normalize: true,
445 }
446 }
447
448 #[tokio::test]
450 async fn test_basic_logging_stdout() {
451 let config = LoggingConfig {
452 level: LogLevel::Info,
453 file_config: None,
454 #[cfg(feature = "error_aggregation")]
455 error_aggregation: default_error_aggregation_config(),
456 #[cfg(feature = "log_throttling")]
457 throttling_config: Some(LogThrottlingConfig::default()),
458 };
459
460 let _guard = setup_logging_test(config);
461 assert!(_guard.is_ok(), "Failed to setup logging");
462
463 info!("Test info message");
465 error!("Test error message");
466 }
467
468 #[tokio::test]
473 async fn test_file_logging_comprehensive() {
474 let temp_dir = tempfile::tempdir().unwrap();
475
476 let config = LoggingConfig {
477 level: LogLevel::Info,
478 file_config: Some(FileLoggingConfig {
479 path: temp_dir.path().to_path_buf(),
480 file_prefix: Some("test".to_string()),
481 rotation: None,
482 }),
483 #[cfg(feature = "error_aggregation")]
484 error_aggregation: default_error_aggregation_config(),
485 #[cfg(feature = "log_throttling")]
486 throttling_config: Some(LogThrottlingConfig::default()),
487 };
488
489 let log_setup = setup_logging_test(config).unwrap();
490
491 info!("info_message_before_reload");
493 debug!("debug_message_before_reload");
494 error!("error_message");
495
496 std::thread::sleep(std::time::Duration::from_millis(100));
497
498 let result = log_setup.reload_handle.set_log_level(LogLevel::Debug);
500 assert!(result.is_ok());
501
502 info!("info_message_after_reload");
504 debug!("debug_message_after_reload");
505
506 std::thread::sleep(std::time::Duration::from_millis(100));
507 drop(log_setup);
508
509 std::thread::sleep(std::time::Duration::from_millis(100));
510
511 let log_files: Vec<_> = fs::read_dir(temp_dir.path()).unwrap().collect();
513 assert_eq!(log_files.len(), 1, "Expected exactly one log file");
514
515 let log_file = log_files[0].as_ref().unwrap();
516 let log_contents = fs::read_to_string(log_file.path()).unwrap();
517
518 assert!(log_contents.contains("info_message_before_reload"));
520 assert!(
521 !log_contents.contains("debug_message_before_reload"),
522 "Debug should not appear before reload"
523 );
524 assert!(log_contents.contains("error_message"));
525
526 assert!(log_contents.contains("info_message_after_reload"));
528
529 assert!(
531 log_contents.contains("debug_message_after_reload"),
532 "Debug should appear after reload to DEBUG level"
533 );
534 }
535
536 #[tokio::test]
537 async fn test_build_env_filter() {
538 let filter = build_env_filter(LogLevel::Info);
539 assert!(filter.is_ok());
540
541 let filter = build_env_filter(LogLevel::Debug);
542 assert!(filter.is_ok());
543
544 let filter = build_env_filter(LogLevel::Detail);
545 assert!(filter.is_ok());
546 }
547
548 }