atm0s_reverse_proxy_agent/connection/
quic.rs1use anyhow::anyhow;
2use helper::configure_client;
3use protocol::{key::AgentSigner, stream::TunnelStream};
4use rustls::pki_types::CertificateDer;
5use serde::de::DeserializeOwned;
6use std::net::ToSocketAddrs;
7use url::Url;
8
9use quinn::{Endpoint, RecvStream, SendStream};
10
11mod helper;
12mod no_servername_verify;
13
14use super::Connection;
15
16pub type QuicSubConnection = TunnelStream<RecvStream, SendStream>;
17
18pub struct QuicConnection<RES> {
19 response: RES,
20 connection: quinn::Connection,
21}
22
23impl<RES: DeserializeOwned> QuicConnection<RES> {
24 pub async fn new<AS: AgentSigner<RES>>(url: Url, agent_signer: &AS, server_certs: &[CertificateDer<'static>], allow_quic_insecure: bool) -> anyhow::Result<Self> {
25 let url_host = url.host_str().ok_or(anyhow!("InvalidUrl"))?;
26 let url_port = url.port().unwrap_or(33333);
27 log::info!("connecting to server {}:{}", url_host, url_port);
28 let remote = (url_host, url_port).to_socket_addrs()?.next().ok_or(anyhow!("DnsError"))?;
29
30 let mut endpoint = Endpoint::client("0.0.0.0:0".parse().expect("Should parse local addr"))?;
31 endpoint.set_default_client_config(configure_client(server_certs, allow_quic_insecure)?);
32
33 let connection = endpoint.connect(remote, url_host)?.await?;
35
36 log::info!("connected to {}, open bi stream", url);
37 let (mut send_stream, mut recv_stream) = connection.open_bi().await?;
38 log::info!("opened bi stream, send register request");
39
40 send_stream.write_all(&agent_signer.sign_connect_req()).await?;
41
42 let mut buf = [0u8; 4096];
43 let buf_len = recv_stream.read(&mut buf).await?.ok_or(anyhow!("NoData"))?;
44 let response: RES = agent_signer.validate_connect_res(&buf[..buf_len]).map_err(|e| anyhow!("validate connect rest error {e}"))?;
45 Ok(Self { connection, response })
46 }
47
48 pub fn response(&self) -> &RES {
49 &self.response
50 }
51}
52
53#[async_trait::async_trait]
54impl<RES: Send + Sync> Connection<QuicSubConnection> for QuicConnection<RES> {
55 async fn create_outgoing(&mut self) -> anyhow::Result<QuicSubConnection> {
56 let (send, recv) = self.connection.open_bi().await?;
57 Ok(QuicSubConnection::new(recv, send))
58 }
59
60 async fn recv(&mut self) -> anyhow::Result<QuicSubConnection> {
61 let (send, recv) = self.connection.accept_bi().await?;
62 Ok(QuicSubConnection::new(recv, send))
63 }
64}