1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
// Buttplug Rust Source Code File - See https://buttplug.io for more info.

//

// Copyright 2016-2019 Nonpolynomial Labs LLC. All rights reserved.

//

// Licensed under the BSD 3-Clause license. See LICENSE file in the project root

// for full license information.


//! Communications API for accessing Buttplug Servers

mod client_message_sorter;
pub mod device;
pub mod internal;

use device::ButtplugClientDevice;
use internal::{client_event_loop, ButtplugClientDeviceInternal, ButtplugClientRequest};

use crate::{
  connector::{ButtplugConnector, ButtplugConnectorError, ButtplugConnectorFuture},
  core::{
    errors::{ButtplugError, ButtplugHandshakeError},
    messages::{
      ButtplugCurrentSpecClientMessage,
      ButtplugCurrentSpecServerMessage,
      ButtplugMessageSpecVersion,
      DeviceMessageInfo,
      LogLevel,
      RequestDeviceList,
      RequestServerInfo,
      StartScanning,
      StopScanning,
    },
  },
  util::{
    async_manager,
    future::{ButtplugFuture, ButtplugFutureStateShared},
  },
};
use async_channel::Sender;
use dashmap::DashMap;
use futures::{
  future::{self, BoxFuture, Future},
  FutureExt,
  StreamExt,
};
use std::sync::{
  atomic::{AtomicBool, Ordering},
  Arc,
};
use thiserror::Error;
use tracing::{span::Span, Level};
use tracing_futures::Instrument;

/// Result type used inside the client module.
///
/// When communicating inside the client module, we'll usually only receive
/// errors related to the connector. Buttplug
/// [Error][crate::core::messages::Error] messages will still be valid, because
/// they're coming from the server.
// TODO This is no longer used since we return errors at the point of
// deserialization. Remove.
// NOTE(review): the alias is still referenced by
// `ButtplugInternalClientMessageResult` in this file, so that alias has to be
// migrated before this one can be deleted.
type ButtplugInternalClientResult<T = ()> = Result<T, ButtplugClientError>;
/// Result type used for public APIs.
///
/// The error side is a [ButtplugClientError], which lets callers tell an issue
/// with the connector (a [ButtplugConnectorError]) apart from an issue within
/// Buttplug itself (a [ButtplugError]).
type ButtplugClientResult<T = ()> = Result<T, ButtplugClientError>;
/// Boxed, `'static` future that resolves to a [ButtplugClientResult].
type ButtplugClientResultFuture<T = ()> = BoxFuture<'static, ButtplugClientResult<T>>;

/// Result type used for passing server responses: either the server's message
/// or a [ButtplugClientError].
pub type ButtplugInternalClientMessageResult =
  ButtplugInternalClientResult<ButtplugCurrentSpecServerMessage>;
/// Boxed, `'static` future that resolves to a
/// [ButtplugInternalClientMessageResult].
pub type ButtplugInternalClientMessageResultFuture =
  BoxFuture<'static, ButtplugInternalClientMessageResult>;
/// Future state type for returning server responses across futures.
pub(crate) type ButtplugClientMessageStateShared =
  ButtplugFutureStateShared<ButtplugInternalClientMessageResult>;
/// Future type that expects server responses.
pub(crate) type ButtplugClientMessageFuture = ButtplugFuture<ButtplugInternalClientMessageResult>;

/// Future state for messages sent from the client that expect a server
/// response.
///
/// When a message is sent from the client and expects a response from the
/// server, we'd like to know when that response arrives, and usually we'll want
/// to wait for it. We can do so by creating a future that will be resolved when
/// a response is received from the server.
///
/// To do this, we build a [ButtplugFuture], then take its shared state (the
/// "waker") and pass it along with the message we send to the connector, using
/// the [ButtplugClientMessageFuturePair] type. We can then expect the connector
/// to get the response from the server, match it with our message (using
/// something like the
/// [ClientConnectorMessageSorter][crate::connector::ClientConnectorMessageSorter]),
/// and set the reply in the waker we've sent along. This will resolve the
/// future we're waiting on and allow us to continue execution.
pub struct ButtplugClientMessageFuturePair {
  /// The client message to be sent to the server.
  pub msg: ButtplugCurrentSpecClientMessage,
  /// Shared state of the future awaiting the reply; setting the reply on it
  /// resolves that future.
  pub waker: ButtplugClientMessageStateShared,
}

