use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use dashmap::DashMap;
use parking_lot::RwLock;
use crate::durable_storage::{DurableStorage, TransactionMode};
use crate::index_policy::{IndexPolicy, TableIndexConfig, TableIndexRegistry};
use crate::key_buffer::KeyBuffer;
use crate::packed_row::{PackedColumnDef, PackedColumnType, PackedRow, PackedTableSchema};
use sochdb_core::catalog::Catalog;
use sochdb_core::{Result, SochDBError, SochValue};
pub use crate::durable_storage::RecoveryStats;
/// Options used when opening a [`Database`].
///
/// Obtain one from `Default` or from the presets `DatabaseConfig::throughput_optimized`,
/// `DatabaseConfig::latency_optimized` and `DatabaseConfig::sqlite_compatible`.
#[derive(Debug, Clone)]
pub struct DatabaseConfig {
    /// Passed to `DurableStorage::open_with_policy` by `Database::open_with_config`.
    pub group_commit: bool,
    /// Memtable size limit in bytes (the presets use 32–128 MiB).
    /// NOTE(review): not read by the `Database` code in this file — confirm where it is applied.
    pub memtable_size_limit: usize,
    /// Whether the write-ahead log is enabled.
    /// NOTE(review): not read by the `Database` code in this file — confirm where it is applied.
    pub wal_enabled: bool,
    /// Durability level; numbered like SQLite's `PRAGMA synchronous` (see `SyncMode`).
    /// NOTE(review): not read by the `Database` code in this file — confirm where it is applied.
    pub sync_mode: SyncMode,
    /// Read-only flag.
    /// NOTE(review): not enforced by the `Database` code in this file — confirm.
    pub read_only: bool,
    /// Legacy ordered-index switch, superseded by `default_index_policy`.
    #[deprecated(
        since = "0.2.0",
        note = "Use `default_index_policy` field instead. This field will be removed in v0.3.0. \
Set IndexPolicy::ScanOptimized for ordered index, WriteOptimized to disable."
    )]
    pub enable_ordered_index: bool,
    /// Batch-sizing parameters for group commit.
    /// NOTE(review): not read by the `Database` code in this file — confirm where it is applied.
    pub group_commit_config: GroupCommitSettings,
    /// Index policy for tables without a per-table override; also passed to storage on open.
    pub default_index_policy: IndexPolicy,
}
/// Batch-sizing parameters for group commit. Durations are in microseconds.
#[derive(Debug, Clone)]
pub struct GroupCommitSettings {
    /// Lower bound on [`GroupCommitSettings::optimal_batch_size`].
    pub min_batch_size: usize,
    /// Upper bound on [`GroupCommitSettings::optimal_batch_size`]; wins if the bounds are
    /// inverted.
    pub max_batch_size: usize,
    /// Batching wait budget in microseconds (by name; not consumed in this file).
    pub max_wait_us: u64,
    /// Estimated cost of one fsync in microseconds; drives `optimal_batch_size`.
    pub fsync_latency_us: u64,
}

impl Default for GroupCommitSettings {
    /// Batches of 1–1000 commits, 10 ms wait budget, 5 ms assumed fsync latency.
    fn default() -> Self {
        Self {
            min_batch_size: 1,
            max_batch_size: 1000,
            max_wait_us: 10_000,
            fsync_latency_us: 5_000,
        }
    }
}

impl GroupCommitSettings {
    /// Large batches (50–5000) with a 50 ms wait budget.
    pub fn high_throughput() -> Self {
        Self {
            min_batch_size: 50,
            max_batch_size: 5000,
            max_wait_us: 50_000,
            fsync_latency_us: 5_000,
        }
    }

    /// Small batches (1–10) with a 1 ms wait budget.
    pub fn low_latency() -> Self {
        Self {
            min_batch_size: 1,
            max_batch_size: 10,
            max_wait_us: 1_000,
            fsync_latency_us: 5_000,
        }
    }

    /// Square-root batch-size rule: `N* = sqrt(2 · L_fsync · arrival_rate / wait_cost)`,
    /// with `L_fsync` in seconds and `wait_cost` floored at `0.001`.
    ///
    /// The result is bounded to `[min_batch_size, max_batch_size]`. If the two bounds are
    /// inverted, `max_batch_size` wins instead of panicking. Inputs that make `N*` NaN
    /// (e.g. a negative rate) yield `min_batch_size`.
    pub fn optimal_batch_size(&self, arrival_rate: f64, wait_cost: f64) -> usize {
        let l_fsync = self.fsync_latency_us as f64 / 1_000_000.0;
        let n_star = (2.0 * l_fsync * arrival_rate / wait_cost.max(0.001)).sqrt();
        // `as usize` saturates (NaN -> 0, +inf -> usize::MAX). `Ord::clamp` would panic on
        // min > max, which the public fields allow, so apply the bounds one at a time.
        (n_star as usize)
            .max(self.min_batch_size)
            .min(self.max_batch_size)
    }
}
impl Default for DatabaseConfig {
    /// General-purpose settings: group commit and WAL on, 64 MiB memtable,
    /// [`SyncMode::Normal`], default group-commit batching, [`IndexPolicy::Balanced`].
    // The deprecated `enable_ordered_index` field must still be initialised here.
    #[allow(deprecated)]
    fn default() -> Self {
        let memtable_size_limit = 64 * 1024 * 1024;
        Self {
            default_index_policy: IndexPolicy::Balanced,
            group_commit_config: GroupCommitSettings::default(),
            enable_ordered_index: true,
            read_only: false,
            sync_mode: SyncMode::Normal,
            wal_enabled: true,
            memtable_size_limit,
            group_commit: true,
        }
    }
}

impl DatabaseConfig {
    /// Bulk-ingest preset: 128 MiB memtable, large group-commit batches,
    /// [`IndexPolicy::WriteOptimized`] and the legacy ordered index off.
    /// Every other field matches `Default`.
    #[allow(deprecated)]
    pub fn throughput_optimized() -> Self {
        Self {
            memtable_size_limit: 128 * 1024 * 1024,
            enable_ordered_index: false,
            group_commit_config: GroupCommitSettings::high_throughput(),
            default_index_policy: IndexPolicy::WriteOptimized,
            ..Self::default()
        }
    }

    /// Low-latency preset: 32 MiB memtable, [`SyncMode::Full`], small group-commit batches and
    /// [`IndexPolicy::ScanOptimized`]. Every other field matches `Default`.
    #[allow(deprecated)]
    pub fn latency_optimized() -> Self {
        Self {
            memtable_size_limit: 32 * 1024 * 1024,
            sync_mode: SyncMode::Full,
            group_commit_config: GroupCommitSettings::low_latency(),
            default_index_policy: IndexPolicy::ScanOptimized,
            ..Self::default()
        }
    }

    /// Same as `Default` except that group commit is disabled.
    #[allow(deprecated)]
    pub fn sqlite_compatible() -> Self {
        Self {
            group_commit: false,
            ..Self::default()
        }
    }

    /// True only when `default_index_policy` is `IndexPolicy::ScanOptimized`; the deprecated
    /// `enable_ordered_index` flag is not consulted.
    pub fn effective_ordered_index(&self) -> bool {
        matches!(self.default_index_policy, IndexPolicy::ScanOptimized)
    }
}
/// Durability level, numbered like SQLite's `PRAGMA synchronous`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncMode {
    /// `synchronous = OFF` (0).
    Off = 0,
    /// `synchronous = NORMAL` (1).
    Normal = 1,
    /// `synchronous = FULL` (2).
    Full = 2,
}

