use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use tracing::{Level, Span, debug, error, info, trace, warn};
use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt};
use crate::{
ConnectionId,
Duration,
Instant,
Side,
frame::FrameType,
transport_parameters::TransportParameterId,
};
#[cfg(test)]
mod tests;
mod components;
mod filters;
mod formatters;
mod lifecycle;
pub mod metrics;
mod structured;
pub use components::*;
pub use filters::*;
pub use formatters::*;
pub use lifecycle::*;
pub use metrics::*;
pub use structured::*;
/// Process-wide [`Logger`] instance.
///
/// Written at most once: either by [`init_logging`] with a caller-supplied
/// configuration, or by [`logger`] with the default configuration.
static LOGGER: once_cell::sync::OnceCell<Arc<Logger>> = once_cell::sync::OnceCell::new();
/// Initializes the global [`Logger`] and installs the global `tracing` subscriber.
///
/// The logger built from `config` is stored in the process-wide cell first. The
/// subscriber is then assembled from an [`EnvFilter`] (the default environment
/// filter plus an `ant_quic=debug` directive) and a `fmt` layer that emits JSON
/// when `config.json_output` is set and plain text otherwise.
///
/// # Errors
///
/// * [`LoggingError::AlreadyInitialized`] if a logger is already stored, either
///   by an earlier call to this function or by a prior call to [`logger`].
/// * [`LoggingError::SubscriberError`] if a global `tracing` subscriber has
///   already been installed by someone else. Previously this case panicked.
///   Note that the logger stays registered in that case, so [`logger`] keeps
///   working and events flow to the subscriber that is already in place.
#[allow(clippy::expect_used)]
pub fn init_logging(config: LoggingConfig) -> Result<(), LoggingError> {
    let logger = Arc::new(Logger::new(config)?);
    LOGGER
        .set(Arc::clone(&logger))
        .map_err(|_| LoggingError::AlreadyInitialized)?;

    let env_filter = EnvFilter::from_default_env().add_directive(
        "ant_quic=debug"
            .parse()
            .expect("Static directive should always parse"),
    );

    // The two branches only differ in the formatter; the layer types differ, so
    // each branch finishes the installation itself and yields the same Result.
    let registry = tracing_subscriber::registry().with(env_filter);
    let installed = if logger.use_json() {
        registry
            .with(
                tracing_subscriber::fmt::layer()
                    .json()
                    .with_target(true)
                    .with_thread_ids(true)
                    .with_level(true),
            )
            .try_init()
    } else {
        registry
            .with(
                tracing_subscriber::fmt::layer()
                    .with_target(true)
                    .with_thread_ids(true)
                    .with_level(true),
            )
            .try_init()
    };

    // `init()` would panic here when the host application already owns the
    // global subscriber; report it through the dedicated error variant instead.
    installed.map_err(|err| LoggingError::SubscriberError(err.to_string()))?;

    info!("ant-quic logging system initialized");
    Ok(())
}
/// Returns a handle to the global [`Logger`].
///
/// If [`init_logging`] has not been called yet, a logger with
/// [`LoggingConfig::default`] is created and stored on first use.
///
/// Initialization goes through `OnceCell::get_or_init`, so concurrent first
/// callers all receive the same instance. The previous `get()` followed by
/// `set()` sequence could hand a losing thread a private logger that was never
/// registered globally.
///
/// # Panics
///
/// Panics if the default logger cannot be constructed.
#[allow(clippy::expect_used)]
pub fn logger() -> Arc<Logger> {
    let shared = LOGGER.get_or_init(|| {
        Arc::new(Logger::new(LoggingConfig::default()).expect("Failed to create default logger"))
    });
    Arc::clone(shared)
}
/// Central logging facade: rate-limits events, keeps an in-memory buffer of
/// them, forwards them to `tracing`, and feeds a metrics collector.
pub struct Logger {
// Configuration the logger was created with.
config: LoggingConfig,
// Receives every event that passes the rate limiter; queried by `metrics_summary`.
metrics_collector: Arc<MetricsCollector>,
// In-memory store of accepted events, read back by `recent_events`.
event_buffer: Arc<Mutex<Vec<LogEvent>>>,
// Decides per event whether it may be logged.
rate_limiter: Arc<RateLimiter>,
}
impl Logger {
    /// Builds a logger from `config`.
    ///
    /// The rate limiter allows `config.rate_limit_per_second` events per
    /// one-second window, and the event buffer is pre-allocated with
    /// `config.event_buffer_size` slots.
    ///
    /// # Errors
    ///
    /// Currently always returns `Ok`; the `Result` is part of the signature so
    /// construction can become fallible without breaking callers.
    pub fn new(config: LoggingConfig) -> Result<Self, LoggingError> {
        let rate_limit = config.rate_limit_per_second;
        let buffer_size = config.event_buffer_size;
        Ok(Self {
            config,
            metrics_collector: Arc::new(MetricsCollector::new()),
            event_buffer: Arc::new(Mutex::new(Vec::with_capacity(buffer_size))),
            rate_limiter: Arc::new(RateLimiter::new(rate_limit, Duration::from_secs(1))),
        })
    }

