use tokio::io::AsyncReadExt;
use tokio::sync::Mutex;
use crate::stream::reader::ProxyReader;
use crate::stream::writer::ProxyWriter;
use crate::{ErrorKind, GetRequestOpts, Proxy, ProxyError, ProxyResult};
pub struct ProxyStream {
proxy: Proxy,
reader: Mutex<Option<ProxyReader>>,
writer: Mutex<Option<ProxyWriter>>,
}
impl ProxyStream {
pub fn new(proxy: Proxy) -> Self {
Self {
proxy,
reader: Mutex::new(None),
writer: Mutex::new(None),
}
}
pub async fn connect(&self, target_host: impl Into<String>, target_port: u16) -> ProxyResult<()> {
let stream = self.proxy.connect(target_host, target_port).await?;
let (rh, wh) = stream.into_split();
*self.reader.lock().await = Some(ProxyReader { read_stream: rh });
*self.writer.lock().await = Some(ProxyWriter { write_stream: wh });
Ok(())
}
pub async fn shutdown(&self) -> ProxyResult<()> {
if let Some(writer) = self.writer.lock().await.as_mut() {
writer.shutdown().await?;
}
*self.reader.lock().await = None;
*self.writer.lock().await = None;
Ok(())
}
pub async fn read(&self, buffer: impl Into<&mut [u8]>) -> ProxyResult<usize> {
if let Some(reader) = self.reader.lock().await.as_mut() {
Ok(reader.read(buffer).await?)
} else {
Err(ProxyError::new(ErrorKind::StreamError, "reader is not initialized"))
}
}
pub async fn write(&self, buffer: impl Into<&[u8]>) -> ProxyResult<()> {
if let Some(writer) = self.writer.lock().await.as_mut() {
Ok(writer.write(buffer).await?)
} else {
Err(ProxyError::new(ErrorKind::StreamError, "writer is not initialized"))
}
}
pub async fn get_request(&self, host: impl Into<String>, options: GetRequestOpts) -> ProxyResult<String> {
if let Some(writer) = self.writer.lock().await.as_mut() {
let req = format!("GET / HTTP/1.0\r\nHost: {}\r\n\r\n{}", host.into(), options.to_request());
writer.write(req.as_bytes()).await?;
} else {
return Err(ProxyError::new(ErrorKind::StreamError, "writer is not initialized"));
}
if let Some(reader) = self.reader.lock().await.as_mut() {
let mut resp = Vec::new();
reader.read_stream.read_to_end(&mut resp).await?;
Ok(String::from_utf8_lossy(&resp).to_string())
} else {
Err(ProxyError::new(ErrorKind::StreamError, "reader is not initialized"))
}
}
}
#[cfg(test)]
mod tests {
use crate::{GetRequestOpts, Proxy, ProxyResult, ProxyStream};
#[tokio::test]
async fn test_proxy_stream() -> ProxyResult<()> {
let proxy = Proxy::from("socks4://68.71.242.118:4145");
let stream = ProxyStream::new(proxy);
stream.connect("ipinfo.io", 80).await?;
let options = GetRequestOpts {
user_agent: Some(
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36".to_string(),
),
..Default::default()
};
println!("{}", options.to_request());
let resp = stream.get_request("ipinfo.io", options).await?;
println!("Ответ: {}", resp);
Ok(())
}
}