use super::internal::{
CoreBluetoothMessage, CoreBluetoothReply, CoreBluetoothReplyFuture, PeripheralEventInternal,
};
use crate::{
Error, Result,
api::{
self, BDAddr, CentralEvent, CharPropFlags, Characteristic, Descriptor,
PeripheralProperties, Service, ValueNotification, WriteType,
},
common::{adapter_manager::AdapterManager, util::notifications_stream_from_broadcast_receiver},
};
use async_trait::async_trait;
use futures::channel::mpsc::{Receiver, SendError, Sender};
use futures::sink::SinkExt;
use futures::stream::{Stream, StreamExt};
use log::*;
use objc2_core_bluetooth::CBPeripheralState;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
#[cfg(feature = "serde")]
use serde_cr as serde;
use std::sync::Weak;
use std::{
collections::{BTreeSet, HashMap},
fmt::{self, Debug, Display, Formatter},
pin::Pin,
sync::{Arc, Mutex, atomic::AtomicU16},
};
use tokio::sync::broadcast;
use tokio::task;
use uuid::Uuid;
/// Identifier of a peripheral on this backend: a newtype around a [`Uuid`].
///
/// `Serialize`/`Deserialize` are derived only when the `serde` feature is enabled;
/// the serde crate is referenced under its renamed path `serde_cr`.
#[cfg_attr(
feature = "serde",
derive(Serialize, Deserialize),
serde(crate = "serde_cr")
)]
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct PeripheralId(Uuid);
impl Display for PeripheralId {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
Display::fmt(&self.0, f)
}
}
/// Handle to a single peripheral.
///
/// The handle only holds an `Arc` to the shared state, so clones refer to the same
/// underlying peripheral data.
#[derive(Clone)]
pub struct Peripheral {
// State shared between all clones of this handle and the event task spawned in `new`.
shared: Arc<Shared>,
}
/// State shared by every clone of a `Peripheral` and by its background event task.
struct Shared {
// Sender half of the broadcast channel carrying characteristic value notifications;
// `notifications()` hands out receivers by subscribing to it.
notifications_channel: broadcast::Sender<ValueNotification>,
// Weak reference to the adapter manager used to emit `CentralEvent`s; events are
// skipped once the manager has been dropped (see `emit_event`).
manager: Weak<AdapterManager<Peripheral>>,
// UUID identifying this peripheral in outgoing messages and emitted events.
uuid: Uuid,
// Services known for this peripheral: replaced by `discover_services`, cleared when a
// `ServicesModified` event arrives.
services: Mutex<BTreeSet<Service>>,
// Properties of the peripheral, updated by the event task and by `update_name`.
properties: Mutex<PeripheralProperties>,
// Channel on which requests (`CoreBluetoothMessage`) for this peripheral are sent.
message_sender: Sender<CoreBluetoothMessage>,
// MTU value reported by `mtu()`; initialised to `DEFAULT_MTU_SIZE` in `Peripheral::new`.
mtu: AtomicU16,
}
impl Shared {
    /// Forwards `event` to the adapter manager if it is still alive.
    ///
    /// The manager is only weakly referenced. When the reference can no longer be
    /// upgraded, the event is discarded and a trace-level message is logged.
    fn emit_event(&self, event: CentralEvent) {
        let Some(adapter_manager) = self.manager.upgrade() else {
            trace!("Could not emit an event. AdapterManager has been dropped");
            return;
        };
        adapter_manager.emit(event);
    }
}
impl Peripheral {
pub(crate) fn new(
uuid: Uuid,
local_name: Option<String>,
advertisement_name: Option<String>,
manager: Weak<AdapterManager<Self>>,
event_receiver: Receiver<PeripheralEventInternal>,
message_sender: Sender<CoreBluetoothMessage>,
) -> Self {
let properties = Mutex::from(PeripheralProperties {
address: BDAddr::default(),
address_type: None,
local_name,
advertisement_name,
tx_power_level: None,
rssi: None,
manufacturer_data: HashMap::new(),
service_data: HashMap::new(),
services: Vec::new(),
class: None,
});
let (notifications_channel, _) = broadcast::channel(16);
let shared = Arc::new(Shared {
properties,
manager,
services: Mutex::new(BTreeSet::new()),
notifications_channel,
uuid,
message_sender,
mtu: AtomicU16::new(crate::api::DEFAULT_MTU_SIZE),
});
let shared_clone = shared.clone();
task::spawn(async move {
let mut event_receiver = event_receiver;
let shared = shared_clone;
loop {
match event_receiver.next().await {
Some(PeripheralEventInternal::Notification(uuid, service_uuid, data)) => {
let notification = ValueNotification {
uuid,
service_uuid,
value: data,
};
let _ = shared.notifications_channel.send(notification);
}
Some(PeripheralEventInternal::ManufacturerData(
manufacturer_id,
data,
rssi,
)) => {
let mut properties = shared.properties.lock().unwrap();
properties.rssi = Some(rssi);
properties
.manufacturer_data
.insert(manufacturer_id, data.clone());
shared.emit_event(CentralEvent::ManufacturerDataAdvertisement {
id: shared.uuid.into(),
manufacturer_data: properties.manufacturer_data.clone(),
});
}
Some(PeripheralEventInternal::ServiceData(service_data, rssi)) => {
let mut properties = shared.properties.lock().unwrap();
properties.rssi = Some(rssi);
properties.service_data.extend(service_data.clone());
shared.emit_event(CentralEvent::ServiceDataAdvertisement {
id: shared.uuid.into(),
service_data,
});
}
Some(PeripheralEventInternal::Services(services, rssi)) => {
let mut properties = shared.properties.lock().unwrap();
properties.rssi = Some(rssi);
properties.services = services.clone();
shared.emit_event(CentralEvent::ServicesAdvertisement {
id: shared.uuid.into(),
services,
});
}
Some(PeripheralEventInternal::ServicesModified) => {
shared.services.lock().unwrap().clear();
shared.emit_event(CentralEvent::DeviceServicesModified(shared.uuid.into()));
}
Some(PeripheralEventInternal::TxPowerLevel(tx_power_level)) => {
let mut properties = shared.properties.lock().unwrap();
properties.tx_power_level = Some(tx_power_level);
}
Some(PeripheralEventInternal::RssiRead(rssi)) => {
shared.emit_event(CentralEvent::RssiUpdate {
id: shared.uuid.into(),
rssi,
});
}
Some(PeripheralEventInternal::Disconnected) => (),
None => {
info!("Event receiver died, breaking out of corebluetooth device loop.");
break;
}
}
}
});
Self { shared: shared }
}
pub(super) fn update_name(
&self,
local_name: Option<String>,
advertisement_name: Option<String>,
) {
if let Ok(mut props) = self.shared.properties.lock() {
props.local_name = local_name;
props.advertisement_name = advertisement_name;
}
}
}
impl Display for Peripheral {
    /// Writes the fixed label `Peripheral`; no per-device information is included.
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        f.write_str("Peripheral")
    }
}
impl Debug for Peripheral {
    /// Debug-formats the peripheral as a struct showing its UUID, services, properties
    /// and message sender. The manager, notification channel and MTU are not included.
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        let state = &*self.shared;
        let mut builder = f.debug_struct("Peripheral");
        builder.field("uuid", &state.uuid);
        builder.field("services", &state.services);
        builder.field("properties", &state.properties);
        builder.field("message_sender", &state.message_sender);
        builder.finish()
    }
}
#[async_trait]
impl api::Peripheral for Peripheral {
fn id(&self) -> PeripheralId {
PeripheralId(self.shared.uuid)
}
fn address(&self) -> BDAddr {
BDAddr::default()
}
fn mtu(&self) -> u16 {
self.shared.mtu.load(std::sync::atomic::Ordering::Relaxed)
}
async fn properties(&self) -> Result<Option<PeripheralProperties>> {
Ok(Some(
self.shared
.properties
.lock()
.map_err(Into::<Error>::into)?
.clone(),
))
}
fn services(&self) -> BTreeSet<Service> {
self.shared.services.lock().unwrap().clone()
}
async fn is_connected(&self) -> Result<bool> {
let fut = CoreBluetoothReplyFuture::default();
self.shared
.message_sender
.to_owned()
.send(CoreBluetoothMessage::IsConnected {
peripheral_uuid: self.shared.uuid,
future: fut.get_state_clone(),
})
.await?;
match fut.await {
CoreBluetoothReply::State(state) => match state {
CBPeripheralState::Connected => Ok(true),
_ => Ok(false),
},
_ => panic!("Shouldn't get anything but a State!"),
}
}
async fn connect(&self) -> Result<()> {
let fut = CoreBluetoothReplyFuture::default();
self.shared
.message_sender
.to_owned()
.send(CoreBluetoothMessage::ConnectDevice {
peripheral_uuid: self.shared.uuid,
future: fut.get_state_clone(),
})
.await?;
match fut.await {
CoreBluetoothReply::Connected => {
self.shared
.emit_event(CentralEvent::DeviceConnected(self.shared.uuid.into()));
}
CoreBluetoothReply::Err(msg) => return Err(Error::RuntimeError(msg)),
_ => panic!("Shouldn't get anything but connected or err!"),
}
trace!("Device connected!");
Ok(())
}
async fn disconnect(&self) -> Result<()> {
let fut = CoreBluetoothReplyFuture::default();
self.shared
.message_sender
.to_owned()
.send(CoreBluetoothMessage::DisconnectDevice {
peripheral_uuid: self.shared.uuid,
future: fut.get_state_clone(),
})
.await?;
match fut.await {
CoreBluetoothReply::Ok => {
self.shared
.emit_event(CentralEvent::DeviceDisconnected(self.shared.uuid.into()));
trace!("Device disconnected!");
}
_ => error!("Shouldn't get anything but Ok!"),
}
Ok(())
}
async fn discover_services(&self) -> Result<()> {
let fut = CoreBluetoothReplyFuture::default();
self.shared
.message_sender
.to_owned()
.send(CoreBluetoothMessage::DiscoverServices {
peripheral_uuid: self.shared.uuid,
future: fut.get_state_clone(),
})
.await?;
match fut.await {
CoreBluetoothReply::ServicesDiscovered(services) => {
*(self.shared.services.lock().map_err(Into::<Error>::into)?) = services;
return Ok(());
}
CoreBluetoothReply::Err(msg) => return Err(Error::RuntimeError(msg)),
_ => panic!("Shouldn't get anything but discovered or err!"),
}
}
async fn write(
&self,
characteristic: &Characteristic,
data: &[u8],
mut write_type: WriteType,
) -> Result<()> {
let fut = CoreBluetoothReplyFuture::default();
if write_type == WriteType::WithoutResponse
&& !characteristic
.properties
.contains(CharPropFlags::WRITE_WITHOUT_RESPONSE)
{
write_type = WriteType::WithResponse
}
self.shared
.message_sender
.to_owned()
.send(CoreBluetoothMessage::WriteValue {
peripheral_uuid: self.shared.uuid,
service_uuid: characteristic.service_uuid,
characteristic_uuid: characteristic.uuid,
data: Vec::from(data),
write_type,
future: fut.get_state_clone(),
})
.await?;
match fut.await {
CoreBluetoothReply::Ok => {}
CoreBluetoothReply::Err(msg) => return Err(Error::RuntimeError(msg)),
reply => panic!("Unexpected reply: {:?}", reply),
}
Ok(())
}
async fn read(&self, characteristic: &Characteristic) -> Result<Vec<u8>> {
let fut = CoreBluetoothReplyFuture::default();
self.shared
.message_sender
.to_owned()
.send(CoreBluetoothMessage::ReadValue {
peripheral_uuid: self.shared.uuid,
service_uuid: characteristic.service_uuid,
characteristic_uuid: characteristic.uuid,
future: fut.get_state_clone(),
})
.await?;
match fut.await {
CoreBluetoothReply::ReadResult(chars) => Ok(chars),
CoreBluetoothReply::Err(msg) => return Err(Error::RuntimeError(msg)),
_ => {
panic!("Shouldn't get anything but read result!");
}
}
}
async fn subscribe(&self, characteristic: &Characteristic) -> Result<()> {
let fut = CoreBluetoothReplyFuture::default();
self.shared
.message_sender
.to_owned()
.send(CoreBluetoothMessage::Subscribe {
peripheral_uuid: self.shared.uuid,
service_uuid: characteristic.service_uuid,
characteristic_uuid: characteristic.uuid,
future: fut.get_state_clone(),
})
.await?;
match fut.await {
CoreBluetoothReply::Ok => trace!("subscribed!"),
CoreBluetoothReply::Err(msg) => return Err(Error::RuntimeError(msg)),
_ => panic!("Didn't subscribe!"),
}
Ok(())
}
async fn unsubscribe(&self, characteristic: &Characteristic) -> Result<()> {
let fut = CoreBluetoothReplyFuture::default();
self.shared
.message_sender
.to_owned()
.send(CoreBluetoothMessage::Unsubscribe {
peripheral_uuid: self.shared.uuid,
service_uuid: characteristic.service_uuid,
characteristic_uuid: characteristic.uuid,
future: fut.get_state_clone(),
})
.await?;
match fut.await {
CoreBluetoothReply::Ok => {}
CoreBluetoothReply::Err(msg) => return Err(Error::RuntimeError(msg)),
_ => panic!("Didn't unsubscribe!"),
}
Ok(())
}
async fn notifications(&self) -> Result<Pin<Box<dyn Stream<Item = ValueNotification> + Send>>> {
let receiver = self.shared.notifications_channel.subscribe();
Ok(notifications_stream_from_broadcast_receiver(receiver))
}
async fn write_descriptor(&self, descriptor: &Descriptor, data: &[u8]) -> Result<()> {
let fut = CoreBluetoothReplyFuture::default();
self.shared
.message_sender
.to_owned()
.send(CoreBluetoothMessage::WriteDescriptorValue {
peripheral_uuid: self.shared.uuid,
service_uuid: descriptor.service_uuid,
characteristic_uuid: descriptor.characteristic_uuid,
descriptor_uuid: descriptor.uuid,
data: Vec::from(data),
future: fut.get_state_clone(),
})
.await?;
match fut.await {
CoreBluetoothReply::Ok => {}
reply => panic!("Unexpected reply: {:?}", reply),
}
Ok(())
}
async fn read_rssi(&self) -> Result<i16> {
let fut = CoreBluetoothReplyFuture::default();
self.shared
.message_sender
.to_owned()
.send(CoreBluetoothMessage::ReadRssi {
peripheral_uuid: self.shared.uuid,
future: fut.get_state_clone(),
})
.await?;
match fut.await {
CoreBluetoothReply::ReadRssi(rssi) => Ok(rssi),
CoreBluetoothReply::Err(msg) => Err(Error::RuntimeError(msg)),
_ => panic!("Unexpected reply for read_rssi"),
}
}
async fn read_descriptor(&self, descriptor: &Descriptor) -> Result<Vec<u8>> {
let fut = CoreBluetoothReplyFuture::default();
self.shared
.message_sender
.to_owned()
.send(CoreBluetoothMessage::ReadDescriptorValue {
peripheral_uuid: self.shared.uuid,
service_uuid: descriptor.service_uuid,
characteristic_uuid: descriptor.characteristic_uuid,
descriptor_uuid: descriptor.uuid,
future: fut.get_state_clone(),
})
.await?;
match fut.await {
CoreBluetoothReply::ReadResult(chars) => Ok(chars),
_ => {
panic!("Shouldn't get anything but read result!");
}
}
}
}
impl From<Uuid> for PeripheralId {
fn from(uuid: Uuid) -> Self {
PeripheralId(uuid)
}
}
impl From<SendError> for Error {
fn from(_: SendError) -> Self {
Error::Other("Channel closed".to_string().into())
}
}