use parking_lot::RwLock;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use sochdb_core::catalog::Catalog;
use sochdb_core::soch::{SochSchema, SochType, SochValue};
use crate::error::{ClientError, Result};
use crate::{ClientStats, schema::TableDescription};
/// Transaction identifier.
pub type TxnId = u64;
/// Logical commit timestamp.
pub type Timestamp = u64;
/// Row identifier within a table.
pub type RowId = u64;

/// Outcome of an UPDATE/DELETE-style mutation: how many rows changed and
/// which row ids were touched.
#[derive(Debug, Clone, Default)]
pub struct MutationResult {
    /// Number of rows touched by the mutation.
    pub affected_count: usize,
    /// Ids of the touched rows, in visit order.
    pub affected_row_ids: Vec<RowId>,
}

impl MutationResult {
    /// A result describing a mutation that touched no rows.
    pub fn empty() -> Self {
        MutationResult::default()
    }

    /// A result for `affected_count` rows with the given ids.
    pub fn new(affected_count: usize, affected_row_ids: Vec<RowId>) -> Self {
        MutationResult {
            affected_count,
            affected_row_ids,
        }
    }
}
/// A resolved reference to one column of a registered table.
#[derive(Debug, Clone)]
pub struct ColumnRef {
/// Positional column id assigned at registration time.
pub id: u32,
/// Column name as declared in the schema.
pub name: String,
/// Storage type of the column's values.
pub field_type: FieldType,
}
/// Primitive storage types supported by the columnar store.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FieldType {
UInt64,
Int64,
Float64,
Text,
Bool,
Bytes,
}
/// Result of resolving a dotted path against the table catalog.
#[derive(Debug, Clone)]
pub enum PathResolution {
/// Path named a whole table: its schema plus every column ref.
Array {
schema: Arc<ArraySchema>,
columns: Vec<ColumnRef>,
},
/// Path named a single column.
Value(ColumnRef),
/// Path matched only a prefix; `remaining` is the unresolved suffix.
Partial { remaining: String },
/// Path matched nothing.
NotFound,
}
/// Flat schema description: field names and their types, index-aligned.
#[derive(Debug, Clone)]
pub struct ArraySchema {
pub name: String,
pub fields: Vec<String>,
pub types: Vec<FieldType>,
}
/// In-memory stand-in for a WAL-backed storage manager. It only hands out
/// transaction ids and provides NO durability whatsoever (see the
/// deprecation note for the production replacement).
#[deprecated(
since = "0.2.0",
note = "Use DurableConnection or sochdb_storage::WalStorageManager for production. \
This stub provides NO durability. REMOVAL SCHEDULED: v0.3.0"
)]
pub struct WalStorageManager {
/// Next transaction id to hand out; starts at 1.
next_txn_id: AtomicU64,
}
#[allow(deprecated)]
impl Default for WalStorageManager {
fn default() -> Self {
Self::new()
}
}
#[allow(deprecated)]
impl WalStorageManager {
pub fn new() -> Self {
Self {
next_txn_id: AtomicU64::new(1),
}
}
pub fn begin_txn(&self) -> Result<TxnId> {
Ok(self.next_txn_id.fetch_add(1, Ordering::SeqCst))
}
pub fn commit(&self, _txn_id: TxnId) -> Result<Timestamp> {
Ok(self.next_txn_id.load(Ordering::SeqCst))
}
pub fn abort(&self, _txn_id: TxnId) -> Result<()> {
Ok(())
}
}
/// In-memory stand-in for a transaction manager. It only advances a shared
/// timestamp counter and enforces no isolation (see the deprecation note).
#[deprecated(
since = "0.2.0",
note = "Use DurableConnection or sochdb_storage::MvccTransactionManager for production. \
This stub has NO MVCC semantics. REMOVAL SCHEDULED: v0.3.0"
)]
pub struct TransactionManager {
/// Monotonic logical clock shared by begin/commit; starts at 1.
current_ts: AtomicU64,
}
#[allow(deprecated)]
impl Default for TransactionManager {
fn default() -> Self {
Self::new()
}
}
#[allow(deprecated)]
impl TransactionManager {
pub fn new() -> Self {
Self {
current_ts: AtomicU64::new(1),
}
}
pub fn begin(&self) -> (TxnId, Timestamp) {
let ts = self.current_ts.fetch_add(1, Ordering::SeqCst);
(ts, ts)
}
pub fn current_timestamp(&self) -> Timestamp {
self.current_ts.load(Ordering::SeqCst)
}
pub fn commit(&self, _txn_id: TxnId) -> Result<Timestamp> {
let ts = self.current_ts.fetch_add(1, Ordering::SeqCst);
Ok(ts)
}
pub fn abort(&self, _txn_id: TxnId) -> Result<()> {
Ok(())
}
}
/// Standard Bloom filter over byte keys, using double hashing derived from
/// a single FNV-1a pass (Kirsch–Mitzenmacher scheme: probe_i = h1 + i*h2).
pub struct BloomFilter {
    /// Bit array packed into 64-bit words.
    bits: Vec<u64>,
    /// Total number of addressable bits (always >= 64).
    num_bits: usize,
    /// Number of probe positions per key (always >= 1).
    num_hashes: u8,
}

impl BloomFilter {
    /// Sizes the filter for `expected_elements` keys at roughly `fp_rate`
    /// false positives, via m = -n*ln(p)/ln(2)^2 and k = (m/n)*ln(2).
    pub fn new(expected_elements: usize, fp_rate: f64) -> Self {
        let ln2 = 2.0_f64.ln();
        let raw_bits =
            (-(expected_elements as f64) * fp_rate.ln() / (ln2 * ln2)).ceil() as usize;
        // Clamp to sane minimums so degenerate inputs still work.
        let num_bits = raw_bits.max(64);
        let num_words = num_bits.div_ceil(64);
        let k = ((num_bits as f64 / expected_elements as f64) * ln2).ceil() as u8;
        BloomFilter {
            bits: vec![0u64; num_words],
            num_bits,
            num_hashes: k.max(1),
        }
    }

    /// Word index and bit mask of the i-th probe for hash pair (h1, h2).
    fn probe(&self, h1: u64, h2: u64, i: u64) -> (usize, u64) {
        let hash = h1.wrapping_add(i.wrapping_mul(h2));
        let bit = (hash % self.num_bits as u64) as usize;
        (bit / 64, 1u64 << (bit % 64))
    }

    /// Sets every probe bit for `key`.
    pub fn insert(&mut self, key: &[u8]) {
        let (h1, h2) = self.hash_key(key);
        for i in 0..u64::from(self.num_hashes) {
            let (word, mask) = self.probe(h1, h2, i);
            self.bits[word] |= mask;
        }
    }

    /// True when every probe bit for `key` is set. May report false
    /// positives; never false negatives.
    pub fn may_contain(&self, key: &[u8]) -> bool {
        let (h1, h2) = self.hash_key(key);
        (0..u64::from(self.num_hashes)).all(|i| {
            let (word, mask) = self.probe(h1, h2, i);
            self.bits[word] & mask != 0
        })
    }

