1#[macro_use]
11extern crate log;
12
13pub mod client_event_loop;
14pub mod client_message_sorter;
15pub mod connector;
16pub mod device;
17pub mod serializer;
18
19use buttplug_core::{
20 connector::{ButtplugConnector, ButtplugConnectorError},
21 errors::{ButtplugError, ButtplugHandshakeError},
22 message::{
23 BUTTPLUG_CURRENT_API_MAJOR_VERSION,
24 BUTTPLUG_CURRENT_API_MINOR_VERSION,
25 ButtplugClientMessageV4,
26 ButtplugServerMessageV4,
27 InputType,
28 PingV0,
29 RequestDeviceListV0,
30 RequestServerInfoV4,
31 StartScanningV0,
32 StopCmdV4,
33 StopScanningV0,
34 },
35 util::stream::convert_broadcast_receiver_to_stream,
36};
37use client_event_loop::{ButtplugClientEventLoop, ButtplugClientRequest};
38use dashmap::DashMap;
39pub use device::{ButtplugClientDevice, ButtplugClientDeviceEvent};
40use futures::{
41 Stream,
42 channel::oneshot,
43 future::{self, BoxFuture, FutureExt},
44};
45use log::*;
46use std::{
47 collections::BTreeMap,
48 sync::{
49 Arc,
50 atomic::{AtomicBool, Ordering},
51 },
52};
53use thiserror::Error;
54use tokio::sync::{Mutex, broadcast, mpsc};
55
/// Result of a client operation. The error side is always [`ButtplugClientError`]; the success
/// type defaults to `()`.
type ButtplugClientResult<T = ()> = Result<T, ButtplugClientError>;
/// Boxed, `'static` future that resolves to a [`ButtplugClientResult`].
type ButtplugClientResultFuture<T = ()> = BoxFuture<'static, ButtplugClientResult<T>>;

