1use parking_lot::RwLock;
62use serde::{Deserialize, Serialize};
63use std::collections::{BTreeMap, HashMap, HashSet};
64use std::path::{Path, PathBuf};
65use std::sync::Arc;
66use std::sync::atomic::{AtomicU64, Ordering};
67use sochdb_core::{Result, SochDBError};
68
69use crate::txn_wal::TxnWal;
70
/// Update statistics for a single column, used to classify it as hot or cold.
///
/// `temperature` is an exponential moving average of the column's share of update
/// events per window (see `ColumnTemperature::update_ema`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnTemperature {
    /// Column name.
    pub name: String,
    /// EMA of this column's share of update events per window.
    pub temperature: f64,
    /// Updates recorded in the current window (reset by `update_ema`).
    pub window_updates: u64,
    /// Updates recorded since this entry was created.
    pub total_updates: u64,
    /// Wall-clock time of the latest update, in microseconds since the Unix epoch.
    pub last_update_us: u64,
}
92
93impl ColumnTemperature {
94 pub fn new(name: String) -> Self {
96 Self {
97 name,
98 temperature: 0.0,
99 window_updates: 0,
100 total_updates: 0,
101 last_update_us: 0,
102 }
103 }
104
105 pub fn record_update(&mut self) {
107 self.window_updates += 1;
108 self.total_updates += 1;
109 self.last_update_us = std::time::SystemTime::now()
110 .duration_since(std::time::UNIX_EPOCH)
111 .unwrap()
112 .as_micros() as u64;
113 }
114
115 pub fn update_ema(&mut self, total_window_updates: u64) {
120 const ALPHA: f64 = 0.1;
121
122 let temp_current = if total_window_updates > 0 {
123 self.window_updates as f64 / total_window_updates as f64
124 } else {
125 0.0
126 };
127
128 self.temperature = ALPHA * temp_current + (1.0 - ALPHA) * self.temperature;
129 self.window_updates = 0;
130 }
131
132 pub fn is_hot(&self, threshold: f64) -> bool {
134 self.temperature > threshold
135 }
136}
137
138#[derive(Debug)]
140pub struct ColumnTemperatureTracker {
141 columns: RwLock<HashMap<String, ColumnTemperature>>,
143 window_size: u64,
145 window_updates: AtomicU64,
147 hot_threshold: f64,
149}
150
151impl ColumnTemperatureTracker {
152 pub fn new(column_names: &[String], window_size: u64) -> Self {
154 let mut columns = HashMap::new();
155 for name in column_names {
156 columns.insert(name.clone(), ColumnTemperature::new(name.clone()));
157 }
158 Self {
159 columns: RwLock::new(columns),
160 window_size,
161 window_updates: AtomicU64::new(0),
162 hot_threshold: 0.1,
163 }
164 }
165
166 pub fn record_updates(&self, column_names: &[&str]) {
168 let mut cols = self.columns.write();
169 for name in column_names {
170 if let Some(temp) = cols.get_mut(*name) {
171 temp.record_update();
172 }
173 }
174
175 let total = self.window_updates.fetch_add(1, Ordering::SeqCst) + 1;
176
177 if total >= self.window_size {
179 self.update_all_ema(&mut cols, total);
180 self.window_updates.store(0, Ordering::SeqCst);
181 }
182 }
183
184 fn update_all_ema(&self, cols: &mut HashMap<String, ColumnTemperature>, total: u64) {
185 for temp in cols.values_mut() {
186 temp.update_ema(total);
187 }
188 }
189
190 pub fn get_hot_columns(&self) -> HashSet<String> {
192 let cols = self.columns.read();
193 cols.values()
194 .filter(|t| t.is_hot(self.hot_threshold))
195 .map(|t| t.name.clone())
196 .collect()
197 }
198
199 pub fn get_cold_columns(&self) -> HashSet<String> {
201 let cols = self.columns.read();
202 cols.values()
203 .filter(|t| !t.is_hot(self.hot_threshold))
204 .map(|t| t.name.clone())
205 .collect()
206 }
207
208 pub fn get_all_temperatures(&self) -> Vec<ColumnTemperature> {
210 self.columns.read().values().cloned().collect()
211 }
212
213 pub fn set_hot_threshold(&self, _threshold: f64) {
215 }
218}
219
/// Reference to one column's byte range inside a segment file, so compaction can point
/// at existing data instead of rewriting it.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ColumnStripeRef {
    /// Level of the segment holding the stripe.
    pub level: u32,
    /// Id (sequence number) of the segment holding the stripe.
    pub segment_id: u64,
    /// Column the stripe belongs to.
    pub column_name: String,
    /// Byte offset of the column section within the segment file.
    pub offset: u64,
    /// Length of the column section in bytes.
    pub length: u64,
    /// Number of rows covered by the stripe.
    pub row_count: u64,
    /// Compression marker; `ColumnStripeRef::new` sets it to 0.
    pub compression: u8,
}
246
247impl ColumnStripeRef {
248 pub fn new(
250 level: u32,
251 segment_id: u64,
252 column_name: String,
253 offset: u64,
254 length: u64,
255 row_count: u64,
256 ) -> Self {
257 Self {
258 level,
259 segment_id,
260 column_name,
261 offset,
262 length,
263 row_count,
264 compression: 0,
265 }
266 }
267
268 pub fn relocate(&self, new_level: u32, new_segment_id: u64, new_offset: u64) -> Self {
270 Self {
271 level: new_level,
272 segment_id: new_segment_id,
273 column_name: self.column_name.clone(),
274 offset: new_offset,
275 length: self.length,
276 row_count: self.row_count,
277 compression: self.compression,
278 }
279 }
280}
281
/// Metadata for a segment assembled from column stripe references.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SegmentDescriptor {
    /// Segment id; `Lscs::compact_l0` uses its compaction sequence number.
    pub id: u64,
    /// Level the segment belongs to.
    pub level: u32,
    /// Column name → stripe holding that column's data.
    pub col_refs: HashMap<String, ColumnStripeRef>,
    /// Smallest row id in the segment.
    pub min_row_id: RowId,
    /// Largest row id in the segment.
    pub max_row_id: RowId,
    /// Total number of rows in the segment.
    pub row_count: u64,
    /// Lower timestamp bound (microseconds since the Unix epoch, as written by `compact_l0`).
    pub min_timestamp: u64,
    /// Upper timestamp bound (microseconds since the Unix epoch, as written by `compact_l0`).
    pub max_timestamp: u64,
    /// Whether the segment is a tombstone.
    pub is_tombstone: bool,
}
307
/// Numeric column identifier.
pub type ColumnId = u32;

/// Row identifier; `Lscs::insert` hands these out from a counter that starts at 1.
pub type RowId = u64;
313
/// Physical type of a column. The discriminant is the tag byte written into
/// column-group files.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum ColumnType {
    Bool = 0,
    Int64 = 1,
    UInt64 = 2,
    Float64 = 3,
    Text = 4,
    Binary = 5,
    Timestamp = 6,
}

impl ColumnType {
    /// Every variant, stored at the index equal to its tag byte.
    const BY_TAG: [ColumnType; 7] = [
        ColumnType::Bool,
        ColumnType::Int64,
        ColumnType::UInt64,
        ColumnType::Float64,
        ColumnType::Text,
        ColumnType::Binary,
        ColumnType::Timestamp,
    ];

    /// Width in bytes of one value, or `None` for variable-length types.
    pub fn fixed_size(&self) -> Option<usize> {
        match *self {
            Self::Text | Self::Binary => None,
            Self::Bool => Some(1),
            Self::Int64 | Self::UInt64 | Self::Float64 | Self::Timestamp => Some(8),
        }
    }

