pub mod connector;
pub mod device;
pub mod internal;
mod messagesorter;
use connector::{
ButtplugClientConnectionFuture, ButtplugClientConnector, ButtplugClientConnectorError,
};
use device::ButtplugClientDevice;
use internal::{
ButtplugClientInternalLoop, ButtplugClientMessageFuture, ButtplugInternalClientMessage,
};
use crate::core::{
errors::{ButtplugError, ButtplugHandshakeError, ButtplugMessageError},
messages::{ButtplugMessage, ButtplugMessageUnion, RequestServerInfo, StartScanning},
};
use async_std::{
future::join,
sync::{channel, Receiver, Sender},
};
use futures::{Future, StreamExt};
use std::error::Error;
use std::fmt;
/// Events handed back to callers of `ButtplugClient::wait_for_event`.
///
/// The device variants carry the client-side `ButtplugClientDevice` handle; the
/// remaining variants carry no payload.
#[derive(Clone)]
pub enum ButtplugClientEvent {
// NOTE(review): presumably signals that a device scan has ended — confirm
// against the code that produces this event.
ScanningFinished,
// A device became available; carries the handle built for it.
DeviceAdded(ButtplugClientDevice),
// NOTE(review): presumably a previously reported device went away — confirm.
DeviceRemoved(ButtplugClientDevice),
// NOTE(review): presumably a server log notification; no payload is attached.
Log,
// NOTE(review): presumably the server's ping timer expired — confirm.
PingTimeout,
// NOTE(review): presumably the connection to the server was lost — confirm.
ServerDisconnect,
}
/// Error type returned by the client API in this module.
///
/// It wraps one of two underlying error kinds so callers get a single type.
#[derive(Debug, Clone)]
pub enum ButtplugClientError {
// Connector-level failure: used when establishing the connection fails and
// when a message is sent while the client is not connected.
ButtplugClientConnectorError(ButtplugClientConnectorError),
// Protocol-level failure, e.g. an unexpected reply during the handshake or a
// non-Ok reply where Ok was required.
ButtplugError(ButtplugError),
}
impl fmt::Display for ButtplugClientError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
ButtplugClientError::ButtplugError(ref e) => e.fmt(f),
ButtplugClientError::ButtplugClientConnectorError(ref e) => e.fmt(f),
}
}
}
impl Error for ButtplugClientError {
fn description(&self) -> &str {
match *self {
ButtplugClientError::ButtplugError(ref e) => e.description(),
ButtplugClientError::ButtplugClientConnectorError(ref e) => e.description(),
}
}
fn source(&self) -> Option<&(dyn Error + 'static)> {
None
}
}
/// Client-side handle used to talk to a Buttplug server.
///
/// Instances are created by `ButtplugClient::run`, which also builds the
/// channels stored here and passes their opposite ends to the internal loop.
/// Because the type derives `Clone`, a clone gets its own copy of
/// `server_name`, `devices` and `connected`; later changes made through one
/// handle are not reflected in the other.
#[derive(Clone)]
pub struct ButtplugClient {
// Name of this client; sent to the server in the handshake request.
pub client_name: String,
// Server name taken from the ServerInfo reply; `None` until a handshake
// has succeeded.
pub server_name: Option<String>,
// Device handles collected while processing DeviceList / DeviceAdded messages.
devices: Vec<ButtplugClientDevice>,
// Sending half of the channel whose receiving half is given to the internal
// loop; also cloned into every device handle that is created.
message_sender: Sender<ButtplugInternalClientMessage>,
// Receiving half of the channel whose sending half is given to the internal
// loop; read by `wait_for_event`.
event_receiver: Receiver<ButtplugMessageUnion>,
// Local connection flag: set once the connector reports success, cleared by
// `disconnect`.
connected: bool,
}
// Manually asserts that `ButtplugClient` may be shared between and moved across
// threads.
// NOTE(review): no safety invariant justifying these impls is visible in this
// file. Confirm that every field (in particular `ButtplugClientDevice` and the
// payload of `ButtplugInternalClientMessage`) is actually thread-safe; if they
// are, the auto traits would apply and these impls could be dropped.
unsafe impl Sync for ButtplugClient {}
unsafe impl Send for ButtplugClient {}
impl ButtplugClient {
/// Creates a client named `name`, passes it to `func`, and returns a future
/// that drives the application future produced by `func` together with the
/// client's internal event loop.
///
/// `func` is invoked immediately when `run` is called (not when the returned
/// future is first polled); only the future it returns is deferred. The
/// returned future completes once both the application future and the internal
/// loop have completed.
pub fn run<F, T>(name: &str, func: F) -> impl Future
where
    F: FnOnce(ButtplugClient) -> T,
    T: Future,
{
    debug!("Run called!");
    // Two bounded channels (capacity 256): internal loop -> client for
    // events, client -> internal loop for outgoing messages.
    let (event_sender, event_receiver) = channel(256);
    let (message_sender, message_receiver) = channel(256);
    let app_future = func(ButtplugClient {
        client_name: name.to_string(),
        server_name: None,
        devices: vec![],
        event_receiver,
        message_sender,
        connected: false,
    });
    async move {
        // The loop object is built lazily, on first poll of the returned future.
        let mut loop_state = ButtplugClientInternalLoop::new(event_sender, message_receiver);
        let loop_future = loop_state.event_loop();
        join!(app_future, loop_future).await;
    }
}
/// Connects to a server through `connector` and then performs the handshake.
///
/// The connector is boxed and handed to the internal loop together with the
/// shared state of a connection future; this method then awaits that future.
///
/// Returns `None` on success. Returns
/// `Some(ButtplugClientError::ButtplugClientConnectorError)` when the
/// connection future resolves to an error, or whatever `handshake` returns
/// otherwise.
pub async fn connect(
    &mut self,
    connector: impl ButtplugClientConnector + 'static,
) -> Option<ButtplugClientError> {
    debug!("Running client connection.");
    let connection_future = ButtplugClientConnectionFuture::default();
    let connect_request = ButtplugInternalClientMessage::Connect(
        Box::new(connector),
        connection_future.get_state_clone(),
    );
    self.send_internal_message(connect_request).await;
    debug!("Waiting on internal loop to connect");
    match connection_future.await {
        Some(err) => Some(ButtplugClientError::ButtplugClientConnectorError(err)),
        None => {
            info!("Client connected to server, running handshake.");
            // The flag has to be set before the handshake: `send_message`
            // refuses to send while the client is marked as disconnected.
            self.connected = true;
            self.handshake().await
        }
    }
}
/// Performs the protocol handshake: sends `RequestServerInfo` (with this
/// client's name and message version 1) and expects `ServerInfo` back.
///
/// On success the server's name is stored in `self.server_name` and `None` is
/// returned.
///
/// Returns `Some(error)` when:
/// - the request could not be sent (the error from `send_message` is passed
///   through unchanged rather than being reported as success), or
/// - the reply is anything other than `ServerInfo`, in which case a
///   `ButtplugHandshakeError` is produced.
async fn handshake(&mut self) -> Option<ButtplugClientError> {
    info!("Running handshake with server.");
    let request = RequestServerInfo::new(&self.client_name, 1).as_union();
    match self.send_message(&request).await {
        Ok(ButtplugMessageUnion::ServerInfo(server_info)) => {
            debug!("Got ServerInfo return.");
            info!("Connected to {}", server_info.server_name);
            self.server_name = Some(server_info.server_name);
            None
        }
        Ok(_) => Some(ButtplugClientError::ButtplugError(
            ButtplugError::ButtplugHandshakeError(ButtplugHandshakeError {
                message: "Did not receive expected ServerInfo or Error messages."
                    .to_string(),
            }),
        )),
        // A failed send means the handshake never happened; surface it.
        Err(err) => Some(err),
    }
}
/// Returns the client's local connection flag.
///
/// The flag is set by `connect` once the connector reports success and is
/// cleared by `disconnect`.
pub fn connected(&self) -> bool {
self.connected
}
/// Marks the client as disconnected and returns `None`.
///
/// Only the local `connected` flag is cleared here; this method sends nothing
/// to the internal loop.
/// NOTE(review): presumably the connector should also be told to shut down —
/// confirm whether that is handled elsewhere.
pub fn disconnect(&mut self) -> Option<ButtplugClientError> {
self.connected = false;
None
}
/// Sends a `StartScanning` message to the server and requires an `Ok` reply.
///
/// Returns `None` when the server answered `Ok`, otherwise the error produced
/// by `send_message_expect_ok`.
pub async fn start_scanning(&mut self) -> Option<ButtplugClientError> {
    let scan_request = ButtplugMessageUnion::StartScanning(StartScanning::new());
    self.send_message_expect_ok(&scan_request).await
}
/// Queues `msg` on the channel whose receiving end was handed to the internal
/// loop in `run`, awaiting until the send completes.
async fn send_internal_message(&mut self, msg: ButtplugInternalClientMessage) {
self.message_sender.send(msg).await;
}
/// Sends a protocol message through the internal loop and waits for the
/// message that the associated future resolves to.
///
/// A clone of `msg` is paired with the shared state of a fresh
/// `ButtplugClientMessageFuture`; awaiting that future yields the returned
/// message.
///
/// # Errors
/// Returns a `ButtplugClientConnectorError` ("Client not Connected.") without
/// sending anything when the client is not marked as connected.
async fn send_message(
    &mut self,
    msg: &ButtplugMessageUnion,
) -> Result<ButtplugMessageUnion, ButtplugClientError> {
    if !self.connected {
        let not_connected = ButtplugClientConnectorError {
            message: "Client not Connected.".to_string(),
        };
        return Err(ButtplugClientError::ButtplugClientConnectorError(
            not_connected,
        ));
    }
    let reply_future = ButtplugClientMessageFuture::default();
    let payload = (msg.clone(), reply_future.get_state_clone());
    self.send_internal_message(ButtplugInternalClientMessage::Message(payload))
        .await;
    let reply = reply_future.await;
    Ok(reply)
}
/// Sends `msg` and checks that the reply is an `Ok` message.
///
/// Returns `None` when the reply is `Ok`.
///
/// Returns `Some(error)` when:
/// - `send_message` fails (for example because the client is not connected);
///   that error is passed through instead of panicking, or
/// - the reply is any other message, reported as a `ButtplugMessageError`.
async fn send_message_expect_ok(
    &mut self,
    msg: &ButtplugMessageUnion,
) -> Option<ButtplugClientError> {
    match self.send_message(msg).await {
        Ok(ButtplugMessageUnion::Ok(_)) => None,
        Ok(_) => Some(ButtplugClientError::ButtplugError(
            ButtplugError::ButtplugMessageError(ButtplugMessageError {
                message: "Got non-Ok message back".to_string(),
            }),
        )),
        Err(err) => Some(err),
    }
}
/// Waits for the next message on the event channel and translates it into
/// zero or more client events.
///
/// - `ScanningFinished` yields `ButtplugClientEvent::ScanningFinished`.
/// - `DeviceList` yields one `DeviceAdded` event per listed device.
/// - `DeviceAdded` yields a single `DeviceAdded` event.
/// - `DeviceRemoved` currently yields nothing.
/// - Any other message is logged and ignored (empty result) rather than
///   aborting the caller with a panic.
/// - If the event channel has been closed, the client is marked as
///   disconnected and a single `ServerDisconnect` event is returned.
///
/// Every device that is reported is also remembered in `self.devices`.
pub async fn wait_for_event(&mut self) -> Vec<ButtplugClientEvent> {
    debug!("Client waiting for event.");
    let mut events = vec![];
    let incoming = match self.event_receiver.next().await {
        Some(msg) => msg,
        None => {
            // The stream ends only when every sender is gone. The sender
            // created in `run` is handed to the internal loop, so this
            // presumably means the loop has shut down and nothing will
            // answer further messages.
            info!("Client event channel closed.");
            self.connected = false;
            events.push(ButtplugClientEvent::ServerDisconnect);
            return events;
        }
    };
    match incoming {
        ButtplugMessageUnion::ScanningFinished(_) => {
            events.push(ButtplugClientEvent::ScanningFinished);
        }
        ButtplugMessageUnion::DeviceList(list) => {
            for info in list.devices.iter() {
                let device = ButtplugClientDevice::from((info, self.message_sender.clone()));
                self.devices.push(device.clone());
                events.push(ButtplugClientEvent::DeviceAdded(device));
            }
        }
        ButtplugMessageUnion::DeviceAdded(added) => {
            info!("Got a device added message!");
            let device = ButtplugClientDevice::from((&added, self.message_sender.clone()));
            self.devices.push(device.clone());
            info!("Sending to observers!");
            events.push(ButtplugClientEvent::DeviceAdded(device));
            info!("Observers sent!");
        }
        // NOTE(review): removal is not yet reflected in `self.devices` and no
        // `DeviceRemoved` event is emitted — the lookup needed to map the
        // message onto a stored device is not visible in this file.
        ButtplugMessageUnion::DeviceRemoved(_) => {}
        // Unknown traffic from the server must not take the caller down.
        _ => info!("Ignoring unhandled incoming message."),
    }
    events
}
}
#[cfg(test)]
mod test {
    use super::ButtplugClient;
    use crate::client::connector::ButtplugEmbeddedClientConnector;
    use async_std::task;
    use env_logger;

