1use std::collections::HashMap;
2use std::error::Error;
3use std::future::Future;
4use std::process::Output;
5use std::str::FromStr;
6use std::sync::Arc;
7
8use regex::Regex;
9use serde::ser::SerializeMap;
10use serde::Serializer;
11use serde_json::Value;
12use tokio::sync::Mutex;
13use tracing::{Event, Subscriber};
14use tracing::log::LevelFilter;
15use tracing_bunyan_formatter::JsonStorage;
16use tracing_subscriber::Layer;
17use tracing_subscriber::layer::Context;
18
19use crate::{BackgroundWorker, ChannelSender, Config, EventFilters, WebhookMessageFactory, WebhookMessageInputs, WorkerMessage};
20use crate::filters::{Filter, FilterError};
21use crate::worker::worker;
22
23pub struct WebhookLayer<C: Config, F: WebhookMessageFactory> {
25 target_filters: EventFilters,
31
32 message_filters: Option<EventFilters>,
38
39 event_by_field_filters: Option<EventFilters>,
45
46 field_exclusion_filters: Option<Vec<Regex>>,
51
52 level_filter: Option<String>,
54
55 app_name: String,
56
57 config: C,
59
60 factory: std::marker::PhantomData<F>,
61
62 sender: ChannelSender,
65}
66
67impl<C: Config, F: WebhookMessageFactory> WebhookLayer<C, F> {
68 pub(crate) fn new(
76 app_name: String,
77 target_filters: EventFilters,
78 message_filters: Option<EventFilters>,
79 event_by_field_filters: Option<EventFilters>,
80 field_exclusion_filters: Option<Vec<Regex>>,
81 level_filter: Option<String>,
82 config: C,
83 ) -> (WebhookLayer<C, F>, BackgroundWorker) {
84 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
85 let layer = WebhookLayer {
86 target_filters,
87 message_filters,
88 field_exclusion_filters,
89 event_by_field_filters,
90 level_filter,
91 app_name,
92 config,
93 factory: Default::default(),
94 sender: tx.clone(),
95 };
96 let background_worker = BackgroundWorker {
97 sender: tx,
98 handle: Arc::new(Mutex::new(None)),
99 rx: Arc::new(Mutex::new(rx)),
100 };
101 (layer, background_worker)
102 }
103
104 pub fn builder(app_name: String, target_filters: EventFilters) -> WebhookLayerBuilder<C, F> {
106 WebhookLayerBuilder::new(app_name, target_filters)
107 }
108}
109
110pub struct WebhookLayerBuilder<C: Config, F: WebhookMessageFactory> {
118 factory: std::marker::PhantomData<F>,
119 app_name: String,
120 target_filters: EventFilters,
121 message_filters: Option<EventFilters>,
122 event_by_field_filters: Option<EventFilters>,
123 field_exclusion_filters: Option<Vec<Regex>>,
124 level_filters: Option<String>,
125 config: Option<C>,
126}
127
128impl<C: Config, F: WebhookMessageFactory> WebhookLayerBuilder<C, F> {
129 pub(crate) fn new(app_name: String, target_filters: EventFilters) -> Self {
130 Self {
131 factory: Default::default(),
132 app_name,
133 target_filters,
134 message_filters: None,
135 event_by_field_filters: None,
136 field_exclusion_filters: None,
137 level_filters: None,
138 config: None,
139 }
140 }
141
142 pub fn message_filters(mut self, filters: EventFilters) -> Self {
148 self.message_filters = Some(filters);
149 self
150 }
151
152 pub fn event_by_field_filters(mut self, filters: EventFilters) -> Self {
158 self.event_by_field_filters = Some(filters);
159 self
160 }
161
162 pub fn field_exclusion_filters(mut self, filters: Vec<Regex>) -> Self {
167 self.field_exclusion_filters = Some(filters);
168 self
169 }
170
171 pub fn config(mut self, config: C) -> Self {
173 self.config = Some(config);
174 self
175 }
176
177 pub fn level_filters(mut self, level_filters: String) -> Self {
179 self.level_filters = Some(level_filters);
180 self
181 }
182
183 pub fn build(self) -> (WebhookLayer<C, F>, BackgroundWorker) {
185 WebhookLayer::new(
186 self.app_name,
187 self.target_filters,
188 self.message_filters,
189 self.event_by_field_filters,
190 self.field_exclusion_filters,
191 self.level_filters,
192 self.config.unwrap_or_else(C::new_from_env),
193 )
194 }
195}
196
197impl<S, C, F> Layer<S> for WebhookLayer<C, F>
198where
199 S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
200 C: Config+ 'static,
201 F: WebhookMessageFactory + 'static,
202{
203 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
204 let current_span = ctx.lookup_current();
205 let mut event_visitor = JsonStorage::default();
206 event.record(&mut event_visitor);
207
208 let format = || {
209 const KEYWORDS: [&str; 2] = ["message", "error"];
210
211 let target = event.metadata().target();
212 self.target_filters.process(target)?;
213
214 let message = event_visitor
216 .values()
217 .get("message")
218 .and_then(|v| match v {
219 Value::String(s) => Some(s.as_str()),
220 _ => None,
221 })
222 .or_else(|| {
223 event_visitor.values().get("error").and_then(|v| match v {
224 Value::String(s) => Some(s.as_str()),
225 _ => None,
226 })
227 })
228 .unwrap_or("No message");
229
230 self.message_filters.process(message)?;
231 if let Some(level_filters) = &self.level_filter {
232 let message_level = {
233 LevelFilter::from_str(event.metadata().level().as_str())
234 .map_err(|e| FilterError::IoError(Box::new(e)))?
235 };
236 let level_threshold =
237 LevelFilter::from_str(level_filters).map_err(|e| FilterError::IoError(Box::new(e)))?;
238 if message_level > level_threshold {
239 return Err(FilterError::PositiveFilterFailed);
240 }
241 }
242
243 let mut metadata_buffer = Vec::new();
244 let mut serializer = serde_json::Serializer::new(&mut metadata_buffer);
245 let mut map_serializer = serializer.serialize_map(None)?;
246 for (key, value) in event_visitor
249 .values()
250 .iter()
251 .filter(|(&key, _)| !KEYWORDS.contains(&key))
252 .filter(|(&key, _)| self.field_exclusion_filters.process(key).is_ok())
253 {
254 self.event_by_field_filters.process(key)?;
255 map_serializer.serialize_entry(key, value)?;
256 }
257 if let Some(span) = ¤t_span {
259 let extensions = span.extensions();
260 if let Some(visitor) = extensions.get::<JsonStorage>() {
261 for (key, value) in visitor.values() {
262 map_serializer.serialize_entry(key, value)?;
263 }
264 }
265 }
266 map_serializer.end()?;
267
268 let span = match ¤t_span {
269 Some(span) => span.metadata().name(),
270 None => "",
271 };
272
273 let metadata = {
274 let data: HashMap<String, Value> = serde_json::from_slice(metadata_buffer.as_slice()).unwrap();
275 serde_json::to_string_pretty(&data).unwrap()
276 };
277
278 Ok(F::create(WebhookMessageInputs {
279 app_name: self.app_name.clone(),
280 message: message.to_string(),
281 event_level: *event.metadata().level(),
282 source_file: event.metadata().file().unwrap_or("Unknown").to_string(),
283 source_line: event.metadata().line().unwrap_or(0),
284 target: target.to_string(),
285 span: span.to_string(),
286 metadata,
287 webhook_url: self.config.webhook_url().to_string(),
288 }))
289 };
290
291 let result: Result<_, FilterError> = format();
292 if let Ok(formatted) = result {
293 if let Err(e) = self.sender.send(WorkerMessage::Data(Box::new(formatted))) {
294 println!("failed to send webhook payload to given channel, err = {}", e)
295 };
296 }
297 }
298}