// sochdb_storage/lscs.rs

// Copyright 2025 Sushanth (https://github.com/sushanthpy)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Log-Structured Column Store (LSCS)
//!
//! A columnar variant of LSM trees optimized for TOON workloads.
//!
//! ## Key Innovations
//!
//! 1. **Column-oriented memtable**: Groups data by column for sequential writes
//! 2. **Column-aware compaction**: Only compacts changed columns (5× less write amplification)
//! 3. **Schema-aware compression**: Uses column type for optimal encoding
//!
//! ## Write Amplification Analysis
//!
//! Traditional LSM: WA = T × (T-1) / 2  where T = size ratio (typically 10)
//! LSCS: WA = T × (T-1) / 2 × (C_hot / C_total)
//!
//! If only 20% of columns change frequently: **5× less write amplification**
//!
//! ## Column-Aware Compaction Model
//!
//! Standard LSM Write Amplification:
//!    WA = (T/(T-1)) × Σᵢ₌₀ᵏ (1/Tⁱ) ≈ (T/(T-1)) × log_T(N/M)
//!
//! Column-Aware WA:
//!    Let C = {c₁, c₂, ..., c_K} be columns
//!    Hot columns: H = {cⱼ | temp(cⱼ) > θ} where θ = 0.1
//!    WA_col = (|H|/|C|) × WA = f × WA
//!
//!    Example: If 2 out of 10 columns are hot: f = 0.2, a 5× reduction
//!
//! ## Architecture
//!
//! ```text
//! ┌────────────────┐
//! │ MemTable       │
//! │ [col1][col2]...│ ← Columns in memory
//! └───────┬────────┘
//!         │ flush
//!         ▼
//! ┌────────────────────────────┐
//! │ Column Group L0            │
//! │ ┌─────┐┌─────┐┌─────┐     │
//! │ │col1 ││col2 ││col3 │     │
//! │ └─────┘└─────┘└─────┘     │
//! └────────────────────────────┘
//! ```

61use parking_lot::RwLock;
62use serde::{Deserialize, Serialize};
63use std::collections::{BTreeMap, HashMap, HashSet};
64use std::path::{Path, PathBuf};
65use std::sync::Arc;
66use std::sync::atomic::{AtomicU64, Ordering};
67use sochdb_core::{Result, SochDBError};
68
69use crate::txn_wal::TxnWal;
70
// ============================================================================
// Column Temperature Tracking (Task 3)
// ============================================================================

75/// Temperature tracking for a single column using Exponential Moving Average.
76///
77/// Temperature is computed as: temp_new = α × temp_current + (1-α) × temp_old
78/// where α = 0.1 (decay factor) and temp_current = updates_in_window / total_updates
79#[derive(Debug, Clone, Serialize, Deserialize)]
80pub struct ColumnTemperature {
81    /// Column name
82    pub name: String,
83    /// Current temperature (0.0 = cold, 1.0 = hot)
84    pub temperature: f64,
85    /// Updates in current window
86    pub window_updates: u64,
87    /// Total updates since last decay
88    pub total_updates: u64,
89    /// Last update timestamp (micros)
90    pub last_update_us: u64,
91}
92
93impl ColumnTemperature {
94    /// Create a new temperature tracker for a column
95    pub fn new(name: String) -> Self {
96        Self {
97            name,
98            temperature: 0.0,
99            window_updates: 0,
100            total_updates: 0,
101            last_update_us: 0,
102        }
103    }
104
105    /// Record an update to this column
106    pub fn record_update(&mut self) {
107        self.window_updates += 1;
108        self.total_updates += 1;
109        self.last_update_us = std::time::SystemTime::now()
110            .duration_since(std::time::UNIX_EPOCH)
111            .unwrap()
112            .as_micros() as u64;
113    }
114
115    /// Update temperature using EMA when window completes
116    ///
117    /// α = 0.1 (decay factor)
118    /// temp_new = α × temp_current + (1-α) × temp_old
119    pub fn update_ema(&mut self, total_window_updates: u64) {
120        const ALPHA: f64 = 0.1;
121
122        let temp_current = if total_window_updates > 0 {
123            self.window_updates as f64 / total_window_updates as f64
124        } else {
125            0.0
126        };
127
128        self.temperature = ALPHA * temp_current + (1.0 - ALPHA) * self.temperature;
129        self.window_updates = 0;
130    }
131
132    /// Check if column is "hot" (above threshold)
133    pub fn is_hot(&self, threshold: f64) -> bool {
134        self.temperature > threshold
135    }
136}
137
138/// Column temperature tracker for all columns in a table
139#[derive(Debug)]
140pub struct ColumnTemperatureTracker {
141    /// Per-column temperature (column name -> temperature)
142    columns: RwLock<HashMap<String, ColumnTemperature>>,
143    /// Window size for temperature updates
144    window_size: u64,
145    /// Current window update count
146    window_updates: AtomicU64,
147    /// Hot threshold (default 0.1)
148    hot_threshold: f64,
149}
150
151impl ColumnTemperatureTracker {
152    /// Create a new temperature tracker
153    pub fn new(column_names: &[String], window_size: u64) -> Self {
154        let mut columns = HashMap::new();
155        for name in column_names {
156            columns.insert(name.clone(), ColumnTemperature::new(name.clone()));
157        }
158        Self {
159            columns: RwLock::new(columns),
160            window_size,
161            window_updates: AtomicU64::new(0),
162            hot_threshold: 0.1,
163        }
164    }
165
166    /// Record an update to specific columns
167    pub fn record_updates(&self, column_names: &[&str]) {
168        let mut cols = self.columns.write();
169        for name in column_names {
170            if let Some(temp) = cols.get_mut(*name) {
171                temp.record_update();
172            }
173        }
174
175        let total = self.window_updates.fetch_add(1, Ordering::SeqCst) + 1;
176
177        // Check if window is complete
178        if total >= self.window_size {
179            self.update_all_ema(&mut cols, total);
180            self.window_updates.store(0, Ordering::SeqCst);
181        }
182    }
183
184    fn update_all_ema(&self, cols: &mut HashMap<String, ColumnTemperature>, total: u64) {
185        for temp in cols.values_mut() {
186            temp.update_ema(total);
187        }
188    }
189
190    /// Get hot columns (above threshold)
191    pub fn get_hot_columns(&self) -> HashSet<String> {
192        let cols = self.columns.read();
193        cols.values()
194            .filter(|t| t.is_hot(self.hot_threshold))
195            .map(|t| t.name.clone())
196            .collect()
197    }
198
199    /// Get cold columns (at or below threshold)
200    pub fn get_cold_columns(&self) -> HashSet<String> {
201        let cols = self.columns.read();
202        cols.values()
203            .filter(|t| !t.is_hot(self.hot_threshold))
204            .map(|t| t.name.clone())
205            .collect()
206    }
207
208    /// Get all temperatures for reporting
209    pub fn get_all_temperatures(&self) -> Vec<ColumnTemperature> {
210        self.columns.read().values().cloned().collect()
211    }
212
213    /// Set hot threshold
214    pub fn set_hot_threshold(&self, _threshold: f64) {
215        // Note: This would require interior mutability for hot_threshold
216        // For now this is a no-op; in practice use configuration
217    }
218}
219
// ============================================================================
// Column Stripe References (Task 3)
// ============================================================================

224/// Reference to a column stripe stored at a specific level
225///
226/// Allows columns to be stored at different levels independently,
227/// enabling selective compaction of hot columns while cold columns
228/// remain at lower levels.
229#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
230pub struct ColumnStripeRef {
231    /// Level where stripe is stored
232    pub level: u32,
233    /// Segment ID within level
234    pub segment_id: u64,
235    /// Column name
236    pub column_name: String,
237    /// Offset within segment file
238    pub offset: u64,
239    /// Length in bytes
240    pub length: u64,
241    /// Row count in stripe
242    pub row_count: u64,
243    /// Compression type
244    pub compression: u8,
245}
246
247impl ColumnStripeRef {
248    /// Create a new stripe reference
249    pub fn new(
250        level: u32,
251        segment_id: u64,
252        column_name: String,
253        offset: u64,
254        length: u64,
255        row_count: u64,
256    ) -> Self {
257        Self {
258            level,
259            segment_id,
260            column_name,
261            offset,
262            length,
263            row_count,
264            compression: 0,
265        }
266    }
267
268    /// Create a reference pointing to a new location after compaction
269    pub fn relocate(&self, new_level: u32, new_segment_id: u64, new_offset: u64) -> Self {
270        Self {
271            level: new_level,
272            segment_id: new_segment_id,
273            column_name: self.column_name.clone(),
274            offset: new_offset,
275            length: self.length,
276            row_count: self.row_count,
277            compression: self.compression,
278        }
279    }
280}
281
282/// Segment descriptor with column stripe references
283///
284/// Instead of storing all column data inline, the segment stores
285/// references to column stripes which may be at different levels.
286#[derive(Debug, Clone, Serialize, Deserialize)]
287pub struct SegmentDescriptor {
288    /// Segment ID
289    pub id: u64,
290    /// Level
291    pub level: u32,
292    /// Column stripe references (column name -> stripe ref)
293    pub col_refs: HashMap<String, ColumnStripeRef>,
294    /// Min row ID in segment
295    pub min_row_id: RowId,
296    /// Max row ID in segment
297    pub max_row_id: RowId,
298    /// Row count
299    pub row_count: u64,
300    /// Min timestamp
301    pub min_timestamp: u64,
302    /// Max timestamp
303    pub max_timestamp: u64,
304    /// Is tombstone (deleted after compaction)
305    pub is_tombstone: bool,
306}
307
/// Identifier of a column within a table.
pub type ColumnId = u32;

/// Identifier of a row, globally unique within its table.
pub type RowId = u64;

