1use super::internal::{
9 CoreBluetoothMessage, CoreBluetoothReply, CoreBluetoothReplyFuture, PeripheralEventInternal,
10};
11use crate::{
12 api::{
13 self, BDAddr, CentralEvent, CharPropFlags, Characteristic, Descriptor,
14 PeripheralProperties, Service, ValueNotification, WriteType,
15 },
16 common::{adapter_manager::AdapterManager, util::notifications_stream_from_broadcast_receiver},
17 Error, Result,
18};
19use async_trait::async_trait;
20use futures::channel::mpsc::{Receiver, SendError, Sender};
21use futures::sink::SinkExt;
22use futures::stream::{Stream, StreamExt};
23use log::*;
24use objc2_core_bluetooth::CBPeripheralState;
25#[cfg(feature = "serde")]
26use serde::{Deserialize, Serialize};
27#[cfg(feature = "serde")]
28use serde_cr as serde;
29use std::sync::Weak;
30use std::{
31 collections::{BTreeSet, HashMap},
32 fmt::{self, Debug, Display, Formatter},
33 pin::Pin,
34 sync::{Arc, Mutex},
35};
36use tokio::sync::broadcast;
37use tokio::task;
38use uuid::Uuid;
39
40#[cfg_attr(
41 feature = "serde",
42 derive(Serialize, Deserialize),
43 serde(crate = "serde_cr")
44)]
45#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
46pub struct PeripheralId(Uuid);
47
48impl Display for PeripheralId {
49 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
50 Display::fmt(&self.0, f)
51 }
52}
53
54#[derive(Clone)]
56pub struct Peripheral {
57 shared: Arc<Shared>,
58}
59
60struct Shared {
61 notifications_channel: broadcast::Sender<ValueNotification>,
62 manager: Weak<AdapterManager<Peripheral>>,
63 uuid: Uuid,
64 services: Mutex<BTreeSet<Service>>,
65 properties: Mutex<PeripheralProperties>,
66 message_sender: Sender<CoreBluetoothMessage>,
67 }
71
72impl Shared {
73 fn emit_event(&self, event: CentralEvent) {
74 if let Some(manager) = self.manager.upgrade() {
75 manager.emit(event);
76 } else {
77 trace!("Could not emit an event. AdapterManager has been dropped");
78 }
79 }
80}
81
82impl Peripheral {
83 pub(crate) fn new(
85 uuid: Uuid,
86 local_name: Option<String>,
87 manager: Weak<AdapterManager<Self>>,
88 event_receiver: Receiver<PeripheralEventInternal>,
89 message_sender: Sender<CoreBluetoothMessage>,
90 ) -> Self {
91 let properties = Mutex::from(PeripheralProperties {
94 address: BDAddr::default(),
95 address_type: None,
96 local_name,
97 tx_power_level: None,
98 rssi: None,
99 manufacturer_data: HashMap::new(),
100 service_data: HashMap::new(),
101 services: Vec::new(),
102 class: None,
103 });
104 let (notifications_channel, _) = broadcast::channel(16);
105
106 let shared = Arc::new(Shared {
107 properties,
108 manager,
109 services: Mutex::new(BTreeSet::new()),
110 notifications_channel,
111 uuid,
112 message_sender,
113 });
114 let shared_clone = shared.clone();
115 task::spawn(async move {
116 let mut event_receiver = event_receiver;
117 let shared = shared_clone;
118
119 loop {
120 match event_receiver.next().await {
121 Some(PeripheralEventInternal::Notification(uuid, data)) => {
122 let notification = ValueNotification { uuid, value: data };
123
124 let _ = shared.notifications_channel.send(notification);
127 }
128 Some(PeripheralEventInternal::ManufacturerData(
129 manufacturer_id,
130 data,
131 rssi,
132 )) => {
133 let mut properties = shared.properties.lock().unwrap();
134 properties.rssi = Some(rssi);
135 properties
136 .manufacturer_data
137 .insert(manufacturer_id, data.clone());
138 shared.emit_event(CentralEvent::ManufacturerDataAdvertisement {
139 id: shared.uuid.into(),
140 manufacturer_data: properties.manufacturer_data.clone(),
141 });
142 }
143 Some(PeripheralEventInternal::ServiceData(service_data, rssi)) => {
144 let mut properties = shared.properties.lock().unwrap();
145 properties.rssi = Some(rssi);
146 properties.service_data.extend(service_data.clone());
147
148 shared.emit_event(CentralEvent::ServiceDataAdvertisement {
149 id: shared.uuid.into(),
150 service_data,
151 });
152 }
153 Some(PeripheralEventInternal::Services(services, rssi)) => {
154 let mut properties = shared.properties.lock().unwrap();
155 properties.rssi = Some(rssi);
156 properties.services = services.clone();
157
158 shared.emit_event(CentralEvent::ServicesAdvertisement {
159 id: shared.uuid.into(),
160 services,
161 });
162 }
163 Some(PeripheralEventInternal::Disconnected) => (),
164 None => {
165 info!("Event receiver died, breaking out of corebluetooth device loop.");
166 break;
167 }
168 }
169 }
170 });
171 Self { shared: shared }
172 }
173
174 pub(super) fn update_name(&self, name: &str) {
175 self.shared.properties.lock().unwrap().local_name = Some(name.to_string());
176 }
177}
178
179impl Display for Peripheral {
180 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
181 write!(f, "Peripheral")
186 }
187}
188
189impl Debug for Peripheral {
190 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
191 f.debug_struct("Peripheral")
192 .field("uuid", &self.shared.uuid)
193 .field("services", &self.shared.services)
194 .field("properties", &self.shared.properties)
195 .field("message_sender", &self.shared.message_sender)
196 .finish()
197 }
198}
199
200#[async_trait]
201impl api::Peripheral for Peripheral {
202 fn id(&self) -> PeripheralId {
203 PeripheralId(self.shared.uuid)
204 }
205
206 fn address(&self) -> BDAddr {
207 BDAddr::default()
208 }
209
210 async fn properties(&self) -> Result<Option<PeripheralProperties>> {
211 Ok(Some(
212 self.shared
213 .properties
214 .lock()
215 .map_err(Into::<Error>::into)?
216 .clone(),
217 ))
218 }
219
220 fn services(&self) -> BTreeSet<Service> {
221 self.shared.services.lock().unwrap().clone()
222 }
223
224 async fn is_connected(&self) -> Result<bool> {
225 let fut = CoreBluetoothReplyFuture::default();
226 self.shared
227 .message_sender
228 .to_owned()
229 .send(CoreBluetoothMessage::IsConnected {
230 peripheral_uuid: self.shared.uuid,
231 future: fut.get_state_clone(),
232 })
233 .await?;
234 match fut.await {
235 CoreBluetoothReply::State(state) => match state {
236 CBPeripheralState::Connected => Ok(true),
237 _ => Ok(false),
238 },
239 _ => panic!("Shouldn't get anything but a State!"),
240 }
241 }
242
243 async fn connect(&self) -> Result<()> {
244 let fut = CoreBluetoothReplyFuture::default();
245 self.shared
246 .message_sender
247 .to_owned()
248 .send(CoreBluetoothMessage::ConnectDevice {
249 peripheral_uuid: self.shared.uuid,
250 future: fut.get_state_clone(),
251 })
252 .await?;
253 match fut.await {
254 CoreBluetoothReply::Connected(services) => {
255 *(self.shared.services.lock().map_err(Into::<Error>::into)?) = services;
256 self.shared
257 .emit_event(CentralEvent::DeviceConnected(self.shared.uuid.into()));
258 }
259 CoreBluetoothReply::Err(msg) => return Err(Error::RuntimeError(msg)),
260 _ => panic!("Shouldn't get anything but connected or err!"),
261 }
262 trace!("Device connected!");
263 Ok(())
264 }
265
266 async fn disconnect(&self) -> Result<()> {
267 let fut = CoreBluetoothReplyFuture::default();
268 self.shared
269 .message_sender
270 .to_owned()
271 .send(CoreBluetoothMessage::DisconnectDevice {
272 peripheral_uuid: self.shared.uuid,
273 future: fut.get_state_clone(),
274 })
275 .await?;
276 match fut.await {
277 CoreBluetoothReply::Ok => {
278 self.shared
279 .emit_event(CentralEvent::DeviceDisconnected(self.shared.uuid.into()));
280 trace!("Device disconnected!");
281 }
282 _ => error!("Shouldn't get anything but Ok!"),
283 }
284 Ok(())
285 }
286
287 async fn discover_services(&self) -> Result<()> {
288 Ok(())
290 }
291
292 async fn write(
293 &self,
294 characteristic: &Characteristic,
295 data: &[u8],
296 mut write_type: WriteType,
297 ) -> Result<()> {
298 let fut = CoreBluetoothReplyFuture::default();
299 if write_type == WriteType::WithoutResponse
303 && !characteristic
304 .properties
305 .contains(CharPropFlags::WRITE_WITHOUT_RESPONSE)
306 {
307 write_type = WriteType::WithResponse
308 }
309 self.shared
310 .message_sender
311 .to_owned()
312 .send(CoreBluetoothMessage::WriteValue {
313 peripheral_uuid: self.shared.uuid,
314 service_uuid: characteristic.service_uuid,
315 characteristic_uuid: characteristic.uuid,
316 data: Vec::from(data),
317 write_type,
318 future: fut.get_state_clone(),
319 })
320 .await?;
321 match fut.await {
322 CoreBluetoothReply::Ok => {}
323 CoreBluetoothReply::Err(msg) => return Err(Error::RuntimeError(msg)),
324 reply => panic!("Unexpected reply: {:?}", reply),
325 }
326 Ok(())
327 }
328
329 async fn read(&self, characteristic: &Characteristic) -> Result<Vec<u8>> {
330 let fut = CoreBluetoothReplyFuture::default();
331 self.shared
332 .message_sender
333 .to_owned()
334 .send(CoreBluetoothMessage::ReadValue {
335 peripheral_uuid: self.shared.uuid,
336 service_uuid: characteristic.service_uuid,
337 characteristic_uuid: characteristic.uuid,
338 future: fut.get_state_clone(),
339 })
340 .await?;
341 match fut.await {
342 CoreBluetoothReply::ReadResult(chars) => Ok(chars),
343 CoreBluetoothReply::Err(msg) => return Err(Error::RuntimeError(msg)),
344 _ => {
345 panic!("Shouldn't get anything but read result!");
346 }
347 }
348 }
349
350 async fn subscribe(&self, characteristic: &Characteristic) -> Result<()> {
351 let fut = CoreBluetoothReplyFuture::default();
352 self.shared
353 .message_sender
354 .to_owned()
355 .send(CoreBluetoothMessage::Subscribe {
356 peripheral_uuid: self.shared.uuid,
357 service_uuid: characteristic.service_uuid,
358 characteristic_uuid: characteristic.uuid,
359 future: fut.get_state_clone(),
360 })
361 .await?;
362 match fut.await {
363 CoreBluetoothReply::Ok => trace!("subscribed!"),
364 CoreBluetoothReply::Err(msg) => return Err(Error::RuntimeError(msg)),
365 _ => panic!("Didn't subscribe!"),
366 }
367 Ok(())
368 }
369
370 async fn unsubscribe(&self, characteristic: &Characteristic) -> Result<()> {
371 let fut = CoreBluetoothReplyFuture::default();
372 self.shared
373 .message_sender
374 .to_owned()
375 .send(CoreBluetoothMessage::Unsubscribe {
376 peripheral_uuid: self.shared.uuid,
377 service_uuid: characteristic.service_uuid,
378 characteristic_uuid: characteristic.uuid,
379 future: fut.get_state_clone(),
380 })
381 .await?;
382 match fut.await {
383 CoreBluetoothReply::Ok => {}
384 CoreBluetoothReply::Err(msg) => return Err(Error::RuntimeError(msg)),
385 _ => panic!("Didn't unsubscribe!"),
386 }
387 Ok(())
388 }
389
390 async fn notifications(&self) -> Result<Pin<Box<dyn Stream<Item = ValueNotification> + Send>>> {
391 let receiver = self.shared.notifications_channel.subscribe();
392 Ok(notifications_stream_from_broadcast_receiver(receiver))
393 }
394
395 async fn write_descriptor(&self, descriptor: &Descriptor, data: &[u8]) -> Result<()> {
396 let fut = CoreBluetoothReplyFuture::default();
397 self.shared
398 .message_sender
399 .to_owned()
400 .send(CoreBluetoothMessage::WriteDescriptorValue {
401 peripheral_uuid: self.shared.uuid,
402 service_uuid: descriptor.service_uuid,
403 characteristic_uuid: descriptor.characteristic_uuid,
404 descriptor_uuid: descriptor.uuid,
405 data: Vec::from(data),
406 future: fut.get_state_clone(),
407 })
408 .await?;
409 match fut.await {
410 CoreBluetoothReply::Ok => {}
411 reply => panic!("Unexpected reply: {:?}", reply),
412 }
413 Ok(())
414 }
415
416 async fn read_descriptor(&self, descriptor: &Descriptor) -> Result<Vec<u8>> {
417 let fut = CoreBluetoothReplyFuture::default();
418 self.shared
419 .message_sender
420 .to_owned()
421 .send(CoreBluetoothMessage::ReadDescriptorValue {
422 peripheral_uuid: self.shared.uuid,
423 service_uuid: descriptor.service_uuid,
424 characteristic_uuid: descriptor.characteristic_uuid,
425 descriptor_uuid: descriptor.uuid,
426 future: fut.get_state_clone(),
427 })
428 .await?;
429 match fut.await {
430 CoreBluetoothReply::ReadResult(chars) => Ok(chars),
431 _ => {
432 panic!("Shouldn't get anything but read result!");
433 }
434 }
435 }
436}
437
438impl From<Uuid> for PeripheralId {
439 fn from(uuid: Uuid) -> Self {
440 PeripheralId(uuid)
441 }
442}
443
444impl From<SendError> for Error {
445 fn from(_: SendError) -> Self {
446 Error::Other("Channel closed".to_string().into())
447 }
448}