cognite/dto/core/
datapoint.rs

1mod filter;
2#[allow(clippy::all)]
3#[allow(missing_docs)]
4#[path = "datapoint/generated/com.cognite.v1.timeseries.proto.rs"]
5mod proto;
6mod status_code;
7
8use std::convert::TryFrom;
9
10pub use self::filter::*;
11pub use self::proto::data_point_insertion_item::DatapointType as InsertDatapointType;
12pub use self::proto::data_point_insertion_item::TimeSeriesReference;
13pub use self::proto::data_point_list_item::DatapointType as ListDatapointType;
14pub use self::proto::*;
15pub use self::status_code::*;
16
17use serde::{de::Error, Deserialize, Serialize};
18use serde_json::Value;
19
20use crate::Identity;
21use crate::IdentityOrInstance;
22
#[derive(Serialize, Debug, Clone)]
#[serde(untagged)]
/// Enumeration over different types of retrieved data points.
///
/// Serialized with `#[serde(untagged)]`, so a value is emitted as a plain
/// JSON array of datapoints with no variant tag distinguishing the types.
pub enum DatapointsEnumType {
    /// Datapoints with double precision floating point values.
    NumericDatapoints(Vec<DatapointDouble>),
    /// Datapoints with string values.
    StringDatapoints(Vec<DatapointString>),
    /// Aggregate data points.
    AggregateDatapoints(Vec<DatapointAggregate>),
}
34
35impl From<Vec<DatapointDouble>> for DatapointsEnumType {
36    fn from(value: Vec<DatapointDouble>) -> Self {
37        Self::NumericDatapoints(value)
38    }
39}
40
41impl From<Vec<DatapointString>> for DatapointsEnumType {
42    fn from(value: Vec<DatapointString>) -> Self {
43        Self::StringDatapoints(value)
44    }
45}
46
47impl From<Vec<DatapointAggregate>> for DatapointsEnumType {
48    fn from(value: Vec<DatapointAggregate>) -> Self {
49        Self::AggregateDatapoints(value)
50    }
51}
52
53impl DatapointsEnumType {
54    /// Get self as numeric datapoints, or none if a different type.
55    pub fn numeric(self) -> Option<Vec<DatapointDouble>> {
56        match self {
57            Self::NumericDatapoints(x) => Some(x),
58            _ => None,
59        }
60    }
61    /// Get self as string datapoints, or none if a different type.
62    pub fn string(self) -> Option<Vec<DatapointString>> {
63        match self {
64            Self::StringDatapoints(x) => Some(x),
65            _ => None,
66        }
67    }
68    /// Get self as aggregate datapoints, or none if a different type.
69    pub fn aggregate(self) -> Option<Vec<DatapointAggregate>> {
70        match self {
71            Self::AggregateDatapoints(x) => Some(x),
72            _ => None,
73        }
74    }
75}
76
/* NOTE(review): dead code — superseded by the `StatusCode` re-exported from
the `status_code` module (used by the `From<Status>` impls below). Consider
deleting this block.
#[derive(Serialize, Deserialize, Debug, Clone)]
78#[serde(rename_all = "camelCase")]
79/// A data point status code.
80pub struct StatusCode {
81    /// Status code numeric representation.
82    pub code: Option<i64>,
83    /// Status code symbol.
84    pub symbol: Option<String>,
85}
86
87impl StatusCode {
88    /// Create a new status code from a given symbol.
89    pub fn new(symbol: impl Into<String>) -> Self {
90        Self {
91            symbol: Some(symbol.into()),
92            code: None,
93        }
94    }
95
96    /// Create a new status code from a numeric code.
97    pub fn new_code(code: i64) -> Self {
98        Self {
99            code: Some(code),
100            symbol: None,
101        }
102    }
103} */
104
105impl From<Status> for StatusCode {
106    fn from(value: Status) -> Self {
107        if value.code != 0 {
108            StatusCode::try_from(value.code).unwrap_or(StatusCode::Invalid)
109        } else if !value.symbol.is_empty() {
110            StatusCode::try_parse(&value.symbol).unwrap_or(StatusCode::Invalid)
111        } else {
112            StatusCode::Good
113        }
114    }
115}
116
117impl From<StatusCode> for Status {
118    fn from(code: StatusCode) -> Status {
119        Status {
120            code: code.bits() as i64,
121            symbol: String::new(),
122        }
123    }
124}
125
126mod cdf_double_serde {
127    use core::f64;
128
129    use serde::{de::Visitor, Deserializer, Serializer};
130
131    pub fn deserialize<'de, D: Deserializer<'de>>(
132        deserializer: D,
133    ) -> Result<Option<f64>, D::Error> {
134        struct CdfDoubleVisitor;
135
136        impl Visitor<'_> for CdfDoubleVisitor {
137            type Value = Option<f64>;
138
139            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
140                write!(formatter, "double, null, Infinity, or NaN")
141            }
142
143            fn visit_f64<E>(self, v: f64) -> Result<Self::Value, E>
144            where
145                E: serde::de::Error,
146            {
147                Ok(Some(v))
148            }
149
150            fn visit_none<E>(self) -> Result<Self::Value, E>
151            where
152                E: serde::de::Error,
153            {
154                Ok(None)
155            }
156
157            fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
158            where
159                E: serde::de::Error,
160            {
161                match v {
162                    "Infinity" => Ok(Some(f64::INFINITY)),
163                    "-Infinity" => Ok(Some(f64::NEG_INFINITY)),
164                    "NaN" => Ok(Some(f64::NAN)),
165                    r => Err(E::custom(format!("Failed to parse double value from string. Got {r} expected Infinity, -Infinity, or NaN")))
166                }
167            }
168        }
169
170        deserializer.deserialize_any(CdfDoubleVisitor)
171    }
172
173    pub fn serialize<S: Serializer>(value: &Option<f64>, ser: S) -> Result<S::Ok, S::Error> {
174        match value {
175            None => ser.serialize_none(),
176            Some(r) if r.is_nan() => ser.serialize_str("NaN"),
177            Some(f64::INFINITY) => ser.serialize_str("Infinity"),
178            Some(f64::NEG_INFINITY) => ser.serialize_str("-Infinity"),
179            Some(r) => ser.serialize_f64(*r),
180        }
181    }
182}
183
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
/// A datapoint with double precision floating point value.
pub struct DatapointDouble {
    /// Timestamp in milliseconds since epoch.
    pub timestamp: i64,
    /// Datapoint value.
    ///
    /// Serialized through `cdf_double_serde`, so NaN and +/-Infinity
    /// round-trip as strings. `None` encodes a missing value (e.g. the
    /// protobuf `null_value` flag was set).
    #[serde(with = "cdf_double_serde")]
    pub value: Option<f64>,
    /// Datapoint status code.
    pub status: Option<StatusCode>,
}
196
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
/// A datapoint with string value.
pub struct DatapointString {
    /// Timestamp in milliseconds since epoch.
    pub timestamp: i64,
    /// Datapoint value. `None` encodes a missing value (e.g. the protobuf
    /// `null_value` flag was set).
    pub value: Option<String>,
    /// Datapoint status code.
    pub status: Option<StatusCode>,
}
208
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
/// An aggregate data point.
///
/// All aggregate values, including the counts and durations, are
/// represented as `f64`.
pub struct DatapointAggregate {
    /// Timestamp in milliseconds since epoch.
    pub timestamp: i64,
    /// Average of values in aggregate.
    pub average: f64,
    /// Max value in aggregate.
    pub max: f64,
    /// Min value in aggregate.
    pub min: f64,
    /// Number of values in aggregate.
    pub count: f64,
    /// Sum of values in aggregate.
    pub sum: f64,
    /// Interpolated value.
    pub interpolation: f64,
    /// Step-interpolated value.
    pub step_interpolation: f64,
    /// The variance of the underlying function when assuming linear or step behavior between data points.
    pub continuous_variance: f64,
    /// The variance of the discrete set of data points, no weighting for density of points in time.
    pub discrete_variance: f64,
    /// The sum of absolute differences between neighboring data points in a period.
    pub total_variation: f64,
    /// The number of data points in the aggregate period that have a Good status code.
    /// Uncertain does not count, irrespective of treatUncertainAsBad parameter.
    pub count_good: f64,
    /// The number of data points in the aggregate period that have an Uncertain status code.
    pub count_uncertain: f64,
    /// The number of data points in the aggregate period that have a Bad status code.
    /// Uncertain does not count, irrespective of treatUncertainAsBad parameter.
    pub count_bad: f64,
    /// The duration the aggregate is defined and marked as good (regardless of ignoreBadDataPoints parameter).
    /// Measured in milliseconds. Equivalent to duration that the previous data point is good and in range.
    pub duration_good: f64,
    /// The duration the aggregate is defined and marked as uncertain (regardless of ignoreBadDataPoints parameter).
    /// Measured in milliseconds. Equivalent to duration that the previous data point is uncertain and in range.
    pub duration_uncertain: f64,
    /// The duration the aggregate is defined but marked as bad (regardless of ignoreBadDataPoints parameter).
    /// Measured in milliseconds. Equivalent to duration that the previous data point is bad and in range.
    pub duration_bad: f64,
}
253
254impl From<NumericDatapoint> for DatapointDouble {
255    fn from(dp: NumericDatapoint) -> DatapointDouble {
256        DatapointDouble {
257            timestamp: dp.timestamp,
258            value: if dp.null_value { None } else { Some(dp.value) },
259            status: dp.status.map(|s| s.into()),
260        }
261    }
262}
263
264impl From<DatapointDouble> for NumericDatapoint {
265    fn from(dp: DatapointDouble) -> NumericDatapoint {
266        NumericDatapoint {
267            timestamp: dp.timestamp,
268            null_value: dp.value.is_none(),
269            value: dp.value.unwrap_or_default(),
270            status: dp.status.map(|s| s.into()),
271        }
272    }
273}
274
275impl From<StringDatapoint> for DatapointString {
276    fn from(dp: StringDatapoint) -> DatapointString {
277        DatapointString {
278            timestamp: dp.timestamp,
279            value: if dp.null_value { None } else { Some(dp.value) },
280            status: dp.status.map(|s| s.into()),
281        }
282    }
283}
284
285impl From<DatapointString> for StringDatapoint {
286    fn from(dp: DatapointString) -> StringDatapoint {
287        StringDatapoint {
288            timestamp: dp.timestamp,
289            null_value: dp.value.is_none(),
290            value: dp.value.unwrap_or_default(),
291            status: dp.status.map(|s| s.into()),
292        }
293    }
294}
295
296impl From<AggregateDatapoint> for DatapointAggregate {
297    fn from(dp: AggregateDatapoint) -> DatapointAggregate {
298        DatapointAggregate {
299            timestamp: dp.timestamp,
300            average: dp.average,
301            max: dp.max,
302            min: dp.min,
303            count: dp.count,
304            sum: dp.sum,
305            interpolation: dp.interpolation,
306            step_interpolation: dp.step_interpolation,
307            continuous_variance: dp.continuous_variance,
308            discrete_variance: dp.discrete_variance,
309            total_variation: dp.total_variation,
310            count_good: dp.count_good,
311            count_uncertain: dp.count_uncertain,
312            count_bad: dp.count_bad,
313            duration_good: dp.duration_good,
314            duration_uncertain: dp.duration_uncertain,
315            duration_bad: dp.duration_bad,
316        }
317    }
318}
319
#[derive(Debug)]
/// Response to a request for datapoints.
///
/// Public counterpart of the protobuf `DataPointListResponse`; see the
/// corresponding `From` impl.
pub struct DatapointsListResponse {
    /// List of datapoint responses.
    pub items: Vec<DatapointsResponse>,
}
326
#[derive(Debug)]
/// Response for a single timeseries when listing datapoints.
pub struct DatapointsResponse {
    /// Time series internal ID.
    pub id: i64,
    /// Time series external ID.
    pub external_id: Option<String>,
    /// Retrieved datapoints. An empty numeric list if the response carried
    /// no datapoint payload (see `From<DataPointListItem>`).
    pub datapoints: DatapointsEnumType,
    /// The physical unit of the time series (free-text field).
    /// Omitted if data points were converted to a different unit.
    pub unit: Option<String>,
    /// The physical unit of the time series as represented in the unit catalog.
    /// Replaced with target unit if data points were converted.
    pub unit_external_id: Option<String>,
    /// Time series `is_step` property value.
    pub is_step: bool,
    /// Whether this is a string time series.
    pub is_string: bool,
    /// The cursor to get the next page of results (if available).
    /// nextCursor will be omitted when the next aggregate datapoint
    /// is after the end of the interval. Increase start/end to fetch more data.
    pub next_cursor: Option<String>,
}
351
#[derive(Debug)]
/// Result for retrieving a latest datapoint from CDF.
///
/// The variant is selected from the time series' `isString` flag when
/// deserializing a `LatestDatapointsResponse`.
pub enum LatestDatapoint {
    /// Numeric datapoint.
    Numeric(DatapointDouble),
    /// String datapoint.
    String(DatapointString),
}
360
361impl LatestDatapoint {
362    /// Get the value of this as a numeric datapoint.
363    pub fn numeric(&self) -> Option<&DatapointDouble> {
364        match self {
365            Self::Numeric(d) => Some(d),
366            _ => None,
367        }
368    }
369
370    /// Get the value of this as a string datapoint.
371    pub fn string(&self) -> Option<&DatapointString> {
372        match self {
373            Self::String(d) => Some(d),
374            _ => None,
375        }
376    }
377}
378
#[derive(Debug)]
/// Response to a request retrieving latest datapoints for a single time series.
pub struct LatestDatapointsResponse {
    /// Time series internal ID.
    pub id: i64,
    /// Time series external ID.
    pub external_id: Option<String>,
    /// The latest retrieved datapoint, if any.
    pub datapoint: Option<LatestDatapoint>,
    /// The physical unit of the time series (free-text field).
    /// Omitted if data points were converted to a different unit.
    pub unit: Option<String>,
    /// The physical unit of the time series as represented in the unit catalog.
    /// Replaced with target unit if data points were converted.
    pub unit_external_id: Option<String>,
    /// Time series `is_step` property value.
    pub is_step: bool,
    /// Whether this is a string time series.
    pub is_string: bool,
    /// The cursor to get the next page of results (if available).
    /// nextCursor will be omitted when the next aggregate datapoint
    /// is after the end of the interval. Increase start/end to fetch more data.
    pub next_cursor: Option<String>,
}
403
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
/// Intermediate form used by the manual `Deserialize` impl for
/// `LatestDatapointsResponse`: `datapoints` is captured as a raw JSON value
/// so its element type can be decided once `is_string` is known.
struct DatapointsResponsePartial {
    id: i64,
    external_id: Option<String>,
    // Raw `datapoints` payload, decoded later based on `is_string`.
    datapoints: Value,
    unit: Option<String>,
    unit_external_id: Option<String>,
    // Defaults to false when absent from the payload.
    #[serde(default)]
    is_step: bool,
    // Defaults to false when absent from the payload.
    #[serde(default)]
    is_string: bool,
    next_cursor: Option<String>,
}
418
impl<'de> Deserialize<'de> for LatestDatapointsResponse {
    // Custom deserialization: the API returns `datapoints` as a JSON array,
    // but this type exposes at most one (latest) datapoint. The payload is
    // first captured via `DatapointsResponsePartial` with `datapoints` as a
    // raw `Value`, then the first array element (if any) is decoded as a
    // string or numeric datapoint depending on the `isString` flag.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let r = DatapointsResponsePartial::deserialize(deserializer)?;
        let dps = r.datapoints;
        let dps = if matches!(dps, Value::Null) {
            None
        } else if let Value::Array(v) = dps {
            // Only the first element is used; any further elements are ignored.
            match v.into_iter().next() {
                Some(v) => {
                    if r.is_string {
                        Some(LatestDatapoint::String(serde_json::from_value(v).map_err(
                            |e| {
                                D::Error::custom(format!(
                                    "Failed to deserialize string datapoint: {e:?}"
                                ))
                            },
                        )?))
                    } else {
                        Some(LatestDatapoint::Numeric(
                            serde_json::from_value(v).map_err(|e| {
                                D::Error::custom(format!(
                                    "Failed to deserialize numeric datapoint: {e:?}"
                                ))
                            })?,
                        ))
                    }
                }
                None => None,
            }
        } else {
            // Any shape other than null or an array is treated as "no
            // datapoint" rather than a deserialization error.
            None
        };

