use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use windows::Devices::Bluetooth::BluetoothLEDevice;
use windows::Devices::Bluetooth::GenericAttributeProfile::{
GattCharacteristic, GattCommunicationStatus, GattDeviceService, GattWriteOption,
};
use windows::Storage::Streams::{DataReader, DataWriter, InMemoryRandomAccessStream};
use crate::config::BlePhy;
use crate::error::{BleError, Result};
use crate::transport::BleConnection;
use crate::NodeId;
fn win_err(msg: &str) -> impl Fn(windows::core::Error) -> BleError + '_ {
move |e| BleError::PlatformError(format!("{}: {}", msg, e))
}
/// Handle to a BLE peer reached through the WinRT Bluetooth APIs.
///
/// Every piece of mutable state sits behind an `Arc`, so cloning is cheap.
/// Note the two different sharing behaviours between clones:
///
/// * `mtu`, `connected` and `connected_at` are mutated in place through the
///   `Arc`, so all clones observe the same values.
/// * `device`, `service` and `sync_char` are updated by assigning a brand-new
///   `Arc` to the field, so such a replacement is visible only on the
///   instance that performed the assignment.
#[derive(Clone)]
pub struct WinRtConnection {
/// Identifier of the remote node; returned by `peer_id`.
node_id: NodeId,
/// Bluetooth address handed to `BluetoothLEDevice::FromBluetoothAddressAsync`.
address: u64,
/// WinRT device handle; `None` until a connect succeeds and again after `disconnect`.
device: Arc<Option<BluetoothLEDevice>>,
/// GATT service discovered during connect; `None` while disconnected.
service: Arc<Option<GattDeviceService>>,
/// Characteristic used by the read/write sync-data methods; `None` while
/// disconnected or when discovery did not find it.
sync_char: Arc<Option<GattCharacteristic>>,
/// Value reported by `mtu()`. Initialised to 23 by `new`; nothing in the
/// code visible here updates it afterwards.
mtu: Arc<AtomicU16>,
/// Flag reported by `is_alive()`; set on successful connect, cleared on disconnect.
connected: Arc<std::sync::atomic::AtomicBool>,
/// Instant recorded when a connect succeeded; source for `connected_duration()`.
connected_at: Arc<std::sync::Mutex<Option<Instant>>>,
}
impl WinRtConnection {
/// Creates a disconnected handle for the node `node_id` at Bluetooth address `address`.
///
/// No I/O is performed. The handle starts with no device, service or
/// characteristic attached, the `connected` flag cleared, no connection
/// timestamp, and an MTU of 23 (presumably the BLE default ATT MTU).
pub fn new(node_id: NodeId, address: u64) -> Self {
    let mtu = Arc::new(AtomicU16::new(23));
    let connected = Arc::new(std::sync::atomic::AtomicBool::new(false));
    let connected_at = Arc::new(std::sync::Mutex::new(None));

    Self {
        node_id,
        address,
        device: Arc::new(None),
        service: Arc::new(None),
        sync_char: Arc::new(None),
        mtu,
        connected,
        connected_at,
    }
}
pub fn connect_sync(&mut self) -> Result<()> {
let async_op = BluetoothLEDevice::FromBluetoothAddressAsync(self.address)
.map_err(|e| BleError::ConnectionFailed(format!("Failed to get device: {}", e)))?;
let device_result = async_op
.get()
.map_err(|e| BleError::ConnectionFailed(format!("Device connection failed: {}", e)))?;
let services_op = device_result
.GetGattServicesAsync()
.map_err(|e| BleError::ConnectionFailed(format!("Failed to get services: {}", e)))?;
let services_result = services_op
.get()
.map_err(|e| BleError::ConnectionFailed(format!("Service discovery failed: {}", e)))?;
let status = services_result
.Status()
.map_err(win_err("Failed to get status"))?;
if status != GattCommunicationStatus::Success {
return Err(BleError::ConnectionFailed(
"GATT service discovery failed".to_string(),
));
}
let services = services_result
.Services()
.map_err(win_err("Failed to get services"))?;
let mut peat_service: Option<GattDeviceService> = None;
let count = services.Size().map_err(win_err("Failed to get count"))?;
for i in 0..count {
let service = services
.GetAt(i)
.map_err(win_err("Failed to get service"))?;
let uuid = service.Uuid().map_err(win_err("Failed to get UUID"))?;
let uuid_str = format!("{:?}", uuid).to_lowercase();
if uuid_str.contains("f47ac10b-58cc-4372-a567-0e02b2c3d479") {
peat_service = Some(service);
break;
}
}
let service = peat_service.ok_or_else(|| {
BleError::ServiceNotFound("Peat service not found on device".to_string())
})?;
let chars_op = service.GetCharacteristicsAsync().map_err(|e| {
BleError::ConnectionFailed(format!("Failed to get characteristics: {}", e))
})?;
let chars_result = chars_op.get().map_err(|e| {
BleError::ConnectionFailed(format!("Characteristic discovery failed: {}", e))
})?;
let char_status = chars_result
.Status()
.map_err(win_err("Failed to get char status"))?;
if char_status != GattCommunicationStatus::Success {
return Err(BleError::ConnectionFailed(
"GATT characteristic discovery failed".to_string(),
));
}
let chars = chars_result
.Characteristics()
.map_err(win_err("Failed to get chars"))?;
let mut sync_char: Option<GattCharacteristic> = None;
let char_count = chars.Size().map_err(win_err("Failed to get char count"))?;
for i in 0..char_count {
let char = chars.GetAt(i).map_err(win_err("Failed to get char"))?;
let uuid = char.Uuid().map_err(win_err("Failed to get char UUID"))?;
let uuid_str = format!("{:?}", uuid).to_lowercase();
if uuid_str.contains("0e02b2c3d003") {
sync_char = Some(char);
break;
}
}
self.device = Arc::new(Some(device_result));
self.service = Arc::new(Some(service));
self.sync_char = Arc::new(sync_char);
self.connected
.store(true, std::sync::atomic::Ordering::SeqCst);
if let Ok(mut connected_at) = self.connected_at.lock() {
*connected_at = Some(Instant::now());
}
log::info!(
"Connected to node {:08X} at {:012X}",
self.node_id.as_u32(),
self.address
);
Ok(())
}
pub async fn connect(&mut self) -> Result<()> {
let mut this = self.clone();
tokio::task::spawn_blocking(move || this.connect_sync())
.await
.map_err(|e| BleError::ConnectionFailed(format!("Task join failed: {}", e)))?
}
pub fn disconnect(&mut self) {
self.connected
.store(false, std::sync::atomic::Ordering::SeqCst);
self.device = Arc::new(None);
self.service = Arc::new(None);
self.sync_char = Arc::new(None);
log::info!("Disconnected from node {:08X}", self.node_id.as_u32());
}
pub fn read_sync_data_blocking(&self) -> Result<Vec<u8>> {
let char = self
.sync_char
.as_ref()
.as_ref()
.ok_or_else(|| BleError::ConnectionLost("Not connected".to_string()))?;
let read_op = char
.ReadValueAsync()
.map_err(|e| BleError::GattError(format!("Failed to read: {}", e)))?;
let result = read_op
.get()
.map_err(|e| BleError::GattError(format!("Read failed: {}", e)))?;
if result
.Status()
.map_err(win_err("Failed to get read status"))?
!= GattCommunicationStatus::Success
{
return Err(BleError::GattError(
"Read failed with error status".to_string(),
));
}
let buffer = result
.Value()
.map_err(win_err("Failed to get read value"))?;
let reader = DataReader::FromBuffer(&buffer).map_err(win_err("Failed to create reader"))?;
let len = reader
.UnconsumedBufferLength()
.map_err(win_err("Failed to get buffer length"))? as usize;
let mut data = vec![0u8; len];
reader
.ReadBytes(&mut data)
.map_err(win_err("Failed to read bytes"))?;
Ok(data)
}
/// Async wrapper around `read_sync_data_blocking`.
///
/// Runs the blocking read on tokio's blocking thread pool using a clone of
/// this handle. A failure to join the task is reported as
/// `BleError::GattError`; otherwise the blocking call's result is returned as is.
pub async fn read_sync_data(&self) -> Result<Vec<u8>> {
    let worker = self.clone();
    let joined = tokio::task::spawn_blocking(move || worker.read_sync_data_blocking()).await;
    match joined {
        Ok(outcome) => outcome,
        Err(e) => Err(BleError::GattError(format!("Task join failed: {}", e))),
    }
}
pub fn write_sync_data_blocking(&self, data: &[u8]) -> Result<()> {
let char = self
.sync_char
.as_ref()
.as_ref()
.ok_or_else(|| BleError::ConnectionLost("Not connected".to_string()))?;
let stream = InMemoryRandomAccessStream::new()
.map_err(|e| BleError::GattError(format!("Failed to create stream: {}", e)))?;
let writer = DataWriter::CreateDataWriter(&stream)
.map_err(|e| BleError::GattError(format!("Failed to create writer: {}", e)))?;
writer
.WriteBytes(data)
.map_err(|e| BleError::GattError(format!("Failed to write bytes: {}", e)))?;
let buffer = writer
.DetachBuffer()
.map_err(|e| BleError::GattError(format!("Failed to detach buffer: {}", e)))?;
let write_op = char
.WriteValueWithOptionAsync(&buffer, GattWriteOption::WriteWithResponse)
.map_err(|e| BleError::GattError(format!("Failed to write: {}", e)))?;
let result = write_op
.get()
.map_err(|e| BleError::GattError(format!("Write failed: {}", e)))?;
if result != GattCommunicationStatus::Success {
return Err(BleError::GattError(
"Write failed with error status".to_string(),
));
}
Ok(())
}
/// Async wrapper around `write_sync_data_blocking`.
///
/// Copies `data` into an owned buffer so it can be moved to tokio's blocking
/// thread pool together with a clone of this handle. A failure to join the
/// task is reported as `BleError::GattError`; otherwise the blocking call's
/// result is returned as is.
pub async fn write_sync_data(&self, data: &[u8]) -> Result<()> {
    let worker = self.clone();
    let payload = data.to_vec();
    let joined =
        tokio::task::spawn_blocking(move || worker.write_sync_data_blocking(&payload)).await;
    match joined {
        Ok(outcome) => outcome,
        Err(e) => Err(BleError::GattError(format!("Task join failed: {}", e))),
    }
}
}
/// `BleConnection` view of a WinRT connection handle.
impl BleConnection for WinRtConnection {
    /// Identifier of the remote node this handle was created for.
    fn peer_id(&self) -> &NodeId {
        &self.node_id
    }

