1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
// Buttplug Rust Source Code File - See https://buttplug.io for more info.

//

// Copyright 2016-2020 Nonpolynomial Labs LLC. All rights reserved.

//

// Licensed under the BSD 3-Clause license. See LICENSE file in the project root

// for full license information.


//! Handling of websockets using async-tungstenite


use super::transport::{ButtplugConnectorTransport, ButtplugTransportMessage};
use crate::{
  connector::{ButtplugConnector, ButtplugConnectorError, ButtplugConnectorResultFuture},
  core::{
    errors::{ButtplugMessageError, ButtplugServerError},
    messages::{
      serializer::{
        ButtplugClientJSONSerializer,
        ButtplugMessageSerializer,
        ButtplugSerializedMessage,
      },
      ButtplugClientMessage,
      ButtplugCurrentSpecClientMessage,
      ButtplugCurrentSpecServerMessage,
      ButtplugMessage,
      ButtplugServerMessage,
    },
  },
  util::async_manager,
};
use async_channel::{bounded, Receiver, Sender};
use futures::{future::BoxFuture, FutureExt, StreamExt};
use std::marker::PhantomData;

enum ButtplugRemoteConnectorMessage<T>
where
  T: ButtplugMessage + 'static,
{
  Message(T),
  Close,
}

enum StreamValue<T>
where
  T: ButtplugMessage + 'static,
{
  NoValue,
  Incoming(ButtplugTransportMessage),
  Outgoing(ButtplugRemoteConnectorMessage<T>),
}

async fn remote_connector_event_loop<
  TransportType,
  SerializerType,
  OutboundMessageType,
  InboundMessageType,
>(
  // Takes messages from the client

  mut connector_outgoing_recv: Receiver<ButtplugRemoteConnectorMessage<OutboundMessageType>>,
  // Sends messages not matched in the sorter to the client.

  connector_incoming_sender: Sender<Result<InboundMessageType, ButtplugServerError>>,
  transport: TransportType,
  // Sends sorter processed messages to the transport.

  transport_outgoing_sender: Sender<ButtplugSerializedMessage>,
  // Takes data coming in from the transport.

  mut transport_incoming_recv: Receiver<ButtplugTransportMessage>,
) where
  TransportType: ButtplugConnectorTransport + 'static,
  SerializerType: ButtplugMessageSerializer<Inbound = InboundMessageType, Outbound = OutboundMessageType>
    + 'static,
  OutboundMessageType: ButtplugMessage + 'static,
  InboundMessageType: ButtplugMessage + 'static, //From<Error> + 'static,

{
  // Message sorter that receives messages that come in from the client.

  let mut serializer = SerializerType::default();
  loop {
    // We use two Options instead of an enum because we may never get anything.

    //

    // For the type, we will get back one of two things: Either a serialized

    // incoming message from the transport for the connector, or an outgoing

    // message from the connector to go to the transport.

    let mut stream_return = select! {
      // Catch messages coming in from the transport.

      transport = transport_incoming_recv.next().fuse() =>
      match transport {
        Some(msg) => StreamValue::Incoming(msg),
        None => StreamValue::NoValue,
      },
      connector = connector_outgoing_recv.next().fuse() =>
      match connector {
        // Catch messages that need to be sent out through the connector.

        Some(msg) => StreamValue::Outgoing(msg),
        None => StreamValue::NoValue,
      }
    };
    match stream_return {
      // If we get NoValue back, it means one side closed, so the other should

      // too.

      StreamValue::NoValue => break,
      // If we get incoming back, it means we've received something from the

      // server. See if we have a matching future, else send whatever we got as

      // an event.

      StreamValue::Incoming(remote_msg) => {
        match remote_msg {
          ButtplugTransportMessage::Message(serialized_msg) => {
            match serializer.deserialize(serialized_msg) {
              Ok(array) => {
                for smsg in array {
                  // TODO THIS SHOULD CONVERT ERROR MESSAGES FIRST.

                  if connector_incoming_sender.send(Ok(smsg)).await.is_err() {
                    error!("Connector has disconnected, ending remote connector loop.");
                    return;
                  }
                }
              }
              Err(e) => {
                let error_str =
                  format!("Got invalid messages from remote Buttplug Server: {:?}", e);
                error!("{}", error_str);
                let _ = connector_incoming_sender
                  .send(Err(
                    ButtplugMessageError::MessageSerializationError(e).into(),
                  ))
                  .await;
              }
            }
          }
          ButtplugTransportMessage::Close(s) => {
            info!("Connector closing connection {}", s);
            break;
          }
          // TODO We should probably make connecting an event?

          ButtplugTransportMessage::Connected => {}
          // TODO We should probably figure out what this even does?

          ButtplugTransportMessage::Error(_) => {}
        }
      }
      // If we receive something from the client, register it with our sorter

      // then let the connector figure out what to do with it.

      StreamValue::Outgoing(ref mut buttplug_msg) => {
        match buttplug_msg {
          ButtplugRemoteConnectorMessage::Message(msg) => {
            // Create future sets our message ID, so make sure this

            // happens before we send out the message.

            let serialized_msg = serializer.serialize(vec![msg.clone()]);
            if transport_outgoing_sender
              .send(serialized_msg)
              .await
              .is_err()
            {
              error!("Transport has disconnected, exiting remote connector loop.");
              return;
            }
          }
          ButtplugRemoteConnectorMessage::Close => {
            if let Err(e) = transport.disconnect().await {
              error!("Error disconnecting transport: {:?}", e);
            }
            break;
          }
        }
      }
    }
  }
}

