use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::io::{self, Read, Seek, SeekFrom, Write as IoWrite};
use std::net::{Ipv4Addr, SocketAddr, UdpSocket};
use std::os::unix::io::{AsRawFd, FromRawFd};
use std::process::Command as SysCmd;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
/// An IPv4 address paired with a CIDR prefix length, e.g. `172.19.0.0/24`.
///
/// `addr` is stored exactly as given and may have host bits set; use
/// `network()` for the masked network address. This is the `subnet` field
/// of `NetworkDef`, so it is (de)serialized as part of a network's
/// `config.json`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Ipv4Net {
/// Address as parsed or configured (host bits are not cleared).
pub addr: Ipv4Addr,
/// Prefix length. `from_cidr` rejects values above 32, but direct
/// construction or deserialization does not validate it.
pub prefix_len: u8,
}
impl Ipv4Net {
    /// Parses `ADDR/LEN` notation such as `"10.99.3.0/24"`.
    ///
    /// The address is kept verbatim (host bits are not cleared).
    ///
    /// # Errors
    /// Returns a human-readable message when the `/` separator is missing,
    /// the address or prefix does not parse, or the prefix exceeds 32.
    pub fn from_cidr(s: &str) -> Result<Self, String> {
        let (addr_s, len_s) = s
            .split_once('/')
            .ok_or_else(|| format!("invalid CIDR '{}': expected ADDR/LEN", s))?;
        let addr: Ipv4Addr = addr_s
            .parse()
            .map_err(|e| format!("invalid IP in '{}': {}", s, e))?;
        let prefix_len: u8 = len_s
            .parse()
            .map_err(|e| format!("invalid prefix in '{}': {}", s, e))?;
        if prefix_len > 32 {
            return Err(format!("prefix length {} > 32", prefix_len));
        }
        Ok(Ipv4Net { addr, prefix_len })
    }

    /// Netmask for a prefix length.
    ///
    /// Lengths above 32 can reach here via the public field or deserialized
    /// config, which `from_cidr` would have rejected. They are clamped to /32
    /// instead of underflowing `32 - len` and overflowing the shift.
    fn mask_for(prefix_len: u8) -> u32 {
        let host_bits = 32u32.saturating_sub(u32::from(prefix_len));
        // Shifting a u32 by 32 (prefix 0) is out of range: checked_shl gives
        // None, which is exactly the all-zero /0 mask.
        u32::MAX.checked_shl(host_bits).unwrap_or(0)
    }

    /// Netmask corresponding to `self.prefix_len`.
    fn mask(&self) -> u32 {
        Self::mask_for(self.prefix_len)
    }

    /// Network address: `addr` with all host bits cleared.
    pub fn network(&self) -> Ipv4Addr {
        Ipv4Addr::from(u32::from(self.addr) & self.mask())
    }

    /// Broadcast address: `addr` with all host bits set.
    pub fn broadcast(&self) -> Ipv4Addr {
        Ipv4Addr::from(u32::from(self.addr) | !self.mask())
    }

    /// First address after the network address (used as the bridge gateway).
    ///
    /// Wraps rather than overflowing for degenerate subnets such as
    /// `255.255.255.255/32`, so debug and release builds agree.
    pub fn gateway(&self) -> Ipv4Addr {
        Ipv4Addr::from(u32::from(self.network()).wrapping_add(1))
    }

    /// Lowest address handed to containers: network + 2, after the gateway.
    pub fn host_min(&self) -> Ipv4Addr {
        Ipv4Addr::from(u32::from(self.network()).wrapping_add(2))
    }

    /// Highest address handed to containers: broadcast - 1.
    ///
    /// Wraps rather than underflowing for `0.0.0.0/32`. For /31 and /32 the
    /// result is below `host_min()`, meaning no container addresses exist.
    pub fn host_max(&self) -> Ipv4Addr {
        Ipv4Addr::from(u32::from(self.broadcast()).wrapping_sub(1))
    }

    /// Whether `ip` falls inside this subnet.
    pub fn contains(&self, ip: Ipv4Addr) -> bool {
        let mask = self.mask();
        (u32::from(ip) & mask) == (u32::from(self.addr) & mask)
    }

    /// Whether the two subnets share any address.
    ///
    /// Two CIDR blocks overlap exactly when they agree on the bits of the
    /// shorter (wider) prefix.
    pub fn overlaps(&self, other: &Ipv4Net) -> bool {
        let mask = Self::mask_for(self.prefix_len.min(other.prefix_len));
        (u32::from(self.addr) & mask) == (u32::from(other.addr) & mask)
    }

    /// Canonical `NETWORK/LEN` string, with host bits cleared.
    pub fn cidr_string(&self) -> String {
        format!("{}/{}", self.network(), self.prefix_len)
    }

    /// `GATEWAY/LEN` string, suitable for `ip addr add` on the bridge.
    pub fn gateway_cidr(&self) -> String {
        format!("{}/{}", self.gateway(), self.prefix_len)
    }
}
impl std::fmt::Display for Ipv4Net {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/{}", self.addr, self.prefix_len)
}
}
/// Persistent definition of a named bridge network.
///
/// Stored as pretty-printed JSON in `<network_config_dir(name)>/config.json`
/// (see `NetworkDef::save` / `NetworkDef::load`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NetworkDef {
/// Network name; also keys the config and runtime directories.
pub name: String,
/// Subnet containers are allocated from.
pub subnet: Ipv4Net,
/// Default-route gateway given to containers. NOTE(review): the bridge
/// itself is addressed with `subnet.gateway_cidr()`, not this field, so the
/// two are expected to be equal (both creators in this file set them so).
pub gateway: Ipv4Addr,
/// Linux bridge interface that container veths are attached to.
pub bridge_name: String,
}
impl NetworkDef {
pub fn load(name: &str) -> io::Result<Self> {
let path = crate::paths::network_config_dir(name).join("config.json");
let data = std::fs::read_to_string(&path).map_err(|e| {
io::Error::other(format!(
"network '{}' not found ({}): {}",
name,
path.display(),
e
))
})?;
serde_json::from_str(&data).map_err(|e| io::Error::other(e.to_string()))
}
pub fn save(&self) -> io::Result<()> {
let dir = crate::paths::network_config_dir(&self.name);
std::fs::create_dir_all(&dir)?;
let json =
serde_json::to_string_pretty(self).map_err(|e| io::Error::other(e.to_string()))?;
std::fs::write(dir.join("config.json"), json)
}
pub fn nft_table_name(&self) -> String {
format!("pelagos-{}", self.name)
}
}
/// Returns the default `pelagos0` network, creating its config on first use.
///
/// If `config.json` already exists it is loaded unchanged. Otherwise a
/// 172.19.0.0/24 definition with gateway 172.19.0.1 and bridge `pelagos0` is
/// saved. Legacy runtime state from before per-network directories is then
/// migrated: `next_ip` moves into the network's runtime dir (a failed move
/// only logs a warning), and `nat_refcount` / `port_forwards` are deleted.
///
/// # Errors
/// Propagates load/save failures and failure to create the runtime dir.
pub fn bootstrap_default_network() -> io::Result<NetworkDef> {
    const DEFAULT: &str = "pelagos0";

    if crate::paths::network_config_dir(DEFAULT)
        .join("config.json")
        .exists()
    {
        return NetworkDef::load(DEFAULT);
    }

    let subnet = Ipv4Net {
        addr: Ipv4Addr::new(172, 19, 0, 0),
        prefix_len: 24,
    };
    let net = NetworkDef {
        name: DEFAULT.to_string(),
        subnet,
        gateway: Ipv4Addr::new(172, 19, 0, 1),
        bridge_name: DEFAULT.to_string(),
    };
    net.save()?;

    // One-time migration of pre-multi-network runtime state.
    let legacy_root = crate::paths::runtime_dir();
    let net_runtime = crate::paths::network_runtime_dir(DEFAULT);
    let legacy_ipam = legacy_root.join("next_ip");
    let ipam = net_runtime.join("next_ip");
    if legacy_ipam.exists() && !ipam.exists() {
        std::fs::create_dir_all(&net_runtime)?;
        if let Err(err) = std::fs::rename(&legacy_ipam, &ipam) {
            log::warn!(
                "migrate {} → {}: {}",
                legacy_ipam.display(),
                ipam.display(),
                err
            );
        }
    }
    for stale in ["nat_refcount", "port_forwards"] {
        let _ = std::fs::remove_file(legacy_root.join(stale));
    }
    Ok(net)
}
/// Loads a network definition by name.
///
/// `pelagos0` goes through `bootstrap_default_network`, so it is created on
/// first use; any other name must already have a saved config.
pub fn load_network_def(name: &str) -> io::Result<NetworkDef> {
    match name {
        "pelagos0" => bootstrap_default_network(),
        other => NetworkDef::load(other),
    }
}
pub fn ensure_network(name: &str) -> io::Result<()> {
let config = crate::paths::network_config_dir(name).join("config.json");
if config.exists() {
return Ok(());
}
let networks_dir = crate::paths::networks_config_dir();
let mut used: Vec<Ipv4Net> = Vec::new();
if networks_dir.exists() {
if let Ok(entries) = std::fs::read_dir(&networks_dir) {
for entry in entries.flatten() {
let cfg = entry.path().join("config.json");
if let Ok(data) = std::fs::read_to_string(&cfg) {
if let Ok(existing) = serde_json::from_str::<NetworkDef>(&data) {
used.push(existing.subnet);
}
}
}
}
}
for octet in 0u8..=255 {
let cidr = format!("10.99.{}.0/24", octet);
let subnet = Ipv4Net::from_cidr(&cidr)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e.to_string()))?;
if used.iter().any(|u| u.overlaps(&subnet)) {
continue;
}
let bridge_name = if name == "pelagos0" {
"pelagos0".to_string()
} else {
format!("rm-{}", name)
};
let net = NetworkDef {
name: name.to_string(),
gateway: subnet.gateway(),
bridge_name,
subnet,
};
net.save()?;
log::info!("ensure_network: created '{}' ({})", name, cidr);
return Ok(());
}
Err(io::Error::other(format!(
"ensure_network: all subnets in 10.99.x.0/24 exhausted for '{}'",
name
)))
}
/// Bridge interface name of the default network (same as `bootstrap_default_network`).
pub const BRIDGE_NAME: &str = "pelagos0";
/// Gateway address of the default network's 172.19.0.0/24 subnet.
pub const BRIDGE_GW: &str = "172.19.0.1";
/// Per-process sequence number that keeps `generate_ns_name` results unique.
static NS_COUNTER: AtomicU32 = AtomicU32::new(0);
/// Transport protocol(s) covered by a port forward.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PortProto {
    /// TCP only.
    Tcp,
    /// UDP only.
    Udp,
    /// Both TCP and UDP.
    Both,
}
impl PortProto {
    /// Lower-case token used in the persisted port-forwards file.
    fn as_str(self) -> &'static str {
        match self {
            Self::Tcp => "tcp",
            Self::Udp => "udp",
            Self::Both => "both",
        }
    }

    /// Inverse of `as_str`, ignoring surrounding whitespace.
    ///
    /// Matching is exact and case-sensitive; any unrecognized token
    /// (including an empty one) falls back to `Tcp`.
    pub fn parse(s: &str) -> Self {
        let token = s.trim();
        [Self::Udp, Self::Both]
            .into_iter()
            .find(|candidate| candidate.as_str() == token)
            .unwrap_or(Self::Tcp)
    }
}
/// How a container's network namespace is connected.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NetworkMode {
    None,
    Loopback,
    /// Attach to the default `pelagos0` bridge network.
    Bridge,
    /// Attach to the named bridge network.
    BridgeNamed(String),
    Pasta,
}
impl NetworkMode {
    /// True for both bridge variants.
    pub fn is_bridge(&self) -> bool {
        self.bridge_network_name().is_some()
    }

