#![cfg_attr(not(unix), allow(unused_imports))]
use crate::dhcp_proxy::cache::{Clear, LeaseCache};
use crate::dhcp_proxy::dhcp_service::{process_client_stream, DhcpV4Service};
use crate::dhcp_proxy::ip;
use crate::dhcp_proxy::lib::g_rpc::netavark_proxy_server::{NetavarkProxy, NetavarkProxyServer};
use crate::dhcp_proxy::lib::g_rpc::{
Empty, Lease as NetavarkLease, NetworkConfig, OperationResponse,
};
use crate::dhcp_proxy::proxy_conf::{
get_cache_fqname, get_proxy_sock_fqname, DEFAULT_INACTIVITY_TIMEOUT, DEFAULT_TIMEOUT,
};
use crate::error::{NetavarkError, NetavarkResult};
use crate::network::core_utils;
use clap::Parser;
use log::{debug, error, warn};
use tokio::task::AbortHandle;
use std::collections::HashMap;
use std::fs::File;
use std::io::Write;
use std::os::unix::io::FromRawFd;
use std::os::unix::net::UnixListener as stdUnixListener;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::{env, fs};
#[cfg(unix)]
use tokio::net::UnixListener;
#[cfg(unix)]
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::sync::{mpsc, oneshot};
use tokio::time::{timeout, Duration};
#[cfg(unix)]
use tokio_stream::wrappers::UnixListenerStream;
use tonic::{
transport::Server, Code, Code::Internal, Code::InvalidArgument, Request, Response, Status,
};
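/// The tonic gRPC service backing the netavark DHCP proxy. It holds the shared
/// lease cache, the DORA timeout, an optional sender used to reset the
/// inactivity watchdog, and a map of background client-stream tasks keyed by
/// container MAC address.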
#[derive(Debug)]
struct NetavarkProxyService<W: Write + Clear> {
cache: Arc<Mutex<LeaseCache<W>>>,
dora_timeout: u32,
timeout_sender: Option<Arc<Mutex<Sender<i32>>>>,
task_map: Arc<Mutex<HashMap<String, AbortHandle>>>,
}
impl<W: Write + Clear> NetavarkProxyService<W> {
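    /// Notify the inactivity watchdog (if one is configured) that a request
    /// arrived so it restarts its countdown. Failures are only logged; a
    /// missed ping must not fail the gRPC call.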
fn reset_inactivity_timeout(&self) {
if let Some(sender) = &self.timeout_sender {
let sender_clone = sender.clone();
let locked_sender = match sender_clone.lock() {
Ok(v) => v,
Err(e) => {
log::error!("{e}");
return;
}
};
match locked_sender.try_send(1) {
Ok(..) => {}
Err(e) => log::error!("{e}"),
}
}
}
}
#[tonic::async_trait]
impl<W: Write + Clear + Send + 'static> NetavarkProxy for NetavarkProxyService<W> {
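    /// Perform DHCP setup for a container: run the DORA exchange, cache the
    /// resulting lease, and configure the container interface. The work runs
    /// in a spawned task so a dropped client connection can abort it.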
async fn setup(
&self,
request: Request<NetworkConfig>,
) -> Result<Response<NetavarkLease>, Status> {
debug!("Request from client {:?}", request.remote_addr());
self.reset_inactivity_timeout();
let cache = self.cache.clone();
let timeout = self.dora_timeout;
let task_map = self.task_map.clone();
let network_config = request.into_inner();
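        // `_tx` acts as a drop guard for this request: if the client disconnects,
        // tonic drops this future, `_tx` is dropped with it, and the spawned task
        // below observes the closed channel and aborts the DORA exchange.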
let (_tx, mut rx) = oneshot::channel::<()>();
let lease = tokio::task::spawn(async move {
if rx.try_recv() == Err(TryRecvError::Closed) {
log::debug!("Request dropped, aborting DORA");
return Err(Status::new(Code::Aborted, "client disconnected"));
}
let get_lease = process_setup(network_config, timeout, cache, task_map);
let get_lease: NetavarkLease = tokio::select! {
_ = &mut rx => {
log::debug!("Request dropped, aborting DORA");
return Err(Status::new(Code::Aborted, "client disconnected"))
}
lease = get_lease => {
Ok::<NetavarkLease, Status>(lease?)
}
}?;
if rx.try_recv() == Err(TryRecvError::Closed) {
log::debug!("Request dropped, aborting DORA");
return Err(Status::new(Code::Aborted, "client disconnected"));
}
Ok(get_lease)
})
.await;
        match lease {
            Ok(Ok(lease)) => Ok(Response::new(lease)),
            Ok(Err(status)) => Err(status),
            Err(e) => Err(Status::new(Code::Unknown, e.to_string())),
        }
}
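    /// Tear down a container's DHCP state: abort its background task (if any)
    /// and remove its lease from the cache, returning the removed lease.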
async fn teardown(
&self,
request: Request<NetworkConfig>,
) -> Result<Response<NetavarkLease>, Status> {
self.reset_inactivity_timeout();
let nc = request.into_inner();
let cache = self.cache.clone();
let tasks = self.task_map.clone();
let task = tasks
.lock()
.expect("lock tasks")
.remove(&nc.container_mac_addr);
if let Some(handle) = task {
handle.abort();
}
let lease = cache
.lock()
.expect("Could not unlock cache. A thread was poisoned")
.remove_lease(&nc.container_mac_addr)
.map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(lease))
}
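    /// Tear down the entire lease cache in one call, wiping the proxy's
    /// stored leases.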
async fn clean(&self, request: Request<Empty>) -> Result<Response<OperationResponse>, Status> {
debug!("Request from client: {:?}", request.remote_addr());
self.cache
.clone()
.lock()
.expect("Could not unlock cache. A thread was poisoned")
.teardown()?;
Ok(Response::new(OperationResponse { success: true }))
}
}
/// Command-line options for the DHCP proxy server.
#[derive(Parser, Debug)]
#[clap(version = env!("CARGO_PKG_VERSION"))]
pub struct Opts {
    /// Alternative run directory for the lease cache file and proxy socket.
    #[clap(short, long)]
    dir: Option<String>,
    /// Alternative path for the proxy's unix domain socket.
    #[clap(short, long)]
    uds: Option<String>,
    /// Seconds to wait for the DHCP DORA exchange before giving up.
    #[clap(short, long)]
    timeout: Option<u32>,
    /// Seconds of inactivity after which the proxy exits; 0 disables the timeout.
    #[clap(short, long)]
    activity_timeout: Option<u64>,
}
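/// Install SIGTERM/SIGINT handlers in a background task; on either signal the
/// proxy socket file is removed and the process exits.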
async fn handle_signal(uds_path: PathBuf) {
tokio::spawn(async move {
let mut sigterm = signal(SignalKind::terminate()).expect("Could not set up SIGTERM hook");
let mut sigint = signal(SignalKind::interrupt()).expect("Could not set up SIGINT hook");
tokio::select! {
_ = sigterm.recv() => {
warn!("Received SIGTERM, cleaning up and exiting");
}
_ = sigint.recv() => {
warn!("Received SIGINT, cleaning up and exiting");
}
}
if let Err(e) = fs::remove_file(uds_path) {
error!("Could not close uds socket: {e}");
}
std::process::exit(0x0100);
});
}
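/// Start the DHCP proxy server: bind the unix socket (or adopt a
/// systemd-activated one), create the lease cache file, and run the gRPC
/// server until it stops or the inactivity timeout expires.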
#[tokio::main]
pub async fn serve(opts: Opts) -> NetavarkResult<()> {
let optional_run_dir = opts.dir.as_deref();
let dora_timeout = opts.timeout.unwrap_or(DEFAULT_TIMEOUT);
let inactivity_timeout =
Duration::from_secs(opts.activity_timeout.unwrap_or(DEFAULT_INACTIVITY_TIMEOUT));
let uds_path = get_proxy_sock_fqname(optional_run_dir);
debug!("socket path: {}", &uds_path.display());
let mut is_systemd_activated = false;
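    // When started via systemd socket activation, LISTEN_FDS is set and the
    // listening socket is passed as fd 3; otherwise bind the socket ourselves
    // and install signal handlers so it is cleaned up on exit.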
let uds: UnixListener = match env::var("LISTEN_FDS") {
Ok(effds) => {
if effds != "1" {
return Err(NetavarkError::msg("Received more than one FD from systemd"));
}
is_systemd_activated = true;
let systemd_socket = unsafe { stdUnixListener::from_raw_fd(3) };
systemd_socket.set_nonblocking(true)?;
UnixListener::from_std(systemd_socket)?
}
Err(..) => {
match Path::new(&uds_path).parent() {
None => {
return Err(NetavarkError::msg("Could not get parent from uds path"));
}
Some(f) => tokio::fs::create_dir_all(f).await?,
}
handle_signal(uds_path.clone()).await;
UnixListener::bind(&uds_path)?
}
};
let uds_stream = UnixListenerStream::new(uds);
let fq_cache_path = get_cache_fqname(optional_run_dir);
let file = match File::create(&fq_cache_path) {
Ok(file) => {
debug!("Successfully created leases file: {fq_cache_path:?}");
file
}
Err(e) => {
return Err(NetavarkError::msg(format!(
"Exiting. Could not create lease cache file: {e}",
)));
}
};
let cache = match LeaseCache::new(file) {
Ok(c) => Arc::new(Mutex::new(c)),
Err(e) => {
return Err(NetavarkError::msg(format!(
"Could not setup the cache: {e}"
)));
}
};
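    // An inactivity timeout of 0 disables the watchdog, so no channel is created.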
let (activity_timeout_tx, activity_timeout_rx) = if inactivity_timeout.as_secs() > 0 {
let (tx, rx) = mpsc::channel(5);
(Some(tx), Some(rx))
} else {
(None, None)
};
let netavark_proxy_service = NetavarkProxyService {
cache: cache.clone(),
dora_timeout,
timeout_sender: activity_timeout_tx
.clone()
.map(|tx| Arc::new(Mutex::new(tx))),
task_map: Arc::new(Mutex::new(HashMap::new())),
};
let server = Server::builder()
.add_service(NetavarkProxyServer::new(netavark_proxy_service))
.serve_with_incoming(uds_stream);
tokio::pin!(server);
tokio::select! {
_ = handle_wakeup(activity_timeout_rx, inactivity_timeout, cache.clone()), if inactivity_timeout.as_secs() > 0 => {},
_ = &mut server => {},
};
if !is_systemd_activated {
fs::remove_file(uds_path)?;
}
Ok(())
}
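/// Inactivity watchdog: every message on `rx` resets the countdown; once the
/// timeout elapses with an empty lease cache, return so the server in `serve`
/// shuts down. Does nothing if no receiver was configured.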
async fn handle_wakeup<W: Write + Clear>(
rx: Option<mpsc::Receiver<i32>>,
timeout_duration: Duration,
current_cache: Arc<Mutex<LeaseCache<W>>>,
) {
if let Some(mut rx) = rx {
loop {
match timeout(timeout_duration, rx.recv()).await {
Ok(Some(_)) => {
debug!("timeout timer reset")
}
Ok(None) => {
println!("timeout channel closed");
break;
}
Err(_) => {
if is_catch_empty(current_cache.clone()) {
println!(
"timeout met: exiting after {} secs of inactivity",
timeout_duration.as_secs()
);
break;
}
}
}
}
}
}
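/// Returns true if the lease cache is currently empty. A poisoned lock is
/// logged and treated as non-empty so the proxy keeps running.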
fn is_catch_empty<W: Write + Clear>(current_cache: Arc<Mutex<LeaseCache<W>>>) -> bool {
match current_cache.lock() {
Ok(v) => {
debug!("cache_len is {}", v.len());
v.is_empty()
}
Err(e) => {
log::error!("{e}");
false
}
}
}
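/// Run the actual DHCP setup: validate the container MAC address, perform the
/// DORA exchange via `DhcpV4Service` (IPv6 is not yet supported), spawn the
/// task that keeps processing the client stream, cache the lease, and
/// configure the container interface inside its network namespace.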
async fn process_setup<W: Write + Clear>(
network_config: NetworkConfig,
timeout: u32,
cache: Arc<Mutex<LeaseCache<W>>>,
tasks: Arc<Mutex<HashMap<String, AbortHandle>>>,
) -> Result<NetavarkLease, Status> {
let container_network_interface = network_config.container_iface.clone();
let ns_path = network_config.ns_path.clone();
core_utils::CoreUtils::decode_address_from_hex(&network_config.container_mac_addr)
.map_err(|e| Status::new(InvalidArgument, format!("{e}")))?;
let mac = &network_config.container_mac_addr.clone();
let nv_lease = match network_config.version {
0 => {
let mut service = DhcpV4Service::new(network_config, timeout)?;
let lease = service.get_lease().await?;
let task = tokio::spawn(process_client_stream(service));
tasks
.lock()
.expect("lock tasks")
.insert(mac.to_string(), task.abort_handle());
lease
}
1 => {
return Err(Status::new(InvalidArgument, "ipv6 not yet supported"));
}
_ => {
return Err(Status::new(InvalidArgument, "invalid protocol version"));
}
};
if let Err(e) = cache
.lock()
.expect("Could not unlock cache. A thread was poisoned")
.add_lease(mac, &nv_lease)
{
return Err(Status::new(
Internal,
format!("Error caching the lease: {e}"),
));
}
ip::setup(&nv_lease, &container_network_interface, &ns_path)?;
Ok(nv_lease)
}