Skip to main content

peat_mesh/storage/
query.rs

1//! Query engine for Automerge documents
2//!
3//! Provides a fluent builder API for querying Automerge documents stored in RocksDB
4//! with support for field-based filtering, sorting, and pagination.
5//!
6//! # Example
7//!
8//! ```ignore
9//! use peat_mesh::storage::{Query, SortOrder, Value};
10//!
11//! let results = Query::new(store.clone(), "beacons")
12//!     .where_eq("operational", Value::Bool(true))
13//!     .where_gt("fuel_percent", Value::Int(20))
14//!     .order_by("timestamp", SortOrder::Desc)
15//!     .limit(10)
16//!     .execute()?;
17//! ```
18
19use super::automerge_store::AutomergeStore;
20use anyhow::Result;
21use automerge::{Automerge, ReadDoc};
22use std::collections::HashSet;
23use std::sync::Arc;
24
/// Sort order for query results
///
/// Passed to [`Query::order_by`]. The default is [`SortOrder::Asc`].
/// `Desc` is implemented by reversing the ascending comparison as a whole,
/// so documents that lack the sort field come last when ascending and first
/// when descending.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SortOrder {
    /// Ascending order (smallest first)
    #[default]
    Asc,
    /// Descending order (largest first)
    Desc,
}
34
/// Query value types for comparisons
///
/// Maps to Automerge scalar types for field comparisons.
///
/// # Comparison semantics
///
/// * Values of the same variant compare by their payload.
/// * `Int`, `Uint` and `Float` compare with each other by exact mathematical
///   value (no lossy `as f64` rounding), so `Int(5) == Uint(5)` and
///   `Int(i64::MAX) < Uint(i64::MAX as u64 + 1)`.
/// * Any other pairing (for example `String` vs `Int`, or `Timestamp` vs
///   `Int`) is incomparable: `partial_cmp` returns `None` and `==` is `false`.
/// * A `Float` holding NaN is incomparable with everything, including itself.
///
/// Equality is defined through [`PartialOrd::partial_cmp`] so that
/// `a == b` holds exactly when `a.partial_cmp(&b) == Some(Ordering::Equal)`,
/// as the `PartialOrd` contract requires.
#[derive(Debug, Clone)]
pub enum Value {
    /// Null value
    Null,
    /// Boolean value
    Bool(bool),
    /// Signed integer
    Int(i64),
    /// Unsigned integer
    Uint(u64),
    /// Floating point
    Float(f64),
    /// String value
    String(String),
    /// Unix timestamp (seconds since epoch)
    ///
    /// NOTE(review): the unit is not enforced anywhere in this module; the
    /// payload is copied verbatim from the Automerge timestamp scalar.
    /// Confirm that writers really store seconds.
    Timestamp(i64),
}

/// Compare an integer with a float by exact mathematical value.
///
/// `int` must originate from an `i64` or `u64` (widened to `i128`).
/// Returns `None` only when `float` is NaN.
fn cmp_integer_to_float(int: i128, float: f64) -> Option<std::cmp::Ordering> {
    use std::cmp::Ordering;

    // 2^64 is above every u64 and -2^63 is the smallest i64; both are exactly
    // representable as f64, so these bounds are precise.
    const TWO_POW_64: f64 = 18_446_744_073_709_551_616.0;
    const NEG_TWO_POW_63: f64 = -9_223_372_036_854_775_808.0;

    if float.is_nan() {
        return None;
    }
    if float >= TWO_POW_64 {
        return Some(Ordering::Less);
    }
    if float < NEG_TWO_POW_63 {
        return Some(Ordering::Greater);
    }

    // `float` is now within [-2^63, 2^64), so its integral part converts to
    // i128 without loss.
    let whole = float.trunc();
    match int.cmp(&(whole as i128)) {
        // Same integral part: the sign of the fractional part decides.
        Ordering::Equal => 0.0_f64.partial_cmp(&(float - whole)),
        unequal => Some(unequal),
    }
}

impl PartialEq for Value {
    /// Two values are equal exactly when `partial_cmp` reports `Equal`.
    fn eq(&self, other: &Self) -> bool {
        self.partial_cmp(other) == Some(std::cmp::Ordering::Equal)
    }
}

