use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use regex::Regex;
use serde::ser::SerializeMap;
use serde::Serializer;
use serde_json::Value;
use tokio::sync::Mutex;
use tracing::log::LevelFilter;
use tracing::{Event, Subscriber};
use tracing_bunyan_formatter::JsonStorage;
use tracing_subscriber::layer::Context;
use tracing_subscriber::Layer;
use crate::filters::{Filter, FilterError};
use crate::{
BackgroundWorker, ChannelSender, Config, EventFilters, WebhookMessageFactory, WebhookMessageInputs, WorkerMessage,
};
/// A `tracing_subscriber` layer that turns events into webhook messages.
///
/// Each event passes through the configured filters; survivors are formatted via
/// `F::create` and pushed onto `sender` as `WorkerMessage::Data`.
pub struct WebhookLayer<C: Config, F: WebhookMessageFactory> {
/// Filters applied to the event's target; an event whose target fails is dropped.
target_filters: EventFilters,
/// Optional filters applied to the event's message text.
message_filters: Option<EventFilters>,
/// Optional filters applied to each emitted field name; a failure drops the whole event.
event_by_field_filters: Option<EventFilters>,
/// Optional filters on field names; event fields that fail them are left out of the metadata.
field_exclusion_filters: Option<Vec<Regex>>,
/// Optional level name, parsed as a `LevelFilter` per event; more verbose events are dropped.
level_filter: Option<String>,
/// Application name copied into every outgoing message.
app_name: String,
/// Configuration that supplies the webhook URL.
config: C,
/// Zero-sized marker tying the layer to the message factory type `F`.
factory: std::marker::PhantomData<F>,
/// Sending half of the channel whose receiving half is held by the `BackgroundWorker`.
sender: ChannelSender,
}
impl<C: Config, F: WebhookMessageFactory> WebhookLayer<C, F> {
    /// Assembles a layer together with the `BackgroundWorker` that owns the
    /// receiving end of the layer's channel.
    ///
    /// An unbounded tokio mpsc channel is created here: the layer keeps one
    /// sender, the worker keeps another sender plus the receiver. The worker's
    /// task handle starts out as `None`.
    pub(crate) fn new(
        app_name: String,
        target_filters: EventFilters,
        message_filters: Option<EventFilters>,
        event_by_field_filters: Option<EventFilters>,
        field_exclusion_filters: Option<Vec<Regex>>,
        level_filter: Option<String>,
        config: C,
    ) -> (WebhookLayer<C, F>, BackgroundWorker) {
        let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();

        let worker = BackgroundWorker {
            sender: sender.clone(),
            handle: Arc::new(Mutex::new(None)),
            rx: Arc::new(Mutex::new(receiver)),
        };

        let layer = Self {
            target_filters,
            message_filters,
            event_by_field_filters,
            field_exclusion_filters,
            level_filter,
            app_name,
            config,
            factory: std::marker::PhantomData,
            sender,
        };

        (layer, worker)
    }

    /// Starts a [`WebhookLayerBuilder`] with the two mandatory settings: the
    /// application name and the target filters.
    pub fn builder(app_name: String, target_filters: EventFilters) -> WebhookLayerBuilder<C, F> {
        WebhookLayerBuilder::new(app_name, target_filters)
    }
}
/// Builder for a `WebhookLayer` and its accompanying `BackgroundWorker`.
///
/// Only `app_name` and `target_filters` are required; every other setting
/// starts as `None` and can be supplied through the builder methods.
pub struct WebhookLayerBuilder<C: Config, F: WebhookMessageFactory> {
/// Zero-sized marker for the message factory type `F`.
factory: std::marker::PhantomData<F>,
/// Application name handed to the layer.
app_name: String,
/// Target filters handed to the layer.
target_filters: EventFilters,
/// Optional message filters handed to the layer.
message_filters: Option<EventFilters>,
/// Optional field-name filters handed to the layer.
event_by_field_filters: Option<EventFilters>,
/// Optional field-exclusion patterns handed to the layer.
field_exclusion_filters: Option<Vec<Regex>>,
/// Optional level name handed to the layer as its level filter.
level_filters: Option<String>,
/// Explicit configuration; when `None`, `build` falls back to `C::new_from_env`.
config: Option<C>,
}
impl<C: Config, F: WebhookMessageFactory> WebhookLayerBuilder<C, F> {
    /// Creates a builder holding only the mandatory settings; every optional
    /// setting starts out as `None`.
    pub(crate) fn new(app_name: String, target_filters: EventFilters) -> Self {
        Self {
            factory: std::marker::PhantomData,
            app_name,
            target_filters,
            message_filters: None,
            event_by_field_filters: None,
            field_exclusion_filters: None,
            level_filters: None,
            config: None,
        }
    }

