#![warn(unsafe_op_in_unsafe_fn)]
#![allow(clippy::missing_safety_doc)]
pub use dora_arrow_convert::*;
pub use dora_operator_api_macros::register_operator;
pub use dora_operator_api_types as types;
pub use types::DoraStatus;
use types::{
Metadata, Output, SendOutput,
arrow::{self, array::Array},
};
pub mod raw;
#[derive(Debug)]
#[non_exhaustive]
pub enum Event<'a> {
Input { id: &'a str, data: ArrowData },
InputParseError { id: &'a str, error: String },
InputClosed { id: &'a str },
Stop,
}
pub trait DoraOperator: Default {
#[allow(clippy::result_unit_err)] fn on_event(
&mut self,
event: &Event,
output_sender: &mut DoraOutputSender,
) -> Result<DoraStatus, String>;
}
pub struct DoraOutputSender<'a> {
send_output: &'a SendOutput,
open_telemetry_context: String,
}
impl DoraOutputSender<'_> {
pub(crate) fn new(send_output: &SendOutput) -> DoraOutputSender<'_> {
DoraOutputSender {
send_output,
open_telemetry_context: String::new(),
}
}
pub(crate) fn set_open_telemetry_context(&mut self, open_telemetry_context: &str) {
self.open_telemetry_context = open_telemetry_context.to_owned();
}
pub fn send(&mut self, id: String, data: impl Array) -> Result<(), String> {
let (data_array, schema) =
arrow::ffi::to_ffi(&data.into_data()).map_err(|err| err.to_string())?;
let result = self.send_output.send_output.call(Output {
id: id.into(),
data_array,
schema,
metadata: Metadata {
open_telemetry_context: self.open_telemetry_context.clone().into(),
},
});
result.into_result()
}
}