betterstack_tracing/
lib.rs

1//! # betterstack-tracing
2//!
3//! A [tracing](https://docs.rs/tracing) layer for sending logs to [Betterstack](https://betterstack.com).
4//!
5//! ## Features
6//!
7//! - Non-blocking async log sending
8//! - Automatic batching (size and time-based)
9//! - Span context tracking
10//! - Configurable error handling
11//! - Connection pooling and retries
12//!
13//! ## Quick Start
14//!
15//! ```rust,no_run
16//! use betterstack_tracing::BetterstackLayer;
17//! use tracing_subscriber::prelude::*;
18//!
19//! #[tokio::main]
20//! async fn main() {
21//!     let config = BetterstackLayer::builder("your-token-here")
22//!         .build()
23//!         .expect("failed to create config");
24//!
25//!     let betterstack_layer = BetterstackLayer::new(config);
26//!
27//!     tracing_subscriber::registry()
28//!         .with(betterstack_layer)
29//!         .init();
30//!
31//!     tracing::info!("Hello from betterstack-tracing!");
32//! }
33//! ```
34
35mod config;
36mod error;
37mod payload;
38mod sender;
39mod visitor;
40
41pub use config::{BetterstackConfig, BetterstackConfigBuilder};
42pub use error::{BetterstackError, Result};
43
44use std::sync::Arc;
45use tokio::sync::mpsc;
46use tracing::Subscriber;
47use tracing_subscriber::{Layer, layer::Context, registry::LookupSpan};
48
49use sender::WorkerMessage;
50
51/// The main tracing layer for sending logs to Betterstack
52pub struct BetterstackLayer {
53    config: Arc<BetterstackConfig>,
54    tx: mpsc::Sender<WorkerMessage>,
55    worker_handle: Option<tokio::task::JoinHandle<()>>,
56}
57
58impl BetterstackLayer {
59    /// Create a new BetterstackLayer with the given configuration
60    pub fn new(config: BetterstackConfig) -> Self {
61        let config = Arc::new(config);
62        let (tx, rx) = mpsc::channel(config.channel_capacity);
63
64        // Spawn the background worker task
65        let worker_handle = tokio::spawn(sender::worker_task(rx, config.clone()));
66
67        Self {
68            config,
69            tx,
70            worker_handle: Some(worker_handle),
71        }
72    }
73
74    /// Create a configuration builder with the given token
75    ///
76    /// # Example
77    ///
78    /// ```rust,no_run
79    /// use betterstack_tracing::BetterstackLayer;
80    /// use std::time::Duration;
81    ///
82    /// let config = BetterstackLayer::builder("your-token")
83    ///     .batch_size(50)
84    ///     .batch_delay(Duration::from_secs(5))
85    ///     .build()
86    ///     .expect("failed to create config");
87    ///
88    /// let layer = BetterstackLayer::new(config);
89    /// ```
90    pub fn builder(token: impl Into<String>) -> BetterstackConfigBuilder {
91        BetterstackConfigBuilder::new(token)
92    }
93
94    /// Flush any pending logs
95    ///
96    /// This sends a flush message to the worker and waits for it to complete.
97    /// Useful before shutdown to ensure all logs are sent.
98    pub async fn flush(&self) {
99        let _ = self.tx.send(WorkerMessage::Flush).await;
100        // Give the worker a moment to flush
101        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
102    }
103}
104
105impl<S> Layer<S> for BetterstackLayer
106where
107    S: Subscriber + for<'a> LookupSpan<'a>,
108{
109    fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) {
110        use visitor::FieldVisitor;
111
112        // Create a visitor to collect fields
113        let mut visitor = FieldVisitor::new();
114        event.record(&mut visitor);
115
116        // Extract message from visitor or use target as fallback
117        let message = visitor
118            .message()
119            .unwrap_or_else(|| event.metadata().target())
120            .to_string();
121
122        // Get span context if enabled
123        let mut span_context = if self.config.include_span_context {
124            visitor::collect_span_context(&ctx, event, &message)
125        } else {
126            Default::default()
127        };
128
129        // Override message from span context with the actual event message
130        span_context.message = message;
131
132        // Build the payload
133        let payload = payload::LogPayload::from_event(
134            event,
135            visitor.into_fields(),
136            span_context,
137            &self.config,
138        );
139
140        // Validate payload size to comply with Betterstack limits (1 MiB max)
141        match serde_json::to_vec(&payload) {
142            Ok(serialized) => {
143                let size = serialized.len();
144
145                if size > config::MAX_LOG_RECORD_SIZE {
146                    // Log record exceeds 1 MiB limit - drop it
147                    tracing::warn!(
148                        size = size,
149                        max_size = config::MAX_LOG_RECORD_SIZE,
150                        "Log record exceeds Betterstack size limit and will be dropped"
151                    );
152                    return;
153                }
154
155                if size > config::RECOMMENDED_LOG_SIZE {
156                    // Log is over recommended size but still within limits
157                    tracing::debug!(
158                        size = size,
159                        recommended_size = config::RECOMMENDED_LOG_SIZE,
160                        "Log record exceeds recommended size"
161                    );
162                }
163
164                // Send to worker (non-blocking)
165                // If the channel is full, we drop the log to avoid blocking
166                let _ = self.tx.try_send(WorkerMessage::Log(payload));
167            }
168            Err(e) => {
169                // Failed to serialize - log error
170                tracing::error!(error = %e, "Failed to serialize log payload");
171            }
172        }
173    }
174}
175
176impl Drop for BetterstackLayer {
177    fn drop(&mut self) {
178        // Send shutdown signal
179        let _ = self.tx.try_send(WorkerMessage::Shutdown);
180
181        // Wait for worker to finish with a timeout
182        if let Some(handle) = self.worker_handle.take() {
183            // Try to get the current runtime or create a new one for cleanup
184            if let Ok(runtime_handle) = tokio::runtime::Handle::try_current() {
185                runtime_handle.block_on(async {
186                    let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
187                });
188            } else {
189                // If we're not in a tokio runtime, just abort the task
190                handle.abort();
191            }
192        }
193    }
194}
195
196#[cfg(test)]
197mod tests {
198    use super::*;
199
200    #[test]
201    fn test_builder_creates_config() {
202        let config = BetterstackLayer::builder("test-token")
203            .build()
204            .expect("failed to create config");
205
206        // Just verify the config was created successfully
207        assert_eq!(config.token(), "test-token");
208    }
209}