1mod adapter;
11mod bleuuid;
12mod characteristic;
13mod descriptor;
14mod device;
15mod events;
16mod introspect;
17mod macaddress;
18mod messagestream;
19mod modalias;
20mod serde_path;
21mod service;
22
23pub use self::adapter::{AdapterId, AdapterInfo};
24pub use self::bleuuid::{BleUuid, uuid_from_u16, uuid_from_u32};
25pub use self::characteristic::{CharacteristicFlags, CharacteristicId, CharacteristicInfo};
26pub use self::descriptor::{DescriptorId, DescriptorInfo};
27pub use self::device::{AddressType, DeviceId, DeviceInfo};
28pub use self::events::{AdapterEvent, BluetoothEvent, CharacteristicEvent, DeviceEvent};
29use self::introspect::IntrospectParse;
30pub use self::macaddress::{MacAddress, ParseMacAddressError};
31use self::messagestream::MessageStream;
32pub use self::modalias::{Modalias, ParseModaliasError};
33pub use self::service::{ServiceId, ServiceInfo};
34use bluez_generated::{
35 ORG_BLUEZ_ADAPTER1_NAME, ORG_BLUEZ_DEVICE1_NAME, ORG_BLUEZ_GATT_CHARACTERISTIC1_NAME,
36 OrgBluezAdapter1, OrgBluezAdapter1Properties, OrgBluezDevice1, OrgBluezDevice1Properties,
37 OrgBluezGattCharacteristic1, OrgBluezGattCharacteristic1Properties, OrgBluezGattDescriptor1,
38 OrgBluezGattService1,
39};
40use dbus::Path;
41use dbus::arg::{PropMap, Variant};
42use dbus::nonblock::stdintf::org_freedesktop_dbus::{Introspectable, ObjectManager, Properties};
43use dbus::nonblock::{Proxy, SyncConnection};
44use dbus_tokio::connection::IOResourceError;
45use futures::stream::{self, StreamExt, select_all};
46use futures::{FutureExt, Stream};
47use std::collections::HashMap;
48use std::fmt::{self, Debug, Display, Formatter};
49use std::future::Future;
50use std::sync::Arc;
51use std::time::Duration;
52use thiserror::Error;
53use tokio::task::JoinError;
54use tokio::time::timeout;
55use uuid::Uuid;
56
57const DBUS_METHOD_CALL_TIMEOUT: Duration = Duration::from_secs(30);
58const DBUS_METHOD_CALL_MAX_TIMEOUT: Duration = Duration::from_secs(i32::MAX as u64);
61const SERVICE_DISCOVERY_TIMEOUT: Duration = Duration::from_secs(5);
62
63#[derive(Debug, Error)]
65pub enum BluetoothError {
66 #[error("No Bluetooth adapters found.")]
68 NoBluetoothAdapters,
69 #[error(transparent)]
71 DbusError(#[from] dbus::Error),
72 #[error("Error parsing XML for introspection: {0}")]
74 XmlParseError(#[from] serde_xml_rs::Error),
75 #[error("Service or characteristic UUID {uuid} not found.")]
77 UuidNotFound { uuid: Uuid },
78 #[error("Error parsing UUID string: {0}")]
80 UuidParseError(#[from] uuid::Error),
81 #[error("Invalid characteristic flag {0:?}")]
83 FlagParseError(String),
84 #[error("Invalid address type {0}")]
86 AddressTypeParseError(String),
87 #[error("Required property {0} missing.")]
89 RequiredPropertyMissing(&'static str),
90 #[error("Service discovery timed out")]
92 ServiceDiscoveryTimedOut,
93 #[error(transparent)]
95 MacAddressParseError(#[from] ParseMacAddressError),
96 #[error(transparent)]
98 ModaliasParseError(#[from] ParseModaliasError),
99}
100
101#[derive(Debug, Error)]
103pub enum SpawnError {
104 #[error("D-Bus connection lost: {0}")]
105 DbusConnectionLost(#[source] IOResourceError),
106 #[error("Task failed: {0}")]
107 Join(#[from] JoinError),
108}
109
110#[derive(Copy, Clone, Debug, Eq, PartialEq)]
112pub enum Transport {
113 Auto,
115 BrEdr,
117 Le,
119}
120
121impl Transport {
122 fn as_str(&self) -> &'static str {
123 match self {
124 Self::Auto => "auto",
125 Self::BrEdr => "bredr",
126 Self::Le => "le",
127 }
128 }
129}
130
131impl Display for Transport {
132 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
133 f.write_str(self.as_str())
134 }
135}
136
137#[derive(Clone, Debug, Default, Eq, PartialEq)]
143pub struct DiscoveryFilter {
144 pub service_uuids: Vec<Uuid>,
146 pub rssi_threshold: Option<i16>,
148 pub pathloss_threshold: Option<u16>,
149 pub transport: Option<Transport>,
151 pub duplicate_data: Option<bool>,
155 pub discoverable: Option<bool>,
157 pub pattern: Option<String>,
159}
160
161impl From<&DiscoveryFilter> for PropMap {
162 fn from(filter: &DiscoveryFilter) -> Self {
163 let mut map: PropMap = HashMap::new();
164 if !filter.service_uuids.is_empty() {
165 let uuids: Vec<String> = filter.service_uuids.iter().map(Uuid::to_string).collect();
166 map.insert("UUIDs".to_string(), Variant(Box::new(uuids)));
167 }
168 if let Some(rssi_threshold) = filter.rssi_threshold {
169 map.insert("RSSI".to_string(), Variant(Box::new(rssi_threshold)));
170 }
171 if let Some(pathloss_threshold) = filter.pathloss_threshold {
172 map.insert(
173 "Pathloss".to_string(),
174 Variant(Box::new(pathloss_threshold)),
175 );
176 }
177 if let Some(transport) = filter.transport {
178 map.insert(
179 "Transport".to_string(),
180 Variant(Box::new(transport.to_string())),
181 );
182 }
183 if let Some(duplicate_data) = filter.duplicate_data {
184 map.insert(
185 "DuplicateData".to_string(),
186 Variant(Box::new(duplicate_data)),
187 );
188 }
189 if let Some(discoverable) = filter.discoverable {
190 map.insert("Discoverable".to_string(), Variant(Box::new(discoverable)));
191 }
192 if let Some(pattern) = &filter.pattern {
193 map.insert("Pattern".to_string(), Variant(Box::new(pattern.to_owned())));
194 }
195 map
196 }
197}
198
199#[derive(Clone, Copy, Debug, Eq, PartialEq)]
201pub enum WriteType {
202 WithResponse,
206 WithoutResponse,
209 Reliable,
212}
213
214impl WriteType {
215 fn as_str(&self) -> &'static str {
216 match self {
217 Self::WithResponse => "request",
218 Self::WithoutResponse => "command",
219 Self::Reliable => "reliable",
220 }
221 }
222}
223
224impl Display for WriteType {
225 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
226 f.write_str(self.as_str())
227 }
228}
229
230#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
232pub struct WriteOptions {
233 pub offset: usize,
235 pub write_type: Option<WriteType>,
243}
244
245impl From<WriteOptions> for PropMap {
246 fn from(options: WriteOptions) -> Self {
247 let mut map: PropMap = HashMap::new();
248 if options.offset != 0 {
249 map.insert(
250 "offset".to_string(),
251 Variant(Box::new(options.offset as u64)),
252 );
253 }
254 if let Some(write_type) = options.write_type {
255 map.insert(
256 "type".to_string(),
257 Variant(Box::new(write_type.to_string())),
258 );
259 }
260 map
261 }
262}
263
264#[derive(Clone)]
267pub struct BluetoothSession {
268 connection: Arc<SyncConnection>,
269}
270
271impl Debug for BluetoothSession {
272 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
273 write!(f, "BluetoothSession")
274 }
275}
276
277impl BluetoothSession {
278 pub async fn new()
284 -> Result<(impl Future<Output = Result<(), SpawnError>>, Self), BluetoothError> {
285 let (dbus_resource, connection) = dbus_tokio::connection::new_system_sync()?;
287 connection.set_signal_match_mode(true);
290 let dbus_handle = tokio::spawn(async {
293 let err = dbus_resource.await;
294 Err(SpawnError::DbusConnectionLost(err))
295 });
296 Ok((dbus_handle.map(|res| res?), BluetoothSession { connection }))
297 }
298
299 pub async fn set_powered(
301 &self,
302 adapter_id: &AdapterId,
303 powered: bool,
304 ) -> Result<(), BluetoothError> {
305 self.adapter(adapter_id).set_powered(powered).await?;
306 Ok(())
307 }
308
309 pub async fn start_discovery(&self) -> Result<(), BluetoothError> {
314 self.start_discovery_with_filter(&DiscoveryFilter::default())
315 .await
316 }
317
318 pub async fn start_discovery_on_adapter(
324 &self,
325 adapter: &AdapterId,
326 ) -> Result<(), BluetoothError> {
327 self.start_discovery_on_adapter_with_filter(adapter, &DiscoveryFilter::default())
328 .await
329 }
330
331 pub async fn start_discovery_with_filter(
340 &self,
341 discovery_filter: &DiscoveryFilter,
342 ) -> Result<(), BluetoothError> {
343 let adapters = self.get_adapters().await?;
344 if adapters.is_empty() {
345 return Err(BluetoothError::NoBluetoothAdapters);
346 }
347
348 for adapter in adapters {
349 log::trace!("Starting discovery on adapter {}", adapter.id);
350 self.start_discovery_on_adapter_with_filter(&adapter.id, discovery_filter)
351 .await
352 .unwrap_or_else(|err| log::error!("starting discovery failed {:?}", err));
353 }
354 Ok(())
355 }
356
357 pub async fn start_discovery_on_adapter_with_filter(
366 &self,
367 adapter_id: &AdapterId,
368 discovery_filter: &DiscoveryFilter,
369 ) -> Result<(), BluetoothError> {
370 let adapter = self.adapter(adapter_id);
371 adapter.set_powered(true).await?;
372 adapter
373 .set_discovery_filter(discovery_filter.into())
374 .await?;
375 adapter.start_discovery().await?;
376 Ok(())
377 }
378
379 pub async fn stop_discovery(&self) -> Result<(), BluetoothError> {
381 let adapters = self.get_adapters().await?;
382 if adapters.is_empty() {
383 return Err(BluetoothError::NoBluetoothAdapters);
384 }
385
386 for adapter in adapters {
387 self.stop_discovery_on_adapter(&adapter.id).await?;
388 }
389
390 Ok(())
391 }
392
393 pub async fn stop_discovery_on_adapter(
395 &self,
396 adapter_id: &AdapterId,
397 ) -> Result<(), BluetoothError> {
398 let adapter = self.adapter(adapter_id);
399 adapter.stop_discovery().await?;
400 Ok(())
401 }
402
403 pub async fn get_adapters(&self) -> Result<Vec<AdapterInfo>, BluetoothError> {
405 let bluez_root = Proxy::new(
406 "org.bluez",
407 "/",
408 DBUS_METHOD_CALL_TIMEOUT,
409 self.connection.clone(),
410 );
411 let tree = bluez_root.get_managed_objects().await?;
414 Ok(tree
415 .into_iter()
416 .filter_map(|(object_path, interfaces)| {
417 let adapter_properties = OrgBluezAdapter1Properties::from_interfaces(&interfaces)?;
418 AdapterInfo::from_properties(AdapterId { object_path }, adapter_properties).ok()
419 })
420 .collect())
421 }
422
423 pub async fn get_devices(&self) -> Result<Vec<DeviceInfo>, BluetoothError> {
425 let bluez_root = Proxy::new(
426 "org.bluez",
427 "/",
428 DBUS_METHOD_CALL_TIMEOUT,
429 self.connection.clone(),
430 );
431 let tree = bluez_root.get_managed_objects().await?;
432
433 let devices = tree
434 .into_iter()
435 .filter_map(|(object_path, interfaces)| {
436 let device_properties = OrgBluezDevice1Properties::from_interfaces(&interfaces)?;
437 DeviceInfo::from_properties(DeviceId { object_path }, device_properties).ok()
438 })
439 .collect();
440 Ok(devices)
441 }
442
443 pub async fn get_devices_on_adapter(
445 &self,
446 adapter: &AdapterId,
447 ) -> Result<Vec<DeviceInfo>, BluetoothError> {
448 let devices = self.get_devices().await?;
449 Ok(devices
450 .into_iter()
451 .filter(|device| device.id.adapter() == *adapter)
452 .collect())
453 }
454
455 pub async fn get_services(
459 &self,
460 device: &DeviceId,
461 ) -> Result<Vec<ServiceInfo>, BluetoothError> {
462 let device_node = self
463 .device(device, DBUS_METHOD_CALL_TIMEOUT)
464 .introspect_parse()
465 .await?;
466 let mut services = vec![];
467 for subnode in device_node.nodes {
468 let subnode_name = subnode.name.as_ref().unwrap();
469 if subnode_name.starts_with("service") {
472 let service_id = ServiceId {
473 object_path: format!("{}/{}", device.object_path, subnode_name).into(),
474 };
475 let service = self.service(&service_id);
476 let uuid = Uuid::parse_str(&service.uuid().await?)?;
477 let primary = service.primary().await?;
478 services.push(ServiceInfo {
479 id: service_id,
480 uuid,
481 primary,
482 });
483 }
484 }
485 Ok(services)
486 }
487
488 pub async fn get_characteristics(
490 &self,
491 service: &ServiceId,
492 ) -> Result<Vec<CharacteristicInfo>, BluetoothError> {
493 let service_node = self.service(service).introspect_parse().await?;
494 let mut characteristics = vec![];
495 for subnode in service_node.nodes {
496 let subnode_name = subnode.name.as_ref().unwrap();
497 if subnode_name.starts_with("char") {
500 let characteristic_id = CharacteristicId {
501 object_path: format!("{}/{}", service.object_path, subnode_name).into(),
502 };
503 characteristics.push(self.get_characteristic_info(&characteristic_id).await?);
504 }
505 }
506 Ok(characteristics)
507 }
508
509 pub async fn get_descriptors(
511 &self,
512 characteristic: &CharacteristicId,
513 ) -> Result<Vec<DescriptorInfo>, BluetoothError> {
514 let characteristic_node = self
515 .characteristic(characteristic)
516 .introspect_parse()
517 .await?;
518 let mut descriptors = vec![];
519 for subnode in characteristic_node.nodes {
520 let subnode_name = subnode.name.as_ref().unwrap();
521 if subnode_name.starts_with("desc") {
524 let descriptor_id = DescriptorId {
525 object_path: format!("{}/{}", characteristic.object_path, subnode_name).into(),
526 };
527 let uuid = Uuid::parse_str(&self.descriptor(&descriptor_id).uuid().await?)?;
528 descriptors.push(DescriptorInfo {
529 id: descriptor_id,
530 uuid,
531 });
532 }
533 }
534 Ok(descriptors)
535 }
536
537 pub async fn get_service_by_uuid(
541 &self,
542 device: &DeviceId,
543 uuid: Uuid,
544 ) -> Result<ServiceInfo, BluetoothError> {
545 let services = self.get_services(device).await?;
546 services
547 .into_iter()
548 .find(|service_info| service_info.uuid == uuid)
549 .ok_or(BluetoothError::UuidNotFound { uuid })
550 }
551
552 pub async fn get_characteristic_by_uuid(
555 &self,
556 service: &ServiceId,
557 uuid: Uuid,
558 ) -> Result<CharacteristicInfo, BluetoothError> {
559 let characteristics = self.get_characteristics(service).await?;
560 characteristics
561 .into_iter()
562 .find(|characteristic_info| characteristic_info.uuid == uuid)
563 .ok_or(BluetoothError::UuidNotFound { uuid })
564 }
565
566 pub async fn get_service_characteristic_by_uuid(
571 &self,
572 device: &DeviceId,
573 service_uuid: Uuid,
574 characteristic_uuid: Uuid,
575 ) -> Result<CharacteristicInfo, BluetoothError> {
576 let service = self.get_service_by_uuid(device, service_uuid).await?;
577 self.get_characteristic_by_uuid(&service.id, characteristic_uuid)
578 .await
579 }
580
581 pub async fn get_device_info(&self, id: &DeviceId) -> Result<DeviceInfo, BluetoothError> {
583 let device = self.device(id, DBUS_METHOD_CALL_TIMEOUT);
584 let properties = device.get_all(ORG_BLUEZ_DEVICE1_NAME).await?;
585 DeviceInfo::from_properties(id.to_owned(), OrgBluezDevice1Properties(&properties))
586 }
587
588 pub async fn get_adapter_info(&self, id: &AdapterId) -> Result<AdapterInfo, BluetoothError> {
590 let adapter = self.adapter(id);
591 let properties = adapter.get_all(ORG_BLUEZ_ADAPTER1_NAME).await?;
592 AdapterInfo::from_properties(id.to_owned(), OrgBluezAdapter1Properties(&properties))
593 }
594
595 pub async fn get_service_info(&self, id: &ServiceId) -> Result<ServiceInfo, BluetoothError> {
597 let service = self.service(id);
598 let uuid = Uuid::parse_str(&service.uuid().await?)?;
599 let primary = service.primary().await?;
600 Ok(ServiceInfo {
601 id: id.to_owned(),
602 uuid,
603 primary,
604 })
605 }
606
607 pub async fn get_characteristic_info(
609 &self,
610 id: &CharacteristicId,
611 ) -> Result<CharacteristicInfo, BluetoothError> {
612 let characteristic = self.characteristic(id);
613 let properties = characteristic
614 .get_all(ORG_BLUEZ_GATT_CHARACTERISTIC1_NAME)
615 .await?;
616 CharacteristicInfo::from_properties(
617 id.to_owned(),
618 OrgBluezGattCharacteristic1Properties(&properties),
619 )
620 }
621
622 pub async fn get_descriptor_info(
624 &self,
625 id: &DescriptorId,
626 ) -> Result<DescriptorInfo, BluetoothError> {
627 let uuid = Uuid::parse_str(&self.descriptor(id).uuid().await?)?;
628 Ok(DescriptorInfo {
629 id: id.to_owned(),
630 uuid,
631 })
632 }
633
634 fn adapter(
635 &self,
636 id: &AdapterId,
637 ) -> impl OrgBluezAdapter1 + Introspectable + Properties + use<> {
638 Proxy::new(
639 "org.bluez",
640 id.object_path.to_owned(),
641 DBUS_METHOD_CALL_TIMEOUT,
642 self.connection.clone(),
643 )
644 }
645
646 fn device(
647 &self,
648 id: &DeviceId,
649 timeout: Duration,
650 ) -> impl OrgBluezDevice1 + Introspectable + Properties + use<> {
651 let timeout = timeout.min(DBUS_METHOD_CALL_MAX_TIMEOUT);
652 Proxy::new(
653 "org.bluez",
654 id.object_path.to_owned(),
655 timeout,
656 self.connection.clone(),
657 )
658 }
659
660 fn service(
661 &self,
662 id: &ServiceId,
663 ) -> impl OrgBluezGattService1 + Introspectable + Properties + use<> {
664 Proxy::new(
665 "org.bluez",
666 id.object_path.to_owned(),
667 DBUS_METHOD_CALL_TIMEOUT,
668 self.connection.clone(),
669 )
670 }
671
672 fn characteristic(
673 &self,
674 id: &CharacteristicId,
675 ) -> impl OrgBluezGattCharacteristic1 + Introspectable + Properties + use<> {
676 Proxy::new(
677 "org.bluez",
678 id.object_path.to_owned(),
679 DBUS_METHOD_CALL_TIMEOUT,
680 self.connection.clone(),
681 )
682 }
683
684 fn descriptor(
685 &self,
686 id: &DescriptorId,
687 ) -> impl OrgBluezGattDescriptor1 + Introspectable + Properties + use<> {
688 Proxy::new(
689 "org.bluez",
690 id.object_path.to_owned(),
691 DBUS_METHOD_CALL_TIMEOUT,
692 self.connection.clone(),
693 )
694 }
695
696 async fn await_service_discovery(&self, device_id: &DeviceId) -> Result<(), BluetoothError> {
698 let mut events = self.device_event_stream(device_id).await?;
700 if self
701 .device(device_id, DBUS_METHOD_CALL_TIMEOUT)
702 .services_resolved()
703 .await?
704 {
705 log::info!("Services already resolved.");
706 return Ok(());
707 }
708 timeout(SERVICE_DISCOVERY_TIMEOUT, async {
709 while let Some(event) = events.next().await {
710 if matches!(event, BluetoothEvent::Device {
711 id,
712 event: DeviceEvent::ServicesResolved,
713 } if device_id == &id)
714 {
715 return Ok(());
716 }
717 }
718
719 Err(BluetoothError::ServiceDiscoveryTimedOut)
721 })
722 .await
723 .unwrap_or(Err(BluetoothError::ServiceDiscoveryTimedOut))
724 }
725
726 pub async fn pair_with_timeout(
728 &self,
729 id: &DeviceId,
730 timeout: Duration,
731 ) -> Result<(), BluetoothError> {
732 self.device(id, timeout).pair().await?;
733 self.await_service_discovery(id).await
734 }
735
736 pub async fn cancel_pairing(&self, id: &DeviceId) -> Result<(), BluetoothError> {
738 Ok(self
739 .device(id, DBUS_METHOD_CALL_TIMEOUT)
740 .cancel_pairing()
741 .await?)
742 }
743
744 pub async fn connect(&self, id: &DeviceId) -> Result<(), BluetoothError> {
746 self.connect_with_timeout(id, DBUS_METHOD_CALL_TIMEOUT)
747 .await
748 }
749
750 pub async fn connect_with_timeout(
752 &self,
753 id: &DeviceId,
754 timeout: Duration,
755 ) -> Result<(), BluetoothError> {
756 self.device(id, timeout).connect().await?;
757 self.await_service_discovery(id).await
758 }
759
760 pub async fn disconnect(&self, id: &DeviceId) -> Result<(), BluetoothError> {
762 Ok(self
763 .device(id, DBUS_METHOD_CALL_TIMEOUT)
764 .disconnect()
765 .await?)
766 }
767
768 pub async fn read_characteristic_value(
772 &self,
773 id: &CharacteristicId,
774 ) -> Result<Vec<u8>, BluetoothError> {
775 self.read_characteristic_value_with_offset(id, 0).await
776 }
777
778 pub async fn read_characteristic_value_with_offset(
780 &self,
781 id: &CharacteristicId,
782 offset: usize,
783 ) -> Result<Vec<u8>, BluetoothError> {
784 let characteristic = self.characteristic(id);
785 Ok(characteristic.read_value(offset_to_propmap(offset)).await?)
786 }
787
788 pub async fn write_characteristic_value(
792 &self,
793 id: &CharacteristicId,
794 value: impl Into<Vec<u8>>,
795 ) -> Result<(), BluetoothError> {
796 self.write_characteristic_value_with_options(id, value, WriteOptions::default())
797 .await
798 }
799
800 pub async fn write_characteristic_value_with_options(
802 &self,
803 id: &CharacteristicId,
804 value: impl Into<Vec<u8>>,
805 options: WriteOptions,
806 ) -> Result<(), BluetoothError> {
807 let characteristic = self.characteristic(id);
808 Ok(characteristic
809 .write_value(value.into(), options.into())
810 .await?)
811 }
812
813 pub async fn read_descriptor_value(
817 &self,
818 id: &DescriptorId,
819 ) -> Result<Vec<u8>, BluetoothError> {
820 self.read_descriptor_value_with_offset(id, 0).await
821 }
822
823 pub async fn read_descriptor_value_with_offset(
825 &self,
826 id: &DescriptorId,
827 offset: usize,
828 ) -> Result<Vec<u8>, BluetoothError> {
829 let descriptor = self.descriptor(id);
830 Ok(descriptor.read_value(offset_to_propmap(offset)).await?)
831 }
832
833 pub async fn write_descriptor_value(
837 &self,
838 id: &DescriptorId,
839 value: impl Into<Vec<u8>>,
840 ) -> Result<(), BluetoothError> {
841 self.write_descriptor_value_with_offset(id, value, 0).await
842 }
843
844 pub async fn write_descriptor_value_with_offset(
846 &self,
847 id: &DescriptorId,
848 value: impl Into<Vec<u8>>,
849 offset: usize,
850 ) -> Result<(), BluetoothError> {
851 let descriptor = self.descriptor(id);
852 Ok(descriptor
853 .write_value(value.into(), offset_to_propmap(offset))
854 .await?)
855 }
856
857 pub async fn start_notify(&self, id: &CharacteristicId) -> Result<(), BluetoothError> {
859 let characteristic = self.characteristic(id);
860 characteristic.start_notify().await?;
861 Ok(())
862 }
863
864 pub async fn stop_notify(&self, id: &CharacteristicId) -> Result<(), BluetoothError> {
866 let characteristic = self.characteristic(id);
867 characteristic.stop_notify().await?;
868 Ok(())
869 }
870
871 pub async fn event_stream(
873 &self,
874 ) -> Result<impl Stream<Item = BluetoothEvent> + use<>, BluetoothError> {
875 self.filtered_event_stream(None::<&DeviceId>, true).await
876 }
877
878 pub async fn adapter_event_stream(
881 &self,
882 adapter: &AdapterId,
883 ) -> Result<impl Stream<Item = BluetoothEvent> + use<>, BluetoothError> {
884 self.filtered_event_stream(Some(adapter), true).await
885 }
886
887 pub async fn device_event_stream(
893 &self,
894 device: &DeviceId,
895 ) -> Result<impl Stream<Item = BluetoothEvent> + use<>, BluetoothError> {
896 self.filtered_event_stream(Some(device), false).await
897 }
898
899 pub async fn characteristic_event_stream(
901 &self,
902 characteristic: &CharacteristicId,
903 ) -> Result<impl Stream<Item = BluetoothEvent> + use<>, BluetoothError> {
904 self.filtered_event_stream(Some(characteristic), false)
905 .await
906 }
907
908 async fn filtered_event_stream<P: Into<Path<'static>> + Clone>(
909 &self,
910 object: Option<&P>,
911 device_discovery: bool,
912 ) -> Result<impl Stream<Item = BluetoothEvent> + use<P>, BluetoothError> {
913 let mut message_streams = vec![];
914 for match_rule in BluetoothEvent::match_rules(object.cloned(), device_discovery) {
915 let msg_match = self.connection.add_match(match_rule).await?;
916 message_streams.push(MessageStream::new(msg_match, self.connection.clone()));
917 }
918 Ok(select_all(message_streams)
919 .flat_map(|message| stream::iter(BluetoothEvent::message_to_events(message))))
920 }
921}
922
923fn offset_to_propmap(offset: usize) -> PropMap {
924 let mut map: PropMap = HashMap::new();
925 if offset != 0 {
926 map.insert("offset".to_string(), Variant(Box::new(offset as u64)));
927 }
928 map
929}