impl SyncMode {
    /// Maps a SQLite `synchronous` pragma value. Every value above 1 (FULL = 2, EXTRA = 3, …)
    /// becomes [`SyncMode::Full`], the strongest mode available.
    pub fn from_sqlite_pragma(value: u32) -> Self {
        match value {
            0 => SyncMode::Off,
            1 => SyncMode::Normal,
            _ => SyncMode::Full,
        }
    }

    /// Returns the pragma number for this mode (the enum discriminant).
    pub fn to_sqlite_pragma(self) -> u32 {
        self as u32
    }

    /// Parses a pragma-style name or number, case-insensitively and ignoring surrounding
    /// whitespace.
    ///
    /// Accepts `OFF`/`0`, `NORMAL`/`1` and `FULL`/`2`. It also accepts `EXTRA`/`3` as
    /// [`SyncMode::Full`], matching [`SyncMode::from_sqlite_pragma`]. Anything else yields
    /// `None`.
    pub fn parse(s: &str) -> Option<Self> {
        let s = s.trim();
        let is = |name: &str, digit: &str| s.eq_ignore_ascii_case(name) || s == digit;
        if is("OFF", "0") {
            Some(SyncMode::Off)
        } else if is("NORMAL", "1") {
            Some(SyncMode::Normal)
        } else if is("FULL", "2") || is("EXTRA", "3") {
            Some(SyncMode::Full)
        } else {
            None
        }
    }
}
/// Logical description of a table registered with `Database::register_table`.
#[derive(Debug, Clone)]
pub struct TableSchema {
    /// Table name; also the first path segment of the table's row keys.
    pub name: String,
    /// Columns in declaration order; this order is carried into the packed row layout.
    pub columns: Vec<ColumnDef>,
}
/// One column of a [`TableSchema`].
#[derive(Debug, Clone)]
pub struct ColumnDef {
    /// Column name used as the key in row maps.
    pub name: String,
    /// Logical type; mapped 1:1 onto `PackedColumnType`.
    pub col_type: ColumnType,
    /// Copied into the packed column definition.
    pub nullable: bool,
}
/// Logical column type. Each variant maps to the same-named `PackedColumnType`
/// in `Database::to_packed_schema`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ColumnType {
    Int64,
    UInt64,
    Float64,
    Text,
    Binary,
    Bool,
}
/// Copyable handle to an open transaction, returned by the `Database::begin_*` methods.
#[derive(Debug, Clone, Copy)]
pub struct TxnHandle {
    /// Transaction id issued by storage.
    pub txn_id: u64,
    /// Snapshot timestamp; the `begin_*` methods currently set it equal to `txn_id`.
    pub snapshot_ts: u64,
}
/// Row-oriented query result, one map per row keyed by column name.
#[derive(Debug, Clone)]
pub struct QueryResult {
    /// Column names, in the order used by `to_toon`.
    pub columns: Vec<String>,
    /// Result rows; a missing key means the cell was not produced.
    pub rows: Vec<HashMap<String, SochValue>>,
    /// As filled by `QueryBuilder::execute`: the number of rows returned (after offset/limit).
    pub rows_scanned: usize,
    /// Value bytes read from matching entries while building the result.
    pub bytes_read: usize,
}
impl QueryResult {
pub fn empty() -> Self {
Self {
columns: vec![],
rows: vec![],
rows_scanned: 0,
bytes_read: 0,
}
}
pub fn to_toon(&self) -> String {
if self.rows.is_empty() {
return "[]".to_string();
}
let n = self.rows.len();
let cols = self.columns.join(",");
let rows_str: Vec<String> = self
.rows
.iter()
.map(|row| {
self.columns
.iter()
.map(|c| {
row.get(c)
.map(format_soch_value)
.unwrap_or_else(|| "∅".to_string())
})
.collect::<Vec<_>>()
.join(",")
})
.collect();
format!("result[{}]{{{}}}:{}", n, cols, rows_str.join(";"))
}
}
/// Renders one [`SochValue`] as a TOON cell.
///
/// Rendering by variant:
/// - `Null` becomes `∅`.
/// - Booleans become `T`/`F`.
/// - Floats use six decimals.
/// - Binary becomes `b64:<base64>`.
/// - Text containing a cell (`,`) or row (`;`) separator is wrapped in double quotes, with
///   `\` and `"` backslash-escaped; other text is emitted verbatim.
/// - Any other variant falls back to its `Debug` form.
fn format_soch_value(v: &SochValue) -> String {
    match v {
        SochValue::Null => "∅".to_string(),
        SochValue::Int(i) => i.to_string(),
        SochValue::UInt(u) => u.to_string(),
        SochValue::Float(f) => format!("{:.6}", f),
        SochValue::Text(s) if s.contains([',', ';']) => {
            // Escape the escape character first; otherwise an input `\"` would read back as
            // an escaped quote followed by a stray backslash.
            format!("\"{}\"", s.replace('\\', "\\\\").replace('"', "\\\""))
        }
        SochValue::Text(s) => s.clone(),
        SochValue::Bool(b) => if *b { "T" } else { "F" }.to_string(),
        SochValue::Binary(b) => format!("b64:{}", base64_encode(b)),
        other => format!("{:?}", other),
    }
}
/// Encodes `data` as standard base64 (RFC 4648 alphabet) with `=` padding.
fn base64_encode(data: &[u8]) -> String {
    // Typed as a 64-byte array so a typo in the alphabet is a compile error.
    const ALPHABET: &[u8; 64] =
        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    // Maps the low six bits of `bits` to its alphabet character.
    let sextet = |bits: usize| ALPHABET[bits & 0x3f] as char;

    // Every 3-byte group, including a trailing partial one, yields exactly 4 characters.
    let mut out = String::with_capacity(data.len().div_ceil(3) * 4);
    for chunk in data.chunks(3) {
        let b0 = chunk[0] as usize;
        let b1 = chunk.get(1).copied().unwrap_or(0) as usize;
        let b2 = chunk.get(2).copied().unwrap_or(0) as usize;
        out.push(sextet(b0 >> 2));
        out.push(sextet((b0 << 4) | (b1 >> 4)));
        out.push(if chunk.len() > 1 {
            sextet((b1 << 2) | (b2 >> 6))
        } else {
            '='
        });
        out.push(if chunk.len() > 2 { sextet(b2) } else { '=' });
    }
    out
}
use sochdb_core::TypedColumn as CoreTypedColumn;
/// Column-oriented query result: one typed column per requested name.
#[derive(Debug, Clone)]
pub struct ColumnarQueryResult {
    /// Column names; `columns[i]` describes `data[i]`.
    pub columns: Vec<String>,
    /// Column data, parallel to `columns`.
    pub data: Vec<CoreTypedColumn>,
    /// Number of rows appended to every column.
    pub row_count: usize,
    /// Value bytes read from storage while building the result.
    pub bytes_read: usize,
}
impl ColumnarQueryResult {
    /// A result with no columns, no data and zeroed counters.
    pub fn empty() -> Self {
        Self {
            columns: Vec::new(),
            data: Vec::new(),
            row_count: 0,
            bytes_read: 0,
        }
    }

    /// Data of the first column named `name`, if present.
    pub fn column(&self, name: &str) -> Option<&CoreTypedColumn> {
        let position = self.column_index(name)?;
        self.data.get(position)
    }

    /// Position of the first column named `name`.
    pub fn column_index(&self, name: &str) -> Option<usize> {
        self.columns.iter().position(|candidate| candidate == name)
    }

    /// Number of rows.
    pub fn row_count(&self) -> usize {
        self.row_count
    }

    /// Number of columns.
    pub fn column_count(&self) -> usize {
        self.columns.len()
    }

