Skip to main content

intiface_engine/
repeater.rs

1// Buttplug Rust Source Code File - See https://buttplug.io for more info.
2//
3// Copyright 2016-2026 Nonpolynomial Labs LLC. All rights reserved.
4//
5// Licensed under the BSD 3-Clause license. See LICENSE file in the project root
6// for full license information.
7
// Is this just two examples from tokio_tungstenite glued together?
//
// It absolutely is!
11
12use futures_util::{StreamExt, TryStreamExt, future};
13use log::info;
14use tokio::{
15  net::{TcpListener, TcpStream},
16  select,
17};
18use tokio_tungstenite::connect_async;
19use tokio_util::sync::CancellationToken;
20
21pub struct ButtplugRepeater {
22  local_port: u16,
23  remote_address: String,
24  stop_token: CancellationToken,
25}
26
27impl ButtplugRepeater {
28  pub fn new(local_port: u16, remote_address: &str, stop_token: CancellationToken) -> Self {
29    Self {
30      local_port,
31      remote_address: remote_address.to_owned(),
32      stop_token,
33    }
34  }
35
36  pub async fn listen(&self) {
37    info!("Repeater loop starting");
38    let addr = format!("127.0.0.1:{}", self.local_port);
39
40    let try_socket = TcpListener::bind(&addr).await;
41    let listener = try_socket.expect("Failed to bind");
42    info!("Listening on: {}", addr);
43
44    loop {
45      select! {
46        stream_result = listener.accept() => {
47          match stream_result {
48            Ok((stream, _)) => {
49              let mut remote_address = self.remote_address.clone();
50              if !remote_address.starts_with("ws://") {
51                remote_address.insert_str(0, "ws://");
52              }
53              tokio::spawn(ButtplugRepeater::accept_connection(remote_address, stream));
54            },
55            Err(e) => {
56              error!("Error accepting new websocket for repeater: {:?}", e);
57              break;
58            }
59          }
60        },
61        _ = self.stop_token.cancelled() => {
62          info!("Repeater loop requested to stop, breaking.");
63          break;
64        }
65      }
66    }
67    info!("Repeater loop exiting");
68  }
69
70  async fn accept_connection(server_addr: String, stream: TcpStream) {
71    let client_addr = stream
72      .peer_addr()
73      .expect("connected streams should have a peer address");
74    info!("Client address: {}", client_addr);
75
76    let client_ws_stream = tokio_tungstenite::accept_async(stream)
77      .await
78      .expect("Error during the websocket handshake occurred");
79
80    info!("New WebSocket connection: {}", client_addr);
81
82    info!("Connecting to server {}", server_addr);
83
84    let server_url = url::Url::parse(&server_addr).unwrap();
85
86    let ws_stream = match connect_async(&server_url).await {
87      Ok((stream, _)) => stream,
88      Err(e) => {
89        error!("Cannot connect: {:?}", e);
90        return;
91      }
92    };
93    info!("WebSocket handshake has been successfully completed");
94
95    let (server_write, server_read) = ws_stream.split();
96
97    let (client_write, client_read) = client_ws_stream.split();
98
99    let client_fut = client_read
100      .try_filter(|msg| future::ready(msg.is_text() || msg.is_binary()))
101      .forward(server_write);
102    let server_fut = server_read
103      .try_filter(|msg| future::ready(msg.is_text() || msg.is_binary()))
104      .forward(client_write);
105    future::select(client_fut, server_fut).await;
106    info!("Closing repeater connection.");
107  }
108}