use crate::config::{ColumnFamilyConfig, Config, IsolationLevel};
use crate::error::{check_result, Error, Result};
use crate::ffi;
use crate::stats::{CacheStats, DbStats};
use crate::transaction::Transaction;
use libc::{c_char, c_int, c_void, size_t};
use std::ffi::{CStr, CString};
use std::ptr;
/// One operation reported to a commit-hook callback.
///
/// Instances are owned copies built by `commit_hook_trampoline` from the
/// native `tidesdb_commit_op_t` entries.
#[derive(Debug, Clone)]
pub struct CommitOp {
/// Key bytes copied from the native operation.
pub key: Vec<u8>,
/// Value bytes; `None` when the operation is a delete or the native value
/// pointer is null.
pub value: Option<Vec<u8>>,
/// Copied from the native operation's `ttl` field (units are not visible here).
pub ttl: i64,
/// `true` when the native `is_delete` flag is non-zero.
pub is_delete: bool,
}
/// Boxed commit-hook closure: receives the operations and a `u64` sequence
/// value; its `i32` result is returned to the native caller by
/// `commit_hook_trampoline`.
type CommitHookCallback = Box<dyn Fn(&[CommitOp], u64) -> i32 + Send>;
/// Boxed key-comparator closure; its `i32` result is returned to the native
/// caller by `comparator_trampoline`.
type ComparatorCallback = Box<dyn Fn(&[u8], &[u8]) -> i32 + Send + Sync>;
/// C-ABI entry point that forwards a key comparison to a Rust closure.
///
/// `ctx` is reinterpreted as a pointer to a `ComparatorCallback`. A null `ctx`
/// yields `0`. Each key is exposed to the closure as a byte slice; a null
/// pointer or zero size is presented as an empty slice.
///
/// # Safety
/// A non-null `ctx` must point to a live `ComparatorCallback`, and each
/// non-null key pointer must be readable for its stated size.
unsafe extern "C" fn comparator_trampoline(
    key1: *const u8,
    key1_size: size_t,
    key2: *const u8,
    key2_size: size_t,
    ctx: *mut c_void,
) -> c_int {
    /// Views `len` bytes at `data` as a slice, or an empty slice for null /
    /// zero-length input.
    unsafe fn key_bytes<'a>(data: *const u8, len: size_t) -> &'a [u8] {
        if data.is_null() || len == 0 {
            return &[];
        }
        unsafe { std::slice::from_raw_parts(data, len) }
    }

    if ctx.is_null() {
        return 0;
    }
    let compare = unsafe { &*(ctx as *const ComparatorCallback) };
    let lhs = unsafe { key_bytes(key1, key1_size) };
    let rhs = unsafe { key_bytes(key2, key2_size) };
    compare(lhs, rhs)
}
/// Calls `tidesdb_init` with no allocator overrides (all four hooks `None`).
///
/// # Errors
/// Returns whatever `check_result` produces for a failing status code.
pub fn init() -> Result<()> {
    let status = unsafe { ffi::tidesdb_init(None, None, None, None) };
    check_result(status, "failed to initialize TidesDB")
}
/// Calls `tidesdb_init`, forwarding the four supplied allocator hooks.
///
/// # Safety
/// The hooks are handed to the native library unchanged; the caller is
/// responsible for them being valid for however the library uses them.
///
/// # Errors
/// Returns whatever `check_result` produces for a failing status code.
pub unsafe fn init_with_allocator(
    malloc_fn: ffi::tidesdb_malloc_fn,
    calloc_fn: ffi::tidesdb_calloc_fn,
    realloc_fn: ffi::tidesdb_realloc_fn,
    free_fn: ffi::tidesdb_free_fn,
) -> Result<()> {
    let status = unsafe { ffi::tidesdb_init(malloc_fn, calloc_fn, realloc_fn, free_fn) };
    check_result(
        status,
        "failed to initialize TidesDB with custom allocator",
    )
}
/// Calls the native `tidesdb_finalize`.
pub fn finalize() {
    unsafe { ffi::tidesdb_finalize(); }
}
/// Passes `ptr` to the native `tidesdb_free`.
///
/// # Safety
/// `ptr` is forwarded unchanged; it must be a pointer that `tidesdb_free`
/// accepts (presumably one allocated by the library — confirm against the C API).
pub unsafe fn free(ptr: *mut c_void) {
    unsafe { ffi::tidesdb_free(ptr); }
}
/// C-ABI entry point that converts native commit operations into [`CommitOp`]
/// values and forwards them to a Rust closure.
///
/// Returns `-1` without calling the closure when `ctx` or `ops` is null or
/// `num_ops` is not positive; otherwise returns the closure's result.
///
/// Each operation is copied into owned buffers:
/// - a null key pointer or zero key size becomes an empty key (a null pointer
///   must never reach `slice::from_raw_parts`, even with length 0);
/// - the value is `None` for deletes or a null value pointer.
///
/// # Safety
/// A non-null `ctx` must point to a live `CommitHookCallback`; `ops` must
/// point to `num_ops` readable entries whose non-null key/value pointers are
/// readable for their stated sizes.
unsafe extern "C" fn commit_hook_trampoline(
    ops: *const ffi::tidesdb_commit_op_t,
    num_ops: c_int,
    commit_seq: u64,
    ctx: *mut c_void,
) -> c_int {
    if ctx.is_null() || ops.is_null() || num_ops <= 0 {
        return -1;
    }
    let callback = unsafe { &*(ctx as *const CommitHookCallback) };
    let mut rust_ops = Vec::with_capacity(num_ops as usize);
    for i in 0..num_ops as isize {
        let op = unsafe { &*ops.offset(i) };
        // Guard the key exactly like the value: building a slice from a null
        // pointer is undefined behaviour regardless of the length.
        let key = if op.key.is_null() || op.key_size == 0 {
            Vec::new()
        } else {
            unsafe { std::slice::from_raw_parts(op.key, op.key_size) }.to_vec()
        };
        let is_delete = op.is_delete != 0;
        let value = if is_delete || op.value.is_null() {
            None
        } else {
            Some(unsafe { std::slice::from_raw_parts(op.value, op.value_size) }.to_vec())
        };
        rust_ops.push(CommitOp {
            key,
            value,
            ttl: op.ttl as i64,
            is_delete,
        });
    }
    callback(&rust_ops, commit_seq)
}
/// Owning wrapper around a raw native `tidesdb_t` handle.
///
/// The handle is passed to `tidesdb_close` when the wrapper is dropped.
pub struct TidesDB {
// Raw native database handle; set to null after it has been closed in `Drop`.
db: *mut ffi::tidesdb_t,
}
// NOTE(review): these impls assert that the native handle may be moved to and
// shared between threads; nothing in this file verifies that — confirm against
// the C library's threading guarantees.
unsafe impl Send for TidesDB {}
unsafe impl Sync for TidesDB {}
impl TidesDB {
pub fn open(config: Config) -> Result<Self> {
let c_data = config.to_c_config()?;
let mut db: *mut ffi::tidesdb_t = ptr::null_mut();
let result = unsafe { ffi::tidesdb_open(&c_data.config, &mut db) };
check_result(result, "failed to open database")?;
if db.is_null() {
return Err(Error::NullPointer("database handle"));
}
Ok(TidesDB { db })
}
/// Calls `tidesdb_create_column_family` with `name` and the native form of
/// `config`.
///
/// # Errors
/// Fails if `name` contains an interior NUL byte or the native call reports
/// a failing status.
pub fn create_column_family(&self, name: &str, config: ColumnFamilyConfig) -> Result<()> {
    let native_name = CString::new(name)?;
    let native_config = config.to_c_config();
    check_result(
        unsafe {
            ffi::tidesdb_create_column_family(self.db, native_name.as_ptr(), &native_config)
        },
        "failed to create column family",
    )
}
/// Calls `tidesdb_drop_column_family` for the column family called `name`.
///
/// # Errors
/// Fails if `name` contains an interior NUL byte or the native call reports
/// a failing status.
pub fn drop_column_family(&self, name: &str) -> Result<()> {
    let native_name = CString::new(name)?;
    check_result(
        unsafe { ffi::tidesdb_drop_column_family(self.db, native_name.as_ptr()) },
        "failed to drop column family",
    )
}
/// Consumes a [`ColumnFamily`] handle and calls
/// `tidesdb_delete_column_family` on its native pointer.
///
/// The handle is dropped *before* the native call, while its pointer is still
/// valid: `ColumnFamily`'s `Drop` unregisters any commit hook and releases the
/// Rust-side allocations. Forgetting the handle instead would leak its name
/// and any boxed hook callback, and would leave the hook registered.
///
/// # Errors
/// Returns whatever `check_result` produces for a failing status code. The
/// handle is consumed either way.
pub fn delete_column_family(&self, cf: ColumnFamily) -> Result<()> {
    let cf_ptr = cf.cf;
    // Run the handle's destructor now; after the delete call the native
    // pointer must not be touched again.
    drop(cf);
    let result = unsafe { ffi::tidesdb_delete_column_family(self.db, cf_ptr) };
    check_result(result, "failed to delete column family")
}
/// Calls `tidesdb_rename_column_family` with `old_name` and `new_name`.
///
/// # Errors
/// Fails if either name contains an interior NUL byte or the native call
/// reports a failing status.
pub fn rename_column_family(&self, old_name: &str, new_name: &str) -> Result<()> {
    let from = CString::new(old_name)?;
    let to = CString::new(new_name)?;
    check_result(
        unsafe { ffi::tidesdb_rename_column_family(self.db, from.as_ptr(), to.as_ptr()) },
        "failed to rename column family",
    )
}
/// Calls `tidesdb_clone_column_family` with `source_name` and `dest_name`.
///
/// # Errors
/// Fails if either name contains an interior NUL byte or the native call
/// reports a failing status.
pub fn clone_column_family(&self, source_name: &str, dest_name: &str) -> Result<()> {
    let src = CString::new(source_name)?;
    let dst = CString::new(dest_name)?;
    check_result(
        unsafe { ffi::tidesdb_clone_column_family(self.db, src.as_ptr(), dst.as_ptr()) },
        "failed to clone column family",
    )
}
pub fn get_column_family(&self, name: &str) -> Result<ColumnFamily> {
let c_name = CString::new(name)?;
let cf = unsafe { ffi::tidesdb_get_column_family(self.db, c_name.as_ptr()) };
if cf.is_null() {
return Err(Error::from_code(
ffi::TDB_ERR_NOT_FOUND,
"column family not found",
));
}
Ok(ColumnFamily {
cf,
name: name.to_string(),
hook_ctx: None,
})
}
/// Returns the names reported by `tidesdb_list_column_families`.
///
/// Each non-null name is converted lossily to a `String` and then released
/// with `tidesdb_free`, as is the array itself. A non-positive count or a
/// null array yields an empty list; guarding negative counts avoids a
/// capacity-overflow panic from casting them to `usize`.
///
/// # Errors
/// Returns whatever `check_result` produces for a failing status code.
pub fn list_column_families(&self) -> Result<Vec<String>> {
    let mut names: *mut *mut c_char = ptr::null_mut();
    let mut count: i32 = 0;
    let result =
        unsafe { ffi::tidesdb_list_column_families(self.db, &mut names, &mut count) };
    check_result(result, "failed to list column families")?;
    if count <= 0 || names.is_null() {
        return Ok(Vec::new());
    }
    // `count` is strictly positive here, so the cast cannot wrap.
    let len = count as usize;
    let mut result_names = Vec::with_capacity(len);
    unsafe {
        for i in 0..len {
            let name_ptr = *names.add(i);
            if !name_ptr.is_null() {
                // Copy the text out before releasing the native string.
                let name = CStr::from_ptr(name_ptr).to_string_lossy().into_owned();
                result_names.push(name);
                ffi::tidesdb_free(name_ptr as *mut c_void);
            }
        }
        ffi::tidesdb_free(names as *mut c_void);
    }
    Ok(result_names)
}
pub fn begin_transaction(&self) -> Result<Transaction> {
let mut txn: *mut ffi::tidesdb_txn_t = ptr::null_mut();
let result = unsafe { ffi::tidesdb_txn_begin(self.db, &mut txn) };
check_result(result, "failed to begin transaction")?;
if txn.is_null() {
return Err(Error::NullPointer("transaction handle"));
}
Ok(Transaction::new(txn))
}
pub fn begin_transaction_with_isolation(
&self,
isolation: IsolationLevel,
) -> Result<Transaction> {
let mut txn: *mut ffi::tidesdb_txn_t = ptr::null_mut();
let result = unsafe {
ffi::tidesdb_txn_begin_with_isolation(self.db, isolation as i32, &mut txn)
};
check_result(result, "failed to begin transaction with isolation")?;
if txn.is_null() {
return Err(Error::NullPointer("transaction handle"));
}
Ok(Transaction::new(txn))
}
/// Fetches cache statistics via `tidesdb_get_cache_stats` and converts them
/// to [`CacheStats`].
///
/// `enabled` becomes a `bool` (non-zero is `true`); `hits` and `misses` are
/// cast to `usize`; the remaining fields are copied as-is.
///
/// # Errors
/// Returns whatever `check_result` produces for a failing status code.
pub fn get_cache_stats(&self) -> Result<CacheStats> {
    let mut native = ffi::tidesdb_cache_stats_t {
        enabled: 0,
        total_entries: 0,
        total_bytes: 0,
        hits: 0,
        misses: 0,
        hit_rate: 0.0,
        num_partitions: 0,
    };
    let status = unsafe { ffi::tidesdb_get_cache_stats(self.db, &mut native) };
    check_result(status, "failed to get cache stats")?;
    let ffi::tidesdb_cache_stats_t {
        enabled,
        total_entries,
        total_bytes,
        hits,
        misses,
        hit_rate,
        num_partitions,
    } = native;
    Ok(CacheStats {
        enabled: enabled != 0,
        total_entries,
        total_bytes,
        hits: hits as usize,
        misses: misses as usize,
        hit_rate,
        num_partitions,
    })
}
pub fn register_comparator<F>(&self, name: &str, compare_fn: F) -> Result<()>
where
F: Fn(&[u8], &[u8]) -> i32 + Send + Sync + 'static,
{
let c_name = CString::new(name)?;
let boxed: Box<ComparatorCallback> = Box::new(Box::new(compare_fn));
let raw = Box::into_raw(boxed);
let result = unsafe {
ffi::tidesdb_register_comparator(
self.db,
c_name.as_ptr(),
Some(comparator_trampoline),
std::ptr::null(), raw as *mut c_void,
)
};
if result != ffi::TDB_SUCCESS {
unsafe {
drop(Box::from_raw(raw));
}
return Err(Error::from_code(result, "failed to register comparator"));
}
Ok(())
}
/// Reports whether `tidesdb_get_comparator` succeeds for `name`.
///
/// Returns `false` when `name` contains an interior NUL byte. The function
/// and context pointers written by the native call are discarded.
pub fn has_comparator(&self, name: &str) -> bool {
    CString::new(name).map_or(false, |native_name| {
        let mut fn_out: ffi::tidesdb_comparator_fn = None;
        let mut ctx_out: *mut c_void = ptr::null_mut();
        let status = unsafe {
            ffi::tidesdb_get_comparator(
                self.db,
                native_name.as_ptr(),
                &mut fn_out,
                &mut ctx_out,
            )
        };
        status == ffi::TDB_SUCCESS
    })
}
/// Calls `tidesdb_backup` with `dir` as the target path.
///
/// # Errors
/// Fails if `dir` contains an interior NUL byte or the native call reports a
/// failing status.
pub fn backup(&self, dir: &str) -> Result<()> {
    let native_dir = CString::new(dir)?;
    // The binding takes a mutable pointer, hence the cast.
    let dir_ptr = native_dir.as_ptr() as *mut c_char;
    check_result(
        unsafe { ffi::tidesdb_backup(self.db, dir_ptr) },
        "failed to backup database",
    )
}
/// Calls `tidesdb_checkpoint` with `checkpoint_dir` as the target path.
///
/// # Errors
/// Fails if `checkpoint_dir` contains an interior NUL byte or the native
/// call reports a failing status.
pub fn checkpoint(&self, checkpoint_dir: &str) -> Result<()> {
    let native_dir = CString::new(checkpoint_dir)?;
    check_result(
        unsafe { ffi::tidesdb_checkpoint(self.db, native_dir.as_ptr()) },
        "failed to checkpoint database",
    )
}
/// Calls `tidesdb_purge` on the database handle.
///
/// # Errors
/// Returns whatever `check_result` produces for a failing status code.
pub fn purge(&self) -> Result<()> {
    check_result(
        unsafe { ffi::tidesdb_purge(self.db) },
        "failed to purge database",
    )
}
/// Fetches database-wide statistics via `tidesdb_get_db_stats` and converts
/// them to [`DbStats`].
///
/// The native struct starts zero-filled. Integer flags become `bool`
/// (non-zero is `true`); `object_store_connector` is copied lossily into a
/// `String`, or left empty when the native pointer is null; every other field
/// is copied as-is.
///
/// # Errors
/// Returns whatever `check_result` produces for a failing status code.
pub fn get_db_stats(&self) -> Result<DbStats> {
    let mut slot = std::mem::MaybeUninit::<ffi::tidesdb_db_stats_t>::zeroed();
    let status = unsafe { ffi::tidesdb_get_db_stats(self.db, slot.as_mut_ptr()) };
    check_result(status, "failed to get database stats")?;
    let native = unsafe { slot.assume_init() };

    let connector_ptr = native.object_store_connector;
    let object_store_connector = if !connector_ptr.is_null() {
        let text = unsafe { CStr::from_ptr(connector_ptr) };
        text.to_string_lossy().into_owned()
    } else {
        String::new()
    };

    let stats = DbStats {
        num_column_families: native.num_column_families,
        total_memory: native.total_memory,
        available_memory: native.available_memory,
        resolved_memory_limit: native.resolved_memory_limit,
        memory_pressure_level: native.memory_pressure_level,
        flush_pending_count: native.flush_pending_count,
        total_memtable_bytes: native.total_memtable_bytes,
        total_immutable_count: native.total_immutable_count,
        total_sstable_count: native.total_sstable_count,
        total_data_size_bytes: native.total_data_size_bytes,
        num_open_sstables: native.num_open_sstables,
        global_seq: native.global_seq,
        txn_memory_bytes: native.txn_memory_bytes,
        compaction_queue_size: native.compaction_queue_size,
        flush_queue_size: native.flush_queue_size,
        unified_memtable_enabled: native.unified_memtable_enabled != 0,
        unified_memtable_bytes: native.unified_memtable_bytes,
        unified_immutable_count: native.unified_immutable_count,
        unified_is_flushing: native.unified_is_flushing != 0,
        unified_next_cf_index: native.unified_next_cf_index,
        unified_wal_generation: native.unified_wal_generation,
        object_store_enabled: native.object_store_enabled != 0,
        object_store_connector,
        local_cache_bytes_used: native.local_cache_bytes_used,
        local_cache_bytes_max: native.local_cache_bytes_max,
        local_cache_num_files: native.local_cache_num_files,
        last_uploaded_generation: native.last_uploaded_generation,
        upload_queue_depth: native.upload_queue_depth,
        total_uploads: native.total_uploads,
        total_upload_failures: native.total_upload_failures,
        replica_mode: native.replica_mode != 0,
    };
    Ok(stats)
}
/// Calls `tidesdb_promote_to_primary` on the database handle.
///
/// # Errors
/// Returns whatever `check_result` produces for a failing status code.
pub fn promote_to_primary(&self) -> Result<()> {
    check_result(
        unsafe { ffi::tidesdb_promote_to_primary(self.db) },
        "failed to promote to primary",
    )
}
}
impl Drop for TidesDB {
    /// Closes the native handle with `tidesdb_close` (if it is non-null) and
    /// then nulls the stored pointer.
    fn drop(&mut self) {
        if self.db.is_null() {
            return;
        }
        unsafe {
            ffi::tidesdb_close(self.db);
        }
        self.db = ptr::null_mut();
    }
}
/// Handle to a native column family, as returned by
/// `TidesDB::get_column_family`.
///
/// Dropping the handle unregisters any commit hook installed through it; it
/// does not otherwise release the native pointer.
pub struct ColumnFamily {
// Raw native column-family pointer passed to every FFI call.
pub(crate) cf: *mut ffi::tidesdb_column_family_t,
// Name the handle was looked up with.
name: String,
// Context pointer of the currently installed commit hook, produced by
// `Box::into_raw`; `None` when no hook was installed through this handle.
hook_ctx: Option<*mut CommitHookCallback>,
}
// NOTE(review): these impls assert that the native pointer and the stored hook
// context may be moved to and shared between threads; nothing in this file
// verifies that — confirm against the C library's threading guarantees.
unsafe impl Send for ColumnFamily {}
unsafe impl Sync for ColumnFamily {}
impl ColumnFamily {
/// Returns the name stored in this handle.
pub fn name(&self) -> &str {
    self.name.as_str()
}
/// Fetches column-family statistics via `tidesdb_get_stats`, copies them into
/// an owned [`crate::stats::Stats`], and releases the native struct with
/// `tidesdb_free_stats`.
///
/// The three per-level arrays are copied only when `num_levels` is positive
/// and the corresponding pointer is non-null; otherwise they are empty.
/// `config` is always `None`.
///
/// # Errors
/// Returns the error for a failing status, or `Error::NullPointer` when the
/// call reports success but leaves the stats pointer null.
pub fn get_stats(&self) -> Result<crate::stats::Stats> {
    let mut raw_stats: *mut ffi::tidesdb_stats_t = ptr::null_mut();
    let status = unsafe { ffi::tidesdb_get_stats(self.cf, &mut raw_stats) };
    check_result(status, "failed to get stats")?;
    if raw_stats.is_null() {
        return Err(Error::NullPointer("stats"));
    }
    let stats = unsafe {
        let num_levels = (*raw_stats).num_levels;
        let has_levels = num_levels > 0;

        let level_sizes: Vec<_> = if has_levels && !(*raw_stats).level_sizes.is_null() {
            (0..num_levels as isize)
                .map(|i| *(*raw_stats).level_sizes.offset(i))
                .collect()
        } else {
            Vec::new()
        };
        let level_num_sstables: Vec<_> =
            if has_levels && !(*raw_stats).level_num_sstables.is_null() {
                (0..num_levels as isize)
                    .map(|i| *(*raw_stats).level_num_sstables.offset(i))
                    .collect()
            } else {
                Vec::new()
            };
        let level_key_counts: Vec<_> =
            if has_levels && !(*raw_stats).level_key_counts.is_null() {
                (0..num_levels as isize)
                    .map(|i| *(*raw_stats).level_key_counts.offset(i))
                    .collect()
            } else {
                Vec::new()
            };

        // Everything is copied out before the native struct is released.
        let snapshot = crate::stats::Stats {
            num_levels,
            memtable_size: (*raw_stats).memtable_size,
            level_sizes,
            level_num_sstables,
            config: None,
            total_keys: (*raw_stats).total_keys,
            total_data_size: (*raw_stats).total_data_size,
            avg_key_size: (*raw_stats).avg_key_size,
            avg_value_size: (*raw_stats).avg_value_size,
            level_key_counts,
            read_amp: (*raw_stats).read_amp,
            hit_rate: (*raw_stats).hit_rate,
            use_btree: (*raw_stats).use_btree != 0,
            btree_total_nodes: (*raw_stats).btree_total_nodes,
            btree_max_height: (*raw_stats).btree_max_height,
            btree_avg_height: (*raw_stats).btree_avg_height,
        };
        ffi::tidesdb_free_stats(raw_stats);
        snapshot
    };
    Ok(stats)
}
/// Calls `tidesdb_compact` on this column family.
///
/// # Errors
/// Returns whatever `check_result` produces for a failing status code.
pub fn compact(&self) -> Result<()> {
    check_result(
        unsafe { ffi::tidesdb_compact(self.cf) },
        "failed to compact column family",
    )
}
/// Calls `tidesdb_flush_memtable` on this column family.
///
/// # Errors
/// Returns whatever `check_result` produces for a failing status code.
pub fn flush_memtable(&self) -> Result<()> {
    check_result(
        unsafe { ffi::tidesdb_flush_memtable(self.cf) },
        "failed to flush memtable",
    )
}
/// Returns `true` when `tidesdb_is_flushing` yields a non-zero value.
pub fn is_flushing(&self) -> bool {
    let flag = unsafe { ffi::tidesdb_is_flushing(self.cf) };
    flag != 0
}
/// Returns `true` when `tidesdb_is_compacting` yields a non-zero value.
pub fn is_compacting(&self) -> bool {
    let flag = unsafe { ffi::tidesdb_is_compacting(self.cf) };
    flag != 0
}
/// Calls `tidesdb_purge_cf` on this column family.
///
/// # Errors
/// Returns whatever `check_result` produces for a failing status code.
pub fn purge(&self) -> Result<()> {
    check_result(
        unsafe { ffi::tidesdb_purge_cf(self.cf) },
        "failed to purge column family",
    )
}
/// Calls `tidesdb_sync_wal` on this column family.
///
/// # Errors
/// Returns whatever `check_result` produces for a failing status code.
pub fn sync_wal(&self) -> Result<()> {
    check_result(
        unsafe { ffi::tidesdb_sync_wal(self.cf) },
        "failed to sync WAL",
    )
}
/// Returns the `f64` cost written by `tidesdb_range_cost` for the two keys.
///
/// # Errors
/// Returns whatever `check_result` produces for a failing status code.
pub fn range_cost(&self, key_a: &[u8], key_b: &[u8]) -> Result<f64> {
    let (a_ptr, a_len) = (key_a.as_ptr(), key_a.len());
    let (b_ptr, b_len) = (key_b.as_ptr(), key_b.len());
    let mut estimate = 0.0_f64;
    let status = unsafe {
        ffi::tidesdb_range_cost(self.cf, a_ptr, a_len, b_ptr, b_len, &mut estimate)
    };
    check_result(status, "failed to estimate range cost").map(|()| estimate)
}
pub fn set_commit_hook<F>(&mut self, callback: F) -> Result<()>
where
F: Fn(&[CommitOp], u64) -> i32 + Send + 'static,
{
self.clear_commit_hook()?;
let boxed: Box<CommitHookCallback> = Box::new(Box::new(callback));
let raw = Box::into_raw(boxed);
let result = unsafe {
ffi::tidesdb_cf_set_commit_hook(
self.cf,
Some(commit_hook_trampoline),
raw as *mut c_void,
)
};
if result != ffi::TDB_SUCCESS {
unsafe {
drop(Box::from_raw(raw));
}
return Err(Error::from_code(result, "failed to set commit hook"));
}
self.hook_ctx = Some(raw);
Ok(())
}
/// Unregisters the commit hook installed through this handle, if any.
///
/// Does nothing when no hook is recorded. Otherwise asks the native library
/// to clear the hook and, only once that call succeeds, forgets the stored
/// context and frees the boxed callback.
///
/// If the native call fails, the callback is kept alive and `hook_ctx` is
/// left untouched: the library may still hold the context pointer, so freeing
/// it could leave a dangling pointer behind a live hook.
///
/// # Errors
/// Returns whatever `check_result` produces for a failing status code.
pub fn clear_commit_hook(&mut self) -> Result<()> {
    if let Some(raw) = self.hook_ctx {
        let result = unsafe {
            ffi::tidesdb_cf_set_commit_hook(self.cf, None, ptr::null_mut())
        };
        // Bail out before touching the allocation if the hook may still be
        // registered on the native side.
        check_result(result, "failed to clear commit hook")?;
        self.hook_ctx = None;
        unsafe {
            drop(Box::from_raw(raw));
        }
    }
    Ok(())
}
/// Calls `tidesdb_cf_update_runtime_config` with the native form of `config`;
/// `persist_to_disk` is passed as `1` or `0`.
///
/// # Errors
/// Returns whatever `check_result` produces for a failing status code.
pub fn update_runtime_config(
    &self,
    config: &ColumnFamilyConfig,
    persist_to_disk: bool,
) -> Result<()> {
    let native_config = config.to_c_config();
    let status = unsafe {
        ffi::tidesdb_cf_update_runtime_config(
            self.cf,
            &native_config,
            if persist_to_disk { 1 } else { 0 },
        )
    };
    check_result(status, "failed to update runtime config")
}
}
impl Drop for ColumnFamily {
    /// Unregisters any commit hook installed through this handle and frees
    /// its boxed callback.
    ///
    /// The callback is freed only when the native unregister call returns
    /// `TDB_SUCCESS`. On failure it is deliberately leaked: the library may
    /// still hold the context pointer, and a small leak is preferable to a
    /// hook invoking freed memory.
    fn drop(&mut self) {
        if let Some(raw) = self.hook_ctx.take() {
            let result = unsafe {
                ffi::tidesdb_cf_set_commit_hook(self.cf, None, ptr::null_mut())
            };
            if result == ffi::TDB_SUCCESS {
                unsafe {
                    drop(Box::from_raw(raw));
                }
            }
        }
    }
}