1use std::collections::HashMap;
2use std::str::FromStr;
3use std::sync::Arc;
4
5use regex::Regex;
6use serde::ser::SerializeMap;
7use serde::Serializer;
8use serde_json::Value;
9use tokio::sync::Mutex;
10use tracing::log::LevelFilter;
11use tracing::{Event, Subscriber};
12use tracing_bunyan_formatter::JsonStorage;
13use tracing_subscriber::layer::Context;
14use tracing_subscriber::Layer;
15
16use crate::filters::{Filter, FilterError};
17use crate::{
18 BackgroundWorker, ChannelSender, Config, EventFilters, WebhookMessageFactory, WebhookMessageInputs, WorkerMessage,
19};
20
21pub struct WebhookLayer<C: Config, F: WebhookMessageFactory> {
23 target_filters: EventFilters,
29
30 message_filters: Option<EventFilters>,
36
37 event_by_field_filters: Option<EventFilters>,
43
44 field_exclusion_filters: Option<Vec<Regex>>,
49
50 level_filter: Option<String>,
52
53 app_name: String,
54
55 config: C,
57
58 factory: std::marker::PhantomData<F>,
59
60 sender: ChannelSender,
63}
64
65impl<C: Config, F: WebhookMessageFactory> WebhookLayer<C, F> {
66 pub(crate) fn new(
74 app_name: String,
75 target_filters: EventFilters,
76 message_filters: Option<EventFilters>,
77 event_by_field_filters: Option<EventFilters>,
78 field_exclusion_filters: Option<Vec<Regex>>,
79 level_filter: Option<String>,
80 config: C,
81 ) -> (WebhookLayer<C, F>, BackgroundWorker) {
82 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
83 let layer = WebhookLayer {
84 target_filters,
85 message_filters,
86 field_exclusion_filters,
87 event_by_field_filters,
88 level_filter,
89 app_name,
90 config,
91 factory: Default::default(),
92 sender: tx.clone(),
93 };
94 let background_worker = BackgroundWorker {
95 sender: tx,
96 handle: Arc::new(Mutex::new(None)),
97 rx: Arc::new(Mutex::new(rx)),
98 };
99 (layer, background_worker)
100 }
101
102 pub fn builder(app_name: String, target_filters: EventFilters) -> WebhookLayerBuilder<C, F> {
104 WebhookLayerBuilder::new(app_name, target_filters)
105 }
106}
107
108pub struct WebhookLayerBuilder<C: Config, F: WebhookMessageFactory> {
116 factory: std::marker::PhantomData<F>,
117 app_name: String,
118 target_filters: EventFilters,
119 message_filters: Option<EventFilters>,
120 event_by_field_filters: Option<EventFilters>,
121 field_exclusion_filters: Option<Vec<Regex>>,
122 level_filters: Option<String>,
123 config: Option<C>,
124}
125
126impl<C: Config, F: WebhookMessageFactory> WebhookLayerBuilder<C, F> {
127 pub(crate) fn new(app_name: String, target_filters: EventFilters) -> Self {
128 Self {
129 factory: Default::default(),
130 app_name,
131 target_filters,
132 message_filters: None,
133 event_by_field_filters: None,
134 field_exclusion_filters: None,
135 level_filters: None,
136 config: None,
137 }
138 }
139
140 pub fn message_filters(mut self, filters: EventFilters) -> Self {
146 self.message_filters = Some(filters);
147 self
148 }
149
150 pub fn event_by_field_filters(mut self, filters: EventFilters) -> Self {
156 self.event_by_field_filters = Some(filters);
157 self
158 }
159
160 pub fn field_exclusion_filters(mut self, filters: Vec<Regex>) -> Self {
165 self.field_exclusion_filters = Some(filters);
166 self
167 }
168
169 pub fn config(mut self, config: C) -> Self {
171 self.config = Some(config);
172 self
173 }
174
175 pub fn level_filters(mut self, level_filters: String) -> Self {
177 self.level_filters = Some(level_filters);
178 self
179 }
180
181 pub fn build(self) -> (WebhookLayer<C, F>, BackgroundWorker) {
183 WebhookLayer::new(
184 self.app_name,
185 self.target_filters,
186 self.message_filters,
187 self.event_by_field_filters,
188 self.field_exclusion_filters,
189 self.level_filters,
190 self.config.unwrap_or_else(C::new_from_env),
191 )
192 }
193}
194
195impl<S, C, F> Layer<S> for WebhookLayer<C, F>
196where
197 S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
198 C: Config + 'static,
199 F: WebhookMessageFactory + 'static,
200{
201 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
202 let current_span = ctx.lookup_current();
203 let mut event_visitor = JsonStorage::default();
204 event.record(&mut event_visitor);
205
206 let format = || {
207 const KEYWORDS: [&str; 2] = ["message", "error"];
208
209 let target = event.metadata().target();
210 self.target_filters.process(target)?;
211
212 let message = event_visitor
214 .values()
215 .get("message")
216 .and_then(|v| match v {
217 Value::String(s) => Some(s.as_str()),
218 _ => None,
219 })
220 .or_else(|| {
221 event_visitor.values().get("error").and_then(|v| match v {
222 Value::String(s) => Some(s.as_str()),
223 _ => None,
224 })
225 })
226 .unwrap_or("No message");
227
228 self.message_filters.process(message)?;
229 if let Some(level_filters) = &self.level_filter {
230 let message_level = {
231 LevelFilter::from_str(event.metadata().level().as_str())
232 .map_err(|e| FilterError::IoError(Box::new(e)))?
233 };
234 let level_threshold =
235 LevelFilter::from_str(level_filters).map_err(|e| FilterError::IoError(Box::new(e)))?;
236 if message_level > level_threshold {
237 return Err(FilterError::PositiveFilterFailed);
238 }
239 }
240
241 let mut metadata_buffer = Vec::new();
242 let mut serializer = serde_json::Serializer::new(&mut metadata_buffer);
243 let mut map_serializer = serializer.serialize_map(None)?;
244 for (key, value) in event_visitor
247 .values()
248 .iter()
249 .filter(|(&key, _)| !KEYWORDS.contains(&key))
250 .filter(|(&key, _)| self.field_exclusion_filters.process(key).is_ok())
251 {
252 self.event_by_field_filters.process(key)?;
253 map_serializer.serialize_entry(key, value)?;
254 }
255 if let Some(span) = ¤t_span {
257 let extensions = span.extensions();
258 if let Some(visitor) = extensions.get::<JsonStorage>() {
259 for (key, value) in visitor.values() {
260 map_serializer.serialize_entry(key, value)?;
261 }
262 }
263 }
264 map_serializer.end()?;
265
266 let span = match ¤t_span {
267 Some(span) => span.metadata().name(),
268 None => "",
269 };
270
271 let metadata = {
272 let data: HashMap<String, Value> = serde_json::from_slice(metadata_buffer.as_slice()).unwrap();
273 serde_json::to_string_pretty(&data).unwrap()
274 };
275
276 Ok(F::create(WebhookMessageInputs {
277 app_name: self.app_name.clone(),
278 message: message.to_string(),
279 event_level: *event.metadata().level(),
280 source_file: event.metadata().file().unwrap_or("Unknown").to_string(),
281 source_line: event.metadata().line().unwrap_or(0),
282 target: target.to_string(),
283 span: span.to_string(),
284 metadata,
285 webhook_url: self.config.webhook_url().to_string(),
286 }))
287 };
288
289 let result: Result<_, FilterError> = format();
290 if let Ok(formatted) = result {
291 if let Err(e) = self.sender.send(WorkerMessage::Data(Box::new(formatted))) {
292 #[cfg(feature = "log-errors")]
293 eprintln!("ERROR: failed to send webhook payload to given channel, err = {}", e)
294 };
295 }
296 }
297}