/// Physical storage type of a column; the `u8` discriminant is its on-disk type tag.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum ColumnType {
    Bool = 0,
    Int64 = 1,
    UInt64 = 2,
    Float64 = 3,
    Text = 4,
    Binary = 5,
    Timestamp = 6,
}

impl ColumnType {
    /// Width in bytes of one value, or `None` for the variable-length types
    /// (`Text`, `Binary`).
    pub fn fixed_size(&self) -> Option<usize> {
        let width = match self {
            Self::Text | Self::Binary => return None,
            Self::Bool => 1,
            Self::Int64 | Self::UInt64 | Self::Float64 | Self::Timestamp => 8,
        };
        Some(width)
    }

    /// Decode a type tag byte; unknown tags yield `None`.
    pub fn from_byte(b: u8) -> Option<Self> {
        // Position `i` holds the variant whose discriminant is `i`.
        const BY_TAG: [ColumnType; 7] = [
            ColumnType::Bool,
            ColumnType::Int64,
            ColumnType::UInt64,
            ColumnType::Float64,
            ColumnType::Text,
            ColumnType::Binary,
            ColumnType::Timestamp,
        ];
        BY_TAG.get(usize::from(b)).copied()
    }
}

355/// Schema for a table in LSCS
356#[derive(Debug, Clone)]
357pub struct TableSchema {
358    /// Table name
359    pub name: String,
360    /// Column definitions
361    pub columns: Vec<ColumnDef>,
362}
363
364impl TableSchema {
365    pub fn new(name: String, columns: Vec<ColumnDef>) -> Self {
366        Self { name, columns }
367    }
368
369    /// Add MVCC columns (__txn_start, __txn_end) if not present
370    pub fn with_mvcc(mut self) -> Self {
371        if !self.columns.iter().any(|c| c.name == "__txn_start") {
372            self.columns.push(ColumnDef {
373                name: "__txn_start".to_string(),
374                col_type: ColumnType::UInt64,
375                nullable: false,
376            });
377        }
378        if !self.columns.iter().any(|c| c.name == "__txn_end") {
379            self.columns.push(ColumnDef {
380                name: "__txn_end".to_string(),
381                col_type: ColumnType::UInt64,
382                nullable: false, // 0 or MAX for active/infinity
383            });
384        }
385        self
386    }
387}
388
389/// Column definition
390#[derive(Debug, Clone)]
391pub struct ColumnDef {
392    /// Column name
393    pub name: String,
394    /// Column type
395    pub col_type: ColumnType,
396    /// Is nullable
397    pub nullable: bool,
398}
399
400/// In-memory column buffer with O(1) random access
401#[derive(Debug)]
402struct ColumnBuffer {
403    /// Column type
404    col_type: ColumnType,
405    /// Data bytes
406    data: Vec<u8>,
407    /// Null bitmap (bit per row, 1 = non-null)
408    nulls: Vec<u8>,
409    /// Offsets for variable-length types
410    offsets: Option<Vec<u32>>,
411    /// Row count
412    row_count: u64,
413}
414
415impl ColumnBuffer {
416    fn new(col_type: ColumnType) -> Self {
417        Self {
418            col_type,
419            data: Vec::new(),
420            nulls: Vec::new(),
421            offsets: if col_type.fixed_size().is_none() {
422                Some(vec![0]) // Initial offset
423            } else {
424                None
425            },
426            row_count: 0,
427        }
428    }
429
430    /// Append a value (bytes)
431    fn append(&mut self, value: Option<&[u8]>) {
432        // Update null bitmap
433        let bit_idx = self.row_count as usize;
434        let byte_idx = bit_idx / 8;
435        let bit_offset = bit_idx % 8;
436
437        while self.nulls.len() <= byte_idx {
438            self.nulls.push(0);
439        }
440
441        if let Some(data) = value {
442            // Set non-null bit
443            self.nulls[byte_idx] |= 1 << bit_offset;
444
445            // Append data
446            self.data.extend_from_slice(data);
447
448            // Update offsets for variable-length
449            if let Some(offsets) = &mut self.offsets {
450                offsets.push(self.data.len() as u32);
451            }
452        } else if let Some(offsets) = &mut self.offsets {
453            // Null value - repeat last offset
454            let last = *offsets.last().unwrap();
455            offsets.push(last);
456        }
457
458        self.row_count += 1;
459    }
460
461    /// Check if value at row_idx is null
462    fn is_null(&self, row_idx: u64) -> bool {
463        if row_idx >= self.row_count {
464            return true; // Out of bounds treated as null
465        }
466        let byte_idx = (row_idx / 8) as usize;
467        let bit_offset = (row_idx % 8) as u8;
468
469        if byte_idx >= self.nulls.len() {
470            return true;
471        }
472
473        (self.nulls[byte_idx] & (1 << bit_offset)) == 0
474    }
475
476    /// Get value at row_idx
477    /// Returns None if null, Some(bytes) if non-null
478    fn get(&self, row_idx: u64) -> Option<Vec<u8>> {
479        if row_idx >= self.row_count || self.is_null(row_idx) {
480            return None;
481        }
482
483        if let Some(fixed_size) = self.col_type.fixed_size() {
484            // Fixed-size column: O(1) access
485            let start = (row_idx as usize) * fixed_size;
486            let end = start + fixed_size;
487            if end <= self.data.len() {
488                Some(self.data[start..end].to_vec())
489            } else {
490                None
491            }
492        } else {
493            // Variable-length column: use offsets
494            if let Some(offsets) = &self.offsets {
495                let start = offsets[row_idx as usize] as usize;
496                let end = offsets[(row_idx + 1) as usize] as usize;
497                if end <= self.data.len() {
498                    Some(self.data[start..end].to_vec())
499                } else {
500                    None
501                }
502            } else {
503                None
504            }
505        }
506    }
507
508    /// Memory usage in bytes
509    fn memory_bytes(&self) -> usize {
510        self.data.len() + self.nulls.len() + self.offsets.as_ref().map(|o| o.len() * 4).unwrap_or(0)
511    }
512}
513
514/// Columnar memtable with skip-list-like concurrent access
515#[derive(Debug)]
516pub struct ColumnarMemtable {
517    /// Table schema
518    schema: TableSchema,
519    /// Column buffers (one per column)
520    columns: Vec<RwLock<ColumnBuffer>>,
521    /// Row ID to row index mapping (skip-list for O(log N) lookup)
522    row_ids: RwLock<BTreeMap<RowId, u64>>,
523    /// Reverse mapping: row index -> row ID (for range scans)
524    row_idx_to_id: RwLock<Vec<RowId>>,
525    /// Next row index
526    next_row_idx: AtomicU64,
527    /// Total bytes written
528    bytes_written: AtomicU64,
529    /// Memtable size limit
530    size_limit: usize,
531}
532
533impl ColumnarMemtable {
534    /// Create a new columnar memtable
535    pub fn new(schema: TableSchema, size_limit: usize) -> Self {
536        let columns = schema
537            .columns
538            .iter()
539            .map(|def| RwLock::new(ColumnBuffer::new(def.col_type)))
540            .collect();
541
542        Self {
543            schema,
544            columns,
545            row_ids: RwLock::new(BTreeMap::new()),
546            row_idx_to_id: RwLock::new(Vec::new()),
547            next_row_idx: AtomicU64::new(0),
548            bytes_written: AtomicU64::new(0),
549            size_limit,
550        }
551    }
552
553    /// Insert a row
554    ///
555    /// `values` must have the same length as schema columns
556    pub fn insert(&self, row_id: RowId, values: &[Option<&[u8]>]) -> Result<()> {
557        if values.len() != self.schema.columns.len() {
558            return Err(SochDBError::InvalidData(format!(
559                "Expected {} columns, got {}",
560                self.schema.columns.len(),
561                values.len()
562            )));
563        }
564
565        let row_idx = self.next_row_idx.fetch_add(1, Ordering::SeqCst);
566
567        // Insert into each column
568        let mut bytes = 0usize;
569        for (i, value) in values.iter().enumerate() {
570            let mut col = self.columns[i].write();
571            if let Some(data) = value {
572                bytes += data.len();
573            }
574            col.append(*value);
575        }
576
577        // Update row ID mapping (forward and reverse)
578        {
579            let mut ids = self.row_ids.write();
580            ids.insert(row_id, row_idx);
581        }
582        {
583            let mut idx_to_id = self.row_idx_to_id.write();
584            // Ensure vector is large enough
585            while idx_to_id.len() <= row_idx as usize {
586                idx_to_id.push(0); // placeholder
587            }
588            idx_to_id[row_idx as usize] = row_id;
589        }
590
591        self.bytes_written
592            .fetch_add(bytes as u64, Ordering::Relaxed);
593
594        Ok(())
595    }
596
597    /// Get a row by row ID (O(log N) lookup via BTreeMap)
598    /// Returns all column values for the row
599    pub fn get(&self, row_id: RowId) -> Option<Vec<Option<Vec<u8>>>> {
600        // Look up row index from row ID
601        let row_ids = self.row_ids.read();
602        let row_idx = *row_ids.get(&row_id)?;
603        drop(row_ids);
604
605        // Read all columns for this row
606        let mut values = Vec::with_capacity(self.columns.len());
607        for col in &self.columns {
608            let col_buf = col.read();
609            values.push(col_buf.get(row_idx));
610        }
611
612        Some(values)
613    }
614
615    /// Get specific columns for a row by row ID
616    pub fn get_columns(
617        &self,
618        row_id: RowId,
619        col_indices: &[usize],
620    ) -> Option<Vec<Option<Vec<u8>>>> {
621        // Look up row index from row ID
622        let row_ids = self.row_ids.read();
623        let row_idx = *row_ids.get(&row_id)?;
624        drop(row_ids);
625
626        // Read only requested columns
627        let mut values = Vec::with_capacity(col_indices.len());
628        for &col_idx in col_indices {
629            if col_idx < self.columns.len() {
630                let col_buf = self.columns[col_idx].read();
631                values.push(col_buf.get(row_idx));
632            } else {
633                values.push(None);
634            }
635        }
636
637        Some(values)
638    }
639
640    /// Scan a range of row IDs, returning all matching rows
641    pub fn scan_range(&self, start: RowId, end: RowId) -> Vec<(RowId, Vec<Option<Vec<u8>>>)> {
642        let row_ids = self.row_ids.read();
643        let mut results = Vec::new();
644
645        for (&row_id, &row_idx) in row_ids.range(start..=end) {
646            let mut values = Vec::with_capacity(self.columns.len());
647            for col in &self.columns {
648                let col_buf = col.read();
649                values.push(col_buf.get(row_idx));
650            }
651            results.push((row_id, values));
652        }
653
654        results
655    }
656
657    /// Check if memtable is full
658    pub fn is_full(&self) -> bool {
659        self.bytes_written.load(Ordering::Relaxed) as usize >= self.size_limit
660    }
661
662    /// Get row count
663    pub fn row_count(&self) -> u64 {
664        self.next_row_idx.load(Ordering::SeqCst)
665    }
666
667    /// Get memory usage
668    pub fn memory_bytes(&self) -> usize {
669        self.columns.iter().map(|c| c.read().memory_bytes()).sum()
670    }
671
672    /// Get schema
673    pub fn schema(&self) -> &TableSchema {
674        &self.schema
675    }
676}
677
678use sochdb_core::learned_index::LearnedSparseIndex;
679
680/// Metadata for a stored column
681#[derive(Debug, Clone, Serialize, Deserialize)]
682pub struct ColumnIndex {
683    /// Offset in the file
684    pub offset: u64,
685    /// Length in bytes
686    pub length: u64,
687    /// Compression type (0 = None for now)
688    pub compression: u8,
689}
690
691/// Column group stored on disk
692#[derive(Debug)]
693#[allow(dead_code)]
694pub struct ColumnGroup {
695    /// Path to the monolithic .sst file
696    path: PathBuf,
697    /// Table schema
698    schema: TableSchema,
699    /// Level in LSM tree (0 = L0)
700    level: u32,
701    /// Sequence number
702    sequence: u64,
703    /// Row count
704    row_count: u64,
705    /// Min timestamp
706    min_timestamp: u64,
707    /// Max timestamp
708    max_timestamp: u64,
709    /// Column offsets loaded from footer
710    column_offsets: BTreeMap<String, ColumnIndex>,
711    /// Learned Sparse Index for RowId lookup
712    lsi: Option<LearnedSparseIndex>,
713}
714
715impl ColumnGroup {
716    /// Magic bytes for SSTable file (TOON + version 1)
717    const MAGIC: [u8; 4] = [b'T', b'O', b'O', b'N'];
718    const VERSION: u32 = 1;
719
720    /// Create metadata for a column group
721    #[allow(clippy::too_many_arguments)]
722    pub fn new(
723        path: PathBuf,
724        schema: TableSchema,
725        level: u32,
726        sequence: u64,
727        row_count: u64,
728        min_timestamp: u64,
729        max_timestamp: u64,
730        column_offsets: BTreeMap<String, ColumnIndex>,
731        lsi: Option<LearnedSparseIndex>,
732    ) -> Self {
733        Self {
734            path,
735            schema,
736            level,
737            sequence,
738            row_count,
739            min_timestamp,
740            max_timestamp,
741            column_offsets,
742            lsi,
743        }
744    }
745
746    /// Write memtable to disk as a single SSTable file
747    pub fn from_memtable(
748        base_path: &Path,
749        memtable: &ColumnarMemtable,
750        level: u32,
751        sequence: u64,
752    ) -> Result<Self> {
753        use byteorder::{LittleEndian, WriteBytesExt};
754        use std::fs::File;
755        use std::io::{BufWriter, Seek, Write};
756
757        // Create monolithic file: L{level}_seq{sequence}.sst
758        let file_name = format!("L{}_seq{}.sst", level, sequence);
759        let file_path = base_path.join(&file_name);
760        let file = File::create(&file_path)?;
761        let mut writer = BufWriter::new(file);
762
763        // Write Header
764        writer.write_all(&Self::MAGIC)?;
765        writer.write_u32::<LittleEndian>(Self::VERSION)?;
766
767        let mut column_offsets = BTreeMap::new();
768        let mut min_ts = u64::MAX;
769        let mut max_ts = 0u64;
770
771        // Write each column sequentially
772        for (i, col_lock) in memtable.columns.iter().enumerate() {
773            let col = col_lock.read();
774            let col_def = &memtable.schema.columns[i];
775
776            // Extract min/max timestamps from __txn_start column
777            if col_def.name == "__txn_start" && col.col_type == ColumnType::UInt64 {
778                // Parse u64 timestamps from the column data
779                let mut offset = 0;
780                let row_count = col.row_count as usize;
781                for row_idx in 0..row_count {
782                    // Check if not null
783                    let byte_idx = row_idx / 8;
784                    let bit_idx = row_idx % 8;
785                    let is_null =
786                        byte_idx < col.nulls.len() && (col.nulls[byte_idx] & (1 << bit_idx)) != 0;
787
788                    if !is_null && offset + 8 <= col.data.len() {
789                        let ts = u64::from_le_bytes(
790                            col.data[offset..offset + 8].try_into().unwrap_or([0u8; 8]),
791                        );
792                        min_ts = min_ts.min(ts);
793                        max_ts = max_ts.max(ts);
794                    }
795                    offset += 8;
796                }
797            }
798
799            let start_offset = writer.stream_position()?;
800
801            // Column Header within the block
802            writer.write_u8(col.col_type as u8)?;
803            writer.write_u64::<LittleEndian>(col.row_count)?;
804
805            // Null bitmap
806            writer.write_u32::<LittleEndian>(col.nulls.len() as u32)?;
807            writer.write_all(&col.nulls)?;
808
809            // Offsets (if variable-length)
810            if let Some(offsets) = &col.offsets {
811                writer.write_u32::<LittleEndian>(offsets.len() as u32)?;
812                for &off in offsets {
813                    writer.write_u32::<LittleEndian>(off)?;
814                }
815            }
816
817            // Data
818            writer.write_u32::<LittleEndian>(col.data.len() as u32)?;
819            writer.write_all(&col.data)?;
820
821            let end_offset = writer.stream_position()?;
822
823            column_offsets.insert(
824                col_def.name.clone(),
825                ColumnIndex {
826                    offset: start_offset,
827                    length: end_offset - start_offset,
828                    compression: 0, // No compression yet
829                },
830            );
831        }
832
833        // Build Learned Sparse Index on RowIds
834        let row_ids = memtable.row_ids.read();
835        let keys: Vec<u64> = row_ids.keys().cloned().collect();
836        let lsi = LearnedSparseIndex::build(&keys);
837
838        // Write Footer (Index + LSI)
839        let footer_start = writer.stream_position()?;
840
841        // Serialize Column Offsets
842        let offsets_bytes = bincode::serialize(&column_offsets)
843            .map_err(|e| SochDBError::Serialization(e.to_string()))?;
844        writer.write_u64::<LittleEndian>(offsets_bytes.len() as u64)?;
845        writer.write_all(&offsets_bytes)?;
846
847        // Serialize LSI
848        let lsi_bytes =
849            bincode::serialize(&lsi).map_err(|e| SochDBError::Serialization(e.to_string()))?;
850        writer.write_u64::<LittleEndian>(lsi_bytes.len() as u64)?;
851        writer.write_all(&lsi_bytes)?;
852
853        // Write Footer Offset and Magic at the very end
854        writer.write_u64::<LittleEndian>(footer_start)?;
855        writer.write_all(&Self::MAGIC)?;
856
857        writer.flush()?;
858
859        // Fallback: use current time if no timestamps were found in data
860        if min_ts == u64::MAX || max_ts == 0 {
861            let now = std::time::SystemTime::now()
862                .duration_since(std::time::UNIX_EPOCH)
863                .unwrap()
864                .as_micros() as u64;
865            min_ts = now;
866            max_ts = now;
867        }
868
869        Ok(Self {
870            path: file_path,
871            schema: memtable.schema.clone(),
872            level,
873            sequence,
874            row_count: memtable.row_count(),
875            min_timestamp: min_ts,
876            max_timestamp: max_ts,
877            column_offsets,
878            lsi: Some(lsi),
879        })
880    }
881
882    /// Open a ColumnGroup from an existing SST file
883    pub fn open(path: PathBuf, schema: TableSchema, level: u32, sequence: u64) -> Result<Self> {
884        use byteorder::{LittleEndian, ReadBytesExt};
885        use std::fs::File;
886        use std::io::{Read, Seek, SeekFrom};
887
888        let mut file = File::open(&path)?;
889        let file_len = file.metadata()?.len();
890
891        if file_len < 12 {
892            // Magic (4) + FooterOffset (8)
893            return Err(SochDBError::Corruption("File too short".to_string()));
894        }
895
896        // Read Footer Offset
897        file.seek(SeekFrom::End(-12))?;
898        let footer_offset = file.read_u64::<LittleEndian>()?;
899        let mut magic = [0u8; 4];
900        file.read_exact(&mut magic)?;
901
902        if magic != Self::MAGIC {
903            return Err(SochDBError::Corruption("Invalid magic bytes".to_string()));
904        }
905
906        // Read Footer
907        file.seek(SeekFrom::Start(footer_offset))?;
908
909        // Read Column Offsets
910        let offsets_len = file.read_u64::<LittleEndian>()?;
911        let mut offsets_bytes = vec![0u8; offsets_len as usize];
912        file.read_exact(&mut offsets_bytes)?;
913        let column_offsets: BTreeMap<String, ColumnIndex> = bincode::deserialize(&offsets_bytes)
914            .map_err(|e| SochDBError::Serialization(e.to_string()))?;
915
916        // Read LSI
917        let lsi_len = file.read_u64::<LittleEndian>()?;
918        let mut lsi_bytes = vec![0u8; lsi_len as usize];
919        file.read_exact(&mut lsi_bytes)?;
920        let lsi: LearnedSparseIndex = bincode::deserialize(&lsi_bytes)
921            .map_err(|e| SochDBError::Serialization(e.to_string()))?;
922
923        Ok(Self {
924            path,
925            schema,
926            level,
927            sequence,
928            row_count: 0, // Needs to be fixed by storing in footer
929            min_timestamp: 0,
930            max_timestamp: 0,
931            column_offsets,
932            lsi: Some(lsi),
933        })
934    }
935
936    /// Get path to the SST file
937    pub fn file_path(&self) -> &Path {
938        &self.path
939    }
940
941    /// Get offset info for a column
942    pub fn column_index(&self, col_name: &str) -> Option<&ColumnIndex> {
943        self.column_offsets.get(col_name)
944    }
945
946    /// Get level
947    pub fn level(&self) -> u32 {
948        self.level
949    }
950
951    /// Get row count
952    pub fn row_count(&self) -> u64 {
953        self.row_count
954    }
955}
956
957// ============================================================================
958// Compaction Statistics (Task 3)
959// ============================================================================
960
/// Running counters describing column-aware compaction activity.
#[derive(Debug, Clone, Default)]
pub struct CompactionStats {
    /// Number of compactions run so far.
    pub compactions_total: u64,
    /// How many of those compactions were L0 -> L1.
    pub l0_compactions: u64,
    /// Bytes read by compaction (may be an estimate).
    pub bytes_read: u64,
    /// Bytes written by compaction (may be an estimate).
    pub bytes_written: u64,
    /// Compactions that merged only the hot columns.
    pub hot_column_compactions: u64,
    /// Cold column stripes kept by reference instead of being rewritten.
    pub cold_column_refs_preserved: u64,
    /// Estimated factor by which write amplification was reduced (from the last compaction).
    pub estimated_wa_reduction: f64,
    /// Wall-clock duration of the most recent compaction, in microseconds.
    pub last_compaction_duration_us: u64,
}