    /// FNV-1a hash of `key` (h1) plus a derived second hash (h2).
    fn hash_key(&self, key: &[u8]) -> (u64, u64) {
        const FNV_OFFSET_BASIS: u64 = 0xcbf29ce484222325;
        const FNV_PRIME: u64 = 0x00000100000001B3;
        let mut h1 = FNV_OFFSET_BASIS;
        for &byte in key {
            h1 ^= u64::from(byte);
            h1 = h1.wrapping_mul(FNV_PRIME);
        }
        let mut h2 = h1.rotate_left(17);
        h2 ^= h1;
        h2 = h2.wrapping_mul(FNV_PRIME);
        (h1, h2)
    }
}
/// One key/value record inside an (in-memory) SSTable.
#[derive(Clone)]
pub struct SstEntry {
pub key: Vec<u8>,
pub value: Vec<u8>,
/// Write timestamp used to pick the newest version during compaction.
pub timestamp: u64,
/// Tombstone marker; deleted entries carry an empty value.
pub deleted: bool, }
/// Immutable sorted run of entries with a bloom filter and key-range bounds.
#[allow(dead_code)]
pub struct SSTable {
/// Entries sorted by key (the flush source is a BTreeMap, so order holds).
entries: Vec<SstEntry>,
bloom: BloomFilter,
min_key: Vec<u8>,
max_key: Vec<u8>,
level: usize,
seq_num: u64,
}
/// Either an in-memory SSTable or a handle to one persisted on disk via
/// `sochdb_storage`.
enum SstHandle {
InMemory(SSTable),
OnDisk {
reader: sochdb_storage::SSTable,
/// Key bounds cached here so range checks avoid touching the reader.
min_key: Vec<u8>,
max_key: Vec<u8>,
level: usize,
seq_num: u64,
},
}
impl SSTable {
pub fn from_entries(entries: Vec<SstEntry>, level: usize, seq_num: u64) -> Option<Self> {
if entries.is_empty() {
return None;
}
let mut bloom = BloomFilter::new(entries.len(), 0.01); for entry in &entries {
bloom.insert(&entry.key);
}
let min_key = entries.first()?.key.clone();
let max_key = entries.last()?.key.clone();
Some(Self {
entries,
bloom,
min_key,
max_key,
level,
seq_num,
})
}
pub fn key_in_range(&self, key: &[u8]) -> bool {
key >= self.min_key.as_slice() && key <= self.max_key.as_slice()
}
pub fn get(&self, key: &[u8]) -> Option<&SstEntry> {
if !self.bloom.may_contain(key) {
return None;
}
match self.entries.binary_search_by(|e| e.key.as_slice().cmp(key)) {
Ok(idx) => Some(&self.entries[idx]),
Err(_) => None,
}
}
}
/// One level of the LSM tree: its SSTables plus the size threshold that
/// triggers compaction into the next level.
struct Level {
sstables: Vec<SstHandle>,
target_size: u64,
}
impl Level {
fn new(target_size: u64) -> Self {
Self {
sstables: Vec::new(),
target_size,
}
}
/// Approximate total byte size of the level: raw key+value bytes for
/// in-memory tables, reported file size for on-disk ones.
fn total_size(&self) -> u64 {
self.sstables
.iter()
.map(|sst| match sst {
SstHandle::InMemory(s) => s.entries
.iter()
.map(|e| e.key.len() + e.value.len())
.sum::<usize>() as u64,
SstHandle::OnDisk { reader, .. } => {
reader.metadata().file_size
}
})
.sum()
}
}
/// A single memtable record; tombstones keep an empty value.
struct MemTableEntry {
value: Vec<u8>,
timestamp: u64,
deleted: bool,
}
/// In-memory LSM-style storage engine: active memtable -> immutable
/// memtables -> leveled SSTables, with an in-memory WAL used for
/// verification and replay.
pub struct LscsStorage {
/// Active write buffer, sorted by key.
memtable: parking_lot::RwLock<std::collections::BTreeMap<Vec<u8>, MemTableEntry>>,
/// Approximate byte size of the active memtable (keys + values + overhead).
memtable_size: std::sync::atomic::AtomicU64,
/// Memtables swapped out but not yet flushed to SSTables.
immutable_memtables:
parking_lot::RwLock<Vec<std::collections::BTreeMap<Vec<u8>, MemTableEntry>>>,
/// LSM levels; index 0 receives fresh flushes.
levels: parking_lot::RwLock<Vec<Level>>,
/// SSTable sequence number / page-id allocator.
next_seq: std::sync::atomic::AtomicU64,
/// Next log sequence number to assign to a WAL entry.
current_lsn: std::sync::atomic::AtomicU64,
/// LSN recorded by the most recent checkpoint (0 = never checkpointed).
checkpoint_lsn: std::sync::atomic::AtomicU64,
/// In-memory write-ahead log, append-only in LSN order.
wal_entries: parking_lot::RwLock<Vec<WalEntry>>,
stats: LscsStats,
config: LscsConfig,
}
/// One logical WAL record with a CRC32 over key/value/timestamp.
struct WalEntry {
lsn: u64,
key: Vec<u8>,
value: Vec<u8>,
op: WalOp,
/// Wall-clock microseconds at write time.
timestamp: u64,
checksum: u32,
}
/// WAL operation kind.
#[derive(Clone, Copy)]
enum WalOp {
Put,
Delete,
}
/// Tuning knobs for the LSM storage engine.
struct LscsConfig {
/// Memtable byte size that triggers a flush to L0.
memtable_flush_size: u64,
/// Target byte size of level 0.
l0_target_size: u64,
/// Multiplicative growth factor between consecutive levels.
level_size_ratio: u64,
max_levels: usize,
/// When set, flushed SSTables are persisted under this directory;
/// otherwise everything stays in memory.
data_dir: Option<PathBuf>,
}
impl Default for LscsConfig {
fn default() -> Self {
Self {
memtable_flush_size: 4 * 1024 * 1024, l0_target_size: 64 * 1024 * 1024, level_size_ratio: 10,
max_levels: 7,
data_dir: None,
}
}
}
/// Operation counters, updated with Relaxed ordering (approximate under
/// concurrency; fine for diagnostics).
struct LscsStats {
writes: std::sync::atomic::AtomicU64,
reads: std::sync::atomic::AtomicU64,
bloom_filter_hits: std::sync::atomic::AtomicU64,
bloom_filter_misses: std::sync::atomic::AtomicU64,
}
/// Summary returned by `verify_wal`: totals plus per-entry checksum failures.
#[derive(Debug, Clone)]
pub struct WalVerifyResult {
pub total_entries: u64,
pub valid_entries: u64,
pub corrupted_entries: u64,
/// Highest LSN whose checksum verified.
pub last_valid_lsn: u64,
pub checksum_errors: Vec<ChecksumErr>,
}
/// One checksum mismatch found during WAL verification.
#[derive(Debug, Clone)]
pub struct ChecksumErr {
pub lsn: u64,
/// CRC32 values are u32 internally, widened to u64 for reporting.
pub expected: u64,
pub actual: u64,
/// "PUT" or "DELETE".
pub entry_type: String,
}
/// Size/extent statistics over the in-memory WAL.
#[derive(Debug, Clone)]
pub struct WalStatsData {
pub total_size_bytes: u64,
/// Bytes held by entries newer than the last checkpoint.
pub active_size_bytes: u64,
pub archived_size_bytes: u64,
pub oldest_entry_lsn: u64,
pub newest_entry_lsn: u64,
pub entry_count: u64,
}
impl Default for LscsStorage {
fn default() -> Self {
Self::new()
}
}
impl LscsStorage {
pub fn new() -> Self {
Self::with_config(LscsConfig::default())
}
pub fn with_data_dir(data_dir: impl AsRef<Path>) -> Result<Self> {
let data_dir = data_dir.as_ref().to_path_buf();
std::fs::create_dir_all(&data_dir)
.map_err(|e| ClientError::Storage(format!("Failed to create data dir: {}", e)))?;
let mut config = LscsConfig::default();
config.data_dir = Some(data_dir);
Ok(Self::with_config(config))
}
fn with_config(config: LscsConfig) -> Self {
let mut levels = Vec::with_capacity(config.max_levels);
for i in 0..config.max_levels {
let target = if i == 0 {
config.l0_target_size
} else {
config.l0_target_size * config.level_size_ratio.pow(i as u32)
};
levels.push(Level::new(target));
}
Self {
memtable: parking_lot::RwLock::new(std::collections::BTreeMap::new()),
memtable_size: std::sync::atomic::AtomicU64::new(0),
immutable_memtables: parking_lot::RwLock::new(Vec::new()),
levels: parking_lot::RwLock::new(levels),
next_seq: std::sync::atomic::AtomicU64::new(1),
current_lsn: std::sync::atomic::AtomicU64::new(1),
checkpoint_lsn: std::sync::atomic::AtomicU64::new(0),
wal_entries: parking_lot::RwLock::new(Vec::new()),
stats: LscsStats {
writes: std::sync::atomic::AtomicU64::new(0),
reads: std::sync::atomic::AtomicU64::new(0),
bloom_filter_hits: std::sync::atomic::AtomicU64::new(0),
bloom_filter_misses: std::sync::atomic::AtomicU64::new(0),
},
config,
}
}
pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
use std::sync::atomic::Ordering;
let lsn = self.current_lsn.fetch_add(1, Ordering::SeqCst);
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_micros() as u64;
let checksum = Self::compute_checksum(key, value, timestamp);
{
let mut wal = self.wal_entries.write();
wal.push(WalEntry {
lsn,
key: key.to_vec(),
value: value.to_vec(),
op: WalOp::Put,
timestamp,
checksum,
});
}
let entry_size = key.len() + value.len() + 24; {
let mut memtable = self.memtable.write();
memtable.insert(
key.to_vec(),
MemTableEntry {
value: value.to_vec(),
timestamp,
deleted: false,
},
);
}
let new_size = self
.memtable_size
.fetch_add(entry_size as u64, Ordering::SeqCst)
+ entry_size as u64;
self.stats.writes.fetch_add(1, Ordering::Relaxed);
if new_size >= self.config.memtable_flush_size {
self.maybe_flush_memtable()?;
}
Ok(())
}
pub fn delete(&self, key: &[u8]) -> Result<()> {
use std::sync::atomic::Ordering;
let lsn = self.current_lsn.fetch_add(1, Ordering::SeqCst);
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_micros() as u64;
let checksum = Self::compute_checksum(key, &[], timestamp);
{
let mut wal = self.wal_entries.write();
wal.push(WalEntry {
lsn,
key: key.to_vec(),
value: Vec::new(),
op: WalOp::Delete,
timestamp,
checksum,
});
}
{
let mut memtable = self.memtable.write();
memtable.insert(
key.to_vec(),
MemTableEntry {
value: Vec::new(),
timestamp,
deleted: true,
},
);
}
self.memtable_size
.fetch_add(key.len() as u64 + 24, std::sync::atomic::Ordering::SeqCst);
Ok(())
}
/// Point lookup following LSM read order: active memtable, then immutable
/// memtables (newest first), then SSTable levels top-down. Returns
/// `Ok(None)` for missing or deleted keys. The `_table` parameter is
/// currently unused — all keys share one keyspace.
pub fn get(&self, _table: &str, key: &[u8]) -> Result<Option<Vec<u8>>> {
use std::sync::atomic::Ordering;
self.stats.reads.fetch_add(1, Ordering::Relaxed);
{
let memtable = self.memtable.read();
if let Some(entry) = memtable.get(key) {
// A tombstone here shadows any older versions below.
if entry.deleted {
return Ok(None); }
return Ok(Some(entry.value.clone()));
}
}
{
// Later immutable memtables are newer, hence the reverse iteration.
let immutables = self.immutable_memtables.read();
for memtable in immutables.iter().rev() {
if let Some(entry) = memtable.get(key) {
if entry.deleted {
return Ok(None);
}
return Ok(Some(entry.value.clone()));
}
}
}
{
let levels = self.levels.read();
for level in levels.iter() {
// Within a level, SSTables pushed later are newer; consult them first.
for sst in level.sstables.iter().rev() {
match sst {
SstHandle::InMemory(s) => {
if s.key_in_range(key) {
if let Some(entry) = s.get(key) {
if entry.deleted {
return Ok(None);
}
self.stats.bloom_filter_hits.fetch_add(1, Ordering::Relaxed);
return Ok(Some(entry.value.clone()));
} else {
self.stats
.bloom_filter_misses
.fetch_add(1, Ordering::Relaxed);
}
}
}
SstHandle::OnDisk { reader, min_key, max_key, .. } => {
if key >= min_key.as_slice() && key <= max_key.as_slice() {
let opts = sochdb_storage::ReadOptions::default();
match reader.get(key, &opts) {
Ok(Some(value)) => {
// On-disk tombstones are stored as empty values.
if value.is_empty() {
return Ok(None);
}
self.stats.bloom_filter_hits.fetch_add(1, Ordering::Relaxed);
return Ok(Some(value));
}
Ok(None) => {
self.stats
.bloom_filter_misses
.fetch_add(1, Ordering::Relaxed);
}
// NOTE(review): read errors are swallowed and the search
// falls through to older tables — confirm this best-effort
// behavior is intended rather than surfacing the error.
Err(_) => {
continue;
}
}
}
}
}
}
}
}
Ok(None)
}
/// Swaps out the active memtable and writes its contents as a new L0
/// SSTable — on disk when a data dir is configured, in memory otherwise.
/// Triggers compaction afterwards if L0 exceeds its target size.
fn maybe_flush_memtable(&self) -> Result<()> {
use std::sync::atomic::Ordering;
// Swap the active memtable for an empty one under the write lock, then
// build the SSTable outside the lock.
let old_memtable = {
let mut memtable = self.memtable.write();
let old = std::mem::take(&mut *memtable);
self.memtable_size.store(0, Ordering::SeqCst);
old
};
if old_memtable.is_empty() {
return Ok(());
}
// BTreeMap iteration yields keys in sorted order, as SSTables require.
let entries: Vec<SstEntry> = old_memtable
.into_iter()
.map(|(key, entry)| SstEntry {
key,
value: entry.value,
timestamp: entry.timestamp,
deleted: entry.deleted,
})
.collect();
let seq = self.next_seq.fetch_add(1, Ordering::SeqCst);
let level = 0usize;
let sst_handle = if let Some(ref data_dir) = self.config.data_dir {
let sst_path = data_dir.join(format!("L{}_{:08}.sst", level, seq));
let opts = sochdb_storage::SSTableBuilderOptions::default();
let mut builder = sochdb_storage::SSTableBuilder::new(&sst_path, opts)
.map_err(|e| ClientError::Storage(format!("SSTable create failed: {}", e)))?;
builder.set_estimated_keys(entries.len());
for entry in &entries {
// Tombstones are persisted as empty values.
let val = if entry.deleted { &[][..] } else { &entry.value[..] };
builder.add(&entry.key, val)
.map_err(|e| ClientError::Storage(format!("SSTable write failed: {}", e)))?;
}
let result = builder.finish()
.map_err(|e| ClientError::Storage(format!("SSTable finish failed: {}", e)))?;
let min_key = result.smallest_key.clone().unwrap_or_default();
let max_key = result.largest_key.clone().unwrap_or_default();
let reader = sochdb_storage::SSTable::open(&sst_path)
.map_err(|e| ClientError::Storage(format!("SSTable open failed: {}", e)))?;
SstHandle::OnDisk {
reader,
min_key,
max_key,
level,
seq_num: seq,
}
} else {
match SSTable::from_entries(entries, level, seq) {
Some(sst) => SstHandle::InMemory(sst),
// Unreachable in practice: emptiness was checked above.
None => return Ok(()),
}
};
let mut levels = self.levels.write();
levels[0].sstables.push(sst_handle);
if levels[0].total_size() > levels[0].target_size {
// Release the levels lock before compacting; maybe_compact retakes it.
drop(levels);
self.maybe_compact()?;
}
Ok(())
}
/// Single-pass compaction: every over-target level is fully merged into
/// the level below it. Note the merged output is always rebuilt as an
/// in-memory SSTable, even when the inputs were on-disk tables.
fn maybe_compact(&self) -> Result<()> {
use std::sync::atomic::Ordering;
let mut levels = self.levels.write();
for i in 0..levels.len() - 1 {
if levels[i].total_size() > levels[i].target_size {
if levels[i].sstables.is_empty() {
continue;
}
let mut all_entries: Vec<SstEntry> = Vec::new();
for sst in levels[i].sstables.drain(..) {
Self::drain_sst_entries(sst, &mut all_entries);
}
for sst in levels[i + 1].sstables.drain(..) {
Self::drain_sst_entries(sst, &mut all_entries);
}
// Sort by key with the newest timestamp first, so the following
// dedup (which drops the later duplicate) keeps the newest
// version of each key.
all_entries.sort_by(|a, b| match a.key.cmp(&b.key) {
std::cmp::Ordering::Equal => b.timestamp.cmp(&a.timestamp),
other => other,
});
all_entries.dedup_by(|a, b| a.key == b.key);
// Tombstones can be discarded once they reach the bottom level —
// there is nothing older left to shadow.
if i + 1 == levels.len() - 1 {
all_entries.retain(|e| !e.deleted);
}
let seq = self.next_seq.fetch_add(1, Ordering::SeqCst);
if let Some(sst) = SSTable::from_entries(all_entries, i + 1, seq) {
levels[i + 1].sstables.push(SstHandle::InMemory(sst));
}
}
}
Ok(())
}
/// Moves every entry out of `sst` into `out`, reading on-disk tables via
/// their iterator. Used by compaction to gather a level's contents.
fn drain_sst_entries(sst: SstHandle, out: &mut Vec<SstEntry>) {
match sst {
SstHandle::InMemory(s) => {
out.extend(s.entries);
}
SstHandle::OnDisk { reader, seq_num, .. } => {
let mut iter = reader.iter();
while iter.valid() {
if let (Some(key), Some(val)) = (iter.key(), iter.value()) {
// The on-disk format keeps no per-entry timestamp, so the
// table's sequence number stands in as a coarse version;
// empty values are tombstones.
let deleted = val.is_empty(); out.push(SstEntry {
key: key.to_vec(),
value: val.to_vec(),
timestamp: seq_num, deleted,
});
}
iter.next();
}
}
}
}
fn compute_checksum(key: &[u8], value: &[u8], timestamp: u64) -> u32 {
let mut hasher = crc32fast::Hasher::new();
hasher.update(key);
hasher.update(value);
hasher.update(×tamp.to_le_bytes());
hasher.finalize()
}
pub fn allocate_page(&self) -> Result<u64> {
Ok(self
.next_seq
.fetch_add(1, std::sync::atomic::Ordering::SeqCst))
}
pub fn fsync(&self) -> Result<()> {
self.maybe_flush_memtable()?;
Ok(())
}
pub fn needs_recovery(&self) -> bool {
let wal = self.wal_entries.read();
let checkpoint = self
.checkpoint_lsn
.load(std::sync::atomic::Ordering::SeqCst);
wal.iter().any(|e| e.lsn > checkpoint)
}
pub fn last_checkpoint_lsn(&self) -> u64 {
self.checkpoint_lsn
.load(std::sync::atomic::Ordering::SeqCst)
}
pub fn current_lsn(&self) -> u64 {
self.current_lsn.load(std::sync::atomic::Ordering::SeqCst)
}
/// Recomputes each WAL entry's checksum, tallying valid vs corrupted
/// entries. Read-only: nothing is repaired or removed.
pub fn verify_wal(&self) -> Result<WalVerifyResult> {
let wal = self.wal_entries.read();
let mut valid = 0u64;
let mut corrupted = 0u64;
let mut errors = Vec::new();
let mut last_valid_lsn = 0u64;
for entry in wal.iter() {
let expected = Self::compute_checksum(&entry.key, &entry.value, entry.timestamp);
if expected == entry.checksum {
valid += 1;
// Entries are appended in LSN order, so this ends up as the
// highest verified LSN.
last_valid_lsn = entry.lsn;
} else {
corrupted += 1;
errors.push(ChecksumErr {
lsn: entry.lsn,
// u32 CRCs widened to u64 for the report struct.
expected: expected as u64,
actual: entry.checksum as u64,
entry_type: match entry.op {
WalOp::Put => "PUT".to_string(),
WalOp::Delete => "DELETE".to_string(),
},
});
}
}
Ok(WalVerifyResult {
total_entries: wal.len() as u64,
valid_entries: valid,
corrupted_entries: corrupted,
last_valid_lsn,
checksum_errors: errors,
})
}
pub fn replay_wal_from_checkpoint(&self) -> Result<u64> {
let checkpoint = self
.checkpoint_lsn
.load(std::sync::atomic::Ordering::SeqCst);
let wal = self.wal_entries.read();
let mut replayed = 0u64;
for entry in wal.iter() {
if entry.lsn > checkpoint {
let expected = Self::compute_checksum(&entry.key, &entry.value, entry.timestamp);
if expected == entry.checksum {
let mut memtable = self.memtable.write();
match entry.op {
WalOp::Put => {
memtable.insert(
entry.key.clone(),
MemTableEntry {
value: entry.value.clone(),
timestamp: entry.timestamp,
deleted: false,
},
);
}
WalOp::Delete => {
memtable.insert(
entry.key.clone(),
MemTableEntry {
value: Vec::new(),
timestamp: entry.timestamp,
deleted: true,
},
);
}
}
replayed += 1;
}
}
}
Ok(replayed)
}
pub fn force_checkpoint(&self) -> Result<u64> {
self.maybe_flush_memtable()?;
let current = self.current_lsn.load(std::sync::atomic::Ordering::SeqCst);
self.checkpoint_lsn
.store(current, std::sync::atomic::Ordering::SeqCst);
Ok(current)
}
pub fn truncate_wal(&self, up_to_lsn: u64) -> Result<u64> {
let mut wal = self.wal_entries.write();
let before_len = wal.len();
wal.retain(|e| e.lsn > up_to_lsn);
let removed = before_len - wal.len();
Ok(removed as u64)
}
/// Size and extent statistics over the in-memory WAL. The per-entry
/// 32-byte constant approximates the fixed record overhead
/// (lsn/op/timestamp/checksum).
pub fn wal_stats(&self) -> WalStatsData {
let wal = self.wal_entries.read();
let total_size: u64 = wal
.iter()
.map(|e| (e.key.len() + e.value.len() + 32) as u64)
.sum();
let checkpoint = self
.checkpoint_lsn
.load(std::sync::atomic::Ordering::SeqCst);
// "Active" = not yet covered by a checkpoint; still needed for recovery.
let active_size: u64 = wal
.iter()
.filter(|e| e.lsn > checkpoint)
.map(|e| (e.key.len() + e.value.len() + 32) as u64)
.sum();
WalStatsData {
total_size_bytes: total_size,
active_size_bytes: active_size,
archived_size_bytes: total_size.saturating_sub(active_size),
oldest_entry_lsn: wal.first().map(|e| e.lsn).unwrap_or(0),
newest_entry_lsn: wal.last().map(|e| e.lsn).unwrap_or(0),
entry_count: wal.len() as u64,
}
}
/// Inclusive range scan over `[start_key, end_key]` merging all layers.
/// The newest version of each key wins, tombstones are filtered from the
/// output, and at most `limit` pairs are returned in key order.
pub fn scan(
&self,
start_key: &[u8],
end_key: &[u8],
limit: usize,
) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
use std::collections::BTreeMap;
// key -> (value, timestamp, deleted); BTreeMap keeps output key-ordered.
let mut results: BTreeMap<Vec<u8>, (Vec<u8>, u64, bool)> = BTreeMap::new();
{
// The active memtable is the newest layer; later layers only replace
// an entry when their timestamp is strictly newer.
let memtable = self.memtable.read();
for (key, entry) in memtable.range(start_key.to_vec()..=end_key.to_vec()) {
results.insert(
key.clone(),
(entry.value.clone(), entry.timestamp, entry.deleted),
);
}
}
{
let immutables = self.immutable_memtables.read();
for memtable in immutables.iter() {
for (key, entry) in memtable.range(start_key.to_vec()..=end_key.to_vec()) {
results
.entry(key.clone())
.and_modify(|e| {
if entry.timestamp > e.1 {
*e = (entry.value.clone(), entry.timestamp, entry.deleted);
}
})
.or_insert_with(|| (entry.value.clone(), entry.timestamp, entry.deleted));
}
}
}
{
let levels = self.levels.read();
for level in levels.iter() {
for sst in &level.sstables {
match sst {
SstHandle::InMemory(s) => {
for entry in &s.entries {
if entry.key >= start_key.to_vec() && entry.key <= end_key.to_vec() {
results
.entry(entry.key.clone())
.and_modify(|e| {
if entry.timestamp > e.1 {
*e = (entry.value.clone(), entry.timestamp, entry.deleted);
}
})
.or_insert_with(|| {
(entry.value.clone(), entry.timestamp, entry.deleted)
});
}
}
}
SstHandle::OnDisk { reader, min_key, max_key, .. } => {
// Skip tables whose key range cannot intersect the scan.
if max_key.as_slice() < start_key || min_key.as_slice() > end_key {
continue;
}
let mut sst_iter = reader.iter();
sst_iter.seek(start_key);
while sst_iter.valid() {
let key = match sst_iter.key() {
Some(k) => k,
None => break,
};
if key > end_key {
break;
}
// An empty value encodes a tombstone on disk.
let val = sst_iter.value().unwrap_or(&[]);
let deleted = val.is_empty();
let key_vec = key.to_vec();
let val_vec = val.to_vec();
// On-disk entries carry no timestamp, so they are recorded
// with timestamp 0 and never overwrite an existing entry —
// the and_modify body is intentionally empty, letting any
// in-memory version win. NOTE(review): when a key exists in
// several on-disk tables, the first table visited wins;
// confirm that matches the intended level precedence.
results
.entry(key_vec)
.and_modify(|e: &mut (Vec<u8>, u64, bool)| {
})
.or_insert_with(|| (val_vec, 0, deleted));
sst_iter.next();
}
}
}
}
}
}
// Drop tombstones, then apply the limit.
let result: Vec<(Vec<u8>, Vec<u8>)> = results
.into_iter()
.filter(|(_, (_, _, deleted))| !deleted)
.take(limit)
.map(|(k, (v, _, _))| (k, v))
.collect();
Ok(result)
}
/// Forces a memtable flush and returns the memtable's approximate byte
/// size prior to the flush (a proxy for how much data was written out).
pub fn flush(&self) -> Result<usize> {
use std::sync::atomic::Ordering;
let memtable_size = self.memtable_size.load(Ordering::SeqCst) as usize;
self.maybe_flush_memtable()?;
Ok(memtable_size)
}
/// Flushes, then runs compaction, reporting rough before/after deltas in
/// bytes and SSTable counts.
pub fn compact(&self) -> Result<CompactionMetrics> {
let flushed_bytes = self.flush()? as u64;
let levels = self.levels.read();
let pre_files: usize = levels.iter().map(|l| l.sstables.len()).sum();
let pre_bytes: u64 = levels.iter().map(|l| l.total_size()).sum();
// Release the read lock before compaction takes the write lock.
drop(levels);
self.maybe_compact()?;
let levels = self.levels.read();
let post_files: usize = levels.iter().map(|l| l.sstables.len()).sum();
let post_bytes: u64 = levels.iter().map(|l| l.total_size()).sum();
Ok(CompactionMetrics {
bytes_compacted: Some(flushed_bytes + pre_bytes.saturating_sub(post_bytes)),
files_merged: Some(pre_files.saturating_sub(post_files)),
})
}
}
/// Rough accounting returned by `LscsStorage::compact`.
#[derive(Debug, Clone, Default)]
pub struct CompactionMetrics {
pub bytes_compacted: Option<u64>,
pub files_merged: Option<usize>,
}
/// Columnar table store with a name-based catalog resolving
/// "table.column" style paths.
pub struct TrieColumnarHybrid {
tables: hashbrown::HashMap<String, TableInfo>,
data: ColumnStore,
}
/// Column-oriented storage: per table, a map of column name -> value
/// vector plus a row-metadata vector, all index-aligned by row position.
struct ColumnStore {
columns: hashbrown::HashMap<String, hashbrown::HashMap<String, Vec<ColumnValue>>>,
row_meta: hashbrown::HashMap<String, Vec<RowMetadata>>,
}
/// Internal typed cell value mirroring the supported `SochValue` variants.
#[derive(Debug, Clone)]
enum ColumnValue {
Null,
UInt64(u64),
Int64(i64),
Float64(f64),
Text(String),
Bool(bool),
Bytes(Vec<u8>),
}
impl From<&SochValue> for ColumnValue {
fn from(v: &SochValue) -> Self {
match v {
SochValue::Null => ColumnValue::Null,
SochValue::Int(i) => ColumnValue::Int64(*i),
SochValue::UInt(u) => ColumnValue::UInt64(*u),
SochValue::Float(f) => ColumnValue::Float64(*f),
SochValue::Text(s) => ColumnValue::Text(s.clone()),
SochValue::Bool(b) => ColumnValue::Bool(*b),
SochValue::Binary(b) => ColumnValue::Bytes(b.clone()),
// Unsupported variants fall back to their Debug rendering; this
// is lossy and does not round-trip through the reverse conversion.
_ => ColumnValue::Text(format!("{:?}", v)), }
}
}
impl From<ColumnValue> for SochValue {
fn from(val: ColumnValue) -> Self {
match val {
ColumnValue::Null => SochValue::Null,
ColumnValue::UInt64(u) => SochValue::UInt(u),
ColumnValue::Int64(i) => SochValue::Int(i),
ColumnValue::Float64(f) => SochValue::Float(f),
ColumnValue::Text(s) => SochValue::Text(s),
ColumnValue::Bool(b) => SochValue::Bool(b),
ColumnValue::Bytes(b) => SochValue::Binary(b),
}
}
}
/// Per-row bookkeeping; `deleted`/`txn_end` mark logical deletion.
#[allow(dead_code)]
#[derive(Debug, Clone)]
struct RowMetadata {
row_id: u64,
/// Wall-clock microseconds at insertion time.
txn_start: u64,
/// Wall-clock microseconds at deletion time; 0 while the row is live.
txn_end: u64,
deleted: bool,
}
impl ColumnStore {
fn new() -> Self {
Self {
columns: hashbrown::HashMap::new(),
row_meta: hashbrown::HashMap::new(),
}
}
fn init_table(&mut self, table: &str, columns: &[String]) {
let mut table_cols = hashbrown::HashMap::new();
for col in columns {
table_cols.insert(col.clone(), Vec::new());
}
self.columns.insert(table.to_string(), table_cols);
self.row_meta.insert(table.to_string(), Vec::new());
}
/// Appends one row: pushes a value (or Null for columns missing from
/// `values`) onto every column vector and records row metadata. Returns
/// false when the table is unknown. Keys in `values` that are not table
/// columns are silently ignored.
fn insert(
&mut self,
table: &str,
row_id: u64,
values: &std::collections::HashMap<String, SochValue>,
) -> bool {
let table_cols = match self.columns.get_mut(table) {
Some(c) => c,
None => return false,
};
let row_meta = match self.row_meta.get_mut(table) {
Some(m) => m,
None => return false,
};
for (col_name, col_data) in table_cols.iter_mut() {
let value = values
.get(col_name)
.map(ColumnValue::from)
.unwrap_or(ColumnValue::Null);
col_data.push(value);
}
row_meta.push(RowMetadata {
row_id,
// NOTE(review): panics if the system clock reads before the UNIX
// epoch — confirm acceptable for this stub.
txn_start: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_micros() as u64,
txn_end: 0,
deleted: false,
});
true
}
/// Materializes the row at `row_idx` as a column-name -> value map,
/// restricted to `columns`. Returns `None` for unknown tables, out-of-range
/// indices, or deleted rows; unknown column names are skipped.
fn get_row(
&self,
table: &str,
row_idx: usize,
columns: &[String],
) -> Option<std::collections::HashMap<String, SochValue>> {
let table_cols = self.columns.get(table)?;
let row_meta = self.row_meta.get(table)?;
if row_idx >= row_meta.len() {
return None;
}
let meta = &row_meta[row_idx];
if meta.deleted {
return None;
}
let mut row = std::collections::HashMap::new();
for col_name in columns {
if let Some(col_data) = table_cols.get(col_name)
&& row_idx < col_data.len()
{
row.insert(col_name.clone(), col_data[row_idx].clone().into());
}
}
Some(row)
}
/// Materializes every visible (non-deleted) row of `table` as
/// (row index, row map) pairs, via `get_row` per row.
fn get_all_rows(
&self,
table: &str,
columns: &[String],
) -> Vec<(usize, std::collections::HashMap<String, SochValue>)> {
let mut results = Vec::new();
if let Some(row_meta) = self.row_meta.get(table) {
for (idx, meta) in row_meta.iter().enumerate() {
if !meta.deleted
&& let Some(row) = self.get_row(table, idx, columns)
{
results.push((idx, row));
}
}
}
results
}
/// Same result as `get_all_rows`, but resolves each column's vector once
/// up front and pre-sizes the output, avoiding a hash lookup per cell.
fn get_all_rows_optimized(
&self,
table: &str,
columns: &[String],
) -> Vec<(usize, std::collections::HashMap<String, SochValue>)> {
let table_cols = match self.columns.get(table) {
Some(c) => c,
None => return Vec::new(),
};
let row_meta = match self.row_meta.get(table) {
Some(m) => m,
None => return Vec::new(),
};
let visible_count = row_meta.iter().filter(|m| !m.deleted).count();
let mut results = Vec::with_capacity(visible_count);
// Hoisted column lookups: (name, Some(values)) or (name, None) for
// unknown columns, which are then skipped per row.
let col_refs: Vec<(&String, Option<&Vec<ColumnValue>>)> = columns
.iter()
.map(|c| (c, table_cols.get(c)))
.collect();
for (idx, meta) in row_meta.iter().enumerate() {
if meta.deleted {
continue;
}
let mut row = std::collections::HashMap::with_capacity(columns.len());
for (col_name, col_data_opt) in &col_refs {
if let Some(col_data) = col_data_opt
&& idx < col_data.len()
{
row.insert((*col_name).clone(), col_data[idx].clone().into());
}
}
results.push((idx, row));
}
results
}
#[allow(dead_code)]
fn iter_rows_batched<'a>(
&'a self,
table: &str,
columns: &'a [String],
batch_size: usize,
) -> impl Iterator<Item = Vec<(usize, std::collections::HashMap<String, SochValue>)>> + 'a {
let table_cols = self.columns.get(table);
let row_meta = self.row_meta.get(table);
let col_refs: Vec<(&String, Option<&Vec<ColumnValue>>)> = match table_cols {
Some(tc) => columns.iter().map(|c| (c, tc.get(c))).collect(),
None => Vec::new(),
};
let total_rows = row_meta.map(|m| m.len()).unwrap_or(0);
let batch_count = (total_rows + batch_size - 1) / batch_size;
(0..batch_count).map(move |batch_idx| {
let start = batch_idx * batch_size;
let end = (start + batch_size).min(total_rows);
let mut batch = Vec::with_capacity(batch_size);
if let Some(meta_vec) = row_meta {
for idx in start..end {
if meta_vec[idx].deleted {
continue;
}
let mut row = std::collections::HashMap::with_capacity(columns.len());
for (col_name, col_data_opt) in &col_refs {
if let Some(col_data) = col_data_opt
&& idx < col_data.len()
{
row.insert((*col_name).clone(), col_data[idx].clone().into());
}
}
batch.push((idx, row));
}
}
batch
})
}
#[allow(dead_code)]
/// Number of non-deleted rows in `table` (0 for unknown tables).
fn count_visible_rows(&self, table: &str) -> usize {
self.row_meta
.get(table)
.map(|m| m.iter().filter(|r| !r.deleted).count())
.unwrap_or(0)
}
/// Logically deletes the row at `row_idx`: flips the tombstone flag and
/// records the deletion time. Returns false for unknown tables,
/// out-of-range indices, or rows already deleted. Column data is retained.
fn delete_row(&mut self, table: &str, row_idx: usize) -> bool {
if let Some(row_meta) = self.row_meta.get_mut(table)
&& row_idx < row_meta.len()
&& !row_meta[row_idx].deleted
{
row_meta[row_idx].deleted = true;
// NOTE(review): panics if the system clock reads before the UNIX
// epoch — confirm acceptable for this stub.
row_meta[row_idx].txn_end = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_micros() as u64;
return true;
}
false
}
/// Overwrites the listed columns of a live row in place. Returns false
/// for unknown tables, out-of-range indices, or deleted rows; unknown
/// column names in `updates` are silently ignored.
fn update_row(
&mut self,
table: &str,
row_idx: usize,
updates: &std::collections::HashMap<String, SochValue>,
) -> bool {
if let Some(table_cols) = self.columns.get_mut(table)
&& let Some(row_meta) = self.row_meta.get(table)
{
if row_idx >= row_meta.len() || row_meta[row_idx].deleted {
return false;
}
for (col_name, new_value) in updates {
if let Some(col_data) = table_cols.get_mut(col_name)
&& row_idx < col_data.len()
{
col_data[row_idx] = ColumnValue::from(new_value);
}
}
return true;
}
false
}
/// Number of non-deleted rows in `table` as u64 (0 for unknown tables).
fn count_rows(&self, table: &str) -> u64 {
self.row_meta
.get(table)
.map(|m| m.iter().filter(|r| !r.deleted).count() as u64)
.unwrap_or(0)
}
/// Catalog entry for one registered table.
#[derive(Debug, Clone)]
struct TableInfo {
schema: ArraySchema,
columns: Vec<ColumnRef>,
/// Next row id to hand out; starts at 1 (0 is used as a failure sentinel).
next_row_id: u64,
}
impl Default for TrieColumnarHybrid {
fn default() -> Self {
Self::new()
}
}
impl TrieColumnarHybrid {
pub fn new() -> Self {
Self {
tables: hashbrown::HashMap::new(),
data: ColumnStore::new(),
}
}
pub fn resolve(&self, path: &str) -> PathResolution {
let parts: Vec<&str> = path.split('.').collect();
if parts.is_empty() {
return PathResolution::NotFound;
}
let table_name = parts[0];
if let Some(table_info) = self.tables.get(table_name) {
if parts.len() == 1 {
PathResolution::Array {
schema: Arc::new(table_info.schema.clone()),
columns: table_info.columns.clone(),
}
} else {
let col_name = parts[1];
if let Some(col) = table_info.columns.iter().find(|c| c.name == col_name) {
PathResolution::Value(col.clone())
} else {
PathResolution::NotFound
}
}
} else {
PathResolution::NotFound
}
}
/// Registers (or replaces) a table: builds column refs with positional
/// ids, records the schema in the catalog, and initializes empty column
/// storage. Returns the created column refs.
pub fn register_table(&mut self, name: &str, fields: &[(String, FieldType)]) -> Vec<ColumnRef> {
let columns: Vec<ColumnRef> = fields
.iter()
.enumerate()
.map(|(i, (fname, ftype))| ColumnRef {
id: i as u32,
name: fname.clone(),
field_type: *ftype,
})
.collect();
let schema = ArraySchema {
name: name.to_string(),
fields: fields.iter().map(|(n, _)| n.clone()).collect(),
types: fields.iter().map(|(_, t)| *t).collect(),
};
self.data.init_table(name, &schema.fields);
let table_info = TableInfo {
schema,
columns: columns.clone(),
// Row ids start at 1; 0 signals insertion failure elsewhere.
next_row_id: 1,
};
self.tables.insert(name.to_string(), table_info);
columns
}
/// Catalog counters: number of tables and total registered columns.
pub fn memory_stats(&self) -> TchStats {
TchStats {
tables: self.tables.len(),
total_columns: self.tables.values().map(|t| t.columns.len()).sum(),
}
}
/// Inserts one row, allocating the next row id. Returns the assigned id,
/// or 0 when the table is unknown or the columnar insert fails.
pub fn insert_row(
&mut self,
table: &str,
values: &std::collections::HashMap<String, SochValue>,
) -> u64 {
if let Some(info) = self.tables.get_mut(table) {
let row_id = info.next_row_id;
info.next_row_id += 1;
if self.data.insert(table, row_id, values) {
return row_id;
}
}
// 0 is the failure sentinel (valid ids start at 1).
0
}
/// Applies `updates` to every row matching `where_clause` (all rows when
/// None). Returned ids are row INDEXES into the column store, not the
/// row ids allocated by `insert_row`.
pub fn update_rows(
&mut self,
table: &str,
updates: &std::collections::HashMap<String, SochValue>,
where_clause: Option<&WhereClause>,
) -> MutationResult {
if !self.tables.contains_key(table) {
return MutationResult::empty();
}
let columns: Vec<String> = self
.tables
.get(table)
.map(|t| t.schema.fields.clone())
.unwrap_or_default();
let all_rows = self.data.get_all_rows(table, &columns);
let mut affected_ids: Vec<RowId> = Vec::new();
for (idx, row) in all_rows {
if let Some(wc) = where_clause
&& !self.matches_where(&row, wc)
{
continue;
}
if self.data.update_row(table, idx, updates) {
affected_ids.push(idx as RowId);
}
}
MutationResult::new(affected_ids.len(), affected_ids)
}
/// Tombstones every row matching `where_clause` (all rows when None).
/// As with `update_rows`, the returned ids are row indexes.
pub fn delete_rows(&mut self, table: &str, where_clause: Option<&WhereClause>) -> MutationResult {
if !self.tables.contains_key(table) {
return MutationResult::empty();
}
let columns: Vec<String> = self
.tables
.get(table)
.map(|t| t.schema.fields.clone())
.unwrap_or_default();
let all_rows = self.data.get_all_rows(table, &columns);
let mut affected_ids: Vec<RowId> = Vec::new();
for (idx, row) in all_rows {
if let Some(wc) = where_clause
&& !self.matches_where(&row, wc)
{
continue;
}
if self.data.delete_row(table, idx) {
affected_ids.push(idx as RowId);
}
}
MutationResult::new(affected_ids.len(), affected_ids)
}
fn matches_where(
&self,
row: &std::collections::HashMap<String, SochValue>,
wc: &WhereClause,
) -> bool {
wc.matches(row)
}
/// Three-way comparison used for ORDER BY: -1 / 0 / 1.
///
/// Mismatched or unsupported type pairs compare as equal (0); NaN floats
/// also compare as equal.
fn compare_values(&self, a: &SochValue, b: &SochValue) -> i32 {
    use std::cmp::Ordering;
    let ord = match (a, b) {
        (SochValue::Int(x), SochValue::Int(y)) => x.cmp(y),
        (SochValue::UInt(x), SochValue::UInt(y)) => x.cmp(y),
        // partial_cmp is None for NaN; treat that as "equal", matching the
        // previous if/else chain which fell through to 0.
        (SochValue::Float(x), SochValue::Float(y)) => {
            x.partial_cmp(y).unwrap_or(Ordering::Equal)
        }
        (SochValue::Text(x), SochValue::Text(y)) => x.cmp(y),
        _ => Ordering::Equal,
    };
    // Ordering is repr(i8) with Less=-1, Equal=0, Greater=1.
    ord as i32
}
/// Runs a SELECT over `table`: projection, optional WHERE filter, optional
/// single-column ORDER BY (`(column, ascending)`), then OFFSET/LIMIT.
///
/// Returns an empty cursor for an unknown table. Rows missing the sort
/// column compare as equal during ordering.
pub fn select(
    &self,
    table: &str,
    columns: &[String],
    where_clause: Option<&WhereClause>,
    order_by: Option<&(String, bool)>,
    limit: Option<usize>,
    offset: Option<usize>,
) -> SochCursor {
    let table_info = match self.tables.get(table) {
        Some(t) => t,
        None => return SochCursor::new(),
    };
    // An empty projection or a "*" entry selects every schema column.
    let cols_to_fetch: Vec<String> = if columns.is_empty() || columns.iter().any(|c| c == "*") {
        table_info.schema.fields.clone()
    } else {
        columns.to_vec()
    };
    let all_rows = self.data.get_all_rows_optimized(table, &cols_to_fetch);
    // Capacity heuristic: assume a WHERE clause keeps roughly a quarter of rows.
    let estimated_size = if where_clause.is_some() {
        all_rows.len() / 4
    } else {
        all_rows.len()
    };
    let mut filtered: Vec<std::collections::HashMap<String, SochValue>> =
        Vec::with_capacity(estimated_size);
    for (_, row) in all_rows {
        let matches = match where_clause {
            Some(wc) => self.matches_where(&row, wc),
            None => true,
        };
        if matches {
            filtered.push(row);
        }
    }
    if let Some((col, ascending)) = order_by {
        filtered.sort_by(|a, b| {
            let cmp = match (a.get(col), b.get(col)) {
                (Some(va), Some(vb)) => self.compare_values(va, vb),
                // Missing sort column: leave relative order unchanged.
                _ => 0,
            };
            let cmp = if *ascending { cmp } else { -cmp };
            cmp.cmp(&0)
        });
    }
    // Apply OFFSET/LIMIT lazily on one iterator chain instead of collecting
    // an intermediate Vec just to take() from it.
    let final_rows: Vec<_> = filtered
        .into_iter()
        .skip(offset.unwrap_or(0))
        .take(limit.unwrap_or(usize::MAX))
        .collect();
    SochCursor {
        rows: final_rows,
        position: 0,
    }
}
/// Inserts `values`, or updates the first existing row whose `conflict_key`
/// column equals the supplied value.
///
/// Returns `Inserted` (without writing anything) when the table is unknown
/// or `values` carries no entry for `conflict_key` — preserving the
/// pre-existing behavior of those edge cases.
pub fn upsert_row(
    &mut self,
    table: &str,
    conflict_key: &str,
    values: &std::collections::HashMap<String, SochValue>,
) -> UpsertAction {
    if !self.tables.contains_key(table) {
        return UpsertAction::Inserted;
    }
    let Some(key_value) = values.get(conflict_key).cloned() else {
        return UpsertAction::Inserted;
    };
    let columns = self
        .tables
        .get(table)
        .map(|info| info.schema.fields.clone())
        .unwrap_or_default();
    for (idx, row) in self.data.get_all_rows(table, &columns) {
        if row.get(conflict_key) == Some(&key_value)
            && self.data.update_row(table, idx, values)
        {
            return UpsertAction::Updated;
        }
    }
    self.insert_row(table, values);
    UpsertAction::Inserted
}
/// Number of rows currently stored for `table` (delegates to the data layer).
pub fn count_rows(&self, table: &str) -> u64 {
    self.data.count_rows(table)
}
/// Returns a clone of the registered schema for `table`, if it exists.
pub fn get_table_schema(&self, table: &str) -> Option<ArraySchema> {
    let info = self.tables.get(table)?;
    Some(info.schema.clone())
}
}
/// Outcome reported by `upsert_row`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum UpsertAction {
    /// A new row was inserted. Also returned when the table or the
    /// conflict-key value was missing and nothing was written (see
    /// `upsert_row`).
    Inserted,
    /// An existing row with a matching conflict-key value was updated.
    Updated,
}
/// Filter predicate tree evaluated row-by-row via [`WhereClause::matches`].
#[derive(Debug, Clone)]
pub enum WhereClause {
    /// `field <op> value`; a row without `field` never matches.
    Simple {
        field: String,
        op: CompareOp,
        value: SochValue,
    },
    /// `field IN (values)` — or `NOT IN` when `negated` is true.
    In {
        field: String,
        values: Vec<SochValue>,
        negated: bool,
    },
    /// `field IS NULL` / `IS NOT NULL`; a missing field counts as null.
    IsNull {
        field: String,
        negated: bool,
    },
    /// Inclusive range test: `low <= field <= high`.
    Between {
        field: String,
        low: SochValue,
        high: SochValue,
    },
    /// Every sub-clause must match.
    And(Vec<WhereClause>),
    /// At least one sub-clause must match.
    Or(Vec<WhereClause>),
    /// Inverts the inner clause.
    Not(Box<WhereClause>),
}
impl WhereClause {
    /// Equality predicate on `field`.
    pub fn eq(field: impl Into<String>, value: SochValue) -> Self {
        Self::compare(field, CompareOp::Eq, value)
    }
    /// General comparison predicate on `field`.
    pub fn compare(field: impl Into<String>, op: CompareOp, value: SochValue) -> Self {
        WhereClause::Simple {
            value,
            op,
            field: field.into(),
        }
    }
    /// `field IN (values)` predicate.
    pub fn in_values(field: impl Into<String>, values: Vec<SochValue>) -> Self {
        WhereClause::In {
            negated: false,
            values,
            field: field.into(),
        }
    }
    /// `field NOT IN (values)` predicate.
    pub fn not_in(field: impl Into<String>, values: Vec<SochValue>) -> Self {
        WhereClause::In {
            negated: true,
            values,
            field: field.into(),
        }
    }
    /// Conjunction of `clauses`.
    pub fn and(clauses: Vec<WhereClause>) -> Self {
        WhereClause::And(clauses)
    }
    /// Disjunction of `clauses`.
    pub fn or(clauses: Vec<WhereClause>) -> Self {
        WhereClause::Or(clauses)
    }
    /// Evaluates this predicate tree against one row.
    ///
    /// Missing fields: `Simple` and `Between` never match; `IsNull` treats
    /// the field as null; `In` matches only when negated.
    pub fn matches(&self, row: &std::collections::HashMap<String, SochValue>) -> bool {
        match self {
            WhereClause::Simple { field, op, value } => row
                .get(field)
                .map(|row_val| compare_values(row_val, op, value))
                .unwrap_or(false),
            WhereClause::In { field, values, negated } => match row.get(field) {
                Some(row_val) => {
                    let hit = values
                        .iter()
                        .any(|v| compare_values(row_val, &CompareOp::Eq, v));
                    // XOR: a hit matches unless negated, and vice versa.
                    hit != *negated
                }
                None => *negated,
            },
            WhereClause::IsNull { field, negated } => {
                let is_null = match row.get(field) {
                    Some(v) => matches!(v, SochValue::Null),
                    None => true,
                };
                is_null != *negated
            }
            WhereClause::Between { field, low, high } => row.get(field).is_some_and(|rv| {
                compare_values(rv, &CompareOp::Ge, low)
                    && compare_values(rv, &CompareOp::Le, high)
            }),
            WhereClause::And(clauses) => clauses.iter().all(|c| c.matches(row)),
            WhereClause::Or(clauses) => clauses.iter().any(|c| c.matches(row)),
            WhereClause::Not(inner) => !inner.matches(row),
        }
    }
    /// The field a leaf predicate constrains; `None` for composite clauses.
    pub fn field(&self) -> Option<&str> {
        match self {
            WhereClause::Simple { field, .. }
            | WhereClause::In { field, .. }
            | WhereClause::IsNull { field, .. }
            | WhereClause::Between { field, .. } => Some(field),
            WhereClause::And(_) | WhereClause::Or(_) | WhereClause::Not(_) => None,
        }
    }
}
/// Evaluates `left <op> right` for same-typed operands.
///
/// Mismatched type pairs always yield `false`, as do `Like`/`In` on
/// non-text types. `Null == Null` is true; every other op on nulls is
/// false. Float equality uses an epsilon tolerance.
fn compare_values(left: &SochValue, op: &CompareOp, right: &SochValue) -> bool {
    match (left, right) {
        (SochValue::Int(l), SochValue::Int(r)) => match op {
            CompareOp::Eq => l == r,
            CompareOp::Ne => l != r,
            CompareOp::Lt => l < r,
            CompareOp::Le => l <= r,
            CompareOp::Gt => l > r,
            CompareOp::Ge => l >= r,
            CompareOp::Like | CompareOp::In => false,
        },
        (SochValue::UInt(l), SochValue::UInt(r)) => match op {
            CompareOp::Eq => l == r,
            CompareOp::Ne => l != r,
            CompareOp::Lt => l < r,
            CompareOp::Le => l <= r,
            CompareOp::Gt => l > r,
            CompareOp::Ge => l >= r,
            CompareOp::Like | CompareOp::In => false,
        },
        (SochValue::Float(l), SochValue::Float(r)) => match op {
            CompareOp::Eq => (l - r).abs() < f64::EPSILON,
            CompareOp::Ne => (l - r).abs() >= f64::EPSILON,
            CompareOp::Lt => l < r,
            CompareOp::Le => l <= r,
            CompareOp::Gt => l > r,
            CompareOp::Ge => l >= r,
            CompareOp::Like | CompareOp::In => false,
        },
        (SochValue::Text(l), SochValue::Text(r)) => match op {
            CompareOp::Eq => l == r,
            CompareOp::Ne => l != r,
            CompareOp::Lt => l < r,
            CompareOp::Le => l <= r,
            CompareOp::Gt => l > r,
            CompareOp::Ge => l >= r,
            CompareOp::Like => {
                // Escape regex metacharacters in the literal parts of the
                // LIKE pattern BEFORE translating SQL wildcards, so that
                // e.g. "a.b" matches only "a.b", not "aXb". `%` and `_`
                // are not regex metacharacters, so regex::escape leaves
                // them intact for the wildcard translation below.
                let pattern = regex::escape(r).replace('%', ".*").replace('_', ".");
                regex::Regex::new(&format!("^{}$", pattern))
                    .map(|re| re.is_match(l))
                    .unwrap_or(false)
            }
            CompareOp::In => false,
        },
        (SochValue::Bool(l), SochValue::Bool(r)) => match op {
            CompareOp::Eq => l == r,
            CompareOp::Ne => l != r,
            _ => false,
        },
        (SochValue::Null, SochValue::Null) => matches!(op, CompareOp::Eq),
        _ => false,
    }
}
/// Comparison operators usable in a [`WhereClause::Simple`] predicate.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum CompareOp {
    Eq,
    Ne,
    Gt,
    Ge,
    Lt,
    Le,
    /// SQL-style LIKE with `%` (any run) and `_` (one char); text-only —
    /// `compare_values` yields false for every other type.
    Like,
    /// Never matches in `compare_values`; membership tests go through
    /// [`WhereClause::In`] instead.
    In,
}
/// Forward-only cursor over a fully materialized result set.
pub struct SochCursor {
    rows: Vec<std::collections::HashMap<String, SochValue>>,
    // Index of the next row `next()` will yield.
    position: usize,
}
impl Default for SochCursor {
    /// An empty cursor — identical to [`SochCursor::new`].
    fn default() -> Self {
        Self {
            rows: Vec::new(),
            position: 0,
        }
    }
}
impl SochCursor {
    /// Creates an empty cursor with no rows.
    pub fn new() -> Self {
        Self {
            rows: Vec::new(),
            position: 0,
        }
    }
    /// Advances the cursor and returns the next row, or `None` when the
    /// result set is exhausted.
    #[allow(clippy::should_implement_trait)]
    pub fn next(&mut self) -> Option<std::collections::HashMap<String, SochValue>> {
        let row = self.rows.get_mut(self.position)?;
        self.position += 1;
        // Move the row out instead of deep-cloning the whole HashMap; the
        // cursor is forward-only and never revisits an index, and `rows` is
        // private, so the emptied slot is unobservable.
        Some(std::mem::take(row))
    }
}
/// Summary counters returned by `memory_stats`.
#[derive(Debug, Clone)]
pub struct TchStats {
    /// Number of registered tables.
    pub tables: usize,
    /// Sum of column counts across all registered tables.
    pub total_columns: usize,
}
/// In-process connection combining a trie/columnar schema index, a durable
/// storage handle, and a catalog. `open()` always backs it with an
/// ephemeral temp-dir store; use `open_persistent` for an on-disk path.
pub struct SochConnection {
    pub(crate) tch: Arc<RwLock<TrieColumnarHybrid>>,
    pub(crate) storage: Arc<DurableStorage>,
    pub(crate) catalog: Arc<RwLock<Catalog>>,
    // Id of the currently open transaction, if any (at most one at a time).
    active_txn: RwLock<Option<u64>>,
    // Telemetry counters; updated with relaxed atomics, so values are
    // approximate under concurrency.
    queries_executed: AtomicU64,
    soch_tokens_emitted: AtomicU64,
    json_tokens_equivalent: AtomicU64,
    // Keeps the temp dir alive (and thus on disk) for ephemeral connections.
    #[allow(dead_code)]
    _ephemeral_dir: Option<tempfile::TempDir>,
}
impl SochConnection {
    /// Opens an ephemeral (temp-dir backed) connection; `_path` is ignored.
    pub fn open(_path: impl AsRef<Path>) -> Result<Self> {
        let handle = DurableStorage::open_ephemeral()
            .map_err(|e| ClientError::Storage(e.to_string()))?;
        let (storage, tmpdir) = handle.into_parts();
        Ok(Self {
            tch: Arc::new(RwLock::new(TrieColumnarHybrid::new())),
            storage: Arc::new(storage),
            catalog: Arc::new(RwLock::new(Catalog::new("sochdb"))),
            active_txn: RwLock::new(None),
            queries_executed: AtomicU64::new(0),
            soch_tokens_emitted: AtomicU64::new(0),
            json_tokens_equivalent: AtomicU64::new(0),
            _ephemeral_dir: Some(tmpdir),
        })
    }
    /// Opens a connection backed by durable storage at `path`.
    pub fn open_persistent(path: impl AsRef<Path>) -> Result<Self> {
        let storage = DurableStorage::open(path.as_ref())
            .map_err(|e| ClientError::Storage(e.to_string()))?;
        Ok(Self {
            tch: Arc::new(RwLock::new(TrieColumnarHybrid::new())),
            storage: Arc::new(storage),
            catalog: Arc::new(RwLock::new(Catalog::new("sochdb"))),
            active_txn: RwLock::new(None),
            queries_executed: AtomicU64::new(0),
            soch_tokens_emitted: AtomicU64::new(0),
            json_tokens_equivalent: AtomicU64::new(0),
            _ephemeral_dir: None,
        })
    }
    /// Returns the active transaction id, starting one if none is open.
    fn ensure_txn(&self) -> Result<u64> {
        let active = *self.active_txn.read();
        match active {
            Some(txn) => Ok(txn),
            None => self.begin_txn(),
        }
    }
    /// Resolves a dotted path against the registered schemas.
    pub fn resolve(&self, path: &str) -> Result<PathResolution> {
        Ok(self.tch.read().resolve(path))
    }
    /// Registers a table's field layout with the in-memory schema index.
    pub fn register_table(
        &self,
        name: &str,
        fields: &[(String, FieldType)],
    ) -> Result<Vec<ColumnRef>> {
        let cols = self.tch.write().register_table(name, fields);
        Ok(cols)
    }
    /// Begins a transaction and records it as the active one.
    pub fn begin_txn(&self) -> Result<u64> {
        let txn_id = self
            .storage
            .begin_transaction()
            .map_err(|e| ClientError::Storage(e.to_string()))?;
        *self.active_txn.write() = Some(txn_id);
        Ok(txn_id)
    }
    /// Commits the active transaction; errors if none is open.
    pub fn commit_txn(&self) -> Result<u64> {
        let txn_id = self
            .active_txn
            .write()
            .take()
            .ok_or_else(|| ClientError::Transaction("No active transaction".into()))?;
        self.storage
            .commit(txn_id)
            .map_err(|e| ClientError::Storage(e.to_string()))
    }
    /// Aborts the active transaction; errors if none is open.
    pub fn abort_txn(&self) -> Result<()> {
        let txn_id = self
            .active_txn
            .write()
            .take()
            .ok_or_else(|| ClientError::Transaction("No active transaction".into()))?;
        self.storage
            .abort(txn_id)
            .map_err(|e| ClientError::Storage(e.to_string()))
    }
    /// Looks up a table's schema in the catalog.
    pub fn get_schema(&self, table: &str) -> Result<SochSchema> {
        let catalog = self.catalog.read();
        catalog
            .get_table(table)
            .and_then(|entry| entry.schema.clone())
            .ok_or_else(|| ClientError::NotFound(format!("Table '{}' not found", table)))
    }
    /// Describes a table (columns, row count, index names) from the catalog.
    pub fn describe_table(&self, name: &str) -> Option<TableDescription> {
        let catalog = self.catalog.read();
        let entry = catalog.get_table(name)?;
        let schema = entry.schema.as_ref()?;
        Some(TableDescription {
            name: name.to_string(),
            columns: schema
                .fields
                .iter()
                .map(|f| crate::schema::ColumnDescription {
                    name: f.name.clone(),
                    field_type: f.field_type.clone(),
                    nullable: f.nullable,
                })
                .collect(),
            row_count: entry.row_count,
            indexes: catalog
                .get_indexes(name)
                .iter()
                .map(|idx| idx.name.clone())
                .collect(),
        })
    }
    /// Names of all tables known to the catalog.
    pub fn list_tables(&self) -> Vec<String> {
        self.catalog
            .read()
            .list_tables()
            .into_iter()
            .map(|s| s.to_string())
            .collect()
    }
    /// Accumulates token-usage telemetry.
    #[allow(dead_code)]
    pub(crate) fn record_tokens(&self, soch_tokens: usize, json_tokens: usize) {
        self.soch_tokens_emitted
            .fetch_add(soch_tokens as u64, Ordering::Relaxed);
        self.json_tokens_equivalent
            .fetch_add(json_tokens as u64, Ordering::Relaxed);
    }
    /// Bumps the query counter.
    pub(crate) fn record_query(&self) {
        self.queries_executed.fetch_add(1, Ordering::Relaxed);
    }
    /// Client-side statistics derived from the telemetry counters.
    pub fn stats(&self) -> ClientStats {
        let toon = self.soch_tokens_emitted.load(Ordering::Relaxed);
        let json = self.json_tokens_equivalent.load(Ordering::Relaxed);
        let savings = if json > 0 {
            (1.0 - (toon as f64 / json as f64)) * 100.0
        } else {
            0.0
        };
        let queries = self.queries_executed.load(Ordering::Relaxed);
        // Placeholder heuristic, not a measured value: report a fixed 30%
        // hit rate once the connection has run more than 10 queries.
        let cache_hit_rate = if queries > 10 { 0.30 } else { 0.0 };
        ClientStats {
            queries_executed: queries,
            soch_tokens_emitted: toon,
            json_tokens_equivalent: json,
            token_savings_percent: savings,
            cache_hit_rate,
        }
    }
    /// Serializes a value with bincode.
    pub fn serialize_value(&self, value: &SochValue) -> Result<Vec<u8>> {
        bincode::serialize(value).map_err(|e| ClientError::Serialization(e.to_string()))
    }
    /// Deserializes a bincode-encoded value.
    pub fn deserialize_value(&self, bytes: &[u8]) -> Result<SochValue> {
        bincode::deserialize(bytes).map_err(|e| ClientError::Serialization(e.to_string()))
    }
    /// Fsyncs the store; always reports 0 flushed items.
    pub fn flush(&self) -> Result<usize> {
        self.storage
            .fsync()
            .map_err(|e| ClientError::Storage(e.to_string()))?;
        Ok(0)
    }
    /// Runs storage GC; the returned metrics are currently zeroed stubs.
    pub fn compact(&self) -> Result<CompactionMetrics> {
        let _cleaned = self.storage.gc();
        Ok(CompactionMetrics {
            bytes_compacted: Some(0),
            files_merged: Some(0),
        })
    }
    /// Reads `key` inside the (possibly implicit) active transaction.
    pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        let txn_id = self.ensure_txn()?;
        // (A dead `if self.active_txn.read().is_some() {}` block was removed
        // here: it only acquired and released the lock with no effect.)
        self.storage
            .read(txn_id, key)
            .map_err(|e| ClientError::Storage(e.to_string()))
    }
    /// Writes `key -> value` inside the active transaction.
    pub fn put(&self, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
        let txn_id = self.ensure_txn()?;
        self.storage
            .write(txn_id, key, value)
            .map_err(|e| ClientError::Storage(e.to_string()))
    }
    /// Deletes `key` inside the active transaction.
    pub fn delete(&self, key: &[u8]) -> Result<()> {
        let txn_id = self.ensure_txn()?;
        self.storage
            .delete(txn_id, key.to_vec())
            .map_err(|e| ClientError::Storage(e.to_string()))
    }
    /// Scans all entries whose key starts with `prefix`.
    pub fn scan_prefix(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
        let txn_id = self.ensure_txn()?;
        self.storage
            .scan(txn_id, prefix)
            .map_err(|e| ClientError::Storage(e.to_string()))
    }
    /// Fsyncs the underlying store.
    pub fn fsync(&self) -> Result<()> {
        self.storage
            .fsync()
            .map_err(|e| ClientError::Storage(e.to_string()))
    }
}
/// Maps a core [`SochType`] to the client-side [`FieldType`].
/// Unknown variants fall back to `Text`.
pub fn to_field_type(soch_type: &SochType) -> FieldType {
    match soch_type {
        SochType::Int => FieldType::Int64,
        SochType::Float => FieldType::Float64,
        SochType::Text => FieldType::Text,
        SochType::Bool => FieldType::Bool,
        SochType::Binary => FieldType::Bytes,
        // NOTE(review): UInt maps to Int64 even though FieldType::UInt64
        // exists, and kernel_type_to_field_type maps UInt64 -> UInt64.
        // Possibly intentional widening, possibly a bug — confirm before
        // changing.
        SochType::UInt => FieldType::Int64,
        _ => FieldType::Text,
    }
}
#[allow(dead_code)]
mod rand {
    //! Minimal deterministic PRNG: a per-thread xorshift64 with a fixed
    //! seed, so each thread observes the same reproducible sequence.
    use std::cell::Cell;
    thread_local! {
        static STATE: Cell<u64> = const { Cell::new(0x12345678_9ABCDEF0) };
    }
    /// Returns the next value of this thread's xorshift64 sequence,
    /// converted into `T` via `From<u64>`.
    pub fn random<T: From<u64>>() -> T {
        STATE.with(|state| {
            let mut next = state.get();
            // Classic xorshift64 scramble (13 / 7 / 17 shifts).
            next ^= next << 13;
            next ^= next >> 7;
            next ^= next << 17;
            state.set(next);
            T::from(next)
        })
    }
}
pub use sochdb_storage::database::{
ColumnDef as KernelColumnDef, ColumnType as KernelColumnType, QueryResult,
Stats as DatabaseStats, TableSchema as KernelTableSchema,
};
use sochdb_storage::database::{Database, DatabaseConfig, TxnHandle as KernelTxnHandle};
/// Connection over the kernel [`Database`], tracking at most one active
/// transaction with lock-free atomics.
pub struct EmbeddedConnection {
    db: Arc<Database>,
    // Active transaction id; 0 is the sentinel for "none".
    active_txn_id: AtomicU64,
    // Snapshot timestamp paired with `active_txn_id`.
    active_snapshot_ts: AtomicU64,
    tch: Arc<RwLock<TrieColumnarHybrid>>,
    // Telemetry counters (relaxed atomics; approximate under concurrency).
    queries_executed: AtomicU64,
    soch_tokens_emitted: AtomicU64,
    json_tokens_equivalent: AtomicU64,
}
impl EmbeddedConnection {
    /// Opens a database at `path` with the default kernel configuration.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        Self::open_with_config(path, DatabaseConfig::default())
    }
    /// Opens a database at `path` with an explicit kernel configuration.
    pub fn open_with_config<P: AsRef<Path>>(path: P, config: DatabaseConfig) -> Result<Self> {
        let db = Database::open_with_config(path, config)
            .map_err(|e| ClientError::Storage(e.to_string()))?;
        Ok(Self {
            db,
            // 0 is the sentinel for "no active transaction".
            active_txn_id: AtomicU64::new(0),
            active_snapshot_ts: AtomicU64::new(0),
            tch: Arc::new(RwLock::new(TrieColumnarHybrid::new())),
            queries_executed: AtomicU64::new(0),
            soch_tokens_emitted: AtomicU64::new(0),
            json_tokens_equivalent: AtomicU64::new(0),
        })
    }
    /// Direct access to the underlying kernel database handle.
    pub fn kernel(&self) -> &Arc<Database> {
        &self.db
    }
    /// Begins an explicit transaction; errors if one is already active.
    ///
    /// NOTE(review): the "already active" check and the stores below are
    /// separate atomic operations, so two concurrent `begin` calls could
    /// race — confirm whether concurrent use is intended.
    pub fn begin(&self) -> Result<()> {
        if self.active_txn_id.load(Ordering::Acquire) != 0 {
            return Err(ClientError::Transaction(
                "Transaction already active".into(),
            ));
        }
        let txn = self
            .db
            .begin_transaction()
            .map_err(|e| ClientError::Storage(e.to_string()))?;
        self.active_txn_id.store(txn.txn_id, Ordering::Release);
        self.active_snapshot_ts
            .store(txn.snapshot_ts, Ordering::Release);
        Ok(())
    }
    /// Commits the active transaction; errors if none is active.
    pub fn commit(&self) -> Result<u64> {
        // swap(0) atomically claims and clears the active transaction.
        let txn_id = self.active_txn_id.swap(0, Ordering::AcqRel);
        let snapshot_ts = self.active_snapshot_ts.swap(0, Ordering::AcqRel);
        if txn_id == 0 {
            return Err(ClientError::Transaction("No active transaction".into()));
        }
        let txn = KernelTxnHandle {
            txn_id,
            snapshot_ts,
        };
        self.db
            .commit(txn)
            .map_err(|e| ClientError::Storage(e.to_string()))
    }
    /// Aborts the active transaction; errors if none is active.
    pub fn abort(&self) -> Result<()> {
        let txn_id = self.active_txn_id.swap(0, Ordering::AcqRel);
        let snapshot_ts = self.active_snapshot_ts.swap(0, Ordering::AcqRel);
        if txn_id == 0 {
            return Err(ClientError::Transaction("No active transaction".into()));
        }
        let txn = KernelTxnHandle {
            txn_id,
            snapshot_ts,
        };
        self.db
            .abort(txn)
            .map_err(|e| ClientError::Storage(e.to_string()))
    }
    /// Returns the active transaction handle, starting one if none exists.
    #[inline]
    fn ensure_txn(&self) -> Result<KernelTxnHandle> {
        let txn_id = self.active_txn_id.load(Ordering::Acquire);
        if txn_id != 0 {
            let snapshot_ts = self.active_snapshot_ts.load(Ordering::Acquire);
            return Ok(KernelTxnHandle {
                txn_id,
                snapshot_ts,
            });
        }
        let txn = self
            .db
            .begin_transaction()
            .map_err(|e| ClientError::Storage(e.to_string()))?;
        self.active_txn_id.store(txn.txn_id, Ordering::Release);
        self.active_snapshot_ts
            .store(txn.snapshot_ts, Ordering::Release);
        Ok(txn)
    }
    /// Writes `value` at `path` inside the (possibly implicit) transaction.
    pub fn put(&self, path: &str, value: &[u8]) -> Result<()> {
        let txn = self.ensure_txn()?;
        self.db
            .put_path(txn, path, value)
            .map_err(|e| ClientError::Storage(e.to_string()))
    }
    /// Reads the value stored at `path`, if any.
    pub fn get(&self, path: &str) -> Result<Option<Vec<u8>>> {
        let txn = self.ensure_txn()?;
        self.db
            .get_path(txn, path)
            .map_err(|e| ClientError::Storage(e.to_string()))
    }
    /// Deletes the value stored at `path`.
    pub fn delete(&self, path: &str) -> Result<()> {
        let txn = self.ensure_txn()?;
        self.db
            .delete_path(txn, path)
            .map_err(|e| ClientError::Storage(e.to_string()))
    }
    /// Returns all `(path, value)` pairs under `prefix`.
    pub fn scan(&self, prefix: &str) -> Result<Vec<(String, Vec<u8>)>> {
        self.queries_executed.fetch_add(1, Ordering::Relaxed);
        let txn = self.ensure_txn()?;
        self.db
            .scan_path(txn, prefix)
            .map_err(|e| ClientError::Storage(e.to_string()))
    }
    /// Scans the byte-key range from `start` to `end` (kernel range
    /// semantics); keys that are not valid UTF-8 are silently dropped.
    pub fn scan_range(&self, start: &str, end: &str) -> Result<Vec<(String, Vec<u8>)>> {
        self.queries_executed.fetch_add(1, Ordering::Relaxed);
        let txn = self.ensure_txn()?;
        let results = self.db
            .scan_range(txn, start.as_bytes(), end.as_bytes())
            .map_err(|e| ClientError::Storage(e.to_string()))?;
        Ok(results
            .into_iter()
            .filter_map(|(k, v)| {
                String::from_utf8(k).ok().map(|s| (s, v))
            })
            .collect())
    }
    /// Starts a fluent query over keys under `path_prefix`.
    pub fn query(&self, path_prefix: &str) -> EmbeddedQueryBuilder<'_> {
        EmbeddedQueryBuilder::new(self, path_prefix.to_string())
    }
    /// Registers a table schema with both the in-memory trie and the kernel.
    pub fn register_table(&self, schema: KernelTableSchema) -> Result<()> {
        let fields: Vec<(String, FieldType)> = schema
            .columns
            .iter()
            .map(|c| (c.name.clone(), kernel_type_to_field_type(c.col_type)))
            .collect();
        self.tch.write().register_table(&schema.name, &fields);
        self.db
            .register_table(schema)
            .map_err(|e| ClientError::Storage(e.to_string()))
    }
    /// Inserts one row given a column-name -> value map.
    pub fn insert_row(
        &self,
        table: &str,
        row_id: u64,
        values: &std::collections::HashMap<String, SochValue>,
    ) -> Result<()> {
        let txn = self.ensure_txn()?;
        self.db
            .insert_row(txn, table, row_id, values)
            .map_err(|e| ClientError::Storage(e.to_string()))
    }
    /// Inserts one row given values as a slice (positional fast path).
    #[inline]
    pub fn insert_row_slice(
        &self,
        table: &str,
        row_id: u64,
        values: &[Option<&SochValue>],
    ) -> Result<()> {
        let txn = self.ensure_txn()?;
        self.db
            .insert_row_slice(txn, table, row_id, values)
            .map_err(|e| ClientError::Storage(e.to_string()))
    }
    /// Reads one row; `columns = None` fetches every column.
    pub fn read_row(
        &self,
        table: &str,
        row_id: u64,
        columns: Option<&[&str]>,
    ) -> Result<Option<std::collections::HashMap<String, SochValue>>> {
        let txn = self.ensure_txn()?;
        self.db
            .read_row(txn, table, row_id, columns)
            .map_err(|e| ClientError::Storage(e.to_string()))
    }
    /// Resolves a dotted path against the registered schemas.
    pub fn resolve(&self, path: &str) -> Result<PathResolution> {
        Ok(self.tch.read().resolve(path))
    }
    /// Fsyncs the kernel database.
    pub fn fsync(&self) -> Result<()> {
        self.db
            .fsync()
            .map_err(|e| ClientError::Storage(e.to_string()))
    }
    /// Writes a checkpoint; returns the kernel's checkpoint value.
    pub fn checkpoint(&self) -> Result<u64> {
        self.db
            .checkpoint()
            .map_err(|e| ClientError::Storage(e.to_string()))
    }
    /// Truncates the write-ahead log.
    pub fn truncate_wal(&self) -> Result<()> {
        self.db
            .truncate_wal()
            .map_err(|e| ClientError::Storage(e.to_string()))
    }
    /// Runs kernel garbage collection; returns its reported count.
    pub fn gc(&self) -> usize {
        self.db.gc()
    }
    /// Client-side statistics (token counters and query count).
    pub fn stats(&self) -> ClientStats {
        let _db_stats = self.db.stats();
        let toon = self.soch_tokens_emitted.load(Ordering::Relaxed);
        let json = self.json_tokens_equivalent.load(Ordering::Relaxed);
        let savings = if json > 0 {
            (1.0 - (toon as f64 / json as f64)) * 100.0
        } else {
            0.0
        };
        ClientStats {
            queries_executed: self.queries_executed.load(Ordering::Relaxed),
            soch_tokens_emitted: toon,
            json_tokens_equivalent: json,
            token_savings_percent: savings,
            cache_hit_rate: 0.0,
        }
    }
    /// Raw kernel statistics.
    pub fn db_stats(&self) -> DatabaseStats {
        self.db.stats()
    }
    /// Shuts the kernel down.
    pub fn shutdown(&self) -> Result<()> {
        self.db
            .shutdown()
            .map_err(|e| ClientError::Storage(e.to_string()))
    }
    /// Executes a SQL statement through the query bridge.
    pub fn execute_sql(&self, sql: &str) -> Result<sochdb_query::sql::bridge::ExecutionResult> {
        use sochdb_query::sql::bridge::SqlBridge;
        use sochdb_query::storage_bridge::DatabaseSqlConnection;
        let sql_conn = DatabaseSqlConnection::new(self.db.clone());
        let mut bridge = SqlBridge::new(sql_conn);
        bridge
            .execute(sql)
            .map_err(|e| ClientError::Query(format!("SQL error: {}", e)))
    }
    /// Executes a parameterized SQL statement through the query bridge.
    pub fn execute_sql_params(
        &self,
        sql: &str,
        params: &[sochdb_core::SochValue],
    ) -> Result<sochdb_query::sql::bridge::ExecutionResult> {
        use sochdb_query::sql::bridge::SqlBridge;
        use sochdb_query::storage_bridge::DatabaseSqlConnection;
        let sql_conn = DatabaseSqlConnection::new(self.db.clone());
        let mut bridge = SqlBridge::new(sql_conn);
        bridge
            .execute_with_params(sql, params)
            .map_err(|e| ClientError::Query(format!("SQL error: {}", e)))
    }
}
/// Maps a kernel column type to the client-side [`FieldType`] (exhaustive,
/// one-to-one).
fn kernel_type_to_field_type(kt: KernelColumnType) -> FieldType {
    match kt {
        KernelColumnType::Bool => FieldType::Bool,
        KernelColumnType::Int64 => FieldType::Int64,
        KernelColumnType::UInt64 => FieldType::UInt64,
        KernelColumnType::Float64 => FieldType::Float64,
        KernelColumnType::Text => FieldType::Text,
        KernelColumnType::Binary => FieldType::Bytes,
    }
}
/// Fluent builder for prefix queries against an [`EmbeddedConnection`];
/// created via `EmbeddedConnection::query`.
pub struct EmbeddedQueryBuilder<'a> {
    conn: &'a EmbeddedConnection,
    path_prefix: String,
    // None means "all columns".
    columns: Option<Vec<String>>,
    limit: Option<usize>,
    offset: Option<usize>,
}
impl<'a> EmbeddedQueryBuilder<'a> {
    /// Starts a builder over keys under `path_prefix`.
    fn new(conn: &'a EmbeddedConnection, path_prefix: String) -> Self {
        Self {
            conn,
            path_prefix,
            columns: None,
            limit: None,
            offset: None,
        }
    }
    /// Restricts the projection to the named columns.
    pub fn columns(mut self, cols: &[&str]) -> Self {
        let owned: Vec<String> = cols.iter().map(|s| (*s).to_string()).collect();
        self.columns = Some(owned);
        self
    }
    /// Caps the number of returned rows.
    pub fn limit(mut self, n: usize) -> Self {
        self.limit = Some(n);
        self
    }
    /// Skips the first `n` rows.
    pub fn offset(mut self, n: usize) -> Self {
        self.offset = Some(n);
        self
    }
    /// Runs the query against the kernel and returns its result.
    pub fn execute(self) -> Result<QueryResult> {
        let txn = self.conn.ensure_txn()?;
        let mut query = self.conn.db.query(txn, &self.path_prefix);
        if let Some(cols) = self.columns.as_ref() {
            let names: Vec<&str> = cols.iter().map(String::as_str).collect();
            query = query.columns(&names);
        }
        if let Some(n) = self.limit {
            query = query.limit(n);
        }
        if let Some(n) = self.offset {
            query = query.offset(n);
        }
        query
            .execute()
            .map_err(|e| ClientError::Storage(e.to_string()))
    }
    /// Runs the query and renders the result in TOON form.
    pub fn to_toon(self) -> Result<String> {
        Ok(self.execute()?.to_toon())
    }
}
use sochdb_storage::durable_storage::DurableStorage;
/// Durability-focused connection over [`DurableStorage`], configured at
/// open time via [`ConnectionConfig`].
pub struct DurableConnection {
    storage: Arc<DurableStorage>,
    tch: Arc<RwLock<TrieColumnarHybrid>>,
    #[allow(dead_code)]
    catalog: Arc<RwLock<Catalog>>,
    // Id of the currently open transaction, if any (at most one at a time).
    active_txn: RwLock<Option<u64>>,
    queries_executed: AtomicU64,
    // Retained so callers can inspect what the connection was opened with.
    config: ConnectionConfig,
    // Keeps the temp dir alive for ephemeral connections.
    #[allow(dead_code)]
    _ephemeral_dir: Option<tempfile::TempDir>,
}
/// Tuning knobs applied when opening a [`DurableConnection`].
#[derive(Debug, Clone)]
pub struct ConnectionConfig {
    /// Batch multiple commits into one durability operation.
    pub group_commit: bool,
    /// Sync aggressiveness; translated to the storage layer's numeric mode.
    pub sync_mode: SyncModeClient,
    /// Maintain an ordered index (disabling picks a different storage open path).
    pub enable_ordered_index: bool,
    /// Commits per group-commit batch.
    pub group_commit_batch_size: usize,
    /// Max microseconds a commit may wait for its batch to fill.
    pub group_commit_max_wait_us: u64,
}
/// Client-side sync mode; mapped to the storage layer's numeric mode
/// (Off=0, Normal=1, Full=2) when a connection is opened.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncModeClient {
    Off,
    Normal,
    Full,
}
impl Default for ConnectionConfig {
    /// Balanced defaults: group commit on, NORMAL sync, ordered index on,
    /// batches of 100 with up to 10ms wait.
    fn default() -> Self {
        Self {
            group_commit: true,
            sync_mode: SyncModeClient::Normal,
            enable_ordered_index: true,
            group_commit_batch_size: 100,
            group_commit_max_wait_us: 10_000,
        }
    }
}
impl ConnectionConfig {
    /// Favors write throughput: large commit batches, long batch waits,
    /// ordered index disabled.
    pub fn throughput_optimized() -> Self {
        Self {
            group_commit: true,
            sync_mode: SyncModeClient::Normal,
            enable_ordered_index: false,
            group_commit_batch_size: 1000,
            group_commit_max_wait_us: 50_000,
        }
    }
    /// Favors commit latency: small batches, short waits, FULL sync.
    pub fn latency_optimized() -> Self {
        Self {
            group_commit: true,
            sync_mode: SyncModeClient::Full,
            enable_ordered_index: true,
            group_commit_batch_size: 10,
            group_commit_max_wait_us: 1_000,
        }
    }
    /// Favors durability: no group commit, FULL sync, every commit synced
    /// individually.
    pub fn max_durability() -> Self {
        Self {
            group_commit: false,
            sync_mode: SyncModeClient::Full,
            enable_ordered_index: true,
            group_commit_batch_size: 1,
            group_commit_max_wait_us: 0,
        }
    }
}
/// Counters reported by [`DurableConnection::recover`].
#[derive(Debug, Clone, Default)]
pub struct RecoveryResult {
    pub transactions_recovered: usize,
    pub writes_recovered: usize,
    /// Commit timestamp reached after recovery.
    pub commit_ts: u64,
}
impl DurableConnection {
    /// Opens a durable store at `path` with the default configuration.
    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
        Self::open_with_config(path, ConnectionConfig::default())
    }
    /// Opens a durable store at `path` honoring `config`.
    pub fn open_with_config(path: impl AsRef<Path>, config: ConnectionConfig) -> Result<Self> {
        let storage = if config.enable_ordered_index {
            DurableStorage::open(path.as_ref())
        } else {
            // `false` disables the ordered index at the storage layer.
            DurableStorage::open_with_config(path.as_ref(), false)
        }
        .map_err(|e| ClientError::Storage(e.to_string()))?;
        // Translate the client enum into the storage layer's numeric mode.
        let sync_mode = match config.sync_mode {
            SyncModeClient::Off => 0,
            SyncModeClient::Normal => 1,
            SyncModeClient::Full => 2,
        };
        storage.set_sync_mode(sync_mode);
        Ok(Self {
            storage: Arc::new(storage),
            tch: Arc::new(RwLock::new(TrieColumnarHybrid::new())),
            catalog: Arc::new(RwLock::new(Catalog::new("sochdb"))),
            active_txn: RwLock::new(None),
            queries_executed: AtomicU64::new(0),
            config,
            _ephemeral_dir: None,
        })
    }
    /// Opens a temp-dir backed store with the default configuration.
    pub fn open_ephemeral() -> Result<Self> {
        Self::open_ephemeral_with_config(ConnectionConfig::default())
    }
    /// Opens a temp-dir backed store honoring `config`; the directory lives
    /// as long as the connection.
    pub fn open_ephemeral_with_config(config: ConnectionConfig) -> Result<Self> {
        let handle = DurableStorage::open_ephemeral()
            .map_err(|e| ClientError::Storage(e.to_string()))?;
        let (storage, tmpdir) = handle.into_parts();
        let sync_mode = match config.sync_mode {
            SyncModeClient::Off => 0,
            SyncModeClient::Normal => 1,
            SyncModeClient::Full => 2,
        };
        storage.set_sync_mode(sync_mode);
        Ok(Self {
            storage: Arc::new(storage),
            tch: Arc::new(RwLock::new(TrieColumnarHybrid::new())),
            catalog: Arc::new(RwLock::new(Catalog::new("sochdb"))),
            active_txn: RwLock::new(None),
            queries_executed: AtomicU64::new(0),
            config,
            _ephemeral_dir: Some(tmpdir),
        })
    }
    /// The configuration this connection was opened with.
    pub fn config(&self) -> &ConnectionConfig {
        &self.config
    }
    /// Runs storage recovery and reports what was recovered.
    pub fn recover(&self) -> Result<RecoveryResult> {
        let stats = self
            .storage
            .recover()
            .map_err(|e| ClientError::Storage(e.to_string()))?;
        Ok(RecoveryResult {
            transactions_recovered: stats.transactions_recovered,
            writes_recovered: stats.writes_recovered,
            commit_ts: stats.commit_ts,
        })
    }
    /// Registers a table's field layout with the in-memory schema index.
    pub fn register_table(
        &self,
        name: &str,
        fields: &[(String, FieldType)],
    ) -> Result<Vec<ColumnRef>> {
        let cols = self.tch.write().register_table(name, fields);
        Ok(cols)
    }
    /// Resolves a dotted path against the registered schemas.
    pub fn resolve(&self, path: &str) -> Result<PathResolution> {
        Ok(self.tch.read().resolve(path))
    }
    /// Begins a transaction and records it as the active one.
    pub fn begin_txn(&self) -> Result<u64> {
        let txn_id = self
            .storage
            .begin_transaction()
            .map_err(|e| ClientError::Storage(e.to_string()))?;
        *self.active_txn.write() = Some(txn_id);
        Ok(txn_id)
    }
    /// Returns the active transaction id, starting one if none is open.
    fn ensure_txn(&self) -> Result<u64> {
        let active = *self.active_txn.read();
        match active {
            Some(txn) => Ok(txn),
            None => self.begin_txn(),
        }
    }
    /// Commits the active transaction; errors if none is open.
    pub fn commit_txn(&self) -> Result<u64> {
        let txn_id = self
            .active_txn
            .write()
            .take()
            .ok_or_else(|| ClientError::Transaction("No active transaction".into()))?;
        self.storage
            .commit(txn_id)
            .map_err(|e| ClientError::Storage(e.to_string()))
    }
    /// Aborts the active transaction; errors if none is open.
    pub fn abort_txn(&self) -> Result<()> {
        let txn_id = self
            .active_txn
            .write()
            .take()
            .ok_or_else(|| ClientError::Transaction("No active transaction".into()))?;
        self.storage
            .abort(txn_id)
            .map_err(|e| ClientError::Storage(e.to_string()))
    }
    /// Writes `key -> value` inside the (possibly implicit) transaction.
    pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
        let txn_id = self.ensure_txn()?;
        self.storage
            .write(txn_id, key.to_vec(), value.to_vec())
            .map_err(|e| ClientError::Storage(e.to_string()))
    }
    /// Reads the value stored at `key`, if any.
    pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        let txn_id = self.ensure_txn()?;
        self.storage
            .read(txn_id, key)
            .map_err(|e| ClientError::Storage(e.to_string()))
    }
    /// Deletes the value stored at `key`.
    pub fn delete(&self, key: &[u8]) -> Result<()> {
        let txn_id = self.ensure_txn()?;
        self.storage
            .delete(txn_id, key.to_vec())
            .map_err(|e| ClientError::Storage(e.to_string()))
    }
    /// Returns all `(key, value)` pairs whose key starts with `prefix`.
    pub fn scan(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
        let txn_id = self.ensure_txn()?;
        self.storage
            .scan(txn_id, prefix)
            .map_err(|e| ClientError::Storage(e.to_string()))
    }
    /// `put` with a UTF-8 path key.
    pub fn put_path(&self, path: &str, value: &[u8]) -> Result<()> {
        let key = path.as_bytes();
        self.put(key, value)
    }
    /// `get` with a UTF-8 path key.
    pub fn get_path(&self, path: &str) -> Result<Option<Vec<u8>>> {
        let key = path.as_bytes();
        self.get(key)
    }
    /// `delete` with a UTF-8 path key.
    pub fn delete_path(&self, path: &str) -> Result<()> {
        let key = path.as_bytes();
        self.delete(key)
    }
    /// `scan` with a UTF-8 path prefix; non-UTF-8 keys are silently dropped.
    pub fn scan_path(&self, prefix: &str) -> Result<Vec<(String, Vec<u8>)>> {
        let key_prefix = prefix.as_bytes();
        let results = self.scan(key_prefix)?;
        Ok(results
            .into_iter()
            .filter_map(|(k, v)| String::from_utf8(k).ok().map(|path| (path, v)))
            .collect())
    }
    /// Fsyncs the underlying store.
    pub fn fsync(&self) -> Result<()> {
        self.storage
            .fsync()
            .map_err(|e| ClientError::Storage(e.to_string()))
    }
    /// Writes a checkpoint; returns the storage layer's checkpoint value.
    pub fn checkpoint(&self) -> Result<u64> {
        self.storage
            .checkpoint()
            .map_err(|e| ClientError::Storage(e.to_string()))
    }
    /// Runs garbage collection; returns its reported count.
    pub fn gc(&self) -> Result<usize> {
        Ok(self.storage.gc())
    }
    /// Packs `values` (in registered schema column order) into one row blob
    /// and writes it under the table's formatted row key.
    pub fn insert_row_slice(
        &self,
        table: &str,
        row_id: u64,
        values: &[Option<&sochdb_core::soch::SochValue>],
    ) -> Result<()> {
        let txn_id = self.ensure_txn()?;
        use sochdb_storage::key_buffer::KeyBuffer;
        let key = KeyBuffer::format_row_key(table, row_id);
        use sochdb_storage::packed_row::{
            PackedColumnDef, PackedColumnType, PackedRow, PackedTableSchema,
        };
        let tch = self.tch.read();
        if let Some(table_info) = tch.tables.get(table) {
            // Build the packed schema from the registered names and types.
            let packed_cols: Vec<PackedColumnDef> = table_info
                .schema
                .fields
                .iter()
                .zip(table_info.schema.types.iter())
                .map(|(name, ty)| PackedColumnDef {
                    name: name.clone(),
                    col_type: match ty {
                        FieldType::Int64 => PackedColumnType::Int64,
                        FieldType::UInt64 => PackedColumnType::UInt64,
                        FieldType::Float64 => PackedColumnType::Float64,
                        FieldType::Text => PackedColumnType::Text,
                        FieldType::Bytes => PackedColumnType::Binary,
                        FieldType::Bool => PackedColumnType::Bool,
                    },
                    nullable: true,
                })
                .collect();
            let packed_schema = PackedTableSchema::new(table, packed_cols);
            let packed_row = PackedRow::pack_slice(&packed_schema, values);
            // Release the trie read lock before hitting storage.
            drop(tch);
            self.storage
                .write(
                    txn_id,
                    key.as_bytes().to_vec(),
                    packed_row.as_bytes().to_vec(),
                )
                .map_err(|e| ClientError::Storage(e.to_string()))
        } else {
            drop(tch);
            Err(ClientError::NotFound(format!(
                "Table '{}' not found",
                table
            )))
        }
    }
    /// Inserts many rows, committing every `batch_size` rows and once more
    /// at the end; returns the number of rows written.
    pub fn bulk_insert_slice<'a, I>(&self, table: &str, rows: I, batch_size: usize) -> Result<usize>
    where
        I: IntoIterator<Item = (u64, Vec<Option<&'a sochdb_core::soch::SochValue>>)>,
    {
        let mut count = 0;
        let mut batch_count = 0;
        for (row_id, values) in rows {
            let value_refs: Vec<Option<&sochdb_core::soch::SochValue>> = values;
            self.insert_row_slice(table, row_id, &value_refs)?;
            count += 1;
            batch_count += 1;
            if batch_count >= batch_size {
                self.commit_txn()?;
                batch_count = 0;
            }
        }
        if batch_count > 0 {
            self.commit_txn()?;
        }
        Ok(count)
    }
    /// Lightweight client-side statistics.
    pub fn stats(&self) -> DurableStats {
        DurableStats {
            queries_executed: self.queries_executed.load(Ordering::Relaxed),
            tables_registered: self.tch.read().tables.len() as u64,
        }
    }
}
/// Lightweight statistics for a [`DurableConnection`].
#[derive(Debug, Clone)]
pub struct DurableStats {
    pub queries_executed: u64,
    pub tables_registered: u64,
}
/// Intended access mode for a connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionModeClient {
    ReadOnly,
    ReadWrite,
}
/// Read-oriented connection over [`DurableStorage`]; read snapshots are
/// opened lazily and ended via abort.
pub struct ReadOnlyConnection {
    storage: Arc<DurableStorage>,
    tch: Arc<RwLock<TrieColumnarHybrid>>,
    // Currently open read transaction, if any.
    active_txn: RwLock<Option<u64>>,
    queries_executed: AtomicU64,
}
impl ReadOnlyConnection {
    /// Opens a read-only connection over the durable storage at `path`.
    ///
    /// # Errors
    /// Returns `ClientError::Storage` if the underlying storage fails to open.
    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
        let storage = DurableStorage::open(path.as_ref())
            .map_err(|e| ClientError::Storage(e.to_string()))?;
        Ok(Self {
            storage: Arc::new(storage),
            // NOTE(review): the hybrid index starts empty and is not
            // hydrated from storage here — `resolve` will report NotFound
            // for on-disk tables until schemas are registered in memory.
            tch: Arc::new(RwLock::new(TrieColumnarHybrid::new())),
            active_txn: RwLock::new(None),
            queries_executed: AtomicU64::new(0),
        })
    }

    /// Begins a new read transaction and records it as the active one.
    ///
    /// Any previously active transaction id is overwritten without being
    /// aborted; callers should `end_read_txn` first if one may be active.
    pub fn begin_read_txn(&self) -> Result<u64> {
        let txn_id = self
            .storage
            .begin_transaction()
            .map_err(|e| ClientError::Storage(e.to_string()))?;
        *self.active_txn.write() = Some(txn_id);
        Ok(txn_id)
    }

    /// Ends the active read transaction, if any, by aborting it (read
    /// transactions have nothing to commit). A no-op when none is active.
    pub fn end_read_txn(&self) -> Result<()> {
        if let Some(txn_id) = self.active_txn.write().take() {
            self.storage
                .abort(txn_id)
                .map_err(|e| ClientError::Storage(e.to_string()))?;
        }
        Ok(())
    }

    /// Returns the active read transaction id, starting one if necessary.
    ///
    /// Holds the write lock across the check-and-begin so that two racing
    /// callers cannot both observe `None` and open separate transactions
    /// (the previous read-then-begin sequence leaked the loser's txn).
    fn ensure_read_txn(&self) -> Result<u64> {
        let mut active = self.active_txn.write();
        if let Some(txn) = *active {
            return Ok(txn);
        }
        let txn_id = self
            .storage
            .begin_transaction()
            .map_err(|e| ClientError::Storage(e.to_string()))?;
        *active = Some(txn_id);
        Ok(txn_id)
    }

    /// Reads the value stored under `key` within the active read
    /// transaction (begun on demand). Increments the query counter.
    pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        let txn_id = self.ensure_read_txn()?;
        self.queries_executed.fetch_add(1, Ordering::Relaxed);
        self.storage
            .read(txn_id, key)
            .map_err(|e| ClientError::Storage(e.to_string()))
    }

    /// Scans all key/value pairs under `prefix` within the active read
    /// transaction (begun on demand). Increments the query counter.
    pub fn scan(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
        let txn_id = self.ensure_read_txn()?;
        self.queries_executed.fetch_add(1, Ordering::Relaxed);
        self.storage
            .scan(txn_id, prefix)
            .map_err(|e| ClientError::Storage(e.to_string()))
    }

    /// Path-addressed variant of `get`: the UTF-8 path is used as the key.
    pub fn get_path(&self, path: &str) -> Result<Option<Vec<u8>>> {
        self.get(path.as_bytes())
    }

    /// Path-addressed variant of `scan`. Keys that are not valid UTF-8
    /// are silently dropped from the result.
    pub fn scan_path(&self, prefix: &str) -> Result<Vec<(String, Vec<u8>)>> {
        let results = self.scan(prefix.as_bytes())?;
        Ok(results
            .into_iter()
            .filter_map(|(k, v)| String::from_utf8(k).ok().map(|path| (path, v)))
            .collect())
    }

    /// Resolves `path` against the in-memory hybrid index.
    pub fn resolve(&self, path: &str) -> Result<PathResolution> {
        Ok(self.tch.read().resolve(path))
    }

    /// Number of queries executed through this connection so far.
    pub fn queries_executed(&self) -> u64 {
        self.queries_executed.load(Ordering::Relaxed)
    }
}
/// Read-side connection operations: raw byte-keyed lookups plus
/// path-addressed convenience wrappers (provided as default methods).
pub trait ReadableConnection {
    /// Point lookup by raw key.
    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>>;
    /// Prefix scan returning matching `(key, value)` pairs.
    fn scan(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>>;
    /// Point lookup addressed by a UTF-8 path string.
    fn get_path(&self, path: &str) -> Result<Option<Vec<u8>>> {
        self.get(path.as_bytes())
    }
    /// Prefix scan addressed by a UTF-8 path string. Keys that are not
    /// valid UTF-8 are silently omitted from the result.
    fn scan_path(&self, prefix: &str) -> Result<Vec<(String, Vec<u8>)>> {
        let mut out = Vec::new();
        for (key, value) in self.scan(prefix.as_bytes())? {
            if let Ok(path) = String::from_utf8(key) {
                out.push((path, value));
            }
        }
        Ok(out)
    }
}
/// Write-side connection operations layered on top of `ReadableConnection`:
/// mutations plus explicit transaction control.
pub trait WritableConnection: ReadableConnection {
    /// Stores `value` under `key`.
    fn put(&self, key: &[u8], value: &[u8]) -> Result<()>;
    /// Removes the entry stored under `key`.
    fn delete(&self, key: &[u8]) -> Result<()>;
    /// Begins a transaction, returning its id.
    fn begin_txn(&self) -> Result<u64>;
    /// Commits the current transaction, returning a commit timestamp.
    fn commit_txn(&self) -> Result<u64>;
    /// Aborts the current transaction, discarding its changes.
    fn abort_txn(&self) -> Result<()>;
}
// Trait adapter: delegate to the inherent methods. Fully-qualified
// `ReadOnlyConnection::get` syntax is used so the calls resolve to the
// inherent implementations rather than recursing into the trait methods.
impl ReadableConnection for ReadOnlyConnection {
    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        ReadOnlyConnection::get(self, key)
    }
    fn scan(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
        ReadOnlyConnection::scan(self, prefix)
    }
}
// Trait adapter: delegate to the inherent methods via fully-qualified
// calls to avoid recursing into the trait methods themselves.
impl ReadableConnection for DurableConnection {
    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        DurableConnection::get(self, key)
    }
    fn scan(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
        DurableConnection::scan(self, prefix)
    }
}
// Trait adapter: delegate each write/transaction operation to the
// inherent method of the same name (fully-qualified to avoid recursion).
impl WritableConnection for DurableConnection {
    fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
        DurableConnection::put(self, key, value)
    }
    fn delete(&self, key: &[u8]) -> Result<()> {
        DurableConnection::delete(self, key)
    }
    fn begin_txn(&self) -> Result<u64> {
        DurableConnection::begin_txn(self)
    }
    fn commit_txn(&self) -> Result<u64> {
        DurableConnection::commit_txn(self)
    }
    fn abort_txn(&self) -> Result<()> {
        DurableConnection::abort_txn(self)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a `SochConnection` backed by a fresh temporary directory so
    /// each test is isolated (the previous fixed `"./test_data"` directory
    /// was shared across tests, making them order-dependent and flaky when
    /// the harness runs them in parallel, and polluted the workspace).
    /// The `TempDir` guard is returned so the directory lives for the whole
    /// test body; it is deleted when the guard drops.
    fn temp_soch_conn() -> (tempfile::TempDir, SochConnection) {
        let dir = tempfile::tempdir().unwrap();
        let conn = SochConnection::open(dir.path()).unwrap();
        (dir, conn)
    }

    #[test]
    fn test_connection_open() {
        let (_dir, conn) = temp_soch_conn();
        assert!(conn.list_tables().is_empty());
    }

    #[test]
    fn test_register_table() {
        let (_dir, conn) = temp_soch_conn();
        let fields = vec![
            ("id".to_string(), FieldType::UInt64),
            ("name".to_string(), FieldType::Text),
            ("score".to_string(), FieldType::Float64),
        ];
        let cols = conn.register_table("users", &fields).unwrap();
        assert_eq!(cols.len(), 3);
        assert_eq!(cols[0].name, "id");
    }

    #[test]
    fn test_path_resolution() {
        let (_dir, conn) = temp_soch_conn();
        let fields = vec![
            ("id".to_string(), FieldType::UInt64),
            ("name".to_string(), FieldType::Text),
        ];
        conn.register_table("users", &fields).unwrap();
        // Table name alone resolves to the array schema.
        match conn.resolve("users").unwrap() {
            PathResolution::Array { schema, columns } => {
                assert_eq!(schema.name, "users");
                assert_eq!(columns.len(), 2);
            }
            _ => panic!("Expected Array resolution"),
        }
        // Dotted path resolves to a single column.
        match conn.resolve("users.name").unwrap() {
            PathResolution::Value(col) => {
                assert_eq!(col.name, "name");
            }
            _ => panic!("Expected Value resolution"),
        }
        match conn.resolve("nonexistent").unwrap() {
            PathResolution::NotFound => {}
            _ => panic!("Expected NotFound"),
        }
    }

    #[test]
    fn test_transaction_lifecycle() {
        let (_dir, conn) = temp_soch_conn();
        let txn_id = conn.begin_txn().unwrap();
        assert!(txn_id > 0);
        let commit_ts = conn.commit_txn().unwrap();
        assert!(commit_ts > 0);
    }

    #[test]
    fn test_stats() {
        let (_dir, conn) = temp_soch_conn();
        conn.record_query();
        conn.record_tokens(100, 200);
        let stats = conn.stats();
        assert_eq!(stats.queries_executed, 1);
        assert_eq!(stats.soch_tokens_emitted, 100);
        assert_eq!(stats.json_tokens_equivalent, 200);
        // 100 emitted vs 200 equivalent => 50% savings.
        assert!((stats.token_savings_percent - 50.0).abs() < 0.1);
    }

    #[test]
    fn test_tch_insert_and_select() {
        let (_dir, conn) = temp_soch_conn();
        let fields = vec![
            ("id".to_string(), FieldType::UInt64),
            ("name".to_string(), FieldType::Text),
            ("score".to_string(), FieldType::Float64),
        ];
        conn.register_table("users", &fields).unwrap();
        let mut tch = conn.tch.write();
        let mut row1 = std::collections::HashMap::new();
        row1.insert("id".to_string(), SochValue::UInt(1));
        row1.insert("name".to_string(), SochValue::Text("Alice".to_string()));
        row1.insert("score".to_string(), SochValue::Float(95.5));
        let id1 = tch.insert_row("users", &row1);
        assert_eq!(id1, 1);
        let mut row2 = std::collections::HashMap::new();
        row2.insert("id".to_string(), SochValue::UInt(2));
        row2.insert("name".to_string(), SochValue::Text("Bob".to_string()));
        row2.insert("score".to_string(), SochValue::Float(87.2));
        let id2 = tch.insert_row("users", &row2);
        assert_eq!(id2, 2);
        let cursor = tch.select("users", &[], None, None, None, None);
        // Release the write guard before draining the cursor.
        drop(tch);
        let rows: Vec<_> = {
            let mut cursor = cursor;
            let mut rows = Vec::new();
            while let Some(row) = cursor.next() {
                rows.push(row);
            }
            rows
        };
        assert_eq!(rows.len(), 2);
        let tch = conn.tch.read();
        assert_eq!(tch.count_rows("users"), 2);
    }

    #[test]
    fn test_tch_where_clause() {
        let (_dir, conn) = temp_soch_conn();
        let fields = vec![
            ("id".to_string(), FieldType::UInt64),
            ("name".to_string(), FieldType::Text),
            ("score".to_string(), FieldType::Float64),
        ];
        conn.register_table("users", &fields).unwrap();
        let mut tch = conn.tch.write();
        for i in 1..=5 {
            let mut row = std::collections::HashMap::new();
            row.insert("id".to_string(), SochValue::UInt(i));
            row.insert("name".to_string(), SochValue::Text(format!("User{}", i)));
            row.insert("score".to_string(), SochValue::Float((i * 20) as f64));
            tch.insert_row("users", &row);
        }
        // Scores are 20,40,60,80,100 — only 80 and 100 exceed 60.
        let where_clause = WhereClause::Simple {
            field: "score".to_string(),
            op: CompareOp::Gt,
            value: SochValue::Float(60.0),
        };
        let cursor = tch.select("users", &[], Some(&where_clause), None, None, None);
        let rows: Vec<_> = {
            let mut cursor = cursor;
            let mut rows = Vec::new();
            while let Some(row) = cursor.next() {
                rows.push(row);
            }
            rows
        };
        assert_eq!(rows.len(), 2);
    }

    #[test]
    fn test_tch_update_and_delete() {
        let (_dir, conn) = temp_soch_conn();
        let fields = vec![
            ("id".to_string(), FieldType::UInt64),
            ("name".to_string(), FieldType::Text),
        ];
        conn.register_table("users", &fields).unwrap();
        let mut tch = conn.tch.write();
        let mut row = std::collections::HashMap::new();
        row.insert("id".to_string(), SochValue::UInt(1));
        row.insert("name".to_string(), SochValue::Text("Alice".to_string()));
        tch.insert_row("users", &row);
        let mut updates = std::collections::HashMap::new();
        updates.insert(
            "name".to_string(),
            SochValue::Text("Alice Updated".to_string()),
        );
        let where_clause = WhereClause::Simple {
            field: "id".to_string(),
            op: CompareOp::Eq,
            value: SochValue::UInt(1),
        };
        let update_result = tch.update_rows("users", &updates, Some(&where_clause));
        assert_eq!(update_result.affected_count, 1);
        assert_eq!(update_result.affected_row_ids.len(), 1);
        let cursor = tch.select("users", &[], None, None, None, None);
        let rows: Vec<_> = {
            let mut cursor = cursor;
            let mut rows = Vec::new();
            while let Some(row) = cursor.next() {
                rows.push(row);
            }
            rows
        };
        assert_eq!(
            rows[0].get("name"),
            Some(&SochValue::Text("Alice Updated".to_string()))
        );
        let delete_result = tch.delete_rows("users", Some(&where_clause));
        assert_eq!(delete_result.affected_count, 1);
        assert_eq!(delete_result.affected_row_ids.len(), 1);
        assert_eq!(tch.count_rows("users"), 0);
    }

    #[test]
    fn test_tch_upsert() {
        let (_dir, conn) = temp_soch_conn();
        let fields = vec![
            ("id".to_string(), FieldType::UInt64),
            ("name".to_string(), FieldType::Text),
        ];
        conn.register_table("users", &fields).unwrap();
        let mut tch = conn.tch.write();
        let mut row = std::collections::HashMap::new();
        row.insert("id".to_string(), SochValue::UInt(1));
        row.insert("name".to_string(), SochValue::Text("Alice".to_string()));
        let action = tch.upsert_row("users", "id", &row);
        assert_eq!(action, UpsertAction::Inserted);
        // Same key => second upsert must update, not insert.
        let mut row2 = std::collections::HashMap::new();
        row2.insert("id".to_string(), SochValue::UInt(1));
        row2.insert(
            "name".to_string(),
            SochValue::Text("Alice Updated".to_string()),
        );
        let action = tch.upsert_row("users", "id", &row2);
        assert_eq!(action, UpsertAction::Updated);
        assert_eq!(tch.count_rows("users"), 1);
    }

    #[test]
    fn test_durable_connection_basic() {
        use tempfile::tempdir;
        let dir = tempdir().unwrap();
        let conn = DurableConnection::open(dir.path()).unwrap();
        let txn = conn.begin_txn().unwrap();
        assert!(txn > 0);
        conn.put(b"key1", b"value1").unwrap();
        conn.put(b"key2", b"value2").unwrap();
        let v1 = conn.get(b"key1").unwrap();
        assert_eq!(v1, Some(b"value1".to_vec()));
        let commit_ts = conn.commit_txn().unwrap();
        assert!(commit_ts > 0);
        // Committed data must be visible to a subsequent transaction.
        conn.begin_txn().unwrap();
        let v2 = conn.get(b"key1").unwrap();
        assert_eq!(v2, Some(b"value1".to_vec()));
        conn.abort_txn().unwrap();
    }

    #[test]
    fn test_durable_connection_path_api() {
        use tempfile::tempdir;
        let dir = tempdir().unwrap();
        let conn = DurableConnection::open(dir.path()).unwrap();
        conn.begin_txn().unwrap();
        conn.put_path("users/1/name", b"Alice").unwrap();
        conn.put_path("users/1/email", b"alice@example.com")
            .unwrap();
        conn.put_path("users/2/name", b"Bob").unwrap();
        conn.commit_txn().unwrap();
        conn.begin_txn().unwrap();
        let name = conn.get_path("users/1/name").unwrap();
        assert_eq!(name, Some(b"Alice".to_vec()));
        // Prefix scan must only see user 1's two entries, not user 2's.
        let users = conn.scan_path("users/1/").unwrap();
        assert_eq!(users.len(), 2);
        conn.abort_txn().unwrap();
    }

    #[test]
    fn test_durable_connection_crash_recovery() {
        use tempfile::tempdir;
        let dir = tempdir().unwrap();
        // First connection commits and is dropped (simulated shutdown).
        {
            let conn =
                DurableConnection::open_with_config(dir.path(), ConnectionConfig::max_durability())
                    .unwrap();
            conn.begin_txn().unwrap();
            conn.put(b"persist", b"this data").unwrap();
            conn.commit_txn().unwrap();
        }
        // Second connection recovers and must see the committed data.
        {
            let conn = DurableConnection::open(dir.path()).unwrap();
            let _stats = conn.recover().unwrap();
            conn.begin_txn().unwrap();
            let v = conn.get(b"persist").unwrap();
            assert_eq!(v, Some(b"this data".to_vec()));
            conn.abort_txn().unwrap();
        }
    }

    #[test]
    fn test_lscs_storage_basic_put_get() {
        let storage = LscsStorage::new();
        storage.put(b"key1", b"value1").unwrap();
        storage.put(b"key2", b"value2").unwrap();
        storage.put(b"key3", b"value3").unwrap();
        assert_eq!(storage.get("", b"key1").unwrap(), Some(b"value1".to_vec()));
        assert_eq!(storage.get("", b"key2").unwrap(), Some(b"value2".to_vec()));
        assert_eq!(storage.get("", b"key3").unwrap(), Some(b"value3".to_vec()));
        assert_eq!(storage.get("", b"nonexistent").unwrap(), None);
    }

    #[test]
    fn test_lscs_storage_update() {
        let storage = LscsStorage::new();
        storage.put(b"key1", b"original").unwrap();
        assert_eq!(
            storage.get("", b"key1").unwrap(),
            Some(b"original".to_vec())
        );
        storage.put(b"key1", b"updated").unwrap();
        assert_eq!(storage.get("", b"key1").unwrap(), Some(b"updated".to_vec()));
    }

    #[test]
    fn test_lscs_storage_delete() {
        let storage = LscsStorage::new();
        storage.put(b"key1", b"value1").unwrap();
        assert_eq!(storage.get("", b"key1").unwrap(), Some(b"value1".to_vec()));
        storage.delete(b"key1").unwrap();
        assert_eq!(storage.get("", b"key1").unwrap(), None);
    }

    #[test]
    fn test_lscs_storage_scan() {
        let storage = LscsStorage::new();
        storage.put(b"user:1:name", b"Alice").unwrap();
        storage.put(b"user:1:email", b"alice@test.com").unwrap();
        storage.put(b"user:2:name", b"Bob").unwrap();
        storage.put(b"user:2:email", b"bob@test.com").unwrap();
        storage.put(b"product:1:name", b"Widget").unwrap();
        let results = storage.scan(b"user:1:", b"user:1:\xff", 10).unwrap();
        assert_eq!(results.len(), 2);
        let results = storage.scan(b"user:", b"user:\xff", 10).unwrap();
        assert_eq!(results.len(), 4);
    }

    #[test]
    fn test_lscs_storage_wal_integrity() {
        let storage = LscsStorage::new();
        for i in 0..100 {
            storage
                .put(
                    format!("key{}", i).as_bytes(),
                    format!("value{}", i).as_bytes(),
                )
                .unwrap();
        }
        let wal_result = storage.verify_wal().unwrap();
        assert_eq!(wal_result.total_entries, 100);
        assert_eq!(wal_result.valid_entries, 100);
        assert_eq!(wal_result.corrupted_entries, 0);
    }

    #[test]
    fn test_lscs_storage_checkpoint() {
        let storage = LscsStorage::new();
        assert_eq!(storage.last_checkpoint_lsn(), 0);
        storage.put(b"key1", b"value1").unwrap();
        storage.put(b"key2", b"value2").unwrap();
        let checkpoint_lsn = storage.force_checkpoint().unwrap();
        assert!(checkpoint_lsn >= 2);
        assert_eq!(storage.last_checkpoint_lsn(), checkpoint_lsn);
    }

    #[test]
    fn test_lscs_storage_wal_truncate() {
        let storage = LscsStorage::new();
        for i in 0..50 {
            storage
                .put(format!("key{}", i).as_bytes(), b"value")
                .unwrap();
        }
        let stats_before = storage.wal_stats();
        assert_eq!(stats_before.entry_count, 50);
        let removed = storage.truncate_wal(25).unwrap();
        assert!(removed > 0);
        let stats_after = storage.wal_stats();
        assert!(stats_after.entry_count < 50);
    }

    #[test]
    fn test_lscs_storage_replay() {
        let storage = LscsStorage::new();
        storage.put(b"key1", b"value1").unwrap();
        storage.put(b"key2", b"value2").unwrap();
        assert_eq!(storage.last_checkpoint_lsn(), 0);
        let replayed = storage.replay_wal_from_checkpoint().unwrap();
        assert!(replayed > 0);
    }

    #[test]
    fn test_bloom_filter() {
        let mut bloom = BloomFilter::new(1000, 0.01);
        for i in 0..100 {
            bloom.insert(format!("key{}", i).as_bytes());
        }
        // No false negatives allowed for inserted keys.
        for i in 0..100 {
            assert!(bloom.may_contain(format!("key{}", i).as_bytes()));
        }
        // False positives are permitted but must stay reasonably rare.
        let mut false_positives = 0;
        for i in 100..1000 {
            if bloom.may_contain(format!("key{}", i).as_bytes()) {
                false_positives += 1;
            }
        }
        assert!(false_positives < 50);
    }

    #[test]
    fn test_sstable_creation_and_lookup() {
        let entries = vec![
            SstEntry {
                key: b"aaa".to_vec(),
                value: b"v1".to_vec(),
                timestamp: 1,
                deleted: false,
            },
            SstEntry {
                key: b"bbb".to_vec(),
                value: b"v2".to_vec(),
                timestamp: 2,
                deleted: false,
            },
            SstEntry {
                key: b"ccc".to_vec(),
                value: b"v3".to_vec(),
                timestamp: 3,
                deleted: false,
            },
        ];
        let sst = SSTable::from_entries(entries, 0, 1).unwrap();
        assert_eq!(sst.get(b"aaa").map(|e| &e.value), Some(&b"v1".to_vec()));
        assert_eq!(sst.get(b"bbb").map(|e| &e.value), Some(&b"v2".to_vec()));
        assert_eq!(sst.get(b"ccc").map(|e| &e.value), Some(&b"v3".to_vec()));
        assert!(sst.get(b"zzz").is_none());
    }

    #[test]
    fn test_lscs_storage_many_writes() {
        let storage = LscsStorage::new();
        for i in 0..10000 {
            let key = format!("key{:06}", i);
            let value = format!("value{:06}", i);
            storage.put(key.as_bytes(), value.as_bytes()).unwrap();
        }
        storage.fsync().unwrap();
        // Spot-check every 100th key after the flush.
        for i in (0..10000).step_by(100) {
            let key = format!("key{:06}", i);
            let expected = format!("value{:06}", i);
            let actual = storage.get("", key.as_bytes()).unwrap();
            assert_eq!(actual, Some(expected.into_bytes()));
        }
    }

    #[test]
    fn test_lscs_mvcc_newest_wins() {
        let storage = LscsStorage::new();
        // Sleeps ensure strictly increasing timestamps between versions.
        storage.put(b"key", b"v1").unwrap();
        std::thread::sleep(std::time::Duration::from_millis(1));
        storage.put(b"key", b"v2").unwrap();
        std::thread::sleep(std::time::Duration::from_millis(1));
        storage.put(b"key", b"v3").unwrap();
        assert_eq!(storage.get("", b"key").unwrap(), Some(b"v3".to_vec()));
    }

    #[test]
    fn test_lscs_storage_recovery_needed() {
        let storage = LscsStorage::new();
        assert!(!storage.needs_recovery());
        storage.put(b"key", b"value").unwrap();
        assert!(storage.needs_recovery());
        storage.force_checkpoint().unwrap();
        assert!(!storage.needs_recovery());
    }

    #[test]
    fn test_lscs_scan_across_sstables() {
        let storage = LscsStorage::new();
        // First 10000 keys go to SSTables via fsync; the last 10 stay in
        // the memtable, so the scan must merge both sources.
        for i in 0..10000 {
            let key = format!("scankey{:06}", i);
            let value = format!("scanval{:06}", i);
            storage.put(key.as_bytes(), value.as_bytes()).unwrap();
        }
        storage.fsync().unwrap();
        for i in 10000..10010 {
            let key = format!("scankey{:06}", i);
            let value = format!("scanval{:06}", i);
            storage.put(key.as_bytes(), value.as_bytes()).unwrap();
        }
        let results = storage
            .scan(b"scankey000000", b"scankey010009", 20000)
            .unwrap();
        assert_eq!(
            results.len(),
            10010,
            "scan should return entries from both SSTables and memtable, got {}",
            results.len()
        );
        assert_eq!(results[0].0, b"scankey000000");
        assert_eq!(results[0].1, b"scanval000000");
        assert_eq!(results.last().unwrap().0, b"scankey010009");
        let mid_key = b"scankey005000".to_vec();
        let mid_val = b"scanval005000".to_vec();
        let found = results.iter().find(|(k, _)| k == &mid_key);
        assert!(
            found.is_some(),
            "mid-range key from SSTable should be visible"
        );
        assert_eq!(found.unwrap().1, mid_val);
    }

    #[test]
    fn test_lscs_scan_tombstone_shadowing() {
        let storage = LscsStorage::new();
        for i in 0..100 {
            let key = format!("ts_key{:04}", i);
            let value = format!("ts_val{:04}", i);
            storage.put(key.as_bytes(), value.as_bytes()).unwrap();
        }
        // Delete a contiguous range; the tombstones must shadow the puts.
        for i in 50..60 {
            let key = format!("ts_key{:04}", i);
            storage.delete(key.as_bytes()).unwrap();
        }
        let results = storage
            .scan(b"ts_key0000", b"ts_key0099", 200)
            .unwrap();
        assert_eq!(
            results.len(),
            90,
            "scan should omit tombstoned entries, got {}",
            results.len()
        );
        for i in 50..60 {
            let del_key = format!("ts_key{:04}", i).into_bytes();
            assert!(
                !results.iter().any(|(k, _)| k == &del_key),
                "deleted key ts_key{:04} should not appear in scan",
                i
            );
        }
    }
}
// Crate-level trait adapter: delegate each operation to the inherent
// method of the same name, fully qualified to avoid recursing into the
// trait method being defined.
impl crate::ConnectionTrait for DurableConnection {
    fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
        DurableConnection::put(self, key, value)
    }
    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        DurableConnection::get(self, key)
    }
    fn delete(&self, key: &[u8]) -> Result<()> {
        DurableConnection::delete(self, key)
    }
    fn scan(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
        DurableConnection::scan(self, prefix)
    }
}
// Crate-level trait adapter for SochConnection. Note the inherent `put`
// takes owned buffers, so the borrowed trait arguments are copied here;
// `scan` maps onto the inherent `scan_prefix`.
impl crate::ConnectionTrait for SochConnection {
    fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
        SochConnection::put(self, key.to_vec(), value.to_vec())
    }
    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        SochConnection::get(self, key)
    }
    fn delete(&self, key: &[u8]) -> Result<()> {
        SochConnection::delete(self, key)
    }
    fn scan(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
        SochConnection::scan_prefix(self, prefix)
    }
}