atm0s_reverse_proxy_agent/
lib.rs1use std::sync::Arc;
2
3use protocol::cluster::{wait_object, AgentTunnelRequest};
4
5mod connection;
6mod local_tunnel;
7#[cfg(feature = "quic")]
8pub use connection::quic::{QuicConnection, QuicSubConnection};
9#[cfg(feature = "tcp")]
10pub use connection::{
11 tcp::{TcpConnection, TcpSubConnection},
12 tls::TlsConnection,
13};
14
15pub use connection::{Connection, Protocol, SubConnection};
16pub use local_tunnel::{registry::SimpleServiceRegistry, LocalTunnel, ServiceRegistry};
17use tokio::{io::copy_bidirectional, net::TcpStream};
18
19pub async fn run_tunnel_connection<S>(mut incoming_proxy_conn: S, registry: Arc<dyn ServiceRegistry>)
20where
21 S: SubConnection,
22{
23 log::info!("sub_connection pipe to local_tunnel start");
24
25 let local_tunnel = match wait_object::<_, AgentTunnelRequest, 1000>(&mut incoming_proxy_conn).await {
26 Ok(handshake) => {
27 log::info!("sub_connection pipe with handshake: tls: {}, {}/{:?} ", handshake.tls, handshake.domain, handshake.service);
28 if let Some(dest) = registry.dest_for(handshake.tls, handshake.service, &handshake.domain) {
29 log::info!("create tunnel to dest {}", dest);
30 TcpStream::connect(dest).await
31 } else {
32 log::warn!("dest for service {:?} tls {} domain {} not found", handshake.service, handshake.tls, handshake.domain);
33 return;
34 }
35 }
36 Err(e) => {
37 log::error!("read first pkt error: {}", e);
38 return;
39 }
40 };
41
42 let mut local_tunnel = match local_tunnel {
43 Ok(local_tunnel) => local_tunnel,
44 Err(e) => {
45 log::error!("create local_tunnel error: {}", e);
46 return;
47 }
48 };
49
50 log::info!("sub_connection pipe to local_tunnel start");
51
52 match copy_bidirectional(&mut incoming_proxy_conn, &mut local_tunnel).await {
53 Ok(res) => {
54 log::info!("sub_connection pipe to local_tunnel stop res {res:?}");
55 }
56 Err(e) => {
57 log::error!("sub_connection pipe to local_tunnel stop err {e:?}");
58 }
59 }
60}