Skip to main content

astarte_device_sdk/client/
introspection.rs

1// This file is part of Astarte.
2//
3// Copyright 2025, 2026 SECO Mind Srl
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//    http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// SPDX-License-Identifier: Apache-2.0
18
19use std::collections::HashMap;
20use std::path::Path;
21use std::str::FromStr;
22
23use astarte_interfaces::interface::InterfaceTypeAggregation;
24use astarte_interfaces::{Interface, Schema};
25use tokio::fs;
26use tracing::{debug, error};
27
28use crate::Error;
29use crate::error::Report;
30use crate::introspection::{AddInterfaceError, DeviceIntrospection};
31use crate::prelude::DynamicIntrospection;
32use crate::retention::StoredRetention;
33use crate::retention::memory::VolatileStore;
34use crate::store::wrapper::StoreWrapper;
35use crate::store::{PropertyStore, StoreCapabilities};
36use crate::transport::{Connection, Register};
37
38use super::DeviceClient;
39
40impl<C> DeviceClient<C>
41where
42    C: Connection,
43{
44    // Cleans up an interface, it will remove the properties and retention values.
45    //
46    // For the datastream, we would have to check all the mappings for each retention type and then
47    // delete them from the stores (volatile and non). Instead we remove all the values with the
48    // given interface from each store.
49    async fn cleanup_interface(
50        volatile_store: &VolatileStore,
51        store: &StoreWrapper<C::Store>,
52        interface: &Interface,
53    ) {
54        match interface.inner() {
55            InterfaceTypeAggregation::DatastreamIndividual(interface) => {
56                Self::cleanup_retention(volatile_store, store, interface.name()).await
57            }
58            InterfaceTypeAggregation::DatastreamObject(interface) => {
59                Self::cleanup_retention(volatile_store, store, interface.name()).await
60            }
61            InterfaceTypeAggregation::Properties(properties) => {
62                let res = store.delete_interface(properties).await;
63
64                if let Err(err) = res {
65                    error!(error = %Report::new(err),"failed to remove interfaces from properties");
66                }
67            }
68        }
69    }
70
71    // Cleans up the volatile and store retention.
72    async fn cleanup_retention(
73        volatile_store: &VolatileStore,
74        store: &StoreWrapper<C::Store>,
75        interface_name: &str,
76    ) {
77        volatile_store.delete_interface(interface_name).await;
78
79        if let Some(retention) = store.get_retention() {
80            let res = retention.delete_interface(interface_name).await;
81
82            if let Err(err) = res {
83                error!(error = %Report::new(err),"failed to remove interfaces from retention");
84            }
85        }
86    }
87}
88
89impl<C> DeviceIntrospection for DeviceClient<C>
90where
91    C: Connection,
92{
93    async fn get_interface<F, O>(&self, interface_name: &str, mut f: F) -> O
94    where
95        F: FnMut(Option<&Interface>) -> O + Send,
96    {
97        let interfaces = self.state.interfaces().read().await;
98
99        f(interfaces.get(interface_name))
100    }
101}
102
103impl<C> DynamicIntrospection for DeviceClient<C>
104where
105    C: Connection,
106    C::Sender: Register,
107{
108    async fn add_interface(&mut self, interface: Interface) -> Result<bool, Error> {
109        // Lock for writing for the whole scope, even the checks
110        let mut interfaces = self.state.interfaces().write().await;
111
112        let map_err = interfaces
113            .validate(interface)
114            .map_err(AddInterfaceError::Interface)?;
115
116        let Some(to_add) = map_err else {
117            debug!("interfaces already present");
118
119            return Ok(false);
120        };
121
122        self.sender.add_interface(&interfaces, &to_add).await?;
123
124        if to_add.is_major_change() {
125            Self::cleanup_interface(self.state.volatile_store(), &self.store, &to_add).await;
126        }
127
128        debug!("adding interface to introspection");
129
130        interfaces.add(to_add);
131
132        Ok(true)
133    }
134
135    async fn extend_interfaces<I>(&mut self, iter: I) -> Result<Vec<String>, Error>
136    where
137        I: IntoIterator<Item = Interface> + Send,
138    {
139        // Lock for writing for the whole scope, even the checks
140        let mut interfaces = self.state.interfaces().write().await;
141
142        let to_add = interfaces
143            .validate_many(iter)
144            .map_err(AddInterfaceError::Interface)?;
145
146        if to_add.is_empty() {
147            debug!("All interfaces already present");
148            return Ok(Vec::new());
149        }
150
151        debug!("Adding {} interfaces", to_add.len());
152
153        self.sender.extend_interfaces(&interfaces, &to_add).await?;
154
155        let major_changes = to_add
156            .values()
157            .filter(|interface| interface.is_major_change());
158
159        for interface in major_changes {
160            Self::cleanup_interface(self.state.volatile_store(), &self.store, interface).await;
161        }
162
163        let names = to_add.keys().cloned().collect();
164
165        debug!("adding interfaces to introspection");
166
167        interfaces.extend(to_add);
168
169        debug!("Interfaces added");
170
171        Ok(names)
172    }
173
174    async fn add_interface_from_file<P>(&mut self, file_path: P) -> Result<bool, Error>
175    where
176        P: AsRef<Path> + Send + Sync,
177    {
178        let interface =
179            fs::read_to_string(&file_path)
180                .await
181                .map_err(|err| AddInterfaceError::Io {
182                    path: file_path.as_ref().to_owned(),
183                    backtrace: err,
184                })?;
185
186        let interface =
187            Interface::from_str(&interface).map_err(|err| AddInterfaceError::InterfaceFile {
188                path: file_path.as_ref().to_owned(),
189                backtrace: err,
190            })?;
191
192        self.add_interface(interface).await
193    }
194
195    async fn add_interface_from_str(&mut self, json_str: &str) -> Result<bool, Error> {
196        let interface = Interface::from_str(json_str).map_err(AddInterfaceError::Interface)?;
197
198        self.add_interface(interface).await
199    }
200
201    async fn remove_interface(&mut self, interface_name: &str) -> Result<bool, Error> {
202        // Lock for writing for the whole scope, even the checks
203        let mut interfaces = self.state.interfaces().write().await;
204
205        let Some(to_remove) = interfaces.get(interface_name) else {
206            debug!("{interface_name} not found, skipping");
207            return Ok(false);
208        };
209
210        self.sender.remove_interface(&interfaces, to_remove).await?;
211
212        Self::cleanup_interface(self.state.volatile_store(), &self.store, to_remove).await;
213
214        debug!("removing interface from introspection");
215
216        interfaces.remove(interface_name);
217
218        Ok(true)
219    }
220
221    async fn remove_interfaces<I>(&mut self, interfaces_name: I) -> Result<Vec<String>, Error>
222    where
223        I: IntoIterator<Item = String> + Send,
224        I::IntoIter: Send,
225    {
226        // Lock for writing for the whole scope, even the checks
227        let mut interfaces = self.state.interfaces().write().await;
228
229        let to_remove: HashMap<&str, &Interface> = interfaces_name
230            .into_iter()
231            .filter_map(|iface_name| {
232                let interface = interfaces.get(&iface_name).map(|i| (i.interface_name(), i));
233
234                if interface.is_none() {
235                    debug!("{iface_name} not found, skipping");
236                }
237
238                interface
239            })
240            .collect();
241
242        if to_remove.is_empty() {
243            return Ok(Vec::new());
244        }
245
246        self.sender
247            .remove_interfaces(&interfaces, &to_remove)
248            .await?;
249
250        for interface in to_remove.values() {
251            Self::cleanup_interface(self.state.volatile_store(), &self.store, interface).await;
252        }
253
254        let removed_names: Vec<String> = to_remove.keys().map(|k| k.to_string()).collect();
255
256        debug!("removing interfaces from introspection");
257
258        interfaces.remove_many(&removed_names);
259
260        Ok(removed_names)
261    }
262}
263
#[cfg(test)]
mod tests {
    use astarte_interfaces::interface::Retention;
    use astarte_interfaces::schema::Reliability;
    use astarte_interfaces::{MappingPath, Properties};
    use chrono::Utc;
    use mockall::{Sequence, predicate};
    use pretty_assertions::assert_eq;
    use tempfile::TempDir;

