1use buttplug_core::{
8 connector::ButtplugConnector,
9 errors::{ButtplugError, ButtplugHandshakeError},
10 message::{ButtplugMessageSpecVersion, ButtplugServerMessageV4},
11 util::{async_manager, stream::convert_broadcast_receiver_to_stream},
12};
13use buttplug_server::{
14 ButtplugServer, ButtplugServerBuilder,
15 message::{ButtplugClientMessageVariant, ButtplugServerMessageVariant},
16};
17use buttplug_server_device_config::UserDeviceIdentifier;
18use dashmap::DashSet;
19use futures::{FutureExt, Stream, StreamExt, future::Future, pin_mut, select};
20use getset::Getters;
21use serde::{Deserialize, Serialize};
22use std::sync::Arc;
23use thiserror::Error;
24use tokio::sync::{
25 Notify,
26 broadcast::{self, Sender},
27 mpsc,
28};
29
/// Events broadcast by a [`ButtplugRemoteServer`] to whoever owns it.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ButtplugRemoteServerEvent {
  /// A client message moved the server from not-connected to connected. Carries the client name
  /// reported by the server, or a placeholder string when the server reports none.
  ClientConnected(String),
  /// The connector's message channel closed, ending the current client session.
  ClientDisconnected,
  /// A device appeared in a device list sent by the server and had not been announced before.
  DeviceAdded {
    /// Device index as listed in the server's device list message.
    index: u32,
    /// Identifier taken from the device manager's info for this device.
    identifier: UserDeviceIdentifier,
    /// Device name as listed in the server's device list message.
    name: String,
    /// Display name taken from the device manager's info for this device, if any.
    display_name: Option<String>,
    /// Keepalive flag copied from the device manager's info for this device.
    needs_keepalive: bool,
  },
  /// A previously announced device is missing from the latest device list sent by the server.
  DeviceRemoved {
    /// Index of the device that went away.
    index: u32,
  },
}
47
/// Errors returned when a [`ButtplugRemoteServer`] cannot start serving a connector.
#[derive(Error, Debug)]
pub enum ButtplugServerConnectorError {
  /// The connector failed to connect; the payload is the `Debug` rendering of the underlying
  /// connector error.
  #[error("Cannot bring up server for connection: {0}")]
  ConnectorError(String),
}
53
/// A [`ButtplugServer`] paired with a broadcast channel of [`ButtplugRemoteServerEvent`]s and a
/// notifier used to end the currently running connector session.
#[derive(Getters)]
pub struct ButtplugRemoteServer {
  /// The wrapped server, shared with the background tasks spawned by this type.
  #[getset(get = "pub")]
  server: Arc<ButtplugServer>,
  /// Sending half of the broadcast channel on which remote server events are published.
  #[getset(get = "pub")]
  event_sender: broadcast::Sender<ButtplugRemoteServerEvent>,
  /// Signalled by `disconnect()` and on drop to make a running server loop exit.
  disconnect_notifier: Arc<Notify>,
}
62
63async fn run_device_event_stream(
64 server: Arc<ButtplugServer>,
65 remote_event_sender: broadcast::Sender<ButtplugRemoteServerEvent>,
66) {
67 let server_receiver = server.server_version_event_stream();
68 let known_indexes = DashSet::<u32>::default();
69
70 pin_mut!(server_receiver);
71 loop {
72 match server_receiver.next().await {
73 None => {
74 info!("Server disconnected via server disappearance, exiting loop.");
75 break;
76 }
77 Some(msg) => {
78 if let ButtplugServerMessageV4::DeviceList(dl) = msg
79 && remote_event_sender.receiver_count() > 0
80 {
81 for da in dl.devices() {
82 if known_indexes.contains(&da.1.device_index()) {
83 continue;
84 }
85 if let Some(device_info) = server.device_manager().device_info(da.1.device_index()) {
86 let added_event = ButtplugRemoteServerEvent::DeviceAdded {
87 index: da.1.device_index(),
88 name: da.1.device_name().clone(),
89 identifier: device_info.identifier().clone(),
90 display_name: device_info.display_name().clone(),
91 needs_keepalive: *device_info.needs_keepalive(),
92 };
93 if remote_event_sender.send(added_event).is_err() {
94 error!(
95 "Cannot send event to owner, dropping and assuming local server thread has exited."
96 );
97 }
98 known_indexes.insert(da.1.device_index());
99 }
100 }
101 let indexes = known_indexes.clone();
102 let current_indexes: Vec<u32> = dl.devices().keys().cloned().collect();
103 for dr in indexes {
104 if current_indexes.contains(&dr) {
105 continue;
106 }
107 let removed_event = ButtplugRemoteServerEvent::DeviceRemoved { index: dr };
108 if remote_event_sender.send(removed_event).is_err() {
109 error!(
110 "Cannot send event to owner, dropping and assuming local server thread has exited."
111 );
112 }
113 known_indexes.remove(&dr);
114 }
115 }
116 }
117 }
118 }
119}
120
121async fn run_server<ConnectorType>(
122 server: Arc<ButtplugServer>,
123 remote_event_sender: broadcast::Sender<ButtplugRemoteServerEvent>,
124 connector: ConnectorType,
125 mut connector_receiver: mpsc::Receiver<ButtplugClientMessageVariant>,
126 disconnect_notifier: Arc<Notify>,
127) where
128 ConnectorType:
129 ButtplugConnector<ButtplugServerMessageVariant, ButtplugClientMessageVariant> + 'static,
130{
131 info!("Starting remote server loop");
132 let shared_connector = Arc::new(connector);
133 let server_receiver = server.server_version_event_stream();
134 let client_version_receiver = server.event_stream();
135 pin_mut!(server_receiver);
136 pin_mut!(client_version_receiver);
137 loop {
138 select! {
139 connector_msg = connector_receiver.recv().fuse() => match connector_msg {
140 None => {
141 info!("Connector disconnected, exiting loop.");
142 if remote_event_sender.receiver_count() > 0 && remote_event_sender.send(ButtplugRemoteServerEvent::ClientDisconnected).is_err() {
143 warn!("Cannot update remote about client disconnection");
144 }
145 break;
146 }
147 Some(client_message) => {
148 trace!("Got message from connector: {:?}", client_message);
149 let server_clone = server.clone();
150 let connected = server_clone.connected();
151 let connector_clone = shared_connector.clone();
152 let remote_event_sender_clone = remote_event_sender.clone();
153 let disconnect_notifier = disconnect_notifier.clone();
154 async_manager::spawn(async move {
155 match server_clone.parse_message(client_message.clone()).await {
156 Ok(ret_msg) => {
157 if !connected && server_clone.connected() {
159 if remote_event_sender_clone.receiver_count() > 0
160 && remote_event_sender_clone.send(ButtplugRemoteServerEvent::ClientConnected(server_clone.client_name().unwrap_or("Buttplug Client (No name specified)".to_owned()).clone())).is_err() {
161 error!("Cannot send event to owner, dropping and assuming local server thread has exited.");
162 }
163 }
164 if connector_clone.send(ret_msg).await.is_err() {
165 error!("Cannot send reply to server, dropping and assuming remote server thread has exited.");
166 }
167 },
168 Err(err_msg) => {
169 if connector_clone.send(err_msg).await.is_err() {
170 error!("Cannot send reply to server, dropping and assuming remote server thread has exited.");
171 }
172 }
173 }
174 });
175 }
176 },
177 _ = disconnect_notifier.notified().fuse() => {
178 info!("Server disconnected via controller disappearance, exiting loop.");
179 break;
180 },
181 server_msg = server_receiver.next().fuse() => match server_msg {
182 None => {
183 info!("Server disconnected via server disappearance, exiting loop.");
184 break;
185 }
186 Some(msg) => {
187 }
209 },
210 client_msg = client_version_receiver.next().fuse() => match client_msg {
211 None => {
212 info!("Server disconnected via server disappearance, exiting loop.");
213 break;
214 }
215 Some(msg) => {
216 let connector_clone = shared_connector.clone();
217 if connector_clone.send(msg).await.is_err() {
218 error!("Server disappeared, exiting remote server thread.");
219 }
220 }
221 }
222 };
223 }
224 if let Err(err) = server.disconnect().await {
225 error!("Error disconnecting server: {:?}", err);
226 }
227 info!("Exiting remote server loop");
228}
229
230impl Default for ButtplugRemoteServer {
231 fn default() -> Self {
232 Self::new(
233 ButtplugServerBuilder::default()
234 .finish()
235 .expect("Default is infallible"),
236 &None,
237 )
238 }
239}
240
241impl ButtplugRemoteServer {
242 pub fn new(
243 server: ButtplugServer,
244 event_sender: &Option<Sender<ButtplugRemoteServerEvent>>,
245 ) -> Self {
246 let event_sender = if let Some(sender) = event_sender {
247 sender.clone()
248 } else {
249 broadcast::channel(256).0
250 };
251 let server = Arc::new(server);
255 {
256 let server = server.clone();
257 tokio::spawn({
258 let server = server;
259 let event_sender = event_sender.clone();
260 async move {
261 run_device_event_stream(server, event_sender).await;
262 }
263 });
264 }
265 Self {
266 event_sender,
267 server,
268 disconnect_notifier: Arc::new(Notify::new()),
269 }
270 }
271
272 pub fn event_stream(&self) -> impl Stream<Item = ButtplugRemoteServerEvent> + use<> {
273 convert_broadcast_receiver_to_stream(self.event_sender.subscribe())
274 }
275
276 pub fn start<ConnectorType>(
277 &self,
278 mut connector: ConnectorType,
279 ) -> impl Future<Output = Result<(), ButtplugServerConnectorError>> + use<ConnectorType>
280 where
281 ConnectorType:
282 ButtplugConnector<ButtplugServerMessageVariant, ButtplugClientMessageVariant> + 'static,
283 {
284 let server = self.server.clone();
285 let event_sender = self.event_sender.clone();
286 let disconnect_notifier = self.disconnect_notifier.clone();
287 async move {
288 let (connector_sender, connector_receiver) = mpsc::channel(256);
289 connector
293 .connect(connector_sender)
294 .await
295 .map_err(|e| ButtplugServerConnectorError::ConnectorError(format!("{:?}", e)))?;
296 run_server(
297 server,
298 event_sender,
299 connector,
300 connector_receiver,
301 disconnect_notifier,
302 )
303 .await;
304 Ok(())
305 }
306 }
307
308 pub async fn disconnect(&self) -> Result<(), ButtplugError> {
309 self.disconnect_notifier.notify_waiters();
310 Ok(())
311 }
312
313 pub async fn shutdown(&self) -> Result<(), ButtplugError> {
314 self.server.shutdown().await?;
315 Ok(())
316 }
317}
318
319impl Drop for ButtplugRemoteServer {
320 fn drop(&mut self) {
321 self.disconnect_notifier.notify_waiters();
322 }
323}