dora_node_api_c/lib.rs
1#![deny(unsafe_op_in_unsafe_fn)]
2
3use arrow_array::UInt8Array;
4use dora_node_api::{DoraNode, Event, EventStream, arrow::array::AsArray};
5use eyre::Context;
6use std::{ffi::c_void, ptr, slice};
7
/// Contents of the C header `node_api.h`, embedded as a string at compile
/// time. The path is resolved relative to this source file.
pub const HEADER_NODE_API: &str = include_str!("../node_api.h");
9
10struct DoraContext {
11 node: &'static mut DoraNode,
12 events: EventStream,
13}
14
15/// Initializes a dora context from the environment variables that were set by
16/// the dora-coordinator.
17///
18/// Returns a pointer to the dora context on success. This pointer can be
19/// used to call dora API functions that expect a `context` argument. Any
20/// other use is prohibited. To free the dora context when it is no longer
21/// needed, use the [`free_dora_context`] function.
22///
23/// On error, a null pointer is returned.
24#[unsafe(no_mangle)]
25pub extern "C" fn init_dora_context_from_env() -> *mut c_void {
26 let context = || {
27 let (node, events) = DoraNode::init_from_env()?;
28 let node = Box::leak(Box::new(node));
29 Result::<_, eyre::Report>::Ok(DoraContext { node, events })
30 };
31 let context = match context().context("failed to initialize node") {
32 Ok(n) => n,
33 Err(err) => {
34 let err: eyre::Error = err;
35 tracing::error!("{err:?}");
36 return ptr::null_mut();
37 }
38 };
39
40 Box::into_raw(Box::new(context)).cast()
41}
42
43/// Frees the given dora context.
44///
45/// ## Safety
46///
47/// Only pointers created through [`init_dora_context_from_env`] are allowed
48/// as arguments. Each context pointer must be freed exactly once. After
49/// freeing, the pointer must not be used anymore.
50#[unsafe(no_mangle)]
51pub unsafe extern "C" fn free_dora_context(context: *mut c_void) {
52 let context: Box<DoraContext> = unsafe { Box::from_raw(context.cast()) };
53 // drop all fields except for `node`
54 let DoraContext { node, .. } = *context;
55 // convert the `'static` reference back to a Box, then drop it
56 let _ = unsafe { Box::from_raw(node as *const DoraNode as *mut DoraNode) };
57}
58
59/// Waits for the next incoming event for the node.
60///
61/// Returns a pointer to the event on success. This pointer must not be used
62/// directly. Instead, use the `read_dora_event_*` functions to read out the
63/// type and payload of the event. When the event is not needed anymore, use
64/// [`free_dora_event`] to free it again.
65///
66/// Returns a null pointer when all event streams were closed. This means that
67/// no more event will be available. Nodes typically react by stopping.
68///
69/// ## Safety
70///
71/// The `context` argument must be a dora context created through
72/// [`init_dora_context_from_env`]. The context must be still valid, i.e., not
73/// freed yet.
74#[unsafe(no_mangle)]
75pub unsafe extern "C" fn dora_next_event(context: *mut c_void) -> *mut c_void {
76 let context: &mut DoraContext = unsafe { &mut *context.cast() };
77 match context.events.recv() {
78 Some(event) => Box::into_raw(Box::new(event)).cast(),
79 None => ptr::null_mut(),
80 }
81}
82
83/// Reads out the type of the given event.
84///
85/// ## Safety
86///
87/// The `event` argument must be a dora event received through
88/// [`dora_next_event`]. The event must be still valid, i.e., not
89/// freed yet.
90#[unsafe(no_mangle)]
91pub unsafe extern "C" fn read_dora_event_type(event: *const ()) -> EventType {
92 let event: &Event = unsafe { &*event.cast() };
93 match event {
94 Event::Stop(_) => EventType::Stop,
95 Event::Input { .. } => EventType::Input,
96 Event::InputClosed { .. } => EventType::InputClosed,
97 Event::Error(_) => EventType::Error,
98 _ => EventType::Unknown,
99 }
100}
101
/// Kind of a dora event, as reported by `read_dora_event_type`.
///
/// The discriminants are spelled out explicitly so that the values seen by C
/// callers stay the same if variants are ever reordered or inserted.
#[repr(C)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventType {
    /// Reported for `Event::Stop`.
    Stop = 0,
    /// Reported for `Event::Input`.
    Input = 1,
    /// Reported for `Event::InputClosed`.
    InputClosed = 2,
    /// Reported for `Event::Error`.
    Error = 3,
    /// Reported for every other event variant.
    Unknown = 4,
}
110
111/// Reads out the ID of the given input event.
112///
113/// Writes the `out_ptr` and `out_len` with the start pointer and length of the
114/// ID string of the input. The ID is guaranteed to be valid UTF-8.
115///
116/// Writes a null pointer and length `0` if the given event is not an input event.
117///
118/// ## Safety
119///
120/// - The `event` argument must be a dora event received through
121/// [`dora_next_event`]. The event must be still valid, i.e., not
122/// freed yet. The returned `out_ptr` must not be used after
123/// freeing the `event`, since it points directly into the event's
124/// memory.
125///
126/// - Note: `Out_ptr` is not a null-terminated string. The length of the string
127/// is given by `out_len`.
128#[unsafe(no_mangle)]
129pub unsafe extern "C" fn read_dora_input_id(
130 event: *const (),
131 out_ptr: *mut *const u8,
132 out_len: *mut usize,
133) {
134 let event: &Event = unsafe { &*event.cast() };
135 match event {
136 Event::Input { id, .. } => {
137 let id = id.as_str().as_bytes();
138 let ptr = id.as_ptr();
139 let len = id.len();
140 unsafe {
141 *out_ptr = ptr;
142 *out_len = len;
143 }
144 }
145 _ => unsafe {
146 *out_ptr = ptr::null();
147 *out_len = 0;
148 },
149 }
150}
151
152/// Reads out the data of the given input event.
153///
154/// Writes the `out_ptr` and `out_len` with the start pointer and length of the
155/// input's data array. The data array is a raw byte array, whose format
156/// depends on the source operator/node.
157///
158/// Writes a null pointer and length `0` if the given event is not an input event
159/// or when an input event has no associated data.
160///
161/// ## Safety
162///
163/// The `event` argument must be a dora event received through
164/// [`dora_next_event`]. The event must be still valid, i.e., not
165/// freed yet. The returned `out_ptr` must not be used after
166/// freeing the `event`, since it points directly into the event's
167/// memory.
168#[unsafe(no_mangle)]
169pub unsafe extern "C" fn read_dora_input_data(
170 event: *const (),
171 out_ptr: *mut *const u8,
172 out_len: *mut usize,
173) {
174 let event: &Event = unsafe { &*event.cast() };
175 match event {
176 Event::Input { data, metadata, .. } => match metadata.type_info.data_type {
177 dora_node_api::arrow::datatypes::DataType::UInt8 => {
178 let array: &UInt8Array = data.as_primitive();
179 let ptr = array.values().as_ptr();
180 unsafe {
181 *out_ptr = ptr;
182 *out_len = metadata.type_info.len;
183 }
184 }
185 dora_node_api::arrow::datatypes::DataType::Null => unsafe {
186 *out_ptr = ptr::null();
187 *out_len = 0;
188 },
189 _ => {
190 todo!("dora C++ Node does not yet support higher level type of arrow. Only UInt8.
191 The ultimate solution should be based on arrow FFI interface. Feel free to contribute :)")
192 }
193 },
194 _ => unsafe {
195 *out_ptr = ptr::null();
196 *out_len = 0;
197 },
198 }
199}
200
201/// Reads out the timestamp of the given input event from metadata.
202///
203/// ## Safety
204///
205/// Return `0` if the given event is not an input event.
206#[unsafe(no_mangle)]
207pub unsafe extern "C" fn read_dora_input_timestamp(event: *const ()) -> core::ffi::c_ulonglong {
208 let event: &Event = unsafe { &*event.cast() };
209 match event {
210 Event::Input { metadata, .. } => metadata.timestamp().get_time().as_u64(),
211 _ => 0,
212 }
213}
214
215/// Frees the given dora event.
216///
217/// ## Safety
218///
219/// Only pointers created through [`dora_next_event`] are allowed
220/// as arguments. Each context pointer must be freed exactly once. After
221/// freeing, the pointer and all derived pointers must not be used anymore.
222/// This also applies to the `read_dora_event_*` functions, which return
223/// pointers into the original event structure.
224#[unsafe(no_mangle)]
225pub unsafe extern "C" fn free_dora_event(event: *mut c_void) {
226 let _: Box<Event> = unsafe { Box::from_raw(event.cast()) };
227}
228
229/// Sends the given output to subscribed dora nodes/operators.
230///
231/// The `id_ptr` and `id_len` fields must be the start pointer and length of an
232/// UTF8-encoded string. The ID string must correspond to one of the node's
233/// outputs specified in the dataflow YAML file.
234///
235/// The `data_ptr` and `data_len` fields must be the start pointer and length
236/// a byte array. The dora API sends this data as-is, without any processing.
237///
238/// ## Safety
239///
240/// - The `id_ptr` and `id_len` fields must be the start pointer and length of an
241/// UTF8-encoded string.
242/// - The `data_ptr` and `data_len` fields must be the start pointer and length
243/// a byte array.
244#[unsafe(no_mangle)]
245pub unsafe extern "C" fn dora_send_output(
246 context: *mut c_void,
247 id_ptr: *const u8,
248 id_len: usize,
249 data_ptr: *const u8,
250 data_len: usize,
251) -> isize {
252 match unsafe { try_send_output(context, id_ptr, id_len, data_ptr, data_len) } {
253 Ok(()) => 0,
254 Err(err) => {
255 tracing::error!("{err:?}");
256 -1
257 }
258 }
259}
260
261unsafe fn try_send_output(
262 context: *mut c_void,
263 id_ptr: *const u8,
264 id_len: usize,
265 data_ptr: *const u8,
266 data_len: usize,
267) -> eyre::Result<()> {
268 let context: &mut DoraContext = unsafe { &mut *context.cast() };
269 let id = std::str::from_utf8(unsafe { slice::from_raw_parts(id_ptr, id_len) })?;
270 let output_id = id.to_owned().into();
271 let data = unsafe { slice::from_raw_parts(data_ptr, data_len) };
272 context
273 .node
274 .send_output_raw(output_id, Default::default(), data.len(), |out| {
275 out.copy_from_slice(data);
276 })
277}