intiface_engine/
backdoor_server.rs1use buttplug::{
2 core::{
3 connector::{transport::ButtplugStreamTransport, ButtplugRemoteServerConnector},
4 message::serializer::{ButtplugSerializedMessage, ButtplugServerJSONSerializer},
5 },
6 server::{device::ServerDeviceManager, ButtplugServerBuilder},
7 util::stream::convert_broadcast_receiver_to_stream,
8};
9use std::sync::Arc;
10use tokio::sync::{
11 broadcast,
12 mpsc::{self, Sender},
13};
14use tokio_stream::Stream;
15
16use crate::ButtplugRemoteServer;
17
18pub struct BackdoorServer {
21 sender: Sender<ButtplugSerializedMessage>,
23 broadcaster: broadcast::Sender<String>,
24}
25
26impl BackdoorServer {
27 pub fn new(device_manager: Arc<ServerDeviceManager>) -> Self {
28 let server = ButtplugRemoteServer::new(
29 ButtplugServerBuilder::with_shared_device_manager(device_manager.clone())
30 .name("Intiface Backdoor Server")
31 .finish()
32 .unwrap(),
33 );
34 let (s_out, mut r_out) = mpsc::channel(255);
35 let (s_in, r_in) = mpsc::channel(255);
36 let (s_stream, _) = broadcast::channel(255);
37 tokio::spawn(async move {
38 if let Err(e) = server
39 .start(ButtplugRemoteServerConnector::<
40 _,
41 ButtplugServerJSONSerializer,
42 >::new(ButtplugStreamTransport::new(s_out, r_in)))
43 .await
44 {
45 error!("Backdoor server error: {:?}", e);
47 }
48 });
49 let sender_clone = s_stream.clone();
50 tokio::spawn(async move {
51 while let Some(ButtplugSerializedMessage::Text(m)) = r_out.recv().await {
52 if sender_clone.receiver_count() == 0 {
53 continue;
54 }
55 if sender_clone.send(m).is_err() {
56 break;
57 }
58 }
59 });
60 Self {
61 sender: s_in,
62 broadcaster: s_stream,
63 }
64 }
65
66 pub fn event_stream(&self) -> impl Stream<Item = String> + '_ {
67 convert_broadcast_receiver_to_stream(self.broadcaster.subscribe())
68 }
69
70 pub async fn parse_message(&self, msg: &str) {
71 self
72 .sender
73 .send(ButtplugSerializedMessage::Text(msg.to_owned()))
74 .await
75 .unwrap();
76 }
77}