tracing-better-stack 0.1.0

A tracing-subscriber layer for Better Stack (Logtail) logging
Documentation
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
use tracing::{Event, Level, Subscriber};
use tracing_subscriber::Layer;
use tracing_subscriber::layer::Context;
use tracing_subscriber::registry::LookupSpan;

use crate::batch::BatchProcessor;
use crate::config::BetterStackConfig;
use crate::log_event::{LogEvent, LogValue};

/// A tracing layer that sends log events to Better Stack.
///
/// `BetterStackLayer` implements the `tracing_subscriber::Layer` trait and can be
/// used with any tracing subscriber to automatically send structured log events
/// to Better Stack's log management platform.
///
/// The layer batches events for efficient transmission and handles retries
/// automatically based on the provided configuration.
///
/// # Example
///
/// ```rust,no_run
/// use tracing_subscriber::prelude::*;
/// use tracing_better_stack::{BetterStackLayer, BetterStackConfig};
///
/// let layer = BetterStackLayer::new(
///     BetterStackConfig::builder("s1234567.us-east-9.betterstackdata.com", "source_token")
///         .batch_size(100)
///         .build()
/// );
///
/// tracing_subscriber::registry()
///     .with(layer)
///     .init();
/// ```
pub struct BetterStackLayer {
    /// Channel sender for dispatching events to the batch processor.
    sender: Arc<Mutex<Option<UnboundedSender<LogEvent>>>>,
    /// Configuration for Better Stack integration.
    config: Arc<BetterStackConfig>,
}

impl BetterStackLayer {
    /// Creates a new Better Stack layer with the specified configuration.
    ///
    /// The layer will lazily initialize its background processor on the first
    /// log event when a Tokio runtime is available.
    ///
    /// # Arguments
    ///
    /// * `config` - Better Stack configuration specifying endpoint, credentials, and batching behavior
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// use tracing_better_stack::{BetterStackLayer, BetterStackConfig};
    ///
    /// let config = BetterStackConfig::builder("s1234567.us-east-9.betterstackdata.com", "your-source-token")
    ///     .batch_size(200)
    ///     .build();
    /// let layer = BetterStackLayer::new(config);
    /// ```
    pub fn new(config: BetterStackConfig) -> Self {
        Self {
            sender: Arc::new(Mutex::new(None)),
            config: Arc::new(config),
        }
    }

    /// Creates a configuration builder for Better Stack.
    ///
    /// This is a convenience method that creates a `BetterStackConfigBuilder`
    /// for configuring the layer with a fluent API.
    ///
    /// # Arguments
    ///
    /// * `ingesting_host` - The Better Stack ingesting host for your source
    /// * `source_token` - Your Better Stack source token for authentication
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// use tracing_subscriber::prelude::*;
    /// use tracing_better_stack::BetterStackLayer;
    /// use std::time::Duration;
    ///
    /// let config = BetterStackLayer::builder("s1234567.us-east-9.betterstackdata.com", "your-source-token")
    ///     .batch_size(200)
    ///     .batch_timeout(Duration::from_secs(10))
    ///     .include_spans(false)
    ///     .build();
    /// let layer = BetterStackLayer::new(config);
    ///
    /// tracing_subscriber::registry()
    ///     .with(layer)
    ///     .init();
    /// ```
    pub fn builder(
        ingesting_host: impl Into<String>,
        source_token: impl Into<String>,
    ) -> crate::config::BetterStackConfigBuilder {
        BetterStackConfig::builder(ingesting_host, source_token)
    }

    /// Ensures the background batch processor is initialized.
    ///
    /// This method lazily initializes the batch processor on first use,
    /// spawning it on the current Tokio runtime if available.
    fn ensure_initialized(&self) {
        let mut sender = self.sender.lock().unwrap();
        if sender.is_none()
            && let Ok(handle) = tokio::runtime::Handle::try_current()
        {
            let (tx, rx) = unbounded_channel();
            let config = Arc::clone(&self.config);

            handle.spawn(async move {
                let processor = BatchProcessor::new(rx, (*config).clone());
                processor.run().await;
            });

            *sender = Some(tx);
        }
    }

