1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
use crate::{
  connector::{ButtplugConnector, ButtplugConnectorError, ButtplugConnectorResultFuture},
  core::{
    errors::{ButtplugError, ButtplugMessageError, ButtplugServerError},
    messages::{
      ButtplugCurrentSpecClientMessage,
      ButtplugCurrentSpecServerMessage,
      ButtplugMessage,
    },
  },
  server::{ButtplugServer, ButtplugServerOptions},
  util::async_manager,
};
use async_channel::{bounded, Receiver, Sender};
use futures::{
  future::{self, BoxFuture},
  StreamExt,
};
use std::convert::TryInto;
use tracing_futures::Instrument;

/// In-process Buttplug Server Connector
///
/// The In-Process Connector contains a [ButtplugServer], meaning that both the
/// [ButtplugClient][crate::client::ButtplugClient] and [ButtplugServer] will
/// exist in the same process. This is useful for developing applications, or
/// for distributing an applications without requiring access to an outside
/// [ButtplugServer].
///
/// # Notes
///
/// Buttplug, as a project, is built in a way that tries to make sure all
/// programs will work with new versions of the library. This is why we have
/// [ButtplugClient][crate::client::ButtplugClient] for applications, and
/// Connectors to access out-of-process [ButtplugServer]s over IPC, network,
/// etc. It means that the out-of-process server can be upgraded by the user at
/// any time, even if the [ButtplugClient][crate::client::ButtplugClient] using
/// application hasn't been upgraded. This allows the program to support
/// hardware that may not have even been released when it was published.
///
/// While including an EmbeddedConnector in your application is the quickest and
/// easiest way to develop (and we highly recommend developing that way), and
/// also an easy way to get users up and running as quickly as possible, we
/// recommend also including some sort of IPC Connector in order for your
/// application to connect to newer servers when they come out.
#[cfg(feature = "server")]
pub struct ButtplugInProcessClientConnector {
  /// Internal server object for the embedded connector.
  server: ButtplugServer,
  server_outbound_sender: Sender<Result<ButtplugCurrentSpecServerMessage, ButtplugServerError>>,
  /// Event receiver for the internal server.
  connector_outbound_recv:
    Option<Receiver<Result<ButtplugCurrentSpecServerMessage, ButtplugServerError>>>,
}

#[cfg(feature = "server")]
impl<'a> Default for ButtplugInProcessClientConnector {
  fn default() -> Self {
    // Unwrap is fine here, if we pass in default options we'll never fail.
    ButtplugInProcessClientConnector::new_with_options(&ButtplugServerOptions::default()).unwrap()
  }
}

#[cfg(feature = "server")]
impl<'a> ButtplugInProcessClientConnector {
  /// Creates a new in-process connector, with a server instance.
  ///
  /// Sets up a server, using the basic [ButtplugServer] construction arguments.
  /// Takes the server's name and the ping time it should use, with a ping time
  /// of 0 meaning infinite ping.
  pub fn new_with_options(options: &ButtplugServerOptions) -> Result<Self, ButtplugError> {
    let (server, mut server_recv) = ButtplugServer::new_with_options(options)?;
    let (send, recv) = bounded(256);
    let server_outbound_sender = send.clone();
    async_manager::spawn(async move {
      info!("Starting In Process Client Connector Event Sender Loop");
      while let Some(event) = server_recv.next().await {
        // If we get an error back, it means the client dropped our event handler, so just stop trying.
        if send.send(Ok(event.try_into().unwrap())).await.is_err() {
          break;
        }
      }
      info!("Stopping In Process Client Connector Event Sender Loop, due to channel receiver being dropped.");
    }.instrument(tracing::info_span!("InProcessClientConnectorEventSenderLoop"))).unwrap();

    Ok(Self {
      connector_outbound_recv: Some(recv),
      server_outbound_sender,
      server,
    })
  }

  /// Get a reference to the internal server.
  ///
  /// Allows the owner to manipulate the internal server instance. Useful for
  /// setting up
  /// [DeviceCommunicationManager][crate::server::comm_managers::DeviceCommunicationManager]s
  /// before connection.
  pub fn server_ref(&'a self) -> &'a ButtplugServer {
    &self.server
  }
}

#[cfg(feature = "server")]
impl ButtplugConnector<ButtplugCurrentSpecClientMessage, ButtplugCurrentSpecServerMessage>
  for ButtplugInProcessClientConnector
{
  fn connect(
    &mut self,
  ) -> BoxFuture<
    'static,
    Result<
      Receiver<Result<ButtplugCurrentSpecServerMessage, ButtplugServerError>>,
      ButtplugConnectorError,
    >,
  > {
    if self.connector_outbound_recv.is_some() {
      let recv = self.connector_outbound_recv.take().unwrap();
      Box::pin(future::ready(Ok(recv)))
    } else {
      ButtplugConnectorError::ConnectorAlreadyConnected.into()
    }
  }

  fn disconnect(&self) -> ButtplugConnectorResultFuture {
    Box::pin(future::ready(Ok(())))
  }

  fn send(&self, msg: ButtplugCurrentSpecClientMessage) -> ButtplugConnectorResultFuture {
    let out_id = msg.get_id();
    let input = msg.try_into().unwrap();
    let output_fut = self.server.parse_message(input);
    let sender = self.server_outbound_sender.clone();
    Box::pin(async move {
      // TODO We should definitely do something different than just unwrapping errors here.
      let output = output_fut.await.and_then(|msg| {
        msg.try_into().map_err(|_| {
          ButtplugServerError::new_message_error(
            out_id,
            ButtplugMessageError::MessageConversionError(
              "Cannot convert server message to client spec.",
            )
            .into(),
          )
        })
      });
      sender
        .send(output)
        .await
        .map_err(|_| ButtplugConnectorError::ConnectorNotConnected)
    })
  }
}