Skip to main content

MutableAntichain

Struct MutableAntichain 

Source
pub struct MutableAntichain<T> { /* private fields */ }
Expand description

An antichain based on a multiset whose elements frequencies can be updated.

The MutableAntichain maintains frequencies for many elements of type T, and exposes the set of elements with positive count not greater than any other elements with positive count. The antichain may both advance and retreat; the changes do not all need to be to elements greater or equal to some elements of the frontier.

The type T must implement PartialOrder as well as Ord. The implementation of the Ord trait is used to efficiently organize the updates for cancellation, and to efficiently determine the lower bounds, and only needs to not contradict the PartialOrder implementation (that is, if PartialOrder orders two elements, then so does the Ord implementation).

The MutableAntichain implementation is done with the intent that updates to it are done in batches, and it is acceptable to rebuild the frontier from scratch when a batch of updates change it. This means that it can be expensive to maintain a large number of counts and change few elements near the frontier.

Implementations§

Source§

impl<T> MutableAntichain<T>

Source

pub fn new() -> MutableAntichain<T>

Creates a new empty MutableAntichain.

§Examples
 use timely::progress::frontier::MutableAntichain;

 let frontier = MutableAntichain::<usize>::new();
 assert!(frontier.is_empty());
Source

pub fn clear(&mut self)

Removes all elements.

§Examples
 use timely::progress::frontier::MutableAntichain;

 let mut frontier = MutableAntichain::<usize>::new();
 frontier.clear();
 assert!(frontier.is_empty());
Source

pub fn frontier(&self) -> AntichainRef<'_, T>

Reveals the minimal elements with positive count.

§Examples
 use timely::progress::frontier::MutableAntichain;

 let mut frontier = MutableAntichain::<usize>::new();
 assert!(frontier.frontier().len() == 0);
Source

pub fn new_bottom(bottom: T) -> MutableAntichain<T>
where T: Ord + Clone,

Creates a new singleton MutableAntichain.

§Examples
 use timely::progress::frontier::{AntichainRef, MutableAntichain};

 let mut frontier = MutableAntichain::new_bottom(0u64);
 assert!(frontier.frontier() == AntichainRef::new(&[0u64]));
Source

pub fn is_empty(&self) -> bool

Returns true if there are no elements in the MutableAntichain.

§Examples
 use timely::progress::frontier::MutableAntichain;

 let mut frontier = MutableAntichain::<usize>::new();
 assert!(frontier.is_empty());
Source

pub fn less_than<O>(&self, time: &O) -> bool
where T: PartialOrder<O>,

Returns true if any item in the MutableAntichain is strictly less than the argument.

§Examples
 use timely::progress::frontier::MutableAntichain;

 let mut frontier = MutableAntichain::new_bottom(1u64);
 assert!(!frontier.less_than(&0));
 assert!(!frontier.less_than(&1));
 assert!(frontier.less_than(&2));
Source

pub fn less_equal<O>(&self, time: &O) -> bool
where T: PartialOrder<O>,

Returns true if any item in the MutableAntichain is less than or equal to the argument.

§Examples
 use timely::progress::frontier::MutableAntichain;

 let mut frontier = MutableAntichain::new_bottom(1u64);
 assert!(!frontier.less_equal(&0));
 assert!(frontier.less_equal(&1));
 assert!(frontier.less_equal(&2));
