impulse_server_kit/setup/
tracing_init.rs

1//! Tracing initializer module.
2
3use impulse_utils::prelude::*;
4use serde::Deserialize;
5use tracing_appender::non_blocking::WorkerGuard as TracingFileGuard;
6use tracing_rfc_5424::transport::{
7  Error as SyslogError, TcpTransport, Transport, UdpTransport, UnixSocket, UnixSocketStream,
8};
9
10#[derive(Clone, Deserialize, Default)]
11/// Tracing options.
12pub struct TracingOptions {
13  /// Enable I/O logs.
14  pub enable_io_logs: Option<bool>,
15  /// Log level; for no logging delete the line in YAML completely.
16  pub io_log_level: Option<String>,
17
18  /// Enable file logs.
19  pub enable_file_logs: Option<bool>,
20  /// File's log level. Defaults to `log_level`.
21  pub file_log_level: Option<String>,
22  /// File rolling rotation, if you have a ton of logs and need to split them.
23  pub file_log_rotation: Option<String>,
24  /// Files limitation for autoremove.
25  pub file_log_max_rolling_files: Option<u32>,
26
27  /// Enable RFC 5424 logging.
28  pub enable_syslog_logs: Option<bool>,
29  /// Address to send logs via TCP/UDP or into UNIX sockets.
30  pub syslog_addr: Option<String>,
31  /// Syslog's log level. Defaults to `log_level`.
32  pub syslog_log_level: Option<String>,
33
34  /// Enable Elastic Common Schema structured logging.
35  pub enable_ecs_logs: Option<bool>,
36  /// ECS log level. Defaults to `log_level`.
37  pub ecs_log_level: Option<String>,
38  /// ECS rolling rotation, if you have a ton of ECS logs and need to split them.
39  pub ecs_rotation: Option<String>,
40  /// ECS files limitation for autoremove.
41  pub ecs_max_rolling_files: Option<u32>,
42
43  #[cfg(feature = "otel")]
44  /// Endpoint to export OpenTelemetry via gRPC (e.g., Jaeger).
45  pub otel_grpc_endpoint: Option<String>,
46  #[cfg(feature = "otel")]
47  /// Endpoint to export OpenTelemetry via HTTP binary protocol (e.g., Prometheus).
48  pub otel_http_endpoint: Option<String>,
49  #[cfg(feature = "otel")]
50  /// OpenTelemetry log level. Defaults to `log_level`.
51  pub otel_log_level: Option<String>,
52}
53
54#[derive(Default)]
55#[allow(dead_code)]
56/// Tracing guards.
57///
58/// Holds the file guards to write logs into them.
59pub struct TracingGuards {
60  file_log_guard: Option<TracingFileGuard>,
61  ecs_log_guard: Option<TracingFileGuard>,
62}
63
64fn match_log_level(log_level: &Option<String>) -> MResult<tracing::Level> {
65  if log_level.is_some() {
66    Ok(match log_level.as_ref().unwrap().as_str() {
67      "error" => tracing::Level::ERROR,
68      "warn" => tracing::Level::WARN,
69      "info" => tracing::Level::INFO,
70      "debug" => tracing::Level::DEBUG,
71      "trace" => tracing::Level::TRACE,
72      _ => ServerError::from_public("Incorrect logging level.").with_500().bail()?,
73    })
74  } else if cfg!(debug_assertions) {
75    Ok(tracing::Level::DEBUG)
76  } else {
77    ServerError::from_public("Logging is disabled").with_500().bail()
78  }
79}
80
81fn match_log_file_rolling(log_rolling: &Option<String>) -> MResult<tracing_appender::rolling::Rotation> {
82  if let Some(log_rolling) = log_rolling {
83    Ok(match log_rolling.as_str() {
84      "never" => tracing_appender::rolling::Rotation::NEVER,
85      "daily" => tracing_appender::rolling::Rotation::DAILY,
86      "hourly" => tracing_appender::rolling::Rotation::HOURLY,
87      "minutely" => tracing_appender::rolling::Rotation::MINUTELY,
88      _ => ServerError::from_public(
89        "Incorrect level of log rotation. Choose one of the options: `never`, `daily`, `hourly`, `minutely`.",
90      )
91      .with_500()
92      .bail()?,
93    })
94  } else {
95    Ok(tracing_appender::rolling::Rotation::NEVER)
96  }
97}
98
99enum SyslogTransportWrapper {
100  Udp(UdpTransport),
101  Tcp(TcpTransport),
102  Unix(UnixSocket),
103  UnixStream(UnixSocketStream),
104}
105
106impl<F: tracing_rfc_5424::formatter::SyslogFormatter> Transport<F> for SyslogTransportWrapper {
107  type Error = SyslogError;
108
109  fn send(&self, buf: F::Output) -> Result<(), Self::Error> {
110    match self {
111      SyslogTransportWrapper::Udp(t) => {
112        <tracing_rfc_5424::transport::UdpTransport as tracing_rfc_5424::transport::Transport<F>>::send(t, buf)
113      }
114      SyslogTransportWrapper::Tcp(t) => {
115        <tracing_rfc_5424::transport::TcpTransport as tracing_rfc_5424::transport::Transport<F>>::send(t, buf)
116      }
117      SyslogTransportWrapper::Unix(t) => {
118        <tracing_rfc_5424::transport::UnixSocket as tracing_rfc_5424::transport::Transport<F>>::send(t, buf)
119      }
120      SyslogTransportWrapper::UnixStream(t) => {
121        <tracing_rfc_5424::transport::UnixSocketStream as tracing_rfc_5424::transport::Transport<F>>::send(t, buf)
122      }
123    }
124  }
125}
126
127fn match_syslog_addr(addr: &Option<String>) -> MResult<SyslogTransportWrapper> {
128  use tracing_rfc_5424::transport::*;
129
130  let addr = addr
131    .as_ref()
132    .ok_or(ServerError::from_public("Syslog export address is empty!").with_500())?;
133
134  match addr.as_str() {
135    s if s.starts_with("udp://") => UdpTransport::new(&s[6..])
136      .map(SyslogTransportWrapper::Udp)
137      .map_err(ServerError::from_private),
138    s if s.starts_with("tcp://") => TcpTransport::new(&s[6..])
139      .map(SyslogTransportWrapper::Tcp)
140      .map_err(ServerError::from_private),
141    s if s.starts_with("unix://") => UnixSocket::new(&s[7..])
142      .map(SyslogTransportWrapper::Unix)
143      .map_err(ServerError::from_private),
144    s if s.starts_with("ustream://") => UnixSocketStream::new(&s[10..])
145      .map(SyslogTransportWrapper::UnixStream)
146      .map_err(ServerError::from_private),
147    _ => Err(ServerError::from_public(
148      "Can't init syslog because of incorrect address; your address should start with `udp://`, `tcp://`, `unix://` or `ustream://`.",
149    )),
150  }
151}
152
153#[allow(dead_code)]
154fn log_filter(metadata: &tracing::Metadata) -> bool {
155  #[cfg(not(feature = "log-without-filtering"))]
156  {
157    metadata.module_path().is_none_or(|p| {
158      !(p.contains("salvo")
159        || p.contains("hyper_util")
160        || p.contains("tower")
161        || p.contains("quinn")
162        || p.contains("h2"))
163    })
164  }
165  #[cfg(feature = "log-without-filtering")]
166  {
167    true
168  }
169}
170
171impl TracingOptions {
172  /// Inits logging application-wide.
173  pub fn init(&self, app_name: &str) -> MResult<TracingGuards> {
174    use tracing_appender::rolling;
175    #[allow(unused_imports)]
176    use tracing_subscriber::filter::{LevelFilter, filter_fn};
177    use tracing_subscriber::fmt::format::FmtSpan;
178    use tracing_subscriber::prelude::*;
179    use tracing_subscriber::{fmt, registry};
180
181    #[cfg(feature = "otel")]
182    use crate::otel::api::trace::TracerProvider;
183    #[cfg(feature = "otel")]
184    use crate::otel::exporter::WithExportConfig;
185    #[cfg(feature = "otel")]
186    use crate::otel::sdk::{Resource, trace::RandomIdGenerator};
187
188    let format = fmt::format()
189      .with_level(true)
190      .with_target(true)
191      .with_thread_ids(false)
192      .with_thread_names(false)
193      .with_file(false)
194      .with_line_number(true)
195      .compact();
196
197    let io_tracer = if self.enable_io_logs.is_some_and(|v| v) {
198      let io_log_level = match_log_level(&self.io_log_level)?;
199
200      let io_tracer = fmt::layer()
201        .event_format(format.clone())
202        .with_writer(std::io::stdout)
203        .with_span_events(FmtSpan::CLOSE)
204        .with_filter(LevelFilter::from_level(io_log_level))
205        .with_filter(filter_fn(log_filter));
206      Some(io_tracer)
207    } else {
208      None
209    };
210
211    let (file_tracer, file_log_guard) = if self.enable_file_logs.is_some_and(|v| v) {
212      let file_log_level = match_log_level(&self.file_log_level).or_else(|_| match_log_level(&self.io_log_level))?;
213      let file_log_rotation = match_log_file_rolling(&self.file_log_rotation)?;
214      let file_log_max_rolling_files = self.file_log_max_rolling_files.unwrap_or(5) as usize;
215
216      let file_appender = rolling::RollingFileAppender::builder()
217        .rotation(file_log_rotation)
218        .filename_suffix(app_name)
219        .max_log_files(file_log_max_rolling_files)
220        .build("logs")
221        .map_err(|e| {
222          ServerError::from_private(e)
223            .with_public("Failed to initialize logging to file!")
224            .with_500()
225        })?;
226      let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
227
228      let file_tracer = fmt::layer()
229        .event_format(format)
230        .with_writer(non_blocking)
231        .with_ansi(false)
232        .with_span_events(FmtSpan::CLOSE)
233        .with_filter(LevelFilter::from_level(file_log_level))
234        .with_filter(filter_fn(log_filter));
235
236      (Some(file_tracer), Some(guard))
237    } else {
238      (None, None)
239    };
240
241    let syslog_tracer = if self.enable_syslog_logs.is_some_and(|v| v) {
242      let syslog_log_level =
243        match_log_level(&self.syslog_log_level).or_else(|_| match_log_level(&self.io_log_level))?;
244      let transport = match_syslog_addr(&self.syslog_addr)?;
245
246      let format = tracing_rfc_5424::rfc5424::Rfc5424::builder()
247        .appname_as_string(app_name.to_string())
248        .map_err(ServerError::from_private)?
249        .facility(tracing_rfc_5424::facility::Facility::LOG_USER)
250        .build();
251      let syslog_tracer = tracing_rfc_5424::layer::Layer::with_transport_and_syslog_formatter(transport, format)
252        .with_filter(LevelFilter::from_level(syslog_log_level))
253        .with_filter(filter_fn(log_filter));
254
255      Some(syslog_tracer)
256    } else {
257      None
258    };
259
260    let (ecs_tracer, ecs_log_guard) = if self.enable_ecs_logs.is_some_and(|v| v) {
261      let ecs_log_level = match_log_level(&self.ecs_log_level).or_else(|_| match_log_level(&self.io_log_level))?;
262      let ecs_log_rotation = match_log_file_rolling(&self.ecs_rotation)?;
263      let ecs_log_max_rolling_files = self.ecs_max_rolling_files.unwrap_or(5) as usize;
264
265      let file_appender = rolling::RollingFileAppender::builder()
266        .rotation(ecs_log_rotation)
267        .filename_suffix(app_name)
268        .max_log_files(ecs_log_max_rolling_files)
269        .build("ecs-logs")
270        .map_err(|e| {
271          ServerError::from_private(e)
272            .with_public("Failed to initialize ECS logging to file!")
273            .with_500()
274        })?;
275      let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
276
277      let ecs_tracer = tracing_ecs::ECSLayerBuilder::default()
278        .normalize_json(false)
279        .with_span_events(FmtSpan::CLOSE)
280        .build_with_writer(non_blocking)
281        .with_filter(LevelFilter::from_level(ecs_log_level))
282        .with_filter(filter_fn(log_filter));
283
284      (Some(ecs_tracer), Some(guard))
285    } else {
286      (None, None)
287    };
288
289    #[cfg(feature = "otel")]
290    let otel_tracer = if let Some(otel_grpc_endpoint) = &self.otel_grpc_endpoint {
291      let otel_log_level = match_log_level(&self.otel_log_level).or_else(|_| match_log_level(&self.io_log_level))?;
292
293      let otel_span_exporter = opentelemetry_otlp::SpanExporter::builder()
294        .with_tonic()
295        .with_protocol(opentelemetry_otlp::Protocol::Grpc)
296        .with_endpoint(otel_grpc_endpoint.as_str())
297        .with_timeout(std::time::Duration::from_secs(5))
298        .build()
299        .map_err(|e| {
300          ServerError::from_private(e)
301            .with_public("Failed to initialize OTEL gRPC telemetry!")
302            .with_500()
303        })?;
304      let otel_tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
305        .with_batch_exporter(otel_span_exporter)
306        .with_id_generator(RandomIdGenerator::default())
307        .with_max_events_per_span(32)
308        .with_max_attributes_per_span(64)
309        .with_resource(Resource::builder().with_service_name(app_name.to_string()).build())
310        .build()
311        .tracer(app_name.to_owned());
312
313      let opentelemetry = tracing_opentelemetry::layer()
314        .with_tracer(otel_tracer_provider)
315        .with_filter(LevelFilter::from_level(otel_log_level))
316        .with_filter(filter_fn(log_filter));
317
318      Some(opentelemetry)
319    } else {
320      None
321    };
322
323    #[cfg(feature = "otel")]
324    if let Some(otel_http_endpoint) = &self.otel_http_endpoint {
325      use opentelemetry_otlp::WithHttpConfig;
326
327      let otel_metric_exporter = opentelemetry_otlp::MetricExporter::builder()
328        .with_http()
329        .with_http_client(reqwest::blocking::Client::new())
330        .with_protocol(opentelemetry_otlp::Protocol::HttpBinary)
331        .with_endpoint(otel_http_endpoint.as_str())
332        .with_timeout(std::time::Duration::from_secs(5))
333        .build()
334        .map_err(|e| {
335          ServerError::from_private(e)
336            .with_public("Failed to initialize OTEL HTTP telemetry!")
337            .with_500()
338        })?;
339      let otel_metric_reader = opentelemetry_sdk::metrics::PeriodicReader::builder(otel_metric_exporter)
340        .with_interval(std::time::Duration::from_secs(5))
341        .build();
342      let meter_provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
343        .with_resource(Resource::builder().with_service_name(app_name.to_string()).build())
344        .with_reader(otel_metric_reader)
345        .build();
346      opentelemetry::global::set_meter_provider(meter_provider.clone());
347    }
348
349    let collector = registry()
350      .with(file_tracer)
351      .with(io_tracer)
352      .with(syslog_tracer)
353      .with(ecs_tracer);
354    #[cfg(feature = "otel")]
355    let collector = collector.with(otel_tracer);
356
357    tracing::subscriber::set_global_default(collector).map_err(|e| {
358      ServerError::from_private(e)
359        .with_public("Can't init global default log collector!")
360        .with_500()
361    })?;
362
363    Ok(TracingGuards {
364      file_log_guard,
365      ecs_log_guard,
366    })
367  }
368}