1use std::collections::{BTreeSet, HashMap};
4
5use crate::error::{DbError, FormatError, SchemaError};
6use crate::schema::IndexKind;
7
/// Original payload format: entries carry no op tag and decode as inserts.
pub const INDEX_PAYLOAD_VERSION_V1: u16 = 1;
/// Adds a one-byte op tag (insert/delete) after each entry's kind tag.
pub const INDEX_PAYLOAD_VERSION_V2: u16 = 2;
/// Version stamped by `encode_index_payload`.
pub const INDEX_PAYLOAD_VERSION: u16 = INDEX_PAYLOAD_VERSION_V2;

/// Name of a secondary index within a collection.
type IndexName = String;
/// Encoded index key bytes.
type IndexKey = Vec<u8>;
/// Encoded primary-key bytes.
type PkKey = Vec<u8>;
/// Identifies one index as `(collection_id, index_name)`.
type IndexId = (u32, IndexName);
/// Unique index: every index key resolves to exactly one primary key.
type UniqueIndex = HashMap<IndexKey, PkKey>;
/// Non-unique index: every index key resolves to an ordered set of primary keys.
type NonUniqueIndex = HashMap<IndexKey, BTreeSet<PkKey>>;
18
/// Kind of mutation an [`IndexEntry`] applies to an index.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum IndexOp {
    /// Add the `index_key -> pk_key` mapping.
    Insert,
    /// Remove the `index_key -> pk_key` mapping.
    Delete,
}
25
26#[derive(Debug, Clone, PartialEq, Eq)]
28pub struct IndexEntry {
29 pub collection_id: u32,
30 pub index_name: String,
31 pub kind: IndexKind,
32 pub op: IndexOp,
33 pub index_key: Vec<u8>,
34 pub pk_key: Vec<u8>,
35}
36
37#[derive(Debug, Default, Clone)]
38pub struct IndexState {
39 unique: HashMap<IndexId, UniqueIndex>,
40 non_unique: HashMap<IndexId, NonUniqueIndex>,
41}
42
43impl IndexState {
44 pub fn apply(&mut self, entry: IndexEntry) -> Result<(), DbError> {
45 match entry.kind {
46 IndexKind::Unique => {
47 let m = self
48 .unique
49 .entry((entry.collection_id, entry.index_name))
50 .or_default();
51 match entry.op {
52 IndexOp::Insert => match m.get(&entry.index_key) {
53 None => {
54 m.insert(entry.index_key, entry.pk_key);
55 Ok(())
56 }
57 Some(existing) if *existing == entry.pk_key => Ok(()),
58 Some(_) => Err(DbError::Schema(SchemaError::UniqueIndexViolation)),
59 },
60 IndexOp::Delete => match m.get(&entry.index_key) {
61 None => Ok(()),
62 Some(existing) if *existing == entry.pk_key => {
63 m.remove(&entry.index_key);
64 Ok(())
65 }
66 Some(_) => Ok(()),
67 },
68 }
69 }
70 IndexKind::NonUnique => {
71 let m = self
72 .non_unique
73 .entry((entry.collection_id, entry.index_name))
74 .or_default();
75 match entry.op {
76 IndexOp::Insert => {
77 m.entry(entry.index_key).or_default().insert(entry.pk_key);
78 }
79 IndexOp::Delete => {
80 if let Some(set) = m.get_mut(&entry.index_key) {
81 set.remove(&entry.pk_key);
82 if set.is_empty() {
83 m.remove(&entry.index_key);
84 }
85 }
86 }
87 }
88 Ok(())
89 }
90 }
91 }
92
93 pub fn unique_lookup(
94 &self,
95 collection_id: u32,
96 index_name: &str,
97 index_key: &[u8],
98 ) -> Option<&[u8]> {
99 self.unique
100 .get(&(collection_id, index_name.to_string()))?
101 .get(index_key)
102 .map(|v| v.as_slice())
103 }
104
105 pub fn non_unique_lookup(
106 &self,
107 collection_id: u32,
108 index_name: &str,
109 index_key: &[u8],
110 ) -> Option<Vec<Vec<u8>>> {
111 let set = self
112 .non_unique
113 .get(&(collection_id, index_name.to_string()))?
114 .get(index_key)?;
115 Some(set.iter().cloned().collect())
116 }
117
118 pub(crate) fn entries_for_checkpoint(&self) -> Vec<IndexEntry> {
119 let mut out = Vec::new();
120 for ((collection_id, index_name), m) in &self.unique {
121 for (index_key, pk_key) in m {
122 out.push(IndexEntry {
123 collection_id: *collection_id,
124 index_name: index_name.clone(),
125 kind: IndexKind::Unique,
126 op: IndexOp::Insert,
127 index_key: index_key.clone(),
128 pk_key: pk_key.clone(),
129 });
130 }
131 }
132 for ((collection_id, index_name), m) in &self.non_unique {
133 for (index_key, set) in m {
134 for pk_key in set {
135 out.push(IndexEntry {
136 collection_id: *collection_id,
137 index_name: index_name.clone(),
138 kind: IndexKind::NonUnique,
139 op: IndexOp::Insert,
140 index_key: index_key.clone(),
141 pk_key: pk_key.clone(),
142 });
143 }
144 }
145 }
146 out
147 }
148}
149
150pub fn encode_index_payload(entries: &[IndexEntry]) -> Vec<u8> {
151 let mut out = Vec::new();
152 out.extend_from_slice(&INDEX_PAYLOAD_VERSION.to_le_bytes());
153 out.extend_from_slice(&(entries.len() as u32).to_le_bytes());
154 for e in entries {
155 out.extend_from_slice(&e.collection_id.to_le_bytes());
156 out.push(match e.kind {
157 IndexKind::Unique => 1,
158 IndexKind::NonUnique => 2,
159 });
160 out.push(match e.op {
161 IndexOp::Insert => 1,
162 IndexOp::Delete => 2,
163 });
164 encode_string(&mut out, &e.index_name);
165 encode_bytes(&mut out, &e.index_key);
166 encode_bytes(&mut out, &e.pk_key);
167 }
168 out
169}
170
171pub fn decode_index_payload(bytes: &[u8]) -> Result<Vec<IndexEntry>, DbError> {
172 let mut cur = Cursor::new(bytes);
173 let ver = cur.take_u16()?;
174 if ver != INDEX_PAYLOAD_VERSION_V1 && ver != INDEX_PAYLOAD_VERSION_V2 {
175 return Err(DbError::Format(FormatError::UnsupportedVersion {
176 major: 0,
177 minor: ver,
178 }));
179 }
180 let n = cur.take_u32()? as usize;
181 let mut v = Vec::with_capacity(n.min(1024));
182 for _ in 0..n {
183 let collection_id = cur.take_u32()?;
184 let kind_tag = cur.take_u8()?;
185 let kind = match kind_tag {
186 1 => IndexKind::Unique,
187 2 => IndexKind::NonUnique,
188 _ => {
189 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
190 message: format!("unknown index kind tag {kind_tag}"),
191 }))
192 }
193 };
194 let op = if ver >= INDEX_PAYLOAD_VERSION_V2 {
195 let op_tag = cur.take_u8()?;
196 match op_tag {
197 1 => IndexOp::Insert,
198 2 => IndexOp::Delete,
199 _ => {
200 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
201 message: format!("unknown index op tag {op_tag}"),
202 }))
203 }
204 }
205 } else {
206 IndexOp::Insert
207 };
208 let index_name = decode_string(&mut cur)?;
209 let index_key = decode_bytes(&mut cur)?;
210 let pk_key = decode_bytes(&mut cur)?;
211 v.push(IndexEntry {
212 collection_id,
213 index_name,
214 kind,
215 op,
216 index_key,
217 pk_key,
218 });
219 }
220 if cur.remaining() != 0 {
221 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
222 message: "trailing bytes in index payload".to_string(),
223 }));
224 }
225 Ok(v)
226}
227
/// Appends `s` as a `u32` little-endian byte-length prefix followed by its
/// UTF-8 bytes.
///
/// Note: `decode_string` rejects a zero length, so an empty string encoded
/// here will not decode as an index name.
///
/// # Panics
/// Panics if `s` is longer than `u32::MAX` bytes. Otherwise the length prefix
/// would be silently truncated and the following bytes misread on decode.
fn encode_string(out: &mut Vec<u8>, s: &str) {
    use std::convert::TryFrom;

    let b = s.as_bytes();
    let len = u32::try_from(b.len()).expect("string too long for u32 length prefix");
    out.extend_from_slice(&len.to_le_bytes());
    out.extend_from_slice(b);
}
233
234fn decode_string(cur: &mut Cursor<'_>) -> Result<String, DbError> {
235 let n = cur.take_u32()? as usize;
236 if n == 0 {
237 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
238 message: "empty index name".to_string(),
239 }));
240 }
241 let bytes = cur.take_bytes(n)?;
242 String::from_utf8(bytes).map_err(|_| {
243 DbError::Format(FormatError::InvalidCatalogPayload {
244 message: "invalid utf-8 in index name".to_string(),
245 })
246 })
247}
248
/// Appends `b` as a `u32` little-endian length prefix followed by the raw
/// bytes. This is the inverse of `decode_bytes`.
///
/// # Panics
/// Panics if `b` is longer than `u32::MAX` bytes. Silently truncating the
/// length prefix would emit a payload that decodes wrongly.
fn encode_bytes(out: &mut Vec<u8>, b: &[u8]) {
    use std::convert::TryFrom;

    let len = u32::try_from(b.len()).expect("byte string too long for u32 length prefix");
    out.reserve(4 + b.len());
    out.extend_from_slice(&len.to_le_bytes());
    out.extend_from_slice(b);
}
253
254fn decode_bytes(cur: &mut Cursor<'_>) -> Result<Vec<u8>, DbError> {
255 let n = cur.take_u32()? as usize;
256 cur.take_bytes(n)
257}
258
259struct Cursor<'a> {
260 bytes: &'a [u8],
261 pos: usize,
262}
263
264impl<'a> Cursor<'a> {
265 fn new(bytes: &'a [u8]) -> Self {
266 Self { bytes, pos: 0 }
267 }
268
269 fn remaining(&self) -> usize {
270 self.bytes.len().saturating_sub(self.pos)
271 }
272
273 fn take_u8(&mut self) -> Result<u8, DbError> {
274 if self.pos >= self.bytes.len() {
275 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
276 message: "unexpected eof".to_string(),
277 }));
278 }
279 let b = self.bytes[self.pos];
280 self.pos += 1;
281 Ok(b)
282 }
283
284 fn take_u16(&mut self) -> Result<u16, DbError> {
285 if self.remaining() < 2 {
286 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
287 message: "unexpected eof".to_string(),
288 }));
289 }
290 let v = u16::from_le_bytes([self.bytes[self.pos], self.bytes[self.pos + 1]]);
291 self.pos += 2;
292 Ok(v)
293 }
294
295 fn take_u32(&mut self) -> Result<u32, DbError> {
296 if self.remaining() < 4 {
297 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
298 message: "unexpected eof".to_string(),
299 }));
300 }
301 let v = u32::from_le_bytes([
302 self.bytes[self.pos],
303 self.bytes[self.pos + 1],
304 self.bytes[self.pos + 2],
305 self.bytes[self.pos + 3],
306 ]);
307 self.pos += 4;
308 Ok(v)
309 }
310
311 fn take_bytes(&mut self, n: usize) -> Result<Vec<u8>, DbError> {
312 if self.remaining() < n {
313 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
314 message: "unexpected eof".to_string(),
315 }));
316 }
317 let slice = &self.bytes[self.pos..self.pos + n];
318 self.pos += n;
319 Ok(slice.to_vec())
320 }
321}
322
// Unit tests live in a separate file and are spliced in textually with
// `include!`. Because the included code becomes the body of this child
// module, it can reach this file's private items through `super::`.
#[cfg(test)]
mod tests {
    // The path is resolved at compile time against the package directory
    // that Cargo exposes as `CARGO_MANIFEST_DIR`.
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_index_tests.rs"
    ));
}