//! Primary key index for columnar collections.
//!
//! Maps PK values to (segment_id, row_index) locations. Used for:
//! - Point lookups by PK
//! - UPDATE/DELETE row resolution
//! - Duplicate detection on INSERT
//!
//! Uses a `BTreeMap` for sorted key order (enables range scans on PK).
//! PK values are encoded as sortable byte vectors for uniform comparison.

use std::collections::btree_map::Entry;
use std::collections::BTreeMap;

use serde::{Deserialize, Serialize};
use zerompk::{FromMessagePack, ToMessagePack};

use crate::error::ColumnarError;
17
/// Location of a row within the columnar segment store.
///
/// `Copy` and 8 bytes total, so it is passed and stored by value.
/// Serialized both via serde and MessagePack for checkpoint persistence.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToMessagePack, FromMessagePack,
)]
pub struct RowLocation {
    /// Segment identifier (index in the segment list, or a unique segment ID).
    pub segment_id: u32,
    /// Row index within the segment (0-based).
    pub row_index: u32,
}
28
/// In-memory B-tree index mapping PK values to row locations.
///
/// PK values are stored as sortable byte vectors (`Vec<u8>`) produced by
/// `encode_pk`. This gives uniform ordering regardless of PK type
/// (Int64, String, Uuid, composite).
#[derive(Debug, Clone)]
pub struct PkIndex {
    // Encoded PK bytes → physical row location. BTreeMap keeps keys in
    // sorted byte order, which matches PK order under `encode_pk`.
    inner: BTreeMap<Vec<u8>, RowLocation>,
}
38
39impl PkIndex {
40    /// Create an empty PK index.
41    pub fn new() -> Self {
42        Self {
43            inner: BTreeMap::new(),
44        }
45    }
46
47    /// Number of entries in the index.
48    pub fn len(&self) -> usize {
49        self.inner.len()
50    }
51
52    /// Whether the index is empty.
53    pub fn is_empty(&self) -> bool {
54        self.inner.is_empty()
55    }
56
57    /// Insert a PK → location mapping. Returns error if PK already exists.
58    pub fn insert(
59        &mut self,
60        pk_bytes: Vec<u8>,
61        location: RowLocation,
62    ) -> Result<(), ColumnarError> {
63        if self.inner.contains_key(&pk_bytes) {
64            return Err(ColumnarError::DuplicatePrimaryKey);
65        }
66        self.inner.insert(pk_bytes, location);
67        Ok(())
68    }
69
70    /// Insert or overwrite a PK → location mapping (used during compaction remap).
71    pub fn upsert(&mut self, pk_bytes: Vec<u8>, location: RowLocation) {
72        self.inner.insert(pk_bytes, location);
73    }
74
75    /// Look up a row location by PK.
76    pub fn get(&self, pk_bytes: &[u8]) -> Option<&RowLocation> {
77        self.inner.get(pk_bytes)
78    }
79
80    /// Remove a PK entry (used when a row is deleted and compacted away).
81    pub fn remove(&mut self, pk_bytes: &[u8]) -> Option<RowLocation> {
82        self.inner.remove(pk_bytes)
83    }
84
85    /// Check whether a PK exists in the index.
86    pub fn contains(&self, pk_bytes: &[u8]) -> bool {
87        self.inner.contains_key(pk_bytes)
88    }
89
90    /// Remap all entries pointing to `old_segment_id` using a provided function.
91    ///
92    /// Used after compaction: old segment IDs are replaced with new ones,
93    /// and row indices may shift. The `remap_fn` returns `None` to remove
94    /// the entry (row was deleted during compaction) or `Some(new_location)`.
95    pub fn remap_segment(
96        &mut self,
97        old_segment_id: u32,
98        remap_fn: impl Fn(u32) -> Option<RowLocation>,
99    ) {
100        let keys_to_remap: Vec<Vec<u8>> = self
101            .inner
102            .iter()
103            .filter(|(_, loc)| loc.segment_id == old_segment_id)
104            .map(|(k, _)| k.clone())
105            .collect();
106
107        for key in keys_to_remap {
108            let old_loc = self.inner.remove(&key).expect("key exists from filter");
109            if let Some(new_loc) = remap_fn(old_loc.row_index) {
110                self.inner.insert(key, new_loc);
111            }
112            // If remap_fn returns None, the entry is dropped (row was deleted).
113        }
114    }
115
116    /// Bulk insert entries for a newly flushed segment.
117    ///
118    /// `pk_bytes_list` contains the encoded PK bytes for each row in the segment
119    /// (in order). `segment_id` is the new segment's ID.
120    pub fn bulk_insert(
121        &mut self,
122        segment_id: u32,
123        pk_bytes_list: &[Vec<u8>],
124    ) -> Result<(), ColumnarError> {
125        for (row_index, pk_bytes) in pk_bytes_list.iter().enumerate() {
126            let location = RowLocation {
127                segment_id,
128                row_index: row_index as u32,
129            };
130            self.insert(pk_bytes.clone(), location)?;
131        }
132        Ok(())
133    }
134
135    /// Remove all entries pointing to a given segment (used when dropping a segment).
136    pub fn remove_segment(&mut self, segment_id: u32) {
137        self.inner.retain(|_, loc| loc.segment_id != segment_id);
138    }
139
140    /// Serialize the index to bytes for checkpoint persistence.
141    pub fn to_bytes(&self) -> Result<Vec<u8>, ColumnarError> {
142        // Serialize as Vec<(key, location)> via MessagePack.
143        let entries: Vec<(&Vec<u8>, &RowLocation)> = self.inner.iter().collect();
144        zerompk::to_msgpack_vec(&entries).map_err(|e| ColumnarError::Serialization(e.to_string()))
145    }
146
147    /// Deserialize the index from a checkpoint.
148    pub fn from_bytes(data: &[u8]) -> Result<Self, ColumnarError> {
149        let entries: Vec<(Vec<u8>, RowLocation)> =
150            zerompk::from_msgpack(data).map_err(|e| ColumnarError::Serialization(e.to_string()))?;
151        let mut inner = BTreeMap::new();
152        for (key, loc) in entries {
153            inner.insert(key, loc);
154        }
155        Ok(Self { inner })
156    }
157}
158
159impl Default for PkIndex {
160    fn default() -> Self {
161        Self::new()
162    }
163}
164
165/// Encode a PK value into sortable bytes.
166///
167/// Uses big-endian XOR encoding for signed integers (preserves sort order),
168/// raw UTF-8 for strings, raw bytes for UUIDs.
169pub fn encode_pk(value: &nodedb_types::value::Value) -> Vec<u8> {
170    use nodedb_types::value::Value;
171    match value {
172        Value::Integer(v) => {
173            let sortable = (*v as u64) ^ (1u64 << 63);
174            sortable.to_be_bytes().to_vec()
175        }
176        Value::String(s) => s.as_bytes().to_vec(),
177        Value::Uuid(s) => s.as_bytes().to_vec(),
178        Value::Decimal(d) => d.serialize().to_vec(),
179        Value::DateTime(dt) => {
180            let sortable = (dt.micros as u64) ^ (1u64 << 63);
181            sortable.to_be_bytes().to_vec()
182        }
183        _ => format!("{value:?}").into_bytes(),
184    }
185}
186
/// Encode a composite PK from multiple values.
///
/// Parts are encoded with `encode_pk` and joined with a 0xFF separator byte.
/// 0xFF never occurs in valid UTF-8, so string parts cannot collide with the
/// separator; fixed-width integer parts compare positionally.
///
/// NOTE(review): because 0xFF sorts *after* every valid UTF-8 byte, a
/// variable-length string part that is a prefix of another (e.g. "a" vs "ab")
/// encodes *greater* than its extension, so composite byte order can diverge
/// from lexicographic PK order. Uniqueness is unaffected. Confirm whether
/// range scans over composite string PKs depend on strict ordering before
/// relying on it (changing the separator would break persisted checkpoints).
pub fn encode_composite_pk(values: &[&nodedb_types::value::Value]) -> Vec<u8> {
    let mut key = Vec::new();
    for (i, val) in values.iter().enumerate() {
        if i > 0 {
            key.push(0xFF); // Separator between composite PK parts.
        }
        key.extend_from_slice(&encode_pk(val));
    }
    key
}
198
#[cfg(test)]
mod tests {
    use nodedb_types::value::Value;

