#[macro_use]
extern crate log;
pub mod client_event_loop;
pub mod client_message_sorter;
pub mod connector;
pub mod device;
pub mod serializer;
use buttplug_core::{
connector::{ButtplugConnector, ButtplugConnectorError},
errors::{ButtplugError, ButtplugHandshakeError},
message::{
BUTTPLUG_CURRENT_API_MAJOR_VERSION,
BUTTPLUG_CURRENT_API_MINOR_VERSION,
ButtplugClientMessageV4,
ButtplugServerMessageV4,
InputType,
PingV0,
RequestDeviceListV0,
RequestServerInfoV4,
StartScanningV0,
StopCmdV4,
StopScanningV0,
},
util::stream::convert_broadcast_receiver_to_stream,
};
use client_event_loop::{ButtplugClientEventLoop, ButtplugClientRequest};
use dashmap::DashMap;
pub use device::{ButtplugClientDevice, ButtplugClientDeviceEvent};
use futures::{
Stream,
channel::oneshot,
future::{self, BoxFuture, FutureExt},
};
use log::*;
use std::{
collections::BTreeMap,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
};
use strum_macros::Display;
use thiserror::Error;
use tokio::sync::{Mutex, broadcast, mpsc};
/// Result type used by the client API; the error side is always `ButtplugClientError`.
/// `T` defaults to `()` for operations that only report success or failure.
type ButtplugClientResult<T = ()> = Result<T, ButtplugClientError>;
/// Boxed, `'static` future that resolves to a `ButtplugClientResult<T>`.
type ButtplugClientResultFuture<T = ()> = BoxFuture<'static, ButtplugClientResult<T>>;
/// Result of a request whose successful outcome is a server message.
pub type ButtplugServerMessageResult = ButtplugClientResult<ButtplugServerMessageV4>;
/// Boxed future form of `ButtplugServerMessageResult`.
pub type ButtplugServerMessageResultFuture = ButtplugClientResultFuture<ButtplugServerMessageV4>;
/// Sending half of the oneshot channel on which the outcome of a single request
/// is delivered back to whoever issued it.
pub(crate) type ButtplugServerMessageSender = oneshot::Sender<ButtplugServerMessageResult>;
/// An outgoing client message bundled with the oneshot sender on which the
/// outcome of that message is reported.
pub struct ButtplugClientMessageFuturePair {
// The message to be handed to the event loop.
pub(crate) msg: ButtplugClientMessageV4,
// Reply channel. `new` always stores `Some`.
// NOTE(review): presumably wrapped in `Option` so the consumer can `take()` it
// when replying — confirm against the event loop / message sorter.
pub(crate) sender: Option<ButtplugServerMessageSender>,
}
impl ButtplugClientMessageFuturePair {
  /// Builds a pair from an outgoing message and the sender its reply should be
  /// delivered on. The sender is always stored as `Some`.
  pub fn new(msg: ButtplugClientMessageV4, sender: ButtplugServerMessageSender) -> Self {
    let sender = Some(sender);
    Self { msg, sender }
  }
}
/// All errors the client API can return.
///
/// NOTE(review): both `thiserror::Error` and `strum_macros::Display` are derived
/// here, and only the first two variants carry an `#[error(...)]` attribute. The
/// `Display` output therefore presumably comes from the strum derive (i.e. the
/// variant name) rather than from the wrapped error — confirm this is intended.
#[derive(Debug, Error, Display)]
pub enum ButtplugClientError {
/// A connector-level error; convertible via `From<ButtplugConnectorError>`.
#[error(transparent)]
ButtplugConnectorError(#[from] ButtplugConnectorError),
/// A Buttplug error; convertible via `From<ButtplugError>`.
#[error(transparent)]
ButtplugError(#[from] ButtplugError),
/// Carries a free-form message. Not constructed in this file; judging by the
/// name it reports a failed output-command conversion — TODO confirm at the use site.
ButtplugOutputCommandConversionError(String),
/// Carries the `InputType` concerned. Not constructed in this file; judging by
/// the name it reports that more than one matching input exists — TODO confirm.
ButtplugMultipleInputAvailableError(InputType),
}
/// Events delivered to subscribers of `ButtplugClient::event_stream`.
///
/// None of these are emitted in this file (the event loop receives the broadcast
/// sender); the per-variant notes below are taken from the variant names and
/// payload types only.
#[derive(Clone, Debug)]
pub enum ButtplugClientEvent {
/// Presumably: a scan has finished.
ScanningFinished,
/// Presumably: a device list was received from the server.
DeviceListReceived,
/// Carries the device handle concerned; presumably a newly added device.
DeviceAdded(ButtplugClientDevice),
/// Carries the device handle concerned; presumably a device that went away.
DeviceRemoved(ButtplugClientDevice),
/// Presumably: the server's ping timer expired.
PingTimeout,
/// Presumably: the connection to the server was established.
ServerConnect,
/// Presumably: the connection to the server ended.
ServerDisconnect,
/// Carries a `ButtplugError`.
Error(ButtplugError),
}
// Explicitly marks the event type as `Unpin`.
impl Unpin for ButtplugClientEvent {
}
pub(crate) fn create_boxed_future_client_error<T>(
err: ButtplugError,
) -> ButtplugClientResultFuture<T>
where
T: 'static + Send + Sync,
{
future::ready(Err(ButtplugClientError::ButtplugError(err))).boxed()
}
/// Cloneable handle for submitting requests to the client event loop.
#[derive(Clone, Debug)]
pub(crate) struct ButtplugClientMessageSender {
// Sending half of the request channel that feeds the event loop.
message_sender: mpsc::Sender<ButtplugClientRequest>,
// Connection flag shared with the owning `ButtplugClient`; consulted by
// `send_message` before anything is queued.
connected: Arc<AtomicBool>,
}
impl ButtplugClientMessageSender {
/// Creates a sender around the request channel, sharing (not copying) the
/// given connection flag.
fn new(message_sender: mpsc::Sender<ButtplugClientRequest>, connected: &Arc<AtomicBool>) -> Self {
  let connected = Arc::clone(connected);
  Self {
    message_sender,
    connected,
  }
}
pub fn send_message_to_event_loop(
&self,
msg: ButtplugClientRequest,
) -> BoxFuture<'static, Result<(), ButtplugClientError>> {
let message_sender = self.message_sender.clone();
async move {
message_sender
.send(msg)
.await
.map_err(|_| ButtplugConnectorError::ConnectorChannelClosed)?;
Ok(())
}
.boxed()
}
pub fn send_message(&self, msg: ButtplugClientMessageV4) -> ButtplugServerMessageResultFuture {
if !self.connected.load(Ordering::Relaxed) {
future::ready(Err(ButtplugConnectorError::ConnectorNotConnected.into())).boxed()
} else {
self.send_message_ignore_connect_status(msg)
}
}
pub fn send_message_ignore_connect_status(
&self,
msg: ButtplugClientMessageV4,
) -> ButtplugServerMessageResultFuture {
let (tx, rx) = oneshot::channel();
let internal_msg =
ButtplugClientRequest::Message(ButtplugClientMessageFuturePair::new(msg, tx));
let send_fut = self.send_message_to_event_loop(internal_msg);
async move {
send_fut.await?;
rx.await
.map_err(|_| ButtplugConnectorError::ConnectorChannelClosed)?
}
.boxed()
}
pub fn send_message_expect_ok(&self, msg: ButtplugClientMessageV4) -> ButtplugClientResultFuture {
let send_fut = self.send_message(msg);
async move { send_fut.await.map(|_| ()) }.boxed()
}
}
/// Client handle: owns the request channel to the event loop, the event
/// broadcaster, the shared connection flag and the map of known devices.
pub struct ButtplugClient {
// Name passed to the server in the handshake request.
client_name: String,
// Server name recorded from the handshake reply; `None` until a handshake succeeds.
server_name: Arc<Mutex<Option<String>>>,
// Broadcast sender; `event_stream()` subscribes to it and the event loop gets a clone.
event_stream: broadcast::Sender<ButtplugClientEvent>,
// Handle used to submit requests to the event loop.
message_sender: ButtplugClientMessageSender,
// Receiving half of the request channel. `connect` takes it out of the
// `Option` and hands it to the event loop.
request_receiver: Arc<Mutex<Option<mpsc::Receiver<ButtplugClientRequest>>>>,
// Connection flag shared with `message_sender` and the event loop.
connected: Arc<AtomicBool>,
// Devices keyed by `u32`; shared with the event loop.
// NOTE(review): key is presumably the device index — confirm.
device_map: Arc<DashMap<u32, ButtplugClientDevice>>,
}
impl ButtplugClient {
/// Creates a disconnected client that will identify itself to servers as `name`.
///
/// Sets up the request channel and the event broadcast channel (both with a
/// capacity of 256), an empty device map and a connection flag initialised to
/// `false`. Nothing is sent until `connect` is called.
pub fn new(name: &str) -> Self {
  let connected = Arc::new(AtomicBool::new(false));
  // The initial broadcast receiver is dropped; subscribers are created on demand.
  let (event_stream, _) = broadcast::channel(256);
  let (request_tx, request_rx) = mpsc::channel(256);
  let message_sender = ButtplugClientMessageSender::new(request_tx, &connected);
  Self {
    client_name: String::from(name),
    server_name: Arc::new(Mutex::new(None)),
    event_stream,
    message_sender,
    request_receiver: Arc::new(Mutex::new(Some(request_rx))),
    connected,
    device_map: Arc::new(DashMap::new()),
  }
}
/// Connects to a server through `connector`, spawns the client event loop and
/// runs the handshake.
///
/// # Errors
///
/// - `ConnectorAlreadyConnected` if the client is already flagged as connected.
/// - `ConnectorGenericError` if the request channel has already been handed to
///   an event loop (a client can only be connected once).
/// - Whatever error the connector reports if establishing the connection fails.
///   In that case the request channel is put back, so `connect` may be retried
///   on the same client.
/// - Any error produced by the handshake.
pub async fn connect<ConnectorType>(
  &self,
  mut connector: ConnectorType,
) -> Result<(), ButtplugClientError>
where
  ConnectorType: ButtplugConnector<ButtplugClientMessageV4, ButtplugServerMessageV4> + 'static,
{
  if self.connected() {
    return Err(ButtplugClientError::ButtplugConnectorError(
      ButtplugConnectorError::ConnectorAlreadyConnected,
    ));
  }
  self.device_map.clear();
  // Build the error lazily so the message is only allocated when it is needed.
  let request_receiver = self
    .request_receiver
    .lock()
    .await
    .take()
    .ok_or_else(|| {
      ButtplugConnectorError::ConnectorGenericError(
        "Cannot reconnect - request channel already consumed. Create a new client.".to_string(),
      )
    })?;
  info!("Connecting to server.");
  let (connector_sender, connector_receiver) = mpsc::channel(256);
  if let Err(e) = connector.connect(connector_sender).await {
    error!("Connection to server failed: {:?}", e);
    // The event loop never started, so return the receiver to its slot.
    // Dropping it would close the request channel and make every later
    // `connect` attempt fail.
    *self.request_receiver.lock().await = Some(request_receiver);
    return Err(ButtplugClientError::from(e));
  }
  info!("Connection to server succeeded.");
  let mut client_event_loop = ButtplugClientEventLoop::new(
    self.connected.clone(),
    connector,
    connector_receiver,
    self.event_stream.clone(),
    self.message_sender.clone(),
    request_receiver,
    self.device_map.clone(),
  );
  buttplug_core::spawn!("ButtplugClient event loop", async move {
    client_event_loop.run().await;
  });
  self.run_handshake().await
}
async fn run_handshake(&self) -> ButtplugClientResult {
info!("Running handshake with server.");
let msg = self
.message_sender
.send_message_ignore_connect_status(
RequestServerInfoV4::new(
&self.client_name,
BUTTPLUG_CURRENT_API_MAJOR_VERSION,
BUTTPLUG_CURRENT_API_MINOR_VERSION,
)
.into(),
)
.await?;
debug!("Got ServerInfo return.");
if let ButtplugServerMessageV4::ServerInfo(server_info) = msg {
info!("Connected to {}", server_info.server_name());
*self.server_name.lock().await = Some(server_info.server_name().clone());
self.connected.store(true, Ordering::Relaxed);
let msg = self
.message_sender
.send_message(RequestDeviceListV0::default().into())
.await?;
if let ButtplugServerMessageV4::DeviceList(m) = msg {
self
.message_sender
.send_message_to_event_loop(ButtplugClientRequest::HandleDeviceList(m))
.await?;
}
Ok(())
} else {
self.disconnect().await?;
Err(ButtplugClientError::ButtplugError(
ButtplugHandshakeError::UnexpectedHandshakeMessageReceived(format!("{msg:?}")).into(),
))
}
}
pub fn connected(&self) -> bool {
self.connected.load(Ordering::Relaxed)
}
pub fn disconnect(&self) -> ButtplugClientResultFuture {
if !self.connected() {
return future::ready(Err(ButtplugConnectorError::ConnectorNotConnected.into())).boxed();
}
let (tx, rx) = oneshot::channel();
let msg = ButtplugClientRequest::Disconnect(tx);
let send_fut = self.message_sender.send_message_to_event_loop(msg);
let connected = self.connected.clone();
async move {
connected.store(false, Ordering::Relaxed);
send_fut.await?;
let _ = rx.await;
Ok(())
}
.boxed()
}
pub fn start_scanning(&self) -> ButtplugClientResultFuture {
self
.message_sender
.send_message_expect_ok(StartScanningV0::default().into())
}
pub fn stop_scanning(&self) -> ButtplugClientResultFuture {
self
.message_sender
.send_message_expect_ok(StopScanningV0::default().into())
}
pub fn stop_all_devices(&self) -> ButtplugClientResultFuture {
self
.message_sender
.send_message_expect_ok(StopCmdV4::default().into())
}
/// Returns a new stream of client events.
///
/// Each call creates a fresh subscription to the internal broadcast channel,
/// so every returned stream is independent of the others.
pub fn event_stream(&self) -> impl Stream<Item = ButtplugClientEvent> + use<> {
  let receiver = self.event_stream.subscribe();
  Box::pin(convert_broadcast_receiver_to_stream(receiver))
}
/// Returns a snapshot of the currently known devices, ordered by key.
///
/// The device handles are cloned out of the shared map, so the returned map
/// does not change when devices are later added or removed.
pub fn devices(&self) -> BTreeMap<u32, ButtplugClientDevice> {
  let mut snapshot = BTreeMap::new();
  for entry in self.device_map.iter() {
    snapshot.insert(*entry.key(), entry.value().clone());
  }
  snapshot
}
/// Sends a default `PingV0` request to the server.
///
/// The future resolves to `Ok(())` when the request succeeds (the reply is
/// discarded) and fails with `ConnectorNotConnected` if the client is not connected.
pub fn ping(&self) -> ButtplugClientResultFuture {
  // `send_message_expect_ok` already returns a boxed future; boxing it again
  // would only add an allocation and an extra layer of indirection.
  self
    .message_sender
    .send_message_expect_ok(PingV0::default().into())
}
/// Returns the server name recorded during the handshake, if any.
///
/// This never blocks: it returns `None` both when no name has been recorded
/// and when the internal lock is held by someone else at the moment of the call.
pub fn server_name(&self) -> Option<String> {
  self
    .server_name
    .try_lock()
    .ok()
    .and_then(|guard| guard.clone())
}
}