use super::{
internal::{
ButtplugClientFuture, ButtplugClientFutureState, ButtplugClientFutureStateShared,
ButtplugClientMessageStateShared,
},
messagesorter::ClientConnectorMessageSorter,
};
use crate::{core::messages::ButtplugMessageUnion, server::ButtplugServer};
use async_std::sync::{channel, Receiver, Sender};
use async_trait::async_trait;
use futures::{future::Future, select, FutureExt, StreamExt};
use std::{error::Error, fmt};
pub type ButtplugClientConnectionState =
ButtplugClientFutureState<Option<ButtplugClientConnectorError>>;
pub type ButtplugClientConnectionStateShared =
ButtplugClientFutureStateShared<Option<ButtplugClientConnectorError>>;
pub type ButtplugClientConnectionFuture =
ButtplugClientFuture<Option<ButtplugClientConnectorError>>;
#[derive(Debug, Clone)]
pub struct ButtplugClientConnectorError {
pub message: String,
}
impl ButtplugClientConnectorError {
pub fn new(msg: &str) -> Self {
Self {
message: msg.to_owned(),
}
}
}
impl fmt::Display for ButtplugClientConnectorError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Init Error: {}", self.message)
}
}
impl Error for ButtplugClientConnectorError {
fn description(&self) -> &str {
self.message.as_str()
}
fn source(&self) -> Option<&(dyn Error + 'static)> {
None
}
}
#[async_trait]
pub trait ButtplugClientConnector: Send {
async fn connect(&mut self) -> Option<ButtplugClientConnectorError>;
fn disconnect(&mut self) -> Option<ButtplugClientConnectorError>;
async fn send(&mut self, msg: &ButtplugMessageUnion, state: &ButtplugClientMessageStateShared);
fn get_event_receiver(&mut self) -> Receiver<ButtplugMessageUnion>;
}
pub struct ButtplugEmbeddedClientConnector {
server: ButtplugServer,
recv: Option<Receiver<ButtplugMessageUnion>>,
}
impl ButtplugEmbeddedClientConnector {
pub fn new(name: &str, max_ping_time: u32) -> Self {
let (send, recv) = channel(256);
Self {
recv: Some(recv),
server: ButtplugServer::new(&name, max_ping_time, send),
}
}
}
#[async_trait]
impl ButtplugClientConnector for ButtplugEmbeddedClientConnector {
async fn connect(&mut self) -> Option<ButtplugClientConnectorError> {
None
}
fn disconnect(&mut self) -> Option<ButtplugClientConnectorError> {
None
}
async fn send(&mut self, msg: &ButtplugMessageUnion, state: &ButtplugClientMessageStateShared) {
let ret_msg = self.server.send_message(msg).await;
let mut waker_state = state.lock().unwrap();
waker_state.set_reply_msg(&(ret_msg.unwrap()));
}
fn get_event_receiver(&mut self) -> Receiver<ButtplugMessageUnion> {
self.recv.take().unwrap()
}
}
pub trait ButtplugRemoteClientConnectorSender: Sync + Send {
fn send(&self, msg: ButtplugMessageUnion);
fn close(&self);
}
pub enum ButtplugRemoteClientConnectorMessage {
Sender(Box<dyn ButtplugRemoteClientConnectorSender>),
Connected(),
Text(String),
Error(String),
Close(String),
}
pub struct ButtplugRemoteClientConnectorHelper {
internal_send: Sender<(ButtplugMessageUnion, ButtplugClientMessageStateShared)>,
internal_recv: Option<Receiver<(ButtplugMessageUnion, ButtplugClientMessageStateShared)>>,
remote_send: Sender<ButtplugRemoteClientConnectorMessage>,
remote_recv: Option<Receiver<ButtplugRemoteClientConnectorMessage>>,
event_send: Sender<ButtplugMessageUnion>,
}
unsafe impl Send for ButtplugRemoteClientConnectorHelper {}
unsafe impl Sync for ButtplugRemoteClientConnectorHelper {}
impl ButtplugRemoteClientConnectorHelper {
pub fn new(event_sender: Sender<ButtplugMessageUnion>) -> Self {
let (internal_send, internal_recv) = channel(256);
let (remote_send, remote_recv) = channel(256);
Self {
event_send: event_sender,
remote_send,
remote_recv: Some(remote_recv),
internal_send,
internal_recv: Some(internal_recv),
}
}
pub fn get_remote_send(&self) -> Sender<ButtplugRemoteClientConnectorMessage> {
self.remote_send.clone()
}
pub async fn send(
&mut self,
msg: &ButtplugMessageUnion,
state: &ButtplugClientMessageStateShared,
) {
self.internal_send.send((msg.clone(), state.clone())).await;
}
pub fn get_recv_future(&mut self) -> impl Future {
let event_send = self.event_send.clone();
let mut remote_recv = self.remote_recv.take().unwrap();
let mut internal_recv = self.internal_recv.take().unwrap();
async move {
let mut sorter = ClientConnectorMessageSorter::new();
let mut remote_send: Option<Box<dyn ButtplugRemoteClientConnectorSender>> = None;
enum StreamValue {
NoValue,
Incoming(ButtplugRemoteClientConnectorMessage),
Outgoing((ButtplugMessageUnion, ButtplugClientMessageStateShared)),
}
loop {
let mut incoming_stream = remote_recv.next().fuse();
let mut outgoing_stream = internal_recv.next().fuse();
let mut stream_return: StreamValue = select! {
a = incoming_stream => {
info!("Got connector message!");
match a {
Some(msg) => StreamValue::Incoming(msg),
None => StreamValue::NoValue,
}
},
b = outgoing_stream => {
info!("Got client message!");
match b {
Some(msg) => StreamValue::Outgoing(msg),
None => StreamValue::NoValue,
}
},
};
match stream_return {
StreamValue::NoValue => break,
StreamValue::Incoming(remote_msg) => {
match remote_msg {
ButtplugRemoteClientConnectorMessage::Sender(_s) => {
remote_send = Some(_s);
}
ButtplugRemoteClientConnectorMessage::Text(_t) => {
let array: Vec<ButtplugMessageUnion> =
serde_json::from_str(&_t.clone()).unwrap();
for smsg in array {
if !sorter.maybe_resolve_message(&smsg) {
info!("Sending event!");
event_send.send(smsg).await;
}
}
}
_ => {
panic!("UNHANDLED BRANCH");
}
}
}
StreamValue::Outgoing(ref mut buttplug_fut_msg) => {
sorter.register_future(&mut buttplug_fut_msg.0, &buttplug_fut_msg.1);
if let Some(ref mut remote_sender) = remote_send {
remote_sender.send(buttplug_fut_msg.0.clone());
} else {
panic!("Can't send message yet!");
}
}
}
}
}
}
}