use super::{comm_managers::DeviceCommunicationEvent, ping_timer::PingTimer};
use crate::{
core::messages::{
ButtplugServerMessage,
DeviceAdded,
DeviceRemoved,
ScanningFinished,
StopDeviceCmd,
},
device::{
configuration_manager::DeviceConfigurationManager,
ButtplugDevice,
ButtplugDeviceEvent,
ButtplugDeviceImplCreator,
},
util::async_manager,
};
use dashmap::DashMap;
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use tokio::sync::{broadcast, mpsc};
/// State owned by the device manager's event loop.
///
/// The loop (see `run`) multiplexes three inputs: ping timeouts, events from device
/// communication managers, and events from devices themselves. It keeps the shared device map
/// up to date and publishes the resulting messages on the server broadcast channel.
pub struct DeviceManagerEventLoop {
/// Handed to `ButtplugDevice::try_create_device` whenever a communication manager reports a
/// newly found device.
device_config_manager: Arc<DeviceConfigurationManager>,
/// Counter supplying candidate device indices; it is advanced on every `Connected` event.
device_index_generator: u32,
/// Currently known devices keyed by device index. Shared (`Arc`) with whoever constructed the
/// loop.
device_map: Arc<DashMap<u32, Arc<ButtplugDevice>>>,
/// Source of the ping timeout future awaited in `run`.
ping_timer: Arc<PingTimer>,
/// Maps a device address to the index it was given, so an address that shows up again is
/// assigned the same index.
device_index_map: Arc<DashMap<String, u32>>,
/// Broadcast channel on which `DeviceAdded`, `DeviceRemoved` and `ScanningFinished` messages
/// are published.
server_sender: broadcast::Sender<ButtplugServerMessage>,
/// Incoming events from device communication managers. `run` exits once this channel closes.
device_comm_receiver: mpsc::Receiver<DeviceCommunicationEvent>,
/// Sending half of the internal device event channel; cloned into the tasks this loop spawns so
/// they can report back.
device_event_sender: mpsc::Sender<ButtplugDeviceEvent>,
/// Receiving half of the internal device event channel, drained by `run`.
device_event_receiver: mpsc::Receiver<ButtplugDeviceEvent>,
/// Set when `ScanningStarted` is received and cleared right before `ScanningFinished` is
/// published to the server.
scanning_in_progress: bool,
/// Scanning flags registered through `DeviceManagerAdded`. A flag that loads as `true` is
/// treated as "this manager is still scanning".
comm_manager_scanning_statuses: Vec<Arc<AtomicBool>>,
}
impl DeviceManagerEventLoop {
/// Builds an event loop around the shared state supplied by the caller.
///
/// The internal device event channel (capacity 256) is created here; the loop keeps both halves
/// so that tasks it spawns can be handed clones of the sender. The index generator starts at 0,
/// the address-to-index map and the list of scanning flags start empty, and no scan is
/// considered in progress.
pub fn new(
  device_config_manager: Arc<DeviceConfigurationManager>,
  server_sender: broadcast::Sender<ButtplugServerMessage>,
  device_map: Arc<DashMap<u32, Arc<ButtplugDevice>>>,
  ping_timer: Arc<PingTimer>,
  device_comm_receiver: mpsc::Receiver<DeviceCommunicationEvent>,
) -> Self {
  let (event_tx, event_rx) = mpsc::channel(256);
  Self {
    // State handed in by the caller.
    device_config_manager,
    device_map,
    ping_timer,
    server_sender,
    device_comm_receiver,
    // State private to the loop.
    device_event_sender: event_tx,
    device_event_receiver: event_rx,
    device_index_map: Arc::new(DashMap::new()),
    device_index_generator: 0,
    comm_manager_scanning_statuses: Vec::new(),
    scanning_in_progress: false,
  }
}
/// Starts connecting to a newly discovered device without blocking the event loop.
///
/// The creation future is built immediately and then awaited on a spawned task. When creation
/// yields a device, it is wrapped in an `Arc` and sent back to this loop as a
/// `ButtplugDeviceEvent::Connected` event; the other outcomes are only logged.
///
/// # Panics
///
/// Panics if `async_manager::spawn` reports an error.
fn try_create_new_device(&mut self, device_creator: Box<dyn ButtplugDeviceImplCreator>) {
  let connection_attempt =
    ButtplugDevice::try_create_device(self.device_config_manager.clone(), device_creator);
  let event_tx = self.device_event_sender.clone();
  async_manager::spawn(async move {
    match connection_attempt.await {
      Ok(Some(device)) => {
        if event_tx
          .send(ButtplugDeviceEvent::Connected(Arc::new(device)))
          .await
          .is_err()
        {
          error!("Device manager disappeared before connection established, device will be dropped.");
        }
      }
      Ok(None) => debug!("Device could not be matched to a protocol."),
      Err(e) => error!("Device errored while trying to connect: {}", e),
    }
  })
  .unwrap();
}
async fn handle_device_communication(&mut self, event: DeviceCommunicationEvent) {
match event {
DeviceCommunicationEvent::ScanningStarted => {
self.scanning_in_progress = true;
}
DeviceCommunicationEvent::ScanningFinished => {
debug!(
"System signaled that scanning was finished, check to see if all managers are finished."
);
if !self.scanning_in_progress {
debug!("Manager finished before scanning was fully started, continuing event loop.");
return;
}
if self
.comm_manager_scanning_statuses
.iter()
.any(|x| x.load(Ordering::SeqCst))
{
debug!("At least one manager still scanning, continuing event loop.");
return;
}
debug!("All managers finished, emitting ScanningFinished");
self.scanning_in_progress = false;
if self
.server_sender
.send(ScanningFinished::default().into())
.is_err()
{
error!("Server disappeared, exiting loop.");
return;
}
}
DeviceCommunicationEvent::DeviceFound(device_creator) => {
self.try_create_new_device(device_creator);
}
DeviceCommunicationEvent::DeviceManagerAdded(status) => {
self.comm_manager_scanning_statuses.push(status);
}
}
}
async fn handle_device_event(&mut self, device_event: ButtplugDeviceEvent) {
info!("Got device event: {:?}", device_event);
match device_event {
ButtplugDeviceEvent::Connected(device) => {
let generated_device_index = self.device_index_generator;
self.device_index_generator += 1;
let device_index = if let Some(id) = self.device_index_map.get(device.address()) {
*id.value()
} else {
self
.device_index_map
.insert(device.address().to_owned(), generated_device_index);
generated_device_index
};
if self.device_map.contains_key(&device_index) {
info!("Device map contains key!");
let (_, old_device) = self.device_map.remove(&device_index).unwrap();
if let Err(err) = old_device.disconnect().await {
error!("Error during index collision disconnect: {:?}", err);
}
} else {
info!("Device map does not contain key!");
}
let mut event_listener = device.event_stream();
let event_sender = self.device_event_sender.clone();
async_manager::spawn(async move {
while let Ok(event) = event_listener.recv().await {
event_sender.send(event).await.unwrap();
}
})
.unwrap();
info!("Assigning index {} to {}", device_index, device.name());
let device_added_message =
DeviceAdded::new(device_index, &device.name(), &device.message_attributes());
self.device_map.insert(device_index, device);
if self
.server_sender
.send(device_added_message.into())
.is_err()
{
error!("Server disappeared.");
}
}
ButtplugDeviceEvent::Removed(address) => {
let device_index = *self.device_index_map.get(&address).unwrap().value();
self.device_map.remove(&device_index).unwrap();
if self
.server_sender
.send(DeviceRemoved::new(device_index).into())
.is_err()
{
error!("Server disappeared.");
}
}
ButtplugDeviceEvent::Notification(_address, _endpoint, _data) => {
}
}
}
async fn handle_ping_timeout(&self) {
error!("Pinged out, stopping devices");
let mut fut_vec = FuturesUnordered::new();
self.device_map.iter().for_each(|dev| {
let device = dev.value();
fut_vec.push(device.parse_message(StopDeviceCmd::new(1).into()))
});
async_manager::spawn(async move {
while let Some(val) = fut_vec.next().await {
if let Err(e) = val {
error!("Error stopping device on ping timeout: {}", e);
}
}
})
.unwrap();
}
/// Drives the event loop until the device communication channel is closed.
///
/// Each iteration waits for whichever comes first: a ping timeout, an event from a device
/// communication manager, or an event on the internal device event channel, and dispatches it
/// to the matching handler.
///
/// # Panics
///
/// Panics if the internal device event channel reports closure, which is not expected because
/// this struct holds a sender for it.
pub async fn run(&mut self) {
  loop {
    select! {
      _ = self.ping_timer.ping_timeout_waiter().fuse() => {
        self.handle_ping_timeout().await;
      },
      device_comm_msg = self.device_comm_receiver.recv().fuse() => {
        match device_comm_msg {
          Some(msg) => self.handle_device_communication(msg).await,
          // All communication event senders are gone; nothing more can arrive.
          None => break,
        }
      }
      device_event_msg = self.device_event_receiver.recv().fuse() => {
        match device_event_msg {
          Some(msg) => self.handle_device_event(msg).await,
          None => panic!("We shouldn't be able to get here since we also own the sender."),
        }
      },
    }
  }
}
}