    /// Connects `client` to an embedded "Test Server" and asserts that the
    /// connection succeeded. Also initialises test logging (errors from repeated
    /// initialisation are ignored).
    async fn connect_test_client(client: &mut ButtplugClient) {
        let _ = env_logger::builder().is_test(true).try_init();
        let connector = ButtplugEmbeddedClientConnector::new("Test Server", 0);
        let connect_error = client.connect(connector).await;
        assert!(connect_error.is_none());
        assert!(client.connected());
    }

    /// A freshly connected client reports itself as connected.
    #[test]
    fn test_connect_status() {
        task::block_on(async {
            let session = ButtplugClient::run("Test Client", |mut client| async move {
                connect_test_client(&mut client).await;
            });
            session.await;
        });
    }

    /// `disconnect` succeeds and clears the connected flag.
    #[test]
    fn test_disconnect_status() {
        task::block_on(async {
            let session = ButtplugClient::run("Test Client", |mut client| async move {
                connect_test_client(&mut client).await;
                let disconnect_error = client.disconnect();
                assert!(disconnect_error.is_none());
                assert!(!client.connected());
            });
            session.await;
        });
    }

    /// The handshake stores the server's name on the client.
    #[test]
    fn test_connect_init() {
        task::block_on(async {
            let session = ButtplugClient::run("Test Client", |mut client| async move {
                connect_test_client(&mut client).await;
                assert_eq!(client.server_name.as_ref().unwrap(), "Test Server");
            });
            session.await;
        });
    }

    /// Starting a scan on a connected client is acknowledged with Ok.
    #[test]
    fn test_start_scanning() {
        task::block_on(async {
            let session = ButtplugClient::run("Test Client", |mut client| async move {
                connect_test_client(&mut client).await;
                let scan_error = client.start_scanning().await;
                assert!(scan_error.is_none());
            });
            session.await;
        });
    }
}