    /// Sum of `memory_size()` over all columns.
    pub fn memory_size(&self) -> usize {
        self.data
            .iter()
            .fold(0, |total, col| total + col.memory_size())
    }

    /// `sum_i64` of the named column, or `None` if the column is absent.
    pub fn sum_i64(&self, column: &str) -> Option<i64> {
        self.column(column).map(|col| col.sum_i64())
    }

    /// `sum_f64` of the named column, or `None` if the column is absent.
    pub fn sum_f64(&self, column: &str) -> Option<f64> {
        self.column(column).map(|col| col.sum_f64())
    }

    /// Statistics of the named column, or `None` if the column is absent.
    pub fn column_stats(&self, column: &str) -> Option<&sochdb_core::columnar::ColumnStats> {
        self.column(column).map(|col| col.stats())
    }

    /// Renders the result as compact TOON (`result[N]{cols}:row;row`), or `[]` when empty.
    pub fn to_toon(&self) -> String {
        if self.row_count == 0 {
            return "[]".to_string();
        }
        let rendered_rows: Vec<String> = (0..self.row_count)
            .map(|row_idx| {
                self.data
                    .iter()
                    .map(|col| format_columnar_value(col, row_idx))
                    .collect::<Vec<_>>()
                    .join(",")
            })
            .collect();
        format!(
            "result[{}]{{{}}}:{}",
            self.row_count,
            self.columns.join(","),
            rendered_rows.join(";")
        )
    }
}
/// Renders the cell at row `idx` of `col` as TOON text.
///
/// Rendering by type:
/// - Integers use `Display`.
/// - Floats use six decimals.
/// - Booleans become `T`/`F`.
/// - Binary becomes `b64:<base64>`.
/// - Text is quoted, with `\` and `"` escaped, when it contains `,` or `;`.
///
/// Yields `∅` for null cells and for out-of-range `idx`. It also yields `∅`, instead of
/// panicking, when a variable-length column's offsets do not describe an in-bounds slice or
/// its text is not UTF-8.
fn format_columnar_value(col: &CoreTypedColumn, idx: usize) -> String {
    // Escape `\` before `"` so an input `\"` cannot masquerade as an escaped quote.
    let quote_text = |s: &str| -> String {
        if s.contains([',', ';']) {
            format!("\"{}\"", s.replace('\\', "\\\\").replace('"', "\\\""))
        } else {
            s.to_string()
        }
    };

    let cell: Option<String> = match col {
        CoreTypedColumn::Int64 {
            values, validity, ..
        } => (idx < values.len() && validity.is_valid(idx)).then(|| values[idx].to_string()),
        CoreTypedColumn::UInt64 {
            values, validity, ..
        } => (idx < values.len() && validity.is_valid(idx)).then(|| values[idx].to_string()),
        CoreTypedColumn::Float64 {
            values, validity, ..
        } => (idx < values.len() && validity.is_valid(idx))
            .then(|| format!("{:.6}", values[idx])),
        CoreTypedColumn::Text {
            offsets,
            data,
            validity,
            ..
        } => {
            if idx + 1 < offsets.len() && validity.is_valid(idx) {
                let start = offsets[idx] as usize;
                let end = offsets[idx + 1] as usize;
                // Guard against corrupt offsets rather than panicking in the slice.
                if start <= end && end <= data.len() {
                    std::str::from_utf8(&data[start..end]).ok().map(quote_text)
                } else {
                    None
                }
            } else {
                None
            }
        }
        CoreTypedColumn::Binary {
            offsets,
            data,
            validity,
            ..
        } => {
            if idx + 1 < offsets.len() && validity.is_valid(idx) {
                let start = offsets[idx] as usize;
                let end = offsets[idx + 1] as usize;
                (start <= end && end <= data.len())
                    .then(|| format!("b64:{}", base64_encode(&data[start..end])))
            } else {
                None
            }
        }
        CoreTypedColumn::Bool {
            values,
            validity,
            len,
            ..
        } => {
            // Bits are packed 64 per word, least-significant bit first.
            let word = idx / 64;
            (idx < *len && word < values.len() && validity.is_valid(idx)).then(|| {
                let set = (values[word] >> (idx % 64)) & 1 == 1;
                if set { "T" } else { "F" }.to_string()
            })
        }
    };
    cell.unwrap_or_else(|| "∅".to_string())
}
/// A single hit from a vector search.
/// NOTE(review): not produced by any code in this file — confirm the producer.
#[derive(Debug, Clone)]
pub struct VectorSearchResult {
    /// Identifier of the matching item.
    pub id: u64,
    /// Distance to the query vector; the metric is defined by the producer (confirm).
    pub distance: f32,
    /// Optional column values attached to the hit.
    pub metadata: Option<HashMap<String, SochValue>>,
}
/// Handle to an open database, returned as `Arc<Database>` by the `open*` constructors.
///
/// Combines durable storage with in-memory table metadata and operation counters.
// NOTE(review): the blanket `dead_code` allow appears to cover fields that are stored but not
// read in this file (e.g. `catalog`, `config`) — confirm before narrowing it.
#[allow(dead_code)]
pub struct Database {
    /// Directory the database was opened at; `.clean_shutdown` is written inside it.
    path: PathBuf,
    /// Storage that every read, write, commit and fsync goes through.
    storage: Arc<DurableStorage>,
    /// Set only for `open_concurrent*` handles; supplies the writer lock and commit hook.
    concurrent_mvcc: Option<Arc<crate::mvcc_concurrent::ConcurrentMvcc>>,
    /// Catalog, initialised empty with the name `"sochdb"`.
    catalog: Arc<RwLock<Catalog>>,
    /// Registered table schemas keyed by table name.
    tables: DashMap<String, TableSchema>,
    /// Packed-row layouts derived from `tables`, keyed by table name.
    packed_schemas: DashMap<String, PackedTableSchema>,
    /// Per-table index policies, defaulting to the config's `default_index_policy`.
    index_registry: Arc<TableIndexRegistry>,
    /// Configuration the handle was opened with.
    config: DatabaseConfig,
    /// Operation counters surfaced by `stats()`.
    stats: DatabaseStats,
    /// Non-zero once `shutdown()` has started; suppresses the flush in `Drop`.
    shutdown: AtomicU64,
    /// True for handles opened via `open_concurrent*`.
    is_concurrent: bool,
}
/// Lock-free operation counters kept by each database handle.
struct DatabaseStats {
    transactions_started: AtomicU64,
    transactions_committed: AtomicU64,
    transactions_aborted: AtomicU64,
    queries_executed: AtomicU64,
    bytes_written: AtomicU64,
    bytes_read: AtomicU64,
}

