nodedb_columnar/compaction/
segment.rs1use std::sync::Arc;
6
7use nodedb_mem::{EngineId, MemoryGovernor};
8use nodedb_types::columnar::ColumnarSchema;
9
10use crate::delete_bitmap::DeleteBitmap;
11use crate::error::ColumnarError;
12use crate::memtable::ColumnarMemtable;
13use crate::reader::SegmentReader;
14use crate::writer::SegmentWriter;
15
16use super::extract::extract_row_value;
17
/// Default delete-ratio threshold: `0.2`, i.e. 20 % of a segment's rows.
///
/// NOTE(review): `compact_segment` in this file does not read this constant;
/// presumably callers compare `deleted / total` against it to decide when a
/// segment is worth compacting — confirm at the call site.
pub const DEFAULT_DELETE_RATIO_THRESHOLD: f64 = 0.2;
20
/// Outcome of compacting a single segment.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompactionResult {
    /// Bytes of the rewritten segment, or `None` when no live rows remained
    /// and no replacement segment was produced.
    pub segment: Option<Vec<u8>>,
    /// Number of rows carried over into the rewritten segment (`0` when
    /// `segment` is `None`).
    pub live_rows: usize,
    /// Number of rows from the input segment that were dropped.
    pub removed_rows: usize,
}
30
31pub fn compact_segment(
45 segment_data: &[u8],
46 deletes: &DeleteBitmap,
47 schema: &ColumnarSchema,
48 profile_tag: u8,
49 governor: Option<&Arc<MemoryGovernor>>,
50 kek: Option<&nodedb_wal::crypto::WalEncryptionKey>,
51) -> Result<CompactionResult, ColumnarError> {
52 let reader = SegmentReader::open(segment_data)?;
53 let total_rows = reader.row_count() as usize;
54 let deleted = deletes.deleted_count() as usize;
55 let live = total_rows.saturating_sub(deleted);
56
57 if live == 0 {
58 return Ok(CompactionResult {
59 segment: None,
60 live_rows: 0,
61 removed_rows: total_rows,
62 });
63 }
64
65 let col_count = reader.column_count();
67 let _cols_guard = governor
69 .map(|g| {
70 g.reserve(
71 EngineId::Columnar,
72 col_count * std::mem::size_of::<usize>() * 3,
73 )
74 })
75 .transpose()?;
76 let mut decoded_cols = Vec::with_capacity(col_count);
78 for i in 0..col_count {
79 decoded_cols.push(reader.read_column(i)?);
80 }
81
82 let mut memtable = ColumnarMemtable::new(schema);
84 let col_len = schema.columns.len();
85 let _row_guard = governor
86 .map(|g| {
87 g.reserve(
88 EngineId::Columnar,
89 col_len * std::mem::size_of::<usize>() * 3,
90 )
91 })
92 .transpose()?;
93 let mut row_values = Vec::with_capacity(col_len);
95
96 for row_idx in 0..total_rows {
97 if deletes.is_deleted(row_idx as u32) {
98 continue;
99 }
100
101 row_values.clear();
102 for (col_idx, decoded) in decoded_cols.iter().enumerate() {
103 let value = extract_row_value(decoded, row_idx, &schema.columns[col_idx].column_type);
104 row_values.push(value);
105 }
106
107 memtable.append_row(&row_values)?;
108 }
109
110 let (schema, columns, row_count) = memtable.drain();
111 let writer = match governor {
112 Some(g) => SegmentWriter::with_governor(profile_tag, Arc::clone(g)),
113 None => SegmentWriter::new(profile_tag),
114 };
115 let new_segment = writer.write_segment(&schema, &columns, row_count, kek)?;
116
117 Ok(CompactionResult {
118 segment: Some(new_segment),
119 live_rows: row_count,
120 removed_rows: deleted,
121 })
122}