renoir/lib.rs
1/*!
2# Renoir
3
4[Preprint](https://arxiv.org/abs/2306.04421)
5
6### REactive Network of Operators In Rust
7
8[API Docs](https://deib-polimi.github.io/renoir/renoir/)
9
10Renoir *(short: Noir) [/ʁənwaʁ/, /nwaʁ/]* is a distributed data processing platform based on the dataflow paradigm that provides an ergonomic programming interface, similar to that of Apache Flink, but has much better performance characteristics.
11
12
13Renoir converts each job into a dataflow graph of
14operators and groups them in blocks. Blocks contain a sequence of operators which process the data sequentially without repartitioning it. They are the deployment unit used by the system and can be distributed and executed on multiple systems.
15
16The common layout of a Renoir program starts with the creation of a `StreamContext`, then one or more `Source`s are initialised, creating a `Stream`. The graph of operators is composed using the methods of the `Stream` object, which follow a similar approach to Rust's `Iterator` trait, allowing you to ergonomically define a processing workflow through method chaining.
17
18### Examples
19
20#### Wordcount
21
22```no_run
23use renoir::prelude::*;
24
25fn main() {
26 // Convenience method to parse deployment config from CLI arguments
27 let (config, args) = RuntimeConfig::from_args();
28 config.spawn_remote_workers();
29 let env = StreamContext::new(config);
30
31 let result = env
32 // Open and read file line by line in parallel
33 .stream_file(&args[0])
34 // Split into words
35 .flat_map(|line| tokenize(&line))
36 // Partition
37 .group_by(|word| word.clone())
38 // Count occurrences
39 .fold(0, |count, _word| *count += 1)
40 // Collect result
41 .collect_vec();
42
43 env.execute_blocking(); // Start execution (blocking)
44 if let Some(result) = result.get() {
45 // Print word counts
46 result.into_iter().for_each(|(word, count)| println!("{word}: {count}"));
47 }
48}
49
50fn tokenize(s: &str) -> Vec<String> {
51 // Simple tokenisation strategy
52 s.split_whitespace().map(str::to_lowercase).collect()
53}
54
55// Execute on 6 local hosts `cargo run -- -l 6 input.txt`
56```
57
58#### Wordcount associative (faster)
59
60
61```no_run
62use renoir::prelude::*;
63
64fn main() {
65 // Convenience method to parse deployment config from CLI arguments
66 let (config, args) = RuntimeConfig::from_args();
67 let env = StreamContext::new(config);
68
69 let result = env
70 .stream_file(&args[0])
71 // Adaptive batching (default) has predictable latency.
72 // Fixed-size batching often leads to shorter execution times
73 // if data is immediately available and latency is not critical.
74 .batch_mode(BatchMode::fixed(1024))
75 .flat_map(move |line| tokenize(&line))
76 .map(|word| (word, 1))
77 // Associative operators split the operation in a local and a
78 // global step for faster execution
79 .group_by_reduce(|w| w.clone(), |(_w1, c1), (_w2, c2)| *c1 += c2)
80 .drop_key()
81 .collect_vec();
82
83 env.execute_blocking(); // Start execution (blocking)
84 if let Some(result) = result.get() {
85 // Print word counts
86 result.into_iter().for_each(|(word, count)| println!("{word}: {count}"));
87 }
88}
89
90fn tokenize(s: &str) -> Vec<String> {
91 s.split_whitespace().map(str::to_lowercase).collect()
92}
93
94// Execute on multiple hosts `cargo run -- -r config.toml input.txt`
95```
96
97### Remote deployment
98
99```toml
100# config.toml
101[[host]]
102address = "host1.lan"
103base_port = 9500
104num_cores = 16
105
106[[host]]
107address = "host2.lan"
108base_port = 9500
109num_cores = 24
110ssh = { username = "renoir", key_file = "/home/renoir/.ssh/id_ed25519" }
111```
112
113Refer to the [examples](examples/) directory for an extended set of working examples.
114*/
115#[macro_use]
116extern crate derivative;
117#[macro_use]
118extern crate tracing;
119
120pub use block::{BatchMode, Replication};
121pub use config::RuntimeConfig;
122pub use environment::StreamContext;
123pub use operator::iteration::IterationStateHandle;
124pub use scheduler::ExecutionMetadata;
125pub use stream::{KeyedStream, Stream, WindowedStream};
126
127pub mod block;
128pub(crate) mod channel;
129pub mod config;
130pub(crate) mod environment;
131pub(crate) mod network;
132pub mod operator;
133mod profiler;
134#[cfg(feature = "ssh")]
135pub(crate) mod runner;
136pub(crate) mod scheduler;
137pub(crate) mod stream;
138#[cfg(test)]
139pub(crate) mod test;
140pub(crate) mod worker;
141
/// Crate-wide unsigned integer alias, currently defined as `u64`.
///
/// NOTE(review): presumably the integer width used for coordinate-like
/// identifiers throughout the crate — confirm against the usages.
142pub type CoordUInt = u64;
143
144/// Re-export of commonly used structs and traits
145pub mod prelude {
146 pub use crate::block::{BatchMode, Replication};
147 pub use crate::config::RuntimeConfig;
148 pub use crate::operator::sink::StreamOutput;
149 pub use crate::operator::source::*;
150 pub use crate::operator::window::{CountWindow, ProcessingTimeWindow, SessionWindow};
151 #[cfg(feature = "timestamp")]
152 pub use crate::operator::window::{EventTimeWindow, TransactionWindow};
153 pub use crate::StreamContext;
154}