use crate::engine::{LanceDbEngine, LanceDbEngineOptions};
use crate::manager::DatabaseRuntimeConfig;
use crate::runtime::LanceDbRuntime;
use crate::types::{
LanceDbColumnDef, LanceDbColumnType, LanceDbCreateTableInput, LanceDbDeleteInput,
LanceDbDropTableInput, LanceDbInputFormat, LanceDbOutputFormat, LanceDbSearchInput,
LanceDbUpsertInput,
};
use serde::Deserialize;
use std::cell::RefCell;
use std::ffi::{CStr, CString, c_char};
use std::ptr;
use std::sync::OnceLock;
use std::sync::mpsc;
use std::thread;
// Per-thread slot holding the most recent FFI error message.
// It is written by `set_last_error`, reset by `clear_last_error`, and read by
// `vldb_lancedb_last_error_message`.
thread_local! {
static LAST_ERROR: RefCell<Option<CString>> = const { RefCell::new(None) };
}
/// C-compatible options consumed by `vldb_lancedb_runtime_create`.
///
/// The field semantics below describe how `build_runtime_from_options` in this
/// file interprets them.
#[repr(C)]
#[derive(Clone, Copy, Debug)]
pub struct VldbLancedbRuntimeOptions {
/// NUL-terminated UTF-8 path of the default database; must be non-null and non-blank.
pub default_db_path: *const c_char,
/// Optional NUL-terminated UTF-8 database root; null or blank is treated as "not set".
pub db_root: *const c_char,
/// Read consistency interval in milliseconds; ignored when
/// `has_read_consistency_interval` is zero.
pub read_consistency_interval_ms: u64,
/// Zero means "no interval" (`None`); any other value enables
/// `read_consistency_interval_ms`.
pub has_read_consistency_interval: u8,
/// Upsert payload limit; zero falls back to the engine default.
pub max_upsert_payload: usize,
/// Search limit cap; zero falls back to the engine default.
pub max_search_limit: usize,
/// Concurrent request cap; zero falls back to the engine default.
pub max_concurrent_requests: usize,
}
/// Opaque handle wrapping a `LanceDbRuntime`.
///
/// Created by `vldb_lancedb_runtime_create` (boxed and leaked as a raw pointer)
/// and released by `vldb_lancedb_runtime_destroy`.
pub struct VldbLancedbRuntimeHandle {
inner: LanceDbRuntime,
}
/// Opaque handle wrapping a `LanceDbEngine`.
///
/// Created by the `vldb_lancedb_runtime_open_*_engine` functions and released
/// by `vldb_lancedb_engine_destroy`.
#[allow(dead_code)]
pub struct VldbLancedbEngineHandle {
inner: LanceDbEngine,
}
/// Front end of the dedicated FFI worker thread: holds the sending half of the
/// channel through which `ffi_block_on` submits jobs.
struct FfiRuntimeWorker {
sender: mpsc::Sender<FfiRuntimeJob>,
}
// A boxed unit of work sent to the FFI worker thread; it is invoked with a
// reference to the Tokio runtime owned by that thread.
type FfiRuntimeJob = Box<dyn FnOnce(&tokio::runtime::Runtime) + Send + 'static>;
/// C-compatible view of a heap byte buffer handed to the caller.
///
/// Produced by `allocate_byte_buffer` from a `Vec<u8>` and released with
/// `vldb_lancedb_bytes_free`, which rebuilds the `Vec` from these three fields.
#[repr(C)]
#[derive(Clone, Copy, Debug)]
pub struct VldbLancedbByteBuffer {
/// Start of the allocation, or null for an empty buffer.
pub data: *mut u8,
/// Number of initialized bytes.
pub len: usize,
/// Capacity of the underlying allocation; needed to free it correctly.
pub cap: usize,
}
/// Status codes returned (as `i32`) by the non-JSON FFI entry points in this file.
#[repr(C)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum VldbLancedbStatusCode {
/// The call completed and the output parameters were written.
Success = 0,
/// The call failed; the message is available via `vldb_lancedb_last_error_message`.
Failure = 1,
}
/// Input payload format selector for `vldb_lancedb_engine_vector_upsert_raw`.
///
/// NOTE(review): this enum is received by value across the C ABI; callers
/// should only pass one of the listed discriminants.
#[repr(C)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum VldbLancedbFfiInputFormat {
/// No explicit format; mapped to `LanceDbInputFormat::Unspecified`.
Unspecified = 0,
/// Mapped to `LanceDbInputFormat::JsonRows`.
JsonRows = 1,
/// Mapped to `LanceDbInputFormat::ArrowIpc`.
ArrowIpc = 2,
}
/// Output format selector for `vldb_lancedb_engine_vector_search_f32`; its
/// discriminants are also reported in `VldbLancedbSearchResultMeta::format`.
///
/// NOTE(review): this enum is received by value across the C ABI; callers
/// should only pass one of the listed discriminants.
#[repr(C)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum VldbLancedbFfiOutputFormat {
/// No explicit format; mapped to `LanceDbOutputFormat::Unspecified`.
Unspecified = 0,
/// Mapped to `LanceDbOutputFormat::ArrowIpc`.
ArrowIpc = 1,
/// Mapped to `LanceDbOutputFormat::JsonRows`.
JsonRows = 2,
}
/// Plain-data summary of an upsert, written to the `out_result` parameter of
/// `vldb_lancedb_engine_vector_upsert_raw`. Each field is copied from the
/// same-named field of the engine's upsert result.
#[repr(C)]
#[derive(Clone, Copy, Debug, Default)]
pub struct VldbLancedbUpsertResultPod {
pub version: u64,
pub input_rows: u64,
pub inserted_rows: u64,
pub updated_rows: u64,
pub deleted_rows: u64,
}
/// Plain-data metadata describing a search result, written to the `out_result`
/// parameter of `vldb_lancedb_engine_vector_search_f32`.
#[repr(C)]
#[derive(Clone, Copy, Debug, Default)]
pub struct VldbLancedbSearchResultMeta {
/// Numeric value of the matching `VldbLancedbFfiOutputFormat` variant.
pub format: u32,
/// Row count reported by the engine's search result.
pub rows: u64,
/// Length of the returned byte buffer (`VldbLancedbByteBuffer::len`).
pub byte_length: usize,
}
/// JSON request body accepted by `vldb_lancedb_engine_create_table_json`.
#[derive(Debug, Deserialize)]
struct CreateTableJsonInput {
table_name: String,
columns: Vec<CreateTableJsonColumn>,
// Defaults to `false` when the key is absent.
#[serde(default)]
overwrite_if_exists: bool,
}
/// One column definition inside `CreateTableJsonInput::columns`.
#[derive(Debug, Deserialize)]
struct CreateTableJsonColumn {
name: String,
// Wire name of the column type; interpreted by `parse_column_type`.
column_type: String,
// Defaults to 0 when the key is absent.
#[serde(default)]
vector_dim: u32,
// Defaults to `true` (see `default_nullable`) when the key is absent.
#[serde(default = "default_nullable")]
nullable: bool,
}
/// JSON request header accepted by `vldb_lancedb_engine_vector_upsert`; the row
/// payload itself is passed separately as raw bytes.
#[derive(Debug, Deserialize)]
struct UpsertJsonInput {
table_name: String,
// Wire name of the payload format; interpreted by `parse_input_format`.
input_format: String,
// Defaults to an empty list when the key is absent.
#[serde(default)]
key_columns: Vec<String>,
}
/// JSON request body accepted by `vldb_lancedb_engine_vector_search`.
#[derive(Debug, Deserialize)]
struct SearchJsonInput {
table_name: String,
vector: Vec<f32>,
// Defaults to 10 (see `default_search_limit`) when the key is absent.
#[serde(default = "default_search_limit")]
limit: u32,
// Defaults to an empty string when the key is absent.
#[serde(default)]
filter: String,
// Defaults to an empty string when the key is absent.
#[serde(default)]
vector_column: String,
// Wire name of the output format; an absent key yields "", which
// `parse_output_format` maps to Arrow IPC.
#[serde(default)]
output_format: String,
}
/// JSON request body accepted by `vldb_lancedb_engine_delete_json`; both fields
/// are forwarded unchanged into `LanceDbDeleteInput`.
#[derive(Debug, Deserialize)]
struct DeleteJsonInput {
table_name: String,
condition: String,
}
/// JSON request body accepted by `vldb_lancedb_engine_drop_table_json`.
#[derive(Debug, Deserialize)]
struct DropTableJsonInput {
table_name: String,
}
/// Returns an options value pre-populated with defaults.
///
/// Both path pointers are null, the read consistency interval is flagged as
/// present with a value of 0 ms, and the three limits are copied from
/// `LanceDbEngineOptions::default()`.
#[unsafe(no_mangle)]
pub extern "C" fn vldb_lancedb_runtime_options_default() -> VldbLancedbRuntimeOptions {
    let engine_defaults = LanceDbEngineOptions::default();
    let max_upsert_payload = engine_defaults.max_upsert_payload;
    let max_search_limit = engine_defaults.max_search_limit;
    let max_concurrent_requests = engine_defaults.max_concurrent_requests;
    VldbLancedbRuntimeOptions {
        default_db_path: ptr::null(),
        db_root: ptr::null(),
        read_consistency_interval_ms: 0,
        has_read_consistency_interval: 1,
        max_upsert_payload,
        max_search_limit,
        max_concurrent_requests,
    }
}
/// Builds a runtime from `options` and returns an owning handle.
///
/// Returns null on failure; the reason is stored in the thread-local last
/// error. Release the handle with `vldb_lancedb_runtime_destroy`.
#[unsafe(no_mangle)]
pub extern "C" fn vldb_lancedb_runtime_create(
    options: VldbLancedbRuntimeOptions,
) -> *mut VldbLancedbRuntimeHandle {
    clear_last_error();
    build_runtime_from_options(options)
        .map(|inner| Box::into_raw(Box::new(VldbLancedbRuntimeHandle { inner })))
        .unwrap_or_else(|message| {
            set_last_error(message);
            ptr::null_mut()
        })
}
/// Releases a runtime handle created by `vldb_lancedb_runtime_create`.
///
/// A null `handle` is ignored.
#[unsafe(no_mangle)]
pub extern "C" fn vldb_lancedb_runtime_destroy(handle: *mut VldbLancedbRuntimeHandle) {
    if !handle.is_null() {
        // SAFETY: the caller must pass a pointer obtained from
        // `vldb_lancedb_runtime_create` that has not been destroyed yet.
        let owned = unsafe { Box::from_raw(handle) };
        drop(owned);
    }
}
/// Opens the runtime's default engine and returns an owning engine handle.
///
/// Returns null when `handle` is null or opening fails; the reason is stored
/// in the thread-local last error. Release the result with
/// `vldb_lancedb_engine_destroy`.
#[unsafe(no_mangle)]
pub extern "C" fn vldb_lancedb_runtime_open_default_engine(
    handle: *mut VldbLancedbRuntimeHandle,
) -> *mut VldbLancedbEngineHandle {
    clear_last_error();
    let Some(runtime_handle) = runtime_handle_ref(handle) else {
        return ptr::null_mut();
    };
    // Clone so the job sent to the worker thread owns its runtime value.
    let runtime = runtime_handle.inner.clone();
    let opened = ffi_block_on(move || async move { runtime.open_default_engine().await });
    opened
        .map(|inner| Box::into_raw(Box::new(VldbLancedbEngineHandle { inner })))
        .unwrap_or_else(|message| {
            set_last_error(message);
            ptr::null_mut()
        })
}
/// Opens the engine for `database_name` and returns an owning engine handle.
///
/// `database_name` may be null or blank, in which case `None` is passed to the
/// runtime. Returns null when `handle` is null, the name is not valid UTF-8,
/// or opening fails; the reason is stored in the thread-local last error.
#[unsafe(no_mangle)]
pub extern "C" fn vldb_lancedb_runtime_open_named_engine(
    handle: *mut VldbLancedbRuntimeHandle,
    database_name: *const c_char,
) -> *mut VldbLancedbEngineHandle {
    clear_last_error();
    let Some(runtime_handle) = runtime_handle_ref(handle) else {
        return ptr::null_mut();
    };
    let opened = optional_c_string(database_name, "database_name").and_then(|name| {
        let runtime = runtime_handle.inner.clone();
        ffi_block_on(move || async move { runtime.open_named_engine(name.as_deref()).await })
    });
    match opened {
        Ok(inner) => Box::into_raw(Box::new(VldbLancedbEngineHandle { inner })),
        Err(message) => {
            set_last_error(message);
            ptr::null_mut()
        }
    }
}
/// Resolves the database path the runtime would use for `database_name`.
///
/// `database_name` may be null or blank (treated as `None`). On success the
/// path is returned as a newly allocated C string to be released with
/// `vldb_lancedb_string_free`; on failure null is returned and the reason is
/// stored in the thread-local last error.
#[unsafe(no_mangle)]
pub extern "C" fn vldb_lancedb_runtime_database_path_for_name(
    handle: *mut VldbLancedbRuntimeHandle,
    database_name: *const c_char,
) -> *mut c_char {
    clear_last_error();
    let Some(runtime_handle) = runtime_handle_ref(handle) else {
        return ptr::null_mut();
    };
    let resolved = optional_c_string(database_name, "database_name").and_then(|name| {
        runtime_handle
            .inner
            .database_path_for_name(name.as_deref())
            .map_err(|error| error.to_string())
    });
    match resolved {
        Ok(path) => string_into_raw(path),
        Err(message) => {
            set_last_error(message);
            ptr::null_mut()
        }
    }
}
#[unsafe(no_mangle)]
pub extern "C" fn vldb_lancedb_engine_create_table_json(
handle: *mut VldbLancedbEngineHandle,
input_json: *const c_char,
) -> *mut c_char {
clear_last_error();
let Some(engine) = engine_handle_ref(handle) else {
return ptr::null_mut();
};
let input = match parse_json_input::<CreateTableJsonInput>(input_json, "input_json") {
Ok(value) => value,
Err(message) => {
set_last_error(message);
return ptr::null_mut();
}
};
let input = match map_create_table_input(input) {
Ok(value) => value,
Err(message) => {
set_last_error(message);
return ptr::null_mut();
}
};
let engine = engine.inner.clone();
match ffi_block_on(move || async move { engine.create_table(input).await }) {
Ok(result) => json_string_response(serde_json::json!({
"success": true,
"message": result.message,
})),
Err(message) => {
set_last_error(message);
ptr::null_mut()
}
}
}
/// Upserts rows into a table; the request header is JSON and the row payload
/// is the raw byte range `data[..data_len]`.
///
/// The JSON is deserialized as `UpsertJsonInput`. On success a JSON string
/// summarizing the upsert is returned (release it with
/// `vldb_lancedb_string_free`); on failure null is returned and the reason is
/// stored in the thread-local last error.
#[unsafe(no_mangle)]
pub extern "C" fn vldb_lancedb_engine_vector_upsert(
    handle: *mut VldbLancedbEngineHandle,
    input_json: *const c_char,
    data: *const u8,
    data_len: usize,
) -> *mut c_char {
    clear_last_error();
    let Some(engine_handle) = engine_handle_ref(handle) else {
        return ptr::null_mut();
    };
    let outcome = parse_json_input::<UpsertJsonInput>(input_json, "input_json")
        .and_then(|header| map_upsert_input(header, data, data_len))
        .and_then(|request| {
            let engine = engine_handle.inner.clone();
            ffi_block_on(move || async move { engine.vector_upsert(request).await })
        });
    match outcome {
        Ok(summary) => json_string_response(serde_json::json!({
            "success": true,
            "message": summary.message,
            "version": summary.version,
            "input_rows": summary.input_rows,
            "inserted_rows": summary.inserted_rows,
            "updated_rows": summary.updated_rows,
            "deleted_rows": summary.deleted_rows,
        })),
        Err(message) => {
            set_last_error(message);
            ptr::null_mut()
        }
    }
}
/// Upserts rows without a JSON header: every request field is a plain C value.
///
/// `key_columns` points to `key_columns_len` NUL-terminated strings. On
/// success the counters are written to `out_result` and `Success` (0) is
/// returned; otherwise `Failure` (1) is returned, `out_result` is left
/// untouched, and the reason is stored in the thread-local last error.
#[unsafe(no_mangle)]
pub extern "C" fn vldb_lancedb_engine_vector_upsert_raw(
    handle: *mut VldbLancedbEngineHandle,
    table_name: *const c_char,
    input_format: VldbLancedbFfiInputFormat,
    data: *const u8,
    data_len: usize,
    key_columns: *const *const c_char,
    key_columns_len: usize,
    out_result: *mut VldbLancedbUpsertResultPod,
) -> i32 {
    clear_last_error();
    let Some(engine_handle) = engine_handle_ref(handle) else {
        return VldbLancedbStatusCode::Failure as i32;
    };
    if out_result.is_null() {
        set_last_error("out_result must not be null");
        return VldbLancedbStatusCode::Failure as i32;
    }
    // Validate and copy the inputs in a fixed order: table name, key columns,
    // then the payload bytes.
    let request = required_c_string(table_name, "table_name").and_then(|table_name| {
        let key_columns = copy_c_string_array(key_columns, key_columns_len, "key_columns")?;
        let data = copy_input_bytes(data, data_len)?;
        Ok(LanceDbUpsertInput {
            table_name,
            input_format: parse_ffi_input_format(input_format),
            data,
            key_columns,
        })
    });
    let outcome = request.and_then(|request| {
        let engine = engine_handle.inner.clone();
        ffi_block_on(move || async move { engine.vector_upsert(request).await })
    });
    match outcome {
        Ok(summary) => {
            let pod = VldbLancedbUpsertResultPod {
                version: summary.version,
                input_rows: summary.input_rows,
                inserted_rows: summary.inserted_rows,
                updated_rows: summary.updated_rows,
                deleted_rows: summary.deleted_rows,
            };
            // SAFETY: `out_result` was checked for null above; the caller must
            // supply a pointer to writable storage for the result struct.
            unsafe {
                out_result.write(pod);
            }
            VldbLancedbStatusCode::Success as i32
        }
        Err(message) => {
            set_last_error(message);
            VldbLancedbStatusCode::Failure as i32
        }
    }
}
#[unsafe(no_mangle)]
pub extern "C" fn vldb_lancedb_engine_vector_search(
handle: *mut VldbLancedbEngineHandle,
input_json: *const c_char,
output_data: *mut VldbLancedbByteBuffer,
) -> *mut c_char {
clear_last_error();
let Some(engine) = engine_handle_ref(handle) else {
return ptr::null_mut();
};
if output_data.is_null() {
set_last_error("output_data must not be null");
return ptr::null_mut();
}
let input = match parse_json_input::<SearchJsonInput>(input_json, "input_json") {
Ok(value) => value,
Err(message) => {
set_last_error(message);
return ptr::null_mut();
}
};
let input = match map_search_input(input) {
Ok(value) => value,
Err(message) => {
set_last_error(message);
return ptr::null_mut();
}
};
let engine = engine.inner.clone();
match ffi_block_on(move || async move { engine.vector_search(input).await }) {
Ok(result) => {
let data = allocate_byte_buffer(result.data);
unsafe {
*output_data = data;
}
json_string_response(serde_json::json!({
"success": true,
"message": result.message,
"format": result.format.as_wire_name(),
"rows": result.rows,
"byte_length": data.len,
}))
}
Err(message) => {
set_last_error(message);
ptr::null_mut()
}
}
}
/// Runs a vector search without a JSON header: every request field is a plain
/// C value.
///
/// `vector_data` points to `vector_len` `f32` values and must not be empty.
/// `filter` and `vector_column` may be null or blank, in which case an empty
/// string is used. On success the result bytes are written to `output_data`
/// (release with `vldb_lancedb_bytes_free`), the metadata to `out_result`, and
/// `Success` (0) is returned. Otherwise `Failure` (1) is returned, neither
/// output is written, and the reason is stored in the thread-local last error.
#[unsafe(no_mangle)]
pub extern "C" fn vldb_lancedb_engine_vector_search_f32(
    handle: *mut VldbLancedbEngineHandle,
    table_name: *const c_char,
    vector_data: *const f32,
    vector_len: usize,
    limit: u32,
    filter: *const c_char,
    vector_column: *const c_char,
    output_format: VldbLancedbFfiOutputFormat,
    output_data: *mut VldbLancedbByteBuffer,
    out_result: *mut VldbLancedbSearchResultMeta,
) -> i32 {
    clear_last_error();
    let Some(engine_handle) = engine_handle_ref(handle) else {
        return VldbLancedbStatusCode::Failure as i32;
    };
    if output_data.is_null() {
        set_last_error("output_data must not be null");
        return VldbLancedbStatusCode::Failure as i32;
    }
    if out_result.is_null() {
        set_last_error("out_result must not be null");
        return VldbLancedbStatusCode::Failure as i32;
    }
    // Validate and copy the inputs in a fixed order: table name, filter,
    // vector column, then the query vector.
    let request = required_c_string(table_name, "table_name").and_then(|table_name| {
        let filter = optional_c_string(filter, "filter")?.unwrap_or_default();
        let vector_column = optional_c_string(vector_column, "vector_column")?.unwrap_or_default();
        let vector = copy_input_f32_slice(vector_data, vector_len, "vector_data")?;
        Ok(LanceDbSearchInput {
            table_name,
            vector,
            limit,
            filter,
            vector_column,
            output_format: parse_ffi_output_format(output_format),
        })
    });
    let outcome = request.and_then(|request| {
        let engine = engine_handle.inner.clone();
        ffi_block_on(move || async move { engine.vector_search(request).await })
    });
    match outcome {
        Ok(found) => {
            let buffer = allocate_byte_buffer(found.data);
            let meta = VldbLancedbSearchResultMeta {
                format: ffi_output_format_code(found.format),
                rows: found.rows,
                byte_length: buffer.len,
            };
            // SAFETY: both pointers were checked for null above; the caller
            // must supply pointers to writable storage for the two structs.
            unsafe {
                output_data.write(buffer);
                out_result.write(meta);
            }
            VldbLancedbStatusCode::Success as i32
        }
        Err(message) => {
            set_last_error(message);
            VldbLancedbStatusCode::Failure as i32
        }
    }
}
#[unsafe(no_mangle)]
pub extern "C" fn vldb_lancedb_engine_delete_json(
handle: *mut VldbLancedbEngineHandle,
input_json: *const c_char,
) -> *mut c_char {
clear_last_error();
let Some(engine) = engine_handle_ref(handle) else {
return ptr::null_mut();
};
let input = match parse_json_input::<DeleteJsonInput>(input_json, "input_json") {
Ok(value) => value,
Err(message) => {
set_last_error(message);
return ptr::null_mut();
}
};
let input = LanceDbDeleteInput {
table_name: input.table_name,
condition: input.condition,
};
let engine = engine.inner.clone();
match ffi_block_on(move || async move { engine.delete(input).await }) {
Ok(result) => json_string_response(serde_json::json!({
"success": true,
"message": result.message,
"version": result.version,
"deleted_rows": result.deleted_rows,
})),
Err(message) => {
set_last_error(message);
ptr::null_mut()
}
}
}
#[unsafe(no_mangle)]
pub extern "C" fn vldb_lancedb_engine_drop_table_json(
handle: *mut VldbLancedbEngineHandle,
input_json: *const c_char,
) -> *mut c_char {
clear_last_error();
let Some(engine) = engine_handle_ref(handle) else {
return ptr::null_mut();
};
let input = match parse_json_input::<DropTableJsonInput>(input_json, "input_json") {
Ok(value) => value,
Err(message) => {
set_last_error(message);
return ptr::null_mut();
}
};
let input = LanceDbDropTableInput {
table_name: input.table_name,
};
let engine = engine.inner.clone();
match ffi_block_on(move || async move { engine.drop_table(input).await }) {
Ok(result) => json_string_response(serde_json::json!({
"success": true,
"message": result.message,
})),
Err(message) => {
set_last_error(message);
ptr::null_mut()
}
}
}
/// Releases an engine handle returned by one of the
/// `vldb_lancedb_runtime_open_*_engine` functions.
///
/// A null `handle` is ignored.
#[unsafe(no_mangle)]
pub extern "C" fn vldb_lancedb_engine_destroy(handle: *mut VldbLancedbEngineHandle) {
    if !handle.is_null() {
        // SAFETY: the caller must pass a pointer produced by this library's
        // engine-opening functions that has not been destroyed yet.
        let owned = unsafe { Box::from_raw(handle) };
        drop(owned);
    }
}
#[unsafe(no_mangle)]
pub extern "C" fn vldb_lancedb_bytes_free(buffer: VldbLancedbByteBuffer) {
if buffer.data.is_null() || buffer.cap == 0 {
return;
}
unsafe {
drop(Vec::from_raw_parts(buffer.data, buffer.len, buffer.cap));
}
}
/// Releases a C string previously returned by this library.
///
/// A null `value` is ignored.
#[unsafe(no_mangle)]
pub extern "C" fn vldb_lancedb_string_free(value: *mut c_char) {
    if !value.is_null() {
        // SAFETY: the caller must pass a pointer obtained from
        // `CString::into_raw` inside this library, and must free it only once.
        let reclaimed = unsafe { CString::from_raw(value) };
        drop(reclaimed);
    }
}
/// Returns the calling thread's last error message, or null when none is set.
///
/// The pointer refers to storage owned by the thread-local slot: it must not
/// be freed by the caller and is invalidated when the slot is next overwritten
/// or cleared on this thread.
#[unsafe(no_mangle)]
pub extern "C" fn vldb_lancedb_last_error_message() -> *const c_char {
    LAST_ERROR.with(|slot| match slot.borrow().as_ref() {
        Some(message) => message.as_ptr(),
        None => ptr::null(),
    })
}
/// Clears the calling thread's last error message.
#[unsafe(no_mangle)]
pub extern "C" fn vldb_lancedb_clear_last_error() {
    // Thin C-ABI wrapper around the internal helper of the same purpose.
    clear_last_error()
}
/// Returns 1 when `handle` is null and 0 otherwise.
#[unsafe(no_mangle)]
pub extern "C" fn vldb_lancedb_engine_is_null(handle: *const VldbLancedbEngineHandle) -> u8 {
    u8::from(handle.is_null())
}
/// Returns 1 when `handle` is null and 0 otherwise.
#[unsafe(no_mangle)]
pub extern "C" fn vldb_lancedb_runtime_is_null(handle: *const VldbLancedbRuntimeHandle) -> u8 {
    u8::from(handle.is_null())
}
/// Converts the C options struct into a `LanceDbRuntime`.
///
/// `default_db_path` is required; `db_root` is optional. A zero
/// `has_read_consistency_interval` yields no interval. Each limit that is zero
/// is replaced by the corresponding `LanceDbEngineOptions::default()` value.
///
/// # Errors
/// Returns a message when a string field is missing, blank (for the required
/// path) or not valid UTF-8.
fn build_runtime_from_options(
    options: VldbLancedbRuntimeOptions,
) -> Result<LanceDbRuntime, String> {
    let default_db_path = required_c_string(options.default_db_path, "default_db_path")?;
    let db_root = optional_c_string(options.db_root, "db_root")?;
    let read_consistency_interval_ms = match options.has_read_consistency_interval {
        0 => None,
        _ => Some(options.read_consistency_interval_ms),
    };

    let engine_defaults = LanceDbEngineOptions::default();
    let max_upsert_payload =
        normalize_non_zero(options.max_upsert_payload, engine_defaults.max_upsert_payload);
    let max_search_limit =
        normalize_non_zero(options.max_search_limit, engine_defaults.max_search_limit);
    let max_concurrent_requests = normalize_non_zero(
        options.max_concurrent_requests,
        engine_defaults.max_concurrent_requests,
    );

    let config = DatabaseRuntimeConfig {
        default_db_path,
        db_root,
        read_consistency_interval_ms,
    };
    let engine_options = LanceDbEngineOptions {
        max_upsert_payload,
        max_search_limit,
        max_concurrent_requests,
    };
    Ok(LanceDbRuntime::new(config, engine_options))
}
/// Borrows the runtime behind a raw handle.
///
/// Returns `None` and records a last-error message when `handle` is null. The
/// reference is typed `'static`, but it is only valid until the handle is
/// passed to `vldb_lancedb_runtime_destroy`.
fn runtime_handle_ref(
    handle: *mut VldbLancedbRuntimeHandle,
) -> Option<&'static VldbLancedbRuntimeHandle> {
    // SAFETY: a non-null handle must originate from
    // `vldb_lancedb_runtime_create` and still be alive.
    match unsafe { handle.as_ref() } {
        Some(runtime) => Some(runtime),
        None => {
            set_last_error("runtime handle must not be null");
            None
        }
    }
}
/// Borrows the engine behind a raw handle.
///
/// Returns `None` and records a last-error message when `handle` is null. The
/// reference is typed `'static`, but it is only valid until the handle is
/// passed to `vldb_lancedb_engine_destroy`.
fn engine_handle_ref(
    handle: *mut VldbLancedbEngineHandle,
) -> Option<&'static VldbLancedbEngineHandle> {
    // SAFETY: a non-null handle must originate from one of the engine-opening
    // functions in this file and still be alive.
    match unsafe { handle.as_ref() } {
        Some(engine) => Some(engine),
        None => {
            set_last_error("engine handle must not be null");
            None
        }
    }
}
/// Reads a required C string and deserializes it from JSON into `T`.
///
/// # Errors
/// Propagates the `required_c_string` error, or returns
/// `failed to parse <field_name>: <serde error>` when the JSON is invalid.
fn parse_json_input<T>(value: *const c_char, field_name: &str) -> Result<T, String>
where
    T: for<'de> Deserialize<'de>,
{
    let json_text = required_c_string(value, field_name)?;
    match serde_json::from_str::<T>(&json_text) {
        Ok(parsed) => Ok(parsed),
        Err(error) => Err(format!("failed to parse {field_name}: {error}")),
    }
}
fn map_create_table_input(input: CreateTableJsonInput) -> Result<LanceDbCreateTableInput, String> {
let columns = input
.columns
.into_iter()
.map(map_create_table_column)
.collect::<Result<Vec<_>, _>>()?;
Ok(LanceDbCreateTableInput {
table_name: input.table_name,
columns,
overwrite_if_exists: input.overwrite_if_exists,
})
}
fn map_create_table_column(input: CreateTableJsonColumn) -> Result<LanceDbColumnDef, String> {
Ok(LanceDbColumnDef {
name: input.name,
column_type: parse_column_type(&input.column_type)?,
vector_dim: input.vector_dim,
nullable: input.nullable,
})
}
fn map_upsert_input(
input: UpsertJsonInput,
data: *const u8,
data_len: usize,
) -> Result<LanceDbUpsertInput, String> {
Ok(LanceDbUpsertInput {
table_name: input.table_name,
input_format: parse_input_format(&input.input_format)?,
data: copy_input_bytes(data, data_len)?,
key_columns: input.key_columns,
})
}
fn map_search_input(input: SearchJsonInput) -> Result<LanceDbSearchInput, String> {
Ok(LanceDbSearchInput {
table_name: input.table_name,
vector: input.vector,
limit: input.limit,
filter: input.filter,
vector_column: input.vector_column,
output_format: parse_output_format(&input.output_format)?,
})
}
/// Maps a column type wire name to `LanceDbColumnType`.
///
/// Matching ignores surrounding whitespace and ASCII case. An empty name and
/// `unspecified` both map to `Unspecified`.
///
/// # Errors
/// Returns `unsupported column_type: <normalized name>` for any other value.
fn parse_column_type(value: &str) -> Result<LanceDbColumnType, String> {
    let normalized = value.trim().to_ascii_lowercase();
    let column_type = match normalized.as_str() {
        "string" => LanceDbColumnType::String,
        "int64" => LanceDbColumnType::Int64,
        "float64" => LanceDbColumnType::Float64,
        "bool" | "boolean" => LanceDbColumnType::Bool,
        "vector_float32" | "vector-float32" => LanceDbColumnType::VectorFloat32,
        "float32" => LanceDbColumnType::Float32,
        "uint64" => LanceDbColumnType::Uint64,
        "int32" => LanceDbColumnType::Int32,
        "uint32" => LanceDbColumnType::Uint32,
        "unspecified" | "" => LanceDbColumnType::Unspecified,
        other => return Err(format!("unsupported column_type: {other}")),
    };
    Ok(column_type)
}
/// Maps an input format wire name to `LanceDbInputFormat`.
///
/// Matching ignores surrounding whitespace and ASCII case. An empty name and
/// `unspecified` are treated as JSON rows.
///
/// # Errors
/// Returns `unsupported input_format: <normalized name>` for any other value.
fn parse_input_format(value: &str) -> Result<LanceDbInputFormat, String> {
    let normalized = value.trim().to_ascii_lowercase();
    let format = match normalized.as_str() {
        "" | "unspecified" | "json" | "json_rows" | "json-rows" => LanceDbInputFormat::JsonRows,
        "arrow" | "arrow_ipc" | "arrow-ipc" => LanceDbInputFormat::ArrowIpc,
        other => return Err(format!("unsupported input_format: {other}")),
    };
    Ok(format)
}
fn parse_ffi_input_format(value: VldbLancedbFfiInputFormat) -> LanceDbInputFormat {
match value {
VldbLancedbFfiInputFormat::Unspecified => LanceDbInputFormat::Unspecified,
VldbLancedbFfiInputFormat::JsonRows => LanceDbInputFormat::JsonRows,
VldbLancedbFfiInputFormat::ArrowIpc => LanceDbInputFormat::ArrowIpc,
}
}
/// Maps an output format wire name to `LanceDbOutputFormat`.
///
/// Matching ignores surrounding whitespace and ASCII case. An empty name and
/// `unspecified` are treated as Arrow IPC.
///
/// # Errors
/// Returns `unsupported output_format: <normalized name>` for any other value.
fn parse_output_format(value: &str) -> Result<LanceDbOutputFormat, String> {
    let normalized = value.trim().to_ascii_lowercase();
    let format = match normalized.as_str() {
        "" | "unspecified" | "arrow" | "arrow_ipc" | "arrow-ipc" => LanceDbOutputFormat::ArrowIpc,
        "json" | "json_rows" | "json-rows" => LanceDbOutputFormat::JsonRows,
        other => return Err(format!("unsupported output_format: {other}")),
    };
    Ok(format)
}
fn parse_ffi_output_format(value: VldbLancedbFfiOutputFormat) -> LanceDbOutputFormat {
match value {
VldbLancedbFfiOutputFormat::Unspecified => LanceDbOutputFormat::Unspecified,
VldbLancedbFfiOutputFormat::ArrowIpc => LanceDbOutputFormat::ArrowIpc,
VldbLancedbFfiOutputFormat::JsonRows => LanceDbOutputFormat::JsonRows,
}
}
/// Returns the numeric discriminant of the `VldbLancedbFfiOutputFormat`
/// variant that corresponds to the engine's output format.
fn ffi_output_format_code(value: LanceDbOutputFormat) -> u32 {
    let ffi_variant = match value {
        LanceDbOutputFormat::Unspecified => VldbLancedbFfiOutputFormat::Unspecified,
        LanceDbOutputFormat::ArrowIpc => VldbLancedbFfiOutputFormat::ArrowIpc,
        LanceDbOutputFormat::JsonRows => VldbLancedbFfiOutputFormat::JsonRows,
    };
    ffi_variant as u32
}
/// Copies `data_len` bytes starting at `data` into an owned vector.
///
/// A zero `data_len` yields an empty vector without touching `data`.
///
/// # Errors
/// Returns an error when `data` is null while `data_len > 0`, or when
/// `data_len` exceeds `isize::MAX`, the largest size a slice may describe.
fn copy_input_bytes(data: *const u8, data_len: usize) -> Result<Vec<u8>, String> {
    if data_len == 0 {
        return Ok(Vec::new());
    }
    if data.is_null() {
        return Err("data must not be null when data_len > 0".to_string());
    }
    // `slice::from_raw_parts` requires the total size to fit in `isize`;
    // reject oversized lengths from the caller instead of invoking UB.
    if data_len > isize::MAX as usize {
        return Err(format!(
            "data_len {data_len} exceeds the maximum supported buffer size"
        ));
    }
    // SAFETY: `data` is non-null and `data_len` fits in `isize`; the caller
    // must guarantee the range is readable for the duration of this call.
    let slice = unsafe { std::slice::from_raw_parts(data, data_len) };
    Ok(slice.to_vec())
}
/// Copies `data_len` `f32` values starting at `data` into an owned vector.
///
/// `field_name` is only used to build error messages.
///
/// # Errors
/// Returns an error when `data_len` is zero, when `data` is null, when `data`
/// is not aligned for `f32`, or when the byte size of the range would exceed
/// `isize::MAX`.
fn copy_input_f32_slice(
    data: *const f32,
    data_len: usize,
    field_name: &str,
) -> Result<Vec<f32>, String> {
    if data_len == 0 {
        return Err(format!("{field_name} must not be empty"));
    }
    if data.is_null() {
        return Err(format!("{field_name} must not be null when data_len > 0"));
    }
    // `slice::from_raw_parts` requires an aligned pointer and a total size
    // that fits in `isize`; reject violations instead of invoking UB.
    if (data as usize) % std::mem::align_of::<f32>() != 0 {
        return Err(format!("{field_name} must be aligned for f32 values"));
    }
    if data_len > isize::MAX as usize / std::mem::size_of::<f32>() {
        return Err(format!(
            "{field_name} length {data_len} exceeds the maximum supported size"
        ));
    }
    // SAFETY: `data` is non-null, aligned, and the byte length fits in
    // `isize`; the caller must guarantee the range is readable.
    let slice = unsafe { std::slice::from_raw_parts(data, data_len) };
    Ok(slice.to_vec())
}
/// Copies an array of `values_len` C string pointers into owned strings.
///
/// A zero `values_len` yields an empty vector without touching `values`.
/// Each element is read with `required_c_string`, using
/// `<field_name>[<index>]` as the name in error messages.
///
/// # Errors
/// Returns an error when `values` is null while `values_len > 0`, when
/// `values` is not aligned for pointer-sized elements, when the byte size of
/// the array would exceed `isize::MAX`, or when any element is rejected by
/// `required_c_string`.
fn copy_c_string_array(
    values: *const *const c_char,
    values_len: usize,
    field_name: &str,
) -> Result<Vec<String>, String> {
    if values_len == 0 {
        return Ok(Vec::new());
    }
    if values.is_null() {
        return Err(format!("{field_name} must not be null when values_len > 0"));
    }
    // `slice::from_raw_parts` requires an aligned pointer and a total size
    // that fits in `isize`; reject violations instead of invoking UB.
    if (values as usize) % std::mem::align_of::<*const c_char>() != 0 {
        return Err(format!("{field_name} must be aligned for pointer elements"));
    }
    if values_len > isize::MAX as usize / std::mem::size_of::<*const c_char>() {
        return Err(format!(
            "{field_name} length {values_len} exceeds the maximum supported size"
        ));
    }
    // SAFETY: `values` is non-null, aligned, and the byte length fits in
    // `isize`; the caller must guarantee the array is readable.
    let slice = unsafe { std::slice::from_raw_parts(values, values_len) };
    slice
        .iter()
        .enumerate()
        .map(|(index, value)| required_c_string(*value, &format!("{field_name}[{index}]")))
        .collect()
}
/// Transfers ownership of `bytes` to a C-compatible buffer descriptor.
///
/// An empty vector is dropped and described by a null pointer with zero
/// length and capacity. Otherwise the allocation is intentionally leaked and
/// must later be reclaimed with `vldb_lancedb_bytes_free`.
fn allocate_byte_buffer(bytes: Vec<u8>) -> VldbLancedbByteBuffer {
    if bytes.is_empty() {
        return VldbLancedbByteBuffer {
            data: ptr::null_mut(),
            len: 0,
            cap: 0,
        };
    }
    // Suppress the destructor so the allocation outlives this function.
    let mut leaked = std::mem::ManuallyDrop::new(bytes);
    VldbLancedbByteBuffer {
        data: leaked.as_mut_ptr(),
        len: leaked.len(),
        cap: leaked.capacity(),
    }
}
fn json_string_response(value: serde_json::Value) -> *mut c_char {
match serde_json::to_string(&value) {
Ok(text) => string_into_raw(text),
Err(error) => {
set_last_error(format!("failed to serialize JSON response: {error}"));
ptr::null_mut()
}
}
}
/// Serde default for `CreateTableJsonColumn::nullable`: columns are nullable
/// unless the request says otherwise.
fn default_nullable() -> bool {
    const NULLABLE_BY_DEFAULT: bool = true;
    NULLABLE_BY_DEFAULT
}
/// Serde default for `SearchJsonInput::limit`.
fn default_search_limit() -> u32 {
    const DEFAULT_LIMIT: u32 = 10;
    DEFAULT_LIMIT
}
/// Reads a mandatory C string argument into an owned `String`.
///
/// The returned text is not trimmed; trimming is only used to detect blank
/// input.
///
/// # Errors
/// Returns an error when `value` is null, not valid UTF-8, or blank.
fn required_c_string(value: *const c_char, field_name: &str) -> Result<String, String> {
    if value.is_null() {
        return Err(format!("{field_name} must not be null"));
    }
    match c_string_to_owned(value, field_name)? {
        text if text.trim().is_empty() => Err(format!("{field_name} must not be empty")),
        text => Ok(text),
    }
}
/// Reads an optional C string argument.
///
/// A null pointer or blank text yields `Ok(None)`; otherwise the untrimmed
/// text is returned.
///
/// # Errors
/// Returns an error when a non-null `value` is not valid UTF-8.
fn optional_c_string(value: *const c_char, field_name: &str) -> Result<Option<String>, String> {
    if value.is_null() {
        return Ok(None);
    }
    let text = c_string_to_owned(value, field_name)?;
    let is_blank = text.trim().is_empty();
    Ok((!is_blank).then_some(text))
}
/// Copies a NUL-terminated C string into an owned `String`.
///
/// `value` must be non-null; callers in this file check that first.
///
/// # Errors
/// Returns `<field_name> must be valid UTF-8` when the bytes are not UTF-8.
fn c_string_to_owned(value: *const c_char, field_name: &str) -> Result<String, String> {
    // SAFETY: the caller must pass a non-null pointer to a NUL-terminated
    // string that stays valid for the duration of this call.
    let borrowed = unsafe { CStr::from_ptr(value) };
    match borrowed.to_str() {
        Ok(text) => Ok(text.to_owned()),
        Err(_) => Err(format!("{field_name} must be valid UTF-8")),
    }
}
/// Converts `value` into a heap C string owned by the caller.
///
/// Interior NUL characters are replaced via `sanitize_message` first. Returns
/// null if the C string still cannot be built. Release the result with
/// `vldb_lancedb_string_free`.
fn string_into_raw(value: String) -> *mut c_char {
    CString::new(sanitize_message(value)).map_or(ptr::null_mut(), CString::into_raw)
}
/// Returns `default_value` when `value` is zero, otherwise `value` itself.
fn normalize_non_zero(value: usize, default_value: usize) -> usize {
    match value {
        0 => default_value,
        provided => provided,
    }
}
// Returns the process-wide FFI worker, starting it on first use.
//
// The worker is a dedicated thread that owns a current-thread Tokio runtime
// and runs submitted jobs one at a time. The startup outcome (worker or error
// message) is cached in a `OnceLock`, so a failed startup is reported again on
// every later call rather than retried.
fn ffi_runtime_worker() -> Result<&'static FfiRuntimeWorker, String> {
static FFI_RUNTIME_WORKER: OnceLock<Result<FfiRuntimeWorker, String>> = OnceLock::new();
match FFI_RUNTIME_WORKER.get_or_init(|| {
// `job_*` carries work to the thread; `ready_*` carries the one-shot
// startup result back to the initializing caller.
let (job_tx, job_rx) = mpsc::channel::<FfiRuntimeJob>();
let (ready_tx, ready_rx) = mpsc::sync_channel::<Result<(), String>>(1);
thread::Builder::new()
.name("vldb-lancedb-ffi-dispatch".to_string())
.spawn(move || {
let runtime = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.thread_name("vldb-lancedb-ffi")
.build()
{
Ok(runtime) => {
let _ = ready_tx.send(Ok(()));
runtime
}
Err(error) => {
let _ = ready_tx.send(Err(format!("failed to build FFI runtime: {error}")));
return;
}
};
// Serve jobs until every sender has been dropped. A panicking job is
// contained here so the worker thread keeps running.
while let Ok(job) = job_rx.recv() {
let _ =
std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| job(&runtime)));
}
})
.map_err(|error| format!("failed to spawn FFI runtime worker: {error}"))?;
// Block until the thread reports whether its runtime was built.
match ready_rx.recv() {
Ok(Ok(())) => Ok(FfiRuntimeWorker { sender: job_tx }),
Ok(Err(message)) => Err(message),
Err(_) => Err("failed to receive FFI runtime worker startup signal".to_string()),
}
}) {
Ok(worker) => Ok(worker),
Err(message) => Err(message.clone()),
}
}
fn ffi_block_on<BuildFuture, F, T, E>(build_future: BuildFuture) -> Result<T, String>
where
BuildFuture: FnOnce() -> F + Send + 'static,
F: std::future::Future<Output = Result<T, E>> + Send + 'static,
T: Send + 'static,
E: std::fmt::Display + Send + 'static,
{
let worker = ffi_runtime_worker()?;
let (result_tx, result_rx) = mpsc::sync_channel::<Result<T, String>>(1);
worker
.sender
.send(Box::new(move |runtime| {
let outcome = runtime
.block_on(build_future())
.map_err(|error| error.to_string());
let _ = result_tx.send(outcome);
}))
.map_err(|_| "FFI runtime worker has stopped".to_string())?;
result_rx
.recv()
.map_err(|_| "FFI runtime worker dropped the response channel".to_string())?
}
/// Stores `message` as the calling thread's last error.
///
/// NUL characters are replaced via `sanitize_message` so the text can be held
/// as a C string; if the conversion still fails the slot is cleared.
fn set_last_error(message: impl Into<String>) {
    let text = sanitize_message(message.into());
    let stored = CString::new(text).ok();
    LAST_ERROR.set(stored);
}
/// Resets the calling thread's last error to "no error".
fn clear_last_error() {
    LAST_ERROR.with(|slot| drop(slot.borrow_mut().take()));
}
/// Replaces every NUL character in `message` with a space so the text can be
/// stored in a C string.
fn sanitize_message(message: impl Into<String>) -> String {
    let raw: String = message.into();
    raw.chars()
        .map(|ch| if ch == '\0' { ' ' } else { ch })
        .collect()
}
// Unit tests for the FFI helpers: option defaults, zero-value normalization,
// message sanitizing, byte buffer round-trips, runtime construction, wire-name
// parsing, JSON-to-engine input mapping, and cross-thread job dispatch.
#[cfg(test)]
mod tests {
use super::{
CreateTableJsonInput, SearchJsonInput, UpsertJsonInput, VldbLancedbRuntimeOptions,
allocate_byte_buffer, build_runtime_from_options, default_search_limit, ffi_block_on,
map_create_table_input, map_search_input, map_upsert_input, normalize_non_zero,
parse_column_type, parse_input_format, parse_output_format, sanitize_message,
vldb_lancedb_bytes_free, vldb_lancedb_runtime_options_default,
};
use std::ffi::CString;
use std::thread;
// The default options leave the path unset and expose non-zero engine limits.
#[test]
fn runtime_options_default_exposes_engine_defaults() {
let options = vldb_lancedb_runtime_options_default();
assert!(options.default_db_path.is_null());
assert_eq!(options.has_read_consistency_interval, 1);
assert!(options.max_upsert_payload > 0);
assert!(options.max_search_limit > 0);
assert!(options.max_concurrent_requests > 0);
}
// Zero selects the fallback; any other value is kept.
#[test]
fn normalize_non_zero_uses_default_for_zero() {
assert_eq!(normalize_non_zero(0, 42), 42);
assert_eq!(normalize_non_zero(7, 42), 7);
}
// Interior NUL characters become spaces.
#[test]
fn sanitize_message_replaces_nul_bytes() {
assert_eq!(sanitize_message("a\0b"), "a b");
}
// The descriptor must report the original capacity so the buffer can be freed.
#[test]
fn allocate_byte_buffer_preserves_capacity_for_free() {
let mut bytes = Vec::with_capacity(16);
bytes.extend_from_slice(b"demo");
let buffer = allocate_byte_buffer(bytes);
assert_eq!(buffer.len, 4);
assert_eq!(buffer.cap, 16);
vldb_lancedb_bytes_free(buffer);
}
// Explicit option values flow through to the runtime and its path resolution.
#[test]
fn runtime_can_be_built_from_ffi_options() {
let default_db_path = CString::new("/tmp/vldb-default").expect("cstring should build");
let db_root = CString::new("/tmp/vldb-root").expect("cstring should build");
let options = VldbLancedbRuntimeOptions {
default_db_path: default_db_path.as_ptr(),
db_root: db_root.as_ptr(),
read_consistency_interval_ms: 250,
has_read_consistency_interval: 1,
max_upsert_payload: 1024,
max_search_limit: 64,
max_concurrent_requests: 8,
};
let runtime = build_runtime_from_options(options).expect("runtime should build");
assert_eq!(runtime.engine_options().max_upsert_payload, 1024);
assert_eq!(
runtime
.database_path_for_name(Some("memory"))
.expect("named path should resolve"),
std::path::PathBuf::from("/tmp/vldb-root")
.join("memory")
.to_string_lossy()
.to_string()
);
}
// Wire names map to the expected enum variants (compared by discriminant).
#[test]
fn parse_helpers_support_expected_wire_names() {
assert_eq!(
parse_column_type("vector_float32").expect("column type should parse") as u8,
crate::types::LanceDbColumnType::VectorFloat32 as u8
);
assert_eq!(
parse_input_format("arrow_ipc").expect("input format should parse") as u8,
crate::types::LanceDbInputFormat::ArrowIpc as u8
);
assert_eq!(
parse_output_format("json").expect("output format should parse") as u8,
crate::types::LanceDbOutputFormat::JsonRows as u8
);
assert_eq!(default_search_limit(), 10);
}
// Each JSON request struct converts into the matching engine input.
#[test]
fn json_mapping_builds_engine_inputs() {
let create_input = map_create_table_input(CreateTableJsonInput {
table_name: "demo".to_string(),
columns: vec![super::CreateTableJsonColumn {
name: "id".to_string(),
column_type: "string".to_string(),
vector_dim: 0,
nullable: false,
}],
overwrite_if_exists: true,
})
.expect("create input should map");
assert_eq!(create_input.table_name, "demo");
assert_eq!(create_input.columns.len(), 1);
assert!(create_input.overwrite_if_exists);
let payload = b"[{\"id\":\"1\"}]";
let upsert_input = map_upsert_input(
UpsertJsonInput {
table_name: "demo".to_string(),
input_format: "json".to_string(),
key_columns: vec!["id".to_string()],
},
payload.as_ptr(),
payload.len(),
)
.expect("upsert input should map");
assert_eq!(upsert_input.table_name, "demo");
assert_eq!(upsert_input.data, payload);
assert_eq!(upsert_input.key_columns, vec!["id".to_string()]);
let search_input = map_search_input(SearchJsonInput {
table_name: "demo".to_string(),
vector: vec![0.1, 0.2],
limit: 3,
filter: "id = '1'".to_string(),
vector_column: "embedding".to_string(),
output_format: "json".to_string(),
})
.expect("search input should map");
assert_eq!(search_input.limit, 3);
assert_eq!(search_input.vector_column, "embedding");
}
// Six caller threads each dispatch twelve jobs through the shared worker and
// must receive their own results back.
#[test]
fn ffi_block_on_dispatches_cross_thread_jobs_without_enter_guard_panics() {
let mut handles = Vec::new();
for thread_index in 0..6usize {
handles.push(thread::spawn(move || {
for call_index in 0..12usize {
let expected = thread_index * 100 + call_index;
let value = ffi_block_on(move || async move {
let value = tokio::task::spawn_blocking(move || expected)
.await
.map_err(|error| std::io::Error::other(error.to_string()))?;
Ok::<usize, std::io::Error>(value)
})
.expect("ffi worker should execute the dispatched future");
assert_eq!(value, expected);
}
}));
}
for handle in handles {
handle
.join()
.expect("cross-thread ffi caller should finish");
}
}
}