use std::thread;
use std::sync::{Arc, mpsc};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, Sender};
use std::thread::sleep;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::time::timeout;
use futures::StreamExt;
use json::JsonValue;
use log::{error, info};
use windows::Devices::Bluetooth::Advertisement::{BluetoothLEAdvertisementReceivedEventArgs, BluetoothLEAdvertisementWatcher, BluetoothLEScanningMode};
use windows::Devices::Bluetooth::{BluetoothAdapter, BluetoothAddressType, BluetoothCacheMode, BluetoothConnectionStatus, BluetoothLEDevice};
use windows::Devices::Bluetooth::GenericAttributeProfile::{GattCharacteristic, GattClientCharacteristicConfigurationDescriptorValue, GattDeviceService};
use windows::core::{Error, Ref};
use windows::Devices::Radios::RadioState;
use windows::Foundation::TypedEventHandler;
use crate::device::Characteristic;
use crate::win::until::{connected_device, subscribe};
/// Events pushed through the channel whose receiving end is returned by
/// `CentralManager::new`.
#[derive(Clone)]
pub enum CentralEvent {
    /// Sent by the background thread spawned in `CentralManager::new` to
    /// report the state derived from the adapter's radio.
    ManagerStateChanged {
        new_state: ManagerState,
    },
    /// Sent by `CentralManager::connected` with the list produced by the
    /// `connected_device()` helper.
    ConnectedPeripheral {
        connected_device: Vec<String>,
    },
    /// Sent by `CentralManager::scan` for a received advertisement whose
    /// device object could be resolved from its address.
    PeripheralDiscovered {
        peripheral: BluetoothLEDevice,
        advertisement_data: AdvertisementData,
        rssi: i16,
    },
    /// Sent by `CentralManager::connect` when the device's connection status
    /// has any raw value other than `0`.
    PeripheralConnected {
        peripheral: BluetoothLEDevice,
    },
    /// Sent by `CentralManager::connect` when the device's connection status
    /// has the raw value `0`.
    PeripheralUnConnected {
        peripheral: BluetoothLEDevice,
    },
    /// Sent at the end of `CentralManager::disconnect`.
    PeripheralDisconnected {
        peripheral: BluetoothLEDevice,
    },
    /// Sent by `CentralManager::discover_services` with the services read
    /// from the device.
    ServicesDiscovered {
        peripheral: BluetoothLEDevice,
        services: Vec<GattDeviceService>,
    },
    /// Sent by `CentralManager::discover_characteristics` with the
    /// characteristics read from `service`.
    CharacteristicsDiscovered {
        peripheral: BluetoothLEDevice,
        service: GattDeviceService,
        characteristics: Vec<GattCharacteristic>,
    },
    /// Sent by `CentralManager::subscribe`, echoing its two arguments.
    SubscriptionChangeResult {
        peripheral: BluetoothLEDevice,
        characteristic: Characteristic,
    },
    /// Sent by `CentralManager::get_value` for every item yielded by the
    /// characteristic's notification stream.
    CharacteristicValue {
        peripheral: BluetoothLEDevice,
        characteristic: Characteristic,
        value: Result<Vec<u8>, Error>,
    },
    /// NOTE(review): no code visible in this file sends this variant —
    /// confirm whether it is produced elsewhere.
    PeripheralConnectFailed {
        peripheral: BluetoothLEDevice,
    },
}
/// Handle used to drive Bluetooth LE operations; results are reported as
/// [`CentralEvent`]s on the channel created in `CentralManager::new`.
///
/// Cloning the handle shares the same channel sender, pending-disconnect
/// list and run flags, because every field is behind an `Arc`.
#[derive(Clone)]
pub struct CentralManager {
    /// Sending half of the event channel.
    sender: Arc<Sender<CentralEvent>>,
    /// Devices queued by `disconnect`; the loops started by `get_value`
    /// look for their peripheral here to know when to stop.
    pending_disconnect_devices: Arc<RwLock<Vec<BluetoothLEDevice>>>,
    /// `true` while the thread started by `scan` should keep running.
    scan_running: Arc<AtomicBool>,
    /// `true` while the thread started by `connected` should keep running.
    connected_running: Arc<AtomicBool>,
}
impl CentralManager {
pub fn new() -> (Self, Receiver<CentralEvent>) {
loop {
match BluetoothAdapter::GetDefaultAsync() {
Ok(v) => {
match v.get() {
Ok(_) => break,
Err(e) => {
error!("找不到蓝牙接收器2: {}",e);
sleep(Duration::from_secs(5));
continue
}
}
},
Err(e) => {
error!("找不到蓝牙接收器1: {}",e);
sleep(Duration::from_secs(5));
continue
}
}
}
let (tx, receiver) = mpsc::channel();
let sender = Arc::new(tx.clone());
thread::spawn(move || {
let mut state = ManagerState::None;
loop {
let bluetooth_adapter = match BluetoothAdapter::GetDefaultAsync() {
Ok(e) => e,
Err(e) => {
error!("BluetoothAdapter::GetDefaultAsync: {}",e);
sender.send(CentralEvent::ManagerStateChanged { new_state: ManagerState::Unsupported }).unwrap();
continue;
}
};
let adapter = match bluetooth_adapter.get() {
Ok(e) => e,
Err(e) => {
error!("BluetoothAdapter::get: {}",e);
sender.send(CentralEvent::ManagerStateChanged { new_state: ManagerState::Unsupported }).unwrap();
continue;
}
};
let radio_state = match adapter.GetRadioAsync() {
Ok(e) => match e.get() {
Ok(e) => match e.State() {
Ok(e) => e,
Err(e) => {
error!("BluetoothAdapter::State: {}",e);
sender.send(CentralEvent::ManagerStateChanged { new_state: ManagerState::Unsupported }).unwrap();
continue;
}
},
Err(e) => {
error!("BluetoothAdapter::get: {}",e);
sender.send(CentralEvent::ManagerStateChanged { new_state: ManagerState::Unsupported }).unwrap();
continue;
}
},
Err(e) => {
error!("BluetoothAdapter::GetRadioAsync: {}",e);
sender.send(CentralEvent::ManagerStateChanged { new_state: ManagerState::Unsupported }).unwrap();
continue;
}
};
let manager_state = match radio_state {
RadioState::On => ManagerState::PoweredOn,
RadioState::Off => ManagerState::PoweredOff,
RadioState::Disabled => ManagerState::Unauthorized,
RadioState::Unknown => ManagerState::Unknown,
_ => ManagerState::Unknown
};
if state == manager_state {
sleep(Duration::from_secs(2));
continue;
}
state = manager_state;
sender.send(CentralEvent::ManagerStateChanged { new_state: manager_state }).unwrap();
}
});
(
Self {
sender: Arc::new(tx.clone()),
pending_disconnect_devices: Arc::new(RwLock::new(Vec::new())),
scan_running: Arc::new(AtomicBool::new(false)),
connected_running: Arc::new(AtomicBool::new(false)),
},
receiver
)
}
/// Clears both run flags so the threads started by `scan` and `connected`
/// leave their loops the next time they check the flag.
pub fn stop_threads(&self) {
    for flag in [&self.scan_running, &self.connected_running] {
        flag.store(false, Ordering::SeqCst);
    }
}
pub fn scan(&self) {
let tx = self.sender.clone();
if self.scan_running.load(Ordering::SeqCst) {
return;
}
self.scan_running.store(true, Ordering::SeqCst);
let scan_running = self.scan_running.clone(); thread::spawn(move || {
let received_handler = TypedEventHandler::new(
move |_watcher, event_args: Ref<'_, BluetoothLEAdvertisementReceivedEventArgs>| {
let name = event_args.clone().unwrap().Advertisement().unwrap().LocalName().ok().and_then(|x| (!x.is_empty()).then(|| x.to_string_lossy())).unwrap_or("unknown".to_string());
let addr = match event_args.clone().unwrap().BluetoothAddress().ok() {
None => return Ok(()),
Some(e) => e
};
let rssi = match event_args.clone().unwrap().RawSignalStrengthInDBm().ok() {
None => return Ok(()),
Some(e) => e
};
let addr_type = match event_args.clone().unwrap().BluetoothAddressType().ok() {
None => return Ok(()),
Some(e) => e
};
let is_connectable = event_args.clone().unwrap().IsConnectable().unwrap_or(false);
let advertisement_data = AdvertisementData {
name,
addr,
addr_type: AddrType::from(addr_type),
is_connectable,
};
let device = match BluetoothLEDevice::FromBluetoothAddressAsync(addr).unwrap().get() {
Ok(e) => e,
Err(_) => return Ok(()),
};
match tx.send(CentralEvent::PeripheralDiscovered { peripheral: device.clone(), advertisement_data, rssi}) {
Ok(()) => {}
Err(e) => {
error!("{}", e);
}
};
Ok(())
},
);
let stopped_handler = TypedEventHandler::new(
move |watcher: Ref<'_, BluetoothLEAdvertisementWatcher>, _event_args| {
error!("扫描关闭: {:?}",watcher.clone());
Ok(())
},
);
let build_watcher = || -> Result<BluetoothLEAdvertisementWatcher, Error> {
let watcher = match BluetoothLEAdvertisementWatcher::new() {
Ok(e) => e,
Err(e) => return Err(e)
};
match watcher.SetScanningMode(BluetoothLEScanningMode::Active) {
Ok(()) => {}
Err(e) => return Err(e)
};
watcher.SetAllowExtendedAdvertisements(true).unwrap();
match watcher.Received(&received_handler) {
Ok(e) => e,
Err(e) => return Err(e)
};
watcher.Stopped(&stopped_handler)?;
Ok(watcher)
};
let build_watcher = match build_watcher() {
Ok(e) => e,
Err(_) => {
info!("错误");
return;
}
};
match build_watcher.Start() {
Ok(()) => {
info!("扫描启动成功");
}
Err(_e) => {
error!("蓝牙未开启");
}
}
loop {
if !scan_running.load(Ordering::SeqCst) {
info!("扫描线程退出");
break;
}
sleep(Duration::from_secs(1));
}
scan_running.store(false, Ordering::SeqCst);
});
}
pub fn connected(&self){
let tx = self.sender.clone();
if self.connected_running.load(Ordering::SeqCst) {
return;
}
self.connected_running.store(true, Ordering::SeqCst);
let connected_running = self.connected_running.clone();
thread::spawn(move || {
loop {
if !connected_running.load(Ordering::SeqCst) {
info!("连接监听已连接蓝牙设备");
break;
}
match connected_device() {
Ok(connected_device) => {
info!("本机已连接蓝牙设备: {:#}", JsonValue::from(connected_device.clone()));
tx.send(CentralEvent::ConnectedPeripheral { connected_device }).unwrap();
}
Err(_) => {
info!("获取失败");
}
};
sleep(Duration::from_secs(3));
}
});
}
pub fn connect(&self, device: BluetoothLEDevice) {
match device.ConnectionStatus() {
Ok(e) => {
match e {
BluetoothConnectionStatus(0i32) => {
self.sender.send(CentralEvent::PeripheralUnConnected { peripheral: device.clone() }).unwrap();
}
_ => {
self.sender.send(CentralEvent::PeripheralConnected { peripheral: device.clone() }).unwrap();
}
}
}
Err(e) => {
error!("链接失败: {}",e);
}
}
}
pub fn discover_services(&self, device: BluetoothLEDevice) {
let gatt_services_result = match match device.GetGattServicesWithCacheModeAsync(BluetoothCacheMode::Uncached) {
Ok(e) => e,
Err(_) => return,
}.get() {
Ok(e) => e,
Err(_) => return,
};
let services = match gatt_services_result.Services() {
Ok(e) => e,
Err(_) => return
};
let mut list = vec![];
for service in &services {
list.push(service);
}
self.sender.send(CentralEvent::ServicesDiscovered { peripheral: device.clone(), services: list }).unwrap();
}
pub fn discover_characteristics(&self, peripheral: BluetoothLEDevice, service: GattDeviceService) {
let characteristics_result = match service.GetCharacteristicsAsync() {
Ok(e) => {
match e.get() {
Ok(e) => {
e
}
Err(_) => {
info!("错误");
return;
}
}
}
Err(_) => {
info!("错误");
return;
}
};
let characteristics = characteristics_result.Characteristics().unwrap();
let mut list = vec![];
for characteristic in &characteristics {
list.push(characteristic);
}
self.sender.send(CentralEvent::CharacteristicsDiscovered { peripheral: peripheral.clone(), service, characteristics: list }).unwrap();
}
/// Sends [`CentralEvent::SubscriptionChangeResult`] carrying the given
/// peripheral and characteristic. The method itself performs no device
/// operation; it only emits the event.
///
/// # Panics
///
/// Panics if the event receiver has been dropped.
pub fn subscribe(&self, peripheral: BluetoothLEDevice, characteristic: Characteristic) {
    let event = CentralEvent::SubscriptionChangeResult { peripheral, characteristic };
    self.sender.send(event).unwrap();
}
pub fn get_value(&self, peripheral: BluetoothLEDevice, characteristic: Characteristic) {
let tx = self.sender.clone();
let pending_disconnect_devices = self.pending_disconnect_devices.clone();
thread::spawn(move || {
let gatt_characteristic = characteristic.gatt_characteristic.clone();
let mut event_stream = match subscribe(&gatt_characteristic) {
Ok(v) => {
info!("订阅成功! {}", characteristic.uuid);
v
}
Err(e) => {
info!("订阅失败 {} - {:?}", e, gatt_characteristic.Uuid().unwrap());
return;
}
};
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
loop {
match timeout(Duration::from_secs(1), event_stream.next()).await {
Ok(Some(value)) => {
tx.send(CentralEvent::CharacteristicValue {
peripheral: peripheral.clone(),
characteristic: characteristic.clone(),
value
}).unwrap();
},
Ok(None) => {
break;
},
Err(_) => {
let r = pending_disconnect_devices.read().await;
if !r.contains(&peripheral) {
drop(r);
continue;
}
drop(r);
log::info!("Found pending disconnect device, {:?}, {:?}, prepare disconnect", peripheral.Name(), peripheral.BluetoothAddress());
let mut w = pending_disconnect_devices.write().await;
let idx_res = w.iter().position(|item|item == &peripheral);
let need_break = idx_res.is_some();
if let Some(idx) = idx_res {
log::info!("Remove pending disconnect device: {:?}, {:?}",
peripheral.Name().ok(),
peripheral.BluetoothAddress().ok()
);
w.remove(idx);
}
drop(w);
if need_break {
log::debug!("Break get_value() loop");
break;
} else {
continue;
}
}
};
}
});
});
}
pub fn disconnect(&self, peripheral: BluetoothLEDevice) {
info!("正在断开与设备的连接: 名称: {:?}, 地址: {:?}",peripheral.Name().ok(),peripheral.BluetoothAddress().ok());
let mut w = self.pending_disconnect_devices.blocking_write();
if !w.contains(&peripheral){
info!("Add pending disconnect peripheral: {:?}, {:?}", peripheral.Name(), peripheral.BluetoothAddress());
w.push(peripheral.clone());
}
drop(w);
let gatt_services_result = peripheral.GetGattServicesAsync().ok().unwrap().get().unwrap();
let gatt_services = gatt_services_result.Services().ok().unwrap();
let count = gatt_services.Size().ok().unwrap();
for i in 0..count {
if let Ok(service) = gatt_services.GetAt(i) {
let characteristic_res = service.GetCharacteristicsAsync().ok().unwrap().get().unwrap();
let characteristics = characteristic_res.Characteristics().unwrap();
let characteristics_num = characteristics.Size().ok().unwrap();
for j in 0..characteristics_num {
let single_chara = characteristics.GetAt(j).ok().unwrap();
match single_chara.WriteClientCharacteristicConfigurationDescriptorWithResultAsync(
GattClientCharacteristicConfigurationDescriptorValue::None
) {
Ok(_) => {
}
Err(_e) => {
}
}
}
let g_close_res = service.Close(); match g_close_res {
Ok(_) => {
}
Err(_e) => {
}
}
}
}
let p_close_res = peripheral.Close();
match p_close_res {
Ok(_) => {
}
Err(_e) => {
}
}
drop(peripheral.clone());
self.sender.send(CentralEvent::PeripheralDisconnected { peripheral }).unwrap();
}
}
/// State of the Bluetooth manager as reported through
/// `CentralEvent::ManagerStateChanged`.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
#[non_exhaustive]
pub enum ManagerState {
    Unknown = 0,
    Resetting = 1,
    Unsupported = 2,
    Unauthorized = 3,
    PoweredOff = 4,
    PoweredOn = 5,
    /// Takes the discriminant following `PoweredOn`; it is not produced by
    /// `from_u8`.
    None = 6,
}

