1use std::collections::HashMap;
20use std::path::Path;
21use std::str::FromStr;
22
23use astarte_interfaces::interface::InterfaceTypeAggregation;
24use astarte_interfaces::{Interface, Schema};
25use tokio::fs;
26use tracing::{debug, error};
27
28use crate::Error;
29use crate::error::Report;
30use crate::introspection::{AddInterfaceError, DeviceIntrospection};
31use crate::prelude::DynamicIntrospection;
32use crate::retention::StoredRetention;
33use crate::retention::memory::VolatileStore;
34use crate::store::wrapper::StoreWrapper;
35use crate::store::{PropertyStore, StoreCapabilities};
36use crate::transport::{Connection, Register};
37
38use super::DeviceClient;
39
40impl<C> DeviceClient<C>
41where
42 C: Connection,
43{
44 async fn cleanup_interface(
50 volatile_store: &VolatileStore,
51 store: &StoreWrapper<C::Store>,
52 interface: &Interface,
53 ) {
54 match interface.inner() {
55 InterfaceTypeAggregation::DatastreamIndividual(interface) => {
56 Self::cleanup_retention(volatile_store, store, interface.name()).await
57 }
58 InterfaceTypeAggregation::DatastreamObject(interface) => {
59 Self::cleanup_retention(volatile_store, store, interface.name()).await
60 }
61 InterfaceTypeAggregation::Properties(properties) => {
62 let res = store.delete_interface(properties).await;
63
64 if let Err(err) = res {
65 error!(error = %Report::new(err),"failed to remove interfaces from properties");
66 }
67 }
68 }
69 }
70
71 async fn cleanup_retention(
73 volatile_store: &VolatileStore,
74 store: &StoreWrapper<C::Store>,
75 interface_name: &str,
76 ) {
77 volatile_store.delete_interface(interface_name).await;
78
79 if let Some(retention) = store.get_retention() {
80 let res = retention.delete_interface(interface_name).await;
81
82 if let Err(err) = res {
83 error!(error = %Report::new(err),"failed to remove interfaces from retention");
84 }
85 }
86 }
87}
88
89impl<C> DeviceIntrospection for DeviceClient<C>
90where
91 C: Connection,
92{
93 async fn get_interface<F, O>(&self, interface_name: &str, mut f: F) -> O
94 where
95 F: FnMut(Option<&Interface>) -> O + Send,
96 {
97 let interfaces = self.state.interfaces().read().await;
98
99 f(interfaces.get(interface_name))
100 }
101}
102
103impl<C> DynamicIntrospection for DeviceClient<C>
104where
105 C: Connection,
106 C::Sender: Register,
107{
108 async fn add_interface(&mut self, interface: Interface) -> Result<bool, Error> {
109 let mut interfaces = self.state.interfaces().write().await;
111
112 let map_err = interfaces
113 .validate(interface)
114 .map_err(AddInterfaceError::Interface)?;
115
116 let Some(to_add) = map_err else {
117 debug!("interfaces already present");
118
119 return Ok(false);
120 };
121
122 self.sender.add_interface(&interfaces, &to_add).await?;
123
124 if to_add.is_major_change() {
125 Self::cleanup_interface(self.state.volatile_store(), &self.store, &to_add).await;
126 }
127
128 debug!("adding interface to introspection");
129
130 interfaces.add(to_add);
131
132 Ok(true)
133 }
134
135 async fn extend_interfaces<I>(&mut self, iter: I) -> Result<Vec<String>, Error>
136 where
137 I: IntoIterator<Item = Interface> + Send,
138 {
139 let mut interfaces = self.state.interfaces().write().await;
141
142 let to_add = interfaces
143 .validate_many(iter)
144 .map_err(AddInterfaceError::Interface)?;
145
146 if to_add.is_empty() {
147 debug!("All interfaces already present");
148 return Ok(Vec::new());
149 }
150
151 debug!("Adding {} interfaces", to_add.len());
152
153 self.sender.extend_interfaces(&interfaces, &to_add).await?;
154
155 let major_changes = to_add
156 .values()
157 .filter(|interface| interface.is_major_change());
158
159 for interface in major_changes {
160 Self::cleanup_interface(self.state.volatile_store(), &self.store, interface).await;
161 }
162
163 let names = to_add.keys().cloned().collect();
164
165 debug!("adding interfaces to introspection");
166
167 interfaces.extend(to_add);
168
169 debug!("Interfaces added");
170
171 Ok(names)
172 }
173
174 async fn add_interface_from_file<P>(&mut self, file_path: P) -> Result<bool, Error>
175 where
176 P: AsRef<Path> + Send + Sync,
177 {
178 let interface =
179 fs::read_to_string(&file_path)
180 .await
181 .map_err(|err| AddInterfaceError::Io {
182 path: file_path.as_ref().to_owned(),
183 backtrace: err,
184 })?;
185
186 let interface =
187 Interface::from_str(&interface).map_err(|err| AddInterfaceError::InterfaceFile {
188 path: file_path.as_ref().to_owned(),
189 backtrace: err,
190 })?;
191
192 self.add_interface(interface).await
193 }
194
195 async fn add_interface_from_str(&mut self, json_str: &str) -> Result<bool, Error> {
196 let interface = Interface::from_str(json_str).map_err(AddInterfaceError::Interface)?;
197
198 self.add_interface(interface).await
199 }
200
201 async fn remove_interface(&mut self, interface_name: &str) -> Result<bool, Error> {
202 let mut interfaces = self.state.interfaces().write().await;
204
205 let Some(to_remove) = interfaces.get(interface_name) else {
206 debug!("{interface_name} not found, skipping");
207 return Ok(false);
208 };
209
210 self.sender.remove_interface(&interfaces, to_remove).await?;
211
212 Self::cleanup_interface(self.state.volatile_store(), &self.store, to_remove).await;
213
214 debug!("removing interface from introspection");
215
216 interfaces.remove(interface_name);
217
218 Ok(true)
219 }
220
221 async fn remove_interfaces<I>(&mut self, interfaces_name: I) -> Result<Vec<String>, Error>
222 where
223 I: IntoIterator<Item = String> + Send,
224 I::IntoIter: Send,
225 {
226 let mut interfaces = self.state.interfaces().write().await;
228
229 let to_remove: HashMap<&str, &Interface> = interfaces_name
230 .into_iter()
231 .filter_map(|iface_name| {
232 let interface = interfaces.get(&iface_name).map(|i| (i.interface_name(), i));
233
234 if interface.is_none() {
235 debug!("{iface_name} not found, skipping");
236 }
237
238 interface
239 })
240 .collect();
241
242 if to_remove.is_empty() {
243 return Ok(Vec::new());
244 }
245
246 self.sender
247 .remove_interfaces(&interfaces, &to_remove)
248 .await?;
249
250 for interface in to_remove.values() {
251 Self::cleanup_interface(self.state.volatile_store(), &self.store, interface).await;
252 }
253
254 let removed_names: Vec<String> = to_remove.keys().map(|k| k.to_string()).collect();
255
256 debug!("removing interfaces from introspection");
257
258 interfaces.remove_many(&removed_names);
259
260 Ok(removed_names)
261 }
262}
263
#[cfg(test)]
mod tests {
    use astarte_interfaces::interface::Retention;
    use astarte_interfaces::schema::Reliability;
    use astarte_interfaces::{MappingPath, Properties};
    use chrono::Utc;
    use mockall::{Sequence, predicate};
    use pretty_assertions::assert_eq;
    use tempfile::TempDir;

