intiface_engine/
repeater.rs1use futures_util::{future, StreamExt, TryStreamExt};
6use log::info;
7use tokio::{
8 net::{TcpListener, TcpStream},
9 select,
10};
11use tokio_tungstenite::connect_async;
12use tokio_util::sync::CancellationToken;
13
14pub struct ButtplugRepeater {
15 local_port: u16,
16 remote_address: String,
17 stop_token: CancellationToken,
18}
19
20impl ButtplugRepeater {
21 pub fn new(local_port: u16, remote_address: &str, stop_token: CancellationToken) -> Self {
22 Self {
23 local_port,
24 remote_address: remote_address.to_owned(),
25 stop_token,
26 }
27 }
28
29 pub async fn listen(&self) {
30 info!("Repeater loop starting");
31 let addr = format!("127.0.0.1:{}", self.local_port);
32
33 let try_socket = TcpListener::bind(&addr).await;
34 let listener = try_socket.expect("Failed to bind");
35 info!("Listening on: {}", addr);
36
37 loop {
38 select! {
39 stream_result = listener.accept() => {
40 match stream_result {
41 Ok((stream, _)) => {
42 let mut remote_address = self.remote_address.clone();
43 if !remote_address.starts_with("ws://") {
44 remote_address.insert_str(0, "ws://");
45 }
46 tokio::spawn(ButtplugRepeater::accept_connection(remote_address, stream));
47 },
48 Err(e) => {
49 error!("Error accepting new websocket for repeater: {:?}", e);
50 break;
51 }
52 }
53 },
54 _ = self.stop_token.cancelled() => {
55 info!("Repeater loop requested to stop, breaking.");
56 break;
57 }
58 }
59 }
60 info!("Repeater loop exiting");
61 }
62
63 async fn accept_connection(server_addr: String, stream: TcpStream) {
64 let client_addr = stream
65 .peer_addr()
66 .expect("connected streams should have a peer address");
67 info!("Client address: {}", client_addr);
68
69 let client_ws_stream = tokio_tungstenite::accept_async(stream)
70 .await
71 .expect("Error during the websocket handshake occurred");
72
73 info!("New WebSocket connection: {}", client_addr);
74
75 info!("Connecting to server {}", server_addr);
76
77 let server_url = url::Url::parse(&server_addr).unwrap();
78
79 let ws_stream = match connect_async(&server_url).await {
80 Ok((stream, _)) => stream,
81 Err(e) => {
82 error!("Cannot connect: {:?}", e);
83 return;
84 }
85 };
86 info!("WebSocket handshake has been successfully completed");
87
88 let (server_write, server_read) = ws_stream.split();
89
90 let (client_write, client_read) = client_ws_stream.split();
91
92 let client_fut = client_read
93 .try_filter(|msg| future::ready(msg.is_text() || msg.is_binary()))
94 .forward(server_write);
95 let server_fut = server_read
96 .try_filter(|msg| future::ready(msg.is_text() || msg.is_binary()))
97 .forward(client_write);
98 future::select(client_fut, server_fut).await;
99 info!("Closing repeater connection.");
100 }
101}