1#![warn(unsafe_op_in_unsafe_fn)]
19#![allow(clippy::missing_safety_doc)]
20
21pub use dora_arrow_convert::*;
22pub use dora_operator_api_macros::register_operator;
23pub use dora_operator_api_types as types;
24pub use types::DoraStatus;
25use types::{
26 Metadata, Output, SendOutput,
27 arrow::{self, array::Array},
28};
29
30pub mod raw;
31
32#[derive(Debug)]
33#[non_exhaustive]
34pub enum Event<'a> {
35 Input { id: &'a str, data: ArrowData },
36 InputParseError { id: &'a str, error: String },
37 InputClosed { id: &'a str },
38 Stop,
39}
40
41pub trait DoraOperator: Default {
42 #[allow(clippy::result_unit_err)] fn on_event(
44 &mut self,
45 event: &Event,
46 output_sender: &mut DoraOutputSender,
47 ) -> Result<DoraStatus, String>;
48}
49
50pub struct DoraOutputSender<'a>(&'a SendOutput);
51
52impl DoraOutputSender<'_> {
53 pub fn send(&mut self, id: String, data: impl Array) -> Result<(), String> {
57 let (data_array, schema) =
58 arrow::ffi::to_ffi(&data.into_data()).map_err(|err| err.to_string())?;
59 let result = self.0.send_output.call(Output {
60 id: id.into(),
61 data_array,
62 schema,
63 metadata: Metadata {
64 open_telemetry_context: String::new().into(), },
66 });
67 result.into_result()
68 }
69}