moblink_rust/
streamer.rs

1use std::collections::HashMap;
2use std::net::{Ipv4Addr, SocketAddr};
3use std::path::PathBuf;
4use std::str::FromStr;
5use std::sync::{Arc, Weak};
6use std::time::Duration;
7
8use futures_util::stream::{SplitSink, SplitStream};
9use futures_util::{SinkExt, StreamExt};
10use ipnetwork::Ipv4Network;
11use log::{debug, error, info};
12use mdns_sd::{IfKind, ServiceDaemon, ServiceInfo};
13use notify::event::AccessKind;
14use notify::{self, EventKind, Watcher};
15use packet::{Builder as _, Packet, ip, udp};
16use tokio::net::{TcpListener, TcpStream, UdpSocket};
17use tokio::select;
18use tokio::sync::Mutex;
19use tokio::sync::mpsc::{Receiver, Sender, channel};
20use tokio::task::JoinHandle;
21use tokio_tungstenite::WebSocketStream;
22use tokio_tungstenite::tungstenite::Message;
23use tokio_util::bytes::Bytes;
24use tokio_util::codec::Framed;
25use tun::{self, AsyncDevice, TunPacketCodec};
26use uuid::Uuid;
27
28use crate::protocol::{
29    API_VERSION, Authentication, Hello, Identified, Identify, MessageRequest, MessageRequestData,
30    MessageResponse, MessageToRelay, MessageToStreamer, MoblinkResult, Present, ResponseData,
31    StartTunnelRequest, calculate_authentication,
32};
33use crate::utils::{AnyError, execute_command, random_string, resolve_host};
34use crate::{MDNS_SERVICE_TYPE, belaui};
35
/// Sink half of a relay's websocket connection after `split()`.
type WebSocketWriter = SplitSink<WebSocketStream<TcpStream>, Message>;
/// Stream half of a relay's websocket connection after `split()`.
type WebSocketReader = SplitStream<WebSocketStream<TcpStream>>;

