// atm0s_reverse_proxy_agent/connection/quic.rs

1use anyhow::anyhow;
2use helper::configure_client;
3use protocol::{key::AgentSigner, stream::TunnelStream};
4use rustls::pki_types::CertificateDer;
5use serde::de::DeserializeOwned;
6use std::net::ToSocketAddrs;
7use url::Url;
8
9use quinn::{Endpoint, RecvStream, SendStream};
10
11mod helper;
12mod no_servername_verify;
13
14use super::Connection;
15
/// Sub-connection carried over a single bidirectional QUIC stream: a
/// `TunnelStream` built from quinn's receive half and send half.
pub type QuicSubConnection = TunnelStream<RecvStream, SendStream>;
17
18pub struct QuicConnection<RES> {
19    response: RES,
20    connection: quinn::Connection,
21}
22
23impl<RES: DeserializeOwned> QuicConnection<RES> {
24    pub async fn new<AS: AgentSigner<RES>>(url: Url, agent_signer: &AS, server_certs: &[CertificateDer<'static>], allow_quic_insecure: bool) -> anyhow::Result<Self> {
25        let url_host = url.host_str().ok_or(anyhow!("InvalidUrl"))?;
26        let url_port = url.port().unwrap_or(33333);
27        log::info!("connecting to server {}:{}", url_host, url_port);
28        let remote = (url_host, url_port).to_socket_addrs()?.next().ok_or(anyhow!("DnsError"))?;
29
30        let mut endpoint = Endpoint::client("0.0.0.0:0".parse().expect("Should parse local addr"))?;
31        endpoint.set_default_client_config(configure_client(server_certs, allow_quic_insecure)?);
32
33        // connect to server
34        let connection = endpoint.connect(remote, url_host)?.await?;
35
36        log::info!("connected to {}, open bi stream", url);
37        let (mut send_stream, mut recv_stream) = connection.open_bi().await?;
38        log::info!("opened bi stream, send register request");
39
40        send_stream.write_all(&agent_signer.sign_connect_req()).await?;
41
42        let mut buf = [0u8; 4096];
43        let buf_len = recv_stream.read(&mut buf).await?.ok_or(anyhow!("NoData"))?;
44        let response: RES = agent_signer.validate_connect_res(&buf[..buf_len]).map_err(|e| anyhow!("validate connect rest error {e}"))?;
45        Ok(Self { connection, response })
46    }
47
48    pub fn response(&self) -> &RES {
49        &self.response
50    }
51}
52
53#[async_trait::async_trait]
54impl<RES: Send + Sync> Connection<QuicSubConnection> for QuicConnection<RES> {
55    async fn create_outgoing(&mut self) -> anyhow::Result<QuicSubConnection> {
56        let (send, recv) = self.connection.open_bi().await?;
57        Ok(QuicSubConnection::new(recv, send))
58    }
59
60    async fn recv(&mut self) -> anyhow::Result<QuicSubConnection> {
61        let (send, recv) = self.connection.accept_bi().await?;
62        Ok(QuicSubConnection::new(recv, send))
63    }
64}