1use super::automerge_store::AutomergeStore;
20use anyhow::Result;
21use automerge::{Automerge, ReadDoc};
22use std::collections::HashSet;
23use std::sync::Arc;
24
25#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
27pub enum SortOrder {
28 #[default]
30 Asc,
31 Desc,
33}
34
35#[derive(Debug, Clone, PartialEq)]
39pub enum Value {
40 Null,
42 Bool(bool),
44 Int(i64),
46 Uint(u64),
48 Float(f64),
50 String(String),
52 Timestamp(i64),
54}
55
56impl PartialOrd for Value {
57 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
58 use std::cmp::Ordering;
59 match (self, other) {
60 (Value::Null, Value::Null) => Some(Ordering::Equal),
61 (Value::Bool(a), Value::Bool(b)) => a.partial_cmp(b),
62 (Value::Int(a), Value::Int(b)) => a.partial_cmp(b),
63 (Value::Uint(a), Value::Uint(b)) => a.partial_cmp(b),
64 (Value::Float(a), Value::Float(b)) => a.partial_cmp(b),
65 (Value::String(a), Value::String(b)) => a.partial_cmp(b),
66 (Value::Timestamp(a), Value::Timestamp(b)) => a.partial_cmp(b),
67 (Value::Int(a), Value::Uint(b)) => (*a as f64).partial_cmp(&(*b as f64)),
69 (Value::Uint(a), Value::Int(b)) => (*a as f64).partial_cmp(&(*b as f64)),
70 (Value::Int(a), Value::Float(b)) => (*a as f64).partial_cmp(b),
71 (Value::Float(a), Value::Int(b)) => a.partial_cmp(&(*b as f64)),
72 (Value::Uint(a), Value::Float(b)) => (*a as f64).partial_cmp(b),
73 (Value::Float(a), Value::Uint(b)) => a.partial_cmp(&(*b as f64)),
74 _ => None,
76 }
77 }
78}
79
80type Predicate = Box<dyn Fn(&Automerge) -> bool + Send + Sync>;
82
83pub struct Query {
87 store: Arc<AutomergeStore>,
88 collection_name: String,
89 predicates: Vec<Predicate>,
90 sort_field: Option<(String, SortOrder)>,
91 limit: Option<usize>,
92 offset: usize,
93 doc_id_filter: Option<HashSet<String>>,
94}
95
96impl Query {
97 pub fn new(store: Arc<AutomergeStore>, collection_name: &str) -> Self {
104 Self {
105 store,
106 collection_name: collection_name.to_string(),
107 predicates: Vec::new(),
108 sort_field: None,
109 limit: None,
110 offset: 0,
111 doc_id_filter: None,
112 }
113 }
114
115 pub fn where_eq(mut self, field: &str, value: Value) -> Self {
123 let field = field.to_string();
124 self.predicates.push(Box::new(move |doc| {
125 extract_field(doc, &field)
126 .map(|v| v == value)
127 .unwrap_or(false)
128 }));
129 self
130 }
131
132 pub fn where_gt(mut self, field: &str, value: Value) -> Self {
134 let field = field.to_string();
135 self.predicates.push(Box::new(move |doc| {
136 extract_field(doc, &field)
137 .and_then(|v| v.partial_cmp(&value))
138 .map(|ord| ord == std::cmp::Ordering::Greater)
139 .unwrap_or(false)
140 }));
141 self
142 }
143
144 pub fn where_lt(mut self, field: &str, value: Value) -> Self {
146 let field = field.to_string();
147 self.predicates.push(Box::new(move |doc| {
148 extract_field(doc, &field)
149 .and_then(|v| v.partial_cmp(&value))
150 .map(|ord| ord == std::cmp::Ordering::Less)
151 .unwrap_or(false)
152 }));
153 self
154 }
155
156 pub fn where_gte(mut self, field: &str, value: Value) -> Self {
158 let field = field.to_string();
159 self.predicates.push(Box::new(move |doc| {
160 extract_field(doc, &field)
161 .and_then(|v| v.partial_cmp(&value))
162 .map(|ord| ord != std::cmp::Ordering::Less)
163 .unwrap_or(false)
164 }));
165 self
166 }
167
168 pub fn where_lte(mut self, field: &str, value: Value) -> Self {
170 let field = field.to_string();
171 self.predicates.push(Box::new(move |doc| {
172 extract_field(doc, &field)
173 .and_then(|v| v.partial_cmp(&value))
174 .map(|ord| ord != std::cmp::Ordering::Greater)
175 .unwrap_or(false)
176 }));
177 self
178 }
179
180 pub fn where_contains(mut self, field: &str, value: Value) -> Self {
188 let field = field.to_string();
189 self.predicates.push(Box::new(move |doc| {
190 extract_array_contains(doc, &field, &value)
191 }));
192 self
193 }
194
195 pub fn filter_by_ids(mut self, ids: &[String]) -> Self {
206 self.doc_id_filter = Some(ids.iter().cloned().collect());
207 self
208 }
209
210 pub fn order_by(mut self, field: &str, order: SortOrder) -> Self {
218 self.sort_field = Some((field.to_string(), order));
219 self
220 }
221
222 pub fn limit(mut self, n: usize) -> Self {
224 self.limit = Some(n);
225 self
226 }
227
228 pub fn offset(mut self, n: usize) -> Self {
230 self.offset = n;
231 self
232 }
233
234 pub fn execute(self) -> Result<Vec<(String, Automerge)>> {
238 let prefix = format!("{}:", self.collection_name);
239 let all_docs = self.store.scan_prefix(&prefix)?;
240
241 let filtered_by_id: Vec<(String, Automerge)> =
243 if let Some(ref id_filter) = self.doc_id_filter {
244 all_docs
245 .into_iter()
246 .filter(|(key, _)| {
247 key.strip_prefix(&prefix)
248 .map(|doc_id| id_filter.contains(doc_id))
249 .unwrap_or(false)
250 })
251 .collect()
252 } else {
253 all_docs
254 };
255
256 let mut results: Vec<(String, Automerge)> = filtered_by_id
258 .into_iter()
259 .filter(|(_, doc)| self.predicates.iter().all(|pred| pred(doc)))
260 .map(|(key, doc)| {
261 let doc_id = key.strip_prefix(&prefix).unwrap_or(&key).to_string();
262 (doc_id, doc)
263 })
264 .collect();
265
266 if let Some((field, order)) = &self.sort_field {
268 results.sort_by(|(_, a), (_, b)| {
269 let val_a = extract_field(a, field);
270 let val_b = extract_field(b, field);
271 let cmp = match (val_a, val_b) {
272 (Some(a), Some(b)) => a.partial_cmp(&b).unwrap_or(std::cmp::Ordering::Equal),
273 (Some(_), None) => std::cmp::Ordering::Less,
274 (None, Some(_)) => std::cmp::Ordering::Greater,
275 (None, None) => std::cmp::Ordering::Equal,
276 };
277 match order {
278 SortOrder::Asc => cmp,
279 SortOrder::Desc => cmp.reverse(),
280 }
281 });
282 }
283
284 let results: Vec<(String, Automerge)> = results
286 .into_iter()
287 .skip(self.offset)
288 .take(self.limit.unwrap_or(usize::MAX))
289 .collect();
290
291 Ok(results)
292 }
293
294 pub fn execute_ids(self) -> Result<Vec<String>> {
296 let results = self.execute()?;
297 Ok(results.into_iter().map(|(id, _)| id).collect())
298 }
299
300 pub fn execute_json(self) -> Result<Vec<(String, serde_json::Value)>> {
302 let results = self.execute()?;
303 Ok(results
304 .into_iter()
305 .map(|(id, doc)| (id, super::json_convert::automerge_to_json(&doc)))
306 .collect())
307 }
308
309 pub fn count(self) -> Result<usize> {
311 Ok(self.execute()?.len())
313 }
314}
315
316pub fn extract_field(doc: &Automerge, field: &str) -> Option<Value> {
320 let parts: Vec<&str> = field.split('.').collect();
321 extract_field_recursive(doc, automerge::ROOT, &parts)
322}
323
324fn extract_field_recursive(
325 doc: &Automerge,
326 obj_id: automerge::ObjId,
327 parts: &[&str],
328) -> Option<Value> {
329 if parts.is_empty() {
330 return None;
331 }
332
333 let field_name = parts[0];
334 let remaining = &parts[1..];
335
336 match doc.get(&obj_id, field_name) {
337 Ok(Some((value, _))) => {
338 if remaining.is_empty() {
339 automerge_value_to_query_value(&value)
341 } else {
342 match value {
344 automerge::Value::Object(obj_type) => {
345 if matches!(obj_type, automerge::ObjType::Map) {
346 if let Ok(Some((automerge::Value::Object(_), nested_obj_id))) =
347 doc.get(&obj_id, field_name)
348 {
349 return extract_field_recursive(doc, nested_obj_id, remaining);
350 }
351 }
352 None
353 }
354 _ => None,
355 }
356 }
357 }
358 Ok(None) => None,
359 Err(_) => None,
360 }
361}
362
363fn automerge_value_to_query_value(value: &automerge::Value) -> Option<Value> {
365 match value {
366 automerge::Value::Scalar(scalar) => match scalar.as_ref() {
367 automerge::ScalarValue::Null => Some(Value::Null),
368 automerge::ScalarValue::Boolean(b) => Some(Value::Bool(*b)),
369 automerge::ScalarValue::Int(i) => Some(Value::Int(*i)),
370 automerge::ScalarValue::Uint(u) => Some(Value::Uint(*u)),
371 automerge::ScalarValue::F64(f) => Some(Value::Float(*f)),
372 automerge::ScalarValue::Str(s) => Some(Value::String(s.to_string())),
373 automerge::ScalarValue::Timestamp(t) => Some(Value::Timestamp(*t)),
374 automerge::ScalarValue::Bytes(_) => None, automerge::ScalarValue::Counter(_) => None,
376 automerge::ScalarValue::Unknown { .. } => None,
377 },
378 automerge::Value::Object(_) => None, }
380}
381
382fn extract_array_contains(doc: &Automerge, field: &str, target: &Value) -> bool {
384 let parts: Vec<&str> = field.split('.').collect();
385 extract_array_contains_recursive(doc, automerge::ROOT, &parts, target)
386}
387
388fn extract_array_contains_recursive(
389 doc: &Automerge,
390 obj_id: automerge::ObjId,
391 parts: &[&str],
392 target: &Value,
393) -> bool {
394 if parts.is_empty() {
395 return false;
396 }
397
398 let field_name = parts[0];
399 let remaining = &parts[1..];
400
401 match doc.get(&obj_id, field_name) {
402 Ok(Some((value, obj_id_ref))) => {
403 if remaining.is_empty() {
404 match value {
406 automerge::Value::Object(automerge::ObjType::List) => {
407 let len = doc.length(&obj_id_ref);
409 for idx in 0..len {
410 if let Ok(Some((elem_val, _))) = doc.get(&obj_id_ref, idx) {
411 if let Some(query_val) = automerge_value_to_query_value(&elem_val) {
412 if &query_val == target {
413 return true;
414 }
415 }
416 }
417 }
418 false
419 }
420 _ => false,
421 }
422 } else {
423 match value {
425 automerge::Value::Object(automerge::ObjType::Map) => {
426 extract_array_contains_recursive(doc, obj_id_ref, remaining, target)
427 }
428 _ => false,
429 }
430 }
431 }
432 _ => false,
433 }
434}
435
436#[cfg(test)]
437mod tests {
438 use super::*;
439 use automerge::transaction::Transactable;
440 use tempfile::TempDir;
441
442 fn create_test_store() -> (Arc<AutomergeStore>, TempDir) {
443 let temp_dir = TempDir::new().unwrap();
444 let store = Arc::new(AutomergeStore::open(temp_dir.path()).unwrap());
445 (store, temp_dir)
446 }
447
448 fn create_test_doc(fields: Vec<(&str, automerge::ScalarValue)>) -> Automerge {
449 let mut doc = Automerge::new();
450 doc.transact(|tx| {
451 for (key, value) in fields {
452 tx.put(automerge::ROOT, key, value)?;
453 }
454 Ok::<(), automerge::AutomergeError>(())
455 })
456 .unwrap();
457 doc
458 }
459
460 #[test]
461 fn test_value_comparison() {
462 assert!(Value::Int(5) > Value::Int(3));
463 assert!(Value::Float(3.15) > Value::Float(2.72));
464 assert!(Value::String("b".into()) > Value::String("a".into()));
465 assert!(Value::Bool(true) > Value::Bool(false));
466
467 assert!(Value::Int(5) > Value::Uint(3));
469 assert!(Value::Float(5.0) > Value::Int(3));
470 }
471
472 #[test]
473 fn test_extract_field_simple() {
474 let doc = create_test_doc(vec![
475 ("name", automerge::ScalarValue::Str("test".into())),
476 ("count", automerge::ScalarValue::Int(42)),
477 ("active", automerge::ScalarValue::Boolean(true)),
478 ]);
479
480 assert_eq!(
481 extract_field(&doc, "name"),
482 Some(Value::String("test".into()))
483 );
484 assert_eq!(extract_field(&doc, "count"), Some(Value::Int(42)));
485 assert_eq!(extract_field(&doc, "active"), Some(Value::Bool(true)));
486 assert_eq!(extract_field(&doc, "nonexistent"), None);
487 }
488
489 #[test]
490 fn test_where_eq() {
491 let (store, _temp) = create_test_store();
492
493 let doc1 = create_test_doc(vec![
495 ("name", automerge::ScalarValue::Str("alpha".into())),
496 ("operational", automerge::ScalarValue::Boolean(true)),
497 ]);
498 let doc2 = create_test_doc(vec![
499 ("name", automerge::ScalarValue::Str("beta".into())),
500 ("operational", automerge::ScalarValue::Boolean(false)),
501 ]);
502
503 store.put("test:doc1", &doc1).unwrap();
504 store.put("test:doc2", &doc2).unwrap();
505
506 let results = Query::new(store.clone(), "test")
507 .where_eq("operational", Value::Bool(true))
508 .execute()
509 .unwrap();
510
511 assert_eq!(results.len(), 1);
512 assert_eq!(results[0].0, "doc1");
513 }
514
515 #[test]
516 fn test_where_gt_lt() {
517 let (store, _temp) = create_test_store();
518
519 let doc1 = create_test_doc(vec![
520 ("name", automerge::ScalarValue::Str("a".into())),
521 ("score", automerge::ScalarValue::Int(10)),
522 ]);
523 let doc2 = create_test_doc(vec![
524 ("name", automerge::ScalarValue::Str("b".into())),
525 ("score", automerge::ScalarValue::Int(50)),
526 ]);
527 let doc3 = create_test_doc(vec![
528 ("name", automerge::ScalarValue::Str("c".into())),
529 ("score", automerge::ScalarValue::Int(90)),
530 ]);
531
532 store.put("test:doc1", &doc1).unwrap();
533 store.put("test:doc2", &doc2).unwrap();
534 store.put("test:doc3", &doc3).unwrap();
535
536 let results = Query::new(store.clone(), "test")
538 .where_gt("score", Value::Int(30))
539 .execute()
540 .unwrap();
541 assert_eq!(results.len(), 2);
542
543 let results = Query::new(store.clone(), "test")
545 .where_lt("score", Value::Int(60))
546 .execute()
547 .unwrap();
548 assert_eq!(results.len(), 2);
549
550 let results = Query::new(store.clone(), "test")
552 .where_gt("score", Value::Int(20))
553 .where_lt("score", Value::Int(80))
554 .execute()
555 .unwrap();
556 assert_eq!(results.len(), 1);
557 assert_eq!(results[0].0, "doc2");
558 }
559
560 #[test]
561 fn test_order_by() {
562 let (store, _temp) = create_test_store();
563
564 let doc1 = create_test_doc(vec![("priority", automerge::ScalarValue::Int(3))]);
565 let doc2 = create_test_doc(vec![("priority", automerge::ScalarValue::Int(1))]);
566 let doc3 = create_test_doc(vec![("priority", automerge::ScalarValue::Int(2))]);
567
568 store.put("test:a", &doc1).unwrap();
569 store.put("test:b", &doc2).unwrap();
570 store.put("test:c", &doc3).unwrap();
571
572 let results = Query::new(store.clone(), "test")
574 .order_by("priority", SortOrder::Asc)
575 .execute()
576 .unwrap();
577 let priorities: Vec<i64> = results
578 .iter()
579 .filter_map(|(_, doc)| {
580 if let Some(Value::Int(p)) = extract_field(doc, "priority") {
581 Some(p)
582 } else {
583 None
584 }
585 })
586 .collect();
587 assert_eq!(priorities, vec![1, 2, 3]);
588
589 let results = Query::new(store.clone(), "test")
591 .order_by("priority", SortOrder::Desc)
592 .execute()
593 .unwrap();
594 let priorities: Vec<i64> = results
595 .iter()
596 .filter_map(|(_, doc)| {
597 if let Some(Value::Int(p)) = extract_field(doc, "priority") {
598 Some(p)
599 } else {
600 None
601 }
602 })
603 .collect();
604 assert_eq!(priorities, vec![3, 2, 1]);
605 }
606
607 #[test]
608 fn test_limit_offset() {
609 let (store, _temp) = create_test_store();
610
611 for i in 0..10 {
612 let doc = create_test_doc(vec![("index", automerge::ScalarValue::Int(i))]);
613 store.put(&format!("test:doc{}", i), &doc).unwrap();
614 }
615
616 let results = Query::new(store.clone(), "test")
618 .order_by("index", SortOrder::Asc)
619 .limit(3)
620 .execute()
621 .unwrap();
622 assert_eq!(results.len(), 3);
623
624 let results = Query::new(store.clone(), "test")
626 .order_by("index", SortOrder::Asc)
627 .offset(7)
628 .execute()
629 .unwrap();
630 assert_eq!(results.len(), 3);
631
632 let results = Query::new(store.clone(), "test")
634 .order_by("index", SortOrder::Asc)
635 .offset(3)
636 .limit(2)
637 .execute()
638 .unwrap();
639 assert_eq!(results.len(), 2);
640 }
641
642 #[test]
643 fn test_filter_by_ids() {
644 let (store, _temp) = create_test_store();
645
646 for i in 0..5 {
647 let doc = create_test_doc(vec![("value", automerge::ScalarValue::Int(i))]);
648 store.put(&format!("test:doc{}", i), &doc).unwrap();
649 }
650
651 let results = Query::new(store.clone(), "test")
652 .filter_by_ids(&["doc1".to_string(), "doc3".to_string()])
653 .execute()
654 .unwrap();
655
656 assert_eq!(results.len(), 2);
657 let ids: Vec<&str> = results.iter().map(|(id, _)| id.as_str()).collect();
658 assert!(ids.contains(&"doc1"));
659 assert!(ids.contains(&"doc3"));
660 }
661
662 #[test]
663 fn test_where_contains() {
664 let (store, _temp) = create_test_store();
665
666 let mut doc1 = Automerge::new();
668 doc1.transact(|tx| {
669 tx.put(automerge::ROOT, "name", "node1")?;
670 let caps_id =
671 tx.put_object(automerge::ROOT, "capabilities", automerge::ObjType::List)?;
672 tx.insert(&caps_id, 0, "sensor")?;
673 tx.insert(&caps_id, 1, "comms")?;
674 Ok::<(), automerge::AutomergeError>(())
675 })
676 .unwrap();
677
678 let mut doc2 = Automerge::new();
679 doc2.transact(|tx| {
680 tx.put(automerge::ROOT, "name", "node2")?;
681 let caps_id =
682 tx.put_object(automerge::ROOT, "capabilities", automerge::ObjType::List)?;
683 tx.insert(&caps_id, 0, "weapon")?;
684 Ok::<(), automerge::AutomergeError>(())
685 })
686 .unwrap();
687
688 store.put("test:node1", &doc1).unwrap();
689 store.put("test:node2", &doc2).unwrap();
690
691 let results = Query::new(store.clone(), "test")
692 .where_contains("capabilities", Value::String("sensor".into()))
693 .execute()
694 .unwrap();
695
696 assert_eq!(results.len(), 1);
697 assert_eq!(results[0].0, "node1");
698 }
699
700 #[test]
701 fn test_execute_ids() {
702 let (store, _temp) = create_test_store();
703
704 let doc1 = create_test_doc(vec![("active", automerge::ScalarValue::Boolean(true))]);
705 let doc2 = create_test_doc(vec![("active", automerge::ScalarValue::Boolean(true))]);
706
707 store.put("test:a", &doc1).unwrap();
708 store.put("test:b", &doc2).unwrap();
709
710 let ids = Query::new(store.clone(), "test")
711 .where_eq("active", Value::Bool(true))
712 .execute_ids()
713 .unwrap();
714
715 assert_eq!(ids.len(), 2);
716 }
717
718 #[test]
719 fn test_count() {
720 let (store, _temp) = create_test_store();
721
722 for i in 0..5 {
723 let doc = create_test_doc(vec![("value", automerge::ScalarValue::Int(i))]);
724 store.put(&format!("test:doc{}", i), &doc).unwrap();
725 }
726
727 let count = Query::new(store.clone(), "test")
728 .where_gt("value", Value::Int(2))
729 .count()
730 .unwrap();
731
732 assert_eq!(count, 2);
733 }
734}