1use parking_lot::RwLock;
65use serde::{Deserialize, Serialize};
66use sochdb_core::{Result, SochDBError};
67use std::collections::{BTreeMap, HashMap, HashSet};
68use std::path::{Path, PathBuf};
69use std::sync::Arc;
70use std::sync::atomic::{AtomicU64, Ordering};
71
72use crate::txn_wal::TxnWal;
73
74#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct ColumnTemperature {
84 pub name: String,
86 pub temperature: f64,
88 pub window_updates: u64,
90 pub total_updates: u64,
92 pub last_update_us: u64,
94}
95
96impl ColumnTemperature {
97 pub fn new(name: String) -> Self {
99 Self {
100 name,
101 temperature: 0.0,
102 window_updates: 0,
103 total_updates: 0,
104 last_update_us: 0,
105 }
106 }
107
108 pub fn record_update(&mut self) {
110 self.window_updates += 1;
111 self.total_updates += 1;
112 self.last_update_us = std::time::SystemTime::now()
113 .duration_since(std::time::UNIX_EPOCH)
114 .unwrap()
115 .as_micros() as u64;
116 }
117
118 pub fn update_ema(&mut self, total_window_updates: u64) {
123 const ALPHA: f64 = 0.1;
124
125 let temp_current = if total_window_updates > 0 {
126 self.window_updates as f64 / total_window_updates as f64
127 } else {
128 0.0
129 };
130
131 self.temperature = ALPHA * temp_current + (1.0 - ALPHA) * self.temperature;
132 self.window_updates = 0;
133 }
134
135 pub fn is_hot(&self, threshold: f64) -> bool {
137 self.temperature > threshold
138 }
139}
140
141#[derive(Debug)]
143pub struct ColumnTemperatureTracker {
144 columns: RwLock<HashMap<String, ColumnTemperature>>,
146 window_size: u64,
148 window_updates: AtomicU64,
150 hot_threshold: AtomicU64,
152}
153
154impl ColumnTemperatureTracker {
155 pub fn new(column_names: &[String], window_size: u64) -> Self {
157 let mut columns = HashMap::new();
158 for name in column_names {
159 columns.insert(name.clone(), ColumnTemperature::new(name.clone()));
160 }
161 Self {
162 columns: RwLock::new(columns),
163 window_size,
164 window_updates: AtomicU64::new(0),
165 hot_threshold: AtomicU64::new(0.1_f64.to_bits()),
166 }
167 }
168
169 fn threshold(&self) -> f64 {
171 f64::from_bits(self.hot_threshold.load(Ordering::Relaxed))
172 }
173
174 pub fn record_updates(&self, column_names: &[&str]) {
176 let mut cols = self.columns.write();
177 for name in column_names {
178 if let Some(temp) = cols.get_mut(*name) {
179 temp.record_update();
180 }
181 }
182
183 let total = self.window_updates.fetch_add(1, Ordering::SeqCst) + 1;
184
185 if total >= self.window_size {
187 self.update_all_ema(&mut cols, total);
188 self.window_updates.store(0, Ordering::SeqCst);
189 }
190 }
191
192 fn update_all_ema(&self, cols: &mut HashMap<String, ColumnTemperature>, total: u64) {
193 for temp in cols.values_mut() {
194 temp.update_ema(total);
195 }
196 }
197
198 pub fn get_hot_columns(&self) -> HashSet<String> {
200 let threshold = self.threshold();
201 let cols = self.columns.read();
202 cols.values()
203 .filter(|t| t.is_hot(threshold))
204 .map(|t| t.name.clone())
205 .collect()
206 }
207
208 pub fn get_cold_columns(&self) -> HashSet<String> {
210 let threshold = self.threshold();
211 let cols = self.columns.read();
212 cols.values()
213 .filter(|t| !t.is_hot(threshold))
214 .map(|t| t.name.clone())
215 .collect()
216 }
217
218 pub fn get_all_temperatures(&self) -> Vec<ColumnTemperature> {
220 self.columns.read().values().cloned().collect()
221 }
222
223 pub fn set_hot_threshold(&self, threshold: f64) {
228 let clamped = threshold.clamp(0.0, 1.0);
229 self.hot_threshold
230 .store(clamped.to_bits(), Ordering::Relaxed);
231 }
232}
233
234#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
244pub struct ColumnStripeRef {
245 pub level: u32,
247 pub segment_id: u64,
249 pub column_name: String,
251 pub offset: u64,
253 pub length: u64,
255 pub row_count: u64,
257 pub compression: u8,
259}
260
261impl ColumnStripeRef {
262 pub fn new(
264 level: u32,
265 segment_id: u64,
266 column_name: String,
267 offset: u64,
268 length: u64,
269 row_count: u64,
270 ) -> Self {
271 Self {
272 level,
273 segment_id,
274 column_name,
275 offset,
276 length,
277 row_count,
278 compression: 0,
279 }
280 }
281
282 pub fn relocate(&self, new_level: u32, new_segment_id: u64, new_offset: u64) -> Self {
284 Self {
285 level: new_level,
286 segment_id: new_segment_id,
287 column_name: self.column_name.clone(),
288 offset: new_offset,
289 length: self.length,
290 row_count: self.row_count,
291 compression: self.compression,
292 }
293 }
294}
295
/// Metadata for a compacted segment whose columns may live in different files.
///
/// Built by `compact_l0`: hot columns point at freshly merged stripes while cold
/// columns keep referencing their existing L0 stripes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SegmentDescriptor {
    /// Segment identifier (the compaction sequence number when built by `compact_l0`).
    pub id: u64,
    /// LSM level the segment belongs to.
    pub level: u32,
    /// Column name -> location of that column's stripe.
    pub col_refs: HashMap<String, ColumnStripeRef>,
    /// Intended lower bound of the segment's row ids.
    /// NOTE(review): `compact_l0` currently stores the placeholder `u64::MAX`.
    pub min_row_id: RowId,
    /// Intended upper bound of the segment's row ids.
    /// NOTE(review): `compact_l0` currently stores the placeholder `0`.
    pub max_row_id: RowId,
    /// Total number of rows across the merged inputs.
    pub row_count: u64,
    /// Lower bound of the segment's timestamp range.
    pub min_timestamp: u64,
    /// Upper bound of the segment's timestamp range.
    pub max_timestamp: u64,
    /// Whether this segment is a tombstone.
    /// NOTE(review): never set to `true` in the visible code.
    pub is_tombstone: bool,
}

/// Numeric column identifier.
/// NOTE(review): not referenced in the visible code.
pub type ColumnId = u32;

/// Row identifier handed out by `Lscs::insert` from a global counter.
pub type RowId = u64;
327
/// Physical type of a column; the discriminant is the on-disk type tag.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum ColumnType {
    Bool = 0,
    Int64 = 1,
    UInt64 = 2,
    Float64 = 3,
    Text = 4,
    Binary = 5,
    Timestamp = 6,
}

impl ColumnType {
    /// Width in bytes of a fixed-width type, or `None` for variable-width types.
    pub fn fixed_size(&self) -> Option<usize> {
        match self {
            ColumnType::Bool => Some(1),
            ColumnType::Int64
            | ColumnType::UInt64
            | ColumnType::Float64
            | ColumnType::Timestamp => Some(8),
            ColumnType::Text | ColumnType::Binary => None,
        }
    }

    /// Decodes an on-disk type tag; unknown tags yield `None`.
    pub fn from_byte(b: u8) -> Option<Self> {
        match b {
            0 => Some(ColumnType::Bool),
            1 => Some(ColumnType::Int64),
            2 => Some(ColumnType::UInt64),
            3 => Some(ColumnType::Float64),
            4 => Some(ColumnType::Text),
            5 => Some(ColumnType::Binary),
            6 => Some(ColumnType::Timestamp),
            _ => None,
        }
    }
}

/// Table name plus ordered column definitions.
#[derive(Debug, Clone)]
pub struct TableSchema {
    /// Table name.
    pub name: String,
    /// Columns in storage order; row values are positional against this list.
    pub columns: Vec<ColumnDef>,
}

impl TableSchema {
    /// Creates a schema from a name and column list.
    pub fn new(name: String, columns: Vec<ColumnDef>) -> Self {
        Self { name, columns }
    }

