sentinel_dbms/collection/aggregation.rs
1use futures::TryStreamExt as _;
2use serde_json::{json, Value};
3use tracing::{debug, trace};
4
5use crate::{filtering::matches_filters, Document, Result};
6use super::coll::Collection;
7
8#[allow(
9 clippy::multiple_inherent_impl,
10 reason = "multiple impl blocks for Collection are intentional for organization"
11)]
12impl Collection {
13 /// Performs aggregation operations on documents matching the given filters.
14 ///
15 /// Supported aggregations:
16 /// - `Count`: Count of matching documents
17 /// - `Sum(field)`: Sum of numeric values in the specified field
18 /// - `Avg(field)`: Average of numeric values in the specified field
19 /// - `Min(field)`: Minimum value in the specified field
20 /// - `Max(field)`: Maximum value in the specified field
21 ///
22 /// # Arguments
23 ///
24 /// * `filters` - Filters to apply before aggregation
25 /// * `aggregation` - The aggregation operation to perform
26 ///
27 /// # Returns
28 ///
29 /// Returns the aggregated result as a JSON `Value`.
30 ///
31 /// # Examples
32 ///
33 /// ```rust
34 /// use sentinel_dbms::{Store, Collection, Filter, Aggregation};
35 /// use serde_json::json;
36 ///
37 /// # async fn example() -> sentinel_dbms::Result<()> {
38 /// let store = Store::new("/path/to/data", None).await?;
39 /// let collection = store.collection("products").await?;
40 ///
41 /// // Insert some test data
42 /// collection.insert("prod-1", json!({"name": "Widget", "price": 10.0})).await?;
43 /// collection.insert("prod-2", json!({"name": "Gadget", "price": 20.0})).await?;
44 ///
45 /// // Count all products
46 /// let count = collection.aggregate(vec![], Aggregation::Count).await?;
47 /// assert_eq!(count, json!(2));
48 ///
49 /// // Sum of all prices
50 /// let total = collection.aggregate(vec![], Aggregation::Sum("price".to_string())).await?;
51 /// assert_eq!(total, json!(30.0));
52 /// # Ok(())
53 /// # }
54 /// ```
55 pub async fn aggregate(&self, filters: Vec<crate::Filter>, aggregation: crate::Aggregation) -> Result<Value> {
56 trace!("Performing aggregation: {:?}", aggregation);
57
58 // Get all documents (we'll filter them)
59 let mut stream = self.all();
60
61 let mut count = 0usize;
62 let mut sum = 0.0f64;
63 let mut min = f64::INFINITY;
64 let mut max = f64::NEG_INFINITY;
65 let mut numeric_count = 0usize;
66
67 while let Some(doc) = stream.try_next().await? {
68 // Apply filters
69 if !filters.is_empty() {
70 let filter_refs: Vec<&crate::Filter> = filters.iter().collect();
71 if !matches_filters(&doc, &filter_refs) {
72 continue;
73 }
74 }
75
76 count = count.saturating_add(1);
77
78 // Extract value for field-based aggregations
79 if let crate::Aggregation::Sum(ref field) |
80 crate::Aggregation::Avg(ref field) |
81 crate::Aggregation::Min(ref field) |
82 crate::Aggregation::Max(ref field) = aggregation &&
83 let Some(value) = Self::extract_numeric_value(&doc, field)
84 {
85 sum += value;
86 min = min.min(value);
87 max = max.max(value);
88 numeric_count = numeric_count.checked_add(1).unwrap_or(numeric_count);
89 }
90 }
91
92 let result = match aggregation {
93 crate::Aggregation::Count => json!(count),
94 crate::Aggregation::Sum(_) => json!(sum),
95 crate::Aggregation::Avg(_) => {
96 if numeric_count == 0 {
97 json!(null)
98 }
99 else {
100 json!(sum / numeric_count as f64)
101 }
102 },
103 crate::Aggregation::Min(_) => {
104 if min == f64::INFINITY {
105 json!(null)
106 }
107 else {
108 json!(min)
109 }
110 },
111 crate::Aggregation::Max(_) => {
112 if max == f64::NEG_INFINITY {
113 json!(null)
114 }
115 else {
116 json!(max)
117 }
118 },
119 };
120
121 debug!("Aggregation result: {}", result);
122 Ok(result)
123 }
124
125 /// Extracts a numeric value from a document field for aggregation operations.
126 pub fn extract_numeric_value(doc: &Document, field: &str) -> Option<f64> {
127 doc.data().get(field).and_then(|v| {
128 match *v {
129 Value::Number(ref n) => n.as_f64(),
130 Value::Null | Value::Bool(_) | Value::String(_) | Value::Array(_) | Value::Object(_) => None,
131 }
132 })
133 }
134}