    use super::*;

    use crate::AstarteData;
    use crate::client::tests::{mock_client, mock_client_with_store};
    use crate::interfaces::MappingRef;
    use crate::interfaces::tests::{mock_validated_collection, mock_validated_interface};
    use crate::retention::StoredRetentionExt;
    use crate::state::ConnStatus;
    use crate::store::{PropertyMapping, SqliteStore};
    use crate::test::{
        E2E_DEVICE_AGGREGATE, E2E_DEVICE_AGGREGATE_NAME, E2E_DEVICE_PROPERTY,
        E2E_DEVICE_PROPERTY_NAME, for_update,
    };
    use crate::validate::ValidatedIndividual;

    /// Builds a guaranteed-reliability sample for major version `0` of the updatable datastream
    /// interface, published on `path` with the given `retention`.
    fn datastream_sample(path: &str, retention: Retention) -> ValidatedIndividual {
        ValidatedIndividual {
            interface: for_update::E2E_DEVICE_DATASTREAM_NAME.to_string(),
            path: path.to_string(),
            version_major: 0,
            reliability: Reliability::Guaranteed,
            retention,
            data: AstarteData::try_from(42.0).unwrap(),
            timestamp: Some(Utc::now()),
        }
    }

    /// A known interface is passed to the closure, an unknown one yields `None`.
    #[tokio::test]
    async fn get_interface() {
        let expected = Interface::from_str(E2E_DEVICE_AGGREGATE).unwrap();

        let client = mock_client(&[E2E_DEVICE_AGGREGATE], ConnStatus::Connected);

        client
            .get_interface(expected.interface_name(), |found| {
                assert_eq!(found, Some(&expected));
            })
            .await;

        client
            .get_interface(E2E_DEVICE_PROPERTY_NAME, |found| {
                assert_eq!(found, None);
            })
            .await;
    }

