tange_collection/collection/disk.rs
//! Disk Collections
//! ---
//! This module defines the Dataflow interfaces for Out-Of-Core data processing.
//! `DiskCollection` is intended to be used for processing datasets that might not fit
//! in memory.
//!
//! All partitions are written to disk for every application, cleaning up the file when
//! finished. This allows DiskCollection to keep only the currently executing task in
//! memory. However, this also means there is going to be a fair amount of
//! serialization/deserialization. Under the surface, we use bincode to serialize data
//! quickly to minimize the penalty.
//!
12
13extern crate serde;
14use std::fs;
15use std::any::Any;
16use std::io::prelude::*;
17use std::io::BufWriter;
18use std::hash::Hash;
19use std::sync::Arc;
20
21use self::serde::Deserialize;
22use self::serde::Serialize;
23
24use tange::deferred::{Deferred, batch_apply, tree_reduce};
25use tange::scheduler::Scheduler;
26
27use collection::memory::MemoryCollection;
28use partitioned::{join_on_key as jok, partition, partition_by_key, fold_by, concat};
29use interfaces::*;
30use super::emit;
31
32
33/// DiskCollection struct.
34#[derive(Clone)]
35pub struct DiskCollection<A: Clone + Send + Sync> {
36 path: Arc<String>,
37 partitions: Vec<Deferred<Arc<FileStore<A>>>>
38}
39
40impl <A: Any + Send + Sync + Clone + Serialize + for<'de>Deserialize<'de>> DiskCollection<A> {
41
42 /// Create a new DiskCollection form a Vector of objects.
43 /// ```rust
44 /// extern crate tange;
45 /// extern crate tange_collection;
46 /// use tange::scheduler::GreedyScheduler;
47 /// use tange_collection::collection::disk::DiskCollection;
48 ///
49 /// let col = DiskCollection::from_vec("/tmp".into(), vec![1,2,3usize]);
50 /// assert_eq!(col.run(&GreedyScheduler::new()), Some(vec![1,2,3usize]));
51 /// ```
52 pub fn from_vec(path: String, vec: Vec<A>) -> DiskCollection<A> {
53 MemoryCollection::from_vec(vec).to_disk(path)
54 }
55
56 /// Converts a collection of Deferred objects into a DiskCollection
57 /// This is usually best used from the `MemoryCollection`
58 pub fn from_memory(path: String, mc: &Vec<Deferred<Vec<A>>>) -> DiskCollection<A> {
59 ::std::fs::create_dir_all(&path).expect("Unable to create directory!");
60 let shared = Arc::new(path);
61 let acc = Arc::new(FileStore::empty(shared.clone()));
62 let defs = batch_apply(&mc, move |_idx, vs| {
63 acc.write_vec(vs.clone())
64 });
65 DiskCollection { path: shared, partitions: defs }
66 }
67
68 /// Creats a DiskCollection for a set of FileStores.
69 pub fn from_stores(path: String, fs: Vec<Deferred<Arc<FileStore<A>>>>) -> DiskCollection<A> {
70 DiskCollection { path: Arc::new(path), partitions: fs }
71 }
72
73 /// Provides raw access to the underlying partitions
74 pub fn to_defs(&self) -> &Vec<Deferred<Arc<FileStore<A>>>> {
75 &self.partitions
76 }
77
78 /// Converts a DiskCollection to a MemoryCollection
79 pub fn to_memory(&self) -> MemoryCollection<A> {
80 let defs = batch_apply(&self.partitions, |_idx, vs| {
81 vs.stream().into_iter().collect()
82 });
83 MemoryCollection::from_defs(defs)
84 }
85
86 /// Returns the current number of data partitions
87 pub fn n_partitions(&self) -> usize {
88 self.partitions.len()
89 }
90
91 fn from_defs<B: Clone + Send + Sync>(&self, defs: Vec<Deferred<Arc<FileStore<B>>>>) -> DiskCollection<B> {
92 DiskCollection { path: self.path.clone(), partitions: defs }
93 }
94
95 /// Concatentates two collections into a single Collection
96 /// ```rust
97 /// extern crate tange;
98 /// extern crate tange_collection;
99 /// use tange::scheduler::GreedyScheduler;
100 /// use tange_collection::collection::disk::DiskCollection;
101 ///
102 /// let one = DiskCollection::from_vec("/tmp".into(), vec![1,2,3usize]);
103 /// let two = DiskCollection::from_vec("/tmp".into(), vec![4usize, 5, 6]);
104 /// let cat = one.concat(&two);
105 /// assert_eq!(cat.run(&GreedyScheduler::new()), Some(vec![1,2,3,4,5,6]));
106 /// ```
107 pub fn concat(&self, other: &DiskCollection<A>) -> DiskCollection<A> {
108 let mut nps: Vec<_> = self.partitions.iter()
109 .map(|p| (*p).clone()).collect();
110
111 for p in other.partitions.iter() {
112 nps.push(p.clone());
113 }
114
115 self.from_defs(nps)
116 }
117
118 /// Maps a function over the values in the DiskCollection, returning a new DiskCollection
119 /// ```rust
120 /// extern crate tange;
121 /// extern crate tange_collection;
122 /// use tange::scheduler::GreedyScheduler;
123 /// use tange_collection::collection::disk::DiskCollection;
124 ///
125 /// let one = DiskCollection::from_vec("/tmp".into(), vec![1,2,3usize]);
126 /// let strings = one.map(|i| format!("{}", i));
127 /// assert_eq!(strings.run(&GreedyScheduler::new()),
128 /// Some(vec!["1".into(),"2".into(),"3".into()]));
129 /// ```
130 pub fn map<
131 B: Any + Send + Sync + Clone + Serialize,
132 F: 'static + Sync + Send + Clone + Fn(&A) -> B
133 >(&self, f: F) -> DiskCollection<B> {
134 self.emit(move |x, emitter| {
135 emitter(f(x))
136 })
137 }
138
139 /// Filters out items in the collection that fail the predicate.
140 /// ```rust
141 /// extern crate tange;
142 /// extern crate tange_collection;
143 /// use tange::scheduler::GreedyScheduler;
144 /// use tange_collection::collection::disk::DiskCollection;
145 ///
146 /// let col = DiskCollection::from_vec("/tmp".into(), vec![1,2,3usize]);
147 /// let odds = col.filter(|x| x % 2 == 1);
148 /// assert_eq!(odds.run(&GreedyScheduler::new()),
149 /// Some(vec![1, 3usize]));
150 /// ```
151
152 pub fn filter<
153 F: 'static + Sync + Send + Clone + Fn(&A) -> bool
154 >(&self, f: F) -> DiskCollection<A> {
155 self.emit(move |x, emitter| {
156 if f(x) {
157 emitter(x.clone())
158 }
159 })
160 }
161
162 /// Re-partitions a collection by the number of provided chunks. It uniformly distributes data from each old partition into each new partition.
163 /// ```rust
164 /// extern crate tange;
165 /// extern crate tange_collection;
166 /// use tange::scheduler::GreedyScheduler;
167 /// use tange_collection::collection::disk::DiskCollection;
168 ///
169 /// let col = DiskCollection::from_vec("/tmp".into(), vec![1,2,3usize]);
170 /// assert_eq!(col.n_partitions(), 1);
171 /// let two = col.split(2);
172 /// assert_eq!(two.n_partitions(), 2);
173 /// let two = col.split(3);
174 /// assert_eq!(two.n_partitions(), 3);
175 /// ```
176
177 pub fn split(&self, n_chunks: usize) -> DiskCollection<A> {
178 self.partition(n_chunks, |idx, _k| idx)
179 }
180
181 /// Maps over all items in a collection, optionally emitting new values. It can be used
182 /// to efficiently fuse a number of map/filter/flat_map functions into a single method.
183 /// ```rust
184 /// extern crate tange;
185 /// extern crate tange_collection;
186 /// use tange::scheduler::GreedyScheduler;
187 /// use tange_collection::collection::disk::DiskCollection;
188 ///
189 /// let col = DiskCollection::from_vec("/tmp".into(), vec![1,2,3usize]);
190 /// let new = col.emit(|item, emitter| {
191 /// if item % 2 == 0 {
192 /// emitter(format!("{}!", item));
193 /// }
194 /// });
195 /// assert_eq!(new.run(&GreedyScheduler::new()), Some(vec!["2!".into()]));
196 /// ```
197 pub fn emit<
198 B: Any + Send + Sync + Clone + Serialize,
199 F: 'static + Sync + Send + Clone + Fn(&A, &mut FnMut(B) -> ())
200 >(&self, f: F) -> DiskCollection<B> {
201
202 let parts = emit(&self.partitions, Disk(self.path.clone()), f);
203
204 self.from_defs(parts)
205 }
206
207 /// Re-partitions data into N new partitions by the given function. The user provided
208 /// function is used as a hash function, mapping the returned value to a partition index.
209 /// This makes it useful for managing which partition data ends up!
210 /// ```rust
211 /// extern crate tange;
212 /// extern crate tange_collection;
213 /// use tange::scheduler::GreedyScheduler;
214 /// use tange_collection::collection::disk::DiskCollection;
215 ///
216 /// let col = DiskCollection::from_vec("/tmp".into(), vec![1,2,3,4usize]);
217 /// let new_col = col.partition(2, |idx, x| if *x < 3 { 1 } else { 2 });
218 ///
219 /// assert_eq!(new_col.n_partitions(), 2);
220 /// assert_eq!(new_col.run(&GreedyScheduler::new()), Some(vec![3, 4, 1, 2]));
221 /// ```
222
223 pub fn partition<
224 F: 'static + Sync + Send + Clone + Fn(usize, &A) -> usize
225 >(&self, partitions: usize, f: F) -> DiskCollection<A> {
226 let new_chunks = partition(&self.partitions,
227 partitions,
228 f);
229 // Loop over each bucket
230 self.from_defs(new_chunks)
231 }
232
233 /// Folds and accumulates values across multiple partitions into K new partitions.
234 /// This is also known as a "group by" with a following reducer.
235 ///
236 /// DiskCollection first performs a block aggregation: that is, it combines values
237 /// within each partition first using the `binop` function. It then hashes
238 /// each key to a new partition index, where it will then aggregate all keys using the
239 /// `reduce` function.
240 ///
241 /// ```rust
242 /// extern crate tange;
243 /// extern crate tange_collection;
244 /// use tange::scheduler::GreedyScheduler;
245 /// use tange_collection::collection::disk::DiskCollection;
246 ///
247 /// let col = DiskCollection::from_vec("/tmp".into(), vec![1,2,3,4,5usize]);
248 /// // Sum all odds and evens together
249 /// let group_sum = col.fold_by(|x| x % 2,
250 /// || 0usize,
251 /// |block_acc, item| {*block_acc += *item},
252 /// |part_acc1, part_acc2| {*part_acc1 += *part_acc2},
253 /// 1)
254 /// .sort_by(|x| x.0);
255 ///
256 /// assert_eq!(group_sum.n_partitions(), 1);
257 /// assert_eq!(group_sum.run(&GreedyScheduler::new()), Some(vec![(0, 6), (1, 9)]));
258 /// ```
259
260 pub fn fold_by<K: Any + Sync + Send + Clone + Hash + Eq + Serialize + for<'de> Deserialize<'de>,
261 B: Any + Sync + Send + Clone + Serialize + for<'de> Deserialize<'de>,
262 D: 'static + Sync + Send + Clone + Fn() -> B,
263 F: 'static + Sync + Send + Clone + Fn(&A) -> K,
264 O: 'static + Sync + Send + Clone + Fn(&mut B, &A) -> (),
265 R: 'static + Sync + Send + Clone + Fn(&mut B, &B) -> ()>(
266 &self, key: F, default: D, binop: O, reduce: R, partitions: usize
267 ) -> DiskCollection<(K,B)> {
268 let fs = Arc::new(FileStore::empty(self.path.clone()));
269 let results = fold_by(&self.partitions, key, default, binop,
270 reduce, fs, partitions);
271 self.from_defs(results)
272 }
273
274 /// Simple function to re-partition values by a given key. The return key is hashed
275 /// and moduloed by the new partition count to determine where it will end up.
276 /// ```rust
277 /// extern crate tange;
278 /// extern crate tange_collection;
279 /// use tange::scheduler::GreedyScheduler;
280 /// use tange_collection::collection::disk::DiskCollection;
281 ///
282 /// let col = DiskCollection::from_vec("/tmp".into(), vec![1,2,3,4usize]);
283 /// let new_col = col.partition_by_key(2, |x| format!("{}", x));
284 ///
285 /// assert_eq!(new_col.n_partitions(), 2);
286 /// assert_eq!(new_col.run(&GreedyScheduler::new()), Some(vec![4, 1, 2, 3]));
287 /// ```
288
289 pub fn partition_by_key<
290 K: Any + Sync + Send + Clone + Hash + Eq,
291 F: 'static + Sync + Send + Clone + Fn(&A) -> K
292 >(&self, n_chunks: usize, key: F) -> DiskCollection<A> {
293 let results = partition_by_key(&self.partitions, n_chunks, key);
294 let groups = results.into_iter().map(|part| concat(&part).unwrap()).collect();
295 self.from_defs(groups)
296 }
297
298 /// Sorts values within each partition by a key function. If a global sort is desired,
299 /// the collection needs to be re-partitioned into a single partition
300 /// ```rust
301 /// extern crate tange;
302 /// extern crate tange_collection;
303 /// use tange::scheduler::GreedyScheduler;
304 /// use tange_collection::collection::disk::DiskCollection;
305 ///
306 /// let col = DiskCollection::from_vec("/tmp".into(), vec![1,2,3,4i32]);
307 /// let new_col = col.sort_by(|x| -*x);
308 ///
309 /// assert_eq!(new_col.run(&GreedyScheduler::new()), Some(vec![4, 3, 2, 1]));
310 /// ```
311pub fn sort_by<
312 K: Ord,
313 F: 'static + Sync + Send + Clone + Fn(&A) -> K
314 >(&self, key: F) -> DiskCollection<A> {
315 let acc = Arc::new(FileStore::empty(self.path.clone()));
316 let nps = batch_apply(&self.partitions, move |_idx, vs| {
317 let mut out = acc.writer();
318 let mut v2: Vec<_> = vs.stream().into_iter().collect();
319 v2.sort_by_key(|v| key(v));
320 for vi in v2 {
321 out.add(vi);
322 }
323 out.finish()
324 });
325 self.from_defs(nps)
326 }
327
328 /// Inner Joins two collections by the provided key function.
329 /// If multiple values of the same key are found, they will be cross product for each
330 /// pair found.
331 /// ```rust
332 /// extern crate tange;
333 /// extern crate tange_collection;
334 /// use tange::scheduler::GreedyScheduler;
335 /// use tange_collection::collection::disk::DiskCollection;
336 /// let name_age: Vec<(String,u32)> = vec![("Andrew".into(), 33), ("Leah".into(), 12)];
337 /// let name_money: Vec<(String,f32)> = vec![("Leah".into(), 20.50)];
338 ///
339 /// let na = DiskCollection::from_vec("/tmp".into(), name_age);
340 /// let nm = DiskCollection::from_vec("/tmp".into(), name_money);
341 /// let joined = na.join_on(&nm,
342 /// |nax| nax.0.clone(),
343 /// |nmx| nmx.0.clone(),
344 /// |nax, nmx| (nax.0.clone(), nax.1, nmx.1),
345 /// 1);
346 /// assert_eq!(joined.run(&GreedyScheduler::new()),
347 /// Some(vec![("Leah".into(), ("Leah".into(), 12, 20.50))]));
348 /// ```
349 pub fn join_on<
350 K: Any + Sync + Send + Clone + Hash + Eq + Serialize + for<'de> Deserialize<'de>,
351 B: Any + Sync + Send + Clone + Serialize + for<'de> Deserialize<'de>,
352 C: Any + Sync + Send + Clone + Serialize,
353 KF1: 'static + Sync + Send + Clone + Fn(&A) -> K,
354 KF2: 'static + Sync + Send + Clone + Fn(&B) -> K,
355 J: 'static + Sync + Send + Clone + Fn(&A, &B) -> C,
356 >(
357 &self,
358 other: &DiskCollection<B>,
359 key1: KF1,
360 key2: KF2,
361 joiner: J,
362 partitions: usize,
363 ) -> DiskCollection<(K,C)> {
364 // Group each by a common key
365 let p1 = self.map(move |x| (key1(x), x.clone()))
366 .partition_by_key(partitions, |x| x.0.clone());
367 let p2 = other.map(move |x| (key2(x), x.clone()))
368 .partition_by_key(partitions, |x| x.0.clone());
369
370 let mut new_parts = Vec::with_capacity(p1.partitions.len());
371 for (l, r) in p1.partitions.iter().zip(p2.partitions.iter()) {
372 let acc = Arc::new(FileStore::empty(self.path.clone()));
373 new_parts.push(jok(l, r, acc, joiner.clone()));
374 }
375
376 self.from_defs(new_parts)
377 }
378
379 /// Executes the Collection, returning the result of the computation
380 pub fn run<S: Scheduler>(&self, s: &S) -> Option<Vec<A>> {
381 let defs = batch_apply(&self.partitions, |_idx, vs| {
382 vs.stream().into_iter().collect::<Vec<_>>()
383 });
384 let cat = tree_reduce(&defs, |x, y| {
385 let mut v1: Vec<_> = (*x).clone();
386 for yi in y {
387 v1.push(yi.clone());
388 }
389 v1
390 });
391 cat.and_then(|x| x.run(s))
392 }
393}
394
395impl <A: Any + Send + Sync + Clone + Serialize + for<'de>Deserialize<'de>> DiskCollection<Vec<A>> {
396 /// Flattens a vector of values
397 /// ```rust
398 /// extern crate tange;
399 /// extern crate tange_collection;
400 /// use tange::scheduler::GreedyScheduler;
401 /// use tange_collection::collection::disk::DiskCollection;
402 ///
403 /// let col = DiskCollection::from_vec("/tmp".into(), vec![vec![1usize,2],vec![3,4]]);
404 /// let flattened = col.flatten();
405 /// assert_eq!(flattened.run(&GreedyScheduler::new()), Some(vec![1, 2, 3, 4]));
406 /// ```
407 pub fn flatten(&self) -> DiskCollection<A> {
408 self.emit(move |x, emitter| {
409 for xi in x {
410 emitter(xi.clone());
411 }
412 })
413 }
414}
415
416impl <A: Any + Send + Sync + Clone + Serialize + for<'de>Deserialize<'de>> DiskCollection<A> {
417 /// Returns the number of items in the collection
418 /// ```rust
419 /// extern crate tange;
420 /// extern crate tange_collection;
421 /// use tange::scheduler::GreedyScheduler;
422 /// use tange_collection::collection::disk::DiskCollection;
423 ///
424 /// let col = DiskCollection::from_vec("/tmp".into(), vec![vec![1usize,2],vec![3,4]]);
425 /// assert_eq!(col.count().run(&GreedyScheduler::new()), Some(vec![2]));
426 /// let flattened = col.flatten();
427 /// assert_eq!(flattened.count().run(&GreedyScheduler::new()), Some(vec![4]));
428 /// ```
429 pub fn count(&self) -> DiskCollection<usize> {
430 let nps = batch_apply(&self.partitions, |_idx, vs| {
431 vs.stream().into_iter().map(|_| 1usize).sum::<usize>()
432 });
433 let count = tree_reduce(&nps, |x, y| x + y).unwrap();
434 let acc = Arc::new(FileStore::empty(self.path.clone()));
435 let out = count.apply(move |x| {
436 acc.write_vec(vec![*x])
437 });
438 self.from_defs(vec![out])
439 }
440}
441
442impl <A: Any + Send + Sync + Clone + PartialEq + Hash + Eq + Serialize + for<'de>Deserialize<'de>> DiskCollection<A> {
443
444 /// Computes the frequencies of the items in collection.
445 /// ```rust
446 /// extern crate tange;
447 /// extern crate tange_collection;
448 /// use tange::scheduler::GreedyScheduler;
449 /// use tange_collection::collection::disk::DiskCollection;
450 ///
451 /// let col = DiskCollection::from_vec("/tmp".into(), vec![1, 2, 1, 5, 1, 2]);
452 /// let freqs = col.frequencies(1).sort_by(|x| x.0);
453 /// assert_eq!(freqs.run(&GreedyScheduler::new()), Some(vec![(1, 3), (2, 2), (5, 1)]));
454 /// ```
455pub fn frequencies(&self, partitions: usize) -> DiskCollection<(A, usize)> {
456 //self.partition(chunks, |x| x);
457 self.fold_by(|s| s.clone(),
458 || 0usize,
459 |acc, _l| *acc += 1,
460 |x, y| *x += *y,
461 partitions)
462 }
463}
464
465// Writes out data
466impl DiskCollection<String> {
467 /// Writes each record in a collection to disk, newline delimited.
468 /// DiskCollection will create anew file within the path for each partition written.
469 pub fn sink(&self, path: &str) -> DiskCollection<usize> {
470 let acc = Arc::new(FileStore::empty(self.path.clone()));
471 let p: Arc<String> = Arc::new(path.to_owned());
472 let pats = batch_apply(&self.partitions, move |idx, vs| {
473 let p2 = p.clone();
474 let local: &str = &p2;
475 fs::create_dir_all(local)
476 .expect("Welp, something went terribly wrong when creating directory");
477
478 let file = fs::File::create(&format!("{}/{}", local, idx))
479 .expect("Issues opening file!");
480 let mut bw = BufWriter::new(file);
481
482 let mut size = 0usize;
483 for line in vs.stream() {
484 bw.write(line.as_bytes()).expect("Error writing out line");
485 bw.write(b"\n").expect("Error writing out line");
486 size += 1;
487 }
488
489 acc.write_vec(vec![size])
490 });
491
492 self.from_defs(pats)
493 }
494}
495
#[cfg(test)]
mod test_lib {
    use super::*;
    use tange::scheduler::{GreedyScheduler, LeveledScheduler};