impl PartialOrd for Value {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        use std::cmp::Ordering;
        match (self, other) {
            (Value::Null, Value::Null) => Some(Ordering::Equal),
            (Value::Bool(a), Value::Bool(b)) => a.partial_cmp(b),
            (Value::Int(a), Value::Int(b)) => a.partial_cmp(b),
            (Value::Uint(a), Value::Uint(b)) => a.partial_cmp(b),
            (Value::Float(a), Value::Float(b)) => a.partial_cmp(b),
            (Value::String(a), Value::String(b)) => a.partial_cmp(b),
            (Value::Timestamp(a), Value::Timestamp(b)) => a.partial_cmp(b),
            // Cross-type numeric comparisons. Integers are widened to i128 so
            // that every i64 and u64 is represented exactly.
            (Value::Int(a), Value::Uint(b)) => Some(i128::from(*a).cmp(&i128::from(*b))),
            (Value::Uint(a), Value::Int(b)) => Some(i128::from(*a).cmp(&i128::from(*b))),
            (Value::Int(a), Value::Float(b)) => cmp_integer_to_float(i128::from(*a), *b),
            (Value::Float(a), Value::Int(b)) => {
                cmp_integer_to_float(i128::from(*b), *a).map(Ordering::reverse)
            }
            (Value::Uint(a), Value::Float(b)) => cmp_integer_to_float(i128::from(*a), *b),
            (Value::Float(a), Value::Uint(b)) => {
                cmp_integer_to_float(i128::from(*b), *a).map(Ordering::reverse)
            }
            // Incompatible types
            _ => None,
        }
    }
}
79
/// Predicate type for document filtering
///
/// A boxed closure that receives a document and returns `true` to keep it.
/// `Query` stores one of these per `where_*` call and keeps a document only
/// if every stored predicate accepts it.
type Predicate = Box<dyn Fn(&Automerge) -> bool + Send + Sync>;
82
/// Query builder for Automerge documents
///
/// Provides a fluent API for building queries with filtering, sorting, and pagination.
/// Builder methods consume and return the query; nothing is read from the
/// store until one of the `execute*` methods or `count` is called.
pub struct Query {
    /// Store that is scanned when the query executes.
    store: Arc<AutomergeStore>,
    /// Collection to query; keys are scanned under the prefix `"<collection_name>:"`.
    collection_name: String,
    /// Field filters; a document must satisfy all of them.
    predicates: Vec<Predicate>,
    /// Optional sort key (field path) and direction.
    sort_field: Option<(String, SortOrder)>,
    /// Maximum number of results; `None` means no limit.
    limit: Option<usize>,
    /// Number of leading results to skip after filtering and sorting.
    offset: usize,
    /// When set, only documents whose ID is in this set are considered.
    doc_id_filter: Option<HashSet<String>>,
}
95
96impl Query {
97    /// Create a new query for a collection
98    ///
99    /// # Arguments
100    ///
101    /// * `store` - AutomergeStore containing the documents
102    /// * `collection_name` - Name of the collection to query
103    pub fn new(store: Arc<AutomergeStore>, collection_name: &str) -> Self {
104        Self {
105            store,
106            collection_name: collection_name.to_string(),
107            predicates: Vec::new(),
108            sort_field: None,
109            limit: None,
110            offset: 0,
111            doc_id_filter: None,
112        }
113    }
114
115    /// Filter where field equals value
116    ///
117    /// # Example
118    ///
119    /// ```ignore
120    /// query.where_eq("operational", Value::Bool(true))
121    /// ```
122    pub fn where_eq(mut self, field: &str, value: Value) -> Self {
123        let field = field.to_string();
124        self.predicates.push(Box::new(move |doc| {
125            extract_field(doc, &field)
126                .map(|v| v == value)
127                .unwrap_or(false)
128        }));
129        self
130    }
131
132    /// Filter where field is greater than value
133    pub fn where_gt(mut self, field: &str, value: Value) -> Self {
134        let field = field.to_string();
135        self.predicates.push(Box::new(move |doc| {
136            extract_field(doc, &field)
137                .and_then(|v| v.partial_cmp(&value))
138                .map(|ord| ord == std::cmp::Ordering::Greater)
139                .unwrap_or(false)
140        }));
141        self
142    }
143
144    /// Filter where field is less than value
145    pub fn where_lt(mut self, field: &str, value: Value) -> Self {
146        let field = field.to_string();
147        self.predicates.push(Box::new(move |doc| {
148            extract_field(doc, &field)
149                .and_then(|v| v.partial_cmp(&value))
150                .map(|ord| ord == std::cmp::Ordering::Less)
151                .unwrap_or(false)
152        }));
153        self
154    }
155
156    /// Filter where field is greater than or equal to value
157    pub fn where_gte(mut self, field: &str, value: Value) -> Self {
158        let field = field.to_string();
159        self.predicates.push(Box::new(move |doc| {
160            extract_field(doc, &field)
161                .and_then(|v| v.partial_cmp(&value))
162                .map(|ord| ord != std::cmp::Ordering::Less)
163                .unwrap_or(false)
164        }));
165        self
166    }
167
168    /// Filter where field is less than or equal to value
169    pub fn where_lte(mut self, field: &str, value: Value) -> Self {
170        let field = field.to_string();
171        self.predicates.push(Box::new(move |doc| {
172            extract_field(doc, &field)
173                .and_then(|v| v.partial_cmp(&value))
174                .map(|ord| ord != std::cmp::Ordering::Greater)
175                .unwrap_or(false)
176        }));
177        self
178    }
179
180    /// Filter where array field contains value
181    ///
182    /// # Example
183    ///
184    /// ```ignore
185    /// query.where_contains("capabilities", Value::String("sensor".into()))
186    /// ```
187    pub fn where_contains(mut self, field: &str, value: Value) -> Self {
188        let field = field.to_string();
189        self.predicates.push(Box::new(move |doc| {
190            extract_array_contains(doc, &field, &value)
191        }));
192        self
193    }
194
195    /// Filter to only documents with IDs in the given set
196    ///
197    /// Used for integrating with spatial indices like GeohashIndex.
198    ///
199    /// # Example
200    ///
201    /// ```ignore
202    /// let nearby_ids = geohash_index.find_near(lat, lon)?;
203    /// query.filter_by_ids(&nearby_ids)
204    /// ```
205    pub fn filter_by_ids(mut self, ids: &[String]) -> Self {
206        self.doc_id_filter = Some(ids.iter().cloned().collect());
207        self
208    }
209
210    /// Sort results by field
211    ///
212    /// # Example
213    ///
214    /// ```ignore
215    /// query.order_by("timestamp", SortOrder::Desc)
216    /// ```
217    pub fn order_by(mut self, field: &str, order: SortOrder) -> Self {
218        self.sort_field = Some((field.to_string(), order));
219        self
220    }
221
222    /// Limit number of results
223    pub fn limit(mut self, n: usize) -> Self {
224        self.limit = Some(n);
225        self
226    }
227
228    /// Skip first n results (for pagination)
229    pub fn offset(mut self, n: usize) -> Self {
230        self.offset = n;
231        self
232    }
233
234    /// Execute the query and return matching documents
235    ///
236    /// Returns (doc_id, Automerge) tuples for all matching documents.
237    pub fn execute(self) -> Result<Vec<(String, Automerge)>> {
238        let prefix = format!("{}:", self.collection_name);
239        let all_docs = self.store.scan_prefix(&prefix)?;
240
241        // Filter by doc_id_filter if set
242        let filtered_by_id: Vec<(String, Automerge)> =
243            if let Some(ref id_filter) = self.doc_id_filter {
244                all_docs
245                    .into_iter()
246                    .filter(|(key, _)| {
247                        key.strip_prefix(&prefix)
248                            .map(|doc_id| id_filter.contains(doc_id))
249                            .unwrap_or(false)
250                    })
251                    .collect()
252            } else {
253                all_docs
254            };
255
256        // Apply predicates
257        let mut results: Vec<(String, Automerge)> = filtered_by_id
258            .into_iter()
259            .filter(|(_, doc)| self.predicates.iter().all(|pred| pred(doc)))
260            .map(|(key, doc)| {
261                let doc_id = key.strip_prefix(&prefix).unwrap_or(&key).to_string();
262                (doc_id, doc)
263            })
264            .collect();
265
266        // Sort if specified
267        if let Some((field, order)) = &self.sort_field {
268            results.sort_by(|(_, a), (_, b)| {
269                let val_a = extract_field(a, field);
270                let val_b = extract_field(b, field);
271                let cmp = match (val_a, val_b) {
272                    (Some(a), Some(b)) => a.partial_cmp(&b).unwrap_or(std::cmp::Ordering::Equal),
273                    (Some(_), None) => std::cmp::Ordering::Less,
274                    (None, Some(_)) => std::cmp::Ordering::Greater,
275                    (None, None) => std::cmp::Ordering::Equal,
276                };
277                match order {
278                    SortOrder::Asc => cmp,
279                    SortOrder::Desc => cmp.reverse(),
280                }
281            });
282        }
283
284        // Apply offset and limit
285        let results: Vec<(String, Automerge)> = results
286            .into_iter()
287            .skip(self.offset)
288            .take(self.limit.unwrap_or(usize::MAX))
289            .collect();
290
291        Ok(results)
292    }
293
294    /// Execute and return only document IDs
295    pub fn execute_ids(self) -> Result<Vec<String>> {
296        let results = self.execute()?;
297        Ok(results.into_iter().map(|(id, _)| id).collect())
298    }
299
300    /// Execute the query and return matching documents as JSON values.
301    pub fn execute_json(self) -> Result<Vec<(String, serde_json::Value)>> {
302        let results = self.execute()?;
303        Ok(results
304            .into_iter()
305            .map(|(id, doc)| (id, super::json_convert::automerge_to_json(&doc)))
306            .collect())
307    }
308
309    /// Count matching documents (without loading full documents)
310    pub fn count(self) -> Result<usize> {
311        // For now, we execute and count. Could be optimized later.
312        Ok(self.execute()?.len())
313    }
314}
315
316/// Extract a field value from an Automerge document
317///
318/// Supports nested field paths like "position.lat" and "data.fuel_percent".
319pub fn extract_field(doc: &Automerge, field: &str) -> Option<Value> {
320    let parts: Vec<&str> = field.split('.').collect();
321    extract_field_recursive(doc, automerge::ROOT, &parts)
322}
323
324fn extract_field_recursive(
325    doc: &Automerge,
326    obj_id: automerge::ObjId,
327    parts: &[&str],
328) -> Option<Value> {
329    if parts.is_empty() {
330        return None;
331    }
332
333    let field_name = parts[0];
334    let remaining = &parts[1..];
335
336    match doc.get(&obj_id, field_name) {
337        Ok(Some((value, _))) => {
338            if remaining.is_empty() {
339                // Reached the target field
340                automerge_value_to_query_value(&value)
341            } else {
342                // Need to recurse into nested object
343                match value {
344                    automerge::Value::Object(obj_type) => {
345                        if matches!(obj_type, automerge::ObjType::Map) {
346                            if let Ok(Some((automerge::Value::Object(_), nested_obj_id))) =
347                                doc.get(&obj_id, field_name)
348                            {
349                                return extract_field_recursive(doc, nested_obj_id, remaining);
350                            }
351                        }
352                        None
353                    }
354                    _ => None,
355                }
356            }
357        }
358        Ok(None) => None,
359        Err(_) => None,
360    }
361}
362
363/// Convert Automerge Value to Query Value
364fn automerge_value_to_query_value(value: &automerge::Value) -> Option<Value> {
365    match value {
366        automerge::Value::Scalar(scalar) => match scalar.as_ref() {
367            automerge::ScalarValue::Null => Some(Value::Null),
368            automerge::ScalarValue::Boolean(b) => Some(Value::Bool(*b)),
369            automerge::ScalarValue::Int(i) => Some(Value::Int(*i)),
370            automerge::ScalarValue::Uint(u) => Some(Value::Uint(*u)),
371            automerge::ScalarValue::F64(f) => Some(Value::Float(*f)),
372            automerge::ScalarValue::Str(s) => Some(Value::String(s.to_string())),
373            automerge::ScalarValue::Timestamp(t) => Some(Value::Timestamp(*t)),
374            automerge::ScalarValue::Bytes(_) => None, // Not comparable
375            automerge::ScalarValue::Counter(_) => None,
376            automerge::ScalarValue::Unknown { .. } => None,
377        },
378        automerge::Value::Object(_) => None, // Objects are not scalar values
379    }
380}
381
382/// Check if an array field contains a value
383fn extract_array_contains(doc: &Automerge, field: &str, target: &Value) -> bool {
384    let parts: Vec<&str> = field.split('.').collect();
385    extract_array_contains_recursive(doc, automerge::ROOT, &parts, target)
386}
387
388fn extract_array_contains_recursive(
389    doc: &Automerge,
390    obj_id: automerge::ObjId,
391    parts: &[&str],
392    target: &Value,
393) -> bool {
394    if parts.is_empty() {
395        return false;
396    }
397
398    let field_name = parts[0];
399    let remaining = &parts[1..];
400
401    match doc.get(&obj_id, field_name) {
402        Ok(Some((value, obj_id_ref))) => {
403            if remaining.is_empty() {
404                // Check if this is an array containing the target
405                match value {
406                    automerge::Value::Object(automerge::ObjType::List) => {
407                        // Iterate over array elements
408                        let len = doc.length(&obj_id_ref);
409                        for idx in 0..len {
410                            if let Ok(Some((elem_val, _))) = doc.get(&obj_id_ref, idx) {
411                                if let Some(query_val) = automerge_value_to_query_value(&elem_val) {
412                                    if &query_val == target {
413                                        return true;
414                                    }
415                                }
416                            }
417                        }
418                        false
419                    }
420                    _ => false,
421                }
422            } else {
423                // Recurse into nested object
424                match value {
425                    automerge::Value::Object(automerge::ObjType::Map) => {
426                        extract_array_contains_recursive(doc, obj_id_ref, remaining, target)
427                    }
428                    _ => false,
429                }
430            }
431        }
432        _ => false,
433    }
434}
435
436#[cfg(test)]
437mod tests {
438    use super::*;
439    use automerge::transaction::Transactable;
440    use tempfile::TempDir;
441
442    fn create_test_store() -> (Arc<AutomergeStore>, TempDir) {
443        let temp_dir = TempDir::new().unwrap();
444        let store = Arc::new(AutomergeStore::open(temp_dir.path()).unwrap());
445        (store, temp_dir)
446    }
447
448    fn create_test_doc(fields: Vec<(&str, automerge::ScalarValue)>) -> Automerge {
449        let mut doc = Automerge::new();
450        doc.transact(|tx| {
451            for (key, value) in fields {
452                tx.put(automerge::ROOT, key, value)?;
453            }
454            Ok::<(), automerge::AutomergeError>(())
455        })
456        .unwrap();
457        doc
458    }
459
460    #[test]
461    fn test_value_comparison() {
462        assert!(Value::Int(5) > Value::Int(3));
463        assert!(Value::Float(3.15) > Value::Float(2.72));
464        assert!(Value::String("b".into()) > Value::String("a".into()));
465        assert!(Value::Bool(true) > Value::Bool(false));
466
467        // Cross-type numeric comparison
468        assert!(Value::Int(5) > Value::Uint(3));
469        assert!(Value::Float(5.0) > Value::Int(3));
470    }
471
472    #[test]
473    fn test_extract_field_simple() {
474        let doc = create_test_doc(vec![
475            ("name", automerge::ScalarValue::Str("test".into())),
476            ("count", automerge::ScalarValue::Int(42)),
477            ("active", automerge::ScalarValue::Boolean(true)),
478        ]);
479
480        assert_eq!(
481            extract_field(&doc, "name"),
482            Some(Value::String("test".into()))
483        );
484        assert_eq!(extract_field(&doc, "count"), Some(Value::Int(42)));
485        assert_eq!(extract_field(&doc, "active"), Some(Value::Bool(true)));
486        assert_eq!(extract_field(&doc, "nonexistent"), None);
487    }
488
489    #[test]
490    fn test_where_eq() {
491        let (store, _temp) = create_test_store();
492
493        // Create test documents
494        let doc1 = create_test_doc(vec![
495            ("name", automerge::ScalarValue::Str("alpha".into())),
496            ("operational", automerge::ScalarValue::Boolean(true)),
497        ]);
498        let doc2 = create_test_doc(vec![
499            ("name", automerge::ScalarValue::Str("beta".into())),
500            ("operational", automerge::ScalarValue::Boolean(false)),
501        ]);
502
503        store.put("test:doc1", &doc1).unwrap();
504        store.put("test:doc2", &doc2).unwrap();
505
506        let results = Query::new(store.clone(), "test")
507            .where_eq("operational", Value::Bool(true))
508            .execute()
509            .unwrap();
510
511        assert_eq!(results.len(), 1);
512        assert_eq!(results[0].0, "doc1");
513    }
514
515    #[test]
516    fn test_where_gt_lt() {
517        let (store, _temp) = create_test_store();
518
519        let doc1 = create_test_doc(vec![
520            ("name", automerge::ScalarValue::Str("a".into())),
521            ("score", automerge::ScalarValue::Int(10)),
522        ]);
523        let doc2 = create_test_doc(vec![
524            ("name", automerge::ScalarValue::Str("b".into())),
525            ("score", automerge::ScalarValue::Int(50)),
526        ]);
527        let doc3 = create_test_doc(vec![
528            ("name", automerge::ScalarValue::Str("c".into())),
529            ("score", automerge::ScalarValue::Int(90)),
530        ]);
531
532        store.put("test:doc1", &doc1).unwrap();
533        store.put("test:doc2", &doc2).unwrap();
534        store.put("test:doc3", &doc3).unwrap();
535
536        // Test where_gt
537        let results = Query::new(store.clone(), "test")
538            .where_gt("score", Value::Int(30))
539            .execute()
540            .unwrap();
541        assert_eq!(results.len(), 2);
542
543        // Test where_lt
544        let results = Query::new(store.clone(), "test")
545            .where_lt("score", Value::Int(60))
546            .execute()
547            .unwrap();
548        assert_eq!(results.len(), 2);
549
550        // Test combined
551        let results = Query::new(store.clone(), "test")
552            .where_gt("score", Value::Int(20))
553            .where_lt("score", Value::Int(80))
554            .execute()
555            .unwrap();
556        assert_eq!(results.len(), 1);
557        assert_eq!(results[0].0, "doc2");
558    }
559
560    #[test]
561    fn test_order_by() {
562        let (store, _temp) = create_test_store();
563
564        let doc1 = create_test_doc(vec![("priority", automerge::ScalarValue::Int(3))]);
565        let doc2 = create_test_doc(vec![("priority", automerge::ScalarValue::Int(1))]);
566        let doc3 = create_test_doc(vec![("priority", automerge::ScalarValue::Int(2))]);
567
568        store.put("test:a", &doc1).unwrap();
569        store.put("test:b", &doc2).unwrap();
570        store.put("test:c", &doc3).unwrap();
571
572        // Ascending order
573        let results = Query::new(store.clone(), "test")
574            .order_by("priority", SortOrder::Asc)
575            .execute()
576            .unwrap();
577        let priorities: Vec<i64> = results
578            .iter()
579            .filter_map(|(_, doc)| {
580                if let Some(Value::Int(p)) = extract_field(doc, "priority") {
581                    Some(p)
582                } else {
583                    None
584                }
585            })
586            .collect();
587        assert_eq!(priorities, vec![1, 2, 3]);
588
589        // Descending order
590        let results = Query::new(store.clone(), "test")
591            .order_by("priority", SortOrder::Desc)
592            .execute()
593            .unwrap();
594        let priorities: Vec<i64> = results
595            .iter()
596            .filter_map(|(_, doc)| {
597                if let Some(Value::Int(p)) = extract_field(doc, "priority") {
598                    Some(p)
599                } else {
600                    None
601                }
602            })
603            .collect();
604        assert_eq!(priorities, vec![3, 2, 1]);
605    }
606
607    #[test]
608    fn test_limit_offset() {
609        let (store, _temp) = create_test_store();
610
611        for i in 0..10 {
612            let doc = create_test_doc(vec![("index", automerge::ScalarValue::Int(i))]);
613            store.put(&format!("test:doc{}", i), &doc).unwrap();
614        }
615
616        // Test limit
617        let results = Query::new(store.clone(), "test")
618            .order_by("index", SortOrder::Asc)
619            .limit(3)
620            .execute()
621            .unwrap();
622        assert_eq!(results.len(), 3);
623
624        // Test offset
625        let results = Query::new(store.clone(), "test")
626            .order_by("index", SortOrder::Asc)
627            .offset(7)
628            .execute()
629            .unwrap();
630        assert_eq!(results.len(), 3);
631
632        // Test offset + limit (pagination)
633        let results = Query::new(store.clone(), "test")
634            .order_by("index", SortOrder::Asc)
635            .offset(3)
636            .limit(2)
637            .execute()
638            .unwrap();
639        assert_eq!(results.len(), 2);
640    }
641
642    #[test]
643    fn test_filter_by_ids() {
644        let (store, _temp) = create_test_store();
645
646        for i in 0..5 {
647            let doc = create_test_doc(vec![("value", automerge::ScalarValue::Int(i))]);
648            store.put(&format!("test:doc{}", i), &doc).unwrap();
649        }
650
651        let results = Query::new(store.clone(), "test")
652            .filter_by_ids(&["doc1".to_string(), "doc3".to_string()])
653            .execute()
654            .unwrap();
655
656        assert_eq!(results.len(), 2);
657        let ids: Vec<&str> = results.iter().map(|(id, _)| id.as_str()).collect();
658        assert!(ids.contains(&"doc1"));
659        assert!(ids.contains(&"doc3"));
660    }
661
662    #[test]
663    fn test_where_contains() {
664        let (store, _temp) = create_test_store();
665
666        // Create document with array
667        let mut doc1 = Automerge::new();
668        doc1.transact(|tx| {
669            tx.put(automerge::ROOT, "name", "node1")?;
670            let caps_id =
671                tx.put_object(automerge::ROOT, "capabilities", automerge::ObjType::List)?;
672            tx.insert(&caps_id, 0, "sensor")?;
673            tx.insert(&caps_id, 1, "comms")?;
674            Ok::<(), automerge::AutomergeError>(())
675        })
676        .unwrap();
677
678        let mut doc2 = Automerge::new();
679        doc2.transact(|tx| {
680            tx.put(automerge::ROOT, "name", "node2")?;
681            let caps_id =
682                tx.put_object(automerge::ROOT, "capabilities", automerge::ObjType::List)?;
683            tx.insert(&caps_id, 0, "weapon")?;
684            Ok::<(), automerge::AutomergeError>(())
685        })
686        .unwrap();
687
688        store.put("test:node1", &doc1).unwrap();
689        store.put("test:node2", &doc2).unwrap();
690
691        let results = Query::new(store.clone(), "test")
692            .where_contains("capabilities", Value::String("sensor".into()))
693            .execute()
694            .unwrap();
695
696        assert_eq!(results.len(), 1);
697        assert_eq!(results[0].0, "node1");
698    }
699
700    #[test]
701    fn test_execute_ids() {
702        let (store, _temp) = create_test_store();
703
704        let doc1 = create_test_doc(vec![("active", automerge::ScalarValue::Boolean(true))]);
705        let doc2 = create_test_doc(vec![("active", automerge::ScalarValue::Boolean(true))]);
706
707        store.put("test:a", &doc1).unwrap();
708        store.put("test:b", &doc2).unwrap();
709
710        let ids = Query::new(store.clone(), "test")
711            .where_eq("active", Value::Bool(true))
712            .execute_ids()
713            .unwrap();
714
715        assert_eq!(ids.len(), 2);
716    }
717
718    #[test]
719    fn test_count() {
720        let (store, _temp) = create_test_store();
721
722        for i in 0..5 {
723            let doc = create_test_doc(vec![("value", automerge::ScalarValue::Int(i))]);
724            store.put(&format!("test:doc{}", i), &doc).unwrap();
725        }
726
727        let count = Query::new(store.clone(), "test")
728            .where_gt("value", Value::Int(2))
729            .count()
730            .unwrap();
731
732        assert_eq!(count, 2);
733    }
734}