Skip to main content

aurora_db/
index.rs

1use crate::error::Result;
2use crate::search::FullTextIndex;
3use crate::types::{Document, Value};
4use crossbeam_skiplist::{SkipMap, SkipSet};
5use serde::{Deserialize, Serialize};
6use std::sync::Arc;
7
/// The kind of index requested for a collection.
///
/// NOTE(review): within this file only [`IndexType::FullText`] changes behaviour
/// (`Index::new` additionally builds a `FullTextIndex` for it); every variant
/// stores its keys in the same sorted skip map. Confirm whether other modules
/// treat the remaining variants differently.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum IndexType {
    /// Ordered index.
    BTree,
    /// Hash-style index (presumably equality lookups only; verify against callers).
    Hash,
    /// Full-text index; `Index::new` creates a `FullTextIndex` alongside the key index.
    FullText,
    /// Caller-defined index kind identified by the contained string.
    Custom(String),
}
15
/// Serializable description of an index: what it is called, what it covers and
/// how it behaves.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexDefinition {
    /// Name of the index.
    pub name: String,
    /// Collection the index belongs to; passed to `FullTextIndex::new` for
    /// full-text indexes.
    pub collection: String,
    /// Indexed document fields. With exactly one field the key is that field's
    /// value; otherwise the key is a `Value::Array` of the field values in this
    /// order. A full-text index only uses the first field.
    pub fields: Vec<String>,
    /// Kind of index to build.
    pub index_type: IndexType,
    /// When `true`, `Index::insert` rejects a document if its key ends up mapped
    /// to more than one document ID.
    pub unique: bool,
}
24
/// An in-memory index over one collection, built from an [`IndexDefinition`].
pub struct Index {
    /// The definition this index was created from (fields, uniqueness, type).
    definition: IndexDefinition,
    /// Key value -> sorted set of document IDs holding that key.
    ///
    /// A `SkipMap` is used for lock-free concurrent sorted storage, and each key
    /// maps to a shared `SkipSet` so that adding an ID does not require cloning
    /// the existing IDs for that key.
    data: Arc<SkipMap<Value, Arc<SkipSet<String>>>>,
    /// Present only when the definition's index type is `IndexType::FullText`.
    full_text: Option<Arc<FullTextIndex>>,
}
32
33impl Index {
34    pub fn new(definition: IndexDefinition) -> Self {
35        let full_text = if matches!(definition.index_type, IndexType::FullText) {
36            Some(Arc::new(FullTextIndex::new(
37                &definition.collection,
38                &definition.fields[0],
39            )))
40        } else {
41            None
42        };
43
44        Self {
45            definition,
46            data: Arc::new(SkipMap::new()),
47            full_text,
48        }
49    }
50
51    pub fn insert(&self, doc: &Document) -> Result<()> {
52        let key = self.extract_key(doc)?;
53        let doc_id = doc._sid.clone();
54
55        // Get or create the SkipSet for this key atomically, then insert the doc ID.
56        // For unique fields we check *after* inserting: if the set now contains more
57        // than one ID another writer slipped in concurrently, so we roll back and
58        // return an error. This avoids the TOCTOU race of a pre-insert read.
59        let id_set = self
60            .data
61            .get_or_insert(key.clone(), Arc::new(SkipSet::new()))
62            .value()
63            .clone();
64
65        id_set.insert(doc_id.clone());
66
67        if self.definition.unique && id_set.len() > 1 {
68            id_set.remove(&doc_id);
69            return Err(crate::error::AqlError::invalid_operation(
70                "Unique constraint violation".to_string(),
71            ));
72        }
73
74        if let Some(ft_index) = &self.full_text {
75            ft_index.index_document(doc)?;
76        }
77
78        Ok(())
79    }
80
81    pub fn search(&self, value: &Value) -> Option<Vec<String>> {
82        self.data
83            .get(value)
84            .map(|e| e.value().iter().map(|v| v.to_string()).collect())
85    }
86
87    pub fn remove(&self, doc: &Document) -> Result<()> {
88        let key = self.extract_key(doc)?;
89        if let Some(entry) = self.data.get(&key) {
90            entry.value().remove(&doc._sid);
91            // Prune the entry when the set becomes empty to avoid memory growth
92            if entry.value().is_empty() {
93                self.data.remove(&key);
94            }
95        }
96        Ok(())
97    }
98
99    /// Return all IDs in the index, sorted by key value (Lock-free iteration)
100    pub fn iter_ids(&self) -> Vec<String> {
101        self.data
102            .iter()
103            .flat_map(|e| {
104                let ids: Vec<String> = e.value().iter().map(|v| v.to_string()).collect();
105                ids
106            })
107            .collect()
108    }
109
110    pub fn search_text(&self, query: &str) -> Option<Vec<(String, f32)>> {
111        self.full_text.as_ref().map(|ft| ft.search(query))
112    }
113
114    fn extract_key(&self, doc: &Document) -> Result<Value> {
115        if self.definition.fields.len() == 1 {
116            Ok(doc
117                .data
118                .get(&self.definition.fields[0])
119                .cloned()
120                .unwrap_or(Value::Null))
121        } else {
122            let values: Vec<Value> = self
123                .definition
124                .fields
125                .iter()
126                .map(|f| doc.data.get(f).cloned().unwrap_or(Value::Null))
127                .collect();
128            Ok(Value::Array(values))
129        }
130    }
131
132    #[allow(dead_code)]
133    pub fn full_text(&self) -> Option<Arc<FullTextIndex>> {
134        self.full_text.clone()
135    }
136}