intiface_engine/
backdoor_server.rs1use buttplug_core::{
9 connector::transport::stream::ButtplugStreamTransport,
10 message::serializer::ButtplugSerializedMessage,
11 util::stream::convert_broadcast_receiver_to_stream,
12};
13use buttplug_server::{
14 ButtplugServerBuilder, connector::ButtplugRemoteServerConnector, device::ServerDeviceManager,
15 message::serializer::ButtplugServerJSONSerializer,
16};
17use std::sync::Arc;
18use tokio::sync::{
19 broadcast,
20 mpsc::{self, Sender},
21};
22use tokio_stream::Stream;
23
24use crate::ButtplugRemoteServer;
25
26pub struct BackdoorServer {
29 sender: Sender<ButtplugSerializedMessage>,
31 broadcaster: broadcast::Sender<String>,
32}
33
34impl BackdoorServer {
35 pub fn new(device_manager: Arc<ServerDeviceManager>) -> Self {
36 let server = ButtplugRemoteServer::new(
37 ButtplugServerBuilder::with_shared_device_manager(device_manager.clone())
38 .name("Intiface Backdoor Server")
39 .finish()
40 .unwrap(),
41 &None,
42 );
43 let (s_out, mut r_out) = mpsc::channel(255);
44 let (s_in, r_in) = mpsc::channel(255);
45 let (s_stream, _) = broadcast::channel(255);
46 tokio::spawn(async move {
47 if let Err(e) = server
49 .start(ButtplugRemoteServerConnector::<
50 _,
51 ButtplugServerJSONSerializer,
52 >::new(ButtplugStreamTransport::new(s_out, r_in)))
53 .await
54 {
55 error!("Backdoor server error: {:?}", e);
57 }
58 });
59 let sender_clone = s_stream.clone();
60 tokio::spawn(async move {
61 while let Some(ButtplugSerializedMessage::Text(m)) = r_out.recv().await {
62 if sender_clone.receiver_count() == 0 {
63 continue;
64 }
65 if sender_clone.send(m).is_err() {
66 break;
67 }
68 }
69 });
70 Self {
71 sender: s_in,
72 broadcaster: s_stream,
73 }
74 }
75
76 pub fn event_stream(&self) -> impl Stream<Item = String> + '_ {
77 convert_broadcast_receiver_to_stream(self.broadcaster.subscribe())
78 }
79
80 pub async fn parse_message(&self, msg: &str) {
81 self
82 .sender
83 .send(ButtplugSerializedMessage::Text(msg.to_owned()))
84 .await
85 .unwrap();
86 }
87}