use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
use libloading::Library;
use serde_json::{Value, json};
use std::collections::HashMap;
use std::ffi::{CStr, CString, c_char, c_uchar};
use std::path::{Path, PathBuf};
use std::ptr;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use crate::host::controller::{LuaRuntimeSpaceControllerBridge, controller_space_id_for_binding};
use crate::host::database::{
LuaRuntimeDatabaseCallbackMode, LuaRuntimeDatabaseProviderMode, RuntimeDatabaseBindingContext,
RuntimeDatabaseKind, RuntimeDatabaseProviderCallbacks, RuntimeLanceDbProviderAction,
RuntimeLanceDbProviderRequest,
};
use crate::lua_skill::{SkillLanceDbLogLevel, SkillLanceDbMeta};
use crate::runtime_logging::{info as log_info, warn as log_warn};
use crate::runtime_options::LuaRuntimeHostOptions;
use vldb_controller_client::ControllerLanceDbEnableRequest;
/// Opaque handle type for the runtime object owned by the LanceDB dynamic library.
///
/// It is only used behind the `*mut` pointers that the library's runtime entry
/// points return and accept. The zero-sized private field gives the type no
/// readable contents, and this file never constructs a value of it.
#[repr(C)]
struct VldbLancedbRuntimeHandle {
_private: [u8; 0],
}
/// Opaque handle type for an engine object owned by the LanceDB dynamic library.
///
/// It is only used behind the `*mut` pointers that the library's engine entry
/// points return and accept. The zero-sized private field gives the type no
/// readable contents, and this file never constructs a value of it.
#[repr(C)]
struct VldbLancedbEngineHandle {
_private: [u8; 0],
}
/// Raw byte buffer exchanged by value with the LanceDB dynamic library.
///
/// The vector-search entry point receives a pointer to a zeroed buffer to fill,
/// and a filled buffer is handed back to the library's `vldb_lancedb_bytes_free`
/// symbol for release.
#[repr(C)]
#[derive(Clone, Copy, Debug)]
struct VldbLancedbByteBuffer {
/// Start of the bytes; null when the buffer is unset.
data: *mut c_uchar,
/// Number of readable bytes starting at `data`.
len: usize,
/// Presumably the allocation capacity the library needs when freeing — TODO confirm against the library header.
cap: usize,
}
/// Options struct passed by value to the library's runtime constructor.
///
/// In this file a value is always obtained from the library's
/// `vldb_lancedb_runtime_options_default` symbol first and then adjusted, so the
/// remaining fields keep whatever defaults the library chose.
#[repr(C)]
#[derive(Clone, Copy, Debug)]
struct VldbLancedbRuntimeOptions {
/// NUL-terminated path of the default database; set to the per-skill database path at registration.
default_db_path: *const c_char,
/// Set to null at registration. NOTE(review): presumably an optional root for named databases — confirm with the library docs.
db_root: *const c_char,
/// NOTE(review): presumably the read-consistency interval in milliseconds — confirm with the library docs.
read_consistency_interval_ms: u64,
/// NOTE(review): looks like a 0/1 flag saying whether the interval above is set — confirm.
has_read_consistency_interval: u8,
/// NOTE(review): limit semantics are defined by the library; not used in this file.
max_upsert_payload: usize,
/// NOTE(review): limit semantics are defined by the library; not used in this file.
max_search_limit: usize,
/// NOTE(review): limit semantics are defined by the library; not used in this file.
max_concurrent_requests: usize,
}
// Function-pointer signatures for the C symbols that `LoadedLanceDbApi::from_library`
// resolves from the LanceDB dynamic library.
//
// Entry points returning `*mut c_char` hand back a library-owned string: this file
// releases it through `StringFreeFn` and treats a null return as a failure whose
// message is read through `LastErrorMessageFn`.
//
// `vldb_lancedb_runtime_options_default`: returns an options struct for the caller to adjust.
type RuntimeOptionsDefaultFn = unsafe extern "C" fn() -> VldbLancedbRuntimeOptions;
// `vldb_lancedb_runtime_create`: builds a runtime; a null result is treated as failure.
type RuntimeCreateFn =
unsafe extern "C" fn(VldbLancedbRuntimeOptions) -> *mut VldbLancedbRuntimeHandle;
// `vldb_lancedb_runtime_destroy`: releases a runtime handle.
type RuntimeDestroyFn = unsafe extern "C" fn(*mut VldbLancedbRuntimeHandle);
// `vldb_lancedb_runtime_open_default_engine`: opens an engine on a runtime; null is treated as failure.
type RuntimeOpenDefaultEngineFn =
unsafe extern "C" fn(*mut VldbLancedbRuntimeHandle) -> *mut VldbLancedbEngineHandle;
// `vldb_lancedb_runtime_database_path_for_name`: returns a path string; called here with a null name.
type RuntimeDatabasePathForNameFn =
unsafe extern "C" fn(*mut VldbLancedbRuntimeHandle, *const c_char) -> *mut c_char;
// `vldb_lancedb_engine_create_table_json`: takes a JSON request string, returns a JSON response string.
type EngineCreateTableJsonFn =
unsafe extern "C" fn(*mut VldbLancedbEngineHandle, *const c_char) -> *mut c_char;
// `vldb_lancedb_engine_vector_upsert`: JSON request plus a payload given as pointer and length.
type EngineVectorUpsertFn = unsafe extern "C" fn(
*mut VldbLancedbEngineHandle,
*const c_char,
*const u8,
usize,
) -> *mut c_char;
// `vldb_lancedb_engine_vector_search`: JSON request; result bytes are written into the out-buffer.
type EngineVectorSearchFn = unsafe extern "C" fn(
*mut VldbLancedbEngineHandle,
*const c_char,
*mut VldbLancedbByteBuffer,
) -> *mut c_char;
// `vldb_lancedb_engine_delete_json`: takes a JSON request string, returns a JSON response string.
type EngineDeleteJsonFn =
unsafe extern "C" fn(*mut VldbLancedbEngineHandle, *const c_char) -> *mut c_char;
// `vldb_lancedb_engine_drop_table_json`: takes a JSON request string, returns a JSON response string.
type EngineDropTableJsonFn =
unsafe extern "C" fn(*mut VldbLancedbEngineHandle, *const c_char) -> *mut c_char;
// `vldb_lancedb_engine_destroy`: releases an engine handle.
type EngineDestroyFn = unsafe extern "C" fn(*mut VldbLancedbEngineHandle);
// `vldb_lancedb_bytes_free`: releases a byte buffer produced by the library.
type BytesFreeFn = unsafe extern "C" fn(VldbLancedbByteBuffer);
// `vldb_lancedb_string_free`: releases a string produced by the library.
type StringFreeFn = unsafe extern "C" fn(*mut c_char);
// `vldb_lancedb_last_error_message`: returns the most recent error text, or null when there is none.
type LastErrorMessageFn = unsafe extern "C" fn() -> *const c_char;
// `vldb_lancedb_clear_last_error`: resets the stored error text.
type ClearLastErrorFn = unsafe extern "C" fn();
/// The LanceDB dynamic library together with every entry point this file calls.
///
/// Each function-pointer field is copied out of the library once, in
/// `from_library`, and then invoked directly.
struct LoadedLanceDbApi {
/// Keeps the library loaded for as long as the copied function pointers below may be called.
_library: Library,
/// Path the library was loaded from; reported in status JSON and in symbol-load errors.
library_path: PathBuf,
runtime_options_default: RuntimeOptionsDefaultFn,
runtime_create: RuntimeCreateFn,
runtime_destroy: RuntimeDestroyFn,
runtime_open_default_engine: RuntimeOpenDefaultEngineFn,
runtime_database_path_for_name: RuntimeDatabasePathForNameFn,
engine_create_table_json: EngineCreateTableJsonFn,
engine_vector_upsert: EngineVectorUpsertFn,
engine_vector_search: EngineVectorSearchFn,
engine_delete_json: EngineDeleteJsonFn,
engine_drop_table_json: EngineDropTableJsonFn,
engine_destroy: EngineDestroyFn,
bytes_free: BytesFreeFn,
string_free: StringFreeFn,
last_error_message: LastErrorMessageFn,
clear_last_error: ClearLastErrorFn,
}
// SAFETY (review note): these impls assert that the loaded API may be moved to and
// shared between threads. The struct itself only holds the library handle, a path
// and plain function pointers; whether the library's entry points tolerate
// concurrent callers cannot be verified from this file — TODO confirm against the
// library's documentation.
unsafe impl Send for LoadedLanceDbApi {}
unsafe impl Sync for LoadedLanceDbApi {}
impl LoadedLanceDbApi {
/// Loads the LanceDB dynamic library at `library_path` and resolves every
/// entry point this file needs.
///
/// # Errors
/// Returns a message when the path does not exist, when the library cannot be
/// loaded, or when any required symbol is missing.
fn load(library_path: &Path) -> Result<Self, String> {
    if !library_path.exists() {
        return Err(format!(
            "LanceDB dynamic library path does not exist: {}",
            library_path.display()
        ));
    }
    // SAFETY: loading a dynamic library runs its initialisation code; the path is
    // supplied by host configuration and is trusted to point at the LanceDB library.
    let library = unsafe { Library::new(library_path) }.map_err(|error| {
        // The loader error is reported once; it used to be printed twice.
        format!("failed to load {}: {}", library_path.display(), error)
    })?;
    // SAFETY: the library was just loaded from `library_path`; `from_library`
    // relies on its symbols matching the signatures declared in this file.
    unsafe { Self::from_library(library_path.to_path_buf(), library) }
}
/// Resolves every required symbol from an already loaded library and bundles the
/// resulting function pointers with the library itself.
///
/// Symbols are looked up in the order the fields are written below, so the first
/// missing symbol is the one named in the returned error.
///
/// # Safety
/// The caller must ensure `library` really exports these symbols with the
/// signatures declared by the type aliases in this file; the lookups cannot
/// check the signatures.
///
/// # Errors
/// Returns a message naming the symbol and the library path when a lookup fails.
unsafe fn from_library(library_path: PathBuf, library: Library) -> Result<Self, String> {
// Looks up one symbol by name and copies the function pointer out of the
// returned symbol wrapper (the leading `*`), propagating lookup failures with `?`.
macro_rules! load_symbol {
($name:literal, $ty:ty) => {{
unsafe {
*library
// A NUL terminator is appended to the symbol name at compile time.
.get::<$ty>(concat!($name, "\0").as_bytes())
.map_err(|error| {
format!(
"failed to load symbol {} from {}: {}",
$name,
library_path.display(),
error
)
})?
}
}};
}
Ok(Self {
runtime_options_default: load_symbol!(
"vldb_lancedb_runtime_options_default",
RuntimeOptionsDefaultFn
),
runtime_create: load_symbol!("vldb_lancedb_runtime_create", RuntimeCreateFn),
runtime_destroy: load_symbol!("vldb_lancedb_runtime_destroy", RuntimeDestroyFn),
runtime_open_default_engine: load_symbol!(
"vldb_lancedb_runtime_open_default_engine",
RuntimeOpenDefaultEngineFn
),
runtime_database_path_for_name: load_symbol!(
"vldb_lancedb_runtime_database_path_for_name",
RuntimeDatabasePathForNameFn
),
engine_create_table_json: load_symbol!(
"vldb_lancedb_engine_create_table_json",
EngineCreateTableJsonFn
),
engine_vector_upsert: load_symbol!(
"vldb_lancedb_engine_vector_upsert",
EngineVectorUpsertFn
),
engine_vector_search: load_symbol!(
"vldb_lancedb_engine_vector_search",
EngineVectorSearchFn
),
engine_delete_json: load_symbol!("vldb_lancedb_engine_delete_json", EngineDeleteJsonFn),
engine_drop_table_json: load_symbol!(
"vldb_lancedb_engine_drop_table_json",
EngineDropTableJsonFn
),
engine_destroy: load_symbol!("vldb_lancedb_engine_destroy", EngineDestroyFn),
bytes_free: load_symbol!("vldb_lancedb_bytes_free", BytesFreeFn),
string_free: load_symbol!("vldb_lancedb_string_free", StringFreeFn),
last_error_message: load_symbol!("vldb_lancedb_last_error_message", LastErrorMessageFn),
clear_last_error: load_symbol!("vldb_lancedb_clear_last_error", ClearLastErrorFn),
// Moved in last: the macro above borrows `library` and `library_path` until here.
_library: library,
library_path,
})
}
/// Fetches the library's most recent error text and then clears it.
///
/// A null pointer from the library yields the generic text
/// `"unknown LanceDB host error"`. Invalid UTF-8 is replaced lossily.
fn take_last_error_message(&self) -> String {
    // SAFETY: the function pointer was resolved from the loaded library.
    let raw_message = unsafe { (self.last_error_message)() };
    let message = if !raw_message.is_null() {
        // SAFETY: a non-null result is expected to be a NUL-terminated string
        // that stays valid until the error is cleared below.
        unsafe { CStr::from_ptr(raw_message) }
            .to_string_lossy()
            .into_owned()
    } else {
        String::from("unknown LanceDB host error")
    };
    // SAFETY: the text has been copied, so the library may discard it now.
    unsafe { (self.clear_last_error)() };
    message
}
/// Copies a library-owned C string into a Rust `String` and releases the original.
///
/// # Errors
/// A null `ptr` is treated as a failed call: the library's last error message is
/// taken and returned as the error.
fn take_owned_string(&self, ptr: *mut c_char) -> Result<String, String> {
    if ptr.is_null() {
        Err(self.take_last_error_message())
    } else {
        // SAFETY: a non-null pointer returned by the library is expected to be a
        // NUL-terminated string that stays valid until it is freed below.
        let copied = unsafe { CStr::from_ptr(ptr) }
            .to_string_lossy()
            .into_owned();
        // SAFETY: the string came from the library and is released exactly once.
        unsafe { (self.string_free)(ptr) };
        Ok(copied)
    }
}
/// Copies a library-owned byte buffer into a `Vec<u8>` and releases the original.
///
/// A buffer whose `data` pointer is null is treated as unset and yields an empty
/// vector without calling into the library. Any buffer with a non-null `data`
/// pointer is handed back to the library's free function, including when it
/// holds zero bytes; previously such an empty-but-allocated buffer was never
/// released.
fn take_owned_bytes(&self, buffer: VldbLancedbByteBuffer) -> Vec<u8> {
    if buffer.data.is_null() {
        return Vec::new();
    }
    let bytes = if buffer.len == 0 {
        Vec::new()
    } else {
        // SAFETY: the library reports `len` readable bytes starting at `data`.
        unsafe { std::slice::from_raw_parts(buffer.data, buffer.len) }.to_vec()
    };
    // SAFETY: the buffer came from the library and is released exactly once,
    // after its contents have been copied.
    unsafe { (self.bytes_free)(buffer) };
    bytes
}
}
/// Raw library handles owned by one skill binding in dynamic-library mode.
///
/// Both pointers are created during skill registration and destroyed (engine
/// first, then runtime) when the binding is dropped, which also nulls them.
struct SkillHandleState {
/// Runtime handle returned by the library's runtime constructor.
runtime: *mut VldbLancedbRuntimeHandle,
/// Engine handle opened on `runtime`; the target of every engine call.
engine: *mut VldbLancedbEngineHandle,
}
/// How a skill binding reaches LanceDB; chosen once at registration.
#[derive(Clone, Copy, PartialEq, Eq)]
enum LanceDbBindingMode {
/// Calls go straight into the loaded dynamic library.
DynamicLibrary,
/// Calls are forwarded to a callback registered by the host.
HostCallback,
/// Calls are forwarded through the space-controller bridge.
SpaceController,
}
// SAFETY (review note): the raw handles are only reached through the `Mutex` that
// wraps this struct inside `LanceDbSkillBinding`, so only one thread uses them at
// a time. Whether the library allows a handle to be used from a thread other
// than the one that created it cannot be verified from this file — TODO confirm.
unsafe impl Send for SkillHandleState {}
/// A single skill's connection to LanceDB, created by `LanceDbSkillHost::register_skill`.
///
/// Which of the optional fields are populated depends on `provider_mode`.
pub struct LanceDbSkillBinding {
/// Loaded library API; cloned from the host, which only holds one in dynamic-library mode.
api: Option<Arc<LoadedLanceDbApi>>,
/// Name the skill was registered under.
skill_name: String,
/// Final path component of the skill directory; appears as `db=` in log lines.
skill_dir_name: String,
/// Database path: the one reported by the library when available, otherwise the computed path.
database_path: String,
/// Per-skill settings; this file reads the log level and slow-log options from it.
config: SkillLanceDbMeta,
/// Transport used by every operation on this binding.
provider_mode: LanceDbBindingMode,
/// Callback flavour passed along when dispatching to a host provider.
callback_mode: LuaRuntimeDatabaseCallbackMode,
/// Library handles, present only in dynamic-library mode.
handles: Option<Mutex<SkillHandleState>>,
/// Space-controller bridge, present only in space-controller mode.
controller: Option<Arc<LuaRuntimeSpaceControllerBridge>>,
/// Host callbacks used in host-callback mode.
provider_callbacks: Arc<RuntimeDatabaseProviderCallbacks>,
/// Binding description sent with host-provider requests and used to derive controller ids.
provider_binding: RuntimeDatabaseBindingContext,
}
impl LanceDbSkillBinding {
/// Builds a JSON object describing this binding: identity, paths, integration
/// mode and logging settings.
///
/// `library_path` is an empty string when no dynamic library is loaded.
pub fn status_json(&self) -> Value {
    let binding = &self.provider_binding;
    let settings = &self.config;
    let library_path = match self.api.as_ref() {
        Some(api) => api.library_path.to_string_lossy().to_string(),
        None => String::default(),
    };
    json!({
        "enabled": true,
        "initialized": true,
        "skill_name": self.skill_name,
        "skill_dir_name": self.skill_dir_name,
        "database_path": self.database_path,
        "integration_mode": self.integration_mode_name(),
        "library_path": library_path,
        "space_label": binding.space_label,
        "root_name": binding.root_name,
        "binding_tag": binding.binding_tag,
        "space_root": binding.space_root,
        "default_database_path": binding.default_database_path,
        "log_level": settings.log_level.as_str(),
        "slow_log_enabled": settings.slow_log_enabled,
        "slow_log_threshold_ms": settings.slow_log_threshold_ms,
    })
}
pub fn info_json(&self) -> Value {
self.status_json()
}
pub fn create_table_json(&self, input: &Value) -> Result<Value, String> {
if self.is_space_controller_mode() {
self.log_info("create_table", None);
let started_at = Instant::now();
let bridge = self.controller_bridge()?;
let space_id = self.controller_space_id();
let binding_id = self.controller_binding_id()?;
let request_json = serde_json::to_string(input).map_err(|error| error.to_string())?;
let result = bridge.run(move |client| async move {
client
.create_lancedb_table(space_id, binding_id, request_json)
.await
})?;
self.log_if_slow("create_table", started_at, None);
return Ok(json!({ "message": result.message }));
}
if self.is_host_provider_mode() {
return self
.dispatch_host_provider(RuntimeLanceDbProviderAction::CreateTable, input)
.map(|result| result.meta);
}
self.call_json_string("create_table", input, |api, state, input_ptr| unsafe {
(api.engine_create_table_json)(state.engine, input_ptr)
})
}
pub fn vector_upsert_json(&self, input: &Value, data: &[u8]) -> Result<Value, String> {
if self.is_space_controller_mode() {
self.log_info(
"vector_upsert",
Some(format!("payload_bytes={}", data.len())),
);
let started_at = Instant::now();
let bridge = self.controller_bridge()?;
let space_id = self.controller_space_id();
let binding_id = self.controller_binding_id()?;
let request_json = serde_json::to_string(input).map_err(|error| error.to_string())?;
let payload = data.to_vec();
let result = bridge.run(move |client| async move {
client
.upsert_lancedb(space_id, binding_id, request_json, payload)
.await
})?;
self.log_if_slow(
"vector_upsert",
started_at,
Some(format!("payload_bytes={}", data.len())),
);
return Ok(json!({
"message": result.message,
"version": result.version,
"input_rows": result.input_rows,
"inserted_rows": result.inserted_rows,
"updated_rows": result.updated_rows,
"deleted_rows": result.deleted_rows,
}));
}
if self.is_host_provider_mode() {
let mut host_input = input.clone();
if let Some(object) = host_input.as_object_mut() {
object.insert(
"data_base64".to_string(),
Value::String(BASE64_STANDARD.encode(data)),
);
}
return self
.dispatch_host_provider(RuntimeLanceDbProviderAction::VectorUpsert, &host_input)
.map(|result| result.meta);
}
let api = self.api_ref();
let input_text = serde_json::to_string(input).map_err(|error| error.to_string())?;
let input_cstr = CString::new(input_text)
.map_err(|_| "input json contains interior NUL bytes".to_string())?;
self.log_info(
"vector_upsert",
Some(format!("payload_bytes={}", data.len())),
);
let started_at = Instant::now();
let guard = self.lock_handles()?;
unsafe {
let response = (api.engine_vector_upsert)(
guard.engine,
input_cstr.as_ptr(),
data.as_ptr(),
data.len(),
);
let text = match api.take_owned_string(response) {
Ok(text) => text,
Err(error) => {
drop(guard);
self.log_warning("vector_upsert", &error);
return Err(error);
}
};
let value = serde_json::from_str(&text).map_err(|error| {
format!("failed to parse LanceDB upsert response JSON: {}", error)
})?;
drop(guard);
self.log_if_slow(
"vector_upsert",
started_at,
Some(format!("payload_bytes={}", data.len())),
);
Ok(value)
}
}
/// Runs a vector search described by the JSON request `input` and returns the
/// response metadata together with the raw result bytes.
///
/// In dynamic-library mode the library fills a byte buffer that this method
/// must hand back for release. The buffer is now reclaimed immediately after
/// the call, before the response is inspected, so it is no longer leaked when
/// the library reports an error or when the metadata JSON fails to parse.
///
/// # Errors
/// Returns the transport's error message, or a serialisation/parse message.
pub fn vector_search_json(&self, input: &Value) -> Result<(Value, Vec<u8>), String> {
    if self.is_space_controller_mode() {
        self.log_info("vector_search", None);
        let started_at = Instant::now();
        let bridge = self.controller_bridge()?;
        let space_id = self.controller_space_id();
        let binding_id = self.controller_binding_id()?;
        let request_json = serde_json::to_string(input).map_err(|error| error.to_string())?;
        let result = bridge.run(move |client| async move {
            client
                .search_lancedb(space_id, binding_id, request_json)
                .await
        })?;
        self.log_if_slow(
            "vector_search",
            started_at,
            Some(format!("result_bytes={}", result.data.len())),
        );
        return Ok((
            json!({
                "message": result.message,
                "format": result.format,
                "rows": result.rows,
            }),
            result.data,
        ));
    }
    if self.is_host_provider_mode() {
        return self
            .dispatch_host_provider(RuntimeLanceDbProviderAction::VectorSearch, input)
            .map(|result| (result.meta, result.bytes));
    }
    let api = self.api_ref();
    let input_text = serde_json::to_string(input).map_err(|error| error.to_string())?;
    let input_cstr = CString::new(input_text)
        .map_err(|_| "input json contains interior NUL bytes".to_string())?;
    self.log_info("vector_search", None);
    let started_at = Instant::now();
    let guard = self.lock_handles()?;
    let mut buffer = VldbLancedbByteBuffer {
        data: ptr::null_mut(),
        len: 0,
        cap: 0,
    };
    // SAFETY: the engine handle is protected by the lock held in `guard`; the
    // request string and the out-buffer both outlive the call.
    let response =
        unsafe { (api.engine_vector_search)(guard.engine, input_cstr.as_ptr(), &mut buffer) };
    // Take the response (and, on failure, the error text) first, then reclaim the
    // buffer unconditionally so no later early return can leak it.
    let response_text = api.take_owned_string(response);
    let bytes = api.take_owned_bytes(buffer);
    drop(guard);
    let text = match response_text {
        Ok(text) => text,
        Err(error) => {
            self.log_warning("vector_search", &error);
            return Err(error);
        }
    };
    let meta: Value = serde_json::from_str(&text)
        .map_err(|error| format!("failed to parse LanceDB search response JSON: {}", error))?;
    self.log_if_slow(
        "vector_search",
        started_at,
        Some(format!("result_bytes={}", bytes.len())),
    );
    Ok((meta, bytes))
}
pub fn delete_json(&self, input: &Value) -> Result<Value, String> {
if self.is_space_controller_mode() {
self.log_info("delete", None);
let started_at = Instant::now();
let bridge = self.controller_bridge()?;
let space_id = self.controller_space_id();
let binding_id = self.controller_binding_id()?;
let request_json = serde_json::to_string(input).map_err(|error| error.to_string())?;
let result = bridge.run(move |client| async move {
client
.delete_lancedb(space_id, binding_id, request_json)
.await
})?;
self.log_if_slow("delete", started_at, None);
return Ok(json!({
"message": result.message,
"version": result.version,
"deleted_rows": result.deleted_rows,
}));
}
if self.is_host_provider_mode() {
return self
.dispatch_host_provider(RuntimeLanceDbProviderAction::Delete, input)
.map(|result| result.meta);
}
self.call_json_string("delete", input, |api, state, input_ptr| unsafe {
(api.engine_delete_json)(state.engine, input_ptr)
})
}
pub fn drop_table_json(&self, input: &Value) -> Result<Value, String> {
if self.is_space_controller_mode() {
self.log_info("drop_table", None);
let started_at = Instant::now();
let bridge = self.controller_bridge()?;
let space_id = self.controller_space_id();
let binding_id = self.controller_binding_id()?;
let table_name = require_string_field(input, "table_name")?.to_string();
let result = bridge.run(move |client| async move {
client
.drop_lancedb_table(space_id, binding_id, table_name)
.await
})?;
self.log_if_slow("drop_table", started_at, None);
return Ok(json!({ "message": result.message }));
}
if self.is_host_provider_mode() {
return self
.dispatch_host_provider(RuntimeLanceDbProviderAction::DropTable, input)
.map(|result| result.meta);
}
self.call_json_string("drop_table", input, |api, state, input_ptr| unsafe {
(api.engine_drop_table_json)(state.engine, input_ptr)
})
}
/// Shared driver for library calls that take a JSON request string and return
/// a JSON response string.
///
/// Serialises `input`, takes the handle lock, lets `invoke` perform the raw
/// call, then converts and parses the response. `operation` is only used for
/// log lines.
///
/// # Errors
/// Returns a message when serialisation fails, the request contains a NUL byte,
/// the handle lock is unavailable, the library reports an error (also logged as
/// a warning), or the response is not valid JSON.
fn call_json_string<F>(
    &self,
    operation: &str,
    input: &Value,
    invoke: F,
) -> Result<Value, String>
where
    F: Fn(&LoadedLanceDbApi, &SkillHandleState, *const c_char) -> *mut c_char,
{
    let request_cstr = serde_json::to_string(input)
        .map_err(|error| error.to_string())
        .and_then(|text| {
            CString::new(text).map_err(|_| "input json contains interior NUL bytes".to_string())
        })?;
    self.log_info(operation, None);
    let timer = Instant::now();
    let library = self.api_ref();
    let handles = self.lock_handles()?;
    let raw_response = invoke(library, &handles, request_cstr.as_ptr());
    let response_text = match library.take_owned_string(raw_response) {
        Ok(text) => text,
        Err(failure) => {
            // Release the handle lock before logging.
            drop(handles);
            self.log_warning(operation, &failure);
            return Err(failure);
        }
    };
    let parsed: Value = serde_json::from_str(&response_text)
        .map_err(|error| format!("failed to parse LanceDB response JSON: {}", error))?;
    drop(handles);
    self.log_if_slow(operation, timer, None);
    Ok(parsed)
}
fn log_info(&self, operation: &str, extra: Option<String>) {
if self.config.log_level == SkillLanceDbLogLevel::Info {
match extra {
Some(extra) => log_info(format!(
"[LanceDb:info] skill={} db={} op={} {}",
self.skill_name, self.skill_dir_name, operation, extra
)),
None => log_info(format!(
"[LanceDb:info] skill={} db={} op={}",
self.skill_name, self.skill_dir_name, operation
)),
}
}
}
/// Emits a slow-operation log line when slow logging is enabled and the time
/// elapsed since `started_at` has reached the configured threshold.
fn log_if_slow(&self, operation: &str, started_at: Instant, extra: Option<String>) {
    if self.config.slow_log_enabled {
        let elapsed_ms = started_at.elapsed().as_millis() as u64;
        if elapsed_ms >= self.config.slow_log_threshold_ms {
            let line = match extra {
                Some(detail) => format!(
                    "[LanceDb:slow] skill={} db={} op={} elapsed_ms={} {}",
                    self.skill_name, self.skill_dir_name, operation, elapsed_ms, detail
                ),
                None => format!(
                    "[LanceDb:slow] skill={} db={} op={} elapsed_ms={}",
                    self.skill_name, self.skill_dir_name, operation, elapsed_ms
                ),
            };
            log_info(line);
        }
    }
}
/// Emits a warning log line for `operation` when the skill's log level is
/// `Info` or `Warning`.
fn log_warning(&self, operation: &str, message: &str) {
    let enabled = matches!(
        self.config.log_level,
        SkillLanceDbLogLevel::Info | SkillLanceDbLogLevel::Warning
    );
    if !enabled {
        return;
    }
    let line = format!(
        "[LanceDb:warn] skill={} db={} op={} message={}",
        self.skill_name, self.skill_dir_name, operation, message
    );
    log_warn(line);
}
/// True when operations are forwarded to a host-registered callback.
fn is_host_provider_mode(&self) -> bool {
    matches!(self.provider_mode, LanceDbBindingMode::HostCallback)
}
/// True when operations are forwarded through the space-controller bridge.
fn is_space_controller_mode(&self) -> bool {
    matches!(self.provider_mode, LanceDbBindingMode::SpaceController)
}
/// Borrows the loaded dynamic-library API.
///
/// # Panics
/// Panics when no library API is attached to this binding. Callers in this file
/// only reach this after ruling out the host-callback and space-controller modes.
fn api_ref(&self) -> &LoadedLanceDbApi {
    self.api
        .as_deref()
        .expect("LanceDB dynamic-library API missing in host provider mode")
}
fn integration_mode_name(&self) -> &'static str {
match self.provider_mode {
LanceDbBindingMode::DynamicLibrary => "dynamic_library",
LanceDbBindingMode::HostCallback => "host_callback",
LanceDbBindingMode::SpaceController => "space_controller",
}
}
/// Packages `action` and a copy of `input` with this binding's context and
/// hands the request to the host callbacks, using the binding's callback mode.
///
/// # Errors
/// Returns whatever error the host callback dispatch reports.
fn dispatch_host_provider(
    &self,
    action: RuntimeLanceDbProviderAction,
    input: &Value,
) -> Result<crate::host::database::RuntimeLanceDbProviderResult, String> {
    let binding = self.provider_binding.clone();
    let payload = input.clone();
    let request = RuntimeLanceDbProviderRequest {
        action,
        binding,
        input: payload,
    };
    let callbacks = &self.provider_callbacks;
    callbacks.dispatch_lancedb_provider_request(&request, self.callback_mode)
}
/// Locks and returns the dynamic-library handles.
///
/// # Errors
/// Returns a message when this binding has no handles, or when the lock cannot
/// be acquired because it is poisoned.
fn lock_handles(&self) -> Result<std::sync::MutexGuard<'_, SkillHandleState>, String> {
    let Some(handles) = self.handles.as_ref() else {
        return Err(
            "LanceDB dynamic-library handles are unavailable in host provider mode".to_string(),
        );
    };
    match handles.lock() {
        Ok(guard) => Ok(guard),
        Err(_) => Err("failed to acquire LanceDB handle lock".to_string()),
    }
}
/// Borrows the space-controller bridge.
///
/// # Errors
/// Returns a message when this binding has no bridge attached.
fn controller_bridge(&self) -> Result<&Arc<LuaRuntimeSpaceControllerBridge>, String> {
    match self.controller.as_ref() {
        Some(bridge) => Ok(bridge),
        None => Err("LanceDB space-controller bridge is unavailable".to_string()),
    }
}
/// Derives the controller space id from this binding's context.
fn controller_space_id(&self) -> String {
    let binding = &self.provider_binding;
    controller_space_id_for_binding(binding)
}
/// Asks the space-controller bridge for the binding id of this binding's context.
///
/// # Errors
/// Returns a message when this binding has no bridge attached.
fn controller_binding_id(&self) -> Result<String, String> {
    let bridge = self.controller_bridge()?;
    let binding_id = bridge.controller_binding_id_for_binding(&self.provider_binding);
    Ok(binding_id)
}
}
/// Releases the library handles owned by a dynamic-library binding.
///
/// Bindings without handles or without a loaded API have nothing to release.
/// The engine is destroyed before the runtime it was opened on, and each
/// pointer is nulled afterwards.
impl Drop for LanceDbSkillBinding {
    fn drop(&mut self) {
        let Some(handles) = self.handles.as_ref() else {
            return;
        };
        let Some(api) = self.api.as_ref() else {
            return;
        };
        // A poisoned lock only records that some thread panicked while holding it;
        // the raw handles still have to be returned to the library, so recover the
        // guard instead of skipping cleanup and leaking them.
        let mut guard = handles
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        // SAFETY: the handles were created by this library at registration, are
        // destroyed only here, and are nulled so they cannot be destroyed twice.
        unsafe {
            if !guard.engine.is_null() {
                (api.engine_destroy)(guard.engine);
                guard.engine = ptr::null_mut();
            }
            if !guard.runtime.is_null() {
                (api.runtime_destroy)(guard.runtime);
                guard.runtime = ptr::null_mut();
            }
        }
    }
}
/// Registry that creates and caches one `LanceDbSkillBinding` per skill name.
pub struct LanceDbSkillHost {
/// Loaded library API; populated only in dynamic-library mode.
api: Option<Arc<LoadedLanceDbApi>>,
/// Space-controller bridge; populated only in space-controller mode.
controller: Option<Arc<LuaRuntimeSpaceControllerBridge>>,
/// Registered bindings keyed by skill name.
skills: Mutex<HashMap<String, Arc<LanceDbSkillBinding>>>,
/// Host callbacks handed to every binding.
provider_callbacks: Arc<RuntimeDatabaseProviderCallbacks>,
/// Host configuration: provider mode, callback mode, library path and database directory name.
host_options: LuaRuntimeHostOptions,
}
impl LanceDbSkillHost {
/// Creates a host for the provider mode selected in `host_options`.
///
/// * Dynamic-library mode loads the library from `lancedb_library_path`.
/// * Host-callback mode checks that a callback for the configured callback mode
///   is registered.
/// * Space-controller mode creates the controller bridge.
///
/// # Errors
/// Returns a message when the library path is missing or the library fails to
/// load, when the required callback is not registered, or when the bridge
/// cannot be created.
pub fn new(
    host_options: LuaRuntimeHostOptions,
    provider_callbacks: Arc<RuntimeDatabaseProviderCallbacks>,
) -> Result<Self, String> {
    // Each mode populates at most one of the two optional transports.
    let (api, controller) = match host_options.lancedb_provider_mode {
        LuaRuntimeDatabaseProviderMode::DynamicLibrary => {
            let Some(library_path) = host_options.lancedb_library_path.clone() else {
                return Err(
                    "LanceDB dynamic-library mode requires host_options.lancedb_library_path"
                        .to_string(),
                );
            };
            let loaded = LoadedLanceDbApi::load(&library_path)?;
            (Some(Arc::new(loaded)), None)
        }
        LuaRuntimeDatabaseProviderMode::HostCallback => {
            let callback_mode = host_options.lancedb_callback_mode;
            if !provider_callbacks.has_lancedb_provider_callback_for_mode(callback_mode) {
                return Err(format!(
                    "LanceDB host-callback mode is enabled but no {} callback is registered",
                    callback_mode_name(callback_mode)
                ));
            }
            (None, None)
        }
        LuaRuntimeDatabaseProviderMode::SpaceController => {
            let bridge = LuaRuntimeSpaceControllerBridge::new(&host_options, "lancedb")?;
            (None, Some(bridge))
        }
    };
    let skills = Mutex::new(HashMap::new());
    Ok(Self {
        api,
        controller,
        skills,
        provider_callbacks,
        host_options,
    })
}
/// Registers a skill and returns its LanceDB binding, creating the binding on
/// first use and returning the cached one afterwards (keyed by `skill_name`).
///
/// The database path is `<sidecar root>/lancedb/<skill_name>`, where the sidecar
/// root is the configured database directory next to the skills root (the
/// grandparent of `skill_dir`, or its parent when there is no grandparent).
///
/// * Dynamic-library mode creates the directory, then a runtime and an engine.
/// * Space-controller mode attaches the binding and enables LanceDB remotely.
/// * Otherwise the binding forwards to host callbacks.
///
/// # Errors
/// Returns a message when the registry lock is poisoned, the skill directory
/// has no usable name or parent, the database directory cannot be created, the
/// path contains a NUL byte, the library fails to create the runtime or engine,
/// or the controller calls fail.
pub fn register_skill(
    &self,
    root_name: &str,
    skill_name: &str,
    skill_dir: &Path,
    config: SkillLanceDbMeta,
) -> Result<Arc<LanceDbSkillBinding>, String> {
    // The registry lock is held for the whole registration so two callers cannot
    // create handles for the same skill.
    let mut guard = self
        .skills
        .lock()
        .map_err(|_| "failed to acquire LanceDB skill registry lock".to_string())?;
    if let Some(existing) = guard.get(skill_name) {
        return Ok(existing.clone());
    }
    let skill_dir_name = skill_dir
        .file_name()
        .and_then(|name| name.to_str())
        .ok_or_else(|| {
            format!(
                "invalid skill directory name for {}: {}",
                skill_name,
                skill_dir.display()
            )
        })?
        .to_string();
    let skills_root = skill_dir.parent().ok_or_else(|| {
        format!(
            "invalid skill root for {}: {}",
            skill_name,
            skill_dir.display()
        )
    })?;
    let sidecar_root = skills_root
        .parent()
        .unwrap_or(skills_root)
        .join(self.host_options.database_dir_name.as_str());
    let db_path = sidecar_root.join("lancedb").join(skill_name);
    let database_path = db_path.to_string_lossy().to_string();
    // NOTE(review): `root_name` is passed as both the first and third argument;
    // confirm this matches the constructor's parameter list.
    let binding_context = RuntimeDatabaseBindingContext::new(
        root_name,
        skill_name,
        root_name,
        sidecar_root.to_string_lossy().to_string(),
        skill_dir.to_string_lossy().to_string(),
        skill_dir_name.clone(),
        RuntimeDatabaseKind::LanceDb,
        database_path.clone(),
    );
    let (resolved_path, handles, provider_mode, controller) = if let Some(api) =
        self.api.as_ref()
    {
        std::fs::create_dir_all(&db_path).map_err(|error| {
            // The I/O error is reported once; it used to be printed twice.
            format!(
                "failed to create LanceDB directory {}: {}",
                db_path.display(),
                error
            )
        })?;
        let default_path = CString::new(database_path.clone())
            .map_err(|_| "database path contains interior NUL bytes".to_string())?;
        // SAFETY: the function pointer was resolved from the loaded library.
        let mut options = unsafe { (api.runtime_options_default)() };
        options.default_db_path = default_path.as_ptr();
        options.db_root = ptr::null();
        // SAFETY: `default_path` outlives this call, so the pointer in `options` is valid.
        let runtime = unsafe { (api.runtime_create)(options) };
        if runtime.is_null() {
            return Err(api.take_last_error_message());
        }
        // SAFETY: `runtime` was just returned non-null by the library.
        let engine = unsafe { (api.runtime_open_default_engine)(runtime) };
        if engine.is_null() {
            // Capture the failure text before calling back into the library:
            // the destroy call could otherwise reset or replace it.
            let message = api.take_last_error_message();
            // SAFETY: `runtime` is valid and is not used after this point.
            unsafe {
                (api.runtime_destroy)(runtime);
            }
            return Err(message);
        }
        // SAFETY: `runtime` is valid; a null name is what this file passes to
        // query the default database path.
        let resolved_path = unsafe {
            api.take_owned_string((api.runtime_database_path_for_name)(runtime, ptr::null()))
        }
        // Fall back to the computed path, cloning it only when needed.
        .unwrap_or_else(|_| database_path.clone());
        (
            resolved_path,
            Some(Mutex::new(SkillHandleState { runtime, engine })),
            LanceDbBindingMode::DynamicLibrary,
            None,
        )
    } else if matches!(
        self.host_options.lancedb_provider_mode,
        LuaRuntimeDatabaseProviderMode::SpaceController
    ) {
        let controller = self
            .controller
            .as_ref()
            .ok_or_else(|| "LanceDB space-controller bridge is unavailable".to_string())?
            .clone();
        let controller_space_id = controller_space_id_for_binding(&binding_context);
        let controller_binding_id =
            controller.controller_binding_id_for_binding(&binding_context);
        let controller_database_path = database_path.clone();
        controller.attach_binding(&binding_context)?;
        controller.run(move |client| async move {
            client
                .enable_lancedb(ControllerLanceDbEnableRequest {
                    space_id: controller_space_id,
                    binding_id: controller_binding_id,
                    default_db_path: controller_database_path,
                    ..ControllerLanceDbEnableRequest::default()
                })
                .await
        })?;
        (
            database_path.clone(),
            None,
            LanceDbBindingMode::SpaceController,
            Some(controller),
        )
    } else {
        (
            database_path.clone(),
            None,
            LanceDbBindingMode::HostCallback,
            None,
        )
    };
    let binding = Arc::new(LanceDbSkillBinding {
        api: self.api.clone(),
        skill_name: skill_name.to_string(),
        skill_dir_name,
        database_path: resolved_path,
        config,
        provider_mode,
        callback_mode: self.host_options.lancedb_callback_mode,
        handles,
        controller,
        provider_callbacks: self.provider_callbacks.clone(),
        provider_binding: binding_context,
    });
    guard.insert(skill_name.to_string(), binding.clone());
    Ok(binding)
}
/// Looks up the binding registered under `skill_name`, if any.
///
/// # Errors
/// Returns a message when the registry lock is poisoned.
pub fn binding_for_skill(
    &self,
    skill_name: &str,
) -> Result<Option<Arc<LanceDbSkillBinding>>, String> {
    match self.skills.lock() {
        Ok(registry) => Ok(registry.get(skill_name).cloned()),
        Err(_) => Err("LanceDB skill binding registry lock poisoned".to_string()),
    }
}
}
/// Builds the status object reported for a skill that has not enabled LanceDB.
///
/// `skill_name` is reported as an empty string when absent.
pub fn disabled_skill_status_json(skill_name: Option<&str>) -> Value {
    let reported_name = skill_name.unwrap_or_default();
    json!({
        "enabled": false,
        "initialized": false,
        "skill_name": reported_name,
        "integration_mode": "dynamic_library",
        "reason": "current skill has not enabled lancedb"
    })
}
fn callback_mode_name(mode: LuaRuntimeDatabaseCallbackMode) -> &'static str {
match mode {
LuaRuntimeDatabaseCallbackMode::Standard => "standard",
LuaRuntimeDatabaseCallbackMode::Json => "json",
}
}
/// Reads `field_name` from `input` as a string that is not blank.
///
/// # Errors
/// Returns a message when the field is absent, is not a string, or contains
/// only whitespace.
fn require_string_field<'a>(input: &'a Value, field_name: &str) -> Result<&'a str, String> {
    match input.get(field_name).and_then(Value::as_str) {
        Some(text) if !text.trim().is_empty() => Ok(text),
        _ => Err(format!("missing or empty field `{}`", field_name)),
    }
}