intiface_engine/
remote_server.rs

1// Buttplug Rust Source Code File - See https://buttplug.io for more info.
2//
3// Copyright 2016-2022 Nonpolynomial Labs LLC. All rights reserved.
4//
5// Licensed under the BSD 3-Clause license. See LICENSE file in the project root
6// for full license information.
7
8use buttplug::{
9  core::{
10    connector::ButtplugConnector,
11    errors::ButtplugError,
12    message::{
13      ButtplugClientMessageVariant, ButtplugServerMessageV4, ButtplugServerMessageVariant,
14    },
15  },
16  server::{
17    device::configuration::UserDeviceIdentifier, ButtplugServer, ButtplugServerBuilder,
18    ButtplugServerDowngradeWrapper,
19  },
20  util::{async_manager, stream::convert_broadcast_receiver_to_stream},
21};
22use futures::{future::Future, pin_mut, select, FutureExt, Stream, StreamExt};
23use getset::Getters;
24use serde::{Deserialize, Serialize};
25use std::sync::Arc;
26use thiserror::Error;
27use tokio::sync::{broadcast, mpsc, Notify};
28
29// Clone derived here to satisfy tokio broadcast requirements.
30#[derive(Clone, Debug, Serialize, Deserialize)]
31pub enum ButtplugRemoteServerEvent {
32  ClientConnected(String),
33  ClientDisconnected,
34  DeviceAdded {
35    index: u32,
36    identifier: UserDeviceIdentifier,
37    name: String,
38    display_name: Option<String>,
39  },
40  DeviceRemoved {
41    index: u32,
42  },
43  //DeviceCommand(ButtplugDeviceCommandMessageUnion)
44}
45
46#[derive(Error, Debug)]
47pub enum ButtplugServerConnectorError {
48  #[error("Cannot bring up server for connection: {0}")]
49  ConnectorError(String),
50}
51
52#[derive(Getters)]
53pub struct ButtplugRemoteServer {
54  #[getset(get = "pub")]
55  server: Arc<ButtplugServerDowngradeWrapper>,
56  event_sender: broadcast::Sender<ButtplugRemoteServerEvent>,
57  disconnect_notifier: Arc<Notify>,
58}
59
60async fn run_device_event_stream(
61  server: Arc<ButtplugServerDowngradeWrapper>,
62  remote_event_sender: broadcast::Sender<ButtplugRemoteServerEvent>,
63) {
64  let server_receiver = server.server_version_event_stream();
65  pin_mut!(server_receiver);
66  loop {
67    match server_receiver.next().await {
68      None => {
69        info!("Server disconnected via server disappearance, exiting loop.");
70        break;
71      }
72      Some(msg) => {
73        if remote_event_sender.receiver_count() > 0 {
74          match &msg {
75            ButtplugServerMessageV4::DeviceAdded(da) => {
76              if let Some(device_info) = server.device_manager().device_info(da.device_index()) {
77                let added_event = ButtplugRemoteServerEvent::DeviceAdded {
78                  index: da.device_index(),
79                  name: da.device_name().clone(),
80                  identifier: device_info.identifier().clone().into(),
81                  display_name: device_info.display_name().clone(),
82                };
83                if remote_event_sender.send(added_event).is_err() {
84                  error!("Cannot send event to owner, dropping and assuming local server thread has exited.");
85                }
86              }
87            }
88            ButtplugServerMessageV4::DeviceRemoved(dr) => {
89              let removed_event = ButtplugRemoteServerEvent::DeviceRemoved {
90                index: dr.device_index(),
91              };
92              if remote_event_sender.send(removed_event).is_err() {
93                error!("Cannot send event to owner, dropping and assuming local server thread has exited.");
94              }
95            }
96            _ => {}
97          }
98        }
99      }
100    }
101  }
102}
103
104async fn run_server<ConnectorType>(
105  server: Arc<ButtplugServerDowngradeWrapper>,
106  remote_event_sender: broadcast::Sender<ButtplugRemoteServerEvent>,
107  connector: ConnectorType,
108  mut connector_receiver: mpsc::Receiver<ButtplugClientMessageVariant>,
109  disconnect_notifier: Arc<Notify>,
110) where
111  ConnectorType:
112    ButtplugConnector<ButtplugServerMessageVariant, ButtplugClientMessageVariant> + 'static,
113{
114  info!("Starting remote server loop");
115  let shared_connector = Arc::new(connector);
116  let server_receiver = server.server_version_event_stream();
117  let client_version_receiver = server.client_version_event_stream();
118  pin_mut!(server_receiver);
119  pin_mut!(client_version_receiver);
120  loop {
121    select! {
122      connector_msg = connector_receiver.recv().fuse() => match connector_msg {
123        None => {
124          info!("Connector disconnected, exiting loop.");
125          if remote_event_sender.receiver_count() > 0 && remote_event_sender.send(ButtplugRemoteServerEvent::ClientDisconnected).is_err() {
126            warn!("Cannot update remote about client disconnection");
127          }
128          break;
129        }
130        Some(client_message) => {
131          trace!("Got message from connector: {:?}", client_message);
132          let server_clone = server.clone();
133          let connected = server_clone.connected();
134          let connector_clone = shared_connector.clone();
135          let remote_event_sender_clone = remote_event_sender.clone();
136          async_manager::spawn(async move {
137            match server_clone.parse_message(client_message.clone()).await {
138              Ok(ret_msg) => {
139                // Only send event if we just connected. Sucks to check it on every message but the boolean check should be quick.
140                if !connected && server_clone.connected() {
141                  if remote_event_sender_clone.receiver_count() > 0 {
142                    if remote_event_sender_clone.send(ButtplugRemoteServerEvent::ClientConnected(server_clone.client_name().unwrap_or("Buttplug Client (No name specified)".to_owned()).clone())).is_err() {
143                      error!("Cannot send event to owner, dropping and assuming local server thread has exited.");
144                    }
145                  }
146                }
147                if connector_clone.send(ret_msg).await.is_err() {
148                  error!("Cannot send reply to server, dropping and assuming remote server thread has exited.");
149                }
150              },
151              Err(err_msg) => {
152                if connector_clone.send(err_msg.into()).await.is_err() {
153                  error!("Cannot send reply to server, dropping and assuming remote server thread has exited.");
154                }
155              }
156            }
157          });
158        }
159      },
160      _ = disconnect_notifier.notified().fuse() => {
161        info!("Server disconnected via controller disappearance, exiting loop.");
162        break;
163      },
164      server_msg = server_receiver.next().fuse() => match server_msg {
165        None => {
166          info!("Server disconnected via server disappearance, exiting loop.");
167          break;
168        }
169        Some(msg) => {
170          if remote_event_sender.receiver_count() > 0 {
171            match &msg {
172              ButtplugServerMessageV4::DeviceAdded(da) => {
173                if let Some(device_info) = server.device_manager().device_info(da.device_index()) {
174                  let added_event = ButtplugRemoteServerEvent::DeviceAdded { index: da.device_index(), name: da.device_name().clone(), identifier: device_info.identifier().clone().into(), display_name: device_info.display_name().clone() };
175                  if remote_event_sender.send(added_event).is_err() {
176                    error!("Cannot send event to owner, dropping and assuming local server thread has exited.");
177                  }
178                }
179              },
180              ButtplugServerMessageV4::DeviceRemoved(dr) => {
181                let removed_event = ButtplugRemoteServerEvent::DeviceRemoved { index: dr.device_index() };
182                if remote_event_sender.send(removed_event).is_err() {
183                  error!("Cannot send event to owner, dropping and assuming local server thread has exited.");
184                }
185              },
186              _ => {}
187            }
188          }
189        }
190      },
191      client_msg = client_version_receiver.next().fuse() => match client_msg {
192        None => {
193          info!("Server disconnected via server disappearance, exiting loop.");
194          break;
195        }
196        Some(msg) => {
197          let connector_clone = shared_connector.clone();
198          if connector_clone.send(msg.into()).await.is_err() {
199            error!("Server disappeared, exiting remote server thread.");
200          }
201        }
202      }
203    };
204  }
205  if let Err(err) = server.disconnect().await {
206    error!("Error disconnecting server: {:?}", err);
207  }
208  info!("Exiting remote server loop");
209}
210
211impl Default for ButtplugRemoteServer {
212  fn default() -> Self {
213    Self::new(
214      ButtplugServerBuilder::default()
215        .finish()
216        .expect("Default is infallible"),
217    )
218  }
219}
220
221impl ButtplugRemoteServer {
222  pub fn new(server: ButtplugServer) -> Self {
223    let (event_sender, _) = broadcast::channel(256);
224    let wrapped_server = Arc::new(ButtplugServerDowngradeWrapper::new(server));
225    // Thanks to the existence of the backdoor server, device updates can happen for the lifetime to
226    // the RemoteServer instance, not just during client connect. We need to make sure these are
227    // emitted to the frontend.
228    tokio::spawn({
229      let server = wrapped_server.clone();
230      let event_sender = event_sender.clone();
231      async move {
232        run_device_event_stream(server, event_sender).await;
233      }
234    });
235    Self {
236      event_sender,
237      server: wrapped_server.clone(),
238      disconnect_notifier: Arc::new(Notify::new()),
239    }
240  }
241
242  pub fn event_stream(&self) -> impl Stream<Item = ButtplugRemoteServerEvent> {
243    convert_broadcast_receiver_to_stream(self.event_sender.subscribe())
244  }
245
246  pub fn start<ConnectorType>(
247    &self,
248    mut connector: ConnectorType,
249  ) -> impl Future<Output = Result<(), ButtplugServerConnectorError>>
250  where
251    ConnectorType:
252      ButtplugConnector<ButtplugServerMessageVariant, ButtplugClientMessageVariant> + 'static,
253  {
254    let server = self.server.clone();
255    let event_sender = self.event_sender.clone();
256    let disconnect_notifier = self.disconnect_notifier.clone();
257    async move {
258      let (connector_sender, connector_receiver) = mpsc::channel(256);
259      // Due to the connect method requiring a mutable connector, we must connect before starting up
260      // our server loop. Anything that needs to happen outside of the client connection session
261      // should happen around this. This flow is locked.
262      connector
263        .connect(connector_sender)
264        .await
265        .map_err(|e| ButtplugServerConnectorError::ConnectorError(format!("{:?}", e)))?;
266      run_server(
267        server,
268        event_sender,
269        connector,
270        connector_receiver,
271        disconnect_notifier,
272      )
273      .await;
274      Ok(())
275    }
276  }
277
278  pub async fn disconnect(&self) -> Result<(), ButtplugError> {
279    self.disconnect_notifier.notify_waiters();
280    Ok(())
281  }
282
283  pub async fn shutdown(&self) -> Result<(), ButtplugError> {
284    self.server.shutdown().await?;
285    Ok(())
286  }
287}
288
289impl Drop for ButtplugRemoteServer {
290  fn drop(&mut self) {
291    self.disconnect_notifier.notify_waiters();
292  }
293}