    /// Name of the bridge network to join, or `None` for non-bridge modes.
    pub fn bridge_network_name(&self) -> Option<&str> {
        if let NetworkMode::BridgeNamed(name) = self {
            return Some(name.as_str());
        }
        matches!(self, NetworkMode::Bridge).then_some("pelagos0")
    }
}
/// Network configuration requested for a container (currently just the mode).
#[derive(Debug, Clone)]
pub struct NetworkConfig {
/// Selected networking mode.
pub mode: NetworkMode,
}
/// Live state of one bridge-network attachment.
///
/// Produced by `setup_bridge_network` (primary) or `attach_network_to_netns`
/// (secondary) and released by `teardown_network` /
/// `teardown_secondary_network` respectively.
pub struct NetworkSetup {
/// Host-side veth interface; removed with `ip link del` on teardown.
pub veth_host: String,
/// Network namespace name (looked up under `/run/netns`, see `netns_exists`).
pub ns_name: String,
/// Address handed out by `allocate_ip`.
pub container_ip: Ipv4Addr,
/// Whether this attachment registered in the network's NAT active set.
pub nat_enabled: bool,
/// `(host_port, container_port, protocol)` forwards installed for this container.
pub port_forwards: Vec<(u16, u16, PortProto)>,
/// Tokio runtime running the userspace TCP proxies, if any TCP forwards exist.
proxy_tcp_runtime: Option<tokio::runtime::Runtime>,
/// Stop flag shared with the UDP proxy threads, if any UDP forwards exist.
proxy_udp_stop: Option<Arc<AtomicBool>>,
/// Join handles of the UDP proxy threads.
proxy_udp_threads: Vec<std::thread::JoinHandle<()>>,
/// Name of the `NetworkDef` this attachment belongs to.
pub network_name: String,
}
impl std::fmt::Debug for NetworkSetup {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NetworkSetup")
.field("veth_host", &self.veth_host)
.field("ns_name", &self.ns_name)
.field("container_ip", &self.container_ip)
.field("nat_enabled", &self.nat_enabled)
.field("port_forwards", &self.port_forwards)
.field("proxy_tcp_active", &self.proxy_tcp_runtime.is_some())
.field("proxy_udp_active", &self.proxy_udp_stop.is_some())
.field("proxy_udp_threads", &self.proxy_udp_threads.len())
.field("network_name", &self.network_name)
.finish()
}
}
/// Handle to a running `pasta` user-mode networking helper (see `setup_pasta_network`).
pub struct PastaSetup {
/// The `pasta --foreground` child process.
process: std::process::Child,
/// Thread collecting pasta's combined stdout/stderr until the pipe closes.
output_thread: Option<std::thread::JoinHandle<String>>,
/// Bind mount of the container's netns under `/run/pelagos/pasta-ns`,
/// created only when running as root.
ns_bind_mount: Option<std::path::PathBuf>,
}
pub fn generate_ns_name() -> String {
let pid = unsafe { libc::getpid() };
let n = NS_COUNTER.fetch_add(1, Ordering::Relaxed);
format!("rem-{}-{}", pid, n)
}
/// Sets `IFF_UP` on the `lo` interface of the network namespace the caller
/// is currently in.
///
/// It reads the interface flags with `SIOCGIFFLAGS` and writes them back
/// with `IFF_UP` added via `SIOCSIFFLAGS`, using a raw ioctl instead of
/// spawning `ip`. No heap allocation is performed.
///
/// # Errors
/// Returns the OS error from `socket` or either `ioctl`.
pub fn bring_up_loopback() -> io::Result<()> {
    /// Layout-compatible prefix of Linux `struct ifreq` (40 bytes:
    /// 16-byte name, then a union of which only the flags short is used).
    #[repr(C)]
    struct Ifreq {
        ifr_name: [u8; 16],
        ifr_flags: libc::c_short,
        _pad: [u8; 22],
    }
    let mut req = Ifreq {
        ifr_name: [0u8; 16],
        ifr_flags: 0,
        _pad: [0u8; 22],
    };
    // "lo" followed by the zero bytes already present = NUL-terminated name.
    req.ifr_name[..2].copy_from_slice(b"lo");

    // SOCK_CLOEXEC so the descriptor cannot leak into a child that another
    // thread execs before we close it.
    // SAFETY: plain socket(2) call with constant arguments.
    let sock = unsafe { libc::socket(libc::AF_INET, libc::SOCK_DGRAM | libc::SOCK_CLOEXEC, 0) };
    if sock < 0 {
        return Err(io::Error::last_os_error());
    }
    // SAFETY: `sock` is a valid descriptor owned here, and `req` is a live,
    // correctly sized ifreq for both the get and set ioctls.
    let result = unsafe {
        if libc::ioctl(sock, libc::SIOCGIFFLAGS as _, &mut req as *mut Ifreq) < 0 {
            Err(io::Error::last_os_error())
        } else {
            req.ifr_flags |= libc::IFF_UP as libc::c_short;
            if libc::ioctl(sock, libc::SIOCSIFFLAGS as _, &mut req as *mut Ifreq) < 0 {
                Err(io::Error::last_os_error())
            } else {
                Ok(())
            }
        }
    };
    // errno has already been captured above, so closing cannot clobber it.
    // SAFETY: `sock` is open and not used after this point.
    unsafe { libc::close(sock) };
    result
}
/// Creates (if needed), addresses, and brings up the network's bridge.
///
/// `ip link add` and `ip addr add` fail when the bridge or address already
/// exists, so their exit status and stderr are discarded. Only the final
/// `ip link set ... up` is allowed to fail the call.
fn ensure_bridge(net: &NetworkDef) -> io::Result<()> {
    let bridge = net.bridge_name.as_str();
    let gw_cidr = net.subnet.gateway_cidr();
    let idempotent_steps: [&[&str]; 2] = [
        &["link", "add", bridge, "type", "bridge"],
        &["addr", "add", gw_cidr.as_str(), "dev", bridge],
    ];
    for step in idempotent_steps {
        let _ = SysCmd::new("ip")
            .args(step)
            .stderr(std::process::Stdio::null())
            .status();
    }
    run("ip", &["link", "set", bridge, "up"])
}
/// Hands out the next container address for `net` from a round-robin counter.
///
/// The counter (the next address to hand out) lives in the network's IPAM
/// file and is read and rewritten under an exclusive `flock`, so concurrent
/// processes never take the same counter step. Addresses cycle through
/// `subnet.host_min()..=subnet.host_max()` and wrap around. The counter
/// does not track which addresses are still in use.
///
/// # Errors
/// Fails if the subnet has no allocatable addresses (/31, /32), or if the
/// IPAM file cannot be created, locked, read, or rewritten.
fn allocate_ip(net: &NetworkDef) -> io::Result<Ipv4Addr> {
    let host_min = u32::from(net.subnet.host_min());
    let host_max = u32::from(net.subnet.host_max());
    // Without this check a /31 or /32 would hand out network+2, which lies
    // outside the subnet.
    if host_min > host_max {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            format!(
                "network '{}': subnet {} has no allocatable container addresses",
                net.name,
                net.subnet.cidr_string()
            ),
        ));
    }

    let ipam_path = crate::paths::network_ipam_file(&net.name);
    if let Some(parent) = ipam_path.parent() {
        std::fs::create_dir_all(parent)?;
    }
    let mut file = std::fs::OpenOptions::new()
        .create(true)
        .truncate(false)
        .read(true)
        .write(true)
        .open(&ipam_path)?;
    // The lock is what makes allocation unique: retry if a signal
    // interrupts the wait, and fail rather than proceed unlocked otherwise.
    loop {
        // SAFETY: the descriptor belongs to `file`, which stays open here.
        if unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX) } == 0 {
            break;
        }
        let err = io::Error::last_os_error();
        if err.kind() != io::ErrorKind::Interrupted {
            return Err(err);
        }
    }

    let mut content = String::new();
    file.read_to_string(&mut content)?;
    // Missing, garbled, or out-of-range counters restart at host_min.
    let current = content
        .trim()
        .parse::<Ipv4Addr>()
        .map(u32::from)
        .unwrap_or(host_min);
    let ip_u32 = if (host_min..=host_max).contains(&current) {
        current
    } else {
        host_min
    };
    let next = if ip_u32 >= host_max { host_min } else { ip_u32 + 1 };

    file.seek(SeekFrom::Start(0))?;
    file.set_len(0)?;
    write!(file, "{}", Ipv4Addr::from(next))?;
    // The lock is released when `file` is closed on return.
    Ok(Ipv4Addr::from(ip_u32))
}
/// 32-bit FNV-1a hash of `input`: for each byte, XOR it into the state,
/// then multiply by the FNV prime (mod 2^32).
fn fnv1a(input: &[u8]) -> u32 {
    const OFFSET_BASIS: u32 = 0x811c_9dc5;
    const FNV_PRIME: u32 = 0x0100_0193;
    input.iter().fold(OFFSET_BASIS, |state, &byte| {
        (state ^ u32::from(byte)).wrapping_mul(FNV_PRIME)
    })
}
/// Deterministic `(host, peer)` veth names for a namespace.
///
/// Each name is `vh-`/`vp-` plus 8 hex digits of the FNV-1a hash of
/// `ns_name`: 11 characters, within the 15-character interface name limit.
fn veth_names_for(ns_name: &str) -> (String, String) {
    let digest = fnv1a(ns_name.as_bytes());
    let host_side = format!("vh-{:08x}", digest);
    let peer_side = format!("vp-{:08x}", digest);
    (host_side, peer_side)
}
fn veth_names_for_network(ns_name: &str, network_name: &str) -> (String, String) {
let input = format!("{}:{}", ns_name, network_name);
let hash = fnv1a(input.as_bytes());
(format!("vh-{:08x}", hash), format!("vp-{:08x}", hash))
}
pub fn setup_bridge_network(
ns_name: &str,
network_name: &str,
nat: bool,
port_forwards: Vec<(u16, u16, PortProto)>,
) -> io::Result<NetworkSetup> {
let net_def = load_network_def(network_name)?;
ensure_bridge(&net_def)?;
let container_ip = allocate_ip(&net_def)?;
let (veth_host, veth_peer) = veth_names_for(ns_name);
run("ip", &["netns", "add", ns_name])?;
run("ip", &["-n", ns_name, "link", "set", "lo", "up"])?;
run(
"ip",
&[
"link", "add", &veth_host, "type", "veth", "peer", "name", &veth_peer,
],
)?;
run("ip", &["link", "set", &veth_peer, "netns", ns_name])?;
let ip_cidr = format!("{}/{}", container_ip, net_def.subnet.prefix_len);
let gw_str = net_def.gateway.to_string();
run(
"ip",
&["-n", ns_name, "link", "set", &veth_peer, "name", "eth0"],
)?;
run(
"ip",
&["-n", ns_name, "addr", "add", &ip_cidr, "dev", "eth0"],
)?;
run("ip", &["-n", ns_name, "link", "set", "eth0", "up"])?;
run(
"ip",
&["-n", ns_name, "route", "add", "default", "via", &gw_str],
)?;
run(
"ip",
&["link", "set", &veth_host, "master", &net_def.bridge_name],
)?;
run("ip", &["link", "set", &veth_host, "up"])?;
if nat {
enable_nat(ns_name, &net_def)?;
}
if !port_forwards.is_empty() {
enable_port_forwards(ns_name, &net_def, container_ip, &port_forwards)?;
}
let (proxy_tcp_runtime, proxy_udp_stop, proxy_udp_threads) = if !port_forwards.is_empty() {
start_port_proxies(container_ip, &port_forwards)
} else {
(None, None, Vec::new())
};
Ok(NetworkSetup {
veth_host,
ns_name: ns_name.to_string(),
container_ip,
nat_enabled: nat,
port_forwards,
proxy_tcp_runtime,
proxy_udp_stop,
proxy_udp_threads,
network_name: network_name.to_string(),
})
}
pub fn teardown_network(mut setup: NetworkSetup) {
if let Some(rt) = setup.proxy_tcp_runtime.take() {
rt.shutdown_background();
}
if let Some(ref stop) = setup.proxy_udp_stop {
stop.store(true, Ordering::Relaxed);
}
for handle in setup.proxy_udp_threads.drain(..) {
let _ = handle.join();
}
if let Err(e) = run("ip", &["link", "del", &setup.veth_host]) {
log::warn!("network teardown veth (non-fatal): {}", e);
}
if let Err(e) = run("ip", &["netns", "del", &setup.ns_name]) {
log::warn!("network teardown netns (non-fatal): {}", e);
}
let net_def = match load_network_def(&setup.network_name) {
Ok(n) => n,
Err(e) => {
log::warn!(
"network teardown: cannot load network '{}': {}",
setup.network_name,
e
);
return;
}
};
if !setup.port_forwards.is_empty() {
disable_port_forwards(&setup.ns_name, &net_def);
}
if setup.nat_enabled {
disable_nat(&setup.ns_name, &net_def);
}
}
pub fn attach_network_to_netns(
ns_name: &str,
network_name: &str,
iface_name: &str,
) -> io::Result<NetworkSetup> {
let net_def = load_network_def(network_name)?;
ensure_bridge(&net_def)?;
let container_ip = allocate_ip(&net_def)?;
let (veth_host, veth_peer) = veth_names_for_network(ns_name, network_name);
run(
"ip",
&[
"link", "add", &veth_host, "type", "veth", "peer", "name", &veth_peer,
],
)?;
run("ip", &["link", "set", &veth_peer, "netns", ns_name])?;
let ip_cidr = format!("{}/{}", container_ip, net_def.subnet.prefix_len);
run(
"ip",
&["-n", ns_name, "link", "set", &veth_peer, "name", iface_name],
)?;
run(
"ip",
&["-n", ns_name, "addr", "add", &ip_cidr, "dev", iface_name],
)?;
run("ip", &["-n", ns_name, "link", "set", iface_name, "up"])?;
run(
"ip",
&["link", "set", &veth_host, "master", &net_def.bridge_name],
)?;
run("ip", &["link", "set", &veth_host, "up"])?;
Ok(NetworkSetup {
veth_host,
ns_name: ns_name.to_string(),
container_ip,
nat_enabled: false,
port_forwards: Vec::new(),
proxy_tcp_runtime: None,
proxy_udp_stop: None,
proxy_udp_threads: Vec::new(),
network_name: network_name.to_string(),
})
}
/// Removes a secondary attachment made by `attach_network_to_netns` by
/// deleting its host veth. Failure is logged, not returned.
pub fn teardown_secondary_network(setup: &NetworkSetup) {
    let outcome = run("ip", &["link", "del", setup.veth_host.as_str()]);
    if let Err(e) = outcome {
        log::warn!("secondary network teardown veth (non-fatal): {}", e);
    }
}
/// nft script that installs the network's NAT table.
///
/// It contains a postrouting masquerade for traffic from the subnet leaving
/// through any interface other than the bridge, plus a forward chain
/// accepting traffic to and from the subnet.
fn build_nat_script(net: &NetworkDef) -> String {
    let t = net.nft_table_name();
    let subnet = net.subnet.cidr_string();
    let iface = net.bridge_name.as_str();
    [
        format!("add table ip {}\n", t),
        format!(
            "add chain ip {} postrouting {{ type nat hook postrouting priority 100; }}\n",
            t
        ),
        format!(
            "add rule ip {} postrouting ip saddr {} oifname != \"{}\" masquerade\n",
            t, subnet, iface
        ),
        format!(
            "add chain ip {} forward {{ type filter hook forward priority 0; }}\n",
            t
        ),
        format!("add rule ip {} forward ip saddr {} accept\n", t, subnet),
        format!("add rule ip {} forward ip daddr {} accept\n", t, subnet),
    ]
    .concat()
}
/// Pipes `script` into `nft -f -` and waits for it, routing nft's stderr to `stderr`.
///
/// Stdin is taken and dropped before waiting, which sends EOF. The child is
/// always reaped, even if writing the script fails, so no zombie is left.
///
/// # Errors
/// Fails if nft cannot be spawned or exits unsuccessfully. If nft exits
/// successfully but the script could not be fully written, the write error
/// is returned.
fn run_nft_with_stderr(script: &str, stderr: std::process::Stdio) -> io::Result<()> {
    use std::process::Stdio as ProcStdio;
    let mut child = SysCmd::new("nft")
        .arg("-f")
        .arg("-")
        .stdin(ProcStdio::piped())
        .stdout(ProcStdio::null())
        .stderr(stderr)
        .spawn()?;
    let write_result = match child.stdin.take() {
        Some(mut stdin) => stdin.write_all(script.as_bytes()),
        None => Err(io::Error::other("nft stdin was not captured")),
    };
    let status = child.wait()?;
    if !status.success() {
        return Err(io::Error::other(format!("nft -f - exited with {}", status)));
    }
    write_result
}

