Skip to main content

sochdb_storage/
lscs.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Log-Structured Column Store (LSCS)
19//!
20//! A columnar variant of LSM trees optimized for TOON workloads.
21//!
22//! ## Key Innovations
23//!
24//! 1. **Column-oriented memtable**: Groups data by column for sequential writes
25//! 2. **Column-aware compaction**: Only compacts changed columns (5× less write amplification)
26//! 3. **Schema-aware compression**: Uses column type for optimal encoding
27//!
28//! ## Write Amplification Analysis
29//!
30//! Traditional LSM: WA = T × (T-1) / 2  where T = size ratio (typically 10)
31//! LSCS: WA = T × (T-1) / 2 × (C_hot / C_total)
32//!
33//! If only 20% of columns change frequently: **5× less write amplification**
34//!
35//! ## Column-Aware Compaction Model
36//!
37//! Standard LSM Write Amplification:
38//!    WA = (T/(T-1)) × Σᵢ₌₀ᵏ (1/Tⁱ) ≈ (T/(T-1)) × log_T(N/M)
39//!
40//! Column-Aware WA:
41//!    Let C = {c₁, c₂, ..., c_K} be columns
42//!    Hot columns: H = {cⱼ | temp(cⱼ) > θ} where θ = 0.1
43//!    WA_col = (|H|/|C|) × WA = f × WA
44//!
45//!    Example: If 2 out of 10 columns hot: f = 0.2, 5x reduction
46//!
47//! ## Architecture
48//!
49//! ```text
50//! ┌────────────────┐
51//! │ MemTable       │
52//! │ [col1][col2]...│ ← Columns in memory
53//! └───────┬────────┘
54//!         │ flush
55//!         ▼
56//! ┌────────────────────────────┐
57//! │ Column Group L0            │
58//! │ ┌─────┐┌─────┐┌─────┐     │
59//! │ │col1 ││col2 ││col3 │     │
60//! │ └─────┘└─────┘└─────┘     │
61//! └────────────────────────────┘
62//! ```
63
64use parking_lot::RwLock;
65use serde::{Deserialize, Serialize};
66use std::collections::{BTreeMap, HashMap, HashSet};
67use std::path::{Path, PathBuf};
68use std::sync::Arc;
69use std::sync::atomic::{AtomicU64, Ordering};
70use sochdb_core::{Result, SochDBError};
71
72use crate::txn_wal::TxnWal;
73
74// ============================================================================
75// Column Temperature Tracking (Task 3)
76// ============================================================================
77
78/// Temperature tracking for a single column using Exponential Moving Average.
79///
80/// Temperature is computed as: temp_new = α × temp_current + (1-α) × temp_old
81/// where α = 0.1 (decay factor) and temp_current = updates_in_window / total_updates
82#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct ColumnTemperature {
84    /// Column name
85    pub name: String,
86    /// Current temperature (0.0 = cold, 1.0 = hot)
87    pub temperature: f64,
88    /// Updates in current window
89    pub window_updates: u64,
90    /// Total updates since last decay
91    pub total_updates: u64,
92    /// Last update timestamp (micros)
93    pub last_update_us: u64,
94}
95
96impl ColumnTemperature {
97    /// Create a new temperature tracker for a column
98    pub fn new(name: String) -> Self {
99        Self {
100            name,
101            temperature: 0.0,
102            window_updates: 0,
103            total_updates: 0,
104            last_update_us: 0,
105        }
106    }
107
108    /// Record an update to this column
109    pub fn record_update(&mut self) {
110        self.window_updates += 1;
111        self.total_updates += 1;
112        self.last_update_us = std::time::SystemTime::now()
113            .duration_since(std::time::UNIX_EPOCH)
114            .unwrap()
115            .as_micros() as u64;
116    }
117
118    /// Update temperature using EMA when window completes
119    ///
120    /// α = 0.1 (decay factor)
121    /// temp_new = α × temp_current + (1-α) × temp_old
122    pub fn update_ema(&mut self, total_window_updates: u64) {
123        const ALPHA: f64 = 0.1;
124
125        let temp_current = if total_window_updates > 0 {
126            self.window_updates as f64 / total_window_updates as f64
127        } else {
128            0.0
129        };
130
131        self.temperature = ALPHA * temp_current + (1.0 - ALPHA) * self.temperature;
132        self.window_updates = 0;
133    }
134
135    /// Check if column is "hot" (above threshold)
136    pub fn is_hot(&self, threshold: f64) -> bool {
137        self.temperature > threshold
138    }
139}
140
141/// Column temperature tracker for all columns in a table
142#[derive(Debug)]
143pub struct ColumnTemperatureTracker {
144    /// Per-column temperature (column name -> temperature)
145    columns: RwLock<HashMap<String, ColumnTemperature>>,
146    /// Window size for temperature updates
147    window_size: u64,
148    /// Current window update count
149    window_updates: AtomicU64,
150    /// Hot threshold (default 0.1)
151    hot_threshold: f64,
152}
153
154impl ColumnTemperatureTracker {
155    /// Create a new temperature tracker
156    pub fn new(column_names: &[String], window_size: u64) -> Self {
157        let mut columns = HashMap::new();
158        for name in column_names {
159            columns.insert(name.clone(), ColumnTemperature::new(name.clone()));
160        }
161        Self {
162            columns: RwLock::new(columns),
163            window_size,
164            window_updates: AtomicU64::new(0),
165            hot_threshold: 0.1,
166        }
167    }
168
169    /// Record an update to specific columns
170    pub fn record_updates(&self, column_names: &[&str]) {
171        let mut cols = self.columns.write();
172        for name in column_names {
173            if let Some(temp) = cols.get_mut(*name) {
174                temp.record_update();
175            }
176        }
177
178        let total = self.window_updates.fetch_add(1, Ordering::SeqCst) + 1;
179
180        // Check if window is complete
181        if total >= self.window_size {
182            self.update_all_ema(&mut cols, total);
183            self.window_updates.store(0, Ordering::SeqCst);
184        }
185    }
186
187    fn update_all_ema(&self, cols: &mut HashMap<String, ColumnTemperature>, total: u64) {
188        for temp in cols.values_mut() {
189            temp.update_ema(total);
190        }
191    }
192
193    /// Get hot columns (above threshold)
194    pub fn get_hot_columns(&self) -> HashSet<String> {
195        let cols = self.columns.read();
196        cols.values()
197            .filter(|t| t.is_hot(self.hot_threshold))
198            .map(|t| t.name.clone())
199            .collect()
200    }
201
202    /// Get cold columns (at or below threshold)
203    pub fn get_cold_columns(&self) -> HashSet<String> {
204        let cols = self.columns.read();
205        cols.values()
206            .filter(|t| !t.is_hot(self.hot_threshold))
207            .map(|t| t.name.clone())
208            .collect()
209    }
210
211    /// Get all temperatures for reporting
212    pub fn get_all_temperatures(&self) -> Vec<ColumnTemperature> {
213        self.columns.read().values().cloned().collect()
214    }
215
216    /// Set hot threshold
217    pub fn set_hot_threshold(&self, _threshold: f64) {
218        // Note: This would require interior mutability for hot_threshold
219        // For now this is a no-op; in practice use configuration
220    }
221}
222
223// ============================================================================
224// Column Stripe References (Task 3)
225// ============================================================================
226
227/// Reference to a column stripe stored at a specific level
228///
229/// Allows columns to be stored at different levels independently,
230/// enabling selective compaction of hot columns while cold columns
231/// remain at lower levels.
232#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
233pub struct ColumnStripeRef {
234    /// Level where stripe is stored
235    pub level: u32,
236    /// Segment ID within level
237    pub segment_id: u64,
238    /// Column name
239    pub column_name: String,
240    /// Offset within segment file
241    pub offset: u64,
242    /// Length in bytes
243    pub length: u64,
244    /// Row count in stripe
245    pub row_count: u64,
246    /// Compression type
247    pub compression: u8,
248}
249
250impl ColumnStripeRef {
251    /// Create a new stripe reference
252    pub fn new(
253        level: u32,
254        segment_id: u64,
255        column_name: String,
256        offset: u64,
257        length: u64,
258        row_count: u64,
259    ) -> Self {
260        Self {
261            level,
262            segment_id,
263            column_name,
264            offset,
265            length,
266            row_count,
267            compression: 0,
268        }
269    }
270
271    /// Create a reference pointing to a new location after compaction
272    pub fn relocate(&self, new_level: u32, new_segment_id: u64, new_offset: u64) -> Self {
273        Self {
274            level: new_level,
275            segment_id: new_segment_id,
276            column_name: self.column_name.clone(),
277            offset: new_offset,
278            length: self.length,
279            row_count: self.row_count,
280            compression: self.compression,
281        }
282    }
283}
284
285/// Segment descriptor with column stripe references
286///
287/// Instead of storing all column data inline, the segment stores
288/// references to column stripes which may be at different levels.
289#[derive(Debug, Clone, Serialize, Deserialize)]
290pub struct SegmentDescriptor {
291    /// Segment ID
292    pub id: u64,
293    /// Level
294    pub level: u32,
295    /// Column stripe references (column name -> stripe ref)
296    pub col_refs: HashMap<String, ColumnStripeRef>,
297    /// Min row ID in segment
298    pub min_row_id: RowId,
299    /// Max row ID in segment
300    pub max_row_id: RowId,
301    /// Row count
302    pub row_count: u64,
303    /// Min timestamp
304    pub min_timestamp: u64,
305    /// Max timestamp
306    pub max_timestamp: u64,
307    /// Is tombstone (deleted after compaction)
308    pub is_tombstone: bool,
309}
310
311/// Column ID type
312pub type ColumnId = u32;
313
314/// Row ID type (globally unique within table)
315pub type RowId = u64;
316
317/// Column data type for storage
318#[derive(Debug, Clone, Copy, PartialEq, Eq)]
319#[repr(u8)]
320pub enum ColumnType {
321    Bool = 0,
322    Int64 = 1,
323    UInt64 = 2,
324    Float64 = 3,
325    Text = 4,
326    Binary = 5,
327    Timestamp = 6,
328}
329
330impl ColumnType {
331    /// Fixed size in bytes, or None for variable-length
332    pub fn fixed_size(&self) -> Option<usize> {
333        match self {
334            ColumnType::Bool => Some(1),
335            ColumnType::Int64
336            | ColumnType::UInt64
337            | ColumnType::Float64
338            | ColumnType::Timestamp => Some(8),
339            ColumnType::Text | ColumnType::Binary => None,
340        }
341    }
342
343    /// From byte
344    pub fn from_byte(b: u8) -> Option<Self> {
345        match b {
346            0 => Some(ColumnType::Bool),
347            1 => Some(ColumnType::Int64),
348            2 => Some(ColumnType::UInt64),
349            3 => Some(ColumnType::Float64),
350            4 => Some(ColumnType::Text),
351            5 => Some(ColumnType::Binary),
352            6 => Some(ColumnType::Timestamp),
353            _ => None,
354        }
355    }
356}
357
358/// Schema for a table in LSCS
359#[derive(Debug, Clone)]
360pub struct TableSchema {
361    /// Table name
362    pub name: String,
363    /// Column definitions
364    pub columns: Vec<ColumnDef>,
365}
366
367impl TableSchema {
368    pub fn new(name: String, columns: Vec<ColumnDef>) -> Self {
369        Self { name, columns }
370    }
371
372    /// Add MVCC columns (__txn_start, __txn_end) if not present
373    pub fn with_mvcc(mut self) -> Self {
374        if !self.columns.iter().any(|c| c.name == "__txn_start") {
375            self.columns.push(ColumnDef {
376                name: "__txn_start".to_string(),
377                col_type: ColumnType::UInt64,
378                nullable: false,
379            });
380        }
381        if !self.columns.iter().any(|c| c.name == "__txn_end") {
382            self.columns.push(ColumnDef {
383                name: "__txn_end".to_string(),
384                col_type: ColumnType::UInt64,
385                nullable: false, // 0 or MAX for active/infinity
386            });
387        }
388        self
389    }
390}
391
392/// Column definition
393#[derive(Debug, Clone)]
394pub struct ColumnDef {
395    /// Column name
396    pub name: String,
397    /// Column type
398    pub col_type: ColumnType,
399    /// Is nullable
400    pub nullable: bool,
401}
402
403/// In-memory column buffer with O(1) random access
404#[derive(Debug)]
405struct ColumnBuffer {
406    /// Column type
407    col_type: ColumnType,
408    /// Data bytes
409    data: Vec<u8>,
410    /// Null bitmap (bit per row, 1 = non-null)
411    nulls: Vec<u8>,
412    /// Offsets for variable-length types
413    offsets: Option<Vec<u32>>,
414    /// Row count
415    row_count: u64,
416}
417
418impl ColumnBuffer {
419    fn new(col_type: ColumnType) -> Self {
420        Self {
421            col_type,
422            data: Vec::new(),
423            nulls: Vec::new(),
424            offsets: if col_type.fixed_size().is_none() {
425                Some(vec![0]) // Initial offset
426            } else {
427                None
428            },
429            row_count: 0,
430        }
431    }
432
433    /// Append a value (bytes)
434    fn append(&mut self, value: Option<&[u8]>) {
435        // Update null bitmap
436        let bit_idx = self.row_count as usize;
437        let byte_idx = bit_idx / 8;
438        let bit_offset = bit_idx % 8;
439
440        while self.nulls.len() <= byte_idx {
441            self.nulls.push(0);
442        }
443
444        if let Some(data) = value {
445            // Set non-null bit
446            self.nulls[byte_idx] |= 1 << bit_offset;
447
448            // Append data
449            self.data.extend_from_slice(data);
450
451            // Update offsets for variable-length
452            if let Some(offsets) = &mut self.offsets {
453                offsets.push(self.data.len() as u32);
454            }
455        } else if let Some(offsets) = &mut self.offsets {
456            // Null value - repeat last offset
457            let last = *offsets.last().unwrap();
458            offsets.push(last);
459        }
460
461        self.row_count += 1;
462    }
463
464    /// Check if value at row_idx is null
465    fn is_null(&self, row_idx: u64) -> bool {
466        if row_idx >= self.row_count {
467            return true; // Out of bounds treated as null
468        }
469        let byte_idx = (row_idx / 8) as usize;
470        let bit_offset = (row_idx % 8) as u8;
471
472        if byte_idx >= self.nulls.len() {
473            return true;
474        }
475
476        (self.nulls[byte_idx] & (1 << bit_offset)) == 0
477    }
478
479    /// Get value at row_idx
480    /// Returns None if null, Some(bytes) if non-null
481    fn get(&self, row_idx: u64) -> Option<Vec<u8>> {
482        if row_idx >= self.row_count || self.is_null(row_idx) {
483            return None;
484        }
485
486        if let Some(fixed_size) = self.col_type.fixed_size() {
487            // Fixed-size column: O(1) access
488            let start = (row_idx as usize) * fixed_size;
489            let end = start + fixed_size;
490            if end <= self.data.len() {
491                Some(self.data[start..end].to_vec())
492            } else {
493                None
494            }
495        } else {
496            // Variable-length column: use offsets
497            if let Some(offsets) = &self.offsets {
498                let start = offsets[row_idx as usize] as usize;
499                let end = offsets[(row_idx + 1) as usize] as usize;
500                if end <= self.data.len() {
501                    Some(self.data[start..end].to_vec())
502                } else {
503                    None
504                }
505            } else {
506                None
507            }
508        }
509    }
510
511    /// Memory usage in bytes
512    fn memory_bytes(&self) -> usize {
513        self.data.len() + self.nulls.len() + self.offsets.as_ref().map(|o| o.len() * 4).unwrap_or(0)
514    }
515}
516
517/// Columnar memtable with skip-list-like concurrent access
518#[derive(Debug)]
519pub struct ColumnarMemtable {
520    /// Table schema
521    schema: TableSchema,
522    /// Column buffers (one per column)
523    columns: Vec<RwLock<ColumnBuffer>>,
524    /// Row ID to row index mapping (skip-list for O(log N) lookup)
525    row_ids: RwLock<BTreeMap<RowId, u64>>,
526    /// Reverse mapping: row index -> row ID (for range scans)
527    row_idx_to_id: RwLock<Vec<RowId>>,
528    /// Next row index
529    next_row_idx: AtomicU64,
530    /// Total bytes written
531    bytes_written: AtomicU64,
532    /// Memtable size limit
533    size_limit: usize,
534}
535
536impl ColumnarMemtable {
537    /// Create a new columnar memtable
538    pub fn new(schema: TableSchema, size_limit: usize) -> Self {
539        let columns = schema
540            .columns
541            .iter()
542            .map(|def| RwLock::new(ColumnBuffer::new(def.col_type)))
543            .collect();
544
545        Self {
546            schema,
547            columns,
548            row_ids: RwLock::new(BTreeMap::new()),
549            row_idx_to_id: RwLock::new(Vec::new()),
550            next_row_idx: AtomicU64::new(0),
551            bytes_written: AtomicU64::new(0),
552            size_limit,
553        }
554    }
555
556    /// Insert a row
557    ///
558    /// `values` must have the same length as schema columns
559    pub fn insert(&self, row_id: RowId, values: &[Option<&[u8]>]) -> Result<()> {
560        if values.len() != self.schema.columns.len() {
561            return Err(SochDBError::InvalidData(format!(
562                "Expected {} columns, got {}",
563                self.schema.columns.len(),
564                values.len()
565            )));
566        }
567
568        let row_idx = self.next_row_idx.fetch_add(1, Ordering::SeqCst);
569
570        // Insert into each column
571        let mut bytes = 0usize;
572        for (i, value) in values.iter().enumerate() {
573            let mut col = self.columns[i].write();
574            if let Some(data) = value {
575                bytes += data.len();
576            }
577            col.append(*value);
578        }
579
580        // Update row ID mapping (forward and reverse)
581        {
582            let mut ids = self.row_ids.write();
583            ids.insert(row_id, row_idx);
584        }
585        {
586            let mut idx_to_id = self.row_idx_to_id.write();
587            // Ensure vector is large enough
588            while idx_to_id.len() <= row_idx as usize {
589                idx_to_id.push(0); // placeholder
590            }
591            idx_to_id[row_idx as usize] = row_id;
592        }
593
594        self.bytes_written
595            .fetch_add(bytes as u64, Ordering::Relaxed);
596
597        Ok(())
598    }
599
600    /// Get a row by row ID (O(log N) lookup via BTreeMap)
601    /// Returns all column values for the row
602    pub fn get(&self, row_id: RowId) -> Option<Vec<Option<Vec<u8>>>> {
603        // Look up row index from row ID
604        let row_ids = self.row_ids.read();
605        let row_idx = *row_ids.get(&row_id)?;
606        drop(row_ids);
607
608        // Read all columns for this row
609        let mut values = Vec::with_capacity(self.columns.len());
610        for col in &self.columns {
611            let col_buf = col.read();
612            values.push(col_buf.get(row_idx));
613        }
614
615        Some(values)
616    }
617
618    /// Get specific columns for a row by row ID
619    pub fn get_columns(
620        &self,
621        row_id: RowId,
622        col_indices: &[usize],
623    ) -> Option<Vec<Option<Vec<u8>>>> {
624        // Look up row index from row ID
625        let row_ids = self.row_ids.read();
626        let row_idx = *row_ids.get(&row_id)?;
627        drop(row_ids);
628
629        // Read only requested columns
630        let mut values = Vec::with_capacity(col_indices.len());
631        for &col_idx in col_indices {
632            if col_idx < self.columns.len() {
633                let col_buf = self.columns[col_idx].read();
634                values.push(col_buf.get(row_idx));
635            } else {
636                values.push(None);
637            }
638        }
639
640        Some(values)
641    }
642
643    /// Scan a range of row IDs, returning all matching rows
644    pub fn scan_range(&self, start: RowId, end: RowId) -> Vec<(RowId, Vec<Option<Vec<u8>>>)> {
645        let row_ids = self.row_ids.read();
646        let mut results = Vec::new();
647
648        for (&row_id, &row_idx) in row_ids.range(start..=end) {
649            let mut values = Vec::with_capacity(self.columns.len());
650            for col in &self.columns {
651                let col_buf = col.read();
652                values.push(col_buf.get(row_idx));
653            }
654            results.push((row_id, values));
655        }
656
657        results
658    }
659
660    /// Check if memtable is full
661    pub fn is_full(&self) -> bool {
662        self.bytes_written.load(Ordering::Relaxed) as usize >= self.size_limit
663    }
664
665    /// Get row count
666    pub fn row_count(&self) -> u64 {
667        self.next_row_idx.load(Ordering::SeqCst)
668    }
669
670    /// Get memory usage
671    pub fn memory_bytes(&self) -> usize {
672        self.columns.iter().map(|c| c.read().memory_bytes()).sum()
673    }
674
675    /// Get schema
676    pub fn schema(&self) -> &TableSchema {
677        &self.schema
678    }
679}
680
681use sochdb_core::learned_index::LearnedSparseIndex;
682
683/// Metadata for a stored column
684#[derive(Debug, Clone, Serialize, Deserialize)]
685pub struct ColumnIndex {
686    /// Offset in the file
687    pub offset: u64,
688    /// Length in bytes
689    pub length: u64,
690    /// Compression type (0 = None for now)
691    pub compression: u8,
692}
693
694/// Column group stored on disk
695#[derive(Debug)]
696#[allow(dead_code)]
697pub struct ColumnGroup {
698    /// Path to the monolithic .sst file
699    path: PathBuf,
700    /// Table schema
701    schema: TableSchema,
702    /// Level in LSM tree (0 = L0)
703    level: u32,
704    /// Sequence number
705    sequence: u64,
706    /// Row count
707    row_count: u64,
708    /// Min timestamp
709    min_timestamp: u64,
710    /// Max timestamp
711    max_timestamp: u64,
712    /// Column offsets loaded from footer
713    column_offsets: BTreeMap<String, ColumnIndex>,
714    /// Learned Sparse Index for RowId lookup
715    lsi: Option<LearnedSparseIndex>,
716}
717
718impl ColumnGroup {
719    /// Magic bytes for SSTable file (TOON + version 1)
720    const MAGIC: [u8; 4] = [b'T', b'O', b'O', b'N'];
721    const VERSION: u32 = 1;
722
723    /// Create metadata for a column group
724    #[allow(clippy::too_many_arguments)]
725    pub fn new(
726        path: PathBuf,
727        schema: TableSchema,
728        level: u32,
729        sequence: u64,
730        row_count: u64,
731        min_timestamp: u64,
732        max_timestamp: u64,
733        column_offsets: BTreeMap<String, ColumnIndex>,
734        lsi: Option<LearnedSparseIndex>,
735    ) -> Self {
736        Self {
737            path,
738            schema,
739            level,
740            sequence,
741            row_count,
742            min_timestamp,
743            max_timestamp,
744            column_offsets,
745            lsi,
746        }
747    }
748
749    /// Write memtable to disk as a single SSTable file
750    pub fn from_memtable(
751        base_path: &Path,
752        memtable: &ColumnarMemtable,
753        level: u32,
754        sequence: u64,
755    ) -> Result<Self> {
756        use byteorder::{LittleEndian, WriteBytesExt};
757        use std::fs::File;
758        use std::io::{BufWriter, Seek, Write};
759
760        // Create monolithic file: L{level}_seq{sequence}.sst
761        let file_name = format!("L{}_seq{}.sst", level, sequence);
762        let file_path = base_path.join(&file_name);
763        let file = File::create(&file_path)?;
764        let mut writer = BufWriter::new(file);
765
766        // Write Header
767        writer.write_all(&Self::MAGIC)?;
768        writer.write_u32::<LittleEndian>(Self::VERSION)?;
769
770        let mut column_offsets = BTreeMap::new();
771        let mut min_ts = u64::MAX;
772        let mut max_ts = 0u64;
773
774        // Write each column sequentially
775        for (i, col_lock) in memtable.columns.iter().enumerate() {
776            let col = col_lock.read();
777            let col_def = &memtable.schema.columns[i];
778
779            // Extract min/max timestamps from __txn_start column
780            if col_def.name == "__txn_start" && col.col_type == ColumnType::UInt64 {
781                // Parse u64 timestamps from the column data
782                let mut offset = 0;
783                let row_count = col.row_count as usize;
784                for row_idx in 0..row_count {
785                    // Check if not null
786                    let byte_idx = row_idx / 8;
787                    let bit_idx = row_idx % 8;
788                    let is_null =
789                        byte_idx < col.nulls.len() && (col.nulls[byte_idx] & (1 << bit_idx)) != 0;
790
791                    if !is_null && offset + 8 <= col.data.len() {
792                        let ts = u64::from_le_bytes(
793                            col.data[offset..offset + 8].try_into().unwrap_or([0u8; 8]),
794                        );
795                        min_ts = min_ts.min(ts);
796                        max_ts = max_ts.max(ts);
797                    }
798                    offset += 8;
799                }
800            }
801
802            let start_offset = writer.stream_position()?;
803
804            // Column Header within the block
805            writer.write_u8(col.col_type as u8)?;
806            writer.write_u64::<LittleEndian>(col.row_count)?;
807
808            // Null bitmap
809            writer.write_u32::<LittleEndian>(col.nulls.len() as u32)?;
810            writer.write_all(&col.nulls)?;
811
812            // Offsets (if variable-length)
813            if let Some(offsets) = &col.offsets {
814                writer.write_u32::<LittleEndian>(offsets.len() as u32)?;
815                for &off in offsets {
816                    writer.write_u32::<LittleEndian>(off)?;
817                }
818            }
819
820            // Data
821            writer.write_u32::<LittleEndian>(col.data.len() as u32)?;
822            writer.write_all(&col.data)?;
823
824            let end_offset = writer.stream_position()?;
825
826            column_offsets.insert(
827                col_def.name.clone(),
828                ColumnIndex {
829                    offset: start_offset,
830                    length: end_offset - start_offset,
831                    compression: 0, // No compression yet
832                },
833            );
834        }
835
836        // Build Learned Sparse Index on RowIds
837        let row_ids = memtable.row_ids.read();
838        let keys: Vec<u64> = row_ids.keys().cloned().collect();
839        let lsi = LearnedSparseIndex::build(&keys);
840
841        // Write Footer (Index + LSI)
842        let footer_start = writer.stream_position()?;
843
844        // Serialize Column Offsets
845        let offsets_bytes = bincode::serialize(&column_offsets)
846            .map_err(|e| SochDBError::Serialization(e.to_string()))?;
847        writer.write_u64::<LittleEndian>(offsets_bytes.len() as u64)?;
848        writer.write_all(&offsets_bytes)?;
849
850        // Serialize LSI
851        let lsi_bytes =
852            bincode::serialize(&lsi).map_err(|e| SochDBError::Serialization(e.to_string()))?;
853        writer.write_u64::<LittleEndian>(lsi_bytes.len() as u64)?;
854        writer.write_all(&lsi_bytes)?;
855
856        // Write Footer Offset and Magic at the very end
857        writer.write_u64::<LittleEndian>(footer_start)?;
858        writer.write_all(&Self::MAGIC)?;
859
860        writer.flush()?;
861
862        // Fallback: use current time if no timestamps were found in data
863        if min_ts == u64::MAX || max_ts == 0 {
864            let now = std::time::SystemTime::now()
865                .duration_since(std::time::UNIX_EPOCH)
866                .unwrap()
867                .as_micros() as u64;
868            min_ts = now;
869            max_ts = now;
870        }
871
872        Ok(Self {
873            path: file_path,
874            schema: memtable.schema.clone(),
875            level,
876            sequence,
877            row_count: memtable.row_count(),
878            min_timestamp: min_ts,
879            max_timestamp: max_ts,
880            column_offsets,
881            lsi: Some(lsi),
882        })
883    }
884
885    /// Open a ColumnGroup from an existing SST file
886    pub fn open(path: PathBuf, schema: TableSchema, level: u32, sequence: u64) -> Result<Self> {
887        use byteorder::{LittleEndian, ReadBytesExt};
888        use std::fs::File;
889        use std::io::{Read, Seek, SeekFrom};
890
891        let mut file = File::open(&path)?;
892        let file_len = file.metadata()?.len();
893
894        if file_len < 12 {
895            // Magic (4) + FooterOffset (8)
896            return Err(SochDBError::Corruption("File too short".to_string()));
897        }
898
899        // Read Footer Offset
900        file.seek(SeekFrom::End(-12))?;
901        let footer_offset = file.read_u64::<LittleEndian>()?;
902        let mut magic = [0u8; 4];
903        file.read_exact(&mut magic)?;
904
905        if magic != Self::MAGIC {
906            return Err(SochDBError::Corruption("Invalid magic bytes".to_string()));
907        }
908
909        // Read Footer
910        file.seek(SeekFrom::Start(footer_offset))?;
911
912        // Read Column Offsets
913        let offsets_len = file.read_u64::<LittleEndian>()?;
914        let mut offsets_bytes = vec![0u8; offsets_len as usize];
915        file.read_exact(&mut offsets_bytes)?;
916        let column_offsets: BTreeMap<String, ColumnIndex> = bincode::deserialize(&offsets_bytes)
917            .map_err(|e| SochDBError::Serialization(e.to_string()))?;
918
919        // Read LSI
920        let lsi_len = file.read_u64::<LittleEndian>()?;
921        let mut lsi_bytes = vec![0u8; lsi_len as usize];
922        file.read_exact(&mut lsi_bytes)?;
923        let lsi: LearnedSparseIndex = bincode::deserialize(&lsi_bytes)
924            .map_err(|e| SochDBError::Serialization(e.to_string()))?;
925
926        Ok(Self {
927            path,
928            schema,
929            level,
930            sequence,
931            row_count: 0, // Needs to be fixed by storing in footer
932            min_timestamp: 0,
933            max_timestamp: 0,
934            column_offsets,
935            lsi: Some(lsi),
936        })
937    }
938
939    /// Get path to the SST file
940    pub fn file_path(&self) -> &Path {
941        &self.path
942    }
943
944    /// Get offset info for a column
945    pub fn column_index(&self, col_name: &str) -> Option<&ColumnIndex> {
946        self.column_offsets.get(col_name)
947    }
948
949    /// Get level
950    pub fn level(&self) -> u32 {
951        self.level
952    }
953
954    /// Get row count
955    pub fn row_count(&self) -> u64 {
956        self.row_count
957    }
958}
959
960// ============================================================================
961// Compaction Statistics (Task 3)
962// ============================================================================
963
964/// Statistics for column-aware compaction
965#[derive(Debug, Clone, Default)]
966pub struct CompactionStats {
967    /// Total compactions performed
968    pub compactions_total: u64,
969    /// L0 to L1 compactions
970    pub l0_compactions: u64,
971    /// Total bytes read during compaction
972    pub bytes_read: u64,
973    /// Total bytes written during compaction
974    pub bytes_written: u64,
975    /// Hot column compactions (only hot columns merged)
976    pub hot_column_compactions: u64,
977    /// Cold column references preserved (not rewritten)
978    pub cold_column_refs_preserved: u64,
979    /// Estimated write amplification reduction
980    pub estimated_wa_reduction: f64,
981    /// Last compaction duration (micros)
982    pub last_compaction_duration_us: u64,
983}
984
985impl CompactionStats {
986    /// Calculate write amplification factor
987    pub fn write_amplification(&self) -> f64 {
988        if self.bytes_read == 0 {
989            1.0
990        } else {
991            self.bytes_written as f64 / self.bytes_read as f64
992        }
993    }
994}
995
996/// Statistics from WAL recovery
997#[derive(Debug, Clone, Default)]
998pub struct LscsRecoveryStats {
999    /// Number of transactions successfully replayed
1000    pub transactions_recovered: usize,
1001    /// Number of rows restored to memtable
1002    pub rows_recovered: usize,
1003    /// Maximum row ID found (used to set next_row_id)
1004    pub max_row_id: u64,
1005}
1006
1007/// LSCS configuration
1008#[derive(Debug, Clone)]
1009pub struct LscsConfig {
1010    /// Memtable size limit in bytes
1011    pub memtable_size: usize,
1012    /// Number of levels
1013    pub num_levels: usize,
1014    /// Size ratio between levels
1015    pub level_ratio: usize,
1016    /// Maximum L0 column groups before compaction
1017    pub l0_compaction_threshold: usize,
1018    /// Hot column temperature threshold (0.0-1.0)
1019    pub hot_threshold: f64,
1020    /// Temperature window size (number of updates)
1021    pub temperature_window_size: u64,
1022}
1023
1024impl Default for LscsConfig {
1025    fn default() -> Self {
1026        Self {
1027            memtable_size: 64 * 1024 * 1024, // 64 MB
1028            num_levels: 7,
1029            level_ratio: 10,
1030            l0_compaction_threshold: 4,
1031            hot_threshold: 0.1,            // 10% threshold for "hot" column
1032            temperature_window_size: 1000, // 1000 updates per window
1033        }
1034    }
1035}
1036
1037/// Log-Structured Column Store
1038pub struct Lscs {
1039    /// Configuration
1040    config: LscsConfig,
1041    /// Base path for storage
1042    path: PathBuf,
1043    /// Table schema
1044    schema: TableSchema,
1045    /// Write-ahead log
1046    wal: Arc<TxnWal>,
1047    /// Active memtable
1048    active_memtable: RwLock<ColumnarMemtable>,
1049    /// Immutable memtables pending flush
1050    immutable_memtables: RwLock<Vec<ColumnarMemtable>>,
1051    /// Column groups by level
1052    column_groups: RwLock<Vec<Vec<ColumnGroup>>>,
1053    /// Segment descriptors with column stripe references (Task 3)
1054    segment_descriptors: RwLock<HashMap<u64, SegmentDescriptor>>,
1055    /// Column temperature tracker (Task 3)
1056    temperature_tracker: Arc<ColumnTemperatureTracker>,
1057    /// Next sequence number
1058    next_sequence: AtomicU64,
1059    /// Next row ID
1060    next_row_id: AtomicU64,
1061    /// Compaction statistics (Task 3)
1062    compaction_stats: RwLock<CompactionStats>,
1063}
1064
1065impl Lscs {
1066    /// Create a new LSCS instance
1067    pub fn new(path: PathBuf, schema: TableSchema, config: LscsConfig) -> Result<Self> {
1068        std::fs::create_dir_all(&path)?;
1069
1070        let wal_path = path.join("wal.log");
1071        let wal = Arc::new(TxnWal::new(&wal_path)?);
1072
1073        let active_memtable = ColumnarMemtable::new(schema.clone(), config.memtable_size);
1074
1075        let mut column_groups = Vec::with_capacity(config.num_levels);
1076        for _ in 0..config.num_levels {
1077            column_groups.push(Vec::new());
1078        }
1079
1080        // Create temperature tracker for all columns (Task 3)
1081        let column_names: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
1082        let temperature_tracker = Arc::new(ColumnTemperatureTracker::new(
1083            &column_names,
1084            config.temperature_window_size,
1085        ));
1086
1087        Ok(Self {
1088            config,
1089            path,
1090            schema,
1091            wal,
1092            active_memtable: RwLock::new(active_memtable),
1093            immutable_memtables: RwLock::new(Vec::new()),
1094            column_groups: RwLock::new(column_groups),
1095            segment_descriptors: RwLock::new(HashMap::new()),
1096            temperature_tracker,
1097            next_sequence: AtomicU64::new(0),
1098            next_row_id: AtomicU64::new(1),
1099            compaction_stats: RwLock::new(CompactionStats::default()),
1100        })
1101    }
1102
1103    /// Open an existing LSCS instance and recover from WAL
1104    ///
1105    /// This is the production entrypoint that ensures durability:
1106    /// 1. Opens the existing storage directory
1107    /// 2. Replays committed transactions from WAL into memtable
1108    /// 3. Updates next_row_id to prevent ID conflicts
1109    ///
1110    /// ## Crash Recovery Guarantees
1111    ///
1112    /// - Only committed transactions are replayed (atomicity)
1113    /// - All committed data is restored (durability)
1114    /// - Uncommitted transactions are discarded (consistency)
1115    pub fn open(path: PathBuf, schema: TableSchema, config: LscsConfig) -> Result<Self> {
1116        let lscs = Self::new(path, schema, config)?;
1117        let stats = lscs.recover()?;
1118        
1119        if stats.rows_recovered > 0 {
1120            eprintln!(
1121                "LSCS Recovery: restored {} rows from {} transactions",
1122                stats.rows_recovered, stats.transactions_recovered
1123            );
1124        }
1125        
1126        Ok(lscs)
1127    }
1128
1129    /// Perform crash recovery by replaying committed WAL entries
1130    ///
1131    /// Returns statistics about recovered data.
1132    pub fn recover(&self) -> Result<LscsRecoveryStats> {
1133        let (writes, txn_count) = self.wal.replay_for_recovery()?;
1134
1135        if writes.is_empty() {
1136            return Ok(LscsRecoveryStats::default());
1137        }
1138
1139        let mut max_row_id: u64 = 0;
1140        let mut rows_recovered = 0usize;
1141
1142        // Apply committed writes to memtable
1143        for (key, value) in &writes {
1144            // Key is row_id as little-endian u64
1145            if key.len() >= 8 {
1146                let row_id = u64::from_le_bytes(key[..8].try_into().unwrap_or([0; 8]));
1147                if row_id > max_row_id {
1148                    max_row_id = row_id;
1149                }
1150
1151                // Deserialize row and insert into memtable
1152                if let Ok(row_values) = Self::deserialize_row(value) {
1153                    let value_refs: Vec<Option<&[u8]>> = row_values.iter().map(|v| v.as_deref()).collect();
1154                    
1155                    let memtable = self.active_memtable.read();
1156                    if memtable.insert(row_id, &value_refs).is_ok() {
1157                        rows_recovered += 1;
1158                    }
1159                }
1160            }
1161        }
1162
1163        // Update next_row_id to prevent conflicts
1164        if max_row_id > 0 {
1165            self.next_row_id.store(max_row_id + 1, Ordering::SeqCst);
1166        }
1167
1168        Ok(LscsRecoveryStats {
1169            transactions_recovered: txn_count,
1170            rows_recovered,
1171            max_row_id,
1172        })
1173    }
1174
1175    /// Write a clean shutdown marker
1176    ///
1177    /// Call this during graceful shutdown to indicate all data was flushed.
1178    /// On next open, if this marker exists, recovery can be optimized.
1179    pub fn mark_clean_shutdown(&self) -> Result<()> {
1180        // First ensure all data is synced
1181        self.fsync()?;
1182
1183        // Write clean shutdown marker
1184        let marker_path = self.path.join(".clean_shutdown");
1185        std::fs::write(&marker_path, b"clean")?;
1186
1187        // Optionally truncate WAL after checkpoint
1188        // (conservative: we leave WAL intact for extra safety)
1189
1190        Ok(())
1191    }
1192
1193    /// Insert a row
1194    pub fn insert(&self, values: &[Option<&[u8]>]) -> Result<RowId> {
1195        let row_id = self.next_row_id.fetch_add(1, Ordering::SeqCst);
1196
1197        // Write to WAL first
1198        let txn_id = self.wal.begin_transaction()?;
1199
1200        // Serialize values for WAL
1201        let key = row_id.to_le_bytes().to_vec();
1202        let value = self.serialize_row(values)?;
1203        self.wal.write(txn_id, key, value)?;
1204        self.wal.commit_transaction(txn_id)?;
1205
1206        // Insert into memtable
1207        let memtable = self.active_memtable.read();
1208        memtable.insert(row_id, values)?;
1209
1210        // Check if memtable is full
1211        if memtable.is_full() {
1212            drop(memtable);
1213            self.rotate_memtable()?;
1214        }
1215
1216        Ok(row_id)
1217    }
1218
1219    /// Mark a row as deleted by setting __txn_end to the given transaction timestamp
1220    ///
1221    /// In MVCC, deletion doesn't remove the row immediately - instead we mark
1222    /// it with an end timestamp so it becomes invisible to newer transactions.
1223    ///
1224    /// ## Durability
1225    ///
1226    /// This operation is fully WAL-logged with proper transaction boundaries:
1227    /// - TxnBegin record
1228    /// - Data record with the updated row
1229    /// - TxnCommit record with fsync
1230    ///
1231    /// On crash recovery, only committed deletions will be replayed.
1232    pub fn mark_deleted(&self, row_id: RowId, _caller_txn_id: u64, txn_end: u64) -> Result<()> {
1233        // Find the __txn_end column index
1234        let txn_end_idx = self
1235            .schema
1236            .columns
1237            .iter()
1238            .position(|c| c.name == "__txn_end")
1239            .ok_or_else(|| {
1240                SochDBError::InvalidData("Schema missing __txn_end column for MVCC".to_string())
1241            })?;
1242
1243        // Get current row values
1244        let current = self
1245            .get(row_id)?
1246            .ok_or_else(|| SochDBError::NotFound(format!("Row {} not found", row_id)))?;
1247
1248        // Build new row with updated __txn_end
1249        let mut new_values: Vec<Option<Vec<u8>>> = current;
1250        new_values[txn_end_idx] = Some(txn_end.to_le_bytes().to_vec());
1251
1252        // Convert to references for insert
1253        let value_refs: Vec<Option<&[u8]>> = new_values.iter().map(|v| v.as_deref()).collect();
1254
1255        // Write to WAL with proper transaction boundaries for durability
1256        // Begin a new WAL transaction (allocates txn_id and writes TxnBegin)
1257        let wal_txn_id = self.wal.begin_transaction()?;
1258
1259        // Write the data record
1260        let row_data = self.serialize_row(&value_refs)?;
1261        self.wal.write(wal_txn_id, row_id.to_le_bytes().to_vec(), row_data)?;
1262
1263        // Commit with fsync for durability guarantee
1264        self.wal.commit_transaction(wal_txn_id)?;
1265
1266        // Update memtable (only after WAL commit succeeds)
1267        let memtable = self.active_memtable.read();
1268        memtable.insert(row_id, &value_refs)?;
1269
1270        Ok(())
1271    }
1272
1273    /// Serialize a row for WAL storage
1274    fn serialize_row(&self, values: &[Option<&[u8]>]) -> Result<Vec<u8>> {
1275        use byteorder::{LittleEndian, WriteBytesExt};
1276
1277        let mut buf = Vec::new();
1278        buf.write_u32::<LittleEndian>(values.len() as u32)?;
1279
1280        for value in values {
1281            match value {
1282                Some(data) => {
1283                    buf.write_u8(1)?; // non-null
1284                    buf.write_u32::<LittleEndian>(data.len() as u32)?;
1285                    buf.extend_from_slice(data);
1286                }
1287                None => {
1288                    buf.write_u8(0)?; // null
1289                }
1290            }
1291        }
1292
1293        Ok(buf)
1294    }
1295
1296    /// Deserialize a row from WAL storage
1297    #[allow(dead_code)]
1298    fn deserialize_row(data: &[u8]) -> Result<Vec<Option<Vec<u8>>>> {
1299        use byteorder::{LittleEndian, ReadBytesExt};
1300        use std::io::Cursor;
1301
1302        let mut cursor = Cursor::new(data);
1303        let num_cols = cursor.read_u32::<LittleEndian>()? as usize;
1304        let mut values = Vec::with_capacity(num_cols);
1305
1306        for _ in 0..num_cols {
1307            let is_non_null = cursor.read_u8()? == 1;
1308            if is_non_null {
1309                let len = cursor.read_u32::<LittleEndian>()? as usize;
1310                let pos = cursor.position() as usize;
1311                let value = data[pos..pos + len].to_vec();
1312                cursor.set_position((pos + len) as u64);
1313                values.push(Some(value));
1314            } else {
1315                values.push(None);
1316            }
1317        }
1318
1319        Ok(values)
1320    }
1321
1322    /// Get a row by row ID
1323    ///
1324    /// Search order: memtable -> immutable memtables -> SSTables
1325    /// Uses learned sparse index for O(1) expected time on SSTables
1326    pub fn get(&self, row_id: RowId) -> Result<Option<Vec<Option<Vec<u8>>>>> {
1327        // 1. Check active memtable (O(log N))
1328        {
1329            let memtable = self.active_memtable.read();
1330            if let Some(values) = memtable.get(row_id) {
1331                return Ok(Some(values));
1332            }
1333        }
1334
1335        // 2. Check immutable memtables (O(log N) per table)
1336        {
1337            let immutable = self.immutable_memtables.read();
1338            for memtable in immutable.iter().rev() {
1339                if let Some(values) = memtable.get(row_id) {
1340                    return Ok(Some(values));
1341                }
1342            }
1343        }
1344
1345        // 3. Check SSTables using learned sparse index
1346        // For each level, use LSI for O(1) expected lookup
1347        {
1348            use sochdb_core::learned_index::LookupResult;
1349            let groups = self.column_groups.read();
1350            for level in &*groups {
1351                for group in level.iter().rev() {
1352                    if let Some(lsi) = &group.lsi {
1353                        // Use learned index for O(1) lookup
1354                        let lookup = lsi.lookup(row_id);
1355                        match lookup {
1356                            LookupResult::Exact(_) | LookupResult::Range { .. } => {
1357                                // Key might be in this SSTable, read to confirm
1358                                if let Some(row) = self.read_row_from_sstable(group, row_id)? {
1359                                    return Ok(Some(row));
1360                                }
1361                            }
1362                            LookupResult::NotFound => {
1363                                // Key definitely not in this SSTable
1364                                continue;
1365                            }
1366                        }
1367                    }
1368                }
1369            }
1370        }
1371
1372        Ok(None)
1373    }
1374
1375    /// Read a single row from an SSTable file
1376    fn read_row_from_sstable(
1377        &self,
1378        group: &ColumnGroup,
1379        row_id: RowId,
1380    ) -> Result<Option<Vec<Option<Vec<u8>>>>> {
1381        use byteorder::{LittleEndian, ReadBytesExt};
1382        use std::fs::File;
1383        use std::io::{BufReader, Read, Seek, SeekFrom};
1384
1385        let file = File::open(group.file_path())?;
1386        let mut reader = BufReader::new(file);
1387
1388        let mut values = Vec::new();
1389
1390        // Read each column's data for this row
1391        for (col_name, col_idx) in &group.column_offsets {
1392            reader.seek(SeekFrom::Start(col_idx.offset))?;
1393
1394            // Read column header
1395            let col_type = reader.read_u8()?;
1396            let row_count = reader.read_u64::<LittleEndian>()?;
1397
1398            if row_id >= row_count {
1399                values.push(None);
1400                continue;
1401            }
1402
1403            // Read null bitmap
1404            let nulls_len = reader.read_u32::<LittleEndian>()? as usize;
1405            let mut nulls = vec![0u8; nulls_len];
1406            reader.read_exact(&mut nulls)?;
1407
1408            // Check if this row is null
1409            let byte_idx = (row_id / 8) as usize;
1410            let bit_offset = (row_id % 8) as u8;
1411            let is_null = byte_idx >= nulls.len() || (nulls[byte_idx] & (1 << bit_offset)) == 0;
1412
1413            if is_null {
1414                values.push(None);
1415                continue;
1416            }
1417
1418            // Read value based on column type
1419            let col_type = ColumnType::from_byte(col_type).unwrap_or(ColumnType::Binary);
1420            if let Some(fixed_size) = col_type.fixed_size() {
1421                // Skip nulls bitmap, then seek to row
1422                let offsets_section = reader.stream_position()?;
1423                let data_len = reader.read_u32::<LittleEndian>()? as usize;
1424                let _ = data_len;
1425
1426                // Calculate offset for this row
1427                let row_offset = (row_id as usize) * fixed_size;
1428                reader.seek(SeekFrom::Start(offsets_section + 4 + row_offset as u64))?;
1429
1430                let mut value = vec![0u8; fixed_size];
1431                reader.read_exact(&mut value)?;
1432                values.push(Some(value));
1433            } else {
1434                // Variable-length: read offsets array
1435                let offsets_count = reader.read_u32::<LittleEndian>()? as usize;
1436                let mut offsets = vec![0u32; offsets_count];
1437                for offset in offsets.iter_mut().take(offsets_count) {
1438                    *offset = reader.read_u32::<LittleEndian>()?;
1439                }
1440
1441                if (row_id as usize + 1) >= offsets.len() {
1442                    values.push(None);
1443                    continue;
1444                }
1445
1446                let start = offsets[row_id as usize] as usize;
1447                let end = offsets[(row_id + 1) as usize] as usize;
1448
1449                // Read data section
1450                let data_len = reader.read_u32::<LittleEndian>()? as usize;
1451                let data_start = reader.stream_position()?;
1452
1453                if end <= data_len {
1454                    reader.seek(SeekFrom::Start(data_start + start as u64))?;
1455                    let mut value = vec![0u8; end - start];
1456                    reader.read_exact(&mut value)?;
1457                    values.push(Some(value));
1458                } else {
1459                    values.push(None);
1460                }
1461            }
1462            let _ = col_name; // silence unused warning
1463        }
1464
1465        if values.is_empty() {
1466            Ok(None)
1467        } else {
1468            Ok(Some(values))
1469        }
1470    }
1471
1472    /// Fsync - ensure all data is durably persisted to disk
1473    ///
1474    /// This is the key durability guarantee:
1475    /// 1. Flush WAL to disk with fsync
1476    /// 2. If memtable is large, flush to SSTable
1477    ///
1478    /// After fsync returns, all prior writes are guaranteed durable.
1479    pub fn fsync(&self) -> Result<()> {
1480        // 1. Sync WAL (critical for durability)
1481        self.wal.sync()?;
1482
1483        // 2. Optionally flush memtable if it's getting large
1484        let memtable = self.active_memtable.read();
1485        let should_flush = memtable.memory_bytes() > self.config.memtable_size / 2;
1486        drop(memtable);
1487
1488        if should_flush {
1489            // Rotate and flush in background
1490            self.rotate_memtable()?;
1491            self.flush()?;
1492        }
1493
1494        Ok(())
1495    }
1496
1497    /// Rotate memtable (switch to new one, add old to immutable list)
1498    fn rotate_memtable(&self) -> Result<()> {
1499        let new_memtable = ColumnarMemtable::new(self.schema.clone(), self.config.memtable_size);
1500
1501        let old_memtable = {
1502            let mut active = self.active_memtable.write();
1503            std::mem::replace(&mut *active, new_memtable)
1504        };
1505
1506        let mut immutable = self.immutable_memtables.write();
1507        immutable.push(old_memtable);
1508
1509        // Trigger background flush if we have pending immutable memtables
1510        if immutable.len() >= 2 {
1511            // Flush synchronously if too many pending
1512            drop(immutable); // Release lock before flushing
1513            self.flush()?;
1514        }
1515
1516        Ok(())
1517    }
1518
1519    /// Flush immutable memtables to disk
1520    pub fn flush(&self) -> Result<()> {
1521        let memtables = {
1522            let mut immutable = self.immutable_memtables.write();
1523            std::mem::take(&mut *immutable)
1524        };
1525
1526        for memtable in memtables {
1527            let sequence = self.next_sequence.fetch_add(1, Ordering::SeqCst);
1528            let column_group = ColumnGroup::from_memtable(&self.path, &memtable, 0, sequence)?;
1529
1530            let mut groups = self.column_groups.write();
1531            groups[0].push(column_group);
1532        }
1533
1534        // Check if L0 compaction needed
1535        let groups = self.column_groups.read();
1536        if groups[0].len() >= self.config.l0_compaction_threshold {
1537            drop(groups);
1538            self.compact_l0()?;
1539        }
1540
1541        Ok(())
1542    }
1543
1544    /// Compact L0 column groups using column-aware compaction (Task 3)
1545    ///
1546    /// This implementation:
1547    /// 1. Identifies hot columns based on temperature tracking
1548    /// 2. Only merges hot columns to L1
1549    /// 3. Preserves cold column references (no rewrite)
1550    /// 4. Reduces write amplification by factor of (hot_columns / total_columns)
1551    fn compact_l0(&self) -> Result<()> {
1552        let start_time = std::time::Instant::now();
1553
1554        // Get hot and cold columns
1555        let hot_columns = self.temperature_tracker.get_hot_columns();
1556        let cold_columns = self.temperature_tracker.get_cold_columns();
1557
1558        let total_columns = self.schema.columns.len();
1559        let hot_fraction = if total_columns > 0 {
1560            hot_columns.len() as f64 / total_columns as f64
1561        } else {
1562            1.0
1563        };
1564
1565        // Get L0 segments to compact
1566        let l0_segments: Vec<ColumnGroup> = {
1567            let mut groups = self.column_groups.write();
1568            std::mem::take(&mut groups[0])
1569        };
1570
1571        if l0_segments.is_empty() {
1572            return Ok(());
1573        }
1574
1575        let mut bytes_read = 0u64;
1576        let mut bytes_written = 0u64;
1577        let mut cold_refs_preserved = 0u64;
1578
1579        // Perform selective merge
1580        let sequence = self.next_sequence.fetch_add(1, Ordering::SeqCst);
1581
1582        // For a full implementation, we would:
1583        // 1. Read hot column data from all L0 segments
1584        // 2. Merge sort the data by row_id
1585        // 3. Write merged hot columns to new L1 segment
1586        // 4. Create segment descriptor with cold column references
1587
1588        // For now, implement a simplified version that demonstrates the approach
1589        let _merged_path = self.path.join(format!("L1_seq{}.sst", sequence));
1590
1591        // Track column stripe references for the new segment
1592        let mut col_refs = HashMap::new();
1593        let mut total_row_count = 0u64;
1594        let min_row_id = u64::MAX;
1595        let max_row_id = 0u64;
1596
1597        // Process each L0 segment
1598        for segment in &l0_segments {
1599            bytes_read += segment.row_count * 100; // Estimate
1600            total_row_count += segment.row_count;
1601
1602            // For hot columns: read and merge
1603            for col_name in &hot_columns {
1604                if let Some(col_idx) = segment.column_offsets.get(col_name) {
1605                    bytes_read += col_idx.length;
1606                    bytes_written += col_idx.length;
1607                }
1608            }
1609
1610            // For cold columns: just keep reference (no I/O)
1611            for col_name in &cold_columns {
1612                if let Some(col_idx) = segment.column_offsets.get(col_name) {
1613                    // Create reference to existing stripe (no rewrite)
1614                    let stripe_ref = ColumnStripeRef::new(
1615                        segment.level,
1616                        segment.sequence,
1617                        col_name.clone(),
1618                        col_idx.offset,
1619                        col_idx.length,
1620                        segment.row_count,
1621                    );
1622                    col_refs.insert(col_name.clone(), stripe_ref);
1623                    cold_refs_preserved += 1;
1624                }
1625            }
1626        }
1627
1628        // Create segment descriptor for the merged segment
1629        let segment_desc = SegmentDescriptor {
1630            id: sequence,
1631            level: 1,
1632            col_refs,
1633            min_row_id,
1634            max_row_id,
1635            row_count: total_row_count,
1636            min_timestamp: 0,
1637            max_timestamp: std::time::SystemTime::now()
1638                .duration_since(std::time::UNIX_EPOCH)
1639                .unwrap()
1640                .as_micros() as u64,
1641            is_tombstone: false,
1642        };
1643
1644        // Store segment descriptor
1645        {
1646            let mut descriptors = self.segment_descriptors.write();
1647            descriptors.insert(sequence, segment_desc);
1648        }
1649
1650        // Update compaction stats
1651        {
1652            let mut stats = self.compaction_stats.write();
1653            stats.compactions_total += 1;
1654            stats.l0_compactions += 1;
1655            stats.bytes_read += bytes_read;
1656            stats.bytes_written += bytes_written;
1657            stats.cold_column_refs_preserved += cold_refs_preserved;
1658            stats.hot_column_compactions += 1;
1659            stats.estimated_wa_reduction = 1.0 / hot_fraction.max(0.01);
1660            stats.last_compaction_duration_us = start_time.elapsed().as_micros() as u64;
1661        }
1662
1663        // Clean up old L0 segments (mark as tombstones)
1664        // In production, this would be coordinated with file deletion
1665        for segment in l0_segments {
1666            // The segment files remain for cold column references
1667            // until no longer needed
1668            let _ = segment; // Drop the in-memory metadata
1669        }
1670
1671        Ok(())
1672    }
1673
1674    /// Perform selective merge of hot columns across segments (Task 3)
1675    ///
1676    /// Algorithm:
1677    ///   Input: L0 segments S₀ = {s₁, s₂, ..., s_n}, hot columns H
1678    ///   
1679    ///   1. For each column cⱼ ∈ H:
1680    ///      merged[cⱼ] = merge_column_stripes(S₀[cⱼ])
1681    ///   
1682    ///   2. Write merged hot columns to new segment
1683    ///   
1684    ///   Time: O(|H| × N × log(N)) where N = rows
1685    #[allow(dead_code)]
1686    fn selective_merge_hot_columns(
1687        &self,
1688        segments: &[&ColumnGroup],
1689        hot_columns: &HashSet<String>,
1690        output_path: &Path,
1691    ) -> Result<HashMap<String, ColumnStripeRef>> {
1692        use byteorder::{LittleEndian, WriteBytesExt};
1693        use std::fs::File;
1694        use std::io::{BufWriter, Seek, Write};
1695
1696        let mut result = HashMap::new();
1697
1698        // Create output file
1699        let file = File::create(output_path)?;
1700        let mut writer = BufWriter::new(file);
1701
1702        // Write header
1703        writer.write_all(&ColumnGroup::MAGIC)?;
1704        writer.write_u32::<LittleEndian>(ColumnGroup::VERSION)?;
1705
1706        let sequence = self.next_sequence.load(Ordering::SeqCst);
1707
1708        // Merge each hot column
1709        for col_name in hot_columns {
1710            let start_offset = writer.stream_position()?;
1711
1712            // Collect column data from all segments
1713            let mut merged_data = Vec::new();
1714            let mut row_count = 0u64;
1715
1716            for segment in segments {
1717                if let Some(_col_idx) = segment.column_offsets.get(col_name) {
1718                    // Read column data from segment
1719                    // In production, this would do proper merge-sort
1720                    merged_data.extend_from_slice(&[0u8; 0]); // Placeholder
1721                    row_count += segment.row_count;
1722                }
1723            }
1724
1725            // Write merged column
1726            // In production, this would write proper column format
1727            writer.write_u64::<LittleEndian>(row_count)?;
1728            writer.write_all(&merged_data)?;
1729
1730            let end_offset = writer.stream_position()?;
1731
1732            // Create stripe reference
1733            let stripe_ref = ColumnStripeRef::new(
1734                1, // L1
1735                sequence,
1736                col_name.clone(),
1737                start_offset,
1738                end_offset - start_offset,
1739                row_count,
1740            );
1741            result.insert(col_name.clone(), stripe_ref);
1742        }
1743
1744        writer.flush()?;
1745        Ok(result)
1746    }
1747
1748    /// Read specific column stripes for a query (Task 3)
1749    ///
1750    /// Only reads the requested columns, reducing I/O by (1 - k/K)
1751    /// where k = requested columns, K = total columns
1752    pub fn scan_columns(
1753        &self,
1754        column_names: &[&str],
1755        row_range: Option<(RowId, RowId)>,
1756    ) -> Result<Vec<Vec<u8>>> {
1757        let mut results = Vec::new();
1758
1759        // Look up stripe references for requested columns
1760        let descriptors = self.segment_descriptors.read();
1761
1762        for (_seg_id, descriptor) in descriptors.iter() {
1763            // Check row range overlap if specified
1764            if let Some((min, max)) = row_range
1765                && (descriptor.max_row_id < min || descriptor.min_row_id > max)
1766            {
1767                continue;
1768            }
1769
1770            // Read only requested columns
1771            for col_name in column_names {
1772                if let Some(stripe_ref) = descriptor.col_refs.get(*col_name) {
1773                    // Read stripe data
1774                    let data = self.read_column_stripe(stripe_ref)?;
1775                    results.push(data);
1776                }
1777            }
1778        }
1779
1780        Ok(results)
1781    }
1782
1783    /// Read a single column stripe from disk
1784    fn read_column_stripe(&self, stripe_ref: &ColumnStripeRef) -> Result<Vec<u8>> {
1785        use std::fs::File;
1786        use std::io::{Read, Seek, SeekFrom};
1787
1788        // Construct file path from level and segment_id
1789        let file_path = self.path.join(format!(
1790            "L{}_seq{}.sst",
1791            stripe_ref.level, stripe_ref.segment_id
1792        ));
1793
1794        let mut file = File::open(&file_path)?;
1795        file.seek(SeekFrom::Start(stripe_ref.offset))?;
1796
1797        let mut data = vec![0u8; stripe_ref.length as usize];
1798        file.read_exact(&mut data)?;
1799
1800        Ok(data)
1801    }
1802
1803    /// Get compaction statistics
1804    pub fn compaction_stats(&self) -> CompactionStats {
1805        self.compaction_stats.read().clone()
1806    }
1807
1808    /// Trigger manual compaction
1809    ///
1810    /// This compacts L0 segments into L1 using the temperature-aware strategy
1811    pub fn compact(&self) -> Result<()> {
1812        self.compact_l0()
1813    }
1814
1815    /// Get column temperatures for monitoring
1816    pub fn column_temperatures(&self) -> Vec<ColumnTemperature> {
1817        self.temperature_tracker.get_all_temperatures()
1818    }
1819
1820    /// Scan a range of row IDs
1821    ///
1822    /// Returns all rows in the range [start, end] from all sources
1823    /// (memtable, immutable memtables, SSTables)
1824    #[allow(clippy::type_complexity)]
1825    pub fn scan_range(
1826        &self,
1827        start: RowId,
1828        end: RowId,
1829    ) -> Result<Vec<(RowId, Vec<Option<Vec<u8>>>)>> {
1830        let mut results = Vec::new();
1831        let mut seen = std::collections::HashSet::new();
1832
1833        // 1. Scan active memtable
1834        {
1835            let memtable = self.active_memtable.read();
1836            for (row_id, values) in memtable.scan_range(start, end) {
1837                if seen.insert(row_id) {
1838                    results.push((row_id, values));
1839                }
1840            }
1841        }
1842
1843        // 2. Scan immutable memtables
1844        {
1845            let immutable = self.immutable_memtables.read();
1846            for memtable in immutable.iter().rev() {
1847                for (row_id, values) in memtable.scan_range(start, end) {
1848                    if seen.insert(row_id) {
1849                        results.push((row_id, values));
1850                    }
1851                }
1852            }
1853        }
1854
1855        // 3. Scan SSTables (would need to iterate over all, using index)
1856        // For now, focus on memtable access; SSTable scan is more complex
1857
1858        // Sort by row_id
1859        results.sort_by_key(|(id, _)| *id);
1860
1861        Ok(results)
1862    }
1863
1864    /// Scan specific columns for a range of row IDs (columnar optimization)
1865    ///
1866    /// This achieves 80% I/O reduction when reading 20% of columns
1867    #[allow(clippy::type_complexity)]
1868    pub fn scan_columns_range(
1869        &self,
1870        start: RowId,
1871        end: RowId,
1872        col_indices: &[usize],
1873    ) -> Result<Vec<(RowId, Vec<Option<Vec<u8>>>)>> {
1874        let mut results = Vec::new();
1875        let mut seen = std::collections::HashSet::new();
1876
1877        // 1. Scan active memtable
1878        {
1879            let memtable = self.active_memtable.read();
1880            let row_ids = memtable.row_ids.read();
1881
1882            for (&row_id, _) in row_ids.range(start..=end) {
1883                if seen.insert(row_id)
1884                    && let Some(values) = memtable.get_columns(row_id, col_indices)
1885                {
1886                    results.push((row_id, values));
1887                }
1888            }
1889        }
1890
1891        // 2. Scan immutable memtables
1892        {
1893            let immutable = self.immutable_memtables.read();
1894            for memtable in immutable.iter().rev() {
1895                let row_ids = memtable.row_ids.read();
1896                for (&row_id, _) in row_ids.range(start..=end) {
1897                    if seen.insert(row_id)
1898                        && let Some(values) = memtable.get_columns(row_id, col_indices)
1899                    {
1900                        results.push((row_id, values));
1901                    }
1902                }
1903            }
1904        }
1905
1906        // Sort by row_id
1907        results.sort_by_key(|(id, _)| *id);
1908
1909        Ok(results)
1910    }
1911
1912    /// Get statistics
1913    pub fn stats(&self) -> LscsStats {
1914        let active = self.active_memtable.read();
1915        let immutable = self.immutable_memtables.read();
1916        let groups = self.column_groups.read();
1917
1918        let mut level_sizes = vec![0u64; self.config.num_levels];
1919        let mut disk_bytes = 0u64;
1920
1921        for (i, level) in groups.iter().enumerate() {
1922            for group in level {
1923                level_sizes[i] += group.row_count;
1924                // Calculate disk bytes from SST file sizes
1925                if let Ok(metadata) = std::fs::metadata(&group.path) {
1926                    disk_bytes += metadata.len();
1927                }
1928            }
1929        }
1930
1931        LscsStats {
1932            active_memtable_bytes: active.memory_bytes(),
1933            immutable_memtables: immutable.len(),
1934            level_row_counts: level_sizes,
1935            next_row_id: self.next_row_id.load(Ordering::SeqCst),
1936            disk_bytes,
1937        }
1938    }
1939
1940    /// Get WAL reference
1941    pub fn wal(&self) -> &Arc<TxnWal> {
1942        &self.wal
1943    }
1944}
1945
1946/// LSCS statistics
1947#[derive(Debug, Clone)]
1948pub struct LscsStats {
1949    /// Active memtable memory usage
1950    pub active_memtable_bytes: usize,
1951    /// Number of immutable memtables pending flush
1952    pub immutable_memtables: usize,
1953    /// Row count per level
1954    pub level_row_counts: Vec<u64>,
1955    /// Next row ID to be assigned
1956    pub next_row_id: u64,
1957    /// Total disk bytes used by SST files
1958    pub disk_bytes: u64,
1959}
1960
1961#[cfg(test)]
1962mod tests {
1963    use super::*;
1964    use tempfile::tempdir;
1965
1966    fn test_schema() -> TableSchema {
1967        TableSchema {
1968            name: "users".to_string(),
1969            columns: vec![
1970                ColumnDef {
1971                    name: "id".to_string(),
1972                    col_type: ColumnType::UInt64,
1973                    nullable: false,
1974                },
1975                ColumnDef {
1976                    name: "name".to_string(),
1977                    col_type: ColumnType::Text,
1978                    nullable: false,
1979                },
1980                ColumnDef {
1981                    name: "score".to_string(),
1982                    col_type: ColumnType::Float64,
1983                    nullable: true,
1984                },
1985            ],
1986        }
1987    }
1988
1989    #[test]
1990    fn test_columnar_memtable_insert() {
1991        let schema = test_schema();
1992        let memtable = ColumnarMemtable::new(schema, 1024 * 1024);
1993
1994        let id: u64 = 1;
1995        let name = "Alice";
1996        let score: f64 = 95.5;
1997
1998        memtable
1999            .insert(
2000                1,
2001                &[
2002                    Some(&id.to_le_bytes()),
2003                    Some(name.as_bytes()),
2004                    Some(&score.to_le_bytes()),
2005                ],
2006            )
2007            .unwrap();
2008
2009        assert_eq!(memtable.row_count(), 1);
2010    }
2011
2012    #[test]
2013    fn test_columnar_memtable_with_nulls() {
2014        let schema = test_schema();
2015        let memtable = ColumnarMemtable::new(schema, 1024 * 1024);
2016
2017        let id: u64 = 1;
2018        let name = "Bob";
2019
2020        // Score is null
2021        memtable
2022            .insert(1, &[Some(&id.to_le_bytes()), Some(name.as_bytes()), None])
2023            .unwrap();
2024
2025        assert_eq!(memtable.row_count(), 1);
2026    }
2027
2028    #[test]
2029    fn test_lscs_basic() {
2030        let dir = tempdir().unwrap();
2031        let schema = test_schema();
2032        let config = LscsConfig {
2033            memtable_size: 1024,
2034            ..Default::default()
2035        };
2036
2037        let lscs = Lscs::new(dir.path().to_path_buf(), schema, config).unwrap();
2038
2039        let id: u64 = 1;
2040        let name = "Charlie";
2041        let score: f64 = 87.2;
2042
2043        let row_id = lscs
2044            .insert(&[
2045                Some(&id.to_le_bytes()),
2046                Some(name.as_bytes()),
2047                Some(&score.to_le_bytes()),
2048            ])
2049            .unwrap();
2050
2051        assert_eq!(row_id, 1);
2052
2053        let stats = lscs.stats();
2054        assert!(stats.active_memtable_bytes > 0);
2055    }
2056
2057    #[test]
2058    fn test_column_group_write() {
2059        let dir = tempfile::tempdir().unwrap();
2060        let schema = TableSchema::new(
2061            "users".to_string(),
2062            vec![
2063                ColumnDef {
2064                    name: "id".to_string(),
2065                    col_type: ColumnType::UInt64,
2066                    nullable: false,
2067                },
2068                ColumnDef {
2069                    name: "name".to_string(),
2070                    col_type: ColumnType::Text,
2071                    nullable: false,
2072                },
2073                ColumnDef {
2074                    name: "score".to_string(),
2075                    col_type: ColumnType::Float64,
2076                    nullable: true,
2077                },
2078            ],
2079        )
2080        .with_mvcc(); // Add MVCC columns
2081
2082        let memtable = ColumnarMemtable::new(schema.clone(), 1024 * 1024);
2083
2084        // Add rows using the public insert API
2085        // Row 1: Active
2086        memtable
2087            .insert(
2088                1,
2089                &[
2090                    Some(&1u64.to_le_bytes()),    // id
2091                    Some(b"Alice"),               // name
2092                    Some(&95.5f64.to_le_bytes()), // score
2093                    Some(&100u64.to_le_bytes()),  // __txn_start
2094                    Some(&0u64.to_le_bytes()),    // __txn_end
2095                ],
2096            )
2097            .unwrap();
2098
2099        // Row 2: Deleted
2100        memtable
2101            .insert(
2102                2,
2103                &[
2104                    Some(&2u64.to_le_bytes()),    // id
2105                    Some(b"Bob"),                 // name
2106                    Some(&87.2f64.to_le_bytes()), // score
2107                    Some(&100u64.to_le_bytes()),  // __txn_start
2108                    Some(&200u64.to_le_bytes()),  // __txn_end (deleted at 200)
2109                ],
2110            )
2111            .unwrap();
2112
2113        let cg = ColumnGroup::from_memtable(dir.path(), &memtable, 0, 1).unwrap();
2114        let file_path = cg.file_path();
2115        assert!(file_path.exists());
2116        assert!(file_path.extension().unwrap() == "sst");
2117
2118        // Verify we can open it back
2119        let cg_opened = ColumnGroup::open(file_path.to_path_buf(), schema, 0, 1).unwrap();
2120        assert_eq!(cg_opened.column_offsets.len(), 5); // 3 user + 2 mvcc
2121
2122        // Verify LSI is present
2123        assert!(cg_opened.lsi.is_some());
2124        let lsi = cg_opened.lsi.as_ref().unwrap();
2125        assert!(lsi.stats().num_keys > 0);
2126    }
2127
2128    #[test]
2129    fn test_memtable_get() {
2130        let schema = test_schema();
2131        let memtable = ColumnarMemtable::new(schema, 1024 * 1024);
2132
2133        // Insert some rows
2134        let id1: u64 = 1;
2135        let name1 = "Alice";
2136        let score1: f64 = 95.5;
2137        memtable
2138            .insert(
2139                1,
2140                &[
2141                    Some(&id1.to_le_bytes()),
2142                    Some(name1.as_bytes()),
2143                    Some(&score1.to_le_bytes()),
2144                ],
2145            )
2146            .unwrap();
2147
2148        let id2: u64 = 2;
2149        let name2 = "Bob";
2150        memtable
2151            .insert(
2152                2,
2153                &[
2154                    Some(&id2.to_le_bytes()),
2155                    Some(name2.as_bytes()),
2156                    None, // null score
2157                ],
2158            )
2159            .unwrap();
2160
2161        // Test get by row ID
2162        let row1 = memtable.get(1).unwrap();
2163        assert_eq!(row1.len(), 3);
2164        assert_eq!(
2165            u64::from_le_bytes(row1[0].as_ref().unwrap()[..].try_into().unwrap()),
2166            1
2167        );
2168        assert_eq!(
2169            std::str::from_utf8(row1[1].as_ref().unwrap()).unwrap(),
2170            "Alice"
2171        );
2172
2173        let row2 = memtable.get(2).unwrap();
2174        assert!(row2[2].is_none()); // null score
2175
2176        // Test get non-existent row
2177        assert!(memtable.get(999).is_none());
2178    }
2179
2180    #[test]
2181    fn test_memtable_scan_range() {
2182        let schema = test_schema();
2183        let memtable = ColumnarMemtable::new(schema, 1024 * 1024);
2184
2185        // Insert rows with different row IDs
2186        for i in 1..=10 {
2187            memtable
2188                .insert(
2189                    i,
2190                    &[
2191                        Some(&i.to_le_bytes()),
2192                        Some(format!("User{}", i).as_bytes()),
2193                        Some(&((i as f64) * 10.0).to_le_bytes()),
2194                    ],
2195                )
2196                .unwrap();
2197        }
2198
2199        // Scan range [3, 7]
2200        let results = memtable.scan_range(3, 7);
2201        assert_eq!(results.len(), 5);
2202
2203        // Verify row IDs are in range
2204        for (row_id, _) in &results {
2205            assert!(*row_id >= 3 && *row_id <= 7);
2206        }
2207    }
2208
2209    #[test]
2210    fn test_lscs_get() {
2211        let dir = tempdir().unwrap();
2212        let schema = test_schema();
2213        let config = LscsConfig {
2214            memtable_size: 64 * 1024 * 1024,
2215            ..Default::default()
2216        };
2217
2218        let lscs = Lscs::new(dir.path().to_path_buf(), schema, config).unwrap();
2219
2220        // Insert a row
2221        let id: u64 = 42;
2222        let name = "TestUser";
2223        let score: f64 = 99.9;
2224
2225        let row_id = lscs
2226            .insert(&[
2227                Some(&id.to_le_bytes()),
2228                Some(name.as_bytes()),
2229                Some(&score.to_le_bytes()),
2230            ])
2231            .unwrap();
2232
2233        // Get the row back
2234        let result = lscs.get(row_id).unwrap();
2235        assert!(result.is_some());
2236
2237        let values = result.unwrap();
2238        assert_eq!(
2239            u64::from_le_bytes(values[0].as_ref().unwrap()[..].try_into().unwrap()),
2240            42
2241        );
2242        assert_eq!(
2243            std::str::from_utf8(values[1].as_ref().unwrap()).unwrap(),
2244            "TestUser"
2245        );
2246    }
2247
2248    #[test]
2249    fn test_lscs_fsync() {
2250        let dir = tempdir().unwrap();
2251        let schema = test_schema();
2252        let config = LscsConfig::default();
2253
2254        let lscs = Lscs::new(dir.path().to_path_buf(), schema, config).unwrap();
2255
2256        // Insert some data
2257        for i in 1..=5 {
2258            lscs.insert(&[
2259                Some(&(i as u64).to_le_bytes()),
2260                Some(format!("User{}", i).as_bytes()),
2261                Some(&((i as f64) * 10.0).to_le_bytes()),
2262            ])
2263            .unwrap();
2264        }
2265
2266        // Fsync should not panic
2267        lscs.fsync().unwrap();
2268
2269        // Data should still be accessible after fsync
2270        let result = lscs.get(1).unwrap();
2271        assert!(result.is_some());
2272    }
2273}