use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use rsigma_eval::EvaluationResult;
use rsigma_eval::pipeline::sources::ExtractExpr;
use serde_json::Value;
use super::{
EnrichError, EnrichErrorKind, Enricher, EnricherKind, OnError, Scope, inject_enrichment,
template::render_template,
};
use crate::sources::SourceCache;
use crate::sources::extract::apply_extract;
pub struct LookupEnricher {
id: String,
kind: EnricherKind,
inject_field: String,
source_id: String,
extract: Option<ExtractExpr>,
default: Option<Value>,
timeout: Duration,
on_error: OnError,
scope: Scope,
cache: Arc<SourceCache>,
}
impl LookupEnricher {
#[allow(clippy::too_many_arguments)]
pub fn new(
id: String,
kind: EnricherKind,
inject_field: String,
source_id: String,
extract: Option<ExtractExpr>,
default: Option<Value>,
timeout: Duration,
on_error: OnError,
scope: Scope,
cache: Arc<SourceCache>,
) -> Self {
Self {
id,
kind,
inject_field,
source_id,
extract,
default,
timeout,
on_error,
scope,
cache,
}
}
fn render_extract(&self, result: &EvaluationResult) -> Option<ExtractExpr> {
let original = self.extract.as_ref()?;
let (lang, raw) = match original {
ExtractExpr::Jq(s) => ("jq", s),
ExtractExpr::JsonPath(s) => ("jsonpath", s),
ExtractExpr::Cel(s) => ("cel", s),
};
let rendered = render_template(raw, result);
Some(match lang {
"jq" => ExtractExpr::Jq(rendered),
"jsonpath" => ExtractExpr::JsonPath(rendered),
"cel" => ExtractExpr::Cel(rendered),
_ => return None,
})
}
}
#[async_trait]
impl Enricher for LookupEnricher {
fn kind(&self) -> EnricherKind {
self.kind
}
fn id(&self) -> &str {
&self.id
}
fn inject_field(&self) -> &str {
&self.inject_field
}
fn timeout(&self) -> Duration {
self.timeout
}
fn scope(&self) -> &Scope {
&self.scope
}
fn on_error(&self) -> OnError {
self.on_error
}
async fn enrich(&self, result: &mut EvaluationResult) -> Result<(), EnrichError> {
let cached = self.cache.get(&self.source_id);
let cached_value = match cached {
Some(v) => v,
None => {
if let Some(d) = &self.default {
inject_enrichment(result, &self.inject_field, d.clone());
return Ok(());
}
return Err(EnrichError {
enricher_id: self.id.clone(),
kind: EnrichErrorKind::Fetch(format!(
"cache miss for source '{}'",
self.source_id
)),
});
}
};
let extracted = match self.render_extract(result) {
None => cached_value.clone(),
Some(expr) => apply_extract(&cached_value, &expr).map_err(|e| EnrichError {
enricher_id: self.id.clone(),
kind: EnrichErrorKind::Extract(format!("{}", e.kind)),
})?,
};
if extracted.is_null() {
if let Some(d) = &self.default {
inject_enrichment(result, &self.inject_field, d.clone());
return Ok(());
}
return Err(EnrichError {
enricher_id: self.id.clone(),
kind: EnrichErrorKind::Fetch(format!(
"no extract match for source '{}'",
self.source_id
)),
});
}
inject_enrichment(result, &self.inject_field, extracted);
Ok(())
}
}