erdos/lib.rs
#![doc(html_logo_url = "https://avatars2.githubusercontent.com/u/44586405")]
//! ERDOS is a platform for developing self-driving car and robotics
//! applications. The system is built using techniques from streaming dataflow
//! systems, which is reflected in the API.
//!
//! Applications are modeled as directed graphs, in which data flows through
//! [streams](crate::dataflow::stream) and is processed by
//! [operators](crate::dataflow::operator).
//! Because applications often resemble a sequence of connected operators,
//! an ERDOS application may also be referred to as a *pipeline*.
//!
//! ## Example
//! This example shows an ERDOS application which counts the number of objects
//! detected from a stream of camera frames.
//! The example consists of the [driver](#driver) part of the program, which
//! is responsible for connecting operators via streams.
//! For information on building operators, see [§ Operators](#operators).
//!
//! ```ignore
//! // Capture arguments to set up an ERDOS node.
//! let args = erdos::new_app("ObjectCounter");
//! // Create an ERDOS node which runs the application.
//! let mut node = Node::new(Configuration::from_args(&args));
//!
//! // Stream of RGB images from a camera.
//! let camera_frames = erdos::connect_source(
//!     CameraOperator::new,
//!     OperatorConfig::new().name("Camera")
//! );
//! // Stream of labeled bounding boxes for each RGB image.
//! let detected_objects = erdos::connect_one_in_one_out(
//!     ObjectDetector::new,
//!     || {},
//!     OperatorConfig::new().name("Detector"),
//!     &camera_frames
//! );
//! // Stream of detected object count for each RGB image.
//! let num_detected = erdos::connect_one_in_one_out(
//!     || { MapOperator::new(|bboxes: &Vec<BBox>| -> usize { bboxes.len() }) },
//!     || {},
//!     OperatorConfig::new().name("Counter"),
//!     &detected_objects
//! );
//!
//! // Run the application
//! node.run();
//! ```
//!
//! ## Operators
//! ERDOS operators process received data, and use
//! [streams](crate::dataflow::stream) to broadcast
//! [`Message`s](crate::dataflow::Message) to downstream operators.
//! ERDOS provides a [standard library of operators](crate::dataflow::operators)
//! for common dataflow patterns.
//! While the standard operators are general and versatile, some applications
//! may implement custom operators to better optimize performance and take
//! fine-grained control over execution.
//!
//! ### Implementing Operators
//! For an example, see the implementation of the
//! [`FlatMapOperator`](crate::dataflow::operators::FlatMapOperator).
//!
//! Operators are structures which implement an
//! [operator trait](crate::dataflow::operator) reflecting their
//! communication pattern.
//! For example, the [`SplitOperator`](crate::dataflow::operators::SplitOperator)
//! implements [`OneInTwoOut`](crate::dataflow::operator::OneInTwoOut)
//! because it receives data on one input stream, and sends messages on
//! two output streams.
//!
//! Operators can support both push-based and pull-based models of execution
//! by implementing methods defined in the
//! [operator traits](crate::dataflow::operator).
//! By implementing callbacks such as
//! [`OneInOneOut::on_data`](crate::dataflow::operator::OneInOneOut::on_data),
//! operators can process messages as they arrive.
//! Moreover, operators can implement callbacks over [watermarks](#watermarks)
//! (e.g. [`OneInOneOut::on_watermark`](crate::dataflow::operator::OneInOneOut::on_watermark))
//! to ensure ordered processing over timestamps.
//! ERDOS ensures lock-free, safe, and concurrent processing by ordering
//! callbacks in an ERDOS-managed execution lattice, which serves as a run
//! queue for the system's multithreaded runtime.
//!
//! While ERDOS manages the execution of callbacks, some operators require
//! more fine-grained control. Operators can use the pull-based model
//! to take over the thread of execution by overriding the `run` method
//! (e.g. [`OneInOneOut::run`](crate::dataflow::operator::OneInOneOut::run))
//! of an [operator trait](crate::dataflow::operator), and pulling data from
//! the [`ReadStream`](crate::dataflow::stream::ReadStream)s.
//! *Callbacks are not invoked while `run` executes.*
//!
//! ## Performance
//! ERDOS is designed for low latency. Self-driving car pipelines require
//! end-to-end deadlines on the order of hundreds of milliseconds for safe
//! driving. Similarly, self-driving cars typically process gigabytes per
//! second of data on small clusters. Therefore, ERDOS is optimized to
//! send small amounts of data (gigabytes as opposed to terabytes)
//! as quickly as possible.
//!
//! ## Watermarks
//! Watermarks in ERDOS signal completion of computation. More concretely,
//! sending a watermark with timestamp `t` on a stream asserts that all future
//! messages sent on that stream will have timestamps `t' > t`.
//! ERDOS also introduces a *top watermark*, which is a watermark with the
//! maximum possible timestamp. Sending a top watermark closes the stream as
//! there is no `t' > t_top`, so no more messages can be sent.
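//!
//! The invariant above can be illustrated with a minimal, self-contained
//! sketch. `ToyStream`, `check`, `send`, `send_watermark`, and `is_closed`
//! are hypothetical names used only for illustration and are not part of the
//! ERDOS API. Timestamps are modeled as `u64`, with `u64::MAX` standing in
//! for the top timestamp, and rejecting non-increasing watermarks is an
//! assumption of this sketch.
//!
//! ```rust
//! /// Hypothetical stream that remembers the latest watermark it has sent.
//! #[derive(Default)]
//! struct ToyStream {
//!     watermark: Option<u64>,
//! }
//!
//! impl ToyStream {
//!     /// A timestamp is valid only if it is strictly greater than the
//!     /// latest watermark (the `t' > t` rule).
//!     fn check(&self, timestamp: u64) -> Result<(), String> {
//!         match self.watermark {
//!             Some(w) if timestamp <= w => {
//!                 Err(format!("timestamp {} is not after watermark {}", timestamp, w))
//!             }
//!             _ => Ok(()),
//!         }
//!     }
//!
//!     /// Sends a data message; the payload is omitted in this sketch.
//!     fn send(&mut self, timestamp: u64) -> Result<(), String> {
//!         self.check(timestamp)
//!     }
//!
//!     /// Sends a watermark. Assumption: watermarks must strictly increase.
//!     fn send_watermark(&mut self, timestamp: u64) -> Result<(), String> {
//!         self.check(timestamp)?;
//!         self.watermark = Some(timestamp);
//!         Ok(())
//!     }
//!
//!     /// After a top watermark no timestamp can pass `check`, so the
//!     /// stream is effectively closed.
//!     fn is_closed(&self) -> bool {
//!         self.watermark == Some(u64::MAX)
//!     }
//! }
//! ```
//!
//! In this model, sending a watermark at `1` makes a later message at `1`
//! fail while a message at `2` still succeeds, and a watermark at `u64::MAX`
//! causes every subsequent send to fail.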
//!
//! ## Determinism
//! ERDOS provides mechanisms to enable the building of deterministic
//! applications.
//! For instance, processing sets of messages separated by watermarks using
//! watermark callbacks can turn ERDOS pipelines into
//! [Kahn process networks](https://en.wikipedia.org/wiki/Kahn_process_networks).
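//!
//! As a rough illustration of why watermark callbacks help with determinism,
//! the sketch below buffers messages per timestamp and only emits a batch
//! once a watermark declares it complete, so the output depends on the set
//! of messages and not on their arrival order. `SortedBatcher` and the
//! signatures of its `on_data` and `on_watermark` methods are hypothetical
//! and do not match the ERDOS operator traits; payloads are modeled as `i64`.
//!
//! ```rust
//! use std::collections::BTreeMap;
//!
//! /// Hypothetical operator state: values buffered by timestamp.
//! #[derive(Default)]
//! struct SortedBatcher {
//!     pending: BTreeMap<u64, Vec<i64>>,
//! }
//!
//! impl SortedBatcher {
//!     /// Data callback: record the value without producing any output.
//!     fn on_data(&mut self, timestamp: u64, value: i64) {
//!         self.pending.entry(timestamp).or_default().push(value);
//!     }
//!
//!     /// Watermark callback: every batch with a timestamp <= `timestamp`
//!     /// is complete, so emit those batches in timestamp order with their
//!     /// values sorted into a canonical order.
//!     fn on_watermark(&mut self, timestamp: u64) -> Vec<(u64, Vec<i64>)> {
//!         // Keep batches with later timestamps buffered.
//!         let later = match timestamp.checked_add(1) {
//!             Some(next) => self.pending.split_off(&next),
//!             None => BTreeMap::new(),
//!         };
//!         let ready = std::mem::replace(&mut self.pending, later);
//!         ready
//!             .into_iter()
//!             .map(|(t, mut values)| {
//!                 values.sort_unstable();
//!                 (t, values)
//!             })
//!             .collect()
//!     }
//! }
//! ```
//!
//! Two instances that receive the same messages in different orders return
//! identical results from `on_watermark`.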
//!
//! ## More Information
//! To read more about the ideas behind ERDOS, refer to our paper
//! [*D3: A Dynamic Deadline-Driven Approach for Building Autonomous Vehicles*](https://dl.acm.org/doi/10.1145/3492321.3519576).
//! If you find ERDOS useful to your work, please cite our paper as follows:
//! ```bibtex
//! @inproceedings{10.1145/3492321.3519576,
//!     author = {Gog, Ionel and Kalra, Sukrit and Schafhalter, Peter and Gonzalez, Joseph E. and Stoica, Ion},
//!     title = {D3: A Dynamic Deadline-Driven Approach for Building Autonomous Vehicles},
//!     year = {2022},
//!     isbn = {9781450391627},
//!     publisher = {Association for Computing Machinery},
//!     address = {New York, NY, USA},
//!     url = {https://doi.org/10.1145/3492321.3519576},
//!     doi = {10.1145/3492321.3519576},
//!     booktitle = {Proceedings of the Seventeenth European Conference on Computer Systems},
//!     pages = {453–471},
//!     numpages = {19},
//!     location = {Rennes, France},
//!     series = {EuroSys '22}
//! }
//! ```

// Required for specialization.
#![allow(incomplete_features)]
#![feature(specialization)]
#![feature(box_into_pin)]

// Re-exports of libraries used in macros.
#[doc(hidden)]
pub use ::tokio;

// Libraries used in this file.
use std::{cell::RefCell, fmt};

use abomonation_derive::Abomonation;
use clap::{self, App, Arg};
use rand::{Rng, SeedableRng, StdRng};
use serde::{Deserialize, Serialize};

// Private submodules
mod configuration;

// Public submodules
#[doc(hidden)]
pub mod communication;
pub mod dataflow;
pub mod node;
#[doc(hidden)]
pub mod scheduler;

// Public exports
pub use configuration::Configuration;
pub use dataflow::{connect::*, OperatorConfig};

/// A unique identifier for an operator.
pub type OperatorId = Uuid;

// Random number generator which should be the same across threads and processes.
thread_local!(static RNG: RefCell<StdRng> = RefCell::new(StdRng::from_seed(&[1913, 3, 26])));

/// Produces a deterministic, unique ID.
pub fn generate_id() -> Uuid {
    RNG.with(|rng| {
        let mut bytes = [0u8; 16];
        rng.borrow_mut().fill_bytes(&mut bytes);
        Uuid(bytes)
    })
}

/// Wrapper around [`uuid::Uuid`] that implements [`Abomonation`](abomonation::Abomonation) for fast serialization.
#[derive(
    Abomonation, Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize, Deserialize,
)]
pub struct Uuid(uuid::Bytes);

impl Uuid {
    pub fn new_v4() -> Self {
        Self(*uuid::Uuid::new_v4().as_bytes())
    }

    pub fn new_deterministic() -> Self {
        generate_id()
    }

    pub fn nil() -> Uuid {
        Uuid([0; 16])
    }
}

impl fmt::Debug for Uuid {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> fmt::Result {
        let &Uuid(bytes) = self;
        let id = uuid::Uuid::from_bytes(bytes);
        fmt::Display::fmt(&id, f)
    }
}

impl fmt::Display for Uuid {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> fmt::Result {
        let &Uuid(bytes) = self;
        let id = uuid::Uuid::from_bytes(bytes);
        fmt::Display::fmt(&id, f)
    }
}

/// Resets seed and creates a new dataflow graph.
pub fn reset() {
    // All global variables should be reset here.
    RNG.with(|rng| {
        *rng.borrow_mut() = StdRng::from_seed(&[1913, 3, 26]);
    });
    dataflow::graph::default_graph::set(dataflow::graph::AbstractGraph::new());
}

/// Defines command line arguments for running a multi-node ERDOS application.
pub fn new_app(name: &str) -> clap::App {
    App::new(name)
        .arg(
            Arg::with_name("threads")
                .short("t")
                .long("threads")
                .default_value("4")
                .help("Number of worker threads per process"),
        )
        .arg(
            Arg::with_name("data-addresses")
                .short("d")
                .long("data-addresses")
                .default_value("127.0.0.1:9000")
                .help("Comma separated list of data socket addresses of all nodes"),
        )
        .arg(
            Arg::with_name("control-addresses")
                .short("c")
                .long("control-addresses")
                .default_value("127.0.0.1:9000")
                .help("Comma separated list of control socket addresses of all nodes"),
        )
        .arg(
            Arg::with_name("index")
                .short("i")
                .long("index")
                .default_value("0")
                .help("Current node index"),
        )
        .arg(
            Arg::with_name("graph-filename")
                .short("g")
                .long("graph-filename")
                .default_value("")
                .help("Exports the dataflow graph as a DOT file to the provided filename"),
        )
        .arg(
            Arg::with_name("verbose")
                .short("v")
                .long("verbose")
                .multiple(true)
                .takes_value(false)
                .help("Sets the level of verbosity"),
        )
}
275}