tracing_layer_core/
layer.rs

1use std::collections::HashMap;
2use std::str::FromStr;
3use std::sync::Arc;
4
5use regex::Regex;
6use serde::ser::SerializeMap;
7use serde::Serializer;
8use serde_json::Value;
9use tokio::sync::Mutex;
10use tracing::log::LevelFilter;
11use tracing::{Event, Subscriber};
12use tracing_bunyan_formatter::JsonStorage;
13use tracing_subscriber::layer::Context;
14use tracing_subscriber::Layer;
15
16use crate::filters::{Filter, FilterError};
17use crate::{
18    BackgroundWorker, ChannelSender, Config, EventFilters, WebhookMessageFactory, WebhookMessageInputs, WorkerMessage,
19};
20
21/// Layer for forwarding tracing events to webhook endpoints.
22pub struct WebhookLayer<C: Config, F: WebhookMessageFactory> {
23    /// Filter events by their target.
24    ///
25    /// Filter type semantics:
26    /// - Subtractive: Exclude an event if the target does NOT MATCH a given regex.
27    /// - Additive: Exclude an event if the target MATCHES a given regex.
28    target_filters: EventFilters,
29
30    /// Filter events by their message.
31    ///
32    /// Filter type semantics:
33    /// - Positive: Exclude an event if the message MATCHES a given regex, and
34    /// - Negative: Exclude an event if the message does NOT MATCH a given regex.
35    message_filters: Option<EventFilters>,
36
37    /// Filter events by fields.
38    ///
39    /// Filter type semantics:
40    /// - Positive: Exclude the event if its key MATCHES a given regex.
41    /// - Negative: Exclude the event if its key does NOT MATCH a given regex.
42    event_by_field_filters: Option<EventFilters>,
43
44    /// Filter fields of events from being sent to the webhook.
45    ///
46    /// Filter type semantics:
47    /// - Positive: Exclude event fields if the field's key MATCHES any provided regular expressions.
48    field_exclusion_filters: Option<Vec<Regex>>,
49
50    /// Filter events by their level.
51    level_filter: Option<String>,
52
53    app_name: String,
54
55    /// Configure the layer's connection to the Webhook API.
56    config: C,
57
58    factory: std::marker::PhantomData<F>,
59
60    /// An unbounded sender, which the caller must send `WorkerMessage::Shutdown` in order to cancel
61    /// worker's receive-send loop.
62    sender: ChannelSender,
63}
64
65impl<C: Config, F: WebhookMessageFactory> WebhookLayer<C, F> {
66    /// Create a new layer for forwarding messages to the webhook, using a specified
67    /// configuration. The background worker must be started in order to spawn spawns
68    /// a task onto the tokio runtime to begin sending tracing events to the webhook.
69    ///
70    /// Returns the tracing_subscriber::Layer impl to add to a registry, an unbounded-mpsc sender
71    /// used to shutdown the background worker, and a future to spawn as a task on a tokio runtime
72    /// to initialize the worker's processing and sending of HTTP requests to the webhook.
73    pub(crate) fn new(
74        app_name: String,
75        target_filters: EventFilters,
76        message_filters: Option<EventFilters>,
77        event_by_field_filters: Option<EventFilters>,
78        field_exclusion_filters: Option<Vec<Regex>>,
79        level_filter: Option<String>,
80        config: C,
81    ) -> (WebhookLayer<C, F>, BackgroundWorker) {
82        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
83        let layer = WebhookLayer {
84            target_filters,
85            message_filters,
86            field_exclusion_filters,
87            event_by_field_filters,
88            level_filter,
89            app_name,
90            config,
91            factory: Default::default(),
92            sender: tx.clone(),
93        };
94        let background_worker = BackgroundWorker {
95            sender: tx,
96            handle: Arc::new(Mutex::new(None)),
97            rx: Arc::new(Mutex::new(rx)),
98        };
99        (layer, background_worker)
100    }
101
102    /// Create a new builder for the webhook layer.
103    pub fn builder(app_name: String, target_filters: EventFilters) -> WebhookLayerBuilder<C, F> {
104        WebhookLayerBuilder::new(app_name, target_filters)
105    }
106}
107
108/// A builder for creating a webhook layer.
109///
110/// The layer requires a regex for selecting events to be sent to webhook by their target. Specifying
111/// no filter (e.g. ".*") will cause an explosion in the number of messages observed by the layer.
112///
113/// Several methods expose initialization of optional filtering mechanisms, along with webhook
114/// configuration that defaults to searching in the local environment variables.
115pub struct WebhookLayerBuilder<C: Config, F: WebhookMessageFactory> {
116    factory: std::marker::PhantomData<F>,
117    app_name: String,
118    target_filters: EventFilters,
119    message_filters: Option<EventFilters>,
120    event_by_field_filters: Option<EventFilters>,
121    field_exclusion_filters: Option<Vec<Regex>>,
122    level_filters: Option<String>,
123    config: Option<C>,
124}
125
126impl<C: Config, F: WebhookMessageFactory> WebhookLayerBuilder<C, F> {
127    pub(crate) fn new(app_name: String, target_filters: EventFilters) -> Self {
128        Self {
129            factory: Default::default(),
130            app_name,
131            target_filters,
132            message_filters: None,
133            event_by_field_filters: None,
134            field_exclusion_filters: None,
135            level_filters: None,
136            config: None,
137        }
138    }
139
140    /// Filter events by their message.
141    ///
142    /// Filter type semantics:
143    /// - Positive: Exclude an event if the message MATCHES a given regex, and
144    /// - Negative: Exclude an event if the message does NOT MATCH a given regex.
145    pub fn message_filters(mut self, filters: EventFilters) -> Self {
146        self.message_filters = Some(filters);
147        self
148    }
149
150    /// Filter events by fields.
151    ///
152    /// Filter type semantics:
153    /// - Positive: Exclude the event if its key MATCHES a given regex.
154    /// - Negative: Exclude the event if its key does NOT MATCH a given regex.
155    pub fn event_by_field_filters(mut self, filters: EventFilters) -> Self {
156        self.event_by_field_filters = Some(filters);
157        self
158    }
159
160    /// Filter fields of events from being sent to the webhook.
161    ///
162    /// Filter type semantics:
163    /// - Positive: Exclude event fields if the field's key MATCHES any provided regular expressions.
164    pub fn field_exclusion_filters(mut self, filters: Vec<Regex>) -> Self {
165        self.field_exclusion_filters = Some(filters);
166        self
167    }
168
169    /// Configure the layer's connection to the webhook.
170    pub fn config(mut self, config: C) -> Self {
171        self.config = Some(config);
172        self
173    }
174
175    /// Configure which levels of events to send to the webhook.
176    pub fn level_filters(mut self, level_filters: String) -> Self {
177        self.level_filters = Some(level_filters);
178        self
179    }
180
181    /// Create a webhook layer and its corresponding background worker to (async) send the messages.
182    pub fn build(self) -> (WebhookLayer<C, F>, BackgroundWorker) {
183        WebhookLayer::new(
184            self.app_name,
185            self.target_filters,
186            self.message_filters,
187            self.event_by_field_filters,
188            self.field_exclusion_filters,
189            self.level_filters,
190            self.config.unwrap_or_else(C::new_from_env),
191        )
192    }
193}
194
195impl<S, C, F> Layer<S> for WebhookLayer<C, F>
196where
197    S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
198    C: Config + 'static,
199    F: WebhookMessageFactory + 'static,
200{
201    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
202        let current_span = ctx.lookup_current();
203        let mut event_visitor = JsonStorage::default();
204        event.record(&mut event_visitor);
205
206        let format = || {
207            const KEYWORDS: [&str; 2] = ["message", "error"];
208
209            let target = event.metadata().target();
210            self.target_filters.process(target)?;
211
212            // Extract the "message" field, if provided. Fallback to the target, if missing.
213            let message = event_visitor
214                .values()
215                .get("message")
216                .and_then(|v| match v {
217                    Value::String(s) => Some(s.as_str()),
218                    _ => None,
219                })
220                .or_else(|| {
221                    event_visitor.values().get("error").and_then(|v| match v {
222                        Value::String(s) => Some(s.as_str()),
223                        _ => None,
224                    })
225                })
226                .unwrap_or("No message");
227
228            self.message_filters.process(message)?;
229            if let Some(level_filters) = &self.level_filter {
230                let message_level = {
231                    LevelFilter::from_str(event.metadata().level().as_str())
232                        .map_err(|e| FilterError::IoError(Box::new(e)))?
233                };
234                let level_threshold =
235                    LevelFilter::from_str(level_filters).map_err(|e| FilterError::IoError(Box::new(e)))?;
236                if message_level > level_threshold {
237                    return Err(FilterError::PositiveFilterFailed);
238                }
239            }
240
241            let mut metadata_buffer = Vec::new();
242            let mut serializer = serde_json::Serializer::new(&mut metadata_buffer);
243            let mut map_serializer = serializer.serialize_map(None)?;
244            // Add all the other fields associated with the event, expect the message we
245            // already used.
246            for (key, value) in event_visitor
247                .values()
248                .iter()
249                .filter(|(&key, _)| !KEYWORDS.contains(&key))
250                .filter(|(&key, _)| self.field_exclusion_filters.process(key).is_ok())
251            {
252                self.event_by_field_filters.process(key)?;
253                map_serializer.serialize_entry(key, value)?;
254            }
255            // Add all the fields from the current span, if we have one.
256            if let Some(span) = &current_span {
257                let extensions = span.extensions();
258                if let Some(visitor) = extensions.get::<JsonStorage>() {
259                    for (key, value) in visitor.values() {
260                        map_serializer.serialize_entry(key, value)?;
261                    }
262                }
263            }
264            map_serializer.end()?;
265
266            let span = match &current_span {
267                Some(span) => span.metadata().name(),
268                None => "",
269            };
270
271            let metadata = {
272                let data: HashMap<String, Value> = serde_json::from_slice(metadata_buffer.as_slice()).unwrap();
273                serde_json::to_string_pretty(&data).unwrap()
274            };
275
276            Ok(F::create(WebhookMessageInputs {
277                app_name: self.app_name.clone(),
278                message: message.to_string(),
279                event_level: *event.metadata().level(),
280                source_file: event.metadata().file().unwrap_or("Unknown").to_string(),
281                source_line: event.metadata().line().unwrap_or(0),
282                target: target.to_string(),
283                span: span.to_string(),
284                metadata,
285                webhook_url: self.config.webhook_url().to_string(),
286            }))
287        };
288
289        let result: Result<_, FilterError> = format();
290        if let Ok(formatted) = result {
291            if let Err(e) = self.sender.send(WorkerMessage::Data(Box::new(formatted))) {
292                #[cfg(feature = "log-errors")]
293                eprintln!("ERROR: failed to send webhook payload to given channel, err = {}", e)
294            };
295        }
296    }
297}