1use super::{
15 advertisement_data_type, ble::characteristic::BLECharacteristic,
16 ble::descriptor::BLEDescriptor, ble::device::BLEDevice, ble::service::BLEService, utils,
17};
18use crate::{
19 api::{
20 bleuuid::{uuid_from_u16, uuid_from_u32},
21 AddressType, BDAddr, CentralEvent, Characteristic, Descriptor, Peripheral as ApiPeripheral,
22 PeripheralProperties, Service, ValueNotification, WriteType,
23 },
24 common::{adapter_manager::AdapterManager, util::notifications_stream_from_broadcast_receiver},
25 Error, Result,
26};
27use async_trait::async_trait;
28use dashmap::DashMap;
29use futures::stream::Stream;
30use log::{trace, warn};
31#[cfg(feature = "serde")]
32use serde::{Deserialize, Serialize};
33#[cfg(feature = "serde")]
34use serde_cr as serde;
35use std::{
36 collections::{BTreeSet, HashMap, HashSet},
37 convert::TryInto,
38 fmt::{self, Debug, Display, Formatter},
39 pin::Pin,
40 sync::atomic::{AtomicBool, Ordering},
41 sync::{Arc, RwLock},
42};
43use tokio::sync::broadcast;
44use uuid::Uuid;
45
46use std::sync::Weak;
47use windows::core::GUID;
48use windows::Devices::Bluetooth::GenericAttributeProfile::GattCharacteristic;
49use windows::Devices::Bluetooth::{Advertisement::*, BluetoothAddressType};
50
51#[cfg_attr(
52 feature = "serde",
53 derive(Serialize, Deserialize),
54 serde(crate = "serde_cr")
55)]
56#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
57pub struct PeripheralId(BDAddr);
58
59impl Display for PeripheralId {
60 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
61 Display::fmt(&self.0, f)
62 }
63}
64
65#[derive(Clone)]
67pub struct Peripheral {
68 shared: Arc<Shared>,
69}
70
71struct Shared {
72 device: tokio::sync::Mutex<Option<BLEDevice>>,
73 adapter: Weak<AdapterManager<Peripheral>>,
74 address: BDAddr,
75 connected: AtomicBool,
76 ble_services: DashMap<Uuid, BLEService>,
77 notifications_channel: broadcast::Sender<ValueNotification>,
78
79 address_type: RwLock<Option<AddressType>>,
81 local_name: RwLock<Option<String>>,
82 last_tx_power_level: RwLock<Option<i16>>, last_rssi: RwLock<Option<i16>>, latest_manufacturer_data: RwLock<HashMap<u16, Vec<u8>>>,
85 latest_service_data: RwLock<HashMap<Uuid, Vec<u8>>>,
86 services: RwLock<HashSet<Uuid>>,
87 class: RwLock<Option<u32>>,
88}
89
90impl Peripheral {
91 pub(crate) fn new(adapter: Weak<AdapterManager<Self>>, address: BDAddr) -> Self {
92 let (broadcast_sender, _) = broadcast::channel(16);
93 Peripheral {
94 shared: Arc::new(Shared {
95 adapter,
96 device: tokio::sync::Mutex::new(None),
97 address,
98 connected: AtomicBool::new(false),
99 ble_services: DashMap::new(),
100 notifications_channel: broadcast_sender,
101 address_type: RwLock::new(None),
102 local_name: RwLock::new(None),
103 last_tx_power_level: RwLock::new(None),
104 last_rssi: RwLock::new(None),
105 latest_manufacturer_data: RwLock::new(HashMap::new()),
106 latest_service_data: RwLock::new(HashMap::new()),
107 services: RwLock::new(HashSet::new()),
108 class: RwLock::new(None),
109 }),
110 }
111 }
112
113 fn derive_properties(&self) -> PeripheralProperties {
116 PeripheralProperties {
117 address: self.address(),
118 address_type: *self.shared.address_type.read().unwrap(),
119 local_name: self.shared.local_name.read().unwrap().clone(),
120 tx_power_level: *self.shared.last_tx_power_level.read().unwrap(),
121 rssi: *self.shared.last_rssi.read().unwrap(),
122 manufacturer_data: self.shared.latest_manufacturer_data.read().unwrap().clone(),
123 service_data: self.shared.latest_service_data.read().unwrap().clone(),
124 services: self
125 .shared
126 .services
127 .read()
128 .unwrap()
129 .iter()
130 .copied()
131 .collect(),
132 class: *self.shared.class.read().unwrap(),
133 }
134 }
135
136 pub(crate) fn update_properties(&self, args: &BluetoothLEAdvertisementReceivedEventArgs) {
137 let advertisement = args.Advertisement().unwrap();
138
139 if let Ok(name) = advertisement.LocalName() {
141 if !name.is_empty() {
142 let mut local_name_guard = self.shared.local_name.write().unwrap();
147 *local_name_guard = Some(name.to_string());
148 }
149 }
150 if let Ok(manufacturer_data) = advertisement.ManufacturerData() {
151 if manufacturer_data.Size().unwrap() > 0 {
152 let mut manufacturer_data_guard =
153 self.shared.latest_manufacturer_data.write().unwrap();
154 *manufacturer_data_guard = manufacturer_data
155 .into_iter()
156 .map(|d| {
157 let manufacturer_id = d.CompanyId().unwrap();
158 let data = utils::to_vec(&d.Data().unwrap());
159
160 (manufacturer_id, data)
161 })
162 .collect();
163
164 self.emit_event(CentralEvent::ManufacturerDataAdvertisement {
166 id: self.shared.address.into(),
167 manufacturer_data: manufacturer_data_guard.clone(),
168 });
169 }
170 }
171
172 if let Ok(data_sections) = advertisement.DataSections() {
175 let mut found_service_data = false;
177 for section in &data_sections {
178 match section.DataType().unwrap() {
179 advertisement_data_type::SERVICE_DATA_16_BIT_UUID
180 | advertisement_data_type::SERVICE_DATA_32_BIT_UUID
181 | advertisement_data_type::SERVICE_DATA_128_BIT_UUID => {
182 found_service_data = true;
183 break;
184 }
185 _ => {}
186 }
187 }
188 if found_service_data {
189 let mut service_data_guard = self.shared.latest_service_data.write().unwrap();
190
191 *service_data_guard = data_sections
192 .into_iter()
193 .filter_map(|d| {
194 let data = utils::to_vec(&d.Data().unwrap());
195
196 match d.DataType().unwrap() {
197 advertisement_data_type::SERVICE_DATA_16_BIT_UUID => {
198 let (uuid, data) = data.split_at(2);
199 let uuid =
200 uuid_from_u16(u16::from_le_bytes(uuid.try_into().unwrap()));
201 Some((uuid, data.to_owned()))
202 }
203 advertisement_data_type::SERVICE_DATA_32_BIT_UUID => {
204 let (uuid, data) = data.split_at(4);
205 let uuid =
206 uuid_from_u32(u32::from_le_bytes(uuid.try_into().unwrap()));
207 Some((uuid, data.to_owned()))
208 }
209 advertisement_data_type::SERVICE_DATA_128_BIT_UUID => {
210 let (uuid, data) = data.split_at(16);
211 let uuid = Uuid::from_slice(uuid).unwrap();
212 Some((uuid, data.to_owned()))
213 }
214 _ => None,
215 }
216 })
217 .collect();
218
219 self.emit_event(CentralEvent::ServiceDataAdvertisement {
221 id: self.shared.address.into(),
222 service_data: service_data_guard.clone(),
223 });
224 }
225 }
226
227 if let Ok(services) = advertisement.ServiceUuids() {
228 let mut found_new_service = false;
229
230 {
232 let services_guard_ro = self.shared.services.read().unwrap();
233
234 for uuid in &services {
237 if !services_guard_ro.contains(&utils::to_uuid(&uuid)) {
238 found_new_service = true;
239 break;
240 }
241 }
242 }
243
244 if found_new_service {
245 let mut services_guard = self.shared.services.write().unwrap();
246
247 for uuid in services {
253 services_guard.insert(utils::to_uuid(&uuid));
254 }
255
256 self.emit_event(CentralEvent::ServicesAdvertisement {
257 id: self.shared.address.into(),
258 services: services_guard.iter().copied().collect(),
259 });
260 }
261 }
262
263 if let Ok(address_type) = args.BluetoothAddressType() {
264 let mut address_type_guard = self.shared.address_type.write().unwrap();
265 *address_type_guard = match address_type {
266 BluetoothAddressType::Public => Some(AddressType::Public),
267 BluetoothAddressType::Random => Some(AddressType::Random),
268 _ => None,
269 };
270 }
271
272 if let Ok(tx_reference) = args.TransmitPowerLevelInDBm() {
273 if let Ok(tx) = tx_reference.Value() {
278 let mut tx_power_level_guard = self.shared.last_tx_power_level.write().unwrap();
279 *tx_power_level_guard = Some(tx);
280 }
281 }
282 if let Ok(rssi) = args.RawSignalStrengthInDBm() {
283 let mut rssi_guard = self.shared.last_rssi.write().unwrap();
284 *rssi_guard = Some(rssi);
285 }
286 }
287
288 fn emit_event(&self, event: CentralEvent) {
289 if let Some(manager) = self.shared.adapter.upgrade() {
290 manager.emit(event);
291 } else {
292 trace!("Could not emit an event. AdapterManager has been dropped");
293 }
294 }
295}
296
297impl Display for Peripheral {
298 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
299 let connected = if self.shared.connected.load(Ordering::Relaxed) {
300 " connected"
301 } else {
302 ""
303 };
304 write!(
305 f,
306 "{} {}{}",
307 self.shared.address,
308 self.shared
309 .local_name
310 .read()
311 .unwrap()
312 .clone()
313 .unwrap_or_else(|| "(unknown)".to_string()),
314 connected
315 )
316 }
317}
318
319impl Debug for Peripheral {
320 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
321 let connected = if self.shared.connected.load(Ordering::Relaxed) {
322 " connected"
323 } else {
324 ""
325 };
326 let properties = self.derive_properties();
327 write!(
328 f,
329 "{} properties: {:?}, services: {:?} {}",
330 self.shared.address, properties, self.shared.ble_services, connected
331 )
332 }
333}
334
335#[async_trait]
336impl ApiPeripheral for Peripheral {
337 fn id(&self) -> PeripheralId {
338 PeripheralId(self.shared.address)
339 }
340
341 fn address(&self) -> BDAddr {
343 self.shared.address
344 }
345
346 async fn properties(&self) -> Result<Option<PeripheralProperties>> {
349 Ok(Some(self.derive_properties()))
350 }
351
352 fn services(&self) -> BTreeSet<Service> {
353 self.shared
354 .ble_services
355 .iter()
356 .map(|item| item.value().to_service())
357 .collect()
358 }
359
360 async fn is_connected(&self) -> Result<bool> {
362 Ok(self.shared.connected.load(Ordering::Relaxed))
363 }
364
365 async fn connect(&self) -> Result<()> {
369 let shared_clone = Arc::downgrade(&self.shared);
370 let adapter_clone = self.shared.adapter.clone();
371 let address = self.shared.address;
372 let device = BLEDevice::new(
373 self.shared.address,
374 Box::new(move |is_connected| {
375 if let Some(shared) = shared_clone.upgrade() {
376 shared.connected.store(is_connected, Ordering::Relaxed);
377 }
378
379 if !is_connected {
380 if let Some(adapter) = adapter_clone.upgrade() {
381 adapter.emit(CentralEvent::DeviceDisconnected(address.into()));
382 }
383 }
384 }),
385 )
386 .await?;
387
388 device.connect().await?;
389 let mut d = self.shared.device.lock().await;
390 *d = Some(device);
391 self.shared.connected.store(true, Ordering::Relaxed);
392 self.emit_event(CentralEvent::DeviceConnected(self.shared.address.into()));
393 Ok(())
394 }
395
396 async fn disconnect(&self) -> Result<()> {
398 self.shared.ble_services.clear();
401 let mut device = self.shared.device.lock().await;
402 *device = None;
403 self.shared.connected.store(false, Ordering::Relaxed);
404 self.emit_event(CentralEvent::DeviceDisconnected(self.shared.address.into()));
405 Ok(())
406 }
407
408 async fn discover_services(&self) -> Result<()> {
410 let mut device = self.shared.device.lock().await;
411 if let Some(ref mut device) = *device {
412 let gatt_services = device.discover_services().await?;
413 for service in gatt_services {
414 let uuid = utils::to_uuid(&service.Uuid().unwrap());
415 if !self.shared.ble_services.contains_key(&uuid) {
416 match BLEDevice::get_characteristics(service).await {
417 Ok(characteristics) => {
418 let characteristics = characteristics
419 .into_iter()
420 .fold(
421 HashMap::<GUID, GattCharacteristic>::new(),
424 |mut map, gatt_characteristic| {
425 let uuid = gatt_characteristic.Uuid().unwrap_or_default();
426 if !map.contains_key(&uuid) {
427 map.insert(uuid, gatt_characteristic);
428 }
429 map
430 },
431 )
432 .into_iter()
433 .map(|(_, characteristic)| async {
434 let c = characteristic.clone();
435 (
436 characteristic,
437 BLEDevice::get_characteristic_descriptors(&c)
438 .await
439 .unwrap_or(Vec::new())
440 .into_iter()
441 .map(|descriptor| {
442 let descriptor = BLEDescriptor::new(descriptor);
443 (descriptor.uuid(), descriptor)
444 })
445 .collect(),
446 )
447 });
448
449 let characteristics = futures::future::join_all(characteristics)
450 .await
451 .into_iter()
452 .map(|(characteristic, descriptors)| {
453 let characteristic =
454 BLECharacteristic::new(characteristic, descriptors);
455 (characteristic.uuid(), characteristic)
456 })
457 .collect();
458
459 self.shared.ble_services.insert(
460 uuid,
461 BLEService {
462 uuid,
463 characteristics,
464 },
465 );
466 }
467 Err(e) => {
468 warn!("get_characteristics_async {:?}", e);
469 }
470 }
471 }
472 }
473 return Ok(());
474 }
475 Err(Error::NotConnected)
476 }
477
478 async fn write(
481 &self,
482 characteristic: &Characteristic,
483 data: &[u8],
484 write_type: WriteType,
485 ) -> Result<()> {
486 let ble_service = &*self
487 .shared
488 .ble_services
489 .get(&characteristic.service_uuid)
490 .ok_or_else(|| Error::NotSupported("Service not found for write".into()))?;
491 let ble_characteristic = ble_service
492 .characteristics
493 .get(&characteristic.uuid)
494 .ok_or_else(|| Error::NotSupported("Characteristic not found for write".into()))?;
495 ble_characteristic.write_value(data, write_type).await
496 }
497
498 async fn subscribe(&self, characteristic: &Characteristic) -> Result<()> {
501 let ble_service = &mut *self
502 .shared
503 .ble_services
504 .get_mut(&characteristic.service_uuid)
505 .ok_or_else(|| Error::NotSupported("Service not found for subscribe".into()))?;
506 let ble_characteristic = ble_service
507 .characteristics
508 .get_mut(&characteristic.uuid)
509 .ok_or_else(|| Error::NotSupported("Characteristic not found for subscribe".into()))?;
510 let notifications_sender = self.shared.notifications_channel.clone();
511 let uuid = characteristic.uuid;
512 ble_characteristic
513 .subscribe(Box::new(move |value| {
514 let notification = ValueNotification { uuid, value };
515 let _ = notifications_sender.send(notification);
518 }))
519 .await
520 }
521
522 async fn unsubscribe(&self, characteristic: &Characteristic) -> Result<()> {
525 let ble_service = &mut *self
526 .shared
527 .ble_services
528 .get_mut(&characteristic.service_uuid)
529 .ok_or_else(|| Error::NotSupported("Service not found for unsubscribe".into()))?;
530 let ble_characteristic = ble_service
531 .characteristics
532 .get_mut(&characteristic.uuid)
533 .ok_or_else(|| {
534 Error::NotSupported("Characteristic not found for unsubscribe".into())
535 })?;
536 ble_characteristic.unsubscribe().await
537 }
538
539 async fn read(&self, characteristic: &Characteristic) -> Result<Vec<u8>> {
540 let ble_service = &*self
541 .shared
542 .ble_services
543 .get(&characteristic.service_uuid)
544 .ok_or_else(|| Error::NotSupported("Service not found for read".into()))?;
545 let ble_characteristic = ble_service
546 .characteristics
547 .get(&characteristic.uuid)
548 .ok_or_else(|| Error::NotSupported("Characteristic not found for read".into()))?;
549 ble_characteristic.read_value().await
550 }
551
552 async fn notifications(&self) -> Result<Pin<Box<dyn Stream<Item = ValueNotification> + Send>>> {
553 let receiver = self.shared.notifications_channel.subscribe();
554 Ok(notifications_stream_from_broadcast_receiver(receiver))
555 }
556
557 async fn write_descriptor(&self, descriptor: &Descriptor, data: &[u8]) -> Result<()> {
558 let ble_service = &*self
559 .shared
560 .ble_services
561 .get(&descriptor.service_uuid)
562 .ok_or_else(|| Error::NotSupported("Service not found for write".into()))?;
563 let ble_characteristic = ble_service
564 .characteristics
565 .get(&descriptor.characteristic_uuid)
566 .ok_or_else(|| Error::NotSupported("Characteristic not found for write".into()))?;
567 let ble_descriptor = ble_characteristic
568 .descriptors
569 .get(&descriptor.uuid)
570 .ok_or_else(|| Error::NotSupported("Descriptor not found for write".into()))?;
571 ble_descriptor.write_value(data).await
572 }
573
574 async fn read_descriptor(&self, descriptor: &Descriptor) -> Result<Vec<u8>> {
575 let ble_service = &*self
576 .shared
577 .ble_services
578 .get(&descriptor.service_uuid)
579 .ok_or_else(|| Error::NotSupported("Service not found for read".into()))?;
580 let ble_characteristic = ble_service
581 .characteristics
582 .get(&descriptor.characteristic_uuid)
583 .ok_or_else(|| Error::NotSupported("Characteristic not found for read".into()))?;
584 let ble_descriptor = ble_characteristic
585 .descriptors
586 .get(&descriptor.uuid)
587 .ok_or_else(|| Error::NotSupported("Descriptor not found for write".into()))?;
588 ble_descriptor.read_value().await
589 }
590}
591
592impl From<BDAddr> for PeripheralId {
593 fn from(address: BDAddr) -> Self {
594 PeripheralId(address)
595 }
596}