use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
use libloading::Library;
use serde_json::{Value, json};
use std::collections::HashMap;
use std::ffi::{CStr, CString, c_char};
use std::path::{Path, PathBuf};
use std::ptr;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use crate::host::controller::{LuaRuntimeSpaceControllerBridge, controller_space_id_for_binding};
use crate::host::database::{
LuaRuntimeDatabaseCallbackMode, LuaRuntimeDatabaseProviderMode, RuntimeDatabaseBindingContext,
RuntimeDatabaseKind, RuntimeDatabaseProviderCallbacks, RuntimeSqliteProviderAction,
RuntimeSqliteProviderRequest,
};
use crate::lua_skill::{SkillSqliteLogLevel, SkillSqliteMeta};
use crate::runtime_logging::{info as log_info, warn as log_warn};
use crate::runtime_options::LuaRuntimeHostOptions;
use vldb_controller_client::{
ControllerSqliteEnableRequest, ControllerSqliteTokenizerMode, ControllerSqliteValue,
};
#[repr(C)]
struct VldbSqliteRuntimeHandle {
_private: [u8; 0],
}
#[repr(C)]
struct VldbSqliteDatabaseHandle {
_private: [u8; 0],
}
#[repr(C)]
struct VldbSqliteTokenizeResultHandle {
_private: [u8; 0],
}
#[repr(C)]
struct VldbSqliteCustomWordListHandle {
_private: [u8; 0],
}
#[repr(C)]
struct VldbSqliteSearchResultHandle {
_private: [u8; 0],
}
#[repr(C)]
struct VldbSqliteExecuteResultHandle {
_private: [u8; 0],
}
#[repr(C)]
struct VldbSqliteQueryJsonResultHandle {
_private: [u8; 0],
}
#[repr(C)]
struct VldbSqliteQueryStreamHandle {
_private: [u8; 0],
}
#[repr(C)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum VldbSqliteStatusCode {
Success = 0,
}
#[repr(C)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum VldbSqliteFfiTokenizerMode {
None = 0,
Jieba = 1,
}
#[repr(C)]
#[derive(Clone, Copy, Debug)]
struct VldbSqliteDictionaryMutationResultPod {
success: u8,
affected_rows: u64,
}
#[repr(C)]
#[derive(Clone, Copy, Debug)]
struct VldbSqliteEnsureFtsIndexResultPod {
success: u8,
tokenizer_mode: u32,
}
#[repr(C)]
#[derive(Clone, Copy, Debug)]
struct VldbSqliteRebuildFtsIndexResultPod {
success: u8,
tokenizer_mode: u32,
reindexed_rows: u64,
}
#[repr(C)]
#[derive(Clone, Copy, Debug)]
struct VldbSqliteFtsMutationResultPod {
success: u8,
affected_rows: u64,
}
#[repr(C)]
#[derive(Clone, Copy, Debug, Default)]
struct VldbSqliteByteView {
data: *const u8,
len: u64,
}
#[repr(C)]
#[derive(Clone, Copy, Debug, Default)]
struct VldbSqliteByteBuffer {
data: *mut u8,
len: u64,
cap: u64,
}
#[repr(C)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum VldbSqliteFfiValueKind {
Null = 0,
Int64 = 1,
Float64 = 2,
String = 3,
Bytes = 4,
Bool = 5,
}
#[repr(C)]
#[derive(Clone, Copy, Debug)]
struct VldbSqliteFfiValue {
kind: VldbSqliteFfiValueKind,
int64_value: i64,
float64_value: f64,
string_value: *const c_char,
bytes_value: VldbSqliteByteView,
bool_value: u8,
}
#[repr(C)]
#[derive(Clone, Copy, Debug)]
struct VldbSqliteFfiValueSlice {
values: *const VldbSqliteFfiValue,
len: u64,
}
type RuntimeCreateDefaultFn = unsafe extern "C" fn() -> *mut VldbSqliteRuntimeHandle;
type RuntimeDestroyFn = unsafe extern "C" fn(*mut VldbSqliteRuntimeHandle);
type RuntimeOpenDatabaseFn = unsafe extern "C" fn(
*mut VldbSqliteRuntimeHandle,
*const c_char,
) -> *mut VldbSqliteDatabaseHandle;
type DatabaseDestroyFn = unsafe extern "C" fn(*mut VldbSqliteDatabaseHandle);
type DatabaseDbPathFn = unsafe extern "C" fn(*mut VldbSqliteDatabaseHandle) -> *mut c_char;
type StringFreeFn = unsafe extern "C" fn(*mut c_char);
type LastErrorMessageFn = unsafe extern "C" fn() -> *const c_char;
type ClearLastErrorFn = unsafe extern "C" fn();
type LibraryInfoJsonFn = unsafe extern "C" fn() -> *mut c_char;
type DatabaseExecuteScriptFn = unsafe extern "C" fn(
*mut VldbSqliteDatabaseHandle,
*const c_char,
*const VldbSqliteFfiValue,
u64,
*const c_char,
) -> *mut VldbSqliteExecuteResultHandle;
type DatabaseExecuteBatchFn = unsafe extern "C" fn(
*mut VldbSqliteDatabaseHandle,
*const c_char,
*const VldbSqliteFfiValueSlice,
u64,
) -> *mut VldbSqliteExecuteResultHandle;
type DatabaseQueryJsonFn = unsafe extern "C" fn(
*mut VldbSqliteDatabaseHandle,
*const c_char,
*const VldbSqliteFfiValue,
u64,
*const c_char,
) -> *mut VldbSqliteQueryJsonResultHandle;
type DatabaseQueryStreamFn = unsafe extern "C" fn(
*mut VldbSqliteDatabaseHandle,
*const c_char,
*const VldbSqliteFfiValue,
u64,
*const c_char,
u64,
) -> *mut VldbSqliteQueryStreamHandle;
type ExecuteResultDestroyFn = unsafe extern "C" fn(*mut VldbSqliteExecuteResultHandle);
type ExecuteResultSuccessFn = unsafe extern "C" fn(*mut VldbSqliteExecuteResultHandle) -> u8;
type ExecuteResultMessageFn =
unsafe extern "C" fn(*mut VldbSqliteExecuteResultHandle) -> *mut c_char;
type ExecuteResultRowsChangedFn = unsafe extern "C" fn(*mut VldbSqliteExecuteResultHandle) -> i64;
type ExecuteResultLastInsertRowIdFn =
unsafe extern "C" fn(*mut VldbSqliteExecuteResultHandle) -> i64;
type ExecuteResultStatementsExecutedFn =
unsafe extern "C" fn(*mut VldbSqliteExecuteResultHandle) -> i64;
type QueryJsonResultDestroyFn = unsafe extern "C" fn(*mut VldbSqliteQueryJsonResultHandle);
type QueryJsonResultJsonDataFn =
unsafe extern "C" fn(*mut VldbSqliteQueryJsonResultHandle) -> *mut c_char;
type QueryJsonResultRowCountFn = unsafe extern "C" fn(*mut VldbSqliteQueryJsonResultHandle) -> u64;
type QueryStreamDestroyFn = unsafe extern "C" fn(*mut VldbSqliteQueryStreamHandle);
type QueryStreamChunkCountFn = unsafe extern "C" fn(*mut VldbSqliteQueryStreamHandle) -> u64;
type QueryStreamRowCountFn = unsafe extern "C" fn(*mut VldbSqliteQueryStreamHandle) -> u64;
type QueryStreamTotalBytesFn = unsafe extern "C" fn(*mut VldbSqliteQueryStreamHandle) -> u64;
type QueryStreamGetChunkFn =
unsafe extern "C" fn(*mut VldbSqliteQueryStreamHandle, u64) -> VldbSqliteByteBuffer;
type BytesFreeFn = unsafe extern "C" fn(VldbSqliteByteBuffer);
type DatabaseTokenizeTextFn = unsafe extern "C" fn(
*mut VldbSqliteDatabaseHandle,
VldbSqliteFfiTokenizerMode,
*const c_char,
u8,
) -> *mut VldbSqliteTokenizeResultHandle;
type TokenizeResultDestroyFn = unsafe extern "C" fn(*mut VldbSqliteTokenizeResultHandle);
type TokenizeResultNormalizedTextFn =
unsafe extern "C" fn(*mut VldbSqliteTokenizeResultHandle) -> *mut c_char;
type TokenizeResultFtsQueryFn =
unsafe extern "C" fn(*mut VldbSqliteTokenizeResultHandle) -> *mut c_char;
type TokenizeResultTokenCountFn = unsafe extern "C" fn(*mut VldbSqliteTokenizeResultHandle) -> u64;
type TokenizeResultGetTokenFn =
unsafe extern "C" fn(*mut VldbSqliteTokenizeResultHandle, u64) -> *mut c_char;
type DatabaseUpsertCustomWordFn = unsafe extern "C" fn(
*mut VldbSqliteDatabaseHandle,
*const c_char,
u64,
*mut VldbSqliteDictionaryMutationResultPod,
) -> i32;
type DatabaseRemoveCustomWordFn = unsafe extern "C" fn(
*mut VldbSqliteDatabaseHandle,
*const c_char,
*mut VldbSqliteDictionaryMutationResultPod,
) -> i32;
type DatabaseListCustomWordsFn =
unsafe extern "C" fn(*mut VldbSqliteDatabaseHandle) -> *mut VldbSqliteCustomWordListHandle;
type CustomWordListDestroyFn = unsafe extern "C" fn(*mut VldbSqliteCustomWordListHandle);
type CustomWordListLenFn = unsafe extern "C" fn(*mut VldbSqliteCustomWordListHandle) -> u64;
type CustomWordListGetWordFn =
unsafe extern "C" fn(*mut VldbSqliteCustomWordListHandle, u64) -> *mut c_char;
type CustomWordListGetWeightFn =
unsafe extern "C" fn(*mut VldbSqliteCustomWordListHandle, u64) -> u64;
type DatabaseEnsureFtsIndexFn = unsafe extern "C" fn(
*mut VldbSqliteDatabaseHandle,
*const c_char,
VldbSqliteFfiTokenizerMode,
*mut VldbSqliteEnsureFtsIndexResultPod,
) -> i32;
type DatabaseRebuildFtsIndexFn = unsafe extern "C" fn(
*mut VldbSqliteDatabaseHandle,
*const c_char,
VldbSqliteFfiTokenizerMode,
*mut VldbSqliteRebuildFtsIndexResultPod,
) -> i32;
type DatabaseUpsertFtsDocumentFn = unsafe extern "C" fn(
*mut VldbSqliteDatabaseHandle,
*const c_char,
VldbSqliteFfiTokenizerMode,
*const c_char,
*const c_char,
*const c_char,
*const c_char,
*mut VldbSqliteFtsMutationResultPod,
) -> i32;
type DatabaseDeleteFtsDocumentFn = unsafe extern "C" fn(
*mut VldbSqliteDatabaseHandle,
*const c_char,
*const c_char,
*mut VldbSqliteFtsMutationResultPod,
) -> i32;
type DatabaseSearchFtsFn = unsafe extern "C" fn(
*mut VldbSqliteDatabaseHandle,
*const c_char,
VldbSqliteFfiTokenizerMode,
*const c_char,
u32,
u32,
) -> *mut VldbSqliteSearchResultHandle;
type SearchResultDestroyFn = unsafe extern "C" fn(*mut VldbSqliteSearchResultHandle);
type SearchResultTotalFn = unsafe extern "C" fn(*mut VldbSqliteSearchResultHandle) -> u64;
type SearchResultLenFn = unsafe extern "C" fn(*mut VldbSqliteSearchResultHandle) -> u64;
type SearchResultSourceFn = unsafe extern "C" fn(*mut VldbSqliteSearchResultHandle) -> *mut c_char;
type SearchResultQueryModeFn =
unsafe extern "C" fn(*mut VldbSqliteSearchResultHandle) -> *mut c_char;
type SearchResultGetIdFn =
unsafe extern "C" fn(*mut VldbSqliteSearchResultHandle, u64) -> *mut c_char;
type SearchResultGetFilePathFn =
unsafe extern "C" fn(*mut VldbSqliteSearchResultHandle, u64) -> *mut c_char;
type SearchResultGetTitleFn =
unsafe extern "C" fn(*mut VldbSqliteSearchResultHandle, u64) -> *mut c_char;
type SearchResultGetTitleHighlightFn =
unsafe extern "C" fn(*mut VldbSqliteSearchResultHandle, u64) -> *mut c_char;
type SearchResultGetContentSnippetFn =
unsafe extern "C" fn(*mut VldbSqliteSearchResultHandle, u64) -> *mut c_char;
type SearchResultGetScoreFn = unsafe extern "C" fn(*mut VldbSqliteSearchResultHandle, u64) -> f64;
type SearchResultGetRankFn = unsafe extern "C" fn(*mut VldbSqliteSearchResultHandle, u64) -> u64;
type SearchResultGetRawScoreFn =
unsafe extern "C" fn(*mut VldbSqliteSearchResultHandle, u64) -> f64;
struct LoadedSqliteApi {
_library: Library,
library_path: PathBuf,
runtime_create_default: RuntimeCreateDefaultFn,
runtime_destroy: RuntimeDestroyFn,
runtime_open_database: RuntimeOpenDatabaseFn,
database_destroy: DatabaseDestroyFn,
database_db_path: DatabaseDbPathFn,
string_free: StringFreeFn,
last_error_message: LastErrorMessageFn,
clear_last_error: ClearLastErrorFn,
library_info_json: LibraryInfoJsonFn,
database_execute_script: DatabaseExecuteScriptFn,
database_execute_batch: DatabaseExecuteBatchFn,
database_query_json: DatabaseQueryJsonFn,
database_query_stream: DatabaseQueryStreamFn,
execute_result_destroy: ExecuteResultDestroyFn,
execute_result_success: ExecuteResultSuccessFn,
execute_result_message: ExecuteResultMessageFn,
execute_result_rows_changed: ExecuteResultRowsChangedFn,
execute_result_last_insert_rowid: ExecuteResultLastInsertRowIdFn,
execute_result_statements_executed: ExecuteResultStatementsExecutedFn,
query_json_result_destroy: QueryJsonResultDestroyFn,
query_json_result_json_data: QueryJsonResultJsonDataFn,
query_json_result_row_count: QueryJsonResultRowCountFn,
query_stream_destroy: QueryStreamDestroyFn,
query_stream_chunk_count: QueryStreamChunkCountFn,
query_stream_row_count: QueryStreamRowCountFn,
query_stream_total_bytes: QueryStreamTotalBytesFn,
query_stream_get_chunk: QueryStreamGetChunkFn,
bytes_free: BytesFreeFn,
database_tokenize_text: DatabaseTokenizeTextFn,
tokenize_result_destroy: TokenizeResultDestroyFn,
tokenize_result_normalized_text: TokenizeResultNormalizedTextFn,
tokenize_result_fts_query: TokenizeResultFtsQueryFn,
tokenize_result_token_count: TokenizeResultTokenCountFn,
tokenize_result_get_token: TokenizeResultGetTokenFn,
database_upsert_custom_word: DatabaseUpsertCustomWordFn,
database_remove_custom_word: DatabaseRemoveCustomWordFn,
database_list_custom_words: DatabaseListCustomWordsFn,
custom_word_list_destroy: CustomWordListDestroyFn,
custom_word_list_len: CustomWordListLenFn,
custom_word_list_get_word: CustomWordListGetWordFn,
custom_word_list_get_weight: CustomWordListGetWeightFn,
database_ensure_fts_index: DatabaseEnsureFtsIndexFn,
database_rebuild_fts_index: DatabaseRebuildFtsIndexFn,
database_upsert_fts_document: DatabaseUpsertFtsDocumentFn,
database_delete_fts_document: DatabaseDeleteFtsDocumentFn,
database_search_fts: DatabaseSearchFtsFn,
search_result_destroy: SearchResultDestroyFn,
search_result_total: SearchResultTotalFn,
search_result_len: SearchResultLenFn,
search_result_source: SearchResultSourceFn,
search_result_query_mode: SearchResultQueryModeFn,
search_result_get_id: SearchResultGetIdFn,
search_result_get_file_path: SearchResultGetFilePathFn,
search_result_get_title: SearchResultGetTitleFn,
search_result_get_title_highlight: SearchResultGetTitleHighlightFn,
search_result_get_content_snippet: SearchResultGetContentSnippetFn,
search_result_get_score: SearchResultGetScoreFn,
search_result_get_rank: SearchResultGetRankFn,
search_result_get_raw_score: SearchResultGetRawScoreFn,
}
unsafe impl Send for LoadedSqliteApi {}
unsafe impl Sync for LoadedSqliteApi {}
impl LoadedSqliteApi {
fn load(library_path: &Path) -> Result<Self, String> {
if !library_path.exists() {
return Err(format!(
"SQLite dynamic library path does not exist: {}",
library_path.display()
));
}
let library = unsafe { Library::new(library_path) }.map_err(|error| {
format!(
"failed to load {}: {}: {}",
library_path.display(),
error,
error
)
})?;
unsafe { Self::from_library(library_path.to_path_buf(), library) }
}
unsafe fn from_library(library_path: PathBuf, library: Library) -> Result<Self, String> {
macro_rules! load_symbol {
($name:literal, $ty:ty) => {{
unsafe {
*library
.get::<$ty>(concat!($name, "\0").as_bytes())
.map_err(|error| {
format!(
"failed to load symbol {} from {}: {}",
$name,
library_path.display(),
error
)
})?
}
}};
}
Ok(Self {
runtime_create_default: load_symbol!(
"vldb_sqlite_runtime_create_default",
RuntimeCreateDefaultFn
),
runtime_destroy: load_symbol!("vldb_sqlite_runtime_destroy", RuntimeDestroyFn),
runtime_open_database: load_symbol!(
"vldb_sqlite_runtime_open_database",
RuntimeOpenDatabaseFn
),
database_destroy: load_symbol!("vldb_sqlite_database_destroy", DatabaseDestroyFn),
database_db_path: load_symbol!("vldb_sqlite_database_db_path", DatabaseDbPathFn),
string_free: load_symbol!("vldb_sqlite_string_free", StringFreeFn),
last_error_message: load_symbol!("vldb_sqlite_last_error_message", LastErrorMessageFn),
clear_last_error: load_symbol!("vldb_sqlite_clear_last_error", ClearLastErrorFn),
library_info_json: load_symbol!("vldb_sqlite_library_info_json", LibraryInfoJsonFn),
database_execute_script: load_symbol!(
"vldb_sqlite_database_execute_script",
DatabaseExecuteScriptFn
),
database_execute_batch: load_symbol!(
"vldb_sqlite_database_execute_batch",
DatabaseExecuteBatchFn
),
database_query_json: load_symbol!(
"vldb_sqlite_database_query_json",
DatabaseQueryJsonFn
),
database_query_stream: load_symbol!(
"vldb_sqlite_database_query_stream",
DatabaseQueryStreamFn
),
execute_result_destroy: load_symbol!(
"vldb_sqlite_execute_result_destroy",
ExecuteResultDestroyFn
),
execute_result_success: load_symbol!(
"vldb_sqlite_execute_result_success",
ExecuteResultSuccessFn
),
execute_result_message: load_symbol!(
"vldb_sqlite_execute_result_message",
ExecuteResultMessageFn
),
execute_result_rows_changed: load_symbol!(
"vldb_sqlite_execute_result_rows_changed",
ExecuteResultRowsChangedFn
),
execute_result_last_insert_rowid: load_symbol!(
"vldb_sqlite_execute_result_last_insert_rowid",
ExecuteResultLastInsertRowIdFn
),
execute_result_statements_executed: load_symbol!(
"vldb_sqlite_execute_result_statements_executed",
ExecuteResultStatementsExecutedFn
),
query_json_result_destroy: load_symbol!(
"vldb_sqlite_query_json_result_destroy",
QueryJsonResultDestroyFn
),
query_json_result_json_data: load_symbol!(
"vldb_sqlite_query_json_result_json_data",
QueryJsonResultJsonDataFn
),
query_json_result_row_count: load_symbol!(
"vldb_sqlite_query_json_result_row_count",
QueryJsonResultRowCountFn
),
query_stream_destroy: load_symbol!(
"vldb_sqlite_query_stream_destroy",
QueryStreamDestroyFn
),
query_stream_chunk_count: load_symbol!(
"vldb_sqlite_query_stream_chunk_count",
QueryStreamChunkCountFn
),
query_stream_row_count: load_symbol!(
"vldb_sqlite_query_stream_row_count",
QueryStreamRowCountFn
),
query_stream_total_bytes: load_symbol!(
"vldb_sqlite_query_stream_total_bytes",
QueryStreamTotalBytesFn
),
query_stream_get_chunk: load_symbol!(
"vldb_sqlite_query_stream_get_chunk",
QueryStreamGetChunkFn
),
bytes_free: load_symbol!("vldb_sqlite_bytes_free", BytesFreeFn),
database_tokenize_text: load_symbol!(
"vldb_sqlite_database_tokenize_text",
DatabaseTokenizeTextFn
),
tokenize_result_destroy: load_symbol!(
"vldb_sqlite_tokenize_result_destroy",
TokenizeResultDestroyFn
),
tokenize_result_normalized_text: load_symbol!(
"vldb_sqlite_tokenize_result_normalized_text",
TokenizeResultNormalizedTextFn
),
tokenize_result_fts_query: load_symbol!(
"vldb_sqlite_tokenize_result_fts_query",
TokenizeResultFtsQueryFn
),
tokenize_result_token_count: load_symbol!(
"vldb_sqlite_tokenize_result_token_count",
TokenizeResultTokenCountFn
),
tokenize_result_get_token: load_symbol!(
"vldb_sqlite_tokenize_result_get_token",
TokenizeResultGetTokenFn
),
database_upsert_custom_word: load_symbol!(
"vldb_sqlite_database_upsert_custom_word",
DatabaseUpsertCustomWordFn
),
database_remove_custom_word: load_symbol!(
"vldb_sqlite_database_remove_custom_word",
DatabaseRemoveCustomWordFn
),
database_list_custom_words: load_symbol!(
"vldb_sqlite_database_list_custom_words",
DatabaseListCustomWordsFn
),
custom_word_list_destroy: load_symbol!(
"vldb_sqlite_custom_word_list_destroy",
CustomWordListDestroyFn
),
custom_word_list_len: load_symbol!(
"vldb_sqlite_custom_word_list_len",
CustomWordListLenFn
),
custom_word_list_get_word: load_symbol!(
"vldb_sqlite_custom_word_list_get_word",
CustomWordListGetWordFn
),
custom_word_list_get_weight: load_symbol!(
"vldb_sqlite_custom_word_list_get_weight",
CustomWordListGetWeightFn
),
database_ensure_fts_index: load_symbol!(
"vldb_sqlite_database_ensure_fts_index",
DatabaseEnsureFtsIndexFn
),
database_rebuild_fts_index: load_symbol!(
"vldb_sqlite_database_rebuild_fts_index",
DatabaseRebuildFtsIndexFn
),
database_upsert_fts_document: load_symbol!(
"vldb_sqlite_database_upsert_fts_document",
DatabaseUpsertFtsDocumentFn
),
database_delete_fts_document: load_symbol!(
"vldb_sqlite_database_delete_fts_document",
DatabaseDeleteFtsDocumentFn
),
database_search_fts: load_symbol!(
"vldb_sqlite_database_search_fts",
DatabaseSearchFtsFn
),
search_result_destroy: load_symbol!(
"vldb_sqlite_search_result_destroy",
SearchResultDestroyFn
),
search_result_total: load_symbol!(
"vldb_sqlite_search_result_total",
SearchResultTotalFn
),
search_result_len: load_symbol!("vldb_sqlite_search_result_len", SearchResultLenFn),
search_result_source: load_symbol!(
"vldb_sqlite_search_result_source",
SearchResultSourceFn
),
search_result_query_mode: load_symbol!(
"vldb_sqlite_search_result_query_mode",
SearchResultQueryModeFn
),
search_result_get_id: load_symbol!(
"vldb_sqlite_search_result_get_id",
SearchResultGetIdFn
),
search_result_get_file_path: load_symbol!(
"vldb_sqlite_search_result_get_file_path",
SearchResultGetFilePathFn
),
search_result_get_title: load_symbol!(
"vldb_sqlite_search_result_get_title",
SearchResultGetTitleFn
),
search_result_get_title_highlight: load_symbol!(
"vldb_sqlite_search_result_get_title_highlight",
SearchResultGetTitleHighlightFn
),
search_result_get_content_snippet: load_symbol!(
"vldb_sqlite_search_result_get_content_snippet",
SearchResultGetContentSnippetFn
),
search_result_get_score: load_symbol!(
"vldb_sqlite_search_result_get_score",
SearchResultGetScoreFn
),
search_result_get_rank: load_symbol!(
"vldb_sqlite_search_result_get_rank",
SearchResultGetRankFn
),
search_result_get_raw_score: load_symbol!(
"vldb_sqlite_search_result_get_raw_score",
SearchResultGetRawScoreFn
),
_library: library,
library_path,
})
}
fn take_last_error_message(&self) -> String {
unsafe {
let ptr = (self.last_error_message)();
let text = if ptr.is_null() {
"unknown SQLite host error".to_string()
} else {
CStr::from_ptr(ptr).to_string_lossy().to_string()
};
(self.clear_last_error)();
text
}
}
fn take_owned_string(&self, ptr: *mut c_char) -> Result<String, String> {
if ptr.is_null() {
return Err(self.take_last_error_message());
}
unsafe {
let text = CStr::from_ptr(ptr).to_string_lossy().to_string();
(self.string_free)(ptr);
Ok(text)
}
}
fn take_optional_string(&self, ptr: *mut c_char) -> Option<String> {
if ptr.is_null() {
return None;
}
unsafe {
let text = CStr::from_ptr(ptr).to_string_lossy().to_string();
(self.string_free)(ptr);
Some(text)
}
}
fn call_json_noarg(
&self,
function: LibraryInfoJsonFn,
operation: &str,
) -> Result<Value, String> {
unsafe {
let response_ptr = function();
let response_text = self.take_owned_string(response_ptr)?;
serde_json::from_str(&response_text)
.map_err(|error| format!("{} returned invalid JSON: {}", operation, error))
}
}
fn take_chunk_bytes(&self, buffer: VldbSqliteByteBuffer) -> Result<Vec<u8>, String> {
if buffer.data.is_null() {
if buffer.len == 0 {
return Ok(Vec::new());
}
return Err(self.take_last_error_message());
}
let len =
usize::try_from(buffer.len).map_err(|_| "chunk length exceeds usize".to_string())?;
unsafe {
let bytes = std::slice::from_raw_parts(buffer.data, len).to_vec();
(self.bytes_free)(buffer);
Ok(bytes)
}
}
}
struct SkillHandleState {
runtime: *mut VldbSqliteRuntimeHandle,
database: *mut VldbSqliteDatabaseHandle,
query_streams: HashMap<u64, *mut VldbSqliteQueryStreamHandle>,
next_stream_id: u64,
}
#[derive(Clone, Copy, PartialEq, Eq)]
enum SqliteBindingMode {
DynamicLibrary,
HostCallback,
SpaceController,
}
unsafe impl Send for SkillHandleState {}
pub struct SqliteSkillBinding {
api: Option<Arc<LoadedSqliteApi>>,
skill_name: String,
skill_dir_name: String,
database_path: String,
config: SkillSqliteMeta,
provider_mode: SqliteBindingMode,
callback_mode: LuaRuntimeDatabaseCallbackMode,
handles: Option<Mutex<SkillHandleState>>,
controller: Option<Arc<LuaRuntimeSpaceControllerBridge>>,
provider_callbacks: Arc<RuntimeDatabaseProviderCallbacks>,
provider_binding: RuntimeDatabaseBindingContext,
}
impl SqliteSkillBinding {
pub fn status_json(&self) -> Value {
let integration_mode = self.integration_mode_name();
let controller_capabilities = if self.is_space_controller_mode() {
vec![
"execute_script",
"execute_batch",
"query_json",
"tokenize_text",
"upsert_custom_word",
"remove_custom_word",
"list_custom_words",
"ensure_fts_index",
"rebuild_fts_index",
"upsert_fts_document",
"delete_fts_document",
"search_fts",
]
} else {
vec![
"execute_script",
"execute_batch",
"query_json",
"query_stream",
"query_stream_wait_metrics",
"query_stream_chunk",
"query_stream_close",
"tokenize_text",
"upsert_custom_word",
"remove_custom_word",
"list_custom_words",
"ensure_fts_index",
"rebuild_fts_index",
"upsert_fts_document",
"delete_fts_document",
"search_fts",
]
};
let library_info = if let Some(api) = self.api.as_ref() {
api.call_json_noarg(api.library_info_json, "library_info_json")
.unwrap_or_else(|error| {
json!({
"name": "vldb-sqlite",
"version": "unknown",
"ffi_stage": "unknown",
"capabilities": [],
"warning": error,
})
})
} else {
let provider_name = match self.provider_mode {
SqliteBindingMode::HostCallback => "host_callback",
SqliteBindingMode::SpaceController => "space_controller",
SqliteBindingMode::DynamicLibrary => "vldb-sqlite",
};
let provider_version = match self.provider_mode {
SqliteBindingMode::SpaceController => "controller_managed",
_ => "host_managed",
};
json!({
"name": provider_name,
"version": provider_version,
"ffi_stage": provider_version,
"capabilities": controller_capabilities,
})
};
json!({
"enabled": true,
"initialized": true,
"skill_name": self.skill_name,
"skill_dir_name": self.skill_dir_name,
"database_path": self.database_path,
"integration_mode": integration_mode,
"library_path": self.api.as_ref().map(|api| api.library_path.to_string_lossy().to_string()).unwrap_or_default(),
"library_name": library_info.get("name").cloned().unwrap_or(Value::String("vldb-sqlite".to_string())),
"library_version": library_info.get("version").cloned().unwrap_or(Value::String("unknown".to_string())),
"ffi_stage": library_info.get("ffi_stage").cloned().unwrap_or(Value::String("unknown".to_string())),
"capabilities": library_info.get("capabilities").cloned().unwrap_or_else(|| Value::Array(Vec::new())),
"space_label": self.provider_binding.space_label,
"root_name": self.provider_binding.root_name,
"binding_tag": self.provider_binding.binding_tag,
"space_root": self.provider_binding.space_root,
"default_database_path": self.provider_binding.default_database_path,
"log_level": self.config.log_level.as_str(),
"slow_log_enabled": self.config.slow_log_enabled,
"slow_log_threshold_ms": self.config.slow_log_threshold_ms,
})
}
pub fn info_json(&self) -> Value {
let mut status = self.status_json();
if let Some(status_object) = status.as_object_mut() {
let library_info = if let Some(api) = self.api.as_ref() {
api.call_json_noarg(api.library_info_json, "library_info_json")
.unwrap_or_else(|error| {
json!({
"name": "vldb-sqlite",
"version": "unknown",
"ffi_stage": "unknown",
"capabilities": [],
"warning": error,
})
})
} else {
json!({
"name": "host_callback",
"version": "host_managed",
"ffi_stage": "host_managed",
})
};
status_object.insert("library_info".to_string(), library_info);
}
status
}
pub fn execute_script(&self, input: &Value) -> Result<Value, String> {
if self.is_space_controller_mode() {
let sql = require_string_field(input, "sql")?;
let params = parse_single_sql_params(input)?;
self.log_info("execute_script", None);
let started_at = Instant::now();
let bridge = self.controller_bridge()?;
let space_id = self.controller_space_id();
let binding_id = self.controller_binding_id()?;
let sql_text = sql.to_string();
let mapped_params = map_controller_sqlite_params(¶ms);
let result = bridge.run(move |client| async move {
client
.execute_sqlite_script_typed(space_id, binding_id, sql_text, mapped_params)
.await
})?;
self.log_if_slow("execute_script", started_at, None);
return Ok(json!({
"success": result.success,
"message": result.message,
"rows_changed": result.rows_changed,
"last_insert_rowid": result.last_insert_rowid,
"statements_executed": 1,
}));
}
if self.is_host_provider_mode() {
return self.dispatch_host_provider(RuntimeSqliteProviderAction::ExecuteScript, input);
}
let api = self.api_ref();
let sql = require_string_field(input, "sql")?;
let params = parse_single_sql_params(input)?;
let owned_params = build_owned_ffi_values(¶ms)?;
self.log_info("execute_script", None);
let started_at = Instant::now();
let guard = self.lock_handles()?;
let sql_cstr = to_cstring(sql, "sql")?;
unsafe {
let result_handle = (api.database_execute_script)(
guard.database,
sql_cstr.as_ptr(),
if owned_params.values.is_empty() {
ptr::null()
} else {
owned_params.as_ptr()
},
owned_params.len_u64(),
ptr::null(),
);
if result_handle.is_null() {
drop(guard);
let error = api.take_last_error_message();
self.log_warning("execute_script", &error);
return Err(error);
}
let result = json!({
"success": u8_to_bool((api.execute_result_success)(result_handle)),
"message": api.take_optional_string((api.execute_result_message)(result_handle)).unwrap_or_default(),
"rows_changed": (api.execute_result_rows_changed)(result_handle),
"last_insert_rowid": (api.execute_result_last_insert_rowid)(result_handle),
"statements_executed": (api.execute_result_statements_executed)(result_handle),
});
(api.execute_result_destroy)(result_handle);
drop(guard);
self.log_if_slow("execute_script", started_at, None);
Ok(result)
}
}
pub fn execute_batch(&self, input: &Value) -> Result<Value, String> {
if self.is_space_controller_mode() {
let sql = require_string_field(input, "sql")?;
let rows = parse_batch_sql_params(input)?;
self.log_info("execute_batch", None);
let started_at = Instant::now();
let bridge = self.controller_bridge()?;
let space_id = self.controller_space_id();
let binding_id = self.controller_binding_id()?;
let sql_text = sql.to_string();
let mapped_rows = rows
.iter()
.map(|row| map_controller_sqlite_params(row))
.collect();
let result = bridge.run(move |client| async move {
client
.execute_sqlite_batch_typed(space_id, binding_id, sql_text, mapped_rows)
.await
})?;
self.log_if_slow("execute_batch", started_at, None);
return Ok(json!({
"success": result.success,
"message": result.message,
"rows_changed": result.rows_changed,
"last_insert_rowid": result.last_insert_rowid,
"statements_executed": result.statements_executed,
}));
}
if self.is_host_provider_mode() {
return self.dispatch_host_provider(RuntimeSqliteProviderAction::ExecuteBatch, input);
}
let api = self.api_ref();
let sql = require_string_field(input, "sql")?;
let rows = parse_batch_sql_params(input)?;
let owned_rows = build_owned_ffi_value_matrix(&rows)?;
self.log_info("execute_batch", None);
let started_at = Instant::now();
let guard = self.lock_handles()?;
let sql_cstr = to_cstring(sql, "sql")?;
unsafe {
let result_handle = (api.database_execute_batch)(
guard.database,
sql_cstr.as_ptr(),
owned_rows.as_ptr(),
owned_rows.len_u64(),
);
if result_handle.is_null() {
drop(guard);
let error = api.take_last_error_message();
self.log_warning("execute_batch", &error);
return Err(error);
}
let result = json!({
"success": u8_to_bool((api.execute_result_success)(result_handle)),
"message": api.take_optional_string((api.execute_result_message)(result_handle)).unwrap_or_default(),
"rows_changed": (api.execute_result_rows_changed)(result_handle),
"last_insert_rowid": (api.execute_result_last_insert_rowid)(result_handle),
"statements_executed": (api.execute_result_statements_executed)(result_handle),
});
(api.execute_result_destroy)(result_handle);
drop(guard);
self.log_if_slow("execute_batch", started_at, None);
Ok(result)
}
}
pub fn query_json(&self, input: &Value) -> Result<Value, String> {
if self.is_space_controller_mode() {
let sql = require_string_field(input, "sql")?;
let params = parse_single_sql_params(input)?;
self.log_info("query_json", None);
let started_at = Instant::now();
let bridge = self.controller_bridge()?;
let space_id = self.controller_space_id();
let binding_id = self.controller_binding_id()?;
let sql_text = sql.to_string();
let mapped_params = map_controller_sqlite_params(¶ms);
let result = bridge.run(move |client| async move {
client
.query_sqlite_json_typed(space_id, binding_id, sql_text, mapped_params)
.await
})?;
let rows = serde_json::from_str::<Value>(&result.json_data).map_err(|error| {
format!("space_controller returned invalid json_data: {}", error)
})?;
self.log_if_slow(
"query_json",
started_at,
Some(format!("rows={}", result.row_count)),
);
return Ok(json!({
"success": true,
"row_count": result.row_count,
"json_data": result.json_data,
"rows": rows,
}));
}
if self.is_host_provider_mode() {
return self.dispatch_host_provider(RuntimeSqliteProviderAction::QueryJson, input);
}
let api = self.api_ref();
let sql = require_string_field(input, "sql")?;
let params = parse_single_sql_params(input)?;
let owned_params = build_owned_ffi_values(¶ms)?;
self.log_info("query_json", None);
let started_at = Instant::now();
let guard = self.lock_handles()?;
let sql_cstr = to_cstring(sql, "sql")?;
unsafe {
let result_handle = (api.database_query_json)(
guard.database,
sql_cstr.as_ptr(),
if owned_params.values.is_empty() {
ptr::null()
} else {
owned_params.as_ptr()
},
owned_params.len_u64(),
ptr::null(),
);
if result_handle.is_null() {
drop(guard);
let error = api.take_last_error_message();
self.log_warning("query_json", &error);
return Err(error);
}
let row_count = (api.query_json_result_row_count)(result_handle);
let json_data =
api.take_owned_string((api.query_json_result_json_data)(result_handle))?;
let rows = serde_json::from_str::<Value>(&json_data)
.map_err(|error| format!("query_json returned invalid json_data: {}", error))?;
(api.query_json_result_destroy)(result_handle);
drop(guard);
self.log_if_slow(
"query_json",
started_at,
Some(format!("rows={}", row_count)),
);
Ok(json!({
"success": true,
"row_count": row_count,
"json_data": json_data,
"rows": rows,
}))
}
}
pub fn query_stream(&self, input: &Value) -> Result<Value, String> {
if self.is_space_controller_mode() {
let sql = require_string_field(input, "sql")?;
let params = parse_single_sql_params(input)?;
let chunk_bytes = input
.get("chunk_bytes")
.or_else(|| input.get("chunk_size"))
.and_then(Value::as_u64);
self.log_info("query_stream", None);
let started_at = Instant::now();
let bridge = self.controller_bridge()?;
let space_id = self.controller_space_id();
let binding_id = self.controller_binding_id()?;
let sql_text = sql.to_string();
let mapped_params = map_controller_sqlite_params(¶ms);
let result = bridge.run(move |client| async move {
client
.open_sqlite_query_stream_typed(
space_id,
binding_id,
sql_text,
mapped_params,
chunk_bytes,
)
.await
})?;
self.log_if_slow(
"query_stream",
started_at,
Some(format!(
"stream_id={} metrics_ready={}",
result.stream_id, result.metrics_ready
)),
);
return Ok(json!({
"success": true,
"stream_id": result.stream_id,
"metrics_ready": result.metrics_ready,
}));
}
if self.is_host_provider_mode() {
return self.dispatch_host_provider(RuntimeSqliteProviderAction::QueryStream, input);
}
let api = self.api_ref();
let sql = require_string_field(input, "sql")?;
let params = parse_single_sql_params(input)?;
let owned_params = build_owned_ffi_values(¶ms)?;
let chunk_bytes = input
.get("chunk_bytes")
.or_else(|| input.get("chunk_size"))
.and_then(Value::as_u64)
.unwrap_or(0);
self.log_info("query_stream", None);
let started_at = Instant::now();
let mut guard = self.lock_handles()?;
let sql_cstr = to_cstring(sql, "sql")?;
unsafe {
let result_handle = (api.database_query_stream)(
guard.database,
sql_cstr.as_ptr(),
if owned_params.values.is_empty() {
ptr::null()
} else {
owned_params.as_ptr()
},
owned_params.len_u64(),
ptr::null(),
chunk_bytes,
);
if result_handle.is_null() {
drop(guard);
let error = api.take_last_error_message();
self.log_warning("query_stream", &error);
return Err(error);
}
let stream_id = guard.next_stream_id;
guard.next_stream_id = guard.next_stream_id.saturating_add(1).max(1);
guard.query_streams.insert(stream_id, result_handle);
drop(guard);
self.log_if_slow(
"query_stream",
started_at,
Some(format!("stream_id={} metrics_ready=false", stream_id)),
);
Ok(json!({
"success": true,
"stream_id": stream_id,
"metrics_ready": false,
}))
}
}
pub fn query_stream_wait_metrics(&self, input: &Value) -> Result<Value, String> {
if self.is_space_controller_mode() {
let stream_id = input
.get("stream_id")
.and_then(Value::as_u64)
.ok_or_else(|| "stream_id is required".to_string())?;
self.log_info("query_stream_wait_metrics", None);
let started_at = Instant::now();
let bridge = self.controller_bridge()?;
let metrics = bridge.run(move |client| async move {
client.wait_sqlite_query_stream_metrics(stream_id).await
})?;
self.log_if_slow(
"query_stream_wait_metrics",
started_at,
Some(format!(
"stream_id={} chunks={} rows={} bytes={}",
stream_id, metrics.chunk_count, metrics.row_count, metrics.total_bytes
)),
);
return Ok(json!({
"success": true,
"stream_id": stream_id,
"metrics_ready": true,
"row_count": metrics.row_count,
"chunk_count": metrics.chunk_count,
"total_bytes": metrics.total_bytes,
}));
}
if self.is_host_provider_mode() {
return self.dispatch_host_provider(
RuntimeSqliteProviderAction::QueryStreamWaitMetrics,
input,
);
}
let api = self.api_ref();
let stream_id = input
.get("stream_id")
.and_then(Value::as_u64)
.ok_or_else(|| "stream_id is required".to_string())?;
self.log_info("query_stream_wait_metrics", None);
let started_at = Instant::now();
let guard = self.lock_handles()?;
let stream_handle = *guard
.query_streams
.get(&stream_id)
.ok_or_else(|| format!("query stream handle not found: {}", stream_id))?;
unsafe {
let row_count = (api.query_stream_row_count)(stream_handle);
let chunk_count = (api.query_stream_chunk_count)(stream_handle);
let total_bytes = (api.query_stream_total_bytes)(stream_handle);
drop(guard);
self.log_if_slow(
"query_stream_wait_metrics",
started_at,
Some(format!(
"stream_id={} chunks={} rows={} bytes={}",
stream_id, chunk_count, row_count, total_bytes
)),
);
Ok(json!({
"success": true,
"stream_id": stream_id,
"metrics_ready": true,
"row_count": row_count,
"chunk_count": chunk_count,
"total_bytes": total_bytes,
}))
}
}
pub fn query_stream_chunk(&self, input: &Value) -> Result<Value, String> {
if self.is_space_controller_mode() {
let stream_id = input
.get("stream_id")
.and_then(Value::as_u64)
.ok_or_else(|| "stream_id is required".to_string())?;
let index = input
.get("index")
.and_then(Value::as_u64)
.ok_or_else(|| "index is required".to_string())?;
self.log_info("query_stream_chunk", None);
let started_at = Instant::now();
let bridge = self.controller_bridge()?;
let chunk = bridge.run(move |client| async move {
client
.read_sqlite_query_stream_chunk(stream_id, index)
.await
})?;
self.log_if_slow(
"query_stream_chunk",
started_at,
Some(format!(
"stream_id={} index={} bytes={}",
stream_id,
index,
chunk.len()
)),
);
return Ok(json!({
"success": true,
"stream_id": stream_id,
"index": index,
"byte_count": u64::try_from(chunk.len()).unwrap_or(u64::MAX),
"chunk_base64": BASE64_STANDARD.encode(chunk),
}));
}
if self.is_host_provider_mode() {
return self
.dispatch_host_provider(RuntimeSqliteProviderAction::QueryStreamChunk, input);
}
let api = self.api_ref();
let stream_id = input
.get("stream_id")
.and_then(Value::as_u64)
.ok_or_else(|| "stream_id is required".to_string())?;
let index = input
.get("index")
.and_then(Value::as_u64)
.ok_or_else(|| "index is required".to_string())?;
self.log_info("query_stream_chunk", None);
let started_at = Instant::now();
let guard = self.lock_handles()?;
let stream_handle = *guard
.query_streams
.get(&stream_id)
.ok_or_else(|| format!("query stream handle not found: {}", stream_id))?;
unsafe {
let buffer = (api.query_stream_get_chunk)(stream_handle, index);
let chunk = api.take_chunk_bytes(buffer)?;
drop(guard);
self.log_if_slow(
"query_stream_chunk",
started_at,
Some(format!(
"stream_id={} index={} bytes={}",
stream_id,
index,
chunk.len()
)),
);
Ok(json!({
"success": true,
"stream_id": stream_id,
"index": index,
"byte_count": u64::try_from(chunk.len()).unwrap_or(u64::MAX),
"chunk_base64": BASE64_STANDARD.encode(chunk),
}))
}
}
pub fn query_stream_close(&self, input: &Value) -> Result<Value, String> {
if self.is_space_controller_mode() {
let stream_id = input
.get("stream_id")
.and_then(Value::as_u64)
.ok_or_else(|| "stream_id is required".to_string())?;
self.log_info("query_stream_close", None);
let started_at = Instant::now();
let bridge = self.controller_bridge()?;
let closed = bridge.run(move |client| async move {
client.close_sqlite_query_stream(stream_id).await
})?;
self.log_if_slow(
"query_stream_close",
started_at,
Some(format!("stream_id={}", stream_id)),
);
return Ok(json!({
"success": closed,
"stream_id": stream_id,
"message": if closed {
format!("query_stream handle {} closed successfully", stream_id)
} else {
format!("query_stream handle {} was not found or was already closed", stream_id)
},
}));
}
if self.is_host_provider_mode() {
return self
.dispatch_host_provider(RuntimeSqliteProviderAction::QueryStreamClose, input);
}
let api = self.api_ref();
let stream_id = input
.get("stream_id")
.and_then(Value::as_u64)
.ok_or_else(|| "stream_id is required".to_string())?;
self.log_info("query_stream_close", None);
let started_at = Instant::now();
let mut guard = self.lock_handles()?;
let stream_handle = guard
.query_streams
.remove(&stream_id)
.ok_or_else(|| format!("query stream handle not found: {}", stream_id))?;
unsafe {
(api.query_stream_destroy)(stream_handle);
drop(guard);
self.log_if_slow(
"query_stream_close",
started_at,
Some(format!("stream_id={}", stream_id)),
);
Ok(json!({
"success": true,
"stream_id": stream_id,
"message": format!("query_stream handle {} closed successfully", stream_id),
}))
}
}
pub fn tokenize_text_json(&self, input: &Value) -> Result<Value, String> {
if self.is_space_controller_mode() {
let tokenizer_mode = map_controller_tokenizer_mode(parse_tokenizer_mode(
input
.get("tokenizer_mode")
.or_else(|| input.get("mode"))
.and_then(Value::as_str)
.unwrap_or("none"),
)?);
let text = require_string_field(input, "text")?;
let search_mode = input
.get("search_mode")
.and_then(Value::as_bool)
.unwrap_or(false);
self.log_info(
"tokenize_text",
Some(format!(
"tokenizer_mode={} search_mode={}",
controller_tokenizer_mode_name(tokenizer_mode),
search_mode
)),
);
let started_at = Instant::now();
let bridge = self.controller_bridge()?;
let space_id = self.controller_space_id();
let binding_id = self.controller_binding_id()?;
let text_value = text.to_string();
let result = bridge.run(move |client| async move {
client
.tokenize_sqlite_text(
space_id,
binding_id,
tokenizer_mode,
text_value,
search_mode,
)
.await
})?;
self.log_if_slow("tokenize_text", started_at, None);
return Ok(json!({
"success": true,
"tokenizer_mode": result.tokenizer_mode,
"normalized_text": result.normalized_text,
"fts_query": result.fts_query,
"tokens": result.tokens,
}));
}
if self.is_host_provider_mode() {
return self.dispatch_host_provider(RuntimeSqliteProviderAction::TokenizeText, input);
}
let api = self.api_ref();
let tokenizer_mode = parse_tokenizer_mode(
input
.get("tokenizer_mode")
.or_else(|| input.get("mode"))
.and_then(Value::as_str)
.unwrap_or("none"),
)?;
let text = require_string_field(input, "text")?;
let search_mode = input
.get("search_mode")
.and_then(Value::as_bool)
.unwrap_or(false);
self.log_info(
"tokenize_text",
Some(format!(
"tokenizer_mode={} search_mode={}",
tokenizer_mode_name(tokenizer_mode),
search_mode
)),
);
let started_at = Instant::now();
let guard = self.lock_handles()?;
let text_cstr = to_cstring(text, "text")?;
unsafe {
let handle = (api.database_tokenize_text)(
guard.database,
tokenizer_mode,
text_cstr.as_ptr(),
bool_to_u8(search_mode),
);
if handle.is_null() {
drop(guard);
let error = api.take_last_error_message();
self.log_warning("tokenize_text", &error);
return Err(error);
}
let normalized_text =
api.take_owned_string((api.tokenize_result_normalized_text)(handle))?;
let fts_query = api.take_owned_string((api.tokenize_result_fts_query)(handle))?;
let token_count = (api.tokenize_result_token_count)(handle);
let mut tokens = Vec::with_capacity(token_count as usize);
for index in 0..token_count {
if let Some(token) =
api.take_optional_string((api.tokenize_result_get_token)(handle, index))
{
tokens.push(Value::String(token));
}
}
(api.tokenize_result_destroy)(handle);
drop(guard);
self.log_if_slow("tokenize_text", started_at, None);
Ok(json!({
"success": true,
"tokenizer_mode": tokenizer_mode_name(tokenizer_mode),
"normalized_text": normalized_text,
"fts_query": fts_query,
"tokens": tokens,
}))
}
}
pub fn upsert_custom_word_json(&self, input: &Value) -> Result<Value, String> {
if self.is_space_controller_mode() {
let word = require_string_field(input, "word")?;
let weight = input.get("weight").and_then(Value::as_u64).unwrap_or(1);
self.log_info("upsert_custom_word", Some(format!("word={}", word)));
let started_at = Instant::now();
let bridge = self.controller_bridge()?;
let space_id = self.controller_space_id();
let binding_id = self.controller_binding_id()?;
let word_value = word.to_string();
let weight_value = u32::try_from(weight).unwrap_or(u32::MAX);
let result = bridge.run(move |client| async move {
client
.upsert_sqlite_custom_word(space_id, binding_id, word_value, weight_value)
.await
})?;
self.log_if_slow("upsert_custom_word", started_at, None);
return Ok(json!({
"success": result.success,
"message": result.message,
"affected_rows": result.affected_rows,
"word": word,
"weight": weight,
}));
}
if self.is_host_provider_mode() {
return self
.dispatch_host_provider(RuntimeSqliteProviderAction::UpsertCustomWord, input);
}
let api = self.api_ref();
let word = require_string_field(input, "word")?;
let weight = input.get("weight").and_then(Value::as_u64).unwrap_or(1);
self.log_info("upsert_custom_word", Some(format!("word={}", word)));
let started_at = Instant::now();
let guard = self.lock_handles()?;
let word_cstr = to_cstring(word, "word")?;
let mut result = VldbSqliteDictionaryMutationResultPod {
success: 0,
affected_rows: 0,
};
let status = unsafe {
(api.database_upsert_custom_word)(
guard.database,
word_cstr.as_ptr(),
weight,
&mut result,
)
};
drop(guard);
self.log_if_slow("upsert_custom_word", started_at, None);
ensure_status(api, status, "upsert_custom_word")?;
Ok(json!({
"success": u8_to_bool(result.success),
"affected_rows": result.affected_rows,
"word": word,
"weight": weight,
}))
}
pub fn remove_custom_word_json(&self, input: &Value) -> Result<Value, String> {
if self.is_space_controller_mode() {
let word = require_string_field(input, "word")?;
self.log_info("remove_custom_word", Some(format!("word={}", word)));
let started_at = Instant::now();
let bridge = self.controller_bridge()?;
let space_id = self.controller_space_id();
let binding_id = self.controller_binding_id()?;
let word_value = word.to_string();
let result = bridge.run(move |client| async move {
client
.remove_sqlite_custom_word(space_id, binding_id, word_value)
.await
})?;
self.log_if_slow("remove_custom_word", started_at, None);
return Ok(json!({
"success": result.success,
"message": result.message,
"affected_rows": result.affected_rows,
"word": word,
}));
}
if self.is_host_provider_mode() {
return self
.dispatch_host_provider(RuntimeSqliteProviderAction::RemoveCustomWord, input);
}
let api = self.api_ref();
let word = require_string_field(input, "word")?;
self.log_info("remove_custom_word", Some(format!("word={}", word)));
let started_at = Instant::now();
let guard = self.lock_handles()?;
let word_cstr = to_cstring(word, "word")?;
let mut result = VldbSqliteDictionaryMutationResultPod {
success: 0,
affected_rows: 0,
};
let status = unsafe {
(api.database_remove_custom_word)(guard.database, word_cstr.as_ptr(), &mut result)
};
drop(guard);
self.log_if_slow("remove_custom_word", started_at, None);
ensure_status(api, status, "remove_custom_word")?;
Ok(json!({
"success": u8_to_bool(result.success),
"affected_rows": result.affected_rows,
"word": word,
}))
}
pub fn list_custom_words_json(&self) -> Result<Value, String> {
if self.is_space_controller_mode() {
self.log_info("list_custom_words", None);
let started_at = Instant::now();
let bridge = self.controller_bridge()?;
let space_id = self.controller_space_id();
let binding_id = self.controller_binding_id()?;
let result = bridge.run(move |client| async move {
client.list_sqlite_custom_words(space_id, binding_id).await
})?;
self.log_if_slow(
"list_custom_words",
started_at,
Some(format!("count={}", result.words.len())),
);
return Ok(json!({
"success": result.success,
"message": result.message,
"total": result.words.len(),
"words": result.words.into_iter().map(|entry| json!({
"word": entry.word,
"weight": entry.weight,
})).collect::<Vec<_>>(),
}));
}
if self.is_host_provider_mode() {
return self.dispatch_host_provider(
RuntimeSqliteProviderAction::ListCustomWords,
&Value::Object(Default::default()),
);
}
let api = self.api_ref();
self.log_info("list_custom_words", None);
let started_at = Instant::now();
let guard = self.lock_handles()?;
unsafe {
let list_handle = (api.database_list_custom_words)(guard.database);
if list_handle.is_null() {
drop(guard);
let error = api.take_last_error_message();
self.log_warning("list_custom_words", &error);
return Err(error);
}
let len = (api.custom_word_list_len)(list_handle);
let mut words = Vec::with_capacity(len as usize);
for index in 0..len {
let word = api
.take_optional_string((api.custom_word_list_get_word)(list_handle, index))
.unwrap_or_default();
let weight = (api.custom_word_list_get_weight)(list_handle, index);
words.push(json!({
"word": word,
"weight": weight,
}));
}
(api.custom_word_list_destroy)(list_handle);
drop(guard);
self.log_if_slow(
"list_custom_words",
started_at,
Some(format!("count={}", len)),
);
Ok(json!({
"success": true,
"total": len,
"words": words,
}))
}
}
pub fn ensure_fts_index_json(&self, input: &Value) -> Result<Value, String> {
if self.is_space_controller_mode() {
let index_name = require_string_field(input, "index_name")?;
let tokenizer_mode = map_controller_tokenizer_mode(parse_tokenizer_mode(
input
.get("tokenizer_mode")
.or_else(|| input.get("mode"))
.and_then(Value::as_str)
.unwrap_or("none"),
)?);
self.log_info(
"ensure_fts_index",
Some(format!("index_name={}", index_name)),
);
let started_at = Instant::now();
let bridge = self.controller_bridge()?;
let space_id = self.controller_space_id();
let binding_id = self.controller_binding_id()?;
let index_name_value = index_name.to_string();
let result = bridge.run(move |client| async move {
client
.ensure_sqlite_fts_index(space_id, binding_id, index_name_value, tokenizer_mode)
.await
})?;
self.log_if_slow("ensure_fts_index", started_at, None);
return Ok(json!({
"success": result.success,
"message": result.message,
"index_name": result.index_name,
"tokenizer_mode": result.tokenizer_mode,
}));
}
if self.is_host_provider_mode() {
return self.dispatch_host_provider(RuntimeSqliteProviderAction::EnsureFtsIndex, input);
}
let api = self.api_ref();
let index_name = require_string_field(input, "index_name")?;
let tokenizer_mode = parse_tokenizer_mode(
input
.get("tokenizer_mode")
.or_else(|| input.get("mode"))
.and_then(Value::as_str)
.unwrap_or("none"),
)?;
self.log_info(
"ensure_fts_index",
Some(format!("index_name={}", index_name)),
);
let started_at = Instant::now();
let guard = self.lock_handles()?;
let index_cstr = to_cstring(index_name, "index_name")?;
let mut result = VldbSqliteEnsureFtsIndexResultPod {
success: 0,
tokenizer_mode: tokenizer_mode as u32,
};
let status = unsafe {
(api.database_ensure_fts_index)(
guard.database,
index_cstr.as_ptr(),
tokenizer_mode,
&mut result,
)
};
drop(guard);
self.log_if_slow("ensure_fts_index", started_at, None);
ensure_status(api, status, "ensure_fts_index")?;
Ok(json!({
"success": u8_to_bool(result.success),
"index_name": index_name,
"tokenizer_mode": tokenizer_mode_name_from_u32(result.tokenizer_mode),
}))
}
pub fn rebuild_fts_index_json(&self, input: &Value) -> Result<Value, String> {
if self.is_space_controller_mode() {
let index_name = require_string_field(input, "index_name")?;
let tokenizer_mode = map_controller_tokenizer_mode(parse_tokenizer_mode(
input
.get("tokenizer_mode")
.or_else(|| input.get("mode"))
.and_then(Value::as_str)
.unwrap_or("none"),
)?);
self.log_info(
"rebuild_fts_index",
Some(format!("index_name={}", index_name)),
);
let started_at = Instant::now();
let bridge = self.controller_bridge()?;
let space_id = self.controller_space_id();
let binding_id = self.controller_binding_id()?;
let index_name_value = index_name.to_string();
let result = bridge.run(move |client| async move {
client
.rebuild_sqlite_fts_index(
space_id,
binding_id,
index_name_value,
tokenizer_mode,
)
.await
})?;
self.log_if_slow("rebuild_fts_index", started_at, None);
return Ok(json!({
"success": result.success,
"message": result.message,
"index_name": result.index_name,
"tokenizer_mode": result.tokenizer_mode,
"reindexed_rows": result.reindexed_rows,
}));
}
if self.is_host_provider_mode() {
return self
.dispatch_host_provider(RuntimeSqliteProviderAction::RebuildFtsIndex, input);
}
let api = self.api_ref();
let index_name = require_string_field(input, "index_name")?;
let tokenizer_mode = parse_tokenizer_mode(
input
.get("tokenizer_mode")
.or_else(|| input.get("mode"))
.and_then(Value::as_str)
.unwrap_or("none"),
)?;
self.log_info(
"rebuild_fts_index",
Some(format!("index_name={}", index_name)),
);
let started_at = Instant::now();
let guard = self.lock_handles()?;
let index_cstr = to_cstring(index_name, "index_name")?;
let mut result = VldbSqliteRebuildFtsIndexResultPod {
success: 0,
tokenizer_mode: tokenizer_mode as u32,
reindexed_rows: 0,
};
let status = unsafe {
(api.database_rebuild_fts_index)(
guard.database,
index_cstr.as_ptr(),
tokenizer_mode,
&mut result,
)
};
drop(guard);
self.log_if_slow("rebuild_fts_index", started_at, None);
ensure_status(api, status, "rebuild_fts_index")?;
Ok(json!({
"success": u8_to_bool(result.success),
"index_name": index_name,
"tokenizer_mode": tokenizer_mode_name_from_u32(result.tokenizer_mode),
"reindexed_rows": result.reindexed_rows,
}))
}
pub fn upsert_fts_document_json(&self, input: &Value) -> Result<Value, String> {
if self.is_space_controller_mode() {
let index_name = require_string_field(input, "index_name")?;
let tokenizer_mode = map_controller_tokenizer_mode(parse_tokenizer_mode(
input
.get("tokenizer_mode")
.or_else(|| input.get("mode"))
.and_then(Value::as_str)
.unwrap_or("none"),
)?);
let id = require_string_field(input, "id")?;
let file_path = require_string_field(input, "file_path")?;
let title = input
.get("title")
.and_then(Value::as_str)
.unwrap_or_default();
let content = input
.get("content")
.and_then(Value::as_str)
.unwrap_or_default();
self.log_info(
"upsert_fts_document",
Some(format!("index_name={} id={}", index_name, id)),
);
let started_at = Instant::now();
let bridge = self.controller_bridge()?;
let space_id = self.controller_space_id();
let binding_id = self.controller_binding_id()?;
let index_name_value = index_name.to_string();
let id_value = id.to_string();
let file_path_value = file_path.to_string();
let title_value = title.to_string();
let content_value = content.to_string();
let result = bridge.run(move |client| async move {
client
.upsert_sqlite_fts_document(
space_id,
binding_id,
index_name_value,
tokenizer_mode,
id_value,
file_path_value,
title_value,
content_value,
)
.await
})?;
self.log_if_slow("upsert_fts_document", started_at, None);
return Ok(json!({
"success": result.success,
"message": result.message,
"affected_rows": result.affected_rows,
"index_name": result.index_name,
"id": id,
}));
}
if self.is_host_provider_mode() {
return self
.dispatch_host_provider(RuntimeSqliteProviderAction::UpsertFtsDocument, input);
}
let api = self.api_ref();
let index_name = require_string_field(input, "index_name")?;
let tokenizer_mode = parse_tokenizer_mode(
input
.get("tokenizer_mode")
.or_else(|| input.get("mode"))
.and_then(Value::as_str)
.unwrap_or("none"),
)?;
let id = require_string_field(input, "id")?;
let file_path = require_string_field(input, "file_path")?;
let title = input.get("title").and_then(Value::as_str).unwrap_or("");
let content = input.get("content").and_then(Value::as_str).unwrap_or("");
self.log_info(
"upsert_fts_document",
Some(format!("index_name={} id={}", index_name, id)),
);
let started_at = Instant::now();
let guard = self.lock_handles()?;
let index_cstr = to_cstring(index_name, "index_name")?;
let id_cstr = to_cstring(id, "id")?;
let file_path_cstr = to_cstring(file_path, "file_path")?;
let title_cstr = to_cstring(title, "title")?;
let content_cstr = to_cstring(content, "content")?;
let mut result = VldbSqliteFtsMutationResultPod {
success: 0,
affected_rows: 0,
};
let status = unsafe {
(api.database_upsert_fts_document)(
guard.database,
index_cstr.as_ptr(),
tokenizer_mode,
id_cstr.as_ptr(),
file_path_cstr.as_ptr(),
title_cstr.as_ptr(),
content_cstr.as_ptr(),
&mut result,
)
};
drop(guard);
self.log_if_slow("upsert_fts_document", started_at, None);
ensure_status(api, status, "upsert_fts_document")?;
Ok(json!({
"success": u8_to_bool(result.success),
"affected_rows": result.affected_rows,
"index_name": index_name,
"id": id,
}))
}
pub fn delete_fts_document_json(&self, input: &Value) -> Result<Value, String> {
if self.is_space_controller_mode() {
let index_name = require_string_field(input, "index_name")?;
let id = require_string_field(input, "id")?;
self.log_info(
"delete_fts_document",
Some(format!("index_name={} id={}", index_name, id)),
);
let started_at = Instant::now();
let bridge = self.controller_bridge()?;
let space_id = self.controller_space_id();
let binding_id = self.controller_binding_id()?;
let index_name_value = index_name.to_string();
let id_value = id.to_string();
let result = bridge.run(move |client| async move {
client
.delete_sqlite_fts_document(space_id, binding_id, index_name_value, id_value)
.await
})?;
self.log_if_slow("delete_fts_document", started_at, None);
return Ok(json!({
"success": result.success,
"message": result.message,
"affected_rows": result.affected_rows,
"index_name": result.index_name,
"id": id,
}));
}
if self.is_host_provider_mode() {
return self
.dispatch_host_provider(RuntimeSqliteProviderAction::DeleteFtsDocument, input);
}
let api = self.api_ref();
let index_name = require_string_field(input, "index_name")?;
let id = require_string_field(input, "id")?;
self.log_info(
"delete_fts_document",
Some(format!("index_name={} id={}", index_name, id)),
);
let started_at = Instant::now();
let guard = self.lock_handles()?;
let index_cstr = to_cstring(index_name, "index_name")?;
let id_cstr = to_cstring(id, "id")?;
let mut result = VldbSqliteFtsMutationResultPod {
success: 0,
affected_rows: 0,
};
let status = unsafe {
(api.database_delete_fts_document)(
guard.database,
index_cstr.as_ptr(),
id_cstr.as_ptr(),
&mut result,
)
};
drop(guard);
self.log_if_slow("delete_fts_document", started_at, None);
ensure_status(api, status, "delete_fts_document")?;
Ok(json!({
"success": u8_to_bool(result.success),
"affected_rows": result.affected_rows,
"index_name": index_name,
"id": id,
}))
}
pub fn search_fts_json(&self, input: &Value) -> Result<Value, String> {
if self.is_space_controller_mode() {
let index_name = require_string_field(input, "index_name")?;
let tokenizer_mode = map_controller_tokenizer_mode(parse_tokenizer_mode(
input
.get("tokenizer_mode")
.or_else(|| input.get("mode"))
.and_then(Value::as_str)
.unwrap_or("none"),
)?);
let query = require_string_field(input, "query")?;
let limit = input.get("limit").and_then(Value::as_u64).unwrap_or(10);
let offset = input.get("offset").and_then(Value::as_u64).unwrap_or(0);
self.log_info(
"search_fts",
Some(format!(
"index_name={} tokenizer_mode={} limit={} offset={}",
index_name,
controller_tokenizer_mode_name(tokenizer_mode),
limit,
offset
)),
);
let started_at = Instant::now();
let bridge = self.controller_bridge()?;
let space_id = self.controller_space_id();
let binding_id = self.controller_binding_id()?;
let index_name_value = index_name.to_string();
let query_value = query.to_string();
let limit_value = u32::try_from(limit).unwrap_or(u32::MAX);
let offset_value = u32::try_from(offset).unwrap_or(u32::MAX);
let result = bridge.run(move |client| async move {
client
.search_sqlite_fts(
space_id,
binding_id,
index_name_value,
tokenizer_mode,
query_value,
limit_value,
offset_value,
)
.await
})?;
self.log_if_slow(
"search_fts",
started_at,
Some(format!("hits={}", result.total)),
);
return Ok(json!({
"success": result.success,
"message": result.message,
"index_name": result.index_name,
"tokenizer_mode": result.tokenizer_mode,
"normalized_query": result.normalized_query,
"fts_query": result.fts_query,
"source": result.source,
"query_mode": result.query_mode,
"total": result.total,
"hits": result.hits.into_iter().map(|hit| json!({
"id": hit.id,
"file_path": hit.file_path,
"title": hit.title,
"title_highlight": hit.title_highlight,
"content_snippet": hit.content_snippet,
"score": hit.score,
"rank": hit.rank,
"raw_score": hit.raw_score,
})).collect::<Vec<_>>(),
}));
}
if self.is_host_provider_mode() {
return self.dispatch_host_provider(RuntimeSqliteProviderAction::SearchFts, input);
}
let api = self.api_ref();
let index_name = require_string_field(input, "index_name")?;
let tokenizer_mode = parse_tokenizer_mode(
input
.get("tokenizer_mode")
.or_else(|| input.get("mode"))
.and_then(Value::as_str)
.unwrap_or("none"),
)?;
let query = require_string_field(input, "query")?;
let limit = input.get("limit").and_then(Value::as_u64).unwrap_or(10) as u32;
let offset = input.get("offset").and_then(Value::as_u64).unwrap_or(0) as u32;
self.log_info(
"search_fts",
Some(format!(
"index_name={} tokenizer_mode={} limit={} offset={}",
index_name,
tokenizer_mode_name(tokenizer_mode),
limit,
offset
)),
);
let started_at = Instant::now();
let guard = self.lock_handles()?;
let index_cstr = to_cstring(index_name, "index_name")?;
let query_cstr = to_cstring(query, "query")?;
unsafe {
let result_handle = (api.database_search_fts)(
guard.database,
index_cstr.as_ptr(),
tokenizer_mode,
query_cstr.as_ptr(),
limit,
offset,
);
if result_handle.is_null() {
drop(guard);
let error = api.take_last_error_message();
self.log_warning("search_fts", &error);
return Err(error);
}
let total = (api.search_result_total)(result_handle);
let len = (api.search_result_len)(result_handle);
let source = api
.take_optional_string((api.search_result_source)(result_handle))
.unwrap_or_else(|| "sqlite_fts".to_string());
let query_mode = api
.take_optional_string((api.search_result_query_mode)(result_handle))
.unwrap_or_else(|| "fts".to_string());
let mut hits = Vec::with_capacity(len as usize);
for index in 0..len {
hits.push(json!({
"id": api.take_optional_string((api.search_result_get_id)(result_handle, index)).unwrap_or_default(),
"file_path": api.take_optional_string((api.search_result_get_file_path)(result_handle, index)).unwrap_or_default(),
"title": api.take_optional_string((api.search_result_get_title)(result_handle, index)).unwrap_or_default(),
"title_highlight": api.take_optional_string((api.search_result_get_title_highlight)(result_handle, index)).unwrap_or_default(),
"content_snippet": api.take_optional_string((api.search_result_get_content_snippet)(result_handle, index)).unwrap_or_default(),
"score": (api.search_result_get_score)(result_handle, index),
"rank": (api.search_result_get_rank)(result_handle, index),
"raw_score": (api.search_result_get_raw_score)(result_handle, index),
}));
}
(api.search_result_destroy)(result_handle);
drop(guard);
self.log_if_slow("search_fts", started_at, Some(format!("hits={}", len)));
Ok(json!({
"success": true,
"index_name": index_name,
"tokenizer_mode": tokenizer_mode_name(tokenizer_mode),
"source": source,
"query_mode": query_mode,
"total": total,
"hits": hits,
}))
}
}
fn log_info(&self, operation: &str, extra: Option<String>) {
if self.config.log_level == SkillSqliteLogLevel::Info {
match extra {
Some(extra) => log_info(format!(
"[Sqlite:info] skill={} db={} op={} {}",
self.skill_name, self.skill_dir_name, operation, extra
)),
None => log_info(format!(
"[Sqlite:info] skill={} db={} op={}",
self.skill_name, self.skill_dir_name, operation
)),
}
}
}
fn log_if_slow(&self, operation: &str, started_at: Instant, extra: Option<String>) {
if !self.config.slow_log_enabled {
return;
}
let elapsed_ms = started_at.elapsed().as_millis() as u64;
if elapsed_ms < self.config.slow_log_threshold_ms {
return;
}
match extra {
Some(extra) => log_info(format!(
"[Sqlite:slow] skill={} db={} op={} elapsed_ms={} {}",
self.skill_name, self.skill_dir_name, operation, elapsed_ms, extra
)),
None => log_info(format!(
"[Sqlite:slow] skill={} db={} op={} elapsed_ms={}",
self.skill_name, self.skill_dir_name, operation, elapsed_ms
)),
}
}
fn log_warning(&self, operation: &str, message: &str) {
if matches!(
self.config.log_level,
SkillSqliteLogLevel::Info | SkillSqliteLogLevel::Warning
) {
log_warn(format!(
"[Sqlite:warn] skill={} db={} op={} message={}",
self.skill_name, self.skill_dir_name, operation, message
));
}
}
fn lock_handles(&self) -> Result<std::sync::MutexGuard<'_, SkillHandleState>, String> {
self.handles
.as_ref()
.ok_or_else(|| {
"SQLite dynamic-library handles are unavailable in host provider mode".to_string()
})?
.lock()
.map_err(|_| "failed to acquire SQLite handle lock".to_string())
}
fn is_host_provider_mode(&self) -> bool {
self.provider_mode == SqliteBindingMode::HostCallback
}
fn is_space_controller_mode(&self) -> bool {
self.provider_mode == SqliteBindingMode::SpaceController
}
fn api_ref(&self) -> &LoadedSqliteApi {
self.api
.as_ref()
.expect("SQLite dynamic-library API missing in host provider mode")
}
fn integration_mode_name(&self) -> &'static str {
match self.provider_mode {
SqliteBindingMode::DynamicLibrary => "dynamic_library",
SqliteBindingMode::HostCallback => "host_callback",
SqliteBindingMode::SpaceController => "space_controller",
}
}
fn dispatch_host_provider(
&self,
action: RuntimeSqliteProviderAction,
input: &Value,
) -> Result<Value, String> {
let request = RuntimeSqliteProviderRequest {
action,
binding: self.provider_binding.clone(),
input: input.clone(),
};
self.provider_callbacks
.dispatch_sqlite_provider_request(&request, self.host_callback_mode())
}
fn host_callback_mode(&self) -> LuaRuntimeDatabaseCallbackMode {
self.callback_mode
}
fn controller_bridge(&self) -> Result<&Arc<LuaRuntimeSpaceControllerBridge>, String> {
self.controller
.as_ref()
.ok_or_else(|| "SQLite space-controller bridge is unavailable".to_string())
}
fn controller_space_id(&self) -> String {
controller_space_id_for_binding(&self.provider_binding)
}
fn controller_binding_id(&self) -> Result<String, String> {
Ok(self
.controller_bridge()?
.controller_binding_id_for_binding(&self.provider_binding))
}
}
impl Drop for SqliteSkillBinding {
fn drop(&mut self) {
let Some(handles) = self.handles.as_ref() else {
return;
};
let Some(api) = self.api.as_ref() else {
return;
};
if let Ok(mut guard) = handles.lock() {
unsafe {
for (_, stream_handle) in guard.query_streams.drain() {
if !stream_handle.is_null() {
(api.query_stream_destroy)(stream_handle);
}
}
if !guard.database.is_null() {
(api.database_destroy)(guard.database);
guard.database = ptr::null_mut();
}
if !guard.runtime.is_null() {
(api.runtime_destroy)(guard.runtime);
guard.runtime = ptr::null_mut();
}
}
}
}
}
pub struct SqliteSkillHost {
api: Option<Arc<LoadedSqliteApi>>,
controller: Option<Arc<LuaRuntimeSpaceControllerBridge>>,
skills: Mutex<HashMap<String, Arc<SqliteSkillBinding>>>,
provider_callbacks: Arc<RuntimeDatabaseProviderCallbacks>,
host_options: LuaRuntimeHostOptions,
}
impl SqliteSkillHost {
pub fn new(
host_options: LuaRuntimeHostOptions,
provider_callbacks: Arc<RuntimeDatabaseProviderCallbacks>,
) -> Result<Self, String> {
let api = match host_options.sqlite_provider_mode {
LuaRuntimeDatabaseProviderMode::DynamicLibrary => {
let library_path = host_options.sqlite_library_path.clone().ok_or_else(|| {
"SQLite dynamic-library mode requires host_options.sqlite_library_path"
.to_string()
})?;
Some(Arc::new(LoadedSqliteApi::load(&library_path)?))
}
LuaRuntimeDatabaseProviderMode::HostCallback => {
if !provider_callbacks
.has_sqlite_provider_callback_for_mode(host_options.sqlite_callback_mode)
{
return Err(format!(
"SQLite host-callback mode is enabled but no {} callback is registered",
callback_mode_name(host_options.sqlite_callback_mode)
));
}
None
}
LuaRuntimeDatabaseProviderMode::SpaceController => None,
};
let controller = match host_options.sqlite_provider_mode {
LuaRuntimeDatabaseProviderMode::SpaceController => Some(
LuaRuntimeSpaceControllerBridge::new(&host_options, "sqlite")?,
),
_ => None,
};
Ok(Self {
api,
controller,
skills: Mutex::new(HashMap::new()),
provider_callbacks,
host_options,
})
}
pub fn register_skill(
&self,
root_name: &str,
skill_name: &str,
skill_dir: &Path,
config: SkillSqliteMeta,
) -> Result<Arc<SqliteSkillBinding>, String> {
let mut guard = self
.skills
.lock()
.map_err(|_| "failed to acquire SQLite skill registry lock".to_string())?;
if let Some(existing) = guard.get(skill_name) {
return Ok(existing.clone());
}
let skill_dir_name = skill_dir
.file_name()
.and_then(|name| name.to_str())
.ok_or_else(|| {
format!(
"invalid skill directory name for {}: {}",
skill_name,
skill_dir.display()
)
})?
.to_string();
let skills_root = skill_dir.parent().ok_or_else(|| {
format!(
"invalid skill root for {}: {}",
skill_name,
skill_dir.display()
)
})?;
let sidecar_root = skills_root
.parent()
.unwrap_or(skills_root)
.join(self.host_options.database_dir_name.as_str());
let db_dir = sidecar_root.join("sqlite").join(skill_name);
let db_path = db_dir.join(format!("{}.sqlite3", skill_name));
let database_path = db_path.to_string_lossy().to_string();
let binding_context = RuntimeDatabaseBindingContext::new(
root_name,
skill_name,
root_name,
sidecar_root.to_string_lossy().to_string(),
skill_dir.to_string_lossy().to_string(),
skill_dir_name.clone(),
RuntimeDatabaseKind::Sqlite,
database_path.clone(),
);
let (resolved_path, handles, provider_mode, controller) = if let Some(api) =
self.api.as_ref()
{
std::fs::create_dir_all(&db_dir).map_err(|error| {
format!(
"failed to create SQLite directory {}: {}: {}",
db_dir.display(),
error,
error
)
})?;
let database_cstr = CString::new(database_path.clone())
.map_err(|_| "database path contains interior NUL bytes".to_string())?;
let runtime = unsafe { (api.runtime_create_default)() };
if runtime.is_null() {
return Err(api.take_last_error_message());
}
let database = unsafe { (api.runtime_open_database)(runtime, database_cstr.as_ptr()) };
if database.is_null() {
unsafe {
(api.runtime_destroy)(runtime);
}
return Err(api.take_last_error_message());
}
let resolved_path = unsafe { api.take_owned_string((api.database_db_path)(database)) }
.unwrap_or(database_path.clone());
(
resolved_path,
Some(Mutex::new(SkillHandleState {
runtime,
database,
query_streams: HashMap::new(),
next_stream_id: 1,
})),
SqliteBindingMode::DynamicLibrary,
None,
)
} else if matches!(
self.host_options.sqlite_provider_mode,
LuaRuntimeDatabaseProviderMode::SpaceController
) {
let controller = self
.controller
.as_ref()
.ok_or_else(|| "SQLite space-controller bridge is unavailable".to_string())?
.clone();
let controller_space_id = controller_space_id_for_binding(&binding_context);
let controller_binding_id =
controller.controller_binding_id_for_binding(&binding_context);
let controller_database_path = database_path.clone();
controller.attach_binding(&binding_context)?;
controller.run(move |client| async move {
client
.enable_sqlite(ControllerSqliteEnableRequest {
space_id: controller_space_id,
binding_id: controller_binding_id,
db_path: controller_database_path,
enforce_db_file_lock: false,
..ControllerSqliteEnableRequest::default()
})
.await
})?;
(
database_path.clone(),
None,
SqliteBindingMode::SpaceController,
Some(controller),
)
} else {
(
database_path.clone(),
None,
SqliteBindingMode::HostCallback,
None,
)
};
let binding = Arc::new(SqliteSkillBinding {
api: self.api.clone(),
skill_name: skill_name.to_string(),
skill_dir_name,
database_path: resolved_path,
config,
provider_mode,
callback_mode: self.host_options.sqlite_callback_mode,
handles,
controller,
provider_callbacks: self.provider_callbacks.clone(),
provider_binding: binding_context,
});
guard.insert(skill_name.to_string(), binding.clone());
Ok(binding)
}
pub fn binding_for_skill(
&self,
skill_name: &str,
) -> Result<Option<Arc<SqliteSkillBinding>>, String> {
let skills = self
.skills
.lock()
.map_err(|_| "SQLite skill binding registry lock poisoned".to_string())?;
Ok(skills.get(skill_name).cloned())
}
}
pub fn disabled_skill_status_json(skill_name: Option<&str>) -> Value {
json!({
"enabled": false,
"initialized": false,
"skill_name": skill_name.unwrap_or(""),
"integration_mode": "dynamic_library",
"reason": "current skill has not enabled sqlite"
})
}
fn parse_tokenizer_mode(text: &str) -> Result<VldbSqliteFfiTokenizerMode, String> {
match text.trim().to_ascii_lowercase().as_str() {
"" | "none" => Ok(VldbSqliteFfiTokenizerMode::None),
"jieba" => Ok(VldbSqliteFfiTokenizerMode::Jieba),
other => Err(format!("unsupported sqlite tokenizer mode: {}", other)),
}
}
fn tokenizer_mode_name(mode: VldbSqliteFfiTokenizerMode) -> &'static str {
match mode {
VldbSqliteFfiTokenizerMode::None => "none",
VldbSqliteFfiTokenizerMode::Jieba => "jieba",
}
}
fn map_controller_tokenizer_mode(
mode: VldbSqliteFfiTokenizerMode,
) -> ControllerSqliteTokenizerMode {
match mode {
VldbSqliteFfiTokenizerMode::None => ControllerSqliteTokenizerMode::None,
VldbSqliteFfiTokenizerMode::Jieba => ControllerSqliteTokenizerMode::Jieba,
}
}
fn controller_tokenizer_mode_name(mode: ControllerSqliteTokenizerMode) -> &'static str {
match mode {
ControllerSqliteTokenizerMode::None => "none",
ControllerSqliteTokenizerMode::Jieba => "jieba",
}
}
fn tokenizer_mode_name_from_u32(mode: u32) -> &'static str {
match mode {
1 => "jieba",
_ => "none",
}
}
fn callback_mode_name(mode: LuaRuntimeDatabaseCallbackMode) -> &'static str {
match mode {
LuaRuntimeDatabaseCallbackMode::Standard => "standard",
LuaRuntimeDatabaseCallbackMode::Json => "json",
}
}
fn bool_to_u8(value: bool) -> u8 {
if value { 1 } else { 0 }
}
fn u8_to_bool(value: u8) -> bool {
value != 0
}
enum HostSqliteParamValue {
Null,
Int64(i64),
Float64(f64),
String(String),
Bytes(Vec<u8>),
Bool(bool),
}
fn map_controller_sqlite_params(values: &[HostSqliteParamValue]) -> Vec<ControllerSqliteValue> {
values
.iter()
.map(|value| match value {
HostSqliteParamValue::Null => ControllerSqliteValue::Null,
HostSqliteParamValue::Int64(value) => ControllerSqliteValue::Int64(*value),
HostSqliteParamValue::Float64(value) => ControllerSqliteValue::Float64(*value),
HostSqliteParamValue::String(value) => ControllerSqliteValue::String(value.clone()),
HostSqliteParamValue::Bytes(value) => ControllerSqliteValue::Bytes(value.clone()),
HostSqliteParamValue::Bool(value) => ControllerSqliteValue::Bool(*value),
})
.collect()
}
struct OwnedSqliteFfiValues {
values: Vec<VldbSqliteFfiValue>,
_strings: Vec<CString>,
_bytes: Vec<Vec<u8>>,
}
impl OwnedSqliteFfiValues {
fn as_ptr(&self) -> *const VldbSqliteFfiValue {
self.values.as_ptr()
}
fn len_u64(&self) -> u64 {
u64::try_from(self.values.len()).unwrap_or(u64::MAX)
}
}
struct OwnedSqliteFfiValueMatrix {
_rows: Vec<OwnedSqliteFfiValues>,
slices: Vec<VldbSqliteFfiValueSlice>,
}
impl OwnedSqliteFfiValueMatrix {
fn as_ptr(&self) -> *const VldbSqliteFfiValueSlice {
self.slices.as_ptr()
}
fn len_u64(&self) -> u64 {
u64::try_from(self.slices.len()).unwrap_or(u64::MAX)
}
}
fn build_owned_ffi_values(values: &[HostSqliteParamValue]) -> Result<OwnedSqliteFfiValues, String> {
let mut ffi_values = Vec::with_capacity(values.len());
let mut strings = Vec::new();
let mut bytes = Vec::new();
for value in values {
match value {
HostSqliteParamValue::Null => ffi_values.push(VldbSqliteFfiValue {
kind: VldbSqliteFfiValueKind::Null,
int64_value: 0,
float64_value: 0.0,
string_value: ptr::null(),
bytes_value: VldbSqliteByteView::default(),
bool_value: 0,
}),
HostSqliteParamValue::Int64(number) => ffi_values.push(VldbSqliteFfiValue {
kind: VldbSqliteFfiValueKind::Int64,
int64_value: *number,
float64_value: 0.0,
string_value: ptr::null(),
bytes_value: VldbSqliteByteView::default(),
bool_value: 0,
}),
HostSqliteParamValue::Float64(number) => ffi_values.push(VldbSqliteFfiValue {
kind: VldbSqliteFfiValueKind::Float64,
int64_value: 0,
float64_value: *number,
string_value: ptr::null(),
bytes_value: VldbSqliteByteView::default(),
bool_value: 0,
}),
HostSqliteParamValue::String(text) => {
let c_text = to_cstring(text, "params[*].string")?;
let ptr = c_text.as_ptr();
strings.push(c_text);
ffi_values.push(VldbSqliteFfiValue {
kind: VldbSqliteFfiValueKind::String,
int64_value: 0,
float64_value: 0.0,
string_value: ptr,
bytes_value: VldbSqliteByteView::default(),
bool_value: 0,
});
}
HostSqliteParamValue::Bytes(blob) => {
let owned = blob.clone();
let view = if owned.is_empty() {
VldbSqliteByteView::default()
} else {
VldbSqliteByteView {
data: owned.as_ptr(),
len: u64::try_from(owned.len()).unwrap_or(u64::MAX),
}
};
bytes.push(owned);
ffi_values.push(VldbSqliteFfiValue {
kind: VldbSqliteFfiValueKind::Bytes,
int64_value: 0,
float64_value: 0.0,
string_value: ptr::null(),
bytes_value: view,
bool_value: 0,
});
}
HostSqliteParamValue::Bool(flag) => ffi_values.push(VldbSqliteFfiValue {
kind: VldbSqliteFfiValueKind::Bool,
int64_value: 0,
float64_value: 0.0,
string_value: ptr::null(),
bytes_value: VldbSqliteByteView::default(),
bool_value: bool_to_u8(*flag),
}),
}
}
Ok(OwnedSqliteFfiValues {
values: ffi_values,
_strings: strings,
_bytes: bytes,
})
}
fn build_owned_ffi_value_matrix(
rows: &[Vec<HostSqliteParamValue>],
) -> Result<OwnedSqliteFfiValueMatrix, String> {
let owned_rows = rows
.iter()
.map(|row| build_owned_ffi_values(row))
.collect::<Result<Vec<_>, _>>()?;
let slices = owned_rows
.iter()
.map(|row| VldbSqliteFfiValueSlice {
values: row.as_ptr(),
len: row.len_u64(),
})
.collect::<Vec<_>>();
Ok(OwnedSqliteFfiValueMatrix {
_rows: owned_rows,
slices,
})
}
fn parse_scalar_sqlite_param(
value: &Value,
field_name: &str,
) -> Result<HostSqliteParamValue, String> {
match value {
Value::Null => Ok(HostSqliteParamValue::Null),
Value::Bool(flag) => Ok(HostSqliteParamValue::Bool(*flag)),
Value::Number(number) => {
if let Some(int_value) = number.as_i64() {
Ok(HostSqliteParamValue::Int64(int_value))
} else if let Some(unsigned) = number.as_u64() {
let converted = i64::try_from(unsigned).map_err(|_| {
format!(
"{} contains an unsigned integer larger than i64",
field_name
)
})?;
Ok(HostSqliteParamValue::Int64(converted))
} else if let Some(float_value) = number.as_f64() {
Ok(HostSqliteParamValue::Float64(float_value))
} else {
Err(format!(
"{} contains an unsupported numeric value",
field_name
))
}
}
Value::String(text) => Ok(HostSqliteParamValue::String(text.clone())),
_ => Err(format!("{} must contain only scalar values", field_name)),
}
}
fn parse_typed_sqlite_param(
object: &serde_json::Map<String, Value>,
field_name: &str,
) -> Result<HostSqliteParamValue, String> {
let kind = object
.get("kind")
.and_then(Value::as_str)
.ok_or_else(|| format!("{}.kind is required for typed parameters", field_name))?;
match kind.trim().to_ascii_lowercase().as_str() {
"null" => Ok(HostSqliteParamValue::Null),
"bool" => object
.get("value")
.and_then(Value::as_bool)
.map(HostSqliteParamValue::Bool)
.ok_or_else(|| format!("{}.value must be a bool", field_name)),
"int64" => object
.get("value")
.and_then(Value::as_i64)
.map(HostSqliteParamValue::Int64)
.ok_or_else(|| format!("{}.value must be an int64", field_name)),
"float64" => object
.get("value")
.and_then(Value::as_f64)
.map(HostSqliteParamValue::Float64)
.ok_or_else(|| format!("{}.value must be a float64", field_name)),
"string" => object
.get("value")
.and_then(Value::as_str)
.map(|value| HostSqliteParamValue::String(value.to_string()))
.ok_or_else(|| format!("{}.value must be a string", field_name)),
"bytes" => {
if let Some(base64_value) = object.get("base64").and_then(Value::as_str) {
let decoded = BASE64_STANDARD
.decode(base64_value)
.map_err(|error| format!("{}.base64 is invalid: {}", field_name, error))?;
return Ok(HostSqliteParamValue::Bytes(decoded));
}
let array = object
.get("value")
.and_then(Value::as_array)
.ok_or_else(|| {
format!(
"{}.value must be a byte array or provide base64",
field_name
)
})?;
let mut bytes = Vec::with_capacity(array.len());
for (index, item) in array.iter().enumerate() {
let byte = item.as_u64().ok_or_else(|| {
format!(
"{}.value[{}] must be an unsigned integer",
field_name, index
)
})?;
let converted = u8::try_from(byte)
.map_err(|_| format!("{}.value[{}] exceeds u8", field_name, index))?;
bytes.push(converted);
}
Ok(HostSqliteParamValue::Bytes(bytes))
}
other => Err(format!("{}.kind={} is unsupported", field_name, other)),
}
}
fn parse_sqlite_param(value: &Value, field_name: &str) -> Result<HostSqliteParamValue, String> {
match value {
Value::Object(object) if object.contains_key("kind") => {
parse_typed_sqlite_param(object, field_name)
}
other => parse_scalar_sqlite_param(other, field_name),
}
}
fn parse_legacy_params_json_text(params_json: &str) -> Result<Vec<HostSqliteParamValue>, String> {
if params_json.trim().is_empty() {
return Ok(Vec::new());
}
let parsed: Value = serde_json::from_str(params_json).map_err(|error| {
format!(
"params_json must be a JSON array of scalar values: {}",
error
)
})?;
let items = parsed
.as_array()
.ok_or_else(|| "params_json must be a JSON array of scalar values".to_string())?;
items
.iter()
.enumerate()
.map(|(index, item)| parse_scalar_sqlite_param(item, &format!("params_json[{}]", index)))
.collect()
}
fn parse_single_sql_params(input: &Value) -> Result<Vec<HostSqliteParamValue>, String> {
let params_json = input
.get("params_json")
.and_then(Value::as_str)
.unwrap_or("");
if let Some(params_value) = input.get("params") {
if !params_json.trim().is_empty() {
return Err("provide either params or params_json, but not both".to_string());
}
let params_array = params_value
.as_array()
.ok_or_else(|| "params must be an array".to_string())?;
return params_array
.iter()
.enumerate()
.map(|(index, item)| parse_sqlite_param(item, &format!("params[{}]", index)))
.collect();
}
parse_legacy_params_json_text(params_json)
}
fn parse_batch_sql_params(input: &Value) -> Result<Vec<Vec<HostSqliteParamValue>>, String> {
let items = input
.get("items")
.and_then(Value::as_array)
.ok_or_else(|| "items must be an array of arrays".to_string())?;
if items.is_empty() {
return Err("items must not be empty".to_string());
}
items
.iter()
.enumerate()
.map(|(row_index, row)| {
let row_items = row
.as_array()
.ok_or_else(|| format!("items[{}] must be an array", row_index))?;
row_items
.iter()
.enumerate()
.map(|(col_index, item)| {
parse_sqlite_param(item, &format!("items[{}][{}]", row_index, col_index))
})
.collect()
})
.collect()
}
fn require_string_field<'a>(input: &'a Value, field_name: &str) -> Result<&'a str, String> {
input
.get(field_name)
.and_then(Value::as_str)
.filter(|value| !value.trim().is_empty())
.ok_or_else(|| format!("missing or empty field `{}`", field_name))
}
fn to_cstring(text: &str, field_name: &str) -> Result<CString, String> {
CString::new(text).map_err(|_| format!("field `{}` contains interior NUL bytes", field_name))
}
fn ensure_status(api: &LoadedSqliteApi, status: i32, operation: &str) -> Result<(), String> {
if status == VldbSqliteStatusCode::Success as i32 {
return Ok(());
}
let error = api.take_last_error_message();
Err(format!("{} failed: {}", operation, error))
}