ppaass-agent 0.1.35

The agent part of the ppaass application
Documentation
use std::str::FromStr;
use std::sync::Arc;
use std::{net::SocketAddr, time::Duration};

use ppaass_crypto::crypto::RsaCryptoFetcher;
use tokio::{net::TcpStream, time::timeout};

use tokio_io_timeout::TimeoutStream;
use tokio_tfo::TfoStream;
use tokio_util::codec::Framed;
use tracing::{debug, error};

use crate::error::AgentServerError;
use crate::{codec::PpaassProxyEdgeCodec, config::AgentServerConfig};

pub(crate) struct ProxyConnectionFactory<F>
where
    F: RsaCryptoFetcher,
{
    proxy_addresses: Vec<SocketAddr>,
    config: Arc<AgentServerConfig>,
    rsa_crypto_fetcher: Arc<F>,
}

impl<F> ProxyConnectionFactory<F>
where
    F: RsaCryptoFetcher + Send + Sync + 'static,
{
    pub(crate) fn new(
        config: Arc<AgentServerConfig>,
        rsa_crypto_fetcher: F,
    ) -> Result<Self, AgentServerError> {
        let proxy_addresses_configuration = config.proxy_addresses();
        let proxy_addresses: Vec<SocketAddr> = proxy_addresses_configuration
            .iter()
            .filter_map(|addr| SocketAddr::from_str(addr).ok())
            .collect::<Vec<SocketAddr>>();
        if proxy_addresses.is_empty() {
            error!("No available proxy address for runtime to use.");
            panic!("No available proxy address for runtime to use.")
        }
        Ok(Self {
            proxy_addresses,
            config,
            rsa_crypto_fetcher: Arc::new(rsa_crypto_fetcher),
        })
    }

    pub(crate) async fn create_proxy_connection(
        &self,
    ) -> Result<Framed<TimeoutStream<TfoStream>, PpaassProxyEdgeCodec<Arc<F>>>, AgentServerError>
    {
        debug!("Take proxy connection from pool.");
        let proxy_tcp_stream = match timeout(
            Duration::from_secs(self.config.connect_to_proxy_timeout()),
            TcpStream::connect(self.proxy_addresses.as_slice()),
        )
        .await
        {
            Err(_) => {
                error!("Fail connect to proxy because of timeout.");
                return Err(AgentServerError::Other(format!(
                    "Fail to create proxy connection because of timeout: {}",
                    self.config.connect_to_proxy_timeout()
                )));
            }
            Ok(Ok(proxy_tcp_stream)) => proxy_tcp_stream,
            Ok(Err(e)) => {
                error!("Fail connect to proxy because of error: {e:?}");
                return Err(AgentServerError::StdIo(e));
            }
        };
        debug!("Success connect to proxy.");
        proxy_tcp_stream.set_nodelay(true)?;
        proxy_tcp_stream.set_linger(None)?;
        let proxy_tcp_stream = TfoStream::from(proxy_tcp_stream);
        let mut proxy_tcp_stream = TimeoutStream::new(proxy_tcp_stream);
        proxy_tcp_stream.set_read_timeout(Some(Duration::from_secs(
            self.config.proxy_connection_read_timeout(),
        )));
        proxy_tcp_stream.set_write_timeout(Some(Duration::from_secs(
            self.config.proxy_connection_write_timeout(),
        )));
        let proxy_connection = Framed::with_capacity(
            proxy_tcp_stream,
            PpaassProxyEdgeCodec::new(self.config.compress(), self.rsa_crypto_fetcher.clone()),
            self.config.proxy_send_buffer_size(),
        );
        Ok(proxy_connection)
    }
}