atm0s_reverse_proxy_agent/connection/
tcp.rs1use std::net::ToSocketAddrs;
2
3use super::{Connection, SubConnection};
4use anyhow::anyhow;
5use futures::prelude::*;
6use protocol::key::AgentSigner;
7use serde::de::DeserializeOwned;
8use tokio::{
9 io::{AsyncReadExt, AsyncWriteExt},
10 net::TcpStream,
11};
12use tokio_yamux::{Session, StreamHandle};
13use url::Url;
14
15pub type TcpSubConnection = StreamHandle;
16
17impl SubConnection for StreamHandle {}
18
19pub struct TcpConnection<RES> {
20 response: RES,
21 session: Session<TcpStream>,
22}
23
24impl<RES: DeserializeOwned> TcpConnection<RES> {
25 pub async fn new<AS: AgentSigner<RES>>(url: Url, agent_signer: &AS) -> anyhow::Result<Self> {
26 let url_host = url.host_str().ok_or(anyhow!("couldn't get host from url"))?;
27 let url_port = url.port().unwrap_or(33333);
28 log::info!("connecting to server {}:{}", url_host, url_port);
29 let remote = (url_host, url_port).to_socket_addrs()?.next().ok_or(anyhow!("couldn't resolve to an address"))?;
30
31 let mut stream = TcpStream::connect(remote).await?;
32 stream.write_all(&agent_signer.sign_connect_req()).await?;
33
34 let mut buf = [0u8; 4096];
35 let buf_len = stream.read(&mut buf).await?;
36 let response: RES = agent_signer.validate_connect_res(&buf[..buf_len]).map_err(|e| anyhow!("{e}"))?;
37 Ok(Self {
38 session: Session::new_server(stream, Default::default()),
39 response,
40 })
41 }
42
43 pub fn response(&self) -> &RES {
44 &self.response
45 }
46}
47
48#[async_trait::async_trait]
49impl<RES: Send + Sync> Connection<TcpSubConnection> for TcpConnection<RES> {
50 async fn create_outgoing(&mut self) -> anyhow::Result<TcpSubConnection> {
51 let stream = self.session.open_stream()?;
52 Ok(stream)
53 }
54
55 async fn recv(&mut self) -> anyhow::Result<TcpSubConnection> {
56 let stream = self.session.next().await.ok_or(anyhow!("accept new connection error"))??;
57 Ok(stream)
58 }
59}