1use std::collections::{BTreeSet, HashMap};
4
5use crate::error::{DbError, FormatError, SchemaError};
6use crate::file_format::{check_decode_entry_count, MAX_SEGMENT_DECODE_ENTRIES};
7use crate::schema::IndexKind;
8
/// First index payload format: entries carry no operation tag, so every
/// decoded entry is treated as an insert.
pub const INDEX_PAYLOAD_VERSION_V1: u16 = 1;

/// Second index payload format: each entry carries an explicit operation tag
/// (insert or delete) after its kind tag.
pub const INDEX_PAYLOAD_VERSION_V2: u16 = 2;

/// Payload version written by the encoder; the decoder accepts both V1 and V2.
pub const INDEX_PAYLOAD_VERSION: u16 = INDEX_PAYLOAD_VERSION_V2;
12
/// Name of an index within a collection.
type IndexName = String;

/// Raw bytes of an index key.
type IndexKey = Vec<u8>;

/// Raw bytes of the key an index entry points at (presumably the record's
/// primary key — confirm against callers).
type PkKey = Vec<u8>;

/// Identifies one index: `(collection id, index name)`.
type IndexId = (u32, IndexName);

/// Contents of a unique index: each index key maps to exactly one pk key.
type UniqueIndex = HashMap<IndexKey, PkKey>;