impl ManagerState {
    /// Converts a raw value in `0..=5` into the matching state.
    ///
    /// Returns `Option::None` for every other value, including `6`.
    fn from_u8(v: u8) -> Option<Self> {
        // Ordered so that the index equals the discriminant.
        let by_discriminant = [
            Self::Unknown,
            Self::Resetting,
            Self::Unsupported,
            Self::Unauthorized,
            Self::PoweredOff,
            Self::PoweredOn,
        ];
        by_discriminant.get(usize::from(v)).copied()
    }
}
/// Data extracted from one received advertisement; built in
/// `CentralManager::scan`.
#[derive(Debug, Clone)]
pub struct AdvertisementData {
    /// Advertised local name, or `"unknown"` when it is missing or empty.
    pub(crate) name: String,
    /// Bluetooth address as the raw `u64` reported with the advertisement.
    addr: u64,
    /// Address type converted with `AddrType::from`.
    addr_type: AddrType,
    /// Connectable flag of the advertisement; `false` when it could not be read.
    pub(crate) is_connectable: bool,
}
/// Bluetooth address type, converted from the WinRT `BluetoothAddressType`
/// by `AddrType::from`.
#[derive(Debug, Clone)]
pub enum AddrType {
    /// Raw value `0`.
    Public,
    /// Raw value `1`.
    Random,
    /// Raw value `2`.
    Unspecified,
    /// Any other raw value.
    None,
}
impl AddrType {
pub fn from(addr_type: BluetoothAddressType) -> Self {
match addr_type {
BluetoothAddressType(0i32) => Self::Public,
BluetoothAddressType(1i32) => Self::Random,
BluetoothAddressType(2i32) => Self::Unspecified,
_ => Self::None
}
}
}