intiface_engine/
backdoor_server.rs

1use buttplug::{
2  core::{
3    connector::{transport::ButtplugStreamTransport, ButtplugRemoteServerConnector},
4    message::serializer::{ButtplugSerializedMessage, ButtplugServerJSONSerializer},
5  },
6  server::{device::ServerDeviceManager, ButtplugServerBuilder},
7  util::stream::convert_broadcast_receiver_to_stream,
8};
9use std::sync::Arc;
10use tokio::sync::{
11  broadcast,
12  mpsc::{self, Sender},
13};
14use tokio_stream::Stream;
15
16use crate::ButtplugRemoteServer;
17
/// Allows direct access to the Device Manager of a running ButtplugServer. Bypasses requirements for
/// client handshake, ping, etc...
///
/// Messages go in through [`BackdoorServer::parse_message`] and come back out through the stream
/// returned by [`BackdoorServer::event_stream`].
pub struct BackdoorServer {
  // The server itself is not stored here: `new` moves it into a spawned task that runs it.
  /// Carries serialized incoming messages to the receiving half of the server's stream transport.
  sender: Sender<ButtplugSerializedMessage>,
  /// Broadcasts the server's outgoing text messages; `event_stream` subscribes to it.
  broadcaster: broadcast::Sender<String>,
}
25
26impl BackdoorServer {
27  pub fn new(device_manager: Arc<ServerDeviceManager>) -> Self {
28    let server = ButtplugRemoteServer::new(
29      ButtplugServerBuilder::with_shared_device_manager(device_manager.clone())
30        .name("Intiface Backdoor Server")
31        .finish()
32        .unwrap(),
33    );
34    let (s_out, mut r_out) = mpsc::channel(255);
35    let (s_in, r_in) = mpsc::channel(255);
36    let (s_stream, _) = broadcast::channel(255);
37    tokio::spawn(async move {
38      if let Err(e) = server
39        .start(ButtplugRemoteServerConnector::<
40          _,
41          ButtplugServerJSONSerializer,
42        >::new(ButtplugStreamTransport::new(s_out, r_in)))
43        .await
44      {
45        // We can't do much if the server fails, but we *can* yell into the logs!
46        error!("Backdoor server error: {:?}", e);
47      }
48    });
49    let sender_clone = s_stream.clone();
50    tokio::spawn(async move {
51      while let Some(ButtplugSerializedMessage::Text(m)) = r_out.recv().await {
52        if sender_clone.receiver_count() == 0 {
53          continue;
54        }
55        if sender_clone.send(m).is_err() {
56          break;
57        }
58      }
59    });
60    Self {
61      sender: s_in,
62      broadcaster: s_stream,
63    }
64  }
65
66  pub fn event_stream(&self) -> impl Stream<Item = String> + '_ {
67    convert_broadcast_receiver_to_stream(self.broadcaster.subscribe())
68  }
69
70  pub async fn parse_message(&self, msg: &str) {
71    self
72      .sender
73      .send(ButtplugSerializedMessage::Text(msg.to_owned()))
74      .await
75      .unwrap();
76  }
77}