use crate::{
connector::{ButtplugConnector, ButtplugConnectorError, ButtplugConnectorResultFuture},
core::{
errors::{ButtplugError, ButtplugMessageError, ButtplugServerError},
messages::{
ButtplugCurrentSpecClientMessage,
ButtplugCurrentSpecServerMessage,
ButtplugMessage,
},
},
server::{ButtplugServer, ButtplugServerOptions},
util::async_manager,
};
use async_channel::{bounded, Receiver, Sender};
use futures::{
future::{self, BoxFuture},
StreamExt,
};
use std::convert::TryInto;
use tracing_futures::Instrument;
#[cfg(feature = "server")]
pub struct ButtplugInProcessClientConnector {
server: ButtplugServer,
server_outbound_sender: Sender<Result<ButtplugCurrentSpecServerMessage, ButtplugServerError>>,
connector_outbound_recv:
Option<Receiver<Result<ButtplugCurrentSpecServerMessage, ButtplugServerError>>>,
}
#[cfg(feature = "server")]
impl<'a> Default for ButtplugInProcessClientConnector {
fn default() -> Self {
ButtplugInProcessClientConnector::new_with_options(&ButtplugServerOptions::default()).unwrap()
}
}
#[cfg(feature = "server")]
impl<'a> ButtplugInProcessClientConnector {
pub fn new_with_options(options: &ButtplugServerOptions) -> Result<Self, ButtplugError> {
let (server, mut server_recv) = ButtplugServer::new_with_options(options)?;
let (send, recv) = bounded(256);
let server_outbound_sender = send.clone();
async_manager::spawn(async move {
info!("Starting In Process Client Connector Event Sender Loop");
while let Some(event) = server_recv.next().await {
if send.send(Ok(event.try_into().unwrap())).await.is_err() {
break;
}
}
info!("Stopping In Process Client Connector Event Sender Loop, due to channel receiver being dropped.");
}.instrument(tracing::info_span!("InProcessClientConnectorEventSenderLoop"))).unwrap();
Ok(Self {
connector_outbound_recv: Some(recv),
server_outbound_sender,
server,
})
}
pub fn server_ref(&'a self) -> &'a ButtplugServer {
&self.server
}
}
#[cfg(feature = "server")]
impl ButtplugConnector<ButtplugCurrentSpecClientMessage, ButtplugCurrentSpecServerMessage>
for ButtplugInProcessClientConnector
{
fn connect(
&mut self,
) -> BoxFuture<
'static,
Result<
Receiver<Result<ButtplugCurrentSpecServerMessage, ButtplugServerError>>,
ButtplugConnectorError,
>,
> {
if self.connector_outbound_recv.is_some() {
let recv = self.connector_outbound_recv.take().unwrap();
Box::pin(future::ready(Ok(recv)))
} else {
ButtplugConnectorError::ConnectorAlreadyConnected.into()
}
}
fn disconnect(&self) -> ButtplugConnectorResultFuture {
Box::pin(future::ready(Ok(())))
}
fn send(&self, msg: ButtplugCurrentSpecClientMessage) -> ButtplugConnectorResultFuture {
let out_id = msg.get_id();
let input = msg.try_into().unwrap();
let output_fut = self.server.parse_message(input);
let sender = self.server_outbound_sender.clone();
Box::pin(async move {
let output = output_fut.await.and_then(|msg| {
msg.try_into().map_err(|_| {
ButtplugServerError::new_message_error(
out_id,
ButtplugMessageError::MessageConversionError(
"Cannot convert server message to client spec.",
)
.into(),
)
})
});
sender
.send(output)
.await
.map_err(|_| ButtplugConnectorError::ConnectorNotConnected)
})
}
}