    use super::*;

    use crate::AstarteData;
    use crate::client::tests::{mock_client, mock_client_with_store};
    use crate::interfaces::MappingRef;
    use crate::interfaces::tests::{mock_validated_collection, mock_validated_interface};
    use crate::retention::StoredRetentionExt;
    use crate::state::ConnStatus;
    use crate::store::{PropertyMapping, SqliteStore};
    use crate::test::{
        E2E_DEVICE_AGGREGATE, E2E_DEVICE_AGGREGATE_NAME, E2E_DEVICE_PROPERTY,
        E2E_DEVICE_PROPERTY_NAME, for_update,
    };
    use crate::validate::ValidatedIndividual;

    /// Builds a sample for major version 0 of the updatable datastream test interface.
    fn datastream_sample(path: &str, retention: Retention) -> ValidatedIndividual {
        ValidatedIndividual {
            interface: for_update::E2E_DEVICE_DATASTREAM_NAME.to_string(),
            path: path.to_string(),
            version_major: 0,
            reliability: Reliability::Guaranteed,
            retention,
            data: AstarteData::try_from(42.0).unwrap(),
            timestamp: Some(Utc::now()),
        }
    }

    /// Asserts that the client returns `expected` when queried with its interface name.
    async fn assert_registered<C>(client: &DeviceClient<C>, expected: &Interface)
    where
        C: Connection,
    {
        client
            .get_interface(expected.interface_name(), |found| {
                assert_eq!(found, Some(expected));
            })
            .await;
    }

