dora_node_api_c/
lib.rs

1#![deny(unsafe_op_in_unsafe_fn)]
2
3use arrow_array::UInt8Array;
4use dora_node_api::{DoraNode, Event, EventStream, arrow::array::AsArray};
5use eyre::Context;
6use std::{ffi::c_void, ptr, slice};
7
8pub const HEADER_NODE_API: &str = include_str!("../node_api.h");
9
10struct DoraContext {
11    node: &'static mut DoraNode,
12    events: EventStream,
13}
14
15/// Initializes a dora context from the environment variables that were set by
16/// the dora-coordinator.
17///
18/// Returns a pointer to the dora context on success. This pointer can be
19/// used to call dora API functions that expect a `context` argument. Any
20/// other use is prohibited. To free the dora context when it is no longer
21/// needed, use the [`free_dora_context`] function.
22///
23/// On error, a null pointer is returned.
24#[unsafe(no_mangle)]
25pub extern "C" fn init_dora_context_from_env() -> *mut c_void {
26    let context = || {
27        let (node, events) = DoraNode::init_from_env()?;
28        let node = Box::leak(Box::new(node));
29        Result::<_, eyre::Report>::Ok(DoraContext { node, events })
30    };
31    let context = match context().context("failed to initialize node") {
32        Ok(n) => n,
33        Err(err) => {
34            let err: eyre::Error = err;
35            tracing::error!("{err:?}");
36            return ptr::null_mut();
37        }
38    };
39
40    Box::into_raw(Box::new(context)).cast()
41}
42
43/// Frees the given dora context.
44///
45/// ## Safety
46///
47/// Only pointers created through [`init_dora_context_from_env`] are allowed
48/// as arguments. Each context pointer must be freed exactly once. After
49/// freeing, the pointer must not be used anymore.
50#[unsafe(no_mangle)]
51pub unsafe extern "C" fn free_dora_context(context: *mut c_void) {
52    let context: Box<DoraContext> = unsafe { Box::from_raw(context.cast()) };
53    // drop all fields except for `node`
54    let DoraContext { node, .. } = *context;
55    // convert the `'static` reference back to a Box, then drop it
56    let _ = unsafe { Box::from_raw(node as *const DoraNode as *mut DoraNode) };
57}
58
59/// Waits for the next incoming event for the node.
60///
61/// Returns a pointer to the event on success. This pointer must not be used
62/// directly. Instead, use the `read_dora_event_*` functions to read out the
63/// type and payload of the event. When the event is not needed anymore, use
64/// [`free_dora_event`] to free it again.
65///
66/// Returns a null pointer when all event streams were closed. This means that
67/// no more event will be available. Nodes typically react by stopping.
68///
69/// ## Safety
70///
71/// The `context` argument must be a dora context created through
72/// [`init_dora_context_from_env`]. The context must be still valid, i.e., not
73/// freed yet.
74#[unsafe(no_mangle)]
75pub unsafe extern "C" fn dora_next_event(context: *mut c_void) -> *mut c_void {
76    let context: &mut DoraContext = unsafe { &mut *context.cast() };
77    match context.events.recv() {
78        Some(event) => Box::into_raw(Box::new(event)).cast(),
79        None => ptr::null_mut(),
80    }
81}
82
83/// Reads out the type of the given event.
84///
85/// ## Safety
86///
87/// The `event` argument must be a dora event received through
88/// [`dora_next_event`]. The event must be still valid, i.e., not
89/// freed yet.
90#[unsafe(no_mangle)]
91pub unsafe extern "C" fn read_dora_event_type(event: *const ()) -> EventType {
92    let event: &Event = unsafe { &*event.cast() };
93    match event {
94        Event::Stop(_) => EventType::Stop,
95        Event::Input { .. } => EventType::Input,
96        Event::InputClosed { .. } => EventType::InputClosed,
97        Event::Error(_) => EventType::Error,
98        _ => EventType::Unknown,
99    }
100}
101
/// Kind of a dora event, as returned by [`read_dora_event_type`].
///
/// The enum is `#[repr(C)]` because it crosses the FFI boundary by value.
/// The discriminants are spelled out so that reordering or inserting variants
/// cannot silently change the values seen by C callers.
// NOTE(review): presumably mirrored by an enum in `node_api.h` — keep both in sync.
#[repr(C)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventType {
    /// The event is `Event::Stop`.
    Stop = 0,
    /// The event is `Event::Input`.
    Input = 1,
    /// The event is `Event::InputClosed`.
    InputClosed = 2,
    /// The event is `Event::Error`.
    Error = 3,
    /// The event is of a kind that has no dedicated value in this enum.
    Unknown = 4,
}
110
111/// Reads out the ID of the given input event.
112///
113/// Writes the `out_ptr` and `out_len` with the start pointer and length of the
114/// ID string of the input. The ID is guaranteed to be valid UTF-8.
115///
116/// Writes a null pointer and length `0` if the given event is not an input event.
117///
118/// ## Safety
119///
120/// - The `event` argument must be a dora event received through
121///   [`dora_next_event`]. The event must be still valid, i.e., not
122///   freed yet. The returned `out_ptr` must not be used after
123///   freeing the `event`, since it points directly into the event's
124///   memory.
125///
126/// - Note: `Out_ptr` is not a null-terminated string. The length of the string
127///   is given by `out_len`.
128#[unsafe(no_mangle)]
129pub unsafe extern "C" fn read_dora_input_id(
130    event: *const (),
131    out_ptr: *mut *const u8,
132    out_len: *mut usize,
133) {
134    let event: &Event = unsafe { &*event.cast() };
135    match event {
136        Event::Input { id, .. } => {
137            let id = id.as_str().as_bytes();
138            let ptr = id.as_ptr();
139            let len = id.len();
140            unsafe {
141                *out_ptr = ptr;
142                *out_len = len;
143            }
144        }
145        _ => unsafe {
146            *out_ptr = ptr::null();
147            *out_len = 0;
148        },
149    }
150}
151
152/// Reads out the data of the given input event.
153///
154/// Writes the `out_ptr` and `out_len` with the start pointer and length of the
155/// input's data array. The data array is a raw byte array, whose format
156/// depends on the source operator/node.
157///
158/// Writes a null pointer and length `0` if the given event is not an input event
159/// or when an input event has no associated data.
160///
161/// ## Safety
162///
163/// The `event` argument must be a dora event received through
164/// [`dora_next_event`]. The event must be still valid, i.e., not
165/// freed yet. The returned `out_ptr` must not be used after
166/// freeing the `event`, since it points directly into the event's
167/// memory.
168#[unsafe(no_mangle)]
169pub unsafe extern "C" fn read_dora_input_data(
170    event: *const (),
171    out_ptr: *mut *const u8,
172    out_len: *mut usize,
173) {
174    let event: &Event = unsafe { &*event.cast() };
175    match event {
176        Event::Input { data, metadata, .. } => match metadata.type_info.data_type {
177            dora_node_api::arrow::datatypes::DataType::UInt8 => {
178                let array: &UInt8Array = data.as_primitive();
179                let ptr = array.values().as_ptr();
180                unsafe {
181                    *out_ptr = ptr;
182                    *out_len = metadata.type_info.len;
183                }
184            }
185            dora_node_api::arrow::datatypes::DataType::Null => unsafe {
186                *out_ptr = ptr::null();
187                *out_len = 0;
188            },
189            _ => {
190                todo!("dora C++ Node does not yet support higher level type of arrow. Only UInt8. 
191                The ultimate solution should be based on arrow FFI interface. Feel free to contribute :)")
192            }
193        },
194        _ => unsafe {
195            *out_ptr = ptr::null();
196            *out_len = 0;
197        },
198    }
199}
200
201/// Reads out the timestamp of the given input event from metadata.
202///
203/// ## Safety
204///
205/// Return `0` if the given event is not an input event.
206#[unsafe(no_mangle)]
207pub unsafe extern "C" fn read_dora_input_timestamp(event: *const ()) -> core::ffi::c_ulonglong {
208    let event: &Event = unsafe { &*event.cast() };
209    match event {
210        Event::Input { metadata, .. } => metadata.timestamp().get_time().as_u64(),
211        _ => 0,
212    }
213}
214
215/// Frees the given dora event.
216///
217/// ## Safety
218///
219/// Only pointers created through [`dora_next_event`] are allowed
220/// as arguments. Each context pointer must be freed exactly once. After
221/// freeing, the pointer and all derived pointers must not be used anymore.
222/// This also applies to the `read_dora_event_*` functions, which return
223/// pointers into the original event structure.
224#[unsafe(no_mangle)]
225pub unsafe extern "C" fn free_dora_event(event: *mut c_void) {
226    let _: Box<Event> = unsafe { Box::from_raw(event.cast()) };
227}
228
229/// Sends the given output to subscribed dora nodes/operators.
230///
231/// The `id_ptr` and `id_len` fields must be the start pointer and length of an
232/// UTF8-encoded string. The ID string must correspond to one of the node's
233/// outputs specified in the dataflow YAML file.
234///
235/// The `data_ptr` and `data_len` fields must be the start pointer and length
236/// a byte array. The dora API sends this data as-is, without any processing.
237///
238/// ## Safety
239///
240/// - The `id_ptr` and `id_len` fields must be the start pointer and length of an
241///   UTF8-encoded string.
242/// - The `data_ptr` and `data_len` fields must be the start pointer and length
243///   a byte array.
244#[unsafe(no_mangle)]
245pub unsafe extern "C" fn dora_send_output(
246    context: *mut c_void,
247    id_ptr: *const u8,
248    id_len: usize,
249    data_ptr: *const u8,
250    data_len: usize,
251) -> isize {
252    match unsafe { try_send_output(context, id_ptr, id_len, data_ptr, data_len) } {
253        Ok(()) => 0,
254        Err(err) => {
255            tracing::error!("{err:?}");
256            -1
257        }
258    }
259}
260
261unsafe fn try_send_output(
262    context: *mut c_void,
263    id_ptr: *const u8,
264    id_len: usize,
265    data_ptr: *const u8,
266    data_len: usize,
267) -> eyre::Result<()> {
268    let context: &mut DoraContext = unsafe { &mut *context.cast() };
269    let id = std::str::from_utf8(unsafe { slice::from_raw_parts(id_ptr, id_len) })?;
270    let output_id = id.to_owned().into();
271    let data = unsafe { slice::from_raw_parts(data_ptr, data_len) };
272    context
273        .node
274        .send_output_raw(output_id, Default::default(), data.len(), |out| {
275            out.copy_from_slice(data);
276        })
277}