Skip to main content

tange_collection/collection/
disk.rs

//! Disk Collections
//! ---
//! This module defines the Dataflow interfaces for Out-Of-Core data processing.
//! `DiskCollection` is intended to be used for processing datasets that might not fit
//! in memory.
//!
//! All partitions are written to disk for every application, cleaning up the file when
//! finished.  This allows DiskCollection to only need the currently executing task
//! in memory.  However, this also means there is going to be a fair amount of serialization/deserialization.
//! Under the surface, we use bincode to serialize data quickly to minimize the penalty.
//!
12
13extern crate serde;
14use std::fs;
15use std::any::Any;
16use std::io::prelude::*;
17use std::io::BufWriter;
18use std::hash::Hash;
19use std::sync::Arc;
20
21use self::serde::Deserialize;
22use self::serde::Serialize;
23
24use tange::deferred::{Deferred, batch_apply, tree_reduce};
25use tange::scheduler::Scheduler;
26
27use collection::memory::MemoryCollection;
28use partitioned::{join_on_key as jok, partition, partition_by_key, fold_by, concat};
29use interfaces::*;
30use super::emit;
31
32
33/// DiskCollection struct.
34#[derive(Clone)]
35pub struct DiskCollection<A: Clone + Send + Sync>  {
36    path: Arc<String>,
37    partitions: Vec<Deferred<Arc<FileStore<A>>>>
38}
39
40impl <A: Any + Send + Sync + Clone + Serialize + for<'de>Deserialize<'de>> DiskCollection<A> {
41
42    /// Create a new DiskCollection form a Vector of objects.
43    /// ```rust
44    ///   extern crate tange;
45    ///   extern crate tange_collection;
46    ///   use tange::scheduler::GreedyScheduler;
47    ///   use tange_collection::collection::disk::DiskCollection;
48    ///   
49    ///   let col = DiskCollection::from_vec("/tmp".into(), vec![1,2,3usize]);
50    ///   assert_eq!(col.run(&GreedyScheduler::new()), Some(vec![1,2,3usize]));
51    /// ```
52    pub fn from_vec(path: String, vec: Vec<A>) -> DiskCollection<A> {
53        MemoryCollection::from_vec(vec).to_disk(path)
54    }
55
56    /// Converts a collection of Deferred objects into a DiskCollection
57    /// This is usually best used from the `MemoryCollection`
58    pub fn from_memory(path: String, mc: &Vec<Deferred<Vec<A>>>) -> DiskCollection<A> {
59        ::std::fs::create_dir_all(&path).expect("Unable to create directory!");
60        let shared = Arc::new(path);
61        let acc = Arc::new(FileStore::empty(shared.clone()));
62        let defs = batch_apply(&mc, move |_idx, vs| {
63            acc.write_vec(vs.clone())
64        });
65        DiskCollection { path: shared, partitions: defs }
66    }
67
68    /// Creats a DiskCollection for a set of FileStores.
69    pub fn from_stores(path: String, fs: Vec<Deferred<Arc<FileStore<A>>>>) -> DiskCollection<A> {
70        DiskCollection { path: Arc::new(path), partitions: fs }
71    }
72
73    /// Provides raw access to the underlying partitions
74    pub fn to_defs(&self) -> &Vec<Deferred<Arc<FileStore<A>>>> {
75        &self.partitions
76    }
77
78    /// Converts a DiskCollection to a MemoryCollection
79    pub fn to_memory(&self) -> MemoryCollection<A> {
80        let defs = batch_apply(&self.partitions, |_idx, vs| {
81            vs.stream().into_iter().collect()
82        });
83        MemoryCollection::from_defs(defs)
84    }
85
86    /// Returns the current number of data partitions 
87    pub fn n_partitions(&self) -> usize {
88        self.partitions.len()
89    }
90
91    fn from_defs<B: Clone + Send + Sync>(&self, defs: Vec<Deferred<Arc<FileStore<B>>>>) -> DiskCollection<B> {
92        DiskCollection { path: self.path.clone(), partitions: defs }
93    }
94
95    /// Concatentates two collections into a single Collection
96    /// ```rust
97    ///   extern crate tange;
98    ///   extern crate tange_collection;
99    ///   use tange::scheduler::GreedyScheduler;
100    ///   use tange_collection::collection::disk::DiskCollection;
101    ///   
102    ///   let one = DiskCollection::from_vec("/tmp".into(), vec![1,2,3usize]);
103    ///   let two = DiskCollection::from_vec("/tmp".into(), vec![4usize, 5, 6]);
104    ///   let cat = one.concat(&two);
105    ///   assert_eq!(cat.run(&GreedyScheduler::new()), Some(vec![1,2,3,4,5,6]));
106    /// ```
107    pub fn concat(&self, other: &DiskCollection<A>) -> DiskCollection<A> {
108        let mut nps: Vec<_> = self.partitions.iter()
109            .map(|p| (*p).clone()).collect();
110
111        for p in other.partitions.iter() {
112            nps.push(p.clone());
113        }
114
115        self.from_defs(nps)
116    }
117    
118    /// Maps a function over the values in the DiskCollection, returning a new DiskCollection
119    /// ```rust
120    ///   extern crate tange;
121    ///   extern crate tange_collection;
122    ///   use tange::scheduler::GreedyScheduler;
123    ///   use tange_collection::collection::disk::DiskCollection;
124    ///   
125    ///   let one = DiskCollection::from_vec("/tmp".into(), vec![1,2,3usize]);
126    ///   let strings = one.map(|i| format!("{}", i));
127    ///   assert_eq!(strings.run(&GreedyScheduler::new()), 
128    ///     Some(vec!["1".into(),"2".into(),"3".into()]));
129    /// ```
130    pub fn map<
131        B: Any + Send + Sync + Clone + Serialize, 
132        F: 'static + Sync + Send + Clone + Fn(&A) -> B
133    >(&self, f: F) -> DiskCollection<B> {
134        self.emit(move |x, emitter| {
135            emitter(f(x))
136        })
137    }
138
139    /// Filters out items in the collection that fail the predicate.
140    /// ```rust
141    ///   extern crate tange;
142    ///   extern crate tange_collection;
143    ///   use tange::scheduler::GreedyScheduler;
144    ///   use tange_collection::collection::disk::DiskCollection;
145    ///   
146    ///   let col = DiskCollection::from_vec("/tmp".into(), vec![1,2,3usize]);
147    ///   let odds = col.filter(|x| x % 2 == 1);
148    ///   assert_eq!(odds.run(&GreedyScheduler::new()), 
149    ///     Some(vec![1, 3usize]));
150    /// ```
151
152    pub fn filter<
153        F: 'static + Sync + Send + Clone + Fn(&A) -> bool
154    >(&self, f: F) -> DiskCollection<A> {
155        self.emit(move |x, emitter| {
156            if f(x) { 
157                emitter(x.clone())
158            }
159        })
160    }
161    
162    /// Re-partitions a collection by the number of provided chunks.  It uniformly distributes data from each old partition into each new partition.
163    /// ```rust
164    ///   extern crate tange;
165    ///   extern crate tange_collection;
166    ///   use tange::scheduler::GreedyScheduler;
167    ///   use tange_collection::collection::disk::DiskCollection;
168    ///   
169    ///   let col = DiskCollection::from_vec("/tmp".into(), vec![1,2,3usize]);
170    ///   assert_eq!(col.n_partitions(), 1);
171    ///   let two = col.split(2);
172    ///   assert_eq!(two.n_partitions(), 2);
173    ///   let two = col.split(3);
174    ///   assert_eq!(two.n_partitions(), 3);
175    /// ```
176
177    pub fn split(&self, n_chunks: usize) -> DiskCollection<A> {
178        self.partition(n_chunks, |idx, _k| idx)
179    }
180
181    /// Maps over all items in a collection, optionally emitting new values.  It can be used
182    /// to efficiently fuse a number of map/filter/flat_map functions into a single method.
183    /// ```rust
184    ///   extern crate tange;
185    ///   extern crate tange_collection;
186    ///   use tange::scheduler::GreedyScheduler;
187    ///   use tange_collection::collection::disk::DiskCollection;
188    ///   
189    ///   let col = DiskCollection::from_vec("/tmp".into(), vec![1,2,3usize]);
190    ///   let new = col.emit(|item, emitter| {
191    ///     if item % 2 == 0 {
192    ///         emitter(format!("{}!", item));
193    ///     }
194    ///   });
195    ///   assert_eq!(new.run(&GreedyScheduler::new()), Some(vec!["2!".into()]));
196    /// ```
197    pub fn emit<
198        B: Any + Send + Sync + Clone + Serialize,
199        F: 'static + Sync + Send + Clone + Fn(&A, &mut FnMut(B) -> ())
200    >(&self, f: F) -> DiskCollection<B> {
201
202        let parts = emit(&self.partitions, Disk(self.path.clone()), f);
203
204        self.from_defs(parts)
205    }
206
207    /// Re-partitions data into N new partitions by the given function.  The user provided
208    /// function is used as a hash function, mapping the returned value to a partition index.
209    /// This makes it useful for managing which partition data ends up!
210    /// ```rust
211    ///   extern crate tange;
212    ///   extern crate tange_collection;
213    ///   use tange::scheduler::GreedyScheduler;
214    ///   use tange_collection::collection::disk::DiskCollection;
215    ///   
216    ///   let col = DiskCollection::from_vec("/tmp".into(), vec![1,2,3,4usize]);
217    ///   let new_col = col.partition(2, |idx, x| if *x < 3 { 1 } else { 2 });
218    ///   
219    ///   assert_eq!(new_col.n_partitions(), 2);
220    ///   assert_eq!(new_col.run(&GreedyScheduler::new()), Some(vec![3, 4, 1, 2]));
221    /// ```
222
223    pub fn partition<
224        F: 'static + Sync + Send + Clone + Fn(usize, &A) -> usize
225    >(&self, partitions: usize, f: F) -> DiskCollection<A> {
226        let new_chunks = partition(&self.partitions, 
227                                   partitions, 
228                                   f);
229        // Loop over each bucket
230        self.from_defs(new_chunks)
231    }
232
233    /// Folds and accumulates values across multiple partitions into K new partitions.
234    /// This is also known as a "group by" with a following reducer.
235    ///
236    /// DiskCollection first performs a block aggregation: that is, it combines values
237    /// within each partition first using the `binop` function.  It then hashes
238    /// each key to a new partition index, where it will then aggregate all keys using the
239    /// `reduce` function.
240    ///
241    /// ```rust
242    ///   extern crate tange;
243    ///   extern crate tange_collection;
244    ///   use tange::scheduler::GreedyScheduler;
245    ///   use tange_collection::collection::disk::DiskCollection;
246    ///   
247    ///   let col = DiskCollection::from_vec("/tmp".into(), vec![1,2,3,4,5usize]);
248    ///   // Sum all odds and evens together
249    ///   let group_sum = col.fold_by(|x| x % 2,
250    ///                               || 0usize,
251    ///                               |block_acc, item| {*block_acc += *item},
252    ///                               |part_acc1, part_acc2| {*part_acc1 += *part_acc2},
253    ///                               1)
254    ///                   .sort_by(|x| x.0);
255    ///   
256    ///   assert_eq!(group_sum.n_partitions(), 1);
257    ///   assert_eq!(group_sum.run(&GreedyScheduler::new()), Some(vec![(0, 6), (1, 9)]));
258    /// ```
259
260    pub fn fold_by<K: Any + Sync + Send + Clone + Hash + Eq + Serialize + for<'de> Deserialize<'de>,
261                   B: Any + Sync + Send + Clone + Serialize + for<'de> Deserialize<'de>,
262                   D: 'static + Sync + Send + Clone + Fn() -> B,
263                   F: 'static + Sync + Send + Clone + Fn(&A) -> K, 
264                   O: 'static + Sync + Send + Clone + Fn(&mut B, &A) -> (),
265                   R: 'static + Sync + Send + Clone + Fn(&mut B, &B) -> ()>(
266        &self, key: F, default: D, binop: O, reduce: R, partitions: usize
267    ) -> DiskCollection<(K,B)> {
268        let fs = Arc::new(FileStore::empty(self.path.clone()));
269        let results = fold_by(&self.partitions, key, default, binop, 
270                              reduce, fs, partitions);
271        self.from_defs(results)
272    }
273
274    /// Simple function to re-partition values by a given key.  The return key is hashed
275    /// and moduloed by the new partition count to determine where it will end up.
276    /// ```rust
277    ///   extern crate tange;
278    ///   extern crate tange_collection;
279    ///   use tange::scheduler::GreedyScheduler;
280    ///   use tange_collection::collection::disk::DiskCollection;
281    ///   
282    ///   let col = DiskCollection::from_vec("/tmp".into(), vec![1,2,3,4usize]);
283    ///   let new_col = col.partition_by_key(2, |x| format!("{}", x));
284    ///   
285    ///   assert_eq!(new_col.n_partitions(), 2);
286    ///   assert_eq!(new_col.run(&GreedyScheduler::new()), Some(vec![4, 1, 2, 3]));
287    /// ```
288
289    pub fn partition_by_key<
290        K: Any + Sync + Send + Clone + Hash + Eq,
291        F: 'static + Sync + Send + Clone + Fn(&A) -> K
292    >(&self, n_chunks: usize, key: F) -> DiskCollection<A> {
293        let results = partition_by_key(&self.partitions, n_chunks, key);
294        let groups = results.into_iter().map(|part| concat(&part).unwrap()).collect();
295        self.from_defs(groups)
296    }
297
298    /// Sorts values within each partition by a key function.  If a global sort is desired,
299    /// the collection needs to be re-partitioned into a single partition
300    /// ```rust
301    ///   extern crate tange;
302    ///   extern crate tange_collection;
303    ///   use tange::scheduler::GreedyScheduler;
304    ///   use tange_collection::collection::disk::DiskCollection;
305    ///   
306    ///   let col = DiskCollection::from_vec("/tmp".into(), vec![1,2,3,4i32]);
307    ///   let new_col = col.sort_by(|x| -*x);
308    ///   
309    ///   assert_eq!(new_col.run(&GreedyScheduler::new()), Some(vec![4, 3, 2, 1]));
310    /// ```
311pub fn sort_by<
312        K: Ord,
313        F: 'static + Sync + Send + Clone + Fn(&A) -> K
314    >(&self, key: F) -> DiskCollection<A> {
315        let acc = Arc::new(FileStore::empty(self.path.clone()));
316        let nps = batch_apply(&self.partitions, move |_idx, vs| {
317            let mut out = acc.writer();
318            let mut v2: Vec<_> = vs.stream().into_iter().collect();
319            v2.sort_by_key(|v| key(v));
320            for vi in v2 {
321                out.add(vi);
322            }
323            out.finish()
324        });
325        self.from_defs(nps)
326    }
327
328    /// Inner Joins two collections by the provided key function.
329    /// If multiple values of the same key are found, they will be cross product for each
330    /// pair found.
331    /// ```rust
332    ///   extern crate tange;
333    ///   extern crate tange_collection;
334    ///   use tange::scheduler::GreedyScheduler;
335    ///   use tange_collection::collection::disk::DiskCollection;
336    ///   let name_age: Vec<(String,u32)> = vec![("Andrew".into(), 33), ("Leah".into(), 12)];
337    ///   let name_money: Vec<(String,f32)> = vec![("Leah".into(), 20.50)];
338    ///   
339    ///   let na = DiskCollection::from_vec("/tmp".into(), name_age);
340    ///   let nm = DiskCollection::from_vec("/tmp".into(), name_money);
341    ///   let joined = na.join_on(&nm,
342    ///                           |nax| nax.0.clone(),
343    ///                           |nmx| nmx.0.clone(),
344    ///                           |nax, nmx| (nax.0.clone(), nax.1, nmx.1),
345    ///                           1);
346    ///   assert_eq!(joined.run(&GreedyScheduler::new()), 
347    ///           Some(vec![("Leah".into(), ("Leah".into(), 12, 20.50))]));
348    /// ```
349    pub fn join_on<
350        K: Any + Sync + Send + Clone + Hash + Eq + Serialize + for<'de> Deserialize<'de>,
351        B: Any + Sync + Send + Clone + Serialize + for<'de> Deserialize<'de>,
352        C: Any + Sync + Send + Clone + Serialize,
353        KF1: 'static + Sync + Send + Clone + Fn(&A) -> K,
354        KF2: 'static + Sync + Send + Clone + Fn(&B) -> K,
355        J:   'static + Sync + Send + Clone + Fn(&A, &B) -> C,
356    >(
357        &self, 
358        other: &DiskCollection<B>, 
359        key1: KF1, 
360        key2: KF2,
361        joiner: J,
362        partitions: usize, 
363    ) -> DiskCollection<(K,C)> {
364        // Group each by a common key
365        let p1 = self.map(move |x| (key1(x), x.clone()))
366            .partition_by_key(partitions, |x| x.0.clone());
367        let p2 = other.map(move |x| (key2(x), x.clone()))
368            .partition_by_key(partitions, |x| x.0.clone());
369
370        let mut new_parts = Vec::with_capacity(p1.partitions.len());
371        for (l, r) in p1.partitions.iter().zip(p2.partitions.iter()) {
372            let acc = Arc::new(FileStore::empty(self.path.clone()));
373            new_parts.push(jok(l, r, acc, joiner.clone()));
374        }
375
376        self.from_defs(new_parts)
377    }
378
379    /// Executes the Collection, returning the result of the computation
380    pub fn run<S: Scheduler>(&self, s: &S) -> Option<Vec<A>> {
381        let defs = batch_apply(&self.partitions, |_idx, vs| {
382            vs.stream().into_iter().collect::<Vec<_>>()
383        });
384        let cat = tree_reduce(&defs, |x, y| {
385            let mut v1: Vec<_> = (*x).clone();
386            for yi in y {
387                v1.push(yi.clone());
388            }
389            v1
390        });
391        cat.and_then(|x| x.run(s))
392    }
393}
394
395impl <A: Any + Send + Sync + Clone + Serialize + for<'de>Deserialize<'de>> DiskCollection<Vec<A>> {
396    /// Flattens a vector of values
397    /// ```rust
398    ///   extern crate tange;
399    ///   extern crate tange_collection;
400    ///   use tange::scheduler::GreedyScheduler;
401    ///   use tange_collection::collection::disk::DiskCollection;
402    ///   
403    ///   let col = DiskCollection::from_vec("/tmp".into(), vec![vec![1usize,2],vec![3,4]]);
404    ///   let flattened = col.flatten();
405    ///   assert_eq!(flattened.run(&GreedyScheduler::new()), Some(vec![1, 2, 3, 4]));
406    /// ```
407    pub fn flatten(&self) -> DiskCollection<A> {
408        self.emit(move |x, emitter| {
409            for xi in x {
410                emitter(xi.clone());
411            }
412        })
413    }
414}
415
416impl <A: Any + Send + Sync + Clone + Serialize + for<'de>Deserialize<'de>> DiskCollection<A> {
417    /// Returns the number of items in the collection
418    /// ```rust
419    ///   extern crate tange;
420    ///   extern crate tange_collection;
421    ///   use tange::scheduler::GreedyScheduler;
422    ///   use tange_collection::collection::disk::DiskCollection;
423    ///   
424    ///   let col = DiskCollection::from_vec("/tmp".into(), vec![vec![1usize,2],vec![3,4]]);
425    ///   assert_eq!(col.count().run(&GreedyScheduler::new()), Some(vec![2]));
426    ///   let flattened = col.flatten();
427    ///   assert_eq!(flattened.count().run(&GreedyScheduler::new()), Some(vec![4]));
428    /// ```
429    pub fn count(&self) -> DiskCollection<usize> {
430        let nps = batch_apply(&self.partitions, |_idx, vs| {
431            vs.stream().into_iter().map(|_| 1usize).sum::<usize>()
432        });
433        let count = tree_reduce(&nps, |x, y| x + y).unwrap();
434        let acc = Arc::new(FileStore::empty(self.path.clone()));
435        let out = count.apply(move |x| {
436            acc.write_vec(vec![*x])
437        });
438        self.from_defs(vec![out])
439    }
440}
441
442impl <A: Any + Send + Sync + Clone + PartialEq + Hash + Eq + Serialize + for<'de>Deserialize<'de>> DiskCollection<A> {
443
444    /// Computes the frequencies of the items in collection.
445    /// ```rust
446    ///   extern crate tange;
447    ///   extern crate tange_collection;
448    ///   use tange::scheduler::GreedyScheduler;
449    ///   use tange_collection::collection::disk::DiskCollection;
450    ///   
451    ///   let col = DiskCollection::from_vec("/tmp".into(), vec![1, 2, 1, 5, 1, 2]);
452    ///   let freqs = col.frequencies(1).sort_by(|x| x.0);
453    ///   assert_eq!(freqs.run(&GreedyScheduler::new()), Some(vec![(1, 3), (2, 2), (5, 1)]));
454    /// ```
455pub fn frequencies(&self, partitions: usize) -> DiskCollection<(A, usize)> {
456        //self.partition(chunks, |x| x);
457        self.fold_by(|s| s.clone(), 
458                     || 0usize, 
459                     |acc, _l| *acc += 1, 
460                     |x, y| *x += *y, 
461                     partitions)
462    }
463}
464
465// Writes out data
466impl DiskCollection<String> {
467    /// Writes each record in a collection to disk, newline delimited.
468    /// DiskCollection will create anew file within the path for each partition written.
469    pub fn sink(&self, path: &str) -> DiskCollection<usize> {
470        let acc = Arc::new(FileStore::empty(self.path.clone()));
471        let p: Arc<String> = Arc::new(path.to_owned());
472        let pats = batch_apply(&self.partitions, move |idx, vs| {
473            let p2 = p.clone();
474            let local: &str = &p2;
475            fs::create_dir_all(local)
476                .expect("Welp, something went terribly wrong when creating directory");
477
478            let file = fs::File::create(&format!("{}/{}", local, idx))
479                .expect("Issues opening file!");
480            let mut bw = BufWriter::new(file);
481
482            let mut size = 0usize;
483            for line in vs.stream() {
484                bw.write(line.as_bytes()).expect("Error writing out line");
485                bw.write(b"\n").expect("Error writing out line");
486                size += 1;
487            }
488
489            acc.write_vec(vec![size])
490        });
491        
492        self.from_defs(pats)
493    }
494}
495
#[cfg(test)]
mod test_lib {
    use super::*;
    use tange::scheduler::{GreedyScheduler,LeveledScheduler};