    /// Asserts that the client has no interface named `interface_name`.
    async fn assert_missing<C>(client: &DeviceClient<C>, interface_name: &str)
    where
        C: Connection,
    {
        client
            .get_interface(interface_name, |found| {
                assert_eq!(found, None);
            })
            .await;
    }

    /// A present interface is returned, an absent one yields `None`.
    #[tokio::test]
    async fn get_interface() {
        let expected = Interface::from_str(E2E_DEVICE_AGGREGATE).unwrap();
        let client = mock_client(&[E2E_DEVICE_AGGREGATE], ConnStatus::Connected);

        assert_registered(&client, &expected).await;
        assert_missing(&client, E2E_DEVICE_PROPERTY_NAME).await;
    }

    /// Adding a new interface notifies the sender once; adding it again is a no-op.
    #[tokio::test]
    async fn add_interface_missing() {
        let expected = Interface::from_str(E2E_DEVICE_AGGREGATE).unwrap();
        let mut client = mock_client(&[], ConnStatus::Connected);

        let mut order = Sequence::new();
        client
            .sender
            .expect_add_interface()
            .once()
            .in_sequence(&mut order)
            .with(
                predicate::always(),
                predicate::eq(mock_validated_interface(expected.clone(), false)),
            )
            .returning(|_, _| Ok(()));

        assert!(client.add_interface(expected.clone()).await.unwrap());

        assert_registered(&client, &expected).await;

        assert!(!client.add_interface(expected).await.unwrap());
    }

    /// Same as `add_interface_missing`, but the interface is provided as a JSON string.
    #[tokio::test]
    async fn add_interface_missing_from_str() {
        let expected = Interface::from_str(E2E_DEVICE_AGGREGATE).unwrap();
        let mut client = mock_client(&[], ConnStatus::Connected);

        let mut order = Sequence::new();
        client
            .sender
            .expect_add_interface()
            .once()
            .in_sequence(&mut order)
            .with(
                predicate::always(),
                predicate::eq(mock_validated_interface(expected.clone(), false)),
            )
            .returning(|_, _| Ok(()));

        let was_added = client
            .add_interface_from_str(E2E_DEVICE_AGGREGATE)
            .await
            .unwrap();
        assert!(was_added);

        assert_registered(&client, &expected).await;

        assert!(!client.add_interface(expected).await.unwrap());
    }

    /// Same as `add_interface_missing`, but the interface is read from a file.
    #[tokio::test]
    async fn add_interface_missing_from_file() {
        let expected = Interface::from_str(E2E_DEVICE_AGGREGATE).unwrap();
        let mut client = mock_client(&[], ConnStatus::Connected);

        let mut order = Sequence::new();
        client
            .sender
            .expect_add_interface()
            .once()
            .in_sequence(&mut order)
            .with(
                predicate::always(),
                predicate::eq(mock_validated_interface(expected.clone(), false)),
            )
            .returning(|_, _| Ok(()));

        let dir = TempDir::new().unwrap();
        let file = dir.path().join("interface");
        std::fs::write(&file, E2E_DEVICE_AGGREGATE).unwrap();

        assert!(client.add_interface_from_file(&file).await.unwrap());

        assert_registered(&client, &expected).await;

        assert!(!client.add_interface(expected).await.unwrap());
    }

    /// A major update through `add_interface` empties the volatile store.
    #[tokio::test]
    async fn add_interface_major_with_retention_volatile() {
        let updated = Interface::from_str(for_update::E2E_DEVICE_DATASTREAM_1_0).unwrap();

        let mut client = mock_client(
            &[for_update::E2E_DEVICE_DATASTREAM_0_1],
            ConnStatus::Connected,
        );

        let id = client.state.retention_ctx().next();
        let sample = datastream_sample("/sensor_1/volatile", Retention::Volatile { expiry: None });
        client.state.volatile_store().push_sent(id, sample).await;

        let mut order = Sequence::new();
        client
            .sender
            .expect_add_interface()
            .once()
            .in_sequence(&mut order)
            .with(
                predicate::always(),
                predicate::eq(mock_validated_interface(updated.clone(), true)),
            )
            .returning(|_, _| Ok(()));

        assert!(client.add_interface(updated.clone()).await.unwrap());

        assert_registered(&client, &updated).await;

        assert!(client.state.volatile_store().pop_next().await.is_none());
    }

