capsule/runtime/
mod.rs

1/*
2* Copyright 2019 Comcast Cable Communications Management, LLC
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*
16* SPDX-License-Identifier: Apache-2.0
17*/
18
19mod core_map;
20
21pub(crate) use self::core_map::*;
22
23use crate::batch::Pipeline;
24use crate::config::RuntimeConfig;
25use crate::dpdk::{
26    self, CoreId, KniError, KniRx, Mempool, Port, PortBuilder, PortError, PortQueue,
27};
28use crate::{debug, ensure, info};
29use anyhow::Result;
30use futures::{future, stream, StreamExt};
31use std::collections::{HashMap, HashSet};
32use std::fmt;
33use std::mem::ManuallyDrop;
34use std::sync::Arc;
35use std::time::{Duration, Instant};
36use tokio_executor::current_thread;
37use tokio_net::driver;
38use tokio_net::signal::unix::{self, SignalKind};
39use tokio_timer::{timer, Interval};
40
41/// Supported [Unix signals].
42///
43/// [Unix signals]: https://en.wikipedia.org/wiki/Signal_(IPC)#POSIX_signals
44#[derive(Copy, Clone, Debug)]
45pub enum UnixSignal {
46    /// This signal is sent to a process when its controlling terminal is closed.
47    /// In modern systems, this signal usually means that the controlling pseudo
48    /// or virtual terminal has been closed. Many daemons will reload their
49    /// configuration files and reopen their log files instead of exiting when
50    /// receiving this signal. `nohup` is a command to make a command ignore the
51    /// signal.
52    SIGHUP = libc::SIGHUP as isize,
53    /// This signal is sent to a process by its controlling terminal when a user
54    /// wishes to interrupt the process. This is typically initiated by pressing
55    /// `Ctrl-C`, but on some systems, the "delete" character or "break" key can
56    /// be used.
57    SIGINT = libc::SIGINT as isize,
58    /// This signal is sent to a process to request its termination. Unlike the
59    /// `SIGKILL` signal, it can be caught and interpreted or ignored by the
60    /// process. This allows the process to perform nice termination releasing
61    /// resources and saving state if appropriate. `SIGINT` is nearly identical
62    /// to `SIGTERM`.
63    SIGTERM = libc::SIGTERM as isize,
64}
65
66/// The Capsule runtime.
67///
68/// The runtime initializes the underlying DPDK environment, and it also manages
69/// the task scheduler that executes the packet processing pipelines.
70pub struct Runtime {
71    ports: ManuallyDrop<Vec<Port>>,
72    mempools: ManuallyDrop<Vec<Mempool>>,
73    core_map: CoreMap,
74    on_signal: Arc<dyn Fn(UnixSignal) -> bool>,
75    config: RuntimeConfig,
76}
77
78impl Runtime {
79    /// Builds a runtime from config settings.
80    #[allow(clippy::cognitive_complexity)]
81    pub fn build(config: RuntimeConfig) -> Result<Self> {
82        info!("initializing EAL...");
83        dpdk::eal_init(config.to_eal_args())?;
84
85        #[cfg(feature = "metrics")]
86        {
87            info!("initializing metrics subsystem...");
88            crate::metrics::init()?;
89        }
90
91        let cores = config.all_cores();
92
93        info!("initializing mempools...");
94        let sockets = cores.iter().map(CoreId::socket_id).collect::<HashSet<_>>();
95        let mut mempools = vec![];
96        for socket in sockets {
97            let mempool = Mempool::new(config.mempool.capacity, config.mempool.cache_size, socket)?;
98            debug!(?mempool);
99            mempools.push(mempool);
100        }
101
102        info!("intializing cores...");
103        let core_map = CoreMapBuilder::new()
104            .app_name(&config.app_name)
105            .cores(&cores)
106            .master_core(config.master_core)
107            .mempools(&mut mempools)
108            .finish()?;
109
110        let len = config.num_knis();
111        if len > 0 {
112            info!("initializing KNI subsystem...");
113            dpdk::kni_init(len)?;
114        }
115
116        info!("initializing ports...");
117        let mut ports = vec![];
118        for conf in config.ports.iter() {
119            let port = PortBuilder::new(conf.name.clone(), conf.device.clone())?
120                .cores(&conf.cores)?
121                .mempools(&mut mempools)
122                .rx_tx_queue_capacity(conf.rxd, conf.txd)?
123                .finish(conf.promiscuous, conf.multicast, conf.kni)?;
124
125            debug!(?port);
126            ports.push(port);
127        }
128
129        #[cfg(feature = "metrics")]
130        {
131            crate::metrics::register_port_stats(&ports);
132            crate::metrics::register_mempool_stats(&mempools);
133        }
134
135        info!("runtime ready.");
136
137        Ok(Runtime {
138            ports: ManuallyDrop::new(ports),
139            mempools: ManuallyDrop::new(mempools),
140            core_map,
141            on_signal: Arc::new(|_| true),
142            config,
143        })
144    }
145
146    #[inline]
147    fn get_port(&self, name: &str) -> Result<&Port> {
148        self.ports
149            .iter()
150            .find(|p| p.name() == name)
151            .ok_or_else(|| PortError::NotFound(name.to_owned()).into())
152    }
153
154    #[inline]
155    fn get_port_mut(&mut self, name: &str) -> Result<&mut Port> {
156        self.ports
157            .iter_mut()
158            .find(|p| p.name() == name)
159            .ok_or_else(|| PortError::NotFound(name.to_owned()).into())
160    }
161
162    #[inline]
163    fn get_core(&self, core_id: CoreId) -> Result<&CoreExecutor> {
164        self.core_map
165            .cores
166            .get(&core_id)
167            .ok_or_else(|| CoreError::NotFound(core_id).into())
168    }
169
170    #[inline]
171    fn get_port_qs(&self, core_id: CoreId) -> Result<HashMap<String, PortQueue>> {
172        let map = self
173            .ports
174            .iter()
175            .filter_map(|p| {
176                p.queues()
177                    .get(&core_id)
178                    .map(|q| (p.name().to_owned(), q.clone()))
179            })
180            .collect::<HashMap<_, _>>();
181
182        ensure!(!map.is_empty(), CoreError::NotAssigned(core_id));
183
184        Ok(map)
185    }
186
187    /// Sets the Unix signal handler.
188    ///
189    /// `SIGHUP`, `SIGINT` and `SIGTERM` are the supported Unix signals.
190    /// The return of the handler determines whether to terminate the
191    /// process. `true` indicates the signal is received and the process
192    /// should be terminated. `false` indicates to discard the signal and
193    /// keep the process running.
194    ///
195    /// # Example
196    ///
197    /// ```
198    /// Runtime::build(&config)?;
199    ///     .set_on_signal(|signal| match signal {
200    ///         SIGHUP => {
201    ///             reload_config();
202    ///             false
203    ///         }
204    ///         _ => true,
205    ///     })
206    ///     .execute();
207    /// ```
208    pub fn set_on_signal<F>(&mut self, f: F) -> &mut Self
209    where
210        F: Fn(UnixSignal) -> bool + 'static,
211    {
212        self.on_signal = Arc::new(f);
213        self
214    }
215
216    /// Installs a pipeline to a port. The pipeline will run on all the
217    /// cores assigned to the port.
218    ///
219    /// `port` is the logical name that identifies the port. The `installer`
220    /// is a closure that takes in a [`PortQueue`] and returns a [`Pipeline`]
221    /// that will be spawned onto the thread executor.
222    ///
223    /// # Example
224    ///
225    /// ```
226    /// Runtime::build(config)?
227    ///     .add_add_pipeline_to_port("eth1", install)?
228    ///     .execute()
229    /// ```
230    ///
231    /// [`PortQueue`]: crate::PortQueue
232    /// [`Pipeline`]: crate::batch::Pipeline
233    pub fn add_pipeline_to_port<T: Pipeline + 'static, F>(
234        &mut self,
235        port: &str,
236        installer: F,
237    ) -> Result<&mut Self>
238    where
239        F: Fn(PortQueue) -> T + Send + Sync + 'static,
240    {
241        let port = self.get_port(port)?;
242        let f = Arc::new(installer);
243
244        for (core_id, port_q) in port.queues() {
245            let f = f.clone();
246            let port_q = port_q.clone();
247            let thread = &self.get_core(*core_id)?.thread;
248
249            // spawns the bootstrap. we want the bootstrapping to execute on the
250            // target core instead of the master core. that way the actual task
251            // is spawned locally and the type bounds are less restricting.
252            thread.spawn(future::lazy(move |_| {
253                let fut = f(port_q);
254                debug!("spawned pipeline {}.", fut.name());
255                current_thread::spawn(fut);
256            }))?;
257
258            debug!("installed pipeline on port_q for {:?}.", core_id);
259        }
260
261        info!("installed pipeline for port {}.", port.name());
262
263        Ok(self)
264    }
265
266    /// Installs a pipeline to a KNI enabled port to receive packets coming
267    /// from the kernel. This pipeline will run on a randomly select core
268    /// that's assigned to the port.
269    ///
270    /// # Remarks
271    ///
272    /// This function has be to invoked once per port. Otherwise the packets
273    /// coming from the kernel will be silently dropped. For the most common
274    /// use case where the application only needs simple packet forwarding,
275    /// use [`batch::splice`] to join the kernel's RX with the port's TX.
276    ///
277    /// # Example
278    ///
279    /// ```
280    /// Runtime::build(config)?
281    ///     .add_add_pipeline_to_port("kni0", install)?
282    ///     .add_kni_rx_pipeline_to_port("kni0", batch::splice)?
283    ///     .execute()
284    /// ```
285    ///
286    /// [`batch::splice`]: crate::batch::splice
287    pub fn add_kni_rx_pipeline_to_port<T: Pipeline + 'static, F>(
288        &mut self,
289        port: &str,
290        installer: F,
291    ) -> Result<&mut Self>
292    where
293        F: FnOnce(KniRx, PortQueue) -> T + Send + Sync + 'static,
294    {
295        // takes ownership of the kni rx handle.
296        let kni_rx = self
297            .get_port_mut(port)?
298            .kni()
299            .ok_or(KniError::Disabled)?
300            .take_rx()?;
301
302        // selects a core to run a rx pipeline for this port. the selection is
303        // randomly choosing the last core we find. if the port has more than one
304        // core assigned, this will be different from the core that's running the
305        // tx pipeline.
306        let port = self.get_port(port)?;
307        let core_id = port.queues().keys().last().unwrap();
308        let port_q = port.queues()[core_id].clone();
309        let thread = &self.get_core(*core_id)?.thread;
310
311        // spawns the bootstrap. we want the bootstrapping to execute on the
312        // target core instead of the master core.
313        thread.spawn(future::lazy(move |_| {
314            let fut = installer(kni_rx, port_q);
315            debug!("spawned kni rx pipeline {}.", fut.name());
316            current_thread::spawn(fut);
317        }))?;
318
319        info!("installed kni rx pipeline for port {}.", port.name());
320
321        Ok(self)
322    }
323
324    /// Installs a pipeline to a core. All the ports the core is assigned
325    /// to will be available to the pipeline.
326    ///
327    /// `core` is the logical id that identifies the core. The `installer`
328    /// is a closure that takes in a hashmap of [`PortQueues`] and returns a
329    /// [`Pipeline`] that will be spawned onto the thread executor of the core.
330    ///
331    /// # Example
332    ///
333    /// ```
334    /// Runtime::build(config)?
335    ///     .add_pipeline_to_core(1, install)?
336    ///     .execute()
337    /// ```
338    ///
339    /// [`PortQueues`]: crate::PortQueue
340    /// [`Pipeline`]: crate::batch::Pipeline
341    pub fn add_pipeline_to_core<T: Pipeline + 'static, F>(
342        &mut self,
343        core: usize,
344        installer: F,
345    ) -> Result<&mut Self>
346    where
347        F: FnOnce(HashMap<String, PortQueue>) -> T + Send + Sync + 'static,
348    {
349        let core_id = CoreId::new(core);
350        let thread = &self.get_core(core_id)?.thread;
351        let port_qs = self.get_port_qs(core_id)?;
352
353        // spawns the bootstrap. we want the bootstrapping to execute on the
354        // target core instead of the master core.
355        thread.spawn(future::lazy(move |_| {
356            let fut = installer(port_qs);
357            debug!("spawned pipeline {}.", fut.name());
358            current_thread::spawn(fut);
359        }))?;
360
361        info!("installed pipeline for {:?}.", core_id);
362
363        Ok(self)
364    }
365
366    /// Installs a periodic pipeline to a core.
367    ///
368    /// `core` is the logical id that identifies the core. The `installer` is a
369    /// closure that takes in a hashmap of [`PortQueues`] and returns a
370    /// [`Pipeline`] that will be run periodically every `dur` interval.
371    ///
372    /// # Remarks
373    ///
374    /// All the ports the core is assigned to will be available to this
375    /// pipeline. However they should only be used to transmit packets. This
376    /// variant is for pipelines that generate new packets periodically.
377    /// A new packet batch can be created with [`batch::poll_fn`] and ingested
378    /// into the pipeline.
379    ///
380    /// # Example
381    ///
382    /// ```
383    /// Runtime::build(config)?
384    ///     .add_periodic_pipeline_to_core(1, install, Duration::from_millis(10))?
385    ///     .execute()
386    /// ```
387    ///
388    /// [`PortQueues`]: crate::PortQueue
389    /// [`Pipeline`]: crate::batch::Pipeline
390    /// [`batch::poll_fn`]: crate::batch::poll_fn
391    pub fn add_periodic_pipeline_to_core<T: Pipeline + 'static, F>(
392        &mut self,
393        core: usize,
394        installer: F,
395        dur: Duration,
396    ) -> Result<&mut Self>
397    where
398        F: FnOnce(HashMap<String, PortQueue>) -> T + Send + Sync + 'static,
399    {
400        let core_id = CoreId::new(core);
401        let thread = &self.get_core(core_id)?.thread;
402        let port_qs = self.get_port_qs(core_id)?;
403
404        // spawns the bootstrap. we want the bootstrapping to execute on the
405        // target core instead of the master core so the periodic task is
406        // associated with the correct timer instance.
407        thread.spawn(future::lazy(move |_| {
408            let mut pipeline = installer(port_qs);
409            debug!("spawned periodic pipeline {}.", pipeline.name());
410            let fut = Interval::new_interval(dur).for_each(move |_| {
411                pipeline.run_once();
412                future::ready(())
413            });
414            current_thread::spawn(fut);
415        }))?;
416
417        info!("installed periodic pipeline for {:?}.", core_id);
418
419        Ok(self)
420    }
421
422    /// Installs a periodic task to a core.
423    ///
424    /// `core` is the logical id that identifies the core. `task` is the
425    /// closure to execute. The task will rerun every `dur` interval.
426    ///
427    /// # Example
428    ///
429    /// ```
430    /// Runtime::build(config)?
431    ///     .add_periodic_task_to_core(0, print_stats, Duration::from_secs(1))?
432    ///     .execute()
433    /// ```
434    pub fn add_periodic_task_to_core<F>(
435        &mut self,
436        core: usize,
437        task: F,
438        dur: Duration,
439    ) -> Result<&mut Self>
440    where
441        F: Fn() + Send + Sync + 'static,
442    {
443        let core_id = CoreId::new(core);
444        let thread = &self.get_core(core_id)?.thread;
445
446        // spawns the bootstrap. we want the bootstrapping to execute on the
447        // target core instead of the master core so the periodic task is
448        // associated with the correct timer instance.
449        thread.spawn(future::lazy(move |_| {
450            let fut = Interval::new_interval(dur).for_each(move |_| {
451                task();
452                future::ready(())
453            });
454            debug!("spawned periodic task.");
455            current_thread::spawn(fut);
456        }))?;
457
458        info!("installed periodic task for {:?}.", core_id);
459
460        Ok(self)
461    }
462
463    /// Blocks the main thread until a timeout expires.
464    ///
465    /// This mode is useful for running integration tests. The timeout
466    /// duration can be set in `RuntimeSettings`.
467    fn wait_for_timeout(&mut self, timeout: Duration) {
468        let MasterExecutor {
469            ref timer,
470            ref mut thread,
471            ..
472        } = self.core_map.master_core;
473
474        let when = Instant::now() + timeout;
475        let delay = timer.delay(when);
476
477        debug!("waiting for {:?}...", timeout);
478        let _timer = timer::set_default(&timer);
479        thread.block_on(delay);
480        info!("timed out after {:?}.", timeout);
481    }
482
483    /// Blocks the main thread until receives a signal to terminate.
484    fn wait_for_signal(&mut self) -> Result<()> {
485        let sighup = unix::signal(SignalKind::hangup())?.map(|_| UnixSignal::SIGHUP);
486        let sigint = unix::signal(SignalKind::interrupt())?.map(|_| UnixSignal::SIGINT);
487        let sigterm = unix::signal(SignalKind::terminate())?.map(|_| UnixSignal::SIGTERM);
488
489        // combines the streams together
490        let stream = stream::select(stream::select(sighup, sigint), sigterm);
491
492        // passes each signal through the `on_signal` closure, and discard
493        // any that shouldn't stop the execution.
494        let f = self.on_signal.clone();
495        let mut stream = stream.filter(|&signal| future::ready(f(signal)));
496
497        let MasterExecutor {
498            ref reactor,
499            ref timer,
500            ref mut thread,
501            ..
502        } = self.core_map.master_core;
503
504        // sets the reactor so we receive the signals and runs the future
505        // on the master core. the execution stops on the first signal that
506        // wasn't filtered out.
507        debug!("waiting for a Unix signal...");
508        let _guard = driver::set_default(&reactor);
509        let _timer = timer::set_default(&timer);
510        let _ = thread.block_on(stream.next());
511        info!("signaled to stop.");
512
513        Ok(())
514    }
515
516    /// Installs the KNI TX pipelines.
517    fn add_kni_tx_pipelines(&mut self) -> Result<()> {
518        let mut map = HashMap::new();
519        for port in self.ports.iter_mut() {
520            // selects a core if we need to run a tx pipeline for this port. the
521            // selection is randomly choosing the first core we find. if the port
522            // has more than one core assigned, this will be different from the
523            // core that's running the rx pipeline.
524            let core_id = *port.queues().keys().next().unwrap();
525
526            // if the port is kni enabled, then we will take ownership of the
527            // tx handle.
528            if let Some(kni) = port.kni() {
529                map.insert(core_id, kni.take_tx()?);
530            }
531        }
532
533        // spawns all the pipelines.
534        for (core_id, kni_tx) in map.into_iter() {
535            let thread = &self.get_core(core_id)?.thread;
536            thread.spawn(kni_tx.into_pipeline())?;
537
538            info!("installed kni tx pipeline on {:?}.", core_id);
539        }
540
541        Ok(())
542    }
543
544    /// Starts all the ports to receive packets.
545    fn start_ports(&mut self) -> Result<()> {
546        for port in self.ports.iter_mut() {
547            port.start()?;
548        }
549
550        Ok(())
551    }
552
553    /// Unparks all the cores to start task execution.
554    fn unpark_cores(&mut self) {
555        for core in self.core_map.cores.values() {
556            if let Some(unpark) = &core.unpark {
557                unpark.unpark();
558            }
559        }
560    }
561
562    /// Shuts down all the cores to stop task execution.
563    #[allow(clippy::cognitive_complexity)]
564    fn shutdown_cores(&mut self) {
565        for (core_id, core) in &mut self.core_map.cores {
566            if let Some(trigger) = core.shutdown.take() {
567                debug!("shutting down {:?}.", core_id);
568                trigger.shutdown();
569                debug!("sent {:?} shutdown trigger.", core_id);
570                let handle = core.join.take().unwrap();
571                let _ = handle.join();
572                info!("terminated {:?}.", core_id);
573            }
574        }
575    }
576
577    /// Stops all the ports.
578    fn stop_ports(&mut self) {
579        for port in self.ports.iter_mut() {
580            port.stop();
581        }
582    }
583
584    /// Executes the pipeline(s) until a stop signal is received.
585    pub fn execute(&mut self) -> Result<()> {
586        self.add_kni_tx_pipelines()?;
587        self.start_ports()?;
588        self.unpark_cores();
589
590        // runs the app until main loop finishes.
591        match self.config.duration {
592            None => self.wait_for_signal()?,
593            Some(d) => self.wait_for_timeout(d),
594        };
595
596        self.shutdown_cores();
597        self.stop_ports();
598        info!("runtime terminated.");
599
600        Ok(())
601    }
602}
603
604impl<'a> fmt::Debug for Runtime {
605    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
606        f.debug_struct("runtime")
607            .field("runtime configuration", &format!("{:?}", self.config))
608            .finish()
609    }
610}
611
612impl Drop for Runtime {
613    fn drop(&mut self) {
614        // the default rust drop order is self before fields, which is the wrong
615        // order for what EAL needs. To control the order, we manually drop the
616        // fields first.
617        unsafe {
618            ManuallyDrop::drop(&mut self.ports);
619            ManuallyDrop::drop(&mut self.mempools);
620        }
621
622        if self.config.num_knis() > 0 {
623            debug!("freeing KNI subsystem.");
624            dpdk::kni_close();
625        }
626
627        debug!("freeing EAL.");
628        dpdk::eal_cleanup().unwrap();
629    }
630}