1use parking_lot::RwLock;
65use serde::{Deserialize, Serialize};
66use std::collections::{BTreeMap, HashMap, HashSet};
67use std::path::{Path, PathBuf};
68use std::sync::Arc;
69use std::sync::atomic::{AtomicU64, Ordering};
70use sochdb_core::{Result, SochDBError};
71
72use crate::txn_wal::TxnWal;
73
74#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct ColumnTemperature {
84 pub name: String,
86 pub temperature: f64,
88 pub window_updates: u64,
90 pub total_updates: u64,
92 pub last_update_us: u64,
94}
95
96impl ColumnTemperature {
97 pub fn new(name: String) -> Self {
99 Self {
100 name,
101 temperature: 0.0,
102 window_updates: 0,
103 total_updates: 0,
104 last_update_us: 0,
105 }
106 }
107
108 pub fn record_update(&mut self) {
110 self.window_updates += 1;
111 self.total_updates += 1;
112 self.last_update_us = std::time::SystemTime::now()
113 .duration_since(std::time::UNIX_EPOCH)
114 .unwrap()
115 .as_micros() as u64;
116 }
117
118 pub fn update_ema(&mut self, total_window_updates: u64) {
123 const ALPHA: f64 = 0.1;
124
125 let temp_current = if total_window_updates > 0 {
126 self.window_updates as f64 / total_window_updates as f64
127 } else {
128 0.0
129 };
130
131 self.temperature = ALPHA * temp_current + (1.0 - ALPHA) * self.temperature;
132 self.window_updates = 0;
133 }
134
135 pub fn is_hot(&self, threshold: f64) -> bool {
137 self.temperature > threshold
138 }
139}
140
141#[derive(Debug)]
143pub struct ColumnTemperatureTracker {
144 columns: RwLock<HashMap<String, ColumnTemperature>>,
146 window_size: u64,
148 window_updates: AtomicU64,
150 hot_threshold: AtomicU64,
152}
153
154impl ColumnTemperatureTracker {
155 pub fn new(column_names: &[String], window_size: u64) -> Self {
157 let mut columns = HashMap::new();
158 for name in column_names {
159 columns.insert(name.clone(), ColumnTemperature::new(name.clone()));
160 }
161 Self {
162 columns: RwLock::new(columns),
163 window_size,
164 window_updates: AtomicU64::new(0),
165 hot_threshold: AtomicU64::new(0.1_f64.to_bits()),
166 }
167 }
168
169 fn threshold(&self) -> f64 {
171 f64::from_bits(self.hot_threshold.load(Ordering::Relaxed))
172 }
173
174 pub fn record_updates(&self, column_names: &[&str]) {
176 let mut cols = self.columns.write();
177 for name in column_names {
178 if let Some(temp) = cols.get_mut(*name) {
179 temp.record_update();
180 }
181 }
182
183 let total = self.window_updates.fetch_add(1, Ordering::SeqCst) + 1;
184
185 if total >= self.window_size {
187 self.update_all_ema(&mut cols, total);
188 self.window_updates.store(0, Ordering::SeqCst);
189 }
190 }
191
192 fn update_all_ema(&self, cols: &mut HashMap<String, ColumnTemperature>, total: u64) {
193 for temp in cols.values_mut() {
194 temp.update_ema(total);
195 }
196 }
197
198 pub fn get_hot_columns(&self) -> HashSet<String> {
200 let threshold = self.threshold();
201 let cols = self.columns.read();
202 cols.values()
203 .filter(|t| t.is_hot(threshold))
204 .map(|t| t.name.clone())
205 .collect()
206 }
207
208 pub fn get_cold_columns(&self) -> HashSet<String> {
210 let threshold = self.threshold();
211 let cols = self.columns.read();
212 cols.values()
213 .filter(|t| !t.is_hot(threshold))
214 .map(|t| t.name.clone())
215 .collect()
216 }
217
218 pub fn get_all_temperatures(&self) -> Vec<ColumnTemperature> {
220 self.columns.read().values().cloned().collect()
221 }
222
223 pub fn set_hot_threshold(&self, threshold: f64) {
228 let clamped = threshold.clamp(0.0, 1.0);
229 self.hot_threshold.store(clamped.to_bits(), Ordering::Relaxed);
230 }
231}
232
233#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
243pub struct ColumnStripeRef {
244 pub level: u32,
246 pub segment_id: u64,
248 pub column_name: String,
250 pub offset: u64,
252 pub length: u64,
254 pub row_count: u64,
256 pub compression: u8,
258}
259
260impl ColumnStripeRef {
261 pub fn new(
263 level: u32,
264 segment_id: u64,
265 column_name: String,
266 offset: u64,
267 length: u64,
268 row_count: u64,
269 ) -> Self {
270 Self {
271 level,
272 segment_id,
273 column_name,
274 offset,
275 length,
276 row_count,
277 compression: 0,
278 }
279 }
280
281 pub fn relocate(&self, new_level: u32, new_segment_id: u64, new_offset: u64) -> Self {
283 Self {
284 level: new_level,
285 segment_id: new_segment_id,
286 column_name: self.column_name.clone(),
287 offset: new_offset,
288 length: self.length,
289 row_count: self.row_count,
290 compression: self.compression,
291 }
292 }
293}
294
295#[derive(Debug, Clone, Serialize, Deserialize)]
300pub struct SegmentDescriptor {
301 pub id: u64,
303 pub level: u32,
305 pub col_refs: HashMap<String, ColumnStripeRef>,
307 pub min_row_id: RowId,
309 pub max_row_id: RowId,
311 pub row_count: u64,
313 pub min_timestamp: u64,
315 pub max_timestamp: u64,
317 pub is_tombstone: bool,
319}
320
/// Numeric identifier of a column.
pub type ColumnId = u32;

/// Numeric identifier of a row.
pub type RowId = u64;
326
/// Storage type of a column. The discriminant is the one-byte tag used when
/// the type is cast with `as u8` and decoded with [`ColumnType::from_byte`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum ColumnType {
    Bool = 0,
    Int64 = 1,
    UInt64 = 2,
    Float64 = 3,
    Text = 4,
    Binary = 5,
    Timestamp = 6,
}

impl ColumnType {
    /// Width in bytes of one value, or `None` for the variable-length types
    /// (`Text` and `Binary`).
    pub fn fixed_size(&self) -> Option<usize> {
        let width = match *self {
            ColumnType::Text | ColumnType::Binary => return None,
            ColumnType::Bool => 1,
            ColumnType::Int64
            | ColumnType::UInt64
            | ColumnType::Float64
            | ColumnType::Timestamp => 8,
        };
        Some(width)
    }

