dora_operator_api/
lib.rs

1//! The operator API is a framework to implement dora operators.
2//! The implemented operator will be managed by `dora`.
3//!
4//! This framework enable us to make optimisation and provide advanced features.
5//! It is the recommended way of using `dora`.
6//!
7//! An operator requires to be registered and implement the `DoraOperator` trait.
8//! It is composed of an `on_event` method that defines the behaviour
9//! of the operator when there is an event such as receiving an input for example.
10//!
11//! Try it out with:
12//!
13//! ```bash
14//! dora new op --kind operator
15//! ```
16//!
17
18#![warn(unsafe_op_in_unsafe_fn)]
19#![allow(clippy::missing_safety_doc)]
20
21pub use dora_arrow_convert::*;
22pub use dora_operator_api_macros::register_operator;
23pub use dora_operator_api_types as types;
24pub use types::DoraStatus;
25use types::{
26    Metadata, Output, SendOutput,
27    arrow::{self, array::Array},
28};
29
30pub mod raw;
31
32#[derive(Debug)]
33#[non_exhaustive]
34pub enum Event<'a> {
35    Input { id: &'a str, data: ArrowData },
36    InputParseError { id: &'a str, error: String },
37    InputClosed { id: &'a str },
38    Stop,
39}
40
41pub trait DoraOperator: Default {
42    #[allow(clippy::result_unit_err)] // we use a () error type only for testing
43    fn on_event(
44        &mut self,
45        event: &Event,
46        output_sender: &mut DoraOutputSender,
47    ) -> Result<DoraStatus, String>;
48}
49
50pub struct DoraOutputSender<'a>(&'a SendOutput);
51
52impl DoraOutputSender<'_> {
53    ///  Send an output from the operator:
54    ///  - `id` is the `output_id` as defined in your dataflow.
55    ///  - `data` is the data that should be sent
56    pub fn send(&mut self, id: String, data: impl Array) -> Result<(), String> {
57        let (data_array, schema) =
58            arrow::ffi::to_ffi(&data.into_data()).map_err(|err| err.to_string())?;
59        let result = self.0.send_output.call(Output {
60            id: id.into(),
61            data_array,
62            schema,
63            metadata: Metadata {
64                open_telemetry_context: String::new().into(), // TODO
65            },
66        });
67        result.into_result()
68    }
69}