1use crate::{DoraOperator, DoraOutputSender, DoraStatus, Event};
2use dora_operator_api_types::{
3 DoraInitResult, DoraResult, OnEventResult, RawEvent, SendOutput, arrow,
4};
5use std::ffi::c_void;
6
/// Raw C-ABI callback signature used to hand an output back across the FFI boundary.
///
/// The output identifier and the payload are each passed as a `(pointer, length)`
/// pair of bytes, followed by an opaque context pointer.
///
/// The meaning of the `isize` return value is not visible from this file —
/// NOTE(review): confirm the success/error convention against the code that
/// invokes this callback.
pub type OutputFnRaw = unsafe extern "C" fn(
    // Start of the output id bytes.
    id_start: *const u8,
    // Number of bytes readable at `id_start`.
    id_len: usize,
    // Start of the payload bytes.
    data_start: *const u8,
    // Number of bytes readable at `data_start`.
    data_len: usize,
    // Opaque pointer passed through to the callback unchanged.
    output_context: *const c_void,
) -> isize;
14
15pub unsafe fn dora_init_operator<O: DoraOperator>() -> DoraInitResult {
16 let operator: O = Default::default();
17 let ptr: *mut O = Box::leak(Box::new(operator));
18 let operator_context: *mut c_void = ptr.cast();
19 DoraInitResult {
20 result: DoraResult { error: None },
21 operator_context,
22 }
23}
24
25pub unsafe fn dora_drop_operator<O>(operator_context: *mut c_void) -> DoraResult {
26 let raw: *mut O = operator_context.cast();
27 drop(unsafe { Box::from_raw(raw) });
28 DoraResult { error: None }
29}
30
31pub unsafe fn dora_on_event<O: DoraOperator>(
32 event: &mut RawEvent,
33 send_output: &SendOutput,
34 operator_context: *mut std::ffi::c_void,
35) -> OnEventResult {
36 let mut output_sender = DoraOutputSender(send_output);
37
38 let operator: &mut O = unsafe { &mut *operator_context.cast() };
39
40 let event_variant = if let Some(input) = &mut event.input {
41 let Some(data_array) = input.data_array.take() else {
42 return OnEventResult {
43 result: DoraResult::from_error("data already taken".to_string()),
44 status: DoraStatus::Continue,
45 };
46 };
47 let data = unsafe { arrow::ffi::from_ffi(data_array, &input.schema) };
48
49 match data {
50 Ok(data) => Event::Input {
51 id: &input.id,
52 data: arrow::array::make_array(data).into(),
53 },
54 Err(err) => Event::InputParseError {
55 id: &input.id,
56 error: format!("{err}"),
57 },
58 }
59 } else if let Some(input_id) = &event.input_closed {
60 Event::InputClosed { id: input_id }
61 } else if event.stop {
62 Event::Stop
63 } else {
64 return OnEventResult {
66 result: DoraResult { error: None },
67 status: DoraStatus::Continue,
68 };
69 };
70 match operator.on_event(&event_variant, &mut output_sender) {
71 Ok(status) => OnEventResult {
72 result: DoraResult { error: None },
73 status,
74 },
75 Err(error) => OnEventResult {
76 result: DoraResult::from_error(error),
77 status: DoraStatus::Stop,
78 },
79 }
80}