use super::{
comm_managers::{
DeviceCommunicationEvent,
DeviceCommunicationManager,
DeviceCommunicationManagerCreator,
},
device_manager_event_loop::DeviceManagerEventLoop,
ping_timer::PingTimer,
ButtplugServerStartupError,
};
use crate::{
core::{
errors::{ButtplugDeviceError, ButtplugMessageError, ButtplugUnknownError},
messages::{
self,
ButtplugClientMessage,
ButtplugDeviceCommandMessageUnion,
ButtplugDeviceManagerMessageUnion,
ButtplugDeviceMessage,
ButtplugMessage,
ButtplugServerMessage,
DeviceList,
DeviceMessageInfo,
},
},
device::{configuration_manager::DeviceConfigurationManager, ButtplugDevice},
server::ButtplugServerResultFuture,
test::{TestDeviceCommunicationManager, TestDeviceCommunicationManagerHelper},
util::async_manager,
};
use dashmap::DashMap;
use futures::future;
use std::{
convert::TryFrom,
sync::{atomic::Ordering, Arc},
};
use tokio::sync::{broadcast, mpsc};
pub struct DeviceManager {
comm_managers: Arc<DashMap<String, Box<dyn DeviceCommunicationManager>>>,
devices: Arc<DashMap<u32, Arc<ButtplugDevice>>>,
device_event_sender: mpsc::Sender<DeviceCommunicationEvent>,
}
unsafe impl Send for DeviceManager {
}
unsafe impl Sync for DeviceManager {
}
impl DeviceManager {
pub fn try_new(
output_sender: broadcast::Sender<ButtplugServerMessage>,
ping_timer: Arc<PingTimer>,
allow_raw_messages: bool,
device_config_json: &Option<String>,
user_device_config_json: &Option<String>,
) -> Result<Self, ButtplugDeviceError> {
let config = Arc::new(DeviceConfigurationManager::new_with_options(
allow_raw_messages,
device_config_json,
user_device_config_json,
)?);
let devices = Arc::new(DashMap::new());
let (device_event_sender, device_event_receiver) = mpsc::channel(256);
let mut event_loop = DeviceManagerEventLoop::new(
config,
output_sender,
devices.clone(),
ping_timer,
device_event_receiver,
);
async_manager::spawn(async move {
event_loop.run().await;
})
.unwrap();
Ok(Self {
device_event_sender,
devices,
comm_managers: Arc::new(DashMap::new()),
})
}
fn start_scanning(&self) -> ButtplugServerResultFuture {
if self.comm_managers.is_empty() {
ButtplugUnknownError::NoDeviceCommManagers.into()
} else {
let mgrs = self.comm_managers.clone();
let sender = self.device_event_sender.clone();
Box::pin(async move {
for mgr in mgrs.iter() {
if mgr.value().scanning_status().load(Ordering::SeqCst) {
return Err(ButtplugDeviceError::DeviceScanningAlreadyStarted.into());
}
}
let fut_vec: Vec<_> = mgrs
.iter()
.map(|guard| guard.value().start_scanning())
.collect();
future::join_all(fut_vec).await;
debug!("All managers started, sending ScanningStarted (and invoking ScanningFinished hack) signal to event loop.");
if sender
.send(DeviceCommunicationEvent::ScanningStarted)
.await
.is_err()
|| sender
.send(DeviceCommunicationEvent::ScanningFinished)
.await
.is_err()
{
error!("Device manager event loop shut down, cannot send ScanningStarted");
}
Ok(messages::Ok::default().into())
})
}
}
fn stop_scanning(&self) -> ButtplugServerResultFuture {
if self.comm_managers.is_empty() {
ButtplugUnknownError::NoDeviceCommManagers.into()
} else {
let mgrs = self.comm_managers.clone();
Box::pin(async move {
let mut scanning_stopped = true;
for mgr in mgrs.iter() {
if mgr.value().scanning_status().load(Ordering::SeqCst) {
debug!("Device manager {} has not stopped scanning yet.", mgr.key());
scanning_stopped = false;
break;
}
}
if scanning_stopped {
return Err(ButtplugDeviceError::DeviceScanningAlreadyStopped.into());
}
let fut_vec: Vec<_> = mgrs
.iter()
.map(|guard| guard.value().stop_scanning())
.collect();
future::join_all(fut_vec).await;
Ok(messages::Ok::default().into())
})
}
}
fn stop_all_devices(&self) -> ButtplugServerResultFuture {
let device_map = self.devices.clone();
Box::pin(async move {
let fut_vec: Vec<_> = device_map
.iter()
.map(|dev| {
let device = dev.value();
device.parse_message(messages::StopDeviceCmd::new(1).into())
})
.collect();
future::join_all(fut_vec).await;
Ok(messages::Ok::default().into())
})
}
fn parse_device_message(
&self,
device_msg: ButtplugDeviceCommandMessageUnion,
) -> ButtplugServerResultFuture {
match self.devices.get(&device_msg.device_index()) {
Some(device) => {
let fut = device.parse_message(device_msg);
Box::pin(async move { fut.await })
}
None => ButtplugDeviceError::DeviceNotAvailable(device_msg.device_index()).into(),
}
}
fn parse_device_manager_message(
&self,
manager_msg: ButtplugDeviceManagerMessageUnion,
) -> ButtplugServerResultFuture {
match manager_msg {
ButtplugDeviceManagerMessageUnion::RequestDeviceList(msg) => {
let devices = self
.devices
.iter()
.map(|device| {
let dev = device.value();
DeviceMessageInfo::new(*device.key(), &dev.name(), dev.message_attributes())
})
.collect();
let mut device_list = DeviceList::new(devices);
device_list.set_id(msg.id());
Box::pin(future::ready(Ok(device_list.into())))
}
ButtplugDeviceManagerMessageUnion::StopAllDevices(_) => self.stop_all_devices(),
ButtplugDeviceManagerMessageUnion::StartScanning(_) => self.start_scanning(),
ButtplugDeviceManagerMessageUnion::StopScanning(_) => self.stop_scanning(),
}
}
pub fn parse_message(&self, msg: ButtplugClientMessage) -> ButtplugServerResultFuture {
match ButtplugDeviceCommandMessageUnion::try_from(msg.clone()) {
Ok(device_msg) => self.parse_device_message(device_msg),
Err(_) => match ButtplugDeviceManagerMessageUnion::try_from(msg.clone()) {
Ok(manager_msg) => self.parse_device_manager_message(manager_msg),
Err(_) => ButtplugMessageError::UnexpectedMessageType(format!("{:?}", msg)).into(),
},
}
}
pub fn add_comm_manager<T>(&self) -> Result<(), ButtplugServerStartupError>
where
T: 'static + DeviceCommunicationManager + DeviceCommunicationManagerCreator,
{
let mgr = T::new(self.device_event_sender.clone());
if self.comm_managers.contains_key(mgr.name()) {
return Err(ButtplugServerStartupError::DeviceManagerTypeAlreadyAdded(
mgr.name().to_owned(),
));
}
let status = mgr.scanning_status();
let sender = self.device_event_sender.clone();
async_manager::spawn(async move {
sender
.send(DeviceCommunicationEvent::DeviceManagerAdded(status))
.await
.unwrap();
})
.unwrap();
self
.comm_managers
.insert(mgr.name().to_owned(), Box::new(mgr));
Ok(())
}
pub fn add_test_comm_manager(
&self,
) -> Result<TestDeviceCommunicationManagerHelper, ButtplugServerStartupError> {
let mgr = TestDeviceCommunicationManager::new(self.device_event_sender.clone());
if self.comm_managers.contains_key(mgr.name()) {
return Err(ButtplugServerStartupError::DeviceManagerTypeAlreadyAdded(
mgr.name().to_owned(),
));
}
let status = mgr.scanning_status();
let sender = self.device_event_sender.clone();
async_manager::spawn(async move {
sender
.send(DeviceCommunicationEvent::DeviceManagerAdded(status))
.await
.unwrap();
})
.unwrap();
let helper = mgr.helper();
self
.comm_managers
.insert(mgr.name().to_owned(), Box::new(mgr));
Ok(helper)
}
}
impl Drop for DeviceManager {
fn drop(&mut self) {
info!("Dropping device manager!");
}
}