// cognite/api/core/time_series/datapoints_stream.rs

1use std::{
2    collections::{HashMap, VecDeque},
3    pin::Pin,
4    sync::Arc,
5};
6
7use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt, TryStream};
8use pin_project::pin_project;
9
10use crate::{
11    time_series::{
12        DataPointListItem, DataPointListResponse, DatapointAggregate, DatapointDouble,
13        DatapointString, DatapointsFilter, DatapointsQuery, InstanceId, ListDatapointType,
14        TimeSeriesResource,
15    },
16    Identity, IdentityOrInstance,
17};
18
/// A datapoint of either type.
///
/// Datapoint responses can contain numeric, string, or aggregate datapoints
/// depending on the timeseries type and whether aggregates were requested;
/// this enum lets a single stream carry all three.
pub enum EitherDataPoint {
    /// A numeric datapoint.
    Numeric(DatapointDouble),
    /// A string datapoint.
    String(DatapointString),
    /// An aggregate datapoint.
    Aggregate(DatapointAggregate),
}
28
/// Cached metadata about one timeseries, shared via `Arc` between every
/// datapoint returned for it (see [`DataPointRef`]).
struct TimeSeriesRef {
    /// Internal (server-assigned) ID of the timeseries.
    id: i64,
    /// External ID, if the timeseries has one.
    external_id: Option<String>,
    /// Data modelling instance ID, if the timeseries has one.
    instance_id: Option<InstanceId>,
    /// The identity the caller used to request this timeseries.
    original_id: IdentityOrInstance,
    /// Whether this is a string timeseries.
    is_string: bool,
    /// Whether this is a step timeseries.
    is_step: bool,
    /// Physical unit, if set.
    unit: Option<String>,
    /// External ID of the unit, if set.
    unit_external_id: Option<String>,
}
39
/// A datapoint containing a reference to its timeseries metadata.
/// Used in streaming responses to avoid cloning timeseries info for every datapoint.
pub struct DataPointRef {
    // This is an Arc to avoid cloning the timeseries information for every datapoint,
    // which can be a considerable amount of data for larger requests.
    timeseries: Arc<TimeSeriesRef>,
    // The actual datapoint value; access via `datapoint()`, `into_datapoint()`,
    // or the typed `as_*` accessors.
    datapoint: EitherDataPoint,
}
48
49impl DataPointRef {
50    /// Get the internal ID of the timeseries this datapoint belongs to.
51    pub fn id(&self) -> i64 {
52        self.timeseries.id
53    }
54
55    /// Get the external ID of the timeseries this datapoint belongs to, if it has one.
56    pub fn external_id(&self) -> Option<&str> {
57        self.timeseries.external_id.as_deref()
58    }
59
60    /// Get the data modelling instance ID of the timeseries this datapoint belongs to, if it has one.
61    pub fn instance_id(&self) -> Option<&InstanceId> {
62        self.timeseries.instance_id.as_ref()
63    }
64
65    /// Get the original ID used to identify the timeseries this datapoint belongs to in the request.
66    pub fn original_id(&self) -> &IdentityOrInstance {
67        &self.timeseries.original_id
68    }
69
70    /// Check if the timeseries this datapoint belongs to is of string type.
71    pub fn is_string(&self) -> bool {
72        self.timeseries.is_string
73    }
74
75    /// Check if the timeseries this datapoint belongs to is a step timeseries.
76    pub fn is_step(&self) -> bool {
77        self.timeseries.is_step
78    }
79
80    /// Get the unit of the timeseries this datapoint belongs to, if it has one.
81    pub fn unit(&self) -> Option<&str> {
82        self.timeseries.unit.as_deref()
83    }
84
85    /// Get the external ID of the unit of the timeseries this datapoint belongs to, if it has one.
86    pub fn unit_external_id(&self) -> Option<&str> {
87        self.timeseries.unit_external_id.as_deref()
88    }
89
90    /// Consume the reference and return the underlying datapoint, to avoid cloning.
91    pub fn into_datapoint(self) -> EitherDataPoint {
92        self.datapoint
93    }
94
95    /// Get a reference to the underlying datapoint.
96    pub fn datapoint(&self) -> &EitherDataPoint {
97        &self.datapoint
98    }
99
100    /// Get a reference to the underlying datapoint as numeric, if it is of that type.
101    pub fn as_numeric(&self) -> Option<&DatapointDouble> {
102        match &self.datapoint {
103            EitherDataPoint::Numeric(dp) => Some(dp),
104            _ => None,
105        }
106    }
107
108    /// Get a reference to the underlying datapoint as string, if it is of that type.
109    pub fn as_string(&self) -> Option<&DatapointString> {
110        match &self.datapoint {
111            EitherDataPoint::String(dp) => Some(dp),
112            _ => None,
113        }
114    }
115
116    /// Get a reference to the underlying datapoint as aggregate, if it is of that type.
117    pub fn as_aggregate(&self) -> Option<&DatapointAggregate> {
118        match &self.datapoint {
119            EitherDataPoint::Aggregate(dp) => Some(dp),
120            _ => None,
121        }
122    }
123}
124
/// Result of one datapoints request: the response paired with the query items
/// that produced it, so continuation cursors can be matched back to queries.
struct FetchResult {
    /// The queries sent in this request, in request order.
    query_items: Vec<DatapointsQuery>,
    /// The raw protobuf response for those queries.
    response: DataPointListResponse,
}
129
/// Options for configuring the behavior of a `DatapointsStream`.
#[derive(Clone, Debug)]
pub struct DatapointsStreamOptions {
    /// The maximum number of timeseries to include in each request. Default is 100.
    pub batch_size: usize,
    /// The maximum number of requests to have in flight at any given time. Default is 4.
    pub parallelism: usize,
}
138
139impl Default for DatapointsStreamOptions {
140    fn default() -> Self {
141        Self {
142            batch_size: 100,
143            parallelism: 4,
144        }
145    }
146}
147
/// Driver for streaming datapoints from a set of queries, with batching,
/// bounded request parallelism, and automatic cursor-based pagination.
pub(super) struct DatapointsStream<'a> {
    /// Resource used to make the actual API requests.
    timeseries: &'a TimeSeriesResource,
    /// Filter applied to every request. Its `items` are drained into `queries`
    /// on construction; per-request batches are filled in from there.
    filter: DatapointsFilter,
    /// Queries not yet in flight, including queries re-queued with a continuation cursor.
    queries: VecDeque<DatapointsQuery>,
    /// Batch size and parallelism configuration.
    options: DatapointsStreamOptions,
    /// Metadata for each timeseries seen so far, keyed by internal ID.
    /// Only populated when streaming individual datapoints.
    known_timeseries: HashMap<i64, Arc<TimeSeriesRef>>,
    // Technically, if we had existential types, we could avoid the box here.
    // In practice it really doesn't matter, the overhead of a network request is much larger
    // than anything from boxing.
    futures: FuturesUnordered<BoxFuture<'a, Result<FetchResult, crate::Error>>>,
}
159
160impl<'a> DatapointsStream<'a> {
161    pub(super) fn new(
162        timeseries: &'a TimeSeriesResource,
163        mut filter: DatapointsFilter,
164        options: DatapointsStreamOptions,
165    ) -> Self {
166        Self {
167            timeseries,
168            queries: std::mem::take(&mut filter.items).into(),
169            filter,
170            options,
171            known_timeseries: HashMap::new(),
172            futures: FuturesUnordered::new(),
173        }
174    }
175
176    async fn fetch_batch(
177        timeseries: &'a TimeSeriesResource,
178        filter: DatapointsFilter,
179    ) -> Result<FetchResult, crate::Error> {
180        let response = timeseries.retrieve_datapoints_proto(&filter).await?;
181        Ok(FetchResult {
182            query_items: filter.items,
183            response,
184        })
185    }
186
187    fn update_known_timeseries_from_batch(
188        &mut self,
189        response: &DataPointListItem,
190        query: &DatapointsQuery,
191    ) {
192        // We've already seen this timeseries, nothing to do.
193        if self.known_timeseries.contains_key(&response.id) {
194            return;
195        }
196
197        self.known_timeseries.insert(
198            response.id,
199            Arc::new(TimeSeriesRef {
200                id: response.id,
201                external_id: if !response.external_id.is_empty() {
202                    Some(response.external_id.clone())
203                } else {
204                    None
205                },
206                instance_id: response.instance_id.clone(),
207                original_id: query.id.clone(),
208                is_string: response.is_string,
209                is_step: response.is_step,
210                unit: if !response.unit.is_empty() {
211                    Some(response.unit.clone())
212                } else {
213                    None
214                },
215                unit_external_id: if !response.unit_external_id.is_empty() {
216                    Some(response.unit_external_id.clone())
217                } else {
218                    None
219                },
220            }),
221        );
222    }
223
224    fn equals_identity(id: &IdentityOrInstance, response_item: &DataPointListItem) -> bool {
225        match id {
226            IdentityOrInstance::Identity(Identity::Id { id }) => response_item.id == *id,
227            IdentityOrInstance::Identity(Identity::ExternalId { external_id }) => {
228                response_item.external_id == *external_id
229            }
230            IdentityOrInstance::InstanceId { instance_id } => response_item
231                .instance_id
232                .as_ref()
233                .is_some_and(|i| i == instance_id),
234        }
235    }
236
237    async fn stream_batches_inner(
238        &mut self,
239        maintain_internal_state: bool,
240    ) -> Result<Option<DataPointListResponse>, crate::Error> {
241        // If there's room for more requests, spawn them immediately.
242        while self.futures.len() < self.options.parallelism && !self.queries.is_empty() {
243            let mut batch = Vec::with_capacity(self.options.batch_size.min(self.queries.len()));
244            while batch.len() < self.options.batch_size {
245                if let Some(query) = self.queries.pop_front() {
246                    batch.push(query);
247                } else {
248                    break;
249                }
250            }
251            let filter = DatapointsFilter {
252                items: batch,
253                ..self.filter.clone()
254            };
255            let timeseries = self.timeseries;
256            self.futures
257                .push(Box::pin(Self::fetch_batch(timeseries, filter)));
258        }
259
260        // Wait for the next request to complete.
261        let Some(result) = self.futures.next().await else {
262            // No more requests in flight, we're done.
263            return Ok(None);
264        };
265        let mut fetch_result = result?;
266        let mut query_iter = fetch_result.query_items.into_iter();
267
268        // Update queries from the result, then re-queue them.
269        for response_item in &mut fetch_result.response.items {
270            // Datapoints are returned in order, so we check if the next query has an ID matching the
271            // response item. If not, we keep going until we find it.
272            let Some(mut query) = query_iter.next() else {
273                return Err(crate::Error::Other(
274                    "Internal logic error: more response items than query items".to_string(),
275                ));
276            };
277
278            while !Self::equals_identity(&query.id, response_item) {
279                // This query had no datapoints in the response, so we don't need to do anything
280                // special with it. Just move on to the next one.
281                let Some(next_query) = query_iter.next() else {
282                    return Err(crate::Error::Other(
283                        "Internal logic error: response item does not match any query item"
284                            .to_string(),
285                    ));
286                };
287                query = next_query;
288            }
289
290            // If we're maintaining internal state, record information about
291            // the timeseries we've seen.
292            if maintain_internal_state {
293                self.update_known_timeseries_from_batch(response_item, &query);
294            }
295
296            if !response_item.next_cursor.is_empty() {
297                // Take the cursor. There's no reason to allow the caller to
298                // see it or use it.
299                query.cursor = Some(std::mem::take(&mut response_item.next_cursor));
300                self.queries.push_back(query);
301            }
302        }
303
304        Ok(Some(fetch_result.response))
305    }
306
307    pub fn stream_batches(
308        self,
309    ) -> impl Stream<Item = Result<DataPointListResponse, crate::Error>> + 'a {
310        futures::stream::try_unfold(self, move |mut state| async move {
311            Ok(state.stream_batches_inner(false).await?.map(|v| (v, state)))
312        })
313    }
314
315    pub fn stream_datapoints(self) -> impl Stream<Item = Result<DataPointRef, crate::Error>> + 'a {
316        FlatIterStream::new(futures::stream::try_unfold(
317            self,
318            move |mut state| async move {
319                let Some(batch) = state.stream_batches_inner(true).await? else {
320                    return Ok(None);
321                };
322                let mut res = Vec::new();
323
324                for item in batch.items {
325                    let timeseries = state
326                        .known_timeseries
327                        .get(&item.id)
328                        .ok_or_else(|| {
329                            crate::Error::Other(format!(
330                                "Internal logic error: timeseries with id {} not found in known_timeseries",
331                                item.id
332                            ))
333                        })?
334                        .clone();
335                    match item.datapoint_type {
336                        None => continue,
337                        Some(ListDatapointType::AggregateDatapoints(dps)) => {
338                            res.extend(dps.datapoints.into_iter().map(move |dp| DataPointRef {
339                                timeseries: timeseries.clone(),
340                                datapoint: EitherDataPoint::Aggregate(dp.into()),
341                            }));
342                        }
343                        Some(ListDatapointType::StringDatapoints(dps)) => {
344                            res.extend(dps.datapoints.into_iter().map(move |dp| DataPointRef {
345                                timeseries: timeseries.clone(),
346                                datapoint: EitherDataPoint::String(dp.into()),
347                            }));
348                        }
349                        Some(ListDatapointType::NumericDatapoints(dps)) => {
350                            res.extend(dps.datapoints.into_iter().map(move |dp| DataPointRef {
351                                timeseries: timeseries.clone(),
352                                datapoint: EitherDataPoint::Numeric(dp.into()),
353                            }));
354                        }
355                    }
356                }
357
358                Ok(Some((res.into_iter(), state)))
359            },
360        ))
361    }
362}
363
/// Simple stream adapter that flattens a stream of iterables into a stream of items.
///
/// Yields every element of each successful `R::Ok` batch in order; errors
/// from the inner stream are passed through unchanged.
#[pin_project]
struct FlatIterStream<R>
where
    R: TryStream,
    R::Ok: IntoIterator,
{
    /// The underlying stream of batches.
    #[pin]
    inner: R,
    /// Iterator over the batch currently being drained.
    /// `None` means the next poll must fetch a new batch from `inner`.
    current: Option<<R::Ok as IntoIterator>::IntoIter>,
}
375
376impl<R> FlatIterStream<R>
377where
378    R: TryStream,
379    R::Ok: IntoIterator,
380{
381    fn new(stream: R) -> Self {
382        Self {
383            inner: stream,
384            current: None,
385        }
386    }
387}
388
389impl<R: TryStream> Stream for FlatIterStream<R>
390where
391    R: TryStream,
392    R::Ok: IntoIterator,
393{
394    type Item = Result<<R::Ok as IntoIterator>::Item, R::Error>;
395
396    fn poll_next(
397        self: Pin<&mut Self>,
398        cx: &mut std::task::Context<'_>,
399    ) -> std::task::Poll<Option<Self::Item>> {
400        let mut this = self.project();
401        loop {
402            if let Some(current) = this.current.as_mut() {
403                if let Some(item) = current.next() {
404                    return std::task::Poll::Ready(Some(Ok(item)));
405                } else {
406                    *this.current = None;
407                }
408            }
409            match this.inner.as_mut().try_poll_next(cx)? {
410                std::task::Poll::Ready(Some(next_iter)) => {
411                    *this.current = Some(next_iter.into_iter());
412                }
413                std::task::Poll::Ready(None) => return std::task::Poll::Ready(None),
414                std::task::Poll::Pending => return std::task::Poll::Pending,
415            }
416        }
417    }
418}
419
#[cfg(test)]
mod tests {
    use crate::{
        api::core::time_series::datapoints_stream::FlatIterStream, time_series::StatusCode,
    };

