1#[macro_use]
11extern crate log;
12
13pub mod client_event_loop;
14pub mod client_message_sorter;
15pub mod connector;
16pub mod device;
17pub mod serializer;
18
19use buttplug_core::{
20 connector::{ButtplugConnector, ButtplugConnectorError},
21 errors::{ButtplugError, ButtplugHandshakeError},
22 message::{
23 BUTTPLUG_CURRENT_API_MAJOR_VERSION,
24 BUTTPLUG_CURRENT_API_MINOR_VERSION,
25 ButtplugClientMessageV4,
26 ButtplugServerMessageV4,
27 InputType,
28 PingV0,
29 RequestDeviceListV0,
30 RequestServerInfoV4,
31 StartScanningV0,
32 StopCmdV4,
33 StopScanningV0,
34 },
35 util::stream::convert_broadcast_receiver_to_stream,
36};
37use client_event_loop::{ButtplugClientEventLoop, ButtplugClientRequest};
38use dashmap::DashMap;
39pub use device::{ButtplugClientDevice, ButtplugClientDeviceEvent};
40use futures::{
41 Stream,
42 channel::oneshot,
43 future::{self, BoxFuture, FutureExt},
44};
45use log::*;
46use std::{
47 collections::BTreeMap,
48 sync::{
49 Arc,
50 atomic::{AtomicBool, Ordering},
51 },
52};
53use strum_macros::Display;
54use thiserror::Error;
55use tokio::sync::{Mutex, broadcast, mpsc};
56
57type ButtplugClientResult<T = ()> = Result<T, ButtplugClientError>;
63type ButtplugClientResultFuture<T = ()> = BoxFuture<'static, ButtplugClientResult<T>>;
64
65pub type ButtplugServerMessageResult = ButtplugClientResult<ButtplugServerMessageV4>;
67pub type ButtplugServerMessageResultFuture = ButtplugClientResultFuture<ButtplugServerMessageV4>;
68pub(crate) type ButtplugServerMessageSender = oneshot::Sender<ButtplugServerMessageResult>;
70
71pub struct ButtplugClientMessageFuturePair {
84 pub(crate) msg: ButtplugClientMessageV4,
85 pub(crate) sender: Option<ButtplugServerMessageSender>,
86}
87
88impl ButtplugClientMessageFuturePair {
89 pub fn new(msg: ButtplugClientMessageV4, sender: ButtplugServerMessageSender) -> Self {
90 Self {
91 msg,
92 sender: Some(sender),
93 }
94 }
95}
96
97#[derive(Debug, Error, Display)]
105pub enum ButtplugClientError {
106 #[error(transparent)]
108 ButtplugConnectorError(#[from] ButtplugConnectorError),
109 #[error(transparent)]
111 ButtplugError(#[from] ButtplugError),
112 ButtplugOutputCommandConversionError(String),
114 ButtplugMultipleInputAvailableError(InputType),
116}
117
118#[derive(Clone, Debug)]
124pub enum ButtplugClientEvent {
125 ScanningFinished,
128 DeviceListReceived,
131 DeviceAdded(ButtplugClientDevice),
134 DeviceRemoved(ButtplugClientDevice),
137 PingTimeout,
140 ServerConnect,
142 ServerDisconnect,
144 Error(ButtplugError),
147}
148
149impl Unpin for ButtplugClientEvent {
150}
151
152pub(crate) fn create_boxed_future_client_error<T>(
153 err: ButtplugError,
154) -> ButtplugClientResultFuture<T>
155where
156 T: 'static + Send + Sync,
157{
158 future::ready(Err(ButtplugClientError::ButtplugError(err))).boxed()
159}
160
161#[derive(Clone, Debug)]
162pub(crate) struct ButtplugClientMessageSender {
163 message_sender: mpsc::Sender<ButtplugClientRequest>,
164 connected: Arc<AtomicBool>,
165}
166
167impl ButtplugClientMessageSender {
168 fn new(message_sender: mpsc::Sender<ButtplugClientRequest>, connected: &Arc<AtomicBool>) -> Self {
169 Self {
170 message_sender,
171 connected: connected.clone(),
172 }
173 }
174
175 pub fn send_message_to_event_loop(
179 &self,
180 msg: ButtplugClientRequest,
181 ) -> BoxFuture<'static, Result<(), ButtplugClientError>> {
182 let message_sender = self.message_sender.clone();
186 async move {
187 message_sender
188 .send(msg)
189 .await
190 .map_err(|_| ButtplugConnectorError::ConnectorChannelClosed)?;
191 Ok(())
192 }
193 .boxed()
194 }
195
196 pub fn send_message(&self, msg: ButtplugClientMessageV4) -> ButtplugServerMessageResultFuture {
197 if !self.connected.load(Ordering::Relaxed) {
198 future::ready(Err(ButtplugConnectorError::ConnectorNotConnected.into())).boxed()
199 } else {
200 self.send_message_ignore_connect_status(msg)
201 }
202 }
203
204 pub fn send_message_ignore_connect_status(
207 &self,
208 msg: ButtplugClientMessageV4,
209 ) -> ButtplugServerMessageResultFuture {
210 let (tx, rx) = oneshot::channel();
212 let internal_msg =
213 ButtplugClientRequest::Message(ButtplugClientMessageFuturePair::new(msg, tx));
214
215 let send_fut = self.send_message_to_event_loop(internal_msg);
217 async move {
218 send_fut.await?;
219 rx.await
220 .map_err(|_| ButtplugConnectorError::ConnectorChannelClosed)?
221 }
222 .boxed()
223 }
224
225 pub fn send_message_expect_ok(&self, msg: ButtplugClientMessageV4) -> ButtplugClientResultFuture {
228 let send_fut = self.send_message(msg);
229 async move { send_fut.await.map(|_| ()) }.boxed()
230 }
231}
232
233pub struct ButtplugClient {
250 client_name: String,
253 server_name: Arc<Mutex<Option<String>>>,
255 event_stream: broadcast::Sender<ButtplugClientEvent>,
256 message_sender: ButtplugClientMessageSender,
258 request_receiver: Arc<Mutex<Option<mpsc::Receiver<ButtplugClientRequest>>>>,
260 connected: Arc<AtomicBool>,
261 device_map: Arc<DashMap<u32, ButtplugClientDevice>>,
262}
263
264impl ButtplugClient {
265 pub fn new(name: &str) -> Self {
266 let (request_sender, request_receiver) = mpsc::channel(256);
267 let (event_stream, _) = broadcast::channel(256);
268 let connected = Arc::new(AtomicBool::new(false));
269 Self {
270 client_name: name.to_owned(),
271 server_name: Arc::new(Mutex::new(None)),
272 event_stream,
273 message_sender: ButtplugClientMessageSender::new(request_sender, &connected),
274 request_receiver: Arc::new(Mutex::new(Some(request_receiver))),
275 connected,
276 device_map: Arc::new(DashMap::new()),
277 }
278 }
279
280 pub async fn connect<ConnectorType>(
281 &self,
282 mut connector: ConnectorType,
283 ) -> Result<(), ButtplugClientError>
284 where
285 ConnectorType: ButtplugConnector<ButtplugClientMessageV4, ButtplugServerMessageV4> + 'static,
286 {
287 if self.connected() {
288 return Err(ButtplugClientError::ButtplugConnectorError(
289 ButtplugConnectorError::ConnectorAlreadyConnected,
290 ));
291 }
292
293 self.device_map.clear();
295
296 let request_receiver = self.request_receiver.lock().await.take().ok_or(
299 ButtplugConnectorError::ConnectorGenericError(
300 "Cannot reconnect - request channel already consumed. Create a new client.".to_string(),
301 ),
302 )?;
303
304 info!("Connecting to server.");
305 let (connector_sender, connector_receiver) = mpsc::channel(256);
306 connector.connect(connector_sender).await.map_err(|e| {
307 error!("Connection to server failed: {:?}", e);
308 ButtplugClientError::from(e)
309 })?;
310 info!("Connection to server succeeded.");
311 let mut client_event_loop = ButtplugClientEventLoop::new(
312 self.connected.clone(),
313 connector,
314 connector_receiver,
315 self.event_stream.clone(),
316 self.message_sender.clone(),
317 request_receiver,
318 self.device_map.clone(),
319 );
320
321 buttplug_core::spawn!("ButtplugClient event loop", async move {
323 client_event_loop.run().await;
324 });
325 self.run_handshake().await
326 }
327
328 async fn run_handshake(&self) -> ButtplugClientResult {
335 info!("Running handshake with server.");
337 let msg = self
338 .message_sender
339 .send_message_ignore_connect_status(
340 RequestServerInfoV4::new(
341 &self.client_name,
342 BUTTPLUG_CURRENT_API_MAJOR_VERSION,
343 BUTTPLUG_CURRENT_API_MINOR_VERSION,
344 )
345 .into(),
346 )
347 .await?;
348
349 debug!("Got ServerInfo return.");
350 if let ButtplugServerMessageV4::ServerInfo(server_info) = msg {
351 info!("Connected to {}", server_info.server_name());
352 *self.server_name.lock().await = Some(server_info.server_name().clone());
353 self.connected.store(true, Ordering::Relaxed);
357
358 let msg = self
362 .message_sender
363 .send_message(RequestDeviceListV0::default().into())
364 .await?;
365 if let ButtplugServerMessageV4::DeviceList(m) = msg {
366 self
367 .message_sender
368 .send_message_to_event_loop(ButtplugClientRequest::HandleDeviceList(m))
369 .await?;
370 }
371 Ok(())
372 } else {
373 self.disconnect().await?;
374 Err(ButtplugClientError::ButtplugError(
375 ButtplugHandshakeError::UnexpectedHandshakeMessageReceived(format!("{msg:?}")).into(),
376 ))
377 }
378 }
379
380 pub fn connected(&self) -> bool {
382 self.connected.load(Ordering::Relaxed)
383 }
384
385 pub fn disconnect(&self) -> ButtplugClientResultFuture {
390 if !self.connected() {
391 return future::ready(Err(ButtplugConnectorError::ConnectorNotConnected.into())).boxed();
392 }
393 let (tx, rx) = oneshot::channel();
397 let msg = ButtplugClientRequest::Disconnect(tx);
398 let send_fut = self.message_sender.send_message_to_event_loop(msg);
399 let connected = self.connected.clone();
400 async move {
401 connected.store(false, Ordering::Relaxed);
402 send_fut.await?;
403 let _ = rx.await;
405 Ok(())
406 }
407 .boxed()
408 }
409
410 pub fn start_scanning(&self) -> ButtplugClientResultFuture {
415 self
416 .message_sender
417 .send_message_expect_ok(StartScanningV0::default().into())
418 }
419
420 pub fn stop_scanning(&self) -> ButtplugClientResultFuture {
425 self
426 .message_sender
427 .send_message_expect_ok(StopScanningV0::default().into())
428 }
429
430 pub fn stop_all_devices(&self) -> ButtplugClientResultFuture {
435 self
436 .message_sender
437 .send_message_expect_ok(StopCmdV4::default().into())
438 }
439
440 pub fn event_stream(&self) -> impl Stream<Item = ButtplugClientEvent> + use<> {
441 let stream = convert_broadcast_receiver_to_stream(self.event_stream.subscribe());
442 Box::pin(stream)
448 }
449
450 pub fn devices(&self) -> BTreeMap<u32, ButtplugClientDevice> {
452 self
453 .device_map
454 .iter()
455 .map(|map_pair| (*map_pair.key(), map_pair.value().clone()))
456 .collect()
457 }
458
459 pub fn ping(&self) -> ButtplugClientResultFuture {
460 let ping_fut = self
461 .message_sender
462 .send_message_expect_ok(PingV0::default().into());
463 ping_fut.boxed()
464 }
465
466 pub fn server_name(&self) -> Option<String> {
467 if let Ok(name) = self.server_name.try_lock() {
474 name.clone()
475 } else {
476 None
477 }
478 }
479}