impulse_server_kit/setup/
tracing_init.rs1use impulse_utils::prelude::*;
4use serde::Deserialize;
5use tracing_appender::non_blocking::WorkerGuard as TracingFileGuard;
6use tracing_rfc_5424::transport::{
7 Error as SyslogError, TcpTransport, Transport, UdpTransport, UnixSocket, UnixSocketStream,
8};
9
10#[derive(Clone, Deserialize, Default)]
11pub struct TracingOptions {
13 pub enable_io_logs: Option<bool>,
15 pub io_log_level: Option<String>,
17
18 pub enable_file_logs: Option<bool>,
20 pub file_log_level: Option<String>,
22 pub file_log_rotation: Option<String>,
24 pub file_log_max_rolling_files: Option<u32>,
26
27 pub enable_syslog_logs: Option<bool>,
29 pub syslog_addr: Option<String>,
31 pub syslog_log_level: Option<String>,
33
34 pub enable_ecs_logs: Option<bool>,
36 pub ecs_log_level: Option<String>,
38 pub ecs_rotation: Option<String>,
40 pub ecs_max_rolling_files: Option<u32>,
42
43 #[cfg(feature = "otel")]
44 pub otel_grpc_endpoint: Option<String>,
46 #[cfg(feature = "otel")]
47 pub otel_http_endpoint: Option<String>,
49 #[cfg(feature = "otel")]
50 pub otel_log_level: Option<String>,
52}
53
/// Worker guards of the non-blocking file writers created by `TracingOptions::init`.
///
/// Keep this value alive for as long as logging is needed: per the
/// `tracing_appender` documentation, dropping a `WorkerGuard` flushes and stops the
/// background writer it belongs to.
#[derive(Default)]
// The fields are only held, never read (at least within this file), hence the lint opt-out.
#[allow(dead_code)]
pub struct TracingGuards {
    /// Guard of the plain-text file sink; `None` when that sink is disabled.
    file_log_guard: Option<TracingFileGuard>,
    /// Guard of the ECS file sink; `None` when that sink is disabled.
    ecs_log_guard: Option<TracingFileGuard>,
}
63
64fn match_log_level(log_level: &Option<String>) -> MResult<tracing::Level> {
65 if log_level.is_some() {
66 Ok(match log_level.as_ref().unwrap().as_str() {
67 "error" => tracing::Level::ERROR,
68 "warn" => tracing::Level::WARN,
69 "info" => tracing::Level::INFO,
70 "debug" => tracing::Level::DEBUG,
71 "trace" => tracing::Level::TRACE,
72 _ => ServerError::from_public("Incorrect logging level.").with_500().bail()?,
73 })
74 } else if cfg!(debug_assertions) {
75 Ok(tracing::Level::DEBUG)
76 } else {
77 ServerError::from_public("Logging is disabled").with_500().bail()
78 }
79}
80
81fn match_log_file_rolling(log_rolling: &Option<String>) -> MResult<tracing_appender::rolling::Rotation> {
82 if let Some(log_rolling) = log_rolling {
83 Ok(match log_rolling.as_str() {
84 "never" => tracing_appender::rolling::Rotation::NEVER,
85 "daily" => tracing_appender::rolling::Rotation::DAILY,
86 "hourly" => tracing_appender::rolling::Rotation::HOURLY,
87 "minutely" => tracing_appender::rolling::Rotation::MINUTELY,
88 _ => ServerError::from_public(
89 "Incorrect level of log rotation. Choose one of the options: `never`, `daily`, `hourly`, `minutely`.",
90 )
91 .with_500()
92 .bail()?,
93 })
94 } else {
95 Ok(tracing_appender::rolling::Rotation::NEVER)
96 }
97}
98
/// One concrete type covering every syslog transport selectable through
/// `syslog_addr`, so the syslog layer can be built without knowing the transport
/// kind at compile time.
enum SyslogTransportWrapper {
    /// Selected by the `udp://` address scheme.
    Udp(UdpTransport),
    /// Selected by the `tcp://` address scheme.
    Tcp(TcpTransport),
    /// Selected by the `unix://` address scheme.
    Unix(UnixSocket),
    /// Selected by the `ustream://` address scheme.
    UnixStream(UnixSocketStream),
}
105
106impl<F: tracing_rfc_5424::formatter::SyslogFormatter> Transport<F> for SyslogTransportWrapper {
107 type Error = SyslogError;
108
109 fn send(&self, buf: F::Output) -> Result<(), Self::Error> {
110 match self {
111 SyslogTransportWrapper::Udp(t) => {
112 <tracing_rfc_5424::transport::UdpTransport as tracing_rfc_5424::transport::Transport<F>>::send(t, buf)
113 }
114 SyslogTransportWrapper::Tcp(t) => {
115 <tracing_rfc_5424::transport::TcpTransport as tracing_rfc_5424::transport::Transport<F>>::send(t, buf)
116 }
117 SyslogTransportWrapper::Unix(t) => {
118 <tracing_rfc_5424::transport::UnixSocket as tracing_rfc_5424::transport::Transport<F>>::send(t, buf)
119 }
120 SyslogTransportWrapper::UnixStream(t) => {
121 <tracing_rfc_5424::transport::UnixSocketStream as tracing_rfc_5424::transport::Transport<F>>::send(t, buf)
122 }
123 }
124 }
125}
126
127fn match_syslog_addr(addr: &Option<String>) -> MResult<SyslogTransportWrapper> {
128 use tracing_rfc_5424::transport::*;
129
130 let addr = addr
131 .as_ref()
132 .ok_or(ServerError::from_public("Syslog export address is empty!").with_500())?;
133
134 match addr.as_str() {
135 s if s.starts_with("udp://") => UdpTransport::new(&s[6..])
136 .map(SyslogTransportWrapper::Udp)
137 .map_err(ServerError::from_private),
138 s if s.starts_with("tcp://") => TcpTransport::new(&s[6..])
139 .map(SyslogTransportWrapper::Tcp)
140 .map_err(ServerError::from_private),
141 s if s.starts_with("unix://") => UnixSocket::new(&s[7..])
142 .map(SyslogTransportWrapper::Unix)
143 .map_err(ServerError::from_private),
144 s if s.starts_with("ustream://") => UnixSocketStream::new(&s[10..])
145 .map(SyslogTransportWrapper::UnixStream)
146 .map_err(ServerError::from_private),
147 _ => Err(ServerError::from_public(
148 "Can't init syslog because of incorrect address; your address should start with `udp://`, `tcp://`, `unix://` or `ustream://`.",
149 )),
150 }
151}
152
153#[allow(dead_code)]
154fn log_filter(metadata: &tracing::Metadata) -> bool {
155 #[cfg(not(feature = "log-without-filtering"))]
156 {
157 metadata.module_path().is_none_or(|p| {
158 !(p.contains("salvo")
159 || p.contains("hyper_util")
160 || p.contains("tower")
161 || p.contains("quinn")
162 || p.contains("h2"))
163 })
164 }
165 #[cfg(feature = "log-without-filtering")]
166 {
167 true
168 }
169}
170
171impl TracingOptions {
172 pub fn init(&self, app_name: &str) -> MResult<TracingGuards> {
174 use tracing_appender::rolling;
175 #[allow(unused_imports)]
176 use tracing_subscriber::filter::{LevelFilter, filter_fn};
177 use tracing_subscriber::fmt::format::FmtSpan;
178 use tracing_subscriber::prelude::*;
179 use tracing_subscriber::{fmt, registry};
180
181 #[cfg(feature = "otel")]
182 use crate::otel::api::trace::TracerProvider;
183 #[cfg(feature = "otel")]
184 use crate::otel::exporter::WithExportConfig;
185 #[cfg(feature = "otel")]
186 use crate::otel::sdk::{Resource, trace::RandomIdGenerator};
187
188 let format = fmt::format()
189 .with_level(true)
190 .with_target(true)
191 .with_thread_ids(false)
192 .with_thread_names(false)
193 .with_file(false)
194 .with_line_number(true)
195 .compact();
196
197 let io_tracer = if self.enable_io_logs.is_some_and(|v| v) {
198 let io_log_level = match_log_level(&self.io_log_level)?;
199
200 let io_tracer = fmt::layer()
201 .event_format(format.clone())
202 .with_writer(std::io::stdout)
203 .with_span_events(FmtSpan::CLOSE)
204 .with_filter(LevelFilter::from_level(io_log_level))
205 .with_filter(filter_fn(log_filter));
206 Some(io_tracer)
207 } else {
208 None
209 };
210
211 let (file_tracer, file_log_guard) = if self.enable_file_logs.is_some_and(|v| v) {
212 let file_log_level = match_log_level(&self.file_log_level).or_else(|_| match_log_level(&self.io_log_level))?;
213 let file_log_rotation = match_log_file_rolling(&self.file_log_rotation)?;
214 let file_log_max_rolling_files = self.file_log_max_rolling_files.unwrap_or(5) as usize;
215
216 let file_appender = rolling::RollingFileAppender::builder()
217 .rotation(file_log_rotation)
218 .filename_suffix(app_name)
219 .max_log_files(file_log_max_rolling_files)
220 .build("logs")
221 .map_err(|e| {
222 ServerError::from_private(e)
223 .with_public("Failed to initialize logging to file!")
224 .with_500()
225 })?;
226 let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
227
228 let file_tracer = fmt::layer()
229 .event_format(format)
230 .with_writer(non_blocking)
231 .with_ansi(false)
232 .with_span_events(FmtSpan::CLOSE)
233 .with_filter(LevelFilter::from_level(file_log_level))
234 .with_filter(filter_fn(log_filter));
235
236 (Some(file_tracer), Some(guard))
237 } else {
238 (None, None)
239 };
240
241 let syslog_tracer = if self.enable_syslog_logs.is_some_and(|v| v) {
242 let syslog_log_level =
243 match_log_level(&self.syslog_log_level).or_else(|_| match_log_level(&self.io_log_level))?;
244 let transport = match_syslog_addr(&self.syslog_addr)?;
245
246 let format = tracing_rfc_5424::rfc5424::Rfc5424::builder()
247 .appname_as_string(app_name.to_string())
248 .map_err(ServerError::from_private)?
249 .facility(tracing_rfc_5424::facility::Facility::LOG_USER)
250 .build();
251 let syslog_tracer = tracing_rfc_5424::layer::Layer::with_transport_and_syslog_formatter(transport, format)
252 .with_filter(LevelFilter::from_level(syslog_log_level))
253 .with_filter(filter_fn(log_filter));
254
255 Some(syslog_tracer)
256 } else {
257 None
258 };
259
260 let (ecs_tracer, ecs_log_guard) = if self.enable_ecs_logs.is_some_and(|v| v) {
261 let ecs_log_level = match_log_level(&self.ecs_log_level).or_else(|_| match_log_level(&self.io_log_level))?;
262 let ecs_log_rotation = match_log_file_rolling(&self.ecs_rotation)?;
263 let ecs_log_max_rolling_files = self.ecs_max_rolling_files.unwrap_or(5) as usize;
264
265 let file_appender = rolling::RollingFileAppender::builder()
266 .rotation(ecs_log_rotation)
267 .filename_suffix(app_name)
268 .max_log_files(ecs_log_max_rolling_files)
269 .build("ecs-logs")
270 .map_err(|e| {
271 ServerError::from_private(e)
272 .with_public("Failed to initialize ECS logging to file!")
273 .with_500()
274 })?;
275 let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
276
277 let ecs_tracer = tracing_ecs::ECSLayerBuilder::default()
278 .normalize_json(false)
279 .with_span_events(FmtSpan::CLOSE)
280 .build_with_writer(non_blocking)
281 .with_filter(LevelFilter::from_level(ecs_log_level))
282 .with_filter(filter_fn(log_filter));
283
284 (Some(ecs_tracer), Some(guard))
285 } else {
286 (None, None)
287 };
288
289 #[cfg(feature = "otel")]
290 let otel_tracer = if let Some(otel_grpc_endpoint) = &self.otel_grpc_endpoint {
291 let otel_log_level = match_log_level(&self.otel_log_level).or_else(|_| match_log_level(&self.io_log_level))?;
292
293 let otel_span_exporter = opentelemetry_otlp::SpanExporter::builder()
294 .with_tonic()
295 .with_protocol(opentelemetry_otlp::Protocol::Grpc)
296 .with_endpoint(otel_grpc_endpoint.as_str())
297 .with_timeout(std::time::Duration::from_secs(5))
298 .build()
299 .map_err(|e| {
300 ServerError::from_private(e)
301 .with_public("Failed to initialize OTEL gRPC telemetry!")
302 .with_500()
303 })?;
304 let otel_tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
305 .with_batch_exporter(otel_span_exporter)
306 .with_id_generator(RandomIdGenerator::default())
307 .with_max_events_per_span(32)
308 .with_max_attributes_per_span(64)
309 .with_resource(Resource::builder().with_service_name(app_name.to_string()).build())
310 .build()
311 .tracer(app_name.to_owned());
312
313 let opentelemetry = tracing_opentelemetry::layer()
314 .with_tracer(otel_tracer_provider)
315 .with_filter(LevelFilter::from_level(otel_log_level))
316 .with_filter(filter_fn(log_filter));
317
318 Some(opentelemetry)
319 } else {
320 None
321 };
322
323 #[cfg(feature = "otel")]
324 if let Some(otel_http_endpoint) = &self.otel_http_endpoint {
325 use opentelemetry_otlp::WithHttpConfig;
326
327 let otel_metric_exporter = opentelemetry_otlp::MetricExporter::builder()
328 .with_http()
329 .with_http_client(reqwest::blocking::Client::new())
330 .with_protocol(opentelemetry_otlp::Protocol::HttpBinary)
331 .with_endpoint(otel_http_endpoint.as_str())
332 .with_timeout(std::time::Duration::from_secs(5))
333 .build()
334 .map_err(|e| {
335 ServerError::from_private(e)
336 .with_public("Failed to initialize OTEL HTTP telemetry!")
337 .with_500()
338 })?;
339 let otel_metric_reader = opentelemetry_sdk::metrics::PeriodicReader::builder(otel_metric_exporter)
340 .with_interval(std::time::Duration::from_secs(5))
341 .build();
342 let meter_provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
343 .with_resource(Resource::builder().with_service_name(app_name.to_string()).build())
344 .with_reader(otel_metric_reader)
345 .build();
346 opentelemetry::global::set_meter_provider(meter_provider.clone());
347 }
348
349 let collector = registry()
350 .with(file_tracer)
351 .with(io_tracer)
352 .with(syslog_tracer)
353 .with(ecs_tracer);
354 #[cfg(feature = "otel")]
355 let collector = collector.with(otel_tracer);
356
357 tracing::subscriber::set_global_default(collector).map_err(|e| {
358 ServerError::from_private(e)
359 .with_public("Can't init global default log collector!")
360 .with_500()
361 })?;
362
363 Ok(TracingGuards {
364 file_log_guard,
365 ecs_log_guard,
366 })
367 }
368}