Struct tange_collection::collection::disk::DiskCollection
source · Expand description
DiskCollection struct.
Implementations§
source§impl<A: Any + Send + Sync + Clone + Serialize + for<'de> Deserialize<'de>> DiskCollection<A>
impl<A: Any + Send + Sync + Clone + Serialize + for<'de> Deserialize<'de>> DiskCollection<A>
sourcepub fn from_vec(path: String, vec: Vec<A>) -> DiskCollection<A>
pub fn from_vec(path: String, vec: Vec<A>) -> DiskCollection<A>
Create a new DiskCollection from a Vector of objects.
extern crate tange;
extern crate tange_collection;
use tange::scheduler::GreedyScheduler;
use tange_collection::collection::disk::DiskCollection;
let col = DiskCollection::from_vec("/tmp".into(), vec![1,2,3usize]);
assert_eq!(col.run(&GreedyScheduler::new()), Some(vec![1,2,3usize]));
sourcepub fn from_memory(path: String, mc: &Vec<Deferred<Vec<A>>>) -> DiskCollection<A>
pub fn from_memory(path: String, mc: &Vec<Deferred<Vec<A>>>) -> DiskCollection<A>
Converts a collection of Deferred objects into a DiskCollection
This is usually best used from the MemoryCollection
sourcepub fn from_stores(
path: String,
fs: Vec<Deferred<Arc<FileStore<A>>>>
) -> DiskCollection<A>
pub fn from_stores(
path: String,
fs: Vec<Deferred<Arc<FileStore<A>>>>
) -> DiskCollection<A>
Creates a DiskCollection from a set of FileStores.
sourcepub fn to_defs(&self) -> &Vec<Deferred<Arc<FileStore<A>>>>
pub fn to_defs(&self) -> &Vec<Deferred<Arc<FileStore<A>>>>
Provides raw access to the underlying partitions
sourcepub fn to_memory(&self) -> MemoryCollection<A>
pub fn to_memory(&self) -> MemoryCollection<A>
Converts a DiskCollection to a MemoryCollection
sourcepub fn n_partitions(&self) -> usize
pub fn n_partitions(&self) -> usize
Returns the current number of data partitions
sourcepub fn concat(&self, other: &DiskCollection<A>) -> DiskCollection<A>
pub fn concat(&self, other: &DiskCollection<A>) -> DiskCollection<A>
Concatenates two collections into a single Collection
extern crate tange;
extern crate tange_collection;
use tange::scheduler::GreedyScheduler;
use tange_collection::collection::disk::DiskCollection;
let one = DiskCollection::from_vec("/tmp".into(), vec![1,2,3usize]);
let two = DiskCollection::from_vec("/tmp".into(), vec![4usize, 5, 6]);
let cat = one.concat(&two);
assert_eq!(cat.run(&GreedyScheduler::new()), Some(vec![1,2,3,4,5,6]));
sourcepub fn map<B: Any + Send + Sync + Clone + Serialize, F: 'static + Sync + Send + Clone + Fn(&A) -> B>(
&self,
f: F
) -> DiskCollection<B>
pub fn map<B: Any + Send + Sync + Clone + Serialize, F: 'static + Sync + Send + Clone + Fn(&A) -> B>(
&self,
f: F
) -> DiskCollection<B>
Maps a function over the values in the DiskCollection, returning a new DiskCollection
extern crate tange;
extern crate tange_collection;
use tange::scheduler::GreedyScheduler;
use tange_collection::collection::disk::DiskCollection;
let one = DiskCollection::from_vec("/tmp".into(), vec![1,2,3usize]);
let strings = one.map(|i| format!("{}", i));
assert_eq!(strings.run(&GreedyScheduler::new()),
Some(vec!["1".into(),"2".into(),"3".into()]));
sourcepub fn filter<F: 'static + Sync + Send + Clone + Fn(&A) -> bool>(
&self,
f: F
) -> DiskCollection<A>
pub fn filter<F: 'static + Sync + Send + Clone + Fn(&A) -> bool>(
&self,
f: F
) -> DiskCollection<A>
Filters out items in the collection that fail the predicate.
extern crate tange;
extern crate tange_collection;
use tange::scheduler::GreedyScheduler;
use tange_collection::collection::disk::DiskCollection;
let col = DiskCollection::from_vec("/tmp".into(), vec![1,2,3usize]);
let odds = col.filter(|x| x % 2 == 1);
assert_eq!(odds.run(&GreedyScheduler::new()),
Some(vec![1, 3usize]));
sourcepub fn split(&self, n_chunks: usize) -> DiskCollection<A>
pub fn split(&self, n_chunks: usize) -> DiskCollection<A>
Re-partitions a collection by the number of provided chunks. It uniformly distributes data from each old partition into each new partition.
extern crate tange;
extern crate tange_collection;
use tange::scheduler::GreedyScheduler;
use tange_collection::collection::disk::DiskCollection;
let col = DiskCollection::from_vec("/tmp".into(), vec![1,2,3usize]);
assert_eq!(col.n_partitions(), 1);
let two = col.split(2);
assert_eq!(two.n_partitions(), 2);
let three = col.split(3);
assert_eq!(three.n_partitions(), 3);
sourcepub fn emit<B: Any + Send + Sync + Clone + Serialize, F: 'static + Sync + Send + Clone + Fn(&A, &mut dyn FnMut(B))>(
&self,
f: F
) -> DiskCollection<B>
pub fn emit<B: Any + Send + Sync + Clone + Serialize, F: 'static + Sync + Send + Clone + Fn(&A, &mut dyn FnMut(B))>(
&self,
f: F
) -> DiskCollection<B>
Maps over all items in a collection, optionally emitting new values. It can be used to efficiently fuse a number of map/filter/flat_map functions into a single method.
extern crate tange;
extern crate tange_collection;
use tange::scheduler::GreedyScheduler;
use tange_collection::collection::disk::DiskCollection;
let col = DiskCollection::from_vec("/tmp".into(), vec![1,2,3usize]);
let new = col.emit(|item, emitter| {
if item % 2 == 0 {
emitter(format!("{}!", item));
}
});
assert_eq!(new.run(&GreedyScheduler::new()), Some(vec!["2!".into()]));
sourcepub fn partition<F: 'static + Sync + Send + Clone + Fn(usize, &A) -> usize>(
&self,
partitions: usize,
f: F
) -> DiskCollection<A>
pub fn partition<F: 'static + Sync + Send + Clone + Fn(usize, &A) -> usize>(
&self,
partitions: usize,
f: F
) -> DiskCollection<A>
Re-partitions data into N new partitions by the given function. The user provided function is used as a hash function, mapping the returned value to a partition index. This makes it useful for managing which partition data ends up in!
extern crate tange;
extern crate tange_collection;
use tange::scheduler::GreedyScheduler;
use tange_collection::collection::disk::DiskCollection;
let col = DiskCollection::from_vec("/tmp".into(), vec![1,2,3,4usize]);
let new_col = col.partition(2, |idx, x| if *x < 3 { 1 } else { 2 });
assert_eq!(new_col.n_partitions(), 2);
assert_eq!(new_col.run(&GreedyScheduler::new()), Some(vec![3, 4, 1, 2]));
sourcepub fn fold_by<K: Any + Sync + Send + Clone + Hash + Eq + Serialize + for<'de> Deserialize<'de>, B: Any + Sync + Send + Clone + Serialize + for<'de> Deserialize<'de>, D: 'static + Sync + Send + Clone + Fn() -> B, F: 'static + Sync + Send + Clone + Fn(&A) -> K, O: 'static + Sync + Send + Clone + Fn(&mut B, &A), R: 'static + Sync + Send + Clone + Fn(&mut B, &B)>(
&self,
key: F,
default: D,
binop: O,
reduce: R,
partitions: usize
) -> DiskCollection<(K, B)>
pub fn fold_by<K: Any + Sync + Send + Clone + Hash + Eq + Serialize + for<'de> Deserialize<'de>, B: Any + Sync + Send + Clone + Serialize + for<'de> Deserialize<'de>, D: 'static + Sync + Send + Clone + Fn() -> B, F: 'static + Sync + Send + Clone + Fn(&A) -> K, O: 'static + Sync + Send + Clone + Fn(&mut B, &A), R: 'static + Sync + Send + Clone + Fn(&mut B, &B)>(
&self,
key: F,
default: D,
binop: O,
reduce: R,
partitions: usize
) -> DiskCollection<(K, B)>
Folds and accumulates values across multiple partitions into K new partitions. This is also known as a “group by” with a following reducer.
DiskCollection first performs a block aggregation: that is, it combines values within each partition first using the binop function. It then hashes each key to a new partition index, where it then aggregates all values for that key using the reduce function.
extern crate tange;
extern crate tange_collection;
use tange::scheduler::GreedyScheduler;
use tange_collection::collection::disk::DiskCollection;
let col = DiskCollection::from_vec("/tmp".into(), vec![1,2,3,4,5usize]);
// Sum all odds and evens together
let group_sum = col.fold_by(|x| x % 2,
|| 0usize,
|block_acc, item| {*block_acc += *item},
|part_acc1, part_acc2| {*part_acc1 += *part_acc2},
1)
.sort_by(|x| x.0);
assert_eq!(group_sum.n_partitions(), 1);
assert_eq!(group_sum.run(&GreedyScheduler::new()), Some(vec![(0, 6), (1, 9)]));
sourcepub fn partition_by_key<K: Any + Sync + Send + Clone + Hash + Eq, F: 'static + Sync + Send + Clone + Fn(&A) -> K>(
&self,
n_chunks: usize,
key: F
) -> DiskCollection<A>
pub fn partition_by_key<K: Any + Sync + Send + Clone + Hash + Eq, F: 'static + Sync + Send + Clone + Fn(&A) -> K>(
&self,
n_chunks: usize,
key: F
) -> DiskCollection<A>
Simple function to re-partition values by a given key. The return key is hashed and moduloed by the new partition count to determine where it will end up.
extern crate tange;
extern crate tange_collection;
use tange::scheduler::GreedyScheduler;
use tange_collection::collection::disk::DiskCollection;
let col = DiskCollection::from_vec("/tmp".into(), vec![1,2,3,4usize]);
let new_col = col.partition_by_key(2, |x| format!("{}", x));
assert_eq!(new_col.n_partitions(), 2);
assert_eq!(new_col.run(&GreedyScheduler::new()), Some(vec![4, 1, 2, 3]));
sourcepub fn sort_by<K: Ord, F: 'static + Sync + Send + Clone + Fn(&A) -> K>(
&self,
key: F
) -> DiskCollection<A>
pub fn sort_by<K: Ord, F: 'static + Sync + Send + Clone + Fn(&A) -> K>(
&self,
key: F
) -> DiskCollection<A>
Sorts values within each partition by a key function. If a global sort is desired, the collection needs to be re-partitioned into a single partition
extern crate tange;
extern crate tange_collection;
use tange::scheduler::GreedyScheduler;
use tange_collection::collection::disk::DiskCollection;
let col = DiskCollection::from_vec("/tmp".into(), vec![1,2,3,4i32]);
let new_col = col.sort_by(|x| -*x);
assert_eq!(new_col.run(&GreedyScheduler::new()), Some(vec![4, 3, 2, 1]));
sourcepub fn join_on<K: Any + Sync + Send + Clone + Hash + Eq + Serialize + for<'de> Deserialize<'de>, B: Any + Sync + Send + Clone + Serialize + for<'de> Deserialize<'de>, C: Any + Sync + Send + Clone + Serialize, KF1: 'static + Sync + Send + Clone + Fn(&A) -> K, KF2: 'static + Sync + Send + Clone + Fn(&B) -> K, J: 'static + Sync + Send + Clone + Fn(&A, &B) -> C>(
&self,
other: &DiskCollection<B>,
key1: KF1,
key2: KF2,
joiner: J,
partitions: usize
) -> DiskCollection<(K, C)>
pub fn join_on<K: Any + Sync + Send + Clone + Hash + Eq + Serialize + for<'de> Deserialize<'de>, B: Any + Sync + Send + Clone + Serialize + for<'de> Deserialize<'de>, C: Any + Sync + Send + Clone + Serialize, KF1: 'static + Sync + Send + Clone + Fn(&A) -> K, KF2: 'static + Sync + Send + Clone + Fn(&B) -> K, J: 'static + Sync + Send + Clone + Fn(&A, &B) -> C>(
&self,
other: &DiskCollection<B>,
key1: KF1,
key2: KF2,
joiner: J,
partitions: usize
) -> DiskCollection<(K, C)>
Inner joins two collections by the provided key functions. If multiple values share the same key, the cross product of the matching pairs is emitted.
extern crate tange;
extern crate tange_collection;
use tange::scheduler::GreedyScheduler;
use tange_collection::collection::disk::DiskCollection;
let name_age: Vec<(String,u32)> = vec![("Andrew".into(), 33), ("Leah".into(), 12)];
let name_money: Vec<(String,f32)> = vec![("Leah".into(), 20.50)];
let na = DiskCollection::from_vec("/tmp".into(), name_age);
let nm = DiskCollection::from_vec("/tmp".into(), name_money);
let joined = na.join_on(&nm,
|nax| nax.0.clone(),
|nmx| nmx.0.clone(),
|nax, nmx| (nax.0.clone(), nax.1, nmx.1),
1);
assert_eq!(joined.run(&GreedyScheduler::new()),
Some(vec![("Leah".into(), ("Leah".into(), 12, 20.50))]));
source§impl<A: Any + Send + Sync + Clone + Serialize + for<'de> Deserialize<'de>> DiskCollection<Vec<A>>
impl<A: Any + Send + Sync + Clone + Serialize + for<'de> Deserialize<'de>> DiskCollection<Vec<A>>
sourcepub fn flatten(&self) -> DiskCollection<A>
pub fn flatten(&self) -> DiskCollection<A>
Flattens a vector of values
extern crate tange;
extern crate tange_collection;
use tange::scheduler::GreedyScheduler;
use tange_collection::collection::disk::DiskCollection;
let col = DiskCollection::from_vec("/tmp".into(), vec![vec![1usize,2],vec![3,4]]);
let flattened = col.flatten();
assert_eq!(flattened.run(&GreedyScheduler::new()), Some(vec![1, 2, 3, 4]));
source§impl<A: Any + Send + Sync + Clone + Serialize + for<'de> Deserialize<'de>> DiskCollection<A>
impl<A: Any + Send + Sync + Clone + Serialize + for<'de> Deserialize<'de>> DiskCollection<A>
sourcepub fn count(&self) -> DiskCollection<usize>
pub fn count(&self) -> DiskCollection<usize>
Returns the number of items in the collection
extern crate tange;
extern crate tange_collection;
use tange::scheduler::GreedyScheduler;
use tange_collection::collection::disk::DiskCollection;
let col = DiskCollection::from_vec("/tmp".into(), vec![vec![1usize,2],vec![3,4]]);
assert_eq!(col.count().run(&GreedyScheduler::new()), Some(vec![2]));
let flattened = col.flatten();
assert_eq!(flattened.count().run(&GreedyScheduler::new()), Some(vec![4]));
source§impl<A: Any + Send + Sync + Clone + PartialEq + Hash + Eq + Serialize + for<'de> Deserialize<'de>> DiskCollection<A>
impl<A: Any + Send + Sync + Clone + PartialEq + Hash + Eq + Serialize + for<'de> Deserialize<'de>> DiskCollection<A>
sourcepub fn frequencies(&self, partitions: usize) -> DiskCollection<(A, usize)>
pub fn frequencies(&self, partitions: usize) -> DiskCollection<(A, usize)>
Computes the frequencies of the items in collection.
extern crate tange;
extern crate tange_collection;
use tange::scheduler::GreedyScheduler;
use tange_collection::collection::disk::DiskCollection;
let col = DiskCollection::from_vec("/tmp".into(), vec![1, 2, 1, 5, 1, 2]);
let freqs = col.frequencies(1).sort_by(|x| x.0);
assert_eq!(freqs.run(&GreedyScheduler::new()), Some(vec![(1, 3), (2, 2), (5, 1)]));
source§impl DiskCollection<String>
impl DiskCollection<String>
sourcepub fn sink(&self, path: &str) -> DiskCollection<usize>
pub fn sink(&self, path: &str) -> DiskCollection<usize>
Writes each record in a collection to disk, newline delimited. DiskCollection will create a new file within the path for each partition written.