pub struct MutableAntichain<T> { /* private fields */ }
Expand description
An antichain based on a multiset whose element frequencies can be updated.
The MutableAntichain maintains frequencies for many elements of type T, and exposes the set
of elements with positive count not greater than any other element with positive count. The
antichain may both advance and retreat; the changes do not all need to be to elements greater than or
equal to some element of the frontier.
The type T must implement PartialOrder as well as Ord. The implementation of the Ord trait
is used to efficiently organize the updates for cancellation, and to efficiently determine the lower
bounds, and only needs to not contradict the PartialOrder implementation (that is, if PartialOrder
orders two elements, then so does the Ord implementation).
The MutableAntichain implementation is done with the intent that updates to it are done in batches,
and it is acceptable to rebuild the frontier from scratch when a batch of updates changes it. This means
that it can be expensive to maintain a large number of counts and change few elements near the frontier.
Implementations§
Source§impl<T> MutableAntichain<T>
impl<T> MutableAntichain<T>
Sourcepub fn new() -> MutableAntichain<T>
pub fn new() -> MutableAntichain<T>
Creates a new empty MutableAntichain.
§Examples
use timely::progress::frontier::MutableAntichain;
let frontier = MutableAntichain::<usize>::new();
assert!(frontier.is_empty());
Sourcepub fn clear(&mut self)
pub fn clear(&mut self)
Removes all elements.
§Examples
use timely::progress::frontier::MutableAntichain;
let mut frontier = MutableAntichain::<usize>::new();
frontier.clear();
assert!(frontier.is_empty());
Sourcepub fn frontier(&self) -> AntichainRef<'_, T>
pub fn frontier(&self) -> AntichainRef<'_, T>
Reveals the minimal elements with positive count.
§Examples
use timely::progress::frontier::MutableAntichain;
let mut frontier = MutableAntichain::<usize>::new();
assert!(frontier.frontier().len() == 0);
Sourcepub fn new_bottom(bottom: T) -> MutableAntichain<T>
pub fn new_bottom(bottom: T) -> MutableAntichain<T>
Creates a new singleton MutableAntichain.
§Examples
use timely::progress::frontier::{AntichainRef, MutableAntichain};
let mut frontier = MutableAntichain::new_bottom(0u64);
assert!(frontier.frontier() == AntichainRef::new(&[0u64]));
Sourcepub fn is_empty(&self) -> bool
pub fn is_empty(&self) -> bool
Returns true if there are no elements in the MutableAntichain.
§Examples
use timely::progress::frontier::MutableAntichain;
let mut frontier = MutableAntichain::<usize>::new();
assert!(frontier.is_empty());
Sourcepub fn less_than<O>(&self, time: &O) -> bool where
T: PartialOrder<O>,
pub fn less_than<O>(&self, time: &O) -> bool where
T: PartialOrder<O>,
Returns true if any item in the MutableAntichain is strictly less than the argument.
§Examples
use timely::progress::frontier::MutableAntichain;
let mut frontier = MutableAntichain::new_bottom(1u64);
assert!(!frontier.less_than(&0));
assert!(!frontier.less_than(&1));
assert!(frontier.less_than(&2));
Sourcepub fn less_equal<O>(&self, time: &O) -> bool where
T: PartialOrder<O>,
pub fn less_equal<O>(&self, time: &O) -> bool where
T: PartialOrder<O>,
Returns true if any item in the MutableAntichain is less than or equal to the argument.
§Examples
use timely::progress::frontier::MutableAntichain;
let mut frontier = MutableAntichain::new_bottom(1u64);
assert!(!frontier.less_equal(&0));
assert!(frontier.less_equal(&1));
assert!(frontier.less_equal(&2));
Examples found in repository?
8fn main() {
9 // initializes and runs a timely dataflow.
10 timely::execute_from_args(std::env::args(), |worker| {
11
12 let mut input = InputHandle::new();
13 let probe = ProbeHandle::new();
14
15 // define a distribution function for strings.
16 let exchange = Exchange::new(|x: &(String, i64)| (x.0).len() as u64);
17
18 // create a new input, exchange data, and inspect its output
19 worker.dataflow::<usize,_,_>(|scope| {
20 input.to_stream(scope)
21 .flat_map(|(text, diff): (String, i64)|
22 text.split_whitespace()
23 .map(move |word| (word.to_owned(), diff))
24 .collect::<Vec<_>>()
25 )
26 .unary_frontier(exchange, "WordCount", |_capability, _info| {
27
28 let mut queues = HashMap::new();
29 let mut counts = HashMap::new();
30
31 move |(input, frontier), output| {
32 input.for_each_time(|time, data| {
33 queues.entry(time.retain(output.output_index()))
34 .or_insert(Vec::new())
35 .extend(data.map(std::mem::take));
36 });
37
38 for (key, val) in queues.iter_mut() {
39 if !frontier.less_equal(key.time()) {
40 let mut session = output.session(key);
41 for mut batch in val.drain(..) {
42 for (word, diff) in batch.drain(..) {
43 let entry = counts.entry(word.clone()).or_insert(0i64);
44 *entry += diff;
45 session.give((word, *entry));
46 }
47 }
48 }
49 }
50
51 queues.retain(|_key, val| !val.is_empty());
52 }})
53 .container::<Vec<_>>()
54 .inspect(|x| println!("seen: {:?}", x))
55 .probe_with(&probe);
56 });
57
58 // introduce data and watch!
59 for round in 0..10 {
60 input.send(("round".to_owned(), 1));
61 input.advance_to(round + 1);
62 while probe.less_than(input.time()) {
63 worker.step();
64 }
65 }
66 }).unwrap();
67}
More examples
21fn main() {
22
23 type InnerContainer = <WordCount as columnar::Columnar>::Container;
24 type Container = Column<InnerContainer>;
25
26 use columnar::Len;
27
28 let config = timely::Config {
29 communication: timely::CommunicationConfig::ProcessBinary(3),
30 worker: timely::WorkerConfig::default(),
31 };
32
33 // initializes and runs a timely dataflow.
34 timely::execute(config, |worker| {
35 let mut input = <InputHandle<_, CapacityContainerBuilder<Container>>>::new();
36 let probe = ProbeHandle::new();
37
38 // create a new input, exchange data, and inspect its output
39 worker.dataflow::<usize, _, _>(|scope| {
40 input
41 .to_stream(scope)
42 .unary(
43 Pipeline,
44 "Split",
45 |_cap, _info| {
46 move |input, output| {
47 input.for_each_time(|time, data| {
48 let mut session = output.session(&time);
49 for data in data {
50 for wordcount in data.borrow().into_index_iter().flat_map(|wordcount| {
51 wordcount.text.split_whitespace().map(move |text| WordCountReference { text, diff: wordcount.diff })
52 }) {
53 session.give(wordcount);
54 }
55 }
56 });
57 }
58 },
59 )
60 .container::<Container>()
61 .unary_frontier(
62 ExchangeCore::<ColumnBuilder<InnerContainer>,_>::new_core(|x: &WordCountReference<&str,&i64>| x.text.len() as u64),
63 "WordCount",
64 |_capability, _info| {
65 let mut queues = HashMap::new();
66 let mut counts = HashMap::new();
67
68 move |(input, frontier), output| {
69 input.for_each_time(|time, data| {
70 queues
71 .entry(time.retain(output.output_index()))
72 .or_insert(Vec::new())
73 .extend(data.map(std::mem::take));
74
75 });
76
77 for (key, val) in queues.iter_mut() {
78 if !frontier.less_equal(key.time()) {
79 let mut session = output.session(key);
80 for batch in val.drain(..) {
81 for wordcount in batch.borrow().into_index_iter() {
82 let total =
83 if let Some(count) = counts.get_mut(wordcount.text) {
84 *count += wordcount.diff;
85 *count
86 }
87 else {
88 counts.insert(wordcount.text.to_string(), *wordcount.diff);
89 *wordcount.diff
90 };
91 session.give(WordCountReference { text: wordcount.text, diff: total });
92 }
93 }
94 }
95 }
96
97 queues.retain(|_key, val| !val.is_empty());
98 }
99 },
100 )
101 .container::<Container>()
102 .inspect_container(|x| {
103 match x {
104 Ok((time, data)) => {
105 println!("seen at: {:?}\t{:?} records", time, data.record_count());
106 for wc in data.borrow().into_index_iter() {
107 println!(" {}: {}", wc.text, wc.diff);
108 }
109 },
110 Err(frontier) => println!("frontier advanced to {:?}", frontier),
111 }
112 })
113 .probe_with(&probe);
114 });
115
116 // introduce data and watch!
117 for round in 0..10 {
118 input.send(WordCountReference { text: "flat container", diff: 1 });
119 input.advance_to(round + 1);
120 while probe.less_than(input.time()) {
121 worker.step();
122 }
123 }
124 })
125 .unwrap();
126}
8fn main() {
9
10 timely::execute_from_args(std::env::args().skip(3), move |worker| {
11
12 let mut input = InputHandle::new();
13 let probe = ProbeHandle::new();
14
15 worker.dataflow::<usize,_,_>(|scope| {
16
17 // create a new input, into which we can push edge changes.
18 let edge_stream = input.to_stream(scope);
19
20 // create a new feedback stream, which will be changes to ranks.
21 let (handle, rank_stream) = scope.feedback(1);
22
23 // bring edges and ranks together!
24 let changes = edge_stream.binary_frontier(
25 rank_stream,
26 Exchange::new(|x: &((usize, usize), i64)| (x.0).0 as u64),
27 Exchange::new(|x: &(usize, i64)| x.0 as u64),
28 "PageRank",
29 |_capability, _info| {
30
31 // where we stash out-of-order data.
32 let mut edge_stash: HashMap<_, Vec<_>> = HashMap::new();
33 let mut rank_stash: HashMap<_, Vec<_>> = HashMap::new();
34
35 // lists of edges, ranks, and changes.
36 let mut edges = Vec::new();
37 let mut ranks = Vec::new();
38 let mut diffs = Vec::new(); // for received but un-acted upon deltas.
39 let mut delta = Vec::new();
40
41 let timer = ::std::time::Instant::now();
42
43 move |(input1, frontier1), (input2, frontier2), output| {
44
45 // hold on to edge changes until it is time.
46 input1.for_each_time(|time, data| {
47 let entry = edge_stash.entry(time.retain(output.output_index())).or_default();
48 data.for_each(|data| entry.append(data));
49 });
50
51 // hold on to rank changes until it is time.
52 input2.for_each_time(|time, data| {
53 let entry = rank_stash.entry(time.retain(output.output_index())).or_default();
54 data.for_each(|data| entry.append(data));
55 });
56
57 let frontiers = &[frontier1, frontier2];
58
59 for (time, edge_changes) in edge_stash.iter_mut() {
60 if frontiers.iter().all(|f| !f.less_equal(time)) {
61
62 let mut session = output.session(time);
63
64 compact(edge_changes);
65
66 for ((src, dst), diff) in edge_changes.drain(..) {
67
68 // 0. ensure enough state allocated
69 while edges.len() <= src { edges.push(Vec::new()); }
70 while ranks.len() <= src { ranks.push(1_000); }
71 while diffs.len() <= src { diffs.push(0); }
72
73 // 1. subtract previous distribution.
74 allocate(ranks[src], &edges[src][..], &mut delta);
75 for x in delta.iter_mut() { x.1 *= -1; }
76
77 // 2. update edges.
78 edges[src].push((dst, diff));
79 compact(&mut edges[src]);
80
81 // 3. re-distribute allocations.
82 allocate(ranks[src], &edges[src][..], &mut delta);
83
84 // 4. compact down and send cumulative changes.
85 compact(&mut delta);
86 for (dst, diff) in delta.drain(..) {
87 session.give((dst, diff));
88 }
89 }
90 }
91 }
92
93 edge_stash.retain(|_key, val| !val.is_empty());
94
95 for (time, rank_changes) in rank_stash.iter_mut() {
96 if frontiers.iter().all(|f| !f.less_equal(time)) {
97
98 let mut session = output.session(time);
99
100 compact(rank_changes);
101
102 let mut cnt = 0;
103 let mut sum = 0;
104 let mut max = 0;
105
106 for (src, diff) in rank_changes.drain(..) {
107
108 cnt += 1;
109 sum += diff.abs();
110 max = if max < diff.abs() { diff.abs() } else { max };
111
112 // 0. ensure enough state allocated
113 while edges.len() <= src { edges.push(Vec::new()); }
114 while ranks.len() <= src { ranks.push(1_000); }
115 while diffs.len() <= src { diffs.push(0); }
116
117 // 1. subtract previous distribution.
118 allocate(ranks[src], &edges[src][..], &mut delta);
119 for x in delta.iter_mut() { x.1 *= -1; }
120
121 // 2. update ranks.
122 diffs[src] += diff;
123 if diffs[src].abs() >= 6 {
124 ranks[src] += diffs[src];
125 diffs[src] = 0;
126 }
127
128 // 3. re-distribute allocations.
129 allocate(ranks[src], &edges[src][..], &mut delta);
130
131 // 4. compact down and send cumulative changes.
132 compact(&mut delta);
133 for (dst, diff) in delta.drain(..) {
134 session.give((dst, diff));
135 }
136 }
137
138 println!("{:?}:\t{:?}\t{}\t{}\t{}", timer.elapsed(), time.time(), cnt, sum, max);
139 }
140 }
141
142 rank_stash.retain(|_key, val| !val.is_empty());
143
144 }
145 }
146 );
147
148 changes
149 .probe_with(&probe)
150 .connect_loop(handle);
151
152 });
153
154 let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
155 let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
156
157 let index = worker.index();
158 // Generate roughly random data.
159 use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
160 let hasher = BuildHasherDefault::<DefaultHasher>::new();
161 let mut insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) as usize % nodes,
162 hasher.hash_one(&(i,index,1)) as usize % nodes));
163 let remove = insert.clone();
164
165 for ins in (&mut insert).take(edges / worker.peers()) {
166 input.send((ins, 1));
167 }
168
169 input.advance_to(1);
170
171 while probe.less_than(input.time()) {
172 worker.step();
173 }
174
175 for (ins, del) in insert.zip(remove).take(1000) {
176 input.send((ins, 1));
177 input.send((del,-1));
178 input.advance_to(input.time() + 1);
179 while probe.less_than(input.time()) {
180 worker.step();
181 }
182 }
183
184 }).unwrap(); // asserts error-free execution;
185}
Sourcepub fn update_iter<I>(&mut self, updates: I) -> Drain<'_, [(T, i64); 2]>
pub fn update_iter<I>(&mut self, updates: I) -> Drain<'_, [(T, i64); 2]>
Applies updates to the antichain and enumerates any changes.
§Examples
use timely::progress::frontier::{AntichainRef, MutableAntichain};
let mut frontier = MutableAntichain::new_bottom(1u64);
let changes =
frontier
.update_iter(vec![(1, -1), (2, 7)])
.collect::<Vec<_>>();
assert!(frontier.frontier() == AntichainRef::new(&[2]));
assert!(changes == vec![(1, -1), (2, 1)]);
Trait Implementations§
Source§impl<T: Clone> Clone for MutableAntichain<T>
impl<T: Clone> Clone for MutableAntichain<T>
Source§fn clone(&self) -> MutableAntichain<T>
fn clone(&self) -> MutableAntichain<T>
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source. Read more
Source§impl<T: Debug> Debug for MutableAntichain<T>
impl<T: Debug> Debug for MutableAntichain<T>
Source§impl<T> Default for MutableAntichain<T>
impl<T> Default for MutableAntichain<T>
Source§impl<'de, T> Deserialize<'de> for MutableAntichain<T> where
T: Deserialize<'de>,
impl<'de, T> Deserialize<'de> for MutableAntichain<T> where
T: Deserialize<'de>,
Source§fn deserialize<__D>(__deserializer: __D) -> Result<Self, __D::Error> where
__D: Deserializer<'de>,
fn deserialize<__D>(__deserializer: __D) -> Result<Self, __D::Error> where
__D: Deserializer<'de>,
Source§impl<T: PartialOrder + Ord + Clone> From<Antichain<T>> for MutableAntichain<T>
impl<T: PartialOrder + Ord + Clone> From<Antichain<T>> for MutableAntichain<T>
Source§impl<'a, T: PartialOrder + Ord + Clone> From<AntichainRef<'a, T>> for MutableAntichain<T>
impl<'a, T: PartialOrder + Ord + Clone> From<AntichainRef<'a, T>> for MutableAntichain<T>
Source§fn from(antichain: AntichainRef<'a, T>) -> Self
fn from(antichain: AntichainRef<'a, T>) -> Self
Source§impl<T> FromIterator<(T, i64)> for MutableAntichain<T>
impl<T> FromIterator<(T, i64)> for MutableAntichain<T>
Auto Trait Implementations§
impl<T> Freeze for MutableAntichain<T> where
T: Freeze,
impl<T> RefUnwindSafe for MutableAntichain<T> where
T: RefUnwindSafe,
impl<T> Send for MutableAntichain<T> where
T: Send,
impl<T> Sync for MutableAntichain<T> where
T: Sync,
impl<T> Unpin for MutableAntichain<T> where
T: Unpin,
impl<T> UnsafeUnpin for MutableAntichain<T> where
T: UnsafeUnpin,
impl<T> UnwindSafe for MutableAntichain<T> where
T: UnwindSafe + RefUnwindSafe,
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> CloneToUninit for T where
T: Clone,
impl<T> CloneToUninit for T where
T: Clone,
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more
Source§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more