1use parking_lot::RwLock;
65use serde::{Deserialize, Serialize};
66use std::collections::{BTreeMap, HashMap, HashSet};
67use std::path::{Path, PathBuf};
68use std::sync::Arc;
69use std::sync::atomic::{AtomicU64, Ordering};
70use sochdb_core::{Result, SochDBError};
71
72use crate::txn_wal::TxnWal;
73
/// Per-column write-temperature statistics.
///
/// `temperature` is an exponential moving average (see `update_ema`) of the
/// fraction of recorded row updates in each window that touched this column.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnTemperature {
    /// Column name this entry tracks.
    pub name: String,
    /// Smoothed (EMA) per-window update fraction; starts at `0.0`.
    pub temperature: f64,
    /// Updates recorded in the current window (reset by `update_ema`).
    pub window_updates: u64,
    /// Updates recorded over the lifetime of this entry.
    pub total_updates: u64,
    /// Wall-clock time of the last recorded update, in microseconds since the Unix epoch.
    pub last_update_us: u64,
}
95
96impl ColumnTemperature {
97 pub fn new(name: String) -> Self {
99 Self {
100 name,
101 temperature: 0.0,
102 window_updates: 0,
103 total_updates: 0,
104 last_update_us: 0,
105 }
106 }
107
108 pub fn record_update(&mut self) {
110 self.window_updates += 1;
111 self.total_updates += 1;
112 self.last_update_us = std::time::SystemTime::now()
113 .duration_since(std::time::UNIX_EPOCH)
114 .unwrap()
115 .as_micros() as u64;
116 }
117
118 pub fn update_ema(&mut self, total_window_updates: u64) {
123 const ALPHA: f64 = 0.1;
124
125 let temp_current = if total_window_updates > 0 {
126 self.window_updates as f64 / total_window_updates as f64
127 } else {
128 0.0
129 };
130
131 self.temperature = ALPHA * temp_current + (1.0 - ALPHA) * self.temperature;
132 self.window_updates = 0;
133 }
134
135 pub fn is_hot(&self, threshold: f64) -> bool {
137 self.temperature > threshold
138 }
139}
140
141#[derive(Debug)]
143pub struct ColumnTemperatureTracker {
144 columns: RwLock<HashMap<String, ColumnTemperature>>,
146 window_size: u64,
148 window_updates: AtomicU64,
150 hot_threshold: f64,
152}
153
154impl ColumnTemperatureTracker {
155 pub fn new(column_names: &[String], window_size: u64) -> Self {
157 let mut columns = HashMap::new();
158 for name in column_names {
159 columns.insert(name.clone(), ColumnTemperature::new(name.clone()));
160 }
161 Self {
162 columns: RwLock::new(columns),
163 window_size,
164 window_updates: AtomicU64::new(0),
165 hot_threshold: 0.1,
166 }
167 }
168
169 pub fn record_updates(&self, column_names: &[&str]) {
171 let mut cols = self.columns.write();
172 for name in column_names {
173 if let Some(temp) = cols.get_mut(*name) {
174 temp.record_update();
175 }
176 }
177
178 let total = self.window_updates.fetch_add(1, Ordering::SeqCst) + 1;
179
180 if total >= self.window_size {
182 self.update_all_ema(&mut cols, total);
183 self.window_updates.store(0, Ordering::SeqCst);
184 }
185 }
186
187 fn update_all_ema(&self, cols: &mut HashMap<String, ColumnTemperature>, total: u64) {
188 for temp in cols.values_mut() {
189 temp.update_ema(total);
190 }
191 }
192
193 pub fn get_hot_columns(&self) -> HashSet<String> {
195 let cols = self.columns.read();
196 cols.values()
197 .filter(|t| t.is_hot(self.hot_threshold))
198 .map(|t| t.name.clone())
199 .collect()
200 }
201
202 pub fn get_cold_columns(&self) -> HashSet<String> {
204 let cols = self.columns.read();
205 cols.values()
206 .filter(|t| !t.is_hot(self.hot_threshold))
207 .map(|t| t.name.clone())
208 .collect()
209 }
210
211 pub fn get_all_temperatures(&self) -> Vec<ColumnTemperature> {
213 self.columns.read().values().cloned().collect()
214 }
215
216 pub fn set_hot_threshold(&self, _threshold: f64) {
218 }
221}
222
/// Reference to one column's stripe inside an on-disk segment file.
///
/// Compaction records these for cold columns instead of rewriting their data.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ColumnStripeRef {
    /// LSM level of the segment that holds the stripe.
    pub level: u32,
    /// Identifier of the containing segment (compaction passes the segment's sequence number).
    pub segment_id: u64,
    /// Name of the referenced column.
    pub column_name: String,
    /// Byte offset of the column section within the segment file.
    pub offset: u64,
    /// Byte length of the column section.
    pub length: u64,
    /// Number of rows stored in the stripe.
    pub row_count: u64,
    /// Compression codec tag; `new` sets it to `0` (presumably "uncompressed" — TODO confirm).
    pub compression: u8,
}
249
250impl ColumnStripeRef {
251 pub fn new(
253 level: u32,
254 segment_id: u64,
255 column_name: String,
256 offset: u64,
257 length: u64,
258 row_count: u64,
259 ) -> Self {
260 Self {
261 level,
262 segment_id,
263 column_name,
264 offset,
265 length,
266 row_count,
267 compression: 0,
268 }
269 }
270
271 pub fn relocate(&self, new_level: u32, new_segment_id: u64, new_offset: u64) -> Self {
273 Self {
274 level: new_level,
275 segment_id: new_segment_id,
276 column_name: self.column_name.clone(),
277 offset: new_offset,
278 length: self.length,
279 row_count: self.row_count,
280 compression: self.compression,
281 }
282 }
283}
284
/// Metadata describing a (possibly virtual) segment produced by compaction.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SegmentDescriptor {
    /// Segment identifier (the sequence number allocated for it).
    pub id: u64,
    /// LSM level the segment belongs to.
    pub level: u32,
    /// Stripe references keyed by column name.
    pub col_refs: HashMap<String, ColumnStripeRef>,
    /// Smallest row id covered by the segment.
    pub min_row_id: RowId,
    /// Largest row id covered by the segment.
    pub max_row_id: RowId,
    /// Number of rows in the segment.
    pub row_count: u64,
    /// Lower timestamp bound (microseconds since the Unix epoch, presumably — TODO confirm).
    pub min_timestamp: u64,
    /// Upper timestamp bound (same unit as `min_timestamp`).
    pub max_timestamp: u64,
    /// Tombstone flag; its exact semantics are not established here — TODO confirm.
    pub is_tombstone: bool,
}
310
/// Numeric column identifier.
pub type ColumnId = u32;

/// Logical row identifier; `Lscs::insert` allocates these from a counter starting at 1.
pub type RowId = u64;
316
/// Physical storage type of a column.
///
/// The explicit discriminant is the one-byte type tag written into segment files.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum ColumnType {
    Bool = 0,
    Int64 = 1,
    UInt64 = 2,
    Float64 = 3,
    Text = 4,
    Binary = 5,
    Timestamp = 6,
}

impl ColumnType {
    /// Every variant, indexed by its discriminant byte.
    const BY_TAG: [ColumnType; 7] = [
        ColumnType::Bool,
        ColumnType::Int64,
        ColumnType::UInt64,
        ColumnType::Float64,
        ColumnType::Text,
        ColumnType::Binary,
        ColumnType::Timestamp,
    ];

    /// Width in bytes of each value, or `None` for variable-length types.
    pub fn fixed_size(&self) -> Option<usize> {
        match self {
            Self::Text | Self::Binary => None,
            Self::Bool => Some(1),
            Self::Int64 | Self::UInt64 | Self::Float64 | Self::Timestamp => Some(8),
        }
    }

