use kameo::actor::ActorRef;
use netstack::{
HasChannel,
netcore::{Channel, NetstackControl},
};
use tokio::task::JoinSet;
use ts_forwarder::{
DirectDialer, Forwarder, HostExitDialer, ProxyExitDialer, RealDialer, RouteTable,
};
use ts_packet::PacketMut;
use crate::{
Error,
dataplane::{OverlayFromDataplane, OverlayToDataplane},
env::Env,
};
/// Actor that owns the forwarder's netstack command channel together with the
/// background tasks started in `on_start`.
pub struct ForwarderActor {
    /// Every task spawned during `on_start` lives in this set. The field is never
    /// read (hence the leading underscore); it is held only so the tasks stay owned
    /// by the actor. tokio documents that dropping a `JoinSet` aborts its tasks.
    _joinset: JoinSet<()>,
    /// Command channel taken from the netstack in `on_start`; `get_channel` hands
    /// out clones of it.
    channel: Channel,
}
fn spawn_forwarder<D: RealDialer>(
joinset: &mut JoinSet<()>,
channel: Channel,
routes: RouteTable,
dialer: D,
all_ports: bool,
tcp_ports: Vec<u16>,
udp_ports: Vec<u16>,
) {
let forwarder = match forwarder_mode(all_ports) {
ForwarderMode::AllPorts => Forwarder::all_ports(channel, routes, dialer),
ForwarderMode::Ports => Forwarder::new(channel, routes, dialer, tcp_ports, udp_ports),
};
joinset.spawn(async move {
if let Err(e) = forwarder.run().await {
tracing::error!(error = %e, "forwarder run loop exited");
}
});
}
/// Which dialer implementation the forwarder is started with.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DialerChoice {
    /// Selected whenever exit egress is disabled.
    Direct,
    /// Selected when exit egress is enabled and no exit proxy is configured.
    HostExit,
    /// Selected when exit egress is enabled and an exit proxy is configured.
    Proxy,
}

/// Maps the two egress settings onto a [`DialerChoice`].
///
/// * `forward_exit_egress == false` always yields `Direct`; `has_exit_proxy` is
///   ignored in that case.
/// * With exit egress enabled, `has_exit_proxy` decides between `Proxy` (true) and
///   `HostExit` (false).
fn dialer_choice(forward_exit_egress: bool, has_exit_proxy: bool) -> DialerChoice {
    // Without exit egress the proxy setting is irrelevant.
    if !forward_exit_egress {
        return DialerChoice::Direct;
    }

    if has_exit_proxy {
        DialerChoice::Proxy
    } else {
        DialerChoice::HostExit
    }
}
/// How the forwarder decides which ports it handles.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ForwarderMode {
    /// Chosen when `forward_all_ports` is set.
    AllPorts,
    /// Chosen when `forward_all_ports` is not set.
    Ports,
}

