// nodedb_columnar/pk_index.rs
1// SPDX-License-Identifier: Apache-2.0
2
3//! Primary key index for columnar collections.
4//!
5//! Maps PK values to (segment_id, row_index) locations. Used for:
6//! - Point lookups by PK
7//! - UPDATE/DELETE row resolution
8//! - Duplicate detection on INSERT
9//!
10//! Uses a `BTreeMap` for sorted key order (enables range scans on PK).
11//! PK values are encoded as sortable byte vectors for uniform comparison.
12
13use std::collections::BTreeMap;
14
15use serde::{Deserialize, Serialize};
16use zerompk::{FromMessagePack, ToMessagePack};
17
18use crate::error::ColumnarError;
19
/// Location of a row within the columnar segment store.
///
/// The `(segment_id, row_index)` pair is the value stored in the PK index
/// for each primary key. It is small and `Copy`, and is serialized into
/// checkpoints (via MessagePack) alongside the encoded key.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToMessagePack, FromMessagePack,
)]
pub struct RowLocation {
    /// Segment identifier (index in the segment list, or a unique segment ID).
    pub segment_id: u64,
    /// Row index within the segment (0-based).
    pub row_index: u32,
}
30
/// In-memory B-tree index mapping PK values to row locations.
///
/// PK values are stored as sortable byte vectors (`Vec<u8>`) produced by
/// `encode_pk`. This gives uniform ordering regardless of PK type
/// (Int64, String, Uuid, composite).
#[derive(Debug, Clone)]
pub struct PkIndex {
    // Sorted by encoded PK bytes, so PK range scans map directly onto
    // BTreeMap range queries.
    inner: BTreeMap<Vec<u8>, RowLocation>,
}
40
41impl PkIndex {
42    /// Create an empty PK index.
43    pub fn new() -> Self {
44        Self {
45            inner: BTreeMap::new(),
46        }
47    }
48
49    /// Number of entries in the index.
50    pub fn len(&self) -> usize {
51        self.inner.len()
52    }
53
54    /// Whether the index is empty.
55    pub fn is_empty(&self) -> bool {
56        self.inner.is_empty()
57    }
58
59    /// Insert a PK → location mapping. Returns error if PK already exists.
60    pub fn insert(
61        &mut self,
62        pk_bytes: Vec<u8>,
63        location: RowLocation,
64    ) -> Result<(), ColumnarError> {
65        if self.inner.contains_key(&pk_bytes) {
66            return Err(ColumnarError::DuplicatePrimaryKey);
67        }
68        self.inner.insert(pk_bytes, location);
69        Ok(())
70    }
71
72    /// Insert or overwrite a PK → location mapping (used during compaction remap).
73    pub fn upsert(&mut self, pk_bytes: Vec<u8>, location: RowLocation) {
74        self.inner.insert(pk_bytes, location);
75    }
76
77    /// Look up a row location by PK.
78    pub fn get(&self, pk_bytes: &[u8]) -> Option<&RowLocation> {
79        self.inner.get(pk_bytes)
80    }
81
82    /// Remove a PK entry (used when a row is deleted and compacted away).
83    pub fn remove(&mut self, pk_bytes: &[u8]) -> Option<RowLocation> {
84        self.inner.remove(pk_bytes)
85    }
86
87    /// Check whether a PK exists in the index.
88    pub fn contains(&self, pk_bytes: &[u8]) -> bool {
89        self.inner.contains_key(pk_bytes)
90    }
91
92    /// Remap all entries pointing to `old_segment_id` using a provided function.
93    ///
94    /// Used after compaction: old segment IDs are replaced with new ones,
95    /// and row indices may shift. The `remap_fn` returns `None` to remove
96    /// the entry (row was deleted during compaction) or `Some(new_location)`.
97    pub fn remap_segment(
98        &mut self,
99        old_segment_id: u64,
100        remap_fn: impl Fn(u32) -> Option<RowLocation>,
101    ) {
102        let keys_to_remap: Vec<Vec<u8>> = self
103            .inner
104            .iter()
105            .filter(|(_, loc)| loc.segment_id == old_segment_id)
106            .map(|(k, _)| k.clone())
107            .collect();
108
109        for key in keys_to_remap {
110            let old_loc = self.inner.remove(&key).expect("key exists from filter");
111            if let Some(new_loc) = remap_fn(old_loc.row_index) {
112                self.inner.insert(key, new_loc);
113            }
114            // If remap_fn returns None, the entry is dropped (row was deleted).
115        }
116    }
117
118    /// Bulk insert entries for a newly flushed segment.
119    ///
120    /// `pk_bytes_list` contains the encoded PK bytes for each row in the segment
121    /// (in order). `segment_id` is the new segment's ID.
122    pub fn bulk_insert(
123        &mut self,
124        segment_id: u64,
125        pk_bytes_list: &[Vec<u8>],
126    ) -> Result<(), ColumnarError> {
127        for (row_index, pk_bytes) in pk_bytes_list.iter().enumerate() {
128            let location = RowLocation {
129                segment_id,
130                row_index: row_index as u32,
131            };
132            self.insert(pk_bytes.clone(), location)?;
133        }
134        Ok(())
135    }
136
137    /// Remove all entries pointing to a given segment (used when dropping a segment).
138    pub fn remove_segment(&mut self, segment_id: u64) {
139        self.inner.retain(|_, loc| loc.segment_id != segment_id);
140    }
141
142    /// Serialize the index to bytes for checkpoint persistence.
143    pub fn to_bytes(&self) -> Result<Vec<u8>, ColumnarError> {
144        // Serialize as Vec<(key, location)> via MessagePack.
145        let entries: Vec<(&Vec<u8>, &RowLocation)> = self.inner.iter().collect();
146        zerompk::to_msgpack_vec(&entries).map_err(|e| ColumnarError::Serialization(e.to_string()))
147    }
148
149    /// Deserialize the index from a checkpoint.
150    pub fn from_bytes(data: &[u8]) -> Result<Self, ColumnarError> {
151        let entries: Vec<(Vec<u8>, RowLocation)> =
152            zerompk::from_msgpack(data).map_err(|e| ColumnarError::Serialization(e.to_string()))?;
153        let mut inner = BTreeMap::new();
154        for (key, loc) in entries {
155            inner.insert(key, loc);
156        }
157        Ok(Self { inner })
158    }
159}
160
161impl Default for PkIndex {
162    fn default() -> Self {
163        Self::new()
164    }
165}
166
167/// Encode a PK value into sortable bytes.
168///
169/// Uses big-endian XOR encoding for signed integers (preserves sort order),
170/// raw UTF-8 for strings, raw bytes for UUIDs.
171pub fn encode_pk(value: &nodedb_types::value::Value) -> Vec<u8> {
172    use nodedb_types::value::Value;
173    match value {
174        Value::Integer(v) => {
175            let sortable = (*v as u64) ^ (1u64 << 63);
176            sortable.to_be_bytes().to_vec()
177        }
178        Value::String(s) => s.as_bytes().to_vec(),
179        Value::Uuid(s) => s.as_bytes().to_vec(),
180        Value::Decimal(d) => d.serialize().to_vec(),
181        Value::DateTime(dt) => {
182            let sortable = (dt.micros as u64) ^ (1u64 << 63);
183            sortable.to_be_bytes().to_vec()
184        }
185        _ => format!("{value:?}").into_bytes(),
186    }
187}
188
189/// Encode a composite PK from multiple values.
190pub fn encode_composite_pk(values: &[&nodedb_types::value::Value]) -> Vec<u8> {
191    let mut key = Vec::new();
192    for (i, val) in values.iter().enumerate() {
193        if i > 0 {
194            key.push(0xFF); // Separator between composite PK parts.
195        }
196        key.extend_from_slice(&encode_pk(val));
197    }
198    key
199}
200
#[cfg(test)]
mod tests {
    use nodedb_types::value::Value;