impl DatabaseStats {
    /// Creates a counter set with every counter at zero.
    fn new() -> Self {
        let zero = || AtomicU64::new(0);
        Self {
            transactions_started: zero(),
            transactions_committed: zero(),
            transactions_aborted: zero(),
            queries_executed: zero(),
            bytes_written: zero(),
            bytes_read: zero(),
        }
    }
}
/// Snapshot of a database's operation counters, returned by `Database::stats`.
///
/// Each field is loaded separately with `Relaxed` ordering, so the snapshot is not atomic
/// across fields.
#[derive(Debug, Clone)]
pub struct Stats {
    pub transactions_started: u64,
    pub transactions_committed: u64,
    pub transactions_aborted: u64,
    pub queries_executed: u64,
    pub bytes_written: u64,
    pub bytes_read: u64,
}
impl Database {
pub fn open<P: AsRef<Path>>(path: P) -> Result<Arc<Self>> {
Self::open_with_config(path, DatabaseConfig::default())
}
#[cfg(test)]
pub fn open_without_lock<P: AsRef<Path>>(path: P) -> Result<Arc<Self>> {
let path = path.as_ref().to_path_buf();
let config = DatabaseConfig::default();
let storage = Arc::new(DurableStorage::open_without_lock(&path)?);
let index_registry = Arc::new(TableIndexRegistry::with_default_policy(
config.default_index_policy,
));
let db = Arc::new(Self {
path: path.clone(),
storage,
concurrent_mvcc: None,
catalog: Arc::new(RwLock::new(Catalog::new("sochdb"))),
tables: DashMap::new(),
packed_schemas: DashMap::new(),
index_registry,
config,
stats: DatabaseStats::new(),
shutdown: AtomicU64::new(0),
is_concurrent: false,
});
db.recover()?;
Ok(db)
}
pub fn open_with_config<P: AsRef<Path>>(path: P, config: DatabaseConfig) -> Result<Arc<Self>> {
let path = path.as_ref().to_path_buf();
let storage = Arc::new(DurableStorage::open_with_policy(
&path,
config.default_index_policy,
config.group_commit,
)?);
let index_registry = Arc::new(TableIndexRegistry::with_default_policy(
config.default_index_policy,
));
let db = Arc::new(Self {
path: path.clone(),
storage,
concurrent_mvcc: None,
catalog: Arc::new(RwLock::new(Catalog::new("sochdb"))),
tables: DashMap::new(),
packed_schemas: DashMap::new(),
index_registry,
config,
stats: DatabaseStats::new(),
shutdown: AtomicU64::new(0),
is_concurrent: false,
});
db.recover()?;
Ok(db)
}
pub fn open_concurrent<P: AsRef<Path>>(path: P) -> Result<Arc<Self>> {
Self::open_concurrent_with_config(path, DatabaseConfig::default())
}
pub fn open_concurrent_with_config<P: AsRef<Path>>(
path: P,
config: DatabaseConfig,
) -> Result<Arc<Self>> {
use crate::mvcc_concurrent::ConcurrentMvcc;
let path = path.as_ref().to_path_buf();
std::fs::create_dir_all(&path)?;
let concurrent_mvcc = Arc::new(ConcurrentMvcc::open(&path)?);
let storage = Arc::new(DurableStorage::open_for_concurrent(&path, config.default_index_policy)?);
let index_registry = Arc::new(TableIndexRegistry::with_default_policy(
config.default_index_policy,
));
let db = Arc::new(Self {
path: path.clone(),
storage,
concurrent_mvcc: Some(concurrent_mvcc),
catalog: Arc::new(RwLock::new(Catalog::new("sochdb"))),
tables: DashMap::new(),
packed_schemas: DashMap::new(),
index_registry,
config,
stats: DatabaseStats::new(),
shutdown: AtomicU64::new(0),
is_concurrent: true,
});
db.recover()?;
if let Some(ref mvcc) = db.concurrent_mvcc {
mvcc.cleanup_stale_readers();
}
Ok(db)
}
#[inline]
pub fn is_concurrent(&self) -> bool {
self.is_concurrent
}
fn recover(&self) -> Result<RecoveryStats> {
self.storage.recover()
}
pub fn path(&self) -> &Path {
&self.path
}
pub fn begin_transaction(&self) -> Result<TxnHandle> {
self.stats
.transactions_started
.fetch_add(1, Ordering::Relaxed);
let txn_id = self.storage.begin_transaction()?;
Ok(TxnHandle {
txn_id,
snapshot_ts: txn_id,
})
}
pub fn begin_read_only(&self) -> Result<TxnHandle> {
self.stats
.transactions_started
.fetch_add(1, Ordering::Relaxed);
let txn_id = self.storage.begin_with_mode(TransactionMode::ReadOnly)?;
Ok(TxnHandle {
txn_id,
snapshot_ts: txn_id,
})
}
pub fn begin_write_only(&self) -> Result<TxnHandle> {
self.stats
.transactions_started
.fetch_add(1, Ordering::Relaxed);
let txn_id = self.storage.begin_with_mode(TransactionMode::WriteOnly)?;
Ok(TxnHandle {
txn_id,
snapshot_ts: txn_id,
})
}
pub fn commit(&self, txn: TxnHandle) -> Result<u64> {
self.stats
.transactions_committed
.fetch_add(1, Ordering::Relaxed);
let _writer_guard = if let Some(ref mvcc) = self.concurrent_mvcc {
Some(mvcc.acquire_writer(std::time::Duration::from_secs(5))?)
} else {
None
};
let commit_ts = self.storage.commit(txn.txn_id)?;
if self.is_concurrent {
self.storage.flush_wal()?;
self.storage.fsync()?;
}
if let Some(ref mvcc) = self.concurrent_mvcc {
mvcc.on_commit();
}
Ok(commit_ts)
}
pub fn abort(&self, txn: TxnHandle) -> Result<()> {
self.stats
.transactions_aborted
.fetch_add(1, Ordering::Relaxed);
self.storage.abort(txn.txn_id)
}
pub fn set_table_index_policy(&self, table: &str, policy: IndexPolicy) {
self.index_registry.configure_table(
TableIndexConfig::new(table, policy)
);
}
pub fn get_table_index_policy(&self, table: &str) -> IndexPolicy {
self.index_registry.get_policy(table)
}
pub fn index_registry(&self) -> &Arc<TableIndexRegistry> {
&self.index_registry
}
pub fn put(&self, txn: TxnHandle, key: &[u8], value: &[u8]) -> Result<()> {
self.stats
.bytes_written
.fetch_add((key.len() + value.len()) as u64, Ordering::Relaxed);
let _writer_guard = if let Some(ref mvcc) = self.concurrent_mvcc {
Some(mvcc.acquire_writer(std::time::Duration::from_secs(5))?)
} else {
None
};
self.storage.write_refs(txn.txn_id, key, value)
}
pub fn put_batch(&self, txn: TxnHandle, writes: &[(&[u8], &[u8])]) -> Result<()> {
let bytes: u64 = writes
.iter()
.map(|(k, v)| (k.len() + v.len()) as u64)
.sum();
self.stats.bytes_written.fetch_add(bytes, Ordering::Relaxed);
let _writer_guard = if let Some(ref mvcc) = self.concurrent_mvcc {
Some(mvcc.acquire_writer(std::time::Duration::from_secs(5))?)
} else {
None
};
self.storage.write_batch_refs(txn.txn_id, writes)
}
pub fn get(&self, txn: TxnHandle, key: &[u8]) -> Result<Option<Vec<u8>>> {
let result = self.storage.read(txn.txn_id, key)?;
if let Some(ref data) = result {
self.stats
.bytes_read
.fetch_add(data.len() as u64, Ordering::Relaxed);
}
Ok(result)
}
pub fn delete(&self, txn: TxnHandle, key: &[u8]) -> Result<()> {
self.storage.delete(txn.txn_id, key.to_vec())
}
pub const MIN_SCAN_PREFIX_LEN: usize = 2;
pub fn scan(&self, txn: TxnHandle, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
if prefix.len() < Self::MIN_SCAN_PREFIX_LEN {
return Err(SochDBError::InvalidArgument(format!(
"Prefix too short: {} bytes (minimum {} required). \
Use scan_unchecked() for unrestricted scans.",
prefix.len(),
Self::MIN_SCAN_PREFIX_LEN
)));
}
self.scan_unchecked(txn, prefix)
}
pub fn scan_unchecked(&self, txn: TxnHandle, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
let results = self.storage.scan(txn.txn_id, prefix)?;
let bytes: u64 = results
.iter()
.map(|(k, v)| (k.len() + v.len()) as u64)
.sum();
self.stats.bytes_read.fetch_add(bytes, Ordering::Relaxed);
Ok(results)
}
pub fn scan_range(
&self,
txn: TxnHandle,
start: &[u8],
end: &[u8],
) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
let results = self.storage.scan_range(txn.txn_id, start, end)?;
let bytes: u64 = results
.iter()
.map(|(k, v)| (k.len() + v.len()) as u64)
.sum();
self.stats.bytes_read.fetch_add(bytes, Ordering::Relaxed);
Ok(results)
}
pub fn scan_range_iter<'a>(
&'a self,
txn: TxnHandle,
start: &'a [u8],
end: &'a [u8],
) -> impl Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a {
let stats = &self.stats;
self.storage
.scan_range_iter(txn.txn_id, start, end)
.map(move |item| {
stats.bytes_read.fetch_add(
(item.0.len() + item.1.len()) as u64,
Ordering::Relaxed,
);
Ok(item)
})
}
pub fn flush(&self) -> Result<()> {
self.storage.fsync()
}
pub fn storage_stats(&self) -> crate::durable_storage::StorageStats {
self.storage.stats()
}
pub fn put_path(&self, txn: TxnHandle, path: &str, value: &[u8]) -> Result<()> {
self.put(txn, path.as_bytes(), value)
}
pub fn get_path(&self, txn: TxnHandle, path: &str) -> Result<Option<Vec<u8>>> {
self.get(txn, path.as_bytes())
}
pub fn delete_path(&self, txn: TxnHandle, path: &str) -> Result<()> {
self.delete(txn, path.as_bytes())
}
pub fn scan_path(&self, txn: TxnHandle, prefix: &str) -> Result<Vec<(String, Vec<u8>)>> {
self.stats.queries_executed.fetch_add(1, Ordering::Relaxed);
let results = self.scan(txn, prefix.as_bytes())?;
Ok(results
.into_iter()
.filter_map(|(k, v)| String::from_utf8(k).ok().map(|path| (path, v)))
.collect())
}
pub fn query(&self, txn: TxnHandle, path_prefix: &str) -> QueryBuilder<'_> {
QueryBuilder::new(self, txn, path_prefix.to_string())
}
pub fn register_table(&self, schema: TableSchema) -> Result<()> {
if self.tables.contains_key(&schema.name) {
return Err(SochDBError::InvalidArgument(format!(
"Table '{}' already exists",
schema.name
)));
}
let packed_schema = Self::to_packed_schema(&schema);
self.packed_schemas
.insert(schema.name.clone(), packed_schema);
self.tables.insert(schema.name.clone(), schema);
Ok(())
}
pub fn get_table_schema(&self, name: &str) -> Option<TableSchema> {
self.tables.get(name).map(|s| s.clone())
}
pub fn list_tables(&self) -> Vec<String> {
self.tables.iter().map(|e| e.key().clone()).collect()
}
fn to_packed_schema(schema: &TableSchema) -> PackedTableSchema {
let columns = schema
.columns
.iter()
.map(|col| PackedColumnDef {
name: col.name.clone(),
col_type: match col.col_type {
ColumnType::Int64 => PackedColumnType::Int64,
ColumnType::UInt64 => PackedColumnType::UInt64,
ColumnType::Float64 => PackedColumnType::Float64,
ColumnType::Text => PackedColumnType::Text,
ColumnType::Binary => PackedColumnType::Binary,
ColumnType::Bool => PackedColumnType::Bool,
},
nullable: col.nullable,
})
.collect();
PackedTableSchema::new(&schema.name, columns)
}
pub fn insert_row(
&self,
txn: TxnHandle,
table: &str,
row_id: u64,
values: &HashMap<String, SochValue>,
) -> Result<()> {
let packed_schema = self
.packed_schemas
.get(table)
.ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
let packed_row = PackedRow::pack(&packed_schema, values);
let key = KeyBuffer::format_row_key(table, row_id);
self.put(txn, key.as_bytes(), packed_row.as_bytes())?;
Ok(())
}
pub fn read_row(
&self,
txn: TxnHandle,
table: &str,
row_id: u64,
columns: Option<&[&str]>,
) -> Result<Option<HashMap<String, SochValue>>> {
let schema = self
.tables
.get(table)
.ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
let key = KeyBuffer::format_row_key(table, row_id);
let bytes = match self.get(txn, key.as_bytes())? {
Some(b) => b,
None => return Ok(None),
};
let packed_schema = self
.packed_schemas
.get(table)
.ok_or_else(|| SochDBError::Internal("Packed schema not found".into()))?;
let packed_row = PackedRow::from_bytes(bytes, packed_schema.num_columns())?;
let cols_to_read: Vec<&str> = match columns {
Some(c) => c.to_vec(),
None => schema.columns.iter().map(|c| c.name.as_str()).collect(),
};
let mut row = HashMap::new();
for col_name in cols_to_read {
if let Some(idx) = packed_schema.column_index(col_name)
&& let Some(col_def) = packed_schema.column(idx)
&& let Some(value) = packed_row.get_column(idx, col_def.col_type)
{
row.insert(col_name.to_string(), value);
}
}
Ok(Some(row))
}
pub fn insert_rows_batch(
&self,
txn: TxnHandle,
table: &str,
rows: &[(u64, HashMap<String, SochValue>)],
) -> Result<usize> {
let packed_schema = self
.packed_schemas
.get(table)
.ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
let mut count = 0;
for (row_id, values) in rows {
let packed_row = PackedRow::pack(&packed_schema, values);
let key = KeyBuffer::format_row_key(table, *row_id);
self.put(txn, key.as_bytes(), packed_row.as_bytes())?;
count += 1;
}
Ok(count)
}
#[inline]
pub fn put_raw(&self, txn: TxnHandle, key: &[u8], value: &[u8]) -> Result<()> {
self.storage.write_refs(txn.txn_id, key, value)
}
#[inline]
pub fn insert_row_slice(
&self,
txn: TxnHandle,
table: &str,
row_id: u64,
values: &[Option<&SochValue>],
) -> Result<()> {
let packed_schema = self
.packed_schemas
.get(table)
.ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
if values.len() != packed_schema.num_columns() {
return Err(SochDBError::InvalidArgument(format!(
"Expected {} columns, got {}",
packed_schema.num_columns(),
values.len()
)));
}
let packed_row = PackedRow::pack_slice(&packed_schema, values);
let key = KeyBuffer::format_row_key(table, row_id);
self.put(txn, key.as_bytes(), packed_row.as_bytes())?;
Ok(())
}
pub fn fsync(&self) -> Result<()> {
self.storage.fsync()
}
pub fn checkpoint(&self) -> Result<u64> {
self.storage.checkpoint()
}
pub fn truncate_wal(&self) -> Result<()> {
self.storage.truncate_wal()
}
pub fn gc(&self) -> usize {
self.storage.gc()
}
pub fn stats(&self) -> Stats {
Stats {
transactions_started: self.stats.transactions_started.load(Ordering::Relaxed),
transactions_committed: self.stats.transactions_committed.load(Ordering::Relaxed),
transactions_aborted: self.stats.transactions_aborted.load(Ordering::Relaxed),
queries_executed: self.stats.queries_executed.load(Ordering::Relaxed),
bytes_written: self.stats.bytes_written.load(Ordering::Relaxed),
bytes_read: self.stats.bytes_read.load(Ordering::Relaxed),
}
}
pub fn shutdown(&self) -> Result<()> {
if self.shutdown.swap(1, Ordering::SeqCst) == 1 {
return Ok(()); }
self.fsync()?;
let marker = self.path.join(".clean_shutdown");
std::fs::write(&marker, b"ok")?;
Ok(())
}
}
impl Drop for Database {
    /// Best-effort flush for handles dropped without a successful `shutdown()`.
    ///
    /// The `.clean_shutdown` marker is written only when the fsync succeeded, so a failed
    /// flush is never recorded as a clean shutdown. Errors are ignored because `drop` cannot
    /// report them.
    fn drop(&mut self) {
        if self.shutdown.load(Ordering::SeqCst) != 0 {
            return;
        }
        if self.fsync().is_ok() {
            let _ = std::fs::write(self.path.join(".clean_shutdown"), b"ok");
        }
    }
}
/// Fluent builder for prefix queries, created by `Database::query`.
pub struct QueryBuilder<'a> {
    /// Database the query runs against.
    db: &'a Database,
    /// Transaction the scan is performed in.
    txn: TxnHandle,
    /// Key prefix; its first `/`-separated segment is treated as the table name.
    path_prefix: String,
    /// Optional column projection.
    columns: Option<Vec<String>>,
    /// Maximum number of rows to return.
    limit: Option<usize>,
    /// Number of leading rows to skip.
    offset: Option<usize>,
}
impl<'a> QueryBuilder<'a> {
fn new(db: &'a Database, txn: TxnHandle, path_prefix: String) -> Self {
Self {
db,
txn,
path_prefix,
columns: None,
limit: None,
offset: None,
}
}
pub fn columns(mut self, cols: &[&str]) -> Self {
self.columns = Some(cols.iter().map(|s| s.to_string()).collect());
self
}
pub fn limit(mut self, n: usize) -> Self {
self.limit = Some(n);
self
}
pub fn offset(mut self, n: usize) -> Self {
self.offset = Some(n);
self
}
pub fn execute(self) -> Result<QueryResult> {
self.db
.stats
.queries_executed
.fetch_add(1, Ordering::Relaxed);
let table_name = self
.path_prefix
.split('/')
.next()
.unwrap_or(&self.path_prefix);
let schema = self.db.tables.get(table_name).map(|s| s.clone());
let results = self.db.scan_path(self.txn, &self.path_prefix)?;
let mut rows: Vec<HashMap<String, SochValue>> = Vec::new();
let mut bytes_read = 0usize;
if let Some(ref schema) = schema {
let packed_schema = self
.db
.packed_schemas
.get(table_name)
.map(|ps| ps.clone())
.unwrap_or_else(|| Database::to_packed_schema(schema));
for (path, value_bytes) in results {
let parts: Vec<&str> = path.split('/').collect();
if parts.len() == 2 {
bytes_read += value_bytes.len();
if let Ok(packed_row) =
PackedRow::from_bytes(value_bytes, packed_schema.num_columns())
{
let mut row = HashMap::new();
if let Some(ref cols) = self.columns {
for col_name in cols {
if let Some(idx) = packed_schema.column_index(col_name)
&& let Some(col_def) = packed_schema.column(idx)
&& let Some(value) =
packed_row.get_column(idx, col_def.col_type)
{
row.insert(col_name.clone(), value);
}
}
} else {
row = packed_row.unpack(&packed_schema);
}
if !row.is_empty() {
rows.push(row);
}
}
}
}
} else {
let mut rows_map: HashMap<String, HashMap<String, SochValue>> = HashMap::new();
for (path, value_bytes) in results {
let parts: Vec<&str> = path.split('/').collect();
if parts.len() >= 3 {
let row_key = format!("{}/{}", parts[0], parts[1]);
let col_name = parts[2..].join("/");
if let Some(ref cols) = self.columns
&& !cols.contains(&col_name)
{
continue;
}
bytes_read += value_bytes.len();
let row = rows_map.entry(row_key).or_default();
row.insert(col_name, deserialize_value(&value_bytes));
}
}
rows = rows_map.into_values().collect();
}
if let Some(offset) = self.offset {
rows = rows.into_iter().skip(offset).collect();
}
if let Some(limit) = self.limit {
rows.truncate(limit);
}
let columns: Vec<String> = self.columns.unwrap_or_else(|| {
rows.iter()
.flat_map(|r| r.keys().cloned())
.collect::<std::collections::HashSet<_>>()
.into_iter()
.collect()
});
Ok(QueryResult {
columns,
rows_scanned: rows.len(),
bytes_read,
rows,
})
}
pub fn to_toon(self) -> Result<String> {
let result = self.execute()?;
Ok(result.to_toon())
}
pub fn execute_iter(self) -> Result<QueryRowIterator> {
self.db
.stats
.queries_executed
.fetch_add(1, Ordering::Relaxed);
let table_name = self
.path_prefix
.split('/')
.next()
.unwrap_or(&self.path_prefix)
.to_string();
let packed_schema = self.db.packed_schemas.get(&table_name).map(|ps| ps.clone());
let results = self.db.scan_path(self.txn, &self.path_prefix)?;
Ok(QueryRowIterator {
results: results.into_iter(),
packed_schema,
columns: self.columns,
offset: self.offset.unwrap_or(0),
limit: self.limit,
yielded: 0,
skipped: 0,
})
}
pub fn as_columnar(self) -> Result<ColumnarQueryResult> {
self.db
.stats
.queries_executed
.fetch_add(1, Ordering::Relaxed);
let table_name = self
.path_prefix
.split('/')
.next()
.unwrap_or(&self.path_prefix);
let schema = self.db.tables.get(table_name).map(|s| s.clone());
let packed_schema = match self.db.packed_schemas.get(table_name) {
Some(ps) => ps.clone(),
None => return Ok(ColumnarQueryResult::empty()),
};
let column_names: Vec<String> = self.columns.clone().unwrap_or_else(|| {
schema
.as_ref()
.map(|s| s.columns.iter().map(|c| c.name.clone()).collect())
.unwrap_or_default()
});
if column_names.is_empty() {
return Ok(ColumnarQueryResult::empty());
}
let mut columns: Vec<CoreTypedColumn> = column_names
.iter()
.map(|col_name| {
packed_schema
.column_index(col_name)
.and_then(|idx| packed_schema.column(idx))
.map(|col_def| match col_def.col_type {
PackedColumnType::Int64 => CoreTypedColumn::new_int64(),
PackedColumnType::UInt64 => CoreTypedColumn::new_uint64(),
PackedColumnType::Float64 => CoreTypedColumn::new_float64(),
PackedColumnType::Text => CoreTypedColumn::new_text(),
PackedColumnType::Binary => CoreTypedColumn::new_binary(),
PackedColumnType::Bool => CoreTypedColumn::new_bool(),
PackedColumnType::Null => CoreTypedColumn::new_text(), })
.unwrap_or_else(CoreTypedColumn::new_text) })
.collect();
let results = self.db.scan_path(self.txn, &self.path_prefix)?;
let mut row_count = 0;
let mut bytes_read = 0;
let mut skipped = 0;
for (path, value_bytes) in results {
let parts: Vec<&str> = path.split('/').collect();
if parts.len() != 2 {
continue;
}
if let Some(offset) = self.offset
&& skipped < offset
{
skipped += 1;
continue;
}
if let Some(limit) = self.limit
&& row_count >= limit
{
break;
}
bytes_read += value_bytes.len();
if let Ok(packed_row) = PackedRow::from_bytes(value_bytes, packed_schema.num_columns())
{
for (col_idx, col_name) in column_names.iter().enumerate() {
if let Some(schema_idx) = packed_schema.column_index(col_name) {
if let Some(col_def) = packed_schema.column(schema_idx) {
let value = packed_row.get_column(schema_idx, col_def.col_type);
push_value_to_typed_column(&mut columns[col_idx], value);
} else {
push_null_to_typed_column(&mut columns[col_idx]);
}
} else {
push_null_to_typed_column(&mut columns[col_idx]);
}
}
row_count += 1;
}
}
Ok(ColumnarQueryResult {
columns: column_names,
data: columns,
row_count,
bytes_read,
})
}
}
/// Lazily decodes rows from a pre-fetched scan, created by `QueryBuilder::execute_iter`.
pub struct QueryRowIterator {
    /// Remaining `(path, value)` entries from the scan.
    results: std::vec::IntoIter<(String, Vec<u8>)>,
    /// Packed layout of the table; `None` yields raw bytes instead of decoded rows.
    packed_schema: Option<PackedTableSchema>,
    /// Optional projection; missing columns yield `SochValue::Null`.
    columns: Option<Vec<String>>,
    /// Number of rows to skip before yielding.
    offset: usize,
    /// Maximum number of rows to yield.
    limit: Option<usize>,
    /// Rows yielded so far.
    yielded: usize,
    /// Rows skipped so far (counts toward `offset`).
    skipped: usize,
}
impl Iterator for QueryRowIterator {
    type Item = Result<Vec<SochValue>>;

