1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
//! Plugin (SIP003)
//!
//! ```plain
//! +------------+                    +---------------------------+
//! |  SS Client +-- Local Loopback --+  Plugin Client (Tunnel)   +--+
//! +------------+                    +---------------------------+  |
//!                                                                  |
//!             Public Internet (Obfuscated/Transformed traffic) ==> |
//!                                                                  |
//! +------------+                    +---------------------------+  |
//! |  SS Server +-- Local Loopback --+  Plugin Server (Tunnel)   +--+
//! +------------+                    +---------------------------+
//! ```

use crate::config::{Config, ServerAddr};
use futures::{stream::futures_unordered, Future, Stream};
use log::{error, info};
use std::{
    io,
    net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener},
};
use tokio_process::{Child, CommandExt};

mod obfs_proxy;
mod ss_plugin;

/// Config for plugin
#[derive(Debug, Clone)]
pub struct PluginConfig {
    pub plugin: String,
    pub plugin_opt: Option<String>,
}

/// Mode of Plugin
#[derive(Debug, Clone, Copy)]
pub enum PluginMode {
    Server,
    Client,
}

/// Launch plugins in config. Returns a future that completes when any plugin terminates
/// or there were an error in watching the subprocess. Returns `None` if no plugins
/// were launched.
pub fn launch_plugins(
    config: &mut Config,
    mode: PluginMode,
) -> io::Result<Option<impl Future<Item = (), Error = io::Error>>> {
    let mut plugins = Vec::new();

    for svr in &mut config.server {
        let mut svr_addr_opt = None;

        if let Some(c) = svr.plugin() {
            let loop_ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
            let local_addr = SocketAddr::new(loop_ip, get_local_port()?);

            let svr_addr = match start_plugin(c, svr.addr(), &local_addr, mode) {
                Err(err) => {
                    error!("Failed to start plugin \"{}\", err: {}", c.plugin, err);
                    return Err(err);
                }
                Ok(process) => {
                    let svr_addr = ServerAddr::SocketAddr(local_addr);
                    plugins.push(process);

                    // Replace addr with plugin
                    svr_addr
                }
            };

            match mode {
                PluginMode::Client => info!("Started plugin \"{}\" on {} <-> {}", c.plugin, local_addr, svr.addr()),
                PluginMode::Server => info!("Started plugin \"{}\" on {} <-> {}", c.plugin, svr.addr(), local_addr),
            }

            svr_addr_opt = Some(svr_addr); // Fuck borrow checker
        }

        if let Some(svr_addr) = svr_addr_opt {
            svr.set_plugin_addr(svr_addr);
        }
    }

    if plugins.is_empty() {
        Ok(None)
    } else {
        // Turn the vector of `Child` futures into a single future that
        // completes with an error if any of them exits or waiting for it
        // fails. When this future completes, the remaining `Child`ren will be
        // dropped and as a result the rest of the plugins will be killed
        // automatically.
        let plugins_future =
            futures_unordered(plugins)
                .into_future()
                .then(|first_plugin_result| match first_plugin_result {
                    Ok((first_plugin_exit_status, _)) => {
                        let msg = format!("Plugin exited unexpectedly with {}", first_plugin_exit_status.unwrap());
                        Err(io::Error::new(io::ErrorKind::Other, msg))
                    }
                    Err((first_plugin_error, _)) => {
                        error!("Error while waiting for plugin subprocess: {}", first_plugin_error);
                        Err(first_plugin_error)
                    }
                });
        Ok(Some(plugins_future))
    }
}

fn start_plugin(plugin: &PluginConfig, remote: &ServerAddr, local: &SocketAddr, mode: PluginMode) -> io::Result<Child> {
    let mut cmd = if plugin.plugin == "obfsproxy" {
        obfs_proxy::plugin_cmd(plugin, remote, local, mode)
    } else {
        ss_plugin::plugin_cmd(plugin, remote, local, mode)
    };
    cmd.spawn_async()
}

fn get_local_port() -> io::Result<u16> {
    let listener = TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0))?;
    let addr = listener.local_addr()?;
    Ok(addr.port())
}

#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn generate_random_port() {
        let port = get_local_port().unwrap();
        println!("{:?}", port);
    }
}