//! cognite/api/core/time_series.rs
1mod datapoints_stream;
2
3use std::collections::HashSet;
4use std::iter::FromIterator;
5
6use futures::FutureExt;
7use futures::Stream;
8use serde::Serialize;
9
10use crate::api::core::time_series::datapoints_stream::DatapointsStream;
11use crate::api::data_modeling::instances::Instances;
12use crate::api::resource::*;
13use crate::dto::core::datapoint::*;
14use crate::dto::core::time_series::*;
15use crate::error::Result;
16use crate::get_missing_from_result;
17use crate::utils::execute_with_parallelism;
18use crate::IdentityList;
19use crate::IdentityOrInstance;
20use crate::IdentityOrInstanceList;
21use crate::IgnoreUnknownIds;
22use crate::Items;
23use crate::ItemsVec;
24use crate::Patch;
25
26pub use datapoints_stream::{DataPointRef, DatapointsStreamOptions, EitherDataPoint};
27
/// A time series consists of a sequence of data points connected to a single asset.
/// For example, a water pump asset can have a temperature time series that records a data point in
/// units of °C every second.
pub type TimeSeriesResource = Resource<TimeSeries>;
32
// All time series endpoints are rooted at the `timeseries` API path; the
// generic resource traits below build their URLs from this base.
impl WithBasePath for TimeSeriesResource {
    const BASE_PATH: &'static str = "timeseries";
}
36
// Opt `TimeSeriesResource` into the generic endpoint implementations provided
// by the resource traits: list, create, filter, search, retrieve, update and
// delete. Each empty impl pulls in the trait's default implementation.
impl List<TimeSeriesQuery, TimeSeries> for TimeSeriesResource {}
impl Create<AddTimeSeries, TimeSeries> for TimeSeriesResource {}
impl FilterItems<TimeSeriesFilter, TimeSeries> for TimeSeriesResource {}
impl FilterWithRequest<TimeSeriesFilterRequest, TimeSeries> for TimeSeriesResource {}
impl SearchItems<'_, TimeSeriesFilter, TimeSeriesSearch, TimeSeries> for TimeSeriesResource {}
// Retrieval accepts any identity-or-instance list type that serializes,
// so both classic identities and data-modeling instance ids work.
impl<R> RetrieveWithIgnoreUnknownIds<IdentityOrInstanceList<R>, TimeSeries> for TimeSeriesResource
where
    IdentityOrInstanceList<R>: Serialize,
    R: Send + Sync,
{
}
impl Update<Patch<PatchTimeSeries>, TimeSeries> for TimeSeriesResource {}
impl<R> DeleteWithIgnoreUnknownIds<IdentityList<R>> for TimeSeriesResource
where
    IdentityList<R>: Serialize,
    R: Send + Sync,
{
}
55impl TimeSeriesResource {
56 /// Insert datapoints for a set of timeseries. Any existing datapoints with the
57 /// same timestamp will be overwritten.
58 ///
59 /// Note: datapoints are inserted using protobuf, this converts from a slightly more ergonomic type
60 /// to the protobuf types used directly in `insert_datapoints_proto`.
61 ///
62 /// For very performance intensive workloads, consider using `insert_datapoints_proto`
63 /// directly.
64 ///
65 /// # Arguments
66 ///
67 /// * `add_datapoints` - List of datapoint batches to insert.
68 pub async fn insert_datapoints(&self, add_datapoints: Vec<AddDatapoints>) -> Result<()> {
69 let request = DataPointInsertionRequest::from(add_datapoints);
70 self.insert_datapoints_proto(&request).await?;
71 Ok(())
72 }
73
74 /// Insert datapoints for a set of timeseries. Any existing datapoints with the
75 /// same timestamp will be overwritten.
76 ///
77 /// # Arguments
78 ///
79 /// * `add_datapoints` - Datapoint batches to insert.
80 pub async fn insert_datapoints_proto(
81 &self,
82 add_datapoints: &DataPointInsertionRequest,
83 ) -> Result<()> {
84 self.api_client
85 .post_protobuf::<::serde_json::Value, DataPointInsertionRequest>(
86 "timeseries/data",
87 add_datapoints,
88 )
89 .await?;
90 Ok(())
91 }
92
93 /// Insert datapoints for a set of time series, then create any missing time series.
94 ///
95 /// In order for this to work correctly, `generator` must return an iterator over time series
96 /// with the same length as the passed slice.
97 ///
98 /// # Arguments
99 ///
100 /// * `add_datapoints` - Datapoint batches to insert.
101 /// * `generator` - Method called to produce timeseries that does not exist.
102 ///
103 /// # Example
104 ///
105 /// ```ignore
106 /// client.time_series.insert_datapoints_proto_create_missing(
107 /// &dps,
108 /// |idts| idts.iter().map(|idt| AddTimeSeries {
109 /// external_id: idt.as_external_id().unwrap(),
110 /// ..Default::default()
111 /// })
112 /// )
113 /// ```
114 pub async fn insert_datapoints_proto_create_missing<T: Iterator<Item = AddDmOrTimeSeries>>(
115 &self,
116 add_datapoints: &DataPointInsertionRequest,
117 generator: &impl Fn(&[IdentityOrInstance]) -> T,
118 ) -> Result<()> {
119 let result = self.insert_datapoints_proto(add_datapoints).await;
120 let missing = get_missing_from_result(&result);
121 let missing_idts = match missing {
122 Some(m) => m,
123 None => return result,
124 };
125 let (time_series, dm_time_series) =
126 generator(&missing_idts).fold((vec![], vec![]), |mut acc, v| {
127 match v {
128 AddDmOrTimeSeries::TimeSeries(add_time_series) => acc.0.push(*add_time_series),
129 AddDmOrTimeSeries::Cdm(cognite_timeseries) => acc.1.push(*cognite_timeseries),
130 }
131 acc
132 });
133 if !time_series.is_empty() {
134 let futures = time_series
135 .chunks(1000)
136 // Since we're discarding the output, don't collect it here.
137 .map(|c| self.create_ignore_duplicates(c).map(|r| r.map(|_| ())));
138 execute_with_parallelism(futures, 4).await?;
139 }
140 if !dm_time_series.is_empty() {
141 let instance_resource = Instances::new(self.api_client.clone());
142 let futures = dm_time_series.chunks(1000).map(|c| {
143 instance_resource
144 .apply(c, None, None, None, None, false)
145 .map(|r| r.map(|_| ()))
146 });
147 execute_with_parallelism(futures, 4).await?;
148 }
149
150 self.insert_datapoints_proto(add_datapoints).await
151 }
152
153 /// Insert datapoints for a set of time series, then create any missing time series.
154 ///
155 /// In order for this to work correctly, `generator` must return an iterator over time series
156 /// with the same length as the passed slice.
157 ///
158 /// # Arguments
159 ///
160 /// * `add_datapoints` - Datapoint batches to insert.
161 /// * `generator` - Method called to produce timeseries that does not exist.
162 ///
163 /// # Example
164 ///
165 /// ```ignore
166 /// client.time_series.insert_datapoints_create_missing(
167 /// &dps,
168 /// |idts| idts.iter().map(|idt| AddTimeSeries {
169 /// external_id: idt.as_external_id().unwrap(),
170 /// ..Default::default()
171 /// })
172 /// )
173 /// ```
174 pub async fn insert_datapoints_create_missing<T: Iterator<Item = AddDmOrTimeSeries>>(
175 &self,
176 add_datapoints: Vec<AddDatapoints>,
177 generator: &impl Fn(&[IdentityOrInstance]) -> T,
178 ) -> Result<()> {
179 let request = DataPointInsertionRequest::from(add_datapoints);
180 self.insert_datapoints_proto_create_missing(&request, generator)
181 .await?;
182 Ok(())
183 }
184
185 /// Insert datapoints for a set of timeseries. If the request fails due to any
186 /// missing time series, remove them from the request and retry.
187 ///
188 /// # Arguments
189 ///
190 /// * `add_datapoints` - Datapoint batches to insert.
191 pub async fn insert_datapoints_proto_ignore_missing(
192 &self,
193 add_datapoints: &DataPointInsertionRequest,
194 ) -> Result<()> {
195 let result = self.insert_datapoints_proto(add_datapoints).await;
196 let missing = get_missing_from_result(&result);
197 let missing_idts = match missing {
198 Some(m) => m,
199 None => return result,
200 };
201 let idt_set = HashSet::<IdentityOrInstance>::from_iter(missing_idts.into_iter());
202
203 let mut items = vec![];
204 for elem in add_datapoints.items.iter() {
205 let idt = match &elem.time_series_reference {
206 Some(x) => IdentityOrInstance::from(x.clone()),
207 None => continue,
208 };
209 if !idt_set.contains(&idt) {
210 items.push(elem.clone());
211 }
212 }
213
214 if items.is_empty() {
215 return Ok(());
216 }
217
218 let next_request = DataPointInsertionRequest { items };
219 self.insert_datapoints_proto(&next_request).await
220 }
221
222 /// Insert datapoints for a set of timeseries. If the request fails due to any
223 /// missing time series, remove them from the request and retry.
224 ///
225 /// # Arguments
226 ///
227 /// * `add_datapoints` - Datapoint batches to insert.
228 pub async fn insert_datapoints_ignore_missing(
229 &self,
230 add_datapoints: Vec<AddDatapoints>,
231 ) -> Result<()> {
232 let request = DataPointInsertionRequest::from(add_datapoints);
233 self.insert_datapoints_proto_ignore_missing(&request)
234 .await?;
235 Ok(())
236 }
237
238 /// Retrieve datapoints for a collection of time series.
239 ///
240 /// Note: datapoints are inserted using protobuf, this converts to a slightly more ergonomic type
241 /// from the type returned by `retrieve_datapoints_proto`.
242 ///
243 /// For very performance intensive workloads, consider using `retrieve_datapoints_proto`
244 /// directly.
245 ///
246 /// # Arguments
247 ///
248 /// * `datapoints_filter` - Filter describing which datapoints to retrieve.
249 pub async fn retrieve_datapoints(
250 &self,
251 datapoints_filter: &DatapointsFilter,
252 ) -> Result<Vec<DatapointsResponse>> {
253 let datapoints_response = self.retrieve_datapoints_proto(datapoints_filter).await?;
254 Ok(DatapointsListResponse::from(datapoints_response).items)
255 }
256
257 /// Retrieve datapoints for a collection of time series.
258 ///
259 /// # Arguments
260 ///
261 /// * `datapoints_filter` - Filter describing which datapoints to retrieve.
262 pub async fn retrieve_datapoints_proto(
263 &self,
264 datapoints_filter: &DatapointsFilter,
265 ) -> Result<DataPointListResponse> {
266 let datapoints_response: DataPointListResponse = self
267 .api_client
268 .post_expect_protobuf("timeseries/data/list", &datapoints_filter)
269 .await?;
270 Ok(datapoints_response)
271 }
272
273 /// Retrieve the latest datapoint before a given time for a list of time series.
274 ///
275 /// # Arguments
276 ///
277 /// * `items` - Queries for latest datapoint.
278 /// * `ignore_unknown_ids` - Set this to `true` to ignore timeseries that do not exist.
279 pub async fn retrieve_latest_datapoints(
280 &self,
281 items: &[LatestDatapointsQuery],
282 ignore_unknown_ids: bool,
283 ) -> Result<Vec<LatestDatapointsResponse>> {
284 let query = Items::new_with_extra_fields(items, IgnoreUnknownIds { ignore_unknown_ids });
285 let datapoints_response: Items<Vec<LatestDatapointsResponse>> = self
286 .api_client
287 .post("timeseries/data/latest", &query)
288 .await?;
289 Ok(datapoints_response.items)
290 }
291
292 /// Delete ranges of datapoints for a list of time series.
293 ///
294 /// # Arguments
295 ///
296 /// * `query` - Ranges of datapoints to delete.
297 pub async fn delete_datapoints(&self, query: &[DeleteDatapointsQuery]) -> Result<()> {
298 let items = Items::new(query);
299 self.api_client
300 .post::<::serde_json::Value, _>("timeseries/data/delete", &items)
301 .await?;
302 Ok(())
303 }
304
305 /// Query synthetic time series. Synthetic time series lets you combine various input time series, constants,
306 /// and operators, to create completely new time series.
307 ///
308 /// See [synthetic timeseries](https://developer.cognite.com/dev/concepts/resource_types/synthetic_timeseries.html)
309 /// for more details.
310 ///
311 /// # Arguments
312 ///
313 /// * `query` - Synthetic datapoints queries.
314 pub async fn query_synthetic_timeseries(
315 &self,
316 query: &[SyntheticTimeSeriesQuery],
317 ) -> Result<Vec<SyntheticQueryResponse>> {
318 let res: ItemsVec<SyntheticQueryResponse> = self
319 .api_client
320 .post("timeseries/synthetic/query", &Items::new(query))
321 .await?;
322 Ok(res.items)
323 }
324
325 /// Stream datapoints for a list of timeseries. The datapoints are returned in ascending order,
326 /// but we do not guarantee anything on the order between timeseries.
327 ///
328 /// We batch for you, so the `items` array in `DatapointsFilter` can contain more than 100 entries,
329 /// but `batch_size` should not be set larger than 100.
330 ///
331 /// `parallelism` controls how many requests we have in-flight at any given time.
332 /// Avoid setting this too high, as it may lead to rate limiting, which will reduce the actual
333 /// throughput.
334 ///
335 /// # Arguments
336 ///
337 /// * `filter` - Filter describing common filter properties and a list of timeseries to retrieve data from.
338 /// * `options` - Options for controlling the stream.
339 pub fn stream_datapoints(
340 &self,
341 filter: DatapointsFilter,
342 options: DatapointsStreamOptions,
343 ) -> impl Stream<Item = Result<DataPointRef>> + '_ {
344 DatapointsStream::new(self, filter, options).stream_datapoints()
345 }
346
347 /// Stream datapoints for a list of timeseries. This returns raw batches of datapoints
348 /// as they arrive from CDF. Use [stream_datapoints](Self::stream_datapoints) if you want to
349 /// work with individual datapoints.
350 ///
351 /// We batch for you, so the `items` array in `DatapointsFilter` can contain more than 100 entries,
352 /// but `batch_size` should not be set larger than 100.
353 ///
354 /// `parallelism` controls how many requests we have in-flight at any given time.
355 /// Avoid setting this too high, as it may lead to rate limiting, which will reduce the actual
356 /// throughput.
357 ///
358 /// # Arguments
359 ///
360 /// * `filter` - Filter describing common filter properties and a list of timeseries to retrieve data from.
361 /// * `options` - Options for controlling the stream.
362 pub fn stream_datapoint_batches(
363 &self,
364 filter: DatapointsFilter,
365 options: DatapointsStreamOptions,
366 ) -> impl Stream<Item = Result<DataPointListResponse>> + '_ {
367 DatapointsStream::new(self, filter, options).stream_batches()
368 }
369}