use crate::dpdk::{CoreId, Mempool, MempoolMap, MEMPOOL};
use crate::{debug, error, ffi, info};
use anyhow::Result;
use futures::Future;
use std::collections::{HashMap, HashSet};
use std::sync::mpsc::{self, Receiver, SyncSender};
use std::thread::{self, JoinHandle};
use thiserror::Error;
use tokio::sync::oneshot;
use tokio_executor::current_thread::{self, CurrentThread};
use tokio_executor::park::ParkThread;
use tokio_net::driver::{self, Reactor};
use tokio_timer::timer::{self, Timer};
/// The waiting side of the park/unpark handshake for a single core.
///
/// Owns both ends of the channel created in `Park::new`: `park` blocks on
/// the receiver, while `unpark` hands out clones of the sender wrapped in
/// an `Unpark`.
pub(crate) struct Park {
/// Core this park belongs to; only used to tag log output.
core_id: CoreId,
/// Sending end, kept so that `unpark` can clone it on demand.
sender: SyncSender<()>,
/// Receiving end that `park` blocks on.
receiver: Receiver<()>,
}
impl Park {
    /// Builds the park for `core_id`.
    ///
    /// The channel is created with a bound of `0`, which makes it a
    /// rendezvous channel: a send only completes once it is paired with a
    /// receive.
    fn new(core_id: CoreId) -> Self {
        let (sender, receiver) = mpsc::sync_channel(0);
        Self {
            core_id,
            sender,
            receiver,
        }
    }

    /// Returns a new `Unpark` handle that can wake up this park.
    fn unpark(&self) -> Unpark {
        let sender = self.sender.clone();
        Unpark {
            core_id: self.core_id,
            sender,
        }
    }

    /// Blocks the calling thread until an unpark signal is received.
    ///
    /// A receive failure is logged and otherwise ignored.
    fn park(&self) {
        match self.receiver.recv() {
            Ok(()) => {}
            Err(err) => {
                error!(message = "park failed.", core=?self.core_id, ?err);
            }
        }
    }
}
/// Handle used to wake up a core that is blocked in `Park::park`.
pub(crate) struct Unpark {
/// Core this handle targets; only used to tag log output.
core_id: CoreId,
/// Clone of the park's sending end.
sender: SyncSender<()>,
}
impl Unpark {
    /// Sends the unpark signal to the associated core.
    ///
    /// A send failure is logged and otherwise ignored.
    pub(crate) fn unpark(&self) {
        match self.sender.send(()) {
            Ok(()) => {}
            Err(err) => {
                error!(message = "unpark failed.", core=?self.core_id, ?err);
            }
        }
    }
}
/// The waiting side of a core's shutdown signal.
///
/// Wraps the receiving half of a oneshot channel; the sending half lives
/// in the matching `ShutdownTrigger`.
pub(crate) struct Shutdown {
/// Receives the single shutdown notification.
receiver: oneshot::Receiver<()>,
}
impl Shutdown {
    /// Creates a connected `Shutdown` / `ShutdownTrigger` pair for `core_id`.
    fn new(core_id: CoreId) -> (Self, ShutdownTrigger) {
        let (sender, receiver) = oneshot::channel();
        (Shutdown { receiver }, ShutdownTrigger { core_id, sender })
    }

