1use std::{future::Future, io::Write};
22
23use astarte_interfaces::{
24 MappingPath, Schema, interface::InterfaceTypeAggregation, schema::InterfaceType,
25};
26use flate2::{Compression, bufread::ZlibDecoder, write::ZlibEncoder};
27use futures::{StreamExt, TryStreamExt, future};
28use tracing::{debug, error, warn};
29
30use crate::{
31 client::DeviceClient,
32 error::{Error, InterfaceTypeError},
33 store::{PropertyMapping, PropertyStore, StoredProp},
34 transport::Connection,
35 types::AstarteData,
36};
37
38#[non_exhaustive]
40#[derive(thiserror::Error, Debug)]
41pub enum PropertiesError {
42 #[error("the payload should at least 4 bytes long, got {0}")]
44 PayloadTooShort(usize),
45 #[error("the payload should be at most a u32")]
47 PayloadTooLong,
48 #[error("error converting the size from u32 to usize")]
50 Conversion(#[from] std::num::TryFromIntError),
51 #[error("error decoding the zlib compressed payload")]
53 Decode(#[source] std::io::Error),
54 #[error("error encoding the zlib compressed payload")]
56 Encode(#[source] std::io::Error),
57}
58
/// Trait to access the stored properties.
pub trait PropAccess {
    /// Gets the value of the property identified by `interface` and `path`.
    ///
    /// The future resolves to an `Option`, presumably `None` when no value is stored for the
    /// property (to be confirmed against the implementor).
    fn property(
        &self,
        interface: &str,
        path: &str,
    ) -> impl Future<Output = Result<Option<AstarteData>, Error>> + Send;

    /// Gets the stored properties of the given `interface`.
    fn interface_props(
        &self,
        interface: &str,
    ) -> impl Future<Output = Result<Vec<StoredProp>, Error>> + Send;

    /// Gets all the stored properties.
    fn all_props(&self) -> impl Future<Output = Result<Vec<StoredProp>, Error>> + Send;

    /// Gets the stored properties owned by the device.
    fn device_props(&self) -> impl Future<Output = Result<Vec<StoredProp>, Error>> + Send;

    /// Gets the stored properties owned by the server.
    fn server_props(&self) -> impl Future<Output = Result<Vec<StoredProp>, Error>> + Send;
}
109
110impl<C> PropAccess for DeviceClient<C>
111where
112 C: Connection,
113{
114 async fn property(
115 &self,
116 interface_name: &str,
117 path: &str,
118 ) -> Result<Option<AstarteData>, Error> {
119 let path = MappingPath::try_from(path)?;
120
121 let interfaces = self.state.interfaces().read().await;
122 let mapping = interfaces.get_property(interface_name, &path)?;
123
124 self.try_load_prop(&mapping).await
125 }
126
127 async fn interface_props(&self, interface_name: &str) -> Result<Vec<StoredProp>, Error> {
128 let interfaces = self.state.interfaces().read().await;
129 let interface = interfaces
130 .get(interface_name)
131 .ok_or_else(|| Error::InterfaceNotFound {
132 name: interface_name.to_string(),
133 })?;
134
135 let InterfaceTypeAggregation::Properties(interface) = interface.inner() else {
136 return Err(Error::InterfaceType(InterfaceTypeError::new(
137 interface_name,
138 InterfaceType::Properties,
139 interface.interface_type(),
140 )));
141 };
142
143 let stored_prop = self.store.interface_props(interface).await?;
144
145 futures::stream::iter(stored_prop)
146 .then(|stored_prop| async {
147 if stored_prop.interface_major != interface.version_major() {
148 warn!(
149 "version mismatch for property {}{} (stored {}, interface {}), deleting",
150 stored_prop.interface,
151 stored_prop.path,
152 stored_prop.interface_major,
153 interface.version_major()
154 );
155
156 self.store
157 .delete_prop(&PropertyMapping::from(&stored_prop))
158 .await?;
159
160 Ok(None)
161 } else {
162 Ok(Some(stored_prop))
163 }
164 })
165 .try_filter_map(future::ok)
166 .try_collect()
167 .await
168 }
169
170 async fn all_props(&self) -> Result<Vec<StoredProp>, Error> {
171 self.store.load_all_props().await.map_err(Error::from)
172 }
173
174 async fn device_props(&self) -> Result<Vec<StoredProp>, Error> {
175 self.store.device_props().await.map_err(Error::from)
176 }
177
178 async fn server_props(&self) -> Result<Vec<StoredProp>, Error> {
179 self.store.server_props().await.map_err(Error::from)
180 }
181}
182
183pub(crate) fn extract_set_properties(bdata: &[u8]) -> Result<Vec<String>, PropertiesError> {
187 use std::io::Read;
188
189 if bdata.len() < 4 {
190 return Err(PropertiesError::PayloadTooShort(bdata.len()));
191 }
192
193 let (size, data) = bdata.split_at(4);
194 let size: u32 = u32::from_be_bytes([size[0], size[1], size[2], size[3]]);
196 let size: usize = size.try_into()?;
197
198 let mut d = ZlibDecoder::new(data);
199 let mut s = String::new();
200 let bytes_read = d.read_to_string(&mut s).map_err(PropertiesError::Decode)?;
201
202 debug_assert_eq!(
203 bytes_read, size,
204 "Byte red and size mismatch: {bytes_read} != {size}"
205 );
206 if bytes_read != size {
208 error!(
209 bytes_read = bytes_read,
210 size = size,
211 "Byte red and size mismatch",
212 );
213 }
214
215 if s.is_empty() {
216 Ok(Vec::new())
217 } else {
218 Ok(s.split(';').map(|x| x.to_string()).collect())
219 }
220}
221
222pub(crate) fn encode_set_properties<'a, I>(interface_paths: I) -> Result<Vec<u8>, PropertiesError>
226where
227 I: IntoIterator<Item = &'a String>,
228{
229 let mut iter = interface_paths.into_iter();
230
231 let mut uncompressed_size: u32 = 0;
232
233 let buf: Vec<u8> = vec![0, 0, 0, 0];
235
236 let mut encoder = ZlibEncoder::new(buf, Compression::default());
237
238 let Some(first) = iter.next() else {
239 debug!("no properties to retain, sending empty purge");
240
241 return encoder.finish().map_err(PropertiesError::Encode);
242 };
243
244 uncompressed_size = encode_prop(&mut encoder, uncompressed_size, first)?;
245
246 for prop in iter {
247 uncompressed_size = encode_prop(&mut encoder, uncompressed_size, ";")?;
249 uncompressed_size = encode_prop(&mut encoder, uncompressed_size, prop)?;
251 }
252
253 let mut res = encoder.finish().map_err(PropertiesError::Encode)?;
254
255 let bytes = uncompressed_size.to_be_bytes();
256
257 res[..bytes.len()].copy_from_slice(&bytes);
259
260 Ok(res)
261}
262
263fn encode_prop(
264 encoder: &mut ZlibEncoder<Vec<u8>>,
265 uncompressed_size: u32,
266 value: &str,
267) -> Result<u32, PropertiesError> {
268 let bytes = value.as_bytes();
269
270 let uncompressed_size = u32::try_from(bytes.len())
271 .ok()
272 .and_then(|val| uncompressed_size.checked_add(val))
273 .ok_or(PropertiesError::PayloadTooLong)?;
274
275 encoder.write_all(bytes).map_err(PropertiesError::Encode)?;
276
277 Ok(uncompressed_size)
278}
279
#[cfg(test)]
pub(crate) mod tests {
    use astarte_interfaces::schema::Ownership;

    use crate::client::tests::mock_client_with_store;
    use crate::state::ConnStatus;
    use crate::store::memory::MemoryStore;
    use crate::store::{SqliteStore, StoreCapabilities};

    use super::*;

    /// Encoded payload: a size header of `0x46` (70) followed by the zlib stream of the two paths
    /// checked in `test_deflate`.
    pub(crate) const PROPERTIES_PAYLOAD: [u8; 66] = [
        0x00, 0x00, 0x00, 0x46, 0x78, 0x9c, 0x4b, 0xce, 0xcf, 0xd5, 0x4b, 0xad, 0x48, 0xcc, 0x2d,
        0xc8, 0x49, 0xd5, 0xf3, 0xad, 0xf4, 0xcc, 0x2b, 0x49, 0x2d, 0x4a, 0x4b, 0x4c, 0x4e, 0xd5,
        0x2f, 0xce, 0xcf, 0x4d, 0xd5, 0x2f, 0x48, 0x2c, 0xc9, 0xb0, 0xce, 0x2f, 0x4a, 0x87, 0xab,
        0x70, 0x29, 0x4a, 0x4c, 0x2b, 0x41, 0x28, 0xca, 0x2f, 0xc9, 0x48, 0x2d, 0x0a, 0x00, 0x2a,
        0x02, 0x00, 0xb2, 0x0c, 0x1a, 0xc9,
    ];

    /// The reference payload must inflate to the two `;` separated paths.
    #[test]
    fn test_deflate() {
        let expected = "com.example.MyInterface/some/path;org.example.DraftInterface/otherPath";

        let joined = extract_set_properties(&PROPERTIES_PAYLOAD)
            .unwrap()
            .join(";");

        assert_eq!(joined.as_bytes(), expected.as_bytes());
    }

    /// Server owned properties interface `org.Foo` with a boolean `/bar` mapping.
    const SERVER_PROP: &str = r#"{
 "interface_name": "org.Foo",
 "version_major": 1,
 "version_minor": 0,
 "type": "properties",
 "aggregation": "individual",
 "ownership": "server",
 "description": "Generic aggregated object data.",
 "mappings": [{
 "endpoint": "/bar",
 "type": "boolean"
 }]
}"#;
    /// Device owned properties interface `org.Bar` with an integer `/foo` mapping.
    const DEVICE_PROP: &str = r#"{
 "interface_name": "org.Bar",
 "version_major": 1,
 "version_minor": 0,
 "type": "properties",
 "aggregation": "individual",
 "ownership": "device",
 "description": "Generic aggregated object data.",
 "mappings": [{
 "endpoint": "/foo",
 "type": "integer"
 }]
}"#;

    /// Stores one server and one device property, then checks every [`PropAccess`] method of a
    /// client backed by `store`.
    async fn test_prop_access_for_store<S>(store: S)
    where
        S: StoreCapabilities,
    {
        // Builders for the values expected back from the client.
        let device_prop = || StoredProp::<&'static str> {
            interface: "org.Bar",
            path: "/foo",
            value: AstarteData::Integer(42),
            interface_major: 1,
            ownership: Ownership::Device,
        };
        let server_prop = || StoredProp::<&'static str> {
            interface: "org.Foo",
            path: "/bar",
            value: AstarteData::Boolean(true),
            interface_major: 1,
            ownership: Ownership::Server,
        };

        let server_value = AstarteData::Boolean(true);
        store
            .store_prop(StoredProp {
                interface: "org.Foo",
                path: "/bar",
                value: &server_value,
                interface_major: 1,
                ownership: Ownership::Server,
            })
            .await
            .unwrap();

        let device_value = AstarteData::Integer(42);
        store
            .store_prop(StoredProp {
                interface: "org.Bar",
                path: "/foo",
                value: &device_value,
                interface_major: 1,
                ownership: Ownership::Device,
            })
            .await
            .unwrap();

        let client =
            mock_client_with_store(&[SERVER_PROP, DEVICE_PROP], ConnStatus::Connected, store);

        // Single property access
        let value = client.property("org.Foo", "/bar").await.unwrap();
        assert_eq!(value, Some(AstarteData::Boolean(true)));

        let value = client.property("org.Bar", "/foo").await.unwrap();
        assert_eq!(value, Some(AstarteData::Integer(42)));

        // All the properties, sorted by interface to have a stable order
        let mut all = client.all_props().await.unwrap();
        all.sort_unstable_by(|lhs, rhs| lhs.interface.cmp(&rhs.interface));
        assert_eq!(all, [device_prop(), server_prop()]);

        // Device owned
        let device_expected = [device_prop()];

        let props = client.device_props().await.unwrap();
        assert_eq!(props, device_expected);

        let props = client.interface_props("org.Bar").await.unwrap();
        assert_eq!(props, device_expected);

        // Server owned
        let server_expected = [server_prop()];

        let props = client.server_props().await.unwrap();
        assert_eq!(props, server_expected);

        let props = client.interface_props("org.Foo").await.unwrap();
        assert_eq!(props, server_expected);
    }

    /// Property access backed by the in-memory store.
    #[tokio::test]
    async fn test_in_memory_property_access() {
        test_prop_access_for_store(MemoryStore::new()).await;
    }

    /// Property access backed by a SQLite store opened from a database file path.
    #[tokio::test]
    async fn test_in_sqlite_from_str() {
        // The directory guard must outlive the store.
        let dir = tempfile::tempdir().unwrap();
        let db_file = dir.path().join("prop-cache.db");

        let store = SqliteStore::options()
            .with_db_file(&db_file)
            .await
            .unwrap();

        test_prop_access_for_store(store).await;
    }

    /// Property access backed by a SQLite store created in a writable directory.
    #[tokio::test]
    async fn test_in_sqlite_property_access() {
        // The directory guard must outlive the store.
        let dir = tempfile::tempdir().unwrap();

        let store = SqliteStore::options()
            .with_writable_dir(dir.path())
            .await
            .unwrap();

        test_prop_access_for_store(store).await;
    }

    /// Encoding must match the stored snapshot and decode back to the original paths.
    #[test]
    fn should_encode_props() {
        let paths = [
            "com.example.MyInterface/some/path",
            "org.example.DraftInterface/otherPath",
        ]
        .map(String::from);

        let encoded = encode_set_properties(&paths).unwrap();

        let expected_snapshot: [u8; 71] = [
            0, 0, 0, 70, 120, 156, 69, 202, 33, 14, 192, 32, 12, 5, 208, 27, 193, 1, 102, 103, 38,
            150, 236, 10, 13, 249, 12, 65, 41, 41, 21, 112, 123, 80, 224, 95, 16, 118, 232, 196,
            53, 195, 189, 227, 41, 6, 141, 20, 224, 155, 48, 124, 37, 75, 151, 232, 191, 197, 173,
            20, 237, 32, 177, 4, 253, 22, 154, 178, 12, 26, 201,
        ];

        assert_eq!(
            encoded,
            expected_snapshot,
            "the two are different, decoded is {:?}",
            extract_set_properties(&encoded)
        );

        let decoded = extract_set_properties(&encoded).unwrap();
        assert_eq!(decoded, paths);
    }
}