Skip to main content

astarte_interfaces/interface/datastream/
object.rs

1// This file is part of Astarte.
2//
3// Copyright 2025, 2026 SECO Mind Srl
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//    http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// SPDX-License-Identifier: Apache-2.0
18
19//! Datastream with object aggregation
20//!
21//! Data sent on an object interface is grouped and sent together in a single message.
22
23use std::{borrow::Cow, fmt::Display, str::FromStr};
24
25use cfg_if::cfg_if;
26use serde::Serialize;
27
28use crate::{
29    error::Error,
30    interface::{name::InterfaceName, version::InterfaceVersion, MappingVec, Retention, Schema},
31    mapping::{
32        datastream::object::DatastreamObjectMapping, path::MappingPath, InterfaceMapping,
33        MappingError,
34    },
35    schema::{Aggregation, InterfaceJson, InterfaceType, Mapping, Ownership, Reliability},
36};
37
38/// Error when parsing a [`DatastreamObject`]
39#[derive(Debug, thiserror::Error)]
40pub enum ObjectError {
41    /// Object has a different value for the specified mapping
42    #[error("object has a different {ctx} for the mapping {endpoint}")]
43    Mapping {
44        /// The value that is different
45        ctx: &'static str,
46        /// Endpoint of the mapping that is different.
47        endpoint: String,
48    },
49    /// Mapping endpoint differs from others
50    ///
51    /// It needs to have up to the latest level equal to the others endpoints.
52    ///
53    /// See [the Astarte documentation](https://docs.astarte-platform.org/astarte/latest/030-interface.html#endpoints-and-aggregation)
54    #[error("object has an inconsistent endpoint {endpoint}")]
55    Endpoint {
56        /// Endpoint that is inconsistent.
57        endpoint: String,
58    },
59}
60
61impl ObjectError {
62    fn mapping(ctx: &'static str, endpoint: impl AsRef<str>) -> Self {
63        Self::Mapping {
64            ctx,
65            endpoint: endpoint.as_ref().to_string(),
66        }
67    }
68}
69
70/// Interface of type datastream object.
71///
72/// For this interface all the mappings have the same prefix and configurations.
73#[derive(Debug, PartialEq, Eq, Clone)]
74pub struct DatastreamObject {
75    name: InterfaceName,
76    version: InterfaceVersion,
77    ownership: Ownership,
78    reliability: Reliability,
79    explicit_timestamp: bool,
80    retention: Retention,
81    mappings: MappingVec<DatastreamObjectMapping>,
82    #[cfg(feature = "server-fields")]
83    #[cfg_attr(docsrs, doc(cfg(feature = "server-fields")))]
84    database_retention: crate::interface::DatabaseRetention,
85    #[cfg(feature = "doc-fields")]
86    #[cfg_attr(docsrs, doc(cfg(feature = "doc-fields")))]
87    description: Option<String>,
88    #[cfg(feature = "doc-fields")]
89    #[cfg_attr(docsrs, doc(cfg(feature = "doc-fields")))]
90    doc: Option<String>,
91}
92
93impl DatastreamObject {
94    /// Return the reliability for the object.
95    #[must_use]
96    pub fn reliability(&self) -> Reliability {
97        self.reliability
98    }
99
100    /// Return true if the object requires an explicit timestamp.
101    ///
102    /// Otherwise the reception timestamp is used.
103    #[must_use]
104    pub fn explicit_timestamp(&self) -> bool {
105        self.explicit_timestamp
106    }
107
108    /// Returns the retention for the object.
109    #[must_use]
110    pub fn retention(&self) -> Retention {
111        self.retention
112    }
113    /// Returns the database retention for the object.
114    #[cfg(feature = "server-fields")]
115    #[cfg_attr(docsrs, doc(cfg(feature = "server-fields")))]
116    #[must_use]
117    pub fn database_retention(&self) -> crate::interface::DatabaseRetention {
118        self.database_retention
119    }
120
121    /// Check if the path if the correct one for this object interface.
122    #[must_use]
123    pub fn is_object_path(&self, path: &MappingPath<'_>) -> bool {
124        let Some(mapping) = self.mappings.iter().next() else {
125            unreachable!("objects must have at least one mapping")
126        };
127
128        mapping.is_object_path(path)
129    }
130
131    /// Get a mapping for in the object for the given field.
132    ///
133    /// The field is the last level of an endpoint.
134    #[must_use]
135    pub fn mapping(&self, path: &str) -> Option<&DatastreamObjectMapping> {
136        self.mappings
137            .iter()
138            .find(|mapping| mapping.eq_object_field(path))
139    }
140}
141
142impl Schema for DatastreamObject {
143    type Mapping = DatastreamObjectMapping;
144
145    fn name(&self) -> &str {
146        self.name.as_ref()
147    }
148
149    fn interface_name(&self) -> &InterfaceName {
150        &self.name
151    }
152
153    fn version_major(&self) -> i32 {
154        self.version.version_major()
155    }
156
157    fn version_minor(&self) -> i32 {
158        self.version.version_minor()
159    }
160
161    fn version(&self) -> InterfaceVersion {
162        self.version
163    }
164
165    fn interface_type(&self) -> InterfaceType {
166        InterfaceType::Datastream
167    }
168
169    fn ownership(&self) -> Ownership {
170        self.ownership
171    }
172
173    fn aggregation(&self) -> Aggregation {
174        Aggregation::Object
175    }
176
177    #[cfg(feature = "doc-fields")]
178    #[cfg_attr(docsrs, doc(cfg(feature = "doc-fields")))]
179    fn description(&self) -> Option<&str> {
180        self.description.as_deref()
181    }
182
183    #[cfg(feature = "doc-fields")]
184    #[cfg_attr(docsrs, doc(cfg(feature = "doc-fields")))]
185    fn doc(&self) -> Option<&str> {
186        self.doc.as_deref()
187    }
188
189    fn iter_mappings(&self) -> impl Iterator<Item = &Self::Mapping> {
190        self.mappings.iter()
191    }
192
193    fn mappings_len(&self) -> usize {
194        self.mappings.len()
195    }
196
197    fn iter_interface_mappings(&self) -> impl Iterator<Item = Mapping<Cow<'_, str>>> {
198        self.iter_mappings().map(|mapping| {
199            cfg_if! {
200                if #[cfg(feature = "doc-fields")] {
201                    let description = mapping.description().map(Cow::Borrowed);
202                    let doc = mapping.doc().map(Cow::Borrowed);
203                } else {
204                    let description = None;
205                    let doc = None;
206                }
207            }
208
209            cfg_if! {
210                if #[cfg(feature = "server-fields")] {
211                    let database_retention_policy = Some(self.database_retention.into());
212                    let database_retention_ttl = self.database_retention.as_ttl_secs();
213                } else {
214                    let database_retention_policy = None;
215                    let database_retention_ttl = None;
216                }
217            }
218
219            Mapping {
220                endpoint: mapping.endpoint().to_string().into(),
221                mapping_type: mapping.mapping_type(),
222                reliability: Some(self.reliability),
223                explicit_timestamp: Some(self.explicit_timestamp),
224                retention: Some(self.retention.into()),
225                expiry: self.retention.as_expiry_seconds(),
226                allow_unset: None,
227                required: None,
228                database_retention_policy,
229                database_retention_ttl,
230                description,
231                doc,
232            }
233        })
234    }
235}
236
237impl Display for DatastreamObject {
238    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
239        write!(f, "{}:{}", self.name, self.version)
240    }
241}
242
243impl<T> TryFrom<InterfaceJson<T>> for DatastreamObject
244where
245    T: AsRef<str> + Into<String>,
246{
247    type Error = Error;
248
249    fn try_from(value: InterfaceJson<T>) -> Result<Self, Self::Error> {
250        if value.interface_type != InterfaceType::Datastream
251            || value.aggregation != Some(Aggregation::Object)
252        {
253            return Err(Error::InterfaceConversion {
254                exp_type: InterfaceType::Datastream,
255                exp_aggregation: Aggregation::Object,
256                got_type: value.interface_type,
257                got_aggregation: value.aggregation.unwrap_or_default(),
258            });
259        }
260
261        let name = InterfaceName::from_str_ref(value.interface_name)?;
262        let version = InterfaceVersion::try_new(value.version_major, value.version_minor)?;
263
264        // Check the compatibility
265        let first = value.mappings.first().ok_or(MappingError::Empty)?;
266
267        value
268            .mappings
269            .iter()
270            .skip(1)
271            .try_for_each(|mapping| are_mapping_compatible(first, mapping))?;
272
273        // Get the variables
274        let reliability = first.reliability.unwrap_or_default();
275        let explicit_timestamp = first.explicit_timestamp.unwrap_or_default();
276        let retention = first.retention_with_expiry()?;
277        #[cfg(feature = "server-fields")]
278        let database_retention = first.database_retention_with_ttl()?;
279
280        let mappings = value
281            .mappings
282            .into_iter()
283            .map(|mapping| DatastreamObjectMapping::try_from(mapping).map_err(Error::Mapping))
284            .collect::<Result<Vec<_>, Error>>()?;
285
286        let first = mappings.first().ok_or(MappingError::Empty)?;
287
288        mappings.iter().skip(1).try_for_each(|other| {
289            if !first.is_same_object(other) {
290                return Err(Error::Object(ObjectError::Endpoint {
291                    endpoint: other.endpoint().to_string(),
292                }));
293            }
294            Ok(())
295        })?;
296
297        let mappings = MappingVec::try_from(mappings)?;
298
299        Ok(Self {
300            name: name.into_string(),
301            version,
302            ownership: value.ownership,
303            reliability,
304            explicit_timestamp,
305            retention,
306            #[cfg(feature = "server-fields")]
307            database_retention,
308            mappings,
309            #[cfg(feature = "doc-fields")]
310            description: value.description.map(T::into),
311            #[cfg(feature = "doc-fields")]
312            doc: value.doc.map(T::into),
313        })
314    }
315}
316
317fn are_mapping_compatible<T>(a: &Mapping<T>, b: &Mapping<T>) -> Result<(), ObjectError>
318where
319    T: AsRef<str>,
320{
321    if a.reliability != b.reliability {
322        return Err(ObjectError::mapping("reliability", &b.endpoint));
323    }
324
325    if a.explicit_timestamp != b.explicit_timestamp {
326        return Err(ObjectError::mapping("explicit_timestamp", &b.endpoint));
327    }
328
329    if a.retention != b.retention {
330        return Err(ObjectError::mapping("retention", &b.endpoint));
331    }
332
333    if a.expiry != b.expiry {
334        return Err(ObjectError::mapping("expiry", &b.endpoint));
335    }
336
337    #[cfg(feature = "server-fields")]
338    {
339        if a.database_retention_policy != b.database_retention_policy {
340            return Err(ObjectError::mapping(
341                "database_retention_policy",
342                &b.endpoint,
343            ));
344        }
345
346        if a.database_retention_ttl != b.database_retention_ttl {
347            return Err(ObjectError::mapping("database_retention_ttl", &b.endpoint));
348        }
349    }
350
351    Ok(())
352}
353
354impl Serialize for DatastreamObject {
355    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
356    where
357        S: serde::Serializer,
358    {
359        InterfaceJson::from(self).serialize(serializer)
360    }
361}
362
363impl FromStr for DatastreamObject {
364    type Err = Error;
365
366    fn from_str(s: &str) -> Result<Self, Self::Err> {
367        let interface: InterfaceJson<Cow<str>> = serde_json::from_str(s)?;
368
369        Self::try_from(interface)
370    }
371}
372
373#[cfg(test)]
374mod tests {
375    use std::time::Duration;
376
377    use pretty_assertions::assert_eq;
378
379    use crate::interface::tests::E2E_DEVICE_AGGREGATE;
380    use crate::schema::MappingType;
381    use crate::Endpoint;
382
383    use super::*;
384
385    #[test]
386    fn should_parse_str() {
387        let object = DatastreamObject::from_str(
388            r#"{
389                "interface_name": "com.example.Example",
390                "version_major": 0,
391                "version_minor": 1,
392                "type": "datastream",
393                "aggregation": "object",
394                "ownership": "server",
395                "description": "The description of the\tinterface",
396                "doc": "The documentation of the\tinterface",
397                "mappings": [{
398                    "endpoint": "/prefix/path",
399                    "type": "boolean",
400                    "reliability": "unique",
401                    "explicit_timestamp": true,
402                    "retention": "stored",
403                    "expiry": 30,
404                    "database_retention_policy": "use_ttl",
405                    "database_retention_ttl": 420,
406                    "required": true,
407                    "description": "The description of the\tmapping",
408                    "doc": "The documentation of the\tmapping"
409                }]
410            }"#,
411        )
412        .unwrap();
413
414        let exp_mapping = DatastreamObjectMapping {
415            endpoint: Endpoint::try_from("/prefix/path").unwrap(),
416            mapping_type: MappingType::Boolean,
417            required: true,
418            #[cfg(feature = "doc-fields")]
419            description: Some("The description of the\tmapping".to_string()),
420            #[cfg(feature = "doc-fields")]
421            doc: Some("The documentation of the\tmapping".to_string()),
422        };
423
424        let exp = DatastreamObject {
425            name: InterfaceName::try_from("com.example.Example".to_string()).unwrap(),
426            version: InterfaceVersion::try_new(0, 1).unwrap(),
427            ownership: Ownership::Server,
428            reliability: Reliability::Unique,
429            explicit_timestamp: true,
430            retention: Retention::Stored {
431                expiry: Some(Duration::from_secs(30)),
432            },
433            mappings: MappingVec::try_from(vec![exp_mapping.clone()]).unwrap(),
434            #[cfg(feature = "server-fields")]
435            database_retention: crate::interface::DatabaseRetention::UseTtl {
436                ttl: Duration::from_secs(420),
437            },
438            #[cfg(feature = "doc-fields")]
439            description: Some("The description of the\tinterface".to_string()),
440            #[cfg(feature = "doc-fields")]
441            doc: Some("The documentation of the\tinterface".to_string()),
442        };
443
444        assert_eq!(object, exp);
445
446        // Just for coverage
447        assert_eq!(object.name(), object.name.as_str());
448        assert_eq!(*object.interface_name(), object.name);
449        assert_eq!(object.version(), object.version);
450        assert_eq!(object.version_major(), object.version.version_major());
451        assert_eq!(object.version_minor(), object.version.version_minor());
452        assert_eq!(object.ownership(), object.ownership);
453        assert_eq!(object.retention(), object.retention);
454        assert_eq!(object.reliability(), object.reliability);
455        assert_eq!(object.explicit_timestamp(), object.explicit_timestamp);
456        assert_eq!(object.interface_type(), InterfaceType::Datastream);
457        assert_eq!(object.aggregation(), Aggregation::Object);
458        #[cfg(feature = "server-fields")]
459        assert_eq!(object.database_retention(), object.database_retention);
460        #[cfg(feature = "doc-fields")]
461        {
462            assert_eq!(object.doc(), object.doc.as_deref());
463            assert_eq!(object.description(), object.description.as_deref());
464        }
465
466        let path = MappingPath::try_from("/prefix").unwrap();
467        assert!(object.is_object_path(&path));
468
469        assert_eq!(*object.mapping("path").unwrap(), exp_mapping);
470
471        let mapping = object.iter_mappings().next().unwrap();
472        assert_eq!(*mapping, exp_mapping);
473
474        let exp_interface_mapping = Mapping::<Cow<'_, str>> {
475            endpoint: mapping.endpoint.to_string().into(),
476            mapping_type: mapping.mapping_type,
477            reliability: object.reliability.into(),
478            explicit_timestamp: Some(object.explicit_timestamp),
479            retention: Some(object.retention.into()),
480            expiry: object.retention.as_expiry_seconds(),
481            allow_unset: None,
482            required: None,
483            #[cfg(feature = "doc-fields")]
484            description: exp_mapping.description.as_ref().map(Cow::from),
485            #[cfg(feature = "doc-fields")]
486            doc: exp_mapping.doc.as_ref().map(Cow::from),
487            #[cfg(not(feature = "doc-fields"))]
488            description: None,
489            #[cfg(not(feature = "doc-fields"))]
490            doc: None,
491            #[cfg(feature = "server-fields")]
492            database_retention_policy: Some(object.database_retention.into()),
493            #[cfg(feature = "server-fields")]
494            database_retention_ttl: object.database_retention.as_ttl_secs(),
495            #[cfg(not(feature = "server-fields"))]
496            database_retention_policy: None,
497            #[cfg(not(feature = "server-fields"))]
498            database_retention_ttl: None,
499        };
500        assert_eq!(
501            object.iter_interface_mappings().next().unwrap(),
502            exp_interface_mapping
503        );
504
505        assert_eq!(object.to_string(), format!("{}:{}", exp.name, exp.version));
506    }
507
508    #[test]
509    fn should_maintain_mapping_order_serde() {
510        let original = DatastreamObject::from_str(E2E_DEVICE_AGGREGATE).unwrap();
511
512        let serialized = serde_json::to_string(&original).unwrap();
513        let deserialized = DatastreamObject::from_str(&serialized).unwrap();
514
515        assert_eq!(deserialized, original);
516    }
517
518    #[test]
519    fn should_check_same_object_endpoint() {
520        let err = DatastreamObject::from_str(
521            r#"{
522                "interface_name": "com.example.Example",
523                "version_major": 0,
524                "version_minor": 1,
525                "type": "datastream",
526                "ownership": "server",
527                "aggregation": "object",
528                "mappings": [{
529                    "endpoint": "/prefix/path",
530                    "type": "boolean"
531                },{
532                    "endpoint": "/wrong/path",
533                    "type": "boolean"
534                }]
535            }"#,
536        )
537        .unwrap_err();
538
539        assert!(
540            matches!(
541            &err,
542            Error::Object(ObjectError::Endpoint { endpoint } ) if endpoint == "/wrong/path"),
543            "{err:?}"
544        );
545    }
546}