dora-operator-api 0.5.0

`dora` aims to be a low-latency, composable, and distributed dataflow.
Documentation
//! The operator API is a framework to implement dora operators.
//! The implemented operator will be managed by `dora`.
//!
//! This framework lets `dora` apply optimisations and provide advanced features.
//! It is the recommended way of using `dora`.
//!
//! An operator must be registered (see the [`register_operator`] macro) and must
//! implement the [`DoraOperator`] trait. That trait consists of a single `on_event`
//! method, which defines how the operator behaves when an event occurs, such as
//! receiving an input.
//!
//! Try it out with:
//!
//! ```bash
//! dora new op --kind operator
//! ```
//!

#![warn(unsafe_op_in_unsafe_fn)]
#![allow(clippy::missing_safety_doc)]

pub use dora_arrow_convert::*;
pub use dora_operator_api_macros::register_operator;
pub use dora_operator_api_types as types;
pub use types::DoraStatus;
use types::{
    Metadata, Output, SendOutput,
    arrow::{self, array::Array},
};

pub mod raw;

/// An event handed to [`DoraOperator::on_event`].
///
/// This enum is `#[non_exhaustive]`, so code outside this crate that matches on
/// it must include a wildcard arm.
#[non_exhaustive]
#[derive(Debug)]
pub enum Event<'a> {
    /// Input event for the input named `id`, carrying its Arrow `data`.
    Input {
        id: &'a str,
        data: ArrowData,
    },
    /// Parse-error event for the input named `id`; `error` holds the message.
    InputParseError {
        id: &'a str,
        error: String,
    },
    /// Closed event for the input named `id`.
    InputClosed {
        id: &'a str,
    },
    /// Stop event (presumably a request for the operator to shut down — confirm
    /// against the runtime).
    Stop,
}

/// The trait implemented by every `dora` operator.
///
/// Implementors must also implement [`Default`].
/// NOTE(review): presumably the operator instance is built via `Default` by the
/// code generated by `register_operator` — confirm.
pub trait DoraOperator: Default {
    /// Handles a single [`Event`] and reports the resulting [`DoraStatus`].
    ///
    /// `output_sender` can be used to emit outputs with [`DoraOutputSender::send`]
    /// while the event is being handled.
    ///
    /// # Errors
    ///
    /// Returns `Err` with a human-readable message when handling the event fails.
    fn on_event(
        &mut self,
        event: &Event,
        output_sender: &mut DoraOutputSender,
    ) -> Result<DoraStatus, String>;
}

/// Handle an operator uses to send outputs.
///
/// It borrows the [`SendOutput`] callback for the lifetime `'a` and carries the
/// OpenTelemetry context string that is copied into each sent output's metadata.
pub struct DoraOutputSender<'a> {
    /// OpenTelemetry context attached to outgoing outputs (starts empty in `new`).
    open_telemetry_context: String,
    /// Callback through which outputs are emitted.
    send_output: &'a SendOutput,
}

impl DoraOutputSender<'_> {
    /// Creates a sender that emits outputs through `send_output`.
    ///
    /// The OpenTelemetry context starts out empty; update it with
    /// `set_open_telemetry_context`.
    pub(crate) fn new(send_output: &SendOutput) -> DoraOutputSender<'_> {
        DoraOutputSender {
            send_output,
            open_telemetry_context: String::new(),
        }
    }

    /// Replaces the OpenTelemetry context attached to subsequently sent outputs.
    pub(crate) fn set_open_telemetry_context(&mut self, open_telemetry_context: &str) {
        // `clone_into` reuses the existing `String` allocation instead of allocating
        // a brand-new one on every call (clippy: `assigning_clones`).
        open_telemetry_context.clone_into(&mut self.open_telemetry_context);
    }

    /// Sends an output from the operator.
    ///
    /// - `id` is the `output_id` as defined in your dataflow.
    /// - `data` is the data that should be sent.
    ///
    /// The current OpenTelemetry context (see `set_open_telemetry_context`) is
    /// copied into the output's metadata.
    ///
    /// # Errors
    ///
    /// Returns `Err` if `data` cannot be exported through Arrow's FFI
    /// (`arrow::ffi::to_ffi`), or if the `send_output` callback returns an error.
    pub fn send(&mut self, id: String, data: impl Array) -> Result<(), String> {
        let (data_array, schema) =
            arrow::ffi::to_ffi(&data.into_data()).map_err(|err| err.to_string())?;
        let output = Output {
            id: id.into(),
            data_array,
            schema,
            metadata: Metadata {
                // Cloned because the sender keeps its context for later sends.
                open_telemetry_context: self.open_telemetry_context.clone().into(),
            },
        };
        self.send_output.send_output.call(output).into_result()
    }
}