Examples found in repository?
examples/wordcount.rs (line 39)
8fn main() {
9    // initializes and runs a timely dataflow.
10    timely::execute_from_args(std::env::args(), |worker| {
11
12        let mut input = InputHandle::new();
13        let probe = ProbeHandle::new();
14
15        // define a distribution function for strings.
16        let exchange = Exchange::new(|x: &(String, i64)| (x.0).len() as u64);
17
18        // create a new input, exchange data, and inspect its output
19        worker.dataflow::<usize,_,_>(|scope| {
20            input.to_stream(scope)
21                 .flat_map(|(text, diff): (String, i64)|
22                    text.split_whitespace()
23                        .map(move |word| (word.to_owned(), diff))
24                        .collect::<Vec<_>>()
25                 )
26                 .unary_frontier(exchange, "WordCount", |_capability, _info| {
27
28                    let mut queues = HashMap::new();
29                    let mut counts = HashMap::new();
30
31                    move |(input, frontier), output| {
32                        input.for_each_time(|time, data| {
33                            queues.entry(time.retain(output.output_index()))
34                                  .or_insert(Vec::new())
35                                  .extend(data.map(std::mem::take));
36                        });
37
38                        for (key, val) in queues.iter_mut() {
39                            if !frontier.less_equal(key.time()) {
40                                let mut session = output.session(key);
41                                for mut batch in val.drain(..) {
42                                    for (word, diff) in batch.drain(..) {
43                                        let entry = counts.entry(word.clone()).or_insert(0i64);
44                                        *entry += diff;
45                                        session.give((word, *entry));
46                                    }
47                                }
48                            }
49                        }
50
51                        queues.retain(|_key, val| !val.is_empty());
52                    }})
53                 .container::<Vec<_>>()
54                 .inspect(|x| println!("seen: {:?}", x))
55                 .probe_with(&probe);
56        });
57
58        // introduce data and watch!
59        for round in 0..10 {
60            input.send(("round".to_owned(), 1));
61            input.advance_to(round + 1);
62            while probe.less_than(input.time()) {
63                worker.step();
64            }
65        }
66    }).unwrap();
67}
More examples
Hide additional examples
examples/columnar.rs (line 78)
21fn main() {
22
23    type InnerContainer = <WordCount as columnar::Columnar>::Container;
24    type Container = Column<InnerContainer>;
25
26    use columnar::Len;
27
28    let config = timely::Config {
29        communication: timely::CommunicationConfig::ProcessBinary(3),
30        worker: timely::WorkerConfig::default(),
31    };
32
33    // initializes and runs a timely dataflow.
34    timely::execute(config, |worker| {
35        let mut input = <InputHandle<_, CapacityContainerBuilder<Container>>>::new();
36        let probe = ProbeHandle::new();
37
38        // create a new input, exchange data, and inspect its output
39        worker.dataflow::<usize, _, _>(|scope| {
40            input
41                .to_stream(scope)
42                .unary(
43                    Pipeline,
44                    "Split",
45                    |_cap, _info| {
46                        move |input, output| {
47                            input.for_each_time(|time, data| {
48                                let mut session = output.session(&time);
49                                for data in data {
50                                    for wordcount in data.borrow().into_index_iter().flat_map(|wordcount| {
51                                        wordcount.text.split_whitespace().map(move |text| WordCountReference { text, diff: wordcount.diff })
52                                    }) {
53                                        session.give(wordcount);
54                                    }
55                                }
56                            });
57                        }
58                    },
59                )
60                .container::<Container>()
61                .unary_frontier(
62                    ExchangeCore::<ColumnBuilder<InnerContainer>,_>::new_core(|x: &WordCountReference<&str,&i64>| x.text.len() as u64),
63                    "WordCount",
64                    |_capability, _info| {
65                        let mut queues = HashMap::new();
66                        let mut counts = HashMap::new();
67
68                        move |(input, frontier), output| {
69                            input.for_each_time(|time, data| {
70                                queues
71                                    .entry(time.retain(output.output_index()))
72                                    .or_insert(Vec::new())
73                                    .extend(data.map(std::mem::take));
74
75                            });
76
77                            for (key, val) in queues.iter_mut() {
78                                if !frontier.less_equal(key.time()) {
79                                    let mut session = output.session(key);
80                                    for batch in val.drain(..) {
81                                        for wordcount in batch.borrow().into_index_iter() {
82                                            let total =
83                                            if let Some(count) = counts.get_mut(wordcount.text) {
84                                                *count += wordcount.diff;
85                                                *count
86                                            }
87                                            else {
88                                                counts.insert(wordcount.text.to_string(), *wordcount.diff);
89                                                *wordcount.diff
90                                            };
91                                            session.give(WordCountReference { text: wordcount.text, diff: total });
92                                        }
93                                    }
94                                }
95                            }
96
97                            queues.retain(|_key, val| !val.is_empty());
98                        }
99                    },
100                )
101                .container::<Container>()
102                .inspect_container(|x| {
103                    match x {
104                        Ok((time, data)) => {
105                            println!("seen at: {:?}\t{:?} records", time, data.record_count());
106                            for wc in data.borrow().into_index_iter() {
107                                println!("  {}: {}", wc.text, wc.diff);
108                            }
109                        },
110                        Err(frontier) => println!("frontier advanced to {:?}", frontier),
111                    }
112                })
113                .probe_with(&probe);
114        });
115
116        // introduce data and watch!
117        for round in 0..10 {
118            input.send(WordCountReference { text: "flat container", diff: 1 });
119            input.advance_to(round + 1);
120            while probe.less_than(input.time()) {
121                worker.step();
122            }
123        }
124    })
125    .unwrap();
126}
examples/pagerank.rs (line 60)
8fn main() {
9
10    timely::execute_from_args(std::env::args().skip(3), move |worker| {
11
12        let mut input = InputHandle::new();
13        let probe = ProbeHandle::new();
14
15        worker.dataflow::<usize,_,_>(|scope| {
16
17            // create a new input, into which we can push edge changes.
18            let edge_stream = input.to_stream(scope);
19
20            // create a new feedback stream, which will be changes to ranks.
21            let (handle, rank_stream) = scope.feedback(1);
22
23            // bring edges and ranks together!
24            let changes = edge_stream.binary_frontier(
25                rank_stream,
26                Exchange::new(|x: &((usize, usize), i64)| (x.0).0 as u64),
27                Exchange::new(|x: &(usize, i64)| x.0 as u64),
28                "PageRank",
29                |_capability, _info| {
30
31                    // where we stash out-of-order data.
32                    let mut edge_stash: HashMap<_, Vec<_>> = HashMap::new();
33                    let mut rank_stash: HashMap<_, Vec<_>> = HashMap::new();
34
35                    // lists of edges, ranks, and changes.
36                    let mut edges = Vec::new();
37                    let mut ranks = Vec::new();
38                    let mut diffs = Vec::new(); // for received but un-acted upon deltas.
39                    let mut delta = Vec::new();
40
41                    let timer = ::std::time::Instant::now();
42
43                    move |(input1, frontier1), (input2, frontier2), output| {
44
45                        // hold on to edge changes until it is time.
46                        input1.for_each_time(|time, data| {
47                            let entry = edge_stash.entry(time.retain(output.output_index())).or_default();
48                            data.for_each(|data| entry.append(data));
49                        });
50
51                        // hold on to rank changes until it is time.
52                        input2.for_each_time(|time, data| {
53                            let entry = rank_stash.entry(time.retain(output.output_index())).or_default();
54                            data.for_each(|data| entry.append(data));
55                        });
56
57                        let frontiers = &[frontier1, frontier2];
58
59                        for (time, edge_changes) in edge_stash.iter_mut() {
60                            if frontiers.iter().all(|f| !f.less_equal(time)) {
61
62                                let mut session = output.session(time);
63
64                                compact(edge_changes);
65
66                                for ((src, dst), diff) in edge_changes.drain(..) {
67
68                                    // 0. ensure enough state allocated
69                                    while edges.len() <= src { edges.push(Vec::new()); }
70                                    while ranks.len() <= src { ranks.push(1_000); }
71                                    while diffs.len() <= src { diffs.push(0); }
72
73                                    // 1. subtract previous distribution.
74                                    allocate(ranks[src], &edges[src][..], &mut delta);
75                                    for x in delta.iter_mut() { x.1 *= -1; }
76
77                                    // 2. update edges.
78                                    edges[src].push((dst, diff));
79                                    compact(&mut edges[src]);
80
81                                    // 3. re-distribute allocations.
82                                    allocate(ranks[src], &edges[src][..], &mut delta);
83
84                                    // 4. compact down and send cumulative changes.
85                                    compact(&mut delta);
86                                    for (dst, diff) in delta.drain(..) {
87                                        session.give((dst, diff));
88                                    }
89                                }
90                            }
91                        }
92
93                        edge_stash.retain(|_key, val| !val.is_empty());
94
95                        for (time, rank_changes) in rank_stash.iter_mut() {
96                            if frontiers.iter().all(|f| !f.less_equal(time)) {
97
98                                let mut session = output.session(time);
99
100                                compact(rank_changes);
101
102                                let mut cnt = 0;
103                                let mut sum = 0;
104                                let mut max = 0;
105
106                                for (src, diff) in rank_changes.drain(..) {
107
108                                    cnt += 1;
109                                    sum += diff.abs();
110                                    max = if max < diff.abs() { diff.abs() } else { max };
111
112                                    // 0. ensure enough state allocated
113                                    while edges.len() <= src { edges.push(Vec::new()); }
114                                    while ranks.len() <= src { ranks.push(1_000); }
115                                    while diffs.len() <= src { diffs.push(0); }
116
117                                    // 1. subtract previous distribution.
118                                    allocate(ranks[src], &edges[src][..], &mut delta);
119                                    for x in delta.iter_mut() { x.1 *= -1; }
120
121                                    // 2. update ranks.
122                                    diffs[src] += diff;
123                                    if diffs[src].abs() >= 6 {
124                                        ranks[src] += diffs[src];
125                                        diffs[src] = 0;
126                                    }
127
128                                    // 3. re-distribute allocations.
129                                    allocate(ranks[src], &edges[src][..], &mut delta);
130
131                                    // 4. compact down and send cumulative changes.
132                                    compact(&mut delta);
133                                    for (dst, diff) in delta.drain(..) {
134                                        session.give((dst, diff));
135                                    }
136                                }
137
138                                println!("{:?}:\t{:?}\t{}\t{}\t{}", timer.elapsed(), time.time(), cnt, sum, max);
139                            }
140                        }
141
142                        rank_stash.retain(|_key, val| !val.is_empty());
143
144                    }
145                }
146            );
147
148            changes
149                .probe_with(&probe)
150                .connect_loop(handle);
151
152        });
153
154        let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
155        let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
156
157        let index = worker.index();
158        // Generate roughly random data.
159        use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
160        let hasher = BuildHasherDefault::<DefaultHasher>::new();
161        let mut insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) as usize % nodes,
162                                             hasher.hash_one(&(i,index,1)) as usize % nodes));
163        let remove = insert.clone();
164
165        for ins in (&mut insert).take(edges / worker.peers()) {
166            input.send((ins, 1));
167        }
168
169        input.advance_to(1);
170
171        while probe.less_than(input.time()) {
172            worker.step();
173        }
174
175        for (ins, del) in insert.zip(remove).take(1000) {
176            input.send((ins, 1));
177            input.send((del,-1));
178            input.advance_to(input.time() + 1);
179            while probe.less_than(input.time()) {
180                worker.step();
181            }
182        }
183
184    }).unwrap(); // asserts error-free execution;
185}
Source

