use std::ops::Range;
use crate::{Circuit, Stream, trace::BatchReaderFactories, typed_batch::Batch};
impl<C, IB> Stream<C, IB>
where
C: Circuit,
IB: Batch<Time = ()>,
IB::InnerBatch: Send,
{
    /// Shard batches across multiple worker threads based on keys.
    ///
    /// # Theory
    ///
    /// We parallelize processing across `N` worker threads by creating a
    /// replica of the same circuit per thread and sharding data across
    /// replicas. To ensure correctness (i.e., that the sum of outputs
    /// produced by individual workers equals the output produced by
    /// processing the entire dataset on one worker), sharding must satisfy
    /// certain requirements determined by each operator. In particular,
    /// for `distinct` and `aggregate`, all tuples that share the same key
    /// must be processed by the same worker. For `join`, tuples from both
    /// input streams with the same key must be processed by the same worker.
    ///
    /// Other operators, e.g., `filter` and `flat_map`, impose no restrictions
    /// on the sharding scheme: as long as each tuple in a batch is
    /// processed by some worker, the correct result will be produced. This
    /// is true for all linear operators.
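    ///
    /// Conceptually, the routing rule can be sketched as follows (an
    /// illustration; the actual hash function used by the operator is an
    /// implementation detail):
    ///
    /// ```text
    /// destination_worker = hash(key) % num_workers
    /// ```
    ///
    /// Because the destination depends only on the key, tuples with equal
    /// keys always meet at the same worker, regardless of which worker they
    /// originate from.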
    ///
    /// The `shard` operator shards input batches based on the hash of the key,
    /// making sure that tuples with the same key always end up at the same
    /// worker. More precisely, the operator **re-shards** its input by
    /// partitioning batches in the input stream of each worker based on the
    /// hash of the key, distributing resulting fragments among peers
    /// and re-assembling fragments at each peer:
    ///
    /// ```text
    ///         ┌──────────────────┐
    /// worker1 │                  │
    /// ───────►├─────┬───────────►├──────►
    ///         │     │            │
    /// ───────►├─────┴───────────►├──────►
    /// worker2 │                  │
    ///         └──────────────────┘
    /// ```
    ///
    /// # Usage
    ///
    /// Most users do not need to invoke `shard` directly (and doing so is
    /// likely to lead to incorrect results unless you know exactly what you
    /// are doing). Instead, each operator re-shards its inputs as
    /// necessary, e.g., `join` applies `shard` to both of its
    /// input streams, while `filter` consumes its input directly without
    /// re-sharding.
    ///
    /// # Performance considerations
    ///
    /// In the current implementation, the `shard` operator introduces a
    /// synchronization barrier across all workers: its output at any worker
    /// is only produced once input batches have been collected from all
    /// workers. This limits scalability, since a slow worker (e.g., one
    /// running on a busy CPU core or sharing its core with other workers) or
    /// uneven sharding can slow down the whole system and reduce the gains
    /// from parallelization.
    pub fn shard(&self) -> Stream<C, IB> {
        let factories = BatchReaderFactories::new::<IB::Key, IB::Val, IB::R>();
        self.inner().dyn_shard(&factories).typed()
    }

    /// Shard batches across just the specified range of `workers`.
    ///
    /// If `workers` contains just one worker, then [Stream::gather] is more
    /// efficient.
    pub fn shard_workers(&self, workers: Range<usize>) -> Stream<C, IB> {
        let factories = BatchReaderFactories::new::<IB::Key, IB::Val, IB::R>();
        self.inner().dyn_shard_workers(workers, &factories).typed()
    }
}
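The key-routing invariant that the documentation above relies on can be illustrated standalone. The sketch below is not part of this crate: `route` is a hypothetical helper using std's `DefaultHasher`, whereas the operator's real hash function and exchange machinery are internal details.

```rust
// Illustrative sketch only (not part of this crate): equal keys always
// route to the same worker when routing is a pure function of the key.
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Map a key to a worker index in `0..num_workers` by hashing the key.
/// Hypothetical helper for illustration.
fn route<K: Hash>(key: &K, num_workers: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() as usize) % num_workers
}

fn main() {
    let num_workers = 4;
    // The destination depends only on the key, so equal keys always meet
    // at the same worker, no matter which worker they arrive at.
    assert_eq!(route(&"apple", num_workers), route(&"apple", num_workers));
    // Every key maps to a valid worker index.
    for key in ["apple", "banana", "cherry"] {
        assert!(route(&key, num_workers) < num_workers);
    }
    println!("ok");
}
```

This is also why `filter` needs no re-sharding: its output for a tuple does not depend on any other tuple, so any assignment of tuples to workers yields the same combined result.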