mod client_message_sorter;
pub mod device;
pub mod internal;
use device::ButtplugClientDevice;
use internal::{client_event_loop, ButtplugClientDeviceInternal, ButtplugClientRequest};
use crate::{
connector::{ButtplugConnector, ButtplugConnectorError, ButtplugConnectorFuture},
core::{
errors::{ButtplugError, ButtplugHandshakeError},
messages::{
ButtplugCurrentSpecClientMessage,
ButtplugCurrentSpecServerMessage,
ButtplugMessageSpecVersion,
DeviceMessageInfo,
RequestDeviceList,
RequestServerInfo,
StartScanning,
StopAllDevices,
StopScanning,
},
},
util::{
async_manager,
future::{ButtplugFuture, ButtplugFutureStateShared},
},
};
use async_channel::Sender;
use dashmap::DashMap;
use futures::{
future::{self, BoxFuture},
FutureExt,
StreamExt,
};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use thiserror::Error;
use tracing::{span::Span, Level};
use tracing_futures::Instrument;
type ButtplugInternalClientResult<T = ()> = Result<T, ButtplugClientError>;
type ButtplugClientResult<T = ()> = Result<T, ButtplugClientError>;
type ButtplugClientResultFuture<T = ()> = BoxFuture<'static, ButtplugClientResult<T>>;
pub type ButtplugInternalClientMessageResult =
ButtplugInternalClientResult<ButtplugCurrentSpecServerMessage>;
pub type ButtplugInternalClientMessageResultFuture =
BoxFuture<'static, ButtplugInternalClientMessageResult>;
pub(crate) type ButtplugClientMessageStateShared =
ButtplugFutureStateShared<ButtplugInternalClientMessageResult>;
pub(crate) type ButtplugClientMessageFuture = ButtplugFuture<ButtplugInternalClientMessageResult>;
pub struct ButtplugClientMessageFuturePair {
pub msg: ButtplugCurrentSpecClientMessage,
pub waker: ButtplugClientMessageStateShared,
}
impl ButtplugClientMessageFuturePair {
pub fn new(
msg: ButtplugCurrentSpecClientMessage,
waker: ButtplugClientMessageStateShared,
) -> Self {
Self { msg, waker }
}
}
#[derive(Debug, Error)]
pub enum ButtplugClientError {
#[error(transparent)]
ButtplugConnectorError(#[from] ButtplugConnectorError),
#[error(transparent)]
ButtplugError(#[from] ButtplugError),
}
#[derive(Clone)]
pub enum ButtplugClientEvent {
ScanningFinished,
DeviceAdded(ButtplugClientDevice),
DeviceRemoved(DeviceMessageInfo),
PingTimeout,
ServerDisconnect,
Error(ButtplugError),
}
pub struct ButtplugClient {
pub client_name: String,
pub server_name: String,
message_sender: Sender<ButtplugClientRequest>,
connected: Arc<AtomicBool>,
_client_span: Span,
device_map: Arc<DashMap<u32, ButtplugClientDeviceInternal>>,
}
unsafe impl Send for ButtplugClient {
}
unsafe impl Sync for ButtplugClient {
}
impl ButtplugClient {
pub fn connect<ConnectorType>(
name: &str,
mut connector: ConnectorType,
) -> BoxFuture<
'static,
Result<(Self, impl StreamExt<Item = ButtplugClientEvent>), ButtplugClientError>,
>
where
ConnectorType: ButtplugConnector<ButtplugCurrentSpecClientMessage, ButtplugCurrentSpecServerMessage>
+ 'static,
{
trace!("run() called, creating client future.");
let client_name = name.to_string();
Box::pin(async move {
let span = span!(Level::INFO, "Client");
let _client_span = span.enter();
info!("Connecting to server.");
let connector_receiver = connector.connect().await.map_err(|e| {
error!("Connection to server failed: {:?}", e);
let err: ButtplugClientError = e.into();
err
})?;
info!("Connection to server succeeded.");
let (client_event_loop_fut, device_map_reader, message_sender, event_channel) =
client_event_loop(connector, connector_receiver);
let client_event_receiver = event_channel.clone();
let mut disconnect_event_receiver = event_channel.clone();
let connected_status = Arc::new(AtomicBool::new(true));
let connected_status_clone = connected_status.clone();
async_manager::spawn(
async move {
let disconnect_fut = async move {
loop {
if let Some(ButtplugClientEvent::ServerDisconnect) =
disconnect_event_receiver.next().await
{
connected_status.store(false, Ordering::SeqCst);
break;
}
}
Result::<(), ButtplugClientError>::Ok(())
}
.instrument(tracing::info_span!("Client Disconnect Loop"));
select! {
_ = client_event_loop_fut.fuse() => (),
_ = disconnect_fut.fuse() => (),
};
}
.instrument(tracing::info_span!("Client Loop Span")),
)
.unwrap();
let client = ButtplugClient::create_client(
&client_name,
connected_status_clone,
message_sender,
device_map_reader,
span.clone(),
)
.await?;
Ok((client, client_event_receiver))
})
}
#[cfg(feature = "server")]
pub fn connect_in_process(
name: &str,
options: &crate::server::ButtplugServerOptions,
) -> BoxFuture<
'static,
Result<(Self, impl StreamExt<Item = ButtplugClientEvent>), ButtplugClientError>,
> {
use crate::connector::ButtplugInProcessClientConnector;
let connector = match ButtplugInProcessClientConnector::new_with_options(options) {
Ok(conn) => conn,
Err(err) => return Box::pin(future::ready(Err(ButtplugClientError::ButtplugError(err)))),
};
#[cfg(feature = "btleplug-manager")]
{
use crate::server::comm_managers::btleplug::BtlePlugCommunicationManager;
connector
.server_ref()
.add_comm_manager::<BtlePlugCommunicationManager>()
.unwrap();
}
#[cfg(feature = "serial-manager")]
{
use crate::server::comm_managers::serialport::SerialPortCommunicationManager;
connector
.server_ref()
.add_comm_manager::<SerialPortCommunicationManager>()
.unwrap();
}
#[cfg(feature = "lovense-dongle-manager")]
{
use crate::server::comm_managers::lovense_dongle::{
LovenseHIDDongleCommunicationManager,
LovenseSerialDongleCommunicationManager,
};
connector
.server_ref()
.add_comm_manager::<LovenseHIDDongleCommunicationManager>()
.unwrap();
connector
.server_ref()
.add_comm_manager::<LovenseSerialDongleCommunicationManager>()
.unwrap();
}
#[cfg(all(feature = "xinput-manager", target_os = "windows"))]
{
use crate::server::comm_managers::xinput::XInputDeviceCommunicationManager;
connector
.server_ref()
.add_comm_manager::<XInputDeviceCommunicationManager>()
.unwrap();
}
ButtplugClient::connect(name, connector)
}
async fn create_client(
client_name: &str,
connected_status: Arc<AtomicBool>,
message_sender: Sender<ButtplugClientRequest>,
device_map: Arc<DashMap<u32, ButtplugClientDeviceInternal>>,
span: Span,
) -> Result<Self, ButtplugClientError> {
let mut client = ButtplugClient {
client_name: client_name.to_string(),
server_name: String::new(),
message_sender,
connected: connected_status,
device_map,
_client_span: span,
};
info!("Running handshake with server.");
let msg = client
.send_message(
RequestServerInfo::new(&client.client_name, ButtplugMessageSpecVersion::Version2).into(),
)
.await?;
debug!("Got ServerInfo return.");
if let ButtplugCurrentSpecServerMessage::ServerInfo(server_info) = msg {
info!("Connected to {}", server_info.server_name);
client.server_name = server_info.server_name;
let msg = client
.send_message(RequestDeviceList::default().into())
.await?;
if let ButtplugCurrentSpecServerMessage::DeviceList(m) = msg {
client
.send_internal_message(ButtplugClientRequest::HandleDeviceList(m))
.await?;
}
Ok(client)
} else {
client.disconnect().await?;
Err(ButtplugClientError::ButtplugError(
ButtplugHandshakeError::UnexpectedHandshakeMessageReceived(format!("{:?}", msg)).into(),
))
}
}
pub fn connected(&self) -> bool {
self.connected.load(Ordering::SeqCst)
}
pub fn disconnect(&self) -> ButtplugClientResultFuture {
let fut = ButtplugConnectorFuture::default();
let msg = ButtplugClientRequest::Disconnect(fut.get_state_clone());
let send_fut = self.send_internal_message(msg);
let connected = self.connected.clone();
Box::pin(async move {
send_fut.await?;
connected.store(false, Ordering::SeqCst);
Ok(())
})
}
pub fn start_scanning(&self) -> ButtplugClientResultFuture {
self.send_message_expect_ok(StartScanning::default().into())
}
pub fn stop_scanning(&self) -> ButtplugClientResultFuture {
self.send_message_expect_ok(StopScanning::default().into())
}
pub fn stop_all_devices(&self) -> ButtplugClientResultFuture {
self.send_message_expect_ok(StopAllDevices::default().into())
}
fn send_internal_message(
&self,
msg: ButtplugClientRequest,
) -> BoxFuture<'static, Result<(), ButtplugConnectorError>> {
if !self.connected.load(Ordering::SeqCst) {
return Box::pin(future::ready(Err(
ButtplugConnectorError::ConnectorNotConnected,
)));
}
let message_sender = self.message_sender.clone();
Box::pin(async move {
message_sender
.send(msg)
.await
.map_err(|_| ButtplugConnectorError::ConnectorChannelClosed)?;
Ok(())
})
}
fn send_message(
&self,
msg: ButtplugCurrentSpecClientMessage,
) -> ButtplugInternalClientMessageResultFuture {
let fut = ButtplugClientMessageFuture::default();
let internal_msg = ButtplugClientRequest::Message(ButtplugClientMessageFuturePair::new(
msg,
fut.get_state_clone(),
));
let send_fut = self.send_internal_message(internal_msg);
Box::pin(async move {
send_fut.await?;
fut.await
})
}
fn send_message_expect_ok(
&self,
msg: ButtplugCurrentSpecClientMessage,
) -> ButtplugClientResultFuture {
let send_fut = self.send_message(msg);
Box::pin(async move { send_fut.await.map(|_| ()).map_err(|err| err) })
}
pub fn devices(&self) -> Vec<ButtplugClientDevice> {
info!("Request devices from inner loop!");
let mut device_clones = vec![];
for device in self.device_map.iter() {
device_clones.push(ButtplugClientDevice::from((
&(*device.device),
self.message_sender.clone(),
(*device.channel).clone(),
)));
}
device_clones
}
}