Skip to main content

buttplug_client/
lib.rs

1// Buttplug Rust Source Code File - See https://buttplug.io for more info.
2//
3// Copyright 2016-2026 Nonpolynomial Labs LLC. All rights reserved.
4//
5// Licensed under the BSD 3-Clause license. See LICENSE file in the project root
6// for full license information.
7
8//! Communications API for accessing Buttplug Servers
9
10#[macro_use]
11extern crate log;
12
13pub mod client_event_loop;
14pub mod client_message_sorter;
15pub mod connector;
16pub mod device;
17pub mod serializer;
18
19use buttplug_core::{
20  connector::{ButtplugConnector, ButtplugConnectorError},
21  errors::{ButtplugError, ButtplugHandshakeError},
22  message::{
23    BUTTPLUG_CURRENT_API_MAJOR_VERSION,
24    BUTTPLUG_CURRENT_API_MINOR_VERSION,
25    ButtplugClientMessageV4,
26    ButtplugServerMessageV4,
27    InputType,
28    PingV0,
29    RequestDeviceListV0,
30    RequestServerInfoV4,
31    StartScanningV0,
32    StopCmdV4,
33    StopScanningV0,
34  },
35  util::stream::convert_broadcast_receiver_to_stream,
36};
37use client_event_loop::{ButtplugClientEventLoop, ButtplugClientRequest};
38use dashmap::DashMap;
39pub use device::{ButtplugClientDevice, ButtplugClientDeviceEvent};
40use futures::{
41  Stream,
42  channel::oneshot,
43  future::{self, BoxFuture, FutureExt},
44};
45use log::*;
46use std::{
47  collections::BTreeMap,
48  sync::{
49    Arc,
50    atomic::{AtomicBool, Ordering},
51  },
52};
53use thiserror::Error;
54use tokio::sync::{Mutex, broadcast, mpsc};
55
56/// Result type used for public APIs.
57///
58/// Allows us to differentiate between an issue with the connector (as a
59/// [ButtplugConnectorError]) and an issue within Buttplug (as a
60/// [ButtplugError]).
61type ButtplugClientResult<T = ()> = Result<T, ButtplugClientError>;
62type ButtplugClientResultFuture<T = ()> = BoxFuture<'static, ButtplugClientResult<T>>;
63
64/// Result type used for passing server responses.
65pub type ButtplugServerMessageResult = ButtplugClientResult<ButtplugServerMessageV4>;
66pub type ButtplugServerMessageResultFuture = ButtplugClientResultFuture<ButtplugServerMessageV4>;
67/// Sender type for resolving server message futures.
68pub(crate) type ButtplugServerMessageSender = oneshot::Sender<ButtplugServerMessageResult>;
69
70/// Future state for messages sent from the client that expect a server response.
71///
72/// When a message is sent from the client and expects a response from the server, we'd like to know
73/// when that response arrives, and usually we'll want to wait for it. We can do so by creating a
74/// future that will be resolved when a response is received from the server.
75///
76/// To do this, we create a oneshot channel, then pass the sender along with the message
77/// we send to the connector, using the [ButtplugClientMessageFuturePair] type. We can then expect
78/// the connector to get the response from the server, match it with our message (using something
79/// like the ClientMessageSorter, an internal structure in the Buttplug library), and send the reply
80/// via the sender. This will resolve the receiver future we're waiting on and allow us to
81/// continue execution.
82pub struct ButtplugClientMessageFuturePair {
83  pub(crate) msg: ButtplugClientMessageV4,
84  pub(crate) sender: Option<ButtplugServerMessageSender>,
85}
86
87impl ButtplugClientMessageFuturePair {
88  pub fn new(msg: ButtplugClientMessageV4, sender: ButtplugServerMessageSender) -> Self {
89    Self {
90      msg,
91      sender: Some(sender),
92    }
93  }
94}
95
96/// Represents all of the different types of errors a ButtplugClient can return.
97///
98/// Clients can return two types of errors:
99///
100/// - [ButtplugConnectorError], which means there was a problem with the connection between the
101///   client and the server, like a network connection issue.
102/// - [ButtplugError], which is an error specific to the Buttplug Protocol.
103#[derive(Debug, Error)]
104pub enum ButtplugClientError {
105  /// Connector error.
106  #[error(transparent)]
107  ButtplugConnectorError(#[from] ButtplugConnectorError),
108  /// Protocol error.
109  #[error(transparent)]
110  ButtplugError(#[from] ButtplugError),
111  /// Error converting output command.
112  #[error("Error converting output command: {0}")]
113  ButtplugOutputCommandConversionError(String),
114  /// Multiple inputs are available, so a specific feature must be used.
115  #[error("Multiple inputs available for {0}, must use specific feature")]
116  ButtplugMultipleInputAvailableError(InputType),
117}
118
119/// Enum representing different events that can be emitted by a client.
120///
121/// These events are created by the server and sent to the client, and represent
122/// unrequested actions that the client will need to respond to, or that
123/// applications using the client may be interested in.
124#[derive(Clone, Debug)]
125pub enum ButtplugClientEvent {
126  /// Emitted when a scanning session (started via a StartScanning call on
127  /// [ButtplugClient]) has finished.
128  ScanningFinished,
129  /// Emitted when the device list is received as a response to a
130  /// DeviceListRequest call, which is sent during the handshake.
131  DeviceListReceived,
132  /// Emitted when a device has been added to the server. Includes a
133  /// [ButtplugClientDevice] object representing the device.
134  DeviceAdded(ButtplugClientDevice),
135  /// Emitted when a device has been removed from the server. Includes a
136  /// [ButtplugClientDevice] object representing the device.
137  DeviceRemoved(ButtplugClientDevice),
138  /// Emitted when a client has not pinged the server in a sufficient amount of
139  /// time.
140  PingTimeout,
141  /// Emitted when the client successfully connects to a server.
142  ServerConnect,
143  /// Emitted when a client connector detects that the server has disconnected.
144  ServerDisconnect,
145  /// Emitted when an error that cannot be matched to a request is received from
146  /// the server.
147  Error(ButtplugError),
148}
149
150impl Unpin for ButtplugClientEvent {
151}
152
153pub(crate) fn create_boxed_future_client_error<T>(
154  err: ButtplugError,
155) -> ButtplugClientResultFuture<T>
156where
157  T: 'static + Send + Sync,
158{
159  future::ready(Err(ButtplugClientError::ButtplugError(err))).boxed()
160}
161
162#[derive(Clone, Debug)]
163pub(crate) struct ButtplugClientMessageSender {
164  message_sender: mpsc::Sender<ButtplugClientRequest>,
165  connected: Arc<AtomicBool>,
166}
167
168impl ButtplugClientMessageSender {
169  fn new(message_sender: mpsc::Sender<ButtplugClientRequest>, connected: &Arc<AtomicBool>) -> Self {
170    Self {
171      message_sender,
172      connected: connected.clone(),
173    }
174  }
175
176  /// Send message to the internal event loop.
177  ///
178  /// Mostly for handling boilerplate around possible send errors.
179  pub fn send_message_to_event_loop(
180    &self,
181    msg: ButtplugClientRequest,
182  ) -> BoxFuture<'static, Result<(), ButtplugClientError>> {
183    // If we're running the event loop, we should have a message_sender.
184    // Being connected to the server doesn't matter here yet because we use
185    // this function in order to connect also.
186    let message_sender = self.message_sender.clone();
187    async move {
188      message_sender
189        .send(msg)
190        .await
191        .map_err(|_| ButtplugConnectorError::ConnectorChannelClosed)?;
192      Ok(())
193    }
194    .boxed()
195  }
196
197  pub fn send_message(&self, msg: ButtplugClientMessageV4) -> ButtplugServerMessageResultFuture {
198    if !self.connected.load(Ordering::Relaxed) {
199      future::ready(Err(ButtplugConnectorError::ConnectorNotConnected.into())).boxed()
200    } else {
201      self.send_message_ignore_connect_status(msg)
202    }
203  }
204
205  /// Sends a ButtplugMessage from client to server. Expects to receive a ButtplugMessage back from
206  /// the server.
207  pub fn send_message_ignore_connect_status(
208    &self,
209    msg: ButtplugClientMessageV4,
210  ) -> ButtplugServerMessageResultFuture {
211    // Create a oneshot channel for receiving the response.
212    let (tx, rx) = oneshot::channel();
213    let internal_msg =
214      ButtplugClientRequest::Message(ButtplugClientMessageFuturePair::new(msg, tx));
215
216    // Send message to internal loop and wait for return.
217    let send_fut = self.send_message_to_event_loop(internal_msg);
218    async move {
219      send_fut.await?;
220      rx.await
221        .map_err(|_| ButtplugConnectorError::ConnectorChannelClosed)?
222    }
223    .boxed()
224  }
225
226  /// Sends a ButtplugMessage from client to server. Expects to receive an [Ok]
227  /// type ButtplugMessage back from the server.
228  pub fn send_message_expect_ok(&self, msg: ButtplugClientMessageV4) -> ButtplugClientResultFuture {
229    let send_fut = self.send_message(msg);
230    async move { send_fut.await.map(|_| ()) }.boxed()
231  }
232}
233
234/// Struct used by applications to communicate with a Buttplug Server.
235///
236/// Buttplug Clients provide an API layer on top of the Buttplug Protocol that
237/// handles boring things like message creation and pairing, protocol ordering,
238/// etc... This allows developers to concentrate on controlling hardware with
239/// the API.
240///
241/// Clients serve a few different purposes:
242/// - Managing connections to servers, thru [ButtplugConnector]s
243/// - Emitting events received from the Server
244/// - Holding state related to the server (i.e. what devices are currently
245///   connected, etc...)
246///
247/// Clients are created by the [ButtplugClient::new()] method, which also
248/// handles spinning up the event loop and connecting the client to the server.
249/// Closures passed to the run() method can access and use the Client object.
250pub struct ButtplugClient {
251  /// The client name. Depending on the connection type and server being used,
252  /// this name is sometimes shown on the server logs or GUI.
253  client_name: String,
254  /// The server name that we're current connected to.
255  server_name: Arc<Mutex<Option<String>>>,
256  event_stream: broadcast::Sender<ButtplugClientEvent>,
257  // Sender to relay messages to the internal client loop
258  message_sender: ButtplugClientMessageSender,
259  // Receiver for client requests, taken on connect and given to event loop
260  request_receiver: Arc<Mutex<Option<mpsc::Receiver<ButtplugClientRequest>>>>,
261  connected: Arc<AtomicBool>,
262  device_map: Arc<DashMap<u32, ButtplugClientDevice>>,
263}
264
265impl ButtplugClient {
266  pub fn new(name: &str) -> Self {
267    let (request_sender, request_receiver) = mpsc::channel(256);
268    let (event_stream, _) = broadcast::channel(256);
269    let connected = Arc::new(AtomicBool::new(false));
270    Self {
271      client_name: name.to_owned(),
272      server_name: Arc::new(Mutex::new(None)),
273      event_stream,
274      message_sender: ButtplugClientMessageSender::new(request_sender, &connected),
275      request_receiver: Arc::new(Mutex::new(Some(request_receiver))),
276      connected,
277      device_map: Arc::new(DashMap::new()),
278    }
279  }
280
281  pub async fn connect<ConnectorType>(
282    &self,
283    mut connector: ConnectorType,
284  ) -> Result<(), ButtplugClientError>
285  where
286    ConnectorType: ButtplugConnector<ButtplugClientMessageV4, ButtplugServerMessageV4> + 'static,
287  {
288    if self.connected() {
289      return Err(ButtplugClientError::ButtplugConnectorError(
290        ButtplugConnectorError::ConnectorAlreadyConnected,
291      ));
292    }
293
294    // If connect is being called again, clear out the device map and start over.
295    self.device_map.clear();
296
297    // Take the request receiver - if None, a previous connection consumed it and we can't reconnect
298    // without creating a new client (the sender is tied to this receiver)
299    let request_receiver = self.request_receiver.lock().await.take().ok_or(
300      ButtplugConnectorError::ConnectorGenericError(
301        "Cannot reconnect - request channel already consumed. Create a new client.".to_string(),
302      ),
303    )?;
304
305    info!("Connecting to server.");
306    let (connector_sender, connector_receiver) = mpsc::channel(256);
307    connector.connect(connector_sender).await.map_err(|e| {
308      error!("Connection to server failed: {:?}", e);
309      ButtplugClientError::from(e)
310    })?;
311    info!("Connection to server succeeded.");
312    let mut client_event_loop = ButtplugClientEventLoop::new(
313      self.connected.clone(),
314      connector,
315      connector_receiver,
316      self.event_stream.clone(),
317      self.message_sender.clone(),
318      request_receiver,
319      self.device_map.clone(),
320    );
321
322    // Start the event loop before we run the handshake.
323    buttplug_core::spawn!("ButtplugClient event loop", async move {
324      client_event_loop.run().await;
325    });
326    self.run_handshake().await
327  }
328
329  /// Creates the ButtplugClient instance and tries to establish a connection.
330  ///
331  /// Takes all of the components needed to build a [ButtplugClient], creates
332  /// the struct, then tries to run connect and execute the Buttplug protocol
333  /// handshake. Will return a connected and ready to use ButtplugClient is all
334  /// goes well.
335  async fn run_handshake(&self) -> ButtplugClientResult {
336    // Run our handshake
337    info!("Running handshake with server.");
338    let msg = self
339      .message_sender
340      .send_message_ignore_connect_status(
341        RequestServerInfoV4::new(
342          &self.client_name,
343          BUTTPLUG_CURRENT_API_MAJOR_VERSION,
344          BUTTPLUG_CURRENT_API_MINOR_VERSION,
345        )
346        .into(),
347      )
348      .await?;
349
350    debug!("Got ServerInfo return.");
351    if let ButtplugServerMessageV4::ServerInfo(server_info) = msg {
352      info!("Connected to {}", server_info.server_name());
353      *self.server_name.lock().await = Some(server_info.server_name().clone());
354      // Don't set ourselves as connected until after ServerInfo has been
355      // received. This means we avoid possible races with the RequestServerInfo
356      // handshake.
357      self.connected.store(true, Ordering::Relaxed);
358
359      // Get currently connected devices. The event loop will
360      // handle sending the message and getting the return, and
361      // will send the client updates as events.
362      let msg = self
363        .message_sender
364        .send_message(RequestDeviceListV0::default().into())
365        .await?;
366      if let ButtplugServerMessageV4::DeviceList(m) = msg {
367        self
368          .message_sender
369          .send_message_to_event_loop(ButtplugClientRequest::HandleDeviceList(m))
370          .await?;
371      }
372      Ok(())
373    } else {
374      self.disconnect().await?;
375      Err(ButtplugClientError::ButtplugError(
376        ButtplugHandshakeError::UnexpectedHandshakeMessageReceived(format!("{msg:?}")).into(),
377      ))
378    }
379  }
380
381  /// Returns true if client is currently connected.
382  pub fn connected(&self) -> bool {
383    self.connected.load(Ordering::Relaxed)
384  }
385
386  /// Disconnects from server, if connected.
387  ///
388  /// Returns Err(ButtplugClientError) if disconnection fails. It can be assumed
389  /// that even on failure, the client will be disconnected.
390  pub fn disconnect(&self) -> ButtplugClientResultFuture {
391    if !self.connected() {
392      return future::ready(Err(ButtplugConnectorError::ConnectorNotConnected.into())).boxed();
393    }
394    // Send the connector to the internal loop for management. Once we throw
395    // the connector over, the internal loop will handle connecting and any
396    // further communications with the server, if connection is successful.
397    let (tx, rx) = oneshot::channel();
398    let msg = ButtplugClientRequest::Disconnect(tx);
399    let send_fut = self.message_sender.send_message_to_event_loop(msg);
400    let connected = self.connected.clone();
401    async move {
402      connected.store(false, Ordering::Relaxed);
403      send_fut.await?;
404      // Wait for disconnect to complete, but don't fail if channel closed
405      let _ = rx.await;
406      Ok(())
407    }
408    .boxed()
409  }
410
411  /// Tells server to start scanning for devices.
412  ///
413  /// Returns Err([ButtplugClientError]) if request fails due to issues with
414  /// DeviceManagers on the server, disconnection, etc.
415  pub fn start_scanning(&self) -> ButtplugClientResultFuture {
416    self
417      .message_sender
418      .send_message_expect_ok(StartScanningV0::default().into())
419  }
420
421  /// Tells server to stop scanning for devices.
422  ///
423  /// Returns Err([ButtplugClientError]) if request fails due to issues with
424  /// DeviceManagers on the server, disconnection, etc.
425  pub fn stop_scanning(&self) -> ButtplugClientResultFuture {
426    self
427      .message_sender
428      .send_message_expect_ok(StopScanningV0::default().into())
429  }
430
431  /// Tells server to stop all devices.
432  ///
433  /// Returns Err([ButtplugClientError]) if request fails due to issues with
434  /// DeviceManagers on the server, disconnection, etc.
435  pub fn stop_all_devices(&self) -> ButtplugClientResultFuture {
436    self
437      .message_sender
438      .send_message_expect_ok(StopCmdV4::default().into())
439  }
440
441  pub fn event_stream(&self) -> impl Stream<Item = ButtplugClientEvent> + use<> {
442    let stream = convert_broadcast_receiver_to_stream(self.event_stream.subscribe());
443    // We can either Box::pin here or force the user to pin_mut!() on their
444    // end. While this does end up with a dynamic dispatch on our end, it
445    // still makes the API nicer for the user, so we'll just eat the perf hit.
446    // Not to mention, this is not a high throughput system really, so it
447    // shouldn't matter.
448    Box::pin(stream)
449  }
450
451  /// Retreives a list of currently connected devices.
452  pub fn devices(&self) -> BTreeMap<u32, ButtplugClientDevice> {
453    self
454      .device_map
455      .iter()
456      .map(|map_pair| (*map_pair.key(), map_pair.value().clone()))
457      .collect()
458  }
459
460  pub fn ping(&self) -> ButtplugClientResultFuture {
461    let ping_fut = self
462      .message_sender
463      .send_message_expect_ok(PingV0::default().into());
464    ping_fut.boxed()
465  }
466
467  pub fn server_name(&self) -> Option<String> {
468    // We'd have to be calling server_name in an extremely tight, asynchronous
469    // loop for this to return None, so we'll treat this as lockless.
470    //
471    // Dear users actually reading this code: This is not an invitation for you
472    // to get the server name in a tight, asynchronous loop. This will never
473    // change throughout the life to the connection.
474    if let Ok(name) = self.server_name.try_lock() {
475      name.clone()
476    } else {
477      None
478    }
479  }
480}