1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
// Is this just two examples from tokio_tungstenite glued together?
//
// It absolutely is!

use futures_util::{future, StreamExt, TryStreamExt};
use log::info;
use tokio::{
  net::{TcpListener, TcpStream},
  select,
};
use tokio_tungstenite::connect_async;
use tokio_util::sync::CancellationToken;

/// Websocket repeater: accepts websocket clients on a local loopback port and relays their
/// text/binary messages to and from a remote websocket server.
#[derive(Debug)]
pub struct ButtplugRepeater {
  /// Port on `127.0.0.1` that the repeater listens on for incoming clients.
  local_port: u16,
  /// Address of the remote websocket server; parsed as a URL each time a client connects.
  remote_address: String,
  /// Cancelling this token makes the listen loop exit.
  stop_token: CancellationToken,
}

impl ButtplugRepeater {
  pub fn new(local_port: u16, remote_address: &str, stop_token: CancellationToken) -> Self {
    Self {
      local_port,
      remote_address: remote_address.to_owned(),
      stop_token,
    }
  }

  pub async fn listen(&self) {
    info!("Repeater loop starting");
    let addr = format!("127.0.0.1:{}", self.local_port);

    let try_socket = TcpListener::bind(&addr).await;
    let listener = try_socket.expect("Failed to bind");
    info!("Listening on: {}", addr);

    loop {
      select! {
        stream_result = listener.accept() => {
          match stream_result {
            Ok((stream, _)) => {
              let remote_address = self.remote_address.clone();
              tokio::spawn(ButtplugRepeater::accept_connection(remote_address, stream));
            },
            Err(e) => {
              error!("Error accepting new websocket for repeater: {:?}", e);
              break;
            }
          }
        },
        _ = self.stop_token.cancelled() => {
          info!("Repeater loop requested to stop, breaking.");
          break;
        }
      }
    }
    info!("Repeater loop exiting");
  }

  async fn accept_connection(server_addr: String, stream: TcpStream) {
    let client_addr = stream
      .peer_addr()
      .expect("connected streams should have a peer address");
    info!("Client address: {}", client_addr);

    let client_ws_stream = tokio_tungstenite::accept_async(stream)
      .await
      .expect("Error during the websocket handshake occurred");

    info!("New WebSocket connection: {}", client_addr);

    let server_url = url::Url::parse(&server_addr).unwrap();

    let ws_stream = match connect_async(&server_url).await {
      Ok((stream, _)) => stream,
      Err(e) => {
        error!("Cannot connect: {:?}", e);
        return;
      }
    };
    info!("WebSocket handshake has been successfully completed");

    let (server_write, server_read) = ws_stream.split();

    let (client_write, client_read) = client_ws_stream.split();

    let client_fut = client_read
      .try_filter(|msg| future::ready(msg.is_text() || msg.is_binary()))
      .forward(server_write);
    let server_fut = server_read
      .try_filter(|msg| future::ready(msg.is_text() || msg.is_binary()))
      .forward(client_write);
    future::select(client_fut, server_fut).await;
    info!("Closing repeater connection.");
  }
}