tange_collection/collection/memory.rs
1//! MemoryCollection
2//! ---
3//! MemoryCollection provides a variety of dataflow operators for consuming and mutating
4//! data. Unlike its Disk-based counterpart, DiskCollection, MemoryCollection keeps all
5//! data in memory, maximizing speed.
6//!
7
8extern crate serde;
9use std::fs;
10use std::any::Any;
11use std::io::prelude::*;
12use std::io::BufWriter;
13use std::hash::Hash;
14use std::sync::Arc;
15
16use self::serde::{Deserialize,Serialize};
17
18use collection::disk::DiskCollection;
19use tange::deferred::{Deferred, batch_apply, tree_reduce};
20use tange::scheduler::Scheduler;
21use partitioned::{join_on_key as jok, partition, partition_by_key, fold_by, concat};
22use interfaces::{Memory,Disk};
23use super::emit;
24
25
26/// MemoryCollection struct
27#[derive(Clone)]
28pub struct MemoryCollection<A> {
29 partitions: Vec<Deferred<Vec<A>>>
30}
31
32impl <A: Any + Send + Sync + Clone> MemoryCollection<A> {
33
34 /// Creates a MemoryCollection from a set of Deferred objects.
35 pub fn from_defs(vs: Vec<Deferred<Vec<A>>>) -> MemoryCollection<A> {
36 MemoryCollection {
37 partitions: vs
38 }
39 }
40
41 /// Provides raw access to the underlying Deferred objects
42 pub fn to_defs(&self) -> &Vec<Deferred<Vec<A>>> {
43 &self.partitions
44 }
45
46 /// Creates a new MemoryCollection from a Vec of items
47 /// ```rust
48 /// extern crate tange;
49 /// extern crate tange_collection;
50 /// use tange::scheduler::GreedyScheduler;
51 /// use tange_collection::collection::memory::MemoryCollection;
52 ///
53 /// let col = MemoryCollection::from_vec(vec![1,2,3usize]);
54 /// assert_eq!(col.run(&GreedyScheduler::new()), Some(vec![1,2,3usize]));
55 /// ```
56 pub fn from_vec(vs: Vec<A>) -> MemoryCollection<A> {
57 MemoryCollection {
58 partitions: vec![Deferred::lift(vs, None)],
59 }
60 }
61
62 /// Returns the current number of data partitions
63 pub fn n_partitions(&self) -> usize {
64 self.partitions.len()
65 }
66
67 /// Concatentates two collections into a single Collection
68 /// ```rust
69 /// extern crate tange;
70 /// extern crate tange_collection;
71 /// use tange::scheduler::GreedyScheduler;
72 /// use tange_collection::collection::memory::MemoryCollection;
73 ///
74 /// let one = MemoryCollection::from_vec(vec![1,2,3usize]);
75 /// let two = MemoryCollection::from_vec(vec![4usize, 5, 6]);
76 /// let cat = one.concat(&two);
77 /// assert_eq!(cat.run(&GreedyScheduler::new()), Some(vec![1,2,3,4,5,6]));
78 /// ```
79 pub fn concat(&self, other: &MemoryCollection<A>) -> MemoryCollection<A> {
80 let mut nps: Vec<_> = self.partitions.iter()
81 .map(|p| (*p).clone()).collect();
82
83 for p in other.partitions.iter() {
84 nps.push(p.clone());
85 }
86
87 MemoryCollection { partitions: nps }
88 }
89
90 /// Maps a function over the values in the DiskCollection, returning a new DiskCollection
91 /// ```rust
92 /// extern crate tange;
93 /// extern crate tange_collection;
94 /// use tange::scheduler::GreedyScheduler;
95 /// use tange_collection::collection::memory::MemoryCollection;
96 ///
97 /// let one = MemoryCollection::from_vec(vec![1,2,3usize]);
98 /// let strings = one.map(|i| format!("{}", i));
99 /// assert_eq!(strings.run(&GreedyScheduler::new()),
100 /// Some(vec!["1".into(),"2".into(),"3".into()]));
101 /// ```
102 pub fn map<
103 B: Any + Send + Sync + Clone,
104 F: 'static + Sync + Send + Clone + Fn(&A) -> B
105 >(&self, f: F) -> MemoryCollection<B> {
106 self.emit(move |x, emitter| {
107 emitter(f(x))
108 })
109 }
110
111 /// Filters out items in the collection that fail the predicate.
112 /// ```rust
113 /// extern crate tange;
114 /// extern crate tange_collection;
115 /// use tange::scheduler::GreedyScheduler;
116 /// use tange_collection::collection::memory::MemoryCollection;
117 ///
118 /// let col = MemoryCollection::from_vec(vec![1,2,3usize]);
119 /// let odds = col.filter(|x| x % 2 == 1);
120 /// assert_eq!(odds.run(&GreedyScheduler::new()),
121 /// Some(vec![1, 3usize]));
122 /// ```
123
124 pub fn filter<
125 F: 'static + Sync + Send + Clone + Fn(&A) -> bool
126 >(&self, f: F) -> MemoryCollection<A> {
127 self.emit(move |x, emitter| {
128 if f(x) {
129 emitter(x.clone())
130 }
131 })
132 }
133
134 /// Re-partitions a collection by the number of provided chunks. It uniformly distributes data from each old partition into each new partition.
135 /// ```rust
136 /// extern crate tange;
137 /// extern crate tange_collection;
138 /// use tange::scheduler::GreedyScheduler;
139 /// use tange_collection::collection::memory::MemoryCollection;
140 ///
141 /// let col = MemoryCollection::from_vec(vec![1,2,3usize]);
142 /// assert_eq!(col.n_partitions(), 1);
143 /// let two = col.split(2);
144 /// assert_eq!(two.n_partitions(), 2);
145 /// ```
146 pub fn split(&self, n_chunks: usize) -> MemoryCollection<A> {
147 self.partition(n_chunks, |idx, _k| idx)
148 }
149
150 /// Maps over all items in a collection, optionally emitting new values. It can be used
151 /// to efficiently fuse a number of map/filter/flat_map functions into a single method.
152 /// ```rust
153 /// extern crate tange;
154 /// extern crate tange_collection;
155 /// use tange::scheduler::GreedyScheduler;
156 /// use tange_collection::collection::memory::MemoryCollection;
157 ///
158 /// let col = MemoryCollection::from_vec(vec![1,2,3usize]);
159 /// let new = col.emit(|item, emitter| {
160 /// if item % 2 == 0 {
161 /// emitter(format!("{}!", item));
162 /// }
163 /// });
164 /// assert_eq!(new.run(&GreedyScheduler::new()), Some(vec!["2!".into()]));
165 /// ```
166
167 pub fn emit<
168 B: Any + Send + Sync + Clone,
169 F: 'static + Sync + Send + Clone + Fn(&A, &mut FnMut(B) -> ())
170 >(&self, f: F) -> MemoryCollection<B> {
171 let parts = emit(&self.partitions, Memory, f);
172
173 MemoryCollection { partitions: parts }
174 }
175
176 /// Maps over all items in a collection, emitting new values. It can be used
177 /// to efficiently fuse a number of map/filter/flat_map functions into a single method.
178 /// `emit_to_disk` differs from the original `emit` by writing the emitted values directly
179 /// to disk, returning a DiskCollection instead of MemoryCollection. This makes it convenient to switch to out-of-core when needed.
180 /// ```rust
181 /// extern crate tange;
182 /// extern crate tange_collection;
183 /// use tange::scheduler::GreedyScheduler;
184 /// use tange_collection::collection::memory::MemoryCollection;
185 ///
186 /// let col = MemoryCollection::from_vec(vec![1,2,3usize]);
187 /// let new = col.emit_to_disk("/tmp".into(), |item, emitter| {
188 /// if item % 2 == 0 {
189 /// emitter(format!("{}!", item));
190 /// }
191 /// });
192 /// assert_eq!(new.run(&GreedyScheduler::new()), Some(vec!["2!".into()]));
193 /// ```
194
195 pub fn emit_to_disk<
196 B: Any + Send + Sync + Clone + Serialize + for<'de>Deserialize<'de>,
197 F: 'static + Sync + Send + Clone + Fn(&A, &mut FnMut(B) -> ())
198 >(&self, path: String, f: F) -> DiskCollection<B> {
199 let parts = emit(&self.partitions, Disk::from_str(&path), f);
200
201 DiskCollection::from_stores(path, parts)
202 }
203
204 /// Re-partitions data into N new partitions by the given function. The user provided
205 /// function is used as a hash function, mapping the returned value to a partition index.
206 /// This makes it useful for managing which partition data ends up!
207 /// ```rust
208 /// extern crate tange;
209 /// extern crate tange_collection;
210 /// use tange::scheduler::GreedyScheduler;
211 /// use tange_collection::collection::memory::MemoryCollection;
212 ///
213 /// let col = MemoryCollection::from_vec(vec![1,2,3,4usize]);
214 /// let new_col = col.partition(2, |idx, x| if *x < 3 { 1 } else { 2 });
215 ///
216 /// assert_eq!(new_col.n_partitions(), 2);
217 /// assert_eq!(new_col.run(&GreedyScheduler::new()), Some(vec![3, 4, 1, 2]));
218 /// ```
219 pub fn partition<
220 F: 'static + Sync + Send + Clone + Fn(usize, &A) -> usize
221 >(&self, partitions: usize, f: F) -> MemoryCollection<A> {
222 let new_chunks = partition(&self.partitions,
223 partitions,
224 f);
225 // Loop over each bucket
226 MemoryCollection { partitions: new_chunks }
227 }
228
229 /// Folds and accumulates values across multiple partitions into K new partitions.
230 /// This is also known as a "group by" with a following reducer.
231 ///
232 /// MemoryCollection first performs a block aggregation: that is, it combines values
233 /// within each partition first using the `binop` function. It then hashes
234 /// each key to a new partition index, where it will then aggregate all keys using the
235 /// `reduce` function.
236 ///
237 /// ```rust
238 /// extern crate tange;
239 /// extern crate tange_collection;
240 /// use tange::scheduler::GreedyScheduler;
241 /// use tange_collection::collection::memory::MemoryCollection;
242 ///
243 /// let col = MemoryCollection::from_vec(vec![1,2,3,4,5usize]);
244 /// // Sum all odds and evens together
245 /// let group_sum = col.fold_by(|x| x % 2,
246 /// || 0usize,
247 /// |block_acc, item| {*block_acc += *item},
248 /// |part_acc1, part_acc2| {*part_acc1 += *part_acc2},
249 /// 1)
250 /// .sort_by(|x| x.0);
251 ///
252 /// assert_eq!(group_sum.n_partitions(), 1);
253 /// assert_eq!(group_sum.run(&GreedyScheduler::new()), Some(vec![(0, 6), (1, 9)]));
254 /// ```
255
256 pub fn fold_by<K: Any + Sync + Send + Clone + Hash + Eq,
257 B: Any + Sync + Send + Clone,
258 D: 'static + Sync + Send + Clone + Fn() -> B,
259 F: 'static + Sync + Send + Clone + Fn(&A) -> K,
260 O: 'static + Sync + Send + Clone + Fn(&mut B, &A) -> (),
261 R: 'static + Sync + Send + Clone + Fn(&mut B, &B) -> ()>(
262 &self, key: F, default: D, binop: O, reduce: R, partitions: usize
263 ) -> MemoryCollection<(K,B)> {
264 let results = fold_by(&self.partitions, key, default, binop,
265 reduce, Vec::with_capacity(0), partitions);
266 MemoryCollection { partitions: results }
267 }
268
269 /// Simple function to re-partition values by a given key. The return key is hashed
270 /// and moduloed by the new partition count to determine where it will end up.
271 /// ```rust
272 /// extern crate tange;
273 /// extern crate tange_collection;
274 /// use tange::scheduler::GreedyScheduler;
275 /// use tange_collection::collection::memory::MemoryCollection;
276 ///
277 /// let col = MemoryCollection::from_vec(vec![1,2,3,4usize]);
278 /// let new_col = col.partition_by_key(2, |x| format!("{}", x));
279 ///
280 /// assert_eq!(new_col.n_partitions(), 2);
281 /// assert_eq!(new_col.run(&GreedyScheduler::new()), Some(vec![4, 1, 2, 3]));
282 /// ```
283 pub fn partition_by_key<
284 K: Any + Sync + Send + Clone + Hash + Eq,
285 F: 'static + Sync + Send + Clone + Fn(&A) -> K
286 >(&self, n_chunks: usize, key: F) -> MemoryCollection<A> {
287 let results = partition_by_key(&self.partitions, n_chunks, key);
288 let groups = results.into_iter().map(|part| concat(&part).unwrap()).collect();
289 MemoryCollection {partitions: groups}
290 }
291
292 /// Sorts values within each partition by a key function. If a global sort is desired,
293 /// the collection needs to be re-partitioned into a single partition
294 /// ```rust
295 /// extern crate tange;
296 /// extern crate tange_collection;
297 /// use tange::scheduler::GreedyScheduler;
298 /// use tange_collection::collection::memory::MemoryCollection;
299 ///
300 /// let col = MemoryCollection::from_vec(vec![1,2,3,4i32]);
301 /// let new_col = col.sort_by(|x| -*x);
302 ///
303 /// assert_eq!(new_col.run(&GreedyScheduler::new()), Some(vec![4, 3, 2, 1]));
304 /// ```
305 pub fn sort_by<
306 K: Ord,
307 F: 'static + Sync + Send + Clone + Fn(&A) -> K
308 >(&self, key: F) -> MemoryCollection<A> {
309 let nps = batch_apply(&self.partitions, move |_idx, vs| {
310 let mut v2: Vec<_> = vs.clone();
311 v2.sort_by_key(|v| key(v));
312 v2
313 });
314 MemoryCollection { partitions: nps }
315 }
316
317 /// Inner Joins two collections by the provided key function.
318 /// If multiple values of the same key are found, they will be cross product for each
319 /// pair found.
320 /// ```rust
321 /// extern crate tange;
322 /// extern crate tange_collection;
323 /// use tange::scheduler::GreedyScheduler;
324 /// use tange_collection::collection::memory::MemoryCollection;
325 ///
326 /// let name_age: Vec<(String,u32)> = vec![("Andrew".into(), 33), ("Leah".into(), 12)];
327 /// let name_money: Vec<(String,f32)> = vec![("Leah".into(), 20.50)];
328 ///
329 /// let na = MemoryCollection::from_vec(name_age);
330 /// let nm = MemoryCollection::from_vec(name_money);
331 /// let joined = na.join_on(&nm,
332 /// |nax| nax.0.clone(),
333 /// |nmx| nmx.0.clone(),
334 /// |nax, nmx| (nax.0.clone(), nax.1, nmx.1),
335 /// 1);
336 /// assert_eq!(joined.run(&GreedyScheduler::new()),
337 /// Some(vec![("Leah".into(), ("Leah".into(), 12, 20.50))]));
338 /// ```
339
340 pub fn join_on<
341 K: Any + Sync + Send + Clone + Hash + Eq,
342 B: Any + Sync + Send + Clone,
343 C: Any + Sync + Send + Clone,
344 KF1: 'static + Sync + Send + Clone + Fn(&A) -> K,
345 KF2: 'static + Sync + Send + Clone + Fn(&B) -> K,
346 J: 'static + Sync + Send + Clone + Fn(&A, &B) -> C,
347 >(
348 &self,
349 other: &MemoryCollection<B>,
350 key1: KF1,
351 key2: KF2,
352 joiner: J,
353 partitions: usize,
354 ) -> MemoryCollection<(K,C)> {
355 // Group each by a common key
356 let p1 = self.map(move |x| (key1(x), x.clone()))
357 .partition_by_key(partitions, |x| x.0.clone());
358 let p2 = other.map(move |x| (key2(x), x.clone()))
359 .partition_by_key(partitions, |x| x.0.clone());
360
361 let mut new_parts = Vec::with_capacity(p1.partitions.len());
362 for (l, r) in p1.partitions.iter().zip(p2.partitions.iter()) {
363 new_parts.push(jok(l, r, Memory, joiner.clone()));
364 }
365
366 MemoryCollection { partitions: new_parts }
367 }
368
369 /// Executes the Collection, returning the result of the computation
370 pub fn run<S: Scheduler>(&self, s: &S) -> Option<Vec<A>> {
371 let cat = tree_reduce(&self.partitions, |x, y| {
372 let mut v1: Vec<_> = (*x).clone();
373 for yi in y {
374 v1.push(yi.clone());
375 }
376 v1
377 });
378 cat.and_then(|x| x.run(s))
379 }
380}
381
382impl <A: Any + Send + Sync + Clone> MemoryCollection<Vec<A>> {
383
384 /// Flattens a vector of values
385 /// ```rust
386 /// extern crate tange;
387 /// extern crate tange_collection;
388 /// use tange::scheduler::GreedyScheduler;
389 /// use tange_collection::collection::memory::MemoryCollection;
390 ///
391 /// let col = MemoryCollection::from_vec(vec![vec![1usize,2],vec![3,4]]);
392 /// let flattened = col.flatten();
393 /// assert_eq!(flattened.run(&GreedyScheduler::new()), Some(vec![1, 2, 3, 4]));
394 /// ```
395
396 pub fn flatten(&self) -> MemoryCollection<A> {
397 self.emit(move |x, emitter| {
398 for xi in x {
399 emitter(xi.clone());
400 }
401 })
402 }
403}
404
405impl <A: Any + Send + Sync + Clone> MemoryCollection<A> {
406
407 /// Returns the number of items in the collection.
408 /// ```rust
409 /// extern crate tange;
410 /// extern crate tange_collection;
411 /// use tange::scheduler::GreedyScheduler;
412 /// use tange_collection::collection::memory::MemoryCollection;
413 ///
414 /// let col = MemoryCollection::from_vec(vec![vec![1usize,2],vec![3,4]]);
415 /// assert_eq!(col.count().run(&GreedyScheduler::new()), Some(vec![2]));
416 /// let flattened = col.flatten();
417 /// assert_eq!(flattened.count().run(&GreedyScheduler::new()), Some(vec![4]));
418 /// ```
419 pub fn count(&self) -> MemoryCollection<usize> {
420 let nps = batch_apply(&self.partitions, |_idx, vs| vs.len());
421 let count = tree_reduce(&nps, |x, y| x + y).unwrap();
422 let out = count.apply(|x| vec![*x]);
423 MemoryCollection { partitions: vec![out] }
424 }
425}
426
427impl <A: Any + Send + Sync + Clone + PartialEq + Hash + Eq> MemoryCollection<A> {
428
429 /// Computes the frequencies of the items in collection.
430 /// ```rust
431 /// extern crate tange;
432 /// extern crate tange_collection;
433 /// use tange::scheduler::GreedyScheduler;
434 /// use tange_collection::collection::memory::MemoryCollection;
435 ///
436 /// let col = MemoryCollection::from_vec(vec![1, 2, 1, 5, 1, 2]);
437 /// let freqs = col.frequencies(1).sort_by(|x| x.0);
438 /// assert_eq!(freqs.run(&GreedyScheduler::new()), Some(vec![(1, 3), (2, 2), (5, 1)]));
439 /// ```
440pub fn frequencies(&self, partitions: usize) -> MemoryCollection<(A, usize)> {
441 //self.partition(chunks, |x| x);
442 self.fold_by(|s| s.clone(),
443 || 0usize,
444 |acc, _l| *acc += 1,
445 |x, y| *x += *y,
446 partitions)
447 }
448}
449
450// Writes out data
451impl MemoryCollection<String> {
452
453 /// Writes each record in a collection to disk, newline delimited.
454 /// MemoryCollection will create a new file within the path for each partition.
455 pub fn sink(&self, path: &str) -> MemoryCollection<usize> {
456 let p: Arc<String> = Arc::new(path.to_owned());
457 let pats = batch_apply(&self.partitions, move |idx, vs| {
458 let p2: Arc<String> = p.clone();
459 let local: &str = &p2;
460 fs::create_dir_all(local)
461 .expect("Welp, something went terribly wrong when creating directory");
462
463 let file = fs::File::create(&format!("{}/{}", local, idx))
464 .expect("Issues opening file!");
465 let mut bw = BufWriter::new(file);
466
467 let size = vs.len();
468 for line in vs {
469 bw.write(line.as_bytes()).expect("Error writing out line");
470 bw.write(b"\n").expect("Error writing out line");
471 }
472
473 vec![size]
474 });
475
476 MemoryCollection { partitions: pats }
477 }
478}
479
480impl <A: Any + Send + Sync + Clone + Serialize + for<'de>Deserialize<'de>> MemoryCollection<A> {
481
482 /// Copies the MemoryCollection to disk, returning a DiskCollection
483 pub fn to_disk(&self, path: String) -> DiskCollection<A> {
484 DiskCollection::from_memory(path, &self.partitions)
485 }
486}
487
#[cfg(test)]
mod test_lib {
    use super::*;
    use tange::scheduler::LeveledScheduler;

