Skip to main content

mqdb_core/
index.rs

1// Copyright 2025-2026 LabOverWire. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::entity::Entity;
5use crate::error::Result;
6use crate::keys;
7use crate::storage::{BatchWriter, Storage};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10
11#[derive(Serialize, Deserialize)]
12pub struct IndexDefinition {
13    pub entity: String,
14    pub fields: Vec<String>,
15}
16
17impl IndexDefinition {
18    #[allow(clippy::must_use_candidate)]
19    pub fn new(entity: String, fields: Vec<String>) -> Self {
20        Self { entity, fields }
21    }
22}
23
24pub struct IndexManager {
25    indexes: HashMap<String, IndexDefinition>,
26}
27
28impl IndexManager {
29    #[allow(clippy::must_use_candidate)]
30    pub fn new() -> Self {
31        Self {
32            indexes: HashMap::new(),
33        }
34    }
35
36    pub fn add_index(&mut self, definition: IndexDefinition) {
37        self.indexes.insert(definition.entity.clone(), definition);
38    }
39
40    #[allow(clippy::must_use_candidate)]
41    pub fn get_indexed_fields(&self, entity: &str) -> Option<&Vec<String>> {
42        self.indexes.get(entity).map(|idx| &idx.fields)
43    }
44
45    pub fn update_indexes(
46        &self,
47        batch: &mut BatchWriter,
48        entity: &Entity,
49        old_entity: Option<&Entity>,
50    ) {
51        if let Some(fields) = self.get_indexed_fields(&entity.name) {
52            if let Some(old) = old_entity {
53                Self::remove_index_entries(batch, old, fields);
54            }
55            Self::add_index_entries(batch, entity, fields);
56        }
57    }
58
59    pub fn remove_indexes(&self, batch: &mut BatchWriter, entity: &Entity) {
60        if let Some(fields) = self.get_indexed_fields(&entity.name) {
61            Self::remove_index_entries(batch, entity, fields);
62        }
63    }
64
65    fn add_index_entries(batch: &mut BatchWriter, entity: &Entity, fields: &[String]) {
66        let index_values = entity.extract_index_values(fields);
67
68        for (field, value) in index_values {
69            let key = keys::encode_index_key(&entity.name, &field, &value, &entity.id);
70            batch.insert(key, Vec::new());
71        }
72    }
73
74    fn remove_index_entries(batch: &mut BatchWriter, entity: &Entity, fields: &[String]) {
75        let index_values = entity.extract_index_values(fields);
76
77        for (field, value) in index_values {
78            let key = keys::encode_index_key(&entity.name, &field, &value, &entity.id);
79            batch.remove(key);
80        }
81    }
82
83    /// # Errors
84    /// Returns an error if the storage prefix scan fails.
85    pub fn lookup_by_field(
86        &self,
87        storage: &crate::storage::Storage,
88        entity: &str,
89        field: &str,
90        value: &[u8],
91    ) -> Result<Vec<String>> {
92        let prefix = keys::encode_index_prefix(entity, field, Some(value));
93        let items = storage.prefix_scan(&prefix)?;
94
95        Ok(Self::extract_ids_from_keys(&items))
96    }
97
98    #[allow(clippy::must_use_candidate)]
99    pub fn is_field_indexed(&self, entity: &str, field: &str) -> bool {
100        self.indexes
101            .get(entity)
102            .is_some_and(|idx| idx.fields.iter().any(|f| f == field))
103    }
104
105    /// # Errors
106    /// Returns an error if serialization fails.
107    pub fn persist_index(
108        &self,
109        batch: &mut BatchWriter,
110        definition: &IndexDefinition,
111    ) -> Result<()> {
112        let key = keys::encode_index_definition_key(&definition.entity);
113        let value = serde_json::to_vec(definition)?;
114        batch.insert(key, value);
115        Ok(())
116    }
117
118    /// # Errors
119    /// Returns an error if reading or deserializing index definitions fails.
120    pub fn load_indexes(&mut self, storage: &Storage) -> Result<()> {
121        let prefix = b"meta/index/";
122        let items = storage.prefix_scan(prefix)?;
123
124        for (_key, value) in items {
125            let definition: IndexDefinition = serde_json::from_slice(&value)?;
126            self.indexes.insert(definition.entity.clone(), definition);
127        }
128
129        Ok(())
130    }
131
132    /// # Errors
133    /// Returns an error if the storage range scan fails.
134    pub fn lookup_by_range(
135        &self,
136        storage: &crate::storage::Storage,
137        entity: &str,
138        field: &str,
139        lower: Option<(&[u8], bool)>,
140        upper: Option<(&[u8], bool)>,
141    ) -> Result<Vec<String>> {
142        let field_prefix = keys::encode_index_prefix(entity, field, None);
143
144        let start = if let Some((value, inclusive)) = lower {
145            let mut key = field_prefix.clone();
146            key.push(keys::SEPARATOR);
147            key.extend_from_slice(value);
148            key.push(keys::SEPARATOR);
149            if !inclusive {
150                key.push(0xFF);
151            }
152            key
153        } else {
154            let mut key = field_prefix.clone();
155            key.push(keys::SEPARATOR);
156            key
157        };
158
159        let end = if let Some((value, inclusive)) = upper {
160            let mut key = field_prefix;
161            key.push(keys::SEPARATOR);
162            key.extend_from_slice(value);
163            key.push(keys::SEPARATOR);
164            if inclusive {
165                key.push(0xFF);
166            }
167            key
168        } else {
169            let mut key = field_prefix;
170            key.push(0xFF);
171            key
172        };
173
174        let items = storage.range_scan(&start, &end)?;
175        Ok(Self::extract_ids_from_keys(&items))
176    }
177
178    fn extract_ids_from_keys(items: &[(Vec<u8>, Vec<u8>)]) -> Vec<String> {
179        let mut ids = Vec::new();
180        for (key, _) in items {
181            if let Some(id_start) = key.iter().rposition(|&b| b == b'/')
182                && let Ok(id) = String::from_utf8(key[id_start + 1..].to_vec())
183            {
184                ids.push(id);
185            }
186        }
187        ids
188    }
189}
190
191impl Default for IndexManager {
192    fn default() -> Self {
193        Self::new()
194    }
195}
196
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::Storage;
    use serde_json::json;

    /// Creates an in-memory store with a `users` index on `field`.
    ///
    /// Indexes one entity per element of `values`, with ids `u0`, `u1`, ….
    /// Each entity is committed in its own batch.
    fn index_users(field: &str, values: Vec<serde_json::Value>) -> (Storage, IndexManager) {
        let storage = Storage::memory();
        let mut mgr = IndexManager::new();
        mgr.add_index(IndexDefinition::new("users".into(), vec![field.to_string()]));

        for (n, value) in values.into_iter().enumerate() {
            let user = Entity::new("users".into(), format!("u{n}"), json!({ field: value }));
            let mut batch = storage.batch();
            mgr.update_indexes(&mut batch, &user, None);
            batch.commit().unwrap();
        }

        (storage, mgr)
    }

    /// Indexes `users.age` with one user per age.
    fn setup_indexed_entities(ages: &[i64]) -> (Storage, IndexManager) {
        index_users("age", ages.iter().map(|&age| json!(age)).collect())
    }

    /// Runs a range lookup on `users.<field>` and returns the ids sorted, so the
    /// assertions do not depend on scan order.
    fn ids_in_range(
        storage: &Storage,
        mgr: &IndexManager,
        field: &str,
        lower: Option<(&[u8], bool)>,
        upper: Option<(&[u8], bool)>,
    ) -> Vec<String> {
        let mut ids = mgr
            .lookup_by_range(storage, "users", field, lower, upper)
            .unwrap();
        ids.sort();
        ids
    }

    #[test]
    fn test_index_definition() {
        let def = IndexDefinition::new("users".into(), vec!["email".into(), "status".into()]);
        assert_eq!(def.entity, "users");
        assert_eq!(def.fields.len(), 2);
    }

    #[test]
    fn test_extract_index_values() {
        let user = Entity::new(
            "users".into(),
            "123".into(),
            json!({ "email": "test@example.com", "status": "active" }),
        );

        let indexed = user.extract_index_values(&["email".into(), "status".into()]);
        assert_eq!(indexed.len(), 2);
    }

    #[test]
    fn is_field_indexed_returns_true_for_indexed_field() {
        let mut mgr = IndexManager::new();
        mgr.add_index(IndexDefinition::new(
            "users".into(),
            vec!["age".into(), "name".into()],
        ));

        for field in ["age", "name"] {
            assert!(mgr.is_field_indexed("users", field), "users.{field} should be indexed");
        }
        assert!(!mgr.is_field_indexed("users", "email"));
        assert!(!mgr.is_field_indexed("posts", "age"));
    }

    #[test]
    fn range_lookup_returns_ids_in_range() {
        let (storage, mgr) = setup_indexed_entities(&[10, 20, 30, 40, 50]);
        let lower = keys::encode_value_for_index(&json!(20)).unwrap();
        let upper = keys::encode_value_for_index(&json!(40)).unwrap();

        let ids = ids_in_range(&storage, &mgr, "age", Some((&lower, true)), Some((&upper, true)));
        assert_eq!(ids, ["u1", "u2", "u3"]);
    }

    #[test]
    fn range_lookup_exclusive_bounds() {
        let (storage, mgr) = setup_indexed_entities(&[10, 20, 30, 40, 50]);
        let lower = keys::encode_value_for_index(&json!(20)).unwrap();
        let upper = keys::encode_value_for_index(&json!(40)).unwrap();

        let ids = ids_in_range(&storage, &mgr, "age", Some((&lower, false)), Some((&upper, false)));
        assert_eq!(ids, ["u2"]);
    }

    #[test]
    fn range_lookup_open_ended_lower() {
        let (storage, mgr) = setup_indexed_entities(&[10, 20, 30, 40, 50]);
        let upper = keys::encode_value_for_index(&json!(30)).unwrap();

        let ids = ids_in_range(&storage, &mgr, "age", None, Some((&upper, false)));
        assert_eq!(ids, ["u0", "u1"]);
    }

    #[test]
    fn range_lookup_open_ended_upper() {
        let (storage, mgr) = setup_indexed_entities(&[10, 20, 30, 40, 50]);
        let lower = keys::encode_value_for_index(&json!(30)).unwrap();

        let ids = ids_in_range(&storage, &mgr, "age", Some((&lower, true)), None);
        assert_eq!(ids, ["u2", "u3", "u4"]);
    }

    #[test]
    fn range_lookup_string_field() {
        let names = ["alice", "bob", "carol", "dave", "eve"];
        let (storage, mgr) = index_users("name", names.iter().map(|name| json!(name)).collect());
        let lower = keys::encode_value_for_index(&json!("bob")).unwrap();
        let upper = keys::encode_value_for_index(&json!("dave")).unwrap();

        let ids = ids_in_range(&storage, &mgr, "name", Some((&lower, true)), Some((&upper, true)));
        assert_eq!(ids, ["u1", "u2", "u3"]);
    }

    #[test]
    fn range_lookup_negative_numbers() {
        let (storage, mgr) = setup_indexed_entities(&[-20, -10, 0, 10, 20]);
        let lower = keys::encode_value_for_index(&json!(-10)).unwrap();
        let upper = keys::encode_value_for_index(&json!(10)).unwrap();

        let ids = ids_in_range(&storage, &mgr, "age", Some((&lower, true)), Some((&upper, true)));
        assert_eq!(ids, ["u1", "u2", "u3"]);
    }

    #[test]
    fn persist_and_load_indexes_round_trip() {
        let storage = Storage::memory();
        let mut mgr = IndexManager::new();
        mgr.add_index(IndexDefinition::new(
            "users".into(),
            vec!["email".into(), "status".into()],
        ));
        mgr.add_index(IndexDefinition::new("posts".into(), vec!["title".into()]));

        // Persist each definition in its own committed batch.
        for definition in mgr.indexes.values() {
            let mut batch = storage.batch();
            mgr.persist_index(&mut batch, definition).unwrap();
            batch.commit().unwrap();
        }

        let mut loaded = IndexManager::new();
        loaded.load_indexes(&storage).unwrap();

        assert_eq!(loaded.get_indexed_fields("users").unwrap(), &["email", "status"]);
        assert_eq!(loaded.get_indexed_fields("posts").unwrap(), &["title"]);
        assert!(loaded.get_indexed_fields("nonexistent").is_none());
    }
}