pub fn update_iter<I>(&mut self, updates: I) -> Drain<'_, [(T, i64); 2]>
where T: Clone + PartialOrder + Ord, I: IntoIterator<Item = (T, i64)>,

Applies updates to the antichain and enumerates any changes.

§Examples
 use timely::progress::frontier::{AntichainRef, MutableAntichain};

 let mut frontier = MutableAntichain::new_bottom(1u64);
 let changes =
 frontier
     .update_iter(vec![(1, -1), (2, 7)])
     .collect::<Vec<_>>();

 assert!(frontier.frontier() == AntichainRef::new(&[2]));
 assert!(changes == vec![(1, -1), (2, 1)]);
Source

pub fn count_for<O>(&self, query_time: &O) -> i64
where T: PartialEq<O>,

Reports the count for a queried time.

Source

pub fn updates(&mut self) -> impl Iterator<Item = &(T, i64)>
where T: Clone + PartialOrder + Ord,

Reports the updates that form the frontier. Returns an iterator of timestamps and their frequency.

Rebuilds the internal representation before revealing times and frequencies.

Trait Implementations§

Source§

impl<T: Clone> Clone for MutableAntichain<T>

Source§

fn clone(&self) -> MutableAntichain<T>

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl<T: Debug> Debug for MutableAntichain<T>

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl<T> Default for MutableAntichain<T>

