1use std::collections::BTreeMap;
14
15use serde::{Deserialize, Serialize};
16use zerompk::{FromMessagePack, ToMessagePack};
17
18use crate::error::ColumnarError;
19
20#[derive(
22 Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToMessagePack, FromMessagePack,
23)]
24pub struct RowLocation {
25 pub segment_id: u64,
27 pub row_index: u32,
29}
30
31#[derive(Debug, Clone)]
37pub struct PkIndex {
38 inner: BTreeMap<Vec<u8>, RowLocation>,
39}
40
41impl PkIndex {
42 pub fn new() -> Self {
44 Self {
45 inner: BTreeMap::new(),
46 }
47 }
48
49 pub fn len(&self) -> usize {
51 self.inner.len()
52 }
53
54 pub fn is_empty(&self) -> bool {
56 self.inner.is_empty()
57 }
58
59 pub fn insert(
61 &mut self,
62 pk_bytes: Vec<u8>,
63 location: RowLocation,
64 ) -> Result<(), ColumnarError> {
65 if self.inner.contains_key(&pk_bytes) {
66 return Err(ColumnarError::DuplicatePrimaryKey);
67 }
68 self.inner.insert(pk_bytes, location);
69 Ok(())
70 }
71
72 pub fn upsert(&mut self, pk_bytes: Vec<u8>, location: RowLocation) {
74 self.inner.insert(pk_bytes, location);
75 }
76
77 pub fn get(&self, pk_bytes: &[u8]) -> Option<&RowLocation> {
79 self.inner.get(pk_bytes)
80 }
81
82 pub fn remove(&mut self, pk_bytes: &[u8]) -> Option<RowLocation> {
84 self.inner.remove(pk_bytes)
85 }
86
87 pub fn contains(&self, pk_bytes: &[u8]) -> bool {
89 self.inner.contains_key(pk_bytes)
90 }
91
92 pub fn remap_segment(
98 &mut self,
99 old_segment_id: u64,
100 remap_fn: impl Fn(u32) -> Option<RowLocation>,
101 ) {
102 let keys_to_remap: Vec<Vec<u8>> = self
103 .inner
104 .iter()
105 .filter(|(_, loc)| loc.segment_id == old_segment_id)
106 .map(|(k, _)| k.clone())
107 .collect();
108
109 for key in keys_to_remap {
110 let old_loc = self.inner.remove(&key).expect("key exists from filter");
111 if let Some(new_loc) = remap_fn(old_loc.row_index) {
112 self.inner.insert(key, new_loc);
113 }
114 }
116 }
117
118 pub fn bulk_insert(
123 &mut self,
124 segment_id: u64,
125 pk_bytes_list: &[Vec<u8>],
126 ) -> Result<(), ColumnarError> {
127 for (row_index, pk_bytes) in pk_bytes_list.iter().enumerate() {
128 let location = RowLocation {
129 segment_id,
130 row_index: row_index as u32,
131 };
132 self.insert(pk_bytes.clone(), location)?;
133 }
134 Ok(())
135 }
136
137 pub fn remove_segment(&mut self, segment_id: u64) {
139 self.inner.retain(|_, loc| loc.segment_id != segment_id);
140 }
141
142 pub fn to_bytes(&self) -> Result<Vec<u8>, ColumnarError> {
144 let entries: Vec<(&Vec<u8>, &RowLocation)> = self.inner.iter().collect();
146 zerompk::to_msgpack_vec(&entries).map_err(|e| ColumnarError::Serialization(e.to_string()))
147 }
148
149 pub fn from_bytes(data: &[u8]) -> Result<Self, ColumnarError> {
151 let entries: Vec<(Vec<u8>, RowLocation)> =
152 zerompk::from_msgpack(data).map_err(|e| ColumnarError::Serialization(e.to_string()))?;
153 let mut inner = BTreeMap::new();
154 for (key, loc) in entries {
155 inner.insert(key, loc);
156 }
157 Ok(Self { inner })
158 }
159}
160
161impl Default for PkIndex {
162 fn default() -> Self {
163 Self::new()
164 }
165}
166
167pub fn encode_pk(value: &nodedb_types::value::Value) -> Vec<u8> {
172 use nodedb_types::value::Value;
173 match value {
174 Value::Integer(v) => {
175 let sortable = (*v as u64) ^ (1u64 << 63);
176 sortable.to_be_bytes().to_vec()
177 }
178 Value::String(s) => s.as_bytes().to_vec(),
179 Value::Uuid(s) => s.as_bytes().to_vec(),
180 Value::Decimal(d) => d.serialize().to_vec(),
181 Value::DateTime(dt) => {
182 let sortable = (dt.micros as u64) ^ (1u64 << 63);
183 sortable.to_be_bytes().to_vec()
184 }
185 _ => format!("{value:?}").into_bytes(),
186 }
187}
188
189pub fn encode_composite_pk(values: &[&nodedb_types::value::Value]) -> Vec<u8> {
191 let mut key = Vec::new();
192 for (i, val) in values.iter().enumerate() {
193 if i > 0 {
194 key.push(0xFF); }
196 key.extend_from_slice(&encode_pk(val));
197 }
198 key
199}
200
#[cfg(test)]
mod tests {
    use nodedb_types::value::Value;

    use super::*;

    /// Encodes one integer primary key.
    fn int_pk(v: i64) -> Vec<u8> {
        encode_pk(&Value::Integer(v))
    }