    /// Decodes a tag byte; unknown tags yield `None`.
    pub fn from_byte(b: u8) -> Option<Self> {
        Self::BY_TAG.get(usize::from(b)).copied()
    }
}
354
355#[derive(Debug, Clone)]
357pub struct TableSchema {
358 pub name: String,
360 pub columns: Vec<ColumnDef>,
362}
363
364impl TableSchema {
365 pub fn new(name: String, columns: Vec<ColumnDef>) -> Self {
366 Self { name, columns }
367 }
368
369 pub fn with_mvcc(mut self) -> Self {
371 if !self.columns.iter().any(|c| c.name == "__txn_start") {
372 self.columns.push(ColumnDef {
373 name: "__txn_start".to_string(),
374 col_type: ColumnType::UInt64,
375 nullable: false,
376 });
377 }
378 if !self.columns.iter().any(|c| c.name == "__txn_end") {
379 self.columns.push(ColumnDef {
380 name: "__txn_end".to_string(),
381 col_type: ColumnType::UInt64,
382 nullable: false, });
384 }
385 self
386 }
387}
388
/// Definition of one column in a [`TableSchema`].
#[derive(Debug, Clone)]
pub struct ColumnDef {
    /// Column name.
    pub name: String,
    /// Physical value type.
    pub col_type: ColumnType,
    /// Declared nullability (not checked by `ColumnarMemtable::insert`).
    pub nullable: bool,
}
399
/// Append-only in-memory storage for one column of a [`ColumnarMemtable`].
#[derive(Debug)]
struct ColumnBuffer {
    /// Physical type of the stored values.
    col_type: ColumnType,
    /// Concatenated value bytes (see `append` for the layout).
    data: Vec<u8>,
    /// Validity bitmap, LSB-first within each byte: bit `i` set means row `i` has a value.
    nulls: Vec<u8>,
    /// For variable-size types, end offsets into `data` starting with `0`, so row `i`
    /// spans `offsets[i]..offsets[i + 1]`; `None` for fixed-size types.
    offsets: Option<Vec<u32>>,
    /// Number of appended rows, null or not.
    row_count: u64,
}
414
415impl ColumnBuffer {
416 fn new(col_type: ColumnType) -> Self {
417 Self {
418 col_type,
419 data: Vec::new(),
420 nulls: Vec::new(),
421 offsets: if col_type.fixed_size().is_none() {
422 Some(vec![0]) } else {
424 None
425 },
426 row_count: 0,
427 }
428 }
429
430 fn append(&mut self, value: Option<&[u8]>) {
432 let bit_idx = self.row_count as usize;
434 let byte_idx = bit_idx / 8;
435 let bit_offset = bit_idx % 8;
436
437 while self.nulls.len() <= byte_idx {
438 self.nulls.push(0);
439 }
440
441 if let Some(data) = value {
442 self.nulls[byte_idx] |= 1 << bit_offset;
444
445 self.data.extend_from_slice(data);
447
448 if let Some(offsets) = &mut self.offsets {
450 offsets.push(self.data.len() as u32);
451 }
452 } else if let Some(offsets) = &mut self.offsets {
453 let last = *offsets.last().unwrap();
455 offsets.push(last);
456 }
457
458 self.row_count += 1;
459 }
460
461 fn is_null(&self, row_idx: u64) -> bool {
463 if row_idx >= self.row_count {
464 return true; }
466 let byte_idx = (row_idx / 8) as usize;
467 let bit_offset = (row_idx % 8) as u8;
468
469 if byte_idx >= self.nulls.len() {
470 return true;
471 }
472
473 (self.nulls[byte_idx] & (1 << bit_offset)) == 0
474 }
475
476 fn get(&self, row_idx: u64) -> Option<Vec<u8>> {
479 if row_idx >= self.row_count || self.is_null(row_idx) {
480 return None;
481 }
482
483 if let Some(fixed_size) = self.col_type.fixed_size() {
484 let start = (row_idx as usize) * fixed_size;
486 let end = start + fixed_size;
487 if end <= self.data.len() {
488 Some(self.data[start..end].to_vec())
489 } else {
490 None
491 }
492 } else {
493 if let Some(offsets) = &self.offsets {
495 let start = offsets[row_idx as usize] as usize;
496 let end = offsets[(row_idx + 1) as usize] as usize;
497 if end <= self.data.len() {
498 Some(self.data[start..end].to_vec())
499 } else {
500 None
501 }
502 } else {
503 None
504 }
505 }
506 }
507
508 fn memory_bytes(&self) -> usize {
510 self.data.len() + self.nulls.len() + self.offsets.as_ref().map(|o| o.len() * 4).unwrap_or(0)
511 }
512}
513
/// Write buffer that stores rows column by column until it is flushed to a
/// [`ColumnGroup`].
#[derive(Debug)]
pub struct ColumnarMemtable {
    /// Table schema; there is one column buffer per schema column, in the same order.
    schema: TableSchema,
    /// Per-column append buffers, indexed like `schema.columns`.
    columns: Vec<RwLock<ColumnBuffer>>,
    /// Row id → row position in the column buffers (re-inserting an id repoints it).
    row_ids: RwLock<BTreeMap<RowId, u64>>,
    /// Row position → row id.
    row_idx_to_id: RwLock<Vec<RowId>>,
    /// Next row position to hand out; equals the number of appended rows.
    next_row_idx: AtomicU64,
    /// Sum of the lengths of all non-null values inserted.
    bytes_written: AtomicU64,
    /// `bytes_written` level at which `is_full` reports `true`.
    size_limit: usize,
}
532
533impl ColumnarMemtable {
534 pub fn new(schema: TableSchema, size_limit: usize) -> Self {
536 let columns = schema
537 .columns
538 .iter()
539 .map(|def| RwLock::new(ColumnBuffer::new(def.col_type)))
540 .collect();
541
542 Self {
543 schema,
544 columns,
545 row_ids: RwLock::new(BTreeMap::new()),
546 row_idx_to_id: RwLock::new(Vec::new()),
547 next_row_idx: AtomicU64::new(0),
548 bytes_written: AtomicU64::new(0),
549 size_limit,
550 }
551 }
552
553 pub fn insert(&self, row_id: RowId, values: &[Option<&[u8]>]) -> Result<()> {
557 if values.len() != self.schema.columns.len() {
558 return Err(SochDBError::InvalidData(format!(
559 "Expected {} columns, got {}",
560 self.schema.columns.len(),
561 values.len()
562 )));
563 }
564
565 let row_idx = self.next_row_idx.fetch_add(1, Ordering::SeqCst);
566
567 let mut bytes = 0usize;
569 for (i, value) in values.iter().enumerate() {
570 let mut col = self.columns[i].write();
571 if let Some(data) = value {
572 bytes += data.len();
573 }
574 col.append(*value);
575 }
576
577 {
579 let mut ids = self.row_ids.write();
580 ids.insert(row_id, row_idx);
581 }
582 {
583 let mut idx_to_id = self.row_idx_to_id.write();
584 while idx_to_id.len() <= row_idx as usize {
586 idx_to_id.push(0); }
588 idx_to_id[row_idx as usize] = row_id;
589 }
590
591 self.bytes_written
592 .fetch_add(bytes as u64, Ordering::Relaxed);
593
594 Ok(())
595 }
596
597 pub fn get(&self, row_id: RowId) -> Option<Vec<Option<Vec<u8>>>> {
600 let row_ids = self.row_ids.read();
602 let row_idx = *row_ids.get(&row_id)?;
603 drop(row_ids);
604
605 let mut values = Vec::with_capacity(self.columns.len());
607 for col in &self.columns {
608 let col_buf = col.read();
609 values.push(col_buf.get(row_idx));
610 }
611
612 Some(values)
613 }
614
615 pub fn get_columns(
617 &self,
618 row_id: RowId,
619 col_indices: &[usize],
620 ) -> Option<Vec<Option<Vec<u8>>>> {
621 let row_ids = self.row_ids.read();
623 let row_idx = *row_ids.get(&row_id)?;
624 drop(row_ids);
625
626 let mut values = Vec::with_capacity(col_indices.len());
628 for &col_idx in col_indices {
629 if col_idx < self.columns.len() {
630 let col_buf = self.columns[col_idx].read();
631 values.push(col_buf.get(row_idx));
632 } else {
633 values.push(None);
634 }
635 }
636
637 Some(values)
638 }
639
640 pub fn scan_range(&self, start: RowId, end: RowId) -> Vec<(RowId, Vec<Option<Vec<u8>>>)> {
642 let row_ids = self.row_ids.read();
643 let mut results = Vec::new();
644
645 for (&row_id, &row_idx) in row_ids.range(start..=end) {
646 let mut values = Vec::with_capacity(self.columns.len());
647 for col in &self.columns {
648 let col_buf = col.read();
649 values.push(col_buf.get(row_idx));
650 }
651 results.push((row_id, values));
652 }
653
654 results
655 }
656
657 pub fn is_full(&self) -> bool {
659 self.bytes_written.load(Ordering::Relaxed) as usize >= self.size_limit
660 }
661
662 pub fn row_count(&self) -> u64 {
664 self.next_row_idx.load(Ordering::SeqCst)
665 }
666
667 pub fn memory_bytes(&self) -> usize {
669 self.columns.iter().map(|c| c.read().memory_bytes()).sum()
670 }
671
672 pub fn schema(&self) -> &TableSchema {
674 &self.schema
675 }
676}
677
678use sochdb_core::learned_index::LearnedSparseIndex;
679
/// Location of one column's section inside a column-group file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnIndex {
    /// Byte offset of the section from the start of the file.
    pub offset: u64,
    /// Section length in bytes.
    pub length: u64,
    /// Compression marker; `ColumnGroup::from_memtable` always writes 0.
    pub compression: u8,
}
690
/// Handle to one immutable on-disk column-group file plus its in-memory index.
#[derive(Debug)]
#[allow(dead_code)]
pub struct ColumnGroup {
    /// Path of the group file.
    path: PathBuf,
    /// Schema of the rows stored in the file.
    schema: TableSchema,
    /// Level the group belongs to (flushed memtables start at 0).
    level: u32,
    /// Sequence number, also embedded in the file name.
    sequence: u64,
    /// Number of rows stored in the group.
    row_count: u64,
    /// Lower `__txn_start` bound as computed by `from_memtable` (0 for groups loaded via `open`).
    min_timestamp: u64,
    /// Upper `__txn_start` bound as computed by `from_memtable` (0 for groups loaded via `open`).
    max_timestamp: u64,
    /// Column name → location of the column's section in the file.
    column_offsets: BTreeMap<String, ColumnIndex>,
    /// Learned index built over the row ids stored in the group.
    lsi: Option<LearnedSparseIndex>,
}
714
715impl ColumnGroup {
716 const MAGIC: [u8; 4] = [b'T', b'O', b'O', b'N'];
718 const VERSION: u32 = 1;
719
720 #[allow(clippy::too_many_arguments)]
722 pub fn new(
723 path: PathBuf,
724 schema: TableSchema,
725 level: u32,
726 sequence: u64,
727 row_count: u64,
728 min_timestamp: u64,
729 max_timestamp: u64,
730 column_offsets: BTreeMap<String, ColumnIndex>,
731 lsi: Option<LearnedSparseIndex>,
732 ) -> Self {
733 Self {
734 path,
735 schema,
736 level,
737 sequence,
738 row_count,
739 min_timestamp,
740 max_timestamp,
741 column_offsets,
742 lsi,
743 }
744 }
745
746 pub fn from_memtable(
748 base_path: &Path,
749 memtable: &ColumnarMemtable,
750 level: u32,
751 sequence: u64,
752 ) -> Result<Self> {
753 use byteorder::{LittleEndian, WriteBytesExt};
754 use std::fs::File;
755 use std::io::{BufWriter, Seek, Write};
756
757 let file_name = format!("L{}_seq{}.sst", level, sequence);
759 let file_path = base_path.join(&file_name);
760 let file = File::create(&file_path)?;
761 let mut writer = BufWriter::new(file);
762
763 writer.write_all(&Self::MAGIC)?;
765 writer.write_u32::<LittleEndian>(Self::VERSION)?;
766
767 let mut column_offsets = BTreeMap::new();
768 let mut min_ts = u64::MAX;
769 let mut max_ts = 0u64;
770
771 for (i, col_lock) in memtable.columns.iter().enumerate() {
773 let col = col_lock.read();
774 let col_def = &memtable.schema.columns[i];
775
776 if col_def.name == "__txn_start" && col.col_type == ColumnType::UInt64 {
778 let mut offset = 0;
780 let row_count = col.row_count as usize;
781 for row_idx in 0..row_count {
782 let byte_idx = row_idx / 8;
784 let bit_idx = row_idx % 8;
785 let is_null =
786 byte_idx < col.nulls.len() && (col.nulls[byte_idx] & (1 << bit_idx)) != 0;
787
788 if !is_null && offset + 8 <= col.data.len() {
789 let ts = u64::from_le_bytes(
790 col.data[offset..offset + 8].try_into().unwrap_or([0u8; 8]),
791 );
792 min_ts = min_ts.min(ts);
793 max_ts = max_ts.max(ts);
794 }
795 offset += 8;
796 }
797 }
798
799 let start_offset = writer.stream_position()?;
800
801 writer.write_u8(col.col_type as u8)?;
803 writer.write_u64::<LittleEndian>(col.row_count)?;
804
805 writer.write_u32::<LittleEndian>(col.nulls.len() as u32)?;
807 writer.write_all(&col.nulls)?;
808
809 if let Some(offsets) = &col.offsets {
811 writer.write_u32::<LittleEndian>(offsets.len() as u32)?;
812 for &off in offsets {
813 writer.write_u32::<LittleEndian>(off)?;
814 }
815 }
816
817 writer.write_u32::<LittleEndian>(col.data.len() as u32)?;
819 writer.write_all(&col.data)?;
820
821 let end_offset = writer.stream_position()?;
822
823 column_offsets.insert(
824 col_def.name.clone(),
825 ColumnIndex {
826 offset: start_offset,
827 length: end_offset - start_offset,
828 compression: 0, },
830 );
831 }
832
833 let row_ids = memtable.row_ids.read();
835 let keys: Vec<u64> = row_ids.keys().cloned().collect();
836 let lsi = LearnedSparseIndex::build(&keys);
837
838 let footer_start = writer.stream_position()?;
840
841 let offsets_bytes = bincode::serialize(&column_offsets)
843 .map_err(|e| SochDBError::Serialization(e.to_string()))?;
844 writer.write_u64::<LittleEndian>(offsets_bytes.len() as u64)?;
845 writer.write_all(&offsets_bytes)?;
846
847 let lsi_bytes =
849 bincode::serialize(&lsi).map_err(|e| SochDBError::Serialization(e.to_string()))?;
850 writer.write_u64::<LittleEndian>(lsi_bytes.len() as u64)?;
851 writer.write_all(&lsi_bytes)?;
852
853 writer.write_u64::<LittleEndian>(footer_start)?;
855 writer.write_all(&Self::MAGIC)?;
856
857 writer.flush()?;
858
859 if min_ts == u64::MAX || max_ts == 0 {
861 let now = std::time::SystemTime::now()
862 .duration_since(std::time::UNIX_EPOCH)
863 .unwrap()
864 .as_micros() as u64;
865 min_ts = now;
866 max_ts = now;
867 }
868
869 Ok(Self {
870 path: file_path,
871 schema: memtable.schema.clone(),
872 level,
873 sequence,
874 row_count: memtable.row_count(),
875 min_timestamp: min_ts,
876 max_timestamp: max_ts,
877 column_offsets,
878 lsi: Some(lsi),
879 })
880 }
881
882 pub fn open(path: PathBuf, schema: TableSchema, level: u32, sequence: u64) -> Result<Self> {
884 use byteorder::{LittleEndian, ReadBytesExt};
885 use std::fs::File;
886 use std::io::{Read, Seek, SeekFrom};
887
888 let mut file = File::open(&path)?;
889 let file_len = file.metadata()?.len();
890
891 if file_len < 12 {
892 return Err(SochDBError::Corruption("File too short".to_string()));
894 }
895
896 file.seek(SeekFrom::End(-12))?;
898 let footer_offset = file.read_u64::<LittleEndian>()?;
899 let mut magic = [0u8; 4];
900 file.read_exact(&mut magic)?;
901
902 if magic != Self::MAGIC {
903 return Err(SochDBError::Corruption("Invalid magic bytes".to_string()));
904 }
905
906 file.seek(SeekFrom::Start(footer_offset))?;
908
909 let offsets_len = file.read_u64::<LittleEndian>()?;
911 let mut offsets_bytes = vec![0u8; offsets_len as usize];
912 file.read_exact(&mut offsets_bytes)?;
913 let column_offsets: BTreeMap<String, ColumnIndex> = bincode::deserialize(&offsets_bytes)
914 .map_err(|e| SochDBError::Serialization(e.to_string()))?;
915
916 let lsi_len = file.read_u64::<LittleEndian>()?;
918 let mut lsi_bytes = vec![0u8; lsi_len as usize];
919 file.read_exact(&mut lsi_bytes)?;
920 let lsi: LearnedSparseIndex = bincode::deserialize(&lsi_bytes)
921 .map_err(|e| SochDBError::Serialization(e.to_string()))?;
922
923 Ok(Self {
924 path,
925 schema,
926 level,
927 sequence,
928 row_count: 0, min_timestamp: 0,
930 max_timestamp: 0,
931 column_offsets,
932 lsi: Some(lsi),
933 })
934 }
935
936 pub fn file_path(&self) -> &Path {
938 &self.path
939 }
940
941 pub fn column_index(&self, col_name: &str) -> Option<&ColumnIndex> {
943 self.column_offsets.get(col_name)
944 }
945
946 pub fn level(&self) -> u32 {
948 self.level
949 }
950
951 pub fn row_count(&self) -> u64 {
953 self.row_count
954 }
955}
956
/// Cumulative counters describing compaction work.
#[derive(Debug, Clone, Default)]
pub struct CompactionStats {
    /// Compactions performed, of any kind.
    pub compactions_total: u64,
    /// L0 compactions performed.
    pub l0_compactions: u64,
    /// Bytes read by compactions (partly estimated by `compact_l0`).
    pub bytes_read: u64,
    /// Bytes attributed to rewritten (hot) column sections.
    pub bytes_written: u64,
    /// Compactions that processed hot columns.
    pub hot_column_compactions: u64,
    /// Cold-column stripe references kept instead of being rewritten.
    pub cold_column_refs_preserved: u64,
    /// `1 / hot-column fraction` from the most recent compaction.
    pub estimated_wa_reduction: f64,
    /// Wall-clock duration of the most recent compaction, in microseconds.
    pub last_compaction_duration_us: u64,
}