    /// Appends the non-nullable `UInt64` MVCC columns `__txn_start` and
    /// `__txn_end` (in that order) unless a column with that name already exists.
    pub fn with_mvcc(mut self) -> Self {
        for &mvcc_col in &["__txn_start", "__txn_end"] {
            if !self.columns.iter().any(|c| c.name == mvcc_col) {
                self.columns.push(ColumnDef {
                    name: mvcc_col.to_string(),
                    col_type: ColumnType::UInt64,
                    nullable: false,
                });
            }
        }
        self
    }
}

/// Definition of a single column.
#[derive(Debug, Clone)]
pub struct ColumnDef {
    /// Column name.
    pub name: String,
    /// Physical type.
    pub col_type: ColumnType,
    /// Whether the column may hold nulls.
    pub nullable: bool,
}

/// In-memory, append-only storage for one column.
///
/// Layout:
/// * `nulls` — validity bitmap, bit `i` set means row `i` has a value.
/// * fixed-width types — `data` holds exactly `width` bytes per row (nulls are
///   zero-padded), so row `i` lives at `i * width`.
/// * variable-width types — `offsets[i]..offsets[i + 1]` delimits row `i` in `data`
///   (`offsets` always has `row_count + 1` entries).
#[derive(Debug)]
struct ColumnBuffer {
    /// Physical type of the column.
    col_type: ColumnType,
    /// Packed value bytes.
    data: Vec<u8>,
    /// Validity bitmap (bit set = non-null).
    nulls: Vec<u8>,
    /// Row boundaries for variable-width types; `None` for fixed-width types.
    offsets: Option<Vec<u32>>,
    /// Number of rows appended so far.
    row_count: u64,
}

impl ColumnBuffer {
    /// Creates an empty buffer for `col_type`.
    fn new(col_type: ColumnType) -> Self {
        Self {
            col_type,
            data: Vec::new(),
            nulls: Vec::new(),
            // Variable-width columns start with the leading 0 boundary.
            offsets: if col_type.fixed_size().is_none() {
                Some(vec![0])
            } else {
                None
            },
            row_count: 0,
        }
    }

    /// Appends one row; `None` appends a null.
    ///
    /// For fixed-width columns a null still reserves `width` zero bytes, so that
    /// positional addressing (`row_idx * width`) stays valid for later rows.
    /// Non-null values are assumed to be exactly `width` bytes for fixed-width
    /// types. NOTE(review): this is not validated here.
    fn append(&mut self, value: Option<&[u8]>) {
        let bit_idx = self.row_count as usize;
        let byte_idx = bit_idx / 8;
        let bit_offset = bit_idx % 8;

        if self.nulls.len() <= byte_idx {
            self.nulls.resize(byte_idx + 1, 0);
        }

        match value {
            Some(data) => {
                self.nulls[byte_idx] |= 1 << bit_offset;
                self.data.extend_from_slice(data);
            }
            None => {
                if let Some(width) = self.col_type.fixed_size() {
                    let padded_len = self.data.len() + width;
                    self.data.resize(padded_len, 0);
                }
            }
        }

        // For variable-width columns a null adds an empty range (end == previous end).
        if let Some(offsets) = &mut self.offsets {
            offsets.push(self.data.len() as u32);
        }

        self.row_count += 1;
    }

    /// Returns `true` if row `row_idx` is null or out of range.
    fn is_null(&self, row_idx: u64) -> bool {
        if row_idx >= self.row_count {
            return true;
        }
        let byte_idx = (row_idx / 8) as usize;
        let bit_offset = (row_idx % 8) as u8;

        match self.nulls.get(byte_idx) {
            Some(&byte) => (byte & (1 << bit_offset)) == 0,
            None => true,
        }
    }

    /// Returns a copy of row `row_idx`'s bytes, or `None` for null/out-of-range rows.
    fn get(&self, row_idx: u64) -> Option<Vec<u8>> {
        if row_idx >= self.row_count || self.is_null(row_idx) {
            return None;
        }

        if let Some(fixed_size) = self.col_type.fixed_size() {
            let start = (row_idx as usize) * fixed_size;
            let end = start + fixed_size;
            if end <= self.data.len() {
                Some(self.data[start..end].to_vec())
            } else {
                None
            }
        } else if let Some(offsets) = &self.offsets {
            let start = offsets[row_idx as usize] as usize;
            let end = offsets[(row_idx + 1) as usize] as usize;
            if end <= self.data.len() {
                Some(self.data[start..end].to_vec())
            } else {
                None
            }
        } else {
            None
        }
    }

    /// Approximate heap bytes held by the value, bitmap and offset buffers.
    fn memory_bytes(&self) -> usize {
        self.data.len() + self.nulls.len() + self.offsets.as_ref().map(|o| o.len() * 4).unwrap_or(0)
    }
}
527
528#[derive(Debug)]
530pub struct ColumnarMemtable {
531 schema: TableSchema,
533 columns: Vec<RwLock<ColumnBuffer>>,
535 row_ids: RwLock<BTreeMap<RowId, u64>>,
537 row_idx_to_id: RwLock<Vec<RowId>>,
539 next_row_idx: AtomicU64,
541 bytes_written: AtomicU64,
543 size_limit: usize,
545}
546
547impl ColumnarMemtable {
548 pub fn new(schema: TableSchema, size_limit: usize) -> Self {
550 let columns = schema
551 .columns
552 .iter()
553 .map(|def| RwLock::new(ColumnBuffer::new(def.col_type)))
554 .collect();
555
556 Self {
557 schema,
558 columns,
559 row_ids: RwLock::new(BTreeMap::new()),
560 row_idx_to_id: RwLock::new(Vec::new()),
561 next_row_idx: AtomicU64::new(0),
562 bytes_written: AtomicU64::new(0),
563 size_limit,
564 }
565 }
566
567 pub fn insert(&self, row_id: RowId, values: &[Option<&[u8]>]) -> Result<()> {
571 if values.len() != self.schema.columns.len() {
572 return Err(SochDBError::InvalidData(format!(
573 "Expected {} columns, got {}",
574 self.schema.columns.len(),
575 values.len()
576 )));
577 }
578
579 let row_idx = self.next_row_idx.fetch_add(1, Ordering::SeqCst);
580
581 let mut bytes = 0usize;
583 for (i, value) in values.iter().enumerate() {
584 let mut col = self.columns[i].write();
585 if let Some(data) = value {
586 bytes += data.len();
587 }
588 col.append(*value);
589 }
590
591 {
593 let mut ids = self.row_ids.write();
594 ids.insert(row_id, row_idx);
595 }
596 {
597 let mut idx_to_id = self.row_idx_to_id.write();
598 while idx_to_id.len() <= row_idx as usize {
600 idx_to_id.push(0); }
602 idx_to_id[row_idx as usize] = row_id;
603 }
604
605 self.bytes_written
606 .fetch_add(bytes as u64, Ordering::Relaxed);
607
608 Ok(())
609 }
610
611 pub fn get(&self, row_id: RowId) -> Option<Vec<Option<Vec<u8>>>> {
614 let row_ids = self.row_ids.read();
616 let row_idx = *row_ids.get(&row_id)?;
617 drop(row_ids);
618
619 let mut values = Vec::with_capacity(self.columns.len());
621 for col in &self.columns {
622 let col_buf = col.read();
623 values.push(col_buf.get(row_idx));
624 }
625
626 Some(values)
627 }
628
629 pub fn get_columns(
631 &self,
632 row_id: RowId,
633 col_indices: &[usize],
634 ) -> Option<Vec<Option<Vec<u8>>>> {
635 let row_ids = self.row_ids.read();
637 let row_idx = *row_ids.get(&row_id)?;
638 drop(row_ids);
639
640 let mut values = Vec::with_capacity(col_indices.len());
642 for &col_idx in col_indices {
643 if col_idx < self.columns.len() {
644 let col_buf = self.columns[col_idx].read();
645 values.push(col_buf.get(row_idx));
646 } else {
647 values.push(None);
648 }
649 }
650
651 Some(values)
652 }
653
654 pub fn scan_range(&self, start: RowId, end: RowId) -> Vec<(RowId, Vec<Option<Vec<u8>>>)> {
656 let row_ids = self.row_ids.read();
657 let mut results = Vec::new();
658
659 for (&row_id, &row_idx) in row_ids.range(start..=end) {
660 let mut values = Vec::with_capacity(self.columns.len());
661 for col in &self.columns {
662 let col_buf = col.read();
663 values.push(col_buf.get(row_idx));
664 }
665 results.push((row_id, values));
666 }
667
668 results
669 }
670
671 pub fn is_full(&self) -> bool {
673 self.bytes_written.load(Ordering::Relaxed) as usize >= self.size_limit
674 }
675
676 pub fn row_count(&self) -> u64 {
678 self.next_row_idx.load(Ordering::SeqCst)
679 }
680
681 pub fn memory_bytes(&self) -> usize {
683 self.columns.iter().map(|c| c.read().memory_bytes()).sum()
684 }
685
686 pub fn schema(&self) -> &TableSchema {
688 &self.schema
689 }
690}
691
692use sochdb_core::learned_index::LearnedSparseIndex;
693
/// Location of one column stripe inside an SSTable file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnIndex {
    /// Byte offset of the stripe from the start of the file.
    pub offset: u64,
    /// Stripe length in bytes.
    pub length: u64,
    /// Compression tag; `ColumnGroup::from_memtable` always writes `0`.
    pub compression: u8,
}

