Skip to main content

nodedb_columnar/mutation/
flush.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Post-write coordination: memtable flush + compaction commit.
4
5use std::collections::HashMap;
6
7use crate::error::ColumnarError;
8use crate::pk_index::RowLocation;
9use crate::wal_record::ColumnarWalRecord;
10
11use super::engine::{MutationEngine, MutationResult};
12
13impl MutationEngine {
14    /// Notify the engine that the memtable was flushed to a new segment.
15    ///
16    /// Updates the PK index to remap memtable entries to the new segment.
17    /// Returns the WAL record for the flush event, or `SegmentIdExhausted`
18    /// if the u64 segment ID counter has wrapped past its maximum.
19    pub fn on_memtable_flushed(
20        &mut self,
21        new_segment_id: u64,
22    ) -> Result<MutationResult, ColumnarError> {
23        let row_count = self.memtable_row_counter;
24
25        // Remap PK index entries from virtual memtable segment to real segment.
26        self.pk_index
27            .remap_segment(self.memtable_segment_id, |old_row| {
28                Some(RowLocation {
29                    segment_id: new_segment_id,
30                    row_index: old_row,
31                })
32            });
33
34        // Advance the segment ID counter with overflow protection.
35        let next = self
36            .next_segment_id
37            .checked_add(1)
38            .ok_or(ColumnarError::SegmentIdExhausted)?;
39
40        // Reset memtable tracking.
41        self.memtable_segment_id = self.next_segment_id;
42        self.next_segment_id = next;
43        self.memtable_row_counter = 0;
44        self.memtable_surrogates.clear();
45
46        let wal = ColumnarWalRecord::MemtableFlushed {
47            collection: self.collection.clone(),
48            segment_id: new_segment_id,
49            row_count: row_count as u64,
50        };
51
52        Ok(MutationResult {
53            wal_records: vec![wal],
54        })
55    }
56
57    /// Notify the engine that compaction completed.
58    ///
59    /// Remaps PK index entries and removes old delete bitmaps.
60    pub fn on_compaction_complete(
61        &mut self,
62        old_segment_ids: &[u64],
63        new_segment_id: u64,
64        row_mapping: &HashMap<(u64, u32), u32>,
65    ) -> MutationResult {
66        // Remap PK index for each old segment.
67        for &old_seg in old_segment_ids {
68            self.pk_index.remap_segment(old_seg, |old_row| {
69                row_mapping
70                    .get(&(old_seg, old_row))
71                    .map(|&new_row| RowLocation {
72                        segment_id: new_segment_id,
73                        row_index: new_row,
74                    })
75            });
76
77            // Remove old delete bitmap.
78            self.delete_bitmaps.remove(&old_seg);
79        }
80
81        let wal = ColumnarWalRecord::CompactionCommit {
82            collection: self.collection.clone(),
83            old_segment_ids: old_segment_ids.to_vec(),
84            new_segment_ids: vec![new_segment_id],
85        };
86
87        MutationResult {
88            wal_records: vec![wal],
89        }
90    }
91}