Skip to main content

modelvault_core/
index.rs

1//! Persisted secondary index segments: payload codec and in-memory replay state.
2
3use std::collections::{BTreeSet, HashMap};
4
5use crate::error::{DbError, FormatError, SchemaError};
6use crate::file_format::{check_decode_entry_count, MAX_SEGMENT_DECODE_ENTRIES};
7use crate::schema::IndexKind;
8
/// Payload layout without a per-entry op byte; every entry decodes as an insert.
pub const INDEX_PAYLOAD_VERSION_V1: u16 = 1;
/// Payload layout with a per-entry op byte (insert or delete).
pub const INDEX_PAYLOAD_VERSION_V2: u16 = 2;
/// Version tag written by [`encode_index_payload`].
pub const INDEX_PAYLOAD_VERSION: u16 = INDEX_PAYLOAD_VERSION_V2;

// Raw byte / name components of an index binding.
/// Name of an index within its collection.
type IndexName = String;
/// Secondary-index key bytes.
type IndexKey = Vec<u8>;
/// Primary-key bytes an index key points at.
type PkKey = Vec<u8>;
/// Identity of one index: `(collection_id, index_name)`.
type IndexId = (u32, IndexName);

// Per-index contents.
/// Unique index: exactly one primary key per index key.
type UniqueIndex = HashMap<IndexKey, PkKey>;
/// Non-unique index: an ordered set of primary keys per index key.
type NonUniqueIndex = HashMap<IndexKey, BTreeSet<PkKey>>;
19
/// Direction of an index delta: add or remove an `index_key -> pk_key` binding.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum IndexOp {
    /// Add the binding (wire tag `1`).
    Insert,
    /// Remove the binding (wire tag `2`).
    Delete,
}
26
27/// One index update entry (insert/update/delete).
28#[derive(Debug, Clone, PartialEq, Eq)]
29pub struct IndexEntry {
30    pub collection_id: u32,
31    pub index_name: String,
32    pub kind: IndexKind,
33    pub op: IndexOp,
34    pub index_key: Vec<u8>,
35    pub pk_key: Vec<u8>,
36}
37
38#[derive(Debug, Default, Clone)]
39pub struct IndexState {
40    unique: HashMap<IndexId, UniqueIndex>,
41    non_unique: HashMap<IndexId, NonUniqueIndex>,
42}
43
44impl IndexState {
45    pub fn apply(&mut self, entry: IndexEntry) -> Result<(), DbError> {
46        match entry.kind {
47            IndexKind::Unique => {
48                let m = self
49                    .unique
50                    .entry((entry.collection_id, entry.index_name))
51                    .or_default();
52                match entry.op {
53                    IndexOp::Insert => match m.get(&entry.index_key) {
54                        None => {
55                            m.insert(entry.index_key, entry.pk_key);
56                            Ok(())
57                        }
58                        Some(existing) if *existing == entry.pk_key => Ok(()),
59                        Some(_) => Err(DbError::Schema(SchemaError::UniqueIndexViolation)),
60                    },
61                    IndexOp::Delete => match m.get(&entry.index_key) {
62                        None => Ok(()),
63                        Some(existing) if *existing == entry.pk_key => {
64                            m.remove(&entry.index_key);
65                            Ok(())
66                        }
67                        Some(_) => Ok(()),
68                    },
69                }
70            }
71            IndexKind::NonUnique => {
72                let m = self
73                    .non_unique
74                    .entry((entry.collection_id, entry.index_name))
75                    .or_default();
76                match entry.op {
77                    IndexOp::Insert => {
78                        m.entry(entry.index_key).or_default().insert(entry.pk_key);
79                    }
80                    IndexOp::Delete => {
81                        if let Some(set) = m.get_mut(&entry.index_key) {
82                            set.remove(&entry.pk_key);
83                            if set.is_empty() {
84                                m.remove(&entry.index_key);
85                            }
86                        }
87                    }
88                }
89                Ok(())
90            }
91        }
92    }
93
94    pub fn unique_lookup(
95        &self,
96        collection_id: u32,
97        index_name: &str,
98        index_key: &[u8],
99    ) -> Option<&[u8]> {
100        self.unique
101            .get(&(collection_id, index_name.to_string()))?
102            .get(index_key)
103            .map(|v| v.as_slice())
104    }
105
106    pub fn non_unique_lookup(
107        &self,
108        collection_id: u32,
109        index_name: &str,
110        index_key: &[u8],
111    ) -> Option<Vec<Vec<u8>>> {
112        let set = self
113            .non_unique
114            .get(&(collection_id, index_name.to_string()))?
115            .get(index_key)?;
116        Some(set.iter().cloned().collect())
117    }
118
119    pub(crate) fn entries_for_checkpoint(&self) -> Vec<IndexEntry> {
120        let mut out = Vec::new();
121        for ((collection_id, index_name), m) in &self.unique {
122            for (index_key, pk_key) in m {
123                out.push(IndexEntry {
124                    collection_id: *collection_id,
125                    index_name: index_name.clone(),
126                    kind: IndexKind::Unique,
127                    op: IndexOp::Insert,
128                    index_key: index_key.clone(),
129                    pk_key: pk_key.clone(),
130                });
131            }
132        }
133        for ((collection_id, index_name), m) in &self.non_unique {
134            for (index_key, set) in m {
135                for pk_key in set {
136                    out.push(IndexEntry {
137                        collection_id: *collection_id,
138                        index_name: index_name.clone(),
139                        kind: IndexKind::NonUnique,
140                        op: IndexOp::Insert,
141                        index_key: index_key.clone(),
142                        pk_key: pk_key.clone(),
143                    });
144                }
145            }
146        }
147        out
148    }
149}
150
151pub fn encode_index_payload(entries: &[IndexEntry]) -> Vec<u8> {
152    let mut out = Vec::new();
153    out.extend_from_slice(&INDEX_PAYLOAD_VERSION.to_le_bytes());
154    out.extend_from_slice(&(entries.len() as u32).to_le_bytes());
155    for e in entries {
156        out.extend_from_slice(&e.collection_id.to_le_bytes());
157        out.push(match e.kind {
158            IndexKind::Unique => 1,
159            IndexKind::NonUnique => 2,
160        });
161        out.push(match e.op {
162            IndexOp::Insert => 1,
163            IndexOp::Delete => 2,
164        });
165        encode_string(&mut out, &e.index_name);
166        encode_bytes(&mut out, &e.index_key);
167        encode_bytes(&mut out, &e.pk_key);
168    }
169    out
170}
171
172pub fn decode_index_payload(bytes: &[u8]) -> Result<Vec<IndexEntry>, DbError> {
173    let mut cur = Cursor::new(bytes);
174    let ver = cur.take_u16()?;
175    if ver != INDEX_PAYLOAD_VERSION_V1 && ver != INDEX_PAYLOAD_VERSION_V2 {
176        return Err(DbError::Format(FormatError::UnsupportedVersion {
177            major: 0,
178            minor: ver,
179        }));
180    }
181    let n = cur.take_u32()? as usize;
182    check_decode_entry_count(n)?;
183    let mut v = Vec::with_capacity(n.min(MAX_SEGMENT_DECODE_ENTRIES));
184    for _ in 0..n {
185        let collection_id = cur.take_u32()?;
186        let kind_tag = cur.take_u8()?;
187        let kind = match kind_tag {
188            1 => IndexKind::Unique,
189            2 => IndexKind::NonUnique,
190            _ => {
191                return Err(DbError::Format(FormatError::InvalidCatalogPayload {
192                    message: format!("unknown index kind tag {kind_tag}"),
193                }))
194            }
195        };
196        let op = if ver >= INDEX_PAYLOAD_VERSION_V2 {
197            let op_tag = cur.take_u8()?;
198            match op_tag {
199                1 => IndexOp::Insert,
200                2 => IndexOp::Delete,
201                _ => {
202                    return Err(DbError::Format(FormatError::InvalidCatalogPayload {
203                        message: format!("unknown index op tag {op_tag}"),
204                    }))
205                }
206            }
207        } else {
208            IndexOp::Insert
209        };
210        let index_name = decode_string(&mut cur)?;
211        let index_key = decode_bytes(&mut cur)?;
212        let pk_key = decode_bytes(&mut cur)?;
213        v.push(IndexEntry {
214            collection_id,
215            index_name,
216            kind,
217            op,
218            index_key,
219            pk_key,
220        });
221    }
222    if cur.remaining() != 0 {
223        return Err(DbError::Format(FormatError::InvalidCatalogPayload {
224            message: "trailing bytes in index payload".to_string(),
225        }));
226    }
227    Ok(v)
228}
229
/// Appends `s` as a little-endian `u32` byte length followed by its UTF-8 bytes.
///
/// # Panics
///
/// Panics if `s` is longer than `u32::MAX` bytes. The length prefix could not
/// represent it, and a silently truncated prefix would corrupt the payload.
fn encode_string(out: &mut Vec<u8>, s: &str) {
    let b = s.as_bytes();
    assert!(
        b.len() <= u32::MAX as usize,
        "string longer than u32::MAX bytes cannot be length-prefixed"
    );
    out.reserve(4 + b.len());
    out.extend_from_slice(&(b.len() as u32).to_le_bytes());
    out.extend_from_slice(b);
}
235
236fn decode_string(cur: &mut Cursor<'_>) -> Result<String, DbError> {
237    let n = cur.take_u32()? as usize;
238    if n == 0 {
239        return Err(DbError::Format(FormatError::InvalidCatalogPayload {
240            message: "empty index name".to_string(),
241        }));
242    }
243    let bytes = cur.take_bytes(n)?;
244    String::from_utf8(bytes).map_err(|_| {
245        DbError::Format(FormatError::InvalidCatalogPayload {
246            message: "invalid utf-8 in index name".to_string(),
247        })
248    })
249}
250
/// Appends `b` as a little-endian `u32` length followed by the raw bytes.
///
/// # Panics
///
/// Panics if `b` is longer than `u32::MAX` bytes. The length prefix could not
/// represent it, and a silently truncated prefix would corrupt the payload.
fn encode_bytes(out: &mut Vec<u8>, b: &[u8]) {
    assert!(
        b.len() <= u32::MAX as usize,
        "byte string longer than u32::MAX bytes cannot be length-prefixed"
    );
    out.reserve(4 + b.len());
    out.extend_from_slice(&(b.len() as u32).to_le_bytes());
    out.extend_from_slice(b);
}
255
256fn decode_bytes(cur: &mut Cursor<'_>) -> Result<Vec<u8>, DbError> {
257    let n = cur.take_u32()? as usize;
258    cur.take_bytes(n)
259}
260
261struct Cursor<'a> {
262    bytes: &'a [u8],
263    pos: usize,
264}
265
266impl<'a> Cursor<'a> {
267    fn new(bytes: &'a [u8]) -> Self {
268        Self { bytes, pos: 0 }
269    }
270
271    fn remaining(&self) -> usize {
272        self.bytes.len().saturating_sub(self.pos)
273    }
274
275    fn take_u8(&mut self) -> Result<u8, DbError> {
276        if self.pos >= self.bytes.len() {
277            return Err(DbError::Format(FormatError::InvalidCatalogPayload {
278                message: "unexpected eof".to_string(),
279            }));
280        }
281        let b = self.bytes[self.pos];
282        self.pos += 1;
283        Ok(b)
284    }
285
286    fn take_u16(&mut self) -> Result<u16, DbError> {
287        if self.remaining() < 2 {
288            return Err(DbError::Format(FormatError::InvalidCatalogPayload {
289                message: "unexpected eof".to_string(),
290            }));
291        }
292        let v = u16::from_le_bytes([self.bytes[self.pos], self.bytes[self.pos + 1]]);
293        self.pos += 2;
294        Ok(v)
295    }
296
297    fn take_u32(&mut self) -> Result<u32, DbError> {
298        if self.remaining() < 4 {
299            return Err(DbError::Format(FormatError::InvalidCatalogPayload {
300                message: "unexpected eof".to_string(),
301            }));
302        }
303        let v = u32::from_le_bytes([
304            self.bytes[self.pos],
305            self.bytes[self.pos + 1],
306            self.bytes[self.pos + 2],
307            self.bytes[self.pos + 3],
308        ]);
309        self.pos += 4;
310        Ok(v)
311    }
312
313    fn take_bytes(&mut self, n: usize) -> Result<Vec<u8>, DbError> {
314        if self.remaining() < n {
315            return Err(DbError::Format(FormatError::InvalidCatalogPayload {
316                message: "unexpected eof".to_string(),
317            }));
318        }
319        let slice = &self.bytes[self.pos..self.pos + n];
320        self.pos += n;
321        Ok(slice.to_vec())
322    }
323}
324
// Unit tests live in a separate file under `tests/unit/`. They are textually
// included into this child module, which gives them access to the parent
// module's private items.
#[cfg(test)]
mod tests {
    include!(concat!(env!("CARGO_MANIFEST_DIR"), "/tests/unit/src_index_tests.rs"));
}