elasticsearch_dsl/search/aggregations/bucket/
composite_aggregation.rs

1use crate::search::*;
2use crate::util::*;
3use serde::{Serialize, Serializer};
4use serde_json::Value;
5
6/// A multi-bucket aggregation that creates composite buckets from different sources.
7///
8/// <https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-composite-aggregation.html>
9#[derive(Debug, Clone, Serialize, PartialEq)]
10pub struct CompositeAggregation {
11    composite: CompositeAggregationInner,
12
13    #[serde(skip_serializing_if = "ShouldSkip::should_skip")]
14    aggs: Aggregations,
15}
16
17#[derive(Debug, Clone, Serialize, PartialEq)]
18struct CompositeAggregationInner {
19    sources: Vec<CompositeSource>,
20
21    #[serde(skip_serializing_if = "ShouldSkip::should_skip")]
22    size: Option<u64>,
23
24    #[serde(skip_serializing_if = "ShouldSkip::should_skip")]
25    after: Option<AfterKey>,
26}
27
28impl Aggregation {
29    /// Creates an instance of [`CompositeAggregation`]
30    ///
31    /// - `sources` - A vector of `CompositeSource` which defines the sources for the composite aggregation.
32    pub fn composite(sources: Vec<CompositeSource>) -> CompositeAggregation {
33        CompositeAggregation {
34            composite: CompositeAggregationInner {
35                sources,
36                size: None,
37                after: None,
38            },
39            aggs: Aggregations::new(),
40        }
41    }
42}
43
44impl CompositeAggregation {
45    /// The `size` parameter can be set to define how many composite buckets should be returned.
46    ///
47    /// - `size` - The maximum number of composite buckets to be returned.
48    pub fn size(mut self, size: u64) -> Self {
49        self.composite.size = Some(size);
50        self
51    }
52
53    /// The `after` parameter can be set to paginate composite buckets.
54    ///
55    /// - `after` - The key to start after for pagination in composite aggregations.
56    pub fn after<T>(mut self, after: T) -> Self
57    where
58        T: Into<AfterKey>,
59    {
60        self.composite.after = Some(after.into());
61        self
62    }
63
64    add_aggregate!();
65}
66
67/// Represents the `after` key for pagination in composite aggregations.
68///
69/// The `AfterKey` is used to paginate through the composite aggregation results.
70/// It is typically a JSON object containing the values of the composite keys.
71#[derive(Debug, Clone, Serialize, PartialEq)]
72pub struct AfterKey(Value);
73
74impl From<Value> for AfterKey {
75    fn from(value: Value) -> Self {
76        AfterKey(value)
77    }
78}
79
80impl AfterKey {
81    /// Creates a new `AfterKey` instance from a JSON value.
82    ///
83    /// - `value` - The JSON value representing the `after` key.
84    pub fn new(value: Value) -> Self {
85        AfterKey(value)
86    }
87}
88
89/// Represents different types of sources for a composite aggregation.
90#[derive(Debug, Clone, PartialEq)]
91pub enum CompositeSource {
92    /// Terms source for the composite aggregation.
93    Terms {
94        /// The unique identifier for the terms source.
95        name: String,
96        /// The terms composite source.
97        terms: TermsCompositeSource,
98    },
99    /// Histogram source for the composite aggregation.
100    Histogram {
101        /// The unique identifier for the histogram source.
102        name: String,
103        /// The histogram composite source.
104        histogram: HistogramCompositeSource,
105    },
106    /// Date histogram source for the composite aggregation.
107    DateHistogram {
108        /// The unique identifier for the date histogram source.
109        name: String,
110        /// The date histogram composite source.
111        date_histogram: DateHistogramCompositeSource,
112    },
113}
114
115impl Serialize for CompositeSource {
116    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
117    where
118        S: Serializer,
119    {
120        let mut map = serde_json::Map::new();
121        match self {
122            CompositeSource::Terms { name, terms } => {
123                let _ = map.insert(name.clone(), serde_json::json!({ "terms": terms }));
124            }
125            CompositeSource::Histogram { name, histogram } => {
126                let _ = map.insert(name.clone(), serde_json::json!({ "histogram": histogram }));
127            }
128            CompositeSource::DateHistogram {
129                name,
130                date_histogram,
131            } => {
132                let _ = map.insert(
133                    name.clone(),
134                    serde_json::json!({ "date_histogram": date_histogram }),
135                );
136            }
137        }
138        map.serialize(serializer)
139    }
140}
141
142/// Represents a terms source in a composite aggregation.
143#[derive(Debug, Clone, Serialize, PartialEq)]
144pub struct TermsCompositeSource {
145    field: String,
146    #[serde(skip_serializing_if = "ShouldSkip::should_skip")]
147    missing_bucket: Option<bool>,
148    #[serde(skip_serializing_if = "ShouldSkip::should_skip")]
149    order: Option<SortOrder>,
150}
151
152/// Represents a histogram source in a composite aggregation.
153#[derive(Debug, Clone, Serialize, PartialEq)]
154pub struct HistogramCompositeSource {
155    field: String,
156    interval: f64,
157    #[serde(skip_serializing_if = "ShouldSkip::should_skip")]
158    missing_bucket: Option<bool>,
159    #[serde(skip_serializing_if = "ShouldSkip::should_skip")]
160    order: Option<SortOrder>,
161}
162
163/// Represents a date histogram source in a composite aggregation.
164#[derive(Debug, Clone, Serialize, PartialEq)]
165pub struct DateHistogramCompositeSource {
166    field: String,
167    calendar_interval: String,
168    #[serde(skip_serializing_if = "ShouldSkip::should_skip")]
169    missing_bucket: Option<bool>,
170    #[serde(skip_serializing_if = "ShouldSkip::should_skip")]
171    order: Option<SortOrder>,
172}
173
174impl CompositeSource {
175    /// Creates a terms source for the composite aggregation.
176    ///
177    /// - `name` - The unique identifier for the terms source.
178    /// - `field` - The field to perform the terms aggregation on.
179    pub fn terms(name: &str, field: &str) -> CompositeSource {
180        CompositeSource::Terms {
181            name: name.to_string(),
182            terms: TermsCompositeSource {
183                field: field.to_string(),
184                missing_bucket: None,
185                order: None,
186            },
187        }
188    }
189
190    /// Creates a histogram source for the composite aggregation.
191    ///
192    /// - `name` - The unique identifier for the histogram source.
193    /// - `field` - The field to perform the histogram aggregation on.
194    /// - `interval` - The interval for the histogram buckets.
195    pub fn histogram(name: &str, field: &str, interval: f64) -> CompositeSource {
196        CompositeSource::Histogram {
197            name: name.to_string(),
198            histogram: HistogramCompositeSource {
199                field: field.to_string(),
200                interval,
201                missing_bucket: None,
202                order: None,
203            },
204        }
205    }
206
207    /// Creates a date histogram source for the composite aggregation.
208    ///
209    /// - `name` - The unique identifier for the date histogram source.
210    /// - `field` - The field to perform the date histogram aggregation on.
211    /// - `calendar_interval` - The calendar interval for the date histogram buckets.
212    pub fn date_histogram(name: &str, field: &str, calendar_interval: &str) -> CompositeSource {
213        CompositeSource::DateHistogram {
214            name: name.to_string(),
215            date_histogram: DateHistogramCompositeSource {
216                field: field.to_string(),
217                calendar_interval: calendar_interval.to_string(),
218                missing_bucket: None,
219                order: None,
220            },
221        }
222    }
223}
224
225impl TermsCompositeSource {
226    /// Sets the `missing_bucket` parameter for the terms source.
227    ///
228    /// - `missing_bucket` - Whether to include documents with missing values in the bucket.
229    pub fn missing_bucket(mut self, missing_bucket: bool) -> Self {
230        self.missing_bucket = Some(missing_bucket);
231        self
232    }
233
234    /// Sets the `order` parameter for the terms source.
235    ///
236    /// - `order` - The order of the terms in the bucket.
237    pub fn order(mut self, order: SortOrder) -> Self {
238        self.order = Some(order);
239        self
240    }
241}
242
243impl HistogramCompositeSource {
244    /// Sets the `missing_bucket` parameter for the histogram source.
245    ///
246    /// - `missing_bucket` - Whether to include documents with missing values in the bucket.
247    pub fn missing_bucket(mut self, missing_bucket: bool) -> Self {
248        self.missing_bucket = Some(missing_bucket);
249        self
250    }
251
252    /// Sets the `order` parameter for the histogram source.
253    ///
254    /// - `order` - The order of the histogram buckets.
255    pub fn order(mut self, order: SortOrder) -> Self {
256        self.order = Some(order);
257        self
258    }
259}
260
261impl DateHistogramCompositeSource {
262    /// Sets the `missing_bucket` parameter for the date histogram source.
263    ///
264    /// - `missing_bucket` - Whether to include documents with missing values in the bucket.
265    pub fn missing_bucket(mut self, missing_bucket: bool) -> Self {
266        self.missing_bucket = Some(missing_bucket);
267        self
268    }
269
270    /// Sets the `order` parameter for the date histogram source.
271    ///
272    /// - `order` - The order of the date histogram buckets.
273    pub fn order(mut self, order: SortOrder) -> Self {
274        self.order = Some(order);
275        self
276    }
277}
278
#[cfg(test)]
mod tests {
    use super::*;