    /// A major update through `add_interface` empties the stored retention.
    #[tokio::test]
    async fn add_interface_major_with_retention_stored() {
        let dir = TempDir::new().unwrap();
        let store = SqliteStore::options()
            .with_writable_dir(dir.path())
            .await
            .unwrap();

        let mut client = mock_client_with_store(
            &[for_update::E2E_DEVICE_DATASTREAM_0_1],
            ConnStatus::Connected,
            store,
        );

        let updated = Interface::from_str(for_update::E2E_DEVICE_DATASTREAM_1_0).unwrap();

        let id = client.state.retention_ctx().next();
        let sample = datastream_sample("/sensor_1/stored", Retention::Stored { expiry: None });
        client
            .store
            .get_retention()
            .unwrap()
            .store_publish_individual(&id, &sample, &[1, 2, 3, 4], true)
            .await
            .unwrap();

        let mut order = Sequence::new();
        client
            .sender
            .expect_add_interface()
            .once()
            .in_sequence(&mut order)
            .with(
                predicate::always(),
                predicate::eq(mock_validated_interface(updated.clone(), true)),
            )
            .returning(|_, _| Ok(()));

        assert!(client.add_interface(updated.clone()).await.unwrap());

        assert_registered(&client, &updated).await;

        let remaining = client
            .store
            .get_retention()
            .unwrap()
            .fetch_all_interfaces()
            .await
            .unwrap();
        assert!(remaining.is_empty());
    }

    /// A major update through `extend_interfaces` empties the volatile store.
    #[tokio::test]
    async fn extend_interfaces_major_with_retention_volatile() {
        let updated = Interface::from_str(for_update::E2E_DEVICE_DATASTREAM_1_0).unwrap();

        let mut client = mock_client(
            &[for_update::E2E_DEVICE_DATASTREAM_0_1],
            ConnStatus::Connected,
        );

        let id = client.state.retention_ctx().next();
        let sample = datastream_sample("/sensor_1/volatile", Retention::Volatile { expiry: None });
        client.state.volatile_store().push_sent(id, sample).await;

        let mut order = Sequence::new();
        client
            .sender
            .expect_extend_interfaces()
            .once()
            .in_sequence(&mut order)
            .with(
                predicate::always(),
                predicate::eq(mock_validated_collection(&[mock_validated_interface(
                    updated.clone(),
                    true,
                )])),
            )
            .returning(|_, _| Ok(()));

        let names = client.extend_interfaces([updated.clone()]).await.unwrap();
        assert_eq!(names, vec![updated.interface_name()]);

        assert_registered(&client, &updated).await;

        assert!(client.state.volatile_store().pop_next().await.is_none());
    }

    /// A major update through `extend_interfaces` empties the stored retention.
    #[tokio::test]
    async fn extend_interfaces_major_with_retention_stored() {
        let dir = TempDir::new().unwrap();
        let store = SqliteStore::options()
            .with_writable_dir(dir.path())
            .await
            .unwrap();

        let mut client = mock_client_with_store(
            &[for_update::E2E_DEVICE_DATASTREAM_0_1],
            ConnStatus::Connected,
            store,
        );

        let updated = Interface::from_str(for_update::E2E_DEVICE_DATASTREAM_1_0).unwrap();

        let id = client.state.retention_ctx().next();
        let sample = datastream_sample("/sensor_1/stored", Retention::Stored { expiry: None });
        client
            .store
            .get_retention()
            .unwrap()
            .store_publish_individual(&id, &sample, &[1, 2, 3, 4], true)
            .await
            .unwrap();

        let mut order = Sequence::new();
        client
            .sender
            .expect_extend_interfaces()
            .once()
            .in_sequence(&mut order)
            .with(
                predicate::always(),
                predicate::eq(mock_validated_collection(&[mock_validated_interface(
                    updated.clone(),
                    true,
                )])),
            )
            .returning(|_, _| Ok(()));

        let names = client.extend_interfaces([updated.clone()]).await.unwrap();
        assert_eq!(names, [updated.interface_name()]);

        assert_registered(&client, &updated).await;

        let remaining = client
            .store
            .get_retention()
            .unwrap()
            .fetch_all_interfaces()
            .await
            .unwrap();
        assert!(remaining.is_empty());
    }

