1use crate::Aurora;
7use crate::error::Result;
8use crate::types::{Document, Value};
9use std::collections::HashMap;
10use serde::{Deserialize, Serialize};
11use std::sync::Arc;
12use roaring::RoaringBitmap;
13
14#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
15pub struct SimpleQueryBuilder {
16 pub collection: String,
17 pub filters: Vec<Filter>,
18 pub order_by: Option<(String, bool)>,
19 pub limit: Option<usize>,
20 pub offset: Option<usize>,
21}
22
23pub struct QueryBuilder<'a> {
25 db: &'a Aurora,
26 collection: String,
27 filters: Vec<Filter>,
28 order_by: Option<(String, bool)>,
29 limit: Option<usize>,
30 offset: Option<usize>,
31 fields: Option<Vec<String>>,
32 debounce_duration: Option<std::time::Duration>,
33}
34
/// Stateless factory for `Filter` values.
///
/// An instance is handed to the closure passed to `QueryBuilder::filter`.
/// Deriving `Default` alongside the inherent `new` satisfies Clippy's
/// `new_without_default` lint and allows use in generic contexts. As a unit
/// struct it is trivially `Copy`, and as a public type it is `Debug`.
#[derive(Debug, Default, Clone, Copy)]
pub struct FilterBuilder;
37
38impl FilterBuilder {
39 pub fn new() -> Self {
40 Self
41 }
42
43 pub fn eq<T: Into<Value>>(&self, field: &str, value: T) -> Filter {
44 Filter::Eq(field.to_string(), value.into())
45 }
46
47 pub fn ne<T: Into<Value>>(&self, field: &str, value: T) -> Filter {
48 Filter::Ne(field.to_string(), value.into())
49 }
50
51 pub fn in_values<T: Into<Value> + Clone>(&self, field: &str, values: &[T]) -> Filter {
52 Filter::In(field.to_string(), values.iter().cloned().map(|v| v.into()).collect())
53 }
54
55 pub fn starts_with(&self, field: &str, value: &str) -> Filter {
56 Filter::StartsWith(field.to_string(), value.to_string())
57 }
58
59 pub fn contains(&self, field: &str, value: &str) -> Filter {
60 Filter::Contains(field.to_string(), value.to_string())
61 }
62
63 pub fn gt<T: Into<Value>>(&self, field: &str, value: T) -> Filter {
64 Filter::Gt(field.to_string(), value.into())
65 }
66
67 pub fn gte<T: Into<Value>>(&self, field: &str, value: T) -> Filter {
68 Filter::Gte(field.to_string(), value.into())
69 }
70
71 pub fn lt<T: Into<Value>>(&self, field: &str, value: T) -> Filter {
72 Filter::Lt(field.to_string(), value.into())
73 }
74
75 pub fn lte<T: Into<Value>>(&self, field: &str, value: T) -> Filter {
76 Filter::Lte(field.to_string(), value.into())
77 }
78
79 pub fn in_vec<T: Into<Value>>(&self, field: &str, values: Vec<T>) -> Filter {
80 Filter::In(field.to_string(), values.into_iter().map(|v| v.into()).collect())
81 }
82
83 pub fn between<T: Into<Value> + Clone>(&self, field: &str, min: T, max: T) -> Filter {
84 Filter::And(vec![
85 Filter::Gte(field.to_string(), min.into()),
86 Filter::Lte(field.to_string(), max.into()),
87 ])
88 }
89}
90
91impl<'a> QueryBuilder<'a> {
92 pub fn new(db: &'a Aurora, collection: &str) -> Self {
93 Self {
94 db,
95 collection: collection.to_string(),
96 filters: Vec::new(),
97 order_by: None,
98 limit: None,
99 offset: None,
100 fields: None,
101 debounce_duration: None,
102 }
103 }
104
105 pub fn filter<F>(mut self, f: F) -> Self
106 where
107 F: FnOnce(&FilterBuilder) -> Filter,
108 {
109 let builder = FilterBuilder::new();
110 self.filters.push(f(&builder));
111 self
112 }
113
114 pub fn order_by(mut self, field: &str, ascending: bool) -> Self {
115 self.order_by = Some((field.to_string(), ascending));
116 self
117 }
118
119 pub fn limit(mut self, limit: usize) -> Self {
120 self.limit = Some(limit);
121 self
122 }
123
124 pub fn offset(mut self, offset: usize) -> Self {
125 self.offset = Some(offset);
126 self
127 }
128
129 pub fn select(mut self, fields: Vec<&str>) -> Self {
130 self.fields = Some(fields.into_iter().map(|s| s.to_string()).collect());
131 self
132 }
133
134 pub fn debounce(mut self, duration: std::time::Duration) -> Self {
135 self.debounce_duration = Some(duration);
136 self
137 }
138
139 pub async fn first_one(self) -> Result<Option<Document>> {
140 let docs = self.limit(1).collect().await?;
141 Ok(docs.into_iter().next())
142 }
143
144 pub async fn collect(self) -> Result<Vec<Document>> {
147 self.db.ensure_indices_initialized().await?;
148
149 let mut candidate_bitmap: Option<RoaringBitmap> = None;
151
152 for filter in &self.filters {
153 if let Filter::Eq(field, value) = filter {
154 let index_key = format!("{}:{}", self.collection, field);
156 let val_str = match value {
157 Value::String(s) => s.clone(),
158 _ => value.to_string(),
159 };
160 let full_key = format!("{}:{}:{}", self.collection, field, val_str);
161
162 let mut current_bitmap = RoaringBitmap::new();
163 let mut found = false;
164
165 if let Some(loc) = self.db.index_manifest.get(&full_key) {
167 let (offset, len) = *loc.value();
168 if let Ok(guard) = self.db.mmap_index.read() {
169 if let Some(mmap) = guard.as_ref() {
170 if offset + len <= mmap.len() {
171 let bytes = &mmap[offset..(offset + len)];
172 if let Ok(cold_bitmap) = RoaringBitmap::deserialize_from(bytes) {
173 current_bitmap |= cold_bitmap;
174 found = true;
175 }
176 }
177 }
178 }
179 }
180
181 if let Some(storage_arc) = self.db.get_indexed_storage(&index_key, &val_str) {
183 if let Ok(storage) = storage_arc.read() {
184 current_bitmap |= storage.to_bitmap();
185 found = true;
186 }
187 }
188
189 if !found {
190 let in_transaction = crate::transaction::ACTIVE_TRANSACTION_ID
194 .try_with(|id| *id)
195 .ok()
196 .and_then(|id| self.db.transaction_manager.active_transactions.get(&id))
197 .is_some();
198
199 if !in_transaction && self.db.has_index_key(&index_key) {
200 return Ok(vec![]);
201 }
202 candidate_bitmap = None;
204 break;
205 }
206
207 if let Some(ref mut existing) = candidate_bitmap {
208 *existing &= current_bitmap; } else {
210 candidate_bitmap = Some(current_bitmap);
211 }
212
213 if let Some(ref b) = candidate_bitmap {
216 if b.is_empty() {
217 let in_transaction = crate::transaction::ACTIVE_TRANSACTION_ID
218 .try_with(|id| *id)
219 .ok()
220 .and_then(|id| self.db.transaction_manager.active_transactions.get(&id))
221 .is_some();
222 if !in_transaction {
223 return Ok(vec![]);
224 }
225 }
226 }
227 }
228 }
229
230 let mut docs = if let Some(bitmap) = candidate_bitmap {
231 let id_only = self.fields.as_ref().map(|f| f.len() == 1 && f[0] == "id").unwrap_or(false);
233
234 let tx_id = crate::transaction::ACTIVE_TRANSACTION_ID
236 .try_with(|id| *id)
237 .ok();
238
239 let tx_buffer = tx_id.and_then(|id| self.db.transaction_manager.active_transactions.get(&id));
240
241 let mut final_docs = Vec::with_capacity(bitmap.len() as usize);
243 for internal_id in bitmap {
244 if let Some(external_id) = self.db.get_external_id(internal_id) {
245 if let Some(ref buffer) = tx_buffer {
247 let key = format!("{}:{}", self.collection, external_id);
248 if buffer.deletes.contains_key(&key) {
249 continue;
250 }
251 }
252
253 if id_only && self.filters.is_empty() {
254 final_docs.push(Document { _sid: external_id, data: HashMap::new() });
256 continue;
257 }
258
259 if let Ok(Some(doc)) = self.db.get_document(&self.collection, &external_id) {
260 if self.filters.iter().all(|f| f.matches(&doc)) {
262 final_docs.push(doc);
263 }
264 }
265 }
266 }
267
268 if let Some(buffer) = tx_buffer {
270 let prefix = format!("{}:", self.collection);
271 for item in buffer.writes.iter() {
272 let key: &String = item.key();
273 if let Some(external_id) = key.strip_prefix(&prefix) {
274
275 if final_docs.iter().any(|d| d._sid == external_id) {
277 continue;
278 }
279
280 let data: &Vec<u8> = item.value();
281 if let Ok(doc) = self.db.deserialize_internal::<Document>(data) {
282 if self.filters.iter().all(|f| f.matches(&doc)) {
283 final_docs.push(doc);
284 }
285 }
286 }
287 }
288 }
289
290 final_docs
291 } else {
292 let scan_limit = if self.order_by.is_none() {
297 self.limit.map(|l| l + self.offset.unwrap_or(0))
298 } else {
299 None
300 };
301
302 let db_filters = self.filters.clone();
303 self.db.scan_and_filter(&self.collection, move |doc| {
304 db_filters.iter().all(|f| f.matches(doc))
305 }, scan_limit)?
306 };
307
308 if let Some((field, ascending)) = self.order_by {
310 docs.sort_by(|a, b| {
311 let v1 = a.data.get(&field);
312 let v2 = b.data.get(&field);
313 let ord = compare_values(v1, v2);
314 if ascending { ord } else { ord.reverse() }
315 });
316 }
317
318 let mut start = self.offset.unwrap_or(0);
320 if start > docs.len() { start = docs.len(); }
321 let mut end = docs.len();
322 if let Some(max) = self.limit {
323 if start + max < end { end = start + max; }
324 }
325
326 let mut result = docs[start..end].to_vec();
327
328 if let Ok(computed) = self.db.computed.read() {
330 for doc in &mut result {
331 let _ = computed.apply(&self.collection, doc);
332 }
333 }
334
335 if let Some(ref fields) = self.fields {
337 let field_set: std::collections::HashSet<&str> =
338 fields.iter().map(|s| s.as_str()).collect();
339 for doc in &mut result {
340 doc.data.retain(|k, _| field_set.contains(k.as_str()));
341 }
342 }
343
344 Ok(result)
345 }
346
347 pub async fn count(self) -> Result<usize> {
348 let results = self.collect().await?;
349 Ok(results.len())
350 }
351
352 pub async fn delete(self) -> Result<usize> {
353 let db = self.db;
354 let collection = self.collection.clone();
355 let docs = self.collect().await?;
356 let count = docs.len();
357 for doc in docs {
358 let _ = db.aql_delete_document(&collection, &doc._sid).await;
359 }
360 Ok(count)
361 }
362
363 pub async fn watch(self) -> Result<crate::reactive::QueryWatcher> {
364 let collection = self.collection.clone();
365 let filters = self.filters.clone();
366 let db_clone = self.db.clone();
367 let debounce_duration = self.debounce_duration;
368
369 let initial_results = self.collect().await?;
370 let listener = db_clone.pubsub.listen(&collection);
371 let state = Arc::new(crate::reactive::ReactiveQueryState::new(filters));
372
373 Ok(crate::reactive::QueryWatcher::new(
374 Arc::new(db_clone),
375 collection,
376 listener,
377 state,
378 initial_results,
379 debounce_duration,
380 ))
381 }
382}
383
384pub struct SearchBuilder<'a> {
386 db: &'a Aurora,
387 collection: String,
388 query: String,
389 limit: Option<usize>,
390 fuzzy: bool,
391 distance: u8,
392 search_fields: Option<Vec<String>>,
393}
394
395fn fuzzy_score(doc: &Document, query_tokens: &[&str], max_dist: usize, fields: Option<&[String]>) -> f32 {
398 let mut score = 0.0f32;
399 for (field, value) in &doc.data {
400 if let Some(allowed) = fields {
401 if !allowed.contains(field) { continue; }
402 }
403 if let crate::types::Value::String(text) = value {
404 let doc_tokens: Vec<String> = text.split_whitespace().map(|t| t.to_lowercase()).collect();
405 for q in query_tokens {
406 for d in &doc_tokens {
407 let dist = crate::search::levenshtein_distance(q, d);
408 if dist <= max_dist {
409 score += 1.0 / (1.0 + dist as f32 * 0.3);
411 }
412 }
413 }
414 }
415 }
416 score
417}
418
419impl<'a> SearchBuilder<'a> {
420 pub fn new(db: &'a Aurora, collection: &str) -> Self {
421 Self {
422 db,
423 collection: collection.to_string(),
424 query: String::new(),
425 limit: None,
426 fuzzy: false,
427 distance: 0,
428 search_fields: None,
429 }
430 }
431
432 pub fn query(mut self, query: &str) -> Self {
433 self.query = query.to_string();
434 self
435 }
436
437 pub fn limit(mut self, limit: usize) -> Self {
438 self.limit = Some(limit);
439 self
440 }
441
442 pub fn fuzzy(mut self, distance: u8) -> Self {
443 self.fuzzy = true;
444 self.distance = distance;
445 self
446 }
447
448 pub fn fields(mut self, fields: Vec<String>) -> Self {
450 self.search_fields = Some(fields);
451 self
452 }
453
454 pub async fn collect_with_fields(self, fields: Option<&[String]>) -> Result<Vec<Document>> {
456 let builder = if let Some(f) = fields {
457 Self { search_fields: Some(f.to_vec()), ..self }
458 } else {
459 self
460 };
461 builder.collect().await
462 }
463
464 pub async fn collect(self) -> Result<Vec<Document>> {
465 let query = self.query.to_lowercase();
466 let mut results = Vec::new();
467
468 if let Some(index) = self.db.primary_indices.get(&self.collection) {
469 if self.fuzzy && !query.is_empty() {
470 let query_tokens: Vec<&str> = query.split_whitespace().collect();
472 let max_dist = self.distance as usize;
473 let fields = self.search_fields.as_deref();
474 let mut scored: Vec<(f32, Document)> = Vec::new();
475
476 for entry in index.iter() {
477 if let Some(data) = self.db.get(entry.key())? {
478 if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
479 let score = fuzzy_score(&doc, &query_tokens, max_dist, fields);
480 if score > 0.0 {
481 scored.push((score, doc));
482 }
483 }
484 }
485 }
486
487 scored.sort_by(|(a, _), (b, _)| b.partial_cmp(a).unwrap_or(std::cmp::Ordering::Equal));
488 for (_, doc) in scored {
489 results.push(doc);
490 if let Some(l) = self.limit {
491 if results.len() >= l { break; }
492 }
493 }
494 } else {
495 for entry in index.iter() {
497 if let Some(data) = self.db.get(entry.key())? {
498 if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
499 let matches = if query.is_empty() {
500 true
501 } else {
502 let fields_to_check = self.search_fields.as_deref();
503 doc.data.iter().any(|(k, v)| {
504 if let Some(ref allowed) = fields_to_check {
505 if !allowed.contains(k) { return false; }
506 }
507 if let crate::types::Value::String(s) = v {
508 s.to_lowercase().contains(&query)
509 } else {
510 false
511 }
512 })
513 };
514 if matches {
515 results.push(doc);
516 if let Some(l) = self.limit {
517 if results.len() >= l { break; }
518 }
519 }
520 }
521 }
522 }
523 }
524 }
525
526 Ok(results)
527 }
528}
529
530
531fn compare_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
532 match (a, b) {
533 (None, None) => std::cmp::Ordering::Equal,
534 (None, Some(_)) => std::cmp::Ordering::Less,
535 (Some(_), None) => std::cmp::Ordering::Greater,
536 (Some(v1), Some(v2)) => v1.partial_cmp(v2).unwrap_or(std::cmp::Ordering::Equal),
537 }
538}
539
/// A predicate over a document, evaluated by `Filter::matches`.
///
/// Field names may be dotted paths (e.g. `"address.city"`) into nested objects.
/// `"_sid"` refers to the document id.
///
/// NOTE: this type derives serde traits. Reordering variants may change the
/// encoding for formats that serialize enum variants by index, so append new
/// variants at the end.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Filter {
    /// Field is present and equal to the value.
    Eq(String, Value),
    /// Field is absent, or present and not equal to the value.
    Ne(String, Value),
    /// Field is present and greater than the value (per `Value`'s `PartialOrd`).
    Gt(String, Value),
    /// Field is present and greater than or equal to the value.
    Gte(String, Value),
    /// Field is present and less than the value.
    Lt(String, Value),
    /// Field is present and less than or equal to the value.
    Lte(String, Value),
    /// Field is present and equal to one of the listed values.
    In(String, Vec<Value>),
    /// Field is a string containing the given substring (case-sensitive).
    Contains(String, String),
    /// Field is a string starting with the given prefix (case-sensitive).
    StartsWith(String, String),
    /// Field is absent or holds `Value::Null`.
    IsNull(String),
    /// Field is present and not `Value::Null`.
    IsNotNull(String),
    /// Negation of the inner filter.
    Not(Box<Filter>),
    /// Every inner filter matches (an empty list matches everything).
    And(Vec<Filter>),
    /// At least one inner filter matches (an empty list matches nothing).
    Or(Vec<Filter>),
}
558
559fn get_nested<'a>(doc: &'a Document, field: &str) -> Option<&'a Value> {
562 let mut parts = field.splitn(2, '.');
563 let first = parts.next()?;
564 let rest = parts.next();
565 let val = doc.data.get(first)?;
566 match rest {
567 None => Some(val),
568 Some(remaining) => get_nested_value(val, remaining),
569 }
570}
571
572fn get_field_owned(doc: &Document, field: &str) -> Option<Value> {
575 if field == "_sid" {
576 Some(Value::String(doc._sid.clone()))
577 } else {
578 get_nested(doc, field).cloned()
579 }
580}
581
582fn get_nested_value<'a>(val: &'a Value, path: &str) -> Option<&'a Value> {
583 let mut parts = path.splitn(2, '.');
584 let first = parts.next()?;
585 let rest = parts.next();
586 if let Value::Object(map) = val {
587 let child = map.get(first)?;
588 match rest {
589 None => Some(child),
590 Some(remaining) => get_nested_value(child, remaining),
591 }
592 } else {
593 None
594 }
595}
596
597impl std::ops::Not for Filter {
598 type Output = Self;
599 fn not(self) -> Self::Output {
600 Filter::Not(Box::new(self))
601 }
602}
603
604impl Filter {
605 pub fn matches(&self, doc: &Document) -> bool {
606 match self {
607 Filter::Eq(f, v) => get_field_owned(doc, f).as_ref() == Some(v),
608 Filter::Ne(f, v) => get_field_owned(doc, f).as_ref() != Some(v),
609 Filter::Gt(f, v) => get_field_owned(doc, f).map_or(false, |dv| dv > *v),
610 Filter::Gte(f, v) => get_field_owned(doc, f).map_or(false, |dv| dv >= *v),
611 Filter::Lt(f, v) => get_field_owned(doc, f).map_or(false, |dv| dv < *v),
612 Filter::Lte(f, v) => get_field_owned(doc, f).map_or(false, |dv| dv <= *v),
613 Filter::In(f, v) => get_field_owned(doc, f).map_or(false, |dv| v.contains(&dv)),
614 Filter::Contains(f, v) => get_field_owned(doc, f).map_or(false, |dv| {
615 if let Value::String(s) = dv { s.contains(v.as_str()) } else { false }
616 }),
617 Filter::StartsWith(f, v) => get_field_owned(doc, f).map_or(false, |dv| {
618 if let Value::String(s) = dv { s.starts_with(v.as_str()) } else { false }
619 }),
620 Filter::IsNull(f) => get_field_owned(doc, f).map_or(true, |v| matches!(v, Value::Null)),
621 Filter::IsNotNull(f) => get_field_owned(doc, f).map_or(false, |v| !matches!(v, Value::Null)),
622 Filter::Not(f) => !f.matches(doc),
623 Filter::And(fs) => fs.iter().all(|f| f.matches(doc)),
624 Filter::Or(fs) => fs.iter().any(|f| f.matches(doc)),
625 }
626 }
627}
628
629impl std::ops::BitAnd for Filter {
630 type Output = Filter;
631 fn bitand(self, rhs: Self) -> Self::Output {
632 match (self, rhs) {
633 (Filter::And(mut a), Filter::And(mut b)) => { a.append(&mut b); Filter::And(a) }
634 (Filter::And(mut a), b) => { a.push(b); Filter::And(a) }
635 (a, Filter::And(mut b)) => { b.insert(0, a); Filter::And(b) }
636 (a, b) => Filter::And(vec![a, b]),
637 }
638 }
639}
640
641impl std::ops::BitOr for Filter {
642 type Output = Filter;
643 fn bitor(self, rhs: Self) -> Self::Output {
644 match (self, rhs) {
645 (Filter::Or(mut a), Filter::Or(mut b)) => { a.append(&mut b); Filter::Or(a) }
646 (Filter::Or(mut a), b) => { a.push(b); Filter::Or(a) }
647 (a, Filter::Or(mut b)) => { b.insert(0, a); Filter::Or(b) }
648 (a, b) => Filter::Or(vec![a, b]),
649 }
650 }
651}