    /// Shared fixture: five values (with duplicated 1s and 2s) in one partition.
    fn make_col() -> DiskCollection<usize> {
        let values = vec![1, 2, 3, 1, 2usize];
        DiskCollection::from_vec("/tmp".into(), values)
    }

    /// Sorts `values` and hands them back, for order-insensitive comparisons.
    fn sorted<T: Ord>(mut values: Vec<T>) -> Vec<T> {
        values.sort();
        values
    }

    #[test]
    fn test_fold_by() {
        let folded = make_col().fold_by(|x| *x, || 0, |x, _y| *x += 1, |x, y| *x += y, 1);
        let actual = sorted(folded.run(&LeveledScheduler).unwrap());
        assert_eq!(actual, vec![(1, 2), (2, 2), (3, 1)]);
    }

    #[test]
    fn test_fold_by_parts() {
        let folded = make_col().fold_by(|x| *x, || 0, |x, _y| *x += 1, |x, y| *x += y, 2);
        assert_eq!(folded.n_partitions(), 2);
        let actual = sorted(folded.run(&LeveledScheduler).unwrap());
        assert_eq!(actual, vec![(1, 2), (2, 2), (3, 1)]);
    }

    #[test]
    fn test_partition_by_key() {
        let by_key = make_col()
            .partition_by_key(2, |x| *x)
            .sort_by(|x| *x);
        assert_eq!(by_key.n_partitions(), 2);
        let actual = by_key.run(&LeveledScheduler).unwrap();
        assert_eq!(actual, vec![2, 2, 3, 1, 1]);
    }

