1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499
// Buttplug Rust Source Code File - See https://buttplug.io for more info. // // Copyright 2016-2019 Nonpolynomial Labs LLC. All rights reserved. // // Licensed under the BSD 3-Clause license. See LICENSE file in the project root // for full license information. //! Communications API for accessing Buttplug Servers mod client_message_sorter; pub mod device; pub mod internal; use device::ButtplugClientDevice; use internal::{client_event_loop, ButtplugClientDeviceInternal, ButtplugClientRequest}; use crate::{ connector::{ButtplugConnector, ButtplugConnectorError, ButtplugConnectorFuture}, core::{ errors::{ButtplugError, ButtplugHandshakeError}, messages::{ ButtplugCurrentSpecClientMessage, ButtplugCurrentSpecServerMessage, ButtplugMessageSpecVersion, DeviceMessageInfo, LogLevel, RequestDeviceList, RequestServerInfo, StartScanning, StopScanning, }, }, util::{ async_manager, future::{ButtplugFuture, ButtplugFutureStateShared}, }, }; use async_channel::Sender; use dashmap::DashMap; use futures::{ future::{self, BoxFuture, Future}, FutureExt, StreamExt, }; use std::sync::{ atomic::{AtomicBool, Ordering}, Arc, }; use thiserror::Error; use tracing::{span::Span, Level}; use tracing_futures::Instrument; /// Result type used inside the client module. /// /// When communicating inside the client module, we'll usually only receive /// errors related to the connector. Buttplug /// [Error][crate::core::messages::Error] messages will still be valid, because /// they're coming from the server. // TODO This is not longer used since we return errors at the point of deserialization. Remove. type ButtplugInternalClientResult<T = ()> = Result<T, ButtplugClientError>; /// Result type used for public APIs. /// /// Allows us to differentiate between an issue with the connector (as a /// [ButtplugConnectorError]) and an issue within Buttplug (as a /// [ButtplugError]). type ButtplugClientResult<T = ()> = Result<T, ButtplugClientError>; type ButtplugClientResultFuture<T = ()> = BoxFuture<'static, ButtplugClientResult<T>>; /// Result type used for passing server responses. pub type ButtplugInternalClientMessageResult = ButtplugInternalClientResult<ButtplugCurrentSpecServerMessage>; pub type ButtplugInternalClientMessageResultFuture = BoxFuture<'static, ButtplugInternalClientMessageResult>; /// Future state type for returning server responses across futures. pub(crate) type ButtplugClientMessageStateShared = ButtplugFutureStateShared<ButtplugInternalClientMessageResult>; /// Future type that expects server responses. pub(crate) type ButtplugClientMessageFuture = ButtplugFuture<ButtplugInternalClientMessageResult>; /// Future state for messages sent from the client that expect a server /// response. /// /// When a message is sent from the client and expects a response from the /// server, we'd like to know when that response arrives, and usually we'll want /// to wait for it. We can do so by creating a future that will be resolved when /// a response is received from the server. /// /// To do this, we build a [ButtplugFuture], then take its waker and pass it /// along with the message we send to the connector, using the /// [ButtplugClientMessageFuturePair] type. We can then expect the connector to /// get the response from the server, match it with our message (using something /// like the /// [ClientConnectorMessageSorter][crate::connector::ClientConnectorMessageSorter]), /// and set the reply in the waker we've sent along. This will resolve the /// future we're waiting on and allow us to continue execution. pub struct ButtplugClientMessageFuturePair { pub msg: ButtplugCurrentSpecClientMessage, pub waker: ButtplugClientMessageStateShared, } impl ButtplugClientMessageFuturePair { pub fn new( msg: ButtplugCurrentSpecClientMessage, waker: ButtplugClientMessageStateShared, ) -> Self { Self { msg, waker } } } /// Represents all of the different types of errors a ButtplugClient can return. /// /// Clients can return two types of errors: /// /// - [ButtplugConnectorError], which means there was a problem with the /// connection between the client and the server, like a network connection /// issue. /// - [ButtplugError], which is an error specific to the Buttplug Protocol. #[derive(Debug, Error)] pub enum ButtplugClientError { /// Connector error #[error(transparent)] ButtplugConnectorError(#[from] ButtplugConnectorError), /// Protocol error #[error(transparent)] ButtplugError(#[from] ButtplugError), } /// Enum representing different events that can be emitted by a client. /// /// These events are created by the server and sent to the client, and represent /// unrequested actions that the client will need to respond to, or that /// applications using the client may be interested in. #[derive(Clone)] pub enum ButtplugClientEvent { /// Emitted when a scanning session (started via a StartScanning call on /// [ButtplugClient]) has finished. ScanningFinished, /// Emitted when a device has been added to the server. Includes a /// [ButtplugClientDevice] object representing the device. DeviceAdded(ButtplugClientDevice), /// Emitted when a device has been removed from the server. Includes a /// [ButtplugClientDevice] object representing the device. DeviceRemoved(DeviceMessageInfo), /// Emitted when log messages are sent from the server. Log(LogLevel, String), /// Emitted when a client has not pinged the server in a sufficient amount /// of time. PingTimeout, /// Emitted when a client connector detects that the server has /// disconnected. ServerDisconnect, Error(ButtplugError), } /// Struct used by applications to communicate with a Buttplug Server. /// /// Buttplug Clients provide an API layer on top of the Buttplug Protocol that /// handles boring things like message creation and pairing, protocol ordering, /// etc... This allows developers to concentrate on controlling hardware with /// the API. /// /// Clients serve a few different purposes: /// - Managing connections to servers, thru [ButtplugClientConnector]s /// - Emitting events received from the Server /// - Holding state related to the server (i.e. what devices are currently /// connected, etc...) /// /// Clients are created by the [ButtplugClient::run()] method, which also /// handles spinning up the event loop and connecting the client to the server. /// Closures passed to the run() method can access and use the Client object. pub struct ButtplugClient { /// The client name. Depending on the connection type and server being used, /// this name is sometimes shown on the server logs or GUI. pub client_name: String, /// The server name that we're current connected to. pub server_name: String, // Sender to relay messages to the internal client loop message_sender: Sender<ButtplugClientRequest>, // True if the connector is currently connected, and handshake was // successful. connected: Arc<AtomicBool>, _client_span: Span, device_map: Arc<DashMap<u32, ButtplugClientDeviceInternal>>, } unsafe impl Send for ButtplugClient { } // Not actually sure this should be sync, but trying to call handshake breaks // without it. unsafe impl Sync for ButtplugClient { } impl ButtplugClient { pub fn connect<ConnectorType>( name: &str, mut connector: ConnectorType, ) -> BoxFuture< 'static, Result<(Self, impl StreamExt<Item = ButtplugClientEvent>), ButtplugClientError>, > where ConnectorType: ButtplugConnector<ButtplugCurrentSpecClientMessage, ButtplugCurrentSpecServerMessage> + 'static, { trace!("run() called, creating client future."); let client_name = name.to_string(); Box::pin(async move { let span = span!(Level::INFO, "Client"); let _client_span = span.enter(); info!("Connecting to server."); let connector_receiver = connector.connect().await.map_err(|e| { error!("Connection to server failed: {:?}", e); let err: ButtplugClientError = e.into(); err })?; info!("Connection to server succeeded."); let (client_event_loop_fut, device_map_reader, message_sender, event_channel) = client_event_loop(connector, connector_receiver); let client_event_receiver = event_channel.clone(); let mut disconnect_event_receiver = event_channel.clone(); let connected_status = Arc::new(AtomicBool::new(true)); let connected_status_clone = connected_status.clone(); // Start the event loop before we run the handshake. async_manager::spawn( async move { let disconnect_fut = async move { loop { if let Some(ButtplugClientEvent::ServerDisconnect) = disconnect_event_receiver.next().await { connected_status.store(false, Ordering::SeqCst); break; } } Result::<(), ButtplugClientError>::Ok(()) } .instrument(tracing::info_span!("Client Disconnect Loop")); // If we disconnect, we'll also stop the client event loop. If the // client event loop stops, we don't care about listening for disconnect // anymore. select! { _ = client_event_loop_fut.fuse() => (), _ = disconnect_fut.fuse() => (), }; } .instrument(tracing::info_span!("Client Loop Span")), ) .unwrap(); let client = ButtplugClient::create_client( &client_name, connected_status_clone, message_sender, device_map_reader, span.clone(), ) .await?; Ok((client, client_event_receiver)) }) } /// Convenience function for creating in-process connectors. /// /// Creates a [ButtplugClient] event loop, with an in-process connector with /// all device managers that ship with the library and work on the current /// platform added to it already. Takes a maximum ping time to build the /// server with, other parameters match `run()`. /// /// # When To Use This Instead of `run()` /// /// If you just want to build a quick example and save yourself a few use /// statements and setup, this will get you going. For anything *production*, /// we recommend using `run()` as you will have more control over what /// happens. This method may gain/lose device comm managers at any time. /// /// # The Device I Want To Use Doesn't Show Up /// /// If you are trying to use this method to create your client, and do not see /// the devices you want, there are a couple of things to check: /// /// - Are you on a platform that the device communication manager supports? /// For instance, we only support XInput on windows. /// - Did the developers add a new Device CommunicationManager type and forget /// to add it to this method? _It's more likely than you think!_ [File a /// bug](https://github.com/buttplugio/buttplug-rs/issues). /// /// # Errors /// /// If the library was compiled without any device managers, the /// [ButtplugClient] will have nothing to do. This is considered a /// catastrophic failure and the library will return an error. /// /// If the library is using outside device managers, it is recommended to /// build your own connector, add your device manager to those, and use the /// `run()` method to pass it in. #[cfg(feature = "server")] pub fn connect_in_process( name: &str, max_ping_time: u64, ) -> impl Future< Output = Result<(Self, impl StreamExt<Item = ButtplugClientEvent>), ButtplugClientError>, > { use crate::connector::ButtplugInProcessClientConnector; let connector = ButtplugInProcessClientConnector::new("Default In Process Server", max_ping_time); #[cfg(feature = "btleplug-manager")] { use crate::server::comm_managers::btleplug::BtlePlugCommunicationManager; connector .server_ref() .add_comm_manager::<BtlePlugCommunicationManager>() .unwrap(); } #[cfg(all(feature = "xinput-manager", target_os = "windows"))] { use crate::server::comm_managers::xinput::XInputDeviceCommunicationManager; connector .server_ref() .add_comm_manager::<XInputDeviceCommunicationManager>() .unwrap(); } ButtplugClient::connect(name, connector) } /// Creates the ButtplugClient instance and tries to establish a connection. /// /// Takes all of the components needed to build a [ButtplugClient], creates /// the struct, then tries to run connect and execute the Buttplug protocol /// handshake. Will return a connected and ready to use ButtplugClient is all /// goes well. async fn create_client( client_name: &str, connected_status: Arc<AtomicBool>, message_sender: Sender<ButtplugClientRequest>, device_map: Arc<DashMap<u32, ButtplugClientDeviceInternal>>, span: Span, ) -> Result<Self, ButtplugClientError> { // Create the client let mut client = ButtplugClient { client_name: client_name.to_string(), server_name: String::new(), message_sender, // Since we'll have already connected and initialized by the time we hand // this to the client function, we can go ahead and declare that we're // connected here. If that's not true, we won't even execute the client // function. connected: connected_status, device_map, _client_span: span, }; // Run our handshake info!("Running handshake with server."); let msg = client .send_message( RequestServerInfo::new(&client.client_name, ButtplugMessageSpecVersion::Version2).into(), ) .await?; debug!("Got ServerInfo return."); if let ButtplugCurrentSpecServerMessage::ServerInfo(server_info) = msg { info!("Connected to {}", server_info.server_name); client.server_name = server_info.server_name; // TODO Handle ping time in the internal event loop // Get currently connected devices. The event loop will // handle sending the message and getting the return, and // will send the client updates as events. let msg = client .send_message(RequestDeviceList::default().into()) .await?; if let ButtplugCurrentSpecServerMessage::DeviceList(m) = msg { client .send_internal_message(ButtplugClientRequest::HandleDeviceList(m)) .await?; } Ok(client) } else { client.disconnect().await?; Err(ButtplugClientError::ButtplugError( ButtplugHandshakeError::UnexpectedHandshakeMessageReceived(format!("{:?}", msg)).into(), )) } } /// Returns true if client is currently connected. pub fn connected(&self) -> bool { self.connected.load(Ordering::SeqCst) } /// Disconnects from server, if connected. /// /// Returns Err(ButtplugClientError) if disconnection fails. It can be assumed /// that even on failure, the client will be disconnected. pub fn disconnect(&self) -> ButtplugClientResultFuture { // Send the connector to the internal loop for management. Once we throw // the connector over, the internal loop will handle connecting and any // further communications with the server, if connection is successful. let fut = ButtplugConnectorFuture::default(); let msg = ButtplugClientRequest::Disconnect(fut.get_state_clone()); let send_fut = self.send_internal_message(msg); let connected = self.connected.clone(); Box::pin(async move { send_fut.await?; connected.store(false, Ordering::SeqCst); Ok(()) }) } /// Tells server to start scanning for devices. /// /// Returns Err([ButtplugClientError]) if request fails due to issues with /// DeviceManagers on the server, disconnection, etc. pub fn start_scanning(&self) -> ButtplugClientResultFuture { self.send_message_expect_ok(StartScanning::default().into()) } /// Tells server to stop scanning for devices. /// /// Returns Err([ButtplugClientError]) if request fails due to issues with /// DeviceManagers on the server, disconnection, etc. pub fn stop_scanning(&self) -> ButtplugClientResultFuture { self.send_message_expect_ok(StopScanning::default().into()) } /// Send message to the internal event loop. /// /// Mostly for handling boilerplate around possible send errors. fn send_internal_message( &self, msg: ButtplugClientRequest, ) -> BoxFuture<'static, Result<(), ButtplugConnectorError>> { if !self.connected.load(Ordering::SeqCst) { return Box::pin(future::ready(Err( ButtplugConnectorError::ConnectorNotConnected, ))); } // If we're running the event loop, we should have a message_sender. // Being connected to the server doesn't matter here yet because we use // this function in order to connect also. let message_sender = self.message_sender.clone(); Box::pin(async move { message_sender .send(msg) .await .map_err(|_| ButtplugConnectorError::ConnectorChannelClosed)?; Ok(()) }) } /// Sends a ButtplugMessage from client to server. Expects to receive a /// ButtplugMessage back from the server. fn send_message( &self, msg: ButtplugCurrentSpecClientMessage, ) -> ButtplugInternalClientMessageResultFuture { // Create a future to pair with the message being resolved. let fut = ButtplugClientMessageFuture::default(); let internal_msg = ButtplugClientRequest::Message(ButtplugClientMessageFuturePair::new( msg, fut.get_state_clone(), )); // Send message to internal loop and wait for return. let send_fut = self.send_internal_message(internal_msg); Box::pin(async move { send_fut.await?; fut.await }) } /// Sends a ButtplugMessage from client to server. Expects to receive an [Ok] /// type ButtplugMessage back from the server. fn send_message_expect_ok( &self, msg: ButtplugCurrentSpecClientMessage, ) -> ButtplugClientResultFuture { let send_fut = self.send_message(msg); Box::pin(async move { send_fut.await.map(|_| ()).map_err(|err| err) }) } /// Retreives a list of currently connected devices. /// /// As the device list is maintained in the event loop structure, retreiving /// the list requires an asynchronous call to retreive the list from the task. pub fn devices(&self) -> Vec<ButtplugClientDevice> { info!("Request devices from inner loop!"); let mut device_clones = vec![]; for device in self.device_map.iter() { device_clones.push(ButtplugClientDevice::from(( &(*device.device), self.message_sender.clone(), (*device.channel).clone(), ))); } device_clones } }