1#![warn(unsafe_op_in_unsafe_fn)]
19#![allow(clippy::missing_safety_doc)]
20
21pub use dora_arrow_convert::*;
22pub use dora_operator_api_macros::register_operator;
23pub use dora_operator_api_types as types;
24pub use types::DoraStatus;
25use types::{
26 Metadata, Output, SendOutput,
27 arrow::{self, array::Array},
28};
29
30pub mod raw;
31
32#[derive(Debug)]
33#[non_exhaustive]
34pub enum Event<'a> {
35 Input { id: &'a str, data: ArrowData },
36 InputParseError { id: &'a str, error: String },
37 InputClosed { id: &'a str },
38 Stop,
39}
40
41pub trait DoraOperator: Default {
42 #[allow(clippy::result_unit_err)] fn on_event(
44 &mut self,
45 event: &Event,
46 output_sender: &mut DoraOutputSender,
47 ) -> Result<DoraStatus, String>;
48}
49
50pub struct DoraOutputSender<'a> {
51 send_output: &'a SendOutput,
52 open_telemetry_context: String,
53}
54
55impl DoraOutputSender<'_> {
56 pub(crate) fn new(send_output: &SendOutput) -> DoraOutputSender<'_> {
57 DoraOutputSender {
58 send_output,
59 open_telemetry_context: String::new(),
60 }
61 }
62
63 pub(crate) fn set_open_telemetry_context(&mut self, open_telemetry_context: &str) {
64 self.open_telemetry_context = open_telemetry_context.to_owned();
65 }
66
67 pub fn send(&mut self, id: String, data: impl Array) -> Result<(), String> {
71 let (data_array, schema) =
72 arrow::ffi::to_ffi(&data.into_data()).map_err(|err| err.to_string())?;
73 let result = self.send_output.send_output.call(Output {
74 id: id.into(),
75 data_array,
76 schema,
77 metadata: Metadata {
78 open_telemetry_context: self.open_telemetry_context.clone().into(),
79 },
80 });
81 result.into_result()
82 }
83}