use super::{
advertisement_data_type, ble::characteristic::BLECharacteristic,
ble::descriptor::BLEDescriptor, ble::device::BLEDevice, ble::service::BLEService, utils,
};
use crate::{
Error, Result,
api::{
self, AddressType, BDAddr, CentralEvent, Characteristic, ConnectionParameterPreset,
ConnectionParameters, Descriptor, Peripheral as ApiPeripheral, PeripheralProperties,
Service, ValueNotification, WriteType,
bleuuid::{uuid_from_u16, uuid_from_u32},
},
common::{adapter_manager::AdapterManager, util::notifications_stream_from_broadcast_receiver},
};
use async_trait::async_trait;
use dashmap::DashMap;
use futures::stream::Stream;
use log::{trace, warn};
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
#[cfg(feature = "serde")]
use serde_cr as serde;
use std::{
collections::{BTreeSet, HashMap, HashSet},
convert::TryInto,
fmt::{self, Debug, Display, Formatter},
pin::Pin,
sync::atomic::{AtomicBool, AtomicU16, Ordering},
sync::{Arc, RwLock},
};
use tokio::sync::broadcast;
use uuid::Uuid;
use std::sync::Weak;
use windows::Devices::Bluetooth::GenericAttributeProfile::GattCharacteristic;
use windows::Devices::Bluetooth::{Advertisement::*, BluetoothAddressType};
use windows::core::GUID;
/// Identifier of a [`Peripheral`] on this backend: a newtype wrapping the
/// device's [`BDAddr`].
///
/// Serde support is only derived when the `serde` feature is enabled.
#[cfg_attr(
feature = "serde",
derive(Serialize, Deserialize),
serde(crate = "serde_cr")
)]
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct PeripheralId(BDAddr);
impl Display for PeripheralId {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
Display::fmt(&self.0, f)
}
}
/// Handle to a single BLE peripheral.
///
/// Cloning is cheap: every clone points at the same reference-counted
/// [`Shared`] state.
#[derive(Clone)]
pub struct Peripheral {
// State shared between all clones of this handle.
shared: Arc<Shared>,
}
/// State shared by every clone of a [`Peripheral`].
struct Shared {
    /// Device handle: set to `Some` at the end of a successful `connect` and
    /// reset to `None` by `disconnect`.
    device: tokio::sync::Mutex<Option<BLEDevice>>,
    /// Weak handle to the adapter manager, upgraded whenever an event is emitted.
    adapter: Weak<AdapterManager<Peripheral>>,
    /// Bluetooth address of the peripheral.
    address: BDAddr,
    /// Starts at `api::DEFAULT_MTU_SIZE`; overwritten by the size-changed
    /// callback that `connect` hands to `BLEDevice::new`.
    mtu: AtomicU16,
    /// Connection flag maintained by `connect`, `disconnect` and the
    /// connection-status callback.
    connected: AtomicBool,
    /// GATT services cached by `discover_services`, keyed by service UUID;
    /// emptied on `disconnect`.
    ble_services: DashMap<Uuid, BLEService>,
    /// Sender side of the broadcast channel that subscriptions push
    /// [`ValueNotification`]s into.
    notifications_channel: broadcast::Sender<ValueNotification>,
    /// Address type reported with the most recent advertisement.
    address_type: RwLock<Option<AddressType>>,
    /// Device name: the first non-empty advertised name, replaced by the
    /// device-reported name (when non-empty) on `connect`.
    local_name: RwLock<Option<String>>,
    /// Most recent non-empty local name seen in an advertisement.
    advertisement_name: RwLock<Option<String>>,
    /// Transmit power level from the most recent advertisement that carried one.
    last_tx_power_level: RwLock<Option<i16>>,
    /// Signal strength from the most recent advertisement.
    last_rssi: RwLock<Option<i16>>,
    /// Manufacturer data from the most recent advertisement that carried any,
    /// keyed by company id.
    latest_manufacturer_data: RwLock<HashMap<u16, Vec<u8>>>,
    /// Service data from the most recent advertisement that carried any,
    /// keyed by service UUID.
    latest_service_data: RwLock<HashMap<Uuid, Vec<u8>>>,
    /// Every service UUID advertised so far (entries are only ever added).
    services: RwLock<HashSet<Uuid>>,
    /// Initialised to `None`; nothing in the code visible here assigns it.
    class: RwLock<Option<u32>>,
}
impl Peripheral {
pub(crate) fn new(adapter: Weak<AdapterManager<Self>>, address: BDAddr) -> Self {
let (broadcast_sender, _) = broadcast::channel(16);
Peripheral {
shared: Arc::new(Shared {
adapter,
device: tokio::sync::Mutex::new(None),
address,
mtu: AtomicU16::new(api::DEFAULT_MTU_SIZE),
connected: AtomicBool::new(false),
ble_services: DashMap::new(),
notifications_channel: broadcast_sender,
address_type: RwLock::new(None),
local_name: RwLock::new(None),
advertisement_name: RwLock::new(None),
last_tx_power_level: RwLock::new(None),
last_rssi: RwLock::new(None),
latest_manufacturer_data: RwLock::new(HashMap::new()),
latest_service_data: RwLock::new(HashMap::new()),
services: RwLock::new(HashSet::new()),
class: RwLock::new(None),
}),
}
}
/// Builds a [`PeripheralProperties`] snapshot from the cached state.
///
/// # Panics
///
/// Panics if any of the property locks is poisoned.
fn derive_properties(&self) -> PeripheralProperties {
    let shared = &self.shared;
    PeripheralProperties {
        address: shared.address,
        address_type: *shared.address_type.read().unwrap(),
        local_name: shared.local_name.read().unwrap().clone(),
        advertisement_name: shared.advertisement_name.read().unwrap().clone(),
        tx_power_level: *shared.last_tx_power_level.read().unwrap(),
        rssi: *shared.last_rssi.read().unwrap(),
        manufacturer_data: shared.latest_manufacturer_data.read().unwrap().clone(),
        service_data: shared.latest_service_data.read().unwrap().clone(),
        services: shared.services.read().unwrap().iter().cloned().collect(),
        class: *shared.class.read().unwrap(),
    }
}
pub(crate) fn update_properties(&self, args: &BluetoothLEAdvertisementReceivedEventArgs) {
let advertisement = args.Advertisement().unwrap();
if let Ok(name) = advertisement.LocalName() {
if !name.is_empty() {
let name_str = name.to_string();
let mut adv_name_guard = self.shared.advertisement_name.write().unwrap();
*adv_name_guard = Some(name_str.clone());
drop(adv_name_guard);
let local_name_guard = self.shared.local_name.read().unwrap();
if local_name_guard.is_none() {
drop(local_name_guard);
let mut local_name_guard = self.shared.local_name.write().unwrap();
*local_name_guard = Some(name_str);
}
}
}
if let Ok(manufacturer_data) = advertisement.ManufacturerData() {
if manufacturer_data.Size().unwrap() > 0 {
let mut manufacturer_data_guard =
self.shared.latest_manufacturer_data.write().unwrap();
*manufacturer_data_guard = manufacturer_data
.into_iter()
.map(|d| {
let manufacturer_id = d.CompanyId().unwrap();
let data = utils::to_vec(&d.Data().unwrap());
(manufacturer_id, data)
})
.collect();
self.emit_event(CentralEvent::ManufacturerDataAdvertisement {
id: self.shared.address.into(),
manufacturer_data: manufacturer_data_guard.clone(),
});
}
}
if let Ok(data_sections) = advertisement.DataSections() {
let mut found_service_data = false;
for section in &data_sections {
match section.DataType().unwrap() {
advertisement_data_type::SERVICE_DATA_16_BIT_UUID
| advertisement_data_type::SERVICE_DATA_32_BIT_UUID
| advertisement_data_type::SERVICE_DATA_128_BIT_UUID => {
found_service_data = true;
break;
}
_ => {}
}
}
if found_service_data {
let mut service_data_guard = self.shared.latest_service_data.write().unwrap();
*service_data_guard = data_sections
.into_iter()
.filter_map(|d| {
let data = utils::to_vec(&d.Data().unwrap());
match d.DataType().unwrap() {
advertisement_data_type::SERVICE_DATA_16_BIT_UUID => {
let (uuid, data) = data.split_at(2);
let uuid =
uuid_from_u16(u16::from_le_bytes(uuid.try_into().unwrap()));
Some((uuid, data.to_owned()))
}
advertisement_data_type::SERVICE_DATA_32_BIT_UUID => {
let (uuid, data) = data.split_at(4);
let uuid =
uuid_from_u32(u32::from_le_bytes(uuid.try_into().unwrap()));
Some((uuid, data.to_owned()))
}
advertisement_data_type::SERVICE_DATA_128_BIT_UUID => {
let (uuid, data) = data.split_at(16);
let uuid = Uuid::from_slice(uuid).unwrap();
Some((uuid, data.to_owned()))
}
_ => None,
}
})
.collect();
self.emit_event(CentralEvent::ServiceDataAdvertisement {
id: self.shared.address.into(),
service_data: service_data_guard.clone(),
});
}
}
if let Ok(services) = advertisement.ServiceUuids() {
let mut found_new_service = false;
{
let services_guard_ro = self.shared.services.read().unwrap();
for uuid in &services {
if !services_guard_ro.contains(&utils::to_uuid(&uuid)) {
found_new_service = true;
break;
}
}
}
if found_new_service {
let mut services_guard = self.shared.services.write().unwrap();
for uuid in services {
services_guard.insert(utils::to_uuid(&uuid));
}
self.emit_event(CentralEvent::ServicesAdvertisement {
id: self.shared.address.into(),
services: services_guard.iter().copied().collect(),
});
}
}
if let Ok(address_type) = args.BluetoothAddressType() {
let mut address_type_guard = self.shared.address_type.write().unwrap();
*address_type_guard = match address_type {
BluetoothAddressType::Public => Some(AddressType::Public),
BluetoothAddressType::Random => Some(AddressType::Random),
_ => None,
};
}
if let Ok(tx_reference) = args.TransmitPowerLevelInDBm() {
if let Ok(tx) = tx_reference.Value() {
let mut tx_power_level_guard = self.shared.last_tx_power_level.write().unwrap();
*tx_power_level_guard = Some(tx);
}
}
if let Ok(rssi) = args.RawSignalStrengthInDBm() {
let mut rssi_guard = self.shared.last_rssi.write().unwrap();
let old_rssi = *rssi_guard;
*rssi_guard = Some(rssi);
drop(rssi_guard);
if old_rssi != Some(rssi) {
self.emit_event(CentralEvent::RssiUpdate {
id: self.shared.address.into(),
rssi,
});
}
}
}
/// Forwards `event` to the adapter manager if it is still alive; otherwise
/// the event is dropped and a trace message is logged.
fn emit_event(&self, event: CentralEvent) {
    match self.shared.adapter.upgrade() {
        Some(manager) => {
            manager.emit(event);
        }
        None => {
            trace!("Could not emit an event. AdapterManager has been dropped");
        }
    }
}
}
/// Formats as `<address> <local name>`, followed by ` connected` while the
/// connection flag is set. A missing local name is shown as `(unknown)`.
impl Display for Peripheral {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        let status = match self.shared.connected.load(Ordering::Relaxed) {
            true => " connected",
            false => "",
        };
        let name_guard = self.shared.local_name.read().unwrap();
        let name = name_guard.as_deref().unwrap_or("(unknown)");
        write!(f, "{} {}{}", self.shared.address, name, status)
    }
}
/// Debug output: address, the derived property snapshot, the cached GATT
/// services and, while the connection flag is set, a ` connected` suffix.
impl Debug for Peripheral {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        let status = match self.shared.connected.load(Ordering::Relaxed) {
            true => " connected",
            false => "",
        };
        let snapshot = self.derive_properties();
        let shared = &self.shared;
        write!(
            f,
            "{} properties: {:?}, services: {:?} {}",
            shared.address, snapshot, shared.ble_services, status
        )
    }
}
#[async_trait]
impl ApiPeripheral for Peripheral {
fn id(&self) -> PeripheralId {
PeripheralId(self.shared.address)
}
fn address(&self) -> BDAddr {
self.shared.address
}
/// Returns the currently stored MTU: `api::DEFAULT_MTU_SIZE` until the
/// size-changed callback installed by `connect` stores another value.
fn mtu(&self) -> u16 {
    let current = &self.shared.mtu;
    current.load(Ordering::Relaxed)
}
/// Returns a snapshot of the cached properties. This implementation always
/// yields `Ok(Some(..))`.
async fn properties(&self) -> Result<Option<PeripheralProperties>> {
    let snapshot = self.derive_properties();
    Ok(Some(snapshot))
}
/// Returns the cached GATT services converted to API [`Service`] values.
/// The set is empty until `discover_services` has populated the cache.
fn services(&self) -> BTreeSet<Service> {
    let mut discovered = BTreeSet::new();
    for entry in self.shared.ble_services.iter() {
        discovered.insert(entry.value().to_service());
    }
    discovered
}
/// Reports the cached connection flag; it never returns an error.
async fn is_connected(&self) -> Result<bool> {
    let connected = self.shared.connected.load(Ordering::Relaxed);
    Ok(connected)
}
/// Connects to the peripheral.
///
/// Creates a [`BLEDevice`] with two callbacks, connects it, adopts the
/// device-reported name when it is non-empty, stores the device, sets the
/// connection flag and emits `DeviceConnected`.
///
/// # Errors
///
/// Propagates any error from `BLEDevice::new` or `BLEDevice::connect`.
async fn connect(&self) -> Result<()> {
    let address = self.shared.address;

    // Both callbacks hold only weak references, so they cannot keep the
    // shared state or the adapter manager alive.
    let on_connection_status_changed = {
        let weak_shared = Arc::downgrade(&self.shared);
        let weak_adapter = self.shared.adapter.clone();
        Box::new(move |is_connected: bool| {
            if let Some(shared) = weak_shared.upgrade() {
                shared.connected.store(is_connected, Ordering::Relaxed);
            }
            if is_connected {
                return;
            }
            if let Some(adapter) = weak_adapter.upgrade() {
                adapter.emit(CentralEvent::DeviceDisconnected(address.into()));
            }
        })
    };
    let on_max_pdu_size_changed = {
        let weak_shared = Arc::downgrade(&self.shared);
        Box::new(move |mtu: u16| {
            if let Some(shared) = weak_shared.upgrade() {
                shared.mtu.store(mtu, Ordering::Relaxed);
            }
        })
    };

    let device = BLEDevice::new(
        address,
        on_connection_status_changed,
        on_max_pdu_size_changed,
    )
    .await?;
    device.connect().await?;

    // Prefer the name reported by the device over an advertised one.
    if let Ok(name) = device.name() {
        let device_name = name.to_string();
        if !device_name.is_empty() {
            *self.shared.local_name.write().unwrap() = Some(device_name);
        }
    }

    // The device lock stays held until the connected event has been emitted.
    let mut device_slot = self.shared.device.lock().await;
    *device_slot = Some(device);
    self.shared.connected.store(true, Ordering::Relaxed);
    self.emit_event(CentralEvent::DeviceConnected(address.into()));
    Ok(())
}
/// Disconnects by dropping the stored device.
///
/// Clears the cached GATT services, drops the device handle, resets the
/// connection flag and emits `DeviceDisconnected`. It never returns an error.
async fn disconnect(&self) -> Result<()> {
    let shared = &self.shared;
    shared.ble_services.clear();
    // The device lock stays held until the disconnected event has been emitted.
    let mut device_slot = shared.device.lock().await;
    drop(device_slot.take());
    shared.connected.store(false, Ordering::Relaxed);
    self.emit_event(CentralEvent::DeviceDisconnected(shared.address.into()));
    Ok(())
}
async fn discover_services(&self) -> Result<()> {
let mut device = self.shared.device.lock().await;
if let Some(ref mut device) = *device {
let gatt_services = device.discover_services().await?;
for service in gatt_services {
let uuid = utils::to_uuid(&service.Uuid().unwrap());
if !self.shared.ble_services.contains_key(&uuid) {
match BLEDevice::get_characteristics(service).await {
Ok(characteristics) => {
let characteristics = characteristics
.into_iter()
.fold(
HashMap::<GUID, GattCharacteristic>::new(),
|mut map, gatt_characteristic| {
let uuid = gatt_characteristic.Uuid().unwrap_or_default();
if !map.contains_key(&uuid) {
map.insert(uuid, gatt_characteristic);
}
map
},
)
.into_iter()
.map(|(_, characteristic)| async {
let c = characteristic.clone();
(
characteristic,
BLEDevice::get_characteristic_descriptors(&c)
.await
.unwrap_or(Vec::new())
.into_iter()
.map(|descriptor| {
let descriptor = BLEDescriptor::new(descriptor);
(descriptor.uuid(), descriptor)
})
.collect(),
)
});
let characteristics = futures::future::join_all(characteristics)
.await
.into_iter()
.map(|(characteristic, descriptors)| {
let characteristic =
BLECharacteristic::new(characteristic, descriptors);
(characteristic.uuid(), characteristic)
})
.collect();
self.shared.ble_services.insert(
uuid,
BLEService {
uuid,
characteristics,
},
);
}
Err(e) => {
warn!("get_characteristics_async {:?}", e);
}
}
}
}
return Ok(());
}
Err(Error::NotConnected)
}
async fn write(
&self,
characteristic: &Characteristic,
data: &[u8],
write_type: WriteType,
) -> Result<()> {
let ble_service = &*self
.shared
.ble_services
.get(&characteristic.service_uuid)
.ok_or_else(|| Error::NotSupported("Service not found for write".into()))?;
let ble_characteristic = ble_service
.characteristics
.get(&characteristic.uuid)
.ok_or_else(|| Error::NotSupported("Characteristic not found for write".into()))?;
ble_characteristic.write_value(data, write_type).await
}
/// Subscribes to `characteristic`; each received value is forwarded to the
/// peripheral's notification channel as a [`ValueNotification`].
///
/// # Errors
///
/// Returns [`Error::NotSupported`] when the service or characteristic is not
/// in the cache; otherwise returns whatever the characteristic reports.
async fn subscribe(&self, characteristic: &Characteristic) -> Result<()> {
    // The mutable map entry is held for the whole call, including the await.
    let mut service_entry = self
        .shared
        .ble_services
        .get_mut(&characteristic.service_uuid)
        .ok_or_else(|| Error::NotSupported("Service not found for subscribe".into()))?;
    let target = service_entry
        .characteristics
        .get_mut(&characteristic.uuid)
        .ok_or_else(|| Error::NotSupported("Characteristic not found for subscribe".into()))?;

    let sender = self.shared.notifications_channel.clone();
    let (uuid, service_uuid) = (characteristic.uuid, characteristic.service_uuid);
    target
        .subscribe(Box::new(move |value| {
            // A send error is deliberately ignored.
            let _ = sender.send(ValueNotification {
                uuid,
                service_uuid,
                value,
            });
        }))
        .await
}
async fn unsubscribe(&self, characteristic: &Characteristic) -> Result<()> {
let ble_service = &mut *self
.shared
.ble_services
.get_mut(&characteristic.service_uuid)
.ok_or_else(|| Error::NotSupported("Service not found for unsubscribe".into()))?;
let ble_characteristic = ble_service
.characteristics
.get_mut(&characteristic.uuid)
.ok_or_else(|| {
Error::NotSupported("Characteristic not found for unsubscribe".into())
})?;
ble_characteristic.unsubscribe().await
}
async fn read(&self, characteristic: &Characteristic) -> Result<Vec<u8>> {
let ble_service = &*self
.shared
.ble_services
.get(&characteristic.service_uuid)
.ok_or_else(|| Error::NotSupported("Service not found for read".into()))?;
let ble_characteristic = ble_service
.characteristics
.get(&characteristic.uuid)
.ok_or_else(|| Error::NotSupported("Characteristic not found for read".into()))?;
ble_characteristic.read_value().await
}
/// Returns a stream over a fresh subscription to this peripheral's
/// notification channel. It never returns an error.
async fn notifications(&self) -> Result<Pin<Box<dyn Stream<Item = ValueNotification> + Send>>> {
    let stream = notifications_stream_from_broadcast_receiver(
        self.shared.notifications_channel.subscribe(),
    );
    Ok(stream)
}
async fn write_descriptor(&self, descriptor: &Descriptor, data: &[u8]) -> Result<()> {
let ble_service = &*self
.shared
.ble_services
.get(&descriptor.service_uuid)
.ok_or_else(|| Error::NotSupported("Service not found for write".into()))?;
let ble_characteristic = ble_service
.characteristics
.get(&descriptor.characteristic_uuid)
.ok_or_else(|| Error::NotSupported("Characteristic not found for write".into()))?;
let ble_descriptor = ble_characteristic
.descriptors
.get(&descriptor.uuid)
.ok_or_else(|| Error::NotSupported("Descriptor not found for write".into()))?;
ble_descriptor.write_value(data).await
}
async fn read_descriptor(&self, descriptor: &Descriptor) -> Result<Vec<u8>> {
let ble_service = &*self
.shared
.ble_services
.get(&descriptor.service_uuid)
.ok_or_else(|| Error::NotSupported("Service not found for read".into()))?;
let ble_characteristic = ble_service
.characteristics
.get(&descriptor.characteristic_uuid)
.ok_or_else(|| Error::NotSupported("Characteristic not found for read".into()))?;
let ble_descriptor = ble_characteristic
.descriptors
.get(&descriptor.uuid)
.ok_or_else(|| Error::NotSupported("Descriptor not found for write".into()))?;
ble_descriptor.read_value().await
}
async fn read_rssi(&self) -> Result<i16> {
self.shared
.last_rssi
.read()
.unwrap()
.ok_or(Error::NotConnected)
}
async fn connection_parameters(&self) -> Result<Option<ConnectionParameters>> {
let device = self.shared.device.lock().await;
match &*device {
Some(device) => Ok(Some(device.get_connection_parameters()?)),
None => Err(Error::NotConnected),
}
}
async fn request_connection_parameters(&self, preset: ConnectionParameterPreset) -> Result<()> {
let device = self.shared.device.lock().await;
match &*device {
Some(device) => device.request_connection_parameters(preset),
None => Err(Error::NotConnected),
}
}
}
impl From<BDAddr> for PeripheralId {
fn from(address: BDAddr) -> Self {
PeripheralId(address)
}
}