    #[test]
    fn test_partition() {
        let by_parity = make_col()
            .partition(2, |_idx, x| x % 2)
            .sort_by(|x| *x);
        assert_eq!(by_parity.n_partitions(), 2);
        let actual = by_parity.run(&GreedyScheduler::new()).unwrap();
        assert_eq!(actual, vec![2, 2, 1, 1, 3]);
    }

    #[test]
    fn test_count() {
        let actual = make_col()
            .split(3)
            .count()
            .run(&LeveledScheduler)
            .unwrap();
        assert_eq!(actual, vec![5]);
    }

    #[test]
    fn test_join() {
        let left = make_col();
        let right = DiskCollection::from_vec("/tmp".into(),
                                             vec![(2, 1.23f64), (3usize, 2.34)]);
        let joined = left
            .join_on(&right, |x| *x, |y| y.0, |x, y| (*x, y.1), 5)
            .split(1)
            .sort_by(|x| x.0);
        let actual = joined.run(&LeveledScheduler).unwrap();
        assert_eq!(actual, vec![(2, (2, 1.23)), (2, (2, 1.23)), (3, (3, 2.34))]);
    }

    #[test]
    fn test_emit() {
        let counted_down = DiskCollection::from_vec("/tmp".into(), vec![1, 2, 3usize])
            .emit(|num, emitter| {
                (0..*num).for_each(|i| emitter(i));
            })
            .sort_by(|x| *x);
        let actual = counted_down.run(&LeveledScheduler).unwrap();
        assert_eq!(actual, vec![0, 0, 0, 1, 1, 2]);
    }

    #[test]
    fn test_sort() {
        let ordered = DiskCollection::from_vec("/tmp".into(), vec![1, 3, 2usize])
            .sort_by(|x| *x);
        let actual = ordered.run(&LeveledScheduler).unwrap();
        assert_eq!(actual, vec![1, 2, 3]);
    }
}