pub type ButtplugRemoteClientConnector<
  TransportType,
  SerializerType = ButtplugClientJSONSerializer,
> = ButtplugRemoteConnector<
  TransportType,
  SerializerType,
  ButtplugCurrentSpecClientMessage,
  ButtplugCurrentSpecServerMessage,
>;

pub type ButtplugRemoteServerConnector<TransportType, SerializerType> = ButtplugRemoteConnector<
  TransportType,
  SerializerType,
  ButtplugServerMessage,
  ButtplugClientMessage,
>;

pub struct ButtplugRemoteConnector<
  TransportType,
  SerializerType,
  OutboundMessageType,
  InboundMessageType,
> where
  TransportType: ButtplugConnectorTransport + 'static,
  SerializerType: ButtplugMessageSerializer<Inbound = InboundMessageType, Outbound = OutboundMessageType>
    + 'static,
  OutboundMessageType: ButtplugMessage + 'static,
  InboundMessageType: ButtplugMessage + 'static,
{
  /// Transport that the connector will use to communicate with the other

  /// connector.

  ///

  /// This is an option so that, if we connect successfully, we can `.take()`

  /// the value out of the option and send it to our event loop. This means if

  /// anyone tries to call connect twice, we'll fail (because we'll have no

  /// transport to connect to). It also limits the lifetime of the connector to

  /// the lifetime of the event loop, meaning if for any reason we exit, we make

  /// sure the transport is dropped.

  transport: Option<TransportType>,
  /// Sender for forwarding outgoing messages to the connector event loop.

  event_loop_sender: Option<Sender<ButtplugRemoteConnectorMessage<OutboundMessageType>>>,
  dummy_serializer: PhantomData<SerializerType>,
}

impl<TransportType, SerializerType, OutboundMessageType, InboundMessageType>
  ButtplugRemoteConnector<TransportType, SerializerType, OutboundMessageType, InboundMessageType>
where
  TransportType: ButtplugConnectorTransport + 'static,
  SerializerType: ButtplugMessageSerializer<Inbound = InboundMessageType, Outbound = OutboundMessageType>
    + 'static,
  OutboundMessageType: ButtplugMessage + 'static,
  InboundMessageType: ButtplugMessage + 'static,
{
  pub fn new(transport: TransportType) -> Self {
    Self {
      transport: Some(transport),
      event_loop_sender: None,
      dummy_serializer: PhantomData::default(),
    }
  }
}

impl<TransportType, SerializerType, OutboundMessageType, InboundMessageType>
  ButtplugConnector<OutboundMessageType, InboundMessageType>
  for ButtplugRemoteConnector<
    TransportType,
    SerializerType,
    OutboundMessageType,
    InboundMessageType,
  >
where
  TransportType: ButtplugConnectorTransport + 'static,
  SerializerType: ButtplugMessageSerializer<Inbound = InboundMessageType, Outbound = OutboundMessageType>
    + 'static,
  OutboundMessageType: ButtplugMessage + 'static,
  InboundMessageType: ButtplugMessage + 'static, //+ From<Error> + 'static,

{
  fn connect(
    &mut self,
  ) -> BoxFuture<
    'static,
    Result<Receiver<Result<InboundMessageType, ButtplugServerError>>, ButtplugConnectorError>,
  > {
    if self.transport.is_some() {
      // We can unwrap this because we just proved we had it.

      let transport = self.transport.take().unwrap();
      let (connector_outgoing_sender, connector_outgoing_receiver) = bounded(256);
      self.event_loop_sender = Some(connector_outgoing_sender);
      Box::pin(async move {
        match transport.connect().await {
          // If we connect successfully, we get back the channel from the transport

          // to send outgoing messages and receieve incoming events, all serialized.

          Ok((transport_outgoing_sender, transport_incoming_receiver)) => {
            let (connector_incoming_sender, connector_incoming_receiver) = bounded(256);
            async_manager::spawn(async move {
              remote_connector_event_loop::<
                TransportType,
                SerializerType,
                OutboundMessageType,
                InboundMessageType,
              >(
                connector_outgoing_receiver,
                connector_incoming_sender,
                transport,
                transport_outgoing_sender,
                transport_incoming_receiver,
              )
              .await
            })
            .unwrap();
            Ok(connector_incoming_receiver)
          }
          Err(e) => Err(e),
        }
      })
    } else {
      ButtplugConnectorError::ConnectorAlreadyConnected.into()
    }
  }

  fn disconnect(&self) -> ButtplugConnectorResultFuture {
    if let Some(ref sender) = self.event_loop_sender {
      let sender_clone = sender.clone();
      Box::pin(async move {
        sender_clone
          .send(ButtplugRemoteConnectorMessage::Close)
          .await
          .map_err(|_| ButtplugConnectorError::ConnectorNotConnected)
      })
    } else {
      ButtplugConnectorError::ConnectorNotConnected.into()
    }
  }

  fn send(&self, msg: OutboundMessageType) -> ButtplugConnectorResultFuture {
    if let Some(ref sender) = self.event_loop_sender {
      let sender_clone = sender.clone();
      Box::pin(async move {
        sender_clone
          .send(ButtplugRemoteConnectorMessage::Message(msg))
          .await
          .map_err(|_| ButtplugConnectorError::ConnectorNotConnected)
      })
    } else {
      ButtplugConnectorError::ConnectorNotConnected.into()
    }
  }
}