capsule/runtime/mod.rs
1/*
2* Copyright 2019 Comcast Cable Communications Management, LLC
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*
16* SPDX-License-Identifier: Apache-2.0
17*/
18
19mod core_map;
20
21pub(crate) use self::core_map::*;
22
23use crate::batch::Pipeline;
24use crate::config::RuntimeConfig;
25use crate::dpdk::{
26 self, CoreId, KniError, KniRx, Mempool, Port, PortBuilder, PortError, PortQueue,
27};
28use crate::{debug, ensure, info};
29use anyhow::Result;
30use futures::{future, stream, StreamExt};
31use std::collections::{HashMap, HashSet};
32use std::fmt;
33use std::mem::ManuallyDrop;
34use std::sync::Arc;
35use std::time::{Duration, Instant};
36use tokio_executor::current_thread;
37use tokio_net::driver;
38use tokio_net::signal::unix::{self, SignalKind};
39use tokio_timer::{timer, Interval};
40
41/// Supported [Unix signals].
42///
43/// [Unix signals]: https://en.wikipedia.org/wiki/Signal_(IPC)#POSIX_signals
44#[derive(Copy, Clone, Debug)]
45pub enum UnixSignal {
46 /// This signal is sent to a process when its controlling terminal is closed.
47 /// In modern systems, this signal usually means that the controlling pseudo
48 /// or virtual terminal has been closed. Many daemons will reload their
49 /// configuration files and reopen their log files instead of exiting when
50 /// receiving this signal. `nohup` is a command to make a command ignore the
51 /// signal.
52 SIGHUP = libc::SIGHUP as isize,
53 /// This signal is sent to a process by its controlling terminal when a user
54 /// wishes to interrupt the process. This is typically initiated by pressing
55 /// `Ctrl-C`, but on some systems, the "delete" character or "break" key can
56 /// be used.
57 SIGINT = libc::SIGINT as isize,
58 /// This signal is sent to a process to request its termination. Unlike the
59 /// `SIGKILL` signal, it can be caught and interpreted or ignored by the
60 /// process. This allows the process to perform nice termination releasing
61 /// resources and saving state if appropriate. `SIGINT` is nearly identical
62 /// to `SIGTERM`.
63 SIGTERM = libc::SIGTERM as isize,
64}
65
/// The Capsule runtime.
///
/// The runtime initializes the underlying DPDK environment, and it also manages
/// the task scheduler that executes the packet processing pipelines.
pub struct Runtime {
    // wrapped in `ManuallyDrop` so `Drop for Runtime` can free the ports
    // explicitly, ahead of the mempools, the KNI subsystem and EAL cleanup.
    ports: ManuallyDrop<Vec<Port>>,
    // one mempool per socket used by the configured cores. Also dropped
    // manually: after the ports, before the KNI subsystem and EAL cleanup.
    mempools: ManuallyDrop<Vec<Mempool>>,
    // the master core executor plus the per-core executors that run the
    // installed pipelines and tasks.
    core_map: CoreMap,
    // decides, per received Unix signal, whether `execute` should stop
    // (`true`) or ignore the signal and keep running (`false`).
    on_signal: Arc<dyn Fn(UnixSignal) -> bool>,
    // the settings the runtime was built from.
    config: RuntimeConfig,
}
77
78impl Runtime {
79 /// Builds a runtime from config settings.
80 #[allow(clippy::cognitive_complexity)]
81 pub fn build(config: RuntimeConfig) -> Result<Self> {
82 info!("initializing EAL...");
83 dpdk::eal_init(config.to_eal_args())?;
84
85 #[cfg(feature = "metrics")]
86 {
87 info!("initializing metrics subsystem...");
88 crate::metrics::init()?;
89 }
90
91 let cores = config.all_cores();
92
93 info!("initializing mempools...");
94 let sockets = cores.iter().map(CoreId::socket_id).collect::<HashSet<_>>();
95 let mut mempools = vec![];
96 for socket in sockets {
97 let mempool = Mempool::new(config.mempool.capacity, config.mempool.cache_size, socket)?;
98 debug!(?mempool);
99 mempools.push(mempool);
100 }
101
102 info!("intializing cores...");
103 let core_map = CoreMapBuilder::new()
104 .app_name(&config.app_name)
105 .cores(&cores)
106 .master_core(config.master_core)
107 .mempools(&mut mempools)
108 .finish()?;
109
110 let len = config.num_knis();
111 if len > 0 {
112 info!("initializing KNI subsystem...");
113 dpdk::kni_init(len)?;
114 }
115
116 info!("initializing ports...");
117 let mut ports = vec![];
118 for conf in config.ports.iter() {
119 let port = PortBuilder::new(conf.name.clone(), conf.device.clone())?
120 .cores(&conf.cores)?
121 .mempools(&mut mempools)
122 .rx_tx_queue_capacity(conf.rxd, conf.txd)?
123 .finish(conf.promiscuous, conf.multicast, conf.kni)?;
124
125 debug!(?port);
126 ports.push(port);
127 }
128
129 #[cfg(feature = "metrics")]
130 {
131 crate::metrics::register_port_stats(&ports);
132 crate::metrics::register_mempool_stats(&mempools);
133 }
134
135 info!("runtime ready.");
136
137 Ok(Runtime {
138 ports: ManuallyDrop::new(ports),
139 mempools: ManuallyDrop::new(mempools),
140 core_map,
141 on_signal: Arc::new(|_| true),
142 config,
143 })
144 }
145
146 #[inline]
147 fn get_port(&self, name: &str) -> Result<&Port> {
148 self.ports
149 .iter()
150 .find(|p| p.name() == name)
151 .ok_or_else(|| PortError::NotFound(name.to_owned()).into())
152 }
153
154 #[inline]
155 fn get_port_mut(&mut self, name: &str) -> Result<&mut Port> {
156 self.ports
157 .iter_mut()
158 .find(|p| p.name() == name)
159 .ok_or_else(|| PortError::NotFound(name.to_owned()).into())
160 }
161
162 #[inline]
163 fn get_core(&self, core_id: CoreId) -> Result<&CoreExecutor> {
164 self.core_map
165 .cores
166 .get(&core_id)
167 .ok_or_else(|| CoreError::NotFound(core_id).into())
168 }
169
170 #[inline]
171 fn get_port_qs(&self, core_id: CoreId) -> Result<HashMap<String, PortQueue>> {
172 let map = self
173 .ports
174 .iter()
175 .filter_map(|p| {
176 p.queues()
177 .get(&core_id)
178 .map(|q| (p.name().to_owned(), q.clone()))
179 })
180 .collect::<HashMap<_, _>>();
181
182 ensure!(!map.is_empty(), CoreError::NotAssigned(core_id));
183
184 Ok(map)
185 }
186
187 /// Sets the Unix signal handler.
188 ///
189 /// `SIGHUP`, `SIGINT` and `SIGTERM` are the supported Unix signals.
190 /// The return of the handler determines whether to terminate the
191 /// process. `true` indicates the signal is received and the process
192 /// should be terminated. `false` indicates to discard the signal and
193 /// keep the process running.
194 ///
195 /// # Example
196 ///
197 /// ```
198 /// Runtime::build(&config)?;
199 /// .set_on_signal(|signal| match signal {
200 /// SIGHUP => {
201 /// reload_config();
202 /// false
203 /// }
204 /// _ => true,
205 /// })
206 /// .execute();
207 /// ```
208 pub fn set_on_signal<F>(&mut self, f: F) -> &mut Self
209 where
210 F: Fn(UnixSignal) -> bool + 'static,
211 {
212 self.on_signal = Arc::new(f);
213 self
214 }
215
216 /// Installs a pipeline to a port. The pipeline will run on all the
217 /// cores assigned to the port.
218 ///
219 /// `port` is the logical name that identifies the port. The `installer`
220 /// is a closure that takes in a [`PortQueue`] and returns a [`Pipeline`]
221 /// that will be spawned onto the thread executor.
222 ///
223 /// # Example
224 ///
225 /// ```
226 /// Runtime::build(config)?
227 /// .add_add_pipeline_to_port("eth1", install)?
228 /// .execute()
229 /// ```
230 ///
231 /// [`PortQueue`]: crate::PortQueue
232 /// [`Pipeline`]: crate::batch::Pipeline
233 pub fn add_pipeline_to_port<T: Pipeline + 'static, F>(
234 &mut self,
235 port: &str,
236 installer: F,
237 ) -> Result<&mut Self>
238 where
239 F: Fn(PortQueue) -> T + Send + Sync + 'static,
240 {
241 let port = self.get_port(port)?;
242 let f = Arc::new(installer);
243
244 for (core_id, port_q) in port.queues() {
245 let f = f.clone();
246 let port_q = port_q.clone();
247 let thread = &self.get_core(*core_id)?.thread;
248
249 // spawns the bootstrap. we want the bootstrapping to execute on the
250 // target core instead of the master core. that way the actual task
251 // is spawned locally and the type bounds are less restricting.
252 thread.spawn(future::lazy(move |_| {
253 let fut = f(port_q);
254 debug!("spawned pipeline {}.", fut.name());
255 current_thread::spawn(fut);
256 }))?;
257
258 debug!("installed pipeline on port_q for {:?}.", core_id);
259 }
260
261 info!("installed pipeline for port {}.", port.name());
262
263 Ok(self)
264 }
265
266 /// Installs a pipeline to a KNI enabled port to receive packets coming
267 /// from the kernel. This pipeline will run on a randomly select core
268 /// that's assigned to the port.
269 ///
270 /// # Remarks
271 ///
272 /// This function has be to invoked once per port. Otherwise the packets
273 /// coming from the kernel will be silently dropped. For the most common
274 /// use case where the application only needs simple packet forwarding,
275 /// use [`batch::splice`] to join the kernel's RX with the port's TX.
276 ///
277 /// # Example
278 ///
279 /// ```
280 /// Runtime::build(config)?
281 /// .add_add_pipeline_to_port("kni0", install)?
282 /// .add_kni_rx_pipeline_to_port("kni0", batch::splice)?
283 /// .execute()
284 /// ```
285 ///
286 /// [`batch::splice`]: crate::batch::splice
287 pub fn add_kni_rx_pipeline_to_port<T: Pipeline + 'static, F>(
288 &mut self,
289 port: &str,
290 installer: F,
291 ) -> Result<&mut Self>
292 where
293 F: FnOnce(KniRx, PortQueue) -> T + Send + Sync + 'static,
294 {
295 // takes ownership of the kni rx handle.
296 let kni_rx = self
297 .get_port_mut(port)?
298 .kni()
299 .ok_or(KniError::Disabled)?
300 .take_rx()?;
301
302 // selects a core to run a rx pipeline for this port. the selection is
303 // randomly choosing the last core we find. if the port has more than one
304 // core assigned, this will be different from the core that's running the
305 // tx pipeline.
306 let port = self.get_port(port)?;
307 let core_id = port.queues().keys().last().unwrap();
308 let port_q = port.queues()[core_id].clone();
309 let thread = &self.get_core(*core_id)?.thread;
310
311 // spawns the bootstrap. we want the bootstrapping to execute on the
312 // target core instead of the master core.
313 thread.spawn(future::lazy(move |_| {
314 let fut = installer(kni_rx, port_q);
315 debug!("spawned kni rx pipeline {}.", fut.name());
316 current_thread::spawn(fut);
317 }))?;
318
319 info!("installed kni rx pipeline for port {}.", port.name());
320
321 Ok(self)
322 }
323
324 /// Installs a pipeline to a core. All the ports the core is assigned
325 /// to will be available to the pipeline.
326 ///
327 /// `core` is the logical id that identifies the core. The `installer`
328 /// is a closure that takes in a hashmap of [`PortQueues`] and returns a
329 /// [`Pipeline`] that will be spawned onto the thread executor of the core.
330 ///
331 /// # Example
332 ///
333 /// ```
334 /// Runtime::build(config)?
335 /// .add_pipeline_to_core(1, install)?
336 /// .execute()
337 /// ```
338 ///
339 /// [`PortQueues`]: crate::PortQueue
340 /// [`Pipeline`]: crate::batch::Pipeline
341 pub fn add_pipeline_to_core<T: Pipeline + 'static, F>(
342 &mut self,
343 core: usize,
344 installer: F,
345 ) -> Result<&mut Self>
346 where
347 F: FnOnce(HashMap<String, PortQueue>) -> T + Send + Sync + 'static,
348 {
349 let core_id = CoreId::new(core);
350 let thread = &self.get_core(core_id)?.thread;
351 let port_qs = self.get_port_qs(core_id)?;
352
353 // spawns the bootstrap. we want the bootstrapping to execute on the
354 // target core instead of the master core.
355 thread.spawn(future::lazy(move |_| {
356 let fut = installer(port_qs);
357 debug!("spawned pipeline {}.", fut.name());
358 current_thread::spawn(fut);
359 }))?;
360
361 info!("installed pipeline for {:?}.", core_id);
362
363 Ok(self)
364 }
365
366 /// Installs a periodic pipeline to a core.
367 ///
368 /// `core` is the logical id that identifies the core. The `installer` is a
369 /// closure that takes in a hashmap of [`PortQueues`] and returns a
370 /// [`Pipeline`] that will be run periodically every `dur` interval.
371 ///
372 /// # Remarks
373 ///
374 /// All the ports the core is assigned to will be available to this
375 /// pipeline. However they should only be used to transmit packets. This
376 /// variant is for pipelines that generate new packets periodically.
377 /// A new packet batch can be created with [`batch::poll_fn`] and ingested
378 /// into the pipeline.
379 ///
380 /// # Example
381 ///
382 /// ```
383 /// Runtime::build(config)?
384 /// .add_periodic_pipeline_to_core(1, install, Duration::from_millis(10))?
385 /// .execute()
386 /// ```
387 ///
388 /// [`PortQueues`]: crate::PortQueue
389 /// [`Pipeline`]: crate::batch::Pipeline
390 /// [`batch::poll_fn`]: crate::batch::poll_fn
391 pub fn add_periodic_pipeline_to_core<T: Pipeline + 'static, F>(
392 &mut self,
393 core: usize,
394 installer: F,
395 dur: Duration,
396 ) -> Result<&mut Self>
397 where
398 F: FnOnce(HashMap<String, PortQueue>) -> T + Send + Sync + 'static,
399 {
400 let core_id = CoreId::new(core);
401 let thread = &self.get_core(core_id)?.thread;
402 let port_qs = self.get_port_qs(core_id)?;
403
404 // spawns the bootstrap. we want the bootstrapping to execute on the
405 // target core instead of the master core so the periodic task is
406 // associated with the correct timer instance.
407 thread.spawn(future::lazy(move |_| {
408 let mut pipeline = installer(port_qs);
409 debug!("spawned periodic pipeline {}.", pipeline.name());
410 let fut = Interval::new_interval(dur).for_each(move |_| {
411 pipeline.run_once();
412 future::ready(())
413 });
414 current_thread::spawn(fut);
415 }))?;
416
417 info!("installed periodic pipeline for {:?}.", core_id);
418
419 Ok(self)
420 }
421
422 /// Installs a periodic task to a core.
423 ///
424 /// `core` is the logical id that identifies the core. `task` is the
425 /// closure to execute. The task will rerun every `dur` interval.
426 ///
427 /// # Example
428 ///
429 /// ```
430 /// Runtime::build(config)?
431 /// .add_periodic_task_to_core(0, print_stats, Duration::from_secs(1))?
432 /// .execute()
433 /// ```
434 pub fn add_periodic_task_to_core<F>(
435 &mut self,
436 core: usize,
437 task: F,
438 dur: Duration,
439 ) -> Result<&mut Self>
440 where
441 F: Fn() + Send + Sync + 'static,
442 {
443 let core_id = CoreId::new(core);
444 let thread = &self.get_core(core_id)?.thread;
445
446 // spawns the bootstrap. we want the bootstrapping to execute on the
447 // target core instead of the master core so the periodic task is
448 // associated with the correct timer instance.
449 thread.spawn(future::lazy(move |_| {
450 let fut = Interval::new_interval(dur).for_each(move |_| {
451 task();
452 future::ready(())
453 });
454 debug!("spawned periodic task.");
455 current_thread::spawn(fut);
456 }))?;
457
458 info!("installed periodic task for {:?}.", core_id);
459
460 Ok(self)
461 }
462
463 /// Blocks the main thread until a timeout expires.
464 ///
465 /// This mode is useful for running integration tests. The timeout
466 /// duration can be set in `RuntimeSettings`.
467 fn wait_for_timeout(&mut self, timeout: Duration) {
468 let MasterExecutor {
469 ref timer,
470 ref mut thread,
471 ..
472 } = self.core_map.master_core;
473
474 let when = Instant::now() + timeout;
475 let delay = timer.delay(when);
476
477 debug!("waiting for {:?}...", timeout);
478 let _timer = timer::set_default(&timer);
479 thread.block_on(delay);
480 info!("timed out after {:?}.", timeout);
481 }
482
483 /// Blocks the main thread until receives a signal to terminate.
484 fn wait_for_signal(&mut self) -> Result<()> {
485 let sighup = unix::signal(SignalKind::hangup())?.map(|_| UnixSignal::SIGHUP);
486 let sigint = unix::signal(SignalKind::interrupt())?.map(|_| UnixSignal::SIGINT);
487 let sigterm = unix::signal(SignalKind::terminate())?.map(|_| UnixSignal::SIGTERM);
488
489 // combines the streams together
490 let stream = stream::select(stream::select(sighup, sigint), sigterm);
491
492 // passes each signal through the `on_signal` closure, and discard
493 // any that shouldn't stop the execution.
494 let f = self.on_signal.clone();
495 let mut stream = stream.filter(|&signal| future::ready(f(signal)));
496
497 let MasterExecutor {
498 ref reactor,
499 ref timer,
500 ref mut thread,
501 ..
502 } = self.core_map.master_core;
503
504 // sets the reactor so we receive the signals and runs the future
505 // on the master core. the execution stops on the first signal that
506 // wasn't filtered out.
507 debug!("waiting for a Unix signal...");
508 let _guard = driver::set_default(&reactor);
509 let _timer = timer::set_default(&timer);
510 let _ = thread.block_on(stream.next());
511 info!("signaled to stop.");
512
513 Ok(())
514 }
515
516 /// Installs the KNI TX pipelines.
517 fn add_kni_tx_pipelines(&mut self) -> Result<()> {
518 let mut map = HashMap::new();
519 for port in self.ports.iter_mut() {
520 // selects a core if we need to run a tx pipeline for this port. the
521 // selection is randomly choosing the first core we find. if the port
522 // has more than one core assigned, this will be different from the
523 // core that's running the rx pipeline.
524 let core_id = *port.queues().keys().next().unwrap();
525
526 // if the port is kni enabled, then we will take ownership of the
527 // tx handle.
528 if let Some(kni) = port.kni() {
529 map.insert(core_id, kni.take_tx()?);
530 }
531 }
532
533 // spawns all the pipelines.
534 for (core_id, kni_tx) in map.into_iter() {
535 let thread = &self.get_core(core_id)?.thread;
536 thread.spawn(kni_tx.into_pipeline())?;
537
538 info!("installed kni tx pipeline on {:?}.", core_id);
539 }
540
541 Ok(())
542 }
543
544 /// Starts all the ports to receive packets.
545 fn start_ports(&mut self) -> Result<()> {
546 for port in self.ports.iter_mut() {
547 port.start()?;
548 }
549
550 Ok(())
551 }
552
553 /// Unparks all the cores to start task execution.
554 fn unpark_cores(&mut self) {
555 for core in self.core_map.cores.values() {
556 if let Some(unpark) = &core.unpark {
557 unpark.unpark();
558 }
559 }
560 }
561
562 /// Shuts down all the cores to stop task execution.
563 #[allow(clippy::cognitive_complexity)]
564 fn shutdown_cores(&mut self) {
565 for (core_id, core) in &mut self.core_map.cores {
566 if let Some(trigger) = core.shutdown.take() {
567 debug!("shutting down {:?}.", core_id);
568 trigger.shutdown();
569 debug!("sent {:?} shutdown trigger.", core_id);
570 let handle = core.join.take().unwrap();
571 let _ = handle.join();
572 info!("terminated {:?}.", core_id);
573 }
574 }
575 }
576
577 /// Stops all the ports.
578 fn stop_ports(&mut self) {
579 for port in self.ports.iter_mut() {
580 port.stop();
581 }
582 }
583
584 /// Executes the pipeline(s) until a stop signal is received.
585 pub fn execute(&mut self) -> Result<()> {
586 self.add_kni_tx_pipelines()?;
587 self.start_ports()?;
588 self.unpark_cores();
589
590 // runs the app until main loop finishes.
591 match self.config.duration {
592 None => self.wait_for_signal()?,
593 Some(d) => self.wait_for_timeout(d),
594 };
595
596 self.shutdown_cores();
597 self.stop_ports();
598 info!("runtime terminated.");
599
600 Ok(())
601 }
602}
603
604impl<'a> fmt::Debug for Runtime {
605 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
606 f.debug_struct("runtime")
607 .field("runtime configuration", &format!("{:?}", self.config))
608 .finish()
609 }
610}
611
612impl Drop for Runtime {
613 fn drop(&mut self) {
614 // the default rust drop order is self before fields, which is the wrong
615 // order for what EAL needs. To control the order, we manually drop the
616 // fields first.
617 unsafe {
618 ManuallyDrop::drop(&mut self.ports);
619 ManuallyDrop::drop(&mut self.mempools);
620 }
621
622 if self.config.num_knis() > 0 {
623 debug!("freeing KNI subsystem.");
624 dpdk::kni_close();
625 }
626
627 debug!("freeing EAL.");
628 dpdk::eal_cleanup().unwrap();
629 }
630}