#![deny(unsafe_op_in_unsafe_fn)]
use arrow_array::UInt8Array;
use dora_node_api::{DoraNode, Event, EventStream, arrow::array::AsArray};
use eyre::Context;
use std::{ffi::c_void, ptr, slice};
/// Text of the `node_api.h` header, embedded at compile time via `include_str!`
/// (the path `../node_api.h` is resolved relative to this source file).
pub const HEADER_NODE_API: &str = include_str!("../node_api.h");
/// State behind the opaque context pointer returned by
/// `init_dora_context_from_env`.
struct DoraContext {
/// The node handle. `init_dora_context_from_env` obtains this reference with
/// `Box::leak`; `free_dora_context` turns it back into a `Box` and drops it.
node: &'static mut DoraNode,
/// Event stream that `dora_next_event` receives from.
events: EventStream,
}
#[unsafe(no_mangle)]
pub extern "C" fn init_dora_context_from_env() -> *mut c_void {
let context = || {
let (node, events) = DoraNode::init_from_env()?;
let node = Box::leak(Box::new(node));
Result::<_, eyre::Report>::Ok(DoraContext { node, events })
};
let context = match context().context("failed to initialize node") {
Ok(n) => n,
Err(err) => {
let err: eyre::Error = err;
tracing::error!("{err:?}");
return ptr::null_mut();
}
};
Box::into_raw(Box::new(context)).cast()
}
/// Releases a context created by `init_dora_context_from_env`.
///
/// Passing a null pointer is a no-op. This matters because
/// `init_dora_context_from_env` returns null when initialization fails, so a
/// caller that frees unconditionally must not trigger undefined behaviour.
///
/// The node is dropped first, then the rest of the context (including the event
/// stream).
///
/// # Safety
///
/// `context` must be null or a pointer previously returned by
/// `init_dora_context_from_env` that has not been freed yet. The pointer must not
/// be used again after this call.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn free_dora_context(context: *mut c_void) {
    if context.is_null() {
        return;
    }
    // SAFETY: per the contract above, a non-null `context` originates from
    // `Box::into_raw(Box<DoraContext>)` and is freed at most once.
    let context: Box<DoraContext> = unsafe { Box::from_raw(context.cast()) };
    let DoraContext { node, .. } = *context;
    // SAFETY: `node` was produced by `Box::leak` in `init_dora_context_from_env`,
    // so reconstructing the `Box` reclaims exactly that allocation. The
    // reference is cast straight to `*mut` (no detour through `*const`).
    drop(unsafe { Box::from_raw(node as *mut DoraNode) });
}
/// Receives the next event from the context's event stream.
///
/// Returns an opaque pointer to a heap-allocated event, or null when the stream's
/// `recv` yields `None`. A non-null result must be released with
/// `free_dora_event`.
///
/// # Safety
///
/// `context` must be a valid, non-null pointer obtained from
/// `init_dora_context_from_env` that has not been freed, and no other reference
/// to the context may be live during the call.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn dora_next_event(context: *mut c_void) -> *mut c_void {
    // SAFETY: the caller guarantees `context` points to a live `DoraContext`.
    let ctx = unsafe { &mut *context.cast::<DoraContext>() };
    let received = ctx.events.recv();
    received
        .map(|event| Box::into_raw(Box::new(event)).cast::<c_void>())
        .unwrap_or(ptr::null_mut())
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn read_dora_event_type(event: *const ()) -> EventType {
let event: &Event = unsafe { &*event.cast() };
match event {
Event::Stop(_) => EventType::Stop,
Event::Input { .. } => EventType::Input,
Event::InputClosed { .. } => EventType::InputClosed,
Event::Error(_) => EventType::Error,
_ => EventType::Unknown,
}
}
/// Kind of an event, as reported across the C boundary by
/// `read_dora_event_type`.
///
/// The enum is `repr(C)` with implicit discriminants, so the variant order
/// determines the numeric values seen by C callers; do not reorder.
/// NOTE(review): presumably mirrored by an enum in `node_api.h` — keep both in
/// sync.
#[repr(C)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventType {
    /// The event is an `Event::Stop`.
    Stop,
    /// The event is an `Event::Input`.
    Input,
    /// The event is an `Event::InputClosed`.
    InputClosed,
    /// The event is an `Event::Error`.
    Error,
    /// Any event that is none of the above.
    Unknown,
}
/// Exposes the id of an input event as a pointer/length pair.
///
/// For an `Event::Input`, `*out_ptr` and `*out_len` receive the start and byte
/// length of the id string. The bytes come from a Rust `str`, so they are not
/// guaranteed to be NUL-terminated; always use `*out_len`. The pointer borrows
/// from the event and must not be used after the event is freed.
///
/// For every other event, `*out_ptr` is set to null and `*out_len` to 0.
///
/// # Safety
///
/// `event` must be a valid, non-null pointer obtained from `dora_next_event` that
/// has not been freed. `out_ptr` and `out_len` must be valid for writes.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn read_dora_input_id(
    event: *const (),
    out_ptr: *mut *const u8,
    out_len: *mut usize,
) {
    // SAFETY: the caller guarantees `event` points to a live `Event`.
    let evt = unsafe { &*event.cast::<Event>() };
    let (start, length) = if let Event::Input { id, .. } = evt {
        let bytes = id.as_str().as_bytes();
        (bytes.as_ptr(), bytes.len())
    } else {
        (ptr::null(), 0)
    };
    // SAFETY: the caller guarantees both out-pointers are valid for writes.
    unsafe {
        *out_ptr = start;
        *out_len = length;
    }
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn read_dora_input_data(
event: *const (),
out_ptr: *mut *const u8,
out_len: *mut usize,
) {
let event: &Event = unsafe { &*event.cast() };
match event {
Event::Input { data, metadata, .. } => match metadata.type_info.data_type {
dora_node_api::arrow::datatypes::DataType::UInt8 => {
let array: &UInt8Array = data.as_primitive();
let ptr = array.values().as_ptr();
unsafe {
*out_ptr = ptr;
*out_len = metadata.type_info.len;
}
}
dora_node_api::arrow::datatypes::DataType::Null => unsafe {
*out_ptr = ptr::null();
*out_len = 0;
},
_ => {
todo!("dora C++ Node does not yet support higher level type of arrow. Only UInt8.
The ultimate solution should be based on arrow FFI interface. Feel free to contribute :)")
}
},
_ => unsafe {
*out_ptr = ptr::null();
*out_len = 0;
},
}
}
/// Returns the timestamp of an input event as an unsigned 64-bit value.
///
/// For an `Event::Input` the result is
/// `metadata.timestamp().get_time().as_u64()`; for every other event it is 0.
/// NOTE(review): the unit/encoding is whatever `as_u64` yields for the metadata
/// timestamp's time component — confirm against the timestamp type's docs.
///
/// # Safety
///
/// `event` must be a valid, non-null pointer obtained from `dora_next_event` that
/// has not been freed.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn read_dora_input_timestamp(event: *const ()) -> core::ffi::c_ulonglong {
    // SAFETY: the caller guarantees `event` points to a live `Event`.
    let evt = unsafe { &*event.cast::<Event>() };
    if let Event::Input { metadata, .. } = evt {
        metadata.timestamp().get_time().as_u64()
    } else {
        0
    }
}
/// Releases an event returned by `dora_next_event`.
///
/// Passing a null pointer is a no-op. This matters because `dora_next_event`
/// returns null when the stream yields no event, so a caller that frees
/// unconditionally must not trigger undefined behaviour.
///
/// Any pointers previously read out of the event (for example via
/// `read_dora_input_id` or `read_dora_input_data`) borrow from it and must not be
/// used after this call.
///
/// # Safety
///
/// `event` must be null or a pointer previously returned by `dora_next_event`
/// that has not been freed yet.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn free_dora_event(event: *mut c_void) {
    if event.is_null() {
        return;
    }
    // SAFETY: per the contract above, a non-null `event` originates from
    // `Box::into_raw(Box<Event>)` and is freed at most once.
    drop(unsafe { Box::<Event>::from_raw(event.cast()) });
}
/// Sends `data_len` bytes starting at `data_ptr` as the output named by the
/// `id_len` bytes starting at `id_ptr`.
///
/// Returns 0 on success. On failure the error is logged with `tracing::error!`
/// and -1 is returned.
///
/// # Safety
///
/// The arguments are forwarded unchanged to `try_send_output`; its requirements
/// on `context`, `id_ptr`/`id_len` and `data_ptr`/`data_len` apply here as well.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn dora_send_output(
    context: *mut c_void,
    id_ptr: *const u8,
    id_len: usize,
    data_ptr: *const u8,
    data_len: usize,
) -> isize {
    // SAFETY: the caller upholds the contract of `try_send_output`.
    let outcome = unsafe { try_send_output(context, id_ptr, id_len, data_ptr, data_len) };
    if let Err(err) = outcome {
        tracing::error!("{err:?}");
        return -1;
    }
    0
}
unsafe fn try_send_output(
context: *mut c_void,
id_ptr: *const u8,
id_len: usize,
data_ptr: *const u8,
data_len: usize,
) -> eyre::Result<()> {
let context: &mut DoraContext = unsafe { &mut *context.cast() };
let id = std::str::from_utf8(unsafe { slice::from_raw_parts(id_ptr, id_len) })?;
let output_id = id.to_owned().into();
let data = unsafe { slice::from_raw_parts(data_ptr, data_len) };
context
.node
.send_output_raw(output_id, Default::default(), data.len(), |out| {
out.copy_from_slice(data);
})
}