cognite/api/core/time_series/
datapoints_stream.rs1use std::{
2 collections::{HashMap, VecDeque},
3 pin::Pin,
4 sync::Arc,
5};
6
7use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt, TryStream};
8use pin_project::pin_project;
9
10use crate::{
11 time_series::{
12 DataPointListItem, DataPointListResponse, DatapointAggregate, DatapointDouble,
13 DatapointString, DatapointsFilter, DatapointsQuery, InstanceId, ListDatapointType,
14 TimeSeriesResource,
15 },
16 Identity, IdentityOrInstance,
17};
18
19pub enum EitherDataPoint {
21 Numeric(DatapointDouble),
23 String(DatapointString),
25 Aggregate(DatapointAggregate),
27}
28
29struct TimeSeriesRef {
30 id: i64,
31 external_id: Option<String>,
32 instance_id: Option<InstanceId>,
33 original_id: IdentityOrInstance,
34 is_string: bool,
35 is_step: bool,
36 unit: Option<String>,
37 unit_external_id: Option<String>,
38}
39
40pub struct DataPointRef {
43 timeseries: Arc<TimeSeriesRef>,
46 datapoint: EitherDataPoint,
47}
48
49impl DataPointRef {
50 pub fn id(&self) -> i64 {
52 self.timeseries.id
53 }
54
55 pub fn external_id(&self) -> Option<&str> {
57 self.timeseries.external_id.as_deref()
58 }
59
60 pub fn instance_id(&self) -> Option<&InstanceId> {
62 self.timeseries.instance_id.as_ref()
63 }
64
65 pub fn original_id(&self) -> &IdentityOrInstance {
67 &self.timeseries.original_id
68 }
69
70 pub fn is_string(&self) -> bool {
72 self.timeseries.is_string
73 }
74
75 pub fn is_step(&self) -> bool {
77 self.timeseries.is_step
78 }
79
80 pub fn unit(&self) -> Option<&str> {
82 self.timeseries.unit.as_deref()
83 }
84
85 pub fn unit_external_id(&self) -> Option<&str> {
87 self.timeseries.unit_external_id.as_deref()
88 }
89
90 pub fn into_datapoint(self) -> EitherDataPoint {
92 self.datapoint
93 }
94
95 pub fn datapoint(&self) -> &EitherDataPoint {
97 &self.datapoint
98 }
99
100 pub fn as_numeric(&self) -> Option<&DatapointDouble> {
102 match &self.datapoint {
103 EitherDataPoint::Numeric(dp) => Some(dp),
104 _ => None,
105 }
106 }
107
108 pub fn as_string(&self) -> Option<&DatapointString> {
110 match &self.datapoint {
111 EitherDataPoint::String(dp) => Some(dp),
112 _ => None,
113 }
114 }
115
116 pub fn as_aggregate(&self) -> Option<&DatapointAggregate> {
118 match &self.datapoint {
119 EitherDataPoint::Aggregate(dp) => Some(dp),
120 _ => None,
121 }
122 }
123}
124
125struct FetchResult {
126 query_items: Vec<DatapointsQuery>,
127 response: DataPointListResponse,
128}
129
/// Tuning knobs for streaming datapoints.
#[derive(Clone, Debug)]
pub struct DatapointsStreamOptions {
    /// Maximum number of queries placed in a single request.
    pub batch_size: usize,
    /// Maximum number of requests kept in flight at the same time.
    pub parallelism: usize,
}

