/*!
# Noir
[Preprint](https://arxiv.org/abs/2306.04421)
### Network of Operators In Rust
[API Docs](https://deib-polimi.github.io/noir/noir/)
Noir is a distributed data processing platform based on the dataflow paradigm that provides an ergonomic programming interface, similar to that of Apache Flink, but has much better performance characteristics.
Noir converts each job into a dataflow graph of operators and groups them in blocks. Blocks contain a sequence of operators that process the data sequentially without repartitioning it. They are the deployment unit used by the system and can be distributed and executed on multiple systems.
The common layout of a Noir program starts with the creation of a `StreamContext`; then one or more `Source`s are initialised, each creating a `Stream`. The graph of operators is composed using the methods of the `Stream` object, which follow an approach similar to Rust's `Iterator` trait, allowing you to ergonomically define a processing workflow through method chaining.
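
The analogy with `Iterator` can be illustrated with a purely local sketch that uses only the standard library and no Noir API. The helper `long_words_uppercase` and its inputs are hypothetical, chosen only for illustration. A Noir job chains operators on a `Stream` in the same style, with the difference that the resulting graph may be split into blocks and executed on several hosts.

```rust
/// Hypothetical helper: chains iterator adaptors in the same style in which
/// a Noir job chains operators on a `Stream`.
fn long_words_uppercase(lines: &[&str], min_len: usize) -> Vec<String> {
    lines
        .iter()
        // One input line produces many words
        .flat_map(|line| line.split_whitespace())
        // Keep only words with at least `min_len` bytes
        .filter(|word| word.len() >= min_len)
        // Transform each remaining word
        .map(|word| word.to_uppercase())
        .collect()
}

fn main() {
    let lines = ["a stream of data", "flows through operators"];
    let words = long_words_uppercase(&lines, 5);
    println!("{words:?}");
}
```

Each adaptor only describes a step; nothing runs until `collect` is called. The Wordcount example below follows a similar pattern, where the pipeline is composed first and started by `execute_blocking`.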
### Examples
#### Wordcount
```no_run
use noir_compute::prelude::*;
fn main() {
// Convenience method to parse deployment config from CLI arguments
let (config, args) = RuntimeConfig::from_args();
config.spawn_remote_workers();
let env = StreamContext::new(config);
let result = env
// Open and read file line by line in parallel
.stream_file(&args[0])
// Split into words
.flat_map(|line| tokenize(&line))
// Partition
.group_by(|word| word.clone())
// Count occurrences
.fold(0, |count, _word| *count += 1)
// Collect result
.collect_vec();
env.execute_blocking(); // Start execution (blocking)
if let Some(result) = result.get() {
// Print word counts
result.into_iter().for_each(|(word, count)| println!("{word}: {count}"));
}
}
fn tokenize(s: &str) -> Vec<String> {
// Simple tokenisation strategy
s.split_whitespace().map(str::to_lowercase).collect()
}
// Execute on 6 local hosts `cargo run -- -l 6 input.txt`
```
#### Wordcount associative (faster)
```no_run
use noir_compute::prelude::*;
fn main() {
// Convenience method to parse deployment config from CLI arguments
let (config, args) = RuntimeConfig::from_args();
let env = StreamContext::new(config);
let result = env
.stream_file(&args[0])
// Adaptive batching (default) has predictable latency.
// Fixed-size batching often leads to shorter execution times
// if data is immediately available and latency is not critical.
.batch_mode(BatchMode::fixed(1024))
.flat_map(move |line| tokenize(&line))
.map(|word| (word, 1))
// Associative operators split the operation in a local and a
// global step for faster execution
.group_by_reduce(|w| w.clone(), |(_w1, c1), (_w2, c2)| *c1 += c2)
.drop_key()
.collect_vec();
env.execute_blocking(); // Start execution (blocking)
if let Some(result) = result.get() {
// Print word counts
result.into_iter().for_each(|(word, count)| println!("{word}: {count}"));
}
}
fn tokenize(s: &str) -> Vec<String> {
s.split_whitespace().map(str::to_lowercase).collect()
}
// Execute on multiple hosts `cargo run -- -r config.yaml input.txt`
```
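
The local and global steps mentioned in the comments above can be sketched without Noir, using only the standard library. This is a minimal illustration of why an associative operation can be split this way, not a description of how Noir implements it. The functions `local_counts` and `merge_counts` and the two partitions are hypothetical.

```rust
use std::collections::HashMap;

/// Local step: each partition counts its own words independently.
fn local_counts(partition: &[&str]) -> HashMap<String, u64> {
    let mut counts = HashMap::new();
    for word in partition {
        *counts.entry(word.to_lowercase()).or_insert(0) += 1;
    }
    counts
}

/// Global step: partial counts are merged with the same associative `+`.
fn merge_counts(partials: Vec<HashMap<String, u64>>) -> HashMap<String, u64> {
    let mut total = HashMap::new();
    for partial in partials {
        for (word, count) in partial {
            *total.entry(word).or_insert(0) += count;
        }
    }
    total
}

fn main() {
    // Two hypothetical partitions, standing in for data held by two workers
    let partitions: Vec<Vec<&str>> = vec![vec!["a", "b", "a"], vec!["b", "c", "b"]];
    let partials: Vec<_> = partitions.iter().map(|p| local_counts(p)).collect();
    let total = merge_counts(partials);

    // Sort only to get a stable print order
    let mut sorted: Vec<_> = total.into_iter().collect();
    sorted.sort();
    println!("{sorted:?}");
}
```

Because addition is associative, merging the partial counts gives the same result as counting all words in one pass, while only one entry per distinct word and partition has to be exchanged in the global step.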
### Remote deployment
```yaml
# config.yaml
hosts:
- address: host1.lan
base_port: 9500
num_cores: 16
- address: host2.lan
base_port: 9500
num_cores: 8
ssh:
username: noir-compute
key_file: /home/user/.ssh/id_rsa
```
Refer to the [examples](examples/) directory for an extended set of working examples.
*/
extern crate derivative;
extern crate log;
use ;
use ;
pub use structure;
pub use BatchMode;
pub use Replication;
pub use ;
pub use RuntimeConfig;
pub use StreamContext;
pub use IterationStateHandle;
pub use ExecutionMetadata;
pub use ;
use crateBlockStructure;
use crateCoord;
use crateProfilerResult;
pub
pub
pub
pub
pub
pub
pub
pub
pub
pub type CoordUInt = u64;
/// Re-export of commonly used structs and traits
/// Tracing information of the current execution.
pub