/// Applies an nft script, letting nft's error output reach our stderr.
fn run_nft(script: &str) -> io::Result<()> {
    run_nft_with_stderr(script, std::process::Stdio::inherit())
}

/// Applies an nft script with nft's stderr discarded, for commands that are
/// expected to fail routinely (e.g. deleting a table that may not exist).
fn run_nft_quiet(script: &str) -> io::Result<()> {
    run_nft_with_stderr(script, std::process::Stdio::null())
}
/// Whether a named network namespace is registered at `/run/netns/<ns_name>`
/// (the location `ip netns add` uses). Any stat error counts as "absent".
fn netns_exists(ns_name: &str) -> bool {
    let bind_path = format!("/run/netns/{}", ns_name);
    std::fs::metadata(bind_path).is_ok()
}
/// Registers `ns_name` in the network's NAT active set, installing NAT on first use.
///
/// The active set is a newline-separated list of namespace names in the
/// network's NAT refcount file, rewritten under an exclusive `flock`.
/// Names whose namespace no longer exists are pruned. When the pruned set
/// is empty this caller is the first user, so it:
/// - enables IPv4 forwarding;
/// - removes the legacy `pelagos` table (default network only);
/// - installs `build_nat_script`;
/// - replaces any iptables FORWARD ACCEPT rules for the subnet with exactly
///   one pair (those insertions are best-effort).
///
/// # Errors
/// Fails if the refcount file cannot be opened, locked, read or rewritten,
/// if `ip_forward` cannot be set, or if nft rejects the NAT script. In the
/// error cases this namespace is not recorded.
fn enable_nat(ns_name: &str, net: &NetworkDef) -> io::Result<()> {
    let active_path = crate::paths::network_nat_refcount_file(&net.name);
    if let Some(parent) = active_path.parent() {
        std::fs::create_dir_all(parent)?;
    }
    let mut file = std::fs::OpenOptions::new()
        .create(true)
        .truncate(false)
        .read(true)
        .write(true)
        .open(&active_path)?;
    // Hold the lock for the whole read-modify-write. Retry if a signal
    // interrupts the wait; otherwise fail rather than race another process.
    loop {
        // SAFETY: the descriptor belongs to `file`, which stays open here.
        if unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX) } == 0 {
            break;
        }
        let err = io::Error::last_os_error();
        if err.kind() != io::ErrorKind::Interrupted {
            return Err(err);
        }
    }
    let mut content = String::new();
    file.read_to_string(&mut content)?;
    let mut active: Vec<String> = content
        .lines()
        .filter(|l| !l.trim().is_empty())
        .filter(|name| netns_exists(name))
        .map(|s| s.to_string())
        .collect();
    if active.is_empty() {
        std::fs::write("/proc/sys/net/ipv4/ip_forward", b"1\n")?;
        if net.name == "pelagos0" {
            let _ = run_nft_quiet("delete table ip pelagos\n");
        }
        let script = build_nat_script(net);
        run_nft(&script)?;
        let cidr = net.subnet.cidr_string();
        // Remove every existing copy first so exactly one pair remains.
        while run_quiet("iptables", &["-D", "FORWARD", "-s", &cidr, "-j", "ACCEPT"]).is_ok() {}
        while run_quiet("iptables", &["-D", "FORWARD", "-d", &cidr, "-j", "ACCEPT"]).is_ok() {}
        let _ = run("iptables", &["-I", "FORWARD", "-s", &cidr, "-j", "ACCEPT"]);
        let _ = run("iptables", &["-I", "FORWARD", "-d", &cidr, "-j", "ACCEPT"]);
    }
    // Re-enabling the same namespace must not add a duplicate entry.
    if !active.iter().any(|name| name == ns_name) {
        active.push(ns_name.to_string());
    }
    file.seek(SeekFrom::Start(0))?;
    file.set_len(0)?;
    write!(file, "{}", active.join("\n"))?;
    Ok(())
}
fn disable_nat(ns_name: &str, net: &NetworkDef) {
let active_path = crate::paths::network_nat_refcount_file(&net.name);
let file = std::fs::OpenOptions::new()
.create(true)
.truncate(false)
.read(true)
.write(true)
.open(&active_path);
let mut file = match file {
Ok(f) => f,
Err(e) => {
log::warn!("NAT active-set open (non-fatal): {}", e);
return;
}
};
unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX) };
let mut content = String::new();
if let Err(e) = file.read_to_string(&mut content) {
log::warn!("NAT active-set read (non-fatal): {}", e);
return;
}
let remaining: Vec<String> = content
.lines()
.filter(|l| !l.trim().is_empty())
.filter(|name| *name != ns_name) .filter(|name| netns_exists(name)) .map(|s| s.to_string())
.collect();
let table = net.nft_table_name();
let cidr = net.subnet.cidr_string();
if let Err(e) = file
.seek(SeekFrom::Start(0))
.and_then(|_| file.set_len(0))
.and_then(|_| {
write!(file, "{}", remaining.join("\n"))?;
Ok(())
})
{
log::warn!("NAT active-set write (non-fatal): {}", e);
}
drop(file);
if remaining.is_empty() {
let _ = run("iptables", &["-D", "FORWARD", "-s", &cidr, "-j", "ACCEPT"]);
let _ = run("iptables", &["-D", "FORWARD", "-d", &cidr, "-j", "ACCEPT"]);
if read_port_forwards_count(&net.name) == 0 {
if let Err(e) = run_nft_quiet(&format!("delete table ip {}\n", table)) {
log::warn!("nft delete table {} (non-fatal): {}", table, e);
}
} else {
let _ = run_nft(&format!("flush chain ip {} postrouting\n", table));
}
}
}
/// One persisted port-forward record:
/// `(namespace, container IP, host port, container port, protocol)`.
type PortForwardEntry = (String, Ipv4Addr, u16, u16, PortProto);

