use async_trait::async_trait;
use crate::error::KumoError;
use super::Pipeline;
pub struct RequireFields {
fields: Vec<String>,
}
impl RequireFields {
pub fn new(fields: &[&str]) -> Self {
Self {
fields: fields.iter().map(|s| s.to_string()).collect(),
}
}
}
#[async_trait]
impl Pipeline for RequireFields {
async fn process(
&self,
item: serde_json::Value,
) -> Result<Option<serde_json::Value>, KumoError> {
for field in &self.fields {
if item.get(field).is_none() {
tracing::debug!(missing_field = %field, "item.drop.missing_field");
return Ok(None);
}
}
Ok(Some(item))
}
}
pub struct FilterPipeline<F>
where
F: Fn(&serde_json::Value) -> bool + Send + Sync,
{
predicate: F,
}
impl<F> FilterPipeline<F>
where
F: Fn(&serde_json::Value) -> bool + Send + Sync,
{
pub fn new(predicate: F) -> Self {
Self { predicate }
}
}
#[async_trait]
impl<F> Pipeline for FilterPipeline<F>
where
F: Fn(&serde_json::Value) -> bool + Send + Sync,
{
async fn process(
&self,
item: serde_json::Value,
) -> Result<Option<serde_json::Value>, KumoError> {
if (self.predicate)(&item) {
Ok(Some(item))
} else {
tracing::debug!("item.drop.filter");
Ok(None)
}
}
}