    use super::*;

    /// Shorthand for building a `RowLocation`.
    fn at(segment_id: u64, row_index: u32) -> RowLocation {
        RowLocation {
            segment_id,
            row_index,
        }
    }

    /// Encode a contiguous range of integer PKs.
    fn int_pks(range: std::ops::Range<i64>) -> Vec<Vec<u8>> {
        range.map(|i| encode_pk(&Value::Integer(i))).collect()
    }

    #[test]
    fn insert_and_lookup() {
        let mut index = PkIndex::new();
        let key = encode_pk(&Value::Integer(42));

        index.insert(key.clone(), at(0, 5)).expect("insert");

        assert_eq!(index.get(&key), Some(&at(0, 5)));
        assert_eq!(index.len(), 1);
    }

    #[test]
    fn duplicate_pk_rejected() {
        let mut index = PkIndex::new();
        let key = encode_pk(&Value::Integer(1));

        index.insert(key.clone(), at(0, 0)).expect("first insert");
        let second = index.insert(key, at(0, 0));
        assert!(matches!(second, Err(ColumnarError::DuplicatePrimaryKey)));
    }

    #[test]
    fn remove_entry() {
        let mut index = PkIndex::new();
        let key = encode_pk(&Value::Integer(1));
        index.insert(key.clone(), at(0, 0)).expect("insert");

        assert_eq!(index.remove(&key), Some(at(0, 0)));
        assert!(index.is_empty());
    }

