Skip to main content

modelvault_core/
index.rs

1//! Persisted secondary index segments: payload codec and in-memory replay state.
2
3use std::collections::{BTreeSet, HashMap};
4
5use std::cmp::Ordering;
6
7use crate::error::{DbError, FormatError, SchemaError};
8use crate::file_format::{
9    check_decode_entry_count, check_field_bytes_len, MAX_SEGMENT_DECODE_ENTRIES,
10};
11use crate::schema::IndexKind;
12use crate::ScalarValue;
13
/// Legacy payload layout: entries carry no op byte and decode as inserts.
pub const INDEX_PAYLOAD_VERSION_V1: u16 = 1;
/// Current payload layout: every entry carries an explicit insert/delete op byte.
pub const INDEX_PAYLOAD_VERSION_V2: u16 = 2;
/// Version written by [`encode_index_payload`].
pub const INDEX_PAYLOAD_VERSION: u16 = INDEX_PAYLOAD_VERSION_V2;

/// Name of a secondary index within its collection.
type IndexName = String;
/// Index key bytes as stored in an index.
type IndexKey = Vec<u8>;
/// Primary-key bytes of an indexed record.
type PkKey = Vec<u8>;
/// Identifies one index: `(collection_id, index_name)`.
type IndexId = (u32, IndexName);
/// Unique index contents: each key maps to exactly one primary key.
type UniqueIndex = HashMap<IndexKey, PkKey>;
/// Non-unique index contents: each key maps to an ordered set of primary keys.
type NonUniqueIndex = HashMap<IndexKey, BTreeSet<PkKey>>;
24
/// Whether an [`IndexEntry`] adds or removes an index mapping.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum IndexOp {
    /// Add the `index_key -> pk_key` mapping.
    Insert,
    /// Remove the `index_key -> pk_key` mapping.
    Delete,
}
31
/// One index delta: insert or delete a single `index_key -> pk_key` mapping in one
/// index of one collection.
///
/// NOTE(review): there is no separate update op; an update is presumably written as a
/// delete of the old key followed by an insert of the new one — confirm with writers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IndexEntry {
    /// Collection that owns the index.
    pub collection_id: u32,
    /// Index name within the collection (the payload decoder rejects empty names).
    pub index_name: String,
    /// Unique vs. non-unique; selects the replay rules in `IndexState::apply`.
    pub kind: IndexKind,
    /// Whether the mapping is added or removed.
    pub op: IndexOp,
    /// Index key bytes as stored in the index.
    pub index_key: Vec<u8>,
    /// Primary-key bytes of the record the index key points at.
    pub pk_key: Vec<u8>,
}
42
/// In-memory secondary-index state, rebuilt by replaying [`IndexEntry`] deltas through
/// `IndexState::apply`.
///
/// Unique and non-unique indexes live in separate maps, each keyed by
/// `(collection_id, index_name)`.
#[derive(Debug, Default, Clone)]
pub struct IndexState {
    /// Unique indexes: index key -> the single primary key holding it.
    unique: HashMap<IndexId, UniqueIndex>,
    /// Non-unique indexes: index key -> ordered set of primary keys holding it.
    non_unique: HashMap<IndexId, NonUniqueIndex>,
}
48
49impl IndexState {
50    pub fn apply(&mut self, entry: IndexEntry) -> Result<(), DbError> {
51        match entry.kind {
52            IndexKind::Unique => {
53                let m = self
54                    .unique
55                    .entry((entry.collection_id, entry.index_name))
56                    .or_default();
57                match entry.op {
58                    IndexOp::Insert => match m.get(&entry.index_key) {
59                        None => {
60                            m.insert(entry.index_key, entry.pk_key);
61                            Ok(())
62                        }
63                        Some(existing) if *existing == entry.pk_key => Ok(()),
64                        Some(_) => Err(DbError::Schema(SchemaError::UniqueIndexViolation)),
65                    },
66                    IndexOp::Delete => match m.get(&entry.index_key) {
67                        None => Ok(()),
68                        Some(existing) if *existing == entry.pk_key => {
69                            m.remove(&entry.index_key);
70                            Ok(())
71                        }
72                        Some(_) => Err(DbError::Format(FormatError::InvalidCatalogPayload {
73                            message: "unique index delete pk_key mismatch".into(),
74                        })),
75                    },
76                }
77            }
78            IndexKind::NonUnique => {
79                let m = self
80                    .non_unique
81                    .entry((entry.collection_id, entry.index_name))
82                    .or_default();
83                match entry.op {
84                    IndexOp::Insert => {
85                        m.entry(entry.index_key).or_default().insert(entry.pk_key);
86                    }
87                    IndexOp::Delete => {
88                        if let Some(set) = m.get_mut(&entry.index_key) {
89                            set.remove(&entry.pk_key);
90                            if set.is_empty() {
91                                m.remove(&entry.index_key);
92                            }
93                        }
94                    }
95                }
96                Ok(())
97            }
98        }
99    }
100
101    pub fn unique_lookup(
102        &self,
103        collection_id: u32,
104        index_name: &str,
105        index_key: &[u8],
106    ) -> Option<&[u8]> {
107        self.unique
108            .get(&(collection_id, index_name.to_string()))?
109            .get(index_key)
110            .map(|v| v.as_slice())
111    }
112
113    pub fn non_unique_lookup(
114        &self,
115        collection_id: u32,
116        index_name: &str,
117        index_key: &[u8],
118    ) -> Option<Vec<Vec<u8>>> {
119        let set = self
120            .non_unique
121            .get(&(collection_id, index_name.to_string()))?
122            .get(index_key)?;
123        Some(set.iter().cloned().collect())
124    }
125
126    /// Collect PK keys for all index entries whose key falls in `[lo, hi]` (per `scalar_partial_cmp`).
127    pub fn non_unique_range_lookup(
128        &self,
129        collection_id: u32,
130        index_name: &str,
131        lo: Option<&ScalarValue>,
132        lo_inclusive: bool,
133        hi: Option<&ScalarValue>,
134        hi_inclusive: bool,
135    ) -> Vec<Vec<u8>> {
136        let key_hint = lo.or(hi);
137        let Some(m) = self
138            .non_unique
139            .get(&(collection_id, index_name.to_string()))
140        else {
141            return Vec::new();
142        };
143        let mut out = Vec::new();
144        for (index_key, set) in m {
145            if !index_key_in_range(index_key, key_hint, lo, lo_inclusive, hi, hi_inclusive) {
146                continue;
147            }
148            out.extend(set.iter().cloned());
149        }
150        out
151    }
152
153    /// Collect PK keys for unique index entries whose key falls in `[lo, hi]`.
154    pub fn unique_range_lookup(
155        &self,
156        collection_id: u32,
157        index_name: &str,
158        lo: Option<&ScalarValue>,
159        lo_inclusive: bool,
160        hi: Option<&ScalarValue>,
161        hi_inclusive: bool,
162    ) -> Vec<Vec<u8>> {
163        let key_hint = lo.or(hi);
164        let Some(m) = self.unique.get(&(collection_id, index_name.to_string())) else {
165            return Vec::new();
166        };
167        m.iter()
168            .filter(|(index_key, _)| {
169                index_key_in_range(index_key, key_hint, lo, lo_inclusive, hi, hi_inclusive)
170            })
171            .map(|(_, pk)| pk.clone())
172            .collect()
173    }
174
175    pub(crate) fn entries_for_checkpoint(&self) -> Vec<IndexEntry> {
176        let mut out = Vec::new();
177        for ((collection_id, index_name), m) in &self.unique {
178            for (index_key, pk_key) in m {
179                out.push(IndexEntry {
180                    collection_id: *collection_id,
181                    index_name: index_name.clone(),
182                    kind: IndexKind::Unique,
183                    op: IndexOp::Insert,
184                    index_key: index_key.clone(),
185                    pk_key: pk_key.clone(),
186                });
187            }
188        }
189        for ((collection_id, index_name), m) in &self.non_unique {
190            for (index_key, set) in m {
191                for pk_key in set {
192                    out.push(IndexEntry {
193                        collection_id: *collection_id,
194                        index_name: index_name.clone(),
195                        kind: IndexKind::NonUnique,
196                        op: IndexOp::Insert,
197                        index_key: index_key.clone(),
198                        pk_key: pk_key.clone(),
199                    });
200                }
201            }
202        }
203        out
204    }
205}
206
207fn decode_index_key_scalar(key: &[u8], hint: Option<&ScalarValue>) -> Option<ScalarValue> {
208    match key.len() {
209        8 => {
210            let bytes: [u8; 8] = key.try_into().ok()?;
211            Some(match hint {
212                Some(ScalarValue::Uint64(_)) => ScalarValue::Uint64(u64::from_le_bytes(bytes)),
213                Some(ScalarValue::Int64(_)) => ScalarValue::Int64(i64::from_le_bytes(bytes)),
214                Some(ScalarValue::Float64(_)) => ScalarValue::Float64(f64::from_le_bytes(bytes)),
215                Some(ScalarValue::Timestamp(_)) => {
216                    ScalarValue::Timestamp(i64::from_le_bytes(bytes))
217                }
218                _ => ScalarValue::Int64(i64::from_le_bytes(bytes)),
219            })
220        }
221        n if n > 0 => String::from_utf8(key.to_vec())
222            .ok()
223            .map(ScalarValue::String),
224        _ => None,
225    }
226}
227
228fn scalar_partial_cmp(a: &ScalarValue, b: &ScalarValue) -> Option<Ordering> {
229    use ScalarValue::*;
230    match (a, b) {
231        (Bool(x), Bool(y)) => Some(x.cmp(y)),
232        (Int64(x), Int64(y)) => Some(x.cmp(y)),
233        (Uint64(x), Uint64(y)) => Some(x.cmp(y)),
234        (Float64(x), Float64(y)) => x.partial_cmp(y),
235        (String(x), String(y)) => Some(x.cmp(y)),
236        (Bytes(x), Bytes(y)) => Some(x.cmp(y)),
237        (Uuid(x), Uuid(y)) => Some(x.cmp(y)),
238        (Timestamp(x), Timestamp(y)) => Some(x.cmp(y)),
239        _ => None,
240    }
241}
242
243fn index_key_in_range(
244    key: &[u8],
245    key_hint: Option<&ScalarValue>,
246    lo: Option<&ScalarValue>,
247    lo_inclusive: bool,
248    hi: Option<&ScalarValue>,
249    hi_inclusive: bool,
250) -> bool {
251    let Some(decoded) = decode_index_key_scalar(key, key_hint) else {
252        return false;
253    };
254    if let Some(lo_v) = lo {
255        match scalar_partial_cmp(&decoded, lo_v) {
256            Some(Ordering::Less) => return false,
257            Some(Ordering::Equal) if !lo_inclusive => return false,
258            None => return false,
259            _ => {}
260        }
261    }
262    if let Some(hi_v) = hi {
263        match scalar_partial_cmp(&decoded, hi_v) {
264            Some(Ordering::Greater) => return false,
265            Some(Ordering::Equal) if !hi_inclusive => return false,
266            None => return false,
267            _ => {}
268        }
269    }
270    true
271}
272
273pub fn encode_index_payload(entries: &[IndexEntry]) -> Vec<u8> {
274    let mut out = Vec::new();
275    out.extend_from_slice(&INDEX_PAYLOAD_VERSION.to_le_bytes());
276    out.extend_from_slice(&(entries.len() as u32).to_le_bytes());
277    for e in entries {
278        out.extend_from_slice(&e.collection_id.to_le_bytes());
279        out.push(match e.kind {
280            IndexKind::Unique => 1,
281            IndexKind::NonUnique => 2,
282        });
283        out.push(match e.op {
284            IndexOp::Insert => 1,
285            IndexOp::Delete => 2,
286        });
287        encode_string(&mut out, &e.index_name);
288        encode_bytes(&mut out, &e.index_key);
289        encode_bytes(&mut out, &e.pk_key);
290    }
291    out
292}
293
294pub fn decode_index_payload(bytes: &[u8]) -> Result<Vec<IndexEntry>, DbError> {
295    let mut cur = Cursor::new(bytes);
296    let ver = cur.take_u16()?;
297    if ver != INDEX_PAYLOAD_VERSION_V1 && ver != INDEX_PAYLOAD_VERSION_V2 {
298        return Err(DbError::Format(FormatError::UnsupportedVersion {
299            major: 0,
300            minor: ver,
301        }));
302    }
303    let n = cur.take_u32()? as usize;
304    check_decode_entry_count(n)?;
305    let mut v = Vec::with_capacity(n.min(MAX_SEGMENT_DECODE_ENTRIES));
306    for _ in 0..n {
307        let collection_id = cur.take_u32()?;
308        let kind_tag = cur.take_u8()?;
309        let kind = match kind_tag {
310            1 => IndexKind::Unique,
311            2 => IndexKind::NonUnique,
312            _ => {
313                return Err(DbError::Format(FormatError::InvalidCatalogPayload {
314                    message: format!("unknown index kind tag {kind_tag}"),
315                }))
316            }
317        };
318        let op = if ver >= INDEX_PAYLOAD_VERSION_V2 {
319            let op_tag = cur.take_u8()?;
320            match op_tag {
321                1 => IndexOp::Insert,
322                2 => IndexOp::Delete,
323                _ => {
324                    return Err(DbError::Format(FormatError::InvalidCatalogPayload {
325                        message: format!("unknown index op tag {op_tag}"),
326                    }))
327                }
328            }
329        } else {
330            IndexOp::Insert
331        };
332        let index_name = decode_string(&mut cur)?;
333        let index_key = decode_bytes(&mut cur)?;
334        let pk_key = decode_bytes(&mut cur)?;
335        v.push(IndexEntry {
336            collection_id,
337            index_name,
338            kind,
339            op,
340            index_key,
341            pk_key,
342        });
343    }
344    if cur.remaining() != 0 {
345        return Err(DbError::Format(FormatError::InvalidCatalogPayload {
346            message: "trailing bytes in index payload".to_string(),
347        }));
348    }
349    Ok(v)
350}
351
/// Appends `s` as a little-endian `u32` byte-length prefix followed by its UTF-8 bytes.
fn encode_string(out: &mut Vec<u8>, s: &str) {
    let len_prefix = (s.len() as u32).to_le_bytes();
    out.extend(len_prefix.iter().chain(s.as_bytes()));
}
357
358fn decode_string(cur: &mut Cursor<'_>) -> Result<String, DbError> {
359    let n = cur.take_u32()? as usize;
360    if n == 0 {
361        return Err(DbError::Format(FormatError::InvalidCatalogPayload {
362            message: "empty index name".to_string(),
363        }));
364    }
365    let bytes = cur.take_bytes(n)?;
366    String::from_utf8(bytes).map_err(|_| {
367        DbError::Format(FormatError::InvalidCatalogPayload {
368            message: "invalid utf-8 in index name".to_string(),
369        })
370    })
371}
372
/// Appends `b` prefixed by its length as a little-endian `u32`.
fn encode_bytes(out: &mut Vec<u8>, b: &[u8]) {
    let prefix = u32::to_le_bytes(b.len() as u32);
    out.reserve(prefix.len() + b.len());
    out.extend_from_slice(&prefix);
    out.extend_from_slice(b);
}
377
378fn decode_bytes(cur: &mut Cursor<'_>) -> Result<Vec<u8>, DbError> {
379    let n = cur.take_u32()? as usize;
380    cur.take_bytes(n)
381}
382
/// Forward-only reader over a byte slice, used by the index payload decoder.
struct Cursor<'a> {
    /// The entire input being decoded.
    bytes: &'a [u8],
    /// Offset of the next unread byte; only ever increases.
    pos: usize,
}
387
388impl<'a> Cursor<'a> {
389    fn new(bytes: &'a [u8]) -> Self {
390        Self { bytes, pos: 0 }
391    }
392
393    fn remaining(&self) -> usize {
394        self.bytes.len().saturating_sub(self.pos)
395    }
396
397    fn take_u8(&mut self) -> Result<u8, DbError> {
398        if self.pos >= self.bytes.len() {
399            return Err(DbError::Format(FormatError::InvalidCatalogPayload {
400                message: "unexpected eof".to_string(),
401            }));
402        }
403        let b = self.bytes[self.pos];
404        self.pos += 1;
405        Ok(b)
406    }
407
408    fn take_u16(&mut self) -> Result<u16, DbError> {
409        if self.remaining() < 2 {
410            return Err(DbError::Format(FormatError::InvalidCatalogPayload {
411                message: "unexpected eof".to_string(),
412            }));
413        }
414        let v = u16::from_le_bytes([self.bytes[self.pos], self.bytes[self.pos + 1]]);
415        self.pos += 2;
416        Ok(v)
417    }
418
419    fn take_u32(&mut self) -> Result<u32, DbError> {
420        if self.remaining() < 4 {
421            return Err(DbError::Format(FormatError::InvalidCatalogPayload {
422                message: "unexpected eof".to_string(),
423            }));
424        }
425        let v = u32::from_le_bytes([
426            self.bytes[self.pos],
427            self.bytes[self.pos + 1],
428            self.bytes[self.pos + 2],
429            self.bytes[self.pos + 3],
430        ]);
431        self.pos += 4;
432        Ok(v)
433    }
434
435    fn take_bytes(&mut self, n: usize) -> Result<Vec<u8>, DbError> {
436        check_field_bytes_len(n)?;
437        if self.remaining() < n {
438            return Err(DbError::Format(FormatError::InvalidCatalogPayload {
439                message: "unexpected eof".to_string(),
440            }));
441        }
442        let slice = &self.bytes[self.pos..self.pos + n];
443        self.pos += n;
444        Ok(slice.to_vec())
445    }
446}
447
#[cfg(test)]
mod tests {
    // The unit tests for this module live in `tests/unit/src_index_tests.rs` under the
    // crate root and are textually included here. As a child module, their code can
    // reach this module's private items via `super::`.
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_index_tests.rs"
    ));
}