1mod adapter;
11mod bleuuid;
12mod characteristic;
13mod descriptor;
14mod device;
15mod events;
16mod introspect;
17mod macaddress;
18mod messagestream;
19mod modalias;
20mod serde_path;
21mod service;
22
23pub use self::adapter::{AdapterId, AdapterInfo};
24pub use self::bleuuid::{uuid_from_u16, uuid_from_u32, BleUuid};
25pub use self::characteristic::{CharacteristicFlags, CharacteristicId, CharacteristicInfo};
26pub use self::descriptor::{DescriptorId, DescriptorInfo};
27pub use self::device::{AddressType, DeviceId, DeviceInfo};
28pub use self::events::{AdapterEvent, BluetoothEvent, CharacteristicEvent, DeviceEvent};
29use self::introspect::IntrospectParse;
30pub use self::macaddress::{MacAddress, ParseMacAddressError};
31use self::messagestream::MessageStream;
32pub use self::modalias::{Modalias, ParseModaliasError};
33pub use self::service::{ServiceId, ServiceInfo};
34use bluez_generated::{
35 OrgBluezAdapter1, OrgBluezAdapter1Properties, OrgBluezDevice1, OrgBluezDevice1Properties,
36 OrgBluezGattCharacteristic1, OrgBluezGattCharacteristic1Properties, OrgBluezGattDescriptor1,
37 OrgBluezGattService1, ORG_BLUEZ_ADAPTER1_NAME, ORG_BLUEZ_DEVICE1_NAME,
38 ORG_BLUEZ_GATT_CHARACTERISTIC1_NAME,
39};
40use dbus::arg::{PropMap, Variant};
41use dbus::nonblock::stdintf::org_freedesktop_dbus::{Introspectable, ObjectManager, Properties};
42use dbus::nonblock::{Proxy, SyncConnection};
43use dbus::Path;
44use dbus_tokio::connection::IOResourceError;
45use futures::stream::{self, select_all, StreamExt};
46use futures::{FutureExt, Stream};
47use std::collections::HashMap;
48use std::fmt::{self, Debug, Display, Formatter};
49use std::future::Future;
50use std::sync::Arc;
51use std::time::Duration;
52use thiserror::Error;
53use tokio::task::JoinError;
54use tokio::time::timeout;
55use uuid::Uuid;
56
57const DBUS_METHOD_CALL_TIMEOUT: Duration = Duration::from_secs(30);
58const DBUS_METHOD_CALL_MAX_TIMEOUT: Duration = Duration::from_secs(i32::MAX as u64);
61const SERVICE_DISCOVERY_TIMEOUT: Duration = Duration::from_secs(5);
62
63#[derive(Debug, Error)]
65pub enum BluetoothError {
66 #[error("No Bluetooth adapters found.")]
68 NoBluetoothAdapters,
69 #[error(transparent)]
71 DbusError(#[from] dbus::Error),
72 #[error("Error parsing XML for introspection: {0}")]
74 XmlParseError(#[from] serde_xml_rs::Error),
75 #[error("Service or characteristic UUID {uuid} not found.")]
77 UuidNotFound { uuid: Uuid },
78 #[error("Error parsing UUID string: {0}")]
80 UuidParseError(#[from] uuid::Error),
81 #[error("Invalid characteristic flag {0:?}")]
83 FlagParseError(String),
84 #[error("Invalid address type {0}")]
86 AddressTypeParseError(String),
87 #[error("Required property {0} missing.")]
89 RequiredPropertyMissing(&'static str),
90 #[error("Service discovery timed out")]
92 ServiceDiscoveryTimedOut,
93 #[error(transparent)]
95 MacAddressParseError(#[from] ParseMacAddressError),
96 #[error(transparent)]
98 ModaliasParseError(#[from] ParseModaliasError),
99}
100
101#[derive(Debug, Error)]
103pub enum SpawnError {
104 #[error("D-Bus connection lost: {0}")]
105 DbusConnectionLost(#[source] IOResourceError),
106 #[error("Task failed: {0}")]
107 Join(#[from] JoinError),
108}
109
110#[derive(Copy, Clone, Debug, Eq, PartialEq)]
112pub enum Transport {
113 Auto,
115 BrEdr,
117 Le,
119}
120
121impl Transport {
122 fn as_str(&self) -> &'static str {
123 match self {
124 Self::Auto => "auto",
125 Self::BrEdr => "bredr",
126 Self::Le => "le",
127 }
128 }
129}
130
131impl Display for Transport {
132 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
133 f.write_str(self.as_str())
134 }
135}
136
137#[derive(Clone, Debug, Default, Eq, PartialEq)]
143pub struct DiscoveryFilter {
144 pub service_uuids: Vec<Uuid>,
146 pub rssi_threshold: Option<i16>,
148 pub pathloss_threshold: Option<u16>,
149 pub transport: Option<Transport>,
151 pub duplicate_data: Option<bool>,
155 pub discoverable: Option<bool>,
157 pub pattern: Option<String>,
159}
160
161impl From<&DiscoveryFilter> for PropMap {
162 fn from(filter: &DiscoveryFilter) -> Self {
163 let mut map: PropMap = HashMap::new();
164 if !filter.service_uuids.is_empty() {
165 let uuids: Vec<String> = filter.service_uuids.iter().map(Uuid::to_string).collect();
166 map.insert("UUIDs".to_string(), Variant(Box::new(uuids)));
167 }
168 if let Some(rssi_threshold) = filter.rssi_threshold {
169 map.insert("RSSI".to_string(), Variant(Box::new(rssi_threshold)));
170 }
171 if let Some(pathloss_threshold) = filter.pathloss_threshold {
172 map.insert(
173 "Pathloss".to_string(),
174 Variant(Box::new(pathloss_threshold)),
175 );
176 }
177 if let Some(transport) = filter.transport {
178 map.insert(
179 "Transport".to_string(),
180 Variant(Box::new(transport.to_string())),
181 );
182 }
183 if let Some(duplicate_data) = filter.duplicate_data {
184 map.insert(
185 "DuplicateData".to_string(),
186 Variant(Box::new(duplicate_data)),
187 );
188 }
189 if let Some(discoverable) = filter.discoverable {
190 map.insert("Discoverable".to_string(), Variant(Box::new(discoverable)));
191 }
192 if let Some(pattern) = &filter.pattern {
193 map.insert("Pattern".to_string(), Variant(Box::new(pattern.to_owned())));
194 }
195 map
196 }
197}
198
199#[derive(Clone, Copy, Debug, Eq, PartialEq)]
201pub enum WriteType {
202 WithResponse,
206 WithoutResponse,
209 Reliable,
212}
213
214impl WriteType {
215 fn as_str(&self) -> &'static str {
216 match self {
217 Self::WithResponse => "request",
218 Self::WithoutResponse => "command",
219 Self::Reliable => "reliable",
220 }
221 }
222}
223
224impl Display for WriteType {
225 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
226 f.write_str(self.as_str())
227 }
228}
229
230#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
232pub struct WriteOptions {
233 pub offset: usize,
235 pub write_type: Option<WriteType>,
243}
244
245impl From<WriteOptions> for PropMap {
246 fn from(options: WriteOptions) -> Self {
247 let mut map: PropMap = HashMap::new();
248 if options.offset != 0 {
249 map.insert(
250 "offset".to_string(),
251 Variant(Box::new(options.offset as u64)),
252 );
253 }
254 if let Some(write_type) = options.write_type {
255 map.insert(
256 "type".to_string(),
257 Variant(Box::new(write_type.to_string())),
258 );
259 }
260 map
261 }
262}
263
264#[derive(Clone)]
267pub struct BluetoothSession {
268 connection: Arc<SyncConnection>,
269}
270
271impl Debug for BluetoothSession {
272 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
273 write!(f, "BluetoothSession")
274 }
275}
276
277impl BluetoothSession {
278 pub async fn new(
284 ) -> Result<(impl Future<Output = Result<(), SpawnError>>, Self), BluetoothError> {
285 let (dbus_resource, connection) = dbus_tokio::connection::new_system_sync()?;
287 connection.set_signal_match_mode(true);
290 let dbus_handle = tokio::spawn(async {
293 let err = dbus_resource.await;
294 Err(SpawnError::DbusConnectionLost(err))
295 });
296 Ok((dbus_handle.map(|res| res?), BluetoothSession { connection }))
297 }
298
299 pub async fn start_discovery(&self) -> Result<(), BluetoothError> {
304 self.start_discovery_with_filter(&DiscoveryFilter::default())
305 .await
306 }
307
308 pub async fn start_discovery_on_adapter(
314 &self,
315 adapter: &AdapterId,
316 ) -> Result<(), BluetoothError> {
317 self.start_discovery_on_adapter_with_filter(adapter, &DiscoveryFilter::default())
318 .await
319 }
320
321 pub async fn start_discovery_with_filter(
330 &self,
331 discovery_filter: &DiscoveryFilter,
332 ) -> Result<(), BluetoothError> {
333 let adapters = self.get_adapters().await?;
334 if adapters.is_empty() {
335 return Err(BluetoothError::NoBluetoothAdapters);
336 }
337
338 for adapter in adapters {
339 log::trace!("Starting discovery on adapter {}", adapter.id);
340 self.start_discovery_on_adapter_with_filter(&adapter.id, discovery_filter)
341 .await
342 .unwrap_or_else(|err| log::error!("starting discovery failed {:?}", err));
343 }
344 Ok(())
345 }
346
347 pub async fn start_discovery_on_adapter_with_filter(
356 &self,
357 adapter_id: &AdapterId,
358 discovery_filter: &DiscoveryFilter,
359 ) -> Result<(), BluetoothError> {
360 let adapter = self.adapter(adapter_id);
361 adapter.set_powered(true).await?;
362 adapter
363 .set_discovery_filter(discovery_filter.into())
364 .await?;
365 adapter.start_discovery().await?;
366 Ok(())
367 }
368
369 pub async fn stop_discovery(&self) -> Result<(), BluetoothError> {
371 let adapters = self.get_adapters().await?;
372 if adapters.is_empty() {
373 return Err(BluetoothError::NoBluetoothAdapters);
374 }
375
376 for adapter in adapters {
377 self.stop_discovery_on_adapter(&adapter.id).await?;
378 }
379
380 Ok(())
381 }
382
383 pub async fn stop_discovery_on_adapter(
385 &self,
386 adapter_id: &AdapterId,
387 ) -> Result<(), BluetoothError> {
388 let adapter = self.adapter(adapter_id);
389 adapter.stop_discovery().await?;
390 Ok(())
391 }
392
393 pub async fn get_adapters(&self) -> Result<Vec<AdapterInfo>, BluetoothError> {
395 let bluez_root = Proxy::new(
396 "org.bluez",
397 "/",
398 DBUS_METHOD_CALL_TIMEOUT,
399 self.connection.clone(),
400 );
401 let tree = bluez_root.get_managed_objects().await?;
404 Ok(tree
405 .into_iter()
406 .filter_map(|(object_path, interfaces)| {
407 let adapter_properties = OrgBluezAdapter1Properties::from_interfaces(&interfaces)?;
408 AdapterInfo::from_properties(AdapterId { object_path }, adapter_properties).ok()
409 })
410 .collect())
411 }
412
413 pub async fn get_devices(&self) -> Result<Vec<DeviceInfo>, BluetoothError> {
415 let bluez_root = Proxy::new(
416 "org.bluez",
417 "/",
418 DBUS_METHOD_CALL_TIMEOUT,
419 self.connection.clone(),
420 );
421 let tree = bluez_root.get_managed_objects().await?;
422
423 let devices = tree
424 .into_iter()
425 .filter_map(|(object_path, interfaces)| {
426 let device_properties = OrgBluezDevice1Properties::from_interfaces(&interfaces)?;
427 DeviceInfo::from_properties(DeviceId { object_path }, device_properties).ok()
428 })
429 .collect();
430 Ok(devices)
431 }
432
433 pub async fn get_devices_on_adapter(
435 &self,
436 adapter: &AdapterId,
437 ) -> Result<Vec<DeviceInfo>, BluetoothError> {
438 let devices = self.get_devices().await?;
439 Ok(devices
440 .into_iter()
441 .filter(|device| device.id.adapter() == *adapter)
442 .collect())
443 }
444
445 pub async fn get_services(
449 &self,
450 device: &DeviceId,
451 ) -> Result<Vec<ServiceInfo>, BluetoothError> {
452 let device_node = self
453 .device(device, DBUS_METHOD_CALL_TIMEOUT)
454 .introspect_parse()
455 .await?;
456 let mut services = vec![];
457 for subnode in device_node.nodes {
458 let subnode_name = subnode.name.as_ref().unwrap();
459 if subnode_name.starts_with("service") {
462 let service_id = ServiceId {
463 object_path: format!("{}/{}", device.object_path, subnode_name).into(),
464 };
465 let service = self.service(&service_id);
466 let uuid = Uuid::parse_str(&service.uuid().await?)?;
467 let primary = service.primary().await?;
468 services.push(ServiceInfo {
469 id: service_id,
470 uuid,
471 primary,
472 });
473 }
474 }
475 Ok(services)
476 }
477
478 pub async fn get_characteristics(
480 &self,
481 service: &ServiceId,
482 ) -> Result<Vec<CharacteristicInfo>, BluetoothError> {
483 let service_node = self.service(service).introspect_parse().await?;
484 let mut characteristics = vec![];
485 for subnode in service_node.nodes {
486 let subnode_name = subnode.name.as_ref().unwrap();
487 if subnode_name.starts_with("char") {
490 let characteristic_id = CharacteristicId {
491 object_path: format!("{}/{}", service.object_path, subnode_name).into(),
492 };
493 characteristics.push(self.get_characteristic_info(&characteristic_id).await?);
494 }
495 }
496 Ok(characteristics)
497 }
498
499 pub async fn get_descriptors(
501 &self,
502 characteristic: &CharacteristicId,
503 ) -> Result<Vec<DescriptorInfo>, BluetoothError> {
504 let characteristic_node = self
505 .characteristic(characteristic)
506 .introspect_parse()
507 .await?;
508 let mut descriptors = vec![];
509 for subnode in characteristic_node.nodes {
510 let subnode_name = subnode.name.as_ref().unwrap();
511 if subnode_name.starts_with("desc") {
514 let descriptor_id = DescriptorId {
515 object_path: format!("{}/{}", characteristic.object_path, subnode_name).into(),
516 };
517 let uuid = Uuid::parse_str(&self.descriptor(&descriptor_id).uuid().await?)?;
518 descriptors.push(DescriptorInfo {
519 id: descriptor_id,
520 uuid,
521 });
522 }
523 }
524 Ok(descriptors)
525 }
526
527 pub async fn get_service_by_uuid(
531 &self,
532 device: &DeviceId,
533 uuid: Uuid,
534 ) -> Result<ServiceInfo, BluetoothError> {
535 let services = self.get_services(device).await?;
536 services
537 .into_iter()
538 .find(|service_info| service_info.uuid == uuid)
539 .ok_or(BluetoothError::UuidNotFound { uuid })
540 }
541
542 pub async fn get_characteristic_by_uuid(
545 &self,
546 service: &ServiceId,
547 uuid: Uuid,
548 ) -> Result<CharacteristicInfo, BluetoothError> {
549 let characteristics = self.get_characteristics(service).await?;
550 characteristics
551 .into_iter()
552 .find(|characteristic_info| characteristic_info.uuid == uuid)
553 .ok_or(BluetoothError::UuidNotFound { uuid })
554 }
555
556 pub async fn get_service_characteristic_by_uuid(
561 &self,
562 device: &DeviceId,
563 service_uuid: Uuid,
564 characteristic_uuid: Uuid,
565 ) -> Result<CharacteristicInfo, BluetoothError> {
566 let service = self.get_service_by_uuid(device, service_uuid).await?;
567 self.get_characteristic_by_uuid(&service.id, characteristic_uuid)
568 .await
569 }
570
571 pub async fn get_device_info(&self, id: &DeviceId) -> Result<DeviceInfo, BluetoothError> {
573 let device = self.device(id, DBUS_METHOD_CALL_TIMEOUT);
574 let properties = device.get_all(ORG_BLUEZ_DEVICE1_NAME).await?;
575 DeviceInfo::from_properties(id.to_owned(), OrgBluezDevice1Properties(&properties))
576 }
577
578 pub async fn get_adapter_info(&self, id: &AdapterId) -> Result<AdapterInfo, BluetoothError> {
580 let adapter = self.adapter(id);
581 let properties = adapter.get_all(ORG_BLUEZ_ADAPTER1_NAME).await?;
582 AdapterInfo::from_properties(id.to_owned(), OrgBluezAdapter1Properties(&properties))
583 }
584
585 pub async fn get_service_info(&self, id: &ServiceId) -> Result<ServiceInfo, BluetoothError> {
587 let service = self.service(id);
588 let uuid = Uuid::parse_str(&service.uuid().await?)?;
589 let primary = service.primary().await?;
590 Ok(ServiceInfo {
591 id: id.to_owned(),
592 uuid,
593 primary,
594 })
595 }
596
597 pub async fn get_characteristic_info(
599 &self,
600 id: &CharacteristicId,
601 ) -> Result<CharacteristicInfo, BluetoothError> {
602 let characteristic = self.characteristic(id);
603 let properties = characteristic
604 .get_all(ORG_BLUEZ_GATT_CHARACTERISTIC1_NAME)
605 .await?;
606 CharacteristicInfo::from_properties(
607 id.to_owned(),
608 OrgBluezGattCharacteristic1Properties(&properties),
609 )
610 }
611
612 pub async fn get_descriptor_info(
614 &self,
615 id: &DescriptorId,
616 ) -> Result<DescriptorInfo, BluetoothError> {
617 let uuid = Uuid::parse_str(&self.descriptor(id).uuid().await?)?;
618 Ok(DescriptorInfo {
619 id: id.to_owned(),
620 uuid,
621 })
622 }
623
624 fn adapter(&self, id: &AdapterId) -> impl OrgBluezAdapter1 + Introspectable + Properties {
625 Proxy::new(
626 "org.bluez",
627 id.object_path.to_owned(),
628 DBUS_METHOD_CALL_TIMEOUT,
629 self.connection.clone(),
630 )
631 }
632
633 fn device(
634 &self,
635 id: &DeviceId,
636 timeout: Duration,
637 ) -> impl OrgBluezDevice1 + Introspectable + Properties {
638 let timeout = timeout.min(DBUS_METHOD_CALL_MAX_TIMEOUT);
639 Proxy::new(
640 "org.bluez",
641 id.object_path.to_owned(),
642 timeout,
643 self.connection.clone(),
644 )
645 }
646
647 fn service(&self, id: &ServiceId) -> impl OrgBluezGattService1 + Introspectable + Properties {
648 Proxy::new(
649 "org.bluez",
650 id.object_path.to_owned(),
651 DBUS_METHOD_CALL_TIMEOUT,
652 self.connection.clone(),
653 )
654 }
655
656 fn characteristic(
657 &self,
658 id: &CharacteristicId,
659 ) -> impl OrgBluezGattCharacteristic1 + Introspectable + Properties {
660 Proxy::new(
661 "org.bluez",
662 id.object_path.to_owned(),
663 DBUS_METHOD_CALL_TIMEOUT,
664 self.connection.clone(),
665 )
666 }
667
668 fn descriptor(
669 &self,
670 id: &DescriptorId,
671 ) -> impl OrgBluezGattDescriptor1 + Introspectable + Properties {
672 Proxy::new(
673 "org.bluez",
674 id.object_path.to_owned(),
675 DBUS_METHOD_CALL_TIMEOUT,
676 self.connection.clone(),
677 )
678 }
679
680 async fn await_service_discovery(&self, device_id: &DeviceId) -> Result<(), BluetoothError> {
682 let mut events = self.device_event_stream(device_id).await?;
684 if self
685 .device(device_id, DBUS_METHOD_CALL_TIMEOUT)
686 .services_resolved()
687 .await?
688 {
689 log::info!("Services already resolved.");
690 return Ok(());
691 }
692 timeout(SERVICE_DISCOVERY_TIMEOUT, async {
693 while let Some(event) = events.next().await {
694 if matches!(event, BluetoothEvent::Device {
695 id,
696 event: DeviceEvent::ServicesResolved,
697 } if device_id == &id)
698 {
699 return Ok(());
700 }
701 }
702
703 Err(BluetoothError::ServiceDiscoveryTimedOut)
705 })
706 .await
707 .unwrap_or(Err(BluetoothError::ServiceDiscoveryTimedOut))
708 }
709
710 pub async fn connect(&self, id: &DeviceId) -> Result<(), BluetoothError> {
712 self.connect_with_timeout(id, DBUS_METHOD_CALL_TIMEOUT)
713 .await
714 }
715
716 pub async fn connect_with_timeout(
718 &self,
719 id: &DeviceId,
720 timeout: Duration,
721 ) -> Result<(), BluetoothError> {
722 self.device(id, timeout).connect().await?;
723 self.await_service_discovery(id).await
724 }
725
726 pub async fn disconnect(&self, id: &DeviceId) -> Result<(), BluetoothError> {
728 Ok(self
729 .device(id, DBUS_METHOD_CALL_TIMEOUT)
730 .disconnect()
731 .await?)
732 }
733
734 pub async fn read_characteristic_value(
738 &self,
739 id: &CharacteristicId,
740 ) -> Result<Vec<u8>, BluetoothError> {
741 self.read_characteristic_value_with_offset(id, 0).await
742 }
743
744 pub async fn read_characteristic_value_with_offset(
746 &self,
747 id: &CharacteristicId,
748 offset: usize,
749 ) -> Result<Vec<u8>, BluetoothError> {
750 let characteristic = self.characteristic(id);
751 Ok(characteristic.read_value(offset_to_propmap(offset)).await?)
752 }
753
754 pub async fn write_characteristic_value(
758 &self,
759 id: &CharacteristicId,
760 value: impl Into<Vec<u8>>,
761 ) -> Result<(), BluetoothError> {
762 self.write_characteristic_value_with_options(id, value, WriteOptions::default())
763 .await
764 }
765
766 pub async fn write_characteristic_value_with_options(
768 &self,
769 id: &CharacteristicId,
770 value: impl Into<Vec<u8>>,
771 options: WriteOptions,
772 ) -> Result<(), BluetoothError> {
773 let characteristic = self.characteristic(id);
774 Ok(characteristic
775 .write_value(value.into(), options.into())
776 .await?)
777 }
778
779 pub async fn read_descriptor_value(
783 &self,
784 id: &DescriptorId,
785 ) -> Result<Vec<u8>, BluetoothError> {
786 self.read_descriptor_value_with_offset(id, 0).await
787 }
788
789 pub async fn read_descriptor_value_with_offset(
791 &self,
792 id: &DescriptorId,
793 offset: usize,
794 ) -> Result<Vec<u8>, BluetoothError> {
795 let descriptor = self.descriptor(id);
796 Ok(descriptor.read_value(offset_to_propmap(offset)).await?)
797 }
798
799 pub async fn write_descriptor_value(
803 &self,
804 id: &DescriptorId,
805 value: impl Into<Vec<u8>>,
806 ) -> Result<(), BluetoothError> {
807 self.write_descriptor_value_with_offset(id, value, 0).await
808 }
809
810 pub async fn write_descriptor_value_with_offset(
812 &self,
813 id: &DescriptorId,
814 value: impl Into<Vec<u8>>,
815 offset: usize,
816 ) -> Result<(), BluetoothError> {
817 let descriptor = self.descriptor(id);
818 Ok(descriptor
819 .write_value(value.into(), offset_to_propmap(offset))
820 .await?)
821 }
822
823 pub async fn start_notify(&self, id: &CharacteristicId) -> Result<(), BluetoothError> {
825 let characteristic = self.characteristic(id);
826 characteristic.start_notify().await?;
827 Ok(())
828 }
829
830 pub async fn stop_notify(&self, id: &CharacteristicId) -> Result<(), BluetoothError> {
832 let characteristic = self.characteristic(id);
833 characteristic.stop_notify().await?;
834 Ok(())
835 }
836
837 pub async fn event_stream(&self) -> Result<impl Stream<Item = BluetoothEvent>, BluetoothError> {
839 self.filtered_event_stream(None::<&DeviceId>, true).await
840 }
841
842 pub async fn adapter_event_stream(
845 &self,
846 adapter: &AdapterId,
847 ) -> Result<impl Stream<Item = BluetoothEvent>, BluetoothError> {
848 self.filtered_event_stream(Some(adapter), true).await
849 }
850
851 pub async fn device_event_stream(
857 &self,
858 device: &DeviceId,
859 ) -> Result<impl Stream<Item = BluetoothEvent>, BluetoothError> {
860 self.filtered_event_stream(Some(device), false).await
861 }
862
863 pub async fn characteristic_event_stream(
865 &self,
866 characteristic: &CharacteristicId,
867 ) -> Result<impl Stream<Item = BluetoothEvent>, BluetoothError> {
868 self.filtered_event_stream(Some(characteristic), false)
869 .await
870 }
871
872 async fn filtered_event_stream(
873 &self,
874 object: Option<&(impl Into<Path<'static>> + Clone)>,
875 device_discovery: bool,
876 ) -> Result<impl Stream<Item = BluetoothEvent>, BluetoothError> {
877 let mut message_streams = vec![];
878 for match_rule in BluetoothEvent::match_rules(object.cloned(), device_discovery) {
879 let msg_match = self.connection.add_match(match_rule).await?;
880 message_streams.push(MessageStream::new(msg_match, self.connection.clone()));
881 }
882 Ok(select_all(message_streams)
883 .flat_map(|message| stream::iter(BluetoothEvent::message_to_events(message))))
884 }
885}
886
887fn offset_to_propmap(offset: usize) -> PropMap {
888 let mut map: PropMap = HashMap::new();
889 if offset != 0 {
890 map.insert("offset".to_string(), Variant(Box::new(offset as u64)));
891 }
892 map
893}