atm0s_reverse_proxy_agent/
lib.rs

1use std::sync::Arc;
2
3use protocol::cluster::{wait_object, AgentTunnelRequest};
4
5mod connection;
6mod local_tunnel;
7#[cfg(feature = "quic")]
8pub use connection::quic::{QuicConnection, QuicSubConnection};
9#[cfg(feature = "tcp")]
10pub use connection::{
11    tcp::{TcpConnection, TcpSubConnection},
12    tls::TlsConnection,
13};
14
15pub use connection::{Connection, Protocol, SubConnection};
16pub use local_tunnel::{registry::SimpleServiceRegistry, LocalTunnel, ServiceRegistry};
17use tokio::{io::copy_bidirectional, net::TcpStream};
18
19pub async fn run_tunnel_connection<S>(mut incoming_proxy_conn: S, registry: Arc<dyn ServiceRegistry>)
20where
21    S: SubConnection,
22{
23    log::info!("sub_connection pipe to local_tunnel start");
24
25    let local_tunnel = match wait_object::<_, AgentTunnelRequest, 1000>(&mut incoming_proxy_conn).await {
26        Ok(handshake) => {
27            log::info!("sub_connection pipe with handshake: tls: {}, {}/{:?} ", handshake.tls, handshake.domain, handshake.service);
28            if let Some(dest) = registry.dest_for(handshake.tls, handshake.service, &handshake.domain) {
29                log::info!("create tunnel to dest {}", dest);
30                TcpStream::connect(dest).await
31            } else {
32                log::warn!("dest for service {:?} tls {} domain {} not found", handshake.service, handshake.tls, handshake.domain);
33                return;
34            }
35        }
36        Err(e) => {
37            log::error!("read first pkt error: {}", e);
38            return;
39        }
40    };
41
42    let mut local_tunnel = match local_tunnel {
43        Ok(local_tunnel) => local_tunnel,
44        Err(e) => {
45            log::error!("create local_tunnel error: {}", e);
46            return;
47        }
48    };
49
50    log::info!("sub_connection pipe to local_tunnel start");
51
52    match copy_bidirectional(&mut incoming_proxy_conn, &mut local_tunnel).await {
53        Ok(res) => {
54            log::info!("sub_connection pipe to local_tunnel stop res {res:?}");
55        }
56        Err(e) => {
57            log::error!("sub_connection pipe to local_tunnel stop err {e:?}");
58        }
59    }
60}