    /// Decodes a type tag; returns `None` for bytes that match no variant.
    pub fn from_byte(b: u8) -> Option<Self> {
        // Indexed by discriminant, so position `i` must hold the variant whose tag is `i`.
        const BY_TAG: [ColumnType; 7] = [
            ColumnType::Bool,
            ColumnType::Int64,
            ColumnType::UInt64,
            ColumnType::Float64,
            ColumnType::Text,
            ColumnType::Binary,
            ColumnType::Timestamp,
        ];
        BY_TAG.get(usize::from(b)).copied()
    }
}
367
368#[derive(Debug, Clone)]
370pub struct TableSchema {
371 pub name: String,
373 pub columns: Vec<ColumnDef>,
375}
376
377impl TableSchema {
378 pub fn new(name: String, columns: Vec<ColumnDef>) -> Self {
379 Self { name, columns }
380 }
381
382 pub fn with_mvcc(mut self) -> Self {
384 if !self.columns.iter().any(|c| c.name == "__txn_start") {
385 self.columns.push(ColumnDef {
386 name: "__txn_start".to_string(),
387 col_type: ColumnType::UInt64,
388 nullable: false,
389 });
390 }
391 if !self.columns.iter().any(|c| c.name == "__txn_end") {
392 self.columns.push(ColumnDef {
393 name: "__txn_end".to_string(),
394 col_type: ColumnType::UInt64,
395 nullable: false, });
397 }
398 self
399 }
400}
401
402#[derive(Debug, Clone)]
404pub struct ColumnDef {
405 pub name: String,
407 pub col_type: ColumnType,
409 pub nullable: bool,
411}
412
413#[derive(Debug)]
415struct ColumnBuffer {
416 col_type: ColumnType,
418 data: Vec<u8>,
420 nulls: Vec<u8>,
422 offsets: Option<Vec<u32>>,
424 row_count: u64,
426}
427
428impl ColumnBuffer {
429 fn new(col_type: ColumnType) -> Self {
430 Self {
431 col_type,
432 data: Vec::new(),
433 nulls: Vec::new(),
434 offsets: if col_type.fixed_size().is_none() {
435 Some(vec![0]) } else {
437 None
438 },
439 row_count: 0,
440 }
441 }
442
443 fn append(&mut self, value: Option<&[u8]>) {
445 let bit_idx = self.row_count as usize;
447 let byte_idx = bit_idx / 8;
448 let bit_offset = bit_idx % 8;
449
450 while self.nulls.len() <= byte_idx {
451 self.nulls.push(0);
452 }
453
454 if let Some(data) = value {
455 self.nulls[byte_idx] |= 1 << bit_offset;
457
458 self.data.extend_from_slice(data);
460
461 if let Some(offsets) = &mut self.offsets {
463 offsets.push(self.data.len() as u32);
464 }
465 } else if let Some(offsets) = &mut self.offsets {
466 let last = *offsets.last().unwrap();
468 offsets.push(last);
469 }
470
471 self.row_count += 1;
472 }
473
474 fn is_null(&self, row_idx: u64) -> bool {
476 if row_idx >= self.row_count {
477 return true; }
479 let byte_idx = (row_idx / 8) as usize;
480 let bit_offset = (row_idx % 8) as u8;
481
482 if byte_idx >= self.nulls.len() {
483 return true;
484 }
485
486 (self.nulls[byte_idx] & (1 << bit_offset)) == 0
487 }
488
489 fn get(&self, row_idx: u64) -> Option<Vec<u8>> {
492 if row_idx >= self.row_count || self.is_null(row_idx) {
493 return None;
494 }
495
496 if let Some(fixed_size) = self.col_type.fixed_size() {
497 let start = (row_idx as usize) * fixed_size;
499 let end = start + fixed_size;
500 if end <= self.data.len() {
501 Some(self.data[start..end].to_vec())
502 } else {
503 None
504 }
505 } else {
506 if let Some(offsets) = &self.offsets {
508 let start = offsets[row_idx as usize] as usize;
509 let end = offsets[(row_idx + 1) as usize] as usize;
510 if end <= self.data.len() {
511 Some(self.data[start..end].to_vec())
512 } else {
513 None
514 }
515 } else {
516 None
517 }
518 }
519 }
520
521 fn memory_bytes(&self) -> usize {
523 self.data.len() + self.nulls.len() + self.offsets.as_ref().map(|o| o.len() * 4).unwrap_or(0)
524 }
525}
526
527#[derive(Debug)]
529pub struct ColumnarMemtable {
530 schema: TableSchema,
532 columns: Vec<RwLock<ColumnBuffer>>,
534 row_ids: RwLock<BTreeMap<RowId, u64>>,
536 row_idx_to_id: RwLock<Vec<RowId>>,
538 next_row_idx: AtomicU64,
540 bytes_written: AtomicU64,
542 size_limit: usize,
544}
545
546impl ColumnarMemtable {
547 pub fn new(schema: TableSchema, size_limit: usize) -> Self {
549 let columns = schema
550 .columns
551 .iter()
552 .map(|def| RwLock::new(ColumnBuffer::new(def.col_type)))
553 .collect();
554
555 Self {
556 schema,
557 columns,
558 row_ids: RwLock::new(BTreeMap::new()),
559 row_idx_to_id: RwLock::new(Vec::new()),
560 next_row_idx: AtomicU64::new(0),
561 bytes_written: AtomicU64::new(0),
562 size_limit,
563 }
564 }
565
566 pub fn insert(&self, row_id: RowId, values: &[Option<&[u8]>]) -> Result<()> {
570 if values.len() != self.schema.columns.len() {
571 return Err(SochDBError::InvalidData(format!(
572 "Expected {} columns, got {}",
573 self.schema.columns.len(),
574 values.len()
575 )));
576 }
577
578 let row_idx = self.next_row_idx.fetch_add(1, Ordering::SeqCst);
579
580 let mut bytes = 0usize;
582 for (i, value) in values.iter().enumerate() {
583 let mut col = self.columns[i].write();
584 if let Some(data) = value {
585 bytes += data.len();
586 }
587 col.append(*value);
588 }
589
590 {
592 let mut ids = self.row_ids.write();
593 ids.insert(row_id, row_idx);
594 }
595 {
596 let mut idx_to_id = self.row_idx_to_id.write();
597 while idx_to_id.len() <= row_idx as usize {
599 idx_to_id.push(0); }
601 idx_to_id[row_idx as usize] = row_id;
602 }
603
604 self.bytes_written
605 .fetch_add(bytes as u64, Ordering::Relaxed);
606
607 Ok(())
608 }
609
610 pub fn get(&self, row_id: RowId) -> Option<Vec<Option<Vec<u8>>>> {
613 let row_ids = self.row_ids.read();
615 let row_idx = *row_ids.get(&row_id)?;
616 drop(row_ids);
617
618 let mut values = Vec::with_capacity(self.columns.len());
620 for col in &self.columns {
621 let col_buf = col.read();
622 values.push(col_buf.get(row_idx));
623 }
624
625 Some(values)
626 }
627
628 pub fn get_columns(
630 &self,
631 row_id: RowId,
632 col_indices: &[usize],
633 ) -> Option<Vec<Option<Vec<u8>>>> {
634 let row_ids = self.row_ids.read();
636 let row_idx = *row_ids.get(&row_id)?;
637 drop(row_ids);
638
639 let mut values = Vec::with_capacity(col_indices.len());
641 for &col_idx in col_indices {
642 if col_idx < self.columns.len() {
643 let col_buf = self.columns[col_idx].read();
644 values.push(col_buf.get(row_idx));
645 } else {
646 values.push(None);
647 }
648 }
649
650 Some(values)
651 }
652
653 pub fn scan_range(&self, start: RowId, end: RowId) -> Vec<(RowId, Vec<Option<Vec<u8>>>)> {
655 let row_ids = self.row_ids.read();
656 let mut results = Vec::new();
657
658 for (&row_id, &row_idx) in row_ids.range(start..=end) {
659 let mut values = Vec::with_capacity(self.columns.len());
660 for col in &self.columns {
661 let col_buf = col.read();
662 values.push(col_buf.get(row_idx));
663 }
664 results.push((row_id, values));
665 }
666
667 results
668 }
669
670 pub fn is_full(&self) -> bool {
672 self.bytes_written.load(Ordering::Relaxed) as usize >= self.size_limit
673 }
674
675 pub fn row_count(&self) -> u64 {
677 self.next_row_idx.load(Ordering::SeqCst)
678 }
679
680 pub fn memory_bytes(&self) -> usize {
682 self.columns.iter().map(|c| c.read().memory_bytes()).sum()
683 }
684
685 pub fn schema(&self) -> &TableSchema {
687 &self.schema
688 }
689}
690
691use sochdb_core::learned_index::LearnedSparseIndex;
692
693#[derive(Debug, Clone, Serialize, Deserialize)]
695pub struct ColumnIndex {
696 pub offset: u64,
698 pub length: u64,
700 pub compression: u8,
702}
703
704#[derive(Debug)]
706#[allow(dead_code)]
707pub struct ColumnGroup {
708 path: PathBuf,
710 schema: TableSchema,
712 level: u32,
714 sequence: u64,
716 row_count: u64,
718 min_timestamp: u64,
720 max_timestamp: u64,
722 column_offsets: BTreeMap<String, ColumnIndex>,
724 lsi: Option<LearnedSparseIndex>,
726}
727
728impl ColumnGroup {
729 const MAGIC: [u8; 4] = [b'T', b'O', b'O', b'N'];
731 const VERSION: u32 = 1;
732
733 #[allow(clippy::too_many_arguments)]
735 pub fn new(
736 path: PathBuf,
737 schema: TableSchema,
738 level: u32,
739 sequence: u64,
740 row_count: u64,
741 min_timestamp: u64,
742 max_timestamp: u64,
743 column_offsets: BTreeMap<String, ColumnIndex>,
744 lsi: Option<LearnedSparseIndex>,
745 ) -> Self {
746 Self {
747 path,
748 schema,
749 level,
750 sequence,
751 row_count,
752 min_timestamp,
753 max_timestamp,
754 column_offsets,
755 lsi,
756 }
757 }
758
759 pub fn from_memtable(
761 base_path: &Path,
762 memtable: &ColumnarMemtable,
763 level: u32,
764 sequence: u64,
765 ) -> Result<Self> {
766 use byteorder::{LittleEndian, WriteBytesExt};
767 use std::fs::File;
768 use std::io::{BufWriter, Seek, Write};
769
770 let file_name = format!("L{}_seq{}.sst", level, sequence);
772 let file_path = base_path.join(&file_name);
773 let file = File::create(&file_path)?;
774 let mut writer = BufWriter::new(file);
775
776 writer.write_all(&Self::MAGIC)?;
778 writer.write_u32::<LittleEndian>(Self::VERSION)?;
779
780 let mut column_offsets = BTreeMap::new();
781 let mut min_ts = u64::MAX;
782 let mut max_ts = 0u64;
783
784 for (i, col_lock) in memtable.columns.iter().enumerate() {
786 let col = col_lock.read();
787 let col_def = &memtable.schema.columns[i];
788
789 if col_def.name == "__txn_start" && col.col_type == ColumnType::UInt64 {
791 let mut offset = 0;
793 let row_count = col.row_count as usize;
794 for row_idx in 0..row_count {
795 let byte_idx = row_idx / 8;
797 let bit_idx = row_idx % 8;
798 let is_null =
799 byte_idx < col.nulls.len() && (col.nulls[byte_idx] & (1 << bit_idx)) != 0;
800
801 if !is_null && offset + 8 <= col.data.len() {
802 let ts = u64::from_le_bytes(
803 col.data[offset..offset + 8].try_into().unwrap_or([0u8; 8]),
804 );
805 min_ts = min_ts.min(ts);
806 max_ts = max_ts.max(ts);
807 }
808 offset += 8;
809 }
810 }
811
812 let start_offset = writer.stream_position()?;
813
814 writer.write_u8(col.col_type as u8)?;
816 writer.write_u64::<LittleEndian>(col.row_count)?;
817
818 writer.write_u32::<LittleEndian>(col.nulls.len() as u32)?;
820 writer.write_all(&col.nulls)?;
821
822 if let Some(offsets) = &col.offsets {
824 writer.write_u32::<LittleEndian>(offsets.len() as u32)?;
825 for &off in offsets {
826 writer.write_u32::<LittleEndian>(off)?;
827 }
828 }
829
830 writer.write_u32::<LittleEndian>(col.data.len() as u32)?;
832 writer.write_all(&col.data)?;
833
834 let end_offset = writer.stream_position()?;
835
836 column_offsets.insert(
837 col_def.name.clone(),
838 ColumnIndex {
839 offset: start_offset,
840 length: end_offset - start_offset,
841 compression: 0, },
843 );
844 }
845
846 let row_ids = memtable.row_ids.read();
848 let keys: Vec<u64> = row_ids.keys().cloned().collect();
849 let lsi = LearnedSparseIndex::build(&keys);
850
851 let footer_start = writer.stream_position()?;
853
854 let offsets_bytes = bincode::serialize(&column_offsets)
856 .map_err(|e| SochDBError::Serialization(e.to_string()))?;
857 writer.write_u64::<LittleEndian>(offsets_bytes.len() as u64)?;
858 writer.write_all(&offsets_bytes)?;
859
860 let lsi_bytes =
862 bincode::serialize(&lsi).map_err(|e| SochDBError::Serialization(e.to_string()))?;
863 writer.write_u64::<LittleEndian>(lsi_bytes.len() as u64)?;
864 writer.write_all(&lsi_bytes)?;
865
866 writer.write_u64::<LittleEndian>(footer_start)?;
868 writer.write_all(&Self::MAGIC)?;
869
870 writer.flush()?;
871
872 if min_ts == u64::MAX || max_ts == 0 {
874 let now = std::time::SystemTime::now()
875 .duration_since(std::time::UNIX_EPOCH)
876 .unwrap()
877 .as_micros() as u64;
878 min_ts = now;
879 max_ts = now;
880 }
881
882 Ok(Self {
883 path: file_path,
884 schema: memtable.schema.clone(),
885 level,
886 sequence,
887 row_count: memtable.row_count(),
888 min_timestamp: min_ts,
889 max_timestamp: max_ts,
890 column_offsets,
891 lsi: Some(lsi),
892 })
893 }
894
895 pub fn open(path: PathBuf, schema: TableSchema, level: u32, sequence: u64) -> Result<Self> {
897 use byteorder::{LittleEndian, ReadBytesExt};
898 use std::fs::File;
899 use std::io::{Read, Seek, SeekFrom};
900
901 let mut file = File::open(&path)?;
902 let file_len = file.metadata()?.len();
903
904 if file_len < 12 {
905 return Err(SochDBError::Corruption("File too short".to_string()));
907 }
908
909 file.seek(SeekFrom::End(-12))?;
911 let footer_offset = file.read_u64::<LittleEndian>()?;
912 let mut magic = [0u8; 4];
913 file.read_exact(&mut magic)?;
914
915 if magic != Self::MAGIC {
916 return Err(SochDBError::Corruption("Invalid magic bytes".to_string()));
917 }
918
919 file.seek(SeekFrom::Start(footer_offset))?;
921
922 let offsets_len = file.read_u64::<LittleEndian>()?;
924 let mut offsets_bytes = vec![0u8; offsets_len as usize];
925 file.read_exact(&mut offsets_bytes)?;
926 let column_offsets: BTreeMap<String, ColumnIndex> = bincode::deserialize(&offsets_bytes)
927 .map_err(|e| SochDBError::Serialization(e.to_string()))?;
928
929 let lsi_len = file.read_u64::<LittleEndian>()?;
931 let mut lsi_bytes = vec![0u8; lsi_len as usize];
932 file.read_exact(&mut lsi_bytes)?;
933 let lsi: LearnedSparseIndex = bincode::deserialize(&lsi_bytes)
934 .map_err(|e| SochDBError::Serialization(e.to_string()))?;
935
936 Ok(Self {
937 path,
938 schema,
939 level,
940 sequence,
941 row_count: 0, min_timestamp: 0,
943 max_timestamp: 0,
944 column_offsets,
945 lsi: Some(lsi),
946 })
947 }
948
949 pub fn file_path(&self) -> &Path {
951 &self.path
952 }
953
954 pub fn column_index(&self, col_name: &str) -> Option<&ColumnIndex> {
956 self.column_offsets.get(col_name)
957 }
958
959 pub fn level(&self) -> u32 {
961 self.level
962 }
963
964 pub fn row_count(&self) -> u64 {
966 self.row_count
967 }
968}
969
/// Running counters describing compaction activity.
#[derive(Debug, Clone, Default)]
pub struct CompactionStats {
    /// Number of compactions performed.
    pub compactions_total: u64,
    /// Number of level-0 compactions performed.
    pub l0_compactions: u64,
    /// Bytes accounted as read by compactions.
    pub bytes_read: u64,
    /// Bytes accounted as written by compactions.
    pub bytes_written: u64,
    /// Number of compactions counted as hot-column compactions.
    pub hot_column_compactions: u64,
    /// Number of cold-column stripe references kept instead of rewritten.
    pub cold_column_refs_preserved: u64,
    /// Estimated write-amplification reduction (presumably a fraction —
    /// TODO confirm where it is computed).
    pub estimated_wa_reduction: f64,
    /// Duration of the most recent compaction in microseconds (per the field
    /// name — TODO confirm where it is set).
    pub last_compaction_duration_us: u64,
}