    /// Shared fixture: the values `[1, 2, 3, 1, 2]` stored under `/tmp`.
    fn make_col() -> DiskCollection<usize> {
        let values = vec![1,2,3,1,2usize];
        DiskCollection::from_vec("/tmp".into(), values)
    }

    /// Counting occurrences per value into a single output partition.
    #[test]
    fn test_fold_by() {
        let source = make_col();
        let counted = source.fold_by(|x| *x, || 0, |x, _y| *x += 1, |x, y| *x += y, 1);
        let mut pairs = counted.run(&LeveledScheduler).unwrap();
        pairs.sort();
        assert_eq!(pairs, vec![(1, 2), (2, 2), (3, 1)]);
    }

    /// Same aggregation, but spread over two output partitions.
    #[test]
    fn test_fold_by_parts() {
        let source = make_col();
        let counted = source.fold_by(|x| *x, || 0, |x, _y| *x += 1, |x, y| *x += y, 2);
        assert_eq!(counted.n_partitions(), 2);
        let mut pairs = counted.run(&LeveledScheduler).unwrap();
        pairs.sort();
        assert_eq!(pairs, vec![(1, 2), (2, 2), (3, 1)]);
    }

    /// Re-partitioning by key, then sorting within each partition.
    #[test]
    fn test_partition_by_key() {
        let source = make_col();
        let by_key = source.partition_by_key(2, |x| *x)
            .sort_by(|x| *x);
        assert_eq!(by_key.n_partitions(), 2);
        let values = by_key.run(&LeveledScheduler).unwrap();
        assert_eq!(values, vec![2, 2, 3, 1, 1]);
    }