    /// Decodes a type tag byte, returning `None` for unknown tags.
    pub fn from_byte(b: u8) -> Option<Self> {
        Self::BY_TAG.get(usize::from(b)).copied()
    }
}
357
358#[derive(Debug, Clone)]
360pub struct TableSchema {
361 pub name: String,
363 pub columns: Vec<ColumnDef>,
365}
366
367impl TableSchema {
368 pub fn new(name: String, columns: Vec<ColumnDef>) -> Self {
369 Self { name, columns }
370 }
371
372 pub fn with_mvcc(mut self) -> Self {
374 if !self.columns.iter().any(|c| c.name == "__txn_start") {
375 self.columns.push(ColumnDef {
376 name: "__txn_start".to_string(),
377 col_type: ColumnType::UInt64,
378 nullable: false,
379 });
380 }
381 if !self.columns.iter().any(|c| c.name == "__txn_end") {
382 self.columns.push(ColumnDef {
383 name: "__txn_end".to_string(),
384 col_type: ColumnType::UInt64,
385 nullable: false, });
387 }
388 self
389 }
390}
391
/// Definition of a single table column.
#[derive(Debug, Clone)]
pub struct ColumnDef {
    /// Column name, unique within the schema.
    pub name: String,
    /// Physical storage type.
    pub col_type: ColumnType,
    /// Whether the column is declared nullable (not enforced by the memtable code shown here).
    pub nullable: bool,
}
402
/// In-memory append-only storage for one column of a memtable.
#[derive(Debug)]
struct ColumnBuffer {
    /// Storage type of the column.
    col_type: ColumnType,
    /// Concatenated value bytes; fixed-width values are read back at `row * width`.
    data: Vec<u8>,
    /// Validity bitmap: bit `row % 8` of byte `row / 8` is set when the row holds a value.
    nulls: Vec<u8>,
    /// Variable-length types only: row `i` spans `data[offsets[i]..offsets[i + 1]]`
    /// (starts as `[0]`).
    offsets: Option<Vec<u32>>,
    /// Number of rows appended so far.
    row_count: u64,
}
417
418impl ColumnBuffer {
419 fn new(col_type: ColumnType) -> Self {
420 Self {
421 col_type,
422 data: Vec::new(),
423 nulls: Vec::new(),
424 offsets: if col_type.fixed_size().is_none() {
425 Some(vec![0]) } else {
427 None
428 },
429 row_count: 0,
430 }
431 }
432
433 fn append(&mut self, value: Option<&[u8]>) {
435 let bit_idx = self.row_count as usize;
437 let byte_idx = bit_idx / 8;
438 let bit_offset = bit_idx % 8;
439
440 while self.nulls.len() <= byte_idx {
441 self.nulls.push(0);
442 }
443
444 if let Some(data) = value {
445 self.nulls[byte_idx] |= 1 << bit_offset;
447
448 self.data.extend_from_slice(data);
450
451 if let Some(offsets) = &mut self.offsets {
453 offsets.push(self.data.len() as u32);
454 }
455 } else if let Some(offsets) = &mut self.offsets {
456 let last = *offsets.last().unwrap();
458 offsets.push(last);
459 }
460
461 self.row_count += 1;
462 }
463
464 fn is_null(&self, row_idx: u64) -> bool {
466 if row_idx >= self.row_count {
467 return true; }
469 let byte_idx = (row_idx / 8) as usize;
470 let bit_offset = (row_idx % 8) as u8;
471
472 if byte_idx >= self.nulls.len() {
473 return true;
474 }
475
476 (self.nulls[byte_idx] & (1 << bit_offset)) == 0
477 }
478
479 fn get(&self, row_idx: u64) -> Option<Vec<u8>> {
482 if row_idx >= self.row_count || self.is_null(row_idx) {
483 return None;
484 }
485
486 if let Some(fixed_size) = self.col_type.fixed_size() {
487 let start = (row_idx as usize) * fixed_size;
489 let end = start + fixed_size;
490 if end <= self.data.len() {
491 Some(self.data[start..end].to_vec())
492 } else {
493 None
494 }
495 } else {
496 if let Some(offsets) = &self.offsets {
498 let start = offsets[row_idx as usize] as usize;
499 let end = offsets[(row_idx + 1) as usize] as usize;
500 if end <= self.data.len() {
501 Some(self.data[start..end].to_vec())
502 } else {
503 None
504 }
505 } else {
506 None
507 }
508 }
509 }
510
511 fn memory_bytes(&self) -> usize {
513 self.data.len() + self.nulls.len() + self.offsets.as_ref().map(|o| o.len() * 4).unwrap_or(0)
514 }
515}
516
/// Mutable, column-oriented in-memory table that receives new rows before they
/// are flushed to a segment file.
#[derive(Debug)]
pub struct ColumnarMemtable {
    /// Table schema; `columns` has one buffer per schema column, in the same order.
    schema: TableSchema,
    /// One buffer per schema column.
    columns: Vec<RwLock<ColumnBuffer>>,
    /// Row id -> physical row index of the latest inserted version.
    row_ids: RwLock<BTreeMap<RowId, u64>>,
    /// Physical row index -> row id (unwritten gaps hold `0`).
    row_idx_to_id: RwLock<Vec<RowId>>,
    /// Next physical row index to hand out; equals the number of physical rows.
    next_row_idx: AtomicU64,
    /// Sum of non-NULL value bytes inserted, compared against `size_limit`.
    bytes_written: AtomicU64,
    /// Byte threshold at which `is_full` reports `true`.
    size_limit: usize,
}
535
536impl ColumnarMemtable {
537 pub fn new(schema: TableSchema, size_limit: usize) -> Self {
539 let columns = schema
540 .columns
541 .iter()
542 .map(|def| RwLock::new(ColumnBuffer::new(def.col_type)))
543 .collect();
544
545 Self {
546 schema,
547 columns,
548 row_ids: RwLock::new(BTreeMap::new()),
549 row_idx_to_id: RwLock::new(Vec::new()),
550 next_row_idx: AtomicU64::new(0),
551 bytes_written: AtomicU64::new(0),
552 size_limit,
553 }
554 }
555
556 pub fn insert(&self, row_id: RowId, values: &[Option<&[u8]>]) -> Result<()> {
560 if values.len() != self.schema.columns.len() {
561 return Err(SochDBError::InvalidData(format!(
562 "Expected {} columns, got {}",
563 self.schema.columns.len(),
564 values.len()
565 )));
566 }
567
568 let row_idx = self.next_row_idx.fetch_add(1, Ordering::SeqCst);
569
570 let mut bytes = 0usize;
572 for (i, value) in values.iter().enumerate() {
573 let mut col = self.columns[i].write();
574 if let Some(data) = value {
575 bytes += data.len();
576 }
577 col.append(*value);
578 }
579
580 {
582 let mut ids = self.row_ids.write();
583 ids.insert(row_id, row_idx);
584 }
585 {
586 let mut idx_to_id = self.row_idx_to_id.write();
587 while idx_to_id.len() <= row_idx as usize {
589 idx_to_id.push(0); }
591 idx_to_id[row_idx as usize] = row_id;
592 }
593
594 self.bytes_written
595 .fetch_add(bytes as u64, Ordering::Relaxed);
596
597 Ok(())
598 }
599
600 pub fn get(&self, row_id: RowId) -> Option<Vec<Option<Vec<u8>>>> {
603 let row_ids = self.row_ids.read();
605 let row_idx = *row_ids.get(&row_id)?;
606 drop(row_ids);
607
608 let mut values = Vec::with_capacity(self.columns.len());
610 for col in &self.columns {
611 let col_buf = col.read();
612 values.push(col_buf.get(row_idx));
613 }
614
615 Some(values)
616 }
617
618 pub fn get_columns(
620 &self,
621 row_id: RowId,
622 col_indices: &[usize],
623 ) -> Option<Vec<Option<Vec<u8>>>> {
624 let row_ids = self.row_ids.read();
626 let row_idx = *row_ids.get(&row_id)?;
627 drop(row_ids);
628
629 let mut values = Vec::with_capacity(col_indices.len());
631 for &col_idx in col_indices {
632 if col_idx < self.columns.len() {
633 let col_buf = self.columns[col_idx].read();
634 values.push(col_buf.get(row_idx));
635 } else {
636 values.push(None);
637 }
638 }
639
640 Some(values)
641 }
642
643 pub fn scan_range(&self, start: RowId, end: RowId) -> Vec<(RowId, Vec<Option<Vec<u8>>>)> {
645 let row_ids = self.row_ids.read();
646 let mut results = Vec::new();
647
648 for (&row_id, &row_idx) in row_ids.range(start..=end) {
649 let mut values = Vec::with_capacity(self.columns.len());
650 for col in &self.columns {
651 let col_buf = col.read();
652 values.push(col_buf.get(row_idx));
653 }
654 results.push((row_id, values));
655 }
656
657 results
658 }
659
660 pub fn is_full(&self) -> bool {
662 self.bytes_written.load(Ordering::Relaxed) as usize >= self.size_limit
663 }
664
665 pub fn row_count(&self) -> u64 {
667 self.next_row_idx.load(Ordering::SeqCst)
668 }
669
670 pub fn memory_bytes(&self) -> usize {
672 self.columns.iter().map(|c| c.read().memory_bytes()).sum()
673 }
674
675 pub fn schema(&self) -> &TableSchema {
677 &self.schema
678 }
679}
680
681use sochdb_core::learned_index::LearnedSparseIndex;
682
/// Location of one column section inside a segment file (stored in the file footer).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnIndex {
    /// Byte offset of the column section from the start of the file.
    pub offset: u64,
    /// Byte length of the column section.
    pub length: u64,
    /// Compression codec tag; the writer in this file always stores `0`.
    pub compression: u8,
}
693
/// Descriptor of an immutable on-disk segment (`.sst`) holding flushed memtable columns.
#[derive(Debug)]
// Some fields are kept for future use and are not read by every code path.
#[allow(dead_code)]
pub struct ColumnGroup {
    /// Path of the segment file.
    path: PathBuf,
    /// Schema the segment was written with.
    schema: TableSchema,
    /// LSM level the file was written for (part of its file name).
    level: u32,
    /// Sequence number the file was written with (part of its file name).
    sequence: u64,
    /// Number of physical rows stored.
    row_count: u64,
    /// Lower timestamp bound derived from `__txn_start` (or the flush time).
    min_timestamp: u64,
    /// Upper timestamp bound derived from `__txn_start` (or the flush time).
    max_timestamp: u64,
    /// Column name -> location of its section in the file.
    column_offsets: BTreeMap<String, ColumnIndex>,
    /// Learned index over the row ids stored in the segment.
    lsi: Option<LearnedSparseIndex>,
}
717
718impl ColumnGroup {
719 const MAGIC: [u8; 4] = [b'T', b'O', b'O', b'N'];
721 const VERSION: u32 = 1;
722
723 #[allow(clippy::too_many_arguments)]
725 pub fn new(
726 path: PathBuf,
727 schema: TableSchema,
728 level: u32,
729 sequence: u64,
730 row_count: u64,
731 min_timestamp: u64,
732 max_timestamp: u64,
733 column_offsets: BTreeMap<String, ColumnIndex>,
734 lsi: Option<LearnedSparseIndex>,
735 ) -> Self {
736 Self {
737 path,
738 schema,
739 level,
740 sequence,
741 row_count,
742 min_timestamp,
743 max_timestamp,
744 column_offsets,
745 lsi,
746 }
747 }
748
749 pub fn from_memtable(
751 base_path: &Path,
752 memtable: &ColumnarMemtable,
753 level: u32,
754 sequence: u64,
755 ) -> Result<Self> {
756 use byteorder::{LittleEndian, WriteBytesExt};
757 use std::fs::File;
758 use std::io::{BufWriter, Seek, Write};
759
760 let file_name = format!("L{}_seq{}.sst", level, sequence);
762 let file_path = base_path.join(&file_name);
763 let file = File::create(&file_path)?;
764 let mut writer = BufWriter::new(file);
765
766 writer.write_all(&Self::MAGIC)?;
768 writer.write_u32::<LittleEndian>(Self::VERSION)?;
769
770 let mut column_offsets = BTreeMap::new();
771 let mut min_ts = u64::MAX;
772 let mut max_ts = 0u64;
773
774 for (i, col_lock) in memtable.columns.iter().enumerate() {
776 let col = col_lock.read();
777 let col_def = &memtable.schema.columns[i];
778
779 if col_def.name == "__txn_start" && col.col_type == ColumnType::UInt64 {
781 let mut offset = 0;
783 let row_count = col.row_count as usize;
784 for row_idx in 0..row_count {
785 let byte_idx = row_idx / 8;
787 let bit_idx = row_idx % 8;
788 let is_null =
789 byte_idx < col.nulls.len() && (col.nulls[byte_idx] & (1 << bit_idx)) != 0;
790
791 if !is_null && offset + 8 <= col.data.len() {
792 let ts = u64::from_le_bytes(
793 col.data[offset..offset + 8].try_into().unwrap_or([0u8; 8]),
794 );
795 min_ts = min_ts.min(ts);
796 max_ts = max_ts.max(ts);
797 }
798 offset += 8;
799 }
800 }
801
802 let start_offset = writer.stream_position()?;
803
804 writer.write_u8(col.col_type as u8)?;
806 writer.write_u64::<LittleEndian>(col.row_count)?;
807
808 writer.write_u32::<LittleEndian>(col.nulls.len() as u32)?;
810 writer.write_all(&col.nulls)?;
811
812 if let Some(offsets) = &col.offsets {
814 writer.write_u32::<LittleEndian>(offsets.len() as u32)?;
815 for &off in offsets {
816 writer.write_u32::<LittleEndian>(off)?;
817 }
818 }
819
820 writer.write_u32::<LittleEndian>(col.data.len() as u32)?;
822 writer.write_all(&col.data)?;
823
824 let end_offset = writer.stream_position()?;
825
826 column_offsets.insert(
827 col_def.name.clone(),
828 ColumnIndex {
829 offset: start_offset,
830 length: end_offset - start_offset,
831 compression: 0, },
833 );
834 }
835
836 let row_ids = memtable.row_ids.read();
838 let keys: Vec<u64> = row_ids.keys().cloned().collect();
839 let lsi = LearnedSparseIndex::build(&keys);
840
841 let footer_start = writer.stream_position()?;
843
844 let offsets_bytes = bincode::serialize(&column_offsets)
846 .map_err(|e| SochDBError::Serialization(e.to_string()))?;
847 writer.write_u64::<LittleEndian>(offsets_bytes.len() as u64)?;
848 writer.write_all(&offsets_bytes)?;
849
850 let lsi_bytes =
852 bincode::serialize(&lsi).map_err(|e| SochDBError::Serialization(e.to_string()))?;
853 writer.write_u64::<LittleEndian>(lsi_bytes.len() as u64)?;
854 writer.write_all(&lsi_bytes)?;
855
856 writer.write_u64::<LittleEndian>(footer_start)?;
858 writer.write_all(&Self::MAGIC)?;
859
860 writer.flush()?;
861
862 if min_ts == u64::MAX || max_ts == 0 {
864 let now = std::time::SystemTime::now()
865 .duration_since(std::time::UNIX_EPOCH)
866 .unwrap()
867 .as_micros() as u64;
868 min_ts = now;
869 max_ts = now;
870 }
871
872 Ok(Self {
873 path: file_path,
874 schema: memtable.schema.clone(),
875 level,
876 sequence,
877 row_count: memtable.row_count(),
878 min_timestamp: min_ts,
879 max_timestamp: max_ts,
880 column_offsets,
881 lsi: Some(lsi),
882 })
883 }
884
885 pub fn open(path: PathBuf, schema: TableSchema, level: u32, sequence: u64) -> Result<Self> {
887 use byteorder::{LittleEndian, ReadBytesExt};
888 use std::fs::File;
889 use std::io::{Read, Seek, SeekFrom};
890
891 let mut file = File::open(&path)?;
892 let file_len = file.metadata()?.len();
893
894 if file_len < 12 {
895 return Err(SochDBError::Corruption("File too short".to_string()));
897 }
898
899 file.seek(SeekFrom::End(-12))?;
901 let footer_offset = file.read_u64::<LittleEndian>()?;
902 let mut magic = [0u8; 4];
903 file.read_exact(&mut magic)?;
904
905 if magic != Self::MAGIC {
906 return Err(SochDBError::Corruption("Invalid magic bytes".to_string()));
907 }
908
909 file.seek(SeekFrom::Start(footer_offset))?;
911
912 let offsets_len = file.read_u64::<LittleEndian>()?;
914 let mut offsets_bytes = vec![0u8; offsets_len as usize];
915 file.read_exact(&mut offsets_bytes)?;
916 let column_offsets: BTreeMap<String, ColumnIndex> = bincode::deserialize(&offsets_bytes)
917 .map_err(|e| SochDBError::Serialization(e.to_string()))?;
918
919 let lsi_len = file.read_u64::<LittleEndian>()?;
921 let mut lsi_bytes = vec![0u8; lsi_len as usize];
922 file.read_exact(&mut lsi_bytes)?;
923 let lsi: LearnedSparseIndex = bincode::deserialize(&lsi_bytes)
924 .map_err(|e| SochDBError::Serialization(e.to_string()))?;
925
926 Ok(Self {
927 path,
928 schema,
929 level,
930 sequence,
931 row_count: 0, min_timestamp: 0,
933 max_timestamp: 0,
934 column_offsets,
935 lsi: Some(lsi),
936 })
937 }
938
939 pub fn file_path(&self) -> &Path {
941 &self.path
942 }
943
944 pub fn column_index(&self, col_name: &str) -> Option<&ColumnIndex> {
946 self.column_offsets.get(col_name)
947 }
948
949 pub fn level(&self) -> u32 {
951 self.level
952 }
953
954 pub fn row_count(&self) -> u64 {
956 self.row_count
957 }
958}
959
/// Running counters describing compaction work.
#[derive(Debug, Clone, Default)]
pub struct CompactionStats {
    /// Compactions performed in total.
    pub compactions_total: u64,
    /// Level-0 compactions performed.
    pub l0_compactions: u64,
    /// (Estimated) bytes read by compaction.
    pub bytes_read: u64,
    /// (Estimated) bytes written by compaction.
    pub bytes_written: u64,
    /// Compactions that processed hot columns.
    pub hot_column_compactions: u64,
    /// Cold-column stripe references kept instead of being rewritten.
    pub cold_column_refs_preserved: u64,
    /// Estimated write-amplification reduction factor of the latest compaction.
    pub estimated_wa_reduction: f64,
    /// Duration of the latest compaction in microseconds.
    pub last_compaction_duration_us: u64,
}

