Skip to main content

astarte_device_sdk/
properties.rs

1// This file is part of Astarte.
2//
3// Copyright 2023-2026 SECO Mind Srl
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//    http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// SPDX-License-Identifier: Apache-2.0
18
19//! Handles the properties for the device.
20
21use std::{future::Future, io::Write};
22
23use astarte_interfaces::{
24    MappingPath, Schema, interface::InterfaceTypeAggregation, schema::InterfaceType,
25};
26use flate2::{Compression, bufread::ZlibDecoder, write::ZlibEncoder};
27use futures::{StreamExt, TryStreamExt, future};
28use tracing::{debug, error, warn};
29
30use crate::{
31    client::DeviceClient,
32    error::{Error, InterfaceTypeError},
33    store::{PropertyMapping, PropertyStore, StoredProp},
34    transport::Connection,
35    types::AstarteData,
36};
37
38/// Error handling the properties.
39#[non_exhaustive]
40#[derive(thiserror::Error, Debug)]
41pub enum PropertiesError {
42    /// The payload is too short, it should be at least 4 bytes long.
43    #[error("the payload should at least 4 bytes long, got {0}")]
44    PayloadTooShort(usize),
45    /// The payload is too log, it should be at most a u32.
46    #[error("the payload should be at most a u32")]
47    PayloadTooLong,
48    /// Couldn't convert the size from u32 to usize.
49    #[error("error converting the size from u32 to usize")]
50    Conversion(#[from] std::num::TryFromIntError),
51    /// Error decoding the zlib compressed payload.
52    #[error("error decoding the zlib compressed payload")]
53    Decode(#[source] std::io::Error),
54    /// Error encoding the zlib compressed payload.
55    #[error("error encoding the zlib compressed payload")]
56    Encode(#[source] std::io::Error),
57}
58
59/// Trait to access the stored properties.
60pub trait PropAccess {
61    /// Get the value of a property given the interface and path.
62    ///
63    /// ```no_run
64    /// use astarte_device_sdk::builder::DeviceBuilder;
65    /// use astarte_device_sdk::prelude::*;
66    /// use astarte_device_sdk::store::sqlite::SqliteStore;
67    /// use astarte_device_sdk::transport::mqtt::{MqttConfig, MqttArgs, Credential};
68    /// use astarte_device_sdk::types::AstarteData;
69    ///
70    /// #[tokio::main]
71    /// async fn main() {
72    ///     let database = SqliteStore::options().with_db_file("/path/to/database/store.db").await.unwrap();
73    ///     let args = MqttArgs{
74    ///         realm: "realm_id".to_string(),
75    ///         device_id: "device_id".to_string(),
76    ///         credential: Credential::secret("credential_secret"),
77    ///         pairing_url: "http://api.astarte.localhost/pairing".parse().expect("a valid URL")
78    ///     };
79    ///     let mqtt_config = MqttConfig::new(args);
80    ///
81    ///
82    ///     let (mut device, _connection) = DeviceBuilder::new().store(database)
83    ///         .connection(mqtt_config)
84    ///         .build().await.unwrap();
85    ///
86    ///     let property_value: Option<AstarteData> = device
87    ///         .property("my.interface.name", "/endpoint/path")
88    ///         .await
89    ///         .unwrap();
90    /// }
91    /// ```
92    fn property(
93        &self,
94        interface: &str,
95        path: &str,
96    ) -> impl Future<Output = Result<Option<AstarteData>, Error>> + Send;
97    /// Get all the properties of the given interface.
98    fn interface_props(
99        &self,
100        interface: &str,
101    ) -> impl Future<Output = Result<Vec<StoredProp>, Error>> + Send;
102    /// Get all the stored properties, device or server owners.
103    fn all_props(&self) -> impl Future<Output = Result<Vec<StoredProp>, Error>> + Send;
104    /// Get all the stored device properties.
105    fn device_props(&self) -> impl Future<Output = Result<Vec<StoredProp>, Error>> + Send;
106    /// Get all the stored server properties.
107    fn server_props(&self) -> impl Future<Output = Result<Vec<StoredProp>, Error>> + Send;
108}
109
110impl<C> PropAccess for DeviceClient<C>
111where
112    C: Connection,
113{
114    async fn property(
115        &self,
116        interface_name: &str,
117        path: &str,
118    ) -> Result<Option<AstarteData>, Error> {
119        let path = MappingPath::try_from(path)?;
120
121        let interfaces = self.state.interfaces().read().await;
122        let mapping = interfaces.get_property(interface_name, &path)?;
123
124        self.try_load_prop(&mapping).await
125    }
126
127    async fn interface_props(&self, interface_name: &str) -> Result<Vec<StoredProp>, Error> {
128        let interfaces = self.state.interfaces().read().await;
129        let interface = interfaces
130            .get(interface_name)
131            .ok_or_else(|| Error::InterfaceNotFound {
132                name: interface_name.to_string(),
133            })?;
134
135        let InterfaceTypeAggregation::Properties(interface) = interface.inner() else {
136            return Err(Error::InterfaceType(InterfaceTypeError::new(
137                interface_name,
138                InterfaceType::Properties,
139                interface.interface_type(),
140            )));
141        };
142
143        let stored_prop = self.store.interface_props(interface).await?;
144
145        futures::stream::iter(stored_prop)
146            .then(|stored_prop| async {
147                if stored_prop.interface_major != interface.version_major() {
148                    warn!(
149                        "version mismatch for property {}{} (stored {}, interface {}), deleting",
150                        stored_prop.interface,
151                        stored_prop.path,
152                        stored_prop.interface_major,
153                        interface.version_major()
154                    );
155
156                    self.store
157                        .delete_prop(&PropertyMapping::from(&stored_prop))
158                        .await?;
159
160                    Ok(None)
161                } else {
162                    Ok(Some(stored_prop))
163                }
164            })
165            .try_filter_map(future::ok)
166            .try_collect()
167            .await
168    }
169
170    async fn all_props(&self) -> Result<Vec<StoredProp>, Error> {
171        self.store.load_all_props().await.map_err(Error::from)
172    }
173
174    async fn device_props(&self) -> Result<Vec<StoredProp>, Error> {
175        self.store.device_props().await.map_err(Error::from)
176    }
177
178    async fn server_props(&self) -> Result<Vec<StoredProp>, Error> {
179        self.store.server_props().await.map_err(Error::from)
180    }
181}
182
183/// Extracts the properties from a set payload.
184///
185/// See https://docs.astarte-platform.org/astarte/latest/080-mqtt-v1-protocol.html#purge-properties
186pub(crate) fn extract_set_properties(bdata: &[u8]) -> Result<Vec<String>, PropertiesError> {
187    use std::io::Read;
188
189    if bdata.len() < 4 {
190        return Err(PropertiesError::PayloadTooShort(bdata.len()));
191    }
192
193    let (size, data) = bdata.split_at(4);
194    // The size is a u32 in big endian, so we need to convert it to usize
195    let size: u32 = u32::from_be_bytes([size[0], size[1], size[2], size[3]]);
196    let size: usize = size.try_into()?;
197
198    let mut d = ZlibDecoder::new(data);
199    let mut s = String::new();
200    let bytes_read = d.read_to_string(&mut s).map_err(PropertiesError::Decode)?;
201
202    debug_assert_eq!(
203        bytes_read, size,
204        "Byte red and size mismatch: {bytes_read} != {size}"
205    );
206    // Signal the error in production
207    if bytes_read != size {
208        error!(
209            bytes_read = bytes_read,
210            size = size,
211            "Byte red and size mismatch",
212        );
213    }
214
215    if s.is_empty() {
216        Ok(Vec::new())
217    } else {
218        Ok(s.split(';').map(|x| x.to_string()).collect())
219    }
220}
221
222/// Extracts the properties from a set payload.
223///
224/// See https://docs.astarte-platform.org/astarte/latest/080-mqtt-v1-protocol.html#purge-properties
225pub(crate) fn encode_set_properties<'a, I>(interface_paths: I) -> Result<Vec<u8>, PropertiesError>
226where
227    I: IntoIterator<Item = &'a String>,
228{
229    let mut iter = interface_paths.into_iter();
230
231    let mut uncompressed_size: u32 = 0;
232
233    // pre-insert the 4 bytes for the size
234    let buf: Vec<u8> = vec![0, 0, 0, 0];
235
236    let mut encoder = ZlibEncoder::new(buf, Compression::default());
237
238    let Some(first) = iter.next() else {
239        debug!("no properties to retain, sending empty purge");
240
241        return encoder.finish().map_err(PropertiesError::Encode);
242    };
243
244    uncompressed_size = encode_prop(&mut encoder, uncompressed_size, first)?;
245
246    for prop in iter {
247        // encode the separator
248        uncompressed_size = encode_prop(&mut encoder, uncompressed_size, ";")?;
249        // encode the prop
250        uncompressed_size = encode_prop(&mut encoder, uncompressed_size, prop)?;
251    }
252
253    let mut res = encoder.finish().map_err(PropertiesError::Encode)?;
254
255    let bytes = uncompressed_size.to_be_bytes();
256
257    // we allocated the first 4 bytes
258    res[..bytes.len()].copy_from_slice(&bytes);
259
260    Ok(res)
261}
262
263fn encode_prop(
264    encoder: &mut ZlibEncoder<Vec<u8>>,
265    uncompressed_size: u32,
266    value: &str,
267) -> Result<u32, PropertiesError> {
268    let bytes = value.as_bytes();
269
270    let uncompressed_size = u32::try_from(bytes.len())
271        .ok()
272        .and_then(|val| uncompressed_size.checked_add(val))
273        .ok_or(PropertiesError::PayloadTooLong)?;
274
275    encoder.write_all(bytes).map_err(PropertiesError::Encode)?;
276
277    Ok(uncompressed_size)
278}
279
280#[cfg(test)]
281pub(crate) mod tests {
282    use astarte_interfaces::schema::Ownership;
283
284    use crate::client::tests::mock_client_with_store;
285    use crate::state::ConnStatus;
286    use crate::store::memory::MemoryStore;
287    use crate::store::{SqliteStore, StoreCapabilities};
288
289    use super::*;
290
291    pub(crate) const PROPERTIES_PAYLOAD: [u8; 66] = [
292        0x00, 0x00, 0x00, 0x46, 0x78, 0x9c, 0x4b, 0xce, 0xcf, 0xd5, 0x4b, 0xad, 0x48, 0xcc, 0x2d,
293        0xc8, 0x49, 0xd5, 0xf3, 0xad, 0xf4, 0xcc, 0x2b, 0x49, 0x2d, 0x4a, 0x4b, 0x4c, 0x4e, 0xd5,
294        0x2f, 0xce, 0xcf, 0x4d, 0xd5, 0x2f, 0x48, 0x2c, 0xc9, 0xb0, 0xce, 0x2f, 0x4a, 0x87, 0xab,
295        0x70, 0x29, 0x4a, 0x4c, 0x2b, 0x41, 0x28, 0xca, 0x2f, 0xc9, 0x48, 0x2d, 0x0a, 0x00, 0x2a,
296        0x02, 0x00, 0xb2, 0x0c, 0x1a, 0xc9,
297    ];
298
299    #[test]
300    fn test_deflate() {
301        let example = b"com.example.MyInterface/some/path;org.example.DraftInterface/otherPath";
302
303        let s = extract_set_properties(&PROPERTIES_PAYLOAD).unwrap();
304
305        assert_eq!(s.join(";").as_bytes(), example);
306    }
307
308    const SERVER_PROP: &str = r#"{
309    "interface_name": "org.Foo",
310    "version_major": 1,
311    "version_minor": 0,
312    "type": "properties",
313    "aggregation": "individual",
314    "ownership": "server",
315    "description": "Generic aggregated object data.",
316    "mappings": [{
317        "endpoint": "/bar",
318        "type": "boolean"
319    }]
320}"#;
321    const DEVICE_PROP: &str = r#"{
322    "interface_name": "org.Bar",
323    "version_major": 1,
324    "version_minor": 0,
325    "type": "properties",
326    "aggregation": "individual",
327    "ownership": "device",
328    "description": "Generic aggregated object data.",
329    "mappings": [{
330        "endpoint": "/foo",
331        "type": "integer"
332    }]
333}"#;
334
335    async fn test_prop_access_for_store<S>(store: S)
336    where
337        S: StoreCapabilities,
338    {
339        store
340            .store_prop(StoredProp {
341                interface: "org.Foo",
342                path: "/bar",
343                value: &AstarteData::Boolean(true),
344                interface_major: 1,
345                ownership: Ownership::Server,
346            })
347            .await
348            .unwrap();
349
350        store
351            .store_prop(StoredProp {
352                interface: "org.Bar",
353                path: "/foo",
354                value: &AstarteData::Integer(42),
355                interface_major: 1,
356                ownership: Ownership::Device,
357            })
358            .await
359            .unwrap();
360
361        let sdk = mock_client_with_store(&[SERVER_PROP, DEVICE_PROP], ConnStatus::Connected, store);
362
363        let prop = sdk.property("org.Foo", "/bar").await.unwrap();
364        assert_eq!(prop, Some(AstarteData::Boolean(true)));
365
366        let prop = sdk.property("org.Bar", "/foo").await.unwrap();
367        assert_eq!(prop, Some(AstarteData::Integer(42)));
368
369        let mut props = sdk.all_props().await.unwrap();
370        props.sort_unstable_by(|a, b| a.interface.cmp(&b.interface));
371        let expected = [
372            StoredProp::<&'static str> {
373                interface: "org.Bar",
374                path: "/foo",
375                value: AstarteData::Integer(42),
376                interface_major: 1,
377                ownership: Ownership::Device,
378            },
379            StoredProp::<&'static str> {
380                interface: "org.Foo",
381                path: "/bar",
382                value: AstarteData::Boolean(true),
383                interface_major: 1,
384                ownership: Ownership::Server,
385            },
386        ];
387        assert_eq!(props, expected);
388
389        let props = sdk.device_props().await.unwrap();
390        let expected = [StoredProp {
391            interface: "org.Bar",
392            path: "/foo",
393            value: AstarteData::Integer(42),
394            interface_major: 1,
395            ownership: Ownership::Device,
396        }];
397        assert_eq!(props, expected);
398
399        let props = sdk.interface_props("org.Bar").await.unwrap();
400        assert_eq!(props, expected);
401
402        let props = sdk.server_props().await.unwrap();
403        let expected = [StoredProp::<&'static str> {
404            interface: "org.Foo",
405            path: "/bar",
406            value: AstarteData::Boolean(true),
407            interface_major: 1,
408            ownership: Ownership::Server,
409        }];
410        assert_eq!(props, expected);
411
412        let props = sdk.interface_props("org.Foo").await.unwrap();
413        assert_eq!(props, expected);
414    }
415
416    #[tokio::test]
417    async fn test_in_memory_property_access() {
418        let store = MemoryStore::new();
419
420        test_prop_access_for_store(store).await;
421    }
422
423    #[tokio::test]
424    async fn test_in_sqlite_from_str() {
425        let dir = tempfile::tempdir().unwrap();
426
427        let db = dir.path().join("prop-cache.db");
428
429        let store = SqliteStore::options().with_db_file(&db).await.unwrap();
430
431        test_prop_access_for_store(store).await;
432    }
433
434    #[tokio::test]
435    async fn test_in_sqlite_property_access() {
436        let dir = tempfile::tempdir().unwrap();
437
438        let store = SqliteStore::options()
439            .with_writable_dir(dir.path())
440            .await
441            .unwrap();
442
443        test_prop_access_for_store(store).await;
444    }
445
446    #[test]
447    fn should_encode_props() {
448        let example = [
449            "com.example.MyInterface/some/path".to_string(),
450            "org.example.DraftInterface/otherPath".to_string(),
451        ];
452
453        let encoded = encode_set_properties(&example).unwrap();
454
455        let expected_snapshot: [u8; 71] = [
456            0, 0, 0, 70, 120, 156, 69, 202, 33, 14, 192, 32, 12, 5, 208, 27, 193, 1, 102, 103, 38,
457            150, 236, 10, 13, 249, 12, 65, 41, 41, 21, 112, 123, 80, 224, 95, 16, 118, 232, 196,
458            53, 195, 189, 227, 41, 6, 141, 20, 224, 155, 48, 124, 37, 75, 151, 232, 191, 197, 173,
459            20, 237, 32, 177, 4, 253, 22, 154, 178, 12, 26, 201,
460        ];
461
462        assert_eq!(
463            encoded,
464            expected_snapshot,
465            "the two are different, decoded is {:?}",
466            extract_set_properties(&encoded)
467        );
468
469        assert_eq!(extract_set_properties(&encoded).unwrap(), example);
470    }
471}