impl Default for DatapointsStreamOptions {
    /// Returns the default options: batches of 100 queries, 4 requests in flight.
    fn default() -> Self {
        let batch_size = 100;
        let parallelism = 4;
        DatapointsStreamOptions {
            batch_size,
            parallelism,
        }
    }
}
147
148pub(super) struct DatapointsStream<'a> {
149 timeseries: &'a TimeSeriesResource,
150 filter: DatapointsFilter,
151 queries: VecDeque<DatapointsQuery>,
152 options: DatapointsStreamOptions,
153 known_timeseries: HashMap<i64, Arc<TimeSeriesRef>>,
154 futures: FuturesUnordered<BoxFuture<'a, Result<FetchResult, crate::Error>>>,
158}
159
160impl<'a> DatapointsStream<'a> {
161 pub(super) fn new(
162 timeseries: &'a TimeSeriesResource,
163 mut filter: DatapointsFilter,
164 options: DatapointsStreamOptions,
165 ) -> Self {
166 Self {
167 timeseries,
168 queries: std::mem::take(&mut filter.items).into(),
169 filter,
170 options,
171 known_timeseries: HashMap::new(),
172 futures: FuturesUnordered::new(),
173 }
174 }
175
176 async fn fetch_batch(
177 timeseries: &'a TimeSeriesResource,
178 filter: DatapointsFilter,
179 ) -> Result<FetchResult, crate::Error> {
180 let response = timeseries.retrieve_datapoints_proto(&filter).await?;
181 Ok(FetchResult {
182 query_items: filter.items,
183 response,
184 })
185 }
186
187 fn update_known_timeseries_from_batch(
188 &mut self,
189 response: &DataPointListItem,
190 query: &DatapointsQuery,
191 ) {
192 if self.known_timeseries.contains_key(&response.id) {
194 return;
195 }
196
197 self.known_timeseries.insert(
198 response.id,
199 Arc::new(TimeSeriesRef {
200 id: response.id,
201 external_id: if !response.external_id.is_empty() {
202 Some(response.external_id.clone())
203 } else {
204 None
205 },
206 instance_id: response.instance_id.clone(),
207 original_id: query.id.clone(),
208 is_string: response.is_string,
209 is_step: response.is_step,
210 unit: if !response.unit.is_empty() {
211 Some(response.unit.clone())
212 } else {
213 None
214 },
215 unit_external_id: if !response.unit_external_id.is_empty() {
216 Some(response.unit_external_id.clone())
217 } else {
218 None
219 },
220 }),
221 );
222 }
223
224 fn equals_identity(id: &IdentityOrInstance, response_item: &DataPointListItem) -> bool {
225 match id {
226 IdentityOrInstance::Identity(Identity::Id { id }) => response_item.id == *id,
227 IdentityOrInstance::Identity(Identity::ExternalId { external_id }) => {
228 response_item.external_id == *external_id
229 }
230 IdentityOrInstance::InstanceId { instance_id } => response_item
231 .instance_id
232 .as_ref()
233 .is_some_and(|i| i == instance_id),
234 }
235 }
236
237 async fn stream_batches_inner(
238 &mut self,
239 maintain_internal_state: bool,
240 ) -> Result<Option<DataPointListResponse>, crate::Error> {
241 while self.futures.len() < self.options.parallelism && !self.queries.is_empty() {
243 let mut batch = Vec::with_capacity(self.options.batch_size.min(self.queries.len()));
244 while batch.len() < self.options.batch_size {
245 if let Some(query) = self.queries.pop_front() {
246 batch.push(query);
247 } else {
248 break;
249 }
250 }
251 let filter = DatapointsFilter {
252 items: batch,
253 ..self.filter.clone()
254 };
255 let timeseries = self.timeseries;
256 self.futures
257 .push(Box::pin(Self::fetch_batch(timeseries, filter)));
258 }
259
260 let Some(result) = self.futures.next().await else {
262 return Ok(None);
264 };
265 let mut fetch_result = result?;
266 let mut query_iter = fetch_result.query_items.into_iter();
267
268 for response_item in &mut fetch_result.response.items {
270 let Some(mut query) = query_iter.next() else {
273 return Err(crate::Error::Other(
274 "Internal logic error: more response items than query items".to_string(),
275 ));
276 };
277
278 while !Self::equals_identity(&query.id, response_item) {
279 let Some(next_query) = query_iter.next() else {
282 return Err(crate::Error::Other(
283 "Internal logic error: response item does not match any query item"
284 .to_string(),
285 ));
286 };
287 query = next_query;
288 }
289
290 if maintain_internal_state {
293 self.update_known_timeseries_from_batch(response_item, &query);
294 }
295
296 if !response_item.next_cursor.is_empty() {
297 query.cursor = Some(std::mem::take(&mut response_item.next_cursor));
300 self.queries.push_back(query);
301 }
302 }
303
304 Ok(Some(fetch_result.response))
305 }
306
307 pub fn stream_batches(
308 self,
309 ) -> impl Stream<Item = Result<DataPointListResponse, crate::Error>> + 'a {
310 futures::stream::try_unfold(self, move |mut state| async move {
311 Ok(state.stream_batches_inner(false).await?.map(|v| (v, state)))
312 })
313 }
314
315 pub fn stream_datapoints(self) -> impl Stream<Item = Result<DataPointRef, crate::Error>> + 'a {
316 FlatIterStream::new(futures::stream::try_unfold(
317 self,
318 move |mut state| async move {
319 let Some(batch) = state.stream_batches_inner(true).await? else {
320 return Ok(None);
321 };
322 let mut res = Vec::new();
323
324 for item in batch.items {
325 let timeseries = state
326 .known_timeseries
327 .get(&item.id)
328 .ok_or_else(|| {
329 crate::Error::Other(format!(
330 "Internal logic error: timeseries with id {} not found in known_timeseries",
331 item.id
332 ))
333 })?
334 .clone();
335 match item.datapoint_type {
336 None => continue,
337 Some(ListDatapointType::AggregateDatapoints(dps)) => {
338 res.extend(dps.datapoints.into_iter().map(move |dp| DataPointRef {
339 timeseries: timeseries.clone(),
340 datapoint: EitherDataPoint::Aggregate(dp.into()),
341 }));
342 }
343 Some(ListDatapointType::StringDatapoints(dps)) => {
344 res.extend(dps.datapoints.into_iter().map(move |dp| DataPointRef {
345 timeseries: timeseries.clone(),
346 datapoint: EitherDataPoint::String(dp.into()),
347 }));
348 }
349 Some(ListDatapointType::NumericDatapoints(dps)) => {
350 res.extend(dps.datapoints.into_iter().map(move |dp| DataPointRef {
351 timeseries: timeseries.clone(),
352 datapoint: EitherDataPoint::Numeric(dp.into()),
353 }));
354 }
355 }
356 }
357
358 Ok(Some((res.into_iter(), state)))
359 },
360 ))
361 }
362}
363
364#[pin_project]
365struct FlatIterStream<R>
367where
368 R: TryStream,
369 R::Ok: IntoIterator,
370{
371 #[pin]
372 inner: R,
373 current: Option<<R::Ok as IntoIterator>::IntoIter>,
374}
375
376impl<R> FlatIterStream<R>
377where
378 R: TryStream,
379 R::Ok: IntoIterator,
380{
381 fn new(stream: R) -> Self {
382 Self {
383 inner: stream,
384 current: None,
385 }
386 }
387}
388
389impl<R: TryStream> Stream for FlatIterStream<R>
390where
391 R: TryStream,
392 R::Ok: IntoIterator,
393{
394 type Item = Result<<R::Ok as IntoIterator>::Item, R::Error>;
395
396 fn poll_next(
397 self: Pin<&mut Self>,
398 cx: &mut std::task::Context<'_>,
399 ) -> std::task::Poll<Option<Self::Item>> {
400 let mut this = self.project();
401 loop {
402 if let Some(current) = this.current.as_mut() {
403 if let Some(item) = current.next() {
404 return std::task::Poll::Ready(Some(Ok(item)));
405 } else {
406 *this.current = None;
407 }
408 }
409 match this.inner.as_mut().try_poll_next(cx)? {
410 std::task::Poll::Ready(Some(next_iter)) => {
411 *this.current = Some(next_iter.into_iter());
412 }
413 std::task::Poll::Ready(None) => return std::task::Poll::Ready(None),
414 std::task::Poll::Pending => return std::task::Poll::Pending,
415 }
416 }
417 }
418}
419
#[cfg(test)]
mod tests {
    use crate::{
        api::core::time_series::datapoints_stream::FlatIterStream, time_series::StatusCode,
    };

