Skip to main content

aurora_db/
query.rs

1//! # Aurora Query System
2//!
3//! This module provides a powerful, fluent query interface for filtering, sorting,
4//! and retrieving documents from Aurora collections.
5
6use crate::Aurora;
7use crate::error::Result;
8use crate::types::{Document, Value};
9use std::collections::HashMap;
10use serde::{Deserialize, Serialize};
11use std::sync::Arc;
12use roaring::RoaringBitmap;
13
/// Plain-data description of a query, with no database handle attached.
///
/// It carries the same query-shaping fields as [`QueryBuilder`] (collection,
/// filters, ordering, paging) and derives serde's `Serialize`/`Deserialize`,
/// so it can be cloned and serialized independently of an `Aurora` instance.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SimpleQueryBuilder {
    /// Name of the collection to query.
    pub collection: String,
    /// Filters to apply (presumably ANDed, as `QueryBuilder` does — confirm at the executor).
    pub filters: Vec<Filter>,
    /// Optional sort specification; presumably `(field, ascending)` as in `QueryBuilder::order_by`.
    pub order_by: Option<(String, bool)>,
    /// Maximum number of documents to return.
    pub limit: Option<usize>,
    /// Number of matching documents to skip.
    pub offset: Option<usize>,
}
22
/// Builder for creating and executing document queries.
///
/// Configure it with [`QueryBuilder::filter`], [`QueryBuilder::order_by`],
/// [`QueryBuilder::limit`], [`QueryBuilder::offset`] and [`QueryBuilder::select`],
/// then run it with [`QueryBuilder::collect`] (or `count`, `first_one`,
/// `delete`, `watch`).
pub struct QueryBuilder<'a> {
    /// Database the query runs against.
    db: &'a Aurora,
    /// Name of the collection being queried.
    collection: String,
    /// Filters every returned document must satisfy (all of them must match).
    filters: Vec<Filter>,
    /// Optional sort: `(field, ascending)`.
    order_by: Option<(String, bool)>,
    /// Maximum number of documents returned, applied after `offset`.
    limit: Option<usize>,
    /// Number of matching, sorted documents to skip.
    offset: Option<usize>,
    /// Projection set by `select`: the top-level `data` keys to keep.
    fields: Option<Vec<String>>,
    /// Debounce interval forwarded to the watcher created by `watch`.
    debounce_duration: Option<std::time::Duration>,
}
34
35/// Builder for constructing document filter expressions.
36pub struct FilterBuilder;
37
38impl FilterBuilder {
39    pub fn new() -> Self {
40        Self
41    }
42
43    pub fn eq<T: Into<Value>>(&self, field: &str, value: T) -> Filter {
44        Filter::Eq(field.to_string(), value.into())
45    }
46
47    pub fn ne<T: Into<Value>>(&self, field: &str, value: T) -> Filter {
48        Filter::Ne(field.to_string(), value.into())
49    }
50
51    pub fn in_values<T: Into<Value> + Clone>(&self, field: &str, values: &[T]) -> Filter {
52        Filter::In(field.to_string(), values.iter().cloned().map(|v| v.into()).collect())
53    }
54
55    pub fn starts_with(&self, field: &str, value: &str) -> Filter {
56        Filter::StartsWith(field.to_string(), value.to_string())
57    }
58
59    pub fn contains(&self, field: &str, value: &str) -> Filter {
60        Filter::Contains(field.to_string(), value.to_string())
61    }
62
63    pub fn gt<T: Into<Value>>(&self, field: &str, value: T) -> Filter {
64        Filter::Gt(field.to_string(), value.into())
65    }
66
67    pub fn gte<T: Into<Value>>(&self, field: &str, value: T) -> Filter {
68        Filter::Gte(field.to_string(), value.into())
69    }
70
71    pub fn lt<T: Into<Value>>(&self, field: &str, value: T) -> Filter {
72        Filter::Lt(field.to_string(), value.into())
73    }
74
75    pub fn lte<T: Into<Value>>(&self, field: &str, value: T) -> Filter {
76        Filter::Lte(field.to_string(), value.into())
77    }
78
79    pub fn in_vec<T: Into<Value>>(&self, field: &str, values: Vec<T>) -> Filter {
80        Filter::In(field.to_string(), values.into_iter().map(|v| v.into()).collect())
81    }
82
83    pub fn between<T: Into<Value> + Clone>(&self, field: &str, min: T, max: T) -> Filter {
84        Filter::And(vec![
85            Filter::Gte(field.to_string(), min.into()),
86            Filter::Lte(field.to_string(), max.into()),
87        ])
88    }
89}
90
91impl<'a> QueryBuilder<'a> {
92    pub fn new(db: &'a Aurora, collection: &str) -> Self {
93        Self {
94            db,
95            collection: collection.to_string(),
96            filters: Vec::new(),
97            order_by: None,
98            limit: None,
99            offset: None,
100            fields: None,
101            debounce_duration: None,
102        }
103    }
104
105    pub fn filter<F>(mut self, f: F) -> Self
106    where
107        F: FnOnce(&FilterBuilder) -> Filter,
108    {
109        let builder = FilterBuilder::new();
110        self.filters.push(f(&builder));
111        self
112    }
113
114    pub fn order_by(mut self, field: &str, ascending: bool) -> Self {
115        self.order_by = Some((field.to_string(), ascending));
116        self
117    }
118
119    pub fn limit(mut self, limit: usize) -> Self {
120        self.limit = Some(limit);
121        self
122    }
123
124    pub fn offset(mut self, offset: usize) -> Self {
125        self.offset = Some(offset);
126        self
127    }
128
129    pub fn select(mut self, fields: Vec<&str>) -> Self {
130        self.fields = Some(fields.into_iter().map(|s| s.to_string()).collect());
131        self
132    }
133
134    pub fn debounce(mut self, duration: std::time::Duration) -> Self {
135        self.debounce_duration = Some(duration);
136        self
137    }
138
139    pub async fn first_one(self) -> Result<Option<Document>> {
140        let docs = self.limit(1).collect().await?;
141        Ok(docs.into_iter().next())
142    }
143
144    /// Executes the query and returns the matching documents.
145    /// Uses secondary indices (Roaring Bitmaps) for optimized filtering when possible.
146    pub async fn collect(self) -> Result<Vec<Document>> {
147        self.db.ensure_indices_initialized().await?;
148
149        // Optimized Bitwise Intersection Path
150        let mut candidate_bitmap: Option<RoaringBitmap> = None;
151
152        for filter in &self.filters {
153            if let Filter::Eq(field, value) = filter {
154                // Check secondary indices
155                let index_key = format!("{}:{}", self.collection, field);
156                let val_str = match value {
157                    Value::String(s) => s.clone(),
158                    _ => value.to_string(),
159                };
160                let full_key = format!("{}:{}:{}", self.collection, field, val_str);
161
162                let mut current_bitmap = RoaringBitmap::new();
163                let mut found = false;
164
165                // 1. Check Cold Index (mmap) — bounds-checked to avoid panic on corrupt manifest
166                if let Some(loc) = self.db.index_manifest.get(&full_key) {
167                    let (offset, len) = *loc.value();
168                    if let Ok(guard) = self.db.mmap_index.read() {
169                        if let Some(mmap) = guard.as_ref() {
170                            if offset + len <= mmap.len() {
171                                let bytes = &mmap[offset..(offset + len)];
172                                if let Ok(cold_bitmap) = RoaringBitmap::deserialize_from(bytes) {
173                                    current_bitmap |= cold_bitmap;
174                                    found = true;
175                                }
176                            }
177                        }
178                    }
179                }
180
181                // 2. Check Hot Index — direct entry lookup, no DashMap clone
182                if let Some(storage_arc) = self.db.get_indexed_storage(&index_key, &val_str) {
183                    if let Ok(storage) = storage_arc.read() {
184                        current_bitmap |= storage.to_bitmap();
185                        found = true;
186                    }
187                }
188
189                if !found {
190                    // Only short-circuit on an indexed miss when there is NO active
191                    // transaction — transaction writes are buffered and never reach the
192                    // bitmap index, so we must not hide them with an early empty return.
193                    let in_transaction = crate::transaction::ACTIVE_TRANSACTION_ID
194                        .try_with(|id| *id)
195                        .ok()
196                        .and_then(|id| self.db.transaction_manager.active_transactions.get(&id))
197                        .is_some();
198
199                    if !in_transaction && self.db.has_index_key(&index_key) {
200                        return Ok(vec![]);
201                    }
202                    // Index absent or inside a transaction → fall through to scan path
203                    candidate_bitmap = None;
204                    break;
205                }
206
207                if let Some(ref mut existing) = candidate_bitmap {
208                    *existing &= current_bitmap; // Bitwise AND intersection
209                } else {
210                    candidate_bitmap = Some(current_bitmap);
211                }
212
213                // Short-circuit on empty intersection — but only outside a transaction.
214                // Buffered inserts are merged later and may still satisfy all filters.
215                if let Some(ref b) = candidate_bitmap {
216                    if b.is_empty() {
217                        let in_transaction = crate::transaction::ACTIVE_TRANSACTION_ID
218                            .try_with(|id| *id)
219                            .ok()
220                            .and_then(|id| self.db.transaction_manager.active_transactions.get(&id))
221                            .is_some();
222                        if !in_transaction {
223                            return Ok(vec![]);
224                        }
225                    }
226                }
227            }
228        }
229
230        let mut docs = if let Some(bitmap) = candidate_bitmap {
231            // OPTIMIZATION: If the query only requests 'id', we can bypass Sled hydration entirely
232            let id_only = self.fields.as_ref().map(|f| f.len() == 1 && f[0] == "id").unwrap_or(false);
233            
234            // Check active transaction buffer for collection members
235            let tx_id = crate::transaction::ACTIVE_TRANSACTION_ID
236                .try_with(|id| *id)
237                .ok();
238            
239            let tx_buffer = tx_id.and_then(|id| self.db.transaction_manager.active_transactions.get(&id));
240
241            // Hydrate only the final matching IDs
242            let mut final_docs = Vec::with_capacity(bitmap.len() as usize);
243            for internal_id in bitmap {
244                if let Some(external_id) = self.db.get_external_id(internal_id) {
245                    // IMPORTANT: Check if this doc was deleted in the current transaction
246                    if let Some(ref buffer) = tx_buffer {
247                        let key = format!("{}:{}", self.collection, external_id);
248                        if buffer.deletes.contains_key(&key) {
249                            continue;
250                        }
251                    }
252
253                    if id_only && self.filters.is_empty() {
254                        // ULTRA FAST PATH: No filters and id only
255                        final_docs.push(Document { id: external_id, data: HashMap::new() });
256                        continue;
257                    }
258
259                    if let Ok(Some(doc)) = self.db.get_document(&self.collection, &external_id) {
260                        // Double-check with full filter (for any non-indexed conditions)
261                        if self.filters.iter().all(|f| f.matches(&doc)) {
262                            final_docs.push(doc);
263                        }
264                    }
265                }
266            }
267
268            // Also check for NEW documents in transaction that might match (not in bitmap index yet)
269            if let Some(buffer) = tx_buffer {
270                let prefix = format!("{}:", self.collection);
271                for item in buffer.writes.iter() {
272                    let key: &String = item.key();
273                    if let Some(external_id) = key.strip_prefix(&prefix) {
274                        
275                        // If it's already in final_docs (from bitmap index), skip it
276                        if final_docs.iter().any(|d| d.id == external_id) {
277                            continue;
278                        }
279
280                        let data: &Vec<u8> = item.value();
281                        if let Ok(doc) = self.db.deserialize_internal::<Document>(data) {
282                            if self.filters.iter().all(|f| f.matches(&doc)) {
283                                final_docs.push(doc);
284                            }
285                        }
286                    }
287                }
288            }
289
290            final_docs
291        } else {
292            // Fallback to scan if no indices were hit.
293            // Only pre-limit when there is no sort — applying a limit before sorting
294            // truncates the match set and returns the wrong page when scan order
295            // differs from sort order.
296            let scan_limit = if self.order_by.is_none() {
297                self.limit.map(|l| l + self.offset.unwrap_or(0))
298            } else {
299                None
300            };
301            
302            let db_filters = self.filters.clone();
303            self.db.scan_and_filter(&self.collection, move |doc| {
304                db_filters.iter().all(|f| f.matches(doc))
305            }, scan_limit)?
306        };
307
308        // Apply Sorting
309        if let Some((field, ascending)) = self.order_by {
310            docs.sort_by(|a, b| {
311                let v1 = a.data.get(&field);
312                let v2 = b.data.get(&field);
313                let ord = compare_values(v1, v2);
314                if ascending { ord } else { ord.reverse() }
315            });
316        }
317
318        // Apply Offset/Limit
319        let mut start = self.offset.unwrap_or(0);
320        if start > docs.len() { start = docs.len(); }
321        let mut end = docs.len();
322        if let Some(max) = self.limit {
323            if start + max < end { end = start + max; }
324        }
325
326        let mut result = docs[start..end].to_vec();
327
328        // Apply computed fields
329        if let Ok(computed) = self.db.computed.read() {
330            for doc in &mut result {
331                let _ = computed.apply(&self.collection, doc);
332            }
333        }
334
335        // Apply field projection when select() was called
336        if let Some(ref fields) = self.fields {
337            let field_set: std::collections::HashSet<&str> =
338                fields.iter().map(|s| s.as_str()).collect();
339            for doc in &mut result {
340                doc.data.retain(|k, _| field_set.contains(k.as_str()));
341            }
342        }
343
344        Ok(result)
345    }
346
347    pub async fn count(self) -> Result<usize> {
348        let results = self.collect().await?;
349        Ok(results.len())
350    }
351
352    pub async fn delete(self) -> Result<usize> {
353        let db = self.db;
354        let collection = self.collection.clone();
355        let docs = self.collect().await?;
356        let count = docs.len();
357        for doc in docs {
358            let _ = db.aql_delete_document(&collection, &doc.id).await;
359        }
360        Ok(count)
361    }
362
363    pub async fn watch(self) -> Result<crate::reactive::QueryWatcher> {
364        let collection = self.collection.clone();
365        let filters = self.filters.clone();
366        let db_clone = self.db.clone();
367        let debounce_duration = self.debounce_duration;
368
369        let initial_results = self.collect().await?;
370        let listener = db_clone.pubsub.listen(&collection);
371        let state = Arc::new(crate::reactive::ReactiveQueryState::new(filters));
372
373        Ok(crate::reactive::QueryWatcher::new(
374            Arc::new(db_clone),
375            collection,
376            listener,
377            state,
378            initial_results,
379            debounce_duration,
380        ))
381    }
382}
383
/// Builder for full-text search queries.
///
/// Configure it with `query`, `limit`, `fuzzy` and `fields`, then run it with `collect`.
pub struct SearchBuilder<'a> {
    /// Database being searched.
    db: &'a Aurora,
    /// Collection whose primary index is iterated during the search.
    collection: String,
    /// Search text. It is lower-cased at execution time, and an empty string matches every document.
    query: String,
    /// Maximum number of results.
    limit: Option<usize>,
    /// Set to `true` by `fuzzy`; `collect` does not currently read it.
    fuzzy: bool,
    /// Value passed to `fuzzy` (presumably a max edit distance — confirm); not read by `collect`.
    distance: u8,
    /// Field names to search; `None` searches every string-valued top-level field.
    search_fields: Option<Vec<String>>,
}
394
395impl<'a> SearchBuilder<'a> {
396    pub fn new(db: &'a Aurora, collection: &str) -> Self {
397        Self {
398            db,
399            collection: collection.to_string(),
400            query: String::new(),
401            limit: None,
402            fuzzy: false,
403            distance: 0,
404            search_fields: None,
405        }
406    }
407
408    pub fn query(mut self, query: &str) -> Self {
409        self.query = query.to_string();
410        self
411    }
412
413    pub fn limit(mut self, limit: usize) -> Self {
414        self.limit = Some(limit);
415        self
416    }
417
418    pub fn fuzzy(mut self, distance: u8) -> Self {
419        self.fuzzy = true;
420        self.distance = distance;
421        self
422    }
423
424    /// Restrict search to specific field names (None = all string fields)
425    pub fn fields(mut self, fields: Vec<String>) -> Self {
426        self.search_fields = Some(fields);
427        self
428    }
429
430    /// Like collect() but accepts an optional field filter inline
431    pub async fn collect_with_fields(self, fields: Option<&[String]>) -> Result<Vec<Document>> {
432        let builder = if let Some(f) = fields {
433            Self { search_fields: Some(f.to_vec()), ..self }
434        } else {
435            self
436        };
437        builder.collect().await
438    }
439
440    pub async fn collect(self) -> Result<Vec<Document>> {
441        let query = self.query.to_lowercase();
442        let mut results = Vec::new();
443
444        if let Some(index) = self.db.primary_indices.get(&self.collection) {
445            for entry in index.iter() {
446                if let Some(data) = self.db.get(entry.key())? {
447                    if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
448                        let matches = if query.is_empty() {
449                            true
450                        } else {
451                            let fields_to_check = self.search_fields.as_deref();
452                            doc.data.iter().any(|(k, v)| {
453                                if let Some(ref allowed) = fields_to_check {
454                                    if !allowed.contains(k) {
455                                        return false;
456                                    }
457                                }
458                                if let crate::types::Value::String(s) = v {
459                                    s.to_lowercase().contains(&query)
460                                } else {
461                                    false
462                                }
463                            })
464                        };
465                        if matches {
466                            results.push(doc);
467                            if let Some(l) = self.limit {
468                                if results.len() >= l {
469                                    break;
470                                }
471                            }
472                        }
473                    }
474                }
475            }
476        }
477
478        Ok(results)
479    }
480}
481
482
483fn compare_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
484    match (a, b) {
485        (None, None) => std::cmp::Ordering::Equal,
486        (None, Some(_)) => std::cmp::Ordering::Less,
487        (Some(_), None) => std::cmp::Ordering::Greater,
488        (Some(v1), Some(v2)) => v1.partial_cmp(v2).unwrap_or(std::cmp::Ordering::Equal),
489    }
490}
491
/// Supported filter operators.
///
/// The `String` in each leaf variant is a field name. It may be a dotted path
/// into nested objects (e.g. `"meta.author"`), and `"id"` resolves to the
/// document id when the document's data has no `"id"` key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Filter {
    /// Field equals the value; a missing field never matches.
    Eq(String, Value),
    /// Field differs from the value; a missing field matches.
    Ne(String, Value),
    /// Field is greater than the value (per `Value`'s `PartialOrd`); missing never matches.
    Gt(String, Value),
    /// Field is greater than or equal to the value; missing never matches.
    Gte(String, Value),
    /// Field is less than the value; missing never matches.
    Lt(String, Value),
    /// Field is less than or equal to the value; missing never matches.
    Lte(String, Value),
    /// Field equals one of the listed values.
    In(String, Vec<Value>),
    /// String field contains the substring (case-sensitive); non-strings never match.
    Contains(String, String),
    /// String field starts with the prefix (case-sensitive); non-strings never match.
    StartsWith(String, String),
    /// Field is missing or explicitly null.
    IsNull(String),
    /// Field is present and not null.
    IsNotNull(String),
    /// Negation of the inner filter.
    Not(Box<Filter>),
    /// All inner filters match (an empty list matches everything).
    And(Vec<Filter>),
    /// At least one inner filter matches (an empty list matches nothing).
    Or(Vec<Filter>),
}
510
511/// Traverse a dotted field path (e.g. `"meta.author"`) through a Document's data map.
512/// Returns `None` if any segment is missing or the intermediate value is not an Object.
513fn get_nested<'a>(doc: &'a Document, field: &str) -> Option<&'a Value> {
514    let mut parts = field.splitn(2, '.');
515    let first = parts.next()?;
516    let rest = parts.next();
517    let val = doc.data.get(first)?;
518    match rest {
519        None => Some(val),
520        Some(remaining) => get_nested_value(val, remaining),
521    }
522}
523
524/// Like `get_nested` but also handles the virtual `"id"` field which lives in `doc.id`.
525/// Returns an owned `Value` to avoid lifetime issues with the temporary id string.
526fn get_field_owned(doc: &Document, field: &str) -> Option<Value> {
527    if field == "id" && !doc.data.contains_key("id") {
528        Some(Value::String(doc.id.clone()))
529    } else {
530        get_nested(doc, field).cloned()
531    }
532}
533
534fn get_nested_value<'a>(val: &'a Value, path: &str) -> Option<&'a Value> {
535    let mut parts = path.splitn(2, '.');
536    let first = parts.next()?;
537    let rest = parts.next();
538    if let Value::Object(map) = val {
539        let child = map.get(first)?;
540        match rest {
541            None => Some(child),
542            Some(remaining) => get_nested_value(child, remaining),
543        }
544    } else {
545        None
546    }
547}
548
549impl std::ops::Not for Filter {
550    type Output = Self;
551    fn not(self) -> Self::Output {
552        Filter::Not(Box::new(self))
553    }
554}
555
556impl Filter {
557    pub fn matches(&self, doc: &Document) -> bool {
558        match self {
559            Filter::Eq(f, v) => get_field_owned(doc, f).as_ref() == Some(v),
560            Filter::Ne(f, v) => get_field_owned(doc, f).as_ref() != Some(v),
561            Filter::Gt(f, v) => get_field_owned(doc, f).map_or(false, |dv| dv > *v),
562            Filter::Gte(f, v) => get_field_owned(doc, f).map_or(false, |dv| dv >= *v),
563            Filter::Lt(f, v) => get_field_owned(doc, f).map_or(false, |dv| dv < *v),
564            Filter::Lte(f, v) => get_field_owned(doc, f).map_or(false, |dv| dv <= *v),
565            Filter::In(f, v) => get_field_owned(doc, f).map_or(false, |dv| v.contains(&dv)),
566            Filter::Contains(f, v) => get_field_owned(doc, f).map_or(false, |dv| {
567                if let Value::String(s) = dv { s.contains(v.as_str()) } else { false }
568            }),
569            Filter::StartsWith(f, v) => get_field_owned(doc, f).map_or(false, |dv| {
570                if let Value::String(s) = dv { s.starts_with(v.as_str()) } else { false }
571            }),
572            Filter::IsNull(f) => get_field_owned(doc, f).map_or(true, |v| matches!(v, Value::Null)),
573            Filter::IsNotNull(f) => get_field_owned(doc, f).map_or(false, |v| !matches!(v, Value::Null)),
574            Filter::Not(f) => !f.matches(doc),
575            Filter::And(fs) => fs.iter().all(|f| f.matches(doc)),
576            Filter::Or(fs) => fs.iter().any(|f| f.matches(doc)),
577        }
578    }
579}
580
581impl std::ops::BitAnd for Filter {
582    type Output = Filter;
583    fn bitand(self, rhs: Self) -> Self::Output {
584        match (self, rhs) {
585            (Filter::And(mut a), Filter::And(mut b)) => { a.append(&mut b); Filter::And(a) }
586            (Filter::And(mut a), b) => { a.push(b); Filter::And(a) }
587            (a, Filter::And(mut b)) => { b.insert(0, a); Filter::And(b) }
588            (a, b) => Filter::And(vec![a, b]),
589        }
590    }
591}
592
593impl std::ops::BitOr for Filter {
594    type Output = Filter;
595    fn bitor(self, rhs: Self) -> Self::Output {
596        match (self, rhs) {
597            (Filter::Or(mut a), Filter::Or(mut b)) => { a.append(&mut b); Filter::Or(a) }
598            (Filter::Or(mut a), b) => { a.push(b); Filter::Or(a) }
599            (a, Filter::Or(mut b)) => { b.insert(0, a); Filter::Or(b) }
600            (a, b) => Filter::Or(vec![a, b]),
601        }
602    }
603}