    use super::*;

    /// Shorthand for building a `RowLocation` in tests.
    fn loc(segment_id: u32, row_index: u32) -> RowLocation {
        RowLocation {
            segment_id,
            row_index,
        }
    }

    /// Encode a contiguous range of integer PKs, in order.
    fn int_pks(range: std::ops::Range<i64>) -> Vec<Vec<u8>> {
        range.map(|i| encode_pk(&Value::Integer(i))).collect()
    }

    #[test]
    fn insert_and_lookup() {
        let mut index = PkIndex::new();
        let pk = encode_pk(&Value::Integer(42));

        index.insert(pk.clone(), loc(0, 5)).expect("insert");
        assert_eq!(index.get(&pk), Some(&loc(0, 5)));
        assert_eq!(index.len(), 1);
    }

    #[test]
    fn duplicate_pk_rejected() {
        let mut index = PkIndex::new();
        let pk = encode_pk(&Value::Integer(1));

        index.insert(pk.clone(), loc(0, 0)).expect("first insert");
        assert!(matches!(
            index.insert(pk, loc(0, 0)),
            Err(ColumnarError::DuplicatePrimaryKey)
        ));
    }

    #[test]
    fn remove_entry() {
        let mut index = PkIndex::new();
        let pk = encode_pk(&Value::Integer(1));

        index.insert(pk.clone(), loc(0, 0)).expect("insert");
        assert_eq!(index.remove(&pk), Some(loc(0, 0)));
        assert!(index.is_empty());
    }

