use super::{ButtplugClientError, ButtplugClientRequest, ButtplugClientResultFuture};
use crate::{
client::{ButtplugClientMessageFuture, ButtplugClientMessageFuturePair},
connector::ButtplugConnectorError,
core::{
errors::{ButtplugDeviceError, ButtplugError, ButtplugMessageError},
messages::{
ButtplugCurrentSpecClientMessage,
ButtplugCurrentSpecServerMessage,
ButtplugDeviceMessageType,
ButtplugMessage,
DeviceMessageInfo,
LinearCmd,
MessageAttributesMap,
RotateCmd,
RotationSubcommand,
StopDeviceCmd,
VectorSubcommand,
VibrateCmd,
VibrateSubcommand,
},
},
util::async_manager,
};
use async_channel::Sender;
use broadcaster::BroadcastChannel;
use futures::{future, StreamExt};
use std::{
collections::HashMap,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use tracing_futures::Instrument;
/// Events delivered to a client device over its broadcast event channel.
#[derive(Clone)]
pub enum ButtplugClientDeviceEvent {
/// The device is gone; the device's background task clears its
/// "device connected" flag when it sees this event.
DeviceDisconnect,
/// The client is gone; the device's background task clears both its
/// "device connected" and "client connected" flags when it sees this event.
ClientDisconnect,
/// A server message forwarded to the device. The disconnect-watching task
/// ignores these.
Message(ButtplugCurrentSpecServerMessage),
}
/// Ways of specifying vibration speeds for [`ButtplugClientDevice::vibrate`].
///
/// Derives `Debug`, `Clone` and `PartialEq` so commands can be logged,
/// re-sent and compared by callers.
#[derive(Debug, Clone, PartialEq)]
pub enum VibrateCommand {
  /// One speed applied to every vibration feature the device reports.
  Speed(f64),
  /// Speeds by position: entry `i` is sent to feature index `i`.
  SpeedVec(Vec<f64>),
  /// Speeds keyed by feature index.
  SpeedMap(HashMap<u32, f64>),
}
/// Ways of specifying rotation for [`ButtplugClientDevice::rotate`].
///
/// Each `(f64, bool)` pair is `(speed, clockwise)`, matching how `rotate`
/// destructures it. Derives `Debug`, `Clone` and `PartialEq` so commands can
/// be logged, re-sent and compared by callers.
#[derive(Debug, Clone, PartialEq)]
pub enum RotateCommand {
  /// One `(speed, clockwise)` setting applied to every rotation feature.
  Rotate(f64, bool),
  /// Settings by position: entry `i` is sent to feature index `i`.
  RotateVec(Vec<(f64, bool)>),
  /// Settings keyed by feature index.
  RotateMap(HashMap<u32, (f64, bool)>),
}
/// Ways of specifying linear movement for [`ButtplugClientDevice::linear`].
///
/// Each `(u32, f64)` pair is `(duration, position)`, matching how `linear`
/// destructures it. Derives `Debug`, `Clone` and `PartialEq` so commands can
/// be logged, re-sent and compared by callers.
#[derive(Debug, Clone, PartialEq)]
pub enum LinearCommand {
  /// One `(duration, position)` move applied to every linear feature.
  Linear(u32, f64),
  /// Moves by position: entry `i` is sent to feature index `i`.
  LinearVec(Vec<(u32, f64)>),
  /// Moves keyed by feature index.
  LinearMap(HashMap<u32, (u32, f64)>),
}
/// Client-side handle for a single device.
///
/// Commands are sent as requests over `message_sender`; connection state is
/// tracked by two shared atomic flags that a background task (spawned in
/// `new`) clears when disconnect events arrive on the event channel.
#[derive(Clone)]
pub struct ButtplugClientDevice {
/// Device name, as passed to `new`.
pub name: String,
// Device index placed in every command message built by this handle; also
// the only field compared by `PartialEq`.
index: u32,
/// Message types the device accepts, with their attributes (e.g. feature
/// count). Consulted before building vibrate/linear/rotate commands.
pub allowed_messages: MessageAttributesMap,
// Channel used to hand requests to the client for sending.
message_sender: Sender<ButtplugClientRequest>,
// Broadcast channel on which device events are received; cloned out to
// callers by `event_receiver()`.
event_receiver: BroadcastChannel<ButtplugClientDeviceEvent>,
// True until a DeviceDisconnect or ClientDisconnect event is observed.
device_connected: Arc<AtomicBool>,
// True until a ClientDisconnect event is observed.
client_connected: Arc<AtomicBool>,
}
// SAFETY: NOTE(review): no justification is recorded for this impl. It
// asserts that every field (including the broadcast event channel) may be
// moved across threads — confirm, or remove the impl and rely on the
// compiler's auto-derived `Send`.
unsafe impl Send for ButtplugClientDevice {
}
// SAFETY: NOTE(review): no justification is recorded for this impl. It
// asserts that shared references to every field (including the broadcast
// event channel) may be used from several threads at once — confirm, or
// remove the impl and rely on the compiler's auto-derived `Sync`.
unsafe impl Sync for ButtplugClientDevice {
}
impl ButtplugClientDevice {
/// Creates a device handle and spawns the task that tracks its connection
/// state.
///
/// The spawned task listens on a clone of `event_receiver`:
/// * `ClientDisconnect` clears both the device and client connected flags,
/// * `DeviceDisconnect` clears the device connected flag,
/// * `Message` events are ignored.
///
/// The task ends after the first disconnect event, or when the event channel
/// stops yielding events.
///
/// # Panics
///
/// Panics if the background task cannot be spawned.
pub(super) fn new(
  name: &str,
  index: u32,
  allowed_messages: MessageAttributesMap,
  message_sender: Sender<ButtplugClientRequest>,
  event_receiver: BroadcastChannel<ButtplugClientDeviceEvent>,
) -> Self {
  info!(
    "Creating client device {} with index {} and messages {:?}.",
    name, index, allowed_messages
  );
  let mut disconnect_receiver = event_receiver.clone();
  let device_connected = Arc::new(AtomicBool::new(true));
  let client_connected = Arc::new(AtomicBool::new(true));
  let device_connected_clone = device_connected.clone();
  let client_connected_clone = client_connected.clone();
  async_manager::spawn(
    async move {
      debug!("Entering client device disconnection loop.");
      // `recv()` yielding `None` means no further events can arrive. Leave
      // the loop instead of unwrapping, so a closed channel cannot panic the
      // task.
      // NOTE(review): the connected flags are left untouched in that case —
      // confirm whether a closed channel should also count as a disconnect.
      while let Some(event) = disconnect_receiver.recv().await {
        match event {
          ButtplugClientDeviceEvent::ClientDisconnect => {
            debug!("Client disconnected.");
            device_connected_clone.store(false, Ordering::SeqCst);
            client_connected_clone.store(false, Ordering::SeqCst);
            break;
          }
          ButtplugClientDeviceEvent::DeviceDisconnect => {
            debug!("Device disconnected.");
            device_connected_clone.store(false, Ordering::SeqCst);
            break;
          }
          // Regular messages carry no connection-state information.
          ButtplugClientDeviceEvent::Message(_) => {}
        }
      }
      debug!("Exiting client device disconnection loop.");
    }
    .instrument(tracing::info_span!(
      "Client Device {} Disconnect Loop",
      index
    )),
  )
  .unwrap();
  Self {
    name: name.to_owned(),
    index,
    allowed_messages,
    message_sender,
    event_receiver,
    device_connected,
    client_connected,
  }
}
fn send_message(
&self,
msg: ButtplugCurrentSpecClientMessage,
) -> ButtplugClientResultFuture<ButtplugCurrentSpecServerMessage> {
let message_sender = self.message_sender.clone();
let client_connected = self.client_connected.clone();
let device_connected = self.device_connected.clone();
let id = msg.get_id();
let device_name = self.name.clone();
Box::pin(
async move {
if !client_connected.load(Ordering::SeqCst) {
error!("Client not connected, cannot run device command");
return Err(ButtplugConnectorError::ConnectorNotConnected.into());
} else if !device_connected.load(Ordering::SeqCst) {
error!("Device not connected, cannot run device command");
return Err(
ButtplugError::from(ButtplugDeviceError::DeviceNotConnected(device_name)).into(),
);
}
let fut = ButtplugClientMessageFuture::default();
message_sender
.send(ButtplugClientRequest::Message(
ButtplugClientMessageFuturePair::new(msg.clone(), fut.get_state_clone()),
))
.await
.map_err(|_| {
ButtplugClientError::ButtplugConnectorError(
ButtplugConnectorError::ConnectorChannelClosed,
)
})?;
let msg = fut.await?;
if let ButtplugCurrentSpecServerMessage::Error(_err) = msg {
Err(ButtplugError::from(_err).into())
} else {
Ok(msg)
}
}
.instrument(tracing::trace_span!("ClientDeviceSendFuture for {}", id)),
)
}
/// Returns a stream of this device's events, backed by a clone of the
/// device's broadcast event channel.
pub fn event_receiver(&self) -> impl StreamExt<Item = ButtplugClientDeviceEvent> + Sync + Send {
  Clone::clone(&self.event_receiver)
}
/// Wraps `err` in `ButtplugClientError::ButtplugError` and returns an
/// already-resolved boxed future carrying it as `Err`.
fn create_boxed_future_client_error(&self, err: ButtplugError) -> ButtplugClientResultFuture {
  let failure = ButtplugClientError::ButtplugError(err);
  Box::pin(future::ready(Err(failure)))
}
fn send_message_expect_ok(
&self,
msg: ButtplugCurrentSpecClientMessage,
) -> ButtplugClientResultFuture {
let send_fut = self.send_message(msg);
Box::pin(async move {
match send_fut.await? {
ButtplugCurrentSpecServerMessage::Ok(_) => Ok(()),
ButtplugCurrentSpecServerMessage::Error(_err) => Err(ButtplugError::from(_err).into()),
msg => Err(
ButtplugError::from(ButtplugMessageError::UnexpectedMessageType(format!(
"{:?}",
msg
)))
.into(),
),
}
})
}
pub fn vibrate(&self, speed_cmd: VibrateCommand) -> ButtplugClientResultFuture {
if !self
.allowed_messages
.contains_key(&ButtplugDeviceMessageType::VibrateCmd)
{
return self.create_boxed_future_client_error(
ButtplugDeviceError::MessageNotSupported(ButtplugDeviceMessageType::VibrateCmd).into(),
);
}
let mut vibrator_count: u32 = 0;
if let Some(features) = self
.allowed_messages
.get(&ButtplugDeviceMessageType::VibrateCmd)
{
if let Some(v) = features.feature_count {
vibrator_count = v;
}
}
let mut speed_vec: Vec<VibrateSubcommand>;
match speed_cmd {
VibrateCommand::Speed(speed) => {
speed_vec = Vec::with_capacity(vibrator_count as usize);
for i in 0..vibrator_count {
speed_vec.push(VibrateSubcommand::new(i, speed));
}
}
VibrateCommand::SpeedMap(map) => {
if map.len() as u32 > vibrator_count {
return self.create_boxed_future_client_error(
ButtplugDeviceError::DeviceFeatureCountMismatch(vibrator_count, map.len() as u32)
.into(),
);
}
speed_vec = Vec::with_capacity(map.len() as usize);
for (idx, speed) in map {
if idx > vibrator_count - 1 {
return self.create_boxed_future_client_error(
ButtplugDeviceError::DeviceFeatureIndexError(vibrator_count, idx).into(),
);
}
speed_vec.push(VibrateSubcommand::new(idx, speed));
}
}
VibrateCommand::SpeedVec(vec) => {
if vec.len() as u32 > vibrator_count {
return self.create_boxed_future_client_error(
ButtplugDeviceError::DeviceFeatureCountMismatch(vibrator_count, vec.len() as u32)
.into(),
);
}
speed_vec = Vec::with_capacity(vec.len() as usize);
for (i, v) in vec.iter().enumerate() {
speed_vec.push(VibrateSubcommand::new(i as u32, *v));
}
}
}
let msg = VibrateCmd::new(self.index, speed_vec).into();
self.send_message_expect_ok(msg)
}
pub fn linear(&self, linear_cmd: LinearCommand) -> ButtplugClientResultFuture {
if !self
.allowed_messages
.contains_key(&ButtplugDeviceMessageType::LinearCmd)
{
return self.create_boxed_future_client_error(
ButtplugDeviceError::MessageNotSupported(ButtplugDeviceMessageType::LinearCmd).into(),
);
}
let mut linear_count: u32 = 0;
if let Some(features) = self
.allowed_messages
.get(&ButtplugDeviceMessageType::LinearCmd)
{
if let Some(v) = features.feature_count {
linear_count = v;
}
}
let mut linear_vec: Vec<VectorSubcommand>;
match linear_cmd {
LinearCommand::Linear(dur, pos) => {
linear_vec = Vec::with_capacity(linear_count as usize);
for i in 0..linear_count {
linear_vec.push(VectorSubcommand::new(i, dur, pos));
}
}
LinearCommand::LinearMap(map) => {
if map.len() as u32 > linear_count {
return self.create_boxed_future_client_error(
ButtplugDeviceError::DeviceFeatureCountMismatch(linear_count, map.len() as u32).into(),
);
}
linear_vec = Vec::with_capacity(map.len() as usize);
for (idx, (dur, pos)) in map {
if idx > linear_count - 1 {
return self.create_boxed_future_client_error(
ButtplugDeviceError::DeviceFeatureIndexError(linear_count, idx).into(),
);
}
linear_vec.push(VectorSubcommand::new(idx, dur, pos));
}
}
LinearCommand::LinearVec(vec) => {
if vec.len() as u32 > linear_count {
return self.create_boxed_future_client_error(
ButtplugDeviceError::DeviceFeatureCountMismatch(linear_count, vec.len() as u32).into(),
);
}
linear_vec = Vec::with_capacity(vec.len() as usize);
for (i, v) in vec.iter().enumerate() {
linear_vec.push(VectorSubcommand::new(i as u32, v.0, v.1));
}
}
}
let msg = LinearCmd::new(self.index, linear_vec).into();
self.send_message_expect_ok(msg)
}
pub fn rotate(&self, rotate_cmd: RotateCommand) -> ButtplugClientResultFuture {
if !self
.allowed_messages
.contains_key(&ButtplugDeviceMessageType::RotateCmd)
{
return self.create_boxed_future_client_error(
ButtplugDeviceError::MessageNotSupported(ButtplugDeviceMessageType::RotateCmd).into(),
);
}
let mut rotate_count: u32 = 0;
if let Some(features) = self
.allowed_messages
.get(&ButtplugDeviceMessageType::RotateCmd)
{
if let Some(v) = features.feature_count {
rotate_count = v;
}
}
let mut rotate_vec: Vec<RotationSubcommand>;
match rotate_cmd {
RotateCommand::Rotate(speed, clockwise) => {
rotate_vec = Vec::with_capacity(rotate_count as usize);
for i in 0..rotate_count {
rotate_vec.push(RotationSubcommand::new(i, speed, clockwise));
}
}
RotateCommand::RotateMap(map) => {
if map.len() as u32 > rotate_count {
return self.create_boxed_future_client_error(
ButtplugDeviceError::DeviceFeatureCountMismatch(rotate_count, map.len() as u32).into(),
);
}
rotate_vec = Vec::with_capacity(map.len() as usize);
for (idx, (speed, clockwise)) in map {
if idx > rotate_count - 1 {
return self.create_boxed_future_client_error(
ButtplugDeviceError::DeviceFeatureIndexError(rotate_count, idx).into(),
);
}
rotate_vec.push(RotationSubcommand::new(idx, speed, clockwise));
}
}
RotateCommand::RotateVec(vec) => {
if vec.len() as u32 > rotate_count {
return self.create_boxed_future_client_error(
ButtplugDeviceError::DeviceFeatureCountMismatch(rotate_count, vec.len() as u32).into(),
);
}
rotate_vec = Vec::with_capacity(vec.len() as usize);
for (i, v) in vec.iter().enumerate() {
rotate_vec.push(RotationSubcommand::new(i as u32, v.0, v.1));
}
}
}
let msg = RotateCmd::new(self.index, rotate_vec).into();
self.send_message_expect_ok(msg)
}
/// Sends a `StopDeviceCmd` for this device and expects an `Ok` reply.
///
/// The command is built with this device's index, the same way `vibrate`,
/// `linear` and `rotate` address their commands. A default-constructed
/// command would not carry `self.index`.
///
/// # Errors
///
/// Resolves to the same errors as any other command sent through
/// `send_message_expect_ok` (disconnected client or device, closed channel,
/// error or unexpected reply).
pub fn stop(&self) -> ButtplugClientResultFuture {
  self.send_message_expect_ok(StopDeviceCmd::new(self.index).into())
}
/// Returns the device index this handle was created with.
pub fn index(&self) -> u32 {
self.index
}
}
// Marker impl: equality (see `PartialEq`) compares only the `u32` device
// index.
impl Eq for ButtplugClientDevice {
}
/// Two device handles are equal when they have the same device index; no
/// other field is compared.
impl PartialEq for ButtplugClientDevice {
  fn eq(&self, other: &Self) -> bool {
    let (ours, theirs) = (self.index, other.index);
    ours == theirs
  }
}
/// Builds a device handle from a device-info message plus the request sender
/// and event channel it should use.
///
/// The name, index and message attributes are taken from the
/// `DeviceMessageInfo`; construction is delegated to
/// `ButtplugClientDevice::new`, which also spawns the disconnect-tracking
/// task.
impl
  From<(
    &DeviceMessageInfo,
    Sender<ButtplugClientRequest>,
    BroadcastChannel<ButtplugClientDeviceEvent>,
  )> for ButtplugClientDevice
{
  fn from(
    msg_sender_tuple: (
      &DeviceMessageInfo,
      Sender<ButtplugClientRequest>,
      BroadcastChannel<ButtplugClientDeviceEvent>,
    ),
  ) -> Self {
    let (info, message_sender, event_receiver) = msg_sender_tuple;
    // Only the attribute map has to be owned by the new device; the name is
    // copied inside `new`.
    let allowed_messages = info.device_messages.clone();
    ButtplugClientDevice::new(
      &*info.device_name,
      info.device_index,
      allowed_messages,
      message_sender,
      event_receiver,
    )
  }
}