1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
use crate::config::{Config, ServerAddr};
use futures::{stream::futures_unordered, Future, Stream};
use log::{error, info};
use std::{
io,
net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener},
};
use tokio_process::{Child, CommandExt};
mod obfs_proxy;
mod ss_plugin;
/// Settings describing a single plugin to launch.
#[derive(Clone, Debug)]
pub struct PluginConfig {
    /// Plugin name. `start_plugin` treats the exact value `"obfsproxy"`
    /// specially; the name is also what appears in the log messages.
    pub plugin: String,
    /// Optional option string for the plugin.
    // NOTE(review): its format is interpreted by the command builders in the
    // `obfs_proxy` / `ss_plugin` submodules, which are not visible here.
    pub plugin_opt: Option<String>,
}
/// Which side a plugin is being launched for.
///
/// The value is handed to the plugin command builders and also decides the
/// order in which the two addresses are printed in the "Started plugin" log
/// line.
#[derive(Clone, Copy, Debug)]
pub enum PluginMode {
    /// Plugin launched on the server side.
    Server,
    /// Plugin launched on the client side.
    Client,
}
pub fn launch_plugins(
config: &mut Config,
mode: PluginMode,
) -> io::Result<Option<impl Future<Item = (), Error = io::Error>>> {
let mut plugins = Vec::new();
for svr in &mut config.server {
let mut svr_addr_opt = None;
if let Some(c) = svr.plugin() {
let loop_ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
let local_addr = SocketAddr::new(loop_ip, get_local_port()?);
let svr_addr = match start_plugin(c, svr.addr(), &local_addr, mode) {
Err(err) => {
error!("Failed to start plugin \"{}\", err: {}", c.plugin, err);
return Err(err);
}
Ok(process) => {
let svr_addr = ServerAddr::SocketAddr(local_addr);
plugins.push(process);
svr_addr
}
};
match mode {
PluginMode::Client => info!("Started plugin \"{}\" on {} <-> {}", c.plugin, local_addr, svr.addr()),
PluginMode::Server => info!("Started plugin \"{}\" on {} <-> {}", c.plugin, svr.addr(), local_addr),
}
svr_addr_opt = Some(svr_addr);
}
if let Some(svr_addr) = svr_addr_opt {
svr.set_plugin_addr(svr_addr);
}
}
if plugins.is_empty() {
Ok(None)
} else {
let plugins_future =
futures_unordered(plugins)
.into_future()
.then(|first_plugin_result| match first_plugin_result {
Ok((first_plugin_exit_status, _)) => {
let msg = format!("Plugin exited unexpectedly with {}", first_plugin_exit_status.unwrap());
Err(io::Error::new(io::ErrorKind::Other, msg))
}
Err((first_plugin_error, _)) => {
error!("Error while waiting for plugin subprocess: {}", first_plugin_error);
Err(first_plugin_error)
}
});
Ok(Some(plugins_future))
}
}
/// Builds the command line for `plugin` and spawns it asynchronously.
///
/// A plugin named exactly `"obfsproxy"` gets its command from
/// `obfs_proxy::plugin_cmd`; every other name goes through
/// `ss_plugin::plugin_cmd`. All four arguments are forwarded unchanged to the
/// chosen builder.
///
/// # Errors
///
/// Returns whatever `io::Error` `spawn_async` reports.
fn start_plugin(plugin: &PluginConfig, remote: &ServerAddr, local: &SocketAddr, mode: PluginMode) -> io::Result<Child> {
    let mut command = match plugin.plugin.as_str() {
        "obfsproxy" => obfs_proxy::plugin_cmd(plugin, remote, local, mode),
        _ => ss_plugin::plugin_cmd(plugin, remote, local, mode),
    };
    command.spawn_async()
}
/// Asks the OS for a TCP port by binding a throw-away listener to
/// `0.0.0.0:0` and reading back the port it was given.
///
/// The listener is dropped before this function returns, so the port is not
/// reserved for the caller.
// NOTE(review): another process could presumably claim the port between this
// call and the moment the plugin binds it.
///
/// # Errors
///
/// Propagates any `io::Error` from binding or from querying the local address.
fn get_local_port() -> io::Result<u16> {
    let wildcard = SocketAddr::from(([0, 0, 0, 0], 0));
    TcpListener::bind(wildcard)?
        .local_addr()
        .map(|bound| bound.port())
}
#[cfg(test)]
mod test {
    use super::*;

    /// Smoke test: obtaining a local port must succeed; the value is printed
    /// for manual inspection only.
    #[test]
    fn generate_random_port() {
        let picked = get_local_port().unwrap();
        println!("{:?}", picked);
    }
}