impl CompactionStats {
    /// Ratio of bytes written to bytes read; `1.0` while nothing has been read.
    pub fn write_amplification(&self) -> f64 {
        match self.bytes_read {
            0 => 1.0,
            read => self.bytes_written as f64 / read as f64,
        }
    }
}
995
/// Summary of a WAL replay performed by `Lscs::recover`.
#[derive(Debug, Clone, Default)]
pub struct LscsRecoveryStats {
    /// Transaction count reported by the WAL replay.
    pub transactions_recovered: usize,
    /// Rows successfully re-inserted into the active memtable.
    pub rows_recovered: usize,
    /// Largest row id seen in the replayed keys (`0` if none).
    pub max_row_id: u64,
}
1006
/// Tuning knobs for an LSCS store.
#[derive(Debug, Clone)]
pub struct LscsConfig {
    /// Inserted-bytes threshold at which the active memtable is considered full.
    pub memtable_size: usize,
    /// Number of LSM levels allocated.
    pub num_levels: usize,
    /// Presumed size ratio between adjacent levels (TODO confirm usage).
    pub level_ratio: usize,
    /// Number of level-0 segments that triggers an L0 compaction.
    pub l0_compaction_threshold: usize,
    /// Temperature above which a column counts as hot.
    pub hot_threshold: f64,
    /// Row updates per temperature-tracking window.
    pub temperature_window_size: u64,
}

