intiface_engine/
repeater.rs

1// Is this just two examples from tokio_tungstenite glued together?
2//
3// It absolute is!
4
5use futures_util::{future, StreamExt, TryStreamExt};
6use log::info;
7use tokio::{
8  net::{TcpListener, TcpStream},
9  select,
10};
11use tokio_tungstenite::connect_async;
12use tokio_util::sync::CancellationToken;
13
14pub struct ButtplugRepeater {
15  local_port: u16,
16  remote_address: String,
17  stop_token: CancellationToken,
18}
19
20impl ButtplugRepeater {
21  pub fn new(local_port: u16, remote_address: &str, stop_token: CancellationToken) -> Self {
22    Self {
23      local_port,
24      remote_address: remote_address.to_owned(),
25      stop_token,
26    }
27  }
28
29  pub async fn listen(&self) {
30    info!("Repeater loop starting");
31    let addr = format!("127.0.0.1:{}", self.local_port);
32
33    let try_socket = TcpListener::bind(&addr).await;
34    let listener = try_socket.expect("Failed to bind");
35    info!("Listening on: {}", addr);
36
37    loop {
38      select! {
39        stream_result = listener.accept() => {
40          match stream_result {
41            Ok((stream, _)) => {
42              let mut remote_address = self.remote_address.clone();
43              if !remote_address.starts_with("ws://") {
44                remote_address.insert_str(0, "ws://");
45              }
46              tokio::spawn(ButtplugRepeater::accept_connection(remote_address, stream));
47            },
48            Err(e) => {
49              error!("Error accepting new websocket for repeater: {:?}", e);
50              break;
51            }
52          }
53        },
54        _ = self.stop_token.cancelled() => {
55          info!("Repeater loop requested to stop, breaking.");
56          break;
57        }
58      }
59    }
60    info!("Repeater loop exiting");
61  }
62
63  async fn accept_connection(server_addr: String, stream: TcpStream) {
64    let client_addr = stream
65      .peer_addr()
66      .expect("connected streams should have a peer address");
67    info!("Client address: {}", client_addr);
68
69    let client_ws_stream = tokio_tungstenite::accept_async(stream)
70      .await
71      .expect("Error during the websocket handshake occurred");
72
73    info!("New WebSocket connection: {}", client_addr);
74
75    info!("Connecting to server {}", server_addr);
76
77    let server_url = url::Url::parse(&server_addr).unwrap();
78
79    let ws_stream = match connect_async(&server_url).await {
80      Ok((stream, _)) => stream,
81      Err(e) => {
82        error!("Cannot connect: {:?}", e);
83        return;
84      }
85    };
86    info!("WebSocket handshake has been successfully completed");
87
88    let (server_write, server_read) = ws_stream.split();
89
90    let (client_write, client_read) = client_ws_stream.split();
91
92    let client_fut = client_read
93      .try_filter(|msg| future::ready(msg.is_text() || msg.is_binary()))
94      .forward(server_write);
95    let server_fut = server_read
96      .try_filter(|msg| future::ready(msg.is_text() || msg.is_binary()))
97      .forward(client_write);
98    future::select(client_fut, server_fut).await;
99    info!("Closing repeater connection.");
100  }
101}