    /// Adding a missing interface notifies the sender once; adding it again is a no-op.
    #[tokio::test]
    async fn add_interface_missing() {
        let interface = Interface::from_str(E2E_DEVICE_AGGREGATE).unwrap();
        let expected = mock_validated_interface(interface.clone(), false);

        let mut client = mock_client(&[], ConnStatus::Connected);

        let mut seq = Sequence::new();
        client
            .sender
            .expect_add_interface()
            .once()
            .in_sequence(&mut seq)
            .with(predicate::always(), predicate::eq(expected))
            .returning(|_, _| Ok(()));

        let first = client.add_interface(interface.clone()).await.unwrap();
        assert!(first);

        client
            .get_interface(interface.interface_name(), |found| {
                assert_eq!(found, Some(&interface));
            })
            .await;

        let second = client.add_interface(interface).await.unwrap();
        assert!(!second);
    }

    /// Same as `add_interface_missing`, but the interface is supplied as a JSON string.
    #[tokio::test]
    async fn add_interface_missing_from_str() {
        let interface = Interface::from_str(E2E_DEVICE_AGGREGATE).unwrap();
        let expected = mock_validated_interface(interface.clone(), false);

        let mut client = mock_client(&[], ConnStatus::Connected);

        let mut seq = Sequence::new();
        client
            .sender
            .expect_add_interface()
            .once()
            .in_sequence(&mut seq)
            .with(predicate::always(), predicate::eq(expected))
            .returning(|_, _| Ok(()));

        let first = client
            .add_interface_from_str(E2E_DEVICE_AGGREGATE)
            .await
            .unwrap();
        assert!(first);

        client
            .get_interface(interface.interface_name(), |found| {
                assert_eq!(found, Some(&interface));
            })
            .await;

        let second = client.add_interface(interface).await.unwrap();
        assert!(!second);
    }

