use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use sochdb_core::{Result, SochDBError};
use crate::txn_wal::TxnWal;
/// Per-column update statistics used to classify a column as hot or cold.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnTemperature {
/// Name of the tracked column.
pub name: String,
/// Exponential moving average of this column's share of update events per
/// window (folded in by `update_ema`).
pub temperature: f64,
/// Updates recorded since the last EMA fold; reset to 0 by `update_ema`.
pub window_updates: u64,
/// Updates recorded over the lifetime of this record.
pub total_updates: u64,
/// Wall-clock time of the latest update, in microseconds since the Unix epoch.
pub last_update_us: u64,
}
impl ColumnTemperature {
    /// Creates a record for `name` with zero temperature and no recorded updates.
    pub fn new(name: String) -> Self {
        Self {
            name,
            temperature: 0.0,
            window_updates: 0,
            total_updates: 0,
            last_update_us: 0,
        }
    }

    /// Records one update to this column in the current window and stamps the time.
    ///
    /// If the system clock reports a time before the Unix epoch, the timestamp
    /// falls back to 0 instead of panicking.
    pub fn record_update(&mut self) {
        self.window_updates += 1;
        self.total_updates += 1;
        self.last_update_us = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_micros() as u64;
    }

    /// Folds the current window into the temperature EMA (alpha = 0.1) and resets
    /// the window counter.
    ///
    /// `total_window_updates` is the number of update events in the window across
    /// all columns; when it is 0 the window contributes a sample of 0.0.
    pub fn update_ema(&mut self, total_window_updates: u64) {
        const ALPHA: f64 = 0.1;
        let temp_current = if total_window_updates > 0 {
            self.window_updates as f64 / total_window_updates as f64
        } else {
            0.0
        };
        self.temperature = ALPHA * temp_current + (1.0 - ALPHA) * self.temperature;
        self.window_updates = 0;
    }

    /// Returns `true` when the temperature is strictly above `threshold`.
    pub fn is_hot(&self, threshold: f64) -> bool {
        self.temperature > threshold
    }
}
/// Tracks per-column update temperatures and classifies columns as hot or cold.
///
/// Update events are counted in windows of `window_size` events. When a window
/// fills, every column's EMA temperature is refreshed and the window restarts.
#[derive(Debug)]
pub struct ColumnTemperatureTracker {
    /// Per-column statistics keyed by column name.
    columns: RwLock<HashMap<String, ColumnTemperature>>,
    /// Number of update events per EMA window.
    window_size: u64,
    /// Update events recorded in the current window.
    window_updates: AtomicU64,
    /// Hot/cold threshold stored as `f64::to_bits`, so it can be changed through `&self`.
    hot_threshold_bits: AtomicU64,
}

impl ColumnTemperatureTracker {
    /// Threshold used until `set_hot_threshold` is called.
    const DEFAULT_HOT_THRESHOLD: f64 = 0.1;

    /// Creates a tracker with one zero-temperature record per column name.
    pub fn new(column_names: &[String], window_size: u64) -> Self {
        let columns: HashMap<String, ColumnTemperature> = column_names
            .iter()
            .map(|name| (name.clone(), ColumnTemperature::new(name.clone())))
            .collect();
        Self {
            columns: RwLock::new(columns),
            window_size,
            window_updates: AtomicU64::new(0),
            hot_threshold_bits: AtomicU64::new(Self::DEFAULT_HOT_THRESHOLD.to_bits()),
        }
    }

    /// Records one update event touching `column_names` (unknown names are ignored).
    ///
    /// Once `window_size` events have accumulated, every column's EMA is
    /// refreshed and the window counter is reset.
    pub fn record_updates(&self, column_names: &[&str]) {
        let mut cols = self.columns.write();
        for name in column_names {
            if let Some(temp) = cols.get_mut(*name) {
                temp.record_update();
            }
        }
        // The write lock above serializes callers, so increment-then-reset cannot race.
        let total = self.window_updates.fetch_add(1, Ordering::SeqCst) + 1;
        if total >= self.window_size {
            self.update_all_ema(&mut cols, total);
            self.window_updates.store(0, Ordering::SeqCst);
        }
    }

    /// Folds the finished window into every column's EMA.
    fn update_all_ema(&self, cols: &mut HashMap<String, ColumnTemperature>, total: u64) {
        cols.values_mut().for_each(|temp| temp.update_ema(total));
    }

    /// Current hot/cold threshold.
    pub fn hot_threshold(&self) -> f64 {
        f64::from_bits(self.hot_threshold_bits.load(Ordering::SeqCst))
    }

    /// Names of columns whose temperature is above the threshold.
    pub fn get_hot_columns(&self) -> HashSet<String> {
        let threshold = self.hot_threshold();
        self.columns
            .read()
            .values()
            .filter(|t| t.is_hot(threshold))
            .map(|t| t.name.clone())
            .collect()
    }

    /// Names of columns whose temperature is at or below the threshold.
    pub fn get_cold_columns(&self) -> HashSet<String> {
        let threshold = self.hot_threshold();
        self.columns
            .read()
            .values()
            .filter(|t| !t.is_hot(threshold))
            .map(|t| t.name.clone())
            .collect()
    }

    /// Snapshot of every column's temperature record.
    pub fn get_all_temperatures(&self) -> Vec<ColumnTemperature> {
        self.columns.read().values().cloned().collect()
    }

    /// Sets the threshold used by `get_hot_columns` / `get_cold_columns`.
    ///
    /// Previously this was a no-op. A NaN threshold classifies every column as cold.
    pub fn set_hot_threshold(&self, threshold: f64) {
        self.hot_threshold_bits
            .store(threshold.to_bits(), Ordering::SeqCst);
    }
}
/// Location of one column's stripe inside a segment file; the file name is
/// derived as `L{level}_seq{segment_id}.sst` by `Lscs::read_column_stripe`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ColumnStripeRef {
/// Level component of the segment file name.
pub level: u32,
/// Sequence component of the segment file name.
pub segment_id: u64,
/// Column whose data the stripe holds.
pub column_name: String,
/// Byte offset of the stripe within the file.
pub offset: u64,
/// Stripe length in bytes.
pub length: u64,
/// Number of rows stored in the stripe.
pub row_count: u64,
/// Compression tag; `new` sets 0 (presumably "uncompressed" — confirm).
pub compression: u8,
}
impl ColumnStripeRef {
    /// Builds a stripe reference with compression tag 0.
    pub fn new(
        level: u32,
        segment_id: u64,
        column_name: String,
        offset: u64,
        length: u64,
        row_count: u64,
    ) -> Self {
        let compression = 0;
        Self {
            compression,
            row_count,
            length,
            offset,
            column_name,
            segment_id,
            level,
        }
    }

