zeloxy 0.2.2

A library for creating lightweight, asynchronous, and lag-free proxy connections.
Documentation
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
use tokio::sync::Mutex;

use crate::stream::reader::ProxyReader;
use crate::stream::writer::ProxyWriter;
use crate::{ErrorKind, GetRequestOpts, Proxy, ProxyError, ProxyResult};

/// TCP-соединение с прокси
pub struct ProxyStream {
  proxy: Option<Proxy>,
  reader: Mutex<Option<ProxyReader>>,
  writer: Mutex<Option<ProxyWriter>>,
}

impl ProxyStream {
  /// Метод создания `ProxyStream` (данный метод не подключается к целевому
  /// серверу через прокси, для этого используется метод `ProxyStream::connect`)
  pub fn new(proxy: impl Into<Proxy>) -> Self {
    Self {
      proxy: Some(proxy.into()),
      reader: Mutex::new(None),
      writer: Mutex::new(None),
    }
  }

  /// Метод создания `ProxyStream` с ранее созданным `TcpStream`
  pub fn new_with_stream(stream: impl Into<TcpStream>) -> Self {
    let (rh, wh) = stream.into().into_split();

    Self {
      proxy: None,
      reader: Mutex::new(Some(ProxyReader { read_stream: rh })),
      writer: Mutex::new(Some(ProxyWriter { write_stream: wh })),
    }
  }

  /// Метод установки прокси
  pub fn set_proxy(&mut self, proxy: impl Into<Proxy>) {
    self.proxy = Some(proxy.into());
  }

  /// Метод подключения к целевому серверу через прокси
  pub async fn connect(&self, target_host: impl Into<String>, target_port: u16) -> ProxyResult<()> {
    if let Some(proxy) = &self.proxy {
      let stream = proxy.connect(target_host, target_port).await?;
      let (rh, wh) = stream.into_split();

      *self.reader.lock().await = Some(ProxyReader { read_stream: rh });
      *self.writer.lock().await = Some(ProxyWriter { write_stream: wh });

      Ok(())
    } else {
      Err(ProxyError::new(ErrorKind::InvalidData, "proxy not set"))
    }
  }

  /// Метод выключения текущего TCP-соединения
  pub async fn shutdown(&self) -> ProxyResult<()> {
    if let Some(writer) = self.writer.lock().await.as_mut() {
      writer.shutdown().await?;
    }

    *self.reader.lock().await = None;
    *self.writer.lock().await = None;

    Ok(())
  }

  /// Метод чтения данных из потока
  pub async fn read(&self, buffer: impl Into<&mut [u8]>) -> ProxyResult<usize> {
    if let Some(reader) = self.reader.lock().await.as_mut() {
      Ok(reader.read(buffer).await?)
    } else {
      Err(ProxyError::new(ErrorKind::StreamError, "reader is not initialized"))
    }
  }

  /// Метод записи данных в поток
  pub async fn write(&self, buffer: impl Into<&[u8]>) -> ProxyResult<()> {
    if let Some(writer) = self.writer.lock().await.as_mut() {
      Ok(writer.write(buffer).await?)
    } else {
      Err(ProxyError::new(ErrorKind::StreamError, "writer is not initialized"))
    }
  }

  /// Вспомогательный метод отправки команды GET-запроса
  pub async fn get_request(&self, host: impl Into<String>, options: GetRequestOpts) -> ProxyResult<String> {
    if let Some(writer) = self.writer.lock().await.as_mut() {
      let req = format!("GET / HTTP/1.0\r\nHost: {}\r\n\r\n{}", host.into(), options.to_request());
      writer.write(req.as_bytes()).await?;
    } else {
      return Err(ProxyError::new(ErrorKind::StreamError, "writer is not initialized"));
    }

    if let Some(reader) = self.reader.lock().await.as_mut() {
      let mut resp = Vec::new();
      reader.read_stream.read_to_end(&mut resp).await?;

      Ok(String::from_utf8_lossy(&resp).to_string())
    } else {
      Err(ProxyError::new(ErrorKind::StreamError, "reader is not initialized"))
    }
  }
}

impl From<TcpStream> for ProxyStream {
  fn from(value: TcpStream) -> Self {
    Self::new_with_stream(value)
  }
}