    /// Same as `add_interface_missing`, but the interface is read from a file on disk.
    #[tokio::test]
    async fn add_interface_missing_from_file() {
        let interface = Interface::from_str(E2E_DEVICE_AGGREGATE).unwrap();
        let expected = mock_validated_interface(interface.clone(), false);

        let mut client = mock_client(&[], ConnStatus::Connected);

        let mut seq = Sequence::new();
        client
            .sender
            .expect_add_interface()
            .once()
            .in_sequence(&mut seq)
            .with(predicate::always(), predicate::eq(expected))
            .returning(|_, _| Ok(()));

        let dir = TempDir::new().unwrap();

        let interface_file = dir.path().join("interface");
        std::fs::write(&interface_file, E2E_DEVICE_AGGREGATE).unwrap();

        let first = client
            .add_interface_from_file(&interface_file)
            .await
            .unwrap();
        assert!(first);

        client
            .get_interface(interface.interface_name(), |found| {
                assert_eq!(found, Some(&interface));
            })
            .await;

        let second = client.add_interface(interface).await.unwrap();
        assert!(!second);
    }

    /// A major update through `add_interface` empties the volatile retention of the interface.
    #[tokio::test]
    async fn add_interface_major_with_retention_volatile() {
        let updated = Interface::from_str(for_update::E2E_DEVICE_DATASTREAM_1_0).unwrap();

        let mut client = mock_client(
            &[for_update::E2E_DEVICE_DATASTREAM_0_1],
            ConnStatus::Connected,
        );

        client
            .state
            .volatile_store()
            .push_sent(
                client.state.retention_ctx().next(),
                datastream_sample("/sensor_1/volatile", Retention::Volatile { expiry: None }),
            )
            .await;

        let expected = mock_validated_interface(updated.clone(), true);

        let mut seq = Sequence::new();
        client
            .sender
            .expect_add_interface()
            .once()
            .in_sequence(&mut seq)
            .with(predicate::always(), predicate::eq(expected))
            .returning(|_, _| Ok(()));

        let added = client.add_interface(updated.clone()).await.unwrap();
        assert!(added);

        client
            .get_interface(updated.interface_name(), |found| {
                assert_eq!(found, Some(&updated));
            })
            .await;

        let leftover = client.state.volatile_store().pop_next().await;
        assert!(leftover.is_none());
    }

    /// A major update through `add_interface` empties the stored retention of the interface.
    #[tokio::test]
    async fn add_interface_major_with_retention_stored() {
        let dir = TempDir::new().unwrap();
        let store = SqliteStore::options()
            .with_writable_dir(dir.path())
            .await
            .unwrap();

        let mut client = mock_client_with_store(
            &[for_update::E2E_DEVICE_DATASTREAM_0_1],
            ConnStatus::Connected,
            store,
        );

        let updated = Interface::from_str(for_update::E2E_DEVICE_DATASTREAM_1_0).unwrap();

        let id = client.state.retention_ctx().next();
        let sample = datastream_sample("/sensor_1/stored", Retention::Stored { expiry: None });
        client
            .store
            .get_retention()
            .unwrap()
            .store_publish_individual(&id, &sample, &[1, 2, 3, 4], true)
            .await
            .unwrap();

        let expected = mock_validated_interface(updated.clone(), true);

        let mut seq = Sequence::new();
        client
            .sender
            .expect_add_interface()
            .once()
            .in_sequence(&mut seq)
            .with(predicate::always(), predicate::eq(expected))
            .returning(|_, _| Ok(()));

        let added = client.add_interface(updated.clone()).await.unwrap();
        assert!(added);

        client
            .get_interface(updated.interface_name(), |found| {
                assert_eq!(found, Some(&updated));
            })
            .await;

        let remaining = client
            .store
            .get_retention()
            .unwrap()
            .fetch_all_interfaces()
            .await
            .unwrap();
        assert!(remaining.is_empty());
    }

