use super::{
client_message_sorter::ClientMessageSorter,
device::{ButtplugClientDevice, ButtplugClientDeviceEvent},
ButtplugClientError,
ButtplugClientEvent,
ButtplugClientMessageFuturePair,
};
use crate::{
connector::{ButtplugConnector, ButtplugConnectorStateShared},
core::{
errors::ButtplugServerError,
messages::{
ButtplugCurrentSpecClientMessage,
ButtplugCurrentSpecServerMessage,
DeviceList,
DeviceMessageInfo,
},
},
};
use async_channel::{bounded, Receiver, Sender};
use broadcaster::BroadcastChannel;
use dashmap::DashMap;
use futures::{Future, FutureExt, StreamExt};
use std::{
hash::{Hash, Hasher},
sync::Arc,
};
use tracing_futures::Instrument;
/// Requests that can be queued for the client event loop.
///
/// Each variant is consumed by `ButtplugClientEventLoop::parse_client_request`.
pub(super) enum ButtplugClientRequest {
  /// Ask the loop to disconnect the connector. The result of the disconnect
  /// call is handed back through the shared state via `set_reply`, after which
  /// the loop stops.
  Disconnect(ButtplugConnectorStateShared),
  /// A device list to merge into the device map. Devices whose index is not
  /// yet in the map are added and announced with a `DeviceAdded` event.
  HandleDeviceList(DeviceList),
  /// An outgoing message paired with the future awaiting its reply. The pair
  /// is registered with the message sorter and the message is sent through the
  /// connector.
  Message(ButtplugClientMessageFuturePair),
}
/// Record the event loop keeps for each known device.
///
/// Both fields are reference counted.
pub(super) struct ButtplugClientDeviceInternal {
  /// Device description as received from the server.
  pub device: Arc<DeviceMessageInfo>,
  /// Broadcast channel for this device's events; a clone of it is handed to
  /// every `ButtplugClientDevice` created for the device.
  pub channel: Arc<BroadcastChannel<ButtplugClientDeviceEvent>>,
}
/// Marker impl: the `PartialEq` comparison on the device index is a full
/// equivalence relation.
impl Eq for ButtplugClientDeviceInternal {}
impl ButtplugClientDeviceInternal {
  /// Builds a record from an owned device description and its event channel.
  ///
  /// Both arguments are moved into fresh `Arc`s.
  pub fn new(
    device: DeviceMessageInfo,
    channel: BroadcastChannel<ButtplugClientDeviceEvent>,
  ) -> Self {
    let device = Arc::new(device);
    let channel = Arc::new(channel);
    Self { device, channel }
  }
}
/// Two records are equal when they refer to the same device index; no other
/// field takes part in the comparison.
impl PartialEq for ButtplugClientDeviceInternal {
  fn eq(&self, other: &Self) -> bool {
    let own_index = &self.device.device_index;
    let their_index = &other.device.device_index;
    own_index == their_index
  }
}
/// Hashes a record by its device index only.
///
/// Equality for this type compares nothing but `device_index`, and `Hash`
/// requires that equal values produce equal hashes. Feeding any further field
/// (such as the device name) into the hasher would let two equal records hash
/// differently and break lookups in hashed collections.
impl Hash for ButtplugClientDeviceInternal {
  fn hash<H: Hasher>(&self, state: &mut H) {
    self.device.device_index.hash(state);
  }
}
/// State owned by the client event loop.
///
/// The loop sits between the client API and a connector: it forwards client
/// requests to the connector and turns connector messages into reply
/// resolutions or client events.
struct ButtplugClientEventLoop<ConnectorType>
where
  ConnectorType:
    ButtplugConnector<ButtplugCurrentSpecClientMessage, ButtplugCurrentSpecServerMessage> + 'static,
{
  /// Known devices keyed by device index.
  device_map: Arc<DashMap<u32, ButtplugClientDeviceInternal>>,
  /// Broadcast channel on which client events are published.
  event_sender: BroadcastChannel<ButtplugClientEvent>,
  /// Sender side of the request queue; cloned into every client device the
  /// loop creates.
  client_sender: Sender<ButtplugClientRequest>,
  /// Receiver side of the request queue, polled by `run`.
  client_receiver: Receiver<ButtplugClientRequest>,
  /// Connector used to send messages and to disconnect.
  connector: ConnectorType,
  /// Incoming messages (or errors) produced by the connector, polled by `run`.
  connector_receiver: Receiver<Result<ButtplugCurrentSpecServerMessage, ButtplugServerError>>,
  /// Matches incoming messages against futures registered for sent messages.
  sorter: ClientMessageSorter,
}
impl<ConnectorType> ButtplugClientEventLoop<ConnectorType>
where
ConnectorType:
ButtplugConnector<ButtplugCurrentSpecClientMessage, ButtplugCurrentSpecServerMessage> + 'static,
{
pub fn new(
connector: ConnectorType,
connector_receiver: Receiver<Result<ButtplugCurrentSpecServerMessage, ButtplugServerError>>,
event_sender: BroadcastChannel<ButtplugClientEvent>,
client_sender: Sender<ButtplugClientRequest>,
client_receiver: Receiver<ButtplugClientRequest>,
device_map: Arc<DashMap<u32, ButtplugClientDeviceInternal>>,
) -> Self {
trace!("Creating ButtplugClientEventLoop instance.");
Self {
device_map,
client_sender,
client_receiver,
event_sender,
connector_receiver,
connector,
sorter: ClientMessageSorter::default(),
}
}
fn create_client_device(&mut self, info: &DeviceMessageInfo) -> ButtplugClientDevice {
debug!(
"Trying to create a client device from DeviceMessageInfo: {:?}",
info
);
match self.device_map.get(&info.device_index) {
Some(dev) => {
debug!("Device already exists, creating clone.");
ButtplugClientDevice::from((
&*dev.device,
self.client_sender.clone(),
(*dev.channel).clone(),
))
}
None => {
debug!("Device does not exist, creating new entry.");
let channel = BroadcastChannel::new();
let device =
ButtplugClientDevice::from((info, self.client_sender.clone(), channel.clone()));
self.device_map.insert(
info.device_index,
ButtplugClientDeviceInternal::new(info.clone(), channel),
);
device
}
}
}
async fn send_client_event(&mut self, event: &ButtplugClientEvent) {
trace!("Forwarding event to client");
self.event_sender.send(event).await.unwrap();
self.event_sender.recv().await.unwrap();
}
async fn parse_connector_message(
&mut self,
msg_result: Result<ButtplugCurrentSpecServerMessage, ButtplugServerError>,
) {
if self.sorter.maybe_resolve_result(&msg_result).await {
trace!("Message future found, returning");
return;
}
match msg_result {
Ok(msg) => {
trace!("Message received from connector, sending to clients.");
trace!("Message future not found, assuming server event.");
match &msg {
ButtplugCurrentSpecServerMessage::DeviceAdded(dev) => {
trace!("Device added, updating map and sending to client");
let info = DeviceMessageInfo::from(dev);
let device = self.create_client_device(&info);
self
.send_client_event(&ButtplugClientEvent::DeviceAdded(device))
.await;
}
ButtplugCurrentSpecServerMessage::DeviceRemoved(dev) => {
if self.device_map.contains_key(&dev.device_index) {
trace!("Device removed, updating map and sending to client");
let info = (*self.device_map.get(&dev.device_index).unwrap().device).clone();
self.device_map.remove(&dev.device_index);
self
.send_client_event(&ButtplugClientEvent::DeviceRemoved(info))
.await;
} else {
error!("Received DeviceRemoved for non-existent device index");
}
}
ButtplugCurrentSpecServerMessage::ScanningFinished(_) => {
trace!("Scanning finished event received, forwarding to client.");
self
.send_client_event(&ButtplugClientEvent::ScanningFinished)
.await;
}
_ => error!("Cannot process message, dropping: {:?}", msg),
}
}
Err(err) => {
self
.send_client_event(&ButtplugClientEvent::Error(err.into()))
.await;
}
}
}
async fn send_message(&mut self, mut msg_fut: ButtplugClientMessageFuturePair) {
trace!("Sending message to connector: {:?}", msg_fut.msg);
self.sorter.register_future(&mut msg_fut);
self.connector.send(msg_fut.msg).await.unwrap();
}
async fn parse_client_request(&mut self, msg: ButtplugClientRequest) -> bool {
match msg {
ButtplugClientRequest::Message(msg_fut) => {
trace!("Sending message through connector: {:?}", msg_fut.msg);
self.send_message(msg_fut).await;
true
}
ButtplugClientRequest::Disconnect(state) => {
trace!("Client requested disconnect");
state.set_reply(self.connector.disconnect().await);
false
}
ButtplugClientRequest::HandleDeviceList(device_list) => {
trace!("Device list received, updating map.");
for d in &device_list.devices {
if self.device_map.contains_key(&d.device_index) {
continue;
}
let device = self.create_client_device(&d);
self
.send_client_event(&ButtplugClientEvent::DeviceAdded(device))
.await;
}
true
}
}
}
pub async fn run(&mut self) {
debug!("Running client event loop.");
let mut client_receiver = self.client_receiver.clone();
let mut connector_receiver = self.connector_receiver.clone();
loop {
select! {
event = connector_receiver.next().fuse() => match event {
None => {
info!("Connector disconnected, exiting loop.");
return;
}
Some(msg) => {
self.parse_connector_message(msg).await;
}
},
client = client_receiver.next().fuse() => match client {
None => {
info!("Client disconnected, exiting loop.");
return;
}
Some(msg) => {
if !self.parse_client_request(msg).await {
break;
}
}
},
};
}
debug!("Exiting client event loop.");
}
}
/// Builds the client event loop and the handles needed to talk to it.
///
/// Returns, in order:
/// 1. the future that runs the loop and resolves to `Ok(())` once it stops,
/// 2. the shared device map the loop maintains,
/// 3. a sender for queueing `ButtplugClientRequest`s (queue capacity 256),
/// 4. the stream on which client events are published.
///
/// Nothing runs until the returned future is polled.
pub(super) fn client_event_loop(
  connector: impl ButtplugConnector<ButtplugCurrentSpecClientMessage, ButtplugCurrentSpecServerMessage>
    + 'static,
  connector_receiver: Receiver<Result<ButtplugCurrentSpecServerMessage, ButtplugServerError>>,
) -> (
  impl Future<Output = Result<(), ButtplugClientError>>,
  Arc<DashMap<u32, ButtplugClientDeviceInternal>>,
  Sender<ButtplugClientRequest>,
  // This needs clone internally, as the client will make multiple copies.
  impl StreamExt<Item = ButtplugClientEvent> + Clone,
) {
  trace!("Creating client event loop future.");
  let event_channel = BroadcastChannel::new();
  let shared_devices = Arc::new(DashMap::new());
  let (request_tx, request_rx) = bounded(256);

  // The loop gets its own handles; the originals go back to the caller.
  let mut event_loop = ButtplugClientEventLoop::new(
    connector,
    connector_receiver,
    event_channel.clone(),
    request_tx.clone(),
    request_rx,
    Arc::clone(&shared_devices),
  );

  let loop_future = Box::pin(async move {
    info!("Starting client event loop.");
    event_loop
      .run()
      .instrument(tracing::info_span!("Client Event Loop"))
      .await;
    info!("Stopping client event loop.");
    Ok(())
  });

  (loop_future, shared_devices, request_tx, event_channel)
}