use super::websocket_server_comm_manager::WebsocketServerDeviceCommManagerInitInfo;
use async_trait::async_trait;
use buttplug_core::{errors::ButtplugDeviceError, util::async_manager};
use buttplug_server::device::hardware::{
GenericHardwareSpecializer,
Hardware,
HardwareConnector,
HardwareEvent,
HardwareInternal,
HardwareReadCmd,
HardwareReading,
HardwareSpecializer,
HardwareSubscribeCmd,
HardwareUnsubscribeCmd,
HardwareWriteCmd,
};
use buttplug_server_device_config::{Endpoint, ProtocolCommunicationSpecifier, WebsocketSpecifier};
use futures::{
FutureExt,
SinkExt,
StreamExt,
future::{self, BoxFuture},
};
use std::{
fmt::{self, Debug},
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
time::Duration,
};
use tokio::{
net::TcpStream,
select,
sync::{
Mutex,
broadcast,
mpsc::{Receiver, Sender, channel},
},
time::sleep,
};
use tokio_util::sync::CancellationToken;
async fn run_connection_loop(
address: &str,
event_sender: broadcast::Sender<HardwareEvent>,
ws_stream: tokio_tungstenite::WebSocketStream<TcpStream>,
mut request_receiver: Receiver<Vec<u8>>,
response_sender: broadcast::Sender<Vec<u8>>,
) {
info!("Starting websocket server connection event loop.");
let (mut websocket_server_sender, mut websocket_server_receiver) = ws_stream.split();
let mut pong_count = 1u32;
loop {
select! {
_ = sleep(Duration::from_millis(10000)) => {
if pong_count == 0 {
error!("No pongs received, considering connection closed.");
break;
}
pong_count = 0;
if websocket_server_sender
.send(tokio_tungstenite::tungstenite::Message::Ping(vec!(0).into()))
.await
.is_err() {
error!("Cannot send ping to client, considering connection closed.");
break;
}
}
ws_msg = request_receiver.recv() => {
if let Some(binary_msg) = ws_msg {
if websocket_server_sender
.send(tokio_tungstenite::tungstenite::Message::Binary(binary_msg.into()))
.await
.is_err() {
error!("Cannot send binary value to client, considering connection closed.");
break;
}
} else {
info!("Websocket server connector owner dropped, disconnecting websocket connection.");
break;
}
}
websocket_server_msg = websocket_server_receiver.next() => match websocket_server_msg {
Some(ws_data) => {
match ws_data {
Ok(msg) => {
match msg {
tokio_tungstenite::tungstenite::Message::Text(text_msg) => {
let _ = response_sender.send(text_msg.as_bytes().to_vec());
}
tokio_tungstenite::tungstenite::Message::Binary(binary_msg) => {
let _ = response_sender.send(binary_msg.to_vec());
}
tokio_tungstenite::tungstenite::Message::Close(_) => {
let _ = event_sender
.send(HardwareEvent::Disconnected(
address.to_owned()
));
break;
}
tokio_tungstenite::tungstenite::Message::Ping(_) => {
continue;
}
tokio_tungstenite::tungstenite::Message::Frame(_) => {
continue;
}
tokio_tungstenite::tungstenite::Message::Pong(_) => {
pong_count += 1;
continue;
}
}
},
Err(err) => {
error!("Error from websocket server, assuming disconnection: {:?}", err);
break;
}
}
},
None => {
error!("Websocket channel closed, breaking");
break;
}
}
}
}
if let Err(e) = websocket_server_sender.close().await {
error!("Error closing websocket: {}", e);
}
debug!("Exiting Websocket Server Device control loop.");
}
impl Debug for WebsocketServerHardwareConnector {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("WebsocketServerHardwareConnector")
.field("info", &self.info)
.finish()
}
}
pub struct WebsocketServerHardwareConnector {
info: WebsocketServerDeviceCommManagerInitInfo,
outgoing_sender: Sender<Vec<u8>>,
incoming_broadcaster: broadcast::Sender<Vec<u8>>,
device_event_sender: broadcast::Sender<HardwareEvent>,
}
impl WebsocketServerHardwareConnector {
pub fn new(
info: WebsocketServerDeviceCommManagerInitInfo,
ws_stream: tokio_tungstenite::WebSocketStream<TcpStream>,
) -> Self {
let (outgoing_sender, outgoing_receiver) = channel(256);
let (incoming_broadcaster, _) = broadcast::channel(256);
let incoming_broadcaster_clone = incoming_broadcaster.clone();
let (device_event_sender, _) = broadcast::channel(256);
let device_event_sender_clone = device_event_sender.clone();
let address = info.address().clone();
tokio::spawn(async move {
run_connection_loop(
&address,
device_event_sender_clone,
ws_stream,
outgoing_receiver,
incoming_broadcaster_clone,
)
.await;
});
Self {
info,
outgoing_sender,
incoming_broadcaster,
device_event_sender,
}
}
}
#[async_trait]
impl HardwareConnector for WebsocketServerHardwareConnector {
fn specifier(&self) -> ProtocolCommunicationSpecifier {
ProtocolCommunicationSpecifier::Websocket(WebsocketSpecifier::new(self.info.identifier()))
}
async fn connect(&mut self) -> Result<Box<dyn HardwareSpecializer>, ButtplugDeviceError> {
let hardware_internal = WebsocketServerHardware::new(
self.device_event_sender.clone(),
self.info.clone(),
self.outgoing_sender.clone(),
self.incoming_broadcaster.clone(),
);
let hardware = Hardware::new(
self.info.identifier(),
self.info.address(),
&[Endpoint::Rx, Endpoint::Tx],
&None,
false,
Box::new(hardware_internal),
);
Ok(Box::new(GenericHardwareSpecializer::new(hardware)))
}
}
pub struct WebsocketServerHardware {
connected: Arc<AtomicBool>,
subscribed: Arc<AtomicBool>,
subscribe_token: Arc<Mutex<Option<CancellationToken>>>,
info: WebsocketServerDeviceCommManagerInitInfo,
outgoing_sender: Sender<Vec<u8>>,
incoming_broadcaster: broadcast::Sender<Vec<u8>>,
device_event_sender: broadcast::Sender<HardwareEvent>,
}
impl WebsocketServerHardware {
pub fn new(
device_event_sender: broadcast::Sender<HardwareEvent>,
info: WebsocketServerDeviceCommManagerInitInfo,
outgoing_sender: Sender<Vec<u8>>,
incoming_broadcaster: broadcast::Sender<Vec<u8>>,
) -> Self {
Self {
connected: Arc::new(AtomicBool::new(true)),
info,
outgoing_sender,
incoming_broadcaster,
device_event_sender,
subscribed: Arc::new(AtomicBool::new(false)),
subscribe_token: Arc::new(Mutex::new(None)),
}
}
}
impl HardwareInternal for WebsocketServerHardware {
fn event_stream(&self) -> broadcast::Receiver<HardwareEvent> {
self.device_event_sender.subscribe()
}
fn disconnect(&self) -> BoxFuture<'static, Result<(), ButtplugDeviceError>> {
let connected = self.connected.clone();
async move {
connected.store(false, Ordering::Relaxed);
Ok(())
}
.boxed()
}
fn read_value(
&self,
_msg: &HardwareReadCmd,
) -> BoxFuture<'static, Result<HardwareReading, ButtplugDeviceError>> {
future::ready(Err(ButtplugDeviceError::UnhandledCommand(
"Websocket Hardware does not support read".to_owned(),
)))
.boxed()
}
fn write_value(
&self,
msg: &HardwareWriteCmd,
) -> BoxFuture<'static, Result<(), ButtplugDeviceError>> {
let sender = self.outgoing_sender.clone();
let data = msg.data().clone();
async move {
sender.send(data).await.map_err(|err| {
ButtplugDeviceError::DeviceCommunicationError(format!(
"Could not write value to websocket device: {err}"
))
})
}
.boxed()
}
fn subscribe(
&self,
_msg: &HardwareSubscribeCmd,
) -> BoxFuture<'static, Result<(), ButtplugDeviceError>> {
if self.subscribed.load(Ordering::Relaxed) {
error!("Endpoint already subscribed somehow!");
return future::ready(Ok(())).boxed();
}
let mut data_receiver = self.incoming_broadcaster.subscribe();
let event_sender = self.device_event_sender.clone();
let address = self.info.address().clone();
let subscribed = self.subscribed.clone();
let subscribed_token = self.subscribe_token.clone();
async move {
subscribed.store(true, Ordering::Relaxed);
let token = CancellationToken::new();
*(subscribed_token.lock().await) = Some(token.child_token());
async_manager::spawn(async move {
loop {
select! {
result = data_receiver.recv().fuse() => {
match result {
Ok(data) => {
debug!("Got websocket data! {:?}", data);
let _ = event_sender
.send(HardwareEvent::Notification(
address.clone(),
Endpoint::Tx,
data,
));
},
Err(_) => break,
}
},
_ = token.cancelled().fuse() => {
break;
}
}
}
info!("Data channel closed, ending websocket server device listener task");
});
Ok(())
}
.boxed()
}
fn unsubscribe(
&self,
_msg: &HardwareUnsubscribeCmd,
) -> BoxFuture<'static, Result<(), ButtplugDeviceError>> {
if self.subscribed.load(Ordering::Relaxed) {
let subscribed = self.subscribed.clone();
let subscribed_token = self.subscribe_token.clone();
async move {
subscribed.store(false, Ordering::Relaxed);
let token = (subscribed_token.lock().await)
.take()
.expect("If we were subscribed, we'll have a token.");
token.cancel();
Ok(())
}
.boxed()
} else {
future::ready(Err(ButtplugDeviceError::DeviceCommunicationError(
"Device not subscribed.".to_owned(),
)))
.boxed()
}
}
}