use std::{collections::HashMap, ffi::c_void, ptr, slice};
use postcard::from_bytes;
use reifydb_abi::{
constants::CURRENT_API,
data::buffer::BufferFFI,
operator::{
column::{OperatorColumnFFI, OperatorColumnsFFI},
descriptor::OperatorDescriptorFFI,
types::OPERATOR_MAGIC,
},
};
use reifydb_core::interface::catalog::flow::FlowNodeId;
use reifydb_type::value::Value;
use crate::{
ffi::wrapper::{OperatorWrapper, create_vtable},
operator::{FFIOperatorWithMetadata, column::operator::OperatorColumn},
};
/// Describes a `'static` string as a [`BufferFFI`] without copying it.
///
/// The returned buffer points directly at the string's UTF-8 bytes; both
/// `len` and `cap` are set to the byte length of `s`. No terminator is
/// appended, so consumers must rely on `len`.
fn str_to_buffer(s: &'static str) -> BufferFFI {
    let bytes = s.as_bytes();
    let len = bytes.len();
    BufferFFI {
        ptr: bytes.as_ptr(),
        len,
        cap: len,
    }
}
fn columns_to_ffi(columns: &'static [OperatorColumn]) -> OperatorColumnsFFI {
if columns.is_empty() {
return OperatorColumnsFFI::empty();
}
let ffi_columns: Vec<OperatorColumnFFI> = columns
.iter()
.map(|c| {
let ffi_type = c.type_constraint.to_ffi();
OperatorColumnFFI {
name: str_to_buffer(c.name),
base_type: ffi_type.base_type,
constraint_type: ffi_type.constraint_type,
constraint_param1: ffi_type.constraint_param1,
constraint_param2: ffi_type.constraint_param2,
description: str_to_buffer(c.description),
}
})
.collect();
let column_count = ffi_columns.len();
let columns_ptr = Box::leak(ffi_columns.into_boxed_slice()).as_ptr();
OperatorColumnsFFI {
columns: columns_ptr,
column_count,
}
}
/// Builds the FFI descriptor for operator type `O`.
///
/// The descriptor is populated from the associated constants of `O`
/// (`NAME`, `VERSION`, `DESCRIPTION`, `INPUT_COLUMNS`, `OUTPUT_COLUMNS`,
/// `CAPABILITIES`), tagged with `CURRENT_API`, and paired with the vtable
/// produced by `create_vtable::<O>()`.
pub fn create_descriptor<O: FFIOperatorWithMetadata>() -> OperatorDescriptorFFI {
    let operator = str_to_buffer(O::NAME);
    let version = str_to_buffer(O::VERSION);
    let description = str_to_buffer(O::DESCRIPTION);
    let input_columns = columns_to_ffi(O::INPUT_COLUMNS);
    let output_columns = columns_to_ffi(O::OUTPUT_COLUMNS);
    let vtable = create_vtable::<O>();

    OperatorDescriptorFFI {
        api: CURRENT_API,
        operator,
        version,
        description,
        input_columns,
        output_columns,
        capabilities: O::CAPABILITIES,
        vtable,
    }
}
/// Creates an operator of type `O` and returns it as an opaque pointer.
///
/// The configuration is read from `config_ptr`/`config_len` as a
/// postcard-encoded `HashMap<String, Value>`. A null pointer or a zero
/// length is treated as an empty configuration. If the bytes cannot be
/// decoded, the error is logged to stderr and an empty configuration is used
/// instead; this function must not panic because it is called through the
/// C ABI.
///
/// # Returns
///
/// A pointer obtained from `Box::into_raw` on an `OperatorWrapper<O>`, or a
/// null pointer if `O::new` fails (the error is logged to stderr). The caller
/// owns the returned allocation.
///
/// # Safety
///
/// `config_ptr` must either be null or point to at least `config_len`
/// readable, initialized bytes that stay valid and unmodified for the
/// duration of this call.
pub unsafe extern "C" fn create_operator_instance<O: FFIOperatorWithMetadata>(
    config_ptr: *const u8,
    config_len: usize,
    operator_id: u64,
) -> *mut c_void {
    let config = if config_ptr.is_null() || config_len == 0 {
        HashMap::new()
    } else {
        // SAFETY: the pointer is non-null and the caller guarantees it refers
        // to `config_len` readable bytes for the duration of this call.
        let config_bytes = unsafe { slice::from_raw_parts(config_ptr, config_len) };
        match from_bytes::<HashMap<String, Value>>(config_bytes) {
            Ok(decoded_config) => decoded_config,
            Err(e) => {
                // Log and fall back rather than panic: unwinding out of an
                // `extern "C"` function would abort the host process.
                eprintln!(
                    "Failed to deserialize operator config for operator {}: {}. Using empty config.",
                    operator_id, e
                );
                HashMap::new()
            }
        }
    };

    let operator = match O::new(FlowNodeId(operator_id), &config) {
        Ok(op) => op,
        Err(e) => {
            eprintln!("Failed to create operator: {}", e);
            return ptr::null_mut();
        }
    };

    // Ownership of the wrapper is handed to the caller as a raw pointer.
    let wrapper = Box::new(OperatorWrapper::new(operator));
    Box::into_raw(wrapper) as *mut c_void
}
/// Returns the `OPERATOR_MAGIC` constant from `reifydb_abi` through the C ABI.
///
/// NOTE(review): presumably the host uses this value to recognise a loaded
/// library as an operator plugin — confirm against the loader.
pub extern "C" fn operator_magic() -> u32 {
OPERATOR_MAGIC
}