use super::{
comm_managers::{
DeviceCommunicationEvent,
DeviceCommunicationManager,
DeviceCommunicationManagerCreator,
},
ButtplugServerStartupError,
};
use crate::{
core::{
errors::{ButtplugDeviceError, ButtplugMessageError, ButtplugUnknownError},
messages::{
self,
ButtplugClientMessage,
ButtplugDeviceCommandMessageUnion,
ButtplugDeviceManagerMessageUnion,
ButtplugDeviceMessage,
ButtplugMessage,
ButtplugServerMessage,
DeviceAdded,
DeviceList,
DeviceMessageInfo,
DeviceRemoved,
ScanningFinished,
},
},
device::{
configuration_manager::DeviceConfigurationManager,
ButtplugDevice,
ButtplugDeviceEvent,
},
server::ButtplugServerResultFuture,
test::{TestDeviceCommunicationManager, TestDeviceCommunicationManagerHelper},
util::async_manager,
};
use async_channel::{bounded, Receiver, Sender};
use dashmap::DashMap;
use futures::{
future::{self, Future},
FutureExt,
StreamExt,
};
use std::{
convert::TryFrom,
sync::{
atomic::{AtomicBool, AtomicU32, Ordering},
Arc,
},
};
/// Internal event type for the device manager event loop: one variant per
/// source the loop selects over.
enum DeviceEvent {
/// Item read from the communication-manager channel; `None` means that
/// stream has ended.
DeviceCommunicationEvent(Option<DeviceCommunicationEvent>),
/// `(device index, event)` pair forwarded from a connected device; `None`
/// means that stream has ended.
DeviceEvent(Option<(u32, ButtplugDeviceEvent)>),
/// Produced when the ping receiver yields a value or reports that its
/// sender is gone.
PingTimeout,
}
fn wait_for_manager_events(
device_config_manager: Arc<DeviceConfigurationManager>,
ping_receiver: Option<Receiver<()>>,
server_sender: Sender<ButtplugServerMessage>,
) -> (
impl Future<Output = ()>,
Arc<DashMap<u32, ButtplugDevice>>,
Sender<DeviceCommunicationEvent>,
) {
let main_device_index = Arc::new(AtomicU32::new(0));
let (device_event_sender, mut device_event_receiver) = bounded::<(u32, ButtplugDeviceEvent)>(256);
let device_map = Arc::new(DashMap::new());
let (device_comm_sender, mut device_comm_receiver) = bounded(256);
let device_map_return = device_map.clone();
let mut device_manager_status: Vec<Arc<AtomicBool>> = vec![];
let event_loop = async move {
loop {
let ping_fut = async {
if let Some(recv) = &ping_receiver {
if recv.recv().await.is_err() {
error!("Ping sender disappeared, meaning server has died. Exiting.");
}
} else {
futures::future::pending::<()>().await;
}
DeviceEvent::PingTimeout
};
let manager_event = select! {
device_comm = device_comm_receiver.next().fuse() => DeviceEvent::DeviceCommunicationEvent(device_comm),
device_event = device_event_receiver.next().fuse() => DeviceEvent::DeviceEvent(device_event),
ping = ping_fut.fuse() => ping
};
match manager_event {
DeviceEvent::DeviceCommunicationEvent(e) => match e {
Some(event) => match event {
DeviceCommunicationEvent::DeviceFound(device_creator) => {
let device_index = main_device_index.load(Ordering::SeqCst);
main_device_index.store(
main_device_index.load(Ordering::SeqCst) + 1,
Ordering::SeqCst,
);
let device_event_sender_clone = device_event_sender.clone();
let device_map_clone = device_map.clone();
let server_sender_clone = server_sender.clone();
let device_config_mgr_clone = device_config_manager.clone();
async_manager::spawn(async move {
match ButtplugDevice::try_create_device(device_config_mgr_clone, device_creator)
.await
{
Ok(option_dev) => match option_dev {
Some(device) => {
info!("Assigning index {} to {}", device_index, device.name());
let mut recv = device.get_event_receiver();
let sender_clone = device_event_sender_clone.clone();
let idx_clone = device_index;
async_manager::spawn(async move {
while let Some(e) = recv.next().await {
if sender_clone.send((idx_clone, e)).await.is_err() {
error!("Device event receiver disappeared, exiting loop.");
return;
}
}
})
.unwrap();
let device_added_message = DeviceAdded::new(
device_index,
&device.name(),
&device.message_attributes(),
);
device_map_clone.insert(device_index, device);
if server_sender_clone
.send(device_added_message.into())
.await
.is_err()
{
error!("Server disappeared, exiting loop.");
return;
}
}
None => debug!("Device could not be matched to a protocol."),
},
Err(e) => error!("Device errored while trying to connect: {}", e),
}
})
.unwrap();
}
DeviceCommunicationEvent::ScanningFinished => {
for comm_mgr_status in &device_manager_status {
if comm_mgr_status.load(Ordering::SeqCst) {
continue;
}
}
if server_sender
.send(ScanningFinished::default().into())
.await
.is_err()
{
error!("Server disappeared, exiting loop.");
return;
}
}
DeviceCommunicationEvent::DeviceManagerAdded(status) => {
device_manager_status.push(status);
}
},
None => break,
},
DeviceEvent::DeviceEvent(e) => match e {
Some((idx, event)) => {
if let ButtplugDeviceEvent::Removed = event {
device_map.remove(&idx);
if server_sender
.send(DeviceRemoved::new(idx).into())
.await
.is_err()
{
error!("Server disappeared, exiting loop.");
return;
}
}
info!("Got device event: {:?}", event);
}
None => break,
},
DeviceEvent::PingTimeout => {
error!("Pinged out, stopping devices");
let fut_vec: Vec<_> = device_map
.iter()
.map(|dev| {
let device = dev.value();
device.parse_message(messages::StopDeviceCmd::new(1).into())
})
.collect();
for fut in fut_vec {
if let Err(e) = fut.await {
error!("Error stopping device on ping timeout: {}", e);
}
}
break;
}
}
}
};
(event_loop, device_map_return, device_comm_sender)
}
/// Owns the registered device communication managers and the map of
/// connected devices, and routes client messages to them.
pub struct DeviceManager {
/// Registered communication managers, keyed by the value of their `name()`.
comm_managers: Arc<DashMap<String, Box<dyn DeviceCommunicationManager>>>,
/// Connected devices keyed by device index; the same map is handed out by
/// `wait_for_manager_events`, whose event loop inserts and removes entries.
devices: Arc<DashMap<u32, ButtplugDevice>>,
/// Sender for `DeviceCommunicationEvent`s into the event loop; a clone is
/// passed to each communication manager on creation.
sender: Sender<DeviceCommunicationEvent>,
}
// SAFETY NOTE(review): nothing in this file shows why these manual impls are
// sound. Presumably they are needed because `Box<dyn DeviceCommunicationManager>`
// carries no `Send`/`Sync` bound; confirm that every communication manager
// implementation is actually safe to share across threads.
unsafe impl Send for DeviceManager {
}
// See the review note on the `Send` impl directly above; the same unverified
// assumption applies to `Sync`.
unsafe impl Sync for DeviceManager {
}
impl DeviceManager {
/// Creates a `DeviceManager` and spawns its background event loop.
///
/// * `event_sender` - channel on which the event loop emits server messages.
/// * `ping_receiver` - optional ping channel handed to the event loop.
/// * `allow_raw_messages`, `device_config_json`, `user_device_config_json` -
///   forwarded unchanged to `DeviceConfigurationManager::new_with_options`.
///
/// # Errors
///
/// Returns the error produced by `DeviceConfigurationManager::new_with_options`
/// if the configuration manager cannot be built.
///
/// # Panics
///
/// Panics if the event loop task cannot be spawned.
pub fn new_with_options(
  event_sender: Sender<ButtplugServerMessage>,
  ping_receiver: Option<Receiver<()>>,
  allow_raw_messages: bool,
  device_config_json: &Option<String>,
  user_device_config_json: &Option<String>,
) -> Result<Self, ButtplugDeviceError> {
  let config_manager = DeviceConfigurationManager::new_with_options(
    allow_raw_messages,
    device_config_json,
    user_device_config_json,
  )?;
  let (event_loop, devices, sender) =
    wait_for_manager_events(Arc::new(config_manager), ping_receiver, event_sender);
  async_manager::spawn(event_loop).unwrap();
  let comm_managers = Arc::new(DashMap::new());
  Ok(Self {
    comm_managers,
    devices,
    sender,
  })
}
/// Asks every registered communication manager to start scanning.
///
/// Resolves to `NoDeviceCommManagers` when no manager is registered, and to
/// `DeviceScanningAlreadyStarted` when any manager's scanning flag is already
/// set. Otherwise all managers' `start_scanning` futures are awaited together
/// and an `Ok` message is returned.
fn start_scanning(&self) -> ButtplugServerResultFuture {
  if self.comm_managers.is_empty() {
    return ButtplugUnknownError::NoDeviceCommManagers.into();
  }
  let managers = self.comm_managers.clone();
  Box::pin(async move {
    // Refuse to start if anything is already scanning.
    let already_scanning = managers
      .iter()
      .any(|entry| entry.value().scanning_status().load(Ordering::SeqCst));
    if already_scanning {
      return Err(ButtplugDeviceError::DeviceScanningAlreadyStarted.into());
    }
    // Collect the futures first so no map guard is held while awaiting.
    let mut start_futures = Vec::new();
    for entry in managers.iter() {
      start_futures.push(entry.value().start_scanning());
    }
    future::join_all(start_futures).await;
    Ok(messages::Ok::default().into())
  })
}
/// Asks every registered communication manager to stop scanning.
///
/// Resolves to `NoDeviceCommManagers` when no manager is registered, and to
/// `DeviceScanningAlreadyStopped` when no manager's scanning flag is set.
/// Otherwise all managers' `stop_scanning` futures are awaited together and an
/// `Ok` message is returned.
fn stop_scanning(&self) -> ButtplugServerResultFuture {
  if self.comm_managers.is_empty() {
    return ButtplugUnknownError::NoDeviceCommManagers.into();
  }
  let managers = self.comm_managers.clone();
  Box::pin(async move {
    // There is only something to stop if at least one manager is scanning.
    let any_scanning = managers
      .iter()
      .any(|entry| entry.value().scanning_status().load(Ordering::SeqCst));
    if !any_scanning {
      return Err(ButtplugDeviceError::DeviceScanningAlreadyStopped.into());
    }
    // Collect the futures first so no map guard is held while awaiting.
    let mut stop_futures = Vec::new();
    for entry in managers.iter() {
      stop_futures.push(entry.value().stop_scanning());
    }
    future::join_all(stop_futures).await;
    Ok(messages::Ok::default().into())
  })
}
/// Sends a `StopDeviceCmd` to every device currently in the device map and
/// waits for all of them to finish.
///
/// The individual results are discarded; the returned future always resolves
/// to an `Ok` message.
fn stop_all_devices(&self) -> ButtplugServerResultFuture {
  let devices = self.devices.clone();
  Box::pin(async move {
    // NOTE(review): every command is built with index 1 rather than the
    // device's own key; presumably the device ignores the index. Confirm.
    let mut stop_futures = Vec::new();
    for entry in devices.iter() {
      stop_futures.push(
        entry
          .value()
          .parse_message(messages::StopDeviceCmd::new(1).into()),
      );
    }
    future::join_all(stop_futures).await;
    Ok(messages::Ok::default().into())
  })
}
/// Routes a device command to the device whose index it carries.
///
/// Resolves to `DeviceNotAvailable(index)` when no device with that index is
/// in the device map.
fn parse_device_message(
  &self,
  device_msg: ButtplugDeviceCommandMessageUnion,
) -> ButtplugServerResultFuture {
  let device_index = device_msg.get_device_index();
  if let Some(device) = self.devices.get(&device_index) {
    // Build the future while the map entry is borrowed; the returned
    // future itself does not borrow the entry.
    let fut = device.parse_message(device_msg);
    return Box::pin(async move { fut.await });
  }
  ButtplugDeviceError::DeviceNotAvailable(device_index).into()
}
/// Handles messages addressed to the device manager itself.
///
/// `RequestDeviceList` is answered immediately with a `DeviceList` built from
/// the current device map, carrying the request's message id. The scanning and
/// stop-all variants delegate to the matching private methods.
fn parse_device_manager_message(
  &self,
  manager_msg: ButtplugDeviceManagerMessageUnion,
) -> ButtplugServerResultFuture {
  match manager_msg {
    ButtplugDeviceManagerMessageUnion::RequestDeviceList(request) => {
      let infos = self
        .devices
        .iter()
        .map(|entry| DeviceMessageInfo {
          device_index: *entry.key(),
          device_name: entry.value().name(),
          device_messages: entry.value().message_attributes(),
        })
        .collect();
      let mut reply = DeviceList::new(infos);
      // Echo the request id so the reply can be matched to the request.
      reply.set_id(request.get_id());
      Box::pin(future::ready(Ok(reply.into())))
    }
    ButtplugDeviceManagerMessageUnion::StartScanning(_) => self.start_scanning(),
    ButtplugDeviceManagerMessageUnion::StopScanning(_) => self.stop_scanning(),
    ButtplugDeviceManagerMessageUnion::StopAllDevices(_) => self.stop_all_devices(),
  }
}
/// Entry point for client messages handled by the device manager.
///
/// The message is first tried as a device command, then as a device manager
/// message. If neither conversion succeeds the returned future resolves to
/// `UnexpectedMessageType` carrying the message's debug representation.
pub fn parse_message(&self, msg: ButtplugClientMessage) -> ButtplugServerResultFuture {
  // Each conversion consumes its argument, so a clone is handed over and the
  // original is kept for the next attempt and for the error text.
  if let Ok(device_msg) = ButtplugDeviceCommandMessageUnion::try_from(msg.clone()) {
    return self.parse_device_message(device_msg);
  }
  if let Ok(manager_msg) = ButtplugDeviceManagerMessageUnion::try_from(msg.clone()) {
    return self.parse_device_manager_message(manager_msg);
  }
  ButtplugMessageError::UnexpectedMessageType(format!("{:?}", msg)).into()
}
pub fn add_comm_manager<T>(&self) -> Result<(), ButtplugServerStartupError>
where
T: 'static + DeviceCommunicationManager + DeviceCommunicationManagerCreator,
{
let mgr = T::new(self.sender.clone());
if self.comm_managers.contains_key(mgr.name()) {
return Err(ButtplugServerStartupError::DeviceManagerTypeAlreadyAdded(
mgr.name().to_owned(),
));
}
let status = mgr.scanning_status();
let sender = self.sender.clone();
async_manager::spawn(async move {
sender
.send(DeviceCommunicationEvent::DeviceManagerAdded(status))
.await
.unwrap();
})
.unwrap();
self
.comm_managers
.insert(mgr.name().to_owned(), Box::new(mgr));
Ok(())
}
pub fn add_test_comm_manager(
&self,
) -> Result<TestDeviceCommunicationManagerHelper, ButtplugServerStartupError> {
let mgr = TestDeviceCommunicationManager::new(self.sender.clone());
if self.comm_managers.contains_key(mgr.name()) {
return Err(ButtplugServerStartupError::DeviceManagerTypeAlreadyAdded(
mgr.name().to_owned(),
));
}
let status = mgr.scanning_status();
let sender = self.sender.clone();
async_manager::spawn(async move {
sender
.send(DeviceCommunicationEvent::DeviceManagerAdded(status))
.await
.unwrap();
})
.unwrap();
let helper = mgr.helper();
self
.comm_managers
.insert(mgr.name().to_owned(), Box::new(mgr));
Ok(helper)
}
}
/// Logs when a `DeviceManager` is dropped; no other cleanup is done here.
impl Drop for DeviceManager {
fn drop(&mut self) {
info!("Dropping device manager!");
}
}