aurora_db/
query.rs

//! # Aurora Query System
//!
//! This module provides a powerful, fluent query interface for filtering, sorting,
//! and retrieving documents from Aurora collections.
//!
//! ## Examples
//!
//! ```rust
//! // Get all active users over 21
//! let users = db.query("users")
//!     .filter(|f| f.eq("status", "active") && f.gt("age", 21))
//!     .order_by("last_login", false)
//!     .limit(20)
//!     .collect()
//!     .await?;
//! ```
17
18use crate::Aurora;
19use crate::error::AuroraError;
20use crate::error::Result;
21use crate::types::{Document, Value};
22use serde::{Deserialize, Serialize};
23use std::collections::HashMap;
24
25/// Trait for objects that can filter documents.
26///
27/// This trait is implemented for closures that take a reference to a
28/// `FilterBuilder` and return a boolean, allowing for a natural filter syntax.
29pub trait Queryable {
30    fn matches(&self, doc: &Document) -> bool;
31}
32
33impl<F> Queryable for F
34where
35    F: Fn(&Document) -> bool,
36{
37    fn matches(&self, doc: &Document) -> bool {
38        self(doc)
39    }
40}
41
42/// Builder for creating and executing document queries.
43///
44/// QueryBuilder uses a fluent interface pattern to construct
45/// and execute queries against Aurora collections.
46///
47/// # Examples
48///
49/// ```
50/// // Query for active premium users
51/// let premium_users = db.query("users")
52///     .filter(|f| f.eq("status", "active") && f.eq("account_type", "premium"))
53///     .order_by("created_at", false)
54///     .limit(10)
55///     .collect()
56///     .await?;
57/// ```
58pub struct QueryBuilder<'a> {
59    db: &'a Aurora,
60    collection: String,
61    // CHANGE 1: Add `+ Send + Sync` to make the boxed closure thread-safe.
62    filters: Vec<Box<dyn Fn(&Document) -> bool + Send + Sync + 'a>>,
63    order_by: Option<(String, bool)>,
64    limit: Option<usize>,
65    offset: Option<usize>,
66    fields: Option<Vec<String>>,
67}
68
69/// Builder for constructing document filter expressions.
70///
71/// This struct provides methods for comparing document fields
72/// with values to create filter conditions.
73///
74/// # Examples
75///
76/// ```
77/// // Combine multiple filter conditions
78/// db.query("products")
79///     .filter(|f| {
80///         f.gte("price", 10.0) &&
81///         f.lte("price", 50.0) &&
82///         f.contains("name", "widget")
83///     })
84///     .collect()
85///     .await?;
86/// ```
87pub struct FilterBuilder<'a, 'b> {
88    doc: &'b Document,
89    _marker: std::marker::PhantomData<&'a ()>,
90}
91
92impl<'a, 'b> FilterBuilder<'a, 'b> {
93    /// Create a new filter builder for the given document
94    pub fn new(doc: &'b Document) -> Self {
95        Self {
96            doc,
97            _marker: std::marker::PhantomData,
98        }
99    }
100
101    /// Check if a field equals a value
102    ///
103    /// # Examples
104    /// ```
105    /// .filter(|f| f.eq("status", "active"))
106    /// ```
107    pub fn eq<T: Into<Value>>(&self, field: &str, value: T) -> bool {
108        let value = value.into();
109        self.doc.data.get(field).map_or(false, |v| v == &value)
110    }
111
112    /// Check if a field is greater than a value
113    ///
114    /// # Examples
115    /// ```
116    /// .filter(|f| f.gt("age", 21))
117    /// ```
118    pub fn gt<T: Into<Value>>(&self, field: &str, value: T) -> bool {
119        let value = value.into();
120        self.doc.data.get(field).map_or(false, |v| v > &value)
121    }
122
123    /// Check if a field is greater than or equal to a value
124    ///
125    /// # Examples
126    /// ```
127    /// .filter(|f| f.gte("age", 21))
128    /// ```
129    pub fn gte<T: Into<Value>>(&self, field: &str, value: T) -> bool {
130        let value = value.into();
131        self.doc.data.get(field).map_or(false, |v| v >= &value)
132    }
133
134    /// Check if a field is less than a value
135    ///
136    /// # Examples
137    /// ```
138    /// .filter(|f| f.lt("age", 65))
139    /// ```
140    pub fn lt<T: Into<Value>>(&self, field: &str, value: T) -> bool {
141        let value = value.into();
142        self.doc.data.get(field).map_or(false, |v| v < &value)
143    }
144
145    /// Check if a field is less than or equal to a value
146    ///
147    /// # Examples
148    /// ```
149    /// .filter(|f| f.lte("age", 65))
150    /// ```
151    pub fn lte<T: Into<Value>>(&self, field: &str, value: T) -> bool {
152        let value = value.into();
153        self.doc.data.get(field).map_or(false, |v| v <= &value)
154    }
155
156    /// Check if a field contains a value
157    ///
158    /// # Examples
159    /// ```
160    /// .filter(|f| f.contains("name", "widget"))
161    /// ```
162    pub fn contains(&self, field: &str, value: &str) -> bool {
163        self.doc.data.get(field).map_or(false, |v| match v {
164            Value::String(s) => s.contains(value),
165            Value::Array(arr) => arr.contains(&Value::String(value.to_string())),
166            _ => false,
167        })
168    }
169
170    /// Check if a field is in a list of values
171    ///
172    /// # Examples
173    /// ```
174    /// .filter(|f| f.in_values("status", &["active", "inactive"]))
175    /// ```
176    pub fn in_values<T: Into<Value> + Clone>(&self, field: &str, values: &[T]) -> bool {
177        let values: Vec<Value> = values.iter().map(|v| v.clone().into()).collect();
178        self.doc
179            .data
180            .get(field)
181            .map_or(false, |v| values.contains(v))
182    }
183
184    /// Check if a field is between two values (inclusive)
185    ///
186    /// # Examples
187    /// ```
188    /// .filter(|f| f.between("age", 18, 65))
189    /// ```
190    pub fn between<T: Into<Value> + Clone>(&self, field: &str, min: T, max: T) -> bool {
191        self.gte(field, min) && self.lte(field, max)
192    }
193
194    /// Check if a field exists and is not null
195    ///
196    /// # Examples
197    /// ```
198    /// .filter(|f| f.exists("email"))
199    /// ```
200    pub fn exists(&self, field: &str) -> bool {
201        self.doc
202            .data
203            .get(field)
204            .map_or(false, |v| !matches!(v, Value::Null))
205    }
206
207    /// Check if a field doesn't exist or is null
208    ///
209    /// # Examples
210    /// ```
211    /// .filter(|f| f.is_null("email"))
212    /// ```
213    pub fn is_null(&self, field: &str) -> bool {
214        self.doc
215            .data
216            .get(field)
217            .map_or(true, |v| matches!(v, Value::Null))
218    }
219
220    /// Check if a field starts with a prefix
221    ///
222    /// # Examples
223    /// ```
224    /// .filter(|f| f.starts_with("name", "John"))
225    /// ```
226    pub fn starts_with(&self, field: &str, prefix: &str) -> bool {
227        self.doc.data.get(field).map_or(false, |v| match v {
228            Value::String(s) => s.starts_with(prefix),
229            _ => false,
230        })
231    }
232
233    /// Check if a field ends with a suffix
234    ///
235    /// # Examples
236    /// ```
237    /// .filter(|f| f.ends_with("name", "son"))
238    /// ```
239    pub fn ends_with(&self, field: &str, suffix: &str) -> bool {
240        self.doc.data.get(field).map_or(false, |v| match v {
241            Value::String(s) => s.ends_with(suffix),
242            _ => false,
243        })
244    }
245
246    /// Check if a field is in an array
247    ///
248    /// # Examples
249    /// ```
250    /// .filter(|f| f.array_contains("status", "active"))
251    /// ```
252    pub fn array_contains(&self, field: &str, value: impl Into<Value>) -> bool {
253        let value = value.into();
254        self.doc.data.get(field).map_or(false, |v| match v {
255            Value::Array(arr) => arr.contains(&value),
256            _ => false,
257        })
258    }
259
260    /// Check if an array has a specific length
261    ///
262    /// # Examples
263    /// ```
264    /// .filter(|f| f.array_len_eq("status", 2))
265    /// ```
266    pub fn array_len_eq(&self, field: &str, len: usize) -> bool {
267        self.doc.data.get(field).map_or(false, |v| match v {
268            Value::Array(arr) => arr.len() == len,
269            _ => false,
270        })
271    }
272
273    /// Access a nested field using dot notation
274    ///
275    /// # Examples
276    /// ```
277    /// .filter(|f| f.get_nested_value("user.address.city") == Some(&Value::String("New York")))
278    /// ```
279    pub fn get_nested_value(&self, path: &str) -> Option<&Value> {
280        let parts: Vec<&str> = path.split('.').collect();
281        let mut current = self.doc.data.get(parts[0])?;
282
283        for &part in &parts[1..] {
284            if let Value::Object(map) = current {
285                current = map.get(part)?;
286            } else {
287                return None;
288            }
289        }
290
291        Some(current)
292    }
293
294    /// Check if a nested field equals a value
295    ///
296    /// # Examples
297    /// ```
298    /// .filter(|f| f.nested_eq("user.address.city", "New York"))
299    /// ```
300    pub fn nested_eq<T: Into<Value>>(&self, path: &str, value: T) -> bool {
301        let value = value.into();
302        self.get_nested_value(path).map_or(false, |v| v == &value)
303    }
304
305    /// Check if a field matches a regular expression
306    ///
307    /// # Examples
308    /// ```
309    /// .filter(|f| f.matches_regex("email", r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"))
310    /// ```
311    pub fn matches_regex(&self, field: &str, pattern: &str) -> bool {
312        use regex::Regex;
313
314        if let Ok(re) = Regex::new(pattern) {
315            self.doc.data.get(field).map_or(false, |v| match v {
316                Value::String(s) => re.is_match(s),
317                _ => false,
318            })
319        } else {
320            false
321        }
322    }
323}
324
325impl<'a> QueryBuilder<'a> {
326    /// Create a new query builder for the specified collection
327    ///
328    /// # Examples
329    /// ```
330    /// let query = db.query("users");
331    /// ```
332    pub fn new(db: &'a Aurora, collection: &str) -> Self {
333        Self {
334            db,
335            collection: collection.to_string(),
336            filters: Vec::new(),
337            order_by: None,
338            limit: None,
339            offset: None,
340            fields: None,
341        }
342    }
343
344    /// Add a filter function to the query
345    ///
346    /// # Examples
347    /// ```
348    /// let active_users = db.query("users")
349    ///     .filter(|f| f.eq("status", "active"))
350    ///     .collect()
351    ///     .await?;
352    /// ```
353    pub fn filter<F>(mut self, filter_fn: F) -> Self
354    where
355        // CHANGE 2: Require the closure `F` to be `Send + Sync`.
356        F: Fn(&FilterBuilder) -> bool + Send + Sync + 'a,
357    {
358        self.filters.push(Box::new(move |doc| {
359            let filter_builder = FilterBuilder::new(doc);
360            filter_fn(&filter_builder)
361        }));
362        self
363    }
364
365    /// Sort results by a field (ascending or descending)
366    ///
367    /// # Parameters
368    /// * `field` - The field to sort by
369    /// * `ascending` - `true` for ascending order, `false` for descending
370    ///
371    /// # Examples
372    /// ```
373    /// // Sort by age ascending
374    /// .order_by("age", true)
375    ///
376    /// // Sort by creation date descending (newest first)
377    /// .order_by("created_at", false)
378    /// ```
379    pub fn order_by(mut self, field: &str, ascending: bool) -> Self {
380        self.order_by = Some((field.to_string(), ascending));
381        self
382    }
383
384    /// Limit the number of results returned
385    ///
386    /// # Examples
387    /// ```
388    /// // Get at most 10 results
389    /// .limit(10)
390    /// ```
391    pub fn limit(mut self, limit: usize) -> Self {
392        self.limit = Some(limit);
393        self
394    }
395
396    /// Skip a number of results (for pagination)
397    ///
398    /// # Examples
399    /// ```
400    /// // For pagination: skip the first 20 results and get the next 10
401    /// .offset(20).limit(10)
402    /// ```
403    pub fn offset(mut self, offset: usize) -> Self {
404        self.offset = Some(offset);
405        self
406    }
407
408    /// Select only specific fields to return
409    ///
410    /// # Examples
411    /// ```
412    /// // Only return name and email fields
413    /// .select(&["name", "email"])
414    /// ```
415    pub fn select(mut self, fields: &[&str]) -> Self {
416        self.fields = Some(fields.iter().map(|s| s.to_string()).collect());
417        self
418    }
419
420    /// Execute the query and collect the results
421    ///
422    /// # Returns
423    /// A vector of documents matching the query criteria
424    ///
425    /// # Examples
426    /// ```
427    /// let results = db.query("products")
428    ///     .filter(|f| f.lt("price", 100))
429    ///     .collect()
430    ///     .await?;
431    /// ```
432    pub async fn collect(self) -> Result<Vec<Document>> {
433        // Ensure indices are initialized
434        self.db.ensure_indices_initialized().await?;
435
436        let mut docs = self.db.get_all_collection(&self.collection).await?;
437
438        // Apply filters
439        docs.retain(|doc| self.filters.iter().all(|f| f(doc)));
440
441        // Apply ordering
442        if let Some((field, ascending)) = self.order_by {
443            docs.sort_by(|a, b| match (a.data.get(&field), b.data.get(&field)) {
444                (Some(v1), Some(v2)) => {
445                    let cmp = v1.cmp(v2);
446                    if ascending { cmp } else { cmp.reverse() }
447                }
448                (None, Some(_)) => std::cmp::Ordering::Less,
449                (Some(_), None) => std::cmp::Ordering::Greater,
450                (None, None) => std::cmp::Ordering::Equal,
451            });
452        }
453
454        // Apply field selection if specified
455        if let Some(fields) = self.fields {
456            // Create new documents with only selected fields
457            docs = docs
458                .into_iter()
459                .map(|doc| {
460                    let mut new_data = HashMap::new();
461                    // Always include the ID
462                    for field in &fields {
463                        if let Some(value) = doc.data.get(field) {
464                            new_data.insert(field.clone(), value.clone());
465                        }
466                    }
467                    Document {
468                        id: doc.id,
469                        data: new_data,
470                    }
471                })
472                .collect();
473        }
474
475        // Apply offset and limit safely
476        let start = self.offset.unwrap_or(0);
477        let end = self
478            .limit
479            .map(|l| start.saturating_add(l))
480            .unwrap_or(docs.len());
481
482        // Ensure we don't go out of bounds
483        let end = end.min(docs.len());
484        Ok(docs.get(start..end).unwrap_or(&[]).to_vec())
485    }
486
487    /// Watch the query for real-time updates
488    ///
489    /// Returns a QueryWatcher that emits updates when documents are added,
490    /// removed, or modified in ways that affect the query results.
491    ///
492    /// Note: This method requires the QueryBuilder to have a 'static lifetime,
493    /// which means the database reference must also be 'static (e.g., Arc<Aurora>).
494    ///
495    /// # Examples
496    /// ```
497    /// let mut watcher = db.query("users")
498    ///     .filter(|f| f.eq("active", true))
499    ///     .watch()
500    ///     .await?;
501    ///
502    /// // Receive updates in real-time
503    /// while let Some(update) = watcher.next().await {
504    ///     match update {
505    ///         QueryUpdate::Added(doc) => println!("New: {:?}", doc),
506    ///         QueryUpdate::Removed(doc) => println!("Removed: {:?}", doc),
507    ///         QueryUpdate::Modified { old, new } => println!("Modified: {:?}", new),
508    ///     }
509    /// }
510    /// ```
511    pub async fn watch(mut self) -> Result<crate::reactive::QueryWatcher>
512    where
513        'a: 'static,
514    {
515        use crate::reactive::{QueryWatcher, ReactiveQueryState};
516        use std::sync::Arc;
517
518        // Extract the filters before consuming self
519        let collection = self.collection.clone();
520        let db = self.db;
521        let filters = std::mem::take(&mut self.filters);
522
523        // Get initial results
524        let docs = self.collect().await?;
525
526        // Create a listener for this collection
527        let listener = db.listen(&collection);
528
529        // Create filter closure that combines all the query filters
530        let filter_fn = move |doc: &Document| -> bool { filters.iter().all(|f| f(doc)) };
531
532        // Create reactive state
533        let state = Arc::new(ReactiveQueryState::new(filter_fn));
534
535        // Create and return watcher
536        Ok(QueryWatcher::new(collection, listener, state, docs))
537    }
538
539    /// Get only the first matching document or None if no matches
540    ///
541    /// # Examples
542    /// ```
543    /// let user = db.query("users")
544    ///     .filter(|f| f.eq("email", "jane@example.com"))
545    ///     .first_one()
546    ///     .await?;
547    /// ```
548    pub async fn first_one(self) -> Result<Option<Document>> {
549        self.limit(1).collect().await.map(|mut docs| docs.pop())
550    }
551
552    /// Count the number of documents matching the query
553    ///
554    /// # Examples
555    /// ```
556    /// let active_count = db.query("users")
557    ///     .filter(|f| f.eq("status", "active"))
558    ///     .count()
559    ///     .await?;
560    /// ```
561    pub async fn count(self) -> Result<usize> {
562        self.collect().await.map(|docs| docs.len())
563    }
564
565    /// Update documents matching the query with new field values
566    ///
567    /// # Returns
568    /// The number of documents updated
569    ///
570    /// # Examples
571    /// ```
572    /// let updated = db.query("products")
573    ///     .filter(|f| f.lt("stock", 5))
574    ///     .update([
575    ///         ("status", Value::String("low_stock".to_string())),
576    ///         ("needs_reorder", Value::Bool(true))
577    ///     ].into_iter().collect())
578    ///     .await?;
579    /// ```
580    pub async fn update(self, updates: HashMap<&str, Value>) -> Result<usize> {
581        // Store a reference to the db and collection before consuming self
582        let db = self.db;
583        let collection = self.collection.clone();
584
585        let docs = self.collect().await?;
586        let mut updated_count = 0;
587
588        for doc in docs {
589            let mut updated_doc = doc.clone();
590            let mut changed = false;
591
592            for (field, value) in &updates {
593                updated_doc.data.insert(field.to_string(), value.clone());
594                changed = true;
595            }
596
597            if changed {
598                // Update document in the database
599                db.put(
600                    format!("{}:{}", collection, updated_doc.id),
601                    serde_json::to_vec(&updated_doc)?,
602                    None,
603                )?;
604                updated_count += 1;
605            }
606        }
607
608        Ok(updated_count)
609    }
610}
611
612/// Builder for performing full-text search operations
613///
614/// # Examples
615/// ```
616/// let results = db.search("products")
617///     .field("description")
618///     .matching("wireless headphones")
619///     .fuzzy(true)
620///     .collect()
621///     .await?;
622/// ```
623pub struct SearchBuilder<'a> {
624    db: &'a Aurora,
625    collection: String,
626    field: Option<String>,
627    query: Option<String>,
628    fuzzy: bool,
629}
630
631impl<'a> SearchBuilder<'a> {
632    /// Create a new search builder for the specified collection
633    pub fn new(db: &'a Aurora, collection: &str) -> Self {
634        Self {
635            db,
636            collection: collection.to_string(),
637            field: None,
638            query: None,
639            fuzzy: false,
640        }
641    }
642
643    /// Specify the field to search in
644    ///
645    /// # Examples
646    /// ```
647    /// .field("description")
648    /// ```
649    pub fn field(mut self, field: &str) -> Self {
650        self.field = Some(field.to_string());
651        self
652    }
653
654    /// Specify the search query text
655    ///
656    /// # Examples
657    /// ```
658    /// .matching("wireless headphones")
659    /// ```
660    pub fn matching(mut self, query: &str) -> Self {
661        self.query = Some(query.to_string());
662        self
663    }
664
665    /// Enable or disable fuzzy matching (for typo tolerance)
666    ///
667    /// # Examples
668    /// ```
669    /// .fuzzy(true)  // Enable fuzzy matching
670    /// ```
671    pub fn fuzzy(mut self, enable: bool) -> Self {
672        self.fuzzy = enable;
673        self
674    }
675
676    /// Execute the search and collect the results
677    ///
678    /// # Returns
679    /// A vector of documents matching the search criteria
680    ///
681    /// # Examples
682    /// ```
683    /// let results = db.search("articles")
684    ///     .field("content")
685    ///     .matching("quantum computing")
686    ///     .collect()
687    ///     .await?;
688    /// ```
689    pub async fn collect(self) -> Result<Vec<Document>> {
690        let field = self
691            .field
692            .ok_or_else(|| AuroraError::InvalidOperation("Search field not specified".into()))?;
693        let query = self
694            .query
695            .ok_or_else(|| AuroraError::InvalidOperation("Search query not specified".into()))?;
696
697        self.db.search_text(&self.collection, &field, &query).await
698    }
699}
700
701#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
702pub struct SimpleQueryBuilder {
703    pub collection: String,
704    pub filters: Vec<Filter>,
705}
706
707impl SimpleQueryBuilder {
708    pub fn new(collection: String) -> Self {
709        Self {
710            collection,
711            filters: Vec::new(),
712        }
713    }
714
715    pub fn filter(mut self, filter: Filter) -> Self {
716        self.filters.push(filter);
717        self
718    }
719
720    pub fn eq(self, field: &str, value: Value) -> Self {
721        self.filter(Filter::Eq(field.to_string(), value))
722    }
723
724    pub fn gt(self, field: &str, value: Value) -> Self {
725        self.filter(Filter::Gt(field.to_string(), value))
726    }
727
728    pub fn gte(self, field: &str, value: Value) -> Self {
729        self.filter(Filter::Gte(field.to_string(), value))
730    }
731
732    pub fn lt(self, field: &str, value: Value) -> Self {
733        self.filter(Filter::Lt(field.to_string(), value))
734    }
735
736    pub fn lte(self, field: &str, value: Value) -> Self {
737        self.filter(Filter::Lte(field.to_string(), value))
738    }
739
740    pub fn contains(self, field: &str, value: &str) -> Self {
741        self.filter(Filter::Contains(field.to_string(), value.to_string()))
742    }
743
744    /// Convenience method for range queries
745    pub fn between(self, field: &str, min: Value, max: Value) -> Self {
746        self.filter(Filter::Gte(field.to_string(), min))
747            .filter(Filter::Lte(field.to_string(), max))
748    }
749}
750
751#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
752pub enum Filter {
753    Eq(String, Value),
754    Gt(String, Value),
755    Gte(String, Value),
756    Lt(String, Value),
757    Lte(String, Value),
758    Contains(String, String),
759}