1use std::collections::HashMap;
2use std::net::{Ipv4Addr, SocketAddr};
3use std::path::PathBuf;
4use std::str::FromStr;
5use std::sync::{Arc, Weak};
6use std::time::Duration;
7
8use futures_util::stream::{SplitSink, SplitStream};
9use futures_util::{SinkExt, StreamExt};
10use ipnetwork::Ipv4Network;
11use log::{debug, error, info};
12use mdns_sd::{IfKind, ServiceDaemon, ServiceInfo};
13use notify::event::AccessKind;
14use notify::{self, EventKind, Watcher};
15use packet::{Builder as _, Packet, ip, udp};
16use tokio::net::{TcpListener, TcpStream, UdpSocket};
17use tokio::select;
18use tokio::sync::Mutex;
19use tokio::sync::mpsc::{Receiver, Sender, channel};
20use tokio::task::JoinHandle;
21use tokio_tungstenite::WebSocketStream;
22use tokio_tungstenite::tungstenite::Message;
23use tokio_util::bytes::Bytes;
24use tokio_util::codec::Framed;
25use tun::{self, AsyncDevice, TunPacketCodec};
26use uuid::Uuid;
27
28use crate::protocol::{
29 API_VERSION, Authentication, Hello, Identified, Identify, MessageRequest, MessageRequestData,
30 MessageResponse, MessageToRelay, MessageToStreamer, MoblinkResult, Present, ResponseData,
31 StartTunnelRequest, calculate_authentication,
32};
33use crate::utils::{AnyError, execute_command, random_string, resolve_host};
34use crate::{MDNS_SERVICE_TYPE, belaui};
35
/// Sink half of a split relay WebSocket stream; used to send `Message`s.
type WebSocketWriter = SplitSink<WebSocketStream<TcpStream>, Message>;
/// Stream half of a split relay WebSocket stream; yields received messages.
type WebSocketReader = SplitStream<WebSocketStream<TcpStream>>;

