Skip to main content

velesdb_core/column_store/
vacuum.rs

1//! Vacuum and compaction operations for `ColumnStore`.
2//!
3//! Extracted from `mod.rs` for maintainability (04-06 module splitting).
4//! Handles tombstone removal, column compaction, and deletion bitmap operations.
5
6use super::types::{TypedColumn, VacuumConfig, VacuumStats};
7use super::ColumnStore;
8
9use roaring::RoaringBitmap;
10use std::collections::HashMap;
11
12impl ColumnStore {
13    /// Runs vacuum to remove tombstones and compact data.
14    ///
15    /// This operation removes deleted rows from the column store, reclaiming
16    /// space and improving query performance. The operation is done in-place
17    /// by building new column vectors without the deleted rows.
18    ///
19    /// # Arguments
20    ///
21    /// * `_config` - Vacuum configuration (batch_size, sync options)
22    ///
23    /// # Returns
24    ///
25    /// Statistics about the vacuum operation.
26    pub fn vacuum(&mut self, _config: VacuumConfig) -> VacuumStats {
27        let start = std::time::Instant::now();
28        let tombstones_found = self.deleted_rows.len();
29
30        // Phase 1: Early exit if no tombstones
31        if tombstones_found == 0 {
32            return VacuumStats {
33                tombstones_found: 0,
34                completed: true,
35                duration_ms: start.elapsed().as_millis() as u64,
36                ..Default::default()
37            };
38        }
39
40        let mut stats = VacuumStats {
41            tombstones_found,
42            ..Default::default()
43        };
44
45        // Phase 2: Build index mapping (old_idx -> new_idx)
46        let mut old_to_new: HashMap<usize, usize> = HashMap::new();
47        let mut new_idx = 0;
48        for old_idx in 0..self.row_count {
49            if !self.deleted_rows.contains(&old_idx) {
50                old_to_new.insert(old_idx, new_idx);
51                new_idx += 1;
52            }
53        }
54        let new_row_count = new_idx;
55
56        // Phase 3: Compact each column
57        for column in self.columns.values_mut() {
58            let (new_col, bytes) = Self::compact_column(column, &self.deleted_rows);
59            stats.bytes_reclaimed += bytes;
60            *column = new_col;
61        }
62
63        // Phase 4: Update primary index
64        if self.primary_key_column.is_some() {
65            let mut new_primary_index: HashMap<i64, usize> = HashMap::new();
66            let mut new_row_idx_to_pk: HashMap<usize, i64> = HashMap::new();
67
68            for (pk, old_idx) in &self.primary_index {
69                if let Some(&new_idx) = old_to_new.get(old_idx) {
70                    new_primary_index.insert(*pk, new_idx);
71                    new_row_idx_to_pk.insert(new_idx, *pk);
72                }
73            }
74
75            self.primary_index = new_primary_index;
76            self.row_idx_to_pk = new_row_idx_to_pk;
77        }
78
79        // Phase 5: Update row expiry mapping
80        let mut new_row_expiry: HashMap<usize, u64> = HashMap::new();
81        for (old_idx, expiry) in &self.row_expiry {
82            if let Some(&new_idx) = old_to_new.get(old_idx) {
83                new_row_expiry.insert(new_idx, *expiry);
84            }
85        }
86        self.row_expiry = new_row_expiry;
87
88        // Phase 6: Clear tombstones and update row count
89        stats.tombstones_removed = self.deleted_rows.len();
90        self.deleted_rows.clear();
91        self.deletion_bitmap.clear(); // EPIC-043 US-002: Also clear RoaringBitmap
92        self.row_count = new_row_count;
93
94        stats.completed = true;
95        stats.duration_ms = start.elapsed().as_millis() as u64;
96        stats
97    }
98
99    /// Compacts a single column by removing deleted rows.
100    fn compact_column(
101        column: &TypedColumn,
102        deleted: &rustc_hash::FxHashSet<usize>,
103    ) -> (TypedColumn, u64) {
104        let mut bytes_reclaimed = 0u64;
105
106        match column {
107            TypedColumn::Int(data) => {
108                let mut new_data = Vec::with_capacity(data.len() - deleted.len());
109                for (idx, value) in data.iter().enumerate() {
110                    if deleted.contains(&idx) {
111                        bytes_reclaimed += 8; // i64 size
112                    } else {
113                        new_data.push(*value);
114                    }
115                }
116                (TypedColumn::Int(new_data), bytes_reclaimed)
117            }
118            TypedColumn::Float(data) => {
119                let mut new_data = Vec::with_capacity(data.len() - deleted.len());
120                for (idx, value) in data.iter().enumerate() {
121                    if deleted.contains(&idx) {
122                        bytes_reclaimed += 8; // f64 size
123                    } else {
124                        new_data.push(*value);
125                    }
126                }
127                (TypedColumn::Float(new_data), bytes_reclaimed)
128            }
129            TypedColumn::String(data) => {
130                let mut new_data = Vec::with_capacity(data.len() - deleted.len());
131                for (idx, value) in data.iter().enumerate() {
132                    if deleted.contains(&idx) {
133                        bytes_reclaimed += 4; // StringId size
134                    } else {
135                        new_data.push(*value);
136                    }
137                }
138                (TypedColumn::String(new_data), bytes_reclaimed)
139            }
140            TypedColumn::Bool(data) => {
141                let mut new_data = Vec::with_capacity(data.len() - deleted.len());
142                for (idx, value) in data.iter().enumerate() {
143                    if deleted.contains(&idx) {
144                        bytes_reclaimed += 1; // bool size
145                    } else {
146                        new_data.push(*value);
147                    }
148                }
149                (TypedColumn::Bool(new_data), bytes_reclaimed)
150            }
151        }
152    }
153
154    /// Returns whether vacuum is recommended based on tombstone ratio.
155    ///
156    /// # Arguments
157    ///
158    /// * `threshold` - Ratio of deleted rows to trigger vacuum (0.0-1.0)
159    #[must_use]
160    pub fn should_vacuum(&self, threshold: f64) -> bool {
161        if self.row_count == 0 {
162            return false;
163        }
164        let ratio = self.deleted_rows.len() as f64 / self.row_count as f64;
165        ratio >= threshold
166    }
167
168    // =========================================================================
169    // EPIC-043 US-002: RoaringBitmap Filtering
170    // =========================================================================
171
172    /// Checks if a row is deleted using RoaringBitmap (O(1) lookup).
173    ///
174    /// This is faster than FxHashSet for large deletion sets.
175    #[must_use]
176    #[inline]
177    pub fn is_row_deleted_bitmap(&self, row_idx: usize) -> bool {
178        if let Ok(idx) = u32::try_from(row_idx) {
179            self.deletion_bitmap.contains(idx)
180        } else {
181            // Fallback to FxHashSet for indices > u32::MAX
182            self.deleted_rows.contains(&row_idx)
183        }
184    }
185
186    /// Returns an iterator over live (non-deleted) row indices.
187    ///
188    /// Uses RoaringBitmap for efficient filtering.
189    pub fn live_row_indices(&self) -> impl Iterator<Item = usize> + '_ {
190        (0..self.row_count).filter(|&idx| !self.is_row_deleted_bitmap(idx))
191    }
192
193    /// Returns the deletion bitmap for advanced filtering operations.
194    #[must_use]
195    pub fn deletion_bitmap(&self) -> &RoaringBitmap {
196        &self.deletion_bitmap
197    }
198
199    /// Returns the number of deleted rows using the bitmap (O(1)).
200    #[must_use]
201    pub fn deleted_count_bitmap(&self) -> u64 {
202        self.deletion_bitmap.len()
203    }
204}