aurora_db/query.rs
1//! # Aurora Query System
2//!
3//! This module provides a powerful, fluent query interface for filtering, sorting,
4//! and retrieving documents from Aurora collections.
5//!
6//! ## Examples
7//!
//! ```ignore
9//! // Get all active users over 21
10//! let users = db.query("users")
11//! .filter(|f| f.eq("status", "active") && f.gt("age", 21))
12//! .order_by("last_login", false)
13//! .limit(20)
14//! .collect()
15//! .await?;
16//! ```
17
18use crate::Aurora;
19use crate::error::AuroraError;
20use crate::error::Result;
21use crate::types::{Document, Value};
22use serde::{Deserialize, Serialize};
23use std::collections::HashMap;
24
25/// Trait for objects that can filter documents.
26///
27/// This trait is implemented for closures that take a reference to a
28/// `FilterBuilder` and return a boolean, allowing for a natural filter syntax.
29pub trait Queryable {
30 fn matches(&self, doc: &Document) -> bool;
31}
32
33impl<F> Queryable for F
34where
35 F: Fn(&Document) -> bool,
36{
37 fn matches(&self, doc: &Document) -> bool {
38 self(doc)
39 }
40}
41
42/// Builder for creating and executing document queries.
43///
44/// QueryBuilder uses a fluent interface pattern to construct
45/// and execute queries against Aurora collections.
46///
47/// # Examples
48///
49/// ```
50/// // Query for active premium users
51/// let premium_users = db.query("users")
52/// .filter(|f| f.eq("status", "active") && f.eq("account_type", "premium"))
53/// .order_by("created_at", false)
54/// .limit(10)
55/// .collect()
56/// .await?;
57/// ```
58pub struct QueryBuilder<'a> {
59 db: &'a Aurora,
60 collection: String,
61 // CHANGE 1: Add `+ Send + Sync` to make the boxed closure thread-safe.
62 filters: Vec<Box<dyn Fn(&Document) -> bool + Send + Sync + 'a>>,
63 order_by: Option<(String, bool)>,
64 limit: Option<usize>,
65 offset: Option<usize>,
66 fields: Option<Vec<String>>,
67}
68
69/// Builder for constructing document filter expressions.
70///
71/// This struct provides methods for comparing document fields
72/// with values to create filter conditions.
73///
74/// # Examples
75///
76/// ```
77/// // Combine multiple filter conditions
78/// db.query("products")
79/// .filter(|f| {
80/// f.gte("price", 10.0) &&
81/// f.lte("price", 50.0) &&
82/// f.contains("name", "widget")
83/// })
84/// .collect()
85/// .await?;
86/// ```
87pub struct FilterBuilder<'a, 'b> {
88 doc: &'b Document,
89 _marker: std::marker::PhantomData<&'a ()>,
90}
91
92impl<'a, 'b> FilterBuilder<'a, 'b> {
93 /// Create a new filter builder for the given document
94 pub fn new(doc: &'b Document) -> Self {
95 Self {
96 doc,
97 _marker: std::marker::PhantomData,
98 }
99 }
100
101 /// Check if a field equals a value
102 ///
103 /// # Examples
104 /// ```
105 /// .filter(|f| f.eq("status", "active"))
106 /// ```
107 pub fn eq<T: Into<Value>>(&self, field: &str, value: T) -> bool {
108 let value = value.into();
109 self.doc.data.get(field).map_or(false, |v| v == &value)
110 }
111
112 /// Check if a field is greater than a value
113 ///
114 /// # Examples
115 /// ```
116 /// .filter(|f| f.gt("age", 21))
117 /// ```
118 pub fn gt<T: Into<Value>>(&self, field: &str, value: T) -> bool {
119 let value = value.into();
120 self.doc.data.get(field).map_or(false, |v| v > &value)
121 }
122
123 /// Check if a field is greater than or equal to a value
124 ///
125 /// # Examples
126 /// ```
127 /// .filter(|f| f.gte("age", 21))
128 /// ```
129 pub fn gte<T: Into<Value>>(&self, field: &str, value: T) -> bool {
130 let value = value.into();
131 self.doc.data.get(field).map_or(false, |v| v >= &value)
132 }
133
134 /// Check if a field is less than a value
135 ///
136 /// # Examples
137 /// ```
138 /// .filter(|f| f.lt("age", 65))
139 /// ```
140 pub fn lt<T: Into<Value>>(&self, field: &str, value: T) -> bool {
141 let value = value.into();
142 self.doc.data.get(field).map_or(false, |v| v < &value)
143 }
144
145 /// Check if a field is less than or equal to a value
146 ///
147 /// # Examples
148 /// ```
149 /// .filter(|f| f.lte("age", 65))
150 /// ```
151 pub fn lte<T: Into<Value>>(&self, field: &str, value: T) -> bool {
152 let value = value.into();
153 self.doc.data.get(field).map_or(false, |v| v <= &value)
154 }
155
156 /// Check if a field contains a value
157 ///
158 /// # Examples
159 /// ```
160 /// .filter(|f| f.contains("name", "widget"))
161 /// ```
162 pub fn contains(&self, field: &str, value: &str) -> bool {
163 self.doc.data.get(field).map_or(false, |v| match v {
164 Value::String(s) => s.contains(value),
165 Value::Array(arr) => arr.contains(&Value::String(value.to_string())),
166 _ => false,
167 })
168 }
169
170 /// Check if a field is in a list of values
171 ///
172 /// # Examples
173 /// ```
174 /// .filter(|f| f.in_values("status", &["active", "inactive"]))
175 /// ```
176 pub fn in_values<T: Into<Value> + Clone>(&self, field: &str, values: &[T]) -> bool {
177 let values: Vec<Value> = values.iter().map(|v| v.clone().into()).collect();
178 self.doc
179 .data
180 .get(field)
181 .map_or(false, |v| values.contains(v))
182 }
183
184 /// Check if a field is between two values (inclusive)
185 ///
186 /// # Examples
187 /// ```
188 /// .filter(|f| f.between("age", 18, 65))
189 /// ```
190 pub fn between<T: Into<Value> + Clone>(&self, field: &str, min: T, max: T) -> bool {
191 self.gte(field, min) && self.lte(field, max)
192 }
193
194 /// Check if a field exists and is not null
195 ///
196 /// # Examples
197 /// ```
198 /// .filter(|f| f.exists("email"))
199 /// ```
200 pub fn exists(&self, field: &str) -> bool {
201 self.doc
202 .data
203 .get(field)
204 .map_or(false, |v| !matches!(v, Value::Null))
205 }
206
207 /// Check if a field doesn't exist or is null
208 ///
209 /// # Examples
210 /// ```
211 /// .filter(|f| f.is_null("email"))
212 /// ```
213 pub fn is_null(&self, field: &str) -> bool {
214 self.doc
215 .data
216 .get(field)
217 .map_or(true, |v| matches!(v, Value::Null))
218 }
219
220 /// Check if a field starts with a prefix
221 ///
222 /// # Examples
223 /// ```
224 /// .filter(|f| f.starts_with("name", "John"))
225 /// ```
226 pub fn starts_with(&self, field: &str, prefix: &str) -> bool {
227 self.doc.data.get(field).map_or(false, |v| match v {
228 Value::String(s) => s.starts_with(prefix),
229 _ => false,
230 })
231 }
232
233 /// Check if a field ends with a suffix
234 ///
235 /// # Examples
236 /// ```
237 /// .filter(|f| f.ends_with("name", "son"))
238 /// ```
239 pub fn ends_with(&self, field: &str, suffix: &str) -> bool {
240 self.doc.data.get(field).map_or(false, |v| match v {
241 Value::String(s) => s.ends_with(suffix),
242 _ => false,
243 })
244 }
245
246 /// Check if a field is in an array
247 ///
248 /// # Examples
249 /// ```
250 /// .filter(|f| f.array_contains("status", "active"))
251 /// ```
252 pub fn array_contains(&self, field: &str, value: impl Into<Value>) -> bool {
253 let value = value.into();
254 self.doc.data.get(field).map_or(false, |v| match v {
255 Value::Array(arr) => arr.contains(&value),
256 _ => false,
257 })
258 }
259
260 /// Check if an array has a specific length
261 ///
262 /// # Examples
263 /// ```
264 /// .filter(|f| f.array_len_eq("status", 2))
265 /// ```
266 pub fn array_len_eq(&self, field: &str, len: usize) -> bool {
267 self.doc.data.get(field).map_or(false, |v| match v {
268 Value::Array(arr) => arr.len() == len,
269 _ => false,
270 })
271 }
272
273 /// Access a nested field using dot notation
274 ///
275 /// # Examples
276 /// ```
277 /// .filter(|f| f.get_nested_value("user.address.city") == Some(&Value::String("New York")))
278 /// ```
279 pub fn get_nested_value(&self, path: &str) -> Option<&Value> {
280 let parts: Vec<&str> = path.split('.').collect();
281 let mut current = self.doc.data.get(parts[0])?;
282
283 for &part in &parts[1..] {
284 if let Value::Object(map) = current {
285 current = map.get(part)?;
286 } else {
287 return None;
288 }
289 }
290
291 Some(current)
292 }
293
294 /// Check if a nested field equals a value
295 ///
296 /// # Examples
297 /// ```
298 /// .filter(|f| f.nested_eq("user.address.city", "New York"))
299 /// ```
300 pub fn nested_eq<T: Into<Value>>(&self, path: &str, value: T) -> bool {
301 let value = value.into();
302 self.get_nested_value(path).map_or(false, |v| v == &value)
303 }
304
305 /// Check if a field matches a regular expression
306 ///
307 /// # Examples
308 /// ```
309 /// .filter(|f| f.matches_regex("email", r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"))
310 /// ```
311 pub fn matches_regex(&self, field: &str, pattern: &str) -> bool {
312 use regex::Regex;
313
314 if let Ok(re) = Regex::new(pattern) {
315 self.doc.data.get(field).map_or(false, |v| match v {
316 Value::String(s) => re.is_match(s),
317 _ => false,
318 })
319 } else {
320 false
321 }
322 }
323}
324
325impl<'a> QueryBuilder<'a> {
326 /// Create a new query builder for the specified collection
327 ///
328 /// # Examples
329 /// ```
330 /// let query = db.query("users");
331 /// ```
332 pub fn new(db: &'a Aurora, collection: &str) -> Self {
333 Self {
334 db,
335 collection: collection.to_string(),
336 filters: Vec::new(),
337 order_by: None,
338 limit: None,
339 offset: None,
340 fields: None,
341 }
342 }
343
344 /// Add a filter function to the query
345 ///
346 /// # Examples
347 /// ```
348 /// let active_users = db.query("users")
349 /// .filter(|f| f.eq("status", "active"))
350 /// .collect()
351 /// .await?;
352 /// ```
353 pub fn filter<F>(mut self, filter_fn: F) -> Self
354 where
355 // CHANGE 2: Require the closure `F` to be `Send + Sync`.
356 F: Fn(&FilterBuilder) -> bool + Send + Sync + 'a,
357 {
358 self.filters.push(Box::new(move |doc| {
359 let filter_builder = FilterBuilder::new(doc);
360 filter_fn(&filter_builder)
361 }));
362 self
363 }
364
365 /// Sort results by a field (ascending or descending)
366 ///
367 /// # Parameters
368 /// * `field` - The field to sort by
369 /// * `ascending` - `true` for ascending order, `false` for descending
370 ///
371 /// # Examples
372 /// ```
373 /// // Sort by age ascending
374 /// .order_by("age", true)
375 ///
376 /// // Sort by creation date descending (newest first)
377 /// .order_by("created_at", false)
378 /// ```
379 pub fn order_by(mut self, field: &str, ascending: bool) -> Self {
380 self.order_by = Some((field.to_string(), ascending));
381 self
382 }
383
384 /// Limit the number of results returned
385 ///
386 /// # Examples
387 /// ```
388 /// // Get at most 10 results
389 /// .limit(10)
390 /// ```
391 pub fn limit(mut self, limit: usize) -> Self {
392 self.limit = Some(limit);
393 self
394 }
395
396 /// Skip a number of results (for pagination)
397 ///
398 /// # Examples
399 /// ```
400 /// // For pagination: skip the first 20 results and get the next 10
401 /// .offset(20).limit(10)
402 /// ```
403 pub fn offset(mut self, offset: usize) -> Self {
404 self.offset = Some(offset);
405 self
406 }
407
408 /// Select only specific fields to return
409 ///
410 /// # Examples
411 /// ```
412 /// // Only return name and email fields
413 /// .select(&["name", "email"])
414 /// ```
415 pub fn select(mut self, fields: &[&str]) -> Self {
416 self.fields = Some(fields.iter().map(|s| s.to_string()).collect());
417 self
418 }
419
420 /// Execute the query and collect the results
421 ///
422 /// # Returns
423 /// A vector of documents matching the query criteria
424 ///
425 /// # Examples
426 /// ```
427 /// let results = db.query("products")
428 /// .filter(|f| f.lt("price", 100))
429 /// .collect()
430 /// .await?;
431 /// ```
432 pub async fn collect(self) -> Result<Vec<Document>> {
433 // Ensure indices are initialized
434 self.db.ensure_indices_initialized().await?;
435
436 let mut docs = self.db.get_all_collection(&self.collection).await?;
437
438 // Apply filters
439 docs.retain(|doc| self.filters.iter().all(|f| f(doc)));
440
441 // Apply ordering
442 if let Some((field, ascending)) = self.order_by {
443 docs.sort_by(|a, b| match (a.data.get(&field), b.data.get(&field)) {
444 (Some(v1), Some(v2)) => {
445 let cmp = v1.cmp(v2);
446 if ascending { cmp } else { cmp.reverse() }
447 }
448 (None, Some(_)) => std::cmp::Ordering::Less,
449 (Some(_), None) => std::cmp::Ordering::Greater,
450 (None, None) => std::cmp::Ordering::Equal,
451 });
452 }
453
454 // Apply field selection if specified
455 if let Some(fields) = self.fields {
456 // Create new documents with only selected fields
457 docs = docs
458 .into_iter()
459 .map(|doc| {
460 let mut new_data = HashMap::new();
461 // Always include the ID
462 for field in &fields {
463 if let Some(value) = doc.data.get(field) {
464 new_data.insert(field.clone(), value.clone());
465 }
466 }
467 Document {
468 id: doc.id,
469 data: new_data,
470 }
471 })
472 .collect();
473 }
474
475 // Apply offset and limit safely
476 let start = self.offset.unwrap_or(0);
477 let end = self
478 .limit
479 .map(|l| start.saturating_add(l))
480 .unwrap_or(docs.len());
481
482 // Ensure we don't go out of bounds
483 let end = end.min(docs.len());
484 Ok(docs.get(start..end).unwrap_or(&[]).to_vec())
485 }
486
487 /// Watch the query for real-time updates
488 ///
489 /// Returns a QueryWatcher that emits updates when documents are added,
490 /// removed, or modified in ways that affect the query results.
491 ///
492 /// Note: This method requires the QueryBuilder to have a 'static lifetime,
493 /// which means the database reference must also be 'static (e.g., Arc<Aurora>).
494 ///
495 /// # Examples
496 /// ```
497 /// let mut watcher = db.query("users")
498 /// .filter(|f| f.eq("active", true))
499 /// .watch()
500 /// .await?;
501 ///
502 /// // Receive updates in real-time
503 /// while let Some(update) = watcher.next().await {
504 /// match update {
505 /// QueryUpdate::Added(doc) => println!("New: {:?}", doc),
506 /// QueryUpdate::Removed(doc) => println!("Removed: {:?}", doc),
507 /// QueryUpdate::Modified { old, new } => println!("Modified: {:?}", new),
508 /// }
509 /// }
510 /// ```
511 pub async fn watch(mut self) -> Result<crate::reactive::QueryWatcher>
512 where
513 'a: 'static,
514 {
515 use crate::reactive::{QueryWatcher, ReactiveQueryState};
516 use std::sync::Arc;
517
518 // Extract the filters before consuming self
519 let collection = self.collection.clone();
520 let db = self.db;
521 let filters = std::mem::take(&mut self.filters);
522
523 // Get initial results
524 let docs = self.collect().await?;
525
526 // Create a listener for this collection
527 let listener = db.listen(&collection);
528
529 // Create filter closure that combines all the query filters
530 let filter_fn = move |doc: &Document| -> bool { filters.iter().all(|f| f(doc)) };
531
532 // Create reactive state
533 let state = Arc::new(ReactiveQueryState::new(filter_fn));
534
535 // Create and return watcher
536 Ok(QueryWatcher::new(collection, listener, state, docs))
537 }
538
539 /// Get only the first matching document or None if no matches
540 ///
541 /// # Examples
542 /// ```
543 /// let user = db.query("users")
544 /// .filter(|f| f.eq("email", "jane@example.com"))
545 /// .first_one()
546 /// .await?;
547 /// ```
548 pub async fn first_one(self) -> Result<Option<Document>> {
549 self.limit(1).collect().await.map(|mut docs| docs.pop())
550 }
551
552 /// Count the number of documents matching the query
553 ///
554 /// # Examples
555 /// ```
556 /// let active_count = db.query("users")
557 /// .filter(|f| f.eq("status", "active"))
558 /// .count()
559 /// .await?;
560 /// ```
561 pub async fn count(self) -> Result<usize> {
562 self.collect().await.map(|docs| docs.len())
563 }
564
565 /// Update documents matching the query with new field values
566 ///
567 /// # Returns
568 /// The number of documents updated
569 ///
570 /// # Examples
571 /// ```
572 /// let updated = db.query("products")
573 /// .filter(|f| f.lt("stock", 5))
574 /// .update([
575 /// ("status", Value::String("low_stock".to_string())),
576 /// ("needs_reorder", Value::Bool(true))
577 /// ].into_iter().collect())
578 /// .await?;
579 /// ```
580 pub async fn update(self, updates: HashMap<&str, Value>) -> Result<usize> {
581 // Store a reference to the db and collection before consuming self
582 let db = self.db;
583 let collection = self.collection.clone();
584
585 let docs = self.collect().await?;
586 let mut updated_count = 0;
587
588 for doc in docs {
589 let mut updated_doc = doc.clone();
590 let mut changed = false;
591
592 for (field, value) in &updates {
593 updated_doc.data.insert(field.to_string(), value.clone());
594 changed = true;
595 }
596
597 if changed {
598 // Update document in the database
599 db.put(
600 format!("{}:{}", collection, updated_doc.id),
601 serde_json::to_vec(&updated_doc)?,
602 None,
603 )?;
604 updated_count += 1;
605 }
606 }
607
608 Ok(updated_count)
609 }
610}
611
612/// Builder for performing full-text search operations
613///
614/// # Examples
615/// ```
616/// let results = db.search("products")
617/// .field("description")
618/// .matching("wireless headphones")
619/// .fuzzy(true)
620/// .collect()
621/// .await?;
622/// ```
623pub struct SearchBuilder<'a> {
624 db: &'a Aurora,
625 collection: String,
626 field: Option<String>,
627 query: Option<String>,
628 fuzzy: bool,
629}
630
631impl<'a> SearchBuilder<'a> {
632 /// Create a new search builder for the specified collection
633 pub fn new(db: &'a Aurora, collection: &str) -> Self {
634 Self {
635 db,
636 collection: collection.to_string(),
637 field: None,
638 query: None,
639 fuzzy: false,
640 }
641 }
642
643 /// Specify the field to search in
644 ///
645 /// # Examples
646 /// ```
647 /// .field("description")
648 /// ```
649 pub fn field(mut self, field: &str) -> Self {
650 self.field = Some(field.to_string());
651 self
652 }
653
654 /// Specify the search query text
655 ///
656 /// # Examples
657 /// ```
658 /// .matching("wireless headphones")
659 /// ```
660 pub fn matching(mut self, query: &str) -> Self {
661 self.query = Some(query.to_string());
662 self
663 }
664
665 /// Enable or disable fuzzy matching (for typo tolerance)
666 ///
667 /// # Examples
668 /// ```
669 /// .fuzzy(true) // Enable fuzzy matching
670 /// ```
671 pub fn fuzzy(mut self, enable: bool) -> Self {
672 self.fuzzy = enable;
673 self
674 }
675
676 /// Execute the search and collect the results
677 ///
678 /// # Returns
679 /// A vector of documents matching the search criteria
680 ///
681 /// # Examples
682 /// ```
683 /// let results = db.search("articles")
684 /// .field("content")
685 /// .matching("quantum computing")
686 /// .collect()
687 /// .await?;
688 /// ```
689 pub async fn collect(self) -> Result<Vec<Document>> {
690 let field = self
691 .field
692 .ok_or_else(|| AuroraError::InvalidOperation("Search field not specified".into()))?;
693 let query = self
694 .query
695 .ok_or_else(|| AuroraError::InvalidOperation("Search query not specified".into()))?;
696
697 self.db.search_text(&self.collection, &field, &query).await
698 }
699}
700
701#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
702pub struct SimpleQueryBuilder {
703 pub collection: String,
704 pub filters: Vec<Filter>,
705}
706
707impl SimpleQueryBuilder {
708 pub fn new(collection: String) -> Self {
709 Self {
710 collection,
711 filters: Vec::new(),
712 }
713 }
714
715 pub fn filter(mut self, filter: Filter) -> Self {
716 self.filters.push(filter);
717 self
718 }
719
720 pub fn eq(self, field: &str, value: Value) -> Self {
721 self.filter(Filter::Eq(field.to_string(), value))
722 }
723
724 pub fn gt(self, field: &str, value: Value) -> Self {
725 self.filter(Filter::Gt(field.to_string(), value))
726 }
727
728 pub fn gte(self, field: &str, value: Value) -> Self {
729 self.filter(Filter::Gte(field.to_string(), value))
730 }
731
732 pub fn lt(self, field: &str, value: Value) -> Self {
733 self.filter(Filter::Lt(field.to_string(), value))
734 }
735
736 pub fn lte(self, field: &str, value: Value) -> Self {
737 self.filter(Filter::Lte(field.to_string(), value))
738 }
739
740 pub fn contains(self, field: &str, value: &str) -> Self {
741 self.filter(Filter::Contains(field.to_string(), value.to_string()))
742 }
743
744 /// Convenience method for range queries
745 pub fn between(self, field: &str, min: Value, max: Value) -> Self {
746 self.filter(Filter::Gte(field.to_string(), min))
747 .filter(Filter::Lte(field.to_string(), max))
748 }
749}
750
/// A single serializable filter condition, as built by [`SimpleQueryBuilder`].
///
/// In every variant the first element is the field name, as filled in by the
/// `SimpleQueryBuilder` helper methods, and the second is the operand.
/// NOTE(review): how these conditions are evaluated is not defined in this
/// file; confirm the exact comparison semantics against the evaluator.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum Filter {
    /// Equality condition: `(field, value)`.
    Eq(String, Value),
    /// Greater-than condition: `(field, value)`.
    Gt(String, Value),
    /// Greater-than-or-equal condition: `(field, value)`.
    Gte(String, Value),
    /// Less-than condition: `(field, value)`.
    Lt(String, Value),
    /// Less-than-or-equal condition: `(field, value)`.
    Lte(String, Value),
    /// Containment condition: `(field, text)`.
    Contains(String, String),
}