    /// Covers the terms source alone and combined with `size` / `after`.
    #[test]
    fn serialization() {
        // Optional parameters are omitted while unset.
        assert_serialize_aggregation(
            Aggregation::composite(vec![CompositeSource::terms("test_field", "field_name")]),
            json!({ "composite": { "sources": [{ "test_field": { "terms": { "field": "field_name" } } }] } }),
        );

        // `size` and `after` appear once set.
        assert_serialize_aggregation(
            Aggregation::composite(vec![CompositeSource::terms("test_field", "field_name")])
                .size(10)
                .after(serde_json::json!({"test_field": "after_key"})),
            json!({
                "composite": {
                    "sources": [{ "test_field": { "terms": { "field": "field_name" } } }],
                    "size": 10,
                    "after": { "test_field": "after_key" }
                }
            }),
        );
    }

    /// Covers the histogram and date histogram sources, alone and mixed with terms.
    #[test]
    fn serialization_of_histogram_sources() {
        assert_serialize_aggregation(
            Aggregation::composite(vec![CompositeSource::histogram("price_bucket", "price", 5.0)]),
            json!({
                "composite": {
                    "sources": [{ "price_bucket": { "histogram": { "field": "price", "interval": 5.0 } } }],
                }
            }),
        );

        assert_serialize_aggregation(
            Aggregation::composite(vec![CompositeSource::date_histogram("day", "timestamp", "1d")]),
            json!({
                "composite": {
                    "sources": [{
                        "day": { "date_histogram": { "field": "timestamp", "calendar_interval": "1d" } }
                    }],
                }
            }),
        );

        // Several sources keep the order in which they were supplied.
        assert_serialize_aggregation(
            Aggregation::composite(vec![
                CompositeSource::terms("test_field", "field_name"),
                CompositeSource::histogram("price_bucket", "price", 5.0),
                CompositeSource::date_histogram("day", "timestamp", "1d"),
            ]),
            json!({
                "composite": {
                    "sources": [
                        { "test_field": { "terms": { "field": "field_name" } } },
                        { "price_bucket": { "histogram": { "field": "price", "interval": 5.0 } } },
                        { "day": { "date_histogram": { "field": "timestamp", "calendar_interval": "1d" } } }
                    ],
                }
            }),
        );
    }
}