intiface_engine/
repeater.rs1use futures_util::{StreamExt, TryStreamExt, future};
13use log::info;
14use tokio::{
15 net::{TcpListener, TcpStream},
16 select,
17};
18use tokio_tungstenite::connect_async;
19use tokio_util::sync::CancellationToken;
20
21pub struct ButtplugRepeater {
22 local_port: u16,
23 remote_address: String,
24 stop_token: CancellationToken,
25}
26
27impl ButtplugRepeater {
28 pub fn new(local_port: u16, remote_address: &str, stop_token: CancellationToken) -> Self {
29 Self {
30 local_port,
31 remote_address: remote_address.to_owned(),
32 stop_token,
33 }
34 }
35
36 pub async fn listen(&self) {
37 info!("Repeater loop starting");
38 let addr = format!("127.0.0.1:{}", self.local_port);
39
40 let try_socket = TcpListener::bind(&addr).await;
41 let listener = try_socket.expect("Failed to bind");
42 info!("Listening on: {}", addr);
43
44 loop {
45 select! {
46 stream_result = listener.accept() => {
47 match stream_result {
48 Ok((stream, _)) => {
49 let mut remote_address = self.remote_address.clone();
50 if !remote_address.starts_with("ws://") {
51 remote_address.insert_str(0, "ws://");
52 }
53 tokio::spawn(ButtplugRepeater::accept_connection(remote_address, stream));
54 },
55 Err(e) => {
56 error!("Error accepting new websocket for repeater: {:?}", e);
57 break;
58 }
59 }
60 },
61 _ = self.stop_token.cancelled() => {
62 info!("Repeater loop requested to stop, breaking.");
63 break;
64 }
65 }
66 }
67 info!("Repeater loop exiting");
68 }
69
70 async fn accept_connection(server_addr: String, stream: TcpStream) {
71 let client_addr = stream
72 .peer_addr()
73 .expect("connected streams should have a peer address");
74 info!("Client address: {}", client_addr);
75
76 let client_ws_stream = tokio_tungstenite::accept_async(stream)
77 .await
78 .expect("Error during the websocket handshake occurred");
79
80 info!("New WebSocket connection: {}", client_addr);
81
82 info!("Connecting to server {}", server_addr);
83
84 let server_url = url::Url::parse(&server_addr).unwrap();
85
86 let ws_stream = match connect_async(&server_url).await {
87 Ok((stream, _)) => stream,
88 Err(e) => {
89 error!("Cannot connect: {:?}", e);
90 return;
91 }
92 };
93 info!("WebSocket handshake has been successfully completed");
94
95 let (server_write, server_read) = ws_stream.split();
96
97 let (client_write, client_read) = client_ws_stream.split();
98
99 let client_fut = client_read
100 .try_filter(|msg| future::ready(msg.is_text() || msg.is_binary()))
101 .forward(server_write);
102 let server_fut = server_read
103 .try_filter(|msg| future::ready(msg.is_text() || msg.is_binary()))
104 .forward(client_write);
105 future::select(client_fut, server_fut).await;
106 info!("Closing repeater connection.");
107 }
108}