Skip to main content

nodedb_columnar/compaction/
segment.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Single-segment compaction: drop deleted rows from one segment, write a new one.
4
5use std::sync::Arc;
6
7use nodedb_mem::{EngineId, MemoryGovernor};
8use nodedb_types::columnar::ColumnarSchema;
9
10use crate::delete_bitmap::DeleteBitmap;
11use crate::error::ColumnarError;
12use crate::memtable::ColumnarMemtable;
13use crate::reader::SegmentReader;
14use crate::writer::SegmentWriter;
15
16use super::extract::extract_row_value;
17
/// Default delete ratio that makes a segment a compaction candidate: `0.2`,
/// i.e. compact once more than 20% of its rows are deleted.
pub const DEFAULT_DELETE_RATIO_THRESHOLD: f64 = 0.20;
20
/// Result of a compaction operation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompactionResult {
    /// The new compacted segment bytes, or `None` if all rows were deleted.
    pub segment: Option<Vec<u8>>,
    /// Number of live rows in the new segment.
    pub live_rows: usize,
    /// Number of rows removed (deleted).
    pub removed_rows: usize,
}
30
31/// Compact a single segment by removing deleted rows.
32///
33/// Reads the segment, skips rows marked in the delete bitmap, and writes
34/// a new segment with only live rows. Returns `None` segment if all rows
35/// were deleted.
36///
37/// When `kek` is `Some`, the output segment is wrapped in an AES-256-GCM
38/// SEGC envelope. The input segment must be plaintext (the caller is
39/// responsible for decrypting before passing to this function).
40///
41/// `governor` is optional: when `Some`, working-buffer allocations are
42/// tracked against the `Columnar` engine budget. Pass `None` in embedded
43/// (Lite) deployments where no governor is configured.
44pub fn compact_segment(
45    segment_data: &[u8],
46    deletes: &DeleteBitmap,
47    schema: &ColumnarSchema,
48    profile_tag: u8,
49    governor: Option<&Arc<MemoryGovernor>>,
50    kek: Option<&nodedb_wal::crypto::WalEncryptionKey>,
51) -> Result<CompactionResult, ColumnarError> {
52    let reader = SegmentReader::open(segment_data)?;
53    let total_rows = reader.row_count() as usize;
54    let deleted = deletes.deleted_count() as usize;
55    let live = total_rows.saturating_sub(deleted);
56
57    if live == 0 {
58        return Ok(CompactionResult {
59            segment: None,
60            live_rows: 0,
61            removed_rows: total_rows,
62        });
63    }
64
65    // Read all columns without delete masking — we'll filter manually.
66    let col_count = reader.column_count();
67    // Reserve budget for the decoded-column pointer vec (each entry is a fat pointer).
68    let _cols_guard = governor
69        .map(|g| {
70            g.reserve(
71                EngineId::Columnar,
72                col_count * std::mem::size_of::<usize>() * 3,
73            )
74        })
75        .transpose()?;
76    // no-governor: governed by _cols_guard above; multi-line reserve call splits outside 5-line gate window
77    let mut decoded_cols = Vec::with_capacity(col_count);
78    for i in 0..col_count {
79        decoded_cols.push(reader.read_column(i)?);
80    }
81
82    // Build a new memtable with only live rows.
83    let mut memtable = ColumnarMemtable::new(schema);
84    let col_len = schema.columns.len();
85    let _row_guard = governor
86        .map(|g| {
87            g.reserve(
88                EngineId::Columnar,
89                col_len * std::mem::size_of::<usize>() * 3,
90            )
91        })
92        .transpose()?;
93    // no-governor: governed by _row_guard above; multi-line reserve call splits outside 5-line gate window
94    let mut row_values = Vec::with_capacity(col_len);
95
96    for row_idx in 0..total_rows {
97        if deletes.is_deleted(row_idx as u32) {
98            continue;
99        }
100
101        row_values.clear();
102        for (col_idx, decoded) in decoded_cols.iter().enumerate() {
103            let value = extract_row_value(decoded, row_idx, &schema.columns[col_idx].column_type);
104            row_values.push(value);
105        }
106
107        memtable.append_row(&row_values)?;
108    }
109
110    let (schema, columns, row_count) = memtable.drain();
111    let writer = match governor {
112        Some(g) => SegmentWriter::with_governor(profile_tag, Arc::clone(g)),
113        None => SegmentWriter::new(profile_tag),
114    };
115    let new_segment = writer.write_segment(&schema, &columns, row_count, kek)?;
116
117    Ok(CompactionResult {
118        segment: Some(new_segment),
119        live_rows: row_count,
120        removed_rows: deleted,
121    })
122}