1mod http;
25mod queue;
26mod scheduler;
27
28#[cfg(feature = "tracing-reqwest")]
36pub use http::reqwest::ReqwestTraceMiddleware;
37
38#[cfg(feature = "tracing-axum")]
44pub use http::axum::axum_tracing_middleware;
45
46#[cfg(feature = "tracing-scheduler")]
50pub use scheduler::scheduler_tracing;
51
52#[cfg(feature = "tracing-consumer")]
56pub use queue::consume as queue_consumer_tracing;
57
58use opentelemetry::global;
59use opentelemetry::global::BoxedTracer;
60use opentelemetry::trace::{TraceContextExt, TracerProvider};
61use opentelemetry_otlp::{SpanExporter, WithExportConfig};
62use opentelemetry_sdk::propagation::TraceContextPropagator;
63use opentelemetry_sdk::trace::{Sampler, SdkTracerProvider};
64use serde_json::{Map, Number, Value, json};
65use std::sync::{LazyLock, OnceLock};
66use tracing::Span;
67use tracing_opentelemetry::OpenTelemetrySpanExt;
68use tracing_subscriber::EnvFilter;
69use tracing_subscriber::fmt::format::{JsonFields, Writer};
70use tracing_subscriber::fmt::{FmtContext, FormatEvent, FormatFields};
71use tracing_subscriber::layer::SubscriberExt;
72use tracing_subscriber::util::SubscriberInitExt;
73
74static APP_NAME: OnceLock<String> = OnceLock::new();
75
76static TRACER: LazyLock<BoxedTracer> =
77 LazyLock::new(|| global::tracer(APP_NAME.get().unwrap().as_str()));
78
79pub fn init(app_name: &str, log_level: &str, log_format: LogFormat, otlp_endpoint: Option<&str>) {
82 APP_NAME.set(app_name.to_string()).unwrap();
83 global::set_text_map_propagator(TraceContextPropagator::new());
84 let mut provider_builder = SdkTracerProvider::builder();
85 match otlp_endpoint {
86 None => {}
87 Some(endpoint) => {
88 let exporter = SpanExporter::builder()
89 .with_tonic()
90 .with_endpoint(endpoint)
91 .build();
92 match exporter {
93 Ok(exporter) => {
94 provider_builder = provider_builder
95 .with_batch_exporter(exporter)
96 .with_sampler(Sampler::AlwaysOn);
97 }
98 Err(err) => {
99 tracing::error!("Failed to create OTLP exporter: {:?}", err);
100 }
101 }
102 }
103 }
104 let provider = provider_builder.build();
105 let tracer = provider.tracer(app_name.to_string());
106 global::set_tracer_provider(provider);
107
108 let telemetry_layer = tracing_opentelemetry::layer()
109 .with_tracer(tracer)
110 .with_error_records_to_exceptions(true);
111 let filter_layer = EnvFilter::builder()
112 .with_default_directive(log_level.parse().unwrap())
113 .from_env_lossy();
114 let registry = tracing_subscriber::registry()
115 .with(telemetry_layer)
116 .with(filter_layer);
117
118 match log_format {
119 LogFormat::Json => {
120 let fmt_layer = tracing_subscriber::fmt::layer()
121 .event_format(JsonTraceIdFormatter)
122 .fmt_fields(JsonFields::default());
123 registry.with(fmt_layer).init();
124 }
125 LogFormat::Line => {
126 registry.with(tracing_subscriber::fmt::layer()).init();
127 }
128 }
129}
130
131pub enum LogFormat {
132 Json,
133 Line,
134}
135
136struct JsonTraceIdFormatter;
137
138impl<S, N> FormatEvent<S, N> for JsonTraceIdFormatter
139where
140 S: tracing::Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
141 N: for<'a> FormatFields<'a> + 'static,
142{
143 fn format_event(
144 &self,
145 _ctx: &FmtContext<'_, S, N>,
146 mut writer: Writer<'_>,
147 event: &tracing::Event<'_>,
148 ) -> std::fmt::Result {
149 let context = Span::current().context();
151
152 let trace_id = context.span().span_context().trace_id().to_string();
153 let span_id = context.span().span_context().span_id().to_string();
154
155 let mut map = Map::new();
157 let mut visitor = SerdeMapVisitor { map: &mut map };
158
159 event.record(&mut visitor);
161
162 map.insert("trace_id".to_string(), json!(trace_id));
164 map.insert("span_id".to_string(), json!(span_id));
166 let lvl = event.metadata().level().to_string();
168 map.insert("level".to_string(), json!(lvl));
169 map.insert(
171 "target".to_string(),
172 json!(event.metadata().target().to_string()),
173 );
174 map.insert(
176 "line_number".to_string(),
177 json!(event.metadata().line().unwrap_or(0)),
178 );
179
180 map.insert(
182 "timestamp".to_string(),
183 json!(chrono::Utc::now().to_rfc3339()),
184 );
185
186 let v: Value = Value::Object(map);
188 writeln!(&mut writer, "{}", v)
189 }
190}
191
192struct SerdeMapVisitor<'a> {
193 map: &'a mut Map<String, Value>,
194}
195
196impl<'a> tracing::field::Visit for SerdeMapVisitor<'a> {
197 fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
198 let n = Number::from_f64(value).unwrap_or_else(|| Number::from(0));
199 self.map.insert(field.name().to_string(), Value::Number(n));
200 }
201 fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
202 self.map
203 .insert(field.name().to_string(), Value::Number(Number::from(value)));
204 }
205 fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
206 self.map
207 .insert(field.name().to_string(), Value::Number(Number::from(value)));
208 }
209 fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
210 self.map
211 .insert(field.name().to_string(), Value::Bool(value));
212 }
213 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
214 self.map
215 .insert(field.name().to_string(), Value::String(value.to_string()));
216 }
217 fn record_error(
218 &mut self,
219 field: &tracing::field::Field,
220 value: &(dyn std::error::Error + 'static),
221 ) {
222 self.map
223 .insert(field.name().to_string(), Value::String(value.to_string()));
224 }
225 fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
226 self.map.insert(
227 field.name().to_string(),
228 Value::String(format!("{:?}", value)),
229 );
230 }
231}