use crate::{
connector::{ButtplugConnector, ButtplugConnectorError, ButtplugConnectorResultFuture},
core::{
errors::ButtplugError,
messages::{ButtplugCurrentSpecClientMessage, ButtplugCurrentSpecServerMessage},
},
server::{ButtplugServer, ButtplugServerOptions},
util::async_manager,
};
use futures::{
future::{self, BoxFuture},
StreamExt,
};
use std::{
convert::TryInto,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use tokio::sync::mpsc::{channel, Sender};
use tracing_futures::Instrument;
#[cfg(feature = "server")]
pub struct ButtplugInProcessClientConnector {
server: Arc<ButtplugServer>,
server_outbound_sender: Sender<ButtplugCurrentSpecServerMessage>,
connected: Arc<AtomicBool>,
}
#[cfg(feature = "server")]
impl<'a> Default for ButtplugInProcessClientConnector {
fn default() -> Self {
ButtplugInProcessClientConnector::new_with_options(&ButtplugServerOptions::default()).unwrap()
}
}
#[cfg(feature = "server")]
impl<'a> ButtplugInProcessClientConnector {
pub fn new_with_options(options: &ButtplugServerOptions) -> Result<Self, ButtplugError> {
let server = Arc::new(ButtplugServer::new_with_options(options)?);
let (server_outbound_sender, _) = channel(256);
Ok(Self {
server_outbound_sender,
server,
connected: Arc::new(AtomicBool::new(false)),
})
}
pub fn server_ref(&'a self) -> &'a ButtplugServer {
&self.server
}
}
#[cfg(feature = "server")]
impl ButtplugConnector<ButtplugCurrentSpecClientMessage, ButtplugCurrentSpecServerMessage>
for ButtplugInProcessClientConnector
{
fn connect(
&mut self,
message_sender: Sender<ButtplugCurrentSpecServerMessage>,
) -> BoxFuture<'static, Result<(), ButtplugConnectorError>> {
if !self.connected.load(Ordering::SeqCst) {
let connected = self.connected.clone();
let send = message_sender.clone();
self.server_outbound_sender = message_sender;
let server_recv = self.server.event_stream();
Box::pin(async move {
async_manager::spawn(async move {
info!("Starting In Process Client Connector Event Sender Loop");
pin_mut!(server_recv);
while let Some(event) = server_recv.next().await {
if send.send(event.try_into().unwrap()).await.is_err() {
break;
}
}
info!("Stopping In Process Client Connector Event Sender Loop, due to channel receiver being dropped.");
}.instrument(tracing::info_span!("InProcessClientConnectorEventSenderLoop"))).unwrap();
connected.store(true, Ordering::SeqCst);
Ok(())
})
} else {
ButtplugConnectorError::ConnectorAlreadyConnected.into()
}
}
fn disconnect(&self) -> ButtplugConnectorResultFuture {
if self.connected.load(Ordering::SeqCst) {
self.connected.store(false, Ordering::SeqCst);
Box::pin(future::ready(Ok(())))
} else {
ButtplugConnectorError::ConnectorNotConnected.into()
}
}
fn send(&self, msg: ButtplugCurrentSpecClientMessage) -> ButtplugConnectorResultFuture {
if !self.connected.load(Ordering::SeqCst) {
return ButtplugConnectorError::ConnectorNotConnected.into();
}
let input = msg.try_into().unwrap();
let output_fut = self.server.parse_message(input);
let sender = self.server_outbound_sender.clone();
Box::pin(async move {
let output: ButtplugCurrentSpecServerMessage = output_fut
.await
.unwrap_or_else(|e| e.into())
.try_into()
.unwrap();
sender
.send(output)
.await
.map_err(|_| ButtplugConnectorError::ConnectorNotConnected)
})
}
}