1use std::collections::{BTreeSet, HashMap};
4
5use std::cmp::Ordering;
6
7use crate::error::{DbError, FormatError, SchemaError};
8use crate::file_format::{
9 check_decode_entry_count, check_field_bytes_len, MAX_SEGMENT_DECODE_ENTRIES,
10};
11use crate::schema::IndexKind;
12use crate::ScalarValue;
13
/// Legacy index payload format: entries carry no op byte and decode as inserts.
pub const INDEX_PAYLOAD_VERSION_V1: u16 = 1;
/// Index payload format that adds a per-entry op byte (insert / delete).
pub const INDEX_PAYLOAD_VERSION_V2: u16 = 2;
/// Version written by `encode_index_payload`.
pub const INDEX_PAYLOAD_VERSION: u16 = INDEX_PAYLOAD_VERSION_V2;

/// Name of an index; paired with a collection id to identify the index.
type IndexName = String;
/// Raw index key bytes.
type IndexKey = Vec<u8>;
/// Raw bytes stored as an index entry's `pk_key`.
type PkKey = Vec<u8>;
/// Identifies one index: `(collection_id, index_name)`.
type IndexId = (u32, IndexName);
/// Unique index contents: each index key maps to exactly one `pk_key`.
type UniqueIndex = HashMap<IndexKey, PkKey>;
/// Non-unique index contents: each index key maps to the ordered set of `pk_key`s sharing it.
type NonUniqueIndex = HashMap<IndexKey, BTreeSet<PkKey>>;
24
/// Mutation carried by an [`IndexEntry`].
///
/// Encoded on the wire as tag `1` (`Insert`) or `2` (`Delete`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IndexOp {
    /// Add the `(index_key, pk_key)` association.
    Insert,
    /// Remove the `(index_key, pk_key)` association.
    Delete,
}
31
/// One insert or delete against an index, as applied by `IndexState::apply`
/// and serialized by `encode_index_payload`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IndexEntry {
    /// Collection that owns the index.
    pub collection_id: u32,
    /// Together with `collection_id`, identifies the target index.
    pub index_name: String,
    /// Whether the target index is unique or non-unique.
    pub kind: IndexKind,
    /// Whether the association is added or removed.
    pub op: IndexOp,
    /// Raw index key bytes.
    pub index_key: Vec<u8>,
    /// Raw bytes the index key maps to.
    pub pk_key: Vec<u8>,
}
42
/// In-memory contents of all indexes, populated through `IndexState::apply`.
#[derive(Debug, Default, Clone)]
pub struct IndexState {
    /// Unique indexes, keyed by `(collection_id, index_name)`.
    unique: HashMap<IndexId, UniqueIndex>,
    /// Non-unique indexes, keyed by `(collection_id, index_name)`.
    non_unique: HashMap<IndexId, NonUniqueIndex>,
}
48
49impl IndexState {
50 pub fn apply(&mut self, entry: IndexEntry) -> Result<(), DbError> {
51 match entry.kind {
52 IndexKind::Unique => {
53 let m = self
54 .unique
55 .entry((entry.collection_id, entry.index_name))
56 .or_default();
57 match entry.op {
58 IndexOp::Insert => match m.get(&entry.index_key) {
59 None => {
60 m.insert(entry.index_key, entry.pk_key);
61 Ok(())
62 }
63 Some(existing) if *existing == entry.pk_key => Ok(()),
64 Some(_) => Err(DbError::Schema(SchemaError::UniqueIndexViolation)),
65 },
66 IndexOp::Delete => match m.get(&entry.index_key) {
67 None => Ok(()),
68 Some(existing) if *existing == entry.pk_key => {
69 m.remove(&entry.index_key);
70 Ok(())
71 }
72 Some(_) => Err(DbError::Format(FormatError::InvalidCatalogPayload {
73 message: "unique index delete pk_key mismatch".into(),
74 })),
75 },
76 }
77 }
78 IndexKind::NonUnique => {
79 let m = self
80 .non_unique
81 .entry((entry.collection_id, entry.index_name))
82 .or_default();
83 match entry.op {
84 IndexOp::Insert => {
85 m.entry(entry.index_key).or_default().insert(entry.pk_key);
86 }
87 IndexOp::Delete => {
88 if let Some(set) = m.get_mut(&entry.index_key) {
89 set.remove(&entry.pk_key);
90 if set.is_empty() {
91 m.remove(&entry.index_key);
92 }
93 }
94 }
95 }
96 Ok(())
97 }
98 }
99 }
100
101 pub fn unique_lookup(
102 &self,
103 collection_id: u32,
104 index_name: &str,
105 index_key: &[u8],
106 ) -> Option<&[u8]> {
107 self.unique
108 .get(&(collection_id, index_name.to_string()))?
109 .get(index_key)
110 .map(|v| v.as_slice())
111 }
112
113 pub fn non_unique_lookup(
114 &self,
115 collection_id: u32,
116 index_name: &str,
117 index_key: &[u8],
118 ) -> Option<Vec<Vec<u8>>> {
119 let set = self
120 .non_unique
121 .get(&(collection_id, index_name.to_string()))?
122 .get(index_key)?;
123 Some(set.iter().cloned().collect())
124 }
125
126 pub fn non_unique_range_lookup(
128 &self,
129 collection_id: u32,
130 index_name: &str,
131 lo: Option<&ScalarValue>,
132 lo_inclusive: bool,
133 hi: Option<&ScalarValue>,
134 hi_inclusive: bool,
135 ) -> Vec<Vec<u8>> {
136 let key_hint = lo.or(hi);
137 let Some(m) = self
138 .non_unique
139 .get(&(collection_id, index_name.to_string()))
140 else {
141 return Vec::new();
142 };
143 let mut out = Vec::new();
144 for (index_key, set) in m {
145 if !index_key_in_range(index_key, key_hint, lo, lo_inclusive, hi, hi_inclusive) {
146 continue;
147 }
148 out.extend(set.iter().cloned());
149 }
150 out
151 }
152
153 pub fn unique_range_lookup(
155 &self,
156 collection_id: u32,
157 index_name: &str,
158 lo: Option<&ScalarValue>,
159 lo_inclusive: bool,
160 hi: Option<&ScalarValue>,
161 hi_inclusive: bool,
162 ) -> Vec<Vec<u8>> {
163 let key_hint = lo.or(hi);
164 let Some(m) = self.unique.get(&(collection_id, index_name.to_string())) else {
165 return Vec::new();
166 };
167 m.iter()
168 .filter(|(index_key, _)| {
169 index_key_in_range(index_key, key_hint, lo, lo_inclusive, hi, hi_inclusive)
170 })
171 .map(|(_, pk)| pk.clone())
172 .collect()
173 }
174
175 pub(crate) fn entries_for_checkpoint(&self) -> Vec<IndexEntry> {
176 let mut out = Vec::new();
177 for ((collection_id, index_name), m) in &self.unique {
178 for (index_key, pk_key) in m {
179 out.push(IndexEntry {
180 collection_id: *collection_id,
181 index_name: index_name.clone(),
182 kind: IndexKind::Unique,
183 op: IndexOp::Insert,
184 index_key: index_key.clone(),
185 pk_key: pk_key.clone(),
186 });
187 }
188 }
189 for ((collection_id, index_name), m) in &self.non_unique {
190 for (index_key, set) in m {
191 for pk_key in set {
192 out.push(IndexEntry {
193 collection_id: *collection_id,
194 index_name: index_name.clone(),
195 kind: IndexKind::NonUnique,
196 op: IndexOp::Insert,
197 index_key: index_key.clone(),
198 pk_key: pk_key.clone(),
199 });
200 }
201 }
202 }
203 out
204 }
205}
206
207fn decode_index_key_scalar(key: &[u8], hint: Option<&ScalarValue>) -> Option<ScalarValue> {
208 match key.len() {
209 8 => {
210 let bytes: [u8; 8] = key.try_into().ok()?;
211 Some(match hint {
212 Some(ScalarValue::Uint64(_)) => ScalarValue::Uint64(u64::from_le_bytes(bytes)),
213 Some(ScalarValue::Int64(_)) => ScalarValue::Int64(i64::from_le_bytes(bytes)),
214 Some(ScalarValue::Float64(_)) => ScalarValue::Float64(f64::from_le_bytes(bytes)),
215 Some(ScalarValue::Timestamp(_)) => {
216 ScalarValue::Timestamp(i64::from_le_bytes(bytes))
217 }
218 _ => ScalarValue::Int64(i64::from_le_bytes(bytes)),
219 })
220 }
221 n if n > 0 => String::from_utf8(key.to_vec())
222 .ok()
223 .map(ScalarValue::String),
224 _ => None,
225 }
226}
227
228fn scalar_partial_cmp(a: &ScalarValue, b: &ScalarValue) -> Option<Ordering> {
229 use ScalarValue::*;
230 match (a, b) {
231 (Bool(x), Bool(y)) => Some(x.cmp(y)),
232 (Int64(x), Int64(y)) => Some(x.cmp(y)),
233 (Uint64(x), Uint64(y)) => Some(x.cmp(y)),
234 (Float64(x), Float64(y)) => x.partial_cmp(y),
235 (String(x), String(y)) => Some(x.cmp(y)),
236 (Bytes(x), Bytes(y)) => Some(x.cmp(y)),
237 (Uuid(x), Uuid(y)) => Some(x.cmp(y)),
238 (Timestamp(x), Timestamp(y)) => Some(x.cmp(y)),
239 _ => None,
240 }
241}
242
243fn index_key_in_range(
244 key: &[u8],
245 key_hint: Option<&ScalarValue>,
246 lo: Option<&ScalarValue>,
247 lo_inclusive: bool,
248 hi: Option<&ScalarValue>,
249 hi_inclusive: bool,
250) -> bool {
251 let Some(decoded) = decode_index_key_scalar(key, key_hint) else {
252 return false;
253 };
254 if let Some(lo_v) = lo {
255 match scalar_partial_cmp(&decoded, lo_v) {
256 Some(Ordering::Less) => return false,
257 Some(Ordering::Equal) if !lo_inclusive => return false,
258 None => return false,
259 _ => {}
260 }
261 }
262 if let Some(hi_v) = hi {
263 match scalar_partial_cmp(&decoded, hi_v) {
264 Some(Ordering::Greater) => return false,
265 Some(Ordering::Equal) if !hi_inclusive => return false,
266 None => return false,
267 _ => {}
268 }
269 }
270 true
271}
272
273pub fn encode_index_payload(entries: &[IndexEntry]) -> Vec<u8> {
274 let mut out = Vec::new();
275 out.extend_from_slice(&INDEX_PAYLOAD_VERSION.to_le_bytes());
276 out.extend_from_slice(&(entries.len() as u32).to_le_bytes());
277 for e in entries {
278 out.extend_from_slice(&e.collection_id.to_le_bytes());
279 out.push(match e.kind {
280 IndexKind::Unique => 1,
281 IndexKind::NonUnique => 2,
282 });
283 out.push(match e.op {
284 IndexOp::Insert => 1,
285 IndexOp::Delete => 2,
286 });
287 encode_string(&mut out, &e.index_name);
288 encode_bytes(&mut out, &e.index_key);
289 encode_bytes(&mut out, &e.pk_key);
290 }
291 out
292}
293
294pub fn decode_index_payload(bytes: &[u8]) -> Result<Vec<IndexEntry>, DbError> {
295 let mut cur = Cursor::new(bytes);
296 let ver = cur.take_u16()?;
297 if ver != INDEX_PAYLOAD_VERSION_V1 && ver != INDEX_PAYLOAD_VERSION_V2 {
298 return Err(DbError::Format(FormatError::UnsupportedVersion {
299 major: 0,
300 minor: ver,
301 }));
302 }
303 let n = cur.take_u32()? as usize;
304 check_decode_entry_count(n)?;
305 let mut v = Vec::with_capacity(n.min(MAX_SEGMENT_DECODE_ENTRIES));
306 for _ in 0..n {
307 let collection_id = cur.take_u32()?;
308 let kind_tag = cur.take_u8()?;
309 let kind = match kind_tag {
310 1 => IndexKind::Unique,
311 2 => IndexKind::NonUnique,
312 _ => {
313 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
314 message: format!("unknown index kind tag {kind_tag}"),
315 }))
316 }
317 };
318 let op = if ver >= INDEX_PAYLOAD_VERSION_V2 {
319 let op_tag = cur.take_u8()?;
320 match op_tag {
321 1 => IndexOp::Insert,
322 2 => IndexOp::Delete,
323 _ => {
324 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
325 message: format!("unknown index op tag {op_tag}"),
326 }))
327 }
328 }
329 } else {
330 IndexOp::Insert
331 };
332 let index_name = decode_string(&mut cur)?;
333 let index_key = decode_bytes(&mut cur)?;
334 let pk_key = decode_bytes(&mut cur)?;
335 v.push(IndexEntry {
336 collection_id,
337 index_name,
338 kind,
339 op,
340 index_key,
341 pk_key,
342 });
343 }
344 if cur.remaining() != 0 {
345 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
346 message: "trailing bytes in index payload".to_string(),
347 }));
348 }
349 Ok(v)
350}
351
/// Appends `s` as a little-endian `u32` byte length followed by its UTF-8 bytes.
///
/// # Panics
///
/// Panics if `s` is longer than `u32::MAX` bytes, instead of writing a truncated
/// length prefix that would corrupt everything encoded after it.
fn encode_string(out: &mut Vec<u8>, s: &str) {
    let b = s.as_bytes();
    let len = u32::try_from(b.len()).expect("index payload string longer than u32::MAX bytes");
    out.reserve(4 + b.len());
    out.extend_from_slice(&len.to_le_bytes());
    out.extend_from_slice(b);
}
357
358fn decode_string(cur: &mut Cursor<'_>) -> Result<String, DbError> {
359 let n = cur.take_u32()? as usize;
360 if n == 0 {
361 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
362 message: "empty index name".to_string(),
363 }));
364 }
365 let bytes = cur.take_bytes(n)?;
366 String::from_utf8(bytes).map_err(|_| {
367 DbError::Format(FormatError::InvalidCatalogPayload {
368 message: "invalid utf-8 in index name".to_string(),
369 })
370 })
371}
372
/// Appends `b` as a little-endian `u32` length followed by the raw bytes.
///
/// # Panics
///
/// Panics if `b` is longer than `u32::MAX` bytes, instead of writing a truncated
/// length prefix that would corrupt everything encoded after it.
fn encode_bytes(out: &mut Vec<u8>, b: &[u8]) {
    let len = u32::try_from(b.len()).expect("index payload field longer than u32::MAX bytes");
    out.reserve(4 + b.len());
    out.extend_from_slice(&len.to_le_bytes());
    out.extend_from_slice(b);
}
377
378fn decode_bytes(cur: &mut Cursor<'_>) -> Result<Vec<u8>, DbError> {
379 let n = cur.take_u32()? as usize;
380 cur.take_bytes(n)
381}
382
383struct Cursor<'a> {
384 bytes: &'a [u8],
385 pos: usize,
386}
387
388impl<'a> Cursor<'a> {
389 fn new(bytes: &'a [u8]) -> Self {
390 Self { bytes, pos: 0 }
391 }
392
393 fn remaining(&self) -> usize {
394 self.bytes.len().saturating_sub(self.pos)
395 }
396
397 fn take_u8(&mut self) -> Result<u8, DbError> {
398 if self.pos >= self.bytes.len() {
399 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
400 message: "unexpected eof".to_string(),
401 }));
402 }
403 let b = self.bytes[self.pos];
404 self.pos += 1;
405 Ok(b)
406 }
407
408 fn take_u16(&mut self) -> Result<u16, DbError> {
409 if self.remaining() < 2 {
410 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
411 message: "unexpected eof".to_string(),
412 }));
413 }
414 let v = u16::from_le_bytes([self.bytes[self.pos], self.bytes[self.pos + 1]]);
415 self.pos += 2;
416 Ok(v)
417 }
418
419 fn take_u32(&mut self) -> Result<u32, DbError> {
420 if self.remaining() < 4 {
421 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
422 message: "unexpected eof".to_string(),
423 }));
424 }
425 let v = u32::from_le_bytes([
426 self.bytes[self.pos],
427 self.bytes[self.pos + 1],
428 self.bytes[self.pos + 2],
429 self.bytes[self.pos + 3],
430 ]);
431 self.pos += 4;
432 Ok(v)
433 }
434
435 fn take_bytes(&mut self, n: usize) -> Result<Vec<u8>, DbError> {
436 check_field_bytes_len(n)?;
437 if self.remaining() < n {
438 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
439 message: "unexpected eof".to_string(),
440 }));
441 }
442 let slice = &self.bytes[self.pos..self.pos + n];
443 self.pos += n;
444 Ok(slice.to_vec())
445 }
446}
447
// Unit tests live in a separate file and are textually included here, so they
// compile as a child module that can reach this module's private items.
#[cfg(test)]
mod tests {
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_index_tests.rs"
    ));
}
454}