/// Handle to one immutable on-disk SSTable holding every column of a flushed memtable.
#[derive(Debug)]
// NOTE(review): presumably silences warnings for fields not read in this file (e.g. timestamps).
#[allow(dead_code)]
pub struct ColumnGroup {
    /// Path of the `.sst` file.
    path: PathBuf,
    /// Schema of the rows stored in the file.
    schema: TableSchema,
    /// LSM level of this group.
    level: u32,
    /// Sequence number used in the file name.
    sequence: u64,
    /// Row count as known to this handle (how it is populated depends on the constructor).
    row_count: u64,
    /// Lower bound of the `__txn_start` values seen when the file was written.
    min_timestamp: u64,
    /// Upper bound of the `__txn_start` values seen when the file was written.
    max_timestamp: u64,
    /// Column name -> stripe location inside the file.
    column_offsets: BTreeMap<String, ColumnIndex>,
    /// Learned index over the file's row ids.
    lsi: Option<LearnedSparseIndex>,
}
728
729impl ColumnGroup {
730 const MAGIC: [u8; 4] = [b'T', b'O', b'O', b'N'];
732 const VERSION: u32 = 1;
733
734 #[allow(clippy::too_many_arguments)]
736 pub fn new(
737 path: PathBuf,
738 schema: TableSchema,
739 level: u32,
740 sequence: u64,
741 row_count: u64,
742 min_timestamp: u64,
743 max_timestamp: u64,
744 column_offsets: BTreeMap<String, ColumnIndex>,
745 lsi: Option<LearnedSparseIndex>,
746 ) -> Self {
747 Self {
748 path,
749 schema,
750 level,
751 sequence,
752 row_count,
753 min_timestamp,
754 max_timestamp,
755 column_offsets,
756 lsi,
757 }
758 }
759
760 pub fn from_memtable(
762 base_path: &Path,
763 memtable: &ColumnarMemtable,
764 level: u32,
765 sequence: u64,
766 ) -> Result<Self> {
767 use byteorder::{LittleEndian, WriteBytesExt};
768 use std::fs::File;
769 use std::io::{BufWriter, Seek, Write};
770
771 let file_name = format!("L{}_seq{}.sst", level, sequence);
773 let file_path = base_path.join(&file_name);
774 let file = File::create(&file_path)?;
775 let mut writer = BufWriter::new(file);
776
777 writer.write_all(&Self::MAGIC)?;
779 writer.write_u32::<LittleEndian>(Self::VERSION)?;
780
781 let mut column_offsets = BTreeMap::new();
782 let mut min_ts = u64::MAX;
783 let mut max_ts = 0u64;
784
785 for (i, col_lock) in memtable.columns.iter().enumerate() {
787 let col = col_lock.read();
788 let col_def = &memtable.schema.columns[i];
789
790 if col_def.name == "__txn_start" && col.col_type == ColumnType::UInt64 {
792 let mut offset = 0;
794 let row_count = col.row_count as usize;
795 for row_idx in 0..row_count {
796 let byte_idx = row_idx / 8;
798 let bit_idx = row_idx % 8;
799 let is_null =
800 byte_idx < col.nulls.len() && (col.nulls[byte_idx] & (1 << bit_idx)) != 0;
801
802 if !is_null && offset + 8 <= col.data.len() {
803 let ts = u64::from_le_bytes(
804 col.data[offset..offset + 8].try_into().unwrap_or([0u8; 8]),
805 );
806 min_ts = min_ts.min(ts);
807 max_ts = max_ts.max(ts);
808 }
809 offset += 8;
810 }
811 }
812
813 let start_offset = writer.stream_position()?;
814
815 writer.write_u8(col.col_type as u8)?;
817 writer.write_u64::<LittleEndian>(col.row_count)?;
818
819 writer.write_u32::<LittleEndian>(col.nulls.len() as u32)?;
821 writer.write_all(&col.nulls)?;
822
823 if let Some(offsets) = &col.offsets {
825 writer.write_u32::<LittleEndian>(offsets.len() as u32)?;
826 for &off in offsets {
827 writer.write_u32::<LittleEndian>(off)?;
828 }
829 }
830
831 writer.write_u32::<LittleEndian>(col.data.len() as u32)?;
833 writer.write_all(&col.data)?;
834
835 let end_offset = writer.stream_position()?;
836
837 column_offsets.insert(
838 col_def.name.clone(),
839 ColumnIndex {
840 offset: start_offset,
841 length: end_offset - start_offset,
842 compression: 0, },
844 );
845 }
846
847 let row_ids = memtable.row_ids.read();
849 let keys: Vec<u64> = row_ids.keys().cloned().collect();
850 let lsi = LearnedSparseIndex::build(&keys);
851
852 let footer_start = writer.stream_position()?;
854
855 let offsets_bytes = bincode::serialize(&column_offsets)
857 .map_err(|e| SochDBError::Serialization(e.to_string()))?;
858 writer.write_u64::<LittleEndian>(offsets_bytes.len() as u64)?;
859 writer.write_all(&offsets_bytes)?;
860
861 let lsi_bytes =
863 bincode::serialize(&lsi).map_err(|e| SochDBError::Serialization(e.to_string()))?;
864 writer.write_u64::<LittleEndian>(lsi_bytes.len() as u64)?;
865 writer.write_all(&lsi_bytes)?;
866
867 writer.write_u64::<LittleEndian>(footer_start)?;
869 writer.write_all(&Self::MAGIC)?;
870
871 writer.flush()?;
872
873 if min_ts == u64::MAX || max_ts == 0 {
875 let now = std::time::SystemTime::now()
876 .duration_since(std::time::UNIX_EPOCH)
877 .unwrap()
878 .as_micros() as u64;
879 min_ts = now;
880 max_ts = now;
881 }
882
883 Ok(Self {
884 path: file_path,
885 schema: memtable.schema.clone(),
886 level,
887 sequence,
888 row_count: memtable.row_count(),
889 min_timestamp: min_ts,
890 max_timestamp: max_ts,
891 column_offsets,
892 lsi: Some(lsi),
893 })
894 }
895
896 pub fn open(path: PathBuf, schema: TableSchema, level: u32, sequence: u64) -> Result<Self> {
898 use byteorder::{LittleEndian, ReadBytesExt};
899 use std::fs::File;
900 use std::io::{Read, Seek, SeekFrom};
901
902 let mut file = File::open(&path)?;
903 let file_len = file.metadata()?.len();
904
905 if file_len < 12 {
906 return Err(SochDBError::Corruption("File too short".to_string()));
908 }
909
910 file.seek(SeekFrom::End(-12))?;
912 let footer_offset = file.read_u64::<LittleEndian>()?;
913 let mut magic = [0u8; 4];
914 file.read_exact(&mut magic)?;
915
916 if magic != Self::MAGIC {
917 return Err(SochDBError::Corruption("Invalid magic bytes".to_string()));
918 }
919
920 file.seek(SeekFrom::Start(footer_offset))?;
922
923 let offsets_len = file.read_u64::<LittleEndian>()?;
925 let mut offsets_bytes = vec![0u8; offsets_len as usize];
926 file.read_exact(&mut offsets_bytes)?;
927 let column_offsets: BTreeMap<String, ColumnIndex> = bincode::deserialize(&offsets_bytes)
928 .map_err(|e| SochDBError::Serialization(e.to_string()))?;
929
930 let lsi_len = file.read_u64::<LittleEndian>()?;
932 let mut lsi_bytes = vec![0u8; lsi_len as usize];
933 file.read_exact(&mut lsi_bytes)?;
934 let lsi: LearnedSparseIndex = bincode::deserialize(&lsi_bytes)
935 .map_err(|e| SochDBError::Serialization(e.to_string()))?;
936
937 Ok(Self {
938 path,
939 schema,
940 level,
941 sequence,
942 row_count: 0, min_timestamp: 0,
944 max_timestamp: 0,
945 column_offsets,
946 lsi: Some(lsi),
947 })
948 }
949
950 pub fn file_path(&self) -> &Path {
952 &self.path
953 }
954
955 pub fn column_index(&self, col_name: &str) -> Option<&ColumnIndex> {
957 self.column_offsets.get(col_name)
958 }
959
960 pub fn level(&self) -> u32 {
962 self.level
963 }
964
965 pub fn row_count(&self) -> u64 {
967 self.row_count
968 }
969}
970
/// Cumulative counters describing compaction work.
#[derive(Debug, Clone, Default)]
pub struct CompactionStats {
    /// Compactions of any kind.
    pub compactions_total: u64,
    /// Level-0 compactions.
    pub l0_compactions: u64,
    /// Bytes read (or estimated read) by compactions.
    pub bytes_read: u64,
    /// Bytes written by compactions.
    pub bytes_written: u64,
    /// Compactions that rewrote hot columns.
    pub hot_column_compactions: u64,
    /// Cold-column stripes kept by reference instead of rewritten.
    pub cold_column_refs_preserved: u64,
    /// Estimated write-amplification reduction factor from hot/cold splitting.
    pub estimated_wa_reduction: f64,
    /// Duration of the last compaction, in microseconds.
    pub last_compaction_duration_us: u64,
}