    /// Consumes the shutdown and returns the underlying receiver as a
    /// future, so it can be driven by an executor.
    fn into_task(self) -> impl Future {
        let Shutdown { receiver } = self;
        receiver
    }
}
/// Handle used to signal a core's `Shutdown`.
pub(crate) struct ShutdownTrigger {
/// Core this trigger targets; only used to tag log output.
core_id: CoreId,
/// Sending half of the oneshot channel paired with the `Shutdown`.
sender: oneshot::Sender<()>,
}
impl ShutdownTrigger {
pub(crate) fn shutdown(self) {
if let Err(err) = self.sender.send(()) {
error!(message = "shutdown failed.", core=?self.core_id, ?err);
}
}
}
/// Executor state owned by the master core.
///
/// Unlike `CoreExecutor`, this holds the `CurrentThread` executor itself
/// rather than a handle to it, and its timer is layered on a `Reactor`.
pub(crate) struct MasterExecutor {
/// Handle to the reactor backing the master core's timer.
pub(crate) reactor: driver::Handle,
/// Handle to the master core's timer.
pub(crate) timer: timer::Handle,
/// The single-threaded executor, parked on the reactor-backed timer.
pub(crate) thread: CurrentThread<Timer<Reactor>>,
}
/// Handles for interacting with the executor of a single core.
///
/// One of these is created for every core, the master core included. The
/// optional fields are `None` for the master core and populated for
/// background cores.
pub(crate) struct CoreExecutor {
/// Handle to the core's timer.
pub(crate) timer: timer::Handle,
/// Handle to the core's `CurrentThread` executor.
pub(crate) thread: current_thread::Handle,
/// Wakes the background thread out of its initial park.
pub(crate) unpark: Option<Unpark>,
/// Signals the background thread's executor to stop blocking.
pub(crate) shutdown: Option<ShutdownTrigger>,
/// Join handle of the background thread; filled in by
/// `CoreMapBuilder::finish` after the thread has reported back.
pub(crate) join: Option<JoinHandle<()>>,
}
/// Errors related to looking up or using a core.
#[derive(Debug, Error)]
pub(crate) enum CoreError {
/// The given core is not found.
#[error("{0:?} is not found.")]
NotFound(CoreId),
/// The given core is not assigned to any ports.
#[error("{0:?} is not assigned to any ports.")]
NotAssigned(CoreId),
}
/// The executors of all initialized cores, as produced by
/// `CoreMapBuilder::finish`.
pub(crate) struct CoreMap {
/// Executor owned by the master core.
pub(crate) master_core: MasterExecutor,
/// Per-core executor handles, keyed by core id. This includes an entry
/// for the master core itself.
pub(crate) cores: HashMap<CoreId, CoreExecutor>,
}
/// Wrapper that lets a raw mempool pointer be moved into a spawned thread.
///
/// Raw pointers are not `Send`, so the pointer is wrapped before being
/// captured by the background thread's closure.
struct SendablePtr(*mut ffi::rte_mempool);
// SAFETY: NOTE(review): this asserts that the mempool pointer may be used
// from a thread other than the one that obtained it. Nothing in this file
// proves that; it presumably relies on the mempool outliving the spawned
// threads and on the underlying mempool being safe for multi-thread use —
// confirm against the DPDK guarantees.
unsafe impl std::marker::Send for SendablePtr {}
/// Builder that initializes the master core and spawns one background
/// thread per additional core, producing a `CoreMap`.
pub(crate) struct CoreMapBuilder<'a> {
/// Prefix used when naming the spawned background threads.
app_name: String,
/// Cores to initialize. May contain the master core; `finish` removes it
/// before spawning background threads.
cores: HashSet<CoreId>,
/// Core that is initialized on the thread calling `finish`.
master_core: CoreId,
/// Mempools from which each core's mempool is looked up by socket id.
mempools: MempoolMap<'a>,
}
impl<'a> CoreMapBuilder<'a> {
    /// Creates a builder with an empty app name, no cores, core `0` as the
    /// master core and a default mempool map.
    pub(crate) fn new() -> Self {
        CoreMapBuilder {
            app_name: String::new(),
            cores: Default::default(),
            master_core: CoreId::new(0),
            mempools: Default::default(),
        }
    }

    /// Sets the application name used as the prefix of thread names.
    pub(crate) fn app_name(&mut self, app_name: &str) -> &mut Self {
        self.app_name = app_name.to_owned();
        self
    }

    /// Sets the cores to initialize, replacing any previous set.
    /// Duplicates are collapsed.
    pub(crate) fn cores(&mut self, cores: &[CoreId]) -> &mut Self {
        self.cores = cores.iter().copied().collect();
        self
    }

    /// Sets the master core, which is initialized on the thread that calls
    /// `finish`.
    pub(crate) fn master_core(&mut self, master_core: CoreId) -> &mut Self {
        self.master_core = master_core;
        self
    }

