tracing_layer_core/
layer.rs

1use std::collections::HashMap;
2use std::error::Error;
3use std::future::Future;
4use std::process::Output;
5use std::str::FromStr;
6use std::sync::Arc;
7
8use regex::Regex;
9use serde::ser::SerializeMap;
10use serde::Serializer;
11use serde_json::Value;
12use tokio::sync::Mutex;
13use tracing::{Event, Subscriber};
14use tracing::log::LevelFilter;
15use tracing_bunyan_formatter::JsonStorage;
16use tracing_subscriber::Layer;
17use tracing_subscriber::layer::Context;
18
19use crate::{BackgroundWorker, ChannelSender, Config, EventFilters, WebhookMessageFactory, WebhookMessageInputs, WorkerMessage};
20use crate::filters::{Filter, FilterError};
21use crate::worker::worker;
22
23/// Layer for forwarding tracing events to webhook endpoints.
24pub struct WebhookLayer<C: Config, F: WebhookMessageFactory> {
25    /// Filter events by their target.
26    ///
27    /// Filter type semantics:
28    /// - Subtractive: Exclude an event if the target does NOT MATCH a given regex.
29    /// - Additive: Exclude an event if the target MATCHES a given regex.
30    target_filters: EventFilters,
31
32    /// Filter events by their message.
33    ///
34    /// Filter type semantics:
35    /// - Positive: Exclude an event if the message MATCHES a given regex, and
36    /// - Negative: Exclude an event if the message does NOT MATCH a given regex.
37    message_filters: Option<EventFilters>,
38
39    /// Filter events by fields.
40    ///
41    /// Filter type semantics:
42    /// - Positive: Exclude the event if its key MATCHES a given regex.
43    /// - Negative: Exclude the event if its key does NOT MATCH a given regex.
44    event_by_field_filters: Option<EventFilters>,
45
46    /// Filter fields of events from being sent to the webhook.
47    ///
48    /// Filter type semantics:
49    /// - Positive: Exclude event fields if the field's key MATCHES any provided regular expressions.
50    field_exclusion_filters: Option<Vec<Regex>>,
51
52    /// Filter events by their level.
53    level_filter: Option<String>,
54
55    app_name: String,
56
57    /// Configure the layer's connection to the Webhook API.
58    config: C,
59
60    factory: std::marker::PhantomData<F>,
61
62    /// An unbounded sender, which the caller must send `WorkerMessage::Shutdown` in order to cancel
63    /// worker's receive-send loop.
64    sender: ChannelSender,
65}
66
67impl<C: Config, F: WebhookMessageFactory> WebhookLayer<C, F> {
68    /// Create a new layer for forwarding messages to the webhook, using a specified
69    /// configuration. The background worker must be started in order to spawn spawns
70    /// a task onto the tokio runtime to begin sending tracing events to the webhook.
71    ///
72    /// Returns the tracing_subscriber::Layer impl to add to a registry, an unbounded-mpsc sender
73    /// used to shutdown the background worker, and a future to spawn as a task on a tokio runtime
74    /// to initialize the worker's processing and sending of HTTP requests to the webhook.
75    pub(crate) fn new(
76        app_name: String,
77        target_filters: EventFilters,
78        message_filters: Option<EventFilters>,
79        event_by_field_filters: Option<EventFilters>,
80        field_exclusion_filters: Option<Vec<Regex>>,
81        level_filter: Option<String>,
82        config: C,
83    ) -> (WebhookLayer<C, F>, BackgroundWorker) {
84        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
85        let layer = WebhookLayer {
86            target_filters,
87            message_filters,
88            field_exclusion_filters,
89            event_by_field_filters,
90            level_filter,
91            app_name,
92            config,
93            factory: Default::default(),
94            sender: tx.clone(),
95        };
96        let background_worker = BackgroundWorker {
97            sender: tx,
98            handle: Arc::new(Mutex::new(None)),
99            rx: Arc::new(Mutex::new(rx)),
100        };
101        (layer, background_worker)
102    }
103
104    /// Create a new builder for the webhook layer.
105    pub fn builder(app_name: String, target_filters: EventFilters) -> WebhookLayerBuilder<C, F> {
106        WebhookLayerBuilder::new(app_name, target_filters)
107    }
108}
109
110/// A builder for creating a webhook layer.
111///
112/// The layer requires a regex for selecting events to be sent to webhook by their target. Specifying
113/// no filter (e.g. ".*") will cause an explosion in the number of messages observed by the layer.
114///
115/// Several methods expose initialization of optional filtering mechanisms, along with webhook
116/// configuration that defaults to searching in the local environment variables.
117pub struct WebhookLayerBuilder<C: Config, F: WebhookMessageFactory> {
118    factory: std::marker::PhantomData<F>,
119    app_name: String,
120    target_filters: EventFilters,
121    message_filters: Option<EventFilters>,
122    event_by_field_filters: Option<EventFilters>,
123    field_exclusion_filters: Option<Vec<Regex>>,
124    level_filters: Option<String>,
125    config: Option<C>,
126}
127
128impl<C: Config, F: WebhookMessageFactory> WebhookLayerBuilder<C, F> {
129    pub(crate) fn new(app_name: String, target_filters: EventFilters) -> Self {
130        Self {
131            factory: Default::default(),
132            app_name,
133            target_filters,
134            message_filters: None,
135            event_by_field_filters: None,
136            field_exclusion_filters: None,
137            level_filters: None,
138            config: None,
139        }
140    }
141
142    /// Filter events by their message.
143    ///
144    /// Filter type semantics:
145    /// - Positive: Exclude an event if the message MATCHES a given regex, and
146    /// - Negative: Exclude an event if the message does NOT MATCH a given regex.
147    pub fn message_filters(mut self, filters: EventFilters) -> Self {
148        self.message_filters = Some(filters);
149        self
150    }
151
152    /// Filter events by fields.
153    ///
154    /// Filter type semantics:
155    /// - Positive: Exclude the event if its key MATCHES a given regex.
156    /// - Negative: Exclude the event if its key does NOT MATCH a given regex.
157    pub fn event_by_field_filters(mut self, filters: EventFilters) -> Self {
158        self.event_by_field_filters = Some(filters);
159        self
160    }
161
162    /// Filter fields of events from being sent to the webhook.
163    ///
164    /// Filter type semantics:
165    /// - Positive: Exclude event fields if the field's key MATCHES any provided regular expressions.
166    pub fn field_exclusion_filters(mut self, filters: Vec<Regex>) -> Self {
167        self.field_exclusion_filters = Some(filters);
168        self
169    }
170
171    /// Configure the layer's connection to the webhook.
172    pub fn config(mut self, config: C) -> Self {
173        self.config = Some(config);
174        self
175    }
176
177    /// Configure which levels of events to send to the webhook.
178    pub fn level_filters(mut self, level_filters: String) -> Self {
179        self.level_filters = Some(level_filters);
180        self
181    }
182
183    /// Create a webhook layer and its corresponding background worker to (async) send the messages.
184    pub fn build(self) -> (WebhookLayer<C, F>, BackgroundWorker) {
185        WebhookLayer::new(
186            self.app_name,
187            self.target_filters,
188            self.message_filters,
189            self.event_by_field_filters,
190            self.field_exclusion_filters,
191            self.level_filters,
192            self.config.unwrap_or_else(C::new_from_env),
193        )
194    }
195}
196
197impl<S, C, F> Layer<S> for WebhookLayer<C, F>
198where
199    S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
200    C: Config+ 'static,
201    F: WebhookMessageFactory + 'static,
202{
203    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
204        let current_span = ctx.lookup_current();
205        let mut event_visitor = JsonStorage::default();
206        event.record(&mut event_visitor);
207
208        let format = || {
209            const KEYWORDS: [&str; 2] = ["message", "error"];
210
211            let target = event.metadata().target();
212            self.target_filters.process(target)?;
213
214            // Extract the "message" field, if provided. Fallback to the target, if missing.
215            let message = event_visitor
216                .values()
217                .get("message")
218                .and_then(|v| match v {
219                    Value::String(s) => Some(s.as_str()),
220                    _ => None,
221                })
222                .or_else(|| {
223                    event_visitor.values().get("error").and_then(|v| match v {
224                        Value::String(s) => Some(s.as_str()),
225                        _ => None,
226                    })
227                })
228                .unwrap_or("No message");
229
230            self.message_filters.process(message)?;
231            if let Some(level_filters) = &self.level_filter {
232                let message_level = {
233                    LevelFilter::from_str(event.metadata().level().as_str())
234                        .map_err(|e| FilterError::IoError(Box::new(e)))?
235                };
236                let level_threshold =
237                    LevelFilter::from_str(level_filters).map_err(|e| FilterError::IoError(Box::new(e)))?;
238                if message_level > level_threshold {
239                    return Err(FilterError::PositiveFilterFailed);
240                }
241            }
242
243            let mut metadata_buffer = Vec::new();
244            let mut serializer = serde_json::Serializer::new(&mut metadata_buffer);
245            let mut map_serializer = serializer.serialize_map(None)?;
246            // Add all the other fields associated with the event, expect the message we
247            // already used.
248            for (key, value) in event_visitor
249                .values()
250                .iter()
251                .filter(|(&key, _)| !KEYWORDS.contains(&key))
252                .filter(|(&key, _)| self.field_exclusion_filters.process(key).is_ok())
253            {
254                self.event_by_field_filters.process(key)?;
255                map_serializer.serialize_entry(key, value)?;
256            }
257            // Add all the fields from the current span, if we have one.
258            if let Some(span) = &current_span {
259                let extensions = span.extensions();
260                if let Some(visitor) = extensions.get::<JsonStorage>() {
261                    for (key, value) in visitor.values() {
262                        map_serializer.serialize_entry(key, value)?;
263                    }
264                }
265            }
266            map_serializer.end()?;
267
268            let span = match &current_span {
269                Some(span) => span.metadata().name(),
270                None => "",
271            };
272
273            let metadata = {
274                let data: HashMap<String, Value> = serde_json::from_slice(metadata_buffer.as_slice()).unwrap();
275                serde_json::to_string_pretty(&data).unwrap()
276            };
277
278            Ok(F::create(WebhookMessageInputs {
279                app_name: self.app_name.clone(),
280                message: message.to_string(),
281                event_level: *event.metadata().level(),
282                source_file: event.metadata().file().unwrap_or("Unknown").to_string(),
283                source_line: event.metadata().line().unwrap_or(0),
284                target: target.to_string(),
285                span: span.to_string(),
286                metadata,
287                webhook_url: self.config.webhook_url().to_string(),
288            }))
289        };
290
291        let result: Result<_, FilterError> = format();
292        if let Ok(formatted) = result {
293            if let Err(e) = self.sender.send(WorkerMessage::Data(Box::new(formatted))) {
294                println!("failed to send webhook payload to given channel, err = {}", e)
295            };
296        }
297    }
298}