1use crate::Aurora;
7use crate::error::Result;
8use crate::types::{Document, Value};
9use roaring::RoaringBitmap;
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::sync::Arc;
13
14#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
15pub struct SimpleQueryBuilder {
16 pub collection: String,
17 pub filters: Vec<Filter>,
18 pub order_by: Option<(String, bool)>,
19 pub limit: Option<usize>,
20 pub offset: Option<usize>,
21}
22
23pub struct QueryBuilder<'a> {
25 db: &'a Aurora,
26 collection: String,
27 filters: Vec<Filter>,
28 order_by: Option<(String, bool)>,
29 limit: Option<usize>,
30 offset: Option<usize>,
31 fields: Option<Vec<String>>,
32 debounce_duration: Option<std::time::Duration>,
33}
34
/// Stateless factory for [`Filter`] values.
///
/// The type is a unit struct: it exists only to give the filter constructors
/// a receiver, so it is free to copy and can be obtained through `Default`
/// as well as through `FilterBuilder::new`.
#[derive(Debug, Clone, Copy, Default)]
pub struct FilterBuilder;
40
41impl FilterBuilder {
42 pub fn new() -> Self {
44 Self
45 }
46
47 pub fn eq<T: Into<Value>>(&self, field: &str, value: T) -> Filter {
49 Filter::Eq(field.to_string(), value.into())
50 }
51
52 pub fn ne<T: Into<Value>>(&self, field: &str, value: T) -> Filter {
54 Filter::Ne(field.to_string(), value.into())
55 }
56
57 pub fn in_values<T: Into<Value> + Clone>(&self, field: &str, values: &[T]) -> Filter {
59 Filter::In(
60 field.to_string(),
61 values.iter().cloned().map(|v| v.into()).collect(),
62 )
63 }
64
65 pub fn starts_with(&self, field: &str, value: &str) -> Filter {
67 Filter::StartsWith(field.to_string(), value.to_string())
68 }
69
70 pub fn contains(&self, field: &str, value: &str) -> Filter {
72 Filter::Contains(field.to_string(), value.to_string())
73 }
74
75 pub fn gt<T: Into<Value>>(&self, field: &str, value: T) -> Filter {
77 Filter::Gt(field.to_string(), value.into())
78 }
79
80 pub fn gte<T: Into<Value>>(&self, field: &str, value: T) -> Filter {
82 Filter::Gte(field.to_string(), value.into())
83 }
84
85 pub fn lt<T: Into<Value>>(&self, field: &str, value: T) -> Filter {
87 Filter::Lt(field.to_string(), value.into())
88 }
89
90 pub fn lte<T: Into<Value>>(&self, field: &str, value: T) -> Filter {
92 Filter::Lte(field.to_string(), value.into())
93 }
94
95 pub fn in_vec<T: Into<Value>>(&self, field: &str, values: Vec<T>) -> Filter {
97 Filter::In(
98 field.to_string(),
99 values.into_iter().map(|v| v.into()).collect(),
100 )
101 }
102
103 pub fn between<T: Into<Value> + Clone>(&self, field: &str, min: T, max: T) -> Filter {
105 Filter::And(vec![
106 Filter::Gte(field.to_string(), min.into()),
107 Filter::Lte(field.to_string(), max.into()),
108 ])
109 }
110}
111
112impl<'a> QueryBuilder<'a> {
113 pub fn new(db: &'a Aurora, collection: &str) -> Self {
115 Self {
116 db,
117 collection: collection.to_string(),
118 filters: Vec::new(),
119 order_by: None,
120 limit: None,
121 offset: None,
122 fields: None,
123 debounce_duration: None,
124 }
125 }
126
127 pub fn filter<F>(mut self, f: F) -> Self
137 where
138 F: FnOnce(&FilterBuilder) -> Filter,
139 {
140 let builder = FilterBuilder::new();
141 self.filters.push(f(&builder));
142 self
143 }
144
145 pub fn order_by(mut self, field: &str, ascending: bool) -> Self {
151 self.order_by = Some((field.to_string(), ascending));
152 self
153 }
154
155 pub fn limit(mut self, limit: usize) -> Self {
160 self.limit = Some(limit);
161 self
162 }
163
164 pub fn offset(mut self, offset: usize) -> Self {
166 self.offset = Some(offset);
167 self
168 }
169
170 pub fn select(mut self, fields: Vec<&str>) -> Self {
174 self.fields = Some(fields.into_iter().map(|s| s.to_string()).collect());
175 self
176 }
177
178 pub fn debounce(mut self, duration: std::time::Duration) -> Self {
180 self.debounce_duration = Some(duration);
181 self
182 }
183
184 pub async fn first_one(self) -> Result<Option<Document>> {
188 let docs = self.limit(1).collect().await?;
189 Ok(docs.into_iter().next())
190 }
191
192 pub async fn collect(self) -> Result<Vec<Document>> {
196 self.db.ensure_indices_initialized().await?;
197
198 let mut candidate_bitmap: Option<RoaringBitmap> = None;
200
201 for filter in &self.filters {
202 if let Filter::Eq(field, value) = filter {
203 let index_key = format!("{}:{}", self.collection, field);
205 let val_str = match value {
206 Value::String(s) => s.clone(),
207 _ => value.to_string(),
208 };
209 let full_key = format!("{}:{}:{}", self.collection, field, val_str);
210
211 let mut current_bitmap = RoaringBitmap::new();
212 let mut found = false;
213
214 if let Some(loc) = self.db.index_manifest.get(&full_key) {
216 let (offset, len) = *loc.value();
217 if let Ok(guard) = self.db.mmap_index.read() {
218 if let Some(mmap) = guard.as_ref() {
219 if offset + len <= mmap.len() {
220 let bytes = &mmap[offset..(offset + len)];
221 if let Ok(cold_bitmap) = RoaringBitmap::deserialize_from(bytes) {
222 current_bitmap |= cold_bitmap;
223 found = true;
224 }
225 }
226 }
227 }
228 }
229
230 if let Some(storage_arc) = self.db.get_indexed_storage(&index_key, &val_str) {
232 if let Ok(storage) = storage_arc.read() {
233 current_bitmap |= storage.to_bitmap();
234 found = true;
235 }
236 }
237
238 if !found {
239 let in_transaction = crate::transaction::ACTIVE_TRANSACTION_ID
243 .try_with(|id| *id)
244 .ok()
245 .and_then(|id| self.db.transaction_manager.active_transactions.get(&id))
246 .is_some();
247
248 if !in_transaction && self.db.has_index_key(&index_key) {
249 return Ok(vec![]);
250 }
251 candidate_bitmap = None;
253 break;
254 }
255
256 if let Some(ref mut existing) = candidate_bitmap {
257 *existing &= current_bitmap; } else {
259 candidate_bitmap = Some(current_bitmap);
260 }
261
262 if let Some(ref b) = candidate_bitmap {
265 if b.is_empty() {
266 let in_transaction = crate::transaction::ACTIVE_TRANSACTION_ID
267 .try_with(|id| *id)
268 .ok()
269 .and_then(|id| self.db.transaction_manager.active_transactions.get(&id))
270 .is_some();
271 if !in_transaction {
272 return Ok(vec![]);
273 }
274 }
275 }
276 }
277 }
278
279 let mut docs = if let Some(bitmap) = candidate_bitmap {
280 let id_only = self
282 .fields
283 .as_ref()
284 .map(|f| f.len() == 1 && f[0] == "id")
285 .unwrap_or(false);
286
287 let tx_id = crate::transaction::ACTIVE_TRANSACTION_ID
289 .try_with(|id| *id)
290 .ok();
291
292 let tx_buffer =
293 tx_id.and_then(|id| self.db.transaction_manager.active_transactions.get(&id));
294
295 let mut final_docs = Vec::with_capacity(bitmap.len() as usize);
297 for internal_id in bitmap {
298 if let Some(external_id) = self.db.get_external_id(internal_id) {
299 if let Some(ref buffer) = tx_buffer {
301 let key = format!("{}:{}", self.collection, external_id);
302 if buffer.deletes.contains_key(&key) {
303 continue;
304 }
305 }
306
307 if id_only && self.filters.is_empty() {
308 final_docs.push(Document {
310 _sid: external_id,
311 data: HashMap::new(),
312 });
313 continue;
314 }
315
316 if let Ok(Some(doc)) = self.db.get_document(&self.collection, &external_id) {
317 if self.filters.iter().all(|f| f.matches(&doc)) {
319 final_docs.push(doc);
320 }
321 }
322 }
323 }
324
325 if let Some(buffer) = tx_buffer {
327 let prefix = format!("{}:", self.collection);
328 for item in buffer.writes.iter() {
329 let key: &String = item.key();
330 if let Some(external_id) = key.strip_prefix(&prefix) {
331 if final_docs.iter().any(|d| d._sid == external_id) {
333 continue;
334 }
335
336 let data: &Vec<u8> = item.value();
337 if let Ok(doc) = self.db.deserialize_internal::<Document>(data) {
338 if self.filters.iter().all(|f| f.matches(&doc)) {
339 final_docs.push(doc);
340 }
341 }
342 }
343 }
344 }
345
346 final_docs
347 } else {
348 let scan_limit = if self.order_by.is_none() {
353 self.limit.map(|l| l + self.offset.unwrap_or(0))
354 } else {
355 None
356 };
357
358 let db_filters = self.filters.clone();
359 self.db.scan_and_filter(
360 &self.collection,
361 move |doc| db_filters.iter().all(|f| f.matches(doc)),
362 scan_limit,
363 )?
364 };
365
366 if let Some((field, ascending)) = self.order_by {
368 docs.sort_by(|a, b| {
369 let v1 = a.data.get(&field);
370 let v2 = b.data.get(&field);
371 let ord = compare_values(v1, v2);
372 if ascending { ord } else { ord.reverse() }
373 });
374 }
375
376 let mut start = self.offset.unwrap_or(0);
378 if start > docs.len() {
379 start = docs.len();
380 }
381 let mut end = docs.len();
382 if let Some(max) = self.limit {
383 if start + max < end {
384 end = start + max;
385 }
386 }
387
388 let mut result = docs[start..end].to_vec();
389
390 if let Ok(computed) = self.db.computed.read() {
392 for doc in &mut result {
393 let _ = computed.apply(&self.collection, doc);
394 }
395 }
396
397 if let Some(ref fields) = self.fields {
399 let field_set: std::collections::HashSet<&str> =
400 fields.iter().map(|s| s.as_str()).collect();
401 for doc in &mut result {
402 doc.data.retain(|k, _| field_set.contains(k.as_str()));
403 }
404 }
405
406 Ok(result)
407 }
408
409 pub async fn count(self) -> Result<usize> {
411 let results = self.collect().await?;
412 Ok(results.len())
413 }
414
415 pub async fn delete(self) -> Result<usize> {
419 let db = self.db;
420 let collection = self.collection.clone();
421 let docs = self.collect().await?;
422 let count = docs.len();
423 for doc in docs {
424 let _ = db.aql_delete_document(&collection, &doc._sid).await;
425 }
426 Ok(count)
427 }
428
429 pub async fn watch(self) -> Result<crate::reactive::QueryWatcher> {
434 let collection = self.collection.clone();
435 let filters = self.filters.clone();
436 let db_clone = self.db.clone();
437 let debounce_duration = self.debounce_duration;
438
439 let initial_results = self.collect().await?;
440 let listener = db_clone.pubsub.listen(&collection);
441 let state = Arc::new(crate::reactive::ReactiveQueryState::new(filters));
442
443 Ok(crate::reactive::QueryWatcher::new(
444 Arc::new(db_clone),
445 collection,
446 listener,
447 state,
448 initial_results,
449 debounce_duration,
450 ))
451 }
452}
453
454pub struct SearchBuilder<'a> {
456 db: &'a Aurora,
457 collection: String,
458 query: String,
459 limit: Option<usize>,
460 fuzzy: bool,
461 distance: u8,
462 search_fields: Option<Vec<String>>,
463}
464
465fn fuzzy_score(
468 doc: &Document,
469 query_tokens: &[&str],
470 max_dist: usize,
471 fields: Option<&[String]>,
472) -> f32 {
473 let mut score = 0.0f32;
474 for (field, value) in &doc.data {
475 if let Some(allowed) = fields {
476 if !allowed.contains(field) {
477 continue;
478 }
479 }
480 if let crate::types::Value::String(text) = value {
481 let doc_tokens: Vec<String> =
482 text.split_whitespace().map(|t| t.to_lowercase()).collect();
483 for q in query_tokens {
484 for d in &doc_tokens {
485 let dist = crate::search::levenshtein_distance(q, d);
486 if dist <= max_dist {
487 score += 1.0 / (1.0 + dist as f32 * 0.3);
489 }
490 }
491 }
492 }
493 }
494 score
495}
496
497impl<'a> SearchBuilder<'a> {
498 pub fn new(db: &'a Aurora, collection: &str) -> Self {
500 Self {
501 db,
502 collection: collection.to_string(),
503 query: String::new(),
504 limit: None,
505 fuzzy: false,
506 distance: 0,
507 search_fields: None,
508 }
509 }
510
511 pub fn query(mut self, query: &str) -> Self {
513 self.query = query.to_string();
514 self
515 }
516
517 pub fn limit(mut self, limit: usize) -> Self {
519 self.limit = Some(limit);
520 self
521 }
522
523 pub fn fuzzy(mut self, distance: u8) -> Self {
525 self.fuzzy = true;
526 self.distance = distance;
527 self
528 }
529
530 pub fn fields(mut self, fields: Vec<String>) -> Self {
534 self.search_fields = Some(fields);
535 self
536 }
537
538 pub async fn collect_with_fields(self, fields: Option<&[String]>) -> Result<Vec<Document>> {
540 let builder = if let Some(f) = fields {
541 Self {
542 search_fields: Some(f.to_vec()),
543 ..self
544 }
545 } else {
546 self
547 };
548 builder.collect().await
549 }
550
551 pub async fn collect(self) -> Result<Vec<Document>> {
553 let query = self.query.to_lowercase();
554 let mut results = Vec::new();
555
556 if let Some(index) = self.db.primary_indices.get(&self.collection) {
557 if self.fuzzy && !query.is_empty() {
558 let query_tokens: Vec<&str> = query.split_whitespace().collect();
560 let max_dist = self.distance as usize;
561 let fields = self.search_fields.as_deref();
562 let mut scored: Vec<(f32, Document)> = Vec::new();
563
564 for entry in index.iter() {
565 if let Some(data) = self.db.get(entry.key())? {
566 if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
567 let score = fuzzy_score(&doc, &query_tokens, max_dist, fields);
568 if score > 0.0 {
569 scored.push((score, doc));
570 }
571 }
572 }
573 }
574
575 scored.sort_by(|(a, _), (b, _)| {
576 b.partial_cmp(a).unwrap_or(std::cmp::Ordering::Equal)
577 });
578 for (_, doc) in scored {
579 results.push(doc);
580 if let Some(l) = self.limit {
581 if results.len() >= l {
582 break;
583 }
584 }
585 }
586 } else {
587 for entry in index.iter() {
589 if let Some(data) = self.db.get(entry.key())? {
590 if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
591 let matches = if query.is_empty() {
592 true
593 } else {
594 let fields_to_check = self.search_fields.as_deref();
595 doc.data.iter().any(|(k, v)| {
596 if let Some(ref allowed) = fields_to_check {
597 if !allowed.contains(k) {
598 return false;
599 }
600 }
601 if let crate::types::Value::String(s) = v {
602 s.to_lowercase().contains(&query)
603 } else {
604 false
605 }
606 })
607 };
608 if matches {
609 results.push(doc);
610 if let Some(l) = self.limit {
611 if results.len() >= l {
612 break;
613 }
614 }
615 }
616 }
617 }
618 }
619 }
620 }
621
622 Ok(results)
623 }
624}
625
626fn compare_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
627 match (a, b) {
628 (None, None) => std::cmp::Ordering::Equal,
629 (None, Some(_)) => std::cmp::Ordering::Less,
630 (Some(_), None) => std::cmp::Ordering::Greater,
631 (Some(v1), Some(v2)) => v1.partial_cmp(v2).unwrap_or(std::cmp::Ordering::Equal),
632 }
633}
634
635#[derive(Debug, Clone, Serialize, Deserialize)]
637pub enum Filter {
638 Eq(String, Value),
639 Ne(String, Value),
640 Gt(String, Value),
641 Gte(String, Value),
642 Lt(String, Value),
643 Lte(String, Value),
644 In(String, Vec<Value>),
645 Contains(String, String),
646 StartsWith(String, String),
647 IsNull(String),
648 IsNotNull(String),
649 Not(Box<Filter>),
650 And(Vec<Filter>),
651 Or(Vec<Filter>),
652}
653
654fn get_nested<'a>(doc: &'a Document, field: &str) -> Option<&'a Value> {
657 let mut parts = field.splitn(2, '.');
658 let first = parts.next()?;
659 let rest = parts.next();
660 let val = doc.data.get(first)?;
661 match rest {
662 None => Some(val),
663 Some(remaining) => get_nested_value(val, remaining),
664 }
665}
666
667fn get_field_owned(doc: &Document, field: &str) -> Option<Value> {
670 if field == "_sid" {
671 Some(Value::String(doc._sid.clone()))
672 } else {
673 get_nested(doc, field).cloned()
674 }
675}
676
677fn get_nested_value<'a>(val: &'a Value, path: &str) -> Option<&'a Value> {
678 let mut parts = path.splitn(2, '.');
679 let first = parts.next()?;
680 let rest = parts.next();
681 if let Value::Object(map) = val {
682 let child = map.get(first)?;
683 match rest {
684 None => Some(child),
685 Some(remaining) => get_nested_value(child, remaining),
686 }
687 } else {
688 None
689 }
690}
691
692impl std::ops::Not for Filter {
693 type Output = Self;
694 fn not(self) -> Self::Output {
695 Filter::Not(Box::new(self))
696 }
697}
698
699impl Filter {
700 pub fn matches(&self, doc: &Document) -> bool {
701 match self {
702 Filter::Eq(f, v) => get_field_owned(doc, f).as_ref() == Some(v),
703 Filter::Ne(f, v) => get_field_owned(doc, f).as_ref() != Some(v),
704 Filter::Gt(f, v) => get_field_owned(doc, f).map_or(false, |dv| dv > *v),
705 Filter::Gte(f, v) => get_field_owned(doc, f).map_or(false, |dv| dv >= *v),
706 Filter::Lt(f, v) => get_field_owned(doc, f).map_or(false, |dv| dv < *v),
707 Filter::Lte(f, v) => get_field_owned(doc, f).map_or(false, |dv| dv <= *v),
708 Filter::In(f, v) => get_field_owned(doc, f).map_or(false, |dv| v.contains(&dv)),
709 Filter::Contains(f, v) => get_field_owned(doc, f).map_or(false, |dv| {
710 if let Value::String(s) = dv {
711 s.contains(v.as_str())
712 } else {
713 false
714 }
715 }),
716 Filter::StartsWith(f, v) => get_field_owned(doc, f).map_or(false, |dv| {
717 if let Value::String(s) = dv {
718 s.starts_with(v.as_str())
719 } else {
720 false
721 }
722 }),
723 Filter::IsNull(f) => get_field_owned(doc, f).map_or(true, |v| matches!(v, Value::Null)),
724 Filter::IsNotNull(f) => {
725 get_field_owned(doc, f).map_or(false, |v| !matches!(v, Value::Null))
726 }
727 Filter::Not(f) => !f.matches(doc),
728 Filter::And(fs) => fs.iter().all(|f| f.matches(doc)),
729 Filter::Or(fs) => fs.iter().any(|f| f.matches(doc)),
730 }
731 }
732}
733
734impl std::ops::BitAnd for Filter {
735 type Output = Filter;
736 fn bitand(self, rhs: Self) -> Self::Output {
737 match (self, rhs) {
738 (Filter::And(mut a), Filter::And(mut b)) => {
739 a.append(&mut b);
740 Filter::And(a)
741 }
742 (Filter::And(mut a), b) => {
743 a.push(b);
744 Filter::And(a)
745 }
746 (a, Filter::And(mut b)) => {
747 b.insert(0, a);
748 Filter::And(b)
749 }
750 (a, b) => Filter::And(vec![a, b]),
751 }
752 }
753}
754
755impl std::ops::BitOr for Filter {
756 type Output = Filter;
757 fn bitor(self, rhs: Self) -> Self::Output {
758 match (self, rhs) {
759 (Filter::Or(mut a), Filter::Or(mut b)) => {
760 a.append(&mut b);
761 Filter::Or(a)
762 }
763 (Filter::Or(mut a), b) => {
764 a.push(b);
765 Filter::Or(a)
766 }
767 (a, Filter::Or(mut b)) => {
768 b.insert(0, a);
769 Filter::Or(b)
770 }
771 (a, b) => Filter::Or(vec![a, b]),
772 }
773 }
774}