use crate::api::{CentralEvent, Peripheral};
use crate::platform::PeripheralId;
use dashmap::{DashMap, mapref::one::RefMut};
use futures::stream::{Stream, StreamExt};
use log::trace;
use std::pin::Pin;
use tokio::sync::broadcast;
use tokio_stream::wrappers::BroadcastStream;
#[derive(Debug)]
pub struct AdapterManager<PeripheralType>
where
PeripheralType: Peripheral,
{
peripherals: DashMap<PeripheralId, PeripheralType>,
events_channel: broadcast::Sender<CentralEvent>,
}
impl<PeripheralType: Peripheral + 'static> Default for AdapterManager<PeripheralType> {
fn default() -> Self {
let (broadcast_sender, _) = broadcast::channel(16);
AdapterManager {
peripherals: DashMap::new(),
events_channel: broadcast_sender,
}
}
}
impl<PeripheralType> AdapterManager<PeripheralType>
where
PeripheralType: Peripheral + 'static,
{
pub fn emit(&self, event: CentralEvent) {
if let CentralEvent::DeviceDisconnected(ref id) = event {
self.peripherals.remove(id);
}
if let Err(lost) = self.events_channel.send(event) {
trace!("Lost central event, while nothing subscribed: {:?}", lost);
}
}
pub fn event_stream(&self) -> Pin<Box<dyn Stream<Item = CentralEvent> + Send>> {
let receiver = self.events_channel.subscribe();
Box::pin(BroadcastStream::new(receiver).filter_map(|x| async move { x.ok() }))
}
pub fn add_peripheral(&self, peripheral: PeripheralType) {
assert!(
!self.peripherals.contains_key(&peripheral.id()),
"Adding a peripheral that's already in the map."
);
self.peripherals.insert(peripheral.id(), peripheral);
}
pub fn clear_peripherals(&self) {
self.peripherals.clear();
}
pub fn peripherals(&self) -> Vec<PeripheralType> {
self.peripherals
.iter()
.map(|val| val.value().clone())
.collect()
}
#[allow(dead_code)]
pub fn peripheral_mut(
&self,
id: &PeripheralId,
) -> Option<RefMut<'_, PeripheralId, PeripheralType>> {
self.peripherals.get_mut(id)
}
pub fn peripheral(&self, id: &PeripheralId) -> Option<PeripheralType> {
self.peripherals.get(id).map(|val| val.value().clone())
}
}