impl CompactionStats {
    /// Ratio of bytes written to bytes read; `1.0` when nothing was read.
    pub fn write_amplification(&self) -> f64 {
        match self.bytes_read {
            0 => 1.0,
            read => self.bytes_written as f64 / read as f64,
        }
    }
}
1005
/// Summary of what a WAL recovery pass restored.
#[derive(Debug, Clone, Default)]
pub struct LscsRecoveryStats {
    /// Number of transactions reported by the WAL replay.
    pub transactions_recovered: usize,
    /// Number of rows re-inserted into the memtable.
    pub rows_recovered: usize,
    /// Largest row id seen among the replayed keys.
    pub max_row_id: u64,
}
1016
/// Tunable parameters of the storage engine.
#[derive(Debug, Clone)]
pub struct LscsConfig {
    /// Size limit, in bytes, given to each memtable.
    pub memtable_size: usize,
    /// Number of levels allocated for column groups.
    pub num_levels: usize,
    /// Size ratio between adjacent levels (presumably; its use is not
    /// visible here — TODO confirm).
    pub level_ratio: usize,
    /// Number of level-0 column groups at which a flush triggers compaction.
    pub l0_compaction_threshold: usize,
    /// Temperature above which a column is treated as hot.
    pub hot_threshold: f64,
    /// Window size handed to the column temperature tracker.
    pub temperature_window_size: u64,
}

