Skip to main content

dora_operator_api/
raw.rs

1use crate::{DoraOperator, DoraOutputSender, DoraStatus, Event};
2use dora_operator_api_types::{
3    DoraInitResult, DoraResult, OnEventResult, RawEvent, SendOutput, arrow,
4};
5use std::ffi::c_void;
6
7pub type OutputFnRaw = unsafe extern "C" fn(
8    id_start: *const u8,
9    id_len: usize,
10    data_start: *const u8,
11    data_len: usize,
12    output_context: *const c_void,
13) -> isize;
14
15pub unsafe fn dora_init_operator<O: DoraOperator>() -> DoraInitResult {
16    let operator: O = Default::default();
17    let ptr: *mut O = Box::leak(Box::new(operator));
18    let operator_context: *mut c_void = ptr.cast();
19    DoraInitResult {
20        result: DoraResult { error: None },
21        operator_context,
22    }
23}
24
25pub unsafe fn dora_drop_operator<O>(operator_context: *mut c_void) -> DoraResult {
26    let raw: *mut O = operator_context.cast();
27    drop(unsafe { Box::from_raw(raw) });
28    DoraResult { error: None }
29}
30
31pub unsafe fn dora_on_event<O: DoraOperator>(
32    event: &mut RawEvent,
33    send_output: &SendOutput,
34    operator_context: *mut std::ffi::c_void,
35) -> OnEventResult {
36    let mut output_sender = DoraOutputSender::new(send_output);
37
38    let operator: &mut O = unsafe { &mut *operator_context.cast() };
39
40    let event_variant = if let Some(input) = &mut event.input {
41        output_sender.set_open_telemetry_context(&input.metadata.open_telemetry_context);
42        let Some(data_array) = input.data_array.take() else {
43            return OnEventResult {
44                result: DoraResult::from_error("data already taken".to_string()),
45                status: DoraStatus::Continue,
46            };
47        };
48        let data = unsafe { arrow::ffi::from_ffi(data_array, &input.schema) };
49
50        match data {
51            Ok(data) => Event::Input {
52                id: &input.id,
53                data: arrow::array::make_array(data).into(),
54            },
55            Err(err) => Event::InputParseError {
56                id: &input.id,
57                error: format!("{err}"),
58            },
59        }
60    } else if let Some(input_id) = &event.input_closed {
61        Event::InputClosed { id: input_id }
62    } else if event.stop {
63        Event::Stop
64    } else {
65        // ignore unknown events
66        return OnEventResult {
67            result: DoraResult { error: None },
68            status: DoraStatus::Continue,
69        };
70    };
71    match operator.on_event(&event_variant, &mut output_sender) {
72        Ok(status) => OnEventResult {
73            result: DoraResult { error: None },
74            status,
75        },
76        Err(error) => OnEventResult {
77            result: DoraResult::from_error(error),
78            status: DoraStatus::Stop,
79        },
80    }
81}
82
83#[cfg(test)]
84mod tests {
85    use super::dora_on_event;
86    use crate::{DoraOperator, DoraOutputSender, DoraStatus, Event};
87    use dora_operator_api_types::{
88        DoraResult, Input, Metadata, OnEventResult, Output, RawEvent, SendOutput,
89        arrow::array::{Array, UInt8Array},
90        safer_ffi::closure::ArcDynFn1,
91    };
92    use std::sync::{Arc, Mutex};
93
94    #[derive(Default)]
95    struct EchoOperator;
96
97    impl DoraOperator for EchoOperator {
98        fn on_event(
99            &mut self,
100            event: &Event,
101            output_sender: &mut DoraOutputSender,
102        ) -> Result<DoraStatus, String> {
103            if let Event::Input { .. } = event {
104                output_sender.send("out".to_string(), UInt8Array::from(vec![1_u8, 2_u8, 3_u8]))?;
105            }
106            Ok(DoraStatus::Continue)
107        }
108    }
109
110    #[test]
111    fn forwards_open_telemetry_context_to_output_metadata() {
112        let received_context = Arc::new(Mutex::new(String::new()));
113        let context_target = Arc::clone(&received_context);
114        let send_output = SendOutput {
115            send_output: ArcDynFn1::new(Arc::new(move |output: Output| {
116                *context_target.lock().expect("poisoned mutex") =
117                    output.metadata.open_telemetry_context.to_string();
118                DoraResult::SUCCESS
119            })),
120        };
121
122        let input_array = UInt8Array::from(vec![1_u8, 2_u8]);
123        let (data_array, schema) =
124            dora_operator_api_types::arrow::ffi::to_ffi(&input_array.to_data())
125                .expect("failed to convert input to FFI");
126        let mut event = RawEvent {
127            input: Some(
128                Box::new(Input {
129                    id: "in".to_string().into(),
130                    data_array: Some(data_array),
131                    schema,
132                    metadata: Metadata {
133                        open_telemetry_context: "traceparent-context".to_string().into(),
134                    },
135                })
136                .into(),
137            ),
138            input_closed: None,
139            stop: false,
140            error: None,
141        };
142
143        let operator_context: *mut std::ffi::c_void = Box::into_raw(Box::new(EchoOperator)).cast();
144        let OnEventResult { result, status } =
145            unsafe { dora_on_event::<EchoOperator>(&mut event, &send_output, operator_context) };
146        unsafe { drop(Box::from_raw(operator_context.cast::<EchoOperator>())) };
147
148        assert!(result.error.is_none());
149        assert_eq!(status as u8, DoraStatus::Continue as u8);
150        assert_eq!(
151            *received_context.lock().expect("poisoned mutex"),
152            "traceparent-context"
153        );
154    }
155}