use std::collections::VecDeque;
use std::io::{self, ErrorKind};
use std::mem::MaybeUninit;
use std::os::fd::{AsRawFd, RawFd};
use std::pin::Pin;
use std::process::Command;
use std::sync::Arc;
use std::task::{Context, Poll};
use anyhow::Context as _;
use bytes::Bytes;
use futures_util::{Future, StreamExt};
use mio::unix::SourceFd;
use mio::{event, Interest, Registry, Token};
use tun_tap::{Iface, Mode};
use virtual_mio::{InterestGuard, InterestWakerMap, Selector};
use virtual_net::meta::FrameSerializationFormat;
use virtual_net::{
io_err_into_net_error, IpAddr, IpCidr, IpRoute, Ipv4Addr, Ipv6Addr, NetworkError,
RemoteNetworkingClient, RemoteNetworkingClientDriver, VirtualNetworking, VirtualRawSocket,
};
fn cmd(cmd: &str, args: &[&str]) -> anyhow::Result<()> {
let ecode = Command::new(cmd).args(args).spawn()?.wait()?;
if ecode.success() {
Ok(())
} else {
Err(anyhow::format_err!(
"failed to execute linux command (cmd={}, args={:?}, code={:?})",
cmd,
args,
ecode.code()
))
}
}
pub struct TunTapSocket {}
impl TunTapSocket {
pub async fn create(
selector: Arc<Selector>,
url: url::Url,
bring_up: bool,
static_ips: Vec<IpAddr>,
) -> anyhow::Result<TunTapDriver> {
tracing::info!("creating tun/tap device");
let iface = match Iface::without_packet_info("edge%d", Mode::Tun) {
Ok(i) => i,
Err(err) => {
tracing::error!(
"This process does not have permissions to open TUN/TAP sockets - {err}"
);
return Err(err).context("failed to open TUN/TAP socket");
}
};
tracing::info!("connecting to {url}");
let (stream, _res) = tokio_tungstenite::connect_async(url).await?;
let (tx, rx) = stream.split();
tracing::info!("established web socket connection to the edge");
let (remote, mut remote_driver) =
RemoteNetworkingClient::new_from_tokio_ws_io(tx, rx, FrameSerializationFormat::Bincode);
tracing::info!("creating RAW socket");
let mut client = tokio::select! {
a = remote.bind_raw() => a,
_ = Pin::new(&mut remote_driver) => {
return Err(anyhow::format_err!("the driver closed before we could create the RAW socket"));
}
}?;
for ip in static_ips {
tracing::info!("setting static IP ({ip})");
let res: anyhow::Result<()> = tokio::select! {
a = async {
remote.ip_clear().await?;
remote.ip_add(ip, match ip.is_ipv4() {
true => 24,
false => 120,
}).await?;
Ok(())
} => a,
_ = Pin::new(&mut remote_driver) => {
return Err(anyhow::format_err!("the driver closed before it could set the static IP address"));
}
};
res?;
}
tracing::info!("getting remote socket addresses");
let ips = tokio::select! {
a = remote.ip_list() => a,
_ = Pin::new(&mut remote_driver) => {
return Err(anyhow::format_err!("the driver closed before we could the mac and IP addresses"));
}
}?;
let routes = tokio::select! {
a = remote.route_list() => a,
_ = Pin::new(&mut remote_driver) => {
return Err(anyhow::format_err!("the driver closed before we could retrieve the routes"));
}
}?;
if bring_up {
tracing::info!("bringing up device ({})", iface.name());
interfaces::Interface::get_by_name(iface.name())?
.ok_or_else(|| anyhow::format_err!("The TUN/TAP interface could not be found"))?
.set_up(true)?;
for ip in ips.iter() {
let ip_str = ip.ip.to_string();
let prefix = ip.prefix;
let ip_cidr = format!("{ip_str}/{prefix}");
println!("\rAssigning IP address ({ip_cidr})");
if cmd(
"ip",
&["address", "add", ip_cidr.as_str(), "dev", iface.name()],
)
.is_err()
{
println!(
"\rEscalating to elevated rights (sudo) due to limited user permissions"
);
cmd(
"sudo",
&[
"ip",
"address",
"add",
ip_cidr.as_str(),
"dev",
iface.name(),
],
)?;
}
}
for mut route in routes.iter().cloned() {
route.cidr.prefix = 8;
if route.cidr.ip.is_ipv6() {
route.cidr.ip = IpAddr::V6(Ipv6Addr::new(0xfc00, 0, 0, 0, 0, 0, 0, 0));
} else {
route.cidr.ip = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 0));
}
let ip_str = route.cidr.ip.to_string();
let prefix = route.cidr.prefix;
let ip_cidr = format!("{ip_str}/{prefix}");
let via_str = route.via_router.to_string();
let via = via_str.to_string();
println!("\rAssigning route ({ip_cidr})");
if cmd(
"ip",
&[
"route",
"add",
ip_cidr.as_str(),
"via",
via.as_str(),
"dev",
iface.name(),
],
)
.is_err()
{
println!(
"\rEscalating to elevated rights (sudo) due to limited user permissions"
);
cmd(
"sudo",
&[
"ip",
"route",
"add",
ip_cidr.as_str(),
"via",
via.as_str(),
"dev",
iface.name(),
],
)?;
}
}
}
tracing::info!("switching to non-blocking IO");
set_non_blocking(&iface)?;
let handler = InterestWakerMap::default();
let mut source = MioWrapper {
fd: iface.as_raw_fd(),
};
let interest = InterestGuard::new(
&selector,
Box::new(handler.clone()),
&mut source,
mio::Interest::READABLE.add(mio::Interest::WRITABLE),
)
.map_err(io_err_into_net_error)?;
client.set_handler(Box::new(handler.clone()))?;
let driver = TunTapDriver {
iface,
handler,
send_queue: Default::default(),
_interest: interest,
client,
ips,
routes,
remote,
remote_driver,
};
Ok(driver)
}
}
fn set_non_blocking(iface: &Iface) -> anyhow::Result<()> {
let fd = iface.as_raw_fd();
let mut nonblock: libc::c_int = 1;
let result = unsafe { libc::ioctl(fd, libc::FIONBIO, &mut nonblock) };
if result == -1 {
Err(std::io::Error::last_os_error().into())
} else {
Ok(())
}
}
pub struct TunTapDriver {
iface: Iface,
handler: InterestWakerMap,
send_queue: VecDeque<Bytes>,
_interest: InterestGuard,
client: Box<dyn VirtualRawSocket + Sync + 'static>,
ips: Vec<IpCidr>,
routes: Vec<IpRoute>,
remote: RemoteNetworkingClient,
remote_driver: RemoteNetworkingClientDriver,
}
impl TunTapDriver {
pub fn client(&self) -> &RemoteNetworkingClient {
&self.remote
}
pub fn ips(&self) -> &Vec<IpCidr> {
&self.ips
}
pub fn routes(&self) -> &Vec<IpRoute> {
&self.routes
}
}
impl Future for TunTapDriver {
type Output = io::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.remote_driver).poll(cx) {
Poll::Ready(()) => return Poll::Ready(Ok(())),
Poll::Pending => {}
}
self.handler
.add(virtual_mio::InterestType::Readable, cx.waker());
self.handler
.add(virtual_mio::InterestType::Writable, cx.waker());
while let Some(packet) = self.send_queue.pop_front() {
if self.client.try_send(&packet).is_ok() == false {
self.send_queue.push_front(packet);
break;
}
}
if self.send_queue.is_empty() {
let mut chunk = [0u8; 10240];
loop {
match self.iface.recv(&mut chunk) {
Ok(0) => return Poll::Ready(Err(ErrorKind::ConnectionReset.into())),
Ok(data) => {
let packet = &chunk[..data];
if self.client.try_send(packet).is_ok() == false {
self.send_queue.push_back(Bytes::copy_from_slice(packet));
break;
}
continue;
}
Err(err) if err.kind() == ErrorKind::WouldBlock => {
break;
}
Err(err) => return Poll::Ready(Err(err)),
};
}
}
let mut chunk: [MaybeUninit<u8>; 10240] = unsafe { MaybeUninit::uninit().assume_init() };
loop {
let chunk_unsafe: &mut [MaybeUninit<u8>] = &mut chunk[..];
match self.client.try_recv(chunk_unsafe) {
Ok(0) => return Poll::Ready(Err(ErrorKind::ConnectionReset.into())),
Ok(data) => {
let chunk_unsafe: &mut [u8] = unsafe { std::mem::transmute(chunk_unsafe) };
let packet = &chunk_unsafe[..data];
self.iface.send(packet).ok();
continue;
}
Err(NetworkError::WouldBlock) => {
break;
}
Err(err) => {
tracing::error!("packet recv error - {}", err);
return Poll::Ready(Err(ErrorKind::Other.into()));
}
}
}
Poll::Pending
}
}
struct MioWrapper {
fd: RawFd,
}
impl event::Source for MioWrapper {
fn register(
&mut self,
registry: &Registry,
token: Token,
interests: Interest,
) -> io::Result<()> {
SourceFd(&self.fd).register(registry, token, interests)
}
fn reregister(
&mut self,
registry: &Registry,
token: Token,
interests: Interest,
) -> io::Result<()> {
SourceFd(&self.fd).reregister(registry, token, interests)
}
fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
SourceFd(&self.fd).deregister(registry)
}
}