    /// Extending with an interface that is already present adds nothing and sets no sender
    /// expectation.
    #[tokio::test]
    async fn extend_interfaces_nothing_to_add() {
        let mut client = mock_client(
            &[for_update::E2E_DEVICE_DATASTREAM_1_0],
            ConnStatus::Connected,
        );

        let present = Interface::from_str(for_update::E2E_DEVICE_DATASTREAM_1_0).unwrap();

        let names = client.extend_interfaces([present.clone()]).await.unwrap();
        assert!(names.is_empty());

        assert_registered(&client, &present).await;
    }

    /// Removing a present interface notifies the sender and drops it from the introspection.
    #[tokio::test]
    async fn remove_interface_present() {
        let mut client = mock_client(&[E2E_DEVICE_AGGREGATE], ConnStatus::Connected);

        let target = Interface::from_str(E2E_DEVICE_AGGREGATE).unwrap();

        let mut order = Sequence::new();
        client
            .sender
            .expect_remove_interface()
            .once()
            .in_sequence(&mut order)
            .with(predicate::always(), predicate::eq(target.clone()))
            .returning(|_, _| Ok(()));

        let was_removed = client
            .remove_interface(E2E_DEVICE_AGGREGATE_NAME)
            .await
            .unwrap();
        assert!(was_removed);

        assert_missing(&client, E2E_DEVICE_AGGREGATE_NAME).await;
    }

    /// Removing a properties interface also deletes the property stored for it.
    #[tokio::test]
    async fn remove_interface_property() {
        let mut client = mock_client(&[E2E_DEVICE_PROPERTY], ConnStatus::Connected);

        let target = Interface::from_str(E2E_DEVICE_PROPERTY).unwrap();

        let path = "/sensor_1/double_endpoint";
        let value = AstarteData::LongInteger(2);

        client
            .store
            .store_prop(crate::store::StoredProp {
                interface: E2E_DEVICE_PROPERTY_NAME,
                path,
                value: &value,
                interface_major: target.version_major(),
                ownership: target.ownership(),
            })
            .await
            .unwrap();

        let mut order = Sequence::new();
        client
            .sender
            .expect_remove_interface()
            .once()
            .in_sequence(&mut order)
            .with(predicate::always(), predicate::eq(target.clone()))
            .returning(|_, _| Ok(()));

        let was_removed = client
            .remove_interface(E2E_DEVICE_PROPERTY_NAME)
            .await
            .unwrap();
        assert!(was_removed);

        assert_missing(&client, E2E_DEVICE_PROPERTY_NAME).await;

        let properties = Properties::from_str(E2E_DEVICE_PROPERTY).unwrap();
        let mapping_path = MappingPath::try_from(path).unwrap();
        let mapping = MappingRef::new(&properties, &mapping_path).unwrap();

        let stored = client
            .store
            .load_prop(&PropertyMapping::from(&mapping))
            .await
            .unwrap();
        assert_eq!(stored, None);
    }

    /// Removing an unknown interface returns `false` and sets no sender expectation.
    #[tokio::test]
    async fn remove_interface_not_found() {
        let mut client = mock_client(&[], ConnStatus::Connected);

        let was_removed = client
            .remove_interface(E2E_DEVICE_AGGREGATE_NAME)
            .await
            .unwrap();
        assert!(!was_removed);

        assert_missing(&client, E2E_DEVICE_AGGREGATE_NAME).await;
    }

    /// Bulk removal of a present interface notifies the sender with exactly that interface.
    #[tokio::test]
    async fn remove_interface_many_present() {
        let mut client = mock_client(&[E2E_DEVICE_AGGREGATE], ConnStatus::Connected);

        let mut order = Sequence::new();
        client
            .sender
            .expect_remove_interfaces()
            .once()
            .in_sequence(&mut order)
            .withf(|_, removed| removed.len() == 1 && removed.contains_key(E2E_DEVICE_AGGREGATE_NAME))
            .returning(|_, _| Ok(()));

        let names = client
            .remove_interfaces([E2E_DEVICE_AGGREGATE_NAME.to_string()])
            .await
            .unwrap();
        assert_eq!(names, [E2E_DEVICE_AGGREGATE_NAME]);

        assert_missing(&client, E2E_DEVICE_AGGREGATE_NAME).await;
    }

    /// Bulk removal of unknown interfaces returns an empty list and sets no sender expectation.
    #[tokio::test]
    async fn remove_interface_many_missing() {
        let mut client = mock_client(&[], ConnStatus::Connected);

        let names = client
            .remove_interfaces([E2E_DEVICE_AGGREGATE_NAME.to_string()])
            .await
            .unwrap();
        assert!(names.is_empty());

        assert_missing(&client, E2E_DEVICE_AGGREGATE_NAME).await;
    }
}