    /// Returns a copy that points at a new level, segment and offset, keeping the
    /// column name, length, row count and compression tag.
    pub fn relocate(&self, new_level: u32, new_segment_id: u64, new_offset: u64) -> Self {
        Self {
            level: new_level,
            segment_id: new_segment_id,
            offset: new_offset,
            ..self.clone()
        }
    }
}
/// Metadata for a compacted segment: where each column's data lives and the
/// row-id / timestamp range it covers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SegmentDescriptor {
/// Segment id (the sequence number allocated by the compaction that created it).
pub id: u64,
/// Level the segment belongs to.
pub level: u32,
/// Per-column stripe locations, keyed by column name.
pub col_refs: HashMap<String, ColumnStripeRef>,
/// Smallest covered row id; `scan_columns` uses it for range pruning.
pub min_row_id: RowId,
/// Largest covered row id; `scan_columns` uses it for range pruning.
pub max_row_id: RowId,
/// Total rows across the segments this descriptor summarizes.
pub row_count: u64,
/// Lower timestamp bound (microseconds since the Unix epoch).
pub min_timestamp: u64,
/// Upper timestamp bound (microseconds since the Unix epoch).
pub max_timestamp: u64,
/// Presumably marks a tombstone segment; every descriptor built in this file sets `false`.
pub is_tombstone: bool,
}
/// Numeric column identifier.
pub type ColumnId = u32;
/// Logical row identifier; `Lscs` hands these out sequentially starting at 1.
pub type RowId = u64;
/// Physical storage type of a column. The `u8` discriminant is the type byte
/// written at the start of every on-disk column stripe.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum ColumnType {
    Bool = 0,
    Int64 = 1,
    UInt64 = 2,
    Float64 = 3,
    Text = 4,
    Binary = 5,
    Timestamp = 6,
}

impl ColumnType {
    /// Every variant, ordered by discriminant so `BY_TAG[t as usize] == t`.
    const BY_TAG: [ColumnType; 7] = [
        ColumnType::Bool,
        ColumnType::Int64,
        ColumnType::UInt64,
        ColumnType::Float64,
        ColumnType::Text,
        ColumnType::Binary,
        ColumnType::Timestamp,
    ];

    /// Width of one value in bytes for fixed-width types; `None` for the
    /// variable-width `Text` and `Binary`.
    pub fn fixed_size(&self) -> Option<usize> {
        let width = match self {
            ColumnType::Text | ColumnType::Binary => return None,
            ColumnType::Bool => 1,
            ColumnType::Int64
            | ColumnType::UInt64
            | ColumnType::Float64
            | ColumnType::Timestamp => 8,
        };
        Some(width)
    }

    /// Decodes an on-disk type byte; unknown tags yield `None`.
    pub fn from_byte(b: u8) -> Option<Self> {
        Self::BY_TAG.get(usize::from(b)).copied()
    }
}
/// A table's name plus its ordered column definitions.
#[derive(Debug, Clone)]
pub struct TableSchema {
    pub name: String,
    pub columns: Vec<ColumnDef>,
}

impl TableSchema {
    /// Builds a schema from a table name and its columns, in order.
    pub fn new(name: String, columns: Vec<ColumnDef>) -> Self {
        TableSchema { name, columns }
    }

    /// Appends the non-nullable `UInt64` MVCC columns `__txn_start` and
    /// `__txn_end` (in that order), skipping any that already exist by name.
    pub fn with_mvcc(mut self) -> Self {
        const MVCC_COLUMNS: [&str; 2] = ["__txn_start", "__txn_end"];
        for mvcc_name in MVCC_COLUMNS {
            let already_present = self.columns.iter().any(|c| c.name == mvcc_name);
            if already_present {
                continue;
            }
            self.columns.push(ColumnDef {
                name: mvcc_name.to_string(),
                col_type: ColumnType::UInt64,
                nullable: false,
            });
        }
        self
    }
}

/// Definition of a single column: name, storage type and nullability.
#[derive(Debug, Clone)]
pub struct ColumnDef {
    pub name: String,
    pub col_type: ColumnType,
    pub nullable: bool,
}
/// In-memory buffer holding one column's values for a memtable.
///
/// Layout:
/// * `nulls` — validity bitmap, one bit per row; a set bit means a value is present.
/// * `data` — for fixed-width types, exactly `fixed_size` bytes per row, so row `i`
///   always starts at `i * fixed_size`. Null rows are zero-filled to keep this
///   alignment. For variable-width types, the concatenated value bytes.
/// * `offsets` — variable-width types only: row `i` is `data[offsets[i]..offsets[i + 1]]`.
#[derive(Debug)]
struct ColumnBuffer {
    col_type: ColumnType,
    data: Vec<u8>,
    nulls: Vec<u8>,
    offsets: Option<Vec<u32>>,
    row_count: u64,
}

impl ColumnBuffer {
    /// Creates an empty buffer; variable-width columns start with the leading 0 offset.
    fn new(col_type: ColumnType) -> Self {
        Self {
            col_type,
            data: Vec::new(),
            nulls: Vec::new(),
            offsets: col_type.fixed_size().is_none().then(|| vec![0]),
            row_count: 0,
        }
    }

    /// Appends one row (`None` = null).
    ///
    /// Fixed-width rows always occupy exactly `fixed_size` bytes. A null, or a
    /// value of the wrong width, is zero-padded or truncated to that width.
    /// Otherwise every later row would be misaligned.
    fn append(&mut self, value: Option<&[u8]>) {
        let row = self.row_count as usize;
        let byte_idx = row / 8;
        if self.nulls.len() <= byte_idx {
            self.nulls.resize(byte_idx + 1, 0);
        }
        if let Some(bytes) = value {
            self.nulls[byte_idx] |= 1 << (row % 8);
            self.data.extend_from_slice(bytes);
        }
        match (self.col_type.fixed_size(), &mut self.offsets) {
            (Some(width), _) => self.data.resize((row + 1) * width, 0),
            (None, Some(offsets)) => offsets.push(self.data.len() as u32),
            (None, None) => {}
        }
        self.row_count += 1;
    }

    /// Returns `true` if the row is null or does not exist.
    fn is_null(&self, row_idx: u64) -> bool {
        if row_idx >= self.row_count {
            return true;
        }
        match self.nulls.get((row_idx / 8) as usize) {
            Some(bits) => bits & (1u8 << (row_idx % 8)) == 0,
            None => true,
        }
    }

    /// Copies out the value of row `row_idx`; `None` for null, missing or
    /// inconsistent rows (never panics on bad offsets).
    fn get(&self, row_idx: u64) -> Option<Vec<u8>> {
        if self.is_null(row_idx) {
            return None;
        }
        let row = row_idx as usize;
        let range = match self.col_type.fixed_size() {
            Some(width) => row * width..(row + 1) * width,
            None => {
                let offsets = self.offsets.as_ref()?;
                let start = *offsets.get(row)? as usize;
                let end = *offsets.get(row + 1)? as usize;
                start..end
            }
        };
        self.data.get(range).map(<[u8]>::to_vec)
    }