    /// Re-partitioning by parity, then sorting within each partition.
    #[test]
    fn test_partition() {
        let source = make_col();
        let by_parity = source.partition(2, |_idx, x| x % 2)
            .sort_by(|x| *x);
        assert_eq!(by_parity.n_partitions(), 2);
        let values = by_parity.run(&GreedyScheduler::new()).unwrap();
        assert_eq!(values, vec![2, 2, 1, 1, 3]);
    }

    /// The count is independent of how the data is split.
    #[test]
    fn test_count() {
        let source = make_col();
        let total = source.split(3).count().run(&LeveledScheduler).unwrap();
        assert_eq!(total, vec![5]);
    }

    /// Inner join: unmatched keys are dropped, duplicated keys yield one row each.
    #[test]
    fn test_join() {
        let left = make_col();
        let right = DiskCollection::from_vec("/tmp".into(),
            vec![(2, 1.23f64), (3usize, 2.34)]);
        let joined = left
            .join_on(&right, |x| *x, |y| y.0, |x, y| (*x, y.1), 5)
            .split(1)
            .sort_by(|x| x.0);
        let rows = joined.run(&LeveledScheduler).unwrap();
        let expected = vec![(2, (2, 1.23)), (2, (2, 1.23)), (3, (3, 2.34))];
        assert_eq!(rows, expected);
    }

    /// Each input `n` emits `0..n`.
    #[test]
    fn test_emit() {
        let source = DiskCollection::from_vec("/tmp".into(), vec![1,2,3usize]);
        let expanded = source.emit(|num, emitter| {
            for i in 0..*num {
                emitter(i);
            }
        });
        let values = expanded
            .sort_by(|x| *x)
            .run(&LeveledScheduler).unwrap();
        let expected = vec![0, 0, 0, 1, 1, 2];
        assert_eq!(values, expected);
    }

    /// Sorting a single partition in ascending order.
    #[test]
    fn test_sort() {
        let source = DiskCollection::from_vec("/tmp".into(), vec![1, 3, 2usize]);
        let values = source
            .sort_by(|x| *x)
            .run(&LeveledScheduler).unwrap();
        let expected = vec![1, 2, 3];
        assert_eq!(values, expected);
    }

}