/// Sink half of a framed TUN device; accepts packets as `Vec<u8>`.
type TunWriter = SplitSink<Framed<AsyncDevice, TunPacketCodec>, Vec<u8>>;
/// Stream half of a framed TUN device; yields packets read from the device.
type TunReader = SplitStream<Framed<AsyncDevice, TunPacketCodec>>;
41
42#[derive(Debug)]
43struct PacketBuilder {
44 source_address: Ipv4Addr,
45 source_port: u16,
46 destination_address: Ipv4Addr,
47 destination_port: u16,
48}
49
50impl PacketBuilder {
51 fn new(
52 source_address: Ipv4Addr,
53 source_port: u16,
54 destination_address: Ipv4Addr,
55 destination_port: u16,
56 ) -> Self {
57 Self {
58 source_address,
59 source_port,
60 destination_address,
61 destination_port,
62 }
63 }
64
65 fn pack(&self, payload: &[u8]) -> Result<Vec<u8>, AnyError> {
66 Ok(ip::v4::Builder::default()
67 .source(self.source_address)?
68 .destination(self.destination_address)?
69 .udp()?
70 .source(self.source_port)?
71 .destination(self.destination_port)?
72 .payload(payload)?
73 .build()?)
74 }
75}
76
/// State for one connected relay.
struct Relay {
    /// Weak reference to the `Arc<Mutex<_>>` wrapping this relay; cloned into spawned tasks.
    me: Weak<Mutex<Self>>,
    /// Weak reference to the owning streamer.
    streamer: Weak<Mutex<StreamerInner>>,
    /// Remote address of the relay's WebSocket connection.
    relay_address: SocketAddr,
    /// WebSocket sink; set to `None` by the pinger when a pong is missed.
    writer: Option<WebSocketWriter>,
    /// Challenge sent in the hello message and used to verify authentication.
    challenge: String,
    /// Salt sent in the hello message and used to verify authentication.
    salt: String,
    /// True once the relay has authenticated successfully.
    identified: bool,
    /// Relay id; a random UUID until replaced by the id in the identify message.
    relay_id: Uuid,
    /// Relay name taken from the identify message; empty until then.
    relay_name: String,
    /// UDP port reported by the relay in its start-tunnel response.
    relay_tunnel_port: Option<u16>,
    /// IP address assigned to this relay's TUN device.
    tun_ip_address: String,
    /// Task forwarding datagrams from the relay socket to the TUN device.
    relay_receiver: Option<JoinHandle<()>>,
    /// Task forwarding packets from the TUN device to the relay socket.
    tun_receiver: Option<JoinHandle<()>>,
    /// Index used to derive the TUN device name and the routing table number.
    unique_index: u32,
    /// Cleared when a ping is sent, set again when a pong arrives.
    pong_received: bool,
}
94
95impl Relay {
96 pub fn new(
97 streamer: Weak<Mutex<StreamerInner>>,
98 relay_address: SocketAddr,
99 writer: WebSocketWriter,
100 tun_ip_address: String,
101 unique_index: u32,
102 ) -> Arc<Mutex<Self>> {
103 Arc::new_cyclic(|me| {
104 Mutex::new(Self {
105 me: me.clone(),
106 streamer,
107 relay_address,
108 writer: Some(writer),
109 challenge: String::new(),
110 salt: String::new(),
111 identified: false,
112 relay_id: Uuid::new_v4(),
113 relay_name: "".into(),
114 relay_tunnel_port: None,
115 tun_ip_address,
116 relay_receiver: None,
117 tun_receiver: None,
118 unique_index,
119 pong_received: true,
120 })
121 })
122 }
123
    /// Starts the background tasks for this relay: first the WebSocket
    /// receiver (which consumes `reader`), then the pinger.
    fn start(&mut self, reader: WebSocketReader) {
        self.start_websocket_receiver(reader);
        self.start_pinger();
    }
128
    /// Spawns the task that drives this relay's WebSocket connection.
    ///
    /// The task starts the handshake and then handles incoming messages until
    /// a message handler fails, the WebSocket reports an error, the stream
    /// ends, or a read times out after the writer has been cleared. It then
    /// destroys the tunnel and removes the relay from the streamer.
    fn start_websocket_receiver(&mut self, mut reader: WebSocketReader) {
        let relay = self.me.clone();

        tokio::spawn(async move {
            // Holding the strong reference keeps the relay alive for the whole task.
            let Some(relay) = relay.upgrade() else {
                return;
            };

            relay.lock().await.start_handshake().await;

            loop {
                // The relay lock is only taken per message, not while waiting for one.
                match tokio::time::timeout(Duration::from_secs(20), reader.next()).await {
                    Ok(Some(Ok(message))) => {
                        if let Err(error) =
                            relay.lock().await.handle_websocket_message(message).await
                        {
                            error!("Relay error: {}", error);
                            break;
                        }
                    }
                    Ok(Some(Err(error))) => {
                        info!("Websocket error {}", error);
                        break;
                    }
                    Ok(None) => {
                        info!("No more websocket messages to receive");
                        break;
                    }
                    Err(_) => {
                        info!("Websocket read timeout");
                        // The pinger clears the writer when a pong is missed;
                        // only then is a timeout treated as a disconnect.
                        if relay.lock().await.writer.is_none() {
                            break;
                        }
                    }
                }
            }
            // Release the relay lock before locking the streamer below.
            let streamer = {
                let mut relay = relay.lock().await;
                info!("Relay disconnected: {}", relay.relay_address);
                relay.tunnel_destroyed().await;
                relay.streamer.upgrade()
            };
            if let Some(streamer) = streamer {
                streamer.lock().await.remove_relay(&relay).await;
            }
        });
    }
176
177 fn start_pinger(&mut self) {
178 let relay = self.me.clone();
179
180 tokio::spawn(async move {
181 loop {
182 {
183 let Some(relay) = relay.upgrade() else {
184 break;
185 };
186 let mut relay = relay.lock().await;
187 if !relay.pong_received {
188 info!("Pong not received.");
189 relay.writer = None;
190 break;
191 } else {
192 relay.pong_received = false;
193 relay.send_websocket(Message::Ping(Bytes::new())).await.ok();
194 }
195 }
196 tokio::time::sleep(Duration::from_secs(10)).await;
197 }
198 });
199 }
200
201 async fn handle_websocket_message(&mut self, message: Message) -> Result<(), AnyError> {
202 debug!("Websocket got: {:?}", message);
203 match message {
204 Message::Text(text) => match serde_json::from_str(&text) {
205 Ok(message) => self.handle_message(message).await,
206 Err(error) => {
207 Err(format!("Failed to deserialize message with error: {}", error).into())
208 }
209 },
210 Message::Ping(data) => Ok(self.send_websocket(Message::Pong(data)).await?),
211 Message::Pong(_) => {
212 self.pong_received = true;
213 Ok(())
214 }
215 _ => Err(format!("Unsupported websocket message: {:?}", message).into()),
216 }
217 }
218
    /// Dispatches a decoded protocol message to the matching handler.
    async fn handle_message(&mut self, message: MessageToStreamer) -> Result<(), AnyError> {
        match message {
            MessageToStreamer::Identify(identify) => self.handle_message_identify(identify).await,
            MessageToStreamer::Response(response) => self.handle_message_response(response).await,
        }
    }
225
226 async fn handle_message_identify(&mut self, identify: Identify) -> Result<(), AnyError> {
227 let Some(streamer) = self.streamer.upgrade() else {
228 return Err("No streamer".into());
229 };
230 let streamer = streamer.lock().await;
231 if identify.authentication
232 == calculate_authentication(&streamer.password, &self.salt, &self.challenge)
233 {
234 self.identified = true;
235 self.relay_id = identify.id;
236 self.relay_name = identify.name;
237 let identified = Identified {
238 result: MoblinkResult::Ok(Present {}),
239 };
240 self.send(MessageToRelay::Identified(identified)).await?;
241 self.start_tunnel(&streamer.destination_address, streamer.destination_port)
242 .await
243 } else {
244 let identified = Identified {
245 result: MoblinkResult::WrongPassword(Present {}),
246 };
247 self.send(MessageToRelay::Identified(identified)).await?;
248 Err("Relay sent wrong password".into())
249 }
250 }
251
252 async fn handle_message_response(&mut self, response: MessageResponse) -> Result<(), AnyError> {
253 match response.data {
254 ResponseData::StartTunnel(data) => {
255 self.relay_tunnel_port = Some(data.port);
256 self.tunnel_created().await?;
257 }
258 message => {
259 info!("Ignoring message {:?}", message);
260 }
261 }
262 Ok(())
263 }
264
265 async fn tunnel_created(&mut self) -> Result<(), AnyError> {
266 let Some(relay_tunnel_port) = self.relay_tunnel_port else {
267 return Ok(());
268 };
269 info!(
270 "Tunnel created: {}:{} ({}, {})",
271 self.relay_address.ip(),
272 relay_tunnel_port,
273 self.relay_name,
274 self.relay_id
275 );
276 self.start_udp_networking(relay_tunnel_port).await?;
277 Ok(())
278 }
279
280 async fn tunnel_destroyed(&mut self) {
281 let Some(relay_tunnel_port) = self.relay_tunnel_port.take() else {
282 return;
283 };
284 info!(
285 "Tunnel destroyed: {}:{} ({}, {})",
286 self.relay_address.ip(),
287 relay_tunnel_port,
288 self.relay_name,
289 self.relay_id
290 );
291 self.stop_udp_networking().await;
292 }
293
294 async fn start_udp_networking(&mut self, relay_tunnel_port: u16) -> Result<(), AnyError> {
295 let (tun_writer, tun_reader) = self.create_tun_device()?;
296 let relay_socket = self.create_relay_socket(relay_tunnel_port).await?;
297 self.setup_os_networking().await;
298 let (tun_port_writer, tun_port_reader) = channel(1);
299 self.start_relay_receiver(relay_socket.clone(), tun_writer, tun_port_reader)
300 .await?;
301 self.start_tun_receiver(tun_reader, relay_socket, tun_port_writer)
302 .await;
303
304 Ok(())
305 }
306
307 async fn stop_udp_networking(&mut self) {
308 if let Some(relay_receiver) = self.relay_receiver.take() {
309 relay_receiver.abort();
310 relay_receiver.await.ok();
311 }
312 if let Some(tun_receiver) = self.tun_receiver.take() {
313 tun_receiver.abort();
314 tun_receiver.await.ok();
315 }
316 self.teardown_os_networking().await;
317 }
318
319 async fn create_relay_socket(
320 &self,
321 relay_tunnel_port: u16,
322 ) -> Result<Arc<UdpSocket>, AnyError> {
323 let socket = UdpSocket::bind("0.0.0.0:0").await?;
324 let tunnel_address = format!("{}:{}", self.relay_address.ip(), relay_tunnel_port);
325 socket.connect(tunnel_address).await?;
326 Ok(Arc::new(socket))
327 }
328
329 fn create_tun_device(&self) -> Result<(TunWriter, TunReader), AnyError> {
330 let mut config = tun::Configuration::default();
331 config
332 .address(&self.tun_ip_address)
333 .tun_name(self.tun_device_name())
334 .up();
335 let device = tun::create_as_async(&config)?;
336 Ok(device.into_framed().split())
337 }
338
339 #[cfg(not(target_os = "macos"))]
340 fn tun_device_name(&self) -> String {
341 use libc::IF_NAMESIZE;
342 let name = self
343 .relay_name
344 .replace(|c: char| !c.is_ascii() || c.is_whitespace(), "-");
345 let name = format!("mob{}-{}", self.unique_index, name);
346 name[..name.len().min(IF_NAMESIZE - 1)].to_string()
347 }
348
349 #[cfg(target_os = "macos")]
350 fn tun_device_name(&self) -> String {
351 format!("utun{}", 99 + self.unique_index)
352 }
353
    /// Applies the OS-level routing configuration for this relay's TUN device.
    ///
    /// Only Linux has an implementation; on other targets this does nothing.
    async fn setup_os_networking(&self) {
        #[cfg(target_os = "linux")]
        self.setup_linux_networking().await;
    }
358
359 #[allow(dead_code)]
360 async fn setup_linux_networking(&self) {
361 let Some(streamer) = self.streamer.upgrade() else {
362 return;
363 };
364 let destination_address = &streamer.lock().await.destination_address;
365 let table = self.get_linux_networking_table();
366 self.teardown_linux_networking().await;
367 execute_command(
368 "ip",
369 &[
370 "route",
371 "add",
372 destination_address,
373 "dev",
374 &self.tun_device_name(),
375 "proto",
376 "kernel",
377 "scope",
378 "link",
379 "src",
380 &self.tun_ip_address,
381 "table",
382 &table,
383 ],
384 )
385 .await;
386 execute_command(
387 "ip",
388 &[
389 "route",
390 "add",
391 "default",
392 "via",
393 &self.tun_ip_address,
394 "dev",
395 &self.tun_device_name(),
396 "table",
397 &table,
398 ],
399 )
400 .await;
401 execute_command(
402 "ip",
403 &[
404 "rule",
405 "add",
406 "from",
407 &self.tun_ip_address,
408 "lookup",
409 &table,
410 ],
411 )
412 .await;
413 }
414
    /// Removes the OS-level routing configuration for this relay.
    ///
    /// Only Linux has an implementation; on other targets this does nothing.
    async fn teardown_os_networking(&self) {
        #[cfg(target_os = "linux")]
        self.teardown_linux_networking().await;
    }
419
420 #[allow(dead_code)]
421 async fn teardown_linux_networking(&self) {
422 let table = self.get_linux_networking_table();
423 execute_command("ip", &["rule", "del", "lookup", &table]).await;
424 execute_command("ip", &["route", "flush", "table", &table]).await;
425 }
426
427 fn get_linux_networking_table(&self) -> String {
428 format!("{}", 300 + self.unique_index)
429 }
430
431 async fn start_tun_receiver(
432 &mut self,
433 mut tun_reader: TunReader,
434 relay_socket: Arc<UdpSocket>,
435 tun_port_writer: Sender<u16>,
436 ) {
437 let Some(streamer) = self.streamer.upgrade() else {
438 return;
439 };
440 let streamer = streamer.lock().await;
441 let Ok(destination_address) = Ipv4Addr::from_str(&streamer.destination_address) else {
442 return;
443 };
444 self.tun_receiver = Some(tokio::spawn(async move {
445 let mut tun_port = 0u16;
446 while let Some(packet) = tun_reader.next().await {
447 if let Err(error) = Self::handle_tun_packet(
448 packet,
449 &relay_socket,
450 destination_address,
451 &tun_port_writer,
452 &mut tun_port,
453 )
454 .await
455 {
456 error!("TUN receiver: {}", error);
457 break;
458 }
459 }
460 }));
461 }
462
463 async fn handle_tun_packet(
464 packet: Result<Vec<u8>, std::io::Error>,
465 relay_socket: &Arc<UdpSocket>,
466 destination_address: Ipv4Addr,
467 tun_port_writer: &Sender<u16>,
468 tun_port: &mut u16,
469 ) -> Result<(), AnyError> {
470 match packet {
471 Ok(packet) => match ip::Packet::new(packet) {
472 Ok(ip::Packet::V4(packet)) => {
473 if packet.protocol() == ip::Protocol::Udp
474 && packet.destination() == destination_address
475 {
476 Self::handle_tun_udp_packet(
477 packet.payload(),
478 relay_socket,
479 tun_port_writer,
480 tun_port,
481 )
482 .await?;
483 }
484 }
485 Ok(ip::Packet::V6(_)) => {
486 debug!("TUN receiver: Discarding IPv6 packet");
487 }
488 Err(error) => {
489 return Err(format!("Invalid IP packet: {}", error).into());
490 }
491 },
492 Err(error) => {
493 return Err(format!("TUN receiver: Read failed with: {}", error).into());
494 }
495 }
496 Ok(())
497 }
498
499 async fn handle_tun_udp_packet(
500 packet: &[u8],
501 relay_socket: &Arc<UdpSocket>,
502 tun_port_writer: &Sender<u16>,
503 tun_port: &mut u16,
504 ) -> Result<(), AnyError> {
505 match udp::Packet::new(packet) {
506 Ok(packet) => {
507 let new_tun_port = packet.source();
508 if new_tun_port != *tun_port {
509 tun_port_writer.send(new_tun_port).await.ok();
510 *tun_port = new_tun_port;
511 }
512 if let Err(error) = relay_socket.send(packet.payload()).await {
513 return Err(format!("Send error {}", error).into());
514 }
515 }
516 Err(error) => {
517 return Err(format!("Invalid UDP packet: {}", error).into());
518 }
519 }
520 Ok(())
521 }
522
    /// Spawns the task that forwards datagrams received from the relay to the
    /// TUN device, and stores its handle in `relay_receiver`.
    ///
    /// Each datagram is wrapped in an IPv4/UDP packet going from the
    /// streamer's destination endpoint to the TUN address. The packet's
    /// destination port starts at 10000 and is replaced by every port
    /// received on `tun_port_reader`.
    ///
    /// # Errors
    ///
    /// Fails if the streamer is gone.
    async fn start_relay_receiver(
        &mut self,
        relay_socket: Arc<UdpSocket>,
        mut tun_writer: TunWriter,
        mut tun_port_reader: Receiver<u16>,
    ) -> Result<(), AnyError> {
        let Some(streamer) = self.streamer.upgrade() else {
            return Err("No streamer".into());
        };
        let streamer = streamer.lock().await;
        let destination_address = streamer.destination_address.clone();
        let destination_port = streamer.destination_port;
        let tun_ip_address = self.tun_ip_address.clone();

        self.relay_receiver = Some(tokio::spawn(async move {
            // If either address does not parse as IPv4 the task exits at once, without logging.
            let Ok(destination_address) = Ipv4Addr::from_str(&destination_address) else {
                return;
            };
            let Ok(tun_ip_address) = Ipv4Addr::from_str(&tun_ip_address) else {
                return;
            };
            let mut buffer = vec![0; 2048];
            let mut packet_builder =
                PacketBuilder::new(destination_address, destination_port, tun_ip_address, 10000);
            loop {
                // Handle whichever comes first: a datagram from the relay or a
                // new port from the TUN receiver. Any error ends the task.
                if let Err(error) = select! {
                    result = relay_socket.recv(&mut buffer) => {
                        Self::handle_relay_packet(&mut tun_writer, &packet_builder, result, &buffer).await
                    }
                    tun_port = tun_port_reader.recv() => {
                        Self::handle_tun_port(&mut packet_builder, tun_port)
                    }
                } {
                    error!("Relay receiver: Error {}", error);
                    break;
                }
            }
        }));
        Ok(())
    }
563
564 async fn handle_relay_packet(
565 tun_writer: &mut TunWriter,
566 packet_builder: &PacketBuilder,
567 result: Result<usize, std::io::Error>,
568 buffer: &[u8],
569 ) -> Result<(), AnyError> {
570 match result {
571 Ok(length) => {
572 debug!("Relay receiver: Got {:?}", &buffer[..length]);
573 let Ok(packet) = packet_builder.pack(&buffer[..length]) else {
574 return Err("Relay receiver: IP create error".into());
575 };
576 if let Err(error) = tun_writer.send(packet).await {
577 Err(format!("Relay receiver: Send error {}", error).into())
578 } else {
579 Ok(())
580 }
581 }
582 Err(error) => Err(format!("Relay receiver: Error {}", error).into()),
583 }
584 }
585
586 fn handle_tun_port(
587 packet_builder: &mut PacketBuilder,
588 tun_port: Option<u16>,
589 ) -> Result<(), AnyError> {
590 let Some(tun_port) = tun_port else {
591 return Err("TUN port missing".into());
592 };
593 packet_builder.destination_port = tun_port;
594 info!("Relay receiver: Ready with {:?}", packet_builder);
595 Ok(())
596 }
597
598 async fn start_handshake(&mut self) {
599 self.challenge = random_string();
600 self.salt = random_string();
601 self.send_hello().await;
602 self.identified = false;
603 }
604
605 async fn start_tunnel(
606 &mut self,
607 destination_address: &str,
608 destination_port: u16,
609 ) -> Result<(), AnyError> {
610 if !self.identified {
611 return Ok(());
612 }
613 if destination_address.is_empty() {
614 return Err("Destination address not available".into());
615 }
616 let start_tunnel = StartTunnelRequest {
617 address: destination_address.to_string(),
618 port: destination_port,
619 };
620 let request = MessageRequest {
621 id: 1,
622 data: MessageRequestData::StartTunnel(start_tunnel),
623 };
624 self.send(MessageToRelay::Request(request)).await
625 }
626
627 async fn send_hello(&mut self) {
628 let hello = MessageToRelay::Hello(Hello {
629 api_version: API_VERSION.into(),
630 authentication: Authentication {
631 challenge: self.challenge.clone(),
632 salt: self.salt.clone(),
633 },
634 });
635 self.send(hello).await.ok();
636 }
637
638 async fn send(&mut self, message: MessageToRelay) -> Result<(), AnyError> {
639 let text = serde_json::to_string(&message)?;
640 self.send_websocket(Message::Text(text.into())).await
641 }
642
643 async fn send_websocket(&mut self, message: Message) -> Result<(), AnyError> {
644 match self.writer.as_mut() {
645 Some(writer) => {
646 debug!("Websocket sending: {:?}", message);
647 writer.send(message).await?;
648 }
649 _ => {
650 return Err("No websocket writer".into());
651 }
652 }
653 Ok(())
654 }
655}
656
/// Shared streamer state: configuration, connected relays and mDNS daemon.
struct StreamerInner {
    /// Weak reference to the `Arc<Mutex<_>>` wrapping this value; cloned into spawned tasks.
    me: Weak<Mutex<Self>>,
    /// Identifier used as mDNS instance name and host name prefix.
    id: String,
    /// Name published in the mDNS `name` property.
    name: String,
    /// Address the WebSocket listener binds to.
    address: String,
    /// Port the WebSocket listener binds to; also advertised via mDNS.
    port: u16,
    /// Password relays must prove knowledge of when identifying.
    password: String,
    /// Destination address relays are asked to tunnel to.
    destination_address: String,
    /// Destination port relays are asked to tunnel to.
    destination_port: u16,
    /// When true, the destination is read from the BELABOX config file.
    belabox: bool,
    /// Path of the BELABOX config file that is read and watched.
    belabox_config: PathBuf,
    /// Currently connected relays.
    relays: Vec<Arc<Mutex<Relay>>>,
    /// Pool of free relay indexes; taken from the end, returned at the front.
    unique_indexes: Vec<u32>,
    /// Network from which each relay's TUN IP address is taken by index.
    tun_ip_network: Ipv4Network,
    /// mDNS daemon used to register the service.
    service_daemon: ServiceDaemon,
}
673
674impl StreamerInner {
675 pub fn new(
676 id: String,
677 name: String,
678 address: String,
679 port: u16,
680 tun_ip_network: String,
681 password: String,
682 destination_address: String,
683 destination_port: u16,
684 belabox: bool,
685 belabox_config: PathBuf,
686 ) -> Result<Arc<Mutex<Self>>, Box<dyn std::error::Error + Send + Sync>> {
687 let tun_ip_network = parse_tun_ip_network(&tun_ip_network)?;
688 Ok(Arc::new_cyclic(|me| {
689 Mutex::new(Self {
690 me: me.clone(),
691 id,
692 name,
693 address,
694 port,
695 password,
696 destination_address,
697 destination_port,
698 belabox,
699 belabox_config,
700 relays: Vec::new(),
701 unique_indexes: (1..tun_ip_network.size() - 1).rev().collect(),
702 tun_ip_network,
703 service_daemon: Self::create_service_daemon(),
704 })
705 }))
706 }
707
708 pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
709 if self.belabox {
710 if let Err(error) = self.read_belaui_config_file().await {
711 error!("Read BELABOX config error: {}", error);
712 }
713 self.start_belaui_config_watcher();
714 } else {
715 self.destination_address = resolve_host(&self.destination_address).await?;
716 }
717 self.start_relay_listener().await?;
718 self.start_mdns_daemon();
719 Ok(())
720 }
721
722 fn create_service_daemon() -> ServiceDaemon {
723 let service_daemon = ServiceDaemon::new().unwrap();
724 service_daemon
725 .disable_interface(Vec::from([IfKind::IPv6]))
726 .ok();
727 service_daemon
728 }
729
730 async fn start_relay_listener(
731 &mut self,
732 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
733 let listener_address = format!("{}:{}", self.address, self.port);
734 let listener = TcpListener::bind(&listener_address).await?;
735 info!("WebSocket server listening on '{}'", listener_address);
736 let streamer = self.me.clone();
737
738 tokio::spawn(async move {
739 while let Ok((tcp_stream, relay_address)) = listener.accept().await {
740 match streamer.upgrade() {
741 Some(streamer) => {
742 streamer
743 .lock()
744 .await
745 .handle_relay_connection(tcp_stream, relay_address)
746 .await;
747 }
748 _ => {
749 break;
750 }
751 }
752 }
753 });
754
755 Ok(())
756 }
757
758 fn start_belaui_config_watcher(&mut self) {
759 let (async_events_writer, mut async_events_reader) = tokio::sync::mpsc::channel(1);
760 let belabox_config = self.belabox_config.clone();
761 std::thread::spawn(move || {
762 let (events_writer, events_reader) =
763 std::sync::mpsc::channel::<notify::Result<notify::Event>>();
764 let Ok(mut watcher) = notify::recommended_watcher(events_writer) else {
765 error!("Failed to create watcher");
766 return;
767 };
768 if let Err(error) = watcher.watch(&belabox_config, notify::RecursiveMode::NonRecursive)
769 {
770 error!("Watch failed with error: {}", error);
771 return;
772 }
773 for result in events_reader {
774 if async_events_writer.blocking_send(result).is_err() {
775 break;
776 }
777 }
778 });
779
780 let streamer = self.me.clone();
781 tokio::spawn(async move {
782 while let Some(result) = async_events_reader.recv().await {
783 match result {
784 Ok(event) => {
785 let EventKind::Access(AccessKind::Close(_)) = event.kind else {
786 continue;
787 };
788 let Some(streamer) = streamer.upgrade() else {
789 continue;
790 };
791 let mut streamer = streamer.lock().await;
792 match streamer.read_belaui_config_file().await {
793 Ok(true) => {
794 for relay in &streamer.relays {
795 let mut relay = relay.lock().await;
796 relay.tunnel_destroyed().await;
797 relay
798 .start_tunnel(
799 &streamer.destination_address,
800 streamer.destination_port,
801 )
802 .await
803 .ok();
804 }
805 }
806 Ok(false) => {}
807 Err(error) => {
808 error!("Read BELABOX config error: {}", error)
809 }
810 }
811 }
812 Err(error) => error!("Config error: {:?}", error),
813 }
814 }
815 });
816 }
817
818 fn start_mdns_daemon(&mut self) {
819 match self.create_mdns_service_info() {
820 Ok(service_info) => {
821 if let Err(error) = self.service_daemon.register(service_info) {
822 error!("Failed to register mDNS service with error: {}", error);
823 }
824 }
825 Err(error) => {
826 error!("Failed to create mDNS service info with error: {}", error);
827 }
828 }
829 }
830
831 async fn read_belaui_config_file(&mut self) -> Result<bool, AnyError> {
832 let config = belaui::Config::new_from_file(&self.belabox_config).await?;
833 let mut destination_changed = false;
834 let address = resolve_host(&config.get_address()).await?;
835 if self.destination_address != address {
836 self.destination_address = address;
837 info!("New destination address {}", self.destination_address);
838 destination_changed = true;
839 }
840 if self.destination_port != config.get_port() {
841 self.destination_port = config.get_port();
842 info!("New destination port {}", self.destination_port);
843 destination_changed = true;
844 }
845 Ok(destination_changed)
846 }
847
848 fn create_mdns_service_info(&self) -> Result<ServiceInfo, AnyError> {
849 let properties = HashMap::from([("name".to_string(), self.name.clone())]);
850 let service_info = ServiceInfo::new(
851 MDNS_SERVICE_TYPE,
852 &self.id,
853 &format!("{}.local.", self.id),
854 "",
855 self.port,
856 properties,
857 )?
858 .enable_addr_auto();
859 Ok(service_info)
860 }
861
862 async fn handle_relay_connection(&mut self, tcp_stream: TcpStream, relay_address: SocketAddr) {
863 match tokio_tungstenite::accept_async(tcp_stream).await {
864 Ok(websocket_stream) => {
865 info!("Relay connected: {}", relay_address);
866 let (writer, reader) = websocket_stream.split();
867 let Some(unique_index) = self.unique_indexes.pop() else {
868 return;
869 };
870 let Some(tun_ip_address) = self.tun_ip_network.nth(unique_index) else {
871 self.unique_indexes.insert(0, unique_index);
872 return;
873 };
874 let relay = Relay::new(
875 self.me.clone(),
876 relay_address,
877 writer,
878 tun_ip_address.to_string(),
879 unique_index,
880 );
881 relay.lock().await.start(reader);
882 self.add_relay(relay);
883 }
884 Err(error) => {
885 error!("Relay websocket handshake failed with: {}", error);
886 }
887 }
888 }
889
    /// Adds `relay` to the list of connected relays and logs the new count.
    fn add_relay(&mut self, relay: Arc<Mutex<Relay>>) {
        self.relays.push(relay);
        self.log_number_of_relays();
    }
