Skip to main content

dora_operator_api/
lib.rs

1//! The operator API is a framework to implement dora operators.
2//! The implemented operator will be managed by `dora`.
3//!
4//! This framework enables us to make optimisations and provide advanced features.
5//! It is the recommended way of using `dora`.
6//!
7//! An operator must be registered and must implement the `DoraOperator` trait.
8//! The trait consists of an `on_event` method that defines the behaviour
9//! of the operator when an event occurs, such as receiving an input.
10//!
11//! Try it out with:
12//!
13//! ```bash
14//! dora new op --kind operator
15//! ```
16//!
17
18#![warn(unsafe_op_in_unsafe_fn)]
19#![allow(clippy::missing_safety_doc)]
20
21pub use dora_arrow_convert::*;
22pub use dora_operator_api_macros::register_operator;
23pub use dora_operator_api_types as types;
24pub use types::DoraStatus;
25use types::{
26    Metadata, Output, SendOutput,
27    arrow::{self, array::Array},
28};
29
30pub mod raw;
31
32#[derive(Debug)]
33#[non_exhaustive]
34pub enum Event<'a> {
35    Input { id: &'a str, data: ArrowData },
36    InputParseError { id: &'a str, error: String },
37    InputClosed { id: &'a str },
38    Stop,
39}
40
41pub trait DoraOperator: Default {
42    #[allow(clippy::result_unit_err)] // we use a () error type only for testing
43    fn on_event(
44        &mut self,
45        event: &Event,
46        output_sender: &mut DoraOutputSender,
47    ) -> Result<DoraStatus, String>;
48}
49
50pub struct DoraOutputSender<'a> {
51    send_output: &'a SendOutput,
52    open_telemetry_context: String,
53}
54
55impl DoraOutputSender<'_> {
56    pub(crate) fn new(send_output: &SendOutput) -> DoraOutputSender<'_> {
57        DoraOutputSender {
58            send_output,
59            open_telemetry_context: String::new(),
60        }
61    }
62
63    pub(crate) fn set_open_telemetry_context(&mut self, open_telemetry_context: &str) {
64        self.open_telemetry_context = open_telemetry_context.to_owned();
65    }
66
67    ///  Send an output from the operator:
68    ///  - `id` is the `output_id` as defined in your dataflow.
69    ///  - `data` is the data that should be sent
70    pub fn send(&mut self, id: String, data: impl Array) -> Result<(), String> {
71        let (data_array, schema) =
72            arrow::ffi::to_ffi(&data.into_data()).map_err(|err| err.to_string())?;
73        let result = self.send_output.send_output.call(Output {
74            id: id.into(),
75            data_array,
76            schema,
77            metadata: Metadata {
78                open_telemetry_context: self.open_telemetry_context.clone().into(),
79            },
80        });
81        result.into_result()
82    }
83}