    /// Sends a log event to the batch processor.
    ///
    /// This method ensures the processor is initialized and then sends
    /// the event through the channel. If the channel is closed, an error
    /// is logged to stderr.
    fn send_event(&self, event: LogEvent) {
        self.ensure_initialized();

        if let Some(tx) = self.sender.lock().unwrap().as_ref()
            && tx.send(event).is_err()
        {
            eprintln!("Failed to send event to Better Stack: channel closed");
        }
    }
}

impl<S> Layer<S> for BetterStackLayer
where
    S: Subscriber + for<'a> LookupSpan<'a>,
{
    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
        let mut fields = HashMap::new();
        let mut visitor = FieldVisitor::new(&mut fields);
        event.record(&mut visitor);

        let level = match *event.metadata().level() {
            Level::ERROR => "error",
            Level::WARN => "warn",
            Level::INFO => "info",
            Level::DEBUG => "debug",
            Level::TRACE => "trace",
        };

        // Extract message from fields if present
        let message = fields
            .remove("message")
            .and_then(|v| match v {
                LogValue::String(s) => Some(s),
                _ => None,
            })
            .unwrap_or_default();

        // Create the log event
        let mut log_event = LogEvent::new(level, message).with_target(event.metadata().target());

        // Add location information if enabled
        if self.config.include_location {
            let file = event.metadata().file().map(|s| s.to_string());
            let line = event.metadata().line();
            log_event = log_event.with_location(file, line);
        }

        // Add span information if enabled
        if self.config.include_spans
            && let Some(scope) = ctx.event_scope(event)
        {
            let mut span_fields = HashMap::new();
            let mut span_name = String::new();

            for span in scope.from_root() {
                if span_name.is_empty() {
                    span_name = span.name().to_string();
                }

                let extensions = span.extensions();
                // Collect span fields
                if let Some(visitor) = extensions.get::<SpanFieldStorage>() {
                    span_fields.extend(visitor.fields.clone());
                }
            }

            if !span_name.is_empty() && !span_fields.is_empty() {
                log_event = log_event.with_span(span_name, span_fields);
            }
        }

        // Add remaining fields to the log event
        for (key, value) in fields {
            log_event.add_field(key, value);
        }

        self.send_event(log_event);
    }

    fn on_new_span(
        &self,
        attrs: &tracing::span::Attributes<'_>,
        id: &tracing::span::Id,
        ctx: Context<'_, S>,
    ) {
        if !self.config.include_spans {
            return;
        }

        let span = ctx.span(id).unwrap();
        let mut fields = HashMap::new();
        let mut visitor = FieldVisitor::new(&mut fields);
        attrs.record(&mut visitor);

        let mut extensions = span.extensions_mut();
        extensions.insert(SpanFieldStorage { fields });
    }
}

struct SpanFieldStorage {
    fields: HashMap<String, LogValue>,
}

struct FieldVisitor<'a> {
    fields: &'a mut HashMap<String, LogValue>,
}

impl<'a> FieldVisitor<'a> {
    fn new(fields: &'a mut HashMap<String, LogValue>) -> Self {
        Self { fields }
    }
}

impl<'a> tracing::field::Visit for FieldVisitor<'a> {
    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
        self.fields.insert(
            field.name().to_string(),
            LogValue::String(value.to_string()),
        );
    }

    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
        self.fields
            .insert(field.name().to_string(), LogValue::Number(value));
    }

    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
        self.fields
            .insert(field.name().to_string(), LogValue::Number(value as i64));
    }

    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
        self.fields
            .insert(field.name().to_string(), LogValue::Bool(value));
    }

    fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
        self.fields
            .insert(field.name().to_string(), LogValue::Float(value));
    }

    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
        self.fields.insert(
            field.name().to_string(),
            LogValue::String(format!("{:?}", value)),
        );
    }
}