port-plumber 0.1.0

Utility bind ports with initialization commands
use std::fmt::Display;
use std::fs;
use std::future::Future;
use std::ops::Add;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use clap::Parser;
use futures::future::Either;
use futures::lock::Mutex;
use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};

use crate::args::PortPlumberArgs;
use crate::cmd_resource::CmdResource;
use crate::config::{PlumbingItemConfig, PortPlumberConfig};
use crate::connections_counter::ConnectionCounter;

mod config;
mod utils;
mod runner;
mod cmd_resource;
mod args;
mod connections_counter;

#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
    env_logger::init();
    let args: PortPlumberArgs = PortPlumberArgs::parse();

    let config_file_path = if let Some(path) = args.config {
        path
    } else {
        config_from_user_dir()?
    };

    let config_content = fs::read_to_string(config_file_path)?;
    let config: PortPlumberConfig = toml::from_str(&config_content)?;

    futures::future::try_join_all(config.plumbing.into_iter().map(|(addr, conf)| listen_address(addr, conf))).await?;
    Ok(())
}

fn config_from_user_dir() -> anyhow::Result<PathBuf> {
    let Some(config_base_path) = dirs::config_dir() else {
        anyhow::bail!("Could not find os config dir")
    };
    let config_file_path = config_base_path.join("portplumber/config.toml");
    if !config_file_path.is_file() {
        anyhow::bail!("No config file was foud at {config_file_path:?}");
    } else {
        Ok(config_file_path)
    }
}

async fn listen_address(addr: impl ToSocketAddrs + Display, conf: PlumbingItemConfig) -> anyhow::Result<()> {
    log::info!("Starting listener for address {addr}");
    let listener = TcpListener::bind(addr).await?;

    let counter = Arc::new(Mutex::new(ConnectionCounter::new()));
    let mut resource = CmdResource::try_from(conf.resource.as_ref())?;

    loop {
        let Some((stream, _)) = timeout(Duration::from_secs(30), listener.accept()).await? else {
            if let Some(ts) = counter.lock().await.no_connections_since() {
                if ts.add(Duration::from_secs(600)) < SystemTime::now() {
                    resource.ensure_stopped()?;
                }
            }
            continue;
        };
        {
            let mut counter_guard = counter.lock().await;
            counter_guard.add_connection();
        }
        resource.ensure_running().await?;
        let clouned_counter_mtx = counter.clone();
        tokio::spawn(async move {
            let res = redirect_stream(stream, conf.target).await;
            if let Err(err) = res {
                log::error!("Error processing stream - {err}");
            }
            let mut counter_guard = clouned_counter_mtx.lock().await;
            counter_guard.rem_connection();
        });
    }
}

async fn timeout<F, O, E>(duration: Duration, future: F) -> Result<Option<O>, E>
    where
        F: Future<Output=Result<O, E>>,
{
    tokio::time::timeout(duration, future).await.ok().transpose()
}

async fn redirect_stream(incoming: TcpStream, addr: impl ToSocketAddrs) -> anyhow::Result<()> {
    let outgoing = TcpStream::connect(addr).await?;
    let (mut in_reader, mut in_writer) = incoming.into_split();
    let (mut out_reader, mut out_writer) = outgoing.into_split();
    futures::future::try_select(
    Box::pin(tokio::io::copy(&mut in_reader, &mut out_writer)),
    Box::pin(tokio::io::copy(&mut out_reader, &mut in_writer)),
    )
        .await
        .map_err(|e| match e {
            Either::Left((err, _fut)) => err,
            Either::Right((err, _fut)) => err,
        })?;
    Ok(())
}