    #[test]
    fn bulk_insert() {
        let mut index = PkIndex::new();
        let pks = int_pks(0..10);

        index.bulk_insert(0, &pks).expect("bulk insert");
        assert_eq!(index.len(), 10);

        // Row indices must follow insertion order.
        let found = index.get(&pks[5]).expect("lookup");
        assert_eq!(found.segment_id, 0);
        assert_eq!(found.row_index, 5);
    }

    #[test]
    fn remap_segment() {
        let mut index = PkIndex::new();
        let pks = int_pks(0..5);
        index.bulk_insert(0, &pks).expect("bulk insert");

        // Move segment 0 → segment 1 with a +10 row shift; row 2 is dropped
        // (deleted during compaction).
        index.remap_segment(0, |old_row| (old_row != 2).then(|| loc(1, old_row + 10)));

        assert_eq!(index.len(), 4); // Row 2 was removed.
        let moved = index.get(&pks[0]).expect("row 0");
        assert_eq!(moved.segment_id, 1);
        assert_eq!(moved.row_index, 10);

        assert!(index.get(&pks[2]).is_none()); // Row 2 was deleted.
    }

    #[test]
    fn remove_segment() {
        let mut index = PkIndex::new();
        index.bulk_insert(0, &int_pks(0..5)).expect("seg 0");
        index.bulk_insert(1, &int_pks(10..15)).expect("seg 1");

        assert_eq!(index.len(), 10);
        index.remove_segment(0);
        assert_eq!(index.len(), 5); // Only segment 1 remains.
    }

    #[test]
    fn serialization_roundtrip() {
        let mut index = PkIndex::new();
        let pks = int_pks(0..100);
        index.bulk_insert(0, &pks).expect("bulk insert");

        let bytes = index.to_bytes().expect("serialize");
        let restored = PkIndex::from_bytes(&bytes).expect("deserialize");

        assert_eq!(restored.len(), 100);
        let found = restored.get(&pks[50]).expect("lookup");
        assert_eq!(found.segment_id, 0);
        assert_eq!(found.row_index, 50);
    }

    #[test]
    fn int_sort_order() {
        // Encoded integers must sort bytewise like the signed values,
        // including negatives.
        let values = [-100i64, -1, 0, 1, 100];
        let encoded: Vec<Vec<u8>> = values
            .iter()
            .map(|v| encode_pk(&Value::Integer(*v)))
            .collect();

        for i in 0..encoded.len() - 1 {
            assert!(
                encoded[i] < encoded[i + 1],
                "sort order broken: {} < {}",
                values[i],
                values[i + 1]
            );
        }
    }

    #[test]
    fn composite_pk() {
        let pk1 = encode_composite_pk(&[&Value::Integer(1), &Value::String("a".into())]);
        let pk2 = encode_composite_pk(&[&Value::Integer(1), &Value::String("b".into())]);
        let pk3 = encode_composite_pk(&[&Value::Integer(2), &Value::String("a".into())]);

        assert!(pk1 < pk2); // Same first part, "a" < "b".
        assert!(pk2 < pk3); // 1 < 2 in first part.
    }
}