/// Sink half of the framed TUN device; accepts raw packets as byte vectors.
type TunWriter = SplitSink<Framed<AsyncDevice, TunPacketCodec>, Vec<u8>>;
/// Stream half of the framed TUN device.
type TunReader = SplitStream<Framed<AsyncDevice, TunPacketCodec>>;
41
/// IPv4/UDP endpoints used to wrap relay payloads before they are written to
/// the TUN device.
///
/// The `Debug` output of this struct is written to the log, so the field
/// order is part of the observable behavior.
#[derive(Debug)]
struct PacketBuilder {
    /// IPv4 source address placed in every built packet.
    source_address: Ipv4Addr,
    /// UDP source port placed in every built packet.
    source_port: u16,
    /// IPv4 destination address placed in every built packet.
    destination_address: Ipv4Addr,
    /// UDP destination port; updated when a new TUN-side port is reported.
    destination_port: u16,
}
49
50impl PacketBuilder {
51    fn new(
52        source_address: Ipv4Addr,
53        source_port: u16,
54        destination_address: Ipv4Addr,
55        destination_port: u16,
56    ) -> Self {
57        Self {
58            source_address,
59            source_port,
60            destination_address,
61            destination_port,
62        }
63    }
64
65    fn pack(&self, payload: &[u8]) -> Result<Vec<u8>, AnyError> {
66        Ok(ip::v4::Builder::default()
67            .source(self.source_address)?
68            .destination(self.destination_address)?
69            .udp()?
70            .source(self.source_port)?
71            .destination(self.destination_port)?
72            .payload(payload)?
73            .build()?)
74    }
75}
76
/// State of one connected relay: its websocket, handshake data and tunnel.
struct Relay {
    /// Weak self-reference handed to the tasks this relay spawns.
    me: Weak<Mutex<Self>>,
    /// Weak back-reference to the owning streamer.
    streamer: Weak<Mutex<StreamerInner>>,
    /// Peer address of the relay's websocket connection.
    relay_address: SocketAddr,
    /// Websocket send half; set to `None` by the pinger when a pong is missed.
    writer: Option<WebSocketWriter>,
    /// Random challenge sent in the hello message and used to verify the relay.
    challenge: String,
    /// Random salt sent in the hello message and used to verify the relay.
    salt: String,
    /// `true` once the relay has presented valid authentication.
    identified: bool,
    /// Id reported by the relay when identifying (a random UUID until then).
    relay_id: Uuid,
    /// Name reported by the relay when identifying (empty until then).
    relay_name: String,
    /// UDP port from the relay's start-tunnel response; `None` without a tunnel.
    relay_tunnel_port: Option<u16>,
    /// IPv4 address (as text) assigned to this relay's TUN device.
    tun_ip_address: String,
    /// Task forwarding relay UDP packets into the TUN device.
    relay_receiver: Option<JoinHandle<()>>,
    /// Task forwarding TUN packets to the relay UDP socket.
    tun_receiver: Option<JoinHandle<()>>,
    /// Index used to derive the TUN device name and the routing table number.
    unique_index: u32,
    /// Cleared when a ping is sent and set again when a pong arrives.
    pong_received: bool,
}
94
95impl Relay {
96    pub fn new(
97        streamer: Weak<Mutex<StreamerInner>>,
98        relay_address: SocketAddr,
99        writer: WebSocketWriter,
100        tun_ip_address: String,
101        unique_index: u32,
102    ) -> Arc<Mutex<Self>> {
103        Arc::new_cyclic(|me| {
104            Mutex::new(Self {
105                me: me.clone(),
106                streamer,
107                relay_address,
108                writer: Some(writer),
109                challenge: String::new(),
110                salt: String::new(),
111                identified: false,
112                relay_id: Uuid::new_v4(),
113                relay_name: "".into(),
114                relay_tunnel_port: None,
115                tun_ip_address,
116                relay_receiver: None,
117                tun_receiver: None,
118                unique_index,
119                pong_received: true,
120            })
121        })
122    }
123
    /// Spawns the background tasks for this relay: the websocket receiver
    /// (which also performs the handshake) and the periodic pinger.
    fn start(&mut self, reader: WebSocketReader) {
        self.start_websocket_receiver(reader);
        self.start_pinger();
    }
128
129    fn start_websocket_receiver(&mut self, mut reader: WebSocketReader) {
130        let relay = self.me.clone();
131
132        tokio::spawn(async move {
133            let Some(relay) = relay.upgrade() else {
134                return;
135            };
136
137            relay.lock().await.start_handshake().await;
138
139            loop {
140                match tokio::time::timeout(Duration::from_secs(20), reader.next()).await {
141                    Ok(Some(Ok(message))) => {
142                        if let Err(error) =
143                            relay.lock().await.handle_websocket_message(message).await
144                        {
145                            error!("Relay error: {}", error);
146                            break;
147                        }
148                    }
149                    Ok(Some(Err(error))) => {
150                        info!("Websocket error {}", error);
151                        break;
152                    }
153                    Ok(None) => {
154                        info!("No more websocket messages to receive");
155                        break;
156                    }
157                    Err(_) => {
158                        info!("Websocket read timeout");
159                        if relay.lock().await.writer.is_none() {
160                            break;
161                        }
162                    }
163                }
164            }
165            let streamer = {
166                let mut relay = relay.lock().await;
167                info!("Relay disconnected: {}", relay.relay_address);
168                relay.tunnel_destroyed().await;
169                relay.streamer.upgrade()
170            };
171            if let Some(streamer) = streamer {
172                streamer.lock().await.remove_relay(&relay).await;
173            }
174        });
175    }
176
177    fn start_pinger(&mut self) {
178        let relay = self.me.clone();
179
180        tokio::spawn(async move {
181            loop {
182                {
183                    let Some(relay) = relay.upgrade() else {
184                        break;
185                    };
186                    let mut relay = relay.lock().await;
187                    if !relay.pong_received {
188                        info!("Pong not received.");
189                        relay.writer = None;
190                        break;
191                    } else {
192                        relay.pong_received = false;
193                        relay.send_websocket(Message::Ping(Bytes::new())).await.ok();
194                    }
195                }
196                tokio::time::sleep(Duration::from_secs(10)).await;
197            }
198        });
199    }
200
201    async fn handle_websocket_message(&mut self, message: Message) -> Result<(), AnyError> {
202        debug!("Websocket got: {:?}", message);
203        match message {
204            Message::Text(text) => match serde_json::from_str(&text) {
205                Ok(message) => self.handle_message(message).await,
206                Err(error) => {
207                    Err(format!("Failed to deserialize message with error: {}", error).into())
208                }
209            },
210            Message::Ping(data) => Ok(self.send_websocket(Message::Pong(data)).await?),
211            Message::Pong(_) => {
212                self.pong_received = true;
213                Ok(())
214            }
215            _ => Err(format!("Unsupported websocket message: {:?}", message).into()),
216        }
217    }
218
    /// Dispatches a decoded protocol message to its handler.
    async fn handle_message(&mut self, message: MessageToStreamer) -> Result<(), AnyError> {
        match message {
            MessageToStreamer::Identify(identify) => self.handle_message_identify(identify).await,
            MessageToStreamer::Response(response) => self.handle_message_response(response).await,
        }
    }
225
226    async fn handle_message_identify(&mut self, identify: Identify) -> Result<(), AnyError> {
227        let Some(streamer) = self.streamer.upgrade() else {
228            return Err("No streamer".into());
229        };
230        let streamer = streamer.lock().await;
231        if identify.authentication
232            == calculate_authentication(&streamer.password, &self.salt, &self.challenge)
233        {
234            self.identified = true;
235            self.relay_id = identify.id;
236            self.relay_name = identify.name;
237            let identified = Identified {
238                result: MoblinkResult::Ok(Present {}),
239            };
240            self.send(MessageToRelay::Identified(identified)).await?;
241            self.start_tunnel(&streamer.destination_address, streamer.destination_port)
242                .await
243        } else {
244            let identified = Identified {
245                result: MoblinkResult::WrongPassword(Present {}),
246            };
247            self.send(MessageToRelay::Identified(identified)).await?;
248            Err("Relay sent wrong password".into())
249        }
250    }
251
252    async fn handle_message_response(&mut self, response: MessageResponse) -> Result<(), AnyError> {
253        match response.data {
254            ResponseData::StartTunnel(data) => {
255                self.relay_tunnel_port = Some(data.port);
256                self.tunnel_created().await?;
257            }
258            message => {
259                info!("Ignoring message {:?}", message);
260            }
261        }
262        Ok(())
263    }
264
265    async fn tunnel_created(&mut self) -> Result<(), AnyError> {
266        let Some(relay_tunnel_port) = self.relay_tunnel_port else {
267            return Ok(());
268        };
269        info!(
270            "Tunnel created: {}:{} ({}, {})",
271            self.relay_address.ip(),
272            relay_tunnel_port,
273            self.relay_name,
274            self.relay_id
275        );
276        self.start_udp_networking(relay_tunnel_port).await?;
277        Ok(())
278    }
279
280    async fn tunnel_destroyed(&mut self) {
281        let Some(relay_tunnel_port) = self.relay_tunnel_port.take() else {
282            return;
283        };
284        info!(
285            "Tunnel destroyed: {}:{} ({}, {})",
286            self.relay_address.ip(),
287            relay_tunnel_port,
288            self.relay_name,
289            self.relay_id
290        );
291        self.stop_udp_networking().await;
292    }
293
294    async fn start_udp_networking(&mut self, relay_tunnel_port: u16) -> Result<(), AnyError> {
295        let (tun_writer, tun_reader) = self.create_tun_device()?;
296        let relay_socket = self.create_relay_socket(relay_tunnel_port).await?;
297        self.setup_os_networking().await;
298        let (tun_port_writer, tun_port_reader) = channel(1);
299        self.start_relay_receiver(relay_socket.clone(), tun_writer, tun_port_reader)
300            .await?;
301        self.start_tun_receiver(tun_reader, relay_socket, tun_port_writer)
302            .await;
303
304        Ok(())
305    }
306
307    async fn stop_udp_networking(&mut self) {
308        if let Some(relay_receiver) = self.relay_receiver.take() {
309            relay_receiver.abort();
310            relay_receiver.await.ok();
311        }
312        if let Some(tun_receiver) = self.tun_receiver.take() {
313            tun_receiver.abort();
314            tun_receiver.await.ok();
315        }
316        self.teardown_os_networking().await;
317    }
318
319    async fn create_relay_socket(
320        &self,
321        relay_tunnel_port: u16,
322    ) -> Result<Arc<UdpSocket>, AnyError> {
323        let socket = UdpSocket::bind("0.0.0.0:0").await?;
324        let tunnel_address = format!("{}:{}", self.relay_address.ip(), relay_tunnel_port);
325        socket.connect(tunnel_address).await?;
326        Ok(Arc::new(socket))
327    }
328
329    fn create_tun_device(&self) -> Result<(TunWriter, TunReader), AnyError> {
330        let mut config = tun::Configuration::default();
331        config
332            .address(&self.tun_ip_address)
333            .tun_name(self.tun_device_name())
334            .up();
335        let device = tun::create_as_async(&config)?;
336        Ok(device.into_framed().split())
337    }
338
339    #[cfg(not(target_os = "macos"))]
340    fn tun_device_name(&self) -> String {
341        use libc::IF_NAMESIZE;
342        let name = self
343            .relay_name
344            .replace(|c: char| !c.is_ascii() || c.is_whitespace(), "-");
345        let name = format!("mob{}-{}", self.unique_index, name);
346        name[..name.len().min(IF_NAMESIZE - 1)].to_string()
347    }
348
349    #[cfg(target_os = "macos")]
350    fn tun_device_name(&self) -> String {
351        format!("utun{}", 99 + self.unique_index)
352    }
353
    /// Sets up OS routing for the tunnel. Only Linux has an implementation;
    /// on other targets this does nothing.
    async fn setup_os_networking(&self) {
        #[cfg(target_os = "linux")]
        self.setup_linux_networking().await;
    }
358
359    #[allow(dead_code)]
360    async fn setup_linux_networking(&self) {
361        let Some(streamer) = self.streamer.upgrade() else {
362            return;
363        };
364        let destination_address = &streamer.lock().await.destination_address;
365        let table = self.get_linux_networking_table();
366        self.teardown_linux_networking().await;
367        execute_command(
368            "ip",
369            &[
370                "route",
371                "add",
372                destination_address,
373                "dev",
374                &self.tun_device_name(),
375                "proto",
376                "kernel",
377                "scope",
378                "link",
379                "src",
380                &self.tun_ip_address,
381                "table",
382                &table,
383            ],
384        )
385        .await;
386        execute_command(
387            "ip",
388            &[
389                "route",
390                "add",
391                "default",
392                "via",
393                &self.tun_ip_address,
394                "dev",
395                &self.tun_device_name(),
396                "table",
397                &table,
398            ],
399        )
400        .await;
401        execute_command(
402            "ip",
403            &[
404                "rule",
405                "add",
406                "from",
407                &self.tun_ip_address,
408                "lookup",
409                &table,
410            ],
411        )
412        .await;
413    }
414
    /// Removes the OS routing for the tunnel. Only Linux has an
    /// implementation; on other targets this does nothing.
    async fn teardown_os_networking(&self) {
        #[cfg(target_os = "linux")]
        self.teardown_linux_networking().await;
    }
419
420    #[allow(dead_code)]
421    async fn teardown_linux_networking(&self) {
422        let table = self.get_linux_networking_table();
423        execute_command("ip", &["rule", "del", "lookup", &table]).await;
424        execute_command("ip", &["route", "flush", "table", &table]).await;
425    }
426
427    fn get_linux_networking_table(&self) -> String {
428        format!("{}", 300 + self.unique_index)
429    }
430
431    async fn start_tun_receiver(
432        &mut self,
433        mut tun_reader: TunReader,
434        relay_socket: Arc<UdpSocket>,
435        tun_port_writer: Sender<u16>,
436    ) {
437        let Some(streamer) = self.streamer.upgrade() else {
438            return;
439        };
440        let streamer = streamer.lock().await;
441        let Ok(destination_address) = Ipv4Addr::from_str(&streamer.destination_address) else {
442            return;
443        };
444        self.tun_receiver = Some(tokio::spawn(async move {
445            let mut tun_port = 0u16;
446            while let Some(packet) = tun_reader.next().await {
447                if let Err(error) = Self::handle_tun_packet(
448                    packet,
449                    &relay_socket,
450                    destination_address,
451                    &tun_port_writer,
452                    &mut tun_port,
453                )
454                .await
455                {
456                    error!("TUN receiver: {}", error);
457                    break;
458                }
459            }
460        }));
461    }
462
463    async fn handle_tun_packet(
464        packet: Result<Vec<u8>, std::io::Error>,
465        relay_socket: &Arc<UdpSocket>,
466        destination_address: Ipv4Addr,
467        tun_port_writer: &Sender<u16>,
468        tun_port: &mut u16,
469    ) -> Result<(), AnyError> {
470        match packet {
471            Ok(packet) => match ip::Packet::new(packet) {
472                Ok(ip::Packet::V4(packet)) => {
473                    if packet.protocol() == ip::Protocol::Udp
474                        && packet.destination() == destination_address
475                    {
476                        Self::handle_tun_udp_packet(
477                            packet.payload(),
478                            relay_socket,
479                            tun_port_writer,
480                            tun_port,
481                        )
482                        .await?;
483                    }
484                }
485                Ok(ip::Packet::V6(_)) => {
486                    debug!("TUN receiver: Discarding IPv6 packet");
487                }
488                Err(error) => {
489                    return Err(format!("Invalid IP packet: {}", error).into());
490                }
491            },
492            Err(error) => {
493                return Err(format!("TUN receiver: Read failed with: {}", error).into());
494            }
495        }
496        Ok(())
497    }
498
499    async fn handle_tun_udp_packet(
500        packet: &[u8],
501        relay_socket: &Arc<UdpSocket>,
502        tun_port_writer: &Sender<u16>,
503        tun_port: &mut u16,
504    ) -> Result<(), AnyError> {
505        match udp::Packet::new(packet) {
506            Ok(packet) => {
507                let new_tun_port = packet.source();
508                if new_tun_port != *tun_port {
509                    tun_port_writer.send(new_tun_port).await.ok();
510                    *tun_port = new_tun_port;
511                }
512                if let Err(error) = relay_socket.send(packet.payload()).await {
513                    return Err(format!("Send error {}", error).into());
514                }
515            }
516            Err(error) => {
517                return Err(format!("Invalid UDP packet: {}", error).into());
518            }
519        }
520        Ok(())
521    }
522
523    async fn start_relay_receiver(
524        &mut self,
525        relay_socket: Arc<UdpSocket>,
526        mut tun_writer: TunWriter,
527        mut tun_port_reader: Receiver<u16>,
528    ) -> Result<(), AnyError> {
529        let Some(streamer) = self.streamer.upgrade() else {
530            return Err("No streamer".into());
531        };
532        let streamer = streamer.lock().await;
533        let destination_address = streamer.destination_address.clone();
534        let destination_port = streamer.destination_port;
535        let tun_ip_address = self.tun_ip_address.clone();
536
537        self.relay_receiver = Some(tokio::spawn(async move {
538            let Ok(destination_address) = Ipv4Addr::from_str(&destination_address) else {
539                return;
540            };
541            let Ok(tun_ip_address) = Ipv4Addr::from_str(&tun_ip_address) else {
542                return;
543            };
544            let mut buffer = vec![0; 2048];
545            let mut packet_builder =
546                PacketBuilder::new(destination_address, destination_port, tun_ip_address, 10000);
547            loop {
548                if let Err(error) = select! {
549                    result = relay_socket.recv(&mut buffer) => {
550                        Self::handle_relay_packet(&mut tun_writer, &packet_builder, result, &buffer).await
551                    }
552                    tun_port = tun_port_reader.recv() => {
553                        Self::handle_tun_port(&mut packet_builder, tun_port)
554                    }
555                } {
556                    error!("Relay receiver: Error {}", error);
557                    break;
558                }
559            }
560        }));
561        Ok(())
562    }
563
564    async fn handle_relay_packet(
565        tun_writer: &mut TunWriter,
566        packet_builder: &PacketBuilder,
567        result: Result<usize, std::io::Error>,
568        buffer: &[u8],
569    ) -> Result<(), AnyError> {
570        match result {
571            Ok(length) => {
572                debug!("Relay receiver: Got {:?}", &buffer[..length]);
573                let Ok(packet) = packet_builder.pack(&buffer[..length]) else {
574                    return Err("Relay receiver: IP create error".into());
575                };
576                if let Err(error) = tun_writer.send(packet).await {
577                    Err(format!("Relay receiver: Send error {}", error).into())
578                } else {
579                    Ok(())
580                }
581            }
582            Err(error) => Err(format!("Relay receiver: Error {}", error).into()),
583        }
584    }
585
586    fn handle_tun_port(
587        packet_builder: &mut PacketBuilder,
588        tun_port: Option<u16>,
589    ) -> Result<(), AnyError> {
590        let Some(tun_port) = tun_port else {
591            return Err("TUN port missing".into());
592        };
593        packet_builder.destination_port = tun_port;
594        info!("Relay receiver: Ready with {:?}", packet_builder);
595        Ok(())
596    }
597
    /// Begins authentication: generates a fresh challenge and salt, sends them
    /// to the relay in a hello message, and marks the relay as not identified.
    async fn start_handshake(&mut self) {
        self.challenge = random_string();
        self.salt = random_string();
        self.send_hello().await;
        self.identified = false;
    }
604
605    async fn start_tunnel(
606        &mut self,
607        destination_address: &str,
608        destination_port: u16,
609    ) -> Result<(), AnyError> {
610        if !self.identified {
611            return Ok(());
612        }
613        if destination_address.is_empty() {
614            return Err("Destination address not available".into());
615        }
616        let start_tunnel = StartTunnelRequest {
617            address: destination_address.to_string(),
618            port: destination_port,
619        };
620        let request = MessageRequest {
621            id: 1,
622            data: MessageRequestData::StartTunnel(start_tunnel),
623        };
624        self.send(MessageToRelay::Request(request)).await
625    }
626
627    async fn send_hello(&mut self) {
628        let hello = MessageToRelay::Hello(Hello {
629            api_version: API_VERSION.into(),
630            authentication: Authentication {
631                challenge: self.challenge.clone(),
632                salt: self.salt.clone(),
633            },
634        });
635        self.send(hello).await.ok();
636    }
637
638    async fn send(&mut self, message: MessageToRelay) -> Result<(), AnyError> {
639        let text = serde_json::to_string(&message)?;
640        self.send_websocket(Message::Text(text.into())).await
641    }
642
643    async fn send_websocket(&mut self, message: Message) -> Result<(), AnyError> {
644        match self.writer.as_mut() {
645            Some(writer) => {
646                debug!("Websocket sending: {:?}", message);
647                writer.send(message).await?;
648            }
649            _ => {
650                return Err("No websocket writer".into());
651            }
652        }
653        Ok(())
654    }
655}
656
/// Shared streamer state: configuration, connected relays and the pool of
/// free per-relay indexes.
struct StreamerInner {
    /// Weak self-reference handed to spawned tasks and to new relays.
    me: Weak<Mutex<Self>>,
    /// Identifier used as the mDNS instance name and host name prefix.
    id: String,
    /// Name advertised in the mDNS `name` property.
    name: String,
    /// Local address the websocket listener binds to.
    address: String,
    /// Local port the websocket listener binds to; also advertised via mDNS.
    port: u16,
    /// Password used to verify relay authentication.
    password: String,
    /// Destination address relays tunnel to (resolved on start or read from
    /// the BELABOX config).
    destination_address: String,
    /// Destination port relays tunnel to.
    destination_port: u16,
    /// When `true`, the destination is read from, and follows, the BELABOX
    /// config file.
    belabox: bool,
    /// Path of the BELABOX config file.
    belabox_config: PathBuf,
    /// Currently connected relays.
    relays: Vec<Arc<Mutex<Relay>>>,
    /// Free relay indexes; taken from the back, returned at the front.
    unique_indexes: Vec<u32>,
    /// Network the per-relay TUN addresses are taken from.
    tun_ip_network: Ipv4Network,
    /// mDNS daemon used to advertise the streamer.
    service_daemon: ServiceDaemon,
}
673
674impl StreamerInner {
675    pub fn new(
676        id: String,
677        name: String,
678        address: String,
679        port: u16,
680        tun_ip_network: String,
681        password: String,
682        destination_address: String,
683        destination_port: u16,
684        belabox: bool,
685        belabox_config: PathBuf,
686    ) -> Result<Arc<Mutex<Self>>, Box<dyn std::error::Error + Send + Sync>> {
687        let tun_ip_network = parse_tun_ip_network(&tun_ip_network)?;
688        Ok(Arc::new_cyclic(|me| {
689            Mutex::new(Self {
690                me: me.clone(),
691                id,
692                name,
693                address,
694                port,
695                password,
696                destination_address,
697                destination_port,
698                belabox,
699                belabox_config,
700                relays: Vec::new(),
701                unique_indexes: (1..tun_ip_network.size() - 1).rev().collect(),
702                tun_ip_network,
703                service_daemon: Self::create_service_daemon(),
704            })
705        }))
706    }
707
708    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
709        if self.belabox {
710            if let Err(error) = self.read_belaui_config_file().await {
711                error!("Read BELABOX config error: {}", error);
712            }
713            self.start_belaui_config_watcher();
714        } else {
715            self.destination_address = resolve_host(&self.destination_address).await?;
716        }
717        self.start_relay_listener().await?;
718        self.start_mdns_daemon();
719        Ok(())
720    }
721
722    fn create_service_daemon() -> ServiceDaemon {
723        let service_daemon = ServiceDaemon::new().unwrap();
724        service_daemon
725            .disable_interface(Vec::from([IfKind::IPv6]))
726            .ok();
727        service_daemon
728    }
729
730    async fn start_relay_listener(
731        &mut self,
732    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
733        let listener_address = format!("{}:{}", self.address, self.port);
734        let listener = TcpListener::bind(&listener_address).await?;
735        info!("WebSocket server listening on '{}'", listener_address);
736        let streamer = self.me.clone();
737
738        tokio::spawn(async move {
739            while let Ok((tcp_stream, relay_address)) = listener.accept().await {
740                match streamer.upgrade() {
741                    Some(streamer) => {
742                        streamer
743                            .lock()
744                            .await
745                            .handle_relay_connection(tcp_stream, relay_address)
746                            .await;
747                    }
748                    _ => {
749                        break;
750                    }
751                }
752            }
753        });
754
755        Ok(())
756    }
757
758    fn start_belaui_config_watcher(&mut self) {
759        let (async_events_writer, mut async_events_reader) = tokio::sync::mpsc::channel(1);
760        let belabox_config = self.belabox_config.clone();
761        std::thread::spawn(move || {
762            let (events_writer, events_reader) =
763                std::sync::mpsc::channel::<notify::Result<notify::Event>>();
764            let Ok(mut watcher) = notify::recommended_watcher(events_writer) else {
765                error!("Failed to create watcher");
766                return;
767            };
768            if let Err(error) = watcher.watch(&belabox_config, notify::RecursiveMode::NonRecursive)
769            {
770                error!("Watch failed with error: {}", error);
771                return;
772            }
773            for result in events_reader {
774                if async_events_writer.blocking_send(result).is_err() {
775                    break;
776                }
777            }
778        });
779
780        let streamer = self.me.clone();
781        tokio::spawn(async move {
782            while let Some(result) = async_events_reader.recv().await {
783                match result {
784                    Ok(event) => {
785                        let EventKind::Access(AccessKind::Close(_)) = event.kind else {
786                            continue;
787                        };
788                        let Some(streamer) = streamer.upgrade() else {
789                            continue;
790                        };
791                        let mut streamer = streamer.lock().await;
792                        match streamer.read_belaui_config_file().await {
793                            Ok(true) => {
794                                for relay in &streamer.relays {
795                                    let mut relay = relay.lock().await;
796                                    relay.tunnel_destroyed().await;
797                                    relay
798                                        .start_tunnel(
799                                            &streamer.destination_address,
800                                            streamer.destination_port,
801                                        )
802                                        .await
803                                        .ok();
804                                }
805                            }
806                            Ok(false) => {}
807                            Err(error) => {
808                                error!("Read BELABOX config error: {}", error)
809                            }
810                        }
811                    }
812                    Err(error) => error!("Config error: {:?}", error),
813                }
814            }
815        });
816    }
817
818    fn start_mdns_daemon(&mut self) {
819        match self.create_mdns_service_info() {
820            Ok(service_info) => {
821                if let Err(error) = self.service_daemon.register(service_info) {
822                    error!("Failed to register mDNS service with error: {}", error);
823                }
824            }
825            Err(error) => {
826                error!("Failed to create mDNS service info with error: {}", error);
827            }
828        }
829    }
830
831    async fn read_belaui_config_file(&mut self) -> Result<bool, AnyError> {
832        let config = belaui::Config::new_from_file(&self.belabox_config).await?;
833        let mut destination_changed = false;
834        let address = resolve_host(&config.get_address()).await?;
835        if self.destination_address != address {
836            self.destination_address = address;
837            info!("New destination address {}", self.destination_address);
838            destination_changed = true;
839        }
840        if self.destination_port != config.get_port() {
841            self.destination_port = config.get_port();
842            info!("New destination port {}", self.destination_port);
843            destination_changed = true;
844        }
845        Ok(destination_changed)
846    }
847
848    fn create_mdns_service_info(&self) -> Result<ServiceInfo, AnyError> {
849        let properties = HashMap::from([("name".to_string(), self.name.clone())]);
850        let service_info = ServiceInfo::new(
851            MDNS_SERVICE_TYPE,
852            &self.id,
853            &format!("{}.local.", self.id),
854            "",
855            self.port,
856            properties,
857        )?
858        .enable_addr_auto();
859        Ok(service_info)
860    }
861
862    async fn handle_relay_connection(&mut self, tcp_stream: TcpStream, relay_address: SocketAddr) {
863        match tokio_tungstenite::accept_async(tcp_stream).await {
864            Ok(websocket_stream) => {
865                info!("Relay connected: {}", relay_address);
866                let (writer, reader) = websocket_stream.split();
867                let Some(unique_index) = self.unique_indexes.pop() else {
868                    return;
869                };
870                let Some(tun_ip_address) = self.tun_ip_network.nth(unique_index) else {
871                    self.unique_indexes.insert(0, unique_index);
872                    return;
873                };
874                let relay = Relay::new(
875                    self.me.clone(),
876                    relay_address,
877                    writer,
878                    tun_ip_address.to_string(),
879                    unique_index,
880                );
881                relay.lock().await.start(reader);
882                self.add_relay(relay);
883            }
884            Err(error) => {
885                error!("Relay websocket handshake failed with: {}", error);
886            }
887        }
888    }
889
    /// Adds a relay to the list of connected relays and logs the new count.
    fn add_relay(&mut self, relay: Arc<Mutex<Relay>>) {
        self.relays.push(relay);
        self.log_number_of_relays();
    }
