fiddler 4.9.1

Data Stream processor written in rust
Documentation
use crate::config::register_plugin;
use crate::config::ItemType;
use crate::config::{ConfigSpec, ExecutionType};
use crate::Message;
use crate::MessageBatch;
use crate::{Closer, Error, Processor};
use async_trait::async_trait;
use fiddler_macros::fiddler_registration_func;
use serde_yaml::Value;

#[derive(Clone)]
pub struct Lines {}

#[async_trait]
impl Processor for Lines {
    async fn process(&self, message: Message) -> Result<MessageBatch, Error> {
        let content = String::from_utf8(message.bytes)
            .map_err(|e| Error::ProcessingError(format!("{}", e)))?;
        let output: Vec<Message> = content
            .split('\n')
            .map(|msg| Message {
                bytes: msg.as_bytes().into(),
                metadata: message.metadata.clone(),
                ..Default::default()
            })
            .collect();
        Ok(output)
    }
}

impl Closer for Lines {}

#[fiddler_registration_func]
fn create_lines(_conf: Value) -> Result<ExecutionType, Error> {
    Ok(ExecutionType::Processor(Box::new(Lines {})))
}

pub(super) fn register_lines() -> Result<(), Error> {
    let config = "type: object";
    let conf_spec = ConfigSpec::from_schema(config)?;

    register_plugin("lines".into(), ItemType::Processor, conf_spec, create_lines)
}

#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn register_plugin() {
        register_lines().unwrap()
    }

    #[tokio::test]
    async fn split_lines() {
        let input_str = "Hello
world";
        let msg = Message {
            bytes: input_str.as_bytes().into(),
            ..Default::default()
        };
        let processor = Lines {};
        let output = processor.process(msg).await.unwrap();
        assert_eq!(
            output,
            vec![
                Message {
                    bytes: "Hello".as_bytes().into(),
                    ..Default::default()
                },
                Message {
                    bytes: "world".as_bytes().into(),
                    ..Default::default()
                }
            ]
        )
    }
}