dora_operator_api/
raw.rs

1use crate::{DoraOperator, DoraOutputSender, DoraStatus, Event};
2use dora_operator_api_types::{
3    DoraInitResult, DoraResult, OnEventResult, RawEvent, SendOutput, arrow,
4};
5use std::ffi::c_void;
6
/// Raw C-ABI signature of an output callback.
///
/// The function pointer is `unsafe extern "C"` and takes two pointer/length
/// pairs (`id_start`/`id_len` and `data_start`/`data_len`) followed by an
/// opaque `output_context` pointer, returning an `isize`.
///
/// NOTE(review): presumably the two pairs describe the output id and the
/// payload bytes, and the return value is a status code — confirm against the
/// code that supplies the callback; nothing in the visible code calls it.
pub type OutputFnRaw = unsafe extern "C" fn(
    id_start: *const u8,
    id_len: usize,
    data_start: *const u8,
    data_len: usize,
    output_context: *const c_void,
) -> isize;
14
15pub unsafe fn dora_init_operator<O: DoraOperator>() -> DoraInitResult {
16    let operator: O = Default::default();
17    let ptr: *mut O = Box::leak(Box::new(operator));
18    let operator_context: *mut c_void = ptr.cast();
19    DoraInitResult {
20        result: DoraResult { error: None },
21        operator_context,
22    }
23}
24
25pub unsafe fn dora_drop_operator<O>(operator_context: *mut c_void) -> DoraResult {
26    let raw: *mut O = operator_context.cast();
27    drop(unsafe { Box::from_raw(raw) });
28    DoraResult { error: None }
29}
30
31pub unsafe fn dora_on_event<O: DoraOperator>(
32    event: &mut RawEvent,
33    send_output: &SendOutput,
34    operator_context: *mut std::ffi::c_void,
35) -> OnEventResult {
36    let mut output_sender = DoraOutputSender(send_output);
37
38    let operator: &mut O = unsafe { &mut *operator_context.cast() };
39
40    let event_variant = if let Some(input) = &mut event.input {
41        let Some(data_array) = input.data_array.take() else {
42            return OnEventResult {
43                result: DoraResult::from_error("data already taken".to_string()),
44                status: DoraStatus::Continue,
45            };
46        };
47        let data = unsafe { arrow::ffi::from_ffi(data_array, &input.schema) };
48
49        match data {
50            Ok(data) => Event::Input {
51                id: &input.id,
52                data: arrow::array::make_array(data).into(),
53            },
54            Err(err) => Event::InputParseError {
55                id: &input.id,
56                error: format!("{err}"),
57            },
58        }
59    } else if let Some(input_id) = &event.input_closed {
60        Event::InputClosed { id: input_id }
61    } else if event.stop {
62        Event::Stop
63    } else {
64        // ignore unknown events
65        return OnEventResult {
66            result: DoraResult { error: None },
67            status: DoraStatus::Continue,
68        };
69    };
70    match operator.on_event(&event_variant, &mut output_sender) {
71        Ok(status) => OnEventResult {
72            result: DoraResult { error: None },
73            status,
74        },
75        Err(error) => OnEventResult {
76            result: DoraResult::from_error(error),
77            status: DoraStatus::Stop,
78        },
79    }
80}