Skip to main content

aurora_db/
query.rs

1//! # Aurora Query System
2//!
3//! This module provides a powerful, fluent query interface for filtering, sorting,
4//! and retrieving documents from Aurora collections.
5//!
6//! ## Architectural Note: "The Query as a Stream"
7//!
8//! Unlike traditional databases that parse SQL strings into an execution plan,
9//! Aurora's query engine is built around the **Fluent Builder Pattern** and **Iterators**.
10//!
11//! 1.  **Type-Safe Construction**: Queries are built using Rust method chaining, ensuring
12//!     validity at compile time (or runtime construction without parsing overhead).
13//! 2.  **Zero-Allocation Filtering**: The `Queryable` trait and `DocumentFilter` type allow
14//!     us to stack closures. These filters run directly on the data stream without
15//!     intermediate allocations.
16//! 3.  **Reactive Core**: The `watch()` method transforms a static query into a
17//!     live stream of `QueryUpdate` events. This is the heart of the "Embedded Platform"
18//!     concept—allowing the application to react to data changes instantly.
19//!
20//! ## Examples
21//!
//! ```rust,no_run
23//! // Get all active users over 21
24//! let users = db.query("users")
25//!     .filter(|f| f.eq("status", "active") && f.gt("age", 21))
26//!     .order_by("last_login", false)
27//!     .limit(20)
28//!     .collect()
29//!     .await?;
30//! ```
31
32use crate::Aurora;
33use crate::error::AqlError;
34use crate::error::Result;
35use crate::types::{Document, Value};
36use serde::{Deserialize, Serialize};
37use std::collections::HashMap;
38
/// Trait for objects that can filter documents.
///
/// A blanket implementation covers every closure or function of the form
/// `Fn(&Document) -> bool`, so plain predicates over a document can be used
/// wherever a `Queryable` is expected.
pub trait Queryable {
    /// Returns `true` when `doc` satisfies this filter.
    fn matches(&self, doc: &Document) -> bool;
}
46
47impl<F> Queryable for F
48where
49    F: Fn(&Document) -> bool,
50{
51    fn matches(&self, doc: &Document) -> bool {
52        self(doc)
53    }
54}
55
/// Boxed predicate over a [`Document`].
///
/// The `Send + Sync` bounds let a stored filter be shared across threads;
/// `'a` bounds whatever the closure borrows.
type DocumentFilter<'a> = Box<dyn Fn(&Document) -> bool + Send + Sync + 'a>;
58
/// Builder for creating and executing document queries.
///
/// QueryBuilder uses a fluent interface pattern to construct
/// and execute queries against Aurora collections.
///
/// # Examples
///
/// ```rust,no_run
/// // Query for active premium users
/// let premium_users = db.query("users")
///     .filter(|f| f.eq("status", "active") && f.eq("account_type", "premium"))
///     .order_by("created_at", false)
///     .limit(10)
///     .collect()
///     .await?;
/// ```
pub struct QueryBuilder<'a> {
    /// Database handle the query runs against.
    db: &'a Aurora,
    /// Name of the collection being queried.
    collection: String,
    /// Predicates added via `filter`; a document must satisfy all of them.
    filters: Vec<DocumentFilter<'a>>,
    /// Sort key and direction as `(field, ascending)`.
    order_by: Option<(String, bool)>,
    /// Maximum number of documents to return.
    limit: Option<usize>,
    /// Number of matching documents to skip before returning results.
    offset: Option<usize>,
    /// Field names kept in returned documents, set by `select`.
    fields: Option<Vec<String>>,
    /// Optional debounce duration for reactive queries (watch)
    /// When set, updates are batched and emitted at most once per interval
    debounce_duration: Option<std::time::Duration>,
}
87
/// Builder for constructing document filter expressions.
///
/// This struct provides methods for comparing document fields
/// with values to create filter conditions. Each method returns a plain
/// `bool`, so conditions are combined with `&&`, `||` and `!`.
///
/// # Examples
///
/// ```rust,no_run
/// // Combine multiple filter conditions
/// db.query("products")
///     .filter(|f| {
///         f.gte("price", 10.0) &&
///         f.lte("price", 50.0) &&
///         f.contains("name", "widget")
///     })
///     .collect()
///     .await?;
/// ```
pub struct FilterBuilder<'a, 'b> {
    /// Document whose fields the predicate methods inspect.
    doc: &'b Document,
    /// Zero-sized marker whose only role is to carry the otherwise unused `'a` lifetime.
    _marker: std::marker::PhantomData<&'a ()>,
}
110
111impl<'a, 'b> FilterBuilder<'a, 'b> {
112    /// Create a new filter builder for the given document
113    pub fn new(doc: &'b Document) -> Self {
114        Self {
115            doc,
116            _marker: std::marker::PhantomData,
117        }
118    }
119
120    /// Check if a field equals a value
121    ///
122    /// # Examples
123
124    /// .filter(|f| f.eq("status", "active"))
125    /// ```
126    pub fn eq<T: Into<Value>>(&self, field: &str, value: T) -> bool {
127        let value = value.into();
128        self.get_nested_value(field) == Some(&value)
129    }
130
131    /// Check if a field is greater than a value
132    ///
133    /// # Examples
134
135    /// .filter(|f| f.gt("age", 21))
136    /// ```
137    pub fn gt<T: Into<Value>>(&self, field: &str, value: T) -> bool {
138        let value = value.into();
139        self.get_nested_value(field).is_some_and(|v| v > &value)
140    }
141
142    /// Check if a field is greater than or equal to a value
143    ///
144    /// # Examples
145
146    /// .filter(|f| f.gte("age", 21))
147    /// ```
148    pub fn gte<T: Into<Value>>(&self, field: &str, value: T) -> bool {
149        let value = value.into();
150        self.get_nested_value(field).is_some_and(|v| v >= &value)
151    }
152
153    /// Check if a field is less than a value
154    ///
155    /// # Examples
156
157    /// .filter(|f| f.lt("age", 65))
158    /// ```
159    pub fn lt<T: Into<Value>>(&self, field: &str, value: T) -> bool {
160        let value = value.into();
161        self.get_nested_value(field).is_some_and(|v| v < &value)
162    }
163
164    /// Check if a field is less than or equal to a value
165    ///
166    /// # Examples
167
168    /// .filter(|f| f.lte("age", 65))
169    /// ```
170    pub fn lte<T: Into<Value>>(&self, field: &str, value: T) -> bool {
171        let value = value.into();
172        self.get_nested_value(field).is_some_and(|v| v <= &value)
173    }
174
175    /// Check if a field contains a value
176    ///
177    /// # Examples
178
179    /// .filter(|f| f.contains("name", "widget"))
180    /// ```
181    pub fn contains(&self, field: &str, value: &str) -> bool {
182        self.get_nested_value(field).is_some_and(|v| match v {
183            Value::String(s) => s.contains(value),
184            Value::Array(arr) => arr.contains(&Value::String(value.to_string())),
185            _ => false,
186        })
187    }
188
189    /// Check if a field is in a list of values
190    ///
191    /// # Examples
192
193    /// .filter(|f| f.in_values("status", &["active", "inactive"]))
194    /// ```
195    pub fn in_values<T: Into<Value> + Clone>(&self, field: &str, values: &[T]) -> bool {
196        let values: Vec<Value> = values.iter().map(|v| v.clone().into()).collect();
197        self.get_nested_value(field)
198            .is_some_and(|v| values.contains(v))
199    }
200
201    /// Check if a field is between two values (inclusive)
202    ///
203    /// # Examples
204
205    /// .filter(|f| f.between("age", 18, 65))
206    /// ```
207    pub fn between<T: Into<Value> + Clone>(&self, field: &str, min: T, max: T) -> bool {
208        self.gte(field, min) && self.lte(field, max)
209    }
210
211    /// Check if a field exists and is not null
212    ///
213    /// # Examples
214
215    /// .filter(|f| f.exists("email"))
216    /// ```
217    pub fn exists(&self, field: &str) -> bool {
218        self.get_nested_value(field)
219            .is_some_and(|v| !matches!(v, Value::Null))
220    }
221
222    /// Check if a field doesn't exist or is null
223    ///
224    /// # Examples
225
226    /// .filter(|f| f.is_null("email"))
227    /// ```
228    pub fn is_null(&self, field: &str) -> bool {
229        self.get_nested_value(field)
230            .is_none_or(|v| matches!(v, Value::Null))
231    }
232
233    /// Check if a field starts with a prefix
234    ///
235    /// # Examples
236
237    /// .filter(|f| f.starts_with("name", "John"))
238    /// ```
239    pub fn starts_with(&self, field: &str, prefix: &str) -> bool {
240        self.get_nested_value(field).is_some_and(|v| match v {
241            Value::String(s) => s.starts_with(prefix),
242            _ => false,
243        })
244    }
245
246    /// Check if a field ends with a suffix
247    ///
248    /// # Examples
249
250    /// .filter(|f| f.ends_with("name", "son"))
251    /// ```
252    pub fn ends_with(&self, field: &str, suffix: &str) -> bool {
253        self.get_nested_value(field).is_some_and(|v| match v {
254            Value::String(s) => s.ends_with(suffix),
255            _ => false,
256        })
257    }
258
259    /// Check if a field is in an array
260    ///
261    /// # Examples
262
263    /// .filter(|f| f.array_contains("status", "active"))
264    /// ```
265    pub fn array_contains(&self, field: &str, value: impl Into<Value>) -> bool {
266        let value = value.into();
267        self.get_nested_value(field).is_some_and(|v| match v {
268            Value::Array(arr) => arr.contains(&value),
269            _ => false,
270        })
271    }
272
273    /// Check if an array has a specific length
274    ///
275    /// # Examples
276
277    /// .filter(|f| f.array_len_eq("status", 2))
278    /// ```
279    pub fn array_len_eq(&self, field: &str, len: usize) -> bool {
280        self.get_nested_value(field).is_some_and(|v| match v {
281            Value::Array(arr) => arr.len() == len,
282            _ => false,
283        })
284    }
285
286    /// Access a nested field using dot notation
287    ///
288    /// # Examples
289
290    /// .filter(|f| f.get_nested_value("user.address.city") == Some(&Value::String("New York")))
291    /// ```
292    pub fn get_nested_value(&self, path: &str) -> Option<&Value> {
293        let parts: Vec<&str> = path.split('.').collect();
294        let mut current = self.doc.data.get(parts[0])?;
295
296        for &part in &parts[1..] {
297            if let Value::Object(map) = current {
298                current = map.get(part)?;
299            } else {
300                return None;
301            }
302        }
303
304        Some(current)
305    }
306
307    /// Check if a nested field equals a value
308    ///
309    /// # Examples
310
311    /// .filter(|f| f.nested_eq("user.address.city", "New York"))
312    /// ```
313    pub fn nested_eq<T: Into<Value>>(&self, path: &str, value: T) -> bool {
314        let value = value.into();
315        self.get_nested_value(path) == Some(&value)
316    }
317
318    /// Check if a field matches a regular expression
319    ///
320    /// # Examples
321
322    /// .filter(|f| f.matches_regex("email", r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"))
323    /// ```
324    pub fn matches_regex(&self, field: &str, pattern: &str) -> bool {
325        use regex::Regex;
326
327        // Prevent DoS via overly complex/long regex
328        if pattern.len() > 100 {
329            return false;
330        }
331
332        if let Ok(re) = Regex::new(pattern) {
333            self.get_nested_value(field).is_some_and(|v| match v {
334                Value::String(s) => re.is_match(s),
335                _ => false,
336            })
337        } else {
338            false
339        }
340    }
341}
342
343impl<'a> QueryBuilder<'a> {
344    /// Create a new query builder for the specified collection
345    ///
346    /// # Examples
347
348    /// let query = db.query("users");
349    /// ```
350    pub fn new(db: &'a Aurora, collection: &str) -> Self {
351        Self {
352            db,
353            collection: collection.to_string(),
354            filters: Vec::new(),
355            order_by: None,
356            limit: None,
357            offset: None,
358            fields: None,
359            debounce_duration: None,
360        }
361    }
362
363    /// Add a filter function to the query
364    ///
365    /// # Examples
366
367    /// let active_users = db.query("users")
368    ///     .filter(|f| f.eq("status", "active"))
369    ///     .collect()
370    ///     .await?;
371    /// ```
372    pub fn filter<F>(mut self, filter_fn: F) -> Self
373    where
374        // CHANGE 2: Require the closure `F` to be `Send + Sync`.
375        F: Fn(&FilterBuilder) -> bool + Send + Sync + 'a,
376    {
377        self.filters.push(Box::new(move |doc| {
378            let filter_builder = FilterBuilder::new(doc);
379            filter_fn(&filter_builder)
380        }));
381        self
382    }
383
384    /// Sort results by a field (ascending or descending)
385    ///
386    /// # Parameters
387    /// * `field` - The field to sort by
388    /// * `ascending` - `true` for ascending order, `false` for descending
389    ///
390    /// # Examples
391    /// ```
392    /// // Sort by age ascending
393    /// .order_by("age", true)
394    ///
395    /// // Sort by creation date descending (newest first)
396    /// .order_by("created_at", false)
397    /// ```
398    pub fn order_by(mut self, field: &str, ascending: bool) -> Self {
399        self.order_by = Some((field.to_string(), ascending));
400        self
401    }
402
403    /// Limit the number of results returned
404    ///
405    /// # Examples
406    /// ```
407    /// // Get at most 10 results
408    /// .limit(10)
409    /// ```
410    pub fn limit(mut self, limit: usize) -> Self {
411        self.limit = Some(limit);
412        self
413    }
414
415    /// Skip a number of results (for pagination)
416    ///
417    /// # Examples
418    /// ```
419    /// // For pagination: skip the first 20 results and get the next 10
420    /// .offset(20).limit(10)
421    /// ```
422    pub fn offset(mut self, offset: usize) -> Self {
423        self.offset = Some(offset);
424        self
425    }
426
427    /// Select only specific fields to return
428    ///
429    /// # Examples
430    /// ```
431    /// // Only return name and email fields
432    /// .select(&["name", "email"])
433    /// ```
434    pub fn select(mut self, fields: &[&str]) -> Self {
435        self.fields = Some(fields.iter().map(|s| s.to_string()).collect());
436        self
437    }
438
439    /// Set debounce/throttle interval for reactive queries (watch)
440    ///
441    /// When watching a query, updates will be batched and emitted at most once
442    /// per interval. This prevents overwhelming the UI with high-frequency updates.
443    /// Events are deduplicated by document ID, keeping only the latest state.
444    ///
445    /// # Examples
446    /// ```
447    /// use std::time::Duration;
448    ///
449    /// // Rate-limit to max 10 updates per second
450    /// let mut watcher = db.query("logs")
451    ///     .filter(|f| f.eq("level", "ERROR"))
452    ///     .debounce(Duration::from_millis(100))
453    ///     .watch()
454    ///     .await?;
455    /// ```
456    pub fn debounce(mut self, duration: std::time::Duration) -> Self {
457        self.debounce_duration = Some(duration);
458        self
459    }
460
461    /// Execute the query and collect the results
462    ///
463    /// # Returns
464    /// A vector of documents matching the query criteria
465    ///
466    /// # Examples
467
468    /// let results = db.query("products")
469    ///     .filter(|f| f.lt("price", 100))
470    ///     .collect()
471    ///     .await?;
472    /// ```
473    pub async fn collect(self) -> Result<Vec<Document>> {
474        self.collect_internal().await
475    }
476
477    /// Internal helper to execute query without consuming self
478    async fn collect_internal(&self) -> Result<Vec<Document>> {
479        // Ensure indices are initialized
480        self.db.ensure_indices_initialized().await?;
481
482        // Optimization: Use early termination for queries with LIMIT but no ORDER BY
483        let mut docs = if self.order_by.is_none() && self.limit.is_some() {
484            // Early termination path - scan only until we have enough results
485            let target = self.limit.unwrap() + self.offset.unwrap_or(0);
486            let filter = |doc: &Document| self.filters.iter().all(|f| f(doc));
487            self.db
488                .scan_and_filter(&self.collection, filter, Some(target))?
489        } else {
490            // Standard path - need all matching docs (for sorting or no limit)
491            let mut docs = self.db.get_all_collection(&self.collection).await?;
492            docs.retain(|doc| self.filters.iter().all(|f| f(doc)));
493            docs
494        };
495
496        // Apply ordering
497        if let Some((field, ascending)) = &self.order_by {
498            docs.sort_by(|a, b| match (a.data.get(field), b.data.get(field)) {
499                (Some(v1), Some(v2)) => {
500                    let cmp = v1.cmp(v2);
501                    if *ascending { cmp } else { cmp.reverse() }
502                }
503                (None, Some(_)) => std::cmp::Ordering::Less,
504                (Some(_), None) => std::cmp::Ordering::Greater,
505                (None, None) => std::cmp::Ordering::Equal,
506            });
507        }
508
509        // Apply field selection if specified
510        if let Some(fields) = &self.fields {
511            // Create new documents with only selected fields
512            docs = docs
513                .into_iter()
514                .map(|doc| {
515                    let mut new_data = HashMap::new();
516                    // Always include the ID
517                    for field in fields {
518                        if let Some(value) = doc.data.get(field) {
519                            new_data.insert(field.clone(), value.clone());
520                        }
521                    }
522                    Document {
523                        id: doc.id,
524                        data: new_data,
525                    }
526                })
527                .collect();
528        }
529
530        // Apply offset and limit safely
531        let start = self.offset.unwrap_or(0);
532        let end = self
533            .limit
534            .map(|l| start.saturating_add(l))
535            .unwrap_or(docs.len());
536
537        // Ensure we don't go out of bounds
538        let end = end.min(docs.len());
539        Ok(docs.get(start..end).unwrap_or(&[]).to_vec())
540    }
541
542    /// Watch the query for real-time updates
543    ///
544    /// Returns a QueryWatcher that streams live updates when documents are added,
545    /// removed, or modified in ways that affect the query results. Perfect for
546    /// building reactive UIs, live dashboards, and real-time applications.
547    ///
548    /// # Performance
549    /// - Zero overhead for queries without watchers
550    /// - Updates delivered asynchronously via channels
551    /// - Automatic filtering - only matching changes are emitted
552    /// - Memory efficient - only tracks matching documents
553    ///
554    /// # Requirements
555    /// This method requires the QueryBuilder to have a 'static lifetime,
556    /// which means the database reference must also be 'static (e.g., Arc<Aurora>).
557    ///
558    /// # Examples
559    ///
560    /// ```
561    /// use aurora_db::{Aurora, types::Value};
562    /// use std::sync::Arc;
563    ///
564    /// let db = Arc::new(Aurora::open("mydb.db")?);
565    ///
566    /// // Basic reactive query - watch active users
567    /// let mut watcher = db.query("users")
568    ///     .filter(|f| f.eq("active", Value::Bool(true)))
569    ///     .watch()
570    ///     .await?;
571    ///
572    /// // Receive updates in real-time
573    /// while let Some(update) = watcher.next().await {
574    ///     match update {
575    ///         QueryUpdate::Added(doc) => {
576    ///             println!("New active user: {}", doc.id);
577    ///         },
578    ///         QueryUpdate::Removed(doc) => {
579    ///             println!("User deactivated: {}", doc.id);
580    ///         },
581    ///         QueryUpdate::Modified { old, new } => {
582    ///             println!("User updated: {} -> {}", old.id, new.id);
583    ///         },
584    ///     }
585    /// }
586    /// ```
587    ///
588    /// # Real-World Use Cases
589    ///
590    /// **Live Leaderboard:**
591    /// ```
592    /// // Watch top players by score
593    /// let mut leaderboard = db.query("players")
594    ///     .filter(|f| f.gte("score", Value::Int(1000)))
595    ///     .watch()
596    ///     .await?;
597    ///
598    /// tokio::spawn(async move {
599    ///     while let Some(update) = leaderboard.next().await {
600    ///         // Update UI with new rankings
601    ///         broadcast_to_clients(&update).await;
602    ///     }
603    /// });
604    /// ```
605    ///
606    /// **Activity Feed:**
607    /// ```
608    /// // Watch recent posts for a user's feed
609    /// let mut feed = db.query("posts")
610    ///     .filter(|f| f.eq("author_id", user_id))
611    ///     .watch()
612    ///     .await?;
613    ///
614    /// // Stream updates to WebSocket
615    /// while let Some(update) = feed.next().await {
616    ///     match update {
617    ///         QueryUpdate::Added(post) => {
618    ///             websocket.send(json!({"type": "new_post", "post": post})).await?;
619    ///         },
620    ///         _ => {}
621    ///     }
622    /// }
623    /// ```
624    ///
625    /// **Real-Time Dashboard:**
626    /// ```
627    /// // Watch critical metrics
628    /// let mut alerts = db.query("metrics")
629    ///     .filter(|f| f.gt("cpu_usage", Value::Float(80.0)))
630    ///     .watch()
631    ///     .await?;
632    ///
633    /// tokio::spawn(async move {
634    ///     while let Some(update) = alerts.next().await {
635    ///         if let QueryUpdate::Added(metric) = update {
636    ///             // Alert on high CPU usage
637    ///             send_alert(format!("High CPU: {:?}", metric)).await;
638    ///         }
639    ///     }
640    /// });
641    /// ```
642    ///
643    /// **Collaborative Editing:**
644    /// ```
645    /// // Watch document for changes from other users
646    /// let doc_id = "doc-123";
647    /// let mut changes = db.query("documents")
648    ///     .filter(|f| f.eq("id", doc_id))
649    ///     .watch()
650    ///     .await?;
651    ///
652    /// tokio::spawn(async move {
653    ///     while let Some(update) = changes.next().await {
654    ///         if let QueryUpdate::Modified { old, new } = update {
655    ///             // Merge changes from other editors
656    ///             apply_remote_changes(&old, &new).await;
657    ///         }
658    ///     }
659    /// });
660    /// ```
661    ///
662    /// **Stock Ticker:**
663    /// ```
664    /// // Watch price changes
665    /// let mut price_watcher = db.query("stocks")
666    ///     .filter(|f| f.eq("symbol", "AAPL"))
667    ///     .watch()
668    ///     .await?;
669    ///
670    /// while let Some(update) = price_watcher.next().await {
671    ///     if let QueryUpdate::Modified { old, new } = update {
672    ///         if let (Some(old_price), Some(new_price)) =
673    ///             (old.data.get("price"), new.data.get("price")) {
674    ///             println!("AAPL: {} -> {}", old_price, new_price);
675    ///         }
676    ///     }
677    /// }
678    /// ```
679    ///
680    /// # Multiple Watchers Pattern
681    ///
682    /// ```
683    /// // Watch multiple queries concurrently
684    /// let mut high_priority = db.query("tasks")
685    ///     .filter(|f| f.eq("priority", Value::String("high".into())))
686    ///     .watch()
687    ///     .await?;
688    ///
689    /// let mut urgent = db.query("tasks")
690    ///     .filter(|f| f.eq("status", Value::String("urgent".into())))
691    ///     .watch()
692    ///     .await?;
693    ///
694    /// tokio::spawn(async move {
695    ///     loop {
696    ///         tokio::select! {
697    ///             Some(update) = high_priority.next() => {
698    ///                 println!("High priority: {:?}", update);
699    ///             },
700    ///             Some(update) = urgent.next() => {
701    ///                 println!("Urgent: {:?}", update);
702    ///             },
703    ///         }
704    ///     }
705    /// });
706    /// ```
707    ///
708    /// # Important Notes
709    /// - Requires Arc<Aurora> for 'static lifetime
710    /// - Updates are delivered asynchronously
711    /// - Watcher keeps running until dropped
712    /// - Only matching documents trigger updates
713    /// - Use tokio::spawn to process updates in background
714    ///
715    /// # See Also
716    /// - `Aurora::listen()` for collection-level change notifications
717    /// - `QueryWatcher::next()` to receive the next update
718    /// - `QueryWatcher::try_next()` for non-blocking checks
719    pub async fn watch(mut self) -> Result<crate::reactive::QueryWatcher>
720    where
721        'a: 'static,
722    {
723        use crate::reactive::{QueryWatcher, ReactiveQueryState};
724        use std::sync::Arc;
725
726        // Get initial results BEFORE moving filters
727        let docs = self.collect_internal().await?;
728
729        // Extract context
730        let collection = self.collection.clone();
731        let db = self.db;
732        let debounce = self.debounce_duration;
733        let filters = std::mem::take(&mut self.filters);
734
735        // Create a listener for this collection
736        let listener = db.listen(&collection);
737
738        // Create filter closure that combines all the query filters
739        let filter_fn = move |doc: &Document| -> bool { filters.iter().all(|f| f(doc)) };
740
741        // Create reactive state
742        let state = Arc::new(ReactiveQueryState::new(filter_fn));
743
744        // Create and return watcher
745        Ok(QueryWatcher::new(
746            collection, listener, state, docs, debounce,
747        ))
748    }
749
750    /// Get only the first matching document or None if no matches
751    ///
752    /// # Examples
753
754    /// let user = db.query("users")
755    ///     .filter(|f| f.eq("email", "jane@example.com"))
756    ///     .first_one()
757    ///     .await?;
758    /// ```
759    pub async fn first_one(self) -> Result<Option<Document>> {
760        self.limit(1).collect().await.map(|mut docs| docs.pop())
761    }
762
763    /// Count the number of documents matching the query
764    ///
765    /// # Examples
766
767    /// let active_count = db.query("users")
768    ///     .filter(|f| f.eq("status", "active"))
769    ///     .count()
770    ///     .await?;
771    /// ```
772    pub async fn count(self) -> Result<usize> {
773        self.collect().await.map(|docs| docs.len())
774    }
775
776    /// Update documents matching the query with new field values
777    ///
778    /// # Returns
779    /// The number of documents updated
780    ///
781    /// # Examples
782
783    /// let updated = db.query("products")
784    ///     .filter(|f| f.lt("stock", 5))
785    ///     .update([
786    ///         ("status", Value::String("low_stock".to_string())),
787    ///         ("needs_reorder", Value::Bool(true))
788    ///     ].into_iter().collect())
789    ///     .await?;
790    /// ```
791    pub async fn update(self, updates: HashMap<&str, Value>) -> Result<usize> {
792        // Store a reference to the db and collection before consuming self
793        let db = self.db;
794        let collection = self.collection.clone();
795
796        let docs = self.collect().await?;
797        let mut updated_count = 0;
798
799        for doc in docs {
800            let mut updated_doc = doc.clone();
801            let mut changed = false;
802
803            for (field, value) in &updates {
804                updated_doc.data.insert(field.to_string(), value.clone());
805                changed = true;
806            }
807
808            if changed {
809                // Update document in the database
810                db.put(
811                    format!("{}:{}", collection, updated_doc.id),
812                    serde_json::to_vec(&updated_doc)?,
813                    None,
814                )
815                .await?;
816                updated_count += 1;
817            }
818        }
819
820        Ok(updated_count)
821    }
822
823    /// Delete documents matching the query
824    ///
825    /// # Returns
826    /// The number of documents deleted
827    ///
828    /// # Examples
829    /// ```rust,no_run
830    /// let deleted = db.query("users")
831    ///     .filter(|f| f.eq("active", false))
832    ///     .delete()
833    ///     .await?;
834    /// ```
835    /// Delete documents matching the query
836    ///
837    /// # Returns
838    /// The number of documents deleted
839    ///
840    /// # Note
841    /// Deletions are not atomic. If an error occurs during deletion,
842    /// some documents may have been deleted while others remain.
843    /// For atomic batch operations, consider using transactions.
844    ///
845    /// # Examples
846    /// ```no_run
847    /// # use aurora_db::{Aurora, Result};
848    /// # async fn example(db: &Aurora) -> Result<()> {
849    /// // Delete all inactive users
850    /// let deleted = db.query("users")
851    ///     .filter(|f| f.eq("status", "inactive"))
852    ///     .delete()
853    ///     .await?;
854    /// println!("Deleted {} users", deleted);
855    /// # Ok(())
856    /// # }
857    /// ```
858    pub async fn delete(self) -> Result<usize> {
859        // Store reference to db and collection before consuming self
860        let db = self.db;
861        let collection = self.collection.clone();
862
863        // Use collect() to get matching documents (leverages optimization)
864        let docs = self.collect().await?;
865        let mut deleted_count = 0;
866
867        for doc in docs {
868            let key = format!("{}:{}", collection, doc.id);
869            db.delete(&key).await?;
870            deleted_count += 1;
871        }
872
873        Ok(deleted_count)
874    }
875}
876
/// Builder for performing full-text search operations
///
/// Both `field` and `matching` must be called before `collect`; otherwise
/// `collect` returns an invalid-operation error.
///
/// # Examples
/// ```rust,no_run
/// let results = db.search("products")
///     .field("description")
///     .matching("wireless headphones")
///     .fuzzy(true)
///     .collect()
///     .await?;
/// ```
pub struct SearchBuilder<'a> {
    /// Database handle the search runs against.
    db: &'a Aurora,
    /// Name of the collection being searched.
    collection: String,
    /// Field to search in; required by `collect`.
    field: Option<String>,
    /// Search text; required by `collect`.
    query: Option<String>,
    /// Fuzzy-matching flag set by `fuzzy()`.
    /// NOTE(review): not read by the `collect` implementation visible in this file.
    fuzzy: bool,
}
895
896impl<'a> SearchBuilder<'a> {
897    /// Create a new search builder for the specified collection
898    pub fn new(db: &'a Aurora, collection: &str) -> Self {
899        Self {
900            db,
901            collection: collection.to_string(),
902            field: None,
903            query: None,
904            fuzzy: false,
905        }
906    }
907
908    /// Specify the field to search in
909    ///
910    /// # Examples
911
912    /// .field("description")
913    /// ```
914    pub fn field(mut self, field: &str) -> Self {
915        self.field = Some(field.to_string());
916        self
917    }
918
919    /// Specify the search query text
920    ///
921    /// # Examples
922
923    /// .matching("wireless headphones")
924    /// ```
925    pub fn matching(mut self, query: &str) -> Self {
926        self.query = Some(query.to_string());
927        self
928    }
929
930    /// Enable or disable fuzzy matching (for typo tolerance)
931    ///
932    /// # Examples
933
934    /// .fuzzy(true)  // Enable fuzzy matching
935    /// ```
936    pub fn fuzzy(mut self, enable: bool) -> Self {
937        self.fuzzy = enable;
938        self
939    }
940
941    /// Execute the search and collect the results
942    ///
943    /// # Returns
944    /// A vector of documents matching the search criteria
945    ///
946    /// # Examples
947
948    /// let results = db.search("articles")
949    ///     .field("content")
950    ///     .matching("quantum computing")
951    ///     .collect()
952    ///     .await?;
953    /// ```
954    pub async fn collect(self) -> Result<Vec<Document>> {
955        let field = self
956            .field
957            .ok_or_else(|| AqlError::invalid_operation("Search field not specified"))?;
958        let query = self
959            .query
960            .ok_or_else(|| AqlError::invalid_operation("Search query not specified"))?;
961
962        self.db.search_text(&self.collection, &field, &query).await
963    }
964}
965
966#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
967pub struct SimpleQueryBuilder {
968    pub collection: String,
969    pub filters: Vec<Filter>,
970    pub order_by: Option<(String, bool)>,
971    pub limit: Option<usize>,
972    pub offset: Option<usize>,
973}
974
975impl SimpleQueryBuilder {
976    pub fn new(collection: String) -> Self {
977        Self {
978            collection,
979            filters: Vec::new(),
980            order_by: None,
981            limit: None,
982            offset: None,
983        }
984    }
985
986    pub fn filter(mut self, filter: Filter) -> Self {
987        self.filters.push(filter);
988        self
989    }
990
991    /// Filter for exact equality
992    ///
993    /// Uses secondary index if the field is indexed (O(1) lookup).
994    /// Falls back to full collection scan if not indexed (O(n)).
995    ///
996    /// # Arguments
997    /// * `field` - The field name to filter on
998    /// * `value` - The exact value to match
999    ///
1000    /// # Examples
1001    ///
1002
1003    /// use aurora_db::{Aurora, types::Value};
1004    ///
1005    /// let db = Aurora::open("mydb.db")?;
1006    ///
1007    /// // Find active users
1008    /// let active_users = db.query("users")
1009    ///     .filter(|f| f.eq("status", Value::String("active".into())))
1010    ///     .collect()
1011    ///     .await?;
1012    ///
1013    /// // Multiple equality filters (AND logic)
1014    /// let premium_active = db.query("users")
1015    ///     .filter(|f| f.eq("tier", Value::String("premium".into())))
1016    ///     .filter(|f| f.eq("active", Value::Bool(true)))
1017    ///     .collect()
1018    ///     .await?;
1019    ///
1020    /// // Numeric equality
1021    /// let age_30 = db.query("users")
1022    ///     .filter(|f| f.eq("age", Value::Int(30)))
1023    ///     .collect()
1024    ///     .await?;
1025    /// ```
1026    pub fn eq(self, field: &str, value: Value) -> Self {
1027        self.filter(Filter::Eq(field.to_string(), value))
1028    }
1029
1030    /// Filter for greater than
1031    ///
1032    /// Finds all documents where the field value is strictly greater than
1033    /// the provided value. With LIMIT, uses early termination for performance.
1034    ///
1035    /// # Arguments
1036    /// * `field` - The field name to compare
1037    /// * `value` - The minimum value (exclusive)
1038    ///
1039    /// # Performance
1040    /// - Without LIMIT: O(n) - scans all documents
1041    /// - With LIMIT: O(k) where k = limit + offset (early termination)
1042    /// - No index support yet (planned for future)
1043    ///
1044    /// # Examples
1045    ///
1046
1047    /// use aurora_db::{Aurora, types::Value};
1048    ///
1049    /// let db = Aurora::open("mydb.db")?;
1050    ///
1051    /// // Find high scorers (with early termination)
1052    /// let high_scorers = db.query("users")
1053    ///     .filter(|f| f.gt("score", Value::Int(1000)))
1054    ///     .limit(100)  // Stops after finding 100 matches
1055    ///     .collect()
1056    ///     .await?;
1057    ///
1058    /// // Price range queries
1059    /// let expensive = db.query("products")
1060    ///     .filter(|f| f.gt("price", Value::Float(99.99)))
1061    ///     .order_by("price", false)  // Descending
1062    ///     .collect()
1063    ///     .await?;
1064    ///
1065    /// // Date filtering (timestamps as integers)
1066    /// let recent = db.query("events")
1067    ///     .filter(|f| f.gt("timestamp", Value::Int(1609459200)))  // After Jan 1, 2021
1068    ///     .collect()
1069    ///     .await?;
1070    /// ```
1071    pub fn gt(self, field: &str, value: Value) -> Self {
1072        self.filter(Filter::Gt(field.to_string(), value))
1073    }
1074
1075    /// Filter for greater than or equal to
1076    ///
1077    /// Finds all documents where the field value is greater than or equal to
1078    /// the provided value. Inclusive version of `gt()`.
1079    ///
1080    /// # Arguments
1081    /// * `field` - The field name to compare
1082    /// * `value` - The minimum value (inclusive)
1083    ///
1084    /// # Examples
1085    ///
1086
1087    /// use aurora_db::{Aurora, types::Value};
1088    ///
1089    /// let db = Aurora::open("mydb.db")?;
1090    ///
1091    /// // Minimum age requirement (inclusive)
1092    /// let adults = db.query("users")
1093    ///     .filter(|f| f.gte("age", Value::Int(18)))
1094    ///     .collect()
1095    ///     .await?;
1096    ///
1097    /// // Inventory management
1098    /// let in_stock = db.query("products")
1099    ///     .filter(|f| f.gte("stock", Value::Int(1)))
1100    ///     .collect()
1101    ///     .await?;
1102    /// ```
1103    pub fn gte(self, field: &str, value: Value) -> Self {
1104        self.filter(Filter::Gte(field.to_string(), value))
1105    }
1106
1107    /// Filter for less than
1108    ///
1109    /// Finds all documents where the field value is strictly less than
1110    /// the provided value.
1111    ///
1112    /// # Arguments
1113    /// * `field` - The field name to compare
1114    /// * `value` - The maximum value (exclusive)
1115    ///
1116    /// # Examples
1117    ///
1118
1119    /// use aurora_db::{Aurora, types::Value};
1120    ///
1121    /// let db = Aurora::open("mydb.db")?;
1122    ///
1123    /// // Low balance accounts
1124    /// let low_balance = db.query("accounts")
1125    ///     .filter(|f| f.lt("balance", Value::Float(10.0)))
1126    ///     .collect()
1127    ///     .await?;
1128    ///
1129    /// // Budget products
1130    /// let budget = db.query("products")
1131    ///     .filter(|f| f.lt("price", Value::Float(50.0)))
1132    ///     .order_by("price", true)  // Ascending
1133    ///     .collect()
1134    ///     .await?;
1135    /// ```
1136    pub fn lt(self, field: &str, value: Value) -> Self {
1137        self.filter(Filter::Lt(field.to_string(), value))
1138    }
1139
1140    /// Filter for less than or equal to
1141    ///
1142    /// Finds all documents where the field value is less than or equal to
1143    /// the provided value. Inclusive version of `lt()`.
1144    ///
1145    /// # Arguments
1146    /// * `field` - The field name to compare
1147    /// * `value` - The maximum value (inclusive)
1148    ///
1149    /// # Examples
1150    ///
1151
1152    /// use aurora_db::{Aurora, types::Value};
1153    ///
1154    /// let db = Aurora::open("mydb.db")?;
1155    ///
1156    /// // Senior discount eligibility
1157    /// let seniors = db.query("users")
1158    ///     .filter(|f| f.lte("age", Value::Int(65)))
1159    ///     .collect()
1160    ///     .await?;
1161    ///
1162    /// // Clearance items
1163    /// let clearance = db.query("products")
1164    ///     .filter(|f| f.lte("price", Value::Float(20.0)))
1165    ///     .collect()
1166    ///     .await?;
1167    /// ```
1168    pub fn lte(self, field: &str, value: Value) -> Self {
1169        self.filter(Filter::Lte(field.to_string(), value))
1170    }
1171
1172    /// Filter for substring containment
1173    ///
1174    /// Finds all documents where the field value contains the specified substring.
1175    /// Case-sensitive matching. For text search, consider using the `search()` API instead.
1176    ///
1177    /// # Arguments
1178    /// * `field` - The field name to search in (must be a string field)
1179    /// * `value` - The substring to search for
1180    ///
1181    /// # Performance
1182    /// - Always O(n) - scans all documents
1183    /// - Case-sensitive string matching
1184    /// - For full-text search, use `db.search()` instead
1185    ///
1186    /// # Examples
1187    ///
1188    /// ```
1189    /// use aurora_db::Aurora;
1190    ///
1191    /// let db = Aurora::open("mydb.db")?;
1192    ///
1193    /// // Find articles about Rust
1194    /// let rust_articles = db.query("articles")
1195    ///     .filter(|f| f.contains("title", "Rust"))
1196    ///     .collect()
1197    ///     .await?;
1198    ///
1199    /// // Email domain filtering
1200    /// let gmail_users = db.query("users")
1201    ///     .filter(|f| f.contains("email", "@gmail.com"))
1202    ///     .collect()
1203    ///     .await?;
1204    ///
1205    /// // Tag searching
1206    /// let rust_posts = db.query("posts")
1207    ///     .filter(|f| f.contains("tags", "rust"))
1208    ///     .collect()
1209    ///     .await?;
1210    /// ```
1211    ///
1212    /// # Note
1213    /// For case-insensitive search or more advanced text matching,
1214    /// use the full-text search API: `db.search(collection).query(text)`
1215    pub fn contains(self, field: &str, value: &str) -> Self {
1216        self.filter(Filter::Contains(field.to_string(), value.to_string()))
1217    }
1218
1219    /// Convenience method for range queries
1220    pub fn between(self, field: &str, min: Value, max: Value) -> Self {
1221        self.filter(Filter::Gte(field.to_string(), min))
1222            .filter(Filter::Lte(field.to_string(), max))
1223    }
1224
1225    /// Sort results by a field (ascending or descending)
1226    pub fn order_by(mut self, field: &str, ascending: bool) -> Self {
1227        self.order_by = Some((field.to_string(), ascending));
1228        self
1229    }
1230
1231    /// Limit the number of results returned
1232    pub fn limit(mut self, limit: usize) -> Self {
1233        self.limit = Some(limit);
1234        self
1235    }
1236
1237    /// Skip a number of results (for pagination)
1238    pub fn offset(mut self, offset: usize) -> Self {
1239        self.offset = Some(offset);
1240        self
1241    }
1242}
1243
1244#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1245pub enum Filter {
1246    Eq(String, Value),
1247    Gt(String, Value),
1248    Gte(String, Value),
1249    Lt(String, Value),
1250    Lte(String, Value),
1251    Contains(String, String),
1252    And(Vec<Filter>),
1253    Or(Vec<Filter>),
1254}
1255
1256fn get_field_value<'a>(doc: &'a Document, path: &str) -> Option<&'a Value> {
1257    // First, try a direct lookup for field names that might contain dots.
1258    if let Some(value) = doc.data.get(path) {
1259        return Some(value);
1260    }
1261
1262    if !path.contains('.') {
1263        return None;
1264    }
1265
1266    let parts: Vec<&str> = path.split('.').collect();
1267    let mut current = doc.data.get(parts[0])?;
1268
1269    for &part in &parts[1..] {
1270        if let Value::Object(map) = current {
1271            current = map.get(part)?;
1272        } else {
1273            return None;
1274        }
1275    }
1276
1277    Some(current)
1278}
1279
1280impl Filter {
1281    /// Check if a document matches this filter
1282    pub fn matches(&self, doc: &Document) -> bool {
1283        match self {
1284            Filter::Eq(field, value) => get_field_value(doc, field) == Some(value),
1285            Filter::Gt(field, value) => get_field_value(doc, field).is_some_and(|v| v > value),
1286            Filter::Gte(field, value) => get_field_value(doc, field).is_some_and(|v| v >= value),
1287            Filter::Lt(field, value) => get_field_value(doc, field).is_some_and(|v| v < value),
1288            Filter::Lte(field, value) => get_field_value(doc, field).is_some_and(|v| v <= value),
1289            Filter::Contains(field, substr) => get_field_value(doc, field).is_some_and(|v| {
1290                if let Value::String(s) = v {
1291                    s.contains(substr)
1292                } else if let Value::Array(arr) = v {
1293                    arr.contains(&Value::String(substr.clone()))
1294                } else {
1295                    false
1296                }
1297            }),
1298            Filter::And(filters) => filters.iter().all(|f| f.matches(doc)),
1299            Filter::Or(filters) => filters.iter().any(|f| f.matches(doc)),
1300        }
1301    }
1302}