1mod filter;
2#[allow(clippy::all)]
3#[allow(missing_docs)]
4#[path = "datapoint/generated/com.cognite.v1.timeseries.proto.rs"]
5mod proto;
6mod status_code;
7
8use std::convert::TryFrom;
9
10pub use self::filter::*;
11pub use self::proto::data_point_insertion_item::DatapointType as InsertDatapointType;
12pub use self::proto::data_point_insertion_item::TimeSeriesReference;
13pub use self::proto::data_point_list_item::DatapointType as ListDatapointType;
14pub use self::proto::*;
15pub use self::status_code::*;
16
17use serde::{de::Error, Deserialize, Serialize};
18use serde_json::Value;
19
20use crate::Identity;
21use crate::IdentityOrInstance;
22
23#[derive(Serialize, Debug, Clone)]
24#[serde(untagged)]
25pub enum DatapointsEnumType {
27 NumericDatapoints(Vec<DatapointDouble>),
29 StringDatapoints(Vec<DatapointString>),
31 AggregateDatapoints(Vec<DatapointAggregate>),
33}
34
35impl From<Vec<DatapointDouble>> for DatapointsEnumType {
36 fn from(value: Vec<DatapointDouble>) -> Self {
37 Self::NumericDatapoints(value)
38 }
39}
40
41impl From<Vec<DatapointString>> for DatapointsEnumType {
42 fn from(value: Vec<DatapointString>) -> Self {
43 Self::StringDatapoints(value)
44 }
45}
46
47impl From<Vec<DatapointAggregate>> for DatapointsEnumType {
48 fn from(value: Vec<DatapointAggregate>) -> Self {
49 Self::AggregateDatapoints(value)
50 }
51}
52
53impl DatapointsEnumType {
54 pub fn numeric(self) -> Option<Vec<DatapointDouble>> {
56 match self {
57 Self::NumericDatapoints(x) => Some(x),
58 _ => None,
59 }
60 }
61 pub fn string(self) -> Option<Vec<DatapointString>> {
63 match self {
64 Self::StringDatapoints(x) => Some(x),
65 _ => None,
66 }
67 }
68 pub fn aggregate(self) -> Option<Vec<DatapointAggregate>> {
70 match self {
71 Self::AggregateDatapoints(x) => Some(x),
72 _ => None,
73 }
74 }
75}
76
77impl From<Status> for StatusCode {
106 fn from(value: Status) -> Self {
107 if value.code != 0 {
108 StatusCode::try_from(value.code).unwrap_or(StatusCode::Invalid)
109 } else if !value.symbol.is_empty() {
110 StatusCode::try_parse(&value.symbol).unwrap_or(StatusCode::Invalid)
111 } else {
112 StatusCode::Good
113 }
114 }
115}
116
117impl From<StatusCode> for Status {
118 fn from(code: StatusCode) -> Status {
119 Status {
120 code: code.bits() as i64,
121 symbol: String::new(),
122 }
123 }
124}
125
126mod cdf_double_serde {
127 use core::f64;
128
129 use serde::{de::Visitor, Deserializer, Serializer};
130
131 pub fn deserialize<'de, D: Deserializer<'de>>(
132 deserializer: D,
133 ) -> Result<Option<f64>, D::Error> {
134 struct CdfDoubleVisitor;
135
136 impl Visitor<'_> for CdfDoubleVisitor {
137 type Value = Option<f64>;
138
139 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
140 write!(formatter, "double, null, Infinity, or NaN")
141 }
142
143 fn visit_f64<E>(self, v: f64) -> Result<Self::Value, E>
144 where
145 E: serde::de::Error,
146 {
147 Ok(Some(v))
148 }
149
150 fn visit_none<E>(self) -> Result<Self::Value, E>
151 where
152 E: serde::de::Error,
153 {
154 Ok(None)
155 }
156
157 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
158 where
159 E: serde::de::Error,
160 {
161 match v {
162 "Infinity" => Ok(Some(f64::INFINITY)),
163 "-Infinity" => Ok(Some(f64::NEG_INFINITY)),
164 "NaN" => Ok(Some(f64::NAN)),
165 r => Err(E::custom(format!("Failed to parse double value from string. Got {r} expected Infinity, -Infinity, or NaN")))
166 }
167 }
168 }
169
170 deserializer.deserialize_any(CdfDoubleVisitor)
171 }
172
173 pub fn serialize<S: Serializer>(value: &Option<f64>, ser: S) -> Result<S::Ok, S::Error> {
174 match value {
175 None => ser.serialize_none(),
176 Some(r) if r.is_nan() => ser.serialize_str("NaN"),
177 Some(f64::INFINITY) => ser.serialize_str("Infinity"),
178 Some(f64::NEG_INFINITY) => ser.serialize_str("-Infinity"),
179 Some(r) => ser.serialize_f64(*r),
180 }
181 }
182}
183
184#[derive(Serialize, Deserialize, Debug, Clone)]
185#[serde(rename_all = "camelCase")]
186pub struct DatapointDouble {
188 pub timestamp: i64,
190 #[serde(with = "cdf_double_serde")]
192 pub value: Option<f64>,
193 pub status: Option<StatusCode>,
195}
196
197#[derive(Serialize, Deserialize, Debug, Clone)]
198#[serde(rename_all = "camelCase")]
199pub struct DatapointString {
201 pub timestamp: i64,
203 pub value: Option<String>,
205 pub status: Option<StatusCode>,
207}
208
209#[derive(Serialize, Deserialize, Debug, Clone)]
210#[serde(rename_all = "camelCase")]
211pub struct DatapointAggregate {
213 pub timestamp: i64,
215 pub average: f64,
217 pub max: f64,
219 pub min: f64,
221 pub count: f64,
223 pub sum: f64,
225 pub interpolation: f64,
227 pub step_interpolation: f64,
229 pub continuous_variance: f64,
231 pub discrete_variance: f64,
233 pub total_variation: f64,
235 pub count_good: f64,
238 pub count_uncertain: f64,
240 pub count_bad: f64,
243 pub duration_good: f64,
246 pub duration_uncertain: f64,
249 pub duration_bad: f64,
252}
253
254impl From<NumericDatapoint> for DatapointDouble {
255 fn from(dp: NumericDatapoint) -> DatapointDouble {
256 DatapointDouble {
257 timestamp: dp.timestamp,
258 value: if dp.null_value { None } else { Some(dp.value) },
259 status: dp.status.map(|s| s.into()),
260 }
261 }
262}
263
264impl From<DatapointDouble> for NumericDatapoint {
265 fn from(dp: DatapointDouble) -> NumericDatapoint {
266 NumericDatapoint {
267 timestamp: dp.timestamp,
268 null_value: dp.value.is_none(),
269 value: dp.value.unwrap_or_default(),
270 status: dp.status.map(|s| s.into()),
271 }
272 }
273}
274
275impl From<StringDatapoint> for DatapointString {
276 fn from(dp: StringDatapoint) -> DatapointString {
277 DatapointString {
278 timestamp: dp.timestamp,
279 value: if dp.null_value { None } else { Some(dp.value) },
280 status: dp.status.map(|s| s.into()),
281 }
282 }
283}
284
285impl From<DatapointString> for StringDatapoint {
286 fn from(dp: DatapointString) -> StringDatapoint {
287 StringDatapoint {
288 timestamp: dp.timestamp,
289 null_value: dp.value.is_none(),
290 value: dp.value.unwrap_or_default(),
291 status: dp.status.map(|s| s.into()),
292 }
293 }
294}
295
296impl From<AggregateDatapoint> for DatapointAggregate {
297 fn from(dp: AggregateDatapoint) -> DatapointAggregate {
298 DatapointAggregate {
299 timestamp: dp.timestamp,
300 average: dp.average,
301 max: dp.max,
302 min: dp.min,
303 count: dp.count,
304 sum: dp.sum,
305 interpolation: dp.interpolation,
306 step_interpolation: dp.step_interpolation,
307 continuous_variance: dp.continuous_variance,
308 discrete_variance: dp.discrete_variance,
309 total_variation: dp.total_variation,
310 count_good: dp.count_good,
311 count_uncertain: dp.count_uncertain,
312 count_bad: dp.count_bad,
313 duration_good: dp.duration_good,
314 duration_uncertain: dp.duration_uncertain,
315 duration_bad: dp.duration_bad,
316 }
317 }
318}
319
320#[derive(Debug)]
321pub struct DatapointsListResponse {
323 pub items: Vec<DatapointsResponse>,
325}
326
327#[derive(Debug)]
328pub struct DatapointsResponse {
330 pub id: i64,
332 pub external_id: Option<String>,
334 pub datapoints: DatapointsEnumType,
336 pub unit: Option<String>,
339 pub unit_external_id: Option<String>,
342 pub is_step: bool,
344 pub is_string: bool,
346 pub next_cursor: Option<String>,
350}
351
352#[derive(Debug)]
353pub enum LatestDatapoint {
355 Numeric(DatapointDouble),
357 String(DatapointString),
359}
360
361impl LatestDatapoint {
362 pub fn numeric(&self) -> Option<&DatapointDouble> {
364 match self {
365 Self::Numeric(d) => Some(d),
366 _ => None,
367 }
368 }
369
370 pub fn string(&self) -> Option<&DatapointString> {
372 match self {
373 Self::String(d) => Some(d),
374 _ => None,
375 }
376 }
377}
378
379#[derive(Debug)]
380pub struct LatestDatapointsResponse {
382 pub id: i64,
384 pub external_id: Option<String>,
386 pub datapoint: Option<LatestDatapoint>,
388 pub unit: Option<String>,
391 pub unit_external_id: Option<String>,
394 pub is_step: bool,
396 pub is_string: bool,
398 pub next_cursor: Option<String>,
402}
403
404#[derive(Deserialize)]
405#[serde(rename_all = "camelCase")]
406struct DatapointsResponsePartial {
407 id: i64,
408 external_id: Option<String>,
409 datapoints: Value,
410 unit: Option<String>,
411 unit_external_id: Option<String>,
412 #[serde(default)]
413 is_step: bool,
414 #[serde(default)]
415 is_string: bool,
416 next_cursor: Option<String>,
417}
418
419impl<'de> Deserialize<'de> for LatestDatapointsResponse {
420 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
421 where
422 D: serde::Deserializer<'de>,
423 {
424 let r = DatapointsResponsePartial::deserialize(deserializer)?;
425 let dps = r.datapoints;
426 let dps = if matches!(dps, Value::Null) {
427 None
428 } else if let Value::Array(v) = dps {
429 match v.into_iter().next() {
430 Some(v) => {
431 if r.is_string {
432 Some(LatestDatapoint::String(serde_json::from_value(v).map_err(
433 |e| {
434 D::Error::custom(format!(
435 "Failed to deserialize string datapoint: {e:?}"
436 ))
437 },
438 )?))
439 } else {
440 Some(LatestDatapoint::Numeric(
441 serde_json::from_value(v).map_err(|e| {
442 D::Error::custom(format!(
443 "Failed to deserialize numeric datapoint: {e:?}"
444 ))
445 })?,
446 ))
447 }
448 }
449 None => None,
450 }
451 } else {
452 None
453 };
454
455 Ok(Self {
456 id: r.id,
457 external_id: r.external_id,
458 datapoint: dps,
459 unit: r.unit,
460 unit_external_id: r.unit_external_id,
461 is_step: r.is_step,
462 is_string: r.is_string,
463 next_cursor: r.next_cursor,
464 })
465 }
466}
467
468#[derive(Debug, Clone)]
469pub struct AddDatapoints {
471 pub id: IdentityOrInstance,
473 pub datapoints: DatapointsEnumType,
475}
476
477impl AddDatapoints {
478 pub fn new(id: i64, datapoints: DatapointsEnumType) -> AddDatapoints {
485 AddDatapoints {
486 id: IdentityOrInstance::Identity(Identity::Id { id }),
487 datapoints,
488 }
489 }
490 pub fn new_external_id(external_id: &str, datapoints: DatapointsEnumType) -> AddDatapoints {
497 AddDatapoints {
498 id: IdentityOrInstance::Identity(Identity::ExternalId {
499 external_id: external_id.to_string(),
500 }),
501 datapoints,
502 }
503 }
504}
505
506impl From<Identity> for TimeSeriesReference {
507 fn from(idt: Identity) -> TimeSeriesReference {
508 match idt {
509 Identity::Id { id } => TimeSeriesReference::Id(id),
510 Identity::ExternalId {
511 external_id: ext_id,
512 } => TimeSeriesReference::ExternalId(ext_id),
513 }
514 }
515}
516
517impl From<IdentityOrInstance> for TimeSeriesReference {
518 fn from(idt: IdentityOrInstance) -> TimeSeriesReference {
519 match idt {
520 IdentityOrInstance::Identity(Identity::Id { id }) => TimeSeriesReference::Id(id),
521 IdentityOrInstance::Identity(Identity::ExternalId {
522 external_id: ext_id,
523 }) => TimeSeriesReference::ExternalId(ext_id),
524 IdentityOrInstance::InstanceId { instance_id } => {
525 TimeSeriesReference::InstanceId(instance_id.into())
526 }
527 }
528 }
529}
530
531impl TryFrom<TimeSeriesReference> for Identity {
532 type Error = ();
533
534 fn try_from(idt: TimeSeriesReference) -> Result<Identity, ()> {
535 match idt {
536 TimeSeriesReference::Id(id) => Ok(Identity::Id { id }),
537 TimeSeriesReference::ExternalId(ext_id) => Ok(Identity::ExternalId {
538 external_id: ext_id,
539 }),
540 TimeSeriesReference::InstanceId(_) => Err(()),
541 }
542 }
543}
544
545impl From<crate::dto::data_modeling::instances::InstanceId> for InstanceId {
546 fn from(value: crate::dto::data_modeling::instances::InstanceId) -> Self {
547 Self {
548 external_id: value.external_id,
549 space: value.space,
550 }
551 }
552}
553
554impl From<InstanceId> for crate::dto::data_modeling::instances::InstanceId {
555 fn from(value: InstanceId) -> Self {
556 Self {
557 external_id: value.external_id,
558 space: value.space,
559 }
560 }
561}
562
563impl From<TimeSeriesReference> for IdentityOrInstance {
564 fn from(value: TimeSeriesReference) -> Self {
565 match value {
566 TimeSeriesReference::Id(id) => IdentityOrInstance::Identity(Identity::Id { id }),
567 TimeSeriesReference::ExternalId(external_id) => {
568 IdentityOrInstance::Identity(Identity::ExternalId { external_id })
569 }
570 TimeSeriesReference::InstanceId(instance_id) => IdentityOrInstance::InstanceId {
571 instance_id: instance_id.into(),
572 },
573 }
574 }
575}
576
577impl From<DataPointListItem> for DatapointsResponse {
578 fn from(req: DataPointListItem) -> DatapointsResponse {
579 DatapointsResponse {
580 id: req.id,
581 external_id: if req.external_id.is_empty() {
582 None
583 } else {
584 Some(req.external_id)
585 },
586 unit: if req.unit.is_empty() {
587 None
588 } else {
589 Some(req.unit)
590 },
591 is_step: req.is_step,
592 is_string: req.is_string,
593 datapoints: match req.datapoint_type {
594 Some(dps) => match dps {
595 data_point_list_item::DatapointType::NumericDatapoints(num_dps) => {
596 DatapointsEnumType::NumericDatapoints(
597 num_dps
598 .datapoints
599 .into_iter()
600 .map(DatapointDouble::from)
601 .collect(),
602 )
603 }
604 data_point_list_item::DatapointType::StringDatapoints(str_dps) => {
605 DatapointsEnumType::StringDatapoints(
606 str_dps
607 .datapoints
608 .into_iter()
609 .map(DatapointString::from)
610 .collect(),
611 )
612 }
613 data_point_list_item::DatapointType::AggregateDatapoints(aggr_dps) => {
614 DatapointsEnumType::AggregateDatapoints(
615 aggr_dps
616 .datapoints
617 .into_iter()
618 .map(DatapointAggregate::from)
619 .collect(),
620 )
621 }
622 },
623 None => DatapointsEnumType::NumericDatapoints(Vec::<DatapointDouble>::new()),
624 },
625 unit_external_id: if req.unit_external_id.is_empty() {
626 None
627 } else {
628 Some(req.unit_external_id)
629 },
630 next_cursor: if req.next_cursor.is_empty() {
631 None
632 } else {
633 Some(req.next_cursor)
634 },
635 }
636 }
637}
638
639impl From<AddDatapoints> for DataPointInsertionItem {
640 fn from(req: AddDatapoints) -> DataPointInsertionItem {
641 DataPointInsertionItem {
642 time_series_reference: Some(TimeSeriesReference::from(req.id)),
643 datapoint_type: match req.datapoints {
644 DatapointsEnumType::NumericDatapoints(dps) => Some(
645 self::proto::data_point_insertion_item::DatapointType::NumericDatapoints(
646 NumericDatapoints {
647 datapoints: dps.into_iter().map(NumericDatapoint::from).collect(),
648 },
649 ),
650 ),
651 DatapointsEnumType::StringDatapoints(dps) => Some(
652 self::proto::data_point_insertion_item::DatapointType::StringDatapoints(
653 StringDatapoints {
654 datapoints: dps.into_iter().map(StringDatapoint::from).collect(),
655 },
656 ),
657 ),
658 DatapointsEnumType::AggregateDatapoints(_) => {
659 panic!("Cannot insert aggregate datapoints")
660 }
661 },
662 }
663 }
664}
665
666impl From<Vec<AddDatapoints>> for DataPointInsertionRequest {
667 fn from(items: Vec<AddDatapoints>) -> DataPointInsertionRequest {
668 DataPointInsertionRequest {
669 items: items
670 .into_iter()
671 .map(DataPointInsertionItem::from)
672 .collect(),
673 }
674 }
675}
676
677impl From<DataPointListResponse> for DatapointsListResponse {
678 fn from(resp: DataPointListResponse) -> DatapointsListResponse {
679 DatapointsListResponse {
680 items: resp
681 .items
682 .into_iter()
683 .map(DatapointsResponse::from)
684 .collect(),
685 }
686 }
687}