use super::{
ButtplugClientEvent,
ButtplugClientMessageFuturePair,
ButtplugClientMessageSender,
client_message_sorter::ClientMessageSorter,
device::{ButtplugClientDevice, ButtplugClientDeviceEvent},
};
use buttplug_core::{
connector::{ButtplugConnector, ButtplugConnectorStateSender},
errors::ButtplugError,
message::{
ButtplugClientMessageV4,
ButtplugDeviceMessage,
ButtplugMessageValidator,
ButtplugServerMessageV4,
DeviceListV4,
DeviceMessageInfoV4,
},
};
use dashmap::DashMap;
use log::*;
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use tokio::{
select,
sync::{broadcast, mpsc},
};
/// Requests handed to the client event loop through its request channel.
///
/// Each variant is handled by `ButtplugClientEventLoop::parse_client_request`.
pub enum ButtplugClientRequest {
/// Ask the event loop to disconnect the connector. The result of the
/// connector's `disconnect()` call is sent back through the enclosed sender,
/// after which the event loop stops running.
Disconnect(ButtplugConnectorStateSender),
/// Hand a device list to the event loop. Devices whose index is not yet in
/// the device map are created and announced with a `DeviceAdded` event each,
/// followed by a single `DeviceListReceived` event.
HandleDeviceList(DeviceListV4),
/// Send a message through the connector. The pair carries the message itself
/// and the sender used to report a validation failure back to the caller.
Message(ButtplugClientMessageFuturePair),
}
/// State owned by the client event loop: the connector, the channels that
/// link it to the client API, and the shared device map.
///
/// The loop itself is driven by `run`, which multiplexes messages arriving
/// from the connector with requests arriving from the client.
pub(super) struct ButtplugClientEventLoop<ConnectorType>
where
ConnectorType: ButtplugConnector<ButtplugClientMessageV4, ButtplugServerMessageV4> + 'static,
{
/// Shared flag; stored as `false` when `run` finishes.
connected_status: Arc<AtomicBool>,
/// Connector used to send outgoing messages and to perform the disconnect.
connector: ConnectorType,
/// Server messages coming in from the connector. `run` exits once this
/// channel yields `None`.
from_connector_receiver: mpsc::Receiver<ButtplugServerMessageV4>,
/// Known devices keyed by device index. Held behind an `Arc`, so other
/// holders of the same map observe insertions and removals made here.
device_map: Arc<DashMap<u32, ButtplugClientDevice>>,
/// Broadcast channel on which client events (device added/removed, errors,
/// scanning finished, server disconnect, ...) are published.
to_client_sender: broadcast::Sender<ButtplugClientEvent>,
/// Message sender passed to every `ButtplugClientDevice` created by the loop.
from_client_sender: ButtplugClientMessageSender,
/// Requests coming in from the client API. `run` exits once this channel
/// yields `None`.
from_client_receiver: mpsc::Receiver<ButtplugClientRequest>,
/// Outgoing message futures are registered here before sending; every
/// incoming server message is first offered to it via `maybe_resolve_result`.
sorter: ClientMessageSorter,
}
impl<ConnectorType> ButtplugClientEventLoop<ConnectorType>
where
ConnectorType: ButtplugConnector<ButtplugClientMessageV4, ButtplugServerMessageV4> + 'static,
{
/// Assembles an event loop from its connector, channels and shared state.
///
/// Nothing is processed until `run` is awaited. The message sorter starts
/// from its `Default` value; every other field is taken from the arguments
/// unchanged.
pub fn new(
  connected_status: Arc<AtomicBool>,
  connector: ConnectorType,
  from_connector_receiver: mpsc::Receiver<ButtplugServerMessageV4>,
  to_client_sender: broadcast::Sender<ButtplugClientEvent>,
  from_client_sender: ButtplugClientMessageSender,
  from_client_receiver: mpsc::Receiver<ButtplugClientRequest>,
  device_map: Arc<DashMap<u32, ButtplugClientDevice>>,
) -> Self {
  trace!("Creating ButtplugClientEventLoop instance.");
  let sorter = ClientMessageSorter::default();
  // Fields listed in declaration order.
  Self {
    connected_status,
    connector,
    from_connector_receiver,
    device_map,
    to_client_sender,
    from_client_sender,
    from_client_receiver,
    sorter,
  }
}
/// Returns the client device for the index carried by `info`, creating and
/// storing it in the device map if that index is not present yet.
///
/// The lookup and the insertion go through a single `entry` call, so the map
/// is only probed once and no other holder of the shared map can insert the
/// same index between the check and the insert.
///
/// The returned value is a clone of the device stored in the map.
fn create_client_device(&mut self, info: &DeviceMessageInfoV4) -> ButtplugClientDevice {
  debug!(
    "Trying to create a client device from DeviceMessageInfo: {:?}",
    info
  );
  // Set by the closure below, which only runs when the slot was vacant.
  let mut newly_created = false;
  let device = self
    .device_map
    .entry(info.device_index())
    .or_insert_with(|| {
      debug!("Device does not exist, creating new entry.");
      newly_created = true;
      ButtplugClientDevice::new_from_device_info(info, &self.from_client_sender)
    })
    .value()
    .clone();
  // The map guard returned by `or_insert_with` is released at the end of the
  // statement above, so nothing is locked from here on.
  if !newly_created {
    debug!("Device already exists, creating clone.");
  }
  device
}
/// Publishes `event` on the client broadcast channel.
///
/// If the send fails the event is logged as dropped and discarded. The send
/// result is inspected directly rather than checking `receiver_count()`
/// first: receivers can be dropped between such a check and the send, which
/// would turn a missing listener into a panic inside the event loop.
fn send_client_event(&mut self, event: ButtplugClientEvent) {
  trace!("Forwarding event {:?} to client", event);
  // On failure the error hands the unsent event back in its `.0` field.
  if let Err(unsent) = self.to_client_sender.send(event) {
    error!(
      "Client event {:?} dropped, no client event listener available.",
      unsent.0
    );
  }
}
/// Removes the device with `device_index` from the device map and announces
/// its removal.
///
/// Does nothing when the index is not in the map. Otherwise the removed
/// device is marked as disconnected, a `DeviceRemoved` device event is queued
/// on it, and a `ButtplugClientEvent::DeviceRemoved` carrying the device is
/// sent to the client.
///
/// The entry is taken out with a single `remove` call, so there is no window
/// between an existence check and the lookup in which another holder of the
/// shared map could remove the entry and trigger a panic. Note that the map
/// entry is gone before the device events fire.
fn disconnect_device(&mut self, device_index: u32) {
  let device = match self.device_map.remove(&device_index) {
    Some((_, device)) => device,
    None => return,
  };
  device.set_device_connected(false);
  device.queue_event(ButtplugClientDeviceEvent::DeviceRemoved);
  self.send_client_event(ButtplugClientEvent::DeviceRemoved(device));
}
async fn parse_connector_message(&mut self, msg: ButtplugServerMessageV4) {
if self.sorter.maybe_resolve_result(&msg) {
trace!("Message future found, returning");
return;
}
if let Err(e) = msg.is_valid() {
error!("Message not valid: {:?} - Error: {}", msg, e);
self.send_client_event(ButtplugClientEvent::Error(ButtplugError::from(e)));
return;
}
trace!("Message future not found, assuming server event.");
info!("{:?}", msg);
match msg {
ButtplugServerMessageV4::DeviceList(list) => {
trace!("Got device list, devices either added or removed");
for dev in list.devices() {
if self.device_map.contains_key(&dev.1.device_index()) {
continue;
}
trace!("Device added, updating map and sending to client");
let info = dev.1.clone();
let device = self.create_client_device(&info);
self.send_client_event(ButtplugClientEvent::DeviceAdded(device));
}
let new_indexes: Vec<u32> = list.devices().iter().map(|x| x.1.device_index()).collect();
let disconnected_indexes: Vec<u32> = self
.device_map
.iter()
.filter(|x| !new_indexes.contains(x.key()))
.map(|x| *x.key())
.collect();
for index in disconnected_indexes {
trace!("Device removed, updating map and sending to client");
self.disconnect_device(index);
}
}
ButtplugServerMessageV4::ScanningFinished(_) => {
trace!("Scanning finished event received, forwarding to client.");
self.send_client_event(ButtplugClientEvent::ScanningFinished);
}
ButtplugServerMessageV4::InputReading(msg) => {
let device_idx = msg.device_index();
if let Some(device) = self.device_map.get(&device_idx) {
device
.value()
.queue_event(ButtplugClientDeviceEvent::Message(
ButtplugServerMessageV4::from(msg),
));
}
}
ButtplugServerMessageV4::Error(e) => {
self.send_client_event(ButtplugClientEvent::Error(e.into()));
}
_ => error!("Cannot process message, dropping: {:?}", msg),
}
}
async fn send_message(&mut self, mut msg_fut: ButtplugClientMessageFuturePair) {
if let Err(e) = &msg_fut.msg.is_valid() {
error!("Message not valid: {:?} - Error: {}", msg_fut.msg, e);
if let Some(sender) = msg_fut.sender.take() {
let _ = sender.send(Err(ButtplugError::from(e.clone()).into()));
}
return;
}
trace!("Sending message to connector: {:?}", msg_fut.msg);
self.sorter.register_future(&mut msg_fut);
if self.connector.send(msg_fut.msg.clone()).await.is_err() {
error!("Sending message failed, connector most likely no longer connected.");
}
}
/// Executes one request coming from the client API.
///
/// Returns `true` when the event loop should keep running and `false` when
/// it should stop, which is only the case after a disconnect request.
async fn parse_client_request(&mut self, msg: ButtplugClientRequest) -> bool {
  match msg {
    ButtplugClientRequest::Disconnect(sender) => {
      trace!("Client requested disconnect");
      let disconnect_result = self.connector.disconnect().await;
      // The requester may have stopped listening; that is not an error here.
      let _ = sender.send(disconnect_result);
      false
    }
    ButtplugClientRequest::Message(msg_fut) => {
      trace!("Sending message through connector: {:?}", msg_fut.msg);
      self.send_message(msg_fut).await;
      true
    }
    ButtplugClientRequest::HandleDeviceList(device_list) => {
      trace!("Device list received, updating map.");
      for (i, device) in device_list.devices() {
        // Only indexes missing from the map produce a new device and event.
        if !self.device_map.contains_key(i) {
          let added = self.create_client_device(device);
          self.send_client_event(ButtplugClientEvent::DeviceAdded(added));
        }
      }
      self.send_client_event(ButtplugClientEvent::DeviceListReceived);
      true
    }
  }
}
pub async fn run(&mut self) {
debug!("Running client event loop.");
loop {
select! {
event = self.from_connector_receiver.recv() => match event {
None => {
info!("Connector disconnected, exiting loop.");
break;
}
Some(msg) => {
self.parse_connector_message(msg).await;
}
},
client = self.from_client_receiver.recv() => match client {
None => {
info!("Client disconnected, exiting loop.");
break;
}
Some(msg) => {
if !self.parse_client_request(msg).await {
break;
}
}
},
};
}
self
.device_map
.iter()
.for_each(|val| val.value().set_client_connected(false));
let device_indexes: Vec<u32> = self.device_map.iter().map(|k| *k.key()).collect();
device_indexes
.iter()
.for_each(|k| self.disconnect_device(*k));
self.connected_status.store(false, Ordering::Relaxed);
self.send_client_event(ButtplugClientEvent::ServerDisconnect);
debug!("Exiting client event loop.");
}
}