impl CompactionStats {
    /// Ratio of bytes written to bytes read; `1.0` while nothing has been read.
    pub fn write_amplification(&self) -> f64 {
        match self.bytes_read {
            0 => 1.0,
            read => self.bytes_written as f64 / read as f64,
        }
    }
}
992
/// Summary of a WAL replay performed by `Lscs::recover`.
#[derive(Debug, Clone, Default)]
pub struct LscsRecoveryStats {
    /// Transaction count reported by the WAL replay.
    pub transactions_recovered: usize,
    /// Rows successfully re-inserted into the active memtable.
    pub rows_recovered: usize,
    /// Largest row id seen in the replayed writes (0 if none).
    pub max_row_id: u64,
}
1003
/// Tuning parameters for [`Lscs`].
#[derive(Debug, Clone)]
pub struct LscsConfig {
    /// Value-byte budget of a memtable; `Lscs::insert` rotates once it is reached.
    pub memtable_size: usize,
    /// Number of column-group levels.
    pub num_levels: usize,
    /// Presumably the size ratio between adjacent levels (not used in this section).
    pub level_ratio: usize,
    /// L0 group count at which `flush` triggers an L0 compaction.
    pub l0_compaction_threshold: usize,
    /// Intended hot/cold cut-off for column temperatures.
    pub hot_threshold: f64,
    /// Update events per temperature EMA window.
    pub temperature_window_size: u64,
}

