1use buttplug::{
9 core::{
10 connector::ButtplugConnector,
11 errors::ButtplugError,
12 message::{
13 ButtplugClientMessageVariant, ButtplugServerMessageV4, ButtplugServerMessageVariant,
14 },
15 },
16 server::{
17 device::configuration::UserDeviceIdentifier, ButtplugServer, ButtplugServerBuilder,
18 ButtplugServerDowngradeWrapper,
19 },
20 util::{async_manager, stream::convert_broadcast_receiver_to_stream},
21};
22use futures::{future::Future, pin_mut, select, FutureExt, Stream, StreamExt};
23use getset::Getters;
24use serde::{Deserialize, Serialize};
25use std::sync::Arc;
26use thiserror::Error;
27use tokio::sync::{broadcast, mpsc, Notify};
28
29#[derive(Clone, Debug, Serialize, Deserialize)]
31pub enum ButtplugRemoteServerEvent {
32 ClientConnected(String),
33 ClientDisconnected,
34 DeviceAdded {
35 index: u32,
36 identifier: UserDeviceIdentifier,
37 name: String,
38 display_name: Option<String>,
39 },
40 DeviceRemoved {
41 index: u32,
42 },
43 }
45
46#[derive(Error, Debug)]
47pub enum ButtplugServerConnectorError {
48 #[error("Cannot bring up server for connection: {0}")]
49 ConnectorError(String),
50}
51
52#[derive(Getters)]
53pub struct ButtplugRemoteServer {
54 #[getset(get = "pub")]
55 server: Arc<ButtplugServerDowngradeWrapper>,
56 event_sender: broadcast::Sender<ButtplugRemoteServerEvent>,
57 disconnect_notifier: Arc<Notify>,
58}
59
60async fn run_device_event_stream(
61 server: Arc<ButtplugServerDowngradeWrapper>,
62 remote_event_sender: broadcast::Sender<ButtplugRemoteServerEvent>,
63) {
64 let server_receiver = server.server_version_event_stream();
65 pin_mut!(server_receiver);
66 loop {
67 match server_receiver.next().await {
68 None => {
69 info!("Server disconnected via server disappearance, exiting loop.");
70 break;
71 }
72 Some(msg) => {
73 if remote_event_sender.receiver_count() > 0 {
74 match &msg {
75 ButtplugServerMessageV4::DeviceAdded(da) => {
76 if let Some(device_info) = server.device_manager().device_info(da.device_index()) {
77 let added_event = ButtplugRemoteServerEvent::DeviceAdded {
78 index: da.device_index(),
79 name: da.device_name().clone(),
80 identifier: device_info.identifier().clone().into(),
81 display_name: device_info.display_name().clone(),
82 };
83 if remote_event_sender.send(added_event).is_err() {
84 error!("Cannot send event to owner, dropping and assuming local server thread has exited.");
85 }
86 }
87 }
88 ButtplugServerMessageV4::DeviceRemoved(dr) => {
89 let removed_event = ButtplugRemoteServerEvent::DeviceRemoved {
90 index: dr.device_index(),
91 };
92 if remote_event_sender.send(removed_event).is_err() {
93 error!("Cannot send event to owner, dropping and assuming local server thread has exited.");
94 }
95 }
96 _ => {}
97 }
98 }
99 }
100 }
101 }
102}
103
104async fn run_server<ConnectorType>(
105 server: Arc<ButtplugServerDowngradeWrapper>,
106 remote_event_sender: broadcast::Sender<ButtplugRemoteServerEvent>,
107 connector: ConnectorType,
108 mut connector_receiver: mpsc::Receiver<ButtplugClientMessageVariant>,
109 disconnect_notifier: Arc<Notify>,
110) where
111 ConnectorType:
112 ButtplugConnector<ButtplugServerMessageVariant, ButtplugClientMessageVariant> + 'static,
113{
114 info!("Starting remote server loop");
115 let shared_connector = Arc::new(connector);
116 let server_receiver = server.server_version_event_stream();
117 let client_version_receiver = server.client_version_event_stream();
118 pin_mut!(server_receiver);
119 pin_mut!(client_version_receiver);
120 loop {
121 select! {
122 connector_msg = connector_receiver.recv().fuse() => match connector_msg {
123 None => {
124 info!("Connector disconnected, exiting loop.");
125 if remote_event_sender.receiver_count() > 0 && remote_event_sender.send(ButtplugRemoteServerEvent::ClientDisconnected).is_err() {
126 warn!("Cannot update remote about client disconnection");
127 }
128 break;
129 }
130 Some(client_message) => {
131 trace!("Got message from connector: {:?}", client_message);
132 let server_clone = server.clone();
133 let connected = server_clone.connected();
134 let connector_clone = shared_connector.clone();
135 let remote_event_sender_clone = remote_event_sender.clone();
136 async_manager::spawn(async move {
137 match server_clone.parse_message(client_message.clone()).await {
138 Ok(ret_msg) => {
139 if !connected && server_clone.connected() {
141 if remote_event_sender_clone.receiver_count() > 0 {
142 if remote_event_sender_clone.send(ButtplugRemoteServerEvent::ClientConnected(server_clone.client_name().unwrap_or("Buttplug Client (No name specified)".to_owned()).clone())).is_err() {
143 error!("Cannot send event to owner, dropping and assuming local server thread has exited.");
144 }
145 }
146 }
147 if connector_clone.send(ret_msg).await.is_err() {
148 error!("Cannot send reply to server, dropping and assuming remote server thread has exited.");
149 }
150 },
151 Err(err_msg) => {
152 if connector_clone.send(err_msg.into()).await.is_err() {
153 error!("Cannot send reply to server, dropping and assuming remote server thread has exited.");
154 }
155 }
156 }
157 });
158 }
159 },
160 _ = disconnect_notifier.notified().fuse() => {
161 info!("Server disconnected via controller disappearance, exiting loop.");
162 break;
163 },
164 server_msg = server_receiver.next().fuse() => match server_msg {
165 None => {
166 info!("Server disconnected via server disappearance, exiting loop.");
167 break;
168 }
169 Some(msg) => {
170 if remote_event_sender.receiver_count() > 0 {
171 match &msg {
172 ButtplugServerMessageV4::DeviceAdded(da) => {
173 if let Some(device_info) = server.device_manager().device_info(da.device_index()) {
174 let added_event = ButtplugRemoteServerEvent::DeviceAdded { index: da.device_index(), name: da.device_name().clone(), identifier: device_info.identifier().clone().into(), display_name: device_info.display_name().clone() };
175 if remote_event_sender.send(added_event).is_err() {
176 error!("Cannot send event to owner, dropping and assuming local server thread has exited.");
177 }
178 }
179 },
180 ButtplugServerMessageV4::DeviceRemoved(dr) => {
181 let removed_event = ButtplugRemoteServerEvent::DeviceRemoved { index: dr.device_index() };
182 if remote_event_sender.send(removed_event).is_err() {
183 error!("Cannot send event to owner, dropping and assuming local server thread has exited.");
184 }
185 },
186 _ => {}
187 }
188 }
189 }
190 },
191 client_msg = client_version_receiver.next().fuse() => match client_msg {
192 None => {
193 info!("Server disconnected via server disappearance, exiting loop.");
194 break;
195 }
196 Some(msg) => {
197 let connector_clone = shared_connector.clone();
198 if connector_clone.send(msg.into()).await.is_err() {
199 error!("Server disappeared, exiting remote server thread.");
200 }
201 }
202 }
203 };
204 }
205 if let Err(err) = server.disconnect().await {
206 error!("Error disconnecting server: {:?}", err);
207 }
208 info!("Exiting remote server loop");
209}
210
211impl Default for ButtplugRemoteServer {
212 fn default() -> Self {
213 Self::new(
214 ButtplugServerBuilder::default()
215 .finish()
216 .expect("Default is infallible"),
217 )
218 }
219}
220
221impl ButtplugRemoteServer {
222 pub fn new(server: ButtplugServer) -> Self {
223 let (event_sender, _) = broadcast::channel(256);
224 let wrapped_server = Arc::new(ButtplugServerDowngradeWrapper::new(server));
225 tokio::spawn({
229 let server = wrapped_server.clone();
230 let event_sender = event_sender.clone();
231 async move {
232 run_device_event_stream(server, event_sender).await;
233 }
234 });
235 Self {
236 event_sender,
237 server: wrapped_server.clone(),
238 disconnect_notifier: Arc::new(Notify::new()),
239 }
240 }
241
242 pub fn event_stream(&self) -> impl Stream<Item = ButtplugRemoteServerEvent> {
243 convert_broadcast_receiver_to_stream(self.event_sender.subscribe())
244 }
245
246 pub fn start<ConnectorType>(
247 &self,
248 mut connector: ConnectorType,
249 ) -> impl Future<Output = Result<(), ButtplugServerConnectorError>>
250 where
251 ConnectorType:
252 ButtplugConnector<ButtplugServerMessageVariant, ButtplugClientMessageVariant> + 'static,
253 {
254 let server = self.server.clone();
255 let event_sender = self.event_sender.clone();
256 let disconnect_notifier = self.disconnect_notifier.clone();
257 async move {
258 let (connector_sender, connector_receiver) = mpsc::channel(256);
259 connector
263 .connect(connector_sender)
264 .await
265 .map_err(|e| ButtplugServerConnectorError::ConnectorError(format!("{:?}", e)))?;
266 run_server(
267 server,
268 event_sender,
269 connector,
270 connector_receiver,
271 disconnect_notifier,
272 )
273 .await;
274 Ok(())
275 }
276 }
277
278 pub async fn disconnect(&self) -> Result<(), ButtplugError> {
279 self.disconnect_notifier.notify_waiters();
280 Ok(())
281 }
282
283 pub async fn shutdown(&self) -> Result<(), ButtplugError> {
284 self.server.shutdown().await?;
285 Ok(())
286 }
287}
288
289impl Drop for ButtplugRemoteServer {
290 fn drop(&mut self) {
291 self.disconnect_notifier.notify_waiters();
292 }
293}