    /// Yields the next `table/row_id` entry after `offset` rows, up to `limit` rows.
    ///
    /// The row's shape depends on what is known about the table:
    /// - With a packed schema, the row is decoded, either in full or projected onto
    ///   `columns` (unknown or absent columns become `Null`). A decode failure is yielded as
    ///   `Err` and does not count toward `limit`.
    /// - Without a schema, the raw value bytes are yielded as a single `Binary` value.
    fn next(&mut self) -> Option<Self::Item> {
        if self.limit.is_some_and(|limit| self.yielded >= limit) {
            return None;
        }
        loop {
            let (path, value_bytes) = self.results.next()?;
            // Only `table/row_id` keys are rows; deeper paths are skipped.
            if path.split('/').count() != 2 {
                continue;
            }
            if self.skipped < self.offset {
                self.skipped += 1;
                continue;
            }
            let row = match &self.packed_schema {
                None => vec![SochValue::Binary(value_bytes)],
                Some(schema) => match PackedRow::from_bytes(value_bytes, schema.num_columns()) {
                    Err(e) => return Some(Err(e)),
                    Ok(packed_row) => match &self.columns {
                        None => packed_row.unpack_to_vec(schema),
                        Some(cols) => cols
                            .iter()
                            .map(|name| {
                                // Resolve the index once and reuse it for the lookup.
                                schema
                                    .column_index(name)
                                    .and_then(|idx| {
                                        schema.column(idx).and_then(|col_def| {
                                            packed_row.get_column(idx, col_def.col_type)
                                        })
                                    })
                                    .unwrap_or(SochValue::Null)
                            })
                            .collect(),
                    },
                },
            };
            self.yielded += 1;
            return Some(Ok(row));
        }
    }
}
/// Encodes a value as a one-byte type tag followed by its payload.
///
/// Encoding by variant:
/// - `Null` is `[0]` and `Bool` is `[5, 0|1]`.
/// - Numbers are tags 1–3 followed by little-endian bytes.
/// - Text is tag 4 followed by UTF-8; binary is tag 6 followed by raw bytes.
/// - Other variants are stored as tag 4 with their `Debug` text.
///
/// These tags are the ones `deserialize_value` understands.
// Not called from this file; kept as the encoder paired with `deserialize_value`.
#[allow(dead_code)]
fn serialize_value(value: &SochValue) -> Vec<u8> {
    let tagged = |tag: u8, payload: &[u8]| -> Vec<u8> {
        let mut out = Vec::with_capacity(1 + payload.len());
        out.push(tag);
        out.extend_from_slice(payload);
        out
    };
    match value {
        SochValue::Null => vec![0],
        SochValue::Int(i) => tagged(1, &i.to_le_bytes()),
        SochValue::UInt(u) => tagged(2, &u.to_le_bytes()),
        SochValue::Float(f) => tagged(3, &f.to_le_bytes()),
        SochValue::Text(s) => tagged(4, s.as_bytes()),
        SochValue::Bool(b) => vec![5, u8::from(*b)],
        SochValue::Binary(b) => tagged(6, b),
        other => tagged(4, format!("{:?}", other).as_bytes()),
    }
}
/// Decodes a tag-prefixed value.
///
/// Empty input decodes as `Null`. Numeric tags need 8 payload bytes and `Bool` needs 1; a
/// short payload or an unknown tag falls back to lossy UTF-8 text of the whole input,
/// including the tag byte.
fn deserialize_value(bytes: &[u8]) -> SochValue {
    let Some((&tag, payload)) = bytes.split_first() else {
        return SochValue::Null;
    };
    // First 8 payload bytes, if the payload is long enough for a numeric value.
    let word = payload
        .get(..8)
        .and_then(|w| <[u8; 8]>::try_from(w).ok());
    match (tag, word) {
        (0, _) => SochValue::Null,
        (1, Some(w)) => SochValue::Int(i64::from_le_bytes(w)),
        (2, Some(w)) => SochValue::UInt(u64::from_le_bytes(w)),
        (3, Some(w)) => SochValue::Float(f64::from_le_bytes(w)),
        (4, _) => SochValue::Text(String::from_utf8_lossy(payload).into_owned()),
        (5, _) if !payload.is_empty() => SochValue::Bool(payload[0] != 0),
        (6, _) => SochValue::Binary(payload.to_vec()),
        _ => SochValue::Text(String::from_utf8_lossy(bytes).into_owned()),
    }
}
fn push_value_to_typed_column(col: &mut CoreTypedColumn, value: Option<SochValue>) {
match value {
None => push_null_to_typed_column(col),
Some(v) => match (col, v) {
(
CoreTypedColumn::Int64 {
values,
validity,
stats,
},
SochValue::Int(i),
) => {
values.push(i);
validity.push(true);
stats.update_i64(i);
}
(
CoreTypedColumn::Int64 {
values,
validity,
stats,
},
SochValue::UInt(u),
) => {
values.push(u as i64);
validity.push(true);
stats.update_i64(u as i64);
}
(
CoreTypedColumn::UInt64 {
values,
validity,
stats,
},
SochValue::UInt(u),
) => {
values.push(u);
validity.push(true);
stats.update_i64(u as i64);
}
(
CoreTypedColumn::UInt64 {
values,
validity,
stats,
},
SochValue::Int(i),
) => {
values.push(i as u64);
validity.push(true);
stats.update_i64(i);
}
(
CoreTypedColumn::Float64 {
values,
validity,
stats,
},
SochValue::Float(f),
) => {
values.push(f);
validity.push(true);
stats.update_f64(f);
}
(
CoreTypedColumn::Float64 {
values,
validity,
stats,
},
SochValue::Int(i),
) => {
values.push(i as f64);
validity.push(true);
stats.update_f64(i as f64);
}
(
CoreTypedColumn::Text {
offsets,
data,
validity,
stats,
},
SochValue::Text(s),
) => {
data.extend_from_slice(s.as_bytes());
offsets.push(data.len() as u32);
validity.push(true);
stats.row_count += 1;
}
(
CoreTypedColumn::Binary {
offsets,
data,
validity,
stats,
},
SochValue::Binary(b),
) => {
data.extend_from_slice(&b);
offsets.push(data.len() as u32);
validity.push(true);
stats.row_count += 1;
}
(
CoreTypedColumn::Bool {
values,
validity,
stats,
len,
},
SochValue::Bool(b),
) => {
let idx = *len;
*len += 1;
let num_words = (*len).div_ceil(64);
while values.len() < num_words {
values.push(0);
}
if b {
let word = idx / 64;
let bit = idx % 64;
values[word] |= 1 << bit;
}
validity.push(true);
stats.row_count += 1;
}
(col, _) => push_null_to_typed_column(col),
},
}
}
/// Appends a single null row to `col`.
///
/// Every variant adds one placeholder entry to its value storage, one `false` validity
/// entry, and one `update_null` call on its statistics. The placeholders are:
/// - `0` / `0.0` for numeric columns;
/// - a zero-length span for text and binary columns (the previous end offset repeated,
///   `data` untouched);
/// - an unset bit for boolean columns.
fn push_null_to_typed_column(col: &mut CoreTypedColumn) {
    match col {
        CoreTypedColumn::Int64 {
            values,
            validity,
            stats,
        } => {
            validity.push(false);
            values.push(0);
            stats.update_null();
        }
        CoreTypedColumn::UInt64 {
            values,
            validity,
            stats,
        } => {
            validity.push(false);
            values.push(0);
            stats.update_null();
        }
        CoreTypedColumn::Float64 {
            values,
            validity,
            stats,
        } => {
            validity.push(false);
            values.push(0.0);
            stats.update_null();
        }
        CoreTypedColumn::Text {
            offsets,
            validity,
            stats,
            ..
        } => {
            // Empty span: the null row ends where the previous row ended.
            let prev_end = offsets.last().copied().unwrap_or(0);
            offsets.push(prev_end);
            validity.push(false);
            stats.update_null();
        }
        CoreTypedColumn::Binary {
            offsets,
            validity,
            stats,
            ..
        } => {
            let prev_end = offsets.last().copied().unwrap_or(0);
            offsets.push(prev_end);
            validity.push(false);
            stats.update_null();
        }
        CoreTypedColumn::Bool {
            values,
            validity,
            stats,
            len,
        } => {
            let new_len = *len + 1;
            *len = new_len;
            // Grow the 64-bits-per-word bitmap to cover the new row; its bit is left at 0.
            let words_needed = new_len.div_ceil(64);
            while values.len() < words_needed {
                values.push(0);
            }
            validity.push(false);
            stats.update_null();
        }
    }
}
// Unit tests for `Database`. Each test creates its own temporary directory with
// `tempfile::tempdir()` and opens the database there, so tests do not share on-disk state.
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
/// Opening a database and beginning a transaction yields a nonzero transaction id;
/// aborting the transaction and shutting the database down both succeed.
#[test]
fn test_database_open_close() {
let dir = tempdir().unwrap();
let db = Database::open(dir.path()).unwrap();
let txn = db.begin_transaction().unwrap();
assert!(txn.txn_id > 0);
db.abort(txn).unwrap();
db.shutdown().unwrap();
}
/// A `put` is visible to `get` inside the same transaction before commit, and is still
/// visible from a new transaction after commit.
#[test]
fn test_database_put_get() {
let dir = tempdir().unwrap();
let db = Database::open(dir.path()).unwrap();
let txn = db.begin_transaction().unwrap();
db.put(txn, b"key1", b"value1").unwrap();
let val = db.get(txn, b"key1").unwrap();
assert_eq!(val, Some(b"value1".to_vec()));
db.commit(txn).unwrap();
let txn2 = db.begin_transaction().unwrap();
let val = db.get(txn2, b"key1").unwrap();
assert_eq!(val, Some(b"value1".to_vec()));
db.abort(txn2).unwrap();
}
/// `scan_path` with the prefix `users/1/` returns exactly the two entries written under
/// that prefix (`name`, `email`) and excludes `users/2/name`.
#[test]
fn test_database_path_api() {
let dir = tempdir().unwrap();
let db = Database::open(dir.path()).unwrap();
let txn = db.begin_transaction().unwrap();
db.put_path(txn, "users/1/name", b"Alice").unwrap();
db.put_path(txn, "users/1/email", b"alice@example.com")
.unwrap();
db.put_path(txn, "users/2/name", b"Bob").unwrap();
db.commit(txn).unwrap();
let txn2 = db.begin_transaction().unwrap();
let results = db.scan_path(txn2, "users/1/").unwrap();
assert_eq!(results.len(), 2);
db.abort(txn2).unwrap();
}
/// Registers a two-column `users` schema, inserts row id 1 and commits, then reads the
/// row back from a new transaction. Only the `name` column is asserted; `age` is written
/// but not checked.
#[test]
fn test_database_table_api() {
let dir = tempdir().unwrap();
let db = Database::open(dir.path()).unwrap();
db.register_table(TableSchema {
name: "users".to_string(),
columns: vec![
ColumnDef {
name: "name".to_string(),
col_type: ColumnType::Text,
nullable: false,
},
ColumnDef {
name: "age".to_string(),
col_type: ColumnType::Int64,
nullable: true,
},
],
})
.unwrap();
let txn = db.begin_transaction().unwrap();
let mut values = HashMap::new();
values.insert("name".to_string(), SochValue::Text("Alice".to_string()));
values.insert("age".to_string(), SochValue::Int(30));
db.insert_row(txn, "users", 1, &values).unwrap();
db.commit(txn).unwrap();
let txn2 = db.begin_transaction().unwrap();
// NOTE(review): the trailing `None` presumably means "no column projection / all
// columns" — confirm against `read_row`'s signature.
let row = db.read_row(txn2, "users", 1, None).unwrap();
assert!(row.is_some());
let row = row.unwrap();
assert_eq!(row.get("name"), Some(&SochValue::Text("Alice".to_string())));
db.abort(txn2).unwrap();
}
/// A prefix query over `docs/` (four entries written) with `limit(1)` returns a
/// single row.
#[test]
fn test_database_query_builder() {
let dir = tempdir().unwrap();
let db = Database::open(dir.path()).unwrap();
let txn = db.begin_transaction().unwrap();
db.put_path(txn, "docs/1/title", b"Hello").unwrap();
db.put_path(txn, "docs/1/content", b"World").unwrap();
db.put_path(txn, "docs/2/title", b"Foo").unwrap();
db.put_path(txn, "docs/2/content", b"Bar").unwrap();
db.commit(txn).unwrap();
let txn2 = db.begin_transaction().unwrap();
let result = db.query(txn2, "docs/").limit(1).execute().unwrap();
assert_eq!(result.rows.len(), 1);
db.abort(txn2).unwrap();
}
/// Simulates a crash: the first handle commits a key and is then leaked with
/// `std::mem::forget`. Its `Drop` therefore never runs and no clean shutdown happens.
/// A second handle opened on the same directory must still see the committed value.
#[test]
fn test_database_crash_recovery() {
let dir = tempdir().unwrap();
{
// NOTE(review): `open_without_lock` presumably skips the directory lock so the second
// open below is not blocked by the leaked first handle — confirm.
let db = Database::open_without_lock(dir.path()).unwrap();
// NOTE(review): raw sync mode `2`; presumably the most durable setting (sync on every
// commit) so the write survives without shutdown — confirm against `set_sync_mode`.
db.storage.set_sync_mode(2);
let txn = db.begin_transaction().unwrap();
db.put(txn, b"persist", b"this").unwrap();
db.commit(txn).unwrap();
std::mem::forget(db);
}
{
let db = Database::open_without_lock(dir.path()).unwrap();
let txn = db.begin_transaction().unwrap();
let val = db.get(txn, b"persist").unwrap();
assert_eq!(val, Some(b"this".to_vec()));
db.abort(txn).unwrap();
}
}
}