    /// A major update through `extend_interfaces` empties the volatile retention.
    #[tokio::test]
    async fn extend_interfaces_major_with_retention_volatile() {
        let updated = Interface::from_str(for_update::E2E_DEVICE_DATASTREAM_1_0).unwrap();

        let mut client = mock_client(
            &[for_update::E2E_DEVICE_DATASTREAM_0_1],
            ConnStatus::Connected,
        );

        client
            .state
            .volatile_store()
            .push_sent(
                client.state.retention_ctx().next(),
                datastream_sample("/sensor_1/volatile", Retention::Volatile { expiry: None }),
            )
            .await;

        let expected =
            mock_validated_collection(&[mock_validated_interface(updated.clone(), true)]);

        let mut seq = Sequence::new();
        client
            .sender
            .expect_extend_interfaces()
            .once()
            .in_sequence(&mut seq)
            .with(predicate::always(), predicate::eq(expected))
            .returning(|_, _| Ok(()));

        let added = client.extend_interfaces([updated.clone()]).await.unwrap();
        assert_eq!(added, vec![updated.interface_name()]);

        client
            .get_interface(updated.interface_name(), |found| {
                assert_eq!(found, Some(&updated));
            })
            .await;

        let leftover = client.state.volatile_store().pop_next().await;
        assert!(leftover.is_none());
    }

    /// A major update through `extend_interfaces` empties the stored retention.
    #[tokio::test]
    async fn extend_interfaces_major_with_retention_stored() {
        let dir = TempDir::new().unwrap();
        let store = SqliteStore::options()
            .with_writable_dir(dir.path())
            .await
            .unwrap();

        let mut client = mock_client_with_store(
            &[for_update::E2E_DEVICE_DATASTREAM_0_1],
            ConnStatus::Connected,
            store,
        );

        let updated = Interface::from_str(for_update::E2E_DEVICE_DATASTREAM_1_0).unwrap();

        let id = client.state.retention_ctx().next();
        let sample = datastream_sample("/sensor_1/stored", Retention::Stored { expiry: None });
        client
            .store
            .get_retention()
            .unwrap()
            .store_publish_individual(&id, &sample, &[1, 2, 3, 4], true)
            .await
            .unwrap();

        let expected =
            mock_validated_collection(&[mock_validated_interface(updated.clone(), true)]);

        let mut seq = Sequence::new();
        client
            .sender
            .expect_extend_interfaces()
            .once()
            .in_sequence(&mut seq)
            .with(predicate::always(), predicate::eq(expected))
            .returning(|_, _| Ok(()));

        let added = client.extend_interfaces([updated.clone()]).await.unwrap();
        assert_eq!(added, [updated.interface_name()]);

        client
            .get_interface(updated.interface_name(), |found| {
                assert_eq!(found, Some(&updated));
            })
            .await;

        let remaining = client
            .store
            .get_retention()
            .unwrap()
            .fetch_all_interfaces()
            .await
            .unwrap();
        assert!(remaining.is_empty());
    }

    /// Extending with an interface that is already installed adds nothing; no sender
    /// expectation is registered for this test.
    #[tokio::test]
    async fn extend_interfaces_nothing_to_add() {
        let mut client = mock_client(
            &[for_update::E2E_DEVICE_DATASTREAM_1_0],
            ConnStatus::Connected,
        );

        let installed = Interface::from_str(for_update::E2E_DEVICE_DATASTREAM_1_0).unwrap();

        let added = client
            .extend_interfaces([installed.clone()])
            .await
            .unwrap();
        assert!(added.is_empty());

        client
            .get_interface(installed.interface_name(), |found| {
                assert_eq!(found, Some(&installed));
            })
            .await;
    }

    /// Removing an installed interface notifies the sender and drops it from the introspection.
    #[tokio::test]
    async fn remove_interface_present() {
        let mut client = mock_client(&[E2E_DEVICE_AGGREGATE], ConnStatus::Connected);

        let expected = Interface::from_str(E2E_DEVICE_AGGREGATE).unwrap();

        let mut seq = Sequence::new();
        client
            .sender
            .expect_remove_interface()
            .once()
            .in_sequence(&mut seq)
            .with(predicate::always(), predicate::eq(expected.clone()))
            .returning(|_, _| Ok(()));

        let removed = client
            .remove_interface(E2E_DEVICE_AGGREGATE_NAME)
            .await
            .unwrap();
        assert!(removed);

        client
            .get_interface(E2E_DEVICE_AGGREGATE_NAME, |found| {
                assert_eq!(found, None);
            })
            .await;
    }