    /// Shared fixture: a single-partition collection with repeated values.
    fn sample() -> MemoryCollection<usize> {
        MemoryCollection::from_vec(vec![1,2,3,1,2usize])
    }

    // Counting occurrences per value into a single output partition.
    #[test]
    fn test_fold_by() {
        let folded = sample().fold_by(|x| *x, || 0, |x, _y| *x += 1, |x, y| *x += y, 1);
        let mut counts = folded.run(&mut LeveledScheduler).unwrap();
        counts.sort();
        assert_eq!(counts, vec![(1, 2), (2, 2), (3, 1)]);
    }

    // Same aggregation, but spread across two output partitions.
    #[test]
    fn test_fold_by_parts() {
        let folded = sample().fold_by(|x| *x, || 0, |x, _y| *x += 1, |x, y| *x += y, 2);
        assert_eq!(folded.n_partitions(), 2);
        let mut counts = folded.run(&mut LeveledScheduler).unwrap();
        counts.sort();
        assert_eq!(counts, vec![(1, 2), (2, 2), (3, 1)]);
    }

    // Equal keys must land in the same partition; each partition is sorted locally.
    #[test]
    fn test_partition_by_key() {
        let keyed = sample()
            .partition_by_key(2, |x| *x)
            .sort_by(|x| *x);
        assert_eq!(keyed.n_partitions(), 2);
        let items = keyed.run(&mut LeveledScheduler).unwrap();
        assert_eq!(items, vec![2, 2, 3, 1, 1]);
    }

