use super::{
ButtplugClientError,
ButtplugClientMessageFuturePair,
ButtplugClientRequest,
ButtplugClientResultFuture,
ButtplugServerMessageFuture,
};
use crate::{
core::{
connector::ButtplugConnectorError,
errors::{ButtplugDeviceError, ButtplugError, ButtplugMessageError},
message::{
ActuatorType,
ButtplugCurrentSpecClientMessage,
ButtplugCurrentSpecServerMessage,
ButtplugDeviceMessageType,
ButtplugMessage,
ClientDeviceMessageAttributes,
ClientGenericDeviceMessageAttributes,
DeviceMessageInfo,
Endpoint,
LinearCmd,
RawReadCmd,
RawSubscribeCmd,
RawUnsubscribeCmd,
RawWriteCmd,
RotateCmd,
RotationSubcommand,
ScalarCmd,
ScalarSubcommand,
SensorReadCmd,
SensorSubscribeCmd,
SensorType,
SensorUnsubscribeCmd,
StopDeviceCmd,
VectorSubcommand,
},
},
util::stream::convert_broadcast_receiver_to_stream,
};
use futures::{future, FutureExt, Stream};
use getset::{CopyGetters, Getters};
use std::{
collections::HashMap,
fmt,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use tokio::sync::broadcast;
use tracing_futures::Instrument;
#[derive(Clone, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum ButtplugClientDeviceEvent {
DeviceRemoved,
ClientDisconnect,
Message(ButtplugCurrentSpecServerMessage),
}
pub enum ScalarCommand {
Scalar((f64, ActuatorType)),
ScalarVec(Vec<(f64, ActuatorType)>),
ScalarMap(HashMap<u32, (f64, ActuatorType)>),
}
pub enum ScalarValueCommand {
ScalarValue(f64),
ScalarValueVec(Vec<f64>),
ScalarValueMap(HashMap<u32, f64>),
}
pub enum RotateCommand {
Rotate(f64, bool),
RotateVec(Vec<(f64, bool)>),
RotateMap(HashMap<u32, (f64, bool)>),
}
pub enum LinearCommand {
Linear(u32, f64),
LinearVec(Vec<(u32, f64)>),
LinearMap(HashMap<u32, (u32, f64)>),
}
#[derive(Getters, CopyGetters)]
pub struct ButtplugClientDevice {
#[getset(get = "pub")]
name: String,
#[getset(get = "pub")]
display_name: Option<String>,
#[getset(get_copy = "pub")]
index: u32,
#[getset(get = "pub")]
message_attributes: ClientDeviceMessageAttributes,
event_loop_sender: broadcast::Sender<ButtplugClientRequest>,
internal_event_sender: broadcast::Sender<ButtplugClientDeviceEvent>,
device_connected: Arc<AtomicBool>,
client_connected: Arc<AtomicBool>,
}
impl ButtplugClientDevice {
pub(super) fn new(
name: &str,
display_name: &Option<String>,
index: u32,
message_attributes: &ClientDeviceMessageAttributes,
message_sender: broadcast::Sender<ButtplugClientRequest>,
) -> Self {
info!(
"Creating client device {} with index {} and messages {:?}.",
name, index, message_attributes
);
let (event_sender, _) = broadcast::channel(256);
let device_connected = Arc::new(AtomicBool::new(true));
let client_connected = Arc::new(AtomicBool::new(true));
Self {
name: name.to_owned(),
display_name: display_name.clone(),
index,
message_attributes: message_attributes.clone(),
event_loop_sender: message_sender,
internal_event_sender: event_sender,
device_connected,
client_connected,
}
}
pub(super) fn new_from_device_info(
info: &DeviceMessageInfo,
sender: broadcast::Sender<ButtplugClientRequest>,
) -> Self {
ButtplugClientDevice::new(
info.device_name(),
info.device_display_name(),
info.device_index(),
info.device_messages(),
sender,
)
}
pub fn connected(&self) -> bool {
self.device_connected.load(Ordering::SeqCst)
}
fn send_message(
&self,
msg: ButtplugCurrentSpecClientMessage,
) -> ButtplugClientResultFuture<ButtplugCurrentSpecServerMessage> {
let message_sender = self.event_loop_sender.clone();
let client_connected = self.client_connected.clone();
let device_connected = self.device_connected.clone();
let id = msg.id();
let device_name = self.name.clone();
async move {
if !client_connected.load(Ordering::SeqCst) {
error!("Client not connected, cannot run device command");
return Err(ButtplugConnectorError::ConnectorNotConnected.into());
} else if !device_connected.load(Ordering::SeqCst) {
error!("Device not connected, cannot run device command");
return Err(
ButtplugError::from(ButtplugDeviceError::DeviceNotConnected(device_name)).into(),
);
}
let fut = ButtplugServerMessageFuture::default();
message_sender
.send(ButtplugClientRequest::Message(
ButtplugClientMessageFuturePair::new(msg.clone(), fut.get_state_clone()),
))
.map_err(|_| {
ButtplugClientError::ButtplugConnectorError(
ButtplugConnectorError::ConnectorChannelClosed,
)
})?;
let msg = fut.await?;
if let ButtplugCurrentSpecServerMessage::Error(_err) = msg {
Err(ButtplugError::from(_err).into())
} else {
Ok(msg)
}
}
.instrument(tracing::trace_span!("ClientDeviceSendFuture for {}", id))
.boxed()
}
pub fn event_stream(&self) -> Box<dyn Stream<Item = ButtplugClientDeviceEvent> + Send + Unpin> {
Box::new(Box::pin(convert_broadcast_receiver_to_stream(
self.internal_event_sender.subscribe(),
)))
}
fn create_boxed_future_client_error<T>(&self, err: ButtplugError) -> ButtplugClientResultFuture<T>
where
T: 'static + Send + Sync,
{
future::ready(Err(ButtplugClientError::ButtplugError(err))).boxed()
}
fn send_message_expect_ok(
&self,
msg: ButtplugCurrentSpecClientMessage,
) -> ButtplugClientResultFuture {
let send_fut = self.send_message(msg);
async move {
match send_fut.await? {
ButtplugCurrentSpecServerMessage::Ok(_) => Ok(()),
ButtplugCurrentSpecServerMessage::Error(_err) => Err(ButtplugError::from(_err).into()),
msg => Err(
ButtplugError::from(ButtplugMessageError::UnexpectedMessageType(format!(
"{:?}",
msg
)))
.into(),
),
}
}
.boxed()
}
fn scalar_value_attributes(
&self,
actuator: &ActuatorType,
) -> Vec<ClientGenericDeviceMessageAttributes> {
if let Some(attrs) = self.message_attributes.scalar_cmd() {
attrs
.iter()
.filter(|x| *x.actuator_type() == *actuator)
.cloned()
.collect()
} else {
vec![]
}
}
pub fn scalar_attributes(&self) -> Vec<ClientGenericDeviceMessageAttributes> {
if let Some(attrs) = self.message_attributes.scalar_cmd() {
attrs.clone()
} else {
vec![]
}
}
fn scalar_from_value_command(
&self,
value_cmd: &ScalarValueCommand,
actuator: &ActuatorType,
attrs: &Vec<ClientGenericDeviceMessageAttributes>,
) -> ButtplugClientResultFuture {
if attrs.is_empty() {
return self.create_boxed_future_client_error(
ButtplugDeviceError::UnhandledCommand(format!(
"ScalarCmd with {actuator} is not handled by this device"
))
.into(),
);
}
let mut scalar_vec: Vec<ScalarSubcommand>;
let scalar_count: u32 = attrs.len() as u32;
match value_cmd {
ScalarValueCommand::ScalarValue(speed) => {
scalar_vec = Vec::with_capacity(scalar_count as usize);
for attr in attrs {
scalar_vec.push(ScalarSubcommand::new(*attr.index(), *speed, *actuator));
}
}
ScalarValueCommand::ScalarValueMap(map) => {
if map.len() as u32 > scalar_count {
return self.create_boxed_future_client_error(
ButtplugDeviceError::DeviceFeatureCountMismatch(scalar_count, map.len() as u32).into(),
);
}
scalar_vec = Vec::with_capacity(map.len() as usize);
for (idx, speed) in map {
if *idx >= scalar_count {
return self.create_boxed_future_client_error(
ButtplugDeviceError::DeviceFeatureIndexError(scalar_count, *idx).into(),
);
}
scalar_vec.push(ScalarSubcommand::new(
*attrs[*idx as usize].index(),
*speed,
*actuator,
));
}
}
ScalarValueCommand::ScalarValueVec(vec) => {
if vec.len() as u32 > scalar_count {
return self.create_boxed_future_client_error(
ButtplugDeviceError::DeviceFeatureCountMismatch(scalar_count, vec.len() as u32).into(),
);
}
scalar_vec = Vec::with_capacity(vec.len() as usize);
for (i, v) in vec.iter().enumerate() {
scalar_vec.push(ScalarSubcommand::new(*attrs[i].index(), *v, *actuator));
}
}
}
let msg = ScalarCmd::new(self.index, scalar_vec).into();
info!("{:?}", msg);
self.send_message_expect_ok(msg)
}
pub fn vibrate_attributes(&self) -> Vec<ClientGenericDeviceMessageAttributes> {
self.scalar_value_attributes(&ActuatorType::Vibrate)
}
pub fn vibrate(&self, speed_cmd: &ScalarValueCommand) -> ButtplugClientResultFuture {
self.scalar_from_value_command(
speed_cmd,
&ActuatorType::Vibrate,
&self.vibrate_attributes(),
)
}
pub fn oscillate_attributes(&self) -> Vec<ClientGenericDeviceMessageAttributes> {
self.scalar_value_attributes(&ActuatorType::Oscillate)
}
pub fn oscillate(&self, speed_cmd: &ScalarValueCommand) -> ButtplugClientResultFuture {
self.scalar_from_value_command(
speed_cmd,
&ActuatorType::Oscillate,
&self.oscillate_attributes(),
)
}
pub fn scalar(&self, scalar_cmd: &ScalarCommand) -> ButtplugClientResultFuture {
if self.message_attributes.scalar_cmd().is_none() {
return self.create_boxed_future_client_error(
ButtplugDeviceError::MessageNotSupported(ButtplugDeviceMessageType::VibrateCmd).into(),
);
}
let scalar_count: u32 = self
.message_attributes
.scalar_cmd()
.as_ref()
.expect("Already checked existence")
.len() as u32;
let mut scalar_vec: Vec<ScalarSubcommand>;
match scalar_cmd {
ScalarCommand::Scalar((scalar, actuator)) => {
scalar_vec = Vec::with_capacity(scalar_count as usize);
for i in 0..scalar_count {
scalar_vec.push(ScalarSubcommand::new(i, *scalar, *actuator));
}
}
ScalarCommand::ScalarMap(map) => {
if map.len() as u32 > scalar_count {
return self.create_boxed_future_client_error(
ButtplugDeviceError::DeviceFeatureCountMismatch(scalar_count, map.len() as u32).into(),
);
}
scalar_vec = Vec::with_capacity(map.len() as usize);
for (idx, (scalar, actuator)) in map {
if *idx >= scalar_count {
return self.create_boxed_future_client_error(
ButtplugDeviceError::DeviceFeatureIndexError(scalar_count, *idx).into(),
);
}
scalar_vec.push(ScalarSubcommand::new(*idx, *scalar, *actuator));
}
}
ScalarCommand::ScalarVec(vec) => {
if vec.len() as u32 > scalar_count {
return self.create_boxed_future_client_error(
ButtplugDeviceError::DeviceFeatureCountMismatch(scalar_count, vec.len() as u32).into(),
);
}
scalar_vec = Vec::with_capacity(vec.len() as usize);
for (i, (scalar, actuator)) in vec.iter().enumerate() {
scalar_vec.push(ScalarSubcommand::new(i as u32, *scalar, *actuator));
}
}
}
let msg = ScalarCmd::new(self.index, scalar_vec).into();
self.send_message_expect_ok(msg)
}
pub fn linear_attributes(&self) -> Vec<ClientGenericDeviceMessageAttributes> {
if let Some(attrs) = self.message_attributes.linear_cmd() {
attrs.clone()
} else {
vec![]
}
}
pub fn linear(&self, linear_cmd: &LinearCommand) -> ButtplugClientResultFuture {
if self.message_attributes.linear_cmd().is_none() {
return self.create_boxed_future_client_error(
ButtplugDeviceError::MessageNotSupported(ButtplugDeviceMessageType::LinearCmd).into(),
);
}
let linear_count: u32 = self.message_attributes.linear_cmd().as_ref().unwrap().len() as u32;
let mut linear_vec: Vec<VectorSubcommand>;
match linear_cmd {
LinearCommand::Linear(dur, pos) => {
linear_vec = Vec::with_capacity(linear_count as usize);
for i in 0..linear_count {
linear_vec.push(VectorSubcommand::new(i, *dur, *pos));
}
}
LinearCommand::LinearMap(map) => {
if map.len() as u32 > linear_count {
return self.create_boxed_future_client_error(
ButtplugDeviceError::DeviceFeatureCountMismatch(linear_count, map.len() as u32).into(),
);
}
linear_vec = Vec::with_capacity(map.len() as usize);
for (idx, (dur, pos)) in map {
if *idx >= linear_count {
return self.create_boxed_future_client_error(
ButtplugDeviceError::DeviceFeatureIndexError(linear_count, *idx).into(),
);
}
linear_vec.push(VectorSubcommand::new(*idx, *dur, *pos));
}
}
LinearCommand::LinearVec(vec) => {
if vec.len() as u32 > linear_count {
return self.create_boxed_future_client_error(
ButtplugDeviceError::DeviceFeatureCountMismatch(linear_count, vec.len() as u32).into(),
);
}
linear_vec = Vec::with_capacity(vec.len() as usize);
for (i, v) in vec.iter().enumerate() {
linear_vec.push(VectorSubcommand::new(i as u32, v.0, v.1));
}
}
}
let msg = LinearCmd::new(self.index, linear_vec).into();
self.send_message_expect_ok(msg)
}
pub fn rotate_attributes(&self) -> Vec<ClientGenericDeviceMessageAttributes> {
if let Some(attrs) = self.message_attributes.linear_cmd() {
attrs.clone()
} else {
vec![]
}
}
pub fn rotate(&self, rotate_cmd: &RotateCommand) -> ButtplugClientResultFuture {
if self.message_attributes.rotate_cmd().is_none() {
return self.create_boxed_future_client_error(
ButtplugDeviceError::MessageNotSupported(ButtplugDeviceMessageType::RotateCmd).into(),
);
}
let rotate_count: u32 = self.message_attributes.rotate_cmd().as_ref().unwrap().len() as u32;
let mut rotate_vec: Vec<RotationSubcommand>;
match rotate_cmd {
RotateCommand::Rotate(speed, clockwise) => {
rotate_vec = Vec::with_capacity(rotate_count as usize);
for i in 0..rotate_count {
rotate_vec.push(RotationSubcommand::new(i, *speed, *clockwise));
}
}
RotateCommand::RotateMap(map) => {
if map.len() as u32 > rotate_count {
return self.create_boxed_future_client_error(
ButtplugDeviceError::DeviceFeatureCountMismatch(rotate_count, map.len() as u32).into(),
);
}
rotate_vec = Vec::with_capacity(map.len() as usize);
for (idx, (speed, clockwise)) in map {
if *idx > rotate_count - 1 {
return self.create_boxed_future_client_error(
ButtplugDeviceError::DeviceFeatureIndexError(rotate_count, *idx).into(),
);
}
rotate_vec.push(RotationSubcommand::new(*idx, *speed, *clockwise));
}
}
RotateCommand::RotateVec(vec) => {
if vec.len() as u32 > rotate_count {
return self.create_boxed_future_client_error(
ButtplugDeviceError::DeviceFeatureCountMismatch(rotate_count, vec.len() as u32).into(),
);
}
rotate_vec = Vec::with_capacity(vec.len() as usize);
for (i, v) in vec.iter().enumerate() {
rotate_vec.push(RotationSubcommand::new(i as u32, v.0, v.1));
}
}
}
let msg = RotateCmd::new(self.index, rotate_vec).into();
self.send_message_expect_ok(msg)
}
pub fn subscribe_sensor(
&self,
sensor_index: u32,
sensor_type: SensorType,
) -> ButtplugClientResultFuture {
if self.message_attributes.sensor_subscribe_cmd().is_none() {
return self.create_boxed_future_client_error(
ButtplugDeviceError::MessageNotSupported(ButtplugDeviceMessageType::SensorSubscribeCmd)
.into(),
);
}
let msg = SensorSubscribeCmd::new(self.index, sensor_index, sensor_type).into();
self.send_message_expect_ok(msg)
}
pub fn unsubscribe_sensor(
&self,
sensor_index: u32,
sensor_type: SensorType,
) -> ButtplugClientResultFuture {
if self.message_attributes.sensor_subscribe_cmd().is_none() {
return self.create_boxed_future_client_error(
ButtplugDeviceError::MessageNotSupported(ButtplugDeviceMessageType::SensorSubscribeCmd)
.into(),
);
}
let msg = SensorUnsubscribeCmd::new(self.index, sensor_index, sensor_type).into();
self.send_message_expect_ok(msg)
}
fn read_single_sensor(&self, sensor_type: &SensorType) -> ButtplugClientResultFuture<Vec<i32>> {
if self.message_attributes.sensor_read_cmd().is_none() {
return self.create_boxed_future_client_error(
ButtplugDeviceError::MessageNotSupported(ButtplugDeviceMessageType::SensorReadCmd).into(),
);
}
let sensor_indexes: Vec<u32> = self
.message_attributes
.sensor_read_cmd()
.as_ref()
.expect("Already check existence")
.iter()
.enumerate()
.filter(|x| *x.1.sensor_type() == *sensor_type)
.map(|x| x.0 as u32)
.collect();
if sensor_indexes.len() != 1 {
return self.create_boxed_future_client_error(
ButtplugDeviceError::ProtocolSensorNotSupported(*sensor_type).into(),
);
}
let msg = SensorReadCmd::new(self.index, sensor_indexes[0], *sensor_type).into();
let reply = self.send_message(msg);
async move {
if let ButtplugCurrentSpecServerMessage::SensorReading(data) = reply.await? {
Ok(data.data().clone())
} else {
Err(
ButtplugError::ButtplugMessageError(ButtplugMessageError::UnexpectedMessageType(
"SensorReading".to_owned(),
))
.into(),
)
}
}
.boxed()
}
fn has_sensor_read(&self, sensor_type: SensorType) -> bool {
if let Some(sensor_attrs) = self.message_attributes.sensor_read_cmd() {
sensor_attrs.iter().any(|x| *x.sensor_type() == sensor_type)
} else {
false
}
}
pub fn has_battery_level(&self) -> bool {
self.has_sensor_read(SensorType::Battery)
}
pub fn battery_level(&self) -> ButtplugClientResultFuture<f64> {
let send_fut = self.read_single_sensor(&SensorType::Battery);
Box::pin(async move {
let data = send_fut.await?;
let battery_level = data[0];
Ok(battery_level as f64 / 100.0f64)
})
}
pub fn has_rssi_level(&self) -> bool {
self.has_sensor_read(SensorType::RSSI)
}
pub fn rssi_level(&self) -> ButtplugClientResultFuture<i32> {
let send_fut = self.read_single_sensor(&SensorType::RSSI);
Box::pin(async move {
let data = send_fut.await?;
Ok(data[0])
})
}
pub fn raw_write(
&self,
endpoint: Endpoint,
data: &[u8],
write_with_response: bool,
) -> ButtplugClientResultFuture {
if self.message_attributes.raw_write_cmd().is_none() {
return self.create_boxed_future_client_error(
ButtplugDeviceError::MessageNotSupported(ButtplugDeviceMessageType::RawWriteCmd).into(),
);
}
let msg = ButtplugCurrentSpecClientMessage::RawWriteCmd(RawWriteCmd::new(
self.index,
endpoint,
data,
write_with_response,
));
self.send_message_expect_ok(msg)
}
pub fn raw_read(
&self,
endpoint: Endpoint,
expected_length: u32,
timeout: u32,
) -> ButtplugClientResultFuture<Vec<u8>> {
if self.message_attributes.raw_read_cmd().is_none() {
return self.create_boxed_future_client_error(
ButtplugDeviceError::MessageNotSupported(ButtplugDeviceMessageType::RawReadCmd).into(),
);
}
let msg = ButtplugCurrentSpecClientMessage::RawReadCmd(RawReadCmd::new(
self.index,
endpoint,
expected_length,
timeout,
));
let send_fut = self.send_message(msg);
async move {
match send_fut.await? {
ButtplugCurrentSpecServerMessage::RawReading(reading) => Ok(reading.data().clone()),
ButtplugCurrentSpecServerMessage::Error(err) => Err(ButtplugError::from(err).into()),
msg => Err(
ButtplugError::from(ButtplugMessageError::UnexpectedMessageType(format!(
"{:?}",
msg
)))
.into(),
),
}
}
.boxed()
}
pub fn raw_subscribe(&self, endpoint: Endpoint) -> ButtplugClientResultFuture {
if self.message_attributes.raw_subscribe_cmd().is_none() {
return self.create_boxed_future_client_error(
ButtplugDeviceError::MessageNotSupported(ButtplugDeviceMessageType::RawSubscribeCmd).into(),
);
}
let msg =
ButtplugCurrentSpecClientMessage::RawSubscribeCmd(RawSubscribeCmd::new(self.index, endpoint));
self.send_message_expect_ok(msg)
}
pub fn raw_unsubscribe(&self, endpoint: Endpoint) -> ButtplugClientResultFuture {
if self.message_attributes.raw_subscribe_cmd().is_none() {
return self.create_boxed_future_client_error(
ButtplugDeviceError::MessageNotSupported(ButtplugDeviceMessageType::RawSubscribeCmd).into(),
);
}
let msg = ButtplugCurrentSpecClientMessage::RawUnsubscribeCmd(RawUnsubscribeCmd::new(
self.index, endpoint,
));
self.send_message_expect_ok(msg)
}
pub fn stop(&self) -> ButtplugClientResultFuture {
self.send_message_expect_ok(StopDeviceCmd::new(self.index).into())
}
pub(super) fn set_device_connected(&self, connected: bool) {
self.device_connected.store(connected, Ordering::SeqCst);
}
pub(super) fn set_client_connected(&self, connected: bool) {
self.client_connected.store(connected, Ordering::SeqCst);
}
pub(super) fn queue_event(&self, event: ButtplugClientDeviceEvent) {
if self.internal_event_sender.receiver_count() == 0 {
debug!("No handlers for device event, dropping event: {:?}", event);
return;
}
self
.internal_event_sender
.send(event)
.expect("Checked for receivers already.");
}
}
impl Eq for ButtplugClientDevice {
}
impl PartialEq for ButtplugClientDevice {
fn eq(&self, other: &Self) -> bool {
self.index == other.index
}
}
impl fmt::Debug for ButtplugClientDevice {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ButtplugClientDevice")
.field("name", &self.name)
.field("index", &self.index)
.finish()
}
}