tracing_better_stack/
layer.rs

1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
4use tracing::{Event, Level, Subscriber};
5use tracing_subscriber::Layer;
6use tracing_subscriber::layer::Context;
7use tracing_subscriber::registry::LookupSpan;
8
9use crate::batch::BatchProcessor;
10use crate::config::BetterStackConfig;
11use crate::log_event::{LogEvent, LogValue};
12
/// A tracing layer that sends log events to Better Stack.
///
/// `BetterStackLayer` implements the `tracing_subscriber::Layer` trait and can be
/// used with any tracing subscriber to automatically send structured log events
/// to Better Stack's log management platform.
///
/// The layer itself performs no network I/O. Each event is converted into a
/// `LogEvent` and pushed onto an unbounded channel whose receiving end is owned
/// by a background `BatchProcessor` task. Batching and delivery are carried out
/// by that processor according to the provided configuration.
///
/// # Example
///
/// ```rust,no_run
/// use tracing_subscriber::prelude::*;
/// use tracing_better_stack::{BetterStackLayer, BetterStackConfig};
///
/// let layer = BetterStackLayer::new(
///     BetterStackConfig::builder("s1234567.us-east-9.betterstackdata.com", "source_token")
///         .batch_size(100)
///         .build()
/// );
///
/// tracing_subscriber::registry()
///     .with(layer)
///     .init();
/// ```
pub struct BetterStackLayer {
    /// Channel sender for dispatching events to the batch processor.
    ///
    /// Starts out as `None` and is filled in lazily once an event is handled
    /// while a Tokio runtime is available to host the processor task.
    sender: Arc<Mutex<Option<UnboundedSender<LogEvent>>>>,
    /// Configuration for Better Stack integration.
    ///
    /// Read on every event (`include_location`, `include_spans`) and cloned
    /// into the batch processor when it is spawned.
    config: Arc<BetterStackConfig>,
}
44
45impl BetterStackLayer {
46    /// Creates a new Better Stack layer with the specified configuration.
47    ///
48    /// The layer will lazily initialize its background processor on the first
49    /// log event when a Tokio runtime is available.
50    ///
51    /// # Arguments
52    ///
53    /// * `config` - Better Stack configuration specifying endpoint, credentials, and batching behavior
54    ///
55    /// # Example
56    ///
57    /// ```rust,no_run
58    /// use tracing_better_stack::{BetterStackLayer, BetterStackConfig};
59    ///
60    /// let config = BetterStackConfig::builder("s1234567.us-east-9.betterstackdata.com", "your-source-token")
61    ///     .batch_size(200)
62    ///     .build();
63    /// let layer = BetterStackLayer::new(config);
64    /// ```
65    pub fn new(config: BetterStackConfig) -> Self {
66        Self {
67            sender: Arc::new(Mutex::new(None)),
68            config: Arc::new(config),
69        }
70    }
71
72    /// Creates a configuration builder for Better Stack.
73    ///
74    /// This is a convenience method that creates a `BetterStackConfigBuilder`
75    /// for configuring the layer with a fluent API.
76    ///
77    /// # Arguments
78    ///
79    /// * `ingesting_host` - The Better Stack ingesting host for your source
80    /// * `source_token` - Your Better Stack source token for authentication
81    ///
82    /// # Example
83    ///
84    /// ```rust,no_run
85    /// use tracing_subscriber::prelude::*;
86    /// use tracing_better_stack::BetterStackLayer;
87    /// use std::time::Duration;
88    ///
89    /// let config = BetterStackLayer::builder("s1234567.us-east-9.betterstackdata.com", "your-source-token")
90    ///     .batch_size(200)
91    ///     .batch_timeout(Duration::from_secs(10))
92    ///     .include_spans(false)
93    ///     .build();
94    /// let layer = BetterStackLayer::new(config);
95    ///
96    /// tracing_subscriber::registry()
97    ///     .with(layer)
98    ///     .init();
99    /// ```
100    pub fn builder(
101        ingesting_host: impl Into<String>,
102        source_token: impl Into<String>,
103    ) -> crate::config::BetterStackConfigBuilder {
104        BetterStackConfig::builder(ingesting_host, source_token)
105    }
106
107    /// Ensures the background batch processor is initialized.
108    ///
109    /// This method lazily initializes the batch processor on first use,
110    /// spawning it on the current Tokio runtime if available.
111    fn ensure_initialized(&self) {
112        let mut sender = self.sender.lock().unwrap();
113        if sender.is_none()
114            && let Ok(handle) = tokio::runtime::Handle::try_current()
115        {
116            let (tx, rx) = unbounded_channel();
117            let config = Arc::clone(&self.config);
118
119            handle.spawn(async move {
120                let processor = BatchProcessor::new(rx, (*config).clone());
121                processor.run().await;
122            });
123
124            *sender = Some(tx);
125        }
126    }
127
128    /// Sends a log event to the batch processor.
129    ///
130    /// This method ensures the processor is initialized and then sends
131    /// the event through the channel. If the channel is closed, an error
132    /// is logged to stderr.
133    fn send_event(&self, event: LogEvent) {
134        self.ensure_initialized();
135
136        if let Some(tx) = self.sender.lock().unwrap().as_ref()
137            && tx.send(event).is_err()
138        {
139            eprintln!("Failed to send event to Better Stack: channel closed");
140        }
141    }
142}
143
144impl<S> Layer<S> for BetterStackLayer
145where
146    S: Subscriber + for<'a> LookupSpan<'a>,
147{
148    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
149        let mut fields = HashMap::new();
150        let mut visitor = FieldVisitor::new(&mut fields);
151        event.record(&mut visitor);
152
153        let level = match *event.metadata().level() {
154            Level::ERROR => "error",
155            Level::WARN => "warn",
156            Level::INFO => "info",
157            Level::DEBUG => "debug",
158            Level::TRACE => "trace",
159        };
160
161        // Extract message from fields if present
162        let message = fields
163            .remove("message")
164            .and_then(|v| match v {
165                LogValue::String(s) => Some(s),
166                _ => None,
167            })
168            .unwrap_or_default();
169
170        // Create the log event
171        let mut log_event = LogEvent::new(level, message).with_target(event.metadata().target());
172
173        // Add location information if enabled
174        if self.config.include_location {
175            let file = event.metadata().file().map(|s| s.to_string());
176            let line = event.metadata().line();
177            log_event = log_event.with_location(file, line);
178        }
179
180        // Add span information if enabled
181        if self.config.include_spans
182            && let Some(scope) = ctx.event_scope(event)
183        {
184            let mut span_fields = HashMap::new();
185            let mut span_name = String::new();
186
187            for span in scope.from_root() {
188                if span_name.is_empty() {
189                    span_name = span.name().to_string();
190                }
191
192                let extensions = span.extensions();
193                // Collect span fields
194                if let Some(visitor) = extensions.get::<SpanFieldStorage>() {
195                    span_fields.extend(visitor.fields.clone());
196                }
197            }
198
199            if !span_name.is_empty() && !span_fields.is_empty() {
200                log_event = log_event.with_span(span_name, span_fields);
201            }
202        }
203
204        // Add remaining fields to the log event
205        for (key, value) in fields {
206            log_event.add_field(key, value);
207        }
208
209        self.send_event(log_event);
210    }
211
212    fn on_new_span(
213        &self,
214        attrs: &tracing::span::Attributes<'_>,
215        id: &tracing::span::Id,
216        ctx: Context<'_, S>,
217    ) {
218        if !self.config.include_spans {
219            return;
220        }
221
222        let span = ctx.span(id).unwrap();
223        let mut fields = HashMap::new();
224        let mut visitor = FieldVisitor::new(&mut fields);
225        attrs.record(&mut visitor);
226
227        let mut extensions = span.extensions_mut();
228        extensions.insert(SpanFieldStorage { fields });
229    }
230}
231
/// Span fields captured for later use by events.
///
/// An instance is stored in a span's extensions when the span is created and
/// read back in `on_event` to attach the span's fields to the log event.
struct SpanFieldStorage {
    /// Field name to recorded value, as collected by `FieldVisitor`.
    fields: HashMap<String, LogValue>,
}
235
236struct FieldVisitor<'a> {
237    fields: &'a mut HashMap<String, LogValue>,
238}
239
240impl<'a> FieldVisitor<'a> {
241    fn new(fields: &'a mut HashMap<String, LogValue>) -> Self {
242        Self { fields }
243    }
244}
245
246impl<'a> tracing::field::Visit for FieldVisitor<'a> {
247    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
248        self.fields.insert(
249            field.name().to_string(),
250            LogValue::String(value.to_string()),
251        );
252    }
253
254    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
255        self.fields
256            .insert(field.name().to_string(), LogValue::Number(value));
257    }
258
259    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
260        self.fields
261            .insert(field.name().to_string(), LogValue::Number(value as i64));
262    }
263
264    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
265        self.fields
266            .insert(field.name().to_string(), LogValue::Bool(value));
267    }
268
269    fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
270        self.fields
271            .insert(field.name().to_string(), LogValue::Float(value));
272    }
273
274    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
275        self.fields.insert(
276            field.name().to_string(),
277            LogValue::String(format!("{:?}", value)),
278        );
279    }
280}