drasi_lib/managers/tracing_layer.rs
1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Tracing layer for routing logs to component-specific streams.
16//!
17//! This module provides a custom `tracing_subscriber::Layer` that captures log events
18//! and routes them to the appropriate component's log stream based on span context.
19//!
20//! # How It Works
21//!
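//! 1. Components create tracing spans with `component_id` and `component_type` attributes
//!    (`component_type` must be `source`, `query` or `reaction`, case-insensitive; an
//!    optional `instance_id` attribute is recorded as well)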
23//! 2. Log events (from `tracing::info!()` or `log::info!()` via bridge) occur within these spans
24//! 3. `ComponentLogLayer` extracts the component info from the span hierarchy
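//! 4. Matching events are queued on a bounded channel and written to `ComponentLogRegistry`
//!    by a background worker thread for storage and broadcast; if the channel is full, the
//!    event is dropped from the component stream (console output is unaffected)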
26//!
27//! # Global Registry
28//!
29//! Since `tracing` uses a single global subscriber per process, we use a shared global
30//! `ComponentLogRegistry` that all `DrasiLib` instances can access. Call `get_or_init_global_registry()`
31//! to get the shared registry, which will be initialized on first use.
32//!
33//! # Example
34//!
35//! ```ignore
36//! use tracing::Instrument;
37//!
38//! // Create a span for the component
39//! let span = tracing::info_span!(
40//! "source",
41//! component_id = %source_id,
42//! component_type = "source"
43//! );
44//!
45//! // Run code within the span - logs are automatically routed
46//! async {
47//! tracing::info!("Starting source");
48//! // or log::info!("Starting source"); - works via tracing-log bridge
49//! }.instrument(span).await;
50//! ```
51
52use std::sync::Arc;
53
54use tokio::sync::mpsc;
55use tracing::field::{Field, Visit};
56use tracing::{Event, Subscriber};
57use tracing_subscriber::layer::Context;
58use tracing_subscriber::registry::LookupSpan;
59use tracing_subscriber::Layer;
60
61use super::component_log::{ComponentLogRegistry, LogLevel, LogMessage};
62use crate::channels::ComponentType;
63
64use std::sync::OnceLock;
65
66/// Default capacity for the log message channel.
67/// This provides backpressure when logging volume is high.
68const LOG_CHANNEL_CAPACITY: usize = 10_000;
69
70/// Global log registry shared by all DrasiLib instances.
71/// Since tracing uses a single global subscriber, we need a single shared registry.
72static GLOBAL_LOG_REGISTRY: OnceLock<Arc<ComponentLogRegistry>> = OnceLock::new();
73
74/// Global sender for the log worker. Initialized alongside the registry.
75static GLOBAL_LOG_SENDER: OnceLock<mpsc::Sender<LogMessage>> = OnceLock::new();
76
77/// Get or initialize the shared global log registry.
78///
79/// This returns a shared registry that all DrasiLib instances use. The tracing
80/// subscriber is global (per-process), so all logs from all DrasiLib instances
81/// go to the same registry.
82///
83/// On first call, this initializes the tracing subscriber with the registry
84/// and spawns a background worker to process log messages.
85pub fn get_or_init_global_registry() -> Arc<ComponentLogRegistry> {
86 GLOBAL_LOG_REGISTRY
87 .get_or_init(|| {
88 let registry = Arc::new(ComponentLogRegistry::new());
89
90 // Create bounded channel for log messages
91 let (tx, rx) = mpsc::channel::<LogMessage>(LOG_CHANNEL_CAPACITY);
92
93 // Store sender globally for the tracing layer to use
94 let _ = GLOBAL_LOG_SENDER.set(tx);
95
96 // Spawn the log worker in a dedicated thread with its own runtime.
97 // This ensures the worker is independent of any caller's runtime.
98 spawn_log_worker(registry.clone(), rx);
99
100 // Initialize tracing subscriber
101 init_tracing_internal(registry.clone());
102
103 registry
104 })
105 .clone()
106}
107
108/// Spawn the background worker that processes log messages.
109///
110/// This worker drains the channel and writes logs to the registry.
111/// Uses a dedicated thread with its own tokio runtime to ensure
112/// independence from the caller's async context.
113fn spawn_log_worker(registry: Arc<ComponentLogRegistry>, mut rx: mpsc::Receiver<LogMessage>) {
114 std::thread::Builder::new()
115 .name("drasi-log-worker".to_string())
116 .spawn(move || {
117 let rt = tokio::runtime::Builder::new_current_thread()
118 .enable_all()
119 .build()
120 .expect("Failed to create log worker runtime");
121
122 rt.block_on(async move {
123 while let Some(message) = rx.recv().await {
124 registry.log(message).await;
125 }
126 });
127 })
128 .expect("Failed to spawn log worker thread");
129}
130
131/// Initialize the tracing subscriber with component log routing.
132///
133/// This sets up:
134/// - `ComponentLogLayer` for routing logs to component-specific streams
135/// - `fmt::layer()` for console output
136/// - `EnvFilter` for level control via `RUST_LOG` environment variable
137/// - `tracing-log` bridge for `log` crate compatibility
138///
139/// # Arguments
140///
141/// * `log_registry` - The registry to route component logs to
142///
143/// # Example
144///
145/// ```ignore
146/// use drasi_lib::managers::ComponentLogRegistry;
147/// use std::sync::Arc;
148///
149/// let log_registry = Arc::new(ComponentLogRegistry::new());
150/// drasi_lib::init_tracing(log_registry.clone());
151///
152/// // Now both tracing::info!() and log::info!() work
153/// tracing::info!("Hello from tracing");
154/// log::info!("Hello from log crate");
155/// ```
156///
157/// # Note
158///
159/// If another `log` crate logger was initialized before calling this function,
160/// `log::info!()` calls will go to that logger instead. However, `tracing::info!()`
161/// calls will still be captured correctly.
162///
163/// # Deprecated
164///
165/// Prefer using `get_or_init_global_registry()` which handles initialization automatically.
166/// This function is kept for backward compatibility.
167pub fn init_tracing(log_registry: Arc<ComponentLogRegistry>) {
168 // Ensure global registry is initialized (which sets up the channel worker)
169 let _ = get_or_init_global_registry();
170
171 // If caller provided a different registry, warn them
172 // (can't actually use it since tracing subscriber is already set)
173 if !Arc::ptr_eq(&log_registry, &get_or_init_global_registry()) {
174 tracing::warn!(
175 "init_tracing called with custom registry, but global registry already initialized. \
176 The provided registry will be ignored. Use get_or_init_global_registry() instead."
177 );
178 }
179}
180
181/// Internal initialization - sets up tracing subscriber without channel/worker.
182fn init_tracing_internal(log_registry: Arc<ComponentLogRegistry>) {
183 use tracing_subscriber::prelude::*;
184 use tracing_subscriber::{fmt, EnvFilter};
185
186 // Try to install the log->tracing bridge
187 // Use init() which returns Result, ignore error if logger already set
188 let _ = tracing_log::LogTracer::init();
189
190 // Build the subscriber with our custom layer
191 // Use RUST_LOG if set, otherwise default to INFO level
192 let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
193
194 let subscriber = tracing_subscriber::registry()
195 .with(filter)
196 .with(ComponentLogLayer::new(log_registry))
197 .with(fmt::layer().with_target(true).with_level(true));
198
199 // Try to set as the global subscriber
200 // Use try_init to handle case where subscriber is already set
201 let _ = tracing::subscriber::set_global_default(subscriber);
202}
203
204/// Try to initialize tracing, returning whether initialization succeeded.
205///
206/// Unlike `init_tracing()`, this returns `false` if a subscriber is already set,
207/// allowing callers to handle this case.
208///
209/// # Deprecated
210///
211/// Prefer using `get_or_init_global_registry()` which handles initialization automatically.
212pub fn try_init_tracing(log_registry: Arc<ComponentLogRegistry>) -> bool {
213 // Check if already initialized
214 if GLOBAL_LOG_REGISTRY.get().is_some() {
215 return false;
216 }
217
218 // Initialize via the standard path
219 let _ = get_or_init_global_registry();
220
221 // Warn if caller's registry differs
222 if !Arc::ptr_eq(&log_registry, &get_or_init_global_registry()) {
223 tracing::warn!(
224 "try_init_tracing called with custom registry, but initialization uses global registry. \
225 The provided registry will be ignored."
226 );
227 }
228
229 true
230}
231
232/// Tracing layer that routes log events to component-specific streams.
233///
234/// This layer intercepts all tracing events and checks if they occur within
235/// a span that has `component_id` and `component_type` attributes. If so,
236/// the log is routed to that component's log stream in the registry.
237pub struct ComponentLogLayer {
238 registry: Arc<ComponentLogRegistry>,
239}
240
241impl ComponentLogLayer {
242 /// Create a new layer with the given log registry.
243 pub fn new(registry: Arc<ComponentLogRegistry>) -> Self {
244 Self { registry }
245 }
246}
247
248impl<S> Layer<S> for ComponentLogLayer
249where
250 S: Subscriber + for<'a> LookupSpan<'a>,
251{
252 fn on_new_span(
253 &self,
254 attrs: &tracing::span::Attributes<'_>,
255 id: &tracing::span::Id,
256 ctx: Context<'_, S>,
257 ) {
258 // Extract component info from span attributes and cache in extensions
259 let mut visitor = ComponentInfoVisitor::default();
260 attrs.record(&mut visitor);
261
262 if let Some(info) = visitor.into_component_info() {
263 if let Some(span) = ctx.span(id) {
264 let mut extensions = span.extensions_mut();
265 extensions.insert(info);
266 }
267 }
268 }
269
270 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
271 // Try to find component context from current or parent spans
272 let component_info = ctx.event_span(event).and_then(|span| {
273 // Walk up the span tree to find component info
274 let mut current = Some(span);
275 while let Some(span_ref) = current {
276 if let Some(info) = extract_component_info(&span_ref) {
277 return Some(info);
278 }
279 current = span_ref.parent();
280 }
281 None
282 });
283
284 // If we found component context, route the log
285 if let Some(info) = component_info {
286 let level = convert_level(*event.metadata().level());
287 let message = extract_message(event);
288
289 let log_message = LogMessage::with_instance(
290 level,
291 message,
292 info.instance_id,
293 info.component_id,
294 info.component_type,
295 );
296
297 // Send to the log worker via bounded channel
298 // This provides backpressure instead of spawning unbounded tasks
299 if let Some(sender) = GLOBAL_LOG_SENDER.get() {
300 // Use try_send to avoid blocking in the tracing layer
301 // If channel is full, log is dropped (better than OOM)
302 if sender.try_send(log_message).is_err() {
303 // Channel full or closed - log still goes to console via fmt layer
304 // Could add a metric here for monitoring dropped logs
305 }
306 }
307 }
308 }
309}
310
311/// Component info stored in span extensions.
312#[derive(Clone)]
313struct ComponentInfo {
314 instance_id: String,
315 component_id: String,
316 component_type: ComponentType,
317}
318
319/// Extract component info from a span's cached extensions.
320fn extract_component_info<S>(
321 span: &tracing_subscriber::registry::SpanRef<'_, S>,
322) -> Option<ComponentInfo>
323where
324 S: Subscriber + for<'a> LookupSpan<'a>,
325{
326 // Component info is cached in span extensions during on_new_span
327 let extensions = span.extensions();
328 extensions.get::<ComponentInfo>().cloned()
329}
330
331/// Visitor for extracting component info from span/event fields.
332#[derive(Default)]
333struct ComponentInfoVisitor {
334 instance_id: Option<String>,
335 component_id: Option<String>,
336 component_type: Option<String>,
337}
338
339impl Visit for ComponentInfoVisitor {
340 fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
341 match field.name() {
342 "instance_id" => {
343 self.instance_id = Some(format!("{value:?}").trim_matches('"').to_string())
344 }
345 "component_id" => {
346 self.component_id = Some(format!("{value:?}").trim_matches('"').to_string())
347 }
348 "component_type" => {
349 self.component_type = Some(format!("{value:?}").trim_matches('"').to_string())
350 }
351 _ => {}
352 }
353 }
354
355 fn record_str(&mut self, field: &Field, value: &str) {
356 match field.name() {
357 "instance_id" => self.instance_id = Some(value.to_string()),
358 "component_id" => self.component_id = Some(value.to_string()),
359 "component_type" => self.component_type = Some(value.to_string()),
360 _ => {}
361 }
362 }
363}
364
365impl ComponentInfoVisitor {
366 fn into_component_info(self) -> Option<ComponentInfo> {
367 let component_id = self.component_id?;
368 let component_type = self
369 .component_type
370 .as_deref()
371 .and_then(parse_component_type)?;
372 Some(ComponentInfo {
373 instance_id: self.instance_id.unwrap_or_default(),
374 component_id,
375 component_type,
376 })
377 }
378}
379
380/// Parse a component type string into a ComponentType enum.
381fn parse_component_type(s: &str) -> Option<ComponentType> {
382 match s.to_lowercase().as_str() {
383 "source" => Some(ComponentType::Source),
384 "query" => Some(ComponentType::Query),
385 "reaction" => Some(ComponentType::Reaction),
386 _ => None,
387 }
388}
389
390/// Visitor for extracting the message from an event.
391#[derive(Default)]
392struct MessageVisitor {
393 message: Option<String>,
394 fields: Vec<String>,
395}
396
397impl Visit for MessageVisitor {
398 fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
399 if field.name() == "message" {
400 self.message = Some(format!("{value:?}").trim_matches('"').to_string());
401 } else {
402 self.fields.push(format!("{}={value:?}", field.name()));
403 }
404 }
405
406 fn record_str(&mut self, field: &Field, value: &str) {
407 if field.name() == "message" {
408 self.message = Some(value.to_string());
409 } else {
410 self.fields.push(format!("{}={}", field.name(), value));
411 }
412 }
413}
414
415/// Extract the message from a tracing event.
416fn extract_message(event: &Event<'_>) -> String {
417 let mut visitor = MessageVisitor::default();
418 event.record(&mut visitor);
419
420 if let Some(msg) = visitor.message {
421 msg
422 } else if !visitor.fields.is_empty() {
423 visitor.fields.join(", ")
424 } else {
425 // Fallback: use the event metadata name
426 event.metadata().name().to_string()
427 }
428}
429
430/// Convert tracing Level to our LogLevel.
431fn convert_level(level: tracing::Level) -> LogLevel {
432 match level {
433 tracing::Level::ERROR => LogLevel::Error,
434 tracing::Level::WARN => LogLevel::Warn,
435 tracing::Level::INFO => LogLevel::Info,
436 tracing::Level::DEBUG => LogLevel::Debug,
437 tracing::Level::TRACE => LogLevel::Trace,
438 }
439}
440
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_component_type() {
        let cases = [
            ("source", Some(ComponentType::Source)),
            ("Source", Some(ComponentType::Source)),
            ("SOURCE", Some(ComponentType::Source)),
            ("query", Some(ComponentType::Query)),
            ("Query", Some(ComponentType::Query)),
            ("reaction", Some(ComponentType::Reaction)),
            ("REACTION", Some(ComponentType::Reaction)),
            ("unknown", None),
            // No trimming or prefix matching is performed.
            ("", None),
            (" source", None),
            ("sources", None),
        ];
        for (input, expected) in cases {
            assert_eq!(parse_component_type(input), expected, "input: {input:?}");
        }
    }

    #[test]
    fn test_convert_level() {
        assert_eq!(convert_level(tracing::Level::ERROR), LogLevel::Error);
        assert_eq!(convert_level(tracing::Level::WARN), LogLevel::Warn);
        assert_eq!(convert_level(tracing::Level::INFO), LogLevel::Info);
        assert_eq!(convert_level(tracing::Level::DEBUG), LogLevel::Debug);
        assert_eq!(convert_level(tracing::Level::TRACE), LogLevel::Trace);
    }

    #[test]
    fn test_into_component_info() {
        let complete = ComponentInfoVisitor {
            instance_id: None,
            component_id: Some("src-1".to_string()),
            component_type: Some("Source".to_string()),
        };
        let info = complete
            .into_component_info()
            .expect("id + known type should produce component info");
        assert_eq!(info.component_id, "src-1");
        assert_eq!(info.instance_id, "");
        assert_eq!(info.component_type, ComponentType::Source);

        let with_instance = ComponentInfoVisitor {
            instance_id: Some("inst-7".to_string()),
            component_id: Some("q".to_string()),
            component_type: Some("query".to_string()),
        };
        let info = with_instance
            .into_component_info()
            .expect("instance id is optional but kept when present");
        assert_eq!(info.instance_id, "inst-7");

        let missing_id = ComponentInfoVisitor {
            component_type: Some("query".to_string()),
            ..Default::default()
        };
        assert!(missing_id.into_component_info().is_none());

        let unknown_type = ComponentInfoVisitor {
            component_id: Some("x".to_string()),
            component_type: Some("widget".to_string()),
            ..Default::default()
        };
        assert!(unknown_type.into_component_info().is_none());
    }
}