1use std::collections::BTreeMap;
12
13use serde::{Deserialize, Serialize};
14use zerompk::{FromMessagePack, ToMessagePack};
15
16use crate::error::ColumnarError;
17
/// Physical address of a row: the segment it lives in plus its
/// position within that segment.
///
/// `Copy` + `Eq` so locations can be compared and passed by value
/// cheaply; serde and MessagePack derives allow it to be persisted
/// as part of the index payload.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToMessagePack, FromMessagePack,
)]
pub struct RowLocation {
    // Identifier of the segment containing the row.
    pub segment_id: u32,
    // Zero-based row offset within that segment.
    pub row_index: u32,
}
28
/// In-memory primary-key index mapping encoded key bytes to row locations.
///
/// Backed by a `BTreeMap` so entries iterate (and serialize) in key
/// order, which matches the order-preserving encodings produced by
/// [`encode_pk`] / [`encode_composite_pk`].
#[derive(Debug, Clone)]
pub struct PkIndex {
    // Encoded primary key -> physical row location.
    inner: BTreeMap<Vec<u8>, RowLocation>,
}
38
39impl PkIndex {
40 pub fn new() -> Self {
42 Self {
43 inner: BTreeMap::new(),
44 }
45 }
46
47 pub fn len(&self) -> usize {
49 self.inner.len()
50 }
51
52 pub fn is_empty(&self) -> bool {
54 self.inner.is_empty()
55 }
56
57 pub fn insert(
59 &mut self,
60 pk_bytes: Vec<u8>,
61 location: RowLocation,
62 ) -> Result<(), ColumnarError> {
63 if self.inner.contains_key(&pk_bytes) {
64 return Err(ColumnarError::DuplicatePrimaryKey);
65 }
66 self.inner.insert(pk_bytes, location);
67 Ok(())
68 }
69
70 pub fn upsert(&mut self, pk_bytes: Vec<u8>, location: RowLocation) {
72 self.inner.insert(pk_bytes, location);
73 }
74
75 pub fn get(&self, pk_bytes: &[u8]) -> Option<&RowLocation> {
77 self.inner.get(pk_bytes)
78 }
79
80 pub fn remove(&mut self, pk_bytes: &[u8]) -> Option<RowLocation> {
82 self.inner.remove(pk_bytes)
83 }
84
85 pub fn contains(&self, pk_bytes: &[u8]) -> bool {
87 self.inner.contains_key(pk_bytes)
88 }
89
90 pub fn remap_segment(
96 &mut self,
97 old_segment_id: u32,
98 remap_fn: impl Fn(u32) -> Option<RowLocation>,
99 ) {
100 let keys_to_remap: Vec<Vec<u8>> = self
101 .inner
102 .iter()
103 .filter(|(_, loc)| loc.segment_id == old_segment_id)
104 .map(|(k, _)| k.clone())
105 .collect();
106
107 for key in keys_to_remap {
108 let old_loc = self.inner.remove(&key).expect("key exists from filter");
109 if let Some(new_loc) = remap_fn(old_loc.row_index) {
110 self.inner.insert(key, new_loc);
111 }
112 }
114 }
115
116 pub fn bulk_insert(
121 &mut self,
122 segment_id: u32,
123 pk_bytes_list: &[Vec<u8>],
124 ) -> Result<(), ColumnarError> {
125 for (row_index, pk_bytes) in pk_bytes_list.iter().enumerate() {
126 let location = RowLocation {
127 segment_id,
128 row_index: row_index as u32,
129 };
130 self.insert(pk_bytes.clone(), location)?;
131 }
132 Ok(())
133 }
134
135 pub fn remove_segment(&mut self, segment_id: u32) {
137 self.inner.retain(|_, loc| loc.segment_id != segment_id);
138 }
139
140 pub fn to_bytes(&self) -> Result<Vec<u8>, ColumnarError> {
142 let entries: Vec<(&Vec<u8>, &RowLocation)> = self.inner.iter().collect();
144 zerompk::to_msgpack_vec(&entries).map_err(|e| ColumnarError::Serialization(e.to_string()))
145 }
146
147 pub fn from_bytes(data: &[u8]) -> Result<Self, ColumnarError> {
149 let entries: Vec<(Vec<u8>, RowLocation)> =
150 zerompk::from_msgpack(data).map_err(|e| ColumnarError::Serialization(e.to_string()))?;
151 let mut inner = BTreeMap::new();
152 for (key, loc) in entries {
153 inner.insert(key, loc);
154 }
155 Ok(Self { inner })
156 }
157}
158
159impl Default for PkIndex {
160 fn default() -> Self {
161 Self::new()
162 }
163}
164
165pub fn encode_pk(value: &nodedb_types::value::Value) -> Vec<u8> {
170 use nodedb_types::value::Value;
171 match value {
172 Value::Integer(v) => {
173 let sortable = (*v as u64) ^ (1u64 << 63);
174 sortable.to_be_bytes().to_vec()
175 }
176 Value::String(s) => s.as_bytes().to_vec(),
177 Value::Uuid(s) => s.as_bytes().to_vec(),
178 Value::Decimal(d) => d.serialize().to_vec(),
179 Value::DateTime(dt) => {
180 let sortable = (dt.micros as u64) ^ (1u64 << 63);
181 sortable.to_be_bytes().to_vec()
182 }
183 _ => format!("{value:?}").into_bytes(),
184 }
185}
186
187pub fn encode_composite_pk(values: &[&nodedb_types::value::Value]) -> Vec<u8> {
189 let mut key = Vec::new();
190 for (i, val) in values.iter().enumerate() {
191 if i > 0 {
192 key.push(0xFF); }
194 key.extend_from_slice(&encode_pk(val));
195 }
196 key
197}
198
#[cfg(test)]
mod tests {
    use nodedb_types::value::Value;