    /// Approximate heap bytes held by the data, bitmap and offsets vectors.
    fn memory_bytes(&self) -> usize {
        self.data.len() + self.nulls.len() + self.offsets.as_ref().map_or(0, |o| o.len() * 4)
    }
}
#[derive(Debug)]
pub struct ColumnarMemtable {
schema: TableSchema,
columns: Vec<RwLock<ColumnBuffer>>,
row_ids: RwLock<BTreeMap<RowId, u64>>,
row_idx_to_id: RwLock<Vec<RowId>>,
next_row_idx: AtomicU64,
bytes_written: AtomicU64,
size_limit: usize,
}
impl ColumnarMemtable {
pub fn new(schema: TableSchema, size_limit: usize) -> Self {
let columns = schema
.columns
.iter()
.map(|def| RwLock::new(ColumnBuffer::new(def.col_type)))
.collect();
Self {
schema,
columns,
row_ids: RwLock::new(BTreeMap::new()),
row_idx_to_id: RwLock::new(Vec::new()),
next_row_idx: AtomicU64::new(0),
bytes_written: AtomicU64::new(0),
size_limit,
}
}
pub fn insert(&self, row_id: RowId, values: &[Option<&[u8]>]) -> Result<()> {
if values.len() != self.schema.columns.len() {
return Err(SochDBError::InvalidData(format!(
"Expected {} columns, got {}",
self.schema.columns.len(),
values.len()
)));
}
let row_idx = self.next_row_idx.fetch_add(1, Ordering::SeqCst);
let mut bytes = 0usize;
for (i, value) in values.iter().enumerate() {
let mut col = self.columns[i].write();
if let Some(data) = value {
bytes += data.len();
}
col.append(*value);
}
{
let mut ids = self.row_ids.write();
ids.insert(row_id, row_idx);
}
{
let mut idx_to_id = self.row_idx_to_id.write();
while idx_to_id.len() <= row_idx as usize {
idx_to_id.push(0); }
idx_to_id[row_idx as usize] = row_id;
}
self.bytes_written
.fetch_add(bytes as u64, Ordering::Relaxed);
Ok(())
}
pub fn get(&self, row_id: RowId) -> Option<Vec<Option<Vec<u8>>>> {
let row_ids = self.row_ids.read();
let row_idx = *row_ids.get(&row_id)?;
drop(row_ids);
let mut values = Vec::with_capacity(self.columns.len());
for col in &self.columns {
let col_buf = col.read();
values.push(col_buf.get(row_idx));
}
Some(values)
}
pub fn get_columns(
&self,
row_id: RowId,
col_indices: &[usize],
) -> Option<Vec<Option<Vec<u8>>>> {
let row_ids = self.row_ids.read();
let row_idx = *row_ids.get(&row_id)?;
drop(row_ids);
let mut values = Vec::with_capacity(col_indices.len());
for &col_idx in col_indices {
if col_idx < self.columns.len() {
let col_buf = self.columns[col_idx].read();
values.push(col_buf.get(row_idx));
} else {
values.push(None);
}
}
Some(values)
}
pub fn scan_range(&self, start: RowId, end: RowId) -> Vec<(RowId, Vec<Option<Vec<u8>>>)> {
let row_ids = self.row_ids.read();
let mut results = Vec::new();
for (&row_id, &row_idx) in row_ids.range(start..=end) {
let mut values = Vec::with_capacity(self.columns.len());
for col in &self.columns {
let col_buf = col.read();
values.push(col_buf.get(row_idx));
}
results.push((row_id, values));
}
results
}
pub fn is_full(&self) -> bool {
self.bytes_written.load(Ordering::Relaxed) as usize >= self.size_limit
}
pub fn row_count(&self) -> u64 {
self.next_row_idx.load(Ordering::SeqCst)
}
pub fn memory_bytes(&self) -> usize {
self.columns.iter().map(|c| c.read().memory_bytes()).sum()
}
pub fn schema(&self) -> &TableSchema {
&self.schema
}
}
use sochdb_core::learned_index::LearnedSparseIndex;
/// Byte range of one column stripe within a segment file (stored, keyed by
/// column name, in the file footer).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnIndex {
/// Offset of the stripe's first byte (its type byte).
pub offset: u64,
/// Stripe length in bytes.
pub length: u64,
/// Compression tag; `from_memtable` always writes 0 (presumably uncompressed).
pub compression: u8,
}
/// An immutable on-disk segment (one `.sst` file) plus its in-memory metadata.
#[derive(Debug)]
// Several fields are only stored, never read, in this file.
#[allow(dead_code)]
pub struct ColumnGroup {
/// Path of the segment file.
path: PathBuf,
/// Schema the segment was written with.
schema: TableSchema,
/// Level the segment was created at (part of its file name).
level: u32,
/// Sequence number (part of its file name).
sequence: u64,
/// Number of physical rows per column stripe.
row_count: u64,
/// Smallest `__txn_start` value seen, or the creation time if none.
min_timestamp: u64,
/// Largest `__txn_start` value seen, or the creation time if none.
max_timestamp: u64,
/// Stripe location per column name.
column_offsets: BTreeMap<String, ColumnIndex>,
/// Learned index over the segment's row ids.
lsi: Option<LearnedSparseIndex>,
}
impl ColumnGroup {
const MAGIC: [u8; 4] = [b'T', b'O', b'O', b'N'];
const VERSION: u32 = 1;
#[allow(clippy::too_many_arguments)]
pub fn new(
path: PathBuf,
schema: TableSchema,
level: u32,
sequence: u64,
row_count: u64,
min_timestamp: u64,
max_timestamp: u64,
column_offsets: BTreeMap<String, ColumnIndex>,
lsi: Option<LearnedSparseIndex>,
) -> Self {
Self {
path,
schema,
level,
sequence,
row_count,
min_timestamp,
max_timestamp,
column_offsets,
lsi,
}
}
pub fn from_memtable(
base_path: &Path,
memtable: &ColumnarMemtable,
level: u32,
sequence: u64,
) -> Result<Self> {
use byteorder::{LittleEndian, WriteBytesExt};
use std::fs::File;
use std::io::{BufWriter, Seek, Write};
let file_name = format!("L{}_seq{}.sst", level, sequence);
let file_path = base_path.join(&file_name);
let file = File::create(&file_path)?;
let mut writer = BufWriter::new(file);
writer.write_all(&Self::MAGIC)?;
writer.write_u32::<LittleEndian>(Self::VERSION)?;
let mut column_offsets = BTreeMap::new();
let mut min_ts = u64::MAX;
let mut max_ts = 0u64;
for (i, col_lock) in memtable.columns.iter().enumerate() {
let col = col_lock.read();
let col_def = &memtable.schema.columns[i];
if col_def.name == "__txn_start" && col.col_type == ColumnType::UInt64 {
let mut offset = 0;
let row_count = col.row_count as usize;
for row_idx in 0..row_count {
let byte_idx = row_idx / 8;
let bit_idx = row_idx % 8;
let is_null =
byte_idx < col.nulls.len() && (col.nulls[byte_idx] & (1 << bit_idx)) != 0;
if !is_null && offset + 8 <= col.data.len() {
let ts = u64::from_le_bytes(
col.data[offset..offset + 8].try_into().unwrap_or([0u8; 8]),
);
min_ts = min_ts.min(ts);
max_ts = max_ts.max(ts);
}
offset += 8;
}
}
let start_offset = writer.stream_position()?;
writer.write_u8(col.col_type as u8)?;
writer.write_u64::<LittleEndian>(col.row_count)?;
writer.write_u32::<LittleEndian>(col.nulls.len() as u32)?;
writer.write_all(&col.nulls)?;
if let Some(offsets) = &col.offsets {
writer.write_u32::<LittleEndian>(offsets.len() as u32)?;
for &off in offsets {
writer.write_u32::<LittleEndian>(off)?;
}
}
writer.write_u32::<LittleEndian>(col.data.len() as u32)?;
writer.write_all(&col.data)?;
let end_offset = writer.stream_position()?;
column_offsets.insert(
col_def.name.clone(),
ColumnIndex {
offset: start_offset,
length: end_offset - start_offset,
compression: 0, },
);
}
let row_ids = memtable.row_ids.read();
let keys: Vec<u64> = row_ids.keys().cloned().collect();
let lsi = LearnedSparseIndex::build(&keys);
let footer_start = writer.stream_position()?;
let offsets_bytes = bincode::serialize(&column_offsets)
.map_err(|e| SochDBError::Serialization(e.to_string()))?;
writer.write_u64::<LittleEndian>(offsets_bytes.len() as u64)?;
writer.write_all(&offsets_bytes)?;
let lsi_bytes =
bincode::serialize(&lsi).map_err(|e| SochDBError::Serialization(e.to_string()))?;
writer.write_u64::<LittleEndian>(lsi_bytes.len() as u64)?;
writer.write_all(&lsi_bytes)?;
writer.write_u64::<LittleEndian>(footer_start)?;
writer.write_all(&Self::MAGIC)?;
writer.flush()?;
if min_ts == u64::MAX || max_ts == 0 {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_micros() as u64;
min_ts = now;
max_ts = now;
}
Ok(Self {
path: file_path,
schema: memtable.schema.clone(),
level,
sequence,
row_count: memtable.row_count(),
min_timestamp: min_ts,
max_timestamp: max_ts,
column_offsets,
lsi: Some(lsi),
})
}
pub fn open(path: PathBuf, schema: TableSchema, level: u32, sequence: u64) -> Result<Self> {
use byteorder::{LittleEndian, ReadBytesExt};
use std::fs::File;
use std::io::{Read, Seek, SeekFrom};
let mut file = File::open(&path)?;
let file_len = file.metadata()?.len();
if file_len < 12 {
return Err(SochDBError::Corruption("File too short".to_string()));
}
file.seek(SeekFrom::End(-12))?;
let footer_offset = file.read_u64::<LittleEndian>()?;
let mut magic = [0u8; 4];
file.read_exact(&mut magic)?;
if magic != Self::MAGIC {
return Err(SochDBError::Corruption("Invalid magic bytes".to_string()));
}
file.seek(SeekFrom::Start(footer_offset))?;
let offsets_len = file.read_u64::<LittleEndian>()?;
let mut offsets_bytes = vec![0u8; offsets_len as usize];
file.read_exact(&mut offsets_bytes)?;
let column_offsets: BTreeMap<String, ColumnIndex> = bincode::deserialize(&offsets_bytes)
.map_err(|e| SochDBError::Serialization(e.to_string()))?;
let lsi_len = file.read_u64::<LittleEndian>()?;
let mut lsi_bytes = vec![0u8; lsi_len as usize];
file.read_exact(&mut lsi_bytes)?;
let lsi: LearnedSparseIndex = bincode::deserialize(&lsi_bytes)
.map_err(|e| SochDBError::Serialization(e.to_string()))?;
Ok(Self {
path,
schema,
level,
sequence,
row_count: 0, min_timestamp: 0,
max_timestamp: 0,
column_offsets,
lsi: Some(lsi),
})
}
pub fn file_path(&self) -> &Path {
&self.path
}
pub fn column_index(&self, col_name: &str) -> Option<&ColumnIndex> {
self.column_offsets.get(col_name)
}
pub fn level(&self) -> u32 {
self.level
}
pub fn row_count(&self) -> u64 {
self.row_count
}
}
/// Cumulative counters describing the compaction work done by the store.
#[derive(Debug, Clone, Default)]
pub struct CompactionStats {
    /// Compactions run, of any kind.
    pub compactions_total: u64,
    /// Level-0 compactions run.
    pub l0_compactions: u64,
    /// Bytes accounted as read by compaction.
    pub bytes_read: u64,
    /// Bytes accounted as written by compaction.
    pub bytes_written: u64,
    /// Compactions counted as processing hot columns.
    pub hot_column_compactions: u64,
    /// Cold-column stripe references kept instead of rewritten.
    pub cold_column_refs_preserved: u64,
    /// Most recent estimate of the write-amplification reduction factor.
    pub estimated_wa_reduction: f64,
    /// Wall-clock duration of the latest compaction, in microseconds.
    pub last_compaction_duration_us: u64,
}