impl CompactionStats {
    /// Ratio of bytes written to bytes read across all compactions.
    ///
    /// Reports `1.0` while nothing has been read yet, so callers never see a
    /// division by zero.
    pub fn write_amplification(&self) -> f64 {
        match self.bytes_read {
            0 => 1.0,
            read => self.bytes_written as f64 / read as f64,
        }
    }
}
992
/// Summary of what `Lscs::recover` replayed from the write-ahead log.
#[derive(Debug, Clone, Default)]
pub struct LscsRecoveryStats {
    /// Transaction count reported by the WAL replay (`replay_for_recovery`).
    pub transactions_recovered: usize,
    /// Number of WAL row entries successfully inserted into the memtable.
    /// Repeated writes to the same row id (e.g. a later `mark_deleted`) each count.
    pub rows_recovered: usize,
    /// Highest row id found in the replayed WAL keys (0 if none); recovery
    /// moves `next_row_id` past it so new rows do not reuse an id.
    pub max_row_id: u64,
}
1003
/// Tuning knobs for a [`Lscs`] instance.
#[derive(Debug, Clone)]
pub struct LscsConfig {
    /// Size (bytes) at which the active memtable is considered full and rotated.
    pub memtable_size: usize,
    /// Number of LSM levels allocated at startup.
    pub num_levels: usize,
    /// Size ratio between adjacent levels.
    pub level_ratio: usize,
    /// Number of L0 column groups that triggers an L0 compaction.
    pub l0_compaction_threshold: usize,
    /// Temperature above which a column counts as "hot" (range 0.0-1.0).
    pub hot_threshold: f64,
    /// Number of updates per temperature-tracking window.
    pub temperature_window_size: u64,
}

