use super::transport::{ButtplugConnectorTransport, ButtplugTransportMessage};
use crate::{
connector::{ButtplugConnector, ButtplugConnectorError, ButtplugConnectorResultFuture},
core::{
errors::{ButtplugMessageError, ButtplugServerError},
messages::{
serializer::{
ButtplugClientJSONSerializer,
ButtplugMessageSerializer,
ButtplugSerializedMessage,
},
ButtplugClientMessage,
ButtplugCurrentSpecClientMessage,
ButtplugCurrentSpecServerMessage,
ButtplugMessage,
ButtplugServerMessage,
},
},
util::async_manager,
};
use async_channel::{bounded, Receiver, Sender};
use futures::{future::BoxFuture, FutureExt, StreamExt};
use std::marker::PhantomData;
enum ButtplugRemoteConnectorMessage<T>
where
T: ButtplugMessage + 'static,
{
Message(T),
Close,
}
enum StreamValue<T>
where
T: ButtplugMessage + 'static,
{
NoValue,
Incoming(ButtplugTransportMessage),
Outgoing(ButtplugRemoteConnectorMessage<T>),
}
async fn remote_connector_event_loop<
TransportType,
SerializerType,
OutboundMessageType,
InboundMessageType,
>(
mut connector_outgoing_recv: Receiver<ButtplugRemoteConnectorMessage<OutboundMessageType>>,
connector_incoming_sender: Sender<Result<InboundMessageType, ButtplugServerError>>,
transport: TransportType,
transport_outgoing_sender: Sender<ButtplugSerializedMessage>,
mut transport_incoming_recv: Receiver<ButtplugTransportMessage>,
) where
TransportType: ButtplugConnectorTransport + 'static,
SerializerType: ButtplugMessageSerializer<Inbound = InboundMessageType, Outbound = OutboundMessageType>
+ 'static,
OutboundMessageType: ButtplugMessage + 'static,
InboundMessageType: ButtplugMessage + 'static,
{
let mut serializer = SerializerType::default();
loop {
let mut stream_return = select! {
transport = transport_incoming_recv.next().fuse() =>
match transport {
Some(msg) => StreamValue::Incoming(msg),
None => StreamValue::NoValue,
},
connector = connector_outgoing_recv.next().fuse() =>
match connector {
Some(msg) => StreamValue::Outgoing(msg),
None => StreamValue::NoValue,
}
};
match stream_return {
StreamValue::NoValue => break,
StreamValue::Incoming(remote_msg) => {
match remote_msg {
ButtplugTransportMessage::Message(serialized_msg) => {
match serializer.deserialize(serialized_msg) {
Ok(array) => {
for smsg in array {
if connector_incoming_sender.send(Ok(smsg)).await.is_err() {
error!("Connector has disconnected, ending remote connector loop.");
return;
}
}
}
Err(e) => {
let error_str =
format!("Got invalid messages from remote Buttplug Server: {:?}", e);
error!("{}", error_str);
let _ = connector_incoming_sender
.send(Err(
ButtplugMessageError::MessageSerializationError(e).into(),
))
.await;
}
}
}
ButtplugTransportMessage::Close(s) => {
info!("Connector closing connection {}", s);
break;
}
ButtplugTransportMessage::Connected => {}
ButtplugTransportMessage::Error(_) => {}
}
}
StreamValue::Outgoing(ref mut buttplug_msg) => {
match buttplug_msg {
ButtplugRemoteConnectorMessage::Message(msg) => {
let serialized_msg = serializer.serialize(vec![msg.clone()]);
if transport_outgoing_sender
.send(serialized_msg)
.await
.is_err()
{
error!("Transport has disconnected, exiting remote connector loop.");
return;
}
}
ButtplugRemoteConnectorMessage::Close => {
if let Err(e) = transport.disconnect().await {
error!("Error disconnecting transport: {:?}", e);
}
break;
}
}
}
}
}
}
pub type ButtplugRemoteClientConnector<
TransportType,
SerializerType = ButtplugClientJSONSerializer,
> = ButtplugRemoteConnector<
TransportType,
SerializerType,
ButtplugCurrentSpecClientMessage,
ButtplugCurrentSpecServerMessage,
>;
pub type ButtplugRemoteServerConnector<TransportType, SerializerType> = ButtplugRemoteConnector<
TransportType,
SerializerType,
ButtplugServerMessage,
ButtplugClientMessage,
>;
pub struct ButtplugRemoteConnector<
TransportType,
SerializerType,
OutboundMessageType,
InboundMessageType,
> where
TransportType: ButtplugConnectorTransport + 'static,
SerializerType: ButtplugMessageSerializer<Inbound = InboundMessageType, Outbound = OutboundMessageType>
+ 'static,
OutboundMessageType: ButtplugMessage + 'static,
InboundMessageType: ButtplugMessage + 'static,
{
transport: Option<TransportType>,
event_loop_sender: Option<Sender<ButtplugRemoteConnectorMessage<OutboundMessageType>>>,
dummy_serializer: PhantomData<SerializerType>,
}
impl<TransportType, SerializerType, OutboundMessageType, InboundMessageType>
ButtplugRemoteConnector<TransportType, SerializerType, OutboundMessageType, InboundMessageType>
where
TransportType: ButtplugConnectorTransport + 'static,
SerializerType: ButtplugMessageSerializer<Inbound = InboundMessageType, Outbound = OutboundMessageType>
+ 'static,
OutboundMessageType: ButtplugMessage + 'static,
InboundMessageType: ButtplugMessage + 'static,
{
pub fn new(transport: TransportType) -> Self {
Self {
transport: Some(transport),
event_loop_sender: None,
dummy_serializer: PhantomData::default(),
}
}
}
impl<TransportType, SerializerType, OutboundMessageType, InboundMessageType>
ButtplugConnector<OutboundMessageType, InboundMessageType>
for ButtplugRemoteConnector<
TransportType,
SerializerType,
OutboundMessageType,
InboundMessageType,
>
where
TransportType: ButtplugConnectorTransport + 'static,
SerializerType: ButtplugMessageSerializer<Inbound = InboundMessageType, Outbound = OutboundMessageType>
+ 'static,
OutboundMessageType: ButtplugMessage + 'static,
InboundMessageType: ButtplugMessage + 'static,
{
fn connect(
&mut self,
) -> BoxFuture<
'static,
Result<Receiver<Result<InboundMessageType, ButtplugServerError>>, ButtplugConnectorError>,
> {
if self.transport.is_some() {
let transport = self.transport.take().unwrap();
let (connector_outgoing_sender, connector_outgoing_receiver) = bounded(256);
self.event_loop_sender = Some(connector_outgoing_sender);
Box::pin(async move {
match transport.connect().await {
Ok((transport_outgoing_sender, transport_incoming_receiver)) => {
let (connector_incoming_sender, connector_incoming_receiver) = bounded(256);
async_manager::spawn(async move {
remote_connector_event_loop::<
TransportType,
SerializerType,
OutboundMessageType,
InboundMessageType,
>(
connector_outgoing_receiver,
connector_incoming_sender,
transport,
transport_outgoing_sender,
transport_incoming_receiver,
)
.await
})
.unwrap();
Ok(connector_incoming_receiver)
}
Err(e) => Err(e),
}
})
} else {
ButtplugConnectorError::ConnectorAlreadyConnected.into()
}
}
fn disconnect(&self) -> ButtplugConnectorResultFuture {
if let Some(ref sender) = self.event_loop_sender {
let sender_clone = sender.clone();
Box::pin(async move {
sender_clone
.send(ButtplugRemoteConnectorMessage::Close)
.await
.map_err(|_| ButtplugConnectorError::ConnectorNotConnected)
})
} else {
ButtplugConnectorError::ConnectorNotConnected.into()
}
}
fn send(&self, msg: OutboundMessageType) -> ButtplugConnectorResultFuture {
if let Some(ref sender) = self.event_loop_sender {
let sender_clone = sender.clone();
Box::pin(async move {
sender_clone
.send(ButtplugRemoteConnectorMessage::Message(msg))
.await
.map_err(|_| ButtplugConnectorError::ConnectorNotConnected)
})
} else {
ButtplugConnectorError::ConnectorNotConnected.into()
}
}
}