1use crate::Aurora;
7use crate::error::Result;
8use crate::types::{Document, Value};
9use std::collections::HashMap;
10use serde::{Deserialize, Serialize};
11use std::sync::Arc;
12use roaring::RoaringBitmap;
13
14#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
15pub struct SimpleQueryBuilder {
16 pub collection: String,
17 pub filters: Vec<Filter>,
18 pub order_by: Option<(String, bool)>,
19 pub limit: Option<usize>,
20 pub offset: Option<usize>,
21}
22
23pub struct QueryBuilder<'a> {
25 db: &'a Aurora,
26 collection: String,
27 filters: Vec<Filter>,
28 order_by: Option<(String, bool)>,
29 limit: Option<usize>,
30 offset: Option<usize>,
31 fields: Option<Vec<String>>,
32 debounce_duration: Option<std::time::Duration>,
33}
34
35pub struct FilterBuilder;
37
38impl FilterBuilder {
39 pub fn new() -> Self {
40 Self
41 }
42
43 pub fn eq<T: Into<Value>>(&self, field: &str, value: T) -> Filter {
44 Filter::Eq(field.to_string(), value.into())
45 }
46
47 pub fn ne<T: Into<Value>>(&self, field: &str, value: T) -> Filter {
48 Filter::Ne(field.to_string(), value.into())
49 }
50
51 pub fn in_values<T: Into<Value> + Clone>(&self, field: &str, values: &[T]) -> Filter {
52 Filter::In(field.to_string(), values.iter().cloned().map(|v| v.into()).collect())
53 }
54
55 pub fn starts_with(&self, field: &str, value: &str) -> Filter {
56 Filter::StartsWith(field.to_string(), value.to_string())
57 }
58
59 pub fn contains(&self, field: &str, value: &str) -> Filter {
60 Filter::Contains(field.to_string(), value.to_string())
61 }
62
63 pub fn gt<T: Into<Value>>(&self, field: &str, value: T) -> Filter {
64 Filter::Gt(field.to_string(), value.into())
65 }
66
67 pub fn gte<T: Into<Value>>(&self, field: &str, value: T) -> Filter {
68 Filter::Gte(field.to_string(), value.into())
69 }
70
71 pub fn lt<T: Into<Value>>(&self, field: &str, value: T) -> Filter {
72 Filter::Lt(field.to_string(), value.into())
73 }
74
75 pub fn lte<T: Into<Value>>(&self, field: &str, value: T) -> Filter {
76 Filter::Lte(field.to_string(), value.into())
77 }
78
79 pub fn in_vec<T: Into<Value>>(&self, field: &str, values: Vec<T>) -> Filter {
80 Filter::In(field.to_string(), values.into_iter().map(|v| v.into()).collect())
81 }
82
83 pub fn between<T: Into<Value> + Clone>(&self, field: &str, min: T, max: T) -> Filter {
84 Filter::And(vec![
85 Filter::Gte(field.to_string(), min.into()),
86 Filter::Lte(field.to_string(), max.into()),
87 ])
88 }
89}
90
91impl<'a> QueryBuilder<'a> {
92 pub fn new(db: &'a Aurora, collection: &str) -> Self {
93 Self {
94 db,
95 collection: collection.to_string(),
96 filters: Vec::new(),
97 order_by: None,
98 limit: None,
99 offset: None,
100 fields: None,
101 debounce_duration: None,
102 }
103 }
104
105 pub fn filter<F>(mut self, f: F) -> Self
106 where
107 F: FnOnce(&FilterBuilder) -> Filter,
108 {
109 let builder = FilterBuilder::new();
110 self.filters.push(f(&builder));
111 self
112 }
113
114 pub fn order_by(mut self, field: &str, ascending: bool) -> Self {
115 self.order_by = Some((field.to_string(), ascending));
116 self
117 }
118
119 pub fn limit(mut self, limit: usize) -> Self {
120 self.limit = Some(limit);
121 self
122 }
123
124 pub fn offset(mut self, offset: usize) -> Self {
125 self.offset = Some(offset);
126 self
127 }
128
129 pub fn select(mut self, fields: Vec<&str>) -> Self {
130 self.fields = Some(fields.into_iter().map(|s| s.to_string()).collect());
131 self
132 }
133
134 pub fn debounce(mut self, duration: std::time::Duration) -> Self {
135 self.debounce_duration = Some(duration);
136 self
137 }
138
139 pub async fn first_one(self) -> Result<Option<Document>> {
140 let docs = self.limit(1).collect().await?;
141 Ok(docs.into_iter().next())
142 }
143
144 pub async fn collect(self) -> Result<Vec<Document>> {
147 self.db.ensure_indices_initialized().await?;
148
149 let mut candidate_bitmap: Option<RoaringBitmap> = None;
151
152 for filter in &self.filters {
153 if let Filter::Eq(field, value) = filter {
154 let index_key = format!("{}:{}", self.collection, field);
156 let val_str = match value {
157 Value::String(s) => s.clone(),
158 _ => value.to_string(),
159 };
160 let full_key = format!("{}:{}:{}", self.collection, field, val_str);
161
162 let mut current_bitmap = RoaringBitmap::new();
163 let mut found = false;
164
165 if let Some(loc) = self.db.index_manifest.get(&full_key) {
167 let (offset, len) = *loc.value();
168 if let Ok(guard) = self.db.mmap_index.read() {
169 if let Some(mmap) = guard.as_ref() {
170 if offset + len <= mmap.len() {
171 let bytes = &mmap[offset..(offset + len)];
172 if let Ok(cold_bitmap) = RoaringBitmap::deserialize_from(bytes) {
173 current_bitmap |= cold_bitmap;
174 found = true;
175 }
176 }
177 }
178 }
179 }
180
181 if let Some(storage_arc) = self.db.get_indexed_storage(&index_key, &val_str) {
183 if let Ok(storage) = storage_arc.read() {
184 current_bitmap |= storage.to_bitmap();
185 found = true;
186 }
187 }
188
189 if !found {
190 let in_transaction = crate::transaction::ACTIVE_TRANSACTION_ID
194 .try_with(|id| *id)
195 .ok()
196 .and_then(|id| self.db.transaction_manager.active_transactions.get(&id))
197 .is_some();
198
199 if !in_transaction && self.db.has_index_key(&index_key) {
200 return Ok(vec![]);
201 }
202 candidate_bitmap = None;
204 break;
205 }
206
207 if let Some(ref mut existing) = candidate_bitmap {
208 *existing &= current_bitmap; } else {
210 candidate_bitmap = Some(current_bitmap);
211 }
212
213 if let Some(ref b) = candidate_bitmap {
216 if b.is_empty() {
217 let in_transaction = crate::transaction::ACTIVE_TRANSACTION_ID
218 .try_with(|id| *id)
219 .ok()
220 .and_then(|id| self.db.transaction_manager.active_transactions.get(&id))
221 .is_some();
222 if !in_transaction {
223 return Ok(vec![]);
224 }
225 }
226 }
227 }
228 }
229
230 let mut docs = if let Some(bitmap) = candidate_bitmap {
231 let id_only = self.fields.as_ref().map(|f| f.len() == 1 && f[0] == "id").unwrap_or(false);
233
234 let tx_id = crate::transaction::ACTIVE_TRANSACTION_ID
236 .try_with(|id| *id)
237 .ok();
238
239 let tx_buffer = tx_id.and_then(|id| self.db.transaction_manager.active_transactions.get(&id));
240
241 let mut final_docs = Vec::with_capacity(bitmap.len() as usize);
243 for internal_id in bitmap {
244 if let Some(external_id) = self.db.get_external_id(internal_id) {
245 if let Some(ref buffer) = tx_buffer {
247 let key = format!("{}:{}", self.collection, external_id);
248 if buffer.deletes.contains_key(&key) {
249 continue;
250 }
251 }
252
253 if id_only && self.filters.is_empty() {
254 final_docs.push(Document { id: external_id, data: HashMap::new() });
256 continue;
257 }
258
259 if let Ok(Some(doc)) = self.db.get_document(&self.collection, &external_id) {
260 if self.filters.iter().all(|f| f.matches(&doc)) {
262 final_docs.push(doc);
263 }
264 }
265 }
266 }
267
268 if let Some(buffer) = tx_buffer {
270 let prefix = format!("{}:", self.collection);
271 for item in buffer.writes.iter() {
272 let key: &String = item.key();
273 if let Some(external_id) = key.strip_prefix(&prefix) {
274
275 if final_docs.iter().any(|d| d.id == external_id) {
277 continue;
278 }
279
280 let data: &Vec<u8> = item.value();
281 if let Ok(doc) = self.db.deserialize_internal::<Document>(data) {
282 if self.filters.iter().all(|f| f.matches(&doc)) {
283 final_docs.push(doc);
284 }
285 }
286 }
287 }
288 }
289
290 final_docs
291 } else {
292 let scan_limit = if self.order_by.is_none() {
297 self.limit.map(|l| l + self.offset.unwrap_or(0))
298 } else {
299 None
300 };
301
302 let db_filters = self.filters.clone();
303 self.db.scan_and_filter(&self.collection, move |doc| {
304 db_filters.iter().all(|f| f.matches(doc))
305 }, scan_limit)?
306 };
307
308 if let Some((field, ascending)) = self.order_by {
310 docs.sort_by(|a, b| {
311 let v1 = a.data.get(&field);
312 let v2 = b.data.get(&field);
313 let ord = compare_values(v1, v2);
314 if ascending { ord } else { ord.reverse() }
315 });
316 }
317
318 let mut start = self.offset.unwrap_or(0);
320 if start > docs.len() { start = docs.len(); }
321 let mut end = docs.len();
322 if let Some(max) = self.limit {
323 if start + max < end { end = start + max; }
324 }
325
326 let mut result = docs[start..end].to_vec();
327
328 if let Ok(computed) = self.db.computed.read() {
330 for doc in &mut result {
331 let _ = computed.apply(&self.collection, doc);
332 }
333 }
334
335 if let Some(ref fields) = self.fields {
337 let field_set: std::collections::HashSet<&str> =
338 fields.iter().map(|s| s.as_str()).collect();
339 for doc in &mut result {
340 doc.data.retain(|k, _| field_set.contains(k.as_str()));
341 }
342 }
343
344 Ok(result)
345 }
346
347 pub async fn count(self) -> Result<usize> {
348 let results = self.collect().await?;
349 Ok(results.len())
350 }
351
352 pub async fn delete(self) -> Result<usize> {
353 let db = self.db;
354 let collection = self.collection.clone();
355 let docs = self.collect().await?;
356 let count = docs.len();
357 for doc in docs {
358 let _ = db.aql_delete_document(&collection, &doc.id).await;
359 }
360 Ok(count)
361 }
362
363 pub async fn watch(self) -> Result<crate::reactive::QueryWatcher> {
364 let collection = self.collection.clone();
365 let filters = self.filters.clone();
366 let db_clone = self.db.clone();
367 let debounce_duration = self.debounce_duration;
368
369 let initial_results = self.collect().await?;
370 let listener = db_clone.pubsub.listen(&collection);
371 let state = Arc::new(crate::reactive::ReactiveQueryState::new(filters));
372
373 Ok(crate::reactive::QueryWatcher::new(
374 Arc::new(db_clone),
375 collection,
376 listener,
377 state,
378 initial_results,
379 debounce_duration,
380 ))
381 }
382}
383
384pub struct SearchBuilder<'a> {
386 db: &'a Aurora,
387 collection: String,
388 query: String,
389 limit: Option<usize>,
390 fuzzy: bool,
391 distance: u8,
392 search_fields: Option<Vec<String>>,
393}
394
395impl<'a> SearchBuilder<'a> {
396 pub fn new(db: &'a Aurora, collection: &str) -> Self {
397 Self {
398 db,
399 collection: collection.to_string(),
400 query: String::new(),
401 limit: None,
402 fuzzy: false,
403 distance: 0,
404 search_fields: None,
405 }
406 }
407
408 pub fn query(mut self, query: &str) -> Self {
409 self.query = query.to_string();
410 self
411 }
412
413 pub fn limit(mut self, limit: usize) -> Self {
414 self.limit = Some(limit);
415 self
416 }
417
418 pub fn fuzzy(mut self, distance: u8) -> Self {
419 self.fuzzy = true;
420 self.distance = distance;
421 self
422 }
423
424 pub fn fields(mut self, fields: Vec<String>) -> Self {
426 self.search_fields = Some(fields);
427 self
428 }
429
430 pub async fn collect_with_fields(self, fields: Option<&[String]>) -> Result<Vec<Document>> {
432 let builder = if let Some(f) = fields {
433 Self { search_fields: Some(f.to_vec()), ..self }
434 } else {
435 self
436 };
437 builder.collect().await
438 }
439
440 pub async fn collect(self) -> Result<Vec<Document>> {
441 let query = self.query.to_lowercase();
442 let mut results = Vec::new();
443
444 if let Some(index) = self.db.primary_indices.get(&self.collection) {
445 for entry in index.iter() {
446 if let Some(data) = self.db.get(entry.key())? {
447 if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
448 let matches = if query.is_empty() {
449 true
450 } else {
451 let fields_to_check = self.search_fields.as_deref();
452 doc.data.iter().any(|(k, v)| {
453 if let Some(ref allowed) = fields_to_check {
454 if !allowed.contains(k) {
455 return false;
456 }
457 }
458 if let crate::types::Value::String(s) = v {
459 s.to_lowercase().contains(&query)
460 } else {
461 false
462 }
463 })
464 };
465 if matches {
466 results.push(doc);
467 if let Some(l) = self.limit {
468 if results.len() >= l {
469 break;
470 }
471 }
472 }
473 }
474 }
475 }
476 }
477
478 Ok(results)
479 }
480}
481
482
483fn compare_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
484 match (a, b) {
485 (None, None) => std::cmp::Ordering::Equal,
486 (None, Some(_)) => std::cmp::Ordering::Less,
487 (Some(_), None) => std::cmp::Ordering::Greater,
488 (Some(v1), Some(v2)) => v1.partial_cmp(v2).unwrap_or(std::cmp::Ordering::Equal),
489 }
490}
491
492#[derive(Debug, Clone, Serialize, Deserialize)]
494pub enum Filter {
495 Eq(String, Value),
496 Ne(String, Value),
497 Gt(String, Value),
498 Gte(String, Value),
499 Lt(String, Value),
500 Lte(String, Value),
501 In(String, Vec<Value>),
502 Contains(String, String),
503 StartsWith(String, String),
504 IsNull(String),
505 IsNotNull(String),
506 Not(Box<Filter>),
507 And(Vec<Filter>),
508 Or(Vec<Filter>),
509}
510
511fn get_nested<'a>(doc: &'a Document, field: &str) -> Option<&'a Value> {
514 let mut parts = field.splitn(2, '.');
515 let first = parts.next()?;
516 let rest = parts.next();
517 let val = doc.data.get(first)?;
518 match rest {
519 None => Some(val),
520 Some(remaining) => get_nested_value(val, remaining),
521 }
522}
523
524fn get_field_owned(doc: &Document, field: &str) -> Option<Value> {
527 if field == "id" && !doc.data.contains_key("id") {
528 Some(Value::String(doc.id.clone()))
529 } else {
530 get_nested(doc, field).cloned()
531 }
532}
533
534fn get_nested_value<'a>(val: &'a Value, path: &str) -> Option<&'a Value> {
535 let mut parts = path.splitn(2, '.');
536 let first = parts.next()?;
537 let rest = parts.next();
538 if let Value::Object(map) = val {
539 let child = map.get(first)?;
540 match rest {
541 None => Some(child),
542 Some(remaining) => get_nested_value(child, remaining),
543 }
544 } else {
545 None
546 }
547}
548
549impl std::ops::Not for Filter {
550 type Output = Self;
551 fn not(self) -> Self::Output {
552 Filter::Not(Box::new(self))
553 }
554}
555
556impl Filter {
557 pub fn matches(&self, doc: &Document) -> bool {
558 match self {
559 Filter::Eq(f, v) => get_field_owned(doc, f).as_ref() == Some(v),
560 Filter::Ne(f, v) => get_field_owned(doc, f).as_ref() != Some(v),
561 Filter::Gt(f, v) => get_field_owned(doc, f).map_or(false, |dv| dv > *v),
562 Filter::Gte(f, v) => get_field_owned(doc, f).map_or(false, |dv| dv >= *v),
563 Filter::Lt(f, v) => get_field_owned(doc, f).map_or(false, |dv| dv < *v),
564 Filter::Lte(f, v) => get_field_owned(doc, f).map_or(false, |dv| dv <= *v),
565 Filter::In(f, v) => get_field_owned(doc, f).map_or(false, |dv| v.contains(&dv)),
566 Filter::Contains(f, v) => get_field_owned(doc, f).map_or(false, |dv| {
567 if let Value::String(s) = dv { s.contains(v.as_str()) } else { false }
568 }),
569 Filter::StartsWith(f, v) => get_field_owned(doc, f).map_or(false, |dv| {
570 if let Value::String(s) = dv { s.starts_with(v.as_str()) } else { false }
571 }),
572 Filter::IsNull(f) => get_field_owned(doc, f).map_or(true, |v| matches!(v, Value::Null)),
573 Filter::IsNotNull(f) => get_field_owned(doc, f).map_or(false, |v| !matches!(v, Value::Null)),
574 Filter::Not(f) => !f.matches(doc),
575 Filter::And(fs) => fs.iter().all(|f| f.matches(doc)),
576 Filter::Or(fs) => fs.iter().any(|f| f.matches(doc)),
577 }
578 }
579}
580
581impl std::ops::BitAnd for Filter {
582 type Output = Filter;
583 fn bitand(self, rhs: Self) -> Self::Output {
584 match (self, rhs) {
585 (Filter::And(mut a), Filter::And(mut b)) => { a.append(&mut b); Filter::And(a) }
586 (Filter::And(mut a), b) => { a.push(b); Filter::And(a) }
587 (a, Filter::And(mut b)) => { b.insert(0, a); Filter::And(b) }
588 (a, b) => Filter::And(vec![a, b]),
589 }
590 }
591}
592
593impl std::ops::BitOr for Filter {
594 type Output = Filter;
595 fn bitor(self, rhs: Self) -> Self::Output {
596 match (self, rhs) {
597 (Filter::Or(mut a), Filter::Or(mut b)) => { a.append(&mut b); Filter::Or(a) }
598 (Filter::Or(mut a), b) => { a.push(b); Filter::Or(a) }
599 (a, Filter::Or(mut b)) => { b.insert(0, a); Filter::Or(b) }
600 (a, b) => Filter::Or(vec![a, b]),
601 }
602 }
603}