impl Default for LscsConfig {
    /// 64 MiB memtables, 7 levels with ratio 10, compaction after 4 level-0
    /// groups, hot threshold 0.1 and a temperature window of 1000.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;
        Self {
            memtable_size: 64 * MIB,
            num_levels: 7,
            level_ratio: 10,
            l0_compaction_threshold: 4,
            hot_threshold: 0.1,
            temperature_window_size: 1000,
        }
    }
}
1046
1047pub struct Lscs {
1049 config: LscsConfig,
1051 path: PathBuf,
1053 schema: TableSchema,
1055 wal: Arc<TxnWal>,
1057 active_memtable: RwLock<ColumnarMemtable>,
1059 immutable_memtables: RwLock<Vec<ColumnarMemtable>>,
1061 column_groups: RwLock<Vec<Vec<ColumnGroup>>>,
1063 segment_descriptors: RwLock<HashMap<u64, SegmentDescriptor>>,
1065 temperature_tracker: Arc<ColumnTemperatureTracker>,
1067 next_sequence: AtomicU64,
1069 next_row_id: AtomicU64,
1071 compaction_stats: RwLock<CompactionStats>,
1073}
1074
1075impl Lscs {
1076 pub fn new(path: PathBuf, schema: TableSchema, config: LscsConfig) -> Result<Self> {
1078 std::fs::create_dir_all(&path)?;
1079
1080 let wal_path = path.join("wal.log");
1081 let wal = Arc::new(TxnWal::new(&wal_path)?);
1082
1083 let active_memtable = ColumnarMemtable::new(schema.clone(), config.memtable_size);
1084
1085 let mut column_groups = Vec::with_capacity(config.num_levels);
1086 for _ in 0..config.num_levels {
1087 column_groups.push(Vec::new());
1088 }
1089
1090 let column_names: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
1092 let temperature_tracker = Arc::new(ColumnTemperatureTracker::new(
1093 &column_names,
1094 config.temperature_window_size,
1095 ));
1096
1097 Ok(Self {
1098 config,
1099 path,
1100 schema,
1101 wal,
1102 active_memtable: RwLock::new(active_memtable),
1103 immutable_memtables: RwLock::new(Vec::new()),
1104 column_groups: RwLock::new(column_groups),
1105 segment_descriptors: RwLock::new(HashMap::new()),
1106 temperature_tracker,
1107 next_sequence: AtomicU64::new(0),
1108 next_row_id: AtomicU64::new(1),
1109 compaction_stats: RwLock::new(CompactionStats::default()),
1110 })
1111 }
1112
1113 pub fn open(path: PathBuf, schema: TableSchema, config: LscsConfig) -> Result<Self> {
1126 let lscs = Self::new(path, schema, config)?;
1127 let stats = lscs.recover()?;
1128
1129 if stats.rows_recovered > 0 {
1130 eprintln!(
1131 "LSCS Recovery: restored {} rows from {} transactions",
1132 stats.rows_recovered, stats.transactions_recovered
1133 );
1134 }
1135
1136 Ok(lscs)
1137 }
1138
1139 pub fn recover(&self) -> Result<LscsRecoveryStats> {
1143 let (writes, txn_count) = self.wal.replay_for_recovery()?;
1144
1145 if writes.is_empty() {
1146 return Ok(LscsRecoveryStats::default());
1147 }
1148
1149 let mut max_row_id: u64 = 0;
1150 let mut rows_recovered = 0usize;
1151
1152 for (key, value) in &writes {
1154 if key.len() >= 8 {
1156 let row_id = u64::from_le_bytes(key[..8].try_into().unwrap_or([0; 8]));
1157 if row_id > max_row_id {
1158 max_row_id = row_id;
1159 }
1160
1161 if let Ok(row_values) = Self::deserialize_row(value) {
1163 let value_refs: Vec<Option<&[u8]>> = row_values.iter().map(|v| v.as_deref()).collect();
1164
1165 let memtable = self.active_memtable.read();
1166 if memtable.insert(row_id, &value_refs).is_ok() {
1167 rows_recovered += 1;
1168 }
1169 }
1170 }
1171 }
1172
1173 if max_row_id > 0 {
1175 self.next_row_id.store(max_row_id + 1, Ordering::SeqCst);
1176 }
1177
1178 Ok(LscsRecoveryStats {
1179 transactions_recovered: txn_count,
1180 rows_recovered,
1181 max_row_id,
1182 })
1183 }
1184
1185 pub fn mark_clean_shutdown(&self) -> Result<()> {
1190 self.fsync()?;
1192
1193 let marker_path = self.path.join(".clean_shutdown");
1195 std::fs::write(&marker_path, b"clean")?;
1196
1197 Ok(())
1201 }
1202
1203 pub fn insert(&self, values: &[Option<&[u8]>]) -> Result<RowId> {
1205 let row_id = self.next_row_id.fetch_add(1, Ordering::SeqCst);
1206
1207 let txn_id = self.wal.begin_transaction()?;
1209
1210 let key = row_id.to_le_bytes().to_vec();
1212 let value = self.serialize_row(values)?;
1213 self.wal.write(txn_id, key, value)?;
1214 self.wal.commit_transaction(txn_id)?;
1215
1216 let memtable = self.active_memtable.read();
1218 memtable.insert(row_id, values)?;
1219
1220 if memtable.is_full() {
1222 drop(memtable);
1223 self.rotate_memtable()?;
1224 }
1225
1226 Ok(row_id)
1227 }
1228
1229 pub fn mark_deleted(&self, row_id: RowId, _caller_txn_id: u64, txn_end: u64) -> Result<()> {
1243 let txn_end_idx = self
1245 .schema
1246 .columns
1247 .iter()
1248 .position(|c| c.name == "__txn_end")
1249 .ok_or_else(|| {
1250 SochDBError::InvalidData("Schema missing __txn_end column for MVCC".to_string())
1251 })?;
1252
1253 let current = self
1255 .get(row_id)?
1256 .ok_or_else(|| SochDBError::NotFound(format!("Row {} not found", row_id)))?;
1257
1258 let mut new_values: Vec<Option<Vec<u8>>> = current;
1260 new_values[txn_end_idx] = Some(txn_end.to_le_bytes().to_vec());
1261
1262 let value_refs: Vec<Option<&[u8]>> = new_values.iter().map(|v| v.as_deref()).collect();
1264
1265 let wal_txn_id = self.wal.begin_transaction()?;
1268
1269 let row_data = self.serialize_row(&value_refs)?;
1271 self.wal.write(wal_txn_id, row_id.to_le_bytes().to_vec(), row_data)?;
1272
1273 self.wal.commit_transaction(wal_txn_id)?;
1275
1276 let memtable = self.active_memtable.read();
1278 memtable.insert(row_id, &value_refs)?;
1279
1280 Ok(())
1281 }
1282
1283 fn serialize_row(&self, values: &[Option<&[u8]>]) -> Result<Vec<u8>> {
1285 use byteorder::{LittleEndian, WriteBytesExt};
1286
1287 let mut buf = Vec::new();
1288 buf.write_u32::<LittleEndian>(values.len() as u32)?;
1289
1290 for value in values {
1291 match value {
1292 Some(data) => {
1293 buf.write_u8(1)?; buf.write_u32::<LittleEndian>(data.len() as u32)?;
1295 buf.extend_from_slice(data);
1296 }
1297 None => {
1298 buf.write_u8(0)?; }
1300 }
1301 }
1302
1303 Ok(buf)
1304 }
1305
1306 #[allow(dead_code)]
1308 fn deserialize_row(data: &[u8]) -> Result<Vec<Option<Vec<u8>>>> {
1309 use byteorder::{LittleEndian, ReadBytesExt};
1310 use std::io::Cursor;
1311
1312 let mut cursor = Cursor::new(data);
1313 let num_cols = cursor.read_u32::<LittleEndian>()? as usize;
1314 let mut values = Vec::with_capacity(num_cols);
1315
1316 for _ in 0..num_cols {
1317 let is_non_null = cursor.read_u8()? == 1;
1318 if is_non_null {
1319 let len = cursor.read_u32::<LittleEndian>()? as usize;
1320 let pos = cursor.position() as usize;
1321 let value = data[pos..pos + len].to_vec();
1322 cursor.set_position((pos + len) as u64);
1323 values.push(Some(value));
1324 } else {
1325 values.push(None);
1326 }
1327 }
1328
1329 Ok(values)
1330 }
1331
1332 pub fn get(&self, row_id: RowId) -> Result<Option<Vec<Option<Vec<u8>>>>> {
1337 {
1339 let memtable = self.active_memtable.read();
1340 if let Some(values) = memtable.get(row_id) {
1341 return Ok(Some(values));
1342 }
1343 }
1344
1345 {
1347 let immutable = self.immutable_memtables.read();
1348 for memtable in immutable.iter().rev() {
1349 if let Some(values) = memtable.get(row_id) {
1350 return Ok(Some(values));
1351 }
1352 }
1353 }
1354
1355 {
1358 use sochdb_core::learned_index::LookupResult;
1359 let groups = self.column_groups.read();
1360 for level in &*groups {
1361 for group in level.iter().rev() {
1362 if let Some(lsi) = &group.lsi {
1363 let lookup = lsi.lookup(row_id);
1365 match lookup {
1366 LookupResult::Exact(_) | LookupResult::Range { .. } => {
1367 if let Some(row) = self.read_row_from_sstable(group, row_id)? {
1369 return Ok(Some(row));
1370 }
1371 }
1372 LookupResult::NotFound => {
1373 continue;
1375 }
1376 }
1377 }
1378 }
1379 }
1380 }
1381
1382 Ok(None)
1383 }
1384
1385 fn read_row_from_sstable(
1387 &self,
1388 group: &ColumnGroup,
1389 row_id: RowId,
1390 ) -> Result<Option<Vec<Option<Vec<u8>>>>> {
1391 use byteorder::{LittleEndian, ReadBytesExt};
1392 use std::fs::File;
1393 use std::io::{BufReader, Read, Seek, SeekFrom};
1394
1395 let file = File::open(group.file_path())?;
1396 let mut reader = BufReader::new(file);
1397
1398 let mut values = Vec::new();
1399
1400 for (col_name, col_idx) in &group.column_offsets {
1402 reader.seek(SeekFrom::Start(col_idx.offset))?;
1403
1404 let col_type = reader.read_u8()?;
1406 let row_count = reader.read_u64::<LittleEndian>()?;
1407
1408 if row_id >= row_count {
1409 values.push(None);
1410 continue;
1411 }
1412
1413 let nulls_len = reader.read_u32::<LittleEndian>()? as usize;
1415 let mut nulls = vec![0u8; nulls_len];
1416 reader.read_exact(&mut nulls)?;
1417
1418 let byte_idx = (row_id / 8) as usize;
1420 let bit_offset = (row_id % 8) as u8;
1421 let is_null = byte_idx >= nulls.len() || (nulls[byte_idx] & (1 << bit_offset)) == 0;
1422
1423 if is_null {
1424 values.push(None);
1425 continue;
1426 }
1427
1428 let col_type = ColumnType::from_byte(col_type).unwrap_or(ColumnType::Binary);
1430 if let Some(fixed_size) = col_type.fixed_size() {
1431 let offsets_section = reader.stream_position()?;
1433 let data_len = reader.read_u32::<LittleEndian>()? as usize;
1434 let _ = data_len;
1435
1436 let row_offset = (row_id as usize) * fixed_size;
1438 reader.seek(SeekFrom::Start(offsets_section + 4 + row_offset as u64))?;
1439
1440 let mut value = vec![0u8; fixed_size];
1441 reader.read_exact(&mut value)?;
1442 values.push(Some(value));
1443 } else {
1444 let offsets_count = reader.read_u32::<LittleEndian>()? as usize;
1446 let mut offsets = vec![0u32; offsets_count];
1447 for offset in offsets.iter_mut().take(offsets_count) {
1448 *offset = reader.read_u32::<LittleEndian>()?;
1449 }
1450
1451 if (row_id as usize + 1) >= offsets.len() {
1452 values.push(None);
1453 continue;
1454 }
1455
1456 let start = offsets[row_id as usize] as usize;
1457 let end = offsets[(row_id + 1) as usize] as usize;
1458
1459 let data_len = reader.read_u32::<LittleEndian>()? as usize;
1461 let data_start = reader.stream_position()?;
1462
1463 if end <= data_len {
1464 reader.seek(SeekFrom::Start(data_start + start as u64))?;
1465 let mut value = vec![0u8; end - start];
1466 reader.read_exact(&mut value)?;
1467 values.push(Some(value));
1468 } else {
1469 values.push(None);
1470 }
1471 }
1472 let _ = col_name; }
1474
1475 if values.is_empty() {
1476 Ok(None)
1477 } else {
1478 Ok(Some(values))
1479 }
1480 }
1481
1482 pub fn fsync(&self) -> Result<()> {
1490 self.wal.sync()?;
1492
1493 let memtable = self.active_memtable.read();
1495 let should_flush = memtable.memory_bytes() > self.config.memtable_size / 2;
1496 drop(memtable);
1497
1498 if should_flush {
1499 self.rotate_memtable()?;
1501 self.flush()?;
1502 }
1503
1504 Ok(())
1505 }
1506
1507 fn rotate_memtable(&self) -> Result<()> {
1509 let new_memtable = ColumnarMemtable::new(self.schema.clone(), self.config.memtable_size);
1510
1511 let old_memtable = {
1512 let mut active = self.active_memtable.write();
1513 std::mem::replace(&mut *active, new_memtable)
1514 };
1515
1516 let mut immutable = self.immutable_memtables.write();
1517 immutable.push(old_memtable);
1518
1519 if immutable.len() >= 2 {
1521 drop(immutable); self.flush()?;
1524 }
1525
1526 Ok(())
1527 }
1528
1529 pub fn flush(&self) -> Result<()> {
1531 let memtables = {
1532 let mut immutable = self.immutable_memtables.write();
1533 std::mem::take(&mut *immutable)
1534 };
1535
1536 for memtable in memtables {
1537 let sequence = self.next_sequence.fetch_add(1, Ordering::SeqCst);
1538 let column_group = ColumnGroup::from_memtable(&self.path, &memtable, 0, sequence)?;
1539
1540 let mut groups = self.column_groups.write();
1541 groups[0].push(column_group);
1542 }
1543
1544 let groups = self.column_groups.read();
1546 if groups[0].len() >= self.config.l0_compaction_threshold {
1547 drop(groups);
1548 self.compact_l0()?;
1549 }
1550
1551 Ok(())
1552 }
1553
1554 fn compact_l0(&self) -> Result<()> {
1566 let start_time = std::time::Instant::now();
1567
1568 let hot_columns = self.temperature_tracker.get_hot_columns();
1570 let cold_columns = self.temperature_tracker.get_cold_columns();
1571
1572 let total_columns = self.schema.columns.len();
1573 let hot_fraction = if total_columns > 0 {
1574 hot_columns.len() as f64 / total_columns as f64
1575 } else {
1576 1.0
1577 };
1578
1579 let l0_segments: Vec<ColumnGroup> = {
1581 let mut groups = self.column_groups.write();
1582 std::mem::take(&mut groups[0])
1583 };
1584
1585 if l0_segments.is_empty() {
1586 return Ok(());
1587 }
1588
1589 let mut bytes_read = 0u64;
1590 let mut bytes_written = 0u64;
1591 let mut cold_refs_preserved = 0u64;
1592
1593 let sequence = self.next_sequence.fetch_add(1, Ordering::SeqCst);
1594
1595 let merged_path = self.path.join(format!("L1_seq{}.sst", sequence));
1598 let segment_refs: Vec<&ColumnGroup> = l0_segments.iter().collect();
1599
1600 let hot_col_refs = if !hot_columns.is_empty() {
1601 let refs = self.selective_merge_hot_columns(
1602 &segment_refs,
1603 &hot_columns,
1604 &merged_path,
1605 )?;
1606 for (_col, stripe) in &refs {
1608 bytes_written += stripe.length;
1609 }
1610 refs
1611 } else {
1612 HashMap::new()
1613 };
1614
1615 let mut col_refs = hot_col_refs;
1618 let mut total_row_count = 0u64;
1619 let mut min_row_id = u64::MAX;
1620 let mut max_row_id = 0u64;
1621
1622 for segment in &l0_segments {
1623 bytes_read += segment.row_count * 100; total_row_count += segment.row_count;
1625
1626 for col_name in &hot_columns {
1628 if let Some(col_idx) = segment.column_offsets.get(col_name) {
1629 bytes_read += col_idx.length;
1630 }
1631 }
1632
1633 for col_name in &cold_columns {
1635 if let Some(col_idx) = segment.column_offsets.get(col_name) {
1636 let stripe_ref = ColumnStripeRef::new(
1637 segment.level,
1638 segment.sequence,
1639 col_name.clone(),
1640 col_idx.offset,
1641 col_idx.length,
1642 segment.row_count,
1643 );
1644 col_refs.insert(col_name.clone(), stripe_ref);
1645 cold_refs_preserved += 1;
1646 }
1647 }
1648 }
1649
1650 let segment_desc = SegmentDescriptor {
1652 id: sequence,
1653 level: 1,
1654 col_refs,
1655 min_row_id,
1656 max_row_id,
1657 row_count: total_row_count,
1658 min_timestamp: 0,
1659 max_timestamp: std::time::SystemTime::now()
1660 .duration_since(std::time::UNIX_EPOCH)
1661 .unwrap()
1662 .as_micros() as u64,
1663 is_tombstone: false,
1664 };
1665
1666 {
1668 let mut descriptors = self.segment_descriptors.write();
1669 descriptors.insert(sequence, segment_desc);
1670 }
1671
1672 {
1674 let mut stats = self.compaction_stats.write();
1675 stats.compactions_total += 1;
1676 stats.l0_compactions += 1;
1677 stats.bytes_read += bytes_read;
1678 stats.bytes_written += bytes_written;
1679 stats.cold_column_refs_preserved += cold_refs_preserved;
1680 stats.hot_column_compactions += 1;
1681 stats.estimated_wa_reduction = 1.0 / hot_fraction.max(0.01);
1682 stats.last_compaction_duration_us = start_time.elapsed().as_micros() as u64;
1683 }
1684
1685 for segment in l0_segments {
1688 let _ = segment; }
1690
1691 Ok(())
1692 }
1693
1694 fn selective_merge_hot_columns(
1706 &self,
1707 segments: &[&ColumnGroup],
1708 hot_columns: &HashSet<String>,
1709 output_path: &Path,
1710 ) -> Result<HashMap<String, ColumnStripeRef>> {
1711 use byteorder::{LittleEndian, WriteBytesExt};
1712 use std::fs::File;
1713 use std::io::{BufWriter, Seek, Write};
1714
1715 let mut result = HashMap::new();
1716
1717 let file = File::create(output_path)?;
1719 let mut writer = BufWriter::new(file);
1720
1721 writer.write_all(&ColumnGroup::MAGIC)?;
1723 writer.write_u32::<LittleEndian>(ColumnGroup::VERSION)?;
1724
1725 let sequence = self.next_sequence.load(Ordering::SeqCst);
1726
1727 for col_name in hot_columns {
1729 let start_offset = writer.stream_position()?;
1730
1731 let mut merged_data = Vec::new();
1733 let mut row_count = 0u64;
1734
1735 for segment in segments {
1736 if let Some(_col_idx) = segment.column_offsets.get(col_name) {
1737 merged_data.extend_from_slice(&[0u8; 0]); row_count += segment.row_count;
1741 }
1742 }
1743
1744 writer.write_u64::<LittleEndian>(row_count)?;
1747 writer.write_all(&merged_data)?;
1748
1749 let end_offset = writer.stream_position()?;
1750
1751 let stripe_ref = ColumnStripeRef::new(
1753 1, sequence,
1755 col_name.clone(),
1756 start_offset,
1757 end_offset - start_offset,
1758 row_count,
1759 );
1760 result.insert(col_name.clone(), stripe_ref);
1761 }
1762
1763 writer.flush()?;
1764 Ok(result)
1765 }
1766
1767 pub fn scan_columns(
1772 &self,
1773 column_names: &[&str],
1774 row_range: Option<(RowId, RowId)>,
1775 ) -> Result<Vec<Vec<u8>>> {
1776 let mut results = Vec::new();
1777
1778 let descriptors = self.segment_descriptors.read();
1780
1781 for (_seg_id, descriptor) in descriptors.iter() {
1782 if let Some((min, max)) = row_range
1784 && (descriptor.max_row_id < min || descriptor.min_row_id > max)
1785 {
1786 continue;
1787 }
1788
1789 for col_name in column_names {
1791 if let Some(stripe_ref) = descriptor.col_refs.get(*col_name) {
1792 let data = self.read_column_stripe(stripe_ref)?;
1794 results.push(data);
1795 }
1796 }
1797 }
1798
1799 Ok(results)
1800 }
1801
1802 fn read_column_stripe(&self, stripe_ref: &ColumnStripeRef) -> Result<Vec<u8>> {
1804 use std::fs::File;
1805 use std::io::{Read, Seek, SeekFrom};
1806
1807 let file_path = self.path.join(format!(
1809 "L{}_seq{}.sst",
1810 stripe_ref.level, stripe_ref.segment_id
1811 ));
1812
1813 let mut file = File::open(&file_path)?;
1814 file.seek(SeekFrom::Start(stripe_ref.offset))?;
1815
1816 let mut data = vec![0u8; stripe_ref.length as usize];
1817 file.read_exact(&mut data)?;
1818
1819 Ok(data)
1820 }
1821
1822 pub fn compaction_stats(&self) -> CompactionStats {
1824 self.compaction_stats.read().clone()
1825 }
1826
1827 pub fn compact(&self) -> Result<()> {
1831 self.compact_l0()
1832 }
1833
1834 pub fn column_temperatures(&self) -> Vec<ColumnTemperature> {
1836 self.temperature_tracker.get_all_temperatures()
1837 }
1838
1839 #[allow(clippy::type_complexity)]
1844 pub fn scan_range(
1845 &self,
1846 start: RowId,
1847 end: RowId,
1848 ) -> Result<Vec<(RowId, Vec<Option<Vec<u8>>>)>> {
1849 let mut results = Vec::new();
1850 let mut seen = std::collections::HashSet::new();
1851
1852 {
1854 let memtable = self.active_memtable.read();
1855 for (row_id, values) in memtable.scan_range(start, end) {
1856 if seen.insert(row_id) {
1857 results.push((row_id, values));
1858 }
1859 }
1860 }
1861
1862 {
1864 let immutable = self.immutable_memtables.read();
1865 for memtable in immutable.iter().rev() {
1866 for (row_id, values) in memtable.scan_range(start, end) {
1867 if seen.insert(row_id) {
1868 results.push((row_id, values));
1869 }
1870 }
1871 }
1872 }
1873
1874 results.sort_by_key(|(id, _)| *id);
1879
1880 Ok(results)
1881 }
1882
1883 #[allow(clippy::type_complexity)]
1887 pub fn scan_columns_range(
1888 &self,
1889 start: RowId,
1890 end: RowId,
1891 col_indices: &[usize],
1892 ) -> Result<Vec<(RowId, Vec<Option<Vec<u8>>>)>> {
1893 let mut results = Vec::new();
1894 let mut seen = std::collections::HashSet::new();
1895
1896 {
1898 let memtable = self.active_memtable.read();
1899 let row_ids = memtable.row_ids.read();
1900
1901 for (&row_id, _) in row_ids.range(start..=end) {
1902 if seen.insert(row_id)
1903 && let Some(values) = memtable.get_columns(row_id, col_indices)
1904 {
1905 results.push((row_id, values));
1906 }
1907 }
1908 }
1909
1910 {
1912 let immutable = self.immutable_memtables.read();
1913 for memtable in immutable.iter().rev() {
1914 let row_ids = memtable.row_ids.read();
1915 for (&row_id, _) in row_ids.range(start..=end) {
1916 if seen.insert(row_id)
1917 && let Some(values) = memtable.get_columns(row_id, col_indices)
1918 {
1919 results.push((row_id, values));
1920 }
1921 }
1922 }
1923 }
1924
1925 results.sort_by_key(|(id, _)| *id);
1927
1928 Ok(results)
1929 }
1930
1931 pub fn stats(&self) -> LscsStats {
1933 let active = self.active_memtable.read();
1934 let immutable = self.immutable_memtables.read();
1935 let groups = self.column_groups.read();
1936
1937 let mut level_sizes = vec![0u64; self.config.num_levels];
1938 let mut disk_bytes = 0u64;
1939
1940 for (i, level) in groups.iter().enumerate() {
1941 for group in level {
1942 level_sizes[i] += group.row_count;
1943 if let Ok(metadata) = std::fs::metadata(&group.path) {
1945 disk_bytes += metadata.len();
1946 }
1947 }
1948 }
1949
1950 LscsStats {
1951 active_memtable_bytes: active.memory_bytes(),
1952 immutable_memtables: immutable.len(),
1953 level_row_counts: level_sizes,
1954 next_row_id: self.next_row_id.load(Ordering::SeqCst),
1955 disk_bytes,
1956 }
1957 }
1958
/// Returns a reference to the shared transaction WAL handle held by this store.
pub fn wal(&self) -> &Arc<TxnWal> {
    &self.wal
}
1963}
1964
/// Point-in-time summary of an LSCS store, as produced by `Lscs::stats`.
#[derive(Debug, Clone)]
pub struct LscsStats {
    /// Value of `memory_bytes()` reported by the active memtable.
    pub active_memtable_bytes: usize,
    /// Number of immutable memtables currently queued.
    pub immutable_memtables: usize,
    /// Sum of `row_count` over the column groups of each level, indexed by level.
    pub level_row_counts: Vec<u64>,
    /// Current value of the store's `next_row_id` counter.
    pub next_row_id: u64,
    /// Total size in bytes of the column-group paths whose metadata could be read.
    pub disk_bytes: u64,
}
1979
1980#[cfg(test)]
1981mod tests {
1982 use super::*;
1983 use tempfile::tempdir;
1984
1985 fn test_schema() -> TableSchema {
1986 TableSchema {
1987 name: "users".to_string(),
1988 columns: vec![
1989 ColumnDef {
1990 name: "id".to_string(),
1991 col_type: ColumnType::UInt64,
1992 nullable: false,
1993 },
1994 ColumnDef {
1995 name: "name".to_string(),
1996 col_type: ColumnType::Text,
1997 nullable: false,
1998 },
1999 ColumnDef {
2000 name: "score".to_string(),
2001 col_type: ColumnType::Float64,
2002 nullable: true,
2003 },
2004 ],
2005 }
2006 }
2007
/// Inserting one fully populated row makes the memtable report one row.
#[test]
fn test_columnar_memtable_insert() {
    let memtable = ColumnarMemtable::new(test_schema(), 1024 * 1024);

    let id_bytes = 1u64.to_le_bytes();
    let score_bytes = 95.5f64.to_le_bytes();

    memtable
        .insert(
            1,
            &[
                Some(&id_bytes[..]),
                Some("Alice".as_bytes()),
                Some(&score_bytes[..]),
            ],
        )
        .unwrap();

    assert_eq!(memtable.row_count(), 1);
}
2030
/// A row whose nullable `score` column is `None` is still accepted and counted.
#[test]
fn test_columnar_memtable_with_nulls() {
    let memtable = ColumnarMemtable::new(test_schema(), 1024 * 1024);

    let id_bytes = 1u64.to_le_bytes();

    memtable
        .insert(1, &[Some(&id_bytes[..]), Some("Bob".as_bytes()), None])
        .unwrap();

    assert_eq!(memtable.row_count(), 1);
}
2046
/// The first insert into a fresh store returns row id 1 and leaves bytes in
/// the active memtable.
#[test]
fn test_lscs_basic() {
    let dir = tempdir().unwrap();
    let config = LscsConfig {
        memtable_size: 1024,
        ..Default::default()
    };
    let lscs = Lscs::new(dir.path().to_path_buf(), test_schema(), config).unwrap();

    let id_bytes = 1u64.to_le_bytes();
    let score_bytes = 87.2f64.to_le_bytes();

    let row_id = lscs
        .insert(&[
            Some(&id_bytes[..]),
            Some("Charlie".as_bytes()),
            Some(&score_bytes[..]),
        ])
        .unwrap();
    assert_eq!(row_id, 1);

    assert!(lscs.stats().active_memtable_bytes > 0);
}
2075
/// Writes a two-row memtable (schema extended by `with_mvcc`) to a column
/// group file, reopens it and checks the column count and the LSI.
#[test]
fn test_column_group_write() {
    let dir = tempfile::tempdir().unwrap();

    let column = |name: &str, col_type: ColumnType, nullable: bool| ColumnDef {
        name: name.to_string(),
        col_type,
        nullable,
    };
    let schema = TableSchema::new(
        "users".to_string(),
        vec![
            column("id", ColumnType::UInt64, false),
            column("name", ColumnType::Text, false),
            column("score", ColumnType::Float64, true),
        ],
    )
    .with_mvcc();

    let memtable = ColumnarMemtable::new(schema.clone(), 1024 * 1024);

    // Each row carries the three user columns plus two trailing u64 values
    // (presumably for the columns added by `with_mvcc` — five columns total).
    let insert_row = |row_id: u64, name: &[u8], score: f64, extra_a: u64, extra_b: u64| {
        let id_bytes = row_id.to_le_bytes();
        let score_bytes = score.to_le_bytes();
        let extra_a_bytes = extra_a.to_le_bytes();
        let extra_b_bytes = extra_b.to_le_bytes();
        memtable
            .insert(
                row_id,
                &[
                    Some(&id_bytes[..]),
                    Some(name),
                    Some(&score_bytes[..]),
                    Some(&extra_a_bytes[..]),
                    Some(&extra_b_bytes[..]),
                ],
            )
            .unwrap();
    };
    insert_row(1, b"Alice", 95.5, 100, 0);
    insert_row(2, b"Bob", 87.2, 100, 200);

    let cg = ColumnGroup::from_memtable(dir.path(), &memtable, 0, 1).unwrap();
    let file_path = cg.file_path();
    assert!(file_path.exists());
    assert!(file_path.extension().unwrap() == "sst");

    let cg_opened = ColumnGroup::open(file_path.to_path_buf(), schema, 0, 1).unwrap();
    assert_eq!(cg_opened.column_offsets.len(), 5);

    assert!(cg_opened.lsi.is_some());
    let lsi = cg_opened.lsi.as_ref().unwrap();
    assert!(lsi.stats().num_keys > 0);
}
2146
/// Point lookups return the stored values, keep `None` for a null column and
/// yield `None` for an unknown row id.
#[test]
fn test_memtable_get() {
    let memtable = ColumnarMemtable::new(test_schema(), 1024 * 1024);

    // Row 1: every column populated.
    let alice_id = 1u64.to_le_bytes();
    let alice_score = 95.5f64.to_le_bytes();
    memtable
        .insert(
            1,
            &[
                Some(&alice_id[..]),
                Some("Alice".as_bytes()),
                Some(&alice_score[..]),
            ],
        )
        .unwrap();

    // Row 2: the nullable score column is left empty.
    let bob_id = 2u64.to_le_bytes();
    memtable
        .insert(2, &[Some(&bob_id[..]), Some("Bob".as_bytes()), None])
        .unwrap();

    let first = memtable.get(1).unwrap();
    assert_eq!(first.len(), 3);

    let id_field = first[0].as_ref().unwrap();
    assert_eq!(u64::from_le_bytes(id_field[..].try_into().unwrap()), 1);

    let name_field = first[1].as_ref().unwrap();
    assert_eq!(std::str::from_utf8(name_field).unwrap(), "Alice");

    let second = memtable.get(2).unwrap();
    assert!(second[2].is_none());

    assert!(memtable.get(999).is_none());
}
2198
/// Scanning ids 3..=7 out of ten inserted rows returns exactly those five rows.
#[test]
fn test_memtable_scan_range() {
    let memtable = ColumnarMemtable::new(test_schema(), 1024 * 1024);

    for i in 1..=10u64 {
        let id_bytes = i.to_le_bytes();
        let name = format!("User{}", i);
        let score_bytes = ((i as f64) * 10.0).to_le_bytes();

        memtable
            .insert(
                i,
                &[
                    Some(&id_bytes[..]),
                    Some(name.as_bytes()),
                    Some(&score_bytes[..]),
                ],
            )
            .unwrap();
    }

    let results = memtable.scan_range(3, 7);
    assert_eq!(results.len(), 5);

    for (row_id, _) in &results {
        assert!((3..=7).contains(row_id));
    }
}
2227
/// A row inserted into the store can be read back by the returned row id.
#[test]
fn test_lscs_get() {
    let dir = tempdir().unwrap();
    let config = LscsConfig {
        memtable_size: 64 * 1024 * 1024,
        ..Default::default()
    };
    let lscs = Lscs::new(dir.path().to_path_buf(), test_schema(), config).unwrap();

    let id_bytes = 42u64.to_le_bytes();
    let score_bytes = 99.9f64.to_le_bytes();

    let row_id = lscs
        .insert(&[
            Some(&id_bytes[..]),
            Some("TestUser".as_bytes()),
            Some(&score_bytes[..]),
        ])
        .unwrap();

    let result = lscs.get(row_id).unwrap();
    assert!(result.is_some());
    let values = result.unwrap();

    let id_field = values[0].as_ref().unwrap();
    assert_eq!(u64::from_le_bytes(id_field[..].try_into().unwrap()), 42);

    let name_field = values[1].as_ref().unwrap();
    assert_eq!(std::str::from_utf8(name_field).unwrap(), "TestUser");
}
2266
/// After five inserts and an `fsync`, the first row is still readable.
#[test]
fn test_lscs_fsync() {
    let dir = tempdir().unwrap();
    let lscs = Lscs::new(dir.path().to_path_buf(), test_schema(), LscsConfig::default()).unwrap();

    for i in 1..=5u64 {
        let id_bytes = i.to_le_bytes();
        let name = format!("User{}", i);
        let score_bytes = ((i as f64) * 10.0).to_le_bytes();

        lscs.insert(&[
            Some(&id_bytes[..]),
            Some(name.as_bytes()),
            Some(&score_bytes[..]),
        ])
        .unwrap();
    }

    lscs.fsync().unwrap();

    assert!(lscs.get(1).unwrap().is_some());
}
2292
/// The hot threshold starts at 0.1, stores in-range values as given and
/// clamps out-of-range values to 1.0 and 0.0.
#[test]
fn test_temperature_tracker_set_threshold() {
    let cols = vec!["a".to_string(), "b".to_string(), "c".to_string()];
    let tracker = ColumnTemperatureTracker::new(&cols, 10);

    let close_to = |actual: f64, expected: f64| (actual - expected).abs() < f64::EPSILON;

    // Default before any explicit setting.
    assert!(close_to(tracker.threshold(), 0.1));

    // (requested, effective) pairs, applied in order.
    for (requested, effective) in [(0.5, 0.5), (2.0, 1.0), (-0.5, 0.0)] {
        tracker.set_hot_threshold(requested);
        assert!(close_to(tracker.threshold(), effective));
    }
}
2312
/// With a window of two updates and a 0.05 threshold, a column that receives
/// both updates is reported hot and an untouched column is reported cold.
#[test]
fn test_column_temperature_hot_cold_classification() {
    let cols = vec!["hot_col".to_string(), "cold_col".to_string()];
    let tracker = ColumnTemperatureTracker::new(&cols, 2);
    tracker.set_hot_threshold(0.05);

    // Two updates fill the window of size 2.
    for _ in 0..2 {
        tracker.record_updates(&["hot_col"]);
    }

    let hot = tracker.get_hot_columns();
    assert!(hot.contains("hot_col"), "hot_col should be classified as hot");

    let cold = tracker.get_cold_columns();
    assert!(cold.contains("cold_col"), "cold_col should be classified as cold");
}
2333}