    /// Whether the `tracing` output should be formatted as JSON.
    fn use_json(&self) -> bool {
        self.config.json_output
    }

    /// Processes a single event.
    ///
    /// Events rejected by the rate limiter are dropped entirely. Accepted events
    /// are emitted through the `tracing` macro matching their level, recorded
    /// in the metrics collector when `collect_metrics` is enabled, and finally
    /// stored in the in-memory buffer.
    ///
    /// The buffer holds at most `event_buffer_size` events; once it is full,
    /// further events are still emitted and counted but are not stored. A
    /// poisoned buffer lock is tolerated the same way: the event is simply not
    /// stored.
    pub fn log_event(&self, event: LogEvent) {
        if !self.rate_limiter.should_log(event.level) {
            return;
        }

        match event.level {
            Level::ERROR => error!("{} - {}", event.target, event.message),
            Level::WARN => warn!("{} - {}", event.target, event.message),
            Level::INFO => info!("{} - {}", event.target, event.message),
            Level::DEBUG => debug!("{} - {}", event.target, event.message),
            Level::TRACE => trace!("{} - {}", event.target, event.message),
        }

        // Honor the configuration switch instead of collecting unconditionally.
        if self.config.collect_metrics {
            self.metrics_collector.record_event(&event);
        }

        // Buffering is done last so the event can be moved into the buffer
        // rather than deep-cloned on every call.
        if let Ok(mut buffer) = self.event_buffer.lock() {
            // Use the configured capacity rather than a hard-coded 10000.
            if buffer.len() < self.config.event_buffer_size {
                buffer.push(event);
            }
        }
    }

    /// Returns up to `count` buffered events, most recently stored first.
    ///
    /// Returns an empty vector if the buffer lock is poisoned.
    pub fn recent_events(&self, count: usize) -> Vec<LogEvent> {
        match self.event_buffer.lock() {
            Ok(buffer) => buffer.iter().rev().take(count).cloned().collect(),
            Err(_) => Vec::new(),
        }
    }