/// Translates the `forward_all_ports` flag into a [`ForwarderMode`]:
/// `true` gives `AllPorts`, `false` gives `Ports`.
fn forwarder_mode(forward_all_ports: bool) -> ForwarderMode {
    match forward_all_ports {
        true => ForwarderMode::AllPorts,
        false => ForwarderMode::Ports,
    }
}
/// Actor lifecycle for [`ForwarderActor`]: all wiring happens in `on_start`.
impl kameo::Actor for ForwarderActor {
    /// Startup arguments, in order: the environment/configuration, the netstack
    /// configuration, the sender used for packets leaving the netstack, and the
    /// receiver for packet batches headed into the netstack.
    type Args = (
        Env,
        netstack::netcore::Config,
        OverlayToDataplane,
        OverlayFromDataplane,
    );
    type Error = Error;
    /// Builds the piped netstack, spawns its run loop and the two packet pumps,
    /// enables any-IP on the stack, then starts the forwarder with the dialer
    /// selected by `dialer_choice`.
    ///
    /// # Errors
    ///
    /// Returns an `Error` of kind `ActorGone` when enabling any-IP fails; the
    /// underlying error is logged but not carried in the returned value.
    ///
    /// # Panics
    ///
    /// Panics if `dialer_choice` yields `Proxy` while `env.exit_proxy` is `None`.
    /// `dialer_choice` only returns `Proxy` when told a proxy exists, so this is an
    /// invariant check rather than an expected failure.
    async fn on_start(
        (env, config, netstack_up, mut netstack_down): Self::Args,
        _slf: ActorRef<Self>,
    ) -> Result<Self, Self::Error> {
        // `piped` yields the stack plus a pipe whose `rx` half produces packets
        // coming out of the stack and whose `tx` half accepts packets going in.
        let (
            mut netstack,
            netstack::WakingPipe {
                rx: mut netstack_down_rx,
                tx: netstack_down_tx,
            },
        ) = netstack::piped(config);
        // Take the command channel before the stack is moved into its task.
        let channel = netstack.command_channel();
        let mut joinset = JoinSet::new();
        // Task 1: the netstack's own run loop.
        joinset.spawn(async move {
            netstack.run_tokio().await;
        });
        // Task 2: pipe rx -> `netstack_up`. Each packet is copied into an owned
        // buffer and sent as a single-element batch; the pump stops when the pipe
        // ends or the send fails.
        joinset.spawn(async move {
            while let Some(buf) = netstack_down_rx.recv_async().await {
                if netstack_up.send(vec![buf.to_vec().into()]).is_err() {
                    break;
                }
            }
            tracing::warn!("forwarder netstack downlink shut down!");
        });
        // Task 3: `netstack_down` -> pipe tx. Every packet of every received batch
        // is written into the stack, one at a time, until the receiver ends.
        joinset.spawn(async move {
            while let Some(bufs) = netstack_down.recv().await {
                for buf in bufs {
                    // Explicit annotation pins the batch element type.
                    let buf: PacketMut = buf;
                    netstack_down_tx.send_async(buf.as_ref()).await;
                }
            }
            tracing::warn!("forwarder netstack uplink shut down!");
        });
        // NOTE(review): presumably the command channel is serviced by the run loop
        // spawned above, so this await has to come after that spawn — confirm.
        if let Err(e) = channel.set_any_ip(true).await {
            tracing::error!(error = %e, "enabling any-IP on forwarder netstack");
            // Returning here drops `joinset`; tokio documents that dropping a
            // `JoinSet` aborts the tasks it holds.
            return Err(Error {
                kind: crate::ErrorKind::ActorGone,
                message_ty: None,
                target_actor: None,
            });
        }
        let routes = RouteTable::new(env.forward_routes.iter().copied());
        let all_ports = env.forward_all_ports;
        // Owned copies of the port lists: they are moved into the forwarder below.
        let tcp_ports = env.forward_tcp_ports.as_ref().clone();
        let udp_ports = env.forward_udp_ports.as_ref().clone();
        // Counts are captured now because the vectors are moved before the log line.
        let n_routes = env.forward_routes.len();
        let n_tcp_ports = tcp_ports.len();
        let n_udp_ports = udp_ports.len();
        let choice = dialer_choice(env.forward_exit_egress, env.exit_proxy.is_some());
        // The three arms differ only in the dialer value/type handed to
        // `spawn_forwarder`; exactly one arm runs, so the moves do not conflict.
        match choice {
            DialerChoice::Proxy => {
                let proxy_config = env
                    .exit_proxy
                    .clone()
                    .expect("dialer_choice returned Proxy without an exit proxy configured");
                spawn_forwarder(
                    &mut joinset,
                    channel.clone(),
                    routes,
                    ProxyExitDialer::new(proxy_config),
                    all_ports,
                    tcp_ports,
                    udp_ports,
                );
            }
            DialerChoice::HostExit => spawn_forwarder(
                &mut joinset,
                channel.clone(),
                routes,
                HostExitDialer,
                all_ports,
                tcp_ports,
                udp_ports,
            ),
            DialerChoice::Direct => spawn_forwarder(
                &mut joinset,
                channel.clone(),
                routes,
                DirectDialer,
                all_ports,
                tcp_ports,
                udp_ports,
            ),
        }
        tracing::debug!(
            n_routes,
            n_tcp_ports,
            n_udp_ports,
            all_ports,
            exit_egress = env.forward_exit_egress,
            exit_proxy = env.exit_proxy.is_some(),
            dialer = ?choice,
            "forwarder started"
        );
        // The actor keeps the task set and the command channel.
        Ok(Self {
            _joinset: joinset,
            channel,
        })
    }
}
#[kameo::messages]
impl ForwarderActor {
    /// Hands out a clone of the netstack command channel held by this actor.
    ///
    /// The reply is a one-element tuple wrapping the cloned channel.
    #[message]
    pub fn get_channel(&self) -> (Channel,) {
        let handle = self.channel.clone();
        (handle,)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::env::ForwarderConfig;

    /// Builds a `ForwarderConfig` with every option off or empty except the two
    /// flags under test.
    ///
    /// Mind the argument order: `(forward_all_ports, forward_exit_egress)`.
    fn cfg(forward_all_ports: bool, forward_exit_egress: bool) -> ForwarderConfig {
        ForwarderConfig {
            forward_all_ports,
            forward_exit_egress,
            accept_routes: false,
            block_incoming: false,
            enable_ipv6: false,
            exit_node: None,
            exit_proxy: None,
            peerapi_port: None,
            taildrop_dir: None,
            persistent_keepalive_interval: None,
            forward_routes: Vec::new(),
            forward_tcp_ports: Vec::new(),
            forward_udp_ports: Vec::new(),
            ingress_active: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
        }
    }

    /// A SOCKS5 proxy configuration without credentials.
    fn proxy_cfg() -> ts_forwarder::ProxyConfig {
        let addr = "203.0.113.7:1080".parse().unwrap();
        ts_forwarder::ProxyConfig {
            addr,
            scheme: ts_forwarder::ProxyScheme::Socks5,
            auth: None,
        }
    }

    /// With no proxy configured, only `forward_exit_egress` decides between
    /// `Direct` and `HostExit`; `forward_all_ports` has no influence.
    #[test]
    fn host_exit_dialer_iff_forward_exit_egress() {
        // (forward_all_ports, forward_exit_egress, expected dialer)
        let table = [
            (false, false, DialerChoice::Direct),
            (false, true, DialerChoice::HostExit),
            (true, false, DialerChoice::Direct),
            (true, true, DialerChoice::HostExit),
        ];
        for (all_ports, exit_egress, expected) in table {
            let config = cfg(all_ports, exit_egress);
            assert_eq!(dialer_choice(config.forward_exit_egress, false), expected);
        }
    }

    /// `Proxy` requires both exit egress and a configured proxy.
    #[test]
    fn proxy_dialer_iff_exit_egress_and_proxy_configured() {
        // (forward_exit_egress, has_exit_proxy, expected dialer)
        let table = [
            (true, true, DialerChoice::Proxy),
            (false, true, DialerChoice::Direct),
            (true, false, DialerChoice::HostExit),
        ];
        for (exit_egress, has_proxy, expected) in table {
            let config = cfg(false, exit_egress);
            assert_eq!(
                dialer_choice(config.forward_exit_egress, has_proxy),
                expected
            );
        }
    }

    /// A proxy set on the control config survives conversion into
    /// `ForwarderConfig` and leads to the `Proxy` dialer.
    #[test]
    fn exit_proxy_converts_through_control_config() {
        let upstream = ts_control::ExitProxyConfig {
            addr: "198.51.100.9:8080".parse().unwrap(),
            scheme: ts_control::ExitProxyScheme::HttpConnect,
            auth: Some(("user".to_owned(), "pass".to_owned())),
        };
        let control = ts_control::Config {
            forward_exit_egress: true,
            exit_proxy: Some(upstream),
            ..Default::default()
        };

        let fwd = ForwarderConfig::from_control_config(&control);
        let picked = dialer_choice(fwd.forward_exit_egress, fwd.exit_proxy.is_some());
        assert_eq!(picked, DialerChoice::Proxy);

        let proxy = fwd.exit_proxy.expect("proxy threaded through");
        assert_eq!(proxy.addr, "198.51.100.9:8080".parse().unwrap());
        assert_eq!(proxy.scheme, ts_forwarder::ProxyScheme::HttpConnect);
        let expected_auth = Some(("user".to_owned(), "pass".to_owned()));
        assert_eq!(proxy.auth, expected_auth);
    }

    /// A default control config converts to a `ForwarderConfig` without a proxy.
    #[test]
    fn exit_proxy_absent_when_unconfigured() {
        let fwd = ForwarderConfig::from_control_config(&ts_control::Config::default());
        assert!(fwd.exit_proxy.is_none());

        // Also exercises the `proxy_cfg` helper.
        let cfg = proxy_cfg();
        assert_eq!(cfg.scheme, ts_forwarder::ProxyScheme::Socks5);
    }

    /// Only `forward_all_ports` selects the forwarder mode; the exit-egress flag
    /// has no influence.
    #[test]
    fn all_ports_mode_iff_forward_all_ports() {
        // (forward_all_ports, forward_exit_egress, expected mode)
        let table = [
            (false, false, ForwarderMode::Ports),
            (true, false, ForwarderMode::AllPorts),
            (false, true, ForwarderMode::Ports),
            (true, true, ForwarderMode::AllPorts),
        ];
        for (all_ports, exit_egress, expected) in table {
            let config = cfg(all_ports, exit_egress);
            assert_eq!(forwarder_mode(config.forward_all_ports), expected);
        }
    }
}