impl CompactionStats {
    /// Ratio of bytes written to bytes read; 1.0 while nothing has been read.
    pub fn write_amplification(&self) -> f64 {
        match self.bytes_read {
            0 => 1.0,
            read => self.bytes_written as f64 / read as f64,
        }
    }
}
/// Outcome of replaying the WAL in `Lscs::recover`.
#[derive(Debug, Clone, Default)]
pub struct LscsRecoveryStats {
/// Transaction count reported by the WAL replay.
pub transactions_recovered: usize,
/// Rows successfully re-inserted into the active memtable.
pub rows_recovered: usize,
/// Largest row id seen in the replayed keys (0 if none).
pub max_row_id: u64,
}
/// Tuning parameters for an LSCS store.
#[derive(Debug, Clone)]
pub struct LscsConfig {
    /// Memtable payload budget in bytes before rotation.
    pub memtable_size: usize,
    /// Number of levels allocated for segments.
    pub num_levels: usize,
    /// Size ratio between adjacent levels.
    pub level_ratio: usize,
    /// Level-0 segment count that triggers a compaction after a flush.
    pub l0_compaction_threshold: usize,
    /// Temperature threshold separating hot from cold columns.
    pub hot_threshold: f64,
    /// Update events per temperature EMA window.
    pub temperature_window_size: u64,
}

