mod core_map;
pub(crate) use self::core_map::*;
use crate::batch::Pipeline;
use crate::config::RuntimeConfig;
use crate::dpdk::{
self, CoreId, KniError, KniRx, Mempool, Port, PortBuilder, PortError, PortQueue,
};
use crate::{debug, ensure, info};
use anyhow::Result;
use futures::{future, stream, StreamExt};
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::mem::ManuallyDrop;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio_executor::current_thread;
use tokio_net::driver;
use tokio_net::signal::unix::{self, SignalKind};
use tokio_timer::{timer, Interval};
#[derive(Copy, Clone, Debug)]
pub enum UnixSignal {
SIGHUP = libc::SIGHUP as isize,
SIGINT = libc::SIGINT as isize,
SIGTERM = libc::SIGTERM as isize,
}
pub struct Runtime {
ports: ManuallyDrop<Vec<Port>>,
mempools: ManuallyDrop<Vec<Mempool>>,
core_map: CoreMap,
on_signal: Arc<dyn Fn(UnixSignal) -> bool>,
config: RuntimeConfig,
}
impl Runtime {
#[allow(clippy::cognitive_complexity)]
pub fn build(config: RuntimeConfig) -> Result<Self> {
info!("initializing EAL...");
dpdk::eal_init(config.to_eal_args())?;
#[cfg(feature = "metrics")]
{
info!("initializing metrics subsystem...");
crate::metrics::init()?;
}
let cores = config.all_cores();
info!("initializing mempools...");
let sockets = cores.iter().map(CoreId::socket_id).collect::<HashSet<_>>();
let mut mempools = vec![];
for socket in sockets {
let mempool = Mempool::new(config.mempool.capacity, config.mempool.cache_size, socket)?;
debug!(?mempool);
mempools.push(mempool);
}
info!("intializing cores...");
let core_map = CoreMapBuilder::new()
.app_name(&config.app_name)
.cores(&cores)
.master_core(config.master_core)
.mempools(&mut mempools)
.finish()?;
let len = config.num_knis();
if len > 0 {
info!("initializing KNI subsystem...");
dpdk::kni_init(len)?;
}
info!("initializing ports...");
let mut ports = vec![];
for conf in config.ports.iter() {
let port = PortBuilder::new(conf.name.clone(), conf.device.clone())?
.cores(&conf.cores)?
.mempools(&mut mempools)
.rx_tx_queue_capacity(conf.rxd, conf.txd)?
.finish(conf.promiscuous, conf.multicast, conf.kni)?;
debug!(?port);
ports.push(port);
}
#[cfg(feature = "metrics")]
{
crate::metrics::register_port_stats(&ports);
crate::metrics::register_mempool_stats(&mempools);
}
info!("runtime ready.");
Ok(Runtime {
ports: ManuallyDrop::new(ports),
mempools: ManuallyDrop::new(mempools),
core_map,
on_signal: Arc::new(|_| true),
config,
})
}
#[inline]
fn get_port(&self, name: &str) -> Result<&Port> {
self.ports
.iter()
.find(|p| p.name() == name)
.ok_or_else(|| PortError::NotFound(name.to_owned()).into())
}
#[inline]
fn get_port_mut(&mut self, name: &str) -> Result<&mut Port> {
self.ports
.iter_mut()
.find(|p| p.name() == name)
.ok_or_else(|| PortError::NotFound(name.to_owned()).into())
}
#[inline]
fn get_core(&self, core_id: CoreId) -> Result<&CoreExecutor> {
self.core_map
.cores
.get(&core_id)
.ok_or_else(|| CoreError::NotFound(core_id).into())
}
#[inline]
fn get_port_qs(&self, core_id: CoreId) -> Result<HashMap<String, PortQueue>> {
let map = self
.ports
.iter()
.filter_map(|p| {
p.queues()
.get(&core_id)
.map(|q| (p.name().to_owned(), q.clone()))
})
.collect::<HashMap<_, _>>();
ensure!(!map.is_empty(), CoreError::NotAssigned(core_id));
Ok(map)
}
pub fn set_on_signal<F>(&mut self, f: F) -> &mut Self
where
F: Fn(UnixSignal) -> bool + 'static,
{
self.on_signal = Arc::new(f);
self
}
pub fn add_pipeline_to_port<T: Pipeline + 'static, F>(
&mut self,
port: &str,
installer: F,
) -> Result<&mut Self>
where
F: Fn(PortQueue) -> T + Send + Sync + 'static,
{
let port = self.get_port(port)?;
let f = Arc::new(installer);
for (core_id, port_q) in port.queues() {
let f = f.clone();
let port_q = port_q.clone();
let thread = &self.get_core(*core_id)?.thread;
thread.spawn(future::lazy(move |_| {
let fut = f(port_q);
debug!("spawned pipeline {}.", fut.name());
current_thread::spawn(fut);
}))?;
debug!("installed pipeline on port_q for {:?}.", core_id);
}
info!("installed pipeline for port {}.", port.name());
Ok(self)
}
pub fn add_kni_rx_pipeline_to_port<T: Pipeline + 'static, F>(
&mut self,
port: &str,
installer: F,
) -> Result<&mut Self>
where
F: FnOnce(KniRx, PortQueue) -> T + Send + Sync + 'static,
{
let kni_rx = self
.get_port_mut(port)?
.kni()
.ok_or(KniError::Disabled)?
.take_rx()?;
let port = self.get_port(port)?;
let core_id = port.queues().keys().last().unwrap();
let port_q = port.queues()[core_id].clone();
let thread = &self.get_core(*core_id)?.thread;
thread.spawn(future::lazy(move |_| {
let fut = installer(kni_rx, port_q);
debug!("spawned kni rx pipeline {}.", fut.name());
current_thread::spawn(fut);
}))?;
info!("installed kni rx pipeline for port {}.", port.name());
Ok(self)
}
pub fn add_pipeline_to_core<T: Pipeline + 'static, F>(
&mut self,
core: usize,
installer: F,
) -> Result<&mut Self>
where
F: FnOnce(HashMap<String, PortQueue>) -> T + Send + Sync + 'static,
{
let core_id = CoreId::new(core);
let thread = &self.get_core(core_id)?.thread;
let port_qs = self.get_port_qs(core_id)?;
thread.spawn(future::lazy(move |_| {
let fut = installer(port_qs);
debug!("spawned pipeline {}.", fut.name());
current_thread::spawn(fut);
}))?;
info!("installed pipeline for {:?}.", core_id);
Ok(self)
}
pub fn add_periodic_pipeline_to_core<T: Pipeline + 'static, F>(
&mut self,
core: usize,
installer: F,
dur: Duration,
) -> Result<&mut Self>
where
F: FnOnce(HashMap<String, PortQueue>) -> T + Send + Sync + 'static,
{
let core_id = CoreId::new(core);
let thread = &self.get_core(core_id)?.thread;
let port_qs = self.get_port_qs(core_id)?;
thread.spawn(future::lazy(move |_| {
let mut pipeline = installer(port_qs);
debug!("spawned periodic pipeline {}.", pipeline.name());
let fut = Interval::new_interval(dur).for_each(move |_| {
pipeline.run_once();
future::ready(())
});
current_thread::spawn(fut);
}))?;
info!("installed periodic pipeline for {:?}.", core_id);
Ok(self)
}
pub fn add_periodic_task_to_core<F>(
&mut self,
core: usize,
task: F,
dur: Duration,
) -> Result<&mut Self>
where
F: Fn() + Send + Sync + 'static,
{
let core_id = CoreId::new(core);
let thread = &self.get_core(core_id)?.thread;
thread.spawn(future::lazy(move |_| {
let fut = Interval::new_interval(dur).for_each(move |_| {
task();
future::ready(())
});
debug!("spawned periodic task.");
current_thread::spawn(fut);
}))?;
info!("installed periodic task for {:?}.", core_id);
Ok(self)
}
fn wait_for_timeout(&mut self, timeout: Duration) {
let MasterExecutor {
ref timer,
ref mut thread,
..
} = self.core_map.master_core;
let when = Instant::now() + timeout;
let delay = timer.delay(when);
debug!("waiting for {:?}...", timeout);
let _timer = timer::set_default(&timer);
thread.block_on(delay);
info!("timed out after {:?}.", timeout);
}
fn wait_for_signal(&mut self) -> Result<()> {
let sighup = unix::signal(SignalKind::hangup())?.map(|_| UnixSignal::SIGHUP);
let sigint = unix::signal(SignalKind::interrupt())?.map(|_| UnixSignal::SIGINT);
let sigterm = unix::signal(SignalKind::terminate())?.map(|_| UnixSignal::SIGTERM);
let stream = stream::select(stream::select(sighup, sigint), sigterm);
let f = self.on_signal.clone();
let mut stream = stream.filter(|&signal| future::ready(f(signal)));
let MasterExecutor {
ref reactor,
ref timer,
ref mut thread,
..
} = self.core_map.master_core;
debug!("waiting for a Unix signal...");
let _guard = driver::set_default(&reactor);
let _timer = timer::set_default(&timer);
let _ = thread.block_on(stream.next());
info!("signaled to stop.");
Ok(())
}
fn add_kni_tx_pipelines(&mut self) -> Result<()> {
let mut map = HashMap::new();
for port in self.ports.iter_mut() {
let core_id = *port.queues().keys().next().unwrap();
if let Some(kni) = port.kni() {
map.insert(core_id, kni.take_tx()?);
}
}
for (core_id, kni_tx) in map.into_iter() {
let thread = &self.get_core(core_id)?.thread;
thread.spawn(kni_tx.into_pipeline())?;
info!("installed kni tx pipeline on {:?}.", core_id);
}
Ok(())
}
fn start_ports(&mut self) -> Result<()> {
for port in self.ports.iter_mut() {
port.start()?;
}
Ok(())
}
fn unpark_cores(&mut self) {
for core in self.core_map.cores.values() {
if let Some(unpark) = &core.unpark {
unpark.unpark();
}
}
}
#[allow(clippy::cognitive_complexity)]
fn shutdown_cores(&mut self) {
for (core_id, core) in &mut self.core_map.cores {
if let Some(trigger) = core.shutdown.take() {
debug!("shutting down {:?}.", core_id);
trigger.shutdown();
debug!("sent {:?} shutdown trigger.", core_id);
let handle = core.join.take().unwrap();
let _ = handle.join();
info!("terminated {:?}.", core_id);
}
}
}
fn stop_ports(&mut self) {
for port in self.ports.iter_mut() {
port.stop();
}
}
pub fn execute(&mut self) -> Result<()> {
self.add_kni_tx_pipelines()?;
self.start_ports()?;
self.unpark_cores();
match self.config.duration {
None => self.wait_for_signal()?,
Some(d) => self.wait_for_timeout(d),
};
self.shutdown_cores();
self.stop_ports();
info!("runtime terminated.");
Ok(())
}
}
impl<'a> fmt::Debug for Runtime {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("runtime")
.field("runtime configuration", &format!("{:?}", self.config))
.finish()
}
}
impl Drop for Runtime {
fn drop(&mut self) {
unsafe {
ManuallyDrop::drop(&mut self.ports);
ManuallyDrop::drop(&mut self.mempools);
}
if self.config.num_knis() > 0 {
debug!("freeing KNI subsystem.");
dpdk::kni_close();
}
debug!("freeing EAL.");
dpdk::eal_cleanup().unwrap();
}
}