use super::websocket_server_hardware::WebsocketServerHardwareConnector;
use buttplug_core::{ButtplugResultFuture, util::async_manager};
use buttplug_server::device::hardware::communication::{
HardwareCommunicationManager,
HardwareCommunicationManagerBuilder,
HardwareCommunicationManagerEvent,
};
use futures::{FutureExt, StreamExt};
use getset::{CopyGetters, Getters};
use serde::{Deserialize, Serialize};
use tokio::{net::TcpListener, select, sync::mpsc::Sender};
use tokio_util::sync::CancellationToken;
/// Info packet a connecting websocket device is expected to send as its first text message.
///
/// The accept loop in this file parses the first text frame of every new connection as JSON
/// into this type; connections whose first frame does not parse are closed.
#[derive(Serialize, Deserialize, Debug, Clone, Getters, CopyGetters)]
pub struct WebsocketServerDeviceCommManagerInitInfo {
/// Device identifier; appended to "Websocket Device " to form the reported device name.
#[getset(get = "pub")]
identifier: String,
/// Reported verbatim as the device address in the `DeviceFound` event.
#[getset(get = "pub")]
address: String,
/// Not read anywhere in this file; presumably a protocol/packet version — TODO confirm.
#[getset(get_copy = "pub")]
version: u32,
}
/// Builder for the websocket server device communication manager.
///
/// Collects the listener configuration (interface selection and TCP port) that is handed to
/// the manager when the builder is finished.
#[derive(Clone)]
pub struct WebsocketServerDeviceCommunicationManagerBuilder {
  /// When `true` the server binds `0.0.0.0`; otherwise it binds `127.0.0.1` only.
  listen_on_all_interfaces: bool,
  /// TCP port the server listens on.
  server_port: u16,
}

impl WebsocketServerDeviceCommunicationManagerBuilder {
  /// Port used when the caller never calls [`Self::server_port`].
  const DEFAULT_SERVER_PORT: u16 = 54817;

  /// Chooses between listening on every interface (`true`) or loopback only (`false`).
  pub fn listen_on_all_interfaces(self, should_listen: bool) -> Self {
    Self {
      listen_on_all_interfaces: should_listen,
      ..self
    }
  }

  /// Overrides the TCP port the server will listen on.
  pub fn server_port(self, port: u16) -> Self {
    Self {
      server_port: port,
      ..self
    }
  }
}

impl Default for WebsocketServerDeviceCommunicationManagerBuilder {
  /// Loopback-only listener on the default port.
  fn default() -> Self {
    Self {
      listen_on_all_interfaces: false,
      server_port: Self::DEFAULT_SERVER_PORT,
    }
  }
}
impl HardwareCommunicationManagerBuilder for WebsocketServerDeviceCommunicationManagerBuilder {
  /// Builds the websocket server manager from the configured port and interface choice.
  ///
  /// `sender` is forwarded to the manager, which uses it to emit `DeviceFound` events for
  /// connecting devices. The manager is returned boxed as a trait object.
  fn finish(
    &mut self,
    sender: Sender<HardwareCommunicationManagerEvent>,
  ) -> Box<dyn HardwareCommunicationManager> {
    let port = self.server_port;
    let all_interfaces = self.listen_on_all_interfaces;
    let manager = WebsocketServerDeviceCommunicationManager::new(sender, port, all_interfaces);
    Box::new(manager)
  }
}
/// Communication manager that accepts websocket connections from devices.
///
/// Construction spawns a background listener task; the only state kept here is the token used
/// to stop that task.
pub struct WebsocketServerDeviceCommunicationManager {
// Parent token: the listener task watches a child of it, and `Drop` cancels it.
server_cancellation_token: CancellationToken,
}
impl WebsocketServerDeviceCommunicationManager {
fn new(
sender: Sender<HardwareCommunicationManagerEvent>,
port: u16,
listen_on_all_interfaces: bool,
) -> Self {
trace!("Websocket server port created.");
let server_cancellation_token = CancellationToken::new();
let child_token = server_cancellation_token.child_token();
async_manager::spawn(async move {
let base_addr = if listen_on_all_interfaces {
"0.0.0.0"
} else {
"127.0.0.1"
};
let addr = format!("{base_addr}:{port}");
debug!("Trying to listen on {}", addr);
debug!("Socket bound.");
let listener = match TcpListener::bind(&addr).await {
Ok(listener) => listener,
Err(err) => {
error!("Cannot bind websocket server to {}: {:?}.", addr, err);
return;
}
};
debug!("Listening on: {}", addr);
loop {
select! {
listener_result = listener.accept() => {
let stream = if let Ok((stream, _)) = listener_result {
stream
} else {
error!("Cannot bind websocket server comm manager to address {}.", addr);
return;
};
info!("Got connection");
let ws_fut = tokio_tungstenite::accept_async(stream);
let mut ws_stream = match ws_fut.await {
Ok(ws_stream) => ws_stream,
Err(err) => {
error!("Cannot accept socket: {}", err);
continue;
}
};
let sender_clone = sender.clone();
tokio::spawn(async move {
if let Some(Ok(tokio_tungstenite::tungstenite::Message::Text(info_message))) =
ws_stream.next().await
{
let info_packet: WebsocketServerDeviceCommManagerInitInfo =
if let Ok(packet) = serde_json::from_str(&info_message) {
packet
} else {
error!("Did not receive a valid JSON info packet as the first packet, disconnecting.");
if let Err(err) = ws_stream.close(None).await {
error!("Error closing connection: {}", err);
}
return;
};
if sender_clone
.send(HardwareCommunicationManagerEvent::DeviceFound {
name: format!("Websocket Device {}", info_packet.identifier),
address: info_packet.address.clone(),
creator: Box::new(WebsocketServerHardwareConnector::new(
info_packet,
ws_stream,
)),
})
.await
.is_err()
{
error!("Device manager disappeared, exiting.");
}
} else {
error!("Did not receive info message as first packet, dropping connection.");
}
});
},
_ = child_token.cancelled() => {
info!("Task token cancelled, assuming websocket server comm manager shutdown.");
break;
}
}
}
});
Self {
server_cancellation_token,
}
}
}
impl HardwareCommunicationManager for WebsocketServerDeviceCommunicationManager {
  /// Static name identifying this communication manager.
  fn name(&self) -> &'static str {
    "WebsocketServerCommunicationManager"
  }

  /// Logs the request and resolves to `Ok(())` right away; no scan work is started here.
  fn start_scanning(&mut self) -> ButtplugResultFuture {
    debug!("Websocket server manager scanning for devices.");
    async { Ok(()) }.boxed()
  }

  /// Resolves to `Ok(())` right away; there is nothing to stop.
  fn stop_scanning(&mut self) -> ButtplugResultFuture {
    async { Ok(()) }.boxed()
  }

  /// Always reports that scanning is available.
  fn can_scan(&self) -> bool {
    true
  }
}
impl Drop for WebsocketServerDeviceCommunicationManager {
  /// Cancels the server token so the background listener task leaves its accept loop.
  fn drop(&mut self) {
    let token = &self.server_cancellation_token;
    token.cancel();
  }
}