use crate::{
connector::{
transport::{
ButtplugConnectorTransport,
ButtplugConnectorTransportSpecificError,
ButtplugTransportIncomingMessage,
},
ButtplugConnectorError,
ButtplugConnectorResultFuture,
},
core::messages::serializer::ButtplugSerializedMessage,
util::async_manager,
};
use async_tungstenite::{
async_std::connect_async_with_tls_connector,
tungstenite::protocol::Message,
};
use futures::{future::BoxFuture, FutureExt, SinkExt, StreamExt};
use std::sync::Arc;
use tokio::sync::{
mpsc::{Receiver, Sender},
Notify,
};
use tracing::Instrument;
/// Websocket client transport: connects out to a websocket server and shuttles
/// serialized messages between the socket and the connector's channels.
///
/// Instances are built through `new_insecure_connector` or
/// `new_secure_connector`.
pub struct ButtplugWebsocketClientTransport {
    /// Address handed to the websocket connect call.
    /// NOTE(review): presumably a full `ws://` / `wss://` URL — confirm with callers.
    address: String,
    /// When true, a TLS connector is built and passed to the connect call.
    should_use_tls: bool,
    /// When true (and TLS is in use), the server certificate is accepted
    /// without any verification.
    bypass_cert_verify: bool,
    /// Signalled by `disconnect` to ask the send task to close the socket.
    disconnect_notifier: Arc<Notify>,
}
impl ButtplugWebsocketClientTransport {
    /// Common constructor behind the public `new_*` functions.
    ///
    /// Copies `address` into an owned string and allocates a fresh disconnect
    /// notifier for this transport.
    fn create(address: &str, should_use_tls: bool, bypass_cert_verify: bool) -> Self {
        let disconnect_notifier = Arc::new(Notify::new());
        let address = address.to_owned();
        Self {
            address,
            should_use_tls,
            bypass_cert_verify,
            disconnect_notifier,
        }
    }

    /// Builds a transport that connects to `address` without TLS.
    pub fn new_insecure_connector(address: &str) -> Self {
        Self::create(address, false, false)
    }

    /// Builds a transport that connects to `address` over TLS.
    ///
    /// If `bypass_cert_verify` is true, the server certificate is accepted
    /// without being checked.
    pub fn new_secure_connector(address: &str, bypass_cert_verify: bool) -> Self {
        Self::create(address, true, bypass_cert_verify)
    }
}
impl ButtplugConnectorTransport for ButtplugWebsocketClientTransport {
fn connect(
&self,
mut outgoing_receiver: Receiver<ButtplugSerializedMessage>,
incoming_sender: Sender<ButtplugTransportIncomingMessage>,
) -> BoxFuture<'static, Result<(), ButtplugConnectorError>> {
let disconnect_notifier = self.disconnect_notifier.clone();
let tls_connector = if self.should_use_tls {
use async_tls::TlsConnector;
if self.bypass_cert_verify {
use rustls::ClientConfig;
pub struct NoCertificateVerification {}
impl rustls::ServerCertVerifier for NoCertificateVerification {
fn verify_server_cert(
&self,
_roots: &rustls::RootCertStore,
_presented_certs: &[rustls::Certificate],
_dns_name: webpki::DNSNameRef<'_>,
_ocsp: &[u8],
) -> Result<rustls::ServerCertVerified, rustls::TLSError> {
Ok(rustls::ServerCertVerified::assertion())
}
}
let mut config = ClientConfig::new();
config
.dangerous()
.set_certificate_verifier(Arc::new(NoCertificateVerification {}));
Some(TlsConnector::from(Arc::new(config)))
} else {
Some(TlsConnector::new())
}
} else {
None
};
let address = self.address.clone();
Box::pin(async move {
match connect_async_with_tls_connector(&address, tls_connector).await {
Ok((stream, _)) => {
let (mut writer, mut reader) = stream.split();
async_manager::spawn(
async move {
loop {
select! {
msg = outgoing_receiver.recv().fuse() => {
if let Some(msg) = msg {
let out_msg = match msg {
ButtplugSerializedMessage::Text(text) => Message::Text(text),
ButtplugSerializedMessage::Binary(bin) => Message::Binary(bin),
};
writer.send(out_msg).await.expect("This should never fail?");
} else {
info!("Connector holding websocket dropped, returning");
return;
}
},
_ = disconnect_notifier.notified().fuse() => {
info!("Websocket requested to disconnect.");
writer.close().await.unwrap_or_else(|err| error!("{}", err));
return;
}
}
}
}
.instrument(tracing::info_span!("Websocket Send Task")),
)
.unwrap();
async_manager::spawn(
async move {
while let Some(response) = reader.next().await {
trace!("Websocket receiving: {:?}", response);
match response {
Ok(msg) => match msg {
Message::Text(t) => {
if incoming_sender
.send(ButtplugTransportIncomingMessage::Message(
ButtplugSerializedMessage::Text(t.to_string()),
))
.await
.is_err()
{
error!("Websocket holder has closed, exiting websocket loop.");
return;
}
}
Message::Binary(v) => {
if incoming_sender
.send(ButtplugTransportIncomingMessage::Message(
ButtplugSerializedMessage::Binary(v),
))
.await
.is_err()
{
error!("Websocket holder has closed, exiting websocket loop.");
return;
}
}
Message::Ping(_) => {}
Message::Pong(_) => {}
Message::Close(_) => {
info!("Websocket has requested close.");
return;
}
},
Err(err) => {
error!(
"Error in websocket client loop (assuming disconnect): {}",
err
);
break;
}
}
}
}
.instrument(tracing::info_span!("Websocket Receive Task")),
)
.unwrap();
Ok(())
}
Err(websocket_error) => Err(ButtplugConnectorError::TransportSpecificError(
ButtplugConnectorTransportSpecificError::TungsteniteError(websocket_error),
)),
}
})
}
fn disconnect(self) -> ButtplugConnectorResultFuture {
let disconnect_notifier = self.disconnect_notifier;
Box::pin(async move {
disconnect_notifier.notify_waiters();
Ok(())
})
}
}