// smith_logging/lib.rs
//! Structured logging infrastructure for Smith platform
//!
//! This crate provides high-performance, structured logging with NATS integration
//! for centralized log collection across all Smith platform services.
//!
//! # Features
//!
//! - **NATS Integration**: Emit logs directly to NATS subjects for centralized collection
//! - **Performance Optimized**: Asynchronous logging with batching and buffering
//! - **Structured Format**: JSON logs with consistent fields and correlation IDs
//! - **Configurable Filtering**: Per-module, per-level filtering with target-based rules
//! - **Graceful Fallback**: Console logging when NATS is unavailable
//! - **Rate Limiting**: Prevent log flooding with configurable rate limits
//!
//! # Usage
//!
//! ```rust,ignore
//! use smith_logging::{init_logging, NatsLoggingLayer};
//! use smith_config::Config;
//!
//! // Initialize logging with NATS integration
//! let config = Config::development();
//! let _guard = init_logging(&config.logging, &config.nats, "smith-core").await?;
//!
//! // Use standard tracing macros
//! tracing::info!("Service started");
//! tracing::warn!("High memory usage detected");
//! tracing::error!("Database connection failed");
//! ```

31use anyhow::{Context, Result};
32use async_nats::Client as NatsClient;
33use chrono::{DateTime, Utc};
34use governor::{
35    clock::DefaultClock, middleware::NoOpMiddleware, state::direct::NotKeyed, state::InMemoryState,
36    Quota, RateLimiter,
37};
38use serde::{Deserialize, Serialize};
39use smith_bus::subjects::builders::LogSubject;
40use smith_config::{LoggingConfig, NatsConfig, NatsLoggingConfig};
41use std::collections::HashMap;
42use std::num::NonZeroU32;
43use std::sync::Arc;
44use std::time::Duration;
45use tokio::sync::mpsc;
46use tokio::time::interval;
47use tracing::{Event, Subscriber};
48use tracing_subscriber::{
49    layer::{Context as TracingContext, SubscriberExt},
50    util::SubscriberInitExt,
51    EnvFilter, Layer, Registry,
52};
53use uuid::Uuid;
54
55pub mod error;
56pub mod metrics;
57
58pub use error::{LoggingError, LoggingResult};
59
60/// Structured log entry for NATS transmission
61#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct LogEntry {
63    /// Timestamp in RFC 3339 format
64    pub timestamp: DateTime<Utc>,
65
66    /// Log level (ERROR, WARN, INFO, DEBUG, TRACE)
67    pub level: String,
68
69    /// Service name (e.g., "smith-core", "smith-executor")
70    pub service: String,
71
72    /// Module target (e.g., "smith::planner")
73    pub target: String,
74
75    /// Log message
76    pub message: String,
77
78    /// Structured fields from the log event
79    pub fields: HashMap<String, serde_json::Value>,
80
81    /// Span information (if available)
82    pub span: Option<SpanInfo>,
83
84    /// Trace information (if enabled)
85    pub trace: Option<TraceInfo>,
86
87    /// Unique correlation ID for this log entry
88    pub correlation_id: String,
89
90    /// Node/instance identifier
91    pub node_id: String,
92
93    /// Additional metadata
94    pub metadata: LogMetadata,
95}
96
97/// Span information for distributed tracing
98#[derive(Debug, Clone, Serialize, Deserialize)]
99pub struct SpanInfo {
100    /// Span ID
101    pub id: String,
102
103    /// Parent span ID (if any)
104    pub parent_id: Option<String>,
105
106    /// Span name
107    pub name: String,
108
109    /// Span fields
110    pub fields: HashMap<String, serde_json::Value>,
111}
112
113/// Trace information
114#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct TraceInfo {
116    /// Trace ID
117    pub id: String,
118
119    /// Trace context
120    pub context: HashMap<String, String>,
121}
122
123/// Additional log metadata
124#[derive(Debug, Clone, Serialize, Deserialize)]
125pub struct LogMetadata {
126    /// Source file (in debug builds)
127    pub file: Option<String>,
128
129    /// Source line (in debug builds)
130    pub line: Option<u32>,
131
132    /// Module path
133    pub module_path: Option<String>,
134
135    /// Thread ID
136    pub thread_id: Option<String>,
137
138    /// Performance category (if performance logging is enabled)
139    pub performance_category: Option<String>,
140}
141
142/// NATS logging layer for tracing-subscriber
143pub struct NatsLoggingLayer {
144    /// Service name for this logger
145    service_name: String,
146
147    /// Configuration
148    config: NatsLoggingConfig,
149
150    /// Async sender for log entries
151    log_sender: mpsc::UnboundedSender<LogEntry>,
152
153    /// Rate limiter (optional)
154    rate_limiter: Option<Arc<RateLimiter<NotKeyed, InMemoryState, DefaultClock, NoOpMiddleware>>>,
155
156    /// Node ID for this instance
157    node_id: String,
158}
159
160/// Background log processor that handles NATS publishing
161struct LogProcessor {
162    /// NATS client
163    nats_client: NatsClient,
164
165    /// Configuration
166    config: NatsLoggingConfig,
167
168    /// Log entry receiver
169    log_receiver: mpsc::UnboundedReceiver<LogEntry>,
170
171    /// Buffer for batching
172    buffer: Vec<LogEntry>,
173
174    /// Service name
175    service_name: String,
176}
177
178/// Guard that ensures proper cleanup of logging infrastructure
179pub struct LoggingGuard {
180    /// Handle to the background processor
181    _processor_handle: tokio::task::JoinHandle<Result<()>>,
182}
183
184impl Drop for LoggingGuard {
185    fn drop(&mut self) {
186        // The processor handle will be cancelled when dropped
187        tracing::debug!("Logging infrastructure shutting down");
188    }
189}
190
191impl NatsLoggingLayer {
192    /// Create a new NATS logging layer
193    pub fn new(
194        service_name: String,
195        config: NatsLoggingConfig,
196        nats_client: NatsClient,
197    ) -> Result<(Self, LoggingGuard)> {
198        let (log_sender, log_receiver) = mpsc::unbounded_channel();
199
200        // Create rate limiter if configured
201        let rate_limiter = if config.rate_limit > 0 {
202            let quota = Quota::per_second(
203                NonZeroU32::new(config.rate_limit as u32)
204                    .context("Invalid rate limit configuration")?,
205            );
206            Some(Arc::new(RateLimiter::direct(quota)))
207        } else {
208            None
209        };
210
211        // Generate node ID
212        let short_uuid = Uuid::new_v4().to_string();
213        let node_id = format!(
214            "{}_{}",
215            hostname::get().unwrap_or_default().to_string_lossy(),
216            &short_uuid[..8]
217        );
218
219        // Start background processor
220        let processor = LogProcessor {
221            nats_client,
222            config: config.clone(),
223            log_receiver,
224            buffer: Vec::with_capacity(config.batch_size),
225            service_name: service_name.clone(),
226        };
227
228        let processor_handle = tokio::spawn(async move { processor.run().await });
229
230        let layer = Self {
231            service_name,
232            config,
233            log_sender,
234            rate_limiter,
235            node_id,
236        };
237
238        let guard = LoggingGuard {
239            _processor_handle: processor_handle,
240        };
241
242        Ok((layer, guard))
243    }
244
245    /// Check if this log event should be processed
246    fn should_process(&self, event: &Event) -> bool {
247        // Check level filter
248        if let Some(ref level_filter) = self.config.level_filter {
249            let event_level = event.metadata().level();
250            let filter_level = match level_filter.as_str() {
251                "error" => tracing::Level::ERROR,
252                "warn" => tracing::Level::WARN,
253                "info" => tracing::Level::INFO,
254                "debug" => tracing::Level::DEBUG,
255                "trace" => tracing::Level::TRACE,
256                _ => return true, // Invalid filter, allow all
257            };
258
259            if *event_level > filter_level {
260                return false;
261            }
262        }
263
264        // Check target filters
265        if !self.config.target_filters.is_empty() {
266            let target = event.metadata().target();
267            let matches = self
268                .config
269                .target_filters
270                .iter()
271                .any(|filter| target.starts_with(filter));
272            if !matches {
273                return false;
274            }
275        }
276
277        // Check rate limit
278        if let Some(ref rate_limiter) = self.rate_limiter {
279            if rate_limiter.check().is_err() {
280                return false;
281            }
282        }
283
284        true
285    }
286
287    /// Convert tracing event to log entry
288    fn event_to_log_entry<S>(&self, event: &Event, ctx: &TracingContext<'_, S>) -> LogEntry
289    where
290        S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
291    {
292        let metadata = event.metadata();
293
294        // Extract fields from the event
295        let mut field_visitor = FieldVisitor::new();
296        event.record(&mut field_visitor);
297
298        // Extract span information if enabled
299        let span = if self.config.include_spans {
300            ctx.event_span(event).map(|span_ref| {
301                let span_metadata = span_ref.metadata();
302                let mut span_fields = HashMap::new();
303
304                // Extract span fields (simplified implementation)
305                let span_name = span_metadata.name().to_string();
306                span_fields.insert(
307                    "span_name".to_string(),
308                    serde_json::Value::String(span_name),
309                );
310
311                SpanInfo {
312                    id: format!("{:x}", span_ref.id().into_u64()),
313                    parent_id: span_ref
314                        .parent()
315                        .map(|p| format!("{:x}", p.id().into_u64())),
316                    name: span_metadata.name().to_string(),
317                    fields: span_fields,
318                }
319            })
320        } else {
321            None
322        };
323
324        // Generate correlation ID
325        let correlation_id = Uuid::new_v4().to_string();
326
327        // Create metadata
328        let log_metadata = LogMetadata {
329            file: if cfg!(debug_assertions) {
330                metadata.file().map(|s| s.to_string())
331            } else {
332                None
333            },
334            line: if cfg!(debug_assertions) {
335                metadata.line()
336            } else {
337                None
338            },
339            module_path: metadata.module_path().map(|s| s.to_string()),
340            thread_id: Some(format!("{:?}", std::thread::current().id())),
341            performance_category: field_visitor
342                .fields
343                .get("performance_category")
344                .and_then(|v| v.as_str().map(|s| s.to_string())),
345        };
346
347        LogEntry {
348            timestamp: Utc::now(),
349            level: metadata.level().to_string().to_uppercase(),
350            service: self.service_name.clone(),
351            target: metadata.target().to_string(),
352            message: field_visitor.message,
353            fields: field_visitor.fields,
354            span,
355            trace: None, // TODO: Implement trace context extraction
356            correlation_id,
357            node_id: self.node_id.clone(),
358            metadata: log_metadata,
359        }
360    }
361}
362
363impl<S> Layer<S> for NatsLoggingLayer
364where
365    S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
366{
367    fn on_event(&self, event: &Event<'_>, ctx: TracingContext<'_, S>) {
368        // Check if we should process this event
369        if !self.should_process(event) {
370            return;
371        }
372
373        // Convert to log entry
374        let log_entry = self.event_to_log_entry(event, &ctx);
375
376        // Send to background processor (non-blocking)
377        if self.log_sender.send(log_entry).is_err() {
378            // Channel closed, processor is shutting down
379            if self.config.fallback_to_console {
380                eprintln!(
381                    "NATS logging unavailable, log entry lost: {}",
382                    event.metadata().target()
383                );
384            }
385        }
386    }
387}
388
389/// Field visitor for extracting structured data from log events
390struct FieldVisitor {
391    message: String,
392    fields: HashMap<String, serde_json::Value>,
393}
394
395impl FieldVisitor {
396    fn new() -> Self {
397        Self {
398            message: String::new(),
399            fields: HashMap::new(),
400        }
401    }
402}
403
404impl tracing::field::Visit for FieldVisitor {
405    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
406        let value_str = format!("{:?}", value);
407
408        if field.name() == "message" {
409            self.message = value_str;
410        } else {
411            self.fields.insert(
412                field.name().to_string(),
413                serde_json::Value::String(value_str),
414            );
415        }
416    }
417
418    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
419        if field.name() == "message" {
420            self.message = value.to_string();
421        } else {
422            self.fields.insert(
423                field.name().to_string(),
424                serde_json::Value::String(value.to_string()),
425            );
426        }
427    }
428
429    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
430        self.fields.insert(
431            field.name().to_string(),
432            serde_json::Value::Number(value.into()),
433        );
434    }
435
436    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
437        self.fields.insert(
438            field.name().to_string(),
439            serde_json::Value::Number(value.into()),
440        );
441    }
442
443    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
444        self.fields
445            .insert(field.name().to_string(), serde_json::Value::Bool(value));
446    }
447}
448
449impl LogProcessor {
450    /// Main processing loop
451    async fn run(mut self) -> Result<()> {
452        let mut batch_timer = if self.config.batch_enabled {
453            Some(interval(Duration::from_millis(
454                self.config.batch_timeout_ms,
455            )))
456        } else {
457            None
458        };
459
460        loop {
461            tokio::select! {
462                // Receive log entry
463                log_entry = self.log_receiver.recv() => {
464                    match log_entry {
465                        Some(entry) => {
466                            if self.config.batch_enabled {
467                                self.buffer.push(entry);
468                                if self.buffer.len() >= self.config.batch_size {
469                                    self.flush_buffer().await?;
470                                }
471                            } else {
472                                self.publish_single(entry).await?;
473                            }
474                        }
475                        None => {
476                            // Channel closed, flush buffer and exit
477                            if !self.buffer.is_empty() {
478                                self.flush_buffer().await?;
479                            }
480                            break;
481                        }
482                    }
483                }
484
485                // Batch timeout
486                _ = async {
487                    if let Some(ref mut timer) = batch_timer {
488                        timer.tick().await;
489                    } else {
490                        std::future::pending::<()>().await;
491                    }
492                } => {
493                    if !self.buffer.is_empty() {
494                        self.flush_buffer().await?;
495                    }
496                }
497            }
498        }
499
500        Ok(())
501    }
502
503    /// Publish a single log entry
504    async fn publish_single(&self, entry: LogEntry) -> Result<()> {
505        let subject = self.get_subject_for_entry(&entry);
506        let payload = serde_json::to_vec(&entry).context("Failed to serialize log entry")?;
507
508        let timeout = Duration::from_millis(self.config.publish_timeout_ms);
509
510        for attempt in 1..=self.config.max_retries {
511            match tokio::time::timeout(
512                timeout,
513                self.nats_client
514                    .publish(subject.clone(), payload.clone().into()),
515            )
516            .await
517            {
518                Ok(Ok(_)) => return Ok(()),
519                Ok(Err(e)) => {
520                    tracing::warn!(
521                        "NATS publish failed (attempt {}/{}): {}",
522                        attempt,
523                        self.config.max_retries,
524                        e
525                    );
526                    if attempt < self.config.max_retries {
527                        tokio::time::sleep(Duration::from_millis(100 * attempt as u64)).await;
528                    }
529                }
530                Err(_) => {
531                    tracing::warn!(
532                        "NATS publish timeout (attempt {}/{})",
533                        attempt,
534                        self.config.max_retries
535                    );
536                    if attempt < self.config.max_retries {
537                        tokio::time::sleep(Duration::from_millis(100 * attempt as u64)).await;
538                    }
539                }
540            }
541        }
542
543        // All retries failed
544        if self.config.fallback_to_console {
545            eprintln!(
546                "NATS logging failed, falling back to console: {}",
547                entry.message
548            );
549        }
550
551        Err(anyhow::anyhow!(
552            "Failed to publish log entry after {} retries",
553            self.config.max_retries
554        ))
555    }
556
557    /// Flush the buffer by publishing all entries
558    async fn flush_buffer(&mut self) -> Result<()> {
559        if self.buffer.is_empty() {
560            return Ok(());
561        }
562
563        let entries =
564            std::mem::replace(&mut self.buffer, Vec::with_capacity(self.config.batch_size));
565
566        // Publish entries in parallel (but limited concurrency)
567        let chunk_size = 10; // Limit concurrent publishes
568        for chunk in entries.chunks(chunk_size) {
569            let tasks: Vec<_> = chunk
570                .iter()
571                .map(|entry| self.publish_single(entry.clone()))
572                .collect();
573
574            // Wait for all tasks in this chunk to complete
575            let results = futures::future::join_all(tasks).await;
576
577            // Log any errors (already handled in publish_single)
578            for (i, result) in results.iter().enumerate() {
579                if let Err(e) = result {
580                    tracing::debug!("Failed to publish log entry {}: {}", i, e);
581                }
582            }
583        }
584
585        Ok(())
586    }
587
588    /// Get appropriate NATS subject for a log entry
589    fn get_subject_for_entry(&self, entry: &LogEntry) -> String {
590        match entry.level.as_str() {
591            "ERROR" => LogSubject::error(&self.service_name),
592            "WARN" | "INFO" | "DEBUG" | "TRACE" => {
593                LogSubject::service(&self.service_name, &entry.level.to_lowercase())
594            }
595            _ => LogSubject::service(&self.service_name, &"unknown".to_string()),
596        }
597    }
598}
599
600/// Initialize logging with NATS integration
601pub async fn init_logging(
602    logging_config: &LoggingConfig,
603    nats_config: &NatsConfig,
604    service_name: &str,
605) -> Result<Option<LoggingGuard>> {
606    let env_filter =
607        EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(&logging_config.level));
608
609    let fmt_layer = tracing_subscriber::fmt::layer()
610        .with_target(true)
611        .with_timer(tracing_subscriber::fmt::time::ChronoUtc::rfc_3339())
612        .with_level(true);
613
614    let registry = Registry::default().with(env_filter).with(fmt_layer);
615
616    // Add NATS layer if enabled
617    if logging_config.nats.enabled {
618        // Connect to NATS
619        let nats_client = async_nats::connect(&nats_config.url)
620            .await
621            .context("Failed to connect to NATS for logging")?;
622
623        let (nats_layer, guard) = NatsLoggingLayer::new(
624            service_name.to_string(),
625            logging_config.nats.clone(),
626            nats_client,
627        )?;
628
629        match registry.with(nats_layer).try_init() {
630            Ok(()) => {
631                tracing::info!(
632                    "Logging initialized with NATS integration for service: {}",
633                    service_name
634                );
635                Ok(Some(guard))
636            }
637            Err(err) => {
638                tracing::warn!(
639                    "Logging already initialized, skipping duplicate subscriber: {}",
640                    err
641                );
642                // Drop guard so the background processor shuts down cleanly.
643                drop(guard);
644                Ok(None)
645            }
646        }
647    } else {
648        match registry.try_init() {
649            Ok(()) => {
650                tracing::info!(
651                    "Logging initialized without NATS integration for service: {}",
652                    service_name
653                );
654            }
655            Err(err) => {
656                tracing::warn!(
657                    "Logging already initialized, skipping duplicate subscriber: {}",
658                    err
659                );
660            }
661        }
662        Ok(None)
663    }
664}
665
666/// Simplified initialization for testing
667pub fn init_console_logging(level: &str) -> Result<()> {
668    let env_filter = EnvFilter::new(level);
669    let builder = tracing_subscriber::fmt()
670        .with_env_filter(env_filter)
671        .with_target(true)
672        .with_timer(tracing_subscriber::fmt::time::ChronoUtc::rfc_3339())
673        .with_level(true);
674
675    // Ignore error if a subscriber is already set; this happens in test environments.
676    let _ = builder.try_init();
677
678    Ok(())
679}
680
681// Re-export for convenience
682pub use tracing::{debug, error, info, trace, warn};