tracing-better-stack 0.1.0

A tracing-subscriber layer for Better Stack (Logtail) logging
Documentation
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::time::{Interval, interval};

use crate::config::BetterStackConfig;
use crate::log_event::LogEvent;
use crate::sender::send_batch;

/// Processes log events in batches for efficient transmission to Better Stack.
///
/// `BatchProcessor` drains log events from a channel into an in-memory buffer
/// and hands the buffer to the sender either when it reaches
/// `config.batch_size` events or when the `config.batch_timeout` interval
/// ticks, whichever comes first.
pub(crate) struct BatchProcessor {
    /// Receiving half of the unbounded channel that delivers log events.
    receiver: UnboundedReceiver<LogEvent>,
    /// Better Stack configuration. This type reads `batch_size` and
    /// `batch_timeout` from it and passes the whole value to the sender
    /// (presumably for endpoint/credential details; see `BetterStackConfig`).
    config: BetterStackConfig,
    /// Timer that triggers periodic flushes so events don't sit in the buffer
    /// indefinitely when traffic is low.
    interval: Interval,
    /// Buffer holding log events until they're ready to be sent.
    batch: Vec<LogEvent>,
}

impl BatchProcessor {
    /// Creates a new `BatchProcessor` with the specified configuration.
    ///
    /// # Arguments
    ///
    /// * `receiver` - Channel receiver for incoming log events
    /// * `config` - Better Stack configuration containing batch size and timeout settings
    ///
    /// # Returns
    ///
    /// A new `BatchProcessor` instance ready to process log events.
    ///
    /// # Panics
    ///
    /// `tokio::time::interval` panics when given a zero period, so
    /// `config.batch_timeout` must be non-zero.
    pub(crate) fn new(receiver: UnboundedReceiver<LogEvent>, config: BetterStackConfig) -> Self {
        // Note: the first tick of a tokio interval completes immediately; with
        // an empty batch that first tick is a no-op in `run`.
        let interval = interval(config.batch_timeout);
        let batch_capacity = config.batch_size;
        Self {
            receiver,
            config,
            interval,
            batch: Vec::with_capacity(batch_capacity),
        }
    }

    /// Runs the batch processor, continuously receiving and batching log events.
    ///
    /// This method runs an event loop that:
    /// - Receives log events from the channel
    /// - Adds them to the current batch
    /// - Flushes the batch when it reaches the configured size
    /// - Periodically flushes based on the timeout interval
    /// - Flushes any remaining events when the channel is closed
    ///
    /// The processor continues running until the channel is closed, i.e. until
    /// `recv()` returns `None`, at which point the remaining events are flushed
    /// and the method returns.
    pub(crate) async fn run(mut self) {
        loop {
            tokio::select! {
                // Bind the raw `Option` instead of pattern-matching `Some(..)`
                // in the branch head: a failed pattern merely disables this
                // branch, and because the interval branch below can never be
                // disabled, a `select!` `else` arm would be unreachable and the
                // loop would spin on ticks forever after the channel closes.
                maybe_event = self.receiver.recv() => {
                    match maybe_event {
                        Some(event) => {
                            self.batch.push(event);
                            if self.batch.len() >= self.config.batch_size {
                                self.flush().await;
                            }
                        }
                        None => {
                            // Channel closed and fully drained: send whatever
                            // is still buffered, then stop.
                            self.flush().await;
                            break;
                        }
                    }
                }
                _ = self.interval.tick() => {
                    // `flush` is a no-op on an empty batch.
                    self.flush().await;
                }
            }
        }
    }

    /// Flushes the current batch of log events to Better Stack.
    ///
    /// This method:
    /// - Returns early if the batch is empty
    /// - Swaps the current batch for a new empty buffer pre-sized to
    ///   `config.batch_size`
    /// - Sends the collected events via `send_batch`
    /// - Reports a failed send on stderr; the events of a failed batch are
    ///   dropped, not retried
    async fn flush(&mut self) {
        if self.batch.is_empty() {
            return;
        }

        // Take ownership of the buffered events while leaving a fresh,
        // pre-allocated buffer in place for subsequent events.
        let batch = std::mem::replace(&mut self.batch, Vec::with_capacity(self.config.batch_size));

        if let Err(e) = send_batch(batch, &self.config).await {
            // Deliberately stderr rather than `tracing`, presumably to avoid
            // feeding the failure back into this same logging pipeline.
            eprintln!("Failed to send batch to Better Stack: {}", e);
        }
    }
}