Skip to main content

intiface_engine/
remote_server.rs

1// Buttplug Rust Source Code File - See https://buttplug.io for more info.
2//
3// Copyright 2016-2026 Nonpolynomial Labs LLC. All rights reserved.
4//
5// Licensed under the BSD 3-Clause license. See LICENSE file in the project root
6// for full license information.
7use buttplug_core::{
8  connector::ButtplugConnector,
9  errors::{ButtplugError, ButtplugHandshakeError},
10  message::{ButtplugMessageSpecVersion, ButtplugServerMessageV4},
11  util::{async_manager, stream::convert_broadcast_receiver_to_stream},
12};
13use buttplug_server::{
14  ButtplugServer, ButtplugServerBuilder,
15  message::{ButtplugClientMessageVariant, ButtplugServerMessageVariant},
16};
17use buttplug_server_device_config::UserDeviceIdentifier;
18use dashmap::DashSet;
19use futures::{FutureExt, Stream, StreamExt, future::Future, pin_mut, select};
20use getset::Getters;
21use serde::{Deserialize, Serialize};
22use std::sync::Arc;
23use thiserror::Error;
24use tokio::sync::{
25  Notify,
26  broadcast::{self, Sender},
27  mpsc,
28};
29
30// Clone derived here to satisfy tokio broadcast requirements.
31#[derive(Clone, Debug, Serialize, Deserialize)]
32pub enum ButtplugRemoteServerEvent {
33  ClientConnected(String),
34  ClientDisconnected,
35  DeviceAdded {
36    index: u32,
37    identifier: UserDeviceIdentifier,
38    name: String,
39    display_name: Option<String>,
40    needs_keepalive: bool,
41  },
42  DeviceRemoved {
43    index: u32,
44  },
45  //DeviceCommand(ButtplugDeviceCommandMessageUnion)
46}
47
48#[derive(Error, Debug)]
49pub enum ButtplugServerConnectorError {
50  #[error("Cannot bring up server for connection: {0}")]
51  ConnectorError(String),
52}
53
54#[derive(Getters)]
55pub struct ButtplugRemoteServer {
56  #[getset(get = "pub")]
57  server: Arc<ButtplugServer>,
58  #[getset(get = "pub")]
59  event_sender: broadcast::Sender<ButtplugRemoteServerEvent>,
60  disconnect_notifier: Arc<Notify>,
61}
62
63async fn run_device_event_stream(
64  server: Arc<ButtplugServer>,
65  remote_event_sender: broadcast::Sender<ButtplugRemoteServerEvent>,
66) {
67  let server_receiver = server.server_version_event_stream();
68  let known_indexes = DashSet::<u32>::default();
69
70  pin_mut!(server_receiver);
71  loop {
72    match server_receiver.next().await {
73      None => {
74        info!("Server disconnected via server disappearance, exiting loop.");
75        break;
76      }
77      Some(msg) => {
78        if let ButtplugServerMessageV4::DeviceList(dl) = msg
79          && remote_event_sender.receiver_count() > 0
80        {
81          for da in dl.devices() {
82            if known_indexes.contains(&da.1.device_index()) {
83              continue;
84            }
85            if let Some(device_info) = server.device_manager().device_info(da.1.device_index()) {
86              let added_event = ButtplugRemoteServerEvent::DeviceAdded {
87                index: da.1.device_index(),
88                name: da.1.device_name().clone(),
89                identifier: device_info.identifier().clone(),
90                display_name: device_info.display_name().clone(),
91                needs_keepalive: *device_info.needs_keepalive(),
92              };
93              if remote_event_sender.send(added_event).is_err() {
94                error!(
95                  "Cannot send event to owner, dropping and assuming local server thread has exited."
96                );
97              }
98              known_indexes.insert(da.1.device_index());
99            }
100          }
101          let indexes = known_indexes.clone();
102          let current_indexes: Vec<u32> = dl.devices().keys().cloned().collect();
103          for dr in indexes {
104            if current_indexes.contains(&dr) {
105              continue;
106            }
107            let removed_event = ButtplugRemoteServerEvent::DeviceRemoved { index: dr };
108            if remote_event_sender.send(removed_event).is_err() {
109              error!(
110                "Cannot send event to owner, dropping and assuming local server thread has exited."
111              );
112            }
113            known_indexes.remove(&dr);
114          }
115        }
116      }
117    }
118  }
119}
120
121async fn run_server<ConnectorType>(
122  server: Arc<ButtplugServer>,
123  remote_event_sender: broadcast::Sender<ButtplugRemoteServerEvent>,
124  connector: ConnectorType,
125  mut connector_receiver: mpsc::Receiver<ButtplugClientMessageVariant>,
126  disconnect_notifier: Arc<Notify>,
127) where
128  ConnectorType:
129    ButtplugConnector<ButtplugServerMessageVariant, ButtplugClientMessageVariant> + 'static,
130{
131  info!("Starting remote server loop");
132  let shared_connector = Arc::new(connector);
133  let server_receiver = server.server_version_event_stream();
134  let client_version_receiver = server.event_stream();
135  pin_mut!(server_receiver);
136  pin_mut!(client_version_receiver);
137  loop {
138    select! {
139      connector_msg = connector_receiver.recv().fuse() => match connector_msg {
140        None => {
141          info!("Connector disconnected, exiting loop.");
142          if remote_event_sender.receiver_count() > 0 && remote_event_sender.send(ButtplugRemoteServerEvent::ClientDisconnected).is_err() {
143            warn!("Cannot update remote about client disconnection");
144          }
145          break;
146        }
147        Some(client_message) => {
148          trace!("Got message from connector: {:?}", client_message);
149          let server_clone = server.clone();
150          let connected = server_clone.connected();
151          let connector_clone = shared_connector.clone();
152          let remote_event_sender_clone = remote_event_sender.clone();
153          let disconnect_notifier = disconnect_notifier.clone();
154          async_manager::spawn(async move {
155            match server_clone.parse_message(client_message.clone()).await {
156              Ok(ret_msg) => {
157                // Only send event if we just connected. Sucks to check it on every message but the boolean check should be quick.
158                if !connected && server_clone.connected() {
159                  if remote_event_sender_clone.receiver_count() > 0
160                    && remote_event_sender_clone.send(ButtplugRemoteServerEvent::ClientConnected(server_clone.client_name().unwrap_or("Buttplug Client (No name specified)".to_owned()).clone())).is_err() {
161                      error!("Cannot send event to owner, dropping and assuming local server thread has exited.");
162                  }
163                }
164                if connector_clone.send(ret_msg).await.is_err() {
165                  error!("Cannot send reply to server, dropping and assuming remote server thread has exited.");
166                }
167              },
168              Err(err_msg) => {
169                if connector_clone.send(err_msg).await.is_err() {
170                  error!("Cannot send reply to server, dropping and assuming remote server thread has exited.");
171                }
172              }
173            }
174          });
175        }
176      },
177      _ = disconnect_notifier.notified().fuse() => {
178        info!("Server disconnected via controller disappearance, exiting loop.");
179        break;
180      },
181      server_msg = server_receiver.next().fuse() => match server_msg {
182        None => {
183          info!("Server disconnected via server disappearance, exiting loop.");
184          break;
185        }
186        Some(msg) => {
187          /*
188          if remote_event_sender.receiver_count() > 0 {
189            match &msg {
190              ButtplugServerMessageV4::DeviceAdded(da) => {
191                if let Some(device_info) = server.device_manager().device_info(da.device_index()) {
192                  let added_event = ButtplugRemoteServerEvent::DeviceAdded { index: da.device_index(), name: da.device_name().clone(), identifier: device_info.identifier().clone().into(), display_name: device_info.display_name().clone() };
193                  if remote_event_sender.send(added_event).is_err() {
194                    error!("Cannot send event to owner, dropping and assuming local server thread has exited.");
195                  }
196                }
197              },
198              ButtplugServerMessageV4::DeviceRemoved(dr) => {
199                let removed_event = ButtplugRemoteServerEvent::DeviceRemoved { index: dr.device_index() };
200                if remote_event_sender.send(removed_event).is_err() {
201                  error!("Cannot send event to owner, dropping and assuming local server thread has exited.");
202                }
203              },
204              _ => {}
205            }
206          }
207          */
208        }
209      },
210      client_msg = client_version_receiver.next().fuse() => match client_msg {
211        None => {
212          info!("Server disconnected via server disappearance, exiting loop.");
213          break;
214        }
215        Some(msg) => {
216          let connector_clone = shared_connector.clone();
217          if connector_clone.send(msg).await.is_err() {
218            error!("Server disappeared, exiting remote server thread.");
219          }
220        }
221      }
222    };
223  }
224  if let Err(err) = server.disconnect().await {
225    error!("Error disconnecting server: {:?}", err);
226  }
227  info!("Exiting remote server loop");
228}
229
230impl Default for ButtplugRemoteServer {
231  fn default() -> Self {
232    Self::new(
233      ButtplugServerBuilder::default()
234        .finish()
235        .expect("Default is infallible"),
236      &None,
237    )
238  }
239}
240
241impl ButtplugRemoteServer {
242  pub fn new(
243    server: ButtplugServer,
244    event_sender: &Option<Sender<ButtplugRemoteServerEvent>>,
245  ) -> Self {
246    let event_sender = if let Some(sender) = event_sender {
247      sender.clone()
248    } else {
249      broadcast::channel(256).0
250    };
251    // Thanks to the existence of the backdoor server, device updates can happen for the lifetime to
252    // the RemoteServer instance, not just during client connect. We need to make sure these are
253    // emitted to the frontend.
254    let server = Arc::new(server);
255    {
256      let server = server.clone();
257      tokio::spawn({
258        let server = server;
259        let event_sender = event_sender.clone();
260        async move {
261          run_device_event_stream(server, event_sender).await;
262        }
263      });
264    }
265    Self {
266      event_sender,
267      server,
268      disconnect_notifier: Arc::new(Notify::new()),
269    }
270  }
271
272  pub fn event_stream(&self) -> impl Stream<Item = ButtplugRemoteServerEvent> + use<> {
273    convert_broadcast_receiver_to_stream(self.event_sender.subscribe())
274  }
275
276  pub fn start<ConnectorType>(
277    &self,
278    mut connector: ConnectorType,
279  ) -> impl Future<Output = Result<(), ButtplugServerConnectorError>> + use<ConnectorType>
280  where
281    ConnectorType:
282      ButtplugConnector<ButtplugServerMessageVariant, ButtplugClientMessageVariant> + 'static,
283  {
284    let server = self.server.clone();
285    let event_sender = self.event_sender.clone();
286    let disconnect_notifier = self.disconnect_notifier.clone();
287    async move {
288      let (connector_sender, connector_receiver) = mpsc::channel(256);
289      // Due to the connect method requiring a mutable connector, we must connect before starting up
290      // our server loop. Anything that needs to happen outside of the client connection session
291      // should happen around this. This flow is locked.
292      connector
293        .connect(connector_sender)
294        .await
295        .map_err(|e| ButtplugServerConnectorError::ConnectorError(format!("{:?}", e)))?;
296      run_server(
297        server,
298        event_sender,
299        connector,
300        connector_receiver,
301        disconnect_notifier,
302      )
303      .await;
304      Ok(())
305    }
306  }
307
308  pub async fn disconnect(&self) -> Result<(), ButtplugError> {
309    self.disconnect_notifier.notify_waiters();
310    Ok(())
311  }
312
313  pub async fn shutdown(&self) -> Result<(), ButtplugError> {
314    self.server.shutdown().await?;
315    Ok(())
316  }
317}
318
319impl Drop for ButtplugRemoteServer {
320  fn drop(&mut self) {
321    self.disconnect_notifier.notify_waiters();
322  }
323}