1use nodedb_types::columnar::ColumnarSchema;
12
13use crate::delete_bitmap::DeleteBitmap;
14use crate::error::ColumnarError;
15use crate::memtable::ColumnarMemtable;
16use crate::reader::{DecodedColumn, SegmentReader};
17use crate::writer::SegmentWriter;
18
/// Default delete-ratio threshold (`0.2`).
///
/// NOTE(review): nothing in this file reads the constant; presumably callers
/// compare a segment's deleted-rows / total-rows ratio against it to decide
/// when compaction is worthwhile — confirm against the call sites.
pub const DEFAULT_DELETE_RATIO_THRESHOLD: f64 = 0.2;
21
/// Outcome of a compaction pass, as returned by the compaction functions in
/// this module.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompactionResult {
    /// Bytes of the newly written segment, or `None` when no live row
    /// remained and therefore no segment was written.
    pub segment: Option<Vec<u8>>,
    /// Number of rows carried over into the new segment.
    pub live_rows: usize,
    /// Number of rows dropped from the input.
    pub removed_rows: usize,
}
31
32pub fn compact_segment(
38 segment_data: &[u8],
39 deletes: &DeleteBitmap,
40 schema: &ColumnarSchema,
41 profile_tag: u8,
42) -> Result<CompactionResult, ColumnarError> {
43 let reader = SegmentReader::open(segment_data)?;
44 let total_rows = reader.row_count() as usize;
45 let deleted = deletes.deleted_count() as usize;
46 let live = total_rows.saturating_sub(deleted);
47
48 if live == 0 {
49 return Ok(CompactionResult {
50 segment: None,
51 live_rows: 0,
52 removed_rows: total_rows,
53 });
54 }
55
56 let col_count = reader.column_count();
58 let mut decoded_cols = Vec::with_capacity(col_count);
59 for i in 0..col_count {
60 decoded_cols.push(reader.read_column(i)?);
61 }
62
63 let mut memtable = ColumnarMemtable::new(schema);
65 let mut row_values = Vec::with_capacity(schema.columns.len());
66
67 for row_idx in 0..total_rows {
68 if deletes.is_deleted(row_idx as u32) {
69 continue;
70 }
71
72 row_values.clear();
73 for (col_idx, decoded) in decoded_cols.iter().enumerate() {
74 let value = extract_row_value(decoded, row_idx, &schema.columns[col_idx].column_type);
75 row_values.push(value);
76 }
77
78 memtable.append_row(&row_values)?;
79 }
80
81 let (schema, columns, row_count) = memtable.drain();
82 let writer = SegmentWriter::new(profile_tag);
83 let new_segment = writer.write_segment(&schema, &columns, row_count)?;
84
85 Ok(CompactionResult {
86 segment: Some(new_segment),
87 live_rows: row_count,
88 removed_rows: deleted,
89 })
90}
91
92pub fn compact_segments(
98 segments: &[(&[u8], &DeleteBitmap)],
99 schema: &ColumnarSchema,
100 profile_tag: u8,
101) -> Result<CompactionResult, ColumnarError> {
102 let mut memtable = ColumnarMemtable::new(schema);
103 let mut total_removed = 0usize;
104 let mut row_values = Vec::with_capacity(schema.columns.len());
105
106 for &(segment_data, deletes) in segments {
107 let reader = SegmentReader::open(segment_data)?;
108 let total_rows = reader.row_count() as usize;
109
110 let mut decoded_cols = Vec::with_capacity(reader.column_count());
111 for i in 0..reader.column_count() {
112 decoded_cols.push(reader.read_column(i)?);
113 }
114
115 for row_idx in 0..total_rows {
116 if deletes.is_deleted(row_idx as u32) {
117 total_removed += 1;
118 continue;
119 }
120
121 row_values.clear();
122 for (col_idx, decoded) in decoded_cols.iter().enumerate() {
123 let value =
124 extract_row_value(decoded, row_idx, &schema.columns[col_idx].column_type);
125 row_values.push(value);
126 }
127
128 memtable.append_row(&row_values)?;
129 }
130 }
131
132 let live_rows = memtable.row_count();
133 if live_rows == 0 {
134 return Ok(CompactionResult {
135 segment: None,
136 live_rows: 0,
137 removed_rows: total_removed,
138 });
139 }
140
141 let (schema, columns, row_count) = memtable.drain();
142 let writer = SegmentWriter::new(profile_tag);
143 let new_segment = writer.write_segment(&schema, &columns, row_count)?;
144
145 Ok(CompactionResult {
146 segment: Some(new_segment),
147 live_rows: row_count,
148 removed_rows: total_removed,
149 })
150}
151
152fn extract_row_value(
154 col: &DecodedColumn,
155 row_idx: usize,
156 col_type: &nodedb_types::columnar::ColumnType,
157) -> nodedb_types::value::Value {
158 use nodedb_types::value::Value;
159
160 match col {
161 DecodedColumn::Int64 { values, valid } => {
162 if !valid[row_idx] {
163 Value::Null
164 } else {
165 Value::Integer(values[row_idx])
166 }
167 }
168 DecodedColumn::Float64 { values, valid } => {
169 if !valid[row_idx] {
170 Value::Null
171 } else {
172 Value::Float(values[row_idx])
173 }
174 }
175 DecodedColumn::Timestamp { values, valid } => {
176 if !valid[row_idx] {
177 Value::Null
178 } else {
179 Value::Integer(values[row_idx]) }
181 }
182 DecodedColumn::Bool { values, valid } => {
183 if !valid[row_idx] {
184 Value::Null
185 } else {
186 Value::Bool(values[row_idx])
187 }
188 }
189 DecodedColumn::Binary {
190 data,
191 offsets,
192 valid,
193 } => {
194 if !valid[row_idx] {
195 return Value::Null;
196 }
197 let start = offsets[row_idx] as usize;
198 let end = offsets[row_idx + 1] as usize;
199 let bytes = &data[start..end];
200
201 match col_type {
202 nodedb_types::columnar::ColumnType::String => {
203 Value::String(String::from_utf8_lossy(bytes).into_owned())
204 }
205 nodedb_types::columnar::ColumnType::Bytes
206 | nodedb_types::columnar::ColumnType::Geometry => Value::Bytes(bytes.to_vec()),
207 _ => Value::Bytes(bytes.to_vec()),
208 }
209 }
210 DecodedColumn::DictEncoded {
211 ids,
212 dictionary,
213 valid,
214 } => {
215 if !valid[row_idx] {
216 return Value::Null;
217 }
218 let id = ids[row_idx] as usize;
219 if let Some(s) = dictionary.get(id) {
220 Value::String(s.clone())
221 } else {
222 Value::Null
223 }
224 }
225 }
226}
227
#[cfg(test)]
mod tests {
    use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};
    use nodedb_types::value::Value;

    use super::*;
    use crate::memtable::ColumnarMemtable;
    use crate::writer::SegmentWriter;

    /// Fixture schema: required Int64 primary key `id`, required String
    /// `name`, nullable Float64 `score`.
    fn test_schema() -> ColumnarSchema {
        let columns = vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::required("name", ColumnType::String),
            ColumnDef::nullable("score", ColumnType::Float64),
        ];
        ColumnarSchema::new(columns).expect("valid")
    }

    /// Builds a plain-encoded segment of `rows` rows where row `i` is
    /// `(i, "user_{i}", i * 0.5)`, with a null score whenever `i % 3 == 0`.
    fn write_segment(rows: usize) -> Vec<u8> {
        let schema = test_schema();
        let mut memtable = ColumnarMemtable::new(&schema);

        for i in 0..rows {
            let score = if i % 3 == 0 {
                Value::Null
            } else {
                Value::Float(i as f64 * 0.5)
            };
            let row = [
                Value::Integer(i as i64),
                Value::String(format!("user_{i}")),
                score,
            ];
            memtable.append_row(&row).expect("append");
        }

        let (schema, columns, row_count) = memtable.drain();
        let writer = SegmentWriter::plain();
        writer
            .write_segment(&schema, &columns, row_count)
            .expect("write")
    }

    #[test]
    fn compact_removes_deleted_rows() {
        let source = write_segment(100);

        // Delete every tenth row: ids 0, 10, ..., 90.
        let mut bitmap = DeleteBitmap::new();
        for id in (0..100).step_by(10) {
            bitmap.mark_deleted(id);
        }

        let schema = test_schema();
        let result = compact_segment(&source, &bitmap, &schema, 0).expect("compact");

        assert_eq!(result.live_rows, 90);
        assert_eq!(result.removed_rows, 10);
        assert!(result.segment.is_some());

        let rebuilt = result.segment.as_deref().expect("segment");
        let reader = SegmentReader::open(rebuilt).expect("open");
        assert_eq!(reader.row_count(), 90);

        match reader.read_column(0).expect("read id") {
            DecodedColumn::Int64 { values, valid } => {
                // Row 0 was deleted, so the first surviving id is 1 ...
                assert_eq!(values[0], 1);
                assert!(valid[0]);
                // ... and ids 1..=9 occupy positions 0..=8.
                assert_eq!(values[8], 9);
            }
            _ => panic!("expected Int64"),
        }
    }

    #[test]
    fn compact_all_deleted() {
        let source = write_segment(10);

        let mut bitmap = DeleteBitmap::new();
        for id in 0..10 {
            bitmap.mark_deleted(id);
        }

        let schema = test_schema();
        let result = compact_segment(&source, &bitmap, &schema, 0).expect("compact");

        assert_eq!(result.live_rows, 0);
        assert_eq!(result.removed_rows, 10);
        assert!(result.segment.is_none());
    }

    #[test]
    fn compact_no_deletes() {
        let source = write_segment(50);
        let bitmap = DeleteBitmap::new();

        let schema = test_schema();
        let result = compact_segment(&source, &bitmap, &schema, 0).expect("compact");

        assert_eq!(result.live_rows, 50);
        assert_eq!(result.removed_rows, 0);
        assert!(result.segment.is_some());
    }

    #[test]
    fn merge_multiple_segments() {
        let first = write_segment(50);
        let second = write_segment(30);

        // Three deletions in the first segment, none in the second.
        let mut first_deletes = DeleteBitmap::new();
        first_deletes.mark_deleted_batch(&[0, 1, 2]);
        let second_deletes = DeleteBitmap::new();

        let inputs = [
            (first.as_slice(), &first_deletes),
            (second.as_slice(), &second_deletes),
        ];
        let result = compact_segments(&inputs, &test_schema(), 0).expect("merge");

        // 50 - 3 surviving rows from the first segment plus 30 from the second.
        assert_eq!(result.live_rows, 77);
        assert_eq!(result.removed_rows, 3);
        assert!(result.segment.is_some());

        let merged = result.segment.as_deref().expect("segment");
        let reader = SegmentReader::open(merged).expect("open");
        assert_eq!(reader.row_count(), 77);
    }

    #[test]
    fn compact_preserves_string_data() {
        let source = write_segment(20);

        let mut bitmap = DeleteBitmap::new();
        bitmap.mark_deleted(0);

        let schema = test_schema();
        let result = compact_segment(&source, &bitmap, &schema, 0).expect("compact");
        let rebuilt = result.segment.as_deref().expect("segment");
        let reader = SegmentReader::open(rebuilt).expect("open");

        match reader.read_column(1).expect("read name") {
            DecodedColumn::Binary {
                data,
                offsets,
                valid,
            } => {
                // With row 0 removed, the first stored name belongs to row 1.
                let begin = offsets[0] as usize;
                let finish = offsets[1] as usize;
                let first_name = std::str::from_utf8(&data[begin..finish]).expect("utf8");
                assert_eq!(first_name, "user_1");
                assert!(valid[0]);
            }
            _ => panic!("expected Binary"),
        }
    }
}