#![allow(clippy::let_unit_value)]
use std::future::ready;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use futures_util::{Stream, StreamExt};
use objc_foundation::{INSArray, INSFastEnumeration, NSArray};
use objc_id::ShareId;
use tokio_stream::wrappers::BroadcastStream;
use tracing::{debug, error, info, warn};
use super::delegates::{self, CentralDelegate};
use super::types::{CBCentralManager, CBManagerAuthorization, CBManagerState, CBUUID, NSUUID};
use crate::corebluetooth::types::{dispatch_get_global_queue, QOS_CLASS_UTILITY};
use crate::error::ErrorKind;
use crate::util::defer;
use crate::{AdapterEvent, AdvertisementData, AdvertisingDevice, Device, DeviceId, Error, Result, Uuid};
#[derive(Clone)]
pub struct AdapterImpl {
central: ShareId<CBCentralManager>,
delegate: ShareId<CentralDelegate>,
scanning: Arc<AtomicBool>,
}
impl PartialEq for AdapterImpl {
fn eq(&self, other: &Self) -> bool {
self.central == other.central
}
}
impl Eq for AdapterImpl {}
impl std::hash::Hash for AdapterImpl {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.central.hash(state);
}
}
impl std::fmt::Debug for AdapterImpl {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("Adapter").field(&self.central).finish()
}
}
impl AdapterImpl {
pub async fn default() -> Option<Self> {
match CBCentralManager::authorization() {
CBManagerAuthorization::ALLOWED_ALWAYS => info!("Bluetooth authorization is allowed"),
CBManagerAuthorization::DENIED => error!("Bluetooth authorization is denied"),
CBManagerAuthorization::NOT_DETERMINED => warn!("Bluetooth authorization is undetermined"),
CBManagerAuthorization::RESTRICTED => warn!("Bluetooth authorization is restricted"),
val => error!("Bluetooth authorization returned unknown value {:?}", val),
}
let (sender, _) = tokio::sync::broadcast::channel(16);
let delegate = CentralDelegate::with_sender(sender)?.share();
let central = unsafe {
let queue = dispatch_get_global_queue(QOS_CLASS_UTILITY, 0);
if queue.is_null() {
return None;
}
CBCentralManager::with_delegate(&delegate, queue).share()
};
Some(AdapterImpl {
central,
delegate,
scanning: Arc::new(AtomicBool::new(false)),
})
}
pub async fn events(&self) -> Result<impl Stream<Item = Result<AdapterEvent>> + '_> {
let receiver = self.delegate.sender().subscribe();
Ok(BroadcastStream::new(receiver).filter_map(|x| {
ready(match x {
Ok(delegates::CentralEvent::StateChanged) => {
let state = self.central.state();
debug!("Central state is now {:?}", state);
match state {
CBManagerState::POWERED_ON => Some(Ok(AdapterEvent::Available)),
_ => Some(Ok(AdapterEvent::Unavailable)),
}
}
Err(err) => Some(Err(Error::new(
ErrorKind::Internal,
Some(Box::new(err)),
"adapter event stream".to_string(),
))),
_ => None,
})
}))
}
pub async fn wait_available(&self) -> Result<()> {
let events = self.events();
if self.central.state() != CBManagerState::POWERED_ON {
events
.await?
.skip_while(|x| ready(x.is_ok() && !matches!(x, Ok(AdapterEvent::Available))))
.next()
.await
.ok_or_else(|| {
Error::new(
ErrorKind::Internal,
None,
"adapter event stream closed unexpectedly".to_string(),
)
})??;
}
Ok(())
}
pub async fn open_device(&self, id: &DeviceId) -> Result<Device> {
let identifiers = NSArray::from_vec(vec![NSUUID::from_uuid(id.0)]);
let peripherals = self.central.retrieve_peripherals_with_identifiers(&identifiers);
peripherals
.first_object()
.map(|x| Device::new(unsafe { ShareId::from_ptr(x as *const _ as *mut _) }))
.ok_or_else(|| Error::new(ErrorKind::NotFound, None, "opening device".to_string()))
}
pub async fn connected_devices(&self) -> Result<Vec<Device>> {
self.connected_devices_with_services(&[crate::btuuid::services::GENERIC_ATTRIBUTE])
.await
}
pub async fn connected_devices_with_services(&self, services: &[Uuid]) -> Result<Vec<Device>> {
assert!(!services.is_empty());
let services = {
let vec = services.iter().copied().map(CBUUID::from_uuid).collect::<Vec<_>>();
NSArray::from_vec(vec)
};
let peripherals = self.central.retrieve_connected_peripherals_with_services(&services);
Ok(peripherals
.enumerator()
.map(|x| Device::new(unsafe { ShareId::from_ptr(x as *const _ as *mut _) }))
.collect())
}
pub async fn scan<'a>(&'a self, services: &'a [Uuid]) -> Result<impl Stream<Item = AdvertisingDevice> + 'a> {
if self.central.state() != CBManagerState::POWERED_ON {
return Err(ErrorKind::AdapterUnavailable.into());
}
if self.scanning.swap(true, Ordering::Acquire) {
return Err(ErrorKind::AlreadyScanning.into());
}
let services = (!services.is_empty()).then(|| {
let vec = services.iter().copied().map(CBUUID::from_uuid).collect::<Vec<_>>();
NSArray::from_vec(vec)
});
let guard = defer(|| {
self.central.stop_scan();
self.scanning.store(false, Ordering::Release);
});
let events = BroadcastStream::new(self.delegate.sender().subscribe())
.take_while(|_| ready(self.central.state() == CBManagerState::POWERED_ON))
.filter_map(move |x| {
let _guard = &guard;
ready(match x {
Ok(delegates::CentralEvent::Discovered {
peripheral,
adv_data,
rssi,
}) => Some(AdvertisingDevice {
device: Device::new(peripheral),
adv_data: AdvertisementData::from_nsdictionary(&adv_data),
rssi: Some(rssi),
}),
_ => None,
})
});
self.central
.scan_for_peripherals_with_services(services.as_deref(), None);
Ok(events)
}
pub async fn discover_devices<'a>(
&'a self,
services: &'a [Uuid],
) -> Result<impl Stream<Item = Result<Device>> + 'a> {
use futures_util::TryFutureExt;
let connected = self.connected_devices_with_services(services).await?;
let advertising = Box::pin(async {
match self.scan(services).await {
Ok(stream) => Ok(stream.map(|x| Ok(x.device))),
Err(err) => Err(err),
}
})
.try_flatten_stream();
Ok(futures_util::stream::iter(connected).map(Ok).chain(advertising))
}
pub async fn connect_device(&self, device: &Device) -> Result<()> {
if self.central.state() != CBManagerState::POWERED_ON {
return Err(ErrorKind::AdapterUnavailable.into());
}
let mut events = BroadcastStream::new(self.delegate.sender().subscribe());
debug!("Connecting to {:?}", device);
self.central.connect_peripheral(&*device.0.peripheral, None);
while let Some(event) = events.next().await {
if self.central.state() != CBManagerState::POWERED_ON {
return Err(ErrorKind::AdapterUnavailable.into());
}
match event {
Ok(delegates::CentralEvent::Connect { peripheral }) if peripheral == device.0.peripheral => break,
Ok(delegates::CentralEvent::ConnectFailed { peripheral, error })
if peripheral == device.0.peripheral =>
{
return Err(error.map_or(ErrorKind::ConnectionFailed.into(), Error::from_nserror));
}
_ => (),
}
}
Ok(())
}
pub async fn disconnect_device(&self, device: &Device) -> Result<()> {
if self.central.state() != CBManagerState::POWERED_ON {
return Err(ErrorKind::AdapterUnavailable.into());
}
let mut events = BroadcastStream::new(self.delegate.sender().subscribe());
debug!("Disconnecting from {:?}", device);
self.central.cancel_peripheral_connection(&*device.0.peripheral);
while let Some(event) = events.next().await {
if self.central.state() != CBManagerState::POWERED_ON {
return Err(ErrorKind::AdapterUnavailable.into());
}
match event {
Ok(delegates::CentralEvent::Disconnect {
peripheral,
error: None,
}) if peripheral == device.0.peripheral => break,
Ok(delegates::CentralEvent::Disconnect {
peripheral,
error: Some(err),
}) if peripheral == device.0.peripheral => return Err(Error::from_nserror(err)),
Err(err) => return Err(Error::from_stream_recv_error(err)),
_ => (),
}
}
Ok(())
}
}