impl CompactionStats {
    /// Ratio of bytes written to bytes read; `1.0` before anything has been read.
    pub fn write_amplification(&self) -> f64 {
        match self.bytes_read {
            0 => 1.0,
            read => self.bytes_written as f64 / read as f64,
        }
    }
}
1006
/// Summary returned by `Lscs::recover` after WAL replay.
#[derive(Debug, Clone, Default)]
pub struct LscsRecoveryStats {
    /// Transaction count reported by the WAL replay.
    pub transactions_recovered: usize,
    /// Rows successfully re-inserted into the active memtable.
    pub rows_recovered: usize,
    /// Highest row id seen in the replayed keys (0 if none).
    pub max_row_id: u64,
}
1017
/// Tuning parameters for [`Lscs`].
#[derive(Debug, Clone)]
pub struct LscsConfig {
    /// Payload-byte budget of a memtable before it is rotated.
    pub memtable_size: usize,
    /// Number of LSM levels.
    pub num_levels: usize,
    /// Size ratio between consecutive levels.
    pub level_ratio: usize,
    /// Number of L0 column groups that triggers an L0 compaction.
    pub l0_compaction_threshold: usize,
    /// Temperature above which a column counts as hot.
    pub hot_threshold: f64,
    /// Number of update events per temperature window.
    pub temperature_window_size: u64,
}

