Skip to main content

sentinel_dbms/collection/
aggregation.rs

1use futures::TryStreamExt as _;
2use serde_json::{json, Value};
3use tracing::{debug, trace};
4
5use crate::{filtering::matches_filters, Document, Result};
6use super::coll::Collection;
7
8#[allow(
9    clippy::multiple_inherent_impl,
10    reason = "multiple impl blocks for Collection are intentional for organization"
11)]
12impl Collection {
13    /// Performs aggregation operations on documents matching the given filters.
14    ///
15    /// Supported aggregations:
16    /// - `Count`: Count of matching documents
17    /// - `Sum(field)`: Sum of numeric values in the specified field
18    /// - `Avg(field)`: Average of numeric values in the specified field
19    /// - `Min(field)`: Minimum value in the specified field
20    /// - `Max(field)`: Maximum value in the specified field
21    ///
22    /// # Arguments
23    ///
24    /// * `filters` - Filters to apply before aggregation
25    /// * `aggregation` - The aggregation operation to perform
26    ///
27    /// # Returns
28    ///
29    /// Returns the aggregated result as a JSON `Value`.
30    ///
31    /// # Examples
32    ///
33    /// ```rust
34    /// use sentinel_dbms::{Store, Collection, Filter, Aggregation};
35    /// use serde_json::json;
36    ///
37    /// # async fn example() -> sentinel_dbms::Result<()> {
38    /// let store = Store::new("/path/to/data", None).await?;
39    /// let collection = store.collection("products").await?;
40    ///
41    /// // Insert some test data
42    /// collection.insert("prod-1", json!({"name": "Widget", "price": 10.0})).await?;
43    /// collection.insert("prod-2", json!({"name": "Gadget", "price": 20.0})).await?;
44    ///
45    /// // Count all products
46    /// let count = collection.aggregate(vec![], Aggregation::Count).await?;
47    /// assert_eq!(count, json!(2));
48    ///
49    /// // Sum of all prices
50    /// let total = collection.aggregate(vec![], Aggregation::Sum("price".to_string())).await?;
51    /// assert_eq!(total, json!(30.0));
52    /// # Ok(())
53    /// # }
54    /// ```
55    pub async fn aggregate(&self, filters: Vec<crate::Filter>, aggregation: crate::Aggregation) -> Result<Value> {
56        trace!("Performing aggregation: {:?}", aggregation);
57
58        // Get all documents (we'll filter them)
59        let mut stream = self.all();
60
61        let mut count = 0usize;
62        let mut sum = 0.0f64;
63        let mut min = f64::INFINITY;
64        let mut max = f64::NEG_INFINITY;
65        let mut numeric_count = 0usize;
66
67        while let Some(doc) = stream.try_next().await? {
68            // Apply filters
69            if !filters.is_empty() {
70                let filter_refs: Vec<&crate::Filter> = filters.iter().collect();
71                if !matches_filters(&doc, &filter_refs) {
72                    continue;
73                }
74            }
75
76            count = count.saturating_add(1);
77
78            // Extract value for field-based aggregations
79            if let crate::Aggregation::Sum(ref field) |
80            crate::Aggregation::Avg(ref field) |
81            crate::Aggregation::Min(ref field) |
82            crate::Aggregation::Max(ref field) = aggregation &&
83                let Some(value) = Self::extract_numeric_value(&doc, field)
84            {
85                sum += value;
86                min = min.min(value);
87                max = max.max(value);
88                numeric_count = numeric_count.checked_add(1).unwrap_or(numeric_count);
89            }
90        }
91
92        let result = match aggregation {
93            crate::Aggregation::Count => json!(count),
94            crate::Aggregation::Sum(_) => json!(sum),
95            crate::Aggregation::Avg(_) => {
96                if numeric_count == 0 {
97                    json!(null)
98                }
99                else {
100                    json!(sum / numeric_count as f64)
101                }
102            },
103            crate::Aggregation::Min(_) => {
104                if min == f64::INFINITY {
105                    json!(null)
106                }
107                else {
108                    json!(min)
109                }
110            },
111            crate::Aggregation::Max(_) => {
112                if max == f64::NEG_INFINITY {
113                    json!(null)
114                }
115                else {
116                    json!(max)
117                }
118            },
119        };
120
121        debug!("Aggregation result: {}", result);
122        Ok(result)
123    }
124
125    /// Extracts a numeric value from a document field for aggregation operations.
126    pub fn extract_numeric_value(doc: &Document, field: &str) -> Option<f64> {
127        doc.data().get(field).and_then(|v| {
128            match *v {
129                Value::Number(ref n) => n.as_f64(),
130                Value::Null | Value::Bool(_) | Value::String(_) | Value::Array(_) | Value::Object(_) => None,
131            }
132        })
133    }
134}