    /// Accessors on `DataPointRef` should forward to the shared timeseries
    /// metadata and the wrapped datapoint.
    #[test]
    fn test_datapoint_ref() {
        use super::*;
        let meta = Arc::new(TimeSeriesRef {
            id: 42,
            external_id: Some("ts1".to_string()),
            instance_id: None,
            original_id: IdentityOrInstance::from("ts1"),
            is_string: false,
            is_step: false,
            unit: Some("°C".to_string()),
            unit_external_id: None,
        });
        let point = DataPointRef {
            timeseries: meta.clone(),
            datapoint: EitherDataPoint::Numeric(DatapointDouble {
                timestamp: 1625079600000,
                value: Some(23.5),
                status: Some(StatusCode::Good),
            }),
        };
        assert_eq!(point.id(), 42);
        assert_eq!(point.external_id(), Some("ts1"));
        assert!(!point.is_string());
        assert_eq!(point.unit(), Some("°C"));
        match point.datapoint() {
            EitherDataPoint::Numeric(n) => assert_eq!(n.value, Some(23.5)),
            _ => panic!("Expected numeric datapoint"),
        }
    }

    /// Flattening a stream of Ok batches should yield every element in order.
    #[test]
    fn test_flat_iter_stream() {
        use futures::stream;
        use futures::StreamExt;
        let batches = stream::iter(vec![
            Ok::<_, crate::Error>(vec![1, 2, 3]),
            Ok(vec![4, 5]),
            Ok(vec![6]),
        ]);
        let flattened = FlatIterStream::new(batches);
        let collected: Vec<_> =
            futures::executor::block_on(flattened.map(|item| item.unwrap()).collect());
        assert_eq!(collected, vec![1, 2, 3, 4, 5, 6]);
    }
}