Skip to main content

buttplug_client/
lib.rs

1// Buttplug Rust Source Code File - See https://buttplug.io for more info.
2//
3// Copyright 2016-2026 Nonpolynomial Labs LLC. All rights reserved.
4//
5// Licensed under the BSD 3-Clause license. See LICENSE file in the project root
6// for full license information.
7
8//! Communications API for accessing Buttplug Servers
9
10#[macro_use]
11extern crate log;
12
13pub mod client_event_loop;
14pub mod client_message_sorter;
15pub mod connector;
16pub mod device;
17pub mod serializer;
18
19use buttplug_core::{
20  connector::{ButtplugConnector, ButtplugConnectorError},
21  errors::{ButtplugError, ButtplugHandshakeError},
22  message::{
23    BUTTPLUG_CURRENT_API_MAJOR_VERSION,
24    BUTTPLUG_CURRENT_API_MINOR_VERSION,
25    ButtplugClientMessageV4,
26    ButtplugServerMessageV4,
27    InputType,
28    PingV0,
29    RequestDeviceListV0,
30    RequestServerInfoV4,
31    StartScanningV0,
32    StopCmdV4,
33    StopScanningV0,
34  },
35  util::stream::convert_broadcast_receiver_to_stream,
36};
37use client_event_loop::{ButtplugClientEventLoop, ButtplugClientRequest};
38use dashmap::DashMap;
39pub use device::{ButtplugClientDevice, ButtplugClientDeviceEvent};
40use futures::{
41  Stream,
42  channel::oneshot,
43  future::{self, BoxFuture, FutureExt},
44};
45use log::*;
46use std::{
47  collections::BTreeMap,
48  sync::{
49    Arc,
50    atomic::{AtomicBool, Ordering},
51  },
52};
53use strum_macros::Display;
54use thiserror::Error;
55use tokio::sync::{Mutex, broadcast, mpsc};
56
/// Result type used for public APIs.
///
/// Allows us to differentiate between an issue with the connector (as a
/// [ButtplugConnectorError]) and an issue within Buttplug (as a
/// [ButtplugError]). The success type defaults to `()`.
type ButtplugClientResult<T = ()> = Result<T, ButtplugClientError>;
/// Boxed, `'static` future that resolves to a `ButtplugClientResult<T>`. The
/// success type defaults to `()`.
type ButtplugClientResultFuture<T = ()> = BoxFuture<'static, ButtplugClientResult<T>>;

