1use crate::{DoraOperator, DoraOutputSender, DoraStatus, Event};
2use dora_operator_api_types::{
3 DoraInitResult, DoraResult, OnEventResult, RawEvent, SendOutput, arrow,
4};
5use std::ffi::c_void;
6
7pub type OutputFnRaw = unsafe extern "C" fn(
8 id_start: *const u8,
9 id_len: usize,
10 data_start: *const u8,
11 data_len: usize,
12 output_context: *const c_void,
13) -> isize;
14
15pub unsafe fn dora_init_operator<O: DoraOperator>() -> DoraInitResult {
16 let operator: O = Default::default();
17 let ptr: *mut O = Box::leak(Box::new(operator));
18 let operator_context: *mut c_void = ptr.cast();
19 DoraInitResult {
20 result: DoraResult { error: None },
21 operator_context,
22 }
23}
24
25pub unsafe fn dora_drop_operator<O>(operator_context: *mut c_void) -> DoraResult {
26 let raw: *mut O = operator_context.cast();
27 drop(unsafe { Box::from_raw(raw) });
28 DoraResult { error: None }
29}
30
31pub unsafe fn dora_on_event<O: DoraOperator>(
32 event: &mut RawEvent,
33 send_output: &SendOutput,
34 operator_context: *mut std::ffi::c_void,
35) -> OnEventResult {
36 let mut output_sender = DoraOutputSender::new(send_output);
37
38 let operator: &mut O = unsafe { &mut *operator_context.cast() };
39
40 let event_variant = if let Some(input) = &mut event.input {
41 output_sender.set_open_telemetry_context(&input.metadata.open_telemetry_context);
42 let Some(data_array) = input.data_array.take() else {
43 return OnEventResult {
44 result: DoraResult::from_error("data already taken".to_string()),
45 status: DoraStatus::Continue,
46 };
47 };
48 let data = unsafe { arrow::ffi::from_ffi(data_array, &input.schema) };
49
50 match data {
51 Ok(data) => Event::Input {
52 id: &input.id,
53 data: arrow::array::make_array(data).into(),
54 },
55 Err(err) => Event::InputParseError {
56 id: &input.id,
57 error: format!("{err}"),
58 },
59 }
60 } else if let Some(input_id) = &event.input_closed {
61 Event::InputClosed { id: input_id }
62 } else if event.stop {
63 Event::Stop
64 } else {
65 return OnEventResult {
67 result: DoraResult { error: None },
68 status: DoraStatus::Continue,
69 };
70 };
71 match operator.on_event(&event_variant, &mut output_sender) {
72 Ok(status) => OnEventResult {
73 result: DoraResult { error: None },
74 status,
75 },
76 Err(error) => OnEventResult {
77 result: DoraResult::from_error(error),
78 status: DoraStatus::Stop,
79 },
80 }
81}
82
83#[cfg(test)]
84mod tests {
85 use super::dora_on_event;
86 use crate::{DoraOperator, DoraOutputSender, DoraStatus, Event};
87 use dora_operator_api_types::{
88 DoraResult, Input, Metadata, OnEventResult, Output, RawEvent, SendOutput,
89 arrow::array::{Array, UInt8Array},
90 safer_ffi::closure::ArcDynFn1,
91 };
92 use std::sync::{Arc, Mutex};
93
94 #[derive(Default)]
95 struct EchoOperator;
96
97 impl DoraOperator for EchoOperator {
98 fn on_event(
99 &mut self,
100 event: &Event,
101 output_sender: &mut DoraOutputSender,
102 ) -> Result<DoraStatus, String> {
103 if let Event::Input { .. } = event {
104 output_sender.send("out".to_string(), UInt8Array::from(vec![1_u8, 2_u8, 3_u8]))?;
105 }
106 Ok(DoraStatus::Continue)
107 }
108 }
109
110 #[test]
111 fn forwards_open_telemetry_context_to_output_metadata() {
112 let received_context = Arc::new(Mutex::new(String::new()));
113 let context_target = Arc::clone(&received_context);
114 let send_output = SendOutput {
115 send_output: ArcDynFn1::new(Arc::new(move |output: Output| {
116 *context_target.lock().expect("poisoned mutex") =
117 output.metadata.open_telemetry_context.to_string();
118 DoraResult::SUCCESS
119 })),
120 };
121
122 let input_array = UInt8Array::from(vec![1_u8, 2_u8]);
123 let (data_array, schema) =
124 dora_operator_api_types::arrow::ffi::to_ffi(&input_array.to_data())
125 .expect("failed to convert input to FFI");
126 let mut event = RawEvent {
127 input: Some(
128 Box::new(Input {
129 id: "in".to_string().into(),
130 data_array: Some(data_array),
131 schema,
132 metadata: Metadata {
133 open_telemetry_context: "traceparent-context".to_string().into(),
134 },
135 })
136 .into(),
137 ),
138 input_closed: None,
139 stop: false,
140 error: None,
141 };
142
143 let operator_context: *mut std::ffi::c_void = Box::into_raw(Box::new(EchoOperator)).cast();
144 let OnEventResult { result, status } =
145 unsafe { dora_on_event::<EchoOperator>(&mut event, &send_output, operator_context) };
146 unsafe { drop(Box::from_raw(operator_context.cast::<EchoOperator>())) };
147
148 assert!(result.error.is_none());
149 assert_eq!(status as u8, DoraStatus::Continue as u8);
150 assert_eq!(
151 *received_context.lock().expect("poisoned mutex"),
152 "traceparent-context"
153 );
154 }
155}