    /// Sets the filters applied to event messages.
    pub fn message_filters(self, filters: EventFilters) -> Self {
        Self {
            message_filters: Some(filters),
            ..self
        }
    }

    /// Sets the filters applied to event field names.
    pub fn event_by_field_filters(self, filters: EventFilters) -> Self {
        Self {
            event_by_field_filters: Some(filters),
            ..self
        }
    }

    /// Sets the patterns used to exclude fields from the message metadata.
    pub fn field_exclusion_filters(self, filters: Vec<Regex>) -> Self {
        Self {
            field_exclusion_filters: Some(filters),
            ..self
        }
    }

    /// Supplies an explicit configuration instead of the environment-derived one.
    pub fn config(self, config: C) -> Self {
        Self {
            config: Some(config),
            ..self
        }
    }

    /// Sets the level name used by the layer as its level filter.
    pub fn level_filters(self, level_filters: String) -> Self {
        Self {
            level_filters: Some(level_filters),
            ..self
        }
    }

    /// Consumes the builder and produces the layer plus its background worker.
    ///
    /// When no configuration was supplied, `C::new_from_env` provides one.
    pub fn build(self) -> (WebhookLayer<C, F>, BackgroundWorker) {
        let Self {
            factory: _,
            app_name,
            target_filters,
            message_filters,
            event_by_field_filters,
            field_exclusion_filters,
            level_filters,
            config,
        } = self;
        let config = config.unwrap_or_else(C::new_from_env);

        WebhookLayer::new(
            app_name,
            target_filters,
            message_filters,
            event_by_field_filters,
            field_exclusion_filters,
            level_filters,
            config,
        )
    }
}
impl<S, C, F> Layer<S> for WebhookLayer<C, F>
where
S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
C: Config + 'static,
F: WebhookMessageFactory + 'static,
{
fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
let current_span = ctx.lookup_current();
let mut event_visitor = JsonStorage::default();
event.record(&mut event_visitor);
let format = || {
const KEYWORDS: [&str; 2] = ["message", "error"];
let target = event.metadata().target();
self.target_filters.process(target)?;
let message = event_visitor
.values()
.get("message")
.and_then(|v| match v {
Value::String(s) => Some(s.as_str()),
_ => None,
})
.or_else(|| {
event_visitor.values().get("error").and_then(|v| match v {
Value::String(s) => Some(s.as_str()),
_ => None,
})
})
.unwrap_or("No message");
self.message_filters.process(message)?;
if let Some(level_filters) = &self.level_filter {
let message_level = {
LevelFilter::from_str(event.metadata().level().as_str())
.map_err(|e| FilterError::IoError(Box::new(e)))?
};
let level_threshold =
LevelFilter::from_str(level_filters).map_err(|e| FilterError::IoError(Box::new(e)))?;
if message_level > level_threshold {
return Err(FilterError::PositiveFilterFailed);
}
}
let mut metadata_buffer = Vec::new();
let mut serializer = serde_json::Serializer::new(&mut metadata_buffer);
let mut map_serializer = serializer.serialize_map(None)?;
for (key, value) in event_visitor
.values()
.iter()
.filter(|(&key, _)| !KEYWORDS.contains(&key))
.filter(|(&key, _)| self.field_exclusion_filters.process(key).is_ok())
{
self.event_by_field_filters.process(key)?;
map_serializer.serialize_entry(key, value)?;
}
if let Some(span) = ¤t_span {
let extensions = span.extensions();
if let Some(visitor) = extensions.get::<JsonStorage>() {
for (key, value) in visitor.values() {
map_serializer.serialize_entry(key, value)?;
}
}
}
map_serializer.end()?;
let span = match ¤t_span {
Some(span) => span.metadata().name(),
None => "",
};
let metadata = {
let data: HashMap<String, Value> = serde_json::from_slice(metadata_buffer.as_slice()).unwrap();
serde_json::to_string_pretty(&data).unwrap()
};
Ok(F::create(WebhookMessageInputs {
app_name: self.app_name.clone(),
message: message.to_string(),
event_level: *event.metadata().level(),
source_file: event.metadata().file().unwrap_or("Unknown").to_string(),
source_line: event.metadata().line().unwrap_or(0),
target: target.to_string(),
span: span.to_string(),
metadata,
webhook_url: self.config.webhook_url().to_string(),
}))
};
let result: Result<_, FilterError> = format();
if let Ok(formatted) = result {
if let Err(e) = self.sender.send(WorkerMessage::Data(Box::new(formatted))) {
#[cfg(feature = "log-errors")]
eprintln!("ERROR: failed to send webhook payload to given channel, err = {}", e)
};
}
}
}