playit-agent-core 0.20.1

Contains the logic to create a playit.gg agent
Documentation
use std::{net::SocketAddr, time::Duration};

use message_encoding::MessageEncoding;
use playit_agent_proto::{control_feed::ControlFeed, control_messages::{ControlRequest, ControlResponse, Ping}, rpc::ControlRpcMessage};

use crate::utils::now_milli;

use super::{connected_control::ConnectedControl, errors::SetupError, PacketIO};


pub struct AddressSelector<IO: PacketIO> {
    options: Vec<SocketAddr>,
    packet_io: IO,
}

impl<IO: PacketIO> AddressSelector<IO> {
    pub fn new(options: Vec<SocketAddr>, packet_io: IO) -> Self {
        AddressSelector { options, packet_io }
    }

    pub async fn connect_to_first(self) -> Result<ConnectedControl<IO>, SetupError> {
        let mut buffer: Vec<u8> = Vec::new();

        for addr in self.options {
            tracing::info!(?addr, "trying to establish tunnel connection");

            let is_ip6 = addr.is_ipv6();
            let attempts = if is_ip6 { 1 } else { 3 };
            
            for _ in 0..attempts {
                buffer.clear();

                ControlRpcMessage {
                    request_id: 1,
                    content: ControlRequest::Ping(Ping {
                        now: now_milli(),
                        current_ping: None,
                        session_id: None,
                    }),
                }.write_to(&mut buffer)?;

                if let Err(error) = self.packet_io.send_to(&buffer, addr).await {
                    tracing::error!(?error, ?addr, "failed to send initial ping");
                    break;
                }

                buffer.resize(2048, 0);

                let waits = if is_ip6 { 3 } else { 5 };
                for i in 0..waits {
                    let res = tokio::time::timeout(
                        Duration::from_millis(500),
                        self.packet_io.recv_from(&mut buffer),
                    ).await;

                    match res {
                        Ok(Ok((bytes, peer))) => {
                            if peer != addr {
                                tracing::error!(?peer, ?addr, "got message from different source");
                                continue;
                            }

                            let mut reader = &buffer[..bytes];
                            match ControlFeed::read_from(&mut reader) {
                                Ok(ControlFeed::Response(msg)) => {
                                    if msg.request_id != 1 {
                                        tracing::error!(?msg, "got response with unexpected request_id");
                                        continue;
                                    }

                                    match msg.content {
                                        ControlResponse::Pong(pong) => {
                                            tracing::info!(?pong, "got initial pong from tunnel server");
                                            return Ok(ConnectedControl::new(addr, self.packet_io, pong));
                                        }
                                        other => {
                                            tracing::error!(?other, "expected pong got other response");
                                        }
                                    }
                                }
                                Ok(other) => {
                                    tracing::error!(?other, "unexpected control feed");
                                }
                                Err(error) => {
                                    tracing::error!(?error, test = ?(), "failed to parse response data");
                                }
                            }
                        }
                        Ok(Err(error)) => {
                            tracing::error!(?error, "failed to receive UDP packet");
                        }
                        Err(_) => {
                            tracing::warn!(%addr, "waited {}ms for pong", (i + 1) * 500);
                        }
                    }
                }

                tracing::error!("timeout waiting for pong");
            }

            tracing::error!("failed to ping tunnel server");
        }

        Err(SetupError::FailedToConnect)
    }
}