fiddler 4.9.1

Data Stream processor written in rust
Documentation
use crate::config::register_plugin;
use crate::config::{parse_configuration_item, Item, ItemType};
use crate::config::{ConfigSpec, ExecutionType};
use crate::Message;
use crate::MessageBatch;
use crate::{Closer, Error, Processor};
use async_trait::async_trait;
use fiddler_macros::fiddler_registration_func;
use serde::{Deserialize, Serialize};
use serde_yaml::Value;

#[derive(Deserialize, Serialize)]
struct CheckConfig {
    label: Option<String>,
    condition: String,
    processors: Vec<Item>,
}

pub struct Check {
    condition: String,
    processors: Vec<Box<dyn Processor + Send + Sync>>,
}

fn perform_check(condition: &str, json_str: String) -> Result<(), Error> {
    let mut runtime = jmespath::Runtime::new();
    runtime.register_builtin_functions();
    let expr = runtime
        .compile(condition)
        .map_err(|e| Error::ProcessingError(format!("{e}")))?;
    let data = jmespath::Variable::from_json(&json_str)
        .map_err(|e| Error::ProcessingError(format!("{e}")))?;

    let result = expr
        .search(data)
        .map_err(|e| Error::ProcessingError(format!("{e}")))?;

    // Explicitly check that result is a boolean type
    match result.as_boolean() {
        Some(true) => Ok(()),
        Some(false) => Err(Error::ConditionalCheckfailed),
        None => Err(Error::ProcessingError(format!(
            "Condition '{}' did not return a boolean value, got: {:?}",
            condition, result
        ))),
    }
}

#[async_trait]
impl Processor for Check {
    async fn process(&self, message: Message) -> Result<MessageBatch, Error> {
        let mut messages = vec![message.clone()];
        let json_str =
            String::from_utf8(message.bytes).map_err(|e| Error::ProcessingError(format!("{e}")))?;

        perform_check(&self.condition, json_str)?;

        for p in &self.processors {
            let mut new_messages = Vec::new();
            for m in messages.drain(..) {
                let processed = p.process(m).await?;
                new_messages.extend(processed);
            }
            messages = new_messages;
        }
        Ok(messages)
    }
}

#[async_trait]
impl Closer for Check {
    async fn close(&mut self) -> Result<(), Error> {
        for p in &mut self.processors {
            p.close().await?;
        }
        Ok(())
    }
}

#[fiddler_registration_func]
fn create_check(conf: Value) -> Result<ExecutionType, Error> {
    let c: CheckConfig = serde_yaml::from_value(conf.clone())?;
    let _ = jmespath::compile(&c.condition)
        .map_err(|e| Error::ConfigFailedValidation(format!("{e}")))?;

    let mut steps = Vec::new();
    for p in c.processors {
        let ri = parse_configuration_item(ItemType::Processor, &p.extra).await?;
        let proc = ((ri.creator)(ri.config.clone())).await?;
        if let ExecutionType::Processor(rp) = proc {
            steps.push(rp);
        };
    }

    let s = Check {
        condition: c.condition,
        processors: steps,
    };

    Ok(ExecutionType::Processor(Box::new(s)))
}

pub(super) fn register_check() -> Result<(), Error> {
    let config = "type: object
properties:
  label:
    type: string
  condition:
    type: string
  processors:
    type: array";
    let conf_spec = ConfigSpec::from_schema(config)?;

    register_plugin("check".into(), ItemType::Processor, conf_spec, create_check)
}