    use super::*;

    /// Terse [`RowLocation`] constructor for tests.
    fn loc(segment_id: u32, row_index: u32) -> RowLocation {
        RowLocation {
            segment_id,
            row_index,
        }
    }

    /// Encodes a single integer primary key.
    fn int_pk(v: i64) -> Vec<u8> {
        encode_pk(&Value::Integer(v))
    }

    /// Encodes a run of consecutive integer primary keys.
    fn int_pks(range: std::ops::Range<i64>) -> Vec<Vec<u8>> {
        range.map(int_pk).collect()
    }

    #[test]
    fn insert_and_lookup() {
        let mut idx = PkIndex::new();
        let pk = int_pk(42);
        let location = loc(0, 5);

        idx.insert(pk.clone(), location).expect("insert");

        assert_eq!(idx.get(&pk), Some(&location));
        assert_eq!(idx.len(), 1);
    }

    #[test]
    fn duplicate_pk_rejected() {
        let mut idx = PkIndex::new();
        let pk = int_pk(1);

        idx.insert(pk.clone(), loc(0, 0)).expect("first insert");
        let second = idx.insert(pk, loc(0, 0));

        assert!(matches!(second, Err(ColumnarError::DuplicatePrimaryKey)));
    }

    #[test]
    fn remove_entry() {
        let mut idx = PkIndex::new();
        let pk = int_pk(1);
        let location = loc(0, 0);
        idx.insert(pk.clone(), location).expect("insert");

        assert_eq!(idx.remove(&pk), Some(location));
        assert!(idx.is_empty());
    }

    #[test]
    fn bulk_insert() {
        let mut idx = PkIndex::new();
        let pks = int_pks(0..10);

        idx.bulk_insert(0, &pks).expect("bulk insert");

        assert_eq!(idx.len(), 10);
        let found = idx.get(&pks[5]).expect("lookup");
        assert_eq!(found.segment_id, 0);
        assert_eq!(found.row_index, 5);
    }

    #[test]
    fn remap_segment() {
        let mut idx = PkIndex::new();
        let pks = int_pks(0..5);
        idx.bulk_insert(0, &pks).expect("bulk insert");

        // Move everything to segment 1 (rows shifted by 10), dropping row 2.
        idx.remap_segment(0, |old_row| (old_row != 2).then(|| loc(1, old_row + 10)));

        assert_eq!(idx.len(), 4);
        let moved = idx.get(&pks[0]).expect("row 0");
        assert_eq!(moved.segment_id, 1);
        assert_eq!(moved.row_index, 10);
        assert!(idx.get(&pks[2]).is_none());
    }

    #[test]
    fn remove_segment() {
        let mut idx = PkIndex::new();
        idx.bulk_insert(0, &int_pks(0..5)).expect("seg 0");
        idx.bulk_insert(1, &int_pks(10..15)).expect("seg 1");
        assert_eq!(idx.len(), 10);

        idx.remove_segment(0);

        assert_eq!(idx.len(), 5);
    }

    #[test]
    fn serialization_roundtrip() {
        let mut idx = PkIndex::new();
        let pks = int_pks(0..100);
        idx.bulk_insert(0, &pks).expect("bulk insert");

        let bytes = idx.to_bytes().expect("serialize");
        let restored = PkIndex::from_bytes(&bytes).expect("deserialize");

        assert_eq!(restored.len(), 100);
        let found = restored.get(&pks[50]).expect("lookup");
        assert_eq!(found.segment_id, 0);
        assert_eq!(found.row_index, 50);
    }

    #[test]
    fn int_sort_order() {
        let values = [-100i64, -1, 0, 1, 100];
        let encoded: Vec<Vec<u8>> = values.iter().map(|&v| int_pk(v)).collect();

        // Byte order of the encodings must follow numeric order.
        for (pair, vals) in encoded.windows(2).zip(values.windows(2)) {
            assert!(
                pair[0] < pair[1],
                "sort order broken: {} < {}",
                vals[0],
                vals[1]
            );
        }
    }

    #[test]
    fn composite_pk() {
        let pk1 = encode_composite_pk(&[&Value::Integer(1), &Value::String("a".into())]);
        let pk2 = encode_composite_pk(&[&Value::Integer(1), &Value::String("b".into())]);
        let pk3 = encode_composite_pk(&[&Value::Integer(2), &Value::String("a".into())]);

        assert!(pk1 < pk2);
        assert!(pk2 < pk3);
    }
}