impl Default for LscsConfig {
    /// 64 MiB memtables, 7 levels with ratio 10, L0 compaction at 4 groups, hot threshold
    /// 0.1, and temperature windows of 1000 events.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;
        LscsConfig {
            memtable_size: 64 * MIB,
            num_levels: 7,
            level_ratio: 10,
            l0_compaction_threshold: 4,
            hot_threshold: 0.1,
            temperature_window_size: 1000,
        }
    }
}
1033
/// Storage engine combining a transaction WAL, an active memtable, immutable memtables
/// awaiting flush, and per-level on-disk column groups.
pub struct Lscs {
    /// Engine tuning parameters.
    config: LscsConfig,
    /// Directory holding the WAL and the column-group files.
    path: PathBuf,
    /// Schema shared by every memtable and column group.
    schema: TableSchema,
    /// Write-ahead log; `insert` and `mark_deleted` write here before the memtable.
    wal: Arc<TxnWal>,
    /// Memtable currently receiving writes.
    active_memtable: RwLock<ColumnarMemtable>,
    /// Sealed memtables waiting to be flushed, oldest first.
    immutable_memtables: RwLock<Vec<ColumnarMemtable>>,
    /// Column groups per level; `flush` appends to level 0.
    column_groups: RwLock<Vec<Vec<ColumnGroup>>>,
    /// Segment descriptors registered by compaction, keyed by segment id.
    segment_descriptors: RwLock<HashMap<u64, SegmentDescriptor>>,
    /// Per-column update temperatures consulted by compaction.
    temperature_tracker: Arc<ColumnTemperatureTracker>,
    /// Next sequence number for group files and segments.
    next_sequence: AtomicU64,
    /// Next row id handed out by `insert`.
    next_row_id: AtomicU64,
    /// Cumulative compaction counters.
    compaction_stats: RwLock<CompactionStats>,
}
1061
1062impl Lscs {
1063 pub fn new(path: PathBuf, schema: TableSchema, config: LscsConfig) -> Result<Self> {
1065 std::fs::create_dir_all(&path)?;
1066
1067 let wal_path = path.join("wal.log");
1068 let wal = Arc::new(TxnWal::new(&wal_path)?);
1069
1070 let active_memtable = ColumnarMemtable::new(schema.clone(), config.memtable_size);
1071
1072 let mut column_groups = Vec::with_capacity(config.num_levels);
1073 for _ in 0..config.num_levels {
1074 column_groups.push(Vec::new());
1075 }
1076
1077 let column_names: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
1079 let temperature_tracker = Arc::new(ColumnTemperatureTracker::new(
1080 &column_names,
1081 config.temperature_window_size,
1082 ));
1083
1084 Ok(Self {
1085 config,
1086 path,
1087 schema,
1088 wal,
1089 active_memtable: RwLock::new(active_memtable),
1090 immutable_memtables: RwLock::new(Vec::new()),
1091 column_groups: RwLock::new(column_groups),
1092 segment_descriptors: RwLock::new(HashMap::new()),
1093 temperature_tracker,
1094 next_sequence: AtomicU64::new(0),
1095 next_row_id: AtomicU64::new(1),
1096 compaction_stats: RwLock::new(CompactionStats::default()),
1097 })
1098 }
1099
1100 pub fn open(path: PathBuf, schema: TableSchema, config: LscsConfig) -> Result<Self> {
1113 let lscs = Self::new(path, schema, config)?;
1114 let stats = lscs.recover()?;
1115
1116 if stats.rows_recovered > 0 {
1117 eprintln!(
1118 "LSCS Recovery: restored {} rows from {} transactions",
1119 stats.rows_recovered, stats.transactions_recovered
1120 );
1121 }
1122
1123 Ok(lscs)
1124 }
1125
1126 pub fn recover(&self) -> Result<LscsRecoveryStats> {
1130 let (writes, txn_count) = self.wal.replay_for_recovery()?;
1131
1132 if writes.is_empty() {
1133 return Ok(LscsRecoveryStats::default());
1134 }
1135
1136 let mut max_row_id: u64 = 0;
1137 let mut rows_recovered = 0usize;
1138
1139 for (key, value) in &writes {
1141 if key.len() >= 8 {
1143 let row_id = u64::from_le_bytes(key[..8].try_into().unwrap_or([0; 8]));
1144 if row_id > max_row_id {
1145 max_row_id = row_id;
1146 }
1147
1148 if let Ok(row_values) = Self::deserialize_row(value) {
1150 let value_refs: Vec<Option<&[u8]>> = row_values.iter().map(|v| v.as_deref()).collect();
1151
1152 let memtable = self.active_memtable.read();
1153 if memtable.insert(row_id, &value_refs).is_ok() {
1154 rows_recovered += 1;
1155 }
1156 }
1157 }
1158 }
1159
1160 if max_row_id > 0 {
1162 self.next_row_id.store(max_row_id + 1, Ordering::SeqCst);
1163 }
1164
1165 Ok(LscsRecoveryStats {
1166 transactions_recovered: txn_count,
1167 rows_recovered,
1168 max_row_id,
1169 })
1170 }
1171
1172 pub fn mark_clean_shutdown(&self) -> Result<()> {
1177 self.fsync()?;
1179
1180 let marker_path = self.path.join(".clean_shutdown");
1182 std::fs::write(&marker_path, b"clean")?;
1183
1184 Ok(())
1188 }
1189
1190 pub fn insert(&self, values: &[Option<&[u8]>]) -> Result<RowId> {
1192 let row_id = self.next_row_id.fetch_add(1, Ordering::SeqCst);
1193
1194 let txn_id = self.wal.begin_transaction()?;
1196
1197 let key = row_id.to_le_bytes().to_vec();
1199 let value = self.serialize_row(values)?;
1200 self.wal.write(txn_id, key, value)?;
1201 self.wal.commit_transaction(txn_id)?;
1202
1203 let memtable = self.active_memtable.read();
1205 memtable.insert(row_id, values)?;
1206
1207 if memtable.is_full() {
1209 drop(memtable);
1210 self.rotate_memtable()?;
1211 }
1212
1213 Ok(row_id)
1214 }
1215
1216 pub fn mark_deleted(&self, row_id: RowId, _caller_txn_id: u64, txn_end: u64) -> Result<()> {
1230 let txn_end_idx = self
1232 .schema
1233 .columns
1234 .iter()
1235 .position(|c| c.name == "__txn_end")
1236 .ok_or_else(|| {
1237 SochDBError::InvalidData("Schema missing __txn_end column for MVCC".to_string())
1238 })?;
1239
1240 let current = self
1242 .get(row_id)?
1243 .ok_or_else(|| SochDBError::NotFound(format!("Row {} not found", row_id)))?;
1244
1245 let mut new_values: Vec<Option<Vec<u8>>> = current;
1247 new_values[txn_end_idx] = Some(txn_end.to_le_bytes().to_vec());
1248
1249 let value_refs: Vec<Option<&[u8]>> = new_values.iter().map(|v| v.as_deref()).collect();
1251
1252 let wal_txn_id = self.wal.begin_transaction()?;
1255
1256 let row_data = self.serialize_row(&value_refs)?;
1258 self.wal.write(wal_txn_id, row_id.to_le_bytes().to_vec(), row_data)?;
1259
1260 self.wal.commit_transaction(wal_txn_id)?;
1262
1263 let memtable = self.active_memtable.read();
1265 memtable.insert(row_id, &value_refs)?;
1266
1267 Ok(())
1268 }
1269
1270 fn serialize_row(&self, values: &[Option<&[u8]>]) -> Result<Vec<u8>> {
1272 use byteorder::{LittleEndian, WriteBytesExt};
1273
1274 let mut buf = Vec::new();
1275 buf.write_u32::<LittleEndian>(values.len() as u32)?;
1276
1277 for value in values {
1278 match value {
1279 Some(data) => {
1280 buf.write_u8(1)?; buf.write_u32::<LittleEndian>(data.len() as u32)?;
1282 buf.extend_from_slice(data);
1283 }
1284 None => {
1285 buf.write_u8(0)?; }
1287 }
1288 }
1289
1290 Ok(buf)
1291 }
1292
1293 #[allow(dead_code)]
1295 fn deserialize_row(data: &[u8]) -> Result<Vec<Option<Vec<u8>>>> {
1296 use byteorder::{LittleEndian, ReadBytesExt};
1297 use std::io::Cursor;
1298
1299 let mut cursor = Cursor::new(data);
1300 let num_cols = cursor.read_u32::<LittleEndian>()? as usize;
1301 let mut values = Vec::with_capacity(num_cols);
1302
1303 for _ in 0..num_cols {
1304 let is_non_null = cursor.read_u8()? == 1;
1305 if is_non_null {
1306 let len = cursor.read_u32::<LittleEndian>()? as usize;
1307 let pos = cursor.position() as usize;
1308 let value = data[pos..pos + len].to_vec();
1309 cursor.set_position((pos + len) as u64);
1310 values.push(Some(value));
1311 } else {
1312 values.push(None);
1313 }
1314 }
1315
1316 Ok(values)
1317 }
1318
1319 pub fn get(&self, row_id: RowId) -> Result<Option<Vec<Option<Vec<u8>>>>> {
1324 {
1326 let memtable = self.active_memtable.read();
1327 if let Some(values) = memtable.get(row_id) {
1328 return Ok(Some(values));
1329 }
1330 }
1331
1332 {
1334 let immutable = self.immutable_memtables.read();
1335 for memtable in immutable.iter().rev() {
1336 if let Some(values) = memtable.get(row_id) {
1337 return Ok(Some(values));
1338 }
1339 }
1340 }
1341
1342 {
1345 use sochdb_core::learned_index::LookupResult;
1346 let groups = self.column_groups.read();
1347 for level in &*groups {
1348 for group in level.iter().rev() {
1349 if let Some(lsi) = &group.lsi {
1350 let lookup = lsi.lookup(row_id);
1352 match lookup {
1353 LookupResult::Exact(_) | LookupResult::Range { .. } => {
1354 if let Some(row) = self.read_row_from_sstable(group, row_id)? {
1356 return Ok(Some(row));
1357 }
1358 }
1359 LookupResult::NotFound => {
1360 continue;
1362 }
1363 }
1364 }
1365 }
1366 }
1367 }
1368
1369 Ok(None)
1370 }
1371
1372 fn read_row_from_sstable(
1374 &self,
1375 group: &ColumnGroup,
1376 row_id: RowId,
1377 ) -> Result<Option<Vec<Option<Vec<u8>>>>> {
1378 use byteorder::{LittleEndian, ReadBytesExt};
1379 use std::fs::File;
1380 use std::io::{BufReader, Read, Seek, SeekFrom};
1381
1382 let file = File::open(group.file_path())?;
1383 let mut reader = BufReader::new(file);
1384
1385 let mut values = Vec::new();
1386
1387 for (col_name, col_idx) in &group.column_offsets {
1389 reader.seek(SeekFrom::Start(col_idx.offset))?;
1390
1391 let col_type = reader.read_u8()?;
1393 let row_count = reader.read_u64::<LittleEndian>()?;
1394
1395 if row_id >= row_count {
1396 values.push(None);
1397 continue;
1398 }
1399
1400 let nulls_len = reader.read_u32::<LittleEndian>()? as usize;
1402 let mut nulls = vec![0u8; nulls_len];
1403 reader.read_exact(&mut nulls)?;
1404
1405 let byte_idx = (row_id / 8) as usize;
1407 let bit_offset = (row_id % 8) as u8;
1408 let is_null = byte_idx >= nulls.len() || (nulls[byte_idx] & (1 << bit_offset)) == 0;
1409
1410 if is_null {
1411 values.push(None);
1412 continue;
1413 }
1414
1415 let col_type = ColumnType::from_byte(col_type).unwrap_or(ColumnType::Binary);
1417 if let Some(fixed_size) = col_type.fixed_size() {
1418 let offsets_section = reader.stream_position()?;
1420 let data_len = reader.read_u32::<LittleEndian>()? as usize;
1421 let _ = data_len;
1422
1423 let row_offset = (row_id as usize) * fixed_size;
1425 reader.seek(SeekFrom::Start(offsets_section + 4 + row_offset as u64))?;
1426
1427 let mut value = vec![0u8; fixed_size];
1428 reader.read_exact(&mut value)?;
1429 values.push(Some(value));
1430 } else {
1431 let offsets_count = reader.read_u32::<LittleEndian>()? as usize;
1433 let mut offsets = vec![0u32; offsets_count];
1434 for offset in offsets.iter_mut().take(offsets_count) {
1435 *offset = reader.read_u32::<LittleEndian>()?;
1436 }
1437
1438 if (row_id as usize + 1) >= offsets.len() {
1439 values.push(None);
1440 continue;
1441 }
1442
1443 let start = offsets[row_id as usize] as usize;
1444 let end = offsets[(row_id + 1) as usize] as usize;
1445
1446 let data_len = reader.read_u32::<LittleEndian>()? as usize;
1448 let data_start = reader.stream_position()?;
1449
1450 if end <= data_len {
1451 reader.seek(SeekFrom::Start(data_start + start as u64))?;
1452 let mut value = vec![0u8; end - start];
1453 reader.read_exact(&mut value)?;
1454 values.push(Some(value));
1455 } else {
1456 values.push(None);
1457 }
1458 }
1459 let _ = col_name; }
1461
1462 if values.is_empty() {
1463 Ok(None)
1464 } else {
1465 Ok(Some(values))
1466 }
1467 }
1468
1469 pub fn fsync(&self) -> Result<()> {
1477 self.wal.sync()?;
1479
1480 let memtable = self.active_memtable.read();
1482 let should_flush = memtable.memory_bytes() > self.config.memtable_size / 2;
1483 drop(memtable);
1484
1485 if should_flush {
1486 self.rotate_memtable()?;
1488 self.flush()?;
1489 }
1490
1491 Ok(())
1492 }
1493
1494 fn rotate_memtable(&self) -> Result<()> {
1496 let new_memtable = ColumnarMemtable::new(self.schema.clone(), self.config.memtable_size);
1497
1498 let old_memtable = {
1499 let mut active = self.active_memtable.write();
1500 std::mem::replace(&mut *active, new_memtable)
1501 };
1502
1503 let mut immutable = self.immutable_memtables.write();
1504 immutable.push(old_memtable);
1505
1506 if immutable.len() >= 2 {
1508 drop(immutable); self.flush()?;
1511 }
1512
1513 Ok(())
1514 }
1515
1516 pub fn flush(&self) -> Result<()> {
1518 let memtables = {
1519 let mut immutable = self.immutable_memtables.write();
1520 std::mem::take(&mut *immutable)
1521 };
1522
1523 for memtable in memtables {
1524 let sequence = self.next_sequence.fetch_add(1, Ordering::SeqCst);
1525 let column_group = ColumnGroup::from_memtable(&self.path, &memtable, 0, sequence)?;
1526
1527 let mut groups = self.column_groups.write();
1528 groups[0].push(column_group);
1529 }
1530
1531 let groups = self.column_groups.read();
1533 if groups[0].len() >= self.config.l0_compaction_threshold {
1534 drop(groups);
1535 self.compact_l0()?;
1536 }
1537
1538 Ok(())
1539 }
1540
    /// Compacts all level-0 column groups into a single level-1 segment descriptor.
    ///
    /// The temperature tracker splits columns into hot and cold sets. Hot-column stripe
    /// lengths are only added to the byte counters. Each cold column that a segment
    /// contains is recorded as a `ColumnStripeRef` pointing at that segment's existing
    /// stripe instead of being rewritten. The resulting `SegmentDescriptor` is registered in
    /// `segment_descriptors` under a newly allocated sequence number, and `compaction_stats`
    /// is updated. Returns `Ok(())` immediately if level 0 is empty.
    ///
    /// NOTE(review): gaps visible in this implementation (confirm intent):
    /// - The L0 groups are removed from `column_groups[0]` and dropped at the end. No
    ///   merged hot-column output is written: `_merged_path` is unused and
    ///   `selective_merge_hot_columns` is not called. Hot-column data is therefore no longer
    ///   reachable through `column_groups`.
    /// - `min_row_id` / `max_row_id` are never updated, so the descriptor carries
    ///   `u64::MAX` / `0`. The row-range filter in `scan_columns` therefore skips it for
    ///   practically every range.
    /// - `col_refs.insert` keeps only the last segment's stripe per cold column, while
    ///   `cold_refs_preserved` counts one ref per segment.
    /// - `bytes_written` counts hot-column lengths even though nothing is written.
    fn compact_l0(&self) -> Result<()> {
        let start_time = std::time::Instant::now();

        let hot_columns = self.temperature_tracker.get_hot_columns();
        let cold_columns = self.temperature_tracker.get_cold_columns();

        // Share of schema columns classified hot; feeds the write-amplification estimate below.
        let total_columns = self.schema.columns.len();
        let hot_fraction = if total_columns > 0 {
            hot_columns.len() as f64 / total_columns as f64
        } else {
            1.0
        };

        // Detach every L0 group; the write lock is held only for the swap.
        let l0_segments: Vec<ColumnGroup> = {
            let mut groups = self.column_groups.write();
            std::mem::take(&mut groups[0])
        };

        if l0_segments.is_empty() {
            return Ok(());
        }

        let mut bytes_read = 0u64;
        let mut bytes_written = 0u64;
        let mut cold_refs_preserved = 0u64;

        let sequence = self.next_sequence.fetch_add(1, Ordering::SeqCst);

        // Intended output file for merged hot columns; currently never created.
        let _merged_path = self.path.join(format!("L1_seq{}.sst", sequence));

        let mut col_refs = HashMap::new();
        let mut total_row_count = 0u64;
        // NOTE(review): never updated below; the descriptor gets these sentinel values.
        let min_row_id = u64::MAX;
        let max_row_id = 0u64;

        for segment in &l0_segments {
            // Fixed per-row estimate (100 bytes) added on top of the per-column lengths below.
            bytes_read += segment.row_count * 100;
            total_row_count += segment.row_count;

            // Hot columns: accounted for only; no bytes are copied here.
            for col_name in &hot_columns {
                if let Some(col_idx) = segment.column_offsets.get(col_name) {
                    bytes_read += col_idx.length;
                    bytes_written += col_idx.length;
                }
            }

            // Cold columns: reference the existing stripe in the source segment.
            for col_name in &cold_columns {
                if let Some(col_idx) = segment.column_offsets.get(col_name) {
                    let stripe_ref = ColumnStripeRef::new(
                        segment.level,
                        segment.sequence,
                        col_name.clone(),
                        col_idx.offset,
                        col_idx.length,
                        segment.row_count,
                    );
                    // NOTE(review): overwrites the ref from any earlier segment for this column.
                    col_refs.insert(col_name.clone(), stripe_ref);
                    cold_refs_preserved += 1;
                }
            }
        }

        let segment_desc = SegmentDescriptor {
            id: sequence,
            level: 1,
            col_refs,
            min_row_id,
            max_row_id,
            row_count: total_row_count,
            min_timestamp: 0,
            max_timestamp: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_micros() as u64,
            is_tombstone: false,
        };

        {
            let mut descriptors = self.segment_descriptors.write();
            descriptors.insert(sequence, segment_desc);
        }

        {
            let mut stats = self.compaction_stats.write();
            stats.compactions_total += 1;
            stats.l0_compactions += 1;
            stats.bytes_read += bytes_read;
            stats.bytes_written += bytes_written;
            stats.cold_column_refs_preserved += cold_refs_preserved;
            stats.hot_column_compactions += 1;
            // Inverse of the hot fraction, floored at 1% so it caps at 100x.
            stats.estimated_wa_reduction = 1.0 / hot_fraction.max(0.01);
            stats.last_compaction_duration_us = start_time.elapsed().as_micros() as u64;
        }

        // Drop the detached L0 groups. The cold refs above still name these segments'
        // level/sequence, and `read_column_stripe` reopens `L{level}_seq{id}.sst`. Whether
        // dropping a ColumnGroup deletes its file is not visible here; TODO confirm.
        for segment in l0_segments {
            let _ = segment;
        }

        Ok(())
    }
1670
1671 #[allow(dead_code)]
1683 fn selective_merge_hot_columns(
1684 &self,
1685 segments: &[&ColumnGroup],
1686 hot_columns: &HashSet<String>,
1687 output_path: &Path,
1688 ) -> Result<HashMap<String, ColumnStripeRef>> {
1689 use byteorder::{LittleEndian, WriteBytesExt};
1690 use std::fs::File;
1691 use std::io::{BufWriter, Seek, Write};
1692
1693 let mut result = HashMap::new();
1694
1695 let file = File::create(output_path)?;
1697 let mut writer = BufWriter::new(file);
1698
1699 writer.write_all(&ColumnGroup::MAGIC)?;
1701 writer.write_u32::<LittleEndian>(ColumnGroup::VERSION)?;
1702
1703 let sequence = self.next_sequence.load(Ordering::SeqCst);
1704
1705 for col_name in hot_columns {
1707 let start_offset = writer.stream_position()?;
1708
1709 let mut merged_data = Vec::new();
1711 let mut row_count = 0u64;
1712
1713 for segment in segments {
1714 if let Some(_col_idx) = segment.column_offsets.get(col_name) {
1715 merged_data.extend_from_slice(&[0u8; 0]); row_count += segment.row_count;
1719 }
1720 }
1721
1722 writer.write_u64::<LittleEndian>(row_count)?;
1725 writer.write_all(&merged_data)?;
1726
1727 let end_offset = writer.stream_position()?;
1728
1729 let stripe_ref = ColumnStripeRef::new(
1731 1, sequence,
1733 col_name.clone(),
1734 start_offset,
1735 end_offset - start_offset,
1736 row_count,
1737 );
1738 result.insert(col_name.clone(), stripe_ref);
1739 }
1740
1741 writer.flush()?;
1742 Ok(result)
1743 }
1744
1745 pub fn scan_columns(
1750 &self,
1751 column_names: &[&str],
1752 row_range: Option<(RowId, RowId)>,
1753 ) -> Result<Vec<Vec<u8>>> {
1754 let mut results = Vec::new();
1755
1756 let descriptors = self.segment_descriptors.read();
1758
1759 for (_seg_id, descriptor) in descriptors.iter() {
1760 if let Some((min, max)) = row_range
1762 && (descriptor.max_row_id < min || descriptor.min_row_id > max)
1763 {
1764 continue;
1765 }
1766
1767 for col_name in column_names {
1769 if let Some(stripe_ref) = descriptor.col_refs.get(*col_name) {
1770 let data = self.read_column_stripe(stripe_ref)?;
1772 results.push(data);
1773 }
1774 }
1775 }
1776
1777 Ok(results)
1778 }
1779
1780 fn read_column_stripe(&self, stripe_ref: &ColumnStripeRef) -> Result<Vec<u8>> {
1782 use std::fs::File;
1783 use std::io::{Read, Seek, SeekFrom};
1784
1785 let file_path = self.path.join(format!(
1787 "L{}_seq{}.sst",
1788 stripe_ref.level, stripe_ref.segment_id
1789 ));
1790
1791 let mut file = File::open(&file_path)?;
1792 file.seek(SeekFrom::Start(stripe_ref.offset))?;
1793
1794 let mut data = vec![0u8; stripe_ref.length as usize];
1795 file.read_exact(&mut data)?;
1796
1797 Ok(data)
1798 }
1799
1800 pub fn compaction_stats(&self) -> CompactionStats {
1802 self.compaction_stats.read().clone()
1803 }
1804
1805 pub fn compact(&self) -> Result<()> {
1809 self.compact_l0()
1810 }
1811
1812 pub fn column_temperatures(&self) -> Vec<ColumnTemperature> {
1814 self.temperature_tracker.get_all_temperatures()
1815 }
1816
1817 #[allow(clippy::type_complexity)]
1822 pub fn scan_range(
1823 &self,
1824 start: RowId,
1825 end: RowId,
1826 ) -> Result<Vec<(RowId, Vec<Option<Vec<u8>>>)>> {
1827 let mut results = Vec::new();
1828 let mut seen = std::collections::HashSet::new();
1829
1830 {
1832 let memtable = self.active_memtable.read();
1833 for (row_id, values) in memtable.scan_range(start, end) {
1834 if seen.insert(row_id) {
1835 results.push((row_id, values));
1836 }
1837 }
1838 }
1839
1840 {
1842 let immutable = self.immutable_memtables.read();
1843 for memtable in immutable.iter().rev() {
1844 for (row_id, values) in memtable.scan_range(start, end) {
1845 if seen.insert(row_id) {
1846 results.push((row_id, values));
1847 }
1848 }
1849 }
1850 }
1851
1852 results.sort_by_key(|(id, _)| *id);
1857
1858 Ok(results)
1859 }
1860
1861 #[allow(clippy::type_complexity)]
1865 pub fn scan_columns_range(
1866 &self,
1867 start: RowId,
1868 end: RowId,
1869 col_indices: &[usize],
1870 ) -> Result<Vec<(RowId, Vec<Option<Vec<u8>>>)>> {
1871 let mut results = Vec::new();
1872 let mut seen = std::collections::HashSet::new();
1873
1874 {
1876 let memtable = self.active_memtable.read();
1877 let row_ids = memtable.row_ids.read();
1878
1879 for (&row_id, _) in row_ids.range(start..=end) {
1880 if seen.insert(row_id)
1881 && let Some(values) = memtable.get_columns(row_id, col_indices)
1882 {
1883 results.push((row_id, values));
1884 }
1885 }
1886 }
1887
1888 {
1890 let immutable = self.immutable_memtables.read();
1891 for memtable in immutable.iter().rev() {
1892 let row_ids = memtable.row_ids.read();
1893 for (&row_id, _) in row_ids.range(start..=end) {
1894 if seen.insert(row_id)
1895 && let Some(values) = memtable.get_columns(row_id, col_indices)
1896 {
1897 results.push((row_id, values));
1898 }
1899 }
1900 }
1901 }
1902
1903 results.sort_by_key(|(id, _)| *id);
1905
1906 Ok(results)
1907 }
1908
1909 pub fn stats(&self) -> LscsStats {
1911 let active = self.active_memtable.read();
1912 let immutable = self.immutable_memtables.read();
1913 let groups = self.column_groups.read();
1914
1915 let mut level_sizes = vec![0u64; self.config.num_levels];
1916 let mut disk_bytes = 0u64;
1917
1918 for (i, level) in groups.iter().enumerate() {
1919 for group in level {
1920 level_sizes[i] += group.row_count;
1921 if let Ok(metadata) = std::fs::metadata(&group.path) {
1923 disk_bytes += metadata.len();
1924 }
1925 }
1926 }
1927
1928 LscsStats {
1929 active_memtable_bytes: active.memory_bytes(),
1930 immutable_memtables: immutable.len(),
1931 level_row_counts: level_sizes,
1932 next_row_id: self.next_row_id.load(Ordering::SeqCst),
1933 disk_bytes,
1934 }
1935 }
1936
1937 pub fn wal(&self) -> &Arc<TxnWal> {
1939 &self.wal
1940 }
1941}
1942
/// Point-in-time storage statistics, as produced by `Lscs::stats`.
///
/// Also derives `Default`, `PartialEq` and `Eq` so snapshots can be built and compared
/// directly (e.g. in tests).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct LscsStats {
    /// Value reported by the active memtable's `memory_bytes()`.
    pub active_memtable_bytes: usize,
    /// Number of immutable memtables not yet flushed.
    pub immutable_memtables: usize,
    /// Summed `row_count` of the column groups in each level, indexed by level.
    pub level_row_counts: Vec<u64>,
    /// Value of the store's `next_row_id` counter when the snapshot was taken.
    pub next_row_id: u64,
    /// Sum of the file sizes of all column groups whose metadata could be read.
    pub disk_bytes: u64,
}
1957
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Builds a `ColumnDef` from its name, type, and nullability.
    fn column(name: &str, col_type: ColumnType, nullable: bool) -> ColumnDef {
        ColumnDef {
            name: name.to_string(),
            col_type,
            nullable,
        }
    }

    /// The three user columns shared by every test schema:
    /// `id` (u64), `name` (text), and the nullable `score` (f64).
    fn user_columns() -> Vec<ColumnDef> {
        vec![
            column("id", ColumnType::UInt64, false),
            column("name", ColumnType::Text, false),
            column("score", ColumnType::Float64, true),
        ]
    }

    /// Plain (non-MVCC) `users` schema used by most tests.
    fn test_schema() -> TableSchema {
        TableSchema {
            name: "users".to_string(),
            columns: user_columns(),
        }
    }

    #[test]
    fn test_columnar_memtable_insert() {
        let memtable = ColumnarMemtable::new(test_schema(), 1024 * 1024);

        let id_bytes = 1u64.to_le_bytes();
        let score_bytes = 95.5f64.to_le_bytes();
        memtable
            .insert(
                1,
                &[Some(&id_bytes), Some("Alice".as_bytes()), Some(&score_bytes)],
            )
            .unwrap();

        assert_eq!(memtable.row_count(), 1);
    }

    #[test]
    fn test_columnar_memtable_with_nulls() {
        let memtable = ColumnarMemtable::new(test_schema(), 1024 * 1024);

        // `score` is nullable, so it can be left out.
        let id_bytes = 1u64.to_le_bytes();
        memtable
            .insert(1, &[Some(&id_bytes), Some("Bob".as_bytes()), None])
            .unwrap();

        assert_eq!(memtable.row_count(), 1);
    }

    #[test]
    fn test_lscs_basic() {
        let dir = tempdir().unwrap();
        let config = LscsConfig {
            memtable_size: 1024,
            ..Default::default()
        };
        let lscs = Lscs::new(dir.path().to_path_buf(), test_schema(), config).unwrap();

        let id_bytes = 1u64.to_le_bytes();
        let score_bytes = 87.2f64.to_le_bytes();
        let row_id = lscs
            .insert(&[
                Some(&id_bytes),
                Some("Charlie".as_bytes()),
                Some(&score_bytes),
            ])
            .unwrap();

        // The first insert into a fresh store is assigned row id 1.
        assert_eq!(row_id, 1);
        assert!(lscs.stats().active_memtable_bytes > 0);
    }

    #[test]
    fn test_column_group_write() {
        let dir = tempdir().unwrap();
        let schema = TableSchema::new("users".to_string(), user_columns()).with_mvcc();
        let memtable = ColumnarMemtable::new(schema.clone(), 1024 * 1024);

        // Each row supplies the three user columns plus two u64 values; presumably these
        // fill the columns added by `with_mvcc` (the reopen check expects 5) — confirm.
        memtable
            .insert(
                1,
                &[
                    Some(&1u64.to_le_bytes()),
                    Some(b"Alice"),
                    Some(&95.5f64.to_le_bytes()),
                    Some(&100u64.to_le_bytes()),
                    Some(&0u64.to_le_bytes()),
                ],
            )
            .unwrap();

        memtable
            .insert(
                2,
                &[
                    Some(&2u64.to_le_bytes()),
                    Some(b"Bob"),
                    Some(&87.2f64.to_le_bytes()),
                    Some(&100u64.to_le_bytes()),
                    Some(&200u64.to_le_bytes()),
                ],
            )
            .unwrap();

        let cg = ColumnGroup::from_memtable(dir.path(), &memtable, 0, 1).unwrap();
        let file_path = cg.file_path();
        assert!(file_path.exists());
        assert_eq!(
            file_path.extension().and_then(|ext| ext.to_str()),
            Some("sst")
        );

        let reopened = ColumnGroup::open(file_path.to_path_buf(), schema, 0, 1).unwrap();
        assert_eq!(reopened.column_offsets.len(), 5);
        let lsi = reopened
            .lsi
            .as_ref()
            .expect("reopened column group should carry an LSI");
        assert!(lsi.stats().num_keys > 0);
    }

    #[test]
    fn test_memtable_get() {
        let memtable = ColumnarMemtable::new(test_schema(), 1024 * 1024);

        let alice_id = 1u64.to_le_bytes();
        let alice_score = 95.5f64.to_le_bytes();
        memtable
            .insert(
                1,
                &[Some(&alice_id), Some("Alice".as_bytes()), Some(&alice_score)],
            )
            .unwrap();

        // Bob has no score.
        let bob_id = 2u64.to_le_bytes();
        memtable
            .insert(2, &[Some(&bob_id), Some("Bob".as_bytes()), None])
            .unwrap();

        let alice = memtable.get(1).expect("row 1 should be present");
        assert_eq!(alice.len(), 3);
        let id_bytes: [u8; 8] = alice[0].as_deref().unwrap().try_into().unwrap();
        assert_eq!(u64::from_le_bytes(id_bytes), 1);
        assert_eq!(
            std::str::from_utf8(alice[1].as_deref().unwrap()).unwrap(),
            "Alice"
        );

        let bob = memtable.get(2).expect("row 2 should be present");
        assert!(bob[2].is_none());

        assert!(memtable.get(999).is_none());
    }

    #[test]
    fn test_memtable_scan_range() {
        let memtable = ColumnarMemtable::new(test_schema(), 1024 * 1024);

        for id in 1..=10 {
            memtable
                .insert(
                    id,
                    &[
                        Some(&id.to_le_bytes()),
                        Some(format!("User{}", id).as_bytes()),
                        Some(&((id as f64) * 10.0).to_le_bytes()),
                    ],
                )
                .unwrap();
        }

        // Both ends are inclusive: ids 3 through 7.
        let hits = memtable.scan_range(3, 7);
        assert_eq!(hits.len(), 5);
        assert!(hits.iter().all(|(row_id, _)| (3..=7).contains(row_id)));
    }

    #[test]
    fn test_lscs_get() {
        let dir = tempdir().unwrap();
        let config = LscsConfig {
            memtable_size: 64 * 1024 * 1024,
            ..Default::default()
        };
        let lscs = Lscs::new(dir.path().to_path_buf(), test_schema(), config).unwrap();

        let id_bytes = 42u64.to_le_bytes();
        let score_bytes = 99.9f64.to_le_bytes();
        let row_id = lscs
            .insert(&[
                Some(&id_bytes),
                Some("TestUser".as_bytes()),
                Some(&score_bytes),
            ])
            .unwrap();

        let values = lscs
            .get(row_id)
            .unwrap()
            .expect("freshly inserted row should be readable");
        let stored_id: [u8; 8] = values[0].as_deref().unwrap().try_into().unwrap();
        assert_eq!(u64::from_le_bytes(stored_id), 42);
        assert_eq!(
            std::str::from_utf8(values[1].as_deref().unwrap()).unwrap(),
            "TestUser"
        );
    }

    #[test]
    fn test_lscs_fsync() {
        let dir = tempdir().unwrap();
        let lscs = Lscs::new(
            dir.path().to_path_buf(),
            test_schema(),
            LscsConfig::default(),
        )
        .unwrap();

        for n in 1..=5u64 {
            let label = format!("User{}", n);
            lscs.insert(&[
                Some(&n.to_le_bytes()),
                Some(label.as_bytes()),
                Some(&((n as f64) * 10.0).to_le_bytes()),
            ])
            .unwrap();
        }

        lscs.fsync().unwrap();

        // Data must still be readable after the fsync.
        assert!(lscs.get(1).unwrap().is_some());
    }
}