    /// Returns the metrics collector's current summary.
    pub fn metrics_summary(&self) -> MetricsSummary {
        self.metrics_collector.summary()
    }
}
/// User-facing configuration for the logging subsystem.
#[derive(Debug, Clone)]
pub struct LoggingConfig {
/// Emit `tracing` output as JSON instead of plain text.
pub json_output: bool,
/// Event budget handed to the rate limiter, which is created with a one-second window.
pub rate_limit_per_second: u64,
/// Per-component log levels.
// NOTE(review): presumably keyed by component name; where this map is consumed
// is not visible next to this definition — confirm.
pub component_levels: HashMap<String, Level>,
/// Toggle for metrics collection.
// NOTE(review): confirm where this flag is read.
pub collect_metrics: bool,
/// Number of slots pre-allocated for the in-memory event buffer.
pub event_buffer_size: usize,
}
impl Default for LoggingConfig {
    /// Plain-text output, 1000 events per second, no per-component levels,
    /// metrics enabled, and a 10000-slot event buffer.
    fn default() -> Self {
        let component_levels = HashMap::new();
        Self {
            event_buffer_size: 10_000,
            collect_metrics: true,
            component_levels,
            rate_limit_per_second: 1_000,
            json_output: false,
        }
    }
}
/// A single structured log record as handled by [`Logger::log_event`].
#[derive(Debug, Clone)]
pub struct LogEvent {
/// Moment the event was created.
pub timestamp: Instant,
/// Severity; selects which `tracing` macro emits the event.
pub level: Level,
/// Target string printed in front of the message.
pub target: String,
/// Human-readable message text.
pub message: String,
/// Additional key/value pairs attached to the event.
pub fields: HashMap<String, String>,
/// Optional identifier of an associated span.
pub span_id: Option<String>,
}
/// Which end of a connection the local endpoint plays.
#[derive(Debug, Clone, Copy)]
pub enum ConnectionRole {
/// The client role.
Client,
/// The server role.
Server,
}
/// Identifying details of a connection, for use in log output.
#[derive(Debug, Clone)]
pub struct ConnectionInfo {
/// Connection identifier.
pub id: ConnectionId,
/// Socket address of the remote peer.
pub remote_addr: SocketAddr,
/// Role associated with this connection.
pub role: ConnectionRole,
}
/// Details about a single frame, for use in log output.
#[derive(Debug)]
pub struct FrameInfo {
/// Type of the frame.
pub frame_type: FrameType,
/// Size of the frame (presumably in bytes — confirm with the code that fills this in).
pub size: usize,
/// Packet number associated with the frame, when one is known.
pub packet_number: Option<u64>,
}
/// Details about a single transport parameter, for use in log output.
#[derive(Debug)]
pub struct TransportParamInfo {
// Identifier of the parameter; only visible inside this crate.
pub(crate) param_id: TransportParameterId,
/// Raw value bytes, if any (presumably the encoded parameter value — confirm).
pub value: Option<Vec<u8>>,
/// Side the parameter is associated with.
pub side: Side,
}
/// Details about a NAT traversal attempt, for use in log output.
#[derive(Debug)]
pub struct NatTraversalInfo {
/// Socket address of the remote peer.
pub remote_addr: SocketAddr,
/// Number of candidates (presumably candidate addresses under consideration — confirm).
pub candidate_count: usize,
}
/// Context attached to an error-level event by [`log_error`].
#[derive(Debug, Default)]
pub struct ErrorContext {
/// Component name; stored as the `component` field and appended to `ant_quic::` to form the target.
pub component: &'static str,
/// Operation name; stored as the `operation` field.
pub operation: &'static str,
/// Connection the error relates to; stored Debug-formatted as the `conn_id` field when present.
pub connection_id: Option<ConnectionId>,
/// Extra key/value pairs; inserted last, so a duplicate key replaces an earlier field.
pub additional_fields: Vec<(&'static str, &'static str)>,
}
/// Context attached to a warning-level event by [`log_warning`].
#[derive(Debug, Default)]
pub struct WarningContext {
/// Component name; stored as the `component` field and appended to `ant_quic::` to form the target.
pub component: &'static str,
/// Extra key/value pairs; inserted after `component`, so a duplicate key replaces it.
pub details: Vec<(&'static str, &'static str)>,
}
/// Context attached to an info-level event by [`log_info`].
#[derive(Debug, Default)]
pub struct InfoContext {
/// Component name; stored as the `component` field and appended to `ant_quic::` to form the target.
pub component: &'static str,
/// Extra key/value pairs; inserted after `component`, so a duplicate key replaces it.
pub details: Vec<(&'static str, &'static str)>,
}
/// Context attached to a debug-level event by [`log_debug`].
#[derive(Debug, Default)]
pub struct DebugContext {
/// Component name; stored as the `component` field and appended to `ant_quic::` to form the target.
pub component: &'static str,
/// Extra key/value pairs; inserted after `component`, so a duplicate key replaces it.
pub details: Vec<(&'static str, &'static str)>,
}
/// Context attached to a trace-level event by [`log_trace`].
#[derive(Debug, Default)]
pub struct TraceContext {
/// Component name; stored as the `component` field and appended to `ant_quic::` to form the target.
pub component: &'static str,
/// Extra key/value pairs; inserted after `component`, so a duplicate key replaces it.
pub details: Vec<(&'static str, &'static str)>,
}
/// Errors reported by the logging subsystem.
#[derive(Debug, thiserror::Error)]
pub enum LoggingError {
/// A global logger was already stored when [`init_logging`] tried to register one.
#[error("Logging system already initialized")]
AlreadyInitialized,
/// The `tracing` subscriber could not be initialized; carries a description of the failure.
#[error("Failed to initialize tracing subscriber: {0}")]
SubscriberError(String),
}
/// Fixed-window limiter that caps how many non-error events are logged per window.
pub struct RateLimiter {
// Number of events admitted per window.
max_events: u64,
// Length of one window.
window: Duration,
// Events counted since `window_start`, including the ones that were rejected.
events_in_window: AtomicU64,
// Start of the current window; its lock is held while the window is checked and the counter updated.
window_start: Mutex<Instant>,
}
impl RateLimiter {
    /// Creates a limiter admitting at most `max_events` non-error events per
    /// `window`. The first window starts at construction time.
    pub fn new(max_events: u64, window: Duration) -> Self {
        Self {
            max_events,
            window,
            events_in_window: AtomicU64::new(0),
            window_start: Mutex::new(Instant::now()),
        }
    }

