use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::config::{BleConfig, DiscoveryConfig};
use crate::error::{BleError, Result};
use crate::platform::{
BleAdapter, ConnectionCallback, ConnectionEvent, DisconnectReason, DiscoveredDevice,
DiscoveryCallback,
};
use crate::transport::BleConnection;
use crate::NodeId;
use super::central::CentralManager;
use super::connection::CoreBluetoothConnection;
use super::peripheral::PeripheralManager;
/// Mutable bookkeeping shared by the adapter's connection-related methods.
///
/// `Default` is derived: every field is a `HashMap`, whose default is the
/// empty map, which is exactly what the previous hand-written impl produced.
#[derive(Default)]
struct AdapterState {
    /// Connections created by `connect`, keyed by the peer's node id.
    connections: HashMap<NodeId, CoreBluetoothConnection>,
    /// Platform peripheral identifier -> node id.
    identifier_to_node: HashMap<String, NodeId>,
    /// Node id -> platform peripheral identifier.
    node_to_identifier: HashMap<NodeId, String>,
}
/// `BleAdapter` implementation that drives a `CentralManager` and a
/// `PeripheralManager` and tracks per-peer connection state.
pub struct CoreBluetoothAdapter {
/// Central-role manager: scanning, connecting and GATT client operations.
central: Arc<CentralManager>,
/// Peripheral-role manager: advertising and GATT service registration.
peripheral: Arc<PeripheralManager>,
/// Configuration stored by `init`; `None` until `init` has been called.
config: RwLock<Option<BleConfig>>,
/// Active connections and the node-id <-> identifier lookup tables.
state: RwLock<AdapterState>,
/// Callback invoked from `poll` for each peripheral reported by the central manager.
discovery_callback: RwLock<Option<DiscoveryCallback>>,
/// Callback invoked from `connect` / `disconnect` with connection events.
connection_callback: RwLock<Option<ConnectionCallback>>,
}
impl CoreBluetoothAdapter {
/// Creates an adapter with fresh central and peripheral managers.
///
/// No configuration or callbacks are installed yet and the connection
/// state starts out empty.
///
/// # Errors
///
/// Propagates any error returned by `CentralManager::new` or
/// `PeripheralManager::new` (attempted in that order).
pub fn new() -> Result<Self> {
    let adapter = Self {
        central: Arc::new(CentralManager::new()?),
        peripheral: Arc::new(PeripheralManager::new()?),
        config: RwLock::new(None),
        state: RwLock::new(AdapterState::default()),
        discovery_callback: RwLock::new(None),
        connection_callback: RwLock::new(None),
    };
    log::info!("CoreBluetoothAdapter created");
    Ok(adapter)
}
/// Records that `node_id` is reachable through the platform `identifier`.
///
/// Both lookup tables are updated under a single write lock. If either key
/// was previously paired with a different partner, the stale reverse entry
/// is removed so the two maps always describe the same set of pairs.
async fn register_node_identifier(&self, node_id: NodeId, identifier: String) {
    let mut state = self.state.write().await;

    // identifier -> node: if this identifier used to belong to another
    // node, that node must no longer resolve to it.
    let displaced_node = state
        .identifier_to_node
        .insert(identifier.clone(), node_id.clone());
    if let Some(stale_node) = displaced_node.filter(|node| *node != node_id) {
        state.node_to_identifier.remove(&stale_node);
    }

    // node -> identifier: if this node used to be known under another
    // identifier, drop the old identifier's forward entry.
    let displaced_identifier = state
        .node_to_identifier
        .insert(node_id, identifier.clone());
    if let Some(stale_identifier) = displaced_identifier.filter(|id| *id != identifier) {
        state.identifier_to_node.remove(&stale_identifier);
    }
}
/// Looks up the platform identifier registered for `node_id`.
///
/// Returns an owned copy, or `None` when the node has not been registered.
async fn get_node_identifier(&self, node_id: &NodeId) -> Option<String> {
    self.state
        .read()
        .await
        .node_to_identifier
        .get(node_id)
        .cloned()
}
/// Looks up the node id registered for the platform `identifier`.
///
/// Returns an owned copy, or `None` when the identifier is unknown.
// No caller is visible in this file, hence the lint exemption.
#[allow(dead_code)]
async fn get_identifier_node(&self, identifier: &str) -> Option<NodeId> {
    self.state
        .read()
        .await
        .identifier_to_node
        .get(identifier)
        .cloned()
}
/// Starts a scan using the stored discovery configuration and `None` as the
/// second argument to `CentralManager::start_scan`.
///
/// # Errors
///
/// Returns `BleError::InvalidState` when no configuration has been stored
/// yet (i.e. `init` has not run); otherwise propagates the scan result.
pub async fn start_scan_unfiltered(&self) -> Result<()> {
    let guard = self.config.read().await;
    match guard.as_ref() {
        Some(cfg) => self.central.start_scan(&cfg.discovery, None).await,
        None => Err(BleError::InvalidState(
            "Adapter not initialized".to_string(),
        )),
    }
}
/// Asks the central manager to connect to the peripheral with the given
/// platform `identifier`.
///
/// This only forwards to `CentralManager::connect`; it does not record a
/// connection in the adapter state or invoke the connection callback.
pub async fn connect_by_identifier(&self, identifier: &str) -> Result<()> {
self.central.connect(identifier).await
}
/// Returns whatever `CentralManager::get_peripheral` reports for
/// `identifier`, or `None` if it has no entry.
pub async fn get_peripheral_info(
&self,
identifier: &str,
) -> Option<super::central::PeripheralInfo> {
self.central.get_peripheral(identifier).await
}
/// Starts service discovery on the peripheral `identifier`, restricted to
/// the Peat service UUID (`crate::PEAT_SERVICE_UUID`).
///
/// # Errors
///
/// Propagates the result of `CentralManager::discover_services`.
pub async fn discover_services(&self, identifier: &str) -> Result<()> {
    // The filter list contains exactly one entry: the Peat service.
    let service_uuid = crate::PEAT_SERVICE_UUID.to_string();
    self.central
        .discover_services(identifier, Some(&[&service_uuid]))
        .await
}
/// Starts characteristic discovery for the Peat service on the peripheral
/// `identifier`.
///
/// # Errors
///
/// Propagates the result of `CentralManager::discover_characteristics`.
pub async fn discover_characteristics(&self, identifier: &str) -> Result<()> {
    self.central
        .discover_characteristics(identifier, &crate::PEAT_SERVICE_UUID.to_string())
        .await
}
/// Issues a read of `characteristic_uuid` within the Peat service on the
/// peripheral `identifier`.
///
/// The return value carries no data; only success or failure of
/// `CentralManager::read_characteristic` is reported.
pub async fn read_characteristic(
    &self,
    identifier: &str,
    characteristic_uuid: &str,
) -> Result<()> {
    self.central
        .read_characteristic(
            identifier,
            &crate::PEAT_SERVICE_UUID.to_string(),
            characteristic_uuid,
        )
        .await
}
/// Writes `data` to `characteristic_uuid` within the Peat service on the
/// peripheral `identifier`.
///
/// `with_response` is forwarded unchanged to
/// `CentralManager::write_characteristic`.
///
/// # Errors
///
/// Propagates the result of the central manager's write.
pub async fn write_characteristic(
    &self,
    identifier: &str,
    characteristic_uuid: &str,
    data: &[u8],
    with_response: bool,
) -> Result<()> {
    self.central
        .write_characteristic(
            identifier,
            &crate::PEAT_SERVICE_UUID.to_string(),
            characteristic_uuid,
            data,
            with_response,
        )
        .await
}
/// Forwards to `CentralManager::try_recv_peripheral_event` and returns its
/// result unchanged.
///
/// NOTE(review): the name suggests `None` means "no event pending" — confirm
/// against the central manager.
pub async fn try_recv_peripheral_event(&self) -> Option<super::delegates::PeripheralEvent> {
self.central.try_recv_peripheral_event().await
}
/// Processes pending central/peripheral events and publishes discoveries.
///
/// For every peripheral returned by `CentralManager::get_peat_peripherals`:
///
/// 1. If it carries a node id, the node-id <-> identifier mapping is
///    registered. This happens whether or not a discovery callback is
///    installed, because `connect` and `write_to_peer` rely on the mapping
///    to resolve a peer.
/// 2. If a discovery callback is installed, it is invoked with a
///    `DiscoveredDevice` built from the peripheral (`adv_data` is left empty).
///
/// # Errors
///
/// Propagates errors from either manager's `process_events`.
pub async fn poll(&self) -> Result<()> {
    self.central.process_events().await?;
    self.peripheral.process_events().await?;

    let peat_peripherals = self.central.get_peat_peripherals().await;
    // Taken once for the whole batch; a different lock from `state`, so
    // holding it across the registration below cannot self-deadlock.
    let callback_guard = self.discovery_callback.read().await;

    for peripheral in peat_peripherals {
        if let Some(node_id) = &peripheral.node_id {
            self.register_node_identifier(node_id.clone(), peripheral.identifier.clone())
                .await;
        }

        if let Some(callback) = callback_guard.as_ref() {
            callback(DiscoveredDevice {
                address: peripheral.identifier.clone(),
                name: peripheral.name.clone(),
                rssi: peripheral.rssi,
                is_peat_node: peripheral.is_peat_node,
                node_id: peripheral.node_id.clone(),
                adv_data: Vec::new(),
            });
        }
    }

    Ok(())
}
}
#[async_trait]
impl BleAdapter for CoreBluetoothAdapter {
/// Stores `config` and waits for the central manager to report `PoweredOn`.
///
/// Each iteration processes pending events on both managers and inspects
/// the central state:
///
/// * `PoweredOn` ends the wait.
/// * `Unsupported`, `Unauthorized` and `PoweredOff` fail immediately.
/// * Any other state is retried after a 100 ms sleep; the 51st such
///   observation yields a timeout error (i.e. after 50 sleeps).
///
/// # Errors
///
/// `BleError::NotSupported` / `BleError::PlatformError` as described above,
/// plus any error from `process_events`.
async fn init(&mut self, config: &BleConfig) -> Result<()> {
    *self.config.write().await = Some(config.clone());

    log::info!("Waiting for Bluetooth to be ready...");

    let mut attempts = 0;
    loop {
        self.central.process_events().await?;
        self.peripheral.process_events().await?;

        let central_state = self.central.state().await;
        let peripheral_state = self.peripheral.state().await;
        log::debug!(
            "BLE states: central={:?}, peripheral={:?}",
            central_state,
            peripheral_state
        );

        // Terminal states leave the loop or the function here; every other
        // state falls through to the retry bookkeeping below.
        match central_state {
            super::delegates::CentralState::PoweredOn => break,
            super::delegates::CentralState::Unsupported => {
                return Err(BleError::NotSupported(
                    "Bluetooth not supported".to_string(),
                ));
            }
            super::delegates::CentralState::Unauthorized => {
                return Err(BleError::PlatformError(
                    "Bluetooth not authorized - check System Preferences".to_string(),
                ));
            }
            super::delegates::CentralState::PoweredOff => {
                return Err(BleError::PlatformError(
                    "Bluetooth is powered off - please enable it".to_string(),
                ));
            }
            _ => {}
        }

        attempts += 1;
        if attempts > 50 {
            return Err(BleError::PlatformError(
                "Bluetooth failed to initialize (timeout)".to_string(),
            ));
        }
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
    }

    // Re-read both states so the summary reflects the values after the wait.
    let central_state = self.central.state().await;
    let peripheral_state = self.peripheral.state().await;
    log::info!(
        "CoreBluetoothAdapter initialized for node {:08X} (central: {:?}, peripheral: {:?})",
        config.node_id.as_u32(),
        central_state,
        peripheral_state
    );

    Ok(())
}
/// Brings the adapter up: registers the Peat GATT service, starts
/// advertising and starts scanning, in that order.
///
/// Each of the three steps is best-effort: a failure is logged as a
/// warning and the remaining steps still run.
///
/// # Errors
///
/// Only fails with `BleError::InvalidState` when no configuration has been
/// stored (i.e. `init` has not run).
async fn start(&self) -> Result<()> {
    let guard = self.config.read().await;
    let cfg = match guard.as_ref() {
        Some(cfg) => cfg,
        None => {
            return Err(BleError::InvalidState(
                "Adapter not initialized".to_string(),
            ))
        }
    };

    self.peripheral
        .register_peat_service(cfg.node_id.clone())
        .await
        .unwrap_or_else(|err| log::warn!("Failed to register Peat service: {}", err));

    self.peripheral
        .start_advertising(cfg.node_id.clone(), &cfg.discovery)
        .await
        .unwrap_or_else(|err| log::warn!("Failed to start advertising: {}", err));

    self.central
        .start_scan(&cfg.discovery, None)
        .await
        .unwrap_or_else(|err| log::warn!("Failed to start scanning: {}", err));

    log::info!("CoreBluetoothAdapter started");
    Ok(())
}
/// Shuts the adapter down: stops scanning, stops advertising and
/// unregisters all GATT services.
///
/// All three teardown steps are always attempted, so a failure in an early
/// step cannot leave advertising or services running. Every failure is
/// logged as a warning.
///
/// # Errors
///
/// Returns the first error encountered, in the order scan -> advertising ->
/// services.
async fn stop(&self) -> Result<()> {
    let scan = self.central.stop_scan().await;
    if let Err(e) = &scan {
        log::warn!("Failed to stop scanning: {}", e);
    }

    let advertising = self.peripheral.stop_advertising().await;
    if let Err(e) = &advertising {
        log::warn!("Failed to stop advertising: {}", e);
    }

    let services = self.peripheral.unregister_all_services().await;
    if let Err(e) = &services {
        log::warn!("Failed to unregister services: {}", e);
    }

    // `and` keeps the left-most error, preserving the original precedence.
    scan.and(advertising).and(services)?;

    log::info!("CoreBluetoothAdapter stopped");
    Ok(())
}
/// Reports whether the adapter is powered.
///
/// NOTE(review): this always returns `true`, independent of the central
/// state that `init` inspects — confirm whether that is intended.
fn is_powered(&self) -> bool {
    true
}
/// Returns the local adapter address; this implementation always reports `None`.
///
/// NOTE(review): presumably because the platform does not expose a local
/// address — confirm.
fn address(&self) -> Option<String> {
None
}
/// Starts scanning with the caller-supplied discovery `config`, passing
/// `None` as the second argument to `CentralManager::start_scan`.
async fn start_scan(&self, config: &DiscoveryConfig) -> Result<()> {
self.central.start_scan(config, None).await
}
/// Stops scanning by forwarding to `CentralManager::stop_scan`.
async fn stop_scan(&self) -> Result<()> {
self.central.stop_scan().await
}
/// Starts advertising with the caller-supplied discovery `config`, using
/// the node id from the configuration stored by `init`.
///
/// # Errors
///
/// Returns `BleError::InvalidState` when no configuration has been stored;
/// otherwise propagates the result of `PeripheralManager::start_advertising`.
async fn start_advertising(&self, config: &DiscoveryConfig) -> Result<()> {
    let guard = self.config.read().await;
    match guard.as_ref() {
        Some(ble_config) => {
            self.peripheral
                .start_advertising(ble_config.node_id.clone(), config)
                .await
        }
        None => Err(BleError::InvalidState(
            "Adapter not initialized".to_string(),
        )),
    }
}
/// Stops advertising by forwarding to `PeripheralManager::stop_advertising`.
async fn stop_advertising(&self) -> Result<()> {
self.peripheral.stop_advertising().await
}
/// Installs (`Some`) or clears (`None`) the callback that `poll` invokes
/// for each discovered peripheral.
///
/// `&mut self` already guarantees exclusive access, so the slot is written
/// through `RwLock::get_mut`. Unlike `try_write`, this cannot fail, so the
/// callback is never silently discarded.
fn set_discovery_callback(&mut self, callback: Option<DiscoveryCallback>) {
    *self.discovery_callback.get_mut() = callback;
}
/// Connects to the peer `peer_id` and returns a handle to the connection.
///
/// Steps, in order: resolve the peer's platform identifier, ask the
/// central manager to connect, store a `CoreBluetoothConnection` in the
/// adapter state, and notify the connection callback (if any) with
/// `ConnectionEvent::Connected`.
///
/// # Errors
///
/// `BleError::ConnectionFailed` when no identifier is registered for
/// `peer_id`; otherwise any error from `CentralManager::connect`.
async fn connect(&self, peer_id: &NodeId) -> Result<Box<dyn BleConnection>> {
    let identifier = match self.get_node_identifier(peer_id).await {
        Some(identifier) => identifier,
        None => {
            return Err(BleError::ConnectionFailed(format!(
                "Unknown node ID: {}",
                peer_id
            )))
        }
    };

    self.central.connect(&identifier).await?;

    let connection = CoreBluetoothConnection::new(peer_id.clone(), identifier.clone());

    // The write guard is a statement temporary, so it is released before
    // the callback lock is taken.
    self.state
        .write()
        .await
        .connections
        .insert(peer_id.clone(), connection.clone());

    if let Some(notify) = self.connection_callback.read().await.as_ref() {
        let event = ConnectionEvent::Connected {
            mtu: connection.mtu(),
            phy: connection.phy(),
        };
        notify(peer_id.clone(), event);
    }

    log::info!("Connected to peer {} ({})", peer_id, identifier);
    Ok(Box::new(connection))
}
/// Disconnects from the peer `peer_id`.
///
/// The stored connection (if any) is removed from the adapter state first.
/// If an identifier is registered for the peer, the central manager is
/// asked to disconnect. The connection callback and the info log only fire
/// when a stored connection was actually removed.
///
/// # Errors
///
/// Propagates any error from `CentralManager::disconnect`.
///
/// NOTE(review): on such an error the connection has already been removed
/// from the state and no `Disconnected` event is emitted — confirm this is
/// the desired behaviour.
async fn disconnect(&self, peer_id: &NodeId) -> Result<()> {
    let (removed, identifier) = {
        let mut guard = self.state.write().await;
        let removed = guard.connections.remove(peer_id);
        let identifier = guard.node_to_identifier.get(peer_id).cloned();
        (removed, identifier)
    };

    if let Some(identifier) = identifier {
        self.central.disconnect(&identifier).await?;
    }

    // Nothing was tracked for this peer: no event, no log line.
    if removed.is_none() {
        return Ok(());
    }

    if let Some(notify) = self.connection_callback.read().await.as_ref() {
        notify(
            peer_id.clone(),
            ConnectionEvent::Disconnected {
                reason: DisconnectReason::LocalRequest,
            },
        );
    }
    log::info!("Disconnected from peer {}", peer_id);

    Ok(())
}
/// Returns a boxed clone of the stored connection for `peer_id`.
///
/// Yields `None` when no connection is stored for the peer, and also when
/// the state lock cannot be acquired without waiting (`try_read` fails).
fn get_connection(&self, peer_id: &NodeId) -> Option<Box<dyn BleConnection>> {
    let state = self.state.try_read().ok()?;
    state
        .connections
        .get(peer_id)
        .map(|conn| Box::new(conn.clone()) as Box<dyn BleConnection>)
}
/// Returns the number of stored connections.
///
/// Reports `0` when the state lock cannot be acquired without waiting.
fn peer_count(&self) -> usize {
    self.state
        .try_read()
        .map(|state| state.connections.len())
        .unwrap_or(0)
}
/// Returns the node ids of all stored connections (in map iteration order).
///
/// Reports an empty list when the state lock cannot be acquired without
/// waiting.
fn connected_peers(&self) -> Vec<NodeId> {
    self.state
        .try_read()
        .map(|state| state.connections.keys().cloned().collect::<Vec<_>>())
        .unwrap_or_default()
}
/// Installs (`Some`) or clears (`None`) the callback that `connect` and
/// `disconnect` invoke with connection events.
///
/// `&mut self` already guarantees exclusive access, so the slot is written
/// through `RwLock::get_mut`. Unlike `try_write`, this cannot fail, so the
/// callback is never silently discarded.
fn set_connection_callback(&mut self, callback: Option<ConnectionCallback>) {
    *self.connection_callback.get_mut() = callback;
}
/// Registers the Peat GATT service for the node id in the stored
/// configuration.
///
/// # Errors
///
/// Returns `BleError::InvalidState` when no configuration has been stored;
/// otherwise propagates the result of
/// `PeripheralManager::register_peat_service`.
async fn register_gatt_service(&self) -> Result<()> {
    let guard = self.config.read().await;
    match guard.as_ref() {
        Some(cfg) => {
            self.peripheral
                .register_peat_service(cfg.node_id.clone())
                .await
        }
        None => Err(BleError::InvalidState(
            "Adapter not initialized".to_string(),
        )),
    }
}
/// Unregisters GATT services by forwarding to
/// `PeripheralManager::unregister_all_services`.
async fn unregister_gatt_service(&self) -> Result<()> {
self.peripheral.unregister_all_services().await
}
/// Writes `data` to the characteristic `char_uuid` of the Peat service on
/// the peer `peer_id`, always requesting a write with response.
///
/// # Errors
///
/// `BleError::ConnectionFailed` when no identifier is registered for
/// `peer_id`; otherwise the result of the underlying characteristic write.
async fn write_to_peer(
    &self,
    peer_id: &NodeId,
    char_uuid: uuid::Uuid,
    data: &[u8],
) -> Result<()> {
    let identifier = match self.get_node_identifier(peer_id).await {
        Some(identifier) => identifier,
        None => {
            return Err(BleError::ConnectionFailed(format!(
                "Unknown node ID: {}",
                peer_id
            )))
        }
    };

    // The inherent helper supplies the Peat service UUID and forwards to
    // the central manager; `true` selects write-with-response.
    let characteristic = char_uuid.to_string();
    self.write_characteristic(&identifier, &characteristic, data, true)
        .await
}
/// Reports whether Coded PHY is supported; this adapter always answers `false`.
fn supports_coded_phy(&self) -> bool {
false
}
/// Reports whether extended advertising is supported; this adapter always
/// answers `false`.
fn supports_extended_advertising(&self) -> bool {
false
}
/// Reports the maximum MTU as a fixed value of 512.
///
/// NOTE(review): the origin of 512 is not shown here — confirm against the
/// platform limits.
fn max_mtu(&self) -> u16 {
512
}
/// Reports the maximum number of simultaneous connections as a fixed value of 8.
///
/// NOTE(review): nothing in this file enforces the limit in `connect` —
/// confirm whether callers are expected to.
fn max_connections(&self) -> u8 {
8
}
}
#[cfg(test)]
mod tests {
    use super::AdapterState;

    /// A default `AdapterState` must start with all three maps empty,
    /// including the node -> identifier table.
    #[test]
    fn test_adapter_state_default() {
        let state = AdapterState::default();
        assert!(state.connections.is_empty());
        assert!(state.identifier_to_node.is_empty());
        assert!(state.node_to_identifier.is_empty());
    }
}