Skip to main content

astarte_interfaces/mapping/datastream/
individual.rs

1// This file is part of Astarte.
2//
3// Copyright 2025, 2026 SECO Mind Srl
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//    http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// SPDX-License-Identifier: Apache-2.0
18
19//! Mapping for Datastream with aggregation individual.
20//!
21//! In case aggregation is individual, each mapping is treated as an independent value and is
22//! managed individually.
23
24use std::borrow::Cow;
25
26use cfg_if::cfg_if;
27
28use crate::{
29    interface::Retention,
30    mapping::{endpoint::Endpoint, invalid_filed, InterfaceMapping, MappingError},
31    schema::{Mapping, MappingType, Reliability},
32};
33
34/// Mapping of a [`DatastreamIndividual`](crate::DatastreamIndividual) interface.
35#[derive(Debug, PartialEq, Eq, Clone)]
36pub struct DatastreamIndividualMapping {
37    pub(crate) endpoint: Endpoint<String>,
38    pub(crate) mapping_type: MappingType,
39    pub(crate) reliability: Reliability,
40    pub(crate) retention: Retention,
41    pub(crate) explicit_timestamp: bool,
42    #[cfg(feature = "server-fields")]
43    pub(crate) database_retention: crate::interface::DatabaseRetention,
44    #[cfg(feature = "doc-fields")]
45    #[cfg_attr(docsrs, doc(cfg(feature = "doc-fields")))]
46    pub(crate) description: Option<String>,
47    #[cfg(feature = "doc-fields")]
48    #[cfg_attr(docsrs, doc(cfg(feature = "doc-fields")))]
49    pub(crate) doc: Option<String>,
50}
51
52impl DatastreamIndividualMapping {
53    /// Returns the [`Reliability`] of the mapping.
54    #[must_use]
55    pub fn reliability(&self) -> Reliability {
56        self.reliability
57    }
58
59    /// Returns the [`Retention`] of the mapping.
60    #[must_use]
61    pub fn retention(&self) -> Retention {
62        self.retention
63    }
64
65    /// Returns the [`DatabaseRetention`](crate::interface::DatabaseRetention) of the mapping.
66    #[must_use]
67    #[cfg(feature = "server-fields")]
68    #[cfg_attr(docsrs, doc(cfg(feature = "server-fields")))]
69    pub fn database_retention(&self) -> crate::interface::DatabaseRetention {
70        self.database_retention
71    }
72
73    /// Returns true if the mapping requires an explicit timestamp.
74    ///
75    /// Otherwise the reception timestamp is used.
76    #[must_use]
77    pub fn explicit_timestamp(&self) -> bool {
78        self.explicit_timestamp
79    }
80}
81
82impl InterfaceMapping for DatastreamIndividualMapping {
83    fn endpoint(&self) -> &Endpoint<String> {
84        &self.endpoint
85    }
86
87    fn mapping_type(&self) -> MappingType {
88        self.mapping_type
89    }
90
91    #[cfg(feature = "doc-fields")]
92    #[cfg_attr(docsrs, doc(cfg(feature = "doc-fields")))]
93    fn description(&self) -> Option<&str> {
94        self.description.as_deref()
95    }
96
97    #[cfg(feature = "doc-fields")]
98    #[cfg_attr(docsrs, doc(cfg(feature = "doc-fields")))]
99    fn doc(&self) -> Option<&str> {
100        self.doc.as_deref()
101    }
102}
103
104impl<T> TryFrom<Mapping<T>> for DatastreamIndividualMapping
105where
106    T: AsRef<str> + Into<String>,
107{
108    type Error = MappingError;
109
110    fn try_from(value: Mapping<T>) -> Result<Self, Self::Error> {
111        let endpoint = Endpoint::try_from(value.endpoint.as_ref())?;
112        let retention = value.retention_with_expiry()?;
113        #[cfg(feature = "server-fields")]
114        let database_retention = value.database_retention_with_ttl()?;
115
116        if value.allow_unset.is_some() {
117            invalid_filed!(datastream, "allow_unset");
118        }
119
120        if value.required.is_some() {
121            invalid_filed!(datastream, "required");
122        }
123
124        Ok(Self {
125            endpoint,
126            reliability: value.reliability.unwrap_or_default(),
127            retention,
128            explicit_timestamp: value.explicit_timestamp.unwrap_or_default(),
129            mapping_type: value.mapping_type,
130            #[cfg(feature = "server-fields")]
131            database_retention,
132            #[cfg(feature = "doc-fields")]
133            description: value.description.map(T::into),
134            #[cfg(feature = "doc-fields")]
135            doc: value.doc.map(T::into),
136        })
137    }
138}
139
140impl<'a> From<&'a DatastreamIndividualMapping> for Mapping<Cow<'a, str>> {
141    fn from(value: &'a DatastreamIndividualMapping) -> Self {
142        cfg_if! {
143            if #[cfg(feature = "doc-fields")] {
144                let description = value.description().map(Cow::Borrowed);
145                let doc = value.doc().map(Cow::Borrowed);
146            } else {
147                let description = None;
148                let doc = None;
149            }
150        }
151
152        cfg_if! {
153            if #[cfg(feature = "server-fields")] {
154                let database_retention_policy = Some(value.database_retention.into());
155                let database_retention_ttl = value.database_retention.as_ttl_secs();
156            } else {
157                let database_retention_policy = None;
158                let database_retention_ttl = None;
159            }
160        }
161
162        Mapping {
163            endpoint: value.endpoint.to_string().into(),
164            mapping_type: value.mapping_type,
165            reliability: value.reliability.into(),
166            explicit_timestamp: Some(value.explicit_timestamp),
167            retention: Some(value.retention.into()),
168            expiry: value.retention.as_expiry_seconds(),
169            allow_unset: None,
170            required: None,
171            database_retention_policy,
172            database_retention_ttl,
173            description,
174            doc,
175        }
176    }
177}
178
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::schema;

    use super::*;

    /// Endpoint shared by all the mappings built in these tests.
    const ENDPOINT: &str = "/individual/path";

    /// Builds a boolean mapping on [`ENDPOINT`] with every optional field unset.
    #[cfg(feature = "strict")]
    fn unset_mapping() -> Mapping<&'static str> {
        Mapping {
            endpoint: ENDPOINT,
            mapping_type: MappingType::Boolean,
            reliability: None,
            explicit_timestamp: None,
            retention: None,
            expiry: None,
            database_retention_policy: None,
            database_retention_ttl: None,
            allow_unset: None,
            required: None,
            description: None,
            doc: None,
        }
    }

    #[test]
    fn getters_success() {
        let expected_type = MappingType::Boolean;
        let expected_reliability = Reliability::Guaranteed;
        let expiry = Duration::from_secs(420);
        let ttl = Duration::from_secs(360);
        let description = Some("Individual mapping description");
        let doc = Some("Individual mapping doc");

        let individual_mapping = DatastreamIndividualMapping::try_from(Mapping {
            endpoint: ENDPOINT,
            mapping_type: expected_type,
            reliability: Some(expected_reliability),
            explicit_timestamp: Some(true),
            retention: Some(schema::Retention::Stored),
            expiry: Some(expiry.as_secs().try_into().unwrap()),
            database_retention_policy: Some(schema::DatabaseRetentionPolicy::UseTtl),
            database_retention_ttl: Some(ttl.as_secs().try_into().unwrap()),
            allow_unset: None,
            required: None,
            description,
            doc,
        })
        .unwrap();

        let expected_endpoint = Endpoint::try_from(ENDPOINT).unwrap();
        assert_eq!(individual_mapping.endpoint(), &expected_endpoint);
        assert_eq!(individual_mapping.mapping_type(), expected_type);
        assert_eq!(individual_mapping.reliability(), expected_reliability);
        assert!(individual_mapping.explicit_timestamp());
        assert_eq!(
            individual_mapping.retention(),
            Retention::Stored {
                expiry: Some(expiry),
            }
        );

        #[cfg(feature = "server-fields")]
        {
            let expected_retention = crate::interface::DatabaseRetention::UseTtl { ttl };
            assert_eq!(individual_mapping.database_retention(), expected_retention);
        }

        #[cfg(feature = "doc-fields")]
        {
            assert_eq!(description, individual_mapping.description());
            assert_eq!(doc, individual_mapping.doc());
        }
    }

    #[cfg(feature = "strict")]
    #[test]
    fn mapping_error_invalid_fields() {
        // Property only field: `allow_unset`.
        let with_allow_unset = Mapping {
            allow_unset: Some(true),
            ..unset_mapping()
        };

        let err = DatastreamIndividualMapping::try_from(with_allow_unset).unwrap_err();
        assert!(matches!(
            err,
            MappingError::InvalidField {
                field: "allow_unset",
                interface_type: schema::InterfaceType::Datastream,
            }
        ));

        // Object only field: `required`.
        let with_required = Mapping {
            required: Some(true),
            ..unset_mapping()
        };

        let err = DatastreamIndividualMapping::try_from(with_required).unwrap_err();
        assert!(matches!(
            err,
            MappingError::InvalidField {
                field: "required",
                interface_type: schema::InterfaceType::Datastream,
            }
        ));
    }
}