    /// Current value of the shared `connected` flag.
    fn is_alive(&self) -> bool {
        self.connected.load(Ordering::SeqCst)
    }

    /// Current value of the stored MTU.
    fn mtu(&self) -> u16 {
        self.mtu.load(Ordering::Relaxed)
    }

    /// Always reports the LE 1M PHY; no PHY is queried from the platform here.
    fn phy(&self) -> BlePhy {
        BlePhy::Le1M
    }

    /// Always `None`; no signal strength is read by this implementation.
    fn rssi(&self) -> Option<i8> {
        None
    }

    /// Time elapsed since the recorded connection instant.
    ///
    /// Returns `Duration::ZERO` when no instant is recorded or when the
    /// timestamp mutex is poisoned.
    fn connected_duration(&self) -> Duration {
        match self.connected_at.lock() {
            Ok(guard) => match *guard {
                Some(since) => since.elapsed(),
                None => Duration::ZERO,
            },
            Err(_) => Duration::ZERO,
        }
    }
}
/// Diagnostic formatting that omits the WinRT handles.
///
/// Prints the node id, the address as 12 upper-case hex digits, the
/// `connected` flag and the stored MTU.
impl std::fmt::Debug for WinRtConnection {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let address = format!("{:012X}", self.address);
        let connected = self.connected.load(Ordering::Relaxed);
        let mtu = self.mtu.load(Ordering::Relaxed);

        let mut out = f.debug_struct("WinRtConnection");
        out.field("node_id", &self.node_id);
        out.field("address", &address);
        out.field("connected", &connected);
        out.field("mtu", &mtu);
        out.finish()
    }
}