impl Default for LscsConfig {
    /// 64 MiB memtables, 7 levels with a 10x ratio, compaction at 4 L0 groups,
    /// hot threshold 0.1 and 1000-event temperature windows.
    fn default() -> Self {
        const MIB: usize = 1 << 20;
        LscsConfig {
            num_levels: 7,
            level_ratio: 10,
            l0_compaction_threshold: 4,
            temperature_window_size: 1_000,
            hot_threshold: 0.1,
            memtable_size: 64 * MIB,
        }
    }
}
1047
/// Log-structured column store: a WAL, an active columnar memtable, rotated
/// memtables awaiting flush, and leveled on-disk column groups (SSTables).
pub struct Lscs {
    /// Tuning parameters.
    config: LscsConfig,
    /// Directory holding the WAL, SSTables and the clean-shutdown marker.
    path: PathBuf,
    /// Schema shared by every memtable and column group.
    schema: TableSchema,
    /// Write-ahead log; inserts and deletes are committed here before the memtable is touched.
    wal: Arc<TxnWal>,
    /// Memtable receiving new writes.
    active_memtable: RwLock<ColumnarMemtable>,
    /// Rotated memtables, oldest first, waiting to be flushed to level 0.
    immutable_memtables: RwLock<Vec<ColumnarMemtable>>,
    /// Column groups per level (`column_groups[0]` is L0).
    column_groups: RwLock<Vec<Vec<ColumnGroup>>>,
    /// Descriptors of compacted segments, keyed by segment id.
    segment_descriptors: RwLock<HashMap<u64, SegmentDescriptor>>,
    /// Per-column update-frequency tracker used to split hot and cold columns in compaction.
    temperature_tracker: Arc<ColumnTemperatureTracker>,
    /// Next SSTable / segment sequence number.
    next_sequence: AtomicU64,
    /// Next row id to hand out (starts at 1).
    next_row_id: AtomicU64,
    /// Cumulative compaction statistics.
    compaction_stats: RwLock<CompactionStats>,
}
1075
1076impl Lscs {
1077 pub fn new(path: PathBuf, schema: TableSchema, config: LscsConfig) -> Result<Self> {
1079 std::fs::create_dir_all(&path)?;
1080
1081 let wal_path = path.join("wal.log");
1082 let wal = Arc::new(TxnWal::new(&wal_path)?);
1083
1084 let active_memtable = ColumnarMemtable::new(schema.clone(), config.memtable_size);
1085
1086 let mut column_groups = Vec::with_capacity(config.num_levels);
1087 for _ in 0..config.num_levels {
1088 column_groups.push(Vec::new());
1089 }
1090
1091 let column_names: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
1093 let temperature_tracker = Arc::new(ColumnTemperatureTracker::new(
1094 &column_names,
1095 config.temperature_window_size,
1096 ));
1097
1098 Ok(Self {
1099 config,
1100 path,
1101 schema,
1102 wal,
1103 active_memtable: RwLock::new(active_memtable),
1104 immutable_memtables: RwLock::new(Vec::new()),
1105 column_groups: RwLock::new(column_groups),
1106 segment_descriptors: RwLock::new(HashMap::new()),
1107 temperature_tracker,
1108 next_sequence: AtomicU64::new(0),
1109 next_row_id: AtomicU64::new(1),
1110 compaction_stats: RwLock::new(CompactionStats::default()),
1111 })
1112 }
1113
1114 pub fn open(path: PathBuf, schema: TableSchema, config: LscsConfig) -> Result<Self> {
1127 let lscs = Self::new(path, schema, config)?;
1128 let stats = lscs.recover()?;
1129
1130 if stats.rows_recovered > 0 {
1131 eprintln!(
1132 "LSCS Recovery: restored {} rows from {} transactions",
1133 stats.rows_recovered, stats.transactions_recovered
1134 );
1135 }
1136
1137 Ok(lscs)
1138 }
1139
1140 pub fn recover(&self) -> Result<LscsRecoveryStats> {
1144 let (writes, txn_count) = self.wal.replay_for_recovery()?;
1145
1146 if writes.is_empty() {
1147 return Ok(LscsRecoveryStats::default());
1148 }
1149
1150 let mut max_row_id: u64 = 0;
1151 let mut rows_recovered = 0usize;
1152
1153 for (key, value) in &writes {
1155 if key.len() >= 8 {
1157 let row_id = u64::from_le_bytes(key[..8].try_into().unwrap_or([0; 8]));
1158 if row_id > max_row_id {
1159 max_row_id = row_id;
1160 }
1161
1162 if let Ok(row_values) = Self::deserialize_row(value) {
1164 let value_refs: Vec<Option<&[u8]>> =
1165 row_values.iter().map(|v| v.as_deref()).collect();
1166
1167 let memtable = self.active_memtable.read();
1168 if memtable.insert(row_id, &value_refs).is_ok() {
1169 rows_recovered += 1;
1170 }
1171 }
1172 }
1173 }
1174
1175 if max_row_id > 0 {
1177 self.next_row_id.store(max_row_id + 1, Ordering::SeqCst);
1178 }
1179
1180 Ok(LscsRecoveryStats {
1181 transactions_recovered: txn_count,
1182 rows_recovered,
1183 max_row_id,
1184 })
1185 }
1186
1187 pub fn mark_clean_shutdown(&self) -> Result<()> {
1192 self.fsync()?;
1194
1195 let marker_path = self.path.join(".clean_shutdown");
1197 std::fs::write(&marker_path, b"clean")?;
1198
1199 Ok(())
1203 }
1204
1205 pub fn insert(&self, values: &[Option<&[u8]>]) -> Result<RowId> {
1207 let row_id = self.next_row_id.fetch_add(1, Ordering::SeqCst);
1208
1209 let txn_id = self.wal.begin_transaction()?;
1211
1212 let key = row_id.to_le_bytes().to_vec();
1214 let value = self.serialize_row(values)?;
1215 self.wal.write(txn_id, key, value)?;
1216 self.wal.commit_transaction(txn_id)?;
1217
1218 let memtable = self.active_memtable.read();
1220 memtable.insert(row_id, values)?;
1221
1222 if memtable.is_full() {
1224 drop(memtable);
1225 self.rotate_memtable()?;
1226 }
1227
1228 Ok(row_id)
1229 }
1230
1231 pub fn mark_deleted(&self, row_id: RowId, _caller_txn_id: u64, txn_end: u64) -> Result<()> {
1245 let txn_end_idx = self
1247 .schema
1248 .columns
1249 .iter()
1250 .position(|c| c.name == "__txn_end")
1251 .ok_or_else(|| {
1252 SochDBError::InvalidData("Schema missing __txn_end column for MVCC".to_string())
1253 })?;
1254
1255 let current = self
1257 .get(row_id)?
1258 .ok_or_else(|| SochDBError::NotFound(format!("Row {} not found", row_id)))?;
1259
1260 let mut new_values: Vec<Option<Vec<u8>>> = current;
1262 new_values[txn_end_idx] = Some(txn_end.to_le_bytes().to_vec());
1263
1264 let value_refs: Vec<Option<&[u8]>> = new_values.iter().map(|v| v.as_deref()).collect();
1266
1267 let wal_txn_id = self.wal.begin_transaction()?;
1270
1271 let row_data = self.serialize_row(&value_refs)?;
1273 self.wal
1274 .write(wal_txn_id, row_id.to_le_bytes().to_vec(), row_data)?;
1275
1276 self.wal.commit_transaction(wal_txn_id)?;
1278
1279 let memtable = self.active_memtable.read();
1281 memtable.insert(row_id, &value_refs)?;
1282
1283 Ok(())
1284 }
1285
1286 fn serialize_row(&self, values: &[Option<&[u8]>]) -> Result<Vec<u8>> {
1288 use byteorder::{LittleEndian, WriteBytesExt};
1289
1290 let mut buf = Vec::new();
1291 buf.write_u32::<LittleEndian>(values.len() as u32)?;
1292
1293 for value in values {
1294 match value {
1295 Some(data) => {
1296 buf.write_u8(1)?; buf.write_u32::<LittleEndian>(data.len() as u32)?;
1298 buf.extend_from_slice(data);
1299 }
1300 None => {
1301 buf.write_u8(0)?; }
1303 }
1304 }
1305
1306 Ok(buf)
1307 }
1308
1309 #[allow(dead_code)]
1311 fn deserialize_row(data: &[u8]) -> Result<Vec<Option<Vec<u8>>>> {
1312 use byteorder::{LittleEndian, ReadBytesExt};
1313 use std::io::Cursor;
1314
1315 let mut cursor = Cursor::new(data);
1316 let num_cols = cursor.read_u32::<LittleEndian>()? as usize;
1317 let mut values = Vec::with_capacity(num_cols);
1318
1319 for _ in 0..num_cols {
1320 let is_non_null = cursor.read_u8()? == 1;
1321 if is_non_null {
1322 let len = cursor.read_u32::<LittleEndian>()? as usize;
1323 let pos = cursor.position() as usize;
1324 let value = data[pos..pos + len].to_vec();
1325 cursor.set_position((pos + len) as u64);
1326 values.push(Some(value));
1327 } else {
1328 values.push(None);
1329 }
1330 }
1331
1332 Ok(values)
1333 }
1334
1335 pub fn get(&self, row_id: RowId) -> Result<Option<Vec<Option<Vec<u8>>>>> {
1340 {
1342 let memtable = self.active_memtable.read();
1343 if let Some(values) = memtable.get(row_id) {
1344 return Ok(Some(values));
1345 }
1346 }
1347
1348 {
1350 let immutable = self.immutable_memtables.read();
1351 for memtable in immutable.iter().rev() {
1352 if let Some(values) = memtable.get(row_id) {
1353 return Ok(Some(values));
1354 }
1355 }
1356 }
1357
1358 {
1361 use sochdb_core::learned_index::LookupResult;
1362 let groups = self.column_groups.read();
1363 for level in &*groups {
1364 for group in level.iter().rev() {
1365 if let Some(lsi) = &group.lsi {
1366 let lookup = lsi.lookup(row_id);
1368 match lookup {
1369 LookupResult::Exact(_) | LookupResult::Range { .. } => {
1370 if let Some(row) = self.read_row_from_sstable(group, row_id)? {
1372 return Ok(Some(row));
1373 }
1374 }
1375 LookupResult::NotFound => {
1376 continue;
1378 }
1379 }
1380 }
1381 }
1382 }
1383 }
1384
1385 Ok(None)
1386 }
1387
1388 fn read_row_from_sstable(
1390 &self,
1391 group: &ColumnGroup,
1392 row_id: RowId,
1393 ) -> Result<Option<Vec<Option<Vec<u8>>>>> {
1394 use byteorder::{LittleEndian, ReadBytesExt};
1395 use std::fs::File;
1396 use std::io::{BufReader, Read, Seek, SeekFrom};
1397
1398 let file = File::open(group.file_path())?;
1399 let mut reader = BufReader::new(file);
1400
1401 let mut values = Vec::new();
1402
1403 for (col_name, col_idx) in &group.column_offsets {
1405 reader.seek(SeekFrom::Start(col_idx.offset))?;
1406
1407 let col_type = reader.read_u8()?;
1409 let row_count = reader.read_u64::<LittleEndian>()?;
1410
1411 if row_id >= row_count {
1412 values.push(None);
1413 continue;
1414 }
1415
1416 let nulls_len = reader.read_u32::<LittleEndian>()? as usize;
1418 let mut nulls = vec![0u8; nulls_len];
1419 reader.read_exact(&mut nulls)?;
1420
1421 let byte_idx = (row_id / 8) as usize;
1423 let bit_offset = (row_id % 8) as u8;
1424 let is_null = byte_idx >= nulls.len() || (nulls[byte_idx] & (1 << bit_offset)) == 0;
1425
1426 if is_null {
1427 values.push(None);
1428 continue;
1429 }
1430
1431 let col_type = ColumnType::from_byte(col_type).unwrap_or(ColumnType::Binary);
1433 if let Some(fixed_size) = col_type.fixed_size() {
1434 let offsets_section = reader.stream_position()?;
1436 let data_len = reader.read_u32::<LittleEndian>()? as usize;
1437 let _ = data_len;
1438
1439 let row_offset = (row_id as usize) * fixed_size;
1441 reader.seek(SeekFrom::Start(offsets_section + 4 + row_offset as u64))?;
1442
1443 let mut value = vec![0u8; fixed_size];
1444 reader.read_exact(&mut value)?;
1445 values.push(Some(value));
1446 } else {
1447 let offsets_count = reader.read_u32::<LittleEndian>()? as usize;
1449 let mut offsets = vec![0u32; offsets_count];
1450 for offset in offsets.iter_mut().take(offsets_count) {
1451 *offset = reader.read_u32::<LittleEndian>()?;
1452 }
1453
1454 if (row_id as usize + 1) >= offsets.len() {
1455 values.push(None);
1456 continue;
1457 }
1458
1459 let start = offsets[row_id as usize] as usize;
1460 let end = offsets[(row_id + 1) as usize] as usize;
1461
1462 let data_len = reader.read_u32::<LittleEndian>()? as usize;
1464 let data_start = reader.stream_position()?;
1465
1466 if end <= data_len {
1467 reader.seek(SeekFrom::Start(data_start + start as u64))?;
1468 let mut value = vec![0u8; end - start];
1469 reader.read_exact(&mut value)?;
1470 values.push(Some(value));
1471 } else {
1472 values.push(None);
1473 }
1474 }
1475 let _ = col_name; }
1477
1478 if values.is_empty() {
1479 Ok(None)
1480 } else {
1481 Ok(Some(values))
1482 }
1483 }
1484
1485 pub fn fsync(&self) -> Result<()> {
1493 self.wal.sync()?;
1495
1496 let memtable = self.active_memtable.read();
1498 let should_flush = memtable.memory_bytes() > self.config.memtable_size / 2;
1499 drop(memtable);
1500
1501 if should_flush {
1502 self.rotate_memtable()?;
1504 self.flush()?;
1505 }
1506
1507 Ok(())
1508 }
1509
1510 fn rotate_memtable(&self) -> Result<()> {
1512 let new_memtable = ColumnarMemtable::new(self.schema.clone(), self.config.memtable_size);
1513
1514 let old_memtable = {
1515 let mut active = self.active_memtable.write();
1516 std::mem::replace(&mut *active, new_memtable)
1517 };
1518
1519 let mut immutable = self.immutable_memtables.write();
1520 immutable.push(old_memtable);
1521
1522 if immutable.len() >= 2 {
1524 drop(immutable); self.flush()?;
1527 }
1528
1529 Ok(())
1530 }
1531
1532 pub fn flush(&self) -> Result<()> {
1534 let memtables = {
1535 let mut immutable = self.immutable_memtables.write();
1536 std::mem::take(&mut *immutable)
1537 };
1538
1539 for memtable in memtables {
1540 let sequence = self.next_sequence.fetch_add(1, Ordering::SeqCst);
1541 let column_group = ColumnGroup::from_memtable(&self.path, &memtable, 0, sequence)?;
1542
1543 let mut groups = self.column_groups.write();
1544 groups[0].push(column_group);
1545 }
1546
1547 let groups = self.column_groups.read();
1549 if groups[0].len() >= self.config.l0_compaction_threshold {
1550 drop(groups);
1551 self.compact_l0()?;
1552 }
1553
1554 Ok(())
1555 }
1556
1557 fn compact_l0(&self) -> Result<()> {
1569 let start_time = std::time::Instant::now();
1570
1571 let hot_columns = self.temperature_tracker.get_hot_columns();
1573 let cold_columns = self.temperature_tracker.get_cold_columns();
1574
1575 let total_columns = self.schema.columns.len();
1576 let hot_fraction = if total_columns > 0 {
1577 hot_columns.len() as f64 / total_columns as f64
1578 } else {
1579 1.0
1580 };
1581
1582 let l0_segments: Vec<ColumnGroup> = {
1584 let mut groups = self.column_groups.write();
1585 std::mem::take(&mut groups[0])
1586 };
1587
1588 if l0_segments.is_empty() {
1589 return Ok(());
1590 }
1591
1592 let mut bytes_read = 0u64;
1593 let mut bytes_written = 0u64;
1594 let mut cold_refs_preserved = 0u64;
1595
1596 let sequence = self.next_sequence.fetch_add(1, Ordering::SeqCst);
1597
1598 let merged_path = self.path.join(format!("L1_seq{}.sst", sequence));
1601 let segment_refs: Vec<&ColumnGroup> = l0_segments.iter().collect();
1602
1603 let hot_col_refs = if !hot_columns.is_empty() {
1604 let refs =
1605 self.selective_merge_hot_columns(&segment_refs, &hot_columns, &merged_path)?;
1606 for (_col, stripe) in &refs {
1608 bytes_written += stripe.length;
1609 }
1610 refs
1611 } else {
1612 HashMap::new()
1613 };
1614
1615 let mut col_refs = hot_col_refs;
1618 let mut total_row_count = 0u64;
1619 let min_row_id = u64::MAX;
1620 let max_row_id = 0u64;
1621
1622 for segment in &l0_segments {
1623 bytes_read += segment.row_count * 100; total_row_count += segment.row_count;
1625
1626 for col_name in &hot_columns {
1628 if let Some(col_idx) = segment.column_offsets.get(col_name) {
1629 bytes_read += col_idx.length;
1630 }
1631 }
1632
1633 for col_name in &cold_columns {
1635 if let Some(col_idx) = segment.column_offsets.get(col_name) {
1636 let stripe_ref = ColumnStripeRef::new(
1637 segment.level,
1638 segment.sequence,
1639 col_name.clone(),
1640 col_idx.offset,
1641 col_idx.length,
1642 segment.row_count,
1643 );
1644 col_refs.insert(col_name.clone(), stripe_ref);
1645 cold_refs_preserved += 1;
1646 }
1647 }
1648 }
1649
1650 let segment_desc = SegmentDescriptor {
1652 id: sequence,
1653 level: 1,
1654 col_refs,
1655 min_row_id,
1656 max_row_id,
1657 row_count: total_row_count,
1658 min_timestamp: 0,
1659 max_timestamp: std::time::SystemTime::now()
1660 .duration_since(std::time::UNIX_EPOCH)
1661 .unwrap()
1662 .as_micros() as u64,
1663 is_tombstone: false,
1664 };
1665
1666 {
1668 let mut descriptors = self.segment_descriptors.write();
1669 descriptors.insert(sequence, segment_desc);
1670 }
1671
1672 {
1674 let mut stats = self.compaction_stats.write();
1675 stats.compactions_total += 1;
1676 stats.l0_compactions += 1;
1677 stats.bytes_read += bytes_read;
1678 stats.bytes_written += bytes_written;
1679 stats.cold_column_refs_preserved += cold_refs_preserved;
1680 stats.hot_column_compactions += 1;
1681 stats.estimated_wa_reduction = 1.0 / hot_fraction.max(0.01);
1682 stats.last_compaction_duration_us = start_time.elapsed().as_micros() as u64;
1683 }
1684
1685 for segment in l0_segments {
1688 let _ = segment; }
1690
1691 Ok(())
1692 }
1693
1694 fn selective_merge_hot_columns(
1706 &self,
1707 segments: &[&ColumnGroup],
1708 hot_columns: &HashSet<String>,
1709 output_path: &Path,
1710 ) -> Result<HashMap<String, ColumnStripeRef>> {
1711 use byteorder::{LittleEndian, WriteBytesExt};
1712 use std::fs::File;
1713 use std::io::{BufWriter, Seek, Write};
1714
1715 let mut result = HashMap::new();
1716
1717 let file = File::create(output_path)?;
1719 let mut writer = BufWriter::new(file);
1720
1721 writer.write_all(&ColumnGroup::MAGIC)?;
1723 writer.write_u32::<LittleEndian>(ColumnGroup::VERSION)?;
1724
1725 let sequence = self.next_sequence.load(Ordering::SeqCst);
1726
1727 for col_name in hot_columns {
1729 let start_offset = writer.stream_position()?;
1730
1731 let mut merged_data = Vec::new();
1733 let mut row_count = 0u64;
1734
1735 for segment in segments {
1736 if let Some(_col_idx) = segment.column_offsets.get(col_name) {
1737 merged_data.extend_from_slice(&[0u8; 0]); row_count += segment.row_count;
1741 }
1742 }
1743
1744 writer.write_u64::<LittleEndian>(row_count)?;
1747 writer.write_all(&merged_data)?;
1748
1749 let end_offset = writer.stream_position()?;
1750
1751 let stripe_ref = ColumnStripeRef::new(
1753 1, sequence,
1755 col_name.clone(),
1756 start_offset,
1757 end_offset - start_offset,
1758 row_count,
1759 );
1760 result.insert(col_name.clone(), stripe_ref);
1761 }
1762
1763 writer.flush()?;
1764 Ok(result)
1765 }
1766
1767 pub fn scan_columns(
1772 &self,
1773 column_names: &[&str],
1774 row_range: Option<(RowId, RowId)>,
1775 ) -> Result<Vec<Vec<u8>>> {
1776 let mut results = Vec::new();
1777
1778 let descriptors = self.segment_descriptors.read();
1780
1781 for (_seg_id, descriptor) in descriptors.iter() {
1782 if let Some((min, max)) = row_range
1784 && (descriptor.max_row_id < min || descriptor.min_row_id > max)
1785 {
1786 continue;
1787 }
1788
1789 for col_name in column_names {
1791 if let Some(stripe_ref) = descriptor.col_refs.get(*col_name) {
1792 let data = self.read_column_stripe(stripe_ref)?;
1794 results.push(data);
1795 }
1796 }
1797 }
1798
1799 Ok(results)
1800 }
1801
1802 fn read_column_stripe(&self, stripe_ref: &ColumnStripeRef) -> Result<Vec<u8>> {
1804 use std::fs::File;
1805 use std::io::{Read, Seek, SeekFrom};
1806
1807 let file_path = self.path.join(format!(
1809 "L{}_seq{}.sst",
1810 stripe_ref.level, stripe_ref.segment_id
1811 ));
1812
1813 let mut file = File::open(&file_path)?;
1814 file.seek(SeekFrom::Start(stripe_ref.offset))?;
1815
1816 let mut data = vec![0u8; stripe_ref.length as usize];
1817 file.read_exact(&mut data)?;
1818
1819 Ok(data)
1820 }
1821
1822 pub fn compaction_stats(&self) -> CompactionStats {
1824 self.compaction_stats.read().clone()
1825 }
1826
1827 pub fn compact(&self) -> Result<()> {
1831 self.compact_l0()
1832 }
1833
1834 pub fn column_temperatures(&self) -> Vec<ColumnTemperature> {
1836 self.temperature_tracker.get_all_temperatures()
1837 }
1838
1839 #[allow(clippy::type_complexity)]
1844 pub fn scan_range(
1845 &self,
1846 start: RowId,
1847 end: RowId,
1848 ) -> Result<Vec<(RowId, Vec<Option<Vec<u8>>>)>> {
1849 let mut results = Vec::new();
1850 let mut seen = std::collections::HashSet::new();
1851
1852 {
1854 let memtable = self.active_memtable.read();
1855 for (row_id, values) in memtable.scan_range(start, end) {
1856 if seen.insert(row_id) {
1857 results.push((row_id, values));
1858 }
1859 }
1860 }
1861
1862 {
1864 let immutable = self.immutable_memtables.read();
1865 for memtable in immutable.iter().rev() {
1866 for (row_id, values) in memtable.scan_range(start, end) {
1867 if seen.insert(row_id) {
1868 results.push((row_id, values));
1869 }
1870 }
1871 }
1872 }
1873
1874 results.sort_by_key(|(id, _)| *id);
1879
1880 Ok(results)
1881 }
1882
1883 #[allow(clippy::type_complexity)]
1887 pub fn scan_columns_range(
1888 &self,
1889 start: RowId,
1890 end: RowId,
1891 col_indices: &[usize],
1892 ) -> Result<Vec<(RowId, Vec<Option<Vec<u8>>>)>> {
1893 let mut results = Vec::new();
1894 let mut seen = std::collections::HashSet::new();
1895
1896 {
1898 let memtable = self.active_memtable.read();
1899 let row_ids = memtable.row_ids.read();
1900
1901 for (&row_id, _) in row_ids.range(start..=end) {
1902 if seen.insert(row_id)
1903 && let Some(values) = memtable.get_columns(row_id, col_indices)
1904 {
1905 results.push((row_id, values));
1906 }
1907 }
1908 }
1909
1910 {
1912 let immutable = self.immutable_memtables.read();
1913 for memtable in immutable.iter().rev() {
1914 let row_ids = memtable.row_ids.read();
1915 for (&row_id, _) in row_ids.range(start..=end) {
1916 if seen.insert(row_id)
1917 && let Some(values) = memtable.get_columns(row_id, col_indices)
1918 {
1919 results.push((row_id, values));
1920 }
1921 }
1922 }
1923 }
1924
1925 results.sort_by_key(|(id, _)| *id);
1927
1928 Ok(results)
1929 }
1930
1931 pub fn stats(&self) -> LscsStats {
1933 let active = self.active_memtable.read();
1934 let immutable = self.immutable_memtables.read();
1935 let groups = self.column_groups.read();
1936
1937 let mut level_sizes = vec![0u64; self.config.num_levels];
1938 let mut disk_bytes = 0u64;
1939
1940 for (i, level) in groups.iter().enumerate() {
1941 for group in level {
1942 level_sizes[i] += group.row_count;
1943 if let Ok(metadata) = std::fs::metadata(&group.path) {
1945 disk_bytes += metadata.len();
1946 }
1947 }
1948 }
1949
1950 LscsStats {
1951 active_memtable_bytes: active.memory_bytes(),
1952 immutable_memtables: immutable.len(),
1953 level_row_counts: level_sizes,
1954 next_row_id: self.next_row_id.load(Ordering::SeqCst),
1955 disk_bytes,
1956 }
1957 }
1958
1959 pub fn wal(&self) -> &Arc<TxnWal> {
1961 &self.wal
1962 }
1963}
1964
/// Point-in-time statistics snapshot produced by `Lscs::stats`.
#[derive(Debug, Clone)]
pub struct LscsStats {
    /// Memory footprint of the active memtable, as reported by its `memory_bytes()`.
    pub active_memtable_bytes: usize,
    /// Number of immutable memtables waiting to be flushed.
    pub immutable_memtables: usize,
    /// Total row count per level, indexed by level number (`config.num_levels` entries).
    pub level_row_counts: Vec<u64>,
    /// Current value of the store's `next_row_id` counter.
    pub next_row_id: u64,
    /// Summed on-disk size of all column-group files whose metadata could be read.
    pub disk_bytes: u64,
}
1979
// Unit tests for this module: columnar memtables, on-disk column groups, the `Lscs`
// store front-end, and the column temperature tracker.
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    // Three-column `users` schema shared by most tests: non-null `UInt64` "id",
    // non-null `Text` "name", nullable `Float64` "score".
    fn test_schema() -> TableSchema {
        TableSchema {
            name: "users".to_string(),
            columns: vec![
                ColumnDef {
                    name: "id".to_string(),
                    col_type: ColumnType::UInt64,
                    nullable: false,
                },
                ColumnDef {
                    name: "name".to_string(),
                    col_type: ColumnType::Text,
                    nullable: false,
                },
                ColumnDef {
                    name: "score".to_string(),
                    col_type: ColumnType::Float64,
                    nullable: true,
                },
            ],
        }
    }

    // A single fully populated row inserted into a memtable is counted.
    #[test]
    fn test_columnar_memtable_insert() {
        let schema = test_schema();
        let memtable = ColumnarMemtable::new(schema, 1024 * 1024);

        // Values are passed as raw little-endian / UTF-8 bytes.
        let id: u64 = 1;
        let name = "Alice";
        let score: f64 = 95.5;

        memtable
            .insert(
                1,
                &[
                    Some(&id.to_le_bytes()),
                    Some(name.as_bytes()),
                    Some(&score.to_le_bytes()),
                ],
            )
            .unwrap();

        assert_eq!(memtable.row_count(), 1);
    }

    // A `None` value for the nullable "score" column is accepted.
    #[test]
    fn test_columnar_memtable_with_nulls() {
        let schema = test_schema();
        let memtable = ColumnarMemtable::new(schema, 1024 * 1024);

        let id: u64 = 1;
        let name = "Bob";

        memtable
            // Third column ("score") is NULL.
            .insert(1, &[Some(&id.to_le_bytes()), Some(name.as_bytes()), None])
            .unwrap();

        assert_eq!(memtable.row_count(), 1);
    }

    // The first row inserted through `Lscs` gets row id 1 and occupies active-memtable memory.
    #[test]
    fn test_lscs_basic() {
        let dir = tempdir().unwrap();
        let schema = test_schema();
        let config = LscsConfig {
            memtable_size: 1024,
            ..Default::default()
        };

        let lscs = Lscs::new(dir.path().to_path_buf(), schema, config).unwrap();

        let id: u64 = 1;
        let name = "Charlie";
        let score: f64 = 87.2;

        let row_id = lscs
            .insert(&[
                Some(&id.to_le_bytes()),
                Some(name.as_bytes()),
                Some(&score.to_le_bytes()),
            ])
            .unwrap();

        assert_eq!(row_id, 1);

        let stats = lscs.stats();
        assert!(stats.active_memtable_bytes > 0);
    }

    // A memtable written as a column group produces an `.sst` file. Re-opening it
    // exposes one offset entry per column and a non-empty `lsi` index.
    #[test]
    fn test_column_group_write() {
        let dir = tempfile::tempdir().unwrap();
        let schema = TableSchema::new(
            "users".to_string(),
            vec![
                ColumnDef {
                    name: "id".to_string(),
                    col_type: ColumnType::UInt64,
                    nullable: false,
                },
                ColumnDef {
                    name: "name".to_string(),
                    col_type: ColumnType::Text,
                    nullable: false,
                },
                ColumnDef {
                    name: "score".to_string(),
                    col_type: ColumnType::Float64,
                    nullable: true,
                },
            ],
        )
        // `with_mvcc()` brings the schema to 5 columns: the rows below carry 5 values
        // and 5 column offsets are expected. The two extra columns are defined by
        // `TableSchema::with_mvcc` (presumably version bookkeeping; confirm there).
        .with_mvcc();
        let memtable = ColumnarMemtable::new(schema.clone(), 1024 * 1024);

        memtable
            .insert(
                1,
                &[
                    Some(&1u64.to_le_bytes()),
                    Some(b"Alice"),
                    Some(&95.5f64.to_le_bytes()),
                    // Values for the two columns added by `with_mvcc()`.
                    Some(&100u64.to_le_bytes()),
                    Some(&0u64.to_le_bytes()),
                ],
            )
            .unwrap();

        memtable
            .insert(
                2,
                &[
                    Some(&2u64.to_le_bytes()),
                    Some(b"Bob"),
                    Some(&87.2f64.to_le_bytes()),
                    // Values for the two columns added by `with_mvcc()`.
                    Some(&100u64.to_le_bytes()),
                    Some(&200u64.to_le_bytes()),
                ],
            )
            .unwrap();

        // Level 0, sequence 1.
        let cg = ColumnGroup::from_memtable(dir.path(), &memtable, 0, 1).unwrap();
        let file_path = cg.file_path();
        assert!(file_path.exists());
        assert!(file_path.extension().unwrap() == "sst");

        let cg_opened = ColumnGroup::open(file_path.to_path_buf(), schema, 0, 1).unwrap();
        // One offset entry per schema column (3 user columns + 2 MVCC columns).
        assert_eq!(cg_opened.column_offsets.len(), 5);
        assert!(cg_opened.lsi.is_some());
        let lsi = cg_opened.lsi.as_ref().unwrap();
        assert!(lsi.stats().num_keys > 0);
    }

    // Point lookups return every column, keep NULLs as `None`, and miss unknown ids.
    #[test]
    fn test_memtable_get() {
        let schema = test_schema();
        let memtable = ColumnarMemtable::new(schema, 1024 * 1024);

        // Row 1: all columns populated.
        let id1: u64 = 1;
        let name1 = "Alice";
        let score1: f64 = 95.5;
        memtable
            .insert(
                1,
                &[
                    Some(&id1.to_le_bytes()),
                    Some(name1.as_bytes()),
                    Some(&score1.to_le_bytes()),
                ],
            )
            .unwrap();

        // Row 2: "score" is NULL.
        let id2: u64 = 2;
        let name2 = "Bob";
        memtable
            .insert(
                2,
                &[
                    Some(&id2.to_le_bytes()),
                    Some(name2.as_bytes()),
                    None,
                ],
            )
            .unwrap();

        // Row 1 round-trips its id and name bytes.
        let row1 = memtable.get(1).unwrap();
        assert_eq!(row1.len(), 3);
        assert_eq!(
            u64::from_le_bytes(row1[0].as_ref().unwrap()[..].try_into().unwrap()),
            1
        );
        assert_eq!(
            std::str::from_utf8(row1[1].as_ref().unwrap()).unwrap(),
            "Alice"
        );

        // Row 2's NULL score comes back as `None`; an unknown row id yields `None`.
        let row2 = memtable.get(2).unwrap();
        assert!(row2[2].is_none());
        assert!(memtable.get(999).is_none());
    }

    // `scan_range(3, 7)` is inclusive at both ends: 5 of the 10 rows are returned.
    #[test]
    fn test_memtable_scan_range() {
        let schema = test_schema();
        let memtable = ColumnarMemtable::new(schema, 1024 * 1024);

        // Rows 1..=10, with the row id doubling as the "id" column value.
        for i in 1..=10 {
            memtable
                .insert(
                    i,
                    &[
                        Some(&i.to_le_bytes()),
                        Some(format!("User{}", i).as_bytes()),
                        Some(&((i as f64) * 10.0).to_le_bytes()),
                    ],
                )
                .unwrap();
        }

        let results = memtable.scan_range(3, 7);
        assert_eq!(results.len(), 5);

        for (row_id, _) in &results {
            assert!(*row_id >= 3 && *row_id <= 7);
        }
    }

    // A row inserted through `Lscs` can be read back by the id `insert` returned.
    #[test]
    fn test_lscs_get() {
        let dir = tempdir().unwrap();
        let schema = test_schema();
        let config = LscsConfig {
            memtable_size: 64 * 1024 * 1024,
            ..Default::default()
        };

        let lscs = Lscs::new(dir.path().to_path_buf(), schema, config).unwrap();

        let id: u64 = 42;
        let name = "TestUser";
        let score: f64 = 99.9;

        let row_id = lscs
            .insert(&[
                Some(&id.to_le_bytes()),
                Some(name.as_bytes()),
                Some(&score.to_le_bytes()),
            ])
            .unwrap();

        let result = lscs.get(row_id).unwrap();
        assert!(result.is_some());

        let values = result.unwrap();
        assert_eq!(
            u64::from_le_bytes(values[0].as_ref().unwrap()[..].try_into().unwrap()),
            42
        );
        assert_eq!(
            std::str::from_utf8(values[1].as_ref().unwrap()).unwrap(),
            "TestUser"
        );
    }

    // `fsync` succeeds after several inserts, and row 1 is still readable afterwards.
    #[test]
    fn test_lscs_fsync() {
        let dir = tempdir().unwrap();
        let schema = test_schema();
        let config = LscsConfig::default();

        let lscs = Lscs::new(dir.path().to_path_buf(), schema, config).unwrap();

        for i in 1..=5 {
            lscs.insert(&[
                Some(&(i as u64).to_le_bytes()),
                Some(format!("User{}", i).as_bytes()),
                Some(&((i as f64) * 10.0).to_le_bytes()),
            ])
            .unwrap();
        }

        lscs.fsync().unwrap();

        let result = lscs.get(1).unwrap();
        assert!(result.is_some());
    }

    // The hot threshold defaults to 0.1 and `set_hot_threshold` clamps values to [0, 1].
    #[test]
    fn test_temperature_tracker_set_threshold() {
        let cols = vec!["a".to_string(), "b".to_string(), "c".to_string()];
        let tracker = ColumnTemperatureTracker::new(&cols, 10);

        assert!((tracker.threshold() - 0.1).abs() < f64::EPSILON);

        tracker.set_hot_threshold(0.5);
        assert!((tracker.threshold() - 0.5).abs() < f64::EPSILON);

        // Above 1.0 is clamped down to 1.0.
        tracker.set_hot_threshold(2.0);
        assert!((tracker.threshold() - 1.0).abs() < f64::EPSILON);

        // Below 0.0 is clamped up to 0.0.
        tracker.set_hot_threshold(-0.5);
        assert!((tracker.threshold() - 0.0).abs() < f64::EPSILON);
    }

    // A column updated in every slot of a window is classified hot; an untouched
    // column stays cold.
    #[test]
    fn test_column_temperature_hot_cold_classification() {
        let cols = vec!["hot_col".to_string(), "cold_col".to_string()];
        let tracker = ColumnTemperatureTracker::new(&cols, 2);

        // Below the 0.1 EMA weight, so one full window is enough to cross it.
        tracker.set_hot_threshold(0.05);

        // Presumably the second update completes the 2-update window and runs
        // `update_ema`: hot_col's temperature becomes 0.1 * 1.0 = 0.1 and cold_col
        // stays at 0.0. Confirm against `record_updates`.
        tracker.record_updates(&["hot_col"]);
        tracker.record_updates(&["hot_col"]);

        let hot = tracker.get_hot_columns();
        let cold = tracker.get_cold_columns();

        assert!(
            hot.contains("hot_col"),
            "hot_col should be classified as hot"
        );
        assert!(
            cold.contains("cold_col"),
            "cold_col should be classified as cold"
        );
    }
}