cognite/api/core/
time_series.rs

1mod datapoints_stream;
2
3use std::collections::HashSet;
4use std::iter::FromIterator;
5
6use futures::FutureExt;
7use futures::Stream;
8use serde::Serialize;
9
10use crate::api::core::time_series::datapoints_stream::DatapointsStream;
11use crate::api::data_modeling::instances::Instances;
12use crate::api::resource::*;
13use crate::dto::core::datapoint::*;
14use crate::dto::core::time_series::*;
15use crate::error::Result;
16use crate::get_missing_from_result;
17use crate::utils::execute_with_parallelism;
18use crate::IdentityList;
19use crate::IdentityOrInstance;
20use crate::IdentityOrInstanceList;
21use crate::IgnoreUnknownIds;
22use crate::Items;
23use crate::ItemsVec;
24use crate::Patch;
25
26pub use datapoints_stream::{DataPointRef, DatapointsStreamOptions, EitherDataPoint};
27
/// A time series consists of a sequence of data points connected to a single asset.
/// For example, a water pump asset can have a temperature time series that records a data point in
/// units of °C every second.
pub type TimeSeriesResource = Resource<TimeSeries>;
32
impl WithBasePath for TimeSeriesResource {
    // All time series endpoints are rooted at `{project}/timeseries`.
    const BASE_PATH: &'static str = "timeseries";
}
36
// Standard CRUD-style capabilities for the time series resource. These marker
// impls pull in the default method implementations from the generic resource traits.
impl List<TimeSeriesQuery, TimeSeries> for TimeSeriesResource {}
impl Create<AddTimeSeries, TimeSeries> for TimeSeriesResource {}
impl FilterItems<TimeSeriesFilter, TimeSeries> for TimeSeriesResource {}
impl FilterWithRequest<TimeSeriesFilterRequest, TimeSeries> for TimeSeriesResource {}
impl SearchItems<'_, TimeSeriesFilter, TimeSeriesSearch, TimeSeries> for TimeSeriesResource {}
// Retrieval accepts both classic identities and data-modeling instance ids,
// for any `R` that keeps the identity list serializable.
impl<R> RetrieveWithIgnoreUnknownIds<IdentityOrInstanceList<R>, TimeSeries> for TimeSeriesResource
where
    IdentityOrInstanceList<R>: Serialize,
    R: Send + Sync,
{
}
impl Update<Patch<PatchTimeSeries>, TimeSeries> for TimeSeriesResource {}
// Deletion takes plain identity lists (no data-modeling instances).
impl<R> DeleteWithIgnoreUnknownIds<IdentityList<R>> for TimeSeriesResource
where
    IdentityList<R>: Serialize,
    R: Send + Sync,
{
}
55impl TimeSeriesResource {
56    /// Insert datapoints for a set of timeseries. Any existing datapoints with the
57    /// same timestamp will be overwritten.
58    ///
59    /// Note: datapoints are inserted using protobuf, this converts from a slightly more ergonomic type
60    /// to the protobuf types used directly in `insert_datapoints_proto`.
61    ///
62    /// For very performance intensive workloads, consider using `insert_datapoints_proto`
63    /// directly.
64    ///
65    /// # Arguments
66    ///
67    /// * `add_datapoints` - List of datapoint batches to insert.
68    pub async fn insert_datapoints(&self, add_datapoints: Vec<AddDatapoints>) -> Result<()> {
69        let request = DataPointInsertionRequest::from(add_datapoints);
70        self.insert_datapoints_proto(&request).await?;
71        Ok(())
72    }
73
74    /// Insert datapoints for a set of timeseries. Any existing datapoints with the
75    /// same timestamp will be overwritten.
76    ///
77    /// # Arguments
78    ///
79    /// * `add_datapoints` - Datapoint batches to insert.
80    pub async fn insert_datapoints_proto(
81        &self,
82        add_datapoints: &DataPointInsertionRequest,
83    ) -> Result<()> {
84        self.api_client
85            .post_protobuf::<::serde_json::Value, DataPointInsertionRequest>(
86                "timeseries/data",
87                add_datapoints,
88            )
89            .await?;
90        Ok(())
91    }
92
93    /// Insert datapoints for a set of time series, then create any missing time series.
94    ///
95    /// In order for this to work correctly, `generator` must return an iterator over time series
96    /// with the same length as the passed slice.
97    ///
98    /// # Arguments
99    ///
100    /// * `add_datapoints` - Datapoint batches to insert.
101    /// * `generator` - Method called to produce timeseries that does not exist.
102    ///
103    /// # Example
104    ///
105    /// ```ignore
106    /// client.time_series.insert_datapoints_proto_create_missing(
107    ///     &dps,
108    ///     |idts| idts.iter().map(|idt| AddTimeSeries {
109    ///         external_id: idt.as_external_id().unwrap(),
110    ///         ..Default::default()
111    ///     })
112    /// )
113    /// ```
114    pub async fn insert_datapoints_proto_create_missing<T: Iterator<Item = AddDmOrTimeSeries>>(
115        &self,
116        add_datapoints: &DataPointInsertionRequest,
117        generator: &impl Fn(&[IdentityOrInstance]) -> T,
118    ) -> Result<()> {
119        let result = self.insert_datapoints_proto(add_datapoints).await;
120        let missing = get_missing_from_result(&result);
121        let missing_idts = match missing {
122            Some(m) => m,
123            None => return result,
124        };
125        let (time_series, dm_time_series) =
126            generator(&missing_idts).fold((vec![], vec![]), |mut acc, v| {
127                match v {
128                    AddDmOrTimeSeries::TimeSeries(add_time_series) => acc.0.push(*add_time_series),
129                    AddDmOrTimeSeries::Cdm(cognite_timeseries) => acc.1.push(*cognite_timeseries),
130                }
131                acc
132            });
133        if !time_series.is_empty() {
134            let futures = time_series
135                .chunks(1000)
136                // Since we're discarding the output, don't collect it here.
137                .map(|c| self.create_ignore_duplicates(c).map(|r| r.map(|_| ())));
138            execute_with_parallelism(futures, 4).await?;
139        }
140        if !dm_time_series.is_empty() {
141            let instance_resource = Instances::new(self.api_client.clone());
142            let futures = dm_time_series.chunks(1000).map(|c| {
143                instance_resource
144                    .apply(c, None, None, None, None, false)
145                    .map(|r| r.map(|_| ()))
146            });
147            execute_with_parallelism(futures, 4).await?;
148        }
149
150        self.insert_datapoints_proto(add_datapoints).await
151    }
152
153    /// Insert datapoints for a set of time series, then create any missing time series.
154    ///
155    /// In order for this to work correctly, `generator` must return an iterator over time series
156    /// with the same length as the passed slice.
157    ///
158    /// # Arguments
159    ///
160    /// * `add_datapoints` - Datapoint batches to insert.
161    /// * `generator` - Method called to produce timeseries that does not exist.
162    ///
163    /// # Example
164    ///
165    /// ```ignore
166    /// client.time_series.insert_datapoints_create_missing(
167    ///     &dps,
168    ///     |idts| idts.iter().map(|idt| AddTimeSeries {
169    ///         external_id: idt.as_external_id().unwrap(),
170    ///         ..Default::default()
171    ///     })
172    /// )
173    /// ```
174    pub async fn insert_datapoints_create_missing<T: Iterator<Item = AddDmOrTimeSeries>>(
175        &self,
176        add_datapoints: Vec<AddDatapoints>,
177        generator: &impl Fn(&[IdentityOrInstance]) -> T,
178    ) -> Result<()> {
179        let request = DataPointInsertionRequest::from(add_datapoints);
180        self.insert_datapoints_proto_create_missing(&request, generator)
181            .await?;
182        Ok(())
183    }
184
185    /// Insert datapoints for a set of timeseries. If the request fails due to any
186    /// missing time series, remove them from the request and retry.
187    ///
188    /// # Arguments
189    ///
190    /// * `add_datapoints` - Datapoint batches to insert.
191    pub async fn insert_datapoints_proto_ignore_missing(
192        &self,
193        add_datapoints: &DataPointInsertionRequest,
194    ) -> Result<()> {
195        let result = self.insert_datapoints_proto(add_datapoints).await;
196        let missing = get_missing_from_result(&result);
197        let missing_idts = match missing {
198            Some(m) => m,
199            None => return result,
200        };
201        let idt_set = HashSet::<IdentityOrInstance>::from_iter(missing_idts.into_iter());
202
203        let mut items = vec![];
204        for elem in add_datapoints.items.iter() {
205            let idt = match &elem.time_series_reference {
206                Some(x) => IdentityOrInstance::from(x.clone()),
207                None => continue,
208            };
209            if !idt_set.contains(&idt) {
210                items.push(elem.clone());
211            }
212        }
213
214        if items.is_empty() {
215            return Ok(());
216        }
217
218        let next_request = DataPointInsertionRequest { items };
219        self.insert_datapoints_proto(&next_request).await
220    }
221
222    /// Insert datapoints for a set of timeseries. If the request fails due to any
223    /// missing time series, remove them from the request and retry.
224    ///
225    /// # Arguments
226    ///
227    /// * `add_datapoints` - Datapoint batches to insert.
228    pub async fn insert_datapoints_ignore_missing(
229        &self,
230        add_datapoints: Vec<AddDatapoints>,
231    ) -> Result<()> {
232        let request = DataPointInsertionRequest::from(add_datapoints);
233        self.insert_datapoints_proto_ignore_missing(&request)
234            .await?;
235        Ok(())
236    }
237
238    /// Retrieve datapoints for a collection of time series.
239    ///
240    /// Note: datapoints are inserted using protobuf, this converts to a slightly more ergonomic type
241    /// from the type returned by `retrieve_datapoints_proto`.
242    ///
243    /// For very performance intensive workloads, consider using `retrieve_datapoints_proto`
244    /// directly.
245    ///
246    /// # Arguments
247    ///
248    /// * `datapoints_filter` - Filter describing which datapoints to retrieve.
249    pub async fn retrieve_datapoints(
250        &self,
251        datapoints_filter: &DatapointsFilter,
252    ) -> Result<Vec<DatapointsResponse>> {
253        let datapoints_response = self.retrieve_datapoints_proto(datapoints_filter).await?;
254        Ok(DatapointsListResponse::from(datapoints_response).items)
255    }
256
257    /// Retrieve datapoints for a collection of time series.
258    ///
259    /// # Arguments
260    ///
261    /// * `datapoints_filter` - Filter describing which datapoints to retrieve.
262    pub async fn retrieve_datapoints_proto(
263        &self,
264        datapoints_filter: &DatapointsFilter,
265    ) -> Result<DataPointListResponse> {
266        let datapoints_response: DataPointListResponse = self
267            .api_client
268            .post_expect_protobuf("timeseries/data/list", &datapoints_filter)
269            .await?;
270        Ok(datapoints_response)
271    }
272
273    /// Retrieve the latest datapoint before a given time for a list of time series.
274    ///
275    /// # Arguments
276    ///
277    /// * `items` - Queries for latest datapoint.
278    /// * `ignore_unknown_ids` - Set this to `true` to ignore timeseries that do not exist.
279    pub async fn retrieve_latest_datapoints(
280        &self,
281        items: &[LatestDatapointsQuery],
282        ignore_unknown_ids: bool,
283    ) -> Result<Vec<LatestDatapointsResponse>> {
284        let query = Items::new_with_extra_fields(items, IgnoreUnknownIds { ignore_unknown_ids });
285        let datapoints_response: Items<Vec<LatestDatapointsResponse>> = self
286            .api_client
287            .post("timeseries/data/latest", &query)
288            .await?;
289        Ok(datapoints_response.items)
290    }
291
292    /// Delete ranges of datapoints for a list of time series.
293    ///
294    /// # Arguments
295    ///
296    /// * `query` - Ranges of datapoints to delete.
297    pub async fn delete_datapoints(&self, query: &[DeleteDatapointsQuery]) -> Result<()> {
298        let items = Items::new(query);
299        self.api_client
300            .post::<::serde_json::Value, _>("timeseries/data/delete", &items)
301            .await?;
302        Ok(())
303    }
304
305    /// Query synthetic time series. Synthetic time series lets you combine various input time series, constants,
306    /// and operators, to create completely new time series.
307    ///
308    /// See [synthetic timeseries](https://developer.cognite.com/dev/concepts/resource_types/synthetic_timeseries.html)
309    /// for more details.
310    ///
311    /// # Arguments
312    ///
313    /// * `query` - Synthetic datapoints queries.
314    pub async fn query_synthetic_timeseries(
315        &self,
316        query: &[SyntheticTimeSeriesQuery],
317    ) -> Result<Vec<SyntheticQueryResponse>> {
318        let res: ItemsVec<SyntheticQueryResponse> = self
319            .api_client
320            .post("timeseries/synthetic/query", &Items::new(query))
321            .await?;
322        Ok(res.items)
323    }
324
325    /// Stream datapoints for a list of timeseries. The datapoints are returned in ascending order,
326    /// but we do not guarantee anything on the order between timeseries.
327    ///
328    /// We batch for you, so the `items` array in `DatapointsFilter` can contain more than 100 entries,
329    /// but `batch_size` should not be set larger than 100.
330    ///
331    /// `parallelism` controls how many requests we have in-flight at any given time.
332    /// Avoid setting this too high, as it may lead to rate limiting, which will reduce the actual
333    /// throughput.
334    ///
335    /// # Arguments
336    ///
337    /// * `filter` - Filter describing common filter properties and a list of timeseries to retrieve data from.
338    /// * `options` - Options for controlling the stream.
339    pub fn stream_datapoints(
340        &self,
341        filter: DatapointsFilter,
342        options: DatapointsStreamOptions,
343    ) -> impl Stream<Item = Result<DataPointRef>> + '_ {
344        DatapointsStream::new(self, filter, options).stream_datapoints()
345    }
346
347    /// Stream datapoints for a list of timeseries. This returns raw batches of datapoints
348    /// as they arrive from CDF. Use [stream_datapoints](Self::stream_datapoints) if you want to
349    /// work with individual datapoints.
350    ///
351    /// We batch for you, so the `items` array in `DatapointsFilter` can contain more than 100 entries,
352    /// but `batch_size` should not be set larger than 100.
353    ///
354    /// `parallelism` controls how many requests we have in-flight at any given time.
355    /// Avoid setting this too high, as it may lead to rate limiting, which will reduce the actual
356    /// throughput.
357    ///
358    /// # Arguments
359    ///
360    /// * `filter` - Filter describing common filter properties and a list of timeseries to retrieve data from.
361    /// * `options` - Options for controlling the stream.
362    pub fn stream_datapoint_batches(
363        &self,
364        filter: DatapointsFilter,
365        options: DatapointsStreamOptions,
366    ) -> impl Stream<Item = Result<DataPointListResponse>> + '_ {
367        DatapointsStream::new(self, filter, options).stream_batches()
368    }
369}