#![doc(html_logo_url = "https://avatars2.githubusercontent.com/u/44586405")]
//! ERDOS is a platform for developing self-driving car and robotics
//! applications. The system is built using techniques from streaming dataflow
//! systems, which is reflected in the API.
//!
//! Applications are modeled as directed graphs, in which data flows through
//! [streams](crate::dataflow::stream) and is processed by
//! [operators](crate::dataflow::operator).
//! Because applications often resemble a sequence of connected operators,
//! an ERDOS application may also be referred to as a *pipeline*.
//!
//! ## Example
//! This example shows an ERDOS application which counts the number of objects
//! detected from a stream of camera frames.
//! The example consists of the [driver](#driver) part of the program, which
//! is responsible for connecting operators via streams.
//! For information on building operators, see [§ Operators](#operators).
//!
//! ```ignore
//! // Capture arguments to set up an ERDOS node.
//! let args = erdos::new_app("ObjectCounter");
//! // Create an ERDOS node which runs the application.
//! let mut node = Node::new(Configuration::from_args(&args));
//!
//! // Stream of RGB images from a camera.
//! let camera_frames = erdos::connect_source(
//!     CameraOperator::new,
//!     OperatorConfig::new().name("Camera")
//! );
//! // Stream of labeled bounding boxes for each RGB image.
//! let detected_objects = erdos::connect_one_in_one_out(
//!     ObjectDetector::new,
//!     || {},
//!     OperatorConfig::new().name("Detector"),
//!     &camera_frames
//! );
//! // Stream of detected object count for each RGB image.
//! let num_detected = erdos::connect_one_in_one_out(
//!     || { MapOperator::new(|bboxes: &Vec<BBox>| -> usize { bboxes.len() }) },
//!     || {},
//!     OperatorConfig::new().name("Counter"),
//!     &detected_objects
//! );
//!
//! // Run the application
//! node.run();
//! ```
//!
//! ## Operators
//! ERDOS operators process received data, and use
//! [streams](crate::dataflow::stream) to broadcast
//! [`Message`s](crate::dataflow::Message) to downstream operators.
//! ERDOS provides a [standard library of operators](crate::dataflow::operators)
//! for common dataflow patterns.
//! While the standard operators are general and versatile, some applications
//! may implement custom operators to better optimize performance and take
//! fine-grained control over execution.
//!
//! ### Implementing Operators
//! For an example, see the implementation of the
//! [`FlatMapOperator`](crate::dataflow::operators::FlatMapOperator).
//!
//! Operators are structures which implement an
//! [operator trait](crate::dataflow::operator) reflecting their
//! communication pattern.
//! For example, the [`SplitOperator`](crate::dataflow::operators::SplitOperator)
//! implements [`OneInTwoOut`](crate::dataflow::operator::OneInTwoOut)
//! because it receives data on one input stream, and sends messages on
//! two output streams.
//!
//! Operators can support both push- and pull-based models of execution
//! by implementing methods defined in the
//! [operator traits](crate::dataflow::operator).
//! By implementing callbacks such as
//! [`OneInOneOut::on_data`](crate::dataflow::operator::OneInOneOut::on_data),
//! operators can process messages as they arrive.
//! Moreover, operators can implement callbacks over [watermarks](#watermarks)
//! (e.g. [`OneInOneOut::on_watermark`](crate::dataflow::operator::OneInOneOut::on_watermark))
//! to ensure ordered processing over timestamps.
//! ERDOS ensures lock-free, safe, and concurrent processing by ordering
//! callbacks in an ERDOS-managed execution lattice, which serves as a run
//! queue for the system's multithreaded runtime.
//!
//! While ERDOS manages the execution of callbacks, some operators require
//! more fine-grained control. Operators can use the pull-based model
//! to take over the thread of execution by overriding the `run` method
//! (e.g. [`OneInOneOut::run`](crate::dataflow::operator::OneInOneOut::run))
//! of an [operator trait](crate::dataflow::operator), and pulling data from
//! the [`ReadStream`](crate::dataflow::stream::ReadStream)s.
//! *Callbacks are not invoked while `run` executes.*
//!
//! ## Performance
//! ERDOS is designed for low latency. Self-driving car pipelines must meet
//! end-to-end deadlines on the order of hundreds of milliseconds for safe
//! driving, while typically processing gigabytes per second of data on small
//! clusters. Therefore, ERDOS is optimized to send small amounts of data
//! (gigabytes as opposed to terabytes) as quickly as possible.
//!
//! ## Watermarks
//! Watermarks in ERDOS signal completion of computation. More concretely,
//! sending a watermark with timestamp `t` on a stream asserts that all future
//! messages sent on that stream will have timestamps `t' > t`.
//! ERDOS also introduces a *top watermark*, which is a watermark with the
//! maximum possible timestamp. Sending a top watermark closes the stream as
//! there is no `t' > t_top`, so no more messages can be sent.
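//!
//! The watermark rule above can be sketched with a toy stream that rejects
//! any message whose timestamp is not strictly greater than the last
//! watermark. This is a minimal, self-contained illustration and not the
//! ERDOS API: `Time`, `SendError`, and `SketchStream` are hypothetical names,
//! and the single-integer timestamp is an assumption made for brevity.
//!
//! ```rust
//! /// Toy timestamp (assumption): one logical coordinate, plus a maximum value.
//! /// Deriving `Ord` makes every `At(_)` compare less than `Top`.
//! #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
//! enum Time {
//!     At(u64),
//!     Top,
//! }
//!
//! #[derive(Debug, PartialEq)]
//! enum SendError {
//!     /// The timestamp is not greater than the last watermark.
//!     TimestampTooOld,
//!     /// A top watermark was already sent, so the stream is closed.
//!     Closed,
//! }
//!
//! /// Hypothetical stream that only tracks what is needed to enforce the rule.
//! struct SketchStream {
//!     low_watermark: Option<Time>,
//!     sent: Vec<(Time, String)>,
//! }
//!
//! impl SketchStream {
//!     fn new() -> Self {
//!         Self { low_watermark: None, sent: Vec::new() }
//!     }
//!
//!     /// A message with timestamp `t` is valid only if `t` exceeds the last watermark.
//!     fn check(&self, t: Time) -> Result<(), SendError> {
//!         match self.low_watermark {
//!             Some(Time::Top) => Err(SendError::Closed),
//!             Some(w) if t <= w => Err(SendError::TimestampTooOld),
//!             _ => Ok(()),
//!         }
//!     }
//!
//!     fn send_data(&mut self, t: Time, payload: &str) -> Result<(), SendError> {
//!         self.check(t)?;
//!         self.sent.push((t, payload.to_string()));
//!         Ok(())
//!     }
//!
//!     fn send_watermark(&mut self, t: Time) -> Result<(), SendError> {
//!         self.check(t)?;
//!         self.low_watermark = Some(t);
//!         Ok(())
//!     }
//! }
//!
//! fn main() {
//!     let mut stream = SketchStream::new();
//!     assert_eq!(stream.send_data(Time::At(1), "frame"), Ok(()));
//!     assert_eq!(stream.send_watermark(Time::At(1)), Ok(()));
//!     // Timestamp 1 is complete, so further data at 1 is rejected.
//!     assert_eq!(stream.send_data(Time::At(1), "late"), Err(SendError::TimestampTooOld));
//!     assert_eq!(stream.send_data(Time::At(2), "frame"), Ok(()));
//!     // The top watermark closes the stream.
//!     assert_eq!(stream.send_watermark(Time::Top), Ok(()));
//!     assert_eq!(stream.send_data(Time::At(3), "frame"), Err(SendError::Closed));
//!     assert_eq!(stream.sent.len(), 2);
//! }
//! ```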
//!
//! ## Determinism
//! ERDOS provides mechanisms to enable the building of deterministic
//! applications.
//! For instance, processing sets of messages separated by watermarks using
//! watermark callbacks can turn ERDOS pipelines into
//! [Kahn process networks](https://en.wikipedia.org/wiki/Kahn_process_networks).
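//!
//! As a rough sketch of this idea, the toy operator below buffers data per
//! timestamp and emits a result only when a watermark marks that timestamp as
//! complete, so its output does not depend on the order in which data
//! arrives. It does not use the ERDOS operator traits: `Event`,
//! `SumPerTimestamp`, and `run` are hypothetical names, and integer
//! timestamps are an assumption.
//!
//! ```rust
//! use std::collections::BTreeMap;
//!
//! /// Hypothetical events an operator might observe on its input stream.
//! enum Event {
//!     Data { time: u64, value: i64 },
//!     Watermark { time: u64 },
//! }
//!
//! /// Sums the values of each timestamp once that timestamp is complete.
//! struct SumPerTimestamp {
//!     pending: BTreeMap<u64, Vec<i64>>,
//!     output: Vec<(u64, i64)>,
//! }
//!
//! impl SumPerTimestamp {
//!     fn new() -> Self {
//!         Self { pending: BTreeMap::new(), output: Vec::new() }
//!     }
//!
//!     /// Data callback: only buffer, never emit.
//!     fn on_data(&mut self, time: u64, value: i64) {
//!         self.pending.entry(time).or_default().push(value);
//!     }
//!
//!     /// Watermark callback: emit every buffered timestamp `<= time`, in order.
//!     fn on_watermark(&mut self, time: u64) {
//!         // Keep timestamps greater than `time` buffered for a later watermark.
//!         let later = self.pending.split_off(&(time + 1));
//!         let complete = std::mem::replace(&mut self.pending, later);
//!         for (t, values) in complete {
//!             let total: i64 = values.iter().sum();
//!             self.output.push((t, total));
//!         }
//!     }
//! }
//!
//! /// Feeds a sequence of events to a fresh operator and returns its output.
//! fn run(events: Vec<Event>) -> Vec<(u64, i64)> {
//!     let mut op = SumPerTimestamp::new();
//!     for event in events {
//!         match event {
//!             Event::Data { time, value } => op.on_data(time, value),
//!             Event::Watermark { time } => op.on_watermark(time),
//!         }
//!     }
//!     op.output
//! }
//!
//! fn main() {
//!     // Two different arrival orders for the same logical input.
//!     let a = run(vec![
//!         Event::Data { time: 1, value: 10 },
//!         Event::Data { time: 2, value: 5 },
//!         Event::Data { time: 1, value: 20 },
//!         Event::Watermark { time: 1 },
//!         Event::Data { time: 2, value: 7 },
//!         Event::Watermark { time: 2 },
//!     ]);
//!     let b = run(vec![
//!         Event::Data { time: 1, value: 20 },
//!         Event::Data { time: 1, value: 10 },
//!         Event::Watermark { time: 1 },
//!         Event::Data { time: 2, value: 7 },
//!         Event::Data { time: 2, value: 5 },
//!         Event::Watermark { time: 2 },
//!     ]);
//!     assert_eq!(a, vec![(1, 30), (2, 12)]);
//!     assert_eq!(a, b);
//! }
//! ```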
//!
//! ## More Information
//! To read more about the ideas behind ERDOS, refer to our paper
//! [*D3: A Dynamic Deadline-Driven Approach for Building Autonomous Vehicles*](https://dl.acm.org/doi/10.1145/3492321.3519576).
//! If you find ERDOS useful to your work, please cite our paper as follows:
//! ```bibtex
//! @inproceedings{10.1145/3492321.3519576,
//!     author = {Gog, Ionel and Kalra, Sukrit and Schafhalter, Peter and Gonzalez, Joseph E. and Stoica, Ion},
//!     title = {D3: A Dynamic Deadline-Driven Approach for Building Autonomous Vehicles},
//!     year = {2022},
//!     isbn = {9781450391627},
//!     publisher = {Association for Computing Machinery},
//!     address = {New York, NY, USA},
//!     url = {https://doi.org/10.1145/3492321.3519576},
//!     doi = {10.1145/3492321.3519576},
//!     booktitle = {Proceedings of the Seventeenth European Conference on Computer Systems},
//!     pages = {453–471},
//!     numpages = {19},
//!     location = {Rennes, France},
//!     series = {EuroSys '22}
//! }
//! ```

// Required for specialization.
#![allow(incomplete_features)]
#![feature(specialization)]
#![feature(box_into_pin)]

// Re-exports of libraries used in macros.
#[doc(hidden)]
pub use ::tokio;

// Libraries used in this file.
use std::{cell::RefCell, fmt};

use abomonation_derive::Abomonation;
use clap::{self, App, Arg};
use rand::{Rng, SeedableRng, StdRng};
use serde::{Deserialize, Serialize};

// Private submodules
mod configuration;

// Public submodules
#[doc(hidden)]
pub mod communication;
pub mod dataflow;
pub mod node;
#[doc(hidden)]
pub mod scheduler;

// Public exports
pub use configuration::Configuration;
pub use dataflow::{connect::*, OperatorConfig};

/// A unique identifier for an operator.
pub type OperatorId = Uuid;

// Random number generator which should be the same across threads and processes.
thread_local!(static RNG: RefCell<StdRng>= RefCell::new(StdRng::from_seed(&[1913, 3, 26])));

/// Produces a deterministic, unique ID.
pub fn generate_id() -> Uuid {
    RNG.with(|rng| {
        let mut bytes = [0u8; 16];
        rng.borrow_mut().fill_bytes(&mut bytes);
        Uuid(bytes)
    })
}

/// Wrapper around [`uuid::Uuid`] that implements [`Abomonation`](abomonation::Abomonation) for fast serialization.
#[derive(
    Abomonation, Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize, Deserialize,
)]
pub struct Uuid(uuid::Bytes);

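impl Uuid {
    /// Creates a random (version 4) ID.
    pub fn new_v4() -> Self {
        Self(*uuid::Uuid::new_v4().as_bytes())
    }

    /// Creates an ID from the seeded thread-local RNG; see [`generate_id`].
    pub fn new_deterministic() -> Self {
        generate_id()
    }

    /// Returns the nil ID, in which all 16 bytes are zero.
    pub fn nil() -> Uuid {
        Uuid([0; 16])
    }
}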

impl fmt::Debug for Uuid {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> fmt::Result {
        let &Uuid(bytes) = self;
        let id = uuid::Uuid::from_bytes(bytes);
        fmt::Display::fmt(&id, f)
    }
}

impl fmt::Display for Uuid {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> fmt::Result {
        let &Uuid(bytes) = self;
        let id = uuid::Uuid::from_bytes(bytes);
        fmt::Display::fmt(&id, f)
    }
}

/// Resets seed and creates a new dataflow graph.
pub fn reset() {
    // All global variables should be reset here.
    RNG.with(|rng| {
        *rng.borrow_mut() = StdRng::from_seed(&[1913, 3, 26]);
    });
    dataflow::graph::default_graph::set(dataflow::graph::AbstractGraph::new());
}

/// Defines command line arguments for running a multi-node ERDOS application.
pub fn new_app(name: &str) -> clap::App {
    App::new(name)
        .arg(
            Arg::with_name("threads")
                .short("t")
                .long("threads")
                .default_value("4")
                .help("Number of worker threads per process"),
        )
        .arg(
            Arg::with_name("data-addresses")
                .short("d")
                .long("data-addresses")
                .default_value("127.0.0.1:9000")
                .help("Comma-separated list of data socket addresses of all nodes"),
        )
        .arg(
            Arg::with_name("control-addresses")
                .short("c")
                .long("control-addresses")
                .default_value("127.0.0.1:9000")
                .help("Comma-separated list of control socket addresses of all nodes"),
        )
        .arg(
            Arg::with_name("index")
                .short("i")
                .long("index")
                .default_value("0")
                .help("Current node index"),
        )
        .arg(
            Arg::with_name("graph-filename")
                .short("g")
                .long("graph-filename")
                .default_value("")
                .help("Exports the dataflow graph as a DOT file to the provided filename"),
        )
        .arg(
            Arg::with_name("verbose")
                .short("v")
                .long("verbose")
                .multiple(true)
                .takes_value(false)
                .help("Sets the level of verbosity"),
        )
}