use super::connector::{
ButtplugClientConnectionStateShared, ButtplugClientConnector, ButtplugClientConnectorError,
};
use crate::core::messages::ButtplugMessageUnion;
use async_std::{
future::{select, Future},
sync::{Receiver, Sender},
task::{Context, Poll, Waker},
};
use core::pin::Pin;
use futures::StreamExt;
use futures_util::future::FutureExt;
use std::sync::{Arc, Mutex};
#[derive(Debug, Clone)]
pub struct ButtplugClientFutureState<T> {
reply_msg: Option<T>,
waker: Option<Waker>,
}
impl<T> Default for ButtplugClientFutureState<T> {
fn default() -> Self {
ButtplugClientFutureState::<T> {
reply_msg: None,
waker: None,
}
}
}
impl<T> ButtplugClientFutureState<T> {
pub fn set_reply_msg(&mut self, msg: &T)
where
T: Clone,
{
self.reply_msg = Some(msg.clone());
let waker = self.waker.take();
if let Some(wake) = waker {
wake.wake();
}
}
}
pub type ButtplugClientFutureStateShared<T> = Arc<Mutex<ButtplugClientFutureState<T>>>;
#[derive(Debug)]
pub struct ButtplugClientFuture<T> {
waker_state: ButtplugClientFutureStateShared<T>,
}
impl<T> Default for ButtplugClientFuture<T> {
fn default() -> Self {
ButtplugClientFuture::<T> {
waker_state: ButtplugClientFutureStateShared::<T>::default(),
}
}
}
impl<T> ButtplugClientFuture<T> {
pub fn get_state_clone(&self) -> ButtplugClientFutureStateShared<T> {
self.waker_state.clone()
}
}
impl<T> Future for ButtplugClientFuture<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let mut waker_state = self.waker_state.lock().unwrap();
if waker_state.reply_msg.is_some() {
let msg = waker_state.reply_msg.take().unwrap();
Poll::Ready(msg)
} else {
debug!("Waker set.");
waker_state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
pub type ButtplugClientMessageState = ButtplugClientFutureState<ButtplugMessageUnion>;
pub type ButtplugClientMessageStateShared = ButtplugClientFutureStateShared<ButtplugMessageUnion>;
pub type ButtplugClientMessageFuture = ButtplugClientFuture<ButtplugMessageUnion>;
pub struct ButtplugClientInternalLoop {
connector: Option<Box<dyn ButtplugClientConnector>>,
client_receiver: Receiver<ButtplugInternalClientMessage>,
event_sender: Sender<ButtplugMessageUnion>,
}
unsafe impl Send for ButtplugClientInternalLoop {}
unsafe impl Sync for ButtplugClientInternalLoop {}
pub enum ButtplugInternalClientMessage {
Connect(
Box<dyn ButtplugClientConnector>,
ButtplugClientConnectionStateShared,
),
Disconnect,
Message((ButtplugMessageUnion, ButtplugClientMessageStateShared)),
}
enum StreamReturn {
ConnectorMessage(ButtplugMessageUnion),
ClientMessage(ButtplugInternalClientMessage),
Disconnect,
}
impl ButtplugClientInternalLoop {
pub fn new(
event_sender: Sender<ButtplugMessageUnion>,
client_receiver: Receiver<ButtplugInternalClientMessage>,
) -> Self {
Self {
connector: None,
client_receiver,
event_sender,
}
}
async fn wait_for_connector(&mut self) -> Option<Receiver<ButtplugMessageUnion>> {
match self.client_receiver.next().await {
None => {
debug!("Client disconnected.");
None
}
Some(msg) => match msg {
ButtplugInternalClientMessage::Connect(mut connector, state) => {
match connector.connect().await {
Some(_s) => {
error!("Cannot connect to server.");
let mut waker_state = state.lock().unwrap();
let reply = Some(ButtplugClientConnectorError::new(
"Cannot connect to server.",
));
waker_state.set_reply_msg(&reply);
None
}
None => {
info!("Connected!");
let mut waker_state = state.lock().unwrap();
waker_state.set_reply_msg(&None);
let recv = connector.get_event_receiver();
self.connector = Option::Some(connector);
Some(recv)
}
}
}
_ => {
error!("Received non-connector message before connector message.");
None
}
},
}
}
pub async fn event_loop(&mut self) {
info!("Starting client event loop.");
let mut connector_receiver;
match self.wait_for_connector().await {
None => return,
Some(recv) => connector_receiver = recv,
}
loop {
let client_future = self.client_receiver.next().map(|x| match x {
None => {
debug!("Client disconnected.");
StreamReturn::Disconnect
}
Some(msg) => StreamReturn::ClientMessage(msg),
});
let event_future = connector_receiver.next().map(|x| match x {
None => {
debug!("Connector disconnected.");
StreamReturn::Disconnect
}
Some(msg) => StreamReturn::ConnectorMessage(msg),
});
let stream_ret = select!(event_future, client_future).await;
match stream_ret {
StreamReturn::ConnectorMessage(_msg) => {
info!("Sending message to clients.");
self.event_sender.send(_msg.clone()).await;
}
StreamReturn::ClientMessage(_msg) => {
debug!("Parsing a client message.");
match _msg {
ButtplugInternalClientMessage::Message(_msg_fut) => {
debug!("Sending message through connector.");
if let Some(ref mut connector) = self.connector {
connector.send(&_msg_fut.0, &_msg_fut.1).await;
}
}
ButtplugInternalClientMessage::Disconnect => {
info!("Client requested disconnect");
break;
}
_ => panic!("Message not handled!"),
}
}
StreamReturn::Disconnect => {
info!("Disconnected!");
break;
}
}
}
info!("Exiting client event loop");
}
}