atm0s_reverse_proxy_agent/connection/
tcp.rs

1use std::net::ToSocketAddrs;
2
3use super::{Connection, SubConnection};
4use anyhow::anyhow;
5use futures::prelude::*;
6use protocol::key::AgentSigner;
7use serde::de::DeserializeOwned;
8use tokio::{
9    io::{AsyncReadExt, AsyncWriteExt},
10    net::TcpStream,
11};
12use tokio_yamux::{Session, StreamHandle};
13use url::Url;
14
15pub type TcpSubConnection = StreamHandle;
16
17impl SubConnection for StreamHandle {}
18
19pub struct TcpConnection<RES> {
20    response: RES,
21    session: Session<TcpStream>,
22}
23
24impl<RES: DeserializeOwned> TcpConnection<RES> {
25    pub async fn new<AS: AgentSigner<RES>>(url: Url, agent_signer: &AS) -> anyhow::Result<Self> {
26        let url_host = url.host_str().ok_or(anyhow!("couldn't get host from url"))?;
27        let url_port = url.port().unwrap_or(33333);
28        log::info!("connecting to server {}:{}", url_host, url_port);
29        let remote = (url_host, url_port).to_socket_addrs()?.next().ok_or(anyhow!("couldn't resolve to an address"))?;
30
31        let mut stream = TcpStream::connect(remote).await?;
32        stream.write_all(&agent_signer.sign_connect_req()).await?;
33
34        let mut buf = [0u8; 4096];
35        let buf_len = stream.read(&mut buf).await?;
36        let response: RES = agent_signer.validate_connect_res(&buf[..buf_len]).map_err(|e| anyhow!("{e}"))?;
37        Ok(Self {
38            session: Session::new_server(stream, Default::default()),
39            response,
40        })
41    }
42
43    pub fn response(&self) -> &RES {
44        &self.response
45    }
46}
47
48#[async_trait::async_trait]
49impl<RES: Send + Sync> Connection<TcpSubConnection> for TcpConnection<RES> {
50    async fn create_outgoing(&mut self) -> anyhow::Result<TcpSubConnection> {
51        let stream = self.session.open_stream()?;
52        Ok(stream)
53    }
54
55    async fn recv(&mut self) -> anyhow::Result<TcpSubConnection> {
56        let stream = self.session.next().await.ok_or(anyhow!("accept new connection error"))??;
57        Ok(stream)
58    }
59}