Skip to main content

modelvault_core/
index.rs

1//! Persisted secondary index segments: payload codec and in-memory replay state.
2
3use std::collections::{BTreeSet, HashMap};
4
5use crate::error::{DbError, FormatError, SchemaError};
6use crate::schema::IndexKind;
7
/// Payload format version 1. Entries in this version carry no operation tag; the decoder
/// treats every such entry as an insert.
pub const INDEX_PAYLOAD_VERSION_V1: u16 = 1;
/// Payload format version 2. Each entry carries a one-byte operation tag right after its
/// kind tag.
pub const INDEX_PAYLOAD_VERSION_V2: u16 = 2;
/// Version written by `encode_index_payload`.
pub const INDEX_PAYLOAD_VERSION: u16 = INDEX_PAYLOAD_VERSION_V2;
11
/// Name of an index.
type IndexName = String;
/// Raw bytes of an index key.
type IndexKey = Vec<u8>;
/// Raw bytes of a primary key.
type PkKey = Vec<u8>;
/// Identifies one index as `(collection_id, index_name)`.
type IndexId = (u32, IndexName);
/// Contents of a unique index: each index key maps to a single primary key.
type UniqueIndex = HashMap<IndexKey, PkKey>;
/// Contents of a non-unique index: each index key maps to an ordered set of primary keys.
type NonUniqueIndex = HashMap<IndexKey, BTreeSet<PkKey>>;
18
/// Index delta operation.
///
/// On the wire (payload version 2) `Insert` is encoded as tag `1` and `Delete` as tag `2`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IndexOp {
    /// Add an `index_key -> pk_key` association.
    Insert,
    /// Remove an `index_key -> pk_key` association.
    Delete,
}
25
/// One index update entry (insert/update/delete).
///
/// This is both the unit that `IndexState::apply` consumes and the record that the payload
/// codec (`encode_index_payload` / `decode_index_payload`) serializes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IndexEntry {
    /// Id of the collection; together with `index_name` it identifies the index.
    pub collection_id: u32,
    /// Name of the index. The decoder rejects payloads in which this is empty.
    pub index_name: String,
    /// Whether the target index is unique or non-unique.
    pub kind: IndexKind,
    /// Whether this entry adds or removes the association.
    pub op: IndexOp,
    /// Raw bytes of the indexed key.
    pub index_key: Vec<u8>,
    /// Raw bytes of the primary key associated with `index_key`.
    pub pk_key: Vec<u8>,
}
36
/// In-memory contents of all secondary indexes, built by applying `IndexEntry` deltas.
///
/// Unique and non-unique indexes are kept in separate maps, each keyed by
/// `(collection_id, index_name)`.
#[derive(Debug, Default, Clone)]
pub struct IndexState {
    /// Unique indexes: index key -> the one primary key that owns it.
    unique: HashMap<IndexId, UniqueIndex>,
    /// Non-unique indexes: index key -> ordered set of primary keys sharing it.
    non_unique: HashMap<IndexId, NonUniqueIndex>,
}
42
43impl IndexState {
44    pub fn apply(&mut self, entry: IndexEntry) -> Result<(), DbError> {
45        match entry.kind {
46            IndexKind::Unique => {
47                let m = self
48                    .unique
49                    .entry((entry.collection_id, entry.index_name))
50                    .or_default();
51                match entry.op {
52                    IndexOp::Insert => match m.get(&entry.index_key) {
53                        None => {
54                            m.insert(entry.index_key, entry.pk_key);
55                            Ok(())
56                        }
57                        Some(existing) if *existing == entry.pk_key => Ok(()),
58                        Some(_) => Err(DbError::Schema(SchemaError::UniqueIndexViolation)),
59                    },
60                    IndexOp::Delete => match m.get(&entry.index_key) {
61                        None => Ok(()),
62                        Some(existing) if *existing == entry.pk_key => {
63                            m.remove(&entry.index_key);
64                            Ok(())
65                        }
66                        Some(_) => Ok(()),
67                    },
68                }
69            }
70            IndexKind::NonUnique => {
71                let m = self
72                    .non_unique
73                    .entry((entry.collection_id, entry.index_name))
74                    .or_default();
75                match entry.op {
76                    IndexOp::Insert => {
77                        m.entry(entry.index_key).or_default().insert(entry.pk_key);
78                    }
79                    IndexOp::Delete => {
80                        if let Some(set) = m.get_mut(&entry.index_key) {
81                            set.remove(&entry.pk_key);
82                            if set.is_empty() {
83                                m.remove(&entry.index_key);
84                            }
85                        }
86                    }
87                }
88                Ok(())
89            }
90        }
91    }
92
93    pub fn unique_lookup(
94        &self,
95        collection_id: u32,
96        index_name: &str,
97        index_key: &[u8],
98    ) -> Option<&[u8]> {
99        self.unique
100            .get(&(collection_id, index_name.to_string()))?
101            .get(index_key)
102            .map(|v| v.as_slice())
103    }
104
105    pub fn non_unique_lookup(
106        &self,
107        collection_id: u32,
108        index_name: &str,
109        index_key: &[u8],
110    ) -> Option<Vec<Vec<u8>>> {
111        let set = self
112            .non_unique
113            .get(&(collection_id, index_name.to_string()))?
114            .get(index_key)?;
115        Some(set.iter().cloned().collect())
116    }
117
118    pub(crate) fn entries_for_checkpoint(&self) -> Vec<IndexEntry> {
119        let mut out = Vec::new();
120        for ((collection_id, index_name), m) in &self.unique {
121            for (index_key, pk_key) in m {
122                out.push(IndexEntry {
123                    collection_id: *collection_id,
124                    index_name: index_name.clone(),
125                    kind: IndexKind::Unique,
126                    op: IndexOp::Insert,
127                    index_key: index_key.clone(),
128                    pk_key: pk_key.clone(),
129                });
130            }
131        }
132        for ((collection_id, index_name), m) in &self.non_unique {
133            for (index_key, set) in m {
134                for pk_key in set {
135                    out.push(IndexEntry {
136                        collection_id: *collection_id,
137                        index_name: index_name.clone(),
138                        kind: IndexKind::NonUnique,
139                        op: IndexOp::Insert,
140                        index_key: index_key.clone(),
141                        pk_key: pk_key.clone(),
142                    });
143                }
144            }
145        }
146        out
147    }
148}
149
150pub fn encode_index_payload(entries: &[IndexEntry]) -> Vec<u8> {
151    let mut out = Vec::new();
152    out.extend_from_slice(&INDEX_PAYLOAD_VERSION.to_le_bytes());
153    out.extend_from_slice(&(entries.len() as u32).to_le_bytes());
154    for e in entries {
155        out.extend_from_slice(&e.collection_id.to_le_bytes());
156        out.push(match e.kind {
157            IndexKind::Unique => 1,
158            IndexKind::NonUnique => 2,
159        });
160        out.push(match e.op {
161            IndexOp::Insert => 1,
162            IndexOp::Delete => 2,
163        });
164        encode_string(&mut out, &e.index_name);
165        encode_bytes(&mut out, &e.index_key);
166        encode_bytes(&mut out, &e.pk_key);
167    }
168    out
169}
170
171pub fn decode_index_payload(bytes: &[u8]) -> Result<Vec<IndexEntry>, DbError> {
172    let mut cur = Cursor::new(bytes);
173    let ver = cur.take_u16()?;
174    if ver != INDEX_PAYLOAD_VERSION_V1 && ver != INDEX_PAYLOAD_VERSION_V2 {
175        return Err(DbError::Format(FormatError::UnsupportedVersion {
176            major: 0,
177            minor: ver,
178        }));
179    }
180    let n = cur.take_u32()? as usize;
181    let mut v = Vec::with_capacity(n.min(1024));
182    for _ in 0..n {
183        let collection_id = cur.take_u32()?;
184        let kind_tag = cur.take_u8()?;
185        let kind = match kind_tag {
186            1 => IndexKind::Unique,
187            2 => IndexKind::NonUnique,
188            _ => {
189                return Err(DbError::Format(FormatError::InvalidCatalogPayload {
190                    message: format!("unknown index kind tag {kind_tag}"),
191                }))
192            }
193        };
194        let op = if ver >= INDEX_PAYLOAD_VERSION_V2 {
195            let op_tag = cur.take_u8()?;
196            match op_tag {
197                1 => IndexOp::Insert,
198                2 => IndexOp::Delete,
199                _ => {
200                    return Err(DbError::Format(FormatError::InvalidCatalogPayload {
201                        message: format!("unknown index op tag {op_tag}"),
202                    }))
203                }
204            }
205        } else {
206            IndexOp::Insert
207        };
208        let index_name = decode_string(&mut cur)?;
209        let index_key = decode_bytes(&mut cur)?;
210        let pk_key = decode_bytes(&mut cur)?;
211        v.push(IndexEntry {
212            collection_id,
213            index_name,
214            kind,
215            op,
216            index_key,
217            pk_key,
218        });
219    }
220    if cur.remaining() != 0 {
221        return Err(DbError::Format(FormatError::InvalidCatalogPayload {
222            message: "trailing bytes in index payload".to_string(),
223        }));
224    }
225    Ok(v)
226}
227
/// Appends `s` to `out` as a little-endian `u32` byte length followed by its UTF-8 bytes.
///
/// The length is narrowed with an `as u32` cast, so a string longer than `u32::MAX` bytes
/// would get a truncated length prefix.
fn encode_string(out: &mut Vec<u8>, s: &str) {
    let len_prefix = (s.len() as u32).to_le_bytes();
    out.extend_from_slice(&len_prefix);
    out.extend_from_slice(s.as_bytes());
}
233
234fn decode_string(cur: &mut Cursor<'_>) -> Result<String, DbError> {
235    let n = cur.take_u32()? as usize;
236    if n == 0 {
237        return Err(DbError::Format(FormatError::InvalidCatalogPayload {
238            message: "empty index name".to_string(),
239        }));
240    }
241    let bytes = cur.take_bytes(n)?;
242    String::from_utf8(bytes).map_err(|_| {
243        DbError::Format(FormatError::InvalidCatalogPayload {
244            message: "invalid utf-8 in index name".to_string(),
245        })
246    })
247}
248
/// Appends `b` to `out` as a little-endian `u32` length followed by the bytes themselves.
///
/// The length is narrowed with an `as u32` cast, so a slice longer than `u32::MAX` bytes
/// would get a truncated length prefix.
fn encode_bytes(out: &mut Vec<u8>, b: &[u8]) {
    let len_prefix = (b.len() as u32).to_le_bytes();
    out.extend_from_slice(&len_prefix);
    out.extend_from_slice(b);
}
253
254fn decode_bytes(cur: &mut Cursor<'_>) -> Result<Vec<u8>, DbError> {
255    let n = cur.take_u32()? as usize;
256    cur.take_bytes(n)
257}
258
259struct Cursor<'a> {
260    bytes: &'a [u8],
261    pos: usize,
262}
263
264impl<'a> Cursor<'a> {
265    fn new(bytes: &'a [u8]) -> Self {
266        Self { bytes, pos: 0 }
267    }
268
269    fn remaining(&self) -> usize {
270        self.bytes.len().saturating_sub(self.pos)
271    }
272
273    fn take_u8(&mut self) -> Result<u8, DbError> {
274        if self.pos >= self.bytes.len() {
275            return Err(DbError::Format(FormatError::InvalidCatalogPayload {
276                message: "unexpected eof".to_string(),
277            }));
278        }
279        let b = self.bytes[self.pos];
280        self.pos += 1;
281        Ok(b)
282    }
283
284    fn take_u16(&mut self) -> Result<u16, DbError> {
285        if self.remaining() < 2 {
286            return Err(DbError::Format(FormatError::InvalidCatalogPayload {
287                message: "unexpected eof".to_string(),
288            }));
289        }
290        let v = u16::from_le_bytes([self.bytes[self.pos], self.bytes[self.pos + 1]]);
291        self.pos += 2;
292        Ok(v)
293    }
294
295    fn take_u32(&mut self) -> Result<u32, DbError> {
296        if self.remaining() < 4 {
297            return Err(DbError::Format(FormatError::InvalidCatalogPayload {
298                message: "unexpected eof".to_string(),
299            }));
300        }
301        let v = u32::from_le_bytes([
302            self.bytes[self.pos],
303            self.bytes[self.pos + 1],
304            self.bytes[self.pos + 2],
305            self.bytes[self.pos + 3],
306        ]);
307        self.pos += 4;
308        Ok(v)
309    }
310
311    fn take_bytes(&mut self, n: usize) -> Result<Vec<u8>, DbError> {
312        if self.remaining() < n {
313            return Err(DbError::Format(FormatError::InvalidCatalogPayload {
314                message: "unexpected eof".to_string(),
315            }));
316        }
317        let slice = &self.bytes[self.pos..self.pos + n];
318        self.pos += n;
319        Ok(slice.to_vec())
320    }
321}
322
// Unit tests are kept in a separate file under the crate's `tests/unit/` directory and are
// textually included here, so they form the body of this module and are only compiled
// when `cfg(test)` is active.
#[cfg(test)]
mod tests {
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_index_tests.rs"
    ));
}