/// Contents of a non-unique index: each index key maps to an ordered set of
/// pk keys.
type NonUniqueIndex = HashMap<IndexKey, BTreeSet<PkKey>>;
19
/// Mutation recorded by an index entry.
///
/// In the V2 payload format `Insert` is written as tag `1` and `Delete` as
/// tag `2`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum IndexOp {
    /// Add the `(index key, pk key)` association to the index.
    Insert,
    /// Remove the `(index key, pk key)` association from the index.
    Delete,
}
26
27#[derive(Debug, Clone, PartialEq, Eq)]
29pub struct IndexEntry {
30 pub collection_id: u32,
31 pub index_name: String,
32 pub kind: IndexKind,
33 pub op: IndexOp,
34 pub index_key: Vec<u8>,
35 pub pk_key: Vec<u8>,
36}
37
38#[derive(Debug, Default, Clone)]
39pub struct IndexState {
40 unique: HashMap<IndexId, UniqueIndex>,
41 non_unique: HashMap<IndexId, NonUniqueIndex>,
42}
43
44impl IndexState {
45 pub fn apply(&mut self, entry: IndexEntry) -> Result<(), DbError> {
46 match entry.kind {
47 IndexKind::Unique => {
48 let m = self
49 .unique
50 .entry((entry.collection_id, entry.index_name))
51 .or_default();
52 match entry.op {
53 IndexOp::Insert => match m.get(&entry.index_key) {
54 None => {
55 m.insert(entry.index_key, entry.pk_key);
56 Ok(())
57 }
58 Some(existing) if *existing == entry.pk_key => Ok(()),
59 Some(_) => Err(DbError::Schema(SchemaError::UniqueIndexViolation)),
60 },
61 IndexOp::Delete => match m.get(&entry.index_key) {
62 None => Ok(()),
63 Some(existing) if *existing == entry.pk_key => {
64 m.remove(&entry.index_key);
65 Ok(())
66 }
67 Some(_) => Ok(()),
68 },
69 }
70 }
71 IndexKind::NonUnique => {
72 let m = self
73 .non_unique
74 .entry((entry.collection_id, entry.index_name))
75 .or_default();
76 match entry.op {
77 IndexOp::Insert => {
78 m.entry(entry.index_key).or_default().insert(entry.pk_key);
79 }
80 IndexOp::Delete => {
81 if let Some(set) = m.get_mut(&entry.index_key) {
82 set.remove(&entry.pk_key);
83 if set.is_empty() {
84 m.remove(&entry.index_key);
85 }
86 }
87 }
88 }
89 Ok(())
90 }
91 }
92 }
93
94 pub fn unique_lookup(
95 &self,
96 collection_id: u32,
97 index_name: &str,
98 index_key: &[u8],
99 ) -> Option<&[u8]> {
100 self.unique
101 .get(&(collection_id, index_name.to_string()))?
102 .get(index_key)
103 .map(|v| v.as_slice())
104 }
105
106 pub fn non_unique_lookup(
107 &self,
108 collection_id: u32,
109 index_name: &str,
110 index_key: &[u8],
111 ) -> Option<Vec<Vec<u8>>> {
112 let set = self
113 .non_unique
114 .get(&(collection_id, index_name.to_string()))?
115 .get(index_key)?;
116 Some(set.iter().cloned().collect())
117 }
118
119 pub(crate) fn entries_for_checkpoint(&self) -> Vec<IndexEntry> {
120 let mut out = Vec::new();
121 for ((collection_id, index_name), m) in &self.unique {
122 for (index_key, pk_key) in m {
123 out.push(IndexEntry {
124 collection_id: *collection_id,
125 index_name: index_name.clone(),
126 kind: IndexKind::Unique,
127 op: IndexOp::Insert,
128 index_key: index_key.clone(),
129 pk_key: pk_key.clone(),
130 });
131 }
132 }
133 for ((collection_id, index_name), m) in &self.non_unique {
134 for (index_key, set) in m {
135 for pk_key in set {
136 out.push(IndexEntry {
137 collection_id: *collection_id,
138 index_name: index_name.clone(),
139 kind: IndexKind::NonUnique,
140 op: IndexOp::Insert,
141 index_key: index_key.clone(),
142 pk_key: pk_key.clone(),
143 });
144 }
145 }
146 }
147 out
148 }
149}
150
151pub fn encode_index_payload(entries: &[IndexEntry]) -> Vec<u8> {
152 let mut out = Vec::new();
153 out.extend_from_slice(&INDEX_PAYLOAD_VERSION.to_le_bytes());
154 out.extend_from_slice(&(entries.len() as u32).to_le_bytes());
155 for e in entries {
156 out.extend_from_slice(&e.collection_id.to_le_bytes());
157 out.push(match e.kind {
158 IndexKind::Unique => 1,
159 IndexKind::NonUnique => 2,
160 });
161 out.push(match e.op {
162 IndexOp::Insert => 1,
163 IndexOp::Delete => 2,
164 });
165 encode_string(&mut out, &e.index_name);
166 encode_bytes(&mut out, &e.index_key);
167 encode_bytes(&mut out, &e.pk_key);
168 }
169 out
170}
171
172pub fn decode_index_payload(bytes: &[u8]) -> Result<Vec<IndexEntry>, DbError> {
173 let mut cur = Cursor::new(bytes);
174 let ver = cur.take_u16()?;
175 if ver != INDEX_PAYLOAD_VERSION_V1 && ver != INDEX_PAYLOAD_VERSION_V2 {
176 return Err(DbError::Format(FormatError::UnsupportedVersion {
177 major: 0,
178 minor: ver,
179 }));
180 }
181 let n = cur.take_u32()? as usize;
182 check_decode_entry_count(n)?;
183 let mut v = Vec::with_capacity(n.min(MAX_SEGMENT_DECODE_ENTRIES));
184 for _ in 0..n {
185 let collection_id = cur.take_u32()?;
186 let kind_tag = cur.take_u8()?;
187 let kind = match kind_tag {
188 1 => IndexKind::Unique,
189 2 => IndexKind::NonUnique,
190 _ => {
191 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
192 message: format!("unknown index kind tag {kind_tag}"),
193 }))
194 }
195 };
196 let op = if ver >= INDEX_PAYLOAD_VERSION_V2 {
197 let op_tag = cur.take_u8()?;
198 match op_tag {
199 1 => IndexOp::Insert,
200 2 => IndexOp::Delete,
201 _ => {
202 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
203 message: format!("unknown index op tag {op_tag}"),
204 }))
205 }
206 }
207 } else {
208 IndexOp::Insert
209 };
210 let index_name = decode_string(&mut cur)?;
211 let index_key = decode_bytes(&mut cur)?;
212 let pk_key = decode_bytes(&mut cur)?;
213 v.push(IndexEntry {
214 collection_id,
215 index_name,
216 kind,
217 op,
218 index_key,
219 pk_key,
220 });
221 }
222 if cur.remaining() != 0 {
223 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
224 message: "trailing bytes in index payload".to_string(),
225 }));
226 }
227 Ok(v)
228}
229
/// Appends `s` to `out` as a little-endian `u32` byte length followed by its
/// UTF-8 bytes.
fn encode_string(out: &mut Vec<u8>, s: &str) {
    // `str::len` is the byte length, which is what the prefix records.
    let len_prefix = (s.len() as u32).to_le_bytes();
    out.extend_from_slice(&len_prefix);
    out.extend_from_slice(s.as_bytes());
}
235
236fn decode_string(cur: &mut Cursor<'_>) -> Result<String, DbError> {
237 let n = cur.take_u32()? as usize;
238 if n == 0 {
239 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
240 message: "empty index name".to_string(),
241 }));
242 }
243 let bytes = cur.take_bytes(n)?;
244 String::from_utf8(bytes).map_err(|_| {
245 DbError::Format(FormatError::InvalidCatalogPayload {
246 message: "invalid utf-8 in index name".to_string(),
247 })
248 })
249}
250
/// Appends `b` to `out` as a little-endian `u32` length followed by the raw
/// bytes.
fn encode_bytes(out: &mut Vec<u8>, b: &[u8]) {
    let len_prefix = (b.len() as u32).to_le_bytes();
    out.extend_from_slice(&len_prefix);
    out.extend_from_slice(b);
}
255
256fn decode_bytes(cur: &mut Cursor<'_>) -> Result<Vec<u8>, DbError> {
257 let n = cur.take_u32()? as usize;
258 cur.take_bytes(n)
259}
260
/// Forward-only reader over a borrowed byte slice.
struct Cursor<'a> {
    /// The complete input being read.
    bytes: &'a [u8],
    /// Offset of the next unread byte in `bytes`.
    pos: usize,
}
265
266impl<'a> Cursor<'a> {
267 fn new(bytes: &'a [u8]) -> Self {
268 Self { bytes, pos: 0 }
269 }
270
271 fn remaining(&self) -> usize {
272 self.bytes.len().saturating_sub(self.pos)
273 }
274
275 fn take_u8(&mut self) -> Result<u8, DbError> {
276 if self.pos >= self.bytes.len() {
277 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
278 message: "unexpected eof".to_string(),
279 }));
280 }
281 let b = self.bytes[self.pos];
282 self.pos += 1;
283 Ok(b)
284 }
285
286 fn take_u16(&mut self) -> Result<u16, DbError> {
287 if self.remaining() < 2 {
288 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
289 message: "unexpected eof".to_string(),
290 }));
291 }
292 let v = u16::from_le_bytes([self.bytes[self.pos], self.bytes[self.pos + 1]]);
293 self.pos += 2;
294 Ok(v)
295 }
296
297 fn take_u32(&mut self) -> Result<u32, DbError> {
298 if self.remaining() < 4 {
299 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
300 message: "unexpected eof".to_string(),
301 }));
302 }
303 let v = u32::from_le_bytes([
304 self.bytes[self.pos],
305 self.bytes[self.pos + 1],
306 self.bytes[self.pos + 2],
307 self.bytes[self.pos + 3],
308 ]);
309 self.pos += 4;
310 Ok(v)
311 }
312
313 fn take_bytes(&mut self, n: usize) -> Result<Vec<u8>, DbError> {
314 if self.remaining() < n {
315 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
316 message: "unexpected eof".to_string(),
317 }));
318 }
319 let slice = &self.bytes[self.pos..self.pos + n];
320 self.pos += n;
321 Ok(slice.to_vec())
322 }
323}
324
// The unit tests live in a separate file under the crate root; they are
// textually included here so they can reach this module's private items.
#[cfg(test)]
mod tests {
    include!(concat!(env!("CARGO_MANIFEST_DIR"), "/tests/unit/src_index_tests.rs"));
}