impl Default for LscsConfig {
    /// 64 MiB memtables, 7 levels, ratio 10, compaction after 4 L0 segments,
    /// hot threshold 0.1 and a 1000-update temperature window.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;
        LscsConfig {
            memtable_size: 64 * MIB,
            num_levels: 7,
            level_ratio: 10,
            l0_compaction_threshold: 4,
            hot_threshold: 0.1,
            temperature_window_size: 1000,
        }
    }
}
1036
/// Log-structured column store: WAL-backed memtables flushed into per-level
/// column-group segment files, with temperature-aware L0 compaction.
pub struct Lscs {
    /// Tuning configuration.
    config: LscsConfig,
    /// Data directory (holds `wal.log` and the `.sst` files).
    path: PathBuf,
    /// Table schema shared by every memtable and segment.
    schema: TableSchema,
    /// Write-ahead log every insert is committed to before reaching the memtable.
    wal: Arc<TxnWal>,
    /// Memtable currently receiving writes.
    active_memtable: RwLock<ColumnarMemtable>,
    /// Rotated memtables awaiting flush, oldest first (`get` searches them newest first).
    immutable_memtables: RwLock<Vec<ColumnarMemtable>>,
    /// Segments per level; the outer index is the level.
    column_groups: RwLock<Vec<Vec<ColumnGroup>>>,
    /// Compaction output descriptors keyed by their sequence number.
    segment_descriptors: RwLock<HashMap<u64, SegmentDescriptor>>,
    /// Per-column update temperature used by compaction.
    temperature_tracker: Arc<ColumnTemperatureTracker>,
    /// Next sequence number for segment files and compaction descriptors.
    next_sequence: AtomicU64,
    /// Next row id to allocate (starts at 1).
    next_row_id: AtomicU64,
    /// Accumulated compaction statistics.
    compaction_stats: RwLock<CompactionStats>,
}
1064
1065impl Lscs {
1066 pub fn new(path: PathBuf, schema: TableSchema, config: LscsConfig) -> Result<Self> {
1068 std::fs::create_dir_all(&path)?;
1069
1070 let wal_path = path.join("wal.log");
1071 let wal = Arc::new(TxnWal::new(&wal_path)?);
1072
1073 let active_memtable = ColumnarMemtable::new(schema.clone(), config.memtable_size);
1074
1075 let mut column_groups = Vec::with_capacity(config.num_levels);
1076 for _ in 0..config.num_levels {
1077 column_groups.push(Vec::new());
1078 }
1079
1080 let column_names: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
1082 let temperature_tracker = Arc::new(ColumnTemperatureTracker::new(
1083 &column_names,
1084 config.temperature_window_size,
1085 ));
1086
1087 Ok(Self {
1088 config,
1089 path,
1090 schema,
1091 wal,
1092 active_memtable: RwLock::new(active_memtable),
1093 immutable_memtables: RwLock::new(Vec::new()),
1094 column_groups: RwLock::new(column_groups),
1095 segment_descriptors: RwLock::new(HashMap::new()),
1096 temperature_tracker,
1097 next_sequence: AtomicU64::new(0),
1098 next_row_id: AtomicU64::new(1),
1099 compaction_stats: RwLock::new(CompactionStats::default()),
1100 })
1101 }
1102
1103 pub fn open(path: PathBuf, schema: TableSchema, config: LscsConfig) -> Result<Self> {
1116 let lscs = Self::new(path, schema, config)?;
1117 let stats = lscs.recover()?;
1118
1119 if stats.rows_recovered > 0 {
1120 eprintln!(
1121 "LSCS Recovery: restored {} rows from {} transactions",
1122 stats.rows_recovered, stats.transactions_recovered
1123 );
1124 }
1125
1126 Ok(lscs)
1127 }
1128
1129 pub fn recover(&self) -> Result<LscsRecoveryStats> {
1133 let (writes, txn_count) = self.wal.replay_for_recovery()?;
1134
1135 if writes.is_empty() {
1136 return Ok(LscsRecoveryStats::default());
1137 }
1138
1139 let mut max_row_id: u64 = 0;
1140 let mut rows_recovered = 0usize;
1141
1142 for (key, value) in &writes {
1144 if key.len() >= 8 {
1146 let row_id = u64::from_le_bytes(key[..8].try_into().unwrap_or([0; 8]));
1147 if row_id > max_row_id {
1148 max_row_id = row_id;
1149 }
1150
1151 if let Ok(row_values) = Self::deserialize_row(value) {
1153 let value_refs: Vec<Option<&[u8]>> = row_values.iter().map(|v| v.as_deref()).collect();
1154
1155 let memtable = self.active_memtable.read();
1156 if memtable.insert(row_id, &value_refs).is_ok() {
1157 rows_recovered += 1;
1158 }
1159 }
1160 }
1161 }
1162
1163 if max_row_id > 0 {
1165 self.next_row_id.store(max_row_id + 1, Ordering::SeqCst);
1166 }
1167
1168 Ok(LscsRecoveryStats {
1169 transactions_recovered: txn_count,
1170 rows_recovered,
1171 max_row_id,
1172 })
1173 }
1174
1175 pub fn mark_clean_shutdown(&self) -> Result<()> {
1180 self.fsync()?;
1182
1183 let marker_path = self.path.join(".clean_shutdown");
1185 std::fs::write(&marker_path, b"clean")?;
1186
1187 Ok(())
1191 }
1192
1193 pub fn insert(&self, values: &[Option<&[u8]>]) -> Result<RowId> {
1195 let row_id = self.next_row_id.fetch_add(1, Ordering::SeqCst);
1196
1197 let txn_id = self.wal.begin_transaction()?;
1199
1200 let key = row_id.to_le_bytes().to_vec();
1202 let value = self.serialize_row(values)?;
1203 self.wal.write(txn_id, key, value)?;
1204 self.wal.commit_transaction(txn_id)?;
1205
1206 let memtable = self.active_memtable.read();
1208 memtable.insert(row_id, values)?;
1209
1210 if memtable.is_full() {
1212 drop(memtable);
1213 self.rotate_memtable()?;
1214 }
1215
1216 Ok(row_id)
1217 }
1218
1219 pub fn mark_deleted(&self, row_id: RowId, _caller_txn_id: u64, txn_end: u64) -> Result<()> {
1233 let txn_end_idx = self
1235 .schema
1236 .columns
1237 .iter()
1238 .position(|c| c.name == "__txn_end")
1239 .ok_or_else(|| {
1240 SochDBError::InvalidData("Schema missing __txn_end column for MVCC".to_string())
1241 })?;
1242
1243 let current = self
1245 .get(row_id)?
1246 .ok_or_else(|| SochDBError::NotFound(format!("Row {} not found", row_id)))?;
1247
1248 let mut new_values: Vec<Option<Vec<u8>>> = current;
1250 new_values[txn_end_idx] = Some(txn_end.to_le_bytes().to_vec());
1251
1252 let value_refs: Vec<Option<&[u8]>> = new_values.iter().map(|v| v.as_deref()).collect();
1254
1255 let wal_txn_id = self.wal.begin_transaction()?;
1258
1259 let row_data = self.serialize_row(&value_refs)?;
1261 self.wal.write(wal_txn_id, row_id.to_le_bytes().to_vec(), row_data)?;
1262
1263 self.wal.commit_transaction(wal_txn_id)?;
1265
1266 let memtable = self.active_memtable.read();
1268 memtable.insert(row_id, &value_refs)?;
1269
1270 Ok(())
1271 }
1272
1273 fn serialize_row(&self, values: &[Option<&[u8]>]) -> Result<Vec<u8>> {
1275 use byteorder::{LittleEndian, WriteBytesExt};
1276
1277 let mut buf = Vec::new();
1278 buf.write_u32::<LittleEndian>(values.len() as u32)?;
1279
1280 for value in values {
1281 match value {
1282 Some(data) => {
1283 buf.write_u8(1)?; buf.write_u32::<LittleEndian>(data.len() as u32)?;
1285 buf.extend_from_slice(data);
1286 }
1287 None => {
1288 buf.write_u8(0)?; }
1290 }
1291 }
1292
1293 Ok(buf)
1294 }
1295
1296 #[allow(dead_code)]
1298 fn deserialize_row(data: &[u8]) -> Result<Vec<Option<Vec<u8>>>> {
1299 use byteorder::{LittleEndian, ReadBytesExt};
1300 use std::io::Cursor;
1301
1302 let mut cursor = Cursor::new(data);
1303 let num_cols = cursor.read_u32::<LittleEndian>()? as usize;
1304 let mut values = Vec::with_capacity(num_cols);
1305
1306 for _ in 0..num_cols {
1307 let is_non_null = cursor.read_u8()? == 1;
1308 if is_non_null {
1309 let len = cursor.read_u32::<LittleEndian>()? as usize;
1310 let pos = cursor.position() as usize;
1311 let value = data[pos..pos + len].to_vec();
1312 cursor.set_position((pos + len) as u64);
1313 values.push(Some(value));
1314 } else {
1315 values.push(None);
1316 }
1317 }
1318
1319 Ok(values)
1320 }
1321
1322 pub fn get(&self, row_id: RowId) -> Result<Option<Vec<Option<Vec<u8>>>>> {
1327 {
1329 let memtable = self.active_memtable.read();
1330 if let Some(values) = memtable.get(row_id) {
1331 return Ok(Some(values));
1332 }
1333 }
1334
1335 {
1337 let immutable = self.immutable_memtables.read();
1338 for memtable in immutable.iter().rev() {
1339 if let Some(values) = memtable.get(row_id) {
1340 return Ok(Some(values));
1341 }
1342 }
1343 }
1344
1345 {
1348 use sochdb_core::learned_index::LookupResult;
1349 let groups = self.column_groups.read();
1350 for level in &*groups {
1351 for group in level.iter().rev() {
1352 if let Some(lsi) = &group.lsi {
1353 let lookup = lsi.lookup(row_id);
1355 match lookup {
1356 LookupResult::Exact(_) | LookupResult::Range { .. } => {
1357 if let Some(row) = self.read_row_from_sstable(group, row_id)? {
1359 return Ok(Some(row));
1360 }
1361 }
1362 LookupResult::NotFound => {
1363 continue;
1365 }
1366 }
1367 }
1368 }
1369 }
1370 }
1371
1372 Ok(None)
1373 }
1374
1375 fn read_row_from_sstable(
1377 &self,
1378 group: &ColumnGroup,
1379 row_id: RowId,
1380 ) -> Result<Option<Vec<Option<Vec<u8>>>>> {
1381 use byteorder::{LittleEndian, ReadBytesExt};
1382 use std::fs::File;
1383 use std::io::{BufReader, Read, Seek, SeekFrom};
1384
1385 let file = File::open(group.file_path())?;
1386 let mut reader = BufReader::new(file);
1387
1388 let mut values = Vec::new();
1389
1390 for (col_name, col_idx) in &group.column_offsets {
1392 reader.seek(SeekFrom::Start(col_idx.offset))?;
1393
1394 let col_type = reader.read_u8()?;
1396 let row_count = reader.read_u64::<LittleEndian>()?;
1397
1398 if row_id >= row_count {
1399 values.push(None);
1400 continue;
1401 }
1402
1403 let nulls_len = reader.read_u32::<LittleEndian>()? as usize;
1405 let mut nulls = vec![0u8; nulls_len];
1406 reader.read_exact(&mut nulls)?;
1407
1408 let byte_idx = (row_id / 8) as usize;
1410 let bit_offset = (row_id % 8) as u8;
1411 let is_null = byte_idx >= nulls.len() || (nulls[byte_idx] & (1 << bit_offset)) == 0;
1412
1413 if is_null {
1414 values.push(None);
1415 continue;
1416 }
1417
1418 let col_type = ColumnType::from_byte(col_type).unwrap_or(ColumnType::Binary);
1420 if let Some(fixed_size) = col_type.fixed_size() {
1421 let offsets_section = reader.stream_position()?;
1423 let data_len = reader.read_u32::<LittleEndian>()? as usize;
1424 let _ = data_len;
1425
1426 let row_offset = (row_id as usize) * fixed_size;
1428 reader.seek(SeekFrom::Start(offsets_section + 4 + row_offset as u64))?;
1429
1430 let mut value = vec![0u8; fixed_size];
1431 reader.read_exact(&mut value)?;
1432 values.push(Some(value));
1433 } else {
1434 let offsets_count = reader.read_u32::<LittleEndian>()? as usize;
1436 let mut offsets = vec![0u32; offsets_count];
1437 for offset in offsets.iter_mut().take(offsets_count) {
1438 *offset = reader.read_u32::<LittleEndian>()?;
1439 }
1440
1441 if (row_id as usize + 1) >= offsets.len() {
1442 values.push(None);
1443 continue;
1444 }
1445
1446 let start = offsets[row_id as usize] as usize;
1447 let end = offsets[(row_id + 1) as usize] as usize;
1448
1449 let data_len = reader.read_u32::<LittleEndian>()? as usize;
1451 let data_start = reader.stream_position()?;
1452
1453 if end <= data_len {
1454 reader.seek(SeekFrom::Start(data_start + start as u64))?;
1455 let mut value = vec![0u8; end - start];
1456 reader.read_exact(&mut value)?;
1457 values.push(Some(value));
1458 } else {
1459 values.push(None);
1460 }
1461 }
1462 let _ = col_name; }
1464
1465 if values.is_empty() {
1466 Ok(None)
1467 } else {
1468 Ok(Some(values))
1469 }
1470 }
1471
1472 pub fn fsync(&self) -> Result<()> {
1480 self.wal.sync()?;
1482
1483 let memtable = self.active_memtable.read();
1485 let should_flush = memtable.memory_bytes() > self.config.memtable_size / 2;
1486 drop(memtable);
1487
1488 if should_flush {
1489 self.rotate_memtable()?;
1491 self.flush()?;
1492 }
1493
1494 Ok(())
1495 }
1496
1497 fn rotate_memtable(&self) -> Result<()> {
1499 let new_memtable = ColumnarMemtable::new(self.schema.clone(), self.config.memtable_size);
1500
1501 let old_memtable = {
1502 let mut active = self.active_memtable.write();
1503 std::mem::replace(&mut *active, new_memtable)
1504 };
1505
1506 let mut immutable = self.immutable_memtables.write();
1507 immutable.push(old_memtable);
1508
1509 if immutable.len() >= 2 {
1511 drop(immutable); self.flush()?;
1514 }
1515
1516 Ok(())
1517 }
1518
1519 pub fn flush(&self) -> Result<()> {
1521 let memtables = {
1522 let mut immutable = self.immutable_memtables.write();
1523 std::mem::take(&mut *immutable)
1524 };
1525
1526 for memtable in memtables {
1527 let sequence = self.next_sequence.fetch_add(1, Ordering::SeqCst);
1528 let column_group = ColumnGroup::from_memtable(&self.path, &memtable, 0, sequence)?;
1529
1530 let mut groups = self.column_groups.write();
1531 groups[0].push(column_group);
1532 }
1533
1534 let groups = self.column_groups.read();
1536 if groups[0].len() >= self.config.l0_compaction_threshold {
1537 drop(groups);
1538 self.compact_l0()?;
1539 }
1540
1541 Ok(())
1542 }
1543
    /// Compacts every level-0 column group into one level-1 segment descriptor, using
    /// column temperature to decide how each column is treated.
    ///
    /// Steps, in order:
    /// 1. Snapshot the hot and cold column sets from the temperature tracker. Compute the
    ///    fraction of schema columns that are hot (1.0 for a schema with no columns).
    /// 2. Move all L0 groups out of `column_groups[0]`, leaving L0 empty. Return `Ok(())`
    ///    immediately if there were none; no sequence number is consumed in that case.
    /// 3. Allocate one sequence number for the new L1 segment.
    /// 4. For each L0 group:
    ///    - count its rows;
    ///    - add the byte lengths of its hot-column stripes to the read/write counters;
    ///    - record a `ColumnStripeRef` for each cold column, pointing back into the
    ///      *original* L0 file (`segment.level`, `segment.sequence`).
    /// 5. Register a level-1 `SegmentDescriptor` under the new sequence and update
    ///    `compaction_stats`.
    /// 6. Drop the L0 `ColumnGroup` handles.
    ///
    /// NOTE(review): no hot-column bytes are written anywhere. The L1 output path is
    /// computed but unused, and the descriptor holds refs for cold columns only. Because
    /// the L0 groups are also removed from `column_groups`, hot-column data appears to
    /// become unreachable through this store's indexes after compaction. Confirm this is
    /// a known placeholder.
    ///
    /// NOTE(review): `col_refs` is keyed by column name. With several L0 groups, each
    /// cold column's ref is overwritten and only the last group's stripe is kept.
    ///
    /// NOTE(review): `min_row_id` / `max_row_id` are never updated, so the descriptor
    /// carries the inverted range `(u64::MAX, 0)`. A range check of the form
    /// `max_row_id < min || min_row_id > max` treats this segment as outside almost every
    /// row range.
    ///
    /// NOTE(review): the cold refs stay valid only while the L0 files exist. Confirm that
    /// dropping a `ColumnGroup` does not delete its file.
    ///
    /// # Errors
    /// Every path in this body returns `Ok(())`.
    ///
    /// # Panics
    /// Panics if `column_groups` has no level 0, or if the system clock reads earlier than
    /// the Unix epoch (the `unwrap` on `duration_since`).
    fn compact_l0(&self) -> Result<()> {
        let start_time = std::time::Instant::now();

        let hot_columns = self.temperature_tracker.get_hot_columns();
        let cold_columns = self.temperature_tracker.get_cold_columns();

        // Share of schema columns that are currently hot; feeds the write-amplification
        // estimate recorded in the stats below.
        let total_columns = self.schema.columns.len();
        let hot_fraction = if total_columns > 0 {
            hot_columns.len() as f64 / total_columns as f64
        } else {
            1.0
        };

        // Take ownership of the whole L0 level; L0 is left empty from here on.
        let l0_segments: Vec<ColumnGroup> = {
            let mut groups = self.column_groups.write();
            std::mem::take(&mut groups[0])
        };

        if l0_segments.is_empty() {
            return Ok(());
        }

        let mut bytes_read = 0u64;
        let mut bytes_written = 0u64;
        let mut cold_refs_preserved = 0u64;

        let sequence = self.next_sequence.fetch_add(1, Ordering::SeqCst);

        // Intended L1 output file; currently unused (nothing is written to it).
        let _merged_path = self.path.join(format!("L1_seq{}.sst", sequence));

        let mut col_refs = HashMap::new();
        let mut total_row_count = 0u64;
        // Never updated below, so the descriptor gets the inverted range (MAX, 0).
        let min_row_id = u64::MAX;
        let max_row_id = 0u64;

        for segment in &l0_segments {
            // NOTE(review): a fixed 100 bytes per row looks like a rough read-cost
            // estimate; the constant is not derived from the data.
            bytes_read += segment.row_count * 100;
            total_row_count += segment.row_count;

            // Hot columns: only accounted for in the byte counters; no data is copied.
            for col_name in &hot_columns {
                if let Some(col_idx) = segment.column_offsets.get(col_name) {
                    bytes_read += col_idx.length;
                    bytes_written += col_idx.length;
                }
            }

            // Cold columns: referenced in place inside the L0 file instead of rewritten.
            // A later segment's ref replaces an earlier one for the same column name.
            for col_name in &cold_columns {
                if let Some(col_idx) = segment.column_offsets.get(col_name) {
                    let stripe_ref = ColumnStripeRef::new(
                        segment.level,
                        segment.sequence,
                        col_name.clone(),
                        col_idx.offset,
                        col_idx.length,
                        segment.row_count,
                    );
                    col_refs.insert(col_name.clone(), stripe_ref);
                    cold_refs_preserved += 1;
                }
            }
        }

        let segment_desc = SegmentDescriptor {
            id: sequence,
            level: 1,
            col_refs,
            min_row_id,
            max_row_id,
            row_count: total_row_count,
            min_timestamp: 0,
            max_timestamp: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_micros() as u64,
            is_tombstone: false,
        };

        {
            let mut descriptors = self.segment_descriptors.write();
            descriptors.insert(sequence, segment_desc);
        }

        {
            let mut stats = self.compaction_stats.write();
            stats.compactions_total += 1;
            stats.l0_compactions += 1;
            stats.bytes_read += bytes_read;
            stats.bytes_written += bytes_written;
            stats.cold_column_refs_preserved += cold_refs_preserved;
            // Incremented even when `hot_columns` is empty.
            stats.hot_column_compactions += 1;
            // 1 / hot_fraction, with the fraction floored at 0.01 so the estimate is
            // capped at 100x.
            stats.estimated_wa_reduction = 1.0 / hot_fraction.max(0.01);
            stats.last_compaction_duration_us = start_time.elapsed().as_micros() as u64;
        }

        // Release the L0 handles; they are no longer referenced by `column_groups`.
        for segment in l0_segments {
            let _ = segment;
        }

        Ok(())
    }
1673
1674 #[allow(dead_code)]
1686 fn selective_merge_hot_columns(
1687 &self,
1688 segments: &[&ColumnGroup],
1689 hot_columns: &HashSet<String>,
1690 output_path: &Path,
1691 ) -> Result<HashMap<String, ColumnStripeRef>> {
1692 use byteorder::{LittleEndian, WriteBytesExt};
1693 use std::fs::File;
1694 use std::io::{BufWriter, Seek, Write};
1695
1696 let mut result = HashMap::new();
1697
1698 let file = File::create(output_path)?;
1700 let mut writer = BufWriter::new(file);
1701
1702 writer.write_all(&ColumnGroup::MAGIC)?;
1704 writer.write_u32::<LittleEndian>(ColumnGroup::VERSION)?;
1705
1706 let sequence = self.next_sequence.load(Ordering::SeqCst);
1707
1708 for col_name in hot_columns {
1710 let start_offset = writer.stream_position()?;
1711
1712 let mut merged_data = Vec::new();
1714 let mut row_count = 0u64;
1715
1716 for segment in segments {
1717 if let Some(_col_idx) = segment.column_offsets.get(col_name) {
1718 merged_data.extend_from_slice(&[0u8; 0]); row_count += segment.row_count;
1722 }
1723 }
1724
1725 writer.write_u64::<LittleEndian>(row_count)?;
1728 writer.write_all(&merged_data)?;
1729
1730 let end_offset = writer.stream_position()?;
1731
1732 let stripe_ref = ColumnStripeRef::new(
1734 1, sequence,
1736 col_name.clone(),
1737 start_offset,
1738 end_offset - start_offset,
1739 row_count,
1740 );
1741 result.insert(col_name.clone(), stripe_ref);
1742 }
1743
1744 writer.flush()?;
1745 Ok(result)
1746 }
1747
1748 pub fn scan_columns(
1753 &self,
1754 column_names: &[&str],
1755 row_range: Option<(RowId, RowId)>,
1756 ) -> Result<Vec<Vec<u8>>> {
1757 let mut results = Vec::new();
1758
1759 let descriptors = self.segment_descriptors.read();
1761
1762 for (_seg_id, descriptor) in descriptors.iter() {
1763 if let Some((min, max)) = row_range
1765 && (descriptor.max_row_id < min || descriptor.min_row_id > max)
1766 {
1767 continue;
1768 }
1769
1770 for col_name in column_names {
1772 if let Some(stripe_ref) = descriptor.col_refs.get(*col_name) {
1773 let data = self.read_column_stripe(stripe_ref)?;
1775 results.push(data);
1776 }
1777 }
1778 }
1779
1780 Ok(results)
1781 }
1782
1783 fn read_column_stripe(&self, stripe_ref: &ColumnStripeRef) -> Result<Vec<u8>> {
1785 use std::fs::File;
1786 use std::io::{Read, Seek, SeekFrom};
1787
1788 let file_path = self.path.join(format!(
1790 "L{}_seq{}.sst",
1791 stripe_ref.level, stripe_ref.segment_id
1792 ));
1793
1794 let mut file = File::open(&file_path)?;
1795 file.seek(SeekFrom::Start(stripe_ref.offset))?;
1796
1797 let mut data = vec![0u8; stripe_ref.length as usize];
1798 file.read_exact(&mut data)?;
1799
1800 Ok(data)
1801 }
1802
1803 pub fn compaction_stats(&self) -> CompactionStats {
1805 self.compaction_stats.read().clone()
1806 }
1807
1808 pub fn compact(&self) -> Result<()> {
1812 self.compact_l0()
1813 }
1814
1815 pub fn column_temperatures(&self) -> Vec<ColumnTemperature> {
1817 self.temperature_tracker.get_all_temperatures()
1818 }
1819
1820 #[allow(clippy::type_complexity)]
1825 pub fn scan_range(
1826 &self,
1827 start: RowId,
1828 end: RowId,
1829 ) -> Result<Vec<(RowId, Vec<Option<Vec<u8>>>)>> {
1830 let mut results = Vec::new();
1831 let mut seen = std::collections::HashSet::new();
1832
1833 {
1835 let memtable = self.active_memtable.read();
1836 for (row_id, values) in memtable.scan_range(start, end) {
1837 if seen.insert(row_id) {
1838 results.push((row_id, values));
1839 }
1840 }
1841 }
1842
1843 {
1845 let immutable = self.immutable_memtables.read();
1846 for memtable in immutable.iter().rev() {
1847 for (row_id, values) in memtable.scan_range(start, end) {
1848 if seen.insert(row_id) {
1849 results.push((row_id, values));
1850 }
1851 }
1852 }
1853 }
1854
1855 results.sort_by_key(|(id, _)| *id);
1860
1861 Ok(results)
1862 }
1863
1864 #[allow(clippy::type_complexity)]
1868 pub fn scan_columns_range(
1869 &self,
1870 start: RowId,
1871 end: RowId,
1872 col_indices: &[usize],
1873 ) -> Result<Vec<(RowId, Vec<Option<Vec<u8>>>)>> {
1874 let mut results = Vec::new();
1875 let mut seen = std::collections::HashSet::new();
1876
1877 {
1879 let memtable = self.active_memtable.read();
1880 let row_ids = memtable.row_ids.read();
1881
1882 for (&row_id, _) in row_ids.range(start..=end) {
1883 if seen.insert(row_id)
1884 && let Some(values) = memtable.get_columns(row_id, col_indices)
1885 {
1886 results.push((row_id, values));
1887 }
1888 }
1889 }
1890
1891 {
1893 let immutable = self.immutable_memtables.read();
1894 for memtable in immutable.iter().rev() {
1895 let row_ids = memtable.row_ids.read();
1896 for (&row_id, _) in row_ids.range(start..=end) {
1897 if seen.insert(row_id)
1898 && let Some(values) = memtable.get_columns(row_id, col_indices)
1899 {
1900 results.push((row_id, values));
1901 }
1902 }
1903 }
1904 }
1905
1906 results.sort_by_key(|(id, _)| *id);
1908
1909 Ok(results)
1910 }
1911
1912 pub fn stats(&self) -> LscsStats {
1914 let active = self.active_memtable.read();
1915 let immutable = self.immutable_memtables.read();
1916 let groups = self.column_groups.read();
1917
1918 let mut level_sizes = vec![0u64; self.config.num_levels];
1919 let mut disk_bytes = 0u64;
1920
1921 for (i, level) in groups.iter().enumerate() {
1922 for group in level {
1923 level_sizes[i] += group.row_count;
1924 if let Ok(metadata) = std::fs::metadata(&group.path) {
1926 disk_bytes += metadata.len();
1927 }
1928 }
1929 }
1930
1931 LscsStats {
1932 active_memtable_bytes: active.memory_bytes(),
1933 immutable_memtables: immutable.len(),
1934 level_row_counts: level_sizes,
1935 next_row_id: self.next_row_id.load(Ordering::SeqCst),
1936 disk_bytes,
1937 }
1938 }
1939
1940 pub fn wal(&self) -> &Arc<TxnWal> {
1942 &self.wal
1943 }
1944}
1945
/// Snapshot of store statistics, as returned by `Lscs::stats`.
///
/// `Default` yields an all-zero snapshot with no levels. `PartialEq`/`Eq` allow snapshots
/// to be compared directly, for example in tests.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct LscsStats {
    /// Value reported by the active memtable's `memory_bytes()`.
    pub active_memtable_bytes: usize,
    /// Number of immutable memtables not yet flushed.
    pub immutable_memtables: usize,
    /// Sum of column-group `row_count`s per level, indexed by level number.
    pub level_row_counts: Vec<u64>,
    /// Value of the store's `next_row_id` counter when the snapshot was taken.
    pub next_row_id: u64,
    /// Total size in bytes of the column-group files whose metadata could be read.
    pub disk_bytes: u64,
}
1960
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Three-column `users` schema shared by most tests:
    /// `id` (UInt64, non-null), `name` (Text, non-null), `score` (Float64, nullable).
    fn test_schema() -> TableSchema {
        TableSchema {
            name: "users".to_string(),
            columns: vec![
                ColumnDef {
                    name: "id".to_string(),
                    col_type: ColumnType::UInt64,
                    nullable: false,
                },
                ColumnDef {
                    name: "name".to_string(),
                    col_type: ColumnType::Text,
                    nullable: false,
                },
                ColumnDef {
                    name: "score".to_string(),
                    col_type: ColumnType::Float64,
                    nullable: true,
                },
            ],
        }
    }

    /// Inserting one fully populated row bumps the memtable row count to 1.
    /// Values are passed as raw bytes: little-endian numbers and UTF-8 text.
    #[test]
    fn test_columnar_memtable_insert() {
        let schema = test_schema();
        let memtable = ColumnarMemtable::new(schema, 1024 * 1024);

        let id: u64 = 1;
        let name = "Alice";
        let score: f64 = 95.5;

        memtable
            .insert(
                1,
                &[
                    Some(&id.to_le_bytes()),
                    Some(name.as_bytes()),
                    Some(&score.to_le_bytes()),
                ],
            )
            .unwrap();

        assert_eq!(memtable.row_count(), 1);
    }

    /// A `None` value for the nullable `score` column is accepted.
    #[test]
    fn test_columnar_memtable_with_nulls() {
        let schema = test_schema();
        let memtable = ColumnarMemtable::new(schema, 1024 * 1024);

        let id: u64 = 1;
        let name = "Bob";

        memtable
            // `score` is nullable, so the third value is omitted.
            .insert(1, &[Some(&id.to_le_bytes()), Some(name.as_bytes()), None])
            .unwrap();

        assert_eq!(memtable.row_count(), 1);
    }

    /// The first insert into a fresh store gets row id 1 and is counted in the active
    /// memtable's byte total.
    #[test]
    fn test_lscs_basic() {
        let dir = tempdir().unwrap();
        let schema = test_schema();
        let config = LscsConfig {
            memtable_size: 1024,
            ..Default::default()
        };

        let lscs = Lscs::new(dir.path().to_path_buf(), schema, config).unwrap();

        let id: u64 = 1;
        let name = "Charlie";
        let score: f64 = 87.2;

        let row_id = lscs
            .insert(&[
                Some(&id.to_le_bytes()),
                Some(name.as_bytes()),
                Some(&score.to_le_bytes()),
            ])
            .unwrap();

        assert_eq!(row_id, 1);

        let stats = lscs.stats();
        assert!(stats.active_memtable_bytes > 0);
    }

    /// Writes a memtable to an `.sst` column group, reopens it, and checks the column
    /// offsets and the `lsi` index that come back.
    #[test]
    fn test_column_group_write() {
        let dir = tempfile::tempdir().unwrap();
        let schema = TableSchema::new(
            "users".to_string(),
            vec![
                ColumnDef {
                    name: "id".to_string(),
                    col_type: ColumnType::UInt64,
                    nullable: false,
                },
                ColumnDef {
                    name: "name".to_string(),
                    col_type: ColumnType::Text,
                    nullable: false,
                },
                ColumnDef {
                    name: "score".to_string(),
                    col_type: ColumnType::Float64,
                    nullable: true,
                },
            ],
        )
        // `with_mvcc()` presumably appends two extra columns; the rows below carry five
        // values and the reopened group reports five column offsets (TODO confirm names).
        .with_mvcc();
        let memtable = ColumnarMemtable::new(schema.clone(), 1024 * 1024);

        memtable
            .insert(
                1,
                &[
                    Some(&1u64.to_le_bytes()),
                    Some(b"Alice"),
                    Some(&95.5f64.to_le_bytes()),
                    // Values for the two columns added by `with_mvcc()`.
                    Some(&100u64.to_le_bytes()),
                    Some(&0u64.to_le_bytes()),
                ],
            )
            .unwrap();

        memtable
            .insert(
                2,
                &[
                    Some(&2u64.to_le_bytes()),
                    Some(b"Bob"),
                    Some(&87.2f64.to_le_bytes()),
                    // Values for the two columns added by `with_mvcc()`.
                    Some(&100u64.to_le_bytes()),
                    Some(&200u64.to_le_bytes()),
                ],
            )
            .unwrap();

        // Level 0, sequence 1.
        let cg = ColumnGroup::from_memtable(dir.path(), &memtable, 0, 1).unwrap();
        let file_path = cg.file_path();
        assert!(file_path.exists());
        assert!(file_path.extension().unwrap() == "sst");

        let cg_opened = ColumnGroup::open(file_path.to_path_buf(), schema, 0, 1).unwrap();
        // Three user columns plus the two `with_mvcc()` columns.
        assert_eq!(cg_opened.column_offsets.len(), 5);
        assert!(cg_opened.lsi.is_some());
        let lsi = cg_opened.lsi.as_ref().unwrap();
        assert!(lsi.stats().num_keys > 0);
    }

    /// `get` returns every column of a stored row, `None` for a null column, and
    /// `None` for an unknown row id.
    #[test]
    fn test_memtable_get() {
        let schema = test_schema();
        let memtable = ColumnarMemtable::new(schema, 1024 * 1024);

        let id1: u64 = 1;
        let name1 = "Alice";
        let score1: f64 = 95.5;
        memtable
            .insert(
                1,
                &[
                    Some(&id1.to_le_bytes()),
                    Some(name1.as_bytes()),
                    Some(&score1.to_le_bytes()),
                ],
            )
            .unwrap();

        let id2: u64 = 2;
        let name2 = "Bob";
        memtable
            .insert(
                2,
                &[
                    Some(&id2.to_le_bytes()),
                    Some(name2.as_bytes()),
                    // Null score.
                    None,
                ],
            )
            .unwrap();

        let row1 = memtable.get(1).unwrap();
        assert_eq!(row1.len(), 3);
        assert_eq!(
            u64::from_le_bytes(row1[0].as_ref().unwrap()[..].try_into().unwrap()),
            1
        );
        assert_eq!(
            std::str::from_utf8(row1[1].as_ref().unwrap()).unwrap(),
            "Alice"
        );

        let row2 = memtable.get(2).unwrap();
        // The null score round-trips as `None`.
        assert!(row2[2].is_none());
        // Unknown row id.
        assert!(memtable.get(999).is_none());
    }

    /// `scan_range` bounds are inclusive on both ends: ids 3..=7 yield five rows.
    #[test]
    fn test_memtable_scan_range() {
        let schema = test_schema();
        let memtable = ColumnarMemtable::new(schema, 1024 * 1024);

        // Row ids 1..=10, with the id column equal to the row id.
        for i in 1..=10 {
            memtable
                .insert(
                    i,
                    &[
                        Some(&i.to_le_bytes()),
                        Some(format!("User{}", i).as_bytes()),
                        Some(&((i as f64) * 10.0).to_le_bytes()),
                    ],
                )
                .unwrap();
        }

        let results = memtable.scan_range(3, 7);
        assert_eq!(results.len(), 5);

        for (row_id, _) in &results {
            assert!(*row_id >= 3 && *row_id <= 7);
        }
    }

    /// A row inserted through `Lscs` can be read back by its returned row id.
    #[test]
    fn test_lscs_get() {
        let dir = tempdir().unwrap();
        let schema = test_schema();
        let config = LscsConfig {
            memtable_size: 64 * 1024 * 1024,
            ..Default::default()
        };

        let lscs = Lscs::new(dir.path().to_path_buf(), schema, config).unwrap();

        let id: u64 = 42;
        let name = "TestUser";
        let score: f64 = 99.9;

        let row_id = lscs
            .insert(&[
                Some(&id.to_le_bytes()),
                Some(name.as_bytes()),
                Some(&score.to_le_bytes()),
            ])
            .unwrap();

        let result = lscs.get(row_id).unwrap();
        assert!(result.is_some());

        let values = result.unwrap();
        assert_eq!(
            u64::from_le_bytes(values[0].as_ref().unwrap()[..].try_into().unwrap()),
            42
        );
        assert_eq!(
            std::str::from_utf8(values[1].as_ref().unwrap()).unwrap(),
            "TestUser"
        );
    }

    /// After several inserts, `fsync` succeeds and the first row is still readable.
    #[test]
    fn test_lscs_fsync() {
        let dir = tempdir().unwrap();
        let schema = test_schema();
        let config = LscsConfig::default();

        let lscs = Lscs::new(dir.path().to_path_buf(), schema, config).unwrap();

        for i in 1..=5 {
            lscs.insert(&[
                Some(&(i as u64).to_le_bytes()),
                Some(format!("User{}", i).as_bytes()),
                Some(&((i as f64) * 10.0).to_le_bytes()),
            ])
            .unwrap();
        }

        lscs.fsync().unwrap();

        // Row ids start at 1 (see `test_lscs_basic`).
        let result = lscs.get(1).unwrap();
        assert!(result.is_some());
    }
}