nodedb_columnar/mutation/
flush.rs1use std::collections::HashMap;
6
7use crate::error::ColumnarError;
8use crate::pk_index::RowLocation;
9use crate::wal_record::ColumnarWalRecord;
10
11use super::engine::{MutationEngine, MutationResult};
12
13impl MutationEngine {
14 pub fn on_memtable_flushed(
20 &mut self,
21 new_segment_id: u64,
22 ) -> Result<MutationResult, ColumnarError> {
23 let row_count = self.memtable_row_counter;
24
25 self.pk_index
27 .remap_segment(self.memtable_segment_id, |old_row| {
28 Some(RowLocation {
29 segment_id: new_segment_id,
30 row_index: old_row,
31 })
32 });
33
34 let next = self
36 .next_segment_id
37 .checked_add(1)
38 .ok_or(ColumnarError::SegmentIdExhausted)?;
39
40 self.memtable_segment_id = self.next_segment_id;
42 self.next_segment_id = next;
43 self.memtable_row_counter = 0;
44 self.memtable_surrogates.clear();
45
46 let wal = ColumnarWalRecord::MemtableFlushed {
47 collection: self.collection.clone(),
48 segment_id: new_segment_id,
49 row_count: row_count as u64,
50 };
51
52 Ok(MutationResult {
53 wal_records: vec![wal],
54 })
55 }
56
57 pub fn on_compaction_complete(
61 &mut self,
62 old_segment_ids: &[u64],
63 new_segment_id: u64,
64 row_mapping: &HashMap<(u64, u32), u32>,
65 ) -> MutationResult {
66 for &old_seg in old_segment_ids {
68 self.pk_index.remap_segment(old_seg, |old_row| {
69 row_mapping
70 .get(&(old_seg, old_row))
71 .map(|&new_row| RowLocation {
72 segment_id: new_segment_id,
73 row_index: new_row,
74 })
75 });
76
77 self.delete_bitmaps.remove(&old_seg);
79 }
80
81 let wal = ColumnarWalRecord::CompactionCommit {
82 collection: self.collection.clone(),
83 old_segment_ids: old_segment_ids.to_vec(),
84 new_segment_ids: vec![new_segment_id],
85 };
86
87 MutationResult {
88 wal_records: vec![wal],
89 }
90 }
91}