/// Parses one line of a network's port-forwards file.
///
/// Two layouts are accepted:
/// - current: `NS:IP:HOST_PORT:CONTAINER_PORT[:PROTO]`, as written by
///   `enable_port_forwards`;
/// - legacy: `IP:HOST_PORT:CONTAINER_PORT[:PROTO]`, recognized by a first
///   field that parses as an IPv4 address and returned with an empty
///   namespace.
///
/// A missing protocol defaults to TCP (see `PortProto::parse`). Returns
/// `None` when a required field is missing or malformed.
fn parse_port_forward_line(line: &str) -> Option<(String, Ipv4Addr, u16, u16, PortProto)> {
    let (head, _) = line.split_once(':')?;
    let legacy = head.parse::<Ipv4Addr>().is_ok();
    let mut fields = line.splitn(if legacy { 4 } else { 5 }, ':');
    let ns_name = if legacy {
        String::new()
    } else {
        fields.next()?.to_string()
    };
    let ip: Ipv4Addr = fields.next()?.parse().ok()?;
    let host_port: u16 = fields.next()?.parse().ok()?;
    let container_port: u16 = fields.next()?.parse().ok()?;
    let proto = fields.next().map_or(PortProto::Tcp, PortProto::parse);
    Some((ns_name, ip, host_port, container_port, proto))
}
fn read_port_forwards_locked(file: &mut std::fs::File) -> io::Result<Vec<PortForwardEntry>> {
file.seek(SeekFrom::Start(0))?;
let mut content = String::new();
file.read_to_string(&mut content)?;
let entries = content
.lines()
.filter(|l| !l.trim().is_empty())
.filter_map(parse_port_forward_line)
.collect();
Ok(entries)
}
fn read_port_forwards_count(network_name: &str) -> usize {
let content =
match std::fs::read_to_string(crate::paths::network_port_forwards_file(network_name)) {
Ok(c) => c,
Err(_) => return 0,
};
content
.lines()
.filter(|l| !l.trim().is_empty())
.filter_map(parse_port_forward_line)
.filter(|(ns_name, _, _, _, _)| !ns_name.is_empty() && netns_exists(ns_name))
.count()
}
/// Number of namespaces in a network's NAT active set that still exist.
///
/// The file is read without taking the lock, and an unreadable file counts
/// as zero.
fn read_nat_refcount(network_name: &str) -> u32 {
    std::fs::read_to_string(crate::paths::network_nat_refcount_file(network_name))
        .map(|content| {
            content
                .lines()
                .filter(|l| !l.trim().is_empty() && netns_exists(l))
                .count() as u32
        })
        .unwrap_or(0)
}
/// nft script that (re)creates the network's prerouting DNAT chain.
///
/// The chain is created if missing and flushed, then gets one rule per
/// forwarded protocol: `Both` yields a TCP and a UDP rule. Each entry is
/// `(container_ip, host_port, container_port, proto)`.
fn build_prerouting_script(
    net: &NetworkDef,
    entries: &[(Ipv4Addr, u16, u16, PortProto)],
) -> String {
    let table = net.nft_table_name();
    let mut script = format!(
        "add table ip {table}\n\
         add chain ip {table} prerouting {{ type nat hook prerouting priority -100; }}\n\
         flush chain ip {table} prerouting\n",
    );
    for &(ip, host_port, container_port, proto) in entries {
        let (tcp, udp) = match proto {
            PortProto::Tcp => (true, false),
            PortProto::Udp => (false, true),
            PortProto::Both => (true, true),
        };
        if tcp {
            script += &format!(
                "add rule ip {} prerouting tcp dport {} dnat to {}:{}\n",
                table, host_port, ip, container_port
            );
        }
        if udp {
            script += &format!(
                "add rule ip {} prerouting udp dport {} dnat to {}:{}\n",
                table, host_port, ip, container_port
            );
        }
    }
    script
}
/// Records `forwards` for `ns_name` in the network's port-forwards file and
/// rebuilds the prerouting DNAT chain from all live records.
///
/// The file is rewritten under an exclusive `flock`. Records of vanished
/// namespaces and legacy (namespace-less) records are dropped. If nft
/// rejects the script, the file already contains the new records; they are
/// applied on the next successful rebuild.
///
/// # Errors
/// Fails if the file cannot be opened, locked, read or rewritten, or if nft
/// rejects the rebuilt chain.
fn enable_port_forwards(
    ns_name: &str,
    net: &NetworkDef,
    container_ip: Ipv4Addr,
    forwards: &[(u16, u16, PortProto)],
) -> io::Result<()> {
    let pf_path = crate::paths::network_port_forwards_file(&net.name);
    if let Some(parent) = pf_path.parent() {
        std::fs::create_dir_all(parent)?;
    }
    let mut file = std::fs::OpenOptions::new()
        .create(true)
        .truncate(false)
        .read(true)
        .write(true)
        .open(&pf_path)?;
    // Retry on EINTR; any other lock failure aborts instead of racing.
    loop {
        // SAFETY: the descriptor belongs to `file`, which stays open here.
        if unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX) } == 0 {
            break;
        }
        let err = io::Error::last_os_error();
        if err.kind() != io::ErrorKind::Interrupted {
            return Err(err);
        }
    }
    let existing = read_port_forwards_locked(&mut file)?;
    let mut live: Vec<PortForwardEntry> = existing
        .into_iter()
        .filter(|(n, _, _, _, _)| !n.is_empty() && netns_exists(n))
        .collect();
    live.extend(forwards.iter().map(|&(host_port, container_port, proto)| {
        (
            ns_name.to_string(),
            container_ip,
            host_port,
            container_port,
            proto,
        )
    }));

    // Serialize once and write in a single call rather than one write per line.
    let serialized: String = live
        .iter()
        .map(|(ns, ip, hp, cp, proto)| format!("{}:{}:{}:{}:{}\n", ns, ip, hp, cp, proto.as_str()))
        .collect();
    file.seek(SeekFrom::Start(0))?;
    file.set_len(0)?;
    file.write_all(serialized.as_bytes())?;

    let nft_entries: Vec<(Ipv4Addr, u16, u16, PortProto)> = live
        .iter()
        .map(|(_, ip, hp, cp, proto)| (*ip, *hp, *cp, *proto))
        .collect();
    let script = build_prerouting_script(net, &nft_entries);
    run_nft(&script)
}
fn disable_port_forwards(ns_name: &str, net: &NetworkDef) {
let pf_path = crate::paths::network_port_forwards_file(&net.name);
let file = std::fs::OpenOptions::new()
.create(true)
.truncate(false)
.read(true)
.write(true)
.open(&pf_path);
let mut file = match file {
Ok(f) => f,
Err(e) => {
log::warn!("port forwards file open (non-fatal): {}", e);
return;
}
};
unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX) };
let entries = match read_port_forwards_locked(&mut file) {
Ok(e) => e,
Err(e) => {
log::warn!("port forwards read (non-fatal): {}", e);
return;
}
};
let remaining: Vec<PortForwardEntry> = entries
.into_iter()
.filter(|(n, _, _, _, _)| !n.is_empty() && n != ns_name && netns_exists(n))
.collect();
if let Err(e) = file.seek(SeekFrom::Start(0)).and_then(|_| file.set_len(0)) {
log::warn!("port forwards file truncate (non-fatal): {}", e);
return;
}
for (ns, ip, hp, cp, proto) in &remaining {
let _ = writeln!(file, "{}:{}:{}:{}:{}", ns, ip, hp, cp, proto.as_str());
}
drop(file);
let nft_remaining: Vec<(Ipv4Addr, u16, u16, PortProto)> = remaining
.iter()
.map(|(_, ip, hp, cp, proto)| (*ip, *hp, *cp, *proto))
.collect();
let table = net.nft_table_name();
if nft_remaining.is_empty() {
if read_nat_refcount(&net.name) == 0 {
if let Err(e) = run_nft_quiet(&format!("delete table ip {}\n", table)) {
log::warn!("nft delete table {} (non-fatal): {}", table, e);
}
} else {
let _ = run_nft(&format!("flush chain ip {} prerouting\n", table));
}
} else {
let script = build_prerouting_script(net, &nft_remaining);
if let Err(e) = run_nft(&script) {
log::warn!("nft rebuild prerouting (non-fatal): {}", e);
}
}
}
/// Starts the userspace proxies for `forwards`.
///
/// TCP forwards share one Tokio runtime. Each UDP forward gets its own OS
/// thread, and all UDP threads share a single stop flag. Returns
/// `(tcp_runtime, udp_stop_flag, udp_threads)`; each part is empty/`None`
/// when no forward uses that protocol.
fn start_port_proxies(
    container_ip: Ipv4Addr,
    forwards: &[(u16, u16, PortProto)],
) -> (
    Option<tokio::runtime::Runtime>,
    Option<Arc<AtomicBool>>,
    Vec<std::thread::JoinHandle<()>>,
) {
    let mut tcp_forwards = Vec::new();
    let mut udp_forwards = Vec::new();
    for &(host, container, proto) in forwards {
        if matches!(proto, PortProto::Tcp | PortProto::Both) {
            tcp_forwards.push((host, container));
        }
        if matches!(proto, PortProto::Udp | PortProto::Both) {
            udp_forwards.push((host, container));
        }
    }

    let tcp_runtime = (!tcp_forwards.is_empty())
        .then(|| start_tcp_proxies_async(container_ip, &tcp_forwards));

    if udp_forwards.is_empty() {
        return (tcp_runtime, None, Vec::new());
    }
    let stop = Arc::new(AtomicBool::new(false));
    let udp_threads = udp_forwards
        .into_iter()
        .map(|(host_port, container_port)| {
            let stop_flag = Arc::clone(&stop);
            std::thread::spawn(move || {
                start_udp_proxy(host_port, container_ip, container_port, stop_flag);
            })
        })
        .collect();
    (tcp_runtime, Some(stop), udp_threads)
}
/// Builds a multi-threaded Tokio runtime (at most 4 workers, 1 if the CPU
/// count is unknown) and spawns one accept loop per `(host_port,
/// container_port)` pair.
///
/// # Panics
/// Panics if the runtime cannot be built.
fn start_tcp_proxies_async(
    container_ip: Ipv4Addr,
    tcp_forwards: &[(u16, u16)],
) -> tokio::runtime::Runtime {
    let worker_count = match std::thread::available_parallelism() {
        Ok(n) => n.get().min(4),
        Err(_) => 1,
    };
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(worker_count)
        .enable_io()
        .enable_time()
        .thread_name("pelagos-tcp-proxy")
        .build()
        .expect("tokio tcp proxy runtime");
    tcp_forwards
        .iter()
        .map(|&(host_port, container_port)| {
            (host_port, SocketAddr::from((container_ip, container_port)))
        })
        .for_each(|(host_port, target)| {
            runtime.spawn(tcp_accept_loop(host_port, target));
        });
    runtime
}
async fn tcp_accept_loop(host_port: u16, target: SocketAddr) {
let listener =
match tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], host_port))).await {
Ok(l) => l,
Err(e) => {
log::warn!(
"tcp proxy: cannot bind 0.0.0.0:{}: {} (nftables DNAT still active)",
host_port,
e
);
return;
}
};
log::debug!("tcp proxy: 0.0.0.0:{} -> {}", host_port, target);
loop {
match listener.accept().await {
Ok((stream, _addr)) => {
tokio::spawn(tcp_relay(stream, target));
}
Err(e) => {
log::warn!("tcp proxy accept error on port {}: {}", host_port, e);
break;
}
}
}
}
async fn tcp_relay(mut client: tokio::net::TcpStream, target: SocketAddr) {
let mut upstream = match tokio::time::timeout(
std::time::Duration::from_secs(5),
tokio::net::TcpStream::connect(target),
)
.await
{
Ok(Ok(s)) => s,
Ok(Err(e)) => {
log::debug!("tcp proxy: cannot connect to {}: {}", target, e);
return;
}
Err(_) => {
log::debug!("tcp proxy: connect to {} timed out", target);
return;
}
};
if let Err(e) = tokio::io::copy_bidirectional(&mut client, &mut upstream).await {
log::debug!("tcp proxy relay error: {}", e);
}
}
/// Userspace UDP forwarder from `0.0.0.0:host_port` to `container_ip:container_port`.
///
/// Runs on the calling thread until `stop` is set or the listening socket
/// fails hard. If the host port cannot be bound, a warning is logged and
/// the function returns.
///
/// Each distinct client address gets a session: its own upstream socket
/// connected to the container, plus a reply thread relaying the container's
/// answers back to that client. Traffic in either direction refreshes the
/// session. A session idle for `SESSION_IDLE_TIMEOUT` is dropped, and its
/// reply thread exits, closing the upstream socket. Previously reply
/// threads lived until `stop` even after their session expired, leaking a
/// thread and a socket per stale client. All reply threads are joined
/// before returning.
fn start_udp_proxy(
    host_port: u16,
    container_ip: Ipv4Addr,
    container_port: u16,
    stop: Arc<AtomicBool>,
) {
    /// Sessions with no traffic in either direction for this long are dropped.
    const SESSION_IDLE_TIMEOUT: Duration = Duration::from_secs(30);
    /// Socket read timeout; bounds how quickly loops notice `stop` or expiry.
    const POLL_INTERVAL: Duration = Duration::from_millis(100);

    type SessionMap = HashMap<SocketAddr, (Arc<UdpSocket>, Instant)>;

    /// Spawns the thread relaying datagrams from `upstream` (the session's
    /// container-connected socket) back to `client` through `inbound`.
    ///
    /// The thread exits on `stop` or on a hard receive error. It also exits
    /// when its session has gone idle (in which case it removes the session)
    /// or is no longer the current entry for `client`.
    fn spawn_reply_relay(
        upstream: Arc<UdpSocket>,
        inbound: Arc<UdpSocket>,
        client: SocketAddr,
        sessions: Arc<Mutex<SessionMap>>,
        stop: Arc<AtomicBool>,
    ) -> std::thread::JoinHandle<()> {
        std::thread::spawn(move || {
            let mut rbuf = [0u8; 65535];
            upstream.set_read_timeout(Some(POLL_INTERVAL)).ok();
            while !stop.load(Ordering::Relaxed) {
                match upstream.recv(&mut rbuf) {
                    Ok(m) => {
                        let _ = inbound.send_to(&rbuf[..m], client);
                        // Reply traffic also counts as session activity.
                        if let Some((sock, last)) = sessions.lock().unwrap().get_mut(&client) {
                            if Arc::ptr_eq(sock, &upstream) {
                                *last = Instant::now();
                            }
                        }
                    }
                    Err(ref e)
                        if e.kind() == io::ErrorKind::WouldBlock
                            || e.kind() == io::ErrorKind::TimedOut =>
                    {
                        let mut map = sessions.lock().unwrap();
                        let idle = match map.get(&client) {
                            Some((sock, last)) if Arc::ptr_eq(sock, &upstream) => {
                                last.elapsed() >= SESSION_IDLE_TIMEOUT
                            }
                            // Expired by the receive loop, or replaced by a
                            // newer session for the same client.
                            _ => break,
                        };
                        if idle {
                            map.remove(&client);
                            break;
                        }
                    }
                    Err(_) => break,
                }
            }
        })
    }

    let inbound = match UdpSocket::bind(SocketAddr::from(([0, 0, 0, 0], host_port))) {
        Ok(s) => Arc::new(s),
        Err(e) => {
            log::warn!(
                "udp proxy: cannot bind 0.0.0.0:{}: {} (nftables DNAT still active)",
                host_port,
                e
            );
            return;
        }
    };
    inbound
        .set_read_timeout(Some(POLL_INTERVAL))
        .expect("set_read_timeout");
    let target = SocketAddr::from((container_ip, container_port));
    log::debug!("udp proxy: 0.0.0.0:{} -> {}", host_port, target);
    let sessions: Arc<Mutex<SessionMap>> = Arc::new(Mutex::new(HashMap::new()));
    let mut buf = [0u8; 65535];
    let mut reply_handles: Vec<std::thread::JoinHandle<()>> = Vec::new();
    while !stop.load(Ordering::Relaxed) {
        match inbound.recv_from(&mut buf) {
            Ok((n, client_addr)) => {
                let (outbound, spawned) = {
                    let mut map = sessions.lock().unwrap();
                    map.retain(|_, (_, last)| last.elapsed() < SESSION_IDLE_TIMEOUT);
                    if let Some((sock, last)) = map.get_mut(&client_addr) {
                        *last = Instant::now();
                        (Arc::clone(sock), None)
                    } else {
                        let sock = match UdpSocket::bind("0.0.0.0:0") {
                            Ok(s) => Arc::new(s),
                            Err(e) => {
                                log::warn!("udp proxy: outbound bind failed: {}", e);
                                continue;
                            }
                        };
                        if let Err(e) = sock.connect(target) {
                            log::warn!("udp proxy: connect to {} failed: {}", target, e);
                            continue;
                        }
                        map.insert(client_addr, (Arc::clone(&sock), Instant::now()));
                        let handle = spawn_reply_relay(
                            Arc::clone(&sock),
                            Arc::clone(&inbound),
                            client_addr,
                            Arc::clone(&sessions),
                            Arc::clone(&stop),
                        );
                        (sock, Some(handle))
                    }
                };
                if let Some(h) = spawned {
                    reply_handles.push(h);
                    // Drop handles of relays that already exited.
                    reply_handles.retain(|h| !h.is_finished());
                }
                // The session lock is released; `buf` can be sent directly.
                if let Err(e) = outbound.send(&buf[..n]) {
                    log::debug!("udp proxy: forward to {} failed: {}", target, e);
                }
            }
            Err(ref e)
                if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut => {
            }
            Err(e) => {
                if !stop.load(Ordering::Relaxed) {
                    log::warn!("udp proxy recv_from error on port {}: {}", host_port, e);
                }
                break;
            }
        }
    }
    for handle in reply_handles {
        let _ = handle.join();
    }
}
pub fn setup_pasta_network(
child_pid: u32,
port_forwards: &[(u16, u16, PortProto)],
) -> io::Result<PastaSetup> {
let mut args: Vec<String> = vec![];
let running_as_root = unsafe { libc::geteuid() } == 0;
for &(host, container, proto) in port_forwards {
if matches!(proto, PortProto::Tcp | PortProto::Both) {
args.push("-t".to_string());
args.push(format!("{}:{}", host, container));
}
if matches!(proto, PortProto::Udp | PortProto::Both) {
args.push("-u".to_string());
args.push(format!("{}:{}", host, container));
}
}
args.push("--config-net".to_string());
args.push("--foreground".to_string());
let ns_bind_mount = if running_as_root {
let ns_dir = std::path::Path::new("/run/pelagos/pasta-ns");
std::fs::create_dir_all(ns_dir)?;
let mount_path = ns_dir.join(format!("{}", child_pid));
std::fs::write(&mount_path, b"")
.map_err(|e| io::Error::new(e.kind(), format!("create netns mount point: {}", e)))?;
let src = std::ffi::CString::new(format!("/proc/{}/ns/net", child_pid)).unwrap();
let dst = std::ffi::CString::new(mount_path.as_os_str().as_encoded_bytes()).unwrap();
let fstype = std::ffi::CString::new("").unwrap();
let rc = unsafe {
libc::mount(
src.as_ptr(),
dst.as_ptr(),
fstype.as_ptr(),
libc::MS_BIND,
std::ptr::null(),
)
};
if rc == -1 {
let _ = std::fs::remove_file(&mount_path);
return Err(io::Error::new(
io::Error::last_os_error().kind(),
format!(
"mount --bind /proc/{}/ns/net {}: {}",
child_pid,
mount_path.display(),
io::Error::last_os_error()
),
));
}
args.push("--netns".to_string());
args.push(mount_path.to_string_lossy().into_owned());
args.push("--runas".to_string());
args.push("0".to_string());
Some(mount_path)
} else {
args.push(child_pid.to_string());
None
};
let mut pipe_fds = [-1i32; 2];
let pipe_rc = unsafe { libc::pipe2(pipe_fds.as_mut_ptr(), libc::O_CLOEXEC) };
if pipe_rc != 0 {
if let Some(ref p) = ns_bind_mount {
unsafe {
libc::umount2(
p.as_os_str().as_encoded_bytes().as_ptr() as *const libc::c_char,
libc::MNT_DETACH,
)
};
let _ = std::fs::remove_file(p);
}
return Err(io::Error::last_os_error());
}
let (pipe_read_fd, pipe_write_fd) = (pipe_fds[0], pipe_fds[1]);
let pipe_write_fd_err = unsafe { libc::dup(pipe_write_fd) };
if pipe_write_fd_err < 0 {
unsafe {
libc::close(pipe_read_fd);
libc::close(pipe_write_fd);
}
if let Some(ref p) = ns_bind_mount {
unsafe {
libc::umount2(
p.as_os_str().as_encoded_bytes().as_ptr() as *const libc::c_char,
libc::MNT_DETACH,
)
};
let _ = std::fs::remove_file(p);
}
return Err(io::Error::last_os_error());
}
unsafe {
libc::fcntl(pipe_write_fd_err, libc::F_SETFD, libc::FD_CLOEXEC);
}
let stdout_stdio = unsafe { std::process::Stdio::from_raw_fd(pipe_write_fd) };
let stderr_stdio = unsafe { std::process::Stdio::from_raw_fd(pipe_write_fd_err) };
let mut process = SysCmd::new("pasta")
.args(&args)
.stdin(std::process::Stdio::null())
.stdout(stdout_stdio) .stderr(stderr_stdio) .spawn()
.map_err(|e| {
unsafe { libc::close(pipe_read_fd) };
if let Some(ref p) = ns_bind_mount {
unsafe {
libc::umount2(
p.as_os_str().as_encoded_bytes().as_ptr() as *const libc::c_char,
libc::MNT_DETACH,
)
};
let _ = std::fs::remove_file(p);
}
io::Error::other(format!("failed to start pasta (is it installed?): {}", e))
})?;
let output_thread = {
use std::io::Read;
use std::os::unix::io::FromRawFd;
let pipe_read_file = unsafe { std::fs::File::from_raw_fd(pipe_read_fd) };
Some(std::thread::spawn(move || {
let mut out = String::new();
let _ = std::io::BufReader::new(pipe_read_file).read_to_string(&mut out);
out
}))
};
wait_for_pasta_network(child_pid, &mut process);
Ok(PastaSetup {
process,
output_thread,
ns_bind_mount,
})
}
/// Block until pasta has brought up a non-loopback interface inside the
/// child's network namespace, pasta itself exits, or 5 seconds elapse.
///
/// Readiness is detected by polling `/proc/<child_pid>/net/dev` every 5 ms.
/// The outcome is never returned to the caller: an early pasta exit or the
/// timeout is logged as a warning, success as a debug message, and the
/// caller continues in every case.
fn wait_for_pasta_network(child_pid: u32, process: &mut std::process::Child) {
    let net_dev = format!("/proc/{}/net/dev", child_pid);
    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);

    // `/proc/<pid>/net/dev` begins with two header lines; each remaining line
    // is "<iface>: <counters...>". Any interface other than `lo` counts.
    let has_non_loopback = |table: &str| -> bool {
        table.lines().skip(2).any(|row| {
            let name = row.trim().split(':').next().unwrap_or("").trim();
            !name.is_empty() && name != "lo"
        })
    };

    loop {
        // pasta died before finishing setup: waiting longer is pointless.
        if let Ok(Some(status)) = process.try_wait() {
            log::warn!(
                "pasta exited (status: {}) before TAP interface appeared in \
                 /proc/{}/net/dev — network setup failed; stdout/stderr output \
                 will be logged at teardown",
                status,
                child_pid
            );
            return;
        }

        // An unreadable file is treated as "not ready yet".
        let ready = std::fs::read_to_string(&net_dev)
            .map(|contents| has_non_loopback(&contents))
            .unwrap_or(false);
        if ready {
            log::debug!(
                "pasta: TAP interface appeared in /proc/{}/net/dev",
                child_pid
            );
            return;
        }

        if std::time::Instant::now() >= deadline {
            log::warn!(
                "pasta network setup timeout (5s) — no non-loopback interface in \
                 /proc/{}/net/dev; container will proceed without pasta networking",
                child_pid
            );
            return;
        }

        std::thread::sleep(std::time::Duration::from_millis(5));
    }
}
/// Stop the pasta process for a container and release what was set up with it.
///
/// All steps are best-effort; nothing is returned to the caller:
/// 1. kill and reap the pasta child (errors ignored; it may already be gone);
/// 2. join the thread that drained pasta's stdout/stderr pipe and log what it
///    captured (as a warning when non-empty);
/// 3. if a netns bind mount was created, detach it with `umount2(MNT_DETACH)`
///    and remove the mount-point file.
pub fn teardown_pasta_network(setup: &mut PastaSetup) {
    let _ = setup.process.kill();
    let _ = setup.process.wait();

    if let Some(thread) = setup.output_thread.take() {
        match thread.join() {
            Ok(out) if !out.trim().is_empty() => {
                log::warn!("pasta output:\n{}", out.trim());
            }
            Ok(_) => {
                log::debug!("pasta output: (empty)");
            }
            Err(_) => {
                log::debug!("pasta output reader thread panicked");
            }
        }
    }

    if let Some(ref p) = setup.ns_bind_mount {
        // `umount2` needs a NUL-terminated C string. The raw `OsStr` bytes of a
        // path are NOT NUL-terminated, so passing their pointer directly would
        // make the kernel read past the end of the buffer. Build a CString.
        match std::ffi::CString::new(p.as_os_str().as_encoded_bytes()) {
            Ok(c_path) => {
                // SAFETY: `c_path` is a valid NUL-terminated string that stays
                // alive for the duration of the call.
                let rc = unsafe { libc::umount2(c_path.as_ptr(), libc::MNT_DETACH) };
                if rc == -1 {
                    log::debug!(
                        "pasta netns bind-mount umount2 failed for {}: {}",
                        p.display(),
                        io::Error::last_os_error()
                    );
                }
            }
            Err(e) => {
                // Only possible if the path contains an interior NUL byte.
                log::debug!(
                    "pasta netns bind-mount umount2 skipped for {}: {}",
                    p.display(),
                    e
                );
            }
        }
        let _ = std::fs::remove_file(p);
    }
}
/// Report whether a runnable `pasta` binary is present.
///
/// Executes `pasta --version` with stdout and stderr discarded. Returns `true`
/// only when the command could be spawned and exited successfully. A spawn
/// failure (for example, the binary not being found) yields `false`.
pub fn is_pasta_available() -> bool {
    let probe = SysCmd::new("pasta")
        .arg("--version")
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null())
        .status();
    match probe {
        Ok(exit) => exit.success(),
        Err(_) => false,
    }
}
/// Run `cmd` with `args` and wait for it to exit.
///
/// The child inherits this process's stdin, stdout and stderr.
///
/// # Errors
/// - Returns the spawn error if the command cannot be started.
/// - Returns an `ErrorKind::Other` error naming the command line and its exit
///   status if the command exits unsuccessfully.
fn run(cmd: &str, args: &[&str]) -> io::Result<()> {
    let exit = SysCmd::new(cmd).args(args).status()?;
    if !exit.success() {
        let rendered_args = args.join(" ");
        return Err(io::Error::other(format!(
            "`{} {}` exited with {}",
            cmd, rendered_args, exit
        )));
    }
    Ok(())
}
/// Run `cmd` with `args` and wait for it to exit, discarding its stderr.
///
/// stdin and stdout are still inherited from this process; only stderr is
/// redirected to the null device.
///
/// # Errors
/// - Returns the spawn error if the command cannot be started.
/// - Returns an `ErrorKind::Other` error naming the command line and its exit
///   status if the command exits unsuccessfully.
fn run_quiet(cmd: &str, args: &[&str]) -> io::Result<()> {
    let mut command = SysCmd::new(cmd);
    command.args(args).stderr(std::process::Stdio::null());
    match command.status()? {
        exit if exit.success() => Ok(()),
        exit => Err(io::Error::other(format!(
            "`{} {}` exited with {}",
            cmd,
            args.join(" "),
            exit
        ))),
    }
}
// Unit tests for the CIDR helper (`Ipv4Net`), `NetworkDef` / `NetworkMode`,
// veth naming, port-forward line parsing, nftables prerouting script
// generation, and the userspace TCP proxy started by `start_tcp_proxies_async`.
#[cfg(test)]
mod tests {
use super::*;
// `from_cidr` keeps the address as written and records the prefix length.
#[test]
fn test_ipv4net_parse_valid() {
let net = Ipv4Net::from_cidr("10.88.1.0/24").unwrap();
assert_eq!(net.addr, Ipv4Addr::new(10, 88, 1, 0));
assert_eq!(net.prefix_len, 24);
}
// Rejected inputs: no '/', prefix > 32, missing prefix, out-of-range octet.
#[test]
fn test_ipv4net_parse_invalid() {
assert!(Ipv4Net::from_cidr("not-a-cidr").is_err());
assert!(Ipv4Net::from_cidr("10.0.0.0/33").is_err());
assert!(Ipv4Net::from_cidr("10.0.0.0").is_err());
assert!(Ipv4Net::from_cidr("999.0.0.0/24").is_err());
}
#[test]
fn test_ipv4net_network_broadcast() {
let net = Ipv4Net::from_cidr("10.88.1.0/24").unwrap();
assert_eq!(net.network(), Ipv4Addr::new(10, 88, 1, 0));
assert_eq!(net.broadcast(), Ipv4Addr::new(10, 88, 1, 255));
}
// Gateway is network+1, first host is network+2, last host is broadcast-1.
#[test]
fn test_ipv4net_gateway_hosts() {
let net = Ipv4Net::from_cidr("172.19.0.0/24").unwrap();
assert_eq!(net.gateway(), Ipv4Addr::new(172, 19, 0, 1));
assert_eq!(net.host_min(), Ipv4Addr::new(172, 19, 0, 2));
assert_eq!(net.host_max(), Ipv4Addr::new(172, 19, 0, 254));
}
#[test]
fn test_ipv4net_contains() {
let net = Ipv4Net::from_cidr("10.88.1.0/24").unwrap();
assert!(net.contains(Ipv4Addr::new(10, 88, 1, 5)));
assert!(net.contains(Ipv4Addr::new(10, 88, 1, 254)));
assert!(!net.contains(Ipv4Addr::new(10, 88, 2, 1)));
assert!(!net.contains(Ipv4Addr::new(192, 168, 1, 1)));
}
// Overlap is symmetric and detects a /24 nested inside a /16.
#[test]
fn test_ipv4net_overlaps() {
let a = Ipv4Net::from_cidr("10.88.0.0/16").unwrap();
let b = Ipv4Net::from_cidr("10.88.1.0/24").unwrap();
assert!(a.overlaps(&b));
assert!(b.overlaps(&a));
let c = Ipv4Net::from_cidr("10.89.0.0/16").unwrap();
assert!(!a.overlaps(&c));
}
// Adjacent, same-size subnets must not be reported as overlapping.
#[test]
fn test_ipv4net_no_overlap_disjoint() {
let a = Ipv4Net::from_cidr("10.0.0.0/24").unwrap();
let b = Ipv4Net::from_cidr("10.0.1.0/24").unwrap();
assert!(!a.overlaps(&b));
}
#[test]
fn test_ipv4net_cidr_string() {
let net = Ipv4Net::from_cidr("10.88.1.0/24").unwrap();
assert_eq!(net.cidr_string(), "10.88.1.0/24");
assert_eq!(net.gateway_cidr(), "10.88.1.1/24");
}
// Same derived addresses as above, for a prefix that spans two octets.
#[test]
fn test_ipv4net_slash16() {
let net = Ipv4Net::from_cidr("10.0.0.0/16").unwrap();
assert_eq!(net.network(), Ipv4Addr::new(10, 0, 0, 0));
assert_eq!(net.broadcast(), Ipv4Addr::new(10, 0, 255, 255));
assert_eq!(net.gateway(), Ipv4Addr::new(10, 0, 0, 1));
assert_eq!(net.host_min(), Ipv4Addr::new(10, 0, 0, 2));
assert_eq!(net.host_max(), Ipv4Addr::new(10, 0, 255, 254));
}
// NetworkDef survives a JSON round trip through serde.
#[test]
fn test_network_def_serde_roundtrip() {
let net = NetworkDef {
name: "frontend".to_string(),
subnet: Ipv4Net::from_cidr("10.88.1.0/24").unwrap(),
gateway: Ipv4Addr::new(10, 88, 1, 1),
bridge_name: "rm-frontend".to_string(),
};
let json = serde_json::to_string(&net).unwrap();
let parsed: NetworkDef = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.name, "frontend");
assert_eq!(parsed.subnet, net.subnet);
assert_eq!(parsed.gateway, net.gateway);
assert_eq!(parsed.bridge_name, "rm-frontend");
}
// The nftables table name is "pelagos-" followed by the network name.
#[test]
fn test_network_def_nft_table_name() {
let net = NetworkDef {
name: "frontend".to_string(),
subnet: Ipv4Net::from_cidr("10.88.1.0/24").unwrap(),
gateway: Ipv4Addr::new(10, 88, 1, 1),
bridge_name: "rm-frontend".to_string(),
};
assert_eq!(net.nft_table_name(), "pelagos-frontend");
}
// Only the two bridge variants report is_bridge(); Pasta does not.
#[test]
fn test_network_mode_is_bridge() {
assert!(!NetworkMode::None.is_bridge());
assert!(!NetworkMode::Loopback.is_bridge());
assert!(NetworkMode::Bridge.is_bridge());
assert!(NetworkMode::BridgeNamed("frontend".into()).is_bridge());
assert!(!NetworkMode::Pasta.is_bridge());
}
// Veth names must be unique per network and fit the 15-character limit
// asserted below (presumably the kernel IFNAMSIZ limit — confirm).
#[test]
fn test_veth_names_for_network_unique() {
let (h1, p1) = super::veth_names_for("rem-123-0");
let (h2, p2) = super::veth_names_for_network("rem-123-0", "frontend");
let (h3, p3) = super::veth_names_for_network("rem-123-0", "backend");
assert_ne!(h1, h2);
assert_ne!(p1, p2);
assert_ne!(h2, h3);
assert_ne!(p2, p3);
assert!(h1.len() <= 15);
assert!(h2.len() <= 15);
assert!(h3.len() <= 15);
}
// Unknown or empty protocol strings fall back to Tcp.
#[test]
fn test_port_proto_parse() {
assert_eq!(PortProto::parse("tcp"), PortProto::Tcp);
assert_eq!(PortProto::parse("udp"), PortProto::Udp);
assert_eq!(PortProto::parse("both"), PortProto::Both);
assert_eq!(PortProto::parse(""), PortProto::Tcp);
assert_eq!(PortProto::parse("sctp"), PortProto::Tcp);
}
#[test]
fn test_port_proto_as_str() {
assert_eq!(PortProto::Tcp.as_str(), "tcp");
assert_eq!(PortProto::Udp.as_str(), "udp");
assert_eq!(PortProto::Both.as_str(), "both");
}
// parse_port_forward_line yields (name, ip, port, port, proto):
// - r.0 is "" for lines without a leading name field (presumably a
//   container/veth identifier — confirm against the parser);
// - r.2 is the first port in the line and r.3 the second;
// - r.4 defaults to Tcp when the protocol field is absent.
#[test]
fn test_parse_port_forward_line_with_proto() {
let r = parse_port_forward_line("192.168.1.5:8080:80:udp").unwrap();
assert_eq!(r.0, ""); assert_eq!(r.1, "192.168.1.5".parse::<std::net::Ipv4Addr>().unwrap());
assert_eq!(r.2, 8080);
assert_eq!(r.3, 80);
assert_eq!(r.4, PortProto::Udp);
}
// Legacy three-field "ip:port:port" lines still parse, defaulting to Tcp.
#[test]
fn test_parse_port_forward_line_backwards_compat() {
let r = parse_port_forward_line("10.0.0.2:443:443").unwrap();
assert_eq!(r.0, ""); assert_eq!(r.4, PortProto::Tcp);
}
#[test]
fn test_parse_port_forward_line_both_proto() {
let r = parse_port_forward_line("172.19.0.2:53:53:both").unwrap();
assert_eq!(r.4, PortProto::Both);
}
// Five-field "name:ip:port:port:proto" lines populate r.0 with the name.
#[test]
fn test_parse_port_forward_line_new_format() {
let r = parse_port_forward_line("rem-1234-0:10.0.0.3:8080:80:tcp").unwrap();
assert_eq!(r.0, "rem-1234-0");
assert_eq!(r.1, "10.0.0.3".parse::<std::net::Ipv4Addr>().unwrap());
assert_eq!(r.2, 8080);
assert_eq!(r.3, 80);
assert_eq!(r.4, PortProto::Tcp);
}
// Each entry (ip, dport, container_port, proto) renders as
// "<proto> dport <dport> dnat to <ip>:<container_port>".
// Only the requested protocol(s) appear in the script.
#[test]
fn test_build_prerouting_script_tcp_only() {
use std::net::Ipv4Addr;
let net = NetworkDef {
name: "test".to_string(),
subnet: Ipv4Net::from_cidr("172.19.0.0/24").unwrap(),
gateway: Ipv4Addr::new(172, 19, 0, 1),
bridge_name: "pelagos0".to_string(),
};
let ip = Ipv4Addr::new(172, 19, 0, 2);
let entries = vec![(ip, 8080u16, 80u16, PortProto::Tcp)];
let script = build_prerouting_script(&net, &entries);
assert!(script.contains("tcp dport 8080 dnat to 172.19.0.2:80"));
assert!(!script.contains("udp"));
}
#[test]
fn test_build_prerouting_script_udp_only() {
use std::net::Ipv4Addr;
let net = NetworkDef {
name: "test".to_string(),
subnet: Ipv4Net::from_cidr("172.19.0.0/24").unwrap(),
gateway: Ipv4Addr::new(172, 19, 0, 1),
bridge_name: "pelagos0".to_string(),
};
let ip = Ipv4Addr::new(172, 19, 0, 2);
let entries = vec![(ip, 5353u16, 53u16, PortProto::Udp)];
let script = build_prerouting_script(&net, &entries);
assert!(script.contains("udp dport 5353 dnat to 172.19.0.2:53"));
assert!(!script.contains("tcp dport"));
}
// PortProto::Both emits one tcp and one udp rule for the same mapping.
#[test]
fn test_build_prerouting_script_both() {
use std::net::Ipv4Addr;
let net = NetworkDef {
name: "test".to_string(),
subnet: Ipv4Net::from_cidr("172.19.0.0/24").unwrap(),
gateway: Ipv4Addr::new(172, 19, 0, 1),
bridge_name: "pelagos0".to_string(),
};
let ip = Ipv4Addr::new(172, 19, 0, 2);
let entries = vec![(ip, 53u16, 53u16, PortProto::Both)];
let script = build_prerouting_script(&net, &entries);
assert!(script.contains("tcp dport 53 dnat to 172.19.0.2:53"));
assert!(script.contains("udp dport 53 dnat to 172.19.0.2:53"));
}
// The default Bridge mode maps to the "pelagos0" network; non-bridge modes map to None.
#[test]
fn test_network_mode_bridge_network_name() {
assert_eq!(NetworkMode::Bridge.bridge_network_name(), Some("pelagos0"));
assert_eq!(
NetworkMode::BridgeNamed("test".into()).bridge_network_name(),
Some("test")
);
assert_eq!(NetworkMode::None.bridge_network_name(), None);
assert_eq!(NetworkMode::Loopback.bridge_network_name(), None);
}
// Start a TCP echo server on 127.0.0.1 with an OS-assigned port.
// - Returns the bound address and a stop flag.
// - The listener is non-blocking and polled every 10 ms until the flag is
//   set or accept() fails with an error other than WouldBlock.
// - Connections are served one at a time: bytes are echoed back until EOF,
//   a read error, or the 2 s read timeout.
// NOTE(review): the inner read loop assumes the accepted stream is blocking.
// Linux accept() does not inherit O_NONBLOCK from the listener, but
// BSD-derived stacks do. On such platforms the first read would return
// WouldBlock and end the connection early.
fn spawn_echo_server() -> (SocketAddr, Arc<AtomicBool>) {
use std::net::TcpListener;
let listener = TcpListener::bind("127.0.0.1:0").expect("echo server bind");
let addr = listener.local_addr().unwrap();
let stop = Arc::new(AtomicBool::new(false));
let stop2 = Arc::clone(&stop);
std::thread::spawn(move || {
listener.set_nonblocking(true).unwrap();
while !stop2.load(Ordering::Relaxed) {
match listener.accept() {
Ok((mut stream, _)) => {
stream
.set_read_timeout(Some(std::time::Duration::from_secs(2)))
.ok();
let mut buf = vec![0u8; 4096];
while let Ok(n) = stream.read(&mut buf) {
if n == 0 {
break;
}
let _ = stream.write_all(&buf[..n]);
}
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
std::thread::sleep(std::time::Duration::from_millis(10));
}
Err(_) => break,
}
}
});
(addr, stop)
}
// Bytes sent through the proxy on 127.0.0.1:19290 come back unchanged from
// the echo server behind it. The 100 ms sleeps give the proxy time to bind
// before connecting.
#[test]
fn test_tcp_proxy_bidirectional_relay() {
use std::io::{Read, Write};
// First proxy uses host port 0 and is dropped immediately; nothing is
// asserted about it (presumably a smoke test that start/drop works).
let (server_addr, server_stop) = spawn_echo_server();
let container_ip: Ipv4Addr = server_addr.ip().to_string().parse().unwrap();
let container_port = server_addr.port();
let rt = start_tcp_proxies_async(container_ip, &[(0, container_port)]);
std::thread::sleep(std::time::Duration::from_millis(100));
drop(rt);
server_stop.store(true, Ordering::Relaxed);
// Fixed port, distinct from the other proxy tests so they can run in parallel.
let proxy_port: u16 = 19290;
let (server_addr2, server_stop2) = spawn_echo_server();
let container_ip2: Ipv4Addr = server_addr2.ip().to_string().parse().unwrap();
let container_port2 = server_addr2.port();
let rt2 = start_tcp_proxies_async(container_ip2, &[(proxy_port, container_port2)]);
std::thread::sleep(std::time::Duration::from_millis(100));
let payload = b"hello-from-client";
let mut response = vec![0u8; payload.len()];
{
// Half-close the write side so the echo server sees EOF after the payload.
let mut conn =
std::net::TcpStream::connect(format!("127.0.0.1:{}", proxy_port)).unwrap();
conn.set_read_timeout(Some(std::time::Duration::from_secs(3)))
.ok();
conn.write_all(payload).unwrap();
conn.shutdown(std::net::Shutdown::Write).unwrap();
conn.read_exact(&mut response).unwrap();
}
drop(rt2);
server_stop2.store(true, Ordering::Relaxed);
assert_eq!(
&response, payload,
"proxy should relay bytes unchanged in both directions"
);
}
// N clients in parallel each send a distinct payload through the proxy on
// port 19291 and must get exactly their own payload back. Indices of failed
// connections are reported.
#[test]
fn test_tcp_proxy_concurrent_relay() {
use std::io::{Read, Write};
let proxy_port: u16 = 19291;
let (server_addr, server_stop) = spawn_echo_server();
let container_ip: Ipv4Addr = server_addr.ip().to_string().parse().unwrap();
let container_port = server_addr.port();
let rt = start_tcp_proxies_async(container_ip, &[(proxy_port, container_port)]);
std::thread::sleep(std::time::Duration::from_millis(100));
const N: usize = 8;
let handles: Vec<_> = (0..N)
.map(|i| {
std::thread::spawn(move || -> bool {
let payload = format!("payload-{:02}", i);
let mut conn =
match std::net::TcpStream::connect(format!("127.0.0.1:{}", proxy_port)) {
Ok(c) => c,
Err(_) => return false,
};
conn.set_read_timeout(Some(std::time::Duration::from_secs(3)))
.ok();
if conn.write_all(payload.as_bytes()).is_err() {
return false;
}
conn.shutdown(std::net::Shutdown::Write).ok();
let mut buf = vec![0u8; payload.len()];
if conn.read_exact(&mut buf).is_err() {
return false;
}
buf == payload.as_bytes()
})
})
.collect();
let results: Vec<bool> = handles
.into_iter()
.map(|h| h.join().unwrap_or(false))
.collect();
drop(rt);
server_stop.store(true, Ordering::Relaxed);
let failures: Vec<usize> = results
.iter()
.enumerate()
.filter(|(_, &ok)| !ok)
.map(|(i, _)| i)
.collect();
assert!(
failures.is_empty(),
"concurrent relay failed for connections: {:?}",
failures
);
}
// The proxy listener on port 19292 holds the port while the returned runtime
// is alive. The port is free again 200 ms after `shutdown_background()`.
#[test]
fn test_tcp_proxy_runtime_cleanup() {
let proxy_port: u16 = 19292;
let (server_addr, server_stop) = spawn_echo_server();
let container_ip: Ipv4Addr = server_addr.ip().to_string().parse().unwrap();
let container_port = server_addr.port();
let rt = start_tcp_proxies_async(container_ip, &[(proxy_port, container_port)]);
std::thread::sleep(std::time::Duration::from_millis(100));
assert!(
std::net::TcpListener::bind(format!("0.0.0.0:{}", proxy_port)).is_err(),
"port should be bound while runtime is alive"
);
rt.shutdown_background();
std::thread::sleep(std::time::Duration::from_millis(200));
server_stop.store(true, Ordering::Relaxed);
assert!(
std::net::TcpListener::bind(format!("0.0.0.0:{}", proxy_port)).is_ok(),
"port should be released after runtime shutdown"
);
}
}