    /// The accessors on `DataPointRef` expose the shared time series metadata
    /// and the wrapped datapoint.
    #[test]
    fn test_datapoint_ref() {
        use super::*;

        let series = Arc::new(TimeSeriesRef {
            id: 42,
            external_id: Some("ts1".to_string()),
            instance_id: None,
            original_id: IdentityOrInstance::from("ts1"),
            is_string: false,
            is_step: false,
            unit: Some("°C".to_string()),
            unit_external_id: None,
        });
        let point = DataPointRef {
            timeseries: Arc::clone(&series),
            datapoint: EitherDataPoint::Numeric(DatapointDouble {
                timestamp: 1625079600000,
                value: Some(23.5),
                status: Some(StatusCode::Good),
            }),
        };

        assert_eq!(point.id(), 42);
        assert_eq!(point.external_id(), Some("ts1"));
        assert!(!point.is_string());
        assert_eq!(point.unit(), Some("°C"));

        let EitherDataPoint::Numeric(numeric) = point.datapoint() else {
            panic!("Expected numeric datapoint");
        };
        assert_eq!(numeric.value, Some(23.5));
    }

    /// `FlatIterStream` yields the items of every batch, in order.
    #[test]
    fn test_flat_iter_stream() {
        use futures::stream;
        use futures::StreamExt;

        let batches = stream::iter(vec![
            Ok::<_, crate::Error>(vec![1, 2, 3]),
            Ok(vec![4, 5]),
            Ok(vec![6]),
        ]);
        let flattened = FlatIterStream::new(batches);
        let collected: Vec<_> =
            futures::executor::block_on(flattened.map(|item| item.unwrap()).collect());

        assert_eq!(collected, vec![1, 2, 3, 4, 5, 6]);
    }
}