/*
* Copyright 2019 Comcast Cable Communications Management, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

mod core_map;

pub(crate) use self::core_map::*;

use crate::batch::Pipeline;
use crate::config::RuntimeConfig;
use crate::dpdk::{
    self, CoreId, KniError, KniRx, Mempool, Port, PortBuilder, PortError, PortQueue,
};
use crate::{debug, ensure, info};
use anyhow::Result;
use futures::{future, stream, StreamExt};
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::mem::ManuallyDrop;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio_executor::current_thread;
use tokio_net::driver;
use tokio_net::signal::unix::{self, SignalKind};
use tokio_timer::{timer, Interval};

/// Supported [Unix signals].
///
/// [Unix signals]: https://en.wikipedia.org/wiki/Signal_(IPC)#POSIX_signals
#[derive(Copy, Clone, Debug)]
pub enum UnixSignal {
    /// This signal is sent to a process when its controlling terminal is closed.
    /// In modern systems, this signal usually means that the controlling pseudo
    /// or virtual terminal has been closed. Many daemons will reload their
    /// configuration files and reopen their log files instead of exiting when
    /// receiving this signal. The `nohup` command can be used to make a process
    /// ignore this signal.
    SIGHUP = libc::SIGHUP as isize,
    /// This signal is sent to a process by its controlling terminal when a user
    /// wishes to interrupt the process. This is typically initiated by pressing
    /// `Ctrl-C`, but on some systems, the "delete" character or "break" key can
    /// be used.
    SIGINT = libc::SIGINT as isize,
    /// This signal is sent to a process to request its termination. Unlike the
    /// `SIGKILL` signal, it can be caught and interpreted or ignored by the
    /// process. This allows the process to perform a graceful termination,
    /// releasing resources and saving state if appropriate. `SIGINT` is nearly
    /// identical to `SIGTERM`.
    SIGTERM = libc::SIGTERM as isize,
}

/// The Capsule runtime.
///
/// The runtime initializes the underlying DPDK environment, and it also manages
/// the task scheduler that executes the packet processing pipelines.
pub struct Runtime {
    ports: ManuallyDrop<Vec<Port>>,
    mempools: ManuallyDrop<Vec<Mempool>>,
    core_map: CoreMap,
    on_signal: Arc<dyn Fn(UnixSignal) -> bool>,
    config: RuntimeConfig,
}

impl Runtime {
    /// Builds a runtime from config settings.
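    ///
    /// # Example
    ///
    /// A minimal sketch, assuming the `RuntimeConfig` is loaded from the TOML
    /// file given on the command line with [`load_config`]:
    ///
    /// ```
    /// let config = load_config()?;
    /// let runtime = Runtime::build(config)?;
    /// ```
    ///
    /// [`load_config`]: crate::config::load_config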
    #[allow(clippy::cognitive_complexity)]
    pub fn build(config: RuntimeConfig) -> Result<Self> {
        info!("initializing EAL...");
        dpdk::eal_init(config.to_eal_args())?;

        #[cfg(feature = "metrics")]
        {
            info!("initializing metrics subsystem...");
            crate::metrics::init()?;
        }

        let cores = config.all_cores();

        info!("initializing mempools...");
        let sockets = cores.iter().map(CoreId::socket_id).collect::<HashSet<_>>();
        let mut mempools = vec![];
        for socket in sockets {
            let mempool = Mempool::new(config.mempool.capacity, config.mempool.cache_size, socket)?;
            debug!(?mempool);
            mempools.push(mempool);
        }

        info!("intializing cores...");
        let core_map = CoreMapBuilder::new()
            .app_name(&config.app_name)
            .cores(&cores)
            .master_core(config.master_core)
            .mempools(&mut mempools)
            .finish()?;

        let len = config.num_knis();
        if len > 0 {
            info!("initializing KNI subsystem...");
            dpdk::kni_init(len)?;
        }

        info!("initializing ports...");
        let mut ports = vec![];
        for conf in config.ports.iter() {
            let port = PortBuilder::new(conf.name.clone(), conf.device.clone())?
                .cores(&conf.cores)?
                .mempools(&mut mempools)
                .rx_tx_queue_capacity(conf.rxd, conf.txd)?
                .finish(conf.promiscuous, conf.multicast, conf.kni)?;

            debug!(?port);
            ports.push(port);
        }

        #[cfg(feature = "metrics")]
        {
            crate::metrics::register_port_stats(&ports);
            crate::metrics::register_mempool_stats(&mempools);
        }

        info!("runtime ready.");

        Ok(Runtime {
            ports: ManuallyDrop::new(ports),
            mempools: ManuallyDrop::new(mempools),
            core_map,
            on_signal: Arc::new(|_| true),
            config,
        })
    }

    #[inline]
    fn get_port(&self, name: &str) -> Result<&Port> {
        self.ports
            .iter()
            .find(|p| p.name() == name)
            .ok_or_else(|| PortError::NotFound(name.to_owned()).into())
    }

    #[inline]
    fn get_port_mut(&mut self, name: &str) -> Result<&mut Port> {
        self.ports
            .iter_mut()
            .find(|p| p.name() == name)
            .ok_or_else(|| PortError::NotFound(name.to_owned()).into())
    }

    #[inline]
    fn get_core(&self, core_id: CoreId) -> Result<&CoreExecutor> {
        self.core_map
            .cores
            .get(&core_id)
            .ok_or_else(|| CoreError::NotFound(core_id).into())
    }

    #[inline]
    fn get_port_qs(&self, core_id: CoreId) -> Result<HashMap<String, PortQueue>> {
        let map = self
            .ports
            .iter()
            .filter_map(|p| {
                p.queues()
                    .get(&core_id)
                    .map(|q| (p.name().to_owned(), q.clone()))
            })
            .collect::<HashMap<_, _>>();

        ensure!(!map.is_empty(), CoreError::NotAssigned(core_id));

        Ok(map)
    }

    /// Sets the Unix signal handler.
    ///
    /// `SIGHUP`, `SIGINT` and `SIGTERM` are the supported Unix signals.
    /// The return value of the handler determines whether to terminate the
    /// process. `true` indicates the signal is accepted and the process
    /// should terminate. `false` indicates the signal should be discarded
    /// and the process should keep running.
    ///
    /// # Example
    ///
    /// ```
    /// Runtime::build(&config)?;
    ///     .set_on_signal(|signal| match signal {
    ///         SIGHUP => {
    ///             reload_config();
    ///             false
    ///         }
    ///         _ => true,
    ///     })
    ///     .execute();
    /// ```
    pub fn set_on_signal<F>(&mut self, f: F) -> &mut Self
    where
        F: Fn(UnixSignal) -> bool + 'static,
    {
        self.on_signal = Arc::new(f);
        self
    }

    /// Installs a pipeline to a port. The pipeline will run on all the
    /// cores assigned to the port.
    ///
    /// `port` is the logical name that identifies the port. The `installer`
    /// is a closure that takes in a [`PortQueue`] and returns a [`Pipeline`]
    /// that will be spawned onto the thread executor.
    ///
    /// # Example
    ///
    /// ```
    /// Runtime::build(config)?
    ///     .add_add_pipeline_to_port("eth1", install)?
    ///     .execute()
    /// ```
    ///
    /// [`PortQueue`]: crate::PortQueue
    /// [`Pipeline`]: crate::batch::Pipeline
    pub fn add_pipeline_to_port<T: Pipeline + 'static, F>(
        &mut self,
        port: &str,
        installer: F,
    ) -> Result<&mut Self>
    where
        F: Fn(PortQueue) -> T + Send + Sync + 'static,
    {
        let port = self.get_port(port)?;
        let f = Arc::new(installer);

        for (core_id, port_q) in port.queues() {
            let f = f.clone();
            let port_q = port_q.clone();
            let thread = &self.get_core(*core_id)?.thread;

            // spawns the bootstrap. we want the bootstrapping to execute on the
            // target core instead of the master core. that way the actual task
            // is spawned locally and the type bounds are less restrictive.
            thread.spawn(future::lazy(move |_| {
                let fut = f(port_q);
                debug!("spawned pipeline {}.", fut.name());
                current_thread::spawn(fut);
            }))?;

            debug!("installed pipeline on port_q for {:?}.", core_id);
        }

        info!("installed pipeline for port {}.", port.name());

        Ok(self)
    }

    /// Installs a pipeline to a KNI enabled port to receive packets coming
    /// from the kernel. This pipeline will run on an arbitrarily selected core
    /// that's assigned to the port.
    ///
    /// # Remarks
    ///
    /// This function has to be invoked once per port. Otherwise the packets
    /// coming from the kernel will be silently dropped. For the most common
    /// use case where the application only needs simple packet forwarding,
    /// use [`batch::splice`] to join the kernel's RX with the port's TX.
    ///
    /// # Example
    ///
    /// ```
    /// Runtime::build(config)?
    ///     .add_add_pipeline_to_port("kni0", install)?
    ///     .add_kni_rx_pipeline_to_port("kni0", batch::splice)?
    ///     .execute()
    /// ```
    ///
    /// [`batch::splice`]: crate::batch::splice
    pub fn add_kni_rx_pipeline_to_port<T: Pipeline + 'static, F>(
        &mut self,
        port: &str,
        installer: F,
    ) -> Result<&mut Self>
    where
        F: FnOnce(KniRx, PortQueue) -> T + Send + Sync + 'static,
    {
        // takes ownership of the kni rx handle.
        let kni_rx = self
            .get_port_mut(port)?
            .kni()
            .ok_or(KniError::Disabled)?
            .take_rx()?;

        // selects a core to run the rx pipeline for this port. the selection
        // arbitrarily picks the last core we find. if the port has more than
        // one core assigned, this will be different from the core that's
        // running the tx pipeline.
        let port = self.get_port(port)?;
        let core_id = port.queues().keys().last().unwrap();
        let port_q = port.queues()[core_id].clone();
        let thread = &self.get_core(*core_id)?.thread;

        // spawns the bootstrap. we want the bootstrapping to execute on the
        // target core instead of the master core.
        thread.spawn(future::lazy(move |_| {
            let fut = installer(kni_rx, port_q);
            debug!("spawned kni rx pipeline {}.", fut.name());
            current_thread::spawn(fut);
        }))?;

        info!("installed kni rx pipeline for port {}.", port.name());

        Ok(self)
    }

    /// Installs a pipeline to a core. All the ports the core is assigned
    /// to will be available to the pipeline.
    ///
    /// `core` is the logical id that identifies the core. The `installer`
    /// is a closure that takes in a hashmap of [`PortQueues`] and returns a
    /// [`Pipeline`] that will be spawned onto the thread executor of the core.
    ///
    /// # Example
    ///
    /// ```
    /// Runtime::build(config)?
    ///     .add_pipeline_to_core(1, install)?
    ///     .execute()
    /// ```
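    ///
    /// A minimal sketch of an `install` closure for this variant, assuming a
    /// port named "eth1" (illustrative) is among those assigned to the core:
    ///
    /// ```
    /// fn install(qs: HashMap<String, PortQueue>) -> impl Pipeline {
    ///     // looks up the queue of "eth1" and loops its rx back to its tx.
    ///     let q = qs["eth1"].clone();
    ///     Poll::new(q.clone()).send(q)
    /// }
    /// ```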
    ///
    /// [`PortQueues`]: crate::PortQueue
    /// [`Pipeline`]: crate::batch::Pipeline
    pub fn add_pipeline_to_core<T: Pipeline + 'static, F>(
        &mut self,
        core: usize,
        installer: F,
    ) -> Result<&mut Self>
    where
        F: FnOnce(HashMap<String, PortQueue>) -> T + Send + Sync + 'static,
    {
        let core_id = CoreId::new(core);
        let thread = &self.get_core(core_id)?.thread;
        let port_qs = self.get_port_qs(core_id)?;

        // spawns the bootstrap. we want the bootstrapping to execute on the
        // target core instead of the master core.
        thread.spawn(future::lazy(move |_| {
            let fut = installer(port_qs);
            debug!("spawned pipeline {}.", fut.name());
            current_thread::spawn(fut);
        }))?;

        info!("installed pipeline for {:?}.", core_id);

        Ok(self)
    }

    /// Installs a periodic pipeline to a core.
    ///
    /// `core` is the logical id that identifies the core. The `installer` is a
    /// closure that takes in a hashmap of [`PortQueues`] and returns a
    /// [`Pipeline`] that will be run periodically every `dur` interval.
    ///
    /// # Remarks
    ///
    /// All the ports the core is assigned to will be available to this
    /// pipeline. However, they should only be used to transmit packets. This
    /// variant is for pipelines that generate new packets periodically.
    /// A new packet batch can be created with [`batch::poll_fn`] and ingested
    /// into the pipeline.
    ///
    /// # Example
    ///
    /// ```
    /// Runtime::build(config)?
    ///     .add_periodic_pipeline_to_core(1, install, Duration::from_millis(10))?
    ///     .execute()
    /// ```
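    ///
    /// A minimal sketch of an `install` closure for this variant, assuming
    /// [`batch::poll_fn`] takes a closure producing the next batch as a
    /// `Vec<Mbuf>` (signature assumed), with an illustrative port "eth1":
    ///
    /// ```
    /// fn install(qs: HashMap<String, PortQueue>) -> impl Pipeline {
    ///     let q = qs["eth1"].clone();
    ///     // allocates one new packet per interval and transmits it.
    ///     batch::poll_fn(|| Mbuf::new().into_iter().collect())
    ///         .send(q)
    /// }
    /// ```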
    ///
    /// [`PortQueues`]: crate::PortQueue
    /// [`Pipeline`]: crate::batch::Pipeline
    /// [`batch::poll_fn`]: crate::batch::poll_fn
    pub fn add_periodic_pipeline_to_core<T: Pipeline + 'static, F>(
        &mut self,
        core: usize,
        installer: F,
        dur: Duration,
    ) -> Result<&mut Self>
    where
        F: FnOnce(HashMap<String, PortQueue>) -> T + Send + Sync + 'static,
    {
        let core_id = CoreId::new(core);
        let thread = &self.get_core(core_id)?.thread;
        let port_qs = self.get_port_qs(core_id)?;

        // spawns the bootstrap. we want the bootstrapping to execute on the
        // target core instead of the master core so the periodic task is
        // associated with the correct timer instance.
        thread.spawn(future::lazy(move |_| {
            let mut pipeline = installer(port_qs);
            debug!("spawned periodic pipeline {}.", pipeline.name());
            let fut = Interval::new_interval(dur).for_each(move |_| {
                pipeline.run_once();
                future::ready(())
            });
            current_thread::spawn(fut);
        }))?;

        info!("installed periodic pipeline for {:?}.", core_id);

        Ok(self)
    }

    /// Installs a periodic task to a core.
    ///
    /// `core` is the logical id that identifies the core. `task` is the
    /// closure to execute. The task will rerun every `dur` interval.
    ///
    /// # Example
    ///
    /// ```
    /// Runtime::build(config)?
    ///     .add_periodic_task_to_core(0, print_stats, Duration::from_secs(1))?
    ///     .execute()
    /// ```
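    ///
    /// where `print_stats` can be any `Fn()` closure; a trivial sketch:
    ///
    /// ```
    /// fn print_stats() {
    ///     // a real task might dump port or mempool metrics here.
    ///     println!("tick.");
    /// }
    /// ```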
    pub fn add_periodic_task_to_core<F>(
        &mut self,
        core: usize,
        task: F,
        dur: Duration,
    ) -> Result<&mut Self>
    where
        F: Fn() + Send + Sync + 'static,
    {
        let core_id = CoreId::new(core);
        let thread = &self.get_core(core_id)?.thread;

        // spawns the bootstrap. we want the bootstrapping to execute on the
        // target core instead of the master core so the periodic task is
        // associated with the correct timer instance.
        thread.spawn(future::lazy(move |_| {
            let fut = Interval::new_interval(dur).for_each(move |_| {
                task();
                future::ready(())
            });
            debug!("spawned periodic task.");
            current_thread::spawn(fut);
        }))?;

        info!("installed periodic task for {:?}.", core_id);

        Ok(self)
    }

    /// Blocks the main thread until a timeout expires.
    ///
    /// This mode is useful for running integration tests. The timeout
    /// duration can be set in `RuntimeConfig`.
    fn wait_for_timeout(&mut self, timeout: Duration) {
        let MasterExecutor {
            ref timer,
            ref mut thread,
            ..
        } = self.core_map.master_core;

        let when = Instant::now() + timeout;
        let delay = timer.delay(when);

        debug!("waiting for {:?}...", timeout);
        let _timer = timer::set_default(&timer);
        thread.block_on(delay);
        info!("timed out after {:?}.", timeout);
    }

    /// Blocks the main thread until it receives a signal to terminate.
    fn wait_for_signal(&mut self) -> Result<()> {
        let sighup = unix::signal(SignalKind::hangup())?.map(|_| UnixSignal::SIGHUP);
        let sigint = unix::signal(SignalKind::interrupt())?.map(|_| UnixSignal::SIGINT);
        let sigterm = unix::signal(SignalKind::terminate())?.map(|_| UnixSignal::SIGTERM);

        // combines the streams together
        let stream = stream::select(stream::select(sighup, sigint), sigterm);

        // passes each signal through the `on_signal` closure, and discard
        // any that shouldn't stop the execution.
        let f = self.on_signal.clone();
        let mut stream = stream.filter(|&signal| future::ready(f(signal)));

        let MasterExecutor {
            ref reactor,
            ref timer,
            ref mut thread,
            ..
        } = self.core_map.master_core;

        // sets the reactor so we receive the signals and runs the future
        // on the master core. the execution stops on the first signal that
        // wasn't filtered out.
        debug!("waiting for a Unix signal...");
        let _guard = driver::set_default(&reactor);
        let _timer = timer::set_default(&timer);
        let _ = thread.block_on(stream.next());
        info!("signaled to stop.");

        Ok(())
    }

    /// Installs the KNI TX pipelines.
    fn add_kni_tx_pipelines(&mut self) -> Result<()> {
        let mut map = HashMap::new();
        for port in self.ports.iter_mut() {
            // selects a core if we need to run a tx pipeline for this port.
            // the selection arbitrarily picks the first core we find. if the
            // port has more than one core assigned, this will be different
            // from the core that's running the rx pipeline.
            let core_id = *port.queues().keys().next().unwrap();

            // if the port is kni enabled, then we will take ownership of the
            // tx handle.
            if let Some(kni) = port.kni() {
                map.insert(core_id, kni.take_tx()?);
            }
        }

        // spawns all the pipelines.
        for (core_id, kni_tx) in map.into_iter() {
            let thread = &self.get_core(core_id)?.thread;
            thread.spawn(kni_tx.into_pipeline())?;

            info!("installed kni tx pipeline on {:?}.", core_id);
        }

        Ok(())
    }

    /// Starts all the ports to receive packets.
    fn start_ports(&mut self) -> Result<()> {
        for port in self.ports.iter_mut() {
            port.start()?;
        }

        Ok(())
    }

    /// Unparks all the cores to start task execution.
    fn unpark_cores(&mut self) {
        for core in self.core_map.cores.values() {
            if let Some(unpark) = &core.unpark {
                unpark.unpark();
            }
        }
    }

    /// Shuts down all the cores to stop task execution.
    #[allow(clippy::cognitive_complexity)]
    fn shutdown_cores(&mut self) {
        for (core_id, core) in &mut self.core_map.cores {
            if let Some(trigger) = core.shutdown.take() {
                debug!("shutting down {:?}.", core_id);
                trigger.shutdown();
                debug!("sent {:?} shutdown trigger.", core_id);
                let handle = core.join.take().unwrap();
                let _ = handle.join();
                info!("terminated {:?}.", core_id);
            }
        }
    }

    /// Stops all the ports.
    fn stop_ports(&mut self) {
        for port in self.ports.iter_mut() {
            port.stop();
        }
    }

    /// Executes the pipeline(s) until a stop signal is received.
    pub fn execute(&mut self) -> Result<()> {
        self.add_kni_tx_pipelines()?;
        self.start_ports()?;
        self.unpark_cores();

        // runs the app until main loop finishes.
        match self.config.duration {
            None => self.wait_for_signal()?,
            Some(d) => self.wait_for_timeout(d),
        };

        self.shutdown_cores();
        self.stop_ports();
        info!("runtime terminated.");

        Ok(())
    }
}

impl<'a> fmt::Debug for Runtime {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("runtime")
            .field("runtime configuration", &format!("{:?}", self.config))
            .finish()
    }
}

impl Drop for Runtime {
    fn drop(&mut self) {
        // the default rust drop order is self before fields, which is the wrong
        // order for what EAL needs. To control the order, we manually drop the
        // fields first.
        unsafe {
            ManuallyDrop::drop(&mut self.ports);
            ManuallyDrop::drop(&mut self.mempools);
        }

        if self.config.num_knis() > 0 {
            debug!("freeing KNI subsystem.");
            dpdk::kni_close();
        }

        debug!("freeing EAL.");
        dpdk::eal_cleanup().unwrap();
    }
}