    /// Sets the mempools from which each core's mempool is looked up.
    pub(crate) fn mempools(&'a mut self, mempools: &'a mut [Mempool]) -> &'a mut Self {
        self.mempools = MempoolMap::new(mempools);
        self
    }

    /// Initializes the master core on the current thread and spawns one
    /// background thread for every other configured core.
    ///
    /// Each background thread initializes itself, reports its
    /// `CoreExecutor` back to the caller, and then parks until it is
    /// unparked. Once unparked it drives its executor until the shutdown
    /// task completes.
    ///
    /// # Errors
    ///
    /// Returns an error if a mempool lookup fails, if a core fails to
    /// initialize, if a thread cannot be spawned, or if a spawned thread
    /// terminates before reporting its initialization result.
    ///
    /// NOTE(review): when an error is returned part-way through, threads
    /// spawned by earlier iterations appear to stay parked — confirm
    /// whether callers treat this error as fatal.
    // NOTE(review): the lint is presumably triggered by the expansion of
    // the logging macros rather than by the control flow — confirm.
    #[allow(clippy::cognitive_complexity)]
    pub(crate) fn finish(&'a mut self) -> Result<CoreMap> {
        let mut map = HashMap::new();

        // The master core is initialized first, on the calling thread.
        let socket_id = self.master_core.socket_id();
        let mempool = self.mempools.get_raw(socket_id)?;
        let (master_thread, core_executor) = init_master_core(self.master_core, mempool)?;
        map.insert(self.master_core, core_executor);
        info!("initialized master on {:?}.", self.master_core);

        // The core list may also contain the master core; remove it so it
        // is not initialized a second time on a background thread.
        self.cores.remove(&self.master_core);

        for &core_id in self.cores.iter() {
            let socket_id = core_id.socket_id();
            let mempool = self.mempools.get_raw(socket_id)?;
            let ptr = SendablePtr(mempool);

            // Rendezvous channel over which the new thread reports the
            // outcome of its initialization.
            let (sender, receiver) = mpsc::sync_channel(0);

            let join = thread::Builder::new()
                .name(format!("{}-{:?}", self.app_name, core_id))
                .spawn(move || {
                    debug!("spawned background thread {:?}.", thread::current().id());

                    match init_background_core(core_id, ptr.0) {
                        Ok((mut thread, park, shutdown, executor)) => {
                            info!("initialized thread on {:?}.", core_id);

                            // Keep a timer handle for this thread before
                            // the executor is handed to the caller.
                            let timer_handle = executor.timer.clone();

                            // The caller blocks on `recv` right after
                            // spawning, so the receiver is alive here.
                            sender.send(Ok(executor)).unwrap();

                            info!("parking {:?}.", core_id);
                            park.park();
                            info!("unparked {:?}.", core_id);

                            // The guard must stay alive while the executor
                            // is running.
                            let _timer = timer::set_default(&timer_handle);
                            let _ = thread.block_on(shutdown.into_task());
                            info!("unblocked {:?}.", core_id);
                        }
                        Err(err) => sender.send(Err(err)).unwrap(),
                    }
                })?;

            // First `?`: the thread went away without reporting back.
            // Second `?`: the thread reported an initialization failure.
            // Either way, surface an error instead of panicking.
            let mut executor = receiver.recv()??;
            executor.join = Some(join);
            map.insert(core_id, executor);
        }

        Ok(CoreMap {
            master_core: master_thread,
            cores: map,
        })
    }
}
/// Initializes the executor for the master core on the calling thread.
///
/// Sets the thread affinity for `id`, stores `mempool` in the thread-local
/// `MEMPOOL`, and builds a `CurrentThread` executor whose timer is layered
/// on a new `Reactor`.
///
/// Returns the `MasterExecutor` that owns the executor, together with a
/// `CoreExecutor` of handles to it. The master core has no unpark,
/// shutdown or join handle.
///
/// # Errors
///
/// Returns an error if setting the thread affinity or creating the reactor
/// fails.
fn init_master_core(
    id: CoreId,
    mempool: *mut ffi::rte_mempool,
) -> Result<(MasterExecutor, CoreExecutor)> {
    id.set_thread_affinity()?;
    MEMPOOL.with(|tls| tls.set(mempool));

    // Build the stack bottom-up: reactor -> timer -> executor, grabbing a
    // handle to each layer before it is moved into the next one.
    let io = Reactor::new()?;
    let io_handle = io.handle();
    let clock = Timer::new(io);
    let clock_handle = clock.handle();
    let current = CurrentThread::new_with_park(clock);

    let handles = CoreExecutor {
        timer: clock_handle.clone(),
        thread: current.handle(),
        unpark: None,
        shutdown: None,
        join: None,
    };
    let master = MasterExecutor {
        reactor: io_handle,
        timer: clock_handle,
        thread: current,
    };

    Ok((master, handles))
}
/// Initializes the executor for a background core on the calling thread.
///
/// Sets the thread affinity for `id`, stores `mempool` in the thread-local
/// `MEMPOOL`, and builds a `CurrentThread` executor whose timer is layered
/// on a `ParkThread`.
///
/// Returns the executor itself, the `Park` and `Shutdown` the thread waits
/// on, and a `CoreExecutor` holding the matching unpark and shutdown
/// handles. The `join` handle is left as `None` for the caller to fill in.
///
/// # Errors
///
/// Returns an error if setting the thread affinity fails.
fn init_background_core(
    id: CoreId,
    mempool: *mut ffi::rte_mempool,
) -> Result<(
    CurrentThread<Timer<ParkThread>>,
    Park,
    Shutdown,
    CoreExecutor,
)> {
    id.set_thread_affinity()?;
    MEMPOOL.with(|tls| tls.set(mempool));

    // timer -> executor, grabbing a handle to each layer before it is moved.
    let clock = Timer::new(ParkThread::new());
    let clock_handle = clock.handle();
    let current = CurrentThread::new_with_park(clock);
    let current_handle = current.handle();

    // The waiting halves stay with the thread; the signalling halves go
    // into the returned handles.
    let core_park = Park::new(id);
    let (shutdown, trigger) = Shutdown::new(id);

    let handles = CoreExecutor {
        timer: clock_handle,
        thread: current_handle,
        unpark: Some(core_park.unpark()),
        shutdown: Some(trigger),
        join: None,
    };

    Ok((current, core_park, shutdown, handles))
}