fiddler 4.9.1

Data Stream processor written in rust
Documentation
use std::ffi::CString;

use crate::config::register_plugin;
use crate::config::ItemType;
use crate::config::{ConfigSpec, ExecutionType};
use crate::Message;
use crate::MessageBatch;
use crate::{Closer, Error, Processor};
use async_trait::async_trait;
use fiddler_macros::fiddler_registration_func;
use pyo3::prelude::*;
use pyo3::types::{PyBytes, PyDict, PyString};
use serde::{Deserialize, Serialize};
use serde_yaml::Value;

#[derive(Clone, Deserialize, Serialize)]
pub struct PyProcSpec {
    code: String,
    #[serde(rename = "string")]
    use_string: Option<bool>,
}

pub struct PyProc {
    code: String,
    use_string: bool,
}

#[async_trait]
impl Processor for PyProc {
    async fn process(&self, message: Message) -> Result<MessageBatch, Error> {
        Python::with_gil(|py| {
            let locals = PyDict::new(py);
            if self.use_string {
                let raw_string = String::from_utf8(message.bytes)
                    .map_err(|e| Error::ProcessingError(format!("{}", e)))?;
                let python_string = PyString::new(py, &raw_string);
                locals
                    .set_item("root", python_string)
                    .map_err(|e| Error::ProcessingError(format!("{}", e)))?;
            } else {
                let raw_bytes = message.bytes.as_slice();
                let python_bytes = PyBytes::new(py, raw_bytes);
                locals
                    .set_item("root", python_bytes)
                    .map_err(|e| Error::ProcessingError(format!("{}", e)))?;
            };

            let code = CString::new(self.code.clone())
                .map_err(|e| Error::ProcessingError(format!("{}", e)))?;

            py.run(&code, None, Some(&locals))
                .map_err(|e| Error::ProcessingError(format!("{}", e)))?;

            if self.use_string {
                let root: String = locals
                    .get_item("root")
                    .map_err(|e| Error::ProcessingError(format!("{}", e)))?
                    .ok_or(Error::ProcessingError("no root module found".into()))?
                    .extract()
                    .map_err(|e| Error::ProcessingError(format!("{}", e)))?;

                Ok(vec![Message {
                    bytes: root.as_bytes().into(),
                    metadata: message.metadata.clone(),
                    ..Default::default()
                }])
            } else {
                let root: Vec<u8> = locals
                    .get_item("root")
                    .map_err(|e| Error::ProcessingError(format!("{}", e)))?
                    .ok_or(Error::ProcessingError("no root module found".into()))?
                    .extract()
                    .map_err(|e| Error::ProcessingError(format!("{}", e)))?;

                Ok(vec![Message {
                    bytes: root,
                    metadata: message.metadata.clone(),
                    ..Default::default()
                }])
            }
        })
    }
}

impl Closer for PyProc {}

#[fiddler_registration_func]
fn create_python(conf: Value) -> Result<ExecutionType, Error> {
    let c: PyProcSpec = serde_yaml::from_value(conf.clone())?;
    let mut p = PyProc {
        code: c.code,
        use_string: false,
    };

    if let Some(b) = c.use_string {
        p.use_string = b;
    };

    Ok(ExecutionType::Processor(Box::new(p)))
}

pub(super) fn register_python() -> Result<(), Error> {
    let config = "type: object
properties:
  code: 
    type: string
  string:
    type: boolean
required:
  - code";
    let conf_spec = ConfigSpec::from_schema(config)?;

    register_plugin(
        "python".into(),
        ItemType::Processor,
        conf_spec,
        create_python,
    )
}

#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn register_plugin() {
        register_python().unwrap()
    }
}