Source§

fn default() -> Self

Returns the “default value” for a type. Read more
Source§

impl<'de, T> Deserialize<'de> for MutableAntichain<T>
where T: Deserialize<'de>,

Source§

fn deserialize<__D>(__deserializer: __D) -> Result<Self, __D::Error>
where __D: Deserializer<'de>,

Deserialize this value from the given Serde deserializer. Read more
Source§

impl<T: PartialOrder + Ord + Clone> From<Antichain<T>> for MutableAntichain<T>

Source§

fn from(antichain: Antichain<T>) -> Self

Converts to this type from the input type.
Source§

impl<'a, T: PartialOrder + Ord + Clone> From<AntichainRef<'a, T>> for MutableAntichain<T>

Source§

fn from(antichain: AntichainRef<'a, T>) -> Self

Converts to this type from the input type.
Source§

impl<T> FromIterator<(T, i64)> for MutableAntichain<T>
where T: Clone + PartialOrder + Ord,

Source§

fn from_iter<I>(iterator: I) -> Self
where I: IntoIterator<Item = (T, i64)>,

Creates a value from an iterator. Read more
Source§

impl<T> Serialize for MutableAntichain<T>
where T: Serialize,

Source§

fn serialize<__S>(&self, __serializer: __S) -> Result<__S::Ok, __S::Error>
where __S: Serializer,

Serialize this value into the given Serde serializer. Read more

Auto Trait Implementations§

§

impl<T> Freeze for MutableAntichain<T>
where T: Freeze,

§

impl<T> RefUnwindSafe for MutableAntichain<T>
where T: RefUnwindSafe,

§

impl<T> Send for MutableAntichain<T>
where T: Send,

§

impl<T> Sync for MutableAntichain<T>
where T: Sync,

§

impl<T> Unpin for MutableAntichain<T>
where T: Unpin,

§

impl<T> UnsafeUnpin for MutableAntichain<T>
where T: UnsafeUnpin,

§

impl<T> UnwindSafe for MutableAntichain<T>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> Data for T
where T: Send + Any + Serialize + for<'a> Deserialize<'a>,

Source§

impl<T> DeserializeOwned for T
where T: for<'de> Deserialize<'de>,