use anyhow::Result;
use bytes::Bytes;
use rand::Rng;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite};
use crate::buf as buf_pool;
use crate::relay::outbound::{self, OutboundContext};
use crate::relay::runtime::RelayRuntime;
pub async fn handle_stream<S>(
mut stream: S,
peer_addr: std::net::SocketAddr,
runtime: RelayRuntime,
) -> Result<()>
where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
let mut auth_id = [0u8; 16];
stream.read_exact(&mut auth_id).await?;
let upstream = {
let v = runtime.validator.read().await;
v.match_auth_id(&auth_id)
};
let upstream = match upstream {
Some(u) => u,
None => {
tracing::debug!("{} auth failed — draining and closing", peer_addr);
drain_and_close(stream).await;
return Ok(());
}
};
let initial_data = Bytes::copy_from_slice(&auth_id);
let outbound = outbound::from_transport(&upstream.transport);
let ctx = OutboundContext {
upstream,
initial_data,
peer: peer_addr,
runtime,
};
outbound.relay(Box::new(stream), ctx).await?;
Ok(())
}
async fn drain_and_close<S>(mut stream: S)
where
S: AsyncRead + AsyncWrite + Unpin,
{
let drain_len = rand::thread_rng().gen_range(64usize..512);
let mut buf = buf_pool::get(drain_len);
buf.resize(drain_len, 0);
let _ = tokio::time::timeout(std::time::Duration::from_secs(5), stream.read(&mut buf)).await;
buf_pool::put(buf);
}