Skip to main content

aurora_db/
query.rs

1//! # Aurora Query System
2//!
3//! This module provides a powerful, fluent query interface for filtering, sorting,
4//! and retrieving documents from Aurora collections.
5//!
6//! ## Architectural Note: "The Query as a Stream"
7//!
8//! Unlike traditional databases that parse SQL strings into an execution plan,
9//! Aurora's query engine is built around the **Fluent Builder Pattern** and **Iterators**.
10//!
11//! 1.  **Type-Safe Construction**: Queries are built using Rust method chaining, ensuring
12//!     validity at compile time (or runtime construction without parsing overhead).
13//! 2.  **Zero-Allocation Filtering**: The `Queryable` trait and `DocumentFilter` type allow
14//!     us to stack closures. These filters run directly on the data stream without
15//!     intermediate allocations.
16//! 3.  **Reactive Core**: The `watch()` method transforms a static query into a
17//!     live stream of `QueryUpdate` events. This is the heart of the "Embedded Platform"
18//!     concept—allowing the application to react to data changes instantly.
19//!
20//! ## Examples
21//!
22//! ```rust,ignore
23//! // Get all active users over 21
24//! let users = db.query("users")
25//!     .filter(|f| f.eq("status", "active") && f.gt("age", 21))
26//!     .order_by("last_login", false)
27//!     .limit(20)
28//!     .collect()
29//!     .await?;
30//! ```
31
32use crate::Aurora;
33use crate::error::AqlError;
34use crate::error::Result;
35use crate::types::{Document, Value};
36use serde::{Deserialize, Serialize};
37use std::collections::HashMap;
38
39/// Trait for objects that can filter documents.
40///
41/// This trait is implemented for closures that take a reference to a
42/// `FilterBuilder` and return a boolean, allowing for a natural filter syntax.
43pub trait Queryable {
44    fn matches(&self, doc: &Document) -> bool;
45}
46
47impl<F> Queryable for F
48where
49    F: Fn(&Document) -> bool,
50{
51    fn matches(&self, doc: &Document) -> bool {
52        self(doc)
53    }
54}
55
56/// Type alias for document filter functions
57type DocumentFilter<'a> = Box<dyn Fn(&Document) -> bool + Send + Sync + 'a>;
58
59/// Builder for creating and executing document queries.
60///
61/// QueryBuilder uses a fluent interface pattern to construct
62/// and execute queries against Aurora collections.
63///
64/// # Examples
65///
66/// ```rust,no_run
67/// // Query for active premium users
68/// let premium_users = db.query("users")
69///     .filter(|f| f.eq("status", "active") && f.eq("account_type", "premium"))
70///     .order_by("created_at", false)
71///     .limit(10)
72///     .collect()
73///     .await?;
74/// ```
75pub struct QueryBuilder<'a> {
76    db: &'a Aurora,
77    collection: String,
78    filters: Vec<DocumentFilter<'a>>,
79    order_by: Option<(String, bool)>,
80    limit: Option<usize>,
81    offset: Option<usize>,
82    fields: Option<Vec<String>>,
83    /// Optional debounce duration for reactive queries (watch)
84    /// When set, updates are batched and emitted at most once per interval
85    debounce_duration: Option<std::time::Duration>,
86}
87
88/// Builder for constructing document filter expressions.
89///
90/// This struct provides methods for comparing document fields
91/// with values to create filter conditions.
92///
93/// # Examples
94///
95/// ```rust,no_run
96/// // Combine multiple filter conditions
97/// db.query("products")
98///     .filter(|f| {
99///         f.gte("price", 10.0) &&
100///         f.lte("price", 50.0) &&
101///         f.contains("name", "widget")
102///     })
103///     .collect()
104///     .await?;
105/// ```
106pub struct FilterBuilder<'a, 'b> {
107    doc: &'b Document,
108    _marker: std::marker::PhantomData<&'a ()>,
109}
110
111impl<'a, 'b> FilterBuilder<'a, 'b> {
112    /// Create a new filter builder for the given document
113    pub fn new(doc: &'b Document) -> Self {
114        Self {
115            doc,
116            _marker: std::marker::PhantomData,
117        }
118    }
119
120    /// Check if a field equals a value
121    ///
122    /// # Examples
123   
124    /// .filter(|f| f.eq("status", "active"))
125    /// ```
126    pub fn eq<T: Into<Value>>(&self, field: &str, value: T) -> bool {
127        let value = value.into();
128        self.get_nested_value(field) == Some(&value)
129    }
130
131    /// Check if a field is greater than a value
132    ///
133    /// # Examples
134   
135    /// .filter(|f| f.gt("age", 21))
136    /// ```
137    pub fn gt<T: Into<Value>>(&self, field: &str, value: T) -> bool {
138        let value = value.into();
139        self.get_nested_value(field).is_some_and(|v| v > &value)
140    }
141
142    /// Check if a field is greater than or equal to a value
143    ///
144    /// # Examples
145   
146    /// .filter(|f| f.gte("age", 21))
147    /// ```
148    pub fn gte<T: Into<Value>>(&self, field: &str, value: T) -> bool {
149        let value = value.into();
150        self.get_nested_value(field).is_some_and(|v| v >= &value)
151    }
152
153    /// Check if a field is less than a value
154    ///
155    /// # Examples
156   
157    /// .filter(|f| f.lt("age", 65))
158    /// ```
159    pub fn lt<T: Into<Value>>(&self, field: &str, value: T) -> bool {
160        let value = value.into();
161        self.get_nested_value(field).is_some_and(|v| v < &value)
162    }
163
164    /// Check if a field is less than or equal to a value
165    ///
166    /// # Examples
167   
168    /// .filter(|f| f.lte("age", 65))
169    /// ```
170    pub fn lte<T: Into<Value>>(&self, field: &str, value: T) -> bool {
171        let value = value.into();
172        self.get_nested_value(field).is_some_and(|v| v <= &value)
173    }
174
175    /// Check if a field contains a value
176    ///
177    /// # Examples
178   
179    /// .filter(|f| f.contains("name", "widget"))
180    /// ```
181    pub fn contains(&self, field: &str, value: &str) -> bool {
182        self.get_nested_value(field).is_some_and(|v| match v {
183            Value::String(s) => s.contains(value),
184            Value::Array(arr) => arr.contains(&Value::String(value.to_string())),
185            _ => false,
186        })
187    }
188
189    /// Check if a field is in a list of values
190    ///
191    /// # Examples
192   
193    /// .filter(|f| f.in_values("status", &["active", "inactive"]))
194    /// ```
195    pub fn in_values<T: Into<Value> + Clone>(&self, field: &str, values: &[T]) -> bool {
196        let values: Vec<Value> = values.iter().map(|v| v.clone().into()).collect();
197        self.get_nested_value(field).is_some_and(|v| values.contains(v))
198    }
199
200    /// Check if a field is between two values (inclusive)
201    ///
202    /// # Examples
203   
204    /// .filter(|f| f.between("age", 18, 65))
205    /// ```
206    pub fn between<T: Into<Value> + Clone>(&self, field: &str, min: T, max: T) -> bool {
207        self.gte(field, min) && self.lte(field, max)
208    }
209
210    /// Check if a field exists and is not null
211    ///
212    /// # Examples
213   
214    /// .filter(|f| f.exists("email"))
215    /// ```
216    pub fn exists(&self, field: &str) -> bool {
217        self.get_nested_value(field)
218            .is_some_and(|v| !matches!(v, Value::Null))
219    }
220
221    /// Check if a field doesn't exist or is null
222    ///
223    /// # Examples
224   
225    /// .filter(|f| f.is_null("email"))
226    /// ```
227    pub fn is_null(&self, field: &str) -> bool {
228        self.get_nested_value(field)
229            .is_none_or(|v| matches!(v, Value::Null))
230    }
231
232    /// Check if a field starts with a prefix
233    ///
234    /// # Examples
235   
236    /// .filter(|f| f.starts_with("name", "John"))
237    /// ```
238    pub fn starts_with(&self, field: &str, prefix: &str) -> bool {
239        self.get_nested_value(field).is_some_and(|v| match v {
240            Value::String(s) => s.starts_with(prefix),
241            _ => false,
242        })
243    }
244
245    /// Check if a field ends with a suffix
246    ///
247    /// # Examples
248   
249    /// .filter(|f| f.ends_with("name", "son"))
250    /// ```
251    pub fn ends_with(&self, field: &str, suffix: &str) -> bool {
252        self.get_nested_value(field).is_some_and(|v| match v {
253            Value::String(s) => s.ends_with(suffix),
254            _ => false,
255        })
256    }
257
258    /// Check if a field is in an array
259    ///
260    /// # Examples
261   
262    /// .filter(|f| f.array_contains("status", "active"))
263    /// ```
264    pub fn array_contains(&self, field: &str, value: impl Into<Value>) -> bool {
265        let value = value.into();
266        self.get_nested_value(field).is_some_and(|v| match v {
267            Value::Array(arr) => arr.contains(&value),
268            _ => false,
269        })
270    }
271
272    /// Check if an array has a specific length
273    ///
274    /// # Examples
275   
276    /// .filter(|f| f.array_len_eq("status", 2))
277    /// ```
278    pub fn array_len_eq(&self, field: &str, len: usize) -> bool {
279        self.get_nested_value(field).is_some_and(|v| match v {
280            Value::Array(arr) => arr.len() == len,
281            _ => false,
282        })
283    }
284
285    /// Access a nested field using dot notation
286    ///
287    /// # Examples
288   
289    /// .filter(|f| f.get_nested_value("user.address.city") == Some(&Value::String("New York")))
290    /// ```
291    pub fn get_nested_value(&self, path: &str) -> Option<&Value> {
292        let parts: Vec<&str> = path.split('.').collect();
293        let mut current = self.doc.data.get(parts[0])?;
294
295        for &part in &parts[1..] {
296            if let Value::Object(map) = current {
297                current = map.get(part)?;
298            } else {
299                return None;
300            }
301        }
302
303        Some(current)
304    }
305
306    /// Check if a nested field equals a value
307    ///
308    /// # Examples
309   
310    /// .filter(|f| f.nested_eq("user.address.city", "New York"))
311    /// ```
312    pub fn nested_eq<T: Into<Value>>(&self, path: &str, value: T) -> bool {
313        let value = value.into();
314        self.get_nested_value(path) == Some(&value)
315    }
316
317    /// Check if a field matches a regular expression
318    ///
319    /// # Examples
320   
321    /// .filter(|f| f.matches_regex("email", r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"))
322    /// ```
323    pub fn matches_regex(&self, field: &str, pattern: &str) -> bool {
324        use regex::Regex;
325
326        // Prevent DoS via overly complex/long regex
327        if pattern.len() > 100 {
328            return false;
329        }
330
331        if let Ok(re) = Regex::new(pattern) {
332            self.get_nested_value(field).is_some_and(|v| match v {
333                Value::String(s) => re.is_match(s),
334                _ => false,
335            })
336        } else {
337            false
338        }
339    }
340}
341
342impl<'a> QueryBuilder<'a> {
343    /// Create a new query builder for the specified collection
344    ///
345    /// # Examples
346   
347    /// let query = db.query("users");
348    /// ```
349    pub fn new(db: &'a Aurora, collection: &str) -> Self {
350        Self {
351            db,
352            collection: collection.to_string(),
353            filters: Vec::new(),
354            order_by: None,
355            limit: None,
356            offset: None,
357            fields: None,
358            debounce_duration: None,
359        }
360    }
361
362    /// Add a filter function to the query
363    ///
364    /// # Examples
365   
366    /// let active_users = db.query("users")
367    ///     .filter(|f| f.eq("status", "active"))
368    ///     .collect()
369    ///     .await?;
370    /// ```
371    pub fn filter<F>(mut self, filter_fn: F) -> Self
372    where
373        // CHANGE 2: Require the closure `F` to be `Send + Sync`.
374        F: Fn(&FilterBuilder) -> bool + Send + Sync + 'a,
375    {
376        self.filters.push(Box::new(move |doc| {
377            let filter_builder = FilterBuilder::new(doc);
378            filter_fn(&filter_builder)
379        }));
380        self
381    }
382
383    /// Sort results by a field (ascending or descending)
384    ///
385    /// # Parameters
386    /// * `field` - The field to sort by
387    /// * `ascending` - `true` for ascending order, `false` for descending
388    ///
389    /// # Examples
390    /// ```
391    /// // Sort by age ascending
392    /// .order_by("age", true)
393    ///
394    /// // Sort by creation date descending (newest first)
395    /// .order_by("created_at", false)
396    /// ```
397    pub fn order_by(mut self, field: &str, ascending: bool) -> Self {
398        self.order_by = Some((field.to_string(), ascending));
399        self
400    }
401
402    /// Limit the number of results returned
403    ///
404    /// # Examples
405    /// ```
406    /// // Get at most 10 results
407    /// .limit(10)
408    /// ```
409    pub fn limit(mut self, limit: usize) -> Self {
410        self.limit = Some(limit);
411        self
412    }
413
414    /// Skip a number of results (for pagination)
415    ///
416    /// # Examples
417    /// ```
418    /// // For pagination: skip the first 20 results and get the next 10
419    /// .offset(20).limit(10)
420    /// ```
421    pub fn offset(mut self, offset: usize) -> Self {
422        self.offset = Some(offset);
423        self
424    }
425
426    /// Select only specific fields to return
427    ///
428    /// # Examples
429    /// ```
430    /// // Only return name and email fields
431    /// .select(&["name", "email"])
432    /// ```
433    pub fn select(mut self, fields: &[&str]) -> Self {
434        self.fields = Some(fields.iter().map(|s| s.to_string()).collect());
435        self
436    }
437
438    /// Set debounce/throttle interval for reactive queries (watch)
439    ///
440    /// When watching a query, updates will be batched and emitted at most once
441    /// per interval. This prevents overwhelming the UI with high-frequency updates.
442    /// Events are deduplicated by document ID, keeping only the latest state.
443    ///
444    /// # Examples
445    /// ```
446    /// use std::time::Duration;
447    ///
448    /// // Rate-limit to max 10 updates per second
449    /// let mut watcher = db.query("logs")
450    ///     .filter(|f| f.eq("level", "ERROR"))
451    ///     .debounce(Duration::from_millis(100))
452    ///     .watch()
453    ///     .await?;
454    /// ```
455    pub fn debounce(mut self, duration: std::time::Duration) -> Self {
456        self.debounce_duration = Some(duration);
457        self
458    }
459
460    /// Execute the query and collect the results
461    ///
462    /// # Returns
463    /// A vector of documents matching the query criteria
464    ///
465    /// # Examples
466   
467    /// let results = db.query("products")
468    ///     .filter(|f| f.lt("price", 100))
469    ///     .collect()
470    ///     .await?;
471    /// ```
472    pub async fn collect(self) -> Result<Vec<Document>> {
473        // Ensure indices are initialized
474        self.db.ensure_indices_initialized().await?;
475
476        // Optimization: Use early termination for queries with LIMIT but no ORDER BY
477        let mut docs = if self.order_by.is_none() && self.limit.is_some() {
478            // Early termination path - scan only until we have enough results
479            let target = self.limit.unwrap() + self.offset.unwrap_or(0);
480            let filter = |doc: &Document| self.filters.iter().all(|f| f(doc));
481            self.db
482                .scan_and_filter(&self.collection, filter, Some(target))?
483        } else {
484            // Standard path - need all matching docs (for sorting or no limit)
485            let mut docs = self.db.get_all_collection(&self.collection).await?;
486            docs.retain(|doc| self.filters.iter().all(|f| f(doc)));
487            docs
488        };
489
490        // Apply ordering
491        if let Some((field, ascending)) = self.order_by {
492            docs.sort_by(|a, b| match (a.data.get(&field), b.data.get(&field)) {
493                (Some(v1), Some(v2)) => {
494                    let cmp = v1.cmp(v2);
495                    if ascending { cmp } else { cmp.reverse() }
496                }
497                (None, Some(_)) => std::cmp::Ordering::Less,
498                (Some(_), None) => std::cmp::Ordering::Greater,
499                (None, None) => std::cmp::Ordering::Equal,
500            });
501        }
502
503        // Apply field selection if specified
504        if let Some(fields) = self.fields {
505            // Create new documents with only selected fields
506            docs = docs
507                .into_iter()
508                .map(|doc| {
509                    let mut new_data = HashMap::new();
510                    // Always include the ID
511                    for field in &fields {
512                        if let Some(value) = doc.data.get(field) {
513                            new_data.insert(field.clone(), value.clone());
514                        }
515                    }
516                    Document {
517                        id: doc.id,
518                        data: new_data,
519                    }
520                })
521                .collect();
522        }
523
524        // Apply offset and limit safely
525        let start = self.offset.unwrap_or(0);
526        let end = self
527            .limit
528            .map(|l| start.saturating_add(l))
529            .unwrap_or(docs.len());
530
531        // Ensure we don't go out of bounds
532        let end = end.min(docs.len());
533        Ok(docs.get(start..end).unwrap_or(&[]).to_vec())
534    }
535
536    /// Watch the query for real-time updates
537    ///
538    /// Returns a QueryWatcher that streams live updates when documents are added,
539    /// removed, or modified in ways that affect the query results. Perfect for
540    /// building reactive UIs, live dashboards, and real-time applications.
541    ///
542    /// # Performance
543    /// - Zero overhead for queries without watchers
544    /// - Updates delivered asynchronously via channels
545    /// - Automatic filtering - only matching changes are emitted
546    /// - Memory efficient - only tracks matching documents
547    ///
548    /// # Requirements
549    /// This method requires the QueryBuilder to have a 'static lifetime,
550    /// which means the database reference must also be 'static (e.g., Arc<Aurora>).
551    ///
552    /// # Examples
553    ///
554    /// ```
555    /// use aurora_db::{Aurora, types::Value};
556    /// use std::sync::Arc;
557    ///
558    /// let db = Arc::new(Aurora::open("mydb.db")?);
559    ///
560    /// // Basic reactive query - watch active users
561    /// let mut watcher = db.query("users")
562    ///     .filter(|f| f.eq("active", Value::Bool(true)))
563    ///     .watch()
564    ///     .await?;
565    ///
566    /// // Receive updates in real-time
567    /// while let Some(update) = watcher.next().await {
568    ///     match update {
569    ///         QueryUpdate::Added(doc) => {
570    ///             println!("New active user: {}", doc.id);
571    ///         },
572    ///         QueryUpdate::Removed(doc) => {
573    ///             println!("User deactivated: {}", doc.id);
574    ///         },
575    ///         QueryUpdate::Modified { old, new } => {
576    ///             println!("User updated: {} -> {}", old.id, new.id);
577    ///         },
578    ///     }
579    /// }
580    /// ```
581    ///
582    /// # Real-World Use Cases
583    ///
584    /// **Live Leaderboard:**
585    /// ```
586    /// // Watch top players by score
587    /// let mut leaderboard = db.query("players")
588    ///     .filter(|f| f.gte("score", Value::Int(1000)))
589    ///     .watch()
590    ///     .await?;
591    ///
592    /// tokio::spawn(async move {
593    ///     while let Some(update) = leaderboard.next().await {
594    ///         // Update UI with new rankings
595    ///         broadcast_to_clients(&update).await;
596    ///     }
597    /// });
598    /// ```
599    ///
600    /// **Activity Feed:**
601    /// ```
602    /// // Watch recent posts for a user's feed
603    /// let mut feed = db.query("posts")
604    ///     .filter(|f| f.eq("author_id", user_id))
605    ///     .watch()
606    ///     .await?;
607    ///
608    /// // Stream updates to WebSocket
609    /// while let Some(update) = feed.next().await {
610    ///     match update {
611    ///         QueryUpdate::Added(post) => {
612    ///             websocket.send(json!({"type": "new_post", "post": post})).await?;
613    ///         },
614    ///         _ => {}
615    ///     }
616    /// }
617    /// ```
618    ///
619    /// **Real-Time Dashboard:**
620    /// ```
621    /// // Watch critical metrics
622    /// let mut alerts = db.query("metrics")
623    ///     .filter(|f| f.gt("cpu_usage", Value::Float(80.0)))
624    ///     .watch()
625    ///     .await?;
626    ///
627    /// tokio::spawn(async move {
628    ///     while let Some(update) = alerts.next().await {
629    ///         if let QueryUpdate::Added(metric) = update {
630    ///             // Alert on high CPU usage
631    ///             send_alert(format!("High CPU: {:?}", metric)).await;
632    ///         }
633    ///     }
634    /// });
635    /// ```
636    ///
637    /// **Collaborative Editing:**
638    /// ```
639    /// // Watch document for changes from other users
640    /// let doc_id = "doc-123";
641    /// let mut changes = db.query("documents")
642    ///     .filter(|f| f.eq("id", doc_id))
643    ///     .watch()
644    ///     .await?;
645    ///
646    /// tokio::spawn(async move {
647    ///     while let Some(update) = changes.next().await {
648    ///         if let QueryUpdate::Modified { old, new } = update {
649    ///             // Merge changes from other editors
650    ///             apply_remote_changes(&old, &new).await;
651    ///         }
652    ///     }
653    /// });
654    /// ```
655    ///
656    /// **Stock Ticker:**
657    /// ```
658    /// // Watch price changes
659    /// let mut price_watcher = db.query("stocks")
660    ///     .filter(|f| f.eq("symbol", "AAPL"))
661    ///     .watch()
662    ///     .await?;
663    ///
664    /// while let Some(update) = price_watcher.next().await {
665    ///     if let QueryUpdate::Modified { old, new } = update {
666    ///         if let (Some(old_price), Some(new_price)) =
667    ///             (old.data.get("price"), new.data.get("price")) {
668    ///             println!("AAPL: {} -> {}", old_price, new_price);
669    ///         }
670    ///     }
671    /// }
672    /// ```
673    ///
674    /// # Multiple Watchers Pattern
675    ///
676    /// ```
677    /// // Watch multiple queries concurrently
678    /// let mut high_priority = db.query("tasks")
679    ///     .filter(|f| f.eq("priority", Value::String("high".into())))
680    ///     .watch()
681    ///     .await?;
682    ///
683    /// let mut urgent = db.query("tasks")
684    ///     .filter(|f| f.eq("status", Value::String("urgent".into())))
685    ///     .watch()
686    ///     .await?;
687    ///
688    /// tokio::spawn(async move {
689    ///     loop {
690    ///         tokio::select! {
691    ///             Some(update) = high_priority.next() => {
692    ///                 println!("High priority: {:?}", update);
693    ///             },
694    ///             Some(update) = urgent.next() => {
695    ///                 println!("Urgent: {:?}", update);
696    ///             },
697    ///         }
698    ///     }
699    /// });
700    /// ```
701    ///
702    /// # Important Notes
703    /// - Requires Arc<Aurora> for 'static lifetime
704    /// - Updates are delivered asynchronously
705    /// - Watcher keeps running until dropped
706    /// - Only matching documents trigger updates
707    /// - Use tokio::spawn to process updates in background
708    ///
709    /// # See Also
710    /// - `Aurora::listen()` for collection-level change notifications
711    /// - `QueryWatcher::next()` to receive the next update
712    /// - `QueryWatcher::try_next()` for non-blocking checks
713    pub async fn watch(mut self) -> Result<crate::reactive::QueryWatcher>
714    where
715        'a: 'static,
716    {
717        use crate::reactive::{QueryWatcher, ReactiveQueryState};
718        use std::sync::Arc;
719
720        // Extract the filters before consuming self
721        let collection = self.collection.clone();
722        let db = self.db;
723        let filters = std::mem::take(&mut self.filters);
724
725        // Get initial results
726        let docs = self.collect().await?;
727
728        // Create a listener for this collection
729        let listener = db.listen(&collection);
730
731        // Create filter closure that combines all the query filters
732        let filter_fn = move |doc: &Document| -> bool { filters.iter().all(|f| f(doc)) };
733
734        // Create reactive state
735        let state = Arc::new(ReactiveQueryState::new(filter_fn));
736
737        // Create and return watcher
738        Ok(QueryWatcher::new(collection, listener, state, docs))
739    }
740
741    /// Get only the first matching document or None if no matches
742    ///
743    /// # Examples
744   
745    /// let user = db.query("users")
746    ///     .filter(|f| f.eq("email", "jane@example.com"))
747    ///     .first_one()
748    ///     .await?;
749    /// ```
750    pub async fn first_one(self) -> Result<Option<Document>> {
751        self.limit(1).collect().await.map(|mut docs| docs.pop())
752    }
753
754    /// Count the number of documents matching the query
755    ///
756    /// # Examples
757   
758    /// let active_count = db.query("users")
759    ///     .filter(|f| f.eq("status", "active"))
760    ///     .count()
761    ///     .await?;
762    /// ```
763    pub async fn count(self) -> Result<usize> {
764        self.collect().await.map(|docs| docs.len())
765    }
766
767    /// Update documents matching the query with new field values
768    ///
769    /// # Returns
770    /// The number of documents updated
771    ///
772    /// # Examples
773   
774    /// let updated = db.query("products")
775    ///     .filter(|f| f.lt("stock", 5))
776    ///     .update([
777    ///         ("status", Value::String("low_stock".to_string())),
778    ///         ("needs_reorder", Value::Bool(true))
779    ///     ].into_iter().collect())
780    ///     .await?;
781    /// ```
782    pub async fn update(self, updates: HashMap<&str, Value>) -> Result<usize> {
783        // Store a reference to the db and collection before consuming self
784        let db = self.db;
785        let collection = self.collection.clone();
786
787        let docs = self.collect().await?;
788        let mut updated_count = 0;
789
790        for doc in docs {
791            let mut updated_doc = doc.clone();
792            let mut changed = false;
793
794            for (field, value) in &updates {
795                updated_doc.data.insert(field.to_string(), value.clone());
796                changed = true;
797            }
798
799            if changed {
800                // Update document in the database
801                db.put(
802                    format!("{}:{}", collection, updated_doc.id),
803                    serde_json::to_vec(&updated_doc)?,
804                    None,
805                )
806                .await?;
807                updated_count += 1;
808            }
809        }
810
811        Ok(updated_count)
812    }
813}
814
815/// Builder for performing full-text search operations
816///
817/// # Examples
818/// ```rust,no_run
819/// let results = db.search("products")
820///     .field("description")
821///     .matching("wireless headphones")
822///     .fuzzy(true)
823///     .collect()
824///     .await?;
825/// ```
826pub struct SearchBuilder<'a> {
827    db: &'a Aurora,
828    collection: String,
829    field: Option<String>,
830    query: Option<String>,
831    fuzzy: bool,
832}
833
834impl<'a> SearchBuilder<'a> {
835    /// Create a new search builder for the specified collection
836    pub fn new(db: &'a Aurora, collection: &str) -> Self {
837        Self {
838            db,
839            collection: collection.to_string(),
840            field: None,
841            query: None,
842            fuzzy: false,
843        }
844    }
845
846    /// Specify the field to search in
847    ///
848    /// # Examples
849   
850    /// .field("description")
851    /// ```
852    pub fn field(mut self, field: &str) -> Self {
853        self.field = Some(field.to_string());
854        self
855    }
856
857    /// Specify the search query text
858    ///
859    /// # Examples
860   
861    /// .matching("wireless headphones")
862    /// ```
863    pub fn matching(mut self, query: &str) -> Self {
864        self.query = Some(query.to_string());
865        self
866    }
867
868    /// Enable or disable fuzzy matching (for typo tolerance)
869    ///
870    /// # Examples
871   
872    /// .fuzzy(true)  // Enable fuzzy matching
873    /// ```
874    pub fn fuzzy(mut self, enable: bool) -> Self {
875        self.fuzzy = enable;
876        self
877    }
878
879    /// Execute the search and collect the results
880    ///
881    /// # Returns
882    /// A vector of documents matching the search criteria
883    ///
884    /// # Examples
885   
886    /// let results = db.search("articles")
887    ///     .field("content")
888    ///     .matching("quantum computing")
889    ///     .collect()
890    ///     .await?;
891    /// ```
892    pub async fn collect(self) -> Result<Vec<Document>> {
893        let field = self
894            .field
895            .ok_or_else(|| AqlError::invalid_operation("Search field not specified"))?;
896        let query = self
897            .query
898            .ok_or_else(|| AqlError::invalid_operation("Search query not specified"))?;
899
900        self.db.search_text(&self.collection, &field, &query).await
901    }
902}
903
904#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
905pub struct SimpleQueryBuilder {
906    pub collection: String,
907    pub filters: Vec<Filter>,
908    pub order_by: Option<(String, bool)>,
909    pub limit: Option<usize>,
910    pub offset: Option<usize>,
911}
912
913impl SimpleQueryBuilder {
914    pub fn new(collection: String) -> Self {
915        Self {
916            collection,
917            filters: Vec::new(),
918            order_by: None,
919            limit: None,
920            offset: None,
921        }
922    }
923
924    pub fn filter(mut self, filter: Filter) -> Self {
925        self.filters.push(filter);
926        self
927    }
928
929    /// Filter for exact equality
930    ///
931    /// Uses secondary index if the field is indexed (O(1) lookup).
932    /// Falls back to full collection scan if not indexed (O(n)).
933    ///
934    /// # Arguments
935    /// * `field` - The field name to filter on
936    /// * `value` - The exact value to match
937    ///
938    /// # Examples
939    ///
940   
941    /// use aurora_db::{Aurora, types::Value};
942    ///
943    /// let db = Aurora::open("mydb.db")?;
944    ///
945    /// // Find active users
946    /// let active_users = db.query("users")
947    ///     .filter(|f| f.eq("status", Value::String("active".into())))
948    ///     .collect()
949    ///     .await?;
950    ///
951    /// // Multiple equality filters (AND logic)
952    /// let premium_active = db.query("users")
953    ///     .filter(|f| f.eq("tier", Value::String("premium".into())))
954    ///     .filter(|f| f.eq("active", Value::Bool(true)))
955    ///     .collect()
956    ///     .await?;
957    ///
958    /// // Numeric equality
959    /// let age_30 = db.query("users")
960    ///     .filter(|f| f.eq("age", Value::Int(30)))
961    ///     .collect()
962    ///     .await?;
963    /// ```
964    pub fn eq(self, field: &str, value: Value) -> Self {
965        self.filter(Filter::Eq(field.to_string(), value))
966    }
967
968    /// Filter for greater than
969    ///
970    /// Finds all documents where the field value is strictly greater than
971    /// the provided value. With LIMIT, uses early termination for performance.
972    ///
973    /// # Arguments
974    /// * `field` - The field name to compare
975    /// * `value` - The minimum value (exclusive)
976    ///
977    /// # Performance
978    /// - Without LIMIT: O(n) - scans all documents
979    /// - With LIMIT: O(k) where k = limit + offset (early termination)
980    /// - No index support yet (planned for future)
981    ///
982    /// # Examples
983    ///
984   
985    /// use aurora_db::{Aurora, types::Value};
986    ///
987    /// let db = Aurora::open("mydb.db")?;
988    ///
989    /// // Find high scorers (with early termination)
990    /// let high_scorers = db.query("users")
991    ///     .filter(|f| f.gt("score", Value::Int(1000)))
992    ///     .limit(100)  // Stops after finding 100 matches
993    ///     .collect()
994    ///     .await?;
995    ///
996    /// // Price range queries
997    /// let expensive = db.query("products")
998    ///     .filter(|f| f.gt("price", Value::Float(99.99)))
999    ///     .order_by("price", false)  // Descending
1000    ///     .collect()
1001    ///     .await?;
1002    ///
1003    /// // Date filtering (timestamps as integers)
1004    /// let recent = db.query("events")
1005    ///     .filter(|f| f.gt("timestamp", Value::Int(1609459200)))  // After Jan 1, 2021
1006    ///     .collect()
1007    ///     .await?;
1008    /// ```
1009    pub fn gt(self, field: &str, value: Value) -> Self {
1010        self.filter(Filter::Gt(field.to_string(), value))
1011    }
1012
1013    /// Filter for greater than or equal to
1014    ///
1015    /// Finds all documents where the field value is greater than or equal to
1016    /// the provided value. Inclusive version of `gt()`.
1017    ///
1018    /// # Arguments
1019    /// * `field` - The field name to compare
1020    /// * `value` - The minimum value (inclusive)
1021    ///
1022    /// # Examples
1023    ///
1024   
1025    /// use aurora_db::{Aurora, types::Value};
1026    ///
1027    /// let db = Aurora::open("mydb.db")?;
1028    ///
1029    /// // Minimum age requirement (inclusive)
1030    /// let adults = db.query("users")
1031    ///     .filter(|f| f.gte("age", Value::Int(18)))
1032    ///     .collect()
1033    ///     .await?;
1034    ///
1035    /// // Inventory management
1036    /// let in_stock = db.query("products")
1037    ///     .filter(|f| f.gte("stock", Value::Int(1)))
1038    ///     .collect()
1039    ///     .await?;
1040    /// ```
1041    pub fn gte(self, field: &str, value: Value) -> Self {
1042        self.filter(Filter::Gte(field.to_string(), value))
1043    }
1044
1045    /// Filter for less than
1046    ///
1047    /// Finds all documents where the field value is strictly less than
1048    /// the provided value.
1049    ///
1050    /// # Arguments
1051    /// * `field` - The field name to compare
1052    /// * `value` - The maximum value (exclusive)
1053    ///
1054    /// # Examples
1055    ///
1056   
1057    /// use aurora_db::{Aurora, types::Value};
1058    ///
1059    /// let db = Aurora::open("mydb.db")?;
1060    ///
1061    /// // Low balance accounts
1062    /// let low_balance = db.query("accounts")
1063    ///     .filter(|f| f.lt("balance", Value::Float(10.0)))
1064    ///     .collect()
1065    ///     .await?;
1066    ///
1067    /// // Budget products
1068    /// let budget = db.query("products")
1069    ///     .filter(|f| f.lt("price", Value::Float(50.0)))
1070    ///     .order_by("price", true)  // Ascending
1071    ///     .collect()
1072    ///     .await?;
1073    /// ```
1074    pub fn lt(self, field: &str, value: Value) -> Self {
1075        self.filter(Filter::Lt(field.to_string(), value))
1076    }
1077
1078    /// Filter for less than or equal to
1079    ///
1080    /// Finds all documents where the field value is less than or equal to
1081    /// the provided value. Inclusive version of `lt()`.
1082    ///
1083    /// # Arguments
1084    /// * `field` - The field name to compare
1085    /// * `value` - The maximum value (inclusive)
1086    ///
1087    /// # Examples
1088    ///
1089   
1090    /// use aurora_db::{Aurora, types::Value};
1091    ///
1092    /// let db = Aurora::open("mydb.db")?;
1093    ///
1094    /// // Senior discount eligibility
1095    /// let seniors = db.query("users")
1096    ///     .filter(|f| f.lte("age", Value::Int(65)))
1097    ///     .collect()
1098    ///     .await?;
1099    ///
1100    /// // Clearance items
1101    /// let clearance = db.query("products")
1102    ///     .filter(|f| f.lte("price", Value::Float(20.0)))
1103    ///     .collect()
1104    ///     .await?;
1105    /// ```
1106    pub fn lte(self, field: &str, value: Value) -> Self {
1107        self.filter(Filter::Lte(field.to_string(), value))
1108    }
1109
1110    /// Filter for substring containment
1111    ///
1112    /// Finds all documents where the field value contains the specified substring.
1113    /// Case-sensitive matching. For text search, consider using the `search()` API instead.
1114    ///
1115    /// # Arguments
1116    /// * `field` - The field name to search in (must be a string field)
1117    /// * `value` - The substring to search for
1118    ///
1119    /// # Performance
1120    /// - Always O(n) - scans all documents
1121    /// - Case-sensitive string matching
1122    /// - For full-text search, use `db.search()` instead
1123    ///
1124    /// # Examples
1125    ///
1126    /// ```
1127    /// use aurora_db::Aurora;
1128    ///
1129    /// let db = Aurora::open("mydb.db")?;
1130    ///
1131    /// // Find articles about Rust
1132    /// let rust_articles = db.query("articles")
1133    ///     .filter(|f| f.contains("title", "Rust"))
1134    ///     .collect()
1135    ///     .await?;
1136    ///
1137    /// // Email domain filtering
1138    /// let gmail_users = db.query("users")
1139    ///     .filter(|f| f.contains("email", "@gmail.com"))
1140    ///     .collect()
1141    ///     .await?;
1142    ///
1143    /// // Tag searching
1144    /// let rust_posts = db.query("posts")
1145    ///     .filter(|f| f.contains("tags", "rust"))
1146    ///     .collect()
1147    ///     .await?;
1148    /// ```
1149    ///
1150    /// # Note
1151    /// For case-insensitive search or more advanced text matching,
1152    /// use the full-text search API: `db.search(collection).query(text)`
1153    pub fn contains(self, field: &str, value: &str) -> Self {
1154        self.filter(Filter::Contains(field.to_string(), value.to_string()))
1155    }
1156
1157    /// Convenience method for range queries
1158    pub fn between(self, field: &str, min: Value, max: Value) -> Self {
1159        self.filter(Filter::Gte(field.to_string(), min))
1160            .filter(Filter::Lte(field.to_string(), max))
1161    }
1162
1163    /// Sort results by a field (ascending or descending)
1164    pub fn order_by(mut self, field: &str, ascending: bool) -> Self {
1165        self.order_by = Some((field.to_string(), ascending));
1166        self
1167    }
1168
1169    /// Limit the number of results returned
1170    pub fn limit(mut self, limit: usize) -> Self {
1171        self.limit = Some(limit);
1172        self
1173    }
1174
1175    /// Skip a number of results (for pagination)
1176    pub fn offset(mut self, offset: usize) -> Self {
1177        self.offset = Some(offset);
1178        self
1179    }
1180}
1181
1182#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1183pub enum Filter {
1184    Eq(String, Value),
1185    Gt(String, Value),
1186    Gte(String, Value),
1187    Lt(String, Value),
1188    Lte(String, Value),
1189    Contains(String, String),
1190    And(Vec<Filter>),
1191    Or(Vec<Filter>),
1192}
1193
1194fn get_field_value<'a>(doc: &'a Document, path: &str) -> Option<&'a Value> {
1195    // First, try a direct lookup for field names that might contain dots.
1196    if let Some(value) = doc.data.get(path) {
1197        return Some(value);
1198    }
1199
1200    if !path.contains('.') {
1201        return None;
1202    }
1203
1204    let parts: Vec<&str> = path.split('.').collect();
1205    let mut current = doc.data.get(parts[0])?;
1206
1207    for &part in &parts[1..] {
1208        if let Value::Object(map) = current {
1209            current = map.get(part)?;
1210        } else {
1211            return None;
1212        }
1213    }
1214
1215    Some(current)
1216}
1217
1218impl Filter {
1219    /// Check if a document matches this filter
1220    pub fn matches(&self, doc: &Document) -> bool {
1221        match self {
1222            Filter::Eq(field, value) => get_field_value(doc, field) == Some(value),
1223            Filter::Gt(field, value) => get_field_value(doc, field).is_some_and(|v| v > value),
1224            Filter::Gte(field, value) => get_field_value(doc, field).is_some_and(|v| v >= value),
1225            Filter::Lt(field, value) => get_field_value(doc, field).is_some_and(|v| v < value),
1226            Filter::Lte(field, value) => get_field_value(doc, field).is_some_and(|v| v <= value),
1227            Filter::Contains(field, substr) => get_field_value(doc, field).is_some_and(|v| {
1228                if let Value::String(s) = v {
1229                    s.contains(substr)
1230                } else if let Value::Array(arr) = v {
1231                    arr.contains(&Value::String(substr.clone()))
1232                } else {
1233                    false
1234                }
1235            }),
1236            Filter::And(filters) => filters.iter().all(|f| f.matches(doc)),
1237            Filter::Or(filters) => filters.iter().any(|f| f.matches(doc)),
1238        }
1239    }
1240}