//! Aggregates the weights of equal records into at most one record.
//!
//! As differential dataflow streams are unordered and taken to be the accumulation of all records,
//! no semantic change happens via `consolidate`. However, there is a practical difference between
//! a collection that aggregates down to zero records, and one that actually has no records. The
//! underlying system can more clearly see that no work must be done in the latter case, and we can
//! drop out of, e.g., iterative computations.
//!
//! # Examples
//!
//! This example performs a standard "word count", where each line of text is split into multiple
//! words, each word is converted to a word with count 1, and `consolidate` then accumulates the
//! counts of equivalent words.
//!
//! ```ignore
//! stream.flat_map(|line| line.split_whitespace())
//!       .map(|word| (word, 1))
//!       .consolidate();
//! ```

use std::rc::Rc;
use std::fmt::Debug;

use linear_map::LinearMap;

use timely::dataflow::*;
use timely::dataflow::operators::*;
use timely::dataflow::channels::pact::Exchange;

use timely_sort::{LSBRadixSorter, Unsigned};

use collection::Lookup;
use iterators::coalesce::Coalesce;

use ::{Collection, Data};

/// An extension method for consolidating weighted streams.
pub trait ConsolidateExt<D: Data> {
    /// Aggregates the weights of equal records into at most one record.
    ///
    /// This method uses the type `D`'s `hashed()` method to partition the data.
    ///
    /// # Examples
    ///
    /// In the following fragment, `result` contains only `(1, 3)`:
    ///
    /// ```ignore
    /// let stream = vec![(0,1),(1,1),(0,-1),(1,2)].to_stream(scope);
    /// let collection = Collection::new(stream);
    /// let result = collection.consolidate();
    /// ```
    fn consolidate(&self) -> Self;

    /// Aggregates the weights of equal records into at most one record, partitioning the
    /// data using the supplied partition function.
    ///
    /// Note that `consolidate_by` attempts to aggregate weights as it goes, to ensure
    /// that it does not consume more memory than its consolidated collection requires.
    /// It does so among blocks of records with the same `part` value, so if you just set
    /// all `part` values to the same value it may not do a great job, because you'll have
    /// lots of blocks full of distinct records. Just bear that in mind if you want to be
    /// clever.
    ///
    /// # Examples
    ///
    /// In the following fragment, `result` contains only `(1, 3)`:
    ///
    /// ```ignore
    /// let stream = vec![(0,1),(1,1),(0,-1),(1,2)].to_stream(scope);
    /// let collection = Collection::new(stream);
    /// let result = collection.consolidate_by(|&x| x);
    /// ```
    fn consolidate_by<U: Unsigned, F: Fn(&D)->U+'static>(&self, part: F) -> Self;
}

impl<G: Scope, D: Ord+Data+Debug> ConsolidateExt<D> for Collection<G, D> {
    fn consolidate(&self) -> Self {
        self.consolidate_by(|x| x.hashed())
    }

    fn consolidate_by<U: Unsigned, F: Fn(&D)->U+'static>(&self, part: F) -> Self {

        // LinearMap<G::Timestamp, (LSBRadixSorter<(D, i32)>, usize, usize)>
        let mut inputs = LinearMap::new();

        let part1 = Rc::new(part);
        let part2 = part1.clone();

        let exch = Exchange::new(move |&(ref x, _)| (*part1)(x).as_u64());

        Collection::new(self.inner.unary_notify(exch, "Consolidate", vec![], move |input, output, notificator| {

            // Consolidation needs to address the issue that the amount of incoming data
            // may be too large to maintain in queues, but small enough once aggregated.
            // Although arbitrarily large queues *should* work at sequential disk io (?)
            // it seems like we should be smarter than that. We could overflow disk when
            // counting crazy 12-cliques or something, right?

            // So, we can't just pop data off and enqueue it; we need to do something like
            // sort and consolidate what we have. We know how to sort and consolidate (see
            // below), but who knows how we should be doing merges and weird stuff like
            // that... Probably power-of-two merges or somesuch.

            // If we thought we were unlikely to have collisions, we could just sort each
            // of the hunks we get out of the radix sorter and coalesce them. This could be
            // arbitrarily ineffective if we have lots of collisions though (consolidating
            // by the key of a (key, val) pair, for some reason). Let's try it for now.
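            // The approach below: feed arriving records through a radix sorter keyed by
            // the partition hash; once the number of buffered records crosses a threshold,
            // flush the sorter, merge adjacent equal records in each block, drop records
            // with zero weight, and push the survivors back in, so that memory tracks the
            // consolidated size rather than the raw input size.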
            input.for_each(|index, data| {

                // make large to turn off compaction.
                let default_threshold = usize::max_value();
                // let default_threshold = 256 << 10;

                // an entry stores a sorter (the data), a current count, and a compaction threshold.
                let entry = inputs.entry_or_insert(index.time(), || (LSBRadixSorter::new(), 0, default_threshold));
                let (ref mut sorter, ref mut count, ref mut thresh) = *entry;

                *count += data.len();
                sorter.extend(data.drain(..), &|x| (*part2)(&x.0));

                if *count > *thresh {

                    *count = 0;
                    *thresh = 0;

                    // pull out blocks sorted by the hash; coalesce each.
                    let finished = sorter.finish(&|x| (*part2)(&x.0));
                    for mut block in finished {
                        let mut finger = 0;
                        for i in 1 .. block.len() {
                            if block[finger].0 == block[i].0 {
                                block[finger].1 += block[i].1;
                                block[i].1 = 0;
                            }
                            else {
                                finger = i;
                            }
                        }
                        block.retain(|x| x.1 != 0);
                        *thresh += block.len();
                        sorter.push_batch(block, &|x| (*part2)(&x.0));
                    }

                    if *thresh < default_threshold {
                        *thresh = default_threshold;
                    }
                }

                notificator.notify_at(index);
            });

            // go through each time of interest that has reached completion.
            notificator.for_each(|index, _count, _notificator| {

                // pull out the sorter; the count and threshold are irrelevant now.
                if let Some((mut sorter, _, _)) = inputs.remove_key(&index) {

                    let mut session = output.session(&index);
                    let mut buffer = vec![];
                    let mut current = 0;

                    // records with the same hash may still be distinct, so each run of
                    // equal hashes is sorted by the records themselves before coalescing.
                    let source = sorter.finish(&|x| (*part2)(&x.0));
                    for (datum, wgt) in source.into_iter().flat_map(|x| x.into_iter()) {
                        let hash = (*part2)(&datum).as_u64();
                        if buffer.len() > 0 && hash != current {
                            buffer.sort_by(|x: &(D, i32), y: &(D, i32)| x.0.cmp(&y.0));
                            session.give_iterator(buffer.drain(..).coalesce());
                        }
                        buffer.push((datum, wgt));
                        current = hash;
                    }

                    if buffer.len() > 0 {
                        buffer.sort_by(|x: &(D, i32), y: &(D, i32)| x.0.cmp(&y.0));
                        session.give_iterator(buffer.drain(..).coalesce());
                    }
                }
            });
        }))
    }
}
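
// An illustrative sanity check of the in-place compaction step above, extracted over
// plain `(u64, i32)` pairs. This module and its names (`coalesce_block`,
// `coalesces_adjacent_equal_records`) are hypothetical additions, not part of the
// original API; the test assumes only the standard library and does not exercise the
// dataflow operator itself.
#[cfg(test)]
mod tests {

    // Mirrors the "finger" loop in `consolidate_by`: adjacent equal records have
    // their weights merged into the first occurrence, and records whose weights
    // reach zero are then dropped.
    fn coalesce_block(block: &mut Vec<(u64, i32)>) {
        let mut finger = 0;
        for i in 1 .. block.len() {
            if block[finger].0 == block[i].0 {
                block[finger].1 += block[i].1;
                block[i].1 = 0;
            }
            else {
                finger = i;
            }
        }
        block.retain(|x| x.1 != 0);
    }

    #[test]
    fn coalesces_adjacent_equal_records() {
        // the records for key 0 cancel; the records for key 1 merge to weight 3.
        let mut block = vec![(0, 1), (0, -1), (1, 1), (1, 2)];
        coalesce_block(&mut block);
        assert_eq!(block, vec![(1, 3)]);
    }
}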