impl Default for LscsConfig {
    /// 64 MiB memtables, 7 levels with ratio 10, compaction at 4 L0 segments,
    /// hot threshold 0.1, temperature window of 1000 events.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;
        LscsConfig {
            num_levels: 7,
            level_ratio: 10,
            memtable_size: 64 * MIB,
            l0_compaction_threshold: 4,
            temperature_window_size: 1_000,
            hot_threshold: 0.1,
        }
    }
}
/// Log-structured column store: a WAL-backed active memtable, rotated
/// immutable memtables awaiting flush, and per-level on-disk segments.
pub struct Lscs {
/// Tuning parameters supplied at construction.
config: LscsConfig,
/// Directory holding the WAL and segment files.
path: PathBuf,
/// Table schema shared by every memtable and segment.
schema: TableSchema,
/// Write-ahead log; every insert is committed here before the memtable.
wal: Arc<TxnWal>,
/// Memtable currently receiving writes.
active_memtable: RwLock<ColumnarMemtable>,
/// Rotated memtables, oldest first, waiting to be flushed.
immutable_memtables: RwLock<Vec<ColumnarMemtable>>,
/// Segments per level (index = level); within a level, oldest first.
column_groups: RwLock<Vec<Vec<ColumnGroup>>>,
/// Descriptors produced by compaction, keyed by segment id.
segment_descriptors: RwLock<HashMap<u64, SegmentDescriptor>>,
/// Per-column update temperatures used to pick hot vs. cold columns.
temperature_tracker: Arc<ColumnTemperatureTracker>,
/// Next sequence number for segment files / descriptors.
next_sequence: AtomicU64,
/// Next row id handed out by `insert` (starts at 1).
next_row_id: AtomicU64,
/// Accumulated compaction statistics.
compaction_stats: RwLock<CompactionStats>,
}
impl Lscs {
pub fn new(path: PathBuf, schema: TableSchema, config: LscsConfig) -> Result<Self> {
std::fs::create_dir_all(&path)?;
let wal_path = path.join("wal.log");
let wal = Arc::new(TxnWal::new(&wal_path)?);
let active_memtable = ColumnarMemtable::new(schema.clone(), config.memtable_size);
let mut column_groups = Vec::with_capacity(config.num_levels);
for _ in 0..config.num_levels {
column_groups.push(Vec::new());
}
let column_names: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
let temperature_tracker = Arc::new(ColumnTemperatureTracker::new(
&column_names,
config.temperature_window_size,
));
Ok(Self {
config,
path,
schema,
wal,
active_memtable: RwLock::new(active_memtable),
immutable_memtables: RwLock::new(Vec::new()),
column_groups: RwLock::new(column_groups),
segment_descriptors: RwLock::new(HashMap::new()),
temperature_tracker,
next_sequence: AtomicU64::new(0),
next_row_id: AtomicU64::new(1),
compaction_stats: RwLock::new(CompactionStats::default()),
})
}
/// Creates the store (see `new`) and replays its WAL into the active memtable,
/// logging a line to stderr when rows were restored.
///
/// # Errors
/// Propagates construction and recovery errors.
pub fn open(path: PathBuf, schema: TableSchema, config: LscsConfig) -> Result<Self> {
    let store = Self::new(path, schema, config)?;
    let recovery = store.recover()?;
    if recovery.rows_recovered == 0 {
        return Ok(store);
    }
    eprintln!(
        "LSCS Recovery: restored {} rows from {} transactions",
        recovery.rows_recovered, recovery.transactions_recovered
    );
    Ok(store)
}
/// Replays committed WAL writes into the active memtable.
///
/// Keys shorter than 8 bytes are skipped. The row id is the little-endian u64
/// in the first 8 key bytes. Rows whose payload fails to decode or insert are
/// skipped but still advance `max_row_id`, and `next_row_id` is moved past the
/// largest id seen so ids are never reused.
///
/// # Errors
/// Propagates WAL replay errors.
pub fn recover(&self) -> Result<LscsRecoveryStats> {
    let (writes, txn_count) = self.wal.replay_for_recovery()?;
    if writes.is_empty() {
        return Ok(LscsRecoveryStats::default());
    }
    let mut stats = LscsRecoveryStats {
        transactions_recovered: txn_count,
        ..Default::default()
    };
    for (key, value) in &writes {
        let Some(id_bytes) = key.get(..8) else {
            continue;
        };
        let row_id = u64::from_le_bytes(id_bytes.try_into().unwrap_or([0; 8]));
        stats.max_row_id = stats.max_row_id.max(row_id);
        let Ok(row_values) = Self::deserialize_row(value) else {
            continue;
        };
        let value_refs: Vec<Option<&[u8]>> = row_values.iter().map(|v| v.as_deref()).collect();
        if self.active_memtable.read().insert(row_id, &value_refs).is_ok() {
            stats.rows_recovered += 1;
        }
    }
    if stats.max_row_id > 0 {
        self.next_row_id.store(stats.max_row_id + 1, Ordering::SeqCst);
    }
    Ok(stats)
}
/// Syncs the store (see `fsync`) and then writes a `.clean_shutdown` marker
/// file containing `clean` into the store directory.
///
/// # Errors
/// Propagates sync/flush and file-write errors.
pub fn mark_clean_shutdown(&self) -> Result<()> {
    self.fsync()?;
    std::fs::write(self.path.join(".clean_shutdown"), b"clean")?;
    Ok(())
}
pub fn insert(&self, values: &[Option<&[u8]>]) -> Result<RowId> {
let row_id = self.next_row_id.fetch_add(1, Ordering::SeqCst);
let txn_id = self.wal.begin_transaction()?;
let key = row_id.to_le_bytes().to_vec();
let value = self.serialize_row(values)?;
self.wal.write(txn_id, key, value)?;
self.wal.commit_transaction(txn_id)?;
let memtable = self.active_memtable.read();
memtable.insert(row_id, values)?;
if memtable.is_full() {
drop(memtable);
self.rotate_memtable()?;
}
Ok(row_id)
}
/// Ends the visibility of `row_id` by writing a new version whose `__txn_end`
/// column is `txn_end` (little-endian u64).
///
/// The new version is committed to the WAL and inserted into the active
/// memtable, which is rotated if it becomes full. The update is also recorded
/// against `__txn_end` in the column temperature tracker. `_caller_txn_id` is
/// currently unused.
///
/// # Errors
/// `InvalidData` if the schema lacks `__txn_end`, `NotFound` if the row does
/// not exist, `Corruption` if the stored row has too few columns, plus WAL,
/// memtable and flush errors.
pub fn mark_deleted(&self, row_id: RowId, _caller_txn_id: u64, txn_end: u64) -> Result<()> {
    let txn_end_idx = self
        .schema
        .columns
        .iter()
        .position(|c| c.name == "__txn_end")
        .ok_or_else(|| {
            SochDBError::InvalidData("Schema missing __txn_end column for MVCC".to_string())
        })?;
    let mut new_values = self
        .get(row_id)?
        .ok_or_else(|| SochDBError::NotFound(format!("Row {} not found", row_id)))?;
    let stored_columns = new_values.len();
    // Guard the index: a stored row shorter than the schema would otherwise panic here.
    let slot = new_values.get_mut(txn_end_idx).ok_or_else(|| {
        SochDBError::Corruption(format!(
            "Row {} has {} columns, expected at least {}",
            row_id,
            stored_columns,
            txn_end_idx + 1
        ))
    })?;
    *slot = Some(txn_end.to_le_bytes().to_vec());
    let value_refs: Vec<Option<&[u8]>> = new_values.iter().map(|v| v.as_deref()).collect();

    let wal_txn_id = self.wal.begin_transaction()?;
    let row_data = self.serialize_row(&value_refs)?;
    self.wal.write(wal_txn_id, row_id.to_le_bytes().to_vec(), row_data)?;
    self.wal.commit_transaction(wal_txn_id)?;

    let memtable = self.active_memtable.read();
    memtable.insert(row_id, &value_refs)?;
    let needs_rotation = memtable.is_full();
    drop(memtable);

    // Only the MVCC end column changed; feed that to the temperature tracker so
    // hot/cold classification reflects real update traffic.
    self.temperature_tracker.record_updates(&["__txn_end"]);
    // Mirror `insert`: never leave an over-full active memtable behind.
    if needs_rotation {
        self.rotate_memtable()?;
    }
    Ok(())
}
/// Encodes a row for the WAL.
///
/// Format: `count: u32` (little-endian), then per value either `1u8, len: u32,
/// bytes` or `0u8` for null.
fn serialize_row(&self, values: &[Option<&[u8]>]) -> Result<Vec<u8>> {
    use byteorder::{LittleEndian, WriteBytesExt};

    let encoded_len = 4 + values
        .iter()
        .map(|cell| cell.map_or(1, |bytes| 5 + bytes.len()))
        .sum::<usize>();
    let mut out = Vec::with_capacity(encoded_len);
    out.write_u32::<LittleEndian>(values.len() as u32)?;
    for cell in values {
        if let Some(bytes) = cell {
            out.write_u8(1)?;
            out.write_u32::<LittleEndian>(bytes.len() as u32)?;
            out.extend_from_slice(bytes);
        } else {
            out.write_u8(0)?;
        }
    }
    Ok(out)
}
#[allow(dead_code)]
fn deserialize_row(data: &[u8]) -> Result<Vec<Option<Vec<u8>>>> {
use byteorder::{LittleEndian, ReadBytesExt};
use std::io::Cursor;
let mut cursor = Cursor::new(data);
let num_cols = cursor.read_u32::<LittleEndian>()? as usize;
let mut values = Vec::with_capacity(num_cols);
for _ in 0..num_cols {
let is_non_null = cursor.read_u8()? == 1;
if is_non_null {
let len = cursor.read_u32::<LittleEndian>()? as usize;
let pos = cursor.position() as usize;
let value = data[pos..pos + len].to_vec();
cursor.set_position((pos + len) as u64);
values.push(Some(value));
} else {
values.push(None);
}
}
Ok(values)
}
/// Looks up the newest visible version of `row_id`.
///
/// Search order:
/// 1. The active memtable.
/// 2. Immutable memtables, newest first.
/// 3. Segments level by level from level 0, newest first within a level. A
///    segment is skipped when its learned index reports `NotFound`.
///
/// # Errors
/// Propagates segment read errors.
pub fn get(&self, row_id: RowId) -> Result<Option<Vec<Option<Vec<u8>>>>> {
    use sochdb_core::learned_index::LookupResult;

    if let Some(row) = self.active_memtable.read().get(row_id) {
        return Ok(Some(row));
    }

    for memtable in self.immutable_memtables.read().iter().rev() {
        if let Some(row) = memtable.get(row_id) {
            return Ok(Some(row));
        }
    }

    let groups = self.column_groups.read();
    for group in groups.iter().flat_map(|level| level.iter().rev()) {
        let Some(lsi) = &group.lsi else {
            continue;
        };
        if matches!(lsi.lookup(row_id), LookupResult::NotFound) {
            continue;
        }
        if let Some(row) = self.read_row_from_sstable(group, row_id)? {
            return Ok(Some(row));
        }
    }
    Ok(None)
}
fn read_row_from_sstable(
&self,
group: &ColumnGroup,
row_id: RowId,
) -> Result<Option<Vec<Option<Vec<u8>>>>> {
use byteorder::{LittleEndian, ReadBytesExt};
use std::fs::File;
use std::io::{BufReader, Read, Seek, SeekFrom};
let file = File::open(group.file_path())?;
let mut reader = BufReader::new(file);
let mut values = Vec::new();
for (col_name, col_idx) in &group.column_offsets {
reader.seek(SeekFrom::Start(col_idx.offset))?;
let col_type = reader.read_u8()?;
let row_count = reader.read_u64::<LittleEndian>()?;
if row_id >= row_count {
values.push(None);
continue;
}
let nulls_len = reader.read_u32::<LittleEndian>()? as usize;
let mut nulls = vec![0u8; nulls_len];
reader.read_exact(&mut nulls)?;
let byte_idx = (row_id / 8) as usize;
let bit_offset = (row_id % 8) as u8;
let is_null = byte_idx >= nulls.len() || (nulls[byte_idx] & (1 << bit_offset)) == 0;
if is_null {
values.push(None);
continue;
}
let col_type = ColumnType::from_byte(col_type).unwrap_or(ColumnType::Binary);
if let Some(fixed_size) = col_type.fixed_size() {
let offsets_section = reader.stream_position()?;
let data_len = reader.read_u32::<LittleEndian>()? as usize;
let _ = data_len;
let row_offset = (row_id as usize) * fixed_size;
reader.seek(SeekFrom::Start(offsets_section + 4 + row_offset as u64))?;
let mut value = vec![0u8; fixed_size];
reader.read_exact(&mut value)?;
values.push(Some(value));
} else {
let offsets_count = reader.read_u32::<LittleEndian>()? as usize;
let mut offsets = vec![0u32; offsets_count];
for offset in offsets.iter_mut().take(offsets_count) {
*offset = reader.read_u32::<LittleEndian>()?;
}
if (row_id as usize + 1) >= offsets.len() {
values.push(None);
continue;
}
let start = offsets[row_id as usize] as usize;
let end = offsets[(row_id + 1) as usize] as usize;
let data_len = reader.read_u32::<LittleEndian>()? as usize;
let data_start = reader.stream_position()?;
if end <= data_len {
reader.seek(SeekFrom::Start(data_start + start as u64))?;
let mut value = vec![0u8; end - start];
reader.read_exact(&mut value)?;
values.push(Some(value));
} else {
values.push(None);
}
}
let _ = col_name; }
if values.is_empty() {
Ok(None)
} else {
Ok(Some(values))
}
}
/// Syncs the WAL. If the active memtable's buffers exceed half of the
/// configured memtable size, it is also rotated and flushed.
///
/// # Errors
/// Propagates WAL sync, rotation and flush errors.
pub fn fsync(&self) -> Result<()> {
    self.wal.sync()?;
    let flush_threshold = self.config.memtable_size / 2;
    let over_half = self.active_memtable.read().memory_bytes() > flush_threshold;
    if over_half {
        self.rotate_memtable()?;
        self.flush()?;
    }
    Ok(())
}
/// Swaps in a fresh active memtable and queues the old one as immutable.
///
/// Once two or more memtables are queued, they are flushed. The queue lock is
/// released first because `flush` re-acquires it.
fn rotate_memtable(&self) -> Result<()> {
    let fresh = ColumnarMemtable::new(self.schema.clone(), self.config.memtable_size);
    let retired = std::mem::replace(&mut *self.active_memtable.write(), fresh);
    let queued = {
        let mut immutable = self.immutable_memtables.write();
        immutable.push(retired);
        immutable.len()
    };
    if queued >= 2 {
        self.flush()?;
    }
    Ok(())
}
pub fn flush(&self) -> Result<()> {
let memtables = {
let mut immutable = self.immutable_memtables.write();
std::mem::take(&mut *immutable)
};
for memtable in memtables {
let sequence = self.next_sequence.fetch_add(1, Ordering::SeqCst);
let column_group = ColumnGroup::from_memtable(&self.path, &memtable, 0, sequence)?;
let mut groups = self.column_groups.write();
groups[0].push(column_group);
}
let groups = self.column_groups.read();
if groups[0].len() >= self.config.l0_compaction_threshold {
drop(groups);
self.compact_l0()?;
}
Ok(())
}
fn compact_l0(&self) -> Result<()> {
let start_time = std::time::Instant::now();
let hot_columns = self.temperature_tracker.get_hot_columns();
let cold_columns = self.temperature_tracker.get_cold_columns();
let total_columns = self.schema.columns.len();
let hot_fraction = if total_columns > 0 {
hot_columns.len() as f64 / total_columns as f64
} else {
1.0
};
let l0_segments: Vec<ColumnGroup> = {
let mut groups = self.column_groups.write();
std::mem::take(&mut groups[0])
};
if l0_segments.is_empty() {
return Ok(());
}
let mut bytes_read = 0u64;
let mut bytes_written = 0u64;
let mut cold_refs_preserved = 0u64;
let sequence = self.next_sequence.fetch_add(1, Ordering::SeqCst);
let _merged_path = self.path.join(format!("L1_seq{}.sst", sequence));
let mut col_refs = HashMap::new();
let mut total_row_count = 0u64;
let min_row_id = u64::MAX;
let max_row_id = 0u64;
for segment in &l0_segments {
bytes_read += segment.row_count * 100; total_row_count += segment.row_count;
for col_name in &hot_columns {
if let Some(col_idx) = segment.column_offsets.get(col_name) {
bytes_read += col_idx.length;
bytes_written += col_idx.length;
}
}
for col_name in &cold_columns {
if let Some(col_idx) = segment.column_offsets.get(col_name) {
let stripe_ref = ColumnStripeRef::new(
segment.level,
segment.sequence,
col_name.clone(),
col_idx.offset,
col_idx.length,
segment.row_count,
);
col_refs.insert(col_name.clone(), stripe_ref);
cold_refs_preserved += 1;
}
}
}
let segment_desc = SegmentDescriptor {
id: sequence,
level: 1,
col_refs,
min_row_id,
max_row_id,
row_count: total_row_count,
min_timestamp: 0,
max_timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_micros() as u64,
is_tombstone: false,
};
{
let mut descriptors = self.segment_descriptors.write();
descriptors.insert(sequence, segment_desc);
}
{
let mut stats = self.compaction_stats.write();
stats.compactions_total += 1;
stats.l0_compactions += 1;
stats.bytes_read += bytes_read;
stats.bytes_written += bytes_written;
stats.cold_column_refs_preserved += cold_refs_preserved;
stats.hot_column_compactions += 1;
stats.estimated_wa_reduction = 1.0 / hot_fraction.max(0.01);
stats.last_compaction_duration_us = start_time.elapsed().as_micros() as u64;
}
for segment in l0_segments {
let _ = segment; }
Ok(())
}
/// Writes a merged hot-column file at `output_path` and returns one level-1
/// stripe reference per hot column.
///
/// NOTE(review): this is a stub. For each hot column it only writes the summed
/// row count of the segments containing that column (a `u64`). No column data
/// is copied from `segments`. The references use the current (not yet
/// allocated) `next_sequence` value as their segment id.
#[allow(dead_code)]
fn selective_merge_hot_columns(
    &self,
    segments: &[&ColumnGroup],
    hot_columns: &HashSet<String>,
    output_path: &Path,
) -> Result<HashMap<String, ColumnStripeRef>> {
    use byteorder::{LittleEndian, WriteBytesExt};
    use std::fs::File;
    use std::io::{BufWriter, Seek, Write};

    let mut out = BufWriter::new(File::create(output_path)?);
    out.write_all(&ColumnGroup::MAGIC)?;
    out.write_u32::<LittleEndian>(ColumnGroup::VERSION)?;
    let sequence = self.next_sequence.load(Ordering::SeqCst);

    let mut stripes = HashMap::new();
    for col_name in hot_columns {
        let row_count: u64 = segments
            .iter()
            .filter(|seg| seg.column_offsets.contains_key(col_name))
            .map(|seg| seg.row_count)
            .sum();
        let start = out.stream_position()?;
        out.write_u64::<LittleEndian>(row_count)?;
        let end = out.stream_position()?;
        stripes.insert(
            col_name.clone(),
            ColumnStripeRef::new(1, sequence, col_name.clone(), start, end - start, row_count),
        );
    }
    out.flush()?;
    Ok(stripes)
}
pub fn scan_columns(
&self,
column_names: &[&str],
row_range: Option<(RowId, RowId)>,
) -> Result<Vec<Vec<u8>>> {
let mut results = Vec::new();
let descriptors = self.segment_descriptors.read();
for (_seg_id, descriptor) in descriptors.iter() {
if let Some((min, max)) = row_range
&& (descriptor.max_row_id < min || descriptor.min_row_id > max)
{
continue;
}
for col_name in column_names {
if let Some(stripe_ref) = descriptor.col_refs.get(*col_name) {
let data = self.read_column_stripe(stripe_ref)?;
results.push(data);
}
}
}
Ok(results)
}
/// Reads the `length` raw bytes at `offset` from segment file
/// `L{level}_seq{segment_id}.sst` in the store directory.
///
/// # Errors
/// Propagates open/seek/read errors (including a short file).
fn read_column_stripe(&self, stripe_ref: &ColumnStripeRef) -> Result<Vec<u8>> {
    use std::fs::File;
    use std::io::{Read, Seek, SeekFrom};

    let segment_file = format!("L{}_seq{}.sst", stripe_ref.level, stripe_ref.segment_id);
    let mut file = File::open(self.path.join(segment_file))?;
    file.seek(SeekFrom::Start(stripe_ref.offset))?;
    let mut buf = vec![0u8; stripe_ref.length as usize];
    file.read_exact(&mut buf)?;
    Ok(buf)
}
/// Returns a snapshot copy of the accumulated compaction statistics.
pub fn compaction_stats(&self) -> CompactionStats {
self.compaction_stats.read().clone()
}
/// Runs a level-0 compaction now (delegates to `compact_l0`).
pub fn compact(&self) -> Result<()> {
self.compact_l0()
}
/// Returns a copy of every column's current temperature record.
pub fn column_temperatures(&self) -> Vec<ColumnTemperature> {
self.temperature_tracker.get_all_temperatures()
}
#[allow(clippy::type_complexity)]
pub fn scan_range(
&self,
start: RowId,
end: RowId,
) -> Result<Vec<(RowId, Vec<Option<Vec<u8>>>)>> {
let mut results = Vec::new();
let mut seen = std::collections::HashSet::new();
{
let memtable = self.active_memtable.read();
for (row_id, values) in memtable.scan_range(start, end) {
if seen.insert(row_id) {
results.push((row_id, values));
}
}
}
{
let immutable = self.immutable_memtables.read();
for memtable in immutable.iter().rev() {
for (row_id, values) in memtable.scan_range(start, end) {
if seen.insert(row_id) {
results.push((row_id, values));
}
}
}
}
results.sort_by_key(|(id, _)| *id);
Ok(results)
}
#[allow(clippy::type_complexity)]
pub fn scan_columns_range(
&self,
start: RowId,
end: RowId,
col_indices: &[usize],
) -> Result<Vec<(RowId, Vec<Option<Vec<u8>>>)>> {
let mut results = Vec::new();
let mut seen = std::collections::HashSet::new();
{
let memtable = self.active_memtable.read();
let row_ids = memtable.row_ids.read();
for (&row_id, _) in row_ids.range(start..=end) {
if seen.insert(row_id)
&& let Some(values) = memtable.get_columns(row_id, col_indices)
{
results.push((row_id, values));
}
}
}
{
let immutable = self.immutable_memtables.read();
for memtable in immutable.iter().rev() {
let row_ids = memtable.row_ids.read();
for (&row_id, _) in row_ids.range(start..=end) {
if seen.insert(row_id)
&& let Some(values) = memtable.get_columns(row_id, col_indices)
{
results.push((row_id, values));
}
}
}
}
results.sort_by_key(|(id, _)| *id);
Ok(results)
}
/// Snapshot of memtable usage, per-level row counts and on-disk size.
///
/// Segment files whose metadata cannot be read contribute 0 bytes.
pub fn stats(&self) -> LscsStats {
    let active = self.active_memtable.read();
    let immutable = self.immutable_memtables.read();
    let groups = self.column_groups.read();

    let mut level_row_counts = vec![0u64; self.config.num_levels];
    let mut disk_bytes = 0u64;
    let every_group = groups
        .iter()
        .enumerate()
        .flat_map(|(level_idx, level)| level.iter().map(move |group| (level_idx, group)));
    for (level_idx, group) in every_group {
        level_row_counts[level_idx] += group.row_count;
        disk_bytes += std::fs::metadata(&group.path).map_or(0, |meta| meta.len());
    }

    LscsStats {
        active_memtable_bytes: active.memory_bytes(),
        immutable_memtables: immutable.len(),
        level_row_counts,
        next_row_id: self.next_row_id.load(Ordering::SeqCst),
        disk_bytes,
    }
}
pub fn wal(&self) -> &Arc<TxnWal> {
&self.wal
}
}
/// Summary counters for an LSCS store, as produced by `Lscs::stats`.
///
/// This is plain data. `Default` yields an all-zero value with no levels, and
/// equality compares every field.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct LscsStats {
    /// Memory footprint of the active memtable, as reported by its `memory_bytes()`.
    pub active_memtable_bytes: usize,
    /// Number of immutable memtables currently held in memory.
    pub immutable_memtables: usize,
    /// Total row count of the on-disk column groups, indexed by level.
    pub level_row_counts: Vec<u64>,
    /// Value of the store's row-id counter at the time the stats were taken.
    pub next_row_id: u64,
    /// Sum of the on-disk file sizes of the column groups whose metadata was readable.
    pub disk_bytes: u64,
}
// Unit tests for the columnar memtable, column-group files and the Lscs store.
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
// Three-column "users" schema shared by most tests:
// id (UInt64, NOT NULL), name (Text, NOT NULL), score (Float64, nullable).
fn test_schema() -> TableSchema {
TableSchema {
name: "users".to_string(),
columns: vec![
ColumnDef {
name: "id".to_string(),
col_type: ColumnType::UInt64,
nullable: false,
},
ColumnDef {
name: "name".to_string(),
col_type: ColumnType::Text,
nullable: false,
},
ColumnDef {
name: "score".to_string(),
col_type: ColumnType::Float64,
nullable: true,
},
],
}
}
// A single fully-populated row is accepted and counted.
#[test]
fn test_columnar_memtable_insert() {
let schema = test_schema();
let memtable = ColumnarMemtable::new(schema, 1024 * 1024);
let id: u64 = 1;
let name = "Alice";
let score: f64 = 95.5;
memtable
.insert(
1,
&[
Some(&id.to_le_bytes()),
Some(name.as_bytes()),
Some(&score.to_le_bytes()),
],
)
.unwrap();
assert_eq!(memtable.row_count(), 1);
}
// A None value for the nullable `score` column is accepted and the row still counts.
#[test]
fn test_columnar_memtable_with_nulls() {
let schema = test_schema();
let memtable = ColumnarMemtable::new(schema, 1024 * 1024);
let id: u64 = 1;
let name = "Bob";
memtable
.insert(1, &[Some(&id.to_le_bytes()), Some(name.as_bytes()), None])
.unwrap();
assert_eq!(memtable.row_count(), 1);
}
// The first insert into a fresh store is assigned row id 1, and the active
// memtable reports a non-zero footprint afterwards.
#[test]
fn test_lscs_basic() {
let dir = tempdir().unwrap();
let schema = test_schema();
let config = LscsConfig {
memtable_size: 1024,
..Default::default()
};
let lscs = Lscs::new(dir.path().to_path_buf(), schema, config).unwrap();
let id: u64 = 1;
let name = "Charlie";
let score: f64 = 87.2;
let row_id = lscs
.insert(&[
Some(&id.to_le_bytes()),
Some(name.as_bytes()),
Some(&score.to_le_bytes()),
])
.unwrap();
assert_eq!(row_id, 1);
let stats = lscs.stats();
assert!(stats.active_memtable_bytes > 0);
}
// Round-trips a memtable through a column-group file: the file is written
// with an ".sst" extension, reopens with one column offset per column (5 here,
// so `with_mvcc()` adds two columns to the three user columns), and carries a
// non-empty LSI.
// NOTE(review): the two extra u64 values per row are presumably the MVCC
// columns (e.g. begin/end transaction ids), and the trailing `0, 1` arguments
// presumably select level 0 / segment 1 — confirm against ColumnGroup.
#[test]
fn test_column_group_write() {
let dir = tempfile::tempdir().unwrap();
let schema = TableSchema::new(
"users".to_string(),
vec![
ColumnDef {
name: "id".to_string(),
col_type: ColumnType::UInt64,
nullable: false,
},
ColumnDef {
name: "name".to_string(),
col_type: ColumnType::Text,
nullable: false,
},
ColumnDef {
name: "score".to_string(),
col_type: ColumnType::Float64,
nullable: true,
},
],
)
.with_mvcc();
let memtable = ColumnarMemtable::new(schema.clone(), 1024 * 1024);
memtable
.insert(
1,
&[
Some(&1u64.to_le_bytes()), Some(b"Alice"), Some(&95.5f64.to_le_bytes()), Some(&100u64.to_le_bytes()), Some(&0u64.to_le_bytes()), ],
)
.unwrap();
memtable
.insert(
2,
&[
Some(&2u64.to_le_bytes()), Some(b"Bob"), Some(&87.2f64.to_le_bytes()), Some(&100u64.to_le_bytes()), Some(&200u64.to_le_bytes()), ],
)
.unwrap();
let cg = ColumnGroup::from_memtable(dir.path(), &memtable, 0, 1).unwrap();
let file_path = cg.file_path();
assert!(file_path.exists());
assert!(file_path.extension().unwrap() == "sst");
let cg_opened = ColumnGroup::open(file_path.to_path_buf(), schema, 0, 1).unwrap();
assert_eq!(cg_opened.column_offsets.len(), 5);
assert!(cg_opened.lsi.is_some());
let lsi = cg_opened.lsi.as_ref().unwrap();
assert!(lsi.stats().num_keys > 0);
}
// Point lookups: values come back in schema order, a stored None stays None,
// and an unknown row id yields None.
#[test]
fn test_memtable_get() {
let schema = test_schema();
let memtable = ColumnarMemtable::new(schema, 1024 * 1024);
let id1: u64 = 1;
let name1 = "Alice";
let score1: f64 = 95.5;
memtable
.insert(
1,
&[
Some(&id1.to_le_bytes()),
Some(name1.as_bytes()),
Some(&score1.to_le_bytes()),
],
)
.unwrap();
let id2: u64 = 2;
let name2 = "Bob";
memtable
.insert(
2,
&[
Some(&id2.to_le_bytes()),
Some(name2.as_bytes()),
None, ],
)
.unwrap();
let row1 = memtable.get(1).unwrap();
assert_eq!(row1.len(), 3);
assert_eq!(
u64::from_le_bytes(row1[0].as_ref().unwrap()[..].try_into().unwrap()),
1
);
assert_eq!(
std::str::from_utf8(row1[1].as_ref().unwrap()).unwrap(),
"Alice"
);
let row2 = memtable.get(2).unwrap();
assert!(row2[2].is_none());
assert!(memtable.get(999).is_none());
}
// Memtable range scans are inclusive at both ends: ids 3..=7 give 5 rows.
#[test]
fn test_memtable_scan_range() {
let schema = test_schema();
let memtable = ColumnarMemtable::new(schema, 1024 * 1024);
for i in 1..=10 {
memtable
.insert(
i,
&[
Some(&i.to_le_bytes()),
Some(format!("User{}", i).as_bytes()),
Some(&((i as f64) * 10.0).to_le_bytes()),
],
)
.unwrap();
}
let results = memtable.scan_range(3, 7);
assert_eq!(results.len(), 5);
for (row_id, _) in &results {
assert!(*row_id >= 3 && *row_id <= 7);
}
}
// Store-level insert followed by get returns the inserted values.
// NOTE(review): the 64 MiB memtable is presumably chosen so no flush happens
// and the row is served from memory — confirm.
#[test]
fn test_lscs_get() {
let dir = tempdir().unwrap();
let schema = test_schema();
let config = LscsConfig {
memtable_size: 64 * 1024 * 1024,
..Default::default()
};
let lscs = Lscs::new(dir.path().to_path_buf(), schema, config).unwrap();
let id: u64 = 42;
let name = "TestUser";
let score: f64 = 99.9;
let row_id = lscs
.insert(&[
Some(&id.to_le_bytes()),
Some(name.as_bytes()),
Some(&score.to_le_bytes()),
])
.unwrap();
let result = lscs.get(row_id).unwrap();
assert!(result.is_some());
let values = result.unwrap();
assert_eq!(
u64::from_le_bytes(values[0].as_ref().unwrap()[..].try_into().unwrap()),
42
);
assert_eq!(
std::str::from_utf8(values[1].as_ref().unwrap()).unwrap(),
"TestUser"
);
}
// fsync succeeds after several inserts and the first row (id 1) is still
// readable afterwards. This does not reopen the store, so it does not test
// crash recovery.
#[test]
fn test_lscs_fsync() {
let dir = tempdir().unwrap();
let schema = test_schema();
let config = LscsConfig::default();
let lscs = Lscs::new(dir.path().to_path_buf(), schema, config).unwrap();
for i in 1..=5 {
lscs.insert(&[
Some(&(i as u64).to_le_bytes()),
Some(format!("User{}", i).as_bytes()),
Some(&((i as f64) * 10.0).to_le_bytes()),
])
.unwrap();
}
lscs.fsync().unwrap();
let result = lscs.get(1).unwrap();
assert!(result.is_some());
}
}