    /// Removing a properties interface also deletes the property values stored for it.
    #[tokio::test]
    async fn remove_interface_property() {
        let mut client = mock_client(&[E2E_DEVICE_PROPERTY], ConnStatus::Connected);

        let expected = Interface::from_str(E2E_DEVICE_PROPERTY).unwrap();

        let prop_path = "/sensor_1/double_endpoint";

        // Seed the store with a value that the removal is expected to wipe.
        client
            .store
            .store_prop(crate::store::StoredProp {
                interface: E2E_DEVICE_PROPERTY_NAME,
                path: prop_path,
                value: &AstarteData::LongInteger(2),
                interface_major: expected.version_major(),
                ownership: expected.ownership(),
            })
            .await
            .unwrap();

        let mut seq = Sequence::new();
        client
            .sender
            .expect_remove_interface()
            .once()
            .in_sequence(&mut seq)
            .with(predicate::always(), predicate::eq(expected.clone()))
            .returning(|_, _| Ok(()));

        let removed = client
            .remove_interface(E2E_DEVICE_PROPERTY_NAME)
            .await
            .unwrap();
        assert!(removed);

        client
            .get_interface(E2E_DEVICE_PROPERTY_NAME, |found| {
                assert_eq!(found, None);
            })
            .await;

        let properties = Properties::from_str(E2E_DEVICE_PROPERTY).unwrap();
        let mapping_path = MappingPath::try_from(prop_path).unwrap();
        let mapping = MappingRef::new(&properties, &mapping_path).unwrap();

        let stored = client
            .store
            .load_prop(&PropertyMapping::from(&mapping))
            .await
            .unwrap();
        assert_eq!(stored, None);
    }

    /// Removing an unknown interface returns `false`; no sender expectation is registered.
    #[tokio::test]
    async fn remove_interface_not_found() {
        let mut client = mock_client(&[], ConnStatus::Connected);

        let removed = client
            .remove_interface(E2E_DEVICE_AGGREGATE_NAME)
            .await
            .unwrap();
        assert!(!removed);

        client
            .get_interface(E2E_DEVICE_AGGREGATE_NAME, |found| {
                assert_eq!(found, None);
            })
            .await;
    }

    /// Bulk removal of an installed interface reports its name and drops it.
    #[tokio::test]
    async fn remove_interface_many_present() {
        let mut client = mock_client(&[E2E_DEVICE_AGGREGATE], ConnStatus::Connected);

        let mut seq = Sequence::new();
        client
            .sender
            .expect_remove_interfaces()
            .once()
            .in_sequence(&mut seq)
            // A closure matcher is used instead of `predicate::eq`: the map argument holds
            // references, and an expected map would need references with a `'static` lifetime.
            .withf(|_, requested| {
                requested.len() == 1 && requested.contains_key(E2E_DEVICE_AGGREGATE_NAME)
            })
            .returning(|_, _| Ok(()));

        let removed = client
            .remove_interfaces([E2E_DEVICE_AGGREGATE_NAME.to_string()])
            .await
            .unwrap();
        assert_eq!(removed, [E2E_DEVICE_AGGREGATE_NAME]);

        client
            .get_interface(E2E_DEVICE_AGGREGATE_NAME, |found| {
                assert_eq!(found, None);
            })
            .await;
    }

    /// Bulk removal of unknown interfaces returns an empty list; no sender expectation is
    /// registered.
    #[tokio::test]
    async fn remove_interface_many_missing() {
        let mut client = mock_client(&[], ConnStatus::Connected);

        let removed = client
            .remove_interfaces([E2E_DEVICE_AGGREGATE_NAME.to_string()])
            .await
            .unwrap();
        assert!(removed.is_empty());

        client
            .get_interface(E2E_DEVICE_AGGREGATE_NAME, |found| {
                assert_eq!(found, None);
            })
            .await;
    }
}
809                assert_eq!(i, None);
810            })
811            .await;
812    }
813}