Skip to main content

intiface_engine/
remote_server.rs

1// Buttplug Rust Source Code File - See https://buttplug.io for more info.
2//
3// Copyright 2016-2026 Nonpolynomial Labs LLC. All rights reserved.
4//
5// Licensed under the BSD 3-Clause license. See LICENSE file in the project root
6// for full license information.
7use buttplug_core::{
8  connector::ButtplugConnector,
9  errors::{ButtplugError, ButtplugHandshakeError},
10  message::{ButtplugMessageSpecVersion, ButtplugServerMessageV4},
11  util::{async_manager, stream::convert_broadcast_receiver_to_stream},
12};
13use buttplug_server::{
14  ButtplugServer, ButtplugServerBuilder,
15  message::{ButtplugClientMessageVariant, ButtplugServerMessageVariant},
16};
17use buttplug_server_device_config::UserDeviceIdentifier;
18use dashmap::DashSet;
19use futures::{FutureExt, Stream, StreamExt, future::Future, pin_mut, select};
20use getset::Getters;
21use serde::{Deserialize, Serialize};
22use std::sync::Arc;
23use thiserror::Error;
24use tokio::sync::{
25  Notify,
26  broadcast::{self, Sender},
27  mpsc,
28};
29
30// Clone derived here to satisfy tokio broadcast requirements.
31#[derive(Clone, Debug, Serialize, Deserialize)]
32pub enum ButtplugRemoteServerEvent {
33  ClientConnected(String),
34  ClientDisconnected,
35  DeviceAdded {
36    index: u32,
37    identifier: UserDeviceIdentifier,
38    name: String,
39    display_name: Option<String>,
40  },
41  DeviceRemoved {
42    index: u32,
43  },
44  //DeviceCommand(ButtplugDeviceCommandMessageUnion)
45}
46
47#[derive(Error, Debug)]
48pub enum ButtplugServerConnectorError {
49  #[error("Cannot bring up server for connection: {0}")]
50  ConnectorError(String),
51}
52
53#[derive(Getters)]
54pub struct ButtplugRemoteServer {
55  #[getset(get = "pub")]
56  server: Arc<ButtplugServer>,
57  #[getset(get = "pub")]
58  event_sender: broadcast::Sender<ButtplugRemoteServerEvent>,
59  disconnect_notifier: Arc<Notify>,
60}
61
62async fn run_device_event_stream(
63  server: Arc<ButtplugServer>,
64  remote_event_sender: broadcast::Sender<ButtplugRemoteServerEvent>,
65) {
66  let server_receiver = server.server_version_event_stream();
67  let known_indexes = DashSet::<u32>::default();
68
69  pin_mut!(server_receiver);
70  loop {
71    match server_receiver.next().await {
72      None => {
73        info!("Server disconnected via server disappearance, exiting loop.");
74        break;
75      }
76      Some(msg) => {
77        if let ButtplugServerMessageV4::DeviceList(dl) = msg
78          && remote_event_sender.receiver_count() > 0
79        {
80          for da in dl.devices() {
81            if known_indexes.contains(&da.1.device_index()) {
82              continue;
83            }
84            if let Some(device_info) = server.device_manager().device_info(da.1.device_index()) {
85              let added_event = ButtplugRemoteServerEvent::DeviceAdded {
86                index: da.1.device_index(),
87                name: da.1.device_name().clone(),
88                identifier: device_info.identifier().clone(),
89                display_name: device_info.display_name().clone(),
90              };
91              if remote_event_sender.send(added_event).is_err() {
92                error!(
93                  "Cannot send event to owner, dropping and assuming local server thread has exited."
94                );
95              }
96              known_indexes.insert(da.1.device_index());
97            }
98          }
99          let indexes = known_indexes.clone();
100          let current_indexes: Vec<u32> = dl.devices().keys().cloned().collect();
101          for dr in indexes {
102            if current_indexes.contains(&dr) {
103              continue;
104            }
105            let removed_event = ButtplugRemoteServerEvent::DeviceRemoved { index: dr };
106            if remote_event_sender.send(removed_event).is_err() {
107              error!(
108                "Cannot send event to owner, dropping and assuming local server thread has exited."
109              );
110            }
111            known_indexes.remove(&dr);
112          }
113        }
114      }
115    }
116  }
117}
118
119async fn run_server<ConnectorType>(
120  server: Arc<ButtplugServer>,
121  remote_event_sender: broadcast::Sender<ButtplugRemoteServerEvent>,
122  connector: ConnectorType,
123  mut connector_receiver: mpsc::Receiver<ButtplugClientMessageVariant>,
124  disconnect_notifier: Arc<Notify>,
125) where
126  ConnectorType:
127    ButtplugConnector<ButtplugServerMessageVariant, ButtplugClientMessageVariant> + 'static,
128{
129  info!("Starting remote server loop");
130  let shared_connector = Arc::new(connector);
131  let server_receiver = server.server_version_event_stream();
132  let client_version_receiver = server.event_stream();
133  pin_mut!(server_receiver);
134  pin_mut!(client_version_receiver);
135  loop {
136    select! {
137      connector_msg = connector_receiver.recv().fuse() => match connector_msg {
138        None => {
139          info!("Connector disconnected, exiting loop.");
140          if remote_event_sender.receiver_count() > 0 && remote_event_sender.send(ButtplugRemoteServerEvent::ClientDisconnected).is_err() {
141            warn!("Cannot update remote about client disconnection");
142          }
143          break;
144        }
145        Some(client_message) => {
146          trace!("Got message from connector: {:?}", client_message);
147          let server_clone = server.clone();
148          let connected = server_clone.connected();
149          let connector_clone = shared_connector.clone();
150          let remote_event_sender_clone = remote_event_sender.clone();
151          let disconnect_notifier = disconnect_notifier.clone();
152          async_manager::spawn(async move {
153            match server_clone.parse_message(client_message.clone()).await {
154              Ok(ret_msg) => {
155                // Only send event if we just connected. Sucks to check it on every message but the boolean check should be quick.
156                if !connected && server_clone.connected() {
157                  if remote_event_sender_clone.receiver_count() > 0
158                    && remote_event_sender_clone.send(ButtplugRemoteServerEvent::ClientConnected(server_clone.client_name().unwrap_or("Buttplug Client (No name specified)".to_owned()).clone())).is_err() {
159                      error!("Cannot send event to owner, dropping and assuming local server thread has exited.");
160                  }
161                }
162                if connector_clone.send(ret_msg).await.is_err() {
163                  error!("Cannot send reply to server, dropping and assuming remote server thread has exited.");
164                }
165              },
166              Err(err_msg) => {
167                if connector_clone.send(err_msg).await.is_err() {
168                  error!("Cannot send reply to server, dropping and assuming remote server thread has exited.");
169                }
170              }
171            }
172          });
173        }
174      },
175      _ = disconnect_notifier.notified().fuse() => {
176        info!("Server disconnected via controller disappearance, exiting loop.");
177        break;
178      },
179      server_msg = server_receiver.next().fuse() => match server_msg {
180        None => {
181          info!("Server disconnected via server disappearance, exiting loop.");
182          break;
183        }
184        Some(msg) => {
185          /*
186          if remote_event_sender.receiver_count() > 0 {
187            match &msg {
188              ButtplugServerMessageV4::DeviceAdded(da) => {
189                if let Some(device_info) = server.device_manager().device_info(da.device_index()) {
190                  let added_event = ButtplugRemoteServerEvent::DeviceAdded { index: da.device_index(), name: da.device_name().clone(), identifier: device_info.identifier().clone().into(), display_name: device_info.display_name().clone() };
191                  if remote_event_sender.send(added_event).is_err() {
192                    error!("Cannot send event to owner, dropping and assuming local server thread has exited.");
193                  }
194                }
195              },
196              ButtplugServerMessageV4::DeviceRemoved(dr) => {
197                let removed_event = ButtplugRemoteServerEvent::DeviceRemoved { index: dr.device_index() };
198                if remote_event_sender.send(removed_event).is_err() {
199                  error!("Cannot send event to owner, dropping and assuming local server thread has exited.");
200                }
201              },
202              _ => {}
203            }
204          }
205          */
206        }
207      },
208      client_msg = client_version_receiver.next().fuse() => match client_msg {
209        None => {
210          info!("Server disconnected via server disappearance, exiting loop.");
211          break;
212        }
213        Some(msg) => {
214          let connector_clone = shared_connector.clone();
215          if connector_clone.send(msg).await.is_err() {
216            error!("Server disappeared, exiting remote server thread.");
217          }
218        }
219      }
220    };
221  }
222  if let Err(err) = server.disconnect().await {
223    error!("Error disconnecting server: {:?}", err);
224  }
225  info!("Exiting remote server loop");
226}
227
228impl Default for ButtplugRemoteServer {
229  fn default() -> Self {
230    Self::new(
231      ButtplugServerBuilder::default()
232        .finish()
233        .expect("Default is infallible"),
234      &None,
235    )
236  }
237}
238
239impl ButtplugRemoteServer {
240  pub fn new(
241    server: ButtplugServer,
242    event_sender: &Option<Sender<ButtplugRemoteServerEvent>>,
243  ) -> Self {
244    let event_sender = if let Some(sender) = event_sender {
245      sender.clone()
246    } else {
247      broadcast::channel(256).0
248    };
249    // Thanks to the existence of the backdoor server, device updates can happen for the lifetime to
250    // the RemoteServer instance, not just during client connect. We need to make sure these are
251    // emitted to the frontend.
252    let server = Arc::new(server);
253    {
254      let server = server.clone();
255      tokio::spawn({
256        let server = server;
257        let event_sender = event_sender.clone();
258        async move {
259          run_device_event_stream(server, event_sender).await;
260        }
261      });
262    }
263    Self {
264      event_sender,
265      server,
266      disconnect_notifier: Arc::new(Notify::new()),
267    }
268  }
269
270  pub fn event_stream(&self) -> impl Stream<Item = ButtplugRemoteServerEvent> + use<> {
271    convert_broadcast_receiver_to_stream(self.event_sender.subscribe())
272  }
273
274  pub fn start<ConnectorType>(
275    &self,
276    mut connector: ConnectorType,
277  ) -> impl Future<Output = Result<(), ButtplugServerConnectorError>> + use<ConnectorType>
278  where
279    ConnectorType:
280      ButtplugConnector<ButtplugServerMessageVariant, ButtplugClientMessageVariant> + 'static,
281  {
282    let server = self.server.clone();
283    let event_sender = self.event_sender.clone();
284    let disconnect_notifier = self.disconnect_notifier.clone();
285    async move {
286      let (connector_sender, connector_receiver) = mpsc::channel(256);
287      // Due to the connect method requiring a mutable connector, we must connect before starting up
288      // our server loop. Anything that needs to happen outside of the client connection session
289      // should happen around this. This flow is locked.
290      connector
291        .connect(connector_sender)
292        .await
293        .map_err(|e| ButtplugServerConnectorError::ConnectorError(format!("{:?}", e)))?;
294      run_server(
295        server,
296        event_sender,
297        connector,
298        connector_receiver,
299        disconnect_notifier,
300      )
301      .await;
302      Ok(())
303    }
304  }
305
306  pub async fn disconnect(&self) -> Result<(), ButtplugError> {
307    self.disconnect_notifier.notify_waiters();
308    Ok(())
309  }
310
311  pub async fn shutdown(&self) -> Result<(), ButtplugError> {
312    self.server.shutdown().await?;
313    Ok(())
314  }
315}
316
317impl Drop for ButtplugRemoteServer {
318  fn drop(&mut self) {
319    self.disconnect_notifier.notify_waiters();
320  }
321}