Skip to main content

tange_collection/collection/
memory.rs

1//! MemoryCollection
2//! ---
3//! MemoryCollection provides a variety of dataflow operators for consuming and mutating
4//! data.  Unlike its Disk-based counterpart, DiskCollection, MemoryCollection keeps all
5//! data in memory, maximizing speed.
6//!
7
8extern crate serde;
9use std::fs;
10use std::any::Any;
11use std::io::prelude::*;
12use std::io::BufWriter;
13use std::hash::Hash;
14use std::sync::Arc;
15
16use self::serde::{Deserialize,Serialize};
17
18use collection::disk::DiskCollection;
19use tange::deferred::{Deferred, batch_apply, tree_reduce};
20use tange::scheduler::Scheduler;
21use partitioned::{join_on_key as jok, partition, partition_by_key, fold_by, concat};
22use interfaces::{Memory,Disk};
23use super::emit;
24
25
26/// MemoryCollection struct
27#[derive(Clone)]
28pub struct MemoryCollection<A>  {
29    partitions: Vec<Deferred<Vec<A>>>
30}
31
32impl <A: Any + Send + Sync + Clone> MemoryCollection<A> {
33
34    /// Creates a MemoryCollection from a set of Deferred objects.
35    pub fn from_defs(vs: Vec<Deferred<Vec<A>>>) -> MemoryCollection<A> {
36        MemoryCollection {
37            partitions: vs
38        }
39    }
40
41    /// Provides raw access to the underlying Deferred objects
42    pub fn to_defs(&self) -> &Vec<Deferred<Vec<A>>> {
43        &self.partitions
44    }
45
46    /// Creates a new MemoryCollection from a Vec of items
47    /// ```rust
48    ///   extern crate tange;
49    ///   extern crate tange_collection;
50    ///   use tange::scheduler::GreedyScheduler;
51    ///   use tange_collection::collection::memory::MemoryCollection;
52    ///   
53    ///   let col = MemoryCollection::from_vec(vec![1,2,3usize]);
54    ///   assert_eq!(col.run(&GreedyScheduler::new()), Some(vec![1,2,3usize]));
55    /// ```
56    pub fn from_vec(vs: Vec<A>) -> MemoryCollection<A> {
57        MemoryCollection {
58            partitions: vec![Deferred::lift(vs, None)],
59        }
60    }
61
62    /// Returns the current number of data partitions 
63    pub fn n_partitions(&self) -> usize {
64        self.partitions.len()
65    }
66
67    /// Concatentates two collections into a single Collection
68    /// ```rust
69    ///   extern crate tange;
70    ///   extern crate tange_collection;
71    ///   use tange::scheduler::GreedyScheduler;
72    ///   use tange_collection::collection::memory::MemoryCollection;
73    ///   
74    ///   let one = MemoryCollection::from_vec(vec![1,2,3usize]);
75    ///   let two = MemoryCollection::from_vec(vec![4usize, 5, 6]);
76    ///   let cat = one.concat(&two);
77    ///   assert_eq!(cat.run(&GreedyScheduler::new()), Some(vec![1,2,3,4,5,6]));
78    /// ```
79    pub fn concat(&self, other: &MemoryCollection<A>) -> MemoryCollection<A> {
80        let mut nps: Vec<_> = self.partitions.iter()
81            .map(|p| (*p).clone()).collect();
82
83        for p in other.partitions.iter() {
84            nps.push(p.clone());
85        }
86
87        MemoryCollection { partitions: nps }
88    }
89    
90    /// Maps a function over the values in the DiskCollection, returning a new DiskCollection
91    /// ```rust
92    ///   extern crate tange;
93    ///   extern crate tange_collection;
94    ///   use tange::scheduler::GreedyScheduler;
95    ///   use tange_collection::collection::memory::MemoryCollection;
96    ///   
97    ///   let one = MemoryCollection::from_vec(vec![1,2,3usize]);
98    ///   let strings = one.map(|i| format!("{}", i));
99    ///   assert_eq!(strings.run(&GreedyScheduler::new()), 
100    ///     Some(vec!["1".into(),"2".into(),"3".into()]));
101    /// ```
102    pub fn map<
103        B: Any + Send + Sync + Clone, 
104        F: 'static + Sync + Send + Clone + Fn(&A) -> B
105    >(&self, f: F) -> MemoryCollection<B> {
106        self.emit(move |x, emitter| {
107            emitter(f(x))
108        })
109    }
110
111    /// Filters out items in the collection that fail the predicate.
112    /// ```rust
113    ///   extern crate tange;
114    ///   extern crate tange_collection;
115    ///   use tange::scheduler::GreedyScheduler;
116    ///   use tange_collection::collection::memory::MemoryCollection;
117    ///   
118    ///   let col = MemoryCollection::from_vec(vec![1,2,3usize]);
119    ///   let odds = col.filter(|x| x % 2 == 1);
120    ///   assert_eq!(odds.run(&GreedyScheduler::new()), 
121    ///     Some(vec![1, 3usize]));
122    /// ```
123
124    pub fn filter<
125        F: 'static + Sync + Send + Clone + Fn(&A) -> bool
126    >(&self, f: F) -> MemoryCollection<A> {
127        self.emit(move |x, emitter| {
128            if f(x) { 
129                emitter(x.clone())
130            }
131        })
132    }
133    
134    /// Re-partitions a collection by the number of provided chunks.  It uniformly distributes data from each old partition into each new partition.
135    /// ```rust
136    ///   extern crate tange;
137    ///   extern crate tange_collection;
138    ///   use tange::scheduler::GreedyScheduler;
139    ///   use tange_collection::collection::memory::MemoryCollection;
140    ///   
141    ///   let col = MemoryCollection::from_vec(vec![1,2,3usize]);
142    ///   assert_eq!(col.n_partitions(), 1);
143    ///   let two = col.split(2);
144    ///   assert_eq!(two.n_partitions(), 2);
145    /// ```
146    pub fn split(&self, n_chunks: usize) -> MemoryCollection<A> {
147        self.partition(n_chunks, |idx, _k| idx)
148    }
149
150    /// Maps over all items in a collection, optionally emitting new values.  It can be used
151    /// to efficiently fuse a number of map/filter/flat_map functions into a single method.
152    /// ```rust
153    ///   extern crate tange;
154    ///   extern crate tange_collection;
155    ///   use tange::scheduler::GreedyScheduler;
156    ///   use tange_collection::collection::memory::MemoryCollection;
157    ///   
158    ///   let col = MemoryCollection::from_vec(vec![1,2,3usize]);
159    ///   let new = col.emit(|item, emitter| {
160    ///     if item % 2 == 0 {
161    ///         emitter(format!("{}!", item));
162    ///     }
163    ///   });
164    ///   assert_eq!(new.run(&GreedyScheduler::new()), Some(vec!["2!".into()]));
165    /// ```
166
167    pub fn emit<
168        B: Any + Send + Sync + Clone,
169        F: 'static + Sync + Send + Clone + Fn(&A, &mut FnMut(B) -> ())
170    >(&self, f: F) -> MemoryCollection<B> {
171        let parts = emit(&self.partitions, Memory, f);
172
173        MemoryCollection { partitions: parts }
174    }
175
176    /// Maps over all items in a collection, emitting new values.  It can be used
177    /// to efficiently fuse a number of map/filter/flat_map functions into a single method.
178    /// `emit_to_disk` differs from the original `emit` by writing the emitted values directly
179    /// to disk, returning a DiskCollection instead of MemoryCollection.  This makes it convenient to switch to out-of-core when needed.
180    /// ```rust
181    ///   extern crate tange;
182    ///   extern crate tange_collection;
183    ///   use tange::scheduler::GreedyScheduler;
184    ///   use tange_collection::collection::memory::MemoryCollection;
185    ///   
186    ///   let col = MemoryCollection::from_vec(vec![1,2,3usize]);
187    ///   let new = col.emit_to_disk("/tmp".into(), |item, emitter| {
188    ///     if item % 2 == 0 {
189    ///         emitter(format!("{}!", item));
190    ///     }
191    ///   });
192    ///   assert_eq!(new.run(&GreedyScheduler::new()), Some(vec!["2!".into()]));
193    /// ```
194
195    pub fn emit_to_disk<
196        B: Any + Send + Sync + Clone + Serialize + for<'de>Deserialize<'de>,
197        F: 'static + Sync + Send + Clone + Fn(&A, &mut FnMut(B) -> ())
198    >(&self, path: String, f: F) -> DiskCollection<B> {
199        let parts = emit(&self.partitions, Disk::from_str(&path), f);
200
201        DiskCollection::from_stores(path, parts)
202    }
203
204    /// Re-partitions data into N new partitions by the given function.  The user provided
205    /// function is used as a hash function, mapping the returned value to a partition index.
206    /// This makes it useful for managing which partition data ends up!
207    /// ```rust
208    ///   extern crate tange;
209    ///   extern crate tange_collection;
210    ///   use tange::scheduler::GreedyScheduler;
211    ///   use tange_collection::collection::memory::MemoryCollection;
212    ///   
213    ///   let col = MemoryCollection::from_vec(vec![1,2,3,4usize]);
214    ///   let new_col = col.partition(2, |idx, x| if *x < 3 { 1 } else { 2 });
215    ///   
216    ///   assert_eq!(new_col.n_partitions(), 2);
217    ///   assert_eq!(new_col.run(&GreedyScheduler::new()), Some(vec![3, 4, 1, 2]));
218    /// ```
219    pub fn partition<
220        F: 'static + Sync + Send + Clone + Fn(usize, &A) -> usize
221    >(&self, partitions: usize, f: F) -> MemoryCollection<A> {
222        let new_chunks = partition(&self.partitions, 
223                                   partitions, 
224                                   f);
225        // Loop over each bucket
226        MemoryCollection { partitions: new_chunks }
227    }
228
229    /// Folds and accumulates values across multiple partitions into K new partitions.
230    /// This is also known as a "group by" with a following reducer.
231    ///
232    /// MemoryCollection first performs a block aggregation: that is, it combines values
233    /// within each partition first using the `binop` function.  It then hashes
234    /// each key to a new partition index, where it will then aggregate all keys using the
235    /// `reduce` function.
236    ///
237    /// ```rust
238    ///   extern crate tange;
239    ///   extern crate tange_collection;
240    ///   use tange::scheduler::GreedyScheduler;
241    ///   use tange_collection::collection::memory::MemoryCollection;
242    ///   
243    ///   let col = MemoryCollection::from_vec(vec![1,2,3,4,5usize]);
244    ///   // Sum all odds and evens together
245    ///   let group_sum = col.fold_by(|x| x % 2,
246    ///                               || 0usize,
247    ///                               |block_acc, item| {*block_acc += *item},
248    ///                               |part_acc1, part_acc2| {*part_acc1 += *part_acc2},
249    ///                               1)
250    ///                   .sort_by(|x| x.0);
251    ///   
252    ///   assert_eq!(group_sum.n_partitions(), 1);
253    ///   assert_eq!(group_sum.run(&GreedyScheduler::new()), Some(vec![(0, 6), (1, 9)]));
254    /// ```
255
256    pub fn fold_by<K: Any + Sync + Send + Clone + Hash + Eq,
257                   B: Any + Sync + Send + Clone,
258                   D: 'static + Sync + Send + Clone + Fn() -> B, 
259                   F: 'static + Sync + Send + Clone + Fn(&A) -> K, 
260                   O: 'static + Sync + Send + Clone + Fn(&mut B, &A) -> (),
261                   R: 'static + Sync + Send + Clone + Fn(&mut B, &B) -> ()>(
262        &self, key: F, default: D, binop: O, reduce: R, partitions: usize
263    ) -> MemoryCollection<(K,B)> {
264        let results = fold_by(&self.partitions, key, default, binop, 
265                              reduce, Vec::with_capacity(0), partitions);
266        MemoryCollection { partitions: results }
267    }
268
269    /// Simple function to re-partition values by a given key.  The return key is hashed
270    /// and moduloed by the new partition count to determine where it will end up.
271    /// ```rust
272    ///   extern crate tange;
273    ///   extern crate tange_collection;
274    ///   use tange::scheduler::GreedyScheduler;
275    ///   use tange_collection::collection::memory::MemoryCollection;
276    ///   
277    ///   let col = MemoryCollection::from_vec(vec![1,2,3,4usize]);
278    ///   let new_col = col.partition_by_key(2, |x| format!("{}", x));
279    ///   
280    ///   assert_eq!(new_col.n_partitions(), 2);
281    ///   assert_eq!(new_col.run(&GreedyScheduler::new()), Some(vec![4, 1, 2, 3]));
282    /// ```
283    pub fn partition_by_key<
284        K: Any + Sync + Send + Clone + Hash + Eq,
285        F: 'static + Sync + Send + Clone + Fn(&A) -> K
286    >(&self, n_chunks: usize, key: F) -> MemoryCollection<A> {
287        let results = partition_by_key(&self.partitions, n_chunks, key);
288        let groups = results.into_iter().map(|part| concat(&part).unwrap()).collect();
289        MemoryCollection {partitions: groups}
290    }
291
292    /// Sorts values within each partition by a key function.  If a global sort is desired,
293    /// the collection needs to be re-partitioned into a single partition
294    /// ```rust
295    ///   extern crate tange;
296    ///   extern crate tange_collection;
297    ///   use tange::scheduler::GreedyScheduler;
298    ///   use tange_collection::collection::memory::MemoryCollection;
299    ///   
300    ///   let col = MemoryCollection::from_vec(vec![1,2,3,4i32]);
301    ///   let new_col = col.sort_by(|x| -*x);
302    ///   
303    ///   assert_eq!(new_col.run(&GreedyScheduler::new()), Some(vec![4, 3, 2, 1]));
304    /// ```
305    pub fn sort_by<
306        K: Ord,
307        F: 'static + Sync + Send + Clone + Fn(&A) -> K
308    >(&self, key: F) -> MemoryCollection<A> {
309        let nps = batch_apply(&self.partitions, move |_idx, vs| {
310            let mut v2: Vec<_> = vs.clone();
311            v2.sort_by_key(|v| key(v));
312            v2
313        });
314        MemoryCollection { partitions: nps }
315    }
316
317    /// Inner Joins two collections by the provided key function.
318    /// If multiple values of the same key are found, they will be cross product for each
319    /// pair found.
320    /// ```rust
321    ///   extern crate tange;
322    ///   extern crate tange_collection;
323    ///   use tange::scheduler::GreedyScheduler;
324    ///   use tange_collection::collection::memory::MemoryCollection;
325    ///
326    ///   let name_age: Vec<(String,u32)> = vec![("Andrew".into(), 33), ("Leah".into(), 12)];
327    ///   let name_money: Vec<(String,f32)> = vec![("Leah".into(), 20.50)];
328    ///   
329    ///   let na = MemoryCollection::from_vec(name_age);
330    ///   let nm = MemoryCollection::from_vec(name_money);
331    ///   let joined = na.join_on(&nm,
332    ///                           |nax| nax.0.clone(),
333    ///                           |nmx| nmx.0.clone(),
334    ///                           |nax, nmx| (nax.0.clone(), nax.1, nmx.1),
335    ///                           1);
336    ///   assert_eq!(joined.run(&GreedyScheduler::new()), 
337    ///           Some(vec![("Leah".into(), ("Leah".into(), 12, 20.50))]));
338    /// ```
339
340    pub fn join_on<
341        K: Any + Sync + Send + Clone + Hash + Eq,
342        B: Any + Sync + Send + Clone,
343        C: Any + Sync + Send + Clone,
344        KF1: 'static + Sync + Send + Clone + Fn(&A) -> K,
345        KF2: 'static + Sync + Send + Clone + Fn(&B) -> K,
346        J:   'static + Sync + Send + Clone + Fn(&A, &B) -> C,
347    >(
348        &self, 
349        other: &MemoryCollection<B>, 
350        key1: KF1, 
351        key2: KF2,
352        joiner: J,
353        partitions: usize, 
354    ) -> MemoryCollection<(K,C)> {
355        // Group each by a common key
356        let p1 = self.map(move |x| (key1(x), x.clone()))
357            .partition_by_key(partitions, |x| x.0.clone());
358        let p2 = other.map(move |x| (key2(x), x.clone()))
359           .partition_by_key(partitions, |x| x.0.clone());
360
361        let mut new_parts = Vec::with_capacity(p1.partitions.len());
362        for (l, r) in p1.partitions.iter().zip(p2.partitions.iter()) {
363            new_parts.push(jok(l, r, Memory, joiner.clone()));
364        }
365
366        MemoryCollection { partitions: new_parts }
367    }
368
369    /// Executes the Collection, returning the result of the computation
370    pub fn run<S: Scheduler>(&self, s: &S) -> Option<Vec<A>> {
371        let cat = tree_reduce(&self.partitions, |x, y| {
372            let mut v1: Vec<_> = (*x).clone();
373            for yi in y {
374                v1.push(yi.clone());
375            }
376            v1
377        });
378        cat.and_then(|x| x.run(s))
379    }
380}
381
382impl <A: Any + Send + Sync + Clone> MemoryCollection<Vec<A>> {
383
384    /// Flattens a vector of values
385    /// ```rust
386    ///   extern crate tange;
387    ///   extern crate tange_collection;
388    ///   use tange::scheduler::GreedyScheduler;
389    ///   use tange_collection::collection::memory::MemoryCollection;
390    ///   
391    ///   let col = MemoryCollection::from_vec(vec![vec![1usize,2],vec![3,4]]);
392    ///   let flattened = col.flatten();
393    ///   assert_eq!(flattened.run(&GreedyScheduler::new()), Some(vec![1, 2, 3, 4]));
394    /// ```
395
396    pub fn flatten(&self) -> MemoryCollection<A> {
397        self.emit(move |x, emitter| {
398            for xi in x {
399                emitter(xi.clone());
400            }
401        })
402    }
403}
404
405impl <A: Any + Send + Sync + Clone> MemoryCollection<A> {
406
407    /// Returns the number of items in the collection.
408    /// ```rust
409    ///   extern crate tange;
410    ///   extern crate tange_collection;
411    ///   use tange::scheduler::GreedyScheduler;
412    ///   use tange_collection::collection::memory::MemoryCollection;
413    ///   
414    ///   let col = MemoryCollection::from_vec(vec![vec![1usize,2],vec![3,4]]);
415    ///   assert_eq!(col.count().run(&GreedyScheduler::new()), Some(vec![2]));
416    ///   let flattened = col.flatten();
417    ///   assert_eq!(flattened.count().run(&GreedyScheduler::new()), Some(vec![4]));
418    /// ```
419    pub fn count(&self) -> MemoryCollection<usize> {
420        let nps = batch_apply(&self.partitions, |_idx, vs| vs.len());
421        let count = tree_reduce(&nps, |x, y| x + y).unwrap();
422        let out = count.apply(|x| vec![*x]);
423        MemoryCollection { partitions: vec![out] }
424    }
425}
426
427impl <A: Any + Send + Sync + Clone + PartialEq + Hash + Eq> MemoryCollection<A> {
428
429    /// Computes the frequencies of the items in collection.
430    /// ```rust
431    ///   extern crate tange;
432    ///   extern crate tange_collection;
433    ///   use tange::scheduler::GreedyScheduler;
434    ///   use tange_collection::collection::memory::MemoryCollection;
435    ///   
436    ///   let col = MemoryCollection::from_vec(vec![1, 2, 1, 5, 1, 2]);
437    ///   let freqs = col.frequencies(1).sort_by(|x| x.0);
438    ///   assert_eq!(freqs.run(&GreedyScheduler::new()), Some(vec![(1, 3), (2, 2), (5, 1)]));
439    /// ```
440pub fn frequencies(&self, partitions: usize) -> MemoryCollection<(A, usize)> {
441        //self.partition(chunks, |x| x);
442        self.fold_by(|s| s.clone(), 
443                     || 0usize, 
444                     |acc, _l| *acc += 1, 
445                     |x, y| *x += *y, 
446                     partitions)
447    }
448}
449
450// Writes out data
451impl MemoryCollection<String> {
452
453    /// Writes each record in a collection to disk, newline delimited.
454    /// MemoryCollection will create a new file within the path for each partition.
455    pub fn sink(&self, path: &str) -> MemoryCollection<usize> {
456        let p: Arc<String> = Arc::new(path.to_owned());
457        let pats = batch_apply(&self.partitions, move |idx, vs| {
458            let p2: Arc<String> = p.clone();
459            let local: &str = &p2;
460            fs::create_dir_all(local)
461                .expect("Welp, something went terribly wrong when creating directory");
462
463            let file = fs::File::create(&format!("{}/{}", local, idx))
464                .expect("Issues opening file!");
465            let mut bw = BufWriter::new(file);
466
467            let size = vs.len();
468            for line in vs {
469                bw.write(line.as_bytes()).expect("Error writing out line");
470                bw.write(b"\n").expect("Error writing out line");
471            }
472
473            vec![size]
474        });
475        
476        MemoryCollection { partitions: pats }
477    }
478}
479
480impl <A: Any + Send + Sync + Clone + Serialize + for<'de>Deserialize<'de>> MemoryCollection<A> {
481
482    /// Copies the MemoryCollection to disk, returning a DiskCollection
483    pub fn to_disk(&self, path: String) -> DiskCollection<A> {
484        DiskCollection::from_memory(path, &self.partitions)
485    }
486}
487
488#[cfg(test)]
489mod test_lib {
490    use super::*;
491    use tange::scheduler::LeveledScheduler;
492
493    #[test]
494    fn test_fold_by() {
495        let col = MemoryCollection::from_vec(vec![1,2,3,1,2usize]);
496        let out = col.fold_by(|x| *x, || 0, |x, _y| *x += 1, |x, y| *x += y, 1);
497        let mut results = out.run(&mut LeveledScheduler).unwrap();
498        results.sort();
499        assert_eq!(results, vec![(1, 2), (2, 2), (3, 1)]);
500    }
501
502    #[test]
503    fn test_fold_by_parts() {
504        let col = MemoryCollection::from_vec(vec![1,2,3,1,2usize]);
505        let out = col.fold_by(|x| *x, || 0, |x, _y| *x += 1, |x, y| *x += y, 2);
506        assert_eq!(out.partitions.len(), 2);
507        let mut results = out.run(&mut LeveledScheduler).unwrap();
508        results.sort();
509        assert_eq!(results, vec![(1, 2), (2, 2), (3, 1)]);
510    }
511
512    #[test]
513    fn test_partition_by_key() {
514        let col = MemoryCollection::from_vec(vec![1,2,3,1,2usize]);
515        let computed = col.partition_by_key(2, |x| *x)
516            .sort_by(|x| *x);
517        assert_eq!(computed.partitions.len(), 2);
518        let results = computed.run(&mut LeveledScheduler).unwrap();
519        assert_eq!(results, vec![2, 2, 3, 1, 1]);
520    }
521
522    #[test]
523    fn test_partition() {
524        let col = MemoryCollection::from_vec(vec![1,2,3,1,2usize]);
525        let computed = col.partition(2, |_idx, x| x % 2)
526            .sort_by(|x| *x);
527        assert_eq!(computed.partitions.len(), 2);
528        let results = computed.run(&mut LeveledScheduler).unwrap();
529        assert_eq!(results, vec![2, 2, 1, 1, 3]);
530    }
531
532    #[test]
533    fn test_count() {
534        let col = MemoryCollection::from_vec(vec![1,2,3,1,2usize]);
535        let results = col.split(3).count().run(&mut LeveledScheduler).unwrap();
536        assert_eq!(results, vec![5]);
537    }
538
539    #[test]
540    fn test_join() {
541        let col1 = MemoryCollection::from_vec(vec![1,2,3,1,2usize]);
542        let col2 = MemoryCollection::from_vec(
543            vec![(2, 1.23f64), (3usize, 2.34)]);
544        let out = col1.join_on(&col2, |x| *x, |y| y.0, |x, y| {
545            (*x, y.1)
546        }, 5).split(1).sort_by(|x| x.0);
547        let results = out.run(&mut LeveledScheduler).unwrap();
548        let expected = vec![(2, (2, 1.23)), (2, (2, 1.23)), (3, (3, 2.34))];
549        assert_eq!(results, expected);
550    }
551
552    #[test]
553    fn test_emit() {
554        let results = MemoryCollection::from_vec(vec![1,2,3usize])
555            .emit(|num, emitter| {
556                for i in 0..*num {
557                    emitter(i);
558                }
559            })
560            .sort_by(|x| *x)
561            .run(&mut LeveledScheduler).unwrap();
562        let expected = vec![0, 0, 0, 1, 1, 2];
563        assert_eq!(results, expected);
564    }
565
566    #[test]
567    fn test_sort() {
568        let results = MemoryCollection::from_vec(vec![1, 3, 2usize])
569            .sort_by(|x| *x)
570            .run(&mut LeveledScheduler).unwrap();
571        let expected = vec![1, 2, 3];
572        assert_eq!(results, expected);
573    }
574
575}