    // Explicit partition function: evens to bucket 0, odds to bucket 1.
    #[test]
    fn test_partition() {
        let bucketed = sample()
            .partition(2, |_idx, x| x % 2)
            .sort_by(|x| *x);
        assert_eq!(bucketed.n_partitions(), 2);
        let items = bucketed.run(&mut LeveledScheduler).unwrap();
        assert_eq!(items, vec![2, 2, 1, 1, 3]);
    }

    // The count is taken across all partitions.
    #[test]
    fn test_count() {
        let total = sample().split(3).count().run(&mut LeveledScheduler).unwrap();
        assert_eq!(total, vec![5]);
    }

    // Inner join: duplicated left keys produce one output row per match.
    #[test]
    fn test_join() {
        let left = sample();
        let right = MemoryCollection::from_vec(
            vec![(2, 1.23f64), (3usize, 2.34)]);
        let joined = left.join_on(&right, |x| *x, |y| y.0, |x, y| {
            (*x, y.1)
        }, 5).split(1).sort_by(|x| x.0);
        let rows = joined.run(&mut LeveledScheduler).unwrap();
        let expected = vec![(2, (2, 1.23)), (2, (2, 1.23)), (3, (3, 2.34))];
        assert_eq!(rows, expected);
    }

    // Each input n emits 0..n, so emit doubles as a flat_map.
    #[test]
    fn test_emit() {
        let emitted = MemoryCollection::from_vec(vec![1,2,3usize])
            .emit(|num, emitter| {
                for i in 0..*num {
                    emitter(i);
                }
            })
            .sort_by(|x| *x)
            .run(&mut LeveledScheduler).unwrap();
        assert_eq!(emitted, vec![0, 0, 0, 1, 1, 2]);
    }

    // A single partition sorts globally.
    #[test]
    fn test_sort() {
        let sorted = MemoryCollection::from_vec(vec![1, 3, 2usize])
            .sort_by(|x| *x)
            .run(&mut LeveledScheduler).unwrap();
        assert_eq!(sorted, vec![1, 2, 3]);
    }

}