        Ok(Self {
            id: r.id,
            external_id: r.external_id,
            datapoint: dps,
            unit: r.unit,
            unit_external_id: r.unit_external_id,
            is_step: r.is_step,
            is_string: r.is_string,
            next_cursor: r.next_cursor,
        })
    }
}
467
#[derive(Debug, Clone)]
/// Add datapoints to a time series.
///
/// Note: aggregate datapoints cannot be inserted; converting such a batch
/// to a protobuf insertion item panics.
pub struct AddDatapoints {
    /// ID, external ID, or instance ID of time series to insert into.
    pub id: IdentityOrInstance,
    /// Data points to insert.
    pub datapoints: DatapointsEnumType,
}
476
477impl AddDatapoints {
478    /// Create a new batch of data points to insert.
479    ///
480    /// # Arguments
481    ///
482    /// * `id` - Internal ID of time series to insert into.
483    /// * `datapoints` - Datapoints to insert.
484    pub fn new(id: i64, datapoints: DatapointsEnumType) -> AddDatapoints {
485        AddDatapoints {
486            id: IdentityOrInstance::Identity(Identity::Id { id }),
487            datapoints,
488        }
489    }
490    /// Create a new batch of data points to insert.
491    ///
492    /// # Arguments
493    ///
494    /// * `external_id` - External ID of time series to insert into.
495    /// * `datapoints` - Datapoints to insert.
496    pub fn new_external_id(external_id: &str, datapoints: DatapointsEnumType) -> AddDatapoints {
497        AddDatapoints {
498            id: IdentityOrInstance::Identity(Identity::ExternalId {
499                external_id: external_id.to_string(),
500            }),
501            datapoints,
502        }
503    }
504}
505
506impl From<Identity> for TimeSeriesReference {
507    fn from(idt: Identity) -> TimeSeriesReference {
508        match idt {
509            Identity::Id { id } => TimeSeriesReference::Id(id),
510            Identity::ExternalId {
511                external_id: ext_id,
512            } => TimeSeriesReference::ExternalId(ext_id),
513        }
514    }
515}
516
517impl From<IdentityOrInstance> for TimeSeriesReference {
518    fn from(idt: IdentityOrInstance) -> TimeSeriesReference {
519        match idt {
520            IdentityOrInstance::Identity(Identity::Id { id }) => TimeSeriesReference::Id(id),
521            IdentityOrInstance::Identity(Identity::ExternalId {
522                external_id: ext_id,
523            }) => TimeSeriesReference::ExternalId(ext_id),
524            IdentityOrInstance::InstanceId { instance_id } => {
525                TimeSeriesReference::InstanceId(instance_id.into())
526            }
527        }
528    }
529}
530
531impl TryFrom<TimeSeriesReference> for Identity {
532    type Error = ();
533
534    fn try_from(idt: TimeSeriesReference) -> Result<Identity, ()> {
535        match idt {
536            TimeSeriesReference::Id(id) => Ok(Identity::Id { id }),
537            TimeSeriesReference::ExternalId(ext_id) => Ok(Identity::ExternalId {
538                external_id: ext_id,
539            }),
540            TimeSeriesReference::InstanceId(_) => Err(()),
541        }
542    }
543}
544
545impl From<crate::dto::data_modeling::instances::InstanceId> for InstanceId {
546    fn from(value: crate::dto::data_modeling::instances::InstanceId) -> Self {
547        Self {
548            external_id: value.external_id,
549            space: value.space,
550        }
551    }
552}
553
554impl From<InstanceId> for crate::dto::data_modeling::instances::InstanceId {
555    fn from(value: InstanceId) -> Self {
556        Self {
557            external_id: value.external_id,
558            space: value.space,
559        }
560    }
561}
562
563impl From<TimeSeriesReference> for IdentityOrInstance {
564    fn from(value: TimeSeriesReference) -> Self {
565        match value {
566            TimeSeriesReference::Id(id) => IdentityOrInstance::Identity(Identity::Id { id }),
567            TimeSeriesReference::ExternalId(external_id) => {
568                IdentityOrInstance::Identity(Identity::ExternalId { external_id })
569            }
570            TimeSeriesReference::InstanceId(instance_id) => IdentityOrInstance::InstanceId {
571                instance_id: instance_id.into(),
572            },
573        }
574    }
575}
576
577impl From<DataPointListItem> for DatapointsResponse {
578    fn from(req: DataPointListItem) -> DatapointsResponse {
579        DatapointsResponse {
580            id: req.id,
581            external_id: if req.external_id.is_empty() {
582                None
583            } else {
584                Some(req.external_id)
585            },
586            unit: if req.unit.is_empty() {
587                None
588            } else {
589                Some(req.unit)
590            },
591            is_step: req.is_step,
592            is_string: req.is_string,
593            datapoints: match req.datapoint_type {
594                Some(dps) => match dps {
595                    data_point_list_item::DatapointType::NumericDatapoints(num_dps) => {
596                        DatapointsEnumType::NumericDatapoints(
597                            num_dps
598                                .datapoints
599                                .into_iter()
600                                .map(DatapointDouble::from)
601                                .collect(),
602                        )
603                    }
604                    data_point_list_item::DatapointType::StringDatapoints(str_dps) => {
605                        DatapointsEnumType::StringDatapoints(
606                            str_dps
607                                .datapoints
608                                .into_iter()
609                                .map(DatapointString::from)
610                                .collect(),
611                        )
612                    }
613                    data_point_list_item::DatapointType::AggregateDatapoints(aggr_dps) => {
614                        DatapointsEnumType::AggregateDatapoints(
615                            aggr_dps
616                                .datapoints
617                                .into_iter()
618                                .map(DatapointAggregate::from)
619                                .collect(),
620                        )
621                    }
622                },
623                None => DatapointsEnumType::NumericDatapoints(Vec::<DatapointDouble>::new()),
624            },
625            unit_external_id: if req.unit_external_id.is_empty() {
626                None
627            } else {
628                Some(req.unit_external_id)
629            },
630            next_cursor: if req.next_cursor.is_empty() {
631                None
632            } else {
633                Some(req.next_cursor)
634            },
635        }
636    }
637}
638
639impl From<AddDatapoints> for DataPointInsertionItem {
640    fn from(req: AddDatapoints) -> DataPointInsertionItem {
641        DataPointInsertionItem {
642            time_series_reference: Some(TimeSeriesReference::from(req.id)),
643            datapoint_type: match req.datapoints {
644                DatapointsEnumType::NumericDatapoints(dps) => Some(
645                    self::proto::data_point_insertion_item::DatapointType::NumericDatapoints(
646                        NumericDatapoints {
647                            datapoints: dps.into_iter().map(NumericDatapoint::from).collect(),
648                        },
649                    ),
650                ),
651                DatapointsEnumType::StringDatapoints(dps) => Some(
652                    self::proto::data_point_insertion_item::DatapointType::StringDatapoints(
653                        StringDatapoints {
654                            datapoints: dps.into_iter().map(StringDatapoint::from).collect(),
655                        },
656                    ),
657                ),
658                DatapointsEnumType::AggregateDatapoints(_) => {
659                    panic!("Cannot insert aggregate datapoints")
660                }
661            },
662        }
663    }
664}
665
666impl From<Vec<AddDatapoints>> for DataPointInsertionRequest {
667    fn from(items: Vec<AddDatapoints>) -> DataPointInsertionRequest {
668        DataPointInsertionRequest {
669            items: items
670                .into_iter()
671                .map(DataPointInsertionItem::from)
672                .collect(),
673        }
674    }
675}
676
677impl From<DataPointListResponse> for DatapointsListResponse {
678    fn from(resp: DataPointListResponse) -> DatapointsListResponse {
679        DatapointsListResponse {
680            items: resp
681                .items
682                .into_iter()
683                .map(DatapointsResponse::from)
684                .collect(),
685        }
686    }
687}