/// Result type used for passing server responses.
pub type ButtplugServerMessageResult = ButtplugClientResult<ButtplugServerMessageV4>;
/// Boxed future that resolves to a [ButtplugServerMessageResult].
pub type ButtplugServerMessageResultFuture = ButtplugClientResultFuture<ButtplugServerMessageV4>;
/// Sender type for resolving server message futures.
pub(crate) type ButtplugServerMessageSender = oneshot::Sender<ButtplugServerMessageResult>;
70
/// Future state for messages sent from the client that expect a server response.
///
/// When a message is sent from the client and expects a response from the server, we'd like to know
/// when that response arrives, and usually we'll want to wait for it. We can do so by creating a
/// future that will be resolved when a response is received from the server.
///
/// To do this, we create a oneshot channel, then pass the sender along with the message
/// we send to the connector, using the [ButtplugClientMessageFuturePair] type. We can then expect
/// the connector to get the response from the server, match it with our message (using something
/// like the ClientMessageSorter, an internal structure in the Buttplug library), and send the reply
/// via the sender. This will resolve the receiver future we're waiting on and allow us to
/// continue execution.
pub struct ButtplugClientMessageFuturePair {
  /// The client message to be sent to the server.
  pub(crate) msg: ButtplugClientMessageV4,
  /// Sending half of the oneshot channel on which the matching server
  /// response (or an error) is delivered. The constructor always stores
  /// `Some`.
  pub(crate) sender: Option<ButtplugServerMessageSender>,
}
87
88impl ButtplugClientMessageFuturePair {
89  pub fn new(msg: ButtplugClientMessageV4, sender: ButtplugServerMessageSender) -> Self {
90    Self {
91      msg,
92      sender: Some(sender),
93    }
94  }
95}
96
97/// Represents all of the different types of errors a ButtplugClient can return.
98///
99/// Clients can return two types of errors:
100///
101/// - [ButtplugConnectorError], which means there was a problem with the connection between the
102/// client and the server, like a network connection issue.
103/// - [ButtplugError], which is an error specific to the Buttplug Protocol.
104#[derive(Debug, Error, Display)]
105pub enum ButtplugClientError {
106  /// Connector error
107  #[error(transparent)]
108  ButtplugConnectorError(#[from] ButtplugConnectorError),
109  /// Protocol error
110  #[error(transparent)]
111  ButtplugError(#[from] ButtplugError),
112  /// Error converting output command: {}
113  ButtplugOutputCommandConversionError(String),
114  /// Multiple inputs available for {}, must use specific feature
115  ButtplugMultipleInputAvailableError(InputType),
116}
117
/// Enum representing different events that can be emitted by a client.
///
/// These events are created by the server and sent to the client, and represent
/// unrequested actions that the client will need to respond to, or that
/// applications using the client may be interested in.
#[derive(Clone, Debug)]
pub enum ButtplugClientEvent {
  /// Emitted when a scanning session (started via a StartScanning call on
  /// [ButtplugClient]) has finished.
  ScanningFinished,
  /// Emitted when the device list is received as a response to a
  /// RequestDeviceList message, which is sent during the handshake.
  DeviceListReceived,
  /// Emitted when a device has been added to the server. Includes a
  /// [ButtplugClientDevice] object representing the device.
  DeviceAdded(ButtplugClientDevice),
  /// Emitted when a device has been removed from the server. Includes a
  /// [ButtplugClientDevice] object representing the device.
  DeviceRemoved(ButtplugClientDevice),
  /// Emitted when a client has not pinged the server in a sufficient amount of
  /// time.
  PingTimeout,
  /// Emitted when the client successfully connects to a server.
  ServerConnect,
  /// Emitted when a client connector detects that the server has disconnected.
  ServerDisconnect,
  /// Emitted when an error that cannot be matched to a request is received from
  /// the server. Carries the unmatched [ButtplugError].
  Error(ButtplugError),
}
148
// Explicitly mark events as `Unpin` so they can be moved freely even after
// being pinned (e.g. when carried through pinned streams).
impl Unpin for ButtplugClientEvent {
}
151
152pub(crate) fn create_boxed_future_client_error<T>(
153  err: ButtplugError,
154) -> ButtplugClientResultFuture<T>
155where
156  T: 'static + Send + Sync,
157{
158  future::ready(Err(ButtplugClientError::ButtplugError(err))).boxed()
159}
160
/// Cloneable handle for submitting requests to the client's internal event
/// loop.
#[derive(Clone, Debug)]
pub(crate) struct ButtplugClientMessageSender {
  /// Channel on which requests are queued for the event loop.
  message_sender: mpsc::Sender<ButtplugClientRequest>,
  /// Shared connection flag; read before sending messages that require a
  /// connected client.
  connected: Arc<AtomicBool>,
}
166
167impl ButtplugClientMessageSender {
168  fn new(message_sender: mpsc::Sender<ButtplugClientRequest>, connected: &Arc<AtomicBool>) -> Self {
169    Self {
170      message_sender,
171      connected: connected.clone(),
172    }
173  }
174
175  /// Send message to the internal event loop.
176  ///
177  /// Mostly for handling boilerplate around possible send errors.
178  pub fn send_message_to_event_loop(
179    &self,
180    msg: ButtplugClientRequest,
181  ) -> BoxFuture<'static, Result<(), ButtplugClientError>> {
182    // If we're running the event loop, we should have a message_sender.
183    // Being connected to the server doesn't matter here yet because we use
184    // this function in order to connect also.
185    let message_sender = self.message_sender.clone();
186    async move {
187      message_sender
188        .send(msg)
189        .await
190        .map_err(|_| ButtplugConnectorError::ConnectorChannelClosed)?;
191      Ok(())
192    }
193    .boxed()
194  }
195
196  pub fn send_message(&self, msg: ButtplugClientMessageV4) -> ButtplugServerMessageResultFuture {
197    if !self.connected.load(Ordering::Relaxed) {
198      future::ready(Err(ButtplugConnectorError::ConnectorNotConnected.into())).boxed()
199    } else {
200      self.send_message_ignore_connect_status(msg)
201    }
202  }
203
204  /// Sends a ButtplugMessage from client to server. Expects to receive a ButtplugMessage back from
205  /// the server.
206  pub fn send_message_ignore_connect_status(
207    &self,
208    msg: ButtplugClientMessageV4,
209  ) -> ButtplugServerMessageResultFuture {
210    // Create a oneshot channel for receiving the response.
211    let (tx, rx) = oneshot::channel();
212    let internal_msg =
213      ButtplugClientRequest::Message(ButtplugClientMessageFuturePair::new(msg, tx));
214
215    // Send message to internal loop and wait for return.
216    let send_fut = self.send_message_to_event_loop(internal_msg);
217    async move {
218      send_fut.await?;
219      rx.await
220        .map_err(|_| ButtplugConnectorError::ConnectorChannelClosed)?
221    }
222    .boxed()
223  }
224
225  /// Sends a ButtplugMessage from client to server. Expects to receive an [Ok]
226  /// type ButtplugMessage back from the server.
227  pub fn send_message_expect_ok(&self, msg: ButtplugClientMessageV4) -> ButtplugClientResultFuture {
228    let send_fut = self.send_message(msg);
229    async move { send_fut.await.map(|_| ()) }.boxed()
230  }
231}
232
/// Struct used by applications to communicate with a Buttplug Server.
///
/// Buttplug Clients provide an API layer on top of the Buttplug Protocol that
/// handles boring things like message creation and pairing, protocol ordering,
/// etc... This allows developers to concentrate on controlling hardware with
/// the API.
///
/// Clients serve a few different purposes:
/// - Managing connections to servers, thru [ButtplugConnector]s
/// - Emitting events received from the Server
/// - Holding state related to the server (i.e. what devices are currently
///   connected, etc...)
///
/// Clients are created by the [ButtplugClient::new()] method. Connecting is a
/// separate step: [ButtplugClient::connect()] spins up the event loop and
/// runs the handshake with the server.
pub struct ButtplugClient {
  /// The client name. Depending on the connection type and server being used,
  /// this name is sometimes shown on the server logs or GUI.
  client_name: String,
  /// The server name that we're currently connected to. `None` until a
  /// handshake has stored it.
  server_name: Arc<Mutex<Option<String>>>,
  // Broadcast sender used to hand out event streams to subscribers
  event_stream: broadcast::Sender<ButtplugClientEvent>,
  // Sender to relay messages to the internal client loop
  message_sender: ButtplugClientMessageSender,
  // Receiver for client requests, taken on connect and given to event loop
  request_receiver: Arc<Mutex<Option<mpsc::Receiver<ButtplugClientRequest>>>>,
  // Shared connection flag, also held by message_sender and the event loop
  connected: Arc<AtomicBool>,
  // Devices currently known to the client, keyed by u32 (presumably the
  // device index - confirm against the event loop)
  device_map: Arc<DashMap<u32, ButtplugClientDevice>>,
}
263
264impl ButtplugClient {
265  pub fn new(name: &str) -> Self {
266    let (request_sender, request_receiver) = mpsc::channel(256);
267    let (event_stream, _) = broadcast::channel(256);
268    let connected = Arc::new(AtomicBool::new(false));
269    Self {
270      client_name: name.to_owned(),
271      server_name: Arc::new(Mutex::new(None)),
272      event_stream,
273      message_sender: ButtplugClientMessageSender::new(request_sender, &connected),
274      request_receiver: Arc::new(Mutex::new(Some(request_receiver))),
275      connected,
276      device_map: Arc::new(DashMap::new()),
277    }
278  }
279
280  pub async fn connect<ConnectorType>(
281    &self,
282    mut connector: ConnectorType,
283  ) -> Result<(), ButtplugClientError>
284  where
285    ConnectorType: ButtplugConnector<ButtplugClientMessageV4, ButtplugServerMessageV4> + 'static,
286  {
287    if self.connected() {
288      return Err(ButtplugClientError::ButtplugConnectorError(
289        ButtplugConnectorError::ConnectorAlreadyConnected,
290      ));
291    }
292
293    // If connect is being called again, clear out the device map and start over.
294    self.device_map.clear();
295
296    // Take the request receiver - if None, a previous connection consumed it and we can't reconnect
297    // without creating a new client (the sender is tied to this receiver)
298    let request_receiver = self.request_receiver.lock().await.take().ok_or(
299      ButtplugConnectorError::ConnectorGenericError(
300        "Cannot reconnect - request channel already consumed. Create a new client.".to_string(),
301      ),
302    )?;
303
304    info!("Connecting to server.");
305    let (connector_sender, connector_receiver) = mpsc::channel(256);
306    connector.connect(connector_sender).await.map_err(|e| {
307      error!("Connection to server failed: {:?}", e);
308      ButtplugClientError::from(e)
309    })?;
310    info!("Connection to server succeeded.");
311    let mut client_event_loop = ButtplugClientEventLoop::new(
312      self.connected.clone(),
313      connector,
314      connector_receiver,
315      self.event_stream.clone(),
316      self.message_sender.clone(),
317      request_receiver,
318      self.device_map.clone(),
319    );
320
321    // Start the event loop before we run the handshake.
322    buttplug_core::spawn!("ButtplugClient event loop", async move {
323      client_event_loop.run().await;
324    });
325    self.run_handshake().await
326  }
327
328  /// Creates the ButtplugClient instance and tries to establish a connection.
329  ///
330  /// Takes all of the components needed to build a [ButtplugClient], creates
331  /// the struct, then tries to run connect and execute the Buttplug protocol
332  /// handshake. Will return a connected and ready to use ButtplugClient is all
333  /// goes well.
334  async fn run_handshake(&self) -> ButtplugClientResult {
335    // Run our handshake
336    info!("Running handshake with server.");
337    let msg = self
338      .message_sender
339      .send_message_ignore_connect_status(
340        RequestServerInfoV4::new(
341          &self.client_name,
342          BUTTPLUG_CURRENT_API_MAJOR_VERSION,
343          BUTTPLUG_CURRENT_API_MINOR_VERSION,
344        )
345        .into(),
346      )
347      .await?;
348
349    debug!("Got ServerInfo return.");
350    if let ButtplugServerMessageV4::ServerInfo(server_info) = msg {
351      info!("Connected to {}", server_info.server_name());
352      *self.server_name.lock().await = Some(server_info.server_name().clone());
353      // Don't set ourselves as connected until after ServerInfo has been
354      // received. This means we avoid possible races with the RequestServerInfo
355      // handshake.
356      self.connected.store(true, Ordering::Relaxed);
357
358      // Get currently connected devices. The event loop will
359      // handle sending the message and getting the return, and
360      // will send the client updates as events.
361      let msg = self
362        .message_sender
363        .send_message(RequestDeviceListV0::default().into())
364        .await?;
365      if let ButtplugServerMessageV4::DeviceList(m) = msg {
366        self
367          .message_sender
368          .send_message_to_event_loop(ButtplugClientRequest::HandleDeviceList(m))
369          .await?;
370      }
371      Ok(())
372    } else {
373      self.disconnect().await?;
374      Err(ButtplugClientError::ButtplugError(
375        ButtplugHandshakeError::UnexpectedHandshakeMessageReceived(format!("{msg:?}")).into(),
376      ))
377    }
378  }
379
380  /// Returns true if client is currently connected.
381  pub fn connected(&self) -> bool {
382    self.connected.load(Ordering::Relaxed)
383  }
384
385  /// Disconnects from server, if connected.
386  ///
387  /// Returns Err(ButtplugClientError) if disconnection fails. It can be assumed
388  /// that even on failure, the client will be disconnected.
389  pub fn disconnect(&self) -> ButtplugClientResultFuture {
390    if !self.connected() {
391      return future::ready(Err(ButtplugConnectorError::ConnectorNotConnected.into())).boxed();
392    }
393    // Send the connector to the internal loop for management. Once we throw
394    // the connector over, the internal loop will handle connecting and any
395    // further communications with the server, if connection is successful.
396    let (tx, rx) = oneshot::channel();
397    let msg = ButtplugClientRequest::Disconnect(tx);
398    let send_fut = self.message_sender.send_message_to_event_loop(msg);
399    let connected = self.connected.clone();
400    async move {
401      connected.store(false, Ordering::Relaxed);
402      send_fut.await?;
403      // Wait for disconnect to complete, but don't fail if channel closed
404      let _ = rx.await;
405      Ok(())
406    }
407    .boxed()
408  }
409
410  /// Tells server to start scanning for devices.
411  ///
412  /// Returns Err([ButtplugClientError]) if request fails due to issues with
413  /// DeviceManagers on the server, disconnection, etc.
414  pub fn start_scanning(&self) -> ButtplugClientResultFuture {
415    self
416      .message_sender
417      .send_message_expect_ok(StartScanningV0::default().into())
418  }
419
420  /// Tells server to stop scanning for devices.
421  ///
422  /// Returns Err([ButtplugClientError]) if request fails due to issues with
423  /// DeviceManagers on the server, disconnection, etc.
424  pub fn stop_scanning(&self) -> ButtplugClientResultFuture {
425    self
426      .message_sender
427      .send_message_expect_ok(StopScanningV0::default().into())
428  }
429
430  /// Tells server to stop all devices.
431  ///
432  /// Returns Err([ButtplugClientError]) if request fails due to issues with
433  /// DeviceManagers on the server, disconnection, etc.
434  pub fn stop_all_devices(&self) -> ButtplugClientResultFuture {
435    self
436      .message_sender
437      .send_message_expect_ok(StopCmdV4::default().into())
438  }
439
440  pub fn event_stream(&self) -> impl Stream<Item = ButtplugClientEvent> + use<> {
441    let stream = convert_broadcast_receiver_to_stream(self.event_stream.subscribe());
442    // We can either Box::pin here or force the user to pin_mut!() on their
443    // end. While this does end up with a dynamic dispatch on our end, it
444    // still makes the API nicer for the user, so we'll just eat the perf hit.
445    // Not to mention, this is not a high throughput system really, so it
446    // shouldn't matter.
447    Box::pin(stream)
448  }
449
450  /// Retreives a list of currently connected devices.
451  pub fn devices(&self) -> BTreeMap<u32, ButtplugClientDevice> {
452    self
453      .device_map
454      .iter()
455      .map(|map_pair| (*map_pair.key(), map_pair.value().clone()))
456      .collect()
457  }
458
459  pub fn ping(&self) -> ButtplugClientResultFuture {
460    let ping_fut = self
461      .message_sender
462      .send_message_expect_ok(PingV0::default().into());
463    ping_fut.boxed()
464  }
465
466  pub fn server_name(&self) -> Option<String> {
467    // We'd have to be calling server_name in an extremely tight, asynchronous
468    // loop for this to return None, so we'll treat this as lockless.
469    //
470    // Dear users actually reading this code: This is not an invitation for you
471    // to get the server name in a tight, asynchronous loop. This will never
472    // change throughout the life to the connection.
473    if let Ok(name) = self.server_name.try_lock() {
474      name.clone()
475    } else {
476      None
477    }
478  }
479}