894
895    async fn remove_relay(&mut self, relay: &Arc<Mutex<Relay>>) {
896        let unique_index = relay.lock().await.unique_index;
897        self.unique_indexes.insert(0, unique_index);
898        self.relays.retain(|r| !Arc::ptr_eq(r, relay));
899        self.log_number_of_relays();
900    }
901
    /// Logs how many relays are currently connected.
    fn log_number_of_relays(&self) {
        info!("Number of relays: {}", self.relays.len())
    }
905}
906
/// Public handle to the streamer; wraps the shared, mutex-protected state.
pub struct Streamer {
    /// Shared state, also reachable from spawned tasks through weak pointers.
    inner: Arc<Mutex<StreamerInner>>,
}
910
911impl Streamer {
912    pub fn new(
913        id: String,
914        name: String,
915        address: String,
916        port: u16,
917        tun_ip_network: String,
918        password: String,
919        destination_address: String,
920        destination_port: u16,
921        belabox: bool,
922        belabox_config: PathBuf,
923    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
924        Ok(Self {
925            inner: StreamerInner::new(
926                id,
927                name,
928                address,
929                port,
930                tun_ip_network,
931                password,
932                destination_address,
933                destination_port,
934                belabox,
935                belabox_config,
936            )?,
937        })
938    }
939
940    pub async fn start(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
941        self.inner.lock().await.start().await
942    }
943}
944
945fn parse_tun_ip_network(network: &str) -> Result<Ipv4Network, AnyError> {
946    let network: Ipv4Network = network.parse()?;
947    if network.size() > 256 {
948        return Err(format!("TUN IP network too big ({} > 256)", network.size()).into());
949    }
950    Ok(network)
951}