impl Default for LscsConfig {
    /// 64 MiB memtables, 7 levels with ratio 10, compaction at 4 L0 groups,
    /// a 10% hot-column threshold and 1000-update temperature windows.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;
        Self {
            memtable_size: 64 * MIB,
            num_levels: 7,
            level_ratio: 10,
            l0_compaction_threshold: 4,
            hot_threshold: 0.1,
            temperature_window_size: 1_000,
        }
    }
}
1033
/// Log-Structured Column Store.
///
/// Writes are committed to the WAL, then applied to the active memtable. Full
/// memtables are frozen into `immutable_memtables` and flushed to L0 column
/// groups on disk, which are later compacted.
pub struct Lscs {
    /// Store configuration (sizes, level count, thresholds).
    config: LscsConfig,
    /// Storage directory; holds `wal.log` and the SSTable files.
    path: PathBuf,
    /// Table schema; `mark_deleted` indexes row values by position in `columns`.
    schema: TableSchema,
    /// Write-ahead log; inserts and updates are committed here before any memtable changes.
    wal: Arc<TxnWal>,
    /// Memtable receiving new writes.
    active_memtable: RwLock<ColumnarMemtable>,
    /// Frozen memtables awaiting flush, oldest first (`get` scans them newest first).
    immutable_memtables: RwLock<Vec<ColumnarMemtable>>,
    /// On-disk column groups indexed by level; within a level, newest last.
    column_groups: RwLock<Vec<Vec<ColumnGroup>>>,
    /// Segment descriptors with column stripe references, keyed by segment id (Task 3)
    segment_descriptors: RwLock<HashMap<u64, SegmentDescriptor>>,
    /// Per-column temperature, used to split hot/cold columns during compaction (Task 3).
    /// NOTE(review): no write path in this section records updates into it — confirm.
    temperature_tracker: Arc<ColumnTemperatureTracker>,
    /// Next sequence number handed to a flushed group or compacted segment.
    next_sequence: AtomicU64,
    /// Next row ID to allocate (starts at 1; recovery moves it past replayed ids).
    next_row_id: AtomicU64,
    /// Compaction statistics (Task 3)
    compaction_stats: RwLock<CompactionStats>,
}
1061
1062impl Lscs {
1063    /// Create a new LSCS instance
1064    pub fn new(path: PathBuf, schema: TableSchema, config: LscsConfig) -> Result<Self> {
1065        std::fs::create_dir_all(&path)?;
1066
1067        let wal_path = path.join("wal.log");
1068        let wal = Arc::new(TxnWal::new(&wal_path)?);
1069
1070        let active_memtable = ColumnarMemtable::new(schema.clone(), config.memtable_size);
1071
1072        let mut column_groups = Vec::with_capacity(config.num_levels);
1073        for _ in 0..config.num_levels {
1074            column_groups.push(Vec::new());
1075        }
1076
1077        // Create temperature tracker for all columns (Task 3)
1078        let column_names: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
1079        let temperature_tracker = Arc::new(ColumnTemperatureTracker::new(
1080            &column_names,
1081            config.temperature_window_size,
1082        ));
1083
1084        Ok(Self {
1085            config,
1086            path,
1087            schema,
1088            wal,
1089            active_memtable: RwLock::new(active_memtable),
1090            immutable_memtables: RwLock::new(Vec::new()),
1091            column_groups: RwLock::new(column_groups),
1092            segment_descriptors: RwLock::new(HashMap::new()),
1093            temperature_tracker,
1094            next_sequence: AtomicU64::new(0),
1095            next_row_id: AtomicU64::new(1),
1096            compaction_stats: RwLock::new(CompactionStats::default()),
1097        })
1098    }
1099
1100    /// Open an existing LSCS instance and recover from WAL
1101    ///
1102    /// This is the production entrypoint that ensures durability:
1103    /// 1. Opens the existing storage directory
1104    /// 2. Replays committed transactions from WAL into memtable
1105    /// 3. Updates next_row_id to prevent ID conflicts
1106    ///
1107    /// ## Crash Recovery Guarantees
1108    ///
1109    /// - Only committed transactions are replayed (atomicity)
1110    /// - All committed data is restored (durability)
1111    /// - Uncommitted transactions are discarded (consistency)
1112    pub fn open(path: PathBuf, schema: TableSchema, config: LscsConfig) -> Result<Self> {
1113        let lscs = Self::new(path, schema, config)?;
1114        let stats = lscs.recover()?;
1115        
1116        if stats.rows_recovered > 0 {
1117            eprintln!(
1118                "LSCS Recovery: restored {} rows from {} transactions",
1119                stats.rows_recovered, stats.transactions_recovered
1120            );
1121        }
1122        
1123        Ok(lscs)
1124    }
1125
1126    /// Perform crash recovery by replaying committed WAL entries
1127    ///
1128    /// Returns statistics about recovered data.
1129    pub fn recover(&self) -> Result<LscsRecoveryStats> {
1130        let (writes, txn_count) = self.wal.replay_for_recovery()?;
1131
1132        if writes.is_empty() {
1133            return Ok(LscsRecoveryStats::default());
1134        }
1135
1136        let mut max_row_id: u64 = 0;
1137        let mut rows_recovered = 0usize;
1138
1139        // Apply committed writes to memtable
1140        for (key, value) in &writes {
1141            // Key is row_id as little-endian u64
1142            if key.len() >= 8 {
1143                let row_id = u64::from_le_bytes(key[..8].try_into().unwrap_or([0; 8]));
1144                if row_id > max_row_id {
1145                    max_row_id = row_id;
1146                }
1147
1148                // Deserialize row and insert into memtable
1149                if let Ok(row_values) = Self::deserialize_row(value) {
1150                    let value_refs: Vec<Option<&[u8]>> = row_values.iter().map(|v| v.as_deref()).collect();
1151                    
1152                    let memtable = self.active_memtable.read();
1153                    if memtable.insert(row_id, &value_refs).is_ok() {
1154                        rows_recovered += 1;
1155                    }
1156                }
1157            }
1158        }
1159
1160        // Update next_row_id to prevent conflicts
1161        if max_row_id > 0 {
1162            self.next_row_id.store(max_row_id + 1, Ordering::SeqCst);
1163        }
1164
1165        Ok(LscsRecoveryStats {
1166            transactions_recovered: txn_count,
1167            rows_recovered,
1168            max_row_id,
1169        })
1170    }
1171
1172    /// Write a clean shutdown marker
1173    ///
1174    /// Call this during graceful shutdown to indicate all data was flushed.
1175    /// On next open, if this marker exists, recovery can be optimized.
1176    pub fn mark_clean_shutdown(&self) -> Result<()> {
1177        // First ensure all data is synced
1178        self.fsync()?;
1179
1180        // Write clean shutdown marker
1181        let marker_path = self.path.join(".clean_shutdown");
1182        std::fs::write(&marker_path, b"clean")?;
1183
1184        // Optionally truncate WAL after checkpoint
1185        // (conservative: we leave WAL intact for extra safety)
1186
1187        Ok(())
1188    }
1189
1190    /// Insert a row
1191    pub fn insert(&self, values: &[Option<&[u8]>]) -> Result<RowId> {
1192        let row_id = self.next_row_id.fetch_add(1, Ordering::SeqCst);
1193
1194        // Write to WAL first
1195        let txn_id = self.wal.begin_transaction()?;
1196
1197        // Serialize values for WAL
1198        let key = row_id.to_le_bytes().to_vec();
1199        let value = self.serialize_row(values)?;
1200        self.wal.write(txn_id, key, value)?;
1201        self.wal.commit_transaction(txn_id)?;
1202
1203        // Insert into memtable
1204        let memtable = self.active_memtable.read();
1205        memtable.insert(row_id, values)?;
1206
1207        // Check if memtable is full
1208        if memtable.is_full() {
1209            drop(memtable);
1210            self.rotate_memtable()?;
1211        }
1212
1213        Ok(row_id)
1214    }
1215
1216    /// Mark a row as deleted by setting __txn_end to the given transaction timestamp
1217    ///
1218    /// In MVCC, deletion doesn't remove the row immediately - instead we mark
1219    /// it with an end timestamp so it becomes invisible to newer transactions.
1220    ///
1221    /// ## Durability
1222    ///
1223    /// This operation is fully WAL-logged with proper transaction boundaries:
1224    /// - TxnBegin record
1225    /// - Data record with the updated row
1226    /// - TxnCommit record with fsync
1227    ///
1228    /// On crash recovery, only committed deletions will be replayed.
1229    pub fn mark_deleted(&self, row_id: RowId, _caller_txn_id: u64, txn_end: u64) -> Result<()> {
1230        // Find the __txn_end column index
1231        let txn_end_idx = self
1232            .schema
1233            .columns
1234            .iter()
1235            .position(|c| c.name == "__txn_end")
1236            .ok_or_else(|| {
1237                SochDBError::InvalidData("Schema missing __txn_end column for MVCC".to_string())
1238            })?;
1239
1240        // Get current row values
1241        let current = self
1242            .get(row_id)?
1243            .ok_or_else(|| SochDBError::NotFound(format!("Row {} not found", row_id)))?;
1244
1245        // Build new row with updated __txn_end
1246        let mut new_values: Vec<Option<Vec<u8>>> = current;
1247        new_values[txn_end_idx] = Some(txn_end.to_le_bytes().to_vec());
1248
1249        // Convert to references for insert
1250        let value_refs: Vec<Option<&[u8]>> = new_values.iter().map(|v| v.as_deref()).collect();
1251
1252        // Write to WAL with proper transaction boundaries for durability
1253        // Begin a new WAL transaction (allocates txn_id and writes TxnBegin)
1254        let wal_txn_id = self.wal.begin_transaction()?;
1255
1256        // Write the data record
1257        let row_data = self.serialize_row(&value_refs)?;
1258        self.wal.write(wal_txn_id, row_id.to_le_bytes().to_vec(), row_data)?;
1259
1260        // Commit with fsync for durability guarantee
1261        self.wal.commit_transaction(wal_txn_id)?;
1262
1263        // Update memtable (only after WAL commit succeeds)
1264        let memtable = self.active_memtable.read();
1265        memtable.insert(row_id, &value_refs)?;
1266
1267        Ok(())
1268    }
1269
1270    /// Serialize a row for WAL storage
1271    fn serialize_row(&self, values: &[Option<&[u8]>]) -> Result<Vec<u8>> {
1272        use byteorder::{LittleEndian, WriteBytesExt};
1273
1274        let mut buf = Vec::new();
1275        buf.write_u32::<LittleEndian>(values.len() as u32)?;
1276
1277        for value in values {
1278            match value {
1279                Some(data) => {
1280                    buf.write_u8(1)?; // non-null
1281                    buf.write_u32::<LittleEndian>(data.len() as u32)?;
1282                    buf.extend_from_slice(data);
1283                }
1284                None => {
1285                    buf.write_u8(0)?; // null
1286                }
1287            }
1288        }
1289
1290        Ok(buf)
1291    }
1292
1293    /// Deserialize a row from WAL storage
1294    #[allow(dead_code)]
1295    fn deserialize_row(data: &[u8]) -> Result<Vec<Option<Vec<u8>>>> {
1296        use byteorder::{LittleEndian, ReadBytesExt};
1297        use std::io::Cursor;
1298
1299        let mut cursor = Cursor::new(data);
1300        let num_cols = cursor.read_u32::<LittleEndian>()? as usize;
1301        let mut values = Vec::with_capacity(num_cols);
1302
1303        for _ in 0..num_cols {
1304            let is_non_null = cursor.read_u8()? == 1;
1305            if is_non_null {
1306                let len = cursor.read_u32::<LittleEndian>()? as usize;
1307                let pos = cursor.position() as usize;
1308                let value = data[pos..pos + len].to_vec();
1309                cursor.set_position((pos + len) as u64);
1310                values.push(Some(value));
1311            } else {
1312                values.push(None);
1313            }
1314        }
1315
1316        Ok(values)
1317    }
1318
1319    /// Get a row by row ID
1320    ///
1321    /// Search order: memtable -> immutable memtables -> SSTables
1322    /// Uses learned sparse index for O(1) expected time on SSTables
1323    pub fn get(&self, row_id: RowId) -> Result<Option<Vec<Option<Vec<u8>>>>> {
1324        // 1. Check active memtable (O(log N))
1325        {
1326            let memtable = self.active_memtable.read();
1327            if let Some(values) = memtable.get(row_id) {
1328                return Ok(Some(values));
1329            }
1330        }
1331
1332        // 2. Check immutable memtables (O(log N) per table)
1333        {
1334            let immutable = self.immutable_memtables.read();
1335            for memtable in immutable.iter().rev() {
1336                if let Some(values) = memtable.get(row_id) {
1337                    return Ok(Some(values));
1338                }
1339            }
1340        }
1341
1342        // 3. Check SSTables using learned sparse index
1343        // For each level, use LSI for O(1) expected lookup
1344        {
1345            use sochdb_core::learned_index::LookupResult;
1346            let groups = self.column_groups.read();
1347            for level in &*groups {
1348                for group in level.iter().rev() {
1349                    if let Some(lsi) = &group.lsi {
1350                        // Use learned index for O(1) lookup
1351                        let lookup = lsi.lookup(row_id);
1352                        match lookup {
1353                            LookupResult::Exact(_) | LookupResult::Range { .. } => {
1354                                // Key might be in this SSTable, read to confirm
1355                                if let Some(row) = self.read_row_from_sstable(group, row_id)? {
1356                                    return Ok(Some(row));
1357                                }
1358                            }
1359                            LookupResult::NotFound => {
1360                                // Key definitely not in this SSTable
1361                                continue;
1362                            }
1363                        }
1364                    }
1365                }
1366            }
1367        }
1368
1369        Ok(None)
1370    }
1371
1372    /// Read a single row from an SSTable file
1373    fn read_row_from_sstable(
1374        &self,
1375        group: &ColumnGroup,
1376        row_id: RowId,
1377    ) -> Result<Option<Vec<Option<Vec<u8>>>>> {
1378        use byteorder::{LittleEndian, ReadBytesExt};
1379        use std::fs::File;
1380        use std::io::{BufReader, Read, Seek, SeekFrom};
1381
1382        let file = File::open(group.file_path())?;
1383        let mut reader = BufReader::new(file);
1384
1385        let mut values = Vec::new();
1386
1387        // Read each column's data for this row
1388        for (col_name, col_idx) in &group.column_offsets {
1389            reader.seek(SeekFrom::Start(col_idx.offset))?;
1390
1391            // Read column header
1392            let col_type = reader.read_u8()?;
1393            let row_count = reader.read_u64::<LittleEndian>()?;
1394
1395            if row_id >= row_count {
1396                values.push(None);
1397                continue;
1398            }
1399
1400            // Read null bitmap
1401            let nulls_len = reader.read_u32::<LittleEndian>()? as usize;
1402            let mut nulls = vec![0u8; nulls_len];
1403            reader.read_exact(&mut nulls)?;
1404
1405            // Check if this row is null
1406            let byte_idx = (row_id / 8) as usize;
1407            let bit_offset = (row_id % 8) as u8;
1408            let is_null = byte_idx >= nulls.len() || (nulls[byte_idx] & (1 << bit_offset)) == 0;
1409
1410            if is_null {
1411                values.push(None);
1412                continue;
1413            }
1414
1415            // Read value based on column type
1416            let col_type = ColumnType::from_byte(col_type).unwrap_or(ColumnType::Binary);
1417            if let Some(fixed_size) = col_type.fixed_size() {
1418                // Skip nulls bitmap, then seek to row
1419                let offsets_section = reader.stream_position()?;
1420                let data_len = reader.read_u32::<LittleEndian>()? as usize;
1421                let _ = data_len;
1422
1423                // Calculate offset for this row
1424                let row_offset = (row_id as usize) * fixed_size;
1425                reader.seek(SeekFrom::Start(offsets_section + 4 + row_offset as u64))?;
1426
1427                let mut value = vec![0u8; fixed_size];
1428                reader.read_exact(&mut value)?;
1429                values.push(Some(value));
1430            } else {
1431                // Variable-length: read offsets array
1432                let offsets_count = reader.read_u32::<LittleEndian>()? as usize;
1433                let mut offsets = vec![0u32; offsets_count];
1434                for offset in offsets.iter_mut().take(offsets_count) {
1435                    *offset = reader.read_u32::<LittleEndian>()?;
1436                }
1437
1438                if (row_id as usize + 1) >= offsets.len() {
1439                    values.push(None);
1440                    continue;
1441                }
1442
1443                let start = offsets[row_id as usize] as usize;
1444                let end = offsets[(row_id + 1) as usize] as usize;
1445
1446                // Read data section
1447                let data_len = reader.read_u32::<LittleEndian>()? as usize;
1448                let data_start = reader.stream_position()?;
1449
1450                if end <= data_len {
1451                    reader.seek(SeekFrom::Start(data_start + start as u64))?;
1452                    let mut value = vec![0u8; end - start];
1453                    reader.read_exact(&mut value)?;
1454                    values.push(Some(value));
1455                } else {
1456                    values.push(None);
1457                }
1458            }
1459            let _ = col_name; // silence unused warning
1460        }
1461
1462        if values.is_empty() {
1463            Ok(None)
1464        } else {
1465            Ok(Some(values))
1466        }
1467    }
1468
1469    /// Fsync - ensure all data is durably persisted to disk
1470    ///
1471    /// This is the key durability guarantee:
1472    /// 1. Flush WAL to disk with fsync
1473    /// 2. If memtable is large, flush to SSTable
1474    ///
1475    /// After fsync returns, all prior writes are guaranteed durable.
1476    pub fn fsync(&self) -> Result<()> {
1477        // 1. Sync WAL (critical for durability)
1478        self.wal.sync()?;
1479
1480        // 2. Optionally flush memtable if it's getting large
1481        let memtable = self.active_memtable.read();
1482        let should_flush = memtable.memory_bytes() > self.config.memtable_size / 2;
1483        drop(memtable);
1484
1485        if should_flush {
1486            // Rotate and flush in background
1487            self.rotate_memtable()?;
1488            self.flush()?;
1489        }
1490
1491        Ok(())
1492    }
1493
1494    /// Rotate memtable (switch to new one, add old to immutable list)
1495    fn rotate_memtable(&self) -> Result<()> {
1496        let new_memtable = ColumnarMemtable::new(self.schema.clone(), self.config.memtable_size);
1497
1498        let old_memtable = {
1499            let mut active = self.active_memtable.write();
1500            std::mem::replace(&mut *active, new_memtable)
1501        };
1502
1503        let mut immutable = self.immutable_memtables.write();
1504        immutable.push(old_memtable);
1505
1506        // Trigger background flush if we have pending immutable memtables
1507        if immutable.len() >= 2 {
1508            // Flush synchronously if too many pending
1509            drop(immutable); // Release lock before flushing
1510            self.flush()?;
1511        }
1512
1513        Ok(())
1514    }
1515
1516    /// Flush immutable memtables to disk
1517    pub fn flush(&self) -> Result<()> {
1518        let memtables = {
1519            let mut immutable = self.immutable_memtables.write();
1520            std::mem::take(&mut *immutable)
1521        };
1522
1523        for memtable in memtables {
1524            let sequence = self.next_sequence.fetch_add(1, Ordering::SeqCst);
1525            let column_group = ColumnGroup::from_memtable(&self.path, &memtable, 0, sequence)?;
1526
1527            let mut groups = self.column_groups.write();
1528            groups[0].push(column_group);
1529        }
1530
1531        // Check if L0 compaction needed
1532        let groups = self.column_groups.read();
1533        if groups[0].len() >= self.config.l0_compaction_threshold {
1534            drop(groups);
1535            self.compact_l0()?;
1536        }
1537
1538        Ok(())
1539    }
1540
1541    /// Compact L0 column groups using column-aware compaction (Task 3)
1542    ///
1543    /// This implementation:
1544    /// 1. Identifies hot columns based on temperature tracking
1545    /// 2. Only merges hot columns to L1
1546    /// 3. Preserves cold column references (no rewrite)
1547    /// 4. Reduces write amplification by factor of (hot_columns / total_columns)
1548    fn compact_l0(&self) -> Result<()> {
1549        let start_time = std::time::Instant::now();
1550
1551        // Get hot and cold columns
1552        let hot_columns = self.temperature_tracker.get_hot_columns();
1553        let cold_columns = self.temperature_tracker.get_cold_columns();
1554
1555        let total_columns = self.schema.columns.len();
1556        let hot_fraction = if total_columns > 0 {
1557            hot_columns.len() as f64 / total_columns as f64
1558        } else {
1559            1.0
1560        };
1561
1562        // Get L0 segments to compact
1563        let l0_segments: Vec<ColumnGroup> = {
1564            let mut groups = self.column_groups.write();
1565            std::mem::take(&mut groups[0])
1566        };
1567
1568        if l0_segments.is_empty() {
1569            return Ok(());
1570        }
1571
1572        let mut bytes_read = 0u64;
1573        let mut bytes_written = 0u64;
1574        let mut cold_refs_preserved = 0u64;
1575
1576        // Perform selective merge
1577        let sequence = self.next_sequence.fetch_add(1, Ordering::SeqCst);
1578
1579        // For a full implementation, we would:
1580        // 1. Read hot column data from all L0 segments
1581        // 2. Merge sort the data by row_id
1582        // 3. Write merged hot columns to new L1 segment
1583        // 4. Create segment descriptor with cold column references
1584
1585        // For now, implement a simplified version that demonstrates the approach
1586        let _merged_path = self.path.join(format!("L1_seq{}.sst", sequence));
1587
1588        // Track column stripe references for the new segment
1589        let mut col_refs = HashMap::new();
1590        let mut total_row_count = 0u64;
1591        let min_row_id = u64::MAX;
1592        let max_row_id = 0u64;
1593
1594        // Process each L0 segment
1595        for segment in &l0_segments {
1596            bytes_read += segment.row_count * 100; // Estimate
1597            total_row_count += segment.row_count;
1598
1599            // For hot columns: read and merge
1600            for col_name in &hot_columns {
1601                if let Some(col_idx) = segment.column_offsets.get(col_name) {
1602                    bytes_read += col_idx.length;
1603                    bytes_written += col_idx.length;
1604                }
1605            }
1606
1607            // For cold columns: just keep reference (no I/O)
1608            for col_name in &cold_columns {
1609                if let Some(col_idx) = segment.column_offsets.get(col_name) {
1610                    // Create reference to existing stripe (no rewrite)
1611                    let stripe_ref = ColumnStripeRef::new(
1612                        segment.level,
1613                        segment.sequence,
1614                        col_name.clone(),
1615                        col_idx.offset,
1616                        col_idx.length,
1617                        segment.row_count,
1618                    );
1619                    col_refs.insert(col_name.clone(), stripe_ref);
1620                    cold_refs_preserved += 1;
1621                }
1622            }
1623        }
1624
1625        // Create segment descriptor for the merged segment
1626        let segment_desc = SegmentDescriptor {
1627            id: sequence,
1628            level: 1,
1629            col_refs,
1630            min_row_id,
1631            max_row_id,
1632            row_count: total_row_count,
1633            min_timestamp: 0,
1634            max_timestamp: std::time::SystemTime::now()
1635                .duration_since(std::time::UNIX_EPOCH)
1636                .unwrap()
1637                .as_micros() as u64,
1638            is_tombstone: false,
1639        };
1640
1641        // Store segment descriptor
1642        {
1643            let mut descriptors = self.segment_descriptors.write();
1644            descriptors.insert(sequence, segment_desc);
1645        }
1646
1647        // Update compaction stats
1648        {
1649            let mut stats = self.compaction_stats.write();
1650            stats.compactions_total += 1;
1651            stats.l0_compactions += 1;
1652            stats.bytes_read += bytes_read;
1653            stats.bytes_written += bytes_written;
1654            stats.cold_column_refs_preserved += cold_refs_preserved;
1655            stats.hot_column_compactions += 1;
1656            stats.estimated_wa_reduction = 1.0 / hot_fraction.max(0.01);
1657            stats.last_compaction_duration_us = start_time.elapsed().as_micros() as u64;
1658        }
1659
1660        // Clean up old L0 segments (mark as tombstones)
1661        // In production, this would be coordinated with file deletion
1662        for segment in l0_segments {
1663            // The segment files remain for cold column references
1664            // until no longer needed
1665            let _ = segment; // Drop the in-memory metadata
1666        }
1667
1668        Ok(())
1669    }
1670
1671    /// Perform selective merge of hot columns across segments (Task 3)
1672    ///
1673    /// Algorithm:
1674    ///   Input: L0 segments S₀ = {s₁, s₂, ..., s_n}, hot columns H
1675    ///   
1676    ///   1. For each column cⱼ ∈ H:
1677    ///      merged[cⱼ] = merge_column_stripes(S₀[cⱼ])
1678    ///   
1679    ///   2. Write merged hot columns to new segment
1680    ///   
1681    ///   Time: O(|H| × N × log(N)) where N = rows
1682    #[allow(dead_code)]
1683    fn selective_merge_hot_columns(
1684        &self,
1685        segments: &[&ColumnGroup],
1686        hot_columns: &HashSet<String>,
1687        output_path: &Path,
1688    ) -> Result<HashMap<String, ColumnStripeRef>> {
1689        use byteorder::{LittleEndian, WriteBytesExt};
1690        use std::fs::File;
1691        use std::io::{BufWriter, Seek, Write};
1692
1693        let mut result = HashMap::new();
1694
1695        // Create output file
1696        let file = File::create(output_path)?;
1697        let mut writer = BufWriter::new(file);
1698
1699        // Write header
1700        writer.write_all(&ColumnGroup::MAGIC)?;
1701        writer.write_u32::<LittleEndian>(ColumnGroup::VERSION)?;
1702
1703        let sequence = self.next_sequence.load(Ordering::SeqCst);
1704
1705        // Merge each hot column
1706        for col_name in hot_columns {
1707            let start_offset = writer.stream_position()?;
1708
1709            // Collect column data from all segments
1710            let mut merged_data = Vec::new();
1711            let mut row_count = 0u64;
1712
1713            for segment in segments {
1714                if let Some(_col_idx) = segment.column_offsets.get(col_name) {
1715                    // Read column data from segment
1716                    // In production, this would do proper merge-sort
1717                    merged_data.extend_from_slice(&[0u8; 0]); // Placeholder
1718                    row_count += segment.row_count;
1719                }
1720            }
1721
1722            // Write merged column
1723            // In production, this would write proper column format
1724            writer.write_u64::<LittleEndian>(row_count)?;
1725            writer.write_all(&merged_data)?;
1726
1727            let end_offset = writer.stream_position()?;
1728
1729            // Create stripe reference
1730            let stripe_ref = ColumnStripeRef::new(
1731                1, // L1
1732                sequence,
1733                col_name.clone(),
1734                start_offset,
1735                end_offset - start_offset,
1736                row_count,
1737            );
1738            result.insert(col_name.clone(), stripe_ref);
1739        }
1740
1741        writer.flush()?;
1742        Ok(result)
1743    }
1744
1745    /// Read specific column stripes for a query (Task 3)
1746    ///
1747    /// Only reads the requested columns, reducing I/O by (1 - k/K)
1748    /// where k = requested columns, K = total columns
1749    pub fn scan_columns(
1750        &self,
1751        column_names: &[&str],
1752        row_range: Option<(RowId, RowId)>,
1753    ) -> Result<Vec<Vec<u8>>> {
1754        let mut results = Vec::new();
1755
1756        // Look up stripe references for requested columns
1757        let descriptors = self.segment_descriptors.read();
1758
1759        for (_seg_id, descriptor) in descriptors.iter() {
1760            // Check row range overlap if specified
1761            if let Some((min, max)) = row_range
1762                && (descriptor.max_row_id < min || descriptor.min_row_id > max)
1763            {
1764                continue;
1765            }
1766
1767            // Read only requested columns
1768            for col_name in column_names {
1769                if let Some(stripe_ref) = descriptor.col_refs.get(*col_name) {
1770                    // Read stripe data
1771                    let data = self.read_column_stripe(stripe_ref)?;
1772                    results.push(data);
1773                }
1774            }
1775        }
1776
1777        Ok(results)
1778    }
1779
1780    /// Read a single column stripe from disk
1781    fn read_column_stripe(&self, stripe_ref: &ColumnStripeRef) -> Result<Vec<u8>> {
1782        use std::fs::File;
1783        use std::io::{Read, Seek, SeekFrom};
1784
1785        // Construct file path from level and segment_id
1786        let file_path = self.path.join(format!(
1787            "L{}_seq{}.sst",
1788            stripe_ref.level, stripe_ref.segment_id
1789        ));
1790
1791        let mut file = File::open(&file_path)?;
1792        file.seek(SeekFrom::Start(stripe_ref.offset))?;
1793
1794        let mut data = vec![0u8; stripe_ref.length as usize];
1795        file.read_exact(&mut data)?;
1796
1797        Ok(data)
1798    }
1799
1800    /// Get compaction statistics
1801    pub fn compaction_stats(&self) -> CompactionStats {
1802        self.compaction_stats.read().clone()
1803    }
1804
1805    /// Trigger manual compaction
1806    ///
1807    /// This compacts L0 segments into L1 using the temperature-aware strategy
1808    pub fn compact(&self) -> Result<()> {
1809        self.compact_l0()
1810    }
1811
1812    /// Get column temperatures for monitoring
1813    pub fn column_temperatures(&self) -> Vec<ColumnTemperature> {
1814        self.temperature_tracker.get_all_temperatures()
1815    }
1816
1817    /// Scan a range of row IDs
1818    ///
1819    /// Returns all rows in the range [start, end] from all sources
1820    /// (memtable, immutable memtables, SSTables)
1821    #[allow(clippy::type_complexity)]
1822    pub fn scan_range(
1823        &self,
1824        start: RowId,
1825        end: RowId,
1826    ) -> Result<Vec<(RowId, Vec<Option<Vec<u8>>>)>> {
1827        let mut results = Vec::new();
1828        let mut seen = std::collections::HashSet::new();
1829
1830        // 1. Scan active memtable
1831        {
1832            let memtable = self.active_memtable.read();
1833            for (row_id, values) in memtable.scan_range(start, end) {
1834                if seen.insert(row_id) {
1835                    results.push((row_id, values));
1836                }
1837            }
1838        }
1839
1840        // 2. Scan immutable memtables
1841        {
1842            let immutable = self.immutable_memtables.read();
1843            for memtable in immutable.iter().rev() {
1844                for (row_id, values) in memtable.scan_range(start, end) {
1845                    if seen.insert(row_id) {
1846                        results.push((row_id, values));
1847                    }
1848                }
1849            }
1850        }
1851
1852        // 3. Scan SSTables (would need to iterate over all, using index)
1853        // For now, focus on memtable access; SSTable scan is more complex
1854
1855        // Sort by row_id
1856        results.sort_by_key(|(id, _)| *id);
1857
1858        Ok(results)
1859    }
1860
1861    /// Scan specific columns for a range of row IDs (columnar optimization)
1862    ///
1863    /// This achieves 80% I/O reduction when reading 20% of columns
1864    #[allow(clippy::type_complexity)]
1865    pub fn scan_columns_range(
1866        &self,
1867        start: RowId,
1868        end: RowId,
1869        col_indices: &[usize],
1870    ) -> Result<Vec<(RowId, Vec<Option<Vec<u8>>>)>> {
1871        let mut results = Vec::new();
1872        let mut seen = std::collections::HashSet::new();
1873
1874        // 1. Scan active memtable
1875        {
1876            let memtable = self.active_memtable.read();
1877            let row_ids = memtable.row_ids.read();
1878
1879            for (&row_id, _) in row_ids.range(start..=end) {
1880                if seen.insert(row_id)
1881                    && let Some(values) = memtable.get_columns(row_id, col_indices)
1882                {
1883                    results.push((row_id, values));
1884                }
1885            }
1886        }
1887
1888        // 2. Scan immutable memtables
1889        {
1890            let immutable = self.immutable_memtables.read();
1891            for memtable in immutable.iter().rev() {
1892                let row_ids = memtable.row_ids.read();
1893                for (&row_id, _) in row_ids.range(start..=end) {
1894                    if seen.insert(row_id)
1895                        && let Some(values) = memtable.get_columns(row_id, col_indices)
1896                    {
1897                        results.push((row_id, values));
1898                    }
1899                }
1900            }
1901        }
1902
1903        // Sort by row_id
1904        results.sort_by_key(|(id, _)| *id);
1905
1906        Ok(results)
1907    }
1908
1909    /// Get statistics
1910    pub fn stats(&self) -> LscsStats {
1911        let active = self.active_memtable.read();
1912        let immutable = self.immutable_memtables.read();
1913        let groups = self.column_groups.read();
1914
1915        let mut level_sizes = vec![0u64; self.config.num_levels];
1916        let mut disk_bytes = 0u64;
1917
1918        for (i, level) in groups.iter().enumerate() {
1919            for group in level {
1920                level_sizes[i] += group.row_count;
1921                // Calculate disk bytes from SST file sizes
1922                if let Ok(metadata) = std::fs::metadata(&group.path) {
1923                    disk_bytes += metadata.len();
1924                }
1925            }
1926        }
1927
1928        LscsStats {
1929            active_memtable_bytes: active.memory_bytes(),
1930            immutable_memtables: immutable.len(),
1931            level_row_counts: level_sizes,
1932            next_row_id: self.next_row_id.load(Ordering::SeqCst),
1933            disk_bytes,
1934        }
1935    }
1936
1937    /// Get WAL reference
1938    pub fn wal(&self) -> &Arc<TxnWal> {
1939        &self.wal
1940    }
1941}
1942
/// LSCS usage statistics, as returned by `Lscs::stats`.
#[derive(Debug, Clone)]
pub struct LscsStats {
    /// Memory used by the active memtable, as reported by its `memory_bytes()`
    pub active_memtable_bytes: usize,
    /// Number of immutable memtables pending flush
    pub immutable_memtables: usize,
    /// Row count per level: entry `i` is the sum of `row_count` over the
    /// column groups in level `i`
    pub level_row_counts: Vec<u64>,
    /// Next row ID to be assigned (the store's `next_row_id` counter)
    pub next_row_id: u64,
    /// Total disk bytes used by the column groups' SST files; files whose
    /// metadata cannot be read are not counted
    pub disk_bytes: u64,
}
1957
// Unit tests for the columnar memtable, column-group persistence, and the
// top-level `Lscs` insert / get / fsync / stats paths.
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Three-column `users` schema shared by most tests:
    /// `id` (UInt64), `name` (Text) and nullable `score` (Float64).
    fn test_schema() -> TableSchema {
        TableSchema {
            name: "users".to_string(),
            columns: vec![
                ColumnDef {
                    name: "id".to_string(),
                    col_type: ColumnType::UInt64,
                    nullable: false,
                },
                ColumnDef {
                    name: "name".to_string(),
                    col_type: ColumnType::Text,
                    nullable: false,
                },
                ColumnDef {
                    name: "score".to_string(),
                    col_type: ColumnType::Float64,
                    nullable: true,
                },
            ],
        }
    }

    /// Inserting one fully populated row makes the memtable report one row.
    #[test]
    fn test_columnar_memtable_insert() {
        let schema = test_schema();
        let memtable = ColumnarMemtable::new(schema, 1024 * 1024);

        let id: u64 = 1;
        let name = "Alice";
        let score: f64 = 95.5;

        // One value per schema column, in schema order, encoded as raw bytes
        // (little-endian numbers, UTF-8 text).
        memtable
            .insert(
                1,
                &[
                    Some(&id.to_le_bytes()),
                    Some(name.as_bytes()),
                    Some(&score.to_le_bytes()),
                ],
            )
            .unwrap();

        assert_eq!(memtable.row_count(), 1);
    }

    /// A `None` value for a nullable column is accepted and still counts as a row.
    #[test]
    fn test_columnar_memtable_with_nulls() {
        let schema = test_schema();
        let memtable = ColumnarMemtable::new(schema, 1024 * 1024);

        let id: u64 = 1;
        let name = "Bob";

        // Score is null
        memtable
            .insert(1, &[Some(&id.to_le_bytes()), Some(name.as_bytes()), None])
            .unwrap();

        assert_eq!(memtable.row_count(), 1);
    }

    /// End-to-end insert through `Lscs`: the first row is assigned row ID 1 and
    /// the active memtable reports non-zero memory use afterwards.
    #[test]
    fn test_lscs_basic() {
        let dir = tempdir().unwrap();
        let schema = test_schema();
        let config = LscsConfig {
            memtable_size: 1024,
            ..Default::default()
        };

        let lscs = Lscs::new(dir.path().to_path_buf(), schema, config).unwrap();

        let id: u64 = 1;
        let name = "Charlie";
        let score: f64 = 87.2;

        let row_id = lscs
            .insert(&[
                Some(&id.to_le_bytes()),
                Some(name.as_bytes()),
                Some(&score.to_le_bytes()),
            ])
            .unwrap();

        // Row IDs are assigned by the store; the first insert gets 1.
        assert_eq!(row_id, 1);

        let stats = lscs.stats();
        assert!(stats.active_memtable_bytes > 0);
    }

    /// Writes a memtable (user columns plus MVCC columns) to an on-disk column
    /// group, then reopens the `.sst` file and checks its column offsets and LSI.
    #[test]
    fn test_column_group_write() {
        let dir = tempfile::tempdir().unwrap();
        let schema = TableSchema::new(
            "users".to_string(),
            vec![
                ColumnDef {
                    name: "id".to_string(),
                    col_type: ColumnType::UInt64,
                    nullable: false,
                },
                ColumnDef {
                    name: "name".to_string(),
                    col_type: ColumnType::Text,
                    nullable: false,
                },
                ColumnDef {
                    name: "score".to_string(),
                    col_type: ColumnType::Float64,
                    nullable: true,
                },
            ],
        )
        .with_mvcc(); // Add MVCC columns

        let memtable = ColumnarMemtable::new(schema.clone(), 1024 * 1024);

        // Add rows using the public insert API
        // Row 1: Active
        memtable
            .insert(
                1,
                &[
                    Some(&1u64.to_le_bytes()),    // id
                    Some(b"Alice"),               // name
                    Some(&95.5f64.to_le_bytes()), // score
                    Some(&100u64.to_le_bytes()),  // __txn_start
                    Some(&0u64.to_le_bytes()),    // __txn_end
                ],
            )
            .unwrap();

        // Row 2: Deleted
        memtable
            .insert(
                2,
                &[
                    Some(&2u64.to_le_bytes()),    // id
                    Some(b"Bob"),                 // name
                    Some(&87.2f64.to_le_bytes()), // score
                    Some(&100u64.to_le_bytes()),  // __txn_start
                    Some(&200u64.to_le_bytes()),  // __txn_end (deleted at 200)
                ],
            )
            .unwrap();

        // Flush the memtable to a column-group file and check it landed as `.sst`.
        let cg = ColumnGroup::from_memtable(dir.path(), &memtable, 0, 1).unwrap();
        let file_path = cg.file_path();
        assert!(file_path.exists());
        assert!(file_path.extension().unwrap() == "sst");

        // Verify we can open it back
        let cg_opened = ColumnGroup::open(file_path.to_path_buf(), schema, 0, 1).unwrap();
        assert_eq!(cg_opened.column_offsets.len(), 5); // 3 user + 2 mvcc

        // Verify LSI is present
        assert!(cg_opened.lsi.is_some());
        let lsi = cg_opened.lsi.as_ref().unwrap();
        assert!(lsi.stats().num_keys > 0);
    }

    /// Memtable point lookups:
    /// - return the stored bytes;
    /// - preserve NULLs as `None`;
    /// - yield `None` for unknown row IDs.
    #[test]
    fn test_memtable_get() {
        let schema = test_schema();
        let memtable = ColumnarMemtable::new(schema, 1024 * 1024);

        // Insert some rows
        let id1: u64 = 1;
        let name1 = "Alice";
        let score1: f64 = 95.5;
        memtable
            .insert(
                1,
                &[
                    Some(&id1.to_le_bytes()),
                    Some(name1.as_bytes()),
                    Some(&score1.to_le_bytes()),
                ],
            )
            .unwrap();

        let id2: u64 = 2;
        let name2 = "Bob";
        memtable
            .insert(
                2,
                &[
                    Some(&id2.to_le_bytes()),
                    Some(name2.as_bytes()),
                    None, // null score
                ],
            )
            .unwrap();

        // Test get by row ID
        let row1 = memtable.get(1).unwrap();
        assert_eq!(row1.len(), 3);
        // Decode the stored little-endian id bytes back into a u64.
        assert_eq!(
            u64::from_le_bytes(row1[0].as_ref().unwrap()[..].try_into().unwrap()),
            1
        );
        assert_eq!(
            std::str::from_utf8(row1[1].as_ref().unwrap()).unwrap(),
            "Alice"
        );

        let row2 = memtable.get(2).unwrap();
        assert!(row2[2].is_none()); // null score

        // Test get non-existent row
        assert!(memtable.get(999).is_none());
    }

    /// `scan_range` is inclusive on both ends: scanning [3, 7] over rows 1..=10
    /// yields exactly five rows, all within the range.
    #[test]
    fn test_memtable_scan_range() {
        let schema = test_schema();
        let memtable = ColumnarMemtable::new(schema, 1024 * 1024);

        // Insert rows with different row IDs
        for i in 1..=10 {
            memtable
                .insert(
                    i,
                    &[
                        Some(&i.to_le_bytes()),
                        Some(format!("User{}", i).as_bytes()),
                        Some(&((i as f64) * 10.0).to_le_bytes()),
                    ],
                )
                .unwrap();
        }

        // Scan range [3, 7]
        let results = memtable.scan_range(3, 7);
        assert_eq!(results.len(), 5);

        // Verify row IDs are in range
        for (row_id, _) in &results {
            assert!(*row_id >= 3 && *row_id <= 7);
        }
    }

    /// Round trip: a row inserted through `Lscs` can be read back with `get`.
    /// The memtable is 64 MiB, so the row presumably has not been flushed yet.
    #[test]
    fn test_lscs_get() {
        let dir = tempdir().unwrap();
        let schema = test_schema();
        let config = LscsConfig {
            memtable_size: 64 * 1024 * 1024,
            ..Default::default()
        };

        let lscs = Lscs::new(dir.path().to_path_buf(), schema, config).unwrap();

        // Insert a row
        let id: u64 = 42;
        let name = "TestUser";
        let score: f64 = 99.9;

        let row_id = lscs
            .insert(&[
                Some(&id.to_le_bytes()),
                Some(name.as_bytes()),
                Some(&score.to_le_bytes()),
            ])
            .unwrap();

        // Get the row back
        let result = lscs.get(row_id).unwrap();
        assert!(result.is_some());

        let values = result.unwrap();
        assert_eq!(
            u64::from_le_bytes(values[0].as_ref().unwrap()[..].try_into().unwrap()),
            42
        );
        assert_eq!(
            std::str::from_utf8(values[1].as_ref().unwrap()).unwrap(),
            "TestUser"
        );
    }

    /// `fsync` succeeds after a batch of inserts, and the inserted rows remain
    /// readable afterwards.
    #[test]
    fn test_lscs_fsync() {
        let dir = tempdir().unwrap();
        let schema = test_schema();
        let config = LscsConfig::default();

        let lscs = Lscs::new(dir.path().to_path_buf(), schema, config).unwrap();

        // Insert some data
        for i in 1..=5 {
            lscs.insert(&[
                Some(&(i as u64).to_le_bytes()),
                Some(format!("User{}", i).as_bytes()),
                Some(&((i as f64) * 10.0).to_le_bytes()),
            ])
            .unwrap();
        }

        // Fsync should not panic
        lscs.fsync().unwrap();

        // Data should still be accessible after fsync
        let result = lscs.get(1).unwrap();
        assert!(result.is_some());
    }
}