cognite/dto/data_modeling/records/
aggregates.rs

1use std::collections::HashMap;
2
3use serde::{Deserialize, Serialize};
4use serde_with::skip_serializing_none;
5
6use crate::{AdvancedFilter, PropertyIdentifier};
7
8use super::LastUpdatedTimeFilter;
9
#[skip_serializing_none]
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
/// Object used to define the fixed bounds of a histogram.
///
/// Either side may be left as `None`; unset fields are omitted from the
/// serialized payload entirely (`skip_serializing_none`).
pub struct Bounds<T> {
    /// Lower bound of the histogram.
    pub min: Option<T>,
    /// Upper bound of the histogram.
    pub max: Option<T>,
}
20
#[skip_serializing_none]
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
/// A simple aggregate on a property.
///
/// Used as the payload for the single-valued metric aggregates
/// (avg, count, min, max, sum) in [`RecordsAggregate`].
pub struct SimpleAggregate {
    /// Property to aggregate over, as a path of property components.
    pub property: Vec<String>,
}
29
#[skip_serializing_none]
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
/// An aggregate that returns unique values of a property.
pub struct UniqueValuesAggregate {
    /// Property to aggregate over, as a path of property components.
    pub property: Vec<String>,
    /// Nested aggregates, to further group the unique values.
    /// Keyed by client-defined aggregate names.
    pub aggregates: Option<HashMap<String, RecordsAggregate>>,
    /// The number of top buckets returned. The default limit is 10 items.
    pub size: Option<u32>,
}
42
#[skip_serializing_none]
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
/// This aggregate is used to create a histogram of numeric values
/// from the specified property.
pub struct NumberHistogramAggregate {
    /// Property to aggregate over, as a path of property components.
    pub property: Vec<String>,
    /// To limit the range of buckets in the histogram.
    /// It is particularly useful in the case of open data ranges
    /// that can result in a very large number of buckets.
    /// One or both bounds must be specified.
    pub hard_bounds: Option<Bounds<f64>>,
    /// The interval between each bucket.
    pub interval: f64,
    /// Nested aggregates, to further group the histogram values.
    /// Keyed by client-defined aggregate names.
    pub aggregates: Option<HashMap<String, RecordsAggregate>>,
}
61
62#[derive(Serialize, Deserialize, Clone, Debug)]
63/// Calendar interval between each bucket.
64pub enum CalendarInterval {
65    #[serde(rename = "1y")]
66    /// 1 year
67    Year,
68    #[serde(rename = "1q")]
69    /// 1 quarter
70    Quarter,
71    #[serde(rename = "1M")]
72    /// 1 month
73    Month,
74    #[serde(rename = "1w")]
75    /// 1 week
76    Week,
77    #[serde(rename = "1d")]
78    /// 1 day
79    Day,
80    #[serde(rename = "1h")]
81    /// 1 hour
82    Hour,
83    #[serde(rename = "1m")]
84    /// 1 minute
85    Minute,
86    #[serde(rename = "1s")]
87    /// 1 second
88    Second,
89}
90
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
/// The interval between each bucket in a time histogram.
///
/// Externally tagged: serializes as a `calendarInterval` or
/// `fixedInterval` key, which [`TimeHistogramAggregate`] flattens
/// directly into the aggregate object.
pub enum TimeHistogramInterval {
    /// Calendar interval
    CalendarInterval(CalendarInterval),
    /// Fixed interval, as a duration, e.g. 3m, 400h, 25d, etc.
    FixedInterval(String),
}
100
#[skip_serializing_none]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
/// A time histogram aggregator function. This function will generate a histogram
/// from the values of the specified property. It uses the specified
/// calendar or fixed interval.
pub struct TimeHistogramAggregate {
    /// Property to aggregate over, as a path of property components.
    pub property: Vec<String>,
    #[serde(flatten)]
    /// The interval between each bucket.
    ///
    /// Flattened: appears as a `calendarInterval` or `fixedInterval`
    /// key directly on this object rather than under an `interval` key.
    pub interval: TimeHistogramInterval,
    /// To limit the range of buckets in the histogram.
    /// It is particularly useful in the case of open data ranges
    /// that can result in a very large number of buckets.
    /// One or both bounds must be specified.
    pub hard_bounds: Option<Bounds<String>>,
    /// Nested aggregates, to further group the histogram values.
    /// Keyed by client-defined aggregate names.
    pub aggregates: Option<HashMap<String, RecordsAggregate>>,
}
121
122#[derive(Serialize, Deserialize, Clone, Debug)]
123
124/// The moving function to use in the moving function aggregate.
125pub enum MovingFunction {
126    #[serde(rename = "MovingFunctions.max")]
127    /// The maximum value in the window.
128    Max,
129    #[serde(rename = "MovingFunctions.min")]
130    /// The minimum value in the window.
131    Min,
132    #[serde(rename = "MovingFunctions.sum")]
133    /// The sum of the values in the window.
134    Sum,
135    #[serde(rename = "MovingFunctions.unweightedAvg")]
136    /// The unweighted average of the values in the window.
137    UnweightedAvg,
138    #[serde(rename = "MovingFunctions.linearWeightedAvg")]
139    /// The linear weighted average of the values in the window.
140    LinearWeightedAvg,
141    /// Some other function.
142    #[serde(untagged)]
143    Other(String),
144}
145
#[skip_serializing_none]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
/// Given an ordered series of data, the Moving Function aggregation will slide
/// a window across the data and allow the user to specify a function
/// that is executed on each window of data. A number of common functions
/// are predefined such as min/max, moving averages, etc.
/// Customer defined functions are not allowed now.
/// The aggregate must be embedded inside of a numberHistogram or
/// timeHistogram aggregate. It can be embedded like any other metric aggregate.
pub struct MovingFunctionAggregate {
    /// The path to the buckets to use for the moving function.
    /// Syntax is `[AggregateName][MultiBucketKey]?(>[AggregateName])*`.
    /// See documentation for more details.
    pub buckets_path: String,
    /// The size of window to slide across the histogram.
    pub window: u32,
    /// The function that should be executed on each window of data.
    pub function: MovingFunction,
}
166
#[skip_serializing_none]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
/// A multi-bucket aggregation where each bucket contains the records that match
/// a filter in the filters list.
///
/// Note: only 30 such buckets are allowed across all filter aggregates in a
/// single request.
pub struct FilterAggregate {
    /// List of filters to describe each bucket; one bucket per filter.
    pub filters: Vec<AdvancedFilter>,
    /// Nested aggregates, to further group the filter values.
    /// Keyed by client-defined aggregate names.
    pub aggregates: Option<HashMap<String, RecordsAggregate>>,
}
181
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
/// Aggregates on records.
///
/// Externally tagged: each variant serializes as a single-key object
/// whose key is the camelCase variant name, e.g. `{"uniqueValues": {...}}`.
/// Convenience constructors are provided for every variant.
pub enum RecordsAggregate {
    /// Average aggregate.
    Avg(SimpleAggregate),
    /// Count aggregate.
    Count(SimpleAggregate),
    /// Minimum aggregate.
    Min(SimpleAggregate),
    /// Maximum aggregate.
    Max(SimpleAggregate),
    /// Sum aggregate.
    Sum(SimpleAggregate),
    /// Unique values aggregate.
    UniqueValues(UniqueValuesAggregate),
    /// Number histogram aggregate.
    NumberHistogram(NumberHistogramAggregate),
    /// Time histogram aggregate.
    TimeHistogram(TimeHistogramAggregate),
    /// Moving function aggregate.
    MovingFunction(MovingFunctionAggregate),
    /// Filter aggregate.
    Filters(FilterAggregate),
}
207
208impl RecordsAggregate {
209    /// Create an average aggregate on the specified property.
210    pub fn average(property: impl PropertyIdentifier) -> Self {
211        RecordsAggregate::Avg(SimpleAggregate {
212            property: property.into_identifier(),
213        })
214    }
215
216    /// Create a count aggregate on the specified property.
217    pub fn count(property: impl PropertyIdentifier) -> Self {
218        RecordsAggregate::Count(SimpleAggregate {
219            property: property.into_identifier(),
220        })
221    }
222
223    /// Create a min aggregate on the specified property.
224    pub fn min(property: impl PropertyIdentifier) -> Self {
225        RecordsAggregate::Min(SimpleAggregate {
226            property: property.into_identifier(),
227        })
228    }
229
230    /// Create a max aggregate on the specified property.
231    pub fn max(property: impl PropertyIdentifier) -> Self {
232        RecordsAggregate::Max(SimpleAggregate {
233            property: property.into_identifier(),
234        })
235    }
236
237    /// Create a sum aggregate on the specified property.
238    pub fn sum(property: impl PropertyIdentifier) -> Self {
239        RecordsAggregate::Sum(SimpleAggregate {
240            property: property.into_identifier(),
241        })
242    }
243
244    /// Create a unique values aggregate on the specified property.
245    ///
246    /// * `size` is the maximum number of unique values to return, default is 10.
247    /// * `aggregates` is a map of nested aggregates to further group the unique values.
248    pub fn unique_values(
249        property: impl PropertyIdentifier,
250        size: Option<u32>,
251        aggregates: Option<HashMap<String, RecordsAggregate>>,
252    ) -> Self {
253        RecordsAggregate::UniqueValues(UniqueValuesAggregate {
254            property: property.into_identifier(),
255            size,
256            aggregates,
257        })
258    }
259
260    /// Create a number histogram aggregate on the specified property.
261    ///
262    /// * `interval` is the interval between each bucket.
263    /// * `hard_bounds` is the fixed bounds of the histogram.
264    /// * `aggregates` is a map of nested aggregates to further group the histogram values.
265    pub fn number_histogram(
266        property: impl PropertyIdentifier,
267        interval: f64,
268        hard_bounds: Option<Bounds<f64>>,
269        aggregates: Option<HashMap<String, RecordsAggregate>>,
270    ) -> Self {
271        RecordsAggregate::NumberHistogram(NumberHistogramAggregate {
272            property: property.into_identifier(),
273            hard_bounds,
274            interval,
275            aggregates,
276        })
277    }
278
279    /// Create a time histogram aggregate on the specified property.
280    ///
281    /// * `interval` is the interval between each bucket.
282    /// * `hard_bounds` is the fixed bounds of the histogram.
283    /// * `aggregates` is a map of nested aggregates to further group the histogram values.
284    pub fn time_histogram(
285        property: impl PropertyIdentifier,
286        interval: TimeHistogramInterval,
287        hard_bounds: Option<Bounds<String>>,
288        aggregates: Option<HashMap<String, RecordsAggregate>>,
289    ) -> Self {
290        RecordsAggregate::TimeHistogram(TimeHistogramAggregate {
291            property: property.into_identifier(),
292            interval,
293            hard_bounds,
294            aggregates,
295        })
296    }
297
298    /// Create a moving function aggregate.
299    ///
300    /// * `buckets_path` is the path to the buckets to use for the moving function.
301    /// * `window` is the size of window to slide across the histogram.
302    /// * `function` is the function that should be executed on each window of data.
303    pub fn moving_function(
304        buckets_path: impl Into<String>,
305        window: u32,
306        function: MovingFunction,
307    ) -> Self {
308        RecordsAggregate::MovingFunction(MovingFunctionAggregate {
309            buckets_path: buckets_path.into(),
310            window,
311            function,
312        })
313    }
314
315    /// Create a filter aggregate.
316    ///
317    /// * `filters` is a list of filters to describe each bucket.
318    /// * `aggregates` is a map of nested aggregates to further group the filter values.
319    pub fn filters(
320        filters: Vec<AdvancedFilter>,
321        aggregates: Option<HashMap<String, RecordsAggregate>>,
322    ) -> Self {
323        RecordsAggregate::Filters(FilterAggregate {
324            filters,
325            aggregates,
326        })
327    }
328}
329
/// Request for aggregates on records.
#[skip_serializing_none]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RecordsAggregateRequest {
    /// Matches records with the last updated time within the provided range.
    /// This attribute is mandatory, and the maximum interval it can define
    /// is limited by the stream settings.
    pub last_updated_time: LastUpdatedTimeFilter,
    /// A custom filter to apply to the records to include in the aggregate result.
    pub filter: Option<AdvancedFilter>,
    /// A dictionary of requested aggregates with client defined names/identifiers.
    /// The same names key the corresponding entries in the response.
    pub aggregates: HashMap<String, RecordsAggregate>,
}
344
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
/// The value of a multivalued aggregate.
///
/// Untagged: serialized as the bare JSON value. On deserialization the
/// variants are tried in declaration order, so integral numbers become
/// `Integer` and other numbers become `Number`.
pub enum AggregateValue {
    /// A string value
    String(String),
    /// An integer value
    Integer(i64),
    /// A float value
    Number(f64),
    /// A boolean value
    Boolean(bool),
}
358
359impl From<String> for AggregateValue {
360    fn from(value: String) -> Self {
361        AggregateValue::String(value)
362    }
363}
364
365impl<'a> From<&'a str> for AggregateValue {
366    fn from(value: &'a str) -> Self {
367        AggregateValue::String(value.to_string())
368    }
369}
370impl From<i64> for AggregateValue {
371    fn from(value: i64) -> Self {
372        AggregateValue::Integer(value)
373    }
374}
375impl From<f64> for AggregateValue {
376    fn from(value: f64) -> Self {
377        AggregateValue::Number(value)
378    }
379}
380impl From<bool> for AggregateValue {
381    fn from(value: bool) -> Self {
382        AggregateValue::Boolean(value)
383    }
384}
385
#[skip_serializing_none]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
/// A bucket of unique values in an aggregate.
pub struct UniqueValuesBucket {
    /// The number of items with the given value.
    pub count: u64,
    /// The unique value of this bucket.
    pub value: AggregateValue,
    /// The nested aggregates for this bucket, by client-defined name.
    pub aggregates: Option<HashMap<String, AggregateResult>>,
}
398
#[skip_serializing_none]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
/// A bucket of a histogram aggregate.
///
/// `T` is `f64` for number histograms and `String` for time histograms
/// (see [`AggregateBuckets`]).
pub struct HistogramBucket<T> {
    /// The number of values in this bucket.
    pub count: u64,
    /// The lower bound of this bucket.
    pub interval_start: T,
    /// The nested aggregates for this bucket, by client-defined name.
    pub aggregates: Option<HashMap<String, AggregateResult>>,
}
411
#[skip_serializing_none]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
/// A bucket of a filter aggregate.
pub struct FilterBucket {
    /// The number of values in this bucket.
    pub count: u64,
    /// The nested aggregates for this bucket, by client-defined name.
    pub aggregates: Option<HashMap<String, AggregateResult>>,
}
422
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
/// A bucket of an aggregate.
///
/// Untagged: on deserialization the variant is inferred from the bucket
/// shape, tried in declaration order (a `value` field selects unique
/// values, a numeric or string `intervalStart` selects a histogram
/// variant, otherwise filter buckets).
pub enum AggregateBuckets {
    /// A bucket of unique values.
    UniqueValues(Vec<UniqueValuesBucket>),
    /// A bucket of a histogram aggregate.
    NumberHistogram(Vec<HistogramBucket<f64>>),
    /// A bucket of a time histogram aggregate.
    TimeHistogram(Vec<HistogramBucket<String>>),
    /// A bucket of a filter aggregate.
    Filter(Vec<FilterBucket>),
}
436
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
/// The result of an aggregate.
///
/// Externally tagged with camelCase keys, mirroring the variants of
/// [`RecordsAggregate`] in the request.
pub enum AggregateResult {
    /// An average aggregate.
    Avg(f64),
    /// A count aggregate.
    Count(u64),
    /// A minimum aggregate.
    Min(f64),
    /// A maximum aggregate.
    Max(f64),
    /// A sum aggregate.
    Sum(f64),
    /// Buckets of a unique values or histogram aggregate.
    Buckets(AggregateBuckets),
    /// A moving function aggregate.
    MovingFunction(f64),
}
456
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
/// The result of a request for aggregates.
pub struct RecordsAggregateResult {
    /// Retrieved aggregates, keyed by the user defined names from the
    /// request's `aggregates` map.
    pub aggregates: HashMap<String, AggregateResult>,
}