/// Outcome of a request to the server: either the server's reply message or a client error.
pub type ButtplugServerMessageResult = ButtplugClientResult<ButtplugServerMessageV4>;
/// Boxed future that resolves to a [`ButtplugServerMessageResult`].
pub type ButtplugServerMessageResultFuture = ButtplugClientResultFuture<ButtplugServerMessageV4>;
/// Sending half of the oneshot channel through which a [`ButtplugServerMessageResult`] is
/// delivered back to whoever issued the request.
pub(crate) type ButtplugServerMessageSender = oneshot::Sender<ButtplugServerMessageResult>;
69
/// An outgoing client message bundled with the oneshot sender on which its reply should be
/// delivered.
pub struct ButtplugClientMessageFuturePair {
  /// Message to be sent to the server.
  pub(crate) msg: ButtplugClientMessageV4,
  /// Reply channel for this message. Stored as an `Option`; [`Self::new`] always fills it with
  /// `Some` (presumably so the consumer can `take()` it when replying — confirm in the event
  /// loop / sorter code).
  pub(crate) sender: Option<ButtplugServerMessageSender>,
}
86
87impl ButtplugClientMessageFuturePair {
88 pub fn new(msg: ButtplugClientMessageV4, sender: ButtplugServerMessageSender) -> Self {
89 Self {
90 msg,
91 sender: Some(sender),
92 }
93 }
94}
95
/// Errors surfaced by the client API.
///
/// The first two variants are transparent wrappers (their message and source come from the
/// wrapped error) and can be produced with `?` / `.into()` thanks to `#[from]`.
#[derive(Debug, Error)]
pub enum ButtplugClientError {
  /// Failure reported by the connector layer (e.g. not connected, channel closed).
  #[error(transparent)]
  ButtplugConnectorError(#[from] ButtplugConnectorError),
  /// A Buttplug protocol-level error.
  #[error(transparent)]
  ButtplugError(#[from] ButtplugError),
  /// An output command could not be converted; the string carries the details.
  #[error("Error converting output command: {0}")]
  ButtplugOutputCommandConversionError(String),
  /// More than one input of the given type is available, so the caller must address a specific
  /// feature instead.
  #[error("Multiple inputs available for {0}, must use specific feature")]
  ButtplugMultipleInputAvailableError(InputType),
}
118
/// Events delivered to subscribers of the client's event stream.
///
/// NOTE(review): the code that emits these lives in the event loop, which is not visible from
/// this file; the per-variant descriptions below are inferred from the variant names and
/// payloads and should be confirmed there.
#[derive(Clone, Debug)]
pub enum ButtplugClientEvent {
  /// Presumably: a device scan has finished.
  ScanningFinished,
  /// Presumably: a device list from the server has been received and processed.
  DeviceListReceived,
  /// Presumably: a device became available; carries its client-side handle.
  DeviceAdded(ButtplugClientDevice),
  /// Presumably: a device went away; carries its client-side handle.
  DeviceRemoved(ButtplugClientDevice),
  /// Presumably: the server-side ping timer expired.
  PingTimeout,
  /// Presumably: a connection to the server was established.
  ServerConnect,
  /// Presumably: the connection to the server was closed.
  ServerDisconnect,
  /// An error, carried as a [`ButtplugError`].
  Error(ButtplugError),
}
149
// Explicitly marks events as `Unpin`, independent of whether the payload types are.
impl Unpin for ButtplugClientEvent {
}
152
153pub(crate) fn create_boxed_future_client_error<T>(
154 err: ButtplugError,
155) -> ButtplugClientResultFuture<T>
156where
157 T: 'static + Send + Sync,
158{
159 future::ready(Err(ButtplugClientError::ButtplugError(err))).boxed()
160}
161
/// Cloneable handle used to push requests onto the client event loop's request channel.
#[derive(Clone, Debug)]
pub(crate) struct ButtplugClientMessageSender {
  /// Sending half of the request channel consumed by the event loop.
  message_sender: mpsc::Sender<ButtplugClientRequest>,
  /// Shared connection flag; `send_message` refuses to send while this is `false`.
  connected: Arc<AtomicBool>,
}
167
168impl ButtplugClientMessageSender {
169 fn new(message_sender: mpsc::Sender<ButtplugClientRequest>, connected: &Arc<AtomicBool>) -> Self {
170 Self {
171 message_sender,
172 connected: connected.clone(),
173 }
174 }
175
176 pub fn send_message_to_event_loop(
180 &self,
181 msg: ButtplugClientRequest,
182 ) -> BoxFuture<'static, Result<(), ButtplugClientError>> {
183 let message_sender = self.message_sender.clone();
187 async move {
188 message_sender
189 .send(msg)
190 .await
191 .map_err(|_| ButtplugConnectorError::ConnectorChannelClosed)?;
192 Ok(())
193 }
194 .boxed()
195 }
196
197 pub fn send_message(&self, msg: ButtplugClientMessageV4) -> ButtplugServerMessageResultFuture {
198 if !self.connected.load(Ordering::Relaxed) {
199 future::ready(Err(ButtplugConnectorError::ConnectorNotConnected.into())).boxed()
200 } else {
201 self.send_message_ignore_connect_status(msg)
202 }
203 }
204
205 pub fn send_message_ignore_connect_status(
208 &self,
209 msg: ButtplugClientMessageV4,
210 ) -> ButtplugServerMessageResultFuture {
211 let (tx, rx) = oneshot::channel();
213 let internal_msg =
214 ButtplugClientRequest::Message(ButtplugClientMessageFuturePair::new(msg, tx));
215
216 let send_fut = self.send_message_to_event_loop(internal_msg);
218 async move {
219 send_fut.await?;
220 rx.await
221 .map_err(|_| ButtplugConnectorError::ConnectorChannelClosed)?
222 }
223 .boxed()
224 }
225
226 pub fn send_message_expect_ok(&self, msg: ButtplugClientMessageV4) -> ButtplugClientResultFuture {
229 let send_fut = self.send_message(msg);
230 async move { send_fut.await.map(|_| ()) }.boxed()
231 }
232}
233
/// Client-side handle for talking to a Buttplug server through a connector.
pub struct ButtplugClient {
  /// Name this client reports to the server in the `RequestServerInfo` handshake message.
  client_name: String,
  /// Name reported by the server during the handshake; `None` until a handshake has succeeded.
  server_name: Arc<Mutex<Option<String>>>,
  /// Broadcast sender for client events; `event_stream()` subscribes to it and a clone is
  /// handed to the event loop on connect.
  event_stream: broadcast::Sender<ButtplugClientEvent>,
  /// Handle used to queue requests for the event loop.
  message_sender: ButtplugClientMessageSender,
  /// Receiving half of the request channel. `connect` takes it out of the `Option` and moves
  /// it into the event loop, leaving `None` behind.
  request_receiver: Arc<Mutex<Option<mpsc::Receiver<ButtplugClientRequest>>>>,
  /// Connection flag, shared with `message_sender` and the event loop.
  connected: Arc<AtomicBool>,
  /// Devices keyed by `u32` (presumably the device index — confirm in the event loop); shared
  /// with the event loop.
  device_map: Arc<DashMap<u32, ButtplugClientDevice>>,
}
264
265impl ButtplugClient {
266 pub fn new(name: &str) -> Self {
267 let (request_sender, request_receiver) = mpsc::channel(256);
268 let (event_stream, _) = broadcast::channel(256);
269 let connected = Arc::new(AtomicBool::new(false));
270 Self {
271 client_name: name.to_owned(),
272 server_name: Arc::new(Mutex::new(None)),
273 event_stream,
274 message_sender: ButtplugClientMessageSender::new(request_sender, &connected),
275 request_receiver: Arc::new(Mutex::new(Some(request_receiver))),
276 connected,
277 device_map: Arc::new(DashMap::new()),
278 }
279 }
280
281 pub async fn connect<ConnectorType>(
282 &self,
283 mut connector: ConnectorType,
284 ) -> Result<(), ButtplugClientError>
285 where
286 ConnectorType: ButtplugConnector<ButtplugClientMessageV4, ButtplugServerMessageV4> + 'static,
287 {
288 if self.connected() {
289 return Err(ButtplugClientError::ButtplugConnectorError(
290 ButtplugConnectorError::ConnectorAlreadyConnected,
291 ));
292 }
293
294 self.device_map.clear();
296
297 let request_receiver = self.request_receiver.lock().await.take().ok_or(
300 ButtplugConnectorError::ConnectorGenericError(
301 "Cannot reconnect - request channel already consumed. Create a new client.".to_string(),
302 ),
303 )?;
304
305 info!("Connecting to server.");
306 let (connector_sender, connector_receiver) = mpsc::channel(256);
307 connector.connect(connector_sender).await.map_err(|e| {
308 error!("Connection to server failed: {:?}", e);
309 ButtplugClientError::from(e)
310 })?;
311 info!("Connection to server succeeded.");
312 let mut client_event_loop = ButtplugClientEventLoop::new(
313 self.connected.clone(),
314 connector,
315 connector_receiver,
316 self.event_stream.clone(),
317 self.message_sender.clone(),
318 request_receiver,
319 self.device_map.clone(),
320 );
321
322 buttplug_core::spawn!("ButtplugClient event loop", async move {
324 client_event_loop.run().await;
325 });
326 self.run_handshake().await
327 }
328
329 async fn run_handshake(&self) -> ButtplugClientResult {
336 info!("Running handshake with server.");
338 let msg = self
339 .message_sender
340 .send_message_ignore_connect_status(
341 RequestServerInfoV4::new(
342 &self.client_name,
343 BUTTPLUG_CURRENT_API_MAJOR_VERSION,
344 BUTTPLUG_CURRENT_API_MINOR_VERSION,
345 )
346 .into(),
347 )
348 .await?;
349
350 debug!("Got ServerInfo return.");
351 if let ButtplugServerMessageV4::ServerInfo(server_info) = msg {
352 info!("Connected to {}", server_info.server_name());
353 *self.server_name.lock().await = Some(server_info.server_name().clone());
354 self.connected.store(true, Ordering::Relaxed);
358
359 let msg = self
363 .message_sender
364 .send_message(RequestDeviceListV0::default().into())
365 .await?;
366 if let ButtplugServerMessageV4::DeviceList(m) = msg {
367 self
368 .message_sender
369 .send_message_to_event_loop(ButtplugClientRequest::HandleDeviceList(m))
370 .await?;
371 }
372 Ok(())
373 } else {
374 self.disconnect().await?;
375 Err(ButtplugClientError::ButtplugError(
376 ButtplugHandshakeError::UnexpectedHandshakeMessageReceived(format!("{msg:?}")).into(),
377 ))
378 }
379 }
380
381 pub fn connected(&self) -> bool {
383 self.connected.load(Ordering::Relaxed)
384 }
385
386 pub fn disconnect(&self) -> ButtplugClientResultFuture {
391 if !self.connected() {
392 return future::ready(Err(ButtplugConnectorError::ConnectorNotConnected.into())).boxed();
393 }
394 let (tx, rx) = oneshot::channel();
398 let msg = ButtplugClientRequest::Disconnect(tx);
399 let send_fut = self.message_sender.send_message_to_event_loop(msg);
400 let connected = self.connected.clone();
401 async move {
402 connected.store(false, Ordering::Relaxed);
403 send_fut.await?;
404 let _ = rx.await;
406 Ok(())
407 }
408 .boxed()
409 }
410
411 pub fn start_scanning(&self) -> ButtplugClientResultFuture {
416 self
417 .message_sender
418 .send_message_expect_ok(StartScanningV0::default().into())
419 }
420
421 pub fn stop_scanning(&self) -> ButtplugClientResultFuture {
426 self
427 .message_sender
428 .send_message_expect_ok(StopScanningV0::default().into())
429 }
430
431 pub fn stop_all_devices(&self) -> ButtplugClientResultFuture {
436 self
437 .message_sender
438 .send_message_expect_ok(StopCmdV4::default().into())
439 }
440
441 pub fn event_stream(&self) -> impl Stream<Item = ButtplugClientEvent> + use<> {
442 let stream = convert_broadcast_receiver_to_stream(self.event_stream.subscribe());
443 Box::pin(stream)
449 }
450
451 pub fn devices(&self) -> BTreeMap<u32, ButtplugClientDevice> {
453 self
454 .device_map
455 .iter()
456 .map(|map_pair| (*map_pair.key(), map_pair.value().clone()))
457 .collect()
458 }
459
460 pub fn ping(&self) -> ButtplugClientResultFuture {
461 let ping_fut = self
462 .message_sender
463 .send_message_expect_ok(PingV0::default().into());
464 ping_fut.boxed()
465 }
466
467 pub fn server_name(&self) -> Option<String> {
468 if let Ok(name) = self.server_name.try_lock() {
475 name.clone()
476 } else {
477 None
478 }
479 }
480}