    /// Encodes every integer of `range`, in order, as a primary key.
    fn int_pks(range: std::ops::Range<i64>) -> Vec<Vec<u8>> {
        range.map(int_pk).collect()
    }

    /// Shorthand for building a `RowLocation`.
    fn at(segment_id: u64, row_index: u32) -> RowLocation {
        RowLocation {
            segment_id,
            row_index,
        }
    }

    #[test]
    fn insert_and_lookup() {
        let key = int_pk(42);
        let expected = at(0, 5);

        let mut index = PkIndex::new();
        index.insert(key.clone(), expected).expect("insert");

        assert_eq!(index.get(&key), Some(&expected));
        assert_eq!(index.len(), 1);
    }

    #[test]
    fn duplicate_pk_rejected() {
        let key = int_pk(1);
        let first = at(0, 0);

        let mut index = PkIndex::new();
        index.insert(key.clone(), first).expect("first insert");

        // A second insert of the same key must be refused.
        let outcome = index.insert(key, first);
        assert!(matches!(outcome, Err(ColumnarError::DuplicatePrimaryKey)));
    }

    #[test]
    fn remove_entry() {
        let key = int_pk(1);
        let stored = at(0, 0);

        let mut index = PkIndex::new();
        index.insert(key.clone(), stored).expect("insert");

        assert_eq!(index.remove(&key), Some(stored));
        assert!(index.is_empty());
    }

    #[test]
    fn bulk_insert() {
        let keys = int_pks(0..10);

        let mut index = PkIndex::new();
        index.bulk_insert(0, &keys).expect("bulk insert");
        assert_eq!(index.len(), 10);

        // The key at list position 5 lands at row 5 of segment 0.
        let found = index.get(&keys[5]).expect("lookup");
        assert_eq!(found.segment_id, 0);
        assert_eq!(found.row_index, 5);
    }

    #[test]
    fn remap_segment() {
        let keys = int_pks(0..5);

        let mut index = PkIndex::new();
        index.bulk_insert(0, &keys).expect("bulk insert");

        // Row 2 is dropped; every other row moves to segment 1, shifted by 10.
        index.remap_segment(0, |old_row| (old_row != 2).then(|| at(1, old_row + 10)));

        assert_eq!(index.len(), 4);

        let moved = index.get(&keys[0]).expect("row 0");
        assert_eq!(moved.segment_id, 1);
        assert_eq!(moved.row_index, 10);

        assert!(index.get(&keys[2]).is_none());
    }

    #[test]
    fn remove_segment() {
        let mut index = PkIndex::new();

        let first_batch = int_pks(0..5);
        index.bulk_insert(0, &first_batch).expect("seg 0");

        let second_batch = int_pks(10..15);
        index.bulk_insert(1, &second_batch).expect("seg 1");

        assert_eq!(index.len(), 10);
        index.remove_segment(0);
        assert_eq!(index.len(), 5);
    }

    #[test]
    fn serialization_roundtrip() {
        let keys = int_pks(0..100);

        let mut index = PkIndex::new();
        index.bulk_insert(0, &keys).expect("bulk insert");

        let encoded = index.to_bytes().expect("serialize");
        let decoded = PkIndex::from_bytes(&encoded).expect("deserialize");

        assert_eq!(decoded.len(), 100);
        let found = decoded.get(&keys[50]).expect("lookup");
        assert_eq!(found.segment_id, 0);
        assert_eq!(found.row_index, 50);
    }

    #[test]
    fn int_sort_order() {
        let values = [-100i64, -1, 0, 1, 100];
        let encoded: Vec<Vec<u8>> = values.iter().copied().map(int_pk).collect();

        // Each adjacent pair of encodings must keep the numeric order.
        for (bytes, nums) in encoded.windows(2).zip(values.windows(2)) {
            assert!(
                bytes[0] < bytes[1],
                "sort order broken: {} < {}",
                nums[0],
                nums[1]
            );
        }
    }

    #[test]
    fn pk_index_roundtrip_segment_id_above_u32_max() {
        // A segment id that does not fit in 32 bits must survive every operation.
        let large_seg_id: u64 = u32::MAX as u64 + 1;
        let key = int_pk(999);

        let mut index = PkIndex::new();
        index.insert(key.clone(), at(large_seg_id, 7)).expect("insert");

        let encoded = index.to_bytes().expect("serialize");
        let decoded = PkIndex::from_bytes(&encoded).expect("deserialize");

        let found = decoded.get(&key).expect("lookup after roundtrip");
        assert_eq!(found.segment_id, large_seg_id);
        assert_eq!(found.row_index, 7);

        index.remap_segment(large_seg_id, |old_row| Some(at(large_seg_id + 1, old_row)));
        let remapped = index.get(&key).expect("lookup after remap");
        assert_eq!(remapped.segment_id, large_seg_id + 1);

        index.remove_segment(large_seg_id + 1);
        assert!(index.is_empty());
    }

    #[test]
    fn composite_pk() {
        let one = Value::Integer(1);
        let two = Value::Integer(2);
        let a = Value::String("a".into());
        let b = Value::String("b".into());

        let pk1 = encode_composite_pk(&[&one, &a]);
        let pk2 = encode_composite_pk(&[&one, &b]);
        let pk3 = encode_composite_pk(&[&two, &a]);

        assert!(pk1 < pk2);
        assert!(pk2 < pk3);
    }
}