1use buttplug_core::{
8 connector::ButtplugConnector,
9 errors::{ButtplugError, ButtplugHandshakeError},
10 message::{ButtplugMessageSpecVersion, ButtplugServerMessageV4},
11 util::{async_manager, stream::convert_broadcast_receiver_to_stream},
12};
13use buttplug_server::{
14 ButtplugServer, ButtplugServerBuilder,
15 message::{ButtplugClientMessageVariant, ButtplugServerMessageVariant},
16};
17use buttplug_server_device_config::UserDeviceIdentifier;
18use dashmap::DashSet;
19use futures::{FutureExt, Stream, StreamExt, future::Future, pin_mut, select};
20use getset::Getters;
21use serde::{Deserialize, Serialize};
22use std::sync::Arc;
23use thiserror::Error;
24use tokio::sync::{
25 Notify,
26 broadcast::{self, Sender},
27 mpsc,
28};
29
30#[derive(Clone, Debug, Serialize, Deserialize)]
32pub enum ButtplugRemoteServerEvent {
33 ClientConnected(String),
34 ClientDisconnected,
35 DeviceAdded {
36 index: u32,
37 identifier: UserDeviceIdentifier,
38 name: String,
39 display_name: Option<String>,
40 },
41 DeviceRemoved {
42 index: u32,
43 },
44 }
46
47#[derive(Error, Debug)]
48pub enum ButtplugServerConnectorError {
49 #[error("Cannot bring up server for connection: {0}")]
50 ConnectorError(String),
51}
52
53#[derive(Getters)]
54pub struct ButtplugRemoteServer {
55 #[getset(get = "pub")]
56 server: Arc<ButtplugServer>,
57 #[getset(get = "pub")]
58 event_sender: broadcast::Sender<ButtplugRemoteServerEvent>,
59 disconnect_notifier: Arc<Notify>,
60}
61
62async fn run_device_event_stream(
63 server: Arc<ButtplugServer>,
64 remote_event_sender: broadcast::Sender<ButtplugRemoteServerEvent>,
65) {
66 let server_receiver = server.server_version_event_stream();
67 let known_indexes = DashSet::<u32>::default();
68
69 pin_mut!(server_receiver);
70 loop {
71 match server_receiver.next().await {
72 None => {
73 info!("Server disconnected via server disappearance, exiting loop.");
74 break;
75 }
76 Some(msg) => {
77 if let ButtplugServerMessageV4::DeviceList(dl) = msg
78 && remote_event_sender.receiver_count() > 0
79 {
80 for da in dl.devices() {
81 if known_indexes.contains(&da.1.device_index()) {
82 continue;
83 }
84 if let Some(device_info) = server.device_manager().device_info(da.1.device_index()) {
85 let added_event = ButtplugRemoteServerEvent::DeviceAdded {
86 index: da.1.device_index(),
87 name: da.1.device_name().clone(),
88 identifier: device_info.identifier().clone(),
89 display_name: device_info.display_name().clone(),
90 };
91 if remote_event_sender.send(added_event).is_err() {
92 error!(
93 "Cannot send event to owner, dropping and assuming local server thread has exited."
94 );
95 }
96 known_indexes.insert(da.1.device_index());
97 }
98 }
99 let indexes = known_indexes.clone();
100 let current_indexes: Vec<u32> = dl.devices().keys().cloned().collect();
101 for dr in indexes {
102 if current_indexes.contains(&dr) {
103 continue;
104 }
105 let removed_event = ButtplugRemoteServerEvent::DeviceRemoved { index: dr };
106 if remote_event_sender.send(removed_event).is_err() {
107 error!(
108 "Cannot send event to owner, dropping and assuming local server thread has exited."
109 );
110 }
111 known_indexes.remove(&dr);
112 }
113 }
114 }
115 }
116 }
117}
118
119async fn run_server<ConnectorType>(
120 server: Arc<ButtplugServer>,
121 remote_event_sender: broadcast::Sender<ButtplugRemoteServerEvent>,
122 connector: ConnectorType,
123 mut connector_receiver: mpsc::Receiver<ButtplugClientMessageVariant>,
124 disconnect_notifier: Arc<Notify>,
125) where
126 ConnectorType:
127 ButtplugConnector<ButtplugServerMessageVariant, ButtplugClientMessageVariant> + 'static,
128{
129 info!("Starting remote server loop");
130 let shared_connector = Arc::new(connector);
131 let server_receiver = server.server_version_event_stream();
132 let client_version_receiver = server.event_stream();
133 pin_mut!(server_receiver);
134 pin_mut!(client_version_receiver);
135 loop {
136 select! {
137 connector_msg = connector_receiver.recv().fuse() => match connector_msg {
138 None => {
139 info!("Connector disconnected, exiting loop.");
140 if remote_event_sender.receiver_count() > 0 && remote_event_sender.send(ButtplugRemoteServerEvent::ClientDisconnected).is_err() {
141 warn!("Cannot update remote about client disconnection");
142 }
143 break;
144 }
145 Some(client_message) => {
146 trace!("Got message from connector: {:?}", client_message);
147 let server_clone = server.clone();
148 let connected = server_clone.connected();
149 let connector_clone = shared_connector.clone();
150 let remote_event_sender_clone = remote_event_sender.clone();
151 let disconnect_notifier = disconnect_notifier.clone();
152 async_manager::spawn(async move {
153 match server_clone.parse_message(client_message.clone()).await {
154 Ok(ret_msg) => {
155 if !connected && server_clone.connected() {
157 if remote_event_sender_clone.receiver_count() > 0
158 && remote_event_sender_clone.send(ButtplugRemoteServerEvent::ClientConnected(server_clone.client_name().unwrap_or("Buttplug Client (No name specified)".to_owned()).clone())).is_err() {
159 error!("Cannot send event to owner, dropping and assuming local server thread has exited.");
160 }
161 }
162 if connector_clone.send(ret_msg).await.is_err() {
163 error!("Cannot send reply to server, dropping and assuming remote server thread has exited.");
164 }
165 },
166 Err(err_msg) => {
167 if connector_clone.send(err_msg).await.is_err() {
168 error!("Cannot send reply to server, dropping and assuming remote server thread has exited.");
169 }
170 }
171 }
172 });
173 }
174 },
175 _ = disconnect_notifier.notified().fuse() => {
176 info!("Server disconnected via controller disappearance, exiting loop.");
177 break;
178 },
179 server_msg = server_receiver.next().fuse() => match server_msg {
180 None => {
181 info!("Server disconnected via server disappearance, exiting loop.");
182 break;
183 }
184 Some(msg) => {
185 }
207 },
208 client_msg = client_version_receiver.next().fuse() => match client_msg {
209 None => {
210 info!("Server disconnected via server disappearance, exiting loop.");
211 break;
212 }
213 Some(msg) => {
214 let connector_clone = shared_connector.clone();
215 if connector_clone.send(msg).await.is_err() {
216 error!("Server disappeared, exiting remote server thread.");
217 }
218 }
219 }
220 };
221 }
222 if let Err(err) = server.disconnect().await {
223 error!("Error disconnecting server: {:?}", err);
224 }
225 info!("Exiting remote server loop");
226}
227
228impl Default for ButtplugRemoteServer {
229 fn default() -> Self {
230 Self::new(
231 ButtplugServerBuilder::default()
232 .finish()
233 .expect("Default is infallible"),
234 &None,
235 )
236 }
237}
238
239impl ButtplugRemoteServer {
240 pub fn new(
241 server: ButtplugServer,
242 event_sender: &Option<Sender<ButtplugRemoteServerEvent>>,
243 ) -> Self {
244 let event_sender = if let Some(sender) = event_sender {
245 sender.clone()
246 } else {
247 broadcast::channel(256).0
248 };
249 let server = Arc::new(server);
253 {
254 let server = server.clone();
255 tokio::spawn({
256 let server = server;
257 let event_sender = event_sender.clone();
258 async move {
259 run_device_event_stream(server, event_sender).await;
260 }
261 });
262 }
263 Self {
264 event_sender,
265 server,
266 disconnect_notifier: Arc::new(Notify::new()),
267 }
268 }
269
270 pub fn event_stream(&self) -> impl Stream<Item = ButtplugRemoteServerEvent> + use<> {
271 convert_broadcast_receiver_to_stream(self.event_sender.subscribe())
272 }
273
274 pub fn start<ConnectorType>(
275 &self,
276 mut connector: ConnectorType,
277 ) -> impl Future<Output = Result<(), ButtplugServerConnectorError>> + use<ConnectorType>
278 where
279 ConnectorType:
280 ButtplugConnector<ButtplugServerMessageVariant, ButtplugClientMessageVariant> + 'static,
281 {
282 let server = self.server.clone();
283 let event_sender = self.event_sender.clone();
284 let disconnect_notifier = self.disconnect_notifier.clone();
285 async move {
286 let (connector_sender, connector_receiver) = mpsc::channel(256);
287 connector
291 .connect(connector_sender)
292 .await
293 .map_err(|e| ButtplugServerConnectorError::ConnectorError(format!("{:?}", e)))?;
294 run_server(
295 server,
296 event_sender,
297 connector,
298 connector_receiver,
299 disconnect_notifier,
300 )
301 .await;
302 Ok(())
303 }
304 }
305
306 pub async fn disconnect(&self) -> Result<(), ButtplugError> {
307 self.disconnect_notifier.notify_waiters();
308 Ok(())
309 }
310
311 pub async fn shutdown(&self) -> Result<(), ButtplugError> {
312 self.server.shutdown().await?;
313 Ok(())
314 }
315}
316
317impl Drop for ButtplugRemoteServer {
318 fn drop(&mut self) {
319 self.disconnect_notifier.notify_waiters();
320 }
321}