use super::{
ButtplugClientError,
ButtplugClientResultFuture,
ButtplugClientRequest,
};
use crate::{
client::{ButtplugClientMessageFuture, ButtplugClientMessageFuturePair},
connector::ButtplugConnectorError,
core::{
errors::{ButtplugDeviceError, ButtplugError, ButtplugMessageError},
messages::{
ButtplugCurrentSpecClientMessage, ButtplugCurrentSpecServerMessage, ButtplugMessage,
ButtplugDeviceMessageType, DeviceMessageInfo, LinearCmd, MessageAttributesMap, RotateCmd,
RotationSubcommand, StopDeviceCmd, VectorSubcommand, VibrateCmd, VibrateSubcommand,
},
},
util::async_manager
};
use async_channel::Sender;
use broadcaster::BroadcastChannel;
use futures::{channel::mpsc::SendError, future, sink::SinkExt};
use std::{
collections::HashMap,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use tracing_futures::Instrument;
#[derive(Clone)]
pub enum ButtplugClientDeviceEvent {
DeviceDisconnect,
ClientDisconnect,
Message(ButtplugCurrentSpecServerMessage),
}
pub enum VibrateCommand {
Speed(f64),
SpeedVec(Vec<f64>),
SpeedMap(HashMap<u32, f64>),
}
pub enum RotateCommand {
Rotate(f64, bool),
RotateVec(Vec<(f64, bool)>),
RotateMap(HashMap<u32, (f64, bool)>),
}
pub enum LinearCommand {
Linear(u32, f64),
LinearVec(Vec<(u32, f64)>),
LinearMap(HashMap<u32, (u32, f64)>),
}
#[derive(Clone)]
pub struct ButtplugClientDevice {
pub name: String,
pub(super) index: u32,
pub allowed_messages: MessageAttributesMap,
message_sender: Sender<ButtplugClientRequest>,
event_receiver: BroadcastChannel<ButtplugClientDeviceEvent>,
device_connected: Arc<AtomicBool>,
client_connected: Arc<AtomicBool>,
}
unsafe impl Send for ButtplugClientDevice {}
unsafe impl Sync for ButtplugClientDevice {}
impl ButtplugClientDevice {
pub(super) fn new(
name: &str,
index: u32,
allowed_messages: MessageAttributesMap,
message_sender: Sender<ButtplugClientRequest>,
event_receiver: BroadcastChannel<ButtplugClientDeviceEvent>,
) -> Self {
info!("Creating client device {} with index {} and messages {:?}.", name, index, allowed_messages);
let mut disconnect_receiver = event_receiver.clone();
let device_connected = Arc::new(AtomicBool::new(true));
let client_connected = Arc::new(AtomicBool::new(true));
let device_connected_clone = device_connected.clone();
let client_connected_clone = client_connected.clone();
async_manager::spawn(async move {
debug!("Entering client device disconnection loop.");
loop {
match disconnect_receiver.recv().await.unwrap() {
ButtplugClientDeviceEvent::ClientDisconnect => {
debug!("Client disconnected.");
device_connected_clone.store(false, Ordering::SeqCst);
client_connected_clone.store(false, Ordering::SeqCst);
break;
}
ButtplugClientDeviceEvent::DeviceDisconnect => {
debug!("Device disconnected.");
device_connected_clone.store(false, Ordering::SeqCst);
break;
}
ButtplugClientDeviceEvent::Message(_) => {
continue;
}
}
}
debug!("Exiting client device disconnection loop.");
}.instrument(tracing::info_span!("Client Device {} Disconnect Loop", index))).unwrap();
Self {
name: name.to_owned(),
index,
allowed_messages,
message_sender,
event_receiver,
device_connected,
client_connected,
}
}
fn send_message(
&self,
msg: ButtplugCurrentSpecClientMessage,
) -> ButtplugClientResultFuture<ButtplugCurrentSpecServerMessage> {
let message_sender = self.message_sender.clone();
let client_connected = self.client_connected.clone();
let device_connected = self.device_connected.clone();
let id = msg.get_id();
let device_name = self.name.clone();
Box::pin(async move {
if !client_connected.load(Ordering::SeqCst) {
error!("Client not connected, cannot run device command");
return Err(ButtplugConnectorError::ConnectorNotConnected.into());
} else if !device_connected.load(Ordering::SeqCst) {
error!("Device not connected, cannot run device command");
return Err(ButtplugError::from(ButtplugDeviceError::DeviceNotConnected(device_name)).into());
}
let fut = ButtplugClientMessageFuture::default();
message_sender
.send(ButtplugClientRequest::Message(ButtplugClientMessageFuturePair::new(
msg.clone(),
fut.get_state_clone(),
)))
.await
.map_err(|_| ButtplugClientError::ButtplugConnectorError(ButtplugConnectorError::ConnectorChannelClosed))?;
match fut.await {
Ok(msg) => {
if let ButtplugCurrentSpecServerMessage::Error(_err) = msg {
Err(ButtplugError::from(_err).into())
} else {
Ok(msg)
}
}
Err(connector_err) => Err(connector_err.into()),
}
}.instrument(tracing::trace_span!("ClientDeviceSendFuture for {}", id)))
}
pub fn event_receiver(
&self,
) -> impl SinkExt<ButtplugClientDeviceEvent, Error = SendError> + Sync + Send {
self.event_receiver.clone()
}
fn create_boxed_future_client_error(&self, err: ButtplugError) -> ButtplugClientResultFuture {
Box::pin(future::ready(Err(ButtplugClientError::ButtplugError(err))))
}
fn send_message_expect_ok(
&self,
msg: ButtplugCurrentSpecClientMessage,
) -> ButtplugClientResultFuture {
let send_fut = self.send_message(msg);
Box::pin(async move {
match send_fut.await? {
ButtplugCurrentSpecServerMessage::Ok(_) => Ok(()),
ButtplugCurrentSpecServerMessage::Error(_err) => Err(ButtplugError::from(_err).into()),
msg => Err(ButtplugError::from(ButtplugMessageError::UnexpectedMessageType(format!("{:?}", msg))).into()),
}
})
}
pub fn vibrate(&self, speed_cmd: VibrateCommand) -> ButtplugClientResultFuture {
if !self
.allowed_messages
.contains_key(&ButtplugDeviceMessageType::VibrateCmd)
{
return self.create_boxed_future_client_error(ButtplugDeviceError::MessageNotSupported(ButtplugDeviceMessageType::VibrateCmd).into());
}
let mut vibrator_count: u32 = 0;
if let Some(features) = self
.allowed_messages
.get(&ButtplugDeviceMessageType::VibrateCmd)
{
if let Some(v) = features.feature_count {
vibrator_count = v;
}
}
let mut speed_vec: Vec<VibrateSubcommand>;
match speed_cmd {
VibrateCommand::Speed(speed) => {
speed_vec = Vec::with_capacity(vibrator_count as usize);
for i in 0..vibrator_count {
speed_vec.push(VibrateSubcommand::new(i, speed));
}
}
VibrateCommand::SpeedMap(map) => {
if map.len() as u32 > vibrator_count {
return self.create_boxed_future_client_error(ButtplugDeviceError::DeviceFeatureCountMismatch(
vibrator_count,
map.len() as u32
).into());
}
speed_vec = Vec::with_capacity(map.len() as usize);
for (idx, speed) in map {
if idx > vibrator_count - 1 {
return self.create_boxed_future_client_error(ButtplugDeviceError::DeviceFeatureIndexError(vibrator_count, idx).into());
}
speed_vec.push(VibrateSubcommand::new(idx, speed));
}
}
VibrateCommand::SpeedVec(vec) => {
if vec.len() as u32 > vibrator_count {
return self.create_boxed_future_client_error(ButtplugDeviceError::DeviceFeatureCountMismatch(
vibrator_count,
vec.len() as u32
).into());
}
speed_vec = Vec::with_capacity(vec.len() as usize);
for (i, v) in vec.iter().enumerate() {
speed_vec.push(VibrateSubcommand::new(i as u32, *v));
}
}
}
let msg = VibrateCmd::new(self.index, speed_vec).into();
self.send_message_expect_ok(msg)
}
pub fn linear(&self, linear_cmd: LinearCommand) -> ButtplugClientResultFuture {
if !self
.allowed_messages
.contains_key(&ButtplugDeviceMessageType::LinearCmd)
{
return self.create_boxed_future_client_error(ButtplugDeviceError::MessageNotSupported(ButtplugDeviceMessageType::LinearCmd).into());
}
let mut linear_count: u32 = 0;
if let Some(features) = self
.allowed_messages
.get(&ButtplugDeviceMessageType::LinearCmd)
{
if let Some(v) = features.feature_count {
linear_count = v;
}
}
let mut linear_vec: Vec<VectorSubcommand>;
match linear_cmd {
LinearCommand::Linear(dur, pos) => {
linear_vec = Vec::with_capacity(linear_count as usize);
for i in 0..linear_count {
linear_vec.push(VectorSubcommand::new(i, dur, pos));
}
}
LinearCommand::LinearMap(map) => {
if map.len() as u32 > linear_count {
return self.create_boxed_future_client_error(ButtplugDeviceError::DeviceFeatureCountMismatch(
linear_count,
map.len() as u32
).into());
}
linear_vec = Vec::with_capacity(map.len() as usize);
for (idx, (dur, pos)) in map {
if idx > linear_count - 1 {
return self.create_boxed_future_client_error(ButtplugDeviceError::DeviceFeatureIndexError(linear_count, idx).into());
}
linear_vec.push(VectorSubcommand::new(idx, dur, pos));
}
}
LinearCommand::LinearVec(vec) => {
if vec.len() as u32 > linear_count {
return self.create_boxed_future_client_error(ButtplugDeviceError::DeviceFeatureCountMismatch(
linear_count,
vec.len() as u32
).into());
}
linear_vec = Vec::with_capacity(vec.len() as usize);
for (i, v) in vec.iter().enumerate() {
linear_vec.push(VectorSubcommand::new(i as u32, v.0, v.1));
}
}
}
let msg = LinearCmd::new(self.index, linear_vec).into();
self.send_message_expect_ok(msg)
}
pub fn rotate(&self, rotate_cmd: RotateCommand) -> ButtplugClientResultFuture {
if !self
.allowed_messages
.contains_key(&ButtplugDeviceMessageType::RotateCmd)
{
return self.create_boxed_future_client_error(ButtplugDeviceError::MessageNotSupported(ButtplugDeviceMessageType::RotateCmd).into());
}
let mut rotate_count: u32 = 0;
if let Some(features) = self
.allowed_messages
.get(&ButtplugDeviceMessageType::RotateCmd)
{
if let Some(v) = features.feature_count {
rotate_count = v;
}
}
let mut rotate_vec: Vec<RotationSubcommand>;
match rotate_cmd {
RotateCommand::Rotate(speed, clockwise) => {
rotate_vec = Vec::with_capacity(rotate_count as usize);
for i in 0..rotate_count {
rotate_vec.push(RotationSubcommand::new(i, speed, clockwise));
}
}
RotateCommand::RotateMap(map) => {
if map.len() as u32 > rotate_count {
return self.create_boxed_future_client_error(ButtplugDeviceError::DeviceFeatureCountMismatch(
rotate_count,
map.len() as u32
).into());
}
rotate_vec = Vec::with_capacity(map.len() as usize);
for (idx, (speed, clockwise)) in map {
if idx > rotate_count - 1 {
return self.create_boxed_future_client_error(ButtplugDeviceError::DeviceFeatureIndexError(rotate_count, idx).into());
}
rotate_vec.push(RotationSubcommand::new(idx, speed, clockwise));
}
}
RotateCommand::RotateVec(vec) => {
if vec.len() as u32 > rotate_count {
return self.create_boxed_future_client_error(ButtplugDeviceError::DeviceFeatureCountMismatch(
rotate_count,
vec.len() as u32
).into());
}
rotate_vec = Vec::with_capacity(vec.len() as usize);
for (i, v) in vec.iter().enumerate() {
rotate_vec.push(RotationSubcommand::new(i as u32, v.0, v.1));
}
}
}
let msg = RotateCmd::new(self.index, rotate_vec).into();
self.send_message_expect_ok(msg)
}
pub fn stop(&self) -> ButtplugClientResultFuture {
self
.send_message_expect_ok(StopDeviceCmd::default().into())
}
}
impl
From<(
&DeviceMessageInfo,
Sender<ButtplugClientRequest>,
BroadcastChannel<ButtplugClientDeviceEvent>,
)> for ButtplugClientDevice
{
fn from(
msg_sender_tuple: (
&DeviceMessageInfo,
Sender<ButtplugClientRequest>,
BroadcastChannel<ButtplugClientDeviceEvent>,
),
) -> Self {
let msg = msg_sender_tuple.0.clone();
ButtplugClientDevice::new(
&*msg.device_name,
msg.device_index,
msg.device_messages,
msg_sender_tuple.1,
msg_sender_tuple.2,
)
}
}