    /// Decides whether an event of the given `level` may be logged.
    ///
    /// `ERROR` events are always admitted and do not count against the budget.
    /// For every other level the call is counted, and `true` is returned only
    /// while fewer than `max_events` calls have been counted in the current
    /// window. A new window starts once more than `window` has elapsed since
    /// the current window began.
    ///
    /// This method does not panic on a poisoned lock: the guarded value is a
    /// plain timestamp that cannot be left half-updated, so the guard is
    /// recovered and rate limiting continues. A logging helper taking the whole
    /// process down because some unrelated thread panicked would be worse than
    /// the condition it reports.
    pub fn should_log(&self, level: Level) -> bool {
        if level == Level::ERROR {
            return true;
        }

        let now = Instant::now();
        let mut window_start = self
            .window_start
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());

        if now.duration_since(*window_start) > self.window {
            *window_start = now;
            self.events_in_window.store(0, Ordering::Relaxed);
        }

        // The lock is still held here, so the reset above and this increment
        // cannot interleave with another caller's window check.
        let seen_before = self.events_in_window.fetch_add(1, Ordering::Relaxed);
        seen_before < self.max_events
    }
}
pub fn log_error(message: &str, context: ErrorContext) {
let mut fields = HashMap::new();
fields.insert("component".to_string(), context.component.to_string());
fields.insert("operation".to_string(), context.operation.to_string());
if let Some(conn_id) = context.connection_id {
fields.insert("conn_id".to_string(), format!("{conn_id:?}"));
}
for (key, value) in context.additional_fields {
fields.insert(key.to_string(), value.to_string());
}
logger().log_event(LogEvent {
timestamp: Instant::now(),
level: Level::ERROR,
target: format!("ant_quic::{}", context.component),
message: message.to_string(),
fields,
span_id: None,
});
}
pub fn log_warning(message: &str, context: WarningContext) {
let mut fields = HashMap::new();
fields.insert("component".to_string(), context.component.to_string());
for (key, value) in context.details {
fields.insert(key.to_string(), value.to_string());
}
logger().log_event(LogEvent {
timestamp: Instant::now(),
level: Level::WARN,
target: format!("ant_quic::{}", context.component),
message: message.to_string(),
fields,
span_id: None,
});
}
pub fn log_info(message: &str, context: InfoContext) {
let mut fields = HashMap::new();
fields.insert("component".to_string(), context.component.to_string());
for (key, value) in context.details {
fields.insert(key.to_string(), value.to_string());
}
logger().log_event(LogEvent {
timestamp: Instant::now(),
level: Level::INFO,
target: format!("ant_quic::{}", context.component),
message: message.to_string(),
fields,
span_id: None,
});
}
pub fn log_debug(message: &str, context: DebugContext) {
let mut fields = HashMap::new();
fields.insert("component".to_string(), context.component.to_string());
for (key, value) in context.details {
fields.insert(key.to_string(), value.to_string());
}
logger().log_event(LogEvent {
timestamp: Instant::now(),
level: Level::DEBUG,
target: format!("ant_quic::{}", context.component),
message: message.to_string(),
fields,
span_id: None,
});
}
pub fn log_trace(message: &str, context: TraceContext) {
let mut fields = HashMap::new();
fields.insert("component".to_string(), context.component.to_string());
for (key, value) in context.details {
fields.insert(key.to_string(), value.to_string());
}
logger().log_event(LogEvent {
timestamp: Instant::now(),
level: Level::TRACE,
target: format!("ant_quic::{}", context.component),
message: message.to_string(),
fields,
span_id: None,
});
}
/// Creates a `DEBUG`-level span named `connection` whose `conn_id` field holds
/// the Debug rendering of `conn_id`.
pub fn create_connection_span(conn_id: &ConnectionId) -> Span {
    tracing::debug_span!("connection", conn_id = %format!("{:?}", conn_id))
}
/// Creates a `TRACE`-level span named `frame` whose `frame_type` field records
/// `frame_type` via its Debug implementation.
pub fn create_frame_span(frame_type: FrameType) -> Span {
    tracing::trace_span!("frame", frame_type = ?frame_type)
}