impl ButtplugClientMessageFuturePair {
  pub fn new(
    msg: ButtplugCurrentSpecClientMessage,
    waker: ButtplugClientMessageStateShared,
  ) -> Self {
    Self { msg, waker }
  }
}

/// Represents all of the different types of errors a ButtplugClient can return.
///
/// Clients can return two types of errors:
///
/// - [ButtplugConnectorError], which means there was a problem with the
///   connection between the client and the server, like a network connection
///   issue.
/// - [ButtplugError], which is an error specific to the Buttplug Protocol.
///
/// Both variants are `transparent`: their `Display` output is forwarded from
/// the wrapped error, and `From` conversions are generated for each so `?` can
/// be used on either error type.
#[derive(Debug, Error)]
pub enum ButtplugClientError {
  /// Connector error
  #[error(transparent)]
  ButtplugConnectorError(#[from] ButtplugConnectorError),
  /// Protocol error
  #[error(transparent)]
  ButtplugError(#[from] ButtplugError),
}

/// Enum representing different events that can be emitted by a client.
///
/// These events are created by the server and sent to the client, and represent
/// unrequested actions that the client will need to respond to, or that
/// applications using the client may be interested in.
#[derive(Clone)]
pub enum ButtplugClientEvent {
  /// Emitted when a scanning session (started via a StartScanning call on
  /// [ButtplugClient]) has finished.
  ScanningFinished,
  /// Emitted when a device has been added to the server. Includes a
  /// [ButtplugClientDevice] object representing the device.
  DeviceAdded(ButtplugClientDevice),
  /// Emitted when a device has been removed from the server. Carries the
  /// [DeviceMessageInfo] for that device (not a [ButtplugClientDevice]).
  DeviceRemoved(DeviceMessageInfo),
  /// Emitted when log messages are sent from the server. Carries the
  /// [LogLevel] and a string (presumably the log text).
  Log(LogLevel, String),
  /// Emitted when a client has not pinged the server in a sufficient amount
  /// of time.
  PingTimeout,
  /// Emitted when a client connector detects that the server has
  /// disconnected.
  ServerDisconnect,
  /// Carries a [ButtplugError].
  // NOTE(review): presumably emitted for errors that are not tied to a pending
  // request; confirm against the internal event loop.
  Error(ButtplugError),
}

/// Struct used by applications to communicate with a Buttplug Server.
///
/// Buttplug Clients provide an API layer on top of the Buttplug Protocol that
/// handles boring things like message creation and pairing, protocol ordering,
/// etc... This allows developers to concentrate on controlling hardware with
/// the API.
///
/// Clients serve a few different purposes:
/// - Managing connections to servers, through [ButtplugConnector]s
/// - Emitting events received from the Server
/// - Holding state related to the server (i.e. what devices are currently
///   connected, etc...)
///
/// Clients are created by the [ButtplugClient::connect()] method, which also
/// handles spinning up the event loop and connecting the client to the server.
/// On success it returns the client together with a stream of
/// [ButtplugClientEvent]s.
pub struct ButtplugClient {
  /// The client name. Depending on the connection type and server being used,
  /// this name is sometimes shown on the server logs or GUI.
  pub client_name: String,
  /// The name of the server that we're currently connected to. Filled in from
  /// the server's handshake reply.
  pub server_name: String,
  // Sender to relay messages to the internal client loop
  message_sender: Sender<ButtplugClientRequest>,
  // True if the connector is currently connected, and handshake was
  // successful.
  connected: Arc<AtomicBool>,
  // Tracing span created while connecting; owned by the client so it lives as
  // long as the client does. Not read anywhere in this file.
  _client_span: Span,
  // Device map handed out by the internal event loop; `devices()` reads it to
  // build the public device list. Keyed by `u32` (presumably the device index).
  device_map: Arc<DashMap<u32, ButtplugClientDeviceInternal>>,
}

// NOTE(review): manual `unsafe impl`s override the compiler's auto-trait
// analysis. No safety argument is recorded here, so this is only sound if
// every field of `ButtplugClient` really is safe to move between threads.
// TODO confirm, or drop the impl if the auto trait already applies.
unsafe impl Send for ButtplugClient {
}
// Not actually sure this should be sync, but trying to call handshake breaks
// without it.
// NOTE(review): as with `Send`, soundness depends on every field tolerating
// shared access from multiple threads; this has not been justified here.
unsafe impl Sync for ButtplugClient {
}

impl ButtplugClient {
  pub fn connect<ConnectorType>(
    name: &str,
    mut connector: ConnectorType,
  ) -> BoxFuture<
    'static,
    Result<(Self, impl StreamExt<Item = ButtplugClientEvent>), ButtplugClientError>,
  >
  where
    ConnectorType: ButtplugConnector<ButtplugCurrentSpecClientMessage, ButtplugCurrentSpecServerMessage>
      + 'static,
  {
    trace!("run() called, creating client future.");
    let client_name = name.to_string();
    Box::pin(async move {
      let span = span!(Level::INFO, "Client");
      let _client_span = span.enter();
      info!("Connecting to server.");
      let connector_receiver = connector.connect().await.map_err(|e| {
        error!("Connection to server failed: {:?}", e);
        let err: ButtplugClientError = e.into();
        err
      })?;
      info!("Connection to server succeeded.");
      let (client_event_loop_fut, device_map_reader, message_sender, event_channel) =
        client_event_loop(connector, connector_receiver);

      let client_event_receiver = event_channel.clone();
      let mut disconnect_event_receiver = event_channel.clone();
      let connected_status = Arc::new(AtomicBool::new(true));
      let connected_status_clone = connected_status.clone();

      // Start the event loop before we run the handshake.

      async_manager::spawn(
        async move {
          let disconnect_fut = async move {
            loop {
              if let Some(ButtplugClientEvent::ServerDisconnect) =
                disconnect_event_receiver.next().await
              {
                connected_status.store(false, Ordering::SeqCst);
                break;
              }
            }
            Result::<(), ButtplugClientError>::Ok(())
          }
          .instrument(tracing::info_span!("Client Disconnect Loop"));
          // If we disconnect, we'll also stop the client event loop. If the

          // client event loop stops, we don't care about listening for disconnect

          // anymore.

          select! {
            _ = client_event_loop_fut.fuse() => (),
            _ = disconnect_fut.fuse() => (),
          };
        }
        .instrument(tracing::info_span!("Client Loop Span")),
      )
      .unwrap();
      let client = ButtplugClient::create_client(
        &client_name,
        connected_status_clone,
        message_sender,
        device_map_reader,
        span.clone(),
      )
      .await?;
      Ok((client, client_event_receiver))
    })
  }

  /// Convenience constructor that connects through an in-process server.
  ///
  /// Builds an in-process connector named "Default In Process Server" with the
  /// given maximum ping time, registers every device communication manager
  /// that was compiled in and is supported on the current platform, and then
  /// hands the connector to [ButtplugClient::connect()]. The `name` parameter
  /// and the return value are the same as for `connect()`.
  ///
  /// # When To Use This Instead of `connect()`
  ///
  /// If you just want to build a quick example and save yourself a few use
  /// statements and setup, this will get you going. For anything *production*,
  /// we recommend building your own connector and using `connect()`, as you
  /// will have more control over what happens. This method may gain/lose
  /// device comm managers at any time.
  ///
  /// # The Device I Want To Use Doesn't Show Up
  ///
  /// If you are trying to use this method to create your client, and do not see
  /// the devices you want, there are a couple of things to check:
  ///
  /// - Are you on a platform that the device communication manager supports?
  ///   For instance, we only support XInput on windows.
  /// - Did the developers add a new Device CommunicationManager type and forget
  ///   to add it to this method? _It's more likely than you think!_ [File a
  ///   bug](https://github.com/buttplugio/buttplug-rs/issues).
  ///
  /// # Errors
  ///
  /// The returned future fails with whatever error `connect()` reports.
  ///
  /// If the library is using outside device managers, it is recommended to
  /// build your own connector, add your device manager to it, and pass it to
  /// `connect()`.
  ///
  /// # Panics
  ///
  /// Panics if registering one of the built-in communication managers fails.
  // NOTE(review): earlier docs said a build without any device managers yields
  // an error; nothing in this function checks for that. Confirm where (or
  // whether) that check happens.
  #[cfg(feature = "server")]
  pub fn connect_in_process(
    name: &str,
    max_ping_time: u64,
  ) -> impl Future<
    Output = Result<(Self, impl StreamExt<Item = ButtplugClientEvent>), ButtplugClientError>,
  > {
    use crate::connector::ButtplugInProcessClientConnector;

    let in_process_connector =
      ButtplugInProcessClientConnector::new("Default In Process Server", max_ping_time);

    // Bluetooth LE manager, when compiled in.
    #[cfg(feature = "btleplug-manager")]
    {
      use crate::server::comm_managers::btleplug::BtlePlugCommunicationManager;
      let server = in_process_connector.server_ref();
      server
        .add_comm_manager::<BtlePlugCommunicationManager>()
        .unwrap();
    }

    // XInput manager, Windows only.
    #[cfg(all(feature = "xinput-manager", target_os = "windows"))]
    {
      use crate::server::comm_managers::xinput::XInputDeviceCommunicationManager;
      let server = in_process_connector.server_ref();
      server
        .add_comm_manager::<XInputDeviceCommunicationManager>()
        .unwrap();
    }

    Self::connect(name, in_process_connector)
  }

  /// Creates the ButtplugClient instance and tries to establish a connection.

  ///

  /// Takes all of the components needed to build a [ButtplugClient], creates

  /// the struct, then tries to run connect and execute the Buttplug protocol

  /// handshake. Will return a connected and ready to use ButtplugClient is all

  /// goes well.

  async fn create_client(
    client_name: &str,
    connected_status: Arc<AtomicBool>,
    message_sender: Sender<ButtplugClientRequest>,
    device_map: Arc<DashMap<u32, ButtplugClientDeviceInternal>>,
    span: Span,
  ) -> Result<Self, ButtplugClientError> {
    // Create the client

    let mut client = ButtplugClient {
      client_name: client_name.to_string(),
      server_name: String::new(),
      message_sender,
      // Since we'll have already connected and initialized by the time we hand

      // this to the client function, we can go ahead and declare that we're

      // connected here. If that's not true, we won't even execute the client

      // function.

      connected: connected_status,
      device_map,
      _client_span: span,
    };

    // Run our handshake

    info!("Running handshake with server.");
    let msg = client
      .send_message(
        RequestServerInfo::new(&client.client_name, ButtplugMessageSpecVersion::Version2).into(),
      )
      .await?;

    debug!("Got ServerInfo return.");
    if let ButtplugCurrentSpecServerMessage::ServerInfo(server_info) = msg {
      info!("Connected to {}", server_info.server_name);
      client.server_name = server_info.server_name;
      // TODO Handle ping time in the internal event loop


      // Get currently connected devices. The event loop will

      // handle sending the message and getting the return, and

      // will send the client updates as events.

      let msg = client
        .send_message(RequestDeviceList::default().into())
        .await?;
      if let ButtplugCurrentSpecServerMessage::DeviceList(m) = msg {
        client
          .send_internal_message(ButtplugClientRequest::HandleDeviceList(m))
          .await?;
      }
      Ok(client)
    } else {
      client.disconnect().await?;
      Err(ButtplugClientError::ButtplugError(
        ButtplugHandshakeError::UnexpectedHandshakeMessageReceived(format!("{:?}", msg)).into(),
      ))
    }
  }

  /// Returns true if client is currently connected.

  pub fn connected(&self) -> bool {
    self.connected.load(Ordering::SeqCst)
  }

  /// Disconnects from server, if connected.

  ///

  /// Returns Err(ButtplugClientError) if disconnection fails. It can be assumed

  /// that even on failure, the client will be disconnected.

  pub fn disconnect(&self) -> ButtplugClientResultFuture {
    // Send the connector to the internal loop for management. Once we throw

    // the connector over, the internal loop will handle connecting and any

    // further communications with the server, if connection is successful.

    let fut = ButtplugConnectorFuture::default();
    let msg = ButtplugClientRequest::Disconnect(fut.get_state_clone());
    let send_fut = self.send_internal_message(msg);
    let connected = self.connected.clone();
    Box::pin(async move {
      send_fut.await?;
      connected.store(false, Ordering::SeqCst);
      Ok(())
    })
  }

  /// Tells server to start scanning for devices.

  ///

  /// Returns Err([ButtplugClientError]) if request fails due to issues with

  /// DeviceManagers on the server, disconnection, etc.

  pub fn start_scanning(&self) -> ButtplugClientResultFuture {
    self.send_message_expect_ok(StartScanning::default().into())
  }

  /// Tells server to stop scanning for devices.

  ///

  /// Returns Err([ButtplugClientError]) if request fails due to issues with

  /// DeviceManagers on the server, disconnection, etc.

  pub fn stop_scanning(&self) -> ButtplugClientResultFuture {
    self.send_message_expect_ok(StopScanning::default().into())
  }

  /// Hands a request to the internal event loop.
  ///
  /// Mostly exists to keep the boilerplate around send failures in one place.
  ///
  /// The `connected` flag is checked immediately, when this method is called:
  /// if the client is not connected the returned future resolves straight to
  /// `ConnectorNotConnected`. Otherwise the request is pushed onto the loop's
  /// channel, and a closed channel is reported as `ConnectorChannelClosed`.
  fn send_internal_message(
    &self,
    msg: ButtplugClientRequest,
  ) -> BoxFuture<'static, Result<(), ButtplugConnectorError>> {
    let is_connected = self.connected.load(Ordering::SeqCst);
    if !is_connected {
      let not_connected = Err(ButtplugConnectorError::ConnectorNotConnected);
      return Box::pin(future::ready(not_connected));
    }
    // Clone the sender so the returned future owns everything it needs and
    // does not borrow from `self`.
    let request_sender = self.message_sender.clone();
    Box::pin(async move {
      request_sender
        .send(msg)
        .await
        .map_err(|_| ButtplugConnectorError::ConnectorChannelClosed)
    })
  }

  /// Sends a ButtplugMessage from client to server. Expects to receive a

  /// ButtplugMessage back from the server.

  fn send_message(
    &self,
    msg: ButtplugCurrentSpecClientMessage,
  ) -> ButtplugInternalClientMessageResultFuture {
    // Create a future to pair with the message being resolved.

    let fut = ButtplugClientMessageFuture::default();
    let internal_msg = ButtplugClientRequest::Message(ButtplugClientMessageFuturePair::new(
      msg,
      fut.get_state_clone(),
    ));

    // Send message to internal loop and wait for return.

    let send_fut = self.send_internal_message(internal_msg);
    Box::pin(async move {
      send_fut.await?;
      fut.await
    })
  }

  /// Sends a ButtplugMessage from client to server for requests where only
  /// success or failure matters.
  ///
  /// The reply message is discarded. Any successfully received reply counts as
  /// success; the reply's variant is not inspected here.
  fn send_message_expect_ok(
    &self,
    msg: ButtplugCurrentSpecClientMessage,
  ) -> ButtplugClientResultFuture {
    let reply_fut = self.send_message(msg);
    Box::pin(async move {
      let _reply = reply_fut.await?;
      Ok(())
    })
  }

  /// Retrieves a list of currently connected devices.
  ///
  /// Walks the shared device map and builds a fresh [ButtplugClientDevice] for
  /// every entry, giving each one a clone of the client's message sender and
  /// of the entry's channel. The call is synchronous.
  pub fn devices(&self) -> Vec<ButtplugClientDevice> {
    info!("Request devices from inner loop!");
    self
      .device_map
      .iter()
      .map(|entry| {
        ButtplugClientDevice::from((
          &(*entry.device),
          self.message_sender.clone(),
          (*entry.channel).clone(),
        ))
      })
      .collect()
  }
}