//! NATS-backed structured logging built on `tracing`.
//!
//! Provides [`NatsLoggingLayer`], a `tracing_subscriber` layer that converts
//! events into JSON [`LogEntry`] records and ships them to NATS from a
//! background task, with optional batching, rate limiting, and a console
//! fallback.

use anyhow::{Context, Result};
use async_nats::Client as NatsClient;
use chrono::{DateTime, Utc};
use governor::{
    clock::DefaultClock, middleware::NoOpMiddleware, state::direct::NotKeyed, state::InMemoryState,
    Quota, RateLimiter,
};
use serde::{Deserialize, Serialize};
use smith_bus::subjects::builders::LogSubject;
use smith_config::{LoggingConfig, NatsConfig, NatsLoggingConfig};
use std::collections::HashMap;
use std::num::NonZeroU32;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::interval;
use tracing::{Event, Subscriber};
use tracing_subscriber::{
    layer::{Context as TracingContext, SubscriberExt},
    util::SubscriberInitExt,
    EnvFilter, Layer, Registry,
};
use uuid::Uuid;

pub mod error;
pub mod metrics;

pub use error::{LoggingError, LoggingResult};

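/// A structured log entry as serialized and published to NATS.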
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
    /// UTC timestamp at which the entry was created.
    pub timestamp: DateTime<Utc>,

    /// Log level as an uppercase string (e.g. "ERROR", "INFO").
    pub level: String,

    /// Name of the service that emitted the entry.
    pub service: String,

    /// The `tracing` target (usually the module path).
    pub target: String,

    /// The formatted event message.
    pub message: String,

    /// Additional structured fields recorded on the event.
    pub fields: HashMap<String, serde_json::Value>,

    /// The enclosing span, if span capture is enabled.
    pub span: Option<SpanInfo>,

    /// Distributed-trace context, if available.
    pub trace: Option<TraceInfo>,

    /// Unique ID for correlating this entry across systems.
    pub correlation_id: String,

    /// Identifier of the node that produced the entry.
    pub node_id: String,

    /// Source-location and runtime metadata.
    pub metadata: LogMetadata,
}

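/// Information about the span enclosing an event.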
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpanInfo {
    /// Span ID, rendered as a hexadecimal string.
    pub id: String,

    /// Parent span ID, if the span has a parent.
    pub parent_id: Option<String>,

    /// The span's name.
    pub name: String,

    /// Fields recorded on the span.
    pub fields: HashMap<String, serde_json::Value>,
}

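/// Distributed-tracing context attached to an entry.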
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TraceInfo {
    /// Trace ID.
    pub id: String,

    /// Propagated trace-context key/value pairs.
    pub context: HashMap<String, String>,
}

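/// Source-location and runtime metadata for a log entry.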
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogMetadata {
    /// Source file, included only in debug builds.
    pub file: Option<String>,

    /// Source line, included only in debug builds.
    pub line: Option<u32>,

    /// Module path of the emitting code.
    pub module_path: Option<String>,

    /// Debug representation of the emitting thread's ID.
    pub thread_id: Option<String>,

    /// Optional performance category taken from the event's fields.
    pub performance_category: Option<String>,
}

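/// A `tracing_subscriber` layer that converts events into [`LogEntry`]
/// values and forwards them to a background NATS publisher.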
pub struct NatsLoggingLayer {
    /// Name of the service this layer logs for.
    service_name: String,

    /// NATS logging configuration.
    config: NatsLoggingConfig,

    /// Channel to the background `LogProcessor` task.
    log_sender: mpsc::UnboundedSender<LogEntry>,

    /// Optional per-second rate limiter for outgoing entries.
    rate_limiter: Option<Arc<RateLimiter<NotKeyed, InMemoryState, DefaultClock, NoOpMiddleware>>>,

    /// Identifier of this node (hostname plus a short UUID).
    node_id: String,
}

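/// Background task that drains the log channel and publishes entries to
/// NATS, either one at a time or in batches.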
struct LogProcessor {
    /// NATS client used to publish entries.
    nats_client: NatsClient,

    /// NATS logging configuration.
    config: NatsLoggingConfig,

    /// Channel from the `NatsLoggingLayer`.
    log_receiver: mpsc::UnboundedReceiver<LogEntry>,

    /// Buffer of entries awaiting a batched flush.
    buffer: Vec<LogEntry>,

    /// Name of the service being logged.
    service_name: String,
}

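/// Returned from [`NatsLoggingLayer::new`]; owns the join handle for the
/// background publisher task. Dropping the guard detaches the task rather
/// than aborting it, so entries already queued continue to be published.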
pub struct LoggingGuard {
    /// Join handle for the background publisher task.
    _processor_handle: tokio::task::JoinHandle<Result<()>>,
}

impl Drop for LoggingGuard {
    fn drop(&mut self) {
        tracing::debug!("Logging infrastructure shutting down");
    }
}

impl NatsLoggingLayer {
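    /// Creates the layer and spawns the background `LogProcessor` on the
    /// current Tokio runtime (so this must be called from within one).
    /// Returns the layer together with a guard that owns the task handle.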
    pub fn new(
        service_name: String,
        config: NatsLoggingConfig,
        nats_client: NatsClient,
    ) -> Result<(Self, LoggingGuard)> {
        let (log_sender, log_receiver) = mpsc::unbounded_channel();

        // A rate limit of 0 disables rate limiting entirely.
        let rate_limiter = if config.rate_limit > 0 {
            let quota = Quota::per_second(
                NonZeroU32::new(config.rate_limit as u32)
                    .context("Invalid rate limit configuration")?,
            );
            Some(Arc::new(RateLimiter::direct(quota)))
        } else {
            None
        };

        // Node ID: hostname plus the first 8 characters of a random UUID.
        let short_uuid = Uuid::new_v4().to_string();
        let node_id = format!(
            "{}_{}",
            hostname::get().unwrap_or_default().to_string_lossy(),
            &short_uuid[..8]
        );

        let processor = LogProcessor {
            nats_client,
            config: config.clone(),
            log_receiver,
            buffer: Vec::with_capacity(config.batch_size),
            service_name: service_name.clone(),
        };

        let processor_handle = tokio::spawn(async move { processor.run().await });

        let layer = Self {
            service_name,
            config,
            log_sender,
            rate_limiter,
            node_id,
        };

        let guard = LoggingGuard {
            _processor_handle: processor_handle,
        };

        Ok((layer, guard))
    }

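    /// Applies the level filter, target-prefix filters, and rate limiter to
    /// decide whether an event should be forwarded.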
    fn should_process(&self, event: &Event) -> bool {
        // Level filter: drop events more verbose than the configured level.
        if let Some(ref level_filter) = self.config.level_filter {
            let event_level = event.metadata().level();
            let filter_level = match level_filter.as_str() {
                "error" => tracing::Level::ERROR,
                "warn" => tracing::Level::WARN,
                "info" => tracing::Level::INFO,
                "debug" => tracing::Level::DEBUG,
                "trace" => tracing::Level::TRACE,
                // Unrecognized filter string: let the event through.
                _ => return true,
            };

            // In `tracing`, more verbose levels compare greater than less
            // verbose ones (TRACE > DEBUG > ... > ERROR).
            if *event_level > filter_level {
                return false;
            }
        }

        // Target filter: the event's target must match a configured prefix.
        if !self.config.target_filters.is_empty() {
            let target = event.metadata().target();
            let matches = self
                .config
                .target_filters
                .iter()
                .any(|filter| target.starts_with(filter));
            if !matches {
                return false;
            }
        }

        // Rate limiter: drop the event if the quota is exhausted.
        if let Some(ref rate_limiter) = self.rate_limiter {
            if rate_limiter.check().is_err() {
                return false;
            }
        }

        true
    }

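    /// Builds a [`LogEntry`] from a tracing event, capturing its fields, the
    /// enclosing span (if enabled), and build-dependent source metadata.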
    fn event_to_log_entry<S>(&self, event: &Event, ctx: &TracingContext<'_, S>) -> LogEntry
    where
        S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
    {
        let metadata = event.metadata();

        // Collect the message and structured fields from the event.
        let mut field_visitor = FieldVisitor::new();
        event.record(&mut field_visitor);

        // Capture the enclosing span, if configured to do so.
        let span = if self.config.include_spans {
            ctx.event_span(event).map(|span_ref| {
                let span_metadata = span_ref.metadata();
                let mut span_fields = HashMap::new();

                let span_name = span_metadata.name().to_string();
                span_fields.insert(
                    "span_name".to_string(),
                    serde_json::Value::String(span_name),
                );

                SpanInfo {
                    id: format!("{:x}", span_ref.id().into_u64()),
                    parent_id: span_ref
                        .parent()
                        .map(|p| format!("{:x}", p.id().into_u64())),
                    name: span_metadata.name().to_string(),
                    fields: span_fields,
                }
            })
        } else {
            None
        };

        let correlation_id = Uuid::new_v4().to_string();

        // Source locations are only captured in debug builds.
        let log_metadata = LogMetadata {
            file: if cfg!(debug_assertions) {
                metadata.file().map(|s| s.to_string())
            } else {
                None
            },
            line: if cfg!(debug_assertions) {
                metadata.line()
            } else {
                None
            },
            module_path: metadata.module_path().map(|s| s.to_string()),
            thread_id: Some(format!("{:?}", std::thread::current().id())),
            performance_category: field_visitor
                .fields
                .get("performance_category")
                .and_then(|v| v.as_str().map(|s| s.to_string())),
        };

        LogEntry {
            timestamp: Utc::now(),
            level: metadata.level().to_string().to_uppercase(),
            service: self.service_name.clone(),
            target: metadata.target().to_string(),
            message: field_visitor.message,
            fields: field_visitor.fields,
            span,
            // Distributed-trace propagation is not wired up here.
            trace: None,
            correlation_id,
            node_id: self.node_id.clone(),
            metadata: log_metadata,
        }
    }
}

impl<S> Layer<S> for NatsLoggingLayer
where
    S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
{
    fn on_event(&self, event: &Event<'_>, ctx: TracingContext<'_, S>) {
        if !self.should_process(event) {
            return;
        }

        let log_entry = self.event_to_log_entry(event, &ctx);

        // A send error means the processor task has gone away; optionally
        // note the loss on stderr.
        if self.log_sender.send(log_entry).is_err() && self.config.fallback_to_console {
            eprintln!(
                "NATS logging unavailable, log entry lost: {}",
                event.metadata().target()
            );
        }
    }
}

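/// Field visitor that separates the special `message` field from the rest of
/// an event's fields, storing the latter as JSON values.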
struct FieldVisitor {
    message: String,
    fields: HashMap<String, serde_json::Value>,
}

impl FieldVisitor {
    fn new() -> Self {
        Self {
            message: String::new(),
            fields: HashMap::new(),
        }
    }
}

impl tracing::field::Visit for FieldVisitor {
    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
        let value_str = format!("{:?}", value);

        // The special `message` field becomes the entry's message; everything
        // else is stored as a structured field.
        if field.name() == "message" {
            self.message = value_str;
        } else {
            self.fields.insert(
                field.name().to_string(),
                serde_json::Value::String(value_str),
            );
        }
    }

    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
        if field.name() == "message" {
            self.message = value.to_string();
        } else {
            self.fields.insert(
                field.name().to_string(),
                serde_json::Value::String(value.to_string()),
            );
        }
    }

    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
        self.fields.insert(
            field.name().to_string(),
            serde_json::Value::Number(value.into()),
        );
    }

    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
        self.fields.insert(
            field.name().to_string(),
            serde_json::Value::Number(value.into()),
        );
    }

    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
        self.fields
            .insert(field.name().to_string(), serde_json::Value::Bool(value));
    }
}

impl LogProcessor {
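    /// Main processing loop: drains the channel, batching entries when
    /// enabled and flushing on batch size or timer; exits once all senders
    /// are dropped.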
    async fn run(mut self) -> Result<()> {
        let mut batch_timer = if self.config.batch_enabled {
            Some(interval(Duration::from_millis(
                self.config.batch_timeout_ms,
            )))
        } else {
            None
        };

        loop {
            tokio::select! {
                log_entry = self.log_receiver.recv() => {
                    match log_entry {
                        Some(entry) => {
                            if self.config.batch_enabled {
                                self.buffer.push(entry);
                                if self.buffer.len() >= self.config.batch_size {
                                    self.flush_buffer().await?;
                                }
                            } else {
                                self.publish_single(entry).await?;
                            }
                        }
                        None => {
                            // Channel closed: flush whatever is buffered and stop.
                            if !self.buffer.is_empty() {
                                self.flush_buffer().await?;
                            }
                            break;
                        }
                    }
                }

                // Flush on the batch timer; when batching is disabled this
                // arm never completes.
                _ = async {
                    if let Some(ref mut timer) = batch_timer {
                        timer.tick().await;
                    } else {
                        std::future::pending::<()>().await;
                    }
                } => {
                    if !self.buffer.is_empty() {
                        self.flush_buffer().await?;
                    }
                }
            }
        }

        Ok(())
    }

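    /// Publishes a single entry to its subject, retrying with linear backoff
    /// up to `max_retries` times and optionally falling back to stderr.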
    async fn publish_single(&self, entry: LogEntry) -> Result<()> {
        let subject = self.get_subject_for_entry(&entry);
        let payload = serde_json::to_vec(&entry).context("Failed to serialize log entry")?;

        let timeout = Duration::from_millis(self.config.publish_timeout_ms);

        for attempt in 1..=self.config.max_retries {
            match tokio::time::timeout(
                timeout,
                self.nats_client
                    .publish(subject.clone(), payload.clone().into()),
            )
            .await
            {
                Ok(Ok(_)) => return Ok(()),
                Ok(Err(e)) => {
                    tracing::warn!(
                        "NATS publish failed (attempt {}/{}): {}",
                        attempt,
                        self.config.max_retries,
                        e
                    );
                    if attempt < self.config.max_retries {
                        tokio::time::sleep(Duration::from_millis(100 * attempt as u64)).await;
                    }
                }
                Err(_) => {
                    tracing::warn!(
                        "NATS publish timeout (attempt {}/{})",
                        attempt,
                        self.config.max_retries
                    );
                    if attempt < self.config.max_retries {
                        tokio::time::sleep(Duration::from_millis(100 * attempt as u64)).await;
                    }
                }
            }
        }

        if self.config.fallback_to_console {
            eprintln!(
                "NATS logging failed, falling back to console: {}",
                entry.message
            );
        }

        Err(anyhow::anyhow!(
            "Failed to publish log entry after {} retries",
            self.config.max_retries
        ))
    }

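    /// Flushes the buffered entries, publishing them in small concurrent
    /// chunks; individual failures are logged at debug level and skipped.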
    async fn flush_buffer(&mut self) -> Result<()> {
        if self.buffer.is_empty() {
            return Ok(());
        }

        let entries =
            std::mem::replace(&mut self.buffer, Vec::with_capacity(self.config.batch_size));

        // Publish in small concurrent chunks to bound in-flight requests.
        let chunk_size = 10;
        for chunk in entries.chunks(chunk_size) {
            let tasks: Vec<_> = chunk
                .iter()
                .map(|entry| self.publish_single(entry.clone()))
                .collect();

            let results = futures::future::join_all(tasks).await;

            // Individual failures do not abort the flush.
            for (i, result) in results.iter().enumerate() {
                if let Err(e) = result {
                    tracing::debug!("Failed to publish log entry {}: {}", i, e);
                }
            }
        }

        Ok(())
    }

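    /// Maps an entry's level to its NATS subject via `LogSubject`.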
    fn get_subject_for_entry(&self, entry: &LogEntry) -> String {
        match entry.level.as_str() {
            "ERROR" => LogSubject::error(&self.service_name),
            "WARN" | "INFO" | "DEBUG" | "TRACE" => {
                LogSubject::service(&self.service_name, &entry.level.to_lowercase())
            }
            _ => LogSubject::service(&self.service_name, &"unknown".to_string()),
        }
    }
}

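/// Initializes the global tracing subscriber: an `EnvFilter` (honoring
/// `RUST_LOG`, falling back to the configured level), a console formatting
/// layer, and, when `logging_config.nats.enabled` is set, the NATS layer.
/// Returns the guard for the NATS publisher task when that layer is
/// installed; callers typically keep it for the life of the program.
///
/// A minimal usage sketch, assuming this crate is named `smith_logging`
/// (hypothetical; adjust the path to the real crate name):
///
/// ```ignore
/// let guard = smith_logging::init_logging(&logging_config, &nats_config, "my-service").await?;
/// tracing::info!(performance_category = "startup", "service started");
/// // Keep `guard` in scope until shutdown.
/// ```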
pub async fn init_logging(
    logging_config: &LoggingConfig,
    nats_config: &NatsConfig,
    service_name: &str,
) -> Result<Option<LoggingGuard>> {
    // RUST_LOG takes precedence over the configured level.
    let env_filter =
        EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(&logging_config.level));

    let fmt_layer = tracing_subscriber::fmt::layer()
        .with_target(true)
        .with_timer(tracing_subscriber::fmt::time::ChronoUtc::rfc_3339())
        .with_level(true);

    let registry = Registry::default().with(env_filter).with(fmt_layer);

    if logging_config.nats.enabled {
        let nats_client = async_nats::connect(&nats_config.url)
            .await
            .context("Failed to connect to NATS for logging")?;

        let (nats_layer, guard) = NatsLoggingLayer::new(
            service_name.to_string(),
            logging_config.nats.clone(),
            nats_client,
        )?;

        match registry.with(nats_layer).try_init() {
            Ok(()) => {
                tracing::info!(
                    "Logging initialized with NATS integration for service: {}",
                    service_name
                );
                Ok(Some(guard))
            }
            Err(err) => {
                tracing::warn!(
                    "Logging already initialized, skipping duplicate subscriber: {}",
                    err
                );
                // Dropping the guard detaches the processor task.
                drop(guard);
                Ok(None)
            }
        }
    } else {
        match registry.try_init() {
            Ok(()) => {
                tracing::info!(
                    "Logging initialized without NATS integration for service: {}",
                    service_name
                );
            }
            Err(err) => {
                tracing::warn!(
                    "Logging already initialized, skipping duplicate subscriber: {}",
                    err
                );
            }
        }
        Ok(None)
    }
}

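/// Initializes a console-only subscriber with the given level filter; useful
/// for tools and tests that do not need NATS logging.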
pub fn init_console_logging(level: &str) -> Result<()> {
    let env_filter = EnvFilter::new(level);
    let builder = tracing_subscriber::fmt()
        .with_env_filter(env_filter)
        .with_target(true)
        .with_timer(tracing_subscriber::fmt::time::ChronoUtc::rfc_3339())
        .with_level(true);

    // Ignore the error if a global subscriber is already installed.
    let _ = builder.try_init();

    Ok(())
}

// Re-export the common tracing macros so callers need only one import.
pub use tracing::{debug, error, info, trace, warn};