// nodedb_columnar/compaction.rs
//! Segment compaction: merge segments, drop deleted rows, re-encode.
//!
//! Compaction reads one or more source segments along with their delete
//! bitmaps, filters out deleted rows, and writes a new compacted segment.
//! The caller is responsible for the atomic metadata swap (WAL commit marker
//! → swap segment references → delete old files).
//!
//! Triggered when a segment's delete ratio exceeds the threshold (default 20%)
//! or when the segment count exceeds a limit.
use nodedb_types::columnar::ColumnarSchema;

use crate::delete_bitmap::DeleteBitmap;
use crate::error::ColumnarError;
use crate::memtable::ColumnarMemtable;
use crate::reader::{DecodedColumn, SegmentReader};
use crate::writer::SegmentWriter;
/// Default compaction threshold: compact when >20% of rows are deleted.
///
/// NOTE(review): not referenced within this file — presumably consulted by
/// the scheduling code that decides when to trigger compaction; confirm.
pub const DEFAULT_DELETE_RATIO_THRESHOLD: f64 = 0.2;
21
/// Result of a compaction operation.
///
/// Returned by [`compact_segment`] and [`compact_segments`]; the caller is
/// responsible for persisting `segment` and swapping metadata atomically.
pub struct CompactionResult {
    /// The new compacted segment bytes. `None` if all rows were deleted
    /// (no output segment is produced in that case).
    pub segment: Option<Vec<u8>>,
    /// Number of live rows in the new segment.
    pub live_rows: usize,
    /// Number of rows removed (deleted).
    pub removed_rows: usize,
}
31
32/// Compact a single segment by removing deleted rows.
33///
34/// Reads the segment, skips rows marked in the delete bitmap, and writes
35/// a new segment with only live rows. Returns `None` segment if all rows
36/// were deleted.
37pub fn compact_segment(
38    segment_data: &[u8],
39    deletes: &DeleteBitmap,
40    schema: &ColumnarSchema,
41    profile_tag: u8,
42) -> Result<CompactionResult, ColumnarError> {
43    let reader = SegmentReader::open(segment_data)?;
44    let total_rows = reader.row_count() as usize;
45    let deleted = deletes.deleted_count() as usize;
46    let live = total_rows.saturating_sub(deleted);
47
48    if live == 0 {
49        return Ok(CompactionResult {
50            segment: None,
51            live_rows: 0,
52            removed_rows: total_rows,
53        });
54    }
55
56    // Read all columns without delete masking — we'll filter manually.
57    let col_count = reader.column_count();
58    let mut decoded_cols = Vec::with_capacity(col_count);
59    for i in 0..col_count {
60        decoded_cols.push(reader.read_column(i)?);
61    }
62
63    // Build a new memtable with only live rows.
64    let mut memtable = ColumnarMemtable::new(schema);
65    let mut row_values = Vec::with_capacity(schema.columns.len());
66
67    for row_idx in 0..total_rows {
68        if deletes.is_deleted(row_idx as u32) {
69            continue;
70        }
71
72        row_values.clear();
73        for (col_idx, decoded) in decoded_cols.iter().enumerate() {
74            let value = extract_row_value(decoded, row_idx, &schema.columns[col_idx].column_type);
75            row_values.push(value);
76        }
77
78        memtable.append_row(&row_values)?;
79    }
80
81    let (schema, columns, row_count) = memtable.drain();
82    let writer = SegmentWriter::new(profile_tag);
83    let new_segment = writer.write_segment(&schema, &columns, row_count)?;
84
85    Ok(CompactionResult {
86        segment: Some(new_segment),
87        live_rows: row_count,
88        removed_rows: deleted,
89    })
90}
91
92/// Compact multiple segments into a single merged segment.
93///
94/// Reads all source segments, skips deleted rows from each, and writes
95/// a single merged output segment. This reduces segment count and reclaims
96/// space from deleted rows across all sources.
97pub fn compact_segments(
98    segments: &[(&[u8], &DeleteBitmap)],
99    schema: &ColumnarSchema,
100    profile_tag: u8,
101) -> Result<CompactionResult, ColumnarError> {
102    let mut memtable = ColumnarMemtable::new(schema);
103    let mut total_removed = 0usize;
104    let mut row_values = Vec::with_capacity(schema.columns.len());
105
106    for &(segment_data, deletes) in segments {
107        let reader = SegmentReader::open(segment_data)?;
108        let total_rows = reader.row_count() as usize;
109
110        let mut decoded_cols = Vec::with_capacity(reader.column_count());
111        for i in 0..reader.column_count() {
112            decoded_cols.push(reader.read_column(i)?);
113        }
114
115        for row_idx in 0..total_rows {
116            if deletes.is_deleted(row_idx as u32) {
117                total_removed += 1;
118                continue;
119            }
120
121            row_values.clear();
122            for (col_idx, decoded) in decoded_cols.iter().enumerate() {
123                let value =
124                    extract_row_value(decoded, row_idx, &schema.columns[col_idx].column_type);
125                row_values.push(value);
126            }
127
128            memtable.append_row(&row_values)?;
129        }
130    }
131
132    let live_rows = memtable.row_count();
133    if live_rows == 0 {
134        return Ok(CompactionResult {
135            segment: None,
136            live_rows: 0,
137            removed_rows: total_removed,
138        });
139    }
140
141    let (schema, columns, row_count) = memtable.drain();
142    let writer = SegmentWriter::new(profile_tag);
143    let new_segment = writer.write_segment(&schema, &columns, row_count)?;
144
145    Ok(CompactionResult {
146        segment: Some(new_segment),
147        live_rows: row_count,
148        removed_rows: total_removed,
149    })
150}
151
152/// Extract a single row value from a DecodedColumn.
153fn extract_row_value(
154    col: &DecodedColumn,
155    row_idx: usize,
156    col_type: &nodedb_types::columnar::ColumnType,
157) -> nodedb_types::value::Value {
158    use nodedb_types::value::Value;
159
160    match col {
161        DecodedColumn::Int64 { values, valid } => {
162            if !valid[row_idx] {
163                Value::Null
164            } else {
165                Value::Integer(values[row_idx])
166            }
167        }
168        DecodedColumn::Float64 { values, valid } => {
169            if !valid[row_idx] {
170                Value::Null
171            } else {
172                Value::Float(values[row_idx])
173            }
174        }
175        DecodedColumn::Timestamp { values, valid } => {
176            if !valid[row_idx] {
177                Value::Null
178            } else {
179                Value::Integer(values[row_idx]) // Timestamp stored as micros.
180            }
181        }
182        DecodedColumn::Bool { values, valid } => {
183            if !valid[row_idx] {
184                Value::Null
185            } else {
186                Value::Bool(values[row_idx])
187            }
188        }
189        DecodedColumn::Binary {
190            data,
191            offsets,
192            valid,
193        } => {
194            if !valid[row_idx] {
195                return Value::Null;
196            }
197            let start = offsets[row_idx] as usize;
198            let end = offsets[row_idx + 1] as usize;
199            let bytes = &data[start..end];
200
201            match col_type {
202                nodedb_types::columnar::ColumnType::String => {
203                    Value::String(String::from_utf8_lossy(bytes).into_owned())
204                }
205                nodedb_types::columnar::ColumnType::Bytes
206                | nodedb_types::columnar::ColumnType::Geometry => Value::Bytes(bytes.to_vec()),
207                _ => Value::Bytes(bytes.to_vec()),
208            }
209        }
210        DecodedColumn::DictEncoded {
211            ids,
212            dictionary,
213            valid,
214        } => {
215            if !valid[row_idx] {
216                return Value::Null;
217            }
218            let id = ids[row_idx] as usize;
219            if let Some(s) = dictionary.get(id) {
220                Value::String(s.clone())
221            } else {
222                Value::Null
223            }
224        }
225    }
226}
227
#[cfg(test)]
mod tests {
    use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};
    use nodedb_types::value::Value;

    use super::*;
    use crate::memtable::ColumnarMemtable;
    use crate::writer::SegmentWriter;

    /// Three-column schema: required PK int, required string, nullable float.
    fn test_schema() -> ColumnarSchema {
        ColumnarSchema::new(vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::required("name", ColumnType::String),
            ColumnDef::nullable("score", ColumnType::Float64),
        ])
        .expect("valid")
    }

    /// Build a segment with `rows` rows: id = i, name = "user_{i}",
    /// score = Null for every third row, else i * 0.5.
    fn write_segment(rows: usize) -> Vec<u8> {
        let schema = test_schema();
        let mut mt = ColumnarMemtable::new(&schema);
        for i in 0..rows {
            mt.append_row(&[
                Value::Integer(i as i64),
                Value::String(format!("user_{i}")),
                if i % 3 == 0 {
                    Value::Null
                } else {
                    Value::Float(i as f64 * 0.5)
                },
            ])
            .expect("append");
        }
        let (schema, columns, row_count) = mt.drain();
        SegmentWriter::plain()
            .write_segment(&schema, &columns, row_count)
            .expect("write")
    }

    #[test]
    fn compact_removes_deleted_rows() {
        let segment = write_segment(100);
        let mut deletes = DeleteBitmap::new();

        // Delete rows 0, 10, 20, ..., 90 (10 rows).
        for i in (0..100).step_by(10) {
            deletes.mark_deleted(i);
        }

        let result = compact_segment(&segment, &deletes, &test_schema(), 0).expect("compact");

        assert_eq!(result.live_rows, 90);
        assert_eq!(result.removed_rows, 10);
        assert!(result.segment.is_some());

        // Verify the compacted segment has correct row count.
        let new_seg = result.segment.as_ref().expect("segment");
        let reader = SegmentReader::open(new_seg).expect("open");
        assert_eq!(reader.row_count(), 90);

        // Verify that deleted rows are gone: row 0 (id=0) was deleted,
        // so the first row should be id=1.
        let col = reader.read_column(0).expect("read id");
        match col {
            DecodedColumn::Int64 { values, valid } => {
                assert_eq!(values[0], 1); // First live row.
                assert!(valid[0]);
                // Row at index 8 should be id=9 (rows 0,10 deleted, so 1..9 = 9 rows, idx 8 = id 9).
                assert_eq!(values[8], 9);
            }
            _ => panic!("expected Int64"),
        }
    }

    #[test]
    fn compact_all_deleted() {
        let segment = write_segment(10);
        let mut deletes = DeleteBitmap::new();
        for i in 0..10 {
            deletes.mark_deleted(i);
        }

        let result = compact_segment(&segment, &deletes, &test_schema(), 0).expect("compact");

        // All rows deleted: no output segment is produced.
        assert_eq!(result.live_rows, 0);
        assert_eq!(result.removed_rows, 10);
        assert!(result.segment.is_none());
    }

    #[test]
    fn compact_no_deletes() {
        let segment = write_segment(50);
        let deletes = DeleteBitmap::new();

        let result = compact_segment(&segment, &deletes, &test_schema(), 0).expect("compact");

        // Empty bitmap: every row survives.
        assert_eq!(result.live_rows, 50);
        assert_eq!(result.removed_rows, 0);
        assert!(result.segment.is_some());
    }

    #[test]
    fn merge_multiple_segments() {
        let seg1 = write_segment(50);
        let seg2 = write_segment(30);

        let mut del1 = DeleteBitmap::new();
        del1.mark_deleted_batch(&[0, 1, 2]); // Delete 3 from seg1.

        let del2 = DeleteBitmap::new(); // No deletes from seg2.

        let result =
            compact_segments(&[(&seg1, &del1), (&seg2, &del2)], &test_schema(), 0).expect("merge");

        assert_eq!(result.live_rows, 77); // 50-3 + 30 = 77.
        assert_eq!(result.removed_rows, 3);
        assert!(result.segment.is_some());

        let new_seg = result.segment.as_ref().expect("segment");
        let reader = SegmentReader::open(new_seg).expect("open");
        assert_eq!(reader.row_count(), 77);
    }

    #[test]
    fn compact_preserves_string_data() {
        let segment = write_segment(20);
        let mut deletes = DeleteBitmap::new();
        deletes.mark_deleted(0); // Delete first row.

        let result = compact_segment(&segment, &deletes, &test_schema(), 0).expect("compact");
        let new_seg = result.segment.as_ref().expect("segment");
        let reader = SegmentReader::open(new_seg).expect("open");

        // Read the name column (string).
        let col = reader.read_column(1).expect("read name");
        match col {
            DecodedColumn::Binary {
                data,
                offsets,
                valid,
            } => {
                // First row should be "user_1" (user_0 was deleted).
                let start = offsets[0] as usize;
                let end = offsets[1] as usize;
                let first_name = std::str::from_utf8(&data[start..end]).expect("utf8");
                assert_eq!(first_name, "user_1");
                assert!(valid[0]);
            }
            _ => panic!("expected Binary"),
        }
    }
}