Skip to main content

aurora_db/
query.rs

//! # Aurora Query System
//!
//! This module provides a powerful, fluent query interface for filtering, sorting,
//! and retrieving documents from Aurora collections.
5
6use crate::Aurora;
7use crate::error::Result;
8use crate::types::{Document, Value};
9use std::collections::HashMap;
10use serde::{Deserialize, Serialize};
11use std::sync::Arc;
12use roaring::RoaringBitmap;
13
14#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
15pub struct SimpleQueryBuilder {
16    pub collection: String,
17    pub filters: Vec<Filter>,
18    pub order_by: Option<(String, bool)>,
19    pub limit: Option<usize>,
20    pub offset: Option<usize>,
21}
22
23/// Builder for creating and executing document queries.
24pub struct QueryBuilder<'a> {
25    db: &'a Aurora,
26    collection: String,
27    filters: Vec<Filter>,
28    order_by: Option<(String, bool)>,
29    limit: Option<usize>,
30    offset: Option<usize>,
31    fields: Option<Vec<String>>,
32    debounce_duration: Option<std::time::Duration>,
33}
34
35/// Builder for constructing document filter expressions.
36pub struct FilterBuilder;
37
38impl FilterBuilder {
39    pub fn new() -> Self {
40        Self
41    }
42
43    pub fn eq<T: Into<Value>>(&self, field: &str, value: T) -> Filter {
44        Filter::Eq(field.to_string(), value.into())
45    }
46
47    pub fn ne<T: Into<Value>>(&self, field: &str, value: T) -> Filter {
48        Filter::Ne(field.to_string(), value.into())
49    }
50
51    pub fn in_values<T: Into<Value> + Clone>(&self, field: &str, values: &[T]) -> Filter {
52        Filter::In(field.to_string(), values.iter().cloned().map(|v| v.into()).collect())
53    }
54
55    pub fn starts_with(&self, field: &str, value: &str) -> Filter {
56        Filter::StartsWith(field.to_string(), value.to_string())
57    }
58
59    pub fn contains(&self, field: &str, value: &str) -> Filter {
60        Filter::Contains(field.to_string(), value.to_string())
61    }
62
63    pub fn gt<T: Into<Value>>(&self, field: &str, value: T) -> Filter {
64        Filter::Gt(field.to_string(), value.into())
65    }
66
67    pub fn gte<T: Into<Value>>(&self, field: &str, value: T) -> Filter {
68        Filter::Gte(field.to_string(), value.into())
69    }
70
71    pub fn lt<T: Into<Value>>(&self, field: &str, value: T) -> Filter {
72        Filter::Lt(field.to_string(), value.into())
73    }
74
75    pub fn lte<T: Into<Value>>(&self, field: &str, value: T) -> Filter {
76        Filter::Lte(field.to_string(), value.into())
77    }
78
79    pub fn in_vec<T: Into<Value>>(&self, field: &str, values: Vec<T>) -> Filter {
80        Filter::In(field.to_string(), values.into_iter().map(|v| v.into()).collect())
81    }
82
83    pub fn between<T: Into<Value> + Clone>(&self, field: &str, min: T, max: T) -> Filter {
84        Filter::And(vec![
85            Filter::Gte(field.to_string(), min.into()),
86            Filter::Lte(field.to_string(), max.into()),
87        ])
88    }
89}
90
91impl<'a> QueryBuilder<'a> {
92    pub fn new(db: &'a Aurora, collection: &str) -> Self {
93        Self {
94            db,
95            collection: collection.to_string(),
96            filters: Vec::new(),
97            order_by: None,
98            limit: None,
99            offset: None,
100            fields: None,
101            debounce_duration: None,
102        }
103    }
104
105    pub fn filter<F>(mut self, f: F) -> Self
106    where
107        F: FnOnce(&FilterBuilder) -> Filter,
108    {
109        let builder = FilterBuilder::new();
110        self.filters.push(f(&builder));
111        self
112    }
113
114    pub fn order_by(mut self, field: &str, ascending: bool) -> Self {
115        self.order_by = Some((field.to_string(), ascending));
116        self
117    }
118
119    pub fn limit(mut self, limit: usize) -> Self {
120        self.limit = Some(limit);
121        self
122    }
123
124    pub fn offset(mut self, offset: usize) -> Self {
125        self.offset = Some(offset);
126        self
127    }
128
129    pub fn select(mut self, fields: Vec<&str>) -> Self {
130        self.fields = Some(fields.into_iter().map(|s| s.to_string()).collect());
131        self
132    }
133
134    pub fn debounce(mut self, duration: std::time::Duration) -> Self {
135        self.debounce_duration = Some(duration);
136        self
137    }
138
139    pub async fn first_one(self) -> Result<Option<Document>> {
140        let docs = self.limit(1).collect().await?;
141        Ok(docs.into_iter().next())
142    }
143
144    /// Executes the query and returns the matching documents.
145    /// Uses secondary indices (Roaring Bitmaps) for optimized filtering when possible.
146    pub async fn collect(self) -> Result<Vec<Document>> {
147        self.db.ensure_indices_initialized().await?;
148
149        // Optimized Bitwise Intersection Path
150        let mut candidate_bitmap: Option<RoaringBitmap> = None;
151
152        for filter in &self.filters {
153            if let Filter::Eq(field, value) = filter {
154                // Check secondary indices
155                let index_key = format!("{}:{}", self.collection, field);
156                let val_str = match value {
157                    Value::String(s) => s.clone(),
158                    _ => value.to_string(),
159                };
160                let full_key = format!("{}:{}:{}", self.collection, field, val_str);
161
162                let mut current_bitmap = RoaringBitmap::new();
163                let mut found = false;
164
165                // 1. Check Cold Index (mmap) — bounds-checked to avoid panic on corrupt manifest
166                if let Some(loc) = self.db.index_manifest.get(&full_key) {
167                    let (offset, len) = *loc.value();
168                    if let Ok(guard) = self.db.mmap_index.read() {
169                        if let Some(mmap) = guard.as_ref() {
170                            if offset + len <= mmap.len() {
171                                let bytes = &mmap[offset..(offset + len)];
172                                if let Ok(cold_bitmap) = RoaringBitmap::deserialize_from(bytes) {
173                                    current_bitmap |= cold_bitmap;
174                                    found = true;
175                                }
176                            }
177                        }
178                    }
179                }
180
181                // 2. Check Hot Index — direct entry lookup, no DashMap clone
182                if let Some(storage_arc) = self.db.get_indexed_storage(&index_key, &val_str) {
183                    if let Ok(storage) = storage_arc.read() {
184                        current_bitmap |= storage.to_bitmap();
185                        found = true;
186                    }
187                }
188
189                if !found {
190                    // Only short-circuit on an indexed miss when there is NO active
191                    // transaction — transaction writes are buffered and never reach the
192                    // bitmap index, so we must not hide them with an early empty return.
193                    let in_transaction = crate::transaction::ACTIVE_TRANSACTION_ID
194                        .try_with(|id| *id)
195                        .ok()
196                        .and_then(|id| self.db.transaction_manager.active_transactions.get(&id))
197                        .is_some();
198
199                    if !in_transaction && self.db.has_index_key(&index_key) {
200                        return Ok(vec![]);
201                    }
202                    // Index absent or inside a transaction → fall through to scan path
203                    candidate_bitmap = None;
204                    break;
205                }
206
207                if let Some(ref mut existing) = candidate_bitmap {
208                    *existing &= current_bitmap; // Bitwise AND intersection
209                } else {
210                    candidate_bitmap = Some(current_bitmap);
211                }
212
213                // Short-circuit on empty intersection — but only outside a transaction.
214                // Buffered inserts are merged later and may still satisfy all filters.
215                if let Some(ref b) = candidate_bitmap {
216                    if b.is_empty() {
217                        let in_transaction = crate::transaction::ACTIVE_TRANSACTION_ID
218                            .try_with(|id| *id)
219                            .ok()
220                            .and_then(|id| self.db.transaction_manager.active_transactions.get(&id))
221                            .is_some();
222                        if !in_transaction {
223                            return Ok(vec![]);
224                        }
225                    }
226                }
227            }
228        }
229
230        let mut docs = if let Some(bitmap) = candidate_bitmap {
231            // OPTIMIZATION: If the query only requests 'id', we can bypass Sled hydration entirely
232            let id_only = self.fields.as_ref().map(|f| f.len() == 1 && f[0] == "id").unwrap_or(false);
233            
234            // Check active transaction buffer for collection members
235            let tx_id = crate::transaction::ACTIVE_TRANSACTION_ID
236                .try_with(|id| *id)
237                .ok();
238            
239            let tx_buffer = tx_id.and_then(|id| self.db.transaction_manager.active_transactions.get(&id));
240
241            // Hydrate only the final matching IDs
242            let mut final_docs = Vec::with_capacity(bitmap.len() as usize);
243            for internal_id in bitmap {
244                if let Some(external_id) = self.db.get_external_id(internal_id) {
245                    // IMPORTANT: Check if this doc was deleted in the current transaction
246                    if let Some(ref buffer) = tx_buffer {
247                        let key = format!("{}:{}", self.collection, external_id);
248                        if buffer.deletes.contains_key(&key) {
249                            continue;
250                        }
251                    }
252
253                    if id_only && self.filters.is_empty() {
254                        // ULTRA FAST PATH: No filters and id only
255                        final_docs.push(Document { _sid: external_id, data: HashMap::new() });
256                        continue;
257                    }
258
259                    if let Ok(Some(doc)) = self.db.get_document(&self.collection, &external_id) {
260                        // Double-check with full filter (for any non-indexed conditions)
261                        if self.filters.iter().all(|f| f.matches(&doc)) {
262                            final_docs.push(doc);
263                        }
264                    }
265                }
266            }
267
268            // Also check for NEW documents in transaction that might match (not in bitmap index yet)
269            if let Some(buffer) = tx_buffer {
270                let prefix = format!("{}:", self.collection);
271                for item in buffer.writes.iter() {
272                    let key: &String = item.key();
273                    if let Some(external_id) = key.strip_prefix(&prefix) {
274                        
275                        // If it's already in final_docs (from bitmap index), skip it
276                        if final_docs.iter().any(|d| d._sid == external_id) {
277                            continue;
278                        }
279
280                        let data: &Vec<u8> = item.value();
281                        if let Ok(doc) = self.db.deserialize_internal::<Document>(data) {
282                            if self.filters.iter().all(|f| f.matches(&doc)) {
283                                final_docs.push(doc);
284                            }
285                        }
286                    }
287                }
288            }
289
290            final_docs
291        } else {
292            // Fallback to scan if no indices were hit.
293            // Only pre-limit when there is no sort — applying a limit before sorting
294            // truncates the match set and returns the wrong page when scan order
295            // differs from sort order.
296            let scan_limit = if self.order_by.is_none() {
297                self.limit.map(|l| l + self.offset.unwrap_or(0))
298            } else {
299                None
300            };
301            
302            let db_filters = self.filters.clone();
303            self.db.scan_and_filter(&self.collection, move |doc| {
304                db_filters.iter().all(|f| f.matches(doc))
305            }, scan_limit)?
306        };
307
308        // Apply Sorting
309        if let Some((field, ascending)) = self.order_by {
310            docs.sort_by(|a, b| {
311                let v1 = a.data.get(&field);
312                let v2 = b.data.get(&field);
313                let ord = compare_values(v1, v2);
314                if ascending { ord } else { ord.reverse() }
315            });
316        }
317
318        // Apply Offset/Limit
319        let mut start = self.offset.unwrap_or(0);
320        if start > docs.len() { start = docs.len(); }
321        let mut end = docs.len();
322        if let Some(max) = self.limit {
323            if start + max < end { end = start + max; }
324        }
325
326        let mut result = docs[start..end].to_vec();
327
328        // Apply computed fields
329        if let Ok(computed) = self.db.computed.read() {
330            for doc in &mut result {
331                let _ = computed.apply(&self.collection, doc);
332            }
333        }
334
335        // Apply field projection when select() was called
336        if let Some(ref fields) = self.fields {
337            let field_set: std::collections::HashSet<&str> =
338                fields.iter().map(|s| s.as_str()).collect();
339            for doc in &mut result {
340                doc.data.retain(|k, _| field_set.contains(k.as_str()));
341            }
342        }
343
344        Ok(result)
345    }
346
347    pub async fn count(self) -> Result<usize> {
348        let results = self.collect().await?;
349        Ok(results.len())
350    }
351
352    pub async fn delete(self) -> Result<usize> {
353        let db = self.db;
354        let collection = self.collection.clone();
355        let docs = self.collect().await?;
356        let count = docs.len();
357        for doc in docs {
358            let _ = db.aql_delete_document(&collection, &doc._sid).await;
359        }
360        Ok(count)
361    }
362
363    pub async fn watch(self) -> Result<crate::reactive::QueryWatcher> {
364        let collection = self.collection.clone();
365        let filters = self.filters.clone();
366        let db_clone = self.db.clone();
367        let debounce_duration = self.debounce_duration;
368
369        let initial_results = self.collect().await?;
370        let listener = db_clone.pubsub.listen(&collection);
371        let state = Arc::new(crate::reactive::ReactiveQueryState::new(filters));
372
373        Ok(crate::reactive::QueryWatcher::new(
374            Arc::new(db_clone),
375            collection,
376            listener,
377            state,
378            initial_results,
379            debounce_duration,
380        ))
381    }
382}
383
384/// Builder for full-text search queries.
385pub struct SearchBuilder<'a> {
386    db: &'a Aurora,
387    collection: String,
388    query: String,
389    limit: Option<usize>,
390    fuzzy: bool,
391    distance: u8,
392    search_fields: Option<Vec<String>>,
393}
394
395/// Score a document against tokenised query terms using per-word Levenshtein distance.
396/// Returns 0.0 if no query token is within `max_dist` edits of any doc token.
397fn fuzzy_score(doc: &Document, query_tokens: &[&str], max_dist: usize, fields: Option<&[String]>) -> f32 {
398    let mut score = 0.0f32;
399    for (field, value) in &doc.data {
400        if let Some(allowed) = fields {
401            if !allowed.contains(field) { continue; }
402        }
403        if let crate::types::Value::String(text) = value {
404            let doc_tokens: Vec<String> = text.split_whitespace().map(|t| t.to_lowercase()).collect();
405            for q in query_tokens {
406                for d in &doc_tokens {
407                    let dist = crate::search::levenshtein_distance(q, d);
408                    if dist <= max_dist {
409                        // Closer match = higher score; exact hit scores 1.0
410                        score += 1.0 / (1.0 + dist as f32 * 0.3);
411                    }
412                }
413            }
414        }
415    }
416    score
417}
418
419impl<'a> SearchBuilder<'a> {
420    pub fn new(db: &'a Aurora, collection: &str) -> Self {
421        Self {
422            db,
423            collection: collection.to_string(),
424            query: String::new(),
425            limit: None,
426            fuzzy: false,
427            distance: 0,
428            search_fields: None,
429        }
430    }
431
432    pub fn query(mut self, query: &str) -> Self {
433        self.query = query.to_string();
434        self
435    }
436
437    pub fn limit(mut self, limit: usize) -> Self {
438        self.limit = Some(limit);
439        self
440    }
441
442    pub fn fuzzy(mut self, distance: u8) -> Self {
443        self.fuzzy = true;
444        self.distance = distance;
445        self
446    }
447
448    /// Restrict search to specific field names (None = all string fields)
449    pub fn fields(mut self, fields: Vec<String>) -> Self {
450        self.search_fields = Some(fields);
451        self
452    }
453
454    /// Like collect() but accepts an optional field filter inline
455    pub async fn collect_with_fields(self, fields: Option<&[String]>) -> Result<Vec<Document>> {
456        let builder = if let Some(f) = fields {
457            Self { search_fields: Some(f.to_vec()), ..self }
458        } else {
459            self
460        };
461        builder.collect().await
462    }
463
464    pub async fn collect(self) -> Result<Vec<Document>> {
465        let query = self.query.to_lowercase();
466        let mut results = Vec::new();
467
468        if let Some(index) = self.db.primary_indices.get(&self.collection) {
469            if self.fuzzy && !query.is_empty() {
470                // Fuzzy path: score each doc, exclude zero-score matches, sort by relevance.
471                let query_tokens: Vec<&str> = query.split_whitespace().collect();
472                let max_dist = self.distance as usize;
473                let fields = self.search_fields.as_deref();
474                let mut scored: Vec<(f32, Document)> = Vec::new();
475
476                for entry in index.iter() {
477                    if let Some(data) = self.db.get(entry.key())? {
478                        if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
479                            let score = fuzzy_score(&doc, &query_tokens, max_dist, fields);
480                            if score > 0.0 {
481                                scored.push((score, doc));
482                            }
483                        }
484                    }
485                }
486
487                scored.sort_by(|(a, _), (b, _)| b.partial_cmp(a).unwrap_or(std::cmp::Ordering::Equal));
488                for (_, doc) in scored {
489                    results.push(doc);
490                    if let Some(l) = self.limit {
491                        if results.len() >= l { break; }
492                    }
493                }
494            } else {
495                // Exact / substring path (unchanged).
496                for entry in index.iter() {
497                    if let Some(data) = self.db.get(entry.key())? {
498                        if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
499                            let matches = if query.is_empty() {
500                                true
501                            } else {
502                                let fields_to_check = self.search_fields.as_deref();
503                                doc.data.iter().any(|(k, v)| {
504                                    if let Some(ref allowed) = fields_to_check {
505                                        if !allowed.contains(k) { return false; }
506                                    }
507                                    if let crate::types::Value::String(s) = v {
508                                        s.to_lowercase().contains(&query)
509                                    } else {
510                                        false
511                                    }
512                                })
513                            };
514                            if matches {
515                                results.push(doc);
516                                if let Some(l) = self.limit {
517                                    if results.len() >= l { break; }
518                                }
519                            }
520                        }
521                    }
522                }
523            }
524        }
525
526        Ok(results)
527    }
528}
529
530
531fn compare_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
532    match (a, b) {
533        (None, None) => std::cmp::Ordering::Equal,
534        (None, Some(_)) => std::cmp::Ordering::Less,
535        (Some(_), None) => std::cmp::Ordering::Greater,
536        (Some(v1), Some(v2)) => v1.partial_cmp(v2).unwrap_or(std::cmp::Ordering::Equal),
537    }
538}
539
540/// Supported filter operators
541#[derive(Debug, Clone, Serialize, Deserialize)]
542pub enum Filter {
543    Eq(String, Value),
544    Ne(String, Value),
545    Gt(String, Value),
546    Gte(String, Value),
547    Lt(String, Value),
548    Lte(String, Value),
549    In(String, Vec<Value>),
550    Contains(String, String),
551    StartsWith(String, String),
552    IsNull(String),
553    IsNotNull(String),
554    Not(Box<Filter>),
555    And(Vec<Filter>),
556    Or(Vec<Filter>),
557}
558
559/// Traverse a dotted field path (e.g. `"meta.author"`) through a Document's data map.
560/// Returns `None` if any segment is missing or the intermediate value is not an Object.
561fn get_nested<'a>(doc: &'a Document, field: &str) -> Option<&'a Value> {
562    let mut parts = field.splitn(2, '.');
563    let first = parts.next()?;
564    let rest = parts.next();
565    let val = doc.data.get(first)?;
566    match rest {
567        None => Some(val),
568        Some(remaining) => get_nested_value(val, remaining),
569    }
570}
571
572/// Like `get_nested` but also handles the virtual `"id"` field which lives in `doc._sid`.
573/// Returns an owned `Value` to avoid lifetime issues with the temporary id string.
574fn get_field_owned(doc: &Document, field: &str) -> Option<Value> {
575    if field == "_sid" {
576        Some(Value::String(doc._sid.clone()))
577    } else {
578        get_nested(doc, field).cloned()
579    }
580}
581
582fn get_nested_value<'a>(val: &'a Value, path: &str) -> Option<&'a Value> {
583    let mut parts = path.splitn(2, '.');
584    let first = parts.next()?;
585    let rest = parts.next();
586    if let Value::Object(map) = val {
587        let child = map.get(first)?;
588        match rest {
589            None => Some(child),
590            Some(remaining) => get_nested_value(child, remaining),
591        }
592    } else {
593        None
594    }
595}
596
597impl std::ops::Not for Filter {
598    type Output = Self;
599    fn not(self) -> Self::Output {
600        Filter::Not(Box::new(self))
601    }
602}
603
604impl Filter {
605    pub fn matches(&self, doc: &Document) -> bool {
606        match self {
607            Filter::Eq(f, v) => get_field_owned(doc, f).as_ref() == Some(v),
608            Filter::Ne(f, v) => get_field_owned(doc, f).as_ref() != Some(v),
609            Filter::Gt(f, v) => get_field_owned(doc, f).map_or(false, |dv| dv > *v),
610            Filter::Gte(f, v) => get_field_owned(doc, f).map_or(false, |dv| dv >= *v),
611            Filter::Lt(f, v) => get_field_owned(doc, f).map_or(false, |dv| dv < *v),
612            Filter::Lte(f, v) => get_field_owned(doc, f).map_or(false, |dv| dv <= *v),
613            Filter::In(f, v) => get_field_owned(doc, f).map_or(false, |dv| v.contains(&dv)),
614            Filter::Contains(f, v) => get_field_owned(doc, f).map_or(false, |dv| {
615                if let Value::String(s) = dv { s.contains(v.as_str()) } else { false }
616            }),
617            Filter::StartsWith(f, v) => get_field_owned(doc, f).map_or(false, |dv| {
618                if let Value::String(s) = dv { s.starts_with(v.as_str()) } else { false }
619            }),
620            Filter::IsNull(f) => get_field_owned(doc, f).map_or(true, |v| matches!(v, Value::Null)),
621            Filter::IsNotNull(f) => get_field_owned(doc, f).map_or(false, |v| !matches!(v, Value::Null)),
622            Filter::Not(f) => !f.matches(doc),
623            Filter::And(fs) => fs.iter().all(|f| f.matches(doc)),
624            Filter::Or(fs) => fs.iter().any(|f| f.matches(doc)),
625        }
626    }
627}
628
629impl std::ops::BitAnd for Filter {
630    type Output = Filter;
631    fn bitand(self, rhs: Self) -> Self::Output {
632        match (self, rhs) {
633            (Filter::And(mut a), Filter::And(mut b)) => { a.append(&mut b); Filter::And(a) }
634            (Filter::And(mut a), b) => { a.push(b); Filter::And(a) }
635            (a, Filter::And(mut b)) => { b.insert(0, a); Filter::And(b) }
636            (a, b) => Filter::And(vec![a, b]),
637        }
638    }
639}
640
641impl std::ops::BitOr for Filter {
642    type Output = Filter;
643    fn bitor(self, rhs: Self) -> Self::Output {
644        match (self, rhs) {
645            (Filter::Or(mut a), Filter::Or(mut b)) => { a.append(&mut b); Filter::Or(a) }
646            (Filter::Or(mut a), b) => { a.push(b); Filter::Or(a) }
647            (a, Filter::Or(mut b)) => { b.insert(0, a); Filter::Or(b) }
648            (a, b) => Filter::Or(vec![a, b]),
649        }
650    }
651}