Skip to main content

vantage_mongodb/select/
pipeline.rs

1//! Aggregation pipeline builders for MongoSelect.
2
3use bson::{Bson, Document, doc};
4
5use super::MongoSelect;
6
7impl MongoSelect {
8    /// Build a count aggregation pipeline: [$match, $count].
9    pub async fn as_count_pipeline(&self) -> vantage_core::Result<Vec<Document>> {
10        let filter = self.build_filter().await?;
11        let mut pipeline = Vec::new();
12        if !filter.is_empty() {
13            pipeline.push(doc! { "$match": filter });
14        }
15        pipeline.push(doc! { "$count": "count" });
16        Ok(pipeline)
17    }
18
19    /// Build a $group aggregation pipeline with an accumulator.
20    /// `func` is e.g. "$sum", "$max", "$min". `field` is the column name.
21    pub async fn as_aggregate_pipeline(
22        &self,
23        func: &str,
24        field: &str,
25    ) -> vantage_core::Result<Vec<Document>> {
26        let filter = self.build_filter().await?;
27        let field_ref = format!("${}", field);
28        let mut pipeline = Vec::new();
29        if !filter.is_empty() {
30            pipeline.push(doc! { "$match": filter });
31        }
32        pipeline.push(doc! { "$group": { "_id": Bson::Null, "val": { func: field_ref } } });
33        Ok(pipeline)
34    }
35}
36
37#[cfg(test)]
38mod tests {
39    use bson::doc;
40    use vantage_expressions::Selectable;
41
42    use super::*;
43
44    #[tokio::test]
45    async fn test_count_pipeline_empty() {
46        let s = MongoSelect::new();
47        let pipeline = s.as_count_pipeline().await.unwrap();
48        assert_eq!(pipeline.len(), 1);
49        assert_eq!(pipeline[0], doc! { "$count": "count" });
50    }
51
52    #[tokio::test]
53    async fn test_count_pipeline_with_filter() {
54        let s = MongoSelect::new().with_condition(doc! { "active": true });
55        let pipeline = s.as_count_pipeline().await.unwrap();
56        assert_eq!(pipeline.len(), 2);
57        assert_eq!(pipeline[0], doc! { "$match": { "active": true } });
58        assert_eq!(pipeline[1], doc! { "$count": "count" });
59    }
60
61    #[tokio::test]
62    async fn test_aggregate_pipeline_sum() {
63        let s = MongoSelect::new();
64        let pipeline = s.as_aggregate_pipeline("$sum", "price").await.unwrap();
65        assert_eq!(pipeline.len(), 1);
66        assert_eq!(
67            pipeline[0],
68            doc! { "$group": { "_id": Bson::Null, "val": { "$sum": "$price" } } }
69        );
70    }
71
72    #[tokio::test]
73    async fn test_aggregate_pipeline_max_with_filter() {
74        let s = MongoSelect::new().with_condition(doc! { "is_deleted": false });
75        let pipeline = s.as_aggregate_pipeline("$max", "price").await.unwrap();
76        assert_eq!(pipeline.len(), 2);
77        assert_eq!(pipeline[0], doc! { "$match": { "is_deleted": false } });
78        assert_eq!(
79            pipeline[1],
80            doc! { "$group": { "_id": Bson::Null, "val": { "$max": "$price" } } }
81        );
82    }
83}