894
895 async fn remove_relay(&mut self, relay: &Arc<Mutex<Relay>>) {
896 let unique_index = relay.lock().await.unique_index;
897 self.unique_indexes.insert(0, unique_index);
898 self.relays.retain(|r| !Arc::ptr_eq(r, relay));
899 self.log_number_of_relays();
900 }
901
902 fn log_number_of_relays(&self) {
903 info!("Number of relays: {}", self.relays.len())
904 }
905}
906
907pub struct Streamer {
908 inner: Arc<Mutex<StreamerInner>>,
909}
910
911impl Streamer {
912 pub fn new(
913 id: String,
914 name: String,
915 address: String,
916 port: u16,
917 tun_ip_network: String,
918 password: String,
919 destination_address: String,
920 destination_port: u16,
921 belabox: bool,
922 belabox_config: PathBuf,
923 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
924 Ok(Self {
925 inner: StreamerInner::new(
926 id,
927 name,
928 address,
929 port,
930 tun_ip_network,
931 password,
932 destination_address,
933 destination_port,
934 belabox,
935 belabox_config,
936 )?,
937 })
938 }
939
940 pub async fn start(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
941 self.inner.lock().await.start().await
942 }
943}
944
945fn parse_tun_ip_network(network: &str) -> Result<Ipv4Network, AnyError> {
946 let network: Ipv4Network = network.parse()?;
947 if network.size() > 256 {
948 return Err(format!("TUN IP network too big ({} > 256)", network.size()).into());
949 }
950 Ok(network)
951}