dynamic_tcp_proxy 0.1.0

Dynamic proxy implementation in Rust. This crate is designed to allow you to start a proxy that can be reconfigured at runtime.
Documentation
mod config;
mod proxy_handler;

use std::io::Error;
use std::sync::mpsc::{channel, Receiver as StdReceiver, Sender as StdSender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

use tokio::runtime::Runtime;
use tokio::sync::mpsc::{self, Sender};

use lazy_static::lazy_static;
use proxy_handler::create_proxy;

pub use config::ProxyConfig;
use tokio::task::JoinHandle as TokioJoinHandle;

lazy_static! {
    static ref TARGET_PORT: Arc<Mutex<Option<u16>>> = Arc::new(Mutex::new(Default::default()));
}

fn get_target_port() -> u16 {
    let read_guard = TARGET_PORT.lock().expect("Cannot lock target port mutex");
    read_guard.unwrap()
}

fn set_target_port(target_port: u16) {
    let mut write_guard = TARGET_PORT.lock().expect("Cannot lock target port mutex");
    *write_guard = Some(target_port);
}

pub struct DynamicProxy;

impl DynamicProxy {
    pub fn start(self) -> Result<(StdSender<ProxyConfig>, JoinHandle<()>), Error> {
        let (update_tx, update_rx): (StdSender<ProxyConfig>, StdReceiver<ProxyConfig>) = channel();

        let handle = thread::Builder::new()
            .name("dynamic_proxy".to_string())
            .spawn(move || self.initiate_update_observer(update_rx))?;
        Ok((update_tx, handle))
    }

    fn initiate_update_observer(self, update_rx: StdReceiver<ProxyConfig>) {
        let mut running_proxy_thread: Option<TokioJoinHandle<()>> = None;
        let mut proxy_kill_tx: Option<Sender<()>> = None;

        let runtime = Runtime::new().unwrap();
        while let Ok(config) = update_rx.recv() {
            if config.off() && running_proxy_thread.is_some() {
                let curr_shudown_tx = proxy_kill_tx.clone();
                runtime.block_on(async move {
                    let kill_tx =
                        curr_shudown_tx.expect("Tx for shutting down current sever not found");
                    let _ = kill_tx.send(()).await;
                });

                running_proxy_thread = None;
            } else if config.on() {
                let forward_port = config
                    .forward_port()
                    .expect("Listening port not set before starting server");
                set_target_port(forward_port);

                if running_proxy_thread.is_none() {
                    let listen_port = config
                        .listen_port()
                        .expect("Listening port not set before starting server");

                    let (new_proxy_kill_tx, new_proxy_kill_rx) = mpsc::channel::<()>(1);
                    let handle = create_proxy(&runtime, listen_port, new_proxy_kill_rx);
                    running_proxy_thread = Some(handle);
                    proxy_kill_tx = Some(new_proxy_kill_tx);
                }
            }
        }

        if let Some(join_handle) = running_proxy_thread {
            runtime.block_on(async move {
                let _ = proxy_kill_tx
                    .expect("kill tx should be present")
                    .send(())
                    .await;
                let _ = join_handle.await;
            });
        }
    }
}