aurora_db/query.rs
1//! # Aurora Query System
2//!
3//! This module provides a powerful, fluent query interface for filtering, sorting,
4//! and retrieving documents from Aurora collections.
5//!
6//! ## Architectural Note: "The Query as a Stream"
7//!
8//! Unlike traditional databases that parse SQL strings into an execution plan,
9//! Aurora's query engine is built around the **Fluent Builder Pattern** and **Iterators**.
10//!
11//! 1. **Type-Safe Construction**: Queries are built using Rust method chaining, ensuring
12//! validity at compile time (or runtime construction without parsing overhead).
13//! 2. **Zero-Allocation Filtering**: The `Queryable` trait and `DocumentFilter` type allow
14//! us to stack closures. These filters run directly on the data stream without
15//! intermediate allocations.
16//! 3. **Reactive Core**: The `watch()` method transforms a static query into a
17//! live stream of `QueryUpdate` events. This is the heart of the "Embedded Platform"
18//! concept—allowing the application to react to data changes instantly.
19//!
20//! ## Examples
21//!
22
23//! // Get all active users over 21
24//! let users = db.query("users")
25//! .filter(|f| f.eq("status", "active") && f.gt("age", 21))
26//! .order_by("last_login", false)
27//! .limit(20)
28//! .collect()
29//! .await?;
30//! ```
31
32use crate::Aurora;
33use crate::error::AqlError;
34use crate::error::Result;
35use crate::types::{Document, Value};
36use serde::{Deserialize, Serialize};
37use std::collections::HashMap;
38
39/// Trait for objects that can filter documents.
40///
41/// This trait is implemented for closures that take a reference to a
42/// `FilterBuilder` and return a boolean, allowing for a natural filter syntax.
43pub trait Queryable {
44 fn matches(&self, doc: &Document) -> bool;
45}
46
47impl<F> Queryable for F
48where
49 F: Fn(&Document) -> bool,
50{
51 fn matches(&self, doc: &Document) -> bool {
52 self(doc)
53 }
54}
55
56/// Type alias for document filter functions
57type DocumentFilter<'a> = Box<dyn Fn(&Document) -> bool + Send + Sync + 'a>;
58
59/// Builder for creating and executing document queries.
60///
61/// QueryBuilder uses a fluent interface pattern to construct
62/// and execute queries against Aurora collections.
63///
64/// # Examples
65///
66/// ```rust,no_run
67/// // Query for active premium users
68/// let premium_users = db.query("users")
69/// .filter(|f| f.eq("status", "active") && f.eq("account_type", "premium"))
70/// .order_by("created_at", false)
71/// .limit(10)
72/// .collect()
73/// .await?;
74/// ```
75pub struct QueryBuilder<'a> {
76 db: &'a Aurora,
77 collection: String,
78 filters: Vec<DocumentFilter<'a>>,
79 order_by: Option<(String, bool)>,
80 limit: Option<usize>,
81 offset: Option<usize>,
82 fields: Option<Vec<String>>,
83 /// Optional debounce duration for reactive queries (watch)
84 /// When set, updates are batched and emitted at most once per interval
85 debounce_duration: Option<std::time::Duration>,
86}
87
88/// Builder for constructing document filter expressions.
89///
90/// This struct provides methods for comparing document fields
91/// with values to create filter conditions.
92///
93/// # Examples
94///
95/// ```rust,no_run
96/// // Combine multiple filter conditions
97/// db.query("products")
98/// .filter(|f| {
99/// f.gte("price", 10.0) &&
100/// f.lte("price", 50.0) &&
101/// f.contains("name", "widget")
102/// })
103/// .collect()
104/// .await?;
105/// ```
106pub struct FilterBuilder<'a, 'b> {
107 doc: &'b Document,
108 _marker: std::marker::PhantomData<&'a ()>,
109}
110
111impl<'a, 'b> FilterBuilder<'a, 'b> {
112 /// Create a new filter builder for the given document
113 pub fn new(doc: &'b Document) -> Self {
114 Self {
115 doc,
116 _marker: std::marker::PhantomData,
117 }
118 }
119
120 /// Check if a field equals a value
121 ///
122 /// # Examples
123
124 /// .filter(|f| f.eq("status", "active"))
125 /// ```
126 pub fn eq<T: Into<Value>>(&self, field: &str, value: T) -> bool {
127 let value = value.into();
128 self.get_nested_value(field) == Some(&value)
129 }
130
131 /// Check if a field is greater than a value
132 ///
133 /// # Examples
134
135 /// .filter(|f| f.gt("age", 21))
136 /// ```
137 pub fn gt<T: Into<Value>>(&self, field: &str, value: T) -> bool {
138 let value = value.into();
139 self.get_nested_value(field).is_some_and(|v| v > &value)
140 }
141
142 /// Check if a field is greater than or equal to a value
143 ///
144 /// # Examples
145
146 /// .filter(|f| f.gte("age", 21))
147 /// ```
148 pub fn gte<T: Into<Value>>(&self, field: &str, value: T) -> bool {
149 let value = value.into();
150 self.get_nested_value(field).is_some_and(|v| v >= &value)
151 }
152
153 /// Check if a field is less than a value
154 ///
155 /// # Examples
156
157 /// .filter(|f| f.lt("age", 65))
158 /// ```
159 pub fn lt<T: Into<Value>>(&self, field: &str, value: T) -> bool {
160 let value = value.into();
161 self.get_nested_value(field).is_some_and(|v| v < &value)
162 }
163
164 /// Check if a field is less than or equal to a value
165 ///
166 /// # Examples
167
168 /// .filter(|f| f.lte("age", 65))
169 /// ```
170 pub fn lte<T: Into<Value>>(&self, field: &str, value: T) -> bool {
171 let value = value.into();
172 self.get_nested_value(field).is_some_and(|v| v <= &value)
173 }
174
175 /// Check if a field contains a value
176 ///
177 /// # Examples
178
179 /// .filter(|f| f.contains("name", "widget"))
180 /// ```
181 pub fn contains(&self, field: &str, value: &str) -> bool {
182 self.get_nested_value(field).is_some_and(|v| match v {
183 Value::String(s) => s.contains(value),
184 Value::Array(arr) => arr.contains(&Value::String(value.to_string())),
185 _ => false,
186 })
187 }
188
189 /// Check if a field is in a list of values
190 ///
191 /// # Examples
192
193 /// .filter(|f| f.in_values("status", &["active", "inactive"]))
194 /// ```
195 pub fn in_values<T: Into<Value> + Clone>(&self, field: &str, values: &[T]) -> bool {
196 let values: Vec<Value> = values.iter().map(|v| v.clone().into()).collect();
197 self.get_nested_value(field)
198 .is_some_and(|v| values.contains(v))
199 }
200
201 /// Check if a field is between two values (inclusive)
202 ///
203 /// # Examples
204
205 /// .filter(|f| f.between("age", 18, 65))
206 /// ```
207 pub fn between<T: Into<Value> + Clone>(&self, field: &str, min: T, max: T) -> bool {
208 self.gte(field, min) && self.lte(field, max)
209 }
210
211 /// Check if a field exists and is not null
212 ///
213 /// # Examples
214
215 /// .filter(|f| f.exists("email"))
216 /// ```
217 pub fn exists(&self, field: &str) -> bool {
218 self.get_nested_value(field)
219 .is_some_and(|v| !matches!(v, Value::Null))
220 }
221
222 /// Check if a field doesn't exist or is null
223 ///
224 /// # Examples
225
226 /// .filter(|f| f.is_null("email"))
227 /// ```
228 pub fn is_null(&self, field: &str) -> bool {
229 self.get_nested_value(field)
230 .is_none_or(|v| matches!(v, Value::Null))
231 }
232
233 /// Check if a field starts with a prefix
234 ///
235 /// # Examples
236
237 /// .filter(|f| f.starts_with("name", "John"))
238 /// ```
239 pub fn starts_with(&self, field: &str, prefix: &str) -> bool {
240 self.get_nested_value(field).is_some_and(|v| match v {
241 Value::String(s) => s.starts_with(prefix),
242 _ => false,
243 })
244 }
245
246 /// Check if a field ends with a suffix
247 ///
248 /// # Examples
249
250 /// .filter(|f| f.ends_with("name", "son"))
251 /// ```
252 pub fn ends_with(&self, field: &str, suffix: &str) -> bool {
253 self.get_nested_value(field).is_some_and(|v| match v {
254 Value::String(s) => s.ends_with(suffix),
255 _ => false,
256 })
257 }
258
259 /// Check if a field is in an array
260 ///
261 /// # Examples
262
263 /// .filter(|f| f.array_contains("status", "active"))
264 /// ```
265 pub fn array_contains(&self, field: &str, value: impl Into<Value>) -> bool {
266 let value = value.into();
267 self.get_nested_value(field).is_some_and(|v| match v {
268 Value::Array(arr) => arr.contains(&value),
269 _ => false,
270 })
271 }
272
273 /// Check if an array has a specific length
274 ///
275 /// # Examples
276
277 /// .filter(|f| f.array_len_eq("status", 2))
278 /// ```
279 pub fn array_len_eq(&self, field: &str, len: usize) -> bool {
280 self.get_nested_value(field).is_some_and(|v| match v {
281 Value::Array(arr) => arr.len() == len,
282 _ => false,
283 })
284 }
285
286 /// Access a nested field using dot notation
287 ///
288 /// # Examples
289
290 /// .filter(|f| f.get_nested_value("user.address.city") == Some(&Value::String("New York")))
291 /// ```
292 pub fn get_nested_value(&self, path: &str) -> Option<&Value> {
293 let parts: Vec<&str> = path.split('.').collect();
294 let mut current = self.doc.data.get(parts[0])?;
295
296 for &part in &parts[1..] {
297 if let Value::Object(map) = current {
298 current = map.get(part)?;
299 } else {
300 return None;
301 }
302 }
303
304 Some(current)
305 }
306
307 /// Check if a nested field equals a value
308 ///
309 /// # Examples
310
311 /// .filter(|f| f.nested_eq("user.address.city", "New York"))
312 /// ```
313 pub fn nested_eq<T: Into<Value>>(&self, path: &str, value: T) -> bool {
314 let value = value.into();
315 self.get_nested_value(path) == Some(&value)
316 }
317
318 /// Check if a field matches a regular expression
319 ///
320 /// # Examples
321
322 /// .filter(|f| f.matches_regex("email", r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"))
323 /// ```
324 pub fn matches_regex(&self, field: &str, pattern: &str) -> bool {
325 use regex::Regex;
326
327 // Prevent DoS via overly complex/long regex
328 if pattern.len() > 100 {
329 return false;
330 }
331
332 if let Ok(re) = Regex::new(pattern) {
333 self.get_nested_value(field).is_some_and(|v| match v {
334 Value::String(s) => re.is_match(s),
335 _ => false,
336 })
337 } else {
338 false
339 }
340 }
341}
342
343impl<'a> QueryBuilder<'a> {
344 /// Create a new query builder for the specified collection
345 ///
346 /// # Examples
347
348 /// let query = db.query("users");
349 /// ```
350 pub fn new(db: &'a Aurora, collection: &str) -> Self {
351 Self {
352 db,
353 collection: collection.to_string(),
354 filters: Vec::new(),
355 order_by: None,
356 limit: None,
357 offset: None,
358 fields: None,
359 debounce_duration: None,
360 }
361 }
362
363 /// Add a filter function to the query
364 ///
365 /// # Examples
366
367 /// let active_users = db.query("users")
368 /// .filter(|f| f.eq("status", "active"))
369 /// .collect()
370 /// .await?;
371 /// ```
372 pub fn filter<F>(mut self, filter_fn: F) -> Self
373 where
374 // CHANGE 2: Require the closure `F` to be `Send + Sync`.
375 F: Fn(&FilterBuilder) -> bool + Send + Sync + 'a,
376 {
377 self.filters.push(Box::new(move |doc| {
378 let filter_builder = FilterBuilder::new(doc);
379 filter_fn(&filter_builder)
380 }));
381 self
382 }
383
384 /// Sort results by a field (ascending or descending)
385 ///
386 /// # Parameters
387 /// * `field` - The field to sort by
388 /// * `ascending` - `true` for ascending order, `false` for descending
389 ///
390 /// # Examples
391 /// ```
392 /// // Sort by age ascending
393 /// .order_by("age", true)
394 ///
395 /// // Sort by creation date descending (newest first)
396 /// .order_by("created_at", false)
397 /// ```
398 pub fn order_by(mut self, field: &str, ascending: bool) -> Self {
399 self.order_by = Some((field.to_string(), ascending));
400 self
401 }
402
403 /// Limit the number of results returned
404 ///
405 /// # Examples
406 /// ```
407 /// // Get at most 10 results
408 /// .limit(10)
409 /// ```
410 pub fn limit(mut self, limit: usize) -> Self {
411 self.limit = Some(limit);
412 self
413 }
414
415 /// Skip a number of results (for pagination)
416 ///
417 /// # Examples
418 /// ```
419 /// // For pagination: skip the first 20 results and get the next 10
420 /// .offset(20).limit(10)
421 /// ```
422 pub fn offset(mut self, offset: usize) -> Self {
423 self.offset = Some(offset);
424 self
425 }
426
427 /// Select only specific fields to return
428 ///
429 /// # Examples
430 /// ```
431 /// // Only return name and email fields
432 /// .select(&["name", "email"])
433 /// ```
434 pub fn select(mut self, fields: &[&str]) -> Self {
435 self.fields = Some(fields.iter().map(|s| s.to_string()).collect());
436 self
437 }
438
439 /// Set debounce/throttle interval for reactive queries (watch)
440 ///
441 /// When watching a query, updates will be batched and emitted at most once
442 /// per interval. This prevents overwhelming the UI with high-frequency updates.
443 /// Events are deduplicated by document ID, keeping only the latest state.
444 ///
445 /// # Examples
446 /// ```
447 /// use std::time::Duration;
448 ///
449 /// // Rate-limit to max 10 updates per second
450 /// let mut watcher = db.query("logs")
451 /// .filter(|f| f.eq("level", "ERROR"))
452 /// .debounce(Duration::from_millis(100))
453 /// .watch()
454 /// .await?;
455 /// ```
456 pub fn debounce(mut self, duration: std::time::Duration) -> Self {
457 self.debounce_duration = Some(duration);
458 self
459 }
460
461 /// Execute the query and collect the results
462 ///
463 /// # Returns
464 /// A vector of documents matching the query criteria
465 ///
466 /// # Examples
467
468 /// let results = db.query("products")
469 /// .filter(|f| f.lt("price", 100))
470 /// .collect()
471 /// .await?;
472 /// ```
473 pub async fn collect(self) -> Result<Vec<Document>> {
474 self.collect_internal().await
475 }
476
477 /// Internal helper to execute query without consuming self
478 async fn collect_internal(&self) -> Result<Vec<Document>> {
479 // Ensure indices are initialized
480 self.db.ensure_indices_initialized().await?;
481
482 // Optimization: Use early termination for queries with LIMIT but no ORDER BY
483 let mut docs = if self.order_by.is_none() && self.limit.is_some() {
484 // Early termination path - scan only until we have enough results
485 let target = self.limit.unwrap() + self.offset.unwrap_or(0);
486 let filter = |doc: &Document| self.filters.iter().all(|f| f(doc));
487 self.db
488 .scan_and_filter(&self.collection, filter, Some(target))?
489 } else {
490 // Standard path - need all matching docs (for sorting or no limit)
491 let mut docs = self.db.get_all_collection(&self.collection).await?;
492 docs.retain(|doc| self.filters.iter().all(|f| f(doc)));
493 docs
494 };
495
496 // Apply ordering
497 if let Some((field, ascending)) = &self.order_by {
498 docs.sort_by(|a, b| match (a.data.get(field), b.data.get(field)) {
499 (Some(v1), Some(v2)) => {
500 let cmp = v1.cmp(v2);
501 if *ascending { cmp } else { cmp.reverse() }
502 }
503 (None, Some(_)) => std::cmp::Ordering::Less,
504 (Some(_), None) => std::cmp::Ordering::Greater,
505 (None, None) => std::cmp::Ordering::Equal,
506 });
507 }
508
509 // Apply field selection if specified
510 if let Some(fields) = &self.fields {
511 // Create new documents with only selected fields
512 docs = docs
513 .into_iter()
514 .map(|doc| {
515 let mut new_data = HashMap::new();
516 // Always include the ID
517 for field in fields {
518 if let Some(value) = doc.data.get(field) {
519 new_data.insert(field.clone(), value.clone());
520 }
521 }
522 Document {
523 id: doc.id,
524 data: new_data,
525 }
526 })
527 .collect();
528 }
529
530 // Apply offset and limit safely
531 let start = self.offset.unwrap_or(0);
532 let end = self
533 .limit
534 .map(|l| start.saturating_add(l))
535 .unwrap_or(docs.len());
536
537 // Ensure we don't go out of bounds
538 let end = end.min(docs.len());
539 Ok(docs.get(start..end).unwrap_or(&[]).to_vec())
540 }
541
542 /// Watch the query for real-time updates
543 ///
544 /// Returns a QueryWatcher that streams live updates when documents are added,
545 /// removed, or modified in ways that affect the query results. Perfect for
546 /// building reactive UIs, live dashboards, and real-time applications.
547 ///
548 /// # Performance
549 /// - Zero overhead for queries without watchers
550 /// - Updates delivered asynchronously via channels
551 /// - Automatic filtering - only matching changes are emitted
552 /// - Memory efficient - only tracks matching documents
553 ///
554 /// # Requirements
555 /// This method requires the QueryBuilder to have a 'static lifetime,
556 /// which means the database reference must also be 'static (e.g., Arc<Aurora>).
557 ///
558 /// # Examples
559 ///
560 /// ```
561 /// use aurora_db::{Aurora, types::Value};
562 /// use std::sync::Arc;
563 ///
564 /// let db = Arc::new(Aurora::open("mydb.db")?);
565 ///
566 /// // Basic reactive query - watch active users
567 /// let mut watcher = db.query("users")
568 /// .filter(|f| f.eq("active", Value::Bool(true)))
569 /// .watch()
570 /// .await?;
571 ///
572 /// // Receive updates in real-time
573 /// while let Some(update) = watcher.next().await {
574 /// match update {
575 /// QueryUpdate::Added(doc) => {
576 /// println!("New active user: {}", doc.id);
577 /// },
578 /// QueryUpdate::Removed(doc) => {
579 /// println!("User deactivated: {}", doc.id);
580 /// },
581 /// QueryUpdate::Modified { old, new } => {
582 /// println!("User updated: {} -> {}", old.id, new.id);
583 /// },
584 /// }
585 /// }
586 /// ```
587 ///
588 /// # Real-World Use Cases
589 ///
590 /// **Live Leaderboard:**
591 /// ```
592 /// // Watch top players by score
593 /// let mut leaderboard = db.query("players")
594 /// .filter(|f| f.gte("score", Value::Int(1000)))
595 /// .watch()
596 /// .await?;
597 ///
598 /// tokio::spawn(async move {
599 /// while let Some(update) = leaderboard.next().await {
600 /// // Update UI with new rankings
601 /// broadcast_to_clients(&update).await;
602 /// }
603 /// });
604 /// ```
605 ///
606 /// **Activity Feed:**
607 /// ```
608 /// // Watch recent posts for a user's feed
609 /// let mut feed = db.query("posts")
610 /// .filter(|f| f.eq("author_id", user_id))
611 /// .watch()
612 /// .await?;
613 ///
614 /// // Stream updates to WebSocket
615 /// while let Some(update) = feed.next().await {
616 /// match update {
617 /// QueryUpdate::Added(post) => {
618 /// websocket.send(json!({"type": "new_post", "post": post})).await?;
619 /// },
620 /// _ => {}
621 /// }
622 /// }
623 /// ```
624 ///
625 /// **Real-Time Dashboard:**
626 /// ```
627 /// // Watch critical metrics
628 /// let mut alerts = db.query("metrics")
629 /// .filter(|f| f.gt("cpu_usage", Value::Float(80.0)))
630 /// .watch()
631 /// .await?;
632 ///
633 /// tokio::spawn(async move {
634 /// while let Some(update) = alerts.next().await {
635 /// if let QueryUpdate::Added(metric) = update {
636 /// // Alert on high CPU usage
637 /// send_alert(format!("High CPU: {:?}", metric)).await;
638 /// }
639 /// }
640 /// });
641 /// ```
642 ///
643 /// **Collaborative Editing:**
644 /// ```
645 /// // Watch document for changes from other users
646 /// let doc_id = "doc-123";
647 /// let mut changes = db.query("documents")
648 /// .filter(|f| f.eq("id", doc_id))
649 /// .watch()
650 /// .await?;
651 ///
652 /// tokio::spawn(async move {
653 /// while let Some(update) = changes.next().await {
654 /// if let QueryUpdate::Modified { old, new } = update {
655 /// // Merge changes from other editors
656 /// apply_remote_changes(&old, &new).await;
657 /// }
658 /// }
659 /// });
660 /// ```
661 ///
662 /// **Stock Ticker:**
663 /// ```
664 /// // Watch price changes
665 /// let mut price_watcher = db.query("stocks")
666 /// .filter(|f| f.eq("symbol", "AAPL"))
667 /// .watch()
668 /// .await?;
669 ///
670 /// while let Some(update) = price_watcher.next().await {
671 /// if let QueryUpdate::Modified { old, new } = update {
672 /// if let (Some(old_price), Some(new_price)) =
673 /// (old.data.get("price"), new.data.get("price")) {
674 /// println!("AAPL: {} -> {}", old_price, new_price);
675 /// }
676 /// }
677 /// }
678 /// ```
679 ///
680 /// # Multiple Watchers Pattern
681 ///
682 /// ```
683 /// // Watch multiple queries concurrently
684 /// let mut high_priority = db.query("tasks")
685 /// .filter(|f| f.eq("priority", Value::String("high".into())))
686 /// .watch()
687 /// .await?;
688 ///
689 /// let mut urgent = db.query("tasks")
690 /// .filter(|f| f.eq("status", Value::String("urgent".into())))
691 /// .watch()
692 /// .await?;
693 ///
694 /// tokio::spawn(async move {
695 /// loop {
696 /// tokio::select! {
697 /// Some(update) = high_priority.next() => {
698 /// println!("High priority: {:?}", update);
699 /// },
700 /// Some(update) = urgent.next() => {
701 /// println!("Urgent: {:?}", update);
702 /// },
703 /// }
704 /// }
705 /// });
706 /// ```
707 ///
708 /// # Important Notes
709 /// - Requires Arc<Aurora> for 'static lifetime
710 /// - Updates are delivered asynchronously
711 /// - Watcher keeps running until dropped
712 /// - Only matching documents trigger updates
713 /// - Use tokio::spawn to process updates in background
714 ///
715 /// # See Also
716 /// - `Aurora::listen()` for collection-level change notifications
717 /// - `QueryWatcher::next()` to receive the next update
718 /// - `QueryWatcher::try_next()` for non-blocking checks
719 pub async fn watch(mut self) -> Result<crate::reactive::QueryWatcher>
720 where
721 'a: 'static,
722 {
723 use crate::reactive::{QueryWatcher, ReactiveQueryState};
724 use std::sync::Arc;
725
726 // Get initial results BEFORE moving filters
727 let docs = self.collect_internal().await?;
728
729 // Extract context
730 let collection = self.collection.clone();
731 let db = self.db;
732 let debounce = self.debounce_duration;
733 let filters = std::mem::take(&mut self.filters);
734
735 // Create a listener for this collection
736 let listener = db.listen(&collection);
737
738 // Create filter closure that combines all the query filters
739 let filter_fn = move |doc: &Document| -> bool { filters.iter().all(|f| f(doc)) };
740
741 // Create reactive state
742 let state = Arc::new(ReactiveQueryState::new(filter_fn));
743
744 // Create and return watcher
745 Ok(QueryWatcher::new(
746 collection, listener, state, docs, debounce,
747 ))
748 }
749
750 /// Get only the first matching document or None if no matches
751 ///
752 /// # Examples
753
754 /// let user = db.query("users")
755 /// .filter(|f| f.eq("email", "jane@example.com"))
756 /// .first_one()
757 /// .await?;
758 /// ```
759 pub async fn first_one(self) -> Result<Option<Document>> {
760 self.limit(1).collect().await.map(|mut docs| docs.pop())
761 }
762
763 /// Count the number of documents matching the query
764 ///
765 /// # Examples
766
767 /// let active_count = db.query("users")
768 /// .filter(|f| f.eq("status", "active"))
769 /// .count()
770 /// .await?;
771 /// ```
772 pub async fn count(self) -> Result<usize> {
773 self.collect().await.map(|docs| docs.len())
774 }
775
776 /// Update documents matching the query with new field values
777 ///
778 /// # Returns
779 /// The number of documents updated
780 ///
781 /// # Examples
782
783 /// let updated = db.query("products")
784 /// .filter(|f| f.lt("stock", 5))
785 /// .update([
786 /// ("status", Value::String("low_stock".to_string())),
787 /// ("needs_reorder", Value::Bool(true))
788 /// ].into_iter().collect())
789 /// .await?;
790 /// ```
791 pub async fn update(self, updates: HashMap<&str, Value>) -> Result<usize> {
792 // Store a reference to the db and collection before consuming self
793 let db = self.db;
794 let collection = self.collection.clone();
795
796 let docs = self.collect().await?;
797 let mut updated_count = 0;
798
799 for doc in docs {
800 let mut updated_doc = doc.clone();
801 let mut changed = false;
802
803 for (field, value) in &updates {
804 updated_doc.data.insert(field.to_string(), value.clone());
805 changed = true;
806 }
807
808 if changed {
809 // Update document in the database
810 db.put(
811 format!("{}:{}", collection, updated_doc.id),
812 serde_json::to_vec(&updated_doc)?,
813 None,
814 )
815 .await?;
816 updated_count += 1;
817 }
818 }
819
820 Ok(updated_count)
821 }
822
823 /// Delete documents matching the query
824 ///
825 /// # Returns
826 /// The number of documents deleted
827 ///
828 /// # Examples
829 /// ```rust,no_run
830 /// let deleted = db.query("users")
831 /// .filter(|f| f.eq("active", false))
832 /// .delete()
833 /// .await?;
834 /// ```
835 /// Delete documents matching the query
836 ///
837 /// # Returns
838 /// The number of documents deleted
839 ///
840 /// # Note
841 /// Deletions are not atomic. If an error occurs during deletion,
842 /// some documents may have been deleted while others remain.
843 /// For atomic batch operations, consider using transactions.
844 ///
845 /// # Examples
846 /// ```no_run
847 /// # use aurora_db::{Aurora, Result};
848 /// # async fn example(db: &Aurora) -> Result<()> {
849 /// // Delete all inactive users
850 /// let deleted = db.query("users")
851 /// .filter(|f| f.eq("status", "inactive"))
852 /// .delete()
853 /// .await?;
854 /// println!("Deleted {} users", deleted);
855 /// # Ok(())
856 /// # }
857 /// ```
858 pub async fn delete(self) -> Result<usize> {
859 // Store reference to db and collection before consuming self
860 let db = self.db;
861 let collection = self.collection.clone();
862
863 // Use collect() to get matching documents (leverages optimization)
864 let docs = self.collect().await?;
865 let mut deleted_count = 0;
866
867 for doc in docs {
868 let key = format!("{}:{}", collection, doc.id);
869 db.delete(&key).await?;
870 deleted_count += 1;
871 }
872
873 Ok(deleted_count)
874 }
875}
876
877/// Builder for performing full-text search operations
878///
879/// # Examples
880/// ```rust,no_run
881/// let results = db.search("products")
882/// .field("description")
883/// .matching("wireless headphones")
884/// .fuzzy(true)
885/// .collect()
886/// .await?;
887/// ```
888pub struct SearchBuilder<'a> {
889 db: &'a Aurora,
890 collection: String,
891 field: Option<String>,
892 query: Option<String>,
893 fuzzy: bool,
894}
895
896impl<'a> SearchBuilder<'a> {
897 /// Create a new search builder for the specified collection
898 pub fn new(db: &'a Aurora, collection: &str) -> Self {
899 Self {
900 db,
901 collection: collection.to_string(),
902 field: None,
903 query: None,
904 fuzzy: false,
905 }
906 }
907
908 /// Specify the field to search in
909 ///
910 /// # Examples
911
912 /// .field("description")
913 /// ```
914 pub fn field(mut self, field: &str) -> Self {
915 self.field = Some(field.to_string());
916 self
917 }
918
919 /// Specify the search query text
920 ///
921 /// # Examples
922
923 /// .matching("wireless headphones")
924 /// ```
925 pub fn matching(mut self, query: &str) -> Self {
926 self.query = Some(query.to_string());
927 self
928 }
929
930 /// Enable or disable fuzzy matching (for typo tolerance)
931 ///
932 /// # Examples
933
934 /// .fuzzy(true) // Enable fuzzy matching
935 /// ```
936 pub fn fuzzy(mut self, enable: bool) -> Self {
937 self.fuzzy = enable;
938 self
939 }
940
941 /// Execute the search and collect the results
942 ///
943 /// # Returns
944 /// A vector of documents matching the search criteria
945 ///
946 /// # Examples
947
948 /// let results = db.search("articles")
949 /// .field("content")
950 /// .matching("quantum computing")
951 /// .collect()
952 /// .await?;
953 /// ```
954 pub async fn collect(self) -> Result<Vec<Document>> {
955 let field = self
956 .field
957 .ok_or_else(|| AqlError::invalid_operation("Search field not specified"))?;
958 let query = self
959 .query
960 .ok_or_else(|| AqlError::invalid_operation("Search query not specified"))?;
961
962 self.db.search_text(&self.collection, &field, &query).await
963 }
964}
965
966#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
967pub struct SimpleQueryBuilder {
968 pub collection: String,
969 pub filters: Vec<Filter>,
970 pub order_by: Option<(String, bool)>,
971 pub limit: Option<usize>,
972 pub offset: Option<usize>,
973}
974
975impl SimpleQueryBuilder {
976 pub fn new(collection: String) -> Self {
977 Self {
978 collection,
979 filters: Vec::new(),
980 order_by: None,
981 limit: None,
982 offset: None,
983 }
984 }
985
986 pub fn filter(mut self, filter: Filter) -> Self {
987 self.filters.push(filter);
988 self
989 }
990
991 /// Filter for exact equality
992 ///
993 /// Uses secondary index if the field is indexed (O(1) lookup).
994 /// Falls back to full collection scan if not indexed (O(n)).
995 ///
996 /// # Arguments
997 /// * `field` - The field name to filter on
998 /// * `value` - The exact value to match
999 ///
1000 /// # Examples
1001 ///
1002
1003 /// use aurora_db::{Aurora, types::Value};
1004 ///
1005 /// let db = Aurora::open("mydb.db")?;
1006 ///
1007 /// // Find active users
1008 /// let active_users = db.query("users")
1009 /// .filter(|f| f.eq("status", Value::String("active".into())))
1010 /// .collect()
1011 /// .await?;
1012 ///
1013 /// // Multiple equality filters (AND logic)
1014 /// let premium_active = db.query("users")
1015 /// .filter(|f| f.eq("tier", Value::String("premium".into())))
1016 /// .filter(|f| f.eq("active", Value::Bool(true)))
1017 /// .collect()
1018 /// .await?;
1019 ///
1020 /// // Numeric equality
1021 /// let age_30 = db.query("users")
1022 /// .filter(|f| f.eq("age", Value::Int(30)))
1023 /// .collect()
1024 /// .await?;
1025 /// ```
1026 pub fn eq(self, field: &str, value: Value) -> Self {
1027 self.filter(Filter::Eq(field.to_string(), value))
1028 }
1029
1030 /// Filter for greater than
1031 ///
1032 /// Finds all documents where the field value is strictly greater than
1033 /// the provided value. With LIMIT, uses early termination for performance.
1034 ///
1035 /// # Arguments
1036 /// * `field` - The field name to compare
1037 /// * `value` - The minimum value (exclusive)
1038 ///
1039 /// # Performance
1040 /// - Without LIMIT: O(n) - scans all documents
1041 /// - With LIMIT: O(k) where k = limit + offset (early termination)
1042 /// - No index support yet (planned for future)
1043 ///
1044 /// # Examples
1045 ///
1046
1047 /// use aurora_db::{Aurora, types::Value};
1048 ///
1049 /// let db = Aurora::open("mydb.db")?;
1050 ///
1051 /// // Find high scorers (with early termination)
1052 /// let high_scorers = db.query("users")
1053 /// .filter(|f| f.gt("score", Value::Int(1000)))
1054 /// .limit(100) // Stops after finding 100 matches
1055 /// .collect()
1056 /// .await?;
1057 ///
1058 /// // Price range queries
1059 /// let expensive = db.query("products")
1060 /// .filter(|f| f.gt("price", Value::Float(99.99)))
1061 /// .order_by("price", false) // Descending
1062 /// .collect()
1063 /// .await?;
1064 ///
1065 /// // Date filtering (timestamps as integers)
1066 /// let recent = db.query("events")
1067 /// .filter(|f| f.gt("timestamp", Value::Int(1609459200))) // After Jan 1, 2021
1068 /// .collect()
1069 /// .await?;
1070 /// ```
1071 pub fn gt(self, field: &str, value: Value) -> Self {
1072 self.filter(Filter::Gt(field.to_string(), value))
1073 }
1074
1075 /// Filter for greater than or equal to
1076 ///
1077 /// Finds all documents where the field value is greater than or equal to
1078 /// the provided value. Inclusive version of `gt()`.
1079 ///
1080 /// # Arguments
1081 /// * `field` - The field name to compare
1082 /// * `value` - The minimum value (inclusive)
1083 ///
1084 /// # Examples
1085 ///
1086
1087 /// use aurora_db::{Aurora, types::Value};
1088 ///
1089 /// let db = Aurora::open("mydb.db")?;
1090 ///
1091 /// // Minimum age requirement (inclusive)
1092 /// let adults = db.query("users")
1093 /// .filter(|f| f.gte("age", Value::Int(18)))
1094 /// .collect()
1095 /// .await?;
1096 ///
1097 /// // Inventory management
1098 /// let in_stock = db.query("products")
1099 /// .filter(|f| f.gte("stock", Value::Int(1)))
1100 /// .collect()
1101 /// .await?;
1102 /// ```
1103 pub fn gte(self, field: &str, value: Value) -> Self {
1104 self.filter(Filter::Gte(field.to_string(), value))
1105 }
1106
1107 /// Filter for less than
1108 ///
1109 /// Finds all documents where the field value is strictly less than
1110 /// the provided value.
1111 ///
1112 /// # Arguments
1113 /// * `field` - The field name to compare
1114 /// * `value` - The maximum value (exclusive)
1115 ///
1116 /// # Examples
1117 ///
1118
1119 /// use aurora_db::{Aurora, types::Value};
1120 ///
1121 /// let db = Aurora::open("mydb.db")?;
1122 ///
1123 /// // Low balance accounts
1124 /// let low_balance = db.query("accounts")
1125 /// .filter(|f| f.lt("balance", Value::Float(10.0)))
1126 /// .collect()
1127 /// .await?;
1128 ///
1129 /// // Budget products
1130 /// let budget = db.query("products")
1131 /// .filter(|f| f.lt("price", Value::Float(50.0)))
1132 /// .order_by("price", true) // Ascending
1133 /// .collect()
1134 /// .await?;
1135 /// ```
1136 pub fn lt(self, field: &str, value: Value) -> Self {
1137 self.filter(Filter::Lt(field.to_string(), value))
1138 }
1139
1140 /// Filter for less than or equal to
1141 ///
1142 /// Finds all documents where the field value is less than or equal to
1143 /// the provided value. Inclusive version of `lt()`.
1144 ///
1145 /// # Arguments
1146 /// * `field` - The field name to compare
1147 /// * `value` - The maximum value (inclusive)
1148 ///
1149 /// # Examples
1150 ///
1151
1152 /// use aurora_db::{Aurora, types::Value};
1153 ///
1154 /// let db = Aurora::open("mydb.db")?;
1155 ///
1156 /// // Senior discount eligibility
1157 /// let seniors = db.query("users")
1158 /// .filter(|f| f.lte("age", Value::Int(65)))
1159 /// .collect()
1160 /// .await?;
1161 ///
1162 /// // Clearance items
1163 /// let clearance = db.query("products")
1164 /// .filter(|f| f.lte("price", Value::Float(20.0)))
1165 /// .collect()
1166 /// .await?;
1167 /// ```
1168 pub fn lte(self, field: &str, value: Value) -> Self {
1169 self.filter(Filter::Lte(field.to_string(), value))
1170 }
1171
1172 /// Filter for substring containment
1173 ///
1174 /// Finds all documents where the field value contains the specified substring.
1175 /// Case-sensitive matching. For text search, consider using the `search()` API instead.
1176 ///
1177 /// # Arguments
1178 /// * `field` - The field name to search in (must be a string field)
1179 /// * `value` - The substring to search for
1180 ///
1181 /// # Performance
1182 /// - Always O(n) - scans all documents
1183 /// - Case-sensitive string matching
1184 /// - For full-text search, use `db.search()` instead
1185 ///
1186 /// # Examples
1187 ///
1188 /// ```
1189 /// use aurora_db::Aurora;
1190 ///
1191 /// let db = Aurora::open("mydb.db")?;
1192 ///
1193 /// // Find articles about Rust
1194 /// let rust_articles = db.query("articles")
1195 /// .filter(|f| f.contains("title", "Rust"))
1196 /// .collect()
1197 /// .await?;
1198 ///
1199 /// // Email domain filtering
1200 /// let gmail_users = db.query("users")
1201 /// .filter(|f| f.contains("email", "@gmail.com"))
1202 /// .collect()
1203 /// .await?;
1204 ///
1205 /// // Tag searching
1206 /// let rust_posts = db.query("posts")
1207 /// .filter(|f| f.contains("tags", "rust"))
1208 /// .collect()
1209 /// .await?;
1210 /// ```
1211 ///
1212 /// # Note
1213 /// For case-insensitive search or more advanced text matching,
1214 /// use the full-text search API: `db.search(collection).query(text)`
1215 pub fn contains(self, field: &str, value: &str) -> Self {
1216 self.filter(Filter::Contains(field.to_string(), value.to_string()))
1217 }
1218
1219 /// Convenience method for range queries
1220 pub fn between(self, field: &str, min: Value, max: Value) -> Self {
1221 self.filter(Filter::Gte(field.to_string(), min))
1222 .filter(Filter::Lte(field.to_string(), max))
1223 }
1224
1225 /// Sort results by a field (ascending or descending)
1226 pub fn order_by(mut self, field: &str, ascending: bool) -> Self {
1227 self.order_by = Some((field.to_string(), ascending));
1228 self
1229 }
1230
1231 /// Limit the number of results returned
1232 pub fn limit(mut self, limit: usize) -> Self {
1233 self.limit = Some(limit);
1234 self
1235 }
1236
1237 /// Skip a number of results (for pagination)
1238 pub fn offset(mut self, offset: usize) -> Self {
1239 self.offset = Some(offset);
1240 self
1241 }
1242}
1243
1244#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1245pub enum Filter {
1246 Eq(String, Value),
1247 Gt(String, Value),
1248 Gte(String, Value),
1249 Lt(String, Value),
1250 Lte(String, Value),
1251 Contains(String, String),
1252 And(Vec<Filter>),
1253 Or(Vec<Filter>),
1254}
1255
1256fn get_field_value<'a>(doc: &'a Document, path: &str) -> Option<&'a Value> {
1257 // First, try a direct lookup for field names that might contain dots.
1258 if let Some(value) = doc.data.get(path) {
1259 return Some(value);
1260 }
1261
1262 if !path.contains('.') {
1263 return None;
1264 }
1265
1266 let parts: Vec<&str> = path.split('.').collect();
1267 let mut current = doc.data.get(parts[0])?;
1268
1269 for &part in &parts[1..] {
1270 if let Value::Object(map) = current {
1271 current = map.get(part)?;
1272 } else {
1273 return None;
1274 }
1275 }
1276
1277 Some(current)
1278}
1279
1280impl Filter {
1281 /// Check if a document matches this filter
1282 pub fn matches(&self, doc: &Document) -> bool {
1283 match self {
1284 Filter::Eq(field, value) => get_field_value(doc, field) == Some(value),
1285 Filter::Gt(field, value) => get_field_value(doc, field).is_some_and(|v| v > value),
1286 Filter::Gte(field, value) => get_field_value(doc, field).is_some_and(|v| v >= value),
1287 Filter::Lt(field, value) => get_field_value(doc, field).is_some_and(|v| v < value),
1288 Filter::Lte(field, value) => get_field_value(doc, field).is_some_and(|v| v <= value),
1289 Filter::Contains(field, substr) => get_field_value(doc, field).is_some_and(|v| {
1290 if let Value::String(s) = v {
1291 s.contains(substr)
1292 } else if let Value::Array(arr) = v {
1293 arr.contains(&Value::String(substr.clone()))
1294 } else {
1295 false
1296 }
1297 }),
1298 Filter::And(filters) => filters.iter().all(|f| f.matches(doc)),
1299 Filter::Or(filters) => filters.iter().any(|f| f.matches(doc)),
1300 }
1301 }
1302}