    #[test]
    fn bulk_insert() {
        let mut index = PkIndex::new();
        let keys = int_pks(0..10);

        index.bulk_insert(0, &keys).expect("bulk insert");
        assert_eq!(index.len(), 10);

        // Row indices follow batch order.
        let found = index.get(&keys[5]).expect("lookup");
        assert_eq!(found.segment_id, 0);
        assert_eq!(found.row_index, 5);
    }

    #[test]
    fn remap_segment() {
        let mut index = PkIndex::new();
        let keys = int_pks(0..5);
        index.bulk_insert(0, &keys).expect("bulk insert");

        // Move segment 0 → segment 1 with row indices shifted by +10;
        // row 2 was deleted during compaction and must be dropped.
        index.remap_segment(0, |row| (row != 2).then(|| at(1, row + 10)));

        assert_eq!(index.len(), 4); // Row 2 was removed.
        let moved = index.get(&keys[0]).expect("row 0");
        assert_eq!(moved.segment_id, 1);
        assert_eq!(moved.row_index, 10);

        assert!(index.get(&keys[2]).is_none()); // Row 2 was deleted.
    }

    #[test]
    fn remove_segment() {
        let mut index = PkIndex::new();
        index.bulk_insert(0, &int_pks(0..5)).expect("seg 0");
        index.bulk_insert(1, &int_pks(10..15)).expect("seg 1");
        assert_eq!(index.len(), 10);

        index.remove_segment(0);
        assert_eq!(index.len(), 5); // Only segment 1 remains.
    }

    #[test]
    fn serialization_roundtrip() {
        let mut index = PkIndex::new();
        let keys = int_pks(0..100);
        index.bulk_insert(0, &keys).expect("bulk insert");

        let bytes = index.to_bytes().expect("serialize");
        let restored = PkIndex::from_bytes(&bytes).expect("deserialize");

        assert_eq!(restored.len(), 100);
        let found = restored.get(&keys[50]).expect("lookup");
        assert_eq!(found.segment_id, 0);
        assert_eq!(found.row_index, 50);
    }

    #[test]
    fn int_sort_order() {
        // Encoded integers must sort like the integers themselves,
        // negatives included.
        let values = [-100i64, -1, 0, 1, 100];
        let encoded: Vec<Vec<u8>> = values
            .iter()
            .map(|v| encode_pk(&Value::Integer(*v)))
            .collect();

        for (enc, vals) in encoded.windows(2).zip(values.windows(2)) {
            assert!(
                enc[0] < enc[1],
                "sort order broken: {} < {}",
                vals[0],
                vals[1]
            );
        }
    }

    #[test]
    fn pk_index_roundtrip_segment_id_above_u32_max() {
        let mut index = PkIndex::new();
        let key = encode_pk(&Value::Integer(999));
        // A segment ID too large to fit in u32.
        let big: u64 = u64::from(u32::MAX) + 1;

        index.insert(key.clone(), at(big, 7)).expect("insert");

        // The large ID survives a serialization round-trip.
        let bytes = index.to_bytes().expect("serialize");
        let restored = PkIndex::from_bytes(&bytes).expect("deserialize");
        let got = restored.get(&key).expect("lookup after roundtrip");
        assert_eq!(got.segment_id, big);
        assert_eq!(got.row_index, 7);

        // remap_segment must match on the large ID.
        index.remap_segment(big, |row| Some(at(big + 1, row)));
        let remapped = index.get(&key).expect("lookup after remap");
        assert_eq!(remapped.segment_id, big + 1);

        // remove_segment must remove on the large ID.
        index.remove_segment(big + 1);
        assert!(index.is_empty());
    }

    #[test]
    fn composite_pk() {
        let make = |n: i64, s: &str| {
            encode_composite_pk(&[&Value::Integer(n), &Value::String(s.into())])
        };
        let first = make(1, "a");
        let second = make(1, "b");